input_stream: non-blocking I/O

Add GMutex, GCond attributes which will be used by callers to
conditionally wait on the stream.

Remove the (now-useless) plugin method buffer(), wait on GCond
instead.  Lock the input_stream before each method call.  Do the same
with the playlist plugins.
This commit is contained in:
Max Kellermann
2011-09-14 21:46:41 +02:00
parent 29241c4f83
commit 754f26a97c
57 changed files with 655 additions and 379 deletions

View File

@@ -53,7 +53,7 @@ audiofile_file_read(AFvirtualfile *vfile, void *data, size_t length)
GError *error = NULL;
size_t nbytes;
nbytes = input_stream_read(is, data, length, &error);
nbytes = input_stream_lock_read(is, data, length, &error);
if (nbytes == 0 && error != NULL) {
g_warning("%s", error->message);
g_error_free(error);
@@ -90,7 +90,7 @@ audiofile_file_seek(AFvirtualfile *vfile, long offset, int is_relative)
{
struct input_stream *is = (struct input_stream *) vfile->closure;
int whence = (is_relative ? SEEK_CUR : SEEK_SET);
if (input_stream_seek(is, offset, whence, NULL)) {
if (input_stream_lock_seek(is, offset, whence, NULL)) {
return is->offset;
} else {
return -1;

View File

@@ -205,7 +205,7 @@ faad_song_duration(struct decoder_buffer *buffer, struct input_stream *is)
/* obtain the duration from the ADTS header */
float song_length = adts_song_duration(buffer);
input_stream_seek(is, tagsize, SEEK_SET, NULL);
input_stream_lock_seek(is, tagsize, SEEK_SET, NULL);
data = decoder_buffer_read(buffer, &length);
if (data != NULL)
@@ -406,7 +406,7 @@ faad_stream_decode(struct decoder *mpd_decoder, struct input_stream *is)
faacDecSetConfiguration(decoder, config);
while (!decoder_buffer_is_full(buffer) &&
!input_stream_eof(is) &&
!input_stream_lock_eof(is) &&
decoder_get_command(mpd_decoder) == DECODE_COMMAND_NONE) {
adts_find_frame(buffer);
decoder_buffer_fill(buffer);

View File

@@ -105,7 +105,7 @@ mpd_ffmpeg_stream_seek(void *opaque, int64_t pos, int whence)
if (whence == AVSEEK_SIZE)
return stream->input->size;
if (!input_stream_seek(stream->input, pos, whence, NULL))
if (!input_stream_lock_seek(stream->input, pos, whence, NULL))
return -1;
return stream->input->offset;
@@ -320,7 +320,8 @@ ffmpeg_probe(struct decoder *decoder, struct input_stream *is)
unsigned char *buffer = g_malloc(BUFFER_SIZE);
size_t nbytes = decoder_read(decoder, is, buffer, BUFFER_SIZE);
if (nbytes <= PADDING || !input_stream_seek(is, 0, SEEK_SET, NULL)) {
if (nbytes <= PADDING ||
!input_stream_lock_seek(is, 0, SEEK_SET, NULL)) {
g_free(buffer);
return NULL;
}

View File

@@ -50,7 +50,7 @@ flac_read_cb(G_GNUC_UNUSED const FLAC__StreamDecoder *fd,
if (r == 0) {
if (decoder_get_command(data->decoder) != DECODE_COMMAND_NONE ||
input_stream_eof(data->input_stream))
input_stream_lock_eof(data->input_stream))
return FLAC__STREAM_DECODER_READ_STATUS_END_OF_STREAM;
else
return FLAC__STREAM_DECODER_READ_STATUS_ABORT;
@@ -68,7 +68,8 @@ flac_seek_cb(G_GNUC_UNUSED const FLAC__StreamDecoder *fd,
if (!data->input_stream->seekable)
return FLAC__STREAM_DECODER_SEEK_STATUS_UNSUPPORTED;
if (!input_stream_seek(data->input_stream, offset, SEEK_SET, NULL))
if (!input_stream_lock_seek(data->input_stream, offset, SEEK_SET,
NULL))
return FLAC__STREAM_DECODER_SEEK_STATUS_ERROR;
return FLAC__STREAM_DECODER_SEEK_STATUS_OK;
@@ -109,7 +110,7 @@ flac_eof_cb(G_GNUC_UNUSED const FLAC__StreamDecoder *fd, void *fdata)
return (decoder_get_command(data->decoder) != DECODE_COMMAND_NONE &&
decoder_get_command(data->decoder) != DECODE_COMMAND_SEEK) ||
input_stream_eof(data->input_stream);
input_stream_lock_eof(data->input_stream);
}
static void
@@ -449,7 +450,7 @@ oggflac_decode(struct decoder *decoder, struct input_stream *input_stream)
/* rewind the stream, because ogg_stream_type_detect() has
moved it */
input_stream_seek(input_stream, 0, SEEK_SET, NULL);
input_stream_lock_seek(input_stream, 0, SEEK_SET, NULL);
flac_decode_internal(decoder, input_stream, true);
}

View File

@@ -168,7 +168,7 @@ mp3_data_init(struct mp3_data *data, struct decoder *decoder,
static bool mp3_seek(struct mp3_data *data, long offset)
{
if (!input_stream_seek(data->input_stream, offset, SEEK_SET, NULL))
if (!input_stream_lock_seek(data->input_stream, offset, SEEK_SET, NULL))
return false;
mad_stream_buffer(&data->stream, data->input_buffer, 0);

View File

@@ -62,7 +62,7 @@ static GByteArray *mod_loadfile(struct decoder *decoder, struct input_stream *is
while (true) {
ret = decoder_read(decoder, is, data, MODPLUG_READ_BLOCK);
if (ret == 0) {
if (input_stream_eof(is))
if (input_stream_lock_eof(is))
/* end of file */
break;

View File

@@ -102,7 +102,8 @@ mp4_seek(void *user_data, uint64_t position)
{
struct mp4ff_input_stream *mis = user_data;
return input_stream_seek(mis->input_stream, position, SEEK_SET, NULL)
return input_stream_lock_seek(mis->input_stream, position, SEEK_SET,
NULL)
? 0 : -1;
}

View File

@@ -61,7 +61,7 @@ mpc_seek_cb(cb_first_arg, mpc_int32_t offset)
{
struct mpc_decoder_data *data = (struct mpc_decoder_data *) cb_data;
return input_stream_seek(data->is, offset, SEEK_SET, NULL);
return input_stream_lock_seek(data->is, offset, SEEK_SET, NULL);
}
static mpc_int32_t

View File

@@ -52,7 +52,7 @@ pcm_stream_decode(struct decoder *decoder, struct input_stream *is)
size_t nbytes = decoder_read(decoder, is,
buffer, sizeof(buffer));
if (nbytes == 0 && input_stream_eof(is))
if (nbytes == 0 && input_stream_lock_eof(is))
break;
cmd = nbytes > 0
@@ -62,7 +62,8 @@ pcm_stream_decode(struct decoder *decoder, struct input_stream *is)
if (cmd == DECODE_COMMAND_SEEK) {
goffset offset = (goffset)(time_to_size *
decoder_seek_where(decoder));
if (input_stream_seek(is, offset, SEEK_SET, &error)) {
if (input_stream_lock_seek(is, offset, SEEK_SET,
&error)) {
decoder_command_finished(decoder);
} else {
g_warning("seeking failed: %s", error->message);

View File

@@ -40,7 +40,7 @@ sndfile_vio_seek(sf_count_t offset, int whence, void *user_data)
struct input_stream *is = user_data;
bool success;
success = input_stream_seek(is, offset, whence, NULL);
success = input_stream_lock_seek(is, offset, whence, NULL);
if (!success)
return -1;
@@ -54,7 +54,7 @@ sndfile_vio_read(void *ptr, sf_count_t count, void *user_data)
GError *error = NULL;
size_t nbytes;
nbytes = input_stream_read(is, ptr, count, &error);
nbytes = input_stream_lock_read(is, ptr, count, &error);
if (nbytes == 0 && error != NULL) {
g_warning("%s", error->message);
g_error_free(error);

View File

@@ -80,7 +80,7 @@ static int ogg_seek_cb(void *data, ogg_int64_t offset, int whence)
return vis->seekable &&
(!vis->decoder || decoder_get_command(vis->decoder) != DECODE_COMMAND_STOP) &&
input_stream_seek(vis->input_stream, offset, whence, NULL)
input_stream_lock_seek(vis->input_stream, offset, whence, NULL)
? 0 : -1;
}
@@ -290,7 +290,7 @@ vorbis_stream_decode(struct decoder *decoder,
/* rewind the stream, because ogg_stream_type_detect() has
moved it */
input_stream_seek(input_stream, 0, SEEK_SET, NULL);
input_stream_lock_seek(input_stream, 0, SEEK_SET, NULL);
if (!vorbis_is_open(&vis, &vf, decoder, input_stream))
return;

View File

@@ -390,13 +390,15 @@ wavpack_input_get_pos(void *id)
static int
wavpack_input_set_pos_abs(void *id, uint32_t pos)
{
return input_stream_seek(wpin(id)->is, pos, SEEK_SET, NULL) ? 0 : -1;
return input_stream_lock_seek(wpin(id)->is, pos, SEEK_SET, NULL)
? 0 : -1;
}
static int
wavpack_input_set_pos_rel(void *id, int32_t delta, int mode)
{
return input_stream_seek(wpin(id)->is, delta, mode, NULL) ? 0 : -1;
return input_stream_lock_seek(wpin(id)->is, delta, mode, NULL)
? 0 : -1;
}
static int
@@ -447,6 +449,7 @@ wavpack_input_init(struct wavpack_input *isp, struct decoder *decoder,
static struct input_stream *
wavpack_open_wvc(struct decoder *decoder, const char *uri,
GMutex *mutex, GCond *cond,
struct wavpack_input *wpi)
{
struct input_stream *is_wvc;
@@ -462,7 +465,7 @@ wavpack_open_wvc(struct decoder *decoder, const char *uri,
return false;
wvc_url = g_strconcat(uri, "c", NULL);
is_wvc = input_stream_open(wvc_url, NULL);
is_wvc = input_stream_open(wvc_url, mutex, cond, NULL);
g_free(wvc_url);
if (is_wvc == NULL)
@@ -499,7 +502,8 @@ wavpack_streamdecode(struct decoder * decoder, struct input_stream *is)
struct wavpack_input isp, isp_wvc;
bool can_seek = is->seekable;
is_wvc = wavpack_open_wvc(decoder, is->uri, &isp_wvc);
is_wvc = wavpack_open_wvc(decoder, is->uri, is->mutex, is->cond,
&isp_wvc);
if (is_wvc != NULL) {
open_flags |= OPEN_WVC;
can_seek &= is_wvc->seekable;