diff --git a/src/event/Loop.cxx b/src/event/Loop.cxx index ace654015..376e10300 100644 --- a/src/event/Loop.cxx +++ b/src/event/Loop.cxx @@ -37,18 +37,27 @@ EventLoop::TimerCompare::operator()(const TimerEvent &a, return a.due < b.due; } -EventLoop::EventLoop(ThreadId _thread) - :SocketMonitor(*this), +EventLoop::EventLoop( +#ifdef HAVE_THREADED_EVENT_LOOP + ThreadId _thread +#endif + ) + : +#ifdef HAVE_THREADED_EVENT_LOOP + SocketMonitor(*this), + thread(_thread), /* if this instance is hosted by an EventThread (no ThreadId known yet) then we're not yet alive until the thread is started; for the main EventLoop instance, we assume it's already alive, because nobody but EventThread will call SetAlive() */ alive(!_thread.IsNull()), - quit(false), - thread(_thread) +#endif + quit(false) { +#ifdef HAVE_THREADED_EVENT_LOOP SocketMonitor::Open(SocketDescriptor(wake_fd.Get())); +#endif } EventLoop::~EventLoop() noexcept @@ -83,13 +92,17 @@ EventLoop::Break() noexcept if (quit.exchange(true)) return; +#ifdef HAVE_THREADED_EVENT_LOOP wake_fd.Write(); +#endif } bool EventLoop::Abandon(int _fd) noexcept { +#ifdef HAVE_THREADED_EVENT_LOOP assert(!IsAlive() || IsInside()); +#endif return poll_group.Abandon(_fd); } @@ -97,7 +110,9 @@ EventLoop::Abandon(int _fd) noexcept bool EventLoop::RemoveFD(int _fd) noexcept { +#ifdef HAVE_THREADED_EVENT_LOOP assert(!IsAlive() || IsInside()); +#endif return poll_group.Remove(_fd); } @@ -169,15 +184,19 @@ ExportTimeoutMS(Event::Duration timeout) void EventLoop::Run() noexcept { +#ifdef HAVE_THREADED_EVENT_LOOP if (thread.IsNull()) thread = ThreadId::GetCurrent(); +#endif assert(IsInside()); assert(!quit); +#ifdef HAVE_THREADED_EVENT_LOOP assert(alive); assert(busy); SocketMonitor::Schedule(SocketMonitor::READ); +#endif AtScopeExit(this) { #ifdef HAVE_URING /* make sure that the Uring::Manager gets destructed @@ -188,7 +207,9 @@ EventLoop::Run() noexcept uring_initialized = false; #endif +#ifdef HAVE_THREADED_EVENT_LOOP SocketMonitor::Cancel(); +#endif }; do { @@ -212,6 +233,7 @@ EventLoop::Run() noexcept return; } +#ifdef HAVE_THREADED_EVENT_LOOP /* try to handle DeferEvents without WakeFD overhead */ { @@ -225,6 +247,7 @@ EventLoop::Run() noexcept new timeout */ continue; } +#endif /* wait for new event */ @@ -240,10 +263,12 @@ EventLoop::Run() noexcept now = std::chrono::steady_clock::now(); +#ifdef HAVE_THREADED_EVENT_LOOP { const std::lock_guard<Mutex> lock(mutex); busy = true; } +#endif /* invoke sockets */ while (!ready_sockets.empty() && !quit) { @@ -254,11 +279,15 @@ EventLoop::Run() noexcept } } while (!quit); +#ifdef HAVE_THREADED_EVENT_LOOP #ifndef NDEBUG assert(thread.IsInside()); #endif +#endif } +#ifdef HAVE_THREADED_EVENT_LOOP + void EventLoop::AddDeferred(DeferEvent &d) noexcept { @@ -316,3 +345,5 @@ EventLoop::OnSocketReady([[maybe_unused]] unsigned flags) noexcept return true; } + +#endif diff --git a/src/event/Loop.hxx b/src/event/Loop.hxx index 15f53674f..257979845 100644 --- a/src/event/Loop.hxx +++ b/src/event/Loop.hxx @@ -22,11 +22,15 @@ #include "Chrono.hxx" #include "PollGroup.hxx" -#include "WakeFD.hxx" #include "SocketMonitor.hxx" +#include "event/Features.h" +#include "util/Compiler.h" + +#ifdef HAVE_THREADED_EVENT_LOOP +#include "WakeFD.hxx" #include "thread/Id.hxx" #include "thread/Mutex.hxx" -#include "util/Compiler.h" +#endif #include <boost/intrusive/set.hpp> #include <boost/intrusive/list.hpp> @@ -54,9 +58,14 @@ class DeferEvent; * * @see SocketMonitor, MultiSocketMonitor, TimerEvent, IdleEvent */ -class EventLoop final : SocketMonitor +class EventLoop final +#ifdef HAVE_THREADED_EVENT_LOOP + : SocketMonitor +#endif { +#ifdef HAVE_THREADED_EVENT_LOOP WakeFD wake_fd; +#endif struct TimerCompare { constexpr bool operator()(const TimerEvent &a, @@ -76,6 +85,7 @@ class EventLoop final : SocketMonitor boost::intrusive::constant_time_size<false>>; IdleList idle; +#ifdef HAVE_THREADED_EVENT_LOOP Mutex mutex; using DeferredList = @@ -83,6 +93,7 @@ class EventLoop final : SocketMonitor boost::intrusive::base_hook<boost::intrusive::list_base_hook<>>, boost::intrusive::constant_time_size<false>>; DeferredList deferred; +#endif using ReadySocketList = boost::intrusive::list<SocketMonitor, @@ -103,6 +114,12 @@ class EventLoop final : SocketMonitor Event::Clock::time_point now = Event::Clock::now(); +#ifdef HAVE_THREADED_EVENT_LOOP + /** + * A reference to the thread that is currently inside Run(). + */ + ThreadId thread = ThreadId::Null(); + /** * Is this #EventLoop alive, i.e. can events be scheduled? * This is used by BlockingCall() to determine whether @@ -110,6 +127,7 @@ class EventLoop final : SocketMonitor * there's no #EventThread yet/anymore). */ bool alive; +#endif std::atomic_bool quit; @@ -119,6 +137,7 @@ class EventLoop final : SocketMonitor */ bool again; +#ifdef HAVE_THREADED_EVENT_LOOP /** * True when handling callbacks, false when waiting for I/O or * timeout. @@ -126,6 +145,7 @@ class EventLoop final : SocketMonitor * Protected with #mutex. */ bool busy = true; +#endif #ifdef HAVE_URING bool uring_initialized = false; @@ -133,18 +153,17 @@ class EventLoop final : SocketMonitor PollGroup poll_group; - /** - * A reference to the thread that is currently inside Run(). - */ - ThreadId thread = ThreadId::Null(); - public: /** * Throws on error. */ +#ifdef HAVE_THREADED_EVENT_LOOP explicit EventLoop(ThreadId _thread); EventLoop():EventLoop(ThreadId::GetCurrent()) {} +#else + EventLoop(); +#endif ~EventLoop() noexcept; @@ -152,7 +171,9 @@ public: * A caching wrapper for Event::Clock::now(). */ auto GetTime() const { +#ifdef HAVE_THREADED_EVENT_LOOP assert(IsInside()); +#endif return now; } @@ -170,13 +191,17 @@ public: void Break() noexcept; bool AddFD(int _fd, unsigned flags, SocketMonitor &m) noexcept { +#ifdef HAVE_THREADED_EVENT_LOOP assert(!IsAlive() || IsInside()); +#endif return poll_group.Add(_fd, flags, &m); } bool ModifyFD(int _fd, unsigned flags, SocketMonitor &m) noexcept { +#ifdef HAVE_THREADED_EVENT_LOOP assert(!IsAlive() || IsInside()); +#endif return poll_group.Modify(_fd, flags, &m); } @@ -195,6 +220,7 @@ public: void AddTimer(TimerEvent &t, Event::Duration d) noexcept; +#ifdef HAVE_THREADED_EVENT_LOOP /** * Schedule a call to DeferEvent::RunDeferred(). * @@ -209,6 +235,7 @@ public: * This method is thread-safe. */ void RemoveDeferred(DeferEvent &d) noexcept; +#endif /** * The main function of this class. It will loop until @@ -217,12 +244,14 @@ public: void Run() noexcept; private: +#ifdef HAVE_THREADED_EVENT_LOOP /** * Invoke all pending DeferEvents. * * Caller must lock the mutex. */ void HandleDeferred() noexcept; +#endif /** * Invoke all expired #TimerEvent instances and return the @@ -231,9 +260,12 @@ private: */ Event::Duration HandleTimers() noexcept; +#ifdef HAVE_THREADED_EVENT_LOOP bool OnSocketReady(unsigned flags) noexcept override; +#endif public: +#ifdef HAVE_THREADED_EVENT_LOOP void SetAlive(bool _alive) noexcept { alive = _alive; } @@ -241,13 +273,18 @@ public: bool IsAlive() const noexcept { return alive; } +#endif /** * Are we currently running inside this EventLoop's thread? */ gcc_pure bool IsInside() const noexcept { +#ifdef HAVE_THREADED_EVENT_LOOP return thread.IsInside(); +#else + return true; +#endif } }; diff --git a/src/event/meson.build b/src/event/meson.build index ea4754923..556232cad 100644 --- a/src/event/meson.build +++ b/src/event/meson.build @@ -2,6 +2,7 @@ event_features = configuration_data() event_features.set('USE_EVENTFD', is_linux and get_option('eventfd')) event_features.set('USE_SIGNALFD', is_linux and get_option('signalfd')) event_features.set('USE_EPOLL', is_linux and get_option('epoll')) +event_features.set('HAVE_THREADED_EVENT_LOOP', true) configure_file(output: 'Features.h', configuration: event_features) event_sources = []