output/httpd: move functions into the HttpdOutput class

This commit is contained in:
Max Kellermann 2013-01-27 23:23:46 +01:00
parent 27f8ef2f33
commit 2aa34882b7
3 changed files with 214 additions and 175 deletions

View File

@ -55,7 +55,7 @@ HttpdClient::~HttpdClient()
void void
HttpdClient::Close() HttpdClient::Close()
{ {
httpd_output_remove_client(httpd, this); httpd->RemoveClient(*this);
} }
void void
@ -74,7 +74,7 @@ HttpdClient::BeginResponse()
write_source_id = 0; write_source_id = 0;
current_page = nullptr; current_page = nullptr;
httpd_output_send_header(httpd, this); httpd->SendHeader(*this);
} }
/** /**

View File

@ -122,21 +122,79 @@ struct HttpdOutput {
* at the same time. * at the same time.
*/ */
guint clients_max, clients_cnt; guint clients_max, clients_cnt;
bool Bind(GError **error_r);
void Unbind();
/**
* Caller must lock the mutex.
*/
bool OpenEncoder(struct audio_format *audio_format,
GError **error_r);
/**
* Caller must lock the mutex.
*/
bool Open(struct audio_format *audio_format, GError **error_r);
/**
* Caller must lock the mutex.
*/
void Close();
/**
* Check whether there is at least one client.
*
* Caller must lock the mutex.
*/
gcc_pure
bool HasClients() const {
return !clients.empty();
}
/**
* Check whether there is at least one client.
*/
gcc_pure
bool LockHasClients() const {
const ScopeLock protect(mutex);
return HasClients();
}
void AddClient(int fd);
/**
* Removes a client from the httpd_output.clients linked list.
*/
void RemoveClient(HttpdClient &client);
/**
* Sends the encoder header to the client. This is called
* right after the response headers have been sent.
*/
void SendHeader(HttpdClient &client) const;
/**
* Reads data from the encoder (as much as available) and
* returns it as a new #page object.
*/
page *ReadPage();
/**
* Broadcasts a page struct to all clients.
*
* Mutext must not be locked.
*/
void BroadcastPage(struct page *page);
/**
* Broadcasts data from the encoder to all clients.
*/
void BroadcastFromEncoder();
bool EncodeAndPlay(const void *chunk, size_t size, GError **error_r);
void SendTag(const struct tag *tag);
}; };
/**
* Removes a client from the httpd_output.clients linked list.
*/
void
httpd_output_remove_client(struct HttpdOutput *httpd,
HttpdClient *client);
/**
* Sends the encoder header to the client. This is called right after
* the response headers have been sent.
*/
void
httpd_output_send_header(struct HttpdOutput *httpd,
HttpdClient *client);
#endif #endif

View File

@ -54,49 +54,26 @@ httpd_output_quark(void)
return g_quark_from_static_string("httpd_output"); return g_quark_from_static_string("httpd_output");
} }
/**
* Check whether there is at least one client.
*
* Caller must lock the mutex.
*/
G_GNUC_PURE
static bool
httpd_output_has_clients(const HttpdOutput *httpd)
{
return !httpd->clients.empty();
}
/**
* Check whether there is at least one client.
*/
G_GNUC_PURE
static bool
httpd_output_lock_has_clients(const HttpdOutput *httpd)
{
const ScopeLock protect(httpd->mutex);
return httpd_output_has_clients(httpd);
}
static void static void
httpd_listen_in_event(int fd, const struct sockaddr *address, httpd_listen_in_event(int fd, const struct sockaddr *address,
size_t address_length, int uid, void *ctx); size_t address_length, int uid, void *ctx);
static bool inline bool
httpd_output_bind(HttpdOutput *httpd, GError **error_r) HttpdOutput::Bind(GError **error_r)
{ {
httpd->open = false; open = false;
const ScopeLock protect(httpd->mutex); const ScopeLock protect(mutex);
return server_socket_open(httpd->server_socket, error_r); return server_socket_open(server_socket, error_r);
} }
static void inline void
httpd_output_unbind(HttpdOutput *httpd) HttpdOutput::Unbind()
{ {
assert(!httpd->open); assert(!open);
const ScopeLock protect(httpd->mutex); const ScopeLock protect(mutex);
server_socket_close(httpd->server_socket); server_socket_close(server_socket);
} }
static struct audio_output * static struct audio_output *
@ -191,16 +168,16 @@ httpd_output_finish(struct audio_output *ao)
* Creates a new #HttpdClient object and adds it into the * Creates a new #HttpdClient object and adds it into the
* HttpdOutput.clients linked list. * HttpdOutput.clients linked list.
*/ */
static void inline void
httpd_client_add(HttpdOutput *httpd, int fd) HttpdOutput::AddClient(int fd)
{ {
httpd->clients.emplace_front(httpd, fd, clients.emplace_front(this, fd,
httpd->encoder->plugin->tag == NULL); encoder->plugin->tag == NULL);
httpd->clients_cnt++; ++clients_cnt;
/* pass metadata to client */ /* pass metadata to client */
if (httpd->metadata) if (metadata != nullptr)
httpd->clients.front().PushMetaData(httpd->metadata); clients.front().PushMetaData(metadata);
} }
static void static void
@ -245,7 +222,7 @@ httpd_listen_in_event(int fd, const struct sockaddr *address,
if (httpd->open && if (httpd->open &&
(httpd->clients_max == 0 || (httpd->clients_max == 0 ||
httpd->clients_cnt < httpd->clients_max)) httpd->clients_cnt < httpd->clients_max))
httpd_client_add(httpd, fd); httpd->AddClient(fd);
else else
close_socket(fd); close_socket(fd);
} else if (fd < 0 && errno != EINTR) { } else if (fd < 0 && errno != EINTR) {
@ -253,56 +230,34 @@ httpd_listen_in_event(int fd, const struct sockaddr *address,
} }
} }
/** struct page *
* Reads data from the encoder (as much as available) and returns it HttpdOutput::ReadPage()
* as a new #page object.
*/
static struct page *
httpd_output_read_page(HttpdOutput *httpd)
{ {
if (httpd->unflushed_input >= 65536) { if (unflushed_input >= 65536) {
/* we have fed a lot of input into the encoder, but it /* we have fed a lot of input into the encoder, but it
didn't give anything back yet - flush now to avoid didn't give anything back yet - flush now to avoid
buffer underruns */ buffer underruns */
encoder_flush(httpd->encoder, NULL); encoder_flush(encoder, NULL);
httpd->unflushed_input = 0; unflushed_input = 0;
} }
size_t size = 0; size_t size = 0;
do { do {
size_t nbytes = encoder_read(httpd->encoder, size_t nbytes = encoder_read(encoder,
httpd->buffer + size, buffer + size,
sizeof(httpd->buffer) - size); sizeof(buffer) - size);
if (nbytes == 0) if (nbytes == 0)
break; break;
httpd->unflushed_input = 0; unflushed_input = 0;
size += nbytes; size += nbytes;
} while (size < sizeof(httpd->buffer)); } while (size < sizeof(buffer));
if (size == 0) if (size == 0)
return NULL; return NULL;
return page_new_copy(httpd->buffer, size); return page_new_copy(buffer, size);
}
static bool
httpd_output_encoder_open(HttpdOutput *httpd,
struct audio_format *audio_format,
GError **error)
{
if (!encoder_open(httpd->encoder, audio_format, error))
return false;
/* we have to remember the encoder header, i.e. the first
bytes of encoder output after opening it, because it has to
be sent to every new client */
httpd->header = httpd_output_read_page(httpd);
httpd->unflushed_input = 0;
return true;
} }
static bool static bool
@ -310,7 +265,7 @@ httpd_output_enable(struct audio_output *ao, GError **error_r)
{ {
HttpdOutput *httpd = (HttpdOutput *)ao; HttpdOutput *httpd = (HttpdOutput *)ao;
return httpd_output_bind(httpd, error_r); return httpd->Bind(error_r);
} }
static void static void
@ -318,7 +273,44 @@ httpd_output_disable(struct audio_output *ao)
{ {
HttpdOutput *httpd = (HttpdOutput *)ao; HttpdOutput *httpd = (HttpdOutput *)ao;
httpd_output_unbind(httpd); httpd->Unbind();
}
inline bool
HttpdOutput::OpenEncoder(struct audio_format *audio_format, GError **error)
{
if (!encoder_open(encoder, audio_format, error))
return false;
/* we have to remember the encoder header, i.e. the first
bytes of encoder output after opening it, because it has to
be sent to every new client */
header = ReadPage();
unflushed_input = 0;
return true;
}
inline bool
HttpdOutput::Open(struct audio_format *audio_format, GError **error_r)
{
assert(!open);
assert(clients.empty());
/* open the encoder */
if (!OpenEncoder(audio_format, error_r))
return false;
/* initialize other attributes */
clients_cnt = 0;
timer = timer_new(audio_format);
open = true;
return true;
} }
static bool static bool
@ -330,20 +322,24 @@ httpd_output_open(struct audio_output *ao, struct audio_format *audio_format,
assert(httpd->clients.empty()); assert(httpd->clients.empty());
const ScopeLock protect(httpd->mutex); const ScopeLock protect(httpd->mutex);
return httpd->Open(audio_format, error);
}
/* open the encoder */ inline void
HttpdOutput::Close()
{
assert(open);
if (!httpd_output_encoder_open(httpd, audio_format, error)) open = false;
return false;
/* initialize other attributes */ timer_free(timer);
httpd->clients_cnt = 0; clients.clear();
httpd->timer = timer_new(audio_format);
httpd->open = true; if (header != NULL)
page_unref(header);
return true; encoder_close(encoder);
} }
static void static void
@ -352,42 +348,30 @@ httpd_output_close(struct audio_output *ao)
HttpdOutput *httpd = (HttpdOutput *)ao; HttpdOutput *httpd = (HttpdOutput *)ao;
const ScopeLock protect(httpd->mutex); const ScopeLock protect(httpd->mutex);
httpd->Close();
httpd->open = false;
timer_free(httpd->timer);
httpd->clients.clear();
if (httpd->header != NULL)
page_unref(httpd->header);
encoder_close(httpd->encoder);
} }
void void
httpd_output_remove_client(HttpdOutput *httpd, HttpdClient *client) HttpdOutput::RemoveClient(HttpdClient &client)
{ {
assert(httpd != NULL); assert(clients_cnt > 0);
assert(httpd->clients_cnt > 0);
assert(client != NULL);
for (auto prev = httpd->clients.before_begin(), i = std::next(prev);; for (auto prev = clients.before_begin(), i = std::next(prev);;
prev = i, i = std::next(prev)) { prev = i, i = std::next(prev)) {
assert(i != httpd->clients.end()); assert(i != clients.end());
if (&*i == client) { if (&*i == &client) {
httpd->clients.erase_after(prev); clients.erase_after(prev);
httpd->clients_cnt--; clients_cnt--;
break; break;
} }
} }
} }
void void
httpd_output_send_header(HttpdOutput *httpd, HttpdClient *client) HttpdOutput::SendHeader(HttpdClient &client) const
{ {
if (httpd->header != NULL) if (header != NULL)
client->PushPage(httpd->header); client.PushPage(header);
} }
static unsigned static unsigned
@ -395,7 +379,7 @@ httpd_output_delay(struct audio_output *ao)
{ {
HttpdOutput *httpd = (HttpdOutput *)ao; HttpdOutput *httpd = (HttpdOutput *)ao;
if (!httpd_output_lock_has_clients(httpd) && httpd->base.pause) { if (!httpd->LockHasClients() && httpd->base.pause) {
/* if there's no client and this output is paused, /* if there's no client and this output is paused,
then httpd_output_pause() will not do anything, it then httpd_output_pause() will not do anything, it
will not fill the buffer and it will not update the will not fill the buffer and it will not update the
@ -413,52 +397,44 @@ httpd_output_delay(struct audio_output *ao)
: 0; : 0;
} }
/** void
* Broadcasts a page struct to all clients. HttpdOutput::BroadcastPage(struct page *page)
*/
static void
httpd_output_broadcast_page(HttpdOutput *httpd, struct page *page)
{ {
assert(page != NULL); assert(page != NULL);
const ScopeLock protect(httpd->mutex); const ScopeLock protect(mutex);
for (auto &client : httpd->clients) for (auto &client : clients)
client.PushPage(page); client.PushPage(page);
} }
/** void
* Broadcasts data from the encoder to all clients. HttpdOutput::BroadcastFromEncoder()
*/
static void
httpd_output_encoder_to_clients(HttpdOutput *httpd)
{ {
httpd->mutex.lock(); mutex.lock();
for (auto &client : httpd->clients) { for (auto &client : clients) {
if (client.GetQueueSize() > 256 * 1024) { if (client.GetQueueSize() > 256 * 1024) {
g_debug("client is too slow, flushing its queue"); g_debug("client is too slow, flushing its queue");
client.CancelQueue(); client.CancelQueue();
} }
} }
httpd->mutex.unlock(); mutex.unlock();
struct page *page; struct page *page;
while ((page = httpd_output_read_page(httpd)) != NULL) { while ((page = ReadPage()) != nullptr) {
httpd_output_broadcast_page(httpd, page); BroadcastPage(page);
page_unref(page); page_unref(page);
} }
} }
static bool inline bool
httpd_output_encode_and_play(HttpdOutput *httpd, HttpdOutput::EncodeAndPlay(const void *chunk, size_t size, GError **error_r)
const void *chunk, size_t size, GError **error)
{ {
if (!encoder_write(httpd->encoder, chunk, size, error)) if (!encoder_write(encoder, chunk, size, error_r))
return false; return false;
httpd->unflushed_input += size; unflushed_input += size;
httpd_output_encoder_to_clients(httpd);
BroadcastFromEncoder();
return true; return true;
} }
@ -468,8 +444,8 @@ httpd_output_play(struct audio_output *ao, const void *chunk, size_t size,
{ {
HttpdOutput *httpd = (HttpdOutput *)ao; HttpdOutput *httpd = (HttpdOutput *)ao;
if (httpd_output_lock_has_clients(httpd)) { if (httpd->LockHasClients()) {
if (!httpd_output_encode_and_play(httpd, chunk, size, error_r)) if (!httpd->EncodeAndPlay(chunk, size, error_r))
return 0; return 0;
} }
@ -485,7 +461,7 @@ httpd_output_pause(struct audio_output *ao)
{ {
HttpdOutput *httpd = (HttpdOutput *)ao; HttpdOutput *httpd = (HttpdOutput *)ao;
if (httpd_output_lock_has_clients(httpd)) { if (httpd->LockHasClients()) {
static const char silence[1020] = { 0 }; static const char silence[1020] = { 0 };
return httpd_output_play(ao, silence, sizeof(silence), return httpd_output_play(ao, silence, sizeof(silence),
NULL) > 0; NULL) > 0;
@ -494,55 +470,60 @@ httpd_output_pause(struct audio_output *ao)
} }
} }
static void inline void
httpd_output_tag(struct audio_output *ao, const struct tag *tag) HttpdOutput::SendTag(const struct tag *tag)
{ {
HttpdOutput *httpd = (HttpdOutput *)ao;
assert(tag != NULL); assert(tag != NULL);
if (httpd->encoder->plugin->tag != NULL) { if (encoder->plugin->tag != NULL) {
/* embed encoder tags */ /* embed encoder tags */
/* flush the current stream, and end it */ /* flush the current stream, and end it */
encoder_pre_tag(httpd->encoder, NULL); encoder_pre_tag(encoder, NULL);
httpd_output_encoder_to_clients(httpd); BroadcastFromEncoder();
/* send the tag to the encoder - which starts a new /* send the tag to the encoder - which starts a new
stream now */ stream now */
encoder_tag(httpd->encoder, tag, NULL); encoder_tag(encoder, tag, NULL);
/* the first page generated by the encoder will now be /* the first page generated by the encoder will now be
used as the new "header" page, which is sent to all used as the new "header" page, which is sent to all
new clients */ new clients */
struct page *page = httpd_output_read_page(httpd); struct page *page = ReadPage();
if (page != NULL) { if (page != NULL) {
if (httpd->header != NULL) if (header != NULL)
page_unref(httpd->header); page_unref(header);
httpd->header = page; header = page;
httpd_output_broadcast_page(httpd, page); BroadcastPage(page);
} }
} else { } else {
/* use Icy-Metadata */ /* use Icy-Metadata */
if (httpd->metadata != NULL) if (metadata != NULL)
page_unref (httpd->metadata); page_unref(metadata);
httpd->metadata = metadata = icy_server_metadata_page(tag, TAG_ALBUM,
icy_server_metadata_page(tag, TAG_ALBUM, TAG_ARTIST, TAG_TITLE,
TAG_ARTIST, TAG_TITLE, TAG_NUM_OF_ITEM_TYPES);
TAG_NUM_OF_ITEM_TYPES); if (metadata != NULL) {
if (httpd->metadata != NULL) { const ScopeLock protect(mutex);
const ScopeLock protect(httpd->mutex); for (auto &client : clients)
for (auto &client : httpd->clients) client.PushMetaData(metadata);
client.PushMetaData(httpd->metadata);
} }
} }
} }
static void
httpd_output_tag(struct audio_output *ao, const struct tag *tag)
{
HttpdOutput *httpd = (HttpdOutput *)ao;
httpd->SendTag(tag);
}
static void static void
httpd_output_cancel(struct audio_output *ao) httpd_output_cancel(struct audio_output *ao)
{ {