diff --git a/Makefile.am b/Makefile.am index 913d4bddd..2df5be262 100644 --- a/Makefile.am +++ b/Makefile.am @@ -1032,6 +1032,7 @@ libinput_a_SOURCES = \ src/input/InputPlugin.hxx \ src/input/TextInputStream.cxx src/input/TextInputStream.hxx \ src/input/ThreadInputStream.cxx src/input/ThreadInputStream.hxx \ + src/input/AsyncInputStream.cxx src/input/AsyncInputStream.hxx \ src/input/ProxyInputStream.cxx src/input/ProxyInputStream.hxx \ src/input/plugins/RewindInputPlugin.cxx src/input/plugins/RewindInputPlugin.hxx \ src/input/plugins/FileInputPlugin.cxx src/input/plugins/FileInputPlugin.hxx diff --git a/src/input/AsyncInputStream.cxx b/src/input/AsyncInputStream.cxx new file mode 100644 index 000000000..3f2d3b90f --- /dev/null +++ b/src/input/AsyncInputStream.cxx @@ -0,0 +1,239 @@ +/* + * Copyright (C) 2003-2014 The Music Player Daemon Project + * http://www.musicpd.org + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ + +#include "config.h" +#include "AsyncInputStream.hxx" +#include "tag/Tag.hxx" +#include "event/Call.hxx" +#include "thread/Cond.hxx" +#include "IOThread.hxx" +#include "util/HugeAllocator.hxx" + +#include +#include + +AsyncInputStream::AsyncInputStream(const char *_url, + Mutex &_mutex, Cond &_cond, + void *_buffer, size_t _buffer_size, + size_t _resume_at) + :InputStream(_url, _mutex, _cond), DeferredMonitor(io_thread_get()), + buffer((uint8_t *)_buffer, _buffer_size), + resume_at(_resume_at), + open(true), + paused(false), + seek_state(SeekState::NONE), + tag(nullptr) {} + +AsyncInputStream::~AsyncInputStream() +{ + delete tag; + + buffer.Clear(); + HugeFree(buffer.Write().data, buffer.GetCapacity()); +} + +void +AsyncInputStream::SetTag(Tag *_tag) +{ + delete tag; + tag = _tag; +} + +void +AsyncInputStream::Pause() +{ + assert(io_thread_inside()); + + paused = true; +} + +inline void +AsyncInputStream::Resume() +{ + assert(io_thread_inside()); + + if (paused) { + paused = false; + DoResume(); + } +} + +bool +AsyncInputStream::Check(Error &error) +{ + bool success = !postponed_error.IsDefined(); + if (!success) { + error = std::move(postponed_error); + postponed_error.Clear(); + } + + return success; +} + +bool +AsyncInputStream::IsEOF() +{ + return !open && buffer.IsEmpty(); +} + +bool +AsyncInputStream::Seek(offset_type new_offset, Error &error) +{ + assert(IsReady()); + assert(seek_state == SeekState::NONE); + + if (new_offset == offset) + /* no-op */ + return true; + + if (!IsSeekable()) + return false; + + if (new_offset < 0) + return false; + + /* check if we can fast-forward the buffer */ + + while (new_offset > offset) { + auto r = buffer.Read(); + if (r.IsEmpty()) + break; + + const size_t nbytes = + new_offset - offset < (offset_type)r.size + ? new_offset - offset + : r.size; + + buffer.Consume(nbytes); + offset += nbytes; + } + + if (new_offset == offset) + return true; + + /* no: ask the implementation to seek */ + + seek_offset = new_offset; + seek_state = SeekState::SCHEDULED; + + DeferredMonitor::Schedule(); + + while (seek_state != SeekState::NONE) + cond.wait(mutex); + + if (!Check(error)) + return false; + + return true; +} + +void +AsyncInputStream::SeekDone() +{ + assert(io_thread_inside()); + assert(IsSeekPending()); + + seek_state = SeekState::NONE; + cond.broadcast(); +} + +Tag * +AsyncInputStream::ReadTag() +{ + Tag *result = tag; + tag = nullptr; + return result; +} + +bool +AsyncInputStream::IsAvailable() +{ + return postponed_error.IsDefined() || !open || + !buffer.IsEmpty(); +} + +size_t +AsyncInputStream::Read(void *ptr, size_t read_size, Error &error) +{ + assert(!io_thread_inside()); + + /* wait for data */ + CircularBuffer::Range r; + while (true) { + if (!Check(error)) + return 0; + + r = buffer.Read(); + if (!r.IsEmpty() || !open) + break; + + cond.wait(mutex); + } + + const size_t nbytes = std::min(read_size, r.size); + memcpy(ptr, r.data, nbytes); + buffer.Consume(nbytes); + + offset += (offset_type)nbytes; + + if (paused && buffer.GetSize() < resume_at) + DeferredMonitor::Schedule(); + + return nbytes; +} + +void +AsyncInputStream::AppendToBuffer(const void *data, size_t append_size) +{ + auto w = buffer.Write(); + assert(!w.IsEmpty()); + + size_t nbytes = std::min(w.size, append_size); + memcpy(w.data, data, nbytes); + buffer.Append(nbytes); + + const size_t remaining = append_size - nbytes; + if (remaining > 0) { + w = buffer.Write(); + assert(!w.IsEmpty()); + assert(w.size >= remaining); + + memcpy(w.data, (const uint8_t *)data + nbytes, remaining); + buffer.Append(remaining); + } + + if (!IsReady()) + SetReady(); + else + cond.broadcast(); +} + +void +AsyncInputStream::RunDeferred() +{ + const ScopeLock protect(mutex); + + Resume(); + + if (seek_state == SeekState::SCHEDULED) { + seek_state = SeekState::PENDING; + buffer.Clear(); + DoSeek(seek_offset); + } +} diff --git a/src/input/AsyncInputStream.hxx b/src/input/AsyncInputStream.hxx new file mode 100644 index 000000000..f72e7465d --- /dev/null +++ b/src/input/AsyncInputStream.hxx @@ -0,0 +1,122 @@ +/* + * Copyright (C) 2003-2014 The Music Player Daemon Project + * http://www.musicpd.org + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ + +#ifndef MPD_ASYNC_INPUT_STREAM_HXX +#define MPD_ASYNC_INPUT_STREAM_HXX + +#include "InputStream.hxx" +#include "event/DeferredMonitor.hxx" +#include "util/CircularBuffer.hxx" +#include "util/Error.hxx" + +/** + * Helper class for moving asynchronous (non-blocking) InputStream + * implementations to the I/O thread. Data is being read into a ring + * buffer, and that buffer is then consumed by another thread using + * the regular #InputStream API. + */ +class AsyncInputStream : public InputStream, private DeferredMonitor { + enum class SeekState : uint8_t { + NONE, SCHEDULED, PENDING + }; + + CircularBuffer buffer; + const size_t resume_at; + + bool open; + + /** + * Is the connection currently paused? That happens when the + * buffer was getting too large. It will be unpaused when the + * buffer is below the threshold again. + */ + bool paused; + + SeekState seek_state; + + /** + * The #Tag object ready to be requested via + * InputStream::ReadTag(). + */ + Tag *tag; + + offset_type seek_offset; + +protected: + Error postponed_error; + +public: + AsyncInputStream(const char *_url, + Mutex &_mutex, Cond &_cond, + void *_buffer, size_t _buffer_size, + size_t _resume_at); + + virtual ~AsyncInputStream(); + + /* virtual methods from InputStream */ + bool Check(Error &error) final; + bool IsEOF() final; + bool Seek(offset_type new_offset, Error &error) final; + Tag *ReadTag() final; + bool IsAvailable() final; + size_t Read(void *ptr, size_t read_size, Error &error) final; + +protected: + void SetTag(Tag *_tag); + + void Pause(); + + void SetClosed() { + open = false; + } + + bool IsBufferEmpty() const { + return buffer.IsEmpty(); + } + + gcc_pure + size_t GetBufferSpace() const { + return buffer.GetSpace(); + } + + void AppendToBuffer(const void *data, size_t append_size); + + virtual void DoResume() = 0; + + /** + * The actual Seek() implementation. This virtual method will + * be called from within the I/O thread. When the operation + * is finished, call SeekDone() to notify the caller. + */ + virtual void DoSeek(offset_type new_offset) = 0; + + bool IsSeekPending() const { + return seek_state == SeekState::PENDING; + } + + void SeekDone(); + +private: + void Resume(); + + /* virtual methods from DeferredMonitor */ + void RunDeferred() final; +}; + +#endif diff --git a/src/input/plugins/CurlInputPlugin.cxx b/src/input/plugins/CurlInputPlugin.cxx index b48512ccb..2450c8754 100644 --- a/src/input/plugins/CurlInputPlugin.cxx +++ b/src/input/plugins/CurlInputPlugin.cxx @@ -19,8 +19,8 @@ #include "config.h" #include "CurlInputPlugin.hxx" +#include "../AsyncInputStream.hxx" #include "../IcyInputStream.hxx" -#include "../InputStream.hxx" #include "../InputPlugin.hxx" #include "config/ConfigGlobal.hxx" #include "config/ConfigData.hxx" @@ -60,7 +60,7 @@ static const size_t CURL_MAX_BUFFERED = 512 * 1024; */ static const size_t CURL_RESUME_AT = 384 * 1024; -struct CurlInputStream final : public InputStream { +struct CurlInputStream final : public AsyncInputStream { /* some buffers which were passed to libcurl, which we have too free */ char range[32]; @@ -69,39 +69,19 @@ struct CurlInputStream final : public InputStream { /** the curl handles */ CURL *easy; - /** - * A buffer where input_curl_writefunction() appends - * to, and input_curl_read() reads from. - */ - CircularBuffer buffer; - - /** - * Is the connection currently paused? That happens when the - * buffer was getting too large. It will be unpaused when the - * buffer is below the threshold again. - */ - bool paused; - /** error message provided by libcurl */ char error_buffer[CURL_ERROR_SIZE]; /** parser for icy-metadata */ IcyInputStream *icy; - /** the tag object ready to be requested via - InputStream::ReadTag() */ - Tag *tag; - - Error postponed_error; - CurlInputStream(const char *_url, Mutex &_mutex, Cond &_cond, void *_buffer) - :InputStream(_url, _mutex, _cond), + :AsyncInputStream(_url, _mutex, _cond, + _buffer, CURL_MAX_BUFFERED, + CURL_RESUME_AT), request_headers(nullptr), - buffer((uint8_t *)_buffer, CURL_MAX_BUFFERED), - paused(false), - icy(new IcyInputStream(this)), - tag(nullptr) {} + icy(new IcyInputStream(this)) {} ~CurlInputStream(); @@ -133,19 +113,6 @@ struct CurlInputStream final : public InputStream { size_t DataReceived(const void *ptr, size_t size); - void Resume(); - bool FillBuffer(Error &error); - - /** - * Returns the number of bytes stored in the buffer. - * - * The caller must lock the mutex. - */ - gcc_pure - size_t GetTotalBufferSize() const { - return buffer.GetSize(); - } - /** * A HTTP request is finished. * @@ -153,22 +120,9 @@ struct CurlInputStream final : public InputStream { */ void RequestDone(CURLcode result, long status); - /* virtual methods from InputStream */ - bool Check(Error &error) override; - - bool IsEOF() override { - return easy == nullptr && buffer.IsEmpty(); - } - - Tag *ReadTag() override; - - bool IsAvailable() override { - return postponed_error.IsDefined() || easy == nullptr || - !buffer.IsEmpty(); - } - - size_t Read(void *ptr, size_t size, Error &error) override; - bool Seek(offset_type offset, Error &error) override; + /* virtual methods from AsyncInputStream */ + virtual void DoResume() override; + virtual void DoSeek(offset_type new_offset) override; }; class CurlMulti; @@ -327,23 +281,24 @@ input_curl_find_request(CURL *easy) return (CurlInputStream *)p; } -inline void -CurlInputStream::Resume() +void +CurlInputStream::DoResume() { assert(io_thread_inside()); - if (paused) { - paused = false; - curl_easy_pause(easy, CURLPAUSE_CONT); + mutex.unlock(); - if (curl_version_num < 0x072000) - /* libcurl older than 7.32.0 does not update - its sockets after curl_easy_pause(); force - libcurl to do it now */ - curl_multi->ResumeSockets(); + curl_easy_pause(easy, CURLPAUSE_CONT); - curl_multi->InvalidateSockets(); - } + if (curl_version_num < 0x072000) + /* libcurl older than 7.32.0 does not update + its sockets after curl_easy_pause(); force + libcurl to do it now */ + curl_multi->ResumeSockets(); + + curl_multi->InvalidateSockets(); + + mutex.lock(); } int @@ -472,6 +427,7 @@ CurlInputStream::RequestDone(CURLcode result, long status) assert(!postponed_error.IsDefined()); FreeEasy(); + AsyncInputStream::SetClosed(); const ScopeLock protect(mutex); @@ -484,7 +440,9 @@ CurlInputStream::RequestDone(CURLcode result, long status) status); } - if (!IsReady()) + if (IsSeekPending()) + SeekDone(); + else if (!IsReady()) SetReady(); } @@ -630,81 +588,16 @@ input_curl_finish(void) CurlInputStream::~CurlInputStream() { - delete tag; - FreeEasyIndirect(); - - buffer.Clear(); - HugeFree(buffer.Write().data, CURL_MAX_BUFFERED); -} - -inline bool -CurlInputStream::Check(Error &error) -{ - bool success = !postponed_error.IsDefined(); - if (!success) { - error = std::move(postponed_error); - postponed_error.Clear(); - } - - return success; -} - -Tag * -CurlInputStream::ReadTag() -{ - Tag *result = tag; - tag = nullptr; - return result; -} - -inline bool -CurlInputStream::FillBuffer(Error &error) -{ - while (easy != nullptr && buffer.IsEmpty()) - cond.wait(mutex); - - if (postponed_error.IsDefined()) { - error = std::move(postponed_error); - postponed_error.Clear(); - return false; - } - - return !buffer.IsEmpty(); -} - -size_t -CurlInputStream::Read(void *ptr, size_t read_size, Error &error) -{ - if (!FillBuffer(error)) - return 0; - - auto r = buffer.Read(); - if (r.IsEmpty()) - return 0; - - const size_t nbytes = std::min(read_size, r.size); - memcpy(ptr, r.data, nbytes); - buffer.Consume(nbytes); - - offset += (InputPlugin::offset_type)nbytes; - - if (paused && GetTotalBufferSize() < CURL_RESUME_AT) { - mutex.unlock(); - - BlockingCall(io_thread_get(), [this](){ - Resume(); - }); - - mutex.lock(); - } - - return nbytes; } inline void CurlInputStream::HeaderReceived(const char *name, std::string &&value) { + if (IsSeekPending()) + /* don't update metadata while seeking */ + return; + if (StringEqualsCaseASCII(name, "accept-ranges")) { /* a stream with icy-metadata is not seekable */ if (!icy->IsEnabled()) @@ -716,12 +609,10 @@ CurlInputStream::HeaderReceived(const char *name, std::string &&value) } else if (StringEqualsCaseASCII(name, "icy-name") || StringEqualsCaseASCII(name, "ice-name") || StringEqualsCaseASCII(name, "x-audiocast-name")) { - delete tag; - TagBuilder tag_builder; tag_builder.AddItem(TAG_NAME, value.c_str()); - tag = tag_builder.CommitNew(); + SetTag(tag_builder.CommitNew()); } else if (StringEqualsCaseASCII(name, "icy-metaint")) { if (icy->IsEnabled()) return; @@ -782,30 +673,15 @@ CurlInputStream::DataReceived(const void *ptr, size_t received_size) const ScopeLock protect(mutex); - if (received_size > buffer.GetSpace()) { - paused = true; + if (IsSeekPending()) + SeekDone(); + + if (received_size > GetBufferSpace()) { + AsyncInputStream::Pause(); return CURL_WRITEFUNC_PAUSE; } - auto w = buffer.Write(); - assert(!w.IsEmpty()); - - size_t nbytes = std::min(w.size, received_size); - memcpy(w.data, ptr, nbytes); - buffer.Append(nbytes); - - const size_t remaining = received_size - nbytes; - if (remaining > 0) { - w = buffer.Write(); - assert(!w.IsEmpty()); - assert(w.size >= remaining); - - memcpy(w.data, (const uint8_t *)ptr + nbytes, remaining); - buffer.Append(remaining); - } - - ready = true; - cond.broadcast(); + AppendToBuffer(ptr, received_size); return received_size; } @@ -880,48 +756,16 @@ CurlInputStream::InitEasy(Error &error) return true; } -inline bool -CurlInputStream::Seek(offset_type new_offset, Error &error) +void +CurlInputStream::DoSeek(offset_type new_offset) { assert(IsReady()); - if (new_offset == offset) - /* no-op */ - return true; - - if (!IsSeekable()) - return false; - - /* calculate the absolute offset */ - - if (new_offset < 0) - return false; - - /* check if we can fast-forward the buffer */ - - while (new_offset > offset) { - auto r = buffer.Read(); - if (r.IsEmpty()) - break; - - const size_t nbytes = - new_offset - offset < (InputPlugin::offset_type)r.size - ? new_offset - offset - : r.size; - - buffer.Consume(nbytes); - offset += nbytes; - } - - if (new_offset == offset) - return true; - /* close the old connection and open a new one */ mutex.unlock(); FreeEasyIndirect(); - buffer.Clear(); offset = new_offset; if (offset == size) { @@ -929,12 +773,14 @@ CurlInputStream::Seek(offset_type new_offset, Error &error) triggering a "416 Requested Range Not Satisfiable" response */ mutex.lock(); - return true; + SeekDone(); + return; } - if (!InitEasy(error)) { + if (!InitEasy(postponed_error)) { mutex.lock(); - return false; + SeekDone(); + return; } /* send the "Range" header */ @@ -944,23 +790,14 @@ CurlInputStream::Seek(offset_type new_offset, Error &error) curl_easy_setopt(easy, CURLOPT_RANGE, range); } - ready = false; - - if (!input_curl_easy_add_indirect(this, error)) { + if (!input_curl_easy_add_indirect(this, postponed_error)) { mutex.lock(); - return false; + SeekDone(); + return; } mutex.lock(); - WaitReady(); - - if (postponed_error.IsDefined()) { - error = std::move(postponed_error); - postponed_error.Clear(); - return false; - } - - return true; + offset = new_offset; } inline InputStream *