From 3a2ec50d5f5a23c808c9168a9e1d3c431fdfa556 Mon Sep 17 00:00:00 2001 From: Max Kellermann Date: Thu, 15 Sep 2011 07:43:35 +0200 Subject: [PATCH] input/curl: move all libCURL calls to the I/O thread This adds some overheads for indirect calls to the I/O thread, but reduces the amount of global locks. Next step will be switching to per-request locks. --- src/input/curl_input_plugin.c | 98 ++++++++++++++++++++++++----------- 1 file changed, 69 insertions(+), 29 deletions(-) diff --git a/src/input/curl_input_plugin.c b/src/input/curl_input_plugin.c index 1969ceb7e..c377b6426 100644 --- a/src/input/curl_input_plugin.c +++ b/src/input/curl_input_plugin.c @@ -178,11 +178,13 @@ curl_quark(void) /** * Find a request by its CURL "easy" handle. * - * The caller must lock the mutex. + * Runs in the I/O thread. No lock needed. */ static struct input_curl * input_curl_find_request(CURL *easy) { + assert(io_thread_inside()); + for (GSList *i = curl.requests; i != NULL; i = g_slist_next(i)) { struct input_curl *c = i->data; if (c->easy == easy) @@ -197,6 +199,8 @@ input_curl_find_request(CURL *easy) static gpointer input_curl_resume(gpointer data) { + assert(io_thread_inside()); + struct input_curl *c = data; if (c->paused) { @@ -240,7 +244,7 @@ input_curl_fd_events(int fd, fd_set *rfds, fd_set *wfds, fd_set *efds) * Updates all registered GPollFD objects, unregisters old ones, * registers new ones. * - * The caller must lock the mutex. Runs in the I/O thread. + * Runs in the I/O thread. No lock needed. */ static void curl_update_fds(void) @@ -308,14 +312,11 @@ static gboolean input_curl_dirty_callback(G_GNUC_UNUSED gpointer data) { assert(io_thread_inside()); - g_static_mutex_lock(&curl.mutex); - assert(curl.dirty_source_id != 0 || curl.requests == NULL); curl.dirty_source_id = 0; curl_update_fds(); - g_static_mutex_unlock(&curl.mutex); return false; } @@ -323,7 +324,7 @@ input_curl_dirty_callback(G_GNUC_UNUSED gpointer data) * Schedule a refresh of curl.fds. Does nothing if that is already * scheduled. * - * The caller must lock the mutex. + * No lock needed. */ static void input_curl_schedule_update(void) @@ -336,9 +337,13 @@ input_curl_schedule_update(void) io_thread_idle_add(input_curl_dirty_callback, NULL); } +/** + * Runs in the I/O thread. No lock needed. + */ static bool input_curl_easy_add(struct input_curl *c, GError **error_r) { + assert(io_thread_inside()); assert(c != NULL); assert(c->easy != NULL); assert(input_curl_find_request(c->easy) == NULL); @@ -358,15 +363,50 @@ input_curl_easy_add(struct input_curl *c, GError **error_r) return true; } +struct easy_add_params { + struct input_curl *c; + GError **error_r; +}; + +static gpointer +input_curl_easy_add_callback(gpointer data) +{ + const struct easy_add_params *params = data; + + bool success = input_curl_easy_add(params->c, params->error_r); + return GUINT_TO_POINTER(success); +} + +/** + * Call input_curl_easy_add() in the I/O thread. May be called from + * any thread. Caller must not hold a mutex. + */ +static bool +input_curl_easy_add_indirect(struct input_curl *c, GError **error_r) +{ + assert(c != NULL); + assert(c->easy != NULL); + + struct easy_add_params params = { + .c = c, + .error_r = error_r, + }; + + gpointer result = + io_thread_call(input_curl_easy_add_callback, ¶ms); + return GPOINTER_TO_UINT(result); +} + /** * Frees the current "libcurl easy" handle, and everything associated * with it. * - * The caller must lock the mutex. + * Runs in the I/O thread. */ static void input_curl_easy_free(struct input_curl *c) { + assert(io_thread_inside()); assert(c != NULL); if (c->easy == NULL) @@ -390,13 +430,9 @@ input_curl_easy_free_callback(gpointer data) { struct input_curl *c = data; - g_static_mutex_lock(&curl.mutex); - input_curl_easy_free(c); curl_update_fds(); - g_static_mutex_unlock(&curl.mutex); - return NULL; } @@ -416,7 +452,7 @@ input_curl_easy_free_indirect(struct input_curl *c) /** * Abort and free all HTTP requests. * - * The caller must lock the mutex. Runs in the I/O thread. + * Runs in the I/O thread. The caller must not hold locks. */ static void input_curl_abort_all_requests(GError *error) @@ -424,6 +460,8 @@ input_curl_abort_all_requests(GError *error) assert(io_thread_inside()); assert(error != NULL); + g_static_mutex_lock(&curl.mutex); + while (curl.requests != NULL) { struct input_curl *c = curl.requests->data; assert(c->postponed_error == NULL); @@ -436,12 +474,13 @@ input_curl_abort_all_requests(GError *error) g_error_free(error); g_cond_broadcast(curl.cond); + g_static_mutex_unlock(&curl.mutex); } /** * A HTTP request is finished. * - * The caller must lock the mutex. Runs in the I/O thread. + * Runs in the I/O thread. The caller must not hold locks. */ static void input_curl_request_done(struct input_curl *c, CURLcode result, long status) @@ -451,6 +490,8 @@ input_curl_request_done(struct input_curl *c, CURLcode result, long status) assert(c->easy == NULL); assert(c->postponed_error == NULL); + g_static_mutex_lock(&curl.mutex); + if (result != CURLE_OK) { c->postponed_error = g_error_new(curl_quark(), result, "curl failed: %s", @@ -463,6 +504,7 @@ input_curl_request_done(struct input_curl *c, CURLcode result, long status) c->base.ready = true; g_cond_broadcast(curl.cond); + g_static_mutex_unlock(&curl.mutex); } static void @@ -481,7 +523,7 @@ input_curl_handle_done(CURL *easy_handle, CURLcode result) /** * Check for finished HTTP responses. * - * The caller must lock the mutex. Runs in the I/O thread. + * Runs in the I/O thread. The caller must not hold locks. */ static void input_curl_info_read(void) @@ -501,7 +543,7 @@ input_curl_info_read(void) /** * Give control to CURL. * - * The caller must lock the mutex. Runs in the I/O thread. + * Runs in the I/O thread. The caller must not hold locks. */ static bool input_curl_perform(void) @@ -611,13 +653,9 @@ input_curl_source_dispatch(G_GNUC_UNUSED GSource *source, G_GNUC_UNUSED GSourceFunc callback, G_GNUC_UNUSED gpointer user_data) { - g_static_mutex_lock(&curl.mutex); - if (input_curl_perform()) input_curl_info_read(); - g_static_mutex_unlock(&curl.mutex); - return true; } @@ -1074,9 +1112,12 @@ input_curl_writefunction(void *ptr, size_t size, size_t nmemb, void *stream) if (size == 0) return 0; + g_static_mutex_lock(&curl.mutex); + #if LIBCURL_VERSION_NUM >= 0x071200 if (curl_total_buffer_size(c) + size >= CURL_MAX_BUFFERED) { c->paused = true; + g_static_mutex_unlock(&curl.mutex); return CURL_WRITEFUNC_PAUSE; } #endif @@ -1091,6 +1132,7 @@ input_curl_writefunction(void *ptr, size_t size, size_t nmemb, void *stream) c->base.ready = true; g_cond_broadcast(curl.cond); + g_static_mutex_unlock(&curl.mutex); return size; } @@ -1197,6 +1239,8 @@ input_curl_seek(struct input_stream *is, goffset offset, int whence, /* check if we can fast-forward the buffer */ + g_static_mutex_lock(&curl.mutex); + while (offset > is->offset && !g_queue_is_empty(c->buffers)) { struct buffer *buffer; size_t length; @@ -1214,6 +1258,8 @@ input_curl_seek(struct input_stream *is, goffset offset, int whence, is->offset += length; } + g_static_mutex_unlock(&curl.mutex); + if (offset == is->offset) return true; @@ -1241,14 +1287,12 @@ input_curl_seek(struct input_stream *is, goffset offset, int whence, curl_easy_setopt(c->easy, CURLOPT_RANGE, c->range); } - g_static_mutex_lock(&curl.mutex); - c->base.ready = false; - if (!input_curl_easy_add(c, error_r)) { - g_static_mutex_unlock(&curl.mutex); + if (!input_curl_easy_add_indirect(c, error_r)) return false; - } + + g_static_mutex_lock(&curl.mutex); while (!c->base.ready) g_cond_wait(curl.cond, g_static_mutex_get_mutex(&curl.mutex)); @@ -1291,15 +1335,11 @@ input_curl_open(const char *url, GError **error_r) return NULL; } - g_static_mutex_lock(&curl.mutex); - if (!input_curl_easy_add(c, error_r)) { - g_static_mutex_unlock(&curl.mutex); + if (!input_curl_easy_add_indirect(c, error_r)) { input_curl_free(c); return NULL; } - g_static_mutex_unlock(&curl.mutex); - return &c->base; }