From b050e0132ed3a207f86545773eaecb8ff11c1b2b Mon Sep 17 00:00:00 2001 From: Max Kellermann Date: Mon, 29 Jul 2024 23:10:46 +0200 Subject: [PATCH] input/{async,thread}: add an additional Cond field This eliminates the ScopeExchangeInputStreamHandler kludge. --- src/input/AsyncInputStream.cxx | 23 +++++++++++------------ src/input/AsyncInputStream.hxx | 6 ++++++ src/input/ThreadInputStream.cxx | 8 +++----- src/input/ThreadInputStream.hxx | 5 +++++ 4 files changed, 25 insertions(+), 17 deletions(-) diff --git a/src/input/AsyncInputStream.cxx b/src/input/AsyncInputStream.cxx index f158264e1..b6e464be5 100644 --- a/src/input/AsyncInputStream.cxx +++ b/src/input/AsyncInputStream.cxx @@ -2,9 +2,7 @@ // Copyright The Music Player Daemon Project #include "AsyncInputStream.hxx" -#include "CondHandler.hxx" #include "tag/Tag.hxx" -#include "thread/Cond.hxx" #include "event/Loop.hxx" #include @@ -126,10 +124,7 @@ AsyncInputStream::Seek(std::unique_lock &lock, deferred_seek.Schedule(); - CondInputStreamHandler cond_handler; - const ScopeExchangeInputStreamHandler h(*this, &cond_handler); - cond_handler.cond.wait(lock, - [this]{ return seek_state == SeekState::NONE; }); + caller_cond.wait(lock, [this]{ return seek_state == SeekState::NONE; }); Check(); } @@ -146,6 +141,7 @@ AsyncInputStream::SeekDone() noexcept open = true; seek_state = SeekState::NONE; + caller_cond.notify_one(); InvokeOnAvailable(); } @@ -169,8 +165,6 @@ AsyncInputStream::Read(std::unique_lock &lock, { assert(!GetEventLoop().IsInside()); - CondInputStreamHandler cond_handler; - /* wait for data */ CircularBuffer::Range r; while (true) { @@ -180,8 +174,7 @@ AsyncInputStream::Read(std::unique_lock &lock, if (!r.empty() || IsEOF()) break; - const ScopeExchangeInputStreamHandler h(*this, &cond_handler); - cond_handler.cond.wait(lock); + caller_cond.wait(lock); } const size_t nbytes = std::min(dest.size(), r.size()); @@ -203,8 +196,10 @@ AsyncInputStream::CommitWriteBuffer(size_t nbytes) noexcept if (!IsReady()) SetReady(); - else + else { + caller_cond.notify_one(); InvokeOnAvailable(); + } } void @@ -234,8 +229,10 @@ AsyncInputStream::AppendToBuffer(std::span src) noexcept if (!IsReady()) SetReady(); - else + else { + caller_cond.notify_one(); InvokeOnAvailable(); + } } void @@ -247,6 +244,7 @@ AsyncInputStream::DeferredResume() noexcept Resume(); } catch (...) { postponed_exception = std::current_exception(); + caller_cond.notify_one(); InvokeOnAvailable(); } } @@ -269,6 +267,7 @@ AsyncInputStream::DeferredSeek() noexcept } catch (...) { seek_state = SeekState::NONE; postponed_exception = std::current_exception(); + caller_cond.notify_one(); InvokeOnAvailable(); } } diff --git a/src/input/AsyncInputStream.hxx b/src/input/AsyncInputStream.hxx index b9a0a5054..b767005e6 100644 --- a/src/input/AsyncInputStream.hxx +++ b/src/input/AsyncInputStream.hxx @@ -4,6 +4,7 @@ #pragma once #include "InputStream.hxx" +#include "thread/Cond.hxx" #include "event/InjectEvent.hxx" #include "util/HugeAllocator.hxx" #include "util/CircularBuffer.hxx" @@ -21,6 +22,11 @@ class AsyncInputStream : public InputStream { InjectEvent deferred_resume; InjectEvent deferred_seek; + /** + * Signalled when the caller shall be woken up. + */ + Cond caller_cond; + HugeArray allocation; CircularBuffer buffer{allocation}; diff --git a/src/input/ThreadInputStream.cxx b/src/input/ThreadInputStream.cxx index 145488561..9c159f016 100644 --- a/src/input/ThreadInputStream.cxx +++ b/src/input/ThreadInputStream.cxx @@ -2,7 +2,6 @@ // Copyright The Music Player Daemon Project #include "ThreadInputStream.hxx" -#include "CondHandler.hxx" #include "thread/Name.hxx" #include @@ -79,10 +78,12 @@ ThreadInputStream::ThreadFunc() noexcept nbytes = ThreadRead(w); } catch (...) { postponed_exception = std::current_exception(); + caller_cond.notify_one(); InvokeOnAvailable(); break; } + caller_cond.notify_one(); InvokeOnAvailable(); if (nbytes == 0) { @@ -120,8 +121,6 @@ ThreadInputStream::Read(std::unique_lock &lock, { assert(!thread.IsInside()); - CondInputStreamHandler cond_handler; - while (true) { if (postponed_exception) std::rethrow_exception(postponed_exception); @@ -139,8 +138,7 @@ ThreadInputStream::Read(std::unique_lock &lock, if (eof) return 0; - const ScopeExchangeInputStreamHandler h(*this, &cond_handler); - cond_handler.cond.wait(lock); + caller_cond.wait(lock); } } diff --git a/src/input/ThreadInputStream.hxx b/src/input/ThreadInputStream.hxx index 1a9898f69..44f7d8da3 100644 --- a/src/input/ThreadInputStream.hxx +++ b/src/input/ThreadInputStream.hxx @@ -39,6 +39,11 @@ class ThreadInputStream : public InputStream { */ Cond wake_cond; + /** + * Signalled when the caller shall be woken up. + */ + Cond caller_cond; + std::exception_ptr postponed_exception; HugeArray allocation;