ServerSocket: use the SocketMonitor class

This commit is contained in:
Max Kellermann
2013-01-15 22:50:49 +01:00
parent a0ebd444ad
commit 0dd5f2915a
5 changed files with 40 additions and 56 deletions

@ -92,13 +92,15 @@ listen_systemd_activation(GError **error_r)
bool bool
listen_global_init(GError **error_r) listen_global_init(GError **error_r)
{ {
assert(main_loop != nullptr);
int port = config_get_positive(CONF_PORT, DEFAULT_PORT); int port = config_get_positive(CONF_PORT, DEFAULT_PORT);
const struct config_param *param = const struct config_param *param =
config_get_next_param(CONF_BIND_TO_ADDRESS, NULL); config_get_next_param(CONF_BIND_TO_ADDRESS, NULL);
bool success; bool success;
GError *error = NULL; GError *error = NULL;
listen_socket = server_socket_new(listen_callback, NULL); listen_socket = server_socket_new(*main_loop, listen_callback, NULL);
if (listen_systemd_activation(&error)) if (listen_systemd_activation(&error))
return true; return true;

@ -398,6 +398,9 @@ int mpd_main(int argc, char *argv[])
return EXIT_FAILURE; return EXIT_FAILURE;
} }
main_task = g_thread_self();
main_loop = new EventLoop(EventLoop::Default());
success = listen_global_init(&error); success = listen_global_init(&error);
if (!success) { if (!success) {
g_warning("%s", error->message); g_warning("%s", error->message);
@ -407,9 +410,6 @@ int mpd_main(int argc, char *argv[])
daemonize_set_user(); daemonize_set_user();
main_task = g_thread_self();
main_loop = new EventLoop(EventLoop::Default());
GlobalEvents::Initialize(); GlobalEvents::Initialize();
GlobalEvents::Register(GlobalEvents::IDLE, idle_event_emitted); GlobalEvents::Register(GlobalEvents::IDLE, idle_event_emitted);
GlobalEvents::Register(GlobalEvents::SHUTDOWN, shutdown_event_emitted); GlobalEvents::Register(GlobalEvents::SHUTDOWN, shutdown_event_emitted);

@ -26,9 +26,9 @@
#include "ServerSocket.hxx" #include "ServerSocket.hxx"
#include "SocketUtil.hxx" #include "SocketUtil.hxx"
#include "SocketError.hxx" #include "SocketError.hxx"
#include "event/SocketMonitor.hxx"
#include "resolver.h" #include "resolver.h"
#include "fd_util.h" #include "fd_util.h"
#include "glib_socket.h"
#include <forward_list> #include <forward_list>
@ -55,24 +55,22 @@
#define DEFAULT_PORT 6600 #define DEFAULT_PORT 6600
struct OneServerSocket { struct OneServerSocket final : private SocketMonitor {
const server_socket &parent; const server_socket &parent;
const unsigned serial; const unsigned serial;
int fd;
guint source_id;
char *path; char *path;
size_t address_length; size_t address_length;
struct sockaddr *address; struct sockaddr *address;
OneServerSocket(const server_socket &_parent, unsigned _serial, OneServerSocket(EventLoop &_loop, const server_socket &_parent,
unsigned _serial,
const struct sockaddr *_address, const struct sockaddr *_address,
size_t _address_length) size_t _address_length)
:parent(_parent), serial(_serial), :SocketMonitor(_loop),
fd(-1), parent(_parent), serial(_serial),
path(nullptr), path(nullptr),
address_length(_address_length), address_length(_address_length),
address((sockaddr *)g_memdup(_address, _address_length)) address((sockaddr *)g_memdup(_address, _address_length))
@ -85,23 +83,30 @@ struct OneServerSocket {
OneServerSocket &operator=(const OneServerSocket &other) = delete; OneServerSocket &operator=(const OneServerSocket &other) = delete;
~OneServerSocket() { ~OneServerSocket() {
Close();
g_free(path); g_free(path);
g_free(address); g_free(address);
} }
bool Open(GError **error_r); bool Open(GError **error_r);
void Close(); using SocketMonitor::Close;
char *ToString() const; char *ToString() const;
void SetFD(int fd); void SetFD(int _fd) {
SocketMonitor::Open(_fd);
SocketMonitor::ScheduleRead();
}
void Accept(); void Accept();
private:
virtual void OnSocketReady(unsigned flags) override;
}; };
struct server_socket { struct server_socket {
EventLoop &loop;
server_socket_callback_t callback; server_socket_callback_t callback;
void *callback_ctx; void *callback_ctx;
@ -109,8 +114,10 @@ struct server_socket {
unsigned next_serial; unsigned next_serial;
server_socket(server_socket_callback_t _callback, void *_callback_ctx) server_socket(EventLoop &_loop,
:callback(_callback), callback_ctx(_callback_ctx), server_socket_callback_t _callback, void *_callback_ctx)
:loop(_loop),
callback(_callback), callback_ctx(_callback_ctx),
next_serial(1) {} next_serial(1) {}
void Close(); void Close();
@ -123,9 +130,10 @@ server_socket_quark(void)
} }
struct server_socket * struct server_socket *
server_socket_new(server_socket_callback_t callback, void *callback_ctx) server_socket_new(EventLoop &loop,
server_socket_callback_t callback, void *callback_ctx)
{ {
return new server_socket(callback, callback_ctx); return new server_socket(loop, callback, callback_ctx);
} }
void void
@ -177,7 +185,7 @@ OneServerSocket::Accept()
struct sockaddr_storage peer_address; struct sockaddr_storage peer_address;
size_t peer_address_length = sizeof(peer_address); size_t peer_address_length = sizeof(peer_address);
int peer_fd = int peer_fd =
accept_cloexec_nonblock(fd, (struct sockaddr*)&peer_address, accept_cloexec_nonblock(Get(), (struct sockaddr*)&peer_address,
&peer_address_length); &peer_address_length);
if (peer_fd < 0) { if (peer_fd < 0) {
const SocketErrorMessage msg; const SocketErrorMessage msg;
@ -197,35 +205,16 @@ OneServerSocket::Accept()
parent.callback_ctx); parent.callback_ctx);
} }
static gboolean
server_socket_in_event(G_GNUC_UNUSED GIOChannel *source,
G_GNUC_UNUSED GIOCondition condition,
gpointer data)
{
OneServerSocket &s = *(OneServerSocket *)data;
s.Accept();
return true;
}
void void
OneServerSocket::SetFD(int _fd) OneServerSocket::OnSocketReady(gcc_unused unsigned flags)
{ {
assert(fd < 0); Accept();
assert(_fd >= 0);
fd = _fd;
GIOChannel *channel = g_io_channel_new_socket(fd);
source_id = g_io_add_watch(channel, G_IO_IN,
server_socket_in_event, this);
g_io_channel_unref(channel);
} }
inline bool inline bool
OneServerSocket::Open(GError **error_r) OneServerSocket::Open(GError **error_r)
{ {
assert(fd < 0); assert(!IsDefined());
int _fd = socket_bind_listen(address->sa_family, int _fd = socket_bind_listen(address->sa_family,
SOCK_STREAM, 0, SOCK_STREAM, 0,
@ -309,17 +298,6 @@ server_socket_open(struct server_socket *ss, GError **error_r)
return true; return true;
} }
void
OneServerSocket::Close()
{
if (fd < 0)
return;
g_source_remove(source_id);
close_socket(fd);
fd = -1;
}
void void
server_socket::Close() server_socket::Close()
{ {
@ -340,7 +318,7 @@ server_socket_add_address(struct server_socket *ss,
{ {
assert(ss != nullptr); assert(ss != nullptr);
ss->sockets.emplace_front(*ss, ss->next_serial, ss->sockets.emplace_front(ss->loop, *ss, ss->next_serial,
address, address_length); address, address_length);
return ss->sockets.front(); return ss->sockets.front();

@ -25,6 +25,7 @@
#include <stddef.h> #include <stddef.h>
struct sockaddr; struct sockaddr;
class EventLoop;
typedef void (*server_socket_callback_t)(int fd, typedef void (*server_socket_callback_t)(int fd,
const struct sockaddr *address, const struct sockaddr *address,
@ -32,7 +33,8 @@ typedef void (*server_socket_callback_t)(int fd,
void *ctx); void *ctx);
struct server_socket * struct server_socket *
server_socket_new(server_socket_callback_t callback, void *callback_ctx); server_socket_new(EventLoop &loop,
server_socket_callback_t callback, void *callback_ctx);
void void
server_socket_free(struct server_socket *ss); server_socket_free(struct server_socket *ss);

@ -29,6 +29,7 @@
#include "icy_server.h" #include "icy_server.h"
#include "fd_util.h" #include "fd_util.h"
#include "ServerSocket.hxx" #include "ServerSocket.hxx"
#include "Main.hxx"
#include <assert.h> #include <assert.h>
@ -134,7 +135,8 @@ httpd_output_init(const struct config_param *param,
/* set up bind_to_address */ /* set up bind_to_address */
httpd->server_socket = server_socket_new(httpd_listen_in_event, httpd); httpd->server_socket = server_socket_new(*main_loop,
httpd_listen_in_event, httpd);
const char *bind_to_address = const char *bind_to_address =
config_get_block_string(param, "bind_to_address", NULL); config_get_block_string(param, "bind_to_address", NULL);