input/MaybeBuffered: proxy InputStream implementation which auto-uses BufferedInputStream

This commit is contained in:
Max Kellermann
2018-06-21 22:19:46 +02:00
parent 12f2418445
commit 6681b14b71
4 changed files with 98 additions and 6 deletions

View File

@@ -32,6 +32,8 @@ BufferedInputStream::BufferedInputStream(InputStreamPtr _input)
{
assert(IsEligible(*input));
input->SetHandler(this);
if (input->HasMimeType())
SetMimeType(input->GetMimeType());
@@ -66,10 +68,12 @@ void
BufferedInputStream::Seek(offset_type new_offset)
{
auto r = buffer.Read(new_offset);
if (r.HasData())
if (r.HasData()) {
/* nice, we already have some data at the desired
offset and this method call is a no-op */
offset = new_offset;
return;
}
seek_offset = new_offset;
seek = true;
@@ -80,6 +84,8 @@ BufferedInputStream::Seek(offset_type new_offset)
if (seek_error)
std::rethrow_exception(std::exchange(seek_error, {}));
offset = input->GetOffset();
}
bool
@@ -101,17 +107,26 @@ BufferedInputStream::Read(void *ptr, size_t s)
return 0;
while (true) {
assert(size == buffer.size());
auto r = buffer.Read(offset);
if (r.HasData()) {
/* yay, we have some data */
size_t nbytes = std::min(s, r.defined_buffer.size);
memcpy(ptr, r.defined_buffer.data, nbytes);
offset += nbytes;
if (!IsAvailable()) {
/* wake up the sleeping thread */
idle = false;
wake_cond.signal();
}
return nbytes;
}
if (read_error) {
wake_cond.broadcast();
wake_cond.signal();
std::rethrow_exception(std::exchange(read_error, {}));
}
@@ -133,6 +148,8 @@ BufferedInputStream::RunThread() noexcept
const std::lock_guard<Mutex> lock(mutex);
while (!stop) {
assert(size == buffer.size());
if (seek) {
try {
input->Seek(seek_offset);
@@ -140,8 +157,6 @@ BufferedInputStream::RunThread() noexcept
seek_error = std::current_exception();
}
offset = input->GetOffset();
idle = false;
seek = false;
client_cond.signal();
@@ -151,8 +166,7 @@ BufferedInputStream::RunThread() noexcept
auto w = buffer.Write(read_offset);
if (w.empty()) {
auto r = buffer.Read(offset);
if (r.HasData()) {
if (IsAvailable()) {
/* we still have enough data
for the next Read() - sleep
until we need more data */