input/curl: use CircularBuffer

Replaces its own weird buffering code.
This commit is contained in:
Max Kellermann 2014-03-15 20:37:14 +01:00
parent 328860c8ba
commit c7e2f558a0

View File

@ -33,6 +33,8 @@
#include "util/ASCII.hxx" #include "util/ASCII.hxx"
#include "util/CharUtil.hxx" #include "util/CharUtil.hxx"
#include "util/NumberParser.hxx" #include "util/NumberParser.hxx"
#include "util/CircularBuffer.hxx"
#include "util/HugeAllocator.hxx"
#include "util/Error.hxx" #include "util/Error.hxx"
#include "util/Domain.hxx" #include "util/Domain.hxx"
#include "Log.hxx" #include "Log.hxx"
@ -46,9 +48,6 @@
#endif #endif
#include <string.h> #include <string.h>
#include <errno.h>
#include <list>
#include <curl/curl.h> #include <curl/curl.h>
@ -68,68 +67,6 @@ static const size_t CURL_MAX_BUFFERED = 512 * 1024;
*/ */
static const size_t CURL_RESUME_AT = 384 * 1024; static const size_t CURL_RESUME_AT = 384 * 1024;
/**
* Buffers created by input_curl_writefunction().
*/
class CurlInputBuffer {
/** size of the payload */
size_t size;
/** how much has been consumed yet? */
size_t consumed;
/** the payload */
uint8_t *data;
public:
CurlInputBuffer(const void *_data, size_t _size)
:size(_size), consumed(0), data(new uint8_t[size]) {
memcpy(data, _data, size);
}
~CurlInputBuffer() {
delete[] data;
}
CurlInputBuffer(const CurlInputBuffer &) = delete;
CurlInputBuffer &operator=(const CurlInputBuffer &) = delete;
const void *Begin() const {
return data + consumed;
}
size_t TotalSize() const {
return size;
}
size_t Available() const {
return size - consumed;
}
/**
* Mark a part of the buffer as consumed.
*
* @return false if the buffer is now empty
*/
bool Consume(size_t length) {
assert(consumed < size);
consumed += length;
if (consumed < size)
return true;
assert(consumed == size);
return false;
}
bool Read(void *dest, size_t length) {
assert(consumed + length <= size);
memcpy(dest, data + consumed, length);
return Consume(length);
}
};
struct CurlInputStream { struct CurlInputStream {
InputStream base; InputStream base;
@ -141,9 +78,11 @@ struct CurlInputStream {
/** the curl handles */ /** the curl handles */
CURL *easy; CURL *easy;
/** list of buffers, where input_curl_writefunction() appends /**
to, and input_curl_read() reads from them */ * A buffer where input_curl_writefunction() appends
std::list<CurlInputBuffer> buffers; * to, and input_curl_read() reads from.
*/
CircularBuffer<uint8_t> buffer;
/** /**
* Is the connection currently paused? That happens when the * Is the connection currently paused? That happens when the
@ -167,9 +106,11 @@ struct CurlInputStream {
Error postponed_error; Error postponed_error;
CurlInputStream(const char *url, Mutex &mutex, Cond &cond) CurlInputStream(const char *url, Mutex &mutex, Cond &cond,
void *_buffer)
:base(input_plugin_curl, url, mutex, cond), :base(input_plugin_curl, url, mutex, cond),
request_headers(nullptr), request_headers(nullptr),
buffer((uint8_t *)_buffer, CURL_MAX_BUFFERED),
paused(false), paused(false),
tag(nullptr) {} tag(nullptr) {}
@ -184,7 +125,7 @@ struct CurlInputStream {
bool Check(Error &error); bool Check(Error &error);
bool IsEOF() const { bool IsEOF() const {
return easy == nullptr && buffers.empty(); return easy == nullptr && buffer.IsEmpty();
} }
bool Seek(InputPlugin::offset_type offset, int whence, Error &error); bool Seek(InputPlugin::offset_type offset, int whence, Error &error);
@ -193,7 +134,7 @@ struct CurlInputStream {
bool IsAvailable() const { bool IsAvailable() const {
return postponed_error.IsDefined() || easy == nullptr || return postponed_error.IsDefined() || easy == nullptr ||
!buffers.empty(); !buffer.IsEmpty();
} }
size_t Read(void *ptr, size_t size, Error &error); size_t Read(void *ptr, size_t size, Error &error);
@ -224,13 +165,14 @@ struct CurlInputStream {
bool FillBuffer(Error &error); bool FillBuffer(Error &error);
/** /**
* Determine the total sizes of all buffers, including * Returns the number of bytes stored in the buffer.
* portions that have already been consumed.
* *
* The caller must lock the mutex. * The caller must lock the mutex.
*/ */
gcc_pure gcc_pure
size_t GetTotalBufferSize() const; size_t GetTotalBufferSize() const {
return buffer.GetSize();
}
void CopyIcyTag(); void CopyIcyTag();
@ -699,21 +641,14 @@ input_curl_finish(void)
curl_global_cleanup(); curl_global_cleanup();
} }
size_t
CurlInputStream::GetTotalBufferSize() const
{
size_t total = 0;
for (const auto &i : buffers)
total += i.TotalSize();
return total;
}
CurlInputStream::~CurlInputStream() CurlInputStream::~CurlInputStream()
{ {
delete tag; delete tag;
FreeEasyIndirect(); FreeEasyIndirect();
buffer.Clear();
HugeFree(buffer.Write().data, CURL_MAX_BUFFERED);
} }
inline bool inline bool
@ -753,7 +688,7 @@ input_curl_tag(InputStream *is)
inline bool inline bool
CurlInputStream::FillBuffer(Error &error) CurlInputStream::FillBuffer(Error &error)
{ {
while (easy != nullptr && buffers.empty()) while (easy != nullptr && buffer.IsEmpty())
base.cond.wait(base.mutex); base.cond.wait(base.mutex);
if (postponed_error.IsDefined()) { if (postponed_error.IsDefined()) {
@ -762,51 +697,44 @@ CurlInputStream::FillBuffer(Error &error)
return false; return false;
} }
return !buffers.empty(); return !buffer.IsEmpty();
} }
static size_t static size_t
read_from_buffer(IcyMetaDataParser &icy, std::list<CurlInputBuffer> &buffers, read_from_buffer(IcyMetaDataParser &icy, CircularBuffer<uint8_t> &buffer,
void *dest0, size_t length) void *dest0, size_t length)
{ {
auto &buffer = buffers.front();
uint8_t *dest = (uint8_t *)dest0; uint8_t *dest = (uint8_t *)dest0;
size_t nbytes = 0; size_t nbytes = 0;
if (length > buffer.Available())
length = buffer.Available();
while (true) { while (true) {
size_t chunk; auto r = buffer.Read();
if (r.IsEmpty())
break;
chunk = icy.Data(length); if (r.size > length)
r.size = length;
size_t chunk = icy.Data(r.size);
if (chunk > 0) { if (chunk > 0) {
const bool empty = !buffer.Read(dest, chunk); memcpy(dest, r.data, chunk);
buffer.Consume(chunk);
nbytes += chunk; nbytes += chunk;
dest += chunk; dest += chunk;
length -= chunk; length -= chunk;
if (empty) {
buffers.pop_front();
break;
}
if (length == 0) if (length == 0)
break; break;
} }
chunk = icy.Meta(buffer.Begin(), length); r = buffer.Read();
if (r.IsEmpty())
break;
chunk = icy.Meta(r.data, r.size);
if (chunk > 0) { if (chunk > 0) {
const bool empty = !buffer.Consume(chunk); buffer.Consume(chunk);
length -= chunk;
if (empty) {
buffers.pop_front();
break;
}
if (length == 0) if (length == 0)
break; break;
} }
@ -843,8 +771,7 @@ input_curl_available(InputStream *is)
inline size_t inline size_t
CurlInputStream::Read(void *ptr, size_t size, Error &error) CurlInputStream::Read(void *ptr, size_t size, Error &error)
{ {
size_t nbytes = 0; size_t nbytes;
char *dest = (char *)ptr;
do { do {
/* fill the buffer */ /* fill the buffer */
@ -852,15 +779,7 @@ CurlInputStream::Read(void *ptr, size_t size, Error &error)
if (!FillBuffer(error)) if (!FillBuffer(error))
return 0; return 0;
/* send buffer contents */ nbytes = read_from_buffer(icy, buffer, ptr, size);
while (size > 0 && !buffers.empty()) {
size_t copy = read_from_buffer(icy, buffers,
dest + nbytes, size);
nbytes += copy;
size -= copy;
}
} while (nbytes == 0); } while (nbytes == 0);
if (icy.IsDefined()) if (icy.IsDefined())
@ -986,12 +905,28 @@ CurlInputStream::DataReceived(const void *ptr, size_t size)
const ScopeLock protect(base.mutex); const ScopeLock protect(base.mutex);
if (GetTotalBufferSize() + size >= CURL_MAX_BUFFERED) { if (size > buffer.GetSpace()) {
paused = true; paused = true;
return CURL_WRITEFUNC_PAUSE; return CURL_WRITEFUNC_PAUSE;
} }
buffers.emplace_back(ptr, size); auto w = buffer.Write();
assert(!w.IsEmpty());
size_t nbytes = std::min(w.size, size);
memcpy(w.data, ptr, nbytes);
buffer.Append(nbytes);
const size_t remaining = size - nbytes;
if (remaining > 0) {
w = buffer.Write();
assert(!w.IsEmpty());
assert(w.size >= remaining);
memcpy(w.data, (const uint8_t *)ptr + nbytes, remaining);
buffer.Append(size);
}
base.ready = true; base.ready = true;
base.cond.broadcast(); base.cond.broadcast();
return size; return size;
@ -1108,17 +1043,18 @@ CurlInputStream::Seek(InputPlugin::offset_type offset, int whence,
/* check if we can fast-forward the buffer */ /* check if we can fast-forward the buffer */
while (offset > base.offset && !buffers.empty()) { while (offset > base.offset) {
auto &buffer = buffers.front(); auto r = buffer.Read();
size_t length = buffer.Available(); if (r.IsEmpty())
if (offset - base.offset < (InputPlugin::offset_type)length) break;
length = offset - base.offset;
const bool empty = !buffer.Consume(length); const size_t nbytes =
if (empty) offset - base.offset < (InputPlugin::offset_type)r.size
buffers.pop_front(); ? offset - base.offset
: r.size;
base.offset += length; buffer.Consume(nbytes);
base.offset += nbytes;
} }
if (offset == base.offset) if (offset == base.offset)
@ -1129,7 +1065,7 @@ CurlInputStream::Seek(InputPlugin::offset_type offset, int whence,
base.mutex.unlock(); base.mutex.unlock();
FreeEasyIndirect(); FreeEasyIndirect();
buffers.clear(); buffer.Clear();
base.offset = offset; base.offset = offset;
if (base.offset == base.size) { if (base.offset == base.size) {
@ -1181,7 +1117,13 @@ inline InputStream *
CurlInputStream::Open(const char *url, Mutex &mutex, Cond &cond, CurlInputStream::Open(const char *url, Mutex &mutex, Cond &cond,
Error &error) Error &error)
{ {
CurlInputStream *c = new CurlInputStream(url, mutex, cond); void *buffer = HugeAllocate(CURL_MAX_BUFFERED);
if (buffer == nullptr) {
error.Set(curl_domain, "Out of memory");
return nullptr;
}
CurlInputStream *c = new CurlInputStream(url, mutex, cond, buffer);
if (!c->InitEasy(error) || !input_curl_easy_add_indirect(c, error)) { if (!c->InitEasy(error) || !input_curl_easy_add_indirect(c, error)) {
delete c; delete c;