refactor: track active request timeout per peer per block (#7899)

This commit is contained in:
Yat Ho
2025-12-10 12:42:49 +08:00
committed by GitHub
parent bf48eadaeb
commit a8e20bf358

View File

@@ -399,15 +399,20 @@ public:
     update_active();
 }

-void maybe_cancel_block_request(tr_block_index_t block) override
+void cancel_block_request(tr_block_index_t block)
 {
-    if (active_requests.test(block))
-    {
-        cancels_sent_to_peer.add(tr_time(), 1);
-        active_requests.unset(block);
-        publish(tr_peer_event::SentCancel(tor_.block_info(), block));
-        protocol_send_cancel(peer_request::from_block(tor_, block));
-    }
+    cancels_sent_to_peer.add(tr_time(), 1);
+    active_requests.unset(block);
+    publish(tr_peer_event::SentCancel(tor_.block_info(), block));
+    protocol_send_cancel(peer_request::from_block(tor_, block));
+}
+
+void maybe_cancel_block_request(tr_block_index_t block) override
+{
+    if (active_requests.test(block))
+    {
+        cancel_block_request(block);
+    }
 }

 void set_choke(bool peer_is_choked) override
@@ -465,11 +470,7 @@ public:
     TR_ASSERT(client_is_interested());
     TR_ASSERT(!client_is_choked());

-    if (active_requests.has_none())
-    {
-        request_timeout_base_ = tr_time();
-    }
+    auto const timeout = tr_time() + RequestTimeoutSecs;

     for (auto const *span = block_spans, *span_end = span + n_spans; span != span_end; ++span)
     {
         auto const [block_begin, block_end] = *span;
@@ -490,6 +491,8 @@ public:
             protocol_send_request({ loc.piece, loc.piece_offset, req_len });
             offset += req_len;
         }
+
+        request_timeouts_.emplace_back(block, timeout);
     }

     active_requests.set_span(block_begin, block_end);
@@ -730,7 +733,7 @@ private:
     time_t choke_changed_at_ = 0;
-    time_t request_timeout_base_ = {};
+    std::vector<std::pair<tr_block_index_t, time_t>> request_timeouts_;
     tr_incoming incoming_ = {};
@@ -749,7 +752,7 @@ private:
     static auto constexpr SendPexInterval = 90s;

     // how many seconds we expect the next piece block to arrive
-    static auto constexpr RequestTimeoutSecs = time_t{ 15 };
+    static auto constexpr RequestTimeoutSecs = time_t{ 25 };
 };

 // ---
@@ -1456,6 +1459,7 @@ ReadResult tr_peerMsgsImpl::process_peer_message(uint8_t id, MessageReader& payl
     {
         publish(tr_peer_event::GotChoke());
         active_requests.set_has_none();
+        request_timeouts_.clear();
     }

     update_active(TR_PEER_TO_CLIENT);
@@ -1756,7 +1760,6 @@ int tr_peerMsgsImpl::client_got_block(std::unique_ptr<Cache::BlockData> block_da
     }

     active_requests.unset(block);
-    request_timeout_base_ = tr_time();
     publish(tr_peer_event::GotBlock(tor_.block_info(), block));

     return 0;
@@ -1917,20 +1920,37 @@ void tr_peerMsgsImpl::maybe_send_block_requests()
     }
 }

-void tr_peerMsgsImpl::check_request_timeout(time_t now)
+void tr_peerMsgsImpl::check_request_timeout(time_t const now)
 {
-    if (active_requests.has_none() || now - request_timeout_base_ <= RequestTimeoutSecs)
+    std::sort(std::begin(request_timeouts_), std::end(request_timeouts_));
+    for (auto it = std::begin(request_timeouts_); it != std::end(request_timeouts_);)
     {
-        return;
-    }
-
-    // If we didn't receive any piece data from this peer for a while,
-    // cancel all active requests so that we will send a new batch.
-    // If the peer still doesn't send anything to us, then it will
-    // naturally get weeded out by the peer mgr.
-    for (size_t block = 0; block < std::size(active_requests); ++block)
-    {
-        maybe_cancel_block_request(block);
+        auto const [block, timeout] = *it;
+
+        if (auto const next = std::next(it); next != std::end(request_timeouts_) && block == next->first)
+        {
+            // A new request superseded this request, discard
+            it = request_timeouts_.erase(it);
+            continue;
+        }
+
+        if (!active_requests.test(block))
+        {
+            // request no longer active, discard
+            it = request_timeouts_.erase(it);
+            continue;
+        }
+
+        if (now >= timeout)
+        {
+            // request timed out, discard
+            cancel_block_request(block);
+            it = request_timeouts_.erase(it);
+            continue;
+        }
+
+        ++it;
     }
 }