From a357d84dce668d126fe984680e5d17f6d41b2fe6 Mon Sep 17 00:00:00 2001 From: Max Kellermann Date: Sat, 4 Jan 2014 14:56:02 +0100 Subject: [PATCH] event/DeferredMonitor: make fully thread-safe Instead of creating a new eventfd for each DeferredMonitor instance, reuse EventLoop's eventfd, and add a std::list to EventLoop that manages the list of pending DeferredMonitors. This std::list is protected by the same mutex as the "calls" list. The bottom line is: reduced overhead because the per-instance eventfd was eliminated, slightly added overhead due to Mutex usage (but negligible), and we're thread-safe now. This subsystem is now good enough to replace EventLoop::AddCall(). --- src/event/DeferredMonitor.cxx | 20 ++------------ src/event/DeferredMonitor.hxx | 43 +++++------------------------ src/event/Loop.cxx | 51 +++++++++++++++++++++++++++++++++++ src/event/Loop.hxx | 17 ++++++++++++ 4 files changed, 77 insertions(+), 54 deletions(-) diff --git a/src/event/DeferredMonitor.cxx b/src/event/DeferredMonitor.cxx index 40b4b0b62..5f295946e 100644 --- a/src/event/DeferredMonitor.cxx +++ b/src/event/DeferredMonitor.cxx @@ -25,7 +25,7 @@ void DeferredMonitor::Cancel() { #ifdef USE_INTERNAL_EVENTLOOP - pending = false; + loop.RemoveDeferred(*this); #endif #ifdef USE_GLIB_EVENTLOOP const auto id = source_id.exchange(0); @@ -38,8 +38,7 @@ void DeferredMonitor::Schedule() { #ifdef USE_INTERNAL_EVENTLOOP - if (!pending.exchange(true)) - fd.Write(); + loop.AddDeferred(*this); #endif #ifdef USE_GLIB_EVENTLOOP const unsigned id = loop.AddIdle(Callback, this); @@ -49,21 +48,6 @@ DeferredMonitor::Schedule() #endif } -#ifdef USE_INTERNAL_EVENTLOOP - -bool -DeferredMonitor::OnSocketReady(unsigned) -{ - fd.Read(); - - if (pending.exchange(false)) - RunDeferred(); - - return true; -} - -#endif - #ifdef USE_GLIB_EVENTLOOP void diff --git a/src/event/DeferredMonitor.hxx b/src/event/DeferredMonitor.hxx index 96ad5e282..b319d25eb 100644 --- a/src/event/DeferredMonitor.hxx +++ b/src/event/DeferredMonitor.hxx @@ -23,11 +23,6 @@ #include "check.h" #include "Compiler.h" -#ifdef USE_INTERNAL_EVENTLOOP -#include "SocketMonitor.hxx" -#include "WakeFD.hxx" -#endif - #ifdef USE_GLIB_EVENTLOOP #include #endif @@ -39,31 +34,24 @@ class EventLoop; /** * Defer execution of an event into an #EventLoop. * - * This class is thread-safe, however the constructor must be called - * from the thread that runs the #EventLoop + * This class is thread-safe. */ -class DeferredMonitor +class DeferredMonitor { + EventLoop &loop; + #ifdef USE_INTERNAL_EVENTLOOP - : private SocketMonitor -#endif -{ -#ifdef USE_INTERNAL_EVENTLOOP - std::atomic_bool pending; - WakeFD fd; + friend class EventLoop; + bool pending; #endif #ifdef USE_GLIB_EVENTLOOP - EventLoop &loop; std::atomic source_id; #endif public: #ifdef USE_INTERNAL_EVENTLOOP DeferredMonitor(EventLoop &_loop) - :SocketMonitor(_loop), pending(false) { - SocketMonitor::Open(fd.Get()); - SocketMonitor::Schedule(SocketMonitor::READ); - } + :loop(_loop), pending(false) {} #endif #ifdef USE_GLIB_EVENTLOOP @@ -72,24 +60,11 @@ public: #endif ~DeferredMonitor() { -#ifdef USE_INTERNAL_EVENTLOOP - /* avoid closing the WakeFD twice */ - SocketMonitor::Steal(); -#endif - -#ifdef USE_GLIB_EVENTLOOP Cancel(); -#endif } EventLoop &GetEventLoop() { -#ifdef USE_INTERNAL_EVENTLOOP - return SocketMonitor::GetEventLoop(); -#endif - -#ifdef USE_GLIB_EVENTLOOP return loop; -#endif } void Schedule(); @@ -99,10 +74,6 @@ protected: virtual void RunDeferred() = 0; private: -#ifdef USE_INTERNAL_EVENTLOOP - virtual bool OnSocketReady(unsigned flags) override final; -#endif - #ifdef USE_GLIB_EVENTLOOP void Run(); static gboolean Callback(gpointer data); diff --git a/src/event/Loop.cxx b/src/event/Loop.cxx index f7b3df022..c525fc103 100644 --- a/src/event/Loop.cxx +++ b/src/event/Loop.cxx @@ -26,6 +26,7 @@ #include "TimeoutMonitor.hxx" #include "SocketMonitor.hxx" #include "IdleMonitor.hxx" +#include "DeferredMonitor.hxx" #include @@ -204,6 +205,44 @@ EventLoop::AddCall(std::function &&f) wake_fd.Write(); } +void +EventLoop::AddDeferred(DeferredMonitor &d) +{ + mutex.lock(); + if (d.pending) { + mutex.unlock(); + return; + } + + assert(std::find(deferred.begin(), + deferred.end(), &d) == deferred.end()); + + d.pending = true; + deferred.push_back(&d); + mutex.unlock(); + + wake_fd.Write(); +} + +void +EventLoop::RemoveDeferred(DeferredMonitor &d) +{ + const ScopeLock protect(mutex); + + if (!d.pending) { + assert(std::find(deferred.begin(), + deferred.end(), &d) == deferred.end()); + return; + } + + d.pending = false; + + auto i = std::find(deferred.begin(), deferred.end(), &d); + assert(i != deferred.end()); + + deferred.erase(i); +} + bool EventLoop::OnSocketReady(gcc_unused unsigned flags) { @@ -213,6 +252,18 @@ EventLoop::OnSocketReady(gcc_unused unsigned flags) mutex.lock(); + while (!deferred.empty() && !quit) { + DeferredMonitor &m = *deferred.front(); + assert(m.pending); + + deferred.pop_front(); + m.pending = false; + + mutex.unlock(); + m.RunDeferred(); + mutex.lock(); + } + while (!calls.empty() && !quit) { auto f = std::move(calls.front()); calls.pop_front(); diff --git a/src/event/Loop.hxx b/src/event/Loop.hxx index 885e74679..0586a1f68 100644 --- a/src/event/Loop.hxx +++ b/src/event/Loop.hxx @@ -42,6 +42,7 @@ #ifdef USE_INTERNAL_EVENTLOOP class TimeoutMonitor; class IdleMonitor; +class DeferredMonitor; class SocketMonitor; #endif @@ -91,6 +92,7 @@ class EventLoop final Mutex mutex; std::list> calls; + std::list deferred; unsigned now_ms; @@ -161,6 +163,21 @@ public: */ void AddCall(std::function &&f); + /** + * Schedule a call to DeferredMonitor::RunDeferred(). + * + * This method is thread-safe. + */ + void AddDeferred(DeferredMonitor &d); + + /** + * Cancel a pending call to DeferredMonitor::RunDeferred(). + * However after returning, the call may still be running. + * + * This method is thread-safe. + */ + void RemoveDeferred(DeferredMonitor &d); + /** * The main function of this class. It will loop until * Break() gets called. Can be called only once.