refactor: watchdir (#3606)

This commit is contained in:
Charles Kerr
2022-08-10 08:34:51 -05:00
committed by GitHub
parent b1cc968969
commit c66303fae2
21 changed files with 1063 additions and 1396 deletions

View File

@@ -31,6 +31,7 @@
#include <libtransmission/error.h>
#include <libtransmission/file.h>
#include <libtransmission/log.h>
#include <libtransmission/timer-ev.h>
#include <libtransmission/tr-getopt.h>
#include <libtransmission/tr-macros.h>
#include <libtransmission/tr-strbuf.h>
@@ -60,6 +61,7 @@ static void sd_notifyf(int /*status*/, char const* /*fmt*/, ...)
#endif
using namespace std::literals;
using libtransmission::Watchdir;
/***
****
@@ -226,28 +228,30 @@ static char const* getConfigDir(int argc, char const* const* argv)
return configDir;
}
static auto onFileAdded(tr_watchdir_t dir, char const* name, void* vsession)
static auto onFileAdded(tr_session* session, std::string_view dirname, std::string_view basename)
{
auto const* session = static_cast<tr_session const*>(vsession);
auto const lowercase = tr_strlower(basename);
auto const is_torrent = tr_strvEndsWith(lowercase, ".torrent"sv);
auto const is_magnet = tr_strvEndsWith(lowercase, ".magnet"sv);
if (!tr_str_has_suffix(name, ".torrent") && !tr_str_has_suffix(name, ".magnet"))
if (!is_torrent && !is_magnet)
{
return TR_WATCHDIR_IGNORE;
return Watchdir::Action::Done;
}
auto const filename = tr_pathbuf{ tr_watchdir_get_path(dir), '/', name };
auto const filename = tr_pathbuf{ dirname, '/', basename };
tr_ctor* const ctor = tr_ctorNew(session);
bool retry = false;
if (tr_str_has_suffix(name, ".torrent"))
if (is_torrent)
{
if (!tr_ctorSetMetainfoFromFile(ctor, filename, nullptr))
{
retry = true;
}
}
else // ".magnet" suffix
else // is_magnet
{
auto content = std::vector<char>{};
tr_error* error = nullptr;
@@ -255,7 +259,7 @@ static auto onFileAdded(tr_watchdir_t dir, char const* name, void* vsession)
{
tr_logAddWarn(fmt::format(
_("Couldn't read '{path}': {error} ({error_code})"),
fmt::arg("path", name),
fmt::arg("path", basename),
fmt::arg("error", error->message),
fmt::arg("error_code", error->code)));
tr_error_free(error);
@@ -275,12 +279,12 @@ static auto onFileAdded(tr_watchdir_t dir, char const* name, void* vsession)
if (retry)
{
tr_ctorFree(ctor);
return TR_WATCHDIR_RETRY;
return Watchdir::Action::Retry;
}
if (tr_torrentNew(ctor, nullptr) == nullptr)
{
tr_logAddError(fmt::format(_("Couldn't add torrent file '{path}'"), fmt::arg("path", name)));
tr_logAddError(fmt::format(_("Couldn't add torrent file '{path}'"), fmt::arg("path", basename)));
}
else
{
@@ -291,13 +295,13 @@ static auto onFileAdded(tr_watchdir_t dir, char const* name, void* vsession)
{
tr_error* error = nullptr;
tr_logAddInfo(fmt::format(_("Removing torrent file '{path}'"), fmt::arg("path", name)));
tr_logAddInfo(fmt::format(_("Removing torrent file '{path}'"), fmt::arg("path", basename)));
if (!tr_sys_path_remove(filename, &error))
{
tr_logAddError(fmt::format(
_("Couldn't remove '{path}': {error} ({error_code})"),
fmt::arg("path", name),
fmt::arg("path", basename),
fmt::arg("error", error->message),
fmt::arg("error_code", error->code)));
tr_error_free(error);
@@ -310,7 +314,7 @@ static auto onFileAdded(tr_watchdir_t dir, char const* name, void* vsession)
}
tr_ctorFree(ctor);
return TR_WATCHDIR_ACCEPT;
return Watchdir::Action::Done;
}
static char const* levelName(tr_log_level level)
@@ -712,7 +716,7 @@ static int daemon_start(void* varg, [[maybe_unused]] bool foreground)
bool pidfile_created = false;
tr_session* session = nullptr;
struct event* status_ev = nullptr;
tr_watchdir_t watchdir = nullptr;
auto watchdir = std::unique_ptr<Watchdir>{};
auto* arg = static_cast<daemon_data*>(varg);
tr_variant* const settings = &arg->settings;
@@ -802,11 +806,14 @@ static int daemon_start(void* varg, [[maybe_unused]] bool foreground)
{
tr_logAddInfo(fmt::format(_("Watching '{path}' for new torrent files"), fmt::arg("path", dir)));
watchdir = tr_watchdir_new(dir, &onFileAdded, mySession, ev_base, force_generic);
if (watchdir == nullptr)
auto handler = [session](std::string_view dirname, std::string_view basename)
{
goto CLEANUP;
}
return onFileAdded(session, dirname, basename);
};
auto timer_maker = libtransmission::EvTimerMaker{ ev_base };
watchdir = force_generic ? Watchdir::createGeneric(dir, handler, timer_maker) :
Watchdir::create(dir, handler, timer_maker, ev_base);
}
}
@@ -877,7 +884,7 @@ CLEANUP:
sd_notify(0, "STATUS=Closing transmission session...\n");
printf("Closing transmission session...");
tr_watchdir_free(watchdir);
watchdir.reset();
if (status_ev != nullptr)
{

View File

@@ -208,7 +208,7 @@ set(${PROJECT_NAME}_PRIVATE_HEADERS
variant-common.h
verify.h
version.h
watchdir-common.h
watchdir-base.h
webseed.h
)

View File

@@ -18,8 +18,6 @@
#include <string_view>
#include <vector>
#include <event2/buffer.h>
#include <fmt/core.h>
#define LIBTRANSMISSION_ANNOUNCER_MODULE

View File

@@ -10,8 +10,6 @@
#include <utility> // std::make_pair()
#include <vector>
#include <event2/buffer.h>
#include <fmt/core.h>
#include "transmission.h"

View File

@@ -20,7 +20,6 @@
#include "block-info.h"
class tr_torrents;
struct evbuffer;
struct tr_torrent;
class Cache

View File

@@ -12,8 +12,6 @@
#include <string_view>
#include <utility>
#include <event2/buffer.h>
#include <fmt/chrono.h>
#include <fmt/format.h>

View File

@@ -540,7 +540,7 @@ tr_peerIo* tr_peerIoNew(
#endif
default:
TR_ASSERT_MSG(false, fmt::format(FMT_STRING("unsupported peer socket type {:d}"), socket.type));
TR_ASSERT_MSG(false, fmt::format("unsupported peer socket type {:d}", socket.type));
}
return io;

View File

@@ -36,7 +36,6 @@
#define UTF_CPP_CPLUSPLUS 201703L
#include <utf8.h>
#include <event2/buffer.h>
#include <event2/event.h>
#include <fmt/format.h>
@@ -306,29 +305,6 @@ std::string_view tr_strvStrip(std::string_view str)
return str;
}
bool tr_str_has_suffix(char const* str, char const* suffix)
{
if (str == nullptr)
{
return false;
}
if (suffix == nullptr)
{
return true;
}
auto const str_len = strlen(str);
auto const suffix_len = strlen(suffix);
if (str_len < suffix_len)
{
return false;
}
return evutil_ascii_strncasecmp(str + str_len - suffix_len, suffix, suffix_len) == 0;
}
/****
*****
****/

View File

@@ -181,9 +181,6 @@ size_t tr_strlcpy(void* dst, void const* src, size_t siz);
@param errnum the error number to describe */
[[nodiscard]] char const* tr_strerror(int errnum);
/** @brief Returns true if the string ends with the specified case-insensitive suffix */
[[nodiscard]] bool tr_str_has_suffix(char const* str, char const* suffix);
template<typename T>
[[nodiscard]] std::string tr_strlower(T in)
{

View File

@@ -0,0 +1,147 @@
// This file Copyright © 2015-2022 Mnemosyne LLC.
// It may be used under GPLv2 (SPDX: GPL-2.0-only), GPLv3 (SPDX: GPL-3.0-only),
// or any future license endorsed by Mnemosyne LLC.
// License text can be found in the licenses/ folder.
#pragma once
#include <algorithm>
#include <chrono>
#include <map>
#include <memory>
#include <optional>
#include <set>
#include <string>
#include "timer.h"
#include "watchdir.h"
namespace libtransmission::impl
{
// base class for concrete tr_watchdirs
class BaseWatchdir : public Watchdir
{
public:
BaseWatchdir(std::string_view dirname, Callback callback, TimerMaker& timer_maker)
: dirname_{ dirname }
, callback_{ std::move(callback) }
, retry_timer_{ timer_maker.create() }
{
retry_timer_->setCallback([this]() { onRetryTimer(); });
}
~BaseWatchdir() override = default;
BaseWatchdir(BaseWatchdir&&) = delete;
BaseWatchdir(BaseWatchdir const&) = delete;
BaseWatchdir& operator=(BaseWatchdir&&) = delete;
BaseWatchdir& operator=(BaseWatchdir const&) = delete;
[[nodiscard]] std::string_view dirname() const noexcept override
{
return dirname_;
}
[[nodiscard]] constexpr auto timeoutDuration() const noexcept
{
return timeout_duration_;
}
constexpr void setTimeoutDuration(std::chrono::seconds timeout_duration) noexcept
{
timeout_duration_ = timeout_duration;
}
[[nodiscard]] constexpr auto retryDuration() const noexcept
{
return retry_duration_;
}
void setRetryDuration(std::chrono::milliseconds retry_duration) noexcept
{
retry_duration_ = retry_duration;
for (auto& [basename, info] : pending_)
{
setNextKickTime(info);
}
}
protected:
void scan();
void processFile(std::string_view basename);
private:
using Timestamp = std::chrono::time_point<std::chrono::steady_clock>;
struct Pending
{
size_t strikes = 0U;
Timestamp first_kick_at = {};
Timestamp last_kick_at = {};
Timestamp next_kick_at = {};
};
void setNextKickTime(Pending& item)
{
item.next_kick_at = item.last_kick_at + retry_duration_;
}
[[nodiscard]] auto nextKickTime() const
{
auto next_time = std::optional<Timestamp>{};
for (auto const& [name, info] : pending_)
{
if (!next_time || info.next_kick_at < *next_time)
{
next_time = info.next_kick_at;
}
}
return next_time;
}
void restartTimerIfPending()
{
if (auto next_kick_time = nextKickTime(); next_kick_time)
{
using namespace std::chrono;
auto const now = steady_clock::now();
auto duration = duration_cast<milliseconds>(*next_kick_time - now);
retry_timer_->startSingleShot(duration);
}
}
void onRetryTimer()
{
using namespace std::chrono;
auto const now = steady_clock::now();
auto tmp = decltype(pending_){};
std::swap(tmp, pending_);
for (auto const& [basename, info] : tmp)
{
if (info.next_kick_at <= now)
{
processFile(basename);
}
else
{
pending_.emplace(basename, info);
}
}
restartTimerIfPending();
}
std::string const dirname_;
Callback const callback_;
std::unique_ptr<Timer> const retry_timer_;
std::map<std::string, Pending, std::less<>> pending_;
std::set<std::string, std::less<>> handled_;
std::chrono::milliseconds retry_duration_ = std::chrono::seconds{ 5 };
std::chrono::seconds timeout_duration_ = std::chrono::seconds{ 15 };
};
} // namespace libtransmission::impl

View File

@@ -1,46 +0,0 @@
// This file Copyright © 2015-2022 Mnemosyne LLC.
// It may be used under GPLv2 (SPDX: GPL-2.0-only), GPLv3 (SPDX: GPL-3.0-only),
// or any future license endorsed by Mnemosyne LLC.
// License text can be found in the licenses/ folder.
#pragma once
#ifndef LIBTRANSMISSION_WATCHDIR_MODULE
#error only the libtransmission watchdir module should #include this header.
#endif
#include <string>
#include <unordered_set>
struct tr_watchdir_backend
{
void (*free_func)(struct tr_watchdir_backend*);
};
#define BACKEND_DOWNCAST(b) (reinterpret_cast<tr_watchdir_backend*>(b))
/* ... */
tr_watchdir_backend* tr_watchdir_get_backend(tr_watchdir_t handle);
struct event_base* tr_watchdir_get_event_base(tr_watchdir_t handle);
/* ... */
void tr_watchdir_process(tr_watchdir_t handle, char const* name);
void tr_watchdir_scan(tr_watchdir_t handle, std::unordered_set<std::string>* dir_entries);
/* ... */
tr_watchdir_backend* tr_watchdir_generic_new(tr_watchdir_t handle);
#ifdef WITH_INOTIFY
tr_watchdir_backend* tr_watchdir_inotify_new(tr_watchdir_t handle);
#endif
#ifdef WITH_KQUEUE
tr_watchdir_backend* tr_watchdir_kqueue_new(tr_watchdir_t handle);
#endif
#ifdef _WIN32
tr_watchdir_backend* tr_watchdir_win32_new(tr_watchdir_t handle);
#endif

View File

@@ -3,103 +3,57 @@
// or any future license endorsed by Mnemosyne LLC.
// License text can be found in the licenses/ folder.
#include <cerrno>
#include <string>
#include <unordered_set>
#include <event2/event.h>
#include <fmt/core.h>
#define LIBTRANSMISSION_WATCHDIR_MODULE
#include "transmission.h"
#include "log.h"
#include "tr-assert.h"
#include "utils.h"
#include "watchdir.h"
#include "watchdir-common.h"
/***
****
***/
#include "watchdir-base.h"
struct tr_watchdir_generic
namespace libtransmission
{
tr_watchdir_backend base;
namespace
{
class GenericWatchdir final : public impl::BaseWatchdir
{
public:
GenericWatchdir(
std::string_view dirname,
Callback callback,
libtransmission::TimerMaker& timer_maker,
std::chrono::milliseconds rescan_interval)
: BaseWatchdir{ dirname, std::move(callback), timer_maker }
, rescan_timer_{ timer_maker.create() }
{
rescan_timer_->setCallback([this]() { scan(); });
rescan_timer_->startRepeating(rescan_interval);
scan();
}
struct event* event;
std::unordered_set<std::string> dir_entries;
private:
std::unique_ptr<Timer> rescan_timer_;
};
#define BACKEND_UPCAST(b) (reinterpret_cast<tr_watchdir_generic*>(b))
} // namespace
/* Non-static and mutable for unit tests. default to 10 sec. */
auto tr_watchdir_generic_interval = timeval{ 10, 0 };
/***
****
***/
static void tr_watchdir_generic_on_event(evutil_socket_t /*fd*/, short /*type*/, void* context)
std::unique_ptr<Watchdir> Watchdir::createGeneric(
std::string_view dirname,
Callback callback,
libtransmission::TimerMaker& timer_maker,
std::chrono::milliseconds rescan_interval)
{
auto const handle = static_cast<tr_watchdir_t>(context);
auto* const backend = BACKEND_UPCAST(tr_watchdir_get_backend(handle));
tr_watchdir_scan(handle, &backend->dir_entries);
return std::make_unique<GenericWatchdir>(dirname, std::move(callback), timer_maker, rescan_interval);
}
static void tr_watchdir_generic_free(tr_watchdir_backend* backend_base)
#if !defined(WITH_INOTIFY) && !defined(WITH_KQUEUE) && !defined(_WIN32)
// no native impl, so use generic
std::unique_ptr<Watchdir> Watchdir::create(
std::string_view dirname,
Callback callback,
libtransmission::TimerMaker& timer_maker,
struct event_base* /*evbase*/)
{
auto* const backend = BACKEND_UPCAST(backend_base);
if (backend == nullptr)
{
return;
}
TR_ASSERT(backend->base.free_func == &tr_watchdir_generic_free);
if (backend->event != nullptr)
{
event_del(backend->event);
event_free(backend->event);
}
delete backend;
return std::make_unique<GenericWatchdir>(dirname, std::move(callback), timer_maker, genericRescanInterval());
}
#endif
tr_watchdir_backend* tr_watchdir_generic_new(tr_watchdir_t handle)
{
auto* backend = new tr_watchdir_generic{};
backend->base.free_func = &tr_watchdir_generic_free;
backend->event = event_new(tr_watchdir_get_event_base(handle), -1, EV_PERSIST, &tr_watchdir_generic_on_event, handle);
if (backend->event == nullptr)
{
auto const error_code = errno;
tr_logAddError(fmt::format(
_("Couldn't create event: {error} ({error_code})"),
fmt::arg("error", tr_strerror(error_code)),
fmt::arg("error_code", error_code)));
goto FAIL;
}
if (event_add(backend->event, &tr_watchdir_generic_interval) == -1)
{
auto const error_code = errno;
tr_logAddError(fmt::format(
_("Couldn't add event: {error} ({error_code})"),
fmt::arg("error", tr_strerror(error_code)),
fmt::arg("error_code", error_code)));
goto FAIL;
}
/* Run initial scan on startup */
event_active(backend->event, EV_READ, 0);
return BACKEND_DOWNCAST(backend);
FAIL:
tr_watchdir_generic_free(BACKEND_DOWNCAST(backend));
return nullptr;
}
} // namespace libtransmission

View File

@@ -6,6 +6,8 @@
#include <cerrno>
#include <climits> /* NAME_MAX */
#include <iostream> // NOCOMMIT
#include <unistd.h> /* close() */
#include <sys/inotify.h>
@@ -18,53 +20,111 @@
#define LIBTRANSMISSION_WATCHDIR_MODULE
#include "transmission.h"
#include "log.h"
#include "tr-assert.h"
#include "tr-strbuf.h"
#include "utils.h"
#include "watchdir.h"
#include "watchdir-common.h"
#include "watchdir-base.h"
/***
****
***/
struct tr_watchdir_inotify
namespace libtransmission
{
tr_watchdir_backend base;
int infd;
int inwd;
struct bufferevent* event;
};
#define BACKEND_UPCAST(b) ((tr_watchdir_inotify*)(b))
#define INOTIFY_WATCH_MASK (IN_CLOSE_WRITE | IN_MOVED_TO | IN_CREATE)
/***
****
***/
static void tr_watchdir_inotify_on_first_scan(evutil_socket_t /*fd*/, short /*type*/, void* context)
namespace
{
auto const handle = static_cast<tr_watchdir_t>(context);
tr_watchdir_scan(handle, nullptr);
}
static void tr_watchdir_inotify_on_event(struct bufferevent* event, void* context)
class INotifyWatchdir final : public impl::BaseWatchdir
{
TR_ASSERT(context != nullptr);
private:
static auto constexpr InotifyWatchMask = uint32_t{ IN_CLOSE_WRITE | IN_MOVED_TO | IN_CREATE };
auto const handle = static_cast<tr_watchdir_t>(context);
#ifdef TR_ENABLE_ASSERTS
tr_watchdir_inotify const* const backend = BACKEND_UPCAST(tr_watchdir_get_backend(handle));
#endif
public:
INotifyWatchdir(std::string_view dirname, Callback callback, TimerMaker& timer_maker, event_base* evbase)
: BaseWatchdir{ dirname, std::move(callback), timer_maker }
{
init(evbase);
scan();
}
INotifyWatchdir(INotifyWatchdir&&) = delete;
INotifyWatchdir(INotifyWatchdir const&) = delete;
INotifyWatchdir& operator=(INotifyWatchdir&&) = delete;
INotifyWatchdir& operator=(INotifyWatchdir const&) = delete;
~INotifyWatchdir() override
{
if (event_ != nullptr)
{
bufferevent_disable(event_, EV_READ);
bufferevent_free(event_);
}
if (infd_ != -1)
{
if (inwd_ != -1)
{
inotify_rm_watch(infd_, inwd_);
}
close(infd_);
}
}
private:
void init(struct event_base* evbase)
{
infd_ = inotify_init();
if (infd_ == -1)
{
auto const error_code = errno;
tr_logAddError(fmt::format(
_("Couldn't watch '{path}': {error} ({error_code})"),
fmt::arg("path", dirname()),
fmt::arg("error", tr_strerror(error_code)),
fmt::arg("error_code", error_code)));
return;
}
inwd_ = inotify_add_watch(infd_, tr_pathbuf{ dirname() }, InotifyWatchMask | IN_ONLYDIR);
if (inwd_ == -1)
{
auto const error_code = errno;
tr_logAddError(fmt::format(
_("Couldn't watch '{path}': {error} ({error_code})"),
fmt::arg("path", dirname()),
fmt::arg("error", tr_strerror(error_code)),
fmt::arg("error_code", error_code)));
return;
}
event_ = bufferevent_socket_new(evbase, infd_, 0);
if (event_ == nullptr)
{
auto const error_code = errno;
tr_logAddError(fmt::format(
_("Couldn't watch '{path}': {error} ({error_code})"),
fmt::arg("path", dirname()),
fmt::arg("error", tr_strerror(error_code)),
fmt::arg("error_code", error_code)));
return;
}
// guarantees at least the sizeof an inotify event will be available in the event buffer
bufferevent_setwatermark(event_, EV_READ, sizeof(struct inotify_event), 0);
bufferevent_setcb(event_, onInotifyEvent, nullptr, nullptr, this);
bufferevent_enable(event_, EV_READ);
}
static void onInotifyEvent(struct bufferevent* event, void* vself)
{
static_cast<INotifyWatchdir*>(vself)->handleInotifyEvent(event);
}
void handleInotifyEvent(struct bufferevent* event)
{
struct inotify_event ev;
auto name = std::string{};
/* Read the size of the struct excluding name into buf. Guaranteed to have at
least sizeof(ev) available */
// Read the size of the struct excluding name into buf.
// Guaranteed to have at least sizeof(ev) available.
auto nread = size_t{};
while ((nread = bufferevent_read(event, &ev, sizeof(ev))) != 0)
{
@@ -87,11 +147,11 @@ static void tr_watchdir_inotify_on_event(struct bufferevent* event, void* contex
break;
}
TR_ASSERT(ev.wd == backend->inwd);
TR_ASSERT((ev.mask & INOTIFY_WATCH_MASK) != 0);
TR_ASSERT(ev.wd == inwd_);
TR_ASSERT((ev.mask & InotifyWatchMask) != 0);
TR_ASSERT(ev.len > 0);
/* Consume entire name into buffer */
// consume entire name into buffer
name.resize(ev.len);
nread = bufferevent_read(event, std::data(name), ev.len);
if (nread == static_cast<size_t>(-1))
@@ -113,109 +173,27 @@ static void tr_watchdir_inotify_on_event(struct bufferevent* event, void* contex
break;
}
tr_watchdir_process(handle, name.c_str());
// NB: `name` may have extra trailing zeroes from inotify;
// pass the c_str() so that processFile gets the right strlen
processFile(name.c_str());
}
}
}
static void tr_watchdir_inotify_free(tr_watchdir_backend* backend_base)
private:
int infd_ = -1;
int inwd_ = -1;
struct bufferevent* event_ = nullptr;
};
} // namespace
std::unique_ptr<Watchdir> Watchdir::create(
std::string_view dirname,
Callback callback,
libtransmission::TimerMaker& timer_maker,
event_base* evbase)
{
auto* const backend = BACKEND_UPCAST(backend_base);
if (backend == nullptr)
{
return;
}
TR_ASSERT(backend->base.free_func == &tr_watchdir_inotify_free);
if (backend->event != nullptr)
{
bufferevent_disable(backend->event, EV_READ);
bufferevent_free(backend->event);
}
if (backend->infd != -1)
{
if (backend->inwd != -1)
{
inotify_rm_watch(backend->infd, backend->inwd);
}
close(backend->infd);
}
tr_free(backend);
return std::make_unique<INotifyWatchdir>(dirname, std::move(callback), timer_maker, evbase);
}
tr_watchdir_backend* tr_watchdir_inotify_new(tr_watchdir_t handle)
{
char const* const path = tr_watchdir_get_path(handle);
auto* const backend = tr_new0(tr_watchdir_inotify, 1);
backend->base.free_func = &tr_watchdir_inotify_free;
backend->infd = inotify_init();
if (backend->infd == -1)
{
auto const error_code = errno;
tr_logAddError(fmt::format(
_("Couldn't watch '{path}': {error} ({error_code})"),
fmt::arg("path", path),
fmt::arg("error", tr_strerror(error_code)),
fmt::arg("error_code", error_code)));
goto FAIL;
}
backend->inwd = inotify_add_watch(backend->infd, path, INOTIFY_WATCH_MASK | IN_ONLYDIR);
if (backend->inwd == -1)
{
auto const error_code = errno;
tr_logAddError(fmt::format(
_("Couldn't watch '{path}': {error} ({error_code})"),
fmt::arg("path", path),
fmt::arg("error", tr_strerror(error_code)),
fmt::arg("error_code", error_code)));
goto FAIL;
}
backend->event = bufferevent_socket_new(tr_watchdir_get_event_base(handle), backend->infd, 0);
if (backend->event == nullptr)
{
auto const error_code = errno;
tr_logAddError(fmt::format(
_("Couldn't watch '{path}': {error} ({error_code})"),
fmt::arg("path", path),
fmt::arg("error", tr_strerror(error_code)),
fmt::arg("error_code", error_code)));
goto FAIL;
}
/* Guarantees at least the sizeof an inotify event will be available in the
event buffer */
bufferevent_setwatermark(backend->event, EV_READ, sizeof(struct inotify_event), 0);
bufferevent_setcb(backend->event, &tr_watchdir_inotify_on_event, nullptr, nullptr, handle);
bufferevent_enable(backend->event, EV_READ);
/* Perform an initial scan on the directory */
if (event_base_once(
tr_watchdir_get_event_base(handle),
-1,
EV_TIMEOUT,
&tr_watchdir_inotify_on_first_scan,
handle,
nullptr) == -1)
{
auto const error_code = errno;
tr_logAddWarn(fmt::format(
_("Couldn't scan '{path}': {error} ({error_code})"),
fmt::arg("path", path),
fmt::arg("error", tr_strerror(error_code)),
fmt::arg("error_code", error_code)));
}
return BACKEND_DOWNCAST(backend);
FAIL:
tr_watchdir_inotify_free(BACKEND_DOWNCAST(backend));
return nullptr;
}
} // namespace libtransmission

View File

@@ -3,12 +3,12 @@
// or any future license endorsed by Mnemosyne LLC.
// License text can be found in the licenses/ folder.
#include <cerrno> /* errno */
#include <cerrno> // for errno
#include <string>
#include <unordered_set>
#include <fcntl.h> /* open() */
#include <unistd.h> /* close() */
#include <fcntl.h> // for open()
#include <unistd.h> // for close()
#include <sys/types.h>
#include <sys/event.h>
@@ -25,41 +25,124 @@
#include "transmission.h"
#include "log.h"
#include "tr-assert.h"
#include "utils.h"
#include "watchdir.h"
#include "watchdir-common.h"
#include "tr-strbuf.h"
#include "utils.h" // for _()
#include "watchdir-base.h"
/***
****
***/
struct tr_watchdir_kqueue
namespace libtransmission
{
tr_watchdir_backend base;
int kq;
int dirfd;
struct event* event;
std::unordered_set<std::string> dir_entries;
};
#define BACKEND_UPCAST(b) (reinterpret_cast<tr_watchdir_kqueue*>(b))
#define KQUEUE_WATCH_MASK (NOTE_WRITE | NOTE_EXTEND)
/***
****
***/
static void tr_watchdir_kqueue_on_event(evutil_socket_t /*fd*/, short /*type*/, void* context)
namespace
{
auto const handle = static_cast<tr_watchdir_t>(context);
auto* const backend = BACKEND_UPCAST(tr_watchdir_get_backend(handle));
class KQueueWatchdir final : public impl::BaseWatchdir
{
public:
KQueueWatchdir(std::string_view dirname, Callback callback, libtransmission::TimerMaker& timer_maker, event_base* evbase)
: BaseWatchdir{ dirname, std::move(callback), timer_maker }
{
init(evbase);
scan();
}
KQueueWatchdir(KQueueWatchdir&&) = delete;
KQueueWatchdir(KQueueWatchdir const&) = delete;
KQueueWatchdir& operator=(KQueueWatchdir&&) = delete;
KQueueWatchdir& operator=(KQueueWatchdir const&) = delete;
~KQueueWatchdir() override
{
if (event_ != nullptr)
{
event_del(event_);
event_free(event_);
}
if (kq_ != -1)
{
close(kq_);
}
if (dirfd_ != -1)
{
close(dirfd_);
}
}
private:
void init(struct event_base* evbase)
{
kq_ = kqueue();
if (kq_ == -1)
{
auto const error_code = errno;
tr_logAddError(fmt::format(
_("Couldn't watch '{path}': {error} ({error_code})"),
fmt::arg("error", tr_strerror(error_code)),
fmt::arg("error_code", error_code)));
return;
}
// open fd for watching
auto const szdirname = tr_pathbuf{ dirname() };
dirfd_ = open(szdirname, O_RDONLY | O_EVTONLY);
if (dirfd_ == -1)
{
auto const error_code = errno;
tr_logAddError(fmt::format(
_("Couldn't watch '{path}': {error} ({error_code})"),
fmt::arg("path", dirname()),
fmt::arg("error", tr_strerror(error_code)),
fmt::arg("error_code", error_code)));
return;
}
// register kevent filter with kqueue descriptor
struct kevent ke;
static auto constexpr KqueueWatchMask = (NOTE_WRITE | NOTE_EXTEND);
EV_SET(&ke, dirfd_, EVFILT_VNODE, EV_ADD | EV_ENABLE | EV_CLEAR, KqueueWatchMask, 0, NULL);
if (kevent(kq_, &ke, 1, nullptr, 0, nullptr) == -1)
{
auto const error_code = errno;
tr_logAddError(fmt::format(
_("Couldn't watch '{path}': {error} ({error_code})"),
fmt::arg("path", dirname()),
fmt::arg("error", tr_strerror(error_code)),
fmt::arg("error_code", error_code)));
return;
}
// create libevent task for event descriptor
event_ = event_new(evbase, kq_, EV_READ | EV_ET | EV_PERSIST, &onKqueueEvent, this);
if (event_ == nullptr)
{
auto const error_code = errno;
tr_logAddError(fmt::format(
_("Couldn't create event: {error} ({error_code})"),
fmt::arg("error", tr_strerror(error_code)),
fmt::arg("error_code", error_code)));
return;
}
if (event_add(event_, nullptr) == -1)
{
auto const error_code = errno;
tr_logAddError(fmt::format(
_("Couldn't add event: {error} ({error_code})"),
fmt::arg("error", tr_strerror(error_code)),
fmt::arg("error_code", error_code)));
return;
}
}
static void onKqueueEvent(evutil_socket_t /*fd*/, short /*type*/, void* vself)
{
static_cast<KQueueWatchdir*>(vself)->handleKqueueEvent();
}
void handleKqueueEvent()
{
struct kevent ke;
auto ts = timespec{};
if (kevent(backend->kq, nullptr, 0, &ke, 1, &ts) == -1)
if (kevent(kq_, nullptr, 0, &ke, 1, &ts) == -1)
{
auto const error_code = errno;
tr_logAddError(fmt::format(
@@ -69,119 +152,23 @@ static void tr_watchdir_kqueue_on_event(evutil_socket_t /*fd*/, short /*type*/,
return;
}
/* Read directory with generic scan */
tr_watchdir_scan(handle, &backend->dir_entries);
}
scan();
}
static void tr_watchdir_kqueue_free(tr_watchdir_backend* backend_base)
int kq_ = -1;
int dirfd_ = -1;
struct event* event_ = nullptr;
};
} // namespace
std::unique_ptr<Watchdir> Watchdir::create(
std::string_view dirname,
Callback callback,
TimerMaker& timer_maker,
event_base* evbase)
{
tr_watchdir_kqueue* const backend = BACKEND_UPCAST(backend_base);
if (backend == nullptr)
{
return;
}
TR_ASSERT(backend->base.free_func == &tr_watchdir_kqueue_free);
if (backend->event != nullptr)
{
event_del(backend->event);
event_free(backend->event);
}
if (backend->kq != -1)
{
close(backend->kq);
}
if (backend->dirfd != -1)
{
close(backend->dirfd);
}
delete backend;
return std::make_unique<KQueueWatchdir>(dirname, std::move(callback), timer_maker, evbase);
}
tr_watchdir_backend* tr_watchdir_kqueue_new(tr_watchdir_t handle)
{
char const* const path = tr_watchdir_get_path(handle);
struct kevent ke;
auto* backend = new tr_watchdir_kqueue{};
backend->base.free_func = &tr_watchdir_kqueue_free;
backend->dirfd = -1;
backend->kq = kqueue();
if (backend->kq == -1)
{
auto const error_code = errno;
tr_logAddError(fmt::format(
_("Couldn't watch '{path}': {error} ({error_code})"),
fmt::arg("error", tr_strerror(error_code)),
fmt::arg("error_code", error_code)));
goto fail;
}
/* Open fd for watching */
backend->dirfd = open(path, O_RDONLY | O_EVTONLY);
if (backend->dirfd == -1)
{
auto const error_code = errno;
tr_logAddError(fmt::format(
_("Couldn't watch '{path}': {error} ({error_code})"),
fmt::arg("path", path),
fmt::arg("error", tr_strerror(error_code)),
fmt::arg("error_code", error_code)));
goto fail;
}
/* Register kevent filter with kqueue descriptor */
EV_SET(&ke, backend->dirfd, EVFILT_VNODE, EV_ADD | EV_ENABLE | EV_CLEAR, KQUEUE_WATCH_MASK, 0, NULL);
if (kevent(backend->kq, &ke, 1, nullptr, 0, nullptr) == -1)
{
auto const error_code = errno;
tr_logAddError(fmt::format(
_("Couldn't watch '{path}': {error} ({error_code})"),
fmt::arg("path", path),
fmt::arg("error", tr_strerror(error_code)),
fmt::arg("error_code", error_code)));
goto fail;
}
/* Create libevent task for event descriptor */
if ((backend->event = event_new(
tr_watchdir_get_event_base(handle),
backend->kq,
EV_READ | EV_ET | EV_PERSIST,
&tr_watchdir_kqueue_on_event,
handle)) == nullptr)
{
auto const error_code = errno;
tr_logAddError(fmt::format(
_("Couldn't create event: {error} ({error_code})"),
fmt::arg("error", tr_strerror(error_code)),
fmt::arg("error_code", error_code)));
goto fail;
}
if (event_add(backend->event, nullptr) == -1)
{
auto const error_code = errno;
tr_logAddError(fmt::format(
_("Couldn't add event: {error} ({error_code})"),
fmt::arg("error", tr_strerror(error_code)),
fmt::arg("error_code", error_code)));
goto fail;
}
/* Trigger one event for the initial scan */
event_active(backend->event, EV_READ, 0);
return BACKEND_DOWNCAST(backend);
fail:
tr_watchdir_kqueue_free(BACKEND_DOWNCAST(backend));
return nullptr;
}
} // namespace libtransmission

View File

@@ -3,10 +3,11 @@
// or any future license endorsed by Mnemosyne LLC.
// License text can be found in the licenses/ folder.
#include <cstddef> /* offsetof */
#include <errno.h>
#include <array>
#include <cerrno>
#include <cstddef> // for offsetof
#include <process.h> /* _beginthreadex() */
#include <process.h> // for _beginthreadex()
#include <windows.h>
@@ -23,34 +24,14 @@
#include "net.h"
#include "tr-assert.h"
#include "utils.h"
#include "watchdir.h"
#include "watchdir-common.h"
#include "watchdir-base.h"
/***
****
***/
struct tr_watchdir_win32
namespace libtransmission
{
namespace
{
tr_watchdir_backend base;
HANDLE fd;
OVERLAPPED overlapped;
DWORD buffer[8 * 1024 / sizeof(DWORD)];
evutil_socket_t notify_pipe[2];
struct bufferevent* event;
HANDLE thread;
};
#define BACKEND_UPCAST(b) ((tr_watchdir_win32*)(b))
#define WIN32_WATCH_MASK (FILE_NOTIFY_CHANGE_FILE_NAME | FILE_NOTIFY_CHANGE_SIZE | FILE_NOTIFY_CHANGE_LAST_WRITE)
/***
****
***/
static BOOL tr_get_overlapped_result_ex(
BOOL tr_get_overlapped_result_ex(
HANDLE handle,
LPOVERLAPPED overlapped,
LPDWORD bytes_transferred,
@@ -91,34 +72,152 @@ static BOOL tr_get_overlapped_result_ex(
return GetOverlappedResult(handle, overlapped, bytes_transferred, FALSE);
}
static unsigned int __stdcall tr_watchdir_win32_thread(void* context)
class Win32Watchdir final : public impl::BaseWatchdir
{
auto const handle = static_cast<tr_watchdir_t>(context);
tr_watchdir_win32* const backend = BACKEND_UPCAST(tr_watchdir_get_backend(handle));
public:
Win32Watchdir(
std::string_view dirname,
Callback callback,
libtransmission::TimerMaker& timer_maker,
struct event_base* event_base)
: BaseWatchdir{ dirname, std::move(callback), timer_maker }
{
init(event_base);
scan();
}
Win32Watchdir(Win32Watchdir&&) = delete;
Win32Watchdir(Win32Watchdir const&) = delete;
Win32Watchdir& operator=(Win32Watchdir&&) = delete;
Win32Watchdir& operator=(Win32Watchdir const&) = delete;
~Win32Watchdir() override
{
if (fd_ != INVALID_HANDLE_VALUE)
{
CancelIoEx(fd_, &overlapped_);
}
if (thread_ != nullptr)
{
WaitForSingleObject(thread_, INFINITE);
CloseHandle(thread_);
}
if (event_ != nullptr)
{
bufferevent_free(event_);
}
if (notify_pipe_[0] != TR_BAD_SOCKET)
{
evutil_closesocket(notify_pipe_[0]);
}
if (notify_pipe_[1] != TR_BAD_SOCKET)
{
evutil_closesocket(notify_pipe_[1]);
}
if (fd_ != INVALID_HANDLE_VALUE)
{
CloseHandle(fd_);
}
}
private:
static auto constexpr Win32WatchMask =
(FILE_NOTIFY_CHANGE_FILE_NAME | FILE_NOTIFY_CHANGE_SIZE | FILE_NOTIFY_CHANGE_LAST_WRITE);
void init(struct event_base* event_base)
{
tr_net_init();
auto const path = dirname();
auto const wide_path = tr_win32_utf8_to_native(path);
if (std::empty(wide_path))
{
tr_logAddError(fmt::format(_("Couldn't convert '{path}' to native path"), fmt::arg("path", path)));
return;
}
if ((fd_ = CreateFileW(
wide_path.c_str(),
FILE_LIST_DIRECTORY,
FILE_SHARE_READ | FILE_SHARE_WRITE | FILE_SHARE_DELETE,
nullptr,
OPEN_EXISTING,
FILE_FLAG_BACKUP_SEMANTICS | FILE_FLAG_OVERLAPPED,
nullptr)) == INVALID_HANDLE_VALUE)
{
tr_logAddError(fmt::format(_("Couldn't read '{path}'"), fmt::arg("path", path)));
return;
}
overlapped_.Pointer = this;
if (!ReadDirectoryChangesW(fd_, buffer_, sizeof(buffer_), false, Win32WatchMask, nullptr, &overlapped_, nullptr))
{
tr_logAddError(fmt::format(_("Couldn't read '{path}'"), fmt::arg("path", path)));
return;
}
if (evutil_socketpair(AF_INET, SOCK_STREAM, 0, std::data(notify_pipe_)) == -1)
{
auto const error_code = errno;
tr_logAddError(fmt::format(
_("Couldn't create pipe: {error} ({error_code})"),
fmt::arg("error", tr_strerror(error_code)),
fmt::arg("error_code", error_code)));
return;
}
event_ = bufferevent_socket_new(event_base, notify_pipe_[0], 0);
if (event_ == nullptr)
{
auto const error_code = errno;
tr_logAddError(fmt::format(
_("Couldn't create event: {error} ({error_code})"),
fmt::arg("error", tr_strerror(error_code)),
fmt::arg("error_code", error_code)));
return;
}
bufferevent_setwatermark(event_, EV_READ, sizeof(FILE_NOTIFY_INFORMATION), 0);
bufferevent_setcb(event_, &Win32Watchdir::onBufferEvent, nullptr, nullptr, this);
bufferevent_enable(event_, EV_READ);
thread_ = (HANDLE)_beginthreadex(nullptr, 0, Win32Watchdir::staticThreadFunc, this, 0, nullptr);
if (thread_ == nullptr)
{
tr_logAddError(_("Couldn't create thread"));
return;
}
}
static unsigned int __stdcall staticThreadFunc(void* vself)
{
return static_cast<Win32Watchdir*>(vself)->threadFunc();
}
unsigned int threadFunc()
{
DWORD bytes_transferred;
while (tr_get_overlapped_result_ex(backend->fd, &backend->overlapped, &bytes_transferred, INFINITE, FALSE))
while (tr_get_overlapped_result_ex(fd_, &overlapped_, &bytes_transferred, INFINITE, FALSE))
{
PFILE_NOTIFY_INFORMATION info = (PFILE_NOTIFY_INFORMATION)backend->buffer;
PFILE_NOTIFY_INFORMATION info = (PFILE_NOTIFY_INFORMATION)buffer_;
while (info->NextEntryOffset != 0)
{
*((BYTE**)&info) += info->NextEntryOffset;
}
info->NextEntryOffset = bytes_transferred - ((BYTE*)info - (BYTE*)backend->buffer);
info->NextEntryOffset = bytes_transferred - ((BYTE*)info - (BYTE*)buffer_);
send(backend->notify_pipe[1], (char const*)backend->buffer, bytes_transferred, 0);
send(notify_pipe_[1], (char const*)buffer_, bytes_transferred, 0);
if (!ReadDirectoryChangesW(
backend->fd,
backend->buffer,
sizeof(backend->buffer),
FALSE,
WIN32_WATCH_MASK,
nullptr,
&backend->overlapped,
nullptr))
if (!ReadDirectoryChangesW(fd_, buffer_, sizeof(buffer_), FALSE, Win32WatchMask, nullptr, &overlapped_, nullptr))
{
tr_logAddError(_("Couldn't read directory changes"));
return 0;
@@ -131,28 +230,38 @@ static unsigned int __stdcall tr_watchdir_win32_thread(void* context)
}
return 0;
}
}
static void tr_watchdir_win32_on_first_scan(evutil_socket_t /*fd*/, short /*type*/, void* context)
{
auto const handle = static_cast<tr_watchdir_t>(context);
static void onFirstScan(evutil_socket_t /*unused*/, short /*unused*/, void* vself)
{
static_cast<Win32Watchdir*>(vself)->scan();
}
tr_watchdir_scan(handle, nullptr);
}
static void onBufferEvent(struct bufferevent* event, void* vself)
{
static_cast<Win32Watchdir*>(vself)->processBufferEvent(event);
}
static void tr_watchdir_win32_on_event(struct bufferevent* event, void* context)
{
auto const handle = static_cast<tr_watchdir_t>(context);
size_t nread;
void processBufferEvent(struct bufferevent* event)
{
size_t name_size = MAX_PATH * sizeof(WCHAR);
auto* buffer = static_cast<char*>(tr_malloc(sizeof(FILE_NOTIFY_INFORMATION) + name_size));
PFILE_NOTIFY_INFORMATION ev = (PFILE_NOTIFY_INFORMATION)buffer;
auto buffer = std::vector<char>{};
buffer.resize(sizeof(FILE_NOTIFY_INFORMATION) + name_size);
PFILE_NOTIFY_INFORMATION ev = (PFILE_NOTIFY_INFORMATION)std::data(buffer);
size_t const header_size = offsetof(FILE_NOTIFY_INFORMATION, FileName);
/* Read the size of the struct excluding name into buf. Guaranteed to have at
least sizeof(*ev) available */
while ((nread = bufferevent_read(event, ev, header_size)) != 0)
// Read the size of the struct excluding name into buf.
// Guaranteed to have at least sizeof(*ev) available
for (;;)
{
auto nread = bufferevent_read(event, ev, header_size);
if (nread == 0)
{
break;
}
if (nread == (size_t)-1)
{
auto const error_code = errno;
@@ -181,12 +290,13 @@ static void tr_watchdir_win32_on_event(struct bufferevent* event, void* context)
if (nleft > name_size)
{
name_size = nleft;
buffer = static_cast<char*>(tr_realloc(buffer, sizeof(FILE_NOTIFY_INFORMATION) + name_size));
ev = (PFILE_NOTIFY_INFORMATION)buffer;
buffer.resize(sizeof(FILE_NOTIFY_INFORMATION) + name_size);
ev = (PFILE_NOTIFY_INFORMATION)std::data(buffer);
}
/* Consume entire name into buffer */
if ((nread = bufferevent_read(event, buffer + header_size, nleft)) == (size_t)-1)
// consume entire name into buffer
nread = bufferevent_read(event, &buffer[header_size], nleft);
if (nread == (size_t)-1)
{
auto const error_code = errno;
tr_logAddError(fmt::format(
@@ -205,159 +315,35 @@ static void tr_watchdir_win32_on_event(struct bufferevent* event, void* context)
break;
}
if (ev->Action == FILE_ACTION_ADDED || ev->Action == FILE_ACTION_MODIFIED || ev->Action == FILE_ACTION_RENAMED_NEW_NAME)
if (ev->Action == FILE_ACTION_ADDED || ev->Action == FILE_ACTION_MODIFIED ||
ev->Action == FILE_ACTION_RENAMED_NEW_NAME)
{
if (auto const name = tr_win32_native_to_utf8({ ev->FileName, ev->FileNameLength / sizeof(WCHAR) });
!std::empty(name))
{
tr_watchdir_process(handle, name.c_str());
processFile(name);
}
}
}
}
tr_free(buffer);
}
HANDLE fd_ = INVALID_HANDLE_VALUE;
OVERLAPPED overlapped_ = {};
DWORD buffer_[8 * 1024 / sizeof(DWORD)];
std::array<evutil_socket_t, 2> notify_pipe_{ static_cast<evutil_socket_t>(-1), static_cast<evutil_socket_t>(-1) };
struct bufferevent* event_ = nullptr;
HANDLE thread_ = {};
};
static void tr_watchdir_win32_free(tr_watchdir_backend* backend_base)
} // namespace
std::unique_ptr<Watchdir> Watchdir::create(
std::string_view dirname,
Callback callback,
TimerMaker& timer_maker,
struct event_base* event_base)
{
tr_watchdir_win32* const backend = BACKEND_UPCAST(backend_base);
if (backend == nullptr)
{
return;
}
TR_ASSERT(backend->base.free_func == &tr_watchdir_win32_free);
if (backend->fd != INVALID_HANDLE_VALUE)
{
CancelIoEx(backend->fd, &backend->overlapped);
}
if (backend->thread != nullptr)
{
WaitForSingleObject(backend->thread, INFINITE);
CloseHandle(backend->thread);
}
if (backend->event != nullptr)
{
bufferevent_free(backend->event);
}
if (backend->notify_pipe[0] != TR_BAD_SOCKET)
{
evutil_closesocket(backend->notify_pipe[0]);
}
if (backend->notify_pipe[1] != TR_BAD_SOCKET)
{
evutil_closesocket(backend->notify_pipe[1]);
}
if (backend->fd != INVALID_HANDLE_VALUE)
{
CloseHandle(backend->fd);
}
tr_free(backend);
return std::make_unique<Win32Watchdir>(dirname, std::move(callback), timer_maker, event_base);
}
tr_watchdir_backend* tr_watchdir_win32_new(tr_watchdir_t handle)
{
char const* const path = tr_watchdir_get_path(handle);
auto* const backend = tr_new0(tr_watchdir_win32, 1);
backend->base.free_func = &tr_watchdir_win32_free;
backend->fd = INVALID_HANDLE_VALUE;
backend->notify_pipe[0] = backend->notify_pipe[1] = TR_BAD_SOCKET;
auto const wide_path = tr_win32_utf8_to_native(path);
if (!std::empty(wide_path))
{
tr_logAddError(fmt::format(_("Couldn't convert '{path}' to native path"), fmt::arg("path", path)));
goto fail;
}
if ((backend->fd = CreateFileW(
wide_path.c_str(),
FILE_LIST_DIRECTORY,
FILE_SHARE_READ | FILE_SHARE_WRITE | FILE_SHARE_DELETE,
nullptr,
OPEN_EXISTING,
FILE_FLAG_BACKUP_SEMANTICS | FILE_FLAG_OVERLAPPED,
nullptr)) == INVALID_HANDLE_VALUE)
{
tr_logAddError(fmt::format(_("Couldn't read '{path}'"), fmt::arg("path", path)));
goto fail;
}
backend->overlapped.Pointer = handle;
if (!ReadDirectoryChangesW(
backend->fd,
backend->buffer,
sizeof(backend->buffer),
FALSE,
WIN32_WATCH_MASK,
nullptr,
&backend->overlapped,
nullptr))
{
tr_logAddError(fmt::format(_("Couldn't read '{path}'"), fmt::arg("path", path)));
goto fail;
}
if (evutil_socketpair(AF_INET, SOCK_STREAM, 0, backend->notify_pipe) == -1)
{
auto const error_code = errno;
tr_logAddError(fmt::format(
_("Couldn't create pipe: {error} ({error_code})"),
fmt::arg("error", tr_strerror(error_code)),
fmt::arg("error_code", error_code)));
goto fail;
}
if ((backend->event = bufferevent_socket_new(tr_watchdir_get_event_base(handle), backend->notify_pipe[0], 0)) == nullptr)
{
auto const error_code = errno;
tr_logAddError(fmt::format(
_("Couldn't create event: {error} ({error_code})"),
fmt::arg("error", tr_strerror(error_code)),
fmt::arg("error_code", error_code)));
goto fail;
}
bufferevent_setwatermark(backend->event, EV_READ, sizeof(FILE_NOTIFY_INFORMATION), 0);
bufferevent_setcb(backend->event, &tr_watchdir_win32_on_event, nullptr, nullptr, handle);
bufferevent_enable(backend->event, EV_READ);
if ((backend->thread = (HANDLE)_beginthreadex(nullptr, 0, &tr_watchdir_win32_thread, handle, 0, nullptr)) == nullptr)
{
tr_logAddError(_("Couldn't create thread"));
goto fail;
}
/* Perform an initial scan on the directory */
if (event_base_once(
tr_watchdir_get_event_base(handle),
-1,
EV_TIMEOUT,
&tr_watchdir_win32_on_first_scan,
handle,
nullptr) == -1)
{
auto const error_code = errno;
tr_logAddError(fmt::format(
_("Couldn't scan '{path}': {error} ({error_code})"),
fmt::arg("path", path),
fmt::arg("error", tr_strerror(error_code)),
fmt::arg("error_code", error_code)));
}
return BACKEND_DOWNCAST(backend);
fail:
tr_watchdir_win32_free(BACKEND_DOWNCAST(backend));
return nullptr;
}
} // namespace libtransmission

View File

@@ -3,270 +3,43 @@
// or any future license endorsed by Mnemosyne LLC.
// License text can be found in the licenses/ folder.
#include <cstring> // strcmp()
#include <map>
#include <memory>
#include <string>
#include <string_view>
#include <unordered_set>
#include <event2/event.h>
#include <event2/util.h>
#include <fmt/core.h>
#define LIBTRANSMISSION_WATCHDIR_MODULE
#include <set>
#include "transmission.h"
#include "error.h"
#include "error-types.h"
#include "error.h"
#include "file.h"
#include "log.h"
#include "tr-assert.h"
#include "tr-strbuf.h"
#include "utils.h"
#include "watchdir.h"
#include "watchdir-common.h"
#include "utils.h" // for _()
#include "watchdir-base.h"
using namespace std::literals;
/***
****
***/
/* Non-static and mutable for unit tests */
auto tr_watchdir_retry_limit = size_t{ 3 };
auto tr_watchdir_retry_start_interval = timeval{ 1, 0 };
auto tr_watchdir_retry_max_interval = timeval{ 10, 0 };
class tr_watchdir_retry
namespace libtransmission
{
public:
tr_watchdir_retry(tr_watchdir_retry&&) = delete;
tr_watchdir_retry(tr_watchdir_retry const&) = delete;
tr_watchdir_retry& operator=(tr_watchdir_retry&&) = delete;
tr_watchdir_retry& operator=(tr_watchdir_retry const&) = delete;
tr_watchdir_retry(tr_watchdir_t handle_in, struct event_base* base, std::string_view name_in)
: handle_{ handle_in }
, name_{ name_in }
, timer_{ evtimer_new(base, onRetryTimer, this) }
{
restart();
}
~tr_watchdir_retry()
{
evtimer_del(timer_);
event_free(timer_);
}
void restart()
{
evtimer_del(timer_);
counter_ = 0U;
interval_ = tr_watchdir_retry_start_interval;
evtimer_add(timer_, &interval_);
}
bool bump()
{
evtimer_del(timer_);
if (++counter_ >= tr_watchdir_retry_limit)
{
return false;
}
// keep doubling the interval, but clamp at max_interval
evutil_timeradd(&interval_, &interval_, &interval_);
if (evutil_timercmp(&interval_, &tr_watchdir_retry_max_interval, >))
{
interval_ = tr_watchdir_retry_max_interval;
}
evtimer_add(timer_, &interval_);
return true;
}
[[nodiscard]] auto const& name() const noexcept
{
return name_;
}
private:
static void onRetryTimer(evutil_socket_t /*fd*/, short /*type*/, void* self);
tr_watchdir_t handle_ = nullptr;
std::string name_;
size_t counter_ = 0U;
struct event* const timer_;
struct timeval interval_ = tr_watchdir_retry_start_interval;
};
// TODO: notify / kqueue / win32 / generic should subclass from tr_watchdir
struct tr_watchdir
namespace
{
public:
tr_watchdir(
std::string_view path,
event_base* event_base,
tr_watchdir_cb callback,
void* callback_user_data,
bool force_generic = false)
: path_{ path }
, event_base_{ event_base }
, callback_{ callback }
, callback_user_data_{ callback_user_data }
[[nodiscard]] constexpr std::string_view actionToString(Watchdir::Action action)
{
switch (action)
{
// TODO: backends should be subclasses
if (!force_generic && (backend_ == nullptr))
{
#if defined(WITH_INOTIFY)
backend_ = tr_watchdir_inotify_new(this);
#elif defined(WITH_KQUEUE)
backend_ = tr_watchdir_kqueue_new(this);
#elif defined(_WIN32)
backend_ = tr_watchdir_win32_new(this);
#endif
case Watchdir::Action::Retry:
return "retry";
case Watchdir::Action::Done:
return "done";
}
if (backend_ == nullptr)
{
backend_ = tr_watchdir_generic_new(this);
}
}
return "???";
}
tr_watchdir(tr_watchdir&&) = delete;
tr_watchdir(tr_watchdir const&) = delete;
tr_watchdir& operator=(tr_watchdir&&) = delete;
tr_watchdir& operator=(tr_watchdir const&) = delete;
~tr_watchdir()
{
if (backend_ != nullptr)
{
backend_->free_func(backend_);
}
}
[[nodiscard]] constexpr auto const& path() const noexcept
{
return path_;
}
[[nodiscard]] constexpr auto* backend() noexcept
{
return backend_;
}
[[nodiscard]] constexpr auto* eventBase() noexcept
{
return event_base_;
}
tr_watchdir_status invoke(char const* name)
{
/* File may be gone while we're retrying */
if (!is_regular_file(path(), name))
{
return TR_WATCHDIR_IGNORE;
}
auto const ret = (*callback_)(this, name, callback_user_data_);
TR_ASSERT(ret == TR_WATCHDIR_ACCEPT || ret == TR_WATCHDIR_IGNORE || ret == TR_WATCHDIR_RETRY);
tr_logAddDebug(fmt::format("Callback decided to {:s} file '{:s}'", statusToString(ret), name));
return ret;
}
void erase(std::string_view name)
{
active_retries_.erase(std::string{ name });
}
void scan(std::unordered_set<std::string>* dir_entries)
{
auto new_dir_entries = std::unordered_set<std::string>{};
tr_error* error = nullptr;
auto const dir = tr_sys_dir_open(path().c_str(), &error);
if (dir == TR_BAD_SYS_DIR)
{
tr_logAddWarn(fmt::format(
_("Couldn't read '{path}': {error} ({error_code})"),
fmt::arg("path", path()),
fmt::arg("error", error->message),
fmt::arg("error_code", error->code)));
tr_error_free(error);
return;
}
char const* name = nullptr;
while ((name = tr_sys_dir_read_name(dir, &error)) != nullptr)
{
if (strcmp(name, ".") == 0 || strcmp(name, "..") == 0)
{
continue;
}
if (dir_entries != nullptr)
{
auto const namestr = std::string(name);
new_dir_entries.insert(namestr);
if (dir_entries->count(namestr) != 0)
{
continue;
}
}
process(name);
}
if (error != nullptr)
{
tr_logAddWarn(fmt::format(
_("Couldn't read '{path}': {error} ({error_code})"),
fmt::arg("path", path()),
fmt::arg("error", error->message),
fmt::arg("error_code", error->code)));
tr_error_free(error);
}
tr_sys_dir_close(dir);
if (dir_entries != nullptr)
{
*dir_entries = new_dir_entries;
}
}
void process(char const* name_cstr)
{
auto& retries = active_retries_;
auto name = std::string{ name_cstr };
auto it = retries.find(name);
if (it != std::end(retries)) // if we already have it, restart it
{
it->second->restart();
return;
}
if (invoke(name_cstr) != TR_WATCHDIR_RETRY)
{
return;
}
retries.try_emplace(name, std::make_unique<tr_watchdir_retry>(this, event_base_, name));
}
private:
static bool is_regular_file(std::string_view dir, std::string_view name)
{
[[nodiscard]] bool isRegularFile(std::string_view dir, std::string_view name)
{
auto const path = tr_pathbuf{ dir, '/', name };
tr_error* error = nullptr;
@@ -286,104 +59,100 @@ private:
}
return info && info->isFile();
}
}
static constexpr std::string_view statusToString(tr_watchdir_status status)
{
switch (status)
{
case TR_WATCHDIR_ACCEPT:
return "accept"sv;
} // namespace
case TR_WATCHDIR_IGNORE:
return "ignore"sv;
std::chrono::milliseconds Watchdir::generic_rescan_interval_ = Watchdir::DefaultGenericRescanInterval;
case TR_WATCHDIR_RETRY:
return "retry"sv;
default:
return "???"sv;
}
}
std::string const path_;
struct event_base* const event_base_;
tr_watchdir_backend* backend_ = nullptr;
tr_watchdir_cb const callback_;
void* const callback_user_data_;
std::map<std::string /*name*/, std::unique_ptr<tr_watchdir_retry>> active_retries_;
};
/***
****
***/
void tr_watchdir_retry::onRetryTimer(evutil_socket_t /*fd*/, short /*type*/, void* vself)
namespace impl
{
TR_ASSERT(vself != nullptr);
auto* const retry = static_cast<tr_watchdir_retry*>(vself);
auto const handle = retry->handle_;
if (handle->invoke(retry->name_.c_str()) == TR_WATCHDIR_RETRY)
{
if (retry->bump())
void BaseWatchdir::processFile(std::string_view basename)
{
if (!isRegularFile(dirname_, basename) || handled_.count(basename) != 0)
{
return;
}
tr_logAddWarn(fmt::format(_("Couldn't add torrent file '{path}'"), fmt::arg("path", retry->name())));
auto const action = callback_(dirname_, basename);
tr_logAddDebug(fmt::format("Callback decided to {:s} file '{:s}'", actionToString(action), basename));
if (action == Action::Retry)
{
auto const [iter, added] = pending_.try_emplace(std::string{ basename }, Pending{});
auto const now = std::chrono::steady_clock::now();
auto& info = iter->second;
++info.strikes;
info.last_kick_at = now;
if (info.first_kick_at == Timestamp{})
{
info.first_kick_at = now;
}
handle->erase(retry->name());
if (now - info.first_kick_at > timeoutDuration())
{
tr_logAddWarn(fmt::format(_("Couldn't add torrent file '{path}'"), fmt::arg("path", basename)));
pending_.erase(iter);
}
else
{
setNextKickTime(info);
restartTimerIfPending();
}
}
else if (action == Action::Done)
{
handled_.insert(std::string{ basename });
}
}
/***
****
***/
tr_watchdir_t tr_watchdir_new(
std::string_view path,
tr_watchdir_cb callback,
void* callback_user_data,
struct event_base* event_base,
bool force_generic)
void BaseWatchdir::scan()
{
return new tr_watchdir{ path, event_base, callback, callback_user_data, force_generic };
auto new_dir_entries = std::set<std::string>{};
tr_error* error = nullptr;
auto const dir = tr_sys_dir_open(dirname_.c_str(), &error);
if (dir == TR_BAD_SYS_DIR)
{
tr_logAddWarn(fmt::format(
_("Couldn't read '{path}': {error} ({error_code})"),
fmt::arg("path", dirname()),
fmt::arg("error", error->message),
fmt::arg("error_code", error->code)));
tr_error_free(error);
return;
}
for (;;)
{
char const* const name = tr_sys_dir_read_name(dir, &error);
if (name == nullptr)
{
break;
}
if ("."sv == name || ".."sv == name)
{
continue;
}
processFile(name);
}
if (error != nullptr)
{
tr_logAddWarn(fmt::format(
_("Couldn't read '{path}': {error} ({error_code})"),
fmt::arg("path", dirname()),
fmt::arg("error", error->message),
fmt::arg("error_code", error->code)));
tr_error_free(error);
}
tr_sys_dir_close(dir);
}
void tr_watchdir_free(tr_watchdir_t handle)
{
delete handle;
}
char const* tr_watchdir_get_path(tr_watchdir_t handle)
{
TR_ASSERT(handle != nullptr);
return handle->path().c_str();
}
tr_watchdir_backend* tr_watchdir_get_backend(tr_watchdir_t handle)
{
TR_ASSERT(handle != nullptr);
return handle->backend();
}
struct event_base* tr_watchdir_get_event_base(tr_watchdir_t handle)
{
TR_ASSERT(handle != nullptr);
return handle->eventBase();
}
void tr_watchdir_process(tr_watchdir_t handle, char const* name)
{
handle->process(name);
}
void tr_watchdir_scan(tr_watchdir_t handle, std::unordered_set<std::string>* dir_entries)
{
handle->scan(dir_entries);
}
} // namespace impl
} // namespace libtransmission

View File

@@ -5,30 +5,65 @@
#pragma once
#include <functional>
#include <memory>
#include <string_view>
struct event_base;
#include "timer.h"
using tr_watchdir_t = struct tr_watchdir*;
enum tr_watchdir_status
extern "C"
{
TR_WATCHDIR_ACCEPT,
TR_WATCHDIR_IGNORE,
TR_WATCHDIR_RETRY
struct event_base;
}
namespace libtransmission
{
class Watchdir
{
public:
Watchdir() = default;
virtual ~Watchdir() = default;
Watchdir(Watchdir&&) = delete;
Watchdir(Watchdir const&) = delete;
Watchdir& operator=(Watchdir&&) = delete;
Watchdir& operator=(Watchdir const&) = delete;
[[nodiscard]] virtual std::string_view dirname() const noexcept = 0;
enum class Action
{
Done,
Retry
};
using Callback = std::function<Action(std::string_view dirname, std::string_view basename)>;
[[nodiscard]] static auto genericRescanInterval() noexcept
{
return generic_rescan_interval_;
}
static void setGenericRescanInterval(std::chrono::milliseconds interval) noexcept
{
generic_rescan_interval_ = interval;
}
static std::unique_ptr<Watchdir> create(
std::string_view dirname,
Callback callback,
libtransmission::TimerMaker& timer_maker,
struct event_base* evbase);
static std::unique_ptr<Watchdir> createGeneric(
std::string_view dirname,
Callback callback,
libtransmission::TimerMaker& timer_maker,
std::chrono::milliseconds rescan_interval = generic_rescan_interval_);
private:
static constexpr std::chrono::milliseconds DefaultGenericRescanInterval{ 1000 };
static std::chrono::milliseconds generic_rescan_interval_;
};
using tr_watchdir_cb = tr_watchdir_status (*)(tr_watchdir_t handle, char const* name, void* user_data);
/* ... */
tr_watchdir_t tr_watchdir_new(
std::string_view path,
tr_watchdir_cb callback,
void* callback_user_data,
struct event_base* event_base,
bool force_generic);
void tr_watchdir_free(tr_watchdir_t handle);
char const* tr_watchdir_get_path(tr_watchdir_t handle);
} // namespace libtransmission

View File

@@ -99,7 +99,7 @@ TEST_P(SubprocessTest, SpawnAsyncMissingExec)
TEST_P(SubprocessTest, SpawnAsyncArgs)
{
auto const result_path = buildSandboxPath("result.txt");
bool const allow_batch_metachars = TR_IF_WIN32(false, true) || !tr_str_has_suffix(self_path_.c_str(), ".cmd");
bool const allow_batch_metachars = TR_IF_WIN32(false, true) || !tr_strvEndsWith(tr_strlower(self_path_), ".cmd"sv);
auto const test_arg1 = std::string{ "arg1 " };
auto const test_arg2 = std::string{ " arg2" };

View File

@@ -256,6 +256,8 @@ protected:
0600,
nullptr);
blockingFileWrite(fd, payload, n);
tr_sys_file_flush(fd);
tr_sys_file_flush(fd);
tr_sys_file_close(fd);
sync();

View File

@@ -3,36 +3,38 @@
// or any future license endorsed by Mnemosyne LLC.
// License text can be found in the licenses/ folder.
#include <chrono>
#include <memory>
#include <string>
#include <vector>
#define LIBTRANSMISSION_WATCHDIR_MODULE
#include "transmission.h"
#include "file.h"
#include "net.h"
#include "watchdir.h"
#include "watchdir-base.h"
#include "timer-ev.h"
#include "test-fixtures.h"
#include <event2/event.h>
using namespace std::literals;
/***
****
***/
extern struct timeval tr_watchdir_generic_interval;
extern size_t tr_watchdir_retry_limit;
extern struct timeval tr_watchdir_retry_start_interval;
extern struct timeval tr_watchdir_retry_max_interval;
static auto constexpr GenericRescanInterval = 100ms;
static auto constexpr RetryDuration = 100ms;
namespace
{
auto constexpr FiftyMsec = timeval{ 0, 50000 };
auto constexpr OneHundredMsec = timeval{ 0, 100000 };
auto constexpr TwoHundredMsec = timeval{ 0, 200000 };
} // namespace
// should be at least 2x the watchdir-generic size to ensure that
// we have time to pump all events at least once in processEvents()
static auto constexpr ProcessEventsTimeout = 300ms;
static_assert(ProcessEventsTimeout > GenericRescanInterval);
namespace libtransmission
{
@@ -52,15 +54,15 @@ class WatchDirTest
{
private:
std::shared_ptr<struct event_base> ev_base_;
std::unique_ptr<libtransmission::TimerMaker> timer_maker_;
protected:
void SetUp() override
{
SandboxedTest::SetUp();
ev_base_.reset(event_base_new(), event_base_free);
// speed up generic implementation
tr_watchdir_generic_interval = OneHundredMsec;
timer_maker_ = std::make_unique<libtransmission::EvTimerMaker>(ev_base_.get());
Watchdir::setGenericRescanInterval(GenericRescanInterval);
}
void TearDown() override
@@ -70,78 +72,60 @@ protected:
SandboxedTest::TearDown();
}
auto createWatchDir(std::string const& path, tr_watchdir_cb cb, void* cb_data)
auto createWatchDir(std::string_view path, Watchdir::Callback callback)
{
auto const force_generic = GetParam() == WatchMode::GENERIC;
return tr_watchdir_new(path.c_str(), cb, cb_data, ev_base_.get(), force_generic);
auto watchdir = force_generic ?
Watchdir::createGeneric(path, std::move(callback), *timer_maker_, GenericRescanInterval) :
Watchdir::create(path, std::move(callback), *timer_maker_, ev_base_.get());
dynamic_cast<impl::BaseWatchdir*>(watchdir.get())->setRetryDuration(RetryDuration);
return watchdir;
}
std::string createFile(std::string const& parent_dir, std::string const& name)
void createFile(std::string_view dirname, std::string_view basename, std::string_view contents = ""sv)
{
auto path = parent_dir;
path += TR_PATH_DELIMITER;
path += name;
createFileWithContents(path, "");
return path;
createFileWithContents(tr_pathbuf{ dirname, '/', basename }, contents);
}
static std::string createDir(std::string const& parent_dir, std::string const& name)
static std::string createDir(std::string_view dirname, std::string_view basename)
{
auto path = parent_dir;
auto path = std::string{ dirname };
path += TR_PATH_DELIMITER;
path += name;
path += basename;
tr_sys_dir_create(path, 0, 0700);
return path;
}
void processEvents()
void processEvents(std::chrono::milliseconds wait_interval = ProcessEventsTimeout)
{
event_base_loopexit(ev_base_.get(), &TwoHundredMsec);
auto tv = timeval{};
auto const seconds = std::chrono::duration_cast<std::chrono::seconds>(wait_interval);
tv.tv_sec = static_cast<decltype(tv.tv_sec)>(seconds.count());
wait_interval -= seconds;
auto const usec = std::chrono::duration_cast<std::chrono::microseconds>(wait_interval);
tv.tv_usec = static_cast<decltype(tv.tv_usec)>(usec.count());
event_base_loopexit(ev_base_.get(), &tv);
event_base_dispatch(ev_base_.get());
}
struct CallbackData
{
explicit CallbackData(tr_watchdir_status status = TR_WATCHDIR_ACCEPT)
: result{ status }
{
}
tr_watchdir_status result{};
tr_watchdir_t wd = {};
std::string name = {};
};
static tr_watchdir_status callback(tr_watchdir_t wd, char const* name, void* vdata) noexcept
{
auto* data = static_cast<CallbackData*>(vdata);
auto const result = data->result;
if (result != TR_WATCHDIR_RETRY)
{
data->wd = wd;
data->name = name;
}
return result;
}
};
TEST_P(WatchDirTest, construct)
{
auto const path = sandboxDir();
auto wd = createWatchDir(path, &callback, nullptr);
EXPECT_NE(nullptr, wd);
EXPECT_TRUE(tr_sys_path_is_same(path.c_str(), tr_watchdir_get_path(wd)));
auto callback = [](std::string_view /*dirname*/, std::string_view /*basename*/)
{
return Watchdir::Action::Done;
};
auto watchdir = createWatchDir(path, callback);
EXPECT_TRUE(watchdir);
EXPECT_EQ(path, watchdir->dirname());
processEvents();
tr_watchdir_free(wd);
}
TEST_P(WatchDirTest, initialScan)
@@ -151,214 +135,114 @@ TEST_P(WatchDirTest, initialScan)
// setup: start with an empty directory.
// this block confirms that it's empty
{
auto wd_data = CallbackData(TR_WATCHDIR_ACCEPT);
auto wd = createWatchDir(path, &callback, &wd_data);
EXPECT_NE(nullptr, wd);
auto called = bool{ false };
auto callback = [&called](std::string_view /*dirname*/, std::string_view /*basename*/)
{
called = true;
return Watchdir::Action::Done;
};
auto watchdir = createWatchDir(path, callback);
EXPECT_TRUE(watchdir);
processEvents();
EXPECT_EQ(nullptr, wd_data.wd);
EXPECT_EQ("", wd_data.name);
tr_watchdir_free(wd);
EXPECT_FALSE(called);
}
// add a file
auto const base_name = std::string{ "test.txt" };
auto const base_name = "test.txt"sv;
createFile(path, base_name);
// confirm that a wd will pick up the file that
// was created before the wd was instantiated
{
auto wd_data = CallbackData(TR_WATCHDIR_ACCEPT);
auto wd = createWatchDir(path, &callback, &wd_data);
EXPECT_NE(nullptr, wd);
auto names = std::set<std::string>{};
auto callback = [&names](std::string_view /*dirname*/, std::string_view basename)
{
names.insert(std::string{ basename });
return Watchdir::Action::Done;
};
auto watchdir = createWatchDir(path, callback);
EXPECT_TRUE(watchdir);
processEvents();
EXPECT_EQ(wd, wd_data.wd);
EXPECT_EQ(base_name, wd_data.name);
tr_watchdir_free(wd);
EXPECT_EQ(1U, std::size(names));
EXPECT_EQ(base_name, *names.begin());
}
}
TEST_P(WatchDirTest, watch)
{
auto const path = sandboxDir();
auto const dirname = sandboxDir();
// create a new watchdir and confirm it's empty
auto wd_data = CallbackData(TR_WATCHDIR_ACCEPT);
auto wd = createWatchDir(path, &callback, &wd_data);
EXPECT_NE(nullptr, wd);
auto names = std::vector<std::string>{};
auto callback = [&names](std::string_view /*dirname*/, std::string_view basename)
{
names.emplace_back(std::string{ basename });
return Watchdir::Action::Done;
};
auto watchdir = createWatchDir(dirname, callback);
processEvents();
EXPECT_EQ(nullptr, wd_data.wd);
EXPECT_EQ("", wd_data.name);
EXPECT_TRUE(watchdir);
EXPECT_TRUE(std::empty(names));
// test that a new file in an empty directory shows up
auto const file1 = std::string{ "test1" };
createFile(path, file1);
auto const file1 = "test1"sv;
createFile(dirname, file1);
processEvents();
EXPECT_EQ(wd, wd_data.wd);
EXPECT_EQ(file1, wd_data.name);
EXPECT_EQ(1U, std::size(names));
if (!std::empty(names))
{
EXPECT_EQ(file1, names.front());
}
// test that a new file in a nonempty directory shows up
wd_data = CallbackData(TR_WATCHDIR_ACCEPT);
auto const file2 = std::string{ "test2" };
createFile(path, file2);
names.clear();
auto const file2 = "test2"sv;
createFile(dirname, file2);
processEvents();
EXPECT_EQ(wd, wd_data.wd);
EXPECT_EQ(file2, wd_data.name);
processEvents();
EXPECT_EQ(1U, std::size(names));
if (!std::empty(names))
{
EXPECT_EQ(file2, names.front());
}
// test that folders don't trigger the callback
wd_data = CallbackData(TR_WATCHDIR_ACCEPT);
createDir(path, "test3");
names.clear();
createDir(dirname, "test3"sv);
processEvents();
EXPECT_EQ(nullptr, wd_data.wd);
EXPECT_EQ("", wd_data.name);
// cleanup
tr_watchdir_free(wd);
}
TEST_P(WatchDirTest, watchTwoDirs)
{
auto top = sandboxDir();
// create two empty directories and watch them
auto wd1_data = CallbackData(TR_WATCHDIR_ACCEPT);
auto const dir1 = createDir(top, "a");
auto wd1 = createWatchDir(dir1, &callback, &wd1_data);
EXPECT_NE(wd1, nullptr);
auto wd2_data = CallbackData(TR_WATCHDIR_ACCEPT);
auto const dir2 = createDir(top, "b");
auto wd2 = createWatchDir(dir2, &callback, &wd2_data);
EXPECT_NE(wd2, nullptr);
processEvents();
EXPECT_EQ(nullptr, wd1_data.wd);
EXPECT_EQ("", wd1_data.name);
EXPECT_EQ(nullptr, wd2_data.wd);
EXPECT_EQ("", wd2_data.name);
// add a file into directory 1 and confirm it triggers
// a callback with the right wd
auto const file1 = std::string{ "test.txt" };
createFile(dir1, file1);
processEvents();
EXPECT_EQ(wd1, wd1_data.wd);
EXPECT_EQ(file1, wd1_data.name);
EXPECT_EQ(nullptr, wd2_data.wd);
EXPECT_EQ("", wd2_data.name);
// add a file into directory 2 and confirm it triggers
// a callback with the right wd
wd1_data = CallbackData(TR_WATCHDIR_ACCEPT);
wd2_data = CallbackData(TR_WATCHDIR_ACCEPT);
auto const file2 = std::string{ "test2.txt" };
createFile(dir2, file2);
processEvents();
EXPECT_EQ(nullptr, wd1_data.wd);
EXPECT_EQ("", wd1_data.name);
EXPECT_EQ(wd2, wd2_data.wd);
EXPECT_EQ(file2, wd2_data.name);
// TODO(ckerr): watchdir.c seems to treat IGNORE and ACCEPT identically
// so I'm not sure what's intended or what this is supposed to
// be testing.
wd1_data = CallbackData(TR_WATCHDIR_IGNORE);
wd2_data = CallbackData(TR_WATCHDIR_IGNORE);
auto const file3 = std::string{ "test3.txt" };
auto const file4 = std::string{ "test4.txt" };
createFile(dir1, file3);
createFile(dir2, file4);
processEvents();
EXPECT_EQ(wd1, wd1_data.wd);
EXPECT_EQ(file3, wd1_data.name);
EXPECT_EQ(wd2, wd2_data.wd);
EXPECT_EQ(file4, wd2_data.name);
// confirm that callbacks don't get confused
// when there's a new file in directory 'a'
// and a new directory in directory 'b'
wd1_data = CallbackData(TR_WATCHDIR_ACCEPT);
wd2_data = CallbackData(TR_WATCHDIR_ACCEPT);
auto const file5 = std::string{ "test5.txt" };
createFile(dir1, file5);
createDir(dir2, file5);
processEvents();
EXPECT_EQ(wd1, wd1_data.wd);
EXPECT_EQ(file5, wd1_data.name);
EXPECT_EQ(nullptr, wd2_data.wd);
EXPECT_EQ("", wd2_data.name);
// reverse the order of the previous test:
// confirm that callbacks don't get confused
// when there's a new file in directory 'b'
// and a new directory in directory 'a'
wd1_data = CallbackData(TR_WATCHDIR_ACCEPT);
wd2_data = CallbackData(TR_WATCHDIR_ACCEPT);
auto const file6 = std::string{ "test6.txt" };
createDir(dir1, file6);
createFile(dir2, file6);
processEvents();
EXPECT_EQ(nullptr, wd1_data.wd);
EXPECT_EQ("", wd1_data.name);
EXPECT_EQ(wd2, wd2_data.wd);
EXPECT_EQ(file6, wd2_data.name);
// confirm that creating new directories in BOTH
// watchdirs still triggers no callbacks
wd1_data = CallbackData(TR_WATCHDIR_ACCEPT);
wd2_data = CallbackData(TR_WATCHDIR_ACCEPT);
auto const file7 = std::string{ "test7.txt" };
auto const file8 = std::string{ "test8.txt" };
createDir(dir1, file7);
createDir(dir2, file8);
processEvents();
EXPECT_EQ(nullptr, wd1_data.wd);
EXPECT_EQ("", wd1_data.name);
EXPECT_EQ(nullptr, wd2_data.wd);
EXPECT_EQ("", wd2_data.name);
// cleanup
tr_watchdir_free(wd2);
tr_watchdir_free(wd1);
EXPECT_TRUE(std::empty(names));
}
TEST_P(WatchDirTest, retry)
{
auto const path = sandboxDir();
// tune retry logic
tr_watchdir_retry_limit = 10;
tr_watchdir_retry_start_interval = FiftyMsec;
tr_watchdir_retry_max_interval = tr_watchdir_retry_start_interval;
// test setup:
// Start watching the test directory.
// Create a file and return 'retry' back to the watchdir code
// from our callback. This should cause the wd to wait a bit
// and try again.
auto wd_data = CallbackData(TR_WATCHDIR_RETRY);
auto wd = createWatchDir(path, &callback, &wd_data);
EXPECT_NE(nullptr, wd);
processEvents();
EXPECT_EQ(nullptr, wd_data.wd);
EXPECT_EQ("", wd_data.name);
// Create a file and return 'retry' back to the watchdir code from our callback.
// This should cause the wd to wait a bit and try again.
auto names = std::vector<std::string>{};
auto callback = [&names](std::string_view /*dirname*/, std::string_view basename)
{
names.emplace_back(std::string{ basename });
return Watchdir::Action::Retry;
};
auto watchdir = createWatchDir(path, callback);
auto constexpr FastRetryWaitTime = 20ms;
auto constexpr ThreeRetries = FastRetryWaitTime * 4;
dynamic_cast<impl::BaseWatchdir*>(watchdir.get())->setRetryDuration(FastRetryWaitTime);
auto const test_file = std::string{ "test" };
processEvents(ThreeRetries);
EXPECT_EQ(0U, std::size(names));
auto const test_file = "test.txt"sv;
createFile(path, test_file);
processEvents();
EXPECT_EQ(nullptr, wd_data.wd);
EXPECT_EQ("", wd_data.name);
// confirm that wd retries.
// return 'accept' in the callback so it won't keep retrying.
wd_data = CallbackData(TR_WATCHDIR_ACCEPT);
processEvents();
EXPECT_EQ(wd, wd_data.wd);
EXPECT_EQ(test_file, wd_data.name);
tr_watchdir_free(wd);
processEvents(ThreeRetries);
EXPECT_LE(2, std::size(names));
for (auto const& name : names)
{
EXPECT_EQ(test_file, name);
}
}
INSTANTIATE_TEST_SUITE_P( //

View File

@@ -10,8 +10,6 @@
#include <string_view>
#include <vector>
#include <event2/buffer.h>
#include <libtransmission/transmission.h>
#include <libtransmission/error.h>