input/curl: move code to CurlInputStream methods

This commit is contained in:
Max Kellermann 2014-03-15 20:40:43 +01:00
parent e9f16fca96
commit 23eacbd132

View File

@ -177,6 +177,63 @@ struct CurlInputStream {
CurlInputStream(const CurlInputStream &) = delete;
CurlInputStream &operator=(const CurlInputStream &) = delete;
bool Check(Error &error);
bool IsEOF() const {
return easy == nullptr && buffers.empty();
}
Tag *ReadTag();
bool IsAvailable() const {
return postponed_error.IsDefined() || easy == nullptr ||
!buffers.empty();
}
size_t Read(void *ptr, size_t size, Error &error);
/**
* Frees the current "libcurl easy" handle, and everything
* associated with it.
*
* Runs in the I/O thread.
*/
void FreeEasy();
/**
* Frees the current "libcurl easy" handle, and everything associated
* with it.
*
* The mutex must not be locked.
*/
void FreeEasyIndirect();
void HeaderReceived(const char *name,
const char *value, const char *end);
size_t DataReceived(const void *ptr, size_t size);
void Resume();
bool FillBuffer(Error &error);
/**
* Determine the total sizes of all buffers, including
* portions that have already been consumed.
*
* The caller must lock the mutex.
*/
gcc_pure
size_t GetTotalBufferSize() const;
void CopyIcyTag();
/**
* A HTTP request is finished.
*
* Runs in the I/O thread. The caller must not hold locks.
*/
void RequestDone(CURLcode result, long status);
};
class CurlMulti;
@ -335,14 +392,14 @@ input_curl_find_request(CURL *easy)
return (CurlInputStream *)p;
}
static void
input_curl_resume(CurlInputStream *c)
inline void
CurlInputStream::Resume()
{
assert(io_thread_inside());
if (c->paused) {
c->paused = false;
curl_easy_pause(c->easy, CURLPAUSE_CONT);
if (paused) {
paused = false;
curl_easy_pause(easy, CURLPAUSE_CONT);
if (curl_version_num < 0x072000)
/* libcurl older than 7.32.0 does not update
@ -445,74 +502,55 @@ CurlMulti::Remove(CurlInputStream *c)
curl_multi_remove_handle(multi, c->easy);
}
/**
* Frees the current "libcurl easy" handle, and everything associated
* with it.
*
* Runs in the I/O thread.
*/
static void
input_curl_easy_free(CurlInputStream *c)
void
CurlInputStream::FreeEasy()
{
assert(io_thread_inside());
assert(c != nullptr);
if (c->easy == nullptr)
if (easy == nullptr)
return;
curl_multi->Remove(c);
curl_multi->Remove(this);
curl_easy_cleanup(c->easy);
c->easy = nullptr;
curl_easy_cleanup(easy);
easy = nullptr;
curl_slist_free_all(c->request_headers);
c->request_headers = nullptr;
curl_slist_free_all(request_headers);
request_headers = nullptr;
}
/**
* Frees the current "libcurl easy" handle, and everything associated
* with it.
*
* The mutex must not be locked.
*/
static void
input_curl_easy_free_indirect(CurlInputStream *c)
void
CurlInputStream::FreeEasyIndirect()
{
BlockingCall(io_thread_get(), [c](){
input_curl_easy_free(c);
BlockingCall(io_thread_get(), [this](){
FreeEasy();
curl_multi->InvalidateSockets();
});
assert(c->easy == nullptr);
assert(easy == nullptr);
}
/**
* A HTTP request is finished.
*
* Runs in the I/O thread. The caller must not hold locks.
*/
static void
input_curl_request_done(CurlInputStream *c, CURLcode result, long status)
inline void
CurlInputStream::RequestDone(CURLcode result, long status)
{
assert(io_thread_inside());
assert(c != nullptr);
assert(c->easy == nullptr);
assert(!c->postponed_error.IsDefined());
assert(!postponed_error.IsDefined());
const ScopeLock protect(c->base.mutex);
FreeEasy();
const ScopeLock protect(base.mutex);
if (result != CURLE_OK) {
c->postponed_error.Format(curl_domain, result,
"curl failed: %s", c->error_buffer);
postponed_error.Format(curl_domain, result,
"curl failed: %s", error_buffer);
} else if (status < 200 || status >= 300) {
c->postponed_error.Format(http_domain, status,
"got HTTP status %ld",
status);
postponed_error.Format(http_domain, status,
"got HTTP status %ld",
status);
}
c->base.ready = true;
c->base.cond.broadcast();
base.ready = true;
base.cond.broadcast();
}
static void
@ -524,8 +562,7 @@ input_curl_handle_done(CURL *easy_handle, CURLcode result)
long status = 0;
curl_easy_getinfo(easy_handle, CURLINFO_RESPONSE_CODE, &status);
input_curl_easy_free(c);
input_curl_request_done(c, result, status);
c->RequestDone(result, status);
}
void
@ -656,19 +693,11 @@ input_curl_finish(void)
curl_global_cleanup();
}
/**
* Determine the total sizes of all buffers, including portions that
* have already been consumed.
*
* The caller must lock the mutex.
*/
gcc_pure
static size_t
curl_total_buffer_size(const CurlInputStream *c)
size_t
CurlInputStream::GetTotalBufferSize() const
{
size_t total = 0;
for (const auto &i : c->buffers)
for (const auto &i : buffers)
total += i.TotalSize();
return total;
@ -678,46 +707,56 @@ CurlInputStream::~CurlInputStream()
{
delete tag;
input_curl_easy_free_indirect(this);
FreeEasyIndirect();
}
static bool
input_curl_check(InputStream *is, Error &error)
inline bool
CurlInputStream::Check(Error &error)
{
CurlInputStream *c = (CurlInputStream *)is;
bool success = !c->postponed_error.IsDefined();
bool success = !postponed_error.IsDefined();
if (!success) {
error = std::move(c->postponed_error);
c->postponed_error.Clear();
error = std::move(postponed_error);
postponed_error.Clear();
}
return success;
}
static bool
input_curl_check(InputStream *is, Error &error)
{
CurlInputStream &c = *(CurlInputStream *)is;
return c.Check(error);
}
inline Tag *
CurlInputStream::ReadTag()
{
Tag *result = tag;
tag = nullptr;
return result;
}
static Tag *
input_curl_tag(InputStream *is)
{
CurlInputStream *c = (CurlInputStream *)is;
Tag *tag = c->tag;
c->tag = nullptr;
return tag;
CurlInputStream &c = *(CurlInputStream *)is;
return c.ReadTag();
}
static bool
fill_buffer(CurlInputStream *c, Error &error)
inline bool
CurlInputStream::FillBuffer(Error &error)
{
while (c->easy != nullptr && c->buffers.empty())
c->base.cond.wait(c->base.mutex);
while (easy != nullptr && buffers.empty())
base.cond.wait(base.mutex);
if (c->postponed_error.IsDefined()) {
error = std::move(c->postponed_error);
c->postponed_error.Clear();
if (postponed_error.IsDefined()) {
error = std::move(postponed_error);
postponed_error.Clear();
return false;
}
return !c->buffers.empty();
return !buffers.empty();
}
static size_t
@ -770,54 +809,47 @@ read_from_buffer(IcyMetaDataParser &icy, std::list<CurlInputBuffer> &buffers,
return nbytes;
}
static void
copy_icy_tag(CurlInputStream *c)
inline void
CurlInputStream::CopyIcyTag()
{
Tag *tag = c->icy.ReadTag();
if (tag == nullptr)
Tag *new_tag = icy.ReadTag();
if (new_tag == nullptr)
return;
delete c->tag;
delete tag;
if (!c->meta_name.empty() && !tag->HasType(TAG_NAME)) {
TagBuilder tag_builder(std::move(*tag));
tag_builder.AddItem(TAG_NAME, c->meta_name.c_str());
*tag = tag_builder.Commit();
if (!meta_name.empty() && !new_tag->HasType(TAG_NAME)) {
TagBuilder tag_builder(std::move(*new_tag));
tag_builder.AddItem(TAG_NAME, meta_name.c_str());
*new_tag = tag_builder.Commit();
}
c->tag = tag;
tag = new_tag;
}
static bool
input_curl_available(InputStream *is)
{
CurlInputStream *c = (CurlInputStream *)is;
return c->postponed_error.IsDefined() || c->easy == nullptr ||
!c->buffers.empty();
const CurlInputStream &c = *(const CurlInputStream *)is;
return c.IsAvailable();
}
static size_t
input_curl_read(InputStream *is, void *ptr, size_t size,
Error &error)
inline size_t
CurlInputStream::Read(void *ptr, size_t size, Error &error)
{
CurlInputStream *c = (CurlInputStream *)is;
bool success;
size_t nbytes = 0;
char *dest = (char *)ptr;
do {
/* fill the buffer */
success = fill_buffer(c, error);
if (!success)
if (!FillBuffer(error))
return 0;
/* send buffer contents */
while (size > 0 && !c->buffers.empty()) {
size_t copy = read_from_buffer(c->icy, c->buffers,
while (size > 0 && !buffers.empty()) {
size_t copy = read_from_buffer(icy, buffers,
dest + nbytes, size);
nbytes += copy;
@ -825,24 +857,32 @@ input_curl_read(InputStream *is, void *ptr, size_t size,
}
} while (nbytes == 0);
if (c->icy.IsDefined())
copy_icy_tag(c);
if (icy.IsDefined())
CopyIcyTag();
is->offset += (InputPlugin::offset_type)nbytes;
base.offset += (InputPlugin::offset_type)nbytes;
if (c->paused && curl_total_buffer_size(c) < CURL_RESUME_AT) {
c->base.mutex.unlock();
if (paused && GetTotalBufferSize() < CURL_RESUME_AT) {
base.mutex.unlock();
BlockingCall(io_thread_get(), [c](){
input_curl_resume(c);
BlockingCall(io_thread_get(), [this](){
Resume();
});
c->base.mutex.lock();
base.mutex.lock();
}
return nbytes;
}
static size_t
input_curl_read(InputStream *is, void *ptr, size_t size,
Error &error)
{
CurlInputStream &c = *(CurlInputStream *)is;
return c.Read(ptr, size, error);
}
static void
input_curl_close(InputStream *is)
{
@ -854,23 +894,78 @@ input_curl_close(InputStream *is)
static bool
input_curl_eof(gcc_unused InputStream *is)
{
CurlInputStream *c = (CurlInputStream *)is;
const CurlInputStream &c = *(const CurlInputStream *)is;
return c.IsEOF();
}
return c->easy == nullptr && c->buffers.empty();
inline void
CurlInputStream::HeaderReceived(const char *name,
const char *value, const char *end)
{
if (StringEqualsCaseASCII(name, "accept-ranges")) {
/* a stream with icy-metadata is not seekable */
if (!icy.IsDefined())
base.seekable = true;
} else if (StringEqualsCaseASCII(name, "content-length")) {
char buffer[64];
if ((size_t)(end - value) >= sizeof(buffer))
return;
memcpy(buffer, value, end - value);
buffer[end - value] = 0;
base.size = base.offset + ParseUint64(buffer);
} else if (StringEqualsCaseASCII(name, "content-type")) {
base.mime.assign(value, end);
} else if (StringEqualsCaseASCII(name, "icy-name") ||
StringEqualsCaseASCII(name, "ice-name") ||
StringEqualsCaseASCII(name, "x-audiocast-name")) {
meta_name.assign(value, end);
delete tag;
TagBuilder tag_builder;
tag_builder.AddItem(TAG_NAME, meta_name.c_str());
tag = tag_builder.CommitNew();
} else if (StringEqualsCaseASCII(name, "icy-metaint")) {
char buffer[64];
size_t icy_metaint;
if ((size_t)(end - value) >= sizeof(buffer) ||
icy.IsDefined())
return;
memcpy(buffer, value, end - value);
buffer[end - value] = 0;
icy_metaint = ParseUint64(buffer);
FormatDebug(curl_domain, "icy-metaint=%zu", icy_metaint);
if (icy_metaint > 0) {
icy.Start(icy_metaint);
/* a stream with icy-metadata is not
seekable */
base.seekable = false;
}
}
}
/** called by curl when new data is available */
static size_t
input_curl_headerfunction(void *ptr, size_t size, size_t nmemb, void *stream)
{
CurlInputStream *c = (CurlInputStream *)stream;
char name[64];
CurlInputStream &c = *(CurlInputStream *)stream;
size *= nmemb;
const char *header = (const char *)ptr;
const char *end = header + size;
char name[64];
const char *value = (const char *)memchr(header, ':', size);
if (value == nullptr || (size_t)(value - header) >= sizeof(name))
return size;
@ -890,56 +985,25 @@ input_curl_headerfunction(void *ptr, size_t size, size_t nmemb, void *stream)
while (end > value && IsWhitespaceOrNull(end[-1]))
--end;
if (StringEqualsCaseASCII(name, "accept-ranges")) {
/* a stream with icy-metadata is not seekable */
if (!c->icy.IsDefined())
c->base.seekable = true;
} else if (StringEqualsCaseASCII(name, "content-length")) {
char buffer[64];
c.HeaderReceived(name, value, end);
return size;
}
if ((size_t)(end - header) >= sizeof(buffer))
return size;
inline size_t
CurlInputStream::DataReceived(const void *ptr, size_t size)
{
assert(size > 0);
memcpy(buffer, value, end - value);
buffer[end - value] = 0;
const ScopeLock protect(base.mutex);
c->base.size = c->base.offset + ParseUint64(buffer);
} else if (StringEqualsCaseASCII(name, "content-type")) {
c->base.mime.assign(value, end);
} else if (StringEqualsCaseASCII(name, "icy-name") ||
StringEqualsCaseASCII(name, "ice-name") ||
StringEqualsCaseASCII(name, "x-audiocast-name")) {
c->meta_name.assign(value, end);
delete c->tag;
TagBuilder tag_builder;
tag_builder.AddItem(TAG_NAME, c->meta_name.c_str());
c->tag = tag_builder.CommitNew();
} else if (StringEqualsCaseASCII(name, "icy-metaint")) {
char buffer[64];
size_t icy_metaint;
if ((size_t)(end - header) >= sizeof(buffer) ||
c->icy.IsDefined())
return size;
memcpy(buffer, value, end - value);
buffer[end - value] = 0;
icy_metaint = ParseUint64(buffer);
FormatDebug(curl_domain, "icy-metaint=%zu", icy_metaint);
if (icy_metaint > 0) {
c->icy.Start(icy_metaint);
/* a stream with icy-metadata is not
seekable */
c->base.seekable = false;
}
if (GetTotalBufferSize() + size >= CURL_MAX_BUFFERED) {
paused = true;
return CURL_WRITEFUNC_PAUSE;
}
buffers.emplace_back(ptr, size);
base.ready = true;
base.cond.broadcast();
return size;
}
@ -947,24 +1011,13 @@ input_curl_headerfunction(void *ptr, size_t size, size_t nmemb, void *stream)
static size_t
input_curl_writefunction(void *ptr, size_t size, size_t nmemb, void *stream)
{
CurlInputStream *c = (CurlInputStream *)stream;
CurlInputStream &c = *(CurlInputStream *)stream;
size *= nmemb;
if (size == 0)
return 0;
const ScopeLock protect(c->base.mutex);
if (curl_total_buffer_size(c) + size >= CURL_MAX_BUFFERED) {
c->paused = true;
return CURL_WRITEFUNC_PAUSE;
}
c->buffers.emplace_back(ptr, size);
c->base.ready = true;
c->base.cond.broadcast();
return size;
return c.DataReceived(ptr, size);
}
static bool
@ -1091,7 +1144,7 @@ input_curl_seek(InputStream *is, InputPlugin::offset_type offset,
c->base.mutex.unlock();
input_curl_easy_free_indirect(c);
c->FreeEasyIndirect();
c->buffers.clear();
is->offset = offset;