EventLoop: new implementation using epoll
Implement an event loop without GLib.
This commit is contained in:
parent
342333f72a
commit
c1f4f1fdb6
@ -41,7 +41,9 @@
|
||||
|
||||
static char *avahiName;
|
||||
static int avahiRunning;
|
||||
#ifndef USE_EPOLL
|
||||
static AvahiGLibPoll *avahi_glib_poll;
|
||||
#endif
|
||||
static const AvahiPoll *avahi_poll;
|
||||
static AvahiClient *avahiClient;
|
||||
static AvahiEntryGroup *avahiGroup;
|
||||
@ -229,9 +231,14 @@ AvahiInit(EventLoop &loop, const char *serviceName)
|
||||
|
||||
avahiRunning = 1;
|
||||
|
||||
#ifdef USE_EPOLL
|
||||
// TODO
|
||||
(void)loop;
|
||||
#else
|
||||
avahi_glib_poll = avahi_glib_poll_new(loop.GetContext(),
|
||||
G_PRIORITY_DEFAULT);
|
||||
avahi_poll = avahi_glib_poll_get(avahi_glib_poll);
|
||||
#endif
|
||||
|
||||
avahiClient = avahi_client_new(avahi_poll, AVAHI_CLIENT_NO_FAIL,
|
||||
avahiClientCallback, NULL, &error);
|
||||
@ -258,10 +265,14 @@ AvahiDeinit(void)
|
||||
avahiClient = NULL;
|
||||
}
|
||||
|
||||
#ifdef USE_EPOLL
|
||||
// TODO
|
||||
#else
|
||||
if (avahi_glib_poll != NULL) {
|
||||
avahi_glib_poll_free(avahi_glib_poll);
|
||||
avahi_glib_poll = NULL;
|
||||
}
|
||||
#endif
|
||||
|
||||
avahi_free(avahiName);
|
||||
avahiName = NULL;
|
||||
|
@ -27,7 +27,11 @@
|
||||
|
||||
#include <assert.h>
|
||||
|
||||
class BlockingCallMonitor final : DeferredMonitor {
|
||||
class BlockingCallMonitor final
|
||||
#ifndef USE_EPOLL
|
||||
: DeferredMonitor
|
||||
#endif
|
||||
{
|
||||
const std::function<void()> f;
|
||||
|
||||
Mutex mutex;
|
||||
@ -36,13 +40,24 @@ class BlockingCallMonitor final : DeferredMonitor {
|
||||
bool done;
|
||||
|
||||
public:
|
||||
#ifdef USE_EPOLL
|
||||
BlockingCallMonitor(EventLoop &loop, std::function<void()> &&_f)
|
||||
:f(std::move(_f)), done(false) {
|
||||
loop.AddCall([this](){
|
||||
this->DoRun();
|
||||
});
|
||||
}
|
||||
#else
|
||||
BlockingCallMonitor(EventLoop &_loop, std::function<void()> &&_f)
|
||||
:DeferredMonitor(_loop), f(std::move(_f)), done(false) {}
|
||||
#endif
|
||||
|
||||
void Run() {
|
||||
#ifndef USE_EPOLL
|
||||
assert(!done);
|
||||
|
||||
Schedule();
|
||||
#endif
|
||||
|
||||
mutex.lock();
|
||||
while (!done)
|
||||
@ -50,8 +65,18 @@ public:
|
||||
mutex.unlock();
|
||||
}
|
||||
|
||||
#ifndef USE_EPOLL
|
||||
private:
|
||||
virtual void RunDeferred() override {
|
||||
DoRun();
|
||||
}
|
||||
|
||||
#else
|
||||
public:
|
||||
#endif
|
||||
void DoRun() {
|
||||
assert(!done);
|
||||
|
||||
f();
|
||||
|
||||
mutex.lock();
|
||||
|
@ -24,20 +24,44 @@
|
||||
void
|
||||
DeferredMonitor::Cancel()
|
||||
{
|
||||
#ifdef USE_EPOLL
|
||||
pending = false;
|
||||
#else
|
||||
const auto id = source_id.exchange(0);
|
||||
if (id != 0)
|
||||
g_source_remove(id);
|
||||
#endif
|
||||
}
|
||||
|
||||
void
|
||||
DeferredMonitor::Schedule()
|
||||
{
|
||||
#ifdef USE_EPOLL
|
||||
if (!pending.exchange(true))
|
||||
fd.Write();
|
||||
#else
|
||||
const unsigned id = loop.AddIdle(Callback, this);
|
||||
const auto old_id = source_id.exchange(id);
|
||||
if (old_id != 0)
|
||||
g_source_remove(old_id);
|
||||
#endif
|
||||
}
|
||||
|
||||
#ifdef USE_EPOLL
|
||||
|
||||
bool
|
||||
DeferredMonitor::OnSocketReady(unsigned)
|
||||
{
|
||||
fd.Read();
|
||||
|
||||
if (pending.exchange(false))
|
||||
RunDeferred();
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
#else
|
||||
|
||||
void
|
||||
DeferredMonitor::Run()
|
||||
{
|
||||
@ -53,3 +77,5 @@ DeferredMonitor::Callback(gpointer data)
|
||||
monitor.Run();
|
||||
return false;
|
||||
}
|
||||
|
||||
#endif
|
||||
|
@ -21,8 +21,14 @@
|
||||
#define MPD_SOCKET_DEFERRED_MONITOR_HXX
|
||||
|
||||
#include "check.h"
|
||||
#include "gcc.h"
|
||||
|
||||
#ifdef USE_EPOLL
|
||||
#include "SocketMonitor.hxx"
|
||||
#include "WakeFD.hxx"
|
||||
#else
|
||||
#include <glib.h>
|
||||
#endif
|
||||
|
||||
#include <atomic>
|
||||
|
||||
@ -31,21 +37,47 @@ class EventLoop;
|
||||
/**
|
||||
* Defer execution of an event into an #EventLoop.
|
||||
*/
|
||||
class DeferredMonitor {
|
||||
class DeferredMonitor
|
||||
#ifdef USE_EPOLL
|
||||
: private SocketMonitor
|
||||
#endif
|
||||
{
|
||||
#ifdef USE_EPOLL
|
||||
std::atomic_bool pending;
|
||||
WakeFD fd;
|
||||
#else
|
||||
EventLoop &loop;
|
||||
|
||||
std::atomic<guint> source_id;
|
||||
#endif
|
||||
|
||||
public:
|
||||
#ifdef USE_EPOLL
|
||||
DeferredMonitor(EventLoop &_loop)
|
||||
:SocketMonitor(_loop), pending(false) {
|
||||
SocketMonitor::Open(fd.Get());
|
||||
SocketMonitor::Schedule(SocketMonitor::READ);
|
||||
}
|
||||
#else
|
||||
DeferredMonitor(EventLoop &_loop)
|
||||
:loop(_loop), source_id(0) {}
|
||||
#endif
|
||||
|
||||
~DeferredMonitor() {
|
||||
#ifdef USE_EPOLL
|
||||
/* avoid closing the WakeFD twice */
|
||||
SocketMonitor::Steal();
|
||||
#else
|
||||
Cancel();
|
||||
#endif
|
||||
}
|
||||
|
||||
EventLoop &GetEventLoop() {
|
||||
#ifdef USE_EPOLL
|
||||
return SocketMonitor::GetEventLoop();
|
||||
#else
|
||||
return loop;
|
||||
#endif
|
||||
}
|
||||
|
||||
void Schedule();
|
||||
@ -55,8 +87,12 @@ protected:
|
||||
virtual void RunDeferred() = 0;
|
||||
|
||||
private:
|
||||
#ifdef USE_EPOLL
|
||||
virtual bool OnSocketReady(unsigned flags) override final;
|
||||
#else
|
||||
void Run();
|
||||
static gboolean Callback(gpointer data);
|
||||
#endif
|
||||
};
|
||||
|
||||
#endif /* MAIN_NOTIFY_H */
|
||||
|
@ -29,8 +29,13 @@ IdleMonitor::Cancel()
|
||||
if (!IsActive())
|
||||
return;
|
||||
|
||||
#ifdef USE_EPOLL
|
||||
active = false;
|
||||
loop.RemoveIdle(*this);
|
||||
#else
|
||||
g_source_remove(source_id);
|
||||
source_id = 0;
|
||||
#endif
|
||||
}
|
||||
|
||||
void
|
||||
@ -42,19 +47,32 @@ IdleMonitor::Schedule()
|
||||
/* already scheduled */
|
||||
return;
|
||||
|
||||
#ifdef USE_EPOLL
|
||||
active = true;
|
||||
loop.AddIdle(*this);
|
||||
#else
|
||||
source_id = loop.AddIdle(Callback, this);
|
||||
#endif
|
||||
}
|
||||
|
||||
void
|
||||
IdleMonitor::Run()
|
||||
{
|
||||
assert(loop.IsInside());
|
||||
|
||||
#ifdef USE_EPOLL
|
||||
assert(active);
|
||||
active = false;
|
||||
#else
|
||||
assert(source_id != 0);
|
||||
source_id = 0;
|
||||
#endif
|
||||
|
||||
OnIdle();
|
||||
}
|
||||
|
||||
#ifndef USE_EPOLL
|
||||
|
||||
gboolean
|
||||
IdleMonitor::Callback(gpointer data)
|
||||
{
|
||||
@ -62,3 +80,5 @@ IdleMonitor::Callback(gpointer data)
|
||||
monitor.Run();
|
||||
return false;
|
||||
}
|
||||
|
||||
#endif
|
||||
|
@ -22,7 +22,9 @@
|
||||
|
||||
#include "check.h"
|
||||
|
||||
#ifndef USE_EPOLL
|
||||
#include <glib.h>
|
||||
#endif
|
||||
|
||||
class EventLoop;
|
||||
|
||||
@ -32,13 +34,26 @@ class EventLoop;
|
||||
* methods must be run from EventLoop's thread.
|
||||
*/
|
||||
class IdleMonitor {
|
||||
#ifdef USE_EPOLL
|
||||
friend class EventLoop;
|
||||
#endif
|
||||
|
||||
EventLoop &loop;
|
||||
|
||||
#ifdef USE_EPOLL
|
||||
bool active;
|
||||
#else
|
||||
guint source_id;
|
||||
#endif
|
||||
|
||||
public:
|
||||
#ifdef USE_EPOLL
|
||||
IdleMonitor(EventLoop &_loop)
|
||||
:loop(_loop), active(false) {}
|
||||
#else
|
||||
IdleMonitor(EventLoop &_loop)
|
||||
:loop(_loop), source_id(0) {}
|
||||
#endif
|
||||
|
||||
~IdleMonitor() {
|
||||
Cancel();
|
||||
@ -49,7 +64,11 @@ public:
|
||||
}
|
||||
|
||||
bool IsActive() const {
|
||||
#ifdef USE_EPOLL
|
||||
return active;
|
||||
#else
|
||||
return source_id != 0;
|
||||
#endif
|
||||
}
|
||||
|
||||
void Schedule();
|
||||
@ -60,7 +79,9 @@ protected:
|
||||
|
||||
private:
|
||||
void Run();
|
||||
#ifndef USE_EPOLL
|
||||
static gboolean Callback(gpointer data);
|
||||
#endif
|
||||
};
|
||||
|
||||
#endif /* MAIN_NOTIFY_H */
|
||||
|
@ -19,6 +19,89 @@
|
||||
|
||||
#include "config.h"
|
||||
#include "Loop.hxx"
|
||||
#include "system/clock.h"
|
||||
|
||||
#ifdef USE_EPOLL
|
||||
|
||||
#include "TimeoutMonitor.hxx"
|
||||
#include "SocketMonitor.hxx"
|
||||
#include "IdleMonitor.hxx"
|
||||
|
||||
#include <algorithm>
|
||||
|
||||
EventLoop::EventLoop(Default)
|
||||
:SocketMonitor(*this),
|
||||
now_ms(::monotonic_clock_ms()),
|
||||
quit(false),
|
||||
n_events(0)
|
||||
{
|
||||
SocketMonitor::Open(wake_fd.Get());
|
||||
SocketMonitor::Schedule(SocketMonitor::READ);
|
||||
}
|
||||
|
||||
EventLoop::~EventLoop()
|
||||
{
|
||||
assert(idle.empty());
|
||||
assert(timers.empty());
|
||||
|
||||
/* avoid closing the WakeFD twice */
|
||||
SocketMonitor::Steal();
|
||||
}
|
||||
|
||||
void
|
||||
EventLoop::Break()
|
||||
{
|
||||
if (IsInside())
|
||||
quit = true;
|
||||
else
|
||||
AddCall([this]() { Break(); });
|
||||
}
|
||||
|
||||
bool
|
||||
EventLoop::RemoveFD(int _fd, SocketMonitor &m)
|
||||
{
|
||||
for (unsigned i = 0, n = n_events; i < n; ++i)
|
||||
if (events[i].data.ptr == &m)
|
||||
events[i].events = 0;
|
||||
|
||||
return epoll.Remove(_fd);
|
||||
}
|
||||
|
||||
void
|
||||
EventLoop::AddIdle(IdleMonitor &i)
|
||||
{
|
||||
assert(std::find(idle.begin(), idle.end(), &i) == idle.end());
|
||||
|
||||
idle.push_back(&i);
|
||||
}
|
||||
|
||||
void
|
||||
EventLoop::RemoveIdle(IdleMonitor &i)
|
||||
{
|
||||
auto it = std::find(idle.begin(), idle.end(), &i);
|
||||
assert(it != idle.end());
|
||||
|
||||
idle.erase(it);
|
||||
}
|
||||
|
||||
void
|
||||
EventLoop::AddTimer(TimeoutMonitor &t, unsigned ms)
|
||||
{
|
||||
timers.insert(TimerRecord(t, now_ms + ms));
|
||||
}
|
||||
|
||||
void
|
||||
EventLoop::CancelTimer(TimeoutMonitor &t)
|
||||
{
|
||||
for (auto i = timers.begin(), end = timers.end(); i != end; ++i) {
|
||||
if (&i->timer == &t) {
|
||||
timers.erase(i);
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#endif
|
||||
|
||||
void
|
||||
EventLoop::Run()
|
||||
@ -26,11 +109,122 @@ EventLoop::Run()
|
||||
assert(thread.IsNull());
|
||||
thread = ThreadId::GetCurrent();
|
||||
|
||||
#ifdef USE_EPOLL
|
||||
assert(!quit);
|
||||
|
||||
do {
|
||||
now_ms = ::monotonic_clock_ms();
|
||||
|
||||
/* invoke timers */
|
||||
|
||||
int timeout_ms;
|
||||
while (true) {
|
||||
auto i = timers.begin();
|
||||
if (i == timers.end()) {
|
||||
timeout_ms = -1;
|
||||
break;
|
||||
}
|
||||
|
||||
timeout_ms = i->due_ms - now_ms;
|
||||
if (timeout_ms > 0)
|
||||
break;
|
||||
|
||||
TimeoutMonitor &m = i->timer;
|
||||
timers.erase(i);
|
||||
|
||||
m.Run();
|
||||
|
||||
if (quit)
|
||||
return;
|
||||
}
|
||||
|
||||
/* invoke idle */
|
||||
|
||||
const bool idle_empty = idle.empty();
|
||||
while (!idle.empty()) {
|
||||
IdleMonitor &m = *idle.front();
|
||||
idle.pop_front();
|
||||
m.Run();
|
||||
|
||||
if (quit)
|
||||
return;
|
||||
}
|
||||
|
||||
if (!idle_empty)
|
||||
/* re-evaluate timers because one of the
|
||||
IdleMonitors may have added a new
|
||||
timeout */
|
||||
continue;
|
||||
|
||||
/* wait for new event */
|
||||
|
||||
const int n = epoll.Wait(events, MAX_EVENTS, timeout_ms);
|
||||
n_events = std::max(n, 0);
|
||||
|
||||
now_ms = ::monotonic_clock_ms();
|
||||
|
||||
assert(!quit);
|
||||
|
||||
/* invoke sockets */
|
||||
|
||||
for (int i = 0; i < n; ++i) {
|
||||
const auto &e = events[i];
|
||||
|
||||
if (e.events != 0) {
|
||||
SocketMonitor &m = *(SocketMonitor *)e.data.ptr;
|
||||
m.Dispatch(e.events);
|
||||
|
||||
if (quit)
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
n_events = 0;
|
||||
} while (!quit);
|
||||
#else
|
||||
g_main_loop_run(loop);
|
||||
#endif
|
||||
|
||||
assert(thread.IsInside());
|
||||
}
|
||||
|
||||
#ifdef USE_EPOLL
|
||||
|
||||
void
|
||||
EventLoop::AddCall(std::function<void()> &&f)
|
||||
{
|
||||
mutex.lock();
|
||||
calls.push_back(f);
|
||||
mutex.unlock();
|
||||
|
||||
wake_fd.Write();
|
||||
}
|
||||
|
||||
bool
|
||||
EventLoop::OnSocketReady(gcc_unused unsigned flags)
|
||||
{
|
||||
assert(!quit);
|
||||
|
||||
wake_fd.Read();
|
||||
|
||||
mutex.lock();
|
||||
|
||||
while (!calls.empty() && !quit) {
|
||||
auto f = std::move(calls.front());
|
||||
calls.pop_front();
|
||||
|
||||
mutex.unlock();
|
||||
f();
|
||||
mutex.lock();
|
||||
}
|
||||
|
||||
mutex.unlock();
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
#else
|
||||
|
||||
guint
|
||||
EventLoop::AddIdle(GSourceFunc function, gpointer data)
|
||||
{
|
||||
@ -60,3 +254,5 @@ EventLoop::AddTimeoutSeconds(guint interval_s,
|
||||
g_source_attach(source, GetContext());
|
||||
return source;
|
||||
}
|
||||
|
||||
#endif
|
||||
|
@ -24,13 +24,76 @@
|
||||
#include "thread/Id.hxx"
|
||||
#include "gcc.h"
|
||||
|
||||
#ifdef USE_EPOLL
|
||||
#include "system/EPollFD.hxx"
|
||||
#include "thread/Mutex.hxx"
|
||||
#include "WakeFD.hxx"
|
||||
#include "SocketMonitor.hxx"
|
||||
|
||||
#include <functional>
|
||||
#include <list>
|
||||
#include <set>
|
||||
#else
|
||||
#include <glib.h>
|
||||
#endif
|
||||
|
||||
#ifdef USE_EPOLL
|
||||
class TimeoutMonitor;
|
||||
class IdleMonitor;
|
||||
class SocketMonitor;
|
||||
#endif
|
||||
|
||||
#include <assert.h>
|
||||
|
||||
class EventLoop {
|
||||
class EventLoop final
|
||||
#ifdef USE_EPOLL
|
||||
: private SocketMonitor
|
||||
#endif
|
||||
{
|
||||
#ifdef USE_EPOLL
|
||||
struct TimerRecord {
|
||||
/**
|
||||
* Projected monotonic_clock_ms() value when this
|
||||
* timer is due.
|
||||
*/
|
||||
const unsigned due_ms;
|
||||
|
||||
TimeoutMonitor &timer;
|
||||
|
||||
constexpr TimerRecord(TimeoutMonitor &_timer,
|
||||
unsigned _due_ms)
|
||||
:due_ms(_due_ms), timer(_timer) {}
|
||||
|
||||
bool operator<(const TimerRecord &other) const {
|
||||
return due_ms < other.due_ms;
|
||||
}
|
||||
|
||||
bool IsDue(unsigned _now_ms) const {
|
||||
return _now_ms >= due_ms;
|
||||
}
|
||||
};
|
||||
|
||||
EPollFD epoll;
|
||||
|
||||
WakeFD wake_fd;
|
||||
|
||||
std::multiset<TimerRecord> timers;
|
||||
std::list<IdleMonitor *> idle;
|
||||
|
||||
Mutex mutex;
|
||||
std::list<std::function<void()>> calls;
|
||||
|
||||
unsigned now_ms;
|
||||
|
||||
bool quit;
|
||||
|
||||
static constexpr unsigned MAX_EVENTS = 16;
|
||||
unsigned n_events;
|
||||
epoll_event events[MAX_EVENTS];
|
||||
#else
|
||||
GMainContext *context;
|
||||
GMainLoop *loop;
|
||||
#endif
|
||||
|
||||
/**
|
||||
* A reference to the thread that is currently inside Run().
|
||||
@ -38,6 +101,43 @@ class EventLoop {
|
||||
ThreadId thread;
|
||||
|
||||
public:
|
||||
#ifdef USE_EPOLL
|
||||
struct Default {};
|
||||
|
||||
EventLoop(Default dummy=Default());
|
||||
~EventLoop();
|
||||
|
||||
unsigned GetTimeMS() const {
|
||||
return now_ms;
|
||||
}
|
||||
|
||||
void Break();
|
||||
|
||||
bool AddFD(int _fd, unsigned flags, SocketMonitor &m) {
|
||||
return epoll.Add(_fd, flags, &m);
|
||||
}
|
||||
|
||||
bool ModifyFD(int _fd, unsigned flags, SocketMonitor &m) {
|
||||
return epoll.Modify(_fd, flags, &m);
|
||||
}
|
||||
|
||||
bool RemoveFD(int fd, SocketMonitor &m);
|
||||
|
||||
void AddIdle(IdleMonitor &i);
|
||||
void RemoveIdle(IdleMonitor &i);
|
||||
|
||||
void AddTimer(TimeoutMonitor &t, unsigned ms);
|
||||
void CancelTimer(TimeoutMonitor &t);
|
||||
|
||||
void AddCall(std::function<void()> &&f);
|
||||
|
||||
void Run();
|
||||
|
||||
private:
|
||||
virtual bool OnSocketReady(unsigned flags) override;
|
||||
|
||||
public:
|
||||
#else
|
||||
EventLoop()
|
||||
:context(g_main_context_new()),
|
||||
loop(g_main_loop_new(context, false)),
|
||||
@ -54,16 +154,6 @@ public:
|
||||
g_main_context_unref(context);
|
||||
}
|
||||
|
||||
/**
|
||||
* Are we currently running inside this EventLoop's thread?
|
||||
*/
|
||||
gcc_pure
|
||||
bool IsInside() const {
|
||||
assert(!thread.IsNull());
|
||||
|
||||
return thread.IsInside();
|
||||
}
|
||||
|
||||
GMainContext *GetContext() {
|
||||
return context;
|
||||
}
|
||||
@ -85,6 +175,17 @@ public:
|
||||
|
||||
GSource *AddTimeoutSeconds(guint interval_s,
|
||||
GSourceFunc function, gpointer data);
|
||||
#endif
|
||||
|
||||
/**
|
||||
* Are we currently running inside this EventLoop's thread?
|
||||
*/
|
||||
gcc_pure
|
||||
bool IsInside() const {
|
||||
assert(!thread.IsNull());
|
||||
|
||||
return thread.IsInside();
|
||||
}
|
||||
};
|
||||
|
||||
#endif /* MAIN_NOTIFY_H */
|
||||
|
@ -25,6 +25,48 @@
|
||||
|
||||
#include <assert.h>
|
||||
|
||||
#ifdef USE_EPOLL
|
||||
|
||||
MultiSocketMonitor::MultiSocketMonitor(EventLoop &_loop)
|
||||
:IdleMonitor(_loop), TimeoutMonitor(_loop), ready(false) {
|
||||
}
|
||||
|
||||
MultiSocketMonitor::~MultiSocketMonitor()
|
||||
{
|
||||
// TODO
|
||||
}
|
||||
|
||||
void
|
||||
MultiSocketMonitor::Prepare()
|
||||
{
|
||||
int timeout_ms = PrepareSockets();
|
||||
if (timeout_ms >= 0)
|
||||
TimeoutMonitor::Schedule(timeout_ms);
|
||||
else
|
||||
TimeoutMonitor::Cancel();
|
||||
|
||||
}
|
||||
|
||||
void
|
||||
MultiSocketMonitor::OnIdle()
|
||||
{
|
||||
if (ready) {
|
||||
ready = false;
|
||||
DispatchSockets();
|
||||
|
||||
/* TODO: don't refresh always; require users to call
|
||||
InvalidateSockets() */
|
||||
refresh = true;
|
||||
}
|
||||
|
||||
if (refresh) {
|
||||
refresh = false;
|
||||
Prepare();
|
||||
}
|
||||
}
|
||||
|
||||
#else
|
||||
|
||||
/**
|
||||
* The vtable for our GSource implementation. Unfortunately, we
|
||||
* cannot declare it "const", because g_source_new() takes a non-const
|
||||
@ -117,3 +159,5 @@ MultiSocketMonitor::Dispatch(GSource *_source,
|
||||
monitor.Dispatch();
|
||||
return true;
|
||||
}
|
||||
|
||||
#endif
|
||||
|
@ -22,10 +22,17 @@
|
||||
|
||||
#include "check.h"
|
||||
#include "gcc.h"
|
||||
#include "glib_compat.h"
|
||||
|
||||
#ifdef USE_EPOLL
|
||||
#include "IdleMonitor.hxx"
|
||||
#include "TimeoutMonitor.hxx"
|
||||
#include "SocketMonitor.hxx"
|
||||
#else
|
||||
#include "glib_compat.h"
|
||||
#include <glib.h>
|
||||
|
||||
#endif
|
||||
|
||||
#include <forward_list>
|
||||
|
||||
#include <assert.h>
|
||||
@ -44,7 +51,57 @@ class EventLoop;
|
||||
/**
|
||||
* Monitor multiple sockets.
|
||||
*/
|
||||
class MultiSocketMonitor {
|
||||
class MultiSocketMonitor
|
||||
#ifdef USE_EPOLL
|
||||
: private IdleMonitor, private TimeoutMonitor
|
||||
#endif
|
||||
{
|
||||
#ifdef USE_EPOLL
|
||||
class SingleFD final : public SocketMonitor {
|
||||
MultiSocketMonitor &multi;
|
||||
|
||||
unsigned revents;
|
||||
|
||||
public:
|
||||
SingleFD(MultiSocketMonitor &_multi, int _fd, unsigned events)
|
||||
:SocketMonitor(_fd, _multi.GetEventLoop()),
|
||||
multi(_multi), revents(0) {
|
||||
Schedule(events);
|
||||
}
|
||||
|
||||
int GetFD() const {
|
||||
return SocketMonitor::Get();
|
||||
}
|
||||
|
||||
unsigned GetEvents() const {
|
||||
return SocketMonitor::GetScheduledFlags();
|
||||
}
|
||||
|
||||
void SetEvents(unsigned _events) {
|
||||
revents &= _events;
|
||||
SocketMonitor::Schedule(_events);
|
||||
}
|
||||
|
||||
unsigned GetReturnedEvents() const {
|
||||
return revents;
|
||||
}
|
||||
|
||||
void ClearReturnedEvents() {
|
||||
revents = 0;
|
||||
}
|
||||
|
||||
protected:
|
||||
virtual bool OnSocketReady(unsigned flags) override {
|
||||
revents = flags;
|
||||
multi.SetReady();
|
||||
return true;
|
||||
}
|
||||
};
|
||||
|
||||
friend class SingleFD;
|
||||
|
||||
bool ready, refresh;
|
||||
#else
|
||||
struct Source {
|
||||
GSource base;
|
||||
|
||||
@ -78,34 +135,57 @@ class MultiSocketMonitor {
|
||||
EventLoop &loop;
|
||||
Source *source;
|
||||
uint64_t absolute_timeout_us;
|
||||
#endif
|
||||
|
||||
std::forward_list<SingleFD> fds;
|
||||
|
||||
public:
|
||||
#ifdef USE_EPOLL
|
||||
static constexpr unsigned READ = SocketMonitor::READ;
|
||||
static constexpr unsigned WRITE = SocketMonitor::WRITE;
|
||||
static constexpr unsigned ERROR = SocketMonitor::ERROR;
|
||||
static constexpr unsigned HANGUP = SocketMonitor::HANGUP;
|
||||
#else
|
||||
static constexpr unsigned READ = G_IO_IN;
|
||||
static constexpr unsigned WRITE = G_IO_OUT;
|
||||
static constexpr unsigned ERROR = G_IO_ERR;
|
||||
static constexpr unsigned HANGUP = G_IO_HUP;
|
||||
#endif
|
||||
|
||||
MultiSocketMonitor(EventLoop &_loop);
|
||||
~MultiSocketMonitor();
|
||||
|
||||
#ifdef USE_EPOLL
|
||||
using IdleMonitor::GetEventLoop;
|
||||
#else
|
||||
EventLoop &GetEventLoop() {
|
||||
return loop;
|
||||
}
|
||||
#endif
|
||||
|
||||
public:
|
||||
#ifndef USE_EPOLL
|
||||
gcc_pure
|
||||
uint64_t GetTime() const {
|
||||
return g_source_get_time(&source->base);
|
||||
}
|
||||
#endif
|
||||
|
||||
void InvalidateSockets() {
|
||||
#ifdef USE_EPOLL
|
||||
refresh = true;
|
||||
IdleMonitor::Schedule();
|
||||
#else
|
||||
/* no-op because GLib always calls the GSource's
|
||||
"prepare" method before each poll() anyway */
|
||||
#endif
|
||||
}
|
||||
|
||||
void AddSocket(int fd, unsigned events) {
|
||||
fds.emplace_front(*this, fd, events);
|
||||
#ifndef USE_EPOLL
|
||||
g_source_add_poll(&source->base, &fds.front().pfd);
|
||||
#endif
|
||||
}
|
||||
|
||||
template<typename E>
|
||||
@ -120,7 +200,11 @@ public:
|
||||
i->SetEvents(events);
|
||||
prev = i;
|
||||
} else {
|
||||
#ifdef USE_EPOLL
|
||||
i->Steal();
|
||||
#else
|
||||
g_source_remove_poll(&source->base, &i->pfd);
|
||||
#endif
|
||||
fds.erase_after(prev);
|
||||
}
|
||||
}
|
||||
@ -133,6 +217,23 @@ protected:
|
||||
virtual int PrepareSockets() = 0;
|
||||
virtual void DispatchSockets() = 0;
|
||||
|
||||
#ifdef USE_EPOLL
|
||||
private:
|
||||
void SetReady() {
|
||||
ready = true;
|
||||
IdleMonitor::Schedule();
|
||||
}
|
||||
|
||||
void Prepare();
|
||||
|
||||
virtual void OnTimeout() final {
|
||||
SetReady();
|
||||
IdleMonitor::Schedule();
|
||||
}
|
||||
|
||||
virtual void OnIdle() final;
|
||||
|
||||
#else
|
||||
public:
|
||||
/* GSource callbacks */
|
||||
static gboolean Prepare(GSource *source, gint *timeout_r);
|
||||
@ -147,6 +248,7 @@ private:
|
||||
void Dispatch() {
|
||||
DispatchSockets();
|
||||
}
|
||||
#endif
|
||||
};
|
||||
|
||||
#endif
|
||||
|
@ -32,6 +32,19 @@
|
||||
#include <sys/socket.h>
|
||||
#endif
|
||||
|
||||
#ifdef USE_EPOLL
|
||||
|
||||
void
|
||||
SocketMonitor::Dispatch(unsigned flags)
|
||||
{
|
||||
flags &= GetScheduledFlags();
|
||||
|
||||
if (flags != 0 && !OnSocketReady(flags) && IsDefined())
|
||||
Cancel();
|
||||
}
|
||||
|
||||
#else
|
||||
|
||||
/*
|
||||
* GSource methods
|
||||
*
|
||||
@ -88,6 +101,8 @@ SocketMonitor::SocketMonitor(int _fd, EventLoop &_loop)
|
||||
Open(_fd);
|
||||
}
|
||||
|
||||
#endif
|
||||
|
||||
SocketMonitor::~SocketMonitor()
|
||||
{
|
||||
if (IsDefined())
|
||||
@ -98,10 +113,14 @@ void
|
||||
SocketMonitor::Open(int _fd)
|
||||
{
|
||||
assert(fd < 0);
|
||||
#ifndef USE_EPOLL
|
||||
assert(source == nullptr);
|
||||
#endif
|
||||
assert(_fd >= 0);
|
||||
|
||||
fd = _fd;
|
||||
|
||||
#ifndef USE_EPOLL
|
||||
poll = {fd, 0, 0};
|
||||
|
||||
source = (Source *)g_source_new(&socket_monitor_source_funcs,
|
||||
@ -110,6 +129,7 @@ SocketMonitor::Open(int _fd)
|
||||
|
||||
g_source_attach(&source->base, loop.GetContext());
|
||||
g_source_add_poll(&source->base, &poll);
|
||||
#endif
|
||||
}
|
||||
|
||||
int
|
||||
@ -122,9 +142,11 @@ SocketMonitor::Steal()
|
||||
int result = fd;
|
||||
fd = -1;
|
||||
|
||||
#ifndef USE_EPOLL
|
||||
g_source_destroy(&source->base);
|
||||
g_source_unref(&source->base);
|
||||
source = nullptr;
|
||||
#endif
|
||||
|
||||
return result;
|
||||
}
|
||||
@ -143,10 +165,21 @@ SocketMonitor::Schedule(unsigned flags)
|
||||
if (flags == GetScheduledFlags())
|
||||
return;
|
||||
|
||||
#ifdef USE_EPOLL
|
||||
if (scheduled_flags == 0)
|
||||
loop.AddFD(fd, flags, *this);
|
||||
else if (flags == 0)
|
||||
loop.RemoveFD(fd, *this);
|
||||
else
|
||||
loop.ModifyFD(fd, flags, *this);
|
||||
|
||||
scheduled_flags = flags;
|
||||
#else
|
||||
poll.events = flags;
|
||||
poll.revents &= flags;
|
||||
|
||||
loop.WakeUp();
|
||||
#endif
|
||||
}
|
||||
|
||||
SocketMonitor::ssize_t
|
||||
|
@ -22,7 +22,11 @@
|
||||
|
||||
#include "check.h"
|
||||
|
||||
#ifdef USE_EPOLL
|
||||
#include <sys/epoll.h>
|
||||
#else
|
||||
#include <glib.h>
|
||||
#endif
|
||||
|
||||
#include <type_traits>
|
||||
|
||||
@ -40,29 +44,55 @@
|
||||
class EventLoop;
|
||||
|
||||
class SocketMonitor {
|
||||
#ifdef USE_EPOLL
|
||||
#else
|
||||
struct Source {
|
||||
GSource base;
|
||||
|
||||
SocketMonitor *monitor;
|
||||
};
|
||||
#endif
|
||||
|
||||
int fd;
|
||||
EventLoop &loop;
|
||||
|
||||
#ifdef USE_EPOLL
|
||||
/**
|
||||
* A bit mask of events that is currently registered in the EventLoop.
|
||||
*/
|
||||
unsigned scheduled_flags;
|
||||
#else
|
||||
Source *source;
|
||||
GPollFD poll;
|
||||
#endif
|
||||
|
||||
public:
|
||||
#ifdef USE_EPOLL
|
||||
static constexpr unsigned READ = EPOLLIN;
|
||||
static constexpr unsigned WRITE = EPOLLOUT;
|
||||
static constexpr unsigned ERROR = EPOLLERR;
|
||||
static constexpr unsigned HANGUP = EPOLLHUP;
|
||||
#else
|
||||
static constexpr unsigned READ = G_IO_IN;
|
||||
static constexpr unsigned WRITE = G_IO_OUT;
|
||||
static constexpr unsigned ERROR = G_IO_ERR;
|
||||
static constexpr unsigned HANGUP = G_IO_HUP;
|
||||
#endif
|
||||
|
||||
typedef std::make_signed<size_t>::type ssize_t;
|
||||
|
||||
#ifdef USE_EPOLL
|
||||
SocketMonitor(EventLoop &_loop)
|
||||
:fd(-1), loop(_loop), scheduled_flags(0) {}
|
||||
|
||||
SocketMonitor(int _fd, EventLoop &_loop)
|
||||
:fd(_fd), loop(_loop), scheduled_flags(0) {}
|
||||
#else
|
||||
SocketMonitor(EventLoop &_loop)
|
||||
:fd(-1), loop(_loop), source(nullptr) {}
|
||||
|
||||
SocketMonitor(int _fd, EventLoop &_loop);
|
||||
#endif
|
||||
|
||||
~SocketMonitor();
|
||||
|
||||
@ -93,7 +123,11 @@ public:
|
||||
unsigned GetScheduledFlags() const {
|
||||
assert(IsDefined());
|
||||
|
||||
#ifdef USE_EPOLL
|
||||
return scheduled_flags;
|
||||
#else
|
||||
return poll.events;
|
||||
#endif
|
||||
}
|
||||
|
||||
void Schedule(unsigned flags);
|
||||
@ -128,6 +162,9 @@ protected:
|
||||
virtual bool OnSocketReady(unsigned flags) = 0;
|
||||
|
||||
public:
|
||||
#ifdef USE_EPOLL
|
||||
void Dispatch(unsigned flags);
|
||||
#else
|
||||
/* GSource callbacks */
|
||||
static gboolean Prepare(GSource *source, gint *timeout_r);
|
||||
static gboolean Check(GSource *source);
|
||||
@ -146,6 +183,7 @@ private:
|
||||
|
||||
OnSocketReady(poll.revents & poll.events);
|
||||
}
|
||||
#endif
|
||||
};
|
||||
|
||||
#endif
|
||||
|
@ -24,10 +24,15 @@
|
||||
void
|
||||
TimeoutMonitor::Cancel()
|
||||
{
|
||||
if (source != nullptr) {
|
||||
if (IsActive()) {
|
||||
#ifdef USE_EPOLL
|
||||
active = false;
|
||||
loop.CancelTimer(*this);
|
||||
#else
|
||||
g_source_destroy(source);
|
||||
g_source_unref(source);
|
||||
source = nullptr;
|
||||
#endif
|
||||
}
|
||||
}
|
||||
|
||||
@ -35,23 +40,39 @@ void
|
||||
TimeoutMonitor::Schedule(unsigned ms)
|
||||
{
|
||||
Cancel();
|
||||
|
||||
#ifdef USE_EPOLL
|
||||
active = true;
|
||||
loop.AddTimer(*this, ms);
|
||||
#else
|
||||
source = loop.AddTimeout(ms, Callback, this);
|
||||
#endif
|
||||
}
|
||||
|
||||
void
|
||||
TimeoutMonitor::ScheduleSeconds(unsigned s)
|
||||
{
|
||||
Cancel();
|
||||
|
||||
#ifdef USE_EPOLL
|
||||
Schedule(s * 1000u);
|
||||
#else
|
||||
source = loop.AddTimeoutSeconds(s, Callback, this);
|
||||
#endif
|
||||
}
|
||||
|
||||
void
|
||||
TimeoutMonitor::Run()
|
||||
{
|
||||
#ifndef USE_EPOLL
|
||||
Cancel();
|
||||
#endif
|
||||
|
||||
OnTimeout();
|
||||
}
|
||||
|
||||
#ifndef USE_EPOLL
|
||||
|
||||
gboolean
|
||||
TimeoutMonitor::Callback(gpointer data)
|
||||
{
|
||||
@ -59,3 +80,5 @@ TimeoutMonitor::Callback(gpointer data)
|
||||
monitor.Run();
|
||||
return false;
|
||||
}
|
||||
|
||||
#endif
|
||||
|
@ -22,17 +22,34 @@
|
||||
|
||||
#include "check.h"
|
||||
|
||||
#ifndef USE_EPOLL
|
||||
#include <glib.h>
|
||||
#endif
|
||||
|
||||
class EventLoop;
|
||||
|
||||
class TimeoutMonitor {
|
||||
#ifdef USE_EPOLL
|
||||
friend class EventLoop;
|
||||
#endif
|
||||
|
||||
EventLoop &loop;
|
||||
|
||||
#ifdef USE_EPOLL
|
||||
bool active;
|
||||
#else
|
||||
GSource *source;
|
||||
#endif
|
||||
|
||||
public:
|
||||
#ifdef USE_EPOLL
|
||||
TimeoutMonitor(EventLoop &_loop)
|
||||
:loop(_loop), active(false) {
|
||||
}
|
||||
#else
|
||||
TimeoutMonitor(EventLoop &_loop)
|
||||
:loop(_loop), source(nullptr) {}
|
||||
#endif
|
||||
|
||||
~TimeoutMonitor() {
|
||||
Cancel();
|
||||
@ -43,7 +60,11 @@ public:
|
||||
}
|
||||
|
||||
bool IsActive() const {
|
||||
#ifdef USE_EPOLL
|
||||
return active;
|
||||
#else
|
||||
return source != nullptr;
|
||||
#endif
|
||||
}
|
||||
|
||||
void Schedule(unsigned ms);
|
||||
@ -55,7 +76,10 @@ protected:
|
||||
|
||||
private:
|
||||
void Run();
|
||||
|
||||
#ifndef USE_EPOLL
|
||||
static gboolean Callback(gpointer data);
|
||||
#endif
|
||||
};
|
||||
|
||||
#endif /* MAIN_NOTIFY_H */
|
||||
|
Loading…
Reference in New Issue
Block a user