From 6681b14b71b3122772fa74def3fe526c1fc6aa41 Mon Sep 17 00:00:00 2001 From: Max Kellermann Date: Thu, 21 Jun 2018 22:19:46 +0200 Subject: [PATCH] input/MaybeBuffered: proxy InputStream implementation which auto-uses BufferedInputStream --- Makefile.am | 1 + src/input/BufferedInputStream.cxx | 26 +++++++++++++---- src/input/MaybeBufferedInputStream.cxx | 38 +++++++++++++++++++++++++ src/input/MaybeBufferedInputStream.hxx | 39 ++++++++++++++++++++++++++ 4 files changed, 98 insertions(+), 6 deletions(-) create mode 100644 src/input/MaybeBufferedInputStream.cxx create mode 100644 src/input/MaybeBufferedInputStream.hxx diff --git a/Makefile.am b/Makefile.am index e328c37c7..8a6beb43b 100644 --- a/Makefile.am +++ b/Makefile.am @@ -1393,6 +1393,7 @@ libinput_a_SOURCES = \ src/input/ProxyInputStream.cxx src/input/ProxyInputStream.hxx \ src/input/RewindInputStream.cxx src/input/RewindInputStream.hxx \ src/input/BufferedInputStream.cxx src/input/BufferedInputStream.hxx \ + src/input/MaybeBufferedInputStream.cxx src/input/MaybeBufferedInputStream.hxx \ src/input/plugins/FileInputPlugin.cxx src/input/plugins/FileInputPlugin.hxx libinput_a_CPPFLAGS = $(AM_CPPFLAGS) \ diff --git a/src/input/BufferedInputStream.cxx b/src/input/BufferedInputStream.cxx index 48cb3935b..c0b6e0783 100644 --- a/src/input/BufferedInputStream.cxx +++ b/src/input/BufferedInputStream.cxx @@ -32,6 +32,8 @@ BufferedInputStream::BufferedInputStream(InputStreamPtr _input) { assert(IsEligible(*input)); + input->SetHandler(this); + if (input->HasMimeType()) SetMimeType(input->GetMimeType()); @@ -66,10 +68,12 @@ void BufferedInputStream::Seek(offset_type new_offset) { auto r = buffer.Read(new_offset); - if (r.HasData()) + if (r.HasData()) { /* nice, we already have some data at the desired offset and this method call is a no-op */ + offset = new_offset; return; + } seek_offset = new_offset; seek = true; @@ -80,6 +84,8 @@ BufferedInputStream::Seek(offset_type new_offset) if (seek_error) std::rethrow_exception(std::exchange(seek_error, {})); + + offset = input->GetOffset(); } bool @@ -101,17 +107,26 @@ BufferedInputStream::Read(void *ptr, size_t s) return 0; while (true) { + assert(size == buffer.size()); + auto r = buffer.Read(offset); if (r.HasData()) { /* yay, we have some data */ size_t nbytes = std::min(s, r.defined_buffer.size); memcpy(ptr, r.defined_buffer.data, nbytes); offset += nbytes; + + if (!IsAvailable()) { + /* wake up the sleeping thread */ + idle = false; + wake_cond.signal(); + } + return nbytes; } if (read_error) { - wake_cond.broadcast(); + wake_cond.signal(); std::rethrow_exception(std::exchange(read_error, {})); } @@ -133,6 +148,8 @@ BufferedInputStream::RunThread() noexcept const std::lock_guard lock(mutex); while (!stop) { + assert(size == buffer.size()); + if (seek) { try { input->Seek(seek_offset); @@ -140,8 +157,6 @@ BufferedInputStream::RunThread() noexcept seek_error = std::current_exception(); } - offset = input->GetOffset(); - idle = false; seek = false; client_cond.signal(); @@ -151,8 +166,7 @@ BufferedInputStream::RunThread() noexcept auto w = buffer.Write(read_offset); if (w.empty()) { - auto r = buffer.Read(offset); - if (r.HasData()) { + if (IsAvailable()) { /* we still have enough data for the next Read() - sleep until we need more data */ diff --git a/src/input/MaybeBufferedInputStream.cxx b/src/input/MaybeBufferedInputStream.cxx new file mode 100644 index 000000000..dc9b801a3 --- /dev/null +++ b/src/input/MaybeBufferedInputStream.cxx @@ -0,0 +1,38 @@ +/* + * Copyright 2003-2018 The Music Player Daemon Project + * http://www.musicpd.org + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ + +#include "config.h" +#include "MaybeBufferedInputStream.hxx" +#include "BufferedInputStream.hxx" + +MaybeBufferedInputStream::MaybeBufferedInputStream(InputStreamPtr _input) noexcept + :ProxyInputStream(std::move(_input)) {} + +void +MaybeBufferedInputStream::Update() noexcept +{ + const bool was_ready = IsReady(); + + ProxyInputStream::Update(); + + if (!was_ready && IsReady() && BufferedInputStream::IsEligible(*input)) + /* our input has just become ready - check if we + should buffer it */ + SetInput(std::make_unique(std::move(input))); +} diff --git a/src/input/MaybeBufferedInputStream.hxx b/src/input/MaybeBufferedInputStream.hxx new file mode 100644 index 000000000..8e774729c --- /dev/null +++ b/src/input/MaybeBufferedInputStream.hxx @@ -0,0 +1,39 @@ +/* + * Copyright 2003-2018 The Music Player Daemon Project + * http://www.musicpd.org + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ + +#ifndef MPD_MAYBE_BUFFERED_INPUT_STREAM_BUFFER_HXX +#define MPD_MAYBE_BUFFERED_INPUT_STREAM_BUFFER_HXX + +#include "check.h" +#include "ProxyInputStream.hxx" + +/** + * A proxy which automatically inserts #BufferedInputStream once the + * input becomes ready and is "eligible" (see + * BufferedInputStream::IsEligible()). + */ +class MaybeBufferedInputStream final : public ProxyInputStream { +public: + explicit MaybeBufferedInputStream(InputStreamPtr _input) noexcept; + + /* virtual methods from class InputStream */ + void Update() noexcept override; +}; + +#endif