diff --git a/src/input/AsyncInputStream.cxx b/src/input/AsyncInputStream.cxx index f351bbc38..4c80e8e57 100644 --- a/src/input/AsyncInputStream.cxx +++ b/src/input/AsyncInputStream.cxx @@ -31,7 +31,9 @@ AsyncInputStream::AsyncInputStream(const char *_url, Mutex &_mutex, Cond &_cond, size_t _buffer_size, size_t _resume_at) - :InputStream(_url, _mutex, _cond), DeferredMonitor(io_thread_get()), + :InputStream(_url, _mutex, _cond), + deferred_resume(io_thread_get(), BIND_THIS_METHOD(DeferredResume)), + deferred_seek(io_thread_get(), BIND_THIS_METHOD(DeferredSeek)), allocation(_buffer_size), buffer((uint8_t *)allocation.get(), _buffer_size), resume_at(_resume_at), @@ -141,7 +143,7 @@ AsyncInputStream::Seek(offset_type new_offset, Error &error) seek_offset = new_offset; seek_state = SeekState::SCHEDULED; - DeferredMonitor::Schedule(); + deferred_seek.Schedule(); while (seek_state != SeekState::NONE) cond.wait(mutex); @@ -208,7 +210,7 @@ AsyncInputStream::Read(void *ptr, size_t read_size, Error &error) offset += (offset_type)nbytes; if (paused && buffer.GetSize() < resume_at) - DeferredMonitor::Schedule(); + deferred_resume.Schedule(); return nbytes; } @@ -240,16 +242,24 @@ AsyncInputStream::AppendToBuffer(const void *data, size_t append_size) } void -AsyncInputStream::RunDeferred() +AsyncInputStream::DeferredResume() { const ScopeLock protect(mutex); Resume(); - - if (seek_state == SeekState::SCHEDULED) { - seek_state = SeekState::PENDING; - buffer.Clear(); - paused = false; - DoSeek(seek_offset); - } +} + +void +AsyncInputStream::DeferredSeek() +{ + const ScopeLock protect(mutex); + if (seek_state != SeekState::SCHEDULED) + return; + + Resume(); + + seek_state = SeekState::PENDING; + buffer.Clear(); + paused = false; + DoSeek(seek_offset); } diff --git a/src/input/AsyncInputStream.hxx b/src/input/AsyncInputStream.hxx index 5f229ee81..d4107e627 100644 --- a/src/input/AsyncInputStream.hxx +++ b/src/input/AsyncInputStream.hxx @@ -21,7 +21,7 @@ #define MPD_ASYNC_INPUT_STREAM_HXX #include "InputStream.hxx" -#include "event/DeferredMonitor.hxx" +#include "event/DeferredCall.hxx" #include "util/HugeAllocator.hxx" #include "util/CircularBuffer.hxx" #include "util/Error.hxx" @@ -32,11 +32,14 @@ * buffer, and that buffer is then consumed by another thread using * the regular #InputStream API. */ -class AsyncInputStream : public InputStream, private DeferredMonitor { +class AsyncInputStream : public InputStream { enum class SeekState : uint8_t { NONE, SCHEDULED, PENDING }; + DeferredCall deferred_resume; + DeferredCall deferred_seek; + HugeAllocation allocation; CircularBuffer buffer; @@ -162,8 +165,9 @@ protected: private: void Resume(); - /* virtual methods from DeferredMonitor */ - void RunDeferred() final; + /* for DeferredCall */ + void DeferredResume(); + void DeferredSeek(); }; #endif