From 083065f433cb513156878a33ebe5723ae27057da Mon Sep 17 00:00:00 2001 From: Max Kellermann Date: Tue, 15 Jan 2013 07:14:36 +0100 Subject: [PATCH] input/Curl: move code to class MultiSocketMonitor --- Makefile.am | 8 +- src/event/MultiSocketMonitor.cxx | 107 +++++++++++++++++++ src/event/MultiSocketMonitor.hxx | 124 ++++++++++++++++++++++ src/input/CurlInputPlugin.cxx | 176 +++++++++++-------------------- 4 files changed, 298 insertions(+), 117 deletions(-) create mode 100644 src/event/MultiSocketMonitor.cxx create mode 100644 src/event/MultiSocketMonitor.hxx diff --git a/Makefile.am b/Makefile.am index eca6d7d7c..38d31b26b 100644 --- a/Makefile.am +++ b/Makefile.am @@ -359,6 +359,7 @@ libevent_a_SOURCES = \ src/event/TimeoutMonitor.hxx src/event/TimeoutMonitor.cxx \ src/event/SocketMonitor.cxx src/event/SocketMonitor.hxx \ src/event/BufferedSocket.cxx src/event/BufferedSocket.hxx \ + src/event/MultiSocketMonitor.cxx src/event/MultiSocketMonitor.hxx \ src/event/Loop.hxx # PCM library @@ -1070,6 +1071,7 @@ test_DumpDatabase_SOURCES = test/DumpDatabase.cxx \ test_run_input_LDADD = \ $(INPUT_LIBS) \ $(ARCHIVE_LIBS) \ + libevent.a \ $(GLIB_LIBS) test_run_input_SOURCES = test/run_input.c \ test/stdbin.h \ @@ -1080,9 +1082,10 @@ test_run_input_SOURCES = test/run_input.c \ src/fd_util.c test_dump_text_file_LDADD = \ - libutil.a \ $(INPUT_LIBS) \ $(ARCHIVE_LIBS) \ + libevent.a \ + libutil.a \ $(GLIB_LIBS) test_dump_text_file_SOURCES = test/dump_text_file.cxx \ test/stdbin.h \ @@ -1100,6 +1103,7 @@ test_dump_playlist_LDADD = \ $(ARCHIVE_LIBS) \ $(DECODER_LIBS) \ $(TAG_LIBS) \ + libevent.a \ libutil.a \ $(GLIB_LIBS) test_dump_playlist_SOURCES = test/dump_playlist.cxx \ @@ -1126,6 +1130,7 @@ test_run_decoder_LDADD = \ $(INPUT_LIBS) \ $(ARCHIVE_LIBS) \ $(TAG_LIBS) \ + libevent.a \ libutil.a \ $(GLIB_LIBS) test_run_decoder_SOURCES = test/run_decoder.cxx \ @@ -1149,6 +1154,7 @@ test_read_tags_LDADD = \ $(INPUT_LIBS) \ $(ARCHIVE_LIBS) \ $(TAG_LIBS) \ + libevent.a \ libutil.a \ $(GLIB_LIBS) test_read_tags_SOURCES = test/read_tags.cxx \ diff --git a/src/event/MultiSocketMonitor.cxx b/src/event/MultiSocketMonitor.cxx new file mode 100644 index 000000000..6f20b907c --- /dev/null +++ b/src/event/MultiSocketMonitor.cxx @@ -0,0 +1,107 @@ +/* + * Copyright (C) 2003-2013 The Music Player Daemon Project + * http://www.musicpd.org + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ + +#include "config.h" +#include "MultiSocketMonitor.hxx" +#include "Loop.hxx" +#include "fd_util.h" +#include "gcc.h" + +#include + +/** + * The vtable for our GSource implementation. Unfortunately, we + * cannot declare it "const", because g_source_new() takes a non-const + * pointer, for whatever reason. + */ +static GSourceFuncs multi_socket_monitor_source_funcs = { + MultiSocketMonitor::Prepare, + MultiSocketMonitor::Check, + MultiSocketMonitor::Dispatch, + nullptr, + nullptr, + nullptr, +}; + +MultiSocketMonitor::MultiSocketMonitor(EventLoop &_loop) + :loop(_loop), + source((Source *)g_source_new(&multi_socket_monitor_source_funcs, + sizeof(*source))) { + source->monitor = this; + + g_source_attach(&source->base, loop.GetContext()); +} + +MultiSocketMonitor::~MultiSocketMonitor() +{ + g_source_destroy(&source->base); + g_source_unref(&source->base); + source = nullptr; +} + +bool +MultiSocketMonitor::Check() const +{ + if (CheckSockets()) + return true; + + for (const auto &i : fds) + if (i.revents != 0) + return true; + + return false; +} + +/* + * GSource methods + * + */ + +gboolean +MultiSocketMonitor::Prepare(GSource *_source, gint *timeout_r) +{ + Source &source = *(Source *)_source; + MultiSocketMonitor &monitor = *source.monitor; + assert(_source == &monitor.source->base); + + return monitor.Prepare(timeout_r); +} + +gboolean +MultiSocketMonitor::Check(GSource *_source) +{ + const Source &source = *(const Source *)_source; + const MultiSocketMonitor &monitor = *source.monitor; + assert(_source == &monitor.source->base); + + return monitor.Check(); +} + +gboolean +MultiSocketMonitor::Dispatch(GSource *_source, + gcc_unused GSourceFunc callback, + gcc_unused gpointer user_data) +{ + Source &source = *(Source *)_source; + MultiSocketMonitor &monitor = *source.monitor; + assert(_source == &monitor.source->base); + + monitor.Dispatch(); + return true; +} diff --git a/src/event/MultiSocketMonitor.hxx b/src/event/MultiSocketMonitor.hxx new file mode 100644 index 000000000..9d0e1502b --- /dev/null +++ b/src/event/MultiSocketMonitor.hxx @@ -0,0 +1,124 @@ +/* + * Copyright (C) 2003-2013 The Music Player Daemon Project + * http://www.musicpd.org + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ + +#ifndef MPD_MULTI_SOCKET_MONITOR_HXX +#define MPD_MULTI_SOCKET_MONITOR_HXX + +#include "check.h" +#include "gcc.h" + +#include + +#include + +#include + +#ifdef WIN32 +/* ERRORis a WIN32 macro that poisons our namespace; this is a + kludge to allow us to use it anyway */ +#ifdef ERROR +#undef ERROR +#endif +#endif + +class EventLoop; + +/** + * Monitor multiple sockets. + */ +class MultiSocketMonitor { + struct Source { + GSource base; + + MultiSocketMonitor *monitor; + }; + + EventLoop &loop; + Source *source; + std::forward_list fds; + +public: + static constexpr unsigned READ = G_IO_IN; + static constexpr unsigned WRITE = G_IO_OUT; + static constexpr unsigned ERROR = G_IO_ERR; + static constexpr unsigned HANGUP = G_IO_HUP; + + MultiSocketMonitor(EventLoop &_loop); + ~MultiSocketMonitor(); + +public: + gcc_pure + gint64 GetTime() const { + return g_source_get_time(&source->base); + } + + void InvalidateSockets() { + /* no-op because GLib always calls the GSource's + "prepare" method before each poll() anyway */ + } + + void AddSocket(int fd, unsigned events) { + fds.push_front({fd, gushort(events), 0}); + g_source_add_poll(&source->base, &fds.front()); + } + + template + void UpdateSocketList(E &&e) { + for (auto prev = fds.before_begin(), end = fds.end(), + i = std::next(prev); + i != end; i = std::next(prev)) { + assert(i->events != 0); + + unsigned events = e(i->fd); + if (events != 0) { + i->events = events; + prev = i; + } else { + g_source_remove_poll(&source->base, &*i); + fds.erase_after(prev); + } + } + } + +protected: + virtual void PrepareSockets(gcc_unused gint *timeout_r) {} + virtual bool CheckSockets() const { return false; } + virtual void DispatchSockets() = 0; + +public: + /* GSource callbacks */ + static gboolean Prepare(GSource *source, gint *timeout_r); + static gboolean Check(GSource *source); + static gboolean Dispatch(GSource *source, GSourceFunc callback, + gpointer user_data); + +private: + bool Prepare(gint *timeout_r) { + PrepareSockets(timeout_r); + return false; + } + + bool Check() const; + + void Dispatch() { + DispatchSockets(); + } +}; + +#endif diff --git a/src/input/CurlInputPlugin.cxx b/src/input/CurlInputPlugin.cxx index 2b696b46e..1b852b900 100644 --- a/src/input/CurlInputPlugin.cxx +++ b/src/input/CurlInputPlugin.cxx @@ -23,6 +23,7 @@ #include "conf.h" #include "tag.h" #include "IcyMetaDataParser.hxx" +#include "event/MultiSocketMonitor.hxx" extern "C" { #include "input_internal.h" @@ -182,6 +183,36 @@ struct input_curl { input_curl &operator=(const input_curl &) = delete; }; +/** + * This class monitors all CURL file descriptors. + */ +class CurlSockets final : private MultiSocketMonitor { + /** + * Did CURL give us a timeout? If yes, then we need to call + * curl_multi_perform(), even if there was no event on any + * file descriptor. + */ + bool have_timeout; + + /** + * The absolute time stamp when the timeout expires. + */ + gint64 absolute_timeout; + +public: + CurlSockets(EventLoop &_loop) + :MultiSocketMonitor(_loop) {} + + using MultiSocketMonitor::InvalidateSockets; + +private: + void UpdateSockets(); + + virtual void PrepareSockets(gcc_unused gint *timeout_r) override; + virtual bool CheckSockets() const override; + virtual void DispatchSockets() override; +}; + /** libcurl should accept "ICY 200 OK" */ static struct curl_slist *http_200_aliases; @@ -198,32 +229,7 @@ static struct { */ std::forward_list requests; - /** - * The GMainLoop source used to poll all CURL file - * descriptors. - */ - GSource *source; - - /** - * The source id of #source. - */ - guint source_id; - - /** a linked list of all registered GPollFD objects */ - std::forward_list fds; - - /** - * Did CURL give us a timeout? If yes, then we need to call - * curl_multi_perform(), even if there was no event on any - * file descriptor. - */ - bool timeout; - - /** - * The absolute time stamp when the timeout expires. This is - * used in the GSource method check(). - */ - gint64 absolute_timeout; + CurlSockets *sockets; } curl; static inline GQuark @@ -268,7 +274,7 @@ input_curl_resume(gpointer data) * Calculates the GLib event bit mask for one file descriptor, * obtained from three #fd_set objects filled by curl_multi_fdset(). */ -static gushort +static unsigned input_curl_fd_events(int fd, fd_set *rfds, fd_set *wfds, fd_set *efds) { gushort events = 0; @@ -297,8 +303,8 @@ input_curl_fd_events(int fd, fd_set *rfds, fd_set *wfds, fd_set *efds) * * Runs in the I/O thread. No lock needed. */ -static void -curl_update_fds(void) +void +CurlSockets::UpdateSockets() { assert(io_thread_inside()); @@ -317,29 +323,15 @@ curl_update_fds(void) return; } - for (auto prev = curl.fds.before_begin(), end = curl.fds.end(), - i = std::next(prev); - i != end; i = std::next(prev)) { - const auto poll_fd = &*i; - assert(poll_fd->events != 0); - - gushort events = input_curl_fd_events(poll_fd->fd, &rfds, - &wfds, &efds); - if (events != 0) { - poll_fd->events = events; - prev = i; - } else { - g_source_remove_poll(curl.source, poll_fd); - curl.fds.erase_after(prev); - } - } + UpdateSocketList([&rfds, &wfds, &efds](int fd){ + return input_curl_fd_events(fd, &rfds, + &wfds, &efds); + }); for (int fd = 0; fd <= max_fd; ++fd) { - gushort events = input_curl_fd_events(fd, &rfds, &wfds, &efds); - if (events != 0) { - curl.fds.push_front({fd, events, 0}); - g_source_add_poll(curl.source, &curl.fds.front()); - } + unsigned events = input_curl_fd_events(fd, &rfds, &wfds, &efds); + if (events != 0) + AddSocket(fd, events); } } @@ -364,7 +356,7 @@ input_curl_easy_add(struct input_curl *c, GError **error_r) return false; } - curl_update_fds(); + curl.sockets->InvalidateSockets(); return true; } @@ -438,7 +430,7 @@ input_curl_easy_free_callback(gpointer data) struct input_curl *c = (struct input_curl *)data; input_curl_easy_free(c); - curl_update_fds(); + curl.sockets->InvalidateSockets(); return NULL; } @@ -575,27 +567,18 @@ input_curl_perform(void) return true; } -/* - * GSource methods - * - */ - -/** - * The GSource prepare() method implementation. - */ -static gboolean -input_curl_source_prepare(G_GNUC_UNUSED GSource *source, gint *timeout_r) +void +CurlSockets::PrepareSockets(gint *timeout_r) { - curl_update_fds(); + UpdateSockets(); - curl.timeout = false; + have_timeout = false; long timeout2; CURLMcode mcode = curl_multi_timeout(curl.multi, &timeout2); if (mcode == CURLM_OK) { if (timeout2 >= 0) - curl.absolute_timeout = g_source_get_time(source) - + timeout2 * 1000; + absolute_timeout = GetTime() + timeout2 * 1000; if (timeout2 >= 0 && timeout2 < 10) /* CURL 7.21.1 likes to report "timeout=0", @@ -606,65 +589,28 @@ input_curl_source_prepare(G_GNUC_UNUSED GSource *source, gint *timeout_r) *timeout_r = timeout2; - curl.timeout = timeout2 >= 0; + have_timeout = timeout2 >= 0; } else g_warning("curl_multi_timeout() failed: %s\n", curl_multi_strerror(mcode)); - - return false; } -/** - * The GSource check() method implementation. - */ -static gboolean -input_curl_source_check(G_GNUC_UNUSED GSource *source) +bool +CurlSockets::CheckSockets() const { - if (curl.timeout) { - /* when a timeout has expired, we need to call - curl_multi_perform(), even if there was no file - descriptor event */ - - if (g_source_get_time(source) >= curl.absolute_timeout) - return true; - } - - for (const auto &i : curl.fds) - if (i.revents != 0) - return true; - - return false; + /* when a timeout has expired, we need to call + curl_multi_perform(), even if there was no file descriptor + event */ + return have_timeout && GetTime() >= absolute_timeout; } -/** - * The GSource dispatch() method implementation. The callback isn't - * used, because we're handling all events directly. - */ -static gboolean -input_curl_source_dispatch(G_GNUC_UNUSED GSource *source, - G_GNUC_UNUSED GSourceFunc callback, - G_GNUC_UNUSED gpointer user_data) +void +CurlSockets::DispatchSockets() { if (input_curl_perform()) input_curl_info_read(); - - return true; } -/** - * The vtable for our GSource implementation. Unfortunately, we - * cannot declare it "const", because g_source_new() takes a non-const - * pointer, for whatever reason. - */ -static GSourceFuncs curl_source_funcs = { - input_curl_source_prepare, - input_curl_source_check, - input_curl_source_dispatch, - nullptr, - nullptr, - nullptr, -}; - /* * input_plugin methods * @@ -706,9 +652,7 @@ input_curl_init(const struct config_param *param, return false; } - curl.source = g_source_new(&curl_source_funcs, sizeof(*curl.source)); - curl.source_id = g_source_attach(curl.source, - io_thread_get().GetContext()); + curl.sockets = new CurlSockets(io_thread_get()); return true; } @@ -716,7 +660,7 @@ input_curl_init(const struct config_param *param, static gpointer curl_destroy_sources(G_GNUC_UNUSED gpointer data) { - g_source_destroy(curl.source); + delete curl.sockets; return NULL; }