From 555a4d738c84b5076652e74c642a29025261e644 Mon Sep 17 00:00:00 2001 From: Max Kellermann Date: Fri, 17 May 2019 10:52:39 +0200 Subject: [PATCH] input/buffering: pass offset to Read() and eliminate Seek() Another step towards supporting multiple readers. --- src/input/BufferedInputStream.cxx | 9 ++- src/input/BufferingInputStream.cxx | 101 +++++++++-------------------- src/input/BufferingInputStream.hxx | 12 ++-- 3 files changed, 38 insertions(+), 84 deletions(-) diff --git a/src/input/BufferedInputStream.cxx b/src/input/BufferedInputStream.cxx index 55eb07b5d..ae99a2913 100644 --- a/src/input/BufferedInputStream.cxx +++ b/src/input/BufferedInputStream.cxx @@ -46,11 +46,10 @@ BufferedInputStream::Check() } void -BufferedInputStream::Seek(std::unique_lock &lock, +BufferedInputStream::Seek(std::unique_lock &, offset_type new_offset) { - BufferingInputStream::Seek(lock, new_offset); - InputStream::offset = new_offset; + offset = new_offset; } bool @@ -62,14 +61,14 @@ BufferedInputStream::IsEOF() noexcept bool BufferedInputStream::IsAvailable() noexcept { - return BufferingInputStream::IsAvailable(); + return BufferingInputStream::IsAvailable(offset); } size_t BufferedInputStream::Read(std::unique_lock &lock, void *ptr, size_t s) { - size_t nbytes = BufferingInputStream::Read(lock, ptr, s); + size_t nbytes = BufferingInputStream::Read(lock, offset, ptr, s); InputStream::offset += nbytes; return nbytes; } diff --git a/src/input/BufferingInputStream.cxx b/src/input/BufferingInputStream.cxx index 220d0f5ff..2d36782e4 100644 --- a/src/input/BufferingInputStream.cxx +++ b/src/input/BufferingInputStream.cxx @@ -55,42 +55,25 @@ BufferingInputStream::Check() input->Check(); } -void -BufferingInputStream::Seek(std::unique_lock &lock, size_t new_offset) -{ - if (new_offset >= size()) { - offset = new_offset; - return; - } - - auto r = buffer.Read(new_offset); - if (r.HasData()) { - /* nice, we already have some data at the desired - offset and this method call is a no-op */ - offset = new_offset; - return; - } - - seek_offset = new_offset; - seek = true; - wake_cond.notify_one(); - - client_cond.wait(lock, [this]{ return !seek; }); - - if (seek_error) - std::rethrow_exception(std::exchange(seek_error, {})); - - offset = new_offset; -} - bool -BufferingInputStream::IsAvailable() noexcept +BufferingInputStream::IsAvailable(size_t offset) noexcept { - return offset == size() || buffer.Read(offset).HasData(); + if (offset >= size()) + return true; + + if (buffer.Read(offset).HasData()) + return true; + + /* if no data is available now, make sure it will be soon */ + if (want_offset == INVALID_OFFSET) + want_offset = offset; + + return false; } size_t -BufferingInputStream::Read(std::unique_lock &lock, void *ptr, size_t s) +BufferingInputStream::Read(std::unique_lock &lock, size_t offset, + void *ptr, size_t s) { if (offset >= size()) return 0; @@ -101,13 +84,15 @@ BufferingInputStream::Read(std::unique_lock &lock, void *ptr, size_t s) /* yay, we have some data */ size_t nbytes = std::min(s, r.defined_buffer.size); memcpy(ptr, r.defined_buffer.data, nbytes); - offset += nbytes; return nbytes; } if (error) std::rethrow_exception(error); + if (want_offset == INVALID_OFFSET) + want_offset = offset; + client_cond.wait(lock); } } @@ -132,27 +117,13 @@ inline void BufferingInputStream::RunThreadLocked(std::unique_lock &lock) { while (!stop) { - if (seek) { - try { + if (want_offset != INVALID_OFFSET) { + assert(want_offset < size()); + + const size_t seek_offset = want_offset; + want_offset = INVALID_OFFSET; + if (!buffer.Read(seek_offset).HasData()) input->Seek(lock, seek_offset); - } catch (...) { - seek_error = std::current_exception(); - } - - seek = false; - client_cond.notify_all(); - } else if (offset != input->GetOffset() && !IsAvailable()) { - /* a past Seek() call was a no-op because data - was already available at that position, but - now we've reached a new position where - there is no more data in the buffer, and - our input is reading somewhere else (maybe - stuck at the end of the file); to find a - way out, we now seek our input to our - reading position to be able to fill our - buffer */ - - input->Seek(lock, offset); } else if (input->IsEOF()) { /* our input has reached its end: prepare reading the first remaining hole */ @@ -170,27 +141,13 @@ BufferingInputStream::RunThreadLocked(std::unique_lock &lock) auto w = buffer.Write(read_offset); if (w.empty()) { - if (IsAvailable()) { - /* we still have enough data - for the next Read() - seek - to the first hole */ + size_t new_offset = FindFirstHole(); + if (new_offset == INVALID_OFFSET) + /* the file has been read + completely */ + break; - size_t new_offset = FindFirstHole(); - if (new_offset == INVALID_OFFSET) - /* the file has been - read completely */ - break; - - input->Seek(lock, new_offset); - } else { - /* we need more data at our - current position, because - the next Read() will stall - - seek our input to our - offset to prepare filling - the buffer from there */ - input->Seek(lock, offset); - } + input->Seek(lock, new_offset); continue; } diff --git a/src/input/BufferingInputStream.hxx b/src/input/BufferingInputStream.hxx index 061324142..15f6ef68e 100644 --- a/src/input/BufferingInputStream.hxx +++ b/src/input/BufferingInputStream.hxx @@ -55,11 +55,9 @@ class BufferingInputStream : InputStreamHandler { SparseBuffer buffer; - bool stop = false, seek = false; + bool stop = false; - size_t offset = 0; - - size_t seek_offset; + size_t want_offset = INVALID_OFFSET; std::exception_ptr error, seek_error; @@ -78,9 +76,9 @@ public: } void Check(); - void Seek(std::unique_lock &lock, size_t new_offset); - bool IsAvailable() noexcept; - size_t Read(std::unique_lock &lock, void *ptr, size_t size); + bool IsAvailable(size_t offset) noexcept; + size_t Read(std::unique_lock &lock, size_t offset, + void *ptr, size_t size); protected: virtual void OnBufferAvailable() noexcept {}