From 61aa82865d9c8474393bbbdcd539c58f63ba514f Mon Sep 17 00:00:00 2001 From: Patryk Prus
Date: Wed, 5 Mar 2025 13:45:08 -0500 Subject: [PATCH] TSDB: keep duplicate series records in checkpoints while their samples may still be present (#16060) Renames the head's deleted map to walExpiries, and creates entries for any duplicate series records encountered during WAL replay, with the expiry set to the highest current WAL segment number. Any subsequent WAL checkpoints will see the duplicate series entry in the walExpiries map, and keep the series record until the last WAL segment that could contain its samples is deleted. Other considerations: WBL: series records aren't written to the WBL, so there are no duplicates to deal with agent mode: has its own WAL replay logic that handles duplicate series records differently, and is outside the scope of this PR --- CHANGELOG.md | 2 + tsdb/agent/db.go | 24 +++++++----- tsdb/db_test.go | 4 +- tsdb/head.go | 67 +++++++++++++++++++++------------ tsdb/head_test.go | 73 +++++++++++++++++++++++++++++++++++- tsdb/head_wal.go | 4 +- tsdb/wlog/checkpoint.go | 6 +-- tsdb/wlog/checkpoint_test.go | 2 +- tsdb/wlog/watcher_test.go | 6 +-- 9 files changed, 143 insertions(+), 45 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 89d540eaf2..468854d9e3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,8 @@ ## unreleased +* [BUGFIX] TSDB: fix unknown series errors and possible lost data during WAL replay when series are removed from the head due to inactivity and reappear before the next WAL checkpoint. #16060 + ## 3.2.1 / 2025-02-25 * [BUGFIX] Don't send Accept` header `escape=allow-utf-8` when `metric_name_validation_scheme: legacy` is configured. #16061 diff --git a/tsdb/agent/db.go b/tsdb/agent/db.go index cf4a977288..38e6927ffb 100644 --- a/tsdb/agent/db.go +++ b/tsdb/agent/db.go @@ -624,6 +624,19 @@ Loop: } } +// keepSeriesInWALCheckpoint is used to determine whether a series record should be kept in the checkpoint +// last is the last WAL segment that was considered for checkpointing. +func (db *DB) keepSeriesInWALCheckpoint(id chunks.HeadSeriesRef, last int) bool { + // Keep the record if the series exists in the db. + if db.series.GetByID(id) != nil { + return true + } + + // Keep the record if the series was recently deleted. + seg, ok := db.deleted[id] + return ok && seg > last +} + func (db *DB) truncate(mint int64) error { db.mtx.RLock() defer db.mtx.RUnlock() @@ -656,18 +669,9 @@ func (db *DB) truncate(mint int64) error { return nil } - keep := func(id chunks.HeadSeriesRef) bool { - if db.series.GetByID(id) != nil { - return true - } - - seg, ok := db.deleted[id] - return ok && seg > last - } - db.metrics.checkpointCreationTotal.Inc() - if _, err = wlog.Checkpoint(db.logger, db.wal, first, last, keep, mint); err != nil { + if _, err = wlog.Checkpoint(db.logger, db.wal, first, last, db.keepSeriesInWALCheckpoint, mint); err != nil { db.metrics.checkpointCreationFail.Inc() var cerr *wlog.CorruptionErr if errors.As(err, &cerr) { diff --git a/tsdb/db_test.go b/tsdb/db_test.go index 4811923e7c..37efe40036 100644 --- a/tsdb/db_test.go +++ b/tsdb/db_test.go @@ -1553,7 +1553,7 @@ func TestSizeRetention(t *testing.T) { // Create a WAL checkpoint, and compare sizes. first, last, err := wlog.Segments(db.Head().wal.Dir()) require.NoError(t, err) - _, err = wlog.Checkpoint(promslog.NewNopLogger(), db.Head().wal, first, last-1, func(_ chunks.HeadSeriesRef) bool { return false }, 0) + _, err = wlog.Checkpoint(promslog.NewNopLogger(), db.Head().wal, first, last-1, func(_ chunks.HeadSeriesRef, _ int) bool { return false }, 0) require.NoError(t, err) blockSize = int64(prom_testutil.ToFloat64(db.metrics.blocksBytes)) // Use the actual internal metrics. walSize, err = db.Head().wal.Size() @@ -4723,7 +4723,7 @@ func TestMetadataCheckpointingOnlyKeepsLatestEntry(t *testing.T) { // Let's create a checkpoint. first, last, err := wlog.Segments(w.Dir()) require.NoError(t, err) - keep := func(id chunks.HeadSeriesRef) bool { + keep := func(id chunks.HeadSeriesRef, _ int) bool { return id != 3 } _, err = wlog.Checkpoint(promslog.NewNopLogger(), w, first, last-1, keep, 0) diff --git a/tsdb/head.go b/tsdb/head.go index 67fdbab646..4f4e4febfc 100644 --- a/tsdb/head.go +++ b/tsdb/head.go @@ -107,8 +107,8 @@ type Head struct { // All series addressable by their ID or hash. series *stripeSeries - deletedMtx sync.Mutex - deleted map[chunks.HeadSeriesRef]int // Deleted series, and what WAL segment they must be kept until. + walExpiriesMtx sync.Mutex + walExpiries map[chunks.HeadSeriesRef]int // Series no longer in the head, and what WAL segment they must be kept until. // TODO(codesome): Extend MemPostings to return only OOOPostings, Set OOOStatus, ... Like an additional map of ooo postings. postings *index.MemPostings // Postings lists for terms. @@ -340,7 +340,7 @@ func (h *Head) resetInMemoryState() error { h.exemplars = es h.postings = index.NewUnorderedMemPostings() h.tombstones = tombstones.NewMemTombstones() - h.deleted = map[chunks.HeadSeriesRef]int{} + h.walExpiries = map[chunks.HeadSeriesRef]int{} h.chunkRange.Store(h.opts.ChunkRange) h.minTime.Store(math.MaxInt64) h.maxTime.Store(math.MinInt64) @@ -762,7 +762,7 @@ func (h *Head) Init(minValidTime int64) error { // A corrupted checkpoint is a hard error for now and requires user // intervention. There's likely little data that can be recovered anyway. - if err := h.loadWAL(wlog.NewReader(sr), syms, multiRef, mmappedChunks, oooMmappedChunks); err != nil { + if err := h.loadWAL(wlog.NewReader(sr), syms, multiRef, mmappedChunks, oooMmappedChunks, endAt); err != nil { return fmt.Errorf("backfill checkpoint: %w", err) } h.updateWALReplayStatusRead(startFrom) @@ -795,7 +795,7 @@ func (h *Head) Init(minValidTime int64) error { if err != nil { return fmt.Errorf("segment reader (offset=%d): %w", offset, err) } - err = h.loadWAL(wlog.NewReader(sr), syms, multiRef, mmappedChunks, oooMmappedChunks) + err = h.loadWAL(wlog.NewReader(sr), syms, multiRef, mmappedChunks, oooMmappedChunks, endAt) if err := sr.Close(); err != nil { h.logger.Warn("Error while closing the wal segments reader", "err", err) } @@ -1262,6 +1262,34 @@ func (h *Head) IsQuerierCollidingWithTruncation(querierMint, querierMaxt int64) return false, false, 0 } +func (h *Head) getWALExpiry(id chunks.HeadSeriesRef) (int, bool) { + h.walExpiriesMtx.Lock() + defer h.walExpiriesMtx.Unlock() + + keepUntil, ok := h.walExpiries[id] + return keepUntil, ok +} + +func (h *Head) setWALExpiry(id chunks.HeadSeriesRef, keepUntil int) { + h.walExpiriesMtx.Lock() + defer h.walExpiriesMtx.Unlock() + + h.walExpiries[id] = keepUntil +} + +// keepSeriesInWALCheckpoint is used to determine whether a series record should be kept in the checkpoint +// last is the last WAL segment that was considered for checkpointing. +func (h *Head) keepSeriesInWALCheckpoint(id chunks.HeadSeriesRef, last int) bool { + // Keep the record if the series exists in the head. + if h.series.getByID(id) != nil { + return true + } + + // Keep the record if the series has an expiry set. + keepUntil, ok := h.getWALExpiry(id) + return ok && keepUntil > last +} + // truncateWAL removes old data before mint from the WAL. func (h *Head) truncateWAL(mint int64) error { h.chunkSnapshotMtx.Lock() @@ -1295,17 +1323,8 @@ func (h *Head) truncateWAL(mint int64) error { return nil } - keep := func(id chunks.HeadSeriesRef) bool { - if h.series.getByID(id) != nil { - return true - } - h.deletedMtx.Lock() - keepUntil, ok := h.deleted[id] - h.deletedMtx.Unlock() - return ok && keepUntil > last - } h.metrics.checkpointCreationTotal.Inc() - if _, err = wlog.Checkpoint(h.logger, h.wal, first, last, keep, mint); err != nil { + if _, err = wlog.Checkpoint(h.logger, h.wal, first, last, h.keepSeriesInWALCheckpoint, mint); err != nil { h.metrics.checkpointCreationFail.Inc() var cerr *chunks.CorruptionErr if errors.As(err, &cerr) { @@ -1320,15 +1339,15 @@ func (h *Head) truncateWAL(mint int64) error { h.logger.Error("truncating segments failed", "err", err) } - // The checkpoint is written and segments before it is truncated, so we no - // longer need to track deleted series that are before it. - h.deletedMtx.Lock() - for ref, segment := range h.deleted { + // The checkpoint is written and segments before it is truncated, so stop + // tracking expired series. + h.walExpiriesMtx.Lock() + for ref, segment := range h.walExpiries { if segment <= last { - delete(h.deleted, ref) + delete(h.walExpiries, ref) } } - h.deletedMtx.Unlock() + h.walExpiriesMtx.Unlock() h.metrics.checkpointDeleteTotal.Inc() if err := wlog.DeleteCheckpoints(h.wal.Dir(), last); err != nil { @@ -1595,7 +1614,7 @@ func (h *Head) gc() (actualInOrderMint, minOOOTime int64, minMmapFile int) { if h.wal != nil { _, last, _ := wlog.Segments(h.wal.Dir()) - h.deletedMtx.Lock() + h.walExpiriesMtx.Lock() // Keep series records until we're past segment 'last' // because the WAL will still have samples records with // this ref ID. If we didn't keep these series records then @@ -1603,9 +1622,9 @@ func (h *Head) gc() (actualInOrderMint, minOOOTime int64, minMmapFile int) { // that reads the WAL, wouldn't be able to use those // samples since we would have no labels for that ref ID. for ref := range deleted { - h.deleted[chunks.HeadSeriesRef(ref)] = last + h.walExpiries[chunks.HeadSeriesRef(ref)] = last } - h.deletedMtx.Unlock() + h.walExpiriesMtx.Unlock() } return actualInOrderMint, minOOOTime, minMmapFile diff --git a/tsdb/head_test.go b/tsdb/head_test.go index 065e5ff008..695428619f 100644 --- a/tsdb/head_test.go +++ b/tsdb/head_test.go @@ -719,12 +719,25 @@ func TestHead_ReadWAL(t *testing.T) { s11 := head.series.getByID(11) s50 := head.series.getByID(50) s100 := head.series.getByID(100) + s101 := head.series.getByID(101) testutil.RequireEqual(t, labels.FromStrings("a", "1"), s10.lset) require.Nil(t, s11) // Series without samples should be garbage collected at head.Init(). testutil.RequireEqual(t, labels.FromStrings("a", "4"), s50.lset) testutil.RequireEqual(t, labels.FromStrings("a", "3"), s100.lset) + // Duplicate series record should not be written to the head. + require.Nil(t, s101) + // But it should have a WAL expiry set. + keepUntil, ok := head.getWALExpiry(101) + require.True(t, ok) + _, last, err := wlog.Segments(w.Dir()) + require.NoError(t, err) + require.Equal(t, last, keepUntil) + // Only the duplicate series record should have a WAL expiry set. + _, ok = head.getWALExpiry(50) + require.False(t, ok) + expandChunk := func(c chunkenc.Iterator) (x []sample) { for c.Next() == chunkenc.ValFloat { t, v := c.At() @@ -796,7 +809,7 @@ func TestHead_WALMultiRef(t *testing.T) { opts := DefaultHeadOptions() opts.ChunkRange = 1000 - opts.ChunkDirRoot = w.Dir() + opts.ChunkDirRoot = head.opts.ChunkDirRoot head, err = NewHead(nil, nil, w, nil, opts, nil) require.NoError(t, err) require.NoError(t, head.Init(0)) @@ -815,6 +828,64 @@ func TestHead_WALMultiRef(t *testing.T) { }}, series) } +func TestHead_KeepSeriesInWALCheckpoint(t *testing.T) { + existingRef := 1 + existingLbls := labels.FromStrings("foo", "bar") + deletedKeepUntil := 10 + + cases := []struct { + name string + prepare func(t *testing.T, h *Head) + seriesRef chunks.HeadSeriesRef + last int + expected bool + }{ + { + name: "keep series still in the head", + prepare: func(t *testing.T, h *Head) { + _, _, err := h.getOrCreateWithID(chunks.HeadSeriesRef(existingRef), existingLbls.Hash(), existingLbls) + require.NoError(t, err) + }, + seriesRef: chunks.HeadSeriesRef(existingRef), + expected: true, + }, + { + name: "keep deleted series with keepUntil > last", + prepare: func(_ *testing.T, h *Head) { + h.setWALExpiry(chunks.HeadSeriesRef(existingRef), deletedKeepUntil) + }, + seriesRef: chunks.HeadSeriesRef(existingRef), + last: deletedKeepUntil - 1, + expected: true, + }, + { + name: "drop deleted series with keepUntil <= last", + prepare: func(_ *testing.T, h *Head) { + h.setWALExpiry(chunks.HeadSeriesRef(existingRef), deletedKeepUntil) + }, + seriesRef: chunks.HeadSeriesRef(existingRef), + last: deletedKeepUntil, + expected: false, + }, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + h, _ := newTestHead(t, 1000, wlog.CompressionNone, false) + t.Cleanup(func() { + require.NoError(t, h.Close()) + }) + + if tc.prepare != nil { + tc.prepare(t, h) + } + + kept := h.keepSeriesInWALCheckpoint(tc.seriesRef, tc.last) + require.Equal(t, tc.expected, kept) + }) + } +} + func TestHead_ActiveAppenders(t *testing.T) { head, _ := newTestHead(t, 1000, wlog.CompressionNone, false) defer head.Close() diff --git a/tsdb/head_wal.go b/tsdb/head_wal.go index ad03fa4766..9385777aaf 100644 --- a/tsdb/head_wal.go +++ b/tsdb/head_wal.go @@ -50,7 +50,7 @@ type histogramRecord struct { fh *histogram.FloatHistogram } -func (h *Head) loadWAL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[chunks.HeadSeriesRef]chunks.HeadSeriesRef, mmappedChunks, oooMmappedChunks map[chunks.HeadSeriesRef][]*mmappedChunk) (err error) { +func (h *Head) loadWAL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[chunks.HeadSeriesRef]chunks.HeadSeriesRef, mmappedChunks, oooMmappedChunks map[chunks.HeadSeriesRef][]*mmappedChunk, lastSegment int) (err error) { // Track number of samples that referenced a series we don't know about // for error reporting. var unknownRefs atomic.Uint64 @@ -237,6 +237,8 @@ Outer: } if !created { multiRef[walSeries.Ref] = mSeries.ref + // Set the WAL expiry for the duplicate series, so it is kept in subsequent WAL checkpoints. + h.setWALExpiry(walSeries.Ref, lastSegment) } idx := uint64(mSeries.ref) % uint64(concurrency) diff --git a/tsdb/wlog/checkpoint.go b/tsdb/wlog/checkpoint.go index 5c607d7030..2c1b0c0534 100644 --- a/tsdb/wlog/checkpoint.go +++ b/tsdb/wlog/checkpoint.go @@ -93,7 +93,7 @@ const CheckpointPrefix = "checkpoint." // segmented format as the original WAL itself. // This makes it easy to read it through the WAL package and concatenate // it with the original WAL. -func Checkpoint(logger *slog.Logger, w *WL, from, to int, keep func(id chunks.HeadSeriesRef) bool, mint int64) (*CheckpointStats, error) { +func Checkpoint(logger *slog.Logger, w *WL, from, to int, keep func(id chunks.HeadSeriesRef, last int) bool, mint int64) (*CheckpointStats, error) { stats := &CheckpointStats{} var sgmReader io.ReadCloser @@ -181,7 +181,7 @@ func Checkpoint(logger *slog.Logger, w *WL, from, to int, keep func(id chunks.He // Drop irrelevant series in place. repl := series[:0] for _, s := range series { - if keep(s.Ref) { + if keep(s.Ref, to) { repl = append(repl, s) } } @@ -323,7 +323,7 @@ func Checkpoint(logger *slog.Logger, w *WL, from, to int, keep func(id chunks.He // Only keep reference to the latest found metadata for each refID. repl := 0 for _, m := range metadata { - if keep(m.Ref) { + if keep(m.Ref, to) { if _, ok := latestMetadataMap[m.Ref]; !ok { repl++ } diff --git a/tsdb/wlog/checkpoint_test.go b/tsdb/wlog/checkpoint_test.go index a052de9258..047b89790b 100644 --- a/tsdb/wlog/checkpoint_test.go +++ b/tsdb/wlog/checkpoint_test.go @@ -291,7 +291,7 @@ func TestCheckpoint(t *testing.T) { } require.NoError(t, w.Close()) - stats, err := Checkpoint(promslog.NewNopLogger(), w, 100, 106, func(x chunks.HeadSeriesRef) bool { + stats, err := Checkpoint(promslog.NewNopLogger(), w, 100, 106, func(x chunks.HeadSeriesRef, _ int) bool { return x%2 == 0 }, last/2) require.NoError(t, err) diff --git a/tsdb/wlog/watcher_test.go b/tsdb/wlog/watcher_test.go index 786912704e..0e22bc50a7 100644 --- a/tsdb/wlog/watcher_test.go +++ b/tsdb/wlog/watcher_test.go @@ -399,7 +399,7 @@ func TestReadToEndWithCheckpoint(t *testing.T) { } } - Checkpoint(promslog.NewNopLogger(), w, 0, 1, func(_ chunks.HeadSeriesRef) bool { return true }, 0) + Checkpoint(promslog.NewNopLogger(), w, 0, 1, func(_ chunks.HeadSeriesRef, _ int) bool { return true }, 0) w.Truncate(1) // Write more records after checkpointing. @@ -490,7 +490,7 @@ func TestReadCheckpoint(t *testing.T) { } _, err = w.NextSegmentSync() require.NoError(t, err) - _, err = Checkpoint(promslog.NewNopLogger(), w, 30, 31, func(_ chunks.HeadSeriesRef) bool { return true }, 0) + _, err = Checkpoint(promslog.NewNopLogger(), w, 30, 31, func(_ chunks.HeadSeriesRef, _ int) bool { return true }, 0) require.NoError(t, err) require.NoError(t, w.Truncate(32)) @@ -653,7 +653,7 @@ func TestCheckpointSeriesReset(t *testing.T) { return wt.checkNumSeries() == seriesCount }, 10*time.Second, 1*time.Second) - _, err = Checkpoint(promslog.NewNopLogger(), w, 2, 4, func(_ chunks.HeadSeriesRef) bool { return true }, 0) + _, err = Checkpoint(promslog.NewNopLogger(), w, 2, 4, func(_ chunks.HeadSeriesRef, _ int) bool { return true }, 0) require.NoError(t, err) err = w.Truncate(5)