r597 - trunk/varnish-cache/bin/varnishd
phk at projects.linpro.no
phk at projects.linpro.no
Wed Aug 2 11:34:40 CEST 2006
Author: phk
Date: 2006-08-02 11:34:40 +0200 (Wed, 02 Aug 2006)
New Revision: 597
Modified:
trunk/varnish-cache/bin/varnishd/cache.h
trunk/varnish-cache/bin/varnishd/cache_acceptor.c
trunk/varnish-cache/bin/varnishd/cache_http.c
Log:
Bite the bullet and write an alternate acceptor which uses kqueue
directly instead of libevent.
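Degeneralize the header-reading code in cache_http.c: the libevent
callback interface (http_RecvHeadEv) is replaced by http_RecvPrep(),
http_RecvPrepAgain() and http_RecvSome(), which seems to be cleaner anyway.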
A #define/#undef pair at the top of cache_acceptor.c, tested by #ifdef
further down, selects which implementation you want: libevent or kqueue.
This revision selects kqueue.
Modified: trunk/varnish-cache/bin/varnishd/cache.h
===================================================================
--- trunk/varnish-cache/bin/varnishd/cache.h 2006-08-02 07:23:45 UTC (rev 596)
+++ trunk/varnish-cache/bin/varnishd/cache.h 2006-08-02 09:34:40 UTC (rev 597)
@@ -46,8 +46,6 @@
* RSN: struct worker and struct session will have one of these embedded.
*/
-typedef void http_callback_f(void *, int bad);
-
struct http_hdr {
char *b;
char *e;
@@ -56,9 +54,6 @@
struct http {
unsigned magic;
#define HTTP_MAGIC 0x6428b5c9
- struct event ev;
- http_callback_f *callback;
- void *arg;
char *s; /* (S)tart of buffer */
char *t; /* start of (T)railing data */
@@ -230,6 +225,7 @@
int id;
unsigned xid;
+ struct event ev;
struct worker *wrk;
unsigned sockaddrlen;
@@ -346,7 +342,9 @@
int http_HdrIs(struct http *hp, const char *hdr, const char *val);
int http_GetTail(struct http *hp, unsigned len, char **b, char **e);
int http_Read(struct http *hp, int fd, void *b, unsigned len);
-void http_RecvHeadEv(struct http *hp, int fd, struct event_base *eb, http_callback_f *func, void *arg);
+void http_RecvPrep(struct http *hp);
+int http_RecvPrepAgain(struct http *hp);
+int http_RecvSome(int fd, struct http *hp);
int http_RecvHead(struct http *hp, int fd);
int http_DissectRequest(struct http *sp, int fd);
int http_DissectResponse(struct http *sp, int fd);
Modified: trunk/varnish-cache/bin/varnishd/cache_acceptor.c
===================================================================
--- trunk/varnish-cache/bin/varnishd/cache_acceptor.c 2006-08-02 07:23:45 UTC (rev 596)
+++ trunk/varnish-cache/bin/varnishd/cache_acceptor.c 2006-08-02 09:34:40 UTC (rev 597)
@@ -6,6 +6,9 @@
* write the session pointer to a pipe which the event engine monitors.
*/
+#define ACCEPTOR_USE_KQUEUE
+#undef ACCEPTOR_USE_LIBEVENT
+
#include <stdio.h>
#include <errno.h>
#include <string.h>
@@ -16,14 +19,73 @@
#include <sys/types.h>
#include <sys/socket.h>
-#include <netdb.h>
-
#include "config.h"
#include "libvarnish.h"
#include "heritage.h"
#include "shmlog.h"
#include "cache.h"
+static pthread_t vca_thread;
+static unsigned xids;
+
+static struct sess *
+vca_accept_sess(int fd)
+{
+ socklen_t l;
+ struct sockaddr addr[2]; /* XXX: IPv6 hack */
+ struct sess *sp;
+ int i;
+ struct linger linger;
+
+ VSL_stats->client_conn++;
+
+ l = sizeof addr;
+ i = accept(fd, addr, &l);
+ if (i < 0) {
+ VSL(SLT_Debug, fd, "Accept failed errno=%d", errno);
+ /* XXX: stats ? */
+ return (NULL);
+ }
+ sp = SES_New(addr, l);
+ assert(sp != NULL); /* XXX handle */
+
+ sp->fd = i;
+ sp->id = i;
+
+#ifdef SO_NOSIGPIPE /* XXX Linux */
+ i = 1;
+ AZ(setsockopt(sp->fd, SOL_SOCKET, SO_NOSIGPIPE, &i, sizeof i));
+#endif
+#ifdef SO_LINGER /* XXX Linux*/
+ linger.l_onoff = 0;
+ linger.l_linger = 0;
+ AZ(setsockopt(sp->fd, SOL_SOCKET, SO_LINGER, &linger, sizeof linger));
+#endif
+
+ TCP_name(addr, l, sp->addr, sizeof sp->addr, sp->port, sizeof sp->port);
+ VSL(SLT_SessionOpen, sp->fd, "%s %s", sp->addr, sp->port);
+ return (sp);
+}
+
+static void
+vca_handover(struct sess *sp, int bad)
+{
+
+ if (bad) {
+ vca_close_session(sp,
+ bad == 1 ? "overflow" : "no request");
+ vca_return_session(sp);
+ return;
+ }
+ sp->step = STP_RECV;
+ VSL_stats->client_req++;
+ sp->xid = xids++;
+ VSL(SLT_XID, sp->fd, "%u", sp->xid);
+ WRK_QueueSession(sp);
+}
+
+#ifdef ACCEPTOR_USE_LIBEVENT
+
static struct event_base *evb;
static struct event pipe_e;
static int pipes[2];
@@ -31,12 +93,11 @@
static struct event tick_e;
static struct timeval tick_rate;
-static pthread_t vca_thread;
-static unsigned xids;
-
static struct event accept_e[2 * HERITAGE_NSOCKS];
static TAILQ_HEAD(,sess) sesshead = TAILQ_HEAD_INITIALIZER(sesshead);
+/*--------------------------------------------------------------------*/
+
static void
vca_tick(int a, short b, void *c)
{
@@ -59,28 +120,36 @@
}
static void
-vca_callback(void *arg, int bad)
+vca_rcvhd_f(int fd, short event, void *arg)
{
struct sess *sp;
+ int i;
+ (void)event;
+
CAST_OBJ_NOTNULL(sp, arg, SESS_MAGIC);
+ i = http_RecvSome(fd, sp->http);
+ if (i < 0)
+ return;
+
+ event_del(&sp->ev);
TAILQ_REMOVE(&sesshead, sp, list);
- if (bad) {
- if (bad == 1)
- vca_close_session(sp, "overflow");
- else
- vca_close_session(sp, "no request");
- vca_return_session(sp);
- return;
- }
- sp->step = STP_RECV;
- VSL_stats->client_req++;
- sp->xid = xids++;
- VSL(SLT_XID, sp->fd, "%u", sp->xid);
- WRK_QueueSession(sp);
+ vca_handover(sp, i);
}
static void
+vca_rcvhdev(struct sess *sp)
+{
+
+ CHECK_OBJ_NOTNULL(sp, SESS_MAGIC);
+ clock_gettime(CLOCK_MONOTONIC, &sp->t_idle);
+ TAILQ_INSERT_TAIL(&sesshead, sp, list);
+ event_set(&sp->ev, sp->fd, EV_READ | EV_PERSIST, vca_rcvhd_f, sp);
+ AZ(event_base_set(evb, &sp->ev));
+ AZ(event_add(&sp->ev, NULL)); /* XXX: timeout */
+}
+
+static void
pipe_f(int fd, short event, void *arg)
{
struct sess *sp;
@@ -90,53 +159,27 @@
(void)arg;
i = read(fd, &sp, sizeof sp);
assert(i == sizeof sp);
- clock_gettime(CLOCK_MONOTONIC, &sp->t_idle);
- TAILQ_INSERT_TAIL(&sesshead, sp, list);
- http_RecvHeadEv(sp->http, sp->fd, evb, vca_callback, sp);
+ if (http_RecvPrepAgain(sp->http)) {
+ vca_handover(sp, 0);
+ return;
+ }
+ vca_rcvhdev(sp);
}
static void
accept_f(int fd, short event, void *arg)
{
- socklen_t l;
- struct sockaddr addr[2]; /* XXX: IPv6 hack */
struct sess *sp;
- int i;
- struct linger linger;
(void)event;
(void)arg;
- VSL_stats->client_conn++;
-
- l = sizeof addr;
- i = accept(fd, addr, &l);
- if (i < 0) {
- VSL(SLT_Debug, fd, "Accept failed errno=%d", errno);
- /* XXX: stats ? */
+ sp = vca_accept_sess(fd);
+ if (sp == NULL)
return;
- }
- sp = SES_New(addr, l);
- assert(sp != NULL); /* XXX handle */
- sp->fd = i;
- sp->id = i;
-
-#ifdef SO_NOSIGPIPE /* XXX Linux */
- i = 1;
- AZ(setsockopt(sp->fd, SOL_SOCKET, SO_NOSIGPIPE, &i, sizeof i));
-#endif
-#ifdef SO_LINGER /* XXX Linux*/
- linger.l_onoff = 0;
- linger.l_linger = 0;
- AZ(setsockopt(sp->fd, SOL_SOCKET, SO_LINGER, &linger, sizeof linger));
-#endif
-
- TCP_name(addr, l, sp->addr, sizeof sp->addr, sp->port, sizeof sp->port);
- VSL(SLT_SessionOpen, sp->fd, "%s %s", sp->addr, sp->port);
- clock_gettime(CLOCK_MONOTONIC, &sp->t_idle);
- TAILQ_INSERT_TAIL(&sesshead, sp, list);
- http_RecvHeadEv(sp->http, sp->fd, evb, vca_callback, sp);
+ http_RecvPrep(sp->http);
+ vca_rcvhdev(sp);
}
static void *
@@ -147,6 +190,8 @@
(void)arg;
+ tick_rate.tv_sec = 1;
+ tick_rate.tv_usec = 0;
AZ(pipe(pipes));
evb = event_init();
assert(evb != NULL);
@@ -187,37 +232,153 @@
/*--------------------------------------------------------------------*/
void
-vca_close_session(struct sess *sp, const char *why)
+vca_return_session(struct sess *sp)
{
- VSL(SLT_SessionClose, sp->fd, why);
- if (sp->fd >= 0)
- AZ(close(sp->fd));
- sp->fd = -1;
+ if (sp->fd < 0) {
+ SES_Delete(sp);
+ return;
+ }
+ VSL(SLT_SessionReuse, sp->fd, "%s %s", sp->addr, sp->port);
+ assert(sizeof sp == write(pipes[1], &sp, sizeof sp));
}
+#endif /* ACCEPTOR_USE_LIBEVENT */
+
+#ifdef ACCEPTOR_USE_KQUEUE
+#include <sys/event.h>
+
+static int kq = -1;
+
+static void
+vca_kq_sess(struct sess *sp, int arm)
+{
+ struct kevent ke[2];
+
+ assert(arm == EV_ADD || arm == EV_DELETE);
+ CHECK_OBJ_NOTNULL(sp, SESS_MAGIC);
+ memset(ke, 0, sizeof ke);
+ EV_SET(&ke[0], sp->fd, EVFILT_READ, arm, 0, 0, sp);
+ EV_SET(&ke[1], sp->fd, EVFILT_TIMER, arm , 0, 5000, sp);
+ AZ(kevent(kq, ke, 2, NULL, 0, NULL));
+}
+
+static void
+accept_f(int fd)
+{
+ struct sess *sp;
+
+ sp = vca_accept_sess(fd);
+ if (sp == NULL)
+ return;
+ clock_gettime(CLOCK_MONOTONIC, &sp->t_idle);
+ http_RecvPrep(sp->http);
+ vca_kq_sess(sp, EV_ADD);
+}
+
+static void *
+vca_main(void *arg)
+{
+ unsigned u;
+ struct kevent ke;
+ int i;
+ struct sess *sp;
+
+ (void)arg;
+
+ kq = kqueue();
+ assert(kq >= 0);
+
+
+ for (u = 0; u < HERITAGE_NSOCKS; u++) {
+ if (heritage.sock_local[u] >= 0) {
+ memset(&ke, 0, sizeof ke);
+ EV_SET(&ke, heritage.sock_local[u],
+ EVFILT_READ, EV_ADD, 0, 0, accept_f);
+ AZ(kevent(kq, &ke, 1, NULL, 0, NULL));
+ }
+ if (heritage.sock_remote[u] >= 0) {
+ memset(&ke, 0, sizeof ke);
+ EV_SET(&ke, heritage.sock_remote[u],
+ EVFILT_READ, EV_ADD, 0, 0, accept_f);
+ AZ(kevent(kq, &ke, 1, NULL, 0, NULL));
+ }
+ }
+
+ while (1) {
+ i = kevent(kq, NULL, 0, &ke, 1, NULL);
+ assert(i == 1);
+#if 0
+ printf("i = %d\n", i);
+ printf("ke.ident = %ju\n", (uintmax_t)ke.ident);
+ printf("ke.filter = %u\n", ke.filter);
+ printf("ke.flags = %u\n", ke.flags);
+ printf("ke.fflags = %u\n", ke.fflags);
+ printf("ke.data = %jd\n", (intmax_t)ke.data);
+ printf("ke.udata = %p\n", ke.udata);
+#endif
+ if (ke.udata == accept_f) {
+ accept_f(ke.ident);
+ continue;
+ }
+ CAST_OBJ_NOTNULL(sp, ke.udata, SESS_MAGIC);
+ if (ke.filter == EVFILT_READ) {
+ i = http_RecvSome(sp->fd, sp->http);
+ if (i == -1)
+ continue;
+ vca_kq_sess(sp, EV_DELETE);
+ vca_handover(sp, i);
+ continue;
+ }
+ if (ke.filter == EVFILT_TIMER) {
+ vca_kq_sess(sp, EV_DELETE);
+ vca_close_session(sp, "timeout");
+ vca_return_session(sp);
+ continue;
+ }
+ INCOMPL();
+ }
+
+ INCOMPL();
+}
+
/*--------------------------------------------------------------------*/
void
vca_return_session(struct sess *sp)
{
- if (sp->fd >= 0) {
- VSL(SLT_SessionReuse, sp->fd, "%s %s", sp->addr, sp->port);
- assert(sizeof sp == write(pipes[1], &sp, sizeof sp));
- } else {
+ if (sp->fd < 0) {
SES_Delete(sp);
+ return;
}
+ VSL(SLT_SessionReuse, sp->fd, "%s %s", sp->addr, sp->port);
+ if (http_RecvPrepAgain(sp->http))
+ vca_handover(sp, 0);
+ else
+ vca_kq_sess(sp, EV_ADD);
}
+#endif /* ACCEPTOR_USE_KQUEUE */
+
/*--------------------------------------------------------------------*/
void
+vca_close_session(struct sess *sp, const char *why)
+{
+
+ VSL(SLT_SessionClose, sp->fd, why);
+ if (sp->fd >= 0)
+ AZ(close(sp->fd));
+ sp->fd = -1;
+}
+
+/*--------------------------------------------------------------------*/
+
+void
VCA_Init(void)
{
- tick_rate.tv_sec = 1;
- tick_rate.tv_usec = 0;
AZ(pthread_create(&vca_thread, NULL, vca_main, NULL));
srandomdev();
xids = random();
Modified: trunk/varnish-cache/bin/varnishd/cache_http.c
===================================================================
--- trunk/varnish-cache/bin/varnishd/cache_http.c 2006-08-02 07:23:45 UTC (rev 596)
+++ trunk/varnish-cache/bin/varnishd/cache_http.c 2006-08-02 09:34:40 UTC (rev 597)
@@ -436,14 +436,15 @@
if (++p > hp->v)
return (0);
hp->t = p;
+ assert(hp->t > hp->s);
assert(hp->t <= hp->v);
return (1);
}
/*--------------------------------------------------------------------*/
-static void
-http_preprecv(struct http *hp)
+void
+http_RecvPrep(struct http *hp)
{
unsigned l;
@@ -462,10 +463,19 @@
}
}
+int
+http_RecvPrepAgain(struct http *hp)
+{
+ http_RecvPrep(hp);
+ if (hp->v == hp->s)
+ return (0);
+ return (http_header_complete(hp));
+}
+
/*--------------------------------------------------------------------*/
-static int
-http_read_hdr(int fd, struct http *hp)
+int
+http_RecvSome(int fd, struct http *hp)
{
unsigned l;
int i;
@@ -507,55 +517,15 @@
/*--------------------------------------------------------------------*/
-static void
-http_read_f(int fd, short event, void *arg)
-{
- struct http *hp;
- int i;
-
- (void)event;
-
- CAST_OBJ_NOTNULL(hp, arg, HTTP_MAGIC);
- i = http_read_hdr(fd, hp);
- if (i < 0)
- return;
-
- event_del(&hp->ev);
- if (hp->callback != NULL)
- hp->callback(hp->arg, i);
-}
-
-
-void
-http_RecvHeadEv(struct http *hp, int fd, struct event_base *eb, http_callback_f *func, void *arg)
-{
-
- CHECK_OBJ_NOTNULL(hp, HTTP_MAGIC);
- assert(func != NULL);
- http_preprecv(hp);
- if (hp->v != hp->s && http_header_complete(hp)) {
- func(arg, 0);
- return;
- }
- hp->callback = func;
- hp->arg = arg;
- event_set(&hp->ev, fd, EV_READ | EV_PERSIST, http_read_f, hp);
- AZ(event_base_set(eb, &hp->ev));
- AZ(event_add(&hp->ev, NULL)); /* XXX: timeout */
- return;
-}
-
-/*--------------------------------------------------------------------*/
-
int
http_RecvHead(struct http *hp, int fd)
{
int i;
CHECK_OBJ_NOTNULL(hp, HTTP_MAGIC);
- http_preprecv(hp);
+ http_RecvPrep(hp);
do
- i = http_read_hdr(fd, hp);
+ i = http_RecvSome(fd, hp);
while (i == -1);
return (i);
}
More information about the varnish-commit
mailing list