From 37df50adb9365b415e224128e02b2f5f606a71c2 Mon Sep 17 00:00:00 2001 From: Carrie Edwards Date: Wed, 13 Nov 2024 14:20:11 -0800 Subject: [PATCH] Attempt for record type --- tsdb/agent/db.go | 41 +++-- tsdb/agent/db_test.go | 7 +- tsdb/docs/format/wal.md | 26 +++ tsdb/encoding/encoding.go | 8 - tsdb/head_append.go | 46 ++++- tsdb/head_test.go | 313 +++++++++++++++++++++++++++++++++++ tsdb/head_wal.go | 7 +- tsdb/record/record.go | 112 +++++++++---- tsdb/record/record_test.go | 71 +++++++- tsdb/testutil.go | 34 +++- tsdb/tsdbutil/histogram.go | 23 +++ tsdb/wlog/checkpoint.go | 40 ++++- tsdb/wlog/checkpoint_test.go | 10 +- tsdb/wlog/watcher_test.go | 6 +- 14 files changed, 660 insertions(+), 84 deletions(-) diff --git a/tsdb/agent/db.go b/tsdb/agent/db.go index 3863e6cd99..5067edc3ae 100644 --- a/tsdb/agent/db.go +++ b/tsdb/agent/db.go @@ -463,7 +463,7 @@ func (db *DB) loadWAL(r *wlog.Reader, multiRef map[chunks.HeadSeriesRef]chunks.H return } decoded <- samples - case record.HistogramSamples: + case record.HistogramSamples, record.CustomBucketHistogramSamples: histograms := histogramsPool.Get()[:0] histograms, err = dec.HistogramSamples(rec, histograms) if err != nil { @@ -475,7 +475,7 @@ func (db *DB) loadWAL(r *wlog.Reader, multiRef map[chunks.HeadSeriesRef]chunks.H return } decoded <- histograms - case record.FloatHistogramSamples: + case record.FloatHistogramSamples, record.CustomBucketFloatHistogramSamples: floatHistograms := floatHistogramsPool.Get()[:0] floatHistograms, err = dec.FloatHistogramSamples(rec, floatHistograms) if err != nil { @@ -1154,19 +1154,40 @@ func (a *appender) log() error { } if len(a.pendingHistograms) > 0 { - buf = encoder.HistogramSamples(a.pendingHistograms, buf) - if err := a.wal.Log(buf); err != nil { - return err + buf1, buf2 := encoder.HistogramSamples(a.pendingHistograms, buf) + //buf = append(buf1, buf2...) 
+ //if err := a.wal.Log(buf); err != nil { + // return err + //} + if len(buf1) > 0 { + buf = buf1[:0] + if err := a.wal.Log(buf1); err != nil { + return err + } + } + if len(buf2) > 0 { + buf = buf2[:0] + if err := a.wal.Log(buf2); err != nil { + return err + } } - buf = buf[:0] } if len(a.pendingFloatHistograms) > 0 { - buf = encoder.FloatHistogramSamples(a.pendingFloatHistograms, buf) - if err := a.wal.Log(buf); err != nil { - return err + buf1, buf2 := encoder.FloatHistogramSamples(a.pendingFloatHistograms, buf) + if len(buf1) > 0 { + buf = buf1[:0] + if err := a.wal.Log(buf1); err != nil { + return err + } } - buf = buf[:0] + if len(buf2) > 0 { + buf = buf2[:0] + if err := a.wal.Log(buf2); err != nil { + return err + } + } + //buf = buf[:0] } if len(a.pendingExamplars) > 0 { diff --git a/tsdb/agent/db_test.go b/tsdb/agent/db_test.go index b28c29095c..5332c61cdb 100644 --- a/tsdb/agent/db_test.go +++ b/tsdb/agent/db_test.go @@ -193,7 +193,8 @@ func TestCommit(t *testing.T) { ) for r.Next() { rec := r.Record() - switch dec.Type(rec) { + recType := dec.Type(rec) + switch recType { case record.Series: var series []record.RefSeries series, err = dec.Series(rec, series) @@ -206,13 +207,13 @@ func TestCommit(t *testing.T) { require.NoError(t, err) walSamplesCount += len(samples) - case record.HistogramSamples: + case record.HistogramSamples, record.CustomBucketHistogramSamples: var histograms []record.RefHistogramSample histograms, err = dec.HistogramSamples(rec, histograms) require.NoError(t, err) walHistogramCount += len(histograms) - case record.FloatHistogramSamples: + case record.FloatHistogramSamples, record.CustomBucketFloatHistogramSamples: var floatHistograms []record.RefFloatHistogramSample floatHistograms, err = dec.FloatHistogramSamples(rec, floatHistograms) require.NoError(t, err) diff --git a/tsdb/docs/format/wal.md b/tsdb/docs/format/wal.md index db1ce97a8b..835ede4113 100644 --- a/tsdb/docs/format/wal.md +++ b/tsdb/docs/format/wal.md @@ -79,6 +79,32 
@@ The first sample record begins at the second row. └──────────────────────────────────────────────────────────────────┘ ``` +### Native histogram records + +Native histogram records are encoded as + +``` +┌──────────────────────────────────────────────────────────────────┐ +│ type = 7 <1b> │ +├──────────────────────────────────────────────────────────────────┤ +│ ┌────────────────────┬───────────────────────────┐ │ +│ │ id <8b> │ timestamp <8b> │ │ +│ └────────────────────┴───────────────────────────┘ │ +│ ┌────────────────────┬───────────────────────────┬ │ +│ │ id_delta │ timestamp_delta │ │ +│ ├────────────────────┴───────────────────────────┴─────────────┤ │ +│ │ n = len(labels) │ │ +│ ├──────────────────────┬───────────────────────────────────────┤ │ +│ │ len(str_1) │ str_1 │ │ +│ ├──────────────────────┴───────────────────────────────────────┤ │ +│ │ ... │ │ +│ ├───────────────────────┬──────────────────────────────────────┤ │ +│ │ len(str_2n) │ str_2n │ │ │ +│ └───────────────────────┴────────────────┴─────────────────────┘ │ +│ . . . │ +└──────────────────────────────────────────────────────────────────┘ +``` + ### Tombstone records Tombstone records encode tombstones as a list of triples `(series_id, min_time, max_time)` diff --git a/tsdb/encoding/encoding.go b/tsdb/encoding/encoding.go index c339a9a5bb..cc7d0990f6 100644 --- a/tsdb/encoding/encoding.go +++ b/tsdb/encoding/encoding.go @@ -104,14 +104,6 @@ func (e *Encbuf) PutHashSum(h hash.Hash) { e.B = h.Sum(e.B) } -// IsWholeWhenMultiplied checks to see if the number when multiplied by 1000 can -// be converted into an integer without losing precision. -func IsWholeWhenMultiplied(in float64) bool { - i := uint(math.Round(in * 1000)) - out := float64(i) / 1000 - return in == out -} - // Decbuf provides safe methods to extract data from a byte slice. It does all // necessary bounds checking and advancing of the byte slice. // Several datums can be extracted without checking for errors. 
However, before using diff --git a/tsdb/head_append.go b/tsdb/head_append.go index 7dacb9037b..3701a57135 100644 --- a/tsdb/head_append.go +++ b/tsdb/head_append.go @@ -942,17 +942,47 @@ func (a *headAppender) log() error { } } if len(a.histograms) > 0 { - rec = enc.HistogramSamples(a.histograms, buf) - buf = rec[:0] - if err := a.head.wal.Log(rec); err != nil { - return fmt.Errorf("log histograms: %w", err) + rec1, rec2 := enc.HistogramSamples(a.histograms, buf) + //rec = append(rec1, rec2...) + // + //buf = rec[:0] + // + //if err := a.head.wal.Log(rec); err != nil { + // return fmt.Errorf("log samples: %w", err) + //} + if len(rec1) != 0 { + buf = rec1[:0] + if err := a.head.wal.Log(rec1); err != nil { + return fmt.Errorf("log histograms: %w", err) + } + } + if len(rec2) != 0 { + buf = rec2[:0] + if err := a.head.wal.Log(rec2); err != nil { + return fmt.Errorf("log custom bucket histograms: %w", err) + } } } if len(a.floatHistograms) > 0 { - rec = enc.FloatHistogramSamples(a.floatHistograms, buf) - buf = rec[:0] - if err := a.head.wal.Log(rec); err != nil { - return fmt.Errorf("log float histograms: %w", err) + rec1, rec2 := enc.FloatHistogramSamples(a.floatHistograms, buf) + //rec = append(rec1, rec2...) 
+ // + //buf = rec[:0] + // + //if err := a.head.wal.Log(rec); err != nil { + // return fmt.Errorf("log samples: %w", err) + //} + if len(rec1) != 0 { + buf = rec1[:0] + if err := a.head.wal.Log(rec1); err != nil { + return fmt.Errorf("log float histograms: %w", err) + } + } + if len(rec2) != 0 { + buf = rec2[:0] + if err := a.head.wal.Log(rec2); err != nil { + return fmt.Errorf("log custom bucket float histograms: %w", err) + } } } // Exemplars should be logged after samples (float/native histogram/etc), diff --git a/tsdb/head_test.go b/tsdb/head_test.go index 2ca3aeffc7..527476e113 100644 --- a/tsdb/head_test.go +++ b/tsdb/head_test.go @@ -740,6 +740,89 @@ func TestHead_ReadWAL(t *testing.T) { } } +func TestHead_ReadWAL2(t *testing.T) { + for _, compress := range []wlog.CompressionType{wlog.CompressionNone, wlog.CompressionSnappy, wlog.CompressionZstd} { + t.Run(fmt.Sprintf("compress=%s", compress), func(t *testing.T) { + entries := []interface{}{ + []record.RefSeries{ + {Ref: 10, Labels: labels.FromStrings("a", "1")}, + {Ref: 11, Labels: labels.FromStrings("a", "2")}, + {Ref: 100, Labels: labels.FromStrings("a", "3")}, + }, + []record.RefHistogramSample{ + {Ref: 0, T: 99, H: tsdbutil.GenerateTestHistogram(1)}, + {Ref: 10, T: 100, H: tsdbutil.GenerateTestCustomBucketsHistogram(2)}, + {Ref: 100, T: 100, H: tsdbutil.GenerateTestHistogram(3)}, + }, + []record.RefSeries{ + {Ref: 50, Labels: labels.FromStrings("a", "4")}, + // This series has two refs pointing to it. 
+ {Ref: 101, Labels: labels.FromStrings("a", "3")}, + }, + []record.RefHistogramSample{ + {Ref: 10, T: 101, H: tsdbutil.GenerateTestHistogram(5)}, + {Ref: 50, T: 101, H: tsdbutil.GenerateTestHistogram(6)}, + {Ref: 101, T: 101, H: tsdbutil.GenerateTestCustomBucketsHistogram(7)}, + }, + []tombstones.Stone{ + {Ref: 0, Intervals: []tombstones.Interval{{Mint: 99, Maxt: 101}}}, + }, + []record.RefExemplar{ + {Ref: 10, T: 100, V: 1, Labels: labels.FromStrings("trace_id", "asdf")}, + }, + } + + head, w := newTestHead(t, 1000, compress, false) + defer func() { + require.NoError(t, head.Close()) + }() + + populateTestWL(t, w, entries) + + require.NoError(t, head.Init(math.MinInt64)) + require.Equal(t, uint64(101), head.lastSeriesID.Load()) + + s10 := head.series.getByID(10) + s11 := head.series.getByID(11) + s50 := head.series.getByID(50) + s100 := head.series.getByID(100) + + testutil.RequireEqual(t, labels.FromStrings("a", "1"), s10.lset) + require.Nil(t, s11) // Series without samples should be garbage collected at head.Init(). 
+ testutil.RequireEqual(t, labels.FromStrings("a", "4"), s50.lset) + testutil.RequireEqual(t, labels.FromStrings("a", "3"), s100.lset) + + expandChunk := func(c chunkenc.Iterator) (x []sample) { + for c.Next() == chunkenc.ValHistogram { + t, v := c.AtHistogram(nil) + //t, v := c.At() + x = append(x, sample{t: t, h: v}) + } + require.NoError(t, c.Err()) + return x + } + + c, _, _, err := s10.chunk(0, head.chunkDiskMapper, &head.memChunkPool) + require.NoError(t, err) + require.Equal(t, []sample{{100, 0, tsdbutil.GenerateTestCustomBucketsHistogram(2), nil}, {101, 0, tsdbutil.GenerateTestCustomBucketsHistogram(5), nil}}, expandChunk(c.chunk.Iterator(nil))) + c, _, _, err = s50.chunk(0, head.chunkDiskMapper, &head.memChunkPool) + require.NoError(t, err) + require.Equal(t, []sample{{101, 0, tsdbutil.GenerateTestHistogram(6), nil}}, expandChunk(c.chunk.Iterator(nil))) + // The samples before the new series record should be discarded since a duplicate record + // is only possible when old samples were compacted. 
+ c, _, _, err = s100.chunk(0, head.chunkDiskMapper, &head.memChunkPool) + require.NoError(t, err) + require.Equal(t, []sample{{101, 0, tsdbutil.GenerateTestCustomBucketsHistogram(7), nil}}, expandChunk(c.chunk.Iterator(nil))) + + q, err := head.ExemplarQuerier(context.Background()) + require.NoError(t, err) + e, err := q.Select(0, 1000, []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "a", "1")}) + require.NoError(t, err) + require.True(t, exemplar.Exemplar{Ts: 100, Value: 1, Labels: labels.FromStrings("trace_id", "asdf")}.Equals(e[0].Exemplars[0])) + }) + } +} + func TestHead_WALMultiRef(t *testing.T) { head, w := newTestHead(t, 1000, wlog.CompressionNone, false) @@ -3953,6 +4036,194 @@ func TestHistogramInWALAndMmapChunk(t *testing.T) { testQuery() } +func TestHistogramInWALAndMmapChunk2(t *testing.T) { + head, _ := newTestHead(t, 3000, wlog.CompressionNone, false) + t.Cleanup(func() { + require.NoError(t, head.Close()) + }) + require.NoError(t, head.Init(0)) + + // Series with only histograms. 
+ s1 := labels.FromStrings("a", "b1") + k1 := s1.String() + numHistograms := 300 + exp := map[string][]chunks.Sample{} + ts := int64(0) + var app storage.Appender + for _, custom := range []bool{true, false} { + app = head.Appender(context.Background()) + var hists []*histogram.Histogram + if custom { + hists = tsdbutil.GenerateTestCustomBucketsHistograms(numHistograms) + } else { + hists = tsdbutil.GenerateTestHistograms(numHistograms) + } + for _, h := range hists { + if !custom { + h.NegativeSpans = h.PositiveSpans + h.NegativeBuckets = h.PositiveBuckets + } + _, err := app.AppendHistogram(0, s1, ts, h, nil) + require.NoError(t, err) + exp[k1] = append(exp[k1], sample{t: ts, h: h.Copy()}) + ts++ + if ts%5 == 0 { + require.NoError(t, app.Commit()) + app = head.Appender(context.Background()) + } + } + require.NoError(t, app.Commit()) + } + for _, custom := range []bool{true, false} { + app = head.Appender(context.Background()) + var hists []*histogram.FloatHistogram + if custom { + hists = tsdbutil.GenerateTestCustomBucketsFloatHistograms(numHistograms) + } else { + hists = tsdbutil.GenerateTestFloatHistograms(numHistograms) + } + for _, h := range hists { + if !custom { + h.NegativeSpans = h.PositiveSpans + h.NegativeBuckets = h.PositiveBuckets + } + _, err := app.AppendHistogram(0, s1, ts, nil, h) + require.NoError(t, err) + exp[k1] = append(exp[k1], sample{t: ts, fh: h.Copy()}) + ts++ + if ts%5 == 0 { + require.NoError(t, app.Commit()) + app = head.Appender(context.Background()) + } + } + require.NoError(t, app.Commit()) + head.mmapHeadChunks() + } + + // There should be 19 mmap chunks in s1. 
+ ms := head.series.getByHash(s1.Hash(), s1) + require.Len(t, ms.mmappedChunks, 19) + expMmapChunks := make([]*mmappedChunk, 0, 20) + for _, mmap := range ms.mmappedChunks { + require.Positive(t, mmap.numSamples) + cpy := *mmap + expMmapChunks = append(expMmapChunks, &cpy) + } + expHeadChunkSamples := ms.headChunks.chunk.NumSamples() + require.Positive(t, expHeadChunkSamples) + + // Series with mix of histograms and float. + s2 := labels.FromStrings("a", "b2") + k2 := s2.String() + ts = 0 + for _, custom := range []bool{true, false} { + app = head.Appender(context.Background()) + var hists []*histogram.Histogram + if custom { + hists = tsdbutil.GenerateTestCustomBucketsHistograms(100) + } else { + hists = tsdbutil.GenerateTestHistograms(100) + } + for _, h := range hists { + ts++ + if !custom { + h.NegativeSpans = h.PositiveSpans + h.NegativeBuckets = h.PositiveBuckets + } + _, err := app.AppendHistogram(0, s2, ts, h, nil) + require.NoError(t, err) + eh := h.Copy() + if ts > 30 && (ts-10)%20 == 1 { + // Need "unknown" hint after float sample. + eh.CounterResetHint = histogram.UnknownCounterReset + } + exp[k2] = append(exp[k2], sample{t: ts, h: eh}) + if ts%20 == 0 { + require.NoError(t, app.Commit()) + app = head.Appender(context.Background()) + // Add some float. 
+ for i := 0; i < 10; i++ { + ts++ + _, err := app.Append(0, s2, ts, float64(ts)) + require.NoError(t, err) + exp[k2] = append(exp[k2], sample{t: ts, f: float64(ts)}) + } + require.NoError(t, app.Commit()) + app = head.Appender(context.Background()) + } + } + require.NoError(t, app.Commit()) + } + for _, custom := range []bool{true, false} { + app = head.Appender(context.Background()) + var hists []*histogram.FloatHistogram + if custom { + hists = tsdbutil.GenerateTestCustomBucketsFloatHistograms(100) + } else { + hists = tsdbutil.GenerateTestFloatHistograms(100) + } + for _, h := range hists { + ts++ + if !custom { + h.NegativeSpans = h.PositiveSpans + h.NegativeBuckets = h.PositiveBuckets + } + _, err := app.AppendHistogram(0, s2, ts, nil, h) + require.NoError(t, err) + eh := h.Copy() + if ts > 30 && (ts-10)%20 == 1 { + // Need "unknown" hint after float sample. + eh.CounterResetHint = histogram.UnknownCounterReset + } + exp[k2] = append(exp[k2], sample{t: ts, fh: eh}) + if ts%20 == 0 { + require.NoError(t, app.Commit()) + app = head.Appender(context.Background()) + // Add some float. + for i := 0; i < 10; i++ { + ts++ + _, err := app.Append(0, s2, ts, float64(ts)) + require.NoError(t, err) + exp[k2] = append(exp[k2], sample{t: ts, f: float64(ts)}) + } + require.NoError(t, app.Commit()) + app = head.Appender(context.Background()) + } + } + require.NoError(t, app.Commit()) + } + + // Restart head. + require.NoError(t, head.Close()) + startHead := func() { + w, err := wlog.NewSize(nil, nil, head.wal.Dir(), 32768, wlog.CompressionNone) + require.NoError(t, err) + head, err = NewHead(nil, nil, w, nil, head.opts, nil) + require.NoError(t, err) + require.NoError(t, head.Init(0)) + } + startHead() + + // Checking contents of s1. 
+ ms = head.series.getByHash(s1.Hash(), s1) + require.Equal(t, expMmapChunks, ms.mmappedChunks) + require.Equal(t, expHeadChunkSamples, ms.headChunks.chunk.NumSamples()) + + testQuery := func() { + q, err := NewBlockQuerier(head, head.MinTime(), head.MaxTime()) + require.NoError(t, err) + act := query(t, q, labels.MustNewMatcher(labels.MatchRegexp, "a", "b.*")) + compareSeries(t, exp, act) + } + testQuery() + + // Restart with no mmap chunks to test WAL replay. + require.NoError(t, head.Close()) + require.NoError(t, os.RemoveAll(mmappedChunksDir(head.opts.ChunkDirRoot))) + startHead() + testQuery() +} + func TestChunkSnapshot(t *testing.T) { head, _ := newTestHead(t, 120*4, wlog.CompressionNone, false) defer func() { @@ -5089,6 +5360,48 @@ func TestChunkSnapshotTakenAfterIncompleteSnapshot(t *testing.T) { require.Positive(t, offset) } +func TestHistogramWALANDWBLReplay(t *testing.T) { + dir := t.TempDir() + wal, err := wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, wlog.CompressionSnappy) + require.NoError(t, err) + oooWlog, err := wlog.NewSize(nil, nil, filepath.Join(dir, wlog.WblDirName), 32768, wlog.CompressionSnappy) + require.NoError(t, err) + + opts := DefaultHeadOptions() + opts.ChunkRange = 1000 + opts.ChunkDirRoot = dir + opts.OutOfOrderTimeWindow.Store(30 * time.Minute.Milliseconds()) + opts.EnableNativeHistograms.Store(true) + opts.EnableOOONativeHistograms.Store(true) + + h, err := NewHead(nil, nil, wal, oooWlog, opts, nil) + require.NoError(t, err) + require.NoError(t, h.Init(0)) + + var expOOOSamples []chunks.Sample + l := labels.FromStrings("foo", "bar") + appendSample := func(mins int64, val float64, isOOO bool, isCustomBucketHistogram bool) { + app := h.Appender(context.Background()) + var s sample + if isCustomBucketHistogram { + s = sample{t: mins * time.Minute.Milliseconds(), h: tsdbutil.GenerateTestCustomBucketsHistogram(int(val))} + } else { + s = sample{t: mins * time.Minute.Milliseconds(), h: 
tsdbutil.GenerateTestHistogram(int(val))} + } + _, err := app.AppendHistogram(0, l, mins*time.Minute.Milliseconds(), s.h, nil) + require.NoError(t, err) + require.NoError(t, app.Commit()) + + if isOOO { + expOOOSamples = append(expOOOSamples, s) + } + } + + // In-order histogram samples. + appendSample(60, 60, false, false) + +} + // TestWBLReplay checks the replay at a low level. func TestWBLReplay(t *testing.T) { for name, scenario := range sampleTypeScenarios { diff --git a/tsdb/head_wal.go b/tsdb/head_wal.go index dd4f4f8b17..9d1e24b706 100644 --- a/tsdb/head_wal.go +++ b/tsdb/head_wal.go @@ -139,7 +139,8 @@ func (h *Head) loadWAL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch dec := record.NewDecoder(syms) for r.Next() { rec := r.Record() - switch dec.Type(rec) { + recType := dec.Type(rec) + switch recType { case record.Series: series := seriesPool.Get()[:0] series, err = dec.Series(rec, series) @@ -188,7 +189,7 @@ func (h *Head) loadWAL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch return } decoded <- exemplars - case record.HistogramSamples: + case record.HistogramSamples, record.CustomBucketHistogramSamples: hists := histogramsPool.Get()[:0] hists, err = dec.HistogramSamples(rec, hists) if err != nil { @@ -200,7 +201,7 @@ func (h *Head) loadWAL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch return } decoded <- hists - case record.FloatHistogramSamples: + case record.FloatHistogramSamples, record.CustomBucketFloatHistogramSamples: hists := floatHistogramsPool.Get()[:0] hists, err = dec.FloatHistogramSamples(rec, hists) if err != nil { diff --git a/tsdb/record/record.go b/tsdb/record/record.go index 83adecdbb4..15f5053d53 100644 --- a/tsdb/record/record.go +++ b/tsdb/record/record.go @@ -52,6 +52,10 @@ const ( HistogramSamples Type = 7 // FloatHistogramSamples is used to match WAL records of type Float Histograms. 
FloatHistogramSamples Type = 8 + // CustomBucketHistogramSamples is used to match WAL records of type Histogram with custom buckets. + CustomBucketHistogramSamples Type = 9 + // CustomBucketFloatHistogramSamples is used to match WAL records of type Float Histogram with custom buckets. + CustomBucketFloatHistogramSamples Type = 10 ) func (rt Type) String() string { @@ -68,6 +72,10 @@ func (rt Type) String() string { return "histogram_samples" case FloatHistogramSamples: return "float_histogram_samples" + case CustomBucketHistogramSamples: + return "custom_bucket_histogram_samples" + case CustomBucketFloatHistogramSamples: + return "custom_bucket_float_histogram_samples" case MmapMarkers: return "mmapmarkers" case Metadata: @@ -185,6 +193,10 @@ type RefFloatHistogramSample struct { FH *histogram.FloatHistogram } +type RefCustomBucketHistogramSample struct { + RefHistogramSample +} + // RefMmapMarker marks that the all the samples of the given series until now have been m-mapped to disk. type RefMmapMarker struct { Ref chunks.HeadSeriesRef @@ -207,7 +219,7 @@ func (d *Decoder) Type(rec []byte) Type { return Unknown } switch t := Type(rec[0]); t { - case Series, Samples, Tombstones, Exemplars, MmapMarkers, Metadata, HistogramSamples, FloatHistogramSamples: + case Series, Samples, Tombstones, Exemplars, MmapMarkers, Metadata, HistogramSamples, FloatHistogramSamples, CustomBucketHistogramSamples, CustomBucketFloatHistogramSamples: return t } return Unknown @@ -428,7 +440,7 @@ func (d *Decoder) MmapMarkers(rec []byte, markers []RefMmapMarker) ([]RefMmapMar func (d *Decoder) HistogramSamples(rec []byte, histograms []RefHistogramSample) ([]RefHistogramSample, error) { dec := encoding.Decbuf{B: rec} t := Type(dec.Byte()) - if t != HistogramSamples { + if t != HistogramSamples && t != CustomBucketHistogramSamples { return nil, errors.New("invalid record type") } if dec.Len() == 0 { @@ -509,12 +521,10 @@ func DecodeHistogram(buf *encoding.Decbuf, h *histogram.Histogram) { if 
histogram.IsCustomBucketsSchema(h.Schema) { l = buf.Uvarint() if l > 0 { - if l > 0 { - h.CustomValues = make([]float64, l) - } - for i := range h.CustomValues { - h.CustomValues[i] = buf.Be64Float64() - } + h.CustomValues = make([]float64, l) + } + for i := range h.CustomValues { + h.CustomValues[i] = buf.Be64Float64() } } } @@ -522,7 +532,7 @@ func DecodeHistogram(buf *encoding.Decbuf, h *histogram.Histogram) { func (d *Decoder) FloatHistogramSamples(rec []byte, histograms []RefFloatHistogramSample) ([]RefFloatHistogramSample, error) { dec := encoding.Decbuf{B: rec} t := Type(dec.Byte()) - if t != FloatHistogramSamples { + if t != FloatHistogramSamples && t != CustomBucketFloatHistogramSamples { return nil, errors.New("invalid record type") } if dec.Len() == 0 { @@ -603,12 +613,10 @@ func DecodeFloatHistogram(buf *encoding.Decbuf, fh *histogram.FloatHistogram) { if histogram.IsCustomBucketsSchema(fh.Schema) { l = buf.Uvarint() if l > 0 { - if l > 0 { - fh.CustomValues = make([]float64, l) - } - for i := range fh.CustomValues { - fh.CustomValues[i] = buf.Be64Float64() - } + fh.CustomValues = make([]float64, l) + } + for i := range fh.CustomValues { + fh.CustomValues[i] = buf.Be64Float64() } } } @@ -740,12 +748,15 @@ func (e *Encoder) MmapMarkers(markers []RefMmapMarker, b []byte) []byte { return buf.Get() } -func (e *Encoder) HistogramSamples(histograms []RefHistogramSample, b []byte) []byte { +func (e *Encoder) HistogramSamples(histograms []RefHistogramSample, b []byte) ([]byte, []byte) { buf := encoding.Encbuf{B: b} buf.PutByte(byte(HistogramSamples)) + customBucketHistBuf := encoding.Encbuf{B: b} + customBucketHistBuf.PutByte(byte(CustomBucketHistogramSamples)) + if len(histograms) == 0 { - return buf.Get() + return buf.Get(), customBucketHistBuf.Get() } // Store base timestamp and base reference number of first histogram. 
@@ -754,14 +765,34 @@ func (e *Encoder) HistogramSamples(histograms []RefHistogramSample, b []byte) [] buf.PutBE64(uint64(first.Ref)) buf.PutBE64int64(first.T) - for _, h := range histograms { - buf.PutVarint64(int64(h.Ref) - int64(first.Ref)) - buf.PutVarint64(h.T - first.T) + customBucketHistBuf.PutBE64(uint64(first.Ref)) + customBucketHistBuf.PutBE64int64(first.T) - EncodeHistogram(&buf, h.H) + histsAdded := 0 + customBucketHistsAdded := 0 + for _, h := range histograms { + if h.H.UsesCustomBuckets() { + customBucketHistBuf.PutVarint64(int64(h.Ref) - int64(first.Ref)) + customBucketHistBuf.PutVarint64(h.T - first.T) + + EncodeHistogram(&customBucketHistBuf, h.H) + customBucketHistsAdded++ + } else { + buf.PutVarint64(int64(h.Ref) - int64(first.Ref)) + buf.PutVarint64(h.T - first.T) + + EncodeHistogram(&buf, h.H) + histsAdded++ + } } - return buf.Get() + if customBucketHistsAdded == 0 { + customBucketHistBuf.Reset() + } else if histsAdded == 0 { + buf.Reset() + } + + return buf.Get(), customBucketHistBuf.Get() } // EncodeHistogram encodes a Histogram into a byte slice. @@ -805,12 +836,15 @@ func EncodeHistogram(buf *encoding.Encbuf, h *histogram.Histogram) { } } -func (e *Encoder) FloatHistogramSamples(histograms []RefFloatHistogramSample, b []byte) []byte { +func (e *Encoder) FloatHistogramSamples(histograms []RefFloatHistogramSample, b []byte) ([]byte, []byte) { buf := encoding.Encbuf{B: b} buf.PutByte(byte(FloatHistogramSamples)) + customBucketHistBuf := encoding.Encbuf{B: b} + customBucketHistBuf.PutByte(byte(CustomBucketFloatHistogramSamples)) + if len(histograms) == 0 { - return buf.Get() + return buf.Get(), customBucketHistBuf.Get() } // Store base timestamp and base reference number of first histogram. 
@@ -819,14 +853,34 @@ func (e *Encoder) FloatHistogramSamples(histograms []RefFloatHistogramSample, b buf.PutBE64(uint64(first.Ref)) buf.PutBE64int64(first.T) - for _, h := range histograms { - buf.PutVarint64(int64(h.Ref) - int64(first.Ref)) - buf.PutVarint64(h.T - first.T) + customBucketHistBuf.PutBE64(uint64(first.Ref)) + customBucketHistBuf.PutBE64int64(first.T) - EncodeFloatHistogram(&buf, h.FH) + histsAdded := 0 + customBucketHistsAdded := 0 + for _, h := range histograms { + if h.FH.UsesCustomBuckets() { + customBucketHistBuf.PutVarint64(int64(h.Ref) - int64(first.Ref)) + customBucketHistBuf.PutVarint64(h.T - first.T) + + EncodeFloatHistogram(&customBucketHistBuf, h.FH) + customBucketHistsAdded++ + } else { + buf.PutVarint64(int64(h.Ref) - int64(first.Ref)) + buf.PutVarint64(h.T - first.T) + + EncodeFloatHistogram(&buf, h.FH) + histsAdded++ + } } - return buf.Get() + if customBucketHistsAdded == 0 { + customBucketHistBuf.Reset() + } else if histsAdded == 0 { + buf.Reset() + } + + return buf.Get(), customBucketHistBuf.Get() } // EncodeFloatHistogram encodes the Float Histogram into a byte slice. 
diff --git a/tsdb/record/record_test.go b/tsdb/record/record_test.go index f3a657aecb..67c7eab970 100644 --- a/tsdb/record/record_test.go +++ b/tsdb/record/record_test.go @@ -148,10 +148,30 @@ func TestRecord_EncodeDecode(t *testing.T) { NegativeBuckets: []int64{1, 2, -1}, }, }, + { + Ref: 67, + T: 5678, + H: &histogram.Histogram{ + Count: 8, + ZeroThreshold: 0.001, + Sum: 35.5, + Schema: -53, + PositiveSpans: []histogram.Span{ + {Offset: 0, Length: 2}, + {Offset: 2, Length: 2}, + }, + PositiveBuckets: []int64{2, -1, 2, 0}, + CustomValues: []float64{0, 2, 4, 6, 8}, + }, + }, } - decHistograms, err := dec.HistogramSamples(enc.HistogramSamples(histograms, nil), nil) + histSamples, customBucketHistSamples := enc.HistogramSamples(histograms, nil) + decHistograms, err := dec.HistogramSamples(histSamples, nil) require.NoError(t, err) + decCustomBucketHistograms, err := dec.HistogramSamples(customBucketHistSamples, nil) + require.NoError(t, err) + decHistograms = append(decHistograms, decCustomBucketHistograms...) require.Equal(t, histograms, decHistograms) floatHistograms := make([]RefFloatHistogramSample, len(histograms)) @@ -162,24 +182,36 @@ func TestRecord_EncodeDecode(t *testing.T) { FH: h.H.ToFloat(nil), } } - decFloatHistograms, err := dec.FloatHistogramSamples(enc.FloatHistogramSamples(floatHistograms, nil), nil) + histSamples, customBucketFloatHistSamples := enc.FloatHistogramSamples(floatHistograms, nil) + decFloatHistograms, err := dec.FloatHistogramSamples(histSamples, nil) require.NoError(t, err) + decCustomBucketFloatHistograms, err := dec.FloatHistogramSamples(customBucketFloatHistSamples, nil) + decFloatHistograms = append(decFloatHistograms, decCustomBucketFloatHistograms...) require.Equal(t, floatHistograms, decFloatHistograms) // Gauge integer histograms. 
for i := range histograms { histograms[i].H.CounterResetHint = histogram.GaugeType } - decHistograms, err = dec.HistogramSamples(enc.HistogramSamples(histograms, nil), nil) + + gaugeHistSamples, customBucketGaugeHistSamples := enc.HistogramSamples(histograms, nil) + decGaugeHistograms, err := dec.HistogramSamples(gaugeHistSamples, nil) require.NoError(t, err) - require.Equal(t, histograms, decHistograms) + decCustomBucketGaugeHistograms, err := dec.HistogramSamples(customBucketGaugeHistSamples, nil) + require.NoError(t, err) + decGaugeHistograms = append(decGaugeHistograms, decCustomBucketGaugeHistograms...) + require.Equal(t, histograms, decGaugeHistograms) // Gauge float histograms. for i := range floatHistograms { floatHistograms[i].FH.CounterResetHint = histogram.GaugeType } - decFloatHistograms, err = dec.FloatHistogramSamples(enc.FloatHistogramSamples(floatHistograms, nil), nil) + + gaugeHistSamples, customBucketGaugeFloatHistSamples := enc.FloatHistogramSamples(floatHistograms, nil) + decGaugeFloatHistograms, err := dec.FloatHistogramSamples(gaugeHistSamples, nil) require.NoError(t, err) + decCustomBucketGaugeFloatHistograms, err := dec.FloatHistogramSamples(customBucketGaugeFloatHistSamples, nil) + decFloatHistograms = append(decGaugeFloatHistograms, decCustomBucketGaugeFloatHistograms...) 
require.Equal(t, floatHistograms, decFloatHistograms) } @@ -265,8 +297,12 @@ func TestRecord_Corrupted(t *testing.T) { }, } - corrupted := enc.HistogramSamples(histograms, nil)[:8] - _, err := dec.HistogramSamples(corrupted, nil) + corruptedHists, corruptedCustomBucketHists := enc.HistogramSamples(histograms, nil) + corruptedHists = corruptedHists[:8] + corruptedCustomBucketHists = corruptedCustomBucketHists[:8] + _, err := dec.HistogramSamples(corruptedHists, nil) + require.ErrorIs(t, err, encoding.ErrInvalidSize) + _, err = dec.HistogramSamples(corruptedCustomBucketHists, nil) require.ErrorIs(t, err, encoding.ErrInvalidSize) }) } @@ -308,9 +344,28 @@ func TestRecord_Type(t *testing.T) { PositiveBuckets: []int64{1, 1, -1, 0}, }, }, + { + Ref: 67, + T: 5678, + H: &histogram.Histogram{ + Count: 8, + ZeroThreshold: 0.001, + Sum: 35.5, + Schema: -53, + PositiveSpans: []histogram.Span{ + {Offset: 0, Length: 2}, + {Offset: 2, Length: 2}, + }, + PositiveBuckets: []int64{2, -1, 2, 0}, + CustomValues: []float64{0, 2, 4, 6, 8}, + }, + }, } - recordType = dec.Type(enc.HistogramSamples(histograms, nil)) + hists, customBucketHists := enc.HistogramSamples(histograms, nil) + recordType = dec.Type(hists) require.Equal(t, HistogramSamples, recordType) + recordType = dec.Type(customBucketHists) + require.Equal(t, CustomBucketHistogramSamples, recordType) recordType = dec.Type(nil) require.Equal(t, Unknown, recordType) diff --git a/tsdb/testutil.go b/tsdb/testutil.go index c39eb133c7..a13d89186e 100644 --- a/tsdb/testutil.go +++ b/tsdb/testutil.go @@ -29,11 +29,13 @@ import ( ) const ( - float = "float" - intHistogram = "integer histogram" - floatHistogram = "float histogram" - gaugeIntHistogram = "gauge int histogram" - gaugeFloatHistogram = "gauge float histogram" + float = "float" + intHistogram = "integer histogram" + floatHistogram = "float histogram" + customBucketIntHistogram = "custom bucket int histogram" + customBucketFloatHistogram = "custom bucket float histogram" + 
gaugeIntHistogram = "gauge int histogram" + gaugeFloatHistogram = "gauge float histogram" ) type testValue struct { @@ -82,6 +84,28 @@ var sampleTypeScenarios = map[string]sampleTypeScenario{ return sample{t: ts, fh: tsdbutil.GenerateTestFloatHistogram(int(value))} }, }, + customBucketIntHistogram: { + sampleType: sampleMetricTypeHistogram, + appendFunc: func(appender storage.Appender, lbls labels.Labels, ts, value int64) (storage.SeriesRef, sample, error) { + s := sample{t: ts, h: tsdbutil.GenerateTestCustomBucketsHistogram(int(value))} + ref, err := appender.AppendHistogram(0, lbls, ts, s.h, nil) + return ref, s, err + }, + sampleFunc: func(ts, value int64) sample { + return sample{t: ts, h: tsdbutil.GenerateTestCustomBucketsHistogram(int(value))} + }, + }, + customBucketFloatHistogram: { + sampleType: sampleMetricTypeHistogram, + appendFunc: func(appender storage.Appender, lbls labels.Labels, ts, value int64) (storage.SeriesRef, sample, error) { + s := sample{t: ts, fh: tsdbutil.GenerateTestCustomBucketsFloatHistogram(int(value))} + ref, err := appender.AppendHistogram(0, lbls, ts, nil, s.fh) + return ref, s, err + }, + sampleFunc: func(ts, value int64) sample { + return sample{t: ts, fh: tsdbutil.GenerateTestCustomBucketsFloatHistogram(int(value))} + }, + }, gaugeIntHistogram: { sampleType: sampleMetricTypeHistogram, appendFunc: func(appender storage.Appender, lbls labels.Labels, ts, value int64) (storage.SeriesRef, sample, error) { diff --git a/tsdb/tsdbutil/histogram.go b/tsdb/tsdbutil/histogram.go index ce934a638d..4b6cebd579 100644 --- a/tsdb/tsdbutil/histogram.go +++ b/tsdb/tsdbutil/histogram.go @@ -57,6 +57,18 @@ func GenerateTestHistogram(i int) *histogram.Histogram { } } +func GenerateTestCustomBucketsHistograms(n int) (r []*histogram.Histogram) { + for i := 0; i < n; i++ { + h := GenerateTestCustomBucketsHistogram(i) + if i > 0 { + h.CounterResetHint = histogram.NotCounterReset + } + r = append(r, h) + + } + return r +} + func 
GenerateTestCustomBucketsHistogram(i int) *histogram.Histogram { return &histogram.Histogram{ Count: 5 + uint64(i*4), @@ -117,6 +129,17 @@ func GenerateTestFloatHistogram(i int) *histogram.FloatHistogram { } } +func GenerateTestCustomBucketsFloatHistograms(n int) (r []*histogram.FloatHistogram) { + for i := 0; i < n; i++ { + h := GenerateTestCustomBucketsFloatHistogram(i) + if i > 0 { + h.CounterResetHint = histogram.NotCounterReset + } + r = append(r, h) + } + return r +} + func GenerateTestCustomBucketsFloatHistogram(i int) *histogram.FloatHistogram { return &histogram.FloatHistogram{ Count: 5 + float64(i*4), diff --git a/tsdb/wlog/checkpoint.go b/tsdb/wlog/checkpoint.go index 58e11c770e..cd82676da9 100644 --- a/tsdb/wlog/checkpoint.go +++ b/tsdb/wlog/checkpoint.go @@ -221,11 +221,27 @@ func Checkpoint(logger *slog.Logger, w *WL, from, to int, keep func(id chunks.He } } if len(repl) > 0 { - buf = enc.HistogramSamples(repl, buf) + buf, _ = enc.HistogramSamples(repl, buf) + } + stats.TotalSamples += len(histogramSamples) + stats.DroppedSamples += len(histogramSamples) - len(repl) + case record.CustomBucketHistogramSamples: + histogramSamples, err = dec.HistogramSamples(rec, histogramSamples) + if err != nil { + return nil, fmt.Errorf("decode histogram samples: %w", err) + } + // Drop irrelevant histogramSamples in place. 
+ repl := histogramSamples[:0] + for _, h := range histogramSamples { + if h.T >= mint { + repl = append(repl, h) + } + } + if len(repl) > 0 { + _, buf = enc.HistogramSamples(repl, buf) } stats.TotalSamples += len(histogramSamples) stats.DroppedSamples += len(histogramSamples) - len(repl) - case record.FloatHistogramSamples: floatHistogramSamples, err = dec.FloatHistogramSamples(rec, floatHistogramSamples) if err != nil { @@ -239,11 +255,27 @@ func Checkpoint(logger *slog.Logger, w *WL, from, to int, keep func(id chunks.He } } if len(repl) > 0 { - buf = enc.FloatHistogramSamples(repl, buf) + buf, _ = enc.FloatHistogramSamples(repl, buf) + } + stats.TotalSamples += len(floatHistogramSamples) + stats.DroppedSamples += len(floatHistogramSamples) - len(repl) + case record.CustomBucketFloatHistogramSamples: + floatHistogramSamples, err = dec.FloatHistogramSamples(rec, floatHistogramSamples) + if err != nil { + return nil, fmt.Errorf("decode float histogram samples: %w", err) + } + // Drop irrelevant floatHistogramSamples in place. 
+ repl := floatHistogramSamples[:0] + for _, fh := range floatHistogramSamples { + if fh.T >= mint { + repl = append(repl, fh) + } + } + if len(repl) > 0 { + _, buf = enc.FloatHistogramSamples(repl, buf) } stats.TotalSamples += len(floatHistogramSamples) stats.DroppedSamples += len(floatHistogramSamples) - len(repl) - case record.Tombstones: tstones, err = dec.Tombstones(rec, tstones) if err != nil { diff --git a/tsdb/wlog/checkpoint_test.go b/tsdb/wlog/checkpoint_test.go index 8ee193f5ac..a5692a9aa4 100644 --- a/tsdb/wlog/checkpoint_test.go +++ b/tsdb/wlog/checkpoint_test.go @@ -208,22 +208,24 @@ func TestCheckpoint(t *testing.T) { require.NoError(t, w.Log(b)) samplesInWAL += 4 h := makeHistogram(i) - b = enc.HistogramSamples([]record.RefHistogramSample{ + b1, b2 := enc.HistogramSamples([]record.RefHistogramSample{ {Ref: 0, T: last, H: h}, {Ref: 1, T: last + 10000, H: h}, {Ref: 2, T: last + 20000, H: h}, {Ref: 3, T: last + 30000, H: h}, }, nil) - require.NoError(t, w.Log(b)) + require.NoError(t, w.Log(b1)) + require.NoError(t, w.Log(b2)) histogramsInWAL += 4 fh := makeFloatHistogram(i) - b = enc.FloatHistogramSamples([]record.RefFloatHistogramSample{ + b1, b2 = enc.FloatHistogramSamples([]record.RefFloatHistogramSample{ {Ref: 0, T: last, FH: fh}, {Ref: 1, T: last + 10000, FH: fh}, {Ref: 2, T: last + 20000, FH: fh}, {Ref: 3, T: last + 30000, FH: fh}, }, nil) - require.NoError(t, w.Log(b)) + require.NoError(t, w.Log(b1)) + require.NoError(t, w.Log(b2)) floatHistogramsInWAL += 4 b = enc.Exemplars([]record.RefExemplar{ diff --git a/tsdb/wlog/watcher_test.go b/tsdb/wlog/watcher_test.go index 398b0f4414..c2499a7cec 100644 --- a/tsdb/wlog/watcher_test.go +++ b/tsdb/wlog/watcher_test.go @@ -209,19 +209,21 @@ func TestTailSamples(t *testing.T) { NegativeBuckets: []int64{int64(-i) - 1}, } - histogram := enc.HistogramSamples([]record.RefHistogramSample{{ + histogram, customBucketHistogram := enc.HistogramSamples([]record.RefHistogramSample{{ Ref: chunks.HeadSeriesRef(inner), 
T: now.UnixNano() + 1, H: hist, }}, nil) require.NoError(t, w.Log(histogram)) + require.NoError(t, w.Log(customBucketHistogram)) - floatHistogram := enc.FloatHistogramSamples([]record.RefFloatHistogramSample{{ + floatHistogram, floatCustomBucketHistogram := enc.FloatHistogramSamples([]record.RefFloatHistogramSample{{ Ref: chunks.HeadSeriesRef(inner), T: now.UnixNano() + 1, FH: hist.ToFloat(nil), }}, nil) require.NoError(t, w.Log(floatHistogram)) + require.NoError(t, w.Log(floatCustomBucketHistogram)) } }