From c9da3363a04bfdf8a0f0b2d974291e422a0345d2 Mon Sep 17 00:00:00 2001 From: Max Kellermann Date: Tue, 31 Dec 2013 16:31:36 +0100 Subject: [PATCH] output/httpd: move all broadcast operations to the IOThread Add a Page queue to class HttpdOutput, and use DeferredMonitor to flush this queue inside the IOThread. This fixes a thread-safety issue: much of EventLoop is not thread-safe, and the httpd plugin ignored that problem. --- src/output/HttpdInternal.hxx | 21 +++++++++- src/output/HttpdOutputPlugin.cxx | 66 +++++++++++++++++++++++++++----- 2 files changed, 76 insertions(+), 11 deletions(-) diff --git a/src/output/HttpdInternal.hxx b/src/output/HttpdInternal.hxx index df9099335..f20d88b4e 100644 --- a/src/output/HttpdInternal.hxx +++ b/src/output/HttpdInternal.hxx @@ -29,6 +29,7 @@ #include "Timer.hxx" #include "thread/Mutex.hxx" #include "event/ServerSocket.hxx" +#include "event/DeferredMonitor.hxx" #ifdef _LIBCPP_VERSION /* can't use incomplete template arguments with libc++ */ @@ -36,6 +37,8 @@ #endif #include +#include +#include struct config_param; class Error; @@ -46,7 +49,7 @@ class Page; struct Encoder; struct Tag; -class HttpdOutput final : ServerSocket { +class HttpdOutput final : ServerSocket, DeferredMonitor { struct audio_output base; /** @@ -80,6 +83,12 @@ public: */ mutable Mutex mutex; + /** + * This condition gets signalled when an item is removed from + * #pages. + */ + Cond cond; + private: /** * A #Timer object to synchronize this output with the @@ -97,6 +106,14 @@ private: */ Page *metadata; + /** + * The page queue, i.e. pages from the encoder to be + * broadcasted to all clients. This container is necessary to + * pass pages from the OutputThread to the IOThread. It is + * protected by #mutex, and removing signals #cond. + */ + std::queue> pages; + public: /** * The configured name. @@ -248,6 +265,8 @@ public: void CancelAllClients(); private: + virtual void RunDeferred() override; + virtual void OnAccept(int fd, const sockaddr &address, size_t address_length, int uid) override; }; diff --git a/src/output/HttpdOutputPlugin.cxx b/src/output/HttpdOutputPlugin.cxx index 2790dd98d..fa6934ff0 100644 --- a/src/output/HttpdOutputPlugin.cxx +++ b/src/output/HttpdOutputPlugin.cxx @@ -29,6 +29,7 @@ #include "IcyMetaDataServer.hxx" #include "system/fd_util.h" #include "IOThread.hxx" +#include "event/Call.hxx" #include "util/Error.hxx" #include "util/Domain.hxx" #include "Log.hxx" @@ -49,7 +50,7 @@ const Domain httpd_output_domain("httpd_output"); inline HttpdOutput::HttpdOutput(EventLoop &_loop) - :ServerSocket(_loop), + :ServerSocket(_loop), DeferredMonitor(_loop), encoder(nullptr), unflushed_input(0), metadata(nullptr) { @@ -162,7 +163,7 @@ httpd_output_finish(struct audio_output *ao) inline void HttpdOutput::AddClient(int fd) { - clients.emplace_front(*this, fd, GetEventLoop(), + clients.emplace_front(*this, fd, ServerSocket::GetEventLoop(), encoder->plugin.tag == nullptr); ++clients_cnt; @@ -171,6 +172,29 @@ HttpdOutput::AddClient(int fd) clients.front().PushMetaData(metadata); } +void +HttpdOutput::RunDeferred() +{ + /* this method runs in the IOThread; it broadcasts pages from + our own queue to all clients */ + + const ScopeLock protect(mutex); + + while (!pages.empty()) { + Page *page = pages.front(); + pages.pop(); + + for (auto &client : clients) + client.PushPage(page); + + page->Unref(); + } + + /* wake up the client that may be waiting for the queue to be + flushed */ + cond.broadcast(); +} + void HttpdOutput::OnAccept(int fd, const sockaddr &address, size_t address_length, gcc_unused int uid) @@ -393,19 +417,29 @@ HttpdOutput::BroadcastPage(Page *page) { assert(page != nullptr); - const ScopeLock protect(mutex); - for (auto &client : clients) - client.PushPage(page); + mutex.lock(); + pages.push(page); + page->Ref(); + mutex.unlock(); + + DeferredMonitor::Schedule(); } void HttpdOutput::BroadcastFromEncoder() { + /* synchronize with the IOThread */ + mutex.lock(); + while (!pages.empty()) + cond.wait(mutex); + Page *page; - while ((page = ReadPage()) != nullptr) { - BroadcastPage(page); - page->Unref(); - } + while ((page = ReadPage()) != nullptr) + pages.push(page); + + mutex.unlock(); + + DeferredMonitor::Schedule(); } inline bool @@ -519,15 +553,27 @@ inline void HttpdOutput::CancelAllClients() { const ScopeLock protect(mutex); + + while (!pages.empty()) { + Page *page = pages.front(); + pages.pop(); + page->Unref(); + } + for (auto &client : clients) client.CancelQueue(); + + cond.broadcast(); } static void httpd_output_cancel(struct audio_output *ao) { HttpdOutput *httpd = HttpdOutput::Cast(ao); - httpd->CancelAllClients(); + + BlockingCall(io_thread_get(), [httpd](){ + httpd->CancelAllClients(); + }); } const struct audio_output_plugin httpd_output_plugin = {