From 786d3d99baf8077b5999c32919cde5693ab1c26b Mon Sep 17 00:00:00 2001 From: Patryk Prus
Date: Fri, 28 Feb 2025 11:30:08 -0500 Subject: [PATCH] head.deleted -> head.walExpiries Signed-off-by: Patryk Prus
--- tsdb/head.go | 44 ++++++++++++++++++++++---------------------- tsdb/head_test.go | 12 ++++++------ tsdb/head_wal.go | 2 +- 3 files changed, 29 insertions(+), 29 deletions(-) diff --git a/tsdb/head.go b/tsdb/head.go index f21b94bfe1..95d15f7641 100644 --- a/tsdb/head.go +++ b/tsdb/head.go @@ -107,8 +107,8 @@ type Head struct { // All series addressable by their ID or hash. series *stripeSeries - deletedMtx sync.Mutex - deleted map[chunks.HeadSeriesRef]int // Deleted series, and what WAL segment they must be kept until. + walExpiriesMtx sync.Mutex + walExpiries map[chunks.HeadSeriesRef]int // Series no longer in the head, and what WAL segment they must be kept until. // TODO(codesome): Extend MemPostings to return only OOOPostings, Set OOOStatus, ... Like an additional map of ooo postings. postings *index.MemPostings // Postings lists for terms. @@ -340,7 +340,7 @@ func (h *Head) resetInMemoryState() error { h.exemplars = es h.postings = index.NewUnorderedMemPostings() h.tombstones = tombstones.NewMemTombstones() - h.deleted = map[chunks.HeadSeriesRef]int{} + h.walExpiries = map[chunks.HeadSeriesRef]int{} h.chunkRange.Store(h.opts.ChunkRange) h.minTime.Store(math.MaxInt64) h.maxTime.Store(math.MinInt64) @@ -1262,19 +1262,19 @@ func (h *Head) IsQuerierCollidingWithTruncation(querierMint, querierMaxt int64) return false, false, 0 } -func (h *Head) checkDeleted(id chunks.HeadSeriesRef) (int, bool) { - h.deletedMtx.Lock() - defer h.deletedMtx.Unlock() +func (h *Head) checkWALExpiry(id chunks.HeadSeriesRef) (int, bool) { + h.walExpiriesMtx.Lock() + defer h.walExpiriesMtx.Unlock() - keepUntil, ok := h.deleted[id] + keepUntil, ok := h.walExpiries[id] return keepUntil, ok } -func (h *Head) markDeleted(id chunks.HeadSeriesRef, keepUntil int) { - h.deletedMtx.Lock() - defer h.deletedMtx.Unlock() +func (h *Head) setWALExpiry(id chunks.HeadSeriesRef, keepUntil int) { + h.walExpiriesMtx.Lock() + defer h.walExpiriesMtx.Unlock() - h.deleted[id] = keepUntil + h.walExpiries[id] = keepUntil } // keepSeriesInWALCheckpoint is used to determine whether a series record should be kept in the checkpoint @@ -1285,8 +1285,8 @@ func (h *Head) keepSeriesInWALCheckpoint(id chunks.HeadSeriesRef, last int) bool return true } - // Keep the record if the series was recently deleted. - keepUntil, ok := h.checkDeleted(id) + // Keep the record if the series has an expiry set. + keepUntil, ok := h.checkWALExpiry(id) return ok && keepUntil > last } @@ -1339,15 +1339,15 @@ func (h *Head) truncateWAL(mint int64) error { h.logger.Error("truncating segments failed", "err", err) } - // The checkpoint is written and segments before it is truncated, so we no - // longer need to track deleted series that are before it. - h.deletedMtx.Lock() - for ref, segment := range h.deleted { + // The checkpoint is written and segments before it is truncated, so stop + // tracking expired series. + h.walExpiriesMtx.Lock() + for ref, segment := range h.walExpiries { if segment <= last { - delete(h.deleted, ref) + delete(h.walExpiries, ref) } } - h.deletedMtx.Unlock() + h.walExpiriesMtx.Unlock() h.metrics.checkpointDeleteTotal.Inc() if err := wlog.DeleteCheckpoints(h.wal.Dir(), last); err != nil { @@ -1614,7 +1614,7 @@ func (h *Head) gc() (actualInOrderMint, minOOOTime int64, minMmapFile int) { if h.wal != nil { _, last, _ := wlog.Segments(h.wal.Dir()) - h.deletedMtx.Lock() + h.walExpiriesMtx.Lock() // Keep series records until we're past segment 'last' // because the WAL will still have samples records with // this ref ID. If we didn't keep these series records then @@ -1622,9 +1622,9 @@ func (h *Head) gc() (actualInOrderMint, minOOOTime int64, minMmapFile int) { // that reads the WAL, wouldn't be able to use those // samples since we would have no labels for that ref ID. for ref := range deleted { - h.deleted[chunks.HeadSeriesRef(ref)] = last + h.walExpiries[chunks.HeadSeriesRef(ref)] = last } - h.deletedMtx.Unlock() + h.walExpiriesMtx.Unlock() } return actualInOrderMint, minOOOTime, minMmapFile diff --git a/tsdb/head_test.go b/tsdb/head_test.go index 1595249639..5435985234 100644 --- a/tsdb/head_test.go +++ b/tsdb/head_test.go @@ -728,14 +728,14 @@ func TestHead_ReadWAL(t *testing.T) { // Duplicate series record should not be written to the head. require.Nil(t, s101) - // But it should be marked as deleted. - keepUntil, ok := head.checkDeleted(101) + // But it should have a WAL expiry set. + keepUntil, ok := head.checkWALExpiry(101) require.True(t, ok) _, last, err := wlog.Segments(w.Dir()) require.NoError(t, err) require.Equal(t, last, keepUntil) - // Only the duplicate series record should be marked as deleted. - _, ok = head.checkDeleted(50) + // Only the duplicate series record should have a WAL expiry set. + _, ok = head.checkWALExpiry(50) require.False(t, ok) expandChunk := func(c chunkenc.Iterator) (x []sample) { @@ -852,7 +852,7 @@ func TestHead_KeepSeriesInWALCheckpoint(t *testing.T) { { name: "keep deleted series with keepUntil > last", prepare: func(_ *testing.T, h *Head) { - h.markDeleted(chunks.HeadSeriesRef(existingRef), deletedKeepUntil) + h.setWALExpiry(chunks.HeadSeriesRef(existingRef), deletedKeepUntil) }, seriesRef: chunks.HeadSeriesRef(existingRef), last: deletedKeepUntil - 1, @@ -861,7 +861,7 @@ func TestHead_KeepSeriesInWALCheckpoint(t *testing.T) { { name: "drop deleted series with keepUntil <= last", prepare: func(_ *testing.T, h *Head) { - h.markDeleted(chunks.HeadSeriesRef(existingRef), deletedKeepUntil) + h.setWALExpiry(chunks.HeadSeriesRef(existingRef), deletedKeepUntil) }, seriesRef: chunks.HeadSeriesRef(existingRef), last: deletedKeepUntil, diff --git a/tsdb/head_wal.go b/tsdb/head_wal.go index 447ad23a17..651d529029 100644 --- a/tsdb/head_wal.go +++ b/tsdb/head_wal.go @@ -243,7 +243,7 @@ Outer: if !created { multiRef[walSeries.Ref] = mSeries.ref // Mark the walSeries as deleted, so it is kept in subsequent WAL checkpoints. - h.markDeleted(walSeries.Ref, last) + h.setWALExpiry(walSeries.Ref, last) } idx := uint64(mSeries.ref) % uint64(concurrency)