Snapshot in-memory chunks on shutdown for faster restarts (#7229)

Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com>
This commit is contained in:
Ganesh Vernekar 2021-08-06 22:21:01 +05:30 committed by GitHub
parent 5df37892aa
commit ee7e0071d1
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
11 changed files with 880 additions and 46 deletions

View file

@ -149,6 +149,9 @@ func (c *flagConfig) setFeatureListOptions(logger log.Logger) error {
case "exemplar-storage":
c.tsdb.EnableExemplarStorage = true
level.Info(logger).Log("msg", "Experimental in-memory exemplar storage enabled")
case "memory-snapshot-on-shutdown":
c.tsdb.EnableMemorySnapshotOnShutdown = true
level.Info(logger).Log("msg", "Experimental memory snapshot on shutdown enabled")
case "":
continue
default:
@ -309,7 +312,7 @@ func main() {
a.Flag("query.max-samples", "Maximum number of samples a single query can load into memory. Note that queries will fail if they try to load more samples than this into memory, so this also limits the number of samples a query can return.").
Default("50000000").IntVar(&cfg.queryMaxSamples)
a.Flag("enable-feature", "Comma separated feature names to enable. Valid options: promql-at-modifier, promql-negative-offset, remote-write-receiver, exemplar-storage, expand-external-labels. See https://prometheus.io/docs/prometheus/latest/feature_flags/ for more details.").
a.Flag("enable-feature", "Comma separated feature names to enable. Valid options: exemplar-storage, expand-external-labels, memory-snapshot-on-shutdown, promql-at-modifier, promql-negative-offset, remote-write-receiver. See https://prometheus.io/docs/prometheus/latest/feature_flags/ for more details.").
Default("").StringsVar(&cfg.featureList)
promlogflag.AddFlags(a, &cfg.promlogConfig)
@ -1263,34 +1266,36 @@ func (rm *readyScrapeManager) Get() (*scrape.Manager, error) {
// tsdbOptions is tsdb.Option version with defined units.
// This is required as tsdb.Option fields are unit agnostic (time).
type tsdbOptions struct {
WALSegmentSize units.Base2Bytes
MaxBlockChunkSegmentSize units.Base2Bytes
RetentionDuration model.Duration
MaxBytes units.Base2Bytes
NoLockfile bool
AllowOverlappingBlocks bool
WALCompression bool
StripeSize int
MinBlockDuration model.Duration
MaxBlockDuration model.Duration
EnableExemplarStorage bool
MaxExemplars int64
WALSegmentSize units.Base2Bytes
MaxBlockChunkSegmentSize units.Base2Bytes
RetentionDuration model.Duration
MaxBytes units.Base2Bytes
NoLockfile bool
AllowOverlappingBlocks bool
WALCompression bool
StripeSize int
MinBlockDuration model.Duration
MaxBlockDuration model.Duration
EnableExemplarStorage bool
MaxExemplars int64
EnableMemorySnapshotOnShutdown bool
}
func (opts tsdbOptions) ToTSDBOptions() tsdb.Options {
return tsdb.Options{
WALSegmentSize: int(opts.WALSegmentSize),
MaxBlockChunkSegmentSize: int64(opts.MaxBlockChunkSegmentSize),
RetentionDuration: int64(time.Duration(opts.RetentionDuration) / time.Millisecond),
MaxBytes: int64(opts.MaxBytes),
NoLockfile: opts.NoLockfile,
AllowOverlappingBlocks: opts.AllowOverlappingBlocks,
WALCompression: opts.WALCompression,
StripeSize: opts.StripeSize,
MinBlockDuration: int64(time.Duration(opts.MinBlockDuration) / time.Millisecond),
MaxBlockDuration: int64(time.Duration(opts.MaxBlockDuration) / time.Millisecond),
EnableExemplarStorage: opts.EnableExemplarStorage,
MaxExemplars: opts.MaxExemplars,
WALSegmentSize: int(opts.WALSegmentSize),
MaxBlockChunkSegmentSize: int64(opts.MaxBlockChunkSegmentSize),
RetentionDuration: int64(time.Duration(opts.RetentionDuration) / time.Millisecond),
MaxBytes: int64(opts.MaxBytes),
NoLockfile: opts.NoLockfile,
AllowOverlappingBlocks: opts.AllowOverlappingBlocks,
WALCompression: opts.WALCompression,
StripeSize: opts.StripeSize,
MinBlockDuration: int64(time.Duration(opts.MinBlockDuration) / time.Millisecond),
MaxBlockDuration: int64(time.Duration(opts.MaxBlockDuration) / time.Millisecond),
EnableExemplarStorage: opts.EnableExemplarStorage,
MaxExemplars: opts.MaxExemplars,
EnableMemorySnapshotOnShutdown: opts.EnableMemorySnapshotOnShutdown,
}
}

View file

@ -40,7 +40,7 @@ with more recent data.
More details can be found [here](querying/basics.md#offset-modifier).
## Remote Write Receiver
## Remote Write Receiver
`--enable-feature=remote-write-receiver`
@ -53,3 +53,11 @@ The remote write receiver allows Prometheus to accept remote write requests from
[OpenMetrics](https://github.com/OpenObservability/OpenMetrics/blob/main/specification/OpenMetrics.md#exemplars) introduces the ability for scrape targets to add exemplars to certain metrics. Exemplars are references to data outside of the MetricSet. A common use case are IDs of program traces.
Exemplar storage is implemented as a fixed size circular buffer that stores exemplars in memory for all series. Enabling this feature will enable the storage of exemplars scraped by Prometheus. The flag `storage.exemplars.exemplars-limit` can be used to control the size of circular buffer by # of exemplars. An exemplar with just a `traceID=<jaeger-trace-id>` uses roughly 100 bytes of memory via the in-memory exemplar storage. If the exemplar storage is enabled, we will also append the exemplars to WAL for local persistence (for WAL duration).
## Memory Snapshot on Shutdown
`--enable-feature=memory-snapshot-on-shutdown`
This takes the snapshot of the chunks that are in memory along with the series information when shutting down and stores
it on disk. This will reduce the startup time since the memory state can be restored with this snapshot and m-mapped
chunks without the need of WAL replay.

View file

@ -151,6 +151,9 @@ type Options struct {
// Enables the in memory exemplar storage,.
EnableExemplarStorage bool
// Enables the snapshot of in-memory chunks on shutdown. This makes restarts faster.
EnableMemorySnapshotOnShutdown bool
// MaxExemplars sets the size, in # of exemplars stored, of the single circular buffer used to store exemplars in memory.
// See tsdb/exemplar.go, specifically the CircularExemplarStorage struct and it's constructor NewCircularExemplarStorage.
MaxExemplars int64
@ -722,6 +725,7 @@ func open(dir string, l log.Logger, r prometheus.Registerer, opts *Options, rngs
headOpts.SeriesCallback = opts.SeriesLifecycleCallback
headOpts.EnableExemplarStorage = opts.EnableExemplarStorage
headOpts.MaxExemplars.Store(opts.MaxExemplars)
headOpts.EnableMemorySnapshotOnShutdown = opts.EnableMemorySnapshotOnShutdown
db.head, err = NewHead(r, l, wlog, headOpts, stats.Head)
if err != nil {
return nil, err

View file

@ -5,3 +5,4 @@
* [Head Chunks](head_chunks.md)
* [Tombstones](tombstones.md)
* [Wal](wal.md)
* [Memory Snapshot](memory_snapshot.md)

View file

@ -0,0 +1,62 @@
# Memory Snapshot Format
Memory snapshot uses the WAL package and writes each series as a WAL record.
Below are the formats of the individual records.
### Series records
This record is a snapshot of a single series. Only one series exists per record.
It includes the metadata of the series and the in-memory chunk data if it exists.
The sampleBuf is the last 4 samples in the in-memory chunk.
```
┌──────────────────────────┬────────────────────────────┐
│ Record Type <byte> │ Series Ref <uint64>
├──────────────────────────┴────────────────────────────┤
│ Number of Labels <uvarint>
├──────────────────────────────┬────────────────────────┤
│ len(name_1) <uvarint> │ name_1 <bytes>
├──────────────────────────────┼────────────────────────┤
│ len(val_1) <uvarint> │ val_1 <bytes>
├──────────────────────────────┴────────────────────────┤
│ . . . │
├──────────────────────────────┬────────────────────────┤
│ len(name_N) <uvarint> │ name_N <bytes>
├──────────────────────────────┼────────────────────────┤
│ len(val_N) <uvarint> │ val_N <bytes>
├──────────────────────────────┴────────────────────────┤
│ Chunk Range <int64>
├───────────────────────────────────────────────────────┤
│ Chunk Exists <uvarint>
│ # 1 if head chunk exists, 0 otherwise to detect a nil |
| # chunk. Below fields exists only when it's 1 here. |
├───────────────────────────┬───────────────────────────┤
│ Chunk Mint <int64> │ Chunk Maxt <int64>
├───────────────────────────┴───────────────────────────┤
│ Chunk Encoding <byte>
├──────────────────────────────┬────────────────────────┤
│ len(Chunk) <uvarint> │ Chunk <bytes>
├──────────────────────────┬───┴────────────────────────┤
| sampleBuf[0].t <int64> | sampleBuf[0].v <float64> |
├──────────────────────────┼────────────────────────────┤
| sampleBuf[1].t <int64> | sampleBuf[1].v <float64> |
├──────────────────────────┼────────────────────────────┤
| sampleBuf[2].t <int64> | sampleBuf[2].v <float64> |
├──────────────────────────┼────────────────────────────┤
| sampleBuf[3].t <int64> | sampleBuf[3].v <float64> |
└──────────────────────────┴────────────────────────────┘
```
### Tombstone record
This includes all the tombstones in the Head block. A single record is written into
the snapshot for all the tombstones. The encoded tombstones uses the same encoding
as tombstone file in blocks.
```
┌─────────────────────────────────────────────────────────────────┐
│ Record Type <byte>
├───────────────────────────────────┬─────────────────────────────┤
│ len(Encoded Tombstones) <uvarint> │ Encoded Tombstones <bytes>
└───────────────────────────────────┴─────────────────────────────┘
```

View file

@ -17,6 +17,7 @@ import (
"encoding/binary"
"hash"
"hash/crc32"
"math"
"unsafe"
"github.com/dennwc/varint"
@ -40,6 +41,7 @@ func (e *Encbuf) Len() int { return len(e.B) }
func (e *Encbuf) PutString(s string) { e.B = append(e.B, s...) }
func (e *Encbuf) PutByte(c byte) { e.B = append(e.B, c) }
func (e *Encbuf) PutBytes(b []byte) { e.B = append(e.B, b...) }
func (e *Encbuf) PutBE32int(x int) { e.PutBE32(uint32(x)) }
func (e *Encbuf) PutUvarint32(x uint32) { e.PutUvarint64(uint64(x)) }
@ -56,6 +58,10 @@ func (e *Encbuf) PutBE64(x uint64) {
e.B = append(e.B, e.C[:8]...)
}
func (e *Encbuf) PutBEFloat64(x float64) {
e.PutBE64(math.Float64bits(x))
}
func (e *Encbuf) PutUvarint64(x uint64) {
n := binary.PutUvarint(e.C[:], x)
e.B = append(e.B, e.C[:n]...)
@ -73,6 +79,12 @@ func (e *Encbuf) PutUvarintStr(s string) {
e.PutString(s)
}
// PutUvarintBytes writes a a variable length byte buffer.
func (e *Encbuf) PutUvarintBytes(b []byte) {
e.PutUvarint(len(b))
e.PutBytes(b)
}
// PutHash appends a hash over the buffers current contents to the buffer.
func (e *Encbuf) PutHash(h hash.Hash) {
h.Reset()
@ -249,6 +261,10 @@ func (d *Decbuf) Be64() uint64 {
return x
}
func (d *Decbuf) Be64Float64() float64 {
return math.Float64frombits(d.Be64())
}
func (d *Decbuf) Be32() uint32 {
if d.E != nil {
return 0

View file

@ -97,6 +97,8 @@ type Head struct {
// chunkDiskMapper is used to write and read Head chunks to/from disk.
chunkDiskMapper *chunks.ChunkDiskMapper
chunkSnapshotMtx sync.Mutex
closedMtx sync.Mutex
closed bool
@ -122,9 +124,10 @@ type HeadOptions struct {
// StripeSize sets the number of entries in the hash map, it must be a power of 2.
// A larger StripeSize will allocate more memory up-front, but will increase performance when handling a large number of series.
// A smaller StripeSize reduces the memory allocated, but can decrease performance with large number of series.
StripeSize int
SeriesCallback SeriesLifecycleCallback
EnableExemplarStorage bool
StripeSize int
SeriesCallback SeriesLifecycleCallback
EnableExemplarStorage bool
EnableMemorySnapshotOnShutdown bool
// Runtime reloadable options.
MaxExemplars atomic.Int64
@ -439,11 +442,25 @@ func (h *Head) Init(minValidTime int64) error {
h.minValidTime.Store(minValidTime)
defer h.postings.EnsureOrder()
defer h.gc() // After loading the wal remove the obsolete data from the head.
defer func() {
// Loading of m-mapped chunks and snapshot can make the mint of the Head
// to go below minValidTime.
if h.MinTime() < h.minValidTime.Load() {
h.minTime.Store(h.minValidTime.Load())
}
}()
level.Info(h.logger).Log("msg", "Replaying on-disk memory mappable chunks if any")
start := time.Now()
mmappedChunks, err := h.loadMmappedChunks()
snapIdx, snapOffset, refSeries, err := h.loadChunkSnapshot()
if err != nil {
return err
}
level.Info(h.logger).Log("msg", "Chunk snapshot loading time", "duration", time.Since(start).String())
mmapChunkReplayStart := time.Now()
mmappedChunks, err := h.loadMmappedChunks(refSeries)
if err != nil {
level.Error(h.logger).Log("msg", "Loading on-disk chunks failed", "err", err)
if _, ok := errors.Cause(err).(*chunks.CorruptionErr); ok {
@ -451,10 +468,10 @@ func (h *Head) Init(minValidTime int64) error {
}
// If this fails, data will be recovered from WAL.
// Hence we wont lose any data (given WAL is not corrupt).
mmappedChunks = h.removeCorruptedMmappedChunks(err)
mmappedChunks = h.removeCorruptedMmappedChunks(err, refSeries)
}
level.Info(h.logger).Log("msg", "On-disk memory mappable chunks replay completed", "duration", time.Since(start).String())
level.Info(h.logger).Log("msg", "On-disk memory mappable chunks replay completed", "duration", time.Since(mmapChunkReplayStart).String())
if h.wal == nil {
level.Info(h.logger).Log("msg", "WAL not found")
return nil
@ -502,6 +519,9 @@ func (h *Head) Init(minValidTime int64) error {
walReplayStart := time.Now()
if snapIdx > startFrom {
startFrom = snapIdx
}
// Backfill segments from the most recent checkpoint onwards.
for i := startFrom; i <= endAt; i++ {
s, err := wal.OpenReadSegment(wal.SegmentName(h.wal.Dir(), i))
@ -509,7 +529,14 @@ func (h *Head) Init(minValidTime int64) error {
return errors.Wrap(err, fmt.Sprintf("open WAL segment: %d", i))
}
sr := wal.NewSegmentBufReader(s)
offset := 0
if i == snapIdx {
offset = snapOffset
}
sr, err := wal.NewSegmentBufReaderWithOffset(offset, s)
if err != nil {
return errors.Wrapf(err, "segment reader (offset=%d)", offset)
}
err = h.loadWAL(wal.NewReader(sr), multiRef, mmappedChunks)
if err := sr.Close(); err != nil {
level.Warn(h.logger).Log("msg", "Error while closing the wal segments reader", "err", err)
@ -533,29 +560,49 @@ func (h *Head) Init(minValidTime int64) error {
return nil
}
func (h *Head) loadMmappedChunks() (map[uint64][]*mmappedChunk, error) {
func (h *Head) loadMmappedChunks(refSeries map[uint64]*memSeries) (map[uint64][]*mmappedChunk, error) {
mmappedChunks := map[uint64][]*mmappedChunk{}
if err := h.chunkDiskMapper.IterateAllChunks(func(seriesRef, chunkRef uint64, mint, maxt int64, numSamples uint16) error {
if maxt < h.minValidTime.Load() {
return nil
}
slice := mmappedChunks[seriesRef]
if len(slice) > 0 {
if slice[len(slice)-1].maxTime >= mint {
return &chunks.CorruptionErr{
Err: errors.Errorf("out of sequence m-mapped chunk for series ref %d", seriesRef),
}
ms, ok := refSeries[seriesRef]
if !ok {
slice := mmappedChunks[seriesRef]
if len(slice) > 0 && slice[len(slice)-1].maxTime >= mint {
return errors.Errorf("out of sequence m-mapped chunk for series ref %d", seriesRef)
}
slice = append(slice, &mmappedChunk{
ref: chunkRef,
minTime: mint,
maxTime: maxt,
numSamples: numSamples,
})
mmappedChunks[seriesRef] = slice
return nil
}
slice = append(slice, &mmappedChunk{
if len(ms.mmappedChunks) > 0 && ms.mmappedChunks[len(ms.mmappedChunks)-1].maxTime >= mint {
return errors.Errorf("out of sequence m-mapped chunk for series ref %d", seriesRef)
}
h.metrics.chunks.Inc()
h.metrics.chunksCreated.Inc()
ms.mmappedChunks = append(ms.mmappedChunks, &mmappedChunk{
ref: chunkRef,
minTime: mint,
maxTime: maxt,
numSamples: numSamples,
})
mmappedChunks[seriesRef] = slice
h.updateMinMaxTime(mint, maxt)
if ms.headChunk != nil && maxt >= ms.headChunk.minTime {
// The head chunk was completed and was m-mapped after taking the snapshot.
// Hence remove this chunk.
ms.nextAt = 0
ms.headChunk = nil
ms.app = nil
}
return nil
}); err != nil {
return nil, errors.Wrap(err, "iterate on on-disk chunks")
@ -565,7 +612,7 @@ func (h *Head) loadMmappedChunks() (map[uint64][]*mmappedChunk, error) {
// removeCorruptedMmappedChunks attempts to delete the corrupted mmapped chunks and if it fails, it clears all the previously
// loaded mmapped chunks.
func (h *Head) removeCorruptedMmappedChunks(err error) map[uint64][]*mmappedChunk {
func (h *Head) removeCorruptedMmappedChunks(err error, refSeries map[uint64]*memSeries) map[uint64][]*mmappedChunk {
level.Info(h.logger).Log("msg", "Deleting mmapped chunk files")
if err := h.chunkDiskMapper.DeleteCorrupted(err); err != nil {
@ -574,7 +621,7 @@ func (h *Head) removeCorruptedMmappedChunks(err error) map[uint64][]*mmappedChun
}
level.Info(h.logger).Log("msg", "Deletion of mmap chunk files successful, reattempting m-mapping the on-disk chunks")
mmappedChunks, err := h.loadMmappedChunks()
mmappedChunks, err := h.loadMmappedChunks(refSeries)
if err != nil {
level.Error(h.logger).Log("msg", "Loading on-disk chunks failed, discarding chunk files completely", "err", err)
mmappedChunks = map[uint64][]*mmappedChunk{}
@ -661,6 +708,9 @@ func (h *Head) Truncate(mint int64) (err error) {
// truncateMemory removes old data before mint from the head.
func (h *Head) truncateMemory(mint int64) (err error) {
h.chunkSnapshotMtx.Lock()
defer h.chunkSnapshotMtx.Unlock()
defer func() {
if err != nil {
h.metrics.headTruncateFail.Inc()
@ -796,6 +846,9 @@ func (h *Head) IsQuerierCollidingWithTruncation(querierMint, querierMaxt int64)
// truncateWAL removes old data before mint from the WAL.
func (h *Head) truncateWAL(mint int64) error {
h.chunkSnapshotMtx.Lock()
defer h.chunkSnapshotMtx.Unlock()
if h.wal == nil || mint <= h.lastWALTruncationTime.Load() {
return nil
}
@ -1095,15 +1148,20 @@ func (h *Head) compactable() bool {
}
// Close flushes the WAL and closes the head.
// It also takes a snapshot of in-memory chunks if enabled.
func (h *Head) Close() error {
h.closedMtx.Lock()
defer h.closedMtx.Unlock()
h.closed = true
errs := tsdb_errors.NewMulti(h.chunkDiskMapper.Close())
if errs.Err() == nil && h.opts.EnableMemorySnapshotOnShutdown {
errs.Add(h.performChunkSnapshot())
}
if h.wal != nil {
errs.Add(h.wal.Close())
}
return errs.Err()
}
// String returns an human readable representation of the TSDB head. It's important to

View file

@ -468,12 +468,20 @@ func (s *memSeries) append(t int64, v float64, appendID uint64, chunkDiskMapper
c = s.cutNewHeadChunk(t, chunkDiskMapper)
chunkCreated = true
}
numSamples := c.chunk.NumSamples()
// Out of order sample.
if c.maxTime >= t {
return false, chunkCreated
}
numSamples := c.chunk.NumSamples()
if numSamples == 0 {
// It could be the new chunk created after reading the chunk snapshot,
// hence we fix the minTime of the chunk here.
c.minTime = t
s.nextAt = rangeForTimestamp(c.minTime, s.chunkRange)
}
// If we reach 25% of a chunk's desired sample count, predict an end time
// for this chunk that will try to make samples equally distributed within
// the remaining chunks in the current chunk range.

View file

@ -2465,3 +2465,163 @@ func TestWaitForPendingReadersInTimeRange(t *testing.T) {
})
}
}
func TestChunkSnapshot(t *testing.T) {
head, _ := newTestHead(t, 120*4, false)
defer func() {
head.opts.EnableMemorySnapshotOnShutdown = false
require.NoError(t, head.Close())
}()
numSeries := 10
expSeries := make(map[string][]tsdbutil.Sample)
expTombstones := make(map[uint64]tombstones.Intervals)
{ // Initial data that goes into snapshot.
// Add some initial samples with >=1 m-map chunk.
app := head.Appender(context.Background())
for i := 1; i <= numSeries; i++ {
lbls := labels.Labels{labels.Label{Name: "foo", Value: fmt.Sprintf("bar%d", i)}}
lblStr := lbls.String()
// 240 samples should m-map at least 1 chunk.
for ts := int64(1); ts <= 240; ts++ {
val := rand.Float64()
expSeries[lblStr] = append(expSeries[lblStr], sample{ts, val})
_, err := app.Append(0, lbls, ts, val)
require.NoError(t, err)
}
}
require.NoError(t, app.Commit())
// Add some tombstones.
var enc record.Encoder
for i := 1; i <= numSeries; i++ {
ref := uint64(i)
itvs := tombstones.Intervals{
{Mint: 1234, Maxt: 2345},
{Mint: 3456, Maxt: 4567},
}
for _, itv := range itvs {
expTombstones[ref].Add(itv)
}
head.tombstones.AddInterval(ref, itvs...)
err := head.wal.Log(enc.Tombstones([]tombstones.Stone{
{Ref: ref, Intervals: itvs},
}, nil))
require.NoError(t, err)
}
}
// These references should be the ones used for the snapshot.
wlast, woffset, err := head.wal.LastSegmentAndOffset()
require.NoError(t, err)
{ // Creating snapshot and verifying it.
head.opts.EnableMemorySnapshotOnShutdown = true
require.NoError(t, head.Close()) // This will create a snapshot.
_, sidx, soffset, err := LastChunkSnapshot(head.opts.ChunkDirRoot)
require.NoError(t, err)
require.Equal(t, wlast, sidx)
require.Equal(t, woffset, soffset)
}
{ // Test the replay of snapshot.
// Create new Head which should replay this snapshot.
w, err := wal.NewSize(nil, nil, head.wal.Dir(), 32768, false)
require.NoError(t, err)
head, err = NewHead(nil, nil, w, head.opts, nil)
require.NoError(t, err)
require.NoError(t, head.Init(math.MinInt64))
// Test query for snapshot replay.
q, err := NewBlockQuerier(head, math.MinInt64, math.MaxInt64)
require.NoError(t, err)
series := query(t, q, labels.MustNewMatcher(labels.MatchRegexp, "foo", ".*"))
require.Equal(t, expSeries, series)
// Check the tombstones.
tr, err := head.Tombstones()
require.NoError(t, err)
actTombstones := make(map[uint64]tombstones.Intervals)
require.NoError(t, tr.Iter(func(ref uint64, itvs tombstones.Intervals) error {
for _, itv := range itvs {
actTombstones[ref].Add(itv)
}
return nil
}))
require.Equal(t, expTombstones, actTombstones)
}
{ // Additional data to only include in WAL and m-mapped chunks and not snapshot. This mimics having an old snapshot on disk.
// Add more samples.
app := head.Appender(context.Background())
for i := 1; i <= numSeries; i++ {
lbls := labels.Labels{labels.Label{Name: "foo", Value: fmt.Sprintf("bar%d", i)}}
lblStr := lbls.String()
// 240 samples should m-map at least 1 chunk.
for ts := int64(241); ts <= 480; ts++ {
val := rand.Float64()
expSeries[lblStr] = append(expSeries[lblStr], sample{ts, val})
_, err := app.Append(0, lbls, ts, val)
require.NoError(t, err)
}
}
require.NoError(t, app.Commit())
// Add more tombstones.
var enc record.Encoder
for i := 1; i <= numSeries; i++ {
ref := uint64(i)
itvs := tombstones.Intervals{
{Mint: 12345, Maxt: 23456},
{Mint: 34567, Maxt: 45678},
}
for _, itv := range itvs {
expTombstones[ref].Add(itv)
}
head.tombstones.AddInterval(ref, itvs...)
err := head.wal.Log(enc.Tombstones([]tombstones.Stone{
{Ref: ref, Intervals: itvs},
}, nil))
require.NoError(t, err)
}
}
{ // Close Head and verify that new snapshot was not created.
head.opts.EnableMemorySnapshotOnShutdown = false
require.NoError(t, head.Close()) // This should not create a snapshot.
_, sidx, soffset, err := LastChunkSnapshot(head.opts.ChunkDirRoot)
require.NoError(t, err)
require.Equal(t, wlast, sidx)
require.Equal(t, woffset, soffset)
}
{ // Test the replay of snapshot, m-map chunks, and WAL.
// Create new Head to replay snapshot, m-map chunks, and WAL.
w, err := wal.NewSize(nil, nil, head.wal.Dir(), 32768, false)
require.NoError(t, err)
head, err = NewHead(nil, nil, w, head.opts, nil)
require.NoError(t, err)
require.NoError(t, head.Init(math.MinInt64))
// Test query when data is replayed from snapshot, m-map chunks, and WAL.
q, err := NewBlockQuerier(head, math.MinInt64, math.MaxInt64)
require.NoError(t, err)
series := query(t, q, labels.MustNewMatcher(labels.MatchRegexp, "foo", ".*"))
require.Equal(t, expSeries, series)
// Check the tombstones.
tr, err := head.Tombstones()
require.NoError(t, err)
actTombstones := make(map[uint64]tombstones.Intervals)
require.NoError(t, tr.Iter(func(ref uint64, itvs tombstones.Intervals) error {
for _, itv := range itvs {
actTombstones[ref].Add(itv)
}
return nil
}))
require.Equal(t, expTombstones, actTombstones)
}
}

View file

@ -15,8 +15,18 @@ package tsdb
import (
"fmt"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/tsdb/chunkenc"
"github.com/prometheus/prometheus/tsdb/encoding"
tsdb_errors "github.com/prometheus/prometheus/tsdb/errors"
"github.com/prometheus/prometheus/tsdb/fileutil"
"io/ioutil"
"math"
"os"
"path/filepath"
"runtime"
"strconv"
"strings"
"sync"
"time"
@ -386,3 +396,474 @@ func (h *Head) processWALSamples(
return unknownRefs
}
const (
chunkSnapshotRecordTypeSeries uint8 = 1
chunkSnapshotRecordTypeTombstones uint8 = 2
)
type chunkSnapshotRecord struct {
ref uint64
lset labels.Labels
chunkRange int64
mc *memChunk
sampleBuf [4]sample
}
func (s *memSeries) encodeToSnapshotRecord(b []byte) []byte {
buf := encoding.Encbuf{B: b}
buf.PutByte(chunkSnapshotRecordTypeSeries)
buf.PutBE64(s.ref)
buf.PutUvarint(len(s.lset))
for _, l := range s.lset {
buf.PutUvarintStr(l.Name)
buf.PutUvarintStr(l.Value)
}
buf.PutBE64int64(s.chunkRange)
s.Lock()
if s.headChunk == nil {
buf.PutUvarint(0)
} else {
buf.PutUvarint(1)
buf.PutBE64int64(s.headChunk.minTime)
buf.PutBE64int64(s.headChunk.maxTime)
buf.PutByte(byte(s.headChunk.chunk.Encoding()))
buf.PutUvarintBytes(s.headChunk.chunk.Bytes())
// Put the sample buf.
for _, smpl := range s.sampleBuf {
buf.PutBE64int64(smpl.t)
buf.PutBEFloat64(smpl.v)
}
}
s.Unlock()
return buf.Get()
}
func decodeSeriesFromChunkSnapshot(b []byte) (csr chunkSnapshotRecord, err error) {
dec := encoding.Decbuf{B: b}
if flag := dec.Byte(); flag != chunkSnapshotRecordTypeSeries {
return csr, errors.Errorf("invalid record type %x", flag)
}
csr.ref = dec.Be64()
// The label set written to the disk is already sorted.
csr.lset = make(labels.Labels, dec.Uvarint())
for i := range csr.lset {
csr.lset[i].Name = dec.UvarintStr()
csr.lset[i].Value = dec.UvarintStr()
}
csr.chunkRange = dec.Be64int64()
if dec.Uvarint() == 0 {
return
}
csr.mc = &memChunk{}
csr.mc.minTime = dec.Be64int64()
csr.mc.maxTime = dec.Be64int64()
enc := chunkenc.Encoding(dec.Byte())
// The underlying bytes gets re-used later, so make a copy.
chunkBytes := dec.UvarintBytes()
chunkBytesCopy := make([]byte, len(chunkBytes))
copy(chunkBytesCopy, chunkBytes)
chk, err := chunkenc.FromData(enc, chunkBytesCopy)
if err != nil {
return csr, errors.Wrap(err, "chunk from data")
}
csr.mc.chunk = chk
for i := range csr.sampleBuf {
csr.sampleBuf[i].t = dec.Be64int64()
csr.sampleBuf[i].v = dec.Be64Float64()
}
err = dec.Err()
if err != nil && len(dec.B) > 0 {
err = errors.Errorf("unexpected %d bytes left in entry", len(dec.B))
}
return
}
func encodeTombstonesToSnapshotRecord(tr tombstones.Reader) ([]byte, error) {
buf := encoding.Encbuf{}
buf.PutByte(chunkSnapshotRecordTypeTombstones)
b, err := tombstones.Encode(tr)
if err != nil {
return nil, errors.Wrap(err, "encode tombstones")
}
buf.PutUvarintBytes(b)
return buf.Get(), nil
}
func decodeTombstonesSnapshotRecord(b []byte) (tombstones.Reader, error) {
dec := encoding.Decbuf{B: b}
if flag := dec.Byte(); flag != chunkSnapshotRecordTypeTombstones {
return nil, errors.Errorf("invalid record type %x", flag)
}
tr, err := tombstones.Decode(dec.UvarintBytes())
return tr, errors.Wrap(err, "decode tombstones")
}
const chunkSnapshotPrefix = "chunk_snapshot."
// ChunkSnapshot creates a snapshot of all the series and tombstones in the head.
// It deletes the old chunk snapshots if the chunk snapshot creation is successful.
//
// The chunk snapshot is stored in a directory named chunk_snapshot.N.M and is written
// using the WAL package. N is the last WAL segment present during snapshotting and
// M is the offset in segment N upto which data was written.
func (h *Head) ChunkSnapshot() (*ChunkSnapshotStats, error) {
if h.wal == nil {
// If we are not storing any WAL, does not make sense to take a snapshot too.
level.Warn(h.logger).Log("msg", "skipping chunk snapshotting as WAL is disabled")
return &ChunkSnapshotStats{}, nil
}
h.chunkSnapshotMtx.Lock()
defer h.chunkSnapshotMtx.Unlock()
stats := &ChunkSnapshotStats{}
wlast, woffset, err := h.wal.LastSegmentAndOffset()
if err != nil && err != record.ErrNotFound {
return stats, errors.Wrap(err, "get last wal segment and offset")
}
_, cslast, csoffset, err := LastChunkSnapshot(h.opts.ChunkDirRoot)
if err != nil && err != record.ErrNotFound {
return stats, errors.Wrap(err, "find last chunk snapshot")
}
if wlast == cslast && woffset == csoffset {
// Nothing has been written to the WAL/Head since the last snapshot.
return stats, nil
}
snapshotName := fmt.Sprintf(chunkSnapshotPrefix+"%06d.%010d", wlast, woffset)
cpdir := filepath.Join(h.opts.ChunkDirRoot, snapshotName)
cpdirtmp := cpdir + ".tmp"
stats.Dir = cpdir
if err := os.MkdirAll(cpdirtmp, 0777); err != nil {
return stats, errors.Wrap(err, "create chunk snapshot dir")
}
cp, err := wal.New(nil, nil, cpdirtmp, h.wal.CompressionEnabled())
if err != nil {
return stats, errors.Wrap(err, "open chunk snapshot")
}
// Ensures that an early return caused by an error doesn't leave any tmp files.
defer func() {
cp.Close()
os.RemoveAll(cpdirtmp)
}()
var (
buf []byte
recs [][]byte
)
stripeSize := h.series.size
for i := 0; i < stripeSize; i++ {
h.series.locks[i].RLock()
for _, s := range h.series.series[i] {
start := len(buf)
buf = s.encodeToSnapshotRecord(buf)
if len(buf[start:]) == 0 {
continue // All contents discarded.
}
recs = append(recs, buf[start:])
// Flush records in 10 MB increments.
if len(buf) > 10*1024*1024 {
if err := cp.Log(recs...); err != nil {
h.series.locks[i].RUnlock()
return stats, errors.Wrap(err, "flush records")
}
buf, recs = buf[:0], recs[:0]
}
}
stats.TotalSeries += len(h.series.series[i])
h.series.locks[i].RUnlock()
}
// Add tombstones to the snapshot.
tombstonesReader, err := h.Tombstones()
if err != nil {
return stats, errors.Wrap(err, "get tombstones")
}
rec, err := encodeTombstonesToSnapshotRecord(tombstonesReader)
if err != nil {
return stats, errors.Wrap(err, "encode tombstones")
}
recs = append(recs, rec)
// Flush remaining records.
if err := cp.Log(recs...); err != nil {
return stats, errors.Wrap(err, "flush records")
}
if err := cp.Close(); err != nil {
return stats, errors.Wrap(err, "close chunk snapshot")
}
if err := fileutil.Replace(cpdirtmp, cpdir); err != nil {
return stats, errors.Wrap(err, "rename chunk snapshot directory")
}
if err := DeleteChunkSnapshots(h.opts.ChunkDirRoot, cslast, csoffset); err != nil {
// Leftover old chunk snapshots do not cause problems down the line beyond
// occupying disk space.
// They will just be ignored since a higher chunk snapshot exists.
level.Error(h.logger).Log("msg", "delete old chunk snapshots", "err", err)
}
return stats, nil
}
func (h *Head) performChunkSnapshot() error {
level.Info(h.logger).Log("msg", "creating chunk snapshot")
startTime := time.Now()
stats, err := h.ChunkSnapshot()
elapsed := time.Since(startTime)
if err == nil {
level.Info(h.logger).Log("msg", "chunk snapshot complete", "duration", elapsed.String(), "num_series", stats.TotalSeries, "dir", stats.Dir)
}
return errors.Wrap(err, "chunk snapshot")
}
// ChunkSnapshotStats returns stats about a created chunk snapshot.
type ChunkSnapshotStats struct {
TotalSeries int
Dir string
}
// LastChunkSnapshot returns the directory name and index of the most recent chunk snapshot.
// If dir does not contain any chunk snapshots, ErrNotFound is returned.
func LastChunkSnapshot(dir string) (string, int, int, error) {
files, err := ioutil.ReadDir(dir)
if err != nil {
return "", 0, 0, err
}
// Traverse list backwards since there may be multiple chunk snapshots left.
for i := len(files) - 1; i >= 0; i-- {
fi := files[i]
if !strings.HasPrefix(fi.Name(), chunkSnapshotPrefix) {
continue
}
if !fi.IsDir() {
return "", 0, 0, errors.Errorf("chunk snapshot %s is not a directory", fi.Name())
}
splits := strings.Split(fi.Name()[len(chunkSnapshotPrefix):], ".")
if len(splits) != 2 {
return "", 0, 0, errors.Errorf("chunk snapshot %s is not in the right format", fi.Name())
}
idx, err := strconv.Atoi(splits[0])
if err != nil {
continue
}
offset, err := strconv.Atoi(splits[1])
if err != nil {
continue
}
return filepath.Join(dir, fi.Name()), idx, offset, nil
}
return "", 0, 0, record.ErrNotFound
}
// DeleteChunkSnapshots deletes all chunk snapshots in a directory below a given index.
func DeleteChunkSnapshots(dir string, maxIndex, maxOffset int) error {
files, err := ioutil.ReadDir(dir)
if err != nil {
return err
}
errs := tsdb_errors.NewMulti()
for _, fi := range files {
if !strings.HasPrefix(fi.Name(), chunkSnapshotPrefix) {
continue
}
splits := strings.Split(fi.Name()[len(chunkSnapshotPrefix):], ".")
if len(splits) != 2 {
continue
}
idx, err := strconv.Atoi(splits[0])
if err != nil {
continue
}
offset, err := strconv.Atoi(splits[1])
if err != nil {
continue
}
if idx <= maxIndex && offset < maxOffset {
if err := os.RemoveAll(filepath.Join(dir, fi.Name())); err != nil {
errs.Add(err)
}
}
}
return errs.Err()
}
func (h *Head) loadChunkSnapshot() (int, int, map[uint64]*memSeries, error) {
dir, snapIdx, snapOffset, err := LastChunkSnapshot(h.opts.ChunkDirRoot)
if err != nil {
if err == record.ErrNotFound {
return snapIdx, snapOffset, nil, nil
}
return snapIdx, snapOffset, nil, errors.Wrap(err, "find last chunk snapshot")
}
start := time.Now()
sr, err := wal.NewSegmentsReader(dir)
if err != nil {
return snapIdx, snapOffset, nil, errors.Wrap(err, "open chunk snapshot")
}
defer func() {
if err := sr.Close(); err != nil {
level.Warn(h.logger).Log("msg", "error while closing the wal segments reader", "err", err)
}
}()
var (
numSeries = 0
unknownRefs = int64(0)
n = runtime.GOMAXPROCS(0)
wg sync.WaitGroup
recordChan = make(chan chunkSnapshotRecord, 5*n)
shardedRefSeries = make([]map[uint64]*memSeries, n)
errChan = make(chan error, n)
)
wg.Add(n)
for i := 0; i < n; i++ {
go func(idx int, rc <-chan chunkSnapshotRecord) {
defer wg.Done()
defer func() {
// If there was an error, drain the channel
// to unblock the main thread.
for range rc {
}
}()
shardedRefSeries[idx] = make(map[uint64]*memSeries)
localRefSeries := shardedRefSeries[idx]
for csr := range rc {
series, _, err := h.getOrCreateWithID(csr.ref, csr.lset.Hash(), csr.lset)
if err != nil {
errChan <- err
return
}
localRefSeries[csr.ref] = series
if h.lastSeriesID.Load() < series.ref {
h.lastSeriesID.Store(series.ref)
}
series.chunkRange = csr.chunkRange
if csr.mc == nil {
continue
}
series.nextAt = csr.mc.maxTime // This will create a new chunk on append.
series.headChunk = csr.mc
for i := range series.sampleBuf {
series.sampleBuf[i].t = csr.sampleBuf[i].t
series.sampleBuf[i].v = csr.sampleBuf[i].v
}
app, err := series.headChunk.chunk.Appender()
if err != nil {
errChan <- err
return
}
series.app = app
h.updateMinMaxTime(csr.mc.minTime, csr.mc.maxTime)
}
}(i, recordChan)
}
r := wal.NewReader(sr)
var loopErr error
Outer:
for r.Next() {
select {
case err := <-errChan:
errChan <- err
break Outer
default:
}
rec := r.Record()
switch rec[0] {
case chunkSnapshotRecordTypeSeries:
numSeries++
csr, err := decodeSeriesFromChunkSnapshot(rec)
if err != nil {
loopErr = errors.Wrap(err, "decode series record")
break Outer
}
recordChan <- csr
case chunkSnapshotRecordTypeTombstones:
tr, err := decodeTombstonesSnapshotRecord(rec)
if err != nil {
loopErr = errors.Wrap(err, "decode tombstones")
break Outer
}
if err = tr.Iter(func(ref uint64, ivs tombstones.Intervals) error {
h.tombstones.AddInterval(ref, ivs...)
return nil
}); err != nil {
loopErr = errors.Wrap(err, "iterate tombstones")
break Outer
}
}
}
close(recordChan)
wg.Wait()
close(errChan)
merr := tsdb_errors.NewMulti(errors.Wrap(loopErr, "decode loop"))
for err := range errChan {
merr.Add(errors.Wrap(err, "record processing"))
}
if err := merr.Err(); err != nil {
return -1, -1, nil, err
}
refSeries := make(map[uint64]*memSeries, numSeries)
for _, shard := range shardedRefSeries {
for k, v := range shard {
refSeries[k] = v
}
}
elapsed := time.Since(start)
level.Info(h.logger).Log("msg", "chunk snapshot loaded", "dir", dir, "num_series", numSeries, "duration", elapsed.String())
if unknownRefs > 0 {
level.Warn(h.logger).Log("msg", "unknown series references during chunk snapshot replay", "count", unknownRefs)
}
return snapIdx, snapOffset, refSeries, nil
}

View file

@ -699,6 +699,22 @@ func (w *WAL) log(rec []byte, final bool) error {
return nil
}
// LastSegmentAndOffset returns the last segment number of the WAL
// and the offset in that file upto which the segment has been filled.
func (w *WAL) LastSegmentAndOffset() (seg, offset int, err error) {
w.mtx.Lock()
defer w.mtx.Unlock()
_, seg, err = Segments(w.Dir())
if err != nil {
return
}
offset = (w.donePages * pageSize) + w.page.alloc
return
}
// Truncate drops all segments before i.
func (w *WAL) Truncate(i int) (err error) {
w.metrics.truncateTotal.Inc()
@ -867,6 +883,21 @@ func NewSegmentBufReader(segs ...*Segment) *segmentBufReader {
}
}
// nolint:golint
func NewSegmentBufReaderWithOffset(offset int, segs ...*Segment) (sbr *segmentBufReader, err error) {
if offset == 0 {
return NewSegmentBufReader(segs...), nil
}
sbr = &segmentBufReader{
buf: bufio.NewReaderSize(segs[0], 16*pageSize),
segs: segs,
}
if offset > 0 {
_, err = sbr.buf.Discard(offset)
}
return sbr, err
}
func (r *segmentBufReader) Close() (err error) {
for _, s := range r.segs {
if e := s.Close(); e != nil {