decoder_control: protect command, state with a mutex

Replace decoder_control.notify with decoder_control.mutex and
decoder_control.cond.  Lock the mutex on all accesses to
decoder_control.command and decoder_control.state.
This commit is contained in:
Max Kellermann 2009-08-13 23:33:46 +02:00
parent 499ed62dd7
commit e28a0e97b5
6 changed files with 242 additions and 32 deletions

View File

@ -57,7 +57,10 @@ void decoder_initialized(G_GNUC_UNUSED struct decoder * decoder,
dc.seekable = seekable; dc.seekable = seekable;
dc.total_time = total_time; dc.total_time = total_time;
decoder_lock();
dc.state = DECODE_STATE_DECODE; dc.state = DECODE_STATE_DECODE;
decoder_unlock();
notify_signal(&pc.notify); notify_signal(&pc.notify);
g_debug("audio_format=%u:%u:%u, seekable=%s", g_debug("audio_format=%u:%u:%u, seekable=%s",
@ -88,6 +91,8 @@ enum decoder_command decoder_get_command(G_GNUC_UNUSED struct decoder * decoder)
void decoder_command_finished(G_GNUC_UNUSED struct decoder * decoder) void decoder_command_finished(G_GNUC_UNUSED struct decoder * decoder)
{ {
decoder_lock();
assert(dc.command != DECODE_COMMAND_NONE); assert(dc.command != DECODE_COMMAND_NONE);
assert(dc.command != DECODE_COMMAND_SEEK || assert(dc.command != DECODE_COMMAND_SEEK ||
dc.seek_error || decoder->seeking); dc.seek_error || decoder->seeking);
@ -105,6 +110,8 @@ void decoder_command_finished(G_GNUC_UNUSED struct decoder * decoder)
} }
dc.command = DECODE_COMMAND_NONE; dc.command = DECODE_COMMAND_NONE;
decoder_unlock();
notify_signal(&pc.notify); notify_signal(&pc.notify);
} }
@ -226,21 +233,23 @@ decoder_data(struct decoder *decoder,
{ {
const char *data = _data; const char *data = _data;
GError *error = NULL; GError *error = NULL;
enum decoder_command cmd;
assert(dc.state == DECODE_STATE_DECODE); assert(dc.state == DECODE_STATE_DECODE);
assert(dc.pipe != NULL); assert(dc.pipe != NULL);
assert(length % audio_format_frame_size(&dc.in_audio_format) == 0); assert(length % audio_format_frame_size(&dc.in_audio_format) == 0);
if (dc.command == DECODE_COMMAND_STOP || decoder_lock();
dc.command == DECODE_COMMAND_SEEK || cmd = dc.command;
decoder_unlock();
if (cmd == DECODE_COMMAND_STOP || cmd == DECODE_COMMAND_SEEK ||
length == 0) length == 0)
return dc.command; return cmd;
/* send stream tags */ /* send stream tags */
if (update_stream_tag(decoder, is)) { if (update_stream_tag(decoder, is)) {
enum decoder_command cmd;
if (decoder->decoder_tag != NULL) { if (decoder->decoder_tag != NULL) {
/* merge with tag from decoder plugin */ /* merge with tag from decoder plugin */
struct tag *tag; struct tag *tag;

View File

@ -18,6 +18,7 @@
*/ */
#include "decoder_control.h" #include "decoder_control.h"
#include "notify.h"
#include <assert.h> #include <assert.h>
@ -25,36 +26,63 @@ struct decoder_control dc;
void dc_init(void) void dc_init(void)
{ {
notify_init(&dc.notify); dc.mutex = g_mutex_new();
dc.cond = g_cond_new();
dc.state = DECODE_STATE_STOP; dc.state = DECODE_STATE_STOP;
dc.command = DECODE_COMMAND_NONE; dc.command = DECODE_COMMAND_NONE;
} }
void dc_deinit(void) void dc_deinit(void)
{ {
notify_deinit(&dc.notify); g_cond_free(dc.cond);
g_mutex_free(dc.mutex);
}
static void
dc_command_wait_locked(struct notify *notify)
{
while (dc.command != DECODE_COMMAND_NONE) {
decoder_signal();
decoder_unlock();
notify_wait(notify);
decoder_lock();
}
} }
void void
dc_command_wait(struct notify *notify) dc_command_wait(struct notify *notify)
{ {
while (dc.command != DECODE_COMMAND_NONE) { decoder_lock();
notify_signal(&dc.notify); dc_command_wait_locked(notify);
notify_wait(notify); decoder_unlock();
} }
static void
dc_command_locked(struct notify *notify, enum decoder_command cmd)
{
dc.command = cmd;
dc_command_wait_locked(notify);
} }
static void static void
dc_command(struct notify *notify, enum decoder_command cmd) dc_command(struct notify *notify, enum decoder_command cmd)
{ {
dc.command = cmd; decoder_lock();
dc_command_wait(notify); dc_command_locked(notify, cmd);
decoder_unlock();
} }
static void dc_command_async(enum decoder_command cmd) static void dc_command_async(enum decoder_command cmd)
{ {
decoder_lock();
dc.command = cmd; dc.command = cmd;
notify_signal(&dc.notify); decoder_signal();
decoder_unlock();
} }
void void
@ -80,15 +108,19 @@ dc_start_async(struct song *song)
void void
dc_stop(struct notify *notify) dc_stop(struct notify *notify)
{ {
decoder_lock();
if (dc.command != DECODE_COMMAND_NONE) if (dc.command != DECODE_COMMAND_NONE)
/* Attempt to cancel the current command. If it's too /* Attempt to cancel the current command. If it's too
late and the decoder thread is already executing late and the decoder thread is already executing
the old command, we'll call STOP again in this the old command, we'll call STOP again in this
function (see below). */ function (see below). */
dc_command(notify, DECODE_COMMAND_STOP); dc_command_locked(notify, DECODE_COMMAND_STOP);
if (dc.state != DECODE_STATE_STOP && dc.state != DECODE_STATE_ERROR) if (dc.state != DECODE_STATE_STOP && dc.state != DECODE_STATE_ERROR)
dc_command(notify, DECODE_COMMAND_STOP); dc_command_locked(notify, DECODE_COMMAND_STOP);
decoder_unlock();
} }
bool bool

View File

@ -22,13 +22,16 @@
#include "decoder_command.h" #include "decoder_command.h"
#include "audio_format.h" #include "audio_format.h"
#include "notify.h"
#include <glib.h>
#include <assert.h> #include <assert.h>
#define DECODE_TYPE_FILE 0 #define DECODE_TYPE_FILE 0
#define DECODE_TYPE_URL 1 #define DECODE_TYPE_URL 1
struct notify;
enum decoder_state { enum decoder_state {
DECODE_STATE_STOP = 0, DECODE_STATE_STOP = 0,
DECODE_STATE_START, DECODE_STATE_START,
@ -48,14 +51,25 @@ struct decoder_control {
thread isn't running */ thread isn't running */
GThread *thread; GThread *thread;
struct notify notify; /**
* This lock protects #state and #command.
*/
GMutex *mutex;
/**
* Trigger this object after you have modified #command. This
* is also used by the decoder thread to notify the caller
* when it has finished a command.
*/
GCond *cond;
enum decoder_state state;
enum decoder_command command;
volatile enum decoder_state state;
volatile enum decoder_command command;
bool quit; bool quit;
bool seek_error; bool seek_error;
bool seekable; bool seekable;
volatile double seek_where; double seek_where;
/** the format of the song file */ /** the format of the song file */
struct audio_format in_audio_format; struct audio_format in_audio_format;
@ -80,6 +94,46 @@ void dc_init(void);
void dc_deinit(void); void dc_deinit(void);
/**
* Locks the #decoder_control object.
*/
static inline void
decoder_lock(void)
{
g_mutex_lock(dc.mutex);
}
/**
* Unlocks the #decoder_control object.
*/
static inline void
decoder_unlock(void)
{
g_mutex_unlock(dc.mutex);
}
/**
* Waits for a signal on the #decoder_control object. This function
* is only valid in the decoder thread. The object must be locked
* prior to calling this function.
*/
static inline void
decoder_wait(void)
{
g_cond_wait(dc.cond, dc.mutex);
}
/**
* Signals the #decoder_control object. This function is only valid
* in the player thread. The object should be locked prior to calling
* this function.
*/
static inline void
decoder_signal(void)
{
g_cond_signal(dc.cond);
}
static inline bool decoder_is_idle(void) static inline bool decoder_is_idle(void)
{ {
return (dc.state == DECODE_STATE_STOP || return (dc.state == DECODE_STATE_STOP ||
@ -100,6 +154,39 @@ static inline bool decoder_has_failed(void)
return dc.state == DECODE_STATE_ERROR; return dc.state == DECODE_STATE_ERROR;
} }
static inline bool decoder_lock_is_idle(void)
{
bool ret;
decoder_lock();
ret = decoder_is_idle();
decoder_unlock();
return ret;
}
static inline bool decoder_lock_is_starting(void)
{
bool ret;
decoder_lock();
ret = decoder_is_starting();
decoder_unlock();
return ret;
}
static inline bool decoder_lock_has_failed(void)
{
bool ret;
decoder_lock();
ret = decoder_has_failed();
decoder_unlock();
return ret;
}
static inline struct song * static inline struct song *
decoder_current_song(void) decoder_current_song(void)
{ {

View File

@ -27,6 +27,24 @@
#include <assert.h> #include <assert.h>
/**
* This is a wrapper for input_stream_buffer(). It assumes that the
* decoder is currently locked, and temporarily unlocks it while
* calling input_stream_buffer(). We shouldn't hold the lock during a
* potentially blocking operation.
*/
static int
decoder_input_buffer(struct input_stream *is)
{
int ret;
decoder_unlock();
ret = input_stream_buffer(is) > 0;
decoder_lock();
return ret;
}
/** /**
* All chunks are full of decoded data; wait for the player to free * All chunks are full of decoded data; wait for the player to free
* one. * one.
@ -38,9 +56,12 @@ need_chunks(struct input_stream *is, bool do_wait)
dc.command == DECODE_COMMAND_SEEK) dc.command == DECODE_COMMAND_SEEK)
return dc.command; return dc.command;
if ((is == NULL || input_stream_buffer(is) <= 0) && do_wait) { if ((is == NULL || decoder_input_buffer(is) <= 0) && do_wait) {
notify_wait(&dc.notify); decoder_wait();
decoder_unlock();
notify_signal(&pc.notify); notify_signal(&pc.notify);
decoder_lock();
return dc.command; return dc.command;
} }
@ -63,7 +84,9 @@ decoder_get_chunk(struct decoder *decoder, struct input_stream *is)
if (decoder->chunk != NULL) if (decoder->chunk != NULL)
return decoder->chunk; return decoder->chunk;
decoder_lock();
cmd = need_chunks(is, true); cmd = need_chunks(is, true);
decoder_unlock();
} while (cmd == DECODE_COMMAND_NONE); } while (cmd == DECODE_COMMAND_NONE);
return NULL; return NULL;

View File

@ -49,11 +49,15 @@ decoder_stream_decode(const struct decoder_plugin *plugin,
assert(input_stream->ready); assert(input_stream->ready);
assert(dc.state == DECODE_STATE_START); assert(dc.state == DECODE_STATE_START);
decoder_unlock();
/* rewind the stream, so each plugin gets a fresh start */ /* rewind the stream, so each plugin gets a fresh start */
input_stream_seek(input_stream, 0, SEEK_SET); input_stream_seek(input_stream, 0, SEEK_SET);
decoder_plugin_stream_decode(plugin, decoder, input_stream); decoder_plugin_stream_decode(plugin, decoder, input_stream);
decoder_lock();
assert(dc.state == DECODE_STATE_START || assert(dc.state == DECODE_STATE_START ||
dc.state == DECODE_STATE_DECODE); dc.state == DECODE_STATE_DECODE);
@ -73,8 +77,12 @@ decoder_file_decode(const struct decoder_plugin *plugin,
assert(path[0] == '/'); assert(path[0] == '/');
assert(dc.state == DECODE_STATE_START); assert(dc.state == DECODE_STATE_START);
decoder_unlock();
decoder_plugin_file_decode(plugin, decoder, path); decoder_plugin_file_decode(plugin, decoder, path);
decoder_lock();
assert(dc.state == DECODE_STATE_START || assert(dc.state == DECODE_STATE_START ||
dc.state == DECODE_STATE_DECODE); dc.state == DECODE_STATE_DECODE);
@ -103,28 +111,40 @@ static void decoder_run_song(const struct song *song, const char *uri)
dc.state = DECODE_STATE_START; dc.state = DECODE_STATE_START;
dc.command = DECODE_COMMAND_NONE; dc.command = DECODE_COMMAND_NONE;
decoder_unlock();
notify_signal(&pc.notify); notify_signal(&pc.notify);
decoder_lock();
/* wait for the input stream to become ready; its metadata /* wait for the input stream to become ready; its metadata
will be available then */ will be available then */
while (!input_stream.ready) { while (!input_stream.ready) {
if (dc.command == DECODE_COMMAND_STOP) { if (dc.command == DECODE_COMMAND_STOP) {
decoder_unlock();
input_stream_close(&input_stream); input_stream_close(&input_stream);
decoder_lock();
dc.state = DECODE_STATE_STOP; dc.state = DECODE_STATE_STOP;
return; return;
} }
decoder_unlock();
ret = input_stream_buffer(&input_stream); ret = input_stream_buffer(&input_stream);
if (ret < 0) { if (ret < 0) {
input_stream_close(&input_stream); input_stream_close(&input_stream);
decoder_lock();
dc.state = DECODE_STATE_ERROR; dc.state = DECODE_STATE_ERROR;
return; return;
} }
decoder_lock();
} }
if (dc.command == DECODE_COMMAND_STOP) { if (dc.command == DECODE_COMMAND_STOP) {
decoder_unlock();
input_stream_close(&input_stream); input_stream_close(&input_stream);
decoder_lock();
dc.state = DECODE_STATE_STOP; dc.state = DECODE_STATE_STOP;
return; return;
} }
@ -179,7 +199,10 @@ static void decoder_run_song(const struct song *song, const char *uri)
const char *s = uri_get_suffix(uri); const char *s = uri_get_suffix(uri);
while ((plugin = decoder_plugin_from_suffix(s, next++))) { while ((plugin = decoder_plugin_from_suffix(s, next++))) {
if (plugin->file_decode != NULL) { if (plugin->file_decode != NULL) {
decoder_unlock();
input_stream_close(&input_stream); input_stream_close(&input_stream);
decoder_lock();
close_instream = false; close_instream = false;
ret = decoder_file_decode(plugin, ret = decoder_file_decode(plugin,
&decoder, uri); &decoder, uri);
@ -191,7 +214,13 @@ static void decoder_run_song(const struct song *song, const char *uri)
been closed before been closed before
decoder_file_decode() - decoder_file_decode() -
reopen it */ reopen it */
if (input_stream_open(&input_stream, uri)) bool success;
decoder_unlock();
success = input_stream_open(&input_stream, uri);
decoder_lock();
if (success)
close_instream = true; close_instream = true;
else else
continue; continue;
@ -205,6 +234,8 @@ static void decoder_run_song(const struct song *song, const char *uri)
} }
} }
decoder_unlock();
pcm_convert_deinit(&decoder.conv_state); pcm_convert_deinit(&decoder.conv_state);
/* flush the last chunk */ /* flush the last chunk */
@ -223,6 +254,8 @@ static void decoder_run_song(const struct song *song, const char *uri)
if (decoder.decoder_tag != NULL) if (decoder.decoder_tag != NULL)
tag_free(decoder.decoder_tag); tag_free(decoder.decoder_tag);
decoder_lock();
dc.state = ret ? DECODE_STATE_STOP : DECODE_STATE_ERROR; dc.state = ret ? DECODE_STATE_STOP : DECODE_STATE_ERROR;
} }
@ -249,6 +282,8 @@ static void decoder_run(void)
static gpointer decoder_task(G_GNUC_UNUSED gpointer arg) static gpointer decoder_task(G_GNUC_UNUSED gpointer arg)
{ {
decoder_lock();
do { do {
assert(dc.state == DECODE_STATE_STOP || assert(dc.state == DECODE_STATE_STOP ||
dc.state == DECODE_STATE_ERROR); dc.state == DECODE_STATE_ERROR);
@ -259,20 +294,28 @@ static gpointer decoder_task(G_GNUC_UNUSED gpointer arg)
decoder_run(); decoder_run();
dc.command = DECODE_COMMAND_NONE; dc.command = DECODE_COMMAND_NONE;
decoder_unlock();
notify_signal(&pc.notify); notify_signal(&pc.notify);
decoder_lock();
break; break;
case DECODE_COMMAND_STOP: case DECODE_COMMAND_STOP:
dc.command = DECODE_COMMAND_NONE; dc.command = DECODE_COMMAND_NONE;
decoder_unlock();
notify_signal(&pc.notify); notify_signal(&pc.notify);
decoder_lock();
break; break;
case DECODE_COMMAND_NONE: case DECODE_COMMAND_NONE:
notify_wait(&dc.notify); decoder_wait();
break; break;
} }
} while (dc.command != DECODE_COMMAND_NONE || !dc.quit); } while (dc.command != DECODE_COMMAND_NONE || !dc.quit);
decoder_unlock();
return NULL; return NULL;
} }

View File

@ -135,7 +135,7 @@ player_wait_for_decoder(struct player *player)
{ {
dc_command_wait(&pc.notify); dc_command_wait(&pc.notify);
if (decoder_has_failed()) { if (decoder_lock_has_failed()) {
assert(dc.next_song == NULL || dc.next_song->url != NULL); assert(dc.next_song == NULL || dc.next_song->url != NULL);
pc.errored_song = dc.next_song; pc.errored_song = dc.next_song;
pc.error = PLAYER_ERROR_FILE; pc.error = PLAYER_ERROR_FILE;
@ -174,10 +174,14 @@ player_check_decoder_startup(struct player *player)
{ {
assert(player->decoder_starting); assert(player->decoder_starting);
decoder_lock();
if (decoder_has_failed()) { if (decoder_has_failed()) {
/* the decoder failed */ /* the decoder failed */
assert(dc.next_song == NULL || dc.next_song->url != NULL); assert(dc.next_song == NULL || dc.next_song->url != NULL);
decoder_unlock();
pc.errored_song = dc.next_song; pc.errored_song = dc.next_song;
pc.error = PLAYER_ERROR_FILE; pc.error = PLAYER_ERROR_FILE;
@ -185,6 +189,8 @@ player_check_decoder_startup(struct player *player)
} else if (!decoder_is_starting()) { } else if (!decoder_is_starting()) {
/* the decoder is ready and ok */ /* the decoder is ready and ok */
decoder_unlock();
if (audio_format_defined(&player->play_audio_format) && if (audio_format_defined(&player->play_audio_format) &&
!audio_output_all_wait(1)) !audio_output_all_wait(1))
/* the output devices havn't finished playing /* the output devices havn't finished playing
@ -219,6 +225,7 @@ player_check_decoder_startup(struct player *player)
} else { } else {
/* the decoder is not yet ready; wait /* the decoder is not yet ready; wait
some more */ some more */
decoder_unlock();
notify_wait(&pc.notify); notify_wait(&pc.notify);
return true; return true;
@ -512,13 +519,20 @@ play_next_chunk(struct player *player)
music_buffer_return(player_buffer, other_chunk); music_buffer_return(player_buffer, other_chunk);
} else { } else {
/* there are not enough decoded chunks yet */ /* there are not enough decoded chunks yet */
decoder_lock();
if (decoder_is_idle()) { if (decoder_is_idle()) {
/* the decoder isn't running, abort /* the decoder isn't running, abort
cross fading */ cross fading */
decoder_unlock();
player->xfade = XFADE_DISABLED; player->xfade = XFADE_DISABLED;
} else { } else {
/* wait for the decoder */ /* wait for the decoder */
notify_signal(&dc.notify); decoder_signal();
decoder_unlock();
notify_wait(&pc.notify); notify_wait(&pc.notify);
return true; return true;
@ -549,10 +563,12 @@ play_next_chunk(struct player *player)
/* this formula should prevent that the decoder gets woken up /* this formula should prevent that the decoder gets woken up
with each chunk; it is more efficient to make it decode a with each chunk; it is more efficient to make it decode a
larger block at a time */ larger block at a time */
decoder_lock();
if (!decoder_is_idle() && if (!decoder_is_idle() &&
music_pipe_size(dc.pipe) <= (pc.buffered_before_play + music_pipe_size(dc.pipe) <= (pc.buffered_before_play +
music_buffer_size(player_buffer) * 3) / 4) music_buffer_size(player_buffer) * 3) / 4)
notify_signal(&dc.notify); decoder_signal();
decoder_unlock();
return true; return true;
} }
@ -634,7 +650,7 @@ static void do_play(void)
prevent stuttering on slow machines */ prevent stuttering on slow machines */
if (music_pipe_size(player.pipe) < pc.buffered_before_play && if (music_pipe_size(player.pipe) < pc.buffered_before_play &&
!decoder_is_idle()) { !decoder_lock_is_idle()) {
/* not enough decoded buffer space yet */ /* not enough decoded buffer space yet */
if (!player.paused && if (!player.paused &&
@ -669,7 +685,7 @@ static void do_play(void)
*/ */
#endif #endif
if (decoder_is_idle() && player.queued) { if (decoder_lock_is_idle() && player.queued) {
/* the decoder has finished the current song; /* the decoder has finished the current song;
make it decode the next song */ make it decode the next song */
assert(pc.next_song != NULL); assert(pc.next_song != NULL);
@ -682,7 +698,7 @@ static void do_play(void)
if (dc.pipe != NULL && dc.pipe != player.pipe && if (dc.pipe != NULL && dc.pipe != player.pipe &&
player.xfade == XFADE_UNKNOWN && player.xfade == XFADE_UNKNOWN &&
!decoder_is_starting()) { !decoder_lock_is_starting()) {
/* enable cross fading in this song? if yes, /* enable cross fading in this song? if yes,
calculate how many chunks will be required calculate how many chunks will be required
for it */ for it */
@ -720,7 +736,7 @@ static void do_play(void)
if (!player_song_border(&player)) if (!player_song_border(&player))
break; break;
} else if (decoder_is_idle()) { } else if (decoder_lock_is_idle()) {
/* check the size of the pipe again, because /* check the size of the pipe again, because
the decoder thread may have added something the decoder thread may have added something
since we last checked */ since we last checked */