diff --git a/Makefile.am b/Makefile.am index ad39b8f3e..dfeb5386a 100644 --- a/Makefile.am +++ b/Makefile.am @@ -115,6 +115,7 @@ mpd_headers = \ src/mapper.h \ src/output/httpd_client.h \ src/output/httpd_internal.h \ + src/output/pulse_output_plugin.h \ src/page.h \ src/pcm_buffer.h \ src/pcm_utils.h \ diff --git a/NEWS b/NEWS index 3f5f5f896..4906fa3f3 100644 --- a/NEWS +++ b/NEWS @@ -31,6 +31,7 @@ ver 0.16 (20??/??/??) - openal: new output plugin - pulse: announce "media.role=music" - pulse: renamed context to "Music Player Daemon" + - pulse: connect to server on MPD startup, implement pause * mixers: - removed support for legacy mixer configuration - reimplemented software volume as mixer+filter plugin diff --git a/configure.ac b/configure.ac index 885115971..150e81aa0 100644 --- a/configure.ac +++ b/configure.ac @@ -726,7 +726,7 @@ AC_ARG_ENABLE(pulse, [enable support for the PulseAudio sound server]),, enable_pulse=auto) -MPD_AUTO_PKG(pulse, PULSE, [libpulse-simple], +MPD_AUTO_PKG(pulse, PULSE, [libpulse], [PulseAudio output plugin], [libpulse not found]) if test x$enable_pulse = xyes; then AC_DEFINE([HAVE_PULSE], 1, diff --git a/src/mixer/pulse_mixer_plugin.c b/src/mixer/pulse_mixer_plugin.c index ecc0fc75b..b33ef80ae 100644 --- a/src/mixer/pulse_mixer_plugin.c +++ b/src/mixer/pulse_mixer_plugin.c @@ -18,12 +18,20 @@ */ #include "mixer_api.h" +#include "output/pulse_output_plugin.h" #include "conf.h" +#include "event_pipe.h" #include -#include -#include +#include +#include +#include +#include +#include +#include + +#include #include #undef G_LOG_DOMAIN @@ -32,15 +40,9 @@ struct pulse_mixer { struct mixer base; - const char *server; - const char *sink; - const char *output_name; + struct pulse_output *output; - uint32_t index; bool online; - - struct pa_context *context; - struct pa_threaded_mainloop *mainloop; struct pa_cvolume volume; }; @@ -54,175 +56,159 @@ pulse_mixer_quark(void) return g_quark_from_static_string("pulse_mixer"); } +static void +pulse_mixer_offline(struct pulse_mixer *pm) +{ + if (!pm->online) + return; + + pm->online = false; + + event_pipe_emit(PIPE_EVENT_MIXER); +} + /** - * \brief waits for a pulseaudio operation to finish, frees it and - * unlocks the mainloop - * \param operation the operation to wait for - * \return true if operation has finished normally (DONE state), - * false otherwise + * Callback invoked by pulse_mixer_update(). Receives the new mixer + * value. */ -static bool -pulse_wait_for_operation(struct pa_threaded_mainloop *mainloop, - struct pa_operation *operation) -{ - pa_operation_state_t state; - - assert(mainloop != NULL); - assert(operation != NULL); - - state = pa_operation_get_state(operation); - while (state == PA_OPERATION_RUNNING) { - pa_threaded_mainloop_wait(mainloop); - state = pa_operation_get_state(operation); - } - - pa_operation_unref(operation); - - return state == PA_OPERATION_DONE; -} - static void -sink_input_cb(G_GNUC_UNUSED pa_context *context, const pa_sink_input_info *i, - int eol, void *userdata) +pulse_mixer_volume_cb(G_GNUC_UNUSED pa_context *context, const pa_sink_input_info *i, + int eol, void *userdata) { - struct pulse_mixer *pm = userdata; - if (eol) { - g_debug("eol error sink_input_cb"); + if (eol) return; - } if (i == NULL) { - g_debug("Sink input callback failure"); + pulse_mixer_offline(pm); return; } - g_debug("sink input cb %s, index %d ",i->name,i->index); - - if (strcmp(i->name,pm->output_name) == 0) { - pm->index = i->index; - pm->online = true; - pm->volume = i->volume; - } else - g_debug("bad name"); -} - -static void -sink_input_vol(G_GNUC_UNUSED pa_context *context, const pa_sink_input_info *i, - int eol, void *userdata) -{ - - struct pulse_mixer *pm = userdata; - - if (eol) { - g_debug("eol error sink_input_vol"); - return; - } - - if (i == NULL) { - g_debug("Sink input callback failure"); - return; - } - - g_debug("sink input vol %s, index %d ", i->name, i->index); - + pm->online = true; pm->volume = i->volume; - pa_threaded_mainloop_signal(pm->mainloop, 0); + event_pipe_emit(PIPE_EVENT_MIXER); } static void -subscribe_cb(pa_context *c, pa_subscription_event_type_t t, +pulse_mixer_update(struct pulse_mixer *pm) +{ + pa_operation *o; + + assert(pm->output->stream != NULL); + + if (pm->output->context == NULL) + return; + + o = pa_context_get_sink_input_info(pm->output->context, + pa_stream_get_index(pm->output->stream), + pulse_mixer_volume_cb, pm); + if (o == NULL) { + g_warning("pa_context_get_sink_input_info() failed: %s", + pa_strerror(pa_context_errno(pm->output->context))); + pulse_mixer_offline(pm); + return; + } + + pa_operation_unref(o); +} + +static void +pulse_mixer_handle_sink_input(struct pulse_mixer *pm, + pa_subscription_event_type_t t, + uint32_t idx) +{ + if (pm->output->stream == NULL) { + pulse_mixer_offline(pm); + return; + } + + if (idx != pa_stream_get_index(pm->output->stream)) + return; + + if (t == PA_SUBSCRIPTION_EVENT_NEW || + t == PA_SUBSCRIPTION_EVENT_CHANGE) + pulse_mixer_update(pm); +} + +static void +pulse_mixer_subscribe_cb(G_GNUC_UNUSED pa_context *c, pa_subscription_event_type_t t, uint32_t idx, void *userdata) { - struct pulse_mixer *pm = userdata; - g_debug("subscribe call back"); - switch (t & PA_SUBSCRIPTION_EVENT_FACILITY_MASK) { case PA_SUBSCRIPTION_EVENT_SINK_INPUT: - if ((t & PA_SUBSCRIPTION_EVENT_TYPE_MASK) == - PA_SUBSCRIPTION_EVENT_REMOVE && - pm->index == idx) - pm->online = false; - else { - pa_operation *o; - - o = pa_context_get_sink_input_info(c, idx, - sink_input_cb, pm); - if (o == NULL) { - g_debug("pa_context_get_sink_input_info() failed"); - return; - } - - pa_operation_unref(o); - } - + pulse_mixer_handle_sink_input(pm, + t & PA_SUBSCRIPTION_EVENT_TYPE_MASK, + idx); break; } } static void -context_state_cb(pa_context *context, void *userdata) +pulxe_mixer_context_state_cb(pa_context *context, void *userdata) { struct pulse_mixer *pm = userdata; + pa_operation *o; - switch (pa_context_get_state(context)) { - case PA_CONTEXT_READY: { - pa_operation *o; + /* pass event to the output's callback function */ + pulse_output_context_state_cb(context, pm->output); - pa_context_set_subscribe_callback(context, subscribe_cb, pm); + if (pa_context_get_state(context) == PA_CONTEXT_READY) { + /* subscribe to sink_input events after the connection + has been established */ o = pa_context_subscribe(context, (pa_subscription_mask_t)PA_SUBSCRIPTION_MASK_SINK_INPUT, NULL, NULL); if (o == NULL) { - g_debug("pa_context_subscribe() failed"); + g_warning("pa_context_subscribe() failed: %s", + pa_strerror(pa_context_errno(context))); return; } pa_operation_unref(o); - o = pa_context_get_sink_input_info_list(context, - sink_input_cb, pm); - if (o == NULL) { - g_debug("pa_context_get_sink_input_info_list() failed"); - return; - } - - pa_operation_unref(o); - - pa_threaded_mainloop_signal(pm->mainloop, 0); - break; - } - - case PA_CONTEXT_UNCONNECTED: - case PA_CONTEXT_CONNECTING: - case PA_CONTEXT_AUTHORIZING: - case PA_CONTEXT_SETTING_NAME: - break; - case PA_CONTEXT_TERMINATED: - case PA_CONTEXT_FAILED: - pa_threaded_mainloop_signal(pm->mainloop, 0); - break; + if (pm->output->stream != NULL) + pulse_mixer_update(pm); } } - static struct mixer * -pulse_mixer_init(G_GNUC_UNUSED void *ao, const struct config_param *param, - G_GNUC_UNUSED GError **error_r) +pulse_mixer_init(void *ao, G_GNUC_UNUSED const struct config_param *param, + GError **error_r) { - struct pulse_mixer *pm = g_new(struct pulse_mixer,1); + struct pulse_mixer *pm; + + if (ao == NULL) { + g_set_error(error_r, pulse_mixer_quark(), 0, + "The pulse mixer cannot work without the audio output"); + return false; + } + + pm = g_new(struct pulse_mixer,1); mixer_init(&pm->base, &pulse_mixer_plugin); + pm->output = ao; pm->online = false; - pm->server = config_get_block_string(param, "server", NULL); - pm->sink = config_get_block_string(param, "sink", NULL); - pm->output_name = config_get_block_string(param, "name", NULL); + pa_threaded_mainloop_lock(pm->output->mainloop); + + /* register callbacks (override the output's context state + callback) */ + + pa_context_set_state_callback(pm->output->context, + pulxe_mixer_context_state_cb, pm); + pa_context_set_subscribe_callback(pm->output->context, + pulse_mixer_subscribe_cb, pm); + + /* check the current state now (we might have missed the first + events!) */ + pulxe_mixer_context_state_cb(pm->output->context, pm); + + pa_threaded_mainloop_unlock(pm->output->mainloop); return &pm->base; } @@ -232,79 +218,35 @@ pulse_mixer_finish(struct mixer *data) { struct pulse_mixer *pm = (struct pulse_mixer *) data; + /* restore callbacks */ + + pa_threaded_mainloop_lock(pm->output->mainloop); + + if (pm->output->context != NULL) { + pa_context_set_state_callback(pm->output->context, + pulse_output_context_state_cb, + pm->output); + pa_context_set_subscribe_callback(pm->output->context, + NULL, NULL); + } + + pa_threaded_mainloop_unlock(pm->output->mainloop); + + /* free resources */ + g_free(pm); } static bool -pulse_mixer_setup(struct pulse_mixer *pm, GError **error_r) -{ - pa_context_set_state_callback(pm->context, context_state_cb, pm); - - if (pa_context_connect(pm->context, pm->server, - (pa_context_flags_t)0, NULL) < 0) { - g_set_error(error_r, pulse_mixer_quark(), 0, - "pa_context_connect() has failed"); - return false; - } - - pa_threaded_mainloop_lock(pm->mainloop); - - if (pa_threaded_mainloop_start(pm->mainloop) < 0) { - pa_threaded_mainloop_unlock(pm->mainloop); - g_set_error(error_r, pulse_mixer_quark(), 0, - "pa_threaded_mainloop_start() has failed"); - return false; - } - - pa_threaded_mainloop_wait(pm->mainloop); - - if (pa_context_get_state(pm->context) != PA_CONTEXT_READY) { - g_set_error(error_r, pulse_mixer_quark(), 0, - "failed to connect: %s", - pa_strerror(pa_context_errno(pm->context))); - pa_threaded_mainloop_unlock(pm->mainloop); - return false; - } - - pa_threaded_mainloop_unlock(pm->mainloop); - - return true; -} - -static bool -pulse_mixer_open(struct mixer *data, GError **error_r) +pulse_mixer_open(struct mixer *data, G_GNUC_UNUSED GError **error_r) { struct pulse_mixer *pm = (struct pulse_mixer *) data; - g_debug("pulse mixer open"); - - pm->index = 0; - pm->online = false; - - pm->mainloop = pa_threaded_mainloop_new(); - if (pm->mainloop == NULL) { - g_set_error(error_r, pulse_mixer_quark(), 0, - "pa_threaded_mainloop_new() has failed"); - return false; - } - - pm->context = pa_context_new(pa_threaded_mainloop_get_api(pm->mainloop), - "Mixer mpd"); - if (pm->context == NULL) { - pa_threaded_mainloop_stop(pm->mainloop); - pa_threaded_mainloop_free(pm->mainloop); - g_set_error(error_r, pulse_mixer_quark(), 0, - "pa_context_new() has failed"); - return false; - } - - if (!pulse_mixer_setup(pm, error_r)) { - pa_threaded_mainloop_stop(pm->mainloop); - pa_context_disconnect(pm->context); - pa_context_unref(pm->context); - pa_threaded_mainloop_free(pm->mainloop); - return false; - } + pa_threaded_mainloop_lock(pm->output->mainloop); + if (pm->output->stream != NULL && + pa_stream_get_state(pm->output->stream) == PA_STREAM_READY) + pulse_mixer_update(pm); + pa_threaded_mainloop_unlock(pm->output->mainloop); return true; } @@ -314,49 +256,22 @@ pulse_mixer_close(struct mixer *data) { struct pulse_mixer *pm = (struct pulse_mixer *) data; - pa_threaded_mainloop_stop(pm->mainloop); - pa_context_disconnect(pm->context); - pa_context_unref(pm->context); - pa_threaded_mainloop_free(pm->mainloop); - - pm->online = false; + pulse_mixer_offline(pm); } static int -pulse_mixer_get_volume(struct mixer *mixer, GError **error_r) +pulse_mixer_get_volume(struct mixer *mixer, G_GNUC_UNUSED GError **error_r) { struct pulse_mixer *pm = (struct pulse_mixer *) mixer; int ret; - pa_operation *o; - pa_threaded_mainloop_lock(pm->mainloop); - - if (!pm->online) { - pa_threaded_mainloop_unlock(pm->mainloop); - return false; - } - - o = pa_context_get_sink_input_info(pm->context, pm->index, - sink_input_vol, pm); - if (o == NULL) { - pa_threaded_mainloop_unlock(pm->mainloop); - g_set_error(error_r, pulse_mixer_quark(), 0, - "pa_context_get_sink_input_info() has failed"); - return false; - } - - if (!pulse_wait_for_operation(pm->mainloop, o)) { - pa_threaded_mainloop_unlock(pm->mainloop); - g_set_error(error_r, pulse_mixer_quark(), 0, - "failed to read PulseAudio volume"); - return false; - } + pa_threaded_mainloop_lock(pm->output->mainloop); ret = pm->online ? (int)((100*(pa_cvolume_avg(&pm->volume)+1))/PA_VOLUME_NORM) : -1; - pa_threaded_mainloop_unlock(pm->mainloop); + pa_threaded_mainloop_unlock(pm->output->mainloop); return ret; } @@ -368,10 +283,11 @@ pulse_mixer_set_volume(struct mixer *mixer, unsigned volume, GError **error_r) struct pa_cvolume cvolume; pa_operation *o; - pa_threaded_mainloop_lock(pm->mainloop); + pa_threaded_mainloop_lock(pm->output->mainloop); - if (!pm->online) { - pa_threaded_mainloop_unlock(pm->mainloop); + if (!pm->online || pm->output->stream == NULL || + pm->output->context == NULL) { + pa_threaded_mainloop_unlock(pm->output->mainloop); g_set_error(error_r, pulse_mixer_quark(), 0, "disconnected"); return false; } @@ -379,9 +295,10 @@ pulse_mixer_set_volume(struct mixer *mixer, unsigned volume, GError **error_r) pa_cvolume_set(&cvolume, pm->volume.channels, (pa_volume_t)volume * PA_VOLUME_NORM / 100 + 0.5); - o = pa_context_set_sink_input_volume(pm->context, pm->index, + o = pa_context_set_sink_input_volume(pm->output->context, + pa_stream_get_index(pm->output->stream), &cvolume, NULL, NULL); - pa_threaded_mainloop_unlock(pm->mainloop); + pa_threaded_mainloop_unlock(pm->output->mainloop); if (o == NULL) { g_set_error(error_r, pulse_mixer_quark(), 0, "failed to set PulseAudio volume"); diff --git a/src/output/pulse_output_plugin.c b/src/output/pulse_output_plugin.c index 1b1c27575..c24a356f2 100644 --- a/src/output/pulse_output_plugin.c +++ b/src/output/pulse_output_plugin.c @@ -17,23 +17,21 @@ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. */ +#include "pulse_output_plugin.h" #include "output_api.h" #include "mixer_list.h" #include -#include + +#include +#include +#include #include +#include + #define MPD_PULSE_NAME "Music Player Daemon" -struct pulse_data { - const char *name; - const char *server; - const char *sink; - - pa_simple *s; -}; - /** * The quark used for GError.domain. */ @@ -43,76 +41,299 @@ pulse_output_quark(void) return g_quark_from_static_string("pulse_output"); } -static struct pulse_data *pulse_new_data(void) +/** + * \brief waits for a pulseaudio operation to finish, frees it and + * unlocks the mainloop + * \param operation the operation to wait for + * \return true if operation has finished normally (DONE state), + * false otherwise + */ +static bool +pulse_wait_for_operation(struct pa_threaded_mainloop *mainloop, + struct pa_operation *operation) { - struct pulse_data *ret; + pa_operation_state_t state; - ret = g_new(struct pulse_data, 1); + assert(mainloop != NULL); + assert(operation != NULL); - ret->server = NULL; - ret->sink = NULL; - - return ret; -} - -static void pulse_free_data(struct pulse_data *pd) -{ - g_free(pd); -} - -static void * -pulse_init(G_GNUC_UNUSED const struct audio_format *audio_format, - const struct config_param *param, G_GNUC_UNUSED GError **error) -{ - struct pulse_data *pd; - - g_setenv("PULSE_PROP_media.role", "music", true); - - pd = pulse_new_data(); - pd->name = config_get_block_string(param, "name", "mpd_pulse"); - pd->server = config_get_block_string(param, "server", NULL); - pd->sink = config_get_block_string(param, "sink", NULL); - - return pd; -} - -static void pulse_finish(void *data) -{ - struct pulse_data *pd = data; - - pulse_free_data(pd); -} - -static bool pulse_test_default_device(void) -{ - pa_simple *s; - pa_sample_spec ss; - int error; - - ss.format = PA_SAMPLE_S16NE; - ss.rate = 44100; - ss.channels = 2; - - s = pa_simple_new(NULL, MPD_PULSE_NAME, PA_STREAM_PLAYBACK, NULL, - MPD_PULSE_NAME, &ss, NULL, NULL, &error); - if (!s) { - g_message("Cannot connect to default PulseAudio server: %s\n", - pa_strerror(error)); - return false; + state = pa_operation_get_state(operation); + while (state == PA_OPERATION_RUNNING) { + pa_threaded_mainloop_wait(mainloop); + state = pa_operation_get_state(operation); } - pa_simple_free(s); + pa_operation_unref(operation); + + return state == PA_OPERATION_DONE; +} + +/** + * Callback function for stream operation. It just sends a signal to + * the caller thread, to wake pulse_wait_for_operation() up. + */ +static void +pulse_output_stream_success_cb(G_GNUC_UNUSED pa_stream *s, + G_GNUC_UNUSED int success, void *userdata) +{ + struct pulse_output *po = userdata; + + pa_threaded_mainloop_signal(po->mainloop, 0); +} + +void +pulse_output_context_state_cb(struct pa_context *context, void *userdata) +{ + struct pulse_output *po = userdata; + + switch (pa_context_get_state(context)) { + case PA_CONTEXT_READY: + case PA_CONTEXT_TERMINATED: + case PA_CONTEXT_FAILED: + /* the caller thread might be waiting for these + states */ + pa_threaded_mainloop_signal(po->mainloop, 0); + break; + + case PA_CONTEXT_UNCONNECTED: + case PA_CONTEXT_CONNECTING: + case PA_CONTEXT_AUTHORIZING: + case PA_CONTEXT_SETTING_NAME: + break; + } +} + +/** + * Attempt to connect asynchronously to the PulseAudio server. + * + * @return true on success, false on error + */ +static bool +pulse_output_connect(struct pulse_output *po, GError **error_r) +{ + int error; + + error = pa_context_connect(po->context, po->server, + (pa_context_flags_t)0, NULL); + if (error < 0) { + g_set_error(error_r, pulse_output_quark(), 0, + "pa_context_connect() has failed: %s", + pa_strerror(pa_context_errno(po->context))); + return false; + } return true; } +/** + * Create, set up and connect a context. + * + * @return true on success, false on error + */ static bool -pulse_open(void *data, struct audio_format *audio_format, GError **error_r) +pulse_output_setup_context(struct pulse_output *po, GError **error_r) { - struct pulse_data *pd = data; + po->context = pa_context_new(pa_threaded_mainloop_get_api(po->mainloop), + MPD_PULSE_NAME); + if (po->context == NULL) { + g_set_error(error_r, pulse_output_quark(), 0, + "pa_context_new() has failed"); + return false; + } + + pa_context_set_state_callback(po->context, + pulse_output_context_state_cb, po); + + if (!pulse_output_connect(po, error_r)) { + pa_context_unref(po->context); + return false; + } + + return true; +} + +/** + * Frees and clears the context. + */ +static void +pulse_output_delete_context(struct pulse_output *po) +{ + pa_context_disconnect(po->context); + pa_context_unref(po->context); + po->context = NULL; +} + +static void * +pulse_output_init(G_GNUC_UNUSED const struct audio_format *audio_format, + const struct config_param *param, GError **error_r) +{ + struct pulse_output *po; + + g_setenv("PULSE_PROP_media.role", "music", true); + + po = g_new(struct pulse_output, 1); + po->name = config_get_block_string(param, "name", "mpd_pulse"); + po->server = config_get_block_string(param, "server", NULL); + po->sink = config_get_block_string(param, "sink", NULL); + + /* create the libpulse mainloop and start the thread */ + + po->mainloop = pa_threaded_mainloop_new(); + if (po->mainloop == NULL) { + g_free(po); + + g_set_error(error_r, pulse_output_quark(), 0, + "pa_threaded_mainloop_new() has failed"); + return NULL; + } + + pa_threaded_mainloop_lock(po->mainloop); + + if (pa_threaded_mainloop_start(po->mainloop) < 0) { + pa_threaded_mainloop_unlock(po->mainloop); + pa_threaded_mainloop_free(po->mainloop); + g_free(po); + + g_set_error(error_r, pulse_output_quark(), 0, + "pa_threaded_mainloop_start() has failed"); + return false; + } + + pa_threaded_mainloop_unlock(po->mainloop); + + /* create the libpulse context and connect it */ + + pa_threaded_mainloop_lock(po->mainloop); + + if (!pulse_output_setup_context(po, error_r)) { + pa_threaded_mainloop_unlock(po->mainloop); + pa_threaded_mainloop_stop(po->mainloop); + pa_threaded_mainloop_free(po->mainloop); + g_free(po); + return NULL; + } + + pa_threaded_mainloop_unlock(po->mainloop); + + return po; +} + +static void +pulse_output_finish(void *data) +{ + struct pulse_output *po = data; + + pa_threaded_mainloop_stop(po->mainloop); + if (po->context != NULL) + pulse_output_delete_context(po); + pa_threaded_mainloop_free(po->mainloop); + + g_free(po); +} + +/** + * Check if the context is (already) connected, and waits if not. If + * the context has been disconnected, retry to connect. + * + * @return true on success, false on error + */ +static bool +pulse_output_wait_connection(struct pulse_output *po, GError **error_r) +{ + pa_context_state_t state; + + pa_threaded_mainloop_lock(po->mainloop); + + if (po->context == NULL && !pulse_output_setup_context(po, error_r)) + return false; + + while (true) { + state = pa_context_get_state(po->context); + switch (state) { + case PA_CONTEXT_READY: + /* nothing to do */ + pa_threaded_mainloop_unlock(po->mainloop); + return true; + + case PA_CONTEXT_UNCONNECTED: + case PA_CONTEXT_TERMINATED: + case PA_CONTEXT_FAILED: + /* failure */ + g_set_error(error_r, pulse_output_quark(), 0, + "failed to connect: %s", + pa_strerror(pa_context_errno(po->context))); + pulse_output_delete_context(po); + pa_threaded_mainloop_unlock(po->mainloop); + return false; + + case PA_CONTEXT_CONNECTING: + case PA_CONTEXT_AUTHORIZING: + case PA_CONTEXT_SETTING_NAME: + /* wait some more */ + pa_threaded_mainloop_wait(po->mainloop); + break; + } + } +} + +static void +pulse_output_stream_state_cb(pa_stream *stream, void *userdata) +{ + struct pulse_output *po = userdata; + + switch (pa_stream_get_state(stream)) { + case PA_STREAM_READY: + case PA_STREAM_FAILED: + case PA_STREAM_TERMINATED: + pa_threaded_mainloop_signal(po->mainloop, 0); + break; + + case PA_STREAM_UNCONNECTED: + case PA_STREAM_CREATING: + break; + } +} + +static void +pulse_output_stream_write_cb(G_GNUC_UNUSED pa_stream *stream, size_t nbytes, + void *userdata) +{ + struct pulse_output *po = userdata; + + po->writable = nbytes; + pa_threaded_mainloop_signal(po->mainloop, 0); +} + +static bool +pulse_output_open(void *data, struct audio_format *audio_format, + GError **error_r) +{ + struct pulse_output *po = data; pa_sample_spec ss; int error; + if (po->context != NULL) { + switch (pa_context_get_state(po->context)) { + case PA_CONTEXT_UNCONNECTED: + case PA_CONTEXT_TERMINATED: + case PA_CONTEXT_FAILED: + /* the connection was closed meanwhile; delete + it, and pulse_output_wait_connection() will + reopen it */ + pulse_output_delete_context(po); + break; + + case PA_CONTEXT_READY: + case PA_CONTEXT_CONNECTING: + case PA_CONTEXT_AUTHORIZING: + case PA_CONTEXT_SETTING_NAME: + break; + } + } + + if (!pulse_output_wait_connection(po, error_r)) + return false; + /* MPD doesn't support the other pulseaudio sample formats, so we just force MPD to send us everything as 16 bit */ audio_format->bits = 16; @@ -121,45 +342,243 @@ pulse_open(void *data, struct audio_format *audio_format, GError **error_r) ss.rate = audio_format->sample_rate; ss.channels = audio_format->channels; - pd->s = pa_simple_new(pd->server, MPD_PULSE_NAME, PA_STREAM_PLAYBACK, - pd->sink, pd->name, - &ss, NULL, NULL, - &error); - if (!pd->s) { - g_set_error(error_r, pulse_output_quark(), error, - "Cannot connect to PulseAudio server: %s", - pa_strerror(error)); + pa_threaded_mainloop_lock(po->mainloop); + + /* create a stream .. */ + + po->stream = pa_stream_new(po->context, po->name, &ss, NULL); + if (po->stream == NULL) { + g_set_error(error_r, pulse_output_quark(), 0, + "pa_stream_new() has failed: %s", + pa_strerror(pa_context_errno(po->context))); + pa_threaded_mainloop_unlock(po->mainloop); + return false; + } + + pa_stream_set_state_callback(po->stream, + pulse_output_stream_state_cb, po); + pa_stream_set_write_callback(po->stream, + pulse_output_stream_write_cb, po); + + /* .. and connect it (asynchronously) */ + + error = pa_stream_connect_playback(po->stream, po->sink, + NULL, 0, NULL, NULL); + if (error < 0) { + pa_stream_unref(po->stream); + po->stream = NULL; + + g_set_error(error_r, pulse_output_quark(), 0, + "pa_stream_connect_playback() has failed: %s", + pa_strerror(pa_context_errno(po->context))); + pa_threaded_mainloop_unlock(po->mainloop); + return false; + } + + pa_threaded_mainloop_unlock(po->mainloop); + +#if !PA_CHECK_VERSION(0,9,11) + po->pause = false; +#endif + + return true; +} + +static void +pulse_output_close(void *data) +{ + struct pulse_output *po = data; + pa_operation *o; + + pa_threaded_mainloop_lock(po->mainloop); + + if (pa_stream_get_state(po->stream) == PA_STREAM_READY) { + o = pa_stream_drain(po->stream, + pulse_output_stream_success_cb, po); + if (o == NULL) { + g_warning("pa_stream_drain() has failed: %s", + pa_strerror(pa_context_errno(po->context))); + } else + pulse_wait_for_operation(po->mainloop, o); + } + + pa_stream_disconnect(po->stream); + pa_stream_unref(po->stream); + po->stream = NULL; + + if (po->context != NULL && + pa_context_get_state(po->context) != PA_CONTEXT_READY) + pulse_output_delete_context(po); + + pa_threaded_mainloop_unlock(po->mainloop); +} + +/** + * Check if the stream is (already) connected, and waits for a signal + * if not. The mainloop must be locked before calling this function. + * + * @return the current stream state + */ +static pa_stream_state_t +pulse_output_check_stream(struct pulse_output *po) +{ + pa_stream_state_t state = pa_stream_get_state(po->stream); + + switch (state) { + case PA_STREAM_READY: + case PA_STREAM_FAILED: + case PA_STREAM_TERMINATED: + case PA_STREAM_UNCONNECTED: + break; + + case PA_STREAM_CREATING: + pa_threaded_mainloop_wait(po->mainloop); + state = pa_stream_get_state(po->stream); + break; + } + + return state; +} + +/** + * Check if the stream is (already) connected, and waits if not. The + * mainloop must be locked before calling this function. + * + * @return true on success, false on error + */ +static bool +pulse_output_wait_stream(struct pulse_output *po, GError **error_r) +{ + pa_stream_state_t state = pa_stream_get_state(po->stream); + + switch (state) { + case PA_STREAM_READY: + return true; + + case PA_STREAM_FAILED: + case PA_STREAM_TERMINATED: + case PA_STREAM_UNCONNECTED: + g_set_error(error_r, pulse_output_quark(), 0, + "disconnected"); + return false; + + case PA_STREAM_CREATING: + break; + } + + do { + state = pulse_output_check_stream(po); + } while (state == PA_STREAM_CREATING); + + if (state != PA_STREAM_READY) { + g_set_error(error_r, pulse_output_quark(), 0, + "failed to connect the stream: %s", + pa_strerror(pa_context_errno(po->context))); return false; } return true; } -static void pulse_cancel(void *data) +/** + * Determines whether the stream is paused. On libpulse older than + * 0.9.11, it uses a custom pause flag. + */ +static bool +pulse_output_stream_is_paused(struct pulse_output *po) { - struct pulse_data *pd = data; - int error; + assert(po->stream != NULL); - if (pa_simple_flush(pd->s, &error) < 0) - g_warning("Flush failed in PulseAudio output \"%s\": %s\n", - pd->name, pa_strerror(error)); +#if !defined(PA_CHECK_VERSION) || !PA_CHECK_VERSION(0,9,11) + return po->pause; +#else + return pa_stream_is_corked(po->stream); +#endif } -static void pulse_close(void *data) +/** + * Sets cork mode on the stream. + */ +static bool +pulse_output_stream_pause(struct pulse_output *po, bool pause, + GError **error_r) { - struct pulse_data *pd = data; + pa_operation *o; - pa_simple_drain(pd->s, NULL); - pa_simple_free(pd->s); + assert(po->stream != NULL); + + o = pa_stream_cork(po->stream, pause, + pulse_output_stream_success_cb, po); + if (o == NULL) { + g_set_error(error_r, pulse_output_quark(), 0, + "pa_stream_cork() has failed: %s", + pa_strerror(pa_context_errno(po->context))); + return false; + } + + if (!pulse_wait_for_operation(po->mainloop, o)) { + g_set_error(error_r, pulse_output_quark(), 0, + "pa_stream_cork() has failed: %s", + pa_strerror(pa_context_errno(po->context))); + return false; + } + +#if !PA_CHECK_VERSION(0,9,11) + po->pause = pause; +#endif + return true; } static size_t -pulse_play(void *data, const void *chunk, size_t size, GError **error_r) +pulse_output_play(void *data, const void *chunk, size_t size, GError **error_r) { - struct pulse_data *pd = data; + struct pulse_output *po = data; int error; - if (pa_simple_write(pd->s, chunk, size, &error) < 0) { + assert(po->stream != NULL); + + pa_threaded_mainloop_lock(po->mainloop); + + /* check if the stream is (already) connected */ + + if (!pulse_output_wait_stream(po, error_r)) { + pa_threaded_mainloop_unlock(po->mainloop); + return 0; + } + + assert(po->context != NULL); + + /* unpause if previously paused */ + + if (pulse_output_stream_is_paused(po) && + !pulse_output_stream_pause(po, false, error_r)) + return 0; + + /* wait until the server allows us to write */ + + while (po->writable == 0) { + pa_threaded_mainloop_wait(po->mainloop); + + if (pa_stream_get_state(po->stream) != PA_STREAM_READY) { + pa_threaded_mainloop_unlock(po->mainloop); + g_set_error(error_r, pulse_output_quark(), 0, + "disconnected"); + return false; + } + } + + /* now write */ + + if (size > po->writable) + /* don't send more than possible */ + size = po->writable; + + po->writable -= size; + + error = pa_stream_write(po->stream, chunk, size, NULL, + 0, PA_SEEK_RELATIVE); + pa_threaded_mainloop_unlock(po->mainloop); + if (error < 0) { g_set_error(error_r, pulse_output_quark(), error, "%s", pa_strerror(error)); return 0; @@ -168,16 +587,105 @@ pulse_play(void *data, const void *chunk, size_t size, GError **error_r) return size; } +static void +pulse_output_cancel(void *data) +{ + struct pulse_output *po = data; + pa_operation *o; + + assert(po->stream != NULL); + + pa_threaded_mainloop_lock(po->mainloop); + + if (pa_stream_get_state(po->stream) != PA_STREAM_READY) { + /* no need to flush when the stream isn't connected + yet */ + pa_threaded_mainloop_unlock(po->mainloop); + return; + } + + assert(po->context != NULL); + + o = pa_stream_flush(po->stream, pulse_output_stream_success_cb, po); + if (o == NULL) { + g_warning("pa_stream_flush() has failed: %s", + pa_strerror(pa_context_errno(po->context))); + pa_threaded_mainloop_unlock(po->mainloop); + return; + } + + pulse_wait_for_operation(po->mainloop, o); + pa_threaded_mainloop_unlock(po->mainloop); +} + +static bool +pulse_output_pause(void *data) +{ + struct pulse_output *po = data; + GError *error = NULL; + + assert(po->stream != NULL); + + pa_threaded_mainloop_lock(po->mainloop); + + /* check if the stream is (already/still) connected */ + + if (!pulse_output_wait_stream(po, &error)) { + pa_threaded_mainloop_unlock(po->mainloop); + g_warning("%s", error->message); + g_error_free(error); + return false; + } + + assert(po->context != NULL); + + /* cork the stream */ + + if (pulse_output_stream_is_paused(po)) { + /* already paused; due to a MPD API limitation, we + have to sleep a little bit here, to avoid hogging + the CPU */ + + g_usleep(50000); + } else if (!pulse_output_stream_pause(po, true, &error)) { + pa_threaded_mainloop_unlock(po->mainloop); + g_warning("%s", error->message); + g_error_free(error); + return false; + } + + pa_threaded_mainloop_unlock(po->mainloop); + + return true; +} + +static bool +pulse_output_test_default_device(void) +{ + struct pulse_output *po; + bool success; + + po = pulse_output_init(NULL, NULL, NULL); + if (po == NULL) + return false; + + success = pulse_output_wait_connection(po, NULL); + pulse_output_finish(po); + + return success; +} + const struct audio_output_plugin pulse_output_plugin = { .name = "pulse", - .test_default_device = pulse_test_default_device, - .init = pulse_init, - .finish = pulse_finish, - .open = pulse_open, - .play = pulse_play, - .cancel = pulse_cancel, - .close = pulse_close, + .test_default_device = pulse_output_test_default_device, + .init = pulse_output_init, + .finish = pulse_output_finish, + .open = pulse_output_open, + .play = pulse_output_play, + .cancel = pulse_output_cancel, + .pause = pulse_output_pause, + .close = pulse_output_close, .mixer_plugin = &pulse_mixer_plugin, }; diff --git a/src/output/pulse_output_plugin.h b/src/output/pulse_output_plugin.h new file mode 100644 index 000000000..fc2a7d4d5 --- /dev/null +++ b/src/output/pulse_output_plugin.h @@ -0,0 +1,58 @@ +/* + * Copyright (C) 2003-2009 The Music Player Daemon Project + * http://www.musicpd.org + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ + +#ifndef MPD_PULSE_OUTPUT_PLUGIN_H +#define MPD_PULSE_OUTPUT_PLUGIN_H + +#include +#include + +#if !defined(PA_CHECK_VERSION) +/** + * This macro was implemented in libpulse 0.9.16. + */ +#define PA_CHECK_VERSION(a,b,c) false +#endif + +struct pa_operation; + +struct pulse_output { + const char *name; + const char *server; + const char *sink; + + struct pa_threaded_mainloop *mainloop; + struct pa_context *context; + struct pa_stream *stream; + + size_t writable; + +#if !PA_CHECK_VERSION(0,9,11) + /** + * We need this variable because pa_stream_is_corked() wasn't + * added before 0.9.11. + */ + bool pause; +#endif +}; + +void +pulse_output_context_state_cb(struct pa_context *context, void *userdata); + +#endif diff --git a/test/read_mixer.c b/test/read_mixer.c index fdf6b7fe1..1bf40bd5b 100644 --- a/test/read_mixer.c +++ b/test/read_mixer.c @@ -21,6 +21,8 @@ #include "mixer_list.h" #include "filter_registry.h" #include "pcm_volume.h" +#include "output/pulse_output_plugin.h" +#include "event_pipe.h" #include @@ -28,6 +30,17 @@ #include #include +void +pulse_output_context_state_cb(G_GNUC_UNUSED struct pa_context *context, + G_GNUC_UNUSED void *userdata) +{ +} + +void +event_pipe_emit(G_GNUC_UNUSED enum pipe_event event) +{ +} + const struct filter_plugin * filter_plugin_by_name(G_GNUC_UNUSED const char *name) { diff --git a/test/run_output.c b/test/run_output.c index 5ab9625e8..3731b6c09 100644 --- a/test/run_output.c +++ b/test/run_output.c @@ -24,6 +24,7 @@ #include "audio_parser.h" #include "filter_registry.h" #include "pcm_convert.h" +#include "event_pipe.h" #include @@ -31,6 +32,11 @@ #include #include +void +event_pipe_emit(G_GNUC_UNUSED enum pipe_event event) +{ +} + void pcm_convert_init(G_GNUC_UNUSED struct pcm_convert_state *state) { }