From 9ccaa904393ddf2189f7d7815cef29a3e3393cbc Mon Sep 17 00:00:00 2001
From: Max Kellermann <max@duempel.org>
Date: Sun, 28 Aug 2011 17:29:09 +0200
Subject: [PATCH] ntp_server: use the I/O thread

---
 Makefile.am                     |  2 ++
 src/ntp_server.c                | 60 ++++++++++++++++++++++-----------
 src/ntp_server.h                | 20 +++++------
 src/output/raop_output_plugin.c | 32 +++++-------------
 test/run_ntp_server.c           | 21 +++++-------
 test/run_output.c               | 11 ++++++
 6 files changed, 80 insertions(+), 66 deletions(-)

diff --git a/Makefile.am b/Makefile.am
index 6f9963a16..fc78a8e2c 100644
--- a/Makefile.am
+++ b/Makefile.am
@@ -1014,6 +1014,7 @@ test_run_ntp_server_LDADD = $(MPD_LIBS) \
 	$(GLIB_LIBS)
 test_run_ntp_server_SOURCES = test/run_ntp_server.c \
 	test/signals.c test/signals.h \
+	src/io_thread.c src/io_thread.h \
 	src/ntp_server.c src/ntp_server.h
 
 test_run_filter_CPPFLAGS = $(AM_CPPFLAGS)
@@ -1119,6 +1120,7 @@ test_run_output_LDADD = $(MPD_LIBS) \
 test_run_output_SOURCES = test/run_output.c \
 	test/stdbin.h \
 	src/conf.c src/tokenizer.c src/utils.c src/string_util.c src/log.c \
+	src/io_thread.c src/io_thread.h \
 	src/audio_check.c \
 	src/audio_format.c \
 	src/audio_parser.c \
diff --git a/src/ntp_server.c b/src/ntp_server.c
index 8e4aca6bd..d2e45bcac 100644
--- a/src/ntp_server.c
+++ b/src/ntp_server.c
@@ -18,8 +18,10 @@
  */
 
 #include "ntp_server.h"
+#include "io_thread.h"
 
 #include <glib.h>
+#include <assert.h>
 #include <stddef.h>
 #include <stdint.h>
 #include <string.h>
@@ -31,7 +33,6 @@
 #include <ws2tcpip.h>
 #include <winsock.h>
 #else
-#include <sys/select.h>
 #include <sys/socket.h>
 #endif
 
@@ -75,7 +76,7 @@ fill_time_buffer(unsigned char *buffer)
 	fill_time_buffer_with_time(buffer, &current_time);
 }
 
-bool
+static bool
 ntp_server_handle(struct ntp_server *ntp)
 {
 	unsigned char buf[32];
@@ -102,25 +103,14 @@ ntp_server_handle(struct ntp_server *ntp)
 	return num_bytes == sizeof(buf);
 }
 
-bool
-ntp_server_check(struct ntp_server *ntp, struct timeval *tout)
+static gboolean
+ntp_in_event(G_GNUC_UNUSED GIOChannel *source,
+	     G_GNUC_UNUSED GIOCondition condition,
+	     gpointer data)
 {
-	fd_set rdfds;
-	int fdmax = 0;
+	struct ntp_server *ntp = data;
 
-	FD_ZERO(&rdfds);
-
-	FD_SET(ntp->fd, &rdfds);
-	fdmax = ntp->fd;
-	if (select(fdmax + 1, &rdfds,NULL, NULL, tout) <= 0)
-		return false;
-
-	if (FD_ISSET(ntp->fd, &rdfds)) {
-		if (!ntp_server_handle(ntp)) {
-			g_debug("unable to send timing response\n");
-			return false;
-		}
-	}
+	ntp_server_handle(ntp);
 	return true;
 }
 
@@ -131,9 +121,41 @@ ntp_server_init(struct ntp_server *ntp)
 	ntp->fd = -1;
 }
 
+void
+ntp_server_open(struct ntp_server *ntp, int fd)
+{
+	assert(ntp->fd < 0);
+	assert(fd >= 0);
+
+	ntp->fd = fd;
+
+#ifndef G_OS_WIN32
+	ntp->channel = g_io_channel_unix_new(fd);
+#else
+	ntp->channel = g_io_channel_win32_new_socket(fd);
+#endif
+	/* NULL encoding means the stream is binary safe */
+	g_io_channel_set_encoding(ntp->channel, NULL, NULL);
+	/* no buffering */
+	g_io_channel_set_buffered(ntp->channel, false);
+
+	ntp->source = g_io_create_watch(ntp->channel, G_IO_IN);
+	g_source_set_callback(ntp->source, (GSourceFunc)ntp_in_event, ntp,
+			      NULL);
+	g_source_attach(ntp->source, io_thread_context());
+}
+
 void
 ntp_server_close(struct ntp_server *ntp)
 {
+	if (ntp->source != NULL) {
+		g_source_destroy(ntp->source);
+		g_source_unref(ntp->source);
+	}
+
+	if (ntp->channel != NULL)
+		g_io_channel_unref(ntp->channel);
+
 	if (ntp->fd >= 0)
 		close(ntp->fd);
 }
diff --git a/src/ntp_server.h b/src/ntp_server.h
index 56593fef0..2b970dff2 100644
--- a/src/ntp_server.h
+++ b/src/ntp_server.h
@@ -20,6 +20,8 @@
 #ifndef MPD_NTP_SERVER_H
 #define MPD_NTP_SERVER_H
 
+#include <glib.h>
+
 #include <stdbool.h>
 
 struct timeval;
@@ -27,24 +29,18 @@ struct timeval;
 struct ntp_server {
 	unsigned short port;
 	int fd;
+
+	GIOChannel *channel;
+	GSource *source;
 };
 
 void
 ntp_server_init(struct ntp_server *ntp);
 
+void
+ntp_server_open(struct ntp_server *ntp, int fd);
+
 void
 ntp_server_close(struct ntp_server *ntp);
 
-/*
- * Recv the NTP datagram from the AirTunes, send back an NTP response.
- */
-bool
-ntp_server_handle(struct ntp_server *ntp);
-
-/*
- * check to see if there are any timing requests, and respond if there are any
- */
-bool
-ntp_server_check(struct ntp_server *ntp, struct timeval *tout);
-
 #endif
diff --git a/src/output/raop_output_plugin.c b/src/output/raop_output_plugin.c
index a498ca670..912048007 100644
--- a/src/output/raop_output_plugin.c
+++ b/src/output/raop_output_plugin.c
@@ -535,15 +535,11 @@ exec_request(struct rtspcl_data *rtspcld, const char *cmd,
 	while (true) {
 		FD_ZERO(&rdfds);
 		FD_SET(rtspcld->fd, &rdfds);
-		FD_SET(raop_session->ntp.fd, &rdfds);
-		fdmax = raop_session->ntp.fd > rtspcld->fd ? raop_session->ntp.fd : rtspcld->fd;;
+		fdmax = rtspcld->fd;
 		select(fdmax + 1, &rdfds, NULL, NULL, &tout);
 		if (FD_ISSET(rtspcld->fd, &rdfds)) {
 			break;
 		}
-		if (FD_ISSET(raop_session->ntp.fd, &rdfds)) {
-			ntp_server_handle(&raop_session->ntp);
-		}
 	}
 
 	if (read_line(rtspcld->fd, line, sizeof(line), timeout, 0) <= 0) {
@@ -1068,20 +1064,14 @@ static bool
 send_audio_data(int fd, GError **error_r)
 {
 	int i = 0;
-	struct timeval current_time, tout, rtp_time;
+	struct timeval current_time, rtp_time;
 	struct raop_data *rd = raop_session->raop_list;
 
 	get_time_for_rtp(&raop_session->play_state, &rtp_time);
 	gettimeofday(&current_time, NULL);
 	int diff = difference(&current_time, &rtp_time);
+	g_usleep(-diff);
 
-	while (diff < -10000) {
-		tout.tv_sec = 0;
-		tout.tv_usec = -diff;
-		ntp_server_check(&raop_session->ntp, &tout);
-		gettimeofday(&current_time, NULL);
-		diff = difference(&current_time, &rtp_time);
-	}
 	gettimeofday(&raop_session->play_state.last_send, NULL);
 	while (rd) {
 		if (rd->started) {
@@ -1214,10 +1204,8 @@ raop_output_cancel(void *data)
 static bool
 raop_output_pause(void *data)
 {
-	struct timeval tout = {.tv_sec = 0, .tv_usec = 0};
 	struct raop_data *rd = (struct raop_data *) data;
 
-	ntp_server_check(&raop_session->ntp, &tout);
 	rd->paused = true;
 	return true;
 }
@@ -1284,17 +1272,18 @@ raop_output_open(void *data, struct audio_format *audio_format, GError **error_r
 		if (raop_session->data_fd < 0)
 			return false;
 
-		raop_session->ntp.fd =
-			open_udp_socket(NULL, &raop_session->ntp.port,
-					error_r);
-		if (raop_session->ntp.fd < 0)
+		int fd = open_udp_socket(NULL, &raop_session->ntp.port,
+					 error_r);
+		if (fd < 0)
 			return false;
 
+		ntp_server_open(&raop_session->ntp, fd);
+
 		raop_session->ctrl.fd =
 			open_udp_socket(NULL, &raop_session->ctrl.port,
 					error_r);
 		if (raop_session->ctrl.fd < 0) {
-			close(raop_session->ntp.fd);
+			ntp_server_close(&raop_session->ntp);
 			raop_session->ctrl.fd = -1;
 			g_mutex_unlock(raop_session->list_mutex);
 			return false;
@@ -1324,7 +1313,6 @@ raop_output_play(void *data, const void *chunk, size_t size,
 {
 	//raopcl_send_sample
 	struct raop_data *rd = data;
-	struct timeval tout = {.tv_sec = 0, .tv_usec = 0};
 	size_t rval = 0, orig_size = size;
 
 	rd->paused = false;
@@ -1335,8 +1323,6 @@ raop_output_play(void *data, const void *chunk, size_t size,
 
 	g_mutex_lock(raop_session->data_mutex);
 
-	ntp_server_check(&raop_session->ntp, &tout);
-
 	if (raop_session->play_state.rtptime <= NUMSAMPLES) {
 		// looped over, need new reference point to calculate correct times
 		raop_session->play_state.playing = false;
diff --git a/test/run_ntp_server.c b/test/run_ntp_server.c
index 7268fa94c..db24059ab 100644
--- a/test/run_ntp_server.c
+++ b/test/run_ntp_server.c
@@ -20,6 +20,7 @@
 #include "config.h"
 #include "ntp_server.h"
 #include "signals.h"
+#include "io_thread.h"
 
 #include <glib.h>
 
@@ -39,12 +40,10 @@
 #include <arpa/inet.h>
 #endif
 
-static bool quit = false;
-
 void
 on_quit(void)
 {
-	quit = true;
+	io_thread_quit();
 }
 
 static int bind_host(int sd, char *hostname, unsigned long ulAddr,
@@ -122,27 +121,25 @@ open_udp_socket(char *hostname, unsigned short *port)
 int
 main(G_GNUC_UNUSED int argc, G_GNUC_UNUSED char **argv)
 {
+	g_thread_init(NULL);
 	signals_init();
+	io_thread_init();
 
 	struct ntp_server ntp;
 	ntp_server_init(&ntp);
 
-	ntp.fd = open_udp_socket(NULL, &ntp.port);
-	if (ntp.fd < 0) {
+	int fd = open_udp_socket(NULL, &ntp.port);
+	if (fd < 0) {
 		g_printerr("Failed to create UDP socket\n");
 		ntp_server_close(&ntp);
 		return EXIT_FAILURE;
 	}
 
-	while (!quit) {
-		struct timeval tv = {
-			.tv_sec = 1,
-			.tv_usec = 0,
-		};
+	ntp_server_open(&ntp, fd);
 
-		ntp_server_check(&ntp, &tv);
-	}
+	io_thread_run();
 
 	ntp_server_close(&ntp);
+	io_thread_deinit();
 	return EXIT_SUCCESS;
 }
diff --git a/test/run_output.c b/test/run_output.c
index 8a34fedec..b4d2b8f9c 100644
--- a/test/run_output.c
+++ b/test/run_output.c
@@ -18,6 +18,7 @@
  */
 
 #include "config.h"
+#include "io_thread.h"
 #include "output_plugin.h"
 #include "output_internal.h"
 #include "output_control.h"
@@ -36,6 +37,7 @@
 #include <assert.h>
 #include <string.h>
 #include <unistd.h>
+#include <stdlib.h>
 
 struct playlist g_playlist;
 
@@ -146,6 +148,13 @@ int main(int argc, char **argv)
 		return 1;
 	}
 
+	io_thread_init();
+	if (!io_thread_start(&error)) {
+		g_warning("%s", error->message);
+		g_error_free(error);
+		return EXIT_FAILURE;
+	}
+
 	/* initialize the audio output */
 
 	if (!load_audio_output(&ao, argv[2]))
@@ -216,6 +225,8 @@ int main(int argc, char **argv)
 	ao_plugin_finish(ao.plugin, ao.data);
 	g_mutex_free(ao.mutex);
 
+	io_thread_deinit();
+
 	config_global_finish();
 
 	return 0;