input/curl: move code to AsyncInputStream

New base class for other InputStream implementations that run in the
I/O thread.
This commit is contained in:
Max Kellermann 2014-05-02 22:31:02 +02:00
parent 6c4438d8a9
commit fbafb19657
4 changed files with 410 additions and 211 deletions

View File

@ -1032,6 +1032,7 @@ libinput_a_SOURCES = \
src/input/InputPlugin.hxx \
src/input/TextInputStream.cxx src/input/TextInputStream.hxx \
src/input/ThreadInputStream.cxx src/input/ThreadInputStream.hxx \
src/input/AsyncInputStream.cxx src/input/AsyncInputStream.hxx \
src/input/ProxyInputStream.cxx src/input/ProxyInputStream.hxx \
src/input/plugins/RewindInputPlugin.cxx src/input/plugins/RewindInputPlugin.hxx \
src/input/plugins/FileInputPlugin.cxx src/input/plugins/FileInputPlugin.hxx

View File

@ -0,0 +1,239 @@
/*
* Copyright (C) 2003-2014 The Music Player Daemon Project
* http://www.musicpd.org
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 2 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License along
* with this program; if not, write to the Free Software Foundation, Inc.,
* 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*/
#include "config.h"
#include "AsyncInputStream.hxx"
#include "tag/Tag.hxx"
#include "event/Call.hxx"
#include "thread/Cond.hxx"
#include "IOThread.hxx"
#include "util/HugeAllocator.hxx"
#include <assert.h>
#include <string.h>
AsyncInputStream::AsyncInputStream(const char *_url,
Mutex &_mutex, Cond &_cond,
void *_buffer, size_t _buffer_size,
size_t _resume_at)
:InputStream(_url, _mutex, _cond), DeferredMonitor(io_thread_get()),
buffer((uint8_t *)_buffer, _buffer_size),
resume_at(_resume_at),
open(true),
paused(false),
seek_state(SeekState::NONE),
tag(nullptr) {}
AsyncInputStream::~AsyncInputStream()
{
delete tag;
buffer.Clear();
HugeFree(buffer.Write().data, buffer.GetCapacity());
}
void
AsyncInputStream::SetTag(Tag *_tag)
{
delete tag;
tag = _tag;
}
void
AsyncInputStream::Pause()
{
assert(io_thread_inside());
paused = true;
}
inline void
AsyncInputStream::Resume()
{
assert(io_thread_inside());
if (paused) {
paused = false;
DoResume();
}
}
bool
AsyncInputStream::Check(Error &error)
{
bool success = !postponed_error.IsDefined();
if (!success) {
error = std::move(postponed_error);
postponed_error.Clear();
}
return success;
}
bool
AsyncInputStream::IsEOF()
{
return !open && buffer.IsEmpty();
}
bool
AsyncInputStream::Seek(offset_type new_offset, Error &error)
{
assert(IsReady());
assert(seek_state == SeekState::NONE);
if (new_offset == offset)
/* no-op */
return true;
if (!IsSeekable())
return false;
if (new_offset < 0)
return false;
/* check if we can fast-forward the buffer */
while (new_offset > offset) {
auto r = buffer.Read();
if (r.IsEmpty())
break;
const size_t nbytes =
new_offset - offset < (offset_type)r.size
? new_offset - offset
: r.size;
buffer.Consume(nbytes);
offset += nbytes;
}
if (new_offset == offset)
return true;
/* no: ask the implementation to seek */
seek_offset = new_offset;
seek_state = SeekState::SCHEDULED;
DeferredMonitor::Schedule();
while (seek_state != SeekState::NONE)
cond.wait(mutex);
if (!Check(error))
return false;
return true;
}
void
AsyncInputStream::SeekDone()
{
assert(io_thread_inside());
assert(IsSeekPending());
seek_state = SeekState::NONE;
cond.broadcast();
}
Tag *
AsyncInputStream::ReadTag()
{
Tag *result = tag;
tag = nullptr;
return result;
}
bool
AsyncInputStream::IsAvailable()
{
return postponed_error.IsDefined() || !open ||
!buffer.IsEmpty();
}
size_t
AsyncInputStream::Read(void *ptr, size_t read_size, Error &error)
{
assert(!io_thread_inside());
/* wait for data */
CircularBuffer<uint8_t>::Range r;
while (true) {
if (!Check(error))
return 0;
r = buffer.Read();
if (!r.IsEmpty() || !open)
break;
cond.wait(mutex);
}
const size_t nbytes = std::min(read_size, r.size);
memcpy(ptr, r.data, nbytes);
buffer.Consume(nbytes);
offset += (offset_type)nbytes;
if (paused && buffer.GetSize() < resume_at)
DeferredMonitor::Schedule();
return nbytes;
}
void
AsyncInputStream::AppendToBuffer(const void *data, size_t append_size)
{
auto w = buffer.Write();
assert(!w.IsEmpty());
size_t nbytes = std::min(w.size, append_size);
memcpy(w.data, data, nbytes);
buffer.Append(nbytes);
const size_t remaining = append_size - nbytes;
if (remaining > 0) {
w = buffer.Write();
assert(!w.IsEmpty());
assert(w.size >= remaining);
memcpy(w.data, (const uint8_t *)data + nbytes, remaining);
buffer.Append(remaining);
}
if (!IsReady())
SetReady();
else
cond.broadcast();
}
void
AsyncInputStream::RunDeferred()
{
const ScopeLock protect(mutex);
Resume();
if (seek_state == SeekState::SCHEDULED) {
seek_state = SeekState::PENDING;
buffer.Clear();
DoSeek(seek_offset);
}
}

View File

@ -0,0 +1,122 @@
/*
* Copyright (C) 2003-2014 The Music Player Daemon Project
* http://www.musicpd.org
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 2 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License along
* with this program; if not, write to the Free Software Foundation, Inc.,
* 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*/
#ifndef MPD_ASYNC_INPUT_STREAM_HXX
#define MPD_ASYNC_INPUT_STREAM_HXX
#include "InputStream.hxx"
#include "event/DeferredMonitor.hxx"
#include "util/CircularBuffer.hxx"
#include "util/Error.hxx"
/**
* Helper class for moving asynchronous (non-blocking) InputStream
* implementations to the I/O thread. Data is being read into a ring
* buffer, and that buffer is then consumed by another thread using
* the regular #InputStream API.
*/
class AsyncInputStream : public InputStream, private DeferredMonitor {
enum class SeekState : uint8_t {
NONE, SCHEDULED, PENDING
};
CircularBuffer<uint8_t> buffer;
const size_t resume_at;
bool open;
/**
* Is the connection currently paused? That happens when the
* buffer was getting too large. It will be unpaused when the
* buffer is below the threshold again.
*/
bool paused;
SeekState seek_state;
/**
* The #Tag object ready to be requested via
* InputStream::ReadTag().
*/
Tag *tag;
offset_type seek_offset;
protected:
Error postponed_error;
public:
AsyncInputStream(const char *_url,
Mutex &_mutex, Cond &_cond,
void *_buffer, size_t _buffer_size,
size_t _resume_at);
virtual ~AsyncInputStream();
/* virtual methods from InputStream */
bool Check(Error &error) final;
bool IsEOF() final;
bool Seek(offset_type new_offset, Error &error) final;
Tag *ReadTag() final;
bool IsAvailable() final;
size_t Read(void *ptr, size_t read_size, Error &error) final;
protected:
void SetTag(Tag *_tag);
void Pause();
void SetClosed() {
open = false;
}
bool IsBufferEmpty() const {
return buffer.IsEmpty();
}
gcc_pure
size_t GetBufferSpace() const {
return buffer.GetSpace();
}
void AppendToBuffer(const void *data, size_t append_size);
virtual void DoResume() = 0;
/**
* The actual Seek() implementation. This virtual method will
* be called from within the I/O thread. When the operation
* is finished, call SeekDone() to notify the caller.
*/
virtual void DoSeek(offset_type new_offset) = 0;
bool IsSeekPending() const {
return seek_state == SeekState::PENDING;
}
void SeekDone();
private:
void Resume();
/* virtual methods from DeferredMonitor */
void RunDeferred() final;
};
#endif

View File

@ -19,8 +19,8 @@
#include "config.h"
#include "CurlInputPlugin.hxx"
#include "../AsyncInputStream.hxx"
#include "../IcyInputStream.hxx"
#include "../InputStream.hxx"
#include "../InputPlugin.hxx"
#include "config/ConfigGlobal.hxx"
#include "config/ConfigData.hxx"
@ -60,7 +60,7 @@ static const size_t CURL_MAX_BUFFERED = 512 * 1024;
*/
static const size_t CURL_RESUME_AT = 384 * 1024;
struct CurlInputStream final : public InputStream {
struct CurlInputStream final : public AsyncInputStream {
/* some buffers which were passed to libcurl, which we have
too free */
char range[32];
@ -69,39 +69,19 @@ struct CurlInputStream final : public InputStream {
/** the curl handles */
CURL *easy;
/**
* A buffer where input_curl_writefunction() appends
* to, and input_curl_read() reads from.
*/
CircularBuffer<uint8_t> buffer;
/**
* Is the connection currently paused? That happens when the
* buffer was getting too large. It will be unpaused when the
* buffer is below the threshold again.
*/
bool paused;
/** error message provided by libcurl */
char error_buffer[CURL_ERROR_SIZE];
/** parser for icy-metadata */
IcyInputStream *icy;
/** the tag object ready to be requested via
InputStream::ReadTag() */
Tag *tag;
Error postponed_error;
CurlInputStream(const char *_url, Mutex &_mutex, Cond &_cond,
void *_buffer)
:InputStream(_url, _mutex, _cond),
:AsyncInputStream(_url, _mutex, _cond,
_buffer, CURL_MAX_BUFFERED,
CURL_RESUME_AT),
request_headers(nullptr),
buffer((uint8_t *)_buffer, CURL_MAX_BUFFERED),
paused(false),
icy(new IcyInputStream(this)),
tag(nullptr) {}
icy(new IcyInputStream(this)) {}
~CurlInputStream();
@ -133,19 +113,6 @@ struct CurlInputStream final : public InputStream {
size_t DataReceived(const void *ptr, size_t size);
void Resume();
bool FillBuffer(Error &error);
/**
* Returns the number of bytes stored in the buffer.
*
* The caller must lock the mutex.
*/
gcc_pure
size_t GetTotalBufferSize() const {
return buffer.GetSize();
}
/**
* A HTTP request is finished.
*
@ -153,22 +120,9 @@ struct CurlInputStream final : public InputStream {
*/
void RequestDone(CURLcode result, long status);
/* virtual methods from InputStream */
bool Check(Error &error) override;
bool IsEOF() override {
return easy == nullptr && buffer.IsEmpty();
}
Tag *ReadTag() override;
bool IsAvailable() override {
return postponed_error.IsDefined() || easy == nullptr ||
!buffer.IsEmpty();
}
size_t Read(void *ptr, size_t size, Error &error) override;
bool Seek(offset_type offset, Error &error) override;
/* virtual methods from AsyncInputStream */
virtual void DoResume() override;
virtual void DoSeek(offset_type new_offset) override;
};
class CurlMulti;
@ -327,23 +281,24 @@ input_curl_find_request(CURL *easy)
return (CurlInputStream *)p;
}
inline void
CurlInputStream::Resume()
void
CurlInputStream::DoResume()
{
assert(io_thread_inside());
if (paused) {
paused = false;
curl_easy_pause(easy, CURLPAUSE_CONT);
mutex.unlock();
if (curl_version_num < 0x072000)
/* libcurl older than 7.32.0 does not update
its sockets after curl_easy_pause(); force
libcurl to do it now */
curl_multi->ResumeSockets();
curl_easy_pause(easy, CURLPAUSE_CONT);
curl_multi->InvalidateSockets();
}
if (curl_version_num < 0x072000)
/* libcurl older than 7.32.0 does not update
its sockets after curl_easy_pause(); force
libcurl to do it now */
curl_multi->ResumeSockets();
curl_multi->InvalidateSockets();
mutex.lock();
}
int
@ -472,6 +427,7 @@ CurlInputStream::RequestDone(CURLcode result, long status)
assert(!postponed_error.IsDefined());
FreeEasy();
AsyncInputStream::SetClosed();
const ScopeLock protect(mutex);
@ -484,7 +440,9 @@ CurlInputStream::RequestDone(CURLcode result, long status)
status);
}
if (!IsReady())
if (IsSeekPending())
SeekDone();
else if (!IsReady())
SetReady();
}
@ -630,81 +588,16 @@ input_curl_finish(void)
CurlInputStream::~CurlInputStream()
{
delete tag;
FreeEasyIndirect();
buffer.Clear();
HugeFree(buffer.Write().data, CURL_MAX_BUFFERED);
}
inline bool
CurlInputStream::Check(Error &error)
{
bool success = !postponed_error.IsDefined();
if (!success) {
error = std::move(postponed_error);
postponed_error.Clear();
}
return success;
}
Tag *
CurlInputStream::ReadTag()
{
Tag *result = tag;
tag = nullptr;
return result;
}
inline bool
CurlInputStream::FillBuffer(Error &error)
{
while (easy != nullptr && buffer.IsEmpty())
cond.wait(mutex);
if (postponed_error.IsDefined()) {
error = std::move(postponed_error);
postponed_error.Clear();
return false;
}
return !buffer.IsEmpty();
}
size_t
CurlInputStream::Read(void *ptr, size_t read_size, Error &error)
{
if (!FillBuffer(error))
return 0;
auto r = buffer.Read();
if (r.IsEmpty())
return 0;
const size_t nbytes = std::min(read_size, r.size);
memcpy(ptr, r.data, nbytes);
buffer.Consume(nbytes);
offset += (InputPlugin::offset_type)nbytes;
if (paused && GetTotalBufferSize() < CURL_RESUME_AT) {
mutex.unlock();
BlockingCall(io_thread_get(), [this](){
Resume();
});
mutex.lock();
}
return nbytes;
}
inline void
CurlInputStream::HeaderReceived(const char *name, std::string &&value)
{
if (IsSeekPending())
/* don't update metadata while seeking */
return;
if (StringEqualsCaseASCII(name, "accept-ranges")) {
/* a stream with icy-metadata is not seekable */
if (!icy->IsEnabled())
@ -716,12 +609,10 @@ CurlInputStream::HeaderReceived(const char *name, std::string &&value)
} else if (StringEqualsCaseASCII(name, "icy-name") ||
StringEqualsCaseASCII(name, "ice-name") ||
StringEqualsCaseASCII(name, "x-audiocast-name")) {
delete tag;
TagBuilder tag_builder;
tag_builder.AddItem(TAG_NAME, value.c_str());
tag = tag_builder.CommitNew();
SetTag(tag_builder.CommitNew());
} else if (StringEqualsCaseASCII(name, "icy-metaint")) {
if (icy->IsEnabled())
return;
@ -782,30 +673,15 @@ CurlInputStream::DataReceived(const void *ptr, size_t received_size)
const ScopeLock protect(mutex);
if (received_size > buffer.GetSpace()) {
paused = true;
if (IsSeekPending())
SeekDone();
if (received_size > GetBufferSpace()) {
AsyncInputStream::Pause();
return CURL_WRITEFUNC_PAUSE;
}
auto w = buffer.Write();
assert(!w.IsEmpty());
size_t nbytes = std::min(w.size, received_size);
memcpy(w.data, ptr, nbytes);
buffer.Append(nbytes);
const size_t remaining = received_size - nbytes;
if (remaining > 0) {
w = buffer.Write();
assert(!w.IsEmpty());
assert(w.size >= remaining);
memcpy(w.data, (const uint8_t *)ptr + nbytes, remaining);
buffer.Append(remaining);
}
ready = true;
cond.broadcast();
AppendToBuffer(ptr, received_size);
return received_size;
}
@ -880,48 +756,16 @@ CurlInputStream::InitEasy(Error &error)
return true;
}
inline bool
CurlInputStream::Seek(offset_type new_offset, Error &error)
void
CurlInputStream::DoSeek(offset_type new_offset)
{
assert(IsReady());
if (new_offset == offset)
/* no-op */
return true;
if (!IsSeekable())
return false;
/* calculate the absolute offset */
if (new_offset < 0)
return false;
/* check if we can fast-forward the buffer */
while (new_offset > offset) {
auto r = buffer.Read();
if (r.IsEmpty())
break;
const size_t nbytes =
new_offset - offset < (InputPlugin::offset_type)r.size
? new_offset - offset
: r.size;
buffer.Consume(nbytes);
offset += nbytes;
}
if (new_offset == offset)
return true;
/* close the old connection and open a new one */
mutex.unlock();
FreeEasyIndirect();
buffer.Clear();
offset = new_offset;
if (offset == size) {
@ -929,12 +773,14 @@ CurlInputStream::Seek(offset_type new_offset, Error &error)
triggering a "416 Requested Range Not Satisfiable"
response */
mutex.lock();
return true;
SeekDone();
return;
}
if (!InitEasy(error)) {
if (!InitEasy(postponed_error)) {
mutex.lock();
return false;
SeekDone();
return;
}
/* send the "Range" header */
@ -944,23 +790,14 @@ CurlInputStream::Seek(offset_type new_offset, Error &error)
curl_easy_setopt(easy, CURLOPT_RANGE, range);
}
ready = false;
if (!input_curl_easy_add_indirect(this, error)) {
if (!input_curl_easy_add_indirect(this, postponed_error)) {
mutex.lock();
return false;
SeekDone();
return;
}
mutex.lock();
WaitReady();
if (postponed_error.IsDefined()) {
error = std::move(postponed_error);
postponed_error.Clear();
return false;
}
return true;
offset = new_offset;
}
inline InputStream *