From a8a39b6a380f0f4661214dbb4b67b21ef551e507 Mon Sep 17 00:00:00 2001 From: Max Kellermann Date: Mon, 22 Feb 2021 16:31:26 +0100 Subject: [PATCH] output/snapcast: queue chunks --- src/output/plugins/snapcast/Chunk.hxx | 55 +++++++++++++++++++ src/output/plugins/snapcast/Client.cxx | 49 +++++++++++++++-- src/output/plugins/snapcast/Client.hxx | 24 +++++++- src/output/plugins/snapcast/Internal.hxx | 13 +++-- .../plugins/snapcast/SnapcastOutputPlugin.cxx | 44 ++++++++++----- 5 files changed, 160 insertions(+), 25 deletions(-) create mode 100644 src/output/plugins/snapcast/Chunk.hxx diff --git a/src/output/plugins/snapcast/Chunk.hxx b/src/output/plugins/snapcast/Chunk.hxx new file mode 100644 index 000000000..e06b216e6 --- /dev/null +++ b/src/output/plugins/snapcast/Chunk.hxx @@ -0,0 +1,55 @@ +/* + * Copyright 2003-2021 The Music Player Daemon Project + * http://www.musicpd.org + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ + +#ifndef MPD_SNAPCAST_CHUNK_HXX +#define MPD_SNAPCAST_CHUNK_HXX + +#include "util/AllocatedArray.hxx" + +#include +#include +#include +#include +#include + +/** + * A chunk of data to be transmitted to connected Snapcast clients. + */ +struct SnapcastChunk { + std::chrono::steady_clock::time_point time; + AllocatedArray payload; + + SnapcastChunk(std::chrono::steady_clock::time_point _time, + AllocatedArray &&_payload) noexcept + :time(_time), payload(std::move(_payload)) {} +}; + +using SnapcastChunkPtr = std::shared_ptr; + +using SnapcastChunkQueue = std::queue>; + +inline void +ClearQueue(SnapcastChunkQueue &q) noexcept +{ + while (!q.empty()) + q.pop(); +} + +#endif diff --git a/src/output/plugins/snapcast/Client.cxx b/src/output/plugins/snapcast/Client.cxx index 5c2e71904..8434258b2 100644 --- a/src/output/plugins/snapcast/Client.cxx +++ b/src/output/plugins/snapcast/Client.cxx @@ -57,12 +57,50 @@ SnapcastClient::LockClose() noexcept Close(); } +void +SnapcastClient::Push(SnapcastChunkPtr chunk) noexcept +{ + if (!active) + return; + + chunks.emplace(std::move(chunk)); + event.ScheduleWrite(); +} + +SnapcastChunkPtr +SnapcastClient::LockPopQueue() noexcept +{ + const std::lock_guard protect(output.mutex); + if (chunks.empty()) + return nullptr; + + auto chunk = std::move(chunks.front()); + chunks.pop(); + return chunk; +} + void SnapcastClient::OnSocketReady(unsigned flags) noexcept { - if (flags & SocketEvent::WRITE) - // TODO - {} + if (flags & SocketEvent::WRITE) { + constexpr auto max_age = std::chrono::milliseconds(500); + const auto min_time = GetEventLoop().SteadyNow() - max_age; + + while (auto chunk = LockPopQueue()) { + if (chunk->time < min_time) + /* discard old chunks */ + continue; + + const ConstBuffer payload = chunk->payload; + if (!SendWireChunk(payload.ToVoid(), chunk->time)) { + // TODO: handle EAGAIN + LockClose(); + return; + } + } + + event.CancelWrite(); + } BufferedSocket::OnSocketReady(flags); } @@ -187,12 +225,11 @@ SendWireChunk(SocketDescriptor s, const PackedBE16 id, return SendT(s, base) && SendT(s, hdr) && Send(s, payload); } -void +bool SnapcastClient::SendWireChunk(ConstBuffer payload, std::chrono::steady_clock::time_point t) noexcept { - if (active) - ::SendWireChunk(GetSocket(), next_id++, payload, t); + return ::SendWireChunk(GetSocket(), next_id++, payload, t); } BufferedSocket::InputResult diff --git a/src/output/plugins/snapcast/Client.hxx b/src/output/plugins/snapcast/Client.hxx index 50ff1b992..d62e73a82 100644 --- a/src/output/plugins/snapcast/Client.hxx +++ b/src/output/plugins/snapcast/Client.hxx @@ -20,6 +20,7 @@ #ifndef MPD_OUTPUT_SNAPCAST_CLIENT_HXX #define MPD_OUTPUT_SNAPCAST_CLIENT_HXX +#include "Chunk.hxx" #include "event/BufferedSocket.hxx" #include "util/IntrusiveList.hxx" @@ -35,6 +36,11 @@ class SnapcastClient final : BufferedSocket, public IntrusiveListHook { SnapcastOutput &output; + /** + * A queue of #Page objects to be sent to the client. + */ + SnapcastChunkQueue chunks; + uint16_t next_id = 1; bool active = false; @@ -54,10 +60,24 @@ public: void LockClose() noexcept; - void SendWireChunk(ConstBuffer payload, - std::chrono::steady_clock::time_point t) noexcept; + /** + * Caller must lock the mutex. + */ + void Push(SnapcastChunkPtr chunk) noexcept; + + /** + * Caller must lock the mutex. + */ + void Cancel() noexcept { + ClearQueue(chunks); + } private: + SnapcastChunkPtr LockPopQueue() noexcept; + + bool SendWireChunk(ConstBuffer payload, + std::chrono::steady_clock::time_point t) noexcept; + bool SendServerSettings(const SnapcastBase &request) noexcept; bool SendCodecHeader(const SnapcastBase &request) noexcept; bool SendTime(const SnapcastBase &request_header, diff --git a/src/output/plugins/snapcast/Internal.hxx b/src/output/plugins/snapcast/Internal.hxx index db11cecf5..a9bbd1684 100644 --- a/src/output/plugins/snapcast/Internal.hxx +++ b/src/output/plugins/snapcast/Internal.hxx @@ -20,10 +20,12 @@ #ifndef MPD_OUTPUT_SNAPCAST_INTERNAL_HXX #define MPD_OUTPUT_SNAPCAST_INTERNAL_HXX +#include "Chunk.hxx" #include "output/Interface.hxx" #include "output/Timer.hxx" #include "thread/Mutex.hxx" #include "event/ServerSocket.hxx" +#include "event/InjectEvent.hxx" #include "util/AllocatedArray.hxx" #include "util/IntrusiveList.hxx" @@ -41,6 +43,8 @@ class SnapcastOutput final : AudioOutput, ServerSocket { */ bool open; + InjectEvent inject_event; + /** * The configured encoder plugin. */ @@ -69,10 +73,12 @@ class SnapcastOutput final : AudioOutput, ServerSocket { */ IntrusiveList clients; + SnapcastChunkQueue chunks; + public: /** - * This mutex protects the listener socket and the client - * list. + * This mutex protects the listener socket, the #clients list + * and the #chunks queue. */ mutable Mutex mutex; @@ -161,8 +167,7 @@ public: bool Pause() override; private: - void BroadcastWireChunk(ConstBuffer payload, - std::chrono::steady_clock::time_point t) noexcept; + void OnInject() noexcept; /* virtual methods from class ServerSocket */ void OnAccept(UniqueSocketDescriptor fd, diff --git a/src/output/plugins/snapcast/SnapcastOutputPlugin.cxx b/src/output/plugins/snapcast/SnapcastOutputPlugin.cxx index f80857a80..82b5846e3 100644 --- a/src/output/plugins/snapcast/SnapcastOutputPlugin.cxx +++ b/src/output/plugins/snapcast/SnapcastOutputPlugin.cxx @@ -40,6 +40,7 @@ SnapcastOutput::SnapcastOutput(EventLoop &_loop, const ConfigBlock &block) :AudioOutput(FLAG_ENABLE_DISABLE|FLAG_PAUSE| FLAG_NEED_FULLY_DEFINED_AUDIO_FORMAT), ServerSocket(_loop), + inject_event(_loop, BIND_THIS_METHOD(OnInject)), // TODO: support other encoder plugins? prepared_encoder(encoder_init(wave_encoder_plugin, block)) { @@ -146,15 +147,33 @@ SnapcastOutput::Close() noexcept delete timer; BlockingCall(GetEventLoop(), [this](){ + inject_event.Cancel(); + const std::lock_guard protect(mutex); open = false; clients.clear_and_dispose(DeleteDisposer{}); }); + ClearQueue(chunks); + codec_header = nullptr; delete encoder; } +void +SnapcastOutput::OnInject() noexcept +{ + const std::lock_guard protect(mutex); + + while (!chunks.empty()) { + const auto chunk = std::move(chunks.front()); + chunks.pop(); + + for (auto &client : clients) + client.Push(chunk); + } +} + void SnapcastOutput::RemoveClient(SnapcastClient &client) noexcept { @@ -185,17 +204,6 @@ SnapcastOutput::Delay() const noexcept : std::chrono::steady_clock::duration::zero(); } -inline void -SnapcastOutput::BroadcastWireChunk(ConstBuffer payload, - std::chrono::steady_clock::time_point t) noexcept -{ - const std::lock_guard protect(mutex); - - // TODO: no blocking send(), enqueue chunks, send() in I/O thread - for (auto &client : clients) - client.SendWireChunk(payload, t); -} - size_t SnapcastOutput::Play(const void *chunk, size_t size) { @@ -233,7 +241,12 @@ SnapcastOutput::Play(const void *chunk, size_t size) if (nbytes == 0) break; - BroadcastWireChunk({buffer, nbytes}, now); + const std::lock_guard protect(mutex); + if (chunks.empty()) + inject_event.Schedule(); + + const ConstBuffer payload{buffer, nbytes}; + chunks.emplace(std::make_shared(now, AllocatedArray{payload})); } return size; @@ -251,7 +264,12 @@ SnapcastOutput::Pause() void SnapcastOutput::Cancel() noexcept { - // TODO + const std::lock_guard protect(mutex); + + ClearQueue(chunks); + + for (auto &client : clients) + client.Cancel(); } const struct AudioOutputPlugin snapcast_output_plugin = {