tsdb: Support native histograms in snapshot on shutdown (#12258)

Signed-off-by: Marc Tuduri <marctc@protonmail.com>
This commit is contained in:
Marc Tudurí 2023-07-05 11:44:13 +02:00 committed by GitHub
parent f06f899a6c
commit 4851ced266
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 263 additions and 162 deletions

View file

@ -3230,8 +3230,12 @@ func TestChunkSnapshot(t *testing.T) {
numSeries := 10
expSeries := make(map[string][]tsdbutil.Sample)
expHist := make(map[string][]tsdbutil.Sample)
expFloatHist := make(map[string][]tsdbutil.Sample)
expTombstones := make(map[storage.SeriesRef]tombstones.Intervals)
expExemplars := make([]ex, 0)
histograms := tsdbutil.GenerateTestGaugeHistograms(481)
floatHistogram := tsdbutil.GenerateTestGaugeFloatHistograms(481)
addExemplar := func(app storage.Appender, ref storage.SeriesRef, lbls labels.Labels, ts int64) {
e := ex{
@ -3250,9 +3254,21 @@ func TestChunkSnapshot(t *testing.T) {
checkSamples := func() {
q, err := NewBlockQuerier(head, math.MinInt64, math.MaxInt64)
require.NoError(t, err)
series := query(t, q, labels.MustNewMatcher(labels.MatchRegexp, "foo", ".*"))
series := query(t, q, labels.MustNewMatcher(labels.MatchRegexp, "foo", "bar.*"))
require.Equal(t, expSeries, series)
}
checkHistograms := func() {
q, err := NewBlockQuerier(head, math.MinInt64, math.MaxInt64)
require.NoError(t, err)
series := query(t, q, labels.MustNewMatcher(labels.MatchRegexp, "hist", "baz.*"))
require.Equal(t, expHist, series)
}
checkFloatHistograms := func() {
q, err := NewBlockQuerier(head, math.MinInt64, math.MaxInt64)
require.NoError(t, err)
series := query(t, q, labels.MustNewMatcher(labels.MatchRegexp, "floathist", "bat.*"))
require.Equal(t, expFloatHist, series)
}
checkTombstones := func() {
tr, err := head.Tombstones()
require.NoError(t, err)
@ -3301,6 +3317,8 @@ func TestChunkSnapshot(t *testing.T) {
require.NoError(t, head.Init(math.MinInt64))
checkSamples()
checkHistograms()
checkFloatHistograms()
checkTombstones()
checkExemplars()
}
@ -3311,6 +3329,11 @@ func TestChunkSnapshot(t *testing.T) {
for i := 1; i <= numSeries; i++ {
lbls := labels.FromStrings("foo", fmt.Sprintf("bar%d", i))
lblStr := lbls.String()
lblsHist := labels.FromStrings("hist", fmt.Sprintf("baz%d", i))
lblsHistStr := lblsHist.String()
lblsFloatHist := labels.FromStrings("floathist", fmt.Sprintf("bat%d", i))
lblsFloatHistStr := lblsFloatHist.String()
// 240 samples should m-map at least 1 chunk.
for ts := int64(1); ts <= 240; ts++ {
val := rand.Float64()
@ -3318,6 +3341,16 @@ func TestChunkSnapshot(t *testing.T) {
ref, err := app.Append(0, lbls, ts, val)
require.NoError(t, err)
hist := histograms[int(ts)]
expHist[lblsHistStr] = append(expHist[lblsHistStr], sample{ts, 0, hist, nil})
_, err = app.AppendHistogram(0, lblsHist, ts, hist, nil)
require.NoError(t, err)
floatHist := floatHistogram[int(ts)]
expFloatHist[lblsFloatHistStr] = append(expFloatHist[lblsFloatHistStr], sample{ts, 0, nil, floatHist})
_, err = app.AppendHistogram(0, lblsFloatHist, ts, nil, floatHist)
require.NoError(t, err)
// Add an exemplar and to create multiple WAL records.
if ts%10 == 0 {
addExemplar(app, ref, lbls, ts)
@ -3371,6 +3404,11 @@ func TestChunkSnapshot(t *testing.T) {
for i := 1; i <= numSeries; i++ {
lbls := labels.FromStrings("foo", fmt.Sprintf("bar%d", i))
lblStr := lbls.String()
lblsHist := labels.FromStrings("hist", fmt.Sprintf("baz%d", i))
lblsHistStr := lblsHist.String()
lblsFloatHist := labels.FromStrings("floathist", fmt.Sprintf("bat%d", i))
lblsFloatHistStr := lblsFloatHist.String()
// 240 samples should m-map at least 1 chunk.
for ts := int64(241); ts <= 480; ts++ {
val := rand.Float64()
@ -3378,6 +3416,16 @@ func TestChunkSnapshot(t *testing.T) {
ref, err := app.Append(0, lbls, ts, val)
require.NoError(t, err)
hist := histograms[int(ts)]
expHist[lblsHistStr] = append(expHist[lblsHistStr], sample{ts, 0, hist, nil})
_, err = app.AppendHistogram(0, lblsHist, ts, hist, nil)
require.NoError(t, err)
floatHist := floatHistogram[int(ts)]
expFloatHist[lblsFloatHistStr] = append(expFloatHist[lblsFloatHistStr], sample{ts, 0, nil, floatHist})
_, err = app.AppendHistogram(0, lblsFloatHist, ts, nil, floatHist)
require.NoError(t, err)
// Add an exemplar and to create multiple WAL records.
if ts%10 == 0 {
addExemplar(app, ref, lbls, ts)
@ -3468,6 +3516,19 @@ func TestSnapshotError(t *testing.T) {
lbls := labels.FromStrings("foo", "bar")
_, err := app.Append(0, lbls, 99, 99)
require.NoError(t, err)
// Add histograms
hist := tsdbutil.GenerateTestGaugeHistograms(1)[0]
floatHist := tsdbutil.GenerateTestGaugeFloatHistograms(1)[0]
lblsHist := labels.FromStrings("hist", "bar")
lblsFloatHist := labels.FromStrings("floathist", "bar")
_, err = app.AppendHistogram(0, lblsHist, 99, hist, nil)
require.NoError(t, err)
_, err = app.AppendHistogram(0, lblsFloatHist, 99, nil, floatHist)
require.NoError(t, err)
require.NoError(t, app.Commit())
// Add some tombstones.

View file

@ -943,10 +943,12 @@ const (
)
type chunkSnapshotRecord struct {
ref chunks.HeadSeriesRef
lset labels.Labels
mc *memChunk
lastValue float64
ref chunks.HeadSeriesRef
lset labels.Labels
mc *memChunk
lastValue float64
lastHistogramValue *histogram.Histogram
lastFloatHistogramValue *histogram.FloatHistogram
}
func (s *memSeries) encodeToSnapshotRecord(b []byte) []byte {
@ -961,18 +963,27 @@ func (s *memSeries) encodeToSnapshotRecord(b []byte) []byte {
if s.headChunk == nil {
buf.PutUvarint(0)
} else {
enc := s.headChunk.chunk.Encoding()
buf.PutUvarint(1)
buf.PutBE64int64(s.headChunk.minTime)
buf.PutBE64int64(s.headChunk.maxTime)
buf.PutByte(byte(s.headChunk.chunk.Encoding()))
buf.PutByte(byte(enc))
buf.PutUvarintBytes(s.headChunk.chunk.Bytes())
// Backwards compatibility for old sampleBuf which had last 4 samples.
for i := 0; i < 3; i++ {
switch enc {
case chunkenc.EncXOR:
// Backwards compatibility for old sampleBuf which had last 4 samples.
for i := 0; i < 3; i++ {
buf.PutBE64int64(0)
buf.PutBEFloat64(0)
}
buf.PutBE64int64(0)
buf.PutBEFloat64(0)
buf.PutBEFloat64(s.lastValue)
case chunkenc.EncHistogram:
record.EncodeHistogram(&buf, s.lastHistogramValue)
default: // chunkenc.FloatHistogram.
record.EncodeFloatHistogram(&buf, s.lastFloatHistogramValue)
}
buf.PutBE64int64(0)
buf.PutBEFloat64(s.lastValue)
}
s.Unlock()
@ -1012,13 +1023,22 @@ func decodeSeriesFromChunkSnapshot(d *record.Decoder, b []byte) (csr chunkSnapsh
}
csr.mc.chunk = chk
// Backwards-compatibility for old sampleBuf which had last 4 samples.
for i := 0; i < 3; i++ {
switch enc {
case chunkenc.EncXOR:
// Backwards-compatibility for old sampleBuf which had last 4 samples.
for i := 0; i < 3; i++ {
_ = dec.Be64int64()
_ = dec.Be64Float64()
}
_ = dec.Be64int64()
_ = dec.Be64Float64()
csr.lastValue = dec.Be64Float64()
case chunkenc.EncHistogram:
csr.lastHistogramValue = &histogram.Histogram{}
record.DecodeHistogram(&dec, csr.lastHistogramValue)
default: // chunkenc.FloatHistogram.
csr.lastFloatHistogramValue = &histogram.FloatHistogram{}
record.DecodeFloatHistogram(&dec, csr.lastFloatHistogramValue)
}
_ = dec.Be64int64()
csr.lastValue = dec.Be64Float64()
err = dec.Err()
if err != nil && len(dec.B) > 0 {
@ -1396,6 +1416,8 @@ func (h *Head) loadChunkSnapshot() (int, int, map[chunks.HeadSeriesRef]*memSerie
series.nextAt = csr.mc.maxTime // This will create a new chunk on append.
series.headChunk = csr.mc
series.lastValue = csr.lastValue
series.lastHistogramValue = csr.lastHistogramValue
series.lastFloatHistogramValue = csr.lastFloatHistogramValue
app, err := series.headChunk.chunk.Appender()
if err != nil {

View file

@ -441,49 +441,7 @@ func (d *Decoder) HistogramSamples(rec []byte, histograms []RefHistogramSample)
H: &histogram.Histogram{},
}
rh.H.CounterResetHint = histogram.CounterResetHint(dec.Byte())
rh.H.Schema = int32(dec.Varint64())
rh.H.ZeroThreshold = math.Float64frombits(dec.Be64())
rh.H.ZeroCount = dec.Uvarint64()
rh.H.Count = dec.Uvarint64()
rh.H.Sum = math.Float64frombits(dec.Be64())
l := dec.Uvarint()
if l > 0 {
rh.H.PositiveSpans = make([]histogram.Span, l)
}
for i := range rh.H.PositiveSpans {
rh.H.PositiveSpans[i].Offset = int32(dec.Varint64())
rh.H.PositiveSpans[i].Length = dec.Uvarint32()
}
l = dec.Uvarint()
if l > 0 {
rh.H.NegativeSpans = make([]histogram.Span, l)
}
for i := range rh.H.NegativeSpans {
rh.H.NegativeSpans[i].Offset = int32(dec.Varint64())
rh.H.NegativeSpans[i].Length = dec.Uvarint32()
}
l = dec.Uvarint()
if l > 0 {
rh.H.PositiveBuckets = make([]int64, l)
}
for i := range rh.H.PositiveBuckets {
rh.H.PositiveBuckets[i] = dec.Varint64()
}
l = dec.Uvarint()
if l > 0 {
rh.H.NegativeBuckets = make([]int64, l)
}
for i := range rh.H.NegativeBuckets {
rh.H.NegativeBuckets[i] = dec.Varint64()
}
DecodeHistogram(&dec, rh.H)
histograms = append(histograms, rh)
}
@ -496,6 +454,52 @@ func (d *Decoder) HistogramSamples(rec []byte, histograms []RefHistogramSample)
return histograms, nil
}
// DecodeHistogram decodes a Histogram from a byte slice.
func DecodeHistogram(buf *encoding.Decbuf, h *histogram.Histogram) {
h.CounterResetHint = histogram.CounterResetHint(buf.Byte())
h.Schema = int32(buf.Varint64())
h.ZeroThreshold = math.Float64frombits(buf.Be64())
h.ZeroCount = buf.Uvarint64()
h.Count = buf.Uvarint64()
h.Sum = math.Float64frombits(buf.Be64())
l := buf.Uvarint()
if l > 0 {
h.PositiveSpans = make([]histogram.Span, l)
}
for i := range h.PositiveSpans {
h.PositiveSpans[i].Offset = int32(buf.Varint64())
h.PositiveSpans[i].Length = buf.Uvarint32()
}
l = buf.Uvarint()
if l > 0 {
h.NegativeSpans = make([]histogram.Span, l)
}
for i := range h.NegativeSpans {
h.NegativeSpans[i].Offset = int32(buf.Varint64())
h.NegativeSpans[i].Length = buf.Uvarint32()
}
l = buf.Uvarint()
if l > 0 {
h.PositiveBuckets = make([]int64, l)
}
for i := range h.PositiveBuckets {
h.PositiveBuckets[i] = buf.Varint64()
}
l = buf.Uvarint()
if l > 0 {
h.NegativeBuckets = make([]int64, l)
}
for i := range h.NegativeBuckets {
h.NegativeBuckets[i] = buf.Varint64()
}
}
func (d *Decoder) FloatHistogramSamples(rec []byte, histograms []RefFloatHistogramSample) ([]RefFloatHistogramSample, error) {
dec := encoding.Decbuf{B: rec}
t := Type(dec.Byte())
@ -519,49 +523,7 @@ func (d *Decoder) FloatHistogramSamples(rec []byte, histograms []RefFloatHistogr
FH: &histogram.FloatHistogram{},
}
rh.FH.CounterResetHint = histogram.CounterResetHint(dec.Byte())
rh.FH.Schema = int32(dec.Varint64())
rh.FH.ZeroThreshold = dec.Be64Float64()
rh.FH.ZeroCount = dec.Be64Float64()
rh.FH.Count = dec.Be64Float64()
rh.FH.Sum = dec.Be64Float64()
l := dec.Uvarint()
if l > 0 {
rh.FH.PositiveSpans = make([]histogram.Span, l)
}
for i := range rh.FH.PositiveSpans {
rh.FH.PositiveSpans[i].Offset = int32(dec.Varint64())
rh.FH.PositiveSpans[i].Length = dec.Uvarint32()
}
l = dec.Uvarint()
if l > 0 {
rh.FH.NegativeSpans = make([]histogram.Span, l)
}
for i := range rh.FH.NegativeSpans {
rh.FH.NegativeSpans[i].Offset = int32(dec.Varint64())
rh.FH.NegativeSpans[i].Length = dec.Uvarint32()
}
l = dec.Uvarint()
if l > 0 {
rh.FH.PositiveBuckets = make([]float64, l)
}
for i := range rh.FH.PositiveBuckets {
rh.FH.PositiveBuckets[i] = dec.Be64Float64()
}
l = dec.Uvarint()
if l > 0 {
rh.FH.NegativeBuckets = make([]float64, l)
}
for i := range rh.FH.NegativeBuckets {
rh.FH.NegativeBuckets[i] = dec.Be64Float64()
}
DecodeFloatHistogram(&dec, rh.FH)
histograms = append(histograms, rh)
}
@ -574,6 +536,52 @@ func (d *Decoder) FloatHistogramSamples(rec []byte, histograms []RefFloatHistogr
return histograms, nil
}
// Decode decodes a Histogram from a byte slice.
func DecodeFloatHistogram(buf *encoding.Decbuf, fh *histogram.FloatHistogram) {
fh.CounterResetHint = histogram.CounterResetHint(buf.Byte())
fh.Schema = int32(buf.Varint64())
fh.ZeroThreshold = buf.Be64Float64()
fh.ZeroCount = buf.Be64Float64()
fh.Count = buf.Be64Float64()
fh.Sum = buf.Be64Float64()
l := buf.Uvarint()
if l > 0 {
fh.PositiveSpans = make([]histogram.Span, l)
}
for i := range fh.PositiveSpans {
fh.PositiveSpans[i].Offset = int32(buf.Varint64())
fh.PositiveSpans[i].Length = buf.Uvarint32()
}
l = buf.Uvarint()
if l > 0 {
fh.NegativeSpans = make([]histogram.Span, l)
}
for i := range fh.NegativeSpans {
fh.NegativeSpans[i].Offset = int32(buf.Varint64())
fh.NegativeSpans[i].Length = buf.Uvarint32()
}
l = buf.Uvarint()
if l > 0 {
fh.PositiveBuckets = make([]float64, l)
}
for i := range fh.PositiveBuckets {
fh.PositiveBuckets[i] = buf.Be64Float64()
}
l = buf.Uvarint()
if l > 0 {
fh.NegativeBuckets = make([]float64, l)
}
for i := range fh.NegativeBuckets {
fh.NegativeBuckets[i] = buf.Be64Float64()
}
}
// Encoder encodes series, sample, and tombstones records.
// The zero value is ready to use.
type Encoder struct{}
@ -719,41 +727,46 @@ func (e *Encoder) HistogramSamples(histograms []RefHistogramSample, b []byte) []
buf.PutVarint64(int64(h.Ref) - int64(first.Ref))
buf.PutVarint64(h.T - first.T)
buf.PutByte(byte(h.H.CounterResetHint))
buf.PutVarint64(int64(h.H.Schema))
buf.PutBE64(math.Float64bits(h.H.ZeroThreshold))
buf.PutUvarint64(h.H.ZeroCount)
buf.PutUvarint64(h.H.Count)
buf.PutBE64(math.Float64bits(h.H.Sum))
buf.PutUvarint(len(h.H.PositiveSpans))
for _, s := range h.H.PositiveSpans {
buf.PutVarint64(int64(s.Offset))
buf.PutUvarint32(s.Length)
}
buf.PutUvarint(len(h.H.NegativeSpans))
for _, s := range h.H.NegativeSpans {
buf.PutVarint64(int64(s.Offset))
buf.PutUvarint32(s.Length)
}
buf.PutUvarint(len(h.H.PositiveBuckets))
for _, b := range h.H.PositiveBuckets {
buf.PutVarint64(b)
}
buf.PutUvarint(len(h.H.NegativeBuckets))
for _, b := range h.H.NegativeBuckets {
buf.PutVarint64(b)
}
EncodeHistogram(&buf, h.H)
}
return buf.Get()
}
// EncodeHistogram encodes a Histogram into a byte slice.
func EncodeHistogram(buf *encoding.Encbuf, h *histogram.Histogram) {
buf.PutByte(byte(h.CounterResetHint))
buf.PutVarint64(int64(h.Schema))
buf.PutBE64(math.Float64bits(h.ZeroThreshold))
buf.PutUvarint64(h.ZeroCount)
buf.PutUvarint64(h.Count)
buf.PutBE64(math.Float64bits(h.Sum))
buf.PutUvarint(len(h.PositiveSpans))
for _, s := range h.PositiveSpans {
buf.PutVarint64(int64(s.Offset))
buf.PutUvarint32(s.Length)
}
buf.PutUvarint(len(h.NegativeSpans))
for _, s := range h.NegativeSpans {
buf.PutVarint64(int64(s.Offset))
buf.PutUvarint32(s.Length)
}
buf.PutUvarint(len(h.PositiveBuckets))
for _, b := range h.PositiveBuckets {
buf.PutVarint64(b)
}
buf.PutUvarint(len(h.NegativeBuckets))
for _, b := range h.NegativeBuckets {
buf.PutVarint64(b)
}
}
func (e *Encoder) FloatHistogramSamples(histograms []RefFloatHistogramSample, b []byte) []byte {
buf := encoding.Encbuf{B: b}
buf.PutByte(byte(FloatHistogramSamples))
@ -772,37 +785,42 @@ func (e *Encoder) FloatHistogramSamples(histograms []RefFloatHistogramSample, b
buf.PutVarint64(int64(h.Ref) - int64(first.Ref))
buf.PutVarint64(h.T - first.T)
buf.PutByte(byte(h.FH.CounterResetHint))
buf.PutVarint64(int64(h.FH.Schema))
buf.PutBEFloat64(h.FH.ZeroThreshold)
buf.PutBEFloat64(h.FH.ZeroCount)
buf.PutBEFloat64(h.FH.Count)
buf.PutBEFloat64(h.FH.Sum)
buf.PutUvarint(len(h.FH.PositiveSpans))
for _, s := range h.FH.PositiveSpans {
buf.PutVarint64(int64(s.Offset))
buf.PutUvarint32(s.Length)
}
buf.PutUvarint(len(h.FH.NegativeSpans))
for _, s := range h.FH.NegativeSpans {
buf.PutVarint64(int64(s.Offset))
buf.PutUvarint32(s.Length)
}
buf.PutUvarint(len(h.FH.PositiveBuckets))
for _, b := range h.FH.PositiveBuckets {
buf.PutBEFloat64(b)
}
buf.PutUvarint(len(h.FH.NegativeBuckets))
for _, b := range h.FH.NegativeBuckets {
buf.PutBEFloat64(b)
}
EncodeFloatHistogram(&buf, h.FH)
}
return buf.Get()
}
// Encode encodes the Float Histogram into a byte slice.
func EncodeFloatHistogram(buf *encoding.Encbuf, h *histogram.FloatHistogram) {
buf.PutByte(byte(h.CounterResetHint))
buf.PutVarint64(int64(h.Schema))
buf.PutBEFloat64(h.ZeroThreshold)
buf.PutBEFloat64(h.ZeroCount)
buf.PutBEFloat64(h.Count)
buf.PutBEFloat64(h.Sum)
buf.PutUvarint(len(h.PositiveSpans))
for _, s := range h.PositiveSpans {
buf.PutVarint64(int64(s.Offset))
buf.PutUvarint32(s.Length)
}
buf.PutUvarint(len(h.NegativeSpans))
for _, s := range h.NegativeSpans {
buf.PutVarint64(int64(s.Offset))
buf.PutUvarint32(s.Length)
}
buf.PutUvarint(len(h.PositiveBuckets))
for _, b := range h.PositiveBuckets {
buf.PutBEFloat64(b)
}
buf.PutUvarint(len(h.NegativeBuckets))
for _, b := range h.NegativeBuckets {
buf.PutBEFloat64(b)
}
}