input/mms: move blocking I/O to thread

This commit is contained in:
Max Kellermann 2014-03-16 11:14:16 +01:00
parent 88a0a48b03
commit 707d379b97
2 changed files with 51 additions and 60 deletions

1
NEWS
View File

@ -21,6 +21,7 @@ ver 0.19 (not yet released)
- read tags from songs in an archive
* input
- alsa: new input plugin
- mms: non-blocking I/O
- nfs: new input plugin
- smbclient: new input plugin
* filter

View File

@ -19,40 +19,56 @@
#include "config.h"
#include "MmsInputPlugin.hxx"
#include "../InputStream.hxx"
#include "../InputPlugin.hxx"
#include "input/ThreadInputStream.hxx"
#include "input/InputPlugin.hxx"
#include "util/StringUtil.hxx"
#include "util/Error.hxx"
#include "util/Domain.hxx"
#include <libmms/mmsx.h>
struct MmsInputStream {
InputStream base;
static constexpr size_t MMS_BUFFER_SIZE = 256 * 1024;
class MmsInputStream final : public ThreadInputStream {
mmsx_t *mms;
bool eof;
MmsInputStream(const char *uri,
Mutex &mutex, Cond &cond,
mmsx_t *_mms)
:base(input_plugin_mms, uri, mutex, cond),
mms(_mms), eof(false) {
/* XX is this correct? at least this selects the ffmpeg
decoder, which seems to work fine*/
base.mime = "audio/x-ms-wma";
base.ready = true;
public:
MmsInputStream(const char *uri, Mutex &mutex, Cond &cond)
:ThreadInputStream(input_plugin_mms, uri, mutex, cond,
MMS_BUFFER_SIZE) {
}
~MmsInputStream() {
protected:
virtual bool Open(gcc_unused Error &error) override;
virtual size_t Read(void *ptr, size_t size, Error &error) override;
virtual void Close() {
mmsx_close(mms);
}
};
static constexpr Domain mms_domain("mms");
bool
MmsInputStream::Open(Error &error)
{
Unlock();
mms = mmsx_connect(nullptr, nullptr, GetURI(), 128 * 1024);
if (mms == nullptr) {
Lock();
error.Set(mms_domain, "mmsx_connect() failed");
return false;
}
Lock();
/* TODO: is this correct? at least this selects the ffmpeg
decoder, which seems to work fine */
SetMimeType("audio/x-ms-wma");
return true;
}
static InputStream *
input_mms_open(const char *url,
Mutex &mutex, Cond &cond,
@ -64,51 +80,25 @@ input_mms_open(const char *url,
!StringStartsWith(url, "mmsu://"))
return nullptr;
const auto mms = mmsx_connect(nullptr, nullptr, url, 128 * 1024);
if (mms == nullptr) {
error.Set(mms_domain, "mmsx_connect() failed");
return nullptr;
}
auto m = new MmsInputStream(url, mutex, cond);
auto is = m->Start(error);
if (is == nullptr)
delete m;
auto m = new MmsInputStream(url, mutex, cond, mms);
return &m->base;
return is;
}
static size_t
input_mms_read(InputStream *is, void *ptr, size_t size,
Error &error)
size_t
MmsInputStream::Read(void *ptr, size_t size, Error &error)
{
MmsInputStream *m = (MmsInputStream *)is;
int ret;
ret = mmsx_read(nullptr, m->mms, (char *)ptr, size);
if (ret <= 0) {
if (ret < 0)
int nbytes = mmsx_read(nullptr, mms, (char *)ptr, size);
if (nbytes <= 0) {
if (nbytes < 0)
error.SetErrno("mmsx_read() failed");
m->eof = true;
return false;
return 0;
}
is->offset += ret;
return (size_t)ret;
}
static void
input_mms_close(InputStream *is)
{
MmsInputStream *m = (MmsInputStream *)is;
delete m;
}
static bool
input_mms_eof(InputStream *is)
{
MmsInputStream *m = (MmsInputStream *)is;
return m->eof;
return (size_t)nbytes;
}
const InputPlugin input_plugin_mms = {
@ -116,12 +106,12 @@ const InputPlugin input_plugin_mms = {
nullptr,
nullptr,
input_mms_open,
input_mms_close,
ThreadInputStream::Close,
ThreadInputStream::Check,
nullptr,
nullptr,
nullptr,
nullptr,
input_mms_read,
input_mms_eof,
ThreadInputStream::Available,
ThreadInputStream::Read,
ThreadInputStream::IsEOF,
nullptr,
};