input/curl: per-request mutex/cond

The global data structures are now lock-free, because they are
accessed only from the I/O thread.  By using per-request locks, we
have finer grained locking, preparing for locks shared with the
client.
This commit is contained in:
Max Kellermann 2011-09-15 10:24:50 +02:00
parent 3a2ec50d5f
commit 8751783a1b

View File

@ -76,6 +76,9 @@ struct input_curl {
/** the curl handles */ /** the curl handles */
CURL *easy; CURL *easy;
GMutex *mutex;
GCond *cond;
/** the GMainLoop source used to poll all CURL file /** the GMainLoop source used to poll all CURL file
descriptors */ descriptors */
GSource *source; GSource *source;
@ -123,8 +126,6 @@ static const char *proxy, *proxy_user, *proxy_password;
static unsigned proxy_port; static unsigned proxy_port;
static struct { static struct {
GStaticMutex mutex;
GCond *cond;
CURLM *multi; CURLM *multi;
/** /**
@ -165,9 +166,7 @@ static struct {
*/ */
GTimeVal absolute_timeout; GTimeVal absolute_timeout;
#endif #endif
} curl = { } curl;
.mutex = G_STATIC_MUTEX_INIT,
};
static inline GQuark static inline GQuark
curl_quark(void) curl_quark(void)
@ -460,21 +459,21 @@ input_curl_abort_all_requests(GError *error)
assert(io_thread_inside()); assert(io_thread_inside());
assert(error != NULL); assert(error != NULL);
g_static_mutex_lock(&curl.mutex);
while (curl.requests != NULL) { while (curl.requests != NULL) {
struct input_curl *c = curl.requests->data; struct input_curl *c = curl.requests->data;
assert(c->postponed_error == NULL); assert(c->postponed_error == NULL);
input_curl_easy_free(c); input_curl_easy_free(c);
g_mutex_lock(c->mutex);
c->postponed_error = g_error_copy(error); c->postponed_error = g_error_copy(error);
c->base.ready = true; c->base.ready = true;
g_cond_broadcast(c->cond);
g_mutex_unlock(c->mutex);
} }
g_error_free(error); g_error_free(error);
g_cond_broadcast(curl.cond);
g_static_mutex_unlock(&curl.mutex);
} }
/** /**
@ -490,7 +489,7 @@ input_curl_request_done(struct input_curl *c, CURLcode result, long status)
assert(c->easy == NULL); assert(c->easy == NULL);
assert(c->postponed_error == NULL); assert(c->postponed_error == NULL);
g_static_mutex_lock(&curl.mutex); g_mutex_lock(c->mutex);
if (result != CURLE_OK) { if (result != CURLE_OK) {
c->postponed_error = g_error_new(curl_quark(), result, c->postponed_error = g_error_new(curl_quark(), result,
@ -503,8 +502,8 @@ input_curl_request_done(struct input_curl *c, CURLcode result, long status)
} }
c->base.ready = true; c->base.ready = true;
g_cond_broadcast(curl.cond); g_cond_broadcast(c->cond);
g_static_mutex_unlock(&curl.mutex); g_mutex_unlock(c->mutex);
} }
static void static void
@ -711,8 +710,6 @@ input_curl_init(const struct config_param *param,
return false; return false;
} }
curl.cond = g_cond_new();
curl.source = g_source_new(&curl_source_funcs, sizeof(*curl.source)); curl.source = g_source_new(&curl_source_funcs, sizeof(*curl.source));
curl.source_id = g_source_attach(curl.source, io_thread_context()); curl.source_id = g_source_attach(curl.source, io_thread_context());
@ -745,7 +742,6 @@ input_curl_finish(void)
io_thread_call(curl_destroy_sources, NULL); io_thread_call(curl_destroy_sources, NULL);
curl_multi_cleanup(curl.multi); curl_multi_cleanup(curl.multi);
g_cond_free(curl.cond);
curl_slist_free_all(http_200_aliases); curl_slist_free_all(http_200_aliases);
@ -809,6 +805,9 @@ input_curl_free(struct input_curl *c)
g_queue_free(c->buffers); g_queue_free(c->buffers);
g_mutex_free(c->mutex);
g_cond_free(c->cond);
g_free(c->url); g_free(c->url);
input_stream_deinit(&c->base); input_stream_deinit(&c->base);
g_free(c); g_free(c);
@ -828,7 +827,7 @@ static bool
fill_buffer(struct input_curl *c, GError **error_r) fill_buffer(struct input_curl *c, GError **error_r)
{ {
while (c->easy != NULL && g_queue_is_empty(c->buffers)) while (c->easy != NULL && g_queue_is_empty(c->buffers))
g_cond_wait(curl.cond, g_static_mutex_get_mutex(&curl.mutex)); g_cond_wait(c->cond, c->mutex);
if (c->postponed_error != NULL) { if (c->postponed_error != NULL) {
g_propagate_error(error_r, c->postponed_error); g_propagate_error(error_r, c->postponed_error);
@ -938,14 +937,14 @@ input_curl_read(struct input_stream *is, void *ptr, size_t size,
size_t nbytes = 0; size_t nbytes = 0;
char *dest = ptr; char *dest = ptr;
g_static_mutex_lock(&curl.mutex); g_mutex_lock(c->mutex);
do { do {
/* fill the buffer */ /* fill the buffer */
success = fill_buffer(c, error_r); success = fill_buffer(c, error_r);
if (!success) { if (!success) {
g_static_mutex_unlock(&curl.mutex); g_mutex_unlock(c->mutex);
return 0; return 0;
} }
@ -970,7 +969,7 @@ input_curl_read(struct input_stream *is, void *ptr, size_t size,
io_thread_call(input_curl_resume, c); io_thread_call(input_curl_resume, c);
#endif #endif
g_static_mutex_unlock(&curl.mutex); g_mutex_unlock(c->mutex);
return nbytes; return nbytes;
} }
@ -988,9 +987,9 @@ input_curl_eof(G_GNUC_UNUSED struct input_stream *is)
{ {
struct input_curl *c = (struct input_curl *)is; struct input_curl *c = (struct input_curl *)is;
g_static_mutex_lock(&curl.mutex); g_mutex_lock(c->mutex);
bool eof = c->easy == NULL && g_queue_is_empty(c->buffers); bool eof = c->easy == NULL && g_queue_is_empty(c->buffers);
g_static_mutex_unlock(&curl.mutex); g_mutex_unlock(c->mutex);
return eof; return eof;
} }
@ -1000,7 +999,7 @@ input_curl_buffer(struct input_stream *is, GError **error_r)
{ {
struct input_curl *c = (struct input_curl *)is; struct input_curl *c = (struct input_curl *)is;
g_static_mutex_lock(&curl.mutex); g_mutex_lock(c->mutex);
int result; int result;
if (c->postponed_error != NULL) { if (c->postponed_error != NULL) {
@ -1012,7 +1011,7 @@ input_curl_buffer(struct input_stream *is, GError **error_r)
else else
result = 1; result = 1;
g_static_mutex_unlock(&curl.mutex); g_mutex_unlock(c->mutex);
return result; return result;
} }
@ -1112,12 +1111,12 @@ input_curl_writefunction(void *ptr, size_t size, size_t nmemb, void *stream)
if (size == 0) if (size == 0)
return 0; return 0;
g_static_mutex_lock(&curl.mutex); g_mutex_lock(c->mutex);
#if LIBCURL_VERSION_NUM >= 0x071200 #if LIBCURL_VERSION_NUM >= 0x071200
if (curl_total_buffer_size(c) + size >= CURL_MAX_BUFFERED) { if (curl_total_buffer_size(c) + size >= CURL_MAX_BUFFERED) {
c->paused = true; c->paused = true;
g_static_mutex_unlock(&curl.mutex); g_mutex_unlock(c->mutex);
return CURL_WRITEFUNC_PAUSE; return CURL_WRITEFUNC_PAUSE;
} }
#endif #endif
@ -1131,8 +1130,8 @@ input_curl_writefunction(void *ptr, size_t size, size_t nmemb, void *stream)
c->base.ready = true; c->base.ready = true;
g_cond_broadcast(curl.cond); g_cond_broadcast(c->cond);
g_static_mutex_unlock(&curl.mutex); g_mutex_unlock(c->mutex);
return size; return size;
} }
@ -1239,7 +1238,7 @@ input_curl_seek(struct input_stream *is, goffset offset, int whence,
/* check if we can fast-forward the buffer */ /* check if we can fast-forward the buffer */
g_static_mutex_lock(&curl.mutex); g_mutex_lock(c->mutex);
while (offset > is->offset && !g_queue_is_empty(c->buffers)) { while (offset > is->offset && !g_queue_is_empty(c->buffers)) {
struct buffer *buffer; struct buffer *buffer;
@ -1258,7 +1257,7 @@ input_curl_seek(struct input_stream *is, goffset offset, int whence,
is->offset += length; is->offset += length;
} }
g_static_mutex_unlock(&curl.mutex); g_mutex_unlock(c->mutex);
if (offset == is->offset) if (offset == is->offset)
return true; return true;
@ -1292,19 +1291,19 @@ input_curl_seek(struct input_stream *is, goffset offset, int whence,
if (!input_curl_easy_add_indirect(c, error_r)) if (!input_curl_easy_add_indirect(c, error_r))
return false; return false;
g_static_mutex_lock(&curl.mutex); g_mutex_lock(c->mutex);
while (!c->base.ready) while (!c->base.ready)
g_cond_wait(curl.cond, g_static_mutex_get_mutex(&curl.mutex)); g_cond_wait(c->cond, c->mutex);
if (c->postponed_error != NULL) { if (c->postponed_error != NULL) {
g_propagate_error(error_r, c->postponed_error); g_propagate_error(error_r, c->postponed_error);
c->postponed_error = NULL; c->postponed_error = NULL;
g_static_mutex_unlock(&curl.mutex); g_mutex_unlock(c->mutex);
return false; return false;
} }
g_static_mutex_unlock(&curl.mutex); g_mutex_unlock(c->mutex);
return true; return true;
} }
@ -1320,6 +1319,9 @@ input_curl_open(const char *url, GError **error_r)
c = g_new0(struct input_curl, 1); c = g_new0(struct input_curl, 1);
input_stream_init(&c->base, &input_plugin_curl, url); input_stream_init(&c->base, &input_plugin_curl, url);
c->mutex = g_mutex_new();
c->cond = g_cond_new();
c->url = g_strdup(url); c->url = g_strdup(url);
c->buffers = g_queue_new(); c->buffers = g_queue_new();