mirror of
https://github.com/prometheus/prometheus.git
synced 2025-02-02 08:31:11 -08:00
Rename old histogram record type, use old names for new records
This commit is contained in:
parent
454f6d39ca
commit
6684344026
|
@ -463,7 +463,7 @@ func (db *DB) loadWAL(r *wlog.Reader, multiRef map[chunks.HeadSeriesRef]chunks.H
|
|||
return
|
||||
}
|
||||
decoded <- samples
|
||||
case record.HistogramSamples, record.CustomBucketHistogramSamples:
|
||||
case record.HistogramSamples, record.HistogramSamplesLegacy:
|
||||
histograms := histogramsPool.Get()[:0]
|
||||
histograms, err = dec.HistogramSamples(rec, histograms)
|
||||
if err != nil {
|
||||
|
@ -475,7 +475,7 @@ func (db *DB) loadWAL(r *wlog.Reader, multiRef map[chunks.HeadSeriesRef]chunks.H
|
|||
return
|
||||
}
|
||||
decoded <- histograms
|
||||
case record.FloatHistogramSamples, record.CustomBucketFloatHistogramSamples:
|
||||
case record.FloatHistogramSamples, record.FloatHistogramSamplesLegacy:
|
||||
floatHistograms := floatHistogramsPool.Get()[:0]
|
||||
floatHistograms, err = dec.FloatHistogramSamples(rec, floatHistograms)
|
||||
if err != nil {
|
||||
|
@ -1154,35 +1154,19 @@ func (a *appender) log() error {
|
|||
}
|
||||
|
||||
if len(a.pendingHistograms) > 0 {
|
||||
var customBucketsExist bool
|
||||
buf, customBucketsExist = encoder.HistogramSamples(a.pendingHistograms, buf)
|
||||
buf = encoder.HistogramSamples(a.pendingHistograms, buf)
|
||||
if err := a.wal.Log(buf); err != nil {
|
||||
return err
|
||||
}
|
||||
buf = buf[:0]
|
||||
if customBucketsExist {
|
||||
buf = encoder.CustomBucketHistogramSamples(a.pendingHistograms, buf)
|
||||
if err := a.wal.Log(buf); err != nil {
|
||||
return err
|
||||
}
|
||||
buf = buf[:0]
|
||||
}
|
||||
}
|
||||
|
||||
if len(a.pendingFloatHistograms) > 0 {
|
||||
var customBucketsExist bool
|
||||
buf, customBucketsExist = encoder.FloatHistogramSamples(a.pendingFloatHistograms, buf)
|
||||
buf = encoder.FloatHistogramSamples(a.pendingFloatHistograms, buf)
|
||||
if err := a.wal.Log(buf); err != nil {
|
||||
return err
|
||||
}
|
||||
buf = buf[:0]
|
||||
if customBucketsExist {
|
||||
buf = encoder.CustomBucketFloatHistogramSamples(a.pendingFloatHistograms, buf)
|
||||
if err := a.wal.Log(buf); err != nil {
|
||||
return err
|
||||
}
|
||||
buf = buf[:0]
|
||||
}
|
||||
}
|
||||
|
||||
if len(a.pendingExamplars) > 0 {
|
||||
|
|
|
@ -163,7 +163,7 @@ func TestCommit(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
lbls = labelsForTest(t.Name()+"_custom_bucket_histogram", numSeries)
|
||||
lbls = labelsForTest(t.Name()+"_custom_buckets_histogram", numSeries)
|
||||
for _, l := range lbls {
|
||||
lset := labels.New(l...)
|
||||
|
||||
|
@ -187,7 +187,7 @@ func TestCommit(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
lbls = labelsForTest(t.Name()+"custom_bucket_float_histogram", numSeries)
|
||||
lbls = labelsForTest(t.Name()+"_custom_buckets_float_histogram", numSeries)
|
||||
for _, l := range lbls {
|
||||
lset := labels.New(l...)
|
||||
|
||||
|
@ -231,13 +231,13 @@ func TestCommit(t *testing.T) {
|
|||
require.NoError(t, err)
|
||||
walSamplesCount += len(samples)
|
||||
|
||||
case record.HistogramSamples, record.CustomBucketHistogramSamples:
|
||||
case record.HistogramSamples, record.HistogramSamplesLegacy:
|
||||
var histograms []record.RefHistogramSample
|
||||
histograms, err = dec.HistogramSamples(rec, histograms)
|
||||
require.NoError(t, err)
|
||||
walHistogramCount += len(histograms)
|
||||
|
||||
case record.FloatHistogramSamples, record.CustomBucketFloatHistogramSamples:
|
||||
case record.FloatHistogramSamples, record.FloatHistogramSamplesLegacy:
|
||||
var floatHistograms []record.RefFloatHistogramSample
|
||||
floatHistograms, err = dec.FloatHistogramSamples(rec, floatHistograms)
|
||||
require.NoError(t, err)
|
||||
|
@ -294,6 +294,18 @@ func TestRollback(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
lbls = labelsForTest(t.Name()+"_custom_buckets_histogram", numSeries)
|
||||
for _, l := range lbls {
|
||||
lset := labels.New(l...)
|
||||
|
||||
histograms := tsdbutil.GenerateTestCustomBucketsHistograms(numHistograms)
|
||||
|
||||
for i := 0; i < numHistograms; i++ {
|
||||
_, err := app.AppendHistogram(0, lset, int64(i), histograms[i], nil)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
}
|
||||
|
||||
lbls = labelsForTest(t.Name()+"_float_histogram", numSeries)
|
||||
for _, l := range lbls {
|
||||
lset := labels.New(l...)
|
||||
|
@ -306,6 +318,18 @@ func TestRollback(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
lbls = labelsForTest(t.Name()+"_custom_buckets_float_histogram", numSeries)
|
||||
for _, l := range lbls {
|
||||
lset := labels.New(l...)
|
||||
|
||||
floatHistograms := tsdbutil.GenerateTestCustomBucketsFloatHistograms(numHistograms)
|
||||
|
||||
for i := 0; i < numHistograms; i++ {
|
||||
_, err := app.AppendHistogram(0, lset, int64(i), nil, floatHistograms[i])
|
||||
require.NoError(t, err)
|
||||
}
|
||||
}
|
||||
|
||||
// Do a rollback, which should clear uncommitted data. A followup call to
|
||||
// commit should persist nothing to the WAL.
|
||||
require.NoError(t, app.Rollback())
|
||||
|
@ -346,13 +370,13 @@ func TestRollback(t *testing.T) {
|
|||
require.NoError(t, err)
|
||||
walExemplarsCount += len(exemplars)
|
||||
|
||||
case record.HistogramSamples:
|
||||
case record.HistogramSamples, record.HistogramSamplesLegacy:
|
||||
var histograms []record.RefHistogramSample
|
||||
histograms, err = dec.HistogramSamples(rec, histograms)
|
||||
require.NoError(t, err)
|
||||
walHistogramCount += len(histograms)
|
||||
|
||||
case record.FloatHistogramSamples:
|
||||
case record.FloatHistogramSamples, record.FloatHistogramSamplesLegacy:
|
||||
var floatHistograms []record.RefFloatHistogramSample
|
||||
floatHistograms, err = dec.FloatHistogramSamples(rec, floatHistograms)
|
||||
require.NoError(t, err)
|
||||
|
@ -363,7 +387,7 @@ func TestRollback(t *testing.T) {
|
|||
}
|
||||
|
||||
// Check that only series get stored after calling Rollback.
|
||||
require.Equal(t, numSeries*3, walSeriesCount, "series should have been written to WAL")
|
||||
require.Equal(t, numSeries*5, walSeriesCount, "series should have been written to WAL")
|
||||
require.Equal(t, 0, walSamplesCount, "samples should not have been written to WAL")
|
||||
require.Equal(t, 0, walExemplarsCount, "exemplars should not have been written to WAL")
|
||||
require.Equal(t, 0, walHistogramCount, "histograms should not have been written to WAL")
|
||||
|
@ -412,6 +436,19 @@ func TestFullTruncateWAL(t *testing.T) {
|
|||
require.NoError(t, app.Commit())
|
||||
}
|
||||
|
||||
lbls = labelsForTest(t.Name()+"_custom_buckets_histogram", numSeries)
|
||||
for _, l := range lbls {
|
||||
lset := labels.New(l...)
|
||||
|
||||
histograms := tsdbutil.GenerateTestCustomBucketsHistograms(numHistograms)
|
||||
|
||||
for i := 0; i < numHistograms; i++ {
|
||||
_, err := app.AppendHistogram(0, lset, int64(lastTs), histograms[i], nil)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
require.NoError(t, app.Commit())
|
||||
}
|
||||
|
||||
lbls = labelsForTest(t.Name()+"_float_histogram", numSeries)
|
||||
for _, l := range lbls {
|
||||
lset := labels.New(l...)
|
||||
|
@ -425,11 +462,24 @@ func TestFullTruncateWAL(t *testing.T) {
|
|||
require.NoError(t, app.Commit())
|
||||
}
|
||||
|
||||
lbls = labelsForTest(t.Name()+"_custom_buckets_float_histogram", numSeries)
|
||||
for _, l := range lbls {
|
||||
lset := labels.New(l...)
|
||||
|
||||
floatHistograms := tsdbutil.GenerateTestCustomBucketsFloatHistograms(numHistograms)
|
||||
|
||||
for i := 0; i < numHistograms; i++ {
|
||||
_, err := app.AppendHistogram(0, lset, int64(lastTs), nil, floatHistograms[i])
|
||||
require.NoError(t, err)
|
||||
}
|
||||
require.NoError(t, app.Commit())
|
||||
}
|
||||
|
||||
// Truncate WAL with mint to GC all the samples.
|
||||
s.truncate(lastTs + 1)
|
||||
|
||||
m := gatherFamily(t, reg, "prometheus_agent_deleted_series")
|
||||
require.Equal(t, float64(numSeries*3), m.Metric[0].Gauge.GetValue(), "agent wal truncate mismatch of deleted series count")
|
||||
require.Equal(t, float64(numSeries*5), m.Metric[0].Gauge.GetValue(), "agent wal truncate mismatch of deleted series count")
|
||||
}
|
||||
|
||||
func TestPartialTruncateWAL(t *testing.T) {
|
||||
|
|
186
tsdb/db_test.go
186
tsdb/db_test.go
|
@ -4281,6 +4281,188 @@ func TestOOOWALWrite(t *testing.T) {
|
|||
},
|
||||
},
|
||||
},
|
||||
"custom buckets histogram": {
|
||||
appendSample: func(app storage.Appender, l labels.Labels, mins int64) (storage.SeriesRef, error) {
|
||||
seriesRef, err := app.AppendHistogram(0, l, minutes(mins), tsdbutil.GenerateTestCustomBucketsHistogram(int(mins)), nil)
|
||||
require.NoError(t, err)
|
||||
return seriesRef, nil
|
||||
},
|
||||
expectedOOORecords: []interface{}{
|
||||
// The MmapRef in this are not hand calculated, and instead taken from the test run.
|
||||
// What is important here is the order of records, and that MmapRef increases for each record.
|
||||
[]record.RefMmapMarker{
|
||||
{Ref: 1},
|
||||
},
|
||||
[]record.RefHistogramSample{
|
||||
{Ref: 1, T: minutes(40), H: tsdbutil.GenerateTestCustomBucketsHistogram(40)},
|
||||
},
|
||||
|
||||
[]record.RefMmapMarker{
|
||||
{Ref: 2},
|
||||
},
|
||||
[]record.RefHistogramSample{
|
||||
{Ref: 2, T: minutes(42), H: tsdbutil.GenerateTestCustomBucketsHistogram(42)},
|
||||
},
|
||||
|
||||
[]record.RefHistogramSample{
|
||||
{Ref: 2, T: minutes(45), H: tsdbutil.GenerateTestCustomBucketsHistogram(45)},
|
||||
{Ref: 1, T: minutes(35), H: tsdbutil.GenerateTestCustomBucketsHistogram(35)},
|
||||
},
|
||||
[]record.RefMmapMarker{ // 3rd sample, hence m-mapped.
|
||||
{Ref: 1, MmapRef: 0x100000000 + 8},
|
||||
},
|
||||
[]record.RefHistogramSample{
|
||||
{Ref: 1, T: minutes(36), H: tsdbutil.GenerateTestCustomBucketsHistogram(36)},
|
||||
{Ref: 1, T: minutes(37), H: tsdbutil.GenerateTestCustomBucketsHistogram(37)},
|
||||
},
|
||||
|
||||
[]record.RefMmapMarker{ // 3rd sample, hence m-mapped.
|
||||
{Ref: 1, MmapRef: 0x100000000 + 82},
|
||||
},
|
||||
[]record.RefHistogramSample{ // Does not contain the in-order sample here.
|
||||
{Ref: 1, T: minutes(50), H: tsdbutil.GenerateTestCustomBucketsHistogram(50)},
|
||||
},
|
||||
|
||||
// Single commit but multiple OOO records.
|
||||
[]record.RefMmapMarker{
|
||||
{Ref: 2, MmapRef: 0x100000000 + 160},
|
||||
},
|
||||
[]record.RefHistogramSample{
|
||||
{Ref: 2, T: minutes(50), H: tsdbutil.GenerateTestCustomBucketsHistogram(50)},
|
||||
{Ref: 2, T: minutes(51), H: tsdbutil.GenerateTestCustomBucketsHistogram(51)},
|
||||
},
|
||||
[]record.RefMmapMarker{
|
||||
{Ref: 2, MmapRef: 0x100000000 + 239},
|
||||
},
|
||||
[]record.RefHistogramSample{
|
||||
{Ref: 2, T: minutes(52), H: tsdbutil.GenerateTestCustomBucketsHistogram(52)},
|
||||
{Ref: 2, T: minutes(53), H: tsdbutil.GenerateTestCustomBucketsHistogram(53)},
|
||||
},
|
||||
},
|
||||
expectedInORecords: []interface{}{
|
||||
[]record.RefSeries{
|
||||
{Ref: 1, Labels: s1},
|
||||
{Ref: 2, Labels: s2},
|
||||
},
|
||||
[]record.RefHistogramSample{
|
||||
{Ref: 1, T: minutes(60), H: tsdbutil.GenerateTestCustomBucketsHistogram(60)},
|
||||
{Ref: 2, T: minutes(60), H: tsdbutil.GenerateTestCustomBucketsHistogram(60)},
|
||||
},
|
||||
[]record.RefHistogramSample{
|
||||
{Ref: 1, T: minutes(40), H: tsdbutil.GenerateTestCustomBucketsHistogram(40)},
|
||||
},
|
||||
[]record.RefHistogramSample{
|
||||
{Ref: 2, T: minutes(42), H: tsdbutil.GenerateTestCustomBucketsHistogram(42)},
|
||||
},
|
||||
[]record.RefHistogramSample{
|
||||
{Ref: 2, T: minutes(45), H: tsdbutil.GenerateTestCustomBucketsHistogram(45)},
|
||||
{Ref: 1, T: minutes(35), H: tsdbutil.GenerateTestCustomBucketsHistogram(35)},
|
||||
{Ref: 1, T: minutes(36), H: tsdbutil.GenerateTestCustomBucketsHistogram(36)},
|
||||
{Ref: 1, T: minutes(37), H: tsdbutil.GenerateTestCustomBucketsHistogram(37)},
|
||||
},
|
||||
[]record.RefHistogramSample{ // Contains both in-order and ooo sample.
|
||||
{Ref: 1, T: minutes(50), H: tsdbutil.GenerateTestCustomBucketsHistogram(50)},
|
||||
{Ref: 2, T: minutes(65), H: tsdbutil.GenerateTestCustomBucketsHistogram(65)},
|
||||
},
|
||||
[]record.RefHistogramSample{
|
||||
{Ref: 2, T: minutes(50), H: tsdbutil.GenerateTestCustomBucketsHistogram(50)},
|
||||
{Ref: 2, T: minutes(51), H: tsdbutil.GenerateTestCustomBucketsHistogram(51)},
|
||||
{Ref: 2, T: minutes(52), H: tsdbutil.GenerateTestCustomBucketsHistogram(52)},
|
||||
{Ref: 2, T: minutes(53), H: tsdbutil.GenerateTestCustomBucketsHistogram(53)},
|
||||
},
|
||||
},
|
||||
},
|
||||
"custom buckets float histogram": {
|
||||
appendSample: func(app storage.Appender, l labels.Labels, mins int64) (storage.SeriesRef, error) {
|
||||
seriesRef, err := app.AppendHistogram(0, l, minutes(mins), nil, tsdbutil.GenerateTestCustomBucketsFloatHistogram(int(mins)))
|
||||
require.NoError(t, err)
|
||||
return seriesRef, nil
|
||||
},
|
||||
expectedOOORecords: []interface{}{
|
||||
// The MmapRef in this are not hand calculated, and instead taken from the test run.
|
||||
// What is important here is the order of records, and that MmapRef increases for each record.
|
||||
[]record.RefMmapMarker{
|
||||
{Ref: 1},
|
||||
},
|
||||
[]record.RefFloatHistogramSample{
|
||||
{Ref: 1, T: minutes(40), FH: tsdbutil.GenerateTestCustomBucketsFloatHistogram(40)},
|
||||
},
|
||||
|
||||
[]record.RefMmapMarker{
|
||||
{Ref: 2},
|
||||
},
|
||||
[]record.RefFloatHistogramSample{
|
||||
{Ref: 2, T: minutes(42), FH: tsdbutil.GenerateTestCustomBucketsFloatHistogram(42)},
|
||||
},
|
||||
|
||||
[]record.RefFloatHistogramSample{
|
||||
{Ref: 2, T: minutes(45), FH: tsdbutil.GenerateTestCustomBucketsFloatHistogram(45)},
|
||||
{Ref: 1, T: minutes(35), FH: tsdbutil.GenerateTestCustomBucketsFloatHistogram(35)},
|
||||
},
|
||||
[]record.RefMmapMarker{ // 3rd sample, hence m-mapped.
|
||||
{Ref: 1, MmapRef: 0x100000000 + 8},
|
||||
},
|
||||
[]record.RefFloatHistogramSample{
|
||||
{Ref: 1, T: minutes(36), FH: tsdbutil.GenerateTestCustomBucketsFloatHistogram(36)},
|
||||
{Ref: 1, T: minutes(37), FH: tsdbutil.GenerateTestCustomBucketsFloatHistogram(37)},
|
||||
},
|
||||
|
||||
[]record.RefMmapMarker{ // 3rd sample, hence m-mapped.
|
||||
{Ref: 1, MmapRef: 0x100000000 + 134},
|
||||
},
|
||||
[]record.RefFloatHistogramSample{ // Does not contain the in-order sample here.
|
||||
{Ref: 1, T: minutes(50), FH: tsdbutil.GenerateTestCustomBucketsFloatHistogram(50)},
|
||||
},
|
||||
|
||||
// Single commit but multiple OOO records.
|
||||
[]record.RefMmapMarker{
|
||||
{Ref: 2, MmapRef: 0x100000000 + 263},
|
||||
},
|
||||
[]record.RefFloatHistogramSample{
|
||||
{Ref: 2, T: minutes(50), FH: tsdbutil.GenerateTestCustomBucketsFloatHistogram(50)},
|
||||
{Ref: 2, T: minutes(51), FH: tsdbutil.GenerateTestCustomBucketsFloatHistogram(51)},
|
||||
},
|
||||
[]record.RefMmapMarker{
|
||||
{Ref: 2, MmapRef: 0x100000000 + 393},
|
||||
},
|
||||
[]record.RefFloatHistogramSample{
|
||||
{Ref: 2, T: minutes(52), FH: tsdbutil.GenerateTestCustomBucketsFloatHistogram(52)},
|
||||
{Ref: 2, T: minutes(53), FH: tsdbutil.GenerateTestCustomBucketsFloatHistogram(53)},
|
||||
},
|
||||
},
|
||||
expectedInORecords: []interface{}{
|
||||
[]record.RefSeries{
|
||||
{Ref: 1, Labels: s1},
|
||||
{Ref: 2, Labels: s2},
|
||||
},
|
||||
[]record.RefFloatHistogramSample{
|
||||
{Ref: 1, T: minutes(60), FH: tsdbutil.GenerateTestCustomBucketsFloatHistogram(60)},
|
||||
{Ref: 2, T: minutes(60), FH: tsdbutil.GenerateTestCustomBucketsFloatHistogram(60)},
|
||||
},
|
||||
[]record.RefFloatHistogramSample{
|
||||
{Ref: 1, T: minutes(40), FH: tsdbutil.GenerateTestCustomBucketsFloatHistogram(40)},
|
||||
},
|
||||
[]record.RefFloatHistogramSample{
|
||||
{Ref: 2, T: minutes(42), FH: tsdbutil.GenerateTestCustomBucketsFloatHistogram(42)},
|
||||
},
|
||||
[]record.RefFloatHistogramSample{
|
||||
{Ref: 2, T: minutes(45), FH: tsdbutil.GenerateTestCustomBucketsFloatHistogram(45)},
|
||||
{Ref: 1, T: minutes(35), FH: tsdbutil.GenerateTestCustomBucketsFloatHistogram(35)},
|
||||
{Ref: 1, T: minutes(36), FH: tsdbutil.GenerateTestCustomBucketsFloatHistogram(36)},
|
||||
{Ref: 1, T: minutes(37), FH: tsdbutil.GenerateTestCustomBucketsFloatHistogram(37)},
|
||||
},
|
||||
[]record.RefFloatHistogramSample{ // Contains both in-order and ooo sample.
|
||||
{Ref: 1, T: minutes(50), FH: tsdbutil.GenerateTestCustomBucketsFloatHistogram(50)},
|
||||
{Ref: 2, T: minutes(65), FH: tsdbutil.GenerateTestCustomBucketsFloatHistogram(65)},
|
||||
},
|
||||
[]record.RefFloatHistogramSample{
|
||||
{Ref: 2, T: minutes(50), FH: tsdbutil.GenerateTestCustomBucketsFloatHistogram(50)},
|
||||
{Ref: 2, T: minutes(51), FH: tsdbutil.GenerateTestCustomBucketsFloatHistogram(51)},
|
||||
{Ref: 2, T: minutes(52), FH: tsdbutil.GenerateTestCustomBucketsFloatHistogram(52)},
|
||||
{Ref: 2, T: minutes(53), FH: tsdbutil.GenerateTestCustomBucketsFloatHistogram(53)},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
for name, scenario := range scenarios {
|
||||
t.Run(name, func(t *testing.T) {
|
||||
|
@ -4374,11 +4556,11 @@ func testOOOWALWrite(t *testing.T,
|
|||
markers, err := dec.MmapMarkers(rec, nil)
|
||||
require.NoError(t, err)
|
||||
records = append(records, markers)
|
||||
case record.HistogramSamples:
|
||||
case record.HistogramSamples, record.HistogramSamplesLegacy:
|
||||
histogramSamples, err := dec.HistogramSamples(rec, nil)
|
||||
require.NoError(t, err)
|
||||
records = append(records, histogramSamples)
|
||||
case record.FloatHistogramSamples:
|
||||
case record.FloatHistogramSamples, record.FloatHistogramSamplesLegacy:
|
||||
floatHistogramSamples, err := dec.FloatHistogramSamples(rec, nil)
|
||||
require.NoError(t, err)
|
||||
records = append(records, floatHistogramSamples)
|
||||
|
|
|
@ -942,33 +942,18 @@ func (a *headAppender) log() error {
|
|||
}
|
||||
}
|
||||
if len(a.histograms) > 0 {
|
||||
rec, customBucketsExist := enc.HistogramSamples(a.histograms, buf)
|
||||
rec = enc.HistogramSamples(a.histograms, buf)
|
||||
buf = rec[:0]
|
||||
if err := a.head.wal.Log(rec); err != nil {
|
||||
return fmt.Errorf("log histograms: %w", err)
|
||||
}
|
||||
|
||||
if customBucketsExist {
|
||||
enc.CustomBucketHistogramSamples(a.histograms, buf)
|
||||
buf = rec[:0]
|
||||
if err := a.head.wal.Log(rec); err != nil {
|
||||
return fmt.Errorf("log custom bucket histograms: %w", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
if len(a.floatHistograms) > 0 {
|
||||
rec, customBucketsExist := enc.FloatHistogramSamples(a.floatHistograms, buf)
|
||||
rec = enc.FloatHistogramSamples(a.floatHistograms, buf)
|
||||
buf = rec[:0]
|
||||
if err := a.head.wal.Log(rec); err != nil {
|
||||
return fmt.Errorf("log float histograms: %w", err)
|
||||
}
|
||||
|
||||
if customBucketsExist {
|
||||
buf = rec[:0]
|
||||
if err := a.head.wal.Log(rec); err != nil {
|
||||
return fmt.Errorf("log custom bucket float histograms: %w", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
// Exemplars should be logged after samples (float/native histogram/etc),
|
||||
// otherwise it might happen that we send the exemplars in a remote write
|
||||
|
|
|
@ -187,11 +187,11 @@ func readTestWAL(t testing.TB, dir string) (recs []interface{}) {
|
|||
samples, err := dec.Samples(rec, nil)
|
||||
require.NoError(t, err)
|
||||
recs = append(recs, samples)
|
||||
case record.HistogramSamples:
|
||||
case record.HistogramSamples, record.HistogramSamplesLegacy:
|
||||
samples, err := dec.HistogramSamples(rec, nil)
|
||||
require.NoError(t, err)
|
||||
recs = append(recs, samples)
|
||||
case record.FloatHistogramSamples:
|
||||
case record.FloatHistogramSamples, record.FloatHistogramSamplesLegacy:
|
||||
samples, err := dec.FloatHistogramSamples(rec, nil)
|
||||
require.NoError(t, err)
|
||||
recs = append(recs, samples)
|
||||
|
@ -740,89 +740,6 @@ func TestHead_ReadWAL(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestHead_ReadWAL2(t *testing.T) {
|
||||
for _, compress := range []wlog.CompressionType{wlog.CompressionNone, wlog.CompressionSnappy, wlog.CompressionZstd} {
|
||||
t.Run(fmt.Sprintf("compress=%s", compress), func(t *testing.T) {
|
||||
entries := []interface{}{
|
||||
[]record.RefSeries{
|
||||
{Ref: 10, Labels: labels.FromStrings("a", "1")},
|
||||
{Ref: 11, Labels: labels.FromStrings("a", "2")},
|
||||
{Ref: 100, Labels: labels.FromStrings("a", "3")},
|
||||
},
|
||||
[]record.RefHistogramSample{
|
||||
{Ref: 0, T: 99, H: tsdbutil.GenerateTestHistogram(1)},
|
||||
{Ref: 10, T: 100, H: tsdbutil.GenerateTestCustomBucketsHistogram(2)},
|
||||
{Ref: 100, T: 100, H: tsdbutil.GenerateTestHistogram(3)},
|
||||
},
|
||||
[]record.RefSeries{
|
||||
{Ref: 50, Labels: labels.FromStrings("a", "4")},
|
||||
// This series has two refs pointing to it.
|
||||
{Ref: 101, Labels: labels.FromStrings("a", "3")},
|
||||
},
|
||||
[]record.RefHistogramSample{
|
||||
{Ref: 10, T: 101, H: tsdbutil.GenerateTestHistogram(5)},
|
||||
{Ref: 50, T: 101, H: tsdbutil.GenerateTestHistogram(6)},
|
||||
{Ref: 101, T: 101, H: tsdbutil.GenerateTestCustomBucketsHistogram(7)},
|
||||
},
|
||||
[]tombstones.Stone{
|
||||
{Ref: 0, Intervals: []tombstones.Interval{{Mint: 99, Maxt: 101}}},
|
||||
},
|
||||
[]record.RefExemplar{
|
||||
{Ref: 10, T: 100, V: 1, Labels: labels.FromStrings("trace_id", "asdf")},
|
||||
},
|
||||
}
|
||||
|
||||
head, w := newTestHead(t, 1000, compress, false)
|
||||
defer func() {
|
||||
require.NoError(t, head.Close())
|
||||
}()
|
||||
|
||||
populateTestWL(t, w, entries)
|
||||
|
||||
require.NoError(t, head.Init(math.MinInt64))
|
||||
require.Equal(t, uint64(101), head.lastSeriesID.Load())
|
||||
|
||||
s10 := head.series.getByID(10)
|
||||
s11 := head.series.getByID(11)
|
||||
s50 := head.series.getByID(50)
|
||||
s100 := head.series.getByID(100)
|
||||
|
||||
testutil.RequireEqual(t, labels.FromStrings("a", "1"), s10.lset)
|
||||
require.Nil(t, s11) // Series without samples should be garbage collected at head.Init().
|
||||
testutil.RequireEqual(t, labels.FromStrings("a", "4"), s50.lset)
|
||||
testutil.RequireEqual(t, labels.FromStrings("a", "3"), s100.lset)
|
||||
|
||||
expandChunk := func(c chunkenc.Iterator) (x []sample) {
|
||||
for c.Next() == chunkenc.ValHistogram {
|
||||
t, v := c.AtHistogram(nil)
|
||||
//t, v := c.At()
|
||||
x = append(x, sample{t: t, h: v})
|
||||
}
|
||||
require.NoError(t, c.Err())
|
||||
return x
|
||||
}
|
||||
|
||||
c, _, _, err := s10.chunk(0, head.chunkDiskMapper, &head.memChunkPool)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, []sample{{100, 0, tsdbutil.GenerateTestCustomBucketsHistogram(2), nil}, {101, 0, tsdbutil.GenerateTestCustomBucketsHistogram(5), nil}}, expandChunk(c.chunk.Iterator(nil)))
|
||||
c, _, _, err = s50.chunk(0, head.chunkDiskMapper, &head.memChunkPool)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, []sample{{101, 0, tsdbutil.GenerateTestHistogram(6), nil}}, expandChunk(c.chunk.Iterator(nil)))
|
||||
// The samples before the new series record should be discarded since a duplicate record
|
||||
// is only possible when old samples were compacted.
|
||||
c, _, _, err = s100.chunk(0, head.chunkDiskMapper, &head.memChunkPool)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, []sample{{101, 0, tsdbutil.GenerateTestCustomBucketsHistogram(7), nil}}, expandChunk(c.chunk.Iterator(nil)))
|
||||
|
||||
q, err := head.ExemplarQuerier(context.Background())
|
||||
require.NoError(t, err)
|
||||
e, err := q.Select(0, 1000, []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "a", "1")})
|
||||
require.NoError(t, err)
|
||||
require.True(t, exemplar.Exemplar{Ts: 100, Value: 1, Labels: labels.FromStrings("trace_id", "asdf")}.Equals(e[0].Exemplars[0]))
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestHead_WALMultiRef(t *testing.T) {
|
||||
head, w := newTestHead(t, 1000, wlog.CompressionNone, false)
|
||||
|
||||
|
@ -4036,194 +3953,6 @@ func TestHistogramInWALAndMmapChunk(t *testing.T) {
|
|||
testQuery()
|
||||
}
|
||||
|
||||
func TestHistogramInWALAndMmapChunk2(t *testing.T) {
|
||||
head, _ := newTestHead(t, 3000, wlog.CompressionNone, false)
|
||||
t.Cleanup(func() {
|
||||
require.NoError(t, head.Close())
|
||||
})
|
||||
require.NoError(t, head.Init(0))
|
||||
|
||||
// Series with only histograms.
|
||||
s1 := labels.FromStrings("a", "b1")
|
||||
k1 := s1.String()
|
||||
numHistograms := 300
|
||||
exp := map[string][]chunks.Sample{}
|
||||
ts := int64(0)
|
||||
var app storage.Appender
|
||||
for _, custom := range []bool{true, false} {
|
||||
app = head.Appender(context.Background())
|
||||
var hists []*histogram.Histogram
|
||||
if custom {
|
||||
hists = tsdbutil.GenerateTestCustomBucketsHistograms(numHistograms)
|
||||
} else {
|
||||
hists = tsdbutil.GenerateTestHistograms(numHistograms)
|
||||
}
|
||||
for _, h := range hists {
|
||||
if !custom {
|
||||
h.NegativeSpans = h.PositiveSpans
|
||||
h.NegativeBuckets = h.PositiveBuckets
|
||||
}
|
||||
_, err := app.AppendHistogram(0, s1, ts, h, nil)
|
||||
require.NoError(t, err)
|
||||
exp[k1] = append(exp[k1], sample{t: ts, h: h.Copy()})
|
||||
ts++
|
||||
if ts%5 == 0 {
|
||||
require.NoError(t, app.Commit())
|
||||
app = head.Appender(context.Background())
|
||||
}
|
||||
}
|
||||
require.NoError(t, app.Commit())
|
||||
}
|
||||
for _, custom := range []bool{true, false} {
|
||||
app = head.Appender(context.Background())
|
||||
var hists []*histogram.FloatHistogram
|
||||
if custom {
|
||||
hists = tsdbutil.GenerateTestCustomBucketsFloatHistograms(numHistograms)
|
||||
} else {
|
||||
hists = tsdbutil.GenerateTestFloatHistograms(numHistograms)
|
||||
}
|
||||
for _, h := range hists {
|
||||
if !custom {
|
||||
h.NegativeSpans = h.PositiveSpans
|
||||
h.NegativeBuckets = h.PositiveBuckets
|
||||
}
|
||||
_, err := app.AppendHistogram(0, s1, ts, nil, h)
|
||||
require.NoError(t, err)
|
||||
exp[k1] = append(exp[k1], sample{t: ts, fh: h.Copy()})
|
||||
ts++
|
||||
if ts%5 == 0 {
|
||||
require.NoError(t, app.Commit())
|
||||
app = head.Appender(context.Background())
|
||||
}
|
||||
}
|
||||
require.NoError(t, app.Commit())
|
||||
head.mmapHeadChunks()
|
||||
}
|
||||
|
||||
// There should be 20 mmap chunks in s1.
|
||||
ms := head.series.getByHash(s1.Hash(), s1)
|
||||
require.Len(t, ms.mmappedChunks, 19)
|
||||
expMmapChunks := make([]*mmappedChunk, 0, 20)
|
||||
for _, mmap := range ms.mmappedChunks {
|
||||
require.Positive(t, mmap.numSamples)
|
||||
cpy := *mmap
|
||||
expMmapChunks = append(expMmapChunks, &cpy)
|
||||
}
|
||||
expHeadChunkSamples := ms.headChunks.chunk.NumSamples()
|
||||
require.Positive(t, expHeadChunkSamples)
|
||||
|
||||
// Series with mix of histograms and float.
|
||||
s2 := labels.FromStrings("a", "b2")
|
||||
k2 := s2.String()
|
||||
ts = 0
|
||||
for _, custom := range []bool{true, false} {
|
||||
app = head.Appender(context.Background())
|
||||
var hists []*histogram.Histogram
|
||||
if custom {
|
||||
hists = tsdbutil.GenerateTestCustomBucketsHistograms(100)
|
||||
} else {
|
||||
hists = tsdbutil.GenerateTestHistograms(100)
|
||||
}
|
||||
for _, h := range hists {
|
||||
ts++
|
||||
if !custom {
|
||||
h.NegativeSpans = h.PositiveSpans
|
||||
h.NegativeBuckets = h.PositiveBuckets
|
||||
}
|
||||
_, err := app.AppendHistogram(0, s2, ts, h, nil)
|
||||
require.NoError(t, err)
|
||||
eh := h.Copy()
|
||||
if ts > 30 && (ts-10)%20 == 1 {
|
||||
// Need "unknown" hint after float sample.
|
||||
eh.CounterResetHint = histogram.UnknownCounterReset
|
||||
}
|
||||
exp[k2] = append(exp[k2], sample{t: ts, h: eh})
|
||||
if ts%20 == 0 {
|
||||
require.NoError(t, app.Commit())
|
||||
app = head.Appender(context.Background())
|
||||
// Add some float.
|
||||
for i := 0; i < 10; i++ {
|
||||
ts++
|
||||
_, err := app.Append(0, s2, ts, float64(ts))
|
||||
require.NoError(t, err)
|
||||
exp[k2] = append(exp[k2], sample{t: ts, f: float64(ts)})
|
||||
}
|
||||
require.NoError(t, app.Commit())
|
||||
app = head.Appender(context.Background())
|
||||
}
|
||||
}
|
||||
require.NoError(t, app.Commit())
|
||||
}
|
||||
for _, custom := range []bool{true, false} {
|
||||
app = head.Appender(context.Background())
|
||||
var hists []*histogram.FloatHistogram
|
||||
if custom {
|
||||
hists = tsdbutil.GenerateTestCustomBucketsFloatHistograms(100)
|
||||
} else {
|
||||
hists = tsdbutil.GenerateTestFloatHistograms(100)
|
||||
}
|
||||
for _, h := range hists {
|
||||
ts++
|
||||
if !custom {
|
||||
h.NegativeSpans = h.PositiveSpans
|
||||
h.NegativeBuckets = h.PositiveBuckets
|
||||
}
|
||||
_, err := app.AppendHistogram(0, s2, ts, nil, h)
|
||||
require.NoError(t, err)
|
||||
eh := h.Copy()
|
||||
if ts > 30 && (ts-10)%20 == 1 {
|
||||
// Need "unknown" hint after float sample.
|
||||
eh.CounterResetHint = histogram.UnknownCounterReset
|
||||
}
|
||||
exp[k2] = append(exp[k2], sample{t: ts, fh: eh})
|
||||
if ts%20 == 0 {
|
||||
require.NoError(t, app.Commit())
|
||||
app = head.Appender(context.Background())
|
||||
// Add some float.
|
||||
for i := 0; i < 10; i++ {
|
||||
ts++
|
||||
_, err := app.Append(0, s2, ts, float64(ts))
|
||||
require.NoError(t, err)
|
||||
exp[k2] = append(exp[k2], sample{t: ts, f: float64(ts)})
|
||||
}
|
||||
require.NoError(t, app.Commit())
|
||||
app = head.Appender(context.Background())
|
||||
}
|
||||
}
|
||||
require.NoError(t, app.Commit())
|
||||
}
|
||||
|
||||
// Restart head.
|
||||
require.NoError(t, head.Close())
|
||||
startHead := func() {
|
||||
w, err := wlog.NewSize(nil, nil, head.wal.Dir(), 32768, wlog.CompressionNone)
|
||||
require.NoError(t, err)
|
||||
head, err = NewHead(nil, nil, w, nil, head.opts, nil)
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, head.Init(0))
|
||||
}
|
||||
startHead()
|
||||
|
||||
// Checking contents of s1.
|
||||
ms = head.series.getByHash(s1.Hash(), s1)
|
||||
require.Equal(t, expMmapChunks, ms.mmappedChunks)
|
||||
require.Equal(t, expHeadChunkSamples, ms.headChunks.chunk.NumSamples())
|
||||
|
||||
testQuery := func() {
|
||||
q, err := NewBlockQuerier(head, head.MinTime(), head.MaxTime())
|
||||
require.NoError(t, err)
|
||||
act := query(t, q, labels.MustNewMatcher(labels.MatchRegexp, "a", "b.*"))
|
||||
compareSeries(t, exp, act)
|
||||
}
|
||||
testQuery()
|
||||
|
||||
// Restart with no mmap chunks to test WAL replay.
|
||||
require.NoError(t, head.Close())
|
||||
require.NoError(t, os.RemoveAll(mmappedChunksDir(head.opts.ChunkDirRoot)))
|
||||
startHead()
|
||||
testQuery()
|
||||
}
|
||||
|
||||
func TestChunkSnapshot(t *testing.T) {
|
||||
head, _ := newTestHead(t, 120*4, wlog.CompressionNone, false)
|
||||
defer func() {
|
||||
|
@ -5360,48 +5089,6 @@ func TestChunkSnapshotTakenAfterIncompleteSnapshot(t *testing.T) {
|
|||
require.Positive(t, offset)
|
||||
}
|
||||
|
||||
func TestHistogramWALANDWBLReplay(t *testing.T) {
|
||||
dir := t.TempDir()
|
||||
wal, err := wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, wlog.CompressionSnappy)
|
||||
require.NoError(t, err)
|
||||
oooWlog, err := wlog.NewSize(nil, nil, filepath.Join(dir, wlog.WblDirName), 32768, wlog.CompressionSnappy)
|
||||
require.NoError(t, err)
|
||||
|
||||
opts := DefaultHeadOptions()
|
||||
opts.ChunkRange = 1000
|
||||
opts.ChunkDirRoot = dir
|
||||
opts.OutOfOrderTimeWindow.Store(30 * time.Minute.Milliseconds())
|
||||
opts.EnableNativeHistograms.Store(true)
|
||||
opts.EnableOOONativeHistograms.Store(true)
|
||||
|
||||
h, err := NewHead(nil, nil, wal, oooWlog, opts, nil)
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, h.Init(0))
|
||||
|
||||
var expOOOSamples []chunks.Sample
|
||||
l := labels.FromStrings("foo", "bar")
|
||||
appendSample := func(mins int64, val float64, isOOO bool, isCustomBucketHistogram bool) {
|
||||
app := h.Appender(context.Background())
|
||||
var s sample
|
||||
if isCustomBucketHistogram {
|
||||
s = sample{t: mins * time.Minute.Milliseconds(), h: tsdbutil.GenerateTestCustomBucketsHistogram(int(val))}
|
||||
} else {
|
||||
s = sample{t: mins * time.Minute.Milliseconds(), h: tsdbutil.GenerateTestHistogram(int(val))}
|
||||
}
|
||||
_, err := app.AppendHistogram(0, l, mins*time.Minute.Milliseconds(), s.h, nil)
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, app.Commit())
|
||||
|
||||
if isOOO {
|
||||
expOOOSamples = append(expOOOSamples, s)
|
||||
}
|
||||
}
|
||||
|
||||
// In-order histogram samples.
|
||||
appendSample(60, 60, false, false)
|
||||
|
||||
}
|
||||
|
||||
// TestWBLReplay checks the replay at a low level.
|
||||
func TestWBLReplay(t *testing.T) {
|
||||
for name, scenario := range sampleTypeScenarios {
|
||||
|
|
|
@ -189,7 +189,7 @@ func (h *Head) loadWAL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch
|
|||
return
|
||||
}
|
||||
decoded <- exemplars
|
||||
case record.HistogramSamples, record.CustomBucketHistogramSamples:
|
||||
case record.HistogramSamples, record.HistogramSamplesLegacy:
|
||||
hists := histogramsPool.Get()[:0]
|
||||
hists, err = dec.HistogramSamples(rec, hists)
|
||||
if err != nil {
|
||||
|
@ -201,7 +201,7 @@ func (h *Head) loadWAL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch
|
|||
return
|
||||
}
|
||||
decoded <- hists
|
||||
case record.FloatHistogramSamples, record.CustomBucketFloatHistogramSamples:
|
||||
case record.FloatHistogramSamples, record.FloatHistogramSamplesLegacy:
|
||||
hists := floatHistogramsPool.Get()[:0]
|
||||
hists, err = dec.FloatHistogramSamples(rec, hists)
|
||||
if err != nil {
|
||||
|
@ -726,7 +726,7 @@ func (h *Head) loadWBL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch
|
|||
return
|
||||
}
|
||||
decodedCh <- markers
|
||||
case record.HistogramSamples:
|
||||
case record.HistogramSamples, record.HistogramSamplesLegacy:
|
||||
hists := histogramSamplesPool.Get()[:0]
|
||||
hists, err = dec.HistogramSamples(rec, hists)
|
||||
if err != nil {
|
||||
|
@ -738,7 +738,7 @@ func (h *Head) loadWBL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch
|
|||
return
|
||||
}
|
||||
decodedCh <- hists
|
||||
case record.FloatHistogramSamples:
|
||||
case record.FloatHistogramSamples, record.FloatHistogramSamplesLegacy:
|
||||
hists := floatHistogramSamplesPool.Get()[:0]
|
||||
hists, err = dec.FloatHistogramSamples(rec, hists)
|
||||
if err != nil {
|
||||
|
|
|
@ -963,7 +963,7 @@ func testOOOHeadChunkReader_Chunk_ConsistentQueryResponseDespiteOfHeadExpanding(
|
|||
},
|
||||
},
|
||||
{
|
||||
name: "After Series() prev head gets mmapped after getting samples, new head gets new samples also overlapping, none of these should appear in response.",
|
||||
name: "After Series() prev head mmapped after getting samples, new head gets new samples also overlapping, none should appear in response.",
|
||||
queryMinT: minutes(0),
|
||||
queryMaxT: minutes(100),
|
||||
firstInOrderSampleAt: minutes(120),
|
||||
|
|
|
@ -48,14 +48,14 @@ const (
|
|||
MmapMarkers Type = 5
|
||||
// Metadata is used to match WAL records of type Metadata.
|
||||
Metadata Type = 6
|
||||
// HistogramSamples is used to match WAL records of type Histograms.
|
||||
HistogramSamples Type = 7
|
||||
// FloatHistogramSamples is used to match WAL records of type Float Histograms.
|
||||
FloatHistogramSamples Type = 8
|
||||
// CustomBucketHistogramSamples is used to match WAL records of type Histogram with custom buckets.
|
||||
CustomBucketHistogramSamples Type = 9
|
||||
// CustomBucketFloatHistogramSamples is used to match WAL records of type Float Histogram with custom buckets.
|
||||
CustomBucketFloatHistogramSamples Type = 10
|
||||
|||| // HistogramSamplesLegacy is used to match WAL records of type Histograms prior to introducing support of custom buckets, to maintain backwards compatibility.
|
||||
HistogramSamplesLegacy Type = 7
|
||||
|||| // FloatHistogramSamplesLegacy is used to match WAL records of type Float Histograms prior to introducing support of custom buckets, to maintain backwards compatibility.
|
||||
FloatHistogramSamplesLegacy Type = 8
|
||||
// HistogramSamples is used to match WAL records of type Histogram, and supports custom buckets.
|
||||
HistogramSamples Type = 9
|
||||
// FloatHistogramSamples is used to match WAL records of type Float Histogram, and supports custom buckets.
|
||||
FloatHistogramSamples Type = 10
|
||||
)
|
||||
|
||||
func (rt Type) String() string {
|
||||
|
@ -68,14 +68,14 @@ func (rt Type) String() string {
|
|||
return "tombstones"
|
||||
case Exemplars:
|
||||
return "exemplars"
|
||||
case HistogramSamplesLegacy:
|
||||
return "histogram_samples_legacy"
|
||||
case FloatHistogramSamplesLegacy:
|
||||
return "float_histogram_samples_legacy"
|
||||
case HistogramSamples:
|
||||
return "histogram_samples"
|
||||
return "histogram_sample"
|
||||
case FloatHistogramSamples:
|
||||
return "float_histogram_samples"
|
||||
case CustomBucketHistogramSamples:
|
||||
return "custom_bucket_histogram_samples"
|
||||
case CustomBucketFloatHistogramSamples:
|
||||
return "custom_bucket_float_histogram_samples"
|
||||
case MmapMarkers:
|
||||
return "mmapmarkers"
|
||||
case Metadata:
|
||||
|
@ -215,7 +215,7 @@ func (d *Decoder) Type(rec []byte) Type {
|
|||
return Unknown
|
||||
}
|
||||
switch t := Type(rec[0]); t {
|
||||
case Series, Samples, Tombstones, Exemplars, MmapMarkers, Metadata, HistogramSamples, FloatHistogramSamples, CustomBucketHistogramSamples, CustomBucketFloatHistogramSamples:
|
||||
case Series, Samples, Tombstones, Exemplars, MmapMarkers, Metadata, HistogramSamplesLegacy, FloatHistogramSamplesLegacy, HistogramSamples, FloatHistogramSamples:
|
||||
return t
|
||||
}
|
||||
return Unknown
|
||||
|
@ -436,7 +436,7 @@ func (d *Decoder) MmapMarkers(rec []byte, markers []RefMmapMarker) ([]RefMmapMar
|
|||
func (d *Decoder) HistogramSamples(rec []byte, histograms []RefHistogramSample) ([]RefHistogramSample, error) {
|
||||
dec := encoding.Decbuf{B: rec}
|
||||
t := Type(dec.Byte())
|
||||
if t != HistogramSamples && t != CustomBucketHistogramSamples {
|
||||
if t != HistogramSamples && t != HistogramSamplesLegacy {
|
||||
return nil, errors.New("invalid record type")
|
||||
}
|
||||
if dec.Len() == 0 {
|
||||
|
@ -528,7 +528,7 @@ func DecodeHistogram(buf *encoding.Decbuf, h *histogram.Histogram) {
|
|||
func (d *Decoder) FloatHistogramSamples(rec []byte, histograms []RefFloatHistogramSample) ([]RefFloatHistogramSample, error) {
|
||||
dec := encoding.Decbuf{B: rec}
|
||||
t := Type(dec.Byte())
|
||||
if t != FloatHistogramSamples && t != CustomBucketFloatHistogramSamples {
|
||||
if t != FloatHistogramSamples && t != FloatHistogramSamplesLegacy {
|
||||
return nil, errors.New("invalid record type")
|
||||
}
|
||||
if dec.Len() == 0 {
|
||||
|
@ -744,40 +744,10 @@ func (e *Encoder) MmapMarkers(markers []RefMmapMarker, b []byte) []byte {
|
|||
return buf.Get()
|
||||
}
|
||||
|
||||
func (e *Encoder) HistogramSamples(histograms []RefHistogramSample, b []byte) ([]byte, bool) {
|
||||
func (e *Encoder) HistogramSamples(histograms []RefHistogramSample, b []byte) []byte {
|
||||
buf := encoding.Encbuf{B: b}
|
||||
buf.PutByte(byte(HistogramSamples))
|
||||
|
||||
if len(histograms) == 0 {
|
||||
return buf.Get(), false
|
||||
}
|
||||
|
||||
// Store base timestamp and base reference number of first histogram.
|
||||
// All histograms encode their timestamp and ref as delta to those.
|
||||
first := histograms[0]
|
||||
buf.PutBE64(uint64(first.Ref))
|
||||
buf.PutBE64int64(first.T)
|
||||
|
||||
customBucketSamplesExist := false
|
||||
for _, h := range histograms {
|
||||
if h.H.UsesCustomBuckets() {
|
||||
customBucketSamplesExist = true
|
||||
continue
|
||||
}
|
||||
|
||||
buf.PutVarint64(int64(h.Ref) - int64(first.Ref))
|
||||
buf.PutVarint64(h.T - first.T)
|
||||
|
||||
EncodeHistogram(&buf, h.H)
|
||||
}
|
||||
|
||||
return buf.Get(), customBucketSamplesExist
|
||||
}
|
||||
|
||||
func (e *Encoder) CustomBucketHistogramSamples(histograms []RefHistogramSample, b []byte) []byte {
|
||||
buf := encoding.Encbuf{B: b}
|
||||
buf.PutByte(byte(CustomBucketHistogramSamples))
|
||||
|
||||
if len(histograms) == 0 {
|
||||
return buf.Get()
|
||||
}
|
||||
|
@ -789,12 +759,10 @@ func (e *Encoder) CustomBucketHistogramSamples(histograms []RefHistogramSample,
|
|||
buf.PutBE64int64(first.T)
|
||||
|
||||
for _, h := range histograms {
|
||||
if h.H.UsesCustomBuckets() {
|
||||
buf.PutVarint64(int64(h.Ref) - int64(first.Ref))
|
||||
buf.PutVarint64(h.T - first.T)
|
||||
buf.PutVarint64(int64(h.Ref) - int64(first.Ref))
|
||||
buf.PutVarint64(h.T - first.T)
|
||||
|
||||
EncodeHistogram(&buf, h.H)
|
||||
}
|
||||
EncodeHistogram(&buf, h.H)
|
||||
}
|
||||
|
||||
return buf.Get()
|
||||
|
@ -841,40 +809,10 @@ func EncodeHistogram(buf *encoding.Encbuf, h *histogram.Histogram) {
|
|||
}
|
||||
}
|
||||
|
||||
func (e *Encoder) FloatHistogramSamples(histograms []RefFloatHistogramSample, b []byte) ([]byte, bool) {
|
||||
func (e *Encoder) FloatHistogramSamples(histograms []RefFloatHistogramSample, b []byte) []byte {
|
||||
buf := encoding.Encbuf{B: b}
|
||||
buf.PutByte(byte(FloatHistogramSamples))
|
||||
|
||||
if len(histograms) == 0 {
|
||||
return buf.Get(), false
|
||||
}
|
||||
|
||||
// Store base timestamp and base reference number of first histogram.
|
||||
// All histograms encode their timestamp and ref as delta to those.
|
||||
first := histograms[0]
|
||||
buf.PutBE64(uint64(first.Ref))
|
||||
buf.PutBE64int64(first.T)
|
||||
|
||||
customBucketsExist := false
|
||||
for _, h := range histograms {
|
||||
if h.FH.UsesCustomBuckets() {
|
||||
customBucketsExist = true
|
||||
continue
|
||||
}
|
||||
|
||||
buf.PutVarint64(int64(h.Ref) - int64(first.Ref))
|
||||
buf.PutVarint64(h.T - first.T)
|
||||
|
||||
EncodeFloatHistogram(&buf, h.FH)
|
||||
}
|
||||
|
||||
return buf.Get(), customBucketsExist
|
||||
}
|
||||
|
||||
func (e *Encoder) CustomBucketFloatHistogramSamples(histograms []RefFloatHistogramSample, b []byte) []byte {
|
||||
buf := encoding.Encbuf{B: b}
|
||||
buf.PutByte(byte(CustomBucketFloatHistogramSamples))
|
||||
|
||||
if len(histograms) == 0 {
|
||||
return buf.Get()
|
||||
}
|
||||
|
@ -886,12 +824,10 @@ func (e *Encoder) CustomBucketFloatHistogramSamples(histograms []RefFloatHistogr
|
|||
buf.PutBE64int64(first.T)
|
||||
|
||||
for _, h := range histograms {
|
||||
if h.FH.UsesCustomBuckets() {
|
||||
buf.PutVarint64(int64(h.Ref) - int64(first.Ref))
|
||||
buf.PutVarint64(h.T - first.T)
|
||||
buf.PutVarint64(int64(h.Ref) - int64(first.Ref))
|
||||
buf.PutVarint64(h.T - first.T)
|
||||
|
||||
EncodeFloatHistogram(&buf, h.FH)
|
||||
}
|
||||
EncodeFloatHistogram(&buf, h.FH)
|
||||
}
|
||||
|
||||
return buf.Get()
|
||||
|
|
|
@ -166,12 +166,9 @@ func TestRecord_EncodeDecode(t *testing.T) {
|
|||
},
|
||||
}
|
||||
|
||||
histSamples, _ := enc.HistogramSamples(histograms, nil)
|
||||
customBucketHistSamples := enc.CustomBucketHistogramSamples(histograms, nil)
|
||||
histSamples := enc.HistogramSamples(histograms, nil)
|
||||
decHistograms, err := dec.HistogramSamples(histSamples, nil)
|
||||
require.NoError(t, err)
|
||||
decCustomBucketHistSamples, err := dec.HistogramSamples(customBucketHistSamples, nil)
|
||||
decHistograms = append(decHistograms, decCustomBucketHistSamples...)
|
||||
require.Equal(t, histograms, decHistograms)
|
||||
|
||||
floatHistograms := make([]RefFloatHistogramSample, len(histograms))
|
||||
|
@ -182,13 +179,9 @@ func TestRecord_EncodeDecode(t *testing.T) {
|
|||
FH: h.H.ToFloat(nil),
|
||||
}
|
||||
}
|
||||
floatHistSamples, _ := enc.FloatHistogramSamples(floatHistograms, nil)
|
||||
customBucketFloatHistSamples := enc.CustomBucketFloatHistogramSamples(floatHistograms, nil)
|
||||
floatHistSamples := enc.FloatHistogramSamples(floatHistograms, nil)
|
||||
decFloatHistograms, err := dec.FloatHistogramSamples(floatHistSamples, nil)
|
||||
require.NoError(t, err)
|
||||
decCustomBucketFloatHistograms, err := dec.FloatHistogramSamples(customBucketFloatHistSamples, nil)
|
||||
require.NoError(t, err)
|
||||
decFloatHistograms = append(decFloatHistograms, decCustomBucketFloatHistograms...)
|
||||
require.Equal(t, floatHistograms, decFloatHistograms)
|
||||
|
||||
// Gauge integer histograms.
|
||||
|
@ -196,13 +189,9 @@ func TestRecord_EncodeDecode(t *testing.T) {
|
|||
histograms[i].H.CounterResetHint = histogram.GaugeType
|
||||
}
|
||||
|
||||
gaugeHistSamples, _ := enc.HistogramSamples(histograms, nil)
|
||||
customBucketGaugeHistSamples := enc.CustomBucketHistogramSamples(histograms, nil)
|
||||
gaugeHistSamples := enc.HistogramSamples(histograms, nil)
|
||||
decGaugeHistograms, err := dec.HistogramSamples(gaugeHistSamples, nil)
|
||||
require.NoError(t, err)
|
||||
decCustomBucketGaugeHistograms, err := dec.HistogramSamples(customBucketGaugeHistSamples, nil)
|
||||
require.NoError(t, err)
|
||||
decGaugeHistograms = append(decGaugeHistograms, decCustomBucketGaugeHistograms...)
|
||||
require.Equal(t, histograms, decGaugeHistograms)
|
||||
|
||||
// Gauge float histograms.
|
||||
|
@ -210,14 +199,10 @@ func TestRecord_EncodeDecode(t *testing.T) {
|
|||
floatHistograms[i].FH.CounterResetHint = histogram.GaugeType
|
||||
}
|
||||
|
||||
gaugeFloatHistSamples, _ := enc.FloatHistogramSamples(floatHistograms, nil)
|
||||
customBucketGaugeFloatHistSamples := enc.CustomBucketFloatHistogramSamples(floatHistograms, nil)
|
||||
gaugeFloatHistSamples := enc.FloatHistogramSamples(floatHistograms, nil)
|
||||
decGaugeFloatHistograms, err := dec.FloatHistogramSamples(gaugeFloatHistSamples, nil)
|
||||
require.NoError(t, err)
|
||||
decCustomBucketGaugeFloatHistograms, err := dec.FloatHistogramSamples(customBucketGaugeFloatHistSamples, nil)
|
||||
require.NoError(t, err)
|
||||
decFloatHistograms = append(decGaugeFloatHistograms, decCustomBucketGaugeFloatHistograms...)
|
||||
require.Equal(t, floatHistograms, decFloatHistograms)
|
||||
require.Equal(t, floatHistograms, decGaugeFloatHistograms)
|
||||
}
|
||||
|
||||
// TestRecord_Corrupted ensures that corrupted records return the correct error.
|
||||
|
@ -318,14 +303,10 @@ func TestRecord_Corrupted(t *testing.T) {
|
|||
},
|
||||
}
|
||||
|
||||
corruptedHists, _ := enc.HistogramSamples(histograms, nil)
|
||||
corruptedHists := enc.HistogramSamples(histograms, nil)
|
||||
corruptedHists = corruptedHists[:8]
|
||||
corruptedCustomBucketHists := enc.CustomBucketHistogramSamples(histograms, nil)
|
||||
corruptedCustomBucketHists = corruptedCustomBucketHists[:8]
|
||||
_, err := dec.HistogramSamples(corruptedHists, nil)
|
||||
require.ErrorIs(t, err, encoding.ErrInvalidSize)
|
||||
_, err = dec.HistogramSamples(corruptedCustomBucketHists, nil)
|
||||
require.ErrorIs(t, err, encoding.ErrInvalidSize)
|
||||
})
|
||||
}
|
||||
|
||||
|
@ -383,12 +364,9 @@ func TestRecord_Type(t *testing.T) {
|
|||
},
|
||||
},
|
||||
}
|
||||
hists, _ := enc.HistogramSamples(histograms, nil)
|
||||
hists := enc.HistogramSamples(histograms, nil)
|
||||
recordType = dec.Type(hists)
|
||||
require.Equal(t, HistogramSamples, recordType)
|
||||
customBucketHists := enc.CustomBucketHistogramSamples(histograms, nil)
|
||||
recordType = dec.Type(customBucketHists)
|
||||
require.Equal(t, CustomBucketHistogramSamples, recordType)
|
||||
|
||||
recordType = dec.Type(nil)
|
||||
require.Equal(t, Unknown, recordType)
|
||||
|
|
|
@ -64,7 +64,6 @@ func GenerateTestCustomBucketsHistograms(n int) (r []*histogram.Histogram) {
|
|||
h.CounterResetHint = histogram.NotCounterReset
|
||||
}
|
||||
r = append(r, h)
|
||||
|
||||
}
|
||||
return r
|
||||
}
|
||||
|
|
|
@ -208,7 +208,7 @@ func Checkpoint(logger *slog.Logger, w *WL, from, to int, keep func(id chunks.He
|
|||
stats.TotalSamples += len(samples)
|
||||
stats.DroppedSamples += len(samples) - len(repl)
|
||||
|
||||
case record.HistogramSamples:
|
||||
case record.HistogramSamples, record.HistogramSamplesLegacy:
|
||||
histogramSamples, err = dec.HistogramSamples(rec, histogramSamples)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("decode histogram samples: %w", err)
|
||||
|
@ -221,28 +221,11 @@ func Checkpoint(logger *slog.Logger, w *WL, from, to int, keep func(id chunks.He
|
|||
}
|
||||
}
|
||||
if len(repl) > 0 {
|
||||
buf, _ = enc.HistogramSamples(repl, buf)
|
||||
buf = enc.HistogramSamples(repl, buf)
|
||||
}
|
||||
stats.TotalSamples += len(histogramSamples)
|
||||
stats.DroppedSamples += len(histogramSamples) - len(repl)
|
||||
case record.CustomBucketHistogramSamples:
|
||||
histogramSamples, err = dec.HistogramSamples(rec, histogramSamples)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("decode histogram samples: %w", err)
|
||||
}
|
||||
// Drop irrelevant histogramSamples in place.
|
||||
repl := histogramSamples[:0]
|
||||
for _, h := range histogramSamples {
|
||||
if h.T >= mint {
|
||||
repl = append(repl, h)
|
||||
}
|
||||
}
|
||||
if len(repl) > 0 {
|
||||
buf = enc.CustomBucketHistogramSamples(repl, buf)
|
||||
}
|
||||
stats.TotalSamples += len(histogramSamples)
|
||||
stats.DroppedSamples += len(histogramSamples) - len(repl)
|
||||
case record.FloatHistogramSamples:
|
||||
case record.FloatHistogramSamples, record.FloatHistogramSamplesLegacy:
|
||||
floatHistogramSamples, err = dec.FloatHistogramSamples(rec, floatHistogramSamples)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("decode float histogram samples: %w", err)
|
||||
|
@ -255,24 +238,7 @@ func Checkpoint(logger *slog.Logger, w *WL, from, to int, keep func(id chunks.He
|
|||
}
|
||||
}
|
||||
if len(repl) > 0 {
|
||||
buf, _ = enc.FloatHistogramSamples(repl, buf)
|
||||
}
|
||||
stats.TotalSamples += len(floatHistogramSamples)
|
||||
stats.DroppedSamples += len(floatHistogramSamples) - len(repl)
|
||||
case record.CustomBucketFloatHistogramSamples:
|
||||
floatHistogramSamples, err = dec.FloatHistogramSamples(rec, floatHistogramSamples)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("decode float histogram samples: %w", err)
|
||||
}
|
||||
// Drop irrelevant floatHistogramSamples in place.
|
||||
repl := floatHistogramSamples[:0]
|
||||
for _, fh := range floatHistogramSamples {
|
||||
if fh.T >= mint {
|
||||
repl = append(repl, fh)
|
||||
}
|
||||
}
|
||||
if len(repl) > 0 {
|
||||
buf = enc.CustomBucketFloatHistogramSamples(repl, buf)
|
||||
buf = enc.FloatHistogramSamples(repl, buf)
|
||||
}
|
||||
stats.TotalSamples += len(floatHistogramSamples)
|
||||
stats.DroppedSamples += len(floatHistogramSamples) - len(repl)
|
||||
|
|
|
@ -236,7 +236,7 @@ func TestCheckpoint(t *testing.T) {
|
|||
require.NoError(t, w.Log(b))
|
||||
samplesInWAL += 4
|
||||
h := makeHistogram(i)
|
||||
b, _ = enc.HistogramSamples([]record.RefHistogramSample{
|
||||
b = enc.HistogramSamples([]record.RefHistogramSample{
|
||||
{Ref: 0, T: last, H: h},
|
||||
{Ref: 1, T: last + 10000, H: h},
|
||||
{Ref: 2, T: last + 20000, H: h},
|
||||
|
@ -245,7 +245,7 @@ func TestCheckpoint(t *testing.T) {
|
|||
require.NoError(t, w.Log(b))
|
||||
histogramsInWAL += 4
|
||||
cbh := makeCustomBucketHistogram(i)
|
||||
b = enc.CustomBucketHistogramSamples([]record.RefHistogramSample{
|
||||
b = enc.HistogramSamples([]record.RefHistogramSample{
|
||||
{Ref: 0, T: last, H: cbh},
|
||||
{Ref: 1, T: last + 10000, H: cbh},
|
||||
{Ref: 2, T: last + 20000, H: cbh},
|
||||
|
@ -254,7 +254,7 @@ func TestCheckpoint(t *testing.T) {
|
|||
require.NoError(t, w.Log(b))
|
||||
histogramsInWAL += 4
|
||||
fh := makeFloatHistogram(i)
|
||||
b, _ = enc.FloatHistogramSamples([]record.RefFloatHistogramSample{
|
||||
b = enc.FloatHistogramSamples([]record.RefFloatHistogramSample{
|
||||
{Ref: 0, T: last, FH: fh},
|
||||
{Ref: 1, T: last + 10000, FH: fh},
|
||||
{Ref: 2, T: last + 20000, FH: fh},
|
||||
|
@ -263,7 +263,7 @@ func TestCheckpoint(t *testing.T) {
|
|||
require.NoError(t, w.Log(b))
|
||||
floatHistogramsInWAL += 4
|
||||
cbfh := makeCustomBucketFloatHistogram(i)
|
||||
b = enc.CustomBucketFloatHistogramSamples([]record.RefFloatHistogramSample{
|
||||
b = enc.FloatHistogramSamples([]record.RefFloatHistogramSample{
|
||||
{Ref: 0, T: last, FH: cbfh},
|
||||
{Ref: 1, T: last + 10000, FH: cbfh},
|
||||
{Ref: 2, T: last + 20000, FH: cbfh},
|
||||
|
@ -330,14 +330,14 @@ func TestCheckpoint(t *testing.T) {
|
|||
require.GreaterOrEqual(t, s.T, last/2, "sample with wrong timestamp")
|
||||
}
|
||||
samplesInCheckpoint += len(samples)
|
||||
case record.HistogramSamples, record.CustomBucketHistogramSamples:
|
||||
case record.HistogramSamples, record.HistogramSamplesLegacy:
|
||||
histograms, err := dec.HistogramSamples(rec, nil)
|
||||
require.NoError(t, err)
|
||||
for _, h := range histograms {
|
||||
require.GreaterOrEqual(t, h.T, last/2, "histogram with wrong timestamp")
|
||||
}
|
||||
histogramsInCheckpoint += len(histograms)
|
||||
case record.FloatHistogramSamples, record.CustomBucketFloatHistogramSamples:
|
||||
case record.FloatHistogramSamples, record.FloatHistogramSamplesLegacy:
|
||||
floatHistograms, err := dec.FloatHistogramSamples(rec, nil)
|
||||
require.NoError(t, err)
|
||||
for _, h := range floatHistograms {
|
||||
|
|
|
@ -546,7 +546,7 @@ func (w *Watcher) readSegment(r *LiveReader, segmentNum int, tail bool) error {
|
|||
}
|
||||
w.writer.AppendExemplars(exemplars)
|
||||
|
||||
case record.HistogramSamples, record.CustomBucketHistogramSamples:
|
||||
case record.HistogramSamples, record.HistogramSamplesLegacy:
|
||||
// Skip if experimental "histograms over remote write" is not enabled.
|
||||
if !w.sendHistograms {
|
||||
break
|
||||
|
@ -574,7 +574,7 @@ func (w *Watcher) readSegment(r *LiveReader, segmentNum int, tail bool) error {
|
|||
histogramsToSend = histogramsToSend[:0]
|
||||
}
|
||||
|
||||
case record.FloatHistogramSamples, record.CustomBucketFloatHistogramSamples:
|
||||
case record.FloatHistogramSamples, record.FloatHistogramSamplesLegacy:
|
||||
// Skip if experimental "histograms over remote write" is not enabled.
|
||||
if !w.sendHistograms {
|
||||
break
|
||||
|
|
|
@ -209,7 +209,7 @@ func TestTailSamples(t *testing.T) {
|
|||
NegativeBuckets: []int64{int64(-i) - 1},
|
||||
}
|
||||
|
||||
histograms, _ := enc.HistogramSamples([]record.RefHistogramSample{{
|
||||
histograms := enc.HistogramSamples([]record.RefHistogramSample{{
|
||||
Ref: chunks.HeadSeriesRef(inner),
|
||||
T: now.UnixNano() + 1,
|
||||
H: hist,
|
||||
|
@ -226,21 +226,21 @@ func TestTailSamples(t *testing.T) {
|
|||
CustomValues: []float64{float64(i) + 2},
|
||||
}
|
||||
|
||||
customBucketHistograms := enc.CustomBucketHistogramSamples([]record.RefHistogramSample{{
|
||||
customBucketHistograms := enc.HistogramSamples([]record.RefHistogramSample{{
|
||||
Ref: chunks.HeadSeriesRef(inner),
|
||||
T: now.UnixNano() + 1,
|
||||
H: customBucketHist,
|
||||
}}, nil)
|
||||
require.NoError(t, w.Log(customBucketHistograms))
|
||||
|
||||
floatHistograms, _ := enc.FloatHistogramSamples([]record.RefFloatHistogramSample{{
|
||||
floatHistograms := enc.FloatHistogramSamples([]record.RefFloatHistogramSample{{
|
||||
Ref: chunks.HeadSeriesRef(inner),
|
||||
T: now.UnixNano() + 1,
|
||||
FH: hist.ToFloat(nil),
|
||||
}}, nil)
|
||||
require.NoError(t, w.Log(floatHistograms))
|
||||
|
||||
customBucketFloatHistograms := enc.CustomBucketFloatHistogramSamples([]record.RefFloatHistogramSample{{
|
||||
customBucketFloatHistograms := enc.FloatHistogramSamples([]record.RefFloatHistogramSample{{
|
||||
Ref: chunks.HeadSeriesRef(inner),
|
||||
T: now.UnixNano() + 1,
|
||||
FH: customBucketHist.ToFloat(nil),
|
||||
|
|
Loading…
Reference in a new issue