diff --git a/CHANGELOG.md b/CHANGELOG.md
index 89d540eaf2..468854d9e3 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -2,6 +2,8 @@
 
 ## unreleased
 
+* [BUGFIX] TSDB: fix unknown series errors and possible data loss during WAL replay when series are removed from the head due to inactivity and reappear before the next WAL checkpoint. #16060
+
 ## 3.2.1 / 2025-02-25
 
 * [BUGFIX] Don't send `Accept` header `escape=allow-utf-8` when `metric_name_validation_scheme: legacy` is configured. #16061
diff --git a/tsdb/agent/db.go b/tsdb/agent/db.go
index cf4a977288..38e6927ffb 100644
--- a/tsdb/agent/db.go
+++ b/tsdb/agent/db.go
@@ -624,6 +624,19 @@ Loop:
 	}
 }
 
+// keepSeriesInWALCheckpoint is used to determine whether a series record should be kept in the checkpoint
+// last is the last WAL segment that was considered for checkpointing.
+func (db *DB) keepSeriesInWALCheckpoint(id chunks.HeadSeriesRef, last int) bool {
+	// Keep the record if the series exists in the db.
+	if db.series.GetByID(id) != nil {
+		return true
+	}
+
+	// Keep the record if the series was recently deleted.
+	seg, ok := db.deleted[id]
+	return ok && seg > last
+}
+
 func (db *DB) truncate(mint int64) error {
 	db.mtx.RLock()
 	defer db.mtx.RUnlock()
@@ -656,18 +669,9 @@ func (db *DB) truncate(mint int64) error {
 		return nil
 	}
 
-	keep := func(id chunks.HeadSeriesRef) bool {
-		if db.series.GetByID(id) != nil {
-			return true
-		}
-
-		seg, ok := db.deleted[id]
-		return ok && seg > last
-	}
-
 	db.metrics.checkpointCreationTotal.Inc()
-	if _, err = wlog.Checkpoint(db.logger, db.wal, first, last, keep, mint); err != nil {
+	if _, err = wlog.Checkpoint(db.logger, db.wal, first, last, db.keepSeriesInWALCheckpoint, mint); err != nil {
 		db.metrics.checkpointCreationFail.Inc()
 		var cerr *wlog.CorruptionErr
 		if errors.As(err, &cerr) {
diff --git a/tsdb/db_test.go b/tsdb/db_test.go
index 4811923e7c..37efe40036 100644
--- a/tsdb/db_test.go
+++ b/tsdb/db_test.go
@@ -1553,7 +1553,7 @@ func TestSizeRetention(t *testing.T) {
 	// Create a WAL checkpoint, and compare sizes.
 	first, last, err := wlog.Segments(db.Head().wal.Dir())
 	require.NoError(t, err)
-	_, err = wlog.Checkpoint(promslog.NewNopLogger(), db.Head().wal, first, last-1, func(_ chunks.HeadSeriesRef) bool { return false }, 0)
+	_, err = wlog.Checkpoint(promslog.NewNopLogger(), db.Head().wal, first, last-1, func(_ chunks.HeadSeriesRef, _ int) bool { return false }, 0)
 	require.NoError(t, err)
 	blockSize = int64(prom_testutil.ToFloat64(db.metrics.blocksBytes)) // Use the actual internal metrics.
 	walSize, err = db.Head().wal.Size()
@@ -4723,7 +4723,7 @@ func TestMetadataCheckpointingOnlyKeepsLatestEntry(t *testing.T) {
 	// Let's create a checkpoint.
 	first, last, err := wlog.Segments(w.Dir())
 	require.NoError(t, err)
-	keep := func(id chunks.HeadSeriesRef) bool {
+	keep := func(id chunks.HeadSeriesRef, _ int) bool {
 		return id != 3
 	}
 	_, err = wlog.Checkpoint(promslog.NewNopLogger(), w, first, last-1, keep, 0)
diff --git a/tsdb/head.go b/tsdb/head.go
index 67fdbab646..4f4e4febfc 100644
--- a/tsdb/head.go
+++ b/tsdb/head.go
@@ -107,8 +107,8 @@ type Head struct {
 	// All series addressable by their ID or hash.
 	series *stripeSeries
 
-	deletedMtx sync.Mutex
-	deleted    map[chunks.HeadSeriesRef]int // Deleted series, and what WAL segment they must be kept until.
+	walExpiriesMtx sync.Mutex
+	walExpiries    map[chunks.HeadSeriesRef]int // Series no longer in the head, and what WAL segment they must be kept until.
 
 	// TODO(codesome): Extend MemPostings to return only OOOPostings, Set OOOStatus, ...
 	// Like an additional map of ooo postings.
 	postings *index.MemPostings // Postings lists for terms.
@@ -340,7 +340,7 @@ func (h *Head) resetInMemoryState() error {
 	h.exemplars = es
 	h.postings = index.NewUnorderedMemPostings()
 	h.tombstones = tombstones.NewMemTombstones()
-	h.deleted = map[chunks.HeadSeriesRef]int{}
+	h.walExpiries = map[chunks.HeadSeriesRef]int{}
 	h.chunkRange.Store(h.opts.ChunkRange)
 	h.minTime.Store(math.MaxInt64)
 	h.maxTime.Store(math.MinInt64)
@@ -762,7 +762,7 @@ func (h *Head) Init(minValidTime int64) error {
 
 		// A corrupted checkpoint is a hard error for now and requires user
 		// intervention. There's likely little data that can be recovered anyway.
-		if err := h.loadWAL(wlog.NewReader(sr), syms, multiRef, mmappedChunks, oooMmappedChunks); err != nil {
+		if err := h.loadWAL(wlog.NewReader(sr), syms, multiRef, mmappedChunks, oooMmappedChunks, endAt); err != nil {
 			return fmt.Errorf("backfill checkpoint: %w", err)
 		}
 		h.updateWALReplayStatusRead(startFrom)
@@ -795,7 +795,7 @@ func (h *Head) Init(minValidTime int64) error {
 		if err != nil {
 			return fmt.Errorf("segment reader (offset=%d): %w", offset, err)
 		}
-		err = h.loadWAL(wlog.NewReader(sr), syms, multiRef, mmappedChunks, oooMmappedChunks)
+		err = h.loadWAL(wlog.NewReader(sr), syms, multiRef, mmappedChunks, oooMmappedChunks, endAt)
 		if err := sr.Close(); err != nil {
 			h.logger.Warn("Error while closing the wal segments reader", "err", err)
 		}
@@ -1262,6 +1262,34 @@ func (h *Head) IsQuerierCollidingWithTruncation(querierMint, querierMaxt int64)
 	return false, false, 0
 }
 
+func (h *Head) getWALExpiry(id chunks.HeadSeriesRef) (int, bool) {
+	h.walExpiriesMtx.Lock()
+	defer h.walExpiriesMtx.Unlock()
+
+	keepUntil, ok := h.walExpiries[id]
+	return keepUntil, ok
+}
+
+func (h *Head) setWALExpiry(id chunks.HeadSeriesRef, keepUntil int) {
+	h.walExpiriesMtx.Lock()
+	defer h.walExpiriesMtx.Unlock()
+
+	h.walExpiries[id] = keepUntil
+}
+
+// keepSeriesInWALCheckpoint is used to determine whether a series record should be kept in the checkpoint
+// last is the last WAL segment that was considered for checkpointing.
+func (h *Head) keepSeriesInWALCheckpoint(id chunks.HeadSeriesRef, last int) bool {
+	// Keep the record if the series exists in the head.
+	if h.series.getByID(id) != nil {
+		return true
+	}
+
+	// Keep the record if the series has an expiry set.
+	keepUntil, ok := h.getWALExpiry(id)
+	return ok && keepUntil > last
+}
+
 // truncateWAL removes old data before mint from the WAL.
 func (h *Head) truncateWAL(mint int64) error {
 	h.chunkSnapshotMtx.Lock()
@@ -1295,17 +1323,8 @@ func (h *Head) truncateWAL(mint int64) error {
 		return nil
 	}
 
-	keep := func(id chunks.HeadSeriesRef) bool {
-		if h.series.getByID(id) != nil {
-			return true
-		}
-		h.deletedMtx.Lock()
-		keepUntil, ok := h.deleted[id]
-		h.deletedMtx.Unlock()
-		return ok && keepUntil > last
-	}
 	h.metrics.checkpointCreationTotal.Inc()
-	if _, err = wlog.Checkpoint(h.logger, h.wal, first, last, keep, mint); err != nil {
+	if _, err = wlog.Checkpoint(h.logger, h.wal, first, last, h.keepSeriesInWALCheckpoint, mint); err != nil {
 		h.metrics.checkpointCreationFail.Inc()
 		var cerr *chunks.CorruptionErr
 		if errors.As(err, &cerr) {
@@ -1320,15 +1339,15 @@ func (h *Head) truncateWAL(mint int64) error {
 		h.logger.Error("truncating segments failed", "err", err)
 	}
 
-	// The checkpoint is written and segments before it is truncated, so we no
-	// longer need to track deleted series that are before it.
-	h.deletedMtx.Lock()
-	for ref, segment := range h.deleted {
+	// The checkpoint is written and segments before it is truncated, so stop
+	// tracking expired series.
+	h.walExpiriesMtx.Lock()
+	for ref, segment := range h.walExpiries {
 		if segment <= last {
-			delete(h.deleted, ref)
+			delete(h.walExpiries, ref)
 		}
 	}
-	h.deletedMtx.Unlock()
+	h.walExpiriesMtx.Unlock()
 
 	h.metrics.checkpointDeleteTotal.Inc()
 	if err := wlog.DeleteCheckpoints(h.wal.Dir(), last); err != nil {
@@ -1595,7 +1614,7 @@ func (h *Head) gc() (actualInOrderMint, minOOOTime int64, minMmapFile int) {
 	if h.wal != nil {
 		_, last, _ := wlog.Segments(h.wal.Dir())
 
-		h.deletedMtx.Lock()
+		h.walExpiriesMtx.Lock()
 		// Keep series records until we're past segment 'last'
 		// because the WAL will still have samples records with
 		// this ref ID. If we didn't keep these series records then
 		// on start up when we replay the WAL, or any other code
 		// that reads the WAL, wouldn't be able to use those
 		// samples since we would have no labels for that ref ID.
 		for ref := range deleted {
-			h.deleted[chunks.HeadSeriesRef(ref)] = last
+			h.walExpiries[chunks.HeadSeriesRef(ref)] = last
 		}
-		h.deletedMtx.Unlock()
+		h.walExpiriesMtx.Unlock()
 	}
 
 	return actualInOrderMint, minOOOTime, minMmapFile
diff --git a/tsdb/head_test.go b/tsdb/head_test.go
index 065e5ff008..695428619f 100644
--- a/tsdb/head_test.go
+++ b/tsdb/head_test.go
@@ -719,12 +719,25 @@ func TestHead_ReadWAL(t *testing.T) {
 			s11 := head.series.getByID(11)
 			s50 := head.series.getByID(50)
 			s100 := head.series.getByID(100)
+			s101 := head.series.getByID(101)
 
 			testutil.RequireEqual(t, labels.FromStrings("a", "1"), s10.lset)
 			require.Nil(t, s11) // Series without samples should be garbage collected at head.Init().
 			testutil.RequireEqual(t, labels.FromStrings("a", "4"), s50.lset)
 			testutil.RequireEqual(t, labels.FromStrings("a", "3"), s100.lset)
+			// Duplicate series record should not be written to the head.
+			require.Nil(t, s101)
+			// But it should have a WAL expiry set.
+			keepUntil, ok := head.getWALExpiry(101)
+			require.True(t, ok)
+			_, last, err := wlog.Segments(w.Dir())
+			require.NoError(t, err)
+			require.Equal(t, last, keepUntil)
+			// Only the duplicate series record should have a WAL expiry set.
+			_, ok = head.getWALExpiry(50)
+			require.False(t, ok)
+
 			expandChunk := func(c chunkenc.Iterator) (x []sample) {
 				for c.Next() == chunkenc.ValFloat {
 					t, v := c.At()
@@ -796,7 +809,7 @@ func TestHead_WALMultiRef(t *testing.T) {
 
 	opts := DefaultHeadOptions()
 	opts.ChunkRange = 1000
-	opts.ChunkDirRoot = w.Dir()
+	opts.ChunkDirRoot = head.opts.ChunkDirRoot
 	head, err = NewHead(nil, nil, w, nil, opts, nil)
 	require.NoError(t, err)
 	require.NoError(t, head.Init(0))
@@ -815,6 +828,64 @@ func TestHead_WALMultiRef(t *testing.T) {
 	}}, series)
 }
 
+func TestHead_KeepSeriesInWALCheckpoint(t *testing.T) {
+	existingRef := 1
+	existingLbls := labels.FromStrings("foo", "bar")
+	deletedKeepUntil := 10
+
+	cases := []struct {
+		name      string
+		prepare   func(t *testing.T, h *Head)
+		seriesRef chunks.HeadSeriesRef
+		last      int
+		expected  bool
+	}{
+		{
+			name: "keep series still in the head",
+			prepare: func(t *testing.T, h *Head) {
+				_, _, err := h.getOrCreateWithID(chunks.HeadSeriesRef(existingRef), existingLbls.Hash(), existingLbls)
+				require.NoError(t, err)
+			},
+			seriesRef: chunks.HeadSeriesRef(existingRef),
+			expected:  true,
+		},
+		{
+			name: "keep deleted series with keepUntil > last",
+			prepare: func(_ *testing.T, h *Head) {
+				h.setWALExpiry(chunks.HeadSeriesRef(existingRef), deletedKeepUntil)
+			},
+			seriesRef: chunks.HeadSeriesRef(existingRef),
+			last:      deletedKeepUntil - 1,
+			expected:  true,
+		},
+		{
+			name: "drop deleted series with keepUntil <= last",
+			prepare: func(_ *testing.T, h *Head) {
+				h.setWALExpiry(chunks.HeadSeriesRef(existingRef), deletedKeepUntil)
+			},
+			seriesRef: chunks.HeadSeriesRef(existingRef),
+			last:      deletedKeepUntil,
+			expected:  false,
+		},
+	}
+
+	for _, tc := range cases {
+		t.Run(tc.name, func(t *testing.T) {
+			h, _ := newTestHead(t, 1000, wlog.CompressionNone, false)
+			t.Cleanup(func() {
+				require.NoError(t, h.Close())
+			})
+
+			if tc.prepare != nil {
+				tc.prepare(t, h)
+			}
+
+			kept := h.keepSeriesInWALCheckpoint(tc.seriesRef, tc.last)
+			require.Equal(t, tc.expected, kept)
+		})
+	}
+}
+
 func TestHead_ActiveAppenders(t *testing.T) {
 	head, _ := newTestHead(t, 1000, wlog.CompressionNone, false)
 	defer head.Close()
diff --git a/tsdb/head_wal.go b/tsdb/head_wal.go
index ad03fa4766..9385777aaf 100644
--- a/tsdb/head_wal.go
+++ b/tsdb/head_wal.go
@@ -50,7 +50,7 @@ type histogramRecord struct {
 	fh *histogram.FloatHistogram
 }
 
-func (h *Head) loadWAL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[chunks.HeadSeriesRef]chunks.HeadSeriesRef, mmappedChunks, oooMmappedChunks map[chunks.HeadSeriesRef][]*mmappedChunk) (err error) {
+func (h *Head) loadWAL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[chunks.HeadSeriesRef]chunks.HeadSeriesRef, mmappedChunks, oooMmappedChunks map[chunks.HeadSeriesRef][]*mmappedChunk, lastSegment int) (err error) {
 	// Track number of samples that referenced a series we don't know about
 	// for error reporting.
 	var unknownRefs atomic.Uint64
@@ -237,6 +237,8 @@ Outer:
 			}
 			if !created {
 				multiRef[walSeries.Ref] = mSeries.ref
+				// Set the WAL expiry for the duplicate series, so it is kept in subsequent WAL checkpoints.
+				h.setWALExpiry(walSeries.Ref, lastSegment)
 			}
 
 			idx := uint64(mSeries.ref) % uint64(concurrency)
diff --git a/tsdb/wlog/checkpoint.go b/tsdb/wlog/checkpoint.go
index 5c607d7030..2c1b0c0534 100644
--- a/tsdb/wlog/checkpoint.go
+++ b/tsdb/wlog/checkpoint.go
@@ -93,7 +93,7 @@ const CheckpointPrefix = "checkpoint."
 // segmented format as the original WAL itself.
 // This makes it easy to read it through the WAL package and concatenate
 // it with the original WAL.
-func Checkpoint(logger *slog.Logger, w *WL, from, to int, keep func(id chunks.HeadSeriesRef) bool, mint int64) (*CheckpointStats, error) {
+func Checkpoint(logger *slog.Logger, w *WL, from, to int, keep func(id chunks.HeadSeriesRef, last int) bool, mint int64) (*CheckpointStats, error) {
 	stats := &CheckpointStats{}
 	var sgmReader io.ReadCloser
 
@@ -181,7 +181,7 @@ func Checkpoint(logger *slog.Logger, w *WL, from, to int, keep func(id chunks.He
 			// Drop irrelevant series in place.
 			repl := series[:0]
 			for _, s := range series {
-				if keep(s.Ref) {
+				if keep(s.Ref, to) {
 					repl = append(repl, s)
 				}
 			}
@@ -323,7 +323,7 @@ func Checkpoint(logger *slog.Logger, w *WL, from, to int, keep func(id chunks.He
 			// Only keep reference to the latest found metadata for each refID.
 			repl := 0
 			for _, m := range metadata {
-				if keep(m.Ref) {
+				if keep(m.Ref, to) {
 					if _, ok := latestMetadataMap[m.Ref]; !ok {
 						repl++
 					}
diff --git a/tsdb/wlog/checkpoint_test.go b/tsdb/wlog/checkpoint_test.go
index a052de9258..047b89790b 100644
--- a/tsdb/wlog/checkpoint_test.go
+++ b/tsdb/wlog/checkpoint_test.go
@@ -291,7 +291,7 @@ func TestCheckpoint(t *testing.T) {
 			}
 			require.NoError(t, w.Close())
 
-			stats, err := Checkpoint(promslog.NewNopLogger(), w, 100, 106, func(x chunks.HeadSeriesRef) bool {
+			stats, err := Checkpoint(promslog.NewNopLogger(), w, 100, 106, func(x chunks.HeadSeriesRef, _ int) bool {
 				return x%2 == 0
 			}, last/2)
 			require.NoError(t, err)
diff --git a/tsdb/wlog/watcher_test.go b/tsdb/wlog/watcher_test.go
index 786912704e..0e22bc50a7 100644
--- a/tsdb/wlog/watcher_test.go
+++ b/tsdb/wlog/watcher_test.go
@@ -399,7 +399,7 @@ func TestReadToEndWithCheckpoint(t *testing.T) {
 		}
 	}
 
-	Checkpoint(promslog.NewNopLogger(), w, 0, 1, func(_ chunks.HeadSeriesRef) bool { return true }, 0)
+	Checkpoint(promslog.NewNopLogger(), w, 0, 1, func(_ chunks.HeadSeriesRef, _ int) bool { return true }, 0)
 	w.Truncate(1)
 
 	// Write more records after checkpointing.
@@ -490,7 +490,7 @@ func TestReadCheckpoint(t *testing.T) {
 	}
 	_, err = w.NextSegmentSync()
 	require.NoError(t, err)
-	_, err = Checkpoint(promslog.NewNopLogger(), w, 30, 31, func(_ chunks.HeadSeriesRef) bool { return true }, 0)
+	_, err = Checkpoint(promslog.NewNopLogger(), w, 30, 31, func(_ chunks.HeadSeriesRef, _ int) bool { return true }, 0)
 	require.NoError(t, err)
 	require.NoError(t, w.Truncate(32))
 
@@ -653,7 +653,7 @@ func TestCheckpointSeriesReset(t *testing.T) {
 		return wt.checkNumSeries() == seriesCount
 	}, 10*time.Second, 1*time.Second)
 
-	_, err = Checkpoint(promslog.NewNopLogger(), w, 2, 4, func(_ chunks.HeadSeriesRef) bool { return true }, 0)
+	_, err = Checkpoint(promslog.NewNopLogger(), w, 2, 4, func(_ chunks.HeadSeriesRef, _ int) bool { return true }, 0)
 	require.NoError(t, err)
 
 	err = w.Truncate(5)
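Note for downstream callers of `wlog.Checkpoint`: the `keep` callback gains a second argument, the last WAL segment index included in the checkpoint. Below is a minimal sketch of how an out-of-tree caller might adapt. `checkpointWithExpiries`, `live`, and `expiries` are illustrative stand-ins and not part of this change; only the callback signature and the `keepUntil > last` comparison mirror the diff above.

```go
package example

import (
	"log/slog"

	"github.com/prometheus/prometheus/tsdb/chunks"
	"github.com/prometheus/prometheus/tsdb/wlog"
)

// checkpointWithExpiries is a hypothetical caller-side helper showing the new
// two-argument keep callback. `live` holds series that are still in memory;
// `expiries` maps dropped series to the last WAL segment that still needs
// their series records (mirroring Head.walExpiries in the diff above).
func checkpointWithExpiries(
	logger *slog.Logger,
	wal *wlog.WL,
	live map[chunks.HeadSeriesRef]struct{},
	expiries map[chunks.HeadSeriesRef]int,
	first, last int,
	mint int64,
) error {
	keep := func(id chunks.HeadSeriesRef, lastSegment int) bool {
		// Keep records for series that are still live.
		if _, ok := live[id]; ok {
			return true
		}
		// Otherwise keep them only while their expiry segment lies beyond the
		// range covered by this checkpoint.
		keepUntil, ok := expiries[id]
		return ok && keepUntil > lastSegment
	}
	_, err := wlog.Checkpoint(logger, wal, first, last, keep, mint)
	return err
}
```

Inside Prometheus itself this is wired up by `Head.keepSeriesInWALCheckpoint` and `(*DB).keepSeriesInWALCheckpoint` as shown in the diff: duplicate series refs found during WAL replay receive an expiry of the last replayed segment via `setWALExpiry`, which keeps their series records alive in checkpoints until their samples can no longer appear in the WAL.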