diff --git a/src/input/AsyncInputStream.cxx b/src/input/AsyncInputStream.cxx index 7b036d3f8..10408fbf8 100644 --- a/src/input/AsyncInputStream.cxx +++ b/src/input/AsyncInputStream.cxx @@ -159,26 +159,12 @@ AsyncInputStream::IsAvailable() const noexcept !buffer.empty(); } -size_t -AsyncInputStream::Read(std::unique_lock &lock, - std::span dest) +inline std::size_t +AsyncInputStream::ReadFromBuffer(std::span dest) noexcept { - assert(!GetEventLoop().IsInside()); - - /* wait for data */ - CircularBuffer::Range r; - while (true) { - Check(); - - r = buffer.Read(); - if (!r.empty()) - break; - - if (IsEOF()) - return 0; - - caller_cond.wait(lock); - } + const auto r = buffer.Read(); + if (r.empty()) + return 0; const size_t nbytes = std::min(dest.size(), r.size()); memcpy(dest.data(), r.data(), nbytes); @@ -191,13 +177,33 @@ AsyncInputStream::Read(std::unique_lock &lock, buffer.Clear(); offset += (offset_type)nbytes; - - if (paused && buffer.GetSize() < resume_at) - deferred_resume.Schedule(); - return nbytes; } +size_t +AsyncInputStream::Read(std::unique_lock &lock, + std::span dest) +{ + assert(!GetEventLoop().IsInside()); + + /* wait for data */ + while (true) { + Check(); + + if (std::size_t nbytes = ReadFromBuffer(dest); nbytes > 0) { + if (paused && buffer.GetSize() < resume_at) + deferred_resume.Schedule(); + + return nbytes; + } + + if (IsEOF()) + return 0; + + caller_cond.wait(lock); + } +} + void AsyncInputStream::CommitWriteBuffer(size_t nbytes) noexcept { diff --git a/src/input/AsyncInputStream.hxx b/src/input/AsyncInputStream.hxx index b767005e6..4135efce8 100644 --- a/src/input/AsyncInputStream.hxx +++ b/src/input/AsyncInputStream.hxx @@ -152,6 +152,7 @@ protected: void SeekDone() noexcept; private: + std::size_t ReadFromBuffer(std::span dest) noexcept; void Resume(); /* for InjectEvent */ diff --git a/src/input/ThreadInputStream.cxx b/src/input/ThreadInputStream.cxx index e72fd17a5..eedcdd183 100644 --- a/src/input/ThreadInputStream.cxx +++ b/src/input/ThreadInputStream.cxx @@ -149,6 +149,27 @@ ThreadInputStream::Seek([[maybe_unused]] std::unique_lock &lock, wake_cond.notify_one(); } +inline std::size_t +ThreadInputStream::ReadFromBuffer(std::span dest) noexcept +{ + const auto r = buffer.Read(); + if (r.empty()) + return 0; + + const size_t nbytes = std::min(dest.size(), r.size()); + memcpy(dest.data(), r.data(), nbytes); + buffer.Consume(nbytes); + + if (buffer.empty()) + /* when the buffer becomes empty, reset its head and + tail so the next write can fill the whole buffer + and not just the part after the tail */ + buffer.Clear(); + + offset += (offset_type)nbytes; + return nbytes; +} + size_t ThreadInputStream::Read(std::unique_lock &lock, std::span dest) @@ -164,21 +185,8 @@ ThreadInputStream::Read(std::unique_lock &lock, continue; } - auto r = buffer.Read(); - if (!r.empty()) { - size_t nbytes = std::min(dest.size(), r.size()); - memcpy(dest.data(), r.data(), nbytes); - buffer.Consume(nbytes); - - if (buffer.empty()) - /* when the buffer becomes empty, - reset its head and tail so the next - write can fill the whole buffer and - not just the part after the tail */ - buffer.Clear(); - + if (std::size_t nbytes = ReadFromBuffer(dest); nbytes > 0) { wake_cond.notify_one(); - offset += nbytes; return nbytes; } diff --git a/src/input/ThreadInputStream.hxx b/src/input/ThreadInputStream.hxx index e76bd6c0d..f74cd1b00 100644 --- a/src/input/ThreadInputStream.hxx +++ b/src/input/ThreadInputStream.hxx @@ -151,6 +151,8 @@ protected: virtual void Cancel() noexcept {} private: + std::size_t ReadFromBuffer(std::span dest) noexcept; + bool IsSeeking() const noexcept { return seek_offset != UNKNOWN_SIZE; }