Client: move output buffer code to new class PeakBuffer

This commit is contained in:
Max Kellermann 2013-01-15 01:12:08 +01:00
parent 4fa724461e
commit 3e03527930
7 changed files with 245 additions and 165 deletions

View File

@ -346,6 +346,7 @@ libutil_a_SOURCES = \
src/util/LazyRandomEngine.cxx src/util/LazyRandomEngine.hxx \ src/util/LazyRandomEngine.cxx src/util/LazyRandomEngine.hxx \
src/util/SliceBuffer.hxx \ src/util/SliceBuffer.hxx \
src/util/HugeAllocator.cxx src/util/HugeAllocator.hxx \ src/util/HugeAllocator.cxx src/util/HugeAllocator.hxx \
src/util/PeakBuffer.cxx src/util/PeakBuffer.hxx \
src/util/list.h \ src/util/list.h \
src/util/list_sort.c src/util/list_sort.h \ src/util/list_sort.c src/util/list_sort.h \
src/util/byte_reverse.c src/util/byte_reverse.h \ src/util/byte_reverse.c src/util/byte_reverse.h \

View File

@ -46,7 +46,7 @@ client_out_event(G_GNUC_UNUSED GIOChannel *source, GIOCondition condition,
g_timer_start(client->last_activity); g_timer_start(client->last_activity);
if (g_queue_is_empty(client->deferred_send)) { if (client->output_buffer.IsEmpty()) {
/* done sending deferred buffers exist: schedule /* done sending deferred buffers exist: schedule
read */ read */
client->source_id = g_io_add_watch(client->channel, client->source_id = g_io_add_watch(client->channel,
@ -97,7 +97,7 @@ client_in_event(G_GNUC_UNUSED GIOChannel *source, GIOCondition condition,
return false; return false;
} }
if (!g_queue_is_empty(client->deferred_send)) { if (!client->output_buffer.IsEmpty()) {
/* deferred buffers exist: schedule write */ /* deferred buffers exist: schedule write */
client->source_id = g_io_add_watch(client->channel, client->source_id = g_io_add_watch(client->channel,
GIOCondition(G_IO_OUT|G_IO_ERR|G_IO_HUP), GIOCondition(G_IO_OUT|G_IO_ERR|G_IO_HUP),

View File

@ -25,6 +25,7 @@
#include "ClientMessage.hxx" #include "ClientMessage.hxx"
#include "CommandListBuilder.hxx" #include "CommandListBuilder.hxx"
#include "command.h" #include "command.h"
#include "util/PeakBuffer.hxx"
#include <set> #include <set>
#include <string> #include <string>
@ -40,12 +41,8 @@ enum {
CLIENT_MAX_MESSAGES = 64, CLIENT_MAX_MESSAGES = 64,
}; };
struct deferred_buffer {
size_t size;
char data[sizeof(long)];
};
struct Partition; struct Partition;
class PeakBuffer;
class Client { class Client {
public: public:
@ -71,12 +68,9 @@ public:
CommandListBuilder cmd_list; CommandListBuilder cmd_list;
GQueue *deferred_send; /* for output if client is slow */
size_t deferred_bytes; /* mem deferred_send consumes */
unsigned int num; /* client number */ unsigned int num; /* client number */
char send_buf[16384]; PeakBuffer output_buffer;
size_t send_buf_used; /* bytes used this instance */
/** is this client waiting for an "idle" response? */ /** is this client waiting for an "idle" response? */
bool idle_waiting; bool idle_waiting;

View File

@ -55,9 +55,8 @@ Client::Client(Partition &_partition,
permission(getDefaultPermissions()), permission(getDefaultPermissions()),
uid(_uid), uid(_uid),
last_activity(g_timer_new()), last_activity(g_timer_new()),
deferred_send(g_queue_new()), deferred_bytes(0),
num(_num), num(_num),
send_buf_used(0), output_buffer(16384, client_max_output_buffer_size),
idle_waiting(false), idle_flags(0), idle_waiting(false), idle_flags(0),
num_subscriptions(0) num_subscriptions(0)
{ {
@ -78,20 +77,10 @@ Client::Client(Partition &_partition,
client_in_event, this); client_in_event, this);
} }
static void
deferred_buffer_free(gpointer data, G_GNUC_UNUSED gpointer user_data)
{
struct deferred_buffer *buffer = (struct deferred_buffer *)data;
g_free(buffer);
}
Client::~Client() Client::~Client()
{ {
g_timer_destroy(last_activity); g_timer_destroy(last_activity);
g_queue_foreach(deferred_send, deferred_buffer_free, NULL);
g_queue_free(deferred_send);
fifo_buffer_free(input); fifo_buffer_free(input);
} }

View File

@ -25,20 +25,18 @@
#include <stdio.h> #include <stdio.h>
static size_t static size_t
client_write_deferred_buffer(Client *client, client_write_direct(Client *client, const void *data, size_t length)
const struct deferred_buffer *buffer)
{ {
GError *error = NULL;
GIOStatus status;
gsize bytes_written;
assert(client != NULL); assert(client != NULL);
assert(client->channel != NULL); assert(client->channel != NULL);
assert(buffer != NULL); assert(data != NULL);
assert(length > 0);
status = g_io_channel_write_chars gsize bytes_written;
(client->channel, buffer->data, buffer->size, GError *error = NULL;
&bytes_written, &error); GIOStatus status =
g_io_channel_write_chars(client->channel, (const gchar *)data,
length, &bytes_written, &error);
switch (status) { switch (status) {
case G_IO_STATUS_NORMAL: case G_IO_STATUS_NORMAL:
return bytes_written; return bytes_written;
@ -56,186 +54,75 @@ client_write_deferred_buffer(Client *client,
/* I/O error */ /* I/O error */
client->SetExpired(); client->SetExpired();
g_warning("failed to flush buffer for %i: %s", g_warning("failed to write to %i: %s",
client->num, error->message); client->num, error->message);
g_error_free(error); g_error_free(error);
return 0; return 0;
} }
/* unreachable */ /* unreachable */
assert(false);
return 0; return 0;
} }
void void
client_write_deferred(Client *client) client_write_deferred(Client *client)
{ {
size_t ret; assert(!client_is_expired(client));
while (!g_queue_is_empty(client->deferred_send)) { while (true) {
struct deferred_buffer *buf = size_t length;
(struct deferred_buffer *) const void *data = client->output_buffer.Read(&length);
g_queue_peek_head(client->deferred_send); if (data == nullptr)
assert(buf->size > 0);
assert(buf->size <= client->deferred_bytes);
ret = client_write_deferred_buffer(client, buf);
if (ret == 0)
break; break;
if (ret < buf->size) { size_t nbytes = client_write_direct(client, data, length);
assert(client->deferred_bytes >= (size_t)ret); if (nbytes == 0)
client->deferred_bytes -= ret; return;
buf->size -= ret;
memmove(buf->data, buf->data + ret, buf->size);
break;
} else {
size_t decr = sizeof(*buf) -
sizeof(buf->data) + buf->size;
assert(client->deferred_bytes >= decr); client->output_buffer.Consume(nbytes);
client->deferred_bytes -= decr;
g_free(buf); if (nbytes < length)
g_queue_pop_head(client->deferred_send); return;
}
g_timer_start(client->last_activity); g_timer_start(client->last_activity);
} }
if (g_queue_is_empty(client->deferred_send)) {
g_debug("[%u] buffer empty %lu", client->num,
(unsigned long)client->deferred_bytes);
assert(client->deferred_bytes == 0);
}
} }
static void static void
client_defer_output(Client *client, const void *data, size_t length) client_defer_output(Client *client, const void *data, size_t length)
{ {
size_t alloc; if (!client->output_buffer.Append(data, length)) {
struct deferred_buffer *buf; g_warning("[%u] output buffer size is "
assert(length > 0);
alloc = sizeof(*buf) - sizeof(buf->data) + length;
client->deferred_bytes += alloc;
if (client->deferred_bytes > client_max_output_buffer_size) {
g_warning("[%u] output buffer size (%lu) is "
"larger than the max (%lu)", "larger than the max (%lu)",
client->num, client->num,
(unsigned long)client->deferred_bytes,
(unsigned long)client_max_output_buffer_size); (unsigned long)client_max_output_buffer_size);
/* cause client to close */ /* cause client to close */
client->SetExpired(); client->SetExpired();
return; return;
} }
buf = (struct deferred_buffer *)g_malloc(alloc);
buf->size = length;
memcpy(buf->data, data, length);
g_queue_push_tail(client->deferred_send, buf);
}
static void
client_write_direct(Client *client, const char *data, size_t length)
{
GError *error = NULL;
GIOStatus status;
gsize bytes_written;
assert(client != NULL);
assert(client->channel != NULL);
assert(data != NULL);
assert(length > 0);
assert(g_queue_is_empty(client->deferred_send));
status = g_io_channel_write_chars(client->channel, data, length,
&bytes_written, &error);
switch (status) {
case G_IO_STATUS_NORMAL:
case G_IO_STATUS_AGAIN:
break;
case G_IO_STATUS_EOF:
/* client has disconnected */
client->SetExpired();
return;
case G_IO_STATUS_ERROR:
/* I/O error */
client->SetExpired();
g_warning("failed to write to %i: %s",
client->num, error->message);
g_error_free(error);
return;
}
if (bytes_written < length)
client_defer_output(client, data + bytes_written,
length - bytes_written);
if (!g_queue_is_empty(client->deferred_send))
g_debug("[%u] buffer created", client->num);
} }
void void
client_write_output(Client *client) client_write_output(Client *client)
{ {
if (client->IsExpired() || !client->send_buf_used)
return;
if (!g_queue_is_empty(client->deferred_send)) {
client_defer_output(client, client->send_buf,
client->send_buf_used);
if (client->IsExpired()) if (client->IsExpired())
return; return;
/* try to flush the deferred buffers now; the current
server command may take too long to finish, and
meanwhile try to feed output to the client,
otherwise it will time out. One reason why
deferring is slow might be that currently each
client_write() allocates a new deferred buffer.
This should be optimized after MPD 0.14. */
client_write_deferred(client); client_write_deferred(client);
} else
client_write_direct(client, client->send_buf,
client->send_buf_used);
client->send_buf_used = 0;
} }
/** /**
* Write a block of data to the client. * Write a block of data to the client.
*/ */
static void static void
client_write(Client *client, const char *buffer, size_t buflen) client_write(Client *client, const char *data, size_t length)
{ {
/* if the client is going to be closed, do nothing */ /* if the client is going to be closed, do nothing */
if (client->IsExpired()) if (client->IsExpired() || length == 0)
return; return;
while (buflen > 0 && !client->IsExpired()) { client_defer_output(client, data, length);
size_t copylen;
assert(client->send_buf_used < sizeof(client->send_buf));
copylen = sizeof(client->send_buf) - client->send_buf_used;
if (copylen > buflen)
copylen = buflen;
memcpy(client->send_buf + client->send_buf_used, buffer,
copylen);
buflen -= copylen;
client->send_buf_used += copylen;
buffer += copylen;
if (client->send_buf_used >= sizeof(client->send_buf))
client_write_output(client);
}
} }
void void

143
src/util/PeakBuffer.cxx Normal file
View File

@ -0,0 +1,143 @@
/*
* Copyright (C) 2003-2013 The Music Player Daemon Project
* http://www.musicpd.org
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 2 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License along
* with this program; if not, write to the Free Software Foundation, Inc.,
* 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*/
#include "PeakBuffer.hxx"
#include "HugeAllocator.hxx"
#include "fifo_buffer.h"
#include <algorithm>
#include <assert.h>
#include <stdint.h>
#include <string.h>
PeakBuffer::~PeakBuffer()
{
if (normal_buffer != nullptr)
fifo_buffer_free(normal_buffer);
if (peak_buffer != nullptr)
HugeFree(peak_buffer, peak_size);
}
bool
PeakBuffer::IsEmpty() const
{
return (normal_buffer == nullptr ||
fifo_buffer_is_empty(normal_buffer)) &&
(peak_buffer == nullptr ||
fifo_buffer_is_empty(peak_buffer));
}
const void *
PeakBuffer::Read(size_t *length_r) const
{
if (normal_buffer != nullptr) {
const void *p = fifo_buffer_read(normal_buffer, length_r);
if (p != nullptr)
return p;
}
if (peak_buffer != nullptr) {
const void *p = fifo_buffer_read(peak_buffer, length_r);
if (p != nullptr)
return p;
}
return nullptr;
}
void
PeakBuffer::Consume(size_t length)
{
if (normal_buffer != nullptr && !fifo_buffer_is_empty(normal_buffer)) {
fifo_buffer_consume(normal_buffer, length);
return;
}
if (peak_buffer != nullptr && !fifo_buffer_is_empty(peak_buffer)) {
fifo_buffer_consume(peak_buffer, length);
if (fifo_buffer_is_empty(peak_buffer)) {
HugeFree(peak_buffer, peak_size);
peak_buffer = nullptr;
}
return;
}
}
static size_t
AppendTo(fifo_buffer *buffer, const void *data, size_t length)
{
assert(data != nullptr);
assert(length > 0);
size_t total = 0;
do {
size_t max_length;
void *p = fifo_buffer_write(buffer, &max_length);
if (p == nullptr)
break;
const size_t nbytes = std::min(length, max_length);
memcpy(p, data, nbytes);
fifo_buffer_append(buffer, nbytes);
data = (const uint8_t *)data + nbytes;
length -= nbytes;
total += nbytes;
} while (length > 0);
return total;
}
bool
PeakBuffer::Append(const void *data, size_t length)
{
if (length == 0)
return true;
if (peak_buffer != nullptr && !fifo_buffer_is_empty(peak_buffer)) {
size_t nbytes = AppendTo(peak_buffer, data, length);
return nbytes == length;
}
if (normal_buffer == nullptr)
normal_buffer = fifo_buffer_new(normal_size);
size_t nbytes = AppendTo(normal_buffer, data, length);
if (nbytes > 0) {
data = (const uint8_t *)data + nbytes;
length -= nbytes;
if (length == 0)
return true;
}
if (peak_buffer == nullptr && peak_size > 0) {
peak_buffer = (fifo_buffer *)HugeAllocate(peak_size);
if (peak_buffer == nullptr)
return false;
fifo_buffer_init(peak_buffer, peak_size);
}
nbytes = AppendTo(peak_buffer, data, length);
return nbytes == length;
}

66
src/util/PeakBuffer.hxx Normal file
View File

@ -0,0 +1,66 @@
/*
* Copyright (C) 2003-2013 The Music Player Daemon Project
* http://www.musicpd.org
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 2 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License along
* with this program; if not, write to the Free Software Foundation, Inc.,
* 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*/
#ifndef MPD_PEAK_BUFFER_HXX
#define MPD_PEAK_BUFFER_HXX
#include "gcc.h"
#include <stddef.h>
struct fifo_buffer;
/**
* A FIFO-like buffer that will allocate more memory on demand to
* allow large peaks. This second buffer will be given back to the
* kernel when it has been consumed.
*/
class PeakBuffer {
size_t normal_size, peak_size;
fifo_buffer *normal_buffer, *peak_buffer;
public:
PeakBuffer(size_t _normal_size, size_t _peak_size)
:normal_size(_normal_size), peak_size(_peak_size),
normal_buffer(nullptr), peak_buffer(nullptr) {}
PeakBuffer(PeakBuffer &&other)
:normal_size(other.normal_size), peak_size(other.peak_size),
normal_buffer(other.normal_buffer),
peak_buffer(other.peak_buffer) {
other.normal_buffer = nullptr;
other.peak_buffer = nullptr;
}
~PeakBuffer();
PeakBuffer(const PeakBuffer &) = delete;
PeakBuffer &operator=(const PeakBuffer &) = delete;
gcc_pure
bool IsEmpty() const;
const void *Read(size_t *length_r) const;
void Consume(size_t length);
bool Append(const void *data, size_t length);
};
#endif