thread/Cond: add wait() overload which takes a unique_lock<>

Just like std::condition_variable, which however has no way to specify
the std::mutex directly.
This commit is contained in:
Max Kellermann 2019-04-25 18:53:38 +02:00
parent b51bae5500
commit 92022658f9
18 changed files with 66 additions and 42 deletions

View File

@ -270,7 +270,7 @@ GetChromaprintCommand::OpenUri(const char *uri2)
auto is = InputStream::Open(uri2, mutex); auto is = InputStream::Open(uri2, mutex);
is->SetHandler(this); is->SetHandler(this);
const std::lock_guard<Mutex> lock(mutex); std::unique_lock<Mutex> lock(mutex);
while (true) { while (true) {
if (cancel) if (cancel)
throw StopDecoder(); throw StopDecoder();
@ -281,7 +281,7 @@ GetChromaprintCommand::OpenUri(const char *uri2)
return is; return is;
} }
cond.wait(mutex); cond.wait(lock);
} }
} }
@ -294,7 +294,7 @@ GetChromaprintCommand::Read(InputStream &is, void *buffer, size_t length)
if (length == 0) if (length == 0)
return 0; return 0;
std::lock_guard<Mutex> lock(mutex); std::unique_lock<Mutex> lock(mutex);
while (true) { while (true) {
if (cancel) if (cancel)
@ -303,7 +303,7 @@ GetChromaprintCommand::Read(InputStream &is, void *buffer, size_t length)
if (is.IsAvailable()) if (is.IsAvailable())
break; break;
cond.wait(mutex); cond.wait(lock);
} }
return is.Read(buffer, length); return is.Read(buffer, length);

View File

@ -366,7 +366,7 @@ DecoderBridge::OpenUri(const char *uri)
auto is = InputStream::Open(uri, mutex); auto is = InputStream::Open(uri, mutex);
is->SetHandler(&dc); is->SetHandler(&dc);
const std::lock_guard<Mutex> lock(mutex); std::unique_lock<Mutex> lock(mutex);
while (true) { while (true) {
if (dc.command == DecoderCommand::STOP) if (dc.command == DecoderCommand::STOP)
throw StopDecoder(); throw StopDecoder();
@ -377,7 +377,7 @@ DecoderBridge::OpenUri(const char *uri)
return is; return is;
} }
cond.wait(mutex); cond.wait(lock);
} }
} }
@ -391,7 +391,7 @@ try {
if (length == 0) if (length == 0)
return 0; return 0;
std::lock_guard<Mutex> lock(is.mutex); std::unique_lock<Mutex> lock(is.mutex);
while (true) { while (true) {
if (CheckCancelRead()) if (CheckCancelRead())
@ -400,7 +400,7 @@ try {
if (is.IsAvailable()) if (is.IsAvailable())
break; break;
dc.cond.wait(is.mutex); dc.cond.wait(lock);
} }
size_t nbytes = is.Read(buffer, length); size_t nbytes = is.Read(buffer, length);

View File

@ -52,9 +52,9 @@ public:
defer_event.Schedule(); defer_event.Schedule();
{ {
const std::lock_guard<Mutex> lock(mutex); std::unique_lock<Mutex> lock(mutex);
while (!done) while (!done)
cond.wait(mutex); cond.wait(lock);
} }
if (exception) if (exception)

View File

@ -149,7 +149,7 @@ BufferedInputStream::RunThread() noexcept
{ {
SetThreadName("input_buffered"); SetThreadName("input_buffered");
const std::lock_guard<Mutex> lock(mutex); std::unique_lock<Mutex> lock(mutex);
while (!stop) { while (!stop) {
assert(size == buffer.size()); assert(size == buffer.size());
@ -205,6 +205,6 @@ BufferedInputStream::RunThread() noexcept
client_cond.notify_one(); client_cond.notify_one();
InvokeOnAvailable(); InvokeOnAvailable();
} else } else
wake_cond.wait(mutex); wake_cond.wait(lock);
} }
} }

View File

@ -57,14 +57,14 @@ InputStream::OpenReady(const char *uri, Mutex &mutex)
is->SetHandler(&handler); is->SetHandler(&handler);
{ {
const std::lock_guard<Mutex> protect(mutex); std::unique_lock<Mutex> lock(mutex);
while (true) { while (true) {
is->Update(); is->Update();
if (is->IsReady()) if (is->IsReady())
break; break;
handler.cond.wait(mutex); handler.cond.wait(lock);
} }
is->Check(); is->Check();

View File

@ -67,7 +67,7 @@ ThreadInputStream::ThreadFunc() noexcept
{ {
FormatThreadName("input:%s", plugin); FormatThreadName("input:%s", plugin);
const std::lock_guard<Mutex> lock(mutex); std::unique_lock<Mutex> lock(mutex);
try { try {
Open(); Open();
@ -85,7 +85,7 @@ ThreadInputStream::ThreadFunc() noexcept
auto w = buffer.Write(); auto w = buffer.Write();
if (w.empty()) { if (w.empty()) {
wake_cond.wait(mutex); wake_cond.wait(lock);
} else { } else {
size_t nbytes; size_t nbytes;

View File

@ -59,9 +59,9 @@ public:
private: private:
bool LockWaitFinished() noexcept { bool LockWaitFinished() noexcept {
const std::lock_guard<Mutex> protect(mutex); std::unique_lock<Mutex> lock(mutex);
while (!finished) while (!finished)
if (!cond.wait_for(mutex, timeout)) if (!cond.wait_for(lock, timeout))
return false; return false;
return true; return true;

View File

@ -238,7 +238,7 @@ SmbclientNeighborExplorer::ThreadFunc() noexcept
{ {
SetThreadName("smbclient"); SetThreadName("smbclient");
const std::lock_guard<Mutex> lock(mutex); std::unique_lock<Mutex> lock(mutex);
while (!quit) { while (!quit) {
Run(); Run();
@ -247,7 +247,7 @@ SmbclientNeighborExplorer::ThreadFunc() noexcept
break; break;
// TODO: sleep for how long? // TODO: sleep for how long?
cond.wait_for(mutex, std::chrono::seconds(10)); cond.wait_for(lock, std::chrono::seconds(10));
} }
} }

View File

@ -412,7 +412,7 @@ AudioOutputControl::Task() noexcept
SetThreadTimerSlackUS(100); SetThreadTimerSlackUS(100);
const std::lock_guard<Mutex> lock(mutex); std::unique_lock<Mutex> lock(mutex);
while (true) { while (true) {
switch (command) { switch (command) {
@ -516,7 +516,7 @@ AudioOutputControl::Task() noexcept
if (command == Command::NONE) { if (command == Command::NONE) {
woken_for_play = false; woken_for_play = false;
wake_cond.wait(mutex); wake_cond.wait(lock);
} }
} }
} }

View File

@ -796,7 +796,7 @@ AlsaOutput::DrainInternal()
void void
AlsaOutput::Drain() AlsaOutput::Drain()
{ {
const std::lock_guard<Mutex> lock(mutex); std::unique_lock<Mutex> lock(mutex);
if (error) if (error)
std::rethrow_exception(error); std::rethrow_exception(error);
@ -806,7 +806,7 @@ AlsaOutput::Drain()
Activate(); Activate();
while (drain && active) while (drain && active)
cond.wait(mutex); cond.wait(lock);
if (error) if (error)
std::rethrow_exception(error); std::rethrow_exception(error);
@ -882,7 +882,7 @@ AlsaOutput::Play(const void *chunk, size_t size)
been played */ been played */
return size; return size;
const std::lock_guard<Mutex> lock(mutex); std::unique_lock<Mutex> lock(mutex);
while (true) { while (true) {
if (error) if (error)
@ -905,7 +905,7 @@ AlsaOutput::Play(const void *chunk, size_t size)
/* wait for the DispatchSockets() to make room in the /* wait for the DispatchSockets() to make room in the
ring_buffer */ ring_buffer */
cond.wait(mutex); cond.wait(lock);
} }
} }

View File

@ -277,9 +277,9 @@ HttpdOutput::BroadcastFromEncoder()
{ {
/* synchronize with the IOThread */ /* synchronize with the IOThread */
{ {
const std::lock_guard<Mutex> lock(mutex); std::unique_lock<Mutex> lock(mutex);
while (!pages.empty()) while (!pages.empty())
cond.wait(mutex); cond.wait(lock);
} }
bool empty = true; bool empty = true;

View File

@ -317,13 +317,13 @@ SlesOutput::Play(const void *chunk, size_t size)
pause = false; pause = false;
} }
const std::lock_guard<Mutex> protect(mutex); std::unique_lock<Mutex> lock(mutex);
assert(filled < BUFFER_SIZE); assert(filled < BUFFER_SIZE);
while (n_queued == N_BUFFERS) { while (n_queued == N_BUFFERS) {
assert(filled == 0); assert(filled == 0);
cond.wait(mutex); cond.wait(lock);
} }
size_t nbytes = std::min(BUFFER_SIZE - filled, size); size_t nbytes = std::min(BUFFER_SIZE - filled, size);
@ -346,12 +346,12 @@ SlesOutput::Play(const void *chunk, size_t size)
void void
SlesOutput::Drain() SlesOutput::Drain()
{ {
const std::lock_guard<Mutex> protect(mutex); std::unique_lock<Mutex> lock(mutex);
assert(filled < BUFFER_SIZE); assert(filled < BUFFER_SIZE);
while (n_queued > 0) while (n_queued > 0)
cond.wait(mutex); cond.wait(lock);
} }
void void

View File

@ -124,9 +124,9 @@ public:
} }
void Wait() { void Wait() {
const std::lock_guard<Mutex> lock(mutex); std::unique_lock<Mutex> lock(mutex);
while (!done) while (!done)
cond.wait(mutex); cond.wait(lock);
if (postponed_error) if (postponed_error)
std::rethrow_exception(postponed_error); std::rethrow_exception(postponed_error);

View File

@ -167,7 +167,7 @@ private:
} }
void WaitConnected() { void WaitConnected() {
const std::lock_guard<Mutex> protect(mutex); std::unique_lock<Mutex> lock(mutex);
while (true) { while (true) {
switch (state) { switch (state) {
@ -179,7 +179,7 @@ private:
} }
if (state == State::INITIAL) if (state == State::INITIAL)
cond.wait(mutex); cond.wait(lock);
break; break;
case State::CONNECTING: case State::CONNECTING:

View File

@ -198,7 +198,7 @@ UdisksStorage::OnListReply(ODBus::Message reply) noexcept
void void
UdisksStorage::MountWait() UdisksStorage::MountWait()
{ {
const std::lock_guard<Mutex> lock(mutex); std::unique_lock<Mutex> lock(mutex);
if (mounted_storage) if (mounted_storage)
/* already mounted */ /* already mounted */
@ -210,7 +210,7 @@ UdisksStorage::MountWait()
} }
while (want_mount) while (want_mount)
cond.wait(mutex); cond.wait(lock);
if (mount_error) if (mount_error)
std::rethrow_exception(mount_error); std::rethrow_exception(mount_error);
@ -272,7 +272,7 @@ try {
void void
UdisksStorage::UnmountWait() UdisksStorage::UnmountWait()
{ {
const std::lock_guard<Mutex> lock(mutex); std::unique_lock<Mutex> lock(mutex);
if (!mounted_storage) if (!mounted_storage)
/* not mounted */ /* not mounted */
@ -281,7 +281,7 @@ UdisksStorage::UnmountWait()
defer_unmount.Schedule(); defer_unmount.Schedule();
while (mounted_storage) while (mounted_storage)
cond.wait(mutex); cond.wait(lock);
if (mount_error) if (mount_error)
std::rethrow_exception(mount_error); std::rethrow_exception(mount_error);

View File

@ -33,6 +33,7 @@
#include "PosixMutex.hxx" #include "PosixMutex.hxx"
#include <chrono> #include <chrono>
#include <mutex>
#include <sys/time.h> #include <sys/time.h>
@ -74,6 +75,11 @@ public:
pthread_cond_wait(&cond, &mutex.mutex); pthread_cond_wait(&cond, &mutex.mutex);
} }
template<typename M>
void wait(std::unique_lock<M> &lock) noexcept {
wait(*lock.mutex());
}
private: private:
bool wait_for(PosixMutex &mutex, uint_least32_t timeout_us) noexcept { bool wait_for(PosixMutex &mutex, uint_least32_t timeout_us) noexcept {
struct timeval now; struct timeval now;
@ -102,6 +108,12 @@ public:
return wait_for(mutex, timeout_us); return wait_for(mutex, timeout_us);
} }
template<typename M>
bool wait_for(std::unique_lock<M> &lock,
std::chrono::steady_clock::duration timeout) noexcept {
return wait_for(*lock.mutex(), timeout);
}
}; };
#endif #endif

View File

@ -33,6 +33,7 @@
#include "CriticalSection.hxx" #include "CriticalSection.hxx"
#include <chrono> #include <chrono>
#include <mutex>
/** /**
* Wrapper for a CONDITION_VARIABLE, backend for the Cond class. * Wrapper for a CONDITION_VARIABLE, backend for the Cond class.
@ -69,9 +70,20 @@ public:
return wait_for(mutex, timeout_ms); return wait_for(mutex, timeout_ms);
} }
template<typename M>
bool wait_for(std::unique_lock<M> &lock,
std::chrono::steady_clock::duration timeout) noexcept {
return wait_for(*lock.mutex(), timeout);
}
void wait(CriticalSection &mutex) noexcept { void wait(CriticalSection &mutex) noexcept {
wait_for(mutex, INFINITE); wait_for(mutex, INFINITE);
} }
template<typename M>
void wait(std::unique_lock<M> &lock) noexcept {
wait(*lock.mutex());
}
}; };
#endif #endif

View File

@ -175,9 +175,9 @@ class DumpRemoteTagHandler final : public RemoteTagHandler {
public: public:
Tag Wait() { Tag Wait() {
const std::lock_guard<Mutex> lock(mutex); std::unique_lock<Mutex> lock(mutex);
while (!done) while (!done)
cond.wait(mutex); cond.wait(lock);
if (error) if (error)
std::rethrow_exception(error); std::rethrow_exception(error);