event/DeferEvent: split the thread-safe version into new class InjectEvent

This commit is contained in:
Max Kellermann
2020-12-01 16:25:11 +01:00
parent 1ecbc2ff0f
commit 774b4313f2
22 changed files with 226 additions and 68 deletions

View File

@@ -19,7 +19,7 @@
#include "Call.hxx"
#include "Loop.hxx"
#include "DeferEvent.hxx"
#include "InjectEvent.hxx"
#include "thread/Mutex.hxx"
#include "thread/Cond.hxx"
@@ -28,7 +28,7 @@
class BlockingCallMonitor final
{
DeferEvent defer_event;
InjectEvent event;
const std::function<void()> f;
@@ -41,13 +41,13 @@ class BlockingCallMonitor final
public:
BlockingCallMonitor(EventLoop &_loop, std::function<void()> &&_f)
:defer_event(_loop, BIND_THIS_METHOD(RunDeferred)),
:event(_loop, BIND_THIS_METHOD(RunDeferred)),
f(std::move(_f)), done(false) {}
void Run() {
assert(!done);
defer_event.Schedule();
event.Schedule();
{
std::unique_lock<Mutex> lock(mutex);

View File

@@ -27,9 +27,12 @@
class EventLoop;
/**
* Invoke a method call in the #EventLoop.
* Defer execution until the next event loop iteration. Use this to
* move calls out of the current stack frame, to avoid surprising side
* effects for callers up in the call chain.
*
* This class is thread-safe.
* This class is not thread-safe, all methods must be called from the
* thread that runs the #EventLoop.
*/
class DeferEvent final
: public boost::intrusive::list_base_hook<>

33
src/event/InjectEvent.cxx Normal file
View File

@@ -0,0 +1,33 @@
/*
* Copyright 2003-2020 The Music Player Daemon Project
* http://www.musicpd.org
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 2 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License along
* with this program; if not, write to the Free Software Foundation, Inc.,
* 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*/
#include "InjectEvent.hxx"
#include "Loop.hxx"
void
InjectEvent::Cancel() noexcept
{
loop.RemoveInject(*this);
}
void
InjectEvent::Schedule() noexcept
{
loop.AddInject(*this);
}

69
src/event/InjectEvent.hxx Normal file
View File

@@ -0,0 +1,69 @@
/*
* Copyright 2003-2020 The Music Player Daemon Project
* http://www.musicpd.org
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 2 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License along
* with this program; if not, write to the Free Software Foundation, Inc.,
* 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*/
#ifndef MPD_INJECT_EVENT_HXX
#define MPD_INJECT_EVENT_HXX
#include "util/BindMethod.hxx"
#include <boost/intrusive/list_hook.hpp>
class EventLoop;
/**
* Invoke a method call in the #EventLoop.
*
* This class is thread-safe.
*/
class InjectEvent final
: public boost::intrusive::list_base_hook<>
{
friend class EventLoop;
EventLoop &loop;
using Callback = BoundMethod<void() noexcept>;
const Callback callback;
public:
InjectEvent(EventLoop &_loop, Callback _callback) noexcept
:loop(_loop), callback(_callback) {}
~InjectEvent() noexcept {
Cancel();
}
EventLoop &GetEventLoop() const noexcept {
return loop;
}
void Schedule() noexcept;
void Cancel() noexcept;
private:
bool IsPending() const noexcept {
return is_linked();
}
void Run() noexcept {
callback();
}
};
#endif

View File

@@ -19,12 +19,13 @@
#include "Loop.hxx"
#include "TimerEvent.hxx"
#include "DeferEvent.hxx"
#include "SocketEvent.hxx"
#include "IdleEvent.hxx"
#include "util/ScopeExit.hxx"
#ifdef HAVE_THREADED_EVENT_LOOP
#include "DeferEvent.hxx"
#include "InjectEvent.hxx"
#endif
#ifdef HAVE_URING
@@ -194,6 +195,35 @@ EventLoop::HandleTimers() noexcept
return Event::Duration(-1);
}
void
EventLoop::AddDeferred(DeferEvent &d) noexcept
{
if (d.IsPending())
return;
deferred.push_back(d);
again = true;
}
void
EventLoop::RemoveDeferred(DeferEvent &d) noexcept
{
if (d.IsPending())
deferred.erase(deferred.iterator_to(d));
}
void
EventLoop::RunDeferred() noexcept
{
while (!deferred.empty() && !quit) {
auto &m = deferred.front();
assert(m.IsPending());
deferred.pop_front();
m.RunDeferred();
}
}
template<class ToDuration, class Rep, class Period>
static constexpr ToDuration
duration_cast_round_up(std::chrono::duration<Rep, Period> d) noexcept
@@ -281,6 +311,8 @@ EventLoop::Run() noexcept
if (quit)
break;
RunDeferred();
/* invoke idle */
while (!idle.empty()) {
@@ -297,7 +329,7 @@ EventLoop::Run() noexcept
overhead */
{
const std::lock_guard<Mutex> lock(mutex);
HandleDeferred();
HandleInject();
busy = false;
if (again)
@@ -343,7 +375,7 @@ EventLoop::Run() noexcept
#ifdef HAVE_THREADED_EVENT_LOOP
void
EventLoop::AddDeferred(DeferEvent &d) noexcept
EventLoop::AddInject(InjectEvent &d) noexcept
{
bool must_wake;
@@ -353,10 +385,10 @@ EventLoop::AddDeferred(DeferEvent &d) noexcept
return;
/* we don't need to wake up the EventLoop if another
DeferEvent has already done it */
must_wake = !busy && deferred.empty();
InjectEvent has already done it */
must_wake = !busy && inject.empty();
deferred.push_back(d);
inject.push_back(d);
again = true;
}
@@ -365,25 +397,25 @@ EventLoop::AddDeferred(DeferEvent &d) noexcept
}
void
EventLoop::RemoveDeferred(DeferEvent &d) noexcept
EventLoop::RemoveInject(InjectEvent &d) noexcept
{
const std::lock_guard<Mutex> protect(mutex);
if (d.IsPending())
deferred.erase(deferred.iterator_to(d));
inject.erase(inject.iterator_to(d));
}
void
EventLoop::HandleDeferred() noexcept
EventLoop::HandleInject() noexcept
{
while (!deferred.empty() && !quit) {
auto &m = deferred.front();
while (!inject.empty() && !quit) {
auto &m = inject.front();
assert(m.IsPending());
deferred.pop_front();
inject.pop_front();
const ScopeUnlock unlock(mutex);
m.RunDeferred();
m.Run();
}
}
@@ -395,7 +427,7 @@ EventLoop::OnSocketReady([[maybe_unused]] unsigned flags) noexcept
wake_fd.Read();
const std::lock_guard<Mutex> lock(mutex);
HandleDeferred();
HandleInject();
}
#endif

View File

@@ -49,6 +49,7 @@ namespace Uring { class Queue; class Manager; }
class TimerEvent;
class IdleEvent;
class DeferEvent;
class InjectEvent;
/**
* An event loop that polls for events on file/socket descriptors.
@@ -78,17 +79,23 @@ class EventLoop final
boost::intrusive::constant_time_size<false>>;
TimerSet timers;
using DeferList =
boost::intrusive::list<DeferEvent,
boost::intrusive::base_hook<boost::intrusive::list_base_hook<>>,
boost::intrusive::constant_time_size<false>>;
DeferList deferred;
using IdleList = IntrusiveList<IdleEvent>;
IdleList idle;
#ifdef HAVE_THREADED_EVENT_LOOP
Mutex mutex;
using DeferredList =
boost::intrusive::list<DeferEvent,
using InjectList =
boost::intrusive::list<InjectEvent,
boost::intrusive::base_hook<boost::intrusive::list_base_hook<>>,
boost::intrusive::constant_time_size<false>>;
DeferredList deferred;
InjectList inject;
#endif
using SocketList = IntrusiveList<SocketEvent>;
@@ -202,21 +209,32 @@ public:
void AddTimer(TimerEvent &t, Event::Duration d) noexcept;
#ifdef HAVE_THREADED_EVENT_LOOP
/**
* Schedule a call to DeferEvent::RunDeferred().
*
* This method is thread-safe.
*/
void AddDeferred(DeferEvent &d) noexcept;
/**
* Cancel a pending call to DeferEvent::RunDeferred().
* However after returning, the call may still be running.
*/
void RemoveDeferred(DeferEvent &d) noexcept;
#ifdef HAVE_THREADED_EVENT_LOOP
/**
* Schedule a call to the InjectEvent.
*
* This method is thread-safe.
*/
void RemoveDeferred(DeferEvent &d) noexcept;
void AddInject(InjectEvent &d) noexcept;
/**
* Cancel a pending call to the InjectEvent.
* However after returning, the call may still be running.
*
* This method is thread-safe.
*/
void RemoveInject(InjectEvent &d) noexcept;
#endif
/**
@@ -226,13 +244,15 @@ public:
void Run() noexcept;
private:
void RunDeferred() noexcept;
#ifdef HAVE_THREADED_EVENT_LOOP
/**
* Invoke all pending DeferEvents.
* Invoke all pending InjectEvents.
*
* Caller must lock the mutex.
*/
void HandleDeferred() noexcept;
void HandleInject() noexcept;
#endif
/**

View File

@@ -23,7 +23,7 @@ void
MaskMonitor::OrMask(unsigned new_mask) noexcept
{
if (pending_mask.fetch_or(new_mask) == 0)
defer.Schedule();
event.Schedule();
}
void

View File

@@ -20,7 +20,7 @@
#ifndef MPD_EVENT_MASK_MONITOR_HXX
#define MPD_EVENT_MASK_MONITOR_HXX
#include "DeferEvent.hxx"
#include "InjectEvent.hxx"
#include "util/BindMethod.hxx"
#include <atomic>
@@ -32,7 +32,7 @@
* This class is thread-safe.
*/
class MaskMonitor final {
DeferEvent defer;
InjectEvent event;
typedef BoundMethod<void(unsigned) noexcept> Callback;
const Callback callback;
@@ -41,21 +41,21 @@ class MaskMonitor final {
public:
MaskMonitor(EventLoop &_loop, Callback _callback) noexcept
:defer(_loop, BIND_THIS_METHOD(RunDeferred)),
:event(_loop, BIND_THIS_METHOD(RunDeferred)),
callback(_callback), pending_mask(0) {}
auto &GetEventLoop() const noexcept {
return defer.GetEventLoop();
return event.GetEventLoop();
}
void Cancel() noexcept {
defer.Cancel();
event.Cancel();
}
void OrMask(unsigned new_mask) noexcept;
protected:
/* DeferEvent callback */
/* InjectEvent callback */
void RunDeferred() noexcept;
};

View File

@@ -24,6 +24,7 @@ event = static_library(
'SignalMonitor.cxx',
'TimerEvent.cxx',
'IdleEvent.cxx',
'InjectEvent.cxx',
'DeferEvent.cxx',
'MaskMonitor.cxx',
'SocketEvent.cxx',