input/nfs: use the asynchronous libnfs API

More robust and cancellable.
This commit is contained in:
Max Kellermann
2014-03-15 15:29:10 +01:00
parent 966c4244cb
commit c99559dbe9
13 changed files with 1542 additions and 84 deletions

View File

@@ -19,9 +19,12 @@
#include "config.h"
#include "NfsInputPlugin.hxx"
#include "../InputStream.hxx"
#include "../AsyncInputStream.hxx"
#include "../InputPlugin.hxx"
#include "lib/nfs/Domain.hxx"
#include "lib/nfs/Glue.hxx"
#include "lib/nfs/FileReader.hxx"
#include "util/HugeAllocator.hxx"
#include "util/StringUtil.hxx"
#include "util/Error.hxx"
@@ -33,69 +36,158 @@ extern "C" {
#include <sys/stat.h>
#include <fcntl.h>
class NfsInputStream final : public InputStream {
nfs_context *ctx;
nfsfh *fh;
/**
* Do not buffer more than this number of bytes. It should be a
* reasonable limit that doesn't make low-end machines suffer too
* much, but doesn't cause stuttering on high-latency lines.
*/
static const size_t NFS_MAX_BUFFERED = 512 * 1024;
/**
* Resume the stream at this number of bytes after it has been paused.
*/
static const size_t NFS_RESUME_AT = 384 * 1024;
class NfsInputStream final : public AsyncInputStream, NfsFileReader {
uint64_t next_offset;
public:
NfsInputStream(const char *_uri,
Mutex &_mutex, Cond &_cond,
nfs_context *_ctx, nfsfh *_fh,
InputStream::offset_type _size)
:InputStream(_uri, _mutex, _cond),
ctx(_ctx), fh(_fh) {
seekable = true;
size = _size;
SetReady();
void *_buffer)
:AsyncInputStream(_uri, _mutex, _cond,
_buffer, NFS_MAX_BUFFERED,
NFS_RESUME_AT) {}
virtual ~NfsInputStream() {
DeferClose();
}
~NfsInputStream() {
nfs_close(ctx, fh);
nfs_destroy_context(ctx);
bool Open(Error &error) {
assert(!IsReady());
return NfsFileReader::Open(GetURI(), error);
}
/* virtual methods from InputStream */
private:
bool DoRead();
bool IsEOF() override {
return offset >= size;
}
protected:
/* virtual methods from AsyncInputStream */
virtual void DoResume() override;
virtual void DoSeek(offset_type new_offset) override;
size_t Read(void *ptr, size_t size, Error &error) override;
bool Seek(offset_type offset, Error &error) override;
private:
/* virtual methods from NfsFileReader */
void OnNfsFileOpen(uint64_t size) override;
void OnNfsFileRead(const void *data, size_t size) override;
void OnNfsFileError(Error &&error) override;
};
size_t
NfsInputStream::Read(void *ptr, size_t read_size, Error &error)
bool
NfsInputStream::DoRead()
{
int nbytes = nfs_read(ctx, fh, read_size, (char *)ptr);
if (nbytes < 0) {
error.SetErrno(-nbytes, "nfs_read() failed");
nbytes = 0;
assert(NfsFileReader::IsIdle());
int64_t remaining = size - next_offset;
if (remaining <= 0)
return true;
if (IsBufferFull()) {
Pause();
return true;
}
return nbytes;
}
size_t nbytes = std::min<uint64_t>(remaining, 32768);
bool
NfsInputStream::Seek(offset_type new_offset, Error &error)
{
uint64_t current_offset;
int result = nfs_lseek(ctx, fh, new_offset, SEEK_SET,
&current_offset);
if (result < 0) {
error.SetErrno(-result, "smbc_lseek() failed");
mutex.unlock();
Error error;
bool success = NfsFileReader::Read(next_offset, nbytes, error);
mutex.lock();
if (!success) {
PostponeError(std::move(error));
return false;
}
offset = current_offset;
return true;
}
void
NfsInputStream::DoResume()
{
assert(NfsFileReader::IsIdle());
DoRead();
}
void
NfsInputStream::DoSeek(offset_type new_offset)
{
mutex.unlock();
NfsFileReader::CancelRead();
mutex.lock();
next_offset = offset = new_offset;
SeekDone();
DoRead();
}
void
NfsInputStream::OnNfsFileOpen(uint64_t _size)
{
const ScopeLock protect(mutex);
size = _size;
seekable = true;
next_offset = 0;
SetReady();
DoRead();
}
void
NfsInputStream::OnNfsFileRead(const void *data, size_t data_size)
{
const ScopeLock protect(mutex);
assert(!IsBufferFull());
assert(IsBufferFull() == (GetBufferSpace() == 0));
AppendToBuffer(data, data_size);
next_offset += data_size;
DoRead();
}
void
NfsInputStream::OnNfsFileError(Error &&error)
{
const ScopeLock protect(mutex);
postponed_error = std::move(error);
if (IsSeekPending())
SeekDone();
else if (!IsReady())
SetReady();
}
/*
* InputPlugin methods
*
*/
static InputPlugin::InitResult
input_nfs_init(const config_param &, Error &)
{
nfs_init();
return InputPlugin::InitResult::SUCCESS;
}
static void
input_nfs_finish()
{
nfs_finish();
}
static InputStream *
input_nfs_open(const char *uri,
Mutex &mutex, Cond &cond,
@@ -104,62 +196,24 @@ input_nfs_open(const char *uri,
if (!StringStartsWith(uri, "nfs://"))
return nullptr;
uri += 6;
const char *slash = strchr(uri, '/');
if (slash == nullptr) {
error.Set(nfs_domain, "Malformed nfs:// URI");
void *buffer = HugeAllocate(NFS_MAX_BUFFERED);
if (buffer == nullptr) {
error.Set(nfs_domain, "Out of memory");
return nullptr;
}
const std::string server(uri, slash);
uri = slash;
slash = strrchr(uri + 1, '/');
if (slash == nullptr || slash[1] == 0) {
error.Set(nfs_domain, "Malformed nfs:// URI");
NfsInputStream *is = new NfsInputStream(uri, mutex, cond, buffer);
if (!is->Open(error)) {
delete is;
return nullptr;
}
const std::string mount(uri, slash);
uri = slash;
nfs_context *ctx = nfs_init_context();
if (ctx == nullptr) {
error.Set(nfs_domain, "nfs_init_context() failed");
return nullptr;
}
int result = nfs_mount(ctx, server.c_str(), mount.c_str());
if (result < 0) {
nfs_destroy_context(ctx);
error.SetErrno(-result, "nfs_mount() failed");
return nullptr;
}
nfsfh *fh;
result = nfs_open(ctx, uri, O_RDONLY, &fh);
if (result < 0) {
nfs_destroy_context(ctx);
error.SetErrno(-result, "nfs_open() failed");
return nullptr;
}
struct stat st;
result = nfs_fstat(ctx, fh, &st);
if (result < 0) {
nfs_close(ctx, fh);
nfs_destroy_context(ctx);
error.SetErrno(-result, "nfs_fstat() failed");
return nullptr;
}
return new NfsInputStream(uri, mutex, cond, ctx, fh, st.st_size);
return is;
}
const InputPlugin input_plugin_nfs = {
"nfs",
nullptr,
nullptr,
input_nfs_init,
input_nfs_finish,
input_nfs_open,
};