diff --git a/libtransmission/peer-mgr-wishlist.cc b/libtransmission/peer-mgr-wishlist.cc
index 23b8d7e6a..9cbc8c934 100644
--- a/libtransmission/peer-mgr-wishlist.cc
+++ b/libtransmission/peer-mgr-wishlist.cc
@@ -131,12 +131,14 @@ std::vector<tr_block_span_t> makeSpans(tr_block_index_t const* sorted_blocks, si
 
 std::vector<tr_block_span_t> Wishlist::next(Wishlist::PeerInfo const& peer_info, size_t n_wanted_blocks)
 {
+    if (n_wanted_blocks == 0)
+    {
+        return {};
+    }
+
     size_t n_blocks = 0;
     auto spans = std::vector<tr_block_span_t>{};
 
-    // sanity clause
-    TR_ASSERT(n_wanted_blocks > 0);
-
     // We usually won't need all the candidates until endgame, so don't
     // waste cycles sorting all of them here. partial sort is enough.
     auto candidates = getCandidates(peer_info);
diff --git a/libtransmission/web.cc b/libtransmission/web.cc
index c65d7104e..ed2e50e90 100644
--- a/libtransmission/web.cc
+++ b/libtransmission/web.cc
@@ -582,10 +582,3 @@ long tr_webGetTaskResponseCode(struct tr_web_task* task)
     curl_easy_getinfo(task->curl_easy, CURLINFO_RESPONSE_CODE, &code);
     return code;
 }
-
-char const* tr_webGetTaskRealUrl(struct tr_web_task* task)
-{
-    char* url = nullptr;
-    curl_easy_getinfo(task->curl_easy, CURLINFO_EFFECTIVE_URL, &url);
-    return url;
-}
diff --git a/libtransmission/web.h b/libtransmission/web.h
index 6e1ce3d91..c9c29120c 100644
--- a/libtransmission/web.h
+++ b/libtransmission/web.h
@@ -48,5 +48,3 @@ struct tr_web_task* tr_webRunWebseed(
     struct evbuffer* buffer);
 
 long tr_webGetTaskResponseCode(struct tr_web_task* task);
-
-char const* tr_webGetTaskRealUrl(struct tr_web_task* task);
diff --git a/libtransmission/webseed.cc b/libtransmission/webseed.cc
index bf7e15bcd..5087f52e1 100644
--- a/libtransmission/webseed.cc
+++ b/libtransmission/webseed.cc
@@ -7,7 +7,6 @@
 #include
 #include
 #include
-#include
 
 #include
 #include
@@ -24,6 +23,8 @@
 #include "web.h"
 #include "webseed.h"
 
+using namespace std::literals;
+
 namespace
 {
 
@@ -47,14 +48,89 @@ struct tr_webseed_task
 
 auto constexpr TR_IDLE_TIMER_MSEC = 2000;
 
-auto constexpr FAILURE_RETRY_INTERVAL = 150;
-
-auto constexpr MAX_CONSECUTIVE_FAILURES = 5;
-
-auto constexpr MAX_WEBSEED_CONNECTIONS = 4;
-
 void webseed_timer_func(evutil_socket_t fd, short what, void* vw);
 
+/**
+ * Manages how many web tasks should be running at a time.
+ *
+ * - When all is well, allow multiple tasks running in parallel.
+ * - If we get an error, throttle down to only one at a time
+ *   until we get piece data.
+ * - If we have too many errors in a row, put the peer in timeout
+ *   and don't allow _any_ connections for awhile.
+ */
+class ConnectionLimiter
+{
+public:
+    void taskStarted()
+    {
+        ++n_tasks;
+    }
+
+    void taskFinished(bool success)
+    {
+        if (!success)
+        {
+            taskFailed();
+        }
+
+        TR_ASSERT(n_tasks > 0);
+        --n_tasks;
+    }
+
+    void gotData()
+    {
+        TR_ASSERT(n_tasks > 0);
+        n_consecutive_failures = 0;
+        paused_until = 0;
+    }
+
+    [[nodiscard]] size_t slotsAvailable() const
+    {
+        if (isPaused())
+        {
+            return 0;
+        }
+
+        auto const max = maxConnections();
+        if (n_tasks >= max)
+        {
+            return 0;
+        }
+
+        return max - n_tasks;
+    }
+
+private:
+    [[nodiscard]] bool isPaused() const
+    {
+        return paused_until > tr_time();
+    }
+
+    [[nodiscard]] size_t maxConnections() const
+    {
+        return n_consecutive_failures > 0 ? 1 : MaxConnections;
+    }
+
+    void taskFailed()
+    {
+        TR_ASSERT(n_tasks > 0);
+
+        if (++n_consecutive_failures >= MaxConsecutiveFailures)
+        {
+            paused_until = tr_time() + TimeoutIntervalSecs;
+        }
+    }
+
+    static time_t constexpr TimeoutIntervalSecs = 120;
+    static size_t constexpr MaxConnections = 4;
+    static size_t constexpr MaxConsecutiveFailures = MaxConnections;
+
+    size_t n_tasks = 0;
+    size_t n_consecutive_failures = 0;
+    time_t paused_until = 0;
+};
+
 struct tr_webseed : public tr_peer
 {
 public:
@@ -71,8 +147,6 @@ public:
         have.setHasAll();
         tr_peerUpdateProgress(tor, this);
 
-        file_urls.resize(tor->fileCount());
-
         tr_timerAddMsec(timer, TR_IDLE_TIMER_MSEC);
     }
 
@@ -110,14 +184,9 @@ public:
 
     void* const callback_data;
    Bandwidth bandwidth;
+    ConnectionLimiter connection_limiter;
     std::set<tr_webseed_task*> tasks;
-    struct event* const timer;
-    int consecutive_failures = 0;
-    int retry_tickcount = 0;
-    int retry_challenge = 0;
-    int idle_connections = 0;
-    int active_transfers = 0;
-    std::vector<std::string> file_urls;
+    event* const timer;
 };
 
 } // namespace
@@ -243,43 +312,6 @@ static void write_block_func(void* vdata)
 ****
 ***/
 
-struct connection_succeeded_data
-{
-    tr_webseed* webseed = nullptr;
-    std::string real_url;
-    tr_piece_index_t piece_index = 0;
-    uint32_t piece_offset = 0;
-};
-
-static void connection_succeeded(void* vdata)
-{
-    auto* data = static_cast<connection_succeeded_data*>(vdata);
-    struct tr_webseed* w = data->webseed;
-
-    if (++w->active_transfers >= w->retry_challenge && w->retry_challenge != 0)
-    {
-        /* the server seems to be accepting more connections now */
-        w->consecutive_failures = w->retry_tickcount = w->retry_challenge = 0;
-    }
-
-    if (!std::empty(data->real_url))
-    {
-        tr_torrent const* const tor = tr_torrentFindFromId(w->session, w->torrent_id);
-
-        if (tor != nullptr)
-        {
-            auto const file_index = tor->fileOffset(data->piece_index, data->piece_offset).index;
-            w->file_urls[file_index].assign(data->real_url);
-        }
-    }
-
-    delete data;
-}
-
-/***
-****
-***/
-
 static void on_content_changed(struct evbuffer* buf, struct evbuffer_cb_info const* info, void* vtask)
 {
     size_t const n_added = info->n_added;
@@ -289,7 +321,7 @@ static void on_content_changed(struct evbuffer* buf, struct evbuffer_cb_info con
 
     if (!task->dead && n_added > 0)
     {
-        struct tr_webseed* w = task->webseed;
+        auto* const w = task->webseed;
 
         w->bandwidth.notifyBandwidthConsumed(TR_DOWN, n_added, true, tr_time_msec());
         fire_client_got_piece_data(w, n_added);
@@ -301,17 +333,7 @@ static void on_content_changed(struct evbuffer* buf, struct evbuffer_cb_info con
 
             if (task->response_code == 206)
            {
-                auto const* real_url = tr_webGetTaskRealUrl(task->web_task);
-
-                /* processing this uses a tr_torrent pointer,
-                   so push the work to the libevent thread... */
-                tr_runInEventThread(
-                    session,
-                    connection_succeeded,
-                    new connection_succeeded_data{ w,
-                                                   real_url != nullptr ? real_url : "",
-                                                   task->piece_index,
-                                                   task->piece_offset + task->blocks_done * task->block_size + len - 1 });
+                task->webseed->connection_limiter.gotData();
             }
         }
 
@@ -346,59 +368,33 @@ static void task_request_next_chunk(struct tr_webseed_task* task);
 
 static void on_idle(tr_webseed* w)
 {
-    auto want = int{};
-    int const running_tasks = std::size(w->tasks);
-    tr_torrent* tor = tr_torrentFindFromId(w->session, w->torrent_id);
-
-    if (w->consecutive_failures >= MAX_CONSECUTIVE_FAILURES)
+    auto* const tor = tr_torrentFindFromId(w->session, w->torrent_id);
+    if (tor == nullptr || !tor->isRunning || tor->isDone() || w->connection_limiter.slotsAvailable() < 1)
     {
-        want = w->idle_connections;
-
-        if (w->retry_tickcount >= FAILURE_RETRY_INTERVAL)
-        {
-            /* some time has passed since our connection attempts failed. try again */
-            ++want;
-            /* if this challenge is fulfilled we will reset consecutive_failures */
-            w->retry_challenge = running_tasks + want;
-        }
-    }
-    else
-    {
-        want = MAX_WEBSEED_CONNECTIONS - running_tasks;
-        w->retry_challenge = running_tasks + w->idle_connections + 1;
+        return;
     }
 
-    if (tor != nullptr && tor->isRunning && !tor->isDone() && want > 0)
+    for (auto const span : tr_peerMgrGetNextRequests(tor, w, 1))
    {
-        auto n_tasks = size_t{};
+        w->connection_limiter.taskStarted();
 
-        for (auto const span : tr_peerMgrGetNextRequests(tor, w, want))
-        {
-            auto const [begin, end] = span;
-            auto* const task = tr_new0(tr_webseed_task, 1);
-            task->session = tor->session;
-            task->webseed = w;
-            task->block = begin;
-            task->piece_index = tor->pieceForBlock(begin);
-            task->piece_offset = tor->blockSize() * begin - tor->pieceSize() * task->piece_index;
-            task->length = (end - 1 - begin) * tor->blockSize() + tor->blockSize(end - 1);
-            task->blocks_done = 0;
-            task->response_code = 0;
-            task->block_size = tor->blockSize();
-            task->content = evbuffer_new();
-            evbuffer_add_cb(task->content, on_content_changed, task);
-            w->tasks.insert(task);
-            task_request_next_chunk(task);
+        auto const [begin, end] = span;
+        auto* const task = tr_new0(tr_webseed_task, 1);
+        task->session = tor->session;
+        task->webseed = w;
+        task->block = begin;
+        task->piece_index = tor->pieceForBlock(begin);
+        task->piece_offset = tor->blockSize() * begin - tor->pieceSize() * task->piece_index;
+        task->length = (end - 1 - begin) * tor->blockSize() + tor->blockSize(end - 1);
+        task->blocks_done = 0;
+        task->response_code = 0;
+        task->block_size = tor->blockSize();
+        task->content = evbuffer_new();
+        evbuffer_add_cb(task->content, on_content_changed, task);
+        w->tasks.insert(task);
+        task_request_next_chunk(task);
 
-            --w->idle_connections;
-            ++n_tasks;
-            tr_peerMgrClientSentRequests(tor, w, span);
-        }
-
-        if (w->retry_tickcount >= FAILURE_RETRY_INTERVAL && n_tasks > 0)
-        {
-            w->retry_tickcount = 0;
-        }
+        tr_peerMgrClientSentRequests(tor, w, span);
     }
 }
 
@@ -412,6 +408,9 @@ static void web_response_func(
 {
     auto* t = static_cast<tr_webseed_task*>(vtask);
     bool const success = response_code == 206;
+    tr_webseed* w = t->webseed;
+
+    w->connection_limiter.taskFinished(success);
 
     if (t->dead)
     {
@@ -420,17 +419,10 @@ static void web_response_func(
         return;
     }
 
-    tr_webseed* w = t->webseed;
     tr_torrent* tor = tr_torrentFindFromId(session, w->torrent_id);
 
     if (tor != nullptr)
    {
-        /* active_transfers was only increased if the connection was successful */
-        if (t->response_code == 206)
-        {
-            --w->active_transfers;
-        }
-
         if (!success)
        {
             tr_block_index_t const blocks_remain = (t->length + tor->blockSize() - 1) / tor->blockSize() - t->blocks_done;
@@ -440,16 +432,6 @@ static void web_response_func(
                 fire_client_got_rejs(tor, w, t->block + t->blocks_done, blocks_remain);
             }
 
-            if (t->blocks_done != 0)
-            {
-                ++w->idle_connections;
-            }
-            else if (++w->consecutive_failures >= MAX_CONSECUTIVE_FAILURES && w->retry_tickcount == 0)
-            {
-                /* now wait a while until retrying to establish a connection */
-                ++w->retry_tickcount;
-            }
-
             w->tasks.erase(t);
             evbuffer_free(t->content);
             tr_free(t);
@@ -477,8 +459,6 @@ static void web_response_func(
                 fire_client_got_blocks(tor, t->webseed, t->block + t->blocks_done, 1);
             }
 
-            ++w->idle_connections;
-
             w->tasks.erase(t);
             evbuffer_free(t->content);
             tr_free(t);
@@ -491,18 +471,13 @@
 
 static std::string make_url(tr_webseed* w, std::string_view name)
 {
-    struct evbuffer* buf = evbuffer_new();
+    auto url = w->base_url;
 
-    evbuffer_add(buf, std::data(w->base_url), std::size(w->base_url));
-
-    /* if url ends with a '/', add the torrent name */
-    if (*std::rbegin(w->base_url) == '/' && !std::empty(name))
+    if (tr_strvEndsWith(url, "/"sv) && !std::empty(name))
     {
-        tr_http_escape(buf, name, false);
+        tr_http_escape(url, name, false);
     }
 
-    auto url = std::string{ (char const*)evbuffer_pullup(buf, -1), evbuffer_get_length(buf) };
-    evbuffer_free(buf);
     return url;
 }
 
@@ -513,8 +488,6 @@ static void task_request_next_chunk(struct tr_webseed_task* t)
 
     if (tor != nullptr)
     {
-        auto& urls = t->webseed->file_urls;
-
         auto const piece_size = tor->pieceSize();
         uint64_t const remain = t->length - t->blocks_done * tor->blockSize() - evbuffer_get_length(t->content);
 
@@ -525,15 +498,12 @@ static void task_request_next_chunk(struct tr_webseed_task* t)
 
         auto const [file_index, file_offset] = tor->fileOffset(step_piece, step_piece_offset);
         uint64_t this_pass = std::min(remain, tor->fileSize(file_index) - file_offset);
 
-        if (std::empty(urls[file_index]))
-        {
-            urls[file_index] = make_url(t->webseed, tor->fileSubpath(file_index));
-        }
+        auto const url = make_url(t->webseed, tor->fileSubpath(file_index));
 
         char range[64];
         tr_snprintf(range, sizeof(range), "%" PRIu64 "-%" PRIu64, file_offset, file_offset + this_pass - 1);
 
-        t->web_task = tr_webRunWebseed(tor, urls[file_index].c_str(), range, web_response_func, t, t->content);
+        t->web_task = tr_webRunWebseed(tor, url.c_str(), range, web_response_func, t, t->content);
     }
 }
 
@@ -547,14 +517,7 @@ namespace
 void webseed_timer_func(evutil_socket_t /*fd*/, short /*what*/, void* vw)
 {
     auto* w = static_cast<tr_webseed*>(vw);
-
-    if (w->retry_tickcount != 0)
-    {
-        ++w->retry_tickcount;
-    }
-
     on_idle(w);
-
     tr_timerAddMsec(w->timer, TR_IDLE_TIMER_MSEC);
 }
 
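Note on the ConnectionLimiter added above: its doc comment describes a three-state policy (parallel fetches when healthy, a single connection after an error, a full pause after repeated failures). The standalone sketch below is not part of the patch; it restates that policy with a fake clock in place of tr_time() and a main() driver so the behaviour can be exercised in isolation. The constants mirror the patch, but the trailing-underscore member names, the now_ variable, and main() are illustrative only.

#include <cassert>
#include <cstddef>
#include <ctime>
#include <iostream>

// Illustrative restatement of the patch's ConnectionLimiter policy.
// now_ stands in for tr_time(); assert() stands in for TR_ASSERT().
static time_t now_ = 0;

class ConnectionLimiter
{
public:
    void taskStarted()
    {
        ++n_tasks_;
    }

    void taskFinished(bool success)
    {
        if (!success)
        {
            taskFailed();
        }

        assert(n_tasks_ > 0);
        --n_tasks_;
    }

    void gotData()
    {
        assert(n_tasks_ > 0);
        n_consecutive_failures_ = 0;
        paused_until_ = 0;
    }

    size_t slotsAvailable() const
    {
        if (paused_until_ > now_) // in timeout: no connections at all
        {
            return 0;
        }

        // after any failure, throttle down to a single connection
        size_t const max = n_consecutive_failures_ > 0 ? 1 : MaxConnections;
        return n_tasks_ >= max ? 0 : max - n_tasks_;
    }

private:
    void taskFailed()
    {
        if (++n_consecutive_failures_ >= MaxConsecutiveFailures)
        {
            paused_until_ = now_ + TimeoutIntervalSecs;
        }
    }

    static time_t constexpr TimeoutIntervalSecs = 120;
    static size_t constexpr MaxConnections = 4;
    static size_t constexpr MaxConsecutiveFailures = MaxConnections;

    size_t n_tasks_ = 0;
    size_t n_consecutive_failures_ = 0;
    time_t paused_until_ = 0;
};

int main()
{
    auto limiter = ConnectionLimiter{};
    std::cout << limiter.slotsAvailable() << '\n'; // 4: healthy, parallel fetches allowed

    limiter.taskStarted();
    limiter.taskFinished(false); // one failure
    std::cout << limiter.slotsAvailable() << '\n'; // 1: throttled to a single task

    for (int i = 0; i < 3; ++i) // three more failures put the peer in timeout
    {
        limiter.taskStarted();
        limiter.taskFinished(false);
    }
    std::cout << limiter.slotsAvailable() << '\n'; // 0: paused entirely

    now_ += 121; // pretend the 120-second timeout has elapsed
    std::cout << limiter.slotsAvailable() << '\n'; // 1: still throttled until data arrives

    limiter.taskStarted();
    limiter.gotData(); // piece data received: back to full parallelism
    limiter.taskFinished(true);
    std::cout << limiter.slotsAvailable() << '\n'; // 4

    return 0;
}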