diff --git a/NEWS b/NEWS index 7aaea260d..9933fd21d 100644 --- a/NEWS +++ b/NEWS @@ -11,6 +11,7 @@ ver 0.22 (not yet released) * input - curl: support "charset" parameter in URI fragment - ffmpeg: allow partial reads + - io_uring: new plugin for local files on Linux (using liburing) * archive - iso9660: support seeking * playlist diff --git a/src/CommandLine.cxx b/src/CommandLine.cxx index 06a4d63a3..b43253e4c 100644 --- a/src/CommandLine.cxx +++ b/src/CommandLine.cxx @@ -37,6 +37,7 @@ #include "fs/Traits.hxx" #include "fs/FileSystem.hxx" #include "fs/StandardDirectory.hxx" +#include "io/uring/Features.h" #include "util/Domain.hxx" #include "util/OptionDef.hxx" #include "util/OptionParser.hxx" @@ -203,6 +204,9 @@ static void version() "\n" "Input plugins:\n" " file" +#ifdef HAVE_URING + " io_uring" +#endif #ifdef ENABLE_ARCHIVE " archive" #endif diff --git a/src/input/Init.cxx b/src/input/Init.cxx index 87e944463..49f0b279f 100644 --- a/src/input/Init.cxx +++ b/src/input/Init.cxx @@ -30,9 +30,18 @@ #include #include +#include "io/uring/Features.h" +#ifdef HAVE_URING +#include "plugins/UringInputPlugin.hxx" +#endif + void input_stream_global_init(const ConfigData &config, EventLoop &event_loop) { +#ifdef HAVE_URING + InitUringInputPlugin(event_loop); +#endif + const ConfigBlock empty; for (unsigned i = 0; input_plugins[i] != nullptr; ++i) { diff --git a/src/input/LocalOpen.cxx b/src/input/LocalOpen.cxx index ecbd42511..5d8751e19 100644 --- a/src/input/LocalOpen.cxx +++ b/src/input/LocalOpen.cxx @@ -22,6 +22,11 @@ #include "plugins/FileInputPlugin.hxx" #include "config.h" +#include "io/uring/Features.h" +#ifdef HAVE_URING +#include "plugins/UringInputPlugin.hxx" +#endif + #ifdef ENABLE_ARCHIVE #include "plugins/ArchiveInputPlugin.hxx" #endif @@ -39,6 +44,12 @@ OpenLocalInputStream(Path path, Mutex &mutex) #ifdef ENABLE_ARCHIVE try { #endif +#ifdef HAVE_URING + is = OpenUringInputStream(path.c_str(), mutex); + if (is) + return is; +#endif + is = OpenFileInputStream(path, mutex); #ifdef ENABLE_ARCHIVE } catch (const std::system_error &e) { diff --git a/src/input/plugins/UringInputPlugin.cxx b/src/input/plugins/UringInputPlugin.cxx new file mode 100644 index 000000000..7205d3540 --- /dev/null +++ b/src/input/plugins/UringInputPlugin.cxx @@ -0,0 +1,202 @@ +/* + * Copyright 2003-2020 The Music Player Daemon Project + * http://www.musicpd.org + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ + +#include "UringInputPlugin.hxx" +#include "../AsyncInputStream.hxx" +#include "event/Call.hxx" +#include "event/Loop.hxx" +#include "system/Error.hxx" +#include "io/Open.hxx" +#include "io/UniqueFileDescriptor.hxx" +#include "io/uring/Operation.hxx" +#include "io/uring/Queue.hxx" +#include "util/RuntimeError.hxx" + +#include + +/** + * Read at most this number of bytes in each read request. + */ +static const size_t URING_MAX_READ = 256 * 1024; + +/** + * Do not buffer more than this number of bytes. It should be a + * reasonable limit that doesn't make low-end machines suffer too + * much, but doesn't cause stuttering on high-latency lines. + */ +static const size_t URING_MAX_BUFFERED = 512 * 1024; + +/** + * Resume the stream at this number of bytes after it has been paused. + */ +static const size_t URING_RESUME_AT = 384 * 1024; + +static EventLoop *uring_input_event_loop; +static Uring::Queue *uring_input_queue; + +class UringInputStream final : public AsyncInputStream, Uring::Operation { + Uring::Queue ů + + UniqueFileDescriptor fd; + + uint64_t next_offset = 0; + + struct iovec iov; + +public: + UringInputStream(EventLoop &event_loop, Uring::Queue &_uring, + const char *path, + UniqueFileDescriptor &&_fd, + offset_type _size, Mutex &_mutex) + :AsyncInputStream(event_loop, + path, _mutex, + URING_MAX_BUFFERED, + URING_RESUME_AT), + uring(_uring), + fd(std::move(_fd)) + { + size = _size; + seekable = true; + SetReady(); + + BlockingCall(GetEventLoop(), [this](){ + SubmitRead(); + }); + } + + ~UringInputStream() override { + BlockingCall(GetEventLoop(), [this](){ + CancelUring(); + }); + } + +private: + void SubmitRead() noexcept; + +protected: + /* virtual methods from AsyncInputStream */ + void DoResume() override; + void DoSeek(offset_type new_offset) override; + +private: + /* virtual methods from class Uring::Operation */ + void OnUringCompletion(int res) noexcept override; +}; + +void +UringInputStream::SubmitRead() noexcept +{ + assert(!IsUringPending()); + + int64_t remaining = size - next_offset; + if (remaining <= 0) + return; + + auto w = PrepareWriteBuffer(); + if (w.empty()) { + Pause(); + return; + } + + auto *s = uring.GetSubmitEntry(); + assert(s != nullptr); // TODO: what if the submit queue is full? + + iov.iov_base = w.data; + iov.iov_len = std::min(std::min(remaining, + URING_MAX_READ), + w.size); + + io_uring_prep_readv(s, fd.Get(), &iov, 1, next_offset); + uring.Push(*s, *this); +} + +void +UringInputStream::DoResume() +{ + SubmitRead(); +} + +void +UringInputStream::DoSeek(offset_type new_offset) +{ + CancelUring(); + + next_offset = offset = new_offset; + SeekDone(); + SubmitRead(); +} + +void +UringInputStream::OnUringCompletion(int res) noexcept +{ + const std::lock_guard protect(mutex); + assert(!IsBufferFull()); + assert(IsBufferFull() == (GetBufferSpace() == 0)); + + if (res <= 0) { + try { + if (res == 0) + throw std::runtime_error("Premature end of file"); + else + throw MakeErrno(-res, "Read failed"); + } catch (...) { + postponed_exception = std::current_exception(); + } + + InvokeOnAvailable(); + return; + } + + CommitWriteBuffer(res); + next_offset += res; + SubmitRead(); +} + +InputStreamPtr +OpenUringInputStream(const char *path, Mutex &mutex) +{ + if (uring_input_queue == nullptr) + return nullptr; + + // TODO: use IORING_OP_OPENAT + auto fd = OpenReadOnly(path); + + // TODO: use IORING_OP_STATX + struct stat st; + if (fstat(fd.Get(), &st) < 0) + throw FormatErrno("Failed to access %s", path); + + if (!S_ISREG(st.st_mode)) + throw FormatRuntimeError("Not a regular file: %s", path); + + return std::make_unique(*uring_input_event_loop, + *uring_input_queue, + path, std::move(fd), + st.st_size, mutex); +} + +void +InitUringInputPlugin(EventLoop &event_loop) noexcept +{ + uring_input_event_loop = &event_loop; + + BlockingCall(event_loop, [](){ + uring_input_queue = uring_input_event_loop->GetUring(); + }); +} diff --git a/src/input/plugins/UringInputPlugin.hxx b/src/input/plugins/UringInputPlugin.hxx new file mode 100644 index 000000000..d476388e9 --- /dev/null +++ b/src/input/plugins/UringInputPlugin.hxx @@ -0,0 +1,34 @@ +/* + * Copyright 2003-2020 The Music Player Daemon Project + * http://www.musicpd.org + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ + +#ifndef MPD_INPUT_URING_HXX +#define MPD_INPUT_URING_HXX + +#include "input/Ptr.hxx" +#include "thread/Mutex.hxx" + +class EventLoop; + +void +InitUringInputPlugin(EventLoop &event_loop) noexcept; + +InputStreamPtr +OpenUringInputStream(const char *path, Mutex &mutex); + +#endif diff --git a/src/input/plugins/meson.build b/src/input/plugins/meson.build index 14f6ac576..73223654f 100644 --- a/src/input/plugins/meson.build +++ b/src/input/plugins/meson.build @@ -2,6 +2,10 @@ input_plugins_sources = [ 'FileInputPlugin.cxx', ] +if uring_dep.found() + input_plugins_sources += 'UringInputPlugin.cxx' +endif + if alsa_dep.found() input_plugins_sources += 'AlsaInputPlugin.cxx' endif