diff --git a/cmd/prometheus/main.go b/cmd/prometheus/main.go
index 94b63882c..b08ea7a54 100644
--- a/cmd/prometheus/main.go
+++ b/cmd/prometheus/main.go
@@ -149,6 +149,9 @@ func (c *flagConfig) setFeatureListOptions(logger log.Logger) error {
         case "exemplar-storage":
             c.tsdb.EnableExemplarStorage = true
             level.Info(logger).Log("msg", "Experimental in-memory exemplar storage enabled")
+        case "memory-snapshot-on-shutdown":
+            c.tsdb.EnableMemorySnapshotOnShutdown = true
+            level.Info(logger).Log("msg", "Experimental memory snapshot on shutdown enabled")
         case "":
             continue
         default:
@@ -309,7 +312,7 @@ func main() {
     a.Flag("query.max-samples", "Maximum number of samples a single query can load into memory. Note that queries will fail if they try to load more samples than this into memory, so this also limits the number of samples a query can return.").
         Default("50000000").IntVar(&cfg.queryMaxSamples)
 
-    a.Flag("enable-feature", "Comma separated feature names to enable. Valid options: promql-at-modifier, promql-negative-offset, remote-write-receiver, exemplar-storage, expand-external-labels. See https://prometheus.io/docs/prometheus/latest/feature_flags/ for more details.").
+    a.Flag("enable-feature", "Comma-separated feature names to enable. Valid options: exemplar-storage, expand-external-labels, memory-snapshot-on-shutdown, promql-at-modifier, promql-negative-offset, remote-write-receiver. See https://prometheus.io/docs/prometheus/latest/feature_flags/ for more details.").
         Default("").StringsVar(&cfg.featureList)
 
     promlogflag.AddFlags(a, &cfg.promlogConfig)
@@ -1263,34 +1266,36 @@ func (rm *readyScrapeManager) Get() (*scrape.Manager, error) {
 
 // tsdbOptions is a version of tsdb.Options with defined units.
 // This is required as the tsdb.Options fields are unit-agnostic (time).
 type tsdbOptions struct {
-    WALSegmentSize           units.Base2Bytes
-    MaxBlockChunkSegmentSize units.Base2Bytes
-    RetentionDuration        model.Duration
-    MaxBytes                 units.Base2Bytes
-    NoLockfile               bool
-    AllowOverlappingBlocks   bool
-    WALCompression           bool
-    StripeSize               int
-    MinBlockDuration         model.Duration
-    MaxBlockDuration         model.Duration
-    EnableExemplarStorage    bool
-    MaxExemplars             int64
+    WALSegmentSize                 units.Base2Bytes
+    MaxBlockChunkSegmentSize       units.Base2Bytes
+    RetentionDuration              model.Duration
+    MaxBytes                       units.Base2Bytes
+    NoLockfile                     bool
+    AllowOverlappingBlocks         bool
+    WALCompression                 bool
+    StripeSize                     int
+    MinBlockDuration               model.Duration
+    MaxBlockDuration               model.Duration
+    EnableExemplarStorage          bool
+    MaxExemplars                   int64
+    EnableMemorySnapshotOnShutdown bool
 }
 
 func (opts tsdbOptions) ToTSDBOptions() tsdb.Options {
     return tsdb.Options{
-        WALSegmentSize:           int(opts.WALSegmentSize),
-        MaxBlockChunkSegmentSize: int64(opts.MaxBlockChunkSegmentSize),
-        RetentionDuration:        int64(time.Duration(opts.RetentionDuration) / time.Millisecond),
-        MaxBytes:                 int64(opts.MaxBytes),
-        NoLockfile:               opts.NoLockfile,
-        AllowOverlappingBlocks:   opts.AllowOverlappingBlocks,
-        WALCompression:           opts.WALCompression,
-        StripeSize:               opts.StripeSize,
-        MinBlockDuration:         int64(time.Duration(opts.MinBlockDuration) / time.Millisecond),
-        MaxBlockDuration:         int64(time.Duration(opts.MaxBlockDuration) / time.Millisecond),
-        EnableExemplarStorage:    opts.EnableExemplarStorage,
-        MaxExemplars:             opts.MaxExemplars,
+        WALSegmentSize:                 int(opts.WALSegmentSize),
+        MaxBlockChunkSegmentSize:       int64(opts.MaxBlockChunkSegmentSize),
+        RetentionDuration:              int64(time.Duration(opts.RetentionDuration) / time.Millisecond),
+        MaxBytes:                       int64(opts.MaxBytes),
+        NoLockfile:                     opts.NoLockfile,
+        AllowOverlappingBlocks:         opts.AllowOverlappingBlocks,
+        WALCompression:                 opts.WALCompression,
+        StripeSize:                     opts.StripeSize,
+        MinBlockDuration:               int64(time.Duration(opts.MinBlockDuration) / time.Millisecond),
+        MaxBlockDuration:               int64(time.Duration(opts.MaxBlockDuration) / time.Millisecond),
+        EnableExemplarStorage:          opts.EnableExemplarStorage,
+        MaxExemplars:                   opts.MaxExemplars,
+        EnableMemorySnapshotOnShutdown: opts.EnableMemorySnapshotOnShutdown,
     }
 }
diff --git a/docs/feature_flags.md b/docs/feature_flags.md
index 58d8189c3..e3ab40a64 100644
--- a/docs/feature_flags.md
+++ b/docs/feature_flags.md
@@ -40,7 +40,7 @@ with more recent data.
 
 More details can be found [here](querying/basics.md#offset-modifier).
 
-## Remote Write Receiver 
+## Remote Write Receiver
 
 `--enable-feature=remote-write-receiver`
 
@@ -53,3 +53,11 @@ The remote write receiver allows Prometheus to accept remote write requests from
 [OpenMetrics](https://github.com/OpenObservability/OpenMetrics/blob/main/specification/OpenMetrics.md#exemplars) introduces the ability for scrape targets to add exemplars to certain metrics. Exemplars are references to data outside of the MetricSet. A common use case is IDs of program traces.
 
 Exemplar storage is implemented as a fixed-size circular buffer that stores exemplars in memory for all series. Enabling this feature will enable the storage of exemplars scraped by Prometheus. The flag `storage.exemplars.exemplars-limit` can be used to control the size of the circular buffer by the number of exemplars. An exemplar with just a `traceID=<jaeger-trace-id>` uses roughly 100 bytes of memory via the in-memory exemplar storage. If the exemplar storage is enabled, we will also append the exemplars to the WAL for local persistence (for the WAL duration).
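For a sense of scale, here is a quick back-of-the-envelope sketch of that buffer sizing, using the rough 100-bytes-per-exemplar figure quoted above (the limits are illustrative, not measured benchmarks):

```go
package main

import "fmt"

func main() {
	// Rough memory estimate for the in-memory exemplar circular buffer,
	// assuming ~100 bytes per exemplar (an exemplar carrying only a traceID
	// label). The buffer is a single one shared across all series.
	const bytesPerExemplar = 100
	for _, limit := range []int64{10_000, 100_000, 1_000_000} {
		fmt.Printf("limit=%-9d ≈ %.1f MiB\n", limit, float64(limit*bytesPerExemplar)/(1<<20))
	}
}
```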
+
+## Memory Snapshot on Shutdown
+
+`--enable-feature=memory-snapshot-on-shutdown`
+
+This takes a snapshot of the in-memory chunks along with the series information when shutting down, and stores
+it on disk. On restart, the in-memory state can then be restored from this snapshot and the m-mapped
+chunks, avoiding a full WAL replay and hence reducing the startup time.
diff --git a/tsdb/db.go b/tsdb/db.go
index 9d17b406b..046fb5271 100644
--- a/tsdb/db.go
+++ b/tsdb/db.go
@@ -151,6 +151,9 @@ type Options struct {
     // Enables the in-memory exemplar storage.
     EnableExemplarStorage bool
 
+    // Enables the snapshot of in-memory chunks on shutdown. This makes restarts faster.
+    EnableMemorySnapshotOnShutdown bool
+
     // MaxExemplars sets the size, in # of exemplars stored, of the single circular buffer used to store exemplars in memory.
     // See tsdb/exemplar.go, specifically the CircularExemplarStorage struct and its constructor NewCircularExemplarStorage.
     MaxExemplars int64
@@ -722,6 +725,7 @@ func open(dir string, l log.Logger, r prometheus.Registerer, opts *Options, rngs
     headOpts.SeriesCallback = opts.SeriesLifecycleCallback
     headOpts.EnableExemplarStorage = opts.EnableExemplarStorage
     headOpts.MaxExemplars.Store(opts.MaxExemplars)
+    headOpts.EnableMemorySnapshotOnShutdown = opts.EnableMemorySnapshotOnShutdown
     db.head, err = NewHead(r, l, wlog, headOpts, stats.Head)
     if err != nil {
         return nil, err
diff --git a/tsdb/docs/format/README.md b/tsdb/docs/format/README.md
index 5bd3d9882..6d088c734 100644
--- a/tsdb/docs/format/README.md
+++ b/tsdb/docs/format/README.md
@@ -5,3 +5,4 @@
 * [Head Chunks](head_chunks.md)
 * [Tombstones](tombstones.md)
 * [Wal](wal.md)
+* [Memory Snapshot](memory_snapshot.md)
diff --git a/tsdb/docs/format/memory_snapshot.md b/tsdb/docs/format/memory_snapshot.md
new file mode 100644
index 000000000..b6c53cd8d
--- /dev/null
+++ b/tsdb/docs/format/memory_snapshot.md
@@ -0,0 +1,62 @@
+# Memory Snapshot Format
+
+The memory snapshot uses the WAL package and writes each series as a WAL record.
+Below are the formats of the individual records.
+
+### Series records
+
+This record is a snapshot of a single series. Only one series exists per record.
+It includes the metadata of the series and the in-memory chunk data, if present.
+The sampleBuf is the last 4 samples in the in-memory chunk.
+
+```
+┌──────────────────────────┬────────────────────────────┐
+│       Record Type        │         Series Ref         │
+├──────────────────────────┴────────────────────────────┤
+│                    Number of Labels                    │
+├──────────────────────────────┬────────────────────────┤
+│         len(name_1)          │         name_1         │
+├──────────────────────────────┼────────────────────────┤
+│         len(val_1)           │         val_1          │
+├──────────────────────────────┴────────────────────────┤
+│                         . . .                          │
+├──────────────────────────────┬────────────────────────┤
+│         len(name_N)          │         name_N         │
+├──────────────────────────────┼────────────────────────┤
+│         len(val_N)           │         val_N          │
+├──────────────────────────────┴────────────────────────┤
+│                      Chunk Range                       │
+├───────────────────────────────────────────────────────┤
+│                      Chunk Exists                      │
+│ # 1 if head chunk exists, 0 otherwise to detect a nil  │
+│ # chunk. The fields below exist only when this is 1.   │
+├───────────────────────────┬───────────────────────────┤
+│        Chunk Mint         │        Chunk Maxt         │
+├───────────────────────────┴───────────────────────────┤
+│                     Chunk Encoding                     │
+├──────────────────────────────┬────────────────────────┤
+│          len(Chunk)          │         Chunk          │
+├──────────────────────────┬───┴────────────────────────┤
+│      sampleBuf[0].t      │       sampleBuf[0].v       │
+├──────────────────────────┼────────────────────────────┤
+│      sampleBuf[1].t      │       sampleBuf[1].v       │
+├──────────────────────────┼────────────────────────────┤
+│      sampleBuf[2].t      │       sampleBuf[2].v       │
+├──────────────────────────┼────────────────────────────┤
+│      sampleBuf[3].t      │       sampleBuf[3].v       │
+└──────────────────────────┴────────────────────────────┘
+```
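To make the layout above concrete, here is a minimal, self-contained sketch that round-trips a series record with no head chunk through the `Encbuf`/`Decbuf` helpers from `tsdb/encoding`. The record-type value `1` mirrors the `chunkSnapshotRecordTypeSeries` constant introduced later in this diff; the series ref, chunk range, and labels are made up for illustration:

```go
package main

import (
	"fmt"

	"github.com/prometheus/prometheus/pkg/labels"
	"github.com/prometheus/prometheus/tsdb/encoding"
)

func main() {
	lset := labels.FromStrings("__name__", "up", "job", "node")

	// Encode: Record Type, Series Ref, Number of Labels, the label pairs,
	// Chunk Range, and Chunk Exists = 0 (so no chunk fields follow).
	enc := encoding.Encbuf{}
	enc.PutByte(1)  // chunkSnapshotRecordTypeSeries.
	enc.PutBE64(42) // Series ref (illustrative).
	enc.PutUvarint(len(lset))
	for _, l := range lset {
		enc.PutUvarintStr(l.Name)
		enc.PutUvarintStr(l.Value)
	}
	enc.PutBE64int64(3600000) // Chunk range in ms (illustrative).
	enc.PutUvarint(0)         // Chunk Exists = 0.

	// Decode it back in the same order.
	dec := encoding.Decbuf{B: enc.Get()}
	fmt.Println(dec.Byte(), dec.Be64()) // 1 42
	for i, n := 0, dec.Uvarint(); i < n; i++ {
		fmt.Println(dec.UvarintStr(), "=", dec.UvarintStr())
	}
	fmt.Println(dec.Be64int64(), dec.Uvarint(), dec.Err()) // 3600000 0 <nil>
}
```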
| +├───────────────────────────┬───────────────────────────┤ +│ Chunk Mint │ Chunk Maxt │ +├───────────────────────────┴───────────────────────────┤ +│ Chunk Encoding │ +├──────────────────────────────┬────────────────────────┤ +│ len(Chunk) │ Chunk │ +├──────────────────────────┬───┴────────────────────────┤ +| sampleBuf[0].t | sampleBuf[0].v | +├──────────────────────────┼────────────────────────────┤ +| sampleBuf[1].t | sampleBuf[1].v | +├──────────────────────────┼────────────────────────────┤ +| sampleBuf[2].t | sampleBuf[2].v | +├──────────────────────────┼────────────────────────────┤ +| sampleBuf[3].t | sampleBuf[3].v | +└──────────────────────────┴────────────────────────────┘ +``` + +### Tombstone record + +This includes all the tombstones in the Head block. A single record is written into +the snapshot for all the tombstones. The encoded tombstones uses the same encoding +as tombstone file in blocks. + +``` +┌─────────────────────────────────────────────────────────────────┐ +│ Record Type │ +├───────────────────────────────────┬─────────────────────────────┤ +│ len(Encoded Tombstones) │ Encoded Tombstones │ +└───────────────────────────────────┴─────────────────────────────┘ +``` diff --git a/tsdb/encoding/encoding.go b/tsdb/encoding/encoding.go index 54ab8ff36..8a94ff7ba 100644 --- a/tsdb/encoding/encoding.go +++ b/tsdb/encoding/encoding.go @@ -17,6 +17,7 @@ import ( "encoding/binary" "hash" "hash/crc32" + "math" "unsafe" "github.com/dennwc/varint" @@ -40,6 +41,7 @@ func (e *Encbuf) Len() int { return len(e.B) } func (e *Encbuf) PutString(s string) { e.B = append(e.B, s...) } func (e *Encbuf) PutByte(c byte) { e.B = append(e.B, c) } +func (e *Encbuf) PutBytes(b []byte) { e.B = append(e.B, b...) } func (e *Encbuf) PutBE32int(x int) { e.PutBE32(uint32(x)) } func (e *Encbuf) PutUvarint32(x uint32) { e.PutUvarint64(uint64(x)) } @@ -56,6 +58,10 @@ func (e *Encbuf) PutBE64(x uint64) { e.B = append(e.B, e.C[:8]...) } +func (e *Encbuf) PutBEFloat64(x float64) { + e.PutBE64(math.Float64bits(x)) +} + func (e *Encbuf) PutUvarint64(x uint64) { n := binary.PutUvarint(e.C[:], x) e.B = append(e.B, e.C[:n]...) @@ -73,6 +79,12 @@ func (e *Encbuf) PutUvarintStr(s string) { e.PutString(s) } +// PutUvarintBytes writes a a variable length byte buffer. +func (e *Encbuf) PutUvarintBytes(b []byte) { + e.PutUvarint(len(b)) + e.PutBytes(b) +} + // PutHash appends a hash over the buffers current contents to the buffer. func (e *Encbuf) PutHash(h hash.Hash) { h.Reset() @@ -249,6 +261,10 @@ func (d *Decbuf) Be64() uint64 { return x } +func (d *Decbuf) Be64Float64() float64 { + return math.Float64frombits(d.Be64()) +} + func (d *Decbuf) Be32() uint32 { if d.E != nil { return 0 diff --git a/tsdb/head.go b/tsdb/head.go index 8291b0352..de66987f6 100644 --- a/tsdb/head.go +++ b/tsdb/head.go @@ -97,6 +97,8 @@ type Head struct { // chunkDiskMapper is used to write and read Head chunks to/from disk. chunkDiskMapper *chunks.ChunkDiskMapper + chunkSnapshotMtx sync.Mutex + closedMtx sync.Mutex closed bool @@ -122,9 +124,10 @@ type HeadOptions struct { // StripeSize sets the number of entries in the hash map, it must be a power of 2. // A larger StripeSize will allocate more memory up-front, but will increase performance when handling a large number of series. // A smaller StripeSize reduces the memory allocated, but can decrease performance with large number of series. 
diff --git a/tsdb/head.go b/tsdb/head.go
index 8291b0352..de66987f6 100644
--- a/tsdb/head.go
+++ b/tsdb/head.go
@@ -97,6 +97,8 @@ type Head struct {
     // chunkDiskMapper is used to write and read Head chunks to/from disk.
     chunkDiskMapper *chunks.ChunkDiskMapper
 
+    chunkSnapshotMtx sync.Mutex
+
     closedMtx sync.Mutex
     closed    bool
 
@@ -122,9 +124,10 @@ type HeadOptions struct {
     // StripeSize sets the number of entries in the hash map, it must be a power of 2.
     // A larger StripeSize will allocate more memory up-front, but will increase performance when handling a large number of series.
     // A smaller StripeSize reduces the memory allocated, but can decrease performance with a large number of series.
-    StripeSize            int
-    SeriesCallback        SeriesLifecycleCallback
-    EnableExemplarStorage bool
+    StripeSize                     int
+    SeriesCallback                 SeriesLifecycleCallback
+    EnableExemplarStorage          bool
+    EnableMemorySnapshotOnShutdown bool
 
     // Runtime reloadable options.
     MaxExemplars atomic.Int64
@@ -439,11 +442,25 @@ func (h *Head) Init(minValidTime int64) error {
     h.minValidTime.Store(minValidTime)
     defer h.postings.EnsureOrder()
     defer h.gc() // After loading the wal remove the obsolete data from the head.
+    defer func() {
+        // Loading of m-mapped chunks and the snapshot can cause the mint of the Head
+        // to go below minValidTime.
+        if h.MinTime() < h.minValidTime.Load() {
+            h.minTime.Store(h.minValidTime.Load())
+        }
+    }()
 
     level.Info(h.logger).Log("msg", "Replaying on-disk memory mappable chunks if any")
     start := time.Now()
-    mmappedChunks, err := h.loadMmappedChunks()
+    snapIdx, snapOffset, refSeries, err := h.loadChunkSnapshot()
+    if err != nil {
+        return err
+    }
+    level.Info(h.logger).Log("msg", "Chunk snapshot loading time", "duration", time.Since(start).String())
+
+    mmapChunkReplayStart := time.Now()
+    mmappedChunks, err := h.loadMmappedChunks(refSeries)
     if err != nil {
         level.Error(h.logger).Log("msg", "Loading on-disk chunks failed", "err", err)
         if _, ok := errors.Cause(err).(*chunks.CorruptionErr); ok {
@@ -451,10 +468,10 @@ func (h *Head) Init(minValidTime int64) error {
         }
         // If this fails, data will be recovered from WAL.
         // Hence we won't lose any data (given WAL is not corrupt).
-        mmappedChunks = h.removeCorruptedMmappedChunks(err)
+        mmappedChunks = h.removeCorruptedMmappedChunks(err, refSeries)
     }
 
-    level.Info(h.logger).Log("msg", "On-disk memory mappable chunks replay completed", "duration", time.Since(start).String())
+    level.Info(h.logger).Log("msg", "On-disk memory mappable chunks replay completed", "duration", time.Since(mmapChunkReplayStart).String())
     if h.wal == nil {
         level.Info(h.logger).Log("msg", "WAL not found")
         return nil
@@ -502,6 +519,9 @@ func (h *Head) Init(minValidTime int64) error {
 
     walReplayStart := time.Now()
 
+    if snapIdx > startFrom {
+        startFrom = snapIdx
+    }
     // Backfill segments from the most recent checkpoint onwards.
     for i := startFrom; i <= endAt; i++ {
         s, err := wal.OpenReadSegment(wal.SegmentName(h.wal.Dir(), i))
@@ -509,7 +529,14 @@
         if err != nil {
             return errors.Wrap(err, fmt.Sprintf("open WAL segment: %d", i))
         }
 
-        sr := wal.NewSegmentBufReader(s)
+        offset := 0
+        if i == snapIdx {
+            offset = snapOffset
+        }
+        sr, err := wal.NewSegmentBufReaderWithOffset(offset, s)
+        if err != nil {
+            return errors.Wrapf(err, "segment reader (offset=%d)", offset)
+        }
         err = h.loadWAL(wal.NewReader(sr), multiRef, mmappedChunks)
         if err := sr.Close(); err != nil {
             level.Warn(h.logger).Log("msg", "Error while closing the wal segments reader", "err", err)
@@ -533,29 +560,49 @@
     return nil
 }
 
-func (h *Head) loadMmappedChunks() (map[uint64][]*mmappedChunk, error) {
+func (h *Head) loadMmappedChunks(refSeries map[uint64]*memSeries) (map[uint64][]*mmappedChunk, error) {
     mmappedChunks := map[uint64][]*mmappedChunk{}
     if err := h.chunkDiskMapper.IterateAllChunks(func(seriesRef, chunkRef uint64, mint, maxt int64, numSamples uint16) error {
         if maxt < h.minValidTime.Load() {
             return nil
         }
-
-        slice := mmappedChunks[seriesRef]
-        if len(slice) > 0 {
-            if slice[len(slice)-1].maxTime >= mint {
-                return &chunks.CorruptionErr{
-                    Err: errors.Errorf("out of sequence m-mapped chunk for series ref %d", seriesRef),
-                }
+        ms, ok := refSeries[seriesRef]
+        if !ok {
+            slice := mmappedChunks[seriesRef]
+            if len(slice) > 0 && slice[len(slice)-1].maxTime >= mint {
+                return errors.Errorf("out of sequence m-mapped chunk for series ref %d", seriesRef)
             }
+
+            slice = append(slice, &mmappedChunk{
+                ref:        chunkRef,
+                minTime:    mint,
+                maxTime:    maxt,
+                numSamples: numSamples,
+            })
+            mmappedChunks[seriesRef] = slice
+            return nil
         }
 
-        slice = append(slice, &mmappedChunk{
+        if len(ms.mmappedChunks) > 0 && ms.mmappedChunks[len(ms.mmappedChunks)-1].maxTime >= mint {
+            return errors.Errorf("out of sequence m-mapped chunk for series ref %d", seriesRef)
+        }
+
+        h.metrics.chunks.Inc()
+        h.metrics.chunksCreated.Inc()
+        ms.mmappedChunks = append(ms.mmappedChunks, &mmappedChunk{
             ref:        chunkRef,
             minTime:    mint,
             maxTime:    maxt,
             numSamples: numSamples,
         })
-        mmappedChunks[seriesRef] = slice
+        h.updateMinMaxTime(mint, maxt)
+        if ms.headChunk != nil && maxt >= ms.headChunk.minTime {
+            // The head chunk was completed and was m-mapped after taking the snapshot.
+            // Hence remove this chunk.
+            ms.nextAt = 0
+            ms.headChunk = nil
+            ms.app = nil
+        }
         return nil
     }); err != nil {
         return nil, errors.Wrap(err, "iterate on on-disk chunks")
@@ -565,7 +612,7 @@ func (h *Head) loadMmappedChunks() (map[uint64][]*mmappedChunk, error) {
 
 // removeCorruptedMmappedChunks attempts to delete the corrupted mmapped chunks and if it fails, it clears all the previously
 // loaded mmapped chunks.
-func (h *Head) removeCorruptedMmappedChunks(err error) map[uint64][]*mmappedChunk {
+func (h *Head) removeCorruptedMmappedChunks(err error, refSeries map[uint64]*memSeries) map[uint64][]*mmappedChunk {
     level.Info(h.logger).Log("msg", "Deleting mmapped chunk files")
 
     if err := h.chunkDiskMapper.DeleteCorrupted(err); err != nil {
@@ -574,7 +621,7 @@
     }
 
     level.Info(h.logger).Log("msg", "Deletion of mmap chunk files successful, reattempting m-mapping the on-disk chunks")
-    mmappedChunks, err := h.loadMmappedChunks()
+    mmappedChunks, err := h.loadMmappedChunks(refSeries)
     if err != nil {
         level.Error(h.logger).Log("msg", "Loading on-disk chunks failed, discarding chunk files completely", "err", err)
         mmappedChunks = map[uint64][]*mmappedChunk{}
@@ -661,6 +708,9 @@ func (h *Head) Truncate(mint int64) (err error) {
 
 // truncateMemory removes old data before mint from the head.
 func (h *Head) truncateMemory(mint int64) (err error) {
+    h.chunkSnapshotMtx.Lock()
+    defer h.chunkSnapshotMtx.Unlock()
+
     defer func() {
         if err != nil {
             h.metrics.headTruncateFail.Inc()
@@ -796,6 +846,9 @@ func (h *Head) IsQuerierCollidingWithTruncation(querierMint, querierMaxt int64)
 
 // truncateWAL removes old data before mint from the WAL.
 func (h *Head) truncateWAL(mint int64) error {
+    h.chunkSnapshotMtx.Lock()
+    defer h.chunkSnapshotMtx.Unlock()
+
     if h.wal == nil || mint <= h.lastWALTruncationTime.Load() {
         return nil
     }
@@ -1095,15 +1148,19 @@ func (h *Head) compactable() bool {
 }
 
 // Close flushes the WAL and closes the head.
+// It also takes a snapshot of in-memory chunks if enabled.
 func (h *Head) Close() error {
     h.closedMtx.Lock()
     defer h.closedMtx.Unlock()
     h.closed = true
     errs := tsdb_errors.NewMulti(h.chunkDiskMapper.Close())
+    if errs.Err() == nil && h.opts.EnableMemorySnapshotOnShutdown {
+        errs.Add(h.performChunkSnapshot())
+    }
     if h.wal != nil {
         errs.Add(h.wal.Close())
     }
     return errs.Err()
 }
 
 // String returns a human-readable representation of the TSDB head. It's important to
diff --git a/tsdb/head_append.go b/tsdb/head_append.go
index 85a3ff14a..75f75dd7b 100644
--- a/tsdb/head_append.go
+++ b/tsdb/head_append.go
@@ -468,12 +468,20 @@ func (s *memSeries) append(t int64, v float64, appendID uint64, chunkDiskMapper
         c = s.cutNewHeadChunk(t, chunkDiskMapper)
         chunkCreated = true
     }
-    numSamples := c.chunk.NumSamples()
 
     // Out of order sample.
     if c.maxTime >= t {
         return false, chunkCreated
     }
+
+    numSamples := c.chunk.NumSamples()
+    if numSamples == 0 {
+        // This could be a new chunk created after reading the chunk snapshot,
+        // hence we fix the minTime of the chunk here.
+        c.minTime = t
+        s.nextAt = rangeForTimestamp(c.minTime, s.chunkRange)
+    }
+
     // If we reach 25% of a chunk's desired sample count, predict an end time
     // for this chunk that will try to make samples equally distributed within
     // the remaining chunks in the current chunk range.
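The `nextAt` fix above relies on the head's chunk-boundary arithmetic. A standalone sketch of that calculation — `rangeForTimestamp` is re-implemented here for illustration and is assumed to match the helper of the same name in `tsdb/head.go`:

```go
package main

import "fmt"

// rangeForTimestamp is re-implemented here for illustration; the real helper
// lives in tsdb/head.go. It returns the exclusive end of the width-aligned
// chunk range that timestamp t falls into.
func rangeForTimestamp(t, width int64) int64 {
	return (t/width)*width + width
}

func main() {
	const chunkRange = 120_000 // Chunk range in milliseconds (illustrative).
	// A head chunk whose minTime was just fixed on first append gets nextAt
	// set to the end of its aligned range, so the next cut happens on schedule.
	for _, minTime := range []int64{0, 1, 119_999, 120_000} {
		fmt.Println(minTime, "->", rangeForTimestamp(minTime, chunkRange))
	}
}
```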
diff --git a/tsdb/head_test.go b/tsdb/head_test.go
index 68207360e..5513ca645 100644
--- a/tsdb/head_test.go
+++ b/tsdb/head_test.go
@@ -2465,3 +2465,163 @@ func TestWaitForPendingReadersInTimeRange(t *testing.T) {
         })
     }
 }
+
+func TestChunkSnapshot(t *testing.T) {
+    head, _ := newTestHead(t, 120*4, false)
+    defer func() {
+        head.opts.EnableMemorySnapshotOnShutdown = false
+        require.NoError(t, head.Close())
+    }()
+
+    numSeries := 10
+    expSeries := make(map[string][]tsdbutil.Sample)
+    expTombstones := make(map[uint64]tombstones.Intervals)
+    { // Initial data that goes into snapshot.
+        // Add some initial samples with >=1 m-map chunk.
+        app := head.Appender(context.Background())
+        for i := 1; i <= numSeries; i++ {
+            lbls := labels.Labels{labels.Label{Name: "foo", Value: fmt.Sprintf("bar%d", i)}}
+            lblStr := lbls.String()
+            // 240 samples should m-map at least 1 chunk.
+            for ts := int64(1); ts <= 240; ts++ {
+                val := rand.Float64()
+                expSeries[lblStr] = append(expSeries[lblStr], sample{ts, val})
+                _, err := app.Append(0, lbls, ts, val)
+                require.NoError(t, err)
+            }
+        }
+        require.NoError(t, app.Commit())
+
+        // Add some tombstones.
+        var enc record.Encoder
+        for i := 1; i <= numSeries; i++ {
+            ref := uint64(i)
+            itvs := tombstones.Intervals{
+                {Mint: 1234, Maxt: 2345},
+                {Mint: 3456, Maxt: 4567},
+            }
+            for _, itv := range itvs {
+                expTombstones[ref] = expTombstones[ref].Add(itv)
+            }
+            head.tombstones.AddInterval(ref, itvs...)
+            err := head.wal.Log(enc.Tombstones([]tombstones.Stone{
+                {Ref: ref, Intervals: itvs},
+            }, nil))
+            require.NoError(t, err)
+        }
+    }
+
+    // These references should be the ones used for the snapshot.
+    wlast, woffset, err := head.wal.LastSegmentAndOffset()
+    require.NoError(t, err)
+
+    { // Creating snapshot and verifying it.
+        head.opts.EnableMemorySnapshotOnShutdown = true
+        require.NoError(t, head.Close()) // This will create a snapshot.
+
+        _, sidx, soffset, err := LastChunkSnapshot(head.opts.ChunkDirRoot)
+        require.NoError(t, err)
+        require.Equal(t, wlast, sidx)
+        require.Equal(t, woffset, soffset)
+    }
+
+    { // Test the replay of snapshot.
+        // Create new Head which should replay this snapshot.
+        w, err := wal.NewSize(nil, nil, head.wal.Dir(), 32768, false)
+        require.NoError(t, err)
+        head, err = NewHead(nil, nil, w, head.opts, nil)
+        require.NoError(t, err)
+        require.NoError(t, head.Init(math.MinInt64))
+
+        // Test query for snapshot replay.
+        q, err := NewBlockQuerier(head, math.MinInt64, math.MaxInt64)
+        require.NoError(t, err)
+        series := query(t, q, labels.MustNewMatcher(labels.MatchRegexp, "foo", ".*"))
+        require.Equal(t, expSeries, series)
+
+        // Check the tombstones.
+        tr, err := head.Tombstones()
+        require.NoError(t, err)
+        actTombstones := make(map[uint64]tombstones.Intervals)
+        require.NoError(t, tr.Iter(func(ref uint64, itvs tombstones.Intervals) error {
+            for _, itv := range itvs {
+                actTombstones[ref] = actTombstones[ref].Add(itv)
+            }
+            return nil
+        }))
+        require.Equal(t, expTombstones, actTombstones)
+    }
+
+    { // Additional data to include only in the WAL and m-mapped chunks, and not in the snapshot. This mimics having an old snapshot on disk.
+
+        // Add more samples.
+        app := head.Appender(context.Background())
+        for i := 1; i <= numSeries; i++ {
+            lbls := labels.Labels{labels.Label{Name: "foo", Value: fmt.Sprintf("bar%d", i)}}
+            lblStr := lbls.String()
+            // 240 samples should m-map at least 1 chunk.
+            for ts := int64(241); ts <= 480; ts++ {
+                val := rand.Float64()
+                expSeries[lblStr] = append(expSeries[lblStr], sample{ts, val})
+                _, err := app.Append(0, lbls, ts, val)
+                require.NoError(t, err)
+            }
+        }
+        require.NoError(t, app.Commit())
+
+        // Add more tombstones.
+        var enc record.Encoder
+        for i := 1; i <= numSeries; i++ {
+            ref := uint64(i)
+            itvs := tombstones.Intervals{
+                {Mint: 12345, Maxt: 23456},
+                {Mint: 34567, Maxt: 45678},
+            }
+            for _, itv := range itvs {
+                expTombstones[ref] = expTombstones[ref].Add(itv)
+            }
+            head.tombstones.AddInterval(ref, itvs...)
+            err := head.wal.Log(enc.Tombstones([]tombstones.Stone{
+                {Ref: ref, Intervals: itvs},
+            }, nil))
+            require.NoError(t, err)
+        }
+    }
+
+    { // Close Head and verify that new snapshot was not created.
+        head.opts.EnableMemorySnapshotOnShutdown = false
+        require.NoError(t, head.Close()) // This should not create a snapshot.
+
+        _, sidx, soffset, err := LastChunkSnapshot(head.opts.ChunkDirRoot)
+        require.NoError(t, err)
+        require.Equal(t, wlast, sidx)
+        require.Equal(t, woffset, soffset)
+    }
+
+    { // Test the replay of snapshot, m-map chunks, and WAL.
+        // Create new Head to replay snapshot, m-map chunks, and WAL.
+        w, err := wal.NewSize(nil, nil, head.wal.Dir(), 32768, false)
+        require.NoError(t, err)
+        head, err = NewHead(nil, nil, w, head.opts, nil)
+        require.NoError(t, err)
+        require.NoError(t, head.Init(math.MinInt64))
+
+        // Test query when data is replayed from snapshot, m-map chunks, and WAL.
+        q, err := NewBlockQuerier(head, math.MinInt64, math.MaxInt64)
+        require.NoError(t, err)
+        series := query(t, q, labels.MustNewMatcher(labels.MatchRegexp, "foo", ".*"))
+        require.Equal(t, expSeries, series)
+
+        // Check the tombstones.
+        tr, err := head.Tombstones()
+        require.NoError(t, err)
+        actTombstones := make(map[uint64]tombstones.Intervals)
+        require.NoError(t, tr.Iter(func(ref uint64, itvs tombstones.Intervals) error {
+            for _, itv := range itvs {
+                actTombstones[ref] = actTombstones[ref].Add(itv)
+            }
+            return nil
+        }))
+        require.Equal(t, expTombstones, actTombstones)
+    }
+}
diff --git a/tsdb/head_wal.go b/tsdb/head_wal.go
index 056bd288a..9ca209945 100644
--- a/tsdb/head_wal.go
+++ b/tsdb/head_wal.go
@@ -15,8 +15,19 @@ package tsdb
 import (
     "fmt"
+    "io/ioutil"
     "math"
+    "os"
+    "path/filepath"
     "runtime"
+    "strconv"
+    "strings"
     "sync"
     "time"
+
+    "github.com/prometheus/prometheus/pkg/labels"
+    "github.com/prometheus/prometheus/tsdb/chunkenc"
+    "github.com/prometheus/prometheus/tsdb/encoding"
+    tsdb_errors "github.com/prometheus/prometheus/tsdb/errors"
+    "github.com/prometheus/prometheus/tsdb/fileutil"
@@ -386,3 +396,474 @@ func (h *Head) processWALSamples(
     return unknownRefs
 }
+
+const (
+    chunkSnapshotRecordTypeSeries     uint8 = 1
+    chunkSnapshotRecordTypeTombstones uint8 = 2
+)
+
+type chunkSnapshotRecord struct {
+    ref        uint64
+    lset       labels.Labels
+    chunkRange int64
+    mc         *memChunk
+    sampleBuf  [4]sample
+}
+
+func (s *memSeries) encodeToSnapshotRecord(b []byte) []byte {
+    buf := encoding.Encbuf{B: b}
+
+    buf.PutByte(chunkSnapshotRecordTypeSeries)
+    buf.PutBE64(s.ref)
+    buf.PutUvarint(len(s.lset))
+    for _, l := range s.lset {
+        buf.PutUvarintStr(l.Name)
+        buf.PutUvarintStr(l.Value)
+    }
+    buf.PutBE64int64(s.chunkRange)
+
+    s.Lock()
+    if s.headChunk == nil {
+        buf.PutUvarint(0)
+    } else {
+        buf.PutUvarint(1)
+        buf.PutBE64int64(s.headChunk.minTime)
+        buf.PutBE64int64(s.headChunk.maxTime)
+        buf.PutByte(byte(s.headChunk.chunk.Encoding()))
+        buf.PutUvarintBytes(s.headChunk.chunk.Bytes())
+        // Put the sample buf.
+        for _, smpl := range s.sampleBuf {
+            buf.PutBE64int64(smpl.t)
+            buf.PutBEFloat64(smpl.v)
+        }
+    }
+    s.Unlock()
+
+    return buf.Get()
+}
+
+func decodeSeriesFromChunkSnapshot(b []byte) (csr chunkSnapshotRecord, err error) {
+    dec := encoding.Decbuf{B: b}
+
+    if flag := dec.Byte(); flag != chunkSnapshotRecordTypeSeries {
+        return csr, errors.Errorf("invalid record type %x", flag)
+    }
+
+    csr.ref = dec.Be64()
+
+    // The label set written to the disk is already sorted.
+    csr.lset = make(labels.Labels, dec.Uvarint())
+    for i := range csr.lset {
+        csr.lset[i].Name = dec.UvarintStr()
+        csr.lset[i].Value = dec.UvarintStr()
+    }
+
+    csr.chunkRange = dec.Be64int64()
+    if dec.Uvarint() == 0 {
+        return
+    }
+
+    csr.mc = &memChunk{}
+    csr.mc.minTime = dec.Be64int64()
+    csr.mc.maxTime = dec.Be64int64()
+    enc := chunkenc.Encoding(dec.Byte())
+
+    // The underlying byte slice gets re-used later, so make a copy.
+    chunkBytes := dec.UvarintBytes()
+    chunkBytesCopy := make([]byte, len(chunkBytes))
+    copy(chunkBytesCopy, chunkBytes)
+
+    chk, err := chunkenc.FromData(enc, chunkBytesCopy)
+    if err != nil {
+        return csr, errors.Wrap(err, "chunk from data")
+    }
+    csr.mc.chunk = chk
+
+    for i := range csr.sampleBuf {
+        csr.sampleBuf[i].t = dec.Be64int64()
+        csr.sampleBuf[i].v = dec.Be64Float64()
+    }
+
+    err = dec.Err()
+    if err != nil && len(dec.B) > 0 {
+        err = errors.Errorf("unexpected %d bytes left in entry", len(dec.B))
+    }
+
+    return
+}
+
+func encodeTombstonesToSnapshotRecord(tr tombstones.Reader) ([]byte, error) {
+    buf := encoding.Encbuf{}
+
+    buf.PutByte(chunkSnapshotRecordTypeTombstones)
+    b, err := tombstones.Encode(tr)
+    if err != nil {
+        return nil, errors.Wrap(err, "encode tombstones")
+    }
+    buf.PutUvarintBytes(b)
+
+    return buf.Get(), nil
+}
+
+func decodeTombstonesSnapshotRecord(b []byte) (tombstones.Reader, error) {
+    dec := encoding.Decbuf{B: b}
+
+    if flag := dec.Byte(); flag != chunkSnapshotRecordTypeTombstones {
+        return nil, errors.Errorf("invalid record type %x", flag)
+    }
+
+    tr, err := tombstones.Decode(dec.UvarintBytes())
+    return tr, errors.Wrap(err, "decode tombstones")
+}
+
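The snapshot directory name encodes the WAL position the snapshot covers. Here is a small sketch of that naming scheme, mirroring the `fmt.Sprintf` in `ChunkSnapshot` below and the `strconv` parsing in `LastChunkSnapshot`/`DeleteChunkSnapshots` (the `snapshotName`/`parseSnapshotName` helpers are hypothetical, for illustration only):

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

const chunkSnapshotPrefix = "chunk_snapshot."

// snapshotName mirrors the naming used by Head.ChunkSnapshot below:
// chunk_snapshot.<last WAL segment>.<byte offset within that segment>.
func snapshotName(wlast, woffset int) string {
	return fmt.Sprintf(chunkSnapshotPrefix+"%06d.%010d", wlast, woffset)
}

// parseSnapshotName mirrors the parsing in LastChunkSnapshot and DeleteChunkSnapshots.
func parseSnapshotName(name string) (idx, offset int, err error) {
	splits := strings.Split(name[len(chunkSnapshotPrefix):], ".")
	if len(splits) != 2 {
		return 0, 0, fmt.Errorf("chunk snapshot %s is not in the right format", name)
	}
	if idx, err = strconv.Atoi(splits[0]); err != nil {
		return 0, 0, err
	}
	offset, err = strconv.Atoi(splits[1])
	return idx, offset, err
}

func main() {
	name := snapshotName(42, 16384)
	fmt.Println(name) // chunk_snapshot.000042.0000016384
	idx, off, _ := parseSnapshotName(name)
	fmt.Println(idx, off) // 42 16384
}
```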
+const chunkSnapshotPrefix = "chunk_snapshot."
+
+// ChunkSnapshot creates a snapshot of all the series and tombstones in the head.
+// It deletes the old chunk snapshots if the chunk snapshot creation is successful.
+//
+// The chunk snapshot is stored in a directory named chunk_snapshot.N.M and is written
+// using the WAL package. N is the last WAL segment present during snapshotting and
+// M is the offset in segment N up to which data was written.
+func (h *Head) ChunkSnapshot() (*ChunkSnapshotStats, error) {
+    if h.wal == nil {
+        // If we are not storing any WAL, it does not make sense to take a snapshot either.
+        level.Warn(h.logger).Log("msg", "skipping chunk snapshotting as WAL is disabled")
+        return &ChunkSnapshotStats{}, nil
+    }
+    h.chunkSnapshotMtx.Lock()
+    defer h.chunkSnapshotMtx.Unlock()
+
+    stats := &ChunkSnapshotStats{}
+
+    wlast, woffset, err := h.wal.LastSegmentAndOffset()
+    if err != nil && err != record.ErrNotFound {
+        return stats, errors.Wrap(err, "get last wal segment and offset")
+    }
+
+    _, cslast, csoffset, err := LastChunkSnapshot(h.opts.ChunkDirRoot)
+    if err != nil && err != record.ErrNotFound {
+        return stats, errors.Wrap(err, "find last chunk snapshot")
+    }
+
+    if wlast == cslast && woffset == csoffset {
+        // Nothing has been written to the WAL/Head since the last snapshot.
+        return stats, nil
+    }
+
+    snapshotName := fmt.Sprintf(chunkSnapshotPrefix+"%06d.%010d", wlast, woffset)
+
+    cpdir := filepath.Join(h.opts.ChunkDirRoot, snapshotName)
+    cpdirtmp := cpdir + ".tmp"
+    stats.Dir = cpdir
+
+    if err := os.MkdirAll(cpdirtmp, 0777); err != nil {
+        return stats, errors.Wrap(err, "create chunk snapshot dir")
+    }
+    cp, err := wal.New(nil, nil, cpdirtmp, h.wal.CompressionEnabled())
+    if err != nil {
+        return stats, errors.Wrap(err, "open chunk snapshot")
+    }
+
+    // Ensures that an early return caused by an error doesn't leave any tmp files.
+    defer func() {
+        cp.Close()
+        os.RemoveAll(cpdirtmp)
+    }()
+
+    var (
+        buf  []byte
+        recs [][]byte
+    )
+    stripeSize := h.series.size
+    for i := 0; i < stripeSize; i++ {
+        h.series.locks[i].RLock()
+
+        for _, s := range h.series.series[i] {
+            start := len(buf)
+            buf = s.encodeToSnapshotRecord(buf)
+            if len(buf[start:]) == 0 {
+                continue // All contents discarded.
+            }
+            recs = append(recs, buf[start:])
+            // Flush records in 10 MB increments.
+            if len(buf) > 10*1024*1024 {
+                if err := cp.Log(recs...); err != nil {
+                    h.series.locks[i].RUnlock()
+                    return stats, errors.Wrap(err, "flush records")
+                }
+                buf, recs = buf[:0], recs[:0]
+            }
+        }
+        stats.TotalSeries += len(h.series.series[i])
+
+        h.series.locks[i].RUnlock()
+    }
+
+    // Add tombstones to the snapshot.
+    tombstonesReader, err := h.Tombstones()
+    if err != nil {
+        return stats, errors.Wrap(err, "get tombstones")
+    }
+    rec, err := encodeTombstonesToSnapshotRecord(tombstonesReader)
+    if err != nil {
+        return stats, errors.Wrap(err, "encode tombstones")
+    }
+    recs = append(recs, rec)
+
+    // Flush remaining records.
+    if err := cp.Log(recs...); err != nil {
+        return stats, errors.Wrap(err, "flush records")
+    }
+    if err := cp.Close(); err != nil {
+        return stats, errors.Wrap(err, "close chunk snapshot")
+    }
+    if err := fileutil.Replace(cpdirtmp, cpdir); err != nil {
+        return stats, errors.Wrap(err, "rename chunk snapshot directory")
+    }
+
+    if err := DeleteChunkSnapshots(h.opts.ChunkDirRoot, cslast, csoffset); err != nil {
+        // Leftover old chunk snapshots do not cause problems down the line beyond
+        // occupying disk space. They will just be ignored since a higher chunk
+        // snapshot exists.
+        level.Error(h.logger).Log("msg", "delete old chunk snapshots", "err", err)
+    }
+    return stats, nil
+}
+
+func (h *Head) performChunkSnapshot() error {
+    level.Info(h.logger).Log("msg", "creating chunk snapshot")
+    startTime := time.Now()
+    stats, err := h.ChunkSnapshot()
+    elapsed := time.Since(startTime)
+    if err == nil {
+        level.Info(h.logger).Log("msg", "chunk snapshot complete", "duration", elapsed.String(), "num_series", stats.TotalSeries, "dir", stats.Dir)
+    }
+    return errors.Wrap(err, "chunk snapshot")
+}
+
+// ChunkSnapshotStats holds stats about a created chunk snapshot.
+type ChunkSnapshotStats struct {
+    TotalSeries int
+    Dir         string
+}
+
+// LastChunkSnapshot returns the directory name, the WAL segment index, and the offset
+// of the most recent chunk snapshot. If dir does not contain any chunk snapshots,
+// record.ErrNotFound is returned.
+func LastChunkSnapshot(dir string) (string, int, int, error) {
+    files, err := ioutil.ReadDir(dir)
+    if err != nil {
+        return "", 0, 0, err
+    }
+    // Traverse list backwards since there may be multiple chunk snapshots left.
+    for i := len(files) - 1; i >= 0; i-- {
+        fi := files[i]
+
+        if !strings.HasPrefix(fi.Name(), chunkSnapshotPrefix) {
+            continue
+        }
+        if !fi.IsDir() {
+            return "", 0, 0, errors.Errorf("chunk snapshot %s is not a directory", fi.Name())
+        }
+
+        splits := strings.Split(fi.Name()[len(chunkSnapshotPrefix):], ".")
+        if len(splits) != 2 {
+            return "", 0, 0, errors.Errorf("chunk snapshot %s is not in the right format", fi.Name())
+        }
+
+        idx, err := strconv.Atoi(splits[0])
+        if err != nil {
+            continue
+        }
+
+        offset, err := strconv.Atoi(splits[1])
+        if err != nil {
+            continue
+        }
+
+        return filepath.Join(dir, fi.Name()), idx, offset, nil
+    }
+    return "", 0, 0, record.ErrNotFound
+}
+
+// DeleteChunkSnapshots deletes all chunk snapshots in a directory whose segment index
+// is at or below maxIndex and whose offset is below maxOffset.
+func DeleteChunkSnapshots(dir string, maxIndex, maxOffset int) error {
+    files, err := ioutil.ReadDir(dir)
+    if err != nil {
+        return err
+    }
+
+    errs := tsdb_errors.NewMulti()
+    for _, fi := range files {
+        if !strings.HasPrefix(fi.Name(), chunkSnapshotPrefix) {
+            continue
+        }
+
+        splits := strings.Split(fi.Name()[len(chunkSnapshotPrefix):], ".")
+        if len(splits) != 2 {
+            continue
+        }
+
+        idx, err := strconv.Atoi(splits[0])
+        if err != nil {
+            continue
+        }
+
+        offset, err := strconv.Atoi(splits[1])
+        if err != nil {
+            continue
+        }
+
+        if idx <= maxIndex && offset < maxOffset {
+            if err := os.RemoveAll(filepath.Join(dir, fi.Name())); err != nil {
+                errs.Add(err)
+            }
+        }
+    }
+    return errs.Err()
+}
+
+func (h *Head) loadChunkSnapshot() (int, int, map[uint64]*memSeries, error) {
+    dir, snapIdx, snapOffset, err := LastChunkSnapshot(h.opts.ChunkDirRoot)
+    if err != nil {
+        if err == record.ErrNotFound {
+            return snapIdx, snapOffset, nil, nil
+        }
+        return snapIdx, snapOffset, nil, errors.Wrap(err, "find last chunk snapshot")
+    }
+
+    start := time.Now()
+    sr, err := wal.NewSegmentsReader(dir)
+    if err != nil {
+        return snapIdx, snapOffset, nil, errors.Wrap(err, "open chunk snapshot")
+    }
+    defer func() {
+        if err := sr.Close(); err != nil {
+            level.Warn(h.logger).Log("msg", "error while closing the wal segments reader", "err", err)
+        }
+    }()
+
+    var (
+        numSeries        = 0
+        unknownRefs      = int64(0)
+        n                = runtime.GOMAXPROCS(0)
+        wg               sync.WaitGroup
+        recordChan       = make(chan chunkSnapshotRecord, 5*n)
+        shardedRefSeries = make([]map[uint64]*memSeries, n)
+        errChan          = make(chan error, n)
+    )
+
+    wg.Add(n)
+    for i := 0; i < n; i++ {
+        go func(idx int, rc <-chan chunkSnapshotRecord) {
+            defer wg.Done()
+            defer func() {
+                // If there was an error, drain the channel
+                // to unblock the main thread.
+                for range rc {
+                }
+            }()
+
+            shardedRefSeries[idx] = make(map[uint64]*memSeries)
+            localRefSeries := shardedRefSeries[idx]
+
+            for csr := range rc {
+                series, _, err := h.getOrCreateWithID(csr.ref, csr.lset.Hash(), csr.lset)
+                if err != nil {
+                    errChan <- err
+                    return
+                }
+                localRefSeries[csr.ref] = series
+                if h.lastSeriesID.Load() < series.ref {
+                    h.lastSeriesID.Store(series.ref)
+                }
+
+                series.chunkRange = csr.chunkRange
+                if csr.mc == nil {
+                    continue
+                }
+                series.nextAt = csr.mc.maxTime // This will create a new chunk on append.
+                series.headChunk = csr.mc
+                for i := range series.sampleBuf {
+                    series.sampleBuf[i].t = csr.sampleBuf[i].t
+                    series.sampleBuf[i].v = csr.sampleBuf[i].v
+                }
+
+                app, err := series.headChunk.chunk.Appender()
+                if err != nil {
+                    errChan <- err
+                    return
+                }
+                series.app = app
+
+                h.updateMinMaxTime(csr.mc.minTime, csr.mc.maxTime)
+            }
+        }(i, recordChan)
+    }
+
+    r := wal.NewReader(sr)
+    var loopErr error
+Outer:
+    for r.Next() {
+        select {
+        case err := <-errChan:
+            errChan <- err
+            break Outer
+        default:
+        }
+
+        rec := r.Record()
+        switch rec[0] {
+        case chunkSnapshotRecordTypeSeries:
+            numSeries++
+            csr, err := decodeSeriesFromChunkSnapshot(rec)
+            if err != nil {
+                loopErr = errors.Wrap(err, "decode series record")
+                break Outer
+            }
+            recordChan <- csr
+
+        case chunkSnapshotRecordTypeTombstones:
+            tr, err := decodeTombstonesSnapshotRecord(rec)
+            if err != nil {
+                loopErr = errors.Wrap(err, "decode tombstones")
+                break Outer
+            }
+
+            if err = tr.Iter(func(ref uint64, ivs tombstones.Intervals) error {
+                h.tombstones.AddInterval(ref, ivs...)
+                return nil
+            }); err != nil {
+                loopErr = errors.Wrap(err, "iterate tombstones")
+                break Outer
+            }
+        }
+
+    }
+    close(recordChan)
+    wg.Wait()
+
+    close(errChan)
+    merr := tsdb_errors.NewMulti(errors.Wrap(loopErr, "decode loop"))
+    for err := range errChan {
+        merr.Add(errors.Wrap(err, "record processing"))
+    }
+    if err := merr.Err(); err != nil {
+        return -1, -1, nil, err
+    }
+
+    refSeries := make(map[uint64]*memSeries, numSeries)
+    for _, shard := range shardedRefSeries {
+        for k, v := range shard {
+            refSeries[k] = v
+        }
+    }
+
+    elapsed := time.Since(start)
+    level.Info(h.logger).Log("msg", "chunk snapshot loaded", "dir", dir, "num_series", numSeries, "duration", elapsed.String())
+    if unknownRefs > 0 {
+        level.Warn(h.logger).Log("msg", "unknown series references during chunk snapshot replay", "count", unknownRefs)
+    }
+
+    return snapIdx, snapOffset, refSeries, nil
+}
diff --git a/tsdb/wal/wal.go b/tsdb/wal/wal.go
index a11994157..c5023b0fd 100644
--- a/tsdb/wal/wal.go
+++ b/tsdb/wal/wal.go
@@ -699,6 +699,22 @@ func (w *WAL) log(rec []byte, final bool) error {
     return nil
 }
 
+// LastSegmentAndOffset returns the last segment number of the WAL
+// and the offset in that file up to which the segment has been filled.
+func (w *WAL) LastSegmentAndOffset() (seg, offset int, err error) {
+    w.mtx.Lock()
+    defer w.mtx.Unlock()
+
+    _, seg, err = Segments(w.Dir())
+    if err != nil {
+        return
+    }
+
+    offset = (w.donePages * pageSize) + w.page.alloc
+
+    return
+}
+
 // Truncate drops all segments before i.
 func (w *WAL) Truncate(i int) (err error) {
     w.metrics.truncateTotal.Inc()
@@ -867,6 +883,22 @@ func NewSegmentBufReader(segs ...*Segment) *segmentBufReader {
     }
 }
 
+// NewSegmentBufReaderWithOffset returns a segmentBufReader that reads from the given segments, skipping offset bytes in the first segment.
+// nolint:golint
+func NewSegmentBufReaderWithOffset(offset int, segs ...*Segment) (sbr *segmentBufReader, err error) {
+    if offset == 0 {
+        return NewSegmentBufReader(segs...), nil
+    }
+    sbr = &segmentBufReader{
+        buf:  bufio.NewReaderSize(segs[0], 16*pageSize),
+        segs: segs,
+    }
+    if offset > 0 {
+        _, err = sbr.buf.Discard(offset)
+    }
+    return sbr, err
+}
+
 func (r *segmentBufReader) Close() (err error) {
     for _, s := range r.segs {
         if e := s.Close(); e != nil {
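Tying the WAL changes together, here is a hedged sketch of how the snapshot's (segment, offset) pair drives replay resumption — essentially the loop from `Head.Init` above with the surrounding logic elided; `replayFrom` is a hypothetical helper written as it might look inside package `tsdb`, not part of this PR:

```go
// replayFrom resumes WAL replay from the position recorded in the snapshot
// name, mirroring the loop in Head.Init above.
func replayFrom(h *Head, snapIdx, snapOffset, startFrom, endAt int) error {
	if snapIdx > startFrom {
		startFrom = snapIdx // Everything before the snapshot segment is already in memory.
	}
	for i := startFrom; i <= endAt; i++ {
		s, err := wal.OpenReadSegment(wal.SegmentName(h.wal.Dir(), i))
		if err != nil {
			return err
		}
		offset := 0
		if i == snapIdx {
			offset = snapOffset // Skip the records already covered by the snapshot.
		}
		sr, err := wal.NewSegmentBufReaderWithOffset(offset, s)
		if err != nil {
			return err
		}
		_ = wal.NewReader(sr) // The real code hands this reader to h.loadWAL.
		if err := sr.Close(); err != nil {
			return err
		}
	}
	return nil
}
```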