diff --git a/tsdb/agent/db.go b/tsdb/agent/db.go index bcfc7be129..5cf56d5871 100644 --- a/tsdb/agent/db.go +++ b/tsdb/agent/db.go @@ -463,7 +463,7 @@ func (db *DB) loadWAL(r *wlog.Reader, multiRef map[chunks.HeadSeriesRef]chunks.H return } decoded <- samples - case record.HistogramSamples, record.CustomBucketHistogramSamples: + case record.HistogramSamples, record.HistogramSamplesLegacy: histograms := histogramsPool.Get()[:0] histograms, err = dec.HistogramSamples(rec, histograms) if err != nil { @@ -475,7 +475,7 @@ func (db *DB) loadWAL(r *wlog.Reader, multiRef map[chunks.HeadSeriesRef]chunks.H return } decoded <- histograms - case record.FloatHistogramSamples, record.CustomBucketFloatHistogramSamples: + case record.FloatHistogramSamples, record.FloatHistogramSamplesLegacy: floatHistograms := floatHistogramsPool.Get()[:0] floatHistograms, err = dec.FloatHistogramSamples(rec, floatHistograms) if err != nil { @@ -1154,35 +1154,19 @@ func (a *appender) log() error { } if len(a.pendingHistograms) > 0 { - var customBucketsExist bool - buf, customBucketsExist = encoder.HistogramSamples(a.pendingHistograms, buf) + buf = encoder.HistogramSamples(a.pendingHistograms, buf) if err := a.wal.Log(buf); err != nil { return err } buf = buf[:0] - if customBucketsExist { - buf = encoder.CustomBucketHistogramSamples(a.pendingHistograms, buf) - if err := a.wal.Log(buf); err != nil { - return err - } - buf = buf[:0] - } } if len(a.pendingFloatHistograms) > 0 { - var customBucketsExist bool - buf, customBucketsExist = encoder.FloatHistogramSamples(a.pendingFloatHistograms, buf) + buf = encoder.FloatHistogramSamples(a.pendingFloatHistograms, buf) if err := a.wal.Log(buf); err != nil { return err } buf = buf[:0] - if customBucketsExist { - buf = encoder.CustomBucketFloatHistogramSamples(a.pendingFloatHistograms, buf) - if err := a.wal.Log(buf); err != nil { - return err - } - buf = buf[:0] - } } if len(a.pendingExamplars) > 0 { diff --git a/tsdb/agent/db_test.go b/tsdb/agent/db_test.go index 6b5d9ece05..8bcb71c86a 100644 --- a/tsdb/agent/db_test.go +++ b/tsdb/agent/db_test.go @@ -163,7 +163,7 @@ func TestCommit(t *testing.T) { } } - lbls = labelsForTest(t.Name()+"_custom_bucket_histogram", numSeries) + lbls = labelsForTest(t.Name()+"_custom_buckets_histogram", numSeries) for _, l := range lbls { lset := labels.New(l...) @@ -187,7 +187,7 @@ func TestCommit(t *testing.T) { } } - lbls = labelsForTest(t.Name()+"custom_bucket_float_histogram", numSeries) + lbls = labelsForTest(t.Name()+"_custom_buckets_float_histogram", numSeries) for _, l := range lbls { lset := labels.New(l...) @@ -231,13 +231,13 @@ func TestCommit(t *testing.T) { require.NoError(t, err) walSamplesCount += len(samples) - case record.HistogramSamples, record.CustomBucketHistogramSamples: + case record.HistogramSamples, record.HistogramSamplesLegacy: var histograms []record.RefHistogramSample histograms, err = dec.HistogramSamples(rec, histograms) require.NoError(t, err) walHistogramCount += len(histograms) - case record.FloatHistogramSamples, record.CustomBucketFloatHistogramSamples: + case record.FloatHistogramSamples, record.FloatHistogramSamplesLegacy: var floatHistograms []record.RefFloatHistogramSample floatHistograms, err = dec.FloatHistogramSamples(rec, floatHistograms) require.NoError(t, err) @@ -294,6 +294,18 @@ func TestRollback(t *testing.T) { } } + lbls = labelsForTest(t.Name()+"_custom_buckets_histogram", numSeries) + for _, l := range lbls { + lset := labels.New(l...) + + histograms := tsdbutil.GenerateTestCustomBucketsHistograms(numHistograms) + + for i := 0; i < numHistograms; i++ { + _, err := app.AppendHistogram(0, lset, int64(i), histograms[i], nil) + require.NoError(t, err) + } + } + lbls = labelsForTest(t.Name()+"_float_histogram", numSeries) for _, l := range lbls { lset := labels.New(l...) @@ -306,6 +318,18 @@ func TestRollback(t *testing.T) { } } + lbls = labelsForTest(t.Name()+"_custom_buckets_float_histogram", numSeries) + for _, l := range lbls { + lset := labels.New(l...) + + floatHistograms := tsdbutil.GenerateTestCustomBucketsFloatHistograms(numHistograms) + + for i := 0; i < numHistograms; i++ { + _, err := app.AppendHistogram(0, lset, int64(i), nil, floatHistograms[i]) + require.NoError(t, err) + } + } + // Do a rollback, which should clear uncommitted data. A followup call to // commit should persist nothing to the WAL. require.NoError(t, app.Rollback()) @@ -346,13 +370,13 @@ func TestRollback(t *testing.T) { require.NoError(t, err) walExemplarsCount += len(exemplars) - case record.HistogramSamples: + case record.HistogramSamples, record.HistogramSamplesLegacy: var histograms []record.RefHistogramSample histograms, err = dec.HistogramSamples(rec, histograms) require.NoError(t, err) walHistogramCount += len(histograms) - case record.FloatHistogramSamples: + case record.FloatHistogramSamples, record.FloatHistogramSamplesLegacy: var floatHistograms []record.RefFloatHistogramSample floatHistograms, err = dec.FloatHistogramSamples(rec, floatHistograms) require.NoError(t, err) @@ -363,7 +387,7 @@ func TestRollback(t *testing.T) { } // Check that only series get stored after calling Rollback. - require.Equal(t, numSeries*3, walSeriesCount, "series should have been written to WAL") + require.Equal(t, numSeries*5, walSeriesCount, "series should have been written to WAL") require.Equal(t, 0, walSamplesCount, "samples should not have been written to WAL") require.Equal(t, 0, walExemplarsCount, "exemplars should not have been written to WAL") require.Equal(t, 0, walHistogramCount, "histograms should not have been written to WAL") @@ -412,6 +436,19 @@ func TestFullTruncateWAL(t *testing.T) { require.NoError(t, app.Commit()) } + lbls = labelsForTest(t.Name()+"_custom_buckets_histogram", numSeries) + for _, l := range lbls { + lset := labels.New(l...) + + histograms := tsdbutil.GenerateTestCustomBucketsHistograms(numHistograms) + + for i := 0; i < numHistograms; i++ { + _, err := app.AppendHistogram(0, lset, int64(lastTs), histograms[i], nil) + require.NoError(t, err) + } + require.NoError(t, app.Commit()) + } + lbls = labelsForTest(t.Name()+"_float_histogram", numSeries) for _, l := range lbls { lset := labels.New(l...) @@ -425,11 +462,24 @@ func TestFullTruncateWAL(t *testing.T) { require.NoError(t, app.Commit()) } + lbls = labelsForTest(t.Name()+"_custom_buckets_float_histogram", numSeries) + for _, l := range lbls { + lset := labels.New(l...) + + floatHistograms := tsdbutil.GenerateTestCustomBucketsFloatHistograms(numHistograms) + + for i := 0; i < numHistograms; i++ { + _, err := app.AppendHistogram(0, lset, int64(lastTs), nil, floatHistograms[i]) + require.NoError(t, err) + } + require.NoError(t, app.Commit()) + } + // Truncate WAL with mint to GC all the samples. s.truncate(lastTs + 1) m := gatherFamily(t, reg, "prometheus_agent_deleted_series") - require.Equal(t, float64(numSeries*3), m.Metric[0].Gauge.GetValue(), "agent wal truncate mismatch of deleted series count") + require.Equal(t, float64(numSeries*5), m.Metric[0].Gauge.GetValue(), "agent wal truncate mismatch of deleted series count") } func TestPartialTruncateWAL(t *testing.T) { diff --git a/tsdb/db_test.go b/tsdb/db_test.go index 306dc4579e..4bbf4b4656 100644 --- a/tsdb/db_test.go +++ b/tsdb/db_test.go @@ -4281,6 +4281,188 @@ func TestOOOWALWrite(t *testing.T) { }, }, }, + "custom buckets histogram": { + appendSample: func(app storage.Appender, l labels.Labels, mins int64) (storage.SeriesRef, error) { + seriesRef, err := app.AppendHistogram(0, l, minutes(mins), tsdbutil.GenerateTestCustomBucketsHistogram(int(mins)), nil) + require.NoError(t, err) + return seriesRef, nil + }, + expectedOOORecords: []interface{}{ + // The MmapRef in this are not hand calculated, and instead taken from the test run. + // What is important here is the order of records, and that MmapRef increases for each record. + []record.RefMmapMarker{ + {Ref: 1}, + }, + []record.RefHistogramSample{ + {Ref: 1, T: minutes(40), H: tsdbutil.GenerateTestCustomBucketsHistogram(40)}, + }, + + []record.RefMmapMarker{ + {Ref: 2}, + }, + []record.RefHistogramSample{ + {Ref: 2, T: minutes(42), H: tsdbutil.GenerateTestCustomBucketsHistogram(42)}, + }, + + []record.RefHistogramSample{ + {Ref: 2, T: minutes(45), H: tsdbutil.GenerateTestCustomBucketsHistogram(45)}, + {Ref: 1, T: minutes(35), H: tsdbutil.GenerateTestCustomBucketsHistogram(35)}, + }, + []record.RefMmapMarker{ // 3rd sample, hence m-mapped. + {Ref: 1, MmapRef: 0x100000000 + 8}, + }, + []record.RefHistogramSample{ + {Ref: 1, T: minutes(36), H: tsdbutil.GenerateTestCustomBucketsHistogram(36)}, + {Ref: 1, T: minutes(37), H: tsdbutil.GenerateTestCustomBucketsHistogram(37)}, + }, + + []record.RefMmapMarker{ // 3rd sample, hence m-mapped. + {Ref: 1, MmapRef: 0x100000000 + 82}, + }, + []record.RefHistogramSample{ // Does not contain the in-order sample here. + {Ref: 1, T: minutes(50), H: tsdbutil.GenerateTestCustomBucketsHistogram(50)}, + }, + + // Single commit but multiple OOO records. + []record.RefMmapMarker{ + {Ref: 2, MmapRef: 0x100000000 + 160}, + }, + []record.RefHistogramSample{ + {Ref: 2, T: minutes(50), H: tsdbutil.GenerateTestCustomBucketsHistogram(50)}, + {Ref: 2, T: minutes(51), H: tsdbutil.GenerateTestCustomBucketsHistogram(51)}, + }, + []record.RefMmapMarker{ + {Ref: 2, MmapRef: 0x100000000 + 239}, + }, + []record.RefHistogramSample{ + {Ref: 2, T: minutes(52), H: tsdbutil.GenerateTestCustomBucketsHistogram(52)}, + {Ref: 2, T: minutes(53), H: tsdbutil.GenerateTestCustomBucketsHistogram(53)}, + }, + }, + expectedInORecords: []interface{}{ + []record.RefSeries{ + {Ref: 1, Labels: s1}, + {Ref: 2, Labels: s2}, + }, + []record.RefHistogramSample{ + {Ref: 1, T: minutes(60), H: tsdbutil.GenerateTestCustomBucketsHistogram(60)}, + {Ref: 2, T: minutes(60), H: tsdbutil.GenerateTestCustomBucketsHistogram(60)}, + }, + []record.RefHistogramSample{ + {Ref: 1, T: minutes(40), H: tsdbutil.GenerateTestCustomBucketsHistogram(40)}, + }, + []record.RefHistogramSample{ + {Ref: 2, T: minutes(42), H: tsdbutil.GenerateTestCustomBucketsHistogram(42)}, + }, + []record.RefHistogramSample{ + {Ref: 2, T: minutes(45), H: tsdbutil.GenerateTestCustomBucketsHistogram(45)}, + {Ref: 1, T: minutes(35), H: tsdbutil.GenerateTestCustomBucketsHistogram(35)}, + {Ref: 1, T: minutes(36), H: tsdbutil.GenerateTestCustomBucketsHistogram(36)}, + {Ref: 1, T: minutes(37), H: tsdbutil.GenerateTestCustomBucketsHistogram(37)}, + }, + []record.RefHistogramSample{ // Contains both in-order and ooo sample. + {Ref: 1, T: minutes(50), H: tsdbutil.GenerateTestCustomBucketsHistogram(50)}, + {Ref: 2, T: minutes(65), H: tsdbutil.GenerateTestCustomBucketsHistogram(65)}, + }, + []record.RefHistogramSample{ + {Ref: 2, T: minutes(50), H: tsdbutil.GenerateTestCustomBucketsHistogram(50)}, + {Ref: 2, T: minutes(51), H: tsdbutil.GenerateTestCustomBucketsHistogram(51)}, + {Ref: 2, T: minutes(52), H: tsdbutil.GenerateTestCustomBucketsHistogram(52)}, + {Ref: 2, T: minutes(53), H: tsdbutil.GenerateTestCustomBucketsHistogram(53)}, + }, + }, + }, + "custom buckets float histogram": { + appendSample: func(app storage.Appender, l labels.Labels, mins int64) (storage.SeriesRef, error) { + seriesRef, err := app.AppendHistogram(0, l, minutes(mins), nil, tsdbutil.GenerateTestCustomBucketsFloatHistogram(int(mins))) + require.NoError(t, err) + return seriesRef, nil + }, + expectedOOORecords: []interface{}{ + // The MmapRef in this are not hand calculated, and instead taken from the test run. + // What is important here is the order of records, and that MmapRef increases for each record. + []record.RefMmapMarker{ + {Ref: 1}, + }, + []record.RefFloatHistogramSample{ + {Ref: 1, T: minutes(40), FH: tsdbutil.GenerateTestCustomBucketsFloatHistogram(40)}, + }, + + []record.RefMmapMarker{ + {Ref: 2}, + }, + []record.RefFloatHistogramSample{ + {Ref: 2, T: minutes(42), FH: tsdbutil.GenerateTestCustomBucketsFloatHistogram(42)}, + }, + + []record.RefFloatHistogramSample{ + {Ref: 2, T: minutes(45), FH: tsdbutil.GenerateTestCustomBucketsFloatHistogram(45)}, + {Ref: 1, T: minutes(35), FH: tsdbutil.GenerateTestCustomBucketsFloatHistogram(35)}, + }, + []record.RefMmapMarker{ // 3rd sample, hence m-mapped. + {Ref: 1, MmapRef: 0x100000000 + 8}, + }, + []record.RefFloatHistogramSample{ + {Ref: 1, T: minutes(36), FH: tsdbutil.GenerateTestCustomBucketsFloatHistogram(36)}, + {Ref: 1, T: minutes(37), FH: tsdbutil.GenerateTestCustomBucketsFloatHistogram(37)}, + }, + + []record.RefMmapMarker{ // 3rd sample, hence m-mapped. + {Ref: 1, MmapRef: 0x100000000 + 134}, + }, + []record.RefFloatHistogramSample{ // Does not contain the in-order sample here. + {Ref: 1, T: minutes(50), FH: tsdbutil.GenerateTestCustomBucketsFloatHistogram(50)}, + }, + + // Single commit but multiple OOO records. + []record.RefMmapMarker{ + {Ref: 2, MmapRef: 0x100000000 + 263}, + }, + []record.RefFloatHistogramSample{ + {Ref: 2, T: minutes(50), FH: tsdbutil.GenerateTestCustomBucketsFloatHistogram(50)}, + {Ref: 2, T: minutes(51), FH: tsdbutil.GenerateTestCustomBucketsFloatHistogram(51)}, + }, + []record.RefMmapMarker{ + {Ref: 2, MmapRef: 0x100000000 + 393}, + }, + []record.RefFloatHistogramSample{ + {Ref: 2, T: minutes(52), FH: tsdbutil.GenerateTestCustomBucketsFloatHistogram(52)}, + {Ref: 2, T: minutes(53), FH: tsdbutil.GenerateTestCustomBucketsFloatHistogram(53)}, + }, + }, + expectedInORecords: []interface{}{ + []record.RefSeries{ + {Ref: 1, Labels: s1}, + {Ref: 2, Labels: s2}, + }, + []record.RefFloatHistogramSample{ + {Ref: 1, T: minutes(60), FH: tsdbutil.GenerateTestCustomBucketsFloatHistogram(60)}, + {Ref: 2, T: minutes(60), FH: tsdbutil.GenerateTestCustomBucketsFloatHistogram(60)}, + }, + []record.RefFloatHistogramSample{ + {Ref: 1, T: minutes(40), FH: tsdbutil.GenerateTestCustomBucketsFloatHistogram(40)}, + }, + []record.RefFloatHistogramSample{ + {Ref: 2, T: minutes(42), FH: tsdbutil.GenerateTestCustomBucketsFloatHistogram(42)}, + }, + []record.RefFloatHistogramSample{ + {Ref: 2, T: minutes(45), FH: tsdbutil.GenerateTestCustomBucketsFloatHistogram(45)}, + {Ref: 1, T: minutes(35), FH: tsdbutil.GenerateTestCustomBucketsFloatHistogram(35)}, + {Ref: 1, T: minutes(36), FH: tsdbutil.GenerateTestCustomBucketsFloatHistogram(36)}, + {Ref: 1, T: minutes(37), FH: tsdbutil.GenerateTestCustomBucketsFloatHistogram(37)}, + }, + []record.RefFloatHistogramSample{ // Contains both in-order and ooo sample. + {Ref: 1, T: minutes(50), FH: tsdbutil.GenerateTestCustomBucketsFloatHistogram(50)}, + {Ref: 2, T: minutes(65), FH: tsdbutil.GenerateTestCustomBucketsFloatHistogram(65)}, + }, + []record.RefFloatHistogramSample{ + {Ref: 2, T: minutes(50), FH: tsdbutil.GenerateTestCustomBucketsFloatHistogram(50)}, + {Ref: 2, T: minutes(51), FH: tsdbutil.GenerateTestCustomBucketsFloatHistogram(51)}, + {Ref: 2, T: minutes(52), FH: tsdbutil.GenerateTestCustomBucketsFloatHistogram(52)}, + {Ref: 2, T: minutes(53), FH: tsdbutil.GenerateTestCustomBucketsFloatHistogram(53)}, + }, + }, + }, } for name, scenario := range scenarios { t.Run(name, func(t *testing.T) { @@ -4374,11 +4556,11 @@ func testOOOWALWrite(t *testing.T, markers, err := dec.MmapMarkers(rec, nil) require.NoError(t, err) records = append(records, markers) - case record.HistogramSamples: + case record.HistogramSamples, record.HistogramSamplesLegacy: histogramSamples, err := dec.HistogramSamples(rec, nil) require.NoError(t, err) records = append(records, histogramSamples) - case record.FloatHistogramSamples: + case record.FloatHistogramSamples, record.FloatHistogramSamplesLegacy: floatHistogramSamples, err := dec.FloatHistogramSamples(rec, nil) require.NoError(t, err) records = append(records, floatHistogramSamples) diff --git a/tsdb/head_append.go b/tsdb/head_append.go index 78b256fee3..7dacb9037b 100644 --- a/tsdb/head_append.go +++ b/tsdb/head_append.go @@ -942,33 +942,18 @@ func (a *headAppender) log() error { } } if len(a.histograms) > 0 { - rec, customBucketsExist := enc.HistogramSamples(a.histograms, buf) + rec = enc.HistogramSamples(a.histograms, buf) buf = rec[:0] if err := a.head.wal.Log(rec); err != nil { return fmt.Errorf("log histograms: %w", err) } - - if customBucketsExist { - enc.CustomBucketHistogramSamples(a.histograms, buf) - buf = rec[:0] - if err := a.head.wal.Log(rec); err != nil { - return fmt.Errorf("log custom bucket histograms: %w", err) - } - } } if len(a.floatHistograms) > 0 { - rec, customBucketsExist := enc.FloatHistogramSamples(a.floatHistograms, buf) + rec = enc.FloatHistogramSamples(a.floatHistograms, buf) buf = rec[:0] if err := a.head.wal.Log(rec); err != nil { return fmt.Errorf("log float histograms: %w", err) } - - if customBucketsExist { - buf = rec[:0] - if err := a.head.wal.Log(rec); err != nil { - return fmt.Errorf("log custom bucket float histograms: %w", err) - } - } } // Exemplars should be logged after samples (float/native histogram/etc), // otherwise it might happen that we send the exemplars in a remote write diff --git a/tsdb/head_test.go b/tsdb/head_test.go index 527476e113..c3377fecff 100644 --- a/tsdb/head_test.go +++ b/tsdb/head_test.go @@ -187,11 +187,11 @@ func readTestWAL(t testing.TB, dir string) (recs []interface{}) { samples, err := dec.Samples(rec, nil) require.NoError(t, err) recs = append(recs, samples) - case record.HistogramSamples: + case record.HistogramSamples, record.HistogramSamplesLegacy: samples, err := dec.HistogramSamples(rec, nil) require.NoError(t, err) recs = append(recs, samples) - case record.FloatHistogramSamples: + case record.FloatHistogramSamples, record.FloatHistogramSamplesLegacy: samples, err := dec.FloatHistogramSamples(rec, nil) require.NoError(t, err) recs = append(recs, samples) @@ -740,89 +740,6 @@ func TestHead_ReadWAL(t *testing.T) { } } -func TestHead_ReadWAL2(t *testing.T) { - for _, compress := range []wlog.CompressionType{wlog.CompressionNone, wlog.CompressionSnappy, wlog.CompressionZstd} { - t.Run(fmt.Sprintf("compress=%s", compress), func(t *testing.T) { - entries := []interface{}{ - []record.RefSeries{ - {Ref: 10, Labels: labels.FromStrings("a", "1")}, - {Ref: 11, Labels: labels.FromStrings("a", "2")}, - {Ref: 100, Labels: labels.FromStrings("a", "3")}, - }, - []record.RefHistogramSample{ - {Ref: 0, T: 99, H: tsdbutil.GenerateTestHistogram(1)}, - {Ref: 10, T: 100, H: tsdbutil.GenerateTestCustomBucketsHistogram(2)}, - {Ref: 100, T: 100, H: tsdbutil.GenerateTestHistogram(3)}, - }, - []record.RefSeries{ - {Ref: 50, Labels: labels.FromStrings("a", "4")}, - // This series has two refs pointing to it. - {Ref: 101, Labels: labels.FromStrings("a", "3")}, - }, - []record.RefHistogramSample{ - {Ref: 10, T: 101, H: tsdbutil.GenerateTestHistogram(5)}, - {Ref: 50, T: 101, H: tsdbutil.GenerateTestHistogram(6)}, - {Ref: 101, T: 101, H: tsdbutil.GenerateTestCustomBucketsHistogram(7)}, - }, - []tombstones.Stone{ - {Ref: 0, Intervals: []tombstones.Interval{{Mint: 99, Maxt: 101}}}, - }, - []record.RefExemplar{ - {Ref: 10, T: 100, V: 1, Labels: labels.FromStrings("trace_id", "asdf")}, - }, - } - - head, w := newTestHead(t, 1000, compress, false) - defer func() { - require.NoError(t, head.Close()) - }() - - populateTestWL(t, w, entries) - - require.NoError(t, head.Init(math.MinInt64)) - require.Equal(t, uint64(101), head.lastSeriesID.Load()) - - s10 := head.series.getByID(10) - s11 := head.series.getByID(11) - s50 := head.series.getByID(50) - s100 := head.series.getByID(100) - - testutil.RequireEqual(t, labels.FromStrings("a", "1"), s10.lset) - require.Nil(t, s11) // Series without samples should be garbage collected at head.Init(). - testutil.RequireEqual(t, labels.FromStrings("a", "4"), s50.lset) - testutil.RequireEqual(t, labels.FromStrings("a", "3"), s100.lset) - - expandChunk := func(c chunkenc.Iterator) (x []sample) { - for c.Next() == chunkenc.ValHistogram { - t, v := c.AtHistogram(nil) - //t, v := c.At() - x = append(x, sample{t: t, h: v}) - } - require.NoError(t, c.Err()) - return x - } - - c, _, _, err := s10.chunk(0, head.chunkDiskMapper, &head.memChunkPool) - require.NoError(t, err) - require.Equal(t, []sample{{100, 0, tsdbutil.GenerateTestCustomBucketsHistogram(2), nil}, {101, 0, tsdbutil.GenerateTestCustomBucketsHistogram(5), nil}}, expandChunk(c.chunk.Iterator(nil))) - c, _, _, err = s50.chunk(0, head.chunkDiskMapper, &head.memChunkPool) - require.NoError(t, err) - require.Equal(t, []sample{{101, 0, tsdbutil.GenerateTestHistogram(6), nil}}, expandChunk(c.chunk.Iterator(nil))) - // The samples before the new series record should be discarded since a duplicate record - // is only possible when old samples were compacted. - c, _, _, err = s100.chunk(0, head.chunkDiskMapper, &head.memChunkPool) - require.NoError(t, err) - require.Equal(t, []sample{{101, 0, tsdbutil.GenerateTestCustomBucketsHistogram(7), nil}}, expandChunk(c.chunk.Iterator(nil))) - - q, err := head.ExemplarQuerier(context.Background()) - require.NoError(t, err) - e, err := q.Select(0, 1000, []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "a", "1")}) - require.NoError(t, err) - require.True(t, exemplar.Exemplar{Ts: 100, Value: 1, Labels: labels.FromStrings("trace_id", "asdf")}.Equals(e[0].Exemplars[0])) - }) - } -} - func TestHead_WALMultiRef(t *testing.T) { head, w := newTestHead(t, 1000, wlog.CompressionNone, false) @@ -4036,194 +3953,6 @@ func TestHistogramInWALAndMmapChunk(t *testing.T) { testQuery() } -func TestHistogramInWALAndMmapChunk2(t *testing.T) { - head, _ := newTestHead(t, 3000, wlog.CompressionNone, false) - t.Cleanup(func() { - require.NoError(t, head.Close()) - }) - require.NoError(t, head.Init(0)) - - // Series with only histograms. - s1 := labels.FromStrings("a", "b1") - k1 := s1.String() - numHistograms := 300 - exp := map[string][]chunks.Sample{} - ts := int64(0) - var app storage.Appender - for _, custom := range []bool{true, false} { - app = head.Appender(context.Background()) - var hists []*histogram.Histogram - if custom { - hists = tsdbutil.GenerateTestCustomBucketsHistograms(numHistograms) - } else { - hists = tsdbutil.GenerateTestHistograms(numHistograms) - } - for _, h := range hists { - if !custom { - h.NegativeSpans = h.PositiveSpans - h.NegativeBuckets = h.PositiveBuckets - } - _, err := app.AppendHistogram(0, s1, ts, h, nil) - require.NoError(t, err) - exp[k1] = append(exp[k1], sample{t: ts, h: h.Copy()}) - ts++ - if ts%5 == 0 { - require.NoError(t, app.Commit()) - app = head.Appender(context.Background()) - } - } - require.NoError(t, app.Commit()) - } - for _, custom := range []bool{true, false} { - app = head.Appender(context.Background()) - var hists []*histogram.FloatHistogram - if custom { - hists = tsdbutil.GenerateTestCustomBucketsFloatHistograms(numHistograms) - } else { - hists = tsdbutil.GenerateTestFloatHistograms(numHistograms) - } - for _, h := range hists { - if !custom { - h.NegativeSpans = h.PositiveSpans - h.NegativeBuckets = h.PositiveBuckets - } - _, err := app.AppendHistogram(0, s1, ts, nil, h) - require.NoError(t, err) - exp[k1] = append(exp[k1], sample{t: ts, fh: h.Copy()}) - ts++ - if ts%5 == 0 { - require.NoError(t, app.Commit()) - app = head.Appender(context.Background()) - } - } - require.NoError(t, app.Commit()) - head.mmapHeadChunks() - } - - // There should be 20 mmap chunks in s1. - ms := head.series.getByHash(s1.Hash(), s1) - require.Len(t, ms.mmappedChunks, 19) - expMmapChunks := make([]*mmappedChunk, 0, 20) - for _, mmap := range ms.mmappedChunks { - require.Positive(t, mmap.numSamples) - cpy := *mmap - expMmapChunks = append(expMmapChunks, &cpy) - } - expHeadChunkSamples := ms.headChunks.chunk.NumSamples() - require.Positive(t, expHeadChunkSamples) - - // Series with mix of histograms and float. - s2 := labels.FromStrings("a", "b2") - k2 := s2.String() - ts = 0 - for _, custom := range []bool{true, false} { - app = head.Appender(context.Background()) - var hists []*histogram.Histogram - if custom { - hists = tsdbutil.GenerateTestCustomBucketsHistograms(100) - } else { - hists = tsdbutil.GenerateTestHistograms(100) - } - for _, h := range hists { - ts++ - if !custom { - h.NegativeSpans = h.PositiveSpans - h.NegativeBuckets = h.PositiveBuckets - } - _, err := app.AppendHistogram(0, s2, ts, h, nil) - require.NoError(t, err) - eh := h.Copy() - if ts > 30 && (ts-10)%20 == 1 { - // Need "unknown" hint after float sample. - eh.CounterResetHint = histogram.UnknownCounterReset - } - exp[k2] = append(exp[k2], sample{t: ts, h: eh}) - if ts%20 == 0 { - require.NoError(t, app.Commit()) - app = head.Appender(context.Background()) - // Add some float. - for i := 0; i < 10; i++ { - ts++ - _, err := app.Append(0, s2, ts, float64(ts)) - require.NoError(t, err) - exp[k2] = append(exp[k2], sample{t: ts, f: float64(ts)}) - } - require.NoError(t, app.Commit()) - app = head.Appender(context.Background()) - } - } - require.NoError(t, app.Commit()) - } - for _, custom := range []bool{true, false} { - app = head.Appender(context.Background()) - var hists []*histogram.FloatHistogram - if custom { - hists = tsdbutil.GenerateTestCustomBucketsFloatHistograms(100) - } else { - hists = tsdbutil.GenerateTestFloatHistograms(100) - } - for _, h := range hists { - ts++ - if !custom { - h.NegativeSpans = h.PositiveSpans - h.NegativeBuckets = h.PositiveBuckets - } - _, err := app.AppendHistogram(0, s2, ts, nil, h) - require.NoError(t, err) - eh := h.Copy() - if ts > 30 && (ts-10)%20 == 1 { - // Need "unknown" hint after float sample. - eh.CounterResetHint = histogram.UnknownCounterReset - } - exp[k2] = append(exp[k2], sample{t: ts, fh: eh}) - if ts%20 == 0 { - require.NoError(t, app.Commit()) - app = head.Appender(context.Background()) - // Add some float. - for i := 0; i < 10; i++ { - ts++ - _, err := app.Append(0, s2, ts, float64(ts)) - require.NoError(t, err) - exp[k2] = append(exp[k2], sample{t: ts, f: float64(ts)}) - } - require.NoError(t, app.Commit()) - app = head.Appender(context.Background()) - } - } - require.NoError(t, app.Commit()) - } - - // Restart head. - require.NoError(t, head.Close()) - startHead := func() { - w, err := wlog.NewSize(nil, nil, head.wal.Dir(), 32768, wlog.CompressionNone) - require.NoError(t, err) - head, err = NewHead(nil, nil, w, nil, head.opts, nil) - require.NoError(t, err) - require.NoError(t, head.Init(0)) - } - startHead() - - // Checking contents of s1. - ms = head.series.getByHash(s1.Hash(), s1) - require.Equal(t, expMmapChunks, ms.mmappedChunks) - require.Equal(t, expHeadChunkSamples, ms.headChunks.chunk.NumSamples()) - - testQuery := func() { - q, err := NewBlockQuerier(head, head.MinTime(), head.MaxTime()) - require.NoError(t, err) - act := query(t, q, labels.MustNewMatcher(labels.MatchRegexp, "a", "b.*")) - compareSeries(t, exp, act) - } - testQuery() - - // Restart with no mmap chunks to test WAL replay. - require.NoError(t, head.Close()) - require.NoError(t, os.RemoveAll(mmappedChunksDir(head.opts.ChunkDirRoot))) - startHead() - testQuery() -} - func TestChunkSnapshot(t *testing.T) { head, _ := newTestHead(t, 120*4, wlog.CompressionNone, false) defer func() { @@ -5360,48 +5089,6 @@ func TestChunkSnapshotTakenAfterIncompleteSnapshot(t *testing.T) { require.Positive(t, offset) } -func TestHistogramWALANDWBLReplay(t *testing.T) { - dir := t.TempDir() - wal, err := wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, wlog.CompressionSnappy) - require.NoError(t, err) - oooWlog, err := wlog.NewSize(nil, nil, filepath.Join(dir, wlog.WblDirName), 32768, wlog.CompressionSnappy) - require.NoError(t, err) - - opts := DefaultHeadOptions() - opts.ChunkRange = 1000 - opts.ChunkDirRoot = dir - opts.OutOfOrderTimeWindow.Store(30 * time.Minute.Milliseconds()) - opts.EnableNativeHistograms.Store(true) - opts.EnableOOONativeHistograms.Store(true) - - h, err := NewHead(nil, nil, wal, oooWlog, opts, nil) - require.NoError(t, err) - require.NoError(t, h.Init(0)) - - var expOOOSamples []chunks.Sample - l := labels.FromStrings("foo", "bar") - appendSample := func(mins int64, val float64, isOOO bool, isCustomBucketHistogram bool) { - app := h.Appender(context.Background()) - var s sample - if isCustomBucketHistogram { - s = sample{t: mins * time.Minute.Milliseconds(), h: tsdbutil.GenerateTestCustomBucketsHistogram(int(val))} - } else { - s = sample{t: mins * time.Minute.Milliseconds(), h: tsdbutil.GenerateTestHistogram(int(val))} - } - _, err := app.AppendHistogram(0, l, mins*time.Minute.Milliseconds(), s.h, nil) - require.NoError(t, err) - require.NoError(t, app.Commit()) - - if isOOO { - expOOOSamples = append(expOOOSamples, s) - } - } - - // In-order histogram samples. - appendSample(60, 60, false, false) - -} - // TestWBLReplay checks the replay at a low level. func TestWBLReplay(t *testing.T) { for name, scenario := range sampleTypeScenarios { diff --git a/tsdb/head_wal.go b/tsdb/head_wal.go index 9d1e24b706..458162522b 100644 --- a/tsdb/head_wal.go +++ b/tsdb/head_wal.go @@ -189,7 +189,7 @@ func (h *Head) loadWAL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch return } decoded <- exemplars - case record.HistogramSamples, record.CustomBucketHistogramSamples: + case record.HistogramSamples, record.HistogramSamplesLegacy: hists := histogramsPool.Get()[:0] hists, err = dec.HistogramSamples(rec, hists) if err != nil { @@ -201,7 +201,7 @@ func (h *Head) loadWAL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch return } decoded <- hists - case record.FloatHistogramSamples, record.CustomBucketFloatHistogramSamples: + case record.FloatHistogramSamples, record.FloatHistogramSamplesLegacy: hists := floatHistogramsPool.Get()[:0] hists, err = dec.FloatHistogramSamples(rec, hists) if err != nil { @@ -726,7 +726,7 @@ func (h *Head) loadWBL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch return } decodedCh <- markers - case record.HistogramSamples: + case record.HistogramSamples, record.HistogramSamplesLegacy: hists := histogramSamplesPool.Get()[:0] hists, err = dec.HistogramSamples(rec, hists) if err != nil { @@ -738,7 +738,7 @@ func (h *Head) loadWBL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch return } decodedCh <- hists - case record.FloatHistogramSamples: + case record.FloatHistogramSamples, record.FloatHistogramSamplesLegacy: hists := floatHistogramSamplesPool.Get()[:0] hists, err = dec.FloatHistogramSamples(rec, hists) if err != nil { diff --git a/tsdb/ooo_head_read_test.go b/tsdb/ooo_head_read_test.go index bc1cb67d1e..adbd3278ba 100644 --- a/tsdb/ooo_head_read_test.go +++ b/tsdb/ooo_head_read_test.go @@ -963,7 +963,7 @@ func testOOOHeadChunkReader_Chunk_ConsistentQueryResponseDespiteOfHeadExpanding( }, }, { - name: "After Series() prev head gets mmapped after getting samples, new head gets new samples also overlapping, none of these should appear in response.", + name: "After Series() prev head mmapped after getting samples, new head gets new samples also overlapping, none should appear in response.", queryMinT: minutes(0), queryMaxT: minutes(100), firstInOrderSampleAt: minutes(120), diff --git a/tsdb/record/record.go b/tsdb/record/record.go index 2b19cdbb6f..2dd7ffe027 100644 --- a/tsdb/record/record.go +++ b/tsdb/record/record.go @@ -48,14 +48,14 @@ const ( MmapMarkers Type = 5 // Metadata is used to match WAL records of type Metadata. Metadata Type = 6 - // HistogramSamples is used to match WAL records of type Histograms. - HistogramSamples Type = 7 - // FloatHistogramSamples is used to match WAL records of type Float Histograms. - FloatHistogramSamples Type = 8 - // CustomBucketHistogramSamples is used to match WAL records of type Histogram with custom buckets. - CustomBucketHistogramSamples Type = 9 - // CustomBucketFloatHistogramSamples is used to match WAL records of type Float Histogram with custom buckets. - CustomBucketFloatHistogramSamples Type = 10 + // HistogramSamplesLegacy is used to match WAL records of type Histograms prior to intrdocuing support of custom buckets, to maintain backwards compatibility. + HistogramSamplesLegacy Type = 7 + // FloatHistogramSamplesLegacy is used to match WAL records of type Float Histograms proior to introducing support of custom buckets, to maintain backwards compatibility. + FloatHistogramSamplesLegacy Type = 8 + // HistogramSamples is used to match WAL records of type Histogram, and supports custom buckets. + HistogramSamples Type = 9 + // FloatHistogramSamples is used to match WAL records of type Float Histogram, and supports custom buckets. + FloatHistogramSamples Type = 10 ) func (rt Type) String() string { @@ -68,14 +68,14 @@ func (rt Type) String() string { return "tombstones" case Exemplars: return "exemplars" + case HistogramSamplesLegacy: + return "histogram_samples_legacy" + case FloatHistogramSamplesLegacy: + return "float_histogram_samples_legacy" case HistogramSamples: - return "histogram_samples" + return "histogram_sample" case FloatHistogramSamples: return "float_histogram_samples" - case CustomBucketHistogramSamples: - return "custom_bucket_histogram_samples" - case CustomBucketFloatHistogramSamples: - return "custom_bucket_float_histogram_samples" case MmapMarkers: return "mmapmarkers" case Metadata: @@ -215,7 +215,7 @@ func (d *Decoder) Type(rec []byte) Type { return Unknown } switch t := Type(rec[0]); t { - case Series, Samples, Tombstones, Exemplars, MmapMarkers, Metadata, HistogramSamples, FloatHistogramSamples, CustomBucketHistogramSamples, CustomBucketFloatHistogramSamples: + case Series, Samples, Tombstones, Exemplars, MmapMarkers, Metadata, HistogramSamplesLegacy, FloatHistogramSamplesLegacy, HistogramSamples, FloatHistogramSamples: return t } return Unknown @@ -436,7 +436,7 @@ func (d *Decoder) MmapMarkers(rec []byte, markers []RefMmapMarker) ([]RefMmapMar func (d *Decoder) HistogramSamples(rec []byte, histograms []RefHistogramSample) ([]RefHistogramSample, error) { dec := encoding.Decbuf{B: rec} t := Type(dec.Byte()) - if t != HistogramSamples && t != CustomBucketHistogramSamples { + if t != HistogramSamples && t != HistogramSamplesLegacy { return nil, errors.New("invalid record type") } if dec.Len() == 0 { @@ -528,7 +528,7 @@ func DecodeHistogram(buf *encoding.Decbuf, h *histogram.Histogram) { func (d *Decoder) FloatHistogramSamples(rec []byte, histograms []RefFloatHistogramSample) ([]RefFloatHistogramSample, error) { dec := encoding.Decbuf{B: rec} t := Type(dec.Byte()) - if t != FloatHistogramSamples && t != CustomBucketFloatHistogramSamples { + if t != FloatHistogramSamples && t != FloatHistogramSamplesLegacy { return nil, errors.New("invalid record type") } if dec.Len() == 0 { @@ -744,40 +744,10 @@ func (e *Encoder) MmapMarkers(markers []RefMmapMarker, b []byte) []byte { return buf.Get() } -func (e *Encoder) HistogramSamples(histograms []RefHistogramSample, b []byte) ([]byte, bool) { +func (e *Encoder) HistogramSamples(histograms []RefHistogramSample, b []byte) []byte { buf := encoding.Encbuf{B: b} buf.PutByte(byte(HistogramSamples)) - if len(histograms) == 0 { - return buf.Get(), false - } - - // Store base timestamp and base reference number of first histogram. - // All histograms encode their timestamp and ref as delta to those. - first := histograms[0] - buf.PutBE64(uint64(first.Ref)) - buf.PutBE64int64(first.T) - - customBucketSamplesExist := false - for _, h := range histograms { - if h.H.UsesCustomBuckets() { - customBucketSamplesExist = true - continue - } - - buf.PutVarint64(int64(h.Ref) - int64(first.Ref)) - buf.PutVarint64(h.T - first.T) - - EncodeHistogram(&buf, h.H) - } - - return buf.Get(), customBucketSamplesExist -} - -func (e *Encoder) CustomBucketHistogramSamples(histograms []RefHistogramSample, b []byte) []byte { - buf := encoding.Encbuf{B: b} - buf.PutByte(byte(CustomBucketHistogramSamples)) - if len(histograms) == 0 { return buf.Get() } @@ -789,12 +759,10 @@ func (e *Encoder) CustomBucketHistogramSamples(histograms []RefHistogramSample, buf.PutBE64int64(first.T) for _, h := range histograms { - if h.H.UsesCustomBuckets() { - buf.PutVarint64(int64(h.Ref) - int64(first.Ref)) - buf.PutVarint64(h.T - first.T) + buf.PutVarint64(int64(h.Ref) - int64(first.Ref)) + buf.PutVarint64(h.T - first.T) - EncodeHistogram(&buf, h.H) - } + EncodeHistogram(&buf, h.H) } return buf.Get() @@ -841,40 +809,10 @@ func EncodeHistogram(buf *encoding.Encbuf, h *histogram.Histogram) { } } -func (e *Encoder) FloatHistogramSamples(histograms []RefFloatHistogramSample, b []byte) ([]byte, bool) { +func (e *Encoder) FloatHistogramSamples(histograms []RefFloatHistogramSample, b []byte) []byte { buf := encoding.Encbuf{B: b} buf.PutByte(byte(FloatHistogramSamples)) - if len(histograms) == 0 { - return buf.Get(), false - } - - // Store base timestamp and base reference number of first histogram. - // All histograms encode their timestamp and ref as delta to those. - first := histograms[0] - buf.PutBE64(uint64(first.Ref)) - buf.PutBE64int64(first.T) - - customBucketsExist := false - for _, h := range histograms { - if h.FH.UsesCustomBuckets() { - customBucketsExist = true - continue - } - - buf.PutVarint64(int64(h.Ref) - int64(first.Ref)) - buf.PutVarint64(h.T - first.T) - - EncodeFloatHistogram(&buf, h.FH) - } - - return buf.Get(), customBucketsExist -} - -func (e *Encoder) CustomBucketFloatHistogramSamples(histograms []RefFloatHistogramSample, b []byte) []byte { - buf := encoding.Encbuf{B: b} - buf.PutByte(byte(CustomBucketFloatHistogramSamples)) - if len(histograms) == 0 { return buf.Get() } @@ -886,12 +824,10 @@ func (e *Encoder) CustomBucketFloatHistogramSamples(histograms []RefFloatHistogr buf.PutBE64int64(first.T) for _, h := range histograms { - if h.FH.UsesCustomBuckets() { - buf.PutVarint64(int64(h.Ref) - int64(first.Ref)) - buf.PutVarint64(h.T - first.T) + buf.PutVarint64(int64(h.Ref) - int64(first.Ref)) + buf.PutVarint64(h.T - first.T) - EncodeFloatHistogram(&buf, h.FH) - } + EncodeFloatHistogram(&buf, h.FH) } return buf.Get() diff --git a/tsdb/record/record_test.go b/tsdb/record/record_test.go index af94f2b207..901fe2e9f6 100644 --- a/tsdb/record/record_test.go +++ b/tsdb/record/record_test.go @@ -166,12 +166,9 @@ func TestRecord_EncodeDecode(t *testing.T) { }, } - histSamples, _ := enc.HistogramSamples(histograms, nil) - customBucketHistSamples := enc.CustomBucketHistogramSamples(histograms, nil) + histSamples := enc.HistogramSamples(histograms, nil) decHistograms, err := dec.HistogramSamples(histSamples, nil) require.NoError(t, err) - decCustomBucketHistSamples, err := dec.HistogramSamples(customBucketHistSamples, nil) - decHistograms = append(decHistograms, decCustomBucketHistSamples...) require.Equal(t, histograms, decHistograms) floatHistograms := make([]RefFloatHistogramSample, len(histograms)) @@ -182,13 +179,9 @@ func TestRecord_EncodeDecode(t *testing.T) { FH: h.H.ToFloat(nil), } } - floatHistSamples, _ := enc.FloatHistogramSamples(floatHistograms, nil) - customBucketFloatHistSamples := enc.CustomBucketFloatHistogramSamples(floatHistograms, nil) + floatHistSamples := enc.FloatHistogramSamples(floatHistograms, nil) decFloatHistograms, err := dec.FloatHistogramSamples(floatHistSamples, nil) require.NoError(t, err) - decCustomBucketFloatHistograms, err := dec.FloatHistogramSamples(customBucketFloatHistSamples, nil) - require.NoError(t, err) - decFloatHistograms = append(decFloatHistograms, decCustomBucketFloatHistograms...) require.Equal(t, floatHistograms, decFloatHistograms) // Gauge integer histograms. @@ -196,13 +189,9 @@ func TestRecord_EncodeDecode(t *testing.T) { histograms[i].H.CounterResetHint = histogram.GaugeType } - gaugeHistSamples, _ := enc.HistogramSamples(histograms, nil) - customBucketGaugeHistSamples := enc.CustomBucketHistogramSamples(histograms, nil) + gaugeHistSamples := enc.HistogramSamples(histograms, nil) decGaugeHistograms, err := dec.HistogramSamples(gaugeHistSamples, nil) require.NoError(t, err) - decCustomBucketGaugeHistograms, err := dec.HistogramSamples(customBucketGaugeHistSamples, nil) - require.NoError(t, err) - decGaugeHistograms = append(decGaugeHistograms, decCustomBucketGaugeHistograms...) require.Equal(t, histograms, decGaugeHistograms) // Gauge float histograms. @@ -210,14 +199,10 @@ func TestRecord_EncodeDecode(t *testing.T) { floatHistograms[i].FH.CounterResetHint = histogram.GaugeType } - gaugeFloatHistSamples, _ := enc.FloatHistogramSamples(floatHistograms, nil) - customBucketGaugeFloatHistSamples := enc.CustomBucketFloatHistogramSamples(floatHistograms, nil) + gaugeFloatHistSamples := enc.FloatHistogramSamples(floatHistograms, nil) decGaugeFloatHistograms, err := dec.FloatHistogramSamples(gaugeFloatHistSamples, nil) require.NoError(t, err) - decCustomBucketGaugeFloatHistograms, err := dec.FloatHistogramSamples(customBucketGaugeFloatHistSamples, nil) - require.NoError(t, err) - decFloatHistograms = append(decGaugeFloatHistograms, decCustomBucketGaugeFloatHistograms...) - require.Equal(t, floatHistograms, decFloatHistograms) + require.Equal(t, floatHistograms, decGaugeFloatHistograms) } // TestRecord_Corrupted ensures that corrupted records return the correct error. @@ -318,14 +303,10 @@ func TestRecord_Corrupted(t *testing.T) { }, } - corruptedHists, _ := enc.HistogramSamples(histograms, nil) + corruptedHists := enc.HistogramSamples(histograms, nil) corruptedHists = corruptedHists[:8] - corruptedCustomBucketHists := enc.CustomBucketHistogramSamples(histograms, nil) - corruptedCustomBucketHists = corruptedCustomBucketHists[:8] _, err := dec.HistogramSamples(corruptedHists, nil) require.ErrorIs(t, err, encoding.ErrInvalidSize) - _, err = dec.HistogramSamples(corruptedCustomBucketHists, nil) - require.ErrorIs(t, err, encoding.ErrInvalidSize) }) } @@ -383,12 +364,9 @@ func TestRecord_Type(t *testing.T) { }, }, } - hists, _ := enc.HistogramSamples(histograms, nil) + hists := enc.HistogramSamples(histograms, nil) recordType = dec.Type(hists) require.Equal(t, HistogramSamples, recordType) - customBucketHists := enc.CustomBucketHistogramSamples(histograms, nil) - recordType = dec.Type(customBucketHists) - require.Equal(t, CustomBucketHistogramSamples, recordType) recordType = dec.Type(nil) require.Equal(t, Unknown, recordType) diff --git a/tsdb/tsdbutil/histogram.go b/tsdb/tsdbutil/histogram.go index 4b6cebd579..9230b68376 100644 --- a/tsdb/tsdbutil/histogram.go +++ b/tsdb/tsdbutil/histogram.go @@ -64,7 +64,6 @@ func GenerateTestCustomBucketsHistograms(n int) (r []*histogram.Histogram) { h.CounterResetHint = histogram.NotCounterReset } r = append(r, h) - } return r } diff --git a/tsdb/wlog/checkpoint.go b/tsdb/wlog/checkpoint.go index 5bb79595b9..ffb96dbe22 100644 --- a/tsdb/wlog/checkpoint.go +++ b/tsdb/wlog/checkpoint.go @@ -208,7 +208,7 @@ func Checkpoint(logger *slog.Logger, w *WL, from, to int, keep func(id chunks.He stats.TotalSamples += len(samples) stats.DroppedSamples += len(samples) - len(repl) - case record.HistogramSamples: + case record.HistogramSamples, record.HistogramSamplesLegacy: histogramSamples, err = dec.HistogramSamples(rec, histogramSamples) if err != nil { return nil, fmt.Errorf("decode histogram samples: %w", err) @@ -221,28 +221,11 @@ func Checkpoint(logger *slog.Logger, w *WL, from, to int, keep func(id chunks.He } } if len(repl) > 0 { - buf, _ = enc.HistogramSamples(repl, buf) + buf = enc.HistogramSamples(repl, buf) } stats.TotalSamples += len(histogramSamples) stats.DroppedSamples += len(histogramSamples) - len(repl) - case record.CustomBucketHistogramSamples: - histogramSamples, err = dec.HistogramSamples(rec, histogramSamples) - if err != nil { - return nil, fmt.Errorf("decode histogram samples: %w", err) - } - // Drop irrelevant histogramSamples in place. - repl := histogramSamples[:0] - for _, h := range histogramSamples { - if h.T >= mint { - repl = append(repl, h) - } - } - if len(repl) > 0 { - buf = enc.CustomBucketHistogramSamples(repl, buf) - } - stats.TotalSamples += len(histogramSamples) - stats.DroppedSamples += len(histogramSamples) - len(repl) - case record.FloatHistogramSamples: + case record.FloatHistogramSamples, record.FloatHistogramSamplesLegacy: floatHistogramSamples, err = dec.FloatHistogramSamples(rec, floatHistogramSamples) if err != nil { return nil, fmt.Errorf("decode float histogram samples: %w", err) @@ -255,24 +238,7 @@ func Checkpoint(logger *slog.Logger, w *WL, from, to int, keep func(id chunks.He } } if len(repl) > 0 { - buf, _ = enc.FloatHistogramSamples(repl, buf) - } - stats.TotalSamples += len(floatHistogramSamples) - stats.DroppedSamples += len(floatHistogramSamples) - len(repl) - case record.CustomBucketFloatHistogramSamples: - floatHistogramSamples, err = dec.FloatHistogramSamples(rec, floatHistogramSamples) - if err != nil { - return nil, fmt.Errorf("decode float histogram samples: %w", err) - } - // Drop irrelevant floatHistogramSamples in place. - repl := floatHistogramSamples[:0] - for _, fh := range floatHistogramSamples { - if fh.T >= mint { - repl = append(repl, fh) - } - } - if len(repl) > 0 { - buf = enc.CustomBucketFloatHistogramSamples(repl, buf) + buf = enc.FloatHistogramSamples(repl, buf) } stats.TotalSamples += len(floatHistogramSamples) stats.DroppedSamples += len(floatHistogramSamples) - len(repl) diff --git a/tsdb/wlog/checkpoint_test.go b/tsdb/wlog/checkpoint_test.go index f947f28095..b2c603f134 100644 --- a/tsdb/wlog/checkpoint_test.go +++ b/tsdb/wlog/checkpoint_test.go @@ -236,7 +236,7 @@ func TestCheckpoint(t *testing.T) { require.NoError(t, w.Log(b)) samplesInWAL += 4 h := makeHistogram(i) - b, _ = enc.HistogramSamples([]record.RefHistogramSample{ + b = enc.HistogramSamples([]record.RefHistogramSample{ {Ref: 0, T: last, H: h}, {Ref: 1, T: last + 10000, H: h}, {Ref: 2, T: last + 20000, H: h}, @@ -245,7 +245,7 @@ func TestCheckpoint(t *testing.T) { require.NoError(t, w.Log(b)) histogramsInWAL += 4 cbh := makeCustomBucketHistogram(i) - b = enc.CustomBucketHistogramSamples([]record.RefHistogramSample{ + b = enc.HistogramSamples([]record.RefHistogramSample{ {Ref: 0, T: last, H: cbh}, {Ref: 1, T: last + 10000, H: cbh}, {Ref: 2, T: last + 20000, H: cbh}, @@ -254,7 +254,7 @@ func TestCheckpoint(t *testing.T) { require.NoError(t, w.Log(b)) histogramsInWAL += 4 fh := makeFloatHistogram(i) - b, _ = enc.FloatHistogramSamples([]record.RefFloatHistogramSample{ + b = enc.FloatHistogramSamples([]record.RefFloatHistogramSample{ {Ref: 0, T: last, FH: fh}, {Ref: 1, T: last + 10000, FH: fh}, {Ref: 2, T: last + 20000, FH: fh}, @@ -263,7 +263,7 @@ func TestCheckpoint(t *testing.T) { require.NoError(t, w.Log(b)) floatHistogramsInWAL += 4 cbfh := makeCustomBucketFloatHistogram(i) - b = enc.CustomBucketFloatHistogramSamples([]record.RefFloatHistogramSample{ + b = enc.FloatHistogramSamples([]record.RefFloatHistogramSample{ {Ref: 0, T: last, FH: cbfh}, {Ref: 1, T: last + 10000, FH: cbfh}, {Ref: 2, T: last + 20000, FH: cbfh}, @@ -330,14 +330,14 @@ func TestCheckpoint(t *testing.T) { require.GreaterOrEqual(t, s.T, last/2, "sample with wrong timestamp") } samplesInCheckpoint += len(samples) - case record.HistogramSamples, record.CustomBucketHistogramSamples: + case record.HistogramSamples, record.HistogramSamplesLegacy: histograms, err := dec.HistogramSamples(rec, nil) require.NoError(t, err) for _, h := range histograms { require.GreaterOrEqual(t, h.T, last/2, "histogram with wrong timestamp") } histogramsInCheckpoint += len(histograms) - case record.FloatHistogramSamples, record.CustomBucketFloatHistogramSamples: + case record.FloatHistogramSamples, record.FloatHistogramSamplesLegacy: floatHistograms, err := dec.FloatHistogramSamples(rec, nil) require.NoError(t, err) for _, h := range floatHistograms { diff --git a/tsdb/wlog/watcher.go b/tsdb/wlog/watcher.go index 169bd296fe..07f881eeaf 100644 --- a/tsdb/wlog/watcher.go +++ b/tsdb/wlog/watcher.go @@ -546,7 +546,7 @@ func (w *Watcher) readSegment(r *LiveReader, segmentNum int, tail bool) error { } w.writer.AppendExemplars(exemplars) - case record.HistogramSamples, record.CustomBucketHistogramSamples: + case record.HistogramSamples, record.HistogramSamplesLegacy: // Skip if experimental "histograms over remote write" is not enabled. if !w.sendHistograms { break @@ -574,7 +574,7 @@ func (w *Watcher) readSegment(r *LiveReader, segmentNum int, tail bool) error { histogramsToSend = histogramsToSend[:0] } - case record.FloatHistogramSamples, record.CustomBucketFloatHistogramSamples: + case record.FloatHistogramSamples, record.FloatHistogramSamplesLegacy: // Skip if experimental "histograms over remote write" is not enabled. if !w.sendHistograms { break diff --git a/tsdb/wlog/watcher_test.go b/tsdb/wlog/watcher_test.go index 5ff70bb215..21490154d9 100644 --- a/tsdb/wlog/watcher_test.go +++ b/tsdb/wlog/watcher_test.go @@ -209,7 +209,7 @@ func TestTailSamples(t *testing.T) { NegativeBuckets: []int64{int64(-i) - 1}, } - histograms, _ := enc.HistogramSamples([]record.RefHistogramSample{{ + histograms := enc.HistogramSamples([]record.RefHistogramSample{{ Ref: chunks.HeadSeriesRef(inner), T: now.UnixNano() + 1, H: hist, @@ -226,21 +226,21 @@ func TestTailSamples(t *testing.T) { CustomValues: []float64{float64(i) + 2}, } - customBucketHistograms := enc.CustomBucketHistogramSamples([]record.RefHistogramSample{{ + customBucketHistograms := enc.HistogramSamples([]record.RefHistogramSample{{ Ref: chunks.HeadSeriesRef(inner), T: now.UnixNano() + 1, H: customBucketHist, }}, nil) require.NoError(t, w.Log(customBucketHistograms)) - floatHistograms, _ := enc.FloatHistogramSamples([]record.RefFloatHistogramSample{{ + floatHistograms := enc.FloatHistogramSamples([]record.RefFloatHistogramSample{{ Ref: chunks.HeadSeriesRef(inner), T: now.UnixNano() + 1, FH: hist.ToFloat(nil), }}, nil) require.NoError(t, w.Log(floatHistograms)) - customBucketFloatHistograms := enc.CustomBucketFloatHistogramSamples([]record.RefFloatHistogramSample{{ + customBucketFloatHistograms := enc.FloatHistogramSamples([]record.RefFloatHistogramSample{{ Ref: chunks.HeadSeriesRef(inner), T: now.UnixNano() + 1, FH: customBucketHist.ToFloat(nil),