storage/nfs: use the libnfs async API

Share the NFS connection with the NFS input plugin.
This commit is contained in:
Max Kellermann 2014-10-01 23:57:28 +02:00
parent 990809cc21
commit bb922d577d
4 changed files with 399 additions and 50 deletions

View File

@ -561,6 +561,7 @@ NFS_SOURCES = \
src/lib/nfs/Glue.cxx src/lib/nfs/Glue.hxx \ src/lib/nfs/Glue.cxx src/lib/nfs/Glue.hxx \
src/lib/nfs/Base.cxx src/lib/nfs/Base.hxx \ src/lib/nfs/Base.cxx src/lib/nfs/Base.hxx \
src/lib/nfs/FileReader.cxx src/lib/nfs/FileReader.hxx \ src/lib/nfs/FileReader.cxx src/lib/nfs/FileReader.hxx \
src/lib/nfs/Blocking.cxx src/lib/nfs/Blocking.hxx \
src/lib/nfs/Domain.cxx src/lib/nfs/Domain.hxx src/lib/nfs/Domain.cxx src/lib/nfs/Domain.hxx
if ENABLE_DATABASE if ENABLE_DATABASE

84
src/lib/nfs/Blocking.cxx Normal file
View File

@ -0,0 +1,84 @@
/*
* Copyright (C) 2003-2014 The Music Player Daemon Project
* http://www.musicpd.org
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 2 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License along
* with this program; if not, write to the Free Software Foundation, Inc.,
* 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*/
#include "config.h"
#include "Blocking.hxx"
#include "Connection.hxx"
#include "event/Call.hxx"
bool
BlockingNfsOperation::Run(Error &_error)
{
/* subscribe to the connection, which will invoke either
OnNfsConnectionReady() or OnNfsConnectionFailed() */
BlockingCall(connection.GetEventLoop(),
[this](){ connection.AddLease(*this); });
/* wait for completion */
LockWaitFinished();
/* check for error */
if (error.IsDefined()) {
_error = std::move(error);
return false;
}
return true;
}
void
BlockingNfsOperation::OnNfsConnectionReady()
{
if (!Start(error)) {
connection.RemoveLease(*this);
LockSetFinished();
}
}
void
BlockingNfsOperation::OnNfsConnectionFailed(const Error &_error)
{
error.Set(_error);
LockSetFinished();
}
void
BlockingNfsOperation::OnNfsConnectionDisconnected(const Error &_error)
{
error.Set(_error);
LockSetFinished();
}
void
BlockingNfsOperation::OnNfsCallback(unsigned status, void *data)
{
connection.RemoveLease(*this);
HandleResult(status, data);
LockSetFinished();
}
void
BlockingNfsOperation::OnNfsError(Error &&_error)
{
connection.RemoveLease(*this);
error = std::move(_error);
LockSetFinished();
}

85
src/lib/nfs/Blocking.hxx Normal file
View File

@ -0,0 +1,85 @@
/*
* Copyright (C) 2003-2014 The Music Player Daemon Project
* http://www.musicpd.org
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 2 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License along
* with this program; if not, write to the Free Software Foundation, Inc.,
* 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*/
#ifndef MPD_BLOCKING_NFS_CALLBACK_HXX
#define MPD_BLOCKING_NFS_CALLBACK_HXX
#include "check.h"
#include "Callback.hxx"
#include "Lease.hxx"
#include "thread/Mutex.hxx"
#include "thread/Cond.hxx"
#include "util/Error.hxx"
class NfsConnection;
/**
* Utility class to implement a blocking NFS call using the libnfs
* async API. The actual method call is deferred to the #EventLoop
* thread, and method Run() waits for completion.
*/
class BlockingNfsOperation : protected NfsCallback, NfsLease {
Mutex mutex;
Cond cond;
bool finished;
Error error;
protected:
NfsConnection &connection;
public:
BlockingNfsOperation(NfsConnection &_connection)
:finished(false), connection(_connection) {}
bool Run(Error &error);
private:
void LockWaitFinished() {
const ScopeLock protect(mutex);
while (!finished)
cond.wait(mutex);
}
/**
* Mark the operation as "finished" and wake up the waiting
* thread.
*/
void LockSetFinished() {
const ScopeLock protect(mutex);
finished = true;
cond.signal();
}
/* virtual methods from NfsLease */
void OnNfsConnectionReady() final;
void OnNfsConnectionFailed(const Error &error) final;
void OnNfsConnectionDisconnected(const Error &error) final;
/* virtual methods from NfsCallback */
void OnNfsCallback(unsigned status, void *data) final;
void OnNfsError(Error &&error) final;
protected:
virtual bool Start(Error &error) = 0;
virtual void HandleResult(unsigned status, void *data) = 0;
};
#endif

View File

@ -23,31 +23,64 @@
#include "storage/StorageInterface.hxx" #include "storage/StorageInterface.hxx"
#include "storage/FileInfo.hxx" #include "storage/FileInfo.hxx"
#include "storage/MemoryDirectoryReader.hxx" #include "storage/MemoryDirectoryReader.hxx"
#include "lib/nfs/Blocking.hxx"
#include "lib/nfs/Domain.hxx" #include "lib/nfs/Domain.hxx"
#include "lib/nfs/Base.hxx" #include "lib/nfs/Base.hxx"
#include "lib/nfs/Lease.hxx"
#include "lib/nfs/Connection.hxx"
#include "lib/nfs/Glue.hxx"
#include "fs/AllocatedPath.hxx" #include "fs/AllocatedPath.hxx"
#include "util/Error.hxx" #include "util/Error.hxx"
#include "thread/Mutex.hxx" #include "thread/Mutex.hxx"
#include "thread/Cond.hxx"
#include "event/Loop.hxx"
#include "event/Call.hxx"
#include "event/DeferredMonitor.hxx"
#include "event/TimeoutMonitor.hxx"
extern "C" { extern "C" {
#include <nfsc/libnfs.h> #include <nfsc/libnfs.h>
#include <nfsc/libnfs-raw-nfs.h> #include <nfsc/libnfs-raw-nfs.h>
} }
#include <string>
#include <assert.h>
#include <sys/stat.h> #include <sys/stat.h>
#include <fcntl.h> #include <fcntl.h>
class NfsStorage final : public Storage { class NfsStorage final
: public Storage, NfsLease, DeferredMonitor, TimeoutMonitor {
enum class State {
INITIAL, CONNECTING, READY, DELAY,
};
const std::string base; const std::string base;
nfs_context *const ctx; const std::string server, export_name;
NfsConnection *connection;
Mutex mutex;
Cond cond;
State state;
Error last_error;
public: public:
NfsStorage(const char *_base, nfs_context *_ctx) NfsStorage(EventLoop &_loop, const char *_base,
:base(_base), ctx(_ctx) {} std::string &&_server, std::string &&_export_name)
:DeferredMonitor(_loop), TimeoutMonitor(_loop),
base(_base),
server(std::move(_server)),
export_name(std::move(_export_name)),
state(State::INITIAL) {
nfs_init();
}
virtual ~NfsStorage() { ~NfsStorage() {
nfs_destroy_context(ctx); BlockingCall(GetEventLoop(), [this](){ Disconnect(); });
nfs_finish();
} }
/* virtual methods from class Storage */ /* virtual methods from class Storage */
@ -60,6 +93,125 @@ public:
std::string MapUTF8(const char *uri_utf8) const override; std::string MapUTF8(const char *uri_utf8) const override;
const char *MapToRelativeUTF8(const char *uri_utf8) const override; const char *MapToRelativeUTF8(const char *uri_utf8) const override;
/* virtual methods from NfsLease */
void OnNfsConnectionReady() final {
assert(state == State::CONNECTING);
SetState(State::READY);
}
void OnNfsConnectionFailed(gcc_unused const Error &error) final {
assert(state == State::CONNECTING);
SetState(State::DELAY, error);
TimeoutMonitor::ScheduleSeconds(60);
}
void OnNfsConnectionDisconnected(gcc_unused const Error &error) final {
assert(state == State::READY);
SetState(State::DELAY, error);
TimeoutMonitor::ScheduleSeconds(5);
}
/* virtual methods from DeferredMonitor */
void RunDeferred() final {
if (state == State::INITIAL)
Connect();
}
/* virtual methods from TimeoutMonitor */
void OnTimeout() final {
assert(state == State::DELAY);
Connect();
}
private:
EventLoop &GetEventLoop() {
return DeferredMonitor::GetEventLoop();
}
void SetState(State _state) {
assert(GetEventLoop().IsInside());
const ScopeLock protect(mutex);
state = _state;
cond.broadcast();
}
void SetState(State _state, const Error &error) {
assert(GetEventLoop().IsInside());
const ScopeLock protect(mutex);
state = _state;
last_error.Set(error);
cond.broadcast();
}
void Connect() {
assert(state != State::READY);
assert(GetEventLoop().IsInside());
connection = &nfs_get_connection(server.c_str(),
export_name.c_str());
connection->AddLease(*this);
SetState(State::CONNECTING);
}
void EnsureConnected() {
if (state != State::READY)
Connect();
}
bool WaitConnected(Error &error) {
const ScopeLock protect(mutex);
while (true) {
switch (state) {
case State::INITIAL:
/* schedule connect */
mutex.unlock();
DeferredMonitor::Schedule();
mutex.lock();
break;
case State::CONNECTING:
case State::READY:
return true;
case State::DELAY:
assert(last_error.IsDefined());
error.Set(last_error);
return false;
}
cond.wait(mutex);
}
}
void Disconnect() {
assert(GetEventLoop().IsInside());
switch (state) {
case State::INITIAL:
DeferredMonitor::Cancel();
break;
case State::CONNECTING:
case State::READY:
connection->RemoveLease(*this);
SetState(State::INITIAL);
break;
case State::DELAY:
TimeoutMonitor::Cancel();
SetState(State::INITIAL);
break;
}
}
}; };
static std::string static std::string
@ -107,19 +259,24 @@ Copy(FileInfo &info, const struct stat &st)
info.inode = st.st_ino; info.inode = st.st_ino;
} }
static bool class NfsGetInfoOperation final : public BlockingNfsOperation {
GetInfo(nfs_context *ctx, const char *path, FileInfo &info, Error &error) const char *const path;
{ FileInfo &info;
struct stat st;
int result = nfs_stat(ctx, path, &st); public:
if (result < 0) { NfsGetInfoOperation(NfsConnection &_connection, const char *_path,
error.SetErrno(-result, "nfs_stat() failed"); FileInfo &_info)
return false; :BlockingNfsOperation(_connection), path(_path), info(_info) {}
protected:
bool Start(Error &_error) override {
return connection.Stat(path, *this, _error);
} }
Copy(info, st); void HandleResult(gcc_unused unsigned status, void *data) override {
return true; Copy(info, *(const struct stat *)data);
} }
};
bool bool
NfsStorage::GetInfo(const char *uri_utf8, gcc_unused bool follow, NfsStorage::GetInfo(const char *uri_utf8, gcc_unused bool follow,
@ -129,7 +286,11 @@ NfsStorage::GetInfo(const char *uri_utf8, gcc_unused bool follow,
if (path.empty()) if (path.empty())
return false; return false;
return ::GetInfo(ctx, path.c_str(), info, error); if (!WaitConnected(error))
return nullptr;
NfsGetInfoOperation operation(*connection, path.c_str(), info);
return operation.Run(error);
} }
gcc_pure gcc_pure
@ -164,24 +325,43 @@ Copy(FileInfo &info, const struct nfsdirent &ent)
info.inode = ent.inode; info.inode = ent.inode;
} }
StorageDirectoryReader * class NfsListDirectoryOperation final : public BlockingNfsOperation {
NfsStorage::OpenDirectory(const char *uri_utf8, Error &error) const char *const path;
{
const std::string path = UriToNfsPath(uri_utf8, error);
if (path.empty())
return nullptr;
nfsdir *dir;
int result = nfs_opendir(ctx, path.c_str(), &dir);
if (result < 0) {
error.SetErrno(-result, "nfs_opendir() failed");
return nullptr;
}
MemoryStorageDirectoryReader::List entries; MemoryStorageDirectoryReader::List entries;
public:
NfsListDirectoryOperation(NfsConnection &_connection,
const char *_path)
:BlockingNfsOperation(_connection), path(_path) {}
StorageDirectoryReader *ToReader() {
return new MemoryStorageDirectoryReader(std::move(entries));
}
protected:
bool Start(Error &_error) override {
return connection.OpenDirectory(path, *this, _error);
}
void HandleResult(gcc_unused unsigned status, void *data) override {
struct nfsdir *const dir = (struct nfsdir *)data;
CollectEntries(dir);
connection.CloseDirectory(dir);
}
private:
void CollectEntries(struct nfsdir *dir);
};
inline void
NfsListDirectoryOperation::CollectEntries(struct nfsdir *dir)
{
assert(entries.empty());
const struct nfsdirent *ent; const struct nfsdirent *ent;
while ((ent = nfs_readdir(ctx, dir)) != nullptr) { while ((ent = connection.ReadDirectory(dir)) != nullptr) {
const Path name_fs = Path::FromFS(ent->name); const Path name_fs = Path::FromFS(ent->name);
if (SkipNameFS(name_fs.c_str())) if (SkipNameFS(name_fs.c_str()))
continue; continue;
@ -195,15 +375,27 @@ NfsStorage::OpenDirectory(const char *uri_utf8, Error &error)
entries.emplace_front(std::move(name_utf8)); entries.emplace_front(std::move(name_utf8));
Copy(entries.front().info, *ent); Copy(entries.front().info, *ent);
} }
}
nfs_closedir(ctx, dir); StorageDirectoryReader *
NfsStorage::OpenDirectory(const char *uri_utf8, Error &error)
{
const std::string path = UriToNfsPath(uri_utf8, error);
if (path.empty())
return nullptr;
/* don't reverse the list - order does not matter */ if (!WaitConnected(error))
return new MemoryStorageDirectoryReader(std::move(entries)); return nullptr;
NfsListDirectoryOperation operation(*connection, path.c_str());
if (!operation.Run(error))
return nullptr;
return operation.ToReader();
} }
static Storage * static Storage *
CreateNfsStorageURI(gcc_unused EventLoop &event_loop, const char *base, CreateNfsStorageURI(EventLoop &event_loop, const char *base,
Error &error) Error &error)
{ {
if (memcmp(base, "nfs://", 6) != 0) if (memcmp(base, "nfs://", 6) != 0)
@ -219,22 +411,9 @@ CreateNfsStorageURI(gcc_unused EventLoop &event_loop, const char *base,
const std::string server(p, mount); const std::string server(p, mount);
nfs_context *ctx = nfs_init_context();
if (ctx == nullptr) {
error.Set(nfs_domain, "nfs_init_context() failed");
return nullptr;
}
int result = nfs_mount(ctx, server.c_str(), mount);
if (result < 0) {
nfs_destroy_context(ctx);
error.SetErrno(-result, "nfs_mount() failed");
return nullptr;
}
nfs_set_base(server.c_str(), mount); nfs_set_base(server.c_str(), mount);
return new NfsStorage(base, ctx); return new NfsStorage(event_loop, base, server.c_str(), mount);
} }
const StoragePlugin nfs_storage_plugin = { const StoragePlugin nfs_storage_plugin = {