[master] 35cf781 Simplify vsl segment management, fixing spurious vsl overruns

Nils Goroll nils.goroll at uplex.de
Fri Jun 26 14:38:38 CEST 2015


commit 35cf78131c2aaaf8227c31ba92eb577c5bea9855
Author: Nils Goroll <nils.goroll at uplex.de>
Date:   Mon Jun 15 12:47:04 2015 +0200

    Simplify vsl segment management, fixing spurious vsl overruns
    
    vsl sequence and segment updates didn't happen atomically, so
    vslc_vsm_check could report spurious overruns.
    
    Replace sequence and segment index with a counter (segment_n), which
    always increments (with overflow after UINT_MAX). The actual segment
    index will be segment_n % VSL_SEGMENTS. Overrun detection by
    calculation of the difference between two segment numbers becomes
    simple and safe because we only ever access/update a single integer.
    
    Update the shared memory log head.
    
    (struct VSLC_ptr).priv is now the the equivalent of segment_n from the
    reader side. It gets initialized once and is maintained independently.
    
    Patch prepared in collaboration with Martin Blix Grydeland
    <martin at varnish-software.com>
    
    Fixes: #1747

diff --git a/bin/varnishd/cache/cache_shmlog.c b/bin/varnishd/cache/cache_shmlog.c
index 2388969..3566d53 100644
--- a/bin/varnishd/cache/cache_shmlog.c
+++ b/bin/varnishd/cache/cache_shmlog.c
@@ -47,9 +47,8 @@ static pthread_mutex_t vsm_mtx;
 static struct VSL_head		*vsl_head;
 static const uint32_t		*vsl_end;
 static uint32_t			*vsl_ptr;
-static unsigned			vsl_segment;
+static unsigned			vsl_segment_n;
 static ssize_t			vsl_segsize;
-static unsigned			vsl_seq;
 
 struct VSC_C_main       *VSC_C_main;
 
@@ -108,19 +107,16 @@ vsl_wrap(void)
 
 	assert(vsl_ptr >= vsl_head->log);
 	assert(vsl_ptr < vsl_end);
+	vsl_segment_n += VSL_SEGMENTS - (vsl_segment_n % VSL_SEGMENTS);
+	assert(vsl_segment_n % VSL_SEGMENTS == 0);
+	vsl_head->offset[0] = 0;
 	vsl_head->log[0] = VSL_ENDMARKER;
-	do
-		vsl_seq++;
-	while (vsl_seq == 0);
-	vsl_head->seq = vsl_seq;
-	vsl_head->segments[0] = 0;
 	VWMB();
 	if (vsl_ptr != vsl_head->log) {
 		*vsl_ptr = VSL_WRAPMARKER;
 		vsl_ptr = vsl_head->log;
 	}
-	vsl_segment = 0;
-	vsl_head->segment = vsl_segment;
+	vsl_head->segment_n = vsl_segment_n;
 	VSC_C_main->shm_cycles++;
 }
 
@@ -133,7 +129,6 @@ vsl_get(unsigned len, unsigned records, unsigned flushes)
 {
 	uint32_t *p;
 	int err;
-	unsigned old_segment;
 
 	err = pthread_mutex_trylock(&vsl_mtx);
 	if (err == EBUSY) {
@@ -160,21 +155,17 @@ vsl_get(unsigned len, unsigned records, unsigned flushes)
 
 	*vsl_ptr = VSL_ENDMARKER;
 
-	old_segment = vsl_segment;
-	while ((vsl_ptr - vsl_head->log) / vsl_segsize > vsl_segment) {
-		if (vsl_segment == VSL_SEGMENTS - 1)
-			break;
-		vsl_segment++;
-		vsl_head->segments[vsl_segment] = vsl_ptr - vsl_head->log;
-	}
-	if (old_segment != vsl_segment) {
-		/* Write memory barrier to ensure ENDMARKER and new table
-		   values are seen before new segment number */
-		VWMB();
-		vsl_head->segment = vsl_segment;
+	while ((vsl_ptr - vsl_head->log) / vsl_segsize >
+	    vsl_segment_n % VSL_SEGMENTS) {
+		vsl_segment_n++;
+		vsl_head->offset[vsl_segment_n % VSL_SEGMENTS] =
+		    vsl_ptr - vsl_head->log;
 	}
 
 	AZ(pthread_mutex_unlock(&vsl_mtx));
+	/* Implicit VWMB() in mutex op ensures ENDMARKER and new table
+	   values are seen before new segment number */
+	vsl_head->segment_n = vsl_segment_n;
 
 	return (p);
 }
@@ -458,29 +449,31 @@ VSM_Init(void)
 	int i;
 	pthread_t tp;
 
+	assert(UINT_MAX % VSL_SEGMENTS == VSL_SEGMENTS - 1);
+
 	AZ(pthread_mutex_init(&vsl_mtx, NULL));
 	AZ(pthread_mutex_init(&vsm_mtx, NULL));
 
 	vsl_head = VSM_Alloc(cache_param->vsl_space, VSL_CLASS, "", "");
 	AN(vsl_head);
-	vsl_end = vsl_head->log +
-	    (cache_param->vsl_space - sizeof *vsl_head) / sizeof *vsl_end;
-	vsl_segsize = (vsl_end - vsl_head->log) / VSL_SEGMENTS;
-
-	memset(vsl_head, 0, sizeof *vsl_head);
-	memcpy(vsl_head->marker, VSL_HEAD_MARKER, sizeof vsl_head->marker);
-	vsl_head->segments[0] = 0;
-	for (i = 1; i < VSL_SEGMENTS; i++)
-		vsl_head->segments[i] = -1;
+	vsl_segsize = (cache_param->vsl_space - sizeof *vsl_head) /
+	    sizeof *vsl_end / VSL_SEGMENTS;
+	vsl_end = vsl_head->log + vsl_segsize * VSL_SEGMENTS;
+	/* Make segment_n always overflow on first log wrap to make any
+	   problems with regard to readers on that event visible */
+	vsl_segment_n = 0U - VSL_SEGMENTS;
+	AZ(vsl_segment_n % VSL_SEGMENTS);
+	vsl_head->segment_n = vsl_segment_n;
 	vsl_ptr = vsl_head->log;
 	*vsl_ptr = VSL_ENDMARKER;
 
+	memset(vsl_head, 0, sizeof *vsl_head);
+	vsl_head->segsize = vsl_segsize;
+	vsl_head->offset[0] = 0;
+	for (i = 1; i < VSL_SEGMENTS; i++)
+		vsl_head->offset[i] = -1;
 	VWMB();
-	do
-		vsl_seq = random();
-	while (vsl_seq == 0);
-	vsl_head->seq = vsl_seq;
-	VWMB();
+	memcpy(vsl_head->marker, VSL_HEAD_MARKER, sizeof vsl_head->marker);
 
 	VSC_C_main = VSM_Alloc(sizeof *VSC_C_main,
 	    VSC_CLASS, VSC_type_main, "");
diff --git a/include/vapi/vsl_int.h b/include/vapi/vsl_int.h
index e3046b2..15ed9af 100644
--- a/include/vapi/vsl_int.h
+++ b/include/vapi/vsl_int.h
@@ -40,7 +40,7 @@
 #define VAPI_VSL_INT_H_INCLUDED
 
 #define VSL_CLASS		"Log"
-#define VSL_SEGMENTS		8
+#define VSL_SEGMENTS		8U
 
 /*
  * Shared memory log format
diff --git a/include/vsl_priv.h b/include/vsl_priv.h
index d5643ed..0c780dc 100644
--- a/include/vsl_priv.h
+++ b/include/vsl_priv.h
@@ -39,32 +39,30 @@
 #include "vapi/vsm_int.h"
 
 #define VSL_CLASS		"Log"
-#define VSL_SEGMENTS		8
+#define VSL_SEGMENTS		8U	// power of two
 
 /*
  * Shared memory log format
  *
  * The segments array has index values providing safe entry points into
  * the log, where each element N gives the index of the first log record
- * in the Nth fraction of the log. An index value of -1 indicated that no
- * log records in this fraction exists.
+ * in the Nth segment of the log. An index value of -1 indicates that no
+ * log records in this segment exists.
  *
- * The segment member shows the current segment where Varnish is currently
- * appending log data.
- *
- * The seq member contains a non-zero seq number randomly initialized,
- * which increases whenever writing the log starts from the front.
+ * The segment_n member is incremented only, natively wrapping at
+ * UINT_MAX. When taken modulo VSL_SEGMENTS, it gives the current index
+ * into the offset array.
  *
  * The format of the actual log is in vapi/vsl_int.h
  *
  */
 
 struct VSL_head {
-#define VSL_HEAD_MARKER		"VSLHEAD0"	/* Incr. as version# */
+#define VSL_HEAD_MARKER		"VSLHEAD1"	/* Incr. as version# */
 	char			marker[VSM_MARKER_LEN];
-	volatile ssize_t	segments[VSL_SEGMENTS];
-	volatile unsigned	segment;	/* Current varnishd segment */
-	volatile unsigned	seq;		/* Non-zero seq number */
+	ssize_t			segsize;
+	unsigned		segment_n;
+	ssize_t			offset[VSL_SEGMENTS];
 	uint32_t		log[];
 };
 
diff --git a/lib/libvarnishapi/vsl_cursor.c b/lib/libvarnishapi/vsl_cursor.c
index ba6e7d2..b9bbcb3 100644
--- a/lib/libvarnishapi/vsl_cursor.c
+++ b/lib/libvarnishapi/vsl_cursor.c
@@ -42,6 +42,7 @@
 #include "vdef.h"
 #include "vas.h"
 #include "miniobj.h"
+#include "vmb.h"
 
 #include "vqueue.h"
 #include "vre.h"
@@ -66,7 +67,6 @@ struct vslc_vsm {
 
 	const struct VSL_head		*head;
 	const uint32_t			*end;
-	ssize_t				segsize;
 	struct VSLC_ptr			next;
 };
 
@@ -80,11 +80,21 @@ vslc_vsm_delete(const struct VSL_cursor *cursor)
 	FREE_OBJ(c);
 }
 
+/*
+ * We tolerate the fact that segment_n wraps around eventually: for the default
+ * vsl_space of 80MB and 8 segments, each segement is 10MB long, so we wrap
+ * roughly after 40 pebibytes (32bit) or 160 yobibytes (64bit) worth of vsl
+ * written.
+ *
+ * The vsm_check would fail if a vslc paused while this amount of data was
+ * written
+ */
+
 static int
 vslc_vsm_check(const struct VSL_cursor *cursor, const struct VSLC_ptr *ptr)
 {
 	const struct vslc_vsm *c;
-	unsigned seqdiff, segment, segdiff;
+	unsigned dist;
 
 	CAST_OBJ_NOTNULL(c, cursor->priv_data, VSLC_VSM_MAGIC);
 	assert(&c->cursor == cursor);
@@ -92,28 +102,12 @@ vslc_vsm_check(const struct VSL_cursor *cursor, const struct VSLC_ptr *ptr)
 	if (ptr->ptr == NULL)
 		return (0);
 
-	/* Check sequence number */
-	seqdiff = c->head->seq - ptr->priv;
-	if (c->head->seq < ptr->priv)
-		/* Wrap around skips 0 */
-		seqdiff -= 1;
-	if (seqdiff > 1)
-		/* Too late */
-		return (0);
+	dist = c->head->segment_n - ptr->priv;
 
-	/* Check overrun */
-	segment = (ptr->ptr - c->head->log) / c->segsize;
-	if (segment >= VSL_SEGMENTS)
-		/* Rounding error spills to last segment */
-		segment = VSL_SEGMENTS - 1;
-	segdiff = (segment - c->head->segment) % VSL_SEGMENTS;
-	if (segdiff == 0 && seqdiff == 0)
-		/* In same segment, but close to tail */
-		return (2);
-	if (segdiff <= 2)
+	if (dist >= VSL_SEGMENTS - 2)
 		/* Too close to continue */
 		return (0);
-	if (segdiff <= 4)
+	if (dist >= VSL_SEGMENTS - 4)
 		/* Warning level */
 		return (1);
 	/* Safe */
@@ -131,11 +125,6 @@ vslc_vsm_next(const struct VSL_cursor *cursor)
 	assert(&c->cursor == cursor);
 	CHECK_OBJ_NOTNULL(c->vsm, VSM_MAGIC);
 
-	/* Assert pointers */
-	AN(c->next.ptr);
-	assert(c->next.ptr >= c->head->log);
-	assert(c->next.ptr < c->end);
-
 	i = vslc_vsm_check(&c->cursor, &c->next);
 	if (i <= 0)
 		/* Overrun */
@@ -149,9 +138,6 @@ vslc_vsm_next(const struct VSL_cursor *cursor)
 	}
 
 	while (1) {
-		assert(c->next.ptr >= c->head->log);
-		assert(c->next.ptr < c->end);
-		AN(c->head->seq);
 		t = *(volatile const uint32_t *)c->next.ptr;
 		AN(t);
 
@@ -159,17 +145,12 @@ vslc_vsm_next(const struct VSL_cursor *cursor)
 			/* Wrap around not possible at front */
 			assert(c->next.ptr != c->head->log);
 			c->next.ptr = c->head->log;
+			while (c->next.priv % VSL_SEGMENTS)
+				c->next.priv++;
 			continue;
 		}
 
 		if (t == VSL_ENDMARKER) {
-			if (c->next.ptr != c->head->log &&
-			    c->next.priv != c->head->seq) {
-				/* ENDMARKER not at front and seq wrapped */
-				/* XXX: assert on this? */
-				c->next.ptr = c->head->log;
-				continue;
-			}
 			if (c->options & VSL_COPT_TAILSTOP)
 				/* EOF */
 				return (-1);
@@ -177,11 +158,9 @@ vslc_vsm_next(const struct VSL_cursor *cursor)
 				return (0);
 		}
 
-		if (c->next.ptr == c->head->log)
-			c->next.priv = c->head->seq;
-
 		c->cursor.rec = c->next;
 		c->next.ptr = VSL_NEXT(c->next.ptr);
+
 		if (VSL_TAG(c->cursor.rec.ptr) == SLT__Batch) {
 			if (!(c->options & VSL_COPT_BATCH))
 				/* Skip the batch record */
@@ -191,6 +170,14 @@ vslc_vsm_next(const struct VSL_cursor *cursor)
 			c->next.ptr +=
 			    VSL_WORDS(VSL_BATCHLEN(c->cursor.rec.ptr));
 		}
+
+		while ((c->next.ptr - c->head->log) / c->head->segsize >
+		    c->next.priv % VSL_SEGMENTS)
+			c->next.priv++;
+
+		assert(c->next.ptr >= c->head->log);
+		assert(c->next.ptr < c->end);
+
 		return (1);
 	}
 }
@@ -199,25 +186,53 @@ static int
 vslc_vsm_reset(const struct VSL_cursor *cursor)
 {
 	struct vslc_vsm *c;
-	unsigned segment;
+	unsigned u, segment_n;
+	int i;
 
 	CAST_OBJ_NOTNULL(c, cursor->priv_data, VSLC_VSM_MAGIC);
 	assert(&c->cursor == cursor);
-
-	/*
-	 * Starting (VSL_SEGMENTS - 3) behind varnishd. This way
-	 * even if varnishd wraps immediately, we'll still have a
-	 * full segment worth of log before the general constraint
-	 * of at least 2 segments apart will be broken
-	 */
-	segment = (c->head->segment + 3) % VSL_SEGMENTS;
-	if (c->head->segments[segment] < 0)
-		segment = 0;
-	assert(c->head->segments[segment] >= 0);
-	c->next.ptr = c->head->log + c->head->segments[segment];
-	c->next.priv = c->head->seq;
 	c->cursor.rec.ptr = NULL;
 
+	segment_n = c->head->segment_n;
+	VRMB();			/* Make sure offset table is not stale
+				   compared to segment_n */
+
+	if (c->options & VSL_COPT_TAIL) {
+		/* Start in the same segment varnishd currently is in and
+		   run forward until we see the end */
+		u = c->next.priv = segment_n;
+		assert(c->head->offset[c->next.priv % VSL_SEGMENTS] >= 0);
+		c->next.ptr = c->head->log +
+		    c->head->offset[c->next.priv % VSL_SEGMENTS];
+		do {
+			if (c->head->segment_n - u > 1) {
+				/* Give up if varnishd is moving faster
+				   than us */
+				return (-3); /* overrun */
+			}
+			i = vslc_vsm_next(&c->cursor);
+		} while (i == 1);
+		if (i)
+			return (i);
+	} else {
+		/* Starting (VSL_SEGMENTS - 3) behind varnishd. This way
+		 * even if varnishd advances segment_n immediately, we'll
+		 * still have a full segment worth of log before the
+		 * general constraint of at least 2 segments apart will be
+		 * broken.
+		 */
+		c->next.priv = segment_n - (VSL_SEGMENTS - 3);
+		while (c->head->offset[c->next.priv % VSL_SEGMENTS] < 0) {
+			/* seg 0 must be initialized */
+			assert(c->next.priv % VSL_SEGMENTS != 0);
+			c->next.priv++;
+		}
+		assert(c->head->offset[c->next.priv % VSL_SEGMENTS] >= 0);
+		c->next.ptr = c->head->log +
+		    c->head->offset[c->next.priv % VSL_SEGMENTS];
+	}
+	assert(c->next.ptr >= c->head->log);
+	assert(c->next.ptr < c->end);
 	return (0);
 }
 
@@ -235,28 +250,25 @@ VSL_CursorVSM(struct VSL_data *vsl, struct VSM_data *vsm, unsigned options)
 	struct vslc_vsm *c;
 	struct VSM_fantom vf;
 	struct VSL_head *head;
+	int i;
 
 	CHECK_OBJ_NOTNULL(vsl, VSL_MAGIC);
 	CHECK_OBJ_NOTNULL(vsm, VSM_MAGIC);
 
 	if (!VSM_Get(vsm, &vf, VSL_CLASS, "", "")) {
-		vsl_diag(vsl, "No VSL chunk found (child not started ?)\n");
+		(void)vsl_diag(vsl,
+		    "No VSL chunk found (child not started ?)\n");
 		return (NULL);
 	}
 
 	head = vf.b;
 	if (memcmp(head->marker, VSL_HEAD_MARKER, sizeof head->marker)) {
-		vsl_diag(vsl, "Not a VSL chunk\n");
-		return (NULL);
-	}
-	if (head->seq == 0) {
-		vsl_diag(vsl, "VSL chunk not initialized\n");
+		(void)vsl_diag(vsl, "Not a VSL chunk\n");
 		return (NULL);
 	}
-
 	ALLOC_OBJ(c, VSLC_VSM_MAGIC);
 	if (c == NULL) {
-		vsl_diag(vsl, "Out of memory\n");
+		(void)vsl_diag(vsl, "Out of memory\n");
 		return (NULL);
 	}
 	c->cursor.priv_tbl = &vslc_vsm_tbl;
@@ -266,19 +278,15 @@ VSL_CursorVSM(struct VSL_data *vsl, struct VSM_data *vsm, unsigned options)
 	c->vsm = vsm;
 	c->vf = vf;
 	c->head = head;
-	c->end = vf.e;
-	c->segsize = (c->end - c->head->log) / VSL_SEGMENTS;
+	c->end = c->head->log + c->head->segsize * VSL_SEGMENTS;
+	assert(c->end <= (const uint32_t *)vf.e);
 
-	if (c->options & VSL_COPT_TAIL) {
-		/* Locate tail of log */
-		c->next.ptr = c->head->log +
-		    c->head->segments[c->head->segment];
-		while (c->next.ptr < c->end &&
-		    *(volatile const uint32_t *)c->next.ptr != VSL_ENDMARKER)
-			c->next.ptr = VSL_NEXT(c->next.ptr);
-		c->next.priv = c->head->seq;
-	} else
-		AZ(vslc_vsm_reset(&c->cursor));
+	i = vslc_vsm_reset(&c->cursor);
+	if (i) {
+		(void)vsl_diag(vsl, "Cursor initialization failure (%d)\n", i);
+		FREE_OBJ(c);
+		return (NULL);
+	}
 
 	return (&c->cursor);
 }



More information about the varnish-commit mailing list