From 3b034945801d0cd6d8cf328f7e7502ff9e839c1e Mon Sep 17 00:00:00 2001 From: Charles Kerr Date: Thu, 22 Jun 2023 00:24:42 -0500 Subject: [PATCH] perf: use libsmall in libtransmission, pt 3 (#5653) * refactor: use BufferReader, BufferWriter in peer-socket * feat: expose GrowthFactor in tr-buffer * perf: choose better defaults for the peer message buffers * chore: sync tests * refactor: use small::map in ActiveRequests::Impl --- libtransmission/peer-io.cc | 2 +- libtransmission/peer-io.h | 14 ++++- libtransmission/peer-mgr-active-requests.cc | 61 ++++----------------- libtransmission/peer-mgr-wishlist.h | 2 + libtransmission/peer-socket.cc | 6 +- libtransmission/peer-socket.h | 9 +-- libtransmission/tr-buffer.h | 6 +- tests/libtransmission/announcer-udp-test.cc | 36 ++++++------ tests/libtransmission/buffer-test.cc | 8 +-- 9 files changed, 58 insertions(+), 86 deletions(-) diff --git a/libtransmission/peer-io.cc b/libtransmission/peer-io.cc index f5cd7e0b6..c4c3d10f9 100644 --- a/libtransmission/peer-io.cc +++ b/libtransmission/peer-io.cc @@ -419,7 +419,7 @@ size_t tr_peerIo::try_read(size_t max) auto& buf = inbuf_; tr_error* error = nullptr; - auto const n_read = socket_.try_read(buf, max, &error); + auto const n_read = socket_.try_read(buf, max, std::empty(buf), &error); set_enabled(Dir, error == nullptr || canRetryFromError(error->code)); if (error != nullptr) diff --git a/libtransmission/peer-io.h b/libtransmission/peer-io.h index 2cc662adf..634a4cc0d 100644 --- a/libtransmission/peer-io.h +++ b/libtransmission/peer-io.h @@ -20,6 +20,7 @@ #include "transmission.h" #include "bandwidth.h" +#include "block-info.h" #include "net.h" // tr_address #include "peer-mse.h" #include "peer-socket.h" @@ -291,8 +292,17 @@ public: static void utp_init(struct_utp_context* ctx); private: + // size of the buffer we use to hold incoming & outgoing messages + static constexpr auto InitialBufferSize = tr_block_info::BlockSize + sizeof(uint8_t) + sizeof(uint32_t) + sizeof(uint32_t); + + // our target socket receive buffer size static constexpr auto RcvBuf = size_t{ 256 * 1024 }; + // start with a buffer size large enough to hold a BT block message, + // but avoid repeated reallocs by scaling up very quickly (5X) when + // we need more capacity. + using Buffer = libtransmission::SmallBuffer>; + friend class libtransmission::test::HandshakeTest; [[nodiscard]] constexpr auto is_seed() const noexcept @@ -344,8 +354,8 @@ private: tr_sha1_digest_t info_hash_; - libtransmission::Buffer inbuf_; - libtransmission::Buffer outbuf_; + Buffer inbuf_; + Buffer outbuf_; tr_session* const session_; diff --git a/libtransmission/peer-mgr-active-requests.cc b/libtransmission/peer-mgr-active-requests.cc index c6535079e..f19a6feca 100644 --- a/libtransmission/peer-mgr-active-requests.cc +++ b/libtransmission/peer-mgr-active-requests.cc @@ -11,53 +11,16 @@ #include #include +#include + #define LIBTRANSMISSION_PEER_MODULE #include "libtransmission/transmission.h" #include "libtransmission/peer-mgr-active-requests.h" +#include "libtransmission/peer-mgr-wishlist.h" #include "libtransmission/tr-assert.h" -namespace -{ - -struct peer_at -{ - tr_peer* peer; - time_t when; - - peer_at(tr_peer* p, time_t w) - : peer{ p } - , when{ w } - { - } - - [[nodiscard]] int compare(peer_at const& that) const // <=> - { - if (peer != that.peer) - { - return peer < that.peer ? -1 : 1; - } - - return 0; - } - - bool operator==(peer_at const& that) const - { - return compare(that) == 0; - } -}; - -struct PeerAtHash -{ - std::size_t operator()(peer_at const& pa) const noexcept - { - return std::hash{}(pa.peer); - } -}; - -} // namespace - class ActiveRequests::Impl { public: @@ -97,7 +60,7 @@ public: std::unordered_map count_; - std::unordered_map> blocks_; + std::unordered_map> blocks_; private: size_t size_ = 0; @@ -126,8 +89,7 @@ bool ActiveRequests::add(tr_block_index_t block, tr_peer* peer, time_t when) bool ActiveRequests::remove(tr_block_index_t block, tr_peer const* peer) { auto const it = impl_->blocks_.find(block); - auto const key = peer_at{ const_cast(peer), 0 }; - auto const removed = it != std::end(impl_->blocks_) && it->second.erase(key) != 0; + auto const removed = it != std::end(impl_->blocks_) && it->second.erase(peer) != 0; if (removed) { @@ -148,10 +110,9 @@ std::vector ActiveRequests::remove(tr_peer const* peer) auto removed = std::vector{}; removed.reserve(impl_->blocks_.size()); - auto const key = peer_at{ const_cast(peer), 0 }; for (auto const& [block, peers_at] : impl_->blocks_) { - if (peers_at.count(key) != 0U) + if (peers_at.count(peer) != 0U) { removed.push_back(block); } @@ -178,7 +139,7 @@ std::vector ActiveRequests::remove(tr_block_index_t block) std::begin(it->second), std::end(it->second), std::begin(removed), - [](auto const& sent) { return sent.peer; }); + [](auto const& iter) { return const_cast(iter.first); }); impl_->blocks_.erase(block); } @@ -194,7 +155,7 @@ std::vector ActiveRequests::remove(tr_block_index_t block) bool ActiveRequests::has(tr_block_index_t block, tr_peer const* peer) const { auto const it = impl_->blocks_.find(block); - return it != std::end(impl_->blocks_) && (it->second.count(peer_at{ const_cast(peer), 0 }) != 0U); + return it != std::end(impl_->blocks_) && (it->second.count(peer) != 0U); } // count how many peers we're asking for `block` @@ -225,11 +186,11 @@ std::vector> ActiveRequests::sentBefore(ti for (auto const& [block, peers_at] : impl_->blocks_) { - for (auto const& sent : peers_at) + for (auto const& [peer, sent_at] : peers_at) { - if (sent.when < when) + if (sent_at < when) { - sent_before.emplace_back(block, sent.peer); + sent_before.emplace_back(block, const_cast(peer)); } } } diff --git a/libtransmission/peer-mgr-wishlist.h b/libtransmission/peer-mgr-wishlist.h index c99ac0eab..dcb8d9e75 100644 --- a/libtransmission/peer-mgr-wishlist.h +++ b/libtransmission/peer-mgr-wishlist.h @@ -22,6 +22,8 @@ class Wishlist { public: + static auto constexpr EndgameMaxPeers = size_t{ 2U }; + struct Mediator { [[nodiscard]] virtual bool clientCanRequestBlock(tr_block_index_t block) const = 0; diff --git a/libtransmission/peer-socket.cc b/libtransmission/peer-socket.cc index 60f59b451..59d0d26e7 100644 --- a/libtransmission/peer-socket.cc +++ b/libtransmission/peer-socket.cc @@ -70,7 +70,7 @@ void tr_peer_socket::close() handle = {}; } -size_t tr_peer_socket::try_write(Buffer& buf, size_t max, tr_error** error) const +size_t tr_peer_socket::try_write(OutBuf& buf, size_t max, tr_error** error) const { if (max == size_t{}) { @@ -107,7 +107,7 @@ size_t tr_peer_socket::try_write(Buffer& buf, size_t max, tr_error** error) cons return {}; } -size_t tr_peer_socket::try_read(Buffer& buf, size_t max, tr_error** error) const +size_t tr_peer_socket::try_read(InBuf& buf, size_t max, [[maybe_unused]] bool is_buf_empty, tr_error** error) const { if (max == size_t{}) { @@ -123,7 +123,7 @@ size_t tr_peer_socket::try_read(Buffer& buf, size_t max, tr_error** error) const // utp_read_drained() notifies libutp that this read buffer is empty. // It opens up the congestion window by sending an ACK (soonish) if // one was not going to be sent. - if (is_utp() && std::empty(buf)) + if (is_utp() && is_buf_empty) { utp_read_drained(handle.utp); } diff --git a/libtransmission/peer-socket.h b/libtransmission/peer-socket.h index f57ab7ad4..a8da39afe 100644 --- a/libtransmission/peer-socket.h +++ b/libtransmission/peer-socket.h @@ -27,8 +27,6 @@ struct tr_session; class tr_peer_socket { public: - using Buffer = libtransmission::Buffer; - tr_peer_socket() = default; tr_peer_socket(tr_session const* session, tr_address const& address, tr_port port, tr_socket_t sock); tr_peer_socket(tr_address const& address, tr_port port, struct UTPSocket* const sock); @@ -56,8 +54,11 @@ public: } void close(); - size_t try_write(Buffer& buf, size_t max, tr_error** error) const; - size_t try_read(Buffer& buf, size_t max, tr_error** error) const; + using InBuf = libtransmission::BufferWriter; + using OutBuf = libtransmission::BufferReader; + + size_t try_read(InBuf& buf, size_t max, bool is_buf_empty, tr_error** error) const; + size_t try_write(OutBuf& buf, size_t max, tr_error** error) const; [[nodiscard]] constexpr std::pair socketAddress() const noexcept { diff --git a/libtransmission/tr-buffer.h b/libtransmission/tr-buffer.h index d5cdd6e73..96477b336 100644 --- a/libtransmission/tr-buffer.h +++ b/libtransmission/tr-buffer.h @@ -233,7 +233,7 @@ public: } }; -template +template> class SmallBuffer final : public BufferReader , public BufferWriter @@ -288,11 +288,9 @@ public: } private: - small::vector buf_ = {}; + small::vector, std::true_type, size_t, GrowthFactor> buf_ = {}; size_t begin_pos_ = {}; size_t end_pos_ = {}; }; -using Buffer = SmallBuffer<0, std::byte>; - } // namespace libtransmission diff --git a/tests/libtransmission/announcer-udp-test.cc b/tests/libtransmission/announcer-udp-test.cc index 26c7e83b8..abed03d83 100644 --- a/tests/libtransmission/announcer-udp-test.cc +++ b/tests/libtransmission/announcer-udp-test.cc @@ -25,6 +25,8 @@ using namespace std::literals; +using Buffer = libtransmission::SmallBuffer<1024>; + class AnnouncerUdpTest : public ::testing::Test { private: @@ -111,7 +113,7 @@ protected: } } - [[nodiscard]] static uint32_t parseConnectionRequest(libtransmission::Buffer& buf) + [[nodiscard]] static uint32_t parseConnectionRequest(Buffer& buf) { EXPECT_EQ(ProtocolId, buf.to_uint64()); EXPECT_EQ(ConnectAction, buf.to_uint32()); @@ -147,7 +149,7 @@ protected: return std::make_pair(buildScrapeRequestFromResponse(response), response); } - [[nodiscard]] static auto parseScrapeRequest(libtransmission::Buffer& buf, uint64_t expected_connection_id) + [[nodiscard]] static auto parseScrapeRequest(Buffer& buf, uint64_t expected_connection_id) { EXPECT_EQ(expected_connection_id, buf.to_uint64()); EXPECT_EQ(ScrapeAction, buf.to_uint32()); @@ -162,7 +164,7 @@ protected: return std::make_pair(transaction_id, info_hashes); } - static void waitForAnnouncerToSendMessage(MockMediator& mediator, libtransmission::Buffer& setme) + static void waitForAnnouncerToSendMessage(MockMediator& mediator, Buffer& setme) { libtransmission::test::waitFor(mediator.eventBase(), [&mediator]() { return !std::empty(mediator.sent_); }); setme.clear(); @@ -172,7 +174,7 @@ protected: [[nodiscard]] static bool sendError(tr_announcer_udp& announcer, uint32_t transaction_id, std::string_view errmsg) { - auto buf = libtransmission::Buffer{}; + auto buf = Buffer{}; buf.add_uint32(ErrorAction); buf.add_uint32(transaction_id); buf.add(errmsg); @@ -187,7 +189,7 @@ protected: [[nodiscard]] static auto sendConnectionResponse(tr_announcer_udp& announcer, uint32_t transaction_id) { auto const connection_id = tr_rand_obj(); - auto buf = libtransmission::Buffer{}; + auto buf = Buffer{}; buf.add_uint32(ConnectAction); buf.add_uint32(transaction_id); buf.add_uint64(connection_id); @@ -250,7 +252,7 @@ protected: EXPECT_EQ(actual.external_ip, expected.external_ip); } - [[nodiscard]] static auto parseAnnounceRequest(libtransmission::Buffer& buf, uint64_t connection_id) + [[nodiscard]] static auto parseAnnounceRequest(Buffer& buf, uint64_t connection_id) { auto req = UdpAnnounceReq{}; req.connection_id = buf.to_uint64(); @@ -313,7 +315,7 @@ TEST_F(AnnouncerUdpTest, canScrape) // The announcer should have sent a UDP connection request. // Inspect that request for validity. - auto sent = libtransmission::Buffer{}; + auto sent = Buffer{}; waitForAnnouncerToSendMessage(mediator, sent); auto connect_transaction_id = parseConnectionRequest(sent); @@ -327,7 +329,7 @@ TEST_F(AnnouncerUdpTest, canScrape) expectEqual(request, info_hashes); // Have the tracker respond to the request - auto buf = libtransmission::Buffer{}; + auto buf = Buffer{}; buf.add_uint32(ScrapeAction); buf.add_uint32(scrape_transaction_id); buf.add_uint32(expected_response.rows[0].seeders); @@ -369,7 +371,7 @@ TEST_F(AnnouncerUdpTest, canDestructCleanlyEvenWhenBusy) // The announcer should have sent a UDP connection request. // Inspect that request for validity. - auto sent = libtransmission::Buffer{}; + auto sent = Buffer{}; waitForAnnouncerToSendMessage(mediator, sent); auto const connect_transaction_id = parseConnectionRequest(sent); EXPECT_NE(0U, connect_transaction_id); @@ -398,7 +400,7 @@ TEST_F(AnnouncerUdpTest, canMultiScrape) announcer->scrape(request, [&response](tr_scrape_response const& resp) { response = resp; }); // Announcer will request a connection. Verify and grant the request - auto sent = libtransmission::Buffer{}; + auto sent = Buffer{}; waitForAnnouncerToSendMessage(mediator, sent); auto connect_transaction_id = parseConnectionRequest(sent); auto const connection_id = sendConnectionResponse(*announcer, connect_transaction_id); @@ -410,7 +412,7 @@ TEST_F(AnnouncerUdpTest, canMultiScrape) expectEqual(request, info_hashes); // Have the tracker respond to the request - auto buf = libtransmission::Buffer{}; + auto buf = Buffer{}; buf.add_uint32(ScrapeAction); buf.add_uint32(scrape_transaction_id); for (int i = 0; i < expected_response.row_count; ++i) @@ -460,7 +462,7 @@ TEST_F(AnnouncerUdpTest, canHandleScrapeError) // The announcer should have sent a UDP connection request. // Inspect that request for validity. - auto sent = libtransmission::Buffer{}; + auto sent = Buffer{}; waitForAnnouncerToSendMessage(mediator, sent); auto connect_transaction_id = parseConnectionRequest(sent); @@ -510,7 +512,7 @@ TEST_F(AnnouncerUdpTest, canHandleConnectError) // The announcer should have sent a UDP connection request. // Inspect that request for validity. - auto sent = libtransmission::Buffer{}; + auto sent = Buffer{}; waitForAnnouncerToSendMessage(mediator, sent); auto transaction_id = parseConnectionRequest(sent); @@ -542,12 +544,12 @@ TEST_F(AnnouncerUdpTest, handleMessageReturnsFalseOnInvalidMessage) // The announcer should have sent a UDP connection request. // Inspect that request for validity. - auto sent = libtransmission::Buffer{}; + auto sent = Buffer{}; waitForAnnouncerToSendMessage(mediator, sent); auto transaction_id = parseConnectionRequest(sent); // send a connection response but with an *invalid* transaction id - auto buf = libtransmission::Buffer{}; + auto buf = Buffer{}; buf.add_uint32(ConnectAction); buf.add_uint32(transaction_id + 1); buf.add_uint64(tr_rand_obj()); @@ -623,7 +625,7 @@ TEST_F(AnnouncerUdpTest, canAnnounce) announcer->announce(request, [&response](tr_announce_response const& resp) { response = resp; }); // Announcer will request a connection. Verify and grant the request - auto sent = libtransmission::Buffer{}; + auto sent = Buffer{}; waitForAnnouncerToSendMessage(mediator, sent); auto connect_transaction_id = parseConnectionRequest(sent); auto const connection_id = sendConnectionResponse(*announcer, connect_transaction_id); @@ -635,7 +637,7 @@ TEST_F(AnnouncerUdpTest, canAnnounce) expectEqual(request, udp_ann_req); // Have the tracker respond to the request - auto buf = libtransmission::Buffer{}; + auto buf = Buffer{}; buf.add_uint32(AnnounceAction); buf.add_uint32(udp_ann_req.transaction_id); buf.add_uint32(expected_response.interval); diff --git a/tests/libtransmission/buffer-test.cc b/tests/libtransmission/buffer-test.cc index 2bdf85beb..25a0c763f 100644 --- a/tests/libtransmission/buffer-test.cc +++ b/tests/libtransmission/buffer-test.cc @@ -11,7 +11,7 @@ using BufferTest = ::testing::Test; using namespace std::literals; -using Buffer = libtransmission::Buffer; +using Buffer = libtransmission::SmallBuffer<1024>; TEST_F(BufferTest, startsWithInSingleSegment) { @@ -103,9 +103,7 @@ TEST_F(BufferTest, NonBufferWriter) auto constexpr Bang = "!"sv; auto out1 = Buffer{}; - - auto out2_vec = std::vector{}; - auto out2 = libtransmission::BufferWriter, std::byte>{ &out2_vec }; + auto out2 = libtransmission::SmallBuffer<1024>{}; out1.add_uint8(1); out2.add_uint8(1); @@ -126,7 +124,7 @@ TEST_F(BufferTest, NonBufferWriter) out2.add(Bang); auto const result1 = out1.to_string_view(); - auto const result2 = std::string_view{ reinterpret_cast(std::data(out2_vec)), std::size(out2_vec) }; + auto const result2 = out2.to_string(); EXPECT_EQ(result1, result2); } #endif