379 lines
7.9 KiB
C
379 lines
7.9 KiB
C
/*
|
|
* Copyright (C) 2003-2011 The Music Player Daemon Project
|
|
* http://www.musicpd.org
|
|
*
|
|
* This program is free software; you can redistribute it and/or modify
|
|
* it under the terms of the GNU General Public License as published by
|
|
* the Free Software Foundation; either version 2 of the License, or
|
|
* (at your option) any later version.
|
|
*
|
|
* This program is distributed in the hope that it will be useful,
|
|
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
* GNU General Public License for more details.
|
|
*
|
|
* You should have received a copy of the GNU General Public License along
|
|
* with this program; if not, write to the Free Software Foundation, Inc.,
|
|
* 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
|
|
*/
|
|
|
|
#include "tcp_socket.h"
|
|
#include "fifo_buffer.h"
|
|
#include "io_thread.h"
|
|
#include "glib_socket.h"
|
|
|
|
#include <assert.h>
|
|
#include <string.h>
|
|
|
|
#ifdef WIN32
|
|
#define WINVER 0x0501
|
|
#include <ws2tcpip.h>
|
|
#include <winsock.h>
|
|
#else
|
|
#include <sys/socket.h>
|
|
#include <netinet/in.h>
|
|
#endif
|
|
|
|
struct tcp_socket {
|
|
const struct tcp_socket_handler *handler;
|
|
void *handler_ctx;
|
|
|
|
GMutex *mutex;
|
|
|
|
GIOChannel *channel;
|
|
GSource *in_source, *out_source;
|
|
|
|
struct fifo_buffer *input, *output;
|
|
};
|
|
|
|
static gboolean
|
|
tcp_event(GIOChannel *source, GIOCondition condition, gpointer data);
|
|
|
|
static void
|
|
tcp_socket_schedule_read(struct tcp_socket *s)
|
|
{
|
|
assert(s->input != NULL);
|
|
assert(!fifo_buffer_is_full(s->input));
|
|
|
|
if (s->in_source != NULL)
|
|
return;
|
|
|
|
s->in_source = g_io_create_watch(s->channel,
|
|
G_IO_IN|G_IO_ERR|G_IO_HUP);
|
|
g_source_set_callback(s->in_source, (GSourceFunc)tcp_event, s, NULL);
|
|
g_source_attach(s->in_source, io_thread_context());
|
|
}
|
|
|
|
static void
|
|
tcp_socket_unschedule_read(struct tcp_socket *s)
|
|
{
|
|
if (s->in_source == NULL)
|
|
return;
|
|
|
|
g_source_destroy(s->in_source);
|
|
g_source_unref(s->in_source);
|
|
s->in_source = NULL;
|
|
}
|
|
|
|
static void
|
|
tcp_socket_schedule_write(struct tcp_socket *s)
|
|
{
|
|
assert(s->output != NULL);
|
|
assert(!fifo_buffer_is_empty(s->output));
|
|
|
|
if (s->out_source != NULL)
|
|
return;
|
|
|
|
s->out_source = g_io_create_watch(s->channel, G_IO_OUT);
|
|
g_source_set_callback(s->out_source, (GSourceFunc)tcp_event, s, NULL);
|
|
g_source_attach(s->out_source, io_thread_context());
|
|
}
|
|
|
|
static void
|
|
tcp_socket_unschedule_write(struct tcp_socket *s)
|
|
{
|
|
if (s->out_source == NULL)
|
|
return;
|
|
|
|
g_source_destroy(s->out_source);
|
|
g_source_unref(s->out_source);
|
|
s->out_source = NULL;
|
|
}
|
|
|
|
/**
|
|
* Close the socket. Caller must lock the mutex.
|
|
*/
|
|
static void
|
|
tcp_socket_close(struct tcp_socket *s)
|
|
{
|
|
tcp_socket_unschedule_read(s);
|
|
tcp_socket_unschedule_write(s);
|
|
|
|
if (s->channel != NULL) {
|
|
g_io_channel_unref(s->channel);
|
|
s->channel = NULL;
|
|
}
|
|
|
|
if (s->input != NULL) {
|
|
fifo_buffer_free(s->input);
|
|
s->input = NULL;
|
|
}
|
|
|
|
if (s->output != NULL) {
|
|
fifo_buffer_free(s->output);
|
|
s->output = NULL;
|
|
}
|
|
}
|
|
|
|
static gpointer
|
|
tcp_socket_close_callback(gpointer data)
|
|
{
|
|
struct tcp_socket *s = data;
|
|
|
|
g_mutex_lock(s->mutex);
|
|
tcp_socket_close(s);
|
|
g_mutex_unlock(s->mutex);
|
|
|
|
return NULL;
|
|
}
|
|
|
|
static void
|
|
tcp_socket_close_indirect(struct tcp_socket *s)
|
|
{
|
|
io_thread_call(tcp_socket_close_callback, s);
|
|
|
|
assert(s->channel == NULL);
|
|
assert(s->in_source == NULL);
|
|
assert(s->out_source == NULL);
|
|
}
|
|
|
|
static void
|
|
tcp_handle_input(struct tcp_socket *s)
|
|
{
|
|
size_t length;
|
|
const void *p = fifo_buffer_read(s->input, &length);
|
|
if (p == NULL)
|
|
return;
|
|
|
|
g_mutex_unlock(s->mutex);
|
|
size_t consumed = s->handler->data(p, length, s->handler_ctx);
|
|
g_mutex_lock(s->mutex);
|
|
if (consumed > 0 && s->input != NULL)
|
|
fifo_buffer_consume(s->input, consumed);
|
|
}
|
|
|
|
static bool
|
|
tcp_in_event(struct tcp_socket *s)
|
|
{
|
|
assert(s != NULL);
|
|
assert(s->channel != NULL);
|
|
|
|
g_mutex_lock(s->mutex);
|
|
|
|
size_t max_length;
|
|
void *p = fifo_buffer_write(s->input, &max_length);
|
|
if (p == NULL) {
|
|
GError *error = g_error_new_literal(tcp_socket_quark(), 0,
|
|
"buffer overflow");
|
|
tcp_socket_close(s);
|
|
g_mutex_unlock(s->mutex);
|
|
s->handler->error(error, s->handler_ctx);
|
|
return false;
|
|
}
|
|
|
|
gsize bytes_read;
|
|
GError *error = NULL;
|
|
GIOStatus status = g_io_channel_read_chars(s->channel,
|
|
p, max_length,
|
|
&bytes_read, &error);
|
|
switch (status) {
|
|
case G_IO_STATUS_NORMAL:
|
|
fifo_buffer_append(s->input, bytes_read);
|
|
tcp_handle_input(s);
|
|
g_mutex_unlock(s->mutex);
|
|
return true;
|
|
|
|
case G_IO_STATUS_AGAIN:
|
|
/* try again later */
|
|
g_mutex_unlock(s->mutex);
|
|
return true;
|
|
|
|
case G_IO_STATUS_EOF:
|
|
/* peer disconnected */
|
|
tcp_socket_close(s);
|
|
g_mutex_unlock(s->mutex);
|
|
s->handler->disconnected(s->handler_ctx);
|
|
return false;
|
|
|
|
case G_IO_STATUS_ERROR:
|
|
/* I/O error */
|
|
tcp_socket_close(s);
|
|
g_mutex_unlock(s->mutex);
|
|
s->handler->error(error, s->handler_ctx);
|
|
return false;
|
|
}
|
|
|
|
/* unreachable */
|
|
assert(false);
|
|
return true;
|
|
}
|
|
|
|
static bool
|
|
tcp_out_event(struct tcp_socket *s)
|
|
{
|
|
assert(s != NULL);
|
|
assert(s->channel != NULL);
|
|
|
|
g_mutex_lock(s->mutex);
|
|
|
|
size_t length;
|
|
const void *p = fifo_buffer_read(s->output, &length);
|
|
if (p == NULL) {
|
|
/* no more data in the output buffer, remove the
|
|
output event */
|
|
tcp_socket_unschedule_write(s);
|
|
g_mutex_unlock(s->mutex);
|
|
return false;
|
|
}
|
|
|
|
gsize bytes_written;
|
|
GError *error = NULL;
|
|
GIOStatus status = g_io_channel_write_chars(s->channel, p, length,
|
|
&bytes_written, &error);
|
|
switch (status) {
|
|
case G_IO_STATUS_NORMAL:
|
|
fifo_buffer_consume(s->output, bytes_written);
|
|
g_mutex_unlock(s->mutex);
|
|
return true;
|
|
|
|
case G_IO_STATUS_AGAIN:
|
|
tcp_socket_schedule_write(s);
|
|
g_mutex_unlock(s->mutex);
|
|
return true;
|
|
|
|
case G_IO_STATUS_EOF:
|
|
/* peer disconnected */
|
|
tcp_socket_close(s);
|
|
g_mutex_unlock(s->mutex);
|
|
s->handler->disconnected(s->handler_ctx);
|
|
return false;
|
|
|
|
case G_IO_STATUS_ERROR:
|
|
/* I/O error */
|
|
tcp_socket_close(s);
|
|
g_mutex_unlock(s->mutex);
|
|
s->handler->error(error, s->handler_ctx);
|
|
return false;
|
|
}
|
|
|
|
/* unreachable */
|
|
g_mutex_unlock(s->mutex);
|
|
assert(false);
|
|
return true;
|
|
}
|
|
|
|
static gboolean
|
|
tcp_event(G_GNUC_UNUSED GIOChannel *source, GIOCondition condition,
|
|
gpointer data)
|
|
{
|
|
struct tcp_socket *s = data;
|
|
|
|
assert(source == s->channel);
|
|
|
|
switch (condition) {
|
|
case G_IO_IN:
|
|
case G_IO_PRI:
|
|
return tcp_in_event(s);
|
|
|
|
case G_IO_OUT:
|
|
return tcp_out_event(s);
|
|
|
|
case G_IO_ERR:
|
|
case G_IO_HUP:
|
|
case G_IO_NVAL:
|
|
tcp_socket_close(s);
|
|
s->handler->disconnected(s->handler_ctx);
|
|
return false;
|
|
}
|
|
|
|
/* unreachable */
|
|
assert(false);
|
|
return false;
|
|
}
|
|
|
|
struct tcp_socket *
|
|
tcp_socket_new(int fd,
|
|
const struct tcp_socket_handler *handler, void *ctx)
|
|
{
|
|
assert(fd >= 0);
|
|
assert(handler != NULL);
|
|
assert(handler->data != NULL);
|
|
assert(handler->error != NULL);
|
|
assert(handler->disconnected != NULL);
|
|
|
|
struct tcp_socket *s = g_new(struct tcp_socket, 1);
|
|
s->handler = handler;
|
|
s->handler_ctx = ctx;
|
|
s->mutex = g_mutex_new();
|
|
|
|
g_mutex_lock(s->mutex);
|
|
|
|
s->channel = g_io_channel_new_socket(fd);
|
|
/* GLib is responsible for closing the file descriptor */
|
|
g_io_channel_set_close_on_unref(s->channel, true);
|
|
/* NULL encoding means the stream is binary safe */
|
|
g_io_channel_set_encoding(s->channel, NULL, NULL);
|
|
/* no buffering */
|
|
g_io_channel_set_buffered(s->channel, false);
|
|
|
|
s->input = fifo_buffer_new(4096);
|
|
s->output = fifo_buffer_new(4096);
|
|
|
|
s->in_source = NULL;
|
|
s->out_source = NULL;
|
|
|
|
tcp_socket_schedule_read(s);
|
|
|
|
g_mutex_unlock(s->mutex);
|
|
|
|
return s;
|
|
}
|
|
|
|
void
|
|
tcp_socket_free(struct tcp_socket *s)
|
|
{
|
|
tcp_socket_close_indirect(s);
|
|
g_mutex_free(s->mutex);
|
|
g_free(s);
|
|
}
|
|
|
|
bool
|
|
tcp_socket_send(struct tcp_socket *s, const void *data, size_t length)
|
|
{
|
|
assert(s != NULL);
|
|
|
|
g_mutex_lock(s->mutex);
|
|
|
|
if (s->output == NULL || s->channel == NULL) {
|
|
/* already disconnected */
|
|
g_mutex_unlock(s->mutex);
|
|
return false;
|
|
}
|
|
|
|
size_t max_length;
|
|
void *p = fifo_buffer_write(s->output, &max_length);
|
|
if (p == NULL || max_length < length) {
|
|
/* buffer is full */
|
|
g_mutex_unlock(s->mutex);
|
|
return false;
|
|
}
|
|
|
|
memcpy(p, data, length);
|
|
fifo_buffer_append(s->output, length);
|
|
tcp_socket_schedule_write(s);
|
|
|
|
g_mutex_unlock(s->mutex);
|
|
return true;
|
|
}
|
|
|