diff --git a/src/input/AsyncInputStream.cxx b/src/input/AsyncInputStream.cxx index 4c80e8e57..1c2388898 100644 --- a/src/input/AsyncInputStream.cxx +++ b/src/input/AsyncInputStream.cxx @@ -215,6 +215,17 @@ AsyncInputStream::Read(void *ptr, size_t read_size, Error &error) return nbytes; } +void +AsyncInputStream::CommitWriteBuffer(size_t nbytes) +{ + buffer.Append(nbytes); + + if (!IsReady()) + SetReady(); + else + cond.broadcast(); +} + void AsyncInputStream::AppendToBuffer(const void *data, size_t append_size) { diff --git a/src/input/AsyncInputStream.hxx b/src/input/AsyncInputStream.hxx index d4107e627..0d53e3c59 100644 --- a/src/input/AsyncInputStream.hxx +++ b/src/input/AsyncInputStream.hxx @@ -133,6 +133,12 @@ protected: return buffer.GetSpace(); } + CircularBuffer::Range PrepareWriteBuffer() { + return buffer.Write(); + } + + void CommitWriteBuffer(size_t nbytes); + /** * Append data to the buffer. The size must fit into the * buffer; see GetBufferSpace(). diff --git a/src/input/plugins/AlsaInputPlugin.cxx b/src/input/plugins/AlsaInputPlugin.cxx index 85b61ee10..a99f35e30 100644 --- a/src/input/plugins/AlsaInputPlugin.cxx +++ b/src/input/plugins/AlsaInputPlugin.cxx @@ -27,7 +27,7 @@ #include "config.h" #include "AlsaInputPlugin.hxx" #include "../InputPlugin.hxx" -#include "../InputStream.hxx" +#include "../AsyncInputStream.hxx" #include "util/Domain.hxx" #include "util/Error.hxx" #include "util/StringCompare.hxx" @@ -36,14 +36,10 @@ #include "Log.hxx" #include "event/MultiSocketMonitor.hxx" #include "event/DeferredMonitor.hxx" -#include "thread/Mutex.hxx" -#include "thread/Cond.hxx" #include "IOThread.hxx" #include -#include - #include #include @@ -56,6 +52,9 @@ static constexpr snd_pcm_format_t default_format = SND_PCM_FORMAT_S16; static constexpr int default_channels = 2; // stereo static constexpr unsigned int default_rate = 44100; // cd quality +static constexpr size_t ALSA_MAX_BUFFERED = default_rate * default_channels * 2; +static constexpr size_t ALSA_RESUME_AT = ALSA_MAX_BUFFERED / 2; + /** * This value should be the same as the read buffer size defined in * PcmDecoderPlugin.cxx:pcm_stream_decode(). @@ -66,18 +65,10 @@ static constexpr unsigned int default_rate = 44100; // cd quality static constexpr size_t read_buffer_size = 4096; class AlsaInputStream final - : public InputStream, + : public AsyncInputStream, MultiSocketMonitor, DeferredMonitor { snd_pcm_t *capture_handle; size_t frame_size; - int frames_to_read; - bool eof; - - /** - * Is somebody waiting for data? This is set by method - * Available(). - */ - std::atomic_bool waiting; ReusableArray pfd_buffer; @@ -85,12 +76,12 @@ public: AlsaInputStream(EventLoop &loop, const char *_uri, Mutex &_mutex, Cond &_cond, snd_pcm_t *_handle, int _frame_size) - :InputStream(_uri, _mutex, _cond), + :AsyncInputStream(_uri, _mutex, _cond, + ALSA_MAX_BUFFERED, ALSA_RESUME_AT), MultiSocketMonitor(loop), DeferredMonitor(loop), capture_handle(_handle), - frame_size(_frame_size), - eof(false) + frame_size(_frame_size) { assert(_uri != nullptr); assert(_handle != nullptr); @@ -101,8 +92,6 @@ public: SetMimeType("audio/x-mpd-cdda-pcm"); InputStream::SetReady(); - frames_to_read = read_buffer_size / frame_size; - snd_pcm_start(capture_handle); DeferredMonitor::Schedule(); @@ -112,34 +101,32 @@ public: snd_pcm_close(capture_handle); } - using DeferredMonitor::GetEventLoop; - static InputStream *Create(const char *uri, Mutex &mutex, Cond &cond, Error &error); - /* virtual methods from InputStream */ +protected: + /* virtual methods from AsyncInputStream */ + virtual void DoResume() override { + snd_pcm_resume(capture_handle); - bool IsEOF() override { - return eof; + InvalidateSockets(); } - bool IsAvailable() override { - if (snd_pcm_avail(capture_handle) > frames_to_read) - return true; - - if (!waiting.exchange(true)) - SafeInvalidateSockets(); - - return false; + virtual void DoSeek(gcc_unused offset_type new_offset) override { + /* unreachable because seekable==false */ + SeekDone(); } - size_t Read(void *ptr, size_t size, Error &error) override; - private: static snd_pcm_t *OpenDevice(const char *device, int rate, snd_pcm_format_t format, int channels, Error &error); + void Pause() { + AsyncInputStream::Pause(); + InvalidateSockets(); + } + int Recover(int err); void SafeInvalidateSockets() { @@ -183,31 +170,10 @@ AlsaInputStream::Create(const char *uri, Mutex &mutex, Cond &cond, handle, frame_size); } -size_t -AlsaInputStream::Read(void *ptr, size_t read_size, Error &error) -{ - assert(ptr != nullptr); - - int num_frames = read_size / frame_size; - int ret; - while ((ret = snd_pcm_readi(capture_handle, ptr, num_frames)) < 0) { - if (Recover(ret) < 0) { - eof = true; - error.Format(alsa_input_domain, - "PCM error - stream aborted"); - return 0; - } - } - - size_t nbytes = ret * frame_size; - offset += nbytes; - return nbytes; -} - int AlsaInputStream::PrepareSockets() { - if (!waiting) { + if (IsPaused()) { ClearSocketList(); return -1; } @@ -231,11 +197,30 @@ AlsaInputStream::PrepareSockets() void AlsaInputStream::DispatchSockets() { - waiting = false; - const ScopeLock protect(mutex); - /* wake up the thread that is waiting for more data */ - cond.broadcast(); + + auto w = PrepareWriteBuffer(); + const snd_pcm_uframes_t w_frames = w.size / frame_size; + if (w_frames == 0) { + /* buffer is full */ + Pause(); + return; + } + + snd_pcm_sframes_t n_frames; + while ((n_frames = snd_pcm_readi(capture_handle, + w.data, w_frames)) < 0) { + if (Recover(n_frames) < 0) { + Error error; + error.Format(alsa_input_domain, + "PCM error - stream aborted"); + PostponeError(std::move(error)); + return; + } + } + + size_t nbytes = n_frames * frame_size; + CommitWriteBuffer(nbytes); } inline int