From 23eacbd1326268777e080baa4fcbfbe54652156d Mon Sep 17 00:00:00 2001 From: Max Kellermann Date: Sat, 15 Mar 2014 20:40:43 +0100 Subject: [PATCH] input/curl: move code to CurlInputStream methods --- src/input/plugins/CurlInputPlugin.cxx | 411 +++++++++++++++----------- 1 file changed, 232 insertions(+), 179 deletions(-) diff --git a/src/input/plugins/CurlInputPlugin.cxx b/src/input/plugins/CurlInputPlugin.cxx index bc4b0e5b6..a339353cd 100644 --- a/src/input/plugins/CurlInputPlugin.cxx +++ b/src/input/plugins/CurlInputPlugin.cxx @@ -177,6 +177,63 @@ struct CurlInputStream { CurlInputStream(const CurlInputStream &) = delete; CurlInputStream &operator=(const CurlInputStream &) = delete; + + bool Check(Error &error); + + bool IsEOF() const { + return easy == nullptr && buffers.empty(); + } + + Tag *ReadTag(); + + bool IsAvailable() const { + return postponed_error.IsDefined() || easy == nullptr || + !buffers.empty(); + } + + size_t Read(void *ptr, size_t size, Error &error); + + /** + * Frees the current "libcurl easy" handle, and everything + * associated with it. + * + * Runs in the I/O thread. + */ + void FreeEasy(); + + /** + * Frees the current "libcurl easy" handle, and everything associated + * with it. + * + * The mutex must not be locked. + */ + void FreeEasyIndirect(); + + void HeaderReceived(const char *name, + const char *value, const char *end); + + size_t DataReceived(const void *ptr, size_t size); + + void Resume(); + bool FillBuffer(Error &error); + + /** + * Determine the total sizes of all buffers, including + * portions that have already been consumed. + * + * The caller must lock the mutex. + */ + gcc_pure + size_t GetTotalBufferSize() const; + + void CopyIcyTag(); + + /** + * A HTTP request is finished. + * + * Runs in the I/O thread. The caller must not hold locks. + */ + void RequestDone(CURLcode result, long status); }; class CurlMulti; @@ -335,14 +392,14 @@ input_curl_find_request(CURL *easy) return (CurlInputStream *)p; } -static void -input_curl_resume(CurlInputStream *c) +inline void +CurlInputStream::Resume() { assert(io_thread_inside()); - if (c->paused) { - c->paused = false; - curl_easy_pause(c->easy, CURLPAUSE_CONT); + if (paused) { + paused = false; + curl_easy_pause(easy, CURLPAUSE_CONT); if (curl_version_num < 0x072000) /* libcurl older than 7.32.0 does not update @@ -445,74 +502,55 @@ CurlMulti::Remove(CurlInputStream *c) curl_multi_remove_handle(multi, c->easy); } -/** - * Frees the current "libcurl easy" handle, and everything associated - * with it. - * - * Runs in the I/O thread. - */ -static void -input_curl_easy_free(CurlInputStream *c) +void +CurlInputStream::FreeEasy() { assert(io_thread_inside()); - assert(c != nullptr); - if (c->easy == nullptr) + if (easy == nullptr) return; - curl_multi->Remove(c); + curl_multi->Remove(this); - curl_easy_cleanup(c->easy); - c->easy = nullptr; + curl_easy_cleanup(easy); + easy = nullptr; - curl_slist_free_all(c->request_headers); - c->request_headers = nullptr; + curl_slist_free_all(request_headers); + request_headers = nullptr; } -/** - * Frees the current "libcurl easy" handle, and everything associated - * with it. - * - * The mutex must not be locked. - */ -static void -input_curl_easy_free_indirect(CurlInputStream *c) +void +CurlInputStream::FreeEasyIndirect() { - BlockingCall(io_thread_get(), [c](){ - input_curl_easy_free(c); + BlockingCall(io_thread_get(), [this](){ + FreeEasy(); curl_multi->InvalidateSockets(); }); - assert(c->easy == nullptr); + assert(easy == nullptr); } -/** - * A HTTP request is finished. - * - * Runs in the I/O thread. The caller must not hold locks. - */ -static void -input_curl_request_done(CurlInputStream *c, CURLcode result, long status) +inline void +CurlInputStream::RequestDone(CURLcode result, long status) { assert(io_thread_inside()); - assert(c != nullptr); - assert(c->easy == nullptr); - assert(!c->postponed_error.IsDefined()); + assert(!postponed_error.IsDefined()); - const ScopeLock protect(c->base.mutex); + FreeEasy(); + + const ScopeLock protect(base.mutex); if (result != CURLE_OK) { - c->postponed_error.Format(curl_domain, result, - "curl failed: %s", c->error_buffer); + postponed_error.Format(curl_domain, result, + "curl failed: %s", error_buffer); } else if (status < 200 || status >= 300) { - c->postponed_error.Format(http_domain, status, - "got HTTP status %ld", - status); + postponed_error.Format(http_domain, status, + "got HTTP status %ld", + status); } - c->base.ready = true; - - c->base.cond.broadcast(); + base.ready = true; + base.cond.broadcast(); } static void @@ -524,8 +562,7 @@ input_curl_handle_done(CURL *easy_handle, CURLcode result) long status = 0; curl_easy_getinfo(easy_handle, CURLINFO_RESPONSE_CODE, &status); - input_curl_easy_free(c); - input_curl_request_done(c, result, status); + c->RequestDone(result, status); } void @@ -656,19 +693,11 @@ input_curl_finish(void) curl_global_cleanup(); } -/** - * Determine the total sizes of all buffers, including portions that - * have already been consumed. - * - * The caller must lock the mutex. - */ -gcc_pure -static size_t -curl_total_buffer_size(const CurlInputStream *c) +size_t +CurlInputStream::GetTotalBufferSize() const { size_t total = 0; - - for (const auto &i : c->buffers) + for (const auto &i : buffers) total += i.TotalSize(); return total; @@ -678,46 +707,56 @@ CurlInputStream::~CurlInputStream() { delete tag; - input_curl_easy_free_indirect(this); + FreeEasyIndirect(); } -static bool -input_curl_check(InputStream *is, Error &error) +inline bool +CurlInputStream::Check(Error &error) { - CurlInputStream *c = (CurlInputStream *)is; - - bool success = !c->postponed_error.IsDefined(); + bool success = !postponed_error.IsDefined(); if (!success) { - error = std::move(c->postponed_error); - c->postponed_error.Clear(); + error = std::move(postponed_error); + postponed_error.Clear(); } return success; } +static bool +input_curl_check(InputStream *is, Error &error) +{ + CurlInputStream &c = *(CurlInputStream *)is; + return c.Check(error); +} + +inline Tag * +CurlInputStream::ReadTag() +{ + Tag *result = tag; + tag = nullptr; + return result; +} + static Tag * input_curl_tag(InputStream *is) { - CurlInputStream *c = (CurlInputStream *)is; - Tag *tag = c->tag; - - c->tag = nullptr; - return tag; + CurlInputStream &c = *(CurlInputStream *)is; + return c.ReadTag(); } -static bool -fill_buffer(CurlInputStream *c, Error &error) +inline bool +CurlInputStream::FillBuffer(Error &error) { - while (c->easy != nullptr && c->buffers.empty()) - c->base.cond.wait(c->base.mutex); + while (easy != nullptr && buffers.empty()) + base.cond.wait(base.mutex); - if (c->postponed_error.IsDefined()) { - error = std::move(c->postponed_error); - c->postponed_error.Clear(); + if (postponed_error.IsDefined()) { + error = std::move(postponed_error); + postponed_error.Clear(); return false; } - return !c->buffers.empty(); + return !buffers.empty(); } static size_t @@ -770,54 +809,47 @@ read_from_buffer(IcyMetaDataParser &icy, std::list &buffers, return nbytes; } -static void -copy_icy_tag(CurlInputStream *c) +inline void +CurlInputStream::CopyIcyTag() { - Tag *tag = c->icy.ReadTag(); - - if (tag == nullptr) + Tag *new_tag = icy.ReadTag(); + if (new_tag == nullptr) return; - delete c->tag; + delete tag; - if (!c->meta_name.empty() && !tag->HasType(TAG_NAME)) { - TagBuilder tag_builder(std::move(*tag)); - tag_builder.AddItem(TAG_NAME, c->meta_name.c_str()); - *tag = tag_builder.Commit(); + if (!meta_name.empty() && !new_tag->HasType(TAG_NAME)) { + TagBuilder tag_builder(std::move(*new_tag)); + tag_builder.AddItem(TAG_NAME, meta_name.c_str()); + *new_tag = tag_builder.Commit(); } - c->tag = tag; + tag = new_tag; } static bool input_curl_available(InputStream *is) { - CurlInputStream *c = (CurlInputStream *)is; - - return c->postponed_error.IsDefined() || c->easy == nullptr || - !c->buffers.empty(); + const CurlInputStream &c = *(const CurlInputStream *)is; + return c.IsAvailable(); } -static size_t -input_curl_read(InputStream *is, void *ptr, size_t size, - Error &error) +inline size_t +CurlInputStream::Read(void *ptr, size_t size, Error &error) { - CurlInputStream *c = (CurlInputStream *)is; - bool success; size_t nbytes = 0; char *dest = (char *)ptr; do { /* fill the buffer */ - success = fill_buffer(c, error); - if (!success) + if (!FillBuffer(error)) return 0; /* send buffer contents */ - while (size > 0 && !c->buffers.empty()) { - size_t copy = read_from_buffer(c->icy, c->buffers, + while (size > 0 && !buffers.empty()) { + size_t copy = read_from_buffer(icy, buffers, dest + nbytes, size); nbytes += copy; @@ -825,24 +857,32 @@ input_curl_read(InputStream *is, void *ptr, size_t size, } } while (nbytes == 0); - if (c->icy.IsDefined()) - copy_icy_tag(c); + if (icy.IsDefined()) + CopyIcyTag(); - is->offset += (InputPlugin::offset_type)nbytes; + base.offset += (InputPlugin::offset_type)nbytes; - if (c->paused && curl_total_buffer_size(c) < CURL_RESUME_AT) { - c->base.mutex.unlock(); + if (paused && GetTotalBufferSize() < CURL_RESUME_AT) { + base.mutex.unlock(); - BlockingCall(io_thread_get(), [c](){ - input_curl_resume(c); + BlockingCall(io_thread_get(), [this](){ + Resume(); }); - c->base.mutex.lock(); + base.mutex.lock(); } return nbytes; } +static size_t +input_curl_read(InputStream *is, void *ptr, size_t size, + Error &error) +{ + CurlInputStream &c = *(CurlInputStream *)is; + return c.Read(ptr, size, error); +} + static void input_curl_close(InputStream *is) { @@ -854,23 +894,78 @@ input_curl_close(InputStream *is) static bool input_curl_eof(gcc_unused InputStream *is) { - CurlInputStream *c = (CurlInputStream *)is; + const CurlInputStream &c = *(const CurlInputStream *)is; + return c.IsEOF(); +} - return c->easy == nullptr && c->buffers.empty(); +inline void +CurlInputStream::HeaderReceived(const char *name, + const char *value, const char *end) +{ + if (StringEqualsCaseASCII(name, "accept-ranges")) { + /* a stream with icy-metadata is not seekable */ + if (!icy.IsDefined()) + base.seekable = true; + } else if (StringEqualsCaseASCII(name, "content-length")) { + char buffer[64]; + + if ((size_t)(end - value) >= sizeof(buffer)) + return; + + memcpy(buffer, value, end - value); + buffer[end - value] = 0; + + base.size = base.offset + ParseUint64(buffer); + } else if (StringEqualsCaseASCII(name, "content-type")) { + base.mime.assign(value, end); + } else if (StringEqualsCaseASCII(name, "icy-name") || + StringEqualsCaseASCII(name, "ice-name") || + StringEqualsCaseASCII(name, "x-audiocast-name")) { + meta_name.assign(value, end); + + delete tag; + + TagBuilder tag_builder; + tag_builder.AddItem(TAG_NAME, meta_name.c_str()); + + tag = tag_builder.CommitNew(); + } else if (StringEqualsCaseASCII(name, "icy-metaint")) { + char buffer[64]; + size_t icy_metaint; + + if ((size_t)(end - value) >= sizeof(buffer) || + icy.IsDefined()) + return; + + memcpy(buffer, value, end - value); + buffer[end - value] = 0; + + icy_metaint = ParseUint64(buffer); + FormatDebug(curl_domain, "icy-metaint=%zu", icy_metaint); + + if (icy_metaint > 0) { + icy.Start(icy_metaint); + + /* a stream with icy-metadata is not + seekable */ + base.seekable = false; + } + } } /** called by curl when new data is available */ static size_t input_curl_headerfunction(void *ptr, size_t size, size_t nmemb, void *stream) { - CurlInputStream *c = (CurlInputStream *)stream; - char name[64]; + CurlInputStream &c = *(CurlInputStream *)stream; size *= nmemb; const char *header = (const char *)ptr; const char *end = header + size; + char name[64]; + const char *value = (const char *)memchr(header, ':', size); if (value == nullptr || (size_t)(value - header) >= sizeof(name)) return size; @@ -890,56 +985,25 @@ input_curl_headerfunction(void *ptr, size_t size, size_t nmemb, void *stream) while (end > value && IsWhitespaceOrNull(end[-1])) --end; - if (StringEqualsCaseASCII(name, "accept-ranges")) { - /* a stream with icy-metadata is not seekable */ - if (!c->icy.IsDefined()) - c->base.seekable = true; - } else if (StringEqualsCaseASCII(name, "content-length")) { - char buffer[64]; + c.HeaderReceived(name, value, end); + return size; +} - if ((size_t)(end - header) >= sizeof(buffer)) - return size; +inline size_t +CurlInputStream::DataReceived(const void *ptr, size_t size) +{ + assert(size > 0); - memcpy(buffer, value, end - value); - buffer[end - value] = 0; + const ScopeLock protect(base.mutex); - c->base.size = c->base.offset + ParseUint64(buffer); - } else if (StringEqualsCaseASCII(name, "content-type")) { - c->base.mime.assign(value, end); - } else if (StringEqualsCaseASCII(name, "icy-name") || - StringEqualsCaseASCII(name, "ice-name") || - StringEqualsCaseASCII(name, "x-audiocast-name")) { - c->meta_name.assign(value, end); - - delete c->tag; - - TagBuilder tag_builder; - tag_builder.AddItem(TAG_NAME, c->meta_name.c_str()); - - c->tag = tag_builder.CommitNew(); - } else if (StringEqualsCaseASCII(name, "icy-metaint")) { - char buffer[64]; - size_t icy_metaint; - - if ((size_t)(end - header) >= sizeof(buffer) || - c->icy.IsDefined()) - return size; - - memcpy(buffer, value, end - value); - buffer[end - value] = 0; - - icy_metaint = ParseUint64(buffer); - FormatDebug(curl_domain, "icy-metaint=%zu", icy_metaint); - - if (icy_metaint > 0) { - c->icy.Start(icy_metaint); - - /* a stream with icy-metadata is not - seekable */ - c->base.seekable = false; - } + if (GetTotalBufferSize() + size >= CURL_MAX_BUFFERED) { + paused = true; + return CURL_WRITEFUNC_PAUSE; } + buffers.emplace_back(ptr, size); + base.ready = true; + base.cond.broadcast(); return size; } @@ -947,24 +1011,13 @@ input_curl_headerfunction(void *ptr, size_t size, size_t nmemb, void *stream) static size_t input_curl_writefunction(void *ptr, size_t size, size_t nmemb, void *stream) { - CurlInputStream *c = (CurlInputStream *)stream; + CurlInputStream &c = *(CurlInputStream *)stream; size *= nmemb; if (size == 0) return 0; - const ScopeLock protect(c->base.mutex); - - if (curl_total_buffer_size(c) + size >= CURL_MAX_BUFFERED) { - c->paused = true; - return CURL_WRITEFUNC_PAUSE; - } - - c->buffers.emplace_back(ptr, size); - c->base.ready = true; - - c->base.cond.broadcast(); - return size; + return c.DataReceived(ptr, size); } static bool @@ -1091,7 +1144,7 @@ input_curl_seek(InputStream *is, InputPlugin::offset_type offset, c->base.mutex.unlock(); - input_curl_easy_free_indirect(c); + c->FreeEasyIndirect(); c->buffers.clear(); is->offset = offset;