input/buffering: make read errors fatal, no recovery
If a read error occurs, it is very unlikely that the InputStream will ever recover. Removing the code removes some code complexity which just isn't worth it. And it allows supporting multiple readers for one buffer.
This commit is contained in:
@@ -111,10 +111,8 @@ BufferingInputStream::Read(std::unique_lock<Mutex> &lock, void *ptr, size_t s)
|
|||||||
return nbytes;
|
return nbytes;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (read_error) {
|
if (read_error)
|
||||||
wake_cond.notify_one();
|
std::rethrow_exception(read_error);
|
||||||
std::rethrow_exception(std::exchange(read_error, {}));
|
|
||||||
}
|
|
||||||
|
|
||||||
client_cond.wait(lock);
|
client_cond.wait(lock);
|
||||||
}
|
}
|
||||||
@@ -152,11 +150,7 @@ BufferingInputStream::RunThread() noexcept
|
|||||||
}
|
}
|
||||||
|
|
||||||
seek = false;
|
seek = false;
|
||||||
read_error = {};
|
|
||||||
client_cond.notify_one();
|
client_cond.notify_one();
|
||||||
} else if (read_error) {
|
|
||||||
/* wait for client to consume the read error */
|
|
||||||
wake_cond.wait(lock);
|
|
||||||
} else if (offset != input->GetOffset() && !IsAvailable()) {
|
} else if (offset != input->GetOffset() && !IsAvailable()) {
|
||||||
/* a past Seek() call was a no-op because data
|
/* a past Seek() call was a no-op because data
|
||||||
was already available at that position, but
|
was already available at that position, but
|
||||||
@@ -180,6 +174,7 @@ BufferingInputStream::RunThread() noexcept
|
|||||||
read_error = std::current_exception();
|
read_error = std::current_exception();
|
||||||
client_cond.notify_one();
|
client_cond.notify_one();
|
||||||
OnBufferAvailable();
|
OnBufferAvailable();
|
||||||
|
break;
|
||||||
}
|
}
|
||||||
} else if (input->IsEOF()) {
|
} else if (input->IsEOF()) {
|
||||||
/* our input has reached its end: prepare
|
/* our input has reached its end: prepare
|
||||||
@@ -198,6 +193,7 @@ BufferingInputStream::RunThread() noexcept
|
|||||||
read_error = std::current_exception();
|
read_error = std::current_exception();
|
||||||
client_cond.notify_one();
|
client_cond.notify_one();
|
||||||
OnBufferAvailable();
|
OnBufferAvailable();
|
||||||
|
break;
|
||||||
}
|
}
|
||||||
} else if (input->IsAvailable()) {
|
} else if (input->IsAvailable()) {
|
||||||
const auto read_offset = input->GetOffset();
|
const auto read_offset = input->GetOffset();
|
||||||
@@ -221,6 +217,7 @@ BufferingInputStream::RunThread() noexcept
|
|||||||
read_error = std::current_exception();
|
read_error = std::current_exception();
|
||||||
client_cond.notify_one();
|
client_cond.notify_one();
|
||||||
OnBufferAvailable();
|
OnBufferAvailable();
|
||||||
|
break;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
/* we need more data at our
|
/* we need more data at our
|
||||||
@@ -235,6 +232,7 @@ BufferingInputStream::RunThread() noexcept
|
|||||||
read_error = std::current_exception();
|
read_error = std::current_exception();
|
||||||
client_cond.notify_one();
|
client_cond.notify_one();
|
||||||
OnBufferAvailable();
|
OnBufferAvailable();
|
||||||
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -248,6 +246,9 @@ BufferingInputStream::RunThread() noexcept
|
|||||||
read_offset + nbytes);
|
read_offset + nbytes);
|
||||||
} catch (...) {
|
} catch (...) {
|
||||||
read_error = std::current_exception();
|
read_error = std::current_exception();
|
||||||
|
client_cond.notify_one();
|
||||||
|
OnBufferAvailable();
|
||||||
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
client_cond.notify_one();
|
client_cond.notify_one();
|
||||||
|
Reference in New Issue
Block a user