diff --git a/src/input/plugins/CurlInputPlugin.cxx b/src/input/plugins/CurlInputPlugin.cxx index 60facb26b..37509e998 100644 --- a/src/input/plugins/CurlInputPlugin.cxx +++ b/src/input/plugins/CurlInputPlugin.cxx @@ -33,6 +33,8 @@ #include "util/ASCII.hxx" #include "util/CharUtil.hxx" #include "util/NumberParser.hxx" +#include "util/CircularBuffer.hxx" +#include "util/HugeAllocator.hxx" #include "util/Error.hxx" #include "util/Domain.hxx" #include "Log.hxx" @@ -46,9 +48,6 @@ #endif #include -#include - -#include #include @@ -68,68 +67,6 @@ static const size_t CURL_MAX_BUFFERED = 512 * 1024; */ static const size_t CURL_RESUME_AT = 384 * 1024; -/** - * Buffers created by input_curl_writefunction(). - */ -class CurlInputBuffer { - /** size of the payload */ - size_t size; - - /** how much has been consumed yet? */ - size_t consumed; - - /** the payload */ - uint8_t *data; - -public: - CurlInputBuffer(const void *_data, size_t _size) - :size(_size), consumed(0), data(new uint8_t[size]) { - memcpy(data, _data, size); - } - - ~CurlInputBuffer() { - delete[] data; - } - - CurlInputBuffer(const CurlInputBuffer &) = delete; - CurlInputBuffer &operator=(const CurlInputBuffer &) = delete; - - const void *Begin() const { - return data + consumed; - } - - size_t TotalSize() const { - return size; - } - - size_t Available() const { - return size - consumed; - } - - /** - * Mark a part of the buffer as consumed. - * - * @return false if the buffer is now empty - */ - bool Consume(size_t length) { - assert(consumed < size); - - consumed += length; - if (consumed < size) - return true; - - assert(consumed == size); - return false; - } - - bool Read(void *dest, size_t length) { - assert(consumed + length <= size); - - memcpy(dest, data + consumed, length); - return Consume(length); - } -}; - struct CurlInputStream { InputStream base; @@ -141,9 +78,11 @@ struct CurlInputStream { /** the curl handles */ CURL *easy; - /** list of buffers, where input_curl_writefunction() appends - to, and input_curl_read() reads from them */ - std::list buffers; + /** + * A buffer where input_curl_writefunction() appends + * to, and input_curl_read() reads from. + */ + CircularBuffer buffer; /** * Is the connection currently paused? That happens when the @@ -167,9 +106,11 @@ struct CurlInputStream { Error postponed_error; - CurlInputStream(const char *url, Mutex &mutex, Cond &cond) + CurlInputStream(const char *url, Mutex &mutex, Cond &cond, + void *_buffer) :base(input_plugin_curl, url, mutex, cond), request_headers(nullptr), + buffer((uint8_t *)_buffer, CURL_MAX_BUFFERED), paused(false), tag(nullptr) {} @@ -184,7 +125,7 @@ struct CurlInputStream { bool Check(Error &error); bool IsEOF() const { - return easy == nullptr && buffers.empty(); + return easy == nullptr && buffer.IsEmpty(); } bool Seek(InputPlugin::offset_type offset, int whence, Error &error); @@ -193,7 +134,7 @@ struct CurlInputStream { bool IsAvailable() const { return postponed_error.IsDefined() || easy == nullptr || - !buffers.empty(); + !buffer.IsEmpty(); } size_t Read(void *ptr, size_t size, Error &error); @@ -224,13 +165,14 @@ struct CurlInputStream { bool FillBuffer(Error &error); /** - * Determine the total sizes of all buffers, including - * portions that have already been consumed. + * Returns the number of bytes stored in the buffer. * * The caller must lock the mutex. */ gcc_pure - size_t GetTotalBufferSize() const; + size_t GetTotalBufferSize() const { + return buffer.GetSize(); + } void CopyIcyTag(); @@ -699,21 +641,14 @@ input_curl_finish(void) curl_global_cleanup(); } -size_t -CurlInputStream::GetTotalBufferSize() const -{ - size_t total = 0; - for (const auto &i : buffers) - total += i.TotalSize(); - - return total; -} - CurlInputStream::~CurlInputStream() { delete tag; FreeEasyIndirect(); + + buffer.Clear(); + HugeFree(buffer.Write().data, CURL_MAX_BUFFERED); } inline bool @@ -753,7 +688,7 @@ input_curl_tag(InputStream *is) inline bool CurlInputStream::FillBuffer(Error &error) { - while (easy != nullptr && buffers.empty()) + while (easy != nullptr && buffer.IsEmpty()) base.cond.wait(base.mutex); if (postponed_error.IsDefined()) { @@ -762,51 +697,44 @@ CurlInputStream::FillBuffer(Error &error) return false; } - return !buffers.empty(); + return !buffer.IsEmpty(); } static size_t -read_from_buffer(IcyMetaDataParser &icy, std::list &buffers, +read_from_buffer(IcyMetaDataParser &icy, CircularBuffer &buffer, void *dest0, size_t length) { - auto &buffer = buffers.front(); uint8_t *dest = (uint8_t *)dest0; size_t nbytes = 0; - if (length > buffer.Available()) - length = buffer.Available(); - while (true) { - size_t chunk; + auto r = buffer.Read(); + if (r.IsEmpty()) + break; - chunk = icy.Data(length); + if (r.size > length) + r.size = length; + + size_t chunk = icy.Data(r.size); if (chunk > 0) { - const bool empty = !buffer.Read(dest, chunk); + memcpy(dest, r.data, chunk); + buffer.Consume(chunk); nbytes += chunk; dest += chunk; length -= chunk; - if (empty) { - buffers.pop_front(); - break; - } - if (length == 0) break; } - chunk = icy.Meta(buffer.Begin(), length); + r = buffer.Read(); + if (r.IsEmpty()) + break; + + chunk = icy.Meta(r.data, r.size); if (chunk > 0) { - const bool empty = !buffer.Consume(chunk); - - length -= chunk; - - if (empty) { - buffers.pop_front(); - break; - } - + buffer.Consume(chunk); if (length == 0) break; } @@ -843,8 +771,7 @@ input_curl_available(InputStream *is) inline size_t CurlInputStream::Read(void *ptr, size_t size, Error &error) { - size_t nbytes = 0; - char *dest = (char *)ptr; + size_t nbytes; do { /* fill the buffer */ @@ -852,15 +779,7 @@ CurlInputStream::Read(void *ptr, size_t size, Error &error) if (!FillBuffer(error)) return 0; - /* send buffer contents */ - - while (size > 0 && !buffers.empty()) { - size_t copy = read_from_buffer(icy, buffers, - dest + nbytes, size); - - nbytes += copy; - size -= copy; - } + nbytes = read_from_buffer(icy, buffer, ptr, size); } while (nbytes == 0); if (icy.IsDefined()) @@ -986,12 +905,28 @@ CurlInputStream::DataReceived(const void *ptr, size_t size) const ScopeLock protect(base.mutex); - if (GetTotalBufferSize() + size >= CURL_MAX_BUFFERED) { + if (size > buffer.GetSpace()) { paused = true; return CURL_WRITEFUNC_PAUSE; } - buffers.emplace_back(ptr, size); + auto w = buffer.Write(); + assert(!w.IsEmpty()); + + size_t nbytes = std::min(w.size, size); + memcpy(w.data, ptr, nbytes); + buffer.Append(nbytes); + + const size_t remaining = size - nbytes; + if (remaining > 0) { + w = buffer.Write(); + assert(!w.IsEmpty()); + assert(w.size >= remaining); + + memcpy(w.data, (const uint8_t *)ptr + nbytes, remaining); + buffer.Append(size); + } + base.ready = true; base.cond.broadcast(); return size; @@ -1108,17 +1043,18 @@ CurlInputStream::Seek(InputPlugin::offset_type offset, int whence, /* check if we can fast-forward the buffer */ - while (offset > base.offset && !buffers.empty()) { - auto &buffer = buffers.front(); - size_t length = buffer.Available(); - if (offset - base.offset < (InputPlugin::offset_type)length) - length = offset - base.offset; + while (offset > base.offset) { + auto r = buffer.Read(); + if (r.IsEmpty()) + break; - const bool empty = !buffer.Consume(length); - if (empty) - buffers.pop_front(); + const size_t nbytes = + offset - base.offset < (InputPlugin::offset_type)r.size + ? offset - base.offset + : r.size; - base.offset += length; + buffer.Consume(nbytes); + base.offset += nbytes; } if (offset == base.offset) @@ -1129,7 +1065,7 @@ CurlInputStream::Seek(InputPlugin::offset_type offset, int whence, base.mutex.unlock(); FreeEasyIndirect(); - buffers.clear(); + buffer.Clear(); base.offset = offset; if (base.offset == base.size) { @@ -1181,7 +1117,13 @@ inline InputStream * CurlInputStream::Open(const char *url, Mutex &mutex, Cond &cond, Error &error) { - CurlInputStream *c = new CurlInputStream(url, mutex, cond); + void *buffer = HugeAllocate(CURL_MAX_BUFFERED); + if (buffer == nullptr) { + error.Set(curl_domain, "Out of memory"); + return nullptr; + } + + CurlInputStream *c = new CurlInputStream(url, mutex, cond, buffer); if (!c->InitEasy(error) || !input_curl_easy_add_indirect(c, error)) { delete c;