output/httpd: move all broadcast operations to the IOThread

Add a Page queue to class HttpdOutput, and use DeferredMonitor to
flush this queue inside the IOThread.  This fixes a thread-safety
issue: much of EventLoop is not thread-safe, and the httpd plugin
ignored that problem.
This commit is contained in:
Max Kellermann 2013-12-31 16:31:36 +01:00
parent 9bd4ed3e60
commit c9da3363a0
2 changed files with 76 additions and 11 deletions

View File

@ -29,6 +29,7 @@
#include "Timer.hxx" #include "Timer.hxx"
#include "thread/Mutex.hxx" #include "thread/Mutex.hxx"
#include "event/ServerSocket.hxx" #include "event/ServerSocket.hxx"
#include "event/DeferredMonitor.hxx"
#ifdef _LIBCPP_VERSION #ifdef _LIBCPP_VERSION
/* can't use incomplete template arguments with libc++ */ /* can't use incomplete template arguments with libc++ */
@ -36,6 +37,8 @@
#endif #endif
#include <forward_list> #include <forward_list>
#include <queue>
#include <list>
struct config_param; struct config_param;
class Error; class Error;
@ -46,7 +49,7 @@ class Page;
struct Encoder; struct Encoder;
struct Tag; struct Tag;
class HttpdOutput final : ServerSocket { class HttpdOutput final : ServerSocket, DeferredMonitor {
struct audio_output base; struct audio_output base;
/** /**
@ -80,6 +83,12 @@ public:
*/ */
mutable Mutex mutex; mutable Mutex mutex;
/**
* This condition gets signalled when an item is removed from
* #pages.
*/
Cond cond;
private: private:
/** /**
* A #Timer object to synchronize this output with the * A #Timer object to synchronize this output with the
@ -97,6 +106,14 @@ private:
*/ */
Page *metadata; Page *metadata;
/**
* The page queue, i.e. pages from the encoder to be
* broadcasted to all clients. This container is necessary to
* pass pages from the OutputThread to the IOThread. It is
* protected by #mutex, and removing signals #cond.
*/
std::queue<Page *, std::list<Page *>> pages;
public: public:
/** /**
* The configured name. * The configured name.
@ -248,6 +265,8 @@ public:
void CancelAllClients(); void CancelAllClients();
private: private:
virtual void RunDeferred() override;
virtual void OnAccept(int fd, const sockaddr &address, virtual void OnAccept(int fd, const sockaddr &address,
size_t address_length, int uid) override; size_t address_length, int uid) override;
}; };

View File

@ -29,6 +29,7 @@
#include "IcyMetaDataServer.hxx" #include "IcyMetaDataServer.hxx"
#include "system/fd_util.h" #include "system/fd_util.h"
#include "IOThread.hxx" #include "IOThread.hxx"
#include "event/Call.hxx"
#include "util/Error.hxx" #include "util/Error.hxx"
#include "util/Domain.hxx" #include "util/Domain.hxx"
#include "Log.hxx" #include "Log.hxx"
@ -49,7 +50,7 @@ const Domain httpd_output_domain("httpd_output");
inline inline
HttpdOutput::HttpdOutput(EventLoop &_loop) HttpdOutput::HttpdOutput(EventLoop &_loop)
:ServerSocket(_loop), :ServerSocket(_loop), DeferredMonitor(_loop),
encoder(nullptr), unflushed_input(0), encoder(nullptr), unflushed_input(0),
metadata(nullptr) metadata(nullptr)
{ {
@ -162,7 +163,7 @@ httpd_output_finish(struct audio_output *ao)
inline void inline void
HttpdOutput::AddClient(int fd) HttpdOutput::AddClient(int fd)
{ {
clients.emplace_front(*this, fd, GetEventLoop(), clients.emplace_front(*this, fd, ServerSocket::GetEventLoop(),
encoder->plugin.tag == nullptr); encoder->plugin.tag == nullptr);
++clients_cnt; ++clients_cnt;
@ -171,6 +172,29 @@ HttpdOutput::AddClient(int fd)
clients.front().PushMetaData(metadata); clients.front().PushMetaData(metadata);
} }
void
HttpdOutput::RunDeferred()
{
/* this method runs in the IOThread; it broadcasts pages from
our own queue to all clients */
const ScopeLock protect(mutex);
while (!pages.empty()) {
Page *page = pages.front();
pages.pop();
for (auto &client : clients)
client.PushPage(page);
page->Unref();
}
/* wake up the client that may be waiting for the queue to be
flushed */
cond.broadcast();
}
void void
HttpdOutput::OnAccept(int fd, const sockaddr &address, HttpdOutput::OnAccept(int fd, const sockaddr &address,
size_t address_length, gcc_unused int uid) size_t address_length, gcc_unused int uid)
@ -393,19 +417,29 @@ HttpdOutput::BroadcastPage(Page *page)
{ {
assert(page != nullptr); assert(page != nullptr);
const ScopeLock protect(mutex); mutex.lock();
for (auto &client : clients) pages.push(page);
client.PushPage(page); page->Ref();
mutex.unlock();
DeferredMonitor::Schedule();
} }
void void
HttpdOutput::BroadcastFromEncoder() HttpdOutput::BroadcastFromEncoder()
{ {
/* synchronize with the IOThread */
mutex.lock();
while (!pages.empty())
cond.wait(mutex);
Page *page; Page *page;
while ((page = ReadPage()) != nullptr) { while ((page = ReadPage()) != nullptr)
BroadcastPage(page); pages.push(page);
page->Unref();
} mutex.unlock();
DeferredMonitor::Schedule();
} }
inline bool inline bool
@ -519,15 +553,27 @@ inline void
HttpdOutput::CancelAllClients() HttpdOutput::CancelAllClients()
{ {
const ScopeLock protect(mutex); const ScopeLock protect(mutex);
while (!pages.empty()) {
Page *page = pages.front();
pages.pop();
page->Unref();
}
for (auto &client : clients) for (auto &client : clients)
client.CancelQueue(); client.CancelQueue();
cond.broadcast();
} }
static void static void
httpd_output_cancel(struct audio_output *ao) httpd_output_cancel(struct audio_output *ao)
{ {
HttpdOutput *httpd = HttpdOutput::Cast(ao); HttpdOutput *httpd = HttpdOutput::Cast(ao);
httpd->CancelAllClients();
BlockingCall(io_thread_get(), [httpd](){
httpd->CancelAllClients();
});
} }
const struct audio_output_plugin httpd_output_plugin = { const struct audio_output_plugin httpd_output_plugin = {