From dff967286e01ef5e125a04191f6415ac9dc22e59 Mon Sep 17 00:00:00 2001
From: Ganesh Vernekar <15064823+codesome@users.noreply.github.com>
Date: Wed, 25 Nov 2020 18:33:30 +0530
Subject: [PATCH] Set the min time of Head properly after truncation (#8212)

* Set the min time of Head properly after truncation

Signed-off-by: Ganesh Vernekar

* Fix lint

Signed-off-by: Ganesh Vernekar

* Enhance compaction plan logic for completely deleted small block

Signed-off-by: Ganesh Vernekar

* Fix review comments

Signed-off-by: Ganesh Vernekar
---
 tsdb/compact.go   |  5 +++++
 tsdb/head.go      | 53 ++++++++++++++++++++++++++++++++++++-----------
 tsdb/head_test.go | 34 ++++++++++++++++++++++++++++++
 3 files changed, 80 insertions(+), 12 deletions(-)

diff --git a/tsdb/compact.go b/tsdb/compact.go
index 7c6e14216..dbde84ed0 100644
--- a/tsdb/compact.go
+++ b/tsdb/compact.go
@@ -214,6 +214,11 @@ func (c *LeveledCompactor) plan(dms []dirMeta) ([]string, error) {
 	for i := len(dms) - 1; i >= 0; i-- {
 		meta := dms[i].meta
 		if meta.MaxTime-meta.MinTime < c.ranges[len(c.ranges)/2] {
+			// If the block is entirely deleted, then we don't care about the block being big enough.
+			// TODO: This is assuming a single tombstone is for a distinct series, which might not be true.
+			if meta.Stats.NumTombstones > 0 && meta.Stats.NumTombstones >= meta.Stats.NumSeries {
+				return []string{dms[i].dir}, nil
+			}
 			break
 		}
 		if float64(meta.Stats.NumTombstones)/float64(meta.Stats.NumSeries+1) > 0.05 {
diff --git a/tsdb/head.go b/tsdb/head.go
index 342073dd6..6f39d39ea 100644
--- a/tsdb/head.go
+++ b/tsdb/head.go
@@ -820,9 +820,22 @@ func (h *Head) truncateMemory(mint int64) (err error) {
 	h.metrics.headTruncateTotal.Inc()
 	start := time.Now()
 
-	h.gc()
+	actualMint := h.gc()
 	level.Info(h.logger).Log("msg", "Head GC completed", "duration", time.Since(start))
 	h.metrics.gcDuration.Observe(time.Since(start).Seconds())
+	if actualMint > h.minTime.Load() {
+		// The actual mint of the Head is higher than the one asked to truncate.
+		appendableMinValidTime := h.appendableMinValidTime()
+		if actualMint < appendableMinValidTime {
+			h.minTime.Store(actualMint)
+			h.minValidTime.Store(actualMint)
+		} else {
+			// The actual min time is in the appendable window.
+			// So we set the mint to the appendableMinValidTime.
+			h.minTime.Store(appendableMinValidTime)
+			h.minValidTime.Store(appendableMinValidTime)
+		}
+	}
 
 	// Truncate the chunk m-mapper.
 	if err := h.chunkDiskMapper.Truncate(mint); err != nil {
@@ -1054,10 +1067,8 @@ func (h *Head) appender() *headAppender {
 	cleanupAppendIDsBelow := h.iso.lowWatermark()
 
 	return &headAppender{
-		head: h,
-		// Set the minimum valid time to whichever is greater the head min valid time or the compaction window.
-		// This ensures that no samples will be added within the compaction window to avoid races.
-		minValidTime: max(h.minValidTime.Load(), h.MaxTime()-h.chunkRange.Load()/2),
+		head:         h,
+		minValidTime: h.appendableMinValidTime(),
 		mint:         math.MaxInt64,
 		maxt:         math.MinInt64,
 		samples:      h.getAppendBuffer(),
@@ -1067,6 +1078,12 @@ func (h *Head) appender() *headAppender {
 	}
 }
 
+func (h *Head) appendableMinValidTime() int64 {
+	// Setting the minimum valid time to whichever is greater, the head min valid time or the compaction window,
+	// ensures that no samples will be added within the compaction window to avoid races.
+	return max(h.minValidTime.Load(), h.MaxTime()-h.chunkRange.Load()/2)
+}
+
 func max(a, b int64) int64 {
 	if a > b {
 		return a
@@ -1335,13 +1352,14 @@ func (h *Head) Delete(mint, maxt int64, ms ...*labels.Matcher) error {
 }
 
 // gc removes data before the minimum timestamp from the head.
-func (h *Head) gc() {
+// It returns the actual min time of the chunks present in the Head.
+func (h *Head) gc() int64 {
 	// Only data strictly lower than this timestamp must be deleted.
 	mint := h.MinTime()
 
 	// Drop old chunks and remember series IDs and hashes if they can be
 	// deleted entirely.
-	deleted, chunksRemoved := h.series.gc(mint)
+	deleted, chunksRemoved, actualMint := h.series.gc(mint)
 	seriesRemoved := len(deleted)
 
 	h.metrics.seriesRemoved.Add(float64(seriesRemoved))
@@ -1382,6 +1400,8 @@ func (h *Head) gc() {
 		panic(err)
 	}
 	h.symbols = symbols
+
+	return actualMint
 }
 
 // Tombstones returns a new reader over the head's tombstones
@@ -1813,11 +1833,12 @@ func newStripeSeries(stripeSize int, seriesCallback SeriesLifecycleCallback) *st
 
 // gc garbage collects old chunks that are strictly before mint and removes
 // series entirely that have no chunks left.
-func (s *stripeSeries) gc(mint int64) (map[uint64]struct{}, int) {
+func (s *stripeSeries) gc(mint int64) (map[uint64]struct{}, int, int64) {
 	var (
-		deleted            = map[uint64]struct{}{}
-		deletedForCallback = []labels.Labels{}
-		rmChunks           = 0
+		deleted                  = map[uint64]struct{}{}
+		deletedForCallback       = []labels.Labels{}
+		rmChunks                 = 0
+		actualMint         int64 = math.MaxInt64
 	)
 	// Run through all series and truncate old chunks. Mark those with no
 	// chunks left as deleted and store their ID.
@@ -1830,6 +1851,10 @@ func (s *stripeSeries) gc(mint int64) (map[uint64]struct{}, int) {
 			rmChunks += series.truncateChunksBefore(mint)
 
 			if len(series.mmappedChunks) > 0 || series.headChunk != nil || series.pendingCommit {
+				seriesMint := series.minTime()
+				if seriesMint < actualMint {
+					actualMint = seriesMint
+				}
 				series.Unlock()
 				continue
 			}
@@ -1864,7 +1889,11 @@ func (s *stripeSeries) gc(mint int64) (map[uint64]struct{}, int) {
 		deletedForCallback = deletedForCallback[:0]
 	}
 
-	return deleted, rmChunks
+	if actualMint == math.MaxInt64 {
+		actualMint = mint
+	}
+
+	return deleted, rmChunks, actualMint
 }
 
 func (s *stripeSeries) getByID(id uint64) *memSeries {
diff --git a/tsdb/head_test.go b/tsdb/head_test.go
index 5f4f59892..ec15d2cf0 100644
--- a/tsdb/head_test.go
+++ b/tsdb/head_test.go
@@ -1897,3 +1897,37 @@ func TestErrReuseAppender(t *testing.T) {
 	require.Error(t, app.Commit())
 	require.Error(t, app.Rollback())
 }
+
+func TestHeadMintAfterTruncation(t *testing.T) {
+	chunkRange := int64(2000)
+	head, _ := newTestHead(t, chunkRange, false)
+
+	app := head.Appender(context.Background())
+	_, err := app.Add(labels.Labels{{Name: "a", Value: "b"}}, 100, 100)
+	require.NoError(t, err)
+	_, err = app.Add(labels.Labels{{Name: "a", Value: "b"}}, 4000, 200)
+	require.NoError(t, err)
+	_, err = app.Add(labels.Labels{{Name: "a", Value: "b"}}, 8000, 300)
+	require.NoError(t, err)
+	require.NoError(t, app.Commit())
+
+	// Truncating outside the appendable window, with the actual mint also outside
+	// the appendable window, should leave the mint at the actual mint.
+	require.NoError(t, head.Truncate(3500))
+	require.Equal(t, int64(4000), head.MinTime())
+	require.Equal(t, int64(4000), head.minValidTime.Load())
+
+	// After truncating outside the appendable window, if the actual min time
+	// is in the appendable window, we should leave the mint at the start of the appendable window.
+	require.NoError(t, head.Truncate(5000))
+	require.Equal(t, head.appendableMinValidTime(), head.MinTime())
+	require.Equal(t, head.appendableMinValidTime(), head.minValidTime.Load())
+
+	// If the truncation time is inside the appendable window, then the min time
+	// should be the truncation time.
+	require.NoError(t, head.Truncate(7500))
+	require.Equal(t, int64(7500), head.MinTime())
+	require.Equal(t, int64(7500), head.minValidTime.Load())
+
+	require.NoError(t, head.Close())
+}
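
For reference, the mint adjustment this patch adds to truncateMemory reduces to a small clamp. The sketch below is a minimal standalone restatement of that logic in plain Go; adjustedMinTime and its parameter names are illustrative assumptions and do not appear in the patch, and the Head's atomics and locking are deliberately left out.

package main

import "fmt"

// adjustedMinTime restates the clamp added to Head.truncateMemory in this patch:
// after GC, if the lowest timestamp still held in the Head (actualMint) is above
// the mint that was requested, the Head's mint follows it forward, but never
// into the appendable window; it is clamped to that window's start instead.
// Plain int64 values stand in for the Head's atomics; names are illustrative.
func adjustedMinTime(requestedMint, actualMint, appendableMinValidTime int64) int64 {
	if actualMint <= requestedMint {
		// Data at or before the requested mint still exists; nothing to adjust.
		return requestedMint
	}
	if actualMint < appendableMinValidTime {
		// The oldest remaining data is outside the appendable window: follow it exactly.
		return actualMint
	}
	// The oldest remaining data is inside the appendable window: clamp to its start.
	return appendableMinValidTime
}

func main() {
	fmt.Println(adjustedMinTime(3000, 2500, 5000)) // 3000: nothing to adjust
	fmt.Println(adjustedMinTime(3000, 4000, 5000)) // 4000: follow the actual mint
	fmt.Println(adjustedMinTime(3000, 6000, 5000)) // 5000: clamp to the appendable window start
}

In the patch itself the chosen value is stored into both h.minTime and h.minValidTime, which is what TestHeadMintAfterTruncation asserts for each of its three truncation calls.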