From b1c7649edb80285c113f4710dd4c19d8f7c1aef7 Mon Sep 17 00:00:00 2001 From: Max Kellermann Date: Tue, 24 Jan 2017 23:10:35 +0100 Subject: [PATCH] output/alsa: non-blocking mode Use SND_PCM_NONBLOCK, and perform all snd_pcm_writei() calls in the IOThread. Use a lockless queue to copy data from the OutputThread to the IOThread. This rather major change aims to improve MPD's internal latency. All waits are now under MPD's control, instead of blocking inside libasound2. As a side effect, an output's filter is now decoupled from the actual device I/O, which solves a major latency problem with the conversion filter on slow CPUs and small period buffers. See: https://bugs.musicpd.org/view.php?id=3900 --- Makefile.am | 1 + NEWS | 2 + src/output/plugins/AlsaOutputPlugin.cxx | 478 ++++++++++++++++++++---- 3 files changed, 412 insertions(+), 69 deletions(-) diff --git a/Makefile.am b/Makefile.am index 3df7fa2f4..0387d0ddd 100644 --- a/Makefile.am +++ b/Makefile.am @@ -1413,6 +1413,7 @@ libmixer_plugins_a_CPPFLAGS = $(AM_CPPFLAGS) \ if ENABLE_ALSA liboutput_plugins_a_SOURCES += \ + $(ALSA_SOURCES) \ src/output/plugins/AlsaOutputPlugin.cxx \ src/output/plugins/AlsaOutputPlugin.hxx libmixer_plugins_a_SOURCES += \ diff --git a/NEWS b/NEWS index a5bad06ad..a412ac5a2 100644 --- a/NEWS +++ b/NEWS @@ -2,6 +2,8 @@ ver 0.21 (not yet released) * protocol - "tagtypes" can be used to hide tags - "find" and "search" can sort +* output + - alsa: non-blocking mode ver 0.20.5 (not yet released) * tags diff --git a/src/output/plugins/AlsaOutputPlugin.cxx b/src/output/plugins/AlsaOutputPlugin.cxx index 74961a224..1cb364c22 100644 --- a/src/output/plugins/AlsaOutputPlugin.cxx +++ b/src/output/plugins/AlsaOutputPlugin.cxx @@ -19,6 +19,7 @@ #include "config.h" #include "AlsaOutputPlugin.hxx" +#include "lib/alsa/NonBlock.hxx" #include "../OutputAPI.hxx" #include "../Wrapper.hxx" #include "mixer/MixerList.hxx" @@ -28,10 +29,16 @@ #include "util/RuntimeError.hxx" #include "util/Domain.hxx" #include "util/ConstBuffer.hxx" +#include "event/MultiSocketMonitor.hxx" +#include "event/DeferredMonitor.hxx" +#include "event/Call.hxx" +#include "IOThread.hxx" #include "Log.hxx" #include +#include + #include #if SND_LIB_VERSION >= 0x1001c @@ -50,7 +57,9 @@ static constexpr unsigned MPD_ALSA_BUFFER_TIME_US = 500000; static constexpr unsigned MPD_ALSA_RETRY_NR = 5; -class AlsaOutput { +class AlsaOutput final + : MultiSocketMonitor, DeferredMonitor { + friend struct AudioOutputWrapper; AudioOutput base; @@ -100,9 +109,10 @@ class AlsaOutput { snd_pcm_uframes_t period_frames; /** - * The number of frames written in the current period. + * After Open(), has this output been activated by a Play() + * command? */ - snd_pcm_uframes_t period_position; + bool active; /** * Do we need to call snd_pcm_prepare() before the next write? @@ -116,6 +126,8 @@ class AlsaOutput { */ bool must_prepare; + bool drain; + /** * This buffer gets allocated after opening the ALSA device. * It contains silence samples, enough to fill one period (see @@ -123,8 +135,137 @@ class AlsaOutput { */ uint8_t *silence; + /** + * For PrepareAlsaPcmSockets(). + */ + ReusableArray pfd_buffer; + + /** + * For copying data from OutputThread to IOThread. + */ + boost::lockfree::spsc_queue *ring_buffer; + + class PeriodBuffer { + size_t capacity, head, tail; + + uint8_t *buffer; + + public: + PeriodBuffer() = default; + PeriodBuffer(const PeriodBuffer &) = delete; + PeriodBuffer &operator=(const PeriodBuffer &) = delete; + + void Allocate(size_t n_frames, size_t frame_size) { + capacity = n_frames * frame_size; + + /* reserve space for one more (partial) frame, + to be able to fill the buffer with silence, + after moving an unfinished frame to the + end */ + buffer = new uint8_t[capacity + frame_size - 1]; + head = tail = 0; + } + + void Free() { + delete[] buffer; + } + + bool IsEmpty() const { + return head == tail; + } + + bool IsFull() const { + return tail >= capacity; + } + + uint8_t *GetTail() { + return buffer + tail; + } + + size_t GetSpaceBytes() const { + assert(tail <= capacity); + + return capacity - tail; + } + + void AppendBytes(size_t n) { + assert(n <= capacity); + assert(tail <= capacity - n); + + tail += n; + } + + void FillWithSilence(const uint8_t *_silence, + const size_t frame_size) { + size_t partial_frame = tail % frame_size; + auto *dest = GetTail() - partial_frame; + + /* move the partial frame to the end */ + std::copy(dest, GetTail(), buffer + capacity); + + size_t silence_size = capacity - tail - partial_frame; + std::copy_n(_silence, silence_size, dest); + + tail = capacity + partial_frame; + } + + const uint8_t *GetHead() const { + return buffer + head; + } + + snd_pcm_uframes_t GetFrames(size_t frame_size) const { + return (tail - head) / frame_size; + } + + void ConsumeBytes(size_t n) { + head += n; + + assert(head <= capacity); + + if (head >= capacity) { + tail -= head; + /* copy the partial frame (if any) + back to the beginning */ + std::copy_n(GetHead(), tail, buffer); + head = 0; + } + } + + void ConsumeFrames(snd_pcm_uframes_t n, size_t frame_size) { + ConsumeBytes(n * frame_size); + } + + snd_pcm_uframes_t GetPeriodPosition(size_t frame_size) const { + return head / frame_size; + } + + void Rewind() { + head = 0; + } + + void Clear() { + head = tail = 0; + } + }; + + PeriodBuffer period_buffer; + + /** + * Protects #cond, #error, #drain. + */ + mutable Mutex mutex; + + /** + * Used to wait when #ring_buffer is full. It will be + * signalled each time data is popped from the #ring_buffer, + * making space for more data. + */ + Cond cond; + + std::exception_ptr error; + public: - AlsaOutput(const ConfigBlock &block); + AlsaOutput(EventLoop &loop, const ConfigBlock &block); ~AlsaOutput() { /* free libasound's config cache */ @@ -145,7 +286,6 @@ public: void Open(AudioFormat &audio_format); void Close(); - size_t PlayRaw(ConstBuffer data); size_t Play(const void *chunk, size_t size); void Drain(); void Cancel(); @@ -166,21 +306,102 @@ private: void SetupOrDop(AudioFormat &audio_format, PcmExport::Params ¶ms); + /** + * Activate the output by registering the sockets in the + * #EventLoop. Before calling this, filling the ring buffer + * has no effect; nothing will be played, and no code will be + * run on #EventLoop's thread. + */ + void Activate() { + if (active) + return; + + active = true; + DeferredMonitor::Schedule(); + } + + /** + * Wrapper for Activate() which unlocks our mutex. Call this + * if you're holding the mutex. + */ + void UnlockActivate() { + if (active) + return; + + const ScopeUnlock unlock(mutex); + Activate(); + } + + void ClearRingBuffer() { + std::array buffer; + while (ring_buffer->pop(&buffer.front(), buffer.size())) {} + } + int Recover(int err); /** - * Write silence to the ALSA device. + * Drain all buffers. To be run in #EventLoop's thread. + * + * @return true if draining is complete, false if this method + * needs to be called again later */ - void WriteSilence(snd_pcm_uframes_t nframes) { - snd_pcm_writei(pcm, silence, nframes); + bool DrainInternal(); + + /** + * Stop playback immediately, dropping all buffers. To be run + * in #EventLoop's thread. + */ + void CancelInternal(); + + void CopyRingToPeriodBuffer() { + if (period_buffer.IsFull()) + return; + + size_t nbytes = ring_buffer->pop(period_buffer.GetTail(), + period_buffer.GetSpaceBytes()); + if (nbytes == 0) + return; + + period_buffer.AppendBytes(nbytes); + + const std::lock_guard lock(mutex); + /* notify the OutputThread that there is now + room in ring_buffer */ + cond.signal(); } + snd_pcm_sframes_t WriteFromPeriodBuffer() { + assert(!period_buffer.IsEmpty()); + + auto frames_written = snd_pcm_writei(pcm, period_buffer.GetHead(), + period_buffer.GetFrames(out_frame_size)); + if (frames_written > 0) + period_buffer.ConsumeFrames(frames_written, + out_frame_size); + + return frames_written; + } + + bool LockHasError() const { + const std::lock_guard lock(mutex); + return !!error; + } + + /* virtual methods from class DeferredMonitor */ + virtual void RunDeferred() override { + InvalidateSockets(); + } + + /* virtual methods from class MultiSocketMonitor */ + virtual std::chrono::steady_clock::duration PrepareSockets() override; + virtual void DispatchSockets() override; }; static constexpr Domain alsa_output_domain("alsa_output"); -AlsaOutput::AlsaOutput(const ConfigBlock &block) - :base(alsa_output_plugin, block), +AlsaOutput::AlsaOutput(EventLoop &loop, const ConfigBlock &block) + :MultiSocketMonitor(loop), DeferredMonitor(loop), + base(alsa_output_plugin, block), device(block.GetBlockValue("device", "")), #ifdef ENABLE_DSD dop(block.GetBlockValue("dop", false) || @@ -210,7 +431,7 @@ AlsaOutput::AlsaOutput(const ConfigBlock &block) inline AlsaOutput * AlsaOutput::Create(EventLoop &, const ConfigBlock &block) { - return new AlsaOutput(block); + return new AlsaOutput(io_thread_get(), block); } inline void @@ -692,7 +913,6 @@ AlsaOutput::Setup(AudioFormat &audio_format, alsa_period_size = 1; period_frames = alsa_period_size; - period_position = 0; silence = new uint8_t[snd_pcm_frames_to_bytes(pcm, alsa_period_size)]; snd_pcm_format_set_silence(format, silence, @@ -793,6 +1013,8 @@ AlsaOutput::Open(AudioFormat &audio_format) GetDevice())); } + snd_pcm_nonblock(pcm, 1); + #ifdef ENABLE_DSD if (params.dop) FormatDebug(alsa_output_domain, "DoP (DSD over PCM) enabled"); @@ -805,6 +1027,17 @@ AlsaOutput::Open(AudioFormat &audio_format) in_frame_size = audio_format.GetFrameSize(); out_frame_size = pcm_export->GetFrameSize(audio_format); + drain = false; + + size_t period_size = period_frames * out_frame_size; + ring_buffer = new boost::lockfree::spsc_queue(period_size * 4); + + /* reserve space for one more (partial) frame, to be able to + fill the buffer with silence, after moving an unfinished + frame to the end */ + period_buffer.Allocate(period_frames, out_frame_size); + + active = false; must_prepare = false; } @@ -836,7 +1069,7 @@ AlsaOutput::Recover(int err) case SND_PCM_STATE_OPEN: case SND_PCM_STATE_SETUP: case SND_PCM_STATE_XRUN: - period_position = 0; + period_buffer.Rewind(); err = snd_pcm_prepare(pcm); break; case SND_PCM_STATE_DISCONNECTED: @@ -852,88 +1085,107 @@ AlsaOutput::Recover(int err) return err; } -inline void -AlsaOutput::Drain() +inline bool +AlsaOutput::DrainInternal() { - if (snd_pcm_state(pcm) != SND_PCM_STATE_RUNNING) - return; - - if (period_position > 0) { - /* generate some silence to finish the partial - period */ - snd_pcm_uframes_t nframes = - period_frames - period_position; - WriteSilence(nframes); + if (snd_pcm_state(pcm) != SND_PCM_STATE_RUNNING) { + CancelInternal(); + return true; } - snd_pcm_drain(pcm); + /* drain ring_buffer */ + CopyRingToPeriodBuffer(); - period_position = 0; + auto period_position = period_buffer.GetPeriodPosition(out_frame_size); + if (period_position > 0) + /* generate some silence to finish the partial + period */ + period_buffer.FillWithSilence(silence, out_frame_size); + + /* drain period_buffer */ + if (!period_buffer.IsEmpty()) { + auto frames_written = WriteFromPeriodBuffer(); + if (frames_written < 0 && errno != EAGAIN) { + CancelInternal(); + return true; + } + + if (!period_buffer.IsEmpty()) + /* need to call WriteFromPeriodBuffer() again + in the next iteration, so don't finish the + drain just yet */ + return false; + } + + /* .. and finally drain the ALSA hardware buffer */ + return snd_pcm_drain(pcm) != -EAGAIN; } inline void -AlsaOutput::Cancel() +AlsaOutput::Drain() +{ + const std::lock_guard lock(mutex); + + drain = true; + + UnlockActivate(); + + while (drain && !error) + cond.wait(mutex); +} + +inline void +AlsaOutput::CancelInternal() { - period_position = 0; must_prepare = true; snd_pcm_drop(pcm); pcm_export->Reset(); + period_buffer.Clear(); + ClearRingBuffer(); +} + +inline void +AlsaOutput::Cancel() +{ + if (!active) { + /* early cancel, quick code path without thread + synchronization */ + + pcm_export->Reset(); + assert(period_buffer.IsEmpty()); + ClearRingBuffer(); + + return; + } + + BlockingCall(MultiSocketMonitor::GetEventLoop(), [this](){ + CancelInternal(); + }); } inline void AlsaOutput::Close() { + /* make sure the I/O thread isn't inside DispatchSockets() */ + BlockingCall(MultiSocketMonitor::GetEventLoop(), [this](){ + MultiSocketMonitor::Reset(); + DeferredMonitor::Cancel(); + }); + + period_buffer.Free(); + delete ring_buffer; snd_pcm_close(pcm); delete[] silence; } -inline size_t -AlsaOutput::PlayRaw(ConstBuffer data) -{ - if (data.IsEmpty()) - return 0; - - assert(data.size % out_frame_size == 0); - - const size_t n_frames = data.size / out_frame_size; - assert(n_frames > 0); - - while (true) { - const auto frames_written = snd_pcm_writei(pcm, data.data, - n_frames); - if (frames_written > 0) { - period_position = (period_position + frames_written) - % period_frames; - - return frames_written * out_frame_size; - } - - if (frames_written < 0 && frames_written != -EAGAIN && - frames_written != -EINTR && - Recover(frames_written) < 0) - throw FormatRuntimeError("snd_pcm_writei() failed: %s", - snd_strerror(-frames_written)); - } - -} - inline size_t AlsaOutput::Play(const void *chunk, size_t size) { assert(size > 0); assert(size % in_frame_size == 0); - if (must_prepare) { - must_prepare = false; - - int err = snd_pcm_prepare(pcm); - if (err < 0) - throw FormatRuntimeError("snd_pcm_prepare() failed: %s", - snd_strerror(-err)); - } - const auto e = pcm_export->Export({chunk, size}); if (e.size == 0) /* the DoP (DSD over PCM) filter converts two frames @@ -944,8 +1196,96 @@ AlsaOutput::Play(const void *chunk, size_t size) been played */ return size; - const size_t bytes_written = PlayRaw(e); - return pcm_export->CalcSourceSize(bytes_written); + const std::lock_guard lock(mutex); + + while (true) { + if (error) + std::rethrow_exception(error); + + size_t bytes_written = ring_buffer->push((const uint8_t *)chunk, + size); + if (bytes_written > 0) + return pcm_export->CalcSourceSize(bytes_written); + + /* now that the ring_buffer is full, we can activate + the socket handlers to trigger the first + snd_pcm_writei() */ + UnlockActivate(); + + /* wait for the DispatchSockets() to make room in the + ring_buffer */ + cond.wait(mutex); + } +} + +std::chrono::steady_clock::duration +AlsaOutput::PrepareSockets() +{ + if (LockHasError()) { + ClearSocketList(); + return std::chrono::steady_clock::duration(-1); + } + + return PrepareAlsaPcmSockets(*this, pcm, pfd_buffer); +} + +void +AlsaOutput::DispatchSockets() +try { + { + const std::lock_guard lock(mutex); + if (drain) { + { + ScopeUnlock unlock(mutex); + if (!DrainInternal()) + return; + + MultiSocketMonitor::InvalidateSockets(); + } + + drain = false; + cond.signal(); + return; + } + } + + if (must_prepare) { + must_prepare = false; + + int err = snd_pcm_prepare(pcm); + if (err < 0) + throw FormatRuntimeError("snd_pcm_prepare() failed: %s", + snd_strerror(-err)); + } + + CopyRingToPeriodBuffer(); + + if (period_buffer.IsEmpty()) + /* insert some silence if the buffer has not enough + data yet, to avoid ALSA xrun */ + period_buffer.FillWithSilence(silence, out_frame_size); + + auto frames_written = WriteFromPeriodBuffer(); + if (frames_written < 0) { + if (frames_written == -EAGAIN || frames_written == -EINTR) + /* try again in the next DispatchSockets() + call which is still scheduled */ + return; + + if (Recover(frames_written) < 0) + throw FormatRuntimeError("snd_pcm_writei() failed: %s", + snd_strerror(-frames_written)); + + /* recovered; try again in the next DispatchSockets() + call */ + return; + } +} catch (const std::runtime_error &) { + MultiSocketMonitor::Reset(); + + const std::lock_guard lock(mutex); + error = std::current_exception(); + cond.signal(); } typedef AudioOutputWrapper Wrapper;