diff --git a/src/input/ThreadInputStream.cxx b/src/input/ThreadInputStream.cxx index 8b03373ef..a637102e7 100644 --- a/src/input/ThreadInputStream.cxx +++ b/src/input/ThreadInputStream.cxx @@ -64,7 +64,10 @@ ThreadInputStream::ThreadFunc() const ScopeLock lock(mutex); - if (!Open(postponed_error)) { + try { + Open(); + } catch (...) { + postponed_exception = std::current_exception(); cond.broadcast(); return; } @@ -73,25 +76,27 @@ ThreadInputStream::ThreadFunc() SetReady(); while (!close) { - assert(!postponed_error.IsDefined()); + assert(!postponed_exception); auto w = buffer->Write(); if (w.IsEmpty()) { wake_cond.wait(mutex); } else { - Error error; size_t nbytes; - { + try { const ScopeUnlock unlock(mutex); - nbytes = ThreadRead(w.data, w.size, error); + nbytes = ThreadRead(w.data, w.size); + } catch (...) { + postponed_exception = std::current_exception(); + cond.broadcast(); + break; } cond.broadcast(); if (nbytes == 0) { eof = true; - postponed_error = std::move(error); break; } @@ -110,14 +115,12 @@ ThreadInputStream::ThreadFunc(void *ctx) } bool -ThreadInputStream::Check(Error &error) +ThreadInputStream::Check(Error &) { assert(!thread.IsInside()); - if (postponed_error.IsDefined()) { - error = std::move(postponed_error); - return false; - } + if (postponed_exception) + std::rethrow_exception(postponed_exception); return true; } @@ -127,19 +130,17 @@ ThreadInputStream::IsAvailable() { assert(!thread.IsInside()); - return !buffer->IsEmpty() || eof || postponed_error.IsDefined(); + return !buffer->IsEmpty() || eof || postponed_exception; } inline size_t -ThreadInputStream::Read(void *ptr, size_t read_size, Error &error) +ThreadInputStream::Read(void *ptr, size_t read_size, Error &) { assert(!thread.IsInside()); while (true) { - if (postponed_error.IsDefined()) { - error = std::move(postponed_error); - return 0; - } + if (postponed_exception) + std::rethrow_exception(postponed_exception); auto r = buffer->Read(); if (!r.IsEmpty()) { diff --git a/src/input/ThreadInputStream.hxx b/src/input/ThreadInputStream.hxx index 6ad45545c..5001c070d 100644 --- a/src/input/ThreadInputStream.hxx +++ b/src/input/ThreadInputStream.hxx @@ -24,7 +24,8 @@ #include "InputStream.hxx" #include "thread/Thread.hxx" #include "thread/Cond.hxx" -#include "util/Error.hxx" + +#include #include @@ -51,7 +52,7 @@ class ThreadInputStream : public InputStream { */ Cond wake_cond; - Error postponed_error; + std::exception_ptr postponed_exception; const size_t buffer_size; CircularBuffer *buffer = nullptr; @@ -103,9 +104,10 @@ protected: * * The #InputStream is locked. Unlock/relock it if you do a * blocking operation. + * + * Throws std::runtime_error on error. */ - virtual bool Open(gcc_unused Error &error) { - return true; + virtual void Open() { } /** @@ -113,9 +115,11 @@ protected: * * The #InputStream is not locked. * - * @return 0 on end-of-file or on error + * Throws std::runtime_error on error. + * + * @return 0 on end-of-file */ - virtual size_t ThreadRead(void *ptr, size_t size, Error &error) = 0; + virtual size_t ThreadRead(void *ptr, size_t size) = 0; /** * Optional deinitialization before leaving the thread. diff --git a/src/input/plugins/MmsInputPlugin.cxx b/src/input/plugins/MmsInputPlugin.cxx index 9649cc85f..717d0f5fe 100644 --- a/src/input/plugins/MmsInputPlugin.cxx +++ b/src/input/plugins/MmsInputPlugin.cxx @@ -21,12 +21,13 @@ #include "MmsInputPlugin.hxx" #include "input/ThreadInputStream.hxx" #include "input/InputPlugin.hxx" +#include "system/Error.hxx" #include "util/StringCompare.hxx" -#include "util/Error.hxx" -#include "util/Domain.hxx" #include +#include + static constexpr size_t MMS_BUFFER_SIZE = 256 * 1024; class MmsInputStream final : public ThreadInputStream { @@ -39,27 +40,23 @@ public: } protected: - virtual bool Open(gcc_unused Error &error) override; - virtual size_t ThreadRead(void *ptr, size_t size, - Error &error) override; + virtual void Open() override; + virtual size_t ThreadRead(void *ptr, size_t size) override; void Close() override { mmsx_close(mms); } }; -static constexpr Domain mms_domain("mms"); - -bool -MmsInputStream::Open(Error &error) +void +MmsInputStream::Open() { Unlock(); mms = mmsx_connect(nullptr, nullptr, GetURI(), 128 * 1024); if (mms == nullptr) { Lock(); - error.Set(mms_domain, "mmsx_connect() failed"); - return false; + throw std::runtime_error("mmsx_connect() failed"); } Lock(); @@ -67,7 +64,6 @@ MmsInputStream::Open(Error &error) /* TODO: is this correct? at least this selects the ffmpeg decoder, which seems to work fine */ SetMimeType("audio/x-ms-wma"); - return true; } static InputStream * @@ -86,7 +82,7 @@ input_mms_open(const char *url, } size_t -MmsInputStream::ThreadRead(void *ptr, size_t read_size, Error &error) +MmsInputStream::ThreadRead(void *ptr, size_t read_size) { /* unfortunately, mmsx_read() blocks until the whole buffer has been filled; to avoid big latencies, limit the size of @@ -98,7 +94,7 @@ MmsInputStream::ThreadRead(void *ptr, size_t read_size, Error &error) int nbytes = mmsx_read(nullptr, mms, (char *)ptr, read_size); if (nbytes <= 0) { if (nbytes < 0) - error.SetErrno("mmsx_read() failed"); + throw MakeErrno("mmsx_read() failed"); return 0; }