From 848cb5a6d68d480f3960b0204fd4837a93911212 Mon Sep 17 00:00:00 2001
From: Ganesh Vernekar <15064823+codesome@users.noreply.github.com>
Date: Tue, 3 Aug 2021 20:03:54 +0530
Subject: [PATCH] Enhanced WAL replay for duplicate series record (#7438)

Signed-off-by: Ganesh Vernekar
---
 tsdb/head.go      |  8 ++++--
 tsdb/head_test.go |  8 ++++--
 tsdb/head_wal.go  | 71 +++++++++++++++++++++++++++++++++++------------
 3 files changed, 65 insertions(+), 22 deletions(-)

diff --git a/tsdb/head.go b/tsdb/head.go
index fff3c4dbc3..8291b03525 100644
--- a/tsdb/head.go
+++ b/tsdb/head.go
@@ -1466,7 +1466,11 @@ type memChunk struct {
 
 // OverlapsClosedInterval returns true if the chunk overlaps [mint, maxt].
 func (mc *memChunk) OverlapsClosedInterval(mint, maxt int64) bool {
-	return mc.minTime <= maxt && mint <= mc.maxTime
+	return overlapsClosedInterval(mc.minTime, mc.maxTime, mint, maxt)
+}
+
+func overlapsClosedInterval(mint1, maxt1, mint2, maxt2 int64) bool {
+	return mint1 <= maxt2 && mint2 <= maxt1
 }
 
 type mmappedChunk struct {
@@ -1477,7 +1481,7 @@ type mmappedChunk struct {
 
 // Returns true if the chunk overlaps [mint, maxt].
 func (mc *mmappedChunk) OverlapsClosedInterval(mint, maxt int64) bool {
-	return mc.minTime <= maxt && mint <= mc.maxTime
+	return overlapsClosedInterval(mc.minTime, mc.maxTime, mint, maxt)
 }
 
 type noopSeriesLifecycleCallback struct{}
diff --git a/tsdb/head_test.go b/tsdb/head_test.go
index b6aab07807..68207360e1 100644
--- a/tsdb/head_test.go
+++ b/tsdb/head_test.go
@@ -306,7 +306,9 @@ func TestHead_ReadWAL(t *testing.T) {
 	}
 	require.Equal(t, []sample{{100, 2}, {101, 5}}, expandChunk(s10.iterator(0, nil, head.chunkDiskMapper, nil)))
 	require.Equal(t, []sample{{101, 6}}, expandChunk(s50.iterator(0, nil, head.chunkDiskMapper, nil)))
-	require.Equal(t, []sample{{100, 3}, {101, 7}}, expandChunk(s100.iterator(0, nil, head.chunkDiskMapper, nil)))
+	// The samples before the new series record should be discarded since a duplicate record
+	// is only possible when old samples were compacted.
+	require.Equal(t, []sample{{101, 7}}, expandChunk(s100.iterator(0, nil, head.chunkDiskMapper, nil)))
 
 	q, err := head.ExemplarQuerier(context.Background())
 	require.NoError(t, err)
@@ -369,9 +371,9 @@ func TestHead_WALMultiRef(t *testing.T) {
 	q, err := NewBlockQuerier(head, 0, 2100)
 	require.NoError(t, err)
 	series := query(t, q, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"))
+	// The samples before the new ref should be discarded since Head truncation
+	// happens only after compacting the Head.
 	require.Equal(t, map[string][]tsdbutil.Sample{`{foo="bar"}`: {
-		sample{100, 1},
-		sample{1500, 2},
 		sample{1700, 3},
 		sample{2000, 4},
 	}}, series)
diff --git a/tsdb/head_wal.go b/tsdb/head_wal.go
index 707bb0fd97..056bd288a1 100644
--- a/tsdb/head_wal.go
+++ b/tsdb/head_wal.go
@@ -18,6 +18,7 @@ import (
 	"math"
 	"runtime"
 	"sync"
+	"time"
 
 	"github.com/go-kit/log/level"
 	"github.com/pkg/errors"
@@ -181,38 +182,74 @@ func (h *Head) loadWAL(r *wal.Reader, multiRef map[uint64]uint64, mmappedChunks
 		}
 	}()
 
+	// The records are always replayed from the oldest to the newest.
 Outer:
 	for d := range decoded {
 		switch v := d.(type) {
 		case []record.RefSeries:
-			for _, s := range v {
-				series, created, err := h.getOrCreateWithID(s.Ref, s.Labels.Hash(), s.Labels)
+			for _, walSeries := range v {
+				mSeries, created, err := h.getOrCreateWithID(walSeries.Ref, walSeries.Labels.Hash(), walSeries.Labels)
 				if err != nil {
 					seriesCreationErr = err
 					break Outer
 				}
 
+				if h.lastSeriesID.Load() < walSeries.Ref {
+					h.lastSeriesID.Store(walSeries.Ref)
+				}
+
+				mmc := mmappedChunks[walSeries.Ref]
+
 				if created {
-					// If this series gets a duplicate record, we don't restore its mmapped chunks,
-					// and instead restore everything from WAL records.
-					series.mmappedChunks = mmappedChunks[series.ref]
+					// This is the first WAL series record for this series.
+					h.metrics.chunksCreated.Add(float64(len(mmc)))
+					h.metrics.chunks.Add(float64(len(mmc)))
+					mSeries.mmappedChunks = mmc
+					continue
+				}
 
-					h.metrics.chunks.Add(float64(len(series.mmappedChunks)))
-					h.metrics.chunksCreated.Add(float64(len(series.mmappedChunks)))
+				// There's already a different ref for this series.
+				// A duplicate series record is only possible when the old samples were already compacted into a block.
+				// Hence we can discard all the samples and m-mapped chunks replayed so far for this series.
 
-					if len(series.mmappedChunks) > 0 {
-						h.updateMinMaxTime(series.minTime(), series.maxTime())
+				multiRef[walSeries.Ref] = mSeries.ref
+
+				idx := mSeries.ref % uint64(n)
+				// It is possible that some old sample is still being processed in processWALSamples,
+				// which could race with the reset below. So we send an empty batch and wait until the
+				// goroutine has drained its input buffer, i.e. all the old samples have been processed.
+				inputs[idx] <- []record.RefSample{}
+				for len(inputs[idx]) != 0 {
+					time.Sleep(1 * time.Millisecond)
+				}
+
+				// Check whether the new m-mapped chunks overlap with the already existing ones.
+				// This should never happen, but we check anyway to catch any edge cases
+				// that we might have missed.
+				if len(mSeries.mmappedChunks) > 0 && len(mmc) > 0 {
+					if overlapsClosedInterval(
+						mSeries.mmappedChunks[0].minTime,
+						mSeries.mmappedChunks[len(mSeries.mmappedChunks)-1].maxTime,
+						mmc[0].minTime,
+						mmc[len(mmc)-1].maxTime,
+					) {
+						// The m-mapped chunks for the new series ref overlap with the old m-mapped chunks.
+						seriesCreationErr = errors.Errorf("overlapping m-mapped chunks for series %s", mSeries.lset.String())
+						break Outer
 					}
-				} else {
-					// TODO(codesome) Discard old samples and mmapped chunks and use mmap chunks for the new series ID.
-
-					// There's already a different ref for this series.
-					multiRef[s.Ref] = series.ref
 				}
 
-				if h.lastSeriesID.Load() < s.Ref {
-					h.lastSeriesID.Store(s.Ref)
-				}
+				// Replace the m-mapped chunks with the new ones (which could be empty).
+				h.metrics.chunksCreated.Add(float64(len(mmc)))
+				h.metrics.chunksRemoved.Add(float64(len(mSeries.mmappedChunks)))
+				h.metrics.chunks.Add(float64(len(mmc) - len(mSeries.mmappedChunks)))
+				mSeries.mmappedChunks = mmc
+
+				// Any samples replayed so far are already compacted into a block, so reset the head chunk.
+				mSeries.nextAt = 0
+				mSeries.headChunk = nil
+				mSeries.app = nil
+				h.updateMinMaxTime(mSeries.minTime(), mSeries.maxTime())
 			}
 			//nolint:staticcheck // Ignore SA6002 relax staticcheck verification.
 			seriesPool.Put(v)
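
The helper extracted in tsdb/head.go treats both intervals as closed, so touching endpoints count as an overlap. A minimal standalone sketch of overlapsClosedInterval (the package layout and main function here are illustrative, not part of the patch):

package main

import "fmt"

// overlapsClosedInterval reports whether the closed intervals
// [mint1, maxt1] and [mint2, maxt2] share at least one point:
// each interval must start no later than the other one ends.
func overlapsClosedInterval(mint1, maxt1, mint2, maxt2 int64) bool {
	return mint1 <= maxt2 && mint2 <= maxt1
}

func main() {
	fmt.Println(overlapsClosedInterval(0, 100, 100, 200)) // true: closed intervals touch at 100
	fmt.Println(overlapsClosedInterval(0, 99, 100, 200))  // false: disjoint
	fmt.Println(overlapsClosedInterval(0, 200, 50, 60))   // true: containment counts as overlap
}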
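
The busy-wait in the duplicate-record branch is worth spelling out: replayed samples are dispatched to n processWALSamples goroutines sharded by series ref modulo n, so before the replay loop resets a series' in-memory state it must be sure that shard's worker is not still appending old samples. A self-contained sketch of that drain pattern under simplified types (the worker below is a stand-in, not the real processWALSamples signature):

package main

import (
	"fmt"
	"time"
)

// worker plays the role of one sharded processWALSamples goroutine:
// it receives batches of samples and processes them sequentially.
func worker(input <-chan []int64, done chan<- struct{}) {
	defer close(done)
	for batch := range input {
		for range batch {
			time.Sleep(time.Microsecond) // stand-in for appending a sample
		}
	}
}

func main() {
	input := make(chan []int64, 300) // buffered, like the replay input channels
	done := make(chan struct{})
	go worker(input, done)

	input <- []int64{1, 2, 3} // "old" samples that may still be in flight

	// The drain pattern from the patch: push an empty batch, then poll until
	// the buffer is empty. Because the worker receives and processes batches
	// in order, once it has taken the empty batch every earlier batch has
	// been fully processed, so the series state can be reset safely.
	input <- []int64{}
	for len(input) != 0 {
		time.Sleep(time.Millisecond)
	}
	fmt.Println("old samples drained; safe to reset series state")

	close(input)
	<-done
}

The empty batch acts as a barrier: len(input) == 0 by itself only proves the empty batch was received, and it is the in-order, one-batch-at-a-time processing that makes the wait sound.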
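
Finally, multiRef[walSeries.Ref] = mSeries.ref records that the newer, duplicate ref is an alias of the surviving series. A hedged illustration of how such a map lets later sample records land on the right series (refSample and the refs are made up for the example; the real replay resolves refs when dispatching record.RefSample batches, which is outside this excerpt):

package main

import "fmt"

// refSample is a simplified stand-in for record.RefSample.
type refSample struct {
	Ref uint64
	T   int64
	V   float64
}

func main() {
	// The duplicate (newer) series ref 102 aliases ref 100, which the
	// Head already uses for the same label set.
	multiRef := map[uint64]uint64{102: 100}

	samples := []refSample{{Ref: 100, T: 1500, V: 2}, {Ref: 102, T: 1700, V: 3}}
	for i := range samples {
		// Rewrite aliased refs before handing samples to the shard workers,
		// so both samples are appended to the same in-memory series.
		if r, ok := multiRef[samples[i].Ref]; ok {
			samples[i].Ref = r
		}
	}
	fmt.Println(samples) // [{100 1500 2} {100 1700 3}]
}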