event/TimerWheel: optimized container for CoarseTimerEvent

This commit is contained in:
Max Kellermann 2021-02-04 23:01:17 +01:00 committed by Max Kellermann
parent d1957b83c8
commit 5a16e3ffa3
7 changed files with 409 additions and 4 deletions

View File

@ -0,0 +1,59 @@
/*
* Copyright 2007-2021 CM4all GmbH
* All rights reserved.
*
* author: Max Kellermann <mk@cm4all.com>
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
*
* - Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
*
* - Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the
* distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
* FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
* FOUNDATION OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
* INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
* (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
* SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
* HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
* STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
* OF THE POSSIBILITY OF SUCH DAMAGE.
*/
#include "CoarseTimerEvent.hxx"
#include "Loop.hxx"
void
CoarseTimerEvent::Schedule(Event::Duration d) noexcept
{
Cancel();
due = loop.SteadyNow() + d;
loop.Insert(*this);
}
void
CoarseTimerEvent::ScheduleEarlier(Event::Duration d) noexcept
{
const auto new_due = loop.SteadyNow() + d;
if (IsPending()) {
if (new_due >= due)
return;
Cancel();
}
due = new_due;
loop.Insert(*this);
}

View File

@ -32,7 +32,11 @@
#pragma once #pragma once
#include "TimerEvent.hxx" #include "Chrono.hxx"
#include "util/BindMethod.hxx"
#include "util/IntrusiveList.hxx"
class EventLoop;
/** /**
* This class invokes a callback function after a certain amount of * This class invokes a callback function after a certain amount of
@ -47,5 +51,53 @@
* thread that runs the #EventLoop, except where explicitly documented * thread that runs the #EventLoop, except where explicitly documented
* as thread-safe. * as thread-safe.
*/ */
using CoarseTimerEvent = TimerEvent; class CoarseTimerEvent final : AutoUnlinkIntrusiveListHook
// TODO: implement {
friend class TimerWheel;
friend class IntrusiveList<CoarseTimerEvent>;
EventLoop &loop;
using Callback = BoundMethod<void() noexcept>;
const Callback callback;
/**
* When is this timer due? This is only valid if IsPending()
* returns true.
*/
Event::TimePoint due;
public:
CoarseTimerEvent(EventLoop &_loop, Callback _callback) noexcept
:loop(_loop), callback(_callback) {}
auto &GetEventLoop() const noexcept {
return loop;
}
constexpr auto GetDue() const noexcept {
return due;
}
bool IsPending() const noexcept {
return is_linked();
}
void Schedule(Event::Duration d) noexcept;
/**
* Like Schedule(), but is a no-op if there is a due time
* earlier than the given one.
*/
void ScheduleEarlier(Event::Duration d) noexcept;
void Cancel() noexcept {
if (IsPending())
unlink();
}
private:
void Run() noexcept {
callback();
}
};

View File

@ -142,6 +142,13 @@ EventLoop::AbandonFD(SocketEvent &event) noexcept
return poll_backend.Abandon(event.GetSocket().Get()); return poll_backend.Abandon(event.GetSocket().Get());
} }
void
EventLoop::Insert(CoarseTimerEvent &t) noexcept
{
coarse_timers.Insert(t);
again = true;
}
void void
EventLoop::Insert(FineTimerEvent &t) noexcept EventLoop::Insert(FineTimerEvent &t) noexcept
{ {
@ -154,7 +161,15 @@ EventLoop::Insert(FineTimerEvent &t) noexcept
inline Event::Duration inline Event::Duration
EventLoop::HandleTimers() noexcept EventLoop::HandleTimers() noexcept
{ {
return timers.Run(SteadyNow()); const auto now = SteadyNow();
auto fine_timeout = timers.Run(now);
auto coarse_timeout = coarse_timers.Run(now);
return fine_timeout.count() < 0 ||
(coarse_timeout.count() >= 0 && coarse_timeout < fine_timeout)
? coarse_timeout
: fine_timeout;
} }
void void

View File

@ -21,6 +21,7 @@
#define EVENT_LOOP_HXX #define EVENT_LOOP_HXX
#include "Chrono.hxx" #include "Chrono.hxx"
#include "TimerWheel.hxx"
#include "TimerList.hxx" #include "TimerList.hxx"
#include "Backend.hxx" #include "Backend.hxx"
#include "SocketEvent.hxx" #include "SocketEvent.hxx"
@ -65,6 +66,7 @@ class EventLoop final
SocketEvent wake_event{*this, BIND_THIS_METHOD(OnSocketReady), wake_fd.GetSocket()}; SocketEvent wake_event{*this, BIND_THIS_METHOD(OnSocketReady), wake_fd.GetSocket()};
#endif #endif
TimerWheel coarse_timers;
TimerList timers; TimerList timers;
using DeferList = IntrusiveList<DeferEvent>; using DeferList = IntrusiveList<DeferEvent>;
@ -204,6 +206,7 @@ public:
*/ */
bool AbandonFD(SocketEvent &event) noexcept; bool AbandonFD(SocketEvent &event) noexcept;
void Insert(CoarseTimerEvent &t) noexcept;
void Insert(FineTimerEvent &t) noexcept; void Insert(FineTimerEvent &t) noexcept;
/** /**

148
src/event/TimerWheel.cxx Normal file
View File

@ -0,0 +1,148 @@
/*
* Copyright 2007-2021 CM4all GmbH
* All rights reserved.
*
* author: Max Kellermann <mk@cm4all.com>
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
*
* - Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
*
* - Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the
* distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
* FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
* FOUNDATION OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
* INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
* (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
* SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
* HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
* STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
* OF THE POSSIBILITY OF SUCH DAMAGE.
*/
#include "TimerWheel.hxx"
#include "CoarseTimerEvent.hxx"
#include <cassert>
TimerWheel::TimerWheel() noexcept = default;
TimerWheel::~TimerWheel() noexcept = default;
void
TimerWheel::Insert(CoarseTimerEvent &t) noexcept
{
/* if this timer's due time is already in the past, don't
insert it into an older bucket because Run() won't look at
it in this iteration */
const auto due = std::max(t.GetDue(), last_time);
buckets[BucketIndexAt(due)].push_back(t);
}
void
TimerWheel::Run(List &list, Event::TimePoint now) noexcept
{
/* move all timers to a temporary list to avoid problems with
canceled timers while we traverse the list */
auto tmp = std::move(list);
tmp.clear_and_dispose([&](auto *t){
if (t->GetDue() <= now) {
/* this timer is due: run it */
t->Run();
} else {
/* not yet due: move it back to the given
list */
list.push_back(*t);
}
});
}
inline Event::TimePoint
TimerWheel::GetNextDue(const std::size_t bucket_index,
const Event::TimePoint bucket_start_time) const noexcept
{
Event::TimePoint t = bucket_start_time;
for (std::size_t i = bucket_index;;) {
t += RESOLUTION;
if (!buckets[i].empty())
/* found a non-empty bucket; return this
bucket's end time */
return t;
i = NextBucketIndex(i);
if (i == bucket_index)
/* no timer scheduled - no wakeup */
return Event::TimePoint::max();
}
}
inline Event::Duration
TimerWheel::GetSleep(Event::TimePoint now) const noexcept
{
auto t = GetNextDue(BucketIndexAt(now), GetBucketStartTime(now));
assert(t > now);
if (t == Event::TimePoint::max())
return Event::Duration(-1);
return t - now;
}
Event::Duration
TimerWheel::Run(const Event::TimePoint now) noexcept
{
/* check all buckets between the last time we were invoked and
now */
const std::size_t start_bucket = BucketIndexAt(last_time);
std::size_t end_bucket;
if (now < last_time || now >= last_time + SPAN - RESOLUTION) {
/* too much time has passed (or time warp): check all
buckets */
end_bucket = start_bucket;
} else {
/* check only the relevant range of buckets (between
the last run and now) */
/* note, we're not checking the current bucket index,
we stop at the one before that; all timers in the
same bucket shall be combined, so we only execute
it when the bucket end has passed by */
end_bucket = BucketIndexAt(now);
if (start_bucket == end_bucket)
/* still on the same bucket - don't run any
timers, instead wait until this bucket end
has passed by */
return GetSleep(now);
}
last_time = GetBucketStartTime(now);
assert(BucketIndexAt(last_time) == BucketIndexAt(now));
/* run those buckets */
for (std::size_t i = start_bucket;;) {
Run(buckets[i], now);
i = NextBucketIndex(i);
if (i == end_bucket)
break;
}
/* now determine how much time remains until the next
non-empty bucket passes */
return GetSleep(now);
}

126
src/event/TimerWheel.hxx Normal file
View File

@ -0,0 +1,126 @@
/*
* Copyright 2007-2021 CM4all GmbH
* All rights reserved.
*
* author: Max Kellermann <mk@cm4all.com>
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
*
* - Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
*
* - Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the
* distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
* FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
* FOUNDATION OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
* INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
* (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
* SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
* HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
* STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
* OF THE POSSIBILITY OF SUCH DAMAGE.
*/
#pragma once
#include "Chrono.hxx"
#include "util/IntrusiveList.hxx"
#include <array>
#include <algorithm>
class CoarseTimerEvent;
/**
* A list of #CoarseTimerEvent instances managed in a circular timer
* wheel.
*/
class TimerWheel final {
static constexpr Event::Duration RESOLUTION = std::chrono::seconds(1);
static constexpr Event::Duration SPAN = std::chrono::minutes(2);
static_assert(SPAN % RESOLUTION == Event::Duration::zero());
static constexpr std::size_t N_BUCKETS = SPAN / RESOLUTION;
using List = IntrusiveList<CoarseTimerEvent>;
/**
* Each bucket contains a doubly linked list of
* #CoarseTimerEvent instances scheduled for one #RESOLUTION.
*
* Timers scheduled far into the future (more than #SPAN) may
* also sit in between, so anybody walking those lists should
* check the due time.
*/
std::array<List, N_BUCKETS> buckets;
/**
* The last time Run() was invoked. This is needed to
* determine the range of buckets to be checked, because we
* can't rely on getting a caller for every bucket; there may
* be arbitrary delays.
*/
Event::TimePoint last_time{};
public:
TimerWheel() noexcept;
~TimerWheel() noexcept;
bool IsEmpty() const noexcept {
return std::all_of(buckets.begin(), buckets.end(),
[](const auto &list){
return list.empty();
});
}
void Insert(CoarseTimerEvent &t) noexcept;
/**
* Invoke all expired #CoarseTimerEvent instances and return
* the duration until the next timer expires. Returns a
* negative duration if there is no timeout.
*/
Event::Duration Run(Event::TimePoint now) noexcept;
private:
static constexpr std::size_t NextBucketIndex(std::size_t i) noexcept {
return (i + 1) % N_BUCKETS;
}
static constexpr std::size_t BucketIndexAt(Event::TimePoint t) noexcept {
return std::size_t(t.time_since_epoch() / RESOLUTION)
% N_BUCKETS;
}
static constexpr Event::TimePoint GetBucketStartTime(Event::TimePoint t) noexcept {
return t - t.time_since_epoch() % RESOLUTION;
}
/**
* What is the end time of the next non-empty bucket?
*
* @param bucket_index start searching at this bucket index
* @return the bucket end time or max() if the wheel is empty
*/
[[gnu::pure]]
Event::TimePoint GetNextDue(std::size_t bucket_index,
Event::TimePoint bucket_start_time) const noexcept;
[[gnu::pure]]
Event::Duration GetSleep(Event::TimePoint now) const noexcept;
/**
* Run all due timers in this bucket.
*/
static void Run(List &list, Event::TimePoint now) noexcept;
};

View File

@ -22,7 +22,9 @@ endif
event = static_library( event = static_library(
'event', 'event',
'SignalMonitor.cxx', 'SignalMonitor.cxx',
'TimerWheel.cxx',
'TimerList.cxx', 'TimerList.cxx',
'CoarseTimerEvent.cxx',
'FineTimerEvent.cxx', 'FineTimerEvent.cxx',
'IdleEvent.cxx', 'IdleEvent.cxx',
'InjectEvent.cxx', 'InjectEvent.cxx',