diff --git a/tsdb/head_test.go b/tsdb/head_test.go index 8eb218b5ac..2828106204 100644 --- a/tsdb/head_test.go +++ b/tsdb/head_test.go @@ -3230,8 +3230,12 @@ func TestChunkSnapshot(t *testing.T) { numSeries := 10 expSeries := make(map[string][]tsdbutil.Sample) + expHist := make(map[string][]tsdbutil.Sample) + expFloatHist := make(map[string][]tsdbutil.Sample) expTombstones := make(map[storage.SeriesRef]tombstones.Intervals) expExemplars := make([]ex, 0) + histograms := tsdbutil.GenerateTestGaugeHistograms(481) + floatHistogram := tsdbutil.GenerateTestGaugeFloatHistograms(481) addExemplar := func(app storage.Appender, ref storage.SeriesRef, lbls labels.Labels, ts int64) { e := ex{ @@ -3250,9 +3254,21 @@ func TestChunkSnapshot(t *testing.T) { checkSamples := func() { q, err := NewBlockQuerier(head, math.MinInt64, math.MaxInt64) require.NoError(t, err) - series := query(t, q, labels.MustNewMatcher(labels.MatchRegexp, "foo", ".*")) + series := query(t, q, labels.MustNewMatcher(labels.MatchRegexp, "foo", "bar.*")) require.Equal(t, expSeries, series) } + checkHistograms := func() { + q, err := NewBlockQuerier(head, math.MinInt64, math.MaxInt64) + require.NoError(t, err) + series := query(t, q, labels.MustNewMatcher(labels.MatchRegexp, "hist", "baz.*")) + require.Equal(t, expHist, series) + } + checkFloatHistograms := func() { + q, err := NewBlockQuerier(head, math.MinInt64, math.MaxInt64) + require.NoError(t, err) + series := query(t, q, labels.MustNewMatcher(labels.MatchRegexp, "floathist", "bat.*")) + require.Equal(t, expFloatHist, series) + } checkTombstones := func() { tr, err := head.Tombstones() require.NoError(t, err) @@ -3301,6 +3317,8 @@ func TestChunkSnapshot(t *testing.T) { require.NoError(t, head.Init(math.MinInt64)) checkSamples() + checkHistograms() + checkFloatHistograms() checkTombstones() checkExemplars() } @@ -3311,6 +3329,11 @@ func TestChunkSnapshot(t *testing.T) { for i := 1; i <= numSeries; i++ { lbls := labels.FromStrings("foo", fmt.Sprintf("bar%d", i)) lblStr := lbls.String() + lblsHist := labels.FromStrings("hist", fmt.Sprintf("baz%d", i)) + lblsHistStr := lblsHist.String() + lblsFloatHist := labels.FromStrings("floathist", fmt.Sprintf("bat%d", i)) + lblsFloatHistStr := lblsFloatHist.String() + // 240 samples should m-map at least 1 chunk. for ts := int64(1); ts <= 240; ts++ { val := rand.Float64() @@ -3318,6 +3341,16 @@ func TestChunkSnapshot(t *testing.T) { ref, err := app.Append(0, lbls, ts, val) require.NoError(t, err) + hist := histograms[int(ts)] + expHist[lblsHistStr] = append(expHist[lblsHistStr], sample{ts, 0, hist, nil}) + _, err = app.AppendHistogram(0, lblsHist, ts, hist, nil) + require.NoError(t, err) + + floatHist := floatHistogram[int(ts)] + expFloatHist[lblsFloatHistStr] = append(expFloatHist[lblsFloatHistStr], sample{ts, 0, nil, floatHist}) + _, err = app.AppendHistogram(0, lblsFloatHist, ts, nil, floatHist) + require.NoError(t, err) + // Add an exemplar and to create multiple WAL records. if ts%10 == 0 { addExemplar(app, ref, lbls, ts) @@ -3371,6 +3404,11 @@ func TestChunkSnapshot(t *testing.T) { for i := 1; i <= numSeries; i++ { lbls := labels.FromStrings("foo", fmt.Sprintf("bar%d", i)) lblStr := lbls.String() + lblsHist := labels.FromStrings("hist", fmt.Sprintf("baz%d", i)) + lblsHistStr := lblsHist.String() + lblsFloatHist := labels.FromStrings("floathist", fmt.Sprintf("bat%d", i)) + lblsFloatHistStr := lblsFloatHist.String() + // 240 samples should m-map at least 1 chunk. for ts := int64(241); ts <= 480; ts++ { val := rand.Float64() @@ -3378,6 +3416,16 @@ func TestChunkSnapshot(t *testing.T) { ref, err := app.Append(0, lbls, ts, val) require.NoError(t, err) + hist := histograms[int(ts)] + expHist[lblsHistStr] = append(expHist[lblsHistStr], sample{ts, 0, hist, nil}) + _, err = app.AppendHistogram(0, lblsHist, ts, hist, nil) + require.NoError(t, err) + + floatHist := floatHistogram[int(ts)] + expFloatHist[lblsFloatHistStr] = append(expFloatHist[lblsFloatHistStr], sample{ts, 0, nil, floatHist}) + _, err = app.AppendHistogram(0, lblsFloatHist, ts, nil, floatHist) + require.NoError(t, err) + // Add an exemplar and to create multiple WAL records. if ts%10 == 0 { addExemplar(app, ref, lbls, ts) @@ -3468,6 +3516,19 @@ func TestSnapshotError(t *testing.T) { lbls := labels.FromStrings("foo", "bar") _, err := app.Append(0, lbls, 99, 99) require.NoError(t, err) + + // Add histograms + hist := tsdbutil.GenerateTestGaugeHistograms(1)[0] + floatHist := tsdbutil.GenerateTestGaugeFloatHistograms(1)[0] + lblsHist := labels.FromStrings("hist", "bar") + lblsFloatHist := labels.FromStrings("floathist", "bar") + + _, err = app.AppendHistogram(0, lblsHist, 99, hist, nil) + require.NoError(t, err) + + _, err = app.AppendHistogram(0, lblsFloatHist, 99, nil, floatHist) + require.NoError(t, err) + require.NoError(t, app.Commit()) // Add some tombstones. diff --git a/tsdb/head_wal.go b/tsdb/head_wal.go index 71120c55e1..2fe33befba 100644 --- a/tsdb/head_wal.go +++ b/tsdb/head_wal.go @@ -943,10 +943,12 @@ const ( ) type chunkSnapshotRecord struct { - ref chunks.HeadSeriesRef - lset labels.Labels - mc *memChunk - lastValue float64 + ref chunks.HeadSeriesRef + lset labels.Labels + mc *memChunk + lastValue float64 + lastHistogramValue *histogram.Histogram + lastFloatHistogramValue *histogram.FloatHistogram } func (s *memSeries) encodeToSnapshotRecord(b []byte) []byte { @@ -961,18 +963,27 @@ func (s *memSeries) encodeToSnapshotRecord(b []byte) []byte { if s.headChunk == nil { buf.PutUvarint(0) } else { + enc := s.headChunk.chunk.Encoding() buf.PutUvarint(1) buf.PutBE64int64(s.headChunk.minTime) buf.PutBE64int64(s.headChunk.maxTime) - buf.PutByte(byte(s.headChunk.chunk.Encoding())) + buf.PutByte(byte(enc)) buf.PutUvarintBytes(s.headChunk.chunk.Bytes()) - // Backwards compatibility for old sampleBuf which had last 4 samples. - for i := 0; i < 3; i++ { + + switch enc { + case chunkenc.EncXOR: + // Backwards compatibility for old sampleBuf which had last 4 samples. + for i := 0; i < 3; i++ { + buf.PutBE64int64(0) + buf.PutBEFloat64(0) + } buf.PutBE64int64(0) - buf.PutBEFloat64(0) + buf.PutBEFloat64(s.lastValue) + case chunkenc.EncHistogram: + record.EncodeHistogram(&buf, s.lastHistogramValue) + default: // chunkenc.FloatHistogram. + record.EncodeFloatHistogram(&buf, s.lastFloatHistogramValue) } - buf.PutBE64int64(0) - buf.PutBEFloat64(s.lastValue) } s.Unlock() @@ -1012,13 +1023,22 @@ func decodeSeriesFromChunkSnapshot(d *record.Decoder, b []byte) (csr chunkSnapsh } csr.mc.chunk = chk - // Backwards-compatibility for old sampleBuf which had last 4 samples. - for i := 0; i < 3; i++ { + switch enc { + case chunkenc.EncXOR: + // Backwards-compatibility for old sampleBuf which had last 4 samples. + for i := 0; i < 3; i++ { + _ = dec.Be64int64() + _ = dec.Be64Float64() + } _ = dec.Be64int64() - _ = dec.Be64Float64() + csr.lastValue = dec.Be64Float64() + case chunkenc.EncHistogram: + csr.lastHistogramValue = &histogram.Histogram{} + record.DecodeHistogram(&dec, csr.lastHistogramValue) + default: // chunkenc.FloatHistogram. + csr.lastFloatHistogramValue = &histogram.FloatHistogram{} + record.DecodeFloatHistogram(&dec, csr.lastFloatHistogramValue) } - _ = dec.Be64int64() - csr.lastValue = dec.Be64Float64() err = dec.Err() if err != nil && len(dec.B) > 0 { @@ -1396,6 +1416,8 @@ func (h *Head) loadChunkSnapshot() (int, int, map[chunks.HeadSeriesRef]*memSerie series.nextAt = csr.mc.maxTime // This will create a new chunk on append. series.headChunk = csr.mc series.lastValue = csr.lastValue + series.lastHistogramValue = csr.lastHistogramValue + series.lastFloatHistogramValue = csr.lastFloatHistogramValue app, err := series.headChunk.chunk.Appender() if err != nil { diff --git a/tsdb/record/record.go b/tsdb/record/record.go index 231b8b3c13..4cd51d46c0 100644 --- a/tsdb/record/record.go +++ b/tsdb/record/record.go @@ -441,49 +441,7 @@ func (d *Decoder) HistogramSamples(rec []byte, histograms []RefHistogramSample) H: &histogram.Histogram{}, } - rh.H.CounterResetHint = histogram.CounterResetHint(dec.Byte()) - - rh.H.Schema = int32(dec.Varint64()) - rh.H.ZeroThreshold = math.Float64frombits(dec.Be64()) - - rh.H.ZeroCount = dec.Uvarint64() - rh.H.Count = dec.Uvarint64() - rh.H.Sum = math.Float64frombits(dec.Be64()) - - l := dec.Uvarint() - if l > 0 { - rh.H.PositiveSpans = make([]histogram.Span, l) - } - for i := range rh.H.PositiveSpans { - rh.H.PositiveSpans[i].Offset = int32(dec.Varint64()) - rh.H.PositiveSpans[i].Length = dec.Uvarint32() - } - - l = dec.Uvarint() - if l > 0 { - rh.H.NegativeSpans = make([]histogram.Span, l) - } - for i := range rh.H.NegativeSpans { - rh.H.NegativeSpans[i].Offset = int32(dec.Varint64()) - rh.H.NegativeSpans[i].Length = dec.Uvarint32() - } - - l = dec.Uvarint() - if l > 0 { - rh.H.PositiveBuckets = make([]int64, l) - } - for i := range rh.H.PositiveBuckets { - rh.H.PositiveBuckets[i] = dec.Varint64() - } - - l = dec.Uvarint() - if l > 0 { - rh.H.NegativeBuckets = make([]int64, l) - } - for i := range rh.H.NegativeBuckets { - rh.H.NegativeBuckets[i] = dec.Varint64() - } - + DecodeHistogram(&dec, rh.H) histograms = append(histograms, rh) } @@ -496,6 +454,52 @@ func (d *Decoder) HistogramSamples(rec []byte, histograms []RefHistogramSample) return histograms, nil } +// DecodeHistogram decodes a Histogram from a byte slice. +func DecodeHistogram(buf *encoding.Decbuf, h *histogram.Histogram) { + h.CounterResetHint = histogram.CounterResetHint(buf.Byte()) + + h.Schema = int32(buf.Varint64()) + h.ZeroThreshold = math.Float64frombits(buf.Be64()) + + h.ZeroCount = buf.Uvarint64() + h.Count = buf.Uvarint64() + h.Sum = math.Float64frombits(buf.Be64()) + + l := buf.Uvarint() + if l > 0 { + h.PositiveSpans = make([]histogram.Span, l) + } + for i := range h.PositiveSpans { + h.PositiveSpans[i].Offset = int32(buf.Varint64()) + h.PositiveSpans[i].Length = buf.Uvarint32() + } + + l = buf.Uvarint() + if l > 0 { + h.NegativeSpans = make([]histogram.Span, l) + } + for i := range h.NegativeSpans { + h.NegativeSpans[i].Offset = int32(buf.Varint64()) + h.NegativeSpans[i].Length = buf.Uvarint32() + } + + l = buf.Uvarint() + if l > 0 { + h.PositiveBuckets = make([]int64, l) + } + for i := range h.PositiveBuckets { + h.PositiveBuckets[i] = buf.Varint64() + } + + l = buf.Uvarint() + if l > 0 { + h.NegativeBuckets = make([]int64, l) + } + for i := range h.NegativeBuckets { + h.NegativeBuckets[i] = buf.Varint64() + } +} + func (d *Decoder) FloatHistogramSamples(rec []byte, histograms []RefFloatHistogramSample) ([]RefFloatHistogramSample, error) { dec := encoding.Decbuf{B: rec} t := Type(dec.Byte()) @@ -519,49 +523,7 @@ func (d *Decoder) FloatHistogramSamples(rec []byte, histograms []RefFloatHistogr FH: &histogram.FloatHistogram{}, } - rh.FH.CounterResetHint = histogram.CounterResetHint(dec.Byte()) - - rh.FH.Schema = int32(dec.Varint64()) - rh.FH.ZeroThreshold = dec.Be64Float64() - - rh.FH.ZeroCount = dec.Be64Float64() - rh.FH.Count = dec.Be64Float64() - rh.FH.Sum = dec.Be64Float64() - - l := dec.Uvarint() - if l > 0 { - rh.FH.PositiveSpans = make([]histogram.Span, l) - } - for i := range rh.FH.PositiveSpans { - rh.FH.PositiveSpans[i].Offset = int32(dec.Varint64()) - rh.FH.PositiveSpans[i].Length = dec.Uvarint32() - } - - l = dec.Uvarint() - if l > 0 { - rh.FH.NegativeSpans = make([]histogram.Span, l) - } - for i := range rh.FH.NegativeSpans { - rh.FH.NegativeSpans[i].Offset = int32(dec.Varint64()) - rh.FH.NegativeSpans[i].Length = dec.Uvarint32() - } - - l = dec.Uvarint() - if l > 0 { - rh.FH.PositiveBuckets = make([]float64, l) - } - for i := range rh.FH.PositiveBuckets { - rh.FH.PositiveBuckets[i] = dec.Be64Float64() - } - - l = dec.Uvarint() - if l > 0 { - rh.FH.NegativeBuckets = make([]float64, l) - } - for i := range rh.FH.NegativeBuckets { - rh.FH.NegativeBuckets[i] = dec.Be64Float64() - } - + DecodeFloatHistogram(&dec, rh.FH) histograms = append(histograms, rh) } @@ -574,6 +536,52 @@ func (d *Decoder) FloatHistogramSamples(rec []byte, histograms []RefFloatHistogr return histograms, nil } +// Decode decodes a Histogram from a byte slice. +func DecodeFloatHistogram(buf *encoding.Decbuf, fh *histogram.FloatHistogram) { + fh.CounterResetHint = histogram.CounterResetHint(buf.Byte()) + + fh.Schema = int32(buf.Varint64()) + fh.ZeroThreshold = buf.Be64Float64() + + fh.ZeroCount = buf.Be64Float64() + fh.Count = buf.Be64Float64() + fh.Sum = buf.Be64Float64() + + l := buf.Uvarint() + if l > 0 { + fh.PositiveSpans = make([]histogram.Span, l) + } + for i := range fh.PositiveSpans { + fh.PositiveSpans[i].Offset = int32(buf.Varint64()) + fh.PositiveSpans[i].Length = buf.Uvarint32() + } + + l = buf.Uvarint() + if l > 0 { + fh.NegativeSpans = make([]histogram.Span, l) + } + for i := range fh.NegativeSpans { + fh.NegativeSpans[i].Offset = int32(buf.Varint64()) + fh.NegativeSpans[i].Length = buf.Uvarint32() + } + + l = buf.Uvarint() + if l > 0 { + fh.PositiveBuckets = make([]float64, l) + } + for i := range fh.PositiveBuckets { + fh.PositiveBuckets[i] = buf.Be64Float64() + } + + l = buf.Uvarint() + if l > 0 { + fh.NegativeBuckets = make([]float64, l) + } + for i := range fh.NegativeBuckets { + fh.NegativeBuckets[i] = buf.Be64Float64() + } +} + // Encoder encodes series, sample, and tombstones records. // The zero value is ready to use. type Encoder struct{} @@ -719,41 +727,46 @@ func (e *Encoder) HistogramSamples(histograms []RefHistogramSample, b []byte) [] buf.PutVarint64(int64(h.Ref) - int64(first.Ref)) buf.PutVarint64(h.T - first.T) - buf.PutByte(byte(h.H.CounterResetHint)) - - buf.PutVarint64(int64(h.H.Schema)) - buf.PutBE64(math.Float64bits(h.H.ZeroThreshold)) - - buf.PutUvarint64(h.H.ZeroCount) - buf.PutUvarint64(h.H.Count) - buf.PutBE64(math.Float64bits(h.H.Sum)) - - buf.PutUvarint(len(h.H.PositiveSpans)) - for _, s := range h.H.PositiveSpans { - buf.PutVarint64(int64(s.Offset)) - buf.PutUvarint32(s.Length) - } - - buf.PutUvarint(len(h.H.NegativeSpans)) - for _, s := range h.H.NegativeSpans { - buf.PutVarint64(int64(s.Offset)) - buf.PutUvarint32(s.Length) - } - - buf.PutUvarint(len(h.H.PositiveBuckets)) - for _, b := range h.H.PositiveBuckets { - buf.PutVarint64(b) - } - - buf.PutUvarint(len(h.H.NegativeBuckets)) - for _, b := range h.H.NegativeBuckets { - buf.PutVarint64(b) - } + EncodeHistogram(&buf, h.H) } return buf.Get() } +// EncodeHistogram encodes a Histogram into a byte slice. +func EncodeHistogram(buf *encoding.Encbuf, h *histogram.Histogram) { + buf.PutByte(byte(h.CounterResetHint)) + + buf.PutVarint64(int64(h.Schema)) + buf.PutBE64(math.Float64bits(h.ZeroThreshold)) + + buf.PutUvarint64(h.ZeroCount) + buf.PutUvarint64(h.Count) + buf.PutBE64(math.Float64bits(h.Sum)) + + buf.PutUvarint(len(h.PositiveSpans)) + for _, s := range h.PositiveSpans { + buf.PutVarint64(int64(s.Offset)) + buf.PutUvarint32(s.Length) + } + + buf.PutUvarint(len(h.NegativeSpans)) + for _, s := range h.NegativeSpans { + buf.PutVarint64(int64(s.Offset)) + buf.PutUvarint32(s.Length) + } + + buf.PutUvarint(len(h.PositiveBuckets)) + for _, b := range h.PositiveBuckets { + buf.PutVarint64(b) + } + + buf.PutUvarint(len(h.NegativeBuckets)) + for _, b := range h.NegativeBuckets { + buf.PutVarint64(b) + } +} + func (e *Encoder) FloatHistogramSamples(histograms []RefFloatHistogramSample, b []byte) []byte { buf := encoding.Encbuf{B: b} buf.PutByte(byte(FloatHistogramSamples)) @@ -772,37 +785,42 @@ func (e *Encoder) FloatHistogramSamples(histograms []RefFloatHistogramSample, b buf.PutVarint64(int64(h.Ref) - int64(first.Ref)) buf.PutVarint64(h.T - first.T) - buf.PutByte(byte(h.FH.CounterResetHint)) - - buf.PutVarint64(int64(h.FH.Schema)) - buf.PutBEFloat64(h.FH.ZeroThreshold) - - buf.PutBEFloat64(h.FH.ZeroCount) - buf.PutBEFloat64(h.FH.Count) - buf.PutBEFloat64(h.FH.Sum) - - buf.PutUvarint(len(h.FH.PositiveSpans)) - for _, s := range h.FH.PositiveSpans { - buf.PutVarint64(int64(s.Offset)) - buf.PutUvarint32(s.Length) - } - - buf.PutUvarint(len(h.FH.NegativeSpans)) - for _, s := range h.FH.NegativeSpans { - buf.PutVarint64(int64(s.Offset)) - buf.PutUvarint32(s.Length) - } - - buf.PutUvarint(len(h.FH.PositiveBuckets)) - for _, b := range h.FH.PositiveBuckets { - buf.PutBEFloat64(b) - } - - buf.PutUvarint(len(h.FH.NegativeBuckets)) - for _, b := range h.FH.NegativeBuckets { - buf.PutBEFloat64(b) - } + EncodeFloatHistogram(&buf, h.FH) } return buf.Get() } + +// Encode encodes the Float Histogram into a byte slice. +func EncodeFloatHistogram(buf *encoding.Encbuf, h *histogram.FloatHistogram) { + buf.PutByte(byte(h.CounterResetHint)) + + buf.PutVarint64(int64(h.Schema)) + buf.PutBEFloat64(h.ZeroThreshold) + + buf.PutBEFloat64(h.ZeroCount) + buf.PutBEFloat64(h.Count) + buf.PutBEFloat64(h.Sum) + + buf.PutUvarint(len(h.PositiveSpans)) + for _, s := range h.PositiveSpans { + buf.PutVarint64(int64(s.Offset)) + buf.PutUvarint32(s.Length) + } + + buf.PutUvarint(len(h.NegativeSpans)) + for _, s := range h.NegativeSpans { + buf.PutVarint64(int64(s.Offset)) + buf.PutUvarint32(s.Length) + } + + buf.PutUvarint(len(h.PositiveBuckets)) + for _, b := range h.PositiveBuckets { + buf.PutBEFloat64(b) + } + + buf.PutUvarint(len(h.NegativeBuckets)) + for _, b := range h.NegativeBuckets { + buf.PutBEFloat64(b) + } +}