diff --git a/tsdb/docs/format/memory_snapshot.md b/tsdb/docs/format/memory_snapshot.md index b6c53cd8d..315080545 100644 --- a/tsdb/docs/format/memory_snapshot.md +++ b/tsdb/docs/format/memory_snapshot.md @@ -3,6 +3,11 @@ Memory snapshot uses the WAL package and writes each series as a WAL record. Below are the formats of the individual records. +The order of records in the snapshot is always: +1. Starts with series records, one per series, in an unsorted fashion. +2. After all series are done, we write a tombstone record containing all the tombstones. +3. At the end, we write one or more exemplar records while batching up the exemplars in each record. Exemplars are in the order they were written to the circular buffer. + ### Series records This record is a snapshot of a single series. Only one series exists per record. @@ -60,3 +65,30 @@ as tombstone file in blocks. │ len(Encoded Tombstones) │ Encoded Tombstones │ └───────────────────────────────────┴─────────────────────────────┘ ``` + + +### Exemplar record + +A single exemplar record contains one or more exemplars, encoded in the same way as in the WAL but with a different record type. + +``` +┌───────────────────────────────────────────────────────────────────┐ +│ Record Type │ +├───────────────────────────────────────────────────────────────────┤ +│ ┌────────────────────┬───────────────────────────┐ │ +│ │ series ref <8b> │ timestamp <8b> │ │ +│ └────────────────────┴───────────────────────────┘ │ +│ ┌─────────────────────┬───────────────────────────┬─────────────┐ │ +│ │ ref_delta │ timestamp_delta │ value <8b> │ │ +│ ├─────────────────────┴───────────────────────────┴─────────────┤ │ +│ │ n = len(labels) │ │ +│ ├───────────────────────────────┬───────────────────────────────┤ │ +│ │ len(str_1) │ str_1 │ │ +│ ├───────────────────────────────┴───────────────────────────────┤ │ +│ │ ... 
│ │ +│ ├───────────────────────────────┬───────────────────────────────┤ │ +│ │ len(str_2n) │ str_2n │ │ +│ ├───────────────────────────────┴───────────────────────────────┤ │ +│ . . . │ +└───────────────────────────────────────────────────────────────────┘ +``` diff --git a/tsdb/exemplar.go b/tsdb/exemplar.go index 3aa9607bb..c7cefaba0 100644 --- a/tsdb/exemplar.go +++ b/tsdb/exemplar.go @@ -400,3 +400,23 @@ func (ce *CircularExemplarStorage) computeMetrics() { ce.metrics.lastExemplarsTs.Set(float64(ce.exemplars[0].exemplar.Ts) / 1000) } } + +// IterateExemplars iterates through all the exemplars from oldest to newest appended and calls +// the given function on all of them till the end (or) till the first function call that returns an error. +func (ce *CircularExemplarStorage) IterateExemplars(f func(seriesLabels labels.Labels, e exemplar.Exemplar) error) error { + ce.lock.RLock() + defer ce.lock.RUnlock() + + idx := ce.nextIndex + l := len(ce.exemplars) + for i := 0; i < l; i, idx = i+1, (idx+1)%l { + if ce.exemplars[idx] == nil { + continue + } + err := f(ce.exemplars[idx].ref.seriesLabels, ce.exemplars[idx].exemplar) + if err != nil { + return err + } + } + return nil +} diff --git a/tsdb/exemplar_test.go b/tsdb/exemplar_test.go index c81b3ffce..8fcaf6d2e 100644 --- a/tsdb/exemplar_test.go +++ b/tsdb/exemplar_test.go @@ -477,16 +477,27 @@ func TestResize(t *testing.T) { } } -func BenchmarkAddExemplar(t *testing.B) { - exs, err := NewCircularExemplarStorage(int64(t.N), eMetrics) - require.NoError(t, err) - es := exs.(*CircularExemplarStorage) +func BenchmarkAddExemplar(b *testing.B) { + // We need to include these labels since we do length calculation + // before adding. 
+ exLabels := labels.Labels{{Name: "traceID", Value: "89620921"}} - for i := 0; i < t.N; i++ { - l := labels.FromStrings("service", strconv.Itoa(i)) + for _, n := range []int{10000, 100000, 1000000} { + b.Run(fmt.Sprintf("%d", n), func(b *testing.B) { + exs, err := NewCircularExemplarStorage(int64(n), eMetrics) + require.NoError(b, err) + es := exs.(*CircularExemplarStorage) - err = es.AddExemplar(l, exemplar.Exemplar{Value: float64(i), Ts: int64(i)}) - require.NoError(t, err) + b.ResetTimer() + l := labels.Labels{{Name: "service", Value: strconv.Itoa(0)}} + for i := 0; i < n; i++ { + if i%100 == 0 { + l = labels.Labels{{Name: "service", Value: strconv.Itoa(i)}} + } + err = es.AddExemplar(l, exemplar.Exemplar{Value: float64(i), Ts: int64(i), Labels: exLabels}) + require.NoError(b, err) + } + }) } } diff --git a/tsdb/head.go b/tsdb/head.go index 8872e0c79..4bbf139cc 100644 --- a/tsdb/head.go +++ b/tsdb/head.go @@ -113,6 +113,7 @@ type ExemplarStorage interface { storage.ExemplarQueryable AddExemplar(labels.Labels, exemplar.Exemplar) error ValidateExemplar(labels.Labels, exemplar.Exemplar) error + IterateExemplars(f func(seriesLabels labels.Labels, e exemplar.Exemplar) error) error } // HeadOptions are parameters for the Head block. @@ -454,7 +455,7 @@ const cardinalityCacheExpirationTime = time.Duration(30) * time.Second // Init loads data from the write ahead log and prepares the head for writes. // It should be called before using an appender so that it // limits the ingested samples to the head min valid time. -func (h *Head) Init(minValidTime int64) (err error) { +func (h *Head) Init(minValidTime int64) error { h.minValidTime.Store(minValidTime) defer h.postings.EnsureOrder() defer h.gc() // After loading the wal remove the obsolete data from the head. 
@@ -474,6 +475,7 @@ func (h *Head) Init(minValidTime int64) (err error) { if h.opts.EnableMemorySnapshotOnShutdown { level.Info(h.logger).Log("msg", "Chunk snapshot is enabled, replaying from the snapshot") + var err error snapIdx, snapOffset, refSeries, err = h.loadChunkSnapshot() if err != nil { snapIdx, snapOffset = -1, 0 diff --git a/tsdb/head_test.go b/tsdb/head_test.go index dfaf46844..eaaac4166 100644 --- a/tsdb/head_test.go +++ b/tsdb/head_test.go @@ -2496,9 +2496,62 @@ func TestChunkSnapshot(t *testing.T) { require.NoError(t, head.Close()) }() + type ex struct { + seriesLabels labels.Labels + e exemplar.Exemplar + } + numSeries := 10 expSeries := make(map[string][]tsdbutil.Sample) expTombstones := make(map[uint64]tombstones.Intervals) + expExemplars := make([]ex, 0) + + addExemplar := func(app storage.Appender, ref uint64, lbls labels.Labels, ts int64) { + e := ex{ + seriesLabels: lbls, + e: exemplar.Exemplar{ + Labels: labels.Labels{{Name: "traceID", Value: fmt.Sprintf("%d", rand.Int())}}, + Value: rand.Float64(), + Ts: ts, + }, + } + expExemplars = append(expExemplars, e) + _, err := app.AppendExemplar(ref, e.seriesLabels, e.e) + require.NoError(t, err) + } + + checkSamples := func() { + q, err := NewBlockQuerier(head, math.MinInt64, math.MaxInt64) + require.NoError(t, err) + series := query(t, q, labels.MustNewMatcher(labels.MatchRegexp, "foo", ".*")) + require.Equal(t, expSeries, series) + } + checkTombstones := func() { + tr, err := head.Tombstones() + require.NoError(t, err) + actTombstones := make(map[uint64]tombstones.Intervals) + require.NoError(t, tr.Iter(func(ref uint64, itvs tombstones.Intervals) error { + for _, itv := range itvs { + actTombstones[ref].Add(itv) + } + return nil + })) + require.Equal(t, expTombstones, actTombstones) + } + checkExemplars := func() { + actExemplars := make([]ex, 0, len(expExemplars)) + err := head.exemplars.IterateExemplars(func(seriesLabels labels.Labels, e exemplar.Exemplar) error { + actExemplars = 
append(actExemplars, ex{ + seriesLabels: seriesLabels, + e: e, + }) + return nil + }) + require.NoError(t, err) + // Verifies both existence of right exemplars and order of exemplars in the buffer. + require.Equal(t, expExemplars, actExemplars) + } + { // Initial data that goes into snapshot. // Add some initial samples with >=1 m-map chunk. app := head.Appender(context.Background()) @@ -2509,11 +2562,12 @@ func TestChunkSnapshot(t *testing.T) { for ts := int64(1); ts <= 200; ts++ { val := rand.Float64() expSeries[lblStr] = append(expSeries[lblStr], sample{ts, val}) - _, err := app.Append(0, lbls, ts, val) + ref, err := app.Append(0, lbls, ts, val) require.NoError(t, err) - // To create multiple WAL records. + // Add an exemplar and commit, to create multiple WAL records. if ts%10 == 0 { + addExemplar(app, ref, lbls, ts) require.NoError(t, app.Commit()) app = head.Appender(context.Background()) } @@ -2538,6 +2592,7 @@ func TestChunkSnapshot(t *testing.T) { }, nil)) require.NoError(t, err) } + } // These references should be the ones used for the snapshot. @@ -2563,22 +2618,9 @@ func TestChunkSnapshot(t *testing.T) { require.NoError(t, head.Init(math.MinInt64)) // Test query for snapshot replay. - q, err := NewBlockQuerier(head, math.MinInt64, math.MaxInt64) - require.NoError(t, err) - series := query(t, q, labels.MustNewMatcher(labels.MatchRegexp, "foo", ".*")) - require.Equal(t, expSeries, series) - - // Check the tombstones. - tr, err := head.Tombstones() - require.NoError(t, err) - actTombstones := make(map[uint64]tombstones.Intervals) - require.NoError(t, tr.Iter(func(ref uint64, itvs tombstones.Intervals) error { - for _, itv := range itvs { - actTombstones[ref].Add(itv) - } - return nil - })) - require.Equal(t, expTombstones, actTombstones) + checkSamples() + checkTombstones() + checkExemplars() } { // Additional data to only include in WAL and m-mapped chunks and not snapshot. This mimics having an old snapshot on disk. 
@@ -2592,11 +2634,12 @@ func TestChunkSnapshot(t *testing.T) { for ts := int64(201); ts <= 400; ts++ { val := rand.Float64() expSeries[lblStr] = append(expSeries[lblStr], sample{ts, val}) - _, err := app.Append(0, lbls, ts, val) + ref, err := app.Append(0, lbls, ts, val) require.NoError(t, err) - // To create multiple WAL records. + // Add an exemplar and commit, to create multiple WAL records. if ts%10 == 0 { + addExemplar(app, ref, lbls, ts) require.NoError(t, app.Commit()) app = head.Appender(context.Background()) } @@ -2643,22 +2686,9 @@ func TestChunkSnapshot(t *testing.T) { require.NoError(t, head.Init(math.MinInt64)) // Test query when data is replayed from snapshot, m-map chunks, and WAL. - q, err := NewBlockQuerier(head, math.MinInt64, math.MaxInt64) - require.NoError(t, err) - series := query(t, q, labels.MustNewMatcher(labels.MatchRegexp, "foo", ".*")) - require.Equal(t, expSeries, series) - - // Check the tombstones. - tr, err := head.Tombstones() - require.NoError(t, err) - actTombstones := make(map[uint64]tombstones.Intervals) - require.NoError(t, tr.Iter(func(ref uint64, itvs tombstones.Intervals) error { - for _, itv := range itvs { - actTombstones[ref].Add(itv) - } - return nil - })) - require.Equal(t, expTombstones, actTombstones) + checkSamples() + checkTombstones() + checkExemplars() } } diff --git a/tsdb/head_wal.go b/tsdb/head_wal.go index 428a9eb6d..ead81d468 100644 --- a/tsdb/head_wal.go +++ b/tsdb/head_wal.go @@ -413,6 +413,7 @@ func (h *Head) processWALSamples( const ( chunkSnapshotRecordTypeSeries uint8 = 1 chunkSnapshotRecordTypeTombstones uint8 = 2 + chunkSnapshotRecordTypeExemplars uint8 = 3 ) type chunkSnapshotRecord struct { @@ -537,6 +538,10 @@ const chunkSnapshotPrefix = "chunk_snapshot." // The chunk snapshot is stored in a directory named chunk_snapshot.N.M and is written // using the WAL package. N is the last WAL segment present during snapshotting and // M is the offset in segment N upto which data was written. 
+// +// The snapshot first contains all series (each in individual records and not sorted), followed by +// tombstones (a single record), and finally exemplars (>= 1 record). Exemplars are in the order they +// were written to the circular buffer. func (h *Head) ChunkSnapshot() (*ChunkSnapshotStats, error) { if h.wal == nil { // If we are not storing any WAL, does not make sense to take a snapshot too. @@ -587,6 +592,7 @@ func (h *Head) ChunkSnapshot() (*ChunkSnapshotStats, error) { buf []byte recs [][]byte ) + // Add all series to the snapshot. stripeSize := h.series.size for i := 0; i < stripeSize; i++ { h.series.locks[i].RLock() @@ -622,11 +628,61 @@ func (h *Head) ChunkSnapshot() (*ChunkSnapshotStats, error) { return stats, errors.Wrap(err, "encode tombstones") } recs = append(recs, rec) - - // Flush remaining records. + // Flush remaining series records and tombstones. if err := cp.Log(recs...); err != nil { return stats, errors.Wrap(err, "flush records") } + buf = buf[:0] + + // Add exemplars in the snapshot. + // We log in batches, with each record having up to 10000 exemplars. + // Assuming 100 bytes (overestimate) per exemplar, that's ~1MB. 
+ maxExemplarsPerRecord := 10000 + batch := make([]record.RefExemplar, 0, maxExemplarsPerRecord) + enc := record.Encoder{} + flushExemplars := func() error { + if len(batch) == 0 { + return nil + } + buf = buf[:0] + encbuf := encoding.Encbuf{B: buf} + encbuf.PutByte(chunkSnapshotRecordTypeExemplars) + enc.EncodeExemplarsIntoBuffer(batch, &encbuf) + if err := cp.Log(encbuf.Get()); err != nil { + return errors.Wrap(err, "log exemplars") + } + buf, batch = buf[:0], batch[:0] + return nil + } + err = h.exemplars.IterateExemplars(func(seriesLabels labels.Labels, e exemplar.Exemplar) error { + if len(batch) >= maxExemplarsPerRecord { + if err := flushExemplars(); err != nil { + return errors.Wrap(err, "flush exemplars") + } + } + + ms := h.series.getByHash(seriesLabels.Hash(), seriesLabels) + if ms == nil { + // It is possible that exemplar refers to some old series. We discard such exemplars. + return nil + } + batch = append(batch, record.RefExemplar{ + Ref: ms.ref, + T: e.Ts, + V: e.Value, + Labels: e.Labels, + }) + return nil + }) + if err != nil { + return stats, errors.Wrap(err, "iterate exemplars") + } + + // Flush remaining exemplars. + if err := flushExemplars(); err != nil { + return stats, errors.Wrap(err, "flush exemplars at the end") + } + if err := cp.Close(); err != nil { return stats, errors.Wrap(err, "close chunk snapshot") } @@ -766,6 +822,9 @@ func (h *Head) loadChunkSnapshot() (int, int, map[uint64]*memSeries, error) { recordChan = make(chan chunkSnapshotRecord, 5*n) shardedRefSeries = make([]map[uint64]*memSeries, n) errChan = make(chan error, n) + refSeries map[uint64]*memSeries + exemplarBuf []record.RefExemplar + dec record.Decoder ) wg.Add(n) @@ -852,15 +911,58 @@ Outer: loopErr = errors.Wrap(err, "iterate tombstones") break Outer } + + case chunkSnapshotRecordTypeExemplars: + // Exemplars are at the end of snapshot. So all series are loaded at this point. 
+ if len(refSeries) == 0 { + close(recordChan) + wg.Wait() + + refSeries = make(map[uint64]*memSeries, numSeries) + for _, shard := range shardedRefSeries { + for k, v := range shard { + refSeries[k] = v + } + } + } + + decbuf := encoding.Decbuf{B: rec[1:]} + + exemplarBuf = exemplarBuf[:0] + exemplarBuf, err = dec.ExemplarsFromBuffer(&decbuf, exemplarBuf) + if err != nil { + loopErr = errors.Wrap(err, "exemplars from buffer") + break Outer + } + + for _, e := range exemplarBuf { + ms, ok := refSeries[e.Ref] + if !ok { + unknownRefs++ + continue + } + + if err := h.exemplars.AddExemplar(ms.lset, exemplar.Exemplar{ + Labels: e.Labels, + Value: e.V, + Ts: e.T, + }); err != nil { + loopErr = errors.Wrap(err, "append exemplar") + break Outer + } + } + default: // This is a record type we don't understand. It is either and old format from earlier versions, // or a new format and the code was rolled back to old version. loopErr = errors.Errorf("unsuported snapshot record type 0b%b", rec[0]) + break Outer } - } - close(recordChan) - wg.Wait() + if len(refSeries) == 0 { + close(recordChan) + wg.Wait() + } close(errChan) merr := tsdb_errors.NewMulti(errors.Wrap(loopErr, "decode loop")) @@ -875,10 +977,13 @@ Outer: return -1, -1, nil, errors.Wrap(r.Err(), "read records") } - refSeries := make(map[uint64]*memSeries, numSeries) - for _, shard := range shardedRefSeries { - for k, v := range shard { - refSeries[k] = v + if len(refSeries) == 0 { + // We had no exemplar record, so we have to build the map here. 
+ refSeries = make(map[uint64]*memSeries, numSeries) + for _, shard := range shardedRefSeries { + for k, v := range shard { + refSeries[k] = v + } } } diff --git a/tsdb/record/record.go b/tsdb/record/record.go index b4ee77f0f..3bfb1be93 100644 --- a/tsdb/record/record.go +++ b/tsdb/record/record.go @@ -182,6 +182,11 @@ func (d *Decoder) Exemplars(rec []byte, exemplars []RefExemplar) ([]RefExemplar, if t != Exemplars { return nil, errors.New("invalid record type") } + + return d.ExemplarsFromBuffer(&dec, exemplars) +} + +func (d *Decoder) ExemplarsFromBuffer(dec *encoding.Decbuf, exemplars []RefExemplar) ([]RefExemplar, error) { if dec.Len() == 0 { return exemplars, nil } @@ -287,6 +292,12 @@ func (e *Encoder) Exemplars(exemplars []RefExemplar, b []byte) []byte { return buf.Get() } + e.EncodeExemplarsIntoBuffer(exemplars, &buf) + + return buf.Get() +} + +func (e *Encoder) EncodeExemplarsIntoBuffer(exemplars []RefExemplar, buf *encoding.Encbuf) { // Store base timestamp and base reference number of first sample. // All samples encode their timestamp and ref as delta to those. first := exemplars[0] @@ -305,6 +316,4 @@ func (e *Encoder) Exemplars(exemplars []RefExemplar, b []byte) []byte { buf.PutUvarintStr(l.Value) } } - - return buf.Get() }