input/thread: implement size and seek

This commit is contained in:
Max Kellermann 2024-07-29 22:01:51 +02:00
parent dc51015c75
commit ec30716e01
2 changed files with 58 additions and 3 deletions

View File

@ -46,6 +46,14 @@ ThreadInputStream::Start()
thread.Start(); thread.Start();
} }
void
ThreadInputStream::ThreadSeek([[maybe_unused]] offset_type new_offset)
{
assert(!IsSeekable());
throw std::runtime_error{"Not seekable"};
}
inline void inline void
ThreadInputStream::ThreadFunc() noexcept ThreadInputStream::ThreadFunc() noexcept
{ {
@ -67,6 +75,24 @@ ThreadInputStream::ThreadFunc() noexcept
while (!close) { while (!close) {
assert(!postponed_exception); assert(!postponed_exception);
if (IsSeeking()) {
const auto seek_offset_copy = offset = seek_offset;
seek_offset = UNKNOWN_SIZE;
eof = false;
buffer.Clear();
try {
const ScopeUnlock unlock(mutex);
ThreadSeek(seek_offset_copy);
} catch (...) {
postponed_exception = std::current_exception();
InvokeOnAvailable();
break;
}
offset = seek_offset_copy;
}
auto w = buffer.Write(); auto w = buffer.Write();
if (w.empty()) { if (w.empty()) {
wake_cond.wait(lock); wake_cond.wait(lock);
@ -115,6 +141,14 @@ ThreadInputStream::IsAvailable() const noexcept
return !IsEOF() || postponed_exception; return !IsEOF() || postponed_exception;
} }
void
ThreadInputStream::Seek([[maybe_unused]] std::unique_lock<Mutex> &lock,
offset_type new_offset)
{
seek_offset = new_offset;
wake_cond.notify_one();
}
size_t size_t
ThreadInputStream::Read(std::unique_lock<Mutex> &lock, ThreadInputStream::Read(std::unique_lock<Mutex> &lock,
std::span<std::byte> dest) std::span<std::byte> dest)
@ -125,6 +159,11 @@ ThreadInputStream::Read(std::unique_lock<Mutex> &lock,
if (postponed_exception) if (postponed_exception)
std::rethrow_exception(postponed_exception); std::rethrow_exception(postponed_exception);
if (IsSeeking()) {
caller_cond.wait(lock);
continue;
}
auto r = buffer.Read(); auto r = buffer.Read();
if (!r.empty()) { if (!r.empty()) {
size_t nbytes = std::min(dest.size(), r.size()); size_t nbytes = std::min(dest.size(), r.size());
@ -147,5 +186,5 @@ ThreadInputStream::IsEOF() const noexcept
{ {
assert(!thread.IsInside()); assert(!thread.IsInside());
return eof && buffer.empty(); return eof && buffer.empty() && !IsSeeking();
} }

View File

@ -20,8 +20,6 @@
* another thread using the regular #InputStream API. This class * another thread using the regular #InputStream API. This class
* manages the thread and the buffer. * manages the thread and the buffer.
* *
* This works only for "streams": unknown length, no seeking, no tags.
*
* The implementation must call Stop() before its destruction * The implementation must call Stop() before its destruction
* completes. This cannot be done in ~ThreadInputStream() because at * completes. This cannot be done in ~ThreadInputStream() because at
* this point, the class has been morphed back to #ThreadInputStream * this point, the class has been morphed back to #ThreadInputStream
@ -50,6 +48,8 @@ class ThreadInputStream : public InputStream {
CircularBuffer<std::byte> buffer{allocation}; CircularBuffer<std::byte> buffer{allocation};
offset_type seek_offset = UNKNOWN_SIZE;
/** /**
* Shall the stream be closed? * Shall the stream be closed?
*/ */
@ -81,6 +81,8 @@ public:
void Check() final; void Check() final;
bool IsEOF() const noexcept final; bool IsEOF() const noexcept final;
bool IsAvailable() const noexcept final; bool IsAvailable() const noexcept final;
void Seek(std::unique_lock<Mutex> &lock,
offset_type new_offset) final;
size_t Read(std::unique_lock<Mutex> &lock, size_t Read(std::unique_lock<Mutex> &lock,
std::span<std::byte> dest) override final; std::span<std::byte> dest) override final;
@ -123,6 +125,16 @@ protected:
*/ */
virtual std::size_t ThreadRead(std::span<std::byte> dest) = 0; virtual std::size_t ThreadRead(std::span<std::byte> dest) = 0;
/**
* The actual Seek() implementation. This virtual method will
* be called from within the thread.
*
* The #InputStream is not locked.
*
* Throws on error.
*/
virtual void ThreadSeek(offset_type new_offset);
/** /**
* Optional deinitialization before leaving the thread. * Optional deinitialization before leaving the thread.
* *
@ -139,5 +151,9 @@ protected:
virtual void Cancel() noexcept {} virtual void Cancel() noexcept {}
private: private:
bool IsSeeking() const noexcept {
return seek_offset != UNKNOWN_SIZE;
}
void ThreadFunc() noexcept; void ThreadFunc() noexcept;
}; };