diff --git a/Makefile.am b/Makefile.am index 6f9963a16..fc78a8e2c 100644 --- a/Makefile.am +++ b/Makefile.am @@ -1014,6 +1014,7 @@ test_run_ntp_server_LDADD = $(MPD_LIBS) \ $(GLIB_LIBS) test_run_ntp_server_SOURCES = test/run_ntp_server.c \ test/signals.c test/signals.h \ + src/io_thread.c src/io_thread.h \ src/ntp_server.c src/ntp_server.h test_run_filter_CPPFLAGS = $(AM_CPPFLAGS) @@ -1119,6 +1120,7 @@ test_run_output_LDADD = $(MPD_LIBS) \ test_run_output_SOURCES = test/run_output.c \ test/stdbin.h \ src/conf.c src/tokenizer.c src/utils.c src/string_util.c src/log.c \ + src/io_thread.c src/io_thread.h \ src/audio_check.c \ src/audio_format.c \ src/audio_parser.c \ diff --git a/src/ntp_server.c b/src/ntp_server.c index 8e4aca6bd..d2e45bcac 100644 --- a/src/ntp_server.c +++ b/src/ntp_server.c @@ -18,8 +18,10 @@ */ #include "ntp_server.h" +#include "io_thread.h" #include +#include #include #include #include @@ -31,7 +33,6 @@ #include #include #else -#include #include #endif @@ -75,7 +76,7 @@ fill_time_buffer(unsigned char *buffer) fill_time_buffer_with_time(buffer, ¤t_time); } -bool +static bool ntp_server_handle(struct ntp_server *ntp) { unsigned char buf[32]; @@ -102,25 +103,14 @@ ntp_server_handle(struct ntp_server *ntp) return num_bytes == sizeof(buf); } -bool -ntp_server_check(struct ntp_server *ntp, struct timeval *tout) +static gboolean +ntp_in_event(G_GNUC_UNUSED GIOChannel *source, + G_GNUC_UNUSED GIOCondition condition, + gpointer data) { - fd_set rdfds; - int fdmax = 0; + struct ntp_server *ntp = data; - FD_ZERO(&rdfds); - - FD_SET(ntp->fd, &rdfds); - fdmax = ntp->fd; - if (select(fdmax + 1, &rdfds,NULL, NULL, tout) <= 0) - return false; - - if (FD_ISSET(ntp->fd, &rdfds)) { - if (!ntp_server_handle(ntp)) { - g_debug("unable to send timing response\n"); - return false; - } - } + ntp_server_handle(ntp); return true; } @@ -131,9 +121,41 @@ ntp_server_init(struct ntp_server *ntp) ntp->fd = -1; } +void +ntp_server_open(struct ntp_server *ntp, int fd) +{ + assert(ntp->fd < 0); + assert(fd >= 0); + + ntp->fd = fd; + +#ifndef G_OS_WIN32 + ntp->channel = g_io_channel_unix_new(fd); +#else + ntp->channel = g_io_channel_win32_new_socket(fd); +#endif + /* NULL encoding means the stream is binary safe */ + g_io_channel_set_encoding(ntp->channel, NULL, NULL); + /* no buffering */ + g_io_channel_set_buffered(ntp->channel, false); + + ntp->source = g_io_create_watch(ntp->channel, G_IO_IN); + g_source_set_callback(ntp->source, (GSourceFunc)ntp_in_event, ntp, + NULL); + g_source_attach(ntp->source, io_thread_context()); +} + void ntp_server_close(struct ntp_server *ntp) { + if (ntp->source != NULL) { + g_source_destroy(ntp->source); + g_source_unref(ntp->source); + } + + if (ntp->channel != NULL) + g_io_channel_unref(ntp->channel); + if (ntp->fd >= 0) close(ntp->fd); } diff --git a/src/ntp_server.h b/src/ntp_server.h index 56593fef0..2b970dff2 100644 --- a/src/ntp_server.h +++ b/src/ntp_server.h @@ -20,6 +20,8 @@ #ifndef MPD_NTP_SERVER_H #define MPD_NTP_SERVER_H +#include + #include struct timeval; @@ -27,24 +29,18 @@ struct timeval; struct ntp_server { unsigned short port; int fd; + + GIOChannel *channel; + GSource *source; }; void ntp_server_init(struct ntp_server *ntp); +void +ntp_server_open(struct ntp_server *ntp, int fd); + void ntp_server_close(struct ntp_server *ntp); -/* - * Recv the NTP datagram from the AirTunes, send back an NTP response. - */ -bool -ntp_server_handle(struct ntp_server *ntp); - -/* - * check to see if there are any timing requests, and respond if there are any - */ -bool -ntp_server_check(struct ntp_server *ntp, struct timeval *tout); - #endif diff --git a/src/output/raop_output_plugin.c b/src/output/raop_output_plugin.c index a498ca670..912048007 100644 --- a/src/output/raop_output_plugin.c +++ b/src/output/raop_output_plugin.c @@ -535,15 +535,11 @@ exec_request(struct rtspcl_data *rtspcld, const char *cmd, while (true) { FD_ZERO(&rdfds); FD_SET(rtspcld->fd, &rdfds); - FD_SET(raop_session->ntp.fd, &rdfds); - fdmax = raop_session->ntp.fd > rtspcld->fd ? raop_session->ntp.fd : rtspcld->fd;; + fdmax = rtspcld->fd; select(fdmax + 1, &rdfds, NULL, NULL, &tout); if (FD_ISSET(rtspcld->fd, &rdfds)) { break; } - if (FD_ISSET(raop_session->ntp.fd, &rdfds)) { - ntp_server_handle(&raop_session->ntp); - } } if (read_line(rtspcld->fd, line, sizeof(line), timeout, 0) <= 0) { @@ -1068,20 +1064,14 @@ static bool send_audio_data(int fd, GError **error_r) { int i = 0; - struct timeval current_time, tout, rtp_time; + struct timeval current_time, rtp_time; struct raop_data *rd = raop_session->raop_list; get_time_for_rtp(&raop_session->play_state, &rtp_time); gettimeofday(¤t_time, NULL); int diff = difference(¤t_time, &rtp_time); + g_usleep(-diff); - while (diff < -10000) { - tout.tv_sec = 0; - tout.tv_usec = -diff; - ntp_server_check(&raop_session->ntp, &tout); - gettimeofday(¤t_time, NULL); - diff = difference(¤t_time, &rtp_time); - } gettimeofday(&raop_session->play_state.last_send, NULL); while (rd) { if (rd->started) { @@ -1214,10 +1204,8 @@ raop_output_cancel(void *data) static bool raop_output_pause(void *data) { - struct timeval tout = {.tv_sec = 0, .tv_usec = 0}; struct raop_data *rd = (struct raop_data *) data; - ntp_server_check(&raop_session->ntp, &tout); rd->paused = true; return true; } @@ -1284,17 +1272,18 @@ raop_output_open(void *data, struct audio_format *audio_format, GError **error_r if (raop_session->data_fd < 0) return false; - raop_session->ntp.fd = - open_udp_socket(NULL, &raop_session->ntp.port, - error_r); - if (raop_session->ntp.fd < 0) + int fd = open_udp_socket(NULL, &raop_session->ntp.port, + error_r); + if (fd < 0) return false; + ntp_server_open(&raop_session->ntp, fd); + raop_session->ctrl.fd = open_udp_socket(NULL, &raop_session->ctrl.port, error_r); if (raop_session->ctrl.fd < 0) { - close(raop_session->ntp.fd); + ntp_server_close(&raop_session->ntp); raop_session->ctrl.fd = -1; g_mutex_unlock(raop_session->list_mutex); return false; @@ -1324,7 +1313,6 @@ raop_output_play(void *data, const void *chunk, size_t size, { //raopcl_send_sample struct raop_data *rd = data; - struct timeval tout = {.tv_sec = 0, .tv_usec = 0}; size_t rval = 0, orig_size = size; rd->paused = false; @@ -1335,8 +1323,6 @@ raop_output_play(void *data, const void *chunk, size_t size, g_mutex_lock(raop_session->data_mutex); - ntp_server_check(&raop_session->ntp, &tout); - if (raop_session->play_state.rtptime <= NUMSAMPLES) { // looped over, need new reference point to calculate correct times raop_session->play_state.playing = false; diff --git a/test/run_ntp_server.c b/test/run_ntp_server.c index 7268fa94c..db24059ab 100644 --- a/test/run_ntp_server.c +++ b/test/run_ntp_server.c @@ -20,6 +20,7 @@ #include "config.h" #include "ntp_server.h" #include "signals.h" +#include "io_thread.h" #include @@ -39,12 +40,10 @@ #include #endif -static bool quit = false; - void on_quit(void) { - quit = true; + io_thread_quit(); } static int bind_host(int sd, char *hostname, unsigned long ulAddr, @@ -122,27 +121,25 @@ open_udp_socket(char *hostname, unsigned short *port) int main(G_GNUC_UNUSED int argc, G_GNUC_UNUSED char **argv) { + g_thread_init(NULL); signals_init(); + io_thread_init(); struct ntp_server ntp; ntp_server_init(&ntp); - ntp.fd = open_udp_socket(NULL, &ntp.port); - if (ntp.fd < 0) { + int fd = open_udp_socket(NULL, &ntp.port); + if (fd < 0) { g_printerr("Failed to create UDP socket\n"); ntp_server_close(&ntp); return EXIT_FAILURE; } - while (!quit) { - struct timeval tv = { - .tv_sec = 1, - .tv_usec = 0, - }; + ntp_server_open(&ntp, fd); - ntp_server_check(&ntp, &tv); - } + io_thread_run(); ntp_server_close(&ntp); + io_thread_deinit(); return EXIT_SUCCESS; } diff --git a/test/run_output.c b/test/run_output.c index 8a34fedec..b4d2b8f9c 100644 --- a/test/run_output.c +++ b/test/run_output.c @@ -18,6 +18,7 @@ */ #include "config.h" +#include "io_thread.h" #include "output_plugin.h" #include "output_internal.h" #include "output_control.h" @@ -36,6 +37,7 @@ #include #include #include +#include struct playlist g_playlist; @@ -146,6 +148,13 @@ int main(int argc, char **argv) return 1; } + io_thread_init(); + if (!io_thread_start(&error)) { + g_warning("%s", error->message); + g_error_free(error); + return EXIT_FAILURE; + } + /* initialize the audio output */ if (!load_audio_output(&ao, argv[2])) @@ -216,6 +225,8 @@ int main(int argc, char **argv) ao_plugin_finish(ao.plugin, ao.data); g_mutex_free(ao.mutex); + io_thread_deinit(); + config_global_finish(); return 0;