diff --git a/src/archive/plugins/Bzip2ArchivePlugin.cxx b/src/archive/plugins/Bzip2ArchivePlugin.cxx index db0d013ff..638ad84e5 100644 --- a/src/archive/plugins/Bzip2ArchivePlugin.cxx +++ b/src/archive/plugins/Bzip2ArchivePlugin.cxx @@ -72,7 +72,8 @@ public: /* virtual methods from InputStream */ bool IsEOF() noexcept override; - size_t Read(void *ptr, size_t size) override; + size_t Read(std::unique_lock &lock, + void *ptr, size_t size) override; private: void Open(); @@ -147,7 +148,7 @@ Bzip2InputStream::FillBuffer() } size_t -Bzip2InputStream::Read(void *ptr, size_t length) +Bzip2InputStream::Read(std::unique_lock &, void *ptr, size_t length) { const ScopeUnlock unlock(mutex); diff --git a/src/archive/plugins/Iso9660ArchivePlugin.cxx b/src/archive/plugins/Iso9660ArchivePlugin.cxx index a27230728..59060bd64 100644 --- a/src/archive/plugins/Iso9660ArchivePlugin.cxx +++ b/src/archive/plugins/Iso9660ArchivePlugin.cxx @@ -157,7 +157,8 @@ public: /* virtual methods from InputStream */ bool IsEOF() noexcept override; - size_t Read(void *ptr, size_t size) override; + size_t Read(std::unique_lock &lock, + void *ptr, size_t size) override; }; InputStreamPtr @@ -174,7 +175,8 @@ Iso9660ArchiveFile::OpenStream(const char *pathname, } size_t -Iso9660InputStream::Read(void *ptr, size_t read_size) +Iso9660InputStream::Read(std::unique_lock &, + void *ptr, size_t read_size) { const ScopeUnlock unlock(mutex); diff --git a/src/archive/plugins/ZzipArchivePlugin.cxx b/src/archive/plugins/ZzipArchivePlugin.cxx index e238dd4ba..ed41c5d2b 100644 --- a/src/archive/plugins/ZzipArchivePlugin.cxx +++ b/src/archive/plugins/ZzipArchivePlugin.cxx @@ -111,8 +111,9 @@ public: /* virtual methods from InputStream */ bool IsEOF() noexcept override; - size_t Read(void *ptr, size_t size) override; - void Seek(offset_type offset) override; + size_t Read(std::unique_lock &lock, + void *ptr, size_t size) override; + void Seek(std::unique_lock &lock, offset_type offset) override; }; InputStreamPtr @@ -130,7 +131,7 @@ ZzipArchiveFile::OpenStream(const char *pathname, } size_t -ZzipInputStream::Read(void *ptr, size_t read_size) +ZzipInputStream::Read(std::unique_lock &, void *ptr, size_t read_size) { const ScopeUnlock unlock(mutex); @@ -149,7 +150,7 @@ ZzipInputStream::IsEOF() noexcept } void -ZzipInputStream::Seek(offset_type new_offset) +ZzipInputStream::Seek(std::unique_lock &, offset_type new_offset) { const ScopeUnlock unlock(mutex); diff --git a/src/command/FileCommands.cxx b/src/command/FileCommands.cxx index 448da2caf..9f040bd8b 100644 --- a/src/command/FileCommands.cxx +++ b/src/command/FileCommands.cxx @@ -292,9 +292,9 @@ read_stream_art(Response &r, const char *uri, size_t offset) size_t read_size; { - const std::lock_guard protect(mutex); - is->Seek(offset); - read_size = is->Read(&buffer, CHUNK_SIZE); + std::unique_lock lock(mutex); + is->Seek(lock, offset); + read_size = is->Read(lock, &buffer, CHUNK_SIZE); } r.Format("size: %" PRIoffset "\n" diff --git a/src/command/FingerprintCommands.cxx b/src/command/FingerprintCommands.cxx index be5aa3f2c..295795c95 100644 --- a/src/command/FingerprintCommands.cxx +++ b/src/command/FingerprintCommands.cxx @@ -305,7 +305,7 @@ GetChromaprintCommand::Read(InputStream &is, void *buffer, size_t length) cond.wait(lock); } - return is.Read(buffer, length); + return is.Read(lock, buffer, length); } CommandResult diff --git a/src/decoder/Bridge.cxx b/src/decoder/Bridge.cxx index 28e0d0104..065da259d 100644 --- a/src/decoder/Bridge.cxx +++ b/src/decoder/Bridge.cxx @@ -403,7 +403,7 @@ try { dc.cond.wait(lock); } - size_t nbytes = is.Read(buffer, length); + size_t nbytes = is.Read(lock, buffer, length); assert(nbytes > 0 || is.IsEOF()); return nbytes; diff --git a/src/decoder/Thread.cxx b/src/decoder/Thread.cxx index 91a4344d2..d042c7a5c 100644 --- a/src/decoder/Thread.cxx +++ b/src/decoder/Thread.cxx @@ -55,7 +55,8 @@ static constexpr Domain decoder_thread_domain("decoder_thread"); static bool decoder_stream_decode(const DecoderPlugin &plugin, DecoderBridge &bridge, - InputStream &input_stream) + InputStream &input_stream, + std::unique_lock &lock) { assert(plugin.stream_decode != nullptr); assert(bridge.stream_tag == nullptr); @@ -70,7 +71,7 @@ decoder_stream_decode(const DecoderPlugin &plugin, /* rewind the stream, so each plugin gets a fresh start */ try { - input_stream.Rewind(); + input_stream.Rewind(lock); } catch (...) { } @@ -161,6 +162,7 @@ decoder_check_plugin(const DecoderPlugin &plugin, const InputStream &is, static bool decoder_run_stream_plugin(DecoderBridge &bridge, InputStream &is, + std::unique_lock &lock, const char *suffix, const DecoderPlugin &plugin, bool &tried_r) @@ -171,11 +173,12 @@ decoder_run_stream_plugin(DecoderBridge &bridge, InputStream &is, bridge.Reset(); tried_r = true; - return decoder_stream_decode(plugin, bridge, is); + return decoder_stream_decode(plugin, bridge, is, lock); } static bool decoder_run_stream_locked(DecoderBridge &bridge, InputStream &is, + std::unique_lock &lock, const char *uri, bool &tried_r) { UriSuffixBuffer suffix_buffer; @@ -183,7 +186,8 @@ decoder_run_stream_locked(DecoderBridge &bridge, InputStream &is, using namespace std::placeholders; const auto f = std::bind(decoder_run_stream_plugin, - std::ref(bridge), std::ref(is), suffix, + std::ref(bridge), std::ref(is), std::ref(lock), + suffix, _1, std::ref(tried_r)); return decoder_plugins_try(f); } @@ -192,7 +196,8 @@ decoder_run_stream_locked(DecoderBridge &bridge, InputStream &is, * Try decoding a stream, using the fallback plugin. */ static bool -decoder_run_stream_fallback(DecoderBridge &bridge, InputStream &is) +decoder_run_stream_fallback(DecoderBridge &bridge, InputStream &is, + std::unique_lock &lock) { const struct DecoderPlugin *plugin; @@ -202,7 +207,7 @@ decoder_run_stream_fallback(DecoderBridge &bridge, InputStream &is) plugin = decoder_plugin_from_name("mad"); #endif return plugin != nullptr && plugin->stream_decode != nullptr && - decoder_stream_decode(*plugin, bridge, is); + decoder_stream_decode(*plugin, bridge, is, lock); } /** @@ -249,16 +254,16 @@ decoder_run_stream(DecoderBridge &bridge, const char *uri) MaybeLoadReplayGain(bridge, *input_stream); - const std::lock_guard protect(dc.mutex); + std::unique_lock lock(dc.mutex); bool tried = false; return dc.command == DecoderCommand::STOP || - decoder_run_stream_locked(bridge, *input_stream, uri, + decoder_run_stream_locked(bridge, *input_stream, lock, uri, tried) || /* fallback to mp3: this is needed for bastard streams that don't have a suffix or set the mimeType */ (!tried && - decoder_run_stream_fallback(bridge, *input_stream)); + decoder_run_stream_fallback(bridge, *input_stream, lock)); } /** @@ -282,8 +287,9 @@ TryDecoderFile(DecoderBridge &bridge, Path path_fs, const char *suffix, const std::lock_guard protect(dc.mutex); return decoder_file_decode(plugin, bridge, path_fs); } else if (plugin.stream_decode != nullptr) { - const std::lock_guard protect(dc.mutex); - return decoder_stream_decode(plugin, bridge, input_stream); + std::unique_lock lock(dc.mutex); + return decoder_stream_decode(plugin, bridge, input_stream, + lock); } else return false; } diff --git a/src/input/AsyncInputStream.cxx b/src/input/AsyncInputStream.cxx index ce78d166d..3ccd41353 100644 --- a/src/input/AsyncInputStream.cxx +++ b/src/input/AsyncInputStream.cxx @@ -95,7 +95,8 @@ AsyncInputStream::IsEOF() noexcept } void -AsyncInputStream::Seek(offset_type new_offset) +AsyncInputStream::Seek(std::unique_lock &lock, + offset_type new_offset) { assert(IsReady()); assert(seek_state == SeekState::NONE); @@ -136,7 +137,7 @@ AsyncInputStream::Seek(offset_type new_offset) CondInputStreamHandler cond_handler; const ScopeExchangeInputStreamHandler h(*this, &cond_handler); while (seek_state != SeekState::NONE) - cond_handler.cond.wait(mutex); + cond_handler.cond.wait(lock); Check(); } @@ -171,7 +172,8 @@ AsyncInputStream::IsAvailable() noexcept } size_t -AsyncInputStream::Read(void *ptr, size_t read_size) +AsyncInputStream::Read(std::unique_lock &lock, + void *ptr, size_t read_size) { assert(!GetEventLoop().IsInside()); @@ -187,7 +189,7 @@ AsyncInputStream::Read(void *ptr, size_t read_size) break; const ScopeExchangeInputStreamHandler h(*this, &cond_handler); - cond_handler.cond.wait(mutex); + cond_handler.cond.wait(lock); } const size_t nbytes = std::min(read_size, r.size); diff --git a/src/input/AsyncInputStream.hxx b/src/input/AsyncInputStream.hxx index 958f5f690..2577cd9fa 100644 --- a/src/input/AsyncInputStream.hxx +++ b/src/input/AsyncInputStream.hxx @@ -83,10 +83,12 @@ public: /* virtual methods from InputStream */ void Check() final; bool IsEOF() noexcept final; - void Seek(offset_type new_offset) final; + void Seek(std::unique_lock &lock, + offset_type new_offset) final; std::unique_ptr ReadTag() noexcept final; bool IsAvailable() noexcept final; - size_t Read(void *ptr, size_t read_size) final; + size_t Read(std::unique_lock &lock, + void *ptr, size_t read_size) final; protected: /** diff --git a/src/input/BufferedInputStream.cxx b/src/input/BufferedInputStream.cxx index d28964dbb..69a557a79 100644 --- a/src/input/BufferedInputStream.cxx +++ b/src/input/BufferedInputStream.cxx @@ -64,7 +64,8 @@ BufferedInputStream::Check() } void -BufferedInputStream::Seek(offset_type new_offset) +BufferedInputStream::Seek(std::unique_lock &lock, + offset_type new_offset) { if (new_offset >= size) { offset = size; @@ -84,7 +85,7 @@ BufferedInputStream::Seek(offset_type new_offset) wake_cond.notify_one(); while (seek) - client_cond.wait(mutex); + client_cond.wait(lock); if (seek_error) std::rethrow_exception(std::exchange(seek_error, {})); @@ -105,7 +106,8 @@ BufferedInputStream::IsAvailable() noexcept } size_t -BufferedInputStream::Read(void *ptr, size_t s) +BufferedInputStream::Read(std::unique_lock &lock, + void *ptr, size_t s) { if (offset >= size) return 0; @@ -140,7 +142,7 @@ BufferedInputStream::Read(void *ptr, size_t s) wake_cond.notify_one(); } - client_cond.wait(mutex); + client_cond.wait(lock); } } @@ -156,7 +158,7 @@ BufferedInputStream::RunThread() noexcept if (seek) { try { - input->Seek(seek_offset); + input->Seek(lock, seek_offset); } catch (...) { seek_error = std::current_exception(); } @@ -183,7 +185,7 @@ BufferedInputStream::RunThread() noexcept offset to prepare filling the buffer from there */ try { - input->Seek(offset); + input->Seek(lock, offset); } catch (...) { read_error = std::current_exception(); client_cond.notify_one(); @@ -195,7 +197,8 @@ BufferedInputStream::RunThread() noexcept } try { - size_t nbytes = input->Read(w.data, w.size); + size_t nbytes = input->Read(lock, + w.data, w.size); buffer.Commit(read_offset, read_offset + nbytes); } catch (...) { diff --git a/src/input/BufferedInputStream.hxx b/src/input/BufferedInputStream.hxx index 46a85a470..dcb6282a1 100644 --- a/src/input/BufferedInputStream.hxx +++ b/src/input/BufferedInputStream.hxx @@ -85,12 +85,13 @@ public: /* we don't need to implement Update() because all attributes have been copied already in our constructor */ //void Update() noexcept; - void Seek(offset_type offset) override; + void Seek(std::unique_lock &lock, offset_type offset) override; bool IsEOF() noexcept override; /* we don't support tags */ // std::unique_ptr ReadTag() override; bool IsAvailable() noexcept override; - size_t Read(void *ptr, size_t size) override; + size_t Read(std::unique_lock &lock, + void *ptr, size_t size) override; /* virtual methods from class InputStreamHandler */ void OnInputStreamReady() noexcept override { diff --git a/src/input/FailingInputStream.hxx b/src/input/FailingInputStream.hxx index 008a1ceae..f150fd972 100644 --- a/src/input/FailingInputStream.hxx +++ b/src/input/FailingInputStream.hxx @@ -45,7 +45,7 @@ public: std::rethrow_exception(error); } - void Seek(offset_type) override { + void Seek(std::unique_lock &, offset_type) override { std::rethrow_exception(error); } @@ -53,7 +53,7 @@ public: return false; } - size_t Read(void *, size_t) override { + size_t Read(std::unique_lock &, void *, size_t) override { std::rethrow_exception(error); } }; diff --git a/src/input/IcyInputStream.cxx b/src/input/IcyInputStream.cxx index 4614d8f81..3d85ea47a 100644 --- a/src/input/IcyInputStream.cxx +++ b/src/input/IcyInputStream.cxx @@ -80,13 +80,14 @@ IcyInputStream::ReadTag() noexcept } size_t -IcyInputStream::Read(void *ptr, size_t read_size) +IcyInputStream::Read(std::unique_lock &lock, + void *ptr, size_t read_size) { if (!IsEnabled()) - return ProxyInputStream::Read(ptr, read_size); + return ProxyInputStream::Read(lock, ptr, read_size); while (true) { - size_t nbytes = ProxyInputStream::Read(ptr, read_size); + size_t nbytes = ProxyInputStream::Read(lock, ptr, read_size); if (nbytes == 0) return 0; diff --git a/src/input/IcyInputStream.hxx b/src/input/IcyInputStream.hxx index 5a6a69074..183cc46d7 100644 --- a/src/input/IcyInputStream.hxx +++ b/src/input/IcyInputStream.hxx @@ -66,7 +66,8 @@ public: /* virtual methods from InputStream */ void Update() noexcept override; std::unique_ptr ReadTag() noexcept override; - size_t Read(void *ptr, size_t size) override; + size_t Read(std::unique_lock &lock, + void *ptr, size_t size) override; }; #endif diff --git a/src/input/InputStream.cxx b/src/input/InputStream.cxx index e125a0409..f808fb5ee 100644 --- a/src/input/InputStream.cxx +++ b/src/input/InputStream.cxx @@ -72,7 +72,7 @@ InputStream::CheapSeeking() const noexcept } void -InputStream::Seek(gcc_unused offset_type new_offset) +InputStream::Seek(std::unique_lock &, gcc_unused offset_type new_offset) { throw std::runtime_error("Seeking is not implemented"); } @@ -80,15 +80,15 @@ InputStream::Seek(gcc_unused offset_type new_offset) void InputStream::LockSeek(offset_type _offset) { - const std::lock_guard protect(mutex); - Seek(_offset); + std::unique_lock lock(mutex); + Seek(lock, _offset); } void InputStream::LockSkip(offset_type _offset) { - const std::lock_guard protect(mutex); - Skip(_offset); + std::unique_lock lock(mutex); + Skip(lock, _offset); } std::unique_ptr @@ -119,18 +119,18 @@ InputStream::LockRead(void *ptr, size_t _size) #endif assert(_size > 0); - const std::lock_guard protect(mutex); - return Read(ptr, _size); + std::unique_lock lock(mutex); + return Read(lock, ptr, _size); } void -InputStream::ReadFull(void *_ptr, size_t _size) +InputStream::ReadFull(std::unique_lock &lock, void *_ptr, size_t _size) { uint8_t *ptr = (uint8_t *)_ptr; size_t nbytes_total = 0; while (_size > 0) { - size_t nbytes = Read(ptr + nbytes_total, _size); + size_t nbytes = Read(lock, ptr + nbytes_total, _size); if (nbytes == 0) throw std::runtime_error("Unexpected end of file"); @@ -148,8 +148,8 @@ InputStream::LockReadFull(void *ptr, size_t _size) #endif assert(_size > 0); - const std::lock_guard protect(mutex); - ReadFull(ptr, _size); + std::unique_lock lock(mutex); + ReadFull(lock, ptr, _size); } bool diff --git a/src/input/InputStream.hxx b/src/input/InputStream.hxx index ba8156d6e..7b19a1e0b 100644 --- a/src/input/InputStream.hxx +++ b/src/input/InputStream.hxx @@ -271,9 +271,11 @@ public: * * Throws std::runtime_error on error. * + * @param lock the locked mutex; may be used to wait on + * condition variables * @param offset the relative offset */ - virtual void Seek(offset_type offset); + virtual void Seek(std::unique_lock &lock, offset_type offset); /** * Wrapper for Seek() which locks and unlocks the mutex; the @@ -285,8 +287,8 @@ public: * Rewind to the beginning of the stream. This is a wrapper * for Seek(0, error). */ - void Rewind() { - Seek(0); + void Rewind(std::unique_lock &lock) { + Seek(lock, 0); } void LockRewind() { @@ -296,8 +298,9 @@ public: /** * Skip input bytes. */ - void Skip(offset_type _offset) { - Seek(GetOffset() + _offset); + void Skip(std::unique_lock &lock, + offset_type _offset) { + Seek(lock, GetOffset() + _offset); } void LockSkip(offset_type _offset); @@ -351,12 +354,15 @@ public: * * Throws std::runtime_error on error. * + * @param lock the locked mutex; may be used to wait on + * condition variables * @param ptr the buffer to read into * @param size the maximum number of bytes to read * @return the number of bytes read */ gcc_nonnull_all - virtual size_t Read(void *ptr, size_t size) = 0; + virtual size_t Read(std::unique_lock &lock, + void *ptr, size_t size) = 0; /** * Wrapper for Read() which locks and unlocks the mutex; @@ -379,7 +385,7 @@ public: * @return true if the whole data was read, false otherwise. */ gcc_nonnull_all - void ReadFull(void *ptr, size_t size); + void ReadFull(std::unique_lock &lock, void *ptr, size_t size); /** * Wrapper for ReadFull() which locks and unlocks the mutex; diff --git a/src/input/ProxyInputStream.cxx b/src/input/ProxyInputStream.cxx index f05f439f7..345e6f319 100644 --- a/src/input/ProxyInputStream.cxx +++ b/src/input/ProxyInputStream.cxx @@ -89,12 +89,13 @@ ProxyInputStream::Update() noexcept } void -ProxyInputStream::Seek(offset_type new_offset) +ProxyInputStream::Seek(std::unique_lock &lock, + offset_type new_offset) { while (!input) - set_input_cond.wait(mutex); + set_input_cond.wait(lock); - input->Seek(new_offset); + input->Seek(lock, new_offset); CopyAttributes(); } @@ -120,12 +121,13 @@ ProxyInputStream::IsAvailable() noexcept } size_t -ProxyInputStream::Read(void *ptr, size_t read_size) +ProxyInputStream::Read(std::unique_lock &lock, + void *ptr, size_t read_size) { while (!input) - set_input_cond.wait(mutex); + set_input_cond.wait(lock); - size_t nbytes = input->Read(ptr, read_size); + size_t nbytes = input->Read(lock, ptr, read_size); CopyAttributes(); return nbytes; } diff --git a/src/input/ProxyInputStream.hxx b/src/input/ProxyInputStream.hxx index 853deb11c..d030e16ed 100644 --- a/src/input/ProxyInputStream.hxx +++ b/src/input/ProxyInputStream.hxx @@ -60,11 +60,13 @@ public: /* virtual methods from InputStream */ void Check() override; void Update() noexcept override; - void Seek(offset_type new_offset) override; + void Seek(std::unique_lock &lock, + offset_type new_offset) override; bool IsEOF() noexcept override; std::unique_ptr ReadTag() noexcept override; bool IsAvailable() noexcept override; - size_t Read(void *ptr, size_t read_size) override; + size_t Read(std::unique_lock &lock, + void *ptr, size_t read_size) override; protected: /** diff --git a/src/input/RewindInputStream.cxx b/src/input/RewindInputStream.cxx index 98f2d1967..cb760adb1 100644 --- a/src/input/RewindInputStream.cxx +++ b/src/input/RewindInputStream.cxx @@ -60,8 +60,9 @@ public: return !ReadingFromBuffer() && ProxyInputStream::IsEOF(); } - size_t Read(void *ptr, size_t size) override; - void Seek(offset_type offset) override; + size_t Read(std::unique_lock &lock, + void *ptr, size_t size) override; + void Seek(std::unique_lock &lock, offset_type offset) override; private: /** @@ -74,7 +75,8 @@ private: }; size_t -RewindInputStream::Read(void *ptr, size_t read_size) +RewindInputStream::Read(std::unique_lock &lock, + void *ptr, size_t read_size) { if (ReadingFromBuffer()) { /* buffered read */ @@ -93,7 +95,7 @@ RewindInputStream::Read(void *ptr, size_t read_size) } else { /* pass method call to underlying stream */ - size_t nbytes = input->Read(ptr, read_size); + size_t nbytes = input->Read(lock, ptr, read_size); if (input->GetOffset() > (offset_type)sizeof(buffer)) /* disable buffering */ @@ -114,7 +116,7 @@ RewindInputStream::Read(void *ptr, size_t read_size) } void -RewindInputStream::Seek(offset_type new_offset) +RewindInputStream::Seek(std::unique_lock &lock, offset_type new_offset) { assert(IsReady()); @@ -132,7 +134,7 @@ RewindInputStream::Seek(offset_type new_offset) buffered range now */ tail = 0; - ProxyInputStream::Seek(new_offset); + ProxyInputStream::Seek(lock, new_offset); } } diff --git a/src/input/ThreadInputStream.cxx b/src/input/ThreadInputStream.cxx index 5a21022e1..e1be3ecdc 100644 --- a/src/input/ThreadInputStream.cxx +++ b/src/input/ThreadInputStream.cxx @@ -130,7 +130,8 @@ ThreadInputStream::IsAvailable() noexcept } inline size_t -ThreadInputStream::Read(void *ptr, size_t read_size) +ThreadInputStream::Read(std::unique_lock &lock, + void *ptr, size_t read_size) { assert(!thread.IsInside()); @@ -154,7 +155,7 @@ ThreadInputStream::Read(void *ptr, size_t read_size) return 0; const ScopeExchangeInputStreamHandler h(*this, &cond_handler); - cond_handler.cond.wait(mutex); + cond_handler.cond.wait(lock); } } diff --git a/src/input/ThreadInputStream.hxx b/src/input/ThreadInputStream.hxx index 14b849238..3c946e12a 100644 --- a/src/input/ThreadInputStream.hxx +++ b/src/input/ThreadInputStream.hxx @@ -94,7 +94,8 @@ public: void Check() override final; bool IsEOF() noexcept final; bool IsAvailable() noexcept final; - size_t Read(void *ptr, size_t size) override final; + size_t Read(std::unique_lock &lock, + void *ptr, size_t size) override final; protected: /** diff --git a/src/input/plugins/CdioParanoiaInputPlugin.cxx b/src/input/plugins/CdioParanoiaInputPlugin.cxx index 0079e55a9..9178f8a4a 100644 --- a/src/input/plugins/CdioParanoiaInputPlugin.cxx +++ b/src/input/plugins/CdioParanoiaInputPlugin.cxx @@ -91,8 +91,9 @@ class CdioParanoiaInputStream final : public InputStream { /* virtual methods from InputStream */ bool IsEOF() noexcept override; - size_t Read(void *ptr, size_t size) override; - void Seek(offset_type offset) override; + size_t Read(std::unique_lock &lock, + void *ptr, size_t size) override; + void Seek(std::unique_lock &lock, offset_type offset) override; }; static constexpr Domain cdio_domain("cdio"); @@ -255,7 +256,8 @@ input_cdio_open(const char *uri, } void -CdioParanoiaInputStream::Seek(offset_type new_offset) +CdioParanoiaInputStream::Seek(std::unique_lock &, + offset_type new_offset) { if (new_offset > size) throw FormatRuntimeError("Invalid offset to seek %ld (%ld)", @@ -276,7 +278,8 @@ CdioParanoiaInputStream::Seek(offset_type new_offset) } size_t -CdioParanoiaInputStream::Read(void *ptr, size_t length) +CdioParanoiaInputStream::Read(std::unique_lock &, + void *ptr, size_t length) { size_t nbytes = 0; char *wptr = (char *) ptr; diff --git a/src/input/plugins/FfmpegInputPlugin.cxx b/src/input/plugins/FfmpegInputPlugin.cxx index 198f4bd17..febc44872 100644 --- a/src/input/plugins/FfmpegInputPlugin.cxx +++ b/src/input/plugins/FfmpegInputPlugin.cxx @@ -49,8 +49,10 @@ public: /* virtual methods from InputStream */ bool IsEOF() noexcept override; - size_t Read(void *ptr, size_t size) override; - void Seek(offset_type offset) override; + size_t Read(std::unique_lock &lock, + void *ptr, size_t size) override; + void Seek(std::unique_lock &lock, + offset_type offset) override; }; gcc_const @@ -79,7 +81,8 @@ input_ffmpeg_open(const char *uri, } size_t -FfmpegInputStream::Read(void *ptr, size_t read_size) +FfmpegInputStream::Read(std::unique_lock &, + void *ptr, size_t read_size) { size_t result; @@ -99,7 +102,7 @@ FfmpegInputStream::IsEOF() noexcept } void -FfmpegInputStream::Seek(offset_type new_offset) +FfmpegInputStream::Seek(std::unique_lock &, offset_type new_offset) { uint64_t result; diff --git a/src/input/plugins/FileInputPlugin.cxx b/src/input/plugins/FileInputPlugin.cxx index 989d3de74..a61c94c2b 100644 --- a/src/input/plugins/FileInputPlugin.cxx +++ b/src/input/plugins/FileInputPlugin.cxx @@ -48,8 +48,10 @@ public: return GetOffset() >= GetSize(); } - size_t Read(void *ptr, size_t size) override; - void Seek(offset_type offset) override; + size_t Read(std::unique_lock &lock, + void *ptr, size_t size) override; + void Seek(std::unique_lock &lock, + offset_type offset) override; }; InputStreamPtr @@ -74,7 +76,8 @@ OpenFileInputStream(Path path, Mutex &mutex) } void -FileInputStream::Seek(offset_type new_offset) +FileInputStream::Seek(std::unique_lock &, + offset_type new_offset) { { const ScopeUnlock unlock(mutex); @@ -85,7 +88,8 @@ FileInputStream::Seek(offset_type new_offset) } size_t -FileInputStream::Read(void *ptr, size_t read_size) +FileInputStream::Read(std::unique_lock &, + void *ptr, size_t read_size) { size_t nbytes; diff --git a/src/input/plugins/SmbclientInputPlugin.cxx b/src/input/plugins/SmbclientInputPlugin.cxx index dfd95103d..361d0e18e 100644 --- a/src/input/plugins/SmbclientInputPlugin.cxx +++ b/src/input/plugins/SmbclientInputPlugin.cxx @@ -56,8 +56,9 @@ public: return offset >= size; } - size_t Read(void *ptr, size_t size) override; - void Seek(offset_type offset) override; + size_t Read(std::unique_lock &lock, + void *ptr, size_t size) override; + void Seek(std::unique_lock &lock, offset_type offset) override; }; /* @@ -118,7 +119,8 @@ input_smbclient_open(const char *uri, } size_t -SmbclientInputStream::Read(void *ptr, size_t read_size) +SmbclientInputStream::Read(std::unique_lock &, + void *ptr, size_t read_size) { ssize_t nbytes; @@ -136,7 +138,8 @@ SmbclientInputStream::Read(void *ptr, size_t read_size) } void -SmbclientInputStream::Seek(offset_type new_offset) +SmbclientInputStream::Seek(std::unique_lock &, + offset_type new_offset) { off_t result; diff --git a/src/tag/Aiff.cxx b/src/tag/Aiff.cxx index 3cc3a7f43..77bf06701 100644 --- a/src/tag/Aiff.cxx +++ b/src/tag/Aiff.cxx @@ -39,14 +39,14 @@ struct aiff_chunk_header { }; size_t -aiff_seek_id3(InputStream &is) +aiff_seek_id3(InputStream &is, std::unique_lock &lock) { /* seek to the beginning and read the AIFF header */ - is.Rewind(); + is.Rewind(lock); aiff_header header; - is.ReadFull(&header, sizeof(header)); + is.ReadFull(lock, &header, sizeof(header)); if (memcmp(header.id, "FORM", 4) != 0 || (is.KnownSize() && FromBE32(header.size) > is.GetSize()) || (memcmp(header.format, "AIFF", 4) != 0 && @@ -57,7 +57,7 @@ aiff_seek_id3(InputStream &is) /* read the chunk header */ aiff_chunk_header chunk; - is.ReadFull(&chunk, sizeof(chunk)); + is.ReadFull(lock, &chunk, sizeof(chunk)); size_t size = FromBE32(chunk.size); if (size > size_t(std::numeric_limits::max())) @@ -73,6 +73,6 @@ aiff_seek_id3(InputStream &is) /* pad byte */ ++size; - is.Skip(size); + is.Skip(lock, size); } } diff --git a/src/tag/Aiff.hxx b/src/tag/Aiff.hxx index 28d20c86c..eb59afe71 100644 --- a/src/tag/Aiff.hxx +++ b/src/tag/Aiff.hxx @@ -25,9 +25,12 @@ #ifndef MPD_AIFF_HXX #define MPD_AIFF_HXX +#include + #include class InputStream; +class Mutex; /** * Seeks the AIFF file to the ID3 chunk. @@ -38,6 +41,6 @@ class InputStream; * @return the size of the ID3 chunk */ size_t -aiff_seek_id3(InputStream &is); +aiff_seek_id3(InputStream &is, std::unique_lock &lock); #endif diff --git a/src/tag/ApeLoader.cxx b/src/tag/ApeLoader.cxx index eb2c76b75..15a124824 100644 --- a/src/tag/ApeLoader.cxx +++ b/src/tag/ApeLoader.cxx @@ -40,15 +40,15 @@ struct ApeFooter { bool tag_ape_scan(InputStream &is, ApeTagCallback callback) try { - const std::lock_guard protect(is.mutex); + std::unique_lock lock(is.mutex); if (!is.KnownSize() || !is.CheapSeeking()) return false; /* determine if file has an apeV2 tag */ ApeFooter footer; - is.Seek(is.GetSize() - sizeof(footer)); - is.ReadFull(&footer, sizeof(footer)); + is.Seek(lock, is.GetSize() - sizeof(footer)); + is.ReadFull(lock, &footer, sizeof(footer)); if (memcmp(footer.id, "APETAGEX", sizeof(footer.id)) != 0 || FromLE32(footer.version) != 2000) @@ -61,14 +61,14 @@ try { remaining > 1024 * 1024) return false; - is.Seek(is.GetSize() - remaining); + is.Seek(lock, is.GetSize() - remaining); /* read tag into buffer */ remaining -= sizeof(footer); assert(remaining > 10); std::unique_ptr buffer(new char[remaining]); - is.ReadFull(buffer.get(), remaining); + is.ReadFull(lock, buffer.get(), remaining); /* read tags */ unsigned n = FromLE32(footer.count); diff --git a/src/tag/Id3Load.cxx b/src/tag/Id3Load.cxx index fe9db5c48..7494a8ac1 100644 --- a/src/tag/Id3Load.cxx +++ b/src/tag/Id3Load.cxx @@ -37,11 +37,12 @@ tag_is_id3v1(struct id3_tag *tag) noexcept } static long -get_id3v2_footer_size(InputStream &is, offset_type offset) +get_id3v2_footer_size(InputStream &is, std::unique_lock &lock, + offset_type offset) try { id3_byte_t buf[ID3_TAG_QUERYSIZE]; - is.Seek(offset); - is.ReadFull(buf, sizeof(buf)); + is.Seek(lock, offset); + is.ReadFull(lock, buf, sizeof(buf)); return id3_tag_query(buf, sizeof(buf)); } catch (...) { @@ -49,10 +50,10 @@ try { } static UniqueId3Tag -ReadId3Tag(InputStream &is) +ReadId3Tag(InputStream &is, std::unique_lock &lock) try { id3_byte_t query_buffer[ID3_TAG_QUERYSIZE]; - is.ReadFull(query_buffer, sizeof(query_buffer)); + is.ReadFull(lock, query_buffer, sizeof(query_buffer)); /* Look for a tag header */ long tag_size = id3_tag_query(query_buffer, sizeof(query_buffer)); @@ -72,7 +73,7 @@ try { /* now read the remaining bytes */ const size_t remaining = tag_size - sizeof(query_buffer); - is.ReadFull(end, remaining); + is.ReadFull(lock, end, remaining); return UniqueId3Tag(id3_tag_parse(tag_buffer.get(), tag_size)); } catch (...) { @@ -80,20 +81,20 @@ try { } static UniqueId3Tag -ReadId3Tag(InputStream &is, offset_type offset) +ReadId3Tag(InputStream &is, std::unique_lock &lock, offset_type offset) try { - is.Seek(offset); + is.Seek(lock, offset); - return ReadId3Tag(is); + return ReadId3Tag(is, lock); } catch (...) { return nullptr; } static UniqueId3Tag -ReadId3v1Tag(InputStream &is) +ReadId3v1Tag(InputStream &is, std::unique_lock &lock) try { id3_byte_t buffer[ID3V1_SIZE]; - is.ReadFull(buffer, ID3V1_SIZE); + is.ReadFull(lock, buffer, ID3V1_SIZE); return UniqueId3Tag(id3_tag_parse(buffer, ID3V1_SIZE)); } catch (...) { @@ -101,18 +102,19 @@ try { } static UniqueId3Tag -ReadId3v1Tag(InputStream &is, offset_type offset) +ReadId3v1Tag(InputStream &is, std::unique_lock &lock, + offset_type offset) try { - is.Seek(offset); - return ReadId3v1Tag(is); + is.Seek(lock, offset); + return ReadId3v1Tag(is, lock); } catch (...) { return nullptr; } static UniqueId3Tag -tag_id3_find_from_beginning(InputStream &is) +tag_id3_find_from_beginning(InputStream &is, std::unique_lock &lock) try { - auto tag = ReadId3Tag(is); + auto tag = ReadId3Tag(is, lock); if (!tag) { return nullptr; } else if (tag_is_id3v1(tag.get())) { @@ -129,7 +131,7 @@ try { break; /* Get the tag specified by the SEEK frame */ - auto seektag = ReadId3Tag(is, is.GetOffset() + seek); + auto seektag = ReadId3Tag(is, lock, is.GetOffset() + seek); if (!seektag || tag_is_id3v1(seektag.get())) break; @@ -143,7 +145,7 @@ try { } static UniqueId3Tag -tag_id3_find_from_end(InputStream &is) +tag_id3_find_from_end(InputStream &is, std::unique_lock &lock) try { if (!is.KnownSize() || !is.CheapSeeking()) return nullptr; @@ -155,7 +157,7 @@ try { offset_type offset = size - ID3V1_SIZE; /* Get an id3v1 tag from the end of file for later use */ - auto v1tag = ReadId3v1Tag(is, offset); + auto v1tag = ReadId3v1Tag(is, lock, offset); if (!v1tag) offset = size; @@ -164,7 +166,7 @@ try { return v1tag; long tag_offset = - get_id3v2_footer_size(is, offset - ID3_TAG_QUERYSIZE); + get_id3v2_footer_size(is, lock, offset - ID3_TAG_QUERYSIZE); if (tag_offset >= 0) return v1tag; @@ -173,7 +175,7 @@ try { return v1tag; /* Get the tag which the footer belongs to */ - auto tag = ReadId3Tag(is, offset - tag_size); + auto tag = ReadId3Tag(is, lock, offset - tag_size); if (!tag) return v1tag; @@ -184,13 +186,13 @@ try { } static UniqueId3Tag -tag_id3_riff_aiff_load(InputStream &is) +tag_id3_riff_aiff_load(InputStream &is, std::unique_lock &lock) try { size_t size; try { - size = riff_seek_id3(is); + size = riff_seek_id3(is, lock); } catch (...) { - size = aiff_seek_id3(is); + size = aiff_seek_id3(is, lock); } if (size > 4 * 1024 * 1024) @@ -198,7 +200,7 @@ try { return nullptr; std::unique_ptr buffer(new id3_byte_t[size]); - is.ReadFull(buffer.get(), size); + is.ReadFull(lock, buffer.get(), size); return UniqueId3Tag(id3_tag_parse(buffer.get(), size)); } catch (...) { @@ -208,13 +210,13 @@ try { UniqueId3Tag tag_id3_load(InputStream &is) try { - const std::lock_guard protect(is.mutex); + std::unique_lock lock(is.mutex); - auto tag = tag_id3_find_from_beginning(is); + auto tag = tag_id3_find_from_beginning(is, lock); if (tag == nullptr && is.CheapSeeking()) { - tag = tag_id3_riff_aiff_load(is); + tag = tag_id3_riff_aiff_load(is, lock); if (tag == nullptr) - tag = tag_id3_find_from_end(is); + tag = tag_id3_find_from_end(is, lock); } return tag; diff --git a/src/tag/Riff.cxx b/src/tag/Riff.cxx index 9671861fe..df4570fd0 100644 --- a/src/tag/Riff.cxx +++ b/src/tag/Riff.cxx @@ -39,14 +39,14 @@ struct riff_chunk_header { }; size_t -riff_seek_id3(InputStream &is) +riff_seek_id3(InputStream &is, std::unique_lock &lock) { /* seek to the beginning and read the RIFF header */ - is.Rewind(); + is.Rewind(lock); riff_header header; - is.ReadFull(&header, sizeof(header)); + is.ReadFull(lock, &header, sizeof(header)); if (memcmp(header.id, "RIFF", 4) != 0 || (is.KnownSize() && FromLE32(header.size) > is.GetSize())) throw std::runtime_error("Not a RIFF file"); @@ -55,7 +55,7 @@ riff_seek_id3(InputStream &is) /* read the chunk header */ riff_chunk_header chunk; - is.ReadFull(&chunk, sizeof(chunk)); + is.ReadFull(lock, &chunk, sizeof(chunk)); size_t size = FromLE32(chunk.size); if (size > size_t(std::numeric_limits::max())) @@ -72,6 +72,6 @@ riff_seek_id3(InputStream &is) /* pad byte */ ++size; - is.Skip(size); + is.Skip(lock, size); } } diff --git a/src/tag/Riff.hxx b/src/tag/Riff.hxx index b02cd8197..4a1aac45d 100644 --- a/src/tag/Riff.hxx +++ b/src/tag/Riff.hxx @@ -25,8 +25,11 @@ #ifndef MPD_RIFF_HXX #define MPD_RIFF_HXX +#include + #include +class Mutex; class InputStream; /** @@ -38,6 +41,6 @@ class InputStream; * @return the size of the ID3 chunk */ size_t -riff_seek_id3(InputStream &is); +riff_seek_id3(InputStream &is, std::unique_lock &lock); #endif diff --git a/test/TestRewindInputStream.cxx b/test/TestRewindInputStream.cxx index 051d5a9ea..3487edcd7 100644 --- a/test/TestRewindInputStream.cxx +++ b/test/TestRewindInputStream.cxx @@ -28,7 +28,8 @@ public: return remaining == 0; } - size_t Read(void *ptr, size_t read_size) override { + size_t Read(std::unique_lock &, + void *ptr, size_t read_size) override { size_t nbytes = std::min(remaining, read_size); memcpy(ptr, data, nbytes); data += nbytes; @@ -51,7 +52,7 @@ TEST(RewindInputStream, Basic) EXPECT_TRUE(ris.get() != sis); EXPECT_TRUE(ris != nullptr); - const std::lock_guard protect(mutex); + std::unique_lock lock(mutex); ris->Update(); EXPECT_TRUE(ris->IsReady()); @@ -59,50 +60,50 @@ TEST(RewindInputStream, Basic) EXPECT_EQ(offset_type(0), ris->GetOffset()); char buffer[16]; - size_t nbytes = ris->Read(buffer, 2); + size_t nbytes = ris->Read(lock, buffer, 2); EXPECT_EQ(size_t(2), nbytes); EXPECT_EQ('f', buffer[0]); EXPECT_EQ('o', buffer[1]); EXPECT_EQ(offset_type(2), ris->GetOffset()); EXPECT_FALSE(ris->IsEOF()); - nbytes = ris->Read(buffer, 2); + nbytes = ris->Read(lock, buffer, 2); EXPECT_EQ(size_t(2), nbytes); EXPECT_EQ('o', buffer[0]); EXPECT_EQ(' ', buffer[1]); EXPECT_EQ(offset_type(4), ris->GetOffset()); EXPECT_FALSE(ris->IsEOF()); - ris->Seek(1); + ris->Seek(lock, 1); EXPECT_EQ(offset_type(1), ris->GetOffset()); EXPECT_FALSE(ris->IsEOF()); - nbytes = ris->Read(buffer, 2); + nbytes = ris->Read(lock, buffer, 2); EXPECT_EQ(size_t(2), nbytes); EXPECT_EQ('o', buffer[0]); EXPECT_EQ('o', buffer[1]); EXPECT_EQ(offset_type(3), ris->GetOffset()); EXPECT_FALSE(ris->IsEOF()); - ris->Seek(0); + ris->Seek(lock, 0); EXPECT_EQ(offset_type(0), ris->GetOffset()); EXPECT_FALSE(ris->IsEOF()); - nbytes = ris->Read(buffer, 2); + nbytes = ris->Read(lock, buffer, 2); EXPECT_EQ(size_t(2), nbytes); EXPECT_EQ('f', buffer[0]); EXPECT_EQ('o', buffer[1]); EXPECT_EQ(offset_type(2), ris->GetOffset()); EXPECT_FALSE(ris->IsEOF()); - nbytes = ris->Read(buffer, sizeof(buffer)); + nbytes = ris->Read(lock, buffer, sizeof(buffer)); EXPECT_EQ(size_t(2), nbytes); EXPECT_EQ('o', buffer[0]); EXPECT_EQ(' ', buffer[1]); EXPECT_EQ(offset_type(4), ris->GetOffset()); EXPECT_FALSE(ris->IsEOF()); - nbytes = ris->Read(buffer, sizeof(buffer)); + nbytes = ris->Read(lock, buffer, sizeof(buffer)); EXPECT_EQ(size_t(3), nbytes); EXPECT_EQ('b', buffer[0]); EXPECT_EQ('a', buffer[1]); @@ -110,11 +111,11 @@ TEST(RewindInputStream, Basic) EXPECT_EQ(offset_type(7), ris->GetOffset()); EXPECT_TRUE(ris->IsEOF()); - ris->Seek(3); + ris->Seek(lock, 3); EXPECT_EQ(offset_type(3), ris->GetOffset()); EXPECT_FALSE(ris->IsEOF()); - nbytes = ris->Read(buffer, sizeof(buffer)); + nbytes = ris->Read(lock, buffer, sizeof(buffer)); EXPECT_EQ(size_t(4), nbytes); EXPECT_EQ(' ', buffer[0]); EXPECT_EQ('b', buffer[1]); diff --git a/test/run_input.cxx b/test/run_input.cxx index 7a5b88e3f..3b47daf4d 100644 --- a/test/run_input.cxx +++ b/test/run_input.cxx @@ -131,7 +131,7 @@ tag_save(FILE *file, const Tag &tag) static int dump_input_stream(InputStream *is) { - const std::lock_guard protect(is->mutex); + std::unique_lock lock(is->mutex); /* print meta data */ @@ -150,7 +150,7 @@ dump_input_stream(InputStream *is) } char buffer[4096]; - size_t num_read = is->Read(buffer, sizeof(buffer)); + size_t num_read = is->Read(lock, buffer, sizeof(buffer)); if (num_read == 0) break;