diff --git a/tsdb/head.go b/tsdb/head.go
index 8b3d9787c..d5f7144fd 100644
--- a/tsdb/head.go
+++ b/tsdb/head.go
@@ -310,12 +310,22 @@ func (h *Head) resetInMemoryState() error {
 		return err
 	}
 
+	if h.series != nil {
+		// Reset the existing series to make sure we call the appropriate hooks
+		// and increment the series-removed metric.
+		fs := h.series.iterForDeletion(func(_ int, _ uint64, s *memSeries, flushedForCallback map[chunks.HeadSeriesRef]labels.Labels) {
+			// All series should be flushed.
+			flushedForCallback[s.ref] = s.lset
+		})
+		h.metrics.seriesRemoved.Add(float64(fs))
+	}
+
+	h.series = newStripeSeries(h.opts.StripeSize, h.opts.SeriesCallback)
 	h.iso = newIsolation(h.opts.IsolationDisabled)
 	h.oooIso = newOOOIsolation()
-
+	h.numSeries.Store(0)
 	h.exemplarMetrics = em
 	h.exemplars = es
-	h.series = newStripeSeries(h.opts.StripeSize, h.opts.SeriesCallback)
 	h.postings = index.NewUnorderedMemPostings()
 	h.tombstones = tombstones.NewMemTombstones()
 	h.deleted = map[chunks.HeadSeriesRef]int{}
@@ -1861,11 +1871,10 @@ func newStripeSeries(stripeSize int, seriesCallback SeriesLifecycleCallback) *st
 // minMmapFile is the min mmap file number seen in the series (in-order and out-of-order) after gc'ing the series.
 func (s *stripeSeries) gc(mint int64, minOOOMmapRef chunks.ChunkDiskMapperRef) (_ map[storage.SeriesRef]struct{}, _ int, _, _ int64, minMmapFile int) {
 	var (
-		deleted               = map[storage.SeriesRef]struct{}{}
-		rmChunks              = 0
-		actualMint      int64 = math.MaxInt64
-		minOOOTime      int64 = math.MaxInt64
-		deletedFromPrevStripe = 0
+		deleted          = map[storage.SeriesRef]struct{}{}
+		rmChunks         = 0
+		actualMint int64 = math.MaxInt64
+		minOOOTime int64 = math.MaxInt64
 	)
 	minMmapFile = math.MaxInt32
 
@@ -1923,27 +1932,7 @@ func (s *stripeSeries) gc(mint int64, minOOOMmapRef chunks.ChunkDiskMapperRef) (
 		deletedForCallback[series.ref] = series.lset
 	}
 
-	// Run through all series shard by shard, checking which should be deleted.
-	for i := 0; i < s.size; i++ {
-		deletedForCallback := make(map[chunks.HeadSeriesRef]labels.Labels, deletedFromPrevStripe)
-		s.locks[i].Lock()
-
-		// Delete conflicts first so seriesHashmap.del doesn't move them to the `unique` field,
-		// after deleting `unique`.
-		for hash, all := range s.hashes[i].conflicts {
-			for _, series := range all {
-				check(i, hash, series, deletedForCallback)
-			}
-		}
-		for hash, series := range s.hashes[i].unique {
-			check(i, hash, series, deletedForCallback)
-		}
-
-		s.locks[i].Unlock()
-
-		s.seriesLifecycleCallback.PostDeletion(deletedForCallback)
-		deletedFromPrevStripe = len(deletedForCallback)
-	}
+	s.iterForDeletion(check)
 
 	if actualMint == math.MaxInt64 {
 		actualMint = mint
@@ -1952,6 +1941,35 @@
 	return deleted, rmChunks, actualMint, minOOOTime, minMmapFile
 }
 
+// iterForDeletion iterates through all series, invoking checkDeletedFunc for each one.
+// checkDeletedFunc is given a map to which it should add every deleted series that must be included
+// when the PostDeletion hook is invoked. iterForDeletion returns the total number of deleted series.
+func (s *stripeSeries) iterForDeletion(checkDeletedFunc func(int, uint64, *memSeries, map[chunks.HeadSeriesRef]labels.Labels)) int {
+	seriesSetFromPrevStripe := 0
+	totalDeletedSeries := 0
+	// Run through all series shard by shard.
+	for i := 0; i < s.size; i++ {
+		seriesSet := make(map[chunks.HeadSeriesRef]labels.Labels, seriesSetFromPrevStripe)
+		s.locks[i].Lock()
+		// Iterate conflicts first so checkDeletedFunc doesn't move them to the `unique` field,
+		// after deleting `unique`.
+		for hash, all := range s.hashes[i].conflicts {
+			for _, series := range all {
+				checkDeletedFunc(i, hash, series, seriesSet)
+			}
+		}
+
+		for hash, series := range s.hashes[i].unique {
+			checkDeletedFunc(i, hash, series, seriesSet)
+		}
+		s.locks[i].Unlock()
+		s.seriesLifecycleCallback.PostDeletion(seriesSet)
+		totalDeletedSeries += len(seriesSet)
+		seriesSetFromPrevStripe = len(seriesSet)
+	}
+	return totalDeletedSeries
+}
+
 func (s *stripeSeries) getByID(id chunks.HeadSeriesRef) *memSeries {
 	i := uint64(id) & uint64(s.size-1)
 
diff --git a/tsdb/head_test.go b/tsdb/head_test.go
index 804886ad7..6b4ec4ca4 100644
--- a/tsdb/head_test.go
+++ b/tsdb/head_test.go
@@ -4007,6 +4007,9 @@ func TestSnapshotError(t *testing.T) {
 	require.NoError(t, err)
 	f, err := os.OpenFile(path.Join(snapDir, files[0].Name()), os.O_RDWR, 0)
 	require.NoError(t, err)
+	// Back up the snapshot so it can be restored for the next test case.
+	snapshotBackup, err := io.ReadAll(f)
+	require.NoError(t, err)
 	_, err = f.WriteAt([]byte{0b11111111}, 18)
 	require.NoError(t, err)
 	require.NoError(t, f.Close())
@@ -4021,10 +4024,44 @@
 
 	// There should be no series in the memory after snapshot error since WAL was removed.
 	require.Equal(t, 1.0, prom_testutil.ToFloat64(head.metrics.snapshotReplayErrorTotal))
+	require.Equal(t, uint64(0), head.NumSeries())
 	require.Nil(t, head.series.getByHash(lbls.Hash(), lbls))
 	tm, err = head.tombstones.Get(1)
 	require.NoError(t, err)
 	require.Empty(t, tm)
+	require.NoError(t, head.Close())
+
+	// Test corruption in the middle of the snapshot.
+	f, err = os.OpenFile(path.Join(snapDir, files[0].Name()), os.O_RDWR, 0)
+	require.NoError(t, err)
+	_, err = f.WriteAt(snapshotBackup, 0)
+	require.NoError(t, err)
+	_, err = f.WriteAt([]byte{0b11111111}, 300)
+	require.NoError(t, err)
+	require.NoError(t, f.Close())
+
+	c := &countSeriesLifecycleCallback{}
+	opts := head.opts
+	opts.SeriesCallback = c
+
+	w, err = wlog.NewSize(nil, nil, head.wal.Dir(), 32768, wlog.CompressionNone)
+	require.NoError(t, err)
+	head, err = NewHead(prometheus.NewRegistry(), nil, w, nil, head.opts, nil)
+	require.NoError(t, err)
+	require.NoError(t, head.Init(math.MinInt64))
+
+	// There should be no series in the memory after snapshot error since WAL was removed.
+	require.Equal(t, 1.0, prom_testutil.ToFloat64(head.metrics.snapshotReplayErrorTotal))
+	require.Nil(t, head.series.getByHash(lbls.Hash(), lbls))
+	require.Equal(t, uint64(0), head.NumSeries())
+
+	// Since the snapshot could replay some series before hitting the corruption, the create hooks
+	// are still invoked. In that case the delete hooks must also fire when the memory is reset.
+	require.Equal(t, int64(2), c.created.Load())
+	require.Equal(t, int64(2), c.deleted.Load())
+
+	require.Equal(t, 2.0, prom_testutil.ToFloat64(head.metrics.seriesRemoved))
+	require.Equal(t, 2.0, prom_testutil.ToFloat64(head.metrics.seriesCreated))
 }
 
 func TestHistogramMetrics(t *testing.T) {
@@ -5829,3 +5866,14 @@ func TestHeadCompactableDoesNotCompactEmptyHead(t *testing.T) {
 
 	require.False(t, head.compactable())
 }
+
+type countSeriesLifecycleCallback struct {
+	created atomic.Int64
+	deleted atomic.Int64
+}
+
+func (c *countSeriesLifecycleCallback) PreCreation(labels.Labels) error { return nil }
+func (c *countSeriesLifecycleCallback) PostCreation(labels.Labels)     { c.created.Inc() }
+func (c *countSeriesLifecycleCallback) PostDeletion(s map[chunks.HeadSeriesRef]labels.Labels) {
+	c.deleted.Add(int64(len(s)))
+}
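Reviewer note (not part of the patch): the test above leans on the `SeriesLifecycleCallback` contract — `PostCreation` fires for every series created during snapshot/WAL replay, and with this change `resetInMemoryState` flushes every surviving series through `PostDeletion`, so the created/deleted counts balance even when replay aborts partway through a corrupted snapshot. A minimal sketch of wiring such a callback, reusing the calls visible in the diff; `newCountingHead` and the directory layout are hypothetical, and the in-package (`package tsdb`) test imports are assumed:

```go
// Hypothetical in-package test helper, not part of this PR: builds a Head whose
// lifecycle hooks are counted by the countSeriesLifecycleCallback defined above.
func newCountingHead(t *testing.T, dir string) (*Head, *countSeriesLifecycleCallback) {
	c := &countSeriesLifecycleCallback{}

	opts := DefaultHeadOptions()
	opts.ChunkDirRoot = dir
	opts.EnableMemorySnapshotOnShutdown = true
	opts.SeriesCallback = c // every PostCreation/PostDeletion increments a counter on c

	// Same WAL construction the test uses.
	w, err := wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, wlog.CompressionNone)
	require.NoError(t, err)

	h, err := NewHead(prometheus.NewRegistry(), nil, w, nil, opts, nil)
	require.NoError(t, err)
	require.NoError(t, h.Init(math.MinInt64))

	// Invariant TestSnapshotError asserts after a failed snapshot replay:
	// c.created.Load() == c.deleted.Load().
	return h, c
}
```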