output/alsa: non-blocking mode
Use SND_PCM_NONBLOCK, and perform all snd_pcm_writei() calls in the IOThread. Use a lockless queue to copy data from the OutputThread to the IOThread. This rather major change aims to improve MPD's internal latency. All waits are now under MPD's control, instead of blocking inside libasound2. As a side effect, an output's filter is now decoupled from the actual device I/O, which solves a major latency problem with the conversion filter on slow CPUs and small period buffers. See: https://bugs.musicpd.org/view.php?id=3900
This commit is contained in:
parent
853740f1e2
commit
b1c7649edb
@ -1413,6 +1413,7 @@ libmixer_plugins_a_CPPFLAGS = $(AM_CPPFLAGS) \
|
||||
|
||||
if ENABLE_ALSA
|
||||
liboutput_plugins_a_SOURCES += \
|
||||
$(ALSA_SOURCES) \
|
||||
src/output/plugins/AlsaOutputPlugin.cxx \
|
||||
src/output/plugins/AlsaOutputPlugin.hxx
|
||||
libmixer_plugins_a_SOURCES += \
|
||||
|
2
NEWS
2
NEWS
@ -2,6 +2,8 @@ ver 0.21 (not yet released)
|
||||
* protocol
|
||||
- "tagtypes" can be used to hide tags
|
||||
- "find" and "search" can sort
|
||||
* output
|
||||
- alsa: non-blocking mode
|
||||
|
||||
ver 0.20.5 (not yet released)
|
||||
* tags
|
||||
|
@ -19,6 +19,7 @@
|
||||
|
||||
#include "config.h"
|
||||
#include "AlsaOutputPlugin.hxx"
|
||||
#include "lib/alsa/NonBlock.hxx"
|
||||
#include "../OutputAPI.hxx"
|
||||
#include "../Wrapper.hxx"
|
||||
#include "mixer/MixerList.hxx"
|
||||
@ -28,10 +29,16 @@
|
||||
#include "util/RuntimeError.hxx"
|
||||
#include "util/Domain.hxx"
|
||||
#include "util/ConstBuffer.hxx"
|
||||
#include "event/MultiSocketMonitor.hxx"
|
||||
#include "event/DeferredMonitor.hxx"
|
||||
#include "event/Call.hxx"
|
||||
#include "IOThread.hxx"
|
||||
#include "Log.hxx"
|
||||
|
||||
#include <alsa/asoundlib.h>
|
||||
|
||||
#include <boost/lockfree/spsc_queue.hpp>
|
||||
|
||||
#include <string>
|
||||
|
||||
#if SND_LIB_VERSION >= 0x1001c
|
||||
@ -50,7 +57,9 @@ static constexpr unsigned MPD_ALSA_BUFFER_TIME_US = 500000;
|
||||
|
||||
static constexpr unsigned MPD_ALSA_RETRY_NR = 5;
|
||||
|
||||
class AlsaOutput {
|
||||
class AlsaOutput final
|
||||
: MultiSocketMonitor, DeferredMonitor {
|
||||
|
||||
friend struct AudioOutputWrapper<AlsaOutput>;
|
||||
|
||||
AudioOutput base;
|
||||
@ -100,9 +109,10 @@ class AlsaOutput {
|
||||
snd_pcm_uframes_t period_frames;
|
||||
|
||||
/**
|
||||
* The number of frames written in the current period.
|
||||
* After Open(), has this output been activated by a Play()
|
||||
* command?
|
||||
*/
|
||||
snd_pcm_uframes_t period_position;
|
||||
bool active;
|
||||
|
||||
/**
|
||||
* Do we need to call snd_pcm_prepare() before the next write?
|
||||
@ -116,6 +126,8 @@ class AlsaOutput {
|
||||
*/
|
||||
bool must_prepare;
|
||||
|
||||
bool drain;
|
||||
|
||||
/**
|
||||
* This buffer gets allocated after opening the ALSA device.
|
||||
* It contains silence samples, enough to fill one period (see
|
||||
@ -123,8 +135,137 @@ class AlsaOutput {
|
||||
*/
|
||||
uint8_t *silence;
|
||||
|
||||
/**
|
||||
* For PrepareAlsaPcmSockets().
|
||||
*/
|
||||
ReusableArray<pollfd> pfd_buffer;
|
||||
|
||||
/**
|
||||
* For copying data from OutputThread to IOThread.
|
||||
*/
|
||||
boost::lockfree::spsc_queue<uint8_t> *ring_buffer;
|
||||
|
||||
class PeriodBuffer {
|
||||
size_t capacity, head, tail;
|
||||
|
||||
uint8_t *buffer;
|
||||
|
||||
public:
|
||||
PeriodBuffer() = default;
|
||||
PeriodBuffer(const PeriodBuffer &) = delete;
|
||||
PeriodBuffer &operator=(const PeriodBuffer &) = delete;
|
||||
|
||||
void Allocate(size_t n_frames, size_t frame_size) {
|
||||
capacity = n_frames * frame_size;
|
||||
|
||||
/* reserve space for one more (partial) frame,
|
||||
to be able to fill the buffer with silence,
|
||||
after moving an unfinished frame to the
|
||||
end */
|
||||
buffer = new uint8_t[capacity + frame_size - 1];
|
||||
head = tail = 0;
|
||||
}
|
||||
|
||||
void Free() {
|
||||
delete[] buffer;
|
||||
}
|
||||
|
||||
bool IsEmpty() const {
|
||||
return head == tail;
|
||||
}
|
||||
|
||||
bool IsFull() const {
|
||||
return tail >= capacity;
|
||||
}
|
||||
|
||||
uint8_t *GetTail() {
|
||||
return buffer + tail;
|
||||
}
|
||||
|
||||
size_t GetSpaceBytes() const {
|
||||
assert(tail <= capacity);
|
||||
|
||||
return capacity - tail;
|
||||
}
|
||||
|
||||
void AppendBytes(size_t n) {
|
||||
assert(n <= capacity);
|
||||
assert(tail <= capacity - n);
|
||||
|
||||
tail += n;
|
||||
}
|
||||
|
||||
void FillWithSilence(const uint8_t *_silence,
|
||||
const size_t frame_size) {
|
||||
size_t partial_frame = tail % frame_size;
|
||||
auto *dest = GetTail() - partial_frame;
|
||||
|
||||
/* move the partial frame to the end */
|
||||
std::copy(dest, GetTail(), buffer + capacity);
|
||||
|
||||
size_t silence_size = capacity - tail - partial_frame;
|
||||
std::copy_n(_silence, silence_size, dest);
|
||||
|
||||
tail = capacity + partial_frame;
|
||||
}
|
||||
|
||||
const uint8_t *GetHead() const {
|
||||
return buffer + head;
|
||||
}
|
||||
|
||||
snd_pcm_uframes_t GetFrames(size_t frame_size) const {
|
||||
return (tail - head) / frame_size;
|
||||
}
|
||||
|
||||
void ConsumeBytes(size_t n) {
|
||||
head += n;
|
||||
|
||||
assert(head <= capacity);
|
||||
|
||||
if (head >= capacity) {
|
||||
tail -= head;
|
||||
/* copy the partial frame (if any)
|
||||
back to the beginning */
|
||||
std::copy_n(GetHead(), tail, buffer);
|
||||
head = 0;
|
||||
}
|
||||
}
|
||||
|
||||
void ConsumeFrames(snd_pcm_uframes_t n, size_t frame_size) {
|
||||
ConsumeBytes(n * frame_size);
|
||||
}
|
||||
|
||||
snd_pcm_uframes_t GetPeriodPosition(size_t frame_size) const {
|
||||
return head / frame_size;
|
||||
}
|
||||
|
||||
void Rewind() {
|
||||
head = 0;
|
||||
}
|
||||
|
||||
void Clear() {
|
||||
head = tail = 0;
|
||||
}
|
||||
};
|
||||
|
||||
PeriodBuffer period_buffer;
|
||||
|
||||
/**
|
||||
* Protects #cond, #error, #drain.
|
||||
*/
|
||||
mutable Mutex mutex;
|
||||
|
||||
/**
|
||||
* Used to wait when #ring_buffer is full. It will be
|
||||
* signalled each time data is popped from the #ring_buffer,
|
||||
* making space for more data.
|
||||
*/
|
||||
Cond cond;
|
||||
|
||||
std::exception_ptr error;
|
||||
|
||||
public:
|
||||
AlsaOutput(const ConfigBlock &block);
|
||||
AlsaOutput(EventLoop &loop, const ConfigBlock &block);
|
||||
|
||||
~AlsaOutput() {
|
||||
/* free libasound's config cache */
|
||||
@ -145,7 +286,6 @@ public:
|
||||
void Open(AudioFormat &audio_format);
|
||||
void Close();
|
||||
|
||||
size_t PlayRaw(ConstBuffer<void> data);
|
||||
size_t Play(const void *chunk, size_t size);
|
||||
void Drain();
|
||||
void Cancel();
|
||||
@ -166,21 +306,102 @@ private:
|
||||
|
||||
void SetupOrDop(AudioFormat &audio_format, PcmExport::Params ¶ms);
|
||||
|
||||
/**
|
||||
* Activate the output by registering the sockets in the
|
||||
* #EventLoop. Before calling this, filling the ring buffer
|
||||
* has no effect; nothing will be played, and no code will be
|
||||
* run on #EventLoop's thread.
|
||||
*/
|
||||
void Activate() {
|
||||
if (active)
|
||||
return;
|
||||
|
||||
active = true;
|
||||
DeferredMonitor::Schedule();
|
||||
}
|
||||
|
||||
/**
|
||||
* Wrapper for Activate() which unlocks our mutex. Call this
|
||||
* if you're holding the mutex.
|
||||
*/
|
||||
void UnlockActivate() {
|
||||
if (active)
|
||||
return;
|
||||
|
||||
const ScopeUnlock unlock(mutex);
|
||||
Activate();
|
||||
}
|
||||
|
||||
void ClearRingBuffer() {
|
||||
std::array<uint8_t, 1024> buffer;
|
||||
while (ring_buffer->pop(&buffer.front(), buffer.size())) {}
|
||||
}
|
||||
|
||||
int Recover(int err);
|
||||
|
||||
/**
|
||||
* Write silence to the ALSA device.
|
||||
* Drain all buffers. To be run in #EventLoop's thread.
|
||||
*
|
||||
* @return true if draining is complete, false if this method
|
||||
* needs to be called again later
|
||||
*/
|
||||
void WriteSilence(snd_pcm_uframes_t nframes) {
|
||||
snd_pcm_writei(pcm, silence, nframes);
|
||||
bool DrainInternal();
|
||||
|
||||
/**
|
||||
* Stop playback immediately, dropping all buffers. To be run
|
||||
* in #EventLoop's thread.
|
||||
*/
|
||||
void CancelInternal();
|
||||
|
||||
void CopyRingToPeriodBuffer() {
|
||||
if (period_buffer.IsFull())
|
||||
return;
|
||||
|
||||
size_t nbytes = ring_buffer->pop(period_buffer.GetTail(),
|
||||
period_buffer.GetSpaceBytes());
|
||||
if (nbytes == 0)
|
||||
return;
|
||||
|
||||
period_buffer.AppendBytes(nbytes);
|
||||
|
||||
const std::lock_guard<Mutex> lock(mutex);
|
||||
/* notify the OutputThread that there is now
|
||||
room in ring_buffer */
|
||||
cond.signal();
|
||||
}
|
||||
|
||||
snd_pcm_sframes_t WriteFromPeriodBuffer() {
|
||||
assert(!period_buffer.IsEmpty());
|
||||
|
||||
auto frames_written = snd_pcm_writei(pcm, period_buffer.GetHead(),
|
||||
period_buffer.GetFrames(out_frame_size));
|
||||
if (frames_written > 0)
|
||||
period_buffer.ConsumeFrames(frames_written,
|
||||
out_frame_size);
|
||||
|
||||
return frames_written;
|
||||
}
|
||||
|
||||
bool LockHasError() const {
|
||||
const std::lock_guard<Mutex> lock(mutex);
|
||||
return !!error;
|
||||
}
|
||||
|
||||
/* virtual methods from class DeferredMonitor */
|
||||
virtual void RunDeferred() override {
|
||||
InvalidateSockets();
|
||||
}
|
||||
|
||||
/* virtual methods from class MultiSocketMonitor */
|
||||
virtual std::chrono::steady_clock::duration PrepareSockets() override;
|
||||
virtual void DispatchSockets() override;
|
||||
};
|
||||
|
||||
static constexpr Domain alsa_output_domain("alsa_output");
|
||||
|
||||
AlsaOutput::AlsaOutput(const ConfigBlock &block)
|
||||
:base(alsa_output_plugin, block),
|
||||
AlsaOutput::AlsaOutput(EventLoop &loop, const ConfigBlock &block)
|
||||
:MultiSocketMonitor(loop), DeferredMonitor(loop),
|
||||
base(alsa_output_plugin, block),
|
||||
device(block.GetBlockValue("device", "")),
|
||||
#ifdef ENABLE_DSD
|
||||
dop(block.GetBlockValue("dop", false) ||
|
||||
@ -210,7 +431,7 @@ AlsaOutput::AlsaOutput(const ConfigBlock &block)
|
||||
inline AlsaOutput *
|
||||
AlsaOutput::Create(EventLoop &, const ConfigBlock &block)
|
||||
{
|
||||
return new AlsaOutput(block);
|
||||
return new AlsaOutput(io_thread_get(), block);
|
||||
}
|
||||
|
||||
inline void
|
||||
@ -692,7 +913,6 @@ AlsaOutput::Setup(AudioFormat &audio_format,
|
||||
alsa_period_size = 1;
|
||||
|
||||
period_frames = alsa_period_size;
|
||||
period_position = 0;
|
||||
|
||||
silence = new uint8_t[snd_pcm_frames_to_bytes(pcm, alsa_period_size)];
|
||||
snd_pcm_format_set_silence(format, silence,
|
||||
@ -793,6 +1013,8 @@ AlsaOutput::Open(AudioFormat &audio_format)
|
||||
GetDevice()));
|
||||
}
|
||||
|
||||
snd_pcm_nonblock(pcm, 1);
|
||||
|
||||
#ifdef ENABLE_DSD
|
||||
if (params.dop)
|
||||
FormatDebug(alsa_output_domain, "DoP (DSD over PCM) enabled");
|
||||
@ -805,6 +1027,17 @@ AlsaOutput::Open(AudioFormat &audio_format)
|
||||
in_frame_size = audio_format.GetFrameSize();
|
||||
out_frame_size = pcm_export->GetFrameSize(audio_format);
|
||||
|
||||
drain = false;
|
||||
|
||||
size_t period_size = period_frames * out_frame_size;
|
||||
ring_buffer = new boost::lockfree::spsc_queue<uint8_t>(period_size * 4);
|
||||
|
||||
/* reserve space for one more (partial) frame, to be able to
|
||||
fill the buffer with silence, after moving an unfinished
|
||||
frame to the end */
|
||||
period_buffer.Allocate(period_frames, out_frame_size);
|
||||
|
||||
active = false;
|
||||
must_prepare = false;
|
||||
}
|
||||
|
||||
@ -836,7 +1069,7 @@ AlsaOutput::Recover(int err)
|
||||
case SND_PCM_STATE_OPEN:
|
||||
case SND_PCM_STATE_SETUP:
|
||||
case SND_PCM_STATE_XRUN:
|
||||
period_position = 0;
|
||||
period_buffer.Rewind();
|
||||
err = snd_pcm_prepare(pcm);
|
||||
break;
|
||||
case SND_PCM_STATE_DISCONNECTED:
|
||||
@ -852,88 +1085,107 @@ AlsaOutput::Recover(int err)
|
||||
return err;
|
||||
}
|
||||
|
||||
inline void
|
||||
AlsaOutput::Drain()
|
||||
inline bool
|
||||
AlsaOutput::DrainInternal()
|
||||
{
|
||||
if (snd_pcm_state(pcm) != SND_PCM_STATE_RUNNING)
|
||||
return;
|
||||
|
||||
if (period_position > 0) {
|
||||
/* generate some silence to finish the partial
|
||||
period */
|
||||
snd_pcm_uframes_t nframes =
|
||||
period_frames - period_position;
|
||||
WriteSilence(nframes);
|
||||
if (snd_pcm_state(pcm) != SND_PCM_STATE_RUNNING) {
|
||||
CancelInternal();
|
||||
return true;
|
||||
}
|
||||
|
||||
snd_pcm_drain(pcm);
|
||||
/* drain ring_buffer */
|
||||
CopyRingToPeriodBuffer();
|
||||
|
||||
period_position = 0;
|
||||
auto period_position = period_buffer.GetPeriodPosition(out_frame_size);
|
||||
if (period_position > 0)
|
||||
/* generate some silence to finish the partial
|
||||
period */
|
||||
period_buffer.FillWithSilence(silence, out_frame_size);
|
||||
|
||||
/* drain period_buffer */
|
||||
if (!period_buffer.IsEmpty()) {
|
||||
auto frames_written = WriteFromPeriodBuffer();
|
||||
if (frames_written < 0 && errno != EAGAIN) {
|
||||
CancelInternal();
|
||||
return true;
|
||||
}
|
||||
|
||||
if (!period_buffer.IsEmpty())
|
||||
/* need to call WriteFromPeriodBuffer() again
|
||||
in the next iteration, so don't finish the
|
||||
drain just yet */
|
||||
return false;
|
||||
}
|
||||
|
||||
/* .. and finally drain the ALSA hardware buffer */
|
||||
return snd_pcm_drain(pcm) != -EAGAIN;
|
||||
}
|
||||
|
||||
inline void
|
||||
AlsaOutput::Cancel()
|
||||
AlsaOutput::Drain()
|
||||
{
|
||||
const std::lock_guard<Mutex> lock(mutex);
|
||||
|
||||
drain = true;
|
||||
|
||||
UnlockActivate();
|
||||
|
||||
while (drain && !error)
|
||||
cond.wait(mutex);
|
||||
}
|
||||
|
||||
inline void
|
||||
AlsaOutput::CancelInternal()
|
||||
{
|
||||
period_position = 0;
|
||||
must_prepare = true;
|
||||
|
||||
snd_pcm_drop(pcm);
|
||||
|
||||
pcm_export->Reset();
|
||||
period_buffer.Clear();
|
||||
ClearRingBuffer();
|
||||
}
|
||||
|
||||
inline void
|
||||
AlsaOutput::Cancel()
|
||||
{
|
||||
if (!active) {
|
||||
/* early cancel, quick code path without thread
|
||||
synchronization */
|
||||
|
||||
pcm_export->Reset();
|
||||
assert(period_buffer.IsEmpty());
|
||||
ClearRingBuffer();
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
BlockingCall(MultiSocketMonitor::GetEventLoop(), [this](){
|
||||
CancelInternal();
|
||||
});
|
||||
}
|
||||
|
||||
inline void
|
||||
AlsaOutput::Close()
|
||||
{
|
||||
/* make sure the I/O thread isn't inside DispatchSockets() */
|
||||
BlockingCall(MultiSocketMonitor::GetEventLoop(), [this](){
|
||||
MultiSocketMonitor::Reset();
|
||||
DeferredMonitor::Cancel();
|
||||
});
|
||||
|
||||
period_buffer.Free();
|
||||
delete ring_buffer;
|
||||
snd_pcm_close(pcm);
|
||||
delete[] silence;
|
||||
}
|
||||
|
||||
inline size_t
|
||||
AlsaOutput::PlayRaw(ConstBuffer<void> data)
|
||||
{
|
||||
if (data.IsEmpty())
|
||||
return 0;
|
||||
|
||||
assert(data.size % out_frame_size == 0);
|
||||
|
||||
const size_t n_frames = data.size / out_frame_size;
|
||||
assert(n_frames > 0);
|
||||
|
||||
while (true) {
|
||||
const auto frames_written = snd_pcm_writei(pcm, data.data,
|
||||
n_frames);
|
||||
if (frames_written > 0) {
|
||||
period_position = (period_position + frames_written)
|
||||
% period_frames;
|
||||
|
||||
return frames_written * out_frame_size;
|
||||
}
|
||||
|
||||
if (frames_written < 0 && frames_written != -EAGAIN &&
|
||||
frames_written != -EINTR &&
|
||||
Recover(frames_written) < 0)
|
||||
throw FormatRuntimeError("snd_pcm_writei() failed: %s",
|
||||
snd_strerror(-frames_written));
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
inline size_t
|
||||
AlsaOutput::Play(const void *chunk, size_t size)
|
||||
{
|
||||
assert(size > 0);
|
||||
assert(size % in_frame_size == 0);
|
||||
|
||||
if (must_prepare) {
|
||||
must_prepare = false;
|
||||
|
||||
int err = snd_pcm_prepare(pcm);
|
||||
if (err < 0)
|
||||
throw FormatRuntimeError("snd_pcm_prepare() failed: %s",
|
||||
snd_strerror(-err));
|
||||
}
|
||||
|
||||
const auto e = pcm_export->Export({chunk, size});
|
||||
if (e.size == 0)
|
||||
/* the DoP (DSD over PCM) filter converts two frames
|
||||
@ -944,8 +1196,96 @@ AlsaOutput::Play(const void *chunk, size_t size)
|
||||
been played */
|
||||
return size;
|
||||
|
||||
const size_t bytes_written = PlayRaw(e);
|
||||
return pcm_export->CalcSourceSize(bytes_written);
|
||||
const std::lock_guard<Mutex> lock(mutex);
|
||||
|
||||
while (true) {
|
||||
if (error)
|
||||
std::rethrow_exception(error);
|
||||
|
||||
size_t bytes_written = ring_buffer->push((const uint8_t *)chunk,
|
||||
size);
|
||||
if (bytes_written > 0)
|
||||
return pcm_export->CalcSourceSize(bytes_written);
|
||||
|
||||
/* now that the ring_buffer is full, we can activate
|
||||
the socket handlers to trigger the first
|
||||
snd_pcm_writei() */
|
||||
UnlockActivate();
|
||||
|
||||
/* wait for the DispatchSockets() to make room in the
|
||||
ring_buffer */
|
||||
cond.wait(mutex);
|
||||
}
|
||||
}
|
||||
|
||||
std::chrono::steady_clock::duration
|
||||
AlsaOutput::PrepareSockets()
|
||||
{
|
||||
if (LockHasError()) {
|
||||
ClearSocketList();
|
||||
return std::chrono::steady_clock::duration(-1);
|
||||
}
|
||||
|
||||
return PrepareAlsaPcmSockets(*this, pcm, pfd_buffer);
|
||||
}
|
||||
|
||||
void
|
||||
AlsaOutput::DispatchSockets()
|
||||
try {
|
||||
{
|
||||
const std::lock_guard<Mutex> lock(mutex);
|
||||
if (drain) {
|
||||
{
|
||||
ScopeUnlock unlock(mutex);
|
||||
if (!DrainInternal())
|
||||
return;
|
||||
|
||||
MultiSocketMonitor::InvalidateSockets();
|
||||
}
|
||||
|
||||
drain = false;
|
||||
cond.signal();
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
if (must_prepare) {
|
||||
must_prepare = false;
|
||||
|
||||
int err = snd_pcm_prepare(pcm);
|
||||
if (err < 0)
|
||||
throw FormatRuntimeError("snd_pcm_prepare() failed: %s",
|
||||
snd_strerror(-err));
|
||||
}
|
||||
|
||||
CopyRingToPeriodBuffer();
|
||||
|
||||
if (period_buffer.IsEmpty())
|
||||
/* insert some silence if the buffer has not enough
|
||||
data yet, to avoid ALSA xrun */
|
||||
period_buffer.FillWithSilence(silence, out_frame_size);
|
||||
|
||||
auto frames_written = WriteFromPeriodBuffer();
|
||||
if (frames_written < 0) {
|
||||
if (frames_written == -EAGAIN || frames_written == -EINTR)
|
||||
/* try again in the next DispatchSockets()
|
||||
call which is still scheduled */
|
||||
return;
|
||||
|
||||
if (Recover(frames_written) < 0)
|
||||
throw FormatRuntimeError("snd_pcm_writei() failed: %s",
|
||||
snd_strerror(-frames_written));
|
||||
|
||||
/* recovered; try again in the next DispatchSockets()
|
||||
call */
|
||||
return;
|
||||
}
|
||||
} catch (const std::runtime_error &) {
|
||||
MultiSocketMonitor::Reset();
|
||||
|
||||
const std::lock_guard<Mutex> lock(mutex);
|
||||
error = std::current_exception();
|
||||
cond.signal();
|
||||
}
|
||||
|
||||
typedef AudioOutputWrapper<AlsaOutput> Wrapper;
|
||||
|
Loading…
Reference in New Issue
Block a user