event/SocketMonitor: add ready_flags

By making each SocketMonitor keep track of its ready flags, this
removes a lot of overhead from EventLoop::RemoveFD().
This commit is contained in:
Max Kellermann 2020-10-08 20:53:43 +02:00
parent 41c0bbab13
commit 41bc17a27f
6 changed files with 54 additions and 43 deletions

View File

@ -87,20 +87,18 @@ EventLoop::Break() noexcept
} }
bool bool
EventLoop::Abandon(int _fd, SocketMonitor &m) noexcept EventLoop::Abandon(int _fd) noexcept
{ {
assert(!IsAlive() || IsInside()); assert(!IsAlive() || IsInside());
poll_result.Clear(&m);
return poll_group.Abandon(_fd); return poll_group.Abandon(_fd);
} }
bool bool
EventLoop::RemoveFD(int _fd, SocketMonitor &m) noexcept EventLoop::RemoveFD(int _fd) noexcept
{ {
assert(!IsAlive() || IsInside()); assert(!IsAlive() || IsInside());
poll_result.Clear(&m);
return poll_group.Remove(_fd); return poll_group.Remove(_fd);
} }
@ -230,8 +228,16 @@ EventLoop::Run() noexcept
/* wait for new event */ /* wait for new event */
PollResult poll_result;
poll_group.ReadEvents(poll_result, ExportTimeoutMS(timeout)); poll_group.ReadEvents(poll_result, ExportTimeoutMS(timeout));
ready_sockets.clear();
for (size_t i = 0; i < poll_result.GetSize(); ++i) {
auto &sm = *(SocketMonitor *)poll_result.GetObject(i);
sm.SetReadyFlags(poll_result.GetEvents(i));
ready_sockets.push_back(sm);
}
now = std::chrono::steady_clock::now(); now = std::chrono::steady_clock::now();
{ {
@ -240,19 +246,12 @@ EventLoop::Run() noexcept
} }
/* invoke sockets */ /* invoke sockets */
for (size_t i = 0; i < poll_result.GetSize(); ++i) { while (!ready_sockets.empty() && !quit) {
auto events = poll_result.GetEvents(i); auto &sm = ready_sockets.front();
if (events != 0) { ready_sockets.pop_front();
if (quit)
break;
auto m = (SocketMonitor *)poll_result.GetObject(i); sm.Dispatch();
m->Dispatch(events);
} }
}
poll_result.Reset();
} while (!quit); } while (!quit);
#ifndef NDEBUG #ifndef NDEBUG

View File

@ -88,6 +88,19 @@ class EventLoop final : SocketMonitor
boost::intrusive::constant_time_size<false>>; boost::intrusive::constant_time_size<false>>;
DeferredList deferred; DeferredList deferred;
using ReadySocketList =
boost::intrusive::list<SocketMonitor,
boost::intrusive::member_hook<SocketMonitor,
SocketMonitor::ReadyListHook,
&SocketMonitor::ready_siblings>,
boost::intrusive::constant_time_size<false>>;
/**
* A linked list of #SocketMonitor instances which have a
* non-zero "ready_flags" field, and need to be dispatched.
*/
ReadySocketList ready_sockets;
#ifdef HAVE_URING #ifdef HAVE_URING
std::unique_ptr<Uring::Manager> uring; std::unique_ptr<Uring::Manager> uring;
#endif #endif
@ -123,7 +136,6 @@ class EventLoop final : SocketMonitor
#endif #endif
PollGroup poll_group; PollGroup poll_group;
PollResult poll_result;
/** /**
* A reference to the thread that is currently inside Run(). * A reference to the thread that is currently inside Run().
@ -178,9 +190,9 @@ public:
* has been closed. This is like RemoveFD(), but does not * has been closed. This is like RemoveFD(), but does not
* attempt to use #EPOLL_CTL_DEL. * attempt to use #EPOLL_CTL_DEL.
*/ */
bool Abandon(int fd, SocketMonitor &m) noexcept; bool Abandon(int fd) noexcept;
bool RemoveFD(int fd, SocketMonitor &m) noexcept; bool RemoveFD(int fd) noexcept;
void AddIdle(IdleMonitor &i) noexcept; void AddIdle(IdleMonitor &i) noexcept;
void RemoveIdle(IdleMonitor &i) noexcept; void RemoveIdle(IdleMonitor &i) noexcept;

View File

@ -45,16 +45,6 @@ public:
void *GetObject(size_t i) const noexcept { void *GetObject(size_t i) const noexcept {
return events[i].data.ptr; return events[i].data.ptr;
} }
void Reset() noexcept {
n_events = 0;
}
void Clear(void *obj) noexcept {
for (size_t i = 0; i < n_events; ++i)
if (events[i].data.ptr == obj)
events[i].events = 0;
}
}; };
class PollGroupEpoll class PollGroupEpoll

View File

@ -57,16 +57,6 @@ public:
return items[i].obj; return items[i].obj;
} }
void Reset() noexcept {
items.clear();
}
void Clear(void *obj) noexcept {
for (auto i = items.begin(); i != items.end(); ++i)
if (i->obj == obj)
i->events = 0;
}
void Add(unsigned events, void *obj) noexcept { void Add(unsigned events, void *obj) noexcept {
items.emplace_back(events, obj); items.emplace_back(events, obj);
} }

View File

@ -28,9 +28,10 @@
#endif #endif
void void
SocketMonitor::Dispatch(unsigned flags) noexcept SocketMonitor::Dispatch() noexcept
{ {
flags &= GetScheduledFlags() | IMPLICIT_FLAGS; const unsigned flags = std::exchange(ready_flags, 0) &
(GetScheduledFlags() | IMPLICIT_FLAGS);
if (flags != 0) if (flags != 0)
OnSocketReady(flags); OnSocketReady(flags);
@ -79,7 +80,7 @@ SocketMonitor::Schedule(unsigned flags) noexcept
if (scheduled_flags == 0) if (scheduled_flags == 0)
success = loop.AddFD(fd.Get(), flags, *this); success = loop.AddFD(fd.Get(), flags, *this);
else if (flags == 0) else if (flags == 0)
success = loop.RemoveFD(fd.Get(), *this); success = loop.RemoveFD(fd.Get());
else else
success = loop.ModifyFD(fd.Get(), flags, *this); success = loop.ModifyFD(fd.Get(), flags, *this);

View File

@ -23,6 +23,8 @@
#include "PollGroup.hxx" #include "PollGroup.hxx"
#include "net/SocketDescriptor.hxx" #include "net/SocketDescriptor.hxx"
#include <boost/intrusive/list_hook.hpp>
#include <cassert> #include <cassert>
#include <cstddef> #include <cstddef>
#include <type_traits> #include <type_traits>
@ -43,14 +45,27 @@ class EventLoop;
* as thread-safe. * as thread-safe.
*/ */
class SocketMonitor { class SocketMonitor {
friend class EventLoop;
using ReadyListHook = boost::intrusive::list_member_hook<boost::intrusive::link_mode<boost::intrusive::auto_unlink>>;
ReadyListHook ready_siblings;
SocketDescriptor fd = SocketDescriptor::Undefined(); SocketDescriptor fd = SocketDescriptor::Undefined();
EventLoop &loop; EventLoop &loop;
/** /**
* A bit mask of events that is currently registered in the EventLoop. * A bit mask of events that is currently registered in the
* #EventLoop.
*/ */
unsigned scheduled_flags = 0; unsigned scheduled_flags = 0;
/**
* A bit mask of events which have been reported as "ready" by
* epoll_wait(). If non-zero, then the #EventLoop will call
* Dispatch() soon.
*/
unsigned ready_flags = 0;
public: public:
static constexpr unsigned READ = PollGroup::READ; static constexpr unsigned READ = PollGroup::READ;
static constexpr unsigned WRITE = PollGroup::WRITE; static constexpr unsigned WRITE = PollGroup::WRITE;
@ -103,6 +118,10 @@ public:
return scheduled_flags; return scheduled_flags;
} }
void SetReadyFlags(unsigned flags) noexcept {
ready_flags = flags;
}
/** /**
* @return true on success, false on error (with errno set if * @return true on success, false on error (with errno set if
* USE_EPOLL is defined) * USE_EPOLL is defined)
@ -136,7 +155,7 @@ protected:
virtual bool OnSocketReady(unsigned flags) noexcept = 0; virtual bool OnSocketReady(unsigned flags) noexcept = 0;
public: public:
void Dispatch(unsigned flags) noexcept; void Dispatch() noexcept;
}; };
#endif #endif