event/SocketMonitor: refactor to SocketEvent

Similar to commits 1686f4e857 and
30a5dd267b
This commit is contained in:
Max Kellermann 2020-10-14 14:24:16 +02:00
parent 4d68a12f03
commit 5a4055fb08
26 changed files with 282 additions and 261 deletions

View File

@ -42,7 +42,7 @@
#include "util/ScopeExit.hxx" #include "util/ScopeExit.hxx"
#include "util/RuntimeError.hxx" #include "util/RuntimeError.hxx"
#include "protocol/Ack.hxx" #include "protocol/Ack.hxx"
#include "event/SocketMonitor.hxx" #include "event/SocketEvent.hxx"
#include "event/IdleEvent.hxx" #include "event/IdleEvent.hxx"
#include "Log.hxx" #include "Log.hxx"
@ -85,7 +85,8 @@ public:
} }
}; };
class ProxyDatabase final : public Database, SocketMonitor { class ProxyDatabase final : public Database {
SocketEvent socket_event;
IdleEvent idle_event; IdleEvent idle_event;
DatabaseListener &listener; DatabaseListener &listener;
@ -149,10 +150,8 @@ private:
void Disconnect() noexcept; void Disconnect() noexcept;
void OnSocketReady(unsigned flags) noexcept;
void OnIdle() noexcept; void OnIdle() noexcept;
/* virtual methods from SocketMonitor */
bool OnSocketReady(unsigned flags) noexcept override;
}; };
static constexpr struct { static constexpr struct {
@ -446,7 +445,7 @@ SendGroup(mpd_connection *connection, ConstBuffer<TagType> group)
ProxyDatabase::ProxyDatabase(EventLoop &_loop, DatabaseListener &_listener, ProxyDatabase::ProxyDatabase(EventLoop &_loop, DatabaseListener &_listener,
const ConfigBlock &block) const ConfigBlock &block)
:Database(proxy_db_plugin), :Database(proxy_db_plugin),
SocketMonitor(_loop), socket_event(_loop, BIND_THIS_METHOD(OnSocketReady)),
idle_event(_loop, BIND_THIS_METHOD(OnIdle)), idle_event(_loop, BIND_THIS_METHOD(OnIdle)),
listener(_listener), listener(_listener),
host(block.GetBlockValue("host", "")), host(block.GetBlockValue("host", "")),
@ -527,7 +526,7 @@ ProxyDatabase::Connect()
idle_received = ~0U; idle_received = ~0U;
is_idle = false; is_idle = false;
SocketMonitor::Open(SocketDescriptor(mpd_async_get_fd(mpd_connection_get_async(connection)))); socket_event.Open(SocketDescriptor(mpd_async_get_fd(mpd_connection_get_async(connection))));
idle_event.Schedule(); idle_event.Schedule();
} }
@ -574,13 +573,13 @@ ProxyDatabase::Disconnect() noexcept
assert(connection != nullptr); assert(connection != nullptr);
idle_event.Cancel(); idle_event.Cancel();
SocketMonitor::Steal(); socket_event.Steal();
mpd_connection_free(connection); mpd_connection_free(connection);
connection = nullptr; connection = nullptr;
} }
bool void
ProxyDatabase::OnSocketReady([[maybe_unused]] unsigned flags) noexcept ProxyDatabase::OnSocketReady([[maybe_unused]] unsigned flags) noexcept
{ {
assert(connection != nullptr); assert(connection != nullptr);
@ -588,8 +587,8 @@ ProxyDatabase::OnSocketReady([[maybe_unused]] unsigned flags) noexcept
if (!is_idle) { if (!is_idle) {
// TODO: can this happen? // TODO: can this happen?
idle_event.Schedule(); idle_event.Schedule();
SocketMonitor::Cancel(); socket_event.Cancel();
return true; return;
} }
auto idle = (unsigned)mpd_recv_idle(connection, false); auto idle = (unsigned)mpd_recv_idle(connection, false);
@ -599,7 +598,7 @@ ProxyDatabase::OnSocketReady([[maybe_unused]] unsigned flags) noexcept
} catch (...) { } catch (...) {
LogError(std::current_exception()); LogError(std::current_exception());
Disconnect(); Disconnect();
return false; return;
} }
} }
@ -607,8 +606,7 @@ ProxyDatabase::OnSocketReady([[maybe_unused]] unsigned flags) noexcept
idle_received |= idle; idle_received |= idle;
is_idle = false; is_idle = false;
idle_event.Schedule(); idle_event.Schedule();
SocketMonitor::Cancel(); socket_event.Cancel();
return true;
} }
void void
@ -636,14 +634,14 @@ ProxyDatabase::OnIdle() noexcept
LogError(std::current_exception()); LogError(std::current_exception());
} }
SocketMonitor::Steal(); socket_event.Steal();
mpd_connection_free(connection); mpd_connection_free(connection);
connection = nullptr; connection = nullptr;
return; return;
} }
is_idle = true; is_idle = true;
SocketMonitor::ScheduleRead(); socket_event.ScheduleRead();
} }
const LightSong * const LightSong *

View File

@ -30,14 +30,14 @@
#include <sys/inotify.h> #include <sys/inotify.h>
bool void
InotifySource::OnSocketReady([[maybe_unused]] unsigned flags) noexcept InotifySource::OnSocketReady([[maybe_unused]] unsigned flags) noexcept
{ {
uint8_t buffer[4096]; uint8_t buffer[4096];
static_assert(sizeof(buffer) >= sizeof(struct inotify_event) + NAME_MAX + 1, static_assert(sizeof(buffer) >= sizeof(struct inotify_event) + NAME_MAX + 1,
"inotify buffer too small"); "inotify buffer too small");
auto ifd = GetSocket().ToFileDescriptor(); auto ifd = socket_event.GetSocket().ToFileDescriptor();
ssize_t nbytes = ifd.Read(buffer, sizeof(buffer)); ssize_t nbytes = ifd.Read(buffer, sizeof(buffer));
if (nbytes < 0) if (nbytes < 0)
FatalSystemError("Failed to read from inotify"); FatalSystemError("Failed to read from inotify");
@ -63,8 +63,6 @@ InotifySource::OnSocketReady([[maybe_unused]] unsigned flags) noexcept
callback(event->wd, event->mask, name, callback_ctx); callback(event->wd, event->mask, name, callback_ctx);
p += sizeof(*event) + event->len; p += sizeof(*event) + event->len;
} }
return true;
} }
static FileDescriptor static FileDescriptor
@ -79,17 +77,17 @@ InotifyInit()
InotifySource::InotifySource(EventLoop &_loop, InotifySource::InotifySource(EventLoop &_loop,
mpd_inotify_callback_t _callback, void *_ctx) mpd_inotify_callback_t _callback, void *_ctx)
:SocketMonitor(SocketDescriptor::FromFileDescriptor(InotifyInit()), :socket_event(_loop, BIND_THIS_METHOD(OnSocketReady),
_loop), SocketDescriptor::FromFileDescriptor(InotifyInit())),
callback(_callback), callback_ctx(_ctx) callback(_callback), callback_ctx(_ctx)
{ {
ScheduleRead(); socket_event.ScheduleRead();
} }
int int
InotifySource::Add(const char *path_fs, unsigned mask) InotifySource::Add(const char *path_fs, unsigned mask)
{ {
auto ifd = GetSocket().ToFileDescriptor(); auto ifd = socket_event.GetSocket().ToFileDescriptor();
int wd = inotify_add_watch(ifd.Get(), path_fs, mask); int wd = inotify_add_watch(ifd.Get(), path_fs, mask);
if (wd < 0) if (wd < 0)
throw MakeErrno("inotify_add_watch() has failed"); throw MakeErrno("inotify_add_watch() has failed");
@ -100,7 +98,7 @@ InotifySource::Add(const char *path_fs, unsigned mask)
void void
InotifySource::Remove(unsigned wd) noexcept InotifySource::Remove(unsigned wd) noexcept
{ {
auto ifd = GetSocket().ToFileDescriptor(); auto ifd = socket_event.GetSocket().ToFileDescriptor();
int ret = inotify_rm_watch(ifd.Get(), wd); int ret = inotify_rm_watch(ifd.Get(), wd);
if (ret < 0 && errno != EINVAL) if (ret < 0 && errno != EINVAL)
LogErrno(inotify_domain, "inotify_rm_watch() has failed"); LogErrno(inotify_domain, "inotify_rm_watch() has failed");

View File

@ -20,12 +20,14 @@
#ifndef MPD_INOTIFY_SOURCE_HXX #ifndef MPD_INOTIFY_SOURCE_HXX
#define MPD_INOTIFY_SOURCE_HXX #define MPD_INOTIFY_SOURCE_HXX
#include "event/SocketMonitor.hxx" #include "event/SocketEvent.hxx"
typedef void (*mpd_inotify_callback_t)(int wd, unsigned mask, typedef void (*mpd_inotify_callback_t)(int wd, unsigned mask,
const char *name, void *ctx); const char *name, void *ctx);
class InotifySource final : private SocketMonitor { class InotifySource final {
SocketEvent socket_event;
mpd_inotify_callback_t callback; mpd_inotify_callback_t callback;
void *callback_ctx; void *callback_ctx;
@ -43,7 +45,7 @@ public:
mpd_inotify_callback_t callback, void *ctx); mpd_inotify_callback_t callback, void *ctx);
~InotifySource() noexcept { ~InotifySource() noexcept {
Close(); socket_event.Close();
} }
/** /**
@ -63,7 +65,7 @@ public:
void Remove(unsigned wd) noexcept; void Remove(unsigned wd) noexcept;
private: private:
bool OnSocketReady(unsigned flags) noexcept override; void OnSocketReady(unsigned flags) noexcept;
}; };
#endif #endif

View File

@ -69,7 +69,7 @@ BufferedSocket::ResumeInput() noexcept
while (true) { while (true) {
const auto buffer = input.Read(); const auto buffer = input.Read();
if (buffer.empty()) { if (buffer.empty()) {
ScheduleRead(); event.ScheduleRead();
return true; return true;
} }
@ -81,11 +81,11 @@ BufferedSocket::ResumeInput() noexcept
return false; return false;
} }
ScheduleRead(); event.ScheduleRead();
return true; return true;
case InputResult::PAUSE: case InputResult::PAUSE:
CancelRead(); event.CancelRead();
return true; return true;
case InputResult::AGAIN: case InputResult::AGAIN:
@ -97,25 +97,23 @@ BufferedSocket::ResumeInput() noexcept
} }
} }
bool void
BufferedSocket::OnSocketReady(unsigned flags) noexcept BufferedSocket::OnSocketReady(unsigned flags) noexcept
{ {
assert(IsDefined()); assert(IsDefined());
if (gcc_unlikely(flags & (ERROR|HANGUP))) { if (gcc_unlikely(flags & (SocketEvent::ERROR|SocketEvent::HANGUP))) {
OnSocketClosed(); OnSocketClosed();
return false; return;
} }
if (flags & READ) { if (flags & SocketEvent::READ) {
assert(!input.IsFull()); assert(!input.IsFull());
if (!ReadToBuffer() || !ResumeInput()) if (!ReadToBuffer() || !ResumeInput())
return false; return;
if (!input.IsFull()) if (!input.IsFull())
ScheduleRead(); event.ScheduleRead();
} }
return true;
} }

View File

@ -20,7 +20,7 @@
#ifndef MPD_BUFFERED_SOCKET_HXX #ifndef MPD_BUFFERED_SOCKET_HXX
#define MPD_BUFFERED_SOCKET_HXX #define MPD_BUFFERED_SOCKET_HXX
#include "SocketMonitor.hxx" #include "SocketEvent.hxx"
#include "util/StaticFifoBuffer.hxx" #include "util/StaticFifoBuffer.hxx"
#include <cassert> #include <cassert>
@ -30,20 +30,37 @@
class EventLoop; class EventLoop;
/** /**
* A #SocketMonitor specialization that adds an input buffer. * A #SocketEvent specialization that adds an input buffer.
*/ */
class BufferedSocket : protected SocketMonitor { class BufferedSocket {
StaticFifoBuffer<uint8_t, 8192> input; StaticFifoBuffer<uint8_t, 8192> input;
protected:
SocketEvent event;
public: public:
using ssize_t = SocketEvent::ssize_t;
BufferedSocket(SocketDescriptor _fd, EventLoop &_loop) noexcept BufferedSocket(SocketDescriptor _fd, EventLoop &_loop) noexcept
:SocketMonitor(_fd, _loop) { :event(_loop, BIND_THIS_METHOD(OnSocketReady), _fd) {
ScheduleRead(); event.ScheduleRead();
} }
using SocketMonitor::GetEventLoop; auto &GetEventLoop() const noexcept {
using SocketMonitor::IsDefined; return event.GetEventLoop();
using SocketMonitor::Close; }
bool IsDefined() const noexcept {
return event.IsDefined();
}
auto GetSocket() const noexcept {
return event.GetSocket();
}
void Close() noexcept {
event.Close();
}
private: private:
/** /**
@ -116,8 +133,7 @@ protected:
virtual void OnSocketError(std::exception_ptr ep) noexcept = 0; virtual void OnSocketError(std::exception_ptr ep) noexcept = 0;
virtual void OnSocketClosed() noexcept = 0; virtual void OnSocketClosed() noexcept = 0;
/* virtual methods from class SocketMonitor */ virtual void OnSocketReady(unsigned flags) noexcept;
bool OnSocketReady(unsigned flags) noexcept override;
}; };
#endif #endif

View File

@ -35,7 +35,7 @@ FullyBufferedSocket::DirectWrite(const void *data, size_t length) noexcept
return 0; return 0;
idle_event.Cancel(); idle_event.Cancel();
BufferedSocket::Cancel(); event.Cancel();
if (IsSocketErrorClosed(code)) if (IsSocketErrorClosed(code))
OnSocketClosed(); OnSocketClosed();
@ -54,7 +54,7 @@ FullyBufferedSocket::Flush() noexcept
const auto data = output.Read(); const auto data = output.Read();
if (data.empty()) { if (data.empty()) {
idle_event.Cancel(); idle_event.Cancel();
CancelWrite(); event.CancelWrite();
return true; return true;
} }
@ -66,7 +66,7 @@ FullyBufferedSocket::Flush() noexcept
if (output.empty()) { if (output.empty()) {
idle_event.Cancel(); idle_event.Cancel();
CancelWrite(); event.CancelWrite();
} }
return true; return true;
@ -92,23 +92,23 @@ FullyBufferedSocket::Write(const void *data, size_t length) noexcept
return true; return true;
} }
bool void
FullyBufferedSocket::OnSocketReady(unsigned flags) noexcept FullyBufferedSocket::OnSocketReady(unsigned flags) noexcept
{ {
if (flags & WRITE) { if (flags & SocketEvent::WRITE) {
assert(!output.empty()); assert(!output.empty());
assert(!idle_event.IsActive()); assert(!idle_event.IsActive());
if (!Flush()) if (!Flush())
return false; return;
} }
return BufferedSocket::OnSocketReady(flags); BufferedSocket::OnSocketReady(flags);
} }
void void
FullyBufferedSocket::OnIdle() noexcept FullyBufferedSocket::OnIdle() noexcept
{ {
if (Flush() && !output.empty()) if (Flush() && !output.empty())
ScheduleWrite(); event.ScheduleWrite();
} }

View File

@ -69,10 +69,10 @@ protected:
*/ */
bool Write(const void *data, size_t length) noexcept; bool Write(const void *data, size_t length) noexcept;
/* virtual methods from class SocketMonitor */
bool OnSocketReady(unsigned flags) noexcept override;
void OnIdle() noexcept; void OnIdle() noexcept;
/* virtual methods from class BufferedSocket */
void OnSocketReady(unsigned flags) noexcept override;
}; };
#endif #endif

View File

@ -19,7 +19,7 @@
#include "Loop.hxx" #include "Loop.hxx"
#include "TimerEvent.hxx" #include "TimerEvent.hxx"
#include "SocketMonitor.hxx" #include "SocketEvent.hxx"
#include "IdleEvent.hxx" #include "IdleEvent.hxx"
#include "util/ScopeExit.hxx" #include "util/ScopeExit.hxx"
@ -47,7 +47,7 @@ EventLoop::EventLoop(
) )
: :
#ifdef HAVE_THREADED_EVENT_LOOP #ifdef HAVE_THREADED_EVENT_LOOP
SocketMonitor(*this), wake_event(*this, BIND_THIS_METHOD(OnSocketReady)),
thread(_thread), thread(_thread),
/* if this instance is hosted by an EventThread (no ThreadId /* if this instance is hosted by an EventThread (no ThreadId
known yet) then we're not yet alive until the thread is known yet) then we're not yet alive until the thread is
@ -59,7 +59,7 @@ EventLoop::EventLoop(
quit(false) quit(false)
{ {
#ifdef HAVE_THREADED_EVENT_LOOP #ifdef HAVE_THREADED_EVENT_LOOP
SocketMonitor::Open(SocketDescriptor(wake_fd.Get())); wake_event.Open(SocketDescriptor(wake_fd.Get()));
#endif #endif
} }
@ -198,7 +198,7 @@ EventLoop::Run() noexcept
assert(alive); assert(alive);
assert(busy); assert(busy);
SocketMonitor::Schedule(SocketMonitor::READ); wake_event.Schedule(SocketEvent::READ);
#endif #endif
#ifdef HAVE_URING #ifdef HAVE_URING
@ -214,7 +214,7 @@ EventLoop::Run() noexcept
#ifdef HAVE_THREADED_EVENT_LOOP #ifdef HAVE_THREADED_EVENT_LOOP
AtScopeExit(this) { AtScopeExit(this) {
SocketMonitor::Cancel(); wake_event.Cancel();
}; };
#endif #endif
@ -262,9 +262,9 @@ EventLoop::Run() noexcept
ready_sockets.clear(); ready_sockets.clear();
for (size_t i = 0; i < poll_result.GetSize(); ++i) { for (size_t i = 0; i < poll_result.GetSize(); ++i) {
auto &sm = *(SocketMonitor *)poll_result.GetObject(i); auto &s = *(SocketEvent *)poll_result.GetObject(i);
sm.SetReadyFlags(poll_result.GetEvents(i)); s.SetReadyFlags(poll_result.GetEvents(i));
ready_sockets.push_back(sm); ready_sockets.push_back(s);
} }
now = std::chrono::steady_clock::now(); now = std::chrono::steady_clock::now();
@ -339,7 +339,7 @@ EventLoop::HandleDeferred() noexcept
} }
} }
bool void
EventLoop::OnSocketReady([[maybe_unused]] unsigned flags) noexcept EventLoop::OnSocketReady([[maybe_unused]] unsigned flags) noexcept
{ {
assert(IsInside()); assert(IsInside());
@ -348,8 +348,6 @@ EventLoop::OnSocketReady([[maybe_unused]] unsigned flags) noexcept
const std::lock_guard<Mutex> lock(mutex); const std::lock_guard<Mutex> lock(mutex);
HandleDeferred(); HandleDeferred();
return true;
} }
#endif #endif

View File

@ -22,7 +22,7 @@
#include "Chrono.hxx" #include "Chrono.hxx"
#include "PollGroup.hxx" #include "PollGroup.hxx"
#include "SocketMonitor.hxx" #include "SocketEvent.hxx"
#include "event/Features.h" #include "event/Features.h"
#include "util/Compiler.h" #include "util/Compiler.h"
@ -56,15 +56,13 @@ class DeferEvent;
* thread that runs it, except where explicitly documented as * thread that runs it, except where explicitly documented as
* thread-safe. * thread-safe.
* *
* @see SocketMonitor, MultiSocketMonitor, TimerEvent, IdleEvent * @see SocketEvent, MultiSocketMonitor, TimerEvent, IdleEvent
*/ */
class EventLoop final class EventLoop final
#ifdef HAVE_THREADED_EVENT_LOOP
: SocketMonitor
#endif
{ {
#ifdef HAVE_THREADED_EVENT_LOOP #ifdef HAVE_THREADED_EVENT_LOOP
WakeFD wake_fd; WakeFD wake_fd;
SocketEvent wake_event;
#endif #endif
struct TimerCompare { struct TimerCompare {
@ -96,14 +94,14 @@ class EventLoop final
#endif #endif
using ReadySocketList = using ReadySocketList =
boost::intrusive::list<SocketMonitor, boost::intrusive::list<SocketEvent,
boost::intrusive::member_hook<SocketMonitor, boost::intrusive::member_hook<SocketEvent,
SocketMonitor::ReadyListHook, SocketEvent::ReadyListHook,
&SocketMonitor::ready_siblings>, &SocketEvent::ready_siblings>,
boost::intrusive::constant_time_size<false>>; boost::intrusive::constant_time_size<false>>;
/** /**
* A linked list of #SocketMonitor instances which have a * A linked list of #SocketEvent instances which have a
* non-zero "ready_flags" field, and need to be dispatched. * non-zero "ready_flags" field, and need to be dispatched.
*/ */
ReadySocketList ready_sockets; ReadySocketList ready_sockets;
@ -190,7 +188,7 @@ public:
*/ */
void Break() noexcept; void Break() noexcept;
bool AddFD(int _fd, unsigned flags, SocketMonitor &m) noexcept { bool AddFD(int _fd, unsigned flags, SocketEvent &m) noexcept {
#ifdef HAVE_THREADED_EVENT_LOOP #ifdef HAVE_THREADED_EVENT_LOOP
assert(!IsAlive() || IsInside()); assert(!IsAlive() || IsInside());
#endif #endif
@ -198,7 +196,7 @@ public:
return poll_group.Add(_fd, flags, &m); return poll_group.Add(_fd, flags, &m);
} }
bool ModifyFD(int _fd, unsigned flags, SocketMonitor &m) noexcept { bool ModifyFD(int _fd, unsigned flags, SocketEvent &m) noexcept {
#ifdef HAVE_THREADED_EVENT_LOOP #ifdef HAVE_THREADED_EVENT_LOOP
assert(!IsAlive() || IsInside()); assert(!IsAlive() || IsInside());
#endif #endif
@ -207,7 +205,7 @@ public:
} }
/** /**
* Remove the given #SocketMonitor after the file descriptor * Remove the given #SocketEvent after the file descriptor
* has been closed. This is like RemoveFD(), but does not * has been closed. This is like RemoveFD(), but does not
* attempt to use #EPOLL_CTL_DEL. * attempt to use #EPOLL_CTL_DEL.
*/ */
@ -261,7 +259,7 @@ private:
Event::Duration HandleTimers() noexcept; Event::Duration HandleTimers() noexcept;
#ifdef HAVE_THREADED_EVENT_LOOP #ifdef HAVE_THREADED_EVENT_LOOP
bool OnSocketReady(unsigned flags) noexcept override; void OnSocketReady(unsigned flags) noexcept;
#endif #endif
public: public:

View File

@ -22,7 +22,7 @@
#include "IdleEvent.hxx" #include "IdleEvent.hxx"
#include "TimerEvent.hxx" #include "TimerEvent.hxx"
#include "SocketMonitor.hxx" #include "SocketEvent.hxx"
#include "event/Features.h" #include "event/Features.h"
#include <cassert> #include <cassert>
@ -36,35 +36,43 @@ struct pollfd;
class EventLoop; class EventLoop;
/** /**
* Similar to #SocketMonitor, but monitors multiple sockets. To use * Similar to #SocketEvent, but monitors multiple sockets. To use
* it, implement the methods PrepareSockets() and DispatchSockets(). * it, implement the methods PrepareSockets() and DispatchSockets().
* In PrepareSockets(), use UpdateSocketList() and AddSocket(). * In PrepareSockets(), use UpdateSocketList() and AddSocket().
* DispatchSockets() will be called if at least one socket is ready. * DispatchSockets() will be called if at least one socket is ready.
*/ */
class MultiSocketMonitor class MultiSocketMonitor
{ {
class SingleFD final : public SocketMonitor { class SingleFD final {
MultiSocketMonitor &multi; MultiSocketMonitor &multi;
SocketEvent event;
unsigned revents; unsigned revents;
public: public:
SingleFD(MultiSocketMonitor &_multi, SingleFD(MultiSocketMonitor &_multi,
SocketDescriptor _fd) noexcept SocketDescriptor _fd) noexcept
:SocketMonitor(_fd, _multi.GetEventLoop()), :multi(_multi),
multi(_multi), revents(0) {} event(multi.GetEventLoop(),
BIND_THIS_METHOD(OnSocketReady), _fd),
revents(0) {}
SocketDescriptor GetSocket() const noexcept { SocketDescriptor GetSocket() const noexcept {
return SocketMonitor::GetSocket(); return event.GetSocket();
} }
unsigned GetEvents() const noexcept { unsigned GetEvents() const noexcept {
return SocketMonitor::GetScheduledFlags(); return event.GetScheduledFlags();
} }
void SetEvents(unsigned _events) noexcept { void SetEvents(unsigned _events) noexcept {
revents &= _events; revents &= _events;
SocketMonitor::Schedule(_events); event.Schedule(_events);
}
bool Schedule(unsigned events) noexcept {
return event.Schedule(events);
} }
unsigned GetReturnedEvents() const noexcept { unsigned GetReturnedEvents() const noexcept {
@ -75,11 +83,10 @@ class MultiSocketMonitor
revents = 0; revents = 0;
} }
protected: private:
bool OnSocketReady(unsigned flags) noexcept override { void OnSocketReady(unsigned flags) noexcept {
revents = flags; revents = flags;
multi.SetReady(); multi.SetReady();
return true;
} }
}; };
@ -119,11 +126,6 @@ class MultiSocketMonitor
#endif #endif
public: public:
static constexpr unsigned READ = SocketMonitor::READ;
static constexpr unsigned WRITE = SocketMonitor::WRITE;
static constexpr unsigned ERROR = SocketMonitor::ERROR;
static constexpr unsigned HANGUP = SocketMonitor::HANGUP;
MultiSocketMonitor(EventLoop &_loop) noexcept; MultiSocketMonitor(EventLoop &_loop) noexcept;
EventLoop &GetEventLoop() const noexcept { EventLoop &GetEventLoop() const noexcept {

View File

@ -29,7 +29,7 @@
#include "net/Resolver.hxx" #include "net/Resolver.hxx"
#include "net/AddressInfo.hxx" #include "net/AddressInfo.hxx"
#include "net/ToString.hxx" #include "net/ToString.hxx"
#include "event/SocketMonitor.hxx" #include "event/SocketEvent.hxx"
#include "fs/AllocatedPath.hxx" #include "fs/AllocatedPath.hxx"
#include "util/RuntimeError.hxx" #include "util/RuntimeError.hxx"
#include "util/Domain.hxx" #include "util/Domain.hxx"
@ -43,9 +43,11 @@
#include <sys/stat.h> #include <sys/stat.h>
#endif #endif
class ServerSocket::OneServerSocket final : private SocketMonitor { class ServerSocket::OneServerSocket final {
ServerSocket &parent; ServerSocket &parent;
SocketEvent event;
const unsigned serial; const unsigned serial;
#ifdef HAVE_UN #ifdef HAVE_UN
@ -59,8 +61,9 @@ public:
OneServerSocket(EventLoop &_loop, ServerSocket &_parent, OneServerSocket(EventLoop &_loop, ServerSocket &_parent,
unsigned _serial, unsigned _serial,
A &&_address) noexcept A &&_address) noexcept
:SocketMonitor(_loop), :parent(_parent),
parent(_parent), serial(_serial), event(_loop, BIND_THIS_METHOD(OnSocketReady)),
serial(_serial),
#ifdef HAVE_UN #ifdef HAVE_UN
path(nullptr), path(nullptr),
#endif #endif
@ -88,10 +91,15 @@ public:
} }
#endif #endif
bool IsDefined() const noexcept {
return event.IsDefined();
}
void Open(); void Open();
using SocketMonitor::IsDefined; void Close() noexcept {
using SocketMonitor::Close; event.Close();
}
[[nodiscard]] gcc_pure [[nodiscard]] gcc_pure
std::string ToString() const noexcept { std::string ToString() const noexcept {
@ -99,14 +107,14 @@ public:
} }
void SetFD(UniqueSocketDescriptor _fd) noexcept { void SetFD(UniqueSocketDescriptor _fd) noexcept {
SocketMonitor::Open(_fd.Release()); event.Open(_fd.Release());
SocketMonitor::ScheduleRead(); event.ScheduleRead();
} }
void Accept() noexcept; void Accept() noexcept;
private: private:
bool OnSocketReady(unsigned flags) noexcept override; void OnSocketReady(unsigned flags) noexcept;
}; };
static constexpr Domain server_socket_domain("server_socket"); static constexpr Domain server_socket_domain("server_socket");
@ -140,7 +148,7 @@ inline void
ServerSocket::OneServerSocket::Accept() noexcept ServerSocket::OneServerSocket::Accept() noexcept
{ {
StaticSocketAddress peer_address; StaticSocketAddress peer_address;
UniqueSocketDescriptor peer_fd(GetSocket().AcceptNonBlock(peer_address)); UniqueSocketDescriptor peer_fd(event.GetSocket().AcceptNonBlock(peer_address));
if (!peer_fd.IsDefined()) { if (!peer_fd.IsDefined()) {
const SocketErrorMessage msg; const SocketErrorMessage msg;
FormatError(server_socket_domain, FormatError(server_socket_domain,
@ -160,11 +168,10 @@ ServerSocket::OneServerSocket::Accept() noexcept
parent.OnAccept(std::move(peer_fd), peer_address, uid); parent.OnAccept(std::move(peer_fd), peer_address, uid);
} }
bool void
ServerSocket::OneServerSocket::OnSocketReady([[maybe_unused]] unsigned flags) noexcept ServerSocket::OneServerSocket::OnSocketReady([[maybe_unused]] unsigned flags) noexcept
{ {
Accept(); Accept();
return true;
} }
inline void inline void

View File

@ -22,7 +22,7 @@
#ifndef _WIN32 #ifndef _WIN32
#include "SocketMonitor.hxx" #include "SocketEvent.hxx"
#include "util/Manual.hxx" #include "util/Manual.hxx"
#include "system/Error.hxx" #include "system/Error.hxx"
@ -44,33 +44,37 @@
#include <pthread.h> #include <pthread.h>
#endif #endif
class SignalMonitor final : private SocketMonitor { class SignalMonitor final {
#ifdef USE_SIGNALFD #ifdef USE_SIGNALFD
SignalFD fd; SignalFD fd;
#else #else
WakeFD fd; WakeFD fd;
#endif #endif
SocketEvent event;
public: public:
explicit SignalMonitor(EventLoop &_loop) explicit SignalMonitor(EventLoop &_loop)
:SocketMonitor(_loop) { :event(_loop, BIND_THIS_METHOD(OnSocketReady)) {
#ifndef USE_SIGNALFD #ifndef USE_SIGNALFD
SocketMonitor::Open(SocketDescriptor(fd.Get())); event.Open(SocketDescriptor(fd.Get()));
SocketMonitor::ScheduleRead(); event.ScheduleRead();
#endif #endif
} }
using SocketMonitor::GetEventLoop; auto &GetEventLoop() const noexcept {
return event.GetEventLoop();
}
#ifdef USE_SIGNALFD #ifdef USE_SIGNALFD
void Update(sigset_t &mask) noexcept { void Update(sigset_t &mask) noexcept {
const bool was_open = SocketMonitor::IsDefined(); const bool was_open = event.IsDefined();
fd.Create(mask); fd.Create(mask);
if (!was_open) { if (!was_open) {
SocketMonitor::Open(SocketDescriptor(fd.Get())); event.Open(SocketDescriptor(fd.Get()));
SocketMonitor::ScheduleRead(); event.ScheduleRead();
} }
} }
#else #else
@ -80,7 +84,7 @@ public:
#endif #endif
private: private:
bool OnSocketReady(unsigned flags) noexcept override; void OnSocketReady(unsigned flags) noexcept;
}; };
/* this should be enough - is it? */ /* this should be enough - is it? */
@ -195,7 +199,7 @@ SignalMonitorRegister(int signo, SignalHandler handler)
#endif #endif
} }
bool void
SignalMonitor::OnSocketReady(unsigned) noexcept SignalMonitor::OnSocketReady(unsigned) noexcept
{ {
#ifdef USE_SIGNALFD #ifdef USE_SIGNALFD
@ -213,8 +217,6 @@ SignalMonitor::OnSocketReady(unsigned) noexcept
if (signal_pending[i].exchange(false)) if (signal_pending[i].exchange(false))
signal_handlers[i](); signal_handlers[i]();
#endif #endif
return true;
} }
#endif #endif

View File

@ -17,7 +17,7 @@
* 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*/ */
#include "SocketMonitor.hxx" #include "SocketEvent.hxx"
#include "Loop.hxx" #include "Loop.hxx"
#include "event/Features.h" #include "event/Features.h"
@ -29,23 +29,23 @@
#endif #endif
void void
SocketMonitor::Dispatch() noexcept SocketEvent::Dispatch() noexcept
{ {
const unsigned flags = std::exchange(ready_flags, 0) & const unsigned flags = std::exchange(ready_flags, 0) &
(GetScheduledFlags() | IMPLICIT_FLAGS); (GetScheduledFlags() | IMPLICIT_FLAGS);
if (flags != 0) if (flags != 0)
OnSocketReady(flags); callback(flags);
} }
SocketMonitor::~SocketMonitor() noexcept SocketEvent::~SocketEvent() noexcept
{ {
if (IsDefined()) if (IsDefined())
Cancel(); Cancel();
} }
void void
SocketMonitor::Open(SocketDescriptor _fd) noexcept SocketEvent::Open(SocketDescriptor _fd) noexcept
{ {
assert(!fd.IsDefined()); assert(!fd.IsDefined());
assert(_fd.IsDefined()); assert(_fd.IsDefined());
@ -54,7 +54,7 @@ SocketMonitor::Open(SocketDescriptor _fd) noexcept
} }
SocketDescriptor SocketDescriptor
SocketMonitor::Steal() noexcept SocketEvent::Steal() noexcept
{ {
assert(IsDefined()); assert(IsDefined());
@ -64,13 +64,13 @@ SocketMonitor::Steal() noexcept
} }
void void
SocketMonitor::Close() noexcept SocketEvent::Close() noexcept
{ {
Steal().Close(); Steal().Close();
} }
bool bool
SocketMonitor::Schedule(unsigned flags) noexcept SocketEvent::Schedule(unsigned flags) noexcept
{ {
assert(IsDefined()); assert(IsDefined());

View File

@ -17,11 +17,12 @@
* 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*/ */
#ifndef MPD_SOCKET_MONITOR_HXX #ifndef MPD_SOCKET_EVENT_HXX
#define MPD_SOCKET_MONITOR_HXX #define MPD_SOCKET_EVENT_HXX
#include "PollGroup.hxx" #include "PollGroup.hxx"
#include "net/SocketDescriptor.hxx" #include "net/SocketDescriptor.hxx"
#include "util/BindMethod.hxx"
#include <boost/intrusive/list_hook.hpp> #include <boost/intrusive/list_hook.hpp>
@ -34,8 +35,8 @@ class EventLoop;
/** /**
* Monitor events on a socket. Call Schedule() to announce events * Monitor events on a socket. Call Schedule() to announce events
* you're interested in, or Cancel() to cancel your subscription. The * you're interested in, or Cancel() to cancel your subscription. The
* #EventLoop will invoke virtual method OnSocketReady() as soon as * #EventLoop will invoke the callback as soon as any of the
* any of the subscribed events are ready. * subscribed events are ready.
* *
* This class does not feel responsible for closing the socket. Call * This class does not feel responsible for closing the socket. Call
* Close() to do it manually. * Close() to do it manually.
@ -44,14 +45,18 @@ class EventLoop;
* thread that runs the #EventLoop, except where explicitly documented * thread that runs the #EventLoop, except where explicitly documented
* as thread-safe. * as thread-safe.
*/ */
class SocketMonitor { class SocketEvent {
friend class EventLoop; friend class EventLoop;
EventLoop &loop;
using ReadyListHook = boost::intrusive::list_member_hook<boost::intrusive::link_mode<boost::intrusive::auto_unlink>>; using ReadyListHook = boost::intrusive::list_member_hook<boost::intrusive::link_mode<boost::intrusive::auto_unlink>>;
ReadyListHook ready_siblings; ReadyListHook ready_siblings;
using Callback = BoundMethod<void(unsigned events) noexcept>;
const Callback callback;
SocketDescriptor fd = SocketDescriptor::Undefined(); SocketDescriptor fd = SocketDescriptor::Undefined();
EventLoop &loop;
/** /**
* A bit mask of events that is currently registered in the * A bit mask of events that is currently registered in the
@ -80,13 +85,13 @@ public:
typedef std::make_signed<size_t>::type ssize_t; typedef std::make_signed<size_t>::type ssize_t;
explicit SocketMonitor(EventLoop &_loop) noexcept SocketEvent(EventLoop &_loop, Callback _callback,
:loop(_loop) {} SocketDescriptor _fd=SocketDescriptor::Undefined()) noexcept
:loop(_loop),
callback(_callback),
fd(_fd) {}
SocketMonitor(SocketDescriptor _fd, EventLoop &_loop) noexcept ~SocketEvent() noexcept;
:fd(_fd), loop(_loop) {}
~SocketMonitor() noexcept;
auto &GetEventLoop() const noexcept { auto &GetEventLoop() const noexcept {
return loop; return loop;
@ -148,12 +153,6 @@ public:
Schedule(GetScheduledFlags() & ~WRITE); Schedule(GetScheduledFlags() & ~WRITE);
} }
protected:
/**
* @return false if the socket has been closed
*/
virtual bool OnSocketReady(unsigned flags) noexcept = 0;
public: public:
void Dispatch() noexcept; void Dispatch() noexcept;
}; };

View File

@ -22,15 +22,13 @@
namespace Uring { namespace Uring {
bool void
Manager::OnSocketReady(unsigned) noexcept Manager::OnSocketReady(unsigned) noexcept
{ {
try { try {
DispatchCompletions(); DispatchCompletions();
return true;
} catch (...) { } catch (...) {
PrintException(std::current_exception()); PrintException(std::current_exception());
return false;
} }
} }

View File

@ -19,23 +19,24 @@
#pragma once #pragma once
#include "SocketMonitor.hxx" #include "SocketEvent.hxx"
#include "IdleEvent.hxx" #include "IdleEvent.hxx"
#include "io/uring/Queue.hxx" #include "io/uring/Queue.hxx"
namespace Uring { namespace Uring {
class Manager final : public Queue, SocketMonitor { class Manager final : public Queue {
SocketEvent event;
IdleEvent idle_event; IdleEvent idle_event;
public: public:
explicit Manager(EventLoop &event_loop) explicit Manager(EventLoop &event_loop)
:Queue(1024, 0), :Queue(1024, 0),
SocketMonitor(SocketDescriptor::FromFileDescriptor(GetFileDescriptor()), event(event_loop, BIND_THIS_METHOD(OnSocketReady),
event_loop), SocketDescriptor::FromFileDescriptor(GetFileDescriptor())),
idle_event(event_loop, BIND_THIS_METHOD(OnIdle)) idle_event(event_loop, BIND_THIS_METHOD(OnIdle))
{ {
SocketMonitor::ScheduleRead(); event.ScheduleRead();
} }
void Push(struct io_uring_sqe &sqe, void Push(struct io_uring_sqe &sqe,
@ -45,7 +46,7 @@ public:
} }
private: private:
bool OnSocketReady(unsigned flags) noexcept override; void OnSocketReady(unsigned flags) noexcept;
void OnIdle() noexcept; void OnIdle() noexcept;
}; };

View File

@ -26,7 +26,7 @@ event = static_library(
'IdleEvent.cxx', 'IdleEvent.cxx',
'DeferEvent.cxx', 'DeferEvent.cxx',
'MaskMonitor.cxx', 'MaskMonitor.cxx',
'SocketMonitor.cxx', 'SocketEvent.cxx',
'BufferedSocket.cxx', 'BufferedSocket.cxx',
'FullyBufferedSocket.cxx', 'FullyBufferedSocket.cxx',
'MultiSocketMonitor.cxx', 'MultiSocketMonitor.cxx',

View File

@ -31,7 +31,7 @@
#include "Request.hxx" #include "Request.hxx"
#include "Log.hxx" #include "Log.hxx"
#include "event/Loop.hxx" #include "event/Loop.hxx"
#include "event/SocketMonitor.hxx" #include "event/SocketEvent.hxx"
#include "util/RuntimeError.hxx" #include "util/RuntimeError.hxx"
#include "util/Domain.hxx" #include "util/Domain.hxx"
@ -42,12 +42,15 @@ static constexpr Domain curlm_domain("curlm");
/** /**
* Monitor for one socket created by CURL. * Monitor for one socket created by CURL.
*/ */
class CurlSocket final : SocketMonitor { class CurlSocket final {
CurlGlobal &global; CurlGlobal &global;
SocketEvent socket_event;
public: public:
CurlSocket(CurlGlobal &_global, EventLoop &_loop, SocketDescriptor _fd) CurlSocket(CurlGlobal &_global, EventLoop &_loop, SocketDescriptor _fd)
:SocketMonitor(_fd, _loop), global(_global) {} :global(_global),
socket_event(_loop, BIND_THIS_METHOD(OnSocketReady), _fd) {}
~CurlSocket() noexcept { ~CurlSocket() noexcept {
/* TODO: sometimes, CURL uses CURL_POLL_REMOVE after /* TODO: sometimes, CURL uses CURL_POLL_REMOVE after
@ -59,6 +62,10 @@ public:
better solution? */ better solution? */
} }
auto &GetEventLoop() const noexcept {
return socket_event.GetEventLoop();
}
/** /**
* Callback function for CURLMOPT_SOCKETFUNCTION. * Callback function for CURLMOPT_SOCKETFUNCTION.
*/ */
@ -66,13 +73,17 @@ public:
curl_socket_t s, int action, curl_socket_t s, int action,
void *userp, void *socketp) noexcept; void *userp, void *socketp) noexcept;
bool OnSocketReady(unsigned flags) noexcept override;
private: private:
SocketDescriptor GetSocket() const noexcept {
return socket_event.GetSocket();
}
void OnSocketReady(unsigned flags) noexcept;
static constexpr int FlagsToCurlCSelect(unsigned flags) noexcept { static constexpr int FlagsToCurlCSelect(unsigned flags) noexcept {
return (flags & (READ | HANGUP) ? CURL_CSELECT_IN : 0) | return (flags & (SocketEvent::READ | SocketEvent::HANGUP) ? CURL_CSELECT_IN : 0) |
(flags & WRITE ? CURL_CSELECT_OUT : 0) | (flags & SocketEvent::WRITE ? CURL_CSELECT_OUT : 0) |
(flags & ERROR ? CURL_CSELECT_ERR : 0); (flags & SocketEvent::ERROR ? CURL_CSELECT_ERR : 0);
} }
gcc_const gcc_const
@ -82,13 +93,13 @@ private:
return 0; return 0;
case CURL_POLL_IN: case CURL_POLL_IN:
return READ; return SocketEvent::READ;
case CURL_POLL_OUT: case CURL_POLL_OUT:
return WRITE; return SocketEvent::WRITE;
case CURL_POLL_INOUT: case CURL_POLL_INOUT:
return READ|WRITE; return SocketEvent::READ|SocketEvent::WRITE;
} }
assert(false); assert(false);
@ -130,17 +141,16 @@ CurlSocket::SocketFunction([[maybe_unused]] CURL *easy,
unsigned flags = CurlPollToFlags(action); unsigned flags = CurlPollToFlags(action);
if (flags != 0) if (flags != 0)
cs->Schedule(flags); cs->socket_event.Schedule(flags);
return 0; return 0;
} }
bool void
CurlSocket::OnSocketReady(unsigned flags) noexcept CurlSocket::OnSocketReady(unsigned flags) noexcept
{ {
assert(GetEventLoop().IsInside()); assert(GetEventLoop().IsInside());
global.SocketAction(GetSocket().Get(), FlagsToCurlCSelect(flags)); global.SocketAction(GetSocket().Get(), FlagsToCurlCSelect(flags));
return true;
} }
void void

View File

@ -1,5 +1,5 @@
/* /*
* Copyright 2007-2018 Content Management AG * Copyright 2007-2020 CM4all GmbH
* All rights reserved. * All rights reserved.
* *
* author: Max Kellermann <mk@cm4all.com> * author: Max Kellermann <mk@cm4all.com>
@ -36,8 +36,8 @@ namespace ODBus {
WatchManager::Watch::Watch(EventLoop &event_loop, WatchManager::Watch::Watch(EventLoop &event_loop,
WatchManager &_parent, DBusWatch &_watch) noexcept WatchManager &_parent, DBusWatch &_watch) noexcept
:SocketMonitor(event_loop), :parent(_parent), watch(_watch),
parent(_parent), watch(_watch) event(event_loop, BIND_THIS_METHOD(OnSocketReady))
{ {
Toggled(); Toggled();
} }
@ -45,30 +45,30 @@ WatchManager::Watch::Watch(EventLoop &event_loop,
static constexpr unsigned static constexpr unsigned
DbusToLibevent(unsigned flags) noexcept DbusToLibevent(unsigned flags) noexcept
{ {
return ((flags & DBUS_WATCH_READABLE) != 0) * SocketMonitor::READ | return ((flags & DBUS_WATCH_READABLE) != 0) * SocketEvent::READ |
((flags & DBUS_WATCH_WRITABLE) != 0) * SocketMonitor::WRITE; ((flags & DBUS_WATCH_WRITABLE) != 0) * SocketEvent::WRITE;
} }
void void
WatchManager::Watch::Toggled() noexcept WatchManager::Watch::Toggled() noexcept
{ {
if (SocketMonitor::IsDefined()) if (event.IsDefined())
SocketMonitor::Cancel(); event.Cancel();
if (dbus_watch_get_enabled(&watch)) { if (dbus_watch_get_enabled(&watch)) {
SocketMonitor::Open(SocketDescriptor(dbus_watch_get_unix_fd(&watch))); event.Open(SocketDescriptor(dbus_watch_get_unix_fd(&watch)));
SocketMonitor::Schedule(DbusToLibevent(dbus_watch_get_flags(&watch))); event.Schedule(DbusToLibevent(dbus_watch_get_flags(&watch)));
} }
} }
static constexpr unsigned static constexpr unsigned
LibeventToDbus(unsigned flags) noexcept LibeventToDbus(unsigned flags) noexcept
{ {
return ((flags & SocketMonitor::READ) != 0) * DBUS_WATCH_READABLE | return ((flags & SocketEvent::READ) != 0) * DBUS_WATCH_READABLE |
((flags & SocketMonitor::WRITE) != 0) * DBUS_WATCH_WRITABLE; ((flags & SocketEvent::WRITE) != 0) * DBUS_WATCH_WRITABLE;
} }
bool void
WatchManager::Watch::OnSocketReady(unsigned events) noexcept WatchManager::Watch::OnSocketReady(unsigned events) noexcept
{ {
/* copy the "parent" reference to the stack, because the /* copy the "parent" reference to the stack, because the
@ -79,7 +79,6 @@ WatchManager::Watch::OnSocketReady(unsigned events) noexcept
dbus_watch_handle(&watch, LibeventToDbus(events)); dbus_watch_handle(&watch, LibeventToDbus(events));
_parent.ScheduleDispatch(); _parent.ScheduleDispatch();
return true;
} }
void void

View File

@ -1,5 +1,5 @@
/* /*
* Copyright 2007-2018 Content Management AG * Copyright 2007-2020 CM4all GmbH
* All rights reserved. * All rights reserved.
* *
* author: Max Kellermann <mk@cm4all.com> * author: Max Kellermann <mk@cm4all.com>
@ -34,7 +34,7 @@
#define ODBUS_WATCH_HXX #define ODBUS_WATCH_HXX
#include "Connection.hxx" #include "Connection.hxx"
#include "event/SocketMonitor.hxx" #include "event/SocketEvent.hxx"
#include "event/DeferEvent.hxx" #include "event/DeferEvent.hxx"
#include <dbus/dbus.h> #include <dbus/dbus.h>
@ -58,9 +58,10 @@ class WatchManager {
Connection connection; Connection connection;
class Watch final : SocketMonitor { class Watch {
WatchManager &parent; WatchManager &parent;
DBusWatch &watch; DBusWatch &watch;
SocketEvent event;
public: public:
Watch(EventLoop &event_loop, WatchManager &_parent, Watch(EventLoop &event_loop, WatchManager &_parent,
@ -69,7 +70,7 @@ class WatchManager {
void Toggled() noexcept; void Toggled() noexcept;
private: private:
bool OnSocketReady(unsigned flags) noexcept override; void OnSocketReady(unsigned flags) noexcept;
}; };
std::map<DBusWatch *, Watch> watches; std::map<DBusWatch *, Watch> watches;

View File

@ -183,17 +183,17 @@ NfsConnection::CancellableCallback::Callback(int err,
static constexpr unsigned static constexpr unsigned
libnfs_to_events(int i) noexcept libnfs_to_events(int i) noexcept
{ {
return ((i & POLLIN) ? SocketMonitor::READ : 0) | return ((i & POLLIN) ? SocketEvent::READ : 0) |
((i & POLLOUT) ? SocketMonitor::WRITE : 0); ((i & POLLOUT) ? SocketEvent::WRITE : 0);
} }
static constexpr int static constexpr int
events_to_libnfs(unsigned i) noexcept events_to_libnfs(unsigned i) noexcept
{ {
return ((i & SocketMonitor::READ) ? POLLIN : 0) | return ((i & SocketEvent::READ) ? POLLIN : 0) |
((i & SocketMonitor::WRITE) ? POLLOUT : 0) | ((i & SocketEvent::WRITE) ? POLLOUT : 0) |
((i & SocketMonitor::HANGUP) ? POLLHUP : 0) | ((i & SocketEvent::HANGUP) ? POLLHUP : 0) |
((i & SocketMonitor::ERROR) ? POLLERR : 0); ((i & SocketEvent::ERROR) ? POLLERR : 0);
} }
NfsConnection::~NfsConnection() noexcept NfsConnection::~NfsConnection() noexcept
@ -403,8 +403,8 @@ NfsConnection::DestroyContext() noexcept
new leases */ new leases */
defer_new_lease.Cancel(); defer_new_lease.Cancel();
if (SocketMonitor::IsDefined()) if (socket_event.IsDefined())
SocketMonitor::Steal(); socket_event.Steal();
callbacks.ForEach([](CancellableCallback &c){ callbacks.ForEach([](CancellableCallback &c){
c.PrepareDestroyContext(); c.PrepareDestroyContext();
@ -434,25 +434,25 @@ NfsConnection::ScheduleSocket() noexcept
const int which_events = nfs_which_events(context); const int which_events = nfs_which_events(context);
if (which_events == POLLOUT && SocketMonitor::IsDefined()) if (which_events == POLLOUT && socket_event.IsDefined())
/* kludge: if libnfs asks only for POLLOUT, it means /* kludge: if libnfs asks only for POLLOUT, it means
that it is currently waiting for the connect() to that it is currently waiting for the connect() to
finish - rpc_reconnect_requeue() may have been finish - rpc_reconnect_requeue() may have been
called from inside nfs_service(); we must now called from inside nfs_service(); we must now
unregister the old socket and register the new one unregister the old socket and register the new one
instead */ instead */
SocketMonitor::Steal(); socket_event.Steal();
if (!SocketMonitor::IsDefined()) { if (!socket_event.IsDefined()) {
SocketDescriptor _fd(nfs_get_fd(context)); SocketDescriptor _fd(nfs_get_fd(context));
if (!_fd.IsDefined()) if (!_fd.IsDefined())
return; return;
_fd.EnableCloseOnExec(); _fd.EnableCloseOnExec();
SocketMonitor::Open(_fd); socket_event.Open(_fd);
} }
SocketMonitor::Schedule(libnfs_to_events(which_events)); socket_event.Schedule(libnfs_to_events(which_events));
} }
inline int inline int
@ -480,16 +480,14 @@ NfsConnection::Service(unsigned flags) noexcept
return result; return result;
} }
bool void
NfsConnection::OnSocketReady(unsigned flags) noexcept NfsConnection::OnSocketReady(unsigned flags) noexcept
{ {
assert(GetEventLoop().IsInside()); assert(GetEventLoop().IsInside());
assert(deferred_close.empty()); assert(deferred_close.empty());
bool closed = false;
const bool was_mounted = mount_finished; const bool was_mounted = mount_finished;
if (!mount_finished || (flags & SocketMonitor::HANGUP) != 0) if (!mount_finished || (flags & SocketEvent::HANGUP) != 0)
/* until the mount is finished, the NFS client may use /* until the mount is finished, the NFS client may use
various sockets, therefore we unregister and various sockets, therefore we unregister and
re-register it each time */ re-register it each time */
@ -497,7 +495,7 @@ NfsConnection::OnSocketReady(unsigned flags) noexcept
which is a sure sign that libnfs will close the which is a sure sign that libnfs will close the
socket, which can lead to a race condition if socket, which can lead to a race condition if
epoll_ctl() is called later */ epoll_ctl() is called later */
SocketMonitor::Steal(); socket_event.Steal();
const int result = Service(flags); const int result = Service(flags);
@ -509,7 +507,6 @@ NfsConnection::OnSocketReady(unsigned flags) noexcept
if (!was_mounted && mount_finished) { if (!was_mounted && mount_finished) {
if (postponed_mount_error) { if (postponed_mount_error) {
DestroyContext(); DestroyContext();
closed = true;
BroadcastMountError(std::move(postponed_mount_error)); BroadcastMountError(std::move(postponed_mount_error));
} else if (result == 0) } else if (result == 0)
BroadcastMountSuccess(); BroadcastMountSuccess();
@ -521,7 +518,6 @@ NfsConnection::OnSocketReady(unsigned flags) noexcept
BroadcastError(std::make_exception_ptr(e)); BroadcastError(std::make_exception_ptr(e));
DestroyContext(); DestroyContext();
closed = true;
} else if (nfs_get_fd(context) < 0) { } else if (nfs_get_fd(context) < 0) {
/* this happens when rpc_reconnect_requeue() is called /* this happens when rpc_reconnect_requeue() is called
after the connection broke, but autoreconnect was after the connection broke, but autoreconnect was
@ -535,7 +531,6 @@ NfsConnection::OnSocketReady(unsigned flags) noexcept
BroadcastError(std::make_exception_ptr(e)); BroadcastError(std::make_exception_ptr(e));
DestroyContext(); DestroyContext();
closed = true;
} }
assert(context == nullptr || nfs_get_fd(context) >= 0); assert(context == nullptr || nfs_get_fd(context) >= 0);
@ -547,8 +542,6 @@ NfsConnection::OnSocketReady(unsigned flags) noexcept
if (context != nullptr) if (context != nullptr)
ScheduleSocket(); ScheduleSocket();
return !closed;
} }
inline void inline void

View File

@ -21,7 +21,7 @@
#define MPD_NFS_CONNECTION_HXX #define MPD_NFS_CONNECTION_HXX
#include "Cancellable.hxx" #include "Cancellable.hxx"
#include "event/SocketMonitor.hxx" #include "event/SocketEvent.hxx"
#include "event/TimerEvent.hxx" #include "event/TimerEvent.hxx"
#include "event/DeferEvent.hxx" #include "event/DeferEvent.hxx"
#include "util/Compiler.h" #include "util/Compiler.h"
@ -40,7 +40,7 @@ class NfsLease;
/** /**
* An asynchronous connection to a NFS server. * An asynchronous connection to a NFS server.
*/ */
class NfsConnection : SocketMonitor { class NfsConnection {
class CancellableCallback : public CancellablePointer<NfsCallback> { class CancellableCallback : public CancellablePointer<NfsCallback> {
NfsConnection &connection; NfsConnection &connection;
@ -93,6 +93,7 @@ class NfsConnection : SocketMonitor {
void Callback(int err, void *data) noexcept; void Callback(int err, void *data) noexcept;
}; };
SocketEvent socket_event;
DeferEvent defer_new_lease; DeferEvent defer_new_lease;
TimerEvent mount_timeout_event; TimerEvent mount_timeout_event;
@ -141,7 +142,7 @@ public:
gcc_nonnull_all gcc_nonnull_all
NfsConnection(EventLoop &_loop, NfsConnection(EventLoop &_loop,
const char *_server, const char *_export_name) noexcept const char *_server, const char *_export_name) noexcept
:SocketMonitor(_loop), :socket_event(_loop, BIND_THIS_METHOD(OnSocketReady)),
defer_new_lease(_loop, BIND_THIS_METHOD(RunDeferred)), defer_new_lease(_loop, BIND_THIS_METHOD(RunDeferred)),
mount_timeout_event(_loop, BIND_THIS_METHOD(OnMountTimeout)), mount_timeout_event(_loop, BIND_THIS_METHOD(OnMountTimeout)),
server(_server), export_name(_export_name), server(_server), export_name(_export_name),
@ -152,6 +153,10 @@ public:
*/ */
~NfsConnection() noexcept; ~NfsConnection() noexcept;
auto &GetEventLoop() const noexcept {
return socket_event.GetEventLoop();
}
gcc_pure gcc_pure
const char *GetServer() const noexcept { const char *GetServer() const noexcept {
return server.c_str(); return server.c_str();
@ -162,8 +167,6 @@ public:
return export_name.c_str(); return export_name.c_str();
} }
using SocketMonitor::GetEventLoop;
/** /**
* Ensure that the connection is established. The connection * Ensure that the connection is established. The connection
* is kept up while at least one #NfsLease is registered. * is kept up while at least one #NfsLease is registered.
@ -231,8 +234,7 @@ private:
*/ */
int Service(unsigned flags) noexcept; int Service(unsigned flags) noexcept;
/* virtual methods from SocketMonitor */ void OnSocketReady(unsigned flags) noexcept;
bool OnSocketReady(unsigned flags) noexcept override;
/* callback for #mount_timeout_event */ /* callback for #mount_timeout_event */
void OnMountTimeout() noexcept; void OnMountTimeout() noexcept;

View File

@ -216,7 +216,7 @@ HttpdClient::CancelQueue() noexcept
ClearQueue(); ClearQueue();
if (current_page == nullptr) if (current_page == nullptr)
CancelWrite(); event.CancelWrite();
} }
ssize_t ssize_t
@ -259,7 +259,7 @@ HttpdClient::TryWrite() noexcept
/* another thread has removed the event source /* another thread has removed the event source
while this thread was waiting for while this thread was waiting for
httpd.mutex */ httpd.mutex */
CancelWrite(); event.CancelWrite();
return true; return true;
} }
@ -354,7 +354,7 @@ HttpdClient::TryWrite() noexcept
if (pages.empty()) if (pages.empty())
/* all pages are sent: remove the /* all pages are sent: remove the
event source */ event source */
CancelWrite(); event.CancelWrite();
} }
} }
@ -377,7 +377,7 @@ HttpdClient::PushPage(PagePtr page) noexcept
queue_size += page->GetSize(); queue_size += page->GetSize();
pages.emplace(std::move(page)); pages.emplace(std::move(page));
ScheduleWrite(); event.ScheduleWrite();
} }
void void
@ -389,17 +389,14 @@ HttpdClient::PushMetaData(PagePtr page) noexcept
metadata_sent = false; metadata_sent = false;
} }
bool void
HttpdClient::OnSocketReady(unsigned flags) noexcept HttpdClient::OnSocketReady(unsigned flags) noexcept
{ {
if (!BufferedSocket::OnSocketReady(flags)) if (flags & SocketEvent::WRITE)
return false;
if (flags & WRITE)
if (!TryWrite()) if (!TryWrite())
return false; return;
return true; BufferedSocket::OnSocketReady(flags);
} }
BufferedSocket::InputResult BufferedSocket::InputResult

View File

@ -196,8 +196,8 @@ private:
void ClearQueue() noexcept; void ClearQueue() noexcept;
protected: protected:
/* virtual methods from class SocketMonitor */ /* virtual methods from class BufferedSocket */
bool OnSocketReady(unsigned flags) noexcept override; void OnSocketReady(unsigned flags) noexcept override;
InputResult OnSocketInput(void *data, size_t length) noexcept override; InputResult OnSocketInput(void *data, size_t length) noexcept override;
void OnSocketError(std::exception_ptr ep) noexcept override; void OnSocketError(std::exception_ptr ep) noexcept override;

View File

@ -18,28 +18,29 @@
*/ */
#include "AvahiPoll.hxx" #include "AvahiPoll.hxx"
#include "event/SocketMonitor.hxx" #include "event/SocketEvent.hxx"
#include "event/TimerEvent.hxx" #include "event/TimerEvent.hxx"
#include "time/Convert.hxx" #include "time/Convert.hxx"
static unsigned static unsigned
FromAvahiWatchEvent(AvahiWatchEvent e) FromAvahiWatchEvent(AvahiWatchEvent e)
{ {
return (e & AVAHI_WATCH_IN ? SocketMonitor::READ : 0) | return (e & AVAHI_WATCH_IN ? SocketEvent::READ : 0) |
(e & AVAHI_WATCH_OUT ? SocketMonitor::WRITE : 0); (e & AVAHI_WATCH_OUT ? SocketEvent::WRITE : 0);
} }
static AvahiWatchEvent static AvahiWatchEvent
ToAvahiWatchEvent(unsigned e) ToAvahiWatchEvent(unsigned e)
{ {
return AvahiWatchEvent((e & SocketMonitor::READ ? AVAHI_WATCH_IN : 0) | return AvahiWatchEvent((e & SocketEvent::READ ? AVAHI_WATCH_IN : 0) |
(e & SocketMonitor::WRITE ? AVAHI_WATCH_OUT : 0) | (e & SocketEvent::WRITE ? AVAHI_WATCH_OUT : 0) |
(e & SocketMonitor::ERROR ? AVAHI_WATCH_ERR : 0) | (e & SocketEvent::ERROR ? AVAHI_WATCH_ERR : 0) |
(e & SocketMonitor::HANGUP ? AVAHI_WATCH_HUP : 0)); (e & SocketEvent::HANGUP ? AVAHI_WATCH_HUP : 0));
} }
struct AvahiWatch final : private SocketMonitor { struct AvahiWatch final {
private: SocketEvent event;
const AvahiWatchCallback callback; const AvahiWatchCallback callback;
void *const userdata; void *const userdata;
@ -49,14 +50,14 @@ public:
AvahiWatch(SocketDescriptor _fd, AvahiWatchEvent _event, AvahiWatch(SocketDescriptor _fd, AvahiWatchEvent _event,
AvahiWatchCallback _callback, void *_userdata, AvahiWatchCallback _callback, void *_userdata,
EventLoop &_loop) EventLoop &_loop)
:SocketMonitor(_fd, _loop), :event(_loop, BIND_THIS_METHOD(OnSocketReady), _fd),
callback(_callback), userdata(_userdata), callback(_callback), userdata(_userdata),
received(AvahiWatchEvent(0)) { received(AvahiWatchEvent(0)) {
Schedule(FromAvahiWatchEvent(_event)); event.Schedule(FromAvahiWatchEvent(_event));
} }
static void WatchUpdate(AvahiWatch *w, AvahiWatchEvent event) { static void WatchUpdate(AvahiWatch *w, AvahiWatchEvent event) {
w->Schedule(FromAvahiWatchEvent(event)); w->event.Schedule(FromAvahiWatchEvent(event));
} }
static AvahiWatchEvent WatchGetEvents(AvahiWatch *w) { static AvahiWatchEvent WatchGetEvents(AvahiWatch *w) {
@ -68,12 +69,10 @@ public:
} }
private: private:
/* virtual methods from class SocketMonitor */ void OnSocketReady(unsigned flags) noexcept {
bool OnSocketReady(unsigned flags) noexcept override {
received = ToAvahiWatchEvent(flags); received = ToAvahiWatchEvent(flags);
callback(this, GetSocket().Get(), received, userdata); callback(this, event.GetSocket().Get(), received, userdata);
received = AvahiWatchEvent(0); received = AvahiWatchEvent(0);
return true;
} }
}; };

View File

@ -20,7 +20,7 @@
#include "ZeroconfBonjour.hxx" #include "ZeroconfBonjour.hxx"
#include "ZeroconfInternal.hxx" #include "ZeroconfInternal.hxx"
#include "Listen.hxx" #include "Listen.hxx"
#include "event/SocketMonitor.hxx" #include "event/SocketEvent.hxx"
#include "util/Domain.hxx" #include "util/Domain.hxx"
#include "Log.hxx" #include "Log.hxx"
#include "util/Compiler.h" #include "util/Compiler.h"
@ -31,15 +31,19 @@
static constexpr Domain bonjour_domain("bonjour"); static constexpr Domain bonjour_domain("bonjour");
class BonjourMonitor final : public SocketMonitor { class BonjourMonitor final {
DNSServiceRef service_ref; DNSServiceRef service_ref;
SocketEvent socket_event;
public: public:
BonjourMonitor(EventLoop &_loop, DNSServiceRef _service_ref) BonjourMonitor(EventLoop &_loop, DNSServiceRef _service_ref)
:SocketMonitor(SocketDescriptor(DNSServiceRefSockFD(_service_ref)), :service_ref(_service_ref),
_loop), socket_event(SocketDescriptor(DNSServiceRefSockFD(service_ref)),
service_ref(_service_ref) { BIND_THIS_METHOD(OnSocketReady),
ScheduleRead(); _loop)
{
socket_event.ScheduleRead();
} }
~BonjourMonitor() { ~BonjourMonitor() {
@ -48,9 +52,8 @@ public:
protected: protected:
/* virtual methods from class SocketMonitor */ /* virtual methods from class SocketMonitor */
bool OnSocketReady([[maybe_unused]] unsigned flags) noexcept override { void OnSocketReady([[maybe_unused]] unsigned flags) noexcept override {
DNSServiceProcessResult(service_ref); DNSServiceProcessResult(service_ref);
return true;
} }
}; };