event/Loop: add compile-time option to disable multithreading

Not for MPD, but for other applications which might want to copy its
event loop, but do not need multi-threading.
This commit is contained in:
Max Kellermann 2020-10-13 15:50:12 +02:00
parent e9f6af61f9
commit 38dab040b3
3 changed files with 81 additions and 12 deletions

View File

@ -37,18 +37,27 @@ EventLoop::TimerCompare::operator()(const TimerEvent &a,
return a.due < b.due; return a.due < b.due;
} }
EventLoop::EventLoop(ThreadId _thread) EventLoop::EventLoop(
:SocketMonitor(*this), #ifdef HAVE_THREADED_EVENT_LOOP
ThreadId _thread
#endif
)
:
#ifdef HAVE_THREADED_EVENT_LOOP
SocketMonitor(*this),
thread(_thread),
/* if this instance is hosted by an EventThread (no ThreadId /* if this instance is hosted by an EventThread (no ThreadId
known yet) then we're not yet alive until the thread is known yet) then we're not yet alive until the thread is
started; for the main EventLoop instance, we assume it's started; for the main EventLoop instance, we assume it's
already alive, because nobody but EventThread will call already alive, because nobody but EventThread will call
SetAlive() */ SetAlive() */
alive(!_thread.IsNull()), alive(!_thread.IsNull()),
quit(false), #endif
thread(_thread) quit(false)
{ {
#ifdef HAVE_THREADED_EVENT_LOOP
SocketMonitor::Open(SocketDescriptor(wake_fd.Get())); SocketMonitor::Open(SocketDescriptor(wake_fd.Get()));
#endif
} }
EventLoop::~EventLoop() noexcept EventLoop::~EventLoop() noexcept
@ -83,13 +92,17 @@ EventLoop::Break() noexcept
if (quit.exchange(true)) if (quit.exchange(true))
return; return;
#ifdef HAVE_THREADED_EVENT_LOOP
wake_fd.Write(); wake_fd.Write();
#endif
} }
bool bool
EventLoop::Abandon(int _fd) noexcept EventLoop::Abandon(int _fd) noexcept
{ {
#ifdef HAVE_THREADED_EVENT_LOOP
assert(!IsAlive() || IsInside()); assert(!IsAlive() || IsInside());
#endif
return poll_group.Abandon(_fd); return poll_group.Abandon(_fd);
} }
@ -97,7 +110,9 @@ EventLoop::Abandon(int _fd) noexcept
bool bool
EventLoop::RemoveFD(int _fd) noexcept EventLoop::RemoveFD(int _fd) noexcept
{ {
#ifdef HAVE_THREADED_EVENT_LOOP
assert(!IsAlive() || IsInside()); assert(!IsAlive() || IsInside());
#endif
return poll_group.Remove(_fd); return poll_group.Remove(_fd);
} }
@ -169,15 +184,19 @@ ExportTimeoutMS(Event::Duration timeout)
void void
EventLoop::Run() noexcept EventLoop::Run() noexcept
{ {
#ifdef HAVE_THREADED_EVENT_LOOP
if (thread.IsNull()) if (thread.IsNull())
thread = ThreadId::GetCurrent(); thread = ThreadId::GetCurrent();
#endif
assert(IsInside()); assert(IsInside());
assert(!quit); assert(!quit);
#ifdef HAVE_THREADED_EVENT_LOOP
assert(alive); assert(alive);
assert(busy); assert(busy);
SocketMonitor::Schedule(SocketMonitor::READ); SocketMonitor::Schedule(SocketMonitor::READ);
#endif
AtScopeExit(this) { AtScopeExit(this) {
#ifdef HAVE_URING #ifdef HAVE_URING
/* make sure that the Uring::Manager gets destructed /* make sure that the Uring::Manager gets destructed
@ -188,7 +207,9 @@ EventLoop::Run() noexcept
uring_initialized = false; uring_initialized = false;
#endif #endif
#ifdef HAVE_THREADED_EVENT_LOOP
SocketMonitor::Cancel(); SocketMonitor::Cancel();
#endif
}; };
do { do {
@ -212,6 +233,7 @@ EventLoop::Run() noexcept
return; return;
} }
#ifdef HAVE_THREADED_EVENT_LOOP
/* try to handle DeferEvents without WakeFD /* try to handle DeferEvents without WakeFD
overhead */ overhead */
{ {
@ -225,6 +247,7 @@ EventLoop::Run() noexcept
new timeout */ new timeout */
continue; continue;
} }
#endif
/* wait for new event */ /* wait for new event */
@ -240,10 +263,12 @@ EventLoop::Run() noexcept
now = std::chrono::steady_clock::now(); now = std::chrono::steady_clock::now();
#ifdef HAVE_THREADED_EVENT_LOOP
{ {
const std::lock_guard<Mutex> lock(mutex); const std::lock_guard<Mutex> lock(mutex);
busy = true; busy = true;
} }
#endif
/* invoke sockets */ /* invoke sockets */
while (!ready_sockets.empty() && !quit) { while (!ready_sockets.empty() && !quit) {
@ -254,11 +279,15 @@ EventLoop::Run() noexcept
} }
} while (!quit); } while (!quit);
#ifdef HAVE_THREADED_EVENT_LOOP
#ifndef NDEBUG #ifndef NDEBUG
assert(thread.IsInside()); assert(thread.IsInside());
#endif #endif
#endif
} }
#ifdef HAVE_THREADED_EVENT_LOOP
void void
EventLoop::AddDeferred(DeferEvent &d) noexcept EventLoop::AddDeferred(DeferEvent &d) noexcept
{ {
@ -316,3 +345,5 @@ EventLoop::OnSocketReady([[maybe_unused]] unsigned flags) noexcept
return true; return true;
} }
#endif

View File

@ -22,11 +22,15 @@
#include "Chrono.hxx" #include "Chrono.hxx"
#include "PollGroup.hxx" #include "PollGroup.hxx"
#include "WakeFD.hxx"
#include "SocketMonitor.hxx" #include "SocketMonitor.hxx"
#include "event/Features.h"
#include "util/Compiler.h"
#ifdef HAVE_THREADED_EVENT_LOOP
#include "WakeFD.hxx"
#include "thread/Id.hxx" #include "thread/Id.hxx"
#include "thread/Mutex.hxx" #include "thread/Mutex.hxx"
#include "util/Compiler.h" #endif
#include <boost/intrusive/set.hpp> #include <boost/intrusive/set.hpp>
#include <boost/intrusive/list.hpp> #include <boost/intrusive/list.hpp>
@ -54,9 +58,14 @@ class DeferEvent;
* *
* @see SocketMonitor, MultiSocketMonitor, TimerEvent, IdleEvent * @see SocketMonitor, MultiSocketMonitor, TimerEvent, IdleEvent
*/ */
class EventLoop final : SocketMonitor class EventLoop final
#ifdef HAVE_THREADED_EVENT_LOOP
: SocketMonitor
#endif
{ {
#ifdef HAVE_THREADED_EVENT_LOOP
WakeFD wake_fd; WakeFD wake_fd;
#endif
struct TimerCompare { struct TimerCompare {
constexpr bool operator()(const TimerEvent &a, constexpr bool operator()(const TimerEvent &a,
@ -76,6 +85,7 @@ class EventLoop final : SocketMonitor
boost::intrusive::constant_time_size<false>>; boost::intrusive::constant_time_size<false>>;
IdleList idle; IdleList idle;
#ifdef HAVE_THREADED_EVENT_LOOP
Mutex mutex; Mutex mutex;
using DeferredList = using DeferredList =
@ -83,6 +93,7 @@ class EventLoop final : SocketMonitor
boost::intrusive::base_hook<boost::intrusive::list_base_hook<>>, boost::intrusive::base_hook<boost::intrusive::list_base_hook<>>,
boost::intrusive::constant_time_size<false>>; boost::intrusive::constant_time_size<false>>;
DeferredList deferred; DeferredList deferred;
#endif
using ReadySocketList = using ReadySocketList =
boost::intrusive::list<SocketMonitor, boost::intrusive::list<SocketMonitor,
@ -103,6 +114,12 @@ class EventLoop final : SocketMonitor
Event::Clock::time_point now = Event::Clock::now(); Event::Clock::time_point now = Event::Clock::now();
#ifdef HAVE_THREADED_EVENT_LOOP
/**
* A reference to the thread that is currently inside Run().
*/
ThreadId thread = ThreadId::Null();
/** /**
* Is this #EventLoop alive, i.e. can events be scheduled? * Is this #EventLoop alive, i.e. can events be scheduled?
* This is used by BlockingCall() to determine whether * This is used by BlockingCall() to determine whether
@ -110,6 +127,7 @@ class EventLoop final : SocketMonitor
* there's no #EventThread yet/anymore). * there's no #EventThread yet/anymore).
*/ */
bool alive; bool alive;
#endif
std::atomic_bool quit; std::atomic_bool quit;
@ -119,6 +137,7 @@ class EventLoop final : SocketMonitor
*/ */
bool again; bool again;
#ifdef HAVE_THREADED_EVENT_LOOP
/** /**
* True when handling callbacks, false when waiting for I/O or * True when handling callbacks, false when waiting for I/O or
* timeout. * timeout.
@ -126,6 +145,7 @@ class EventLoop final : SocketMonitor
* Protected with #mutex. * Protected with #mutex.
*/ */
bool busy = true; bool busy = true;
#endif
#ifdef HAVE_URING #ifdef HAVE_URING
bool uring_initialized = false; bool uring_initialized = false;
@ -133,18 +153,17 @@ class EventLoop final : SocketMonitor
PollGroup poll_group; PollGroup poll_group;
/**
* A reference to the thread that is currently inside Run().
*/
ThreadId thread = ThreadId::Null();
public: public:
/** /**
* Throws on error. * Throws on error.
*/ */
#ifdef HAVE_THREADED_EVENT_LOOP
explicit EventLoop(ThreadId _thread); explicit EventLoop(ThreadId _thread);
EventLoop():EventLoop(ThreadId::GetCurrent()) {} EventLoop():EventLoop(ThreadId::GetCurrent()) {}
#else
EventLoop();
#endif
~EventLoop() noexcept; ~EventLoop() noexcept;
@ -152,7 +171,9 @@ public:
* A caching wrapper for Event::Clock::now(). * A caching wrapper for Event::Clock::now().
*/ */
auto GetTime() const { auto GetTime() const {
#ifdef HAVE_THREADED_EVENT_LOOP
assert(IsInside()); assert(IsInside());
#endif
return now; return now;
} }
@ -170,13 +191,17 @@ public:
void Break() noexcept; void Break() noexcept;
bool AddFD(int _fd, unsigned flags, SocketMonitor &m) noexcept { bool AddFD(int _fd, unsigned flags, SocketMonitor &m) noexcept {
#ifdef HAVE_THREADED_EVENT_LOOP
assert(!IsAlive() || IsInside()); assert(!IsAlive() || IsInside());
#endif
return poll_group.Add(_fd, flags, &m); return poll_group.Add(_fd, flags, &m);
} }
bool ModifyFD(int _fd, unsigned flags, SocketMonitor &m) noexcept { bool ModifyFD(int _fd, unsigned flags, SocketMonitor &m) noexcept {
#ifdef HAVE_THREADED_EVENT_LOOP
assert(!IsAlive() || IsInside()); assert(!IsAlive() || IsInside());
#endif
return poll_group.Modify(_fd, flags, &m); return poll_group.Modify(_fd, flags, &m);
} }
@ -195,6 +220,7 @@ public:
void AddTimer(TimerEvent &t, Event::Duration d) noexcept; void AddTimer(TimerEvent &t, Event::Duration d) noexcept;
#ifdef HAVE_THREADED_EVENT_LOOP
/** /**
* Schedule a call to DeferEvent::RunDeferred(). * Schedule a call to DeferEvent::RunDeferred().
* *
@ -209,6 +235,7 @@ public:
* This method is thread-safe. * This method is thread-safe.
*/ */
void RemoveDeferred(DeferEvent &d) noexcept; void RemoveDeferred(DeferEvent &d) noexcept;
#endif
/** /**
* The main function of this class. It will loop until * The main function of this class. It will loop until
@ -217,12 +244,14 @@ public:
void Run() noexcept; void Run() noexcept;
private: private:
#ifdef HAVE_THREADED_EVENT_LOOP
/** /**
* Invoke all pending DeferEvents. * Invoke all pending DeferEvents.
* *
* Caller must lock the mutex. * Caller must lock the mutex.
*/ */
void HandleDeferred() noexcept; void HandleDeferred() noexcept;
#endif
/** /**
* Invoke all expired #TimerEvent instances and return the * Invoke all expired #TimerEvent instances and return the
@ -231,9 +260,12 @@ private:
*/ */
Event::Duration HandleTimers() noexcept; Event::Duration HandleTimers() noexcept;
#ifdef HAVE_THREADED_EVENT_LOOP
bool OnSocketReady(unsigned flags) noexcept override; bool OnSocketReady(unsigned flags) noexcept override;
#endif
public: public:
#ifdef HAVE_THREADED_EVENT_LOOP
void SetAlive(bool _alive) noexcept { void SetAlive(bool _alive) noexcept {
alive = _alive; alive = _alive;
} }
@ -241,13 +273,18 @@ public:
bool IsAlive() const noexcept { bool IsAlive() const noexcept {
return alive; return alive;
} }
#endif
/** /**
* Are we currently running inside this EventLoop's thread? * Are we currently running inside this EventLoop's thread?
*/ */
gcc_pure gcc_pure
bool IsInside() const noexcept { bool IsInside() const noexcept {
#ifdef HAVE_THREADED_EVENT_LOOP
return thread.IsInside(); return thread.IsInside();
#else
return true;
#endif
} }
}; };

View File

@ -2,6 +2,7 @@ event_features = configuration_data()
event_features.set('USE_EVENTFD', is_linux and get_option('eventfd')) event_features.set('USE_EVENTFD', is_linux and get_option('eventfd'))
event_features.set('USE_SIGNALFD', is_linux and get_option('signalfd')) event_features.set('USE_SIGNALFD', is_linux and get_option('signalfd'))
event_features.set('USE_EPOLL', is_linux and get_option('epoll')) event_features.set('USE_EPOLL', is_linux and get_option('epoll'))
event_features.set('HAVE_THREADED_EVENT_LOOP', true)
configure_file(output: 'Features.h', configuration: event_features) configure_file(output: 'Features.h', configuration: event_features)
event_sources = [] event_sources = []