From 12f241844510d4bf468b04c6edc995c1915ded8d Mon Sep 17 00:00:00 2001 From: Max Kellermann Date: Thu, 21 Jun 2018 22:18:24 +0200 Subject: [PATCH] input/buffered: proxy InputStream implementation which caches in a huge buffer --- Makefile.am | 1 + src/input/BufferedInputStream.cxx | 192 ++++++++++++++++++++++++++++++ src/input/BufferedInputStream.hxx | 110 +++++++++++++++++ 3 files changed, 303 insertions(+) create mode 100644 src/input/BufferedInputStream.cxx create mode 100644 src/input/BufferedInputStream.hxx diff --git a/Makefile.am b/Makefile.am index 4f3a6229c..e328c37c7 100644 --- a/Makefile.am +++ b/Makefile.am @@ -1392,6 +1392,7 @@ libinput_a_SOURCES = \ src/input/AsyncInputStream.cxx src/input/AsyncInputStream.hxx \ src/input/ProxyInputStream.cxx src/input/ProxyInputStream.hxx \ src/input/RewindInputStream.cxx src/input/RewindInputStream.hxx \ + src/input/BufferedInputStream.cxx src/input/BufferedInputStream.hxx \ src/input/plugins/FileInputPlugin.cxx src/input/plugins/FileInputPlugin.hxx libinput_a_CPPFLAGS = $(AM_CPPFLAGS) \ diff --git a/src/input/BufferedInputStream.cxx b/src/input/BufferedInputStream.cxx new file mode 100644 index 000000000..48cb3935b --- /dev/null +++ b/src/input/BufferedInputStream.cxx @@ -0,0 +1,192 @@ +/* + * Copyright 2003-2018 The Music Player Daemon Project + * http://www.musicpd.org + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ + +#include "config.h" +#include "BufferedInputStream.hxx" +#include "thread/Cond.hxx" +#include "thread/Name.hxx" + +#include + +BufferedInputStream::BufferedInputStream(InputStreamPtr _input) + :InputStream(_input->GetURI(), _input->mutex), + input(std::move(_input)), + thread(BIND_THIS_METHOD(RunThread)), + buffer(input->GetSize()) +{ + assert(IsEligible(*input)); + + if (input->HasMimeType()) + SetMimeType(input->GetMimeType()); + + size = input->GetSize(); + seekable = input->IsSeekable(); + offset = input->GetOffset(); + + SetReady(); + + thread.Start(); +} + +BufferedInputStream::~BufferedInputStream() noexcept +{ + { + const std::lock_guard lock(mutex); + stop = true; + wake_cond.signal(); + } + + thread.Join(); +} + +void +BufferedInputStream::Check() +{ + if (input) + input->Check(); +} + +void +BufferedInputStream::Seek(offset_type new_offset) +{ + auto r = buffer.Read(new_offset); + if (r.HasData()) + /* nice, we already have some data at the desired + offset and this method call is a no-op */ + return; + + seek_offset = new_offset; + seek = true; + wake_cond.signal(); + + while (seek) + client_cond.wait(mutex); + + if (seek_error) + std::rethrow_exception(std::exchange(seek_error, {})); +} + +bool +BufferedInputStream::IsEOF() noexcept +{ + return offset == size; +} + +bool +BufferedInputStream::IsAvailable() noexcept +{ + return IsEOF() || buffer.Read(offset).HasData(); +} + +size_t +BufferedInputStream::Read(void *ptr, size_t s) +{ + if (offset >= size) + return 0; + + while (true) { + auto r = buffer.Read(offset); + if (r.HasData()) { + /* yay, we have some data */ + size_t nbytes = std::min(s, r.defined_buffer.size); + memcpy(ptr, r.defined_buffer.data, nbytes); + offset += nbytes; + return nbytes; + } + + if (read_error) { + wake_cond.broadcast(); + std::rethrow_exception(std::exchange(read_error, {})); + } + + if (idle) { + /* wake up the sleeping thread */ + idle = false; + wake_cond.signal(); + } + + client_cond.wait(mutex); + } +} + +void +BufferedInputStream::RunThread() noexcept +{ + SetThreadName("input_buffered"); + + const std::lock_guard lock(mutex); + + while (!stop) { + if (seek) { + try { + input->Seek(seek_offset); + } catch (...) { + seek_error = std::current_exception(); + } + + offset = input->GetOffset(); + + idle = false; + seek = false; + client_cond.signal(); + } else if (!idle && !read_error && + input->IsAvailable() && !input->IsEOF()) { + const auto read_offset = input->GetOffset(); + auto w = buffer.Write(read_offset); + + if (w.empty()) { + auto r = buffer.Read(offset); + if (r.HasData()) { + /* we still have enough data + for the next Read() - sleep + until we need more data */ + idle = true; + } else { + /* we need more data at our + current position, because + the next Read() will stall + - seek our input to our + offset to prepare filling + the buffer from there */ + try { + input->Seek(offset); + } catch (...) { + read_error = std::current_exception(); + client_cond.signal(); + InvokeOnAvailable(); + } + } + + continue; + } + + try { + size_t nbytes = input->Read(w.data, w.size); + buffer.Commit(read_offset, + read_offset + nbytes); + } catch (...) { + read_error = std::current_exception(); + } + + client_cond.signal(); + InvokeOnAvailable(); + } else + wake_cond.wait(mutex); + } +} diff --git a/src/input/BufferedInputStream.hxx b/src/input/BufferedInputStream.hxx new file mode 100644 index 000000000..2bed6b408 --- /dev/null +++ b/src/input/BufferedInputStream.hxx @@ -0,0 +1,110 @@ +/* + * Copyright 2003-2018 The Music Player Daemon Project + * http://www.musicpd.org + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ + +#ifndef MPD_BUFFERED_INPUT_STREAM_BUFFER_HXX +#define MPD_BUFFERED_INPUT_STREAM_BUFFER_HXX + +#include "check.h" +#include "InputStream.hxx" +#include "Ptr.hxx" +#include "Handler.hxx" +#include "thread/Thread.hxx" +#include "thread/Cond.hxx" +#include "util/SparseBuffer.hxx" + +#include + +#include + +/** + * A "huge" buffer which remembers the (partial) contents of an + * #InputStream. This works only if the #InputStream is a "file", not + * a "stream"; see IsEligible() for details. + */ +class BufferedInputStream final : public InputStream, InputStreamHandler { + InputStreamPtr input; + + Thread thread; + + /** + * This #Cond wakes up the #Thread. It is used by both the + * "client" thread (to submit commands) and #input's handler + * (to notify new data being available). + */ + Cond wake_cond; + + /** + * This #Cond wakes up the client upon command completion. + */ + Cond client_cond; + + SparseBuffer buffer; + + bool stop = false, seek = false, idle = false; + + offset_type seek_offset; + + std::exception_ptr read_error, seek_error; + + // TODO: make configurable + static constexpr offset_type MAX_SIZE = 128 * 1024 * 1024; + +public: + BufferedInputStream(InputStreamPtr _input); + ~BufferedInputStream() noexcept override; + + /** + * Check whether the given #InputStream can be used as input + * for this class. + */ + static bool IsEligible(const InputStream &input) noexcept { + assert(input.IsReady()); + + return input.IsSeekable() && input.KnownSize() && + input.GetSize() > 0 && + input.GetSize() <= MAX_SIZE; + } + + /* virtual methods from class InputStream */ + void Check() override; + /* we don't need to implement Update() because all attributes + have been copied already in our constructor */ + //void Update() noexcept; + void Seek(offset_type offset) override; + bool IsEOF() noexcept override; + /* we don't support tags */ + // std::unique_ptr ReadTag() override; + bool IsAvailable() noexcept override; + size_t Read(void *ptr, size_t size) override; + + /* virtual methods from class InputStreamHandler */ + void OnInputStreamReady() noexcept override { + /* this should never be called, because our input must + be "ready" already */ + } + + void OnInputStreamAvailable() noexcept override { + wake_cond.signal(); + } + +private: + void RunThread() noexcept; +}; + +#endif