event/Loop: use io_uring_prep_poll_multishot() on the epoll fd
This wraps epoll and io_uring the other way around: previously, we had the io_uring file descriptor registered in epoll, and when it became ready, we could query its completion ring. The problem is that the poll wakeups cause considerable overhead in the Linux kernel; see https://lore.kernel.org/io-uring/20250128133927.3989681-9-max.kellermann@ionos.com/ for details. By wrapping epoll inside io_uring using io_uring_prep_poll_multishot(), the "outer" loop is io_uring and inside it, we have epoll. This adds another system call for epoll, but that will go away as soon as most operations are going through io_uring. Previously, io_uring had this extra system call.
This commit is contained in:

committed by
Max Kellermann

parent
bca9e3e347
commit
624da8ce5b
@@ -38,6 +38,10 @@ class EpollBackend
|
||||
public:
|
||||
EpollBackend() = default;
|
||||
|
||||
/**
 * Returns the file descriptor reported by the wrapped epoll
 * object (forwards to epoll.GetFileDescriptor()).
 */
FileDescriptor GetFileDescriptor() const noexcept {
|
||||
return epoll.GetFileDescriptor();
|
||||
}
|
||||
|
||||
auto ReadEvents(int timeout_ms) noexcept {
|
||||
EpollBackendResult result;
|
||||
int ret = epoll.Wait(result.events.data(), result.events.size(),
|
||||
|
@@ -13,6 +13,8 @@
|
||||
|
||||
#ifdef HAVE_URING
|
||||
#include "uring/Manager.hxx"
|
||||
#include "io/uring/Operation.hxx"
|
||||
#include "io/uring/Queue.hxx"
|
||||
#endif
|
||||
|
||||
EventLoop::EventLoop(
|
||||
@@ -38,6 +40,7 @@ EventLoop::~EventLoop() noexcept
|
||||
/* if Run() was never called (maybe because startup failed and
|
||||
an exception is pending), we need to destruct the
|
||||
Uring::Manager here or else the assertions below fail */
|
||||
uring_poll.reset();
|
||||
uring.reset();
|
||||
#endif
|
||||
|
||||
@@ -54,20 +57,46 @@ EventLoop::~EventLoop() noexcept
|
||||
void
|
||||
EventLoop::SetVolatile() noexcept
|
||||
{
|
||||
#ifdef HAVE_URING
|
||||
if (uring)
|
||||
uring->SetVolatile();
|
||||
#endif
|
||||
}
|
||||
|
||||
#ifdef HAVE_URING
|
||||
|
||||
/**
 * A Uring::Operation owned by the #EventLoop which polls the poll
 * backend's file descriptor through io_uring (see Start()).  Each
 * completion sets EventLoop::epoll_ready.
 */
class EventLoop::UringPoll final : Uring::Operation {
|
||||
/* the loop whose epoll_ready flag is set on completion */
EventLoop &event_loop;
|
||||
|
||||
public:
|
||||
UringPoll(EventLoop &_event_loop) noexcept
|
||||
:event_loop(_event_loop) {}
|
||||
|
||||
/**
 * Submit the multishot poll operation.  Defined out of line
 * after this class.
 */
void Start();
|
||||
|
||||
private:
|
||||
/* virtual method from class Uring::Operation */
void OnUringCompletion(int res) noexcept override {
|
||||
/* the result code is currently ignored */
(void)res; // TODO
|
||||
|
||||
event_loop.epoll_ready = true;
|
||||
}
|
||||
};
|
||||
|
||||
/**
 * Prepare a multishot poll for EPOLLIN on the poll backend's file
 * descriptor and push it to the event loop's io_uring queue together
 * with this operation object.
 *
 * Preconditions (asserted): this operation is not already pending,
 * and EventLoop::GetUring() returns a non-null queue.
 */
inline void
|
||||
EventLoop::UringPoll::Start()
|
||||
{
|
||||
assert(!IsUringPending());
|
||||
assert(event_loop.GetUring());
|
||||
|
||||
auto &queue = *event_loop.GetUring();
|
||||
|
||||
/* NOTE(review): RequireSubmitEntry() presumably throws if no
   submit entry is available - confirm against Uring::Queue */
auto &s = queue.RequireSubmitEntry();
|
||||
io_uring_prep_poll_multishot(&s, event_loop.poll_backend.GetFileDescriptor().Get(), EPOLLIN);
|
||||
queue.Push(s, *this);
|
||||
}
|
||||
|
||||
void
|
||||
EventLoop::EnableUring(unsigned entries, unsigned flags)
|
||||
{
|
||||
assert(!uring);
|
||||
|
||||
uring = std::make_unique<Uring::Manager>(*this, entries, flags);
|
||||
uring = std::make_unique<Uring::Manager>(entries, flags);
|
||||
}
|
||||
|
||||
void
|
||||
@@ -75,12 +104,13 @@ EventLoop::EnableUring(unsigned entries, struct io_uring_params &params)
|
||||
{
|
||||
assert(!uring);
|
||||
|
||||
uring = std::make_unique<Uring::Manager>(*this, entries, params);
|
||||
uring = std::make_unique<Uring::Manager>(entries, params);
|
||||
}
|
||||
|
||||
/**
 * Destroy the #UringPoll object (if any) and then the
 * Uring::Manager.  Both are std::unique_ptr members, so calling
 * this when they are already empty is a no-op.
 */
void
|
||||
EventLoop::DisableUring() noexcept
|
||||
{
|
||||
/* NOTE(review): the poll operation is released before the queue,
   presumably because it was submitted to that queue - confirm */
uring_poll.reset();
|
||||
uring.reset();
|
||||
}
|
||||
|
||||
@@ -265,6 +295,29 @@ ExportTimeoutMS(Event::Duration timeout) noexcept
|
||||
: -1;
|
||||
}
|
||||
|
||||
#ifdef HAVE_URING
|
||||
|
||||
/**
 * Convert an #Event::Duration to a struct __kernel_timespec.
 *
 * - negative durations yield a fixed 24 hour timeout (see TODO)
 * - durations of 24 hours or more are rounded up to whole seconds
 *   (tv_nsec is left zero-initialized)
 * - everything else is rounded up to whole nanoseconds and split
 *   into tv_sec and tv_nsec
 */
static constexpr struct __kernel_timespec
|
||||
ExportTimeoutKernelTimespec(Event::Duration timeout) noexcept
|
||||
{
|
||||
/* NOTE(review): a negative duration presumably means "no
   timeout" - confirm against the callers */
if (timeout < timeout.zero())
|
||||
// TODO if there is no time, there should be no timeout at all
|
||||
return { .tv_sec = 24 * 3600 };
|
||||
|
||||
/* long timeouts: second granularity only */
if (timeout >= std::chrono::duration_cast<Event::Duration>(std::chrono::hours{24})) [[unlikely]]
|
||||
return {
|
||||
.tv_sec = std::chrono::ceil<std::chrono::duration<__kernel_time64_t>>(timeout).count(),
|
||||
};
|
||||
|
||||
/* round up so the converted timeout is never shorter than the
   requested one */
const auto nsec = std::chrono::ceil<std::chrono::nanoseconds>(timeout);
|
||||
return {
|
||||
.tv_sec = nsec.count() / 1000000000,
|
||||
.tv_nsec = nsec.count() % 1000000000,
|
||||
};
|
||||
}
|
||||
|
||||
#endif
|
||||
|
||||
inline bool
|
||||
EventLoop::Wait(Event::Duration timeout) noexcept
|
||||
{
|
||||
@@ -350,7 +403,39 @@ EventLoop::Run() noexcept
|
||||
if (!next.empty())
|
||||
timeout = Event::Duration{0};
|
||||
|
||||
Wait(timeout);
|
||||
#ifdef HAVE_URING
|
||||
if (uring) {
|
||||
/* use io_uring_enter() and invoke
|
||||
epoll_wait() only if it's reported
|
||||
to be ready */
|
||||
|
||||
if (!uring_poll) [[unlikely]] {
|
||||
/* start polling on the epoll
|
||||
file descriptor */
|
||||
uring_poll = std::make_unique<UringPoll>(*this);
|
||||
uring_poll->Start();
|
||||
}
|
||||
|
||||
/* repeat epoll_wait() until it
|
||||
returns no more events; this is a
|
||||
temporary workaround because
|
||||
io_uring_prep_poll_multishot() is
|
||||
edge-triggered, so we have to
|
||||
consume all events to rearm it */
|
||||
|
||||
if (!epoll_ready) {
|
||||
auto kernel_timeout = ExportTimeoutKernelTimespec(timeout);
|
||||
Uring::Queue &uring_queue = *uring;
|
||||
uring_queue.SubmitAndWaitDispatchCompletions(kernel_timeout);
|
||||
}
|
||||
|
||||
if (epoll_ready) {
|
||||
/* invoke epoll_wait() */
|
||||
epoll_ready = Wait(Event::Duration{0});
|
||||
}
|
||||
} else
|
||||
#endif
|
||||
Wait(timeout);
|
||||
|
||||
idle.splice(std::next(idle.begin()), next);
|
||||
|
||||
|
@@ -97,7 +97,14 @@ class EventLoop final
|
||||
|
||||
#ifdef HAVE_URING
|
||||
std::unique_ptr<Uring::Manager> uring;
|
||||
#endif
|
||||
|
||||
/**
|
||||
* This class handles IORING_POLL_ADD_MULTI on the epoll file
|
||||
* descriptor and sets #epoll_ready.
|
||||
*/
|
||||
class UringPoll;
|
||||
std::unique_ptr<UringPoll> uring_poll;
|
||||
#endif // HAVE_URING
|
||||
|
||||
#ifdef HAVE_THREADED_EVENT_LOOP
|
||||
/**
|
||||
@@ -122,6 +129,14 @@ class EventLoop final
|
||||
*/
|
||||
bool again;
|
||||
|
||||
#ifdef HAVE_URING
|
||||
/**
|
||||
* Set by #UringPoll to signal that we should invoke
|
||||
* epoll_wait().
|
||||
*/
|
||||
bool epoll_ready = false;
|
||||
#endif // HAVE_URING
|
||||
|
||||
#ifdef HAVE_THREADED_EVENT_LOOP
|
||||
bool quit_injected = false;
|
||||
|
||||
|
@@ -7,10 +7,6 @@ configure_file(output: 'Features.h', configuration: event_features)
|
||||
|
||||
event_sources = []
|
||||
|
||||
if uring_dep.found()
|
||||
event_sources += 'uring/Manager.cxx'
|
||||
endif
|
||||
|
||||
if is_windows
|
||||
event_sources += 'WinSelectBackend.cxx'
|
||||
elif is_linux and get_option('epoll')
|
||||
|
@@ -1,62 +0,0 @@
|
||||
// SPDX-License-Identifier: BSD-2-Clause
|
||||
// Copyright CM4all GmbH
|
||||
// author: Max Kellermann <mk@cm4all.com>
|
||||
|
||||
#include "Manager.hxx"
|
||||
#include "util/PrintException.hxx"
|
||||
|
||||
namespace Uring {
|
||||
|
||||
Manager::Manager(EventLoop &event_loop,
|
||||
unsigned entries, unsigned flags)
|
||||
:Queue(entries, flags),
|
||||
event(event_loop, BIND_THIS_METHOD(OnReady), GetFileDescriptor())
|
||||
{
|
||||
event.ScheduleRead();
|
||||
}
|
||||
|
||||
Manager::Manager(EventLoop &event_loop,
|
||||
unsigned entries, struct io_uring_params &params)
|
||||
:Queue(entries, params),
|
||||
event(event_loop, BIND_THIS_METHOD(OnReady), GetFileDescriptor())
|
||||
{
|
||||
event.ScheduleRead();
|
||||
}
|
||||
|
||||
void
|
||||
Manager::Submit()
|
||||
{
|
||||
/* defer in "idle" mode to allow accumulation of more
|
||||
events */
|
||||
defer_submit_event.ScheduleIdle();
|
||||
}
|
||||
|
||||
void
|
||||
Manager::DispatchCompletions() noexcept
|
||||
{
|
||||
try {
|
||||
Queue::DispatchCompletions();
|
||||
} catch (...) {
|
||||
PrintException(std::current_exception());
|
||||
}
|
||||
|
||||
CheckVolatileEvent();
|
||||
}
|
||||
|
||||
inline void
|
||||
Manager::OnReady(unsigned) noexcept
|
||||
{
|
||||
DispatchCompletions();
|
||||
}
|
||||
|
||||
void
|
||||
Manager::DeferredSubmit() noexcept
|
||||
{
|
||||
try {
|
||||
Queue::Submit();
|
||||
} catch (...) {
|
||||
PrintException(std::current_exception());
|
||||
}
|
||||
}
|
||||
|
||||
} // namespace Uring
|
@@ -5,49 +5,17 @@
|
||||
#pragma once
|
||||
|
||||
#include "io/uring/Queue.hxx"
|
||||
#include "event/DeferEvent.hxx"
|
||||
#include "event/PipeEvent.hxx"
|
||||
|
||||
namespace Uring {
|
||||
|
||||
class Manager final : public Queue {
|
||||
PipeEvent event;
|
||||
|
||||
/**
|
||||
* Responsible for invoking Queue::Submit() only once per
|
||||
* #EventLoop iteration.
|
||||
*/
|
||||
DeferEvent defer_submit_event{GetEventLoop(), BIND_THIS_METHOD(DeferredSubmit)};
|
||||
|
||||
bool volatile_event = false;
|
||||
|
||||
public:
|
||||
explicit Manager(EventLoop &event_loop,
|
||||
unsigned entries=1024, unsigned flags=0);
|
||||
explicit Manager(EventLoop &event_loop,
|
||||
unsigned entries, struct io_uring_params &params);
|
||||
|
||||
EventLoop &GetEventLoop() const noexcept {
|
||||
return event.GetEventLoop();
|
||||
}
|
||||
|
||||
void SetVolatile() noexcept {
|
||||
volatile_event = true;
|
||||
CheckVolatileEvent();
|
||||
}
|
||||
using Queue::Queue;
|
||||
|
||||
// virtual methods from class Uring::Queue
|
||||
void Submit() override;
|
||||
|
||||
private:
|
||||
void CheckVolatileEvent() noexcept {
|
||||
if (volatile_event && !HasPending())
|
||||
event.Cancel();
|
||||
void Submit() override {
|
||||
/* this will be done by EventLoop::Run() */
|
||||
}
|
||||
|
||||
void DispatchCompletions() noexcept;
|
||||
void OnReady(unsigned events) noexcept;
|
||||
void DeferredSubmit() noexcept;
|
||||
};
|
||||
|
||||
} // namespace Uring
|
||||
|
Reference in New Issue
Block a user