[master] f71729a Move pool stuff related to the workthreads of each pool to cache_wrk.c, but retain stats stuff (since it runs across pools) in cache_pool.c

Poul-Henning Kamp phk at FreeBSD.org
Tue May 26 22:36:43 CEST 2015


commit f71729aec2aaacf280ddeafe7891f4103393cd14
Author: Poul-Henning Kamp <phk at FreeBSD.org>
Date:   Tue May 26 19:59:42 2015 +0000

    Move pool stuff related to the workthreads of each pool to cache_wrk.c,
    but retain stats stuff (since it runs across pools) in cache_pool.c

diff --git a/bin/varnishd/cache/cache.h b/bin/varnishd/cache/cache.h
index ba45bf2..bb3c61d 100644
--- a/bin/varnishd/cache/cache.h
+++ b/bin/varnishd/cache/cache.h
@@ -962,7 +962,6 @@ const char *sess_close_2str(enum sess_close sc, int want_desc);
 
 /* cache_pool.c */
 void Pool_Init(void);
-void Pool_Work_Thread(struct pool *, struct worker *w);
 int Pool_Task(struct pool *pp, struct pool_task *task, enum pool_how how);
 int Pool_Task_Arg(struct worker *, task_func_t *,
     const void *arg, size_t arg_len);
@@ -1110,7 +1109,6 @@ void VMOD_Init(void);
 
 /* cache_wrk.c */
 
-void WRK_Thread(struct pool *qp, size_t stacksize, unsigned thread_workspace);
 typedef void *bgthread_t(struct worker *, void *priv);
 void WRK_BgThread(pthread_t *thr, const char *name, bgthread_t *func,
     void *priv);
diff --git a/bin/varnishd/cache/cache_pool.c b/bin/varnishd/cache/cache_pool.c
index 730966c..f71980d 100644
--- a/bin/varnishd/cache/cache_pool.c
+++ b/bin/varnishd/cache/cache_pool.c
@@ -41,12 +41,10 @@
 #include "cache.h"
 #include "cache_pool.h"
 
-#include "vtim.h"
-
-static struct lock		pool_mtx;
 static pthread_t		thr_pool_herder;
 
 static struct lock		wstat_mtx;
+struct lock			pool_mtx;
 
 /*--------------------------------------------------------------------
  * Summing of stats into global stats counters
@@ -88,25 +86,6 @@ Pool_TrySumstat(struct worker *wrk)
 }
 
 /*--------------------------------------------------------------------
- * Summing of stats into pool counters
- */
-
-static void
-pool_addstat(struct dstat *dst, struct dstat *src)
-{
-
-	dst->summs++;
-#define L0(n)
-#define L1(n) (dst->n += src->n)
-#define VSC_F(n,t,l,s,f,v,d,e)	L##l(n);
-#include "tbl/vsc_f_main.h"
-#undef VSC_F
-#undef L0
-#undef L1
-	memset(src, 0, sizeof *src);
-}
-
-/*--------------------------------------------------------------------
  * Helper function to update stats for purges under lock
  */
 
@@ -119,145 +98,11 @@ Pool_PurgeStat(unsigned nobj)
 	Lck_Unlock(&wstat_mtx);
 }
 
-
-/*--------------------------------------------------------------------
- */
-
-static struct worker *
-pool_getidleworker(struct pool *pp)
-{
-	struct pool_task *pt;
-	struct worker *wrk;
-
-	CHECK_OBJ_NOTNULL(pp, POOL_MAGIC);
-	Lck_AssertHeld(&pp->mtx);
-	pt = VTAILQ_FIRST(&pp->idle_queue);
-	if (pt == NULL) {
-		if (pp->nthr < cache_param->wthread_max) {
-			pp->dry++;
-			AZ(pthread_cond_signal(&pp->herder_cond));
-		}
-		return (NULL);
-	}
-	AZ(pt->func);
-	CAST_OBJ_NOTNULL(wrk, pt->priv, WORKER_MAGIC);
-	return (wrk);
-}
-
-/*--------------------------------------------------------------------
- * Special scheduling:  If no thread can be found, the current thread
- * will be prepared for rescheduling instead.
- * The selected threads workspace is reserved and the argument put there.
- * Return one if another thread was scheduled, otherwise zero.
- */
-
-int
-Pool_Task_Arg(struct worker *wrk, task_func_t *func,
-    const void *arg, size_t arg_len)
-{
-	struct pool *pp;
-	struct worker *wrk2;
-	int retval;
-
-	CHECK_OBJ_NOTNULL(wrk, WORKER_MAGIC);
-	AN(arg);
-	AN(arg_len);
-	pp = wrk->pool;
-	CHECK_OBJ_NOTNULL(pp, POOL_MAGIC);
-
-	Lck_Lock(&pp->mtx);
-	wrk2 = pool_getidleworker(pp);
-	if (wrk2 != NULL) {
-		VTAILQ_REMOVE(&pp->idle_queue, &wrk2->task, list);
-		retval = 1;
-	} else {
-		wrk2 = wrk;
-		retval = 0;
-	}
-	Lck_Unlock(&pp->mtx);
-	AZ(wrk2->task.func);
-
-	assert(arg_len <= WS_Reserve(wrk2->aws, arg_len));
-	memcpy(wrk2->aws->f, arg, arg_len);
-	wrk2->task.func = func;
-	wrk2->task.priv = wrk2->aws->f;
-	if (retval)
-		AZ(pthread_cond_signal(&wrk2->cond));
-	return (retval);
-}
-
-/*--------------------------------------------------------------------
- * Enter a new task to be done
- */
-
-int
-Pool_Task(struct pool *pp, struct pool_task *task, enum pool_how how)
-{
-	struct worker *wrk;
-	int retval = 0;
-
-	CHECK_OBJ_NOTNULL(pp, POOL_MAGIC);
-	AN(task);
-	AN(task->func);
-
-	Lck_Lock(&pp->mtx);
-
-	/*
-	 * The common case first:  Take an idle thread, do it.
-	 */
-
-	wrk = pool_getidleworker(pp);
-	if (wrk != NULL) {
-		VTAILQ_REMOVE(&pp->idle_queue, &wrk->task, list);
-		AZ(wrk->task.func);
-		wrk->task.func = task->func;
-		wrk->task.priv = task->priv;
-		Lck_Unlock(&pp->mtx);
-		AZ(pthread_cond_signal(&wrk->cond));
-		return (0);
-	}
-
-	switch (how) {
-	case POOL_NO_QUEUE:
-		retval = -1;
-		break;
-	case POOL_QUEUE_FRONT:
-		/* If we have too much in the queue already, refuse. */
-		if (pp->lqueue > cache_param->wthread_queue_limit) {
-			pp->ndropped++;
-			retval = -1;
-		} else {
-			VTAILQ_INSERT_TAIL(&pp->front_queue, task, list);
-			pp->nqueued++;
-			pp->lqueue++;
-		}
-		break;
-	case POOL_QUEUE_BACK:
-		VTAILQ_INSERT_TAIL(&pp->back_queue, task, list);
-		break;
-	default:
-		WRONG("Unknown enum pool_how");
-	}
-	Lck_Unlock(&pp->mtx);
-	return (retval);
-}
-
-/*--------------------------------------------------------------------
- * Empty function used as a pointer value for the thread exit condition.
- */
-
-static void __match_proto__(task_func_t)
-pool_kiss_of_death(struct worker *wrk, void *priv)
-{
-	(void)wrk;
-	(void)priv;
-}
-
 /*--------------------------------------------------------------------
  * Special function to summ stats
  */
 
-static void __match_proto__(task_func_t)
+void __match_proto__(task_func_t)
 pool_stat_summ(struct worker *wrk, void *priv)
 {
 	struct dstat *src;
@@ -274,235 +119,6 @@ pool_stat_summ(struct worker *wrk, void *priv)
 }
 
 /*--------------------------------------------------------------------
- * This is the work function for worker threads in the pool.
- */
-
-void
-Pool_Work_Thread(struct pool *pp, struct worker *wrk)
-{
-	struct pool_task *tp;
-	struct pool_task tpx, tps;
-	int i;
-
-	CHECK_OBJ_NOTNULL(pp, POOL_MAGIC);
-	wrk->pool = pp;
-	while (1) {
-		Lck_Lock(&pp->mtx);
-
-		CHECK_OBJ_NOTNULL(wrk, WORKER_MAGIC);
-
-		WS_Reset(wrk->aws, NULL);
-		AZ(wrk->vsl);
-
-		tp = VTAILQ_FIRST(&pp->front_queue);
-		if (tp != NULL) {
-			pp->lqueue--;
-			VTAILQ_REMOVE(&pp->front_queue, tp, list);
-		} else {
-			tp = VTAILQ_FIRST(&pp->back_queue);
-			if (tp != NULL)
-				VTAILQ_REMOVE(&pp->back_queue, tp, list);
-		}
-
-		if ((tp == NULL && wrk->stats->summs > 0) ||
-		    (wrk->stats->summs >= cache_param->wthread_stats_rate))
-			pool_addstat(pp->a_stat, wrk->stats);
-
-		if (tp != NULL) {
-			wrk->stats->summs++;
-		} else if (pp->b_stat != NULL && pp->a_stat->summs) {
-			/* Nothing to do, push pool stats into global pool */
-			tps.func = pool_stat_summ;
-			tps.priv = pp->a_stat;
-			pp->a_stat = pp->b_stat;
-			pp->b_stat = NULL;
-			tp = &tps;
-		} else {
-			/* Nothing to do: To sleep, perchance to dream ... */
-			if (isnan(wrk->lastused))
-				wrk->lastused = VTIM_real();
-			wrk->task.func = NULL;
-			wrk->task.priv = wrk;
-			VTAILQ_INSERT_HEAD(&pp->idle_queue, &wrk->task, list);
-			do {
-				i = Lck_CondWait(&wrk->cond, &pp->mtx,
-				    wrk->vcl == NULL ?  0 : wrk->lastused+60.);
-				if (i == ETIMEDOUT)
-					VCL_Rel(&wrk->vcl);
-			} while (wrk->task.func == NULL);
-			tpx = wrk->task;
-			tp = &tpx;
-			wrk->stats->summs++;
-		}
-		Lck_Unlock(&pp->mtx);
-
-		if (tp->func == pool_kiss_of_death)
-			break;
-
-		do {
-			memset(&wrk->task, 0, sizeof wrk->task);
-			assert(wrk->pool == pp);
-			tp->func(wrk, tp->priv);
-			tpx = wrk->task;
-			tp = &tpx;
-		} while (tp->func != NULL);
-
-		/* cleanup for next task */
-		wrk->seen_methods = 0;
-	}
-	wrk->pool = NULL;
-}
-
-/*--------------------------------------------------------------------
- * Create another worker thread.
- */
-
-struct pool_info {
-	unsigned		magic;
-#define POOL_INFO_MAGIC		0x4e4442d3
-	size_t			stacksize;
-	struct pool		*qp;
-};
-
-static void *
-pool_thread(void *priv)
-{
-	struct pool_info *pi;
-
-	CAST_OBJ_NOTNULL(pi, priv, POOL_INFO_MAGIC);
-	WRK_Thread(pi->qp, pi->stacksize, cache_param->workspace_thread);
-	FREE_OBJ(pi);
-	return (NULL);
-}
-
-static void
-pool_breed(struct pool *qp)
-{
-	pthread_t tp;
-	pthread_attr_t tp_attr;
-	struct pool_info *pi;
-
-	AZ(pthread_attr_init(&tp_attr));
-	AZ(pthread_attr_setdetachstate(&tp_attr, PTHREAD_CREATE_DETACHED));
-
-	/* Set the stacksize for worker threads we create */
-	if (cache_param->wthread_stacksize != UINT_MAX)
-		AZ(pthread_attr_setstacksize(&tp_attr,
-		    cache_param->wthread_stacksize));
-
-	ALLOC_OBJ(pi, POOL_INFO_MAGIC);
-	AN(pi);
-	AZ(pthread_attr_getstacksize(&tp_attr, &pi->stacksize));
-	pi->qp = qp;
-
-	if (pthread_create(&tp, &tp_attr, pool_thread, pi)) {
-		VSL(SLT_Debug, 0, "Create worker thread failed %d %s",
-		    errno, strerror(errno));
-		Lck_Lock(&pool_mtx);
-		VSC_C_main->threads_failed++;
-		Lck_Unlock(&pool_mtx);
-		VTIM_sleep(cache_param->wthread_fail_delay);
-	} else {
-		qp->dry = 0;
-		qp->nthr++;
-		Lck_Lock(&pool_mtx);
-		VSC_C_main->threads++;
-		VSC_C_main->threads_created++;
-		Lck_Unlock(&pool_mtx);
-		VTIM_sleep(cache_param->wthread_add_delay);
-	}
-
-	AZ(pthread_attr_destroy(&tp_attr));
-}
-
-/*--------------------------------------------------------------------
- * Herd a single pool
- *
- * This thread wakes up whenever a pool queues.
- *
- * The trick here is to not be too aggressive about creating threads.
- * We do this by only examining one pool at a time, and by sleeping
- * a short while whenever we create a thread and a little while longer
- * whenever we fail to, hopefully missing a lot of cond_signals in
- * the meantime.
- *
- * XXX: probably need a lot more work.
- *
- */
-
-static void*
-pool_herder(void *priv)
-{
-	struct pool *pp;
-	struct pool_task *pt;
-	double t_idle;
-	struct worker *wrk;
-
-	CAST_OBJ_NOTNULL(pp, priv, POOL_MAGIC);
-
-	while (1) {
-		/* Make more threads if needed and allowed */
-		if (pp->nthr < cache_param->wthread_min ||
-		    (pp->dry && pp->nthr < cache_param->wthread_max)) {
-			pool_breed(pp);
-			continue;
-		}
-		assert(pp->nthr >= cache_param->wthread_min);
-
-		if (pp->nthr > cache_param->wthread_min) {
-
-			t_idle = VTIM_real() - cache_param->wthread_timeout;
-
-			Lck_Lock(&pp->mtx);
-			/* XXX: unsafe counters */
-			VSC_C_main->sess_queued += pp->nqueued;
-			VSC_C_main->sess_dropped += pp->ndropped;
-			pp->nqueued = pp->ndropped = 0;
-
-			wrk = NULL;
-			pt = VTAILQ_LAST(&pp->idle_queue, taskhead);
-			if (pt != NULL) {
-				AZ(pt->func);
-				CAST_OBJ_NOTNULL(wrk, pt->priv, WORKER_MAGIC);
-
-				if (wrk->lastused < t_idle ||
-				    pp->nthr > cache_param->wthread_max) {
-					/* Give it a kiss on the cheek... */
-					VTAILQ_REMOVE(&pp->idle_queue,
-					    &wrk->task, list);
-					wrk->task.func = pool_kiss_of_death;
-					AZ(pthread_cond_signal(&wrk->cond));
-				} else
-					wrk = NULL;
-			}
-			Lck_Unlock(&pp->mtx);
-
-			if (wrk != NULL) {
-				pp->nthr--;
-				Lck_Lock(&pool_mtx);
-				VSC_C_main->threads--;
-				VSC_C_main->threads_destroyed++;
-				Lck_Unlock(&pool_mtx);
-				VTIM_sleep(cache_param->wthread_destroy_delay);
-				continue;
-			}
-		}
-
-		Lck_Lock(&pp->mtx);
-		if (!pp->dry) {
-			(void)Lck_CondWait(&pp->herder_cond, &pp->mtx,
-				VTIM_real() + 5);
-		} else {
-			/* XXX: unsafe counters */
-			VSC_C_main->threads_limited++;
-			pp->dry = 0;
-		}
-		Lck_Unlock(&pp->mtx);
-	}
-	NEEDLESS_RETURN(NULL);
-}
-
-/*--------------------------------------------------------------------
  * Add a thread pool
  */
 
@@ -538,6 +154,9 @@ pool_mkpool(unsigned pool_no)
 /*--------------------------------------------------------------------
  * This thread adjusts the number of pools to match the parameter.
  *
+ * NB: This is quite silly.  The master should tell the child through
+ * NB: CLI when parameters change and an appropriate call-out table
+ * NB: be maintained for params which require action.
  */
 
 static void *
diff --git a/bin/varnishd/cache/cache_pool.h b/bin/varnishd/cache/cache_pool.h
index 7660d7a..76acc08 100644
--- a/bin/varnishd/cache/cache_pool.h
+++ b/bin/varnishd/cache/cache_pool.h
@@ -54,3 +54,7 @@ struct pool {
 	struct dstat			*a_stat;
 	struct dstat			*b_stat;
 };
+
+void *pool_herder(void*);
+task_func_t pool_stat_summ;
+extern struct lock			pool_mtx;
diff --git a/bin/varnishd/cache/cache_wrk.c b/bin/varnishd/cache/cache_wrk.c
index c793176..e95b1eb 100644
--- a/bin/varnishd/cache/cache_wrk.c
+++ b/bin/varnishd/cache/cache_wrk.c
@@ -31,11 +31,18 @@
 
 #include "config.h"
 
+#include <errno.h>
 #include <stdlib.h>
 
 #include "cache.h"
+#include "cache_pool.h"
+
+#include "vtim.h"
 
 #include "hash/hash_slinger.h"
+
+static void Pool_Work_Thread(struct pool *pp, struct worker *wrk);
+
 /*--------------------------------------------------------------------
  * Create and starte a back-ground thread which as its own worker and
  * session data structures;
@@ -82,7 +89,7 @@ WRK_BgThread(pthread_t *thr, const char *name, bgthread_t *func, void *priv)
 
 /*--------------------------------------------------------------------*/
 
-void
+static void
 WRK_Thread(struct pool *qp, size_t stacksize, unsigned thread_workspace)
 {
 	struct worker *w, ww;
@@ -123,3 +130,384 @@ WRK_Thread(struct pool *qp, size_t stacksize, unsigned thread_workspace)
 	HSH_Cleanup(w);
 	Pool_Sumstat(w);
 }
+
+/*--------------------------------------------------------------------
+ * Summing of stats into pool counters
+ */
+
+static void
+pool_addstat(struct dstat *dst, struct dstat *src)
+{
+
+	dst->summs++;
+#define L0(n)
+#define L1(n) (dst->n += src->n)
+#define VSC_F(n,t,l,s,f,v,d,e)	L##l(n);
+#include "tbl/vsc_f_main.h"
+#undef VSC_F
+#undef L0
+#undef L1
+	memset(src, 0, sizeof *src);
+}
+
+/*--------------------------------------------------------------------*/
+
+static struct worker *
+pool_getidleworker(struct pool *pp)
+{
+	struct pool_task *pt;
+	struct worker *wrk;
+
+	CHECK_OBJ_NOTNULL(pp, POOL_MAGIC);
+	Lck_AssertHeld(&pp->mtx);
+	pt = VTAILQ_FIRST(&pp->idle_queue);
+	if (pt == NULL) {
+		if (pp->nthr < cache_param->wthread_max) {
+			pp->dry++;
+			AZ(pthread_cond_signal(&pp->herder_cond));
+		}
+		return (NULL);
+	}
+	AZ(pt->func);
+	CAST_OBJ_NOTNULL(wrk, pt->priv, WORKER_MAGIC);
+	return (wrk);
+}
+
+/*--------------------------------------------------------------------
+ * Special scheduling:  If no thread can be found, the current thread
+ * will be prepared for rescheduling instead.
+ * The selected threads workspace is reserved and the argument put there.
+ * Return one if another thread was scheduled, otherwise zero.
+ */
+
+int
+Pool_Task_Arg(struct worker *wrk, task_func_t *func,
+    const void *arg, size_t arg_len)
+{
+	struct pool *pp;
+	struct worker *wrk2;
+	int retval;
+
+	CHECK_OBJ_NOTNULL(wrk, WORKER_MAGIC);
+	AN(arg);
+	AN(arg_len);
+	pp = wrk->pool;
+	CHECK_OBJ_NOTNULL(pp, POOL_MAGIC);
+
+	Lck_Lock(&pp->mtx);
+	wrk2 = pool_getidleworker(pp);
+	if (wrk2 != NULL) {
+		VTAILQ_REMOVE(&pp->idle_queue, &wrk2->task, list);
+		retval = 1;
+	} else {
+		wrk2 = wrk;
+		retval = 0;
+	}
+	Lck_Unlock(&pp->mtx);
+	AZ(wrk2->task.func);
+
+	assert(arg_len <= WS_Reserve(wrk2->aws, arg_len));
+	memcpy(wrk2->aws->f, arg, arg_len);
+	wrk2->task.func = func;
+	wrk2->task.priv = wrk2->aws->f;
+	if (retval)
+		AZ(pthread_cond_signal(&wrk2->cond));
+	return (retval);
+}
+
+/*--------------------------------------------------------------------
+ * Enter a new task to be done
+ */
+
+int
+Pool_Task(struct pool *pp, struct pool_task *task, enum pool_how how)
+{
+	struct worker *wrk;
+	int retval = 0;
+
+	CHECK_OBJ_NOTNULL(pp, POOL_MAGIC);
+	AN(task);
+	AN(task->func);
+
+	Lck_Lock(&pp->mtx);
+
+	/*
+	 * The common case first:  Take an idle thread, do it.
+	 */
+
+	wrk = pool_getidleworker(pp);
+	if (wrk != NULL) {
+		VTAILQ_REMOVE(&pp->idle_queue, &wrk->task, list);
+		AZ(wrk->task.func);
+		wrk->task.func = task->func;
+		wrk->task.priv = task->priv;
+		Lck_Unlock(&pp->mtx);
+		AZ(pthread_cond_signal(&wrk->cond));
+		return (0);
+	}
+
+	switch (how) {
+	case POOL_NO_QUEUE:
+		retval = -1;
+		break;
+	case POOL_QUEUE_FRONT:
+		/* If we have too much in the queue already, refuse. */
+		if (pp->lqueue > cache_param->wthread_queue_limit) {
+			pp->ndropped++;
+			retval = -1;
+		} else {
+			VTAILQ_INSERT_TAIL(&pp->front_queue, task, list);
+			pp->nqueued++;
+			pp->lqueue++;
+		}
+		break;
+	case POOL_QUEUE_BACK:
+		VTAILQ_INSERT_TAIL(&pp->back_queue, task, list);
+		break;
+	default:
+		WRONG("Unknown enum pool_how");
+	}
+	Lck_Unlock(&pp->mtx);
+	return (retval);
+}
+
+/*--------------------------------------------------------------------
+ * Empty function used as a pointer value for the thread exit condition.
+ */
+
+static void __match_proto__(task_func_t)
+pool_kiss_of_death(struct worker *wrk, void *priv)
+{
+	(void)wrk;
+	(void)priv;
+}
+
+
+/*--------------------------------------------------------------------
+ * This is the work function for worker threads in the pool.
+ */
+
+static void
+Pool_Work_Thread(struct pool *pp, struct worker *wrk)
+{
+	struct pool_task *tp;
+	struct pool_task tpx, tps;
+	int i;
+
+	CHECK_OBJ_NOTNULL(pp, POOL_MAGIC);
+	wrk->pool = pp;
+	while (1) {
+		Lck_Lock(&pp->mtx);
+
+		CHECK_OBJ_NOTNULL(wrk, WORKER_MAGIC);
+
+		WS_Reset(wrk->aws, NULL);
+		AZ(wrk->vsl);
+
+		tp = VTAILQ_FIRST(&pp->front_queue);
+		if (tp != NULL) {
+			pp->lqueue--;
+			VTAILQ_REMOVE(&pp->front_queue, tp, list);
+		} else {
+			tp = VTAILQ_FIRST(&pp->back_queue);
+			if (tp != NULL)
+				VTAILQ_REMOVE(&pp->back_queue, tp, list);
+		}
+
+		if ((tp == NULL && wrk->stats->summs > 0) ||
+		    (wrk->stats->summs >= cache_param->wthread_stats_rate))
+			pool_addstat(pp->a_stat, wrk->stats);
+
+		if (tp != NULL) {
+			wrk->stats->summs++;
+		} else if (pp->b_stat != NULL && pp->a_stat->summs) {
+			/* Nothing to do, push pool stats into global pool */
+			tps.func = pool_stat_summ;
+			tps.priv = pp->a_stat;
+			pp->a_stat = pp->b_stat;
+			pp->b_stat = NULL;
+			tp = &tps;
+		} else {
+			/* Nothing to do: To sleep, perchance to dream ... */
+			if (isnan(wrk->lastused))
+				wrk->lastused = VTIM_real();
+			wrk->task.func = NULL;
+			wrk->task.priv = wrk;
+			VTAILQ_INSERT_HEAD(&pp->idle_queue, &wrk->task, list);
+			do {
+				i = Lck_CondWait(&wrk->cond, &pp->mtx,
+				    wrk->vcl == NULL ?  0 : wrk->lastused+60.);
+				if (i == ETIMEDOUT)
+					VCL_Rel(&wrk->vcl);
+			} while (wrk->task.func == NULL);
+			tpx = wrk->task;
+			tp = &tpx;
+			wrk->stats->summs++;
+		}
+		Lck_Unlock(&pp->mtx);
+
+		if (tp->func == pool_kiss_of_death)
+			break;
+
+		do {
+			memset(&wrk->task, 0, sizeof wrk->task);
+			assert(wrk->pool == pp);
+			tp->func(wrk, tp->priv);
+			tpx = wrk->task;
+			tp = &tpx;
+		} while (tp->func != NULL);
+
+		/* cleanup for next task */
+		wrk->seen_methods = 0;
+	}
+	wrk->pool = NULL;
+}
+
+/*--------------------------------------------------------------------
+ * Create another worker thread.
+ */
+
+struct pool_info {
+	unsigned		magic;
+#define POOL_INFO_MAGIC		0x4e4442d3
+	size_t			stacksize;
+	struct pool		*qp;
+};
+
+static void *
+pool_thread(void *priv)
+{
+	struct pool_info *pi;
+
+	CAST_OBJ_NOTNULL(pi, priv, POOL_INFO_MAGIC);
+	WRK_Thread(pi->qp, pi->stacksize, cache_param->workspace_thread);
+	FREE_OBJ(pi);
+	return (NULL);
+}
+
+static void
+pool_breed(struct pool *qp)
+{
+	pthread_t tp;
+	pthread_attr_t tp_attr;
+	struct pool_info *pi;
+
+	AZ(pthread_attr_init(&tp_attr));
+	AZ(pthread_attr_setdetachstate(&tp_attr, PTHREAD_CREATE_DETACHED));
+
+	/* Set the stacksize for worker threads we create */
+	if (cache_param->wthread_stacksize != UINT_MAX)
+		AZ(pthread_attr_setstacksize(&tp_attr,
+		    cache_param->wthread_stacksize));
+
+	ALLOC_OBJ(pi, POOL_INFO_MAGIC);
+	AN(pi);
+	AZ(pthread_attr_getstacksize(&tp_attr, &pi->stacksize));
+	pi->qp = qp;
+
+	if (pthread_create(&tp, &tp_attr, pool_thread, pi)) {
+		VSL(SLT_Debug, 0, "Create worker thread failed %d %s",
+		    errno, strerror(errno));
+		Lck_Lock(&pool_mtx);
+		VSC_C_main->threads_failed++;
+		Lck_Unlock(&pool_mtx);
+		VTIM_sleep(cache_param->wthread_fail_delay);
+	} else {
+		qp->dry = 0;
+		qp->nthr++;
+		Lck_Lock(&pool_mtx);
+		VSC_C_main->threads++;
+		VSC_C_main->threads_created++;
+		Lck_Unlock(&pool_mtx);
+		VTIM_sleep(cache_param->wthread_add_delay);
+	}
+
+	AZ(pthread_attr_destroy(&tp_attr));
+}
+
+/*--------------------------------------------------------------------
+ * Herd a single pool
+ *
+ * This thread wakes up whenever a pool queues.
+ *
+ * The trick here is to not be too aggressive about creating threads.
+ * We do this by only examining one pool at a time, and by sleeping
+ * a short while whenever we create a thread and a little while longer
+ * whenever we fail to, hopefully missing a lot of cond_signals in
+ * the meantime.
+ *
+ * XXX: probably need a lot more work.
+ *
+ */
+
+void*
+pool_herder(void *priv)
+{
+	struct pool *pp;
+	struct pool_task *pt;
+	double t_idle;
+	struct worker *wrk;
+
+	CAST_OBJ_NOTNULL(pp, priv, POOL_MAGIC);
+
+	while (1) {
+		/* Make more threads if needed and allowed */
+		if (pp->nthr < cache_param->wthread_min ||
+		    (pp->dry && pp->nthr < cache_param->wthread_max)) {
+			pool_breed(pp);
+			continue;
+		}
+		assert(pp->nthr >= cache_param->wthread_min);
+
+		if (pp->nthr > cache_param->wthread_min) {
+
+			t_idle = VTIM_real() - cache_param->wthread_timeout;
+
+			Lck_Lock(&pp->mtx);
+			/* XXX: unsafe counters */
+			VSC_C_main->sess_queued += pp->nqueued;
+			VSC_C_main->sess_dropped += pp->ndropped;
+			pp->nqueued = pp->ndropped = 0;
+
+			wrk = NULL;
+			pt = VTAILQ_LAST(&pp->idle_queue, taskhead);
+			if (pt != NULL) {
+				AZ(pt->func);
+				CAST_OBJ_NOTNULL(wrk, pt->priv, WORKER_MAGIC);
+
+				if (wrk->lastused < t_idle ||
+				    pp->nthr > cache_param->wthread_max) {
+					/* Give it a kiss on the cheek... */
+					VTAILQ_REMOVE(&pp->idle_queue,
+					    &wrk->task, list);
+					wrk->task.func = pool_kiss_of_death;
+					AZ(pthread_cond_signal(&wrk->cond));
+				} else
+					wrk = NULL;
+			}
+			Lck_Unlock(&pp->mtx);
+
+			if (wrk != NULL) {
+				pp->nthr--;
+				Lck_Lock(&pool_mtx);
+				VSC_C_main->threads--;
+				VSC_C_main->threads_destroyed++;
+				Lck_Unlock(&pool_mtx);
+				VTIM_sleep(cache_param->wthread_destroy_delay);
+				continue;
+			}
+		}
+
+		Lck_Lock(&pp->mtx);
+		if (!pp->dry) {
+			(void)Lck_CondWait(&pp->herder_cond, &pp->mtx,
+				VTIM_real() + 5);
+		} else {
+			/* XXX: unsafe counters */
+			VSC_C_main->threads_limited++;
+			pp->dry = 0;
+		}
+		Lck_Unlock(&pp->mtx);
+	}
+	NEEDLESS_RETURN(NULL);
+}



More information about the varnish-commit mailing list