refactor: tr_webseed simplification (#2613)

* refactor: remove effective-url caching in webseeds

The upcoming CURLSH refactor is a more effective way of doing this and
the current approach makes tr_web harder to refactor, so remove it.

* refactor: webseed rate limiting

The current code that limits the number of parallel fetches is overly
complicated and is also interwoven into the tr_webseed class. Extract
it into a new private helper class "ConnectionLimiter" with a simpler
public API for tr_webseed to use.
This commit is contained in:
Charles Kerr
2022-02-12 16:52:40 -06:00
committed by GitHub
parent cddd400f7e
commit dfe79af34c
4 changed files with 121 additions and 165 deletions

View File

@@ -131,12 +131,14 @@ std::vector<tr_block_span_t> makeSpans(tr_block_index_t const* sorted_blocks, si
std::vector<tr_block_span_t> Wishlist::next(Wishlist::PeerInfo const& peer_info, size_t n_wanted_blocks)
{
if (n_wanted_blocks == 0)
{
return {};
}
size_t n_blocks = 0;
auto spans = std::vector<tr_block_span_t>{};
// sanity clause
TR_ASSERT(n_wanted_blocks > 0);
// We usually won't need all the candidates until endgame, so don't
// waste cycles sorting all of them here. partial sort is enough.
auto candidates = getCandidates(peer_info);

View File

@@ -582,10 +582,3 @@ long tr_webGetTaskResponseCode(struct tr_web_task* task)
curl_easy_getinfo(task->curl_easy, CURLINFO_RESPONSE_CODE, &code);
return code;
}
// Returns the effective (post-redirect) URL curl ended up using for this
// task, or nullptr if it could not be determined.
// NOTE: per the libcurl docs, the returned string is owned by the curl
// easy handle — do not free it or use it after the handle is cleaned up.
char const* tr_webGetTaskRealUrl(struct tr_web_task* task)
{
char* url = nullptr;
// CURLINFO_EFFECTIVE_URL: the last URL this transfer actually fetched
curl_easy_getinfo(task->curl_easy, CURLINFO_EFFECTIVE_URL, &url);
return url;
}

View File

@@ -48,5 +48,3 @@ struct tr_web_task* tr_webRunWebseed(
struct evbuffer* buffer);
long tr_webGetTaskResponseCode(struct tr_web_task* task);
char const* tr_webGetTaskRealUrl(struct tr_web_task* task);

View File

@@ -7,7 +7,6 @@
#include <set>
#include <string>
#include <string_view>
#include <vector>
#include <event2/buffer.h>
#include <event2/event.h>
@@ -24,6 +23,8 @@
#include "web.h"
#include "webseed.h"
using namespace std::literals;
namespace
{
@@ -47,14 +48,89 @@ struct tr_webseed_task
auto constexpr TR_IDLE_TIMER_MSEC = 2000;
auto constexpr FAILURE_RETRY_INTERVAL = 150;
auto constexpr MAX_CONSECUTIVE_FAILURES = 5;
auto constexpr MAX_WEBSEED_CONNECTIONS = 4;
void webseed_timer_func(evutil_socket_t fd, short what, void* vw);
/**
 * Decides how many web tasks a webseed may run at once.
 *
 * - While transfers are healthy, several tasks may run in parallel.
 * - After a failed task, throttle down to a single task at a time
 *   until piece data is received again.
 * - After too many failures in a row, put the peer in timeout and
 *   allow no connections at all for a while.
 */
class ConnectionLimiter
{
public:
    // record that a new task was launched
    void taskStarted()
    {
        ++n_running;
    }

    // record that a task ended; `success` tells whether it succeeded
    void taskFinished(bool success)
    {
        if (!success)
        {
            taskFailed();
        }

        TR_ASSERT(n_running > 0);
        --n_running;
    }

    // piece data arrived, so the server is behaving well again:
    // clear the failure count and lift any timeout
    void gotData()
    {
        TR_ASSERT(n_running > 0);
        n_failures = 0;
        pause_expires_at = 0;
    }

    // how many more tasks may be started right now
    [[nodiscard]] size_t slotsAvailable() const
    {
        if (isPaused())
        {
            return 0;
        }

        auto const limit = maxConnections();
        return n_running < limit ? limit - n_running : 0;
    }

private:
    [[nodiscard]] bool isPaused() const
    {
        return tr_time() < pause_expires_at;
    }

    [[nodiscard]] size_t maxConnections() const
    {
        // after any failure, probe cautiously with a single connection
        return n_failures == 0 ? MaxRunning : 1;
    }

    void taskFailed()
    {
        TR_ASSERT(n_running > 0);

        if (++n_failures >= MaxFailures)
        {
            pause_expires_at = tr_time() + PauseSecs;
        }
    }

    static time_t constexpr PauseSecs = 120;
    static size_t constexpr MaxRunning = 4;
    static size_t constexpr MaxFailures = MaxRunning;

    size_t n_running = 0;
    size_t n_failures = 0;
    time_t pause_expires_at = 0;
};
struct tr_webseed : public tr_peer
{
public:
@@ -71,8 +147,6 @@ public:
have.setHasAll();
tr_peerUpdateProgress(tor, this);
file_urls.resize(tor->fileCount());
tr_timerAddMsec(timer, TR_IDLE_TIMER_MSEC);
}
@@ -110,14 +184,9 @@ public:
void* const callback_data;
Bandwidth bandwidth;
ConnectionLimiter connection_limiter;
std::set<tr_webseed_task*> tasks;
struct event* const timer;
int consecutive_failures = 0;
int retry_tickcount = 0;
int retry_challenge = 0;
int idle_connections = 0;
int active_transfers = 0;
std::vector<std::string> file_urls;
event* const timer;
};
} // namespace
@@ -243,43 +312,6 @@ static void write_block_func(void* vdata)
****
***/
// Payload marshalled to connection_succeeded() on the libevent thread.
struct connection_succeeded_data
{
tr_webseed* webseed = nullptr;
// effective URL reported by curl; empty if none was available
std::string real_url;
// piece coordinates used to look up which file the URL belongs to
tr_piece_index_t piece_index = 0;
uint32_t piece_offset = 0;
};
// Runs in the libevent thread (it needs tr_torrent access): records that a
// webseed connection succeeded — resetting the retry/backoff state once
// enough transfers are active — and caches curl's effective URL for the
// file so later requests can skip the redirect. Frees the payload.
static void connection_succeeded(void* vdata)
{
auto* data = static_cast<struct connection_succeeded_data*>(vdata);
struct tr_webseed* w = data->webseed;
// retry_challenge is the number of active transfers needed to prove the
// server has recovered; 0 means no challenge is pending
if (++w->active_transfers >= w->retry_challenge && w->retry_challenge != 0)
{
/* the server seems to be accepting more connections now */
w->consecutive_failures = w->retry_tickcount = w->retry_challenge = 0;
}
if (!std::empty(data->real_url))
{
tr_torrent const* const tor = tr_torrentFindFromId(w->session, w->torrent_id);
if (tor != nullptr)
{
// map the piece coordinates to a file and remember its resolved URL
auto const file_index = tor->fileOffset(data->piece_index, data->piece_offset).index;
w->file_urls[file_index].assign(data->real_url);
}
}
// payload was new'd by the caller; we own it now
delete data;
}
/***
****
***/
static void on_content_changed(struct evbuffer* buf, struct evbuffer_cb_info const* info, void* vtask)
{
size_t const n_added = info->n_added;
@@ -289,7 +321,7 @@ static void on_content_changed(struct evbuffer* buf, struct evbuffer_cb_info con
if (!task->dead && n_added > 0)
{
struct tr_webseed* w = task->webseed;
auto* const w = task->webseed;
w->bandwidth.notifyBandwidthConsumed(TR_DOWN, n_added, true, tr_time_msec());
fire_client_got_piece_data(w, n_added);
@@ -301,17 +333,7 @@ static void on_content_changed(struct evbuffer* buf, struct evbuffer_cb_info con
if (task->response_code == 206)
{
auto const* real_url = tr_webGetTaskRealUrl(task->web_task);
/* processing this uses a tr_torrent pointer,
so push the work to the libevent thread... */
tr_runInEventThread(
session,
connection_succeeded,
new connection_succeeded_data{ w,
real_url != nullptr ? real_url : "",
task->piece_index,
task->piece_offset + task->blocks_done * task->block_size + len - 1 });
task->webseed->connection_limiter.gotData();
}
}
@@ -346,34 +368,16 @@ static void task_request_next_chunk(struct tr_webseed_task* task);
static void on_idle(tr_webseed* w)
{
auto want = int{};
int const running_tasks = std::size(w->tasks);
tr_torrent* tor = tr_torrentFindFromId(w->session, w->torrent_id);
if (w->consecutive_failures >= MAX_CONSECUTIVE_FAILURES)
auto* const tor = tr_torrentFindFromId(w->session, w->torrent_id);
if (tor == nullptr || !tor->isRunning || tor->isDone() || w->connection_limiter.slotsAvailable() < 1)
{
want = w->idle_connections;
if (w->retry_tickcount >= FAILURE_RETRY_INTERVAL)
{
/* some time has passed since our connection attempts failed. try again */
++want;
/* if this challenge is fulfilled we will reset consecutive_failures */
w->retry_challenge = running_tasks + want;
}
}
else
{
want = MAX_WEBSEED_CONNECTIONS - running_tasks;
w->retry_challenge = running_tasks + w->idle_connections + 1;
return;
}
if (tor != nullptr && tor->isRunning && !tor->isDone() && want > 0)
for (auto const span : tr_peerMgrGetNextRequests(tor, w, 1))
{
auto n_tasks = size_t{};
w->connection_limiter.taskStarted();
for (auto const span : tr_peerMgrGetNextRequests(tor, w, want))
{
auto const [begin, end] = span;
auto* const task = tr_new0(tr_webseed_task, 1);
task->session = tor->session;
@@ -390,16 +394,8 @@ static void on_idle(tr_webseed* w)
w->tasks.insert(task);
task_request_next_chunk(task);
--w->idle_connections;
++n_tasks;
tr_peerMgrClientSentRequests(tor, w, span);
}
if (w->retry_tickcount >= FAILURE_RETRY_INTERVAL && n_tasks > 0)
{
w->retry_tickcount = 0;
}
}
}
static void web_response_func(
@@ -412,6 +408,9 @@ static void web_response_func(
{
auto* t = static_cast<struct tr_webseed_task*>(vtask);
bool const success = response_code == 206;
tr_webseed* w = t->webseed;
w->connection_limiter.taskFinished(success);
if (t->dead)
{
@@ -420,17 +419,10 @@ static void web_response_func(
return;
}
tr_webseed* w = t->webseed;
tr_torrent* tor = tr_torrentFindFromId(session, w->torrent_id);
if (tor != nullptr)
{
/* active_transfers was only increased if the connection was successful */
if (t->response_code == 206)
{
--w->active_transfers;
}
if (!success)
{
tr_block_index_t const blocks_remain = (t->length + tor->blockSize() - 1) / tor->blockSize() - t->blocks_done;
@@ -440,16 +432,6 @@ static void web_response_func(
fire_client_got_rejs(tor, w, t->block + t->blocks_done, blocks_remain);
}
if (t->blocks_done != 0)
{
++w->idle_connections;
}
else if (++w->consecutive_failures >= MAX_CONSECUTIVE_FAILURES && w->retry_tickcount == 0)
{
/* now wait a while until retrying to establish a connection */
++w->retry_tickcount;
}
w->tasks.erase(t);
evbuffer_free(t->content);
tr_free(t);
@@ -477,8 +459,6 @@ static void web_response_func(
fire_client_got_blocks(tor, t->webseed, t->block + t->blocks_done, 1);
}
++w->idle_connections;
w->tasks.erase(t);
evbuffer_free(t->content);
tr_free(t);
@@ -491,18 +471,13 @@ static void web_response_func(
static std::string make_url(tr_webseed* w, std::string_view name)
{
struct evbuffer* buf = evbuffer_new();
auto url = w->base_url;
evbuffer_add(buf, std::data(w->base_url), std::size(w->base_url));
/* if url ends with a '/', add the torrent name */
if (*std::rbegin(w->base_url) == '/' && !std::empty(name))
if (tr_strvEndsWith(url, "/"sv) && !std::empty(name))
{
tr_http_escape(buf, name, false);
tr_http_escape(url, name, false);
}
auto url = std::string{ (char const*)evbuffer_pullup(buf, -1), evbuffer_get_length(buf) };
evbuffer_free(buf);
return url;
}
@@ -513,8 +488,6 @@ static void task_request_next_chunk(struct tr_webseed_task* t)
if (tor != nullptr)
{
auto& urls = t->webseed->file_urls;
auto const piece_size = tor->pieceSize();
uint64_t const remain = t->length - t->blocks_done * tor->blockSize() - evbuffer_get_length(t->content);
@@ -525,15 +498,12 @@ static void task_request_next_chunk(struct tr_webseed_task* t)
auto const [file_index, file_offset] = tor->fileOffset(step_piece, step_piece_offset);
uint64_t this_pass = std::min(remain, tor->fileSize(file_index) - file_offset);
if (std::empty(urls[file_index]))
{
urls[file_index] = make_url(t->webseed, tor->fileSubpath(file_index));
}
auto const url = make_url(t->webseed, tor->fileSubpath(file_index));
char range[64];
tr_snprintf(range, sizeof(range), "%" PRIu64 "-%" PRIu64, file_offset, file_offset + this_pass - 1);
t->web_task = tr_webRunWebseed(tor, urls[file_index].c_str(), range, web_response_func, t, t->content);
t->web_task = tr_webRunWebseed(tor, url.c_str(), range, web_response_func, t, t->content);
}
}
@@ -547,14 +517,7 @@ namespace
// Periodic timer callback: advances the failure-retry countdown when one
// is active, lets the webseed start new tasks via on_idle(), then re-arms
// itself to fire again in TR_IDLE_TIMER_MSEC.
void webseed_timer_func(evutil_socket_t /*fd*/, short /*what*/, void* vw)
{
auto* w = static_cast<tr_webseed*>(vw);
// a nonzero tickcount means we are counting up toward a retry attempt
if (w->retry_tickcount != 0)
{
++w->retry_tickcount;
}
on_idle(w);
// schedule the next tick
tr_timerAddMsec(w->timer, TR_IDLE_TIMER_MSEC);
}