[master] 35cf781 Simplify vsl segment management, fixing spurious vsl overruns
Nils Goroll
nils.goroll at uplex.de
Fri Jun 26 14:38:38 CEST 2015
commit 35cf78131c2aaaf8227c31ba92eb577c5bea9855
Author: Nils Goroll <nils.goroll at uplex.de>
Date: Mon Jun 15 12:47:04 2015 +0200
Simplify vsl segment management, fixing spurious vsl overruns
vsl sequence and segment updates didn't happen atomically, so
vslc_vsm_check could report spurious overruns.
Replace sequence and segment index with a counter (segment_n), which
always increments (with overflow after UINT_MAX). The actual segment
index will be segment_n % VSL_SEGMENTS. Overrun detection becomes
simple and safe: it is just the difference between two segment
numbers, and we only ever access/update a single integer.
Update the shared memory log head accordingly (struct VSL_head changes
layout, so the VSL_HEAD_MARKER version is bumped).
(struct VSLC_ptr).priv is now the equivalent of segment_n from the
reader side. It gets initialized once and is maintained independently.
Patch prepared in collaboration with Martin Blix Grydeland
<martin at varnish-software.com>
Fixes: #1747
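
The distance test described above needs nothing beyond modular unsigned
arithmetic. A minimal standalone sketch (not varnishd source; the names
segments_behind and SEGMENTS are made up for illustration):

#include <assert.h>
#include <limits.h>

#define SEGMENTS 8U	/* power of two, so it evenly divides UINT_MAX + 1 */

/* Modular difference: well-defined even after writer_n wraps at UINT_MAX */
static unsigned
segments_behind(unsigned writer_n, unsigned reader_n)
{
	return (writer_n - reader_n);
}

int
main(void)
{
	unsigned r = UINT_MAX - 1;	/* reader mark just before the wrap */
	unsigned w = r + 3;		/* writer wraps around to 1 */

	assert(segments_behind(10, 7) == 3);	/* no wrap */
	assert(segments_behind(w, r) == 3);	/* across the wrap */
	/* the actual segment index is always counter % SEGMENTS */
	assert((0U - SEGMENTS) % SEGMENTS == 0);
	return (0);
}

With VSL_SEGMENTS a power of two, UINT_MAX + 1 is a multiple of it, so
the counter and the derived segment index wrap in step; that is what the
new assert(UINT_MAX % VSL_SEGMENTS == VSL_SEGMENTS - 1) in VSM_Init
checks.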
diff --git a/bin/varnishd/cache/cache_shmlog.c b/bin/varnishd/cache/cache_shmlog.c
index 2388969..3566d53 100644
--- a/bin/varnishd/cache/cache_shmlog.c
+++ b/bin/varnishd/cache/cache_shmlog.c
@@ -47,9 +47,8 @@ static pthread_mutex_t vsm_mtx;
static struct VSL_head *vsl_head;
static const uint32_t *vsl_end;
static uint32_t *vsl_ptr;
-static unsigned vsl_segment;
+static unsigned vsl_segment_n;
static ssize_t vsl_segsize;
-static unsigned vsl_seq;
struct VSC_C_main *VSC_C_main;
@@ -108,19 +107,16 @@ vsl_wrap(void)
assert(vsl_ptr >= vsl_head->log);
assert(vsl_ptr < vsl_end);
+ vsl_segment_n += VSL_SEGMENTS - (vsl_segment_n % VSL_SEGMENTS);
+ assert(vsl_segment_n % VSL_SEGMENTS == 0);
+ vsl_head->offset[0] = 0;
vsl_head->log[0] = VSL_ENDMARKER;
- do
- vsl_seq++;
- while (vsl_seq == 0);
- vsl_head->seq = vsl_seq;
- vsl_head->segments[0] = 0;
VWMB();
if (vsl_ptr != vsl_head->log) {
*vsl_ptr = VSL_WRAPMARKER;
vsl_ptr = vsl_head->log;
}
- vsl_segment = 0;
- vsl_head->segment = vsl_segment;
+ vsl_head->segment_n = vsl_segment_n;
VSC_C_main->shm_cycles++;
}
@@ -133,7 +129,6 @@ vsl_get(unsigned len, unsigned records, unsigned flushes)
{
uint32_t *p;
int err;
- unsigned old_segment;
err = pthread_mutex_trylock(&vsl_mtx);
if (err == EBUSY) {
@@ -160,21 +155,17 @@ vsl_get(unsigned len, unsigned records, unsigned flushes)
*vsl_ptr = VSL_ENDMARKER;
- old_segment = vsl_segment;
- while ((vsl_ptr - vsl_head->log) / vsl_segsize > vsl_segment) {
- if (vsl_segment == VSL_SEGMENTS - 1)
- break;
- vsl_segment++;
- vsl_head->segments[vsl_segment] = vsl_ptr - vsl_head->log;
- }
- if (old_segment != vsl_segment) {
- /* Write memory barrier to ensure ENDMARKER and new table
- values are seen before new segment number */
- VWMB();
- vsl_head->segment = vsl_segment;
+ while ((vsl_ptr - vsl_head->log) / vsl_segsize >
+ vsl_segment_n % VSL_SEGMENTS) {
+ vsl_segment_n++;
+ vsl_head->offset[vsl_segment_n % VSL_SEGMENTS] =
+ vsl_ptr - vsl_head->log;
}
AZ(pthread_mutex_unlock(&vsl_mtx));
+ /* Implicit VWMB() in mutex op ensures ENDMARKER and new table
+ values are seen before new segment number */
+ vsl_head->segment_n = vsl_segment_n;
return (p);
}
@@ -458,29 +449,31 @@ VSM_Init(void)
int i;
pthread_t tp;
+ assert(UINT_MAX % VSL_SEGMENTS == VSL_SEGMENTS - 1);
+
AZ(pthread_mutex_init(&vsl_mtx, NULL));
AZ(pthread_mutex_init(&vsm_mtx, NULL));
vsl_head = VSM_Alloc(cache_param->vsl_space, VSL_CLASS, "", "");
AN(vsl_head);
- vsl_end = vsl_head->log +
- (cache_param->vsl_space - sizeof *vsl_head) / sizeof *vsl_end;
- vsl_segsize = (vsl_end - vsl_head->log) / VSL_SEGMENTS;
-
- memset(vsl_head, 0, sizeof *vsl_head);
- memcpy(vsl_head->marker, VSL_HEAD_MARKER, sizeof vsl_head->marker);
- vsl_head->segments[0] = 0;
- for (i = 1; i < VSL_SEGMENTS; i++)
- vsl_head->segments[i] = -1;
+ vsl_segsize = (cache_param->vsl_space - sizeof *vsl_head) /
+ sizeof *vsl_end / VSL_SEGMENTS;
+ vsl_end = vsl_head->log + vsl_segsize * VSL_SEGMENTS;
+ /* Make segment_n always overflow on first log wrap to make any
+ problems with regard to readers on that event visible */
+ vsl_segment_n = 0U - VSL_SEGMENTS;
+ AZ(vsl_segment_n % VSL_SEGMENTS);
+ vsl_head->segment_n = vsl_segment_n;
vsl_ptr = vsl_head->log;
*vsl_ptr = VSL_ENDMARKER;
+ memset(vsl_head, 0, sizeof *vsl_head);
+ vsl_head->segsize = vsl_segsize;
+ vsl_head->offset[0] = 0;
+ for (i = 1; i < VSL_SEGMENTS; i++)
+ vsl_head->offset[i] = -1;
VWMB();
- do
- vsl_seq = random();
- while (vsl_seq == 0);
- vsl_head->seq = vsl_seq;
- VWMB();
+ memcpy(vsl_head->marker, VSL_HEAD_MARKER, sizeof vsl_head->marker);
VSC_C_main = VSM_Alloc(sizeof *VSC_C_main,
VSC_CLASS, VSC_type_main, "");
diff --git a/include/vapi/vsl_int.h b/include/vapi/vsl_int.h
index e3046b2..15ed9af 100644
--- a/include/vapi/vsl_int.h
+++ b/include/vapi/vsl_int.h
@@ -40,7 +40,7 @@
#define VAPI_VSL_INT_H_INCLUDED
#define VSL_CLASS "Log"
-#define VSL_SEGMENTS 8
+#define VSL_SEGMENTS 8U
/*
* Shared memory log format
diff --git a/include/vsl_priv.h b/include/vsl_priv.h
index d5643ed..0c780dc 100644
--- a/include/vsl_priv.h
+++ b/include/vsl_priv.h
@@ -39,32 +39,30 @@
#include "vapi/vsm_int.h"
#define VSL_CLASS "Log"
-#define VSL_SEGMENTS 8
+#define VSL_SEGMENTS 8U // power of two
/*
* Shared memory log format
*
* The segments array has index values providing safe entry points into
* the log, where each element N gives the index of the first log record
- * in the Nth fraction of the log. An index value of -1 indicated that no
- * log records in this fraction exists.
+ * in the Nth segment of the log. An index value of -1 indicates that no
+ * log records in this segment exist.
*
- * The segment member shows the current segment where Varnish is currently
- * appending log data.
- *
- * The seq member contains a non-zero seq number randomly initialized,
- * which increases whenever writing the log starts from the front.
+ * The segment_n member is incremented only, natively wrapping at
+ * UINT_MAX. When taken modulo VSL_SEGMENTS, it gives the current index
+ * into the offset array.
*
* The format of the actual log is in vapi/vsl_int.h
*
*/
struct VSL_head {
-#define VSL_HEAD_MARKER "VSLHEAD0" /* Incr. as version# */
+#define VSL_HEAD_MARKER "VSLHEAD1" /* Incr. as version# */
char marker[VSM_MARKER_LEN];
- volatile ssize_t segments[VSL_SEGMENTS];
- volatile unsigned segment; /* Current varnishd segment */
- volatile unsigned seq; /* Non-zero seq number */
+ ssize_t segsize;
+ unsigned segment_n;
+ ssize_t offset[VSL_SEGMENTS];
uint32_t log[];
};
diff --git a/lib/libvarnishapi/vsl_cursor.c b/lib/libvarnishapi/vsl_cursor.c
index ba6e7d2..b9bbcb3 100644
--- a/lib/libvarnishapi/vsl_cursor.c
+++ b/lib/libvarnishapi/vsl_cursor.c
@@ -42,6 +42,7 @@
#include "vdef.h"
#include "vas.h"
#include "miniobj.h"
+#include "vmb.h"
#include "vqueue.h"
#include "vre.h"
@@ -66,7 +67,6 @@ struct vslc_vsm {
const struct VSL_head *head;
const uint32_t *end;
- ssize_t segsize;
struct VSLC_ptr next;
};
@@ -80,11 +80,21 @@ vslc_vsm_delete(const struct VSL_cursor *cursor)
FREE_OBJ(c);
}
+/*
+ * We tolerate the fact that segment_n wraps around eventually: for the default
+ * vsl_space of 80MB and 8 segments, each segment is 10MB long, so we wrap
+ * roughly after 40 pebibytes (32-bit) or 160 yobibytes (64-bit) worth of vsl
+ * written.
+ *
+ * The vsm_check would fail if a vslc paused while this amount of data was
+ * written.
+ */
+
static int
vslc_vsm_check(const struct VSL_cursor *cursor, const struct VSLC_ptr *ptr)
{
const struct vslc_vsm *c;
- unsigned seqdiff, segment, segdiff;
+ unsigned dist;
CAST_OBJ_NOTNULL(c, cursor->priv_data, VSLC_VSM_MAGIC);
assert(&c->cursor == cursor);
@@ -92,28 +102,12 @@ vslc_vsm_check(const struct VSL_cursor *cursor, const struct VSLC_ptr *ptr)
if (ptr->ptr == NULL)
return (0);
- /* Check sequence number */
- seqdiff = c->head->seq - ptr->priv;
- if (c->head->seq < ptr->priv)
- /* Wrap around skips 0 */
- seqdiff -= 1;
- if (seqdiff > 1)
- /* Too late */
- return (0);
+ dist = c->head->segment_n - ptr->priv;
- /* Check overrun */
- segment = (ptr->ptr - c->head->log) / c->segsize;
- if (segment >= VSL_SEGMENTS)
- /* Rounding error spills to last segment */
- segment = VSL_SEGMENTS - 1;
- segdiff = (segment - c->head->segment) % VSL_SEGMENTS;
- if (segdiff == 0 && seqdiff == 0)
- /* In same segment, but close to tail */
- return (2);
- if (segdiff <= 2)
+ if (dist >= VSL_SEGMENTS - 2)
/* Too close to continue */
return (0);
- if (segdiff <= 4)
+ if (dist >= VSL_SEGMENTS - 4)
/* Warning level */
return (1);
/* Safe */
@@ -131,11 +125,6 @@ vslc_vsm_next(const struct VSL_cursor *cursor)
assert(&c->cursor == cursor);
CHECK_OBJ_NOTNULL(c->vsm, VSM_MAGIC);
- /* Assert pointers */
- AN(c->next.ptr);
- assert(c->next.ptr >= c->head->log);
- assert(c->next.ptr < c->end);
-
i = vslc_vsm_check(&c->cursor, &c->next);
if (i <= 0)
/* Overrun */
@@ -149,9 +138,6 @@ vslc_vsm_next(const struct VSL_cursor *cursor)
}
while (1) {
- assert(c->next.ptr >= c->head->log);
- assert(c->next.ptr < c->end);
- AN(c->head->seq);
t = *(volatile const uint32_t *)c->next.ptr;
AN(t);
@@ -159,17 +145,12 @@ vslc_vsm_next(const struct VSL_cursor *cursor)
/* Wrap around not possible at front */
assert(c->next.ptr != c->head->log);
c->next.ptr = c->head->log;
+ while (c->next.priv % VSL_SEGMENTS)
+ c->next.priv++;
continue;
}
if (t == VSL_ENDMARKER) {
- if (c->next.ptr != c->head->log &&
- c->next.priv != c->head->seq) {
- /* ENDMARKER not at front and seq wrapped */
- /* XXX: assert on this? */
- c->next.ptr = c->head->log;
- continue;
- }
if (c->options & VSL_COPT_TAILSTOP)
/* EOF */
return (-1);
@@ -177,11 +158,9 @@ vslc_vsm_next(const struct VSL_cursor *cursor)
return (0);
}
- if (c->next.ptr == c->head->log)
- c->next.priv = c->head->seq;
-
c->cursor.rec = c->next;
c->next.ptr = VSL_NEXT(c->next.ptr);
+
if (VSL_TAG(c->cursor.rec.ptr) == SLT__Batch) {
if (!(c->options & VSL_COPT_BATCH))
/* Skip the batch record */
@@ -191,6 +170,14 @@ vslc_vsm_next(const struct VSL_cursor *cursor)
c->next.ptr +=
VSL_WORDS(VSL_BATCHLEN(c->cursor.rec.ptr));
}
+
+ while ((c->next.ptr - c->head->log) / c->head->segsize >
+ c->next.priv % VSL_SEGMENTS)
+ c->next.priv++;
+
+ assert(c->next.ptr >= c->head->log);
+ assert(c->next.ptr < c->end);
+
return (1);
}
}
@@ -199,25 +186,53 @@ static int
vslc_vsm_reset(const struct VSL_cursor *cursor)
{
struct vslc_vsm *c;
- unsigned segment;
+ unsigned u, segment_n;
+ int i;
CAST_OBJ_NOTNULL(c, cursor->priv_data, VSLC_VSM_MAGIC);
assert(&c->cursor == cursor);
-
- /*
- * Starting (VSL_SEGMENTS - 3) behind varnishd. This way
- * even if varnishd wraps immediately, we'll still have a
- * full segment worth of log before the general constraint
- * of at least 2 segments apart will be broken
- */
- segment = (c->head->segment + 3) % VSL_SEGMENTS;
- if (c->head->segments[segment] < 0)
- segment = 0;
- assert(c->head->segments[segment] >= 0);
- c->next.ptr = c->head->log + c->head->segments[segment];
- c->next.priv = c->head->seq;
c->cursor.rec.ptr = NULL;
+ segment_n = c->head->segment_n;
+ VRMB(); /* Make sure offset table is not stale
+ compared to segment_n */
+
+ if (c->options & VSL_COPT_TAIL) {
+ /* Start in the same segment varnishd currently is in and
+ run forward until we see the end */
+ u = c->next.priv = segment_n;
+ assert(c->head->offset[c->next.priv % VSL_SEGMENTS] >= 0);
+ c->next.ptr = c->head->log +
+ c->head->offset[c->next.priv % VSL_SEGMENTS];
+ do {
+ if (c->head->segment_n - u > 1) {
+ /* Give up if varnishd is moving faster
+ than us */
+ return (-3); /* overrun */
+ }
+ i = vslc_vsm_next(&c->cursor);
+ } while (i == 1);
+ if (i)
+ return (i);
+ } else {
+ /* Starting (VSL_SEGMENTS - 3) behind varnishd. This way
+ * even if varnishd advances segment_n immediately, we'll
+ * still have a full segment worth of log before the
+ * general constraint of at least 2 segments apart will be
+ * broken.
+ */
+ c->next.priv = segment_n - (VSL_SEGMENTS - 3);
+ while (c->head->offset[c->next.priv % VSL_SEGMENTS] < 0) {
+ /* seg 0 must be initialized */
+ assert(c->next.priv % VSL_SEGMENTS != 0);
+ c->next.priv++;
+ }
+ assert(c->head->offset[c->next.priv % VSL_SEGMENTS] >= 0);
+ c->next.ptr = c->head->log +
+ c->head->offset[c->next.priv % VSL_SEGMENTS];
+ }
+ assert(c->next.ptr >= c->head->log);
+ assert(c->next.ptr < c->end);
return (0);
}
@@ -235,28 +250,25 @@ VSL_CursorVSM(struct VSL_data *vsl, struct VSM_data *vsm, unsigned options)
struct vslc_vsm *c;
struct VSM_fantom vf;
struct VSL_head *head;
+ int i;
CHECK_OBJ_NOTNULL(vsl, VSL_MAGIC);
CHECK_OBJ_NOTNULL(vsm, VSM_MAGIC);
if (!VSM_Get(vsm, &vf, VSL_CLASS, "", "")) {
- vsl_diag(vsl, "No VSL chunk found (child not started ?)\n");
+ (void)vsl_diag(vsl,
+ "No VSL chunk found (child not started ?)\n");
return (NULL);
}
head = vf.b;
if (memcmp(head->marker, VSL_HEAD_MARKER, sizeof head->marker)) {
- vsl_diag(vsl, "Not a VSL chunk\n");
- return (NULL);
- }
- if (head->seq == 0) {
- vsl_diag(vsl, "VSL chunk not initialized\n");
+ (void)vsl_diag(vsl, "Not a VSL chunk\n");
return (NULL);
}
-
ALLOC_OBJ(c, VSLC_VSM_MAGIC);
if (c == NULL) {
- vsl_diag(vsl, "Out of memory\n");
+ (void)vsl_diag(vsl, "Out of memory\n");
return (NULL);
}
c->cursor.priv_tbl = &vslc_vsm_tbl;
@@ -266,19 +278,15 @@ VSL_CursorVSM(struct VSL_data *vsl, struct VSM_data *vsm, unsigned options)
c->vsm = vsm;
c->vf = vf;
c->head = head;
- c->end = vf.e;
- c->segsize = (c->end - c->head->log) / VSL_SEGMENTS;
+ c->end = c->head->log + c->head->segsize * VSL_SEGMENTS;
+ assert(c->end <= (const uint32_t *)vf.e);
- if (c->options & VSL_COPT_TAIL) {
- /* Locate tail of log */
- c->next.ptr = c->head->log +
- c->head->segments[c->head->segment];
- while (c->next.ptr < c->end &&
- *(volatile const uint32_t *)c->next.ptr != VSL_ENDMARKER)
- c->next.ptr = VSL_NEXT(c->next.ptr);
- c->next.priv = c->head->seq;
- } else
- AZ(vslc_vsm_reset(&c->cursor));
+ i = vslc_vsm_reset(&c->cursor);
+ if (i) {
+ (void)vsl_diag(vsl, "Cursor initialization failure (%d)\n", i);
+ FREE_OBJ(c);
+ return (NULL);
+ }
return (&c->cursor);
}
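
For reference, the reader-side classification in the patched
vslc_vsm_check() reduces to one modular distance compared against two
thresholds. A self-contained restatement of that logic (a sketch, not a
copy of the file):

#include <stdio.h>

#define VSL_SEGMENTS 8U

/*
 * dist is how many segments the writer (head_n) has advanced past the
 * reader's private counter (priv_n). With 8 segments: 0-3 safe,
 * 4-5 warning, 6 or more overrun.
 */
static int
check(unsigned head_n, unsigned priv_n)
{
	unsigned dist = head_n - priv_n;	/* modular, wrap-safe */

	if (dist >= VSL_SEGMENTS - 2)
		return (0);	/* too close to continue: overrun */
	if (dist >= VSL_SEGMENTS - 4)
		return (1);	/* warning level */
	return (2);		/* safe */
}

int
main(void)
{
	printf("%d %d %d\n",
	    check(10, 9),	/* dist 1 -> 2 (safe) */
	    check(10, 5),	/* dist 5 -> 1 (warning) */
	    check(10, 3));	/* dist 7 -> 0 (overrun) */
	return (0);
}

Note also that VSM_Init deliberately seeds vsl_segment_n at
0U - VSL_SEGMENTS, so the very first log wrap already exercises the
counter overflow and would surface any reader-side wrap bugs early.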