From ed1852ab95c47c7e2bfe90c074c2e75c30458c19 Mon Sep 17 00:00:00 2001 From: Julien Pivotto Date: Fri, 17 Apr 2020 20:51:03 +0200 Subject: [PATCH] TSDB: Isolation: avoid creating appenderId's without appender (#7135) Prior to this commit we could have situations where we are creating an appenderId but never creating an appender to go with it, therefore blocking the low watermak. Signed-off-by: Julien Pivotto --- tsdb/head.go | 18 +++++++----------- tsdb/head_test.go | 37 +++++++++++++++++++++++++++++-------- 2 files changed, 36 insertions(+), 19 deletions(-) diff --git a/tsdb/head.go b/tsdb/head.go index b8494f6182..437fb469d3 100644 --- a/tsdb/head.go +++ b/tsdb/head.go @@ -803,8 +803,6 @@ func (h *RangeHead) Meta() BlockMeta { type initAppender struct { app storage.Appender head *Head - - appendID, cleanupAppendIDsBelow uint64 } func (a *initAppender) Add(lset labels.Labels, t int64, v float64) (uint64, error) { @@ -812,7 +810,7 @@ func (a *initAppender) Add(lset labels.Labels, t int64, v float64) (uint64, erro return a.app.Add(lset, t, v) } a.head.initTime(t) - a.app = a.head.appender(a.appendID, a.cleanupAppendIDsBelow) + a.app = a.head.appender() return a.app.Add(lset, t, v) } @@ -842,22 +840,20 @@ func (a *initAppender) Rollback() error { func (h *Head) Appender() storage.Appender { h.metrics.activeAppenders.Inc() - appendID := h.iso.newAppendID() - cleanupAppendIDsBelow := h.iso.lowWatermark() - // The head cache might not have a starting point yet. The init appender // picks up the first appended timestamp as the base. if h.MinTime() == math.MaxInt64 { return &initAppender{ - head: h, - appendID: appendID, - cleanupAppendIDsBelow: cleanupAppendIDsBelow, + head: h, } } - return h.appender(appendID, cleanupAppendIDsBelow) + return h.appender() } -func (h *Head) appender(appendID, cleanupAppendIDsBelow uint64) *headAppender { +func (h *Head) appender() *headAppender { + appendID := h.iso.newAppendID() + cleanupAppendIDsBelow := h.iso.lowWatermark() + return &headAppender{ head: h, // Set the minimum valid time to whichever is greater the head min valid time or the compaction window. diff --git a/tsdb/head_test.go b/tsdb/head_test.go index 08c53418de..6e8b5293d1 100644 --- a/tsdb/head_test.go +++ b/tsdb/head_test.go @@ -1078,7 +1078,7 @@ func TestUncommittedSamplesNotLostOnTruncate(t *testing.T) { h.initTime(0) - app := h.appender(0, 0) + app := h.appender() lset := labels.FromStrings("a", "1") _, err = app.Add(lset, 2100, 1) testutil.Ok(t, err) @@ -1106,7 +1106,7 @@ func TestRemoveSeriesAfterRollbackAndTruncate(t *testing.T) { h.initTime(0) - app := h.appender(0, 0) + app := h.appender() lset := labels.FromStrings("a", "1") _, err = app.Add(lset, 2100, 1) testutil.Ok(t, err) @@ -1368,14 +1368,16 @@ func TestMemSeriesIsolation(t *testing.T) { return -1 } - i := 0 + i := 1 for ; i <= 1000; i++ { var app storage.Appender // To initialize bounds. if hb.MinTime() == math.MaxInt64 { - app = &initAppender{head: hb, appendID: uint64(i), cleanupAppendIDsBelow: 0} + app = &initAppender{head: hb} } else { - app = hb.appender(uint64(i), 0) + a := hb.appender() + a.cleanupAppendIDsBelow = 0 + app = a } _, err := app.Add(labels.FromStrings("foo", "bar"), int64(i), float64(i)) @@ -1394,7 +1396,8 @@ func TestMemSeriesIsolation(t *testing.T) { testutil.Equals(t, 999, lastValue(999)) // Cleanup appendIDs below 500. - app := hb.appender(uint64(i), 500) + app := hb.appender() + app.cleanupAppendIDsBelow = 500 _, err = app.Add(labels.FromStrings("foo", "bar"), int64(i), float64(i)) testutil.Ok(t, err) testutil.Ok(t, app.Commit()) @@ -1412,7 +1415,8 @@ func TestMemSeriesIsolation(t *testing.T) { // Cleanup appendIDs below 1000, which means the sample buffer is // the only thing with appendIDs. - app = hb.appender(uint64(i), 1000) + app = hb.appender() + app.cleanupAppendIDsBelow = 1000 _, err = app.Add(labels.FromStrings("foo", "bar"), int64(i), float64(i)) testutil.Ok(t, err) testutil.Ok(t, app.Commit()) @@ -1425,7 +1429,8 @@ func TestMemSeriesIsolation(t *testing.T) { i++ // Cleanup appendIDs below 1001, but with a rollback. - app = hb.appender(uint64(i), 1001) + app = hb.appender() + app.cleanupAppendIDsBelow = 1001 _, err = app.Add(labels.FromStrings("foo", "bar"), int64(i), float64(i)) testutil.Ok(t, err) testutil.Ok(t, app.Rollback()) @@ -1515,6 +1520,22 @@ func TestHeadSeriesChunkRace(t *testing.T) { } } +func TestIsolationWithoutAdd(t *testing.T) { + hb, err := NewHead(nil, nil, nil, 1000, DefaultStripeSize) + testutil.Ok(t, err) + defer hb.Close() + + app := hb.Appender() + testutil.Ok(t, app.Commit()) + + app = hb.Appender() + _, err = app.Add(labels.FromStrings("foo", "baz"), 1, 1) + testutil.Ok(t, err) + testutil.Ok(t, app.Commit()) + + testutil.Equals(t, hb.iso.lastAppendID, hb.iso.lowWatermark(), "High watermark should be equal to the low watermark") +} + func testHeadSeriesChunkRace(t *testing.T) { h, err := NewHead(nil, nil, nil, 30, DefaultStripeSize) testutil.Ok(t, err)