output/alsa: non-blocking mode

Use SND_PCM_NONBLOCK, and perform all snd_pcm_writei() calls in the
IOThread.  Use a lockless queue to copy data from the OutputThread to
the IOThread.

This rather major change aims to improve MPD's internal latency.  All
waits are now under MPD's control, instead of blocking inside
libasound2.

As a side effect, an output's filter is now decoupled from the actual
device I/O, which solves a major latency problem with the conversion
filter on slow CPUs and small period buffers.  See:

 https://bugs.musicpd.org/view.php?id=3900
This commit is contained in:
Max Kellermann 2017-01-24 23:10:35 +01:00
parent 853740f1e2
commit b1c7649edb
3 changed files with 412 additions and 69 deletions

View File

@ -1413,6 +1413,7 @@ libmixer_plugins_a_CPPFLAGS = $(AM_CPPFLAGS) \
if ENABLE_ALSA
liboutput_plugins_a_SOURCES += \
$(ALSA_SOURCES) \
src/output/plugins/AlsaOutputPlugin.cxx \
src/output/plugins/AlsaOutputPlugin.hxx
libmixer_plugins_a_SOURCES += \

2
NEWS
View File

@ -2,6 +2,8 @@ ver 0.21 (not yet released)
* protocol
- "tagtypes" can be used to hide tags
- "find" and "search" can sort
* output
- alsa: non-blocking mode
ver 0.20.5 (not yet released)
* tags

View File

@ -19,6 +19,7 @@
#include "config.h"
#include "AlsaOutputPlugin.hxx"
#include "lib/alsa/NonBlock.hxx"
#include "../OutputAPI.hxx"
#include "../Wrapper.hxx"
#include "mixer/MixerList.hxx"
@ -28,10 +29,16 @@
#include "util/RuntimeError.hxx"
#include "util/Domain.hxx"
#include "util/ConstBuffer.hxx"
#include "event/MultiSocketMonitor.hxx"
#include "event/DeferredMonitor.hxx"
#include "event/Call.hxx"
#include "IOThread.hxx"
#include "Log.hxx"
#include <alsa/asoundlib.h>
#include <boost/lockfree/spsc_queue.hpp>
#include <string>
#if SND_LIB_VERSION >= 0x1001c
@ -50,7 +57,9 @@ static constexpr unsigned MPD_ALSA_BUFFER_TIME_US = 500000;
static constexpr unsigned MPD_ALSA_RETRY_NR = 5;
class AlsaOutput {
class AlsaOutput final
: MultiSocketMonitor, DeferredMonitor {
friend struct AudioOutputWrapper<AlsaOutput>;
AudioOutput base;
@ -100,9 +109,10 @@ class AlsaOutput {
snd_pcm_uframes_t period_frames;
/**
* The number of frames written in the current period.
* After Open(), has this output been activated by a Play()
* command?
*/
snd_pcm_uframes_t period_position;
bool active;
/**
* Do we need to call snd_pcm_prepare() before the next write?
@ -116,6 +126,8 @@ class AlsaOutput {
*/
bool must_prepare;
bool drain;
/**
* This buffer gets allocated after opening the ALSA device.
* It contains silence samples, enough to fill one period (see
@ -123,8 +135,137 @@ class AlsaOutput {
*/
uint8_t *silence;
/**
* For PrepareAlsaPcmSockets().
*/
ReusableArray<pollfd> pfd_buffer;
/**
* For copying data from OutputThread to IOThread.
*/
boost::lockfree::spsc_queue<uint8_t> *ring_buffer;
class PeriodBuffer {
size_t capacity, head, tail;
uint8_t *buffer;
public:
PeriodBuffer() = default;
PeriodBuffer(const PeriodBuffer &) = delete;
PeriodBuffer &operator=(const PeriodBuffer &) = delete;
void Allocate(size_t n_frames, size_t frame_size) {
capacity = n_frames * frame_size;
/* reserve space for one more (partial) frame,
to be able to fill the buffer with silence,
after moving an unfinished frame to the
end */
buffer = new uint8_t[capacity + frame_size - 1];
head = tail = 0;
}
void Free() {
delete[] buffer;
}
bool IsEmpty() const {
return head == tail;
}
bool IsFull() const {
return tail >= capacity;
}
uint8_t *GetTail() {
return buffer + tail;
}
size_t GetSpaceBytes() const {
assert(tail <= capacity);
return capacity - tail;
}
void AppendBytes(size_t n) {
assert(n <= capacity);
assert(tail <= capacity - n);
tail += n;
}
void FillWithSilence(const uint8_t *_silence,
const size_t frame_size) {
size_t partial_frame = tail % frame_size;
auto *dest = GetTail() - partial_frame;
/* move the partial frame to the end */
std::copy(dest, GetTail(), buffer + capacity);
size_t silence_size = capacity - tail - partial_frame;
std::copy_n(_silence, silence_size, dest);
tail = capacity + partial_frame;
}
const uint8_t *GetHead() const {
return buffer + head;
}
snd_pcm_uframes_t GetFrames(size_t frame_size) const {
return (tail - head) / frame_size;
}
void ConsumeBytes(size_t n) {
head += n;
assert(head <= capacity);
if (head >= capacity) {
tail -= head;
/* copy the partial frame (if any)
back to the beginning */
std::copy_n(GetHead(), tail, buffer);
head = 0;
}
}
void ConsumeFrames(snd_pcm_uframes_t n, size_t frame_size) {
ConsumeBytes(n * frame_size);
}
snd_pcm_uframes_t GetPeriodPosition(size_t frame_size) const {
return head / frame_size;
}
void Rewind() {
head = 0;
}
void Clear() {
head = tail = 0;
}
};
PeriodBuffer period_buffer;
/**
* Protects #cond, #error, #drain.
*/
mutable Mutex mutex;
/**
* Used to wait when #ring_buffer is full. It will be
* signalled each time data is popped from the #ring_buffer,
* making space for more data.
*/
Cond cond;
std::exception_ptr error;
public:
AlsaOutput(const ConfigBlock &block);
AlsaOutput(EventLoop &loop, const ConfigBlock &block);
~AlsaOutput() {
/* free libasound's config cache */
@ -145,7 +286,6 @@ public:
void Open(AudioFormat &audio_format);
void Close();
size_t PlayRaw(ConstBuffer<void> data);
size_t Play(const void *chunk, size_t size);
void Drain();
void Cancel();
@ -166,21 +306,102 @@ private:
void SetupOrDop(AudioFormat &audio_format, PcmExport::Params &params);
/**
* Activate the output by registering the sockets in the
* #EventLoop. Before calling this, filling the ring buffer
* has no effect; nothing will be played, and no code will be
* run on #EventLoop's thread.
*/
void Activate() {
if (active)
return;
active = true;
DeferredMonitor::Schedule();
}
/**
* Wrapper for Activate() which unlocks our mutex. Call this
* if you're holding the mutex.
*/
void UnlockActivate() {
if (active)
return;
const ScopeUnlock unlock(mutex);
Activate();
}
void ClearRingBuffer() {
std::array<uint8_t, 1024> buffer;
while (ring_buffer->pop(&buffer.front(), buffer.size())) {}
}
int Recover(int err);
/**
* Write silence to the ALSA device.
* Drain all buffers. To be run in #EventLoop's thread.
*
* @return true if draining is complete, false if this method
* needs to be called again later
*/
void WriteSilence(snd_pcm_uframes_t nframes) {
snd_pcm_writei(pcm, silence, nframes);
bool DrainInternal();
/**
* Stop playback immediately, dropping all buffers. To be run
* in #EventLoop's thread.
*/
void CancelInternal();
void CopyRingToPeriodBuffer() {
if (period_buffer.IsFull())
return;
size_t nbytes = ring_buffer->pop(period_buffer.GetTail(),
period_buffer.GetSpaceBytes());
if (nbytes == 0)
return;
period_buffer.AppendBytes(nbytes);
const std::lock_guard<Mutex> lock(mutex);
/* notify the OutputThread that there is now
room in ring_buffer */
cond.signal();
}
snd_pcm_sframes_t WriteFromPeriodBuffer() {
assert(!period_buffer.IsEmpty());
auto frames_written = snd_pcm_writei(pcm, period_buffer.GetHead(),
period_buffer.GetFrames(out_frame_size));
if (frames_written > 0)
period_buffer.ConsumeFrames(frames_written,
out_frame_size);
return frames_written;
}
bool LockHasError() const {
const std::lock_guard<Mutex> lock(mutex);
return !!error;
}
/* virtual methods from class DeferredMonitor */
virtual void RunDeferred() override {
InvalidateSockets();
}
/* virtual methods from class MultiSocketMonitor */
virtual std::chrono::steady_clock::duration PrepareSockets() override;
virtual void DispatchSockets() override;
};
static constexpr Domain alsa_output_domain("alsa_output");
AlsaOutput::AlsaOutput(const ConfigBlock &block)
:base(alsa_output_plugin, block),
AlsaOutput::AlsaOutput(EventLoop &loop, const ConfigBlock &block)
:MultiSocketMonitor(loop), DeferredMonitor(loop),
base(alsa_output_plugin, block),
device(block.GetBlockValue("device", "")),
#ifdef ENABLE_DSD
dop(block.GetBlockValue("dop", false) ||
@ -210,7 +431,7 @@ AlsaOutput::AlsaOutput(const ConfigBlock &block)
inline AlsaOutput *
AlsaOutput::Create(EventLoop &, const ConfigBlock &block)
{
return new AlsaOutput(block);
return new AlsaOutput(io_thread_get(), block);
}
inline void
@ -692,7 +913,6 @@ AlsaOutput::Setup(AudioFormat &audio_format,
alsa_period_size = 1;
period_frames = alsa_period_size;
period_position = 0;
silence = new uint8_t[snd_pcm_frames_to_bytes(pcm, alsa_period_size)];
snd_pcm_format_set_silence(format, silence,
@ -793,6 +1013,8 @@ AlsaOutput::Open(AudioFormat &audio_format)
GetDevice()));
}
snd_pcm_nonblock(pcm, 1);
#ifdef ENABLE_DSD
if (params.dop)
FormatDebug(alsa_output_domain, "DoP (DSD over PCM) enabled");
@ -805,6 +1027,17 @@ AlsaOutput::Open(AudioFormat &audio_format)
in_frame_size = audio_format.GetFrameSize();
out_frame_size = pcm_export->GetFrameSize(audio_format);
drain = false;
size_t period_size = period_frames * out_frame_size;
ring_buffer = new boost::lockfree::spsc_queue<uint8_t>(period_size * 4);
/* reserve space for one more (partial) frame, to be able to
fill the buffer with silence, after moving an unfinished
frame to the end */
period_buffer.Allocate(period_frames, out_frame_size);
active = false;
must_prepare = false;
}
@ -836,7 +1069,7 @@ AlsaOutput::Recover(int err)
case SND_PCM_STATE_OPEN:
case SND_PCM_STATE_SETUP:
case SND_PCM_STATE_XRUN:
period_position = 0;
period_buffer.Rewind();
err = snd_pcm_prepare(pcm);
break;
case SND_PCM_STATE_DISCONNECTED:
@ -852,88 +1085,107 @@ AlsaOutput::Recover(int err)
return err;
}
inline void
AlsaOutput::Drain()
inline bool
AlsaOutput::DrainInternal()
{
if (snd_pcm_state(pcm) != SND_PCM_STATE_RUNNING)
return;
if (period_position > 0) {
/* generate some silence to finish the partial
period */
snd_pcm_uframes_t nframes =
period_frames - period_position;
WriteSilence(nframes);
if (snd_pcm_state(pcm) != SND_PCM_STATE_RUNNING) {
CancelInternal();
return true;
}
snd_pcm_drain(pcm);
/* drain ring_buffer */
CopyRingToPeriodBuffer();
period_position = 0;
auto period_position = period_buffer.GetPeriodPosition(out_frame_size);
if (period_position > 0)
/* generate some silence to finish the partial
period */
period_buffer.FillWithSilence(silence, out_frame_size);
/* drain period_buffer */
if (!period_buffer.IsEmpty()) {
auto frames_written = WriteFromPeriodBuffer();
if (frames_written < 0 && errno != EAGAIN) {
CancelInternal();
return true;
}
if (!period_buffer.IsEmpty())
/* need to call WriteFromPeriodBuffer() again
in the next iteration, so don't finish the
drain just yet */
return false;
}
/* .. and finally drain the ALSA hardware buffer */
return snd_pcm_drain(pcm) != -EAGAIN;
}
inline void
AlsaOutput::Cancel()
AlsaOutput::Drain()
{
const std::lock_guard<Mutex> lock(mutex);
drain = true;
UnlockActivate();
while (drain && !error)
cond.wait(mutex);
}
inline void
AlsaOutput::CancelInternal()
{
period_position = 0;
must_prepare = true;
snd_pcm_drop(pcm);
pcm_export->Reset();
period_buffer.Clear();
ClearRingBuffer();
}
inline void
AlsaOutput::Cancel()
{
if (!active) {
/* early cancel, quick code path without thread
synchronization */
pcm_export->Reset();
assert(period_buffer.IsEmpty());
ClearRingBuffer();
return;
}
BlockingCall(MultiSocketMonitor::GetEventLoop(), [this](){
CancelInternal();
});
}
inline void
AlsaOutput::Close()
{
/* make sure the I/O thread isn't inside DispatchSockets() */
BlockingCall(MultiSocketMonitor::GetEventLoop(), [this](){
MultiSocketMonitor::Reset();
DeferredMonitor::Cancel();
});
period_buffer.Free();
delete ring_buffer;
snd_pcm_close(pcm);
delete[] silence;
}
inline size_t
AlsaOutput::PlayRaw(ConstBuffer<void> data)
{
if (data.IsEmpty())
return 0;
assert(data.size % out_frame_size == 0);
const size_t n_frames = data.size / out_frame_size;
assert(n_frames > 0);
while (true) {
const auto frames_written = snd_pcm_writei(pcm, data.data,
n_frames);
if (frames_written > 0) {
period_position = (period_position + frames_written)
% period_frames;
return frames_written * out_frame_size;
}
if (frames_written < 0 && frames_written != -EAGAIN &&
frames_written != -EINTR &&
Recover(frames_written) < 0)
throw FormatRuntimeError("snd_pcm_writei() failed: %s",
snd_strerror(-frames_written));
}
}
inline size_t
AlsaOutput::Play(const void *chunk, size_t size)
{
assert(size > 0);
assert(size % in_frame_size == 0);
if (must_prepare) {
must_prepare = false;
int err = snd_pcm_prepare(pcm);
if (err < 0)
throw FormatRuntimeError("snd_pcm_prepare() failed: %s",
snd_strerror(-err));
}
const auto e = pcm_export->Export({chunk, size});
if (e.size == 0)
/* the DoP (DSD over PCM) filter converts two frames
@ -944,8 +1196,96 @@ AlsaOutput::Play(const void *chunk, size_t size)
been played */
return size;
const size_t bytes_written = PlayRaw(e);
const std::lock_guard<Mutex> lock(mutex);
while (true) {
if (error)
std::rethrow_exception(error);
size_t bytes_written = ring_buffer->push((const uint8_t *)chunk,
size);
if (bytes_written > 0)
return pcm_export->CalcSourceSize(bytes_written);
/* now that the ring_buffer is full, we can activate
the socket handlers to trigger the first
snd_pcm_writei() */
UnlockActivate();
/* wait for the DispatchSockets() to make room in the
ring_buffer */
cond.wait(mutex);
}
}
std::chrono::steady_clock::duration
AlsaOutput::PrepareSockets()
{
if (LockHasError()) {
ClearSocketList();
return std::chrono::steady_clock::duration(-1);
}
return PrepareAlsaPcmSockets(*this, pcm, pfd_buffer);
}
void
AlsaOutput::DispatchSockets()
try {
{
const std::lock_guard<Mutex> lock(mutex);
if (drain) {
{
ScopeUnlock unlock(mutex);
if (!DrainInternal())
return;
MultiSocketMonitor::InvalidateSockets();
}
drain = false;
cond.signal();
return;
}
}
if (must_prepare) {
must_prepare = false;
int err = snd_pcm_prepare(pcm);
if (err < 0)
throw FormatRuntimeError("snd_pcm_prepare() failed: %s",
snd_strerror(-err));
}
CopyRingToPeriodBuffer();
if (period_buffer.IsEmpty())
/* insert some silence if the buffer has not enough
data yet, to avoid ALSA xrun */
period_buffer.FillWithSilence(silence, out_frame_size);
auto frames_written = WriteFromPeriodBuffer();
if (frames_written < 0) {
if (frames_written == -EAGAIN || frames_written == -EINTR)
/* try again in the next DispatchSockets()
call which is still scheduled */
return;
if (Recover(frames_written) < 0)
throw FormatRuntimeError("snd_pcm_writei() failed: %s",
snd_strerror(-frames_written));
/* recovered; try again in the next DispatchSockets()
call */
return;
}
} catch (const std::runtime_error &) {
MultiSocketMonitor::Reset();
const std::lock_guard<Mutex> lock(mutex);
error = std::current_exception();
cond.signal();
}
typedef AudioOutputWrapper<AlsaOutput> Wrapper;