EventLoop: new implementation using epoll

Implement an event loop without GLib.
This commit is contained in:
Max Kellermann 2013-08-07 22:16:59 +02:00
parent 342333f72a
commit c1f4f1fdb6
14 changed files with 716 additions and 16 deletions

View File

@ -41,7 +41,9 @@
static char *avahiName; static char *avahiName;
static int avahiRunning; static int avahiRunning;
#ifndef USE_EPOLL
static AvahiGLibPoll *avahi_glib_poll; static AvahiGLibPoll *avahi_glib_poll;
#endif
static const AvahiPoll *avahi_poll; static const AvahiPoll *avahi_poll;
static AvahiClient *avahiClient; static AvahiClient *avahiClient;
static AvahiEntryGroup *avahiGroup; static AvahiEntryGroup *avahiGroup;
@ -229,9 +231,14 @@ AvahiInit(EventLoop &loop, const char *serviceName)
avahiRunning = 1; avahiRunning = 1;
#ifdef USE_EPOLL
// TODO
(void)loop;
#else
avahi_glib_poll = avahi_glib_poll_new(loop.GetContext(), avahi_glib_poll = avahi_glib_poll_new(loop.GetContext(),
G_PRIORITY_DEFAULT); G_PRIORITY_DEFAULT);
avahi_poll = avahi_glib_poll_get(avahi_glib_poll); avahi_poll = avahi_glib_poll_get(avahi_glib_poll);
#endif
avahiClient = avahi_client_new(avahi_poll, AVAHI_CLIENT_NO_FAIL, avahiClient = avahi_client_new(avahi_poll, AVAHI_CLIENT_NO_FAIL,
avahiClientCallback, NULL, &error); avahiClientCallback, NULL, &error);
@ -258,10 +265,14 @@ AvahiDeinit(void)
avahiClient = NULL; avahiClient = NULL;
} }
#ifdef USE_EPOLL
// TODO
#else
if (avahi_glib_poll != NULL) { if (avahi_glib_poll != NULL) {
avahi_glib_poll_free(avahi_glib_poll); avahi_glib_poll_free(avahi_glib_poll);
avahi_glib_poll = NULL; avahi_glib_poll = NULL;
} }
#endif
avahi_free(avahiName); avahi_free(avahiName);
avahiName = NULL; avahiName = NULL;

View File

@ -27,7 +27,11 @@
#include <assert.h> #include <assert.h>
class BlockingCallMonitor final : DeferredMonitor { class BlockingCallMonitor final
#ifndef USE_EPOLL
: DeferredMonitor
#endif
{
const std::function<void()> f; const std::function<void()> f;
Mutex mutex; Mutex mutex;
@ -36,13 +40,24 @@ class BlockingCallMonitor final : DeferredMonitor {
bool done; bool done;
public: public:
#ifdef USE_EPOLL
BlockingCallMonitor(EventLoop &loop, std::function<void()> &&_f)
:f(std::move(_f)), done(false) {
loop.AddCall([this](){
this->DoRun();
});
}
#else
BlockingCallMonitor(EventLoop &_loop, std::function<void()> &&_f) BlockingCallMonitor(EventLoop &_loop, std::function<void()> &&_f)
:DeferredMonitor(_loop), f(std::move(_f)), done(false) {} :DeferredMonitor(_loop), f(std::move(_f)), done(false) {}
#endif
void Run() { void Run() {
#ifndef USE_EPOLL
assert(!done); assert(!done);
Schedule(); Schedule();
#endif
mutex.lock(); mutex.lock();
while (!done) while (!done)
@ -50,8 +65,18 @@ public:
mutex.unlock(); mutex.unlock();
} }
#ifndef USE_EPOLL
private: private:
virtual void RunDeferred() override { virtual void RunDeferred() override {
DoRun();
}
#else
public:
#endif
void DoRun() {
assert(!done);
f(); f();
mutex.lock(); mutex.lock();

View File

@ -24,20 +24,44 @@
void void
DeferredMonitor::Cancel() DeferredMonitor::Cancel()
{ {
#ifdef USE_EPOLL
pending = false;
#else
const auto id = source_id.exchange(0); const auto id = source_id.exchange(0);
if (id != 0) if (id != 0)
g_source_remove(id); g_source_remove(id);
#endif
} }
void void
DeferredMonitor::Schedule() DeferredMonitor::Schedule()
{ {
#ifdef USE_EPOLL
if (!pending.exchange(true))
fd.Write();
#else
const unsigned id = loop.AddIdle(Callback, this); const unsigned id = loop.AddIdle(Callback, this);
const auto old_id = source_id.exchange(id); const auto old_id = source_id.exchange(id);
if (old_id != 0) if (old_id != 0)
g_source_remove(old_id); g_source_remove(old_id);
#endif
} }
#ifdef USE_EPOLL
bool
DeferredMonitor::OnSocketReady(unsigned)
{
fd.Read();
if (pending.exchange(false))
RunDeferred();
return true;
}
#else
void void
DeferredMonitor::Run() DeferredMonitor::Run()
{ {
@ -53,3 +77,5 @@ DeferredMonitor::Callback(gpointer data)
monitor.Run(); monitor.Run();
return false; return false;
} }
#endif

View File

@ -21,8 +21,14 @@
#define MPD_SOCKET_DEFERRED_MONITOR_HXX #define MPD_SOCKET_DEFERRED_MONITOR_HXX
#include "check.h" #include "check.h"
#include "gcc.h"
#ifdef USE_EPOLL
#include "SocketMonitor.hxx"
#include "WakeFD.hxx"
#else
#include <glib.h> #include <glib.h>
#endif
#include <atomic> #include <atomic>
@ -31,21 +37,47 @@ class EventLoop;
/** /**
* Defer execution of an event into an #EventLoop. * Defer execution of an event into an #EventLoop.
*/ */
class DeferredMonitor { class DeferredMonitor
#ifdef USE_EPOLL
: private SocketMonitor
#endif
{
#ifdef USE_EPOLL
std::atomic_bool pending;
WakeFD fd;
#else
EventLoop &loop; EventLoop &loop;
std::atomic<guint> source_id; std::atomic<guint> source_id;
#endif
public: public:
#ifdef USE_EPOLL
DeferredMonitor(EventLoop &_loop)
:SocketMonitor(_loop), pending(false) {
SocketMonitor::Open(fd.Get());
SocketMonitor::Schedule(SocketMonitor::READ);
}
#else
DeferredMonitor(EventLoop &_loop) DeferredMonitor(EventLoop &_loop)
:loop(_loop), source_id(0) {} :loop(_loop), source_id(0) {}
#endif
~DeferredMonitor() { ~DeferredMonitor() {
#ifdef USE_EPOLL
/* avoid closing the WakeFD twice */
SocketMonitor::Steal();
#else
Cancel(); Cancel();
#endif
} }
EventLoop &GetEventLoop() { EventLoop &GetEventLoop() {
#ifdef USE_EPOLL
return SocketMonitor::GetEventLoop();
#else
return loop; return loop;
#endif
} }
void Schedule(); void Schedule();
@ -55,8 +87,12 @@ protected:
virtual void RunDeferred() = 0; virtual void RunDeferred() = 0;
private: private:
#ifdef USE_EPOLL
virtual bool OnSocketReady(unsigned flags) override final;
#else
void Run(); void Run();
static gboolean Callback(gpointer data); static gboolean Callback(gpointer data);
#endif
}; };
#endif /* MAIN_NOTIFY_H */ #endif /* MAIN_NOTIFY_H */

View File

@ -29,8 +29,13 @@ IdleMonitor::Cancel()
if (!IsActive()) if (!IsActive())
return; return;
#ifdef USE_EPOLL
active = false;
loop.RemoveIdle(*this);
#else
g_source_remove(source_id); g_source_remove(source_id);
source_id = 0; source_id = 0;
#endif
} }
void void
@ -42,19 +47,32 @@ IdleMonitor::Schedule()
/* already scheduled */ /* already scheduled */
return; return;
#ifdef USE_EPOLL
active = true;
loop.AddIdle(*this);
#else
source_id = loop.AddIdle(Callback, this); source_id = loop.AddIdle(Callback, this);
#endif
} }
void void
IdleMonitor::Run() IdleMonitor::Run()
{ {
assert(loop.IsInside()); assert(loop.IsInside());
#ifdef USE_EPOLL
assert(active);
active = false;
#else
assert(source_id != 0); assert(source_id != 0);
source_id = 0; source_id = 0;
#endif
OnIdle(); OnIdle();
} }
#ifndef USE_EPOLL
gboolean gboolean
IdleMonitor::Callback(gpointer data) IdleMonitor::Callback(gpointer data)
{ {
@ -62,3 +80,5 @@ IdleMonitor::Callback(gpointer data)
monitor.Run(); monitor.Run();
return false; return false;
} }
#endif

View File

@ -22,7 +22,9 @@
#include "check.h" #include "check.h"
#ifndef USE_EPOLL
#include <glib.h> #include <glib.h>
#endif
class EventLoop; class EventLoop;
@ -32,13 +34,26 @@ class EventLoop;
* methods must be run from EventLoop's thread. * methods must be run from EventLoop's thread.
*/ */
class IdleMonitor { class IdleMonitor {
#ifdef USE_EPOLL
friend class EventLoop;
#endif
EventLoop &loop; EventLoop &loop;
#ifdef USE_EPOLL
bool active;
#else
guint source_id; guint source_id;
#endif
public: public:
#ifdef USE_EPOLL
IdleMonitor(EventLoop &_loop)
:loop(_loop), active(false) {}
#else
IdleMonitor(EventLoop &_loop) IdleMonitor(EventLoop &_loop)
:loop(_loop), source_id(0) {} :loop(_loop), source_id(0) {}
#endif
~IdleMonitor() { ~IdleMonitor() {
Cancel(); Cancel();
@ -49,7 +64,11 @@ public:
} }
bool IsActive() const { bool IsActive() const {
#ifdef USE_EPOLL
return active;
#else
return source_id != 0; return source_id != 0;
#endif
} }
void Schedule(); void Schedule();
@ -60,7 +79,9 @@ protected:
private: private:
void Run(); void Run();
#ifndef USE_EPOLL
static gboolean Callback(gpointer data); static gboolean Callback(gpointer data);
#endif
}; };
#endif /* MAIN_NOTIFY_H */ #endif /* MAIN_NOTIFY_H */

View File

@ -19,6 +19,89 @@
#include "config.h" #include "config.h"
#include "Loop.hxx" #include "Loop.hxx"
#include "system/clock.h"
#ifdef USE_EPOLL
#include "TimeoutMonitor.hxx"
#include "SocketMonitor.hxx"
#include "IdleMonitor.hxx"
#include <algorithm>
EventLoop::EventLoop(Default)
:SocketMonitor(*this),
now_ms(::monotonic_clock_ms()),
quit(false),
n_events(0)
{
SocketMonitor::Open(wake_fd.Get());
SocketMonitor::Schedule(SocketMonitor::READ);
}
EventLoop::~EventLoop()
{
assert(idle.empty());
assert(timers.empty());
/* avoid closing the WakeFD twice */
SocketMonitor::Steal();
}
void
EventLoop::Break()
{
if (IsInside())
quit = true;
else
AddCall([this]() { Break(); });
}
bool
EventLoop::RemoveFD(int _fd, SocketMonitor &m)
{
for (unsigned i = 0, n = n_events; i < n; ++i)
if (events[i].data.ptr == &m)
events[i].events = 0;
return epoll.Remove(_fd);
}
void
EventLoop::AddIdle(IdleMonitor &i)
{
assert(std::find(idle.begin(), idle.end(), &i) == idle.end());
idle.push_back(&i);
}
void
EventLoop::RemoveIdle(IdleMonitor &i)
{
auto it = std::find(idle.begin(), idle.end(), &i);
assert(it != idle.end());
idle.erase(it);
}
void
EventLoop::AddTimer(TimeoutMonitor &t, unsigned ms)
{
timers.insert(TimerRecord(t, now_ms + ms));
}
void
EventLoop::CancelTimer(TimeoutMonitor &t)
{
for (auto i = timers.begin(), end = timers.end(); i != end; ++i) {
if (&i->timer == &t) {
timers.erase(i);
return;
}
}
}
#endif
void void
EventLoop::Run() EventLoop::Run()
@ -26,11 +109,122 @@ EventLoop::Run()
assert(thread.IsNull()); assert(thread.IsNull());
thread = ThreadId::GetCurrent(); thread = ThreadId::GetCurrent();
#ifdef USE_EPOLL
assert(!quit);
do {
now_ms = ::monotonic_clock_ms();
/* invoke timers */
int timeout_ms;
while (true) {
auto i = timers.begin();
if (i == timers.end()) {
timeout_ms = -1;
break;
}
timeout_ms = i->due_ms - now_ms;
if (timeout_ms > 0)
break;
TimeoutMonitor &m = i->timer;
timers.erase(i);
m.Run();
if (quit)
return;
}
/* invoke idle */
const bool idle_empty = idle.empty();
while (!idle.empty()) {
IdleMonitor &m = *idle.front();
idle.pop_front();
m.Run();
if (quit)
return;
}
if (!idle_empty)
/* re-evaluate timers because one of the
IdleMonitors may have added a new
timeout */
continue;
/* wait for new event */
const int n = epoll.Wait(events, MAX_EVENTS, timeout_ms);
n_events = std::max(n, 0);
now_ms = ::monotonic_clock_ms();
assert(!quit);
/* invoke sockets */
for (int i = 0; i < n; ++i) {
const auto &e = events[i];
if (e.events != 0) {
SocketMonitor &m = *(SocketMonitor *)e.data.ptr;
m.Dispatch(e.events);
if (quit)
break;
}
}
n_events = 0;
} while (!quit);
#else
g_main_loop_run(loop); g_main_loop_run(loop);
#endif
assert(thread.IsInside()); assert(thread.IsInside());
} }
#ifdef USE_EPOLL
void
EventLoop::AddCall(std::function<void()> &&f)
{
mutex.lock();
calls.push_back(f);
mutex.unlock();
wake_fd.Write();
}
bool
EventLoop::OnSocketReady(gcc_unused unsigned flags)
{
assert(!quit);
wake_fd.Read();
mutex.lock();
while (!calls.empty() && !quit) {
auto f = std::move(calls.front());
calls.pop_front();
mutex.unlock();
f();
mutex.lock();
}
mutex.unlock();
return true;
}
#else
guint guint
EventLoop::AddIdle(GSourceFunc function, gpointer data) EventLoop::AddIdle(GSourceFunc function, gpointer data)
{ {
@ -60,3 +254,5 @@ EventLoop::AddTimeoutSeconds(guint interval_s,
g_source_attach(source, GetContext()); g_source_attach(source, GetContext());
return source; return source;
} }
#endif

View File

@ -24,13 +24,76 @@
#include "thread/Id.hxx" #include "thread/Id.hxx"
#include "gcc.h" #include "gcc.h"
#ifdef USE_EPOLL
#include "system/EPollFD.hxx"
#include "thread/Mutex.hxx"
#include "WakeFD.hxx"
#include "SocketMonitor.hxx"
#include <functional>
#include <list>
#include <set>
#else
#include <glib.h> #include <glib.h>
#endif
#ifdef USE_EPOLL
class TimeoutMonitor;
class IdleMonitor;
class SocketMonitor;
#endif
#include <assert.h> #include <assert.h>
class EventLoop { class EventLoop final
#ifdef USE_EPOLL
: private SocketMonitor
#endif
{
#ifdef USE_EPOLL
struct TimerRecord {
/**
* Projected monotonic_clock_ms() value when this
* timer is due.
*/
const unsigned due_ms;
TimeoutMonitor &timer;
constexpr TimerRecord(TimeoutMonitor &_timer,
unsigned _due_ms)
:due_ms(_due_ms), timer(_timer) {}
bool operator<(const TimerRecord &other) const {
return due_ms < other.due_ms;
}
bool IsDue(unsigned _now_ms) const {
return _now_ms >= due_ms;
}
};
EPollFD epoll;
WakeFD wake_fd;
std::multiset<TimerRecord> timers;
std::list<IdleMonitor *> idle;
Mutex mutex;
std::list<std::function<void()>> calls;
unsigned now_ms;
bool quit;
static constexpr unsigned MAX_EVENTS = 16;
unsigned n_events;
epoll_event events[MAX_EVENTS];
#else
GMainContext *context; GMainContext *context;
GMainLoop *loop; GMainLoop *loop;
#endif
/** /**
* A reference to the thread that is currently inside Run(). * A reference to the thread that is currently inside Run().
@ -38,6 +101,43 @@ class EventLoop {
ThreadId thread; ThreadId thread;
public: public:
#ifdef USE_EPOLL
struct Default {};
EventLoop(Default dummy=Default());
~EventLoop();
unsigned GetTimeMS() const {
return now_ms;
}
void Break();
bool AddFD(int _fd, unsigned flags, SocketMonitor &m) {
return epoll.Add(_fd, flags, &m);
}
bool ModifyFD(int _fd, unsigned flags, SocketMonitor &m) {
return epoll.Modify(_fd, flags, &m);
}
bool RemoveFD(int fd, SocketMonitor &m);
void AddIdle(IdleMonitor &i);
void RemoveIdle(IdleMonitor &i);
void AddTimer(TimeoutMonitor &t, unsigned ms);
void CancelTimer(TimeoutMonitor &t);
void AddCall(std::function<void()> &&f);
void Run();
private:
virtual bool OnSocketReady(unsigned flags) override;
public:
#else
EventLoop() EventLoop()
:context(g_main_context_new()), :context(g_main_context_new()),
loop(g_main_loop_new(context, false)), loop(g_main_loop_new(context, false)),
@ -54,16 +154,6 @@ public:
g_main_context_unref(context); g_main_context_unref(context);
} }
/**
* Are we currently running inside this EventLoop's thread?
*/
gcc_pure
bool IsInside() const {
assert(!thread.IsNull());
return thread.IsInside();
}
GMainContext *GetContext() { GMainContext *GetContext() {
return context; return context;
} }
@ -85,6 +175,17 @@ public:
GSource *AddTimeoutSeconds(guint interval_s, GSource *AddTimeoutSeconds(guint interval_s,
GSourceFunc function, gpointer data); GSourceFunc function, gpointer data);
#endif
/**
* Are we currently running inside this EventLoop's thread?
*/
gcc_pure
bool IsInside() const {
assert(!thread.IsNull());
return thread.IsInside();
}
}; };
#endif /* MAIN_NOTIFY_H */ #endif /* MAIN_NOTIFY_H */

View File

@ -25,6 +25,48 @@
#include <assert.h> #include <assert.h>
#ifdef USE_EPOLL
MultiSocketMonitor::MultiSocketMonitor(EventLoop &_loop)
:IdleMonitor(_loop), TimeoutMonitor(_loop), ready(false) {
}
MultiSocketMonitor::~MultiSocketMonitor()
{
// TODO
}
void
MultiSocketMonitor::Prepare()
{
int timeout_ms = PrepareSockets();
if (timeout_ms >= 0)
TimeoutMonitor::Schedule(timeout_ms);
else
TimeoutMonitor::Cancel();
}
void
MultiSocketMonitor::OnIdle()
{
if (ready) {
ready = false;
DispatchSockets();
/* TODO: don't refresh always; require users to call
InvalidateSockets() */
refresh = true;
}
if (refresh) {
refresh = false;
Prepare();
}
}
#else
/** /**
* The vtable for our GSource implementation. Unfortunately, we * The vtable for our GSource implementation. Unfortunately, we
* cannot declare it "const", because g_source_new() takes a non-const * cannot declare it "const", because g_source_new() takes a non-const
@ -117,3 +159,5 @@ MultiSocketMonitor::Dispatch(GSource *_source,
monitor.Dispatch(); monitor.Dispatch();
return true; return true;
} }
#endif

View File

@ -22,10 +22,17 @@
#include "check.h" #include "check.h"
#include "gcc.h" #include "gcc.h"
#include "glib_compat.h"
#ifdef USE_EPOLL
#include "IdleMonitor.hxx"
#include "TimeoutMonitor.hxx"
#include "SocketMonitor.hxx"
#else
#include "glib_compat.h"
#include <glib.h> #include <glib.h>
#endif
#include <forward_list> #include <forward_list>
#include <assert.h> #include <assert.h>
@ -44,7 +51,57 @@ class EventLoop;
/** /**
* Monitor multiple sockets. * Monitor multiple sockets.
*/ */
class MultiSocketMonitor { class MultiSocketMonitor
#ifdef USE_EPOLL
: private IdleMonitor, private TimeoutMonitor
#endif
{
#ifdef USE_EPOLL
class SingleFD final : public SocketMonitor {
MultiSocketMonitor &multi;
unsigned revents;
public:
SingleFD(MultiSocketMonitor &_multi, int _fd, unsigned events)
:SocketMonitor(_fd, _multi.GetEventLoop()),
multi(_multi), revents(0) {
Schedule(events);
}
int GetFD() const {
return SocketMonitor::Get();
}
unsigned GetEvents() const {
return SocketMonitor::GetScheduledFlags();
}
void SetEvents(unsigned _events) {
revents &= _events;
SocketMonitor::Schedule(_events);
}
unsigned GetReturnedEvents() const {
return revents;
}
void ClearReturnedEvents() {
revents = 0;
}
protected:
virtual bool OnSocketReady(unsigned flags) override {
revents = flags;
multi.SetReady();
return true;
}
};
friend class SingleFD;
bool ready, refresh;
#else
struct Source { struct Source {
GSource base; GSource base;
@ -78,34 +135,57 @@ class MultiSocketMonitor {
EventLoop &loop; EventLoop &loop;
Source *source; Source *source;
uint64_t absolute_timeout_us; uint64_t absolute_timeout_us;
#endif
std::forward_list<SingleFD> fds; std::forward_list<SingleFD> fds;
public: public:
#ifdef USE_EPOLL
static constexpr unsigned READ = SocketMonitor::READ;
static constexpr unsigned WRITE = SocketMonitor::WRITE;
static constexpr unsigned ERROR = SocketMonitor::ERROR;
static constexpr unsigned HANGUP = SocketMonitor::HANGUP;
#else
static constexpr unsigned READ = G_IO_IN; static constexpr unsigned READ = G_IO_IN;
static constexpr unsigned WRITE = G_IO_OUT; static constexpr unsigned WRITE = G_IO_OUT;
static constexpr unsigned ERROR = G_IO_ERR; static constexpr unsigned ERROR = G_IO_ERR;
static constexpr unsigned HANGUP = G_IO_HUP; static constexpr unsigned HANGUP = G_IO_HUP;
#endif
MultiSocketMonitor(EventLoop &_loop); MultiSocketMonitor(EventLoop &_loop);
~MultiSocketMonitor(); ~MultiSocketMonitor();
#ifdef USE_EPOLL
using IdleMonitor::GetEventLoop;
#else
EventLoop &GetEventLoop() { EventLoop &GetEventLoop() {
return loop; return loop;
} }
#endif
public:
#ifndef USE_EPOLL
gcc_pure gcc_pure
uint64_t GetTime() const { uint64_t GetTime() const {
return g_source_get_time(&source->base); return g_source_get_time(&source->base);
} }
#endif
void InvalidateSockets() { void InvalidateSockets() {
#ifdef USE_EPOLL
refresh = true;
IdleMonitor::Schedule();
#else
/* no-op because GLib always calls the GSource's /* no-op because GLib always calls the GSource's
"prepare" method before each poll() anyway */ "prepare" method before each poll() anyway */
#endif
} }
void AddSocket(int fd, unsigned events) { void AddSocket(int fd, unsigned events) {
fds.emplace_front(*this, fd, events); fds.emplace_front(*this, fd, events);
#ifndef USE_EPOLL
g_source_add_poll(&source->base, &fds.front().pfd); g_source_add_poll(&source->base, &fds.front().pfd);
#endif
} }
template<typename E> template<typename E>
@ -120,7 +200,11 @@ public:
i->SetEvents(events); i->SetEvents(events);
prev = i; prev = i;
} else { } else {
#ifdef USE_EPOLL
i->Steal();
#else
g_source_remove_poll(&source->base, &i->pfd); g_source_remove_poll(&source->base, &i->pfd);
#endif
fds.erase_after(prev); fds.erase_after(prev);
} }
} }
@ -133,6 +217,23 @@ protected:
virtual int PrepareSockets() = 0; virtual int PrepareSockets() = 0;
virtual void DispatchSockets() = 0; virtual void DispatchSockets() = 0;
#ifdef USE_EPOLL
private:
void SetReady() {
ready = true;
IdleMonitor::Schedule();
}
void Prepare();
virtual void OnTimeout() final {
SetReady();
IdleMonitor::Schedule();
}
virtual void OnIdle() final;
#else
public: public:
/* GSource callbacks */ /* GSource callbacks */
static gboolean Prepare(GSource *source, gint *timeout_r); static gboolean Prepare(GSource *source, gint *timeout_r);
@ -147,6 +248,7 @@ private:
void Dispatch() { void Dispatch() {
DispatchSockets(); DispatchSockets();
} }
#endif
}; };
#endif #endif

View File

@ -32,6 +32,19 @@
#include <sys/socket.h> #include <sys/socket.h>
#endif #endif
#ifdef USE_EPOLL
void
SocketMonitor::Dispatch(unsigned flags)
{
flags &= GetScheduledFlags();
if (flags != 0 && !OnSocketReady(flags) && IsDefined())
Cancel();
}
#else
/* /*
* GSource methods * GSource methods
* *
@ -88,6 +101,8 @@ SocketMonitor::SocketMonitor(int _fd, EventLoop &_loop)
Open(_fd); Open(_fd);
} }
#endif
SocketMonitor::~SocketMonitor() SocketMonitor::~SocketMonitor()
{ {
if (IsDefined()) if (IsDefined())
@ -98,10 +113,14 @@ void
SocketMonitor::Open(int _fd) SocketMonitor::Open(int _fd)
{ {
assert(fd < 0); assert(fd < 0);
#ifndef USE_EPOLL
assert(source == nullptr); assert(source == nullptr);
#endif
assert(_fd >= 0); assert(_fd >= 0);
fd = _fd; fd = _fd;
#ifndef USE_EPOLL
poll = {fd, 0, 0}; poll = {fd, 0, 0};
source = (Source *)g_source_new(&socket_monitor_source_funcs, source = (Source *)g_source_new(&socket_monitor_source_funcs,
@ -110,6 +129,7 @@ SocketMonitor::Open(int _fd)
g_source_attach(&source->base, loop.GetContext()); g_source_attach(&source->base, loop.GetContext());
g_source_add_poll(&source->base, &poll); g_source_add_poll(&source->base, &poll);
#endif
} }
int int
@ -122,9 +142,11 @@ SocketMonitor::Steal()
int result = fd; int result = fd;
fd = -1; fd = -1;
#ifndef USE_EPOLL
g_source_destroy(&source->base); g_source_destroy(&source->base);
g_source_unref(&source->base); g_source_unref(&source->base);
source = nullptr; source = nullptr;
#endif
return result; return result;
} }
@ -143,10 +165,21 @@ SocketMonitor::Schedule(unsigned flags)
if (flags == GetScheduledFlags()) if (flags == GetScheduledFlags())
return; return;
#ifdef USE_EPOLL
if (scheduled_flags == 0)
loop.AddFD(fd, flags, *this);
else if (flags == 0)
loop.RemoveFD(fd, *this);
else
loop.ModifyFD(fd, flags, *this);
scheduled_flags = flags;
#else
poll.events = flags; poll.events = flags;
poll.revents &= flags; poll.revents &= flags;
loop.WakeUp(); loop.WakeUp();
#endif
} }
SocketMonitor::ssize_t SocketMonitor::ssize_t

View File

@ -22,7 +22,11 @@
#include "check.h" #include "check.h"
#ifdef USE_EPOLL
#include <sys/epoll.h>
#else
#include <glib.h> #include <glib.h>
#endif
#include <type_traits> #include <type_traits>
@ -40,29 +44,55 @@
class EventLoop; class EventLoop;
class SocketMonitor { class SocketMonitor {
#ifdef USE_EPOLL
#else
struct Source { struct Source {
GSource base; GSource base;
SocketMonitor *monitor; SocketMonitor *monitor;
}; };
#endif
int fd; int fd;
EventLoop &loop; EventLoop &loop;
#ifdef USE_EPOLL
/**
* A bit mask of events that is currently registered in the EventLoop.
*/
unsigned scheduled_flags;
#else
Source *source; Source *source;
GPollFD poll; GPollFD poll;
#endif
public: public:
#ifdef USE_EPOLL
static constexpr unsigned READ = EPOLLIN;
static constexpr unsigned WRITE = EPOLLOUT;
static constexpr unsigned ERROR = EPOLLERR;
static constexpr unsigned HANGUP = EPOLLHUP;
#else
static constexpr unsigned READ = G_IO_IN; static constexpr unsigned READ = G_IO_IN;
static constexpr unsigned WRITE = G_IO_OUT; static constexpr unsigned WRITE = G_IO_OUT;
static constexpr unsigned ERROR = G_IO_ERR; static constexpr unsigned ERROR = G_IO_ERR;
static constexpr unsigned HANGUP = G_IO_HUP; static constexpr unsigned HANGUP = G_IO_HUP;
#endif
typedef std::make_signed<size_t>::type ssize_t; typedef std::make_signed<size_t>::type ssize_t;
#ifdef USE_EPOLL
SocketMonitor(EventLoop &_loop)
:fd(-1), loop(_loop), scheduled_flags(0) {}
SocketMonitor(int _fd, EventLoop &_loop)
:fd(_fd), loop(_loop), scheduled_flags(0) {}
#else
SocketMonitor(EventLoop &_loop) SocketMonitor(EventLoop &_loop)
:fd(-1), loop(_loop), source(nullptr) {} :fd(-1), loop(_loop), source(nullptr) {}
SocketMonitor(int _fd, EventLoop &_loop); SocketMonitor(int _fd, EventLoop &_loop);
#endif
~SocketMonitor(); ~SocketMonitor();
@ -93,7 +123,11 @@ public:
unsigned GetScheduledFlags() const { unsigned GetScheduledFlags() const {
assert(IsDefined()); assert(IsDefined());
#ifdef USE_EPOLL
return scheduled_flags;
#else
return poll.events; return poll.events;
#endif
} }
void Schedule(unsigned flags); void Schedule(unsigned flags);
@ -128,6 +162,9 @@ protected:
virtual bool OnSocketReady(unsigned flags) = 0; virtual bool OnSocketReady(unsigned flags) = 0;
public: public:
#ifdef USE_EPOLL
void Dispatch(unsigned flags);
#else
/* GSource callbacks */ /* GSource callbacks */
static gboolean Prepare(GSource *source, gint *timeout_r); static gboolean Prepare(GSource *source, gint *timeout_r);
static gboolean Check(GSource *source); static gboolean Check(GSource *source);
@ -146,6 +183,7 @@ private:
OnSocketReady(poll.revents & poll.events); OnSocketReady(poll.revents & poll.events);
} }
#endif
}; };
#endif #endif

View File

@ -24,10 +24,15 @@
void void
TimeoutMonitor::Cancel() TimeoutMonitor::Cancel()
{ {
if (source != nullptr) { if (IsActive()) {
#ifdef USE_EPOLL
active = false;
loop.CancelTimer(*this);
#else
g_source_destroy(source); g_source_destroy(source);
g_source_unref(source); g_source_unref(source);
source = nullptr; source = nullptr;
#endif
} }
} }
@ -35,23 +40,39 @@ void
TimeoutMonitor::Schedule(unsigned ms) TimeoutMonitor::Schedule(unsigned ms)
{ {
Cancel(); Cancel();
#ifdef USE_EPOLL
active = true;
loop.AddTimer(*this, ms);
#else
source = loop.AddTimeout(ms, Callback, this); source = loop.AddTimeout(ms, Callback, this);
#endif
} }
void void
TimeoutMonitor::ScheduleSeconds(unsigned s) TimeoutMonitor::ScheduleSeconds(unsigned s)
{ {
Cancel(); Cancel();
#ifdef USE_EPOLL
Schedule(s * 1000u);
#else
source = loop.AddTimeoutSeconds(s, Callback, this); source = loop.AddTimeoutSeconds(s, Callback, this);
#endif
} }
void void
TimeoutMonitor::Run() TimeoutMonitor::Run()
{ {
#ifndef USE_EPOLL
Cancel(); Cancel();
#endif
OnTimeout(); OnTimeout();
} }
#ifndef USE_EPOLL
gboolean gboolean
TimeoutMonitor::Callback(gpointer data) TimeoutMonitor::Callback(gpointer data)
{ {
@ -59,3 +80,5 @@ TimeoutMonitor::Callback(gpointer data)
monitor.Run(); monitor.Run();
return false; return false;
} }
#endif

View File

@ -22,17 +22,34 @@
#include "check.h" #include "check.h"
#ifndef USE_EPOLL
#include <glib.h> #include <glib.h>
#endif
class EventLoop; class EventLoop;
class TimeoutMonitor { class TimeoutMonitor {
#ifdef USE_EPOLL
friend class EventLoop;
#endif
EventLoop &loop; EventLoop &loop;
#ifdef USE_EPOLL
bool active;
#else
GSource *source; GSource *source;
#endif
public: public:
#ifdef USE_EPOLL
TimeoutMonitor(EventLoop &_loop)
:loop(_loop), active(false) {
}
#else
TimeoutMonitor(EventLoop &_loop) TimeoutMonitor(EventLoop &_loop)
:loop(_loop), source(nullptr) {} :loop(_loop), source(nullptr) {}
#endif
~TimeoutMonitor() { ~TimeoutMonitor() {
Cancel(); Cancel();
@ -43,7 +60,11 @@ public:
} }
bool IsActive() const { bool IsActive() const {
#ifdef USE_EPOLL
return active;
#else
return source != nullptr; return source != nullptr;
#endif
} }
void Schedule(unsigned ms); void Schedule(unsigned ms);
@ -55,7 +76,10 @@ protected:
private: private:
void Run(); void Run();
#ifndef USE_EPOLL
static gboolean Callback(gpointer data); static gboolean Callback(gpointer data);
#endif
}; };
#endif /* MAIN_NOTIFY_H */ #endif /* MAIN_NOTIFY_H */