input/curl: move code to class CurlRequest
The CurlRequest gives us a more low-level API to CURL without the InputStream interface, integrated into our IOThread.
This commit is contained in:
@@ -22,6 +22,7 @@
|
||||
#include "lib/curl/Easy.hxx"
|
||||
#include "lib/curl/Global.hxx"
|
||||
#include "lib/curl/Request.hxx"
|
||||
#include "lib/curl/Handler.hxx"
|
||||
#include "../AsyncInputStream.hxx"
|
||||
#include "../IcyInputStream.hxx"
|
||||
#include "../InputPlugin.hxx"
|
||||
@@ -59,17 +60,13 @@ static const size_t CURL_MAX_BUFFERED = 512 * 1024;
|
||||
*/
|
||||
static const size_t CURL_RESUME_AT = 384 * 1024;
|
||||
|
||||
struct CurlInputStream final : public AsyncInputStream, CurlRequest {
|
||||
struct CurlInputStream final : public AsyncInputStream, CurlResponseHandler {
|
||||
/* some buffers which were passed to libcurl, which we have
|
||||
too free */
|
||||
char range[32];
|
||||
struct curl_slist *request_headers;
|
||||
|
||||
/** the curl handles */
|
||||
CurlEasy easy;
|
||||
|
||||
/** error message provided by libcurl */
|
||||
char error_buffer[CURL_ERROR_SIZE];
|
||||
CurlRequest *request = nullptr;
|
||||
|
||||
/** parser for icy-metadata */
|
||||
IcyInputStream *icy;
|
||||
@@ -79,7 +76,6 @@ struct CurlInputStream final : public AsyncInputStream, CurlRequest {
|
||||
CURL_MAX_BUFFERED,
|
||||
CURL_RESUME_AT),
|
||||
request_headers(nullptr),
|
||||
easy(nullptr),
|
||||
icy(new IcyInputStream(this)) {
|
||||
}
|
||||
|
||||
@@ -113,37 +109,18 @@ struct CurlInputStream final : public AsyncInputStream, CurlRequest {
|
||||
*/
|
||||
void SeekInternal(offset_type new_offset);
|
||||
|
||||
/**
|
||||
* Called when a new response begins. This is used to discard
|
||||
* headers from previous responses (for example authentication
|
||||
* and redirects).
|
||||
*/
|
||||
void ResponseBoundary();
|
||||
|
||||
void HeaderReceived(const char *name, std::string &&value);
|
||||
|
||||
size_t DataReceived(const void *ptr, size_t size);
|
||||
|
||||
/**
|
||||
* A HTTP request is finished.
|
||||
*
|
||||
* Runs in the I/O thread. The caller must not hold locks.
|
||||
*/
|
||||
void RequestDone(CURLcode result, long status);
|
||||
|
||||
/* virtual methods from CurlRequest */
|
||||
void Done(CURLcode result) override;
|
||||
/* virtual methods from CurlResponseHandler */
|
||||
void OnHeaders(unsigned status,
|
||||
std::multimap<std::string, std::string> &&headers) override;
|
||||
void OnData(ConstBuffer<void> data) override;
|
||||
void OnEnd() override;
|
||||
void OnError(std::exception_ptr e) override;
|
||||
|
||||
/* virtual methods from AsyncInputStream */
|
||||
virtual void DoResume() override;
|
||||
virtual void DoSeek(offset_type new_offset) override;
|
||||
};
|
||||
|
||||
/**
|
||||
* libcurl version number encoded in a 24 bit integer.
|
||||
*/
|
||||
static unsigned curl_version_num;
|
||||
|
||||
/** libcurl should accept "ICY 200 OK" */
|
||||
static struct curl_slist *http_200_aliases;
|
||||
|
||||
@@ -163,17 +140,7 @@ CurlInputStream::DoResume()
|
||||
assert(io_thread_inside());
|
||||
|
||||
mutex.unlock();
|
||||
|
||||
curl_easy_pause(easy.Get(), CURLPAUSE_CONT);
|
||||
|
||||
if (curl_version_num < 0x072000)
|
||||
/* libcurl older than 7.32.0 does not update
|
||||
its sockets after curl_easy_pause(); force
|
||||
libcurl to do it now */
|
||||
curl_global->ResumeSockets();
|
||||
|
||||
curl_global->InvalidateSockets();
|
||||
|
||||
request->Resume();
|
||||
mutex.lock();
|
||||
}
|
||||
|
||||
@@ -182,12 +149,11 @@ CurlInputStream::FreeEasy()
|
||||
{
|
||||
assert(io_thread_inside());
|
||||
|
||||
if (!easy)
|
||||
if (request == nullptr)
|
||||
return;
|
||||
|
||||
curl_global->Remove(this);
|
||||
|
||||
easy = nullptr;
|
||||
delete request;
|
||||
request = nullptr;
|
||||
|
||||
curl_slist_free_all(request_headers);
|
||||
request_headers = nullptr;
|
||||
@@ -200,44 +166,115 @@ CurlInputStream::FreeEasyIndirect()
|
||||
FreeEasy();
|
||||
curl_global->InvalidateSockets();
|
||||
});
|
||||
|
||||
assert(!easy);
|
||||
}
|
||||
|
||||
inline void
|
||||
CurlInputStream::RequestDone(CURLcode result, long status)
|
||||
void
|
||||
CurlInputStream::OnHeaders(unsigned status,
|
||||
std::multimap<std::string, std::string> &&headers)
|
||||
{
|
||||
assert(io_thread_inside());
|
||||
assert(!postponed_exception);
|
||||
|
||||
FreeEasy();
|
||||
AsyncInputStream::SetClosed();
|
||||
if (status < 200 || status >= 300)
|
||||
throw FormatRuntimeError("got HTTP status %ld", status);
|
||||
|
||||
const std::lock_guard<Mutex> protect(mutex);
|
||||
|
||||
if (result != CURLE_OK) {
|
||||
postponed_exception = std::make_exception_ptr(FormatRuntimeError("curl failed: %s",
|
||||
error_buffer));
|
||||
} else if (status < 200 || status >= 300) {
|
||||
postponed_exception = std::make_exception_ptr(FormatRuntimeError("got HTTP status %ld",
|
||||
status));
|
||||
if (IsSeekPending()) {
|
||||
/* don't update metadata while seeking */
|
||||
SeekDone();
|
||||
return;
|
||||
}
|
||||
|
||||
if (!icy->IsEnabled() &&
|
||||
headers.find("accept-ranges") != headers.end())
|
||||
/* a stream with icy-metadata is not seekable */
|
||||
seekable = true;
|
||||
|
||||
auto i = headers.find("content-length");
|
||||
if (i != headers.end())
|
||||
size = offset + ParseUint64(i->second.c_str());
|
||||
|
||||
i = headers.find("content-type");
|
||||
if (i != headers.end())
|
||||
SetMimeType(std::move(i->second));
|
||||
|
||||
i = headers.find("icy-name");
|
||||
if (i == headers.end()) {
|
||||
i = headers.find("ice-name");
|
||||
if (i == headers.end())
|
||||
i = headers.find("x-audiocast-name");
|
||||
}
|
||||
|
||||
if (i != headers.end()) {
|
||||
TagBuilder tag_builder;
|
||||
tag_builder.AddItem(TAG_NAME, i->second.c_str());
|
||||
|
||||
SetTag(tag_builder.CommitNew());
|
||||
}
|
||||
|
||||
if (!icy->IsEnabled()) {
|
||||
i = headers.find("icy-metaint");
|
||||
|
||||
if (i != headers.end()) {
|
||||
size_t icy_metaint = ParseUint64(i->second.c_str());
|
||||
#ifndef WIN32
|
||||
/* Windows doesn't know "%z" */
|
||||
FormatDebug(curl_domain, "icy-metaint=%zu", icy_metaint);
|
||||
#endif
|
||||
|
||||
if (icy_metaint > 0) {
|
||||
icy->Enable(icy_metaint);
|
||||
|
||||
/* a stream with icy-metadata is not
|
||||
seekable */
|
||||
seekable = false;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
SetReady();
|
||||
}
|
||||
|
||||
void
|
||||
CurlInputStream::OnData(ConstBuffer<void> data)
|
||||
{
|
||||
assert(data.size > 0);
|
||||
|
||||
const std::lock_guard<Mutex> protect(mutex);
|
||||
|
||||
if (IsSeekPending())
|
||||
SeekDone();
|
||||
|
||||
if (data.size > GetBufferSpace()) {
|
||||
AsyncInputStream::Pause();
|
||||
throw CurlRequest::Pause();
|
||||
}
|
||||
|
||||
AppendToBuffer(data.data, data.size);
|
||||
}
|
||||
|
||||
void
|
||||
CurlInputStream::OnEnd()
|
||||
{
|
||||
cond.broadcast();
|
||||
|
||||
AsyncInputStream::SetClosed();
|
||||
}
|
||||
|
||||
void
|
||||
CurlInputStream::OnError(std::exception_ptr e)
|
||||
{
|
||||
postponed_exception = std::move(e);
|
||||
|
||||
if (IsSeekPending())
|
||||
SeekDone();
|
||||
else if (!IsReady())
|
||||
SetReady();
|
||||
else
|
||||
cond.broadcast();
|
||||
}
|
||||
|
||||
void
|
||||
CurlInputStream::Done(CURLcode result)
|
||||
{
|
||||
long status = 0;
|
||||
curl_easy_getinfo(easy.Get(), CURLINFO_RESPONSE_CODE, &status);
|
||||
|
||||
RequestDone(result, status);
|
||||
AsyncInputStream::SetClosed();
|
||||
}
|
||||
|
||||
/*
|
||||
@@ -258,8 +295,6 @@ input_curl_init(const ConfigBlock &block)
|
||||
if (version_info->features & CURL_VERSION_SSL)
|
||||
FormatDebug(curl_domain, "with %s",
|
||||
version_info->ssl_version);
|
||||
|
||||
curl_version_num = version_info->version_num;
|
||||
}
|
||||
|
||||
http_200_aliases = curl_slist_append(http_200_aliases, "ICY 200 OK");
|
||||
@@ -309,179 +344,37 @@ CurlInputStream::~CurlInputStream()
|
||||
FreeEasyIndirect();
|
||||
}
|
||||
|
||||
inline void
|
||||
CurlInputStream::ResponseBoundary()
|
||||
{
|
||||
/* undo all effects of HeaderReceived() because the previous
|
||||
response was not applicable for this stream */
|
||||
|
||||
if (IsSeekPending())
|
||||
/* don't update metadata while seeking */
|
||||
return;
|
||||
|
||||
seekable = false;
|
||||
size = UNKNOWN_SIZE;
|
||||
ClearMimeType();
|
||||
ClearTag();
|
||||
|
||||
// TODO: reset the IcyInputStream?
|
||||
}
|
||||
|
||||
inline void
|
||||
CurlInputStream::HeaderReceived(const char *name, std::string &&value)
|
||||
{
|
||||
if (IsSeekPending())
|
||||
/* don't update metadata while seeking */
|
||||
return;
|
||||
|
||||
if (StringEqualsCaseASCII(name, "accept-ranges")) {
|
||||
/* a stream with icy-metadata is not seekable */
|
||||
if (!icy->IsEnabled())
|
||||
seekable = true;
|
||||
} else if (StringEqualsCaseASCII(name, "content-length")) {
|
||||
size = offset + ParseUint64(value.c_str());
|
||||
} else if (StringEqualsCaseASCII(name, "content-type")) {
|
||||
SetMimeType(std::move(value));
|
||||
} else if (StringEqualsCaseASCII(name, "icy-name") ||
|
||||
StringEqualsCaseASCII(name, "ice-name") ||
|
||||
StringEqualsCaseASCII(name, "x-audiocast-name")) {
|
||||
TagBuilder tag_builder;
|
||||
tag_builder.AddItem(TAG_NAME, value.c_str());
|
||||
|
||||
SetTag(tag_builder.CommitNew());
|
||||
} else if (StringEqualsCaseASCII(name, "icy-metaint")) {
|
||||
if (icy->IsEnabled())
|
||||
return;
|
||||
|
||||
size_t icy_metaint = ParseUint64(value.c_str());
|
||||
#ifndef WIN32
|
||||
/* Windows doesn't know "%z" */
|
||||
FormatDebug(curl_domain, "icy-metaint=%zu", icy_metaint);
|
||||
#endif
|
||||
|
||||
if (icy_metaint > 0) {
|
||||
icy->Enable(icy_metaint);
|
||||
|
||||
/* a stream with icy-metadata is not
|
||||
seekable */
|
||||
seekable = false;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/** called by curl when new data is available */
|
||||
static size_t
|
||||
input_curl_headerfunction(void *ptr, size_t size, size_t nmemb, void *stream)
|
||||
{
|
||||
CurlInputStream &c = *(CurlInputStream *)stream;
|
||||
|
||||
size *= nmemb;
|
||||
|
||||
const char *header = (const char *)ptr;
|
||||
if (size > 5 && memcmp(header, "HTTP/", 5) == 0) {
|
||||
c.ResponseBoundary();
|
||||
return size;
|
||||
}
|
||||
|
||||
const char *end = header + size;
|
||||
|
||||
char name[64];
|
||||
|
||||
const char *value = (const char *)memchr(header, ':', size);
|
||||
if (value == nullptr || (size_t)(value - header) >= sizeof(name))
|
||||
return size;
|
||||
|
||||
memcpy(name, header, value - header);
|
||||
name[value - header] = 0;
|
||||
|
||||
/* skip the colon */
|
||||
|
||||
++value;
|
||||
|
||||
/* strip the value */
|
||||
|
||||
value = StripLeft(value, end);
|
||||
end = StripRight(value, end);
|
||||
|
||||
c.HeaderReceived(name, std::string(value, end));
|
||||
return size;
|
||||
}
|
||||
|
||||
inline size_t
|
||||
CurlInputStream::DataReceived(const void *ptr, size_t received_size)
|
||||
{
|
||||
assert(received_size > 0);
|
||||
|
||||
const std::lock_guard<Mutex> protect(mutex);
|
||||
|
||||
if (IsSeekPending())
|
||||
SeekDone();
|
||||
|
||||
if (received_size > GetBufferSpace()) {
|
||||
AsyncInputStream::Pause();
|
||||
return CURL_WRITEFUNC_PAUSE;
|
||||
}
|
||||
|
||||
AppendToBuffer(ptr, received_size);
|
||||
return received_size;
|
||||
}
|
||||
|
||||
/** called by curl when new data is available */
|
||||
static size_t
|
||||
input_curl_writefunction(void *ptr, size_t size, size_t nmemb, void *stream)
|
||||
{
|
||||
CurlInputStream &c = *(CurlInputStream *)stream;
|
||||
|
||||
size *= nmemb;
|
||||
if (size == 0)
|
||||
return 0;
|
||||
|
||||
return c.DataReceived(ptr, size);
|
||||
}
|
||||
|
||||
void
|
||||
CurlInputStream::InitEasy()
|
||||
{
|
||||
easy = CurlEasy();
|
||||
request = new CurlRequest(*curl_global, GetURI(), *this);
|
||||
|
||||
easy.SetOption(CURLOPT_USERAGENT, "Music Player Daemon " VERSION);
|
||||
easy.SetOption(CURLOPT_HEADERFUNCTION, input_curl_headerfunction);
|
||||
easy.SetOption(CURLOPT_WRITEHEADER, this);
|
||||
easy.SetOption(CURLOPT_WRITEFUNCTION, input_curl_writefunction);
|
||||
easy.SetOption(CURLOPT_WRITEDATA, this);
|
||||
easy.SetOption(CURLOPT_HTTP200ALIASES, http_200_aliases);
|
||||
easy.SetOption(CURLOPT_FOLLOWLOCATION, 1l);
|
||||
easy.SetOption(CURLOPT_NETRC, 1l);
|
||||
easy.SetOption(CURLOPT_MAXREDIRS, 5l);
|
||||
easy.SetOption(CURLOPT_FAILONERROR, 1l);
|
||||
easy.SetOption(CURLOPT_ERRORBUFFER, error_buffer);
|
||||
easy.SetOption(CURLOPT_NOPROGRESS, 1l);
|
||||
easy.SetOption(CURLOPT_NOSIGNAL, 1l);
|
||||
easy.SetOption(CURLOPT_CONNECTTIMEOUT, 10l);
|
||||
request->SetOption(CURLOPT_HTTP200ALIASES, http_200_aliases);
|
||||
request->SetOption(CURLOPT_FOLLOWLOCATION, 1l);
|
||||
request->SetOption(CURLOPT_MAXREDIRS, 5l);
|
||||
request->SetOption(CURLOPT_FAILONERROR, 1l);
|
||||
|
||||
if (proxy != nullptr)
|
||||
easy.SetOption(CURLOPT_PROXY, proxy);
|
||||
request->SetOption(CURLOPT_PROXY, proxy);
|
||||
|
||||
if (proxy_port > 0)
|
||||
easy.SetOption(CURLOPT_PROXYPORT, (long)proxy_port);
|
||||
request->SetOption(CURLOPT_PROXYPORT, (long)proxy_port);
|
||||
|
||||
if (proxy_user != nullptr && proxy_password != nullptr) {
|
||||
char proxy_auth_str[1024];
|
||||
snprintf(proxy_auth_str, sizeof(proxy_auth_str),
|
||||
"%s:%s",
|
||||
proxy_user, proxy_password);
|
||||
easy.SetOption(CURLOPT_PROXYUSERPWD, proxy_auth_str);
|
||||
request->SetOption(CURLOPT_PROXYUSERPWD, proxy_auth_str);
|
||||
}
|
||||
|
||||
easy.SetOption(CURLOPT_SSL_VERIFYPEER, verify_peer ? 1l : 0l);
|
||||
easy.SetOption(CURLOPT_SSL_VERIFYHOST, verify_host ? 2l : 0l);
|
||||
|
||||
easy.SetOption(CURLOPT_URL, GetURI());
|
||||
request->SetOption(CURLOPT_SSL_VERIFYPEER, verify_peer ? 1l : 0l);
|
||||
request->SetOption(CURLOPT_SSL_VERIFYHOST, verify_host ? 2l : 0l);
|
||||
|
||||
request_headers = nullptr;
|
||||
request_headers = curl_slist_append(request_headers,
|
||||
"Icy-Metadata: 1");
|
||||
easy.SetOption(CURLOPT_HTTPHEADER, request_headers);
|
||||
"Icy-Metadata: 1");
|
||||
request->SetOption(CURLOPT_HTTPHEADER, request_headers);
|
||||
}
|
||||
|
||||
void
|
||||
@@ -504,10 +397,8 @@ CurlInputStream::SeekInternal(offset_type new_offset)
|
||||
|
||||
if (offset > 0) {
|
||||
sprintf(range, "%lld-", (long long)offset);
|
||||
easy.SetOption(CURLOPT_RANGE, range);
|
||||
request->SetOption(CURLOPT_RANGE, range);
|
||||
}
|
||||
|
||||
curl_global->Add(easy.Get(), *this);
|
||||
}
|
||||
|
||||
void
|
||||
@@ -528,9 +419,8 @@ CurlInputStream::Open(const char *url, Mutex &mutex, Cond &cond)
|
||||
CurlInputStream *c = new CurlInputStream(url, mutex, cond);
|
||||
|
||||
try {
|
||||
c->InitEasy();
|
||||
BlockingCall(io_thread_get(), [c](){
|
||||
curl_global->Add(c->easy.Get(), *c);
|
||||
c->InitEasy();
|
||||
});
|
||||
} catch (...) {
|
||||
delete c;
|
||||
|
Reference in New Issue
Block a user