refactor: allow explicitly queuing functions in session thread (#6406)

* feat: allow explicitly queuing functions in session thread

* refactor: rename `tr_session_thread`-related functions to snake_case
This commit is contained in:
Yat Ho
2023-12-24 22:32:14 +08:00
committed by GitHub
parent 9932f567fb
commit d2d7987553
8 changed files with 61 additions and 50 deletions

View File

@@ -759,7 +759,7 @@ void tr_rpc_server::set_enabled(bool is_enabled)
{ {
is_enabled_ = is_enabled; is_enabled_ = is_enabled;
session->runInSessionThread( session->run_in_session_thread(
[this]() [this]()
{ {
if (!is_enabled_) if (!is_enabled_)
@@ -784,7 +784,7 @@ void tr_rpc_server::set_port(tr_port port) noexcept
if (is_enabled()) if (is_enabled())
{ {
session->runInSessionThread(&restart_server, this); session->run_in_session_thread(&restart_server, this);
} }
} }
@@ -894,7 +894,7 @@ void tr_rpc_server::load(tr_variant const& src)
{ {
auto const rpc_uri = bind_address_->to_string(this->port()) + this->url_; auto const rpc_uri = bind_address_->to_string(this->port()) + this->url_;
tr_logAddInfo(fmt::format(_("Serving RPC and Web requests on {address}"), fmt::arg("address", rpc_uri))); tr_logAddInfo(fmt::format(_("Serving RPC and Web requests on {address}"), fmt::arg("address", rpc_uri)));
session->runInSessionThread(start_server, this); session->run_in_session_thread(start_server, this);
if (this->is_whitelist_enabled()) if (this->is_whitelist_enabled())
{ {

View File

@@ -107,7 +107,7 @@ unsigned long thread_current_id()
return hashed; return hashed;
} }
void initEvthreadsOnce() void init_evthreads_once()
{ {
evthread_lock_callbacks constexpr LockCbs{ evthread_lock_callbacks constexpr LockCbs{
EVTHREAD_LOCK_API_VERSION, EVTHREAD_LOCKTYPE_RECURSIVE, lock_alloc, lock_free, lock_lock, lock_unlock EVTHREAD_LOCK_API_VERSION, EVTHREAD_LOCKTYPE_RECURSIVE, lock_alloc, lock_free, lock_lock, lock_unlock
@@ -126,7 +126,7 @@ void initEvthreadsOnce()
} // namespace tr_evthread_init_helpers } // namespace tr_evthread_init_helpers
auto makeEventBase() auto make_event_base()
{ {
tr_session_thread::tr_evthread_init(); tr_session_thread::tr_evthread_init();
@@ -142,7 +142,7 @@ void tr_session_thread::tr_evthread_init()
using namespace tr_evthread_init_helpers; using namespace tr_evthread_init_helpers;
static auto evthread_flag = std::once_flag{}; static auto evthread_flag = std::once_flag{};
std::call_once(evthread_flag, initEvthreadsOnce); std::call_once(evthread_flag, init_evthreads_once);
} }
class tr_session_thread_impl final : public tr_session_thread class tr_session_thread_impl final : public tr_session_thread
@@ -152,7 +152,7 @@ public:
{ {
auto lock = std::unique_lock(is_looping_mutex_); auto lock = std::unique_lock(is_looping_mutex_);
thread_ = std::thread(&tr_session_thread_impl::sessionThreadFunc, this, event_base()); thread_ = std::thread(&tr_session_thread_impl::session_thread_func, this, event_base());
thread_id_ = thread_.get_id(); thread_id_ = thread_.get_id();
// wait for the session thread's main loop to start // wait for the session thread's main loop to start
@@ -170,7 +170,7 @@ public:
TR_ASSERT(is_looping_); TR_ASSERT(is_looping_);
// Stop the first event loop. This is the steady-state loop that runs // Stop the first event loop. This is the steady-state loop that runs
// continuously, even when there are no events. See: sessionThreadFunc() // continuously, even when there are no events. See: session_thread_func()
is_shutting_down_ = true; is_shutting_down_ = true;
event_base_loopexit(event_base(), nullptr); event_base_loopexit(event_base(), nullptr);
@@ -193,6 +193,15 @@ public:
return thread_id_ == std::this_thread::get_id(); return thread_id_ == std::this_thread::get_id();
} }
void queue(std::function<void(void)>&& func) override
{
work_queue_mutex_.lock();
work_queue_.emplace_back(std::move(func));
work_queue_mutex_.unlock();
event_active(work_queue_event_.get(), 0, {});
}
void run(std::function<void(void)>&& func) override void run(std::function<void(void)>&& func) override
{ {
if (am_in_session_thread()) if (am_in_session_thread())
@@ -201,11 +210,7 @@ public:
} }
else else
{ {
work_queue_mutex_.lock(); queue(std::move(func));
work_queue_.emplace_back(std::move(func));
work_queue_mutex_.unlock();
event_active(work_queue_event_.get(), 0, {});
} }
} }
@@ -213,7 +218,7 @@ private:
using callback = std::function<void(void)>; using callback = std::function<void(void)>;
using work_queue_t = std::list<callback>; using work_queue_t = std::list<callback>;
void sessionThreadFunc(struct event_base* evbase) void session_thread_func(struct event_base* evbase)
{ {
#ifndef _WIN32 #ifndef _WIN32
/* Don't exit when writing on a broken socket */ /* Don't exit when writing on a broken socket */
@@ -247,11 +252,11 @@ private:
ToggleLooping({}, {}, this); ToggleLooping({}, {}, this);
} }
static void onWorkAvailableStatic(evutil_socket_t /*fd*/, short /*flags*/, void* vself) static void on_work_available_static(evutil_socket_t /*fd*/, short /*flags*/, void* vself)
{ {
static_cast<tr_session_thread_impl*>(vself)->onWorkAvailable(); static_cast<tr_session_thread_impl*>(vself)->on_work_available();
} }
void onWorkAvailable() void on_work_available()
{ {
TR_ASSERT(am_in_session_thread()); TR_ASSERT(am_in_session_thread());
@@ -268,9 +273,9 @@ private:
} }
} }
libtransmission::evhelpers::evbase_unique_ptr const evbase_{ makeEventBase() }; libtransmission::evhelpers::evbase_unique_ptr const evbase_{ make_event_base() };
libtransmission::evhelpers::event_unique_ptr const work_queue_event_{ libtransmission::evhelpers::event_unique_ptr const work_queue_event_{
event_new(evbase_.get(), -1, 0, onWorkAvailableStatic, this) event_new(evbase_.get(), -1, 0, on_work_available_static, this)
}; };
work_queue_t work_queue_; work_queue_t work_queue_;

View File

@@ -28,15 +28,19 @@ public:
[[nodiscard]] virtual bool am_in_session_thread() const noexcept = 0; [[nodiscard]] virtual bool am_in_session_thread() const noexcept = 0;
virtual void queue(std::function<void(void)>&& func) = 0;
virtual void run(std::function<void(void)>&& func) = 0; virtual void run(std::function<void(void)>&& func) = 0;
template<typename Func, typename... Args>
void queue(Func&& func, Args&&... args)
{
queue(std::function<void(void)>{ std::bind(std::forward<Func>(func), std::forward<Args>(args)...) });
}
template<typename Func, typename... Args> template<typename Func, typename... Args>
void run(Func&& func, Args&&... args) void run(Func&& func, Args&&... args)
{ {
run(std::function<void(void)>{ run(std::function<void(void)>{ std::bind(std::forward<Func>(func), std::forward<Args>(args)...) });
[func = std::forward<Func&&>(func), args = std::make_tuple(std::forward<Args>(args)...)]()
{
std::apply(std::move(func), std::move(args));
} });
} }
}; };

View File

@@ -354,7 +354,7 @@ void tr_session::WebMediator::notifyBandwidthConsumed(int torrent_id, size_t byt
void tr_session::WebMediator::run(tr_web::FetchDoneFunc&& func, tr_web::FetchResponse&& response) const void tr_session::WebMediator::run(tr_web::FetchDoneFunc&& func, tr_web::FetchResponse&& response) const
{ {
session_->runInSessionThread(std::move(func), std::move(response)); session_->run_in_session_thread(std::move(func), std::move(response));
} }
time_t tr_session::WebMediator::now() const time_t tr_session::WebMediator::now() const
@@ -573,7 +573,7 @@ tr_session* tr_sessionInit(char const* config_dir, bool message_queueing_enabled
// run initImpl() in the libtransmission thread // run initImpl() in the libtransmission thread
auto data = tr_session::init_data{ message_queueing_enabled, config_dir, settings }; auto data = tr_session::init_data{ message_queueing_enabled, config_dir, settings };
auto lock = session->unique_lock(); auto lock = session->unique_lock();
session->runInSessionThread([&session, &data]() { session->initImpl(data); }); session->run_in_session_thread([&session, &data]() { session->initImpl(data); });
data.done_cv.wait(lock); // wait for the session to be ready data.done_cv.wait(lock); // wait for the session to be ready
return session; return session;
@@ -876,7 +876,7 @@ void tr_sessionSet(tr_session* session, tr_variant const& settings)
// do the work in the session thread // do the work in the session thread
auto done_promise = std::promise<void>{}; auto done_promise = std::promise<void>{};
auto done_future = done_promise.get_future(); auto done_future = done_promise.get_future();
session->runInSessionThread( session->run_in_session_thread(
[&session, &settings, &done_promise]() [&session, &settings, &done_promise]()
{ {
session->setSettings(settings, false); session->setSettings(settings, false);
@@ -962,7 +962,7 @@ void tr_sessionSetPeerPort(tr_session* session, uint16_t hport)
if (auto const port = tr_port::from_host(hport); port != session->localPeerPort()) if (auto const port = tr_port::from_host(hport); port != session->localPeerPort())
{ {
session->runInSessionThread( session->run_in_session_thread(
[session, port]() [session, port]()
{ {
auto settings = session->settings_; auto settings = session->settings_;
@@ -1112,7 +1112,7 @@ void tr_session::AltSpeedMediator::is_active_changed(bool is_active, tr_session_
} }
}; };
session_.runInSessionThread(in_session_thread); session_.run_in_session_thread(in_session_thread);
} }
// --- Session primary speed limits // --- Session primary speed limits
@@ -1412,7 +1412,8 @@ void tr_sessionClose(tr_session* session, size_t timeout_secs)
auto closed_promise = std::promise<void>{}; auto closed_promise = std::promise<void>{};
auto closed_future = closed_promise.get_future(); auto closed_future = closed_promise.get_future();
auto const deadline = std::chrono::steady_clock::now() + std::chrono::seconds{ timeout_secs }; auto const deadline = std::chrono::steady_clock::now() + std::chrono::seconds{ timeout_secs };
session->runInSessionThread([&closed_promise, deadline, session]() { session->closeImplPart1(&closed_promise, deadline); }); session->run_in_session_thread([&closed_promise, deadline, session]()
{ session->closeImplPart1(&closed_promise, deadline); });
closed_future.wait(); closed_future.wait();
delete session; delete session;
@@ -1469,7 +1470,7 @@ size_t tr_sessionLoadTorrents(tr_session* session, tr_ctor* ctor)
auto loaded_promise = std::promise<size_t>{}; auto loaded_promise = std::promise<size_t>{};
auto loaded_future = loaded_promise.get_future(); auto loaded_future = loaded_promise.get_future();
session->runInSessionThread(session_load_torrents, session, ctor, &loaded_promise); session->run_in_session_thread(session_load_torrents, session, ctor, &loaded_promise);
loaded_future.wait(); loaded_future.wait();
auto const n_torrents = loaded_future.get(); auto const n_torrents = loaded_future.get();
@@ -1518,7 +1519,7 @@ void tr_sessionSetDHTEnabled(tr_session* session, bool enabled)
if (enabled != session->allowsDHT()) if (enabled != session->allowsDHT())
{ {
session->runInSessionThread( session->run_in_session_thread(
[session, enabled]() [session, enabled]()
{ {
auto settings = session->settings_; auto settings = session->settings_;
@@ -1555,7 +1556,7 @@ void tr_sessionSetUTPEnabled(tr_session* session, bool enabled)
return; return;
} }
session->runInSessionThread( session->run_in_session_thread(
[session, enabled]() [session, enabled]()
{ {
auto settings = session->settings_; auto settings = session->settings_;
@@ -1570,7 +1571,7 @@ void tr_sessionSetLPDEnabled(tr_session* session, bool enabled)
if (enabled != session->allowsLPD()) if (enabled != session->allowsLPD())
{ {
session->runInSessionThread( session->run_in_session_thread(
[session, enabled]() [session, enabled]()
{ {
auto settings = session->settings_; auto settings = session->settings_;
@@ -1655,7 +1656,7 @@ tr_bandwidth& tr_session::getBandwidthGroup(std::string_view name)
void tr_sessionSetPortForwardingEnabled(tr_session* session, bool enabled) void tr_sessionSetPortForwardingEnabled(tr_session* session, bool enabled)
{ {
session->runInSessionThread( session->run_in_session_thread(
[session, enabled]() [session, enabled]()
{ {
session->settings_.port_forwarding_enabled = enabled; session->settings_.port_forwarding_enabled = enabled;

View File

@@ -366,15 +366,16 @@ public:
return session_thread_->am_in_session_thread(); return session_thread_->am_in_session_thread();
} }
void runInSessionThread(std::function<void(void)>&& func) template<typename Func, typename... Args>
void queue_session_thread(Func&& func, Args&&... args)
{ {
session_thread_->run(std::move(func)); session_thread_->queue(std::forward<Func>(func), std::forward<Args>(args)...);
} }
template<typename Func, typename... Args> template<typename Func, typename... Args>
void runInSessionThread(Func&& func, Args&&... args) void run_in_session_thread(Func&& func, Args&&... args)
{ {
session_thread_->run(std::forward<Func&&>(func), std::forward<Args>(args)...); session_thread_->run(std::forward<Func>(func), std::forward<Args>(args)...);
} }
[[nodiscard]] auto* event_base() noexcept [[nodiscard]] auto* event_base() noexcept

View File

@@ -710,7 +710,7 @@ void tr_torrent::start(bool bypass_queue, std::optional<bool> has_any_local_data
is_running_ = true; is_running_ = true;
set_dirty(); set_dirty();
session->runInSessionThread([this]() { start_in_session_thread(); }); session->run_in_session_thread([this]() { start_in_session_thread(); });
} }
void tr_torrent::start_in_session_thread() void tr_torrent::start_in_session_thread()
@@ -786,7 +786,7 @@ void tr_torrentStop(tr_torrent* tor)
tor->start_when_stable_ = false; tor->start_when_stable_ = false;
tor->set_dirty(); tor->set_dirty();
tor->session->runInSessionThread([tor]() { tor->stop_now(); }); tor->session->run_in_session_thread([tor]() { tor->stop_now(); });
} }
void tr_torrentRemove(tr_torrent* tor, bool delete_flag, tr_fileFunc delete_func, void* user_data) void tr_torrentRemove(tr_torrent* tor, bool delete_flag, tr_fileFunc delete_func, void* user_data)
@@ -797,7 +797,7 @@ void tr_torrentRemove(tr_torrent* tor, bool delete_flag, tr_fileFunc delete_func
tor->is_deleting_ = true; tor->is_deleting_ = true;
tor->session->runInSessionThread(removeTorrentInSessionThread, tor, delete_flag, delete_func, user_data); tor->session->run_in_session_thread(removeTorrentInSessionThread, tor, delete_flag, delete_func, user_data);
} }
void tr_torrentFreeInSessionThread(tr_torrent* tor) void tr_torrentFreeInSessionThread(tr_torrent* tor)
@@ -1175,7 +1175,7 @@ void tr_torrent::set_location(std::string_view location, bool move_from_old_path
*setme_state = TR_LOC_MOVING; *setme_state = TR_LOC_MOVING;
} }
session->runInSessionThread([this, loc = std::string(location), move_from_old_path, setme_state]() session->run_in_session_thread([this, loc = std::string(location), move_from_old_path, setme_state]()
{ set_location_in_session_thread(loc, move_from_old_path, setme_state); }); { set_location_in_session_thread(loc, move_from_old_path, setme_state); });
} }
@@ -1263,7 +1263,7 @@ void tr_torrentManualUpdate(tr_torrent* tor)
TR_ASSERT(tr_isTorrent(tor)); TR_ASSERT(tr_isTorrent(tor));
tor->session->runInSessionThread(torrentManualUpdateImpl, tor); tor->session->run_in_session_thread(torrentManualUpdateImpl, tor);
} }
bool tr_torrentCanManualUpdate(tr_torrent const* tor) bool tr_torrentCanManualUpdate(tr_torrent const* tor)
@@ -1539,7 +1539,7 @@ void tr_torrentStartNow(tr_torrent* tor)
void tr_torrentVerify(tr_torrent* tor) void tr_torrentVerify(tr_torrent* tor)
{ {
tor->session->runInSessionThread( tor->session->run_in_session_thread(
[tor]() [tor]()
{ {
TR_ASSERT(tor->session->am_in_session_thread()); TR_ASSERT(tor->session->am_in_session_thread());
@@ -1646,7 +1646,7 @@ void tr_torrent::VerifyMediator::on_verify_done(bool const aborted)
if (!aborted && !tor_->is_deleting_) if (!aborted && !tor_->is_deleting_)
{ {
tor_->session->runInSessionThread( tor_->session->run_in_session_thread(
[tor = tor_]() [tor = tor_]()
{ {
if (tor->is_deleting_) if (tor->is_deleting_)
@@ -2493,7 +2493,7 @@ void tr_torrent::rename_path(
tr_torrent_rename_done_func callback, tr_torrent_rename_done_func callback,
void* callback_user_data) void* callback_user_data)
{ {
this->session->runInSessionThread( this->session->run_in_session_thread(
[this, oldpath = std::string(oldpath), newname = std::string(newname), callback, callback_user_data]() [this, oldpath = std::string(oldpath), newname = std::string(newname), callback, callback_user_data]()
{ rename_path_in_session_thread(oldpath, newname, callback, callback_user_data); }); { rename_path_in_session_thread(oldpath, newname, callback, callback_user_data); });
} }

View File

@@ -386,7 +386,7 @@ void useFetchedBlocks(tr_webseed_task* task)
auto block_buf = std::make_unique<Cache::BlockData>(block_size); auto block_buf = std::make_unique<Cache::BlockData>(block_size);
evbuffer_remove(task->content(), std::data(*block_buf), std::size(*block_buf)); evbuffer_remove(task->content(), std::data(*block_buf), std::size(*block_buf));
auto* const data = new write_block_data{ session, tor->id(), task->loc.block, std::move(block_buf), webseed }; auto* const data = new write_block_data{ session, tor->id(), task->loc.block, std::move(block_buf), webseed };
session->runInSessionThread(&write_block_data::write_block_func, data); session->run_in_session_thread(&write_block_data::write_block_func, data);
} }
task->loc = tor->byte_loc(task->loc.byte + block_size); task->loc = tor->byte_loc(task->loc.byte + block_size);

View File

@@ -106,7 +106,7 @@ TEST_P(IncompleteDirTest, incompleteDir)
std::fill_n(std::data(*data.buf), tr_block_info::BlockSize, '\0'); std::fill_n(std::data(*data.buf), tr_block_info::BlockSize, '\0');
data.block = block_index; data.block = block_index;
data.done = false; data.done = false;
session_->runInSessionThread(test_incomplete_dir_threadfunc, &data); session_->run_in_session_thread(test_incomplete_dir_threadfunc, &data);
auto const test = [&data]() auto const test = [&data]()
{ {