Exemplars in snapshot (#9255)

* Exemplars in snapshot

Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com>

* Fix lint

Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com>

* Add docs

Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com>

* Fix lint

Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com>

* Fix comments

Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com>
This commit is contained in:
Ganesh Vernekar 2021-08-30 19:34:38 +05:30 committed by GitHub
parent dff78eb508
commit 35b1a82594
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 265 additions and 56 deletions

View file

@ -3,6 +3,11 @@
Memory snapshot uses the WAL package and writes each series as a WAL record.
Below are the formats of the individual records.
The order of records in the snapshot is always:
1. Starts with series records, one per series, in an unsorted fashion.
2. After all series are done, we write a tombstone record containing all the tombstones.
3. At the end, we write one or more exemplar records while batching up the exemplars in each record. Exemplars are in the order they were written to the circular buffer.
### Series records
This record is a snapshot of a single series. Only one series exists per record.
@ -60,3 +65,30 @@ as tombstone file in blocks.
│ len(Encoded Tombstones) <uvarint> │ Encoded Tombstones <bytes>
└───────────────────────────────────┴─────────────────────────────┘
```
### Exemplar record
A single exemplar record contains one or more exemplars, encoded in the same way as we do in WAL but with changed record type.
```
┌───────────────────────────────────────────────────────────────────┐
│ Record Type <byte>
├───────────────────────────────────────────────────────────────────┤
│ ┌────────────────────┬───────────────────────────┐ │
│ │ series ref <8b> │ timestamp <8b> │ │
│ └────────────────────┴───────────────────────────┘ │
│ ┌─────────────────────┬───────────────────────────┬─────────────┐ │
│ │ ref_delta <uvarint> │ timestamp_delta <uvarint> │ value <8b> │ │
│ ├─────────────────────┴───────────────────────────┴─────────────┤ │
│ │ n = len(labels) <uvarint> │ │
│ ├───────────────────────────────┬───────────────────────────────┤ │
│ │ len(str_1) <uvarint> │ str_1 <bytes> │ │
│ ├───────────────────────────────┴───────────────────────────────┤ │
│ │ ... │ │
│ ├───────────────────────────────┬───────────────────────────────┤ │
│ │ len(str_2n) <uvarint> │ str_2n <bytes> │ │
│ ├───────────────────────────────┴───────────────────────────────┤ │
│ . . . │
└───────────────────────────────────────────────────────────────────┘
```

View file

@ -400,3 +400,23 @@ func (ce *CircularExemplarStorage) computeMetrics() {
ce.metrics.lastExemplarsTs.Set(float64(ce.exemplars[0].exemplar.Ts) / 1000)
}
}
// IterateExemplars iterates through all the exemplars from oldest to newest appended and calls
// the given function on all of them till the end (or) till the first function call that returns an error.
func (ce *CircularExemplarStorage) IterateExemplars(f func(seriesLabels labels.Labels, e exemplar.Exemplar) error) error {
ce.lock.RLock()
defer ce.lock.RUnlock()
idx := ce.nextIndex
l := len(ce.exemplars)
for i := 0; i < l; i, idx = i+1, (idx+1)%l {
if ce.exemplars[idx] == nil {
continue
}
err := f(ce.exemplars[idx].ref.seriesLabels, ce.exemplars[idx].exemplar)
if err != nil {
return err
}
}
return nil
}

View file

@ -477,16 +477,27 @@ func TestResize(t *testing.T) {
}
}
func BenchmarkAddExemplar(t *testing.B) {
exs, err := NewCircularExemplarStorage(int64(t.N), eMetrics)
require.NoError(t, err)
es := exs.(*CircularExemplarStorage)
func BenchmarkAddExemplar(b *testing.B) {
// We need to include these labels since we do length calculation
// before adding.
exLabels := labels.Labels{{Name: "traceID", Value: "89620921"}}
for i := 0; i < t.N; i++ {
l := labels.FromStrings("service", strconv.Itoa(i))
for _, n := range []int{10000, 100000, 1000000} {
b.Run(fmt.Sprintf("%d", n), func(b *testing.B) {
exs, err := NewCircularExemplarStorage(int64(n), eMetrics)
require.NoError(b, err)
es := exs.(*CircularExemplarStorage)
err = es.AddExemplar(l, exemplar.Exemplar{Value: float64(i), Ts: int64(i)})
require.NoError(t, err)
b.ResetTimer()
l := labels.Labels{{Name: "service", Value: strconv.Itoa(0)}}
for i := 0; i < n; i++ {
if i%100 == 0 {
l = labels.Labels{{Name: "service", Value: strconv.Itoa(i)}}
}
err = es.AddExemplar(l, exemplar.Exemplar{Value: float64(i), Ts: int64(i), Labels: exLabels})
require.NoError(b, err)
}
})
}
}

View file

@ -113,6 +113,7 @@ type ExemplarStorage interface {
storage.ExemplarQueryable
AddExemplar(labels.Labels, exemplar.Exemplar) error
ValidateExemplar(labels.Labels, exemplar.Exemplar) error
IterateExemplars(f func(seriesLabels labels.Labels, e exemplar.Exemplar) error) error
}
// HeadOptions are parameters for the Head block.
@ -454,7 +455,7 @@ const cardinalityCacheExpirationTime = time.Duration(30) * time.Second
// Init loads data from the write ahead log and prepares the head for writes.
// It should be called before using an appender so that it
// limits the ingested samples to the head min valid time.
func (h *Head) Init(minValidTime int64) (err error) {
func (h *Head) Init(minValidTime int64) error {
h.minValidTime.Store(minValidTime)
defer h.postings.EnsureOrder()
defer h.gc() // After loading the wal remove the obsolete data from the head.
@ -474,6 +475,7 @@ func (h *Head) Init(minValidTime int64) (err error) {
if h.opts.EnableMemorySnapshotOnShutdown {
level.Info(h.logger).Log("msg", "Chunk snapshot is enabled, replaying from the snapshot")
var err error
snapIdx, snapOffset, refSeries, err = h.loadChunkSnapshot()
if err != nil {
snapIdx, snapOffset = -1, 0

View file

@ -2496,9 +2496,62 @@ func TestChunkSnapshot(t *testing.T) {
require.NoError(t, head.Close())
}()
type ex struct {
seriesLabels labels.Labels
e exemplar.Exemplar
}
numSeries := 10
expSeries := make(map[string][]tsdbutil.Sample)
expTombstones := make(map[uint64]tombstones.Intervals)
expExemplars := make([]ex, 0)
addExemplar := func(app storage.Appender, ref uint64, lbls labels.Labels, ts int64) {
e := ex{
seriesLabels: lbls,
e: exemplar.Exemplar{
Labels: labels.Labels{{Name: "traceID", Value: fmt.Sprintf("%d", rand.Int())}},
Value: rand.Float64(),
Ts: ts,
},
}
expExemplars = append(expExemplars, e)
_, err := app.AppendExemplar(ref, e.seriesLabels, e.e)
require.NoError(t, err)
}
checkSamples := func() {
q, err := NewBlockQuerier(head, math.MinInt64, math.MaxInt64)
require.NoError(t, err)
series := query(t, q, labels.MustNewMatcher(labels.MatchRegexp, "foo", ".*"))
require.Equal(t, expSeries, series)
}
checkTombstones := func() {
tr, err := head.Tombstones()
require.NoError(t, err)
actTombstones := make(map[uint64]tombstones.Intervals)
require.NoError(t, tr.Iter(func(ref uint64, itvs tombstones.Intervals) error {
for _, itv := range itvs {
actTombstones[ref].Add(itv)
}
return nil
}))
require.Equal(t, expTombstones, actTombstones)
}
checkExemplars := func() {
actExemplars := make([]ex, 0, len(expExemplars))
err := head.exemplars.IterateExemplars(func(seriesLabels labels.Labels, e exemplar.Exemplar) error {
actExemplars = append(actExemplars, ex{
seriesLabels: seriesLabels,
e: e,
})
return nil
})
require.NoError(t, err)
// Verifies both existence of right exemplars and order of exemplars in the buffer.
require.Equal(t, expExemplars, actExemplars)
}
{ // Initial data that goes into snapshot.
// Add some initial samples with >=1 m-map chunk.
app := head.Appender(context.Background())
@ -2509,11 +2562,12 @@ func TestChunkSnapshot(t *testing.T) {
for ts := int64(1); ts <= 200; ts++ {
val := rand.Float64()
expSeries[lblStr] = append(expSeries[lblStr], sample{ts, val})
_, err := app.Append(0, lbls, ts, val)
ref, err := app.Append(0, lbls, ts, val)
require.NoError(t, err)
// To create multiple WAL records.
// Add an exemplar and to create multiple WAL records.
if ts%10 == 0 {
addExemplar(app, ref, lbls, ts)
require.NoError(t, app.Commit())
app = head.Appender(context.Background())
}
@ -2538,6 +2592,7 @@ func TestChunkSnapshot(t *testing.T) {
}, nil))
require.NoError(t, err)
}
}
// These references should be the ones used for the snapshot.
@ -2563,22 +2618,9 @@ func TestChunkSnapshot(t *testing.T) {
require.NoError(t, head.Init(math.MinInt64))
// Test query for snapshot replay.
q, err := NewBlockQuerier(head, math.MinInt64, math.MaxInt64)
require.NoError(t, err)
series := query(t, q, labels.MustNewMatcher(labels.MatchRegexp, "foo", ".*"))
require.Equal(t, expSeries, series)
// Check the tombstones.
tr, err := head.Tombstones()
require.NoError(t, err)
actTombstones := make(map[uint64]tombstones.Intervals)
require.NoError(t, tr.Iter(func(ref uint64, itvs tombstones.Intervals) error {
for _, itv := range itvs {
actTombstones[ref].Add(itv)
}
return nil
}))
require.Equal(t, expTombstones, actTombstones)
checkSamples()
checkTombstones()
checkExemplars()
}
{ // Additional data to only include in WAL and m-mapped chunks and not snapshot. This mimics having an old snapshot on disk.
@ -2592,11 +2634,12 @@ func TestChunkSnapshot(t *testing.T) {
for ts := int64(201); ts <= 400; ts++ {
val := rand.Float64()
expSeries[lblStr] = append(expSeries[lblStr], sample{ts, val})
_, err := app.Append(0, lbls, ts, val)
ref, err := app.Append(0, lbls, ts, val)
require.NoError(t, err)
// To create multiple WAL records.
// Add an exemplar and to create multiple WAL records.
if ts%10 == 0 {
addExemplar(app, ref, lbls, ts)
require.NoError(t, app.Commit())
app = head.Appender(context.Background())
}
@ -2643,22 +2686,9 @@ func TestChunkSnapshot(t *testing.T) {
require.NoError(t, head.Init(math.MinInt64))
// Test query when data is replayed from snapshot, m-map chunks, and WAL.
q, err := NewBlockQuerier(head, math.MinInt64, math.MaxInt64)
require.NoError(t, err)
series := query(t, q, labels.MustNewMatcher(labels.MatchRegexp, "foo", ".*"))
require.Equal(t, expSeries, series)
// Check the tombstones.
tr, err := head.Tombstones()
require.NoError(t, err)
actTombstones := make(map[uint64]tombstones.Intervals)
require.NoError(t, tr.Iter(func(ref uint64, itvs tombstones.Intervals) error {
for _, itv := range itvs {
actTombstones[ref].Add(itv)
}
return nil
}))
require.Equal(t, expTombstones, actTombstones)
checkSamples()
checkTombstones()
checkExemplars()
}
}

View file

@ -413,6 +413,7 @@ func (h *Head) processWALSamples(
const (
chunkSnapshotRecordTypeSeries uint8 = 1
chunkSnapshotRecordTypeTombstones uint8 = 2
chunkSnapshotRecordTypeExemplars uint8 = 3
)
type chunkSnapshotRecord struct {
@ -537,6 +538,10 @@ const chunkSnapshotPrefix = "chunk_snapshot."
// The chunk snapshot is stored in a directory named chunk_snapshot.N.M and is written
// using the WAL package. N is the last WAL segment present during snapshotting and
// M is the offset in segment N upto which data was written.
//
// The snapshot first contains all series (each in individual records and not sorted), followed by
// tombstones (a single record), and finally exemplars (>= 1 record). Exemplars are in the order they
// were written to the circular buffer.
func (h *Head) ChunkSnapshot() (*ChunkSnapshotStats, error) {
if h.wal == nil {
// If we are not storing any WAL, does not make sense to take a snapshot too.
@ -587,6 +592,7 @@ func (h *Head) ChunkSnapshot() (*ChunkSnapshotStats, error) {
buf []byte
recs [][]byte
)
// Add all series to the snapshot.
stripeSize := h.series.size
for i := 0; i < stripeSize; i++ {
h.series.locks[i].RLock()
@ -622,11 +628,61 @@ func (h *Head) ChunkSnapshot() (*ChunkSnapshotStats, error) {
return stats, errors.Wrap(err, "encode tombstones")
}
recs = append(recs, rec)
// Flush remaining records.
// Flush remaining series records and tombstones.
if err := cp.Log(recs...); err != nil {
return stats, errors.Wrap(err, "flush records")
}
buf = buf[:0]
// Add exemplars in the snapshot.
// We log in batches, with each record having upto 10000 exemplars.
// Assuming 100 bytes (overestimate) per exemplar, that's ~1MB.
maxExemplarsPerRecord := 10000
batch := make([]record.RefExemplar, 0, maxExemplarsPerRecord)
enc := record.Encoder{}
flushExemplars := func() error {
if len(batch) == 0 {
return nil
}
buf = buf[:0]
encbuf := encoding.Encbuf{B: buf}
encbuf.PutByte(chunkSnapshotRecordTypeExemplars)
enc.EncodeExemplarsIntoBuffer(batch, &encbuf)
if err := cp.Log(encbuf.Get()); err != nil {
return errors.Wrap(err, "log exemplars")
}
buf, batch = buf[:0], batch[:0]
return nil
}
err = h.exemplars.IterateExemplars(func(seriesLabels labels.Labels, e exemplar.Exemplar) error {
if len(batch) >= maxExemplarsPerRecord {
if err := flushExemplars(); err != nil {
return errors.Wrap(err, "flush exemplars")
}
}
ms := h.series.getByHash(seriesLabels.Hash(), seriesLabels)
if ms == nil {
// It is possible that exemplar refers to some old series. We discard such exemplars.
return nil
}
batch = append(batch, record.RefExemplar{
Ref: ms.ref,
T: e.Ts,
V: e.Value,
Labels: e.Labels,
})
return nil
})
if err != nil {
return stats, errors.Wrap(err, "iterate exemplars")
}
// Flush remaining exemplars.
if err := flushExemplars(); err != nil {
return stats, errors.Wrap(err, "flush exemplars at the end")
}
if err := cp.Close(); err != nil {
return stats, errors.Wrap(err, "close chunk snapshot")
}
@ -766,6 +822,9 @@ func (h *Head) loadChunkSnapshot() (int, int, map[uint64]*memSeries, error) {
recordChan = make(chan chunkSnapshotRecord, 5*n)
shardedRefSeries = make([]map[uint64]*memSeries, n)
errChan = make(chan error, n)
refSeries map[uint64]*memSeries
exemplarBuf []record.RefExemplar
dec record.Decoder
)
wg.Add(n)
@ -852,15 +911,58 @@ Outer:
loopErr = errors.Wrap(err, "iterate tombstones")
break Outer
}
case chunkSnapshotRecordTypeExemplars:
// Exemplars are at the end of snapshot. So all series are loaded at this point.
if len(refSeries) == 0 {
close(recordChan)
wg.Wait()
refSeries = make(map[uint64]*memSeries, numSeries)
for _, shard := range shardedRefSeries {
for k, v := range shard {
refSeries[k] = v
}
}
}
decbuf := encoding.Decbuf{B: rec[1:]}
exemplarBuf = exemplarBuf[:0]
exemplarBuf, err = dec.ExemplarsFromBuffer(&decbuf, exemplarBuf)
if err != nil {
loopErr = errors.Wrap(err, "exemplars from buffer")
break Outer
}
for _, e := range exemplarBuf {
ms, ok := refSeries[e.Ref]
if !ok {
unknownRefs++
continue
}
if err := h.exemplars.AddExemplar(ms.lset, exemplar.Exemplar{
Labels: e.Labels,
Value: e.V,
Ts: e.T,
}); err != nil {
loopErr = errors.Wrap(err, "append exemplar")
break Outer
}
}
default:
// This is a record type we don't understand. It is either and old format from earlier versions,
// or a new format and the code was rolled back to old version.
loopErr = errors.Errorf("unsuported snapshot record type 0b%b", rec[0])
break Outer
}
}
close(recordChan)
wg.Wait()
if len(refSeries) == 0 {
close(recordChan)
wg.Wait()
}
close(errChan)
merr := tsdb_errors.NewMulti(errors.Wrap(loopErr, "decode loop"))
@ -875,10 +977,13 @@ Outer:
return -1, -1, nil, errors.Wrap(r.Err(), "read records")
}
refSeries := make(map[uint64]*memSeries, numSeries)
for _, shard := range shardedRefSeries {
for k, v := range shard {
refSeries[k] = v
if len(refSeries) == 0 {
// We had no exemplar record, so we have to build the map here.
refSeries = make(map[uint64]*memSeries, numSeries)
for _, shard := range shardedRefSeries {
for k, v := range shard {
refSeries[k] = v
}
}
}

View file

@ -182,6 +182,11 @@ func (d *Decoder) Exemplars(rec []byte, exemplars []RefExemplar) ([]RefExemplar,
if t != Exemplars {
return nil, errors.New("invalid record type")
}
return d.ExemplarsFromBuffer(&dec, exemplars)
}
func (d *Decoder) ExemplarsFromBuffer(dec *encoding.Decbuf, exemplars []RefExemplar) ([]RefExemplar, error) {
if dec.Len() == 0 {
return exemplars, nil
}
@ -287,6 +292,12 @@ func (e *Encoder) Exemplars(exemplars []RefExemplar, b []byte) []byte {
return buf.Get()
}
e.EncodeExemplarsIntoBuffer(exemplars, &buf)
return buf.Get()
}
func (e *Encoder) EncodeExemplarsIntoBuffer(exemplars []RefExemplar, buf *encoding.Encbuf) {
// Store base timestamp and base reference number of first sample.
// All samples encode their timestamp and ref as delta to those.
first := exemplars[0]
@ -305,6 +316,4 @@ func (e *Encoder) Exemplars(exemplars []RefExemplar, b []byte) []byte {
buf.PutUvarintStr(l.Value)
}
}
return buf.Get()
}