From bd78307940ae7e36ea8e8c5958c0440db8853c04 Mon Sep 17 00:00:00 2001 From: Max Kellermann Date: Mon, 29 Jul 2024 23:10:46 +0200 Subject: [PATCH] input/{async,thread}: add an additional Cond field This eliminates the ScopeExchangeInputStreamHandler kludge. --- src/input/AsyncInputStream.cxx | 23 +++++++++++------------ src/input/AsyncInputStream.hxx | 6 ++++++ src/input/ThreadInputStream.cxx | 8 +++----- src/input/ThreadInputStream.hxx | 5 +++++ 4 files changed, 25 insertions(+), 17 deletions(-) diff --git a/src/input/AsyncInputStream.cxx b/src/input/AsyncInputStream.cxx index 4b21f3391..dcb35fd39 100644 --- a/src/input/AsyncInputStream.cxx +++ b/src/input/AsyncInputStream.cxx @@ -18,9 +18,7 @@ */ #include "AsyncInputStream.hxx" -#include "CondHandler.hxx" #include "tag/Tag.hxx" -#include "thread/Cond.hxx" #include "event/Loop.hxx" #include @@ -142,10 +140,7 @@ AsyncInputStream::Seek(std::unique_lock &lock, deferred_seek.Schedule(); - CondInputStreamHandler cond_handler; - const ScopeExchangeInputStreamHandler h(*this, &cond_handler); - cond_handler.cond.wait(lock, - [this]{ return seek_state == SeekState::NONE; }); + caller_cond.wait(lock, [this]{ return seek_state == SeekState::NONE; }); Check(); } @@ -162,6 +157,7 @@ AsyncInputStream::SeekDone() noexcept open = true; seek_state = SeekState::NONE; + caller_cond.notify_one(); InvokeOnAvailable(); } @@ -185,8 +181,6 @@ AsyncInputStream::Read(std::unique_lock &lock, { assert(!GetEventLoop().IsInside()); - CondInputStreamHandler cond_handler; - /* wait for data */ CircularBuffer::Range r; while (true) { @@ -196,8 +190,7 @@ AsyncInputStream::Read(std::unique_lock &lock, if (!r.empty() || IsEOF()) break; - const ScopeExchangeInputStreamHandler h(*this, &cond_handler); - cond_handler.cond.wait(lock); + caller_cond.wait(lock); } const size_t nbytes = std::min(read_size, r.size); @@ -219,8 +212,10 @@ AsyncInputStream::CommitWriteBuffer(size_t nbytes) noexcept if (!IsReady()) SetReady(); - else + else { + caller_cond.notify_one(); InvokeOnAvailable(); + } } void @@ -245,8 +240,10 @@ AsyncInputStream::AppendToBuffer(const void *data, size_t append_size) noexcept if (!IsReady()) SetReady(); - else + else { + caller_cond.notify_one(); InvokeOnAvailable(); + } } void @@ -258,6 +255,7 @@ AsyncInputStream::DeferredResume() noexcept Resume(); } catch (...) { postponed_exception = std::current_exception(); + caller_cond.notify_one(); InvokeOnAvailable(); } } @@ -280,6 +278,7 @@ AsyncInputStream::DeferredSeek() noexcept } catch (...) { seek_state = SeekState::NONE; postponed_exception = std::current_exception(); + caller_cond.notify_one(); InvokeOnAvailable(); } } diff --git a/src/input/AsyncInputStream.hxx b/src/input/AsyncInputStream.hxx index 3563c162f..4a6de1fda 100644 --- a/src/input/AsyncInputStream.hxx +++ b/src/input/AsyncInputStream.hxx @@ -21,6 +21,7 @@ #define MPD_ASYNC_INPUT_STREAM_HXX #include "InputStream.hxx" +#include "thread/Cond.hxx" #include "event/InjectEvent.hxx" #include "util/HugeAllocator.hxx" #include "util/CircularBuffer.hxx" @@ -41,6 +42,11 @@ class AsyncInputStream : public InputStream { InjectEvent deferred_resume; InjectEvent deferred_seek; + /** + * Signalled when the caller shall be woken up. + */ + Cond caller_cond; + HugeArray allocation; CircularBuffer buffer; diff --git a/src/input/ThreadInputStream.cxx b/src/input/ThreadInputStream.cxx index 030bb9ad8..de9bae5ae 100644 --- a/src/input/ThreadInputStream.cxx +++ b/src/input/ThreadInputStream.cxx @@ -18,7 +18,6 @@ */ #include "ThreadInputStream.hxx" -#include "CondHandler.hxx" #include "thread/Name.hxx" #include @@ -95,10 +94,12 @@ ThreadInputStream::ThreadFunc() noexcept nbytes = ThreadRead(w.data, w.size); } catch (...) { postponed_exception = std::current_exception(); + caller_cond.notify_one(); InvokeOnAvailable(); break; } + caller_cond.notify_one(); InvokeOnAvailable(); if (nbytes == 0) { @@ -136,8 +137,6 @@ ThreadInputStream::Read(std::unique_lock &lock, { assert(!thread.IsInside()); - CondInputStreamHandler cond_handler; - while (true) { if (postponed_exception) std::rethrow_exception(postponed_exception); @@ -155,8 +154,7 @@ ThreadInputStream::Read(std::unique_lock &lock, if (eof) return 0; - const ScopeExchangeInputStreamHandler h(*this, &cond_handler); - cond_handler.cond.wait(lock); + caller_cond.wait(lock); } } diff --git a/src/input/ThreadInputStream.hxx b/src/input/ThreadInputStream.hxx index 131c79d9a..38a90ca1d 100644 --- a/src/input/ThreadInputStream.hxx +++ b/src/input/ThreadInputStream.hxx @@ -56,6 +56,11 @@ class ThreadInputStream : public InputStream { */ Cond wake_cond; + /** + * Signalled when the caller shall be woken up. + */ + Cond caller_cond; + std::exception_ptr postponed_exception; HugeArray allocation;