diff --git a/src/io/uring/Queue.cxx b/src/io/uring/Queue.cxx index 82a4bf9cf..29406fe1f 100644 --- a/src/io/uring/Queue.cxx +++ b/src/io/uring/Queue.cxx @@ -34,6 +34,8 @@ #include "CancellableOperation.hxx" #include "util/DeleteDisposer.hxx" +#include + namespace Uring { Queue::Queue(unsigned entries, unsigned flags) @@ -46,6 +48,23 @@ Queue::~Queue() noexcept operations.clear_and_dispose(DeleteDisposer{}); } +struct io_uring_sqe & +Queue::RequireSubmitEntry() +{ + auto *sqe = GetSubmitEntry(); + if (sqe == nullptr) { + /* the submit queue is full; submit it to the kernel + and try again */ + Submit(); + + sqe = GetSubmitEntry(); + if (sqe == nullptr) + throw std::runtime_error{"io_uring_get_sqe() failed"}; + } + + return *sqe; +} + void Queue::AddPending(struct io_uring_sqe &sqe, Operation &operation) noexcept diff --git a/src/io/uring/Queue.hxx b/src/io/uring/Queue.hxx index 5a9f8132f..62dd6f581 100644 --- a/src/io/uring/Queue.hxx +++ b/src/io/uring/Queue.hxx @@ -63,6 +63,14 @@ public: return ring.GetSubmitEntry(); } + /** + * Like GetSubmitEntry(), but call Submit() if the submit + * queue is full. + * + * May throw exceptions if Submit() fails. + */ + struct io_uring_sqe &RequireSubmitEntry(); + bool HasPending() const noexcept { return !operations.empty(); } diff --git a/src/io/uring/ReadOperation.cxx b/src/io/uring/ReadOperation.cxx index f3aa717a0..17551eeae 100644 --- a/src/io/uring/ReadOperation.cxx +++ b/src/io/uring/ReadOperation.cxx @@ -45,14 +45,13 @@ ReadOperation::Start(Queue &queue, FileDescriptor fd, off_t offset, buffer = std::make_unique(size); - auto *s = queue.GetSubmitEntry(); - assert(s != nullptr); // TODO: what if the submit queue is full? + auto &s = queue.RequireSubmitEntry(); iov.iov_base = buffer.get(); iov.iov_len = size; - io_uring_prep_readv(s, fd.Get(), &iov, 1, offset); - queue.Push(*s, *this); + io_uring_prep_readv(&s, fd.Get(), &iov, 1, offset); + queue.Push(s, *this); } void