input/InputStreams: pass std::unique_lock<> to various methods

This commit is contained in:
Max Kellermann
2019-04-26 19:19:45 +02:00
parent 040573c636
commit 1b5c1f75a4
33 changed files with 212 additions and 156 deletions

View File

@@ -95,7 +95,8 @@ AsyncInputStream::IsEOF() noexcept
}
void
AsyncInputStream::Seek(offset_type new_offset)
AsyncInputStream::Seek(std::unique_lock<Mutex> &lock,
offset_type new_offset)
{
assert(IsReady());
assert(seek_state == SeekState::NONE);
@@ -136,7 +137,7 @@ AsyncInputStream::Seek(offset_type new_offset)
CondInputStreamHandler cond_handler;
const ScopeExchangeInputStreamHandler h(*this, &cond_handler);
while (seek_state != SeekState::NONE)
cond_handler.cond.wait(mutex);
cond_handler.cond.wait(lock);
Check();
}
@@ -171,7 +172,8 @@ AsyncInputStream::IsAvailable() noexcept
}
size_t
AsyncInputStream::Read(void *ptr, size_t read_size)
AsyncInputStream::Read(std::unique_lock<Mutex> &lock,
void *ptr, size_t read_size)
{
assert(!GetEventLoop().IsInside());
@@ -187,7 +189,7 @@ AsyncInputStream::Read(void *ptr, size_t read_size)
break;
const ScopeExchangeInputStreamHandler h(*this, &cond_handler);
cond_handler.cond.wait(mutex);
cond_handler.cond.wait(lock);
}
const size_t nbytes = std::min(read_size, r.size);

View File

@@ -83,10 +83,12 @@ public:
/* virtual methods from InputStream */
void Check() final;
bool IsEOF() noexcept final;
void Seek(offset_type new_offset) final;
void Seek(std::unique_lock<Mutex> &lock,
offset_type new_offset) final;
std::unique_ptr<Tag> ReadTag() noexcept final;
bool IsAvailable() noexcept final;
size_t Read(void *ptr, size_t read_size) final;
size_t Read(std::unique_lock<Mutex> &lock,
void *ptr, size_t read_size) final;
protected:
/**

View File

@@ -64,7 +64,8 @@ BufferedInputStream::Check()
}
void
BufferedInputStream::Seek(offset_type new_offset)
BufferedInputStream::Seek(std::unique_lock<Mutex> &lock,
offset_type new_offset)
{
if (new_offset >= size) {
offset = size;
@@ -84,7 +85,7 @@ BufferedInputStream::Seek(offset_type new_offset)
wake_cond.notify_one();
while (seek)
client_cond.wait(mutex);
client_cond.wait(lock);
if (seek_error)
std::rethrow_exception(std::exchange(seek_error, {}));
@@ -105,7 +106,8 @@ BufferedInputStream::IsAvailable() noexcept
}
size_t
BufferedInputStream::Read(void *ptr, size_t s)
BufferedInputStream::Read(std::unique_lock<Mutex> &lock,
void *ptr, size_t s)
{
if (offset >= size)
return 0;
@@ -140,7 +142,7 @@ BufferedInputStream::Read(void *ptr, size_t s)
wake_cond.notify_one();
}
client_cond.wait(mutex);
client_cond.wait(lock);
}
}
@@ -156,7 +158,7 @@ BufferedInputStream::RunThread() noexcept
if (seek) {
try {
input->Seek(seek_offset);
input->Seek(lock, seek_offset);
} catch (...) {
seek_error = std::current_exception();
}
@@ -183,7 +185,7 @@ BufferedInputStream::RunThread() noexcept
offset to prepare filling
the buffer from there */
try {
input->Seek(offset);
input->Seek(lock, offset);
} catch (...) {
read_error = std::current_exception();
client_cond.notify_one();
@@ -195,7 +197,8 @@ BufferedInputStream::RunThread() noexcept
}
try {
size_t nbytes = input->Read(w.data, w.size);
size_t nbytes = input->Read(lock,
w.data, w.size);
buffer.Commit(read_offset,
read_offset + nbytes);
} catch (...) {

View File

@@ -85,12 +85,13 @@ public:
/* we don't need to implement Update() because all attributes
have been copied already in our constructor */
//void Update() noexcept;
void Seek(offset_type offset) override;
void Seek(std::unique_lock<Mutex> &lock, offset_type offset) override;
bool IsEOF() noexcept override;
/* we don't support tags */
// std::unique_ptr<Tag> ReadTag() override;
bool IsAvailable() noexcept override;
size_t Read(void *ptr, size_t size) override;
size_t Read(std::unique_lock<Mutex> &lock,
void *ptr, size_t size) override;
/* virtual methods from class InputStreamHandler */
void OnInputStreamReady() noexcept override {

View File

@@ -45,7 +45,7 @@ public:
std::rethrow_exception(error);
}
void Seek(offset_type) override {
void Seek(std::unique_lock<Mutex> &, offset_type) override {
std::rethrow_exception(error);
}
@@ -53,7 +53,7 @@ public:
return false;
}
size_t Read(void *, size_t) override {
size_t Read(std::unique_lock<Mutex> &, void *, size_t) override {
std::rethrow_exception(error);
}
};

View File

@@ -80,13 +80,14 @@ IcyInputStream::ReadTag() noexcept
}
size_t
IcyInputStream::Read(void *ptr, size_t read_size)
IcyInputStream::Read(std::unique_lock<Mutex> &lock,
void *ptr, size_t read_size)
{
if (!IsEnabled())
return ProxyInputStream::Read(ptr, read_size);
return ProxyInputStream::Read(lock, ptr, read_size);
while (true) {
size_t nbytes = ProxyInputStream::Read(ptr, read_size);
size_t nbytes = ProxyInputStream::Read(lock, ptr, read_size);
if (nbytes == 0)
return 0;

View File

@@ -66,7 +66,8 @@ public:
/* virtual methods from InputStream */
void Update() noexcept override;
std::unique_ptr<Tag> ReadTag() noexcept override;
size_t Read(void *ptr, size_t size) override;
size_t Read(std::unique_lock<Mutex> &lock,
void *ptr, size_t size) override;
};
#endif

View File

@@ -72,7 +72,7 @@ InputStream::CheapSeeking() const noexcept
}
void
InputStream::Seek(gcc_unused offset_type new_offset)
InputStream::Seek(std::unique_lock<Mutex> &, gcc_unused offset_type new_offset)
{
throw std::runtime_error("Seeking is not implemented");
}
@@ -80,15 +80,15 @@ InputStream::Seek(gcc_unused offset_type new_offset)
void
InputStream::LockSeek(offset_type _offset)
{
const std::lock_guard<Mutex> protect(mutex);
Seek(_offset);
std::unique_lock<Mutex> lock(mutex);
Seek(lock, _offset);
}
void
InputStream::LockSkip(offset_type _offset)
{
const std::lock_guard<Mutex> protect(mutex);
Skip(_offset);
std::unique_lock<Mutex> lock(mutex);
Skip(lock, _offset);
}
std::unique_ptr<Tag>
@@ -119,18 +119,18 @@ InputStream::LockRead(void *ptr, size_t _size)
#endif
assert(_size > 0);
const std::lock_guard<Mutex> protect(mutex);
return Read(ptr, _size);
std::unique_lock<Mutex> lock(mutex);
return Read(lock, ptr, _size);
}
void
InputStream::ReadFull(void *_ptr, size_t _size)
InputStream::ReadFull(std::unique_lock<Mutex> &lock, void *_ptr, size_t _size)
{
uint8_t *ptr = (uint8_t *)_ptr;
size_t nbytes_total = 0;
while (_size > 0) {
size_t nbytes = Read(ptr + nbytes_total, _size);
size_t nbytes = Read(lock, ptr + nbytes_total, _size);
if (nbytes == 0)
throw std::runtime_error("Unexpected end of file");
@@ -148,8 +148,8 @@ InputStream::LockReadFull(void *ptr, size_t _size)
#endif
assert(_size > 0);
const std::lock_guard<Mutex> protect(mutex);
ReadFull(ptr, _size);
std::unique_lock<Mutex> lock(mutex);
ReadFull(lock, ptr, _size);
}
bool

View File

@@ -271,9 +271,11 @@ public:
*
* Throws std::runtime_error on error.
*
* @param lock the locked mutex; may be used to wait on
* condition variables
* @param offset the relative offset
*/
virtual void Seek(offset_type offset);
virtual void Seek(std::unique_lock<Mutex> &lock, offset_type offset);
/**
* Wrapper for Seek() which locks and unlocks the mutex; the
@@ -285,8 +287,8 @@ public:
* Rewind to the beginning of the stream. This is a wrapper
* for Seek(0, error).
*/
void Rewind() {
Seek(0);
void Rewind(std::unique_lock<Mutex> &lock) {
Seek(lock, 0);
}
void LockRewind() {
@@ -296,8 +298,9 @@ public:
/**
* Skip input bytes.
*/
void Skip(offset_type _offset) {
Seek(GetOffset() + _offset);
void Skip(std::unique_lock<Mutex> &lock,
offset_type _offset) {
Seek(lock, GetOffset() + _offset);
}
void LockSkip(offset_type _offset);
@@ -351,12 +354,15 @@ public:
*
* Throws std::runtime_error on error.
*
* @param lock the locked mutex; may be used to wait on
* condition variables
* @param ptr the buffer to read into
* @param size the maximum number of bytes to read
* @return the number of bytes read
*/
gcc_nonnull_all
virtual size_t Read(void *ptr, size_t size) = 0;
virtual size_t Read(std::unique_lock<Mutex> &lock,
void *ptr, size_t size) = 0;
/**
* Wrapper for Read() which locks and unlocks the mutex;
@@ -379,7 +385,7 @@ public:
* @return true if the whole data was read, false otherwise.
*/
gcc_nonnull_all
void ReadFull(void *ptr, size_t size);
void ReadFull(std::unique_lock<Mutex> &lock, void *ptr, size_t size);
/**
* Wrapper for ReadFull() which locks and unlocks the mutex;

View File

@@ -89,12 +89,13 @@ ProxyInputStream::Update() noexcept
}
void
ProxyInputStream::Seek(offset_type new_offset)
ProxyInputStream::Seek(std::unique_lock<Mutex> &lock,
offset_type new_offset)
{
while (!input)
set_input_cond.wait(mutex);
set_input_cond.wait(lock);
input->Seek(new_offset);
input->Seek(lock, new_offset);
CopyAttributes();
}
@@ -120,12 +121,13 @@ ProxyInputStream::IsAvailable() noexcept
}
size_t
ProxyInputStream::Read(void *ptr, size_t read_size)
ProxyInputStream::Read(std::unique_lock<Mutex> &lock,
void *ptr, size_t read_size)
{
while (!input)
set_input_cond.wait(mutex);
set_input_cond.wait(lock);
size_t nbytes = input->Read(ptr, read_size);
size_t nbytes = input->Read(lock, ptr, read_size);
CopyAttributes();
return nbytes;
}

View File

@@ -60,11 +60,13 @@ public:
/* virtual methods from InputStream */
void Check() override;
void Update() noexcept override;
void Seek(offset_type new_offset) override;
void Seek(std::unique_lock<Mutex> &lock,
offset_type new_offset) override;
bool IsEOF() noexcept override;
std::unique_ptr<Tag> ReadTag() noexcept override;
bool IsAvailable() noexcept override;
size_t Read(void *ptr, size_t read_size) override;
size_t Read(std::unique_lock<Mutex> &lock,
void *ptr, size_t read_size) override;
protected:
/**

View File

@@ -60,8 +60,9 @@ public:
return !ReadingFromBuffer() && ProxyInputStream::IsEOF();
}
size_t Read(void *ptr, size_t size) override;
void Seek(offset_type offset) override;
size_t Read(std::unique_lock<Mutex> &lock,
void *ptr, size_t size) override;
void Seek(std::unique_lock<Mutex> &lock, offset_type offset) override;
private:
/**
@@ -74,7 +75,8 @@ private:
};
size_t
RewindInputStream::Read(void *ptr, size_t read_size)
RewindInputStream::Read(std::unique_lock<Mutex> &lock,
void *ptr, size_t read_size)
{
if (ReadingFromBuffer()) {
/* buffered read */
@@ -93,7 +95,7 @@ RewindInputStream::Read(void *ptr, size_t read_size)
} else {
/* pass method call to underlying stream */
size_t nbytes = input->Read(ptr, read_size);
size_t nbytes = input->Read(lock, ptr, read_size);
if (input->GetOffset() > (offset_type)sizeof(buffer))
/* disable buffering */
@@ -114,7 +116,7 @@ RewindInputStream::Read(void *ptr, size_t read_size)
}
void
RewindInputStream::Seek(offset_type new_offset)
RewindInputStream::Seek(std::unique_lock<Mutex> &lock, offset_type new_offset)
{
assert(IsReady());
@@ -132,7 +134,7 @@ RewindInputStream::Seek(offset_type new_offset)
buffered range now */
tail = 0;
ProxyInputStream::Seek(new_offset);
ProxyInputStream::Seek(lock, new_offset);
}
}

View File

@@ -130,7 +130,8 @@ ThreadInputStream::IsAvailable() noexcept
}
inline size_t
ThreadInputStream::Read(void *ptr, size_t read_size)
ThreadInputStream::Read(std::unique_lock<Mutex> &lock,
void *ptr, size_t read_size)
{
assert(!thread.IsInside());
@@ -154,7 +155,7 @@ ThreadInputStream::Read(void *ptr, size_t read_size)
return 0;
const ScopeExchangeInputStreamHandler h(*this, &cond_handler);
cond_handler.cond.wait(mutex);
cond_handler.cond.wait(lock);
}
}

View File

@@ -94,7 +94,8 @@ public:
void Check() override final;
bool IsEOF() noexcept final;
bool IsAvailable() noexcept final;
size_t Read(void *ptr, size_t size) override final;
size_t Read(std::unique_lock<Mutex> &lock,
void *ptr, size_t size) override final;
protected:
/**

View File

@@ -91,8 +91,9 @@ class CdioParanoiaInputStream final : public InputStream {
/* virtual methods from InputStream */
bool IsEOF() noexcept override;
size_t Read(void *ptr, size_t size) override;
void Seek(offset_type offset) override;
size_t Read(std::unique_lock<Mutex> &lock,
void *ptr, size_t size) override;
void Seek(std::unique_lock<Mutex> &lock, offset_type offset) override;
};
static constexpr Domain cdio_domain("cdio");
@@ -255,7 +256,8 @@ input_cdio_open(const char *uri,
}
void
CdioParanoiaInputStream::Seek(offset_type new_offset)
CdioParanoiaInputStream::Seek(std::unique_lock<Mutex> &,
offset_type new_offset)
{
if (new_offset > size)
throw FormatRuntimeError("Invalid offset to seek %ld (%ld)",
@@ -276,7 +278,8 @@ CdioParanoiaInputStream::Seek(offset_type new_offset)
}
size_t
CdioParanoiaInputStream::Read(void *ptr, size_t length)
CdioParanoiaInputStream::Read(std::unique_lock<Mutex> &,
void *ptr, size_t length)
{
size_t nbytes = 0;
char *wptr = (char *) ptr;

View File

@@ -49,8 +49,10 @@ public:
/* virtual methods from InputStream */
bool IsEOF() noexcept override;
size_t Read(void *ptr, size_t size) override;
void Seek(offset_type offset) override;
size_t Read(std::unique_lock<Mutex> &lock,
void *ptr, size_t size) override;
void Seek(std::unique_lock<Mutex> &lock,
offset_type offset) override;
};
gcc_const
@@ -79,7 +81,8 @@ input_ffmpeg_open(const char *uri,
}
size_t
FfmpegInputStream::Read(void *ptr, size_t read_size)
FfmpegInputStream::Read(std::unique_lock<Mutex> &,
void *ptr, size_t read_size)
{
size_t result;
@@ -99,7 +102,7 @@ FfmpegInputStream::IsEOF() noexcept
}
void
FfmpegInputStream::Seek(offset_type new_offset)
FfmpegInputStream::Seek(std::unique_lock<Mutex> &, offset_type new_offset)
{
uint64_t result;

View File

@@ -48,8 +48,10 @@ public:
return GetOffset() >= GetSize();
}
size_t Read(void *ptr, size_t size) override;
void Seek(offset_type offset) override;
size_t Read(std::unique_lock<Mutex> &lock,
void *ptr, size_t size) override;
void Seek(std::unique_lock<Mutex> &lock,
offset_type offset) override;
};
InputStreamPtr
@@ -74,7 +76,8 @@ OpenFileInputStream(Path path, Mutex &mutex)
}
void
FileInputStream::Seek(offset_type new_offset)
FileInputStream::Seek(std::unique_lock<Mutex> &,
offset_type new_offset)
{
{
const ScopeUnlock unlock(mutex);
@@ -85,7 +88,8 @@ FileInputStream::Seek(offset_type new_offset)
}
size_t
FileInputStream::Read(void *ptr, size_t read_size)
FileInputStream::Read(std::unique_lock<Mutex> &,
void *ptr, size_t read_size)
{
size_t nbytes;

View File

@@ -56,8 +56,9 @@ public:
return offset >= size;
}
size_t Read(void *ptr, size_t size) override;
void Seek(offset_type offset) override;
size_t Read(std::unique_lock<Mutex> &lock,
void *ptr, size_t size) override;
void Seek(std::unique_lock<Mutex> &lock, offset_type offset) override;
};
/*
@@ -118,7 +119,8 @@ input_smbclient_open(const char *uri,
}
size_t
SmbclientInputStream::Read(void *ptr, size_t read_size)
SmbclientInputStream::Read(std::unique_lock<Mutex> &,
void *ptr, size_t read_size)
{
ssize_t nbytes;
@@ -136,7 +138,8 @@ SmbclientInputStream::Read(void *ptr, size_t read_size)
}
void
SmbclientInputStream::Seek(offset_type new_offset)
SmbclientInputStream::Seek(std::unique_lock<Mutex> &,
offset_type new_offset)
{
off_t result;