input/buffering: eliminate "idle" flag, automatically seek to next hole
This commit is contained in:
@@ -105,7 +105,6 @@ BufferingInputStream::Read(std::unique_lock<Mutex> &lock, void *ptr, size_t s)
|
|||||||
|
|
||||||
if (!IsAvailable()) {
|
if (!IsAvailable()) {
|
||||||
/* wake up the sleeping thread */
|
/* wake up the sleeping thread */
|
||||||
idle = false;
|
|
||||||
wake_cond.notify_one();
|
wake_cond.notify_one();
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -117,12 +116,6 @@ BufferingInputStream::Read(std::unique_lock<Mutex> &lock, void *ptr, size_t s)
|
|||||||
std::rethrow_exception(std::exchange(read_error, {}));
|
std::rethrow_exception(std::exchange(read_error, {}));
|
||||||
}
|
}
|
||||||
|
|
||||||
if (idle) {
|
|
||||||
/* wake up the sleeping thread */
|
|
||||||
idle = false;
|
|
||||||
wake_cond.notify_one();
|
|
||||||
}
|
|
||||||
|
|
||||||
client_cond.wait(lock);
|
client_cond.wait(lock);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -158,11 +151,10 @@ BufferingInputStream::RunThread() noexcept
|
|||||||
seek_error = std::current_exception();
|
seek_error = std::current_exception();
|
||||||
}
|
}
|
||||||
|
|
||||||
idle = false;
|
|
||||||
seek = false;
|
seek = false;
|
||||||
read_error = {};
|
read_error = {};
|
||||||
client_cond.notify_one();
|
client_cond.notify_one();
|
||||||
} else if (read_error || idle) {
|
} else if (read_error) {
|
||||||
/* wait for client to consume the read error */
|
/* wait for client to consume the read error */
|
||||||
wake_cond.wait(lock);
|
wake_cond.wait(lock);
|
||||||
} else if (offset != input->GetOffset() && !IsAvailable()) {
|
} else if (offset != input->GetOffset() && !IsAvailable()) {
|
||||||
@@ -214,9 +206,22 @@ BufferingInputStream::RunThread() noexcept
|
|||||||
if (w.empty()) {
|
if (w.empty()) {
|
||||||
if (IsAvailable()) {
|
if (IsAvailable()) {
|
||||||
/* we still have enough data
|
/* we still have enough data
|
||||||
for the next Read() - sleep
|
for the next Read() - seek
|
||||||
until we need more data */
|
to the first hole */
|
||||||
idle = true;
|
|
||||||
|
size_t new_offset = FindFirstHole();
|
||||||
|
if (new_offset == INVALID_OFFSET)
|
||||||
|
/* the file has been
|
||||||
|
read completely */
|
||||||
|
break;
|
||||||
|
|
||||||
|
try {
|
||||||
|
input->Seek(lock, new_offset);
|
||||||
|
} catch (...) {
|
||||||
|
read_error = std::current_exception();
|
||||||
|
client_cond.notify_one();
|
||||||
|
OnBufferAvailable();
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
/* we need more data at our
|
/* we need more data at our
|
||||||
current position, because
|
current position, because
|
||||||
|
@@ -55,7 +55,7 @@ class BufferingInputStream : InputStreamHandler {
|
|||||||
|
|
||||||
SparseBuffer<uint8_t> buffer;
|
SparseBuffer<uint8_t> buffer;
|
||||||
|
|
||||||
bool stop = false, seek = false, idle = false;
|
bool stop = false, seek = false;
|
||||||
|
|
||||||
size_t offset = 0;
|
size_t offset = 0;
|
||||||
|
|
||||||
|
Reference in New Issue
Block a user