diff --git a/src/input/BufferingInputStream.cxx b/src/input/BufferingInputStream.cxx index ebfffd943..9542e48b2 100644 --- a/src/input/BufferingInputStream.cxx +++ b/src/input/BufferingInputStream.cxx @@ -111,10 +111,8 @@ BufferingInputStream::Read(std::unique_lock &lock, void *ptr, size_t s) return nbytes; } - if (read_error) { - wake_cond.notify_one(); - std::rethrow_exception(std::exchange(read_error, {})); - } + if (read_error) + std::rethrow_exception(read_error); client_cond.wait(lock); } @@ -152,11 +150,7 @@ BufferingInputStream::RunThread() noexcept } seek = false; - read_error = {}; client_cond.notify_one(); - } else if (read_error) { - /* wait for client to consume the read error */ - wake_cond.wait(lock); } else if (offset != input->GetOffset() && !IsAvailable()) { /* a past Seek() call was a no-op because data was already available at that position, but @@ -180,6 +174,7 @@ BufferingInputStream::RunThread() noexcept read_error = std::current_exception(); client_cond.notify_one(); OnBufferAvailable(); + break; } } else if (input->IsEOF()) { /* our input has reached its end: prepare @@ -198,6 +193,7 @@ BufferingInputStream::RunThread() noexcept read_error = std::current_exception(); client_cond.notify_one(); OnBufferAvailable(); + break; } } else if (input->IsAvailable()) { const auto read_offset = input->GetOffset(); @@ -221,6 +217,7 @@ BufferingInputStream::RunThread() noexcept read_error = std::current_exception(); client_cond.notify_one(); OnBufferAvailable(); + break; } } else { /* we need more data at our @@ -235,6 +232,7 @@ BufferingInputStream::RunThread() noexcept read_error = std::current_exception(); client_cond.notify_one(); OnBufferAvailable(); + break; } } @@ -248,6 +246,9 @@ BufferingInputStream::RunThread() noexcept read_offset + nbytes); } catch (...) { read_error = std::current_exception(); + client_cond.notify_one(); + OnBufferAvailable(); + break; } client_cond.notify_one();