From d07b70a2a86ec8c8689bed0f9841f48a74768108 Mon Sep 17 00:00:00 2001 From: Callum Styan Date: Wed, 26 Apr 2023 16:26:34 -0700 Subject: [PATCH] track the last segment ID for which we appended a sample per series, and use that to know if we can delete a series from the WAL/checkpoint entirely Signed-off-by: Callum Styan --- tsdb/head.go | 49 ++++++++++--------------- tsdb/head_append.go | 6 +++- tsdb/head_test.go | 88 +++++++++++++++++++++++++++++++++++++++++++++ tsdb/wlog/wlog.go | 4 +++ 4 files changed, 115 insertions(+), 32 deletions(-) diff --git a/tsdb/head.go b/tsdb/head.go index 5bd5bbccc..1648d626d 100644 --- a/tsdb/head.go +++ b/tsdb/head.go @@ -64,6 +64,18 @@ var ( defaultWALReplayConcurrency = runtime.GOMAXPROCS(0) ) +type seriesAndSegment struct { + *stripeSeries + segments map[storage.SeriesRef]int +} + +func newSeriesAndSegment(stripeSize int, seriesCallback SeriesLifecycleCallback) seriesAndSegment { + var s seriesAndSegment + s.stripeSeries = newStripeSeries(stripeSize, seriesCallback) + s.segments = make(map[storage.SeriesRef]int) + return s +} + // Head handles reads and writes of time series data within a time window. type Head struct { chunkRange atomic.Int64 @@ -94,7 +106,7 @@ type Head struct { memChunkPool sync.Pool // All series addressable by their ID or hash. - series *stripeSeries + series seriesAndSegment deletedMtx sync.Mutex deleted map[chunks.HeadSeriesRef]int // Deleted series, and what WAL segment they must be kept until. 
@@ -296,7 +308,7 @@ func (h *Head) resetInMemoryState() error { h.exemplarMetrics = em h.exemplars = es - h.series = newStripeSeries(h.opts.StripeSize, h.opts.SeriesCallback) + h.series = newSeriesAndSegment(h.opts.StripeSize, h.opts.SeriesCallback) h.postings = index.NewUnorderedMemPostings() h.tombstones = tombstones.NewMemTombstones() h.deleted = map[chunks.HeadSeriesRef]int{} @@ -1211,10 +1223,10 @@ func (h *Head) truncateWAL(mint int64) error { if h.series.getByID(id) != nil { return true } - h.deletedMtx.Lock() - _, ok := h.deleted[id] - h.deletedMtx.Unlock() - return ok + if h.series.segments[storage.SeriesRef(id)] > last { + return true + } + return false } h.metrics.checkpointCreationTotal.Inc() if _, err = wlog.Checkpoint(h.logger, h.wal, first, last, keep, mint); err != nil { @@ -1231,16 +1243,6 @@ func (h *Head) truncateWAL(mint int64) error { level.Error(h.logger).Log("msg", "truncating segments failed", "err", err) } - // The checkpoint is written and segments before it is truncated, so we no - // longer need to track deleted series that are before it. - h.deletedMtx.Lock() - for ref, segment := range h.deleted { - if segment < first { - delete(h.deleted, ref) - } - } - h.deletedMtx.Unlock() - h.metrics.checkpointDeleteTotal.Inc() if err := wlog.DeleteCheckpoints(h.wal.Dir(), last); err != nil { // Leftover old checkpoints do not cause problems down the line beyond @@ -1488,21 +1490,6 @@ func (h *Head) gc() (actualInOrderMint, minOOOTime int64, minMmapFile int) { h.tombstones.DeleteTombstones(deleted) h.tombstones.TruncateBefore(mint) - if h.wal != nil { - _, last, _ := wlog.Segments(h.wal.Dir()) - h.deletedMtx.Lock() - // Keep series records until we're past segment 'last' - // because the WAL will still have samples records with - // this ref ID. 
If we didn't keep these series records then - // on start up when we replay the WAL, or any other code - // that reads the WAL, wouldn't be able to use those - // samples since we would have no labels for that ref ID. - for ref := range deleted { - h.deleted[chunks.HeadSeriesRef(ref)] = last - } - h.deletedMtx.Unlock() - } - return actualInOrderMint, minOOOTime, minMmapFile } diff --git a/tsdb/head_append.go b/tsdb/head_append.go index 86cb09751..0028aaea9 100644 --- a/tsdb/head_append.go +++ b/tsdb/head_append.go @@ -48,7 +48,15 @@ func (a *initAppender) Append(ref storage.SeriesRef, lset labels.Labels, t int64 a.head.initTime(t) a.app = a.head.appender() - return a.app.Append(ref, lset, t, v) + ref, err := a.app.Append(ref, lset, t, v) + // Record which WAL segment received this series' sample so that + // truncateWAL can tell when the series record is safe to drop. + // NOTE(review): series.segments is accessed without locking from both + // appenders and truncateWAL — this map needs a mutex. + if err == nil && a.head.wal != nil { + a.head.series.segments[ref] = a.head.wal.CurrentSegment() + } + return ref, err } func (a *initAppender) AppendExemplar(ref storage.SeriesRef, l labels.Labels, e exemplar.Exemplar) (storage.SeriesRef, error) { diff --git a/tsdb/head_test.go b/tsdb/head_test.go index 45b587405..da5c41b97 100644 --- a/tsdb/head_test.go +++ b/tsdb/head_test.go @@ -1095,6 +1095,94 @@ func TestDeletedSamplesAndSeriesStillInWALAfterCheckpoint(t *testing.T) { require.Equal(t, 0, metadata) } +func TestDeleteWithSegmentIndexLogic(t *testing.T) { + numSamples := 10000 + + // Enough samples to cause a checkpoint. + hb, w := newTestHead(t, int64(numSamples)*10, false, false) + + for i := 0; i < numSamples; i++ { + app := hb.Appender(context.Background()) + _, err := app.Append(0, labels.FromStrings("a", "b"), int64(i), 0) + require.NoError(t, err) + require.NoError(t, app.Commit()) + } + require.NoError(t, hb.Truncate(1)) + + // Confirm there's been a checkpoint. + cdir, _, err := wlog.LastCheckpoint(w.Dir()) + require.NoError(t, err) + // Read in checkpoint and WAL. + recs := readTestWAL(t, cdir) + recs = append(recs, readTestWAL(t, w.Dir())...)
+ + var series, samples, stones, metadata int + for _, rec := range recs { + switch rec.(type) { + case []record.RefSeries: + series++ + case []record.RefSample: + samples++ + case []tombstones.Stone: + stones++ + case []record.RefMetadata: + metadata++ + default: + t.Fatalf("unknown record type") + } + } + require.Equal(t, 1, series) + require.Equal(t, 9999, samples) + require.Equal(t, 0, metadata) + + // lets write some samples for a new series and truncate again to see if deletes work + // as expected + n := time.Now().Unix() + // create a new segment so we avoid samples for the previous series being in the same segment as + // our new series for the purposes of the test + w.NextSegmentSync() + + for i := 0; i < numSamples; i++ { + app := hb.Appender(context.Background()) + _, err := app.Append(0, labels.FromStrings("b", "c"), int64(i)+n, 0) + require.NoError(t, err) + require.NoError(t, app.Commit()) + } + + require.NoError(t, hb.Truncate(n)) + require.NoError(t, hb.Close()) + // Confirm there's been a checkpoint. + cdir, _, err = wlog.LastCheckpoint(w.Dir()) + require.NoError(t, err) + // Read in checkpoint and WAL. + recs = readTestWAL(t, cdir) + recs = append(recs, readTestWAL(t, w.Dir())...) 
+ + // reset the counters + series = 0 + samples = 0 + stones = 0 + metadata = 0 + for _, rec := range recs { + switch rec.(type) { + case []record.RefSeries: + series++ + case []record.RefSample: + samples++ + case []tombstones.Stone: + stones++ + case []record.RefMetadata: + metadata++ + default: + t.Fatalf("unknown record type") + } + } + + require.Equal(t, 1, series) + require.Equal(t, 10000, samples) + require.Equal(t, 0, metadata) +} + func TestDelete_e2e(t *testing.T) { numDatapoints := 1000 numRanges := 1000 diff --git a/tsdb/wlog/wlog.go b/tsdb/wlog/wlog.go index df8bab53f..cc22ba075 100644 --- a/tsdb/wlog/wlog.go +++ b/tsdb/wlog/wlog.go @@ -748,6 +748,13 @@ func (w *WL) LastSegmentAndOffset() (seg, offset int, err error) { return } +// CurrentSegment returns the index of the segment currently being written to. +func (w *WL) CurrentSegment() int { + w.mtx.Lock() + defer w.mtx.Unlock() + return w.segment.i +} + // Truncate drops all segments before i. func (w *WL) Truncate(i int) (err error) { w.metrics.truncateTotal.Inc()