event/DeferEvent: split the thread-safe version into new class InjectEvent

This commit is contained in:
Max Kellermann 2020-12-01 16:25:11 +01:00
parent 1ecbc2ff0f
commit 774b4313f2
22 changed files with 226 additions and 68 deletions

View File

@ -22,7 +22,7 @@
#include "input/RemoteTagScanner.hxx"
#include "tag/Tag.hxx"
#include "event/DeferEvent.hxx"
#include "event/InjectEvent.hxx"
#include "thread/Mutex.hxx"
#include <boost/intrusive/list.hpp>
@ -40,7 +40,7 @@ class RemoteTagCache final {
RemoteTagCacheHandler &handler;
DeferEvent defer_invoke_handler;
InjectEvent defer_invoke_handler;
Mutex mutex;

View File

@ -70,7 +70,7 @@ ThreadBackgroundCommand::Cancel() noexcept
CancelThread();
thread.Join();
/* cancel the DeferEvent, just in case the Thread has
/* cancel the InjectEvent, just in case the Thread has
meanwhile finished execution */
defer_finish.Cancel();
}

View File

@ -21,7 +21,7 @@
#define MPD_THREAD_BACKGROUND_COMMAND_HXX
#include "BackgroundCommand.hxx"
#include "event/DeferEvent.hxx"
#include "event/InjectEvent.hxx"
#include "thread/Thread.hxx"
#include <exception>
@ -34,7 +34,7 @@ class Response;
*/
class ThreadBackgroundCommand : public BackgroundCommand {
Thread thread;
DeferEvent defer_finish;
InjectEvent defer_finish;
Client &client;
/**

View File

@ -20,7 +20,7 @@
#ifndef MPD_UPDATE_REMOVE_HXX
#define MPD_UPDATE_REMOVE_HXX
#include "event/DeferEvent.hxx"
#include "event/InjectEvent.hxx"
#include "thread/Mutex.hxx"
#include <forward_list>
@ -39,7 +39,7 @@ class UpdateRemoveService final {
std::forward_list<std::string> uris;
DeferEvent defer;
InjectEvent defer;
public:
UpdateRemoveService(EventLoop &_loop, DatabaseListener &_listener)
@ -55,7 +55,7 @@ public:
void Remove(std::string &&uri);
private:
/* DeferEvent callback */
/* InjectEvent callback */
void RunDeferred() noexcept;
};

View File

@ -22,7 +22,7 @@
#include "Config.hxx"
#include "Queue.hxx"
#include "event/DeferEvent.hxx"
#include "event/InjectEvent.hxx"
#include "thread/Thread.hxx"
#include "util/Compiler.h"
@ -40,7 +40,7 @@ class CompositeStorage;
class UpdateService final {
const UpdateConfig config;
DeferEvent defer;
InjectEvent defer;
SimpleDatabase &db;
CompositeStorage &storage;
@ -107,7 +107,7 @@ public:
void CancelMount(const char *uri) noexcept;
private:
/* DeferEvent callback */
/* InjectEvent callback */
void RunDeferred() noexcept;
/* the update thread */

View File

@ -19,7 +19,7 @@
#include "Call.hxx"
#include "Loop.hxx"
#include "DeferEvent.hxx"
#include "InjectEvent.hxx"
#include "thread/Mutex.hxx"
#include "thread/Cond.hxx"
@ -28,7 +28,7 @@
class BlockingCallMonitor final
{
DeferEvent defer_event;
InjectEvent event;
const std::function<void()> f;
@ -41,13 +41,13 @@ class BlockingCallMonitor final
public:
BlockingCallMonitor(EventLoop &_loop, std::function<void()> &&_f)
:defer_event(_loop, BIND_THIS_METHOD(RunDeferred)),
:event(_loop, BIND_THIS_METHOD(RunDeferred)),
f(std::move(_f)), done(false) {}
void Run() {
assert(!done);
defer_event.Schedule();
event.Schedule();
{
std::unique_lock<Mutex> lock(mutex);

View File

@ -27,9 +27,12 @@
class EventLoop;
/**
* Invoke a method call in the #EventLoop.
* Defer execution until the next event loop iteration. Use this to
* move calls out of the current stack frame, to avoid surprising side
* effects for callers up in the call chain.
*
* This class is thread-safe.
* This class is not thread-safe, all methods must be called from the
* thread that runs the #EventLoop.
*/
class DeferEvent final
: public boost::intrusive::list_base_hook<>

33
src/event/InjectEvent.cxx Normal file
View File

@ -0,0 +1,33 @@
/*
* Copyright 2003-2020 The Music Player Daemon Project
* http://www.musicpd.org
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 2 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License along
* with this program; if not, write to the Free Software Foundation, Inc.,
* 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*/
#include "InjectEvent.hxx"
#include "Loop.hxx"
void
InjectEvent::Cancel() noexcept
{
loop.RemoveInject(*this);
}
void
InjectEvent::Schedule() noexcept
{
loop.AddInject(*this);
}

69
src/event/InjectEvent.hxx Normal file
View File

@ -0,0 +1,69 @@
/*
* Copyright 2003-2020 The Music Player Daemon Project
* http://www.musicpd.org
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 2 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License along
* with this program; if not, write to the Free Software Foundation, Inc.,
* 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*/
#ifndef MPD_INJECT_EVENT_HXX
#define MPD_INJECT_EVENT_HXX
#include "util/BindMethod.hxx"
#include <boost/intrusive/list_hook.hpp>
class EventLoop;
/**
* Invoke a method call in the #EventLoop.
*
* This class is thread-safe.
*/
class InjectEvent final
: public boost::intrusive::list_base_hook<>
{
friend class EventLoop;
EventLoop &loop;
using Callback = BoundMethod<void() noexcept>;
const Callback callback;
public:
InjectEvent(EventLoop &_loop, Callback _callback) noexcept
:loop(_loop), callback(_callback) {}
~InjectEvent() noexcept {
Cancel();
}
EventLoop &GetEventLoop() const noexcept {
return loop;
}
void Schedule() noexcept;
void Cancel() noexcept;
private:
bool IsPending() const noexcept {
return is_linked();
}
void Run() noexcept {
callback();
}
};
#endif

View File

@ -19,12 +19,13 @@
#include "Loop.hxx"
#include "TimerEvent.hxx"
#include "DeferEvent.hxx"
#include "SocketEvent.hxx"
#include "IdleEvent.hxx"
#include "util/ScopeExit.hxx"
#ifdef HAVE_THREADED_EVENT_LOOP
#include "DeferEvent.hxx"
#include "InjectEvent.hxx"
#endif
#ifdef HAVE_URING
@ -194,6 +195,35 @@ EventLoop::HandleTimers() noexcept
return Event::Duration(-1);
}
void
EventLoop::AddDeferred(DeferEvent &d) noexcept
{
if (d.IsPending())
return;
deferred.push_back(d);
again = true;
}
void
EventLoop::RemoveDeferred(DeferEvent &d) noexcept
{
if (d.IsPending())
deferred.erase(deferred.iterator_to(d));
}
void
EventLoop::RunDeferred() noexcept
{
while (!deferred.empty() && !quit) {
auto &m = deferred.front();
assert(m.IsPending());
deferred.pop_front();
m.RunDeferred();
}
}
template<class ToDuration, class Rep, class Period>
static constexpr ToDuration
duration_cast_round_up(std::chrono::duration<Rep, Period> d) noexcept
@ -281,6 +311,8 @@ EventLoop::Run() noexcept
if (quit)
break;
RunDeferred();
/* invoke idle */
while (!idle.empty()) {
@ -297,7 +329,7 @@ EventLoop::Run() noexcept
overhead */
{
const std::lock_guard<Mutex> lock(mutex);
HandleDeferred();
HandleInject();
busy = false;
if (again)
@ -343,7 +375,7 @@ EventLoop::Run() noexcept
#ifdef HAVE_THREADED_EVENT_LOOP
void
EventLoop::AddDeferred(DeferEvent &d) noexcept
EventLoop::AddInject(InjectEvent &d) noexcept
{
bool must_wake;
@ -353,10 +385,10 @@ EventLoop::AddDeferred(DeferEvent &d) noexcept
return;
/* we don't need to wake up the EventLoop if another
DeferEvent has already done it */
must_wake = !busy && deferred.empty();
InjectEvent has already done it */
must_wake = !busy && inject.empty();
deferred.push_back(d);
inject.push_back(d);
again = true;
}
@ -365,25 +397,25 @@ EventLoop::AddDeferred(DeferEvent &d) noexcept
}
void
EventLoop::RemoveDeferred(DeferEvent &d) noexcept
EventLoop::RemoveInject(InjectEvent &d) noexcept
{
const std::lock_guard<Mutex> protect(mutex);
if (d.IsPending())
deferred.erase(deferred.iterator_to(d));
inject.erase(inject.iterator_to(d));
}
void
EventLoop::HandleDeferred() noexcept
EventLoop::HandleInject() noexcept
{
while (!deferred.empty() && !quit) {
auto &m = deferred.front();
while (!inject.empty() && !quit) {
auto &m = inject.front();
assert(m.IsPending());
deferred.pop_front();
inject.pop_front();
const ScopeUnlock unlock(mutex);
m.RunDeferred();
m.Run();
}
}
@ -395,7 +427,7 @@ EventLoop::OnSocketReady([[maybe_unused]] unsigned flags) noexcept
wake_fd.Read();
const std::lock_guard<Mutex> lock(mutex);
HandleDeferred();
HandleInject();
}
#endif

View File

@ -49,6 +49,7 @@ namespace Uring { class Queue; class Manager; }
class TimerEvent;
class IdleEvent;
class DeferEvent;
class InjectEvent;
/**
* An event loop that polls for events on file/socket descriptors.
@ -78,17 +79,23 @@ class EventLoop final
boost::intrusive::constant_time_size<false>>;
TimerSet timers;
using DeferList =
boost::intrusive::list<DeferEvent,
boost::intrusive::base_hook<boost::intrusive::list_base_hook<>>,
boost::intrusive::constant_time_size<false>>;
DeferList deferred;
using IdleList = IntrusiveList<IdleEvent>;
IdleList idle;
#ifdef HAVE_THREADED_EVENT_LOOP
Mutex mutex;
using DeferredList =
boost::intrusive::list<DeferEvent,
using InjectList =
boost::intrusive::list<InjectEvent,
boost::intrusive::base_hook<boost::intrusive::list_base_hook<>>,
boost::intrusive::constant_time_size<false>>;
DeferredList deferred;
InjectList inject;
#endif
using SocketList = IntrusiveList<SocketEvent>;
@ -202,21 +209,32 @@ public:
void AddTimer(TimerEvent &t, Event::Duration d) noexcept;
#ifdef HAVE_THREADED_EVENT_LOOP
/**
* Schedule a call to DeferEvent::RunDeferred().
*
* This method is thread-safe.
*/
void AddDeferred(DeferEvent &d) noexcept;
/**
* Cancel a pending call to DeferEvent::RunDeferred().
* However after returning, the call may still be running.
*/
void RemoveDeferred(DeferEvent &d) noexcept;
#ifdef HAVE_THREADED_EVENT_LOOP
/**
* Schedule a call to the InjectEvent.
*
* This method is thread-safe.
*/
void RemoveDeferred(DeferEvent &d) noexcept;
void AddInject(InjectEvent &d) noexcept;
/**
* Cancel a pending call to the InjectEvent.
* However after returning, the call may still be running.
*
* This method is thread-safe.
*/
void RemoveInject(InjectEvent &d) noexcept;
#endif
/**
@ -226,13 +244,15 @@ public:
void Run() noexcept;
private:
void RunDeferred() noexcept;
#ifdef HAVE_THREADED_EVENT_LOOP
/**
* Invoke all pending DeferEvents.
* Invoke all pending InjectEvents.
*
* Caller must lock the mutex.
*/
void HandleDeferred() noexcept;
void HandleInject() noexcept;
#endif
/**

View File

@ -23,7 +23,7 @@ void
MaskMonitor::OrMask(unsigned new_mask) noexcept
{
if (pending_mask.fetch_or(new_mask) == 0)
defer.Schedule();
event.Schedule();
}
void

View File

@ -20,7 +20,7 @@
#ifndef MPD_EVENT_MASK_MONITOR_HXX
#define MPD_EVENT_MASK_MONITOR_HXX
#include "DeferEvent.hxx"
#include "InjectEvent.hxx"
#include "util/BindMethod.hxx"
#include <atomic>
@ -32,7 +32,7 @@
* This class is thread-safe.
*/
class MaskMonitor final {
DeferEvent defer;
InjectEvent event;
typedef BoundMethod<void(unsigned) noexcept> Callback;
const Callback callback;
@ -41,21 +41,21 @@ class MaskMonitor final {
public:
MaskMonitor(EventLoop &_loop, Callback _callback) noexcept
:defer(_loop, BIND_THIS_METHOD(RunDeferred)),
:event(_loop, BIND_THIS_METHOD(RunDeferred)),
callback(_callback), pending_mask(0) {}
auto &GetEventLoop() const noexcept {
return defer.GetEventLoop();
return event.GetEventLoop();
}
void Cancel() noexcept {
defer.Cancel();
event.Cancel();
}
void OrMask(unsigned new_mask) noexcept;
protected:
/* DeferEvent callback */
/* InjectEvent callback */
void RunDeferred() noexcept;
};

View File

@ -24,6 +24,7 @@ event = static_library(
'SignalMonitor.cxx',
'TimerEvent.cxx',
'IdleEvent.cxx',
'InjectEvent.cxx',
'DeferEvent.cxx',
'MaskMonitor.cxx',
'SocketEvent.cxx',

View File

@ -21,7 +21,7 @@
#define MPD_ASYNC_INPUT_STREAM_HXX
#include "InputStream.hxx"
#include "event/DeferEvent.hxx"
#include "event/InjectEvent.hxx"
#include "util/HugeAllocator.hxx"
#include "util/CircularBuffer.hxx"
@ -38,8 +38,8 @@ class AsyncInputStream : public InputStream {
NONE, SCHEDULED, PENDING
};
DeferEvent deferred_resume;
DeferEvent deferred_seek;
InjectEvent deferred_resume;
InjectEvent deferred_seek;
HugeArray<uint8_t> allocation;
@ -166,7 +166,7 @@ protected:
private:
void Resume();
/* for DeferEvent */
/* for InjectEvent */
void DeferredResume() noexcept;
void DeferredSeek() noexcept;
};

View File

@ -39,7 +39,7 @@
#include "pcm/AudioFormat.hxx"
#include "Log.hxx"
#include "event/MultiSocketMonitor.hxx"
#include "event/DeferEvent.hxx"
#include "event/InjectEvent.hxx"
#include <alsa/asoundlib.h>
@ -80,7 +80,7 @@ class AlsaInputStream final
AlsaNonBlockPcm non_block;
DeferEvent defer_invalidate_sockets;
InjectEvent defer_invalidate_sockets;
public:

View File

@ -22,7 +22,7 @@
#include "mixer/Listener.hxx"
#include "output/OutputAPI.hxx"
#include "event/MultiSocketMonitor.hxx"
#include "event/DeferEvent.hxx"
#include "event/InjectEvent.hxx"
#include "event/Call.hxx"
#include "util/ASCII.hxx"
#include "util/Domain.hxx"
@ -41,7 +41,7 @@ extern "C" {
static constexpr unsigned VOLUME_MIXER_ALSA_INDEX_DEFAULT = 0;
class AlsaMixerMonitor final : MultiSocketMonitor {
DeferEvent defer_invalidate_sockets;
InjectEvent defer_invalidate_sockets;
snd_mixer_t *mixer;

View File

@ -37,7 +37,7 @@
#include "util/ConstBuffer.hxx"
#include "util/StringView.hxx"
#include "event/MultiSocketMonitor.hxx"
#include "event/DeferEvent.hxx"
#include "event/InjectEvent.hxx"
#include "event/Call.hxx"
#include "Log.hxx"
@ -55,7 +55,7 @@ static constexpr unsigned MPD_ALSA_BUFFER_TIME_US = 500000;
class AlsaOutput final
: AudioOutput, MultiSocketMonitor {
DeferEvent defer_invalidate_sockets;
InjectEvent defer_invalidate_sockets;
/**
* This timer is used to re-schedule the #MultiSocketMonitor

View File

@ -31,7 +31,7 @@
#include "thread/Mutex.hxx"
#include "thread/Cond.hxx"
#include "event/ServerSocket.hxx"
#include "event/DeferEvent.hxx"
#include "event/InjectEvent.hxx"
#include "util/Cast.hxx"
#include "util/Compiler.h"
@ -115,7 +115,7 @@ private:
*/
std::queue<PagePtr, std::list<PagePtr>> pages;
DeferEvent defer_broadcast;
InjectEvent defer_broadcast;
public:
/**
@ -269,7 +269,7 @@ public:
bool Pause() override;
private:
/* DeferEvent callback */
/* InjectEvent callback */
void OnDeferredBroadcast() noexcept;
void OnAccept(UniqueSocketDescriptor fd,

View File

@ -31,7 +31,7 @@
#include "lib/curl/Escape.hxx"
#include "lib/expat/ExpatParser.hxx"
#include "fs/Traits.hxx"
#include "event/DeferEvent.hxx"
#include "event/InjectEvent.hxx"
#include "thread/Mutex.hxx"
#include "thread/Cond.hxx"
#include "time/Parser.hxx"
@ -84,7 +84,7 @@ CurlStorage::MapToRelativeUTF8(std::string_view uri_utf8) const noexcept
}
class BlockingHttpRequest : protected CurlResponseHandler {
DeferEvent defer_start;
InjectEvent defer_start;
std::exception_ptr postponed_error;
@ -136,7 +136,7 @@ protected:
}
private:
/* DeferEvent callback */
/* InjectEvent callback */
void OnDeferredStart() noexcept {
assert(!done);

View File

@ -32,7 +32,7 @@
#include "thread/Cond.hxx"
#include "event/Loop.hxx"
#include "event/Call.hxx"
#include "event/DeferEvent.hxx"
#include "event/InjectEvent.hxx"
#include "event/TimerEvent.hxx"
#include "util/ASCII.hxx"
#include "util/StringCompare.hxx"
@ -61,7 +61,7 @@ class NfsStorage final
NfsConnection *connection;
DeferEvent defer_connect;
InjectEvent defer_connect;
TimerEvent reconnect_timer;
Mutex mutex;
@ -115,7 +115,7 @@ public:
reconnect_timer.Schedule(std::chrono::seconds(5));
}
/* DeferEvent callback */
/* InjectEvent callback */
void OnDeferredConnect() noexcept {
if (state == State::INITIAL)
Connect();

View File

@ -33,7 +33,7 @@
#include "thread/Cond.hxx"
#include "thread/SafeSingleton.hxx"
#include "event/Call.hxx"
#include "event/DeferEvent.hxx"
#include "event/InjectEvent.hxx"
#include "fs/AllocatedPath.hxx"
#include "util/StringCompare.hxx"
#include "util/RuntimeError.hxx"
@ -62,7 +62,7 @@ class UdisksStorage final : public Storage {
std::exception_ptr mount_error;
DeferEvent defer_mount, defer_unmount;
InjectEvent defer_mount, defer_unmount;
public:
template<typename B, typename I, typename IP>