event: add function BlockingCall()
Replaces io_thread_call(). This approach is more generic and easier to use due to std::function.
This commit is contained in:
		| @@ -304,6 +304,7 @@ libevent_a_SOURCES = \ | ||||
| 	src/event/FullyBufferedSocket.cxx src/event/FullyBufferedSocket.hxx \ | ||||
| 	src/event/MultiSocketMonitor.cxx src/event/MultiSocketMonitor.hxx \ | ||||
| 	src/event/ServerSocket.cxx src/event/ServerSocket.hxx \ | ||||
| 	src/event/Call.hxx src/event/Call.cxx \ | ||||
| 	src/event/Loop.cxx src/event/Loop.hxx | ||||
|  | ||||
| # PCM library | ||||
|   | ||||
| @@ -115,53 +115,3 @@ io_thread_inside(void) | ||||
| { | ||||
| 	return io.thread != NULL && g_thread_self() == io.thread; | ||||
| } | ||||
|  | ||||
| struct call_data { | ||||
| 	GThreadFunc function; | ||||
| 	gpointer data; | ||||
| 	bool done; | ||||
| 	gpointer result; | ||||
| }; | ||||
|  | ||||
| static gboolean | ||||
| io_thread_call_func(gpointer _data) | ||||
| { | ||||
| 	struct call_data *data = (struct call_data *)_data; | ||||
|  | ||||
| 	gpointer result = data->function(data->data); | ||||
|  | ||||
| 	io.mutex.lock(); | ||||
| 	data->done = true; | ||||
| 	data->result = result; | ||||
| 	io.cond.broadcast(); | ||||
| 	io.mutex.unlock(); | ||||
|  | ||||
| 	return false; | ||||
| } | ||||
|  | ||||
| gpointer | ||||
| io_thread_call(GThreadFunc function, gpointer _data) | ||||
| { | ||||
| 	assert(io.thread != NULL); | ||||
|  | ||||
| 	if (io_thread_inside()) | ||||
| 		/* we're already in the I/O thread - no | ||||
| 		   synchronization needed */ | ||||
| 		return function(_data); | ||||
|  | ||||
| 	struct call_data data = { | ||||
| 		function, | ||||
| 		_data, | ||||
| 		false, | ||||
| 		nullptr, | ||||
| 	}; | ||||
|  | ||||
| 	io.loop->AddIdle(io_thread_call_func, &data); | ||||
|  | ||||
| 	io.mutex.lock(); | ||||
| 	while (!data.done) | ||||
| 		io.cond.wait(io.mutex); | ||||
| 	io.mutex.unlock(); | ||||
|  | ||||
| 	return data.result; | ||||
| } | ||||
|   | ||||
| @@ -62,10 +62,4 @@ gcc_pure | ||||
| bool | ||||
| io_thread_inside(void); | ||||
|  | ||||
| /** | ||||
|  * Call a function synchronously in the I/O thread. | ||||
|  */ | ||||
| gpointer | ||||
| io_thread_call(GThreadFunc function, gpointer data); | ||||
|  | ||||
| #endif | ||||
|   | ||||
							
								
								
									
										72
									
								
								src/event/Call.cxx
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										72
									
								
								src/event/Call.cxx
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,72 @@ | ||||
| /* | ||||
|  * Copyright (C) 2003-2013 The Music Player Daemon Project | ||||
|  * http://www.musicpd.org | ||||
|  * | ||||
|  * This program is free software; you can redistribute it and/or modify | ||||
|  * it under the terms of the GNU General Public License as published by | ||||
|  * the Free Software Foundation; either version 2 of the License, or | ||||
|  * (at your option) any later version. | ||||
|  * | ||||
|  * This program is distributed in the hope that it will be useful, | ||||
|  * but WITHOUT ANY WARRANTY; without even the implied warranty of | ||||
|  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the | ||||
|  * GNU General Public License for more details. | ||||
|  * | ||||
|  * You should have received a copy of the GNU General Public License along | ||||
|  * with this program; if not, write to the Free Software Foundation, Inc., | ||||
|  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. | ||||
|  */ | ||||
|  | ||||
| #include "config.h" | ||||
| #include "Call.hxx" | ||||
| #include "Loop.hxx" | ||||
| #include "DeferredMonitor.hxx" | ||||
| #include "thread/Mutex.hxx" | ||||
| #include "thread/Cond.hxx" | ||||
| #include "gcc.h" | ||||
|  | ||||
| #include <assert.h> | ||||
|  | ||||
| class BlockingCallMonitor final : DeferredMonitor { | ||||
| 	const std::function<void()> f; | ||||
|  | ||||
| 	Mutex mutex; | ||||
| 	Cond cond; | ||||
|  | ||||
| 	bool done; | ||||
|  | ||||
| public: | ||||
| 	BlockingCallMonitor(EventLoop &_loop, std::function<void()> &&_f) | ||||
| 		:DeferredMonitor(_loop), f(std::move(_f)), done(false) {} | ||||
|  | ||||
| 	void Run() { | ||||
| 		assert(!done); | ||||
|  | ||||
| 		Schedule(); | ||||
|  | ||||
| 		mutex.lock(); | ||||
| 		while (!done) | ||||
| 			cond.wait(mutex); | ||||
| 		mutex.unlock(); | ||||
| 	} | ||||
|  | ||||
| private: | ||||
| 	virtual void RunDeferred() override { | ||||
| 		f(); | ||||
| 	} | ||||
| }; | ||||
|  | ||||
| void | ||||
| BlockingCall(EventLoop &loop, std::function<void()> &&f) | ||||
| { | ||||
| 	if (loop.IsInside()) { | ||||
| 		/* we're already inside the loop - we can simply call | ||||
| 		   the function */ | ||||
| 		f(); | ||||
| 	} else { | ||||
| 		/* outside the EventLoop's thread - defer execution to | ||||
| 		   the EventLoop, wait for completion */ | ||||
| 		BlockingCallMonitor m(loop, std::move(f)); | ||||
| 		m.Run(); | ||||
| 	} | ||||
| } | ||||
							
								
								
									
										36
									
								
								src/event/Call.hxx
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										36
									
								
								src/event/Call.hxx
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,36 @@ | ||||
| /* | ||||
|  * Copyright (C) 2003-2013 The Music Player Daemon Project | ||||
|  * http://www.musicpd.org | ||||
|  * | ||||
|  * This program is free software; you can redistribute it and/or modify | ||||
|  * it under the terms of the GNU General Public License as published by | ||||
|  * the Free Software Foundation; either version 2 of the License, or | ||||
|  * (at your option) any later version. | ||||
|  * | ||||
|  * This program is distributed in the hope that it will be useful, | ||||
|  * but WITHOUT ANY WARRANTY; without even the implied warranty of | ||||
|  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the | ||||
|  * GNU General Public License for more details. | ||||
|  * | ||||
|  * You should have received a copy of the GNU General Public License along | ||||
|  * with this program; if not, write to the Free Software Foundation, Inc., | ||||
|  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. | ||||
|  */ | ||||
|  | ||||
| #ifndef MPD_EVENT_CALL_HXX | ||||
| #define MPD_EVENT_CALL_HXX | ||||
|  | ||||
| #include "check.h" | ||||
|  | ||||
| #include <functional> | ||||
|  | ||||
| class EventLoop; | ||||
|  | ||||
| /** | ||||
|  * Call the given function in the context of the #EventLoop, and wait | ||||
|  * for it to finish. | ||||
|  */ | ||||
| void | ||||
| BlockingCall(EventLoop &loop, std::function<void()> &&f); | ||||
|  | ||||
| #endif /* MAIN_NOTIFY_H */ | ||||
| @@ -26,7 +26,7 @@ | ||||
| #include "Tag.hxx" | ||||
| #include "IcyMetaDataParser.hxx" | ||||
| #include "event/MultiSocketMonitor.hxx" | ||||
| #include "event/Loop.hxx" | ||||
| #include "event/Call.hxx" | ||||
| #include "IOThread.hxx" | ||||
|  | ||||
| #include <assert.h> | ||||
| @@ -252,19 +252,15 @@ input_curl_find_request(CURL *easy) | ||||
| 	return NULL; | ||||
| } | ||||
|  | ||||
| static gpointer | ||||
| input_curl_resume(gpointer data) | ||||
| static void | ||||
| input_curl_resume(struct input_curl *c) | ||||
| { | ||||
| 	assert(io_thread_inside()); | ||||
|  | ||||
| 	struct input_curl *c = (struct input_curl *)data; | ||||
|  | ||||
| 	if (c->paused) { | ||||
| 		c->paused = false; | ||||
| 		curl_easy_pause(c->easy, CURLPAUSE_CONT); | ||||
| 	} | ||||
|  | ||||
| 	return NULL; | ||||
| } | ||||
|  | ||||
| /** | ||||
| @@ -358,21 +354,6 @@ input_curl_easy_add(struct input_curl *c, GError **error_r) | ||||
| 	return true; | ||||
| } | ||||
|  | ||||
| struct easy_add_params { | ||||
| 	struct input_curl *c; | ||||
| 	GError **error_r; | ||||
| }; | ||||
|  | ||||
| static gpointer | ||||
| input_curl_easy_add_callback(gpointer data) | ||||
| { | ||||
| 	const struct easy_add_params *params = | ||||
| 		(const struct easy_add_params *)data; | ||||
|  | ||||
| 	bool success = input_curl_easy_add(params->c, params->error_r); | ||||
| 	return GUINT_TO_POINTER(success); | ||||
| } | ||||
|  | ||||
| /** | ||||
|  * Call input_curl_easy_add() in the I/O thread.  May be called from | ||||
|  * any thread.  Caller must not hold a mutex. | ||||
| @@ -383,14 +364,11 @@ input_curl_easy_add_indirect(struct input_curl *c, GError **error_r) | ||||
| 	assert(c != NULL); | ||||
| 	assert(c->easy != NULL); | ||||
|  | ||||
| 	struct easy_add_params params = { | ||||
| 		c, | ||||
| 		error_r, | ||||
| 	}; | ||||
|  | ||||
| 	gpointer result = | ||||
| 		io_thread_call(input_curl_easy_add_callback, ¶ms); | ||||
| 	return GPOINTER_TO_UINT(result); | ||||
| 	bool result; | ||||
| 	BlockingCall(io_thread_get(), [c, error_r, &result](){ | ||||
| 			result = input_curl_easy_add(c, error_r); | ||||
| 		}); | ||||
| 	return result; | ||||
| } | ||||
|  | ||||
| /** | ||||
| @@ -421,17 +399,6 @@ input_curl_easy_free(struct input_curl *c) | ||||
| 	c->range = NULL; | ||||
| } | ||||
|  | ||||
| static gpointer | ||||
| input_curl_easy_free_callback(gpointer data) | ||||
| { | ||||
| 	struct input_curl *c = (struct input_curl *)data; | ||||
|  | ||||
| 	input_curl_easy_free(c); | ||||
| 	curl.sockets->InvalidateSockets(); | ||||
|  | ||||
| 	return NULL; | ||||
| } | ||||
|  | ||||
| /** | ||||
|  * Frees the current "libcurl easy" handle, and everything associated | ||||
|  * with it. | ||||
| @@ -441,7 +408,11 @@ input_curl_easy_free_callback(gpointer data) | ||||
| static void | ||||
| input_curl_easy_free_indirect(struct input_curl *c) | ||||
| { | ||||
| 	io_thread_call(input_curl_easy_free_callback, c); | ||||
| 	BlockingCall(io_thread_get(), [c](){ | ||||
| 			input_curl_easy_free(c); | ||||
| 			curl.sockets->InvalidateSockets(); | ||||
| 		}); | ||||
|  | ||||
| 	assert(c->easy == NULL); | ||||
| } | ||||
|  | ||||
| @@ -654,20 +625,14 @@ input_curl_init(const config_param ¶m, | ||||
| 	return true; | ||||
| } | ||||
|  | ||||
| static gpointer | ||||
| curl_destroy_sources(gcc_unused gpointer data) | ||||
| { | ||||
| 	delete curl.sockets; | ||||
|  | ||||
| 	return NULL; | ||||
| } | ||||
|  | ||||
| static void | ||||
| input_curl_finish(void) | ||||
| { | ||||
| 	assert(curl.requests.empty()); | ||||
|  | ||||
| 	io_thread_call(curl_destroy_sources, NULL); | ||||
| 	BlockingCall(io_thread_get(), [](){ | ||||
| 			delete curl.sockets; | ||||
| 		}); | ||||
|  | ||||
| 	curl_multi_cleanup(curl.multi); | ||||
|  | ||||
| @@ -854,7 +819,11 @@ input_curl_read(struct input_stream *is, void *ptr, size_t size, | ||||
|  | ||||
| 	if (c->paused && curl_total_buffer_size(c) < CURL_RESUME_AT) { | ||||
| 		c->base.mutex.unlock(); | ||||
| 		io_thread_call(input_curl_resume, c); | ||||
|  | ||||
| 		BlockingCall(io_thread_get(), [c](){ | ||||
| 				input_curl_resume(c); | ||||
| 			}); | ||||
|  | ||||
| 		c->base.mutex.lock(); | ||||
| 	} | ||||
|  | ||||
|   | ||||
		Reference in New Issue
	
	Block a user
	 Max Kellermann
					Max Kellermann