diff --git a/Makefile.am b/Makefile.am index fce275360..8f3f57b7d 100644 --- a/Makefile.am +++ b/Makefile.am @@ -1382,6 +1382,7 @@ libinput_a_SOURCES = \ src/input/InputStream.cxx src/input/InputStream.hxx \ src/input/Ptr.hxx \ src/input/InputPlugin.hxx \ + src/input/Handler.hxx \ src/input/RemoteTagScanner.hxx \ src/input/ScanTags.cxx src/input/ScanTags.hxx \ src/input/Reader.cxx src/input/Reader.hxx \ diff --git a/src/TagArchive.cxx b/src/TagArchive.cxx index c16b13a27..ce0f2ed39 100644 --- a/src/TagArchive.cxx +++ b/src/TagArchive.cxx @@ -22,16 +22,14 @@ #include "TagStream.hxx" #include "archive/ArchiveFile.hxx" #include "input/InputStream.hxx" -#include "thread/Cond.hxx" bool tag_archive_scan(ArchiveFile &archive, const char *path_utf8, const TagHandler &handler, void *handler_ctx) noexcept try { Mutex mutex; - Cond cond; - auto is = archive.OpenStream(path_utf8, mutex, cond); + auto is = archive.OpenStream(path_utf8, mutex); if (!is) return false; @@ -45,9 +43,8 @@ tag_archive_scan(ArchiveFile &archive, const char *path_utf8, TagBuilder &builder) noexcept try { Mutex mutex; - Cond cond; - auto is = archive.OpenStream(path_utf8, mutex, cond); + auto is = archive.OpenStream(path_utf8, mutex); return is && tag_stream_scan(*is, builder); } catch (const std::exception &e) { return false; diff --git a/src/TagFile.cxx b/src/TagFile.cxx index e28c90d50..da25102bf 100644 --- a/src/TagFile.cxx +++ b/src/TagFile.cxx @@ -27,7 +27,6 @@ #include "decoder/DecoderPlugin.hxx" #include "input/InputStream.hxx" #include "input/LocalOpen.hxx" -#include "thread/Cond.hxx" #include <exception> @@ -41,7 +40,6 @@ class TagFileScan { void *handler_ctx; Mutex mutex; - Cond cond; InputStreamPtr is; public: @@ -62,8 +60,7 @@ public: /* open the InputStream (if not already open) */ if (is == nullptr) { try { - is = OpenLocalInputStream(path_fs, - mutex, cond); + is = OpenLocalInputStream(path_fs, mutex); } catch (...) { return false; } diff --git a/src/TagStream.cxx b/src/TagStream.cxx index 58cdcb555..0d57f050b 100644 --- a/src/TagStream.cxx +++ b/src/TagStream.cxx @@ -28,7 +28,6 @@ #include "decoder/DecoderPlugin.hxx" #include "input/InputStream.hxx" #include "thread/Mutex.hxx" -#include "thread/Cond.hxx" #include <exception> @@ -78,9 +77,8 @@ bool tag_stream_scan(const char *uri, const TagHandler &handler, void *ctx) try { Mutex mutex; - Cond cond; - auto is = InputStream::OpenReady(uri, mutex, cond); + auto is = InputStream::OpenReady(uri, mutex); return tag_stream_scan(*is, handler, ctx); } catch (const std::exception &e) { return false; @@ -104,9 +102,8 @@ bool tag_stream_scan(const char *uri, TagBuilder &builder) try { Mutex mutex; - Cond cond; - auto is = InputStream::OpenReady(uri, mutex, cond); + auto is = InputStream::OpenReady(uri, mutex); return tag_stream_scan(*is, builder); } catch (const std::exception &e) { return false; diff --git a/src/archive/ArchiveFile.hxx b/src/archive/ArchiveFile.hxx index 00c528e40..7118f78ac 100644 --- a/src/archive/ArchiveFile.hxx +++ b/src/archive/ArchiveFile.hxx @@ -23,7 +23,6 @@ #include "input/Ptr.hxx" class Mutex; -class Cond; class ArchiveVisitor; class ArchiveFile { @@ -43,7 +42,7 @@ public: * @param path the path within the archive */ virtual InputStreamPtr OpenStream(const char *path, - Mutex &mutex, Cond &cond) = 0; + Mutex &mutex) = 0; }; #endif diff --git a/src/archive/plugins/Bzip2ArchivePlugin.cxx b/src/archive/plugins/Bzip2ArchivePlugin.cxx index ec29f81f9..60d2137f3 100644 --- a/src/archive/plugins/Bzip2ArchivePlugin.cxx +++ b/src/archive/plugins/Bzip2ArchivePlugin.cxx @@ -28,7 +28,6 @@ #include "../ArchiveVisitor.hxx" #include "input/InputStream.hxx" #include "input/LocalOpen.hxx" -#include "thread/Cond.hxx" #include "fs/Path.hxx" #include <bzlib.h> @@ -54,7 +53,7 @@ public: } InputStreamPtr OpenStream(const char *path, - Mutex &mutex, Cond &cond) override; + Mutex &mutex) override; }; class Bzip2InputStream final : public InputStream { @@ -69,7 +68,7 @@ class Bzip2InputStream final : public InputStream { public: Bzip2InputStream(const std::shared_ptr<InputStream> &_input, const char *uri, - Mutex &mutex, Cond &cond); + Mutex &mutex); ~Bzip2InputStream(); /* virtual methods from InputStream */ @@ -106,8 +105,7 @@ static std::unique_ptr<ArchiveFile> bz2_open(Path pathname) { static Mutex mutex; - static Cond cond; - auto is = OpenLocalInputStream(pathname, mutex, cond); + auto is = OpenLocalInputStream(pathname, mutex); return std::make_unique<Bzip2ArchiveFile>(pathname, std::move(is)); } @@ -115,8 +113,8 @@ bz2_open(Path pathname) Bzip2InputStream::Bzip2InputStream(const std::shared_ptr<InputStream> &_input, const char *_uri, - Mutex &_mutex, Cond &_cond) - :InputStream(_uri, _mutex, _cond), + Mutex &_mutex) + :InputStream(_uri, _mutex), input(_input) { Open(); @@ -129,9 +127,9 @@ Bzip2InputStream::~Bzip2InputStream() InputStreamPtr Bzip2ArchiveFile::OpenStream(const char *path, - Mutex &mutex, Cond &cond) + Mutex &mutex) { - return std::make_unique<Bzip2InputStream>(istream, path, mutex, cond); + return std::make_unique<Bzip2InputStream>(istream, path, mutex); } inline bool diff --git a/src/archive/plugins/Iso9660ArchivePlugin.cxx b/src/archive/plugins/Iso9660ArchivePlugin.cxx index 971f30275..106c0097b 100644 --- a/src/archive/plugins/Iso9660ArchivePlugin.cxx +++ b/src/archive/plugins/Iso9660ArchivePlugin.cxx @@ -75,7 +75,7 @@ public: virtual void Visit(ArchiveVisitor &visitor) override; InputStreamPtr OpenStream(const char *path, - Mutex &mutex, Cond &cond) override; + Mutex &mutex) override; }; /* archive open && listing routine */ @@ -144,9 +144,9 @@ class Iso9660InputStream final : public InputStream { public: Iso9660InputStream(const std::shared_ptr<Iso9660> &_iso, const char *_uri, - Mutex &_mutex, Cond &_cond, + Mutex &_mutex, iso9660_stat_t *_statbuf) - :InputStream(_uri, _mutex, _cond), + :InputStream(_uri, _mutex), iso(_iso), statbuf(_statbuf) { size = statbuf->size; SetReady(); @@ -163,14 +163,14 @@ public: InputStreamPtr Iso9660ArchiveFile::OpenStream(const char *pathname, - Mutex &mutex, Cond &cond) + Mutex &mutex) { auto statbuf = iso9660_ifs_stat_translate(iso->iso, pathname); if (statbuf == nullptr) throw FormatRuntimeError("not found in the ISO file: %s", pathname); - return std::make_unique<Iso9660InputStream>(iso, pathname, mutex, cond, + return std::make_unique<Iso9660InputStream>(iso, pathname, mutex, statbuf); } diff --git a/src/archive/plugins/ZzipArchivePlugin.cxx b/src/archive/plugins/ZzipArchivePlugin.cxx index 0fbc4cc35..18f5fa07b 100644 --- a/src/archive/plugins/ZzipArchivePlugin.cxx +++ b/src/archive/plugins/ZzipArchivePlugin.cxx @@ -60,7 +60,7 @@ public: virtual void Visit(ArchiveVisitor &visitor) override; InputStreamPtr OpenStream(const char *path, - Mutex &mutex, Cond &cond) override; + Mutex &mutex) override; }; /* archive open && listing routine */ @@ -92,9 +92,9 @@ class ZzipInputStream final : public InputStream { public: ZzipInputStream(const std::shared_ptr<ZzipDir> _dir, const char *_uri, - Mutex &_mutex, Cond &_cond, + Mutex &_mutex, ZZIP_FILE *_file) - :InputStream(_uri, _mutex, _cond), + :InputStream(_uri, _mutex), dir(_dir), file(_file) { //we are seekable (but its not recommendent to do so) seekable = true; @@ -118,7 +118,7 @@ public: InputStreamPtr ZzipArchiveFile::OpenStream(const char *pathname, - Mutex &mutex, Cond &cond) + Mutex &mutex) { ZZIP_FILE *_file = zzip_file_open(dir->dir, pathname, 0); if (_file == nullptr) @@ -126,7 +126,7 @@ ZzipArchiveFile::OpenStream(const char *pathname, pathname); return std::make_unique<ZzipInputStream>(dir, pathname, - mutex, cond, + mutex, _file); } diff --git a/src/command/FileCommands.cxx b/src/command/FileCommands.cxx index fa055618d..488c6a798 100644 --- a/src/command/FileCommands.cxx +++ b/src/command/FileCommands.cxx @@ -40,7 +40,6 @@ #include "LocateUri.hxx" #include "TimePrint.hxx" #include "thread/Mutex.hxx" -#include "thread/Cond.hxx" #include <assert.h> #include <inttypes.h> /* for PRIu64 */ @@ -244,7 +243,7 @@ handle_read_comments(Client &client, Request args, Response &r) * opened file or #nullptr on failure. */ static InputStreamPtr -find_stream_art(const char *directory, Mutex &mutex, Cond &cond) +find_stream_art(const char *directory, Mutex &mutex) { static constexpr char const * art_names[] = { "cover.png", @@ -257,7 +256,7 @@ find_stream_art(const char *directory, Mutex &mutex, Cond &cond) std::string art_file = PathTraitsUTF8::Build(directory, name); try { - return InputStream::OpenReady(art_file.c_str(), mutex, cond); + return InputStream::OpenReady(art_file.c_str(), mutex); } catch (const std::exception &e) {} } return nullptr; @@ -269,9 +268,8 @@ read_stream_art(Response &r, const char *uri, size_t offset) std::string art_directory = PathTraitsUTF8::GetParent(uri); Mutex mutex; - Cond cond; - InputStreamPtr is = find_stream_art(art_directory.c_str(), mutex, cond); + InputStreamPtr is = find_stream_art(art_directory.c_str(), mutex); if (is == nullptr) { r.Error(ACK_ERROR_NO_EXIST, "No file exists"); diff --git a/src/decoder/Bridge.cxx b/src/decoder/Bridge.cxx index d8495de92..0dbaf49af 100644 --- a/src/decoder/Bridge.cxx +++ b/src/decoder/Bridge.cxx @@ -370,7 +370,8 @@ DecoderBridge::OpenUri(const char *uri) Mutex &mutex = dc.mutex; Cond &cond = dc.cond; - auto is = InputStream::Open(uri, mutex, cond); + auto is = InputStream::Open(uri, mutex); + is->SetHandler(&dc); const std::lock_guard<Mutex> lock(mutex); while (true) { @@ -404,7 +405,7 @@ try { if (is.IsAvailable()) break; - is.cond.wait(is.mutex); + dc.cond.wait(is.mutex); } size_t nbytes = is.Read(buffer, length); diff --git a/src/decoder/DecoderControl.hxx b/src/decoder/DecoderControl.hxx index 1d792520d..e77237481 100644 --- a/src/decoder/DecoderControl.hxx +++ b/src/decoder/DecoderControl.hxx @@ -23,6 +23,7 @@ #include "DecoderCommand.hxx" #include "AudioFormat.hxx" #include "MixRampInfo.hxx" +#include "input/Handler.hxx" #include "thread/Mutex.hxx" #include "thread/Cond.hxx" #include "thread/Thread.hxx" @@ -60,7 +61,7 @@ enum class DecoderState : uint8_t { ERROR, }; -struct DecoderControl { +struct DecoderControl final : InputStreamHandler { /** * The handle of the decoder thread. */ @@ -422,6 +423,15 @@ public: private: void RunThread() noexcept; + + /* virtual methods from class InputStreamHandler */ + void OnInputStreamReady() noexcept override { + cond.signal(); + } + + void OnInputStreamAvailable() noexcept override { + cond.signal(); + } }; #endif diff --git a/src/decoder/DecoderThread.cxx b/src/decoder/DecoderThread.cxx index 1f08cc1ff..7b5b764cb 100644 --- a/src/decoder/DecoderThread.cxx +++ b/src/decoder/DecoderThread.cxx @@ -56,7 +56,8 @@ static constexpr Domain decoder_thread_domain("decoder_thread"); static InputStreamPtr decoder_input_stream_open(DecoderControl &dc, const char *uri) { - auto is = InputStream::Open(uri, dc.mutex, dc.cond); + auto is = InputStream::Open(uri, dc.mutex); + is->SetHandler(&dc); /* wait for the input stream to become ready; its metadata will be available then */ @@ -81,7 +82,7 @@ decoder_input_stream_open(DecoderControl &dc, const char *uri) static InputStreamPtr decoder_input_stream_open(DecoderControl &dc, Path path) { - auto is = OpenLocalInputStream(path, dc.mutex, dc.cond); + auto is = OpenLocalInputStream(path, dc.mutex); assert(is->IsReady()); diff --git a/src/input/AsyncInputStream.cxx b/src/input/AsyncInputStream.cxx index 772a87df0..72decc465 100644 --- a/src/input/AsyncInputStream.cxx +++ b/src/input/AsyncInputStream.cxx @@ -19,6 +19,7 @@ #include "config.h" #include "AsyncInputStream.hxx" +#include "CondHandler.hxx" #include "tag/Tag.hxx" #include "thread/Cond.hxx" #include "event/Loop.hxx" @@ -29,10 +30,10 @@ #include <string.h> AsyncInputStream::AsyncInputStream(EventLoop &event_loop, const char *_url, - Mutex &_mutex, Cond &_cond, + Mutex &_mutex, size_t _buffer_size, size_t _resume_at) - :InputStream(_url, _mutex, _cond), + :InputStream(_url, _mutex), deferred_resume(event_loop, BIND_THIS_METHOD(DeferredResume)), deferred_seek(event_loop, BIND_THIS_METHOD(DeferredSeek)), allocation(_buffer_size), @@ -133,8 +134,10 @@ AsyncInputStream::Seek(offset_type new_offset) deferred_seek.Schedule(); + CondInputStreamHandler cond_handler; + const ScopeExchangeInputStreamHandler h(*this, &cond_handler); while (seek_state != SeekState::NONE) - cond.wait(mutex); + cond_handler.cond.wait(mutex); Check(); } @@ -151,7 +154,7 @@ AsyncInputStream::SeekDone() noexcept open = true; seek_state = SeekState::NONE; - cond.broadcast(); + InvokeOnAvailable(); } std::unique_ptr<Tag> @@ -173,6 +176,8 @@ AsyncInputStream::Read(void *ptr, size_t read_size) { assert(!GetEventLoop().IsInside()); + CondInputStreamHandler cond_handler; + /* wait for data */ CircularBuffer<uint8_t>::Range r; while (true) { @@ -182,7 +187,8 @@ AsyncInputStream::Read(void *ptr, size_t read_size) if (!r.empty() || IsEOF()) break; - cond.wait(mutex); + const ScopeExchangeInputStreamHandler h(*this, &cond_handler); + cond_handler.cond.wait(mutex); } const size_t nbytes = std::min(read_size, r.size); @@ -205,7 +211,7 @@ AsyncInputStream::CommitWriteBuffer(size_t nbytes) noexcept if (!IsReady()) SetReady(); else - cond.broadcast(); + InvokeOnAvailable(); } void @@ -231,7 +237,7 @@ AsyncInputStream::AppendToBuffer(const void *data, size_t append_size) noexcept if (!IsReady()) SetReady(); else - cond.broadcast(); + InvokeOnAvailable(); } void @@ -243,7 +249,7 @@ AsyncInputStream::DeferredResume() noexcept Resume(); } catch (...) { postponed_exception = std::current_exception(); - cond.broadcast(); + InvokeOnAvailable(); } } @@ -265,6 +271,6 @@ AsyncInputStream::DeferredSeek() noexcept } catch (...) { seek_state = SeekState::NONE; postponed_exception = std::current_exception(); - cond.broadcast(); + InvokeOnAvailable(); } } diff --git a/src/input/AsyncInputStream.hxx b/src/input/AsyncInputStream.hxx index 117ebfc3a..c4bc1aca4 100644 --- a/src/input/AsyncInputStream.hxx +++ b/src/input/AsyncInputStream.hxx @@ -70,7 +70,7 @@ protected: public: AsyncInputStream(EventLoop &event_loop, const char *_url, - Mutex &_mutex, Cond &_cond, + Mutex &_mutex, size_t _buffer_size, size_t _resume_at); diff --git a/src/input/CondHandler.hxx b/src/input/CondHandler.hxx new file mode 100644 index 000000000..70538d25d --- /dev/null +++ b/src/input/CondHandler.hxx @@ -0,0 +1,43 @@ +/* + * Copyright 2003-2018 The Music Player Daemon Project + * http://www.musicpd.org + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ + +#ifndef MPD_COND_INPUT_STREAM_HANDLER_HXX +#define MPD_COND_INPUT_STREAM_HANDLER_HXX + +#include "check.h" +#include "Handler.hxx" +#include "thread/Cond.hxx" + +/** + * An #InputStreamHandler implementation which signals a #Cond. + */ +struct CondInputStreamHandler final : InputStreamHandler { + Cond cond; + + /* virtual methods from class InputStreamHandler */ + void OnInputStreamReady() noexcept override { + cond.signal(); + } + + void OnInputStreamAvailable() noexcept override { + cond.signal(); + } +}; + +#endif diff --git a/src/input/FailingInputStream.hxx b/src/input/FailingInputStream.hxx index a5b37d00e..d8436f195 100644 --- a/src/input/FailingInputStream.hxx +++ b/src/input/FailingInputStream.hxx @@ -35,8 +35,8 @@ class FailingInputStream final : public InputStream { public: explicit FailingInputStream(const char *_uri, const std::exception_ptr _error, - Mutex &_mutex, Cond &_cond) noexcept - :InputStream(_uri, _mutex, _cond), error(_error) { + Mutex &_mutex) noexcept + :InputStream(_uri, _mutex), error(_error) { SetReady(); } diff --git a/src/input/Handler.hxx b/src/input/Handler.hxx new file mode 100644 index 000000000..533e51518 --- /dev/null +++ b/src/input/Handler.hxx @@ -0,0 +1,53 @@ +/* + * Copyright 2003-2018 The Music Player Daemon Project + * http://www.musicpd.org + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ + +#ifndef MPD_INPUT_STREAM_HANDLER_HXX +#define MPD_INPUT_STREAM_HANDLER_HXX + +#include "check.h" + +/** + * An interface which gets receives events from an #InputStream. Its + * methods will be called from within an arbitrary thread and must not + * block. + * + * A reference to an instance is passed to the #InputStream, but it + * remains owned by the caller. + */ +class InputStreamHandler { +public: + /** + * Called when InputStream::IsReady() becomes true. + * + * Before querying metadata from the #InputStream, + * InputStream::Update() must be called. + * + * Caller locks InputStream::mutex. + */ + virtual void OnInputStreamReady() noexcept = 0; + + /** + * Called when InputStream::IsAvailable() becomes true. + * + * Caller locks InputStream::mutex. + */ + virtual void OnInputStreamAvailable() noexcept = 0; +}; + +#endif diff --git a/src/input/InputPlugin.hxx b/src/input/InputPlugin.hxx index 7e278f4ae..0a235f51a 100644 --- a/src/input/InputPlugin.hxx +++ b/src/input/InputPlugin.hxx @@ -24,7 +24,6 @@ struct ConfigBlock; class Mutex; -class Cond; class EventLoop; class RemoteTagScanner; class RemoteTagHandler; @@ -54,8 +53,7 @@ struct InputPlugin { * * Throws std::runtime_error on error. */ - InputStreamPtr (*open)(const char *uri, - Mutex &mutex, Cond &cond); + InputStreamPtr (*open)(const char *uri, Mutex &mutex); /** * Prepare a #RemoteTagScanner. The operation must be started diff --git a/src/input/InputStream.cxx b/src/input/InputStream.cxx index c1d6b8c08..6dd53456d 100644 --- a/src/input/InputStream.cxx +++ b/src/input/InputStream.cxx @@ -19,8 +19,8 @@ #include "config.h" #include "InputStream.hxx" +#include "Handler.hxx" #include "tag/Tag.hxx" -#include "thread/Cond.hxx" #include "util/StringCompare.hxx" #include <stdexcept> @@ -47,26 +47,8 @@ InputStream::SetReady() noexcept assert(!ready); ready = true; - cond.broadcast(); -} -void -InputStream::WaitReady() noexcept -{ - while (true) { - Update(); - if (ready) - break; - - cond.wait(mutex); - } -} - -void -InputStream::LockWaitReady() noexcept -{ - const std::lock_guard<Mutex> protect(mutex); - WaitReady(); + InvokeOnReady(); } /** @@ -177,3 +159,17 @@ InputStream::LockIsEOF() noexcept const std::lock_guard<Mutex> protect(mutex); return IsEOF(); } + +void +InputStream::InvokeOnReady() noexcept +{ + if (handler != nullptr) + handler->OnInputStreamReady(); +} + +void +InputStream::InvokeOnAvailable() noexcept +{ + if (handler != nullptr) + handler->OnInputStreamAvailable(); +} diff --git a/src/input/InputStream.hxx b/src/input/InputStream.hxx index f31c63ae4..331ec96bb 100644 --- a/src/input/InputStream.hxx +++ b/src/input/InputStream.hxx @@ -31,8 +31,8 @@ #include <assert.h> -class Cond; struct Tag; +class InputStreamHandler; class InputStream { public: @@ -55,6 +55,7 @@ public: */ Mutex &mutex; +private: /** * A cond that gets signalled when the state of this object * changes from the I/O thread. The client of this object may @@ -63,7 +64,7 @@ public: * This object is allocated by the client, and the client is * responsible for freeing it. */ - Cond &cond; + InputStreamHandler *handler = nullptr; protected: /** @@ -96,9 +97,9 @@ private: std::string mime; public: - InputStream(const char *_uri, Mutex &_mutex, Cond &_cond) noexcept + InputStream(const char *_uri, Mutex &_mutex) noexcept :uri(_uri), - mutex(_mutex), cond(_cond) { + mutex(_mutex) { assert(_uri != nullptr); } @@ -122,16 +123,33 @@ public: * notifications * @return an #InputStream object on success */ - gcc_nonnull_all - static InputStreamPtr Open(const char *uri, Mutex &mutex, Cond &cond); + gcc_nonnull(1) + static InputStreamPtr Open(const char *uri, Mutex &mutex); /** * Just like Open(), but waits for the stream to become ready. * It is a wrapper for Open(), WaitReady() and Check(). */ - gcc_nonnull_all - static InputStreamPtr OpenReady(const char *uri, - Mutex &mutex, Cond &cond); + gcc_nonnull(1) + static InputStreamPtr OpenReady(const char *uri, Mutex &mutex); + + /** + * Install a new handler. + * + * The caller must lock the mutex. + */ + void SetHandler(InputStreamHandler *new_handler) noexcept { + handler = new_handler; + } + + /** + * Install a new handler and return the old one. + * + * The caller must lock the mutex. + */ + InputStreamHandler *ExchangeHandler(InputStreamHandler *new_handler) noexcept { + return std::exchange(handler, new_handler); + } /** * The absolute URI which was used to open this stream. @@ -166,14 +184,6 @@ public: return ready; } - void WaitReady() noexcept; - - /** - * Wrapper for WaitReady() which locks and unlocks the mutex; - * the caller must not be holding it already. - */ - void LockWaitReady() noexcept; - gcc_pure bool HasMimeType() const noexcept { assert(ready); @@ -380,6 +390,30 @@ public: */ gcc_nonnull_all void LockReadFull(void *ptr, size_t size); + +protected: + void InvokeOnReady() noexcept; + void InvokeOnAvailable() noexcept; +}; + +/** + * Install an #InputStreamHandler during the scope in which this + * variable lives, and restore the old handler afterwards. + */ +class ScopeExchangeInputStreamHandler { + InputStream &is; + InputStreamHandler *const old_handler; + +public: + ScopeExchangeInputStreamHandler(InputStream &_is, + InputStreamHandler *new_handler) noexcept + :is(_is), old_handler(is.ExchangeHandler(new_handler)) {} + + ScopeExchangeInputStreamHandler(const ScopeExchangeInputStreamHandler &) = delete; + + ~ScopeExchangeInputStreamHandler() noexcept { + is.SetHandler(old_handler); + } }; #endif diff --git a/src/input/LocalOpen.cxx b/src/input/LocalOpen.cxx index 06129f2a5..a5043fe22 100644 --- a/src/input/LocalOpen.cxx +++ b/src/input/LocalOpen.cxx @@ -32,20 +32,20 @@ #include <assert.h> InputStreamPtr -OpenLocalInputStream(Path path, Mutex &mutex, Cond &cond) +OpenLocalInputStream(Path path, Mutex &mutex) { InputStreamPtr is; #ifdef ENABLE_ARCHIVE try { #endif - is = OpenFileInputStream(path, mutex, cond); + is = OpenFileInputStream(path, mutex); #ifdef ENABLE_ARCHIVE } catch (const std::system_error &e) { if (IsPathNotFound(e)) { /* ENOTDIR means this may be a path inside an archive file */ - is = OpenArchiveInputStream(path, mutex, cond); + is = OpenArchiveInputStream(path, mutex); if (!is) throw; } else diff --git a/src/input/LocalOpen.hxx b/src/input/LocalOpen.hxx index e73b3ee00..902b78c30 100644 --- a/src/input/LocalOpen.hxx +++ b/src/input/LocalOpen.hxx @@ -25,7 +25,6 @@ class Path; class Mutex; -class Cond; /** * Open a "local" file. This is a wrapper for the input plugins @@ -34,6 +33,6 @@ class Cond; * Throws std::runtime_error on error. */ InputStreamPtr -OpenLocalInputStream(Path path, Mutex &mutex, Cond &cond); +OpenLocalInputStream(Path path, Mutex &mutex); #endif diff --git a/src/input/Open.cxx b/src/input/Open.cxx index 9c136b064..22400c142 100644 --- a/src/input/Open.cxx +++ b/src/input/Open.cxx @@ -22,6 +22,7 @@ #include "Registry.hxx" #include "InputPlugin.hxx" #include "LocalOpen.hxx" +#include "CondHandler.hxx" #include "RewindInputStream.hxx" #include "fs/Traits.hxx" #include "fs/AllocatedPath.hxx" @@ -29,16 +30,15 @@ #include <stdexcept> InputStreamPtr -InputStream::Open(const char *url, - Mutex &mutex, Cond &cond) +InputStream::Open(const char *url, Mutex &mutex) { if (PathTraitsUTF8::IsAbsolute(url)) { const auto path = AllocatedPath::FromUTF8Throw(url); - return OpenLocalInputStream(path, mutex, cond); + return OpenLocalInputStream(path, mutex); } input_plugins_for_each_enabled(plugin) { - auto is = plugin->open(url, mutex, cond); + auto is = plugin->open(url, mutex); if (is != nullptr) return input_rewind_open(std::move(is)); } @@ -47,16 +47,27 @@ InputStream::Open(const char *url, } InputStreamPtr -InputStream::OpenReady(const char *uri, - Mutex &mutex, Cond &cond) +InputStream::OpenReady(const char *uri, Mutex &mutex) { - auto is = Open(uri, mutex, cond); + CondInputStreamHandler handler; + + auto is = Open(uri, mutex); + is->SetHandler(&handler); { const std::lock_guard<Mutex> protect(mutex); - is->WaitReady(); + + while (true) { + is->Update(); + if (is->IsReady()) + break; + + handler.cond.wait(mutex); + } + is->Check(); } + is->SetHandler(nullptr); return is; } diff --git a/src/input/ProxyInputStream.cxx b/src/input/ProxyInputStream.cxx index 86d2e82aa..d78664173 100644 --- a/src/input/ProxyInputStream.cxx +++ b/src/input/ProxyInputStream.cxx @@ -20,15 +20,16 @@ #include "config.h" #include "ProxyInputStream.hxx" #include "tag/Tag.hxx" -#include "thread/Cond.hxx" #include <stdexcept> ProxyInputStream::ProxyInputStream(InputStreamPtr _input) noexcept - :InputStream(_input->GetURI(), _input->mutex, _input->cond), + :InputStream(_input->GetURI(), _input->mutex), input(std::move(_input)) { assert(input); + + input->SetHandler(this); } ProxyInputStream::~ProxyInputStream() noexcept = default; @@ -40,10 +41,13 @@ ProxyInputStream::SetInput(InputStreamPtr _input) noexcept assert(_input); input = std::move(_input); + input->SetHandler(this); /* this call wakes up client threads if the new input is ready */ CopyAttributes(); + + set_input_cond.signal(); } void @@ -89,7 +93,7 @@ void ProxyInputStream::Seek(offset_type new_offset) { while (!input) - cond.wait(mutex); + set_input_cond.wait(mutex); input->Seek(new_offset); CopyAttributes(); @@ -120,7 +124,7 @@ size_t ProxyInputStream::Read(void *ptr, size_t read_size) { while (!input) - cond.wait(mutex); + set_input_cond.wait(mutex); size_t nbytes = input->Read(ptr, read_size); CopyAttributes(); diff --git a/src/input/ProxyInputStream.hxx b/src/input/ProxyInputStream.hxx index e55330a64..8dbc7f701 100644 --- a/src/input/ProxyInputStream.hxx +++ b/src/input/ProxyInputStream.hxx @@ -22,6 +22,8 @@ #include "InputStream.hxx" #include "Ptr.hxx" +#include "Handler.hxx" +#include "thread/Cond.hxx" struct Tag; @@ -33,7 +35,9 @@ struct Tag; * The inner #InputStream instance may be nullptr initially, to be set * later. */ -class ProxyInputStream : public InputStream { +class ProxyInputStream : public InputStream, protected InputStreamHandler { + Cond set_input_cond; + protected: InputStreamPtr input; @@ -45,8 +49,8 @@ public: * Once that instance becomes available, call SetInput(). */ ProxyInputStream(const char *_uri, - Mutex &_mutex, Cond &_cond) noexcept - :InputStream(_uri, _mutex, _cond) {} + Mutex &_mutex) noexcept + :InputStream(_uri, _mutex) {} virtual ~ProxyInputStream() noexcept; @@ -78,6 +82,15 @@ protected: * attributes. */ void CopyAttributes(); + + /* virtual methods from class InputStreamHandler */ + void OnInputStreamReady() noexcept override { + InvokeOnReady(); + } + + void OnInputStreamAvailable() noexcept override { + InvokeOnAvailable(); + } }; #endif diff --git a/src/input/ThreadInputStream.cxx b/src/input/ThreadInputStream.cxx index f33f8dbd7..1c7357262 100644 --- a/src/input/ThreadInputStream.cxx +++ b/src/input/ThreadInputStream.cxx @@ -19,6 +19,7 @@ #include "config.h" #include "ThreadInputStream.hxx" +#include "CondHandler.hxx" #include "thread/Name.hxx" #include <assert.h> @@ -26,9 +27,9 @@ ThreadInputStream::ThreadInputStream(const char *_plugin, const char *_uri, - Mutex &_mutex, Cond &_cond, + Mutex &_mutex, size_t _buffer_size) noexcept - :InputStream(_uri, _mutex, _cond), + :InputStream(_uri, _mutex), plugin(_plugin), thread(BIND_THIS_METHOD(ThreadFunc)), allocation(_buffer_size), @@ -94,11 +95,11 @@ ThreadInputStream::ThreadFunc() noexcept nbytes = ThreadRead(w.data, w.size); } catch (...) { postponed_exception = std::current_exception(); - cond.broadcast(); + InvokeOnAvailable(); break; } - cond.broadcast(); + InvokeOnAvailable(); if (nbytes == 0) { eof = true; @@ -134,6 +135,8 @@ ThreadInputStream::Read(void *ptr, size_t read_size) { assert(!thread.IsInside()); + CondInputStreamHandler cond_handler; + while (true) { if (postponed_exception) std::rethrow_exception(postponed_exception); @@ -151,7 +154,8 @@ ThreadInputStream::Read(void *ptr, size_t read_size) if (eof) return 0; - cond.wait(mutex); + const ScopeExchangeInputStreamHandler h(*this, &cond_handler); + cond_handler.cond.wait(mutex); } } diff --git a/src/input/ThreadInputStream.hxx b/src/input/ThreadInputStream.hxx index 4883dae24..cb97ac771 100644 --- a/src/input/ThreadInputStream.hxx +++ b/src/input/ThreadInputStream.hxx @@ -76,7 +76,7 @@ class ThreadInputStream : public InputStream { public: ThreadInputStream(const char *_plugin, - const char *_uri, Mutex &_mutex, Cond &_cond, + const char *_uri, Mutex &_mutex, size_t _buffer_size) noexcept; #ifndef NDEBUG diff --git a/src/input/plugins/AlsaInputPlugin.cxx b/src/input/plugins/AlsaInputPlugin.cxx index ee8fdf55a..0cfdb59f5 100644 --- a/src/input/plugins/AlsaInputPlugin.cxx +++ b/src/input/plugins/AlsaInputPlugin.cxx @@ -75,10 +75,10 @@ class AlsaInputStream final public: AlsaInputStream(EventLoop &_loop, - const char *_uri, Mutex &_mutex, Cond &_cond, + const char *_uri, Mutex &_mutex, const char *_device, snd_pcm_t *_handle, int _frame_size) - :AsyncInputStream(_loop, _uri, _mutex, _cond, + :AsyncInputStream(_loop, _uri, _mutex, ALSA_MAX_BUFFERED, ALSA_RESUME_AT), MultiSocketMonitor(_loop), device(_device), @@ -111,7 +111,7 @@ public: } static InputStreamPtr Create(EventLoop &event_loop, const char *uri, - Mutex &mutex, Cond &cond); + Mutex &mutex); protected: /* virtual methods from AsyncInputStream */ @@ -148,7 +148,7 @@ private: inline InputStreamPtr AlsaInputStream::Create(EventLoop &event_loop, const char *uri, - Mutex &mutex, Cond &cond) + Mutex &mutex) { const char *device = StringAfterPrefix(uri, "alsa://"); if (device == nullptr) @@ -168,7 +168,7 @@ AlsaInputStream::Create(EventLoop &event_loop, const char *uri, int frame_size = snd_pcm_format_width(format) / 8 * channels; return std::make_unique<AlsaInputStream>(event_loop, - uri, mutex, cond, + uri, mutex, device, handle, frame_size); } @@ -204,7 +204,7 @@ AlsaInputStream::DispatchSockets() noexcept if (Recover(n_frames) < 0) { postponed_exception = std::make_exception_ptr(std::runtime_error("PCM error - stream aborted")); - cond.broadcast(); + InvokeOnAvailable(); return; } } @@ -403,10 +403,10 @@ alsa_input_init(EventLoop &event_loop, const ConfigBlock &) } static InputStreamPtr -alsa_input_open(const char *uri, Mutex &mutex, Cond &cond) +alsa_input_open(const char *uri, Mutex &mutex) { return AlsaInputStream::Create(*alsa_input_event_loop, uri, - mutex, cond); + mutex); } const struct InputPlugin input_plugin_alsa = { diff --git a/src/input/plugins/ArchiveInputPlugin.cxx b/src/input/plugins/ArchiveInputPlugin.cxx index 9c26c7939..cfede2253 100644 --- a/src/input/plugins/ArchiveInputPlugin.cxx +++ b/src/input/plugins/ArchiveInputPlugin.cxx @@ -35,7 +35,7 @@ #include <stdlib.h> InputStreamPtr -OpenArchiveInputStream(Path path, Mutex &mutex, Cond &cond) +OpenArchiveInputStream(Path path, Mutex &mutex) { const ArchivePlugin *arplug; @@ -61,5 +61,5 @@ OpenArchiveInputStream(Path path, Mutex &mutex, Cond &cond) } return archive_file_open(arplug, Path::FromFS(archive)) - ->OpenStream(filename, mutex, cond); + ->OpenStream(filename, mutex); } diff --git a/src/input/plugins/ArchiveInputPlugin.hxx b/src/input/plugins/ArchiveInputPlugin.hxx index 5e8934052..9aa2b0bfe 100644 --- a/src/input/plugins/ArchiveInputPlugin.hxx +++ b/src/input/plugins/ArchiveInputPlugin.hxx @@ -24,9 +24,8 @@ class Path; class Mutex; -class Cond; InputStreamPtr -OpenArchiveInputStream(Path path, Mutex &mutex, Cond &cond); +OpenArchiveInputStream(Path path, Mutex &mutex); #endif diff --git a/src/input/plugins/CdioParanoiaInputPlugin.cxx b/src/input/plugins/CdioParanoiaInputPlugin.cxx index d81577bb7..22121efc9 100644 --- a/src/input/plugins/CdioParanoiaInputPlugin.cxx +++ b/src/input/plugins/CdioParanoiaInputPlugin.cxx @@ -61,11 +61,11 @@ class CdioParanoiaInputStream final : public InputStream { int buffer_lsn; public: - CdioParanoiaInputStream(const char *_uri, Mutex &_mutex, Cond &_cond, + CdioParanoiaInputStream(const char *_uri, Mutex &_mutex, cdrom_drive_t *_drv, CdIo_t *_cdio, bool reverse_endian, lsn_t _lsn_from, lsn_t _lsn_to) - :InputStream(_uri, _mutex, _cond), + :InputStream(_uri, _mutex), drv(_drv), cdio(_cdio), para(cdio_paranoia_init(drv)), lsn_from(_lsn_from), lsn_to(_lsn_to), lsn_relofs(0), @@ -184,7 +184,7 @@ cdio_detect_device(void) static InputStreamPtr input_cdio_open(const char *uri, - Mutex &mutex, Cond &cond) + Mutex &mutex) { struct cdio_uri parsed_uri; if (!parse_cdio_uri(&parsed_uri, uri)) @@ -250,7 +250,7 @@ input_cdio_open(const char *uri, lsn_to = cdio_get_disc_last_lsn(cdio); } - return std::make_unique<CdioParanoiaInputStream>(uri, mutex, cond, + return std::make_unique<CdioParanoiaInputStream>(uri, mutex, drv, cdio, reverse_endian, lsn_from, lsn_to); diff --git a/src/input/plugins/CurlInputPlugin.cxx b/src/input/plugins/CurlInputPlugin.cxx index 69876a463..1cca33aa6 100644 --- a/src/input/plugins/CurlInputPlugin.cxx +++ b/src/input/plugins/CurlInputPlugin.cxx @@ -35,7 +35,6 @@ #include "tag/Tag.hxx" #include "event/Call.hxx" #include "event/Loop.hxx" -#include "thread/Cond.hxx" #include "util/ASCII.hxx" #include "util/StringUtil.hxx" #include "util/StringFormat.hxx" @@ -83,7 +82,7 @@ public: CurlInputStream(EventLoop &event_loop, const char *_url, const std::multimap<std::string, std::string> &headers, I &&_icy, - Mutex &_mutex, Cond &_cond); + Mutex &_mutex); ~CurlInputStream() noexcept; @@ -92,7 +91,7 @@ public: static InputStreamPtr Open(const char *url, const std::multimap<std::string, std::string> &headers, - Mutex &mutex, Cond &cond); + Mutex &mutex); private: /** @@ -274,7 +273,7 @@ void CurlInputStream::OnEnd() { const std::lock_guard<Mutex> protect(mutex); - cond.broadcast(); + InvokeOnAvailable(); AsyncInputStream::SetClosed(); } @@ -290,7 +289,7 @@ CurlInputStream::OnError(std::exception_ptr e) noexcept else if (!IsReady()) SetReady(); else - cond.broadcast(); + InvokeOnAvailable(); AsyncInputStream::SetClosed(); } @@ -352,8 +351,8 @@ inline CurlInputStream::CurlInputStream(EventLoop &event_loop, const char *_url, const std::multimap<std::string, std::string> &headers, I &&_icy, - Mutex &_mutex, Cond &_cond) - :AsyncInputStream(event_loop, _url, _mutex, _cond, + Mutex &_mutex) + :AsyncInputStream(event_loop, _url, _mutex, CURL_MAX_BUFFERED, CURL_RESUME_AT), icy(std::forward<I>(_icy)) @@ -445,14 +444,14 @@ CurlInputStream::DoSeek(offset_type new_offset) inline InputStreamPtr CurlInputStream::Open(const char *url, const std::multimap<std::string, std::string> &headers, - Mutex &mutex, Cond &cond) + Mutex &mutex) { auto icy = std::make_shared<IcyMetaDataParser>(); auto c = std::make_unique<CurlInputStream>((*curl_init)->GetEventLoop(), url, headers, icy, - mutex, cond); + mutex); BlockingCall(c->GetEventLoop(), [&c](){ c->InitEasy(); @@ -465,19 +464,19 @@ CurlInputStream::Open(const char *url, InputStreamPtr OpenCurlInputStream(const char *uri, const std::multimap<std::string, std::string> &headers, - Mutex &mutex, Cond &cond) + Mutex &mutex) { - return CurlInputStream::Open(uri, headers, mutex, cond); + return CurlInputStream::Open(uri, headers, mutex); } static InputStreamPtr -input_curl_open(const char *url, Mutex &mutex, Cond &cond) +input_curl_open(const char *url, Mutex &mutex) { if (strncmp(url, "http://", 7) != 0 && strncmp(url, "https://", 8) != 0) return nullptr; - return CurlInputStream::Open(url, {}, mutex, cond); + return CurlInputStream::Open(url, {}, mutex); } const struct InputPlugin input_plugin_curl = { diff --git a/src/input/plugins/CurlInputPlugin.hxx b/src/input/plugins/CurlInputPlugin.hxx index 478acd43f..bd64f09d3 100644 --- a/src/input/plugins/CurlInputPlugin.hxx +++ b/src/input/plugins/CurlInputPlugin.hxx @@ -26,7 +26,6 @@ #include <map> class Mutex; -class Cond; extern const struct InputPlugin input_plugin_curl; @@ -40,6 +39,6 @@ extern const struct InputPlugin input_plugin_curl; InputStreamPtr OpenCurlInputStream(const char *uri, const std::multimap<std::string, std::string> &headers, - Mutex &mutex, Cond &cond); + Mutex &mutex); #endif diff --git a/src/input/plugins/FfmpegInputPlugin.cxx b/src/input/plugins/FfmpegInputPlugin.cxx index b78d1dd13..9a8dc9833 100644 --- a/src/input/plugins/FfmpegInputPlugin.cxx +++ b/src/input/plugins/FfmpegInputPlugin.cxx @@ -39,9 +39,9 @@ struct FfmpegInputStream final : public InputStream { bool eof; - FfmpegInputStream(const char *_uri, Mutex &_mutex, Cond &_cond, + FfmpegInputStream(const char *_uri, Mutex &_mutex, AVIOContext *_h) - :InputStream(_uri, _mutex, _cond), + :InputStream(_uri, _mutex), h(_h), eof(false) { seekable = (h->seekable & AVIO_SEEKABLE_NORMAL) != 0; size = avio_size(h); @@ -83,7 +83,7 @@ input_ffmpeg_init(EventLoop &, const ConfigBlock &) static InputStreamPtr input_ffmpeg_open(const char *uri, - Mutex &mutex, Cond &cond) + Mutex &mutex) { if (!StringStartsWith(uri, "gopher://") && !StringStartsWith(uri, "rtp://") && @@ -98,7 +98,7 @@ input_ffmpeg_open(const char *uri, if (result != 0) throw MakeFfmpegError(result); - return std::make_unique<FfmpegInputStream>(uri, mutex, cond, h); + return std::make_unique<FfmpegInputStream>(uri, mutex, h); } size_t diff --git a/src/input/plugins/FileInputPlugin.cxx b/src/input/plugins/FileInputPlugin.cxx index cc8e62413..f757c9a93 100644 --- a/src/input/plugins/FileInputPlugin.cxx +++ b/src/input/plugins/FileInputPlugin.cxx @@ -35,8 +35,8 @@ class FileInputStream final : public InputStream { public: FileInputStream(const char *path, FileReader &&_reader, off_t _size, - Mutex &_mutex, Cond &_cond) - :InputStream(path, _mutex, _cond), + Mutex &_mutex) + :InputStream(path, _mutex), reader(std::move(_reader)) { size = _size; seekable = true; @@ -54,8 +54,7 @@ public: }; InputStreamPtr -OpenFileInputStream(Path path, - Mutex &mutex, Cond &cond) +OpenFileInputStream(Path path, Mutex &mutex) { FileReader reader(path); @@ -75,7 +74,7 @@ OpenFileInputStream(Path path, return std::make_unique<FileInputStream>(path.ToUTF8().c_str(), std::move(reader), info.GetSize(), - mutex, cond); + mutex); } void diff --git a/src/input/plugins/FileInputPlugin.hxx b/src/input/plugins/FileInputPlugin.hxx index 4a5528185..867564d3c 100644 --- a/src/input/plugins/FileInputPlugin.hxx +++ b/src/input/plugins/FileInputPlugin.hxx @@ -24,10 +24,8 @@ class Path; class Mutex; -class Cond; InputStreamPtr -OpenFileInputStream(Path path, - Mutex &mutex, Cond &cond); +OpenFileInputStream(Path path, Mutex &mutex); #endif diff --git a/src/input/plugins/MmsInputPlugin.cxx b/src/input/plugins/MmsInputPlugin.cxx index de35e2f15..87cbcc520 100644 --- a/src/input/plugins/MmsInputPlugin.cxx +++ b/src/input/plugins/MmsInputPlugin.cxx @@ -34,8 +34,8 @@ class MmsInputStream final : public ThreadInputStream { mmsx_t *mms; public: - MmsInputStream(const char *_uri, Mutex &_mutex, Cond &_cond) - :ThreadInputStream(input_plugin_mms.name, _uri, _mutex, _cond, + MmsInputStream(const char *_uri, Mutex &_mutex) + :ThreadInputStream(input_plugin_mms.name, _uri, _mutex, MMS_BUFFER_SIZE) { } @@ -70,7 +70,7 @@ MmsInputStream::Open() static InputStreamPtr input_mms_open(const char *url, - Mutex &mutex, Cond &cond) + Mutex &mutex) { if (!StringStartsWith(url, "mms://") && !StringStartsWith(url, "mmsh://") && @@ -78,7 +78,7 @@ input_mms_open(const char *url, !StringStartsWith(url, "mmsu://")) return nullptr; - auto m = std::make_unique<MmsInputStream>(url, mutex, cond); + auto m = std::make_unique<MmsInputStream>(url, mutex); m->Start(); return m; } diff --git a/src/input/plugins/NfsInputPlugin.cxx b/src/input/plugins/NfsInputPlugin.cxx index ffee929b6..35ee78f4c 100644 --- a/src/input/plugins/NfsInputPlugin.cxx +++ b/src/input/plugins/NfsInputPlugin.cxx @@ -23,7 +23,6 @@ #include "../InputPlugin.hxx" #include "lib/nfs/Glue.hxx" #include "lib/nfs/FileReader.hxx" -#include "thread/Cond.hxx" #include "util/StringCompare.hxx" #include <string.h> @@ -46,9 +45,9 @@ class NfsInputStream final : NfsFileReader, public AsyncInputStream { bool reconnect_on_resume = false, reconnecting = false; public: - NfsInputStream(const char *_uri, Mutex &_mutex, Cond &_cond) + NfsInputStream(const char *_uri, Mutex &_mutex) :AsyncInputStream(NfsFileReader::GetEventLoop(), - _uri, _mutex, _cond, + _uri, _mutex, NFS_MAX_BUFFERED, NFS_RESUME_AT) {} @@ -100,7 +99,7 @@ NfsInputStream::DoRead() NfsFileReader::Read(next_offset, nbytes); } catch (...) { postponed_exception = std::current_exception(); - cond.broadcast(); + InvokeOnAvailable(); } } @@ -196,7 +195,7 @@ NfsInputStream::OnNfsFileError(std::exception_ptr &&e) noexcept else if (!IsReady()) SetReady(); else - cond.broadcast(); + InvokeOnAvailable(); } /* @@ -218,12 +217,12 @@ input_nfs_finish() noexcept static InputStreamPtr input_nfs_open(const char *uri, - Mutex &mutex, Cond &cond) + Mutex &mutex) { if (!StringStartsWith(uri, "nfs://")) return nullptr; - auto is = std::make_unique<NfsInputStream>(uri, mutex, cond); + auto is = std::make_unique<NfsInputStream>(uri, mutex); is->Open(); return is; } diff --git a/src/input/plugins/QobuzInputPlugin.cxx b/src/input/plugins/QobuzInputPlugin.cxx index e5c4f6d64..44bc5819b 100644 --- a/src/input/plugins/QobuzInputPlugin.cxx +++ b/src/input/plugins/QobuzInputPlugin.cxx @@ -49,8 +49,8 @@ class QobuzInputStream final public: QobuzInputStream(const char *_uri, const char *_track_id, - Mutex &_mutex, Cond &_cond) noexcept - :ProxyInputStream(_uri, _mutex, _cond), + Mutex &_mutex) noexcept + :ProxyInputStream(_uri, _mutex), track_id(_track_id) { qobuz_client->AddLoginHandler(*this); @@ -70,7 +70,7 @@ public: private: void Failed(std::exception_ptr e) { SetInput(std::make_unique<FailingInputStream>(GetURI(), e, - mutex, cond)); + mutex)); } /* virtual methods from QobuzSessionHandler */ @@ -89,11 +89,11 @@ QobuzInputStream::OnQobuzSession() noexcept try { const auto session = qobuz_client->GetSession(); - QobuzTrackHandler &handler = *this; + QobuzTrackHandler &h = *this; track_request = std::make_unique<QobuzTrackRequest>(*qobuz_client, session, track_id.c_str(), - handler); + h); track_request->Start(); } catch (...) { Failed(std::current_exception()); @@ -108,7 +108,7 @@ QobuzInputStream::OnQobuzTrackSuccess(std::string url) noexcept try { SetInput(OpenCurlInputStream(url.c_str(), {}, - mutex, cond)); + mutex)); } catch (...) { Failed(std::current_exception()); } @@ -180,7 +180,7 @@ ExtractQobuzTrackId(const char *uri) } static InputStreamPtr -OpenQobuzInput(const char *uri, Mutex &mutex, Cond &cond) +OpenQobuzInput(const char *uri, Mutex &mutex) { assert(qobuz_client != nullptr); @@ -190,7 +190,7 @@ OpenQobuzInput(const char *uri, Mutex &mutex, Cond &cond) // TODO: validate track_id - return std::make_unique<QobuzInputStream>(uri, track_id, mutex, cond); + return std::make_unique<QobuzInputStream>(uri, track_id, mutex); } static std::unique_ptr<RemoteTagScanner> diff --git a/src/input/plugins/SmbclientInputPlugin.cxx b/src/input/plugins/SmbclientInputPlugin.cxx index 517c29c2b..00b421ee4 100644 --- a/src/input/plugins/SmbclientInputPlugin.cxx +++ b/src/input/plugins/SmbclientInputPlugin.cxx @@ -37,9 +37,9 @@ class SmbclientInputStream final : public InputStream { public: SmbclientInputStream(const char *_uri, - Mutex &_mutex, Cond &_cond, + Mutex &_mutex, SMBCCTX *_ctx, int _fd, const struct stat &st) - :InputStream(_uri, _mutex, _cond), + :InputStream(_uri, _mutex), ctx(_ctx), fd(_fd) { seekable = true; size = st.st_size; @@ -85,7 +85,7 @@ input_smbclient_init(EventLoop &, const ConfigBlock &) static InputStreamPtr input_smbclient_open(const char *uri, - Mutex &mutex, Cond &cond) + Mutex &mutex) { if (!StringStartsWith(uri, "smb://")) return nullptr; @@ -119,7 +119,7 @@ input_smbclient_open(const char *uri, throw MakeErrno(e, "smbc_fstat() failed"); } - return std::make_unique<SmbclientInputStream>(uri, mutex, cond, + return std::make_unique<SmbclientInputStream>(uri, mutex, ctx, fd, st); } diff --git a/src/input/plugins/TidalInputPlugin.cxx b/src/input/plugins/TidalInputPlugin.cxx index cf10bbea6..9f28cdbda 100644 --- a/src/input/plugins/TidalInputPlugin.cxx +++ b/src/input/plugins/TidalInputPlugin.cxx @@ -60,8 +60,8 @@ class TidalInputStream final public: TidalInputStream(const char *_uri, const char *_track_id, - Mutex &_mutex, Cond &_cond) noexcept - :ProxyInputStream(_uri, _mutex, _cond), + Mutex &_mutex) noexcept + :ProxyInputStream(_uri, _mutex), track_id(_track_id) { tidal_session->AddLoginHandler(*this); @@ -81,7 +81,7 @@ public: private: void Failed(std::exception_ptr e) { SetInput(std::make_unique<FailingInputStream>(GetURI(), e, - mutex, cond)); + mutex)); } /* virtual methods from TidalSessionHandler */ @@ -98,14 +98,14 @@ TidalInputStream::OnTidalSession() noexcept const std::lock_guard<Mutex> protect(mutex); try { - TidalTrackHandler &handler = *this; + TidalTrackHandler &h = *this; track_request = std::make_unique<TidalTrackRequest>(tidal_session->GetCurl(), tidal_session->GetBaseUrl(), tidal_session->GetToken(), tidal_session->GetSession().c_str(), track_id.c_str(), tidal_audioquality, - handler); + h); track_request->Start(); } catch (...) { Failed(std::current_exception()); @@ -124,7 +124,7 @@ TidalInputStream::OnTidalTrackSuccess(std::string url) noexcept try { SetInput(OpenCurlInputStream(url.c_str(), {}, - mutex, cond)); + mutex)); } catch (...) { Failed(std::current_exception()); } @@ -211,7 +211,7 @@ ExtractTidalTrackId(const char *uri) } static InputStreamPtr -OpenTidalInput(const char *uri, Mutex &mutex, Cond &cond) +OpenTidalInput(const char *uri, Mutex &mutex) { assert(tidal_session != nullptr); @@ -221,7 +221,7 @@ OpenTidalInput(const char *uri, Mutex &mutex, Cond &cond) // TODO: validate track_id - return std::make_unique<TidalInputStream>(uri, track_id, mutex, cond); + return std::make_unique<TidalInputStream>(uri, track_id, mutex); } static std::unique_ptr<RemoteTagScanner> diff --git a/src/playlist/PlaylistAny.cxx b/src/playlist/PlaylistAny.cxx index 6c1c6367d..993c09e46 100644 --- a/src/playlist/PlaylistAny.cxx +++ b/src/playlist/PlaylistAny.cxx @@ -29,13 +29,13 @@ playlist_open_any(const char *uri, #ifdef ENABLE_DATABASE const Storage *storage, #endif - Mutex &mutex, Cond &cond) + Mutex &mutex) { return uri_has_scheme(uri) - ? playlist_open_remote(uri, mutex, cond) + ? playlist_open_remote(uri, mutex) : playlist_mapper_open(uri, #ifdef ENABLE_DATABASE storage, #endif - mutex, cond); + mutex); } diff --git a/src/playlist/PlaylistAny.hxx b/src/playlist/PlaylistAny.hxx index f2f3fb4a0..d021d20d5 100644 --- a/src/playlist/PlaylistAny.hxx +++ b/src/playlist/PlaylistAny.hxx @@ -23,7 +23,6 @@ #include <memory> class Mutex; -class Cond; class SongEnumerator; class Storage; @@ -37,6 +36,6 @@ playlist_open_any(const char *uri, #ifdef ENABLE_DATABASE const Storage *storage, #endif - Mutex &mutex, Cond &cond); + Mutex &mutex); #endif diff --git a/src/playlist/PlaylistMapper.cxx b/src/playlist/PlaylistMapper.cxx index aa79febc9..9fdc237a4 100644 --- a/src/playlist/PlaylistMapper.cxx +++ b/src/playlist/PlaylistMapper.cxx @@ -33,7 +33,7 @@ * Load a playlist from the configured playlist directory. */ static std::unique_ptr<SongEnumerator> -playlist_open_in_playlist_dir(const char *uri, Mutex &mutex, Cond &cond) +playlist_open_in_playlist_dir(const char *uri, Mutex &mutex) { assert(spl_valid_name(uri)); @@ -41,7 +41,7 @@ playlist_open_in_playlist_dir(const char *uri, Mutex &mutex, Cond &cond) if (path_fs.IsNull()) return nullptr; - return playlist_open_path(path_fs, mutex, cond); + return playlist_open_path(path_fs, mutex); } #ifdef ENABLE_DATABASE @@ -50,8 +50,7 @@ playlist_open_in_playlist_dir(const char *uri, Mutex &mutex, Cond &cond) * Load a playlist from the configured music directory. */ static std::unique_ptr<SongEnumerator> -playlist_open_in_storage(const char *uri, const Storage *storage, - Mutex &mutex, Cond &cond) +playlist_open_in_storage(const char *uri, const Storage *storage, Mutex &mutex) { assert(uri_safe_local(uri)); @@ -61,11 +60,11 @@ playlist_open_in_storage(const char *uri, const Storage *storage, { const auto path = storage->MapFS(uri); if (!path.IsNull()) - return playlist_open_path(path, mutex, cond); + return playlist_open_path(path, mutex); } const auto uri2 = storage->MapUTF8(uri); - return playlist_open_remote(uri2.c_str(), mutex, cond); + return playlist_open_remote(uri2.c_str(), mutex); } #endif @@ -75,19 +74,17 @@ playlist_mapper_open(const char *uri, #ifdef ENABLE_DATABASE const Storage *storage, #endif - Mutex &mutex, Cond &cond) + Mutex &mutex) { if (spl_valid_name(uri)) { - auto playlist = playlist_open_in_playlist_dir(uri, - mutex, cond); + auto playlist = playlist_open_in_playlist_dir(uri, mutex); if (playlist != nullptr) return playlist; } #ifdef ENABLE_DATABASE if (uri_safe_local(uri)) { - auto playlist = playlist_open_in_storage(uri, storage, - mutex, cond); + auto playlist = playlist_open_in_storage(uri, storage, mutex); if (playlist != nullptr) return playlist; } diff --git a/src/playlist/PlaylistMapper.hxx b/src/playlist/PlaylistMapper.hxx index 116faa46e..bc3345c84 100644 --- a/src/playlist/PlaylistMapper.hxx +++ b/src/playlist/PlaylistMapper.hxx @@ -25,7 +25,6 @@ #include <memory> class Mutex; -class Cond; class SongEnumerator; class Storage; @@ -38,6 +37,6 @@ playlist_mapper_open(const char *uri, #ifdef ENABLE_DATABASE const Storage *storage, #endif - Mutex &mutex, Cond &cond); + Mutex &mutex); #endif diff --git a/src/playlist/PlaylistPlugin.hxx b/src/playlist/PlaylistPlugin.hxx index a3f2eb8a1..acd4d67ee 100644 --- a/src/playlist/PlaylistPlugin.hxx +++ b/src/playlist/PlaylistPlugin.hxx @@ -25,7 +25,6 @@ struct ConfigBlock; struct Tag; class Mutex; -class Cond; class SongEnumerator; struct playlist_plugin { @@ -52,7 +51,7 @@ struct playlist_plugin { * either matched one of the schemes or one of the suffixes. */ std::unique_ptr<SongEnumerator> (*open_uri)(const char *uri, - Mutex &mutex, Cond &cond); + Mutex &mutex); /** * Opens the playlist in the specified input stream. It has diff --git a/src/playlist/PlaylistQueue.cxx b/src/playlist/PlaylistQueue.cxx index 77edb7249..ca01d7267 100644 --- a/src/playlist/PlaylistQueue.cxx +++ b/src/playlist/PlaylistQueue.cxx @@ -26,7 +26,6 @@ #include "SongEnumerator.hxx" #include "DetachedSong.hxx" #include "thread/Mutex.hxx" -#include "thread/Cond.hxx" #include "fs/Traits.hxx" #ifdef ENABLE_DATABASE @@ -70,13 +69,12 @@ playlist_open_into_queue(const char *uri, const SongLoader &loader) { Mutex mutex; - Cond cond; auto playlist = playlist_open_any(uri, #ifdef ENABLE_DATABASE loader.GetStorage(), #endif - mutex, cond); + mutex); if (playlist == nullptr) throw PlaylistError::NoSuchList(); diff --git a/src/playlist/PlaylistRegistry.cxx b/src/playlist/PlaylistRegistry.cxx index be8b536f1..01151c83a 100644 --- a/src/playlist/PlaylistRegistry.cxx +++ b/src/playlist/PlaylistRegistry.cxx @@ -103,7 +103,7 @@ playlist_list_global_finish() noexcept } static std::unique_ptr<SongEnumerator> -playlist_list_open_uri_scheme(const char *uri, Mutex &mutex, Cond &cond, +playlist_list_open_uri_scheme(const char *uri, Mutex &mutex, bool *tried) { assert(uri != nullptr); @@ -120,7 +120,7 @@ playlist_list_open_uri_scheme(const char *uri, Mutex &mutex, Cond &cond, if (playlist_plugins_enabled[i] && plugin->open_uri != nullptr && plugin->schemes != nullptr && StringArrayContainsCase(plugin->schemes, scheme.c_str())) { - auto playlist = plugin->open_uri(uri, mutex, cond); + auto playlist = plugin->open_uri(uri, mutex); if (playlist) return playlist; @@ -132,7 +132,7 @@ playlist_list_open_uri_scheme(const char *uri, Mutex &mutex, Cond &cond, } static std::unique_ptr<SongEnumerator> -playlist_list_open_uri_suffix(const char *uri, Mutex &mutex, Cond &cond, +playlist_list_open_uri_suffix(const char *uri, Mutex &mutex, const bool *tried) { assert(uri != nullptr); @@ -148,7 +148,7 @@ playlist_list_open_uri_suffix(const char *uri, Mutex &mutex, Cond &cond, if (playlist_plugins_enabled[i] && !tried[i] && plugin->open_uri != nullptr && plugin->suffixes != nullptr && StringArrayContainsCase(plugin->suffixes, suffix)) { - auto playlist = plugin->open_uri(uri, mutex, cond); + auto playlist = plugin->open_uri(uri, mutex); if (playlist != nullptr) return playlist; } @@ -158,7 +158,7 @@ playlist_list_open_uri_suffix(const char *uri, Mutex &mutex, Cond &cond, } std::unique_ptr<SongEnumerator> -playlist_list_open_uri(const char *uri, Mutex &mutex, Cond &cond) +playlist_list_open_uri(const char *uri, Mutex &mutex) { /** this array tracks which plugins have already been tried by playlist_list_open_uri_scheme() */ @@ -168,9 +168,9 @@ playlist_list_open_uri(const char *uri, Mutex &mutex, Cond &cond) memset(tried, false, sizeof(tried)); - auto playlist = playlist_list_open_uri_scheme(uri, mutex, cond, tried); + auto playlist = playlist_list_open_uri_scheme(uri, mutex, tried); if (playlist == nullptr) - playlist = playlist_list_open_uri_suffix(uri, mutex, cond, + playlist = playlist_list_open_uri_suffix(uri, mutex, tried); return playlist; diff --git a/src/playlist/PlaylistRegistry.hxx b/src/playlist/PlaylistRegistry.hxx index 877d6fc84..a651818f6 100644 --- a/src/playlist/PlaylistRegistry.hxx +++ b/src/playlist/PlaylistRegistry.hxx @@ -24,7 +24,6 @@ #include "Compiler.h" class Mutex; -class Cond; class SongEnumerator; extern const struct playlist_plugin *const playlist_plugins[]; @@ -51,7 +50,7 @@ playlist_list_global_finish() noexcept; * Opens a playlist by its URI. */ std::unique_ptr<SongEnumerator> -playlist_list_open_uri(const char *uri, Mutex &mutex, Cond &cond); +playlist_list_open_uri(const char *uri, Mutex &mutex); std::unique_ptr<SongEnumerator> playlist_list_open_stream_suffix(InputStreamPtr &&is, const char *suffix); diff --git a/src/playlist/PlaylistStream.cxx b/src/playlist/PlaylistStream.cxx index e16162680..5c2403a8f 100644 --- a/src/playlist/PlaylistStream.cxx +++ b/src/playlist/PlaylistStream.cxx @@ -32,7 +32,7 @@ #include <assert.h> static std::unique_ptr<SongEnumerator> -playlist_open_path_suffix(Path path, Mutex &mutex, Cond &cond) +playlist_open_path_suffix(Path path, Mutex &mutex) try { assert(!path.IsNull()); @@ -44,7 +44,7 @@ try { if (!playlist_suffix_supported(suffix_utf8.c_str())) return nullptr; - auto is = OpenLocalInputStream(path, mutex, cond); + auto is = OpenLocalInputStream(path, mutex); return playlist_list_open_stream_suffix(std::move(is), suffix_utf8.c_str()); } catch (...) { @@ -53,16 +53,16 @@ try { } std::unique_ptr<SongEnumerator> -playlist_open_path(Path path, Mutex &mutex, Cond &cond) +playlist_open_path(Path path, Mutex &mutex) try { assert(!path.IsNull()); const std::string uri_utf8 = path.ToUTF8(); auto playlist = !uri_utf8.empty() - ? playlist_list_open_uri(uri_utf8.c_str(), mutex, cond) + ? playlist_list_open_uri(uri_utf8.c_str(), mutex) : nullptr; if (playlist == nullptr) - playlist = playlist_open_path_suffix(path, mutex, cond); + playlist = playlist_open_path_suffix(path, mutex); return playlist; } catch (...) { @@ -71,15 +71,15 @@ try { } std::unique_ptr<SongEnumerator> -playlist_open_remote(const char *uri, Mutex &mutex, Cond &cond) +playlist_open_remote(const char *uri, Mutex &mutex) try { assert(uri_has_scheme(uri)); - auto playlist = playlist_list_open_uri(uri, mutex, cond); + auto playlist = playlist_list_open_uri(uri, mutex); if (playlist != nullptr) return playlist; - auto is = InputStream::OpenReady(uri, mutex, cond); + auto is = InputStream::OpenReady(uri, mutex); return playlist_list_open_stream(std::move(is), uri); } catch (...) { LogError(std::current_exception()); diff --git a/src/playlist/PlaylistStream.hxx b/src/playlist/PlaylistStream.hxx index c53b9f8b0..46742e970 100644 --- a/src/playlist/PlaylistStream.hxx +++ b/src/playlist/PlaylistStream.hxx @@ -25,7 +25,6 @@ #include <memory> class Mutex; -class Cond; class SongEnumerator; class Path; @@ -37,10 +36,10 @@ class Path; */ gcc_nonnull_all std::unique_ptr<SongEnumerator> -playlist_open_path(Path path, Mutex &mutex, Cond &cond); +playlist_open_path(Path path, Mutex &mutex); gcc_nonnull_all std::unique_ptr<SongEnumerator> -playlist_open_remote(const char *uri, Mutex &mutex, Cond &cond); +playlist_open_remote(const char *uri, Mutex &mutex); #endif diff --git a/src/playlist/Print.cxx b/src/playlist/Print.cxx index bc3c96fa2..95b8ad66b 100644 --- a/src/playlist/Print.cxx +++ b/src/playlist/Print.cxx @@ -26,7 +26,6 @@ #include "DetachedSong.hxx" #include "fs/Traits.hxx" #include "thread/Mutex.hxx" -#include "thread/Cond.hxx" #include "Partition.hxx" #include "Instance.hxx" @@ -59,7 +58,6 @@ playlist_file_print(Response &r, Partition &partition, const char *uri, bool detail) { Mutex mutex; - Cond cond; #ifndef ENABLE_DATABASE (void)partition; @@ -69,7 +67,7 @@ playlist_file_print(Response &r, Partition &partition, #ifdef ENABLE_DATABASE partition.instance.storage, #endif - mutex, cond); + mutex); if (playlist == nullptr) return false; diff --git a/src/playlist/plugins/EmbeddedCuePlaylistPlugin.cxx b/src/playlist/plugins/EmbeddedCuePlaylistPlugin.cxx index ef99428bd..afbf6a55c 100644 --- a/src/playlist/plugins/EmbeddedCuePlaylistPlugin.cxx +++ b/src/playlist/plugins/EmbeddedCuePlaylistPlugin.cxx @@ -89,8 +89,7 @@ static constexpr TagHandler embcue_tag_handler = { static std::unique_ptr<SongEnumerator> embcue_playlist_open_uri(const char *uri, - gcc_unused Mutex &mutex, - gcc_unused Cond &cond) + gcc_unused Mutex &mutex) { if (!PathTraitsUTF8::IsAbsolute(uri)) /* only local files supported */ diff --git a/src/playlist/plugins/FlacPlaylistPlugin.cxx b/src/playlist/plugins/FlacPlaylistPlugin.cxx index 58e5c9641..330c3b332 100644 --- a/src/playlist/plugins/FlacPlaylistPlugin.cxx +++ b/src/playlist/plugins/FlacPlaylistPlugin.cxx @@ -88,7 +88,7 @@ FlacPlaylist::NextSong() static std::unique_ptr<SongEnumerator> flac_playlist_open_uri(const char *uri, - gcc_unused Mutex &mutex, gcc_unused Cond &cond) + gcc_unused Mutex &mutex) { if (!PathTraitsUTF8::IsAbsolute(uri)) /* only local files supported */ diff --git a/src/playlist/plugins/SoundCloudPlaylistPlugin.cxx b/src/playlist/plugins/SoundCloudPlaylistPlugin.cxx index fdaffb6c9..3a5df3946 100644 --- a/src/playlist/plugins/SoundCloudPlaylistPlugin.cxx +++ b/src/playlist/plugins/SoundCloudPlaylistPlugin.cxx @@ -221,9 +221,9 @@ static constexpr yajl_callbacks parse_callbacks = { */ static void soundcloud_parse_json(const char *url, Yajl::Handle &handle, - Mutex &mutex, Cond &cond) + Mutex &mutex) { - auto input_stream = InputStream::OpenReady(url, mutex, cond); + auto input_stream = InputStream::OpenReady(url, mutex); Yajl::ParseInputStream(handle, *input_stream); } @@ -235,7 +235,7 @@ soundcloud_parse_json(const char *url, Yajl::Handle &handle, * soundcloud://url/<url or path of soundcloud page> */ static std::unique_ptr<SongEnumerator> -soundcloud_open_uri(const char *uri, Mutex &mutex, Cond &cond) +soundcloud_open_uri(const char *uri, Mutex &mutex) { assert(strncmp(uri, "soundcloud://", 13) == 0); uri += 13; @@ -277,7 +277,7 @@ soundcloud_open_uri(const char *uri, Mutex &mutex, Cond &cond) SoundCloudJsonData data; Yajl::Handle handle(&parse_callbacks, nullptr, &data); - soundcloud_parse_json(u, handle, mutex, cond); + soundcloud_parse_json(u, handle, mutex); data.songs.reverse(); return std::make_unique<MemorySongEnumerator>(std::move(data.songs)); diff --git a/src/tag/Generic.cxx b/src/tag/Generic.cxx index 0d97c9d92..3aff10c79 100644 --- a/src/tag/Generic.cxx +++ b/src/tag/Generic.cxx @@ -23,7 +23,6 @@ #include "ApeTag.hxx" #include "fs/Path.hxx" #include "thread/Mutex.hxx" -#include "thread/Cond.hxx" #include "input/InputStream.hxx" #include "input/LocalOpen.hxx" #include "Log.hxx" @@ -53,9 +52,8 @@ bool ScanGenericTags(Path path, const TagHandler &handler, void *ctx) try { Mutex mutex; - Cond cond; - auto is = OpenLocalInputStream(path, mutex, cond); + auto is = OpenLocalInputStream(path, mutex); return ScanGenericTags(*is, handler, ctx); } catch (...) { LogError(std::current_exception()); diff --git a/test/DumpDecoderClient.cxx b/test/DumpDecoderClient.cxx index 95dd526ef..3c459d469 100644 --- a/test/DumpDecoderClient.cxx +++ b/test/DumpDecoderClient.cxx @@ -73,7 +73,7 @@ DumpDecoderClient::SeekError() InputStreamPtr DumpDecoderClient::OpenUri(const char *uri) { - return InputStream::OpenReady(uri, mutex, cond); + return InputStream::OpenReady(uri, mutex); } size_t diff --git a/test/DumpDecoderClient.hxx b/test/DumpDecoderClient.hxx index 0fcb9754c..77b5cd60c 100644 --- a/test/DumpDecoderClient.hxx +++ b/test/DumpDecoderClient.hxx @@ -23,7 +23,6 @@ #include "check.h" #include "decoder/Client.hxx" #include "thread/Mutex.hxx" -#include "thread/Cond.hxx" /** * A #DecoderClient implementation which dumps metadata to stderr and @@ -36,7 +35,6 @@ class DumpDecoderClient final : public DecoderClient { public: Mutex mutex; - Cond cond; bool IsInitialized() const noexcept { return initialized; diff --git a/test/ReadApeTags.cxx b/test/ReadApeTags.cxx index c4b9887f5..4a1daea32 100644 --- a/test/ReadApeTags.cxx +++ b/test/ReadApeTags.cxx @@ -20,7 +20,6 @@ #include "config.h" #include "tag/ApeLoader.hxx" #include "thread/Mutex.hxx" -#include "thread/Cond.hxx" #include "fs/Path.hxx" #include "Log.hxx" #include "input/InputStream.hxx" @@ -62,9 +61,8 @@ try { const Path path = Path::FromFS(argv[1]); Mutex mutex; - Cond cond; - auto is = OpenLocalInputStream(path, mutex, cond); + auto is = OpenLocalInputStream(path, mutex); if (!tag_ape_scan(*is, MyApeTagCallback)) { fprintf(stderr, "error\n"); diff --git a/test/dump_playlist.cxx b/test/dump_playlist.cxx index 1eb84b8f8..cfafcfad0 100644 --- a/test/dump_playlist.cxx +++ b/test/dump_playlist.cxx @@ -74,14 +74,13 @@ try { /* open the playlist */ Mutex mutex; - Cond cond; InputStreamPtr is; - auto playlist = playlist_list_open_uri(uri, mutex, cond); + auto playlist = playlist_list_open_uri(uri, mutex); if (playlist == NULL) { /* open the stream and wait until it becomes ready */ - is = InputStream::OpenReady(uri, mutex, cond); + is = InputStream::OpenReady(uri, mutex); /* open the playlist */ diff --git a/test/dump_rva2.cxx b/test/dump_rva2.cxx index f9d916876..68beec9b4 100644 --- a/test/dump_rva2.cxx +++ b/test/dump_rva2.cxx @@ -23,7 +23,6 @@ #include "ReplayGainInfo.hxx" #include "config/ConfigGlobal.hxx" #include "thread/Mutex.hxx" -#include "thread/Cond.hxx" #include "fs/Path.hxx" #include "input/InputStream.hxx" #include "input/LocalOpen.hxx" @@ -75,9 +74,8 @@ try { const Path path = Path::FromFS(argv[1]); Mutex mutex; - Cond cond; - auto is = OpenLocalInputStream(path, mutex, cond); + auto is = OpenLocalInputStream(path, mutex); const auto tag = tag_id3_load(*is); if (tag == NULL) { diff --git a/test/dump_text_file.cxx b/test/dump_text_file.cxx index c72d60464..24962b836 100644 --- a/test/dump_text_file.cxx +++ b/test/dump_text_file.cxx @@ -23,7 +23,6 @@ #include "input/InputStream.hxx" #include "input/TextInputStream.hxx" #include "config/ConfigGlobal.hxx" -#include "thread/Cond.hxx" #include "Log.hxx" #ifdef ENABLE_ARCHIVE @@ -94,9 +93,8 @@ try { /* open the stream and dump it */ Mutex mutex; - Cond cond; - auto is = InputStream::OpenReady(argv[1], mutex, cond); + auto is = InputStream::OpenReady(argv[1], mutex); return dump_input_stream(std::move(is)); } catch (const std::exception &e) { LogError(e); diff --git a/test/read_tags.cxx b/test/read_tags.cxx index 8bc33b804..2268afaf9 100644 --- a/test/read_tags.cxx +++ b/test/read_tags.cxx @@ -26,7 +26,6 @@ #include "tag/Handler.hxx" #include "tag/Generic.hxx" #include "fs/Path.hxx" -#include "thread/Cond.hxx" #include "Log.hxx" #include "util/ScopeExit.hxx" @@ -110,11 +109,10 @@ try { } Mutex mutex; - Cond cond; InputStreamPtr is; if (!success && plugin->scan_stream != NULL) { - is = InputStream::OpenReady(path.c_str(), mutex, cond); + is = InputStream::OpenReady(path.c_str(), mutex); success = plugin->ScanStream(*is, print_handler, nullptr); } diff --git a/test/run_decoder.cxx b/test/run_decoder.cxx index 536b3b45d..701d9e7c1 100644 --- a/test/run_decoder.cxx +++ b/test/run_decoder.cxx @@ -123,8 +123,7 @@ try { if (plugin->file_decode != nullptr) { plugin->FileDecode(client, Path::FromFS(c.uri)); } else if (plugin->stream_decode != nullptr) { - auto is = InputStream::OpenReady(c.uri, client.mutex, - client.cond); + auto is = InputStream::OpenReady(c.uri, client.mutex); plugin->StreamDecode(client, *is); } else { fprintf(stderr, "Decoder plugin is not usable\n"); diff --git a/test/run_input.cxx b/test/run_input.cxx index fc5937f68..d255a76b7 100644 --- a/test/run_input.cxx +++ b/test/run_input.cxx @@ -239,8 +239,7 @@ try { /* open the stream and dump it */ Mutex mutex; - Cond cond; - auto is = InputStream::OpenReady(c.uri, mutex, cond); + auto is = InputStream::OpenReady(c.uri, mutex); return dump_input_stream(is.get()); } catch (const std::exception &e) { LogError(e); diff --git a/test/test_rewind.cxx b/test/test_rewind.cxx index d911b5692..db03b2b55 100644 --- a/test/test_rewind.cxx +++ b/test/test_rewind.cxx @@ -6,7 +6,6 @@ #include "input/RewindInputStream.hxx" #include "input/InputStream.hxx" #include "thread/Mutex.hxx" -#include "thread/Cond.hxx" #include <cppunit/TestFixture.h> #include <cppunit/extensions/TestFactoryRegistry.h> @@ -24,9 +23,9 @@ class StringInputStream final : public InputStream { public: StringInputStream(const char *_uri, - Mutex &_mutex, Cond &_cond, + Mutex &_mutex, const char *_data) - :InputStream(_uri, _mutex, _cond), + :InputStream(_uri, _mutex), data(_data), remaining(strlen(data)) { SetReady(); } @@ -54,10 +53,9 @@ class RewindTest : public CppUnit::TestFixture { public: void TestRewind() { Mutex mutex; - Cond cond; StringInputStream *sis = - new StringInputStream("foo://", mutex, cond, + new StringInputStream("foo://", mutex, "foo bar"); CPPUNIT_ASSERT(sis->IsReady());