[master] b9d8796 Centralise the timer/poker function of pipe-based waiters, so that we can avoid thundering-herd/resonance phenomena when we have multiple waiters.

Poul-Henning Kamp phk at FreeBSD.org
Thu Jan 15 12:47:28 CET 2015


commit b9d87969b5955d21713d437cec22af547b7663b1
Author: Poul-Henning Kamp <phk at FreeBSD.org>
Date:   Thu Jan 15 11:46:18 2015 +0000

    Centralise the timer/poker function of pipe-based waiters, so that
    we can avoid thundering-herd/resonance phenomena when we have multiple
    waiters.

diff --git a/bin/varnishd/cache/cache.h b/bin/varnishd/cache/cache.h
index be6bc54..fed05d1 100644
--- a/bin/varnishd/cache/cache.h
+++ b/bin/varnishd/cache/cache.h
@@ -397,7 +397,7 @@ struct waited {
 	VTAILQ_ENTRY(waited)	list;
 	int			fd;
 	void			*ptr;
-	double			deadline;
+	double			idle;
 #if defined(HAVE_EPOLL_CTL)
 	struct epoll_event ev;
 #endif
diff --git a/bin/varnishd/cache/cache_main.c b/bin/varnishd/cache/cache_main.c
index cfce5cc..bc55e99 100644
--- a/bin/varnishd/cache/cache_main.c
+++ b/bin/varnishd/cache/cache_main.c
@@ -39,6 +39,7 @@
 
 #include "vcli_priv.h"
 #include "vrnd.h"
+#include "waiter/waiter.h"
 
 #include "hash/hash_slinger.h"
 
@@ -214,6 +215,8 @@ child_main(void)
 	CLI_Init();
 	VFP_Init();
 
+	Wait_Init();
+
 	VCL_Init();
 
 	HTTP_Init();
diff --git a/bin/varnishd/cache/cache_session.c b/bin/varnishd/cache/cache_session.c
index 814005a..822c0a2 100644
--- a/bin/varnishd/cache/cache_session.c
+++ b/bin/varnishd/cache/cache_session.c
@@ -295,7 +295,7 @@ SES_Wait(struct sess *sp)
 	sp->waited.magic = WAITED_MAGIC;
 	sp->waited.fd = sp->fd;
 	sp->waited.ptr = sp;
-	sp->waited.deadline = sp->t_idle;
+	sp->waited.idle = sp->t_idle;
 	if (Wait_Enter(pp->http1_waiter, &sp->waited)) {
 		VSC_C_main->sess_pipe_overflow++;
 		SES_Delete(sp, SC_SESS_PIPE_OVERFLOW, NAN);
diff --git a/bin/varnishd/waiter/cache_waiter.c b/bin/varnishd/waiter/cache_waiter.c
index 4b8fd6e..6d88566 100644
--- a/bin/varnishd/waiter/cache_waiter.c
+++ b/bin/varnishd/waiter/cache_waiter.c
@@ -38,12 +38,53 @@
 #include "cache/cache.h"
 
 #include "vfil.h"
+#include "vtim.h"
 
 #include "waiter/waiter.h"
 #include "waiter/waiter_priv.h"
 
 #define NEV 8192
 
+static VTAILQ_HEAD(, waiter)	waiters = VTAILQ_HEAD_INITIALIZER(waiters);
+static int			nwaiters;
+static struct lock		wait_mtx;
+static pthread_t		wait_thr;
+
+static void *
+wait_poker_thread(void *arg)
+{
+	struct waiter *w;
+	struct waited *wp;
+	double now;
+
+	(void)arg;
+	THR_SetName("Waiter timer");
+	while (1) {
+		/* Avoid thundering herds and resonances */
+		(void)usleep(990013/nwaiters);
+
+		now = VTIM_real();
+
+		Lck_Lock(&wait_mtx);
+		w = VTAILQ_FIRST(&waiters);
+		VTAILQ_REMOVE(&waiters, w, list);
+		VTAILQ_INSERT_TAIL(&waiters, w, list);
+		assert(w->pipes[1] >= 0);
+
+		wp = VTAILQ_FIRST(&w->waithead);
+		CHECK_OBJ_NOTNULL(wp, WAITED_MAGIC);
+		if (wp == w->pipe_w) {
+			VTAILQ_REMOVE(&w->waithead, wp, list);
+			VTAILQ_INSERT_TAIL(&w->waithead, wp, list);
+			wp = VTAILQ_FIRST(&w->waithead);
+		}
+		if (wp->idle + *w->tmo < now)
+			(void)write(w->pipes[1], &w->pipe_w, sizeof w->pipe_w);
+		Lck_Unlock(&wait_mtx);
+	}
+	NEEDLESS_RETURN(NULL);
+}
+
 const char *
 Wait_GetName(void)
 {
@@ -70,10 +111,21 @@ Wait_New(waiter_handle_f *func, volatile double *tmo)
 	w->impl = waiter;
 	w->func = func;
 	w->tmo = tmo;
+	w->pipes[0] = w->pipes[1] = -1;
 	VTAILQ_INIT(&w->waithead);
 
 	waiter->init(w);
-	AN(w->impl->pass || w->pfd > 0);
+	AN(w->impl->pass || w->pipes[1] >= 0);
+
+	if (w->pipes[1] >= 0 && VTAILQ_EMPTY(&waiters)) {
+		/* Start timer poker thread */
+		AZ(pthread_create(&wait_thr, NULL, wait_poker_thread, NULL));
+	}
+
+	Lck_Lock(&wait_mtx);
+	VTAILQ_INSERT_TAIL(&waiters, w, list);
+	nwaiters++;
+	Lck_Unlock(&wait_mtx);
 	return (w);
 }
 
@@ -86,10 +138,9 @@ Wait_UsePipe(struct waiter *w)
 	AZ(pipe(w->pipes));
 	AZ(VFIL_nonblocking(w->pipes[0]));
 	AZ(VFIL_nonblocking(w->pipes[1]));
-	w->pfd = w->pipes[1];
 	ALLOC_OBJ(w->pipe_w, WAITED_MAGIC);
 	w->pipe_w->fd = w->pipes[0];
-	w->pipe_w->deadline = 9e99;
+	w->pipe_w->idle = 9e99;
 	VTAILQ_INSERT_HEAD(&w->waithead, w->pipe_w, list);
 	waiter->inject(w, w->pipe_w);
 }
@@ -106,9 +157,9 @@ Wait_Enter(const struct waiter *w, struct waited *wp)
 	if (w->impl->pass != NULL)
 		return (w->impl->pass(w->priv, wp));
 
-	assert(w->pfd >= 0);
+	assert(w->pipes[1] > 0);
 
-	written = write(w->pfd, &wp, sizeof wp);
+	written = write(w->pipes[1], &wp, sizeof wp);
 	if (written != sizeof wp && (errno == EAGAIN || errno == EWOULDBLOCK))
 		return (-1);
 	assert (written == sizeof wp);
@@ -118,28 +169,55 @@ Wait_Enter(const struct waiter *w, struct waited *wp)
 void
 Wait_Handle(struct waiter *w, struct waited *wp, enum wait_event ev, double now)
 {
-	struct waited *ss[NEV];
-	int i, j;
+	struct waited *ss[NEV], *wp2;
+	int i, j, dotimer = 0;
 
 	CHECK_OBJ_NOTNULL(w, WAITER_MAGIC);
 	CHECK_OBJ_NOTNULL(wp, WAITED_MAGIC);
 
-	if (wp == w->pipe_w) {
-		i = read(w->pipes[0], ss, sizeof ss);
-		if (i == -1 && errno == EAGAIN)
-			return;
-		for (j = 0; i >= sizeof ss[0]; j++, i -= sizeof ss[0]) {
-			CHECK_OBJ_NOTNULL(ss[j], WAITED_MAGIC);
-			assert(ss[j]->fd >= 0);
-			VTAILQ_INSERT_TAIL(&w->waithead, ss[j], list);
-			w->impl->inject(w, ss[j]);
+	if (wp != w->pipe_w) {
+		if (w->impl->evict != NULL)
+			w->impl->evict(w, wp);
+
+		VTAILQ_REMOVE(&w->waithead, wp, list);
+		w->func(wp, ev, now);
+		return;
+	}
+
+	i = read(w->pipes[0], ss, sizeof ss);
+	if (i == -1 && errno == EAGAIN)
+		return;
+
+	for (j = 0; i >= sizeof ss[0]; j++, i -= sizeof ss[0]) {
+		CHECK_OBJ_NOTNULL(ss[j], WAITED_MAGIC);
+		if (ss[j] == w->pipe_w) {
+			dotimer = 1;
+			continue;
 		}
-		AZ(i);
+		assert(ss[j]->fd >= 0);
+		VTAILQ_INSERT_TAIL(&w->waithead, ss[j], list);
+		w->impl->inject(w, ss[j]);
+	}
+	AZ(i);
+
+	if (!dotimer)
 		return;
+
+	VTAILQ_FOREACH_SAFE(wp, &w->waithead, list, wp2) {
+		if (wp == w->pipe_w)
+			continue;
+		if (wp->idle + *w->tmo > now)
+			break;
+		if (w->impl->evict != NULL)
+			w->impl->evict(w, wp);
+		VTAILQ_REMOVE(&w->waithead, wp, list);
+		w->func(wp, WAITER_TIMEOUT, now);
 	}
-	if (w->impl->evict != NULL)
-		w->impl->evict(w, wp);
+}
+
+void
+Wait_Init(void)
+{
 
-	VTAILQ_REMOVE(&w->waithead, wp, list);
-	w->func(wp, ev, now);
+	Lck_New(&wait_mtx, lck_misc);
 }
diff --git a/bin/varnishd/waiter/cache_waiter_epoll.c b/bin/varnishd/waiter/cache_waiter_epoll.c
index a281f6b..8f63c78 100644
--- a/bin/varnishd/waiter/cache_waiter_epoll.c
+++ b/bin/varnishd/waiter/cache_waiter_epoll.c
@@ -110,7 +110,7 @@ vwe_thread(void *priv)
 	struct epoll_event ev[NEEV], *ep;
 	struct waited *sp, *sp2;
 	char junk;
-	double now, deadline;
+	double now, idle;
 	int dotimer, i, n;
 	struct vwe *vwe;
 
@@ -135,9 +135,9 @@ vwe_thread(void *priv)
 			continue;
 
 		/* check for timeouts */
-		deadline = now - *vwe->waiter->tmo;
+		idle = now - *vwe->waiter->tmo;
 		VTAILQ_FOREACH_SAFE(sp, &vwe->waiter->waithead, list, sp2) {
-			if (sp->deadline < deadline)
+			if (sp->idle < idle)
 				Wait_Handle(vwe->waiter, sp,
 				    WAITER_TIMEOUT, now);
 		}
diff --git a/bin/varnishd/waiter/cache_waiter_kqueue.c b/bin/varnishd/waiter/cache_waiter_kqueue.c
index 422dabe..8609048 100644
--- a/bin/varnishd/waiter/cache_waiter_kqueue.c
+++ b/bin/varnishd/waiter/cache_waiter_kqueue.c
@@ -127,9 +127,8 @@ vwk_thread(void *priv)
 {
 	struct vwk *vwk;
 	struct kevent ke[NKEV], *kp;
-	int j, n, dotimer;
-	double now, deadline;
-	struct waited *sp;
+	int j, n;
+	double now;
 
 	CAST_OBJ_NOTNULL(vwk, priv, VWK_MAGIC);
 	THR_SetName("cache-kqueue");
@@ -141,42 +140,17 @@ vwk_thread(void *priv)
 
 	vwk->nki = 0;
 	while (1) {
-		dotimer = 0;
 		n = kevent(vwk->kq, vwk->ki, vwk->nki, ke, NKEV, NULL);
-		now = VTIM_real();
 		assert(n <= NKEV);
 		if (n == 0) {
 			/* This happens on OSX in m00011.vtc */
-			dotimer = 1;
 			(void)usleep(10000);
 		}
 		vwk->nki = 0;
+		now = VTIM_real();
 		for (kp = ke, j = 0; j < n; j++, kp++) {
-			if (kp->filter == EVFILT_TIMER) {
-				dotimer = 1;
-			} else {
-				assert(kp->filter == EVFILT_READ);
-				vwk_sess_ev(vwk, kp, now);
-			}
-		}
-		if (!dotimer)
-			continue;
-		/*
-		 * Make sure we have no pending changes for the fd's
-		 * we are about to close, in case the accept(2) in the
-		 * other thread creates new fd's betwen our close and
-		 * the kevent(2) at the top of this loop, the kernel
-		 * would not know we meant "the old fd of this number".
-		 */
-		vwk_kq_flush(vwk);
-		deadline = now - *vwk->waiter->tmo;
-		for (;;) {
-			sp = VTAILQ_FIRST(&vwk->waiter->waithead);
-			if (sp == NULL)
-				break;
-			if (sp->deadline > deadline)
-				break;
-			Wait_Handle(vwk->waiter, sp, WAITER_TIMEOUT, now);
+			assert(kp->filter == EVFILT_READ);
+			vwk_sess_ev(vwk, kp, now);
 		}
 	}
 	NEEDLESS_RETURN(NULL);
@@ -194,9 +168,6 @@ vwk_init(struct waiter *w)
 	INIT_OBJ(vwk, VWK_MAGIC);
 	vwk->waiter = w;
 
-	EV_SET(&vwk->ki[vwk->nki], 0, EVFILT_TIMER, EV_ADD, 0, 100, NULL);
-	vwk->nki++;
-
 	Wait_UsePipe(w);
 
 	AZ(pthread_create(&vwk->thread, NULL, vwk_thread, vwk));
diff --git a/bin/varnishd/waiter/cache_waiter_poll.c b/bin/varnishd/waiter/cache_waiter_poll.c
index dfaac80..48c3cd7 100644
--- a/bin/varnishd/waiter/cache_waiter_poll.c
+++ b/bin/varnishd/waiter/cache_waiter_poll.c
@@ -131,7 +131,7 @@ vwp_main(void *priv)
 	int v, v2;
 	struct vwp *vwp;
 	struct waited *sp, *sp2;
-	double now, deadline;
+	double now, idle;
 	int fd;
 
 	CAST_OBJ_NOTNULL(vwp, priv, VWP_MAGIC);
@@ -141,11 +141,11 @@ vwp_main(void *priv)
 		assert(vwp->hpoll < vwp->npoll);
 		while (vwp->hpoll > 0 && vwp->pollfd[vwp->hpoll].fd == -1)
 			vwp->hpoll--;
-		v = poll(vwp->pollfd, vwp->hpoll + 1, 100);
+		v = poll(vwp->pollfd, vwp->hpoll + 1, -1);
 		assert(v >= 0);
 		v2 = v;
 		now = VTIM_real();
-		deadline = now - *vwp->waiter->tmo;
+		idle = now - *vwp->waiter->tmo;
 		VTAILQ_FOREACH_SAFE(sp, &vwp->waiter->waithead, list, sp2) {
 			if (v != 0 && v2 == 0)
 				break;
@@ -160,7 +160,7 @@ vwp_main(void *priv)
 				vwp->pollfd[fd].revents = 0;
 				Wait_Handle(vwp->waiter, sp, WAITER_ACTION,
 				    now);
-			} else if (sp->deadline <= deadline) {
+			} else if (sp->idle <= idle) {
 				Wait_Handle(vwp->waiter, sp, WAITER_TIMEOUT,
 				    now);
 			}
diff --git a/bin/varnishd/waiter/cache_waiter_ports.c b/bin/varnishd/waiter/cache_waiter_ports.c
index 76dad22..158a624 100644
--- a/bin/varnishd/waiter/cache_waiter_ports.c
+++ b/bin/varnishd/waiter/cache_waiter_ports.c
@@ -154,7 +154,7 @@ vws_thread(void *priv)
 		port_event_t ev[MAX_EVENTS];
 		u_int nevents;
 		int ei, ret;
-		double now, deadline;
+		double now, idle;
 
 		/*
 		 * XXX Do we want to scale this up dynamically to increase
@@ -189,7 +189,7 @@ vws_thread(void *priv)
 			vws_port_ev(vws, ev + ei, now);
 
 		/* check for timeouts */
-		deadline = now - *vws->waiter->tmo;
+		idle = now - *vws->waiter->tmo;
 
 		/*
 		 * This loop assumes that the oldest sessions are always at the
@@ -202,7 +202,7 @@ vws_thread(void *priv)
 			sp = VTAILQ_FIRST(&vws->waiter->waithead);
 			if (sp == NULL)
 				break;
-			if (sp->deadline > deadline) {
+			if (sp->idle > idle) {
 				break;
 			}
 			vws_del(vws, sp->fd);
@@ -214,7 +214,7 @@ vws_thread(void *priv)
 		 */
 
 		if (sp) {
-			double tmo = (sp->deadline + *vws->waiter->tmo) - now;
+			double tmo = (sp->idle + *vws->waiter->tmo) - now;
 
 			if (tmo < min_t) {
 				timeout = &min_ts;
diff --git a/bin/varnishd/waiter/waiter.h b/bin/varnishd/waiter/waiter.h
index 4c41001..ee4b6bc 100644
--- a/bin/varnishd/waiter/waiter.h
+++ b/bin/varnishd/waiter/waiter.h
@@ -59,6 +59,7 @@ typedef void waiter_handle_f(struct waited *, enum wait_event, double now);
 int Wait_Enter(const struct waiter *, struct waited *);
 struct waiter *Wait_New(waiter_handle_f *, volatile double *timeout);
 const char *Wait_GetName(void);
+void Wait_Init(void);
 
 /* mgt_waiter.c */
 int Wait_Argument(struct vsb *vsb, const char *arg);
diff --git a/bin/varnishd/waiter/waiter_priv.h b/bin/varnishd/waiter/waiter_priv.h
index 336328d..ba0d0e5 100644
--- a/bin/varnishd/waiter/waiter_priv.h
+++ b/bin/varnishd/waiter/waiter_priv.h
@@ -35,6 +35,8 @@ struct waiter {
 	unsigned			magic;
 	#define WAITER_MAGIC		0x17c399db
 	const struct waiter_impl	*impl;
+	VTAILQ_ENTRY(waiter)		list;
+
 	waiter_handle_f *		func;
 
 	int				pipes[2];
@@ -44,7 +46,6 @@ struct waiter {
 	VTAILQ_HEAD(,waited)		waithead;
 
 	void				*priv;
-	int				pfd;
 };
 
 typedef void waiter_init_f(struct waiter *);
diff --git a/include/tbl/locks.h b/include/tbl/locks.h
index 1a01782..e4b4914 100644
--- a/include/tbl/locks.h
+++ b/include/tbl/locks.h
@@ -54,4 +54,5 @@ LOCK(busyobj)
 LOCK(mempool)
 LOCK(vxid)
 LOCK(pipestat)
+LOCK(misc)
 /*lint -restore */



More information about the varnish-commit mailing list