mirror of https://github.com/transmission/transmission.git
synced 2025-12-20 02:18:42 +00:00
* chore: housekeeping
* refactor: allow downloading torrents to steal connection slots. To be exact, downloading torrents are allowed to start outgoing connections even when the session peer limit has been reached. The periodic peer-limit enforcement should keep the downloading connections and remove idle connections in seeding torrents.
* refactor: prefer peers from downloading torrents when enforcing the peer limit
777 lines
20 KiB
C++
// This file Copyright © Mnemosyne LLC.
// It may be used under GPLv2 (SPDX: GPL-2.0-only), GPLv3 (SPDX: GPL-3.0-only),
// or any future license endorsed by Mnemosyne LLC.
// License text can be found in the licenses/ folder.

#include <algorithm> // std::max, std::min
#include <array>
#include <cerrno>
#include <cstddef> // size_t
#include <cstdint>
#include <functional>
#include <memory> // std::make_shared, std::weak_ptr
#include <type_traits> // std::underlying_type_t
#include <utility> // std::move

#ifdef _WIN32
#include <ws2tcpip.h>
#else
#include <arpa/inet.h> // ntohl, ntohs
#endif

#include <event2/event.h>

#include <libutp/utp.h>

#include <fmt/format.h>

#include "libtransmission/transmission.h"

#include "libtransmission/bandwidth.h"
#include "libtransmission/block-info.h" // tr_block_info
#include "libtransmission/error.h"
#include "libtransmission/log.h"
#include "libtransmission/net.h"
#include "libtransmission/peer-io.h"
#include "libtransmission/peer-socket.h" // tr_peer_socket, tr_netOpen...
#include "libtransmission/session.h"
#include "libtransmission/tr-assert.h"
#include "libtransmission/utils.h" // for _()

struct sockaddr;

#define tr_logAddErrorIo(io, msg) tr_logAddError(msg, (io)->display_name())
#define tr_logAddWarnIo(io, msg) tr_logAddWarn(msg, (io)->display_name())
#define tr_logAddDebugIo(io, msg) tr_logAddDebug(msg, (io)->display_name())
#define tr_logAddTraceIo(io, msg) tr_logAddTrace(msg, (io)->display_name())

namespace
{
// Helps us to ignore errors that say "try again later",
// since that's what peer-io does by default anyway.
[[nodiscard]] constexpr auto can_retry_from_error(int error_code) noexcept
{
#ifdef _WIN32
    return error_code == 0 || error_code == WSAEWOULDBLOCK || error_code == WSAEINTR || error_code == WSAEINPROGRESS;
#else
    return error_code == 0 || error_code == EAGAIN || error_code == EWOULDBLOCK || error_code == EINTR ||
        error_code == EINPROGRESS;
#endif
}

size_t get_desired_output_buffer_size(tr_peerIo const* io, uint64_t now)
{
    // this is all kind of arbitrary, but what seems to work well is
    // being large enough to hold the next 15 seconds' worth of input,
    // or a few blocks, whichever is bigger. OK to tweak this as needed.
    static auto constexpr PeriodSecs = uint64_t{ 15U };

    // the 3 is an arbitrary number of blocks;
    // the .5 is to leave room for protocol messages
    static auto constexpr Floor = static_cast<uint64_t>(tr_block_info::BlockSize * 3.5);

    auto const current_speed = io->get_piece_speed(now, TR_UP);
    return std::max(Floor, current_speed.base_quantity() * PeriodSecs);
}
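
// Worked example, assuming tr_block_info::BlockSize is the usual 16 KiB and
// speeds are in bytes per second: Floor = 16384 * 3.5 = 57344 bytes, so a
// peer recently uploading at 200 kB/s gets max(57344, 200000 * 15) = 3 MB.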

} // namespace

// ---

tr_peerIo::tr_peerIo(
    tr_session* session,
    tr_peer_socket&& socket,
    tr_bandwidth* parent_bandwidth,
    tr_sha1_digest_t const* info_hash,
    bool is_incoming,
    bool client_is_seed)
    : bandwidth_{ parent_bandwidth }
    , info_hash_{ info_hash != nullptr ? *info_hash : tr_sha1_digest_t{} }
    , session_{ session }
    , client_is_seed_{ client_is_seed }
    , is_incoming_{ is_incoming }
{
    set_socket(std::move(socket));
}

std::shared_ptr<tr_peerIo> tr_peerIo::create(
    tr_session* session,
    tr_peer_socket&& socket,
    tr_bandwidth* parent,
    tr_sha1_digest_t const* info_hash,
    bool is_incoming,
    bool is_seed)
{
    TR_ASSERT(session != nullptr);
    auto lock = session->unique_lock();

    auto io = std::make_shared<tr_peerIo>(session, std::move(socket), parent, info_hash, is_incoming, is_seed);
    io->bandwidth().set_peer(io);
    tr_logAddTraceIo(io, fmt::format("bandwidth is {}; its parent is {}", fmt::ptr(&io->bandwidth()), fmt::ptr(parent)));
    return io;
}
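
// N.B. set_peer() happens after make_shared() rather than inside the
// constructor, presumably because the bandwidth object keeps a non-owning
// reference to the io that can only be taken once a shared_ptr owns it
// (shared_from_this() is not usable from within a constructor).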

std::shared_ptr<tr_peerIo> tr_peerIo::new_incoming(tr_session* session, tr_bandwidth* parent, tr_peer_socket socket)
{
    TR_ASSERT(session != nullptr);
    return tr_peerIo::create(session, std::move(socket), parent, nullptr, true, false);
}

std::shared_ptr<tr_peerIo> tr_peerIo::new_outgoing(
    tr_session* session,
    tr_bandwidth* parent,
    tr_socket_address const& socket_address,
    tr_sha1_digest_t const& info_hash,
    bool client_is_seed,
    bool utp)
{
    TR_ASSERT(session != nullptr);
    TR_ASSERT(socket_address.is_valid());
    TR_ASSERT(utp || session->allowsTCP());

    // N.B. This array needs to be kept in the same order as
    // the tr_preferred_transport enum.
    auto const get_socket = std::array<std::function<tr_peer_socket()>, TR_NUM_PREFERRED_TRANSPORT>{
        [&]() -> tr_peer_socket
        {
#ifdef WITH_UTP
            if (utp)
            {
                auto* const sock = utp_create_socket(session->utp_context);
                auto const [ss, sslen] = socket_address.to_sockaddr();
                if (utp_connect(sock, reinterpret_cast<sockaddr const*>(&ss), sslen) == 0)
                {
                    return { socket_address, sock };
                }
            }
#endif
            return {};
        },
        [&]() -> tr_peer_socket
        {
            if (auto sock = tr_net_open_peer_socket(session, socket_address, client_is_seed); sock != TR_BAD_SOCKET)
            {
                return { session, socket_address, sock };
            }
            return {};
        }
    };

    for (auto const& transport : session->preferred_transport())
    {
        if (auto sock = get_socket[transport](); sock.is_valid())
        {
            return tr_peerIo::create(session, std::move(sock), parent, &info_hash, false, client_is_seed);
        }
    }

    return {};
}
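
// A minimal call-site sketch (hypothetical: `parent_bandwidth` and the
// callbacks are placeholders, and set_callbacks() is assumed here as the
// counterpart of the clear_callbacks() used elsewhere in this file):
//
//     if (auto io = tr_peerIo::new_outgoing(
//             session, parent_bandwidth, socket_address, info_hash,
//             /*client_is_seed*/ false, /*utp*/ true);
//         io)
//     {
//         io->set_callbacks(can_read_cb, did_write_cb, got_error_cb, user_data);
//     }
//     // a null return means every preferred transport failed to connect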

tr_peerIo::~tr_peerIo()
{
    auto const lock = session_->unique_lock();

    clear_callbacks();
    tr_logAddTraceIo(this, "in tr_peerIo destructor");
    event_disable(EV_READ | EV_WRITE);
    close();
}

// ---
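
// TCP sockets are polled via the libevent read/write events created here;
// µTP sockets are callback-driven (see utp_init() below), so they only need
// this io attached as the socket's userdata.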
void tr_peerIo::set_socket(tr_peer_socket socket_in)
{
    close(); // tear down the previous socket, if any

    socket_ = std::move(socket_in);

    if (socket_.is_tcp())
    {
        event_read_.reset(event_new(session_->event_base(), socket_.handle.tcp, EV_READ, &tr_peerIo::event_read_cb, this));
        event_write_.reset(event_new(session_->event_base(), socket_.handle.tcp, EV_WRITE, &tr_peerIo::event_write_cb, this));
    }
#ifdef WITH_UTP
    else if (socket_.is_utp())
    {
        utp_set_userdata(socket_.handle.utp, this);
    }
#endif
    else
    {
        TR_ASSERT_MSG(false, "unsupported peer socket type");
    }
}

void tr_peerIo::close()
{
    socket_.close();
    event_write_.reset();
    event_read_.reset();
    inbuf_.clear();
    outbuf_.clear();
    outbuf_info_.clear();
    encrypt_disable();
    decrypt_disable();
}

void tr_peerIo::clear()
{
    clear_callbacks();
    set_enabled(TR_UP, false);
    set_enabled(TR_DOWN, false);
    close();
}
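
// Tear down the current socket and retry with a fresh outgoing TCP
// connection, preserving whichever read/write events were pending.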
bool tr_peerIo::reconnect()
{
    TR_ASSERT(!is_incoming());
    TR_ASSERT(session_->allowsTCP());

    auto const pending_events = pending_events_;
    event_disable(EV_READ | EV_WRITE);

    close();

    auto const s = tr_net_open_peer_socket(session_, socket_address(), client_is_seed());
    if (s == TR_BAD_SOCKET)
    {
        return false;
    }
    set_socket({ session_, socket_address(), s });

    event_enable(pending_events);

    return true;
}

// ---
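
// Walk the queue of pending-write records, attributing the bytes just written
// to piece data vs. protocol overhead so that each can be reported separately
// to the bandwidth accounting and to the did_write_ callback.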
void tr_peerIo::did_write_wrapper(size_t bytes_transferred)
{
    auto const keep_alive = shared_from_this();
    auto const now = tr_time_msec();

    if (bytes_transferred > 0U)
    {
        bandwidth().notify_bandwidth_consumed(TR_UP, bytes_transferred, false, now);
    }

    while (bytes_transferred > 0U && !std::empty(outbuf_info_))
    {
        auto& [n_bytes_left, is_piece_data] = outbuf_info_.front();
        auto const payload = std::min(n_bytes_left, bytes_transferred);

        if (is_piece_data)
        {
            bandwidth().notify_bandwidth_consumed(TR_UP, payload, true, now);
        }

        if (did_write_ != nullptr)
        {
            did_write_(this, payload, is_piece_data, user_data_);
        }

        bytes_transferred -= payload;
        n_bytes_left -= payload;
        if (n_bytes_left == 0U)
        {
            outbuf_info_.pop_front();
        }
    }
}
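
// Flush up to `max` bytes from the outgoing buffer, after clamping `max` to
// what the bandwidth allotment allows. "Try again later" errors are swallowed
// (see can_retry_from_error()); any other error fires the error callback.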
size_t tr_peerIo::try_write(size_t max)
{
    static auto constexpr Dir = TR_UP;

    if (max == 0U)
    {
        return {};
    }

    auto& buf = outbuf_;
    max = std::min(max, std::size(buf));
    max = bandwidth().clamp(Dir, max);
    if (max == 0U)
    {
        set_enabled(Dir, false);
        return {};
    }

    auto error = tr_error{};
    auto const n_written = socket_.try_write(buf, max, &error);
    // enable further writes if there's more data to write
    set_enabled(Dir, !std::empty(buf) && (!error || can_retry_from_error(error.code())));

    if (error)
    {
        if (!can_retry_from_error(error.code()))
        {
            tr_logAddTraceIo(
                this,
                fmt::format("try_write err: wrote:{}, errno:{} ({})", n_written, error.code(), error.message()));
            call_error_callback(error);
        }
    }
    else if (n_written > 0U)
    {
        did_write_wrapper(n_written);
    }

    return n_written;
}

void tr_peerIo::event_write_cb([[maybe_unused]] evutil_socket_t fd, short /*event*/, void* vio)
{
    auto* const io = static_cast<tr_peerIo*>(vio);
    tr_logAddTraceIo(io, "libevent says this peer socket is ready for writing");

    TR_ASSERT(io->socket_.is_tcp());
    TR_ASSERT(io->socket_.handle.tcp == fd);

    io->pending_events_ &= ~EV_WRITE;

    // Write as much as possible. Since the socket is non-blocking,
    // write() will return if it can't write any more without blocking
    io->try_write(SIZE_MAX);
}

// ---
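
// Feed buffered input to the can_read_ callback until it stops making
// progress, reporting the consumed bytes to the bandwidth accounting as
// it goes.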
void tr_peerIo::can_read_wrapper(size_t bytes_transferred)
{
    // try to consume the input buffer

    if (can_read_ == nullptr)
    {
        return;
    }

    auto const lock = session_->unique_lock();
    auto const keep_alive = shared_from_this();

    auto const now = tr_time_msec();
    auto done = false;
    auto err = false;

    if (socket_.is_tcp() && bytes_transferred > 0U)
    {
        bandwidth().notify_bandwidth_consumed(TR_DOWN, bytes_transferred, false, now);
    }

    // In normal conditions, only continue processing if we still have bandwidth
    // quota for it.
    //
    // The read buffer will grow indefinitely if libutp keeps buffering data faster
    // than the bandwidth limit allows. To safeguard against that, we keep
    // processing if the read buffer is more than twice as large as the target size.
    while (!done && !err &&
           (socket_.is_tcp() || read_buffer_size() > RcvBuf * 2U || bandwidth().clamp(TR_DOWN, read_buffer_size()) != 0U))
    {
        auto piece = size_t{};
        auto const old_size = read_buffer_size();
        auto const read_state = can_read_ != nullptr ? can_read_(this, user_data_, &piece) : ReadState::Err;
        auto const used = old_size - read_buffer_size();

        if (piece > 0U)
        {
            bandwidth().notify_bandwidth_consumed(TR_DOWN, piece, true, now);
        }

        if (socket_.is_utp() && used > 0U)
        {
            bandwidth().notify_bandwidth_consumed(TR_DOWN, used, false, now);
        }

        switch (read_state)
        {
        case ReadState::Now:
        case ReadState::Break:
            if (std::empty(inbuf_))
            {
                done = true;
            }
            break;

        case ReadState::Later:
            done = true;
            break;

        case ReadState::Err:
            err = true;
            break;
        }
    }
}

size_t tr_peerIo::try_read(size_t max)
{
    static auto constexpr Dir = TR_DOWN;

    if (max == 0U)
    {
        return {};
    }

    // Do not read more than the bandwidth allows.
    // If there is no bandwidth left available, disable reads.
    max = bandwidth().clamp(Dir, max);
    if (max == 0U)
    {
        set_enabled(Dir, false);
        return {};
    }

    auto& buf = inbuf_;
    auto error = tr_error{};
    auto const n_read = socket_.try_read(buf, max, std::empty(buf), &error);
    set_enabled(Dir, !error || can_retry_from_error(error.code()));

    if (error)
    {
        if (!can_retry_from_error(error.code()))
        {
            tr_logAddTraceIo(this, fmt::format("try_read err: n_read:{} errno:{} ({})", n_read, error.code(), error.message()));
            call_error_callback(error);
        }
    }
    else if (!std::empty(buf))
    {
        can_read_wrapper(n_read);
    }

    return n_read;
}

void tr_peerIo::event_read_cb([[maybe_unused]] evutil_socket_t fd, short /*event*/, void* vio)
{
    static auto constexpr MaxLen = RcvBuf;

    auto* const io = static_cast<tr_peerIo*>(vio);
    tr_logAddTraceIo(io, "libevent says this peer socket is ready for reading");

    TR_ASSERT(io->socket_.is_tcp());
    TR_ASSERT(io->socket_.handle.tcp == fd);

    io->pending_events_ &= ~EV_READ;

    // don't read more than the input buffer has room for
    auto const n_used = std::size(io->inbuf_);
    auto const n_left = n_used >= MaxLen ? 0U : MaxLen - n_used;
    io->try_read(n_left);
}

// ---
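
// For TCP sockets these helpers toggle libevent polling; for µTP sockets
// only the pending_events_ bookkeeping changes, since µTP I/O is driven by
// libutp callbacks instead of the event loop.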
void tr_peerIo::event_enable(short event)
{
    TR_ASSERT(session_ != nullptr);

    bool const need_events = socket_.is_tcp();
    TR_ASSERT(!need_events || event_read_);
    TR_ASSERT(!need_events || event_write_);

    if ((event & EV_READ) != 0 && (pending_events_ & EV_READ) == 0)
    {
        tr_logAddTraceIo(this, "enabling ready-to-read polling");

        if (need_events)
        {
            event_add(event_read_.get(), nullptr);
        }

        pending_events_ |= EV_READ;
    }

    if ((event & EV_WRITE) != 0 && (pending_events_ & EV_WRITE) == 0)
    {
        tr_logAddTraceIo(this, "enabling ready-to-write polling");

        if (need_events)
        {
            event_add(event_write_.get(), nullptr);
        }

        pending_events_ |= EV_WRITE;
    }
}

void tr_peerIo::event_disable(short event)
{
    bool const need_events = socket_.is_tcp();
    TR_ASSERT(!need_events || event_read_);
    TR_ASSERT(!need_events || event_write_);

    if ((event & EV_READ) != 0 && (pending_events_ & EV_READ) != 0)
    {
        tr_logAddTraceIo(this, "disabling ready-to-read polling");

        if (need_events)
        {
            event_del(event_read_.get());
        }

        pending_events_ &= ~EV_READ;
    }

    if ((event & EV_WRITE) != 0 && (pending_events_ & EV_WRITE) != 0)
    {
        tr_logAddTraceIo(this, "disabling ready-to-write polling");

        if (need_events)
        {
            event_del(event_write_.get());
        }

        pending_events_ &= ~EV_WRITE;
    }
}

void tr_peerIo::set_enabled(tr_direction dir, bool is_enabled)
{
    TR_ASSERT(tr_isDirection(dir));

    short const event = dir == TR_UP ? EV_WRITE : EV_READ;

    if (is_enabled)
    {
        event_enable(event);
    }
    else
    {
        event_disable(event);
    }
}

size_t tr_peerIo::flush(tr_direction dir, size_t limit)
{
    TR_ASSERT(tr_isDirection(dir));

    return dir == TR_DOWN ? try_read(limit) : try_write(limit);
}

size_t tr_peerIo::flush_outgoing_protocol_msgs()
{
    size_t byte_count = 0U;

    /* count up how many bytes are used by non-piece-data messages
       at the front of our outbound queue */
    for (auto const& [n_bytes, is_piece_data] : outbuf_info_)
    {
        if (is_piece_data)
        {
            break;
        }

        byte_count += n_bytes;
    }

    return flush(TR_UP, byte_count);
}
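
// Encrypt-and-enqueue outgoing bytes, then schedule a flush on the session
// thread. A weak_ptr is captured so the deferred flush becomes a no-op if
// this io is destroyed before the session thread gets to it.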
void tr_peerIo::write_bytes(void const* bytes, size_t n_bytes, bool is_piece_data)
{
    if (n_bytes == 0U)
    {
        return;
    }

    outbuf_info_.emplace_back(n_bytes, is_piece_data);

    auto [resbuf, reslen] = outbuf_.reserve_space(n_bytes);
    filter_.encrypt(reinterpret_cast<std::byte const*>(bytes), n_bytes, resbuf);
    outbuf_.commit_space(n_bytes);

    session_->queue_session_thread(
        [ptr = std::weak_ptr{ shared_from_this() }]()
        {
            if (auto io = ptr.lock(); io)
            {
                io->try_write(SIZE_MAX);
            }
        });
}

// ---

size_t tr_peerIo::get_write_buffer_space(uint64_t now) const noexcept
{
    size_t const desired_len = get_desired_output_buffer_size(this, now);
    size_t const current_len = std::size(outbuf_);
    return desired_len > current_len ? desired_len - current_len : 0U;
}

// ---
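
// Pop up to n_bytes of input into `bytes`, decrypting as needed (a nullptr
// destination discards the data). n_decrypt_remain_ appears to cover the case
// where only a prefix of the remaining stream is encrypted, e.g. around the
// encryption handshake: once it hits zero, decryption is switched off.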
void tr_peerIo::read_bytes(void* bytes, size_t n_bytes)
{
    auto walk = reinterpret_cast<std::byte*>(bytes);
    n_bytes = std::min(n_bytes, std::size(inbuf_));
    if (n_decrypt_remain_)
    {
        if (auto& n_remain = *n_decrypt_remain_; n_remain <= n_bytes)
        {
            filter_.decrypt(std::data(inbuf_), n_remain, walk);
            inbuf_.drain(n_remain);
            if (walk != nullptr)
            {
                walk += n_remain;
            }
            n_bytes -= n_remain;
            filter_.decrypt_disable();
            n_decrypt_remain_.reset();
        }
        else
        {
            n_remain -= n_bytes;
        }
    }
    filter_.decrypt(std::data(inbuf_), n_bytes, walk);
    inbuf_.drain(n_bytes);
}

void tr_peerIo::read_uint16(uint16_t* setme)
{
    auto tmp = uint16_t{};
    read_bytes(&tmp, sizeof(tmp));
    *setme = ntohs(tmp);
}

void tr_peerIo::read_uint32(uint32_t* setme)
{
    auto tmp = uint32_t{};
    read_bytes(&tmp, sizeof(tmp));
    *setme = ntohl(tmp);
}

// --- UTP

#ifdef WITH_UTP
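
// libutp reports socket lifecycle changes here. EOF is surfaced to the owner
// as an ENOTCONN error so that it shares the normal error-callback path.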
void tr_peerIo::on_utp_state_change(int state)
{
    if (state == UTP_STATE_CONNECT)
    {
        tr_logAddTraceIo(this, "utp_on_state_change -- changed to connected");
    }
    else if (state == UTP_STATE_WRITABLE)
    {
        tr_logAddTraceIo(this, "utp_on_state_change -- changed to writable");

        if ((pending_events_ & EV_WRITE) != 0)
        {
            try_write(SIZE_MAX);
        }
    }
    else if (state == UTP_STATE_EOF)
    {
        auto error = tr_error{};
        error.set_from_errno(ENOTCONN);
        call_error_callback(error);
    }
    else if (state == UTP_STATE_DESTROYING)
    {
        tr_logAddErrorIo(this, "Impossible state UTP_STATE_DESTROYING");
    }
    else
    {
        tr_logAddErrorIo(this, fmt::format(fmt::runtime(_("Unknown state: {state}")), fmt::arg("state", state)));
    }
}

void tr_peerIo::on_utp_error(int errcode)
{
    tr_logAddTraceIo(this, fmt::format("utp_on_error -- {}", utp_error_code_names[errcode]));

    if (got_error_ == nullptr)
    {
        return;
    }

    auto error = tr_error{};
    switch (errcode)
    {
    case UTP_ECONNREFUSED:
        error.set_from_errno(ECONNREFUSED);
        break;
    case UTP_ECONNRESET:
        error.set_from_errno(ECONNRESET);
        break;
    case UTP_ETIMEDOUT:
        error.set_from_errno(ETIMEDOUT);
        break;
    default:
        error.set(errcode, utp_error_code_names[errcode]);
        break;
    }

    call_error_callback(error);
}

#endif /* #ifdef WITH_UTP */
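
// Wire up the context-wide libutp callbacks. These are registered once per
// utp_context rather than per socket; each callback recovers the owning
// tr_peerIo from the socket's userdata, which set_socket() filled in.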
void tr_peerIo::utp_init([[maybe_unused]] struct_utp_context* ctx)
{
#ifdef WITH_UTP
    utp_context_set_option(ctx, UTP_RCVBUF, RcvBuf);

    // note: all the callback handlers here need to check `userdata` for nullptr
    // because libutp can fire callbacks on a socket after utp_close() is called

    utp_set_callback(
        ctx,
        UTP_ON_READ,
        [](utp_callback_arguments* args) -> uint64
        {
            if (auto* const io = static_cast<tr_peerIo*>(utp_get_userdata(args->socket)); io != nullptr)
            {
                // The peer io object can destruct inside can_read_wrapper(), so keep
                // it alive for the duration of this code block. This can happen when
                // a BT handshake did not complete successfully, for example.
                auto const keep_alive = io->shared_from_this();

                io->inbuf_.add(args->buf, args->len);
                io->set_enabled(TR_DOWN, true);
                io->can_read_wrapper(args->len);

                // utp_read_drained() notifies libutp that we read a packet from them.
                // It opens up the congestion window by sending an ACK (soonish) if
                // one was not going to be sent.
                utp_read_drained(args->socket);
            }
            return {};
        });

    utp_set_callback(
        ctx,
        UTP_GET_READ_BUFFER_SIZE,
        [](utp_callback_arguments* args) -> uint64
        {
            if (auto const* const io = static_cast<tr_peerIo*>(utp_get_userdata(args->socket)); io != nullptr)
            {
                return io->read_buffer_size();
            }
            return {};
        });

    utp_set_callback(
        ctx,
        UTP_ON_ERROR,
        [](utp_callback_arguments* args) -> uint64
        {
            if (auto* const io = static_cast<tr_peerIo*>(utp_get_userdata(args->socket)); io != nullptr)
            {
                io->on_utp_error(args->error_code);
            }
            return {};
        });

    utp_set_callback(
        ctx,
        UTP_ON_STATE_CHANGE,
        [](utp_callback_arguments* args) -> uint64
        {
            if (auto* const io = static_cast<tr_peerIo*>(utp_get_userdata(args->socket)); io != nullptr)
            {
                io->on_utp_state_change(args->state);
            }
            return {};
        });
#endif
}