diff --git a/lib/ipc/server.c b/lib/ipc/server.c index 74c0f1a2d..ccfdb1454 100644 --- a/lib/ipc/server.c +++ b/lib/ipc/server.c @@ -453,6 +453,10 @@ struct client { uint8_t *inmsg; size_t olen; uint8_t *outmsg; +#ifdef HAVE_GCD + dispatch_source_t in; + dispatch_source_t out; +#endif }; #ifndef HAVE_GCD @@ -460,6 +464,9 @@ static unsigned num_clients = 0; static struct client **clients = NULL; #endif +static void handle_read(struct client *); +static void handle_write(struct client *); +static int maybe_close(struct client *); static struct client * add_new_socket(int fd, @@ -468,25 +475,60 @@ add_new_socket(int fd, void *userctx) { struct client *c; - int one = 1; + int fileflags; c = calloc(1, sizeof(*c)); if (c == NULL) return NULL; - c->fd = accept(fd, NULL, NULL); - if(c->fd < 0) { - free(c); - return NULL; + if (flags & LISTEN_SOCKET) { + c->fd = fd; + } else { + c->fd = accept(fd, NULL, NULL); + if(c->fd < 0) { + free(c); + return NULL; + } } + printf("new socket %d\n", c->fd); c->flags = flags; c->callback = callback; c->userctx = userctx; - (void)ioctl(c->fd, FIONBIO, (void *)&one); + fileflags = fcntl(c->fd, F_GETFL, 0); + fcntl(c->fd, F_SETFL, fileflags | O_NONBLOCK); -#ifndef HAVE_GCD +#ifdef HAVE_GCD + init_globals(); + + c->in = dispatch_source_create(DISPATCH_SOURCE_TYPE_READ, + c->fd, 0, eventq); + c->out = dispatch_source_create(DISPATCH_SOURCE_TYPE_WRITE, + c->fd, 0, eventq); + + dispatch_source_set_event_handler(c->in, ^{ + int rw = (c->flags & WAITING_WRITE); + printf("handle read %d\n", c->fd); + handle_read(c); + if (rw == 0 && (c->flags & WAITING_WRITE)) + dispatch_resume(c->out); + if ((c->flags & WAITING_READ) == 0) + dispatch_suspend(c->in); + maybe_close(c); + }); + dispatch_source_set_event_handler(c->out, ^{ + printf("handle write %d\n", c->fd); + handle_write(c); + if ((c->flags & WAITING_WRITE) == 0) { + printf("completed write %d\n", c->fd); + dispatch_suspend(c->out); + } + maybe_close(c); + }); + + dispatch_resume(c->in); +#else clients = erealloc(clients, sizeof(clients[0]) * (num_clients + 1)); clients[num_clients] = c; num_clients++; @@ -495,12 +537,37 @@ add_new_socket(int fd, return c; } +static int +maybe_close(struct client *c) +{ + if (c->calls != 0) + return 0; + if (c->flags & (WAITING_READ|WAITING_WRITE)) + return 0; + + printf("client close: %d\n", c->fd); +#ifdef HAVE_GCD + dispatch_cancel(c->in); + if ((c->flags & WAITING_READ) == 0) + dispatch_resume(c->in); + dispatch_release(c->in); + + dispatch_cancel(c->out); + if ((c->flags & WAITING_WRITE) == 0) + dispatch_resume(c->out); + dispatch_release(c->out); +#endif + close(c->fd); /* ref count fd close */ + free(c); + return 1; +} + + struct socket_call { heim_idata in; struct client *c; }; - static void socket_complete(heim_sipc_call ctx, int returnvalue, heim_idata *reply) { @@ -517,16 +584,18 @@ socket_complete(heim_sipc_call ctx, int returnvalue, heim_idata *reply) size_t rlen = reply->length + sizeof(u32) + sizeof(u32); c->outmsg = erealloc(c->outmsg, c->olen + rlen); - ptr = &c->outmsg[c->olen]; /* length */ + ptr = &c->outmsg[c->olen]; u32 = htonl(reply->length); memcpy(ptr, &u32, sizeof(u32)); ptr += sizeof(u32); + /* return value */ u32 = htonl(returnvalue); memcpy(ptr, &u32, sizeof(u32)); ptr += sizeof(u32); + /* data */ memcpy(ptr, reply->data, reply->length); c->olen += rlen; @@ -539,10 +608,7 @@ socket_complete(heim_sipc_call ctx, int returnvalue, heim_idata *reply) sc->c = NULL; /* so we can catch double complete */ free(sc); - if (c->calls == 0 && (c->flags & WAITING_CLOSE)) { - close(c->fd); - free(c); - } + maybe_close(c); } static void @@ -568,6 +634,7 @@ handle_read(struct client *c) len = read(c->fd, c->inmsg + c->ptr, c->len - c->ptr); if (len <= 0) { c->flags |= WAITING_CLOSE; + c->flags &= ~WAITING_READ; return; } c->ptr += len; @@ -609,25 +676,20 @@ handle_write(struct client *c) len = write(c->fd, c->outmsg, c->olen); if (len <= 0) { c->flags |= WAITING_CLOSE; - return; - } - if (c->olen != len) { + c->flags &= ~(WAITING_WRITE); + } else if (c->olen != len) { memmove(&c->outmsg[0], &c->outmsg[len], c->olen - len); c->olen -= len; } else { c->olen = 0; free(c->outmsg); c->outmsg = NULL; - - c->flags &= WAITING_WRITE; + c->flags &= ~(WAITING_WRITE); } } -#ifdef HAVE_GCD - - -#else +#ifndef HAVE_GCD static void process_loop(void) @@ -674,16 +736,11 @@ process_loop(void) n = 0; while (n < num_clients) { struct client *c = clients[n]; - if ((c->flags & WAITING_CLOSE) == 0) { - n++; - continue; + if (maybe_close(c)) { + if (n < num_clients - 1) + clients[n] = clients[num_clients - 1]; + num_clients--; } - if (c->calls == 0) { - close(c->fd); - free(c); - } - clients[n] = clients[num_clients - 1]; - num_clients--; } free(fds); @@ -710,29 +767,6 @@ heim_sipc_launchd_stream_fd_init(int fd, c = add_new_socket(fd, LISTEN_SOCKET|WAITING_READ, callback, user); -#ifndef HAVE_GCD -#else - dispatch_source_t in, out; - - init_globals(); - - in = dispatch_source_create(DISPATCH_SOURCE_TYPE_READ, fd, 0, eventq); - out = dispatch_source_create(DISPATCH_SOURCE_TYPE_WRITE, fd, 0, eventq); - - dispatch_source_set_event_handler(in, ^{ - int rw = (c->flags & WAITING_WRITE); - handle_read(c); - if (rw == 0 && (c->flags & WAITING_WRITE)) - dispatch_resume(out); - }); - dispatch_source_set_event_handler(out, ^{ - handle_write(c); - if ((c->flags & WAITING_WRITE) == 0) - dispatch_suspend(out); - }); - - dispatch_resume(in); -#endif ct->mech = c; ct->release = socket_release; *ctx = ct;