add gcd version, less bits on pipe, run socket in non-blocking mode

This commit is contained in:
Love Hornquist Astrand
2009-11-23 12:47:39 -08:00
parent d96a7eb2a1
commit ca876988d7

View File

@@ -43,8 +43,6 @@ struct heim_sipc {
void *mech;
};
#undef HAVE_GCD
#if defined(__APPLE__) && defined(HAVE_GCD)
#include "heim_ipcServer.h"
@@ -55,6 +53,8 @@ static dispatch_source_t timer;
static dispatch_queue_t timerq;
static uint64_t timeoutvalue;
static dispatch_queue_t eventq;
static dispatch_queue_t workq;
static void
@@ -84,6 +84,7 @@ init_globals(void)
dispatch_source_set_event_handler(timer, ^{ timer_ev(); } );
workq = dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0);
eventq = dispatch_queue_create("heim-ipc.event-queue", NULL);
});
}
@@ -455,9 +456,10 @@ struct client {
};
#ifndef HAVE_GCD
static unsigned num_clients = 0;
static struct client **clients = NULL;
#endif
static struct client *
add_new_socket(int fd,
@@ -466,6 +468,7 @@ add_new_socket(int fd,
void *userctx)
{
struct client *c;
int one = 1;
c = calloc(1, sizeof(*c));
if (c == NULL)
@@ -481,9 +484,13 @@ add_new_socket(int fd,
c->callback = callback;
c->userctx = userctx;
(void)ioctl(c->fd, FIONBIO, (void *)&one);
#ifndef HAVE_GCD
clients = erealloc(clients, sizeof(clients[0]) * (num_clients + 1));
clients[num_clients] = c;
num_clients++;
#endif
return c;
}
@@ -532,8 +539,96 @@ socket_complete(heim_sipc_call ctx, int returnvalue, heim_idata *reply)
sc->c = NULL; /* so we can catch double complete */
free(sc);
if (c->calls == 0 && (c->flags & WAITING_CLOSE)) {
close(c->fd);
free(c);
}
}
static void
handle_read(struct client *c)
{
ssize_t len;
uint32_t dlen;
if (c->flags & LISTEN_SOCKET) {
add_new_socket(c->fd,
WAITING_READ,
c->callback,
c->userctx);
return;
}
if (c->ptr - c->len < 1024) {
c->inmsg = erealloc(c->inmsg,
c->len + 1024);
c->len += 1024;
}
len = read(c->fd, c->inmsg + c->ptr, c->len - c->ptr);
if (len <= 0) {
c->flags |= WAITING_CLOSE;
return;
}
c->ptr += len;
if (c->ptr > c->len)
abort();
while (c->ptr >= sizeof(dlen)) {
struct socket_call *cs;
memcpy(&dlen, c->inmsg, sizeof(dlen));
dlen = ntohl(dlen);
if (dlen < c->ptr - sizeof(dlen))
break;
cs = emalloc(sizeof(*cs));
cs->c = c;
cs->in.data = emalloc(dlen);
memcpy(cs->in.data, c->inmsg + sizeof(dlen), dlen);
cs->in.length = dlen;
c->ptr -= sizeof(dlen) + dlen;
memmove(c->inmsg,
c->inmsg + sizeof(dlen) + dlen,
c->ptr);
c->calls++;
c->callback(c->userctx, &cs->in,
NULL, socket_complete,
(heim_sipc_call)cs);
}
}
static void
handle_write(struct client *c)
{
ssize_t len;
len = write(c->fd, c->outmsg, c->olen);
if (len <= 0) {
c->flags |= WAITING_CLOSE;
return;
}
if (c->olen != len) {
memmove(&c->outmsg[0], &c->outmsg[len], c->olen - len);
c->olen -= len;
} else {
c->olen = 0;
free(c->outmsg);
c->outmsg = NULL;
c->flags &= WAITING_WRITE;
}
}
#ifdef HAVE_GCD
#else
static void
process_loop(void)
{
@@ -570,104 +665,33 @@ process_loop(void)
continue;
}
if (fds[n].revents & POLLIN) {
ssize_t len;
uint32_t dlen;
if (clients[n]->flags & LISTEN_SOCKET) {
add_new_socket(clients[n]->fd,
WAITING_READ,
clients[n]->callback,
clients[n]->userctx);
continue;
}
if (clients[n]->ptr - clients[n]->len < 1024) {
clients[n]->inmsg = erealloc(clients[n]->inmsg,
clients[n]->len + 1024);
clients[n]->len += 1024;
}
len = read(clients[n]->fd,
clients[n]->inmsg + clients[n]->ptr,
clients[n]->len - clients[n]->ptr);
if (len <= 0) {
clients[n]->flags |= WAITING_CLOSE;
continue;
}
clients[n]->ptr += len;
if (clients[n]->ptr > clients[n]->len)
abort();
while (clients[n]->ptr >= sizeof(dlen)) {
struct socket_call *cs;
memcpy(&dlen, clients[n]->inmsg, sizeof(dlen));
dlen = ntohl(dlen);
if (dlen < clients[n]->ptr - sizeof(dlen))
break;
cs = emalloc(sizeof(*cs));
cs->c = clients[n];
cs->in.data = emalloc(dlen);
memcpy(cs->in.data, clients[n]->inmsg + sizeof(dlen), dlen);
cs->in.length = dlen;
clients[n]->ptr -= sizeof(dlen) + dlen;
memmove(clients[n]->inmsg,
clients[n]->inmsg + sizeof(dlen) + dlen,
clients[n]->ptr);
clients[n]->calls++;
clients[n]->callback(clients[n]->userctx, &cs->in,
NULL, socket_complete,
(heim_sipc_call)cs);
}
}
if (fds[n].revents & POLLOUT) {
ssize_t len;
len = write(clients[n]->fd,
clients[n]->outmsg,
clients[n]->olen);
if (len <= 0) {
clients[n]->flags |= WAITING_CLOSE;
continue;
}
if (clients[n]->olen != len) {
memmove(&clients[n]->outmsg[0],
&clients[n]->outmsg[len],
clients[n]->olen - len);
clients[n]->olen -= len;
} else {
clients[n]->olen = 0;
free(clients[n]->outmsg);
clients[n]->outmsg = NULL;
clients[n]->flags &= WAITING_WRITE;
}
}
if (fds[n].revents & POLLIN)
handle_read(clients[n]);
if (fds[n].revents & POLLOUT)
handle_write(clients[n]);
}
n = 0;
while (n < num_clients) {
struct client *c = clients[n];
if ((c->flags & WAITING_CLOSE) == 0 || c->calls) {
if ((c->flags & WAITING_CLOSE) == 0) {
n++;
continue;
}
close(c->fd);
free(c);
if (c->calls == 0) {
close(c->fd);
free(c);
}
clients[n] = clients[num_clients - 1];
num_clients--;
}
free(fds);
}
}
#endif
static int
socket_release(heim_sipc ctx)
{
@@ -676,25 +700,43 @@ socket_release(heim_sipc ctx)
return 0;
}
#endif
int
heim_sipc_launchd_stream_fd_init(int fd,
heim_ipc_callback callback,
void *user, heim_sipc *ctx)
{
#ifndef HAVE_GCD
heim_sipc ct = calloc(1, sizeof(*ct));
ct->mech = add_new_socket(fd, LISTEN_SOCKET|WAITING_READ, callback, user);
struct client *c;
c = add_new_socket(fd, LISTEN_SOCKET|WAITING_READ, callback, user);
#ifndef HAVE_GCD
#else
dispatch_source_t in, out;
init_globals();
in = dispatch_source_create(DISPATCH_SOURCE_TYPE_READ, fd, 0, eventq);
out = dispatch_source_create(DISPATCH_SOURCE_TYPE_WRITE, fd, 0, eventq);
dispatch_source_set_event_handler(in, ^{
int rw = (c->flags & WAITING_WRITE);
handle_read(c);
if (rw == 0 && (c->flags & WAITING_WRITE))
dispatch_resume(out);
});
dispatch_source_set_event_handler(out, ^{
handle_write(c);
if ((c->flags & WAITING_WRITE) == 0)
dispatch_suspend(out);
});
dispatch_resume(in);
#endif
ct->mech = c;
ct->release = socket_release;
*ctx = ct;
return 0;
#else
close(fd);
*ctx = NULL;
return EINVAL;
#endif
}
@@ -734,7 +776,7 @@ heim_sipc_service_unix(const char *service,
return errno;
}
chmod(un.sun_path, 0777);
chmod(un.sun_path, 0666);
return heim_sipc_launchd_stream_fd_init(fd, callback, user, ctx);
}