From 5277297336eb2d9db0ba9f86e6a5f638f04b06af Mon Sep 17 00:00:00 2001 From: Max Kellermann Date: Thu, 9 May 2019 11:39:30 +0200 Subject: [PATCH] input/buffered: move basic buffering code to class BufferingInputStream Prepare to reuse it in another class. --- src/input/BufferedInputStream.cxx | 193 +++----------------------- src/input/BufferedInputStream.hxx | 52 +------ src/input/BufferingInputStream.cxx | 213 +++++++++++++++++++++++++++++ src/input/BufferingInputStream.hxx | 100 ++++++++++++++ src/input/meson.build | 1 + 5 files changed, 337 insertions(+), 222 deletions(-) create mode 100644 src/input/BufferingInputStream.cxx create mode 100644 src/input/BufferingInputStream.hxx diff --git a/src/input/BufferedInputStream.cxx b/src/input/BufferedInputStream.cxx index 29c6ebd1b..55eb07b5d 100644 --- a/src/input/BufferedInputStream.cxx +++ b/src/input/BufferedInputStream.cxx @@ -1,5 +1,5 @@ /* - * Copyright 2003-2018 The Music Player Daemon Project + * Copyright 2003-2019 The Music Player Daemon Project * http://www.musicpd.org * * This program is free software; you can redistribute it and/or modify @@ -25,212 +25,51 @@ BufferedInputStream::BufferedInputStream(InputStreamPtr _input) :InputStream(_input->GetURI(), _input->mutex), - input(std::move(_input)), - thread(BIND_THIS_METHOD(RunThread)), - buffer(input->GetSize()) + BufferingInputStream(std::move(_input)) { - assert(IsEligible(*input)); + assert(IsEligible(GetInput())); - input->SetHandler(this); + if (GetInput().HasMimeType()) + SetMimeType(GetInput().GetMimeType()); - if (input->HasMimeType()) - SetMimeType(input->GetMimeType()); - - size = input->GetSize(); - seekable = input->IsSeekable(); - offset = input->GetOffset(); + InputStream::size = BufferingInputStream::size(); + InputStream::seekable = GetInput().IsSeekable(); + InputStream::offset = GetInput().GetOffset(); SetReady(); - - thread.Start(); -} - -BufferedInputStream::~BufferedInputStream() noexcept -{ - { - const std::lock_guard lock(mutex); - stop = true; - wake_cond.notify_one(); - } - - thread.Join(); } void BufferedInputStream::Check() { - if (input) - input->Check(); + BufferingInputStream::Check(); } void BufferedInputStream::Seek(std::unique_lock &lock, offset_type new_offset) { - if (new_offset >= size) { - offset = size; - return; - } - - auto r = buffer.Read(new_offset); - if (r.HasData()) { - /* nice, we already have some data at the desired - offset and this method call is a no-op */ - offset = new_offset; - return; - } - - seek_offset = new_offset; - seek = true; - wake_cond.notify_one(); - - client_cond.wait(lock, [this]{ return !seek; }); - - if (seek_error) - std::rethrow_exception(std::exchange(seek_error, {})); - - offset = new_offset; + BufferingInputStream::Seek(lock, new_offset); + InputStream::offset = new_offset; } bool BufferedInputStream::IsEOF() noexcept { - return offset == size; + return InputStream::offset == BufferingInputStream::size(); } bool BufferedInputStream::IsAvailable() noexcept { - return IsEOF() || buffer.Read(offset).HasData(); + return BufferingInputStream::IsAvailable(); } size_t BufferedInputStream::Read(std::unique_lock &lock, void *ptr, size_t s) { - if (offset >= size) - return 0; - - while (true) { - assert(size == buffer.size()); - - auto r = buffer.Read(offset); - if (r.HasData()) { - /* yay, we have some data */ - size_t nbytes = std::min(s, r.defined_buffer.size); - memcpy(ptr, r.defined_buffer.data, nbytes); - offset += nbytes; - - if (!IsAvailable()) { - /* wake up the sleeping thread */ - idle = false; - wake_cond.notify_one(); - } - - return nbytes; - } - - if (read_error) { - wake_cond.notify_one(); - std::rethrow_exception(std::exchange(read_error, {})); - } - - if (idle) { - /* wake up the sleeping thread */ - idle = false; - wake_cond.notify_one(); - } - - client_cond.wait(lock); - } -} - -void -BufferedInputStream::RunThread() noexcept -{ - SetThreadName("input_buffered"); - - std::unique_lock lock(mutex); - - while (!stop) { - assert(size == buffer.size()); - - if (seek) { - try { - input->Seek(lock, seek_offset); - } catch (...) { - seek_error = std::current_exception(); - } - - idle = false; - seek = false; - client_cond.notify_one(); - } else if (!idle && !read_error && - offset != input->GetOffset() && - !IsAvailable()) { - /* a past Seek() call was a no-op because data - was already available at that position, but - now we've reached a new position where - there is no more data in the buffer, and - our input is reading somewhere else (maybe - stuck at the end of the file); to find a - way out, we now seek our input to our - reading position to be able to fill our - buffer */ - - try { - input->Seek(lock, offset); - } catch (...) { - /* this is really a seek error, but we - register it as a read_error, - because seek_error is only checked - by Seek(), and at our frontend (our - own InputStream interface) is in - "read" mode */ - read_error = std::current_exception(); - } - } else if (!idle && !read_error && - input->IsAvailable() && !input->IsEOF()) { - const auto read_offset = input->GetOffset(); - auto w = buffer.Write(read_offset); - - if (w.empty()) { - if (IsAvailable()) { - /* we still have enough data - for the next Read() - sleep - until we need more data */ - idle = true; - } else { - /* we need more data at our - current position, because - the next Read() will stall - - seek our input to our - offset to prepare filling - the buffer from there */ - try { - input->Seek(lock, offset); - } catch (...) { - read_error = std::current_exception(); - client_cond.notify_one(); - InvokeOnAvailable(); - } - } - - continue; - } - - try { - size_t nbytes = input->Read(lock, - w.data, w.size); - buffer.Commit(read_offset, - read_offset + nbytes); - } catch (...) { - read_error = std::current_exception(); - } - - client_cond.notify_one(); - InvokeOnAvailable(); - } else - wake_cond.wait(lock); - } + size_t nbytes = BufferingInputStream::Read(lock, ptr, s); + InputStream::offset += nbytes; + return nbytes; } diff --git a/src/input/BufferedInputStream.hxx b/src/input/BufferedInputStream.hxx index dcb6282a1..74b2ef492 100644 --- a/src/input/BufferedInputStream.hxx +++ b/src/input/BufferedInputStream.hxx @@ -1,5 +1,5 @@ /* - * Copyright 2003-2018 The Music Player Daemon Project + * Copyright 2003-2019 The Music Player Daemon Project * http://www.musicpd.org * * This program is free software; you can redistribute it and/or modify @@ -21,13 +21,7 @@ #define MPD_BUFFERED_INPUT_STREAM_BUFFER_HXX #include "InputStream.hxx" -#include "Ptr.hxx" -#include "Handler.hxx" -#include "thread/Thread.hxx" -#include "thread/Cond.hxx" -#include "util/SparseBuffer.hxx" - -#include +#include "BufferingInputStream.hxx" #include @@ -36,37 +30,12 @@ * #InputStream. This works only if the #InputStream is a "file", not * a "stream"; see IsEligible() for details. */ -class BufferedInputStream final : public InputStream, InputStreamHandler { - InputStreamPtr input; - - Thread thread; - - /** - * This #Cond wakes up the #Thread. It is used by both the - * "client" thread (to submit commands) and #input's handler - * (to notify new data being available). - */ - Cond wake_cond; - - /** - * This #Cond wakes up the client upon command completion. - */ - Cond client_cond; - - SparseBuffer buffer; - - bool stop = false, seek = false, idle = false; - - offset_type seek_offset; - - std::exception_ptr read_error, seek_error; - +class BufferedInputStream final : public InputStream, BufferingInputStream { // TODO: make configurable static constexpr offset_type MAX_SIZE = 128 * 1024 * 1024; public: BufferedInputStream(InputStreamPtr _input); - ~BufferedInputStream() noexcept override; /** * Check whether the given #InputStream can be used as input @@ -93,18 +62,11 @@ public: size_t Read(std::unique_lock &lock, void *ptr, size_t size) override; - /* virtual methods from class InputStreamHandler */ - void OnInputStreamReady() noexcept override { - /* this should never be called, because our input must - be "ready" already */ - } - - void OnInputStreamAvailable() noexcept override { - wake_cond.notify_one(); - } - private: - void RunThread() noexcept; + /* virtual methods from class BufferingInputStream */ + void OnBufferAvailable() noexcept override { + InvokeOnAvailable(); + } }; #endif diff --git a/src/input/BufferingInputStream.cxx b/src/input/BufferingInputStream.cxx new file mode 100644 index 000000000..3f624cfef --- /dev/null +++ b/src/input/BufferingInputStream.cxx @@ -0,0 +1,213 @@ +/* + * Copyright 2003-2019 The Music Player Daemon Project + * http://www.musicpd.org + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ + +#include "BufferingInputStream.hxx" +#include "thread/Cond.hxx" +#include "thread/Name.hxx" + +#include + +BufferingInputStream::BufferingInputStream(InputStreamPtr _input) + :input(std::move(_input)), + mutex(input->mutex), + thread(BIND_THIS_METHOD(RunThread)), + buffer(input->GetSize()) +{ + input->SetHandler(this); + + thread.Start(); +} + +BufferingInputStream::~BufferingInputStream() noexcept +{ + { + const std::lock_guard lock(mutex); + stop = true; + wake_cond.notify_one(); + } + + thread.Join(); +} + +void +BufferingInputStream::Check() +{ + if (input) + input->Check(); +} + +void +BufferingInputStream::Seek(std::unique_lock &lock, size_t new_offset) +{ + if (new_offset >= size()) { + offset = new_offset; + return; + } + + auto r = buffer.Read(new_offset); + if (r.HasData()) { + /* nice, we already have some data at the desired + offset and this method call is a no-op */ + offset = new_offset; + return; + } + + seek_offset = new_offset; + seek = true; + wake_cond.notify_one(); + + client_cond.wait(lock, [this]{ return !seek; }); + + if (seek_error) + std::rethrow_exception(std::exchange(seek_error, {})); + + offset = new_offset; +} + +bool +BufferingInputStream::IsAvailable() noexcept +{ + return offset == size() || buffer.Read(offset).HasData(); +} + +size_t +BufferingInputStream::Read(std::unique_lock &lock, void *ptr, size_t s) +{ + if (offset >= size()) + return 0; + + while (true) { + auto r = buffer.Read(offset); + if (r.HasData()) { + /* yay, we have some data */ + size_t nbytes = std::min(s, r.defined_buffer.size); + memcpy(ptr, r.defined_buffer.data, nbytes); + offset += nbytes; + + if (!IsAvailable()) { + /* wake up the sleeping thread */ + idle = false; + wake_cond.notify_one(); + } + + return nbytes; + } + + if (read_error) { + wake_cond.notify_one(); + std::rethrow_exception(std::exchange(read_error, {})); + } + + if (idle) { + /* wake up the sleeping thread */ + idle = false; + wake_cond.notify_one(); + } + + client_cond.wait(lock); + } +} + +void +BufferingInputStream::RunThread() noexcept +{ + SetThreadName("input_buffered"); + + std::unique_lock lock(mutex); + + while (!stop) { + if (seek) { + try { + input->Seek(lock, seek_offset); + } catch (...) { + seek_error = std::current_exception(); + } + + idle = false; + seek = false; + client_cond.notify_one(); + } else if (!idle && !read_error && + offset != input->GetOffset() && + !IsAvailable()) { + /* a past Seek() call was a no-op because data + was already available at that position, but + now we've reached a new position where + there is no more data in the buffer, and + our input is reading somewhere else (maybe + stuck at the end of the file); to find a + way out, we now seek our input to our + reading position to be able to fill our + buffer */ + + try { + input->Seek(lock, offset); + } catch (...) { + /* this is really a seek error, but we + register it as a read_error, + because seek_error is only checked + by Seek(), and at our frontend (our + own InputStream interface) is in + "read" mode */ + read_error = std::current_exception(); + } + } else if (!idle && !read_error && + input->IsAvailable() && !input->IsEOF()) { + const auto read_offset = input->GetOffset(); + auto w = buffer.Write(read_offset); + + if (w.empty()) { + if (IsAvailable()) { + /* we still have enough data + for the next Read() - sleep + until we need more data */ + idle = true; + } else { + /* we need more data at our + current position, because + the next Read() will stall + - seek our input to our + offset to prepare filling + the buffer from there */ + try { + input->Seek(lock, offset); + } catch (...) { + read_error = std::current_exception(); + client_cond.notify_one(); + OnBufferAvailable(); + } + } + + continue; + } + + try { + size_t nbytes = input->Read(lock, + w.data, w.size); + buffer.Commit(read_offset, + read_offset + nbytes); + } catch (...) { + read_error = std::current_exception(); + } + + client_cond.notify_one(); + OnBufferAvailable(); + } else + wake_cond.wait(lock); + } +} diff --git a/src/input/BufferingInputStream.hxx b/src/input/BufferingInputStream.hxx new file mode 100644 index 000000000..6749022a5 --- /dev/null +++ b/src/input/BufferingInputStream.hxx @@ -0,0 +1,100 @@ +/* + * Copyright 2003-2019 The Music Player Daemon Project + * http://www.musicpd.org + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ + +#ifndef MPD_BUFFERING_INPUT_STREAM_BUFFER_HXX +#define MPD_BUFFERING_INPUT_STREAM_BUFFER_HXX + +#include "InputStream.hxx" +#include "Ptr.hxx" +#include "Handler.hxx" +#include "thread/Thread.hxx" +#include "thread/Cond.hxx" +#include "util/SparseBuffer.hxx" + +#include + +/** + * A "huge" buffer which remembers the (partial) contents of an + * #InputStream. This works only if the #InputStream is a "file", not + * a "stream". + */ +class BufferingInputStream : InputStreamHandler { + InputStreamPtr input; + + Mutex &mutex; + + Thread thread; + + /** + * This #Cond wakes up the #Thread. It is used by both the + * "client" thread (to submit commands) and #input's handler + * (to notify new data being available). + */ + Cond wake_cond; + + /** + * This #Cond wakes up the client upon command completion. + */ + Cond client_cond; + + SparseBuffer buffer; + + bool stop = false, seek = false, idle = false; + + size_t offset = 0; + + size_t seek_offset; + + std::exception_ptr read_error, seek_error; + +public: + explicit BufferingInputStream(InputStreamPtr _input); + ~BufferingInputStream() noexcept; + + const auto &GetInput() const noexcept { + return *input; + } + + auto size() const noexcept { + return buffer.size(); + } + + void Check(); + void Seek(std::unique_lock &lock, size_t new_offset); + bool IsAvailable() noexcept; + size_t Read(std::unique_lock &lock, void *ptr, size_t size); + +protected: + virtual void OnBufferAvailable() noexcept {} + +private: + void RunThread() noexcept; + + /* virtual methods from class InputStreamHandler */ + void OnInputStreamReady() noexcept final { + /* this should never be called, because our input must + be "ready" already */ + } + + void OnInputStreamAvailable() noexcept final { + wake_cond.notify_one(); + } +}; + +#endif diff --git a/src/input/meson.build b/src/input/meson.build index 2ce6f8832..3502379d8 100644 --- a/src/input/meson.build +++ b/src/input/meson.build @@ -32,6 +32,7 @@ input_glue = static_library( 'TextInputStream.cxx', 'ProxyInputStream.cxx', 'RewindInputStream.cxx', + 'BufferingInputStream.cxx', 'BufferedInputStream.cxx', 'MaybeBufferedInputStream.cxx', include_directories: inc,