input/{async,thread}: move code to ReadFromBuffer()

This commit is contained in:
Max Kellermann 2025-01-29 21:12:03 +01:00
parent abc8420697
commit 950f5f4d32
4 changed files with 54 additions and 37 deletions

@ -159,26 +159,12 @@ AsyncInputStream::IsAvailable() const noexcept
!buffer.empty();
}
size_t
AsyncInputStream::Read(std::unique_lock<Mutex> &lock,
std::span<std::byte> dest)
inline std::size_t
AsyncInputStream::ReadFromBuffer(std::span<std::byte> dest) noexcept
{
assert(!GetEventLoop().IsInside());
/* wait for data */
CircularBuffer<std::byte>::Range r;
while (true) {
Check();
r = buffer.Read();
if (!r.empty())
break;
if (IsEOF())
return 0;
caller_cond.wait(lock);
}
const auto r = buffer.Read();
if (r.empty())
return 0;
const size_t nbytes = std::min(dest.size(), r.size());
memcpy(dest.data(), r.data(), nbytes);
@ -191,13 +177,33 @@ AsyncInputStream::Read(std::unique_lock<Mutex> &lock,
buffer.Clear();
offset += (offset_type)nbytes;
if (paused && buffer.GetSize() < resume_at)
deferred_resume.Schedule();
return nbytes;
}
size_t
AsyncInputStream::Read(std::unique_lock<Mutex> &lock,
std::span<std::byte> dest)
{
assert(!GetEventLoop().IsInside());
/* wait for data */
while (true) {
Check();
if (std::size_t nbytes = ReadFromBuffer(dest); nbytes > 0) {
if (paused && buffer.GetSize() < resume_at)
deferred_resume.Schedule();
return nbytes;
}
if (IsEOF())
return 0;
caller_cond.wait(lock);
}
}
void
AsyncInputStream::CommitWriteBuffer(size_t nbytes) noexcept
{

@ -152,6 +152,7 @@ protected:
void SeekDone() noexcept;
private:
std::size_t ReadFromBuffer(std::span<std::byte> dest) noexcept;
void Resume();
/* for InjectEvent */

@ -149,6 +149,27 @@ ThreadInputStream::Seek([[maybe_unused]] std::unique_lock<Mutex> &lock,
wake_cond.notify_one();
}
inline std::size_t
ThreadInputStream::ReadFromBuffer(std::span<std::byte> dest) noexcept
{
const auto r = buffer.Read();
if (r.empty())
return 0;
const size_t nbytes = std::min(dest.size(), r.size());
memcpy(dest.data(), r.data(), nbytes);
buffer.Consume(nbytes);
if (buffer.empty())
/* when the buffer becomes empty, reset its head and
tail so the next write can fill the whole buffer
and not just the part after the tail */
buffer.Clear();
offset += (offset_type)nbytes;
return nbytes;
}
size_t
ThreadInputStream::Read(std::unique_lock<Mutex> &lock,
std::span<std::byte> dest)
@ -164,21 +185,8 @@ ThreadInputStream::Read(std::unique_lock<Mutex> &lock,
continue;
}
auto r = buffer.Read();
if (!r.empty()) {
size_t nbytes = std::min(dest.size(), r.size());
memcpy(dest.data(), r.data(), nbytes);
buffer.Consume(nbytes);
if (buffer.empty())
/* when the buffer becomes empty,
reset its head and tail so the next
write can fill the whole buffer and
not just the part after the tail */
buffer.Clear();
if (std::size_t nbytes = ReadFromBuffer(dest); nbytes > 0) {
wake_cond.notify_one();
offset += nbytes;
return nbytes;
}

@ -151,6 +151,8 @@ protected:
virtual void Cancel() noexcept {}
private:
std::size_t ReadFromBuffer(std::span<std::byte> dest) noexcept;
bool IsSeeking() const noexcept {
return seek_offset != UNKNOWN_SIZE;
}