From 3b9ffea36f2079fe23e301d0298ca957ce6a7158 Mon Sep 17 00:00:00 2001 From: Max Kellermann Date: Wed, 24 Aug 2011 03:23:12 +0200 Subject: [PATCH] input/soup: new input plugin based on libsoup To demonstrate the new I/O thread. libsoup is well-integrated into the GLib main loop, which made this plugin pretty easy to write. As a side effect, we have to initialize the I/O thread in all debug programs that use the input API. --- Makefile.am | 12 ++ NEWS | 1 + configure.ac | 14 ++ doc/user.xml | 28 +++ src/input/soup_input_plugin.c | 370 ++++++++++++++++++++++++++++++++++ src/input/soup_input_plugin.h | 25 +++ src/input_registry.c | 7 + src/ls.c | 2 +- test/dump_playlist.c | 10 + test/read_tags.c | 11 + test/run_decoder.c | 10 + test/run_input.c | 11 + 12 files changed, 500 insertions(+), 1 deletion(-) create mode 100644 src/input/soup_input_plugin.c create mode 100644 src/input/soup_input_plugin.h diff --git a/Makefile.am b/Makefile.am index b350424f7..708629bac 100644 --- a/Makefile.am +++ b/Makefile.am @@ -636,12 +636,14 @@ endif INPUT_CFLAGS = \ $(CURL_CFLAGS) \ + $(SOUP_CFLAGS) \ $(CDIO_PARANOIA_CFLAGS) \ $(FFMPEG_CFLAGS) \ $(MMS_CFLAGS) INPUT_LIBS = \ $(CURL_LIBS) \ + $(SOUP_LIBS) \ $(CDIO_PARANOIA_LIBS) \ $(FFMPEG_LIBS) \ $(MMS_LIBS) @@ -658,6 +660,12 @@ INPUT_SRC += src/input/curl_input_plugin.c \ src/icy_metadata.c endif +if ENABLE_SOUP +INPUT_SRC += \ + src/input/soup_input_plugin.c \ + src/input/soup_input_plugin.h +endif + if ENABLE_CDIO_PARANOIA INPUT_SRC += src/input/cdio_paranoia_input_plugin.c endif @@ -909,6 +917,7 @@ test_run_input_LDADD = $(MPD_LIBS) \ $(GLIB_LIBS) test_run_input_SOURCES = test/run_input.c \ test/stdbin.h \ + src/io_thread.c src/io_thread.h \ src/conf.c src/tokenizer.c src/utils.c src/string_util.c\ src/tag.c src/tag_pool.c src/tag_save.c \ src/fd_util.c \ @@ -927,6 +936,7 @@ test_dump_playlist_LDADD = $(MPD_LIBS) \ $(INPUT_LIBS) \ $(GLIB_LIBS) test_dump_playlist_SOURCES = test/dump_playlist.c \ + src/io_thread.c src/io_thread.h \ src/conf.c src/tokenizer.c src/utils.c src/string_util.c\ src/uri.c \ src/song.c src/tag.c src/tag_pool.c src/tag_save.c \ @@ -957,6 +967,7 @@ test_run_decoder_LDADD = $(MPD_LIBS) \ $(GLIB_LIBS) test_run_decoder_SOURCES = test/run_decoder.c \ test/stdbin.h \ + src/io_thread.c src/io_thread.h \ src/conf.c src/tokenizer.c src/utils.c src/string_util.c src/log.c \ src/tag.c src/tag_pool.c \ src/replay_gain_info.c \ @@ -980,6 +991,7 @@ test_read_tags_LDADD = $(MPD_LIBS) \ $(INPUT_LIBS) $(DECODER_LIBS) \ $(GLIB_LIBS) test_read_tags_SOURCES = test/read_tags.c \ + src/io_thread.c src/io_thread.h \ src/conf.c src/tokenizer.c src/utils.c src/string_util.c src/log.c \ src/tag.c src/tag_pool.c \ src/replay_gain_info.c \ diff --git a/NEWS b/NEWS index 8d9c335a6..34cf90e45 100644 --- a/NEWS +++ b/NEWS @@ -5,6 +5,7 @@ ver 0.17 (2011/??/??) * input: - cdio_paranoia: new input plugin to play audio CDs - curl: enable CURLOPT_NETRC + - soup: new input plugin based on libsoup - ffmpeg: support libavformat 0.7 * decoder: - mpg123: implement seeking diff --git a/configure.ac b/configure.ac index 84adf4d7e..c1f76e932 100644 --- a/configure.ac +++ b/configure.ac @@ -156,6 +156,11 @@ AC_ARG_ENABLE(curl, [enable support for libcurl HTTP streaming (default: auto)]),, [enable_curl=auto]) +AC_ARG_ENABLE(soup, + AS_HELP_STRING([--enable-soup], + [enable support for libsoup HTTP streaming (default: auto)]),, + [enable_soup=auto]) + AC_ARG_ENABLE(debug, AS_HELP_STRING([--enable-debug], [enable debugging (default: disabled)]),, @@ -644,6 +649,14 @@ if test x$enable_curl = xyes; then fi AM_CONDITIONAL(ENABLE_CURL, test x$enable_curl = xyes) +dnl ----------------------------------- SOUP ---------------------------------- +MPD_AUTO_PKG(soup, SOUP, [libsoup-2.4], + [libsoup HTTP streaming], [libsoup not found]) +if test x$enable_soup = xyes; then + AC_DEFINE(ENABLE_SOUP, 1, [Define when libsoup is used for HTTP streaming]) +fi +AM_CONDITIONAL(ENABLE_SOUP, test x$enable_soup = xyes) + dnl --------------------------------- Last.FM --------------------------------- if test x$enable_lastfm = xyes; then if test x$enable_curl != xyes; then @@ -1625,6 +1638,7 @@ fi printf '\nStreaming support:\n\t' results(curl,[CURL]) +results(soup, [SOUP]) results(lastfm,[Last.FM]) results(mms,[MMS]) results(cdio_paranoia, [CDIO_PARANOIA]) diff --git a/doc/user.xml b/doc/user.xml index 97f76c5d3..4ca91da33 100644 --- a/doc/user.xml +++ b/doc/user.xml @@ -703,6 +703,34 @@ cd mpd-version +
+ <varname>soup</varname> + + + Opens remote files or streams over HTTP. + + + + + + + Setting + Description + + + + + + proxy + + + Sets the address of the HTTP proxy server. + + + + + +
diff --git a/src/input/soup_input_plugin.c b/src/input/soup_input_plugin.c new file mode 100644 index 000000000..7ceabbc62 --- /dev/null +++ b/src/input/soup_input_plugin.c @@ -0,0 +1,370 @@ +/* + * Copyright (C) 2003-2011 The Music Player Daemon Project + * http://www.musicpd.org + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ + +#include "config.h" +#include "input/soup_input_plugin.h" +#include "input_plugin.h" +#include "io_thread.h" +#include "conf.h" + +#include +#include + +#include +#include + +#undef G_LOG_DOMAIN +#define G_LOG_DOMAIN "input_soup" + +/** + * Do not buffer more than this number of bytes. It should be a + * reasonable limit that doesn't make low-end machines suffer too + * much, but doesn't cause stuttering on high-latency lines. + */ +static const size_t SOUP_MAX_BUFFERED = 512 * 1024; + +static SoupURI *soup_proxy; +static SoupSession *soup_session; + +struct input_soup { + struct input_stream base; + + GMutex *mutex; + GCond *cond; + + SoupMessage *msg; + + GQueue *buffers; + + size_t current_consumed; + + size_t total_buffered; + + bool alive, ready, pause, eof; +}; + +static inline GQuark +soup_quark(void) +{ + return g_quark_from_static_string("soup"); +} + +static bool +input_soup_init(const struct config_param *param, GError **error_r) +{ + assert(soup_proxy == NULL); + assert(soup_session == NULL); + + g_type_init(); + + const char *proxy = config_get_block_string(param, "proxy", NULL); + + if (proxy != NULL) { + soup_proxy = soup_uri_new(proxy); + if (soup_proxy == NULL) { + g_set_error(error_r, soup_quark(), 0, + "failed to parse proxy setting"); + return false; + } + } + + soup_session = + soup_session_async_new_with_options(SOUP_SESSION_PROXY_URI, + soup_proxy, + SOUP_SESSION_ASYNC_CONTEXT, + io_thread_context(), + NULL); + + return true; +} + +static void +input_soup_finish(void) +{ + assert(soup_session != NULL); + + soup_session_abort(soup_session); + g_object_unref(G_OBJECT(soup_session)); + + if (soup_proxy != NULL) + soup_uri_free(soup_proxy); +} + +static void +input_soup_session_callback(G_GNUC_UNUSED SoupSession *session, + G_GNUC_UNUSED SoupMessage *msg, gpointer user_data) +{ + struct input_soup *s = user_data; + + assert(msg == s->msg); + + g_mutex_lock(s->mutex); + s->alive = false; + g_cond_broadcast(s->cond); + g_mutex_unlock(s->mutex); +} + +static void +input_soup_got_headers(SoupMessage *msg, gpointer user_data) +{ + struct input_soup *s = user_data; + + if (!SOUP_STATUS_IS_SUCCESSFUL(msg->status_code)) { + soup_session_cancel_message(soup_session, msg, + SOUP_STATUS_CANCELLED); + return; + } + + soup_message_body_set_accumulate(msg->response_body, false); + + g_mutex_lock(s->mutex); + s->ready = true; + g_cond_broadcast(s->cond); + g_mutex_unlock(s->mutex); +} + +static void +input_soup_got_chunk(SoupMessage *msg, SoupBuffer *chunk, gpointer user_data) +{ + struct input_soup *s = user_data; + + assert(msg == s->msg); + + g_mutex_lock(s->mutex); + + g_queue_push_tail(s->buffers, soup_buffer_copy(chunk)); + s->total_buffered += chunk->length; + + if (s->total_buffered >= SOUP_MAX_BUFFERED && !s->pause) { + s->pause = true; + soup_session_pause_message(soup_session, msg); + } + + g_cond_broadcast(s->cond); + g_mutex_unlock(s->mutex); +} + +static void +input_soup_got_body(SoupMessage *msg, gpointer user_data) +{ + struct input_soup *s = user_data; + + assert(msg == s->msg); + + g_mutex_lock(s->mutex); + + s->eof = true; + s->alive = false; + + g_cond_broadcast(s->cond); + g_mutex_unlock(s->mutex); +} + +static bool +input_soup_wait_data(struct input_soup *s) +{ + while (true) { + if (s->eof) + return true; + + if (!s->alive) + return false; + + if (!g_queue_is_empty(s->buffers)) + return true; + + assert(s->current_consumed == 0); + + g_cond_wait(s->cond, s->mutex); + } +} + +static struct input_stream * +input_soup_open(const char *uri, G_GNUC_UNUSED GError **error_r) +{ + if (strncmp(uri, "http://", 7) != 0) + return NULL; + + struct input_soup *s = g_new(struct input_soup, 1); + input_stream_init(&s->base, &input_plugin_soup, uri); + + s->mutex = g_mutex_new(); + s->cond = g_cond_new(); + + s->buffers = g_queue_new(); + s->current_consumed = 0; + s->total_buffered = 0; + + s->msg = soup_message_new(SOUP_METHOD_GET, uri); + soup_message_set_flags(s->msg, SOUP_MESSAGE_NO_REDIRECT); + + soup_message_headers_append(s->msg->request_headers, "User-Agent", + "Music Player Daemon " VERSION); + + g_signal_connect(s->msg, "got-headers", + G_CALLBACK(input_soup_got_headers), s); + g_signal_connect(s->msg, "got-chunk", + G_CALLBACK(input_soup_got_chunk), s); + g_signal_connect(s->msg, "got-body", + G_CALLBACK(input_soup_got_body), s); + + s->alive = true; + s->ready = false; + s->pause = false; + s->eof = false; + + soup_session_queue_message(soup_session, s->msg, + input_soup_session_callback, s); + + return &s->base; +} + +static void +input_soup_close(struct input_stream *is) +{ + struct input_soup *s = (struct input_soup *)is; + + g_mutex_lock(s->mutex); + + if (s->alive) { + assert(s->msg != NULL); + + soup_session_cancel_message(soup_session, s->msg, + SOUP_STATUS_CANCELLED); + s->alive = false; + } + + g_mutex_unlock(s->mutex); +} + +static int +input_soup_buffer(struct input_stream *is, GError **error_r) +{ + struct input_soup *s = (struct input_soup *)is; + + g_mutex_lock(s->mutex); + + if (s->pause) { + if (s->total_buffered >= SOUP_MAX_BUFFERED) { + g_mutex_unlock(s->mutex); + return 1; + } + + s->pause = false; + soup_session_unpause_message(soup_session, s->msg); + } + + + bool success = input_soup_wait_data(s); + s->base.ready = s->ready; + g_mutex_unlock(s->mutex); + + if (!success) { + g_set_error_literal(error_r, soup_quark(), 0, "HTTP failure"); + return -1; + } + + return 1; +} + +static size_t +input_soup_read(struct input_stream *is, void *ptr, size_t size, + G_GNUC_UNUSED GError **error_r) +{ + struct input_soup *s = (struct input_soup *)is; + + g_mutex_lock(s->mutex); + + if (!input_soup_wait_data(s)) { + assert(!s->alive); + g_mutex_unlock(s->mutex); + + return 0; + } + + s->base.ready = s->ready; + + char *p0 = ptr, *p = p0, *p_end = p0 + size; + + while (p < p_end) { + SoupBuffer *buffer = g_queue_pop_head(s->buffers); + if (buffer == NULL) { + assert(s->current_consumed == 0); + break; + } + + assert(s->current_consumed < buffer->length); + assert(s->total_buffered >= buffer->length); + + const char *q = buffer->data; + q += s->current_consumed; + + size_t remaining = buffer->length - s->current_consumed; + size_t nbytes = p_end - p; + if (nbytes > remaining) + nbytes = remaining; + + memcpy(p, q, nbytes); + p += nbytes; + + s->current_consumed += remaining; + if (s->current_consumed >= buffer->length) { + /* done with this buffer */ + s->total_buffered -= buffer->length; + soup_buffer_free(buffer); + s->current_consumed = 0; + } else { + /* partial read */ + assert(p == p_end); + + g_queue_push_head(s->buffers, buffer); + } + } + + if (s->pause && s->total_buffered < SOUP_MAX_BUFFERED) { + s->pause = false; + soup_session_unpause_message(soup_session, s->msg); + } + + size_t nbytes = p - p0; + s->base.offset += nbytes; + + g_mutex_unlock(s->mutex); + return nbytes; +} + +static bool +input_soup_eof(G_GNUC_UNUSED struct input_stream *is) +{ + struct input_soup *s = (struct input_soup *)is; + + return !s->alive && g_queue_is_empty(s->buffers); +} + +const struct input_plugin input_plugin_soup = { + .name = "soup", + .init = input_soup_init, + .finish = input_soup_finish, + + .open = input_soup_open, + .close = input_soup_close, + .buffer = input_soup_buffer, + .read = input_soup_read, + .eof = input_soup_eof, +}; diff --git a/src/input/soup_input_plugin.h b/src/input/soup_input_plugin.h new file mode 100644 index 000000000..689b2d971 --- /dev/null +++ b/src/input/soup_input_plugin.h @@ -0,0 +1,25 @@ +/* + * Copyright (C) 2003-2011 The Music Player Daemon Project + * http://www.musicpd.org + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ + +#ifndef MPD_INPUT_SOUP_H +#define MPD_INPUT_SOUP_H + +extern const struct input_plugin input_plugin_soup; + +#endif diff --git a/src/input_registry.c b/src/input_registry.c index b76d9888e..5987d5da2 100644 --- a/src/input_registry.c +++ b/src/input_registry.c @@ -29,6 +29,10 @@ #include "input/curl_input_plugin.h" #endif +#ifdef ENABLE_SOUP +#include "input/soup_input_plugin.h" +#endif + #ifdef HAVE_FFMPEG #include "input/ffmpeg_input_plugin.h" #endif @@ -55,6 +59,9 @@ const struct input_plugin *const input_plugins[] = { #ifdef ENABLE_CURL &input_plugin_curl, #endif +#ifdef ENABLE_SOUP + &input_plugin_soup, +#endif #ifdef HAVE_FFMPEG &input_plugin_ffmpeg, #endif diff --git a/src/ls.c b/src/ls.c index fa87d35d7..5ed9fc579 100644 --- a/src/ls.c +++ b/src/ls.c @@ -32,7 +32,7 @@ * connected by IPC socket. */ static const char *remoteUrlPrefixes[] = { -#ifdef ENABLE_CURL +#if defined(ENABLE_CURL) || defined(ENABLE_SOUP) "http://", #endif #ifdef ENABLE_MMS diff --git a/test/dump_playlist.c b/test/dump_playlist.c index 75f4f08b0..bf3fed7c9 100644 --- a/test/dump_playlist.c +++ b/test/dump_playlist.c @@ -18,6 +18,7 @@ */ #include "config.h" +#include "io_thread.h" #include "input_init.h" #include "input_stream.h" #include "tag_pool.h" @@ -30,6 +31,7 @@ #include #include +#include static void my_log_func(const gchar *log_domain, G_GNUC_UNUSED GLogLevelFlags log_level, @@ -73,6 +75,13 @@ int main(int argc, char **argv) return 1; } + io_thread_init(); + if (!io_thread_start(&error)) { + g_warning("%s", error->message); + g_error_free(error); + return EXIT_FAILURE; + } + if (!input_stream_global_init(&error)) { g_warning("%s", error->message); g_error_free(error); @@ -150,6 +159,7 @@ int main(int argc, char **argv) input_stream_close(is); playlist_list_global_finish(); input_stream_global_finish(); + io_thread_deinit(); config_global_finish(); tag_pool_deinit(); diff --git a/test/read_tags.c b/test/read_tags.c index 04c0caec6..64535a87f 100644 --- a/test/read_tags.c +++ b/test/read_tags.c @@ -18,6 +18,7 @@ */ #include "config.h" +#include "io_thread.h" #include "decoder_list.h" #include "decoder_api.h" #include "input_init.h" @@ -32,6 +33,7 @@ #include #include +#include #ifdef HAVE_LOCALE_H #include @@ -164,6 +166,13 @@ int main(int argc, char **argv) decoder_name = argv[1]; path = argv[2]; + io_thread_init(); + if (!io_thread_start(&error)) { + g_warning("%s", error->message); + g_error_free(error); + return EXIT_FAILURE; + } + if (!input_stream_global_init(&error)) { g_warning("%s", error->message); g_error_free(error); @@ -195,6 +204,8 @@ int main(int argc, char **argv) decoder_plugin_deinit_all(); input_stream_global_finish(); + io_thread_deinit(); + if (tag == NULL) { g_printerr("Failed to read tags\n"); return 1; diff --git a/test/run_decoder.c b/test/run_decoder.c index 1f268dde8..efc246f55 100644 --- a/test/run_decoder.c +++ b/test/run_decoder.c @@ -18,6 +18,7 @@ */ #include "config.h" +#include "io_thread.h" #include "decoder_list.h" #include "decoder_api.h" #include "input_init.h" @@ -31,6 +32,7 @@ #include #include +#include static void my_log_func(const gchar *log_domain, G_GNUC_UNUSED GLogLevelFlags log_level, @@ -180,6 +182,13 @@ int main(int argc, char **argv) g_log_set_default_handler(my_log_func, NULL); + io_thread_init(); + if (!io_thread_start(&error)) { + g_warning("%s", error->message); + g_error_free(error); + return EXIT_FAILURE; + } + if (!input_stream_global_init(&error)) { g_warning("%s", error->message); g_error_free(error); @@ -222,6 +231,7 @@ int main(int argc, char **argv) decoder_plugin_deinit_all(); input_stream_global_finish(); + io_thread_deinit(); if (!decoder.initialized) { g_printerr("Decoding failed\n"); diff --git a/test/run_input.c b/test/run_input.c index 7787445e9..c00698dff 100644 --- a/test/run_input.c +++ b/test/run_input.c @@ -18,6 +18,7 @@ */ #include "config.h" +#include "io_thread.h" #include "input_init.h" #include "input_stream.h" #include "tag_pool.h" @@ -32,6 +33,7 @@ #include #include +#include static void my_log_func(const gchar *log_domain, G_GNUC_UNUSED GLogLevelFlags log_level, @@ -122,6 +124,13 @@ int main(int argc, char **argv) tag_pool_init(); config_global_init(); + io_thread_init(); + if (!io_thread_start(&error)) { + g_warning("%s", error->message); + g_error_free(error); + return EXIT_FAILURE; + } + #ifdef ENABLE_ARCHIVE archive_plugin_init_all(); #endif @@ -155,6 +164,8 @@ int main(int argc, char **argv) archive_plugin_deinit_all(); #endif + io_thread_deinit(); + config_global_finish(); tag_pool_deinit();