input/{async,thread}: add an additional Cond field
This eliminates the ScopeExchangeInputStreamHandler kludge.
This commit is contained in:
parent
cf962d94c7
commit
b050e0132e
|
@ -2,9 +2,7 @@
|
||||||
// Copyright The Music Player Daemon Project
|
// Copyright The Music Player Daemon Project
|
||||||
|
|
||||||
#include "AsyncInputStream.hxx"
|
#include "AsyncInputStream.hxx"
|
||||||
#include "CondHandler.hxx"
|
|
||||||
#include "tag/Tag.hxx"
|
#include "tag/Tag.hxx"
|
||||||
#include "thread/Cond.hxx"
|
|
||||||
#include "event/Loop.hxx"
|
#include "event/Loop.hxx"
|
||||||
|
|
||||||
#include <cassert>
|
#include <cassert>
|
||||||
|
@ -126,10 +124,7 @@ AsyncInputStream::Seek(std::unique_lock<Mutex> &lock,
|
||||||
|
|
||||||
deferred_seek.Schedule();
|
deferred_seek.Schedule();
|
||||||
|
|
||||||
CondInputStreamHandler cond_handler;
|
caller_cond.wait(lock, [this]{ return seek_state == SeekState::NONE; });
|
||||||
const ScopeExchangeInputStreamHandler h(*this, &cond_handler);
|
|
||||||
cond_handler.cond.wait(lock,
|
|
||||||
[this]{ return seek_state == SeekState::NONE; });
|
|
||||||
|
|
||||||
Check();
|
Check();
|
||||||
}
|
}
|
||||||
|
@ -146,6 +141,7 @@ AsyncInputStream::SeekDone() noexcept
|
||||||
open = true;
|
open = true;
|
||||||
|
|
||||||
seek_state = SeekState::NONE;
|
seek_state = SeekState::NONE;
|
||||||
|
caller_cond.notify_one();
|
||||||
InvokeOnAvailable();
|
InvokeOnAvailable();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -169,8 +165,6 @@ AsyncInputStream::Read(std::unique_lock<Mutex> &lock,
|
||||||
{
|
{
|
||||||
assert(!GetEventLoop().IsInside());
|
assert(!GetEventLoop().IsInside());
|
||||||
|
|
||||||
CondInputStreamHandler cond_handler;
|
|
||||||
|
|
||||||
/* wait for data */
|
/* wait for data */
|
||||||
CircularBuffer<std::byte>::Range r;
|
CircularBuffer<std::byte>::Range r;
|
||||||
while (true) {
|
while (true) {
|
||||||
|
@ -180,8 +174,7 @@ AsyncInputStream::Read(std::unique_lock<Mutex> &lock,
|
||||||
if (!r.empty() || IsEOF())
|
if (!r.empty() || IsEOF())
|
||||||
break;
|
break;
|
||||||
|
|
||||||
const ScopeExchangeInputStreamHandler h(*this, &cond_handler);
|
caller_cond.wait(lock);
|
||||||
cond_handler.cond.wait(lock);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
const size_t nbytes = std::min(dest.size(), r.size());
|
const size_t nbytes = std::min(dest.size(), r.size());
|
||||||
|
@ -203,8 +196,10 @@ AsyncInputStream::CommitWriteBuffer(size_t nbytes) noexcept
|
||||||
|
|
||||||
if (!IsReady())
|
if (!IsReady())
|
||||||
SetReady();
|
SetReady();
|
||||||
else
|
else {
|
||||||
|
caller_cond.notify_one();
|
||||||
InvokeOnAvailable();
|
InvokeOnAvailable();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void
|
void
|
||||||
|
@ -234,8 +229,10 @@ AsyncInputStream::AppendToBuffer(std::span<const std::byte> src) noexcept
|
||||||
|
|
||||||
if (!IsReady())
|
if (!IsReady())
|
||||||
SetReady();
|
SetReady();
|
||||||
else
|
else {
|
||||||
|
caller_cond.notify_one();
|
||||||
InvokeOnAvailable();
|
InvokeOnAvailable();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void
|
void
|
||||||
|
@ -247,6 +244,7 @@ AsyncInputStream::DeferredResume() noexcept
|
||||||
Resume();
|
Resume();
|
||||||
} catch (...) {
|
} catch (...) {
|
||||||
postponed_exception = std::current_exception();
|
postponed_exception = std::current_exception();
|
||||||
|
caller_cond.notify_one();
|
||||||
InvokeOnAvailable();
|
InvokeOnAvailable();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -269,6 +267,7 @@ AsyncInputStream::DeferredSeek() noexcept
|
||||||
} catch (...) {
|
} catch (...) {
|
||||||
seek_state = SeekState::NONE;
|
seek_state = SeekState::NONE;
|
||||||
postponed_exception = std::current_exception();
|
postponed_exception = std::current_exception();
|
||||||
|
caller_cond.notify_one();
|
||||||
InvokeOnAvailable();
|
InvokeOnAvailable();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -4,6 +4,7 @@
|
||||||
#pragma once
|
#pragma once
|
||||||
|
|
||||||
#include "InputStream.hxx"
|
#include "InputStream.hxx"
|
||||||
|
#include "thread/Cond.hxx"
|
||||||
#include "event/InjectEvent.hxx"
|
#include "event/InjectEvent.hxx"
|
||||||
#include "util/HugeAllocator.hxx"
|
#include "util/HugeAllocator.hxx"
|
||||||
#include "util/CircularBuffer.hxx"
|
#include "util/CircularBuffer.hxx"
|
||||||
|
@ -21,6 +22,11 @@ class AsyncInputStream : public InputStream {
|
||||||
InjectEvent deferred_resume;
|
InjectEvent deferred_resume;
|
||||||
InjectEvent deferred_seek;
|
InjectEvent deferred_seek;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Signalled when the caller shall be woken up.
|
||||||
|
*/
|
||||||
|
Cond caller_cond;
|
||||||
|
|
||||||
HugeArray<std::byte> allocation;
|
HugeArray<std::byte> allocation;
|
||||||
|
|
||||||
CircularBuffer<std::byte> buffer{allocation};
|
CircularBuffer<std::byte> buffer{allocation};
|
||||||
|
|
|
@ -2,7 +2,6 @@
|
||||||
// Copyright The Music Player Daemon Project
|
// Copyright The Music Player Daemon Project
|
||||||
|
|
||||||
#include "ThreadInputStream.hxx"
|
#include "ThreadInputStream.hxx"
|
||||||
#include "CondHandler.hxx"
|
|
||||||
#include "thread/Name.hxx"
|
#include "thread/Name.hxx"
|
||||||
|
|
||||||
#include <cassert>
|
#include <cassert>
|
||||||
|
@ -79,10 +78,12 @@ ThreadInputStream::ThreadFunc() noexcept
|
||||||
nbytes = ThreadRead(w);
|
nbytes = ThreadRead(w);
|
||||||
} catch (...) {
|
} catch (...) {
|
||||||
postponed_exception = std::current_exception();
|
postponed_exception = std::current_exception();
|
||||||
|
caller_cond.notify_one();
|
||||||
InvokeOnAvailable();
|
InvokeOnAvailable();
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
caller_cond.notify_one();
|
||||||
InvokeOnAvailable();
|
InvokeOnAvailable();
|
||||||
|
|
||||||
if (nbytes == 0) {
|
if (nbytes == 0) {
|
||||||
|
@ -120,8 +121,6 @@ ThreadInputStream::Read(std::unique_lock<Mutex> &lock,
|
||||||
{
|
{
|
||||||
assert(!thread.IsInside());
|
assert(!thread.IsInside());
|
||||||
|
|
||||||
CondInputStreamHandler cond_handler;
|
|
||||||
|
|
||||||
while (true) {
|
while (true) {
|
||||||
if (postponed_exception)
|
if (postponed_exception)
|
||||||
std::rethrow_exception(postponed_exception);
|
std::rethrow_exception(postponed_exception);
|
||||||
|
@ -139,8 +138,7 @@ ThreadInputStream::Read(std::unique_lock<Mutex> &lock,
|
||||||
if (eof)
|
if (eof)
|
||||||
return 0;
|
return 0;
|
||||||
|
|
||||||
const ScopeExchangeInputStreamHandler h(*this, &cond_handler);
|
caller_cond.wait(lock);
|
||||||
cond_handler.cond.wait(lock);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -39,6 +39,11 @@ class ThreadInputStream : public InputStream {
|
||||||
*/
|
*/
|
||||||
Cond wake_cond;
|
Cond wake_cond;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Signalled when the caller shall be woken up.
|
||||||
|
*/
|
||||||
|
Cond caller_cond;
|
||||||
|
|
||||||
std::exception_ptr postponed_exception;
|
std::exception_ptr postponed_exception;
|
||||||
|
|
||||||
HugeArray<std::byte> allocation;
|
HugeArray<std::byte> allocation;
|
||||||
|
|
Loading…
Reference in New Issue