diff --git a/Makefile.am b/Makefile.am index 3df7fa2f4..0387d0ddd 100644 --- a/Makefile.am +++ b/Makefile.am @@ -1413,6 +1413,7 @@ libmixer_plugins_a_CPPFLAGS = $(AM_CPPFLAGS) \ if ENABLE_ALSA liboutput_plugins_a_SOURCES += \ + $(ALSA_SOURCES) \ src/output/plugins/AlsaOutputPlugin.cxx \ src/output/plugins/AlsaOutputPlugin.hxx libmixer_plugins_a_SOURCES += \ diff --git a/NEWS b/NEWS index a5bad06ad..a412ac5a2 100644 --- a/NEWS +++ b/NEWS @@ -2,6 +2,8 @@ ver 0.21 (not yet released) * protocol - "tagtypes" can be used to hide tags - "find" and "search" can sort +* output + - alsa: non-blocking mode ver 0.20.5 (not yet released) * tags diff --git a/src/output/plugins/AlsaOutputPlugin.cxx b/src/output/plugins/AlsaOutputPlugin.cxx index 74961a224..1cb364c22 100644 --- a/src/output/plugins/AlsaOutputPlugin.cxx +++ b/src/output/plugins/AlsaOutputPlugin.cxx @@ -19,6 +19,7 @@ #include "config.h" #include "AlsaOutputPlugin.hxx" +#include "lib/alsa/NonBlock.hxx" #include "../OutputAPI.hxx" #include "../Wrapper.hxx" #include "mixer/MixerList.hxx" @@ -28,10 +29,16 @@ #include "util/RuntimeError.hxx" #include "util/Domain.hxx" #include "util/ConstBuffer.hxx" +#include "event/MultiSocketMonitor.hxx" +#include "event/DeferredMonitor.hxx" +#include "event/Call.hxx" +#include "IOThread.hxx" #include "Log.hxx" #include +#include + #include #if SND_LIB_VERSION >= 0x1001c @@ -50,7 +57,9 @@ static constexpr unsigned MPD_ALSA_BUFFER_TIME_US = 500000; static constexpr unsigned MPD_ALSA_RETRY_NR = 5; -class AlsaOutput { +class AlsaOutput final + : MultiSocketMonitor, DeferredMonitor { + friend struct AudioOutputWrapper; AudioOutput base; @@ -100,9 +109,10 @@ class AlsaOutput { snd_pcm_uframes_t period_frames; /** - * The number of frames written in the current period. + * After Open(), has this output been activated by a Play() + * command? */ - snd_pcm_uframes_t period_position; + bool active; /** * Do we need to call snd_pcm_prepare() before the next write? @@ -116,6 +126,8 @@ class AlsaOutput { */ bool must_prepare; + bool drain; + /** * This buffer gets allocated after opening the ALSA device. * It contains silence samples, enough to fill one period (see @@ -123,8 +135,137 @@ class AlsaOutput { */ uint8_t *silence; + /** + * For PrepareAlsaPcmSockets(). + */ + ReusableArray pfd_buffer; + + /** + * For copying data from OutputThread to IOThread. + */ + boost::lockfree::spsc_queue *ring_buffer; + + class PeriodBuffer { + size_t capacity, head, tail; + + uint8_t *buffer; + + public: + PeriodBuffer() = default; + PeriodBuffer(const PeriodBuffer &) = delete; + PeriodBuffer &operator=(const PeriodBuffer &) = delete; + + void Allocate(size_t n_frames, size_t frame_size) { + capacity = n_frames * frame_size; + + /* reserve space for one more (partial) frame, + to be able to fill the buffer with silence, + after moving an unfinished frame to the + end */ + buffer = new uint8_t[capacity + frame_size - 1]; + head = tail = 0; + } + + void Free() { + delete[] buffer; + } + + bool IsEmpty() const { + return head == tail; + } + + bool IsFull() const { + return tail >= capacity; + } + + uint8_t *GetTail() { + return buffer + tail; + } + + size_t GetSpaceBytes() const { + assert(tail <= capacity); + + return capacity - tail; + } + + void AppendBytes(size_t n) { + assert(n <= capacity); + assert(tail <= capacity - n); + + tail += n; + } + + void FillWithSilence(const uint8_t *_silence, + const size_t frame_size) { + size_t partial_frame = tail % frame_size; + auto *dest = GetTail() - partial_frame; + + /* move the partial frame to the end */ + std::copy(dest, GetTail(), buffer + capacity); + + size_t silence_size = capacity - tail - partial_frame; + std::copy_n(_silence, silence_size, dest); + + tail = capacity + partial_frame; + } + + const uint8_t *GetHead() const { + return buffer + head; + } + + snd_pcm_uframes_t GetFrames(size_t frame_size) const { + return (tail - head) / frame_size; + } + + void ConsumeBytes(size_t n) { + head += n; + + assert(head <= capacity); + + if (head >= capacity) { + tail -= head; + /* copy the partial frame (if any) + back to the beginning */ + std::copy_n(GetHead(), tail, buffer); + head = 0; + } + } + + void ConsumeFrames(snd_pcm_uframes_t n, size_t frame_size) { + ConsumeBytes(n * frame_size); + } + + snd_pcm_uframes_t GetPeriodPosition(size_t frame_size) const { + return head / frame_size; + } + + void Rewind() { + head = 0; + } + + void Clear() { + head = tail = 0; + } + }; + + PeriodBuffer period_buffer; + + /** + * Protects #cond, #error, #drain. + */ + mutable Mutex mutex; + + /** + * Used to wait when #ring_buffer is full. It will be + * signalled each time data is popped from the #ring_buffer, + * making space for more data. + */ + Cond cond; + + std::exception_ptr error; + public: - AlsaOutput(const ConfigBlock &block); + AlsaOutput(EventLoop &loop, const ConfigBlock &block); ~AlsaOutput() { /* free libasound's config cache */ @@ -145,7 +286,6 @@ public: void Open(AudioFormat &audio_format); void Close(); - size_t PlayRaw(ConstBuffer data); size_t Play(const void *chunk, size_t size); void Drain(); void Cancel(); @@ -166,21 +306,102 @@ private: void SetupOrDop(AudioFormat &audio_format, PcmExport::Params ¶ms); + /** + * Activate the output by registering the sockets in the + * #EventLoop. Before calling this, filling the ring buffer + * has no effect; nothing will be played, and no code will be + * run on #EventLoop's thread. + */ + void Activate() { + if (active) + return; + + active = true; + DeferredMonitor::Schedule(); + } + + /** + * Wrapper for Activate() which unlocks our mutex. Call this + * if you're holding the mutex. + */ + void UnlockActivate() { + if (active) + return; + + const ScopeUnlock unlock(mutex); + Activate(); + } + + void ClearRingBuffer() { + std::array buffer; + while (ring_buffer->pop(&buffer.front(), buffer.size())) {} + } + int Recover(int err); /** - * Write silence to the ALSA device. + * Drain all buffers. To be run in #EventLoop's thread. + * + * @return true if draining is complete, false if this method + * needs to be called again later */ - void WriteSilence(snd_pcm_uframes_t nframes) { - snd_pcm_writei(pcm, silence, nframes); + bool DrainInternal(); + + /** + * Stop playback immediately, dropping all buffers. To be run + * in #EventLoop's thread. + */ + void CancelInternal(); + + void CopyRingToPeriodBuffer() { + if (period_buffer.IsFull()) + return; + + size_t nbytes = ring_buffer->pop(period_buffer.GetTail(), + period_buffer.GetSpaceBytes()); + if (nbytes == 0) + return; + + period_buffer.AppendBytes(nbytes); + + const std::lock_guard lock(mutex); + /* notify the OutputThread that there is now + room in ring_buffer */ + cond.signal(); } + snd_pcm_sframes_t WriteFromPeriodBuffer() { + assert(!period_buffer.IsEmpty()); + + auto frames_written = snd_pcm_writei(pcm, period_buffer.GetHead(), + period_buffer.GetFrames(out_frame_size)); + if (frames_written > 0) + period_buffer.ConsumeFrames(frames_written, + out_frame_size); + + return frames_written; + } + + bool LockHasError() const { + const std::lock_guard lock(mutex); + return !!error; + } + + /* virtual methods from class DeferredMonitor */ + virtual void RunDeferred() override { + InvalidateSockets(); + } + + /* virtual methods from class MultiSocketMonitor */ + virtual std::chrono::steady_clock::duration PrepareSockets() override; + virtual void DispatchSockets() override; }; static constexpr Domain alsa_output_domain("alsa_output"); -AlsaOutput::AlsaOutput(const ConfigBlock &block) - :base(alsa_output_plugin, block), +AlsaOutput::AlsaOutput(EventLoop &loop, const ConfigBlock &block) + :MultiSocketMonitor(loop), DeferredMonitor(loop), + base(alsa_output_plugin, block), device(block.GetBlockValue("device", "")), #ifdef ENABLE_DSD dop(block.GetBlockValue("dop", false) || @@ -210,7 +431,7 @@ AlsaOutput::AlsaOutput(const ConfigBlock &block) inline AlsaOutput * AlsaOutput::Create(EventLoop &, const ConfigBlock &block) { - return new AlsaOutput(block); + return new AlsaOutput(io_thread_get(), block); } inline void @@ -692,7 +913,6 @@ AlsaOutput::Setup(AudioFormat &audio_format, alsa_period_size = 1; period_frames = alsa_period_size; - period_position = 0; silence = new uint8_t[snd_pcm_frames_to_bytes(pcm, alsa_period_size)]; snd_pcm_format_set_silence(format, silence, @@ -793,6 +1013,8 @@ AlsaOutput::Open(AudioFormat &audio_format) GetDevice())); } + snd_pcm_nonblock(pcm, 1); + #ifdef ENABLE_DSD if (params.dop) FormatDebug(alsa_output_domain, "DoP (DSD over PCM) enabled"); @@ -805,6 +1027,17 @@ AlsaOutput::Open(AudioFormat &audio_format) in_frame_size = audio_format.GetFrameSize(); out_frame_size = pcm_export->GetFrameSize(audio_format); + drain = false; + + size_t period_size = period_frames * out_frame_size; + ring_buffer = new boost::lockfree::spsc_queue(period_size * 4); + + /* reserve space for one more (partial) frame, to be able to + fill the buffer with silence, after moving an unfinished + frame to the end */ + period_buffer.Allocate(period_frames, out_frame_size); + + active = false; must_prepare = false; } @@ -836,7 +1069,7 @@ AlsaOutput::Recover(int err) case SND_PCM_STATE_OPEN: case SND_PCM_STATE_SETUP: case SND_PCM_STATE_XRUN: - period_position = 0; + period_buffer.Rewind(); err = snd_pcm_prepare(pcm); break; case SND_PCM_STATE_DISCONNECTED: @@ -852,88 +1085,107 @@ AlsaOutput::Recover(int err) return err; } -inline void -AlsaOutput::Drain() +inline bool +AlsaOutput::DrainInternal() { - if (snd_pcm_state(pcm) != SND_PCM_STATE_RUNNING) - return; - - if (period_position > 0) { - /* generate some silence to finish the partial - period */ - snd_pcm_uframes_t nframes = - period_frames - period_position; - WriteSilence(nframes); + if (snd_pcm_state(pcm) != SND_PCM_STATE_RUNNING) { + CancelInternal(); + return true; } - snd_pcm_drain(pcm); + /* drain ring_buffer */ + CopyRingToPeriodBuffer(); - period_position = 0; + auto period_position = period_buffer.GetPeriodPosition(out_frame_size); + if (period_position > 0) + /* generate some silence to finish the partial + period */ + period_buffer.FillWithSilence(silence, out_frame_size); + + /* drain period_buffer */ + if (!period_buffer.IsEmpty()) { + auto frames_written = WriteFromPeriodBuffer(); + if (frames_written < 0 && errno != EAGAIN) { + CancelInternal(); + return true; + } + + if (!period_buffer.IsEmpty()) + /* need to call WriteFromPeriodBuffer() again + in the next iteration, so don't finish the + drain just yet */ + return false; + } + + /* .. and finally drain the ALSA hardware buffer */ + return snd_pcm_drain(pcm) != -EAGAIN; } inline void -AlsaOutput::Cancel() +AlsaOutput::Drain() +{ + const std::lock_guard lock(mutex); + + drain = true; + + UnlockActivate(); + + while (drain && !error) + cond.wait(mutex); +} + +inline void +AlsaOutput::CancelInternal() { - period_position = 0; must_prepare = true; snd_pcm_drop(pcm); pcm_export->Reset(); + period_buffer.Clear(); + ClearRingBuffer(); +} + +inline void +AlsaOutput::Cancel() +{ + if (!active) { + /* early cancel, quick code path without thread + synchronization */ + + pcm_export->Reset(); + assert(period_buffer.IsEmpty()); + ClearRingBuffer(); + + return; + } + + BlockingCall(MultiSocketMonitor::GetEventLoop(), [this](){ + CancelInternal(); + }); } inline void AlsaOutput::Close() { + /* make sure the I/O thread isn't inside DispatchSockets() */ + BlockingCall(MultiSocketMonitor::GetEventLoop(), [this](){ + MultiSocketMonitor::Reset(); + DeferredMonitor::Cancel(); + }); + + period_buffer.Free(); + delete ring_buffer; snd_pcm_close(pcm); delete[] silence; } -inline size_t -AlsaOutput::PlayRaw(ConstBuffer data) -{ - if (data.IsEmpty()) - return 0; - - assert(data.size % out_frame_size == 0); - - const size_t n_frames = data.size / out_frame_size; - assert(n_frames > 0); - - while (true) { - const auto frames_written = snd_pcm_writei(pcm, data.data, - n_frames); - if (frames_written > 0) { - period_position = (period_position + frames_written) - % period_frames; - - return frames_written * out_frame_size; - } - - if (frames_written < 0 && frames_written != -EAGAIN && - frames_written != -EINTR && - Recover(frames_written) < 0) - throw FormatRuntimeError("snd_pcm_writei() failed: %s", - snd_strerror(-frames_written)); - } - -} - inline size_t AlsaOutput::Play(const void *chunk, size_t size) { assert(size > 0); assert(size % in_frame_size == 0); - if (must_prepare) { - must_prepare = false; - - int err = snd_pcm_prepare(pcm); - if (err < 0) - throw FormatRuntimeError("snd_pcm_prepare() failed: %s", - snd_strerror(-err)); - } - const auto e = pcm_export->Export({chunk, size}); if (e.size == 0) /* the DoP (DSD over PCM) filter converts two frames @@ -944,8 +1196,96 @@ AlsaOutput::Play(const void *chunk, size_t size) been played */ return size; - const size_t bytes_written = PlayRaw(e); - return pcm_export->CalcSourceSize(bytes_written); + const std::lock_guard lock(mutex); + + while (true) { + if (error) + std::rethrow_exception(error); + + size_t bytes_written = ring_buffer->push((const uint8_t *)chunk, + size); + if (bytes_written > 0) + return pcm_export->CalcSourceSize(bytes_written); + + /* now that the ring_buffer is full, we can activate + the socket handlers to trigger the first + snd_pcm_writei() */ + UnlockActivate(); + + /* wait for the DispatchSockets() to make room in the + ring_buffer */ + cond.wait(mutex); + } +} + +std::chrono::steady_clock::duration +AlsaOutput::PrepareSockets() +{ + if (LockHasError()) { + ClearSocketList(); + return std::chrono::steady_clock::duration(-1); + } + + return PrepareAlsaPcmSockets(*this, pcm, pfd_buffer); +} + +void +AlsaOutput::DispatchSockets() +try { + { + const std::lock_guard lock(mutex); + if (drain) { + { + ScopeUnlock unlock(mutex); + if (!DrainInternal()) + return; + + MultiSocketMonitor::InvalidateSockets(); + } + + drain = false; + cond.signal(); + return; + } + } + + if (must_prepare) { + must_prepare = false; + + int err = snd_pcm_prepare(pcm); + if (err < 0) + throw FormatRuntimeError("snd_pcm_prepare() failed: %s", + snd_strerror(-err)); + } + + CopyRingToPeriodBuffer(); + + if (period_buffer.IsEmpty()) + /* insert some silence if the buffer has not enough + data yet, to avoid ALSA xrun */ + period_buffer.FillWithSilence(silence, out_frame_size); + + auto frames_written = WriteFromPeriodBuffer(); + if (frames_written < 0) { + if (frames_written == -EAGAIN || frames_written == -EINTR) + /* try again in the next DispatchSockets() + call which is still scheduled */ + return; + + if (Recover(frames_written) < 0) + throw FormatRuntimeError("snd_pcm_writei() failed: %s", + snd_strerror(-frames_written)); + + /* recovered; try again in the next DispatchSockets() + call */ + return; + } +} catch (const std::runtime_error &) { + MultiSocketMonitor::Reset(); + + const std::lock_guard lock(mutex); + error = std::current_exception(); + cond.signal(); } typedef AudioOutputWrapper Wrapper;