From 85bab67083d506f06494d89e5453791d5aa5f841 Mon Sep 17 00:00:00 2001 From: Max Kellermann Date: Wed, 23 Sep 2020 10:01:16 +0200 Subject: [PATCH] input/uring: safe cancellation My concept with `class CancellableOperation` doesn't work properly, because the kernel may continue to write to the given buffer as soon as the read finishes. To fix this, this commit adds `class ReadOperation` which owns the buffer and the `struct iovec`. Instances of this class persist until the read really finishes, even if the operation is canceled. --- src/input/plugins/UringInputPlugin.cxx | 72 +++++++++++---------- src/io/uring/ReadOperation.cxx | 70 ++++++++++++++++++++ src/io/uring/ReadOperation.hxx | 88 ++++++++++++++++++++++++++ src/io/uring/meson.build | 1 + 4 files changed, 199 insertions(+), 32 deletions(-) create mode 100644 src/io/uring/ReadOperation.cxx create mode 100644 src/io/uring/ReadOperation.hxx diff --git a/src/input/plugins/UringInputPlugin.cxx b/src/input/plugins/UringInputPlugin.cxx index ab68c6825..d2f650014 100644 --- a/src/input/plugins/UringInputPlugin.cxx +++ b/src/input/plugins/UringInputPlugin.cxx @@ -24,7 +24,7 @@ #include "system/Error.hxx" #include "io/Open.hxx" #include "io/UniqueFileDescriptor.hxx" -#include "io/uring/Operation.hxx" +#include "io/uring/ReadOperation.hxx" #include "io/uring/Queue.hxx" #include "util/RuntimeError.hxx" @@ -50,14 +50,14 @@ static const size_t URING_RESUME_AT = 384 * 1024; static EventLoop *uring_input_event_loop; static Uring::Queue *uring_input_queue; -class UringInputStream final : public AsyncInputStream, Uring::Operation { +class UringInputStream final : public AsyncInputStream, Uring::ReadHandler { Uring::Queue ů UniqueFileDescriptor fd; uint64_t next_offset = 0; - struct iovec iov; + std::unique_ptr read_operation; public: UringInputStream(EventLoop &event_loop, Uring::Queue &_uring, @@ -82,13 +82,18 @@ public: ~UringInputStream() noexcept override { BlockingCall(GetEventLoop(), [this](){ - CancelUring(); + CancelRead(); }); } private: void SubmitRead() noexcept; + void CancelRead() noexcept { + if (read_operation) + read_operation.release()->Cancel(); + } + protected: /* virtual methods from AsyncInputStream */ void DoResume() override; @@ -96,13 +101,15 @@ protected: private: /* virtual methods from class Uring::Operation */ - void OnUringCompletion(int res) noexcept override; + void OnRead(std::unique_ptr buffer, + std::size_t size) noexcept override; + void OnReadError(int error) noexcept override; }; void UringInputStream::SubmitRead() noexcept { - assert(!IsUringPending()); + assert(!read_operation); int64_t remaining = size - next_offset; if (remaining <= 0) @@ -114,16 +121,10 @@ UringInputStream::SubmitRead() noexcept return; } - auto *s = uring.GetSubmitEntry(); - assert(s != nullptr); // TODO: what if the submit queue is full? - - iov.iov_base = w.data; - iov.iov_len = std::min(std::min(remaining, - URING_MAX_READ), - w.size); - - io_uring_prep_readv(s, fd.Get(), &iov, 1, next_offset); - uring.Push(*s, *this); + read_operation = std::make_unique(); + read_operation->Start(uring, fd, next_offset, + std::min(w.size, URING_MAX_READ), + *this); } void @@ -135,7 +136,7 @@ UringInputStream::DoResume() void UringInputStream::DoSeek(offset_type new_offset) { - CancelUring(); + CancelRead(); next_offset = offset = new_offset; SeekDone(); @@ -143,31 +144,38 @@ UringInputStream::DoSeek(offset_type new_offset) } void -UringInputStream::OnUringCompletion(int res) noexcept +UringInputStream::OnRead(std::unique_ptr data, + std::size_t nbytes) noexcept { + read_operation.reset(); + const std::lock_guard protect(mutex); - assert(!IsBufferFull()); - assert(IsBufferFull() == (GetBufferSpace() == 0)); - - if (res <= 0) { - try { - if (res == 0) - throw std::runtime_error("Premature end of file"); - else - throw MakeErrno(-res, "Read failed"); - } catch (...) { - postponed_exception = std::current_exception(); - } + if (nbytes == 0) { + postponed_exception = std::make_exception_ptr(std::runtime_error("Premature end of file")); InvokeOnAvailable(); return; } - CommitWriteBuffer(res); - next_offset += res; + auto w = PrepareWriteBuffer(); + assert(w.size >= nbytes); + memcpy(w.data, data.get(), nbytes); + CommitWriteBuffer(nbytes); + next_offset += nbytes; SubmitRead(); } +void +UringInputStream::OnReadError(int error) noexcept +{ + read_operation.reset(); + + const std::lock_guard protect(mutex); + + postponed_exception = std::make_exception_ptr(MakeErrno(error, "Read failed")); + InvokeOnAvailable(); +} + InputStreamPtr OpenUringInputStream(const char *path, Mutex &mutex) { diff --git a/src/io/uring/ReadOperation.cxx b/src/io/uring/ReadOperation.cxx new file mode 100644 index 000000000..f3aa717a0 --- /dev/null +++ b/src/io/uring/ReadOperation.cxx @@ -0,0 +1,70 @@ +/* + * Copyright 2020 Max Kellermann + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * + * - Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * + * - Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the + * distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS + * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE + * FOUNDATION OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, + * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES + * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR + * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) + * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, + * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED + * OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +#include "ReadOperation.hxx" +#include "Queue.hxx" +#include "io/FileDescriptor.hxx" + +#include + +namespace Uring { + +void +ReadOperation::Start(Queue &queue, FileDescriptor fd, off_t offset, + std::size_t size, ReadHandler &_handler) noexcept +{ + assert(!buffer); + + handler = &_handler; + + buffer = std::make_unique(size); + + auto *s = queue.GetSubmitEntry(); + assert(s != nullptr); // TODO: what if the submit queue is full? + + iov.iov_base = buffer.get(); + iov.iov_len = size; + + io_uring_prep_readv(s, fd.Get(), &iov, 1, offset); + queue.Push(*s, *this); +} + +void +ReadOperation::OnUringCompletion(int res) noexcept +{ + if (handler == nullptr) + /* operation was canceled */ + delete this; + else if (res >= 0) + handler->OnRead(std::move(buffer), res); + else + handler->OnReadError(-res); +} + +} // namespace Uring diff --git a/src/io/uring/ReadOperation.hxx b/src/io/uring/ReadOperation.hxx new file mode 100644 index 000000000..1efccb9a2 --- /dev/null +++ b/src/io/uring/ReadOperation.hxx @@ -0,0 +1,88 @@ +/* + * Copyright 2020 Max Kellermann + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * + * - Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * + * - Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the + * distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS + * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE + * FOUNDATION OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, + * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES + * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR + * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) + * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, + * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED + * OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +#pragma once + +#include "Operation.hxx" + +#include +#include + +#include // for struct iovec + +class FileDescriptor; + +namespace Uring { + +class Queue; + +class ReadHandler { +public: + virtual void OnRead(std::unique_ptr buffer, + std::size_t size) noexcept = 0; + + /** + * @param error an errno value + */ + virtual void OnReadError(int error) noexcept = 0; +}; + +/** + * Read into a newly allocated buffer. + * + * Instances of this class must be allocated with `new`, because + * cancellation will require this object (and the allocated buffer) to + * persist until the kernel completes the operation. + */ +class ReadOperation final : Operation { + ReadHandler *handler; + + struct iovec iov; + + std::unique_ptr buffer; + +public: + void Start(Queue &queue, FileDescriptor fd, off_t offset, + std::size_t size, ReadHandler &_handler) noexcept; + + /** + * Cancel this operation. This instance will be freed using + * `delete` after the kernel has finished cancellation, + * i.e. the caller resigns ownership. + */ + void Cancel() noexcept { + handler = nullptr; + } + +private: + /* virtual methods from class Operation */ + void OnUringCompletion(int res) noexcept override; +}; + +} // namespace Uring diff --git a/src/io/uring/meson.build b/src/io/uring/meson.build index 7d582be7d..9bfe24654 100644 --- a/src/io/uring/meson.build +++ b/src/io/uring/meson.build @@ -21,6 +21,7 @@ uring = static_library( 'Ring.cxx', 'Queue.cxx', 'Operation.cxx', + 'ReadOperation.cxx', include_directories: inc, dependencies: [ liburing,