DecoderControl, InputStream: use Mutex/Cond instead of GMutex/GCond

This commit is contained in:
Max Kellermann
2013-01-27 17:20:50 +01:00
parent 257a0dee75
commit 6f3d70b5e2
46 changed files with 182 additions and 234 deletions

View File

@@ -36,7 +36,7 @@
*/
static struct input_stream *
input_archive_open(const char *pathname,
GMutex *mutex, GCond *cond,
Mutex &mutex, Cond &cond,
GError **error_r)
{
const struct archive_plugin *arplug;

View File

@@ -54,7 +54,7 @@ struct CdioParanoiaInputStream {
char buffer[CDIO_CD_FRAMESIZE_RAW];
int buffer_lsn;
CdioParanoiaInputStream(const char *uri, GMutex *mutex, GCond *cond,
CdioParanoiaInputStream(const char *uri, Mutex &mutex, Cond &cond,
int _trackno)
:drv(nullptr), cdio(nullptr), para(nullptr),
trackno(_trackno)
@@ -157,7 +157,7 @@ cdio_detect_device(void)
static struct input_stream *
input_cdio_open(const char *uri,
GMutex *mutex, GCond *cond,
Mutex &mutex, Cond &cond,
GError **error_r)
{
struct cdio_uri parsed_uri;

View File

@@ -165,7 +165,7 @@ struct input_curl {
GError *postponed_error;
input_curl(const char *url, GMutex *mutex, GCond *cond)
input_curl(const char *url, Mutex &mutex, Cond &cond)
:range(nullptr), request_headers(nullptr),
paused(false),
meta_name(nullptr),
@@ -462,11 +462,12 @@ input_curl_abort_all_requests(GError *error)
input_curl_easy_free(c);
g_mutex_lock(c->base.mutex);
const ScopeLock protect(*c->base.mutex);
c->postponed_error = g_error_copy(error);
c->base.ready = true;
g_cond_broadcast(c->base.cond);
g_mutex_unlock(c->base.mutex);
c->base.cond->broadcast();
}
g_error_free(error);
@@ -486,7 +487,7 @@ input_curl_request_done(struct input_curl *c, CURLcode result, long status)
assert(c->easy == NULL);
assert(c->postponed_error == NULL);
g_mutex_lock(c->base.mutex);
const ScopeLock protect(*c->base.mutex);
if (result != CURLE_OK) {
c->postponed_error = g_error_new(curl_quark(), result,
@@ -499,8 +500,8 @@ input_curl_request_done(struct input_curl *c, CURLcode result, long status)
}
c->base.ready = true;
g_cond_broadcast(c->base.cond);
g_mutex_unlock(c->base.mutex);
c->base.cond->broadcast();
}
static void
@@ -736,7 +737,7 @@ static bool
fill_buffer(struct input_curl *c, GError **error_r)
{
while (c->easy != NULL && c->buffers.empty())
g_cond_wait(c->base.cond, c->base.mutex);
c->base.cond->wait(*c->base.mutex);
if (c->postponed_error != NULL) {
g_propagate_error(error_r, c->postponed_error);
@@ -856,9 +857,9 @@ input_curl_read(struct input_stream *is, void *ptr, size_t size,
is->offset += (goffset)nbytes;
if (c->paused && curl_total_buffer_size(c) < CURL_RESUME_AT) {
g_mutex_unlock(c->base.mutex);
c->base.mutex->unlock();
io_thread_call(input_curl_resume, c);
g_mutex_lock(c->base.mutex);
c->base.mutex->lock();
}
return nbytes;
@@ -975,20 +976,17 @@ input_curl_writefunction(void *ptr, size_t size, size_t nmemb, void *stream)
if (size == 0)
return 0;
g_mutex_lock(c->base.mutex);
const ScopeLock protect(*c->base.mutex);
if (curl_total_buffer_size(c) + size >= CURL_MAX_BUFFERED) {
c->paused = true;
g_mutex_unlock(c->base.mutex);
return CURL_WRITEFUNC_PAUSE;
}
c->buffers.emplace_back(ptr, size);
c->base.ready = true;
g_cond_broadcast(c->base.cond);
g_mutex_unlock(c->base.mutex);
c->base.cond->broadcast();
return size;
}
@@ -1112,7 +1110,7 @@ input_curl_seek(struct input_stream *is, goffset offset, int whence,
/* close the old connection and open a new one */
g_mutex_unlock(c->base.mutex);
c->base.mutex->unlock();
input_curl_easy_free_indirect(c);
c->buffers.clear();
@@ -1141,10 +1139,10 @@ input_curl_seek(struct input_stream *is, goffset offset, int whence,
if (!input_curl_easy_add_indirect(c, error_r))
return false;
g_mutex_lock(c->base.mutex);
c->base.mutex->lock();
while (!c->base.ready)
g_cond_wait(c->base.cond, c->base.mutex);
c->base.cond->wait(*c->base.mutex);
if (c->postponed_error != NULL) {
g_propagate_error(error_r, c->postponed_error);
@@ -1156,12 +1154,9 @@ input_curl_seek(struct input_stream *is, goffset offset, int whence,
}
static struct input_stream *
input_curl_open(const char *url, GMutex *mutex, GCond *cond,
input_curl_open(const char *url, Mutex &mutex, Cond &cond,
GError **error_r)
{
assert(mutex != NULL);
assert(cond != NULL);
if (strncmp(url, "http://", 7) != 0)
return NULL;

View File

@@ -102,7 +102,7 @@ static void callback(G_GNUC_UNUSED struct despotify_session* ds,
static struct input_stream *
input_despotify_open(const char *url,
GMutex *mutex, GCond *cond,
Mutex &mutex, Cond &cond,
G_GNUC_UNUSED GError **error_r)
{
struct input_despotify *ctx;

View File

@@ -82,7 +82,7 @@ input_ffmpeg_init(G_GNUC_UNUSED const struct config_param *param,
static struct input_stream *
input_ffmpeg_open(const char *uri,
GMutex *mutex, GCond *cond,
Mutex &mutex, Cond &cond,
GError **error_r)
{
struct input_ffmpeg *i;

View File

@@ -41,7 +41,7 @@ struct FileInputStream {
int fd;
FileInputStream(const char *path, int _fd, off_t size,
GMutex *mutex, GCond *cond)
Mutex &mutex, Cond &cond)
:fd(_fd) {
input_stream_init(&base, &input_plugin_file, path,
mutex, cond);
@@ -59,7 +59,7 @@ struct FileInputStream {
static struct input_stream *
input_file_open(const char *filename,
GMutex *mutex, GCond *cond,
Mutex &mutex, Cond &cond,
GError **error_r)
{
int fd, ret;

View File

@@ -40,7 +40,7 @@ struct MmsInputStream {
bool eof;
MmsInputStream(const char *uri,
GMutex *mutex, GCond *cond,
Mutex &mutex, Cond &cond,
mmsx_t *_mms)
:mms(_mms), eof(false) {
input_stream_init(&base, &input_plugin_mms, uri, mutex, cond);
@@ -66,7 +66,7 @@ mms_quark(void)
static struct input_stream *
input_mms_open(const char *url,
GMutex *mutex, GCond *cond,
Mutex &mutex, Cond &cond,
GError **error_r)
{
if (!g_str_has_prefix(url, "mms://") &&

View File

@@ -63,7 +63,7 @@ struct RewindInputStream {
RewindInputStream(input_stream *_input)
:input(_input), tail(0) {
input_stream_init(&base, &rewind_input_plugin, input->uri,
input->mutex, input->cond);
*input->mutex, *input->cond);
}
~RewindInputStream() {

View File

@@ -165,7 +165,7 @@ input_soup_session_callback(G_GNUC_UNUSED SoupSession *session,
assert(msg == s->msg);
assert(!s->completed);
g_mutex_lock(s->base.mutex);
const ScopeLock protect(*s->base.mutex);
if (!s->base.ready)
input_soup_copy_error(s, msg);
@@ -174,8 +174,7 @@ input_soup_session_callback(G_GNUC_UNUSED SoupSession *session,
s->alive = false;
s->completed = true;
g_cond_broadcast(s->base.cond);
g_mutex_unlock(s->base.mutex);
s->base.cond->broadcast();
}
static void
@@ -183,10 +182,10 @@ input_soup_got_headers(SoupMessage *msg, gpointer user_data)
{
struct input_soup *s = (struct input_soup *)user_data;
g_mutex_lock(s->base.mutex);
s->base.mutex->lock();
if (!input_soup_copy_error(s, msg)) {
g_mutex_unlock(s->base.mutex);
s->base.mutex->unlock();
soup_session_cancel_message(soup_session, msg,
SOUP_STATUS_CANCELLED);
@@ -194,8 +193,8 @@ input_soup_got_headers(SoupMessage *msg, gpointer user_data)
}
s->base.ready = true;
g_cond_broadcast(s->base.cond);
g_mutex_unlock(s->base.mutex);
s->base.cond->broadcast();
s->base.mutex->unlock();
soup_message_body_set_accumulate(msg->response_body, false);
}
@@ -207,7 +206,7 @@ input_soup_got_chunk(SoupMessage *msg, SoupBuffer *chunk, gpointer user_data)
assert(msg == s->msg);
g_mutex_lock(s->base.mutex);
const ScopeLock protect(*s->base.mutex);
g_queue_push_tail(s->buffers, soup_buffer_copy(chunk));
s->total_buffered += chunk->length;
@@ -217,8 +216,8 @@ input_soup_got_chunk(SoupMessage *msg, SoupBuffer *chunk, gpointer user_data)
soup_session_pause_message(soup_session, msg);
}
g_cond_broadcast(s->base.cond);
g_mutex_unlock(s->base.mutex);
s->base.cond->broadcast();
s->base.mutex->unlock();
}
static void
@@ -228,14 +227,14 @@ input_soup_got_body(G_GNUC_UNUSED SoupMessage *msg, gpointer user_data)
assert(msg == s->msg);
g_mutex_lock(s->base.mutex);
const ScopeLock protect(*s->base.mutex);
s->base.ready = true;
s->eof = true;
s->alive = false;
g_cond_broadcast(s->base.cond);
g_mutex_unlock(s->base.mutex);
s->base.cond->broadcast();
s->base.mutex->unlock();
}
static bool
@@ -253,7 +252,7 @@ input_soup_wait_data(struct input_soup *s)
assert(s->current_consumed == 0);
g_cond_wait(s->base.cond, s->base.mutex);
s->base.cond->wait(*s->base.mutex);
}
}
@@ -270,7 +269,7 @@ input_soup_queue(gpointer data)
static struct input_stream *
input_soup_open(const char *uri,
GMutex *mutex, GCond *cond,
Mutex &mutex, Cond &cond,
G_GNUC_UNUSED GError **error_r)
{
if (strncmp(uri, "http://", 7) != 0)
@@ -338,22 +337,22 @@ input_soup_close(struct input_stream *is)
{
struct input_soup *s = (struct input_soup *)is;
g_mutex_lock(s->base.mutex);
s->base.mutex->lock();
if (!s->completed) {
/* the messages's session callback hasn't been invoked
yet; cancel it and wait for completion */
g_mutex_unlock(s->base.mutex);
s->base.mutex->unlock();
io_thread_call(input_soup_cancel, s);
g_mutex_lock(s->base.mutex);
s->base.mutex->lock();
while (!s->completed)
g_cond_wait(s->base.cond, s->base.mutex);
s->base.cond->wait(*s->base.mutex);
}
g_mutex_unlock(s->base.mutex);
s->base.mutex->unlock();
SoupBuffer *buffer;
while ((buffer = (SoupBuffer *)g_queue_pop_head(s->buffers)) != NULL)