output/snapcast: implement Drain()

This commit is contained in:
Max Kellermann 2021-02-22 22:47:21 +01:00
parent a8a39b6a38
commit 8e8fbe14b1
4 changed files with 48 additions and 1 deletions

View File

@ -76,6 +76,10 @@ SnapcastClient::LockPopQueue() noexcept
auto chunk = std::move(chunks.front()); auto chunk = std::move(chunks.front());
chunks.pop(); chunks.pop();
if (chunks.empty())
output.drain_cond.notify_one();
return chunk; return chunk;
} }

View File

@ -65,6 +65,13 @@ public:
*/ */
void Push(SnapcastChunkPtr chunk) noexcept; void Push(SnapcastChunkPtr chunk) noexcept;
/**
* Caller must lock the mutex.
*/
bool IsDrained() const noexcept {
return chunks.empty();
}
/** /**
* Caller must lock the mutex. * Caller must lock the mutex.
*/ */

View File

@ -24,6 +24,7 @@
#include "output/Interface.hxx" #include "output/Interface.hxx"
#include "output/Timer.hxx" #include "output/Timer.hxx"
#include "thread/Mutex.hxx" #include "thread/Mutex.hxx"
#include "thread/Cond.hxx"
#include "event/ServerSocket.hxx" #include "event/ServerSocket.hxx"
#include "event/InjectEvent.hxx" #include "event/InjectEvent.hxx"
#include "util/AllocatedArray.hxx" #include "util/AllocatedArray.hxx"
@ -82,6 +83,12 @@ public:
*/ */
mutable Mutex mutex; mutable Mutex mutex;
/**
* This cond is signalled when a #SnapcastClient has an empty
* queue.
*/
Cond drain_cond;
SnapcastOutput(EventLoop &_loop, const ConfigBlock &block); SnapcastOutput(EventLoop &_loop, const ConfigBlock &block);
~SnapcastOutput() noexcept override; ~SnapcastOutput() noexcept override;
@ -162,13 +169,19 @@ public:
size_t Play(const void *chunk, size_t size) override; size_t Play(const void *chunk, size_t size) override;
// TODO: void Drain() override; void Drain() override;
void Cancel() noexcept override; void Cancel() noexcept override;
bool Pause() override; bool Pause() override;
private: private:
void OnInject() noexcept; void OnInject() noexcept;
/**
* Caller must lock the mutex.
*/
[[gnu::pure]]
bool IsDrained() const noexcept;
/* virtual methods from class ServerSocket */ /* virtual methods from class ServerSocket */
void OnAccept(UniqueSocketDescriptor fd, void OnAccept(UniqueSocketDescriptor fd,
SocketAddress address, int uid) noexcept override; SocketAddress address, int uid) noexcept override;

View File

@ -181,6 +181,9 @@ SnapcastOutput::RemoveClient(SnapcastClient &client) noexcept
client.unlink(); client.unlink();
delete &client; delete &client;
if (clients.empty())
drain_cond.notify_one();
} }
std::chrono::steady_clock::duration std::chrono::steady_clock::duration
@ -261,6 +264,26 @@ SnapcastOutput::Pause()
return true; return true;
} }
inline bool
SnapcastOutput::IsDrained() const noexcept
{
if (!chunks.empty())
return false;
for (const auto &client : clients)
if (!client.IsDrained())
return false;
return true;
}
void
SnapcastOutput::Drain()
{
std::unique_lock<Mutex> protect(mutex);
drain_cond.wait(protect, [this]{ return IsDrained(); });
}
void void
SnapcastOutput::Cancel() noexcept SnapcastOutput::Cancel() noexcept
{ {