input/Stream: remove attribute "cond", replace with handler interface

This adds a bit of overhead, but also adds flexibility to the API,
because arbitrary triggers may be invoked from that virtual method
implementation, not just Cond::signal().

The motivation for this is to make the handlers more dynamic, for the
upcoming buffering class utilizing ProxyInputStream.
This commit is contained in:
Max Kellermann
2018-06-22 19:37:18 +02:00
parent 01d8eb6290
commit d0fbf6db59
66 changed files with 403 additions and 280 deletions

View File

@@ -19,6 +19,7 @@
#include "config.h"
#include "AsyncInputStream.hxx"
#include "CondHandler.hxx"
#include "tag/Tag.hxx"
#include "thread/Cond.hxx"
#include "event/Loop.hxx"
@@ -29,10 +30,10 @@
#include <string.h>
AsyncInputStream::AsyncInputStream(EventLoop &event_loop, const char *_url,
Mutex &_mutex, Cond &_cond,
Mutex &_mutex,
size_t _buffer_size,
size_t _resume_at)
:InputStream(_url, _mutex, _cond),
:InputStream(_url, _mutex),
deferred_resume(event_loop, BIND_THIS_METHOD(DeferredResume)),
deferred_seek(event_loop, BIND_THIS_METHOD(DeferredSeek)),
allocation(_buffer_size),
@@ -133,8 +134,10 @@ AsyncInputStream::Seek(offset_type new_offset)
deferred_seek.Schedule();
CondInputStreamHandler cond_handler;
const ScopeExchangeInputStreamHandler h(*this, &cond_handler);
while (seek_state != SeekState::NONE)
cond.wait(mutex);
cond_handler.cond.wait(mutex);
Check();
}
@@ -151,7 +154,7 @@ AsyncInputStream::SeekDone() noexcept
open = true;
seek_state = SeekState::NONE;
cond.broadcast();
InvokeOnAvailable();
}
std::unique_ptr<Tag>
@@ -173,6 +176,8 @@ AsyncInputStream::Read(void *ptr, size_t read_size)
{
assert(!GetEventLoop().IsInside());
CondInputStreamHandler cond_handler;
/* wait for data */
CircularBuffer<uint8_t>::Range r;
while (true) {
@@ -182,7 +187,8 @@ AsyncInputStream::Read(void *ptr, size_t read_size)
if (!r.empty() || IsEOF())
break;
cond.wait(mutex);
const ScopeExchangeInputStreamHandler h(*this, &cond_handler);
cond_handler.cond.wait(mutex);
}
const size_t nbytes = std::min(read_size, r.size);
@@ -205,7 +211,7 @@ AsyncInputStream::CommitWriteBuffer(size_t nbytes) noexcept
if (!IsReady())
SetReady();
else
cond.broadcast();
InvokeOnAvailable();
}
void
@@ -231,7 +237,7 @@ AsyncInputStream::AppendToBuffer(const void *data, size_t append_size) noexcept
if (!IsReady())
SetReady();
else
cond.broadcast();
InvokeOnAvailable();
}
void
@@ -243,7 +249,7 @@ AsyncInputStream::DeferredResume() noexcept
Resume();
} catch (...) {
postponed_exception = std::current_exception();
cond.broadcast();
InvokeOnAvailable();
}
}
@@ -265,6 +271,6 @@ AsyncInputStream::DeferredSeek() noexcept
} catch (...) {
seek_state = SeekState::NONE;
postponed_exception = std::current_exception();
cond.broadcast();
InvokeOnAvailable();
}
}