From 018f4155eb7b476b96a7b401377be78e00ca7dd2 Mon Sep 17 00:00:00 2001 From: Max Kellermann Date: Wed, 7 Aug 2013 22:42:45 +0200 Subject: [PATCH] event: add function BlockingCall() Replaces io_thread_call(). This approach is more generic and easier to use due to std::function. --- Makefile.am | 1 + src/IOThread.cxx | 50 ------------------------ src/IOThread.hxx | 6 --- src/event/Call.cxx | 72 ++++++++++++++++++++++++++++++++++ src/event/Call.hxx | 36 +++++++++++++++++ src/input/CurlInputPlugin.cxx | 73 ++++++++++------------------------- 6 files changed, 130 insertions(+), 108 deletions(-) create mode 100644 src/event/Call.cxx create mode 100644 src/event/Call.hxx diff --git a/Makefile.am b/Makefile.am index 898f1e094..f12eebf7d 100644 --- a/Makefile.am +++ b/Makefile.am @@ -304,6 +304,7 @@ libevent_a_SOURCES = \ src/event/FullyBufferedSocket.cxx src/event/FullyBufferedSocket.hxx \ src/event/MultiSocketMonitor.cxx src/event/MultiSocketMonitor.hxx \ src/event/ServerSocket.cxx src/event/ServerSocket.hxx \ + src/event/Call.hxx src/event/Call.cxx \ src/event/Loop.cxx src/event/Loop.hxx # PCM library diff --git a/src/IOThread.cxx b/src/IOThread.cxx index ef0cec4d6..cba9c9263 100644 --- a/src/IOThread.cxx +++ b/src/IOThread.cxx @@ -115,53 +115,3 @@ io_thread_inside(void) { return io.thread != NULL && g_thread_self() == io.thread; } - -struct call_data { - GThreadFunc function; - gpointer data; - bool done; - gpointer result; -}; - -static gboolean -io_thread_call_func(gpointer _data) -{ - struct call_data *data = (struct call_data *)_data; - - gpointer result = data->function(data->data); - - io.mutex.lock(); - data->done = true; - data->result = result; - io.cond.broadcast(); - io.mutex.unlock(); - - return false; -} - -gpointer -io_thread_call(GThreadFunc function, gpointer _data) -{ - assert(io.thread != NULL); - - if (io_thread_inside()) - /* we're already in the I/O thread - no - synchronization needed */ - return function(_data); - - struct call_data data = { - function, - _data, - false, - nullptr, - }; - - io.loop->AddIdle(io_thread_call_func, &data); - - io.mutex.lock(); - while (!data.done) - io.cond.wait(io.mutex); - io.mutex.unlock(); - - return data.result; -} diff --git a/src/IOThread.hxx b/src/IOThread.hxx index a9401dc7f..8c6f233ab 100644 --- a/src/IOThread.hxx +++ b/src/IOThread.hxx @@ -62,10 +62,4 @@ gcc_pure bool io_thread_inside(void); -/** - * Call a function synchronously in the I/O thread. - */ -gpointer -io_thread_call(GThreadFunc function, gpointer data); - #endif diff --git a/src/event/Call.cxx b/src/event/Call.cxx new file mode 100644 index 000000000..e8343f606 --- /dev/null +++ b/src/event/Call.cxx @@ -0,0 +1,72 @@ +/* + * Copyright (C) 2003-2013 The Music Player Daemon Project + * http://www.musicpd.org + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ + +#include "config.h" +#include "Call.hxx" +#include "Loop.hxx" +#include "DeferredMonitor.hxx" +#include "thread/Mutex.hxx" +#include "thread/Cond.hxx" +#include "gcc.h" + +#include + +class BlockingCallMonitor final : DeferredMonitor { + const std::function f; + + Mutex mutex; + Cond cond; + + bool done; + +public: + BlockingCallMonitor(EventLoop &_loop, std::function &&_f) + :DeferredMonitor(_loop), f(std::move(_f)), done(false) {} + + void Run() { + assert(!done); + + Schedule(); + + mutex.lock(); + while (!done) + cond.wait(mutex); + mutex.unlock(); + } + +private: + virtual void RunDeferred() override { + f(); + } +}; + +void +BlockingCall(EventLoop &loop, std::function &&f) +{ + if (loop.IsInside()) { + /* we're already inside the loop - we can simply call + the function */ + f(); + } else { + /* outside the EventLoop's thread - defer execution to + the EventLoop, wait for completion */ + BlockingCallMonitor m(loop, std::move(f)); + m.Run(); + } +} diff --git a/src/event/Call.hxx b/src/event/Call.hxx new file mode 100644 index 000000000..34d886ca5 --- /dev/null +++ b/src/event/Call.hxx @@ -0,0 +1,36 @@ +/* + * Copyright (C) 2003-2013 The Music Player Daemon Project + * http://www.musicpd.org + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ + +#ifndef MPD_EVENT_CALL_HXX +#define MPD_EVENT_CALL_HXX + +#include "check.h" + +#include + +class EventLoop; + +/** + * Call the given function in the context of the #EventLoop, and wait + * for it to finish. + */ +void +BlockingCall(EventLoop &loop, std::function &&f); + +#endif /* MAIN_NOTIFY_H */ diff --git a/src/input/CurlInputPlugin.cxx b/src/input/CurlInputPlugin.cxx index cabde5bba..8e29e47c7 100644 --- a/src/input/CurlInputPlugin.cxx +++ b/src/input/CurlInputPlugin.cxx @@ -26,7 +26,7 @@ #include "Tag.hxx" #include "IcyMetaDataParser.hxx" #include "event/MultiSocketMonitor.hxx" -#include "event/Loop.hxx" +#include "event/Call.hxx" #include "IOThread.hxx" #include @@ -252,19 +252,15 @@ input_curl_find_request(CURL *easy) return NULL; } -static gpointer -input_curl_resume(gpointer data) +static void +input_curl_resume(struct input_curl *c) { assert(io_thread_inside()); - struct input_curl *c = (struct input_curl *)data; - if (c->paused) { c->paused = false; curl_easy_pause(c->easy, CURLPAUSE_CONT); } - - return NULL; } /** @@ -358,21 +354,6 @@ input_curl_easy_add(struct input_curl *c, GError **error_r) return true; } -struct easy_add_params { - struct input_curl *c; - GError **error_r; -}; - -static gpointer -input_curl_easy_add_callback(gpointer data) -{ - const struct easy_add_params *params = - (const struct easy_add_params *)data; - - bool success = input_curl_easy_add(params->c, params->error_r); - return GUINT_TO_POINTER(success); -} - /** * Call input_curl_easy_add() in the I/O thread. May be called from * any thread. Caller must not hold a mutex. @@ -383,14 +364,11 @@ input_curl_easy_add_indirect(struct input_curl *c, GError **error_r) assert(c != NULL); assert(c->easy != NULL); - struct easy_add_params params = { - c, - error_r, - }; - - gpointer result = - io_thread_call(input_curl_easy_add_callback, ¶ms); - return GPOINTER_TO_UINT(result); + bool result; + BlockingCall(io_thread_get(), [c, error_r, &result](){ + result = input_curl_easy_add(c, error_r); + }); + return result; } /** @@ -421,17 +399,6 @@ input_curl_easy_free(struct input_curl *c) c->range = NULL; } -static gpointer -input_curl_easy_free_callback(gpointer data) -{ - struct input_curl *c = (struct input_curl *)data; - - input_curl_easy_free(c); - curl.sockets->InvalidateSockets(); - - return NULL; -} - /** * Frees the current "libcurl easy" handle, and everything associated * with it. @@ -441,7 +408,11 @@ input_curl_easy_free_callback(gpointer data) static void input_curl_easy_free_indirect(struct input_curl *c) { - io_thread_call(input_curl_easy_free_callback, c); + BlockingCall(io_thread_get(), [c](){ + input_curl_easy_free(c); + curl.sockets->InvalidateSockets(); + }); + assert(c->easy == NULL); } @@ -654,20 +625,14 @@ input_curl_init(const config_param ¶m, return true; } -static gpointer -curl_destroy_sources(gcc_unused gpointer data) -{ - delete curl.sockets; - - return NULL; -} - static void input_curl_finish(void) { assert(curl.requests.empty()); - io_thread_call(curl_destroy_sources, NULL); + BlockingCall(io_thread_get(), [](){ + delete curl.sockets; + }); curl_multi_cleanup(curl.multi); @@ -854,7 +819,11 @@ input_curl_read(struct input_stream *is, void *ptr, size_t size, if (c->paused && curl_total_buffer_size(c) < CURL_RESUME_AT) { c->base.mutex.unlock(); - io_thread_call(input_curl_resume, c); + + BlockingCall(io_thread_get(), [c](){ + input_curl_resume(c); + }); + c->base.mutex.lock(); }