TSDB: keep duplicate series records in checkpoints while their samples may still be present (#16060)

Renames the head's deleted map to walExpiries, and creates entries for any
duplicate series records encountered during WAL replay, with the expiry set
to the highest current WAL segment number. Any subsequent WAL
checkpoints will see the duplicate series entry in the walExpiries map, and
keep the series record until the last WAL segment that could contain its
samples is deleted.
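
To make the mechanism concrete, here is a minimal, standalone sketch of both halves of the change: recording an expiry for a duplicate series record during WAL replay, and the checkpoint-time keep decision. The types and the onDuplicateSeriesRecord helper are simplified, illustrative stand-ins, not the actual implementation; the real logic lives in Head.loadWAL, Head.keepSeriesInWALCheckpoint, and Head.truncateWAL in the diff below.

```go
package main

import "fmt"

// seriesRef stands in for chunks.HeadSeriesRef.
type seriesRef uint64

type head struct {
	inHead      map[seriesRef]bool // series currently held in the head
	walExpiries map[seriesRef]int  // series ref -> last WAL segment that may still contain its samples
}

// Replay side: a duplicate series record resolves to an existing series, so it
// is not added to the head, but its record must survive checkpoints until every
// WAL segment that could reference it has been truncated.
func (h *head) onDuplicateSeriesRecord(ref seriesRef, lastSegment int) {
	h.walExpiries[ref] = lastSegment
}

// Checkpoint side: keep a series record if the series is still in the head, or
// if its expiry lies beyond the last segment covered by this checkpoint.
func (h *head) keepSeriesInWALCheckpoint(ref seriesRef, last int) bool {
	if h.inHead[ref] {
		return true
	}
	keepUntil, ok := h.walExpiries[ref]
	return ok && keepUntil > last
}

func main() {
	h := &head{
		inHead:      map[seriesRef]bool{10: true},
		walExpiries: map[seriesRef]int{},
	}

	// During replay, ref 101 turns out to duplicate an existing series while
	// the WAL's highest segment number is 7.
	h.onDuplicateSeriesRecord(101, 7)

	fmt.Println(h.keepSeriesInWALCheckpoint(101, 5)) // true: segments 6 and 7 may still hold samples for ref 101
	fmt.Println(h.keepSeriesInWALCheckpoint(101, 7)) // false: everything up to segment 7 is covered by the checkpoint
	fmt.Println(h.keepSeriesInWALCheckpoint(10, 7))  // true: series 10 is still in the head
}
```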

Other considerations:

- WBL: series records aren't written to the WBL, so there are no duplicates to deal with.
- Agent mode: has its own WAL replay logic that handles duplicate series records differently, and is outside the scope of this PR.
Patryk Prus 2025-03-05 13:45:08 -05:00 committed by GitHub
parent 7cbf749096
commit 61aa82865d
9 changed files with 143 additions and 45 deletions

CHANGELOG.md

@@ -2,6 +2,8 @@
## unreleased
* [BUGFIX] TSDB: fix unknown series errors and possible lost data during WAL replay when series are removed from the head due to inactivity and reappear before the next WAL checkpoint. #16060
## 3.2.1 / 2025-02-25
* [BUGFIX] Don't send `Accept` header `escape=allow-utf-8` when `metric_name_validation_scheme: legacy` is configured. #16061

tsdb/agent/db.go

@@ -624,6 +624,19 @@ Loop:
}
}
// keepSeriesInWALCheckpoint is used to determine whether a series record should be kept in the checkpoint
// last is the last WAL segment that was considered for checkpointing.
func (db *DB) keepSeriesInWALCheckpoint(id chunks.HeadSeriesRef, last int) bool {
// Keep the record if the series exists in the db.
if db.series.GetByID(id) != nil {
return true
}
// Keep the record if the series was recently deleted.
seg, ok := db.deleted[id]
return ok && seg > last
}
func (db *DB) truncate(mint int64) error {
db.mtx.RLock()
defer db.mtx.RUnlock()
@@ -656,18 +669,9 @@ func (db *DB) truncate(mint int64) error {
return nil
}
keep := func(id chunks.HeadSeriesRef) bool {
if db.series.GetByID(id) != nil {
return true
}
seg, ok := db.deleted[id]
return ok && seg > last
}
db.metrics.checkpointCreationTotal.Inc()
if _, err = wlog.Checkpoint(db.logger, db.wal, first, last, keep, mint); err != nil {
if _, err = wlog.Checkpoint(db.logger, db.wal, first, last, db.keepSeriesInWALCheckpoint, mint); err != nil {
db.metrics.checkpointCreationFail.Inc()
var cerr *wlog.CorruptionErr
if errors.As(err, &cerr) {

tsdb/db_test.go

@@ -1553,7 +1553,7 @@ func TestSizeRetention(t *testing.T) {
// Create a WAL checkpoint, and compare sizes.
first, last, err := wlog.Segments(db.Head().wal.Dir())
require.NoError(t, err)
_, err = wlog.Checkpoint(promslog.NewNopLogger(), db.Head().wal, first, last-1, func(_ chunks.HeadSeriesRef) bool { return false }, 0)
_, err = wlog.Checkpoint(promslog.NewNopLogger(), db.Head().wal, first, last-1, func(_ chunks.HeadSeriesRef, _ int) bool { return false }, 0)
require.NoError(t, err)
blockSize = int64(prom_testutil.ToFloat64(db.metrics.blocksBytes)) // Use the actual internal metrics.
walSize, err = db.Head().wal.Size()
@@ -4723,7 +4723,7 @@ func TestMetadataCheckpointingOnlyKeepsLatestEntry(t *testing.T) {
// Let's create a checkpoint.
first, last, err := wlog.Segments(w.Dir())
require.NoError(t, err)
keep := func(id chunks.HeadSeriesRef) bool {
keep := func(id chunks.HeadSeriesRef, _ int) bool {
return id != 3
}
_, err = wlog.Checkpoint(promslog.NewNopLogger(), w, first, last-1, keep, 0)

tsdb/head.go

@@ -107,8 +107,8 @@ type Head struct {
// All series addressable by their ID or hash.
series *stripeSeries
deletedMtx sync.Mutex
deleted map[chunks.HeadSeriesRef]int // Deleted series, and what WAL segment they must be kept until.
walExpiriesMtx sync.Mutex
walExpiries map[chunks.HeadSeriesRef]int // Series no longer in the head, and what WAL segment they must be kept until.
// TODO(codesome): Extend MemPostings to return only OOOPostings, Set OOOStatus, ... Like an additional map of ooo postings.
postings *index.MemPostings // Postings lists for terms.
@@ -340,7 +340,7 @@ func (h *Head) resetInMemoryState() error {
h.exemplars = es
h.postings = index.NewUnorderedMemPostings()
h.tombstones = tombstones.NewMemTombstones()
h.deleted = map[chunks.HeadSeriesRef]int{}
h.walExpiries = map[chunks.HeadSeriesRef]int{}
h.chunkRange.Store(h.opts.ChunkRange)
h.minTime.Store(math.MaxInt64)
h.maxTime.Store(math.MinInt64)
@@ -762,7 +762,7 @@ func (h *Head) Init(minValidTime int64) error {
// A corrupted checkpoint is a hard error for now and requires user
// intervention. There's likely little data that can be recovered anyway.
if err := h.loadWAL(wlog.NewReader(sr), syms, multiRef, mmappedChunks, oooMmappedChunks); err != nil {
if err := h.loadWAL(wlog.NewReader(sr), syms, multiRef, mmappedChunks, oooMmappedChunks, endAt); err != nil {
return fmt.Errorf("backfill checkpoint: %w", err)
}
h.updateWALReplayStatusRead(startFrom)
@@ -795,7 +795,7 @@ func (h *Head) Init(minValidTime int64) error {
if err != nil {
return fmt.Errorf("segment reader (offset=%d): %w", offset, err)
}
err = h.loadWAL(wlog.NewReader(sr), syms, multiRef, mmappedChunks, oooMmappedChunks)
err = h.loadWAL(wlog.NewReader(sr), syms, multiRef, mmappedChunks, oooMmappedChunks, endAt)
if err := sr.Close(); err != nil {
h.logger.Warn("Error while closing the wal segments reader", "err", err)
}
@@ -1262,6 +1262,34 @@ func (h *Head) IsQuerierCollidingWithTruncation(querierMint, querierMaxt int64)
return false, false, 0
}
func (h *Head) getWALExpiry(id chunks.HeadSeriesRef) (int, bool) {
h.walExpiriesMtx.Lock()
defer h.walExpiriesMtx.Unlock()
keepUntil, ok := h.walExpiries[id]
return keepUntil, ok
}
func (h *Head) setWALExpiry(id chunks.HeadSeriesRef, keepUntil int) {
h.walExpiriesMtx.Lock()
defer h.walExpiriesMtx.Unlock()
h.walExpiries[id] = keepUntil
}
// keepSeriesInWALCheckpoint is used to determine whether a series record should be kept in the checkpoint
// last is the last WAL segment that was considered for checkpointing.
func (h *Head) keepSeriesInWALCheckpoint(id chunks.HeadSeriesRef, last int) bool {
// Keep the record if the series exists in the head.
if h.series.getByID(id) != nil {
return true
}
// Keep the record if the series has an expiry set.
keepUntil, ok := h.getWALExpiry(id)
return ok && keepUntil > last
}
// truncateWAL removes old data before mint from the WAL.
func (h *Head) truncateWAL(mint int64) error {
h.chunkSnapshotMtx.Lock()
@@ -1295,17 +1323,8 @@ func (h *Head) truncateWAL(mint int64) error {
return nil
}
keep := func(id chunks.HeadSeriesRef) bool {
if h.series.getByID(id) != nil {
return true
}
h.deletedMtx.Lock()
keepUntil, ok := h.deleted[id]
h.deletedMtx.Unlock()
return ok && keepUntil > last
}
h.metrics.checkpointCreationTotal.Inc()
if _, err = wlog.Checkpoint(h.logger, h.wal, first, last, keep, mint); err != nil {
if _, err = wlog.Checkpoint(h.logger, h.wal, first, last, h.keepSeriesInWALCheckpoint, mint); err != nil {
h.metrics.checkpointCreationFail.Inc()
var cerr *chunks.CorruptionErr
if errors.As(err, &cerr) {
@@ -1320,15 +1339,15 @@ func (h *Head) truncateWAL(mint int64) error {
h.logger.Error("truncating segments failed", "err", err)
}
// The checkpoint is written and segments before it is truncated, so we no
// longer need to track deleted series that are before it.
h.deletedMtx.Lock()
for ref, segment := range h.deleted {
// The checkpoint is written and segments before it is truncated, so stop
// tracking expired series.
h.walExpiriesMtx.Lock()
for ref, segment := range h.walExpiries {
if segment <= last {
delete(h.deleted, ref)
delete(h.walExpiries, ref)
}
}
h.deletedMtx.Unlock()
h.walExpiriesMtx.Unlock()
h.metrics.checkpointDeleteTotal.Inc()
if err := wlog.DeleteCheckpoints(h.wal.Dir(), last); err != nil {
@@ -1595,7 +1614,7 @@ func (h *Head) gc() (actualInOrderMint, minOOOTime int64, minMmapFile int) {
if h.wal != nil {
_, last, _ := wlog.Segments(h.wal.Dir())
h.deletedMtx.Lock()
h.walExpiriesMtx.Lock()
// Keep series records until we're past segment 'last'
// because the WAL will still have samples records with
// this ref ID. If we didn't keep these series records then
@@ -1603,9 +1622,9 @@ func (h *Head) gc() (actualInOrderMint, minOOOTime int64, minMmapFile int) {
// that reads the WAL, wouldn't be able to use those
// samples since we would have no labels for that ref ID.
for ref := range deleted {
h.deleted[chunks.HeadSeriesRef(ref)] = last
h.walExpiries[chunks.HeadSeriesRef(ref)] = last
}
h.deletedMtx.Unlock()
h.walExpiriesMtx.Unlock()
}
return actualInOrderMint, minOOOTime, minMmapFile

tsdb/head_test.go

@@ -719,12 +719,25 @@ func TestHead_ReadWAL(t *testing.T) {
s11 := head.series.getByID(11)
s50 := head.series.getByID(50)
s100 := head.series.getByID(100)
s101 := head.series.getByID(101)
testutil.RequireEqual(t, labels.FromStrings("a", "1"), s10.lset)
require.Nil(t, s11) // Series without samples should be garbage collected at head.Init().
testutil.RequireEqual(t, labels.FromStrings("a", "4"), s50.lset)
testutil.RequireEqual(t, labels.FromStrings("a", "3"), s100.lset)
// Duplicate series record should not be written to the head.
require.Nil(t, s101)
// But it should have a WAL expiry set.
keepUntil, ok := head.getWALExpiry(101)
require.True(t, ok)
_, last, err := wlog.Segments(w.Dir())
require.NoError(t, err)
require.Equal(t, last, keepUntil)
// Only the duplicate series record should have a WAL expiry set.
_, ok = head.getWALExpiry(50)
require.False(t, ok)
expandChunk := func(c chunkenc.Iterator) (x []sample) {
for c.Next() == chunkenc.ValFloat {
t, v := c.At()
@@ -796,7 +809,7 @@ func TestHead_WALMultiRef(t *testing.T) {
opts := DefaultHeadOptions()
opts.ChunkRange = 1000
opts.ChunkDirRoot = w.Dir()
opts.ChunkDirRoot = head.opts.ChunkDirRoot
head, err = NewHead(nil, nil, w, nil, opts, nil)
require.NoError(t, err)
require.NoError(t, head.Init(0))
@@ -815,6 +828,64 @@
}}, series)
}
func TestHead_KeepSeriesInWALCheckpoint(t *testing.T) {
existingRef := 1
existingLbls := labels.FromStrings("foo", "bar")
deletedKeepUntil := 10
cases := []struct {
name string
prepare func(t *testing.T, h *Head)
seriesRef chunks.HeadSeriesRef
last int
expected bool
}{
{
name: "keep series still in the head",
prepare: func(t *testing.T, h *Head) {
_, _, err := h.getOrCreateWithID(chunks.HeadSeriesRef(existingRef), existingLbls.Hash(), existingLbls)
require.NoError(t, err)
},
seriesRef: chunks.HeadSeriesRef(existingRef),
expected: true,
},
{
name: "keep deleted series with keepUntil > last",
prepare: func(_ *testing.T, h *Head) {
h.setWALExpiry(chunks.HeadSeriesRef(existingRef), deletedKeepUntil)
},
seriesRef: chunks.HeadSeriesRef(existingRef),
last: deletedKeepUntil - 1,
expected: true,
},
{
name: "drop deleted series with keepUntil <= last",
prepare: func(_ *testing.T, h *Head) {
h.setWALExpiry(chunks.HeadSeriesRef(existingRef), deletedKeepUntil)
},
seriesRef: chunks.HeadSeriesRef(existingRef),
last: deletedKeepUntil,
expected: false,
},
}
for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
h, _ := newTestHead(t, 1000, wlog.CompressionNone, false)
t.Cleanup(func() {
require.NoError(t, h.Close())
})
if tc.prepare != nil {
tc.prepare(t, h)
}
kept := h.keepSeriesInWALCheckpoint(tc.seriesRef, tc.last)
require.Equal(t, tc.expected, kept)
})
}
}
func TestHead_ActiveAppenders(t *testing.T) {
head, _ := newTestHead(t, 1000, wlog.CompressionNone, false)
defer head.Close()

tsdb/head_wal.go

@@ -50,7 +50,7 @@ type histogramRecord struct {
fh *histogram.FloatHistogram
}
func (h *Head) loadWAL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[chunks.HeadSeriesRef]chunks.HeadSeriesRef, mmappedChunks, oooMmappedChunks map[chunks.HeadSeriesRef][]*mmappedChunk) (err error) {
func (h *Head) loadWAL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[chunks.HeadSeriesRef]chunks.HeadSeriesRef, mmappedChunks, oooMmappedChunks map[chunks.HeadSeriesRef][]*mmappedChunk, lastSegment int) (err error) {
// Track number of samples that referenced a series we don't know about
// for error reporting.
var unknownRefs atomic.Uint64
@@ -237,6 +237,8 @@ Outer:
}
if !created {
multiRef[walSeries.Ref] = mSeries.ref
// Set the WAL expiry for the duplicate series, so it is kept in subsequent WAL checkpoints.
h.setWALExpiry(walSeries.Ref, lastSegment)
}
idx := uint64(mSeries.ref) % uint64(concurrency)

tsdb/wlog/checkpoint.go

@@ -93,7 +93,7 @@ const CheckpointPrefix = "checkpoint."
// segmented format as the original WAL itself.
// This makes it easy to read it through the WAL package and concatenate
// it with the original WAL.
func Checkpoint(logger *slog.Logger, w *WL, from, to int, keep func(id chunks.HeadSeriesRef) bool, mint int64) (*CheckpointStats, error) {
func Checkpoint(logger *slog.Logger, w *WL, from, to int, keep func(id chunks.HeadSeriesRef, last int) bool, mint int64) (*CheckpointStats, error) {
stats := &CheckpointStats{}
var sgmReader io.ReadCloser
@@ -181,7 +181,7 @@ func Checkpoint(logger *slog.Logger, w *WL, from, to int, keep func(id chunks.He
// Drop irrelevant series in place.
repl := series[:0]
for _, s := range series {
if keep(s.Ref) {
if keep(s.Ref, to) {
repl = append(repl, s)
}
}
@@ -323,7 +323,7 @@ func Checkpoint(logger *slog.Logger, w *WL, from, to int, keep func(id chunks.He
// Only keep reference to the latest found metadata for each refID.
repl := 0
for _, m := range metadata {
if keep(m.Ref) {
if keep(m.Ref, to) {
if _, ok := latestMetadataMap[m.Ref]; !ok {
repl++
}

tsdb/wlog/checkpoint_test.go

@@ -291,7 +291,7 @@ func TestCheckpoint(t *testing.T) {
}
require.NoError(t, w.Close())
stats, err := Checkpoint(promslog.NewNopLogger(), w, 100, 106, func(x chunks.HeadSeriesRef) bool {
stats, err := Checkpoint(promslog.NewNopLogger(), w, 100, 106, func(x chunks.HeadSeriesRef, _ int) bool {
return x%2 == 0
}, last/2)
require.NoError(t, err)

tsdb/wlog/watcher_test.go

@@ -399,7 +399,7 @@ func TestReadToEndWithCheckpoint(t *testing.T) {
}
}
Checkpoint(promslog.NewNopLogger(), w, 0, 1, func(_ chunks.HeadSeriesRef) bool { return true }, 0)
Checkpoint(promslog.NewNopLogger(), w, 0, 1, func(_ chunks.HeadSeriesRef, _ int) bool { return true }, 0)
w.Truncate(1)
// Write more records after checkpointing.
@@ -490,7 +490,7 @@ func TestReadCheckpoint(t *testing.T) {
}
_, err = w.NextSegmentSync()
require.NoError(t, err)
_, err = Checkpoint(promslog.NewNopLogger(), w, 30, 31, func(_ chunks.HeadSeriesRef) bool { return true }, 0)
_, err = Checkpoint(promslog.NewNopLogger(), w, 30, 31, func(_ chunks.HeadSeriesRef, _ int) bool { return true }, 0)
require.NoError(t, err)
require.NoError(t, w.Truncate(32))
@@ -653,7 +653,7 @@ func TestCheckpointSeriesReset(t *testing.T) {
return wt.checkNumSeries() == seriesCount
}, 10*time.Second, 1*time.Second)
_, err = Checkpoint(promslog.NewNopLogger(), w, 2, 4, func(_ chunks.HeadSeriesRef) bool { return true }, 0)
_, err = Checkpoint(promslog.NewNopLogger(), w, 2, 4, func(_ chunks.HeadSeriesRef, _ int) bool { return true }, 0)
require.NoError(t, err)
err = w.Truncate(5)