input/alsa: rebase on AsyncInputStream

Use the snd_pcm_t only in the IOThread, and reuse code that is
well-known to work.
This commit is contained in:
Max Kellermann 2016-06-17 17:34:47 +02:00
parent 5d11759f7d
commit 95e53ac0a0
3 changed files with 62 additions and 60 deletions

View File

@ -215,6 +215,17 @@ AsyncInputStream::Read(void *ptr, size_t read_size, Error &error)
return nbytes; return nbytes;
} }
void
AsyncInputStream::CommitWriteBuffer(size_t nbytes)
{
buffer.Append(nbytes);
if (!IsReady())
SetReady();
else
cond.broadcast();
}
void void
AsyncInputStream::AppendToBuffer(const void *data, size_t append_size) AsyncInputStream::AppendToBuffer(const void *data, size_t append_size)
{ {

View File

@ -133,6 +133,12 @@ protected:
return buffer.GetSpace(); return buffer.GetSpace();
} }
CircularBuffer<uint8_t>::Range PrepareWriteBuffer() {
return buffer.Write();
}
void CommitWriteBuffer(size_t nbytes);
/** /**
* Append data to the buffer. The size must fit into the * Append data to the buffer. The size must fit into the
* buffer; see GetBufferSpace(). * buffer; see GetBufferSpace().

View File

@ -27,7 +27,7 @@
#include "config.h" #include "config.h"
#include "AlsaInputPlugin.hxx" #include "AlsaInputPlugin.hxx"
#include "../InputPlugin.hxx" #include "../InputPlugin.hxx"
#include "../InputStream.hxx" #include "../AsyncInputStream.hxx"
#include "util/Domain.hxx" #include "util/Domain.hxx"
#include "util/Error.hxx" #include "util/Error.hxx"
#include "util/StringCompare.hxx" #include "util/StringCompare.hxx"
@ -36,14 +36,10 @@
#include "Log.hxx" #include "Log.hxx"
#include "event/MultiSocketMonitor.hxx" #include "event/MultiSocketMonitor.hxx"
#include "event/DeferredMonitor.hxx" #include "event/DeferredMonitor.hxx"
#include "thread/Mutex.hxx"
#include "thread/Cond.hxx"
#include "IOThread.hxx" #include "IOThread.hxx"
#include <alsa/asoundlib.h> #include <alsa/asoundlib.h>
#include <atomic>
#include <assert.h> #include <assert.h>
#include <string.h> #include <string.h>
@ -56,6 +52,9 @@ static constexpr snd_pcm_format_t default_format = SND_PCM_FORMAT_S16;
static constexpr int default_channels = 2; // stereo static constexpr int default_channels = 2; // stereo
static constexpr unsigned int default_rate = 44100; // cd quality static constexpr unsigned int default_rate = 44100; // cd quality
static constexpr size_t ALSA_MAX_BUFFERED = default_rate * default_channels * 2;
static constexpr size_t ALSA_RESUME_AT = ALSA_MAX_BUFFERED / 2;
/** /**
* This value should be the same as the read buffer size defined in * This value should be the same as the read buffer size defined in
* PcmDecoderPlugin.cxx:pcm_stream_decode(). * PcmDecoderPlugin.cxx:pcm_stream_decode().
@ -66,18 +65,10 @@ static constexpr unsigned int default_rate = 44100; // cd quality
static constexpr size_t read_buffer_size = 4096; static constexpr size_t read_buffer_size = 4096;
class AlsaInputStream final class AlsaInputStream final
: public InputStream, : public AsyncInputStream,
MultiSocketMonitor, DeferredMonitor { MultiSocketMonitor, DeferredMonitor {
snd_pcm_t *capture_handle; snd_pcm_t *capture_handle;
size_t frame_size; size_t frame_size;
int frames_to_read;
bool eof;
/**
* Is somebody waiting for data? This is set by method
* Available().
*/
std::atomic_bool waiting;
ReusableArray<pollfd> pfd_buffer; ReusableArray<pollfd> pfd_buffer;
@ -85,12 +76,12 @@ public:
AlsaInputStream(EventLoop &loop, AlsaInputStream(EventLoop &loop,
const char *_uri, Mutex &_mutex, Cond &_cond, const char *_uri, Mutex &_mutex, Cond &_cond,
snd_pcm_t *_handle, int _frame_size) snd_pcm_t *_handle, int _frame_size)
:InputStream(_uri, _mutex, _cond), :AsyncInputStream(_uri, _mutex, _cond,
ALSA_MAX_BUFFERED, ALSA_RESUME_AT),
MultiSocketMonitor(loop), MultiSocketMonitor(loop),
DeferredMonitor(loop), DeferredMonitor(loop),
capture_handle(_handle), capture_handle(_handle),
frame_size(_frame_size), frame_size(_frame_size)
eof(false)
{ {
assert(_uri != nullptr); assert(_uri != nullptr);
assert(_handle != nullptr); assert(_handle != nullptr);
@ -101,8 +92,6 @@ public:
SetMimeType("audio/x-mpd-cdda-pcm"); SetMimeType("audio/x-mpd-cdda-pcm");
InputStream::SetReady(); InputStream::SetReady();
frames_to_read = read_buffer_size / frame_size;
snd_pcm_start(capture_handle); snd_pcm_start(capture_handle);
DeferredMonitor::Schedule(); DeferredMonitor::Schedule();
@ -112,34 +101,32 @@ public:
snd_pcm_close(capture_handle); snd_pcm_close(capture_handle);
} }
using DeferredMonitor::GetEventLoop;
static InputStream *Create(const char *uri, Mutex &mutex, Cond &cond, static InputStream *Create(const char *uri, Mutex &mutex, Cond &cond,
Error &error); Error &error);
/* virtual methods from InputStream */ protected:
/* virtual methods from AsyncInputStream */
virtual void DoResume() override {
snd_pcm_resume(capture_handle);
bool IsEOF() override { InvalidateSockets();
return eof;
} }
bool IsAvailable() override { virtual void DoSeek(gcc_unused offset_type new_offset) override {
if (snd_pcm_avail(capture_handle) > frames_to_read) /* unreachable because seekable==false */
return true; SeekDone();
if (!waiting.exchange(true))
SafeInvalidateSockets();
return false;
} }
size_t Read(void *ptr, size_t size, Error &error) override;
private: private:
static snd_pcm_t *OpenDevice(const char *device, int rate, static snd_pcm_t *OpenDevice(const char *device, int rate,
snd_pcm_format_t format, int channels, snd_pcm_format_t format, int channels,
Error &error); Error &error);
void Pause() {
AsyncInputStream::Pause();
InvalidateSockets();
}
int Recover(int err); int Recover(int err);
void SafeInvalidateSockets() { void SafeInvalidateSockets() {
@ -183,31 +170,10 @@ AlsaInputStream::Create(const char *uri, Mutex &mutex, Cond &cond,
handle, frame_size); handle, frame_size);
} }
size_t
AlsaInputStream::Read(void *ptr, size_t read_size, Error &error)
{
assert(ptr != nullptr);
int num_frames = read_size / frame_size;
int ret;
while ((ret = snd_pcm_readi(capture_handle, ptr, num_frames)) < 0) {
if (Recover(ret) < 0) {
eof = true;
error.Format(alsa_input_domain,
"PCM error - stream aborted");
return 0;
}
}
size_t nbytes = ret * frame_size;
offset += nbytes;
return nbytes;
}
int int
AlsaInputStream::PrepareSockets() AlsaInputStream::PrepareSockets()
{ {
if (!waiting) { if (IsPaused()) {
ClearSocketList(); ClearSocketList();
return -1; return -1;
} }
@ -231,11 +197,30 @@ AlsaInputStream::PrepareSockets()
void void
AlsaInputStream::DispatchSockets() AlsaInputStream::DispatchSockets()
{ {
waiting = false;
const ScopeLock protect(mutex); const ScopeLock protect(mutex);
/* wake up the thread that is waiting for more data */
cond.broadcast(); auto w = PrepareWriteBuffer();
const snd_pcm_uframes_t w_frames = w.size / frame_size;
if (w_frames == 0) {
/* buffer is full */
Pause();
return;
}
snd_pcm_sframes_t n_frames;
while ((n_frames = snd_pcm_readi(capture_handle,
w.data, w_frames)) < 0) {
if (Recover(n_frames) < 0) {
Error error;
error.Format(alsa_input_domain,
"PCM error - stream aborted");
PostponeError(std::move(error));
return;
}
}
size_t nbytes = n_frames * frame_size;
CommitWriteBuffer(nbytes);
} }
inline int inline int