From 8751783a1b4fe8c61ac63b718be1df5f1d77d339 Mon Sep 17 00:00:00 2001 From: Max Kellermann Date: Thu, 15 Sep 2011 10:24:50 +0200 Subject: [PATCH] input/curl: per-request mutex/cond The global data structures are now lock-free, because they are accessed only from the I/O thread. By using per-request locks, we have finer grained locking, preparing for locks shared with the client. --- src/input/curl_input_plugin.c | 68 ++++++++++++++++++----------------- 1 file changed, 35 insertions(+), 33 deletions(-) diff --git a/src/input/curl_input_plugin.c b/src/input/curl_input_plugin.c index c377b6426..d4c116136 100644 --- a/src/input/curl_input_plugin.c +++ b/src/input/curl_input_plugin.c @@ -76,6 +76,9 @@ struct input_curl { /** the curl handles */ CURL *easy; + GMutex *mutex; + GCond *cond; + /** the GMainLoop source used to poll all CURL file descriptors */ GSource *source; @@ -123,8 +126,6 @@ static const char *proxy, *proxy_user, *proxy_password; static unsigned proxy_port; static struct { - GStaticMutex mutex; - GCond *cond; CURLM *multi; /** @@ -165,9 +166,7 @@ static struct { */ GTimeVal absolute_timeout; #endif -} curl = { - .mutex = G_STATIC_MUTEX_INIT, -}; +} curl; static inline GQuark curl_quark(void) @@ -460,21 +459,21 @@ input_curl_abort_all_requests(GError *error) assert(io_thread_inside()); assert(error != NULL); - g_static_mutex_lock(&curl.mutex); - while (curl.requests != NULL) { struct input_curl *c = curl.requests->data; assert(c->postponed_error == NULL); input_curl_easy_free(c); + + g_mutex_lock(c->mutex); c->postponed_error = g_error_copy(error); c->base.ready = true; + g_cond_broadcast(c->cond); + g_mutex_unlock(c->mutex); } g_error_free(error); - g_cond_broadcast(curl.cond); - g_static_mutex_unlock(&curl.mutex); } /** @@ -490,7 +489,7 @@ input_curl_request_done(struct input_curl *c, CURLcode result, long status) assert(c->easy == NULL); assert(c->postponed_error == NULL); - g_static_mutex_lock(&curl.mutex); + g_mutex_lock(c->mutex); if (result != CURLE_OK) { c->postponed_error = g_error_new(curl_quark(), result, @@ -503,8 +502,8 @@ input_curl_request_done(struct input_curl *c, CURLcode result, long status) } c->base.ready = true; - g_cond_broadcast(curl.cond); - g_static_mutex_unlock(&curl.mutex); + g_cond_broadcast(c->cond); + g_mutex_unlock(c->mutex); } static void @@ -711,8 +710,6 @@ input_curl_init(const struct config_param *param, return false; } - curl.cond = g_cond_new(); - curl.source = g_source_new(&curl_source_funcs, sizeof(*curl.source)); curl.source_id = g_source_attach(curl.source, io_thread_context()); @@ -745,7 +742,6 @@ input_curl_finish(void) io_thread_call(curl_destroy_sources, NULL); curl_multi_cleanup(curl.multi); - g_cond_free(curl.cond); curl_slist_free_all(http_200_aliases); @@ -809,6 +805,9 @@ input_curl_free(struct input_curl *c) g_queue_free(c->buffers); + g_mutex_free(c->mutex); + g_cond_free(c->cond); + g_free(c->url); input_stream_deinit(&c->base); g_free(c); @@ -828,7 +827,7 @@ static bool fill_buffer(struct input_curl *c, GError **error_r) { while (c->easy != NULL && g_queue_is_empty(c->buffers)) - g_cond_wait(curl.cond, g_static_mutex_get_mutex(&curl.mutex)); + g_cond_wait(c->cond, c->mutex); if (c->postponed_error != NULL) { g_propagate_error(error_r, c->postponed_error); @@ -938,14 +937,14 @@ input_curl_read(struct input_stream *is, void *ptr, size_t size, size_t nbytes = 0; char *dest = ptr; - g_static_mutex_lock(&curl.mutex); + g_mutex_lock(c->mutex); do { /* fill the buffer */ success = fill_buffer(c, error_r); if (!success) { - g_static_mutex_unlock(&curl.mutex); + g_mutex_unlock(c->mutex); return 0; } @@ -970,7 +969,7 @@ input_curl_read(struct input_stream *is, void *ptr, size_t size, io_thread_call(input_curl_resume, c); #endif - g_static_mutex_unlock(&curl.mutex); + g_mutex_unlock(c->mutex); return nbytes; } @@ -988,9 +987,9 @@ input_curl_eof(G_GNUC_UNUSED struct input_stream *is) { struct input_curl *c = (struct input_curl *)is; - g_static_mutex_lock(&curl.mutex); + g_mutex_lock(c->mutex); bool eof = c->easy == NULL && g_queue_is_empty(c->buffers); - g_static_mutex_unlock(&curl.mutex); + g_mutex_unlock(c->mutex); return eof; } @@ -1000,7 +999,7 @@ input_curl_buffer(struct input_stream *is, GError **error_r) { struct input_curl *c = (struct input_curl *)is; - g_static_mutex_lock(&curl.mutex); + g_mutex_lock(c->mutex); int result; if (c->postponed_error != NULL) { @@ -1012,7 +1011,7 @@ input_curl_buffer(struct input_stream *is, GError **error_r) else result = 1; - g_static_mutex_unlock(&curl.mutex); + g_mutex_unlock(c->mutex); return result; } @@ -1112,12 +1111,12 @@ input_curl_writefunction(void *ptr, size_t size, size_t nmemb, void *stream) if (size == 0) return 0; - g_static_mutex_lock(&curl.mutex); + g_mutex_lock(c->mutex); #if LIBCURL_VERSION_NUM >= 0x071200 if (curl_total_buffer_size(c) + size >= CURL_MAX_BUFFERED) { c->paused = true; - g_static_mutex_unlock(&curl.mutex); + g_mutex_unlock(c->mutex); return CURL_WRITEFUNC_PAUSE; } #endif @@ -1131,8 +1130,8 @@ input_curl_writefunction(void *ptr, size_t size, size_t nmemb, void *stream) c->base.ready = true; - g_cond_broadcast(curl.cond); - g_static_mutex_unlock(&curl.mutex); + g_cond_broadcast(c->cond); + g_mutex_unlock(c->mutex); return size; } @@ -1239,7 +1238,7 @@ input_curl_seek(struct input_stream *is, goffset offset, int whence, /* check if we can fast-forward the buffer */ - g_static_mutex_lock(&curl.mutex); + g_mutex_lock(c->mutex); while (offset > is->offset && !g_queue_is_empty(c->buffers)) { struct buffer *buffer; @@ -1258,7 +1257,7 @@ input_curl_seek(struct input_stream *is, goffset offset, int whence, is->offset += length; } - g_static_mutex_unlock(&curl.mutex); + g_mutex_unlock(c->mutex); if (offset == is->offset) return true; @@ -1292,19 +1291,19 @@ input_curl_seek(struct input_stream *is, goffset offset, int whence, if (!input_curl_easy_add_indirect(c, error_r)) return false; - g_static_mutex_lock(&curl.mutex); + g_mutex_lock(c->mutex); while (!c->base.ready) - g_cond_wait(curl.cond, g_static_mutex_get_mutex(&curl.mutex)); + g_cond_wait(c->cond, c->mutex); if (c->postponed_error != NULL) { g_propagate_error(error_r, c->postponed_error); c->postponed_error = NULL; - g_static_mutex_unlock(&curl.mutex); + g_mutex_unlock(c->mutex); return false; } - g_static_mutex_unlock(&curl.mutex); + g_mutex_unlock(c->mutex); return true; } @@ -1320,6 +1319,9 @@ input_curl_open(const char *url, GError **error_r) c = g_new0(struct input_curl, 1); input_stream_init(&c->base, &input_plugin_curl, url); + c->mutex = g_mutex_new(); + c->cond = g_cond_new(); + c->url = g_strdup(url); c->buffers = g_queue_new();