diff --git a/src/input/ProxyInputStream.cxx b/src/input/ProxyInputStream.cxx index efa648d5e..86d2e82aa 100644 --- a/src/input/ProxyInputStream.cxx +++ b/src/input/ProxyInputStream.cxx @@ -20,6 +20,9 @@ #include "config.h" #include "ProxyInputStream.hxx" #include "tag/Tag.hxx" +#include "thread/Cond.hxx" + +#include ProxyInputStream::ProxyInputStream(InputStreamPtr _input) noexcept :InputStream(_input->GetURI(), _input->mutex, _input->cond), @@ -30,9 +33,24 @@ ProxyInputStream::ProxyInputStream(InputStreamPtr _input) noexcept ProxyInputStream::~ProxyInputStream() noexcept = default; +void +ProxyInputStream::SetInput(InputStreamPtr _input) noexcept +{ + assert(!input); + assert(_input); + + input = std::move(_input); + + /* this call wakes up client threads if the new input is + ready */ + CopyAttributes(); +} + void ProxyInputStream::CopyAttributes() { + assert(input); + if (input->IsReady()) { if (!IsReady()) { if (input->HasMimeType()) @@ -53,12 +71,16 @@ ProxyInputStream::CopyAttributes() void ProxyInputStream::Check() { - input->Check(); + if (input) + input->Check(); } void ProxyInputStream::Update() noexcept { + if (!input) + return; + input->Update(); CopyAttributes(); } @@ -66,6 +88,9 @@ ProxyInputStream::Update() noexcept void ProxyInputStream::Seek(offset_type new_offset) { + while (!input) + cond.wait(mutex); + input->Seek(new_offset); CopyAttributes(); } @@ -73,24 +98,30 @@ ProxyInputStream::Seek(offset_type new_offset) bool ProxyInputStream::IsEOF() noexcept { - return input->IsEOF(); + return input && input->IsEOF(); } std::unique_ptr ProxyInputStream::ReadTag() { + if (!input) + return nullptr; + return input->ReadTag(); } bool ProxyInputStream::IsAvailable() noexcept { - return input->IsAvailable(); + return input && input->IsAvailable(); } size_t ProxyInputStream::Read(void *ptr, size_t read_size) { + while (!input) + cond.wait(mutex); + size_t nbytes = input->Read(ptr, read_size); CopyAttributes(); return nbytes; diff --git a/src/input/ProxyInputStream.hxx b/src/input/ProxyInputStream.hxx index 6f768ad47..e55330a64 100644 --- a/src/input/ProxyInputStream.hxx +++ b/src/input/ProxyInputStream.hxx @@ -29,15 +29,25 @@ struct Tag; * An #InputStream that forwards all methods call to another * #InputStream instance. This can be used as a base class to * override selected methods. + * + * The inner #InputStream instance may be nullptr initially, to be set + * later. */ class ProxyInputStream : public InputStream { protected: InputStreamPtr input; public: - gcc_nonnull_all explicit ProxyInputStream(InputStreamPtr _input) noexcept; + /** + * Construct an instance without an #InputStream instance. + * Once that instance becomes available, call SetInput(). + */ + ProxyInputStream(const char *_uri, + Mutex &_mutex, Cond &_cond) noexcept + :InputStream(_uri, _mutex, _cond) {} + virtual ~ProxyInputStream() noexcept; ProxyInputStream(const ProxyInputStream &) = delete; @@ -53,6 +63,14 @@ public: size_t Read(void *ptr, size_t read_size) override; protected: + /** + * If this instance was initialized without an input, this + * method can set it. + * + * Caller must lock the mutex. + */ + void SetInput(InputStreamPtr _input) noexcept; + /** * Copy public attributes from the underlying input stream to the * "rewind" input stream. This function is called when a method of