input/{async,thread}: add an additional Cond field

This eliminates the ScopeExchangeInputStreamHandler kludge.
This commit is contained in:
Max Kellermann
2024-07-29 23:10:46 +02:00
parent 687475cf3c
commit bd78307940
4 changed files with 25 additions and 17 deletions

@ -18,9 +18,7 @@
*/ */
#include "AsyncInputStream.hxx" #include "AsyncInputStream.hxx"
#include "CondHandler.hxx"
#include "tag/Tag.hxx" #include "tag/Tag.hxx"
#include "thread/Cond.hxx"
#include "event/Loop.hxx" #include "event/Loop.hxx"
#include <cassert> #include <cassert>
@ -142,10 +140,7 @@ AsyncInputStream::Seek(std::unique_lock<Mutex> &lock,
deferred_seek.Schedule(); deferred_seek.Schedule();
CondInputStreamHandler cond_handler; caller_cond.wait(lock, [this]{ return seek_state == SeekState::NONE; });
const ScopeExchangeInputStreamHandler h(*this, &cond_handler);
cond_handler.cond.wait(lock,
[this]{ return seek_state == SeekState::NONE; });
Check(); Check();
} }
@ -162,6 +157,7 @@ AsyncInputStream::SeekDone() noexcept
open = true; open = true;
seek_state = SeekState::NONE; seek_state = SeekState::NONE;
caller_cond.notify_one();
InvokeOnAvailable(); InvokeOnAvailable();
} }
@ -185,8 +181,6 @@ AsyncInputStream::Read(std::unique_lock<Mutex> &lock,
{ {
assert(!GetEventLoop().IsInside()); assert(!GetEventLoop().IsInside());
CondInputStreamHandler cond_handler;
/* wait for data */ /* wait for data */
CircularBuffer<uint8_t>::Range r; CircularBuffer<uint8_t>::Range r;
while (true) { while (true) {
@ -196,8 +190,7 @@ AsyncInputStream::Read(std::unique_lock<Mutex> &lock,
if (!r.empty() || IsEOF()) if (!r.empty() || IsEOF())
break; break;
const ScopeExchangeInputStreamHandler h(*this, &cond_handler); caller_cond.wait(lock);
cond_handler.cond.wait(lock);
} }
const size_t nbytes = std::min(read_size, r.size); const size_t nbytes = std::min(read_size, r.size);
@ -219,8 +212,10 @@ AsyncInputStream::CommitWriteBuffer(size_t nbytes) noexcept
if (!IsReady()) if (!IsReady())
SetReady(); SetReady();
else else {
caller_cond.notify_one();
InvokeOnAvailable(); InvokeOnAvailable();
}
} }
void void
@ -245,8 +240,10 @@ AsyncInputStream::AppendToBuffer(const void *data, size_t append_size) noexcept
if (!IsReady()) if (!IsReady())
SetReady(); SetReady();
else else {
caller_cond.notify_one();
InvokeOnAvailable(); InvokeOnAvailable();
}
} }
void void
@ -258,6 +255,7 @@ AsyncInputStream::DeferredResume() noexcept
Resume(); Resume();
} catch (...) { } catch (...) {
postponed_exception = std::current_exception(); postponed_exception = std::current_exception();
caller_cond.notify_one();
InvokeOnAvailable(); InvokeOnAvailable();
} }
} }
@ -280,6 +278,7 @@ AsyncInputStream::DeferredSeek() noexcept
} catch (...) { } catch (...) {
seek_state = SeekState::NONE; seek_state = SeekState::NONE;
postponed_exception = std::current_exception(); postponed_exception = std::current_exception();
caller_cond.notify_one();
InvokeOnAvailable(); InvokeOnAvailable();
} }
} }

@ -21,6 +21,7 @@
#define MPD_ASYNC_INPUT_STREAM_HXX #define MPD_ASYNC_INPUT_STREAM_HXX
#include "InputStream.hxx" #include "InputStream.hxx"
#include "thread/Cond.hxx"
#include "event/InjectEvent.hxx" #include "event/InjectEvent.hxx"
#include "util/HugeAllocator.hxx" #include "util/HugeAllocator.hxx"
#include "util/CircularBuffer.hxx" #include "util/CircularBuffer.hxx"
@ -41,6 +42,11 @@ class AsyncInputStream : public InputStream {
InjectEvent deferred_resume; InjectEvent deferred_resume;
InjectEvent deferred_seek; InjectEvent deferred_seek;
/**
* Signalled when the caller shall be woken up.
*/
Cond caller_cond;
HugeArray<uint8_t> allocation; HugeArray<uint8_t> allocation;
CircularBuffer<uint8_t> buffer; CircularBuffer<uint8_t> buffer;

@ -18,7 +18,6 @@
*/ */
#include "ThreadInputStream.hxx" #include "ThreadInputStream.hxx"
#include "CondHandler.hxx"
#include "thread/Name.hxx" #include "thread/Name.hxx"
#include <cassert> #include <cassert>
@ -95,10 +94,12 @@ ThreadInputStream::ThreadFunc() noexcept
nbytes = ThreadRead(w.data, w.size); nbytes = ThreadRead(w.data, w.size);
} catch (...) { } catch (...) {
postponed_exception = std::current_exception(); postponed_exception = std::current_exception();
caller_cond.notify_one();
InvokeOnAvailable(); InvokeOnAvailable();
break; break;
} }
caller_cond.notify_one();
InvokeOnAvailable(); InvokeOnAvailable();
if (nbytes == 0) { if (nbytes == 0) {
@ -136,8 +137,6 @@ ThreadInputStream::Read(std::unique_lock<Mutex> &lock,
{ {
assert(!thread.IsInside()); assert(!thread.IsInside());
CondInputStreamHandler cond_handler;
while (true) { while (true) {
if (postponed_exception) if (postponed_exception)
std::rethrow_exception(postponed_exception); std::rethrow_exception(postponed_exception);
@ -155,8 +154,7 @@ ThreadInputStream::Read(std::unique_lock<Mutex> &lock,
if (eof) if (eof)
return 0; return 0;
const ScopeExchangeInputStreamHandler h(*this, &cond_handler); caller_cond.wait(lock);
cond_handler.cond.wait(lock);
} }
} }

@ -56,6 +56,11 @@ class ThreadInputStream : public InputStream {
*/ */
Cond wake_cond; Cond wake_cond;
/**
* Signalled when the caller shall be woken up.
*/
Cond caller_cond;
std::exception_ptr postponed_exception; std::exception_ptr postponed_exception;
HugeArray<uint8_t> allocation; HugeArray<uint8_t> allocation;