input/InputStream: pass std::span<std::byte> to Read()

This commit is contained in:
Max Kellermann
2024-05-13 10:47:57 +02:00
parent f28d10d934
commit 34f7b38f39
66 changed files with 248 additions and 274 deletions

View File

@@ -166,7 +166,7 @@ AsyncInputStream::IsAvailable() const noexcept
size_t
AsyncInputStream::Read(std::unique_lock<Mutex> &lock,
void *ptr, size_t read_size)
std::span<std::byte> dest)
{
assert(!GetEventLoop().IsInside());
@@ -185,8 +185,8 @@ AsyncInputStream::Read(std::unique_lock<Mutex> &lock,
cond_handler.cond.wait(lock);
}
const size_t nbytes = std::min(read_size, r.size());
memcpy(ptr, r.data(), nbytes);
const size_t nbytes = std::min(dest.size(), r.size());
memcpy(dest.data(), r.data(), nbytes);
buffer.Consume(nbytes);
offset += (offset_type)nbytes;

View File

@@ -72,7 +72,7 @@ public:
std::unique_ptr<Tag> ReadTag() noexcept final;
bool IsAvailable() const noexcept final;
size_t Read(std::unique_lock<Mutex> &lock,
void *ptr, size_t read_size) final;
std::span<std::byte> dest) final;
protected:
/**

View File

@@ -48,9 +48,9 @@ BufferedInputStream::IsAvailable() const noexcept
size_t
BufferedInputStream::Read(std::unique_lock<Mutex> &lock,
void *ptr, size_t s)
std::span<std::byte> dest)
{
size_t nbytes = BufferingInputStream::Read(lock, offset, ptr, s);
size_t nbytes = BufferingInputStream::Read(lock, offset, dest);
InputStream::offset += nbytes;
return nbytes;
}

View File

@@ -1,8 +1,7 @@
// SPDX-License-Identifier: GPL-2.0-or-later
// Copyright The Music Player Daemon Project
#ifndef MPD_BUFFERED_INPUT_STREAM_BUFFER_HXX
#define MPD_BUFFERED_INPUT_STREAM_BUFFER_HXX
#pragma once
#include "InputStream.hxx"
#include "BufferingInputStream.hxx"
@@ -44,7 +43,7 @@ public:
// std::unique_ptr<Tag> ReadTag() override;
bool IsAvailable() const noexcept override;
size_t Read(std::unique_lock<Mutex> &lock,
void *ptr, size_t size) override;
std::span<std::byte> dest) override;
private:
/* virtual methods from class BufferingInputStream */
@@ -52,5 +51,3 @@ private:
InvokeOnAvailable();
}
};
#endif

View File

@@ -59,7 +59,7 @@ BufferingInputStream::IsAvailable(size_t offset) const noexcept
size_t
BufferingInputStream::Read(std::unique_lock<Mutex> &lock, size_t offset,
void *ptr, size_t s)
std::span<std::byte> dest)
{
if (offset >= size())
return 0;
@@ -68,8 +68,8 @@ BufferingInputStream::Read(std::unique_lock<Mutex> &lock, size_t offset,
auto r = buffer.Read(offset);
if (r.HasData()) {
/* yay, we have some data */
size_t nbytes = std::min(s, r.defined_buffer.size());
memcpy(ptr, r.defined_buffer.data(), nbytes);
size_t nbytes = std::min(dest.size(), r.defined_buffer.size());
memcpy(dest.data(), r.defined_buffer.data(), nbytes);
return nbytes;
}
@@ -148,9 +148,10 @@ BufferingInputStream::RunThreadLocked(std::unique_lock<Mutex> &lock)
data has been read */
constexpr size_t MAX_READ = 64 * 1024;
size_t nbytes = input->Read(lock, w.data(),
std::min(w.size(),
MAX_READ));
if (w.size() > MAX_READ)
w = w.first(MAX_READ);
size_t nbytes = input->Read(lock, w);
buffer.Commit(read_offset, read_offset + nbytes);
client_cond.notify_all();

View File

@@ -1,8 +1,7 @@
// SPDX-License-Identifier: GPL-2.0-or-later
// Copyright The Music Player Daemon Project
#ifndef MPD_BUFFERING_INPUT_STREAM_BUFFER_HXX
#define MPD_BUFFERING_INPUT_STREAM_BUFFER_HXX
#pragma once
#include "Ptr.hxx"
#include "Handler.hxx"
@@ -11,6 +10,7 @@
#include "thread/Cond.hxx"
#include "util/SparseBuffer.hxx"
#include <cstddef>
#include <exception>
/**
@@ -39,7 +39,7 @@ private:
*/
Cond client_cond;
SparseBuffer<uint8_t> buffer;
SparseBuffer<std::byte> buffer;
bool stop = false;
@@ -96,7 +96,7 @@ public:
* @return the number of bytes copied into the given pointer.
*/
size_t Read(std::unique_lock<Mutex> &lock, size_t offset,
void *ptr, size_t size);
std::span<std::byte> dest);
protected:
/**
@@ -122,5 +122,3 @@ private:
wake_cond.notify_one();
}
};
#endif

View File

@@ -38,7 +38,7 @@ public:
return false;
}
size_t Read(std::unique_lock<Mutex> &, void *, size_t) override {
size_t Read(std::unique_lock<Mutex> &, std::span<std::byte>) override {
std::rethrow_exception(error);
}
};

View File

@@ -68,20 +68,20 @@ IcyInputStream::ReadTag() noexcept
size_t
IcyInputStream::Read(std::unique_lock<Mutex> &lock,
void *ptr, size_t read_size)
std::span<std::byte> dest)
{
if (!IsEnabled())
return ProxyInputStream::Read(lock, ptr, read_size);
return ProxyInputStream::Read(lock, dest);
while (true) {
size_t nbytes = ProxyInputStream::Read(lock, ptr, read_size);
size_t nbytes = ProxyInputStream::Read(lock, dest);
if (nbytes == 0) {
assert(IsEOF());
offset = override_offset;
return 0;
}
size_t result = parser->ParseInPlace({static_cast<std::byte *>(ptr), nbytes});
size_t result = parser->ParseInPlace(dest.first(nbytes));
if (result > 0) {
override_offset += result;
offset = override_offset;

View File

@@ -53,7 +53,7 @@ public:
void Update() noexcept override;
std::unique_ptr<Tag> ReadTag() noexcept override;
size_t Read(std::unique_lock<Mutex> &lock,
void *ptr, size_t size) override;
std::span<std::byte> dest) override;
};
#endif

View File

@@ -92,45 +92,35 @@ InputStream::IsAvailable() const noexcept
}
size_t
InputStream::LockRead(void *ptr, size_t _size)
InputStream::LockRead(std::span<std::byte> dest)
{
#if !CLANG_CHECK_VERSION(3,6)
/* disabled on clang due to -Wtautological-pointer-compare */
assert(ptr != nullptr);
#endif
assert(_size > 0);
assert(!dest.empty());
std::unique_lock<Mutex> lock(mutex);
return Read(lock, ptr, _size);
return Read(lock, dest);
}
void
InputStream::ReadFull(std::unique_lock<Mutex> &lock, void *_ptr, size_t _size)
InputStream::ReadFull(std::unique_lock<Mutex> &lock, std::span<std::byte> dest)
{
auto *ptr = (uint8_t *)_ptr;
assert(!dest.empty());
size_t nbytes_total = 0;
while (_size > 0) {
size_t nbytes = Read(lock, ptr + nbytes_total, _size);
do {
std::size_t nbytes = Read(lock, dest);
if (nbytes == 0)
throw std::runtime_error("Unexpected end of file");
nbytes_total += nbytes;
_size -= nbytes;
}
dest = dest.subspan(nbytes);
} while (!dest.empty());
}
void
InputStream::LockReadFull(void *ptr, size_t _size)
InputStream::LockReadFull(std::span<std::byte> dest)
{
#if !CLANG_CHECK_VERSION(3,6)
/* disabled on clang due to -Wtautological-pointer-compare */
assert(ptr != nullptr);
#endif
assert(_size > 0);
assert(!dest.empty());
std::unique_lock<Mutex> lock(mutex);
ReadFull(lock, ptr, _size);
ReadFull(lock, dest);
}
bool

View File

@@ -1,15 +1,16 @@
// SPDX-License-Identifier: GPL-2.0-or-later
// Copyright The Music Player Daemon Project
#ifndef MPD_INPUT_STREAM_HXX
#define MPD_INPUT_STREAM_HXX
#pragma once
#include "Offset.hxx"
#include "Ptr.hxx"
#include "thread/Mutex.hxx"
#include <cassert>
#include <cstddef>
#include <memory>
#include <span>
#include <string>
#include <utility>
@@ -350,9 +351,8 @@ public:
* @param size the maximum number of bytes to read
* @return the number of bytes read
*/
[[gnu::nonnull]]
virtual size_t Read(std::unique_lock<Mutex> &lock,
void *ptr, size_t size) = 0;
virtual std::size_t Read(std::unique_lock<Mutex> &lock,
std::span<std::byte> dest) = 0;
/**
* Wrapper for Read() which locks and unlocks the mutex;
@@ -360,8 +360,7 @@ public:
*
* Throws std::runtime_error on error.
*/
[[gnu::nonnull]]
size_t LockRead(void *ptr, size_t size);
std::size_t LockRead(std::span<std::byte> dest);
/**
* Reads the whole data from the stream into the caller-supplied buffer.
@@ -374,8 +373,7 @@ public:
* @param size the number of bytes to read
* @return true if the whole data was read, false otherwise.
*/
[[gnu::nonnull]]
void ReadFull(std::unique_lock<Mutex> &lock, void *ptr, size_t size);
void ReadFull(std::unique_lock<Mutex> &lock, std::span<std::byte> dest);
/**
* Wrapper for ReadFull() which locks and unlocks the mutex;
@@ -383,8 +381,7 @@ public:
*
* Throws std::runtime_error on error.
*/
[[gnu::nonnull]]
void LockReadFull(void *ptr, size_t size);
void LockReadFull(std::span<std::byte> dest);
protected:
void InvokeOnReady() noexcept;
@@ -410,5 +407,3 @@ public:
is.SetHandler(old_handler);
}
};
#endif

View File

@@ -105,11 +105,11 @@ ProxyInputStream::IsAvailable() const noexcept
size_t
ProxyInputStream::Read(std::unique_lock<Mutex> &lock,
void *ptr, size_t read_size)
std::span<std::byte> dest)
{
set_input_cond.wait(lock, [this]{ return !!input; });
size_t nbytes = input->Read(lock, ptr, read_size);
size_t nbytes = input->Read(lock, dest);
CopyAttributes();
return nbytes;
}

View File

@@ -50,7 +50,7 @@ public:
std::unique_ptr<Tag> ReadTag() noexcept override;
bool IsAvailable() const noexcept override;
size_t Read(std::unique_lock<Mutex> &lock,
void *ptr, size_t read_size) override;
std::span<std::byte> dest) override;
protected:
/**

View File

@@ -7,7 +7,7 @@
std::size_t
InputStreamReader::Read(std::span<std::byte> dest)
{
size_t nbytes = is.LockRead(dest.data(), dest.size());
size_t nbytes = is.LockRead(dest);
assert(nbytes > 0 || is.IsEOF());
return nbytes;

View File

@@ -28,7 +28,7 @@ class RewindInputStream final : public ProxyInputStream {
* The origin of this buffer is always the beginning of the
* stream (offset 0).
*/
char buffer[64 * 1024];
std::byte buffer[64 * 1024];
public:
explicit RewindInputStream(InputStreamPtr _input)
@@ -46,7 +46,7 @@ public:
}
size_t Read(std::unique_lock<Mutex> &lock,
void *ptr, size_t size) override;
std::span<std::byte> dest) override;
void Seek(std::unique_lock<Mutex> &lock, offset_type offset) override;
private:
@@ -61,7 +61,7 @@ private:
size_t
RewindInputStream::Read(std::unique_lock<Mutex> &lock,
void *ptr, size_t read_size)
std::span<std::byte> dest)
{
if (ReadingFromBuffer()) {
/* buffered read */
@@ -69,18 +69,18 @@ RewindInputStream::Read(std::unique_lock<Mutex> &lock,
assert(head == (size_t)offset);
assert(tail == (size_t)input->GetOffset());
if (read_size > tail - head)
read_size = tail - head;
if (dest.size() > tail - head)
dest = dest.first(tail - head);
memcpy(ptr, buffer + head, read_size);
head += read_size;
offset += read_size;
memcpy(dest.data(), buffer + head, dest.size());
head += dest.size();
offset += dest.size();
return read_size;
return dest.size();
} else {
/* pass method call to underlying stream */
size_t nbytes = input->Read(lock, ptr, read_size);
size_t nbytes = input->Read(lock, dest);
if (std::cmp_greater(input->GetOffset(), sizeof(buffer)))
/* disable buffering */
@@ -88,7 +88,7 @@ RewindInputStream::Read(std::unique_lock<Mutex> &lock,
else if (tail == (size_t)offset) {
/* append to buffer */
memcpy(buffer + tail, ptr, nbytes);
memcpy(buffer + tail, dest.data(), nbytes);
tail += nbytes;
assert(tail == (size_t)input->GetOffset());

View File

@@ -39,7 +39,7 @@ TextInputStream::ReadLine()
character */
dest = dest.first(dest.size() - 1);
size_t nbytes = is->LockRead(dest.data(), dest.size());
size_t nbytes = is->LockRead(std::as_writable_bytes(dest));
buffer.Append(nbytes);

View File

@@ -117,7 +117,7 @@ ThreadInputStream::IsAvailable() const noexcept
inline size_t
ThreadInputStream::Read(std::unique_lock<Mutex> &lock,
void *ptr, size_t read_size)
std::span<std::byte> dest)
{
assert(!thread.IsInside());
@@ -129,8 +129,8 @@ ThreadInputStream::Read(std::unique_lock<Mutex> &lock,
auto r = buffer.Read();
if (!r.empty()) {
size_t nbytes = std::min(read_size, r.size());
memcpy(ptr, r.data(), nbytes);
size_t nbytes = std::min(dest.size(), r.size());
memcpy(dest.data(), r.data(), nbytes);
buffer.Consume(nbytes);
wake_cond.notify_all();
offset += nbytes;

View File

@@ -78,7 +78,7 @@ public:
bool IsEOF() const noexcept final;
bool IsAvailable() const noexcept final;
size_t Read(std::unique_lock<Mutex> &lock,
void *ptr, size_t size) override final;
std::span<std::byte> dest) override final;
protected:
/**

View File

@@ -51,7 +51,7 @@ CacheInputStream::IsAvailable() const noexcept
size_t
CacheInputStream::Read(std::unique_lock<Mutex> &lock,
void *ptr, size_t read_size)
std::span<std::byte> dest)
{
const auto _offset = offset;
auto &i = GetCacheItem();
@@ -62,7 +62,7 @@ CacheInputStream::Read(std::unique_lock<Mutex> &lock,
const ScopeUnlock unlock(mutex);
const std::scoped_lock<Mutex> protect(i.mutex);
nbytes = i.Read(lock, _offset, ptr, read_size);
nbytes = i.Read(lock, _offset, dest);
}
offset += nbytes;

View File

@@ -1,8 +1,7 @@
// SPDX-License-Identifier: GPL-2.0-or-later
// Copyright The Music Player Daemon Project
#ifndef MPD_CACHE_INPUT_STREAM_HXX
#define MPD_CACHE_INPUT_STREAM_HXX
#pragma once
#include "Lease.hxx"
#include "input/InputStream.hxx"
@@ -26,11 +25,9 @@ public:
// std::unique_ptr<Tag> ReadTag() override;
bool IsAvailable() const noexcept override;
size_t Read(std::unique_lock<Mutex> &lock,
void *ptr, size_t size) override;
std::span<std::byte> dest) override;
private:
/* virtual methods from class InputCacheLease */
void OnInputCacheAvailable() noexcept override;
};
#endif

View File

@@ -84,7 +84,7 @@ class CdioParanoiaInputStream final : public InputStream {
/* virtual methods from InputStream */
[[nodiscard]] bool IsEOF() const noexcept override;
size_t Read(std::unique_lock<Mutex> &lock,
void *ptr, size_t size) override;
std::span<std::byte> dest) override;
void Seek(std::unique_lock<Mutex> &lock, offset_type offset) override;
};
@@ -305,7 +305,7 @@ CdioParanoiaInputStream::Seek(std::unique_lock<Mutex> &,
size_t
CdioParanoiaInputStream::Read(std::unique_lock<Mutex> &,
void *ptr, size_t length)
std::span<std::byte> dest)
{
/* end of track ? */
if (IsEOF())
@@ -342,10 +342,10 @@ CdioParanoiaInputStream::Read(std::unique_lock<Mutex> &,
}
const size_t maxwrite = CDIO_CD_FRAMESIZE_RAW - diff; //# of bytes pending in current buffer
const std::size_t nbytes = std::min(length, maxwrite);
const std::size_t nbytes = std::min(dest.size(), maxwrite);
//skip diff bytes from this lsn
memcpy(ptr, ((const char *)rbuf) + diff, nbytes);
memcpy(dest.data(), ((const char *)rbuf) + diff, nbytes);
//update offset
offset += nbytes;

View File

@@ -34,7 +34,7 @@ public:
/* virtual methods from InputStream */
[[nodiscard]] bool IsEOF() const noexcept override;
size_t Read(std::unique_lock<Mutex> &lock,
void *ptr, size_t size) override;
std::span<std::byte> dest) override;
void Seek(std::unique_lock<Mutex> &lock,
offset_type offset) override;
};
@@ -91,13 +91,13 @@ input_ffmpeg_open(const char *uri,
size_t
FfmpegInputStream::Read(std::unique_lock<Mutex> &,
void *ptr, size_t read_size)
std::span<std::byte> dest)
{
size_t result;
{
const ScopeUnlock unlock(mutex);
result = io.Read(ptr, read_size);
result = io.Read(dest);
}
offset += result;

View File

@@ -33,7 +33,7 @@ public:
}
size_t Read(std::unique_lock<Mutex> &lock,
void *ptr, size_t size) override;
std::span<std::byte> dest) override;
void Seek(std::unique_lock<Mutex> &lock,
offset_type offset) override;
};
@@ -71,14 +71,13 @@ FileInputStream::Seek(std::unique_lock<Mutex> &,
}
size_t
FileInputStream::Read(std::unique_lock<Mutex> &,
void *ptr, size_t read_size)
FileInputStream::Read(std::unique_lock<Mutex> &, std::span<std::byte> dest)
{
size_t nbytes;
{
const ScopeUnlock unlock(mutex);
nbytes = reader.Read({static_cast<std::byte *>(ptr), read_size});
nbytes = reader.Read(dest);
}
if (nbytes == 0 && !IsEOF())

View File

@@ -40,7 +40,7 @@ public:
}
size_t Read(std::unique_lock<Mutex> &lock,
void *ptr, size_t size) override;
std::span<std::byte> dest) override;
void Seek(std::unique_lock<Mutex> &lock, offset_type offset) override;
};
@@ -88,13 +88,13 @@ input_smbclient_open(const char *uri,
size_t
SmbclientInputStream::Read(std::unique_lock<Mutex> &,
void *ptr, size_t read_size)
std::span<std::byte> dest)
{
ssize_t nbytes;
{
const ScopeUnlock unlock(mutex);
nbytes = ctx.Read(handle, ptr, read_size);
nbytes = ctx.Read(handle, dest.data(), dest.size());
}
if (nbytes < 0)