From 3e035279300d1ac238f2f063e5ca5f478923d7cb Mon Sep 17 00:00:00 2001 From: Max Kellermann Date: Tue, 15 Jan 2013 01:12:08 +0100 Subject: [PATCH] Client: move output buffer code to new class PeakBuffer --- Makefile.am | 1 + src/ClientEvent.cxx | 4 +- src/ClientInternal.hxx | 12 +-- src/ClientNew.cxx | 13 +-- src/ClientWrite.cxx | 171 +++++++--------------------------------- src/util/PeakBuffer.cxx | 143 +++++++++++++++++++++++++++++++++ src/util/PeakBuffer.hxx | 66 ++++++++++++++++ 7 files changed, 245 insertions(+), 165 deletions(-) create mode 100644 src/util/PeakBuffer.cxx create mode 100644 src/util/PeakBuffer.hxx diff --git a/Makefile.am b/Makefile.am index ffb927e2c..5f6ce85c4 100644 --- a/Makefile.am +++ b/Makefile.am @@ -346,6 +346,7 @@ libutil_a_SOURCES = \ src/util/LazyRandomEngine.cxx src/util/LazyRandomEngine.hxx \ src/util/SliceBuffer.hxx \ src/util/HugeAllocator.cxx src/util/HugeAllocator.hxx \ + src/util/PeakBuffer.cxx src/util/PeakBuffer.hxx \ src/util/list.h \ src/util/list_sort.c src/util/list_sort.h \ src/util/byte_reverse.c src/util/byte_reverse.h \ diff --git a/src/ClientEvent.cxx b/src/ClientEvent.cxx index 1e06ad48a..201709c76 100644 --- a/src/ClientEvent.cxx +++ b/src/ClientEvent.cxx @@ -46,7 +46,7 @@ client_out_event(G_GNUC_UNUSED GIOChannel *source, GIOCondition condition, g_timer_start(client->last_activity); - if (g_queue_is_empty(client->deferred_send)) { + if (client->output_buffer.IsEmpty()) { /* done sending deferred buffers exist: schedule read */ client->source_id = g_io_add_watch(client->channel, @@ -97,7 +97,7 @@ client_in_event(G_GNUC_UNUSED GIOChannel *source, GIOCondition condition, return false; } - if (!g_queue_is_empty(client->deferred_send)) { + if (!client->output_buffer.IsEmpty()) { /* deferred buffers exist: schedule write */ client->source_id = g_io_add_watch(client->channel, GIOCondition(G_IO_OUT|G_IO_ERR|G_IO_HUP), diff --git a/src/ClientInternal.hxx b/src/ClientInternal.hxx index 7d2edf046..009568ed4 100644 --- a/src/ClientInternal.hxx +++ b/src/ClientInternal.hxx @@ -25,6 +25,7 @@ #include "ClientMessage.hxx" #include "CommandListBuilder.hxx" #include "command.h" +#include "util/PeakBuffer.hxx" #include #include @@ -40,12 +41,8 @@ enum { CLIENT_MAX_MESSAGES = 64, }; -struct deferred_buffer { - size_t size; - char data[sizeof(long)]; -}; - struct Partition; +class PeakBuffer; class Client { public: @@ -71,12 +68,9 @@ public: CommandListBuilder cmd_list; - GQueue *deferred_send; /* for output if client is slow */ - size_t deferred_bytes; /* mem deferred_send consumes */ unsigned int num; /* client number */ - char send_buf[16384]; - size_t send_buf_used; /* bytes used this instance */ + PeakBuffer output_buffer; /** is this client waiting for an "idle" response? */ bool idle_waiting; diff --git a/src/ClientNew.cxx b/src/ClientNew.cxx index 4a22ca368..144c339ab 100644 --- a/src/ClientNew.cxx +++ b/src/ClientNew.cxx @@ -55,9 +55,8 @@ Client::Client(Partition &_partition, permission(getDefaultPermissions()), uid(_uid), last_activity(g_timer_new()), - deferred_send(g_queue_new()), deferred_bytes(0), num(_num), - send_buf_used(0), + output_buffer(16384, client_max_output_buffer_size), idle_waiting(false), idle_flags(0), num_subscriptions(0) { @@ -78,20 +77,10 @@ Client::Client(Partition &_partition, client_in_event, this); } -static void -deferred_buffer_free(gpointer data, G_GNUC_UNUSED gpointer user_data) -{ - struct deferred_buffer *buffer = (struct deferred_buffer *)data; - g_free(buffer); -} - Client::~Client() { g_timer_destroy(last_activity); - g_queue_foreach(deferred_send, deferred_buffer_free, NULL); - g_queue_free(deferred_send); - fifo_buffer_free(input); } diff --git a/src/ClientWrite.cxx b/src/ClientWrite.cxx index 1fe0e7bbd..86abc152c 100644 --- a/src/ClientWrite.cxx +++ b/src/ClientWrite.cxx @@ -25,20 +25,18 @@ #include static size_t -client_write_deferred_buffer(Client *client, - const struct deferred_buffer *buffer) +client_write_direct(Client *client, const void *data, size_t length) { - GError *error = NULL; - GIOStatus status; - gsize bytes_written; - assert(client != NULL); assert(client->channel != NULL); - assert(buffer != NULL); + assert(data != NULL); + assert(length > 0); - status = g_io_channel_write_chars - (client->channel, buffer->data, buffer->size, - &bytes_written, &error); + gsize bytes_written; + GError *error = NULL; + GIOStatus status = + g_io_channel_write_chars(client->channel, (const gchar *)data, + length, &bytes_written, &error); switch (status) { case G_IO_STATUS_NORMAL: return bytes_written; @@ -56,186 +54,75 @@ client_write_deferred_buffer(Client *client, /* I/O error */ client->SetExpired(); - g_warning("failed to flush buffer for %i: %s", + g_warning("failed to write to %i: %s", client->num, error->message); g_error_free(error); return 0; } /* unreachable */ + assert(false); return 0; } void client_write_deferred(Client *client) { - size_t ret; + assert(!client_is_expired(client)); - while (!g_queue_is_empty(client->deferred_send)) { - struct deferred_buffer *buf = - (struct deferred_buffer *) - g_queue_peek_head(client->deferred_send); - - assert(buf->size > 0); - assert(buf->size <= client->deferred_bytes); - - ret = client_write_deferred_buffer(client, buf); - if (ret == 0) + while (true) { + size_t length; + const void *data = client->output_buffer.Read(&length); + if (data == nullptr) break; - if (ret < buf->size) { - assert(client->deferred_bytes >= (size_t)ret); - client->deferred_bytes -= ret; - buf->size -= ret; - memmove(buf->data, buf->data + ret, buf->size); - break; - } else { - size_t decr = sizeof(*buf) - - sizeof(buf->data) + buf->size; + size_t nbytes = client_write_direct(client, data, length); + if (nbytes == 0) + return; - assert(client->deferred_bytes >= decr); - client->deferred_bytes -= decr; - g_free(buf); - g_queue_pop_head(client->deferred_send); - } + client->output_buffer.Consume(nbytes); + + if (nbytes < length) + return; g_timer_start(client->last_activity); } - - if (g_queue_is_empty(client->deferred_send)) { - g_debug("[%u] buffer empty %lu", client->num, - (unsigned long)client->deferred_bytes); - assert(client->deferred_bytes == 0); - } } static void client_defer_output(Client *client, const void *data, size_t length) { - size_t alloc; - struct deferred_buffer *buf; - - assert(length > 0); - - alloc = sizeof(*buf) - sizeof(buf->data) + length; - client->deferred_bytes += alloc; - if (client->deferred_bytes > client_max_output_buffer_size) { - g_warning("[%u] output buffer size (%lu) is " + if (!client->output_buffer.Append(data, length)) { + g_warning("[%u] output buffer size is " "larger than the max (%lu)", client->num, - (unsigned long)client->deferred_bytes, (unsigned long)client_max_output_buffer_size); /* cause client to close */ client->SetExpired(); return; } - - buf = (struct deferred_buffer *)g_malloc(alloc); - buf->size = length; - memcpy(buf->data, data, length); - - g_queue_push_tail(client->deferred_send, buf); -} - -static void -client_write_direct(Client *client, const char *data, size_t length) -{ - GError *error = NULL; - GIOStatus status; - gsize bytes_written; - - assert(client != NULL); - assert(client->channel != NULL); - assert(data != NULL); - assert(length > 0); - assert(g_queue_is_empty(client->deferred_send)); - - status = g_io_channel_write_chars(client->channel, data, length, - &bytes_written, &error); - switch (status) { - case G_IO_STATUS_NORMAL: - case G_IO_STATUS_AGAIN: - break; - - case G_IO_STATUS_EOF: - /* client has disconnected */ - - client->SetExpired(); - return; - - case G_IO_STATUS_ERROR: - /* I/O error */ - - client->SetExpired(); - g_warning("failed to write to %i: %s", - client->num, error->message); - g_error_free(error); - return; - } - - if (bytes_written < length) - client_defer_output(client, data + bytes_written, - length - bytes_written); - - if (!g_queue_is_empty(client->deferred_send)) - g_debug("[%u] buffer created", client->num); } void client_write_output(Client *client) { - if (client->IsExpired() || !client->send_buf_used) + if (client->IsExpired()) return; - if (!g_queue_is_empty(client->deferred_send)) { - client_defer_output(client, client->send_buf, - client->send_buf_used); - - if (client->IsExpired()) - return; - - /* try to flush the deferred buffers now; the current - server command may take too long to finish, and - meanwhile try to feed output to the client, - otherwise it will time out. One reason why - deferring is slow might be that currently each - client_write() allocates a new deferred buffer. - This should be optimized after MPD 0.14. */ - client_write_deferred(client); - } else - client_write_direct(client, client->send_buf, - client->send_buf_used); - - client->send_buf_used = 0; + client_write_deferred(client); } /** * Write a block of data to the client. */ static void -client_write(Client *client, const char *buffer, size_t buflen) +client_write(Client *client, const char *data, size_t length) { /* if the client is going to be closed, do nothing */ - if (client->IsExpired()) + if (client->IsExpired() || length == 0) return; - while (buflen > 0 && !client->IsExpired()) { - size_t copylen; - - assert(client->send_buf_used < sizeof(client->send_buf)); - - copylen = sizeof(client->send_buf) - client->send_buf_used; - if (copylen > buflen) - copylen = buflen; - - memcpy(client->send_buf + client->send_buf_used, buffer, - copylen); - buflen -= copylen; - client->send_buf_used += copylen; - buffer += copylen; - if (client->send_buf_used >= sizeof(client->send_buf)) - client_write_output(client); - } + client_defer_output(client, data, length); } void diff --git a/src/util/PeakBuffer.cxx b/src/util/PeakBuffer.cxx new file mode 100644 index 000000000..a3659b8f4 --- /dev/null +++ b/src/util/PeakBuffer.cxx @@ -0,0 +1,143 @@ +/* + * Copyright (C) 2003-2013 The Music Player Daemon Project + * http://www.musicpd.org + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ + +#include "PeakBuffer.hxx" +#include "HugeAllocator.hxx" +#include "fifo_buffer.h" + +#include + +#include +#include +#include + +PeakBuffer::~PeakBuffer() +{ + if (normal_buffer != nullptr) + fifo_buffer_free(normal_buffer); + + if (peak_buffer != nullptr) + HugeFree(peak_buffer, peak_size); +} + +bool +PeakBuffer::IsEmpty() const +{ + return (normal_buffer == nullptr || + fifo_buffer_is_empty(normal_buffer)) && + (peak_buffer == nullptr || + fifo_buffer_is_empty(peak_buffer)); +} + +const void * +PeakBuffer::Read(size_t *length_r) const +{ + if (normal_buffer != nullptr) { + const void *p = fifo_buffer_read(normal_buffer, length_r); + if (p != nullptr) + return p; + } + + if (peak_buffer != nullptr) { + const void *p = fifo_buffer_read(peak_buffer, length_r); + if (p != nullptr) + return p; + } + + return nullptr; +} + +void +PeakBuffer::Consume(size_t length) +{ + if (normal_buffer != nullptr && !fifo_buffer_is_empty(normal_buffer)) { + fifo_buffer_consume(normal_buffer, length); + return; + } + + if (peak_buffer != nullptr && !fifo_buffer_is_empty(peak_buffer)) { + fifo_buffer_consume(peak_buffer, length); + if (fifo_buffer_is_empty(peak_buffer)) { + HugeFree(peak_buffer, peak_size); + peak_buffer = nullptr; + } + + return; + } +} + +static size_t +AppendTo(fifo_buffer *buffer, const void *data, size_t length) +{ + assert(data != nullptr); + assert(length > 0); + + size_t total = 0; + + do { + size_t max_length; + void *p = fifo_buffer_write(buffer, &max_length); + if (p == nullptr) + break; + + const size_t nbytes = std::min(length, max_length); + memcpy(p, data, nbytes); + fifo_buffer_append(buffer, nbytes); + + data = (const uint8_t *)data + nbytes; + length -= nbytes; + total += nbytes; + } while (length > 0); + + return total; +} + +bool +PeakBuffer::Append(const void *data, size_t length) +{ + if (length == 0) + return true; + + if (peak_buffer != nullptr && !fifo_buffer_is_empty(peak_buffer)) { + size_t nbytes = AppendTo(peak_buffer, data, length); + return nbytes == length; + } + + if (normal_buffer == nullptr) + normal_buffer = fifo_buffer_new(normal_size); + + size_t nbytes = AppendTo(normal_buffer, data, length); + if (nbytes > 0) { + data = (const uint8_t *)data + nbytes; + length -= nbytes; + if (length == 0) + return true; + } + + if (peak_buffer == nullptr && peak_size > 0) { + peak_buffer = (fifo_buffer *)HugeAllocate(peak_size); + if (peak_buffer == nullptr) + return false; + + fifo_buffer_init(peak_buffer, peak_size); + } + + nbytes = AppendTo(peak_buffer, data, length); + return nbytes == length; +} diff --git a/src/util/PeakBuffer.hxx b/src/util/PeakBuffer.hxx new file mode 100644 index 000000000..0fbba8d77 --- /dev/null +++ b/src/util/PeakBuffer.hxx @@ -0,0 +1,66 @@ +/* + * Copyright (C) 2003-2013 The Music Player Daemon Project + * http://www.musicpd.org + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ + +#ifndef MPD_PEAK_BUFFER_HXX +#define MPD_PEAK_BUFFER_HXX + +#include "gcc.h" + +#include + +struct fifo_buffer; + +/** + * A FIFO-like buffer that will allocate more memory on demand to + * allow large peaks. This second buffer will be given back to the + * kernel when it has been consumed. + */ +class PeakBuffer { + size_t normal_size, peak_size; + + fifo_buffer *normal_buffer, *peak_buffer; + +public: + PeakBuffer(size_t _normal_size, size_t _peak_size) + :normal_size(_normal_size), peak_size(_peak_size), + normal_buffer(nullptr), peak_buffer(nullptr) {} + + PeakBuffer(PeakBuffer &&other) + :normal_size(other.normal_size), peak_size(other.peak_size), + normal_buffer(other.normal_buffer), + peak_buffer(other.peak_buffer) { + other.normal_buffer = nullptr; + other.peak_buffer = nullptr; + } + + ~PeakBuffer(); + + PeakBuffer(const PeakBuffer &) = delete; + PeakBuffer &operator=(const PeakBuffer &) = delete; + + gcc_pure + bool IsEmpty() const; + + const void *Read(size_t *length_r) const; + void Consume(size_t length); + + bool Append(const void *data, size_t length); +}; + +#endif