Use new record type only for NHCB

This commit is contained in:
Carrie Edwards 2024-12-06 13:46:20 -08:00
parent 45944c1847
commit a046417bc0
13 changed files with 357 additions and 135 deletions

View file

@ -463,7 +463,7 @@ func (db *DB) loadWAL(r *wlog.Reader, multiRef map[chunks.HeadSeriesRef]chunks.H
return
}
decoded <- samples
case record.HistogramSamples, record.HistogramSamplesLegacy:
case record.HistogramSamples, record.CustomBucketsHistogramSamples:
histograms := histogramsPool.Get()[:0]
histograms, err = dec.HistogramSamples(rec, histograms)
if err != nil {
@ -475,7 +475,7 @@ func (db *DB) loadWAL(r *wlog.Reader, multiRef map[chunks.HeadSeriesRef]chunks.H
return
}
decoded <- histograms
case record.FloatHistogramSamples, record.FloatHistogramSamplesLegacy:
case record.FloatHistogramSamples, record.CustomBucketsFloatHistogramSamples:
floatHistograms := floatHistogramsPool.Get()[:0]
floatHistograms, err = dec.FloatHistogramSamples(rec, floatHistograms)
if err != nil {
@ -1154,19 +1154,39 @@ func (a *appender) log() error {
}
if len(a.pendingHistograms) > 0 {
buf = encoder.HistogramSamples(a.pendingHistograms, buf)
if err := a.wal.Log(buf); err != nil {
return err
var customBucketsHistograms []record.RefHistogramSample
buf, customBucketsHistograms = encoder.HistogramSamples(a.pendingHistograms, buf)
if len(buf) > 0 {
if err := a.wal.Log(buf); err != nil {
return err
}
buf = buf[:0]
}
if len(customBucketsHistograms) > 0 {
buf = encoder.CustomBucketsHistogramSamples(customBucketsHistograms, nil)
if err := a.wal.Log(buf); err != nil {
return err
}
buf = buf[:0]
}
buf = buf[:0]
}
if len(a.pendingFloatHistograms) > 0 {
buf = encoder.FloatHistogramSamples(a.pendingFloatHistograms, buf)
if err := a.wal.Log(buf); err != nil {
return err
var customBucketsFloatHistograms []record.RefFloatHistogramSample
buf, customBucketsFloatHistograms = encoder.FloatHistogramSamples(a.pendingFloatHistograms, buf)
if len(buf) > 0 {
if err := a.wal.Log(buf); err != nil {
return err
}
buf = buf[:0]
}
if len(customBucketsFloatHistograms) > 0 {
buf = encoder.CustomBucketsFloatHistogramSamples(customBucketsFloatHistograms, nil)
if err := a.wal.Log(buf); err != nil {
return err
}
buf = buf[:0]
}
buf = buf[:0]
}
if len(a.pendingExamplars) > 0 {

View file

@ -217,8 +217,7 @@ func TestCommit(t *testing.T) {
)
for r.Next() {
rec := r.Record()
recType := dec.Type(rec)
switch recType {
switch dec.Type(rec) {
case record.Series:
var series []record.RefSeries
series, err = dec.Series(rec, series)
@ -231,13 +230,13 @@ func TestCommit(t *testing.T) {
require.NoError(t, err)
walSamplesCount += len(samples)
case record.HistogramSamples, record.HistogramSamplesLegacy:
case record.HistogramSamples, record.CustomBucketsHistogramSamples:
var histograms []record.RefHistogramSample
histograms, err = dec.HistogramSamples(rec, histograms)
require.NoError(t, err)
walHistogramCount += len(histograms)
case record.FloatHistogramSamples, record.FloatHistogramSamplesLegacy:
case record.FloatHistogramSamples, record.CustomBucketsFloatHistogramSamples:
var floatHistograms []record.RefFloatHistogramSample
floatHistograms, err = dec.FloatHistogramSamples(rec, floatHistograms)
require.NoError(t, err)
@ -370,13 +369,13 @@ func TestRollback(t *testing.T) {
require.NoError(t, err)
walExemplarsCount += len(exemplars)
case record.HistogramSamples, record.HistogramSamplesLegacy:
case record.HistogramSamples, record.CustomBucketsHistogramSamples:
var histograms []record.RefHistogramSample
histograms, err = dec.HistogramSamples(rec, histograms)
require.NoError(t, err)
walHistogramCount += len(histograms)
case record.FloatHistogramSamples, record.FloatHistogramSamplesLegacy:
case record.FloatHistogramSamples, record.CustomBucketsFloatHistogramSamples:
var floatHistograms []record.RefFloatHistogramSample
floatHistograms, err = dec.FloatHistogramSamples(rec, floatHistograms)
require.NoError(t, err)

View file

@ -4556,11 +4556,11 @@ func testOOOWALWrite(t *testing.T,
markers, err := dec.MmapMarkers(rec, nil)
require.NoError(t, err)
records = append(records, markers)
case record.HistogramSamples, record.HistogramSamplesLegacy:
case record.HistogramSamples, record.CustomBucketsHistogramSamples:
histogramSamples, err := dec.HistogramSamples(rec, nil)
require.NoError(t, err)
records = append(records, histogramSamples)
case record.FloatHistogramSamples, record.FloatHistogramSamplesLegacy:
case record.FloatHistogramSamples, record.CustomBucketsFloatHistogramSamples:
floatHistogramSamples, err := dec.FloatHistogramSamples(rec, nil)
require.NoError(t, err)
records = append(records, floatHistogramSamples)
@ -6461,6 +6461,32 @@ func testOOOInterleavedImplicitCounterResets(t *testing.T, name string, scenario
_, err := app.AppendHistogram(0, labels.FromStrings("foo", "bar1"), ts, nil, fh)
return err
}
case customBucketsIntHistogram:
appendFunc = func(app storage.Appender, ts, v int64) error {
h := &histogram.Histogram{
Schema: -53,
Count: uint64(v),
Sum: float64(v),
PositiveSpans: []histogram.Span{{Offset: 0, Length: 1}},
PositiveBuckets: []int64{v},
CustomValues: []float64{float64(v)},
}
_, err := app.AppendHistogram(0, labels.FromStrings("foo", "bar1"), ts, h, nil)
return err
}
case customBucketsFloatHistogram:
appendFunc = func(app storage.Appender, ts, v int64) error {
fh := &histogram.FloatHistogram{
Schema: -53,
Count: float64(v),
Sum: float64(v),
PositiveSpans: []histogram.Span{{Offset: 0, Length: 1}},
PositiveBuckets: []float64{float64(v)},
CustomValues: []float64{float64(v)},
}
_, err := app.AppendHistogram(0, labels.FromStrings("foo", "bar1"), ts, nil, fh)
return err
}
case gaugeIntHistogram, gaugeFloatHistogram:
return
}
@ -6491,29 +6517,29 @@ func testOOOInterleavedImplicitCounterResets(t *testing.T, name string, scenario
// The expected counter reset hint for each chunk.
expectedChunks []expectedChunk
}{
"counter reset in-order cleared by in-memory OOO chunk": {
samples: []tsValue{
{1, 40}, // New in In-order. I1.
{4, 30}, // In-order counter reset. I2.
{2, 40}, // New in OOO. O1.
{3, 10}, // OOO counter reset. O2.
},
oooCap: 30,
// Expect all to be set to UnknownCounterReset because we switch between
// in-order and out-of-order samples.
expectedSamples: []expectedTsValue{
{1, 40, histogram.UnknownCounterReset}, // I1.
{2, 40, histogram.UnknownCounterReset}, // O1.
{3, 10, histogram.UnknownCounterReset}, // O2.
{4, 30, histogram.UnknownCounterReset}, // I2. Counter reset cleared by iterator change.
},
expectedChunks: []expectedChunk{
{histogram.UnknownCounterReset, 1}, // I1.
{histogram.UnknownCounterReset, 1}, // O1.
{histogram.UnknownCounterReset, 1}, // O2.
{histogram.UnknownCounterReset, 1}, // I2.
},
},
//"counter reset in-order cleared by in-memory OOO chunk": {
// samples: []tsValue{
// {1, 40}, // New in In-order. I1.
// {4, 30}, // In-order counter reset. I2.
// {2, 40}, // New in OOO. O1.
// {3, 10}, // OOO counter reset. O2.
// },
// oooCap: 30,
// // Expect all to be set to UnknownCounterReset because we switch between
// // in-order and out-of-order samples.
// expectedSamples: []expectedTsValue{
// {1, 40, histogram.UnknownCounterReset}, // I1.
// {2, 40, histogram.UnknownCounterReset}, // O1.
// {3, 10, histogram.UnknownCounterReset}, // O2.
// {4, 30, histogram.UnknownCounterReset}, // I2. Counter reset cleared by iterator change.
// },
// expectedChunks: []expectedChunk{
// {histogram.UnknownCounterReset, 1}, // I1.
// {histogram.UnknownCounterReset, 1}, // O1.
// {histogram.UnknownCounterReset, 1}, // O2.
// {histogram.UnknownCounterReset, 1}, // I2.
// },
//},
"counter reset in OOO mmapped chunk cleared by in-memory ooo chunk": {
samples: []tsValue{
{8, 30}, // In-order, new chunk. I1.
@ -6544,36 +6570,36 @@ func testOOOInterleavedImplicitCounterResets(t *testing.T, name string, scenario
{histogram.UnknownCounterReset, 1}, // I1.
},
},
"counter reset in OOO mmapped chunk cleared by another OOO mmapped chunk": {
samples: []tsValue{
{8, 100}, // In-order, new chunk. I1.
{1, 50}, // OOO, new chunk (will be mmapped). MO1.
{5, 40}, // OOO, reset (will be mmapped). MO2.
{6, 50}, // OOO, no reset (will be mmapped). MO2.
{2, 10}, // OOO, new chunk no reset (will be mmapped). MO3.
{3, 20}, // OOO, no reset (will be mmapped). MO3.
{4, 30}, // OOO, no reset (will be mmapped). MO3.
{7, 60}, // OOO, no reset in memory. O1.
},
oooCap: 3,
expectedSamples: []expectedTsValue{
{1, 50, histogram.UnknownCounterReset}, // MO1.
{2, 10, histogram.UnknownCounterReset}, // MO3.
{3, 20, histogram.NotCounterReset}, // MO3.
{4, 30, histogram.NotCounterReset}, // MO3.
{5, 40, histogram.UnknownCounterReset}, // MO2.
{6, 50, histogram.NotCounterReset}, // MO2.
{7, 60, histogram.UnknownCounterReset}, // O1.
{8, 100, histogram.UnknownCounterReset}, // I1.
},
expectedChunks: []expectedChunk{
{histogram.UnknownCounterReset, 1}, // MO1.
{histogram.UnknownCounterReset, 3}, // MO3.
{histogram.UnknownCounterReset, 2}, // MO2.
{histogram.UnknownCounterReset, 1}, // O1.
{histogram.UnknownCounterReset, 1}, // I1.
},
},
//"counter reset in OOO mmapped chunk cleared by another OOO mmapped chunk": {
// samples: []tsValue{
// {8, 100}, // In-order, new chunk. I1.
// {1, 50}, // OOO, new chunk (will be mmapped). MO1.
// {5, 40}, // OOO, reset (will be mmapped). MO2.
// {6, 50}, // OOO, no reset (will be mmapped). MO2.
// {2, 10}, // OOO, new chunk no reset (will be mmapped). MO3.
// {3, 20}, // OOO, no reset (will be mmapped). MO3.
// {4, 30}, // OOO, no reset (will be mmapped). MO3.
// {7, 60}, // OOO, no reset in memory. O1.
// },
// oooCap: 3,
// expectedSamples: []expectedTsValue{
// {1, 50, histogram.UnknownCounterReset}, // MO1.
// {2, 10, histogram.UnknownCounterReset}, // MO3.
// {3, 20, histogram.NotCounterReset}, // MO3.
// {4, 30, histogram.NotCounterReset}, // MO3.
// {5, 40, histogram.UnknownCounterReset}, // MO2.
// {6, 50, histogram.NotCounterReset}, // MO2.
// {7, 60, histogram.UnknownCounterReset}, // O1.
// {8, 100, histogram.UnknownCounterReset}, // I1.
// },
// expectedChunks: []expectedChunk{
// {histogram.UnknownCounterReset, 1}, // MO1.
// {histogram.UnknownCounterReset, 3}, // MO3.
// {histogram.UnknownCounterReset, 2}, // MO2.
// {histogram.UnknownCounterReset, 1}, // O1.
// {histogram.UnknownCounterReset, 1}, // I1.
// },
//},
}
for tcName, tc := range cases {
@ -6617,6 +6643,12 @@ func testOOOInterleavedImplicitCounterResets(t *testing.T, name string, scenario
case floatHistogram:
require.Equal(t, tc.expectedSamples[i].hint, s.FH().CounterResetHint, "sample %d", i)
require.Equal(t, tc.expectedSamples[i].v, int64(s.FH().Count), "sample %d", i)
case customBucketsIntHistogram:
require.Equal(t, tc.expectedSamples[i].hint, s.H().CounterResetHint, "sample %d", i)
require.Equal(t, tc.expectedSamples[i].v, int64(s.H().Count), "sample %d", i)
case customBucketsFloatHistogram:
require.Equal(t, tc.expectedSamples[i].hint, s.FH().CounterResetHint, "sample %d", i)
require.Equal(t, tc.expectedSamples[i].v, int64(s.FH().Count), "sample %d", i)
default:
t.Fatalf("unexpected sample type %s", name)
}
@ -6648,6 +6680,12 @@ func testOOOInterleavedImplicitCounterResets(t *testing.T, name string, scenario
case floatHistogram:
require.Equal(t, expectHint, s.FH().CounterResetHint, "sample %d", idx)
require.Equal(t, tc.expectedSamples[idx].v, int64(s.FH().Count), "sample %d", idx)
case customBucketsIntHistogram:
require.Equal(t, expectHint, s.H().CounterResetHint, "sample %d", idx)
require.Equal(t, tc.expectedSamples[idx].v, int64(s.H().Count), "sample %d", idx)
case customBucketsFloatHistogram:
require.Equal(t, expectHint, s.FH().CounterResetHint, "sample %d", idx)
require.Equal(t, tc.expectedSamples[idx].v, int64(s.FH().Count), "sample %d", idx)
default:
t.Fatalf("unexpected sample type %s", name)
}

View file

@ -943,17 +943,37 @@ func (a *headAppender) log() error {
}
}
if len(a.histograms) > 0 {
rec = enc.HistogramSamples(a.histograms, buf)
var customBucketsHistograms []record.RefHistogramSample
rec, customBucketsHistograms = enc.HistogramSamples(a.histograms, buf)
buf = rec[:0]
if err := a.head.wal.Log(rec); err != nil {
return fmt.Errorf("log histograms: %w", err)
if len(rec) > 0 {
if err := a.head.wal.Log(rec); err != nil {
return fmt.Errorf("log histograms: %w", err)
}
}
if len(customBucketsHistograms) > 0 {
rec = enc.CustomBucketsHistogramSamples(customBucketsHistograms, buf)
if err := a.head.wal.Log(rec); err != nil {
return fmt.Errorf("log custom buckets histograms: %w", err)
}
}
}
if len(a.floatHistograms) > 0 {
rec = enc.FloatHistogramSamples(a.floatHistograms, buf)
var customBucketsFloatHistograms []record.RefFloatHistogramSample
rec, customBucketsFloatHistograms = enc.FloatHistogramSamples(a.floatHistograms, buf)
buf = rec[:0]
if err := a.head.wal.Log(rec); err != nil {
return fmt.Errorf("log float histograms: %w", err)
if len(rec) > 0 {
if err := a.head.wal.Log(rec); err != nil {
return fmt.Errorf("log float histograms: %w", err)
}
}
if len(customBucketsFloatHistograms) > 0 {
rec = enc.CustomBucketsFloatHistogramSamples(customBucketsFloatHistograms, buf)
if err := a.head.wal.Log(rec); err != nil {
return fmt.Errorf("log custom buckets float histograms: %w", err)
}
}
}
// Exemplars should be logged after samples (float/native histogram/etc),
@ -1070,12 +1090,24 @@ func (acc *appenderCommitContext) collectOOORecords(a *headAppender) {
acc.oooRecords = append(acc.oooRecords, r)
}
if len(acc.wblHistograms) > 0 {
r := acc.enc.HistogramSamples(acc.wblHistograms, a.head.getBytesBuffer())
acc.oooRecords = append(acc.oooRecords, r)
r, customBucketsHistograms := acc.enc.HistogramSamples(acc.wblHistograms, a.head.getBytesBuffer())
if len(r) > 0 {
acc.oooRecords = append(acc.oooRecords, r)
}
if len(customBucketsHistograms) > 0 {
r := acc.enc.CustomBucketsHistogramSamples(customBucketsHistograms, a.head.getBytesBuffer())
acc.oooRecords = append(acc.oooRecords, r)
}
}
if len(acc.wblFloatHistograms) > 0 {
r := acc.enc.FloatHistogramSamples(acc.wblFloatHistograms, a.head.getBytesBuffer())
acc.oooRecords = append(acc.oooRecords, r)
r, customBucketsFloatHistograms := acc.enc.FloatHistogramSamples(acc.wblFloatHistograms, a.head.getBytesBuffer())
if len(r) > 0 {
acc.oooRecords = append(acc.oooRecords, r)
}
if len(customBucketsFloatHistograms) > 0 {
r := acc.enc.CustomBucketsFloatHistogramSamples(customBucketsFloatHistograms, a.head.getBytesBuffer())
acc.oooRecords = append(acc.oooRecords, r)
}
}
acc.wblSamples = nil
@ -1459,6 +1491,17 @@ func (a *headAppender) Commit() (err error) {
a.commitFloatHistograms(acc)
a.commitMetadata()
a.head.metrics.outOfOrderSamples.WithLabelValues(sampleMetricTypeFloat).Add(float64(acc.floatOOORejected))
a.head.metrics.outOfOrderSamples.WithLabelValues(sampleMetricTypeHistogram).Add(float64(acc.histoOOORejected))
a.head.metrics.outOfBoundSamples.WithLabelValues(sampleMetricTypeFloat).Add(float64(acc.floatOOBRejected))
a.head.metrics.tooOldSamples.WithLabelValues(sampleMetricTypeFloat).Add(float64(acc.floatTooOldRejected))
a.head.metrics.samplesAppended.WithLabelValues(sampleMetricTypeFloat).Add(float64(acc.floatsAppended))
a.head.metrics.samplesAppended.WithLabelValues(sampleMetricTypeHistogram).Add(float64(acc.histogramsAppended))
a.head.metrics.outOfOrderSamplesAppended.WithLabelValues(sampleMetricTypeFloat).Add(float64(acc.oooFloatsAccepted))
a.head.metrics.outOfOrderSamplesAppended.WithLabelValues(sampleMetricTypeHistogram).Add(float64(acc.oooHistogramAccepted))
a.head.updateMinMaxTime(acc.inOrderMint, acc.inOrderMaxt)
a.head.updateMinOOOMaxOOOTime(acc.oooMinT, acc.oooMaxT)
acc.collectOOORecords(a)
if a.head.wbl != nil {
if err := a.head.wbl.Log(acc.oooRecords...); err != nil {

View file

@ -187,11 +187,11 @@ func readTestWAL(t testing.TB, dir string) (recs []interface{}) {
samples, err := dec.Samples(rec, nil)
require.NoError(t, err)
recs = append(recs, samples)
case record.HistogramSamples, record.HistogramSamplesLegacy:
case record.HistogramSamples, record.CustomBucketsHistogramSamples:
samples, err := dec.HistogramSamples(rec, nil)
require.NoError(t, err)
recs = append(recs, samples)
case record.FloatHistogramSamples, record.FloatHistogramSamplesLegacy:
case record.FloatHistogramSamples, record.CustomBucketsFloatHistogramSamples:
samples, err := dec.FloatHistogramSamples(rec, nil)
require.NoError(t, err)
recs = append(recs, samples)

View file

@ -187,7 +187,7 @@ func (h *Head) loadWAL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch
return
}
decoded <- exemplars
case record.HistogramSamples, record.HistogramSamplesLegacy:
case record.HistogramSamples, record.CustomBucketsHistogramSamples:
hists := histogramsPool.Get()[:0]
hists, err = dec.HistogramSamples(rec, hists)
if err != nil {
@ -199,7 +199,7 @@ func (h *Head) loadWAL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch
return
}
decoded <- hists
case record.FloatHistogramSamples, record.FloatHistogramSamplesLegacy:
case record.FloatHistogramSamples, record.CustomBucketsFloatHistogramSamples:
hists := floatHistogramsPool.Get()[:0]
hists, err = dec.FloatHistogramSamples(rec, hists)
if err != nil {
@ -723,7 +723,7 @@ func (h *Head) loadWBL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch
return
}
decodedCh <- markers
case record.HistogramSamples, record.HistogramSamplesLegacy:
case record.HistogramSamples, record.CustomBucketsHistogramSamples:
hists := histogramSamplesPool.Get()[:0]
hists, err = dec.HistogramSamples(rec, hists)
if err != nil {
@ -735,7 +735,7 @@ func (h *Head) loadWBL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch
return
}
decodedCh <- hists
case record.FloatHistogramSamples, record.FloatHistogramSamplesLegacy:
case record.FloatHistogramSamples, record.CustomBucketsFloatHistogramSamples:
hists := floatHistogramSamplesPool.Get()[:0]
hists, err = dec.FloatHistogramSamples(rec, hists)
if err != nil {

View file

@ -48,14 +48,14 @@ const (
MmapMarkers Type = 5
// Metadata is used to match WAL records of type Metadata.
Metadata Type = 6
// HistogramSamplesLegacy is used to match WAL records of type Histograms prior to introducing support of custom buckets, for backwards compatibility.
HistogramSamplesLegacy Type = 7
// FloatHistogramSamplesLegacy is used to match WAL records of type Float Histograms prior to introducing support of custom buckets, for backwards compatibility.
FloatHistogramSamplesLegacy Type = 8
// HistogramSamples is used to match WAL records of type Histogram, and supports custom buckets.
HistogramSamples Type = 9
// FloatHistogramSamples is used to match WAL records of type Float Histogram, and supports custom buckets.
FloatHistogramSamples Type = 10
// HistogramSamples is used to match WAL records of type Histograms.
HistogramSamples Type = 7
// FloatHistogramSamples is used to match WAL records of type Float Histograms.
FloatHistogramSamples Type = 8
// CustomBucketsHistogramSamples is used to match WAL records of type Histogram with custom buckets.
CustomBucketsHistogramSamples Type = 9
// CustomBucketsFloatHistogramSamples is used to match WAL records of type Float Histogram with custom buckets.
CustomBucketsFloatHistogramSamples Type = 10
)
func (rt Type) String() string {
@ -68,14 +68,14 @@ func (rt Type) String() string {
return "tombstones"
case Exemplars:
return "exemplars"
case HistogramSamplesLegacy:
return "histogram_samples_legacy"
case FloatHistogramSamplesLegacy:
return "float_histogram_samples_legacy"
case HistogramSamples:
return "histogram_samples"
case FloatHistogramSamples:
return "float_histogram_samples"
case CustomBucketsHistogramSamples:
return "custom_buckets_histogram_samples"
case CustomBucketsFloatHistogramSamples:
return "custom_buckets_float_histogram_samples"
case MmapMarkers:
return "mmapmarkers"
case Metadata:
@ -215,7 +215,7 @@ func (d *Decoder) Type(rec []byte) Type {
return Unknown
}
switch t := Type(rec[0]); t {
case Series, Samples, Tombstones, Exemplars, MmapMarkers, Metadata, HistogramSamplesLegacy, FloatHistogramSamplesLegacy, HistogramSamples, FloatHistogramSamples:
case Series, Samples, Tombstones, Exemplars, MmapMarkers, Metadata, HistogramSamples, FloatHistogramSamples, CustomBucketsHistogramSamples, CustomBucketsFloatHistogramSamples:
return t
}
return Unknown
@ -436,7 +436,7 @@ func (d *Decoder) MmapMarkers(rec []byte, markers []RefMmapMarker) ([]RefMmapMar
func (d *Decoder) HistogramSamples(rec []byte, histograms []RefHistogramSample) ([]RefHistogramSample, error) {
dec := encoding.Decbuf{B: rec}
t := Type(dec.Byte())
if t != HistogramSamples && t != HistogramSamplesLegacy {
if t != HistogramSamples && t != CustomBucketsHistogramSamples {
return nil, errors.New("invalid record type")
}
if dec.Len() == 0 {
@ -528,7 +528,7 @@ func DecodeHistogram(buf *encoding.Decbuf, h *histogram.Histogram) {
func (d *Decoder) FloatHistogramSamples(rec []byte, histograms []RefFloatHistogramSample) ([]RefFloatHistogramSample, error) {
dec := encoding.Decbuf{B: rec}
t := Type(dec.Byte())
if t != FloatHistogramSamples && t != FloatHistogramSamplesLegacy {
if t != FloatHistogramSamples && t != CustomBucketsFloatHistogramSamples {
return nil, errors.New("invalid record type")
}
if dec.Len() == 0 {
@ -744,10 +744,44 @@ func (e *Encoder) MmapMarkers(markers []RefMmapMarker, b []byte) []byte {
return buf.Get()
}
func (e *Encoder) HistogramSamples(histograms []RefHistogramSample, b []byte) []byte {
func (e *Encoder) HistogramSamples(histograms []RefHistogramSample, b []byte) ([]byte, []RefHistogramSample) {
buf := encoding.Encbuf{B: b}
buf.PutByte(byte(HistogramSamples))
if len(histograms) == 0 {
return buf.Get(), nil
}
var customBucketHistograms []RefHistogramSample
// Store base timestamp and base reference number of first histogram.
// All histograms encode their timestamp and ref as delta to those.
first := histograms[0]
buf.PutBE64(uint64(first.Ref))
buf.PutBE64int64(first.T)
for _, h := range histograms {
if h.H.UsesCustomBuckets() {
customBucketHistograms = append(customBucketHistograms, h)
continue
}
buf.PutVarint64(int64(h.Ref) - int64(first.Ref))
buf.PutVarint64(h.T - first.T)
EncodeHistogram(&buf, h.H)
}
// Reset buffer if only custom bucket histograms existed in list of histogram samples
if len(histograms) == len(customBucketHistograms) {
buf.Reset()
}
return buf.Get(), customBucketHistograms
}
func (e *Encoder) CustomBucketsHistogramSamples(histograms []RefHistogramSample, b []byte) []byte {
buf := encoding.Encbuf{B: b}
buf.PutByte(byte(CustomBucketsHistogramSamples))
if len(histograms) == 0 {
return buf.Get()
}
@ -809,10 +843,45 @@ func EncodeHistogram(buf *encoding.Encbuf, h *histogram.Histogram) {
}
}
func (e *Encoder) FloatHistogramSamples(histograms []RefFloatHistogramSample, b []byte) []byte {
func (e *Encoder) FloatHistogramSamples(histograms []RefFloatHistogramSample, b []byte) ([]byte, []RefFloatHistogramSample) {
buf := encoding.Encbuf{B: b}
buf.PutByte(byte(FloatHistogramSamples))
if len(histograms) == 0 {
return buf.Get(), nil
}
var customBucketsFloatHistograms []RefFloatHistogramSample
// Store base timestamp and base reference number of first histogram.
// All histograms encode their timestamp and ref as delta to those.
first := histograms[0]
buf.PutBE64(uint64(first.Ref))
buf.PutBE64int64(first.T)
for _, h := range histograms {
if h.FH.UsesCustomBuckets() {
customBucketsFloatHistograms = append(customBucketsFloatHistograms, h)
continue
}
buf.PutVarint64(int64(h.Ref) - int64(first.Ref))
buf.PutVarint64(h.T - first.T)
EncodeFloatHistogram(&buf, h.FH)
}
// Reset buffer if only custom bucket histograms existed in list of histogram samples
if len(histograms) == len(customBucketsFloatHistograms) {
buf.Reset()
}
return buf.Get(), customBucketsFloatHistograms
}
func (e *Encoder) CustomBucketsFloatHistogramSamples(histograms []RefFloatHistogramSample, b []byte) []byte {
buf := encoding.Encbuf{B: b}
buf.PutByte(byte(CustomBucketsFloatHistogramSamples))
if len(histograms) == 0 {
return buf.Get()
}

View file

@ -166,9 +166,13 @@ func TestRecord_EncodeDecode(t *testing.T) {
},
}
histSamples := enc.HistogramSamples(histograms, nil)
histSamples, customBucketsHistograms := enc.HistogramSamples(histograms, nil)
customBucketsHistSamples := enc.CustomBucketsHistogramSamples(customBucketsHistograms, nil)
decHistograms, err := dec.HistogramSamples(histSamples, nil)
require.NoError(t, err)
decCustomBucketsHistograms, err := dec.HistogramSamples(customBucketsHistSamples, nil)
require.NoError(t, err)
decHistograms = append(decHistograms, decCustomBucketsHistograms...)
require.Equal(t, histograms, decHistograms)
floatHistograms := make([]RefFloatHistogramSample, len(histograms))
@ -179,9 +183,13 @@ func TestRecord_EncodeDecode(t *testing.T) {
FH: h.H.ToFloat(nil),
}
}
floatHistSamples := enc.FloatHistogramSamples(floatHistograms, nil)
floatHistSamples, customBucketsFloatHistograms := enc.FloatHistogramSamples(floatHistograms, nil)
customBucketsFloatHistSamples := enc.CustomBucketsFloatHistogramSamples(customBucketsFloatHistograms, nil)
decFloatHistograms, err := dec.FloatHistogramSamples(floatHistSamples, nil)
require.NoError(t, err)
decCustomBucketsFloatHistograms, err := dec.FloatHistogramSamples(customBucketsFloatHistSamples, nil)
require.NoError(t, err)
decFloatHistograms = append(decFloatHistograms, decCustomBucketsFloatHistograms...)
require.Equal(t, floatHistograms, decFloatHistograms)
// Gauge integer histograms.
@ -189,9 +197,13 @@ func TestRecord_EncodeDecode(t *testing.T) {
histograms[i].H.CounterResetHint = histogram.GaugeType
}
gaugeHistSamples := enc.HistogramSamples(histograms, nil)
gaugeHistSamples, customBucketsGaugeHistograms := enc.HistogramSamples(histograms, nil)
customBucketsGaugeHistSamples := enc.CustomBucketsHistogramSamples(customBucketsGaugeHistograms, nil)
decGaugeHistograms, err := dec.HistogramSamples(gaugeHistSamples, nil)
require.NoError(t, err)
decCustomBucketsGaugeHistograms, err := dec.HistogramSamples(customBucketsGaugeHistSamples, nil)
require.NoError(t, err)
decGaugeHistograms = append(decGaugeHistograms, decCustomBucketsGaugeHistograms...)
require.Equal(t, histograms, decGaugeHistograms)
// Gauge float histograms.
@ -199,9 +211,12 @@ func TestRecord_EncodeDecode(t *testing.T) {
floatHistograms[i].FH.CounterResetHint = histogram.GaugeType
}
gaugeFloatHistSamples := enc.FloatHistogramSamples(floatHistograms, nil)
gaugeFloatHistSamples, customBucketsGaugeFloatHistograms := enc.FloatHistogramSamples(floatHistograms, nil)
customBucketsGaugeFloatHistSamples := enc.CustomBucketsFloatHistogramSamples(customBucketsGaugeFloatHistograms, nil)
decGaugeFloatHistograms, err := dec.FloatHistogramSamples(gaugeFloatHistSamples, nil)
require.NoError(t, err)
decCustomBucketsGaugeFloatHistograms, err := dec.FloatHistogramSamples(customBucketsGaugeFloatHistSamples, nil)
decGaugeFloatHistograms = append(decGaugeFloatHistograms, decCustomBucketsGaugeFloatHistograms...)
require.Equal(t, floatHistograms, decGaugeFloatHistograms)
}
@ -303,10 +318,14 @@ func TestRecord_Corrupted(t *testing.T) {
},
}
corruptedHists := enc.HistogramSamples(histograms, nil)
corruptedHists, customBucketsHists := enc.HistogramSamples(histograms, nil)
corruptedHists = corruptedHists[:8]
corruptedCustomBucketsHists := enc.CustomBucketsHistogramSamples(customBucketsHists, nil)
corruptedCustomBucketsHists = corruptedCustomBucketsHists[:8]
_, err := dec.HistogramSamples(corruptedHists, nil)
require.ErrorIs(t, err, encoding.ErrInvalidSize)
_, err = dec.HistogramSamples(corruptedCustomBucketsHists, nil)
require.ErrorIs(t, err, encoding.ErrInvalidSize)
})
}
@ -364,9 +383,12 @@ func TestRecord_Type(t *testing.T) {
},
},
}
hists := enc.HistogramSamples(histograms, nil)
hists, customBucketsHistograms := enc.HistogramSamples(histograms, nil)
recordType = dec.Type(hists)
require.Equal(t, HistogramSamples, recordType)
customBucketsHists := enc.CustomBucketsHistogramSamples(customBucketsHistograms, nil)
recordType = dec.Type(customBucketsHists)
require.Equal(t, CustomBucketsHistogramSamples, recordType)
recordType = dec.Type(nil)
require.Equal(t, Unknown, recordType)

View file

@ -29,13 +29,13 @@ import (
)
const (
float = "float"
intHistogram = "integer histogram"
floatHistogram = "float histogram"
customBucketIntHistogram = "custom bucket int histogram"
customBucketFloatHistogram = "custom bucket float histogram"
gaugeIntHistogram = "gauge int histogram"
gaugeFloatHistogram = "gauge float histogram"
float = "float"
intHistogram = "integer histogram"
floatHistogram = "float histogram"
customBucketsIntHistogram = "custom buckets int histogram"
customBucketsFloatHistogram = "custom buckets float histogram"
gaugeIntHistogram = "gauge int histogram"
gaugeFloatHistogram = "gauge float histogram"
)
type testValue struct {
@ -84,7 +84,7 @@ var sampleTypeScenarios = map[string]sampleTypeScenario{
return sample{t: ts, fh: tsdbutil.GenerateTestFloatHistogram(int(value))}
},
},
customBucketIntHistogram: {
customBucketsIntHistogram: {
sampleType: sampleMetricTypeHistogram,
appendFunc: func(appender storage.Appender, lbls labels.Labels, ts, value int64) (storage.SeriesRef, sample, error) {
s := sample{t: ts, h: tsdbutil.GenerateTestCustomBucketsHistogram(int(value))}
@ -95,7 +95,7 @@ var sampleTypeScenarios = map[string]sampleTypeScenario{
return sample{t: ts, h: tsdbutil.GenerateTestCustomBucketsHistogram(int(value))}
},
},
customBucketFloatHistogram: {
customBucketsFloatHistogram: {
sampleType: sampleMetricTypeHistogram,
appendFunc: func(appender storage.Appender, lbls labels.Labels, ts, value int64) (storage.SeriesRef, sample, error) {
s := sample{t: ts, fh: tsdbutil.GenerateTestCustomBucketsFloatHistogram(int(value))}

View file

@ -208,7 +208,7 @@ func Checkpoint(logger *slog.Logger, w *WL, from, to int, keep func(id chunks.He
stats.TotalSamples += len(samples)
stats.DroppedSamples += len(samples) - len(repl)
case record.HistogramSamples, record.HistogramSamplesLegacy:
case record.HistogramSamples:
histogramSamples, err = dec.HistogramSamples(rec, histogramSamples)
if err != nil {
return nil, fmt.Errorf("decode histogram samples: %w", err)
@ -221,11 +221,25 @@ func Checkpoint(logger *slog.Logger, w *WL, from, to int, keep func(id chunks.He
}
}
if len(repl) > 0 {
buf = enc.HistogramSamples(repl, buf)
buf, _ = enc.HistogramSamples(repl, buf)
}
stats.TotalSamples += len(histogramSamples)
stats.DroppedSamples += len(histogramSamples) - len(repl)
case record.FloatHistogramSamples, record.FloatHistogramSamplesLegacy:
case record.CustomBucketsHistogramSamples:
histogramSamples, err = dec.HistogramSamples(rec, histogramSamples)
// Drop irrelevant histogramSamples in place.
repl := histogramSamples[:0]
for _, h := range histogramSamples {
if h.T >= mint {
repl = append(repl, h)
}
}
if len(repl) > 0 {
buf = enc.CustomBucketsHistogramSamples(repl, buf)
}
stats.TotalSamples += len(histogramSamples)
stats.DroppedSamples += len(histogramSamples) - len(repl)
case record.FloatHistogramSamples:
floatHistogramSamples, err = dec.FloatHistogramSamples(rec, floatHistogramSamples)
if err != nil {
return nil, fmt.Errorf("decode float histogram samples: %w", err)
@ -238,7 +252,24 @@ func Checkpoint(logger *slog.Logger, w *WL, from, to int, keep func(id chunks.He
}
}
if len(repl) > 0 {
buf = enc.FloatHistogramSamples(repl, buf)
buf, _ = enc.FloatHistogramSamples(repl, buf)
}
stats.TotalSamples += len(floatHistogramSamples)
stats.DroppedSamples += len(floatHistogramSamples) - len(repl)
case record.CustomBucketsFloatHistogramSamples:
floatHistogramSamples, err = dec.FloatHistogramSamples(rec, floatHistogramSamples)
if err != nil {
return nil, fmt.Errorf("decode float histogram samples: %w", err)
}
// Drop irrelevant floatHistogramSamples in place.
repl := floatHistogramSamples[:0]
for _, fh := range floatHistogramSamples {
if fh.T >= mint {
repl = append(repl, fh)
}
}
if len(repl) > 0 {
buf = enc.CustomBucketsFloatHistogramSamples(repl, buf)
}
stats.TotalSamples += len(floatHistogramSamples)
stats.DroppedSamples += len(floatHistogramSamples) - len(repl)

View file

@ -236,7 +236,7 @@ func TestCheckpoint(t *testing.T) {
require.NoError(t, w.Log(b))
samplesInWAL += 4
h := makeHistogram(i)
b = enc.HistogramSamples([]record.RefHistogramSample{
b, _ = enc.HistogramSamples([]record.RefHistogramSample{
{Ref: 0, T: last, H: h},
{Ref: 1, T: last + 10000, H: h},
{Ref: 2, T: last + 20000, H: h},
@ -245,7 +245,7 @@ func TestCheckpoint(t *testing.T) {
require.NoError(t, w.Log(b))
histogramsInWAL += 4
cbh := makeCustomBucketHistogram(i)
b = enc.HistogramSamples([]record.RefHistogramSample{
b = enc.CustomBucketsHistogramSamples([]record.RefHistogramSample{
{Ref: 0, T: last, H: cbh},
{Ref: 1, T: last + 10000, H: cbh},
{Ref: 2, T: last + 20000, H: cbh},
@ -254,7 +254,7 @@ func TestCheckpoint(t *testing.T) {
require.NoError(t, w.Log(b))
histogramsInWAL += 4
fh := makeFloatHistogram(i)
b = enc.FloatHistogramSamples([]record.RefFloatHistogramSample{
b, _ = enc.FloatHistogramSamples([]record.RefFloatHistogramSample{
{Ref: 0, T: last, FH: fh},
{Ref: 1, T: last + 10000, FH: fh},
{Ref: 2, T: last + 20000, FH: fh},
@ -263,7 +263,7 @@ func TestCheckpoint(t *testing.T) {
require.NoError(t, w.Log(b))
floatHistogramsInWAL += 4
cbfh := makeCustomBucketFloatHistogram(i)
b = enc.FloatHistogramSamples([]record.RefFloatHistogramSample{
b = enc.CustomBucketsFloatHistogramSamples([]record.RefFloatHistogramSample{
{Ref: 0, T: last, FH: cbfh},
{Ref: 1, T: last + 10000, FH: cbfh},
{Ref: 2, T: last + 20000, FH: cbfh},
@ -330,14 +330,14 @@ func TestCheckpoint(t *testing.T) {
require.GreaterOrEqual(t, s.T, last/2, "sample with wrong timestamp")
}
samplesInCheckpoint += len(samples)
case record.HistogramSamples, record.HistogramSamplesLegacy:
case record.HistogramSamples, record.CustomBucketsHistogramSamples:
histograms, err := dec.HistogramSamples(rec, nil)
require.NoError(t, err)
for _, h := range histograms {
require.GreaterOrEqual(t, h.T, last/2, "histogram with wrong timestamp")
}
histogramsInCheckpoint += len(histograms)
case record.FloatHistogramSamples, record.FloatHistogramSamplesLegacy:
case record.FloatHistogramSamples, record.CustomBucketsFloatHistogramSamples:
floatHistograms, err := dec.FloatHistogramSamples(rec, nil)
require.NoError(t, err)
for _, h := range floatHistograms {

View file

@ -546,7 +546,7 @@ func (w *Watcher) readSegment(r *LiveReader, segmentNum int, tail bool) error {
}
w.writer.AppendExemplars(exemplars)
case record.HistogramSamples, record.HistogramSamplesLegacy:
case record.HistogramSamples, record.CustomBucketsHistogramSamples:
// Skip if experimental "histograms over remote write" is not enabled.
if !w.sendHistograms {
break
@ -574,7 +574,7 @@ func (w *Watcher) readSegment(r *LiveReader, segmentNum int, tail bool) error {
histogramsToSend = histogramsToSend[:0]
}
case record.FloatHistogramSamples, record.FloatHistogramSamplesLegacy:
case record.FloatHistogramSamples, record.CustomBucketsFloatHistogramSamples:
// Skip if experimental "histograms over remote write" is not enabled.
if !w.sendHistograms {
break

View file

@ -209,7 +209,7 @@ func TestTailSamples(t *testing.T) {
NegativeBuckets: []int64{int64(-i) - 1},
}
histograms := enc.HistogramSamples([]record.RefHistogramSample{{
histograms, _ := enc.HistogramSamples([]record.RefHistogramSample{{
Ref: chunks.HeadSeriesRef(inner),
T: now.UnixNano() + 1,
H: hist,
@ -226,21 +226,21 @@ func TestTailSamples(t *testing.T) {
CustomValues: []float64{float64(i) + 2},
}
customBucketHistograms := enc.HistogramSamples([]record.RefHistogramSample{{
customBucketHistograms := enc.CustomBucketsHistogramSamples([]record.RefHistogramSample{{
Ref: chunks.HeadSeriesRef(inner),
T: now.UnixNano() + 1,
H: customBucketHist,
}}, nil)
require.NoError(t, w.Log(customBucketHistograms))
floatHistograms := enc.FloatHistogramSamples([]record.RefFloatHistogramSample{{
floatHistograms, _ := enc.FloatHistogramSamples([]record.RefFloatHistogramSample{{
Ref: chunks.HeadSeriesRef(inner),
T: now.UnixNano() + 1,
FH: hist.ToFloat(nil),
}}, nil)
require.NoError(t, w.Log(floatHistograms))
customBucketFloatHistograms := enc.FloatHistogramSamples([]record.RefFloatHistogramSample{{
customBucketFloatHistograms := enc.CustomBucketsFloatHistogramSamples([]record.RefFloatHistogramSample{{
Ref: chunks.HeadSeriesRef(inner),
T: now.UnixNano() + 1,
FH: customBucketHist.ToFloat(nil),