From ca876988d782bb0bb1962678b230f901dab0930e Mon Sep 17 00:00:00 2001 From: Love Hornquist Astrand Date: Mon, 23 Nov 2009 12:47:39 -0800 Subject: [PATCH] add gcd version, less bits on pipe, run socket in non-blocking mode --- lib/ipc/server.c | 234 ++++++++++++++++++++++++++++------------------- 1 file changed, 138 insertions(+), 96 deletions(-) diff --git a/lib/ipc/server.c b/lib/ipc/server.c index 7a8905ec5..74c0f1a2d 100644 --- a/lib/ipc/server.c +++ b/lib/ipc/server.c @@ -43,8 +43,6 @@ struct heim_sipc { void *mech; }; -#undef HAVE_GCD - #if defined(__APPLE__) && defined(HAVE_GCD) #include "heim_ipcServer.h" @@ -55,6 +53,8 @@ static dispatch_source_t timer; static dispatch_queue_t timerq; static uint64_t timeoutvalue; +static dispatch_queue_t eventq; + static dispatch_queue_t workq; static void @@ -84,6 +84,7 @@ init_globals(void) dispatch_source_set_event_handler(timer, ^{ timer_ev(); } ); workq = dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0); + eventq = dispatch_queue_create("heim-ipc.event-queue", NULL); }); } @@ -455,9 +456,10 @@ struct client { }; #ifndef HAVE_GCD - static unsigned num_clients = 0; static struct client **clients = NULL; +#endif + static struct client * add_new_socket(int fd, @@ -466,6 +468,7 @@ add_new_socket(int fd, void *userctx) { struct client *c; + int one = 1; c = calloc(1, sizeof(*c)); if (c == NULL) @@ -481,9 +484,13 @@ add_new_socket(int fd, c->callback = callback; c->userctx = userctx; + (void)ioctl(c->fd, FIONBIO, (void *)&one); + +#ifndef HAVE_GCD clients = erealloc(clients, sizeof(clients[0]) * (num_clients + 1)); clients[num_clients] = c; num_clients++; +#endif return c; } @@ -532,8 +539,96 @@ socket_complete(heim_sipc_call ctx, int returnvalue, heim_idata *reply) sc->c = NULL; /* so we can catch double complete */ free(sc); + if (c->calls == 0 && (c->flags & WAITING_CLOSE)) { + close(c->fd); + free(c); + } } +static void +handle_read(struct client *c) +{ + ssize_t len; + uint32_t dlen; + + if (c->flags & LISTEN_SOCKET) { + add_new_socket(c->fd, + WAITING_READ, + c->callback, + c->userctx); + return; + } + + if (c->ptr - c->len < 1024) { + c->inmsg = erealloc(c->inmsg, + c->len + 1024); + c->len += 1024; + } + + len = read(c->fd, c->inmsg + c->ptr, c->len - c->ptr); + if (len <= 0) { + c->flags |= WAITING_CLOSE; + return; + } + c->ptr += len; + if (c->ptr > c->len) + abort(); + + while (c->ptr >= sizeof(dlen)) { + struct socket_call *cs; + + memcpy(&dlen, c->inmsg, sizeof(dlen)); + dlen = ntohl(dlen); + + if (dlen < c->ptr - sizeof(dlen)) + break; + + cs = emalloc(sizeof(*cs)); + cs->c = c; + cs->in.data = emalloc(dlen); + memcpy(cs->in.data, c->inmsg + sizeof(dlen), dlen); + cs->in.length = dlen; + + c->ptr -= sizeof(dlen) + dlen; + memmove(c->inmsg, + c->inmsg + sizeof(dlen) + dlen, + c->ptr); + + c->calls++; + c->callback(c->userctx, &cs->in, + NULL, socket_complete, + (heim_sipc_call)cs); + } +} + +static void +handle_write(struct client *c) +{ + ssize_t len; + + len = write(c->fd, c->outmsg, c->olen); + if (len <= 0) { + c->flags |= WAITING_CLOSE; + return; + } + if (c->olen != len) { + memmove(&c->outmsg[0], &c->outmsg[len], c->olen - len); + c->olen -= len; + } else { + c->olen = 0; + free(c->outmsg); + c->outmsg = NULL; + + c->flags &= WAITING_WRITE; + } +} + + +#ifdef HAVE_GCD + + +#else + static void process_loop(void) { @@ -570,104 +665,33 @@ process_loop(void) continue; } - if (fds[n].revents & POLLIN) { - ssize_t len; - uint32_t dlen; - - if (clients[n]->flags & LISTEN_SOCKET) { - add_new_socket(clients[n]->fd, - WAITING_READ, - clients[n]->callback, - clients[n]->userctx); - continue; - } - - if (clients[n]->ptr - clients[n]->len < 1024) { - clients[n]->inmsg = erealloc(clients[n]->inmsg, - clients[n]->len + 1024); - clients[n]->len += 1024; - } - - len = read(clients[n]->fd, - clients[n]->inmsg + clients[n]->ptr, - clients[n]->len - clients[n]->ptr); - if (len <= 0) { - clients[n]->flags |= WAITING_CLOSE; - continue; - } - clients[n]->ptr += len; - if (clients[n]->ptr > clients[n]->len) - abort(); - - while (clients[n]->ptr >= sizeof(dlen)) { - struct socket_call *cs; - - memcpy(&dlen, clients[n]->inmsg, sizeof(dlen)); - dlen = ntohl(dlen); - - if (dlen < clients[n]->ptr - sizeof(dlen)) - break; - - cs = emalloc(sizeof(*cs)); - cs->c = clients[n]; - cs->in.data = emalloc(dlen); - memcpy(cs->in.data, clients[n]->inmsg + sizeof(dlen), dlen); - cs->in.length = dlen; - - clients[n]->ptr -= sizeof(dlen) + dlen; - memmove(clients[n]->inmsg, - clients[n]->inmsg + sizeof(dlen) + dlen, - clients[n]->ptr); - - clients[n]->calls++; - clients[n]->callback(clients[n]->userctx, &cs->in, - NULL, socket_complete, - (heim_sipc_call)cs); - } - } - if (fds[n].revents & POLLOUT) { - ssize_t len; - - len = write(clients[n]->fd, - clients[n]->outmsg, - clients[n]->olen); - if (len <= 0) { - clients[n]->flags |= WAITING_CLOSE; - continue; - } - if (clients[n]->olen != len) { - memmove(&clients[n]->outmsg[0], - &clients[n]->outmsg[len], - clients[n]->olen - len); - clients[n]->olen -= len; - } else { - clients[n]->olen = 0; - free(clients[n]->outmsg); - clients[n]->outmsg = NULL; - - clients[n]->flags &= WAITING_WRITE; - } - } + if (fds[n].revents & POLLIN) + handle_read(clients[n]); + if (fds[n].revents & POLLOUT) + handle_write(clients[n]); } n = 0; while (n < num_clients) { struct client *c = clients[n]; - if ((c->flags & WAITING_CLOSE) == 0 || c->calls) { + if ((c->flags & WAITING_CLOSE) == 0) { n++; continue; } - close(c->fd); - free(c); + if (c->calls == 0) { + close(c->fd); + free(c); + } clients[n] = clients[num_clients - 1]; num_clients--; } - free(fds); } } +#endif + static int socket_release(heim_sipc ctx) { @@ -676,25 +700,43 @@ socket_release(heim_sipc ctx) return 0; } -#endif - int heim_sipc_launchd_stream_fd_init(int fd, heim_ipc_callback callback, void *user, heim_sipc *ctx) { -#ifndef HAVE_GCD heim_sipc ct = calloc(1, sizeof(*ct)); - - ct->mech = add_new_socket(fd, LISTEN_SOCKET|WAITING_READ, callback, user); + struct client *c; + + c = add_new_socket(fd, LISTEN_SOCKET|WAITING_READ, callback, user); + +#ifndef HAVE_GCD +#else + dispatch_source_t in, out; + + init_globals(); + + in = dispatch_source_create(DISPATCH_SOURCE_TYPE_READ, fd, 0, eventq); + out = dispatch_source_create(DISPATCH_SOURCE_TYPE_WRITE, fd, 0, eventq); + + dispatch_source_set_event_handler(in, ^{ + int rw = (c->flags & WAITING_WRITE); + handle_read(c); + if (rw == 0 && (c->flags & WAITING_WRITE)) + dispatch_resume(out); + }); + dispatch_source_set_event_handler(out, ^{ + handle_write(c); + if ((c->flags & WAITING_WRITE) == 0) + dispatch_suspend(out); + }); + + dispatch_resume(in); +#endif + ct->mech = c; ct->release = socket_release; *ctx = ct; return 0; -#else - close(fd); - *ctx = NULL; - return EINVAL; -#endif } @@ -734,7 +776,7 @@ heim_sipc_service_unix(const char *service, return errno; } - chmod(un.sun_path, 0777); + chmod(un.sun_path, 0666); return heim_sipc_launchd_stream_fd_init(fd, callback, user, ctx); }