From 328a74ca3692f58bad99a9d01bc8857203aa6d86 Mon Sep 17 00:00:00 2001 From: Ganesh Vernekar <15064823+codesome@users.noreply.github.com> Date: Tue, 17 Aug 2021 22:38:16 +0530 Subject: [PATCH] Fix bugs and add enhancements to the chunk snapshot (#9185) Signed-off-by: Ganesh Vernekar --- tsdb/head.go | 94 ++++++++++++++++++++++++++++++++--------------- tsdb/head_test.go | 81 ++++++++++++++++++++++++++++++++++++++-- tsdb/head_wal.go | 10 +++++ 3 files changed, 151 insertions(+), 34 deletions(-) diff --git a/tsdb/head.go b/tsdb/head.go index 5add368b4b..3c7ab291f5 100644 --- a/tsdb/head.go +++ b/tsdb/head.go @@ -15,6 +15,7 @@ package tsdb import ( "fmt" + "io" "math" "path/filepath" "sync" @@ -173,28 +174,14 @@ func NewHead(r prometheus.Registerer, l log.Logger, wal *wal.WAL, opts *HeadOpti opts.SeriesCallback = &noopSeriesLifecycleCallback{} } - em := NewExemplarMetrics(r) - es, err := NewCircularExemplarStorage(opts.MaxExemplars.Load(), em) - if err != nil { - return nil, err - } - if stats == nil { stats = NewHeadStats() } h := &Head{ - wal: wal, - logger: l, - opts: opts, - exemplarMetrics: em, - exemplars: es, - series: newStripeSeries(opts.StripeSize, opts.SeriesCallback), - symbols: map[string]struct{}{}, - postings: index.NewUnorderedMemPostings(), - tombstones: tombstones.NewMemTombstones(), - iso: newIsolation(), - deleted: map[uint64]int{}, + wal: wal, + logger: l, + opts: opts, memChunkPool: sync.Pool{ New: func() interface{} { return &memChunk{} @@ -203,11 +190,9 @@ func NewHead(r prometheus.Registerer, l log.Logger, wal *wal.WAL, opts *HeadOpti stats: stats, reg: r, } - h.chunkRange.Store(opts.ChunkRange) - h.minTime.Store(math.MaxInt64) - h.maxTime.Store(math.MinInt64) - h.lastWALTruncationTime.Store(math.MinInt64) - h.lastMemoryTruncationTime.Store(math.MinInt64) + if err := h.resetInMemoryState(); err != nil { + return nil, err + } h.metrics = newHeadMetrics(h, r) if opts.ChunkPool == nil { @@ -226,6 +211,30 @@ func NewHead(r prometheus.Registerer, l log.Logger, wal *wal.WAL, opts *HeadOpti return h, nil } +func (h *Head) resetInMemoryState() error { + var err error + em := NewExemplarMetrics(h.reg) + es, err := NewCircularExemplarStorage(h.opts.MaxExemplars.Load(), em) + if err != nil { + return err + } + + h.exemplarMetrics = em + h.exemplars = es + h.series = newStripeSeries(h.opts.StripeSize, h.opts.SeriesCallback) + h.symbols = map[string]struct{}{} + h.postings = index.NewUnorderedMemPostings() + h.tombstones = tombstones.NewMemTombstones() + h.iso = newIsolation() + h.deleted = map[uint64]int{} + h.chunkRange.Store(h.opts.ChunkRange) + h.minTime.Store(math.MaxInt64) + h.maxTime.Store(math.MinInt64) + h.lastWALTruncationTime.Store(math.MinInt64) + h.lastMemoryTruncationTime.Store(math.MinInt64) + return nil +} + type headMetrics struct { activeAppenders prometheus.Gauge series prometheus.GaugeFunc @@ -249,6 +258,7 @@ type headMetrics struct { checkpointCreationFail prometheus.Counter checkpointCreationTotal prometheus.Counter mmapChunkCorruptionTotal prometheus.Counter + snapshotReplayErrorTotal prometheus.Counter // Will be either 0 or 1. } func newHeadMetrics(h *Head, r prometheus.Registerer) *headMetrics { @@ -343,6 +353,10 @@ func newHeadMetrics(h *Head, r prometheus.Registerer) *headMetrics { Name: "prometheus_tsdb_mmap_chunk_corruptions_total", Help: "Total number of memory-mapped chunk corruptions.", }), + snapshotReplayErrorTotal: prometheus.NewCounter(prometheus.CounterOpts{ + Name: "prometheus_tsdb_snapshot_replay_error_total", + Help: "Total number snapshot replays that failed.", + }), } if r != nil { @@ -369,6 +383,7 @@ func newHeadMetrics(h *Head, r prometheus.Registerer) *headMetrics { m.checkpointCreationFail, m.checkpointCreationTotal, m.mmapChunkCorruptionTotal, + m.snapshotReplayErrorTotal, // Metrics bound to functions and not needed in tests // can be created and registered on the spot. prometheus.NewGaugeFunc(prometheus.GaugeOpts{ @@ -439,7 +454,7 @@ const cardinalityCacheExpirationTime = time.Duration(30) * time.Second // Init loads data from the write ahead log and prepares the head for writes. // It should be called before using an appender so that it // limits the ingested samples to the head min valid time. -func (h *Head) Init(minValidTime int64) error { +func (h *Head) Init(minValidTime int64) (err error) { h.minValidTime.Store(minValidTime) defer h.postings.EnsureOrder() defer h.gc() // After loading the wal remove the obsolete data from the head. @@ -454,11 +469,23 @@ func (h *Head) Init(minValidTime int64) error { level.Info(h.logger).Log("msg", "Replaying on-disk memory mappable chunks if any") start := time.Now() - snapIdx, snapOffset, refSeries, err := h.loadChunkSnapshot() - if err != nil { - return err + snapIdx, snapOffset := -1, 0 + refSeries := make(map[uint64]*memSeries) + + if h.opts.EnableMemorySnapshotOnShutdown { + level.Info(h.logger).Log("msg", "Chunk snapshot is enabled, replaying from the snapshot") + snapIdx, snapOffset, refSeries, err = h.loadChunkSnapshot() + if err != nil { + snapIdx, snapOffset = -1, 0 + h.metrics.snapshotReplayErrorTotal.Inc() + level.Error(h.logger).Log("msg", "Failed to load chunk snapshot", "err", err) + // We clear the partially loaded data to replay fresh from the WAL. + if err := h.resetInMemoryState(); err != nil { + return err + } + } + level.Info(h.logger).Log("msg", "Chunk snapshot loading time", "duration", time.Since(start).String()) } - level.Info(h.logger).Log("msg", "Chunk snapshot loading time", "duration", time.Since(start).String()) mmapChunkReplayStart := time.Now() mmappedChunks, err := h.loadMmappedChunks(refSeries) @@ -535,6 +562,10 @@ func (h *Head) Init(minValidTime int64) error { offset = snapOffset } sr, err := wal.NewSegmentBufReaderWithOffset(offset, s) + if errors.Cause(err) == io.EOF { + // File does not exist. + continue + } if err != nil { return errors.Wrapf(err, "segment reader (offset=%d)", offset) } @@ -1478,10 +1509,13 @@ func (s *memSeries) minTime() int64 { func (s *memSeries) maxTime() int64 { c := s.head() - if c == nil { - return math.MinInt64 + if c != nil { + return c.maxTime } - return c.maxTime + if len(s.mmappedChunks) > 0 { + return s.mmappedChunks[len(s.mmappedChunks)-1].maxTime + } + return math.MinInt64 } // truncateChunksBefore removes all chunks from the series that diff --git a/tsdb/head_test.go b/tsdb/head_test.go index 911cf28201..dfaf468440 100644 --- a/tsdb/head_test.go +++ b/tsdb/head_test.go @@ -21,6 +21,7 @@ import ( "math" "math/rand" "os" + "path" "path/filepath" "sort" "strconv" @@ -2504,12 +2505,18 @@ func TestChunkSnapshot(t *testing.T) { for i := 1; i <= numSeries; i++ { lbls := labels.Labels{labels.Label{Name: "foo", Value: fmt.Sprintf("bar%d", i)}} lblStr := lbls.String() - // 240 samples should m-map at least 1 chunk. - for ts := int64(1); ts <= 240; ts++ { + // Should m-map at least 1 chunk. + for ts := int64(1); ts <= 200; ts++ { val := rand.Float64() expSeries[lblStr] = append(expSeries[lblStr], sample{ts, val}) _, err := app.Append(0, lbls, ts, val) require.NoError(t, err) + + // To create multiple WAL records. + if ts%10 == 0 { + require.NoError(t, app.Commit()) + app = head.Appender(context.Background()) + } } } require.NoError(t, app.Commit()) @@ -2581,12 +2588,18 @@ func TestChunkSnapshot(t *testing.T) { for i := 1; i <= numSeries; i++ { lbls := labels.Labels{labels.Label{Name: "foo", Value: fmt.Sprintf("bar%d", i)}} lblStr := lbls.String() - // 240 samples should m-map at least 1 chunk. - for ts := int64(241); ts <= 480; ts++ { + // Should m-map at least 1 chunk. + for ts := int64(201); ts <= 400; ts++ { val := rand.Float64() expSeries[lblStr] = append(expSeries[lblStr], sample{ts, val}) _, err := app.Append(0, lbls, ts, val) require.NoError(t, err) + + // To create multiple WAL records. + if ts%10 == 0 { + require.NoError(t, app.Commit()) + app = head.Appender(context.Background()) + } } } require.NoError(t, app.Commit()) @@ -2624,6 +2637,7 @@ func TestChunkSnapshot(t *testing.T) { // Create new Head to replay snapshot, m-map chunks, and WAL. w, err := wal.NewSize(nil, nil, head.wal.Dir(), 32768, false) require.NoError(t, err) + head.opts.EnableMemorySnapshotOnShutdown = true // Enabled to read from snapshot. head, err = NewHead(nil, nil, w, head.opts, nil) require.NoError(t, err) require.NoError(t, head.Init(math.MinInt64)) @@ -2647,3 +2661,62 @@ func TestChunkSnapshot(t *testing.T) { require.Equal(t, expTombstones, actTombstones) } } + +func TestSnapshotError(t *testing.T) { + head, _ := newTestHead(t, 120*4, false) + defer func() { + head.opts.EnableMemorySnapshotOnShutdown = false + require.NoError(t, head.Close()) + }() + + // Add a sample. + app := head.Appender(context.Background()) + lbls := labels.Labels{labels.Label{Name: "foo", Value: "bar"}} + _, err := app.Append(0, lbls, 99, 99) + require.NoError(t, err) + require.NoError(t, app.Commit()) + + // Add some tombstones. + itvs := tombstones.Intervals{ + {Mint: 1234, Maxt: 2345}, + {Mint: 3456, Maxt: 4567}, + } + head.tombstones.AddInterval(1, itvs...) + + // Check existance of data. + require.NotNil(t, head.series.getByHash(lbls.Hash(), lbls)) + tm, err := head.tombstones.Get(1) + require.NoError(t, err) + require.NotEqual(t, 0, len(tm)) + + head.opts.EnableMemorySnapshotOnShutdown = true + require.NoError(t, head.Close()) // This will create a snapshot. + + // Remove the WAL so that we don't load from it. + require.NoError(t, os.RemoveAll(head.wal.Dir())) + + // Corrupt the snapshot. + snapDir, _, _, err := LastChunkSnapshot(head.opts.ChunkDirRoot) + require.NoError(t, err) + files, err := ioutil.ReadDir(snapDir) + require.NoError(t, err) + f, err := os.OpenFile(path.Join(snapDir, files[0].Name()), os.O_RDWR, 0) + require.NoError(t, err) + _, err = f.WriteAt([]byte{0b11111111}, 18) + require.NoError(t, err) + require.NoError(t, f.Close()) + + // Create new Head which should replay this snapshot. + w, err := wal.NewSize(nil, nil, head.wal.Dir(), 32768, false) + require.NoError(t, err) + head, err = NewHead(nil, nil, w, head.opts, nil) + require.NoError(t, err) + require.NoError(t, head.Init(math.MinInt64)) + + // There should be no series in the memory after snapshot error since WAL was removed. + require.Equal(t, 1.0, prom_testutil.ToFloat64(head.metrics.snapshotReplayErrorTotal)) + require.Nil(t, head.series.getByHash(lbls.Hash(), lbls)) + tm, err = head.tombstones.Get(1) + require.NoError(t, err) + require.Equal(t, 0, len(tm)) +} diff --git a/tsdb/head_wal.go b/tsdb/head_wal.go index 506a64de97..744c0bdbc0 100644 --- a/tsdb/head_wal.go +++ b/tsdb/head_wal.go @@ -735,6 +735,8 @@ func DeleteChunkSnapshots(dir string, maxIndex, maxOffset int) error { return errs.Err() } +// loadChunkSnapshot replays the chunk snapshot and restores the Head state from it. If there was any error returned, +// it is the responsibility of the caller to clear the contents of the Head. func (h *Head) loadChunkSnapshot() (int, int, map[uint64]*memSeries, error) { dir, snapIdx, snapOffset, err := LastChunkSnapshot(h.opts.ChunkDirRoot) if err != nil { @@ -849,6 +851,10 @@ Outer: loopErr = errors.Wrap(err, "iterate tombstones") break Outer } + default: + // This is a record type we don't understand. It is either and old format from earlier versions, + // or a new format and the code was rolled back to old version. + loopErr = errors.Errorf("unsuported snapshot record type 0b%b", rec[0]) } } @@ -864,6 +870,10 @@ Outer: return -1, -1, nil, err } + if r.Err() != nil { + return -1, -1, nil, errors.Wrap(r.Err(), "read records") + } + refSeries := make(map[uint64]*memSeries, numSeries) for _, shard := range shardedRefSeries { for k, v := range shard {