diff --git a/src/input/CurlInputPlugin.cxx b/src/input/CurlInputPlugin.cxx index 08df6cbd9..be919bb0e 100644 --- a/src/input/CurlInputPlugin.cxx +++ b/src/input/CurlInputPlugin.cxx @@ -42,6 +42,7 @@ extern "C" { #include #include +#include #include #include @@ -69,7 +70,7 @@ static const size_t CURL_RESUME_AT = 384 * 1024; /** * Buffers created by input_curl_writefunction(). */ -struct buffer { +class CurlInputBuffer { /** size of the payload */ size_t size; @@ -77,7 +78,55 @@ struct buffer { size_t consumed; /** the payload */ - unsigned char data[sizeof(long)]; + uint8_t *data; + +public: + CurlInputBuffer(const void *_data, size_t _size) + :size(_size), consumed(0), data(new uint8_t[size]) { + memcpy(data, _data, size); + } + + ~CurlInputBuffer() { + delete[] data; + } + + CurlInputBuffer(const CurlInputBuffer &) = delete; + CurlInputBuffer &operator=(const CurlInputBuffer &) = delete; + + const void *Begin() const { + return data + consumed; + } + + size_t TotalSize() const { + return size; + } + + size_t Available() const { + return size - consumed; + } + + /** + * Mark a part of the buffer as consumed. + * + * @return false if the buffer is now empty + */ + bool Consume(size_t length) { + assert(consumed < size); + + consumed += length; + if (consumed < size) + return true; + + assert(consumed == size); + return false; + } + + bool Read(void *dest, size_t length) { + assert(consumed + length <= size); + + memcpy(dest, data + consumed, length); + return Consume(length); + } }; struct input_curl { @@ -93,7 +142,7 @@ struct input_curl { /** list of buffers, where input_curl_writefunction() appends to, and input_curl_read() reads from them */ - GQueue *buffers; + std::list buffers; /** * Is the connection currently paused? That happens when the @@ -119,7 +168,6 @@ struct input_curl { input_curl(const char *url, GMutex *mutex, GCond *cond) :range(nullptr), request_headers(nullptr), - buffers(g_queue_new()), paused(false), meta_name(nullptr), tag(nullptr), @@ -707,32 +755,12 @@ curl_total_buffer_size(const struct input_curl *c) { size_t total = 0; - for (GList *i = g_queue_peek_head_link(c->buffers); - i != NULL; i = g_list_next(i)) { - struct buffer *buffer = (struct buffer *)i->data; - total += buffer->size; - } + for (const auto &i : c->buffers) + total += i.TotalSize(); return total; } -static void -buffer_free_callback(gpointer data, G_GNUC_UNUSED gpointer user_data) -{ - struct buffer *buffer = (struct buffer *)data; - - assert(buffer->consumed <= buffer->size); - - g_free(buffer); -} - -static void -input_curl_flush_buffers(struct input_curl *c) -{ - g_queue_foreach(c->buffers, buffer_free_callback, NULL); - g_queue_clear(c->buffers); -} - input_curl::~input_curl() { if (tag != NULL) @@ -740,9 +768,6 @@ input_curl::~input_curl() g_free(meta_name); input_curl_easy_free_indirect(this); - input_curl_flush_buffers(this); - - g_queue_free(buffers); if (postponed_error != NULL) g_error_free(postponed_error); @@ -777,7 +802,7 @@ input_curl_tag(struct input_stream *is) static bool fill_buffer(struct input_curl *c, GError **error_r) { - while (c->easy != NULL && g_queue_is_empty(c->buffers)) + while (c->easy != NULL && c->buffers.empty()) g_cond_wait(c->base.cond, c->base.mutex); if (c->postponed_error != NULL) { @@ -786,78 +811,50 @@ fill_buffer(struct input_curl *c, GError **error_r) return false; } - return !g_queue_is_empty(c->buffers); -} - -/** - * Mark a part of the buffer object as consumed. - */ -static struct buffer * -consume_buffer(struct buffer *buffer, size_t length) -{ - assert(buffer != NULL); - assert(buffer->consumed < buffer->size); - - buffer->consumed += length; - if (buffer->consumed < buffer->size) - return buffer; - - assert(buffer->consumed == buffer->size); - - g_free(buffer); - - return NULL; + return !c->buffers.empty(); } static size_t -read_from_buffer(IcyMetaDataParser &icy, GQueue *buffers, +read_from_buffer(IcyMetaDataParser &icy, std::list &buffers, void *dest0, size_t length) { - struct buffer *buffer = (struct buffer *)g_queue_pop_head(buffers); + auto &buffer = buffers.front(); uint8_t *dest = (uint8_t *)dest0; size_t nbytes = 0; - assert(buffer->size > 0); - assert(buffer->consumed < buffer->size); - - if (length > buffer->size - buffer->consumed) - length = buffer->size - buffer->consumed; + if (length > buffer.Available()) + length = buffer.Available(); while (true) { size_t chunk; chunk = icy.Data(length); if (chunk > 0) { - memcpy(dest, buffer->data + buffer->consumed, - chunk); - buffer = consume_buffer(buffer, chunk); + const bool empty = !buffer.Read(dest, chunk); nbytes += chunk; dest += chunk; length -= chunk; - if (length == 0) + if (empty) { + buffers.pop_front(); break; - - assert(buffer != NULL); + } } - chunk = icy.Meta(buffer->data + buffer->consumed, length); + chunk = icy.Meta(buffer.Begin(), length); if (chunk > 0) { - buffer = consume_buffer(buffer, chunk); + const bool empty = !buffer.Consume(chunk); length -= chunk; - if (length == 0) + if (empty) { + buffers.pop_front(); break; - - assert(buffer != NULL); + } } } - if (buffer != NULL) - g_queue_push_head(buffers, buffer); - return nbytes; } @@ -884,7 +881,7 @@ input_curl_available(struct input_stream *is) struct input_curl *c = (struct input_curl *)is; return c->postponed_error != NULL || c->easy == NULL || - !g_queue_is_empty(c->buffers); + !c->buffers.empty(); } static size_t @@ -905,7 +902,7 @@ input_curl_read(struct input_stream *is, void *ptr, size_t size, /* send buffer contents */ - while (size > 0 && !g_queue_is_empty(c->buffers)) { + while (size > 0 && !c->buffers.empty()) { size_t copy = read_from_buffer(c->icy, c->buffers, dest + nbytes, size); @@ -941,7 +938,7 @@ input_curl_eof(G_GNUC_UNUSED struct input_stream *is) { struct input_curl *c = (struct input_curl *)is; - return c->easy == NULL && g_queue_is_empty(c->buffers); + return c->easy == NULL && c->buffers.empty(); } /** called by curl when new data is available */ @@ -1047,13 +1044,7 @@ input_curl_writefunction(void *ptr, size_t size, size_t nmemb, void *stream) return CURL_WRITEFUNC_PAUSE; } - struct buffer *buffer = (struct buffer *) - g_malloc(sizeof(*buffer) - sizeof(buffer->data) + size); - buffer->size = size; - buffer->consumed = 0; - memcpy(buffer->data, ptr, size); - - g_queue_push_tail(c->buffers, buffer); + c->buffers.emplace_back(ptr, size); c->base.ready = true; g_cond_broadcast(c->base.cond); @@ -1164,19 +1155,15 @@ input_curl_seek(struct input_stream *is, goffset offset, int whence, /* check if we can fast-forward the buffer */ - while (offset > is->offset && !g_queue_is_empty(c->buffers)) { - struct buffer *buffer; - size_t length; - - buffer = (struct buffer *)g_queue_pop_head(c->buffers); - - length = buffer->size - buffer->consumed; + while (offset > is->offset && !c->buffers.empty()) { + auto &buffer = c->buffers.front(); + size_t length = buffer.Available(); if (offset - is->offset < (goffset)length) length = offset - is->offset; - buffer = consume_buffer(buffer, length); - if (buffer != NULL) - g_queue_push_head(c->buffers, buffer); + const bool empty = !buffer.Consume(length); + if (empty) + c->buffers.pop_front(); is->offset += length; } @@ -1189,7 +1176,7 @@ input_curl_seek(struct input_stream *is, goffset offset, int whence, g_mutex_unlock(c->base.mutex); input_curl_easy_free_indirect(c); - input_curl_flush_buffers(c); + c->buffers.clear(); is->offset = offset; if (is->offset == is->size) {