feat: split connectable and non-connectable pool (#5801)

This commit is contained in:
tearfur
2023-08-01 22:56:26 +08:00
committed by GitHub
parent 212bf69bb3
commit a2849219f7
12 changed files with 527 additions and 242 deletions

View File

@@ -69,7 +69,7 @@ namespace global_source_ip_helpers
auto const save = errno;
auto const [dst_ss, dst_sslen] = dst_addr.to_sockaddr(dst_port);
auto const [bind_ss, bind_sslen] = bind_addr.to_sockaddr(tr_port::fromHost(0));
auto const [bind_ss, bind_sslen] = bind_addr.to_sockaddr(tr_port{});
if (auto const sock = socket(dst_ss.ss_family, SOCK_DGRAM, 0); sock != TR_BAD_SOCKET)
{
if (bind(sock, reinterpret_cast<sockaddr const*>(&bind_ss), bind_sslen) == 0)

View File

@@ -389,12 +389,12 @@ public:
std::size_t operator()(tr_socket_address const& socket_address) const noexcept
{
auto const& [addr, port] = socket_address;
return hash_combine(ip_hash(addr), port_hash(port));
return hash_combine(ip_hash(addr), PortHasher(port.host()));
}
private:
// https://stackoverflow.com/a/27952689/11390656
[[nodiscard]] static constexpr std::size_t hash_combine(std::size_t const& a, std::size_t const& b)
[[nodiscard]] static constexpr std::size_t hash_combine(std::size_t const a, std::size_t const b)
{
return a ^ (b + 0x9e3779b9U + (a << 6U) + (a >> 2U));
}
@@ -414,11 +414,6 @@ private:
}
}
[[nodiscard]] static std::size_t port_hash(tr_port const& port) noexcept
{
return PortHasher(port.host());
}
constexpr static std::hash<uint32_t> IPv4Hasher{};
constexpr static std::hash<std::string_view> IPv6Hasher{};
constexpr static std::hash<uint16_t> PortHasher{};

View File

@@ -14,12 +14,12 @@
#include <cstdint> // uint8_t, uint32_t, uint64_t
#include <string>
#include "transmission.h"
#include "libtransmission/transmission.h"
#include "bitfield.h"
#include "block-info.h"
#include "history.h"
#include "net.h" // tr_port
#include "libtransmission/bitfield.h"
#include "libtransmission/block-info.h"
#include "libtransmission/history.h"
#include "libtransmission/net.h" // tr_port
/**
* @addtogroup peers Peers
@@ -28,7 +28,6 @@
class tr_peer;
class tr_swarm;
class tr_peer_info;
struct tr_bandwidth;
// --- Peer Publish / Subscribe
@@ -38,19 +37,20 @@ class tr_peer_event
public:
enum class Type
{
ClientGotBlock,
// Unless otherwise specified, all events are for BT peers only
ClientGotBlock, // applies to webseed too
ClientGotChoke,
ClientGotPieceData,
ClientGotPieceData, // applies to webseed too
ClientGotAllowedFast,
ClientGotSuggest,
ClientGotPort,
ClientGotRej,
ClientGotRej, // applies to webseed too
ClientGotBitfield,
ClientGotHave,
ClientGotHaveAll,
ClientGotHaveNone,
ClientSentPieceData,
Error
Error // generic
};
Type type = Type::Error;
@@ -170,7 +170,7 @@ public:
}
};
using tr_peer_callback = void (*)(tr_peer* peer, tr_peer_event const& event, void* client_data);
using tr_peer_callback_generic = void (*)(tr_peer* peer, tr_peer_event const& event, void* client_data);
/**
* State information about a connected peer.
@@ -181,7 +181,7 @@ using tr_peer_callback = void (*)(tr_peer* peer, tr_peer_event const& event, voi
class tr_peer
{
public:
tr_peer(tr_torrent const* tor, tr_peer_info* const peer_info = nullptr);
tr_peer(tr_torrent const* tor);
virtual ~tr_peer();
virtual bool isTransferringPieces(uint64_t now, tr_direction dir, tr_bytes_per_second_t* setme_bytes_per_second) const = 0;
@@ -241,9 +241,6 @@ public:
/// The following fields are only to be used in peer-mgr.cc.
/// TODO(ckerr): refactor them out of `tr_peer`
// hook to private peer-mgr information
tr_peer_info* const peer_info;
// whether or not this peer sent us any given block
tr_bitfield blame;

View File

@@ -265,7 +265,7 @@ public:
[[nodiscard]] constexpr auto const& socket_address() const noexcept
{
return socket_.socketAddress();
return socket_.socket_address();
}
[[nodiscard]] auto display_name() const

View File

@@ -12,7 +12,6 @@
#include <cstdint>
#include <ctime> // time_t
#include <iterator> // std::back_inserter
#include <map>
#include <optional>
#include <tuple> // std::tie
#include <unordered_map>
@@ -58,6 +57,9 @@ static auto constexpr CancelHistorySec = int{ 60 };
// ---
namespace
{
class HandshakeMediator final : public tr_handshake::Mediator
{
private:
@@ -120,6 +122,10 @@ private:
tr_session& session_;
};
using Handshakes = std::unordered_map<tr_socket_address, tr_handshake>;
} // anonymous namespace
bool tr_peer_info::is_blocklisted(tr_session const* session) const
{
if (blocklisted_)
@@ -127,20 +133,55 @@ bool tr_peer_info::is_blocklisted(tr_session const* session) const
return *blocklisted_;
}
auto const value = session->addressIsBlocked(addr());
auto const value = session->addressIsBlocked(listen_address());
blocklisted_ = value;
return value;
}
using Handshakes = std::unordered_map<tr_socket_address, tr_handshake>;
#define tr_logAddDebugSwarm(swarm, msg) tr_logAddDebugTor((swarm)->tor, msg)
#define tr_logAddTraceSwarm(swarm, msg) tr_logAddTraceTor((swarm)->tor, msg)
namespace
{
/* better goes first */
constexpr struct
{
[[nodiscard]] constexpr static int compare(tr_peer_info const& a, tr_peer_info const& b) noexcept // <=>
{
if (auto const val = a.compare_by_piece_data_time(b); val != 0)
{
return -val;
}
if (auto const val = tr_compare_3way(a.from_best(), b.from_best()); val != 0)
{
return val;
}
return a.compare_by_failure_count(b);
}
[[nodiscard]] constexpr bool operator()(tr_peer_info const& a, tr_peer_info const& b) const noexcept
{
return compare(a, b) < 0;
}
[[nodiscard]] constexpr bool operator()(tr_peer_info const* a, tr_peer_info const* b) const noexcept
{
return compare(*a, *b) < 0;
}
} CompareAtomsByUsefulness{};
} // namespace
/** @brief Opaque, per-torrent data structure for peer connection information */
class tr_swarm
{
public:
using Peers = std::vector<tr_peerMsgs*>;
using Pool = std::unordered_map<tr_socket_address, tr_peer_info>;
[[nodiscard]] auto unique_lock() const
{
return tor->unique_lock();
@@ -222,37 +263,48 @@ public:
auto const lock = unique_lock();
is_running = false;
removeAllPeers();
remove_all_peers();
outgoing_handshakes.clear();
}
void removePeer(tr_peer* peer)
void remove_peer(tr_peerMsgs* peer)
{
auto const lock = unique_lock();
auto* const peer_info = peer->peer_info;
auto const socket_address = peer->socket_address();
auto const was_incoming = peer->is_incoming_connection();
TR_ASSERT(peer_info != nullptr);
if (auto iter = std::find(std::begin(peers), std::end(peers), peer); iter != std::end(peers))
{
peers.erase(iter);
}
--stats.peer_count;
--stats.peer_from_count[peer_info->from_first()];
TR_ASSERT(stats.peer_count == peerCount());
if (auto iter = std::find(std::begin(peers), std::end(peers), peer); iter != std::end(peers))
{
peers.erase(iter);
TR_ASSERT(stats.peer_count == peerCount());
}
delete peer;
if (was_incoming)
{
[[maybe_unused]] auto const port_empty = std::empty(peer_info->listen_port());
if (incoming_pool.erase(socket_address) != 0U)
{
TR_ASSERT(port_empty);
}
}
graveyard_pool.erase(socket_address);
}
void removeAllPeers()
void remove_all_peers()
{
auto tmp = peers;
auto tmp = Peers{};
std::swap(tmp, peers);
for (auto* peer : tmp)
{
removePeer(peer);
remove_peer(peer);
}
TR_ASSERT(stats.peer_count == 0);
@@ -270,7 +322,7 @@ public:
return is_endgame_;
}
void addStrike(tr_peer* peer) const
void addStrike(tr_peerMsgs* peer) const
{
tr_logAddTraceSwarm(
this,
@@ -292,20 +344,20 @@ public:
webseeds.reserve(n);
for (size_t i = 0; i < n; ++i)
{
webseeds.emplace_back(tr_webseedNew(tor, tor->webseed(i), &tr_swarm::peerCallbackFunc, this));
webseeds.emplace_back(tr_webseedNew(tor, tor->webseed(i), &tr_swarm::peer_callback_webseed, this));
}
webseeds.shrink_to_fit();
stats.active_webseed_count = 0;
}
[[nodiscard]] TR_CONSTEXPR20 auto isAllSeeds() const noexcept
[[nodiscard]] TR_CONSTEXPR20 auto is_all_seeds() const noexcept
{
if (!pool_is_all_seeds_)
{
pool_is_all_seeds_ = std::all_of(
std::begin(pool),
std::end(pool),
std::begin(connectable_pool),
std::end(connectable_pool),
[](auto const& key_val) { return key_val.second.is_seed(); });
}
@@ -314,17 +366,22 @@ public:
[[nodiscard]] tr_peer_info* get_existing_peer_info(tr_socket_address const& socket_address) noexcept
{
auto&& it = pool.find(socket_address);
return it != pool.end() ? &it->second : nullptr;
auto&& it = connectable_pool.find(socket_address);
return it != connectable_pool.end() ? &it->second : nullptr;
}
tr_peer_info& ensure_info_exists(tr_socket_address const& socket_address, uint8_t const flags, tr_peer_from const from)
tr_peer_info& ensure_info_exists(
tr_socket_address const& socket_address,
uint8_t const flags,
tr_peer_from const from,
bool is_connectable)
{
TR_ASSERT(socket_address.is_valid());
TR_ASSERT(from < TR_PEER_FROM__MAX);
auto&& [info_it, is_new] = pool.try_emplace(socket_address, socket_address, flags, from);
auto& peer_info = info_it->second;
auto&& [it, is_new] = is_connectable ? connectable_pool.try_emplace(socket_address, socket_address, flags, from) :
incoming_pool.try_emplace(socket_address, socket_address.address(), flags, from);
auto& peer_info = it->second;
if (!is_new)
{
peer_info.found_at(from);
@@ -343,12 +400,31 @@ public:
mark_all_seeds_flag_dirty();
}
static void peerCallbackFunc(tr_peer* peer, tr_peer_event const& event, void* vs)
static void peer_callback_webseed(tr_peer* const peer, tr_peer_event const& event, void* const vs)
{
TR_ASSERT(peer != nullptr);
auto* s = static_cast<tr_swarm*>(vs);
auto const lock = s->unique_lock();
switch (event.type)
{
case tr_peer_event::Type::ClientGotPieceData:
on_client_got_piece_data(s->tor, event.length, tr_time());
break;
default:
peer_callback_common(peer, event, s);
break;
}
}
static void peer_callback_bt(tr_peerMsgs* const msgs, tr_peer_event const& event, void* const vs)
{
TR_ASSERT(msgs != nullptr);
auto* s = static_cast<tr_swarm*>(vs);
TR_ASSERT(msgs->swarm == s);
auto const lock = s->unique_lock();
switch (event.type)
{
case tr_peer_event::Type::ClientSentPieceData:
@@ -362,32 +438,20 @@ public:
tor->set_dirty();
tor->session->add_uploaded(event.length);
// this should always be a tr_peerMsgs since it's
// impossible to upload piece data to a webseed...
TR_ASSERT(peer->peer_info != nullptr);
peer->peer_info->set_latest_piece_data_time(now);
break;
msgs->peer_info->set_latest_piece_data_time(now);
}
break;
case tr_peer_event::Type::ClientGotPieceData:
{
auto const now = tr_time();
auto* const tor = s->tor;
tor->downloadedCur += event.length;
tor->set_date_active(now);
tor->set_dirty();
tor->session->add_downloaded(event.length);
if (auto* const info = peer->peer_info; info != nullptr)
{
info->set_latest_piece_data_time(now);
}
break;
on_client_got_piece_data(s->tor, event.length, now);
msgs->peer_info->set_latest_piece_data_time(now);
}
break;
case tr_peer_event::Type::ClientGotHave:
case tr_peer_event::Type::ClientGotHaveAll:
case tr_peer_event::Type::ClientGotHaveNone:
@@ -396,18 +460,24 @@ public:
/* noop */
break;
case tr_peer_event::Type::ClientGotRej:
s->active_requests.remove(s->tor->piece_loc(event.pieceIndex, event.offset).block, peer);
break;
case tr_peer_event::Type::ClientGotChoke:
s->active_requests.remove(peer);
s->active_requests.remove(msgs);
break;
case tr_peer_event::Type::ClientGotPort:
if (auto* const info = peer->peer_info; info != nullptr)
if (std::empty(event.port))
{
info->port() = event.port;
// Do nothing
}
// If we don't know the listening port of this peer (i.e. incoming connection and first time ClientGotPort)
else if (auto const& info = *msgs->peer_info; std::empty(info.listen_port()))
{
s->on_got_port(msgs, event, false);
}
// If we got a new listening port from a known connectable peer
else if (info.listen_port() != event.port)
{
s->on_got_port(msgs, event, true);
}
break;
@@ -417,32 +487,8 @@ public:
// not currently supported
break;
case tr_peer_event::Type::ClientGotBlock:
{
auto* const tor = s->tor;
auto const loc = tor->piece_loc(event.pieceIndex, event.offset);
s->cancelAllRequestsForBlock(loc.block, peer);
peer->blocks_sent_to_client.add(tr_time(), 1);
tr_torrentGotBlock(tor, loc.block);
break;
}
case tr_peer_event::Type::Error:
if (event.err == ERANGE || event.err == EMSGSIZE || event.err == ENOTCONN)
{
/* some protocol error from the peer */
peer->do_purge = true;
tr_logAddDebugSwarm(
s,
fmt::format(
"setting {} do_purge flag because we got an ERANGE, EMSGSIZE, or ENOTCONN error",
peer->display_name()));
}
else
{
tr_logAddDebugSwarm(s, fmt::format("unhandled error: {}", tr_strerror(event.err)));
}
default:
peer_callback_common(msgs, event, s);
break;
}
}
@@ -465,11 +511,13 @@ public:
std::vector<std::unique_ptr<tr_peer>> webseeds;
// depends-on: active_requests
std::vector<tr_peerMsgs*> peers;
Peers peers;
// tr_peers hold pointers to the items in this container,
// tr_peerMsgs hold pointers to the items in these containers,
// therefore references to elements within cannot invalidate
std::unordered_map<tr_socket_address, tr_peer_info> pool;
Pool incoming_pool;
Pool connectable_pool;
Pool graveyard_pool;
tr_peerMsgs* optimistic = nullptr; /* the optimistic peer, or nullptr if none */
@@ -508,7 +556,7 @@ private:
{
auto const lock = tor->unique_lock();
for (auto& [socket_address, atom] : pool)
for (auto& [socket_address, atom] : connectable_pool)
{
mark_peer_as_seed(atom);
}
@@ -580,6 +628,146 @@ private:
void on_torrent_started();
void on_torrent_stopped();
// ---
static void peer_callback_common(tr_peer* const peer, tr_peer_event const& event, tr_swarm* const s)
{
switch (event.type)
{
case tr_peer_event::Type::ClientGotRej:
s->active_requests.remove(s->tor->piece_loc(event.pieceIndex, event.offset).block, peer);
break;
case tr_peer_event::Type::ClientGotBlock:
{
auto* const tor = s->tor;
auto const loc = tor->piece_loc(event.pieceIndex, event.offset);
s->cancelAllRequestsForBlock(loc.block, peer);
peer->blocks_sent_to_client.add(tr_time(), 1);
tr_torrentGotBlock(tor, loc.block);
}
break;
case tr_peer_event::Type::Error:
if (event.err == ERANGE || event.err == EMSGSIZE || event.err == ENOTCONN)
{
/* some protocol error from the peer */
peer->do_purge = true;
tr_logAddDebugSwarm(
s,
fmt::format(
"setting {} do_purge flag because we got an ERANGE, EMSGSIZE, or ENOTCONN error",
peer->display_name()));
}
else
{
tr_logAddDebugSwarm(s, fmt::format("unhandled error: {}", tr_strerror(event.err)));
}
break;
default:
TR_ASSERT_MSG(false, "This should be unreachable code");
break;
}
}
static void on_client_got_piece_data(tr_torrent* const tor, uint32_t const sent_length, time_t const now)
{
tor->downloadedCur += sent_length;
tor->set_date_active(now);
tor->set_dirty();
tor->session->add_downloaded(sent_length);
}
void on_got_port(tr_peerMsgs* const msgs, tr_peer_event const& event, bool was_connectable)
{
auto& info_this = *msgs->peer_info;
TR_ASSERT(info_this.is_connected());
TR_ASSERT(was_connectable != std::empty(info_this.listen_port()));
// If we already know about this peer, merge the info objects without invalidating references
if (auto it_that = connectable_pool.find({ info_this.listen_address(), event.port });
it_that != std::end(connectable_pool))
{
auto& info_that = it_that->second;
TR_ASSERT(it_that->first == info_that.listen_socket_address());
TR_ASSERT(it_that->first.address() == info_this.listen_address());
TR_ASSERT(it_that->first.port() != info_this.listen_port());
// If there is an existing connection to this peer, keep the better one
if (info_that.is_connected() && on_got_port_duplicate_connection(msgs, it_that, was_connectable))
{
return;
}
info_this.merge(info_that);
connectable_pool.erase(info_that.listen_socket_address());
}
else if (!was_connectable)
{
info_this.set_connectable();
}
auto nh = was_connectable ? connectable_pool.extract(info_this.listen_socket_address()) :
incoming_pool.extract(msgs->socket_address());
TR_ASSERT(!std::empty(nh));
if (was_connectable)
{
TR_ASSERT(nh.key() == nh.mapped().listen_socket_address());
}
else
{
TR_ASSERT(nh.key().address() == nh.mapped().listen_address());
}
nh.key().port_ = event.port;
[[maybe_unused]] auto const inserted = connectable_pool.insert(std::move(nh)).inserted;
TR_ASSERT(inserted);
info_this.set_listen_port(event.port);
mark_all_seeds_flag_dirty();
}
bool on_got_port_duplicate_connection(tr_peerMsgs* const msgs, Pool::iterator& it_that, bool was_connectable)
{
auto& info_this = *msgs->peer_info;
auto& info_that = it_that->second;
TR_ASSERT(info_that.is_connected());
if (CompareAtomsByUsefulness(info_this, info_that))
{
auto it = std::find_if(
std::begin(peers),
std::end(peers),
[&info_that](tr_peerMsgs const* const peer) { return peer->peer_info == &info_that; });
TR_ASSERT(it != std::end(peers));
(*it)->do_purge = true;
if (was_connectable)
{
// Note that it_that is invalid after this point
graveyard_pool.insert(connectable_pool.extract(it_that));
}
return false;
}
info_that.merge(info_this);
msgs->do_purge = true;
if (was_connectable)
{
graveyard_pool.insert(connectable_pool.extract(info_this.listen_socket_address()));
}
mark_all_seeds_flag_dirty();
return true;
}
// ---
// number of bad pieces a peer is allowed to send before we ban them
static auto constexpr MaxBadPiecesPerPeer = int{ 5 };
@@ -685,9 +873,12 @@ private:
since the blocklist has changed, erase that cached value */
for (auto* const tor : session->torrents())
{
for (auto& [socket_address, atom] : tor->swarm->pool)
for (auto& pool : { std::ref(tor->swarm->connectable_pool), std::ref(tor->swarm->incoming_pool) })
{
atom.set_blocklisted_dirty();
for (auto& [socket_address, atom] : pool.get())
{
atom.set_blocklisted_dirty();
}
}
}
}
@@ -703,16 +894,11 @@ private:
// --- tr_peer virtual functions
tr_peer::tr_peer(tr_torrent const* tor, tr_peer_info* peer_info_in)
tr_peer::tr_peer(tr_torrent const* tor)
: session{ tor->session }
, swarm{ tor->swarm }
, peer_info{ peer_info_in }
, blame{ tor->block_count() }
{
if (auto* const info = peer_info; info != nullptr)
{
info->set_connected(tr_time());
}
}
tr_peer::~tr_peer()
@@ -721,11 +907,6 @@ tr_peer::~tr_peer()
{
swarm->active_requests.remove(this);
}
if (auto* const info = peer_info; info != nullptr)
{
info->set_connected(tr_time(), false);
}
}
// ---
@@ -879,7 +1060,7 @@ void create_bit_torrent_peer(tr_torrent* tor, std::shared_ptr<tr_peerIo> io, tr_
tr_swarm* swarm = tor->swarm;
auto* peer = tr_peerMsgsNew(tor, peer_info, std::move(io), client, &tr_swarm::peerCallbackFunc, swarm);
auto* peer = tr_peerMsgsNew(tor, peer_info, std::move(io), client, &tr_swarm::peer_callback_bt, swarm);
swarm->peers.push_back(peer);
@@ -896,7 +1077,6 @@ void create_bit_torrent_peer(tr_torrent* tor, std::shared_ptr<tr_peerIo> io, tr_
TR_ASSERT(result.io != nullptr);
bool const ok = result.is_connected;
bool success = false;
auto* const s = manager->get_existing_swarm(result.io->torrent_hash());
@@ -936,7 +1116,9 @@ void create_bit_torrent_peer(tr_torrent* tor, std::shared_ptr<tr_peerIo> io, tr_
}
else /* looking good */
{
auto& info = s->ensure_info_exists(socket_address, 0, TR_PEER_FROM_INCOMING);
// If this is an outgoing connection, then we are sure we already have the peer info object
auto& info = result.io->is_incoming() ? s->ensure_info_exists(socket_address, 0U, TR_PEER_FROM_INCOMING, false) :
*s->get_existing_peer_info(socket_address);
if (!result.io->is_incoming())
{
@@ -953,7 +1135,7 @@ void create_bit_torrent_peer(tr_torrent* tor, std::shared_ptr<tr_peerIo> io, tr_
{
tr_logAddTraceSwarm(s, fmt::format("banned peer {} tried to reconnect", info.display_name()));
}
else if (result.io->is_incoming() && s->peerCount() >= s->tor->peer_limit())
else if (s->peerCount() >= s->tor->peer_limit())
{
// too many peers already
}
@@ -974,11 +1156,11 @@ void create_bit_torrent_peer(tr_torrent* tor, std::shared_ptr<tr_peerIo> io, tr_
result.io->set_bandwidth(&s->tor->bandwidth_);
create_bit_torrent_peer(s->tor, result.io, &info, client);
success = true;
return true;
}
}
return success;
return false;
}
} // namespace handshake_helpers
} // namespace
@@ -997,13 +1179,13 @@ void tr_peerMgrAddIncoming(tr_peerMgr* manager, tr_peer_socket&& socket)
tr_logAddTrace(fmt::format("Banned IP address '{}' tried to connect to us", socket.display_name()));
socket.close();
}
else if (manager->incoming_handshakes.count(socket.socketAddress()) != 0U)
else if (manager->incoming_handshakes.count(socket.socket_address()) != 0U)
{
socket.close();
}
else /* we don't have a connection to them yet... */
{
auto socket_address = socket.socketAddress();
auto socket_address = socket.socket_address();
manager->incoming_handshakes.try_emplace(
socket_address,
&manager->handshake_mediator_,
@@ -1022,9 +1204,12 @@ size_t tr_peerMgrAddPex(tr_torrent* tor, tr_peer_from from, tr_pex const* pex, s
for (tr_pex const* const end = pex + n_pex; pex != end; ++pex)
{
if (tr_isPex(pex) && /* safeguard against corrupt data */
!s->manager->session->addressIsBlocked(pex->addr) && pex->is_valid_for_peers())
!s->manager->session->addressIsBlocked(pex->addr) && pex->is_valid_for_peers() && from != TR_PEER_FROM_INCOMING &&
(from != TR_PEER_FROM_PEX || (pex->flags & ADDED_F_CONNECTABLE) != 0))
{
s->ensure_info_exists({ pex->addr, pex->port }, pex->flags, from);
// we store this peer since it is supposedly connectable (socket address should be the peer's listening address)
// don't care about non-connectable peers that we are not connected to
s->ensure_info_exists({ pex->addr, pex->port }, pex->flags, from, true);
++n_used;
}
}
@@ -1087,35 +1272,6 @@ namespace
namespace get_peers_helpers
{
/* better goes first */
constexpr struct
{
[[nodiscard]] constexpr static int compare(tr_peer_info const& a, tr_peer_info const& b) noexcept // <=>
{
if (auto const val = a.compare_by_piece_data_time(b); val != 0)
{
return -val;
}
if (auto const val = tr_compare_3way(a.from_best(), b.from_best()); val != 0)
{
return val;
}
return a.compare_by_failure_count(b);
}
[[nodiscard]] constexpr bool operator()(tr_peer_info const& a, tr_peer_info const& b) const noexcept
{
return compare(a, b) < 0;
}
[[nodiscard]] constexpr bool operator()(tr_peer_info const* a, tr_peer_info const* b) const noexcept
{
return compare(*a, *b) < 0;
}
} CompareAtomsByUsefulness{};
[[nodiscard]] bool is_peer_interesting(tr_torrent const* tor, tr_peer_info const& info)
{
if (tor->is_done() && info.is_seed())
@@ -1171,7 +1327,7 @@ std::vector<tr_pex> tr_peerMgrGetPeers(tr_torrent const* tor, uint8_t address_ty
}
else /* TR_PEERS_INTERESTING */
{
for (auto const& [socket_address, peer_info] : s->pool)
for (auto const& [socket_address, peer_info] : s->connectable_pool)
{
if (is_peer_interesting(tor, peer_info))
{
@@ -1191,7 +1347,7 @@ std::vector<tr_pex> tr_peerMgrGetPeers(tr_torrent const* tor, uint8_t address_ty
for (auto const* const info : infos)
{
auto const& [addr, port] = info->socket_address();
auto const& [addr, port] = info->listen_socket_address();
if (addr.type == address_type)
{
@@ -1338,7 +1494,7 @@ namespace peer_stat_helpers
{
auto stats = tr_peer_stat{};
auto const [addr, port] = peer->socketAddress();
auto const [addr, port] = peer->socket_address();
addr.display_name(stats.addr, sizeof(stats.addr));
stats.client = peer->user_agent().c_str();
@@ -1788,16 +1944,15 @@ auto constexpr MaxUploadIdleSecs = time_t{ 60 * 5 };
return false;
}
void close_peer(tr_peer* peer)
void close_peer(tr_peerMsgs* peer)
{
TR_ASSERT(peer != nullptr);
tr_logAddTraceSwarm(peer->swarm, fmt::format("removing bad peer {}", peer->display_name()));
peer->swarm->removePeer(peer);
peer->swarm->remove_peer(peer);
}
constexpr struct
{
[[nodiscard]] constexpr static int compare(tr_peer const* a, tr_peer const* b) // <=>
[[nodiscard]] constexpr static int compare(tr_peerMsgs const* a, tr_peerMsgs const* b) // <=>
{
if (a->do_purge != b->do_purge)
{
@@ -1807,18 +1962,18 @@ constexpr struct
return -a->peer_info->compare_by_piece_data_time(*b->peer_info);
}
[[nodiscard]] constexpr bool operator()(tr_peer const* a, tr_peer const* b) const // less than
[[nodiscard]] constexpr bool operator()(tr_peerMsgs const* a, tr_peerMsgs const* b) const // less than
{
return compare(a, b) < 0;
}
} ComparePeerByMostActive{};
constexpr auto ComparePeerByLeastActive = [](tr_peer const* a, tr_peer const* b)
constexpr auto ComparePeerByLeastActive = [](tr_peerMsgs const* a, tr_peerMsgs const* b)
{
return ComparePeerByMostActive(b, a);
};
using bad_peers_t = small::vector<tr_peer*, 512U>;
using bad_peers_t = small::vector<tr_peerMsgs*, 512U>;
bad_peers_t& get_peers_to_close(tr_swarm const* const swarm, time_t const now_sec, bad_peers_t& bad_peers_buf)
{
@@ -1842,6 +1997,7 @@ void close_bad_peers(tr_swarm* s, time_t const now_sec, bad_peers_t& bad_peers_b
{
for (auto* peer : get_peers_to_close(s, now_sec, bad_peers_buf))
{
tr_logAddTraceSwarm(peer->swarm, fmt::format("removing bad peer {}", peer->display_name()));
close_peer(peer);
}
}
@@ -1876,7 +2032,7 @@ void enforceSessionPeerLimit(tr_session* session)
}
// Make a list of all the peers.
auto peers = std::vector<tr_peer*>{};
auto peers = std::vector<tr_peerMsgs*>{};
peers.reserve(tr_peerMsgs::size());
for (auto const* const tor : session->torrents())
{
@@ -1908,7 +2064,7 @@ void tr_peerMgr::reconnectPulse()
if (!swarm->is_running)
{
swarm->removeAllPeers();
swarm->remove_all_peers();
}
else
{
@@ -2001,8 +2157,8 @@ void tr_peerMgr::bandwidthPulse()
bool tr_swarm::peer_is_in_use(tr_peer_info const& peer_info) const
{
return peer_info.is_connected() || outgoing_handshakes.count(peer_info.socket_address()) != 0U ||
manager->incoming_handshakes.count(peer_info.socket_address()) != 0U;
// TODO(tearfur): maybe it's possible to store each handshake in the peer_info objects
return peer_info.is_connected() || outgoing_handshakes.count(peer_info.listen_socket_address()) != 0U;
}
namespace
@@ -2010,7 +2166,7 @@ namespace
namespace connect_helpers
{
/* is this atom someone that we'd want to initiate a connection to? */
[[nodiscard]] bool isPeerCandidate(tr_torrent const* tor, tr_peer_info const& peer_info, time_t const now)
[[nodiscard]] bool is_peer_candidate(tr_torrent const* tor, tr_peer_info const& peer_info, time_t const now)
{
// have we already tried and failed to connect?
if (auto const conn = peer_info.is_connectable(); conn && !*conn)
@@ -2055,7 +2211,7 @@ struct peer_candidate
{
peer_candidate() = default;
peer_candidate(uint64_t score_in, tr_torrent* tor_in, tr_peer_info* peer_info_in)
peer_candidate(uint64_t score_in, tr_torrent const* const tor_in, tr_peer_info const* const peer_info_in)
: score{ score_in }
, tor{ tor_in }
, peer_info{ peer_info_in }
@@ -2063,8 +2219,8 @@ struct peer_candidate
}
uint64_t score;
tr_torrent* tor;
tr_peer_info* peer_info;
tr_torrent const* tor;
tr_peer_info const* peer_info;
};
[[nodiscard]] bool torrentWasRecentlyStarted(tr_torrent const* tor)
@@ -2149,7 +2305,7 @@ struct peer_candidate
}
auto candidates = std::vector<peer_candidate>{};
candidates.reserve(tr_peer_info::known_peer_count());
candidates.reserve(tr_peer_info::known_connectable_count());
/* populate the candidate array */
auto salter = tr_salt_shaker{};
@@ -2165,7 +2321,7 @@ struct peer_candidate
/* if everyone in the swarm is seeds and pex is disabled because
* the torrent is private, then don't initiate connections */
bool const seeding = tor->is_done();
if (seeding && swarm->isAllSeeds() && tor->is_private())
if (seeding && swarm->is_all_seeds() && tor->is_private())
{
continue;
}
@@ -2182,9 +2338,9 @@ struct peer_candidate
continue;
}
for (auto& [socket_address, atom] : swarm->pool)
for (auto const& [socket_address, atom] : swarm->connectable_pool)
{
if (isPeerCandidate(tor, atom, now))
if (is_peer_candidate(tor, atom, now))
{
candidates.emplace_back(getPeerCandidateScore(tor, atom, salter()), tor, &atom);
}
@@ -2206,12 +2362,12 @@ struct peer_candidate
auto ret = tr_peerMgr::OutboundCandidates{};
for (auto it = std::crbegin(candidates), end = std::crend(candidates); it != end; ++it)
{
ret.emplace_back(it->tor->id(), it->peer_info->socket_address());
ret.emplace_back(it->tor->id(), it->peer_info->listen_socket_address());
}
return ret;
}
void initiateConnection(tr_peerMgr* mgr, tr_swarm* s, tr_peer_info& peer_info)
void initiate_connection(tr_peerMgr* mgr, tr_swarm* s, tr_peer_info& peer_info)
{
using namespace handshake_helpers;
@@ -2231,9 +2387,9 @@ void initiateConnection(tr_peerMgr* mgr, tr_swarm* s, tr_peer_info& peer_info)
auto peer_io = tr_peerIo::new_outgoing(
session,
&session->top_bandwidth_,
peer_info.socket_address(),
peer_info.listen_socket_address(),
s->tor->info_hash(),
s->tor->completeness == TR_SEED,
s->tor->is_seed(),
utp);
if (!peer_io)
@@ -2245,7 +2401,7 @@ void initiateConnection(tr_peerMgr* mgr, tr_swarm* s, tr_peer_info& peer_info)
else
{
s->outgoing_handshakes.try_emplace(
peer_info.socket_address(),
peer_info.listen_socket_address(),
&mgr->handshake_mediator_,
peer_io,
session->encryptionMode(),
@@ -2281,7 +2437,7 @@ void tr_peerMgr::make_new_peer_connections()
{
if (auto* const peer_info = tor->swarm->get_existing_peer_info(sock_addr); peer_info != nullptr)
{
initiateConnection(this, tor->swarm, *peer_info);
initiate_connection(this, tor->swarm, *peer_info);
}
}
}

View File

@@ -9,7 +9,6 @@
#error only libtransmission should #include this header.
#endif
#include <atomic>
#include <cstddef> // size_t
#include <cstdint> // uint8_t, uint64_t
#include <ctime>
@@ -60,12 +59,20 @@ class tr_peer_info
{
public:
tr_peer_info(tr_socket_address socket_address, uint8_t pex_flags, tr_peer_from from)
: socket_address_{ std::move(socket_address) }
: listen_socket_address_{ socket_address }
, from_first_{ from }
, from_best_{ from }
{
++n_known_peers;
TR_ASSERT(!std::empty(socket_address.port()));
++n_known_connectable_;
set_pex_flags(pex_flags);
}
tr_peer_info(tr_address address, uint8_t pex_flags, tr_peer_from from)
: listen_socket_address_{ address, tr_port{} }
, from_first_{ from }
, from_best_{ from }
{
set_pex_flags(pex_flags);
}
@@ -76,30 +83,51 @@ public:
~tr_peer_info()
{
[[maybe_unused]] auto const n_prev = n_known_peers--;
TR_ASSERT(n_prev > 0U);
if (!std::empty(listen_socket_address_.port()))
{
[[maybe_unused]] auto const n_prev = n_known_connectable_--;
TR_ASSERT(n_prev > 0U);
}
}
[[nodiscard]] static auto known_peer_count() noexcept
[[nodiscard]] static auto known_connectable_count() noexcept
{
return n_known_peers.load();
return n_known_connectable_;
}
// ---
[[nodiscard]] constexpr auto const& socket_address() const noexcept
[[nodiscard]] constexpr auto const& listen_socket_address() const noexcept
{
return socket_address_;
return listen_socket_address_;
}
[[nodiscard]] constexpr auto& port() noexcept
[[nodiscard]] constexpr auto const& listen_address() const noexcept
{
return socket_address_.port_;
return listen_socket_address_.address();
}
[[nodiscard]] constexpr auto listen_port() const noexcept
{
return listen_socket_address_.port();
}
void set_listen_port(tr_port port_in) noexcept
{
if (!std::empty(port_in))
{
auto& port = listen_socket_address_.port_;
if (std::empty(port)) // increment known connectable peers if we did not know the listening port of this peer before
{
++n_known_connectable_;
}
port = port_in;
}
}
[[nodiscard]] auto display_name() const
{
return socket_address_.display_name();
return listen_socket_address_.display_name();
}
// ---
@@ -263,9 +291,16 @@ public:
// ---
constexpr void set_pex_flags(uint8_t pex_flags) noexcept
constexpr void set_pex_flags(uint8_t pex_flags, bool merge_unsupported = false) noexcept
{
pex_flags_ = pex_flags;
if (merge_unsupported)
{
pex_flags_ |= pex_flags;
}
else
{
pex_flags_ = pex_flags;
}
if ((pex_flags & ADDED_F_CONNECTABLE) != 0U)
{
@@ -316,17 +351,100 @@ public:
return ret;
}
// ---
// merge two peer info objects that supposedly describes the same peer
void merge(tr_peer_info const& that) noexcept
{
TR_ASSERT(is_connectable_.value_or(true) || !is_connected());
TR_ASSERT(that.is_connectable_.value_or(true) || !that.is_connected());
connection_attempted_at_ = std::max(connection_attempted_at_, that.connection_attempted_at_);
connection_changed_at_ = std::max(connection_changed_at_, that.connection_changed_at_);
piece_data_at_ = std::max(piece_data_at_, that.piece_data_at_);
/* no need to merge blocklist since it gets updated elsewhere */
{
// This part is frankly convoluted and confusing, but the idea is:
// 1. If the two peer info objects agree that this peer is connectable/non-connectable,
// then the answer is straightforward: We keep the agreed value.
// 2. If the two peer info objects disagrees as to whether this peer is connectable,
// then we reset the flag to an empty value, so that we can try for ourselves when
// initiating outgoing connections.
// 3. If one object has knowledge and the other doesn't, then we take the word of the
// peer info object with knowledge with one exception:
// - If the object with knowledge says the peer is not connectable, but we are
// currently connected to the peer, then we give it the benefit of the doubt.
// The connectable flag will be reset to an empty value.
// 4. In case both objects have no knowledge about whether this peer is connectable,
// we shall not make any assumptions: We keep the flag empty.
//
// Truth table:
// +-----------------+---------------+----------------------+--------------------+---------+
// | is_connectable_ | is_connected_ | that.is_connectable_ | that.is_connected_ | Result |
// +=================+===============+======================+====================+=========+
// | T | T | T | T | T |
// | T | T | T | F | T |
// | T | T | F | F | ? |
// | T | T | ? | T | T |
// | T | T | ? | F | T |
// | T | F | T | T | T |
// | T | F | T | F | T |
// | T | F | F | F | ? |
// | T | F | ? | T | T |
// | T | F | ? | F | T |
// | F | F | T | T | ? |
// | F | F | T | F | ? |
// | F | F | F | F | F |
// | F | F | ? | T | ? |
// | F | F | ? | F | F |
// | ? | T | T | T | T |
// | ? | T | T | F | T |
// | ? | T | F | F | ? |
// | ? | T | ? | T | ? |
// | ? | T | ? | F | ? |
// | ? | F | T | T | T |
// | ? | F | T | F | T |
// | ? | F | F | F | F |
// | ? | F | ? | T | ? |
// | ? | F | ? | F | ? |
// | N/A | N/A | F | T | Invalid |
// | F | T | N/A | N/A | Invalid |
// +-----------------+---------------+----------------------+--------------------+---------+
auto const conn_this = is_connectable_ && *is_connectable_;
auto const conn_that = that.is_connectable_ && *that.is_connectable_;
if ((!is_connectable_ && !that.is_connectable_) ||
is_connectable_.value_or(conn_that || is_connected()) !=
that.is_connectable_.value_or(conn_this || that.is_connected()))
{
is_connectable_.reset();
}
else
{
set_connectable(conn_this || conn_that);
}
}
set_utp_supported(supports_utp() || that.supports_utp());
/* from_first_ should never be modified */
found_at(that.from_best());
/* num_consecutive_fails_ is already the latest */
set_pex_flags(that.pex_flags(), true);
if (that.is_banned())
{
ban();
}
/* is_connected_ should already be set */
set_seed(is_seed() || that.is_seed());
}
private:
[[nodiscard]] constexpr tr_address const& addr() const noexcept
{
return socket_address_.address();
}
[[nodiscard]] constexpr auto port() const noexcept
{
return socket_address_.port();
}
[[nodiscard]] constexpr time_t get_reconnect_interval_secs(time_t const now) const noexcept
{
// if we were recently connected to this peer and transferring piece
@@ -369,9 +487,10 @@ private:
// the minimum we'll wait before attempting to reconnect to a peer
static auto constexpr MinimumReconnectIntervalSecs = time_t{ 5U };
static auto inline n_known_peers = std::atomic<size_t>{};
static auto inline n_known_connectable_ = size_t{};
tr_socket_address socket_address_;
// if the port is 0, it SHOULD mean we don't know this peer's listen socket address
tr_socket_address listen_socket_address_;
time_t connection_attempted_at_ = {};
time_t connection_changed_at_ = {};

View File

@@ -46,7 +46,6 @@
#include "libtransmission/variant.h"
#include "libtransmission/version.h"
class tr_peer_info;
struct tr_error;
#ifndef EBADMSG
@@ -328,7 +327,7 @@ public:
tr_peer_info* const peer_info_in,
std::shared_ptr<tr_peerIo> io_in,
tr_interned_string client,
tr_peer_callback callback,
tr_peer_callback_bt callback,
void* callback_data)
: tr_peerMsgs{ torrent_in, peer_info_in, client, io_in->is_encrypted(), io_in->is_incoming(), io_in->is_utp() }
, torrent{ torrent_in }
@@ -375,9 +374,9 @@ public:
set_active(TR_UP, false);
set_active(TR_DOWN, false);
if (this->io)
if (io)
{
this->io->clear();
io->clear();
}
}
@@ -409,14 +408,14 @@ public:
}
}
[[nodiscard]] tr_socket_address socketAddress() const override
[[nodiscard]] tr_socket_address socket_address() const override
{
return io->socket_address();
}
[[nodiscard]] std::string display_name() const override
{
auto const [addr, port] = socketAddress();
auto const [addr, port] = socket_address();
return addr.display_name(port);
}
@@ -679,7 +678,7 @@ private:
friend void parseLtepHandshake(tr_peerMsgsImpl* msgs, MessageReader& payload);
friend void parseUtMetadata(tr_peerMsgsImpl* msgs, MessageReader& payload_in);
tr_peer_callback const callback_;
tr_peer_callback_bt const callback_;
void* const callback_data_;
// seconds between periodic sendPex() calls
@@ -937,7 +936,7 @@ void sendLtepHandshake(tr_peerMsgsImpl* msgs)
bool const allow_metadata_xfer = msgs->torrent->is_public();
/* decide if we want to advertise pex support */
bool const allow_pex = msgs->session->allows_pex() && msgs->torrent->allows_pex();
bool const allow_pex = msgs->torrent->allows_pex();
auto val = tr_variant{};
tr_variantInitDict(&val, 8);
@@ -1107,7 +1106,7 @@ void parseLtepHandshake(tr_peerMsgsImpl* msgs, MessageReader& payload)
}
/* get peer's listening port */
if (tr_variantDictFindInt(&val, TR_KEY_p, &i))
if (tr_variantDictFindInt(&val, TR_KEY_p, &i) && i > 0)
{
pex.port.setHost(i);
msgs->publish(tr_peer_event::GotPort(pex.port));
@@ -2122,8 +2121,27 @@ void tr_peerMsgsImpl::sendPex()
} // namespace
tr_peerMsgs::tr_peerMsgs(
tr_torrent const* tor,
tr_peer_info* peer_info_in,
tr_interned_string user_agent,
bool connection_is_encrypted,
bool connection_is_incoming,
bool connection_is_utp)
: tr_peer{ tor }
, peer_info{ peer_info_in }
, user_agent_{ user_agent }
, connection_is_encrypted_{ connection_is_encrypted }
, connection_is_incoming_{ connection_is_incoming }
, connection_is_utp_{ connection_is_utp }
{
peer_info->set_connected(tr_time());
++n_peers;
}
tr_peerMsgs::~tr_peerMsgs()
{
peer_info->set_connected(tr_time(), false);
[[maybe_unused]] auto const n_prev = n_peers--;
TR_ASSERT(n_prev > 0U);
}
@@ -2133,7 +2151,7 @@ tr_peerMsgs* tr_peerMsgsNew(
tr_peer_info* const peer_info,
std::shared_ptr<tr_peerIo> io,
tr_interned_string user_agent,
tr_peer_callback callback,
tr_peer_callback_bt callback,
void* callback_data)
{
return new tr_peerMsgsImpl(torrent, peer_info, std::move(io), user_agent, callback, callback_data);

View File

@@ -13,7 +13,6 @@
#include <atomic>
#include <cstddef> // for size_t
#include <memory>
#include <utility> // for std::pair<>
#include "libtransmission/transmission.h" // for tr_direction, tr_block_ind...
@@ -22,6 +21,7 @@
#include "libtransmission/peer-common.h" // for tr_peer
class tr_peerIo;
class tr_peerMsgs;
class tr_peer_info;
struct tr_torrent;
@@ -30,6 +30,8 @@ struct tr_torrent;
* @{
*/
using tr_peer_callback_bt = void (*)(tr_peerMsgs* peer, tr_peer_event const& event, void* client_data);
class tr_peerMsgs : public tr_peer
{
public:
@@ -39,17 +41,9 @@ public:
tr_interned_string user_agent,
bool connection_is_encrypted,
bool connection_is_incoming,
bool connection_is_utp)
: tr_peer{ tor, peer_info_in }
, user_agent_{ user_agent }
, connection_is_encrypted_{ connection_is_encrypted }
, connection_is_incoming_{ connection_is_incoming }
, connection_is_utp_{ connection_is_utp }
{
++n_peers;
}
bool connection_is_utp);
virtual ~tr_peerMsgs() override;
~tr_peerMsgs() override;
[[nodiscard]] static auto size() noexcept
{
@@ -101,7 +95,7 @@ public:
return is_active_[direction];
}
[[nodiscard]] virtual tr_socket_address socketAddress() const = 0;
[[nodiscard]] virtual tr_socket_address socket_address() const = 0;
virtual void cancel_block_request(tr_block_index_t block) = 0;
@@ -145,6 +139,10 @@ protected:
user_agent_ = val;
}
public:
// TODO(tearfur): change this to reference
tr_peer_info* const peer_info;
private:
static inline auto n_peers = std::atomic<size_t>{};
@@ -176,7 +174,7 @@ tr_peerMsgs* tr_peerMsgsNew(
tr_peer_info* peer_info,
std::shared_ptr<tr_peerIo> io,
tr_interned_string user_agent,
tr_peer_callback callback,
tr_peer_callback_bt callback,
void* callback_data);
/* @} */

View File

@@ -60,7 +60,7 @@ public:
size_t try_read(InBuf& buf, size_t max, bool buf_is_empty, tr_error** error) const;
size_t try_write(OutBuf& buf, size_t max, tr_error** error) const;
[[nodiscard]] constexpr auto const& socketAddress() const noexcept
[[nodiscard]] constexpr auto const& socket_address() const noexcept
{
return socket_address_;
}

View File

@@ -158,7 +158,7 @@ public:
return unix_addr_.to_string();
}
if (port.empty())
if (std::empty(port))
{
return { inet_addr_.display_name() };
}

View File

@@ -166,7 +166,7 @@ void onBufferGotData(evbuffer* /*buf*/, evbuffer_cb_info const* info, void* vtas
class tr_webseed final : public tr_peer
{
public:
tr_webseed(struct tr_torrent* tor, std::string_view url, tr_peer_callback callback_in, void* callback_data_in)
tr_webseed(struct tr_torrent* tor, std::string_view url, tr_peer_callback_webseed callback_in, void* callback_data_in)
: tr_peer{ tor }
, torrent_id{ tr_torrentId(tor) }
, base_url{ url }
@@ -314,7 +314,7 @@ public:
tr_torrent_id_t const torrent_id;
std::string const base_url;
tr_peer_callback const callback;
tr_peer_callback_webseed const callback;
void* const callback_data;
ConnectionLimiter connection_limiter;
@@ -538,7 +538,7 @@ void task_request_next_chunk(tr_webseed_task* task)
// ---
tr_peer* tr_webseedNew(tr_torrent* torrent, std::string_view url, tr_peer_callback callback, void* callback_data)
tr_peer* tr_webseedNew(tr_torrent* torrent, std::string_view url, tr_peer_callback_webseed callback, void* callback_data)
{
return new tr_webseed(torrent, url, callback, callback_data);
}

View File

@@ -15,6 +15,8 @@
#include "libtransmission/peer-common.h"
tr_peer* tr_webseedNew(struct tr_torrent* torrent, std::string_view, tr_peer_callback callback, void* callback_data);
using tr_peer_callback_webseed = tr_peer_callback_generic;
tr_peer* tr_webseedNew(struct tr_torrent* torrent, std::string_view, tr_peer_callback_webseed callback, void* callback_data);
tr_webseed_view tr_webseedView(tr_peer const* peer);