input/async: use class DeferredCall

This commit is contained in:
Max Kellermann 2016-06-17 18:30:45 +02:00
parent 829616534e
commit 5d11759f7d
2 changed files with 29 additions and 15 deletions

View File

@ -31,7 +31,9 @@ AsyncInputStream::AsyncInputStream(const char *_url,
Mutex &_mutex, Cond &_cond, Mutex &_mutex, Cond &_cond,
size_t _buffer_size, size_t _buffer_size,
size_t _resume_at) size_t _resume_at)
:InputStream(_url, _mutex, _cond), DeferredMonitor(io_thread_get()), :InputStream(_url, _mutex, _cond),
deferred_resume(io_thread_get(), BIND_THIS_METHOD(DeferredResume)),
deferred_seek(io_thread_get(), BIND_THIS_METHOD(DeferredSeek)),
allocation(_buffer_size), allocation(_buffer_size),
buffer((uint8_t *)allocation.get(), _buffer_size), buffer((uint8_t *)allocation.get(), _buffer_size),
resume_at(_resume_at), resume_at(_resume_at),
@ -141,7 +143,7 @@ AsyncInputStream::Seek(offset_type new_offset, Error &error)
seek_offset = new_offset; seek_offset = new_offset;
seek_state = SeekState::SCHEDULED; seek_state = SeekState::SCHEDULED;
DeferredMonitor::Schedule(); deferred_seek.Schedule();
while (seek_state != SeekState::NONE) while (seek_state != SeekState::NONE)
cond.wait(mutex); cond.wait(mutex);
@ -208,7 +210,7 @@ AsyncInputStream::Read(void *ptr, size_t read_size, Error &error)
offset += (offset_type)nbytes; offset += (offset_type)nbytes;
if (paused && buffer.GetSize() < resume_at) if (paused && buffer.GetSize() < resume_at)
DeferredMonitor::Schedule(); deferred_resume.Schedule();
return nbytes; return nbytes;
} }
@ -240,16 +242,24 @@ AsyncInputStream::AppendToBuffer(const void *data, size_t append_size)
} }
void void
AsyncInputStream::RunDeferred() AsyncInputStream::DeferredResume()
{ {
const ScopeLock protect(mutex); const ScopeLock protect(mutex);
Resume(); Resume();
}
if (seek_state == SeekState::SCHEDULED) {
seek_state = SeekState::PENDING; void
buffer.Clear(); AsyncInputStream::DeferredSeek()
paused = false; {
DoSeek(seek_offset); const ScopeLock protect(mutex);
} if (seek_state != SeekState::SCHEDULED)
return;
Resume();
seek_state = SeekState::PENDING;
buffer.Clear();
paused = false;
DoSeek(seek_offset);
} }

View File

@ -21,7 +21,7 @@
#define MPD_ASYNC_INPUT_STREAM_HXX #define MPD_ASYNC_INPUT_STREAM_HXX
#include "InputStream.hxx" #include "InputStream.hxx"
#include "event/DeferredMonitor.hxx" #include "event/DeferredCall.hxx"
#include "util/HugeAllocator.hxx" #include "util/HugeAllocator.hxx"
#include "util/CircularBuffer.hxx" #include "util/CircularBuffer.hxx"
#include "util/Error.hxx" #include "util/Error.hxx"
@ -32,11 +32,14 @@
* buffer, and that buffer is then consumed by another thread using * buffer, and that buffer is then consumed by another thread using
* the regular #InputStream API. * the regular #InputStream API.
*/ */
class AsyncInputStream : public InputStream, private DeferredMonitor { class AsyncInputStream : public InputStream {
enum class SeekState : uint8_t { enum class SeekState : uint8_t {
NONE, SCHEDULED, PENDING NONE, SCHEDULED, PENDING
}; };
DeferredCall deferred_resume;
DeferredCall deferred_seek;
HugeAllocation allocation; HugeAllocation allocation;
CircularBuffer<uint8_t> buffer; CircularBuffer<uint8_t> buffer;
@ -162,8 +165,9 @@ protected:
private: private:
void Resume(); void Resume();
/* virtual methods from DeferredMonitor */ /* for DeferredCall */
void RunDeferred() final; void DeferredResume();
void DeferredSeek();
}; };
#endif #endif