diff --git a/src/input/ThreadInputStream.cxx b/src/input/ThreadInputStream.cxx index 9c159f016..8487acf41 100644 --- a/src/input/ThreadInputStream.cxx +++ b/src/input/ThreadInputStream.cxx @@ -46,6 +46,14 @@ ThreadInputStream::Start() thread.Start(); } +void +ThreadInputStream::ThreadSeek([[maybe_unused]] offset_type new_offset) +{ + assert(!IsSeekable()); + + throw std::runtime_error{"Not seekable"}; +} + inline void ThreadInputStream::ThreadFunc() noexcept { @@ -67,6 +75,24 @@ ThreadInputStream::ThreadFunc() noexcept while (!close) { assert(!postponed_exception); + if (IsSeeking()) { + const auto seek_offset_copy = offset = seek_offset; + seek_offset = UNKNOWN_SIZE; + eof = false; + buffer.Clear(); + + try { + const ScopeUnlock unlock(mutex); + ThreadSeek(seek_offset_copy); + } catch (...) { + postponed_exception = std::current_exception(); + InvokeOnAvailable(); + break; + } + + offset = seek_offset_copy; + } + auto w = buffer.Write(); if (w.empty()) { wake_cond.wait(lock); @@ -115,6 +141,14 @@ ThreadInputStream::IsAvailable() const noexcept return !IsEOF() || postponed_exception; } +void +ThreadInputStream::Seek([[maybe_unused]] std::unique_lock &lock, + offset_type new_offset) +{ + seek_offset = new_offset; + wake_cond.notify_one(); +} + size_t ThreadInputStream::Read(std::unique_lock &lock, std::span dest) @@ -125,6 +159,11 @@ ThreadInputStream::Read(std::unique_lock &lock, if (postponed_exception) std::rethrow_exception(postponed_exception); + if (IsSeeking()) { + caller_cond.wait(lock); + continue; + } + auto r = buffer.Read(); if (!r.empty()) { size_t nbytes = std::min(dest.size(), r.size()); @@ -147,5 +186,5 @@ ThreadInputStream::IsEOF() const noexcept { assert(!thread.IsInside()); - return eof && buffer.empty(); + return eof && buffer.empty() && !IsSeeking(); } diff --git a/src/input/ThreadInputStream.hxx b/src/input/ThreadInputStream.hxx index 44f7d8da3..e76bd6c0d 100644 --- a/src/input/ThreadInputStream.hxx +++ b/src/input/ThreadInputStream.hxx @@ -20,8 +20,6 @@ * another thread using the regular #InputStream API. This class * manages the thread and the buffer. * - * This works only for "streams": unknown length, no seeking, no tags. - * * The implementation must call Stop() before its destruction * completes. This cannot be done in ~ThreadInputStream() because at * this point, the class has been morphed back to #ThreadInputStream @@ -50,6 +48,8 @@ class ThreadInputStream : public InputStream { CircularBuffer buffer{allocation}; + offset_type seek_offset = UNKNOWN_SIZE; + /** * Shall the stream be closed? */ @@ -81,6 +81,8 @@ public: void Check() final; bool IsEOF() const noexcept final; bool IsAvailable() const noexcept final; + void Seek(std::unique_lock &lock, + offset_type new_offset) final; size_t Read(std::unique_lock &lock, std::span dest) override final; @@ -123,6 +125,16 @@ protected: */ virtual std::size_t ThreadRead(std::span dest) = 0; + /** + * The actual Seek() implementation. This virtual method will + * be called from within the thread. + * + * The #InputStream is not locked. + * + * Throws on error. + */ + virtual void ThreadSeek(offset_type new_offset); + /** * Optional deinitialization before leaving the thread. * @@ -139,5 +151,9 @@ protected: virtual void Cancel() noexcept {} private: + bool IsSeeking() const noexcept { + return seek_offset != UNKNOWN_SIZE; + } + void ThreadFunc() noexcept; };