input/uring: new input plugin using io_uring
This is the final piece of the series to establish io_uring support on Linux. MPD doesn't need io_uring for its efficient bulk I/O support, but to allow file I/O to be cancelled. This is a big problem on CIFS/NFS mounts where processes sleep uninterruptable if the file server disappears, deadlocking MPD. With io_uring, a flaky NFS connection allows MPD to continue to work (even though there are still deadlocks inside MPD which need to be addressed). This plugin does not yet use cancellable `open()` using `IORING_OP_OPENAT`. This will be implemented later. Lots of other optimization opportunities for io_uring are still missing as well - for example the database update could benefit a lot, but unfortunately, io_uring doesn't have `readdir()` support just yet.
This commit is contained in:
parent
cdf8ac001c
commit
dae8da7066
1
NEWS
1
NEWS
@ -11,6 +11,7 @@ ver 0.22 (not yet released)
|
|||||||
* input
|
* input
|
||||||
- curl: support "charset" parameter in URI fragment
|
- curl: support "charset" parameter in URI fragment
|
||||||
- ffmpeg: allow partial reads
|
- ffmpeg: allow partial reads
|
||||||
|
- io_uring: new plugin for local files on Linux (using liburing)
|
||||||
* archive
|
* archive
|
||||||
- iso9660: support seeking
|
- iso9660: support seeking
|
||||||
* playlist
|
* playlist
|
||||||
|
@ -37,6 +37,7 @@
|
|||||||
#include "fs/Traits.hxx"
|
#include "fs/Traits.hxx"
|
||||||
#include "fs/FileSystem.hxx"
|
#include "fs/FileSystem.hxx"
|
||||||
#include "fs/StandardDirectory.hxx"
|
#include "fs/StandardDirectory.hxx"
|
||||||
|
#include "io/uring/Features.h"
|
||||||
#include "util/Domain.hxx"
|
#include "util/Domain.hxx"
|
||||||
#include "util/OptionDef.hxx"
|
#include "util/OptionDef.hxx"
|
||||||
#include "util/OptionParser.hxx"
|
#include "util/OptionParser.hxx"
|
||||||
@ -203,6 +204,9 @@ static void version()
|
|||||||
"\n"
|
"\n"
|
||||||
"Input plugins:\n"
|
"Input plugins:\n"
|
||||||
" file"
|
" file"
|
||||||
|
#ifdef HAVE_URING
|
||||||
|
" io_uring"
|
||||||
|
#endif
|
||||||
#ifdef ENABLE_ARCHIVE
|
#ifdef ENABLE_ARCHIVE
|
||||||
" archive"
|
" archive"
|
||||||
#endif
|
#endif
|
||||||
|
@ -30,9 +30,18 @@
|
|||||||
#include <cassert>
|
#include <cassert>
|
||||||
#include <stdexcept>
|
#include <stdexcept>
|
||||||
|
|
||||||
|
#include "io/uring/Features.h"
|
||||||
|
#ifdef HAVE_URING
|
||||||
|
#include "plugins/UringInputPlugin.hxx"
|
||||||
|
#endif
|
||||||
|
|
||||||
void
|
void
|
||||||
input_stream_global_init(const ConfigData &config, EventLoop &event_loop)
|
input_stream_global_init(const ConfigData &config, EventLoop &event_loop)
|
||||||
{
|
{
|
||||||
|
#ifdef HAVE_URING
|
||||||
|
InitUringInputPlugin(event_loop);
|
||||||
|
#endif
|
||||||
|
|
||||||
const ConfigBlock empty;
|
const ConfigBlock empty;
|
||||||
|
|
||||||
for (unsigned i = 0; input_plugins[i] != nullptr; ++i) {
|
for (unsigned i = 0; input_plugins[i] != nullptr; ++i) {
|
||||||
|
@ -22,6 +22,11 @@
|
|||||||
#include "plugins/FileInputPlugin.hxx"
|
#include "plugins/FileInputPlugin.hxx"
|
||||||
#include "config.h"
|
#include "config.h"
|
||||||
|
|
||||||
|
#include "io/uring/Features.h"
|
||||||
|
#ifdef HAVE_URING
|
||||||
|
#include "plugins/UringInputPlugin.hxx"
|
||||||
|
#endif
|
||||||
|
|
||||||
#ifdef ENABLE_ARCHIVE
|
#ifdef ENABLE_ARCHIVE
|
||||||
#include "plugins/ArchiveInputPlugin.hxx"
|
#include "plugins/ArchiveInputPlugin.hxx"
|
||||||
#endif
|
#endif
|
||||||
@ -39,6 +44,12 @@ OpenLocalInputStream(Path path, Mutex &mutex)
|
|||||||
#ifdef ENABLE_ARCHIVE
|
#ifdef ENABLE_ARCHIVE
|
||||||
try {
|
try {
|
||||||
#endif
|
#endif
|
||||||
|
#ifdef HAVE_URING
|
||||||
|
is = OpenUringInputStream(path.c_str(), mutex);
|
||||||
|
if (is)
|
||||||
|
return is;
|
||||||
|
#endif
|
||||||
|
|
||||||
is = OpenFileInputStream(path, mutex);
|
is = OpenFileInputStream(path, mutex);
|
||||||
#ifdef ENABLE_ARCHIVE
|
#ifdef ENABLE_ARCHIVE
|
||||||
} catch (const std::system_error &e) {
|
} catch (const std::system_error &e) {
|
||||||
|
202
src/input/plugins/UringInputPlugin.cxx
Normal file
202
src/input/plugins/UringInputPlugin.cxx
Normal file
@ -0,0 +1,202 @@
|
|||||||
|
/*
|
||||||
|
* Copyright 2003-2020 The Music Player Daemon Project
|
||||||
|
* http://www.musicpd.org
|
||||||
|
*
|
||||||
|
* This program is free software; you can redistribute it and/or modify
|
||||||
|
* it under the terms of the GNU General Public License as published by
|
||||||
|
* the Free Software Foundation; either version 2 of the License, or
|
||||||
|
* (at your option) any later version.
|
||||||
|
*
|
||||||
|
* This program is distributed in the hope that it will be useful,
|
||||||
|
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
* GNU General Public License for more details.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU General Public License along
|
||||||
|
* with this program; if not, write to the Free Software Foundation, Inc.,
|
||||||
|
* 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
|
||||||
|
*/
|
||||||
|
|
||||||
|
#include "UringInputPlugin.hxx"
|
||||||
|
#include "../AsyncInputStream.hxx"
|
||||||
|
#include "event/Call.hxx"
|
||||||
|
#include "event/Loop.hxx"
|
||||||
|
#include "system/Error.hxx"
|
||||||
|
#include "io/Open.hxx"
|
||||||
|
#include "io/UniqueFileDescriptor.hxx"
|
||||||
|
#include "io/uring/Operation.hxx"
|
||||||
|
#include "io/uring/Queue.hxx"
|
||||||
|
#include "util/RuntimeError.hxx"
|
||||||
|
|
||||||
|
#include <sys/stat.h>
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Read at most this number of bytes in each read request.
|
||||||
|
*/
|
||||||
|
static const size_t URING_MAX_READ = 256 * 1024;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Do not buffer more than this number of bytes. It should be a
|
||||||
|
* reasonable limit that doesn't make low-end machines suffer too
|
||||||
|
* much, but doesn't cause stuttering on high-latency lines.
|
||||||
|
*/
|
||||||
|
static const size_t URING_MAX_BUFFERED = 512 * 1024;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Resume the stream at this number of bytes after it has been paused.
|
||||||
|
*/
|
||||||
|
static const size_t URING_RESUME_AT = 384 * 1024;
|
||||||
|
|
||||||
|
static EventLoop *uring_input_event_loop;
|
||||||
|
static Uring::Queue *uring_input_queue;
|
||||||
|
|
||||||
|
class UringInputStream final : public AsyncInputStream, Uring::Operation {
|
||||||
|
Uring::Queue ů
|
||||||
|
|
||||||
|
UniqueFileDescriptor fd;
|
||||||
|
|
||||||
|
uint64_t next_offset = 0;
|
||||||
|
|
||||||
|
struct iovec iov;
|
||||||
|
|
||||||
|
public:
|
||||||
|
UringInputStream(EventLoop &event_loop, Uring::Queue &_uring,
|
||||||
|
const char *path,
|
||||||
|
UniqueFileDescriptor &&_fd,
|
||||||
|
offset_type _size, Mutex &_mutex)
|
||||||
|
:AsyncInputStream(event_loop,
|
||||||
|
path, _mutex,
|
||||||
|
URING_MAX_BUFFERED,
|
||||||
|
URING_RESUME_AT),
|
||||||
|
uring(_uring),
|
||||||
|
fd(std::move(_fd))
|
||||||
|
{
|
||||||
|
size = _size;
|
||||||
|
seekable = true;
|
||||||
|
SetReady();
|
||||||
|
|
||||||
|
BlockingCall(GetEventLoop(), [this](){
|
||||||
|
SubmitRead();
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
~UringInputStream() override {
|
||||||
|
BlockingCall(GetEventLoop(), [this](){
|
||||||
|
CancelUring();
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
private:
|
||||||
|
void SubmitRead() noexcept;
|
||||||
|
|
||||||
|
protected:
|
||||||
|
/* virtual methods from AsyncInputStream */
|
||||||
|
void DoResume() override;
|
||||||
|
void DoSeek(offset_type new_offset) override;
|
||||||
|
|
||||||
|
private:
|
||||||
|
/* virtual methods from class Uring::Operation */
|
||||||
|
void OnUringCompletion(int res) noexcept override;
|
||||||
|
};
|
||||||
|
|
||||||
|
void
|
||||||
|
UringInputStream::SubmitRead() noexcept
|
||||||
|
{
|
||||||
|
assert(!IsUringPending());
|
||||||
|
|
||||||
|
int64_t remaining = size - next_offset;
|
||||||
|
if (remaining <= 0)
|
||||||
|
return;
|
||||||
|
|
||||||
|
auto w = PrepareWriteBuffer();
|
||||||
|
if (w.empty()) {
|
||||||
|
Pause();
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
auto *s = uring.GetSubmitEntry();
|
||||||
|
assert(s != nullptr); // TODO: what if the submit queue is full?
|
||||||
|
|
||||||
|
iov.iov_base = w.data;
|
||||||
|
iov.iov_len = std::min<size_t>(std::min<uint64_t>(remaining,
|
||||||
|
URING_MAX_READ),
|
||||||
|
w.size);
|
||||||
|
|
||||||
|
io_uring_prep_readv(s, fd.Get(), &iov, 1, next_offset);
|
||||||
|
uring.Push(*s, *this);
|
||||||
|
}
|
||||||
|
|
||||||
|
void
|
||||||
|
UringInputStream::DoResume()
|
||||||
|
{
|
||||||
|
SubmitRead();
|
||||||
|
}
|
||||||
|
|
||||||
|
void
|
||||||
|
UringInputStream::DoSeek(offset_type new_offset)
|
||||||
|
{
|
||||||
|
CancelUring();
|
||||||
|
|
||||||
|
next_offset = offset = new_offset;
|
||||||
|
SeekDone();
|
||||||
|
SubmitRead();
|
||||||
|
}
|
||||||
|
|
||||||
|
void
|
||||||
|
UringInputStream::OnUringCompletion(int res) noexcept
|
||||||
|
{
|
||||||
|
const std::lock_guard<Mutex> protect(mutex);
|
||||||
|
assert(!IsBufferFull());
|
||||||
|
assert(IsBufferFull() == (GetBufferSpace() == 0));
|
||||||
|
|
||||||
|
if (res <= 0) {
|
||||||
|
try {
|
||||||
|
if (res == 0)
|
||||||
|
throw std::runtime_error("Premature end of file");
|
||||||
|
else
|
||||||
|
throw MakeErrno(-res, "Read failed");
|
||||||
|
} catch (...) {
|
||||||
|
postponed_exception = std::current_exception();
|
||||||
|
}
|
||||||
|
|
||||||
|
InvokeOnAvailable();
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
CommitWriteBuffer(res);
|
||||||
|
next_offset += res;
|
||||||
|
SubmitRead();
|
||||||
|
}
|
||||||
|
|
||||||
|
InputStreamPtr
|
||||||
|
OpenUringInputStream(const char *path, Mutex &mutex)
|
||||||
|
{
|
||||||
|
if (uring_input_queue == nullptr)
|
||||||
|
return nullptr;
|
||||||
|
|
||||||
|
// TODO: use IORING_OP_OPENAT
|
||||||
|
auto fd = OpenReadOnly(path);
|
||||||
|
|
||||||
|
// TODO: use IORING_OP_STATX
|
||||||
|
struct stat st;
|
||||||
|
if (fstat(fd.Get(), &st) < 0)
|
||||||
|
throw FormatErrno("Failed to access %s", path);
|
||||||
|
|
||||||
|
if (!S_ISREG(st.st_mode))
|
||||||
|
throw FormatRuntimeError("Not a regular file: %s", path);
|
||||||
|
|
||||||
|
return std::make_unique<UringInputStream>(*uring_input_event_loop,
|
||||||
|
*uring_input_queue,
|
||||||
|
path, std::move(fd),
|
||||||
|
st.st_size, mutex);
|
||||||
|
}
|
||||||
|
|
||||||
|
void
|
||||||
|
InitUringInputPlugin(EventLoop &event_loop) noexcept
|
||||||
|
{
|
||||||
|
uring_input_event_loop = &event_loop;
|
||||||
|
|
||||||
|
BlockingCall(event_loop, [](){
|
||||||
|
uring_input_queue = uring_input_event_loop->GetUring();
|
||||||
|
});
|
||||||
|
}
|
34
src/input/plugins/UringInputPlugin.hxx
Normal file
34
src/input/plugins/UringInputPlugin.hxx
Normal file
@ -0,0 +1,34 @@
|
|||||||
|
/*
|
||||||
|
* Copyright 2003-2020 The Music Player Daemon Project
|
||||||
|
* http://www.musicpd.org
|
||||||
|
*
|
||||||
|
* This program is free software; you can redistribute it and/or modify
|
||||||
|
* it under the terms of the GNU General Public License as published by
|
||||||
|
* the Free Software Foundation; either version 2 of the License, or
|
||||||
|
* (at your option) any later version.
|
||||||
|
*
|
||||||
|
* This program is distributed in the hope that it will be useful,
|
||||||
|
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
* GNU General Public License for more details.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU General Public License along
|
||||||
|
* with this program; if not, write to the Free Software Foundation, Inc.,
|
||||||
|
* 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
|
||||||
|
*/
|
||||||
|
|
||||||
|
#ifndef MPD_INPUT_URING_HXX
|
||||||
|
#define MPD_INPUT_URING_HXX
|
||||||
|
|
||||||
|
#include "input/Ptr.hxx"
|
||||||
|
#include "thread/Mutex.hxx"
|
||||||
|
|
||||||
|
class EventLoop;
|
||||||
|
|
||||||
|
void
|
||||||
|
InitUringInputPlugin(EventLoop &event_loop) noexcept;
|
||||||
|
|
||||||
|
InputStreamPtr
|
||||||
|
OpenUringInputStream(const char *path, Mutex &mutex);
|
||||||
|
|
||||||
|
#endif
|
@ -2,6 +2,10 @@ input_plugins_sources = [
|
|||||||
'FileInputPlugin.cxx',
|
'FileInputPlugin.cxx',
|
||||||
]
|
]
|
||||||
|
|
||||||
|
if uring_dep.found()
|
||||||
|
input_plugins_sources += 'UringInputPlugin.cxx'
|
||||||
|
endif
|
||||||
|
|
||||||
if alsa_dep.found()
|
if alsa_dep.found()
|
||||||
input_plugins_sources += 'AlsaInputPlugin.cxx'
|
input_plugins_sources += 'AlsaInputPlugin.cxx'
|
||||||
endif
|
endif
|
||||||
|
Loading…
Reference in New Issue
Block a user