diff --git a/tsdb/agent/db.go b/tsdb/agent/db.go index 5067edc3ae..bcfc7be129 100644 --- a/tsdb/agent/db.go +++ b/tsdb/agent/db.go @@ -1154,40 +1154,35 @@ func (a *appender) log() error { } if len(a.pendingHistograms) > 0 { - buf1, buf2 := encoder.HistogramSamples(a.pendingHistograms, buf) - //buf = append(buf1, buf2...) - //if err := a.wal.Log(buf); err != nil { - // return err - //} - if len(buf1) > 0 { - buf = buf1[:0] - if err := a.wal.Log(buf1); err != nil { - return err - } + var customBucketsExist bool + buf, customBucketsExist = encoder.HistogramSamples(a.pendingHistograms, buf) + if err := a.wal.Log(buf); err != nil { + return err } - if len(buf2) > 0 { - buf = buf2[:0] - if err := a.wal.Log(buf2); err != nil { + buf = buf[:0] + if customBucketsExist { + buf = encoder.CustomBucketHistogramSamples(a.pendingHistograms, buf) + if err := a.wal.Log(buf); err != nil { return err } + buf = buf[:0] } } if len(a.pendingFloatHistograms) > 0 { - buf1, buf2 := encoder.FloatHistogramSamples(a.pendingFloatHistograms, buf) - if len(buf1) > 0 { - buf = buf1[:0] - if err := a.wal.Log(buf1); err != nil { + var customBucketsExist bool + buf, customBucketsExist = encoder.FloatHistogramSamples(a.pendingFloatHistograms, buf) + if err := a.wal.Log(buf); err != nil { + return err + } + buf = buf[:0] + if customBucketsExist { + buf = encoder.CustomBucketFloatHistogramSamples(a.pendingFloatHistograms, buf) + if err := a.wal.Log(buf); err != nil { return err } + buf = buf[:0] } - if len(buf2) > 0 { - buf = buf2[:0] - if err := a.wal.Log(buf2); err != nil { - return err - } - } - //buf = buf[:0] } if len(a.pendingExamplars) > 0 { diff --git a/tsdb/agent/db_test.go b/tsdb/agent/db_test.go index 5332c61cdb..6b5d9ece05 100644 --- a/tsdb/agent/db_test.go +++ b/tsdb/agent/db_test.go @@ -163,6 +163,18 @@ func TestCommit(t *testing.T) { } } + lbls = labelsForTest(t.Name()+"_custom_bucket_histogram", numSeries) + for _, l := range lbls { + lset := labels.New(l...) + + customBucketHistograms := tsdbutil.GenerateTestCustomBucketsHistograms(numHistograms) + + for i := 0; i < numHistograms; i++ { + _, err := app.AppendHistogram(0, lset, int64(i), customBucketHistograms[i], nil) + require.NoError(t, err) + } + } + lbls = labelsForTest(t.Name()+"_float_histogram", numSeries) for _, l := range lbls { lset := labels.New(l...) @@ -175,6 +187,18 @@ func TestCommit(t *testing.T) { } } + lbls = labelsForTest(t.Name()+"custom_bucket_float_histogram", numSeries) + for _, l := range lbls { + lset := labels.New(l...) + + customBucketFloatHistograms := tsdbutil.GenerateTestCustomBucketsFloatHistograms(numHistograms) + + for i := 0; i < numHistograms; i++ { + _, err := app.AppendHistogram(0, lset, int64(i), nil, customBucketFloatHistograms[i]) + require.NoError(t, err) + } + } + require.NoError(t, app.Commit()) require.NoError(t, s.Close()) @@ -230,11 +254,11 @@ func TestCommit(t *testing.T) { } // Check that the WAL contained the same number of committed series/samples/exemplars. - require.Equal(t, numSeries*3, walSeriesCount, "unexpected number of series") + require.Equal(t, numSeries*5, walSeriesCount, "unexpected number of series") require.Equal(t, numSeries*numDatapoints, walSamplesCount, "unexpected number of samples") require.Equal(t, numSeries*numDatapoints, walExemplarsCount, "unexpected number of exemplars") - require.Equal(t, numSeries*numHistograms, walHistogramCount, "unexpected number of histograms") - require.Equal(t, numSeries*numHistograms, walFloatHistogramCount, "unexpected number of float histograms") + require.Equal(t, numSeries*numHistograms*2, walHistogramCount, "unexpected number of histograms") + require.Equal(t, numSeries*numHistograms*2, walFloatHistogramCount, "unexpected number of float histograms") } func TestRollback(t *testing.T) { diff --git a/tsdb/head_append.go b/tsdb/head_append.go index 3701a57135..78b256fee3 100644 --- a/tsdb/head_append.go +++ b/tsdb/head_append.go @@ -942,45 +942,30 @@ func (a *headAppender) log() error { } } if len(a.histograms) > 0 { - rec1, rec2 := enc.HistogramSamples(a.histograms, buf) - //rec = append(rec1, rec2...) - // - //buf = rec[:0] - // - //if err := a.head.wal.Log(rec); err != nil { - // return fmt.Errorf("log samples: %w", err) - //} - if len(rec1) != 0 { - buf = rec1[:0] - if err := a.head.wal.Log(rec1); err != nil { - return fmt.Errorf("log histograms: %w", err) - } + rec, customBucketsExist := enc.HistogramSamples(a.histograms, buf) + buf = rec[:0] + if err := a.head.wal.Log(rec); err != nil { + return fmt.Errorf("log histograms: %w", err) } - if len(rec2) != 0 { - buf = rec2[:0] - if err := a.head.wal.Log(rec2); err != nil { + + if customBucketsExist { + enc.CustomBucketHistogramSamples(a.histograms, buf) + buf = rec[:0] + if err := a.head.wal.Log(rec); err != nil { return fmt.Errorf("log custom bucket histograms: %w", err) } } } if len(a.floatHistograms) > 0 { - rec1, rec2 := enc.FloatHistogramSamples(a.floatHistograms, buf) - //rec = append(rec1, rec2...) - // - //buf = rec[:0] - // - //if err := a.head.wal.Log(rec); err != nil { - // return fmt.Errorf("log samples: %w", err) - //} - if len(rec1) != 0 { - buf = rec1[:0] - if err := a.head.wal.Log(rec1); err != nil { - return fmt.Errorf("log float histograms: %w", err) - } + rec, customBucketsExist := enc.FloatHistogramSamples(a.floatHistograms, buf) + buf = rec[:0] + if err := a.head.wal.Log(rec); err != nil { + return fmt.Errorf("log float histograms: %w", err) } - if len(rec2) != 0 { - buf = rec2[:0] - if err := a.head.wal.Log(rec2); err != nil { + + if customBucketsExist { + buf = rec[:0] + if err := a.head.wal.Log(rec); err != nil { return fmt.Errorf("log custom bucket float histograms: %w", err) } } diff --git a/tsdb/record/record.go b/tsdb/record/record.go index 15f5053d53..2b19cdbb6f 100644 --- a/tsdb/record/record.go +++ b/tsdb/record/record.go @@ -193,10 +193,6 @@ type RefFloatHistogramSample struct { FH *histogram.FloatHistogram } -type RefCustomBucketHistogramSample struct { - RefHistogramSample -} - // RefMmapMarker marks that the all the samples of the given series until now have been m-mapped to disk. type RefMmapMarker struct { Ref chunks.HeadSeriesRef @@ -748,15 +744,12 @@ func (e *Encoder) MmapMarkers(markers []RefMmapMarker, b []byte) []byte { return buf.Get() } -func (e *Encoder) HistogramSamples(histograms []RefHistogramSample, b []byte) ([]byte, []byte) { +func (e *Encoder) HistogramSamples(histograms []RefHistogramSample, b []byte) ([]byte, bool) { buf := encoding.Encbuf{B: b} buf.PutByte(byte(HistogramSamples)) - customBucketHistBuf := encoding.Encbuf{B: b} - customBucketHistBuf.PutByte(byte(CustomBucketHistogramSamples)) - if len(histograms) == 0 { - return buf.Get(), customBucketHistBuf.Get() + return buf.Get(), false } // Store base timestamp and base reference number of first histogram. @@ -765,34 +758,46 @@ func (e *Encoder) HistogramSamples(histograms []RefHistogramSample, b []byte) ([ buf.PutBE64(uint64(first.Ref)) buf.PutBE64int64(first.T) - customBucketHistBuf.PutBE64(uint64(first.Ref)) - customBucketHistBuf.PutBE64int64(first.T) - - histsAdded := 0 - customBucketHistsAdded := 0 + customBucketSamplesExist := false for _, h := range histograms { if h.H.UsesCustomBuckets() { - customBucketHistBuf.PutVarint64(int64(h.Ref) - int64(first.Ref)) - customBucketHistBuf.PutVarint64(h.T - first.T) + customBucketSamplesExist = true + continue + } - EncodeHistogram(&customBucketHistBuf, h.H) - customBucketHistsAdded++ - } else { + buf.PutVarint64(int64(h.Ref) - int64(first.Ref)) + buf.PutVarint64(h.T - first.T) + + EncodeHistogram(&buf, h.H) + } + + return buf.Get(), customBucketSamplesExist +} + +func (e *Encoder) CustomBucketHistogramSamples(histograms []RefHistogramSample, b []byte) []byte { + buf := encoding.Encbuf{B: b} + buf.PutByte(byte(CustomBucketHistogramSamples)) + + if len(histograms) == 0 { + return buf.Get() + } + + // Store base timestamp and base reference number of first histogram. + // All histograms encode their timestamp and ref as delta to those. + first := histograms[0] + buf.PutBE64(uint64(first.Ref)) + buf.PutBE64int64(first.T) + + for _, h := range histograms { + if h.H.UsesCustomBuckets() { buf.PutVarint64(int64(h.Ref) - int64(first.Ref)) buf.PutVarint64(h.T - first.T) EncodeHistogram(&buf, h.H) - histsAdded++ } } - if customBucketHistsAdded == 0 { - customBucketHistBuf.Reset() - } else if histsAdded == 0 { - buf.Reset() - } - - return buf.Get(), customBucketHistBuf.Get() + return buf.Get() } // EncodeHistogram encodes a Histogram into a byte slice. @@ -836,15 +841,12 @@ func EncodeHistogram(buf *encoding.Encbuf, h *histogram.Histogram) { } } -func (e *Encoder) FloatHistogramSamples(histograms []RefFloatHistogramSample, b []byte) ([]byte, []byte) { +func (e *Encoder) FloatHistogramSamples(histograms []RefFloatHistogramSample, b []byte) ([]byte, bool) { buf := encoding.Encbuf{B: b} buf.PutByte(byte(FloatHistogramSamples)) - customBucketHistBuf := encoding.Encbuf{B: b} - customBucketHistBuf.PutByte(byte(CustomBucketFloatHistogramSamples)) - if len(histograms) == 0 { - return buf.Get(), customBucketHistBuf.Get() + return buf.Get(), false } // Store base timestamp and base reference number of first histogram. @@ -853,34 +855,46 @@ func (e *Encoder) FloatHistogramSamples(histograms []RefFloatHistogramSample, b buf.PutBE64(uint64(first.Ref)) buf.PutBE64int64(first.T) - customBucketHistBuf.PutBE64(uint64(first.Ref)) - customBucketHistBuf.PutBE64int64(first.T) - - histsAdded := 0 - customBucketHistsAdded := 0 + customBucketsExist := false for _, h := range histograms { if h.FH.UsesCustomBuckets() { - customBucketHistBuf.PutVarint64(int64(h.Ref) - int64(first.Ref)) - customBucketHistBuf.PutVarint64(h.T - first.T) + customBucketsExist = true + continue + } - EncodeFloatHistogram(&customBucketHistBuf, h.FH) - customBucketHistsAdded++ - } else { + buf.PutVarint64(int64(h.Ref) - int64(first.Ref)) + buf.PutVarint64(h.T - first.T) + + EncodeFloatHistogram(&buf, h.FH) + } + + return buf.Get(), customBucketsExist +} + +func (e *Encoder) CustomBucketFloatHistogramSamples(histograms []RefFloatHistogramSample, b []byte) []byte { + buf := encoding.Encbuf{B: b} + buf.PutByte(byte(CustomBucketFloatHistogramSamples)) + + if len(histograms) == 0 { + return buf.Get() + } + + // Store base timestamp and base reference number of first histogram. + // All histograms encode their timestamp and ref as delta to those. + first := histograms[0] + buf.PutBE64(uint64(first.Ref)) + buf.PutBE64int64(first.T) + + for _, h := range histograms { + if h.FH.UsesCustomBuckets() { buf.PutVarint64(int64(h.Ref) - int64(first.Ref)) buf.PutVarint64(h.T - first.T) EncodeFloatHistogram(&buf, h.FH) - histsAdded++ } } - if customBucketHistsAdded == 0 { - customBucketHistBuf.Reset() - } else if histsAdded == 0 { - buf.Reset() - } - - return buf.Get(), customBucketHistBuf.Get() + return buf.Get() } // EncodeFloatHistogram encodes the Float Histogram into a byte slice. diff --git a/tsdb/record/record_test.go b/tsdb/record/record_test.go index 67c7eab970..af94f2b207 100644 --- a/tsdb/record/record_test.go +++ b/tsdb/record/record_test.go @@ -166,12 +166,12 @@ func TestRecord_EncodeDecode(t *testing.T) { }, } - histSamples, customBucketHistSamples := enc.HistogramSamples(histograms, nil) + histSamples, _ := enc.HistogramSamples(histograms, nil) + customBucketHistSamples := enc.CustomBucketHistogramSamples(histograms, nil) decHistograms, err := dec.HistogramSamples(histSamples, nil) require.NoError(t, err) - decCustomBucketHistograms, err := dec.HistogramSamples(customBucketHistSamples, nil) - require.NoError(t, err) - decHistograms = append(decHistograms, decCustomBucketHistograms...) + decCustomBucketHistSamples, err := dec.HistogramSamples(customBucketHistSamples, nil) + decHistograms = append(decHistograms, decCustomBucketHistSamples...) require.Equal(t, histograms, decHistograms) floatHistograms := make([]RefFloatHistogramSample, len(histograms)) @@ -182,10 +182,12 @@ func TestRecord_EncodeDecode(t *testing.T) { FH: h.H.ToFloat(nil), } } - histSamples, customBucketFloatHistSamples := enc.FloatHistogramSamples(floatHistograms, nil) - decFloatHistograms, err := dec.FloatHistogramSamples(histSamples, nil) + floatHistSamples, _ := enc.FloatHistogramSamples(floatHistograms, nil) + customBucketFloatHistSamples := enc.CustomBucketFloatHistogramSamples(floatHistograms, nil) + decFloatHistograms, err := dec.FloatHistogramSamples(floatHistSamples, nil) require.NoError(t, err) decCustomBucketFloatHistograms, err := dec.FloatHistogramSamples(customBucketFloatHistSamples, nil) + require.NoError(t, err) decFloatHistograms = append(decFloatHistograms, decCustomBucketFloatHistograms...) require.Equal(t, floatHistograms, decFloatHistograms) @@ -194,7 +196,8 @@ func TestRecord_EncodeDecode(t *testing.T) { histograms[i].H.CounterResetHint = histogram.GaugeType } - gaugeHistSamples, customBucketGaugeHistSamples := enc.HistogramSamples(histograms, nil) + gaugeHistSamples, _ := enc.HistogramSamples(histograms, nil) + customBucketGaugeHistSamples := enc.CustomBucketHistogramSamples(histograms, nil) decGaugeHistograms, err := dec.HistogramSamples(gaugeHistSamples, nil) require.NoError(t, err) decCustomBucketGaugeHistograms, err := dec.HistogramSamples(customBucketGaugeHistSamples, nil) @@ -207,10 +210,12 @@ func TestRecord_EncodeDecode(t *testing.T) { floatHistograms[i].FH.CounterResetHint = histogram.GaugeType } - gaugeHistSamples, customBucketGaugeFloatHistSamples := enc.FloatHistogramSamples(floatHistograms, nil) - decGaugeFloatHistograms, err := dec.FloatHistogramSamples(gaugeHistSamples, nil) + gaugeFloatHistSamples, _ := enc.FloatHistogramSamples(floatHistograms, nil) + customBucketGaugeFloatHistSamples := enc.CustomBucketFloatHistogramSamples(floatHistograms, nil) + decGaugeFloatHistograms, err := dec.FloatHistogramSamples(gaugeFloatHistSamples, nil) require.NoError(t, err) decCustomBucketGaugeFloatHistograms, err := dec.FloatHistogramSamples(customBucketGaugeFloatHistSamples, nil) + require.NoError(t, err) decFloatHistograms = append(decGaugeFloatHistograms, decCustomBucketGaugeFloatHistograms...) require.Equal(t, floatHistograms, decFloatHistograms) } @@ -295,10 +300,27 @@ func TestRecord_Corrupted(t *testing.T) { PositiveBuckets: []int64{1, 1, -1, 0}, }, }, + { + Ref: 67, + T: 5678, + H: &histogram.Histogram{ + Count: 8, + ZeroThreshold: 0.001, + Sum: 35.5, + Schema: -53, + PositiveSpans: []histogram.Span{ + {Offset: 0, Length: 2}, + {Offset: 2, Length: 2}, + }, + PositiveBuckets: []int64{2, -1, 2, 0}, + CustomValues: []float64{0, 2, 4, 6, 8}, + }, + }, } - corruptedHists, corruptedCustomBucketHists := enc.HistogramSamples(histograms, nil) + corruptedHists, _ := enc.HistogramSamples(histograms, nil) corruptedHists = corruptedHists[:8] + corruptedCustomBucketHists := enc.CustomBucketHistogramSamples(histograms, nil) corruptedCustomBucketHists = corruptedCustomBucketHists[:8] _, err := dec.HistogramSamples(corruptedHists, nil) require.ErrorIs(t, err, encoding.ErrInvalidSize) @@ -361,9 +383,10 @@ func TestRecord_Type(t *testing.T) { }, }, } - hists, customBucketHists := enc.HistogramSamples(histograms, nil) + hists, _ := enc.HistogramSamples(histograms, nil) recordType = dec.Type(hists) require.Equal(t, HistogramSamples, recordType) + customBucketHists := enc.CustomBucketHistogramSamples(histograms, nil) recordType = dec.Type(customBucketHists) require.Equal(t, CustomBucketHistogramSamples, recordType) diff --git a/tsdb/wlog/checkpoint.go b/tsdb/wlog/checkpoint.go index cd82676da9..5bb79595b9 100644 --- a/tsdb/wlog/checkpoint.go +++ b/tsdb/wlog/checkpoint.go @@ -238,7 +238,7 @@ func Checkpoint(logger *slog.Logger, w *WL, from, to int, keep func(id chunks.He } } if len(repl) > 0 { - _, buf = enc.HistogramSamples(repl, buf) + buf = enc.CustomBucketHistogramSamples(repl, buf) } stats.TotalSamples += len(histogramSamples) stats.DroppedSamples += len(histogramSamples) - len(repl) @@ -272,7 +272,7 @@ func Checkpoint(logger *slog.Logger, w *WL, from, to int, keep func(id chunks.He } } if len(repl) > 0 { - _, buf = enc.FloatHistogramSamples(repl, buf) + buf = enc.CustomBucketFloatHistogramSamples(repl, buf) } stats.TotalSamples += len(floatHistogramSamples) stats.DroppedSamples += len(floatHistogramSamples) - len(repl) diff --git a/tsdb/wlog/checkpoint_test.go b/tsdb/wlog/checkpoint_test.go index a5692a9aa4..f947f28095 100644 --- a/tsdb/wlog/checkpoint_test.go +++ b/tsdb/wlog/checkpoint_test.go @@ -127,6 +127,20 @@ func TestCheckpoint(t *testing.T) { PositiveBuckets: []int64{int64(i + 1), 1, -1, 0}, } } + makeCustomBucketHistogram := func(i int) *histogram.Histogram { + return &histogram.Histogram{ + Count: 5 + uint64(i*4), + ZeroCount: 2 + uint64(i), + ZeroThreshold: 0.001, + Sum: 18.4 * float64(i+1), + Schema: -53, + PositiveSpans: []histogram.Span{ + {Offset: 0, Length: 2}, + {Offset: 1, Length: 2}, + }, + CustomValues: []float64{0, 1, 2, 3, 4}, + } + } makeFloatHistogram := func(i int) *histogram.FloatHistogram { return &histogram.FloatHistogram{ Count: 5 + float64(i*4), @@ -141,6 +155,20 @@ func TestCheckpoint(t *testing.T) { PositiveBuckets: []float64{float64(i + 1), 1, -1, 0}, } } + makeCustomBucketFloatHistogram := func(i int) *histogram.FloatHistogram { + return &histogram.FloatHistogram{ + Count: 5 + float64(i*4), + ZeroCount: 2 + float64(i), + ZeroThreshold: 0.001, + Sum: 18.4 * float64(i+1), + Schema: -53, + PositiveSpans: []histogram.Span{ + {Offset: 0, Length: 2}, + {Offset: 1, Length: 2}, + }, + CustomValues: []float64{0, 1, 2, 3, 4}, + } + } for _, compress := range []CompressionType{CompressionNone, CompressionSnappy, CompressionZstd} { t.Run(fmt.Sprintf("compress=%s", compress), func(t *testing.T) { @@ -208,24 +236,40 @@ func TestCheckpoint(t *testing.T) { require.NoError(t, w.Log(b)) samplesInWAL += 4 h := makeHistogram(i) - b1, b2 := enc.HistogramSamples([]record.RefHistogramSample{ + b, _ = enc.HistogramSamples([]record.RefHistogramSample{ {Ref: 0, T: last, H: h}, {Ref: 1, T: last + 10000, H: h}, {Ref: 2, T: last + 20000, H: h}, {Ref: 3, T: last + 30000, H: h}, }, nil) - require.NoError(t, w.Log(b1)) - require.NoError(t, w.Log(b2)) + require.NoError(t, w.Log(b)) + histogramsInWAL += 4 + cbh := makeCustomBucketHistogram(i) + b = enc.CustomBucketHistogramSamples([]record.RefHistogramSample{ + {Ref: 0, T: last, H: cbh}, + {Ref: 1, T: last + 10000, H: cbh}, + {Ref: 2, T: last + 20000, H: cbh}, + {Ref: 3, T: last + 30000, H: cbh}, + }, nil) + require.NoError(t, w.Log(b)) histogramsInWAL += 4 fh := makeFloatHistogram(i) - b1, b2 = enc.FloatHistogramSamples([]record.RefFloatHistogramSample{ + b, _ = enc.FloatHistogramSamples([]record.RefFloatHistogramSample{ {Ref: 0, T: last, FH: fh}, {Ref: 1, T: last + 10000, FH: fh}, {Ref: 2, T: last + 20000, FH: fh}, {Ref: 3, T: last + 30000, FH: fh}, }, nil) - require.NoError(t, w.Log(b1)) - require.NoError(t, w.Log(b2)) + require.NoError(t, w.Log(b)) + floatHistogramsInWAL += 4 + cbfh := makeCustomBucketFloatHistogram(i) + b = enc.CustomBucketFloatHistogramSamples([]record.RefFloatHistogramSample{ + {Ref: 0, T: last, FH: cbfh}, + {Ref: 1, T: last + 10000, FH: cbfh}, + {Ref: 2, T: last + 20000, FH: cbfh}, + {Ref: 3, T: last + 30000, FH: cbfh}, + }, nil) + require.NoError(t, w.Log(b)) floatHistogramsInWAL += 4 b = enc.Exemplars([]record.RefExemplar{ @@ -286,14 +330,14 @@ func TestCheckpoint(t *testing.T) { require.GreaterOrEqual(t, s.T, last/2, "sample with wrong timestamp") } samplesInCheckpoint += len(samples) - case record.HistogramSamples: + case record.HistogramSamples, record.CustomBucketHistogramSamples: histograms, err := dec.HistogramSamples(rec, nil) require.NoError(t, err) for _, h := range histograms { require.GreaterOrEqual(t, h.T, last/2, "histogram with wrong timestamp") } histogramsInCheckpoint += len(histograms) - case record.FloatHistogramSamples: + case record.FloatHistogramSamples, record.CustomBucketFloatHistogramSamples: floatHistograms, err := dec.FloatHistogramSamples(rec, nil) require.NoError(t, err) for _, h := range floatHistograms { diff --git a/tsdb/wlog/watcher.go b/tsdb/wlog/watcher.go index 89db5d2dd7..169bd296fe 100644 --- a/tsdb/wlog/watcher.go +++ b/tsdb/wlog/watcher.go @@ -546,7 +546,7 @@ func (w *Watcher) readSegment(r *LiveReader, segmentNum int, tail bool) error { } w.writer.AppendExemplars(exemplars) - case record.HistogramSamples: + case record.HistogramSamples, record.CustomBucketHistogramSamples: // Skip if experimental "histograms over remote write" is not enabled. if !w.sendHistograms { break @@ -574,7 +574,7 @@ func (w *Watcher) readSegment(r *LiveReader, segmentNum int, tail bool) error { histogramsToSend = histogramsToSend[:0] } - case record.FloatHistogramSamples: + case record.FloatHistogramSamples, record.CustomBucketFloatHistogramSamples: // Skip if experimental "histograms over remote write" is not enabled. if !w.sendHistograms { break diff --git a/tsdb/wlog/watcher_test.go b/tsdb/wlog/watcher_test.go index c2499a7cec..5ff70bb215 100644 --- a/tsdb/wlog/watcher_test.go +++ b/tsdb/wlog/watcher_test.go @@ -209,21 +209,43 @@ func TestTailSamples(t *testing.T) { NegativeBuckets: []int64{int64(-i) - 1}, } - histogram, customBucketHistogram := enc.HistogramSamples([]record.RefHistogramSample{{ + histograms, _ := enc.HistogramSamples([]record.RefHistogramSample{{ Ref: chunks.HeadSeriesRef(inner), T: now.UnixNano() + 1, H: hist, }}, nil) - require.NoError(t, w.Log(histogram)) - require.NoError(t, w.Log(customBucketHistogram)) + require.NoError(t, w.Log(histograms)) - floatHistogram, floatCustomBucketHistogram := enc.FloatHistogramSamples([]record.RefFloatHistogramSample{{ + customBucketHist := &histogram.Histogram{ + Schema: -53, + ZeroThreshold: 1e-128, + ZeroCount: 0, + Count: 2, + Sum: 0, + PositiveSpans: []histogram.Span{{Offset: 0, Length: 1}}, + CustomValues: []float64{float64(i) + 2}, + } + + customBucketHistograms := enc.CustomBucketHistogramSamples([]record.RefHistogramSample{{ + Ref: chunks.HeadSeriesRef(inner), + T: now.UnixNano() + 1, + H: customBucketHist, + }}, nil) + require.NoError(t, w.Log(customBucketHistograms)) + + floatHistograms, _ := enc.FloatHistogramSamples([]record.RefFloatHistogramSample{{ Ref: chunks.HeadSeriesRef(inner), T: now.UnixNano() + 1, FH: hist.ToFloat(nil), }}, nil) - require.NoError(t, w.Log(floatHistogram)) - require.NoError(t, w.Log(floatCustomBucketHistogram)) + require.NoError(t, w.Log(floatHistograms)) + + customBucketFloatHistograms := enc.CustomBucketFloatHistogramSamples([]record.RefFloatHistogramSample{{ + Ref: chunks.HeadSeriesRef(inner), + T: now.UnixNano() + 1, + FH: customBucketHist.ToFloat(nil), + }}, nil) + require.NoError(t, w.Log(customBucketFloatHistograms)) } } @@ -250,7 +272,7 @@ func TestTailSamples(t *testing.T) { expectedSeries := seriesCount expectedSamples := seriesCount * samplesCount expectedExemplars := seriesCount * exemplarsCount - expectedHistograms := seriesCount * histogramsCount + expectedHistograms := seriesCount * histogramsCount * 2 retry(t, defaultRetryInterval, defaultRetries, func() bool { return wt.checkNumSeries() >= expectedSeries })