handle gcd for sockets too

This commit is contained in:
Love Hornquist Astrand
2009-11-23 17:28:56 -08:00
parent ca876988d7
commit d4a562ebe6

View File

@@ -453,6 +453,10 @@ struct client {
uint8_t *inmsg; uint8_t *inmsg;
size_t olen; size_t olen;
uint8_t *outmsg; uint8_t *outmsg;
#ifdef HAVE_GCD
dispatch_source_t in;
dispatch_source_t out;
#endif
}; };
#ifndef HAVE_GCD #ifndef HAVE_GCD
@@ -460,6 +464,9 @@ static unsigned num_clients = 0;
static struct client **clients = NULL; static struct client **clients = NULL;
#endif #endif
static void handle_read(struct client *);
static void handle_write(struct client *);
static int maybe_close(struct client *);
static struct client * static struct client *
add_new_socket(int fd, add_new_socket(int fd,
@@ -468,25 +475,60 @@ add_new_socket(int fd,
void *userctx) void *userctx)
{ {
struct client *c; struct client *c;
int one = 1; int fileflags;
c = calloc(1, sizeof(*c)); c = calloc(1, sizeof(*c));
if (c == NULL) if (c == NULL)
return NULL; return NULL;
c->fd = accept(fd, NULL, NULL); if (flags & LISTEN_SOCKET) {
if(c->fd < 0) { c->fd = fd;
free(c); } else {
return NULL; c->fd = accept(fd, NULL, NULL);
if(c->fd < 0) {
free(c);
return NULL;
}
} }
printf("new socket %d\n", c->fd);
c->flags = flags; c->flags = flags;
c->callback = callback; c->callback = callback;
c->userctx = userctx; c->userctx = userctx;
(void)ioctl(c->fd, FIONBIO, (void *)&one); fileflags = fcntl(c->fd, F_GETFL, 0);
fcntl(c->fd, F_SETFL, fileflags | O_NONBLOCK);
#ifndef HAVE_GCD #ifdef HAVE_GCD
init_globals();
c->in = dispatch_source_create(DISPATCH_SOURCE_TYPE_READ,
c->fd, 0, eventq);
c->out = dispatch_source_create(DISPATCH_SOURCE_TYPE_WRITE,
c->fd, 0, eventq);
dispatch_source_set_event_handler(c->in, ^{
int rw = (c->flags & WAITING_WRITE);
printf("handle read %d\n", c->fd);
handle_read(c);
if (rw == 0 && (c->flags & WAITING_WRITE))
dispatch_resume(c->out);
if ((c->flags & WAITING_READ) == 0)
dispatch_suspend(c->in);
maybe_close(c);
});
dispatch_source_set_event_handler(c->out, ^{
printf("handle write %d\n", c->fd);
handle_write(c);
if ((c->flags & WAITING_WRITE) == 0) {
printf("completed write %d\n", c->fd);
dispatch_suspend(c->out);
}
maybe_close(c);
});
dispatch_resume(c->in);
#else
clients = erealloc(clients, sizeof(clients[0]) * (num_clients + 1)); clients = erealloc(clients, sizeof(clients[0]) * (num_clients + 1));
clients[num_clients] = c; clients[num_clients] = c;
num_clients++; num_clients++;
@@ -495,12 +537,37 @@ add_new_socket(int fd,
return c; return c;
} }
static int
maybe_close(struct client *c)
{
if (c->calls != 0)
return 0;
if (c->flags & (WAITING_READ|WAITING_WRITE))
return 0;
printf("client close: %d\n", c->fd);
#ifdef HAVE_GCD
dispatch_cancel(c->in);
if ((c->flags & WAITING_READ) == 0)
dispatch_resume(c->in);
dispatch_release(c->in);
dispatch_cancel(c->out);
if ((c->flags & WAITING_WRITE) == 0)
dispatch_resume(c->out);
dispatch_release(c->out);
#endif
close(c->fd); /* ref count fd close */
free(c);
return 1;
}
struct socket_call { struct socket_call {
heim_idata in; heim_idata in;
struct client *c; struct client *c;
}; };
static void static void
socket_complete(heim_sipc_call ctx, int returnvalue, heim_idata *reply) socket_complete(heim_sipc_call ctx, int returnvalue, heim_idata *reply)
{ {
@@ -517,16 +584,18 @@ socket_complete(heim_sipc_call ctx, int returnvalue, heim_idata *reply)
size_t rlen = reply->length + sizeof(u32) + sizeof(u32); size_t rlen = reply->length + sizeof(u32) + sizeof(u32);
c->outmsg = erealloc(c->outmsg, c->olen + rlen); c->outmsg = erealloc(c->outmsg, c->olen + rlen);
ptr = &c->outmsg[c->olen];
/* length */ /* length */
ptr = &c->outmsg[c->olen];
u32 = htonl(reply->length); u32 = htonl(reply->length);
memcpy(ptr, &u32, sizeof(u32)); memcpy(ptr, &u32, sizeof(u32));
ptr += sizeof(u32); ptr += sizeof(u32);
/* return value */ /* return value */
u32 = htonl(returnvalue); u32 = htonl(returnvalue);
memcpy(ptr, &u32, sizeof(u32)); memcpy(ptr, &u32, sizeof(u32));
ptr += sizeof(u32); ptr += sizeof(u32);
/* data */ /* data */
memcpy(ptr, reply->data, reply->length); memcpy(ptr, reply->data, reply->length);
c->olen += rlen; c->olen += rlen;
@@ -539,10 +608,7 @@ socket_complete(heim_sipc_call ctx, int returnvalue, heim_idata *reply)
sc->c = NULL; /* so we can catch double complete */ sc->c = NULL; /* so we can catch double complete */
free(sc); free(sc);
if (c->calls == 0 && (c->flags & WAITING_CLOSE)) { maybe_close(c);
close(c->fd);
free(c);
}
} }
static void static void
@@ -568,6 +634,7 @@ handle_read(struct client *c)
len = read(c->fd, c->inmsg + c->ptr, c->len - c->ptr); len = read(c->fd, c->inmsg + c->ptr, c->len - c->ptr);
if (len <= 0) { if (len <= 0) {
c->flags |= WAITING_CLOSE; c->flags |= WAITING_CLOSE;
c->flags &= ~WAITING_READ;
return; return;
} }
c->ptr += len; c->ptr += len;
@@ -609,25 +676,20 @@ handle_write(struct client *c)
len = write(c->fd, c->outmsg, c->olen); len = write(c->fd, c->outmsg, c->olen);
if (len <= 0) { if (len <= 0) {
c->flags |= WAITING_CLOSE; c->flags |= WAITING_CLOSE;
return; c->flags &= ~(WAITING_WRITE);
} } else if (c->olen != len) {
if (c->olen != len) {
memmove(&c->outmsg[0], &c->outmsg[len], c->olen - len); memmove(&c->outmsg[0], &c->outmsg[len], c->olen - len);
c->olen -= len; c->olen -= len;
} else { } else {
c->olen = 0; c->olen = 0;
free(c->outmsg); free(c->outmsg);
c->outmsg = NULL; c->outmsg = NULL;
c->flags &= ~(WAITING_WRITE);
c->flags &= WAITING_WRITE;
} }
} }
#ifdef HAVE_GCD #ifndef HAVE_GCD
#else
static void static void
process_loop(void) process_loop(void)
@@ -674,16 +736,11 @@ process_loop(void)
n = 0; n = 0;
while (n < num_clients) { while (n < num_clients) {
struct client *c = clients[n]; struct client *c = clients[n];
if ((c->flags & WAITING_CLOSE) == 0) { if (maybe_close(c)) {
n++; if (n < num_clients - 1)
continue; clients[n] = clients[num_clients - 1];
num_clients--;
} }
if (c->calls == 0) {
close(c->fd);
free(c);
}
clients[n] = clients[num_clients - 1];
num_clients--;
} }
free(fds); free(fds);
@@ -710,29 +767,6 @@ heim_sipc_launchd_stream_fd_init(int fd,
c = add_new_socket(fd, LISTEN_SOCKET|WAITING_READ, callback, user); c = add_new_socket(fd, LISTEN_SOCKET|WAITING_READ, callback, user);
#ifndef HAVE_GCD
#else
dispatch_source_t in, out;
init_globals();
in = dispatch_source_create(DISPATCH_SOURCE_TYPE_READ, fd, 0, eventq);
out = dispatch_source_create(DISPATCH_SOURCE_TYPE_WRITE, fd, 0, eventq);
dispatch_source_set_event_handler(in, ^{
int rw = (c->flags & WAITING_WRITE);
handle_read(c);
if (rw == 0 && (c->flags & WAITING_WRITE))
dispatch_resume(out);
});
dispatch_source_set_event_handler(out, ^{
handle_write(c);
if ((c->flags & WAITING_WRITE) == 0)
dispatch_suspend(out);
});
dispatch_resume(in);
#endif
ct->mech = c; ct->mech = c;
ct->release = socket_release; ct->release = socket_release;
*ctx = ct; *ctx = ct;