From 7f9ef4730efbb376243f65761f9dad7b36f3679d Mon Sep 17 00:00:00 2001 From: Charles Kerr Date: Fri, 3 Mar 2023 17:43:51 -0600 Subject: [PATCH] fix: assertion failure in readBtPiece() (#5097) --- libtransmission/peer-msgs.cc | 412 ++++++++++++++++------------------- libtransmission/tr-buffer.h | 14 ++ 2 files changed, 201 insertions(+), 225 deletions(-) diff --git a/libtransmission/peer-msgs.cc b/libtransmission/peer-msgs.cc index 24e3359d6..2504dcba4 100644 --- a/libtransmission/peer-msgs.cc +++ b/libtransmission/peer-msgs.cc @@ -81,6 +81,47 @@ auto constexpr FextAllowedFast = uint8_t{ 17 }; // see also LtepMessageIds below auto constexpr Ltep = uint8_t{ 20 }; +[[nodiscard]] constexpr std::string_view debug_name(uint8_t type) noexcept +{ + switch (type) + { + case Bitfield: + return "bitfield"sv; + case Cancel: + return "cancel"sv; + case Choke: + return "choke"sv; + case FextAllowedFast: + return "fext-allow-fast"sv; + case FextHaveAll: + return "fext-have-all"sv; + case FextHaveNone: + return "fext-have-none"sv; + case FextReject: + return "fext-reject"sv; + case FextSuggest: + return "fext-suggest"sv; + case Have: + return "have"sv; + case Interested: + return "interested"sv; + case Ltep: + return "ltep"sv; + case NotInterested: + return "not-interested"sv; + case Piece: + return "piece"sv; + case Port: + return "port"sv; + case Request: + return "request"sv; + case Unchoke: + return "unchoke"sv; + default: + return "unknown"sv; + } +} + } // namespace BtPeerMsgs namespace LtepMessages @@ -145,14 +186,6 @@ auto constexpr MaxPexPeerCount = size_t{ 50 }; // --- -enum class AwaitingBt -{ - Length, - Id, - Message, - Piece -}; - enum class EncryptionPreference { Unknown, @@ -186,9 +219,9 @@ peer_request blockToReq(tr_torrent const* tor, tr_block_index_t block) * the current message that it's sending us. */ struct tr_incoming { - uint8_t id = 0; // the protocol message, e.g. BtPeerMsgs::Piece - uint32_t length = 0; // the full message payload length. Includes the +1 for id length - std::optional block_req; // metadata for incoming blocks + std::optional length; // the full message payload length. Includes the +1 for id length + std::optional id; // the protocol message, e.g. BtPeerMsgs::Piece + libtransmission::Buffer payload; struct incoming_piece_data { @@ -237,6 +270,7 @@ void updateDesiredRequestCount(tr_peerMsgsImpl* msgs); #define logdbg(msgs, text) myLogMacro(msgs, TR_LOG_DEBUG, text) #define logtrace(msgs, text) myLogMacro(msgs, TR_LOG_TRACE, text) +#define logwarn(msgs, text) myLogMacro(msgs, TR_LOG_WARN, text) /** * Low-level communication state information about a connected peer. @@ -660,7 +694,6 @@ public: * very quickly; others aren't as urgent. */ int8_t outMessagesBatchPeriod; - AwaitingBt state = AwaitingBt::Length; uint8_t ut_pex_id = 0; uint8_t ut_metadata_id = 0; @@ -983,16 +1016,11 @@ void sendLtepHandshake(tr_peerMsgsImpl* msgs) tr_variantClear(&val); } -void parseLtepHandshake(tr_peerMsgsImpl* msgs, uint32_t len) +void parseLtepHandshake(tr_peerMsgsImpl* msgs, libtransmission::Buffer& payload) { msgs->peerSentLtepHandshake = true; - // LTEP messages are usually just a couple hundred bytes, - // so try using a strbuf to handle it on the stack - auto tmp = tr_strbuf{}; - tmp.resize(len); - msgs->io->read_bytes(std::data(tmp), std::size(tmp)); - auto const handshake_sv = tmp.sv(); + auto const handshake_sv = payload.pullup_sv(); auto val = tr_variant{}; if (!tr_variantFromBuf(&val, TR_VARIANT_PARSE_BENC | TR_VARIANT_PARSE_INPLACE, handshake_sv) || !tr_variantIsDict(&val)) @@ -1089,16 +1117,14 @@ void parseLtepHandshake(tr_peerMsgsImpl* msgs, uint32_t len) tr_variantClear(&val); } -void parseUtMetadata(tr_peerMsgsImpl* msgs, uint32_t msglen) +void parseUtMetadata(tr_peerMsgsImpl* msgs, libtransmission::Buffer& payload_in) { int64_t msg_type = -1; int64_t piece = -1; int64_t total_size = 0; - auto tmp = std::vector{}; - tmp.resize(msglen); - msgs->io->read_bytes(std::data(tmp), std::size(tmp)); - char const* const msg_end = std::data(tmp) + std::size(tmp); + auto const tmp = payload_in.pullup_sv(); + auto const* const msg_end = std::data(tmp) + std::size(tmp); auto dict = tr_variant{}; char const* benc_end = nullptr; @@ -1158,7 +1184,7 @@ void parseUtMetadata(tr_peerMsgsImpl* msgs, uint32_t msglen) } } -void parseUtPex(tr_peerMsgsImpl* msgs, uint32_t msglen) +void parseUtPex(tr_peerMsgsImpl* msgs, libtransmission::Buffer& payload) { auto* const tor = msgs->torrent; if (!tor->allowsPex()) @@ -1166,9 +1192,7 @@ void parseUtPex(tr_peerMsgsImpl* msgs, uint32_t msglen) return; } - auto tmp = std::vector{}; - tmp.resize(msglen); - msgs->io->read_bytes(std::data(tmp), std::size(tmp)); + auto const tmp = payload.pullup_sv(); if (tr_variant val; tr_variantFromBuf(&val, TR_VARIANT_PARSE_BENC | TR_VARIANT_PARSE_INPLACE, tmp)) { @@ -1208,18 +1232,16 @@ void parseUtPex(tr_peerMsgsImpl* msgs, uint32_t msglen) } } -void parseLtep(tr_peerMsgsImpl* msgs, uint32_t msglen) +void parseLtep(tr_peerMsgsImpl* msgs, libtransmission::Buffer& payload) { - TR_ASSERT(msglen > 0); + TR_ASSERT(!std::empty(payload)); - auto ltep_msgid = uint8_t{}; - msgs->io->read_uint8(<ep_msgid); - msglen--; + auto const ltep_msgid = payload.to_uint8(); if (ltep_msgid == LtepMessages::Handshake) { logtrace(msgs, "got ltep handshake"); - parseLtepHandshake(msgs, msglen); + parseLtepHandshake(msgs, payload); if (msgs->io->supports_ltep()) { @@ -1231,73 +1253,23 @@ void parseLtep(tr_peerMsgsImpl* msgs, uint32_t msglen) { logtrace(msgs, "got ut pex"); msgs->peerSupportsPex = true; - parseUtPex(msgs, msglen); + parseUtPex(msgs, payload); } else if (ltep_msgid == UT_METADATA_ID) { logtrace(msgs, "got ut metadata"); msgs->peerSupportsMetadataXfer = true; - parseUtMetadata(msgs, msglen); + parseUtMetadata(msgs, payload); } else { logtrace(msgs, fmt::format(FMT_STRING("skipping unknown ltep message ({:d})"), static_cast(ltep_msgid))); - msgs->io->read_buffer_drain(msglen); } } -ReadState readBtLength(tr_peerMsgsImpl* msgs, size_t inlen) -{ - auto len = uint32_t{}; - if (inlen < sizeof(len)) - { - return READ_LATER; - } +using ReadResult = std::pair; - msgs->io->read_uint32(&len); - if (len == 0) /* peer sent us a keepalive message */ - { - logtrace(msgs, "got KeepAlive"); - } - else - { - msgs->incoming.length = len; - msgs->state = AwaitingBt::Id; - } - - return READ_NOW; -} - -ReadState readBtMessage(tr_peerMsgsImpl* /*msgs*/, size_t /*inlen*/); - -ReadState readBtId(tr_peerMsgsImpl* msgs, size_t inlen) -{ - if (inlen < sizeof(uint8_t)) - { - return READ_LATER; - } - - auto id = uint8_t{}; - msgs->io->read_uint8(&id); - msgs->incoming.id = id; - logtrace( - msgs, - fmt::format(FMT_STRING("msgs->incoming.id is now {:d}: msgs->incoming.length is {:d}"), id, msgs->incoming.length)); - - if (id == BtPeerMsgs::Piece) - { - msgs->state = AwaitingBt::Piece; - return READ_NOW; - } - - if (msgs->incoming.length != 1) - { - msgs->state = AwaitingBt::Message; - return READ_NOW; - } - - return readBtMessage(msgs, inlen - 1); -} +ReadResult process_peer_message(tr_peerMsgsImpl* msgs, uint8_t id, libtransmission::Buffer& payload); void prefetchPieces(tr_peerMsgsImpl* msgs) { @@ -1413,109 +1385,74 @@ bool messageLengthIsCorrect(tr_peerMsgsImpl const* msg, uint8_t id, uint32_t len int clientGotBlock(tr_peerMsgsImpl* msgs, std::unique_ptr> block_data, tr_block_index_t block); -ReadState readBtPiece(tr_peerMsgsImpl* msgs, size_t inlen, size_t* setme_piece_bytes_read) +ReadResult read_piece_data(tr_peerMsgsImpl* msgs, libtransmission::Buffer& payload) { - TR_ASSERT(msgs->io->read_buffer_size() >= inlen); + // + auto const piece = payload.to_uint32(); + auto const offset = payload.to_uint32(); + auto const len = std::size(payload); - logtrace(msgs, "In readBtPiece"); - - // If this is the first we've seen of the piece data, parse out the header - auto& incoming = msgs->incoming; - if (!incoming.block_req) - { - if (inlen < 8) - { - return READ_LATER; - } - - auto req = peer_request{}; - msgs->io->read_uint32(&req.index); - msgs->io->read_uint32(&req.offset); - req.length = incoming.length - 9; - logtrace(msgs, fmt::format(FMT_STRING("got incoming block header {:d}:{:d}->{:d}"), req.index, req.offset, req.length)); - incoming.block_req = req; - return READ_NOW; - } - - auto& req = incoming.block_req; - auto const loc = msgs->torrent->pieceLoc(req->index, req->offset); + auto const loc = msgs->torrent->pieceLoc(piece, offset); auto const block = loc.block; auto const block_size = msgs->torrent->blockSize(block); - auto const n_this_pass = std::min(size_t{ req->length }, inlen); - TR_ASSERT(loc.block_offset + n_this_pass <= block_size); - if (n_this_pass == 0) + if (loc.block_offset + len > block_size) { - return READ_LATER; + logwarn(msgs, fmt::format("got unaligned piece {:d}:{:d}->{:d}", piece, offset, len)); + return { READ_ERR, len }; } - auto& incoming_block = incoming.blocks.try_emplace(block, block_size).first->second; - msgs->io->read_bytes(std::data(*incoming_block.buf) + loc.block_offset, n_this_pass); - - msgs->publish(tr_peer_event::GotPieceData(n_this_pass)); - *setme_piece_bytes_read += n_this_pass; - incoming_block.have.setSpan(loc.block_offset, loc.block_offset + n_this_pass); - logtrace(msgs, fmt::format("got {:d} bytes for req {:d}:{:d}->{:d}", n_this_pass, req->index, req->offset, req->length)); - - // if we haven't gotten the full response yet, - // update what part of `req` is unfulfilled and wait for more - if (req->length > n_this_pass) + if (!tr_peerMgrDidPeerRequest(msgs->torrent, msgs, block)) { - req->length -= n_this_pass; - auto const new_loc = msgs->torrent->byteLoc(loc.byte + n_this_pass); - req->index = new_loc.piece; - req->offset = new_loc.piece_offset; - return READ_LATER; + logwarn(msgs, fmt::format("got unrequested piece {:d}:{:d}->{:d}", piece, offset, len)); + return { READ_ERR, len }; } - // we've got the entire response message - req.reset(); - msgs->state = AwaitingBt::Length; + auto& blocks = msgs->incoming.blocks; + auto& incoming_block = blocks.try_emplace(block, block_size).first->second; + payload.to_buf(std::data(*incoming_block.buf) + loc.block_offset, len); + msgs->publish(tr_peer_event::GotPieceData(len)); + incoming_block.have.setSpan(loc.block_offset, loc.block_offset + len); + logtrace(msgs, fmt::format("got {:d} bytes for req {:d}:{:d}->{:d}", len, piece, offset, len)); // if we haven't gotten the entire block yet, wait for more if (!incoming_block.have.hasAll()) { - return READ_LATER; + return { READ_LATER, len }; } // we've got the entire block, so send it along. auto block_buf = std::move(incoming_block.buf); - incoming.blocks.erase(block); // note: invalidates `incoming_block` local + blocks.erase(block); // note: invalidates `incoming_block` local auto const ok = clientGotBlock(msgs, std::move(block_buf), block) == 0; - return ok ? READ_NOW : READ_ERR; + return { ok ? READ_NOW : READ_ERR, len }; } -ReadState readBtMessage(tr_peerMsgsImpl* msgs, size_t inlen) +ReadResult process_peer_message(tr_peerMsgsImpl* msgs, uint8_t id, libtransmission::Buffer& payload) { - uint8_t const id = msgs->incoming.id; -#ifdef TR_ENABLE_ASSERTS - auto const start_buflen = msgs->io->read_buffer_size(); -#endif bool const fext = msgs->io->supports_fext(); auto ui32 = uint32_t{}; - auto msglen = uint32_t{ msgs->incoming.length }; - - TR_ASSERT(msglen > 0); - - --msglen; /* id length */ logtrace( msgs, - fmt::format(FMT_STRING("got BT id {:d}, len {:d}, buffer size is {:d}"), static_cast(id), msglen, inlen)); + fmt::format( + "got peer msg '{:s}' ({:d}) with payload len {:d}", + BtPeerMsgs::debug_name(id), + static_cast(id), + std::size(payload))); - if (inlen < msglen) - { - return READ_LATER; - } - - if (!messageLengthIsCorrect(msgs, id, msglen + 1)) + if (!messageLengthIsCorrect(msgs, id, sizeof(id) + std::size(payload))) { logdbg( msgs, - fmt::format(FMT_STRING("bad packet - BT message #{:d} with a length of {:d}"), static_cast(id), msglen)); + fmt::format( + "bad msg: '{:s}' ({:d}) with payload len {:d}", + BtPeerMsgs::debug_name(id), + static_cast(id), + std::size(payload))); msgs->publish(tr_peer_event::GotError(EMSGSIZE)); - return READ_ERR; + return { READ_ERR, {} }; } switch (id) @@ -1552,13 +1489,13 @@ ReadState readBtMessage(tr_peerMsgsImpl* msgs, size_t inlen) break; case BtPeerMsgs::Have: - msgs->io->read_uint32(&ui32); + ui32 = payload.to_uint32(); logtrace(msgs, fmt::format(FMT_STRING("got Have: {:d}"), ui32)); if (msgs->torrent->hasMetainfo() && ui32 >= msgs->torrent->pieceCount()) { msgs->publish(tr_peer_event::GotError(ERANGE)); - return READ_ERR; + return { READ_ERR, {} }; } /* a peer can send the same HAVE message twice... */ @@ -1574,10 +1511,9 @@ ReadState readBtMessage(tr_peerMsgsImpl* msgs, size_t inlen) case BtPeerMsgs::Bitfield: { logtrace(msgs, "got a bitfield"); - auto tmp = std::vector(msglen); - msgs->io->read_bytes(std::data(tmp), std::size(tmp)); - msgs->have_ = tr_bitfield{ msgs->torrent->hasMetainfo() ? msgs->torrent->pieceCount() : std::size(tmp) * 8 }; - msgs->have_.setRaw(std::data(tmp), std::size(tmp)); + auto const [buf, buflen] = payload.pullup(); + msgs->have_ = tr_bitfield{ msgs->torrent->hasMetainfo() ? msgs->torrent->pieceCount() : buflen * 8 }; + msgs->have_.setRaw(reinterpret_cast(buf), buflen); msgs->publish(tr_peer_event::GotBitfield(&msgs->have_)); msgs->invalidatePercentDone(); break; @@ -1586,9 +1522,9 @@ ReadState readBtMessage(tr_peerMsgsImpl* msgs, size_t inlen) case BtPeerMsgs::Request: { struct peer_request r; - msgs->io->read_uint32(&r.index); - msgs->io->read_uint32(&r.offset); - msgs->io->read_uint32(&r.length); + r.index = payload.to_uint32(); + r.offset = payload.to_uint32(); + r.length = payload.to_uint32(); logtrace(msgs, fmt::format(FMT_STRING("got Request: {:d}:{:d}->{:d}"), r.index, r.offset, r.length)); peerMadeRequest(msgs, &r); break; @@ -1597,9 +1533,9 @@ ReadState readBtMessage(tr_peerMsgsImpl* msgs, size_t inlen) case BtPeerMsgs::Cancel: { struct peer_request r; - msgs->io->read_uint32(&r.index); - msgs->io->read_uint32(&r.offset); - msgs->io->read_uint32(&r.length); + r.index = payload.to_uint32(); + r.offset = payload.to_uint32(); + r.length = payload.to_uint32(); msgs->cancels_sent_to_client.add(tr_time(), 1); logtrace(msgs, fmt::format(FMT_STRING("got a Cancel {:d}:{:d}->{:d}"), r.index, r.offset, r.length)); @@ -1620,7 +1556,7 @@ ReadState readBtMessage(tr_peerMsgsImpl* msgs, size_t inlen) } case BtPeerMsgs::Piece: - TR_ASSERT(false); /* handled elsewhere! */ + return read_piece_data(msgs, payload); break; case BtPeerMsgs::Port: @@ -1633,8 +1569,7 @@ ReadState readBtMessage(tr_peerMsgsImpl* msgs, size_t inlen) { logtrace(msgs, "Got a BtPeerMsgs::Port"); - auto hport = uint16_t{}; - msgs->io->read_uint16(&hport); // read_uint16 performs ntoh + auto const hport = payload.to_uint16(); if (auto const dht_port = tr_port::fromHost(hport); !std::empty(dht_port)) { msgs->dht_port = dht_port; @@ -1645,32 +1580,32 @@ ReadState readBtMessage(tr_peerMsgsImpl* msgs, size_t inlen) case BtPeerMsgs::FextSuggest: logtrace(msgs, "Got a BtPeerMsgs::FextSuggest"); - msgs->io->read_uint32(&ui32); if (fext) { - msgs->publish(tr_peer_event::GotSuggest(ui32)); + auto const piece = payload.to_uint32(); + msgs->publish(tr_peer_event::GotSuggest(piece)); } else { msgs->publish(tr_peer_event::GotError(EMSGSIZE)); - return READ_ERR; + return { READ_ERR, {} }; } break; case BtPeerMsgs::FextAllowedFast: logtrace(msgs, "Got a BtPeerMsgs::FextAllowedFast"); - msgs->io->read_uint32(&ui32); if (fext) { - msgs->publish(tr_peer_event::GotAllowedFast(ui32)); + auto const piece = payload.to_uint32(); + msgs->publish(tr_peer_event::GotAllowedFast(piece)); } else { msgs->publish(tr_peer_event::GotError(EMSGSIZE)); - return READ_ERR; + return { READ_ERR, {} }; } break; @@ -1687,7 +1622,7 @@ ReadState readBtMessage(tr_peerMsgsImpl* msgs, size_t inlen) else { msgs->publish(tr_peer_event::GotError(EMSGSIZE)); - return READ_ERR; + return { READ_ERR, {} }; } break; @@ -1704,7 +1639,7 @@ ReadState readBtMessage(tr_peerMsgsImpl* msgs, size_t inlen) else { msgs->publish(tr_peer_event::GotError(EMSGSIZE)); - return READ_ERR; + return { READ_ERR, {} }; } break; @@ -1712,10 +1647,9 @@ ReadState readBtMessage(tr_peerMsgsImpl* msgs, size_t inlen) case BtPeerMsgs::FextReject: { struct peer_request r; - logtrace(msgs, "Got a BtPeerMsgs::FextReject"); - msgs->io->read_uint32(&r.index); - msgs->io->read_uint32(&r.offset); - msgs->io->read_uint32(&r.length); + r.index = payload.to_uint32(); + r.offset = payload.to_uint32(); + r.length = payload.to_uint32(); if (fext) { @@ -1725,7 +1659,7 @@ ReadState readBtMessage(tr_peerMsgsImpl* msgs, size_t inlen) else { msgs->publish(tr_peer_event::GotError(EMSGSIZE)); - return READ_ERR; + return { READ_ERR, {} }; } break; @@ -1733,20 +1667,15 @@ ReadState readBtMessage(tr_peerMsgsImpl* msgs, size_t inlen) case BtPeerMsgs::Ltep: logtrace(msgs, "Got a BtPeerMsgs::Ltep"); - parseLtep(msgs, msglen); + parseLtep(msgs, payload); break; default: logtrace(msgs, fmt::format(FMT_STRING("peer sent us an UNKNOWN: {:d}"), static_cast(id))); - msgs->io->read_buffer_drain(msglen); break; } - TR_ASSERT(msglen + 1 == msgs->incoming.length); - TR_ASSERT(msgs->io->read_buffer_size() == start_buflen - msglen); - - msgs->state = AwaitingBt::Length; - return READ_NOW; + return { READ_NOW, {} }; } /* returns 0 on success, or an errno on failure */ @@ -1812,48 +1741,81 @@ void didWrite(tr_peerIo* /*io*/, size_t bytes_written, bool was_piece_data, void ReadState canRead(tr_peerIo* io, void* vmsgs, size_t* piece) { auto* msgs = static_cast(vmsgs); - size_t const inlen = io->read_buffer_size(); - logtrace( - msgs, - fmt::format(FMT_STRING("canRead: inlen is {:d}, msgs->state is {:d}"), inlen, static_cast(msgs->state))); + // https://www.bittorrent.org/beps/bep_0003.html + // Next comes an alternating stream of length prefixes and messages. + // Messages of length zero are keepalives, and ignored. + // All non-keepalive messages start with a single byte which gives their type. + // + // https://wiki.theory.org/BitTorrentSpecification + // All of the remaining messages in the protocol take the form of + // . The length prefix is a four byte + // big-endian value. The message ID is a single decimal byte. + // The payload is message dependent. - auto ret = ReadState{}; - if (inlen == 0) + // read + auto& current_message_len = msgs->incoming.length; // the full message payload length. Includes the +1 for id length + if (!current_message_len) { - ret = READ_LATER; - } - else if (msgs->state == AwaitingBt::Piece) - { - ret = readBtPiece(msgs, inlen, piece); - } - else - { - switch (msgs->state) + auto message_len = uint32_t{}; + if (io->read_buffer_size() < sizeof(message_len)) { - case AwaitingBt::Length: - ret = readBtLength(msgs, inlen); - break; + return READ_LATER; + } - case AwaitingBt::Id: - ret = readBtId(msgs, inlen); - break; + io->read_uint32(&message_len); + current_message_len = message_len; - case AwaitingBt::Message: - ret = readBtMessage(msgs, inlen); - break; - - default: -#ifdef TR_ENABLE_ASSERTS - TR_ASSERT_MSG(false, fmt::format(FMT_STRING("unhandled peer messages state {:d}"), static_cast(msgs->state))); -#else - ret = READ_ERR; - break; -#endif + // The keep-alive message is a message with zero bytes, + // specified with the length prefix set to zero. + // There is no message ID and no payload. + if (auto const is_keepalive = message_len == uint32_t{}; is_keepalive) + { + logtrace(msgs, "got KeepAlive"); + current_message_len.reset(); + return READ_NOW; } } - return ret; + // read + auto& current_message_type = msgs->incoming.id; + if (!current_message_type) + { + auto message_type = uint8_t{}; + if (io->read_buffer_size() < sizeof(message_type)) + { + return READ_LATER; + } + + io->read_uint8(&message_type); + current_message_type = message_type; + } + + // read + auto& payload = msgs->incoming.payload; + auto const full_payload_len = *current_message_len - sizeof(uint8_t /*message_type*/); + auto n_left = full_payload_len - std::size(payload); + while (n_left > 0U && io->read_buffer_size() > 0U) + { + auto buf = std::array{}; + auto const n_this_pass = std::min({ n_left, io->read_buffer_size(), std::size(buf) }); + io->read_bytes(std::data(buf), n_this_pass); + payload.add(std::data(buf), n_this_pass); + n_left -= n_this_pass; + logtrace(msgs, fmt::format("read {:d} payload bytes; {:d} left to go", n_this_pass, n_left)); + } + + if (n_left > 0U) + { + return READ_LATER; + } + + auto const [read_state, n_piece_bytes_read] = process_peer_message(msgs, *current_message_type, payload); + current_message_type.reset(); + current_message_len.reset(); + payload.clear(); + *piece = n_piece_bytes_read; + return read_state; } // --- diff --git a/libtransmission/tr-buffer.h b/libtransmission/tr-buffer.h index b6c25f59e..5dcb4b599 100644 --- a/libtransmission/tr-buffer.h +++ b/libtransmission/tr-buffer.h @@ -10,6 +10,7 @@ #include #include #include +#include #include @@ -197,6 +198,13 @@ public: return evbuffer_remove(buf_.get(), tgt, n_bytes); } + [[nodiscard]] auto to_uint8() + { + auto tmp = uint8_t{}; + to_buf(&tmp, sizeof(tmp)); + return tmp; + } + [[nodiscard]] uint16_t to_uint16() { auto tmp = uint16_t{}; @@ -247,6 +255,12 @@ public: return { reinterpret_cast(evbuffer_pullup(buf_.get(), -1)), size() }; } + [[nodiscard]] auto pullup_sv() + { + auto const [buf, buflen] = pullup(); + return std::string_view{ reinterpret_cast(buf), buflen }; + } + void reserve(size_t n_bytes) { evbuffer_expand(buf_.get(), n_bytes - size());