From bb922d577dd8dc484d25c29c7c85bf04ecd62256 Mon Sep 17 00:00:00 2001 From: Max Kellermann Date: Wed, 1 Oct 2014 23:57:28 +0200 Subject: [PATCH] storage/nfs: use the libnfs async API Share the NFS connection with the NFS input plugin. --- Makefile.am | 1 + src/lib/nfs/Blocking.cxx | 84 +++++++++ src/lib/nfs/Blocking.hxx | 85 +++++++++ src/storage/plugins/NfsStorage.cxx | 279 +++++++++++++++++++++++------ 4 files changed, 399 insertions(+), 50 deletions(-) create mode 100644 src/lib/nfs/Blocking.cxx create mode 100644 src/lib/nfs/Blocking.hxx diff --git a/Makefile.am b/Makefile.am index 213fda2e7..90e2074fc 100644 --- a/Makefile.am +++ b/Makefile.am @@ -561,6 +561,7 @@ NFS_SOURCES = \ src/lib/nfs/Glue.cxx src/lib/nfs/Glue.hxx \ src/lib/nfs/Base.cxx src/lib/nfs/Base.hxx \ src/lib/nfs/FileReader.cxx src/lib/nfs/FileReader.hxx \ + src/lib/nfs/Blocking.cxx src/lib/nfs/Blocking.hxx \ src/lib/nfs/Domain.cxx src/lib/nfs/Domain.hxx if ENABLE_DATABASE diff --git a/src/lib/nfs/Blocking.cxx b/src/lib/nfs/Blocking.cxx new file mode 100644 index 000000000..5f769c408 --- /dev/null +++ b/src/lib/nfs/Blocking.cxx @@ -0,0 +1,84 @@ +/* + * Copyright (C) 2003-2014 The Music Player Daemon Project + * http://www.musicpd.org + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ + +#include "config.h" +#include "Blocking.hxx" +#include "Connection.hxx" +#include "event/Call.hxx" + +bool +BlockingNfsOperation::Run(Error &_error) +{ + /* subscribe to the connection, which will invoke either + OnNfsConnectionReady() or OnNfsConnectionFailed() */ + BlockingCall(connection.GetEventLoop(), + [this](){ connection.AddLease(*this); }); + + /* wait for completion */ + LockWaitFinished(); + + /* check for error */ + if (error.IsDefined()) { + _error = std::move(error); + return false; + } + + return true; +} + +void +BlockingNfsOperation::OnNfsConnectionReady() +{ + if (!Start(error)) { + connection.RemoveLease(*this); + LockSetFinished(); + } +} + +void +BlockingNfsOperation::OnNfsConnectionFailed(const Error &_error) +{ + error.Set(_error); + LockSetFinished(); +} + +void +BlockingNfsOperation::OnNfsConnectionDisconnected(const Error &_error) +{ + error.Set(_error); + LockSetFinished(); +} + +void +BlockingNfsOperation::OnNfsCallback(unsigned status, void *data) +{ + connection.RemoveLease(*this); + + HandleResult(status, data); + LockSetFinished(); +} + +void +BlockingNfsOperation::OnNfsError(Error &&_error) +{ + connection.RemoveLease(*this); + + error = std::move(_error); + LockSetFinished(); +} diff --git a/src/lib/nfs/Blocking.hxx b/src/lib/nfs/Blocking.hxx new file mode 100644 index 000000000..f8354822d --- /dev/null +++ b/src/lib/nfs/Blocking.hxx @@ -0,0 +1,85 @@ +/* + * Copyright (C) 2003-2014 The Music Player Daemon Project + * http://www.musicpd.org + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ + +#ifndef MPD_BLOCKING_NFS_CALLBACK_HXX +#define MPD_BLOCKING_NFS_CALLBACK_HXX + +#include "check.h" +#include "Callback.hxx" +#include "Lease.hxx" +#include "thread/Mutex.hxx" +#include "thread/Cond.hxx" +#include "util/Error.hxx" + +class NfsConnection; + +/** + * Utility class to implement a blocking NFS call using the libnfs + * async API. The actual method call is deferred to the #EventLoop + * thread, and method Run() waits for completion. + */ +class BlockingNfsOperation : protected NfsCallback, NfsLease { + Mutex mutex; + Cond cond; + + bool finished; + + Error error; + +protected: + NfsConnection &connection; + +public: + BlockingNfsOperation(NfsConnection &_connection) + :finished(false), connection(_connection) {} + + bool Run(Error &error); + +private: + void LockWaitFinished() { + const ScopeLock protect(mutex); + while (!finished) + cond.wait(mutex); + } + + /** + * Mark the operation as "finished" and wake up the waiting + * thread. + */ + void LockSetFinished() { + const ScopeLock protect(mutex); + finished = true; + cond.signal(); + } + + /* virtual methods from NfsLease */ + void OnNfsConnectionReady() final; + void OnNfsConnectionFailed(const Error &error) final; + void OnNfsConnectionDisconnected(const Error &error) final; + + /* virtual methods from NfsCallback */ + void OnNfsCallback(unsigned status, void *data) final; + void OnNfsError(Error &&error) final; + +protected: + virtual bool Start(Error &error) = 0; + virtual void HandleResult(unsigned status, void *data) = 0; +}; + +#endif diff --git a/src/storage/plugins/NfsStorage.cxx b/src/storage/plugins/NfsStorage.cxx index e28e41a67..8ddb14250 100644 --- a/src/storage/plugins/NfsStorage.cxx +++ b/src/storage/plugins/NfsStorage.cxx @@ -23,31 +23,64 @@ #include "storage/StorageInterface.hxx" #include "storage/FileInfo.hxx" #include "storage/MemoryDirectoryReader.hxx" +#include "lib/nfs/Blocking.hxx" #include "lib/nfs/Domain.hxx" #include "lib/nfs/Base.hxx" +#include "lib/nfs/Lease.hxx" +#include "lib/nfs/Connection.hxx" +#include "lib/nfs/Glue.hxx" #include "fs/AllocatedPath.hxx" #include "util/Error.hxx" #include "thread/Mutex.hxx" +#include "thread/Cond.hxx" +#include "event/Loop.hxx" +#include "event/Call.hxx" +#include "event/DeferredMonitor.hxx" +#include "event/TimeoutMonitor.hxx" extern "C" { #include #include } +#include + +#include #include #include -class NfsStorage final : public Storage { +class NfsStorage final + : public Storage, NfsLease, DeferredMonitor, TimeoutMonitor { + + enum class State { + INITIAL, CONNECTING, READY, DELAY, + }; + const std::string base; - nfs_context *const ctx; + const std::string server, export_name; + + NfsConnection *connection; + + Mutex mutex; + Cond cond; + State state; + Error last_error; public: - NfsStorage(const char *_base, nfs_context *_ctx) - :base(_base), ctx(_ctx) {} + NfsStorage(EventLoop &_loop, const char *_base, + std::string &&_server, std::string &&_export_name) + :DeferredMonitor(_loop), TimeoutMonitor(_loop), + base(_base), + server(std::move(_server)), + export_name(std::move(_export_name)), + state(State::INITIAL) { + nfs_init(); + } - virtual ~NfsStorage() { - nfs_destroy_context(ctx); + ~NfsStorage() { + BlockingCall(GetEventLoop(), [this](){ Disconnect(); }); + nfs_finish(); } /* virtual methods from class Storage */ @@ -60,6 +93,125 @@ public: std::string MapUTF8(const char *uri_utf8) const override; const char *MapToRelativeUTF8(const char *uri_utf8) const override; + + /* virtual methods from NfsLease */ + void OnNfsConnectionReady() final { + assert(state == State::CONNECTING); + + SetState(State::READY); + } + + void OnNfsConnectionFailed(gcc_unused const Error &error) final { + assert(state == State::CONNECTING); + + SetState(State::DELAY, error); + TimeoutMonitor::ScheduleSeconds(60); + } + + void OnNfsConnectionDisconnected(gcc_unused const Error &error) final { + assert(state == State::READY); + + SetState(State::DELAY, error); + TimeoutMonitor::ScheduleSeconds(5); + } + + /* virtual methods from DeferredMonitor */ + void RunDeferred() final { + if (state == State::INITIAL) + Connect(); + } + + /* virtual methods from TimeoutMonitor */ + void OnTimeout() final { + assert(state == State::DELAY); + + Connect(); + } + +private: + EventLoop &GetEventLoop() { + return DeferredMonitor::GetEventLoop(); + } + + void SetState(State _state) { + assert(GetEventLoop().IsInside()); + + const ScopeLock protect(mutex); + state = _state; + cond.broadcast(); + } + + void SetState(State _state, const Error &error) { + assert(GetEventLoop().IsInside()); + + const ScopeLock protect(mutex); + state = _state; + last_error.Set(error); + cond.broadcast(); + } + + void Connect() { + assert(state != State::READY); + assert(GetEventLoop().IsInside()); + + connection = &nfs_get_connection(server.c_str(), + export_name.c_str()); + connection->AddLease(*this); + + SetState(State::CONNECTING); + } + + void EnsureConnected() { + if (state != State::READY) + Connect(); + } + + bool WaitConnected(Error &error) { + const ScopeLock protect(mutex); + + while (true) { + switch (state) { + case State::INITIAL: + /* schedule connect */ + mutex.unlock(); + DeferredMonitor::Schedule(); + mutex.lock(); + break; + + case State::CONNECTING: + case State::READY: + return true; + + case State::DELAY: + assert(last_error.IsDefined()); + error.Set(last_error); + return false; + } + + cond.wait(mutex); + } + } + + void Disconnect() { + assert(GetEventLoop().IsInside()); + + switch (state) { + case State::INITIAL: + DeferredMonitor::Cancel(); + break; + + case State::CONNECTING: + case State::READY: + connection->RemoveLease(*this); + SetState(State::INITIAL); + break; + + case State::DELAY: + TimeoutMonitor::Cancel(); + SetState(State::INITIAL); + break; + } + } }; static std::string @@ -107,19 +259,24 @@ Copy(FileInfo &info, const struct stat &st) info.inode = st.st_ino; } -static bool -GetInfo(nfs_context *ctx, const char *path, FileInfo &info, Error &error) -{ - struct stat st; - int result = nfs_stat(ctx, path, &st); - if (result < 0) { - error.SetErrno(-result, "nfs_stat() failed"); - return false; +class NfsGetInfoOperation final : public BlockingNfsOperation { + const char *const path; + FileInfo &info; + +public: + NfsGetInfoOperation(NfsConnection &_connection, const char *_path, + FileInfo &_info) + :BlockingNfsOperation(_connection), path(_path), info(_info) {} + +protected: + bool Start(Error &_error) override { + return connection.Stat(path, *this, _error); } - Copy(info, st); - return true; -} + void HandleResult(gcc_unused unsigned status, void *data) override { + Copy(info, *(const struct stat *)data); + } +}; bool NfsStorage::GetInfo(const char *uri_utf8, gcc_unused bool follow, @@ -129,7 +286,11 @@ NfsStorage::GetInfo(const char *uri_utf8, gcc_unused bool follow, if (path.empty()) return false; - return ::GetInfo(ctx, path.c_str(), info, error); + if (!WaitConnected(error)) + return nullptr; + + NfsGetInfoOperation operation(*connection, path.c_str(), info); + return operation.Run(error); } gcc_pure @@ -164,24 +325,43 @@ Copy(FileInfo &info, const struct nfsdirent &ent) info.inode = ent.inode; } -StorageDirectoryReader * -NfsStorage::OpenDirectory(const char *uri_utf8, Error &error) -{ - const std::string path = UriToNfsPath(uri_utf8, error); - if (path.empty()) - return nullptr; - - nfsdir *dir; - int result = nfs_opendir(ctx, path.c_str(), &dir); - if (result < 0) { - error.SetErrno(-result, "nfs_opendir() failed"); - return nullptr; - } +class NfsListDirectoryOperation final : public BlockingNfsOperation { + const char *const path; MemoryStorageDirectoryReader::List entries; +public: + NfsListDirectoryOperation(NfsConnection &_connection, + const char *_path) + :BlockingNfsOperation(_connection), path(_path) {} + + StorageDirectoryReader *ToReader() { + return new MemoryStorageDirectoryReader(std::move(entries)); + } + +protected: + bool Start(Error &_error) override { + return connection.OpenDirectory(path, *this, _error); + } + + void HandleResult(gcc_unused unsigned status, void *data) override { + struct nfsdir *const dir = (struct nfsdir *)data; + + CollectEntries(dir); + connection.CloseDirectory(dir); + } + +private: + void CollectEntries(struct nfsdir *dir); +}; + +inline void +NfsListDirectoryOperation::CollectEntries(struct nfsdir *dir) +{ + assert(entries.empty()); + const struct nfsdirent *ent; - while ((ent = nfs_readdir(ctx, dir)) != nullptr) { + while ((ent = connection.ReadDirectory(dir)) != nullptr) { const Path name_fs = Path::FromFS(ent->name); if (SkipNameFS(name_fs.c_str())) continue; @@ -195,15 +375,27 @@ NfsStorage::OpenDirectory(const char *uri_utf8, Error &error) entries.emplace_front(std::move(name_utf8)); Copy(entries.front().info, *ent); } +} - nfs_closedir(ctx, dir); +StorageDirectoryReader * +NfsStorage::OpenDirectory(const char *uri_utf8, Error &error) +{ + const std::string path = UriToNfsPath(uri_utf8, error); + if (path.empty()) + return nullptr; - /* don't reverse the list - order does not matter */ - return new MemoryStorageDirectoryReader(std::move(entries)); + if (!WaitConnected(error)) + return nullptr; + + NfsListDirectoryOperation operation(*connection, path.c_str()); + if (!operation.Run(error)) + return nullptr; + + return operation.ToReader(); } static Storage * -CreateNfsStorageURI(gcc_unused EventLoop &event_loop, const char *base, +CreateNfsStorageURI(EventLoop &event_loop, const char *base, Error &error) { if (memcmp(base, "nfs://", 6) != 0) @@ -219,22 +411,9 @@ CreateNfsStorageURI(gcc_unused EventLoop &event_loop, const char *base, const std::string server(p, mount); - nfs_context *ctx = nfs_init_context(); - if (ctx == nullptr) { - error.Set(nfs_domain, "nfs_init_context() failed"); - return nullptr; - } - - int result = nfs_mount(ctx, server.c_str(), mount); - if (result < 0) { - nfs_destroy_context(ctx); - error.SetErrno(-result, "nfs_mount() failed"); - return nullptr; - } - nfs_set_base(server.c_str(), mount); - return new NfsStorage(base, ctx); + return new NfsStorage(event_loop, base, server.c_str(), mount); } const StoragePlugin nfs_storage_plugin = {