diff --git a/src/DecoderAPI.cxx b/src/DecoderAPI.cxx index 857ee72d1..29b39688f 100644 --- a/src/DecoderAPI.cxx +++ b/src/DecoderAPI.cxx @@ -285,7 +285,7 @@ size_t decoder_read(struct decoder *decoder, if (input_stream_available(is)) break; - is->cond->wait(*is->mutex); + is->cond.wait(is->mutex); } nbytes = input_stream_read(is, buffer, length, &error); diff --git a/src/InputInternal.cxx b/src/InputInternal.cxx index a154b68da..e110ebf0a 100644 --- a/src/InputInternal.cxx +++ b/src/InputInternal.cxx @@ -24,14 +24,13 @@ void input_stream_signal_client(struct input_stream *is) { - if (is->cond != NULL) - is->cond->broadcast(); + is->cond.broadcast(); } void input_stream_set_ready(struct input_stream *is) { - const ScopeLock protect(*is->mutex); + const ScopeLock protect(is->mutex); if (!is->ready) { is->ready = true; diff --git a/src/InputStream.cxx b/src/InputStream.cxx index c9d7f2123..ef77a468d 100644 --- a/src/InputStream.cxx +++ b/src/InputStream.cxx @@ -50,11 +50,10 @@ input_stream_open(const char *url, is = plugin->open(url, mutex, cond, &error); if (is != NULL) { - assert(is->plugin != NULL); - assert(is->plugin->close != NULL); - assert(is->plugin->read != NULL); - assert(is->plugin->eof != NULL); - assert(!is->seekable || is->plugin->seek != NULL); + assert(is->plugin.close != NULL); + assert(is->plugin.read != NULL); + assert(is->plugin.eof != NULL); + assert(!is->seekable || is->plugin.seek != NULL); is = input_rewind_open(is); @@ -73,35 +72,31 @@ bool input_stream_check(struct input_stream *is, GError **error_r) { assert(is != NULL); - assert(is->plugin != NULL); - return is->plugin->check == NULL || - is->plugin->check(is, error_r); + return is->plugin.check == NULL || + is->plugin.check(is, error_r); } void input_stream_update(struct input_stream *is) { assert(is != NULL); - assert(is->plugin != NULL); - if (is->plugin->update != NULL) - is->plugin->update(is); + if (is->plugin.update != NULL) + is->plugin.update(is); } void input_stream_wait_ready(struct input_stream *is) { assert(is != NULL); - assert(is->mutex != NULL); - assert(is->cond != NULL); while (true) { input_stream_update(is); if (is->ready) break; - is->cond->wait(*is->mutex); + is->cond.wait(is->mutex); } } @@ -109,10 +104,8 @@ void input_stream_lock_wait_ready(struct input_stream *is) { assert(is != NULL); - assert(is->mutex != NULL); - assert(is->cond != NULL); - const ScopeLock protect(*is->mutex); + const ScopeLock protect(is->mutex); input_stream_wait_ready(is); } @@ -173,12 +166,11 @@ input_stream_seek(struct input_stream *is, goffset offset, int whence, GError **error_r) { assert(is != NULL); - assert(is->plugin != NULL); - if (is->plugin->seek == NULL) + if (is->plugin.seek == NULL) return false; - return is->plugin->seek(is, offset, whence, error_r); + return is->plugin.seek(is, offset, whence, error_r); } bool @@ -186,16 +178,11 @@ input_stream_lock_seek(struct input_stream *is, goffset offset, int whence, GError **error_r) { assert(is != NULL); - assert(is->plugin != NULL); - if (is->plugin->seek == NULL) + if (is->plugin.seek == NULL) return false; - if (is->mutex == NULL) - /* no locking */ - return input_stream_seek(is, offset, whence, error_r); - - const ScopeLock protect(*is->mutex); + const ScopeLock protect(is->mutex); return input_stream_seek(is, offset, whence, error_r); } @@ -203,10 +190,9 @@ struct tag * input_stream_tag(struct input_stream *is) { assert(is != NULL); - assert(is->plugin != NULL); - return is->plugin->tag != NULL - ? is->plugin->tag(is) + return is->plugin.tag != NULL + ? is->plugin.tag(is) : NULL; } @@ -214,16 +200,11 @@ struct tag * input_stream_lock_tag(struct input_stream *is) { assert(is != NULL); - assert(is->plugin != NULL); - if (is->plugin->tag == NULL) + if (is->plugin.tag == NULL) return nullptr; - if (is->mutex == NULL) - /* no locking */ - return input_stream_tag(is); - - const ScopeLock protect(*is->mutex); + const ScopeLock protect(is->mutex); return input_stream_tag(is); } @@ -231,10 +212,9 @@ bool input_stream_available(struct input_stream *is) { assert(is != NULL); - assert(is->plugin != NULL); - return is->plugin->available != NULL - ? is->plugin->available(is) + return is->plugin.available != NULL + ? is->plugin.available(is) : true; } @@ -245,7 +225,7 @@ input_stream_read(struct input_stream *is, void *ptr, size_t size, assert(ptr != NULL); assert(size > 0); - return is->plugin->read(is, ptr, size, error_r); + return is->plugin.read(is, ptr, size, error_r); } size_t @@ -255,35 +235,26 @@ input_stream_lock_read(struct input_stream *is, void *ptr, size_t size, assert(ptr != NULL); assert(size > 0); - if (is->mutex == NULL) - /* no locking */ - return input_stream_read(is, ptr, size, error_r); - - const ScopeLock protect(*is->mutex); + const ScopeLock protect(is->mutex); return input_stream_read(is, ptr, size, error_r); } void input_stream_close(struct input_stream *is) { - is->plugin->close(is); + is->plugin.close(is); } bool input_stream_eof(struct input_stream *is) { - return is->plugin->eof(is); + return is->plugin.eof(is); } bool input_stream_lock_eof(struct input_stream *is) { assert(is != NULL); - assert(is->plugin != NULL); - if (is->mutex == NULL) - /* no locking */ - return input_stream_eof(is); - - const ScopeLock protect(*is->mutex); + const ScopeLock protect(is->mutex); return input_stream_eof(is); } diff --git a/src/InputStream.hxx b/src/InputStream.hxx index 62836af88..32940fd96 100644 --- a/src/InputStream.hxx +++ b/src/InputStream.hxx @@ -34,7 +34,7 @@ struct input_stream { /** * the plugin which implements this input stream */ - const struct input_plugin *plugin; + const struct input_plugin &plugin; /** * The absolute URI which was used to open this stream. May @@ -50,7 +50,7 @@ struct input_stream { * This object is allocated by the client, and the client is * responsible for freeing it. */ - Mutex *mutex; + Mutex &mutex; /** * A cond that gets signalled when the state of this object @@ -60,7 +60,7 @@ struct input_stream { * This object is allocated by the client, and the client is * responsible for freeing it. */ - Cond *cond; + Cond &cond; /** * indicates whether the stream is ready for reading and @@ -90,8 +90,8 @@ struct input_stream { input_stream(const input_plugin &_plugin, const char *_uri, Mutex &_mutex, Cond &_cond) - :plugin(&_plugin), uri(g_strdup(_uri)), - mutex(&_mutex), cond(&_cond), + :plugin(_plugin), uri(g_strdup(_uri)), + mutex(_mutex), cond(_cond), ready(false), seekable(false), size(-1), offset(0), mime(nullptr) { @@ -108,14 +108,14 @@ gcc_nonnull(1) static inline void input_stream_lock(struct input_stream *is) { - is->mutex->lock(); + is->mutex.lock(); } gcc_nonnull(1) static inline void input_stream_unlock(struct input_stream *is) { - is->mutex->unlock(); + is->mutex.unlock(); } #endif diff --git a/src/decoder/WavpackDecoderPlugin.cxx b/src/decoder/WavpackDecoderPlugin.cxx index d2a1355a9..b7348e218 100644 --- a/src/decoder/WavpackDecoderPlugin.cxx +++ b/src/decoder/WavpackDecoderPlugin.cxx @@ -517,7 +517,7 @@ wavpack_streamdecode(struct decoder * decoder, struct input_stream *is) struct wavpack_input isp, isp_wvc; bool can_seek = is->seekable; - is_wvc = wavpack_open_wvc(decoder, is->uri, *is->mutex, *is->cond, + is_wvc = wavpack_open_wvc(decoder, is->uri, is->mutex, is->cond, &isp_wvc); if (is_wvc != NULL) { open_flags |= OPEN_WVC; diff --git a/src/input/CurlInputPlugin.cxx b/src/input/CurlInputPlugin.cxx index 0fbfa29d2..f3edf1dc8 100644 --- a/src/input/CurlInputPlugin.cxx +++ b/src/input/CurlInputPlugin.cxx @@ -462,12 +462,12 @@ input_curl_abort_all_requests(GError *error) input_curl_easy_free(c); - const ScopeLock protect(*c->base.mutex); + const ScopeLock protect(c->base.mutex); c->postponed_error = g_error_copy(error); c->base.ready = true; - c->base.cond->broadcast(); + c->base.cond.broadcast(); } g_error_free(error); @@ -487,7 +487,7 @@ input_curl_request_done(struct input_curl *c, CURLcode result, long status) assert(c->easy == NULL); assert(c->postponed_error == NULL); - const ScopeLock protect(*c->base.mutex); + const ScopeLock protect(c->base.mutex); if (result != CURLE_OK) { c->postponed_error = g_error_new(curl_quark(), result, @@ -501,7 +501,7 @@ input_curl_request_done(struct input_curl *c, CURLcode result, long status) c->base.ready = true; - c->base.cond->broadcast(); + c->base.cond.broadcast(); } static void @@ -735,7 +735,7 @@ static bool fill_buffer(struct input_curl *c, GError **error_r) { while (c->easy != NULL && c->buffers.empty()) - c->base.cond->wait(*c->base.mutex); + c->base.cond.wait(c->base.mutex); if (c->postponed_error != NULL) { g_propagate_error(error_r, c->postponed_error); @@ -855,9 +855,9 @@ input_curl_read(struct input_stream *is, void *ptr, size_t size, is->offset += (goffset)nbytes; if (c->paused && curl_total_buffer_size(c) < CURL_RESUME_AT) { - c->base.mutex->unlock(); + c->base.mutex.unlock(); io_thread_call(input_curl_resume, c); - c->base.mutex->lock(); + c->base.mutex.lock(); } return nbytes; @@ -974,7 +974,7 @@ input_curl_writefunction(void *ptr, size_t size, size_t nmemb, void *stream) if (size == 0) return 0; - const ScopeLock protect(*c->base.mutex); + const ScopeLock protect(c->base.mutex); if (curl_total_buffer_size(c) + size >= CURL_MAX_BUFFERED) { c->paused = true; @@ -984,7 +984,7 @@ input_curl_writefunction(void *ptr, size_t size, size_t nmemb, void *stream) c->buffers.emplace_back(ptr, size); c->base.ready = true; - c->base.cond->broadcast(); + c->base.cond.broadcast(); return size; } @@ -1108,7 +1108,7 @@ input_curl_seek(struct input_stream *is, goffset offset, int whence, /* close the old connection and open a new one */ - c->base.mutex->unlock(); + c->base.mutex.unlock(); input_curl_easy_free_indirect(c); c->buffers.clear(); @@ -1137,10 +1137,10 @@ input_curl_seek(struct input_stream *is, goffset offset, int whence, if (!input_curl_easy_add_indirect(c, error_r)) return false; - c->base.mutex->lock(); + c->base.mutex.lock(); while (!c->base.ready) - c->base.cond->wait(*c->base.mutex); + c->base.cond.wait(c->base.mutex); if (c->postponed_error != NULL) { g_propagate_error(error_r, c->postponed_error); diff --git a/src/input/RewindInputPlugin.cxx b/src/input/RewindInputPlugin.cxx index 6c0093a38..362e55b3f 100644 --- a/src/input/RewindInputPlugin.cxx +++ b/src/input/RewindInputPlugin.cxx @@ -62,7 +62,7 @@ struct RewindInputStream { RewindInputStream(input_stream *_input) :base(rewind_input_plugin, _input->uri, - *_input->mutex, *_input->cond), + _input->mutex, _input->cond), input(_input), tail(0) { } diff --git a/src/input/SoupInputPlugin.cxx b/src/input/SoupInputPlugin.cxx index 6615a3c7f..e9767c20e 100644 --- a/src/input/SoupInputPlugin.cxx +++ b/src/input/SoupInputPlugin.cxx @@ -174,7 +174,7 @@ input_soup_session_callback(G_GNUC_UNUSED SoupSession *session, assert(msg == s->msg); assert(!s->completed); - const ScopeLock protect(*s->base.mutex); + const ScopeLock protect(s->base.mutex); if (!s->base.ready) s->CopyError(msg); @@ -183,7 +183,7 @@ input_soup_session_callback(G_GNUC_UNUSED SoupSession *session, s->alive = false; s->completed = true; - s->base.cond->broadcast(); + s->base.cond.broadcast(); } static void @@ -191,10 +191,10 @@ input_soup_got_headers(SoupMessage *msg, gpointer user_data) { SoupInputStream *s = (SoupInputStream *)user_data; - s->base.mutex->lock(); + s->base.mutex.lock(); if (!s->CopyError(msg)) { - s->base.mutex->unlock(); + s->base.mutex.unlock(); soup_session_cancel_message(soup_session, msg, SOUP_STATUS_CANCELLED); @@ -202,8 +202,8 @@ input_soup_got_headers(SoupMessage *msg, gpointer user_data) } s->base.ready = true; - s->base.cond->broadcast(); - s->base.mutex->unlock(); + s->base.cond.broadcast(); + s->base.mutex.unlock(); soup_message_body_set_accumulate(msg->response_body, false); } @@ -215,7 +215,7 @@ input_soup_got_chunk(SoupMessage *msg, SoupBuffer *chunk, gpointer user_data) assert(msg == s->msg); - const ScopeLock protect(*s->base.mutex); + const ScopeLock protect(s->base.mutex); g_queue_push_tail(s->buffers, soup_buffer_copy(chunk)); s->total_buffered += chunk->length; @@ -225,8 +225,8 @@ input_soup_got_chunk(SoupMessage *msg, SoupBuffer *chunk, gpointer user_data) soup_session_pause_message(soup_session, msg); } - s->base.cond->broadcast(); - s->base.mutex->unlock(); + s->base.cond.broadcast(); + s->base.mutex.unlock(); } static void @@ -236,14 +236,14 @@ input_soup_got_body(G_GNUC_UNUSED SoupMessage *msg, gpointer user_data) assert(msg == s->msg); - const ScopeLock protect(*s->base.mutex); + const ScopeLock protect(s->base.mutex); s->base.ready = true; s->eof = true; s->alive = false; - s->base.cond->broadcast(); - s->base.mutex->unlock(); + s->base.cond.broadcast(); + s->base.mutex.unlock(); } inline bool @@ -261,7 +261,7 @@ SoupInputStream::WaitData() assert(current_consumed == 0); - base.cond->wait(*base.mutex); + base.cond.wait(base.mutex); } } @@ -339,22 +339,22 @@ input_soup_cancel(gpointer data) SoupInputStream::~SoupInputStream() { - base.mutex->lock(); + base.mutex.lock(); if (!completed) { /* the messages's session callback hasn't been invoked yet; cancel it and wait for completion */ - base.mutex->unlock(); + base.mutex.unlock(); io_thread_call(input_soup_cancel, this); - base.mutex->lock(); + base.mutex.lock(); while (!completed) - base.cond->wait(*base.mutex); + base.cond.wait(base.mutex); } - base.mutex->unlock(); + base.mutex.unlock(); SoupBuffer *buffer; while ((buffer = (SoupBuffer *)g_queue_pop_head(buffers)) != NULL)