From c8b267efd6159c0f76608fffeeacf4f030fa1053 Mon Sep 17 00:00:00 2001 From: Ganesh Vernekar Date: Tue, 2 Nov 2021 20:31:32 +0530 Subject: [PATCH] Get histograms from TSDB to the rate() function implementation Signed-off-by: Ganesh Vernekar --- promql/engine.go | 63 ++++++++++++++++++++++++--------- promql/engine_test.go | 32 +++++++++++++++++ promql/test_test.go | 10 +++--- promql/value.go | 3 ++ rules/manager.go | 2 +- storage/buffer.go | 77 +++++++++++++++++++++++++++++++---------- storage/series.go | 23 +++++++----- tsdb/compact_test.go | 6 ++-- tsdb/head.go | 23 +++++++++++- tsdb/head_read.go | 41 +++++++++++++++------- tsdb/head_test.go | 31 ++++------------- tsdb/tsdbutil/buffer.go | 5 +++ tsdb/tsdbutil/chunks.go | 4 ++- 13 files changed, 229 insertions(+), 91 deletions(-) diff --git a/promql/engine.go b/promql/engine.go index bd3d836a31..be7913a813 100644 --- a/promql/engine.go +++ b/promql/engine.go @@ -40,6 +40,7 @@ import ( "github.com/prometheus/prometheus/pkg/value" "github.com/prometheus/prometheus/promql/parser" "github.com/prometheus/prometheus/storage" + "github.com/prometheus/prometheus/tsdb/chunkenc" "github.com/prometheus/prometheus/util/stats" ) @@ -1735,29 +1736,57 @@ func (ev *evaluator) matrixIterSlice(it *storage.BufferedSeriesIterator, mint, m } buf := it.Buffer() - for buf.Next() { - t, v := buf.At() - if value.IsStaleNaN(v) { - continue - } - // Values in the buffer are guaranteed to be smaller than maxt. - if t >= mint { - if ev.currentSamples >= ev.maxSamples { - ev.error(ErrTooManySamples(env)) + if it.ChunkEncoding() == chunkenc.EncHistogram { + for buf.Next() { + t, h := buf.AtHistogram() + if value.IsStaleNaN(h.Sum) { + continue + } + // Values in the buffer are guaranteed to be smaller than maxt. + if t >= mint { + if ev.currentSamples >= ev.maxSamples { + ev.error(ErrTooManySamples(env)) + } + ev.currentSamples++ + out = append(out, Point{T: t, H: &h}) + } + } + } else { + for buf.Next() { + t, v := buf.At() + if value.IsStaleNaN(v) { + continue + } + // Values in the buffer are guaranteed to be smaller than maxt. + if t >= mint { + if ev.currentSamples >= ev.maxSamples { + ev.error(ErrTooManySamples(env)) + } + ev.currentSamples++ + out = append(out, Point{T: t, V: v}) } - ev.currentSamples++ - out = append(out, Point{T: t, V: v}) } } // The seeked sample might also be in the range. if ok { - t, v := it.Values() - if t == maxt && !value.IsStaleNaN(v) { - if ev.currentSamples >= ev.maxSamples { - ev.error(ErrTooManySamples(env)) + if it.ChunkEncoding() == chunkenc.EncHistogram { + t, h := it.HistogramValues() + if t == maxt && !value.IsStaleNaN(h.Sum) { + if ev.currentSamples >= ev.maxSamples { + ev.error(ErrTooManySamples(env)) + } + out = append(out, Point{T: t, H: &h}) + ev.currentSamples++ + } + } else { + t, v := it.Values() + if t == maxt && !value.IsStaleNaN(v) { + if ev.currentSamples >= ev.maxSamples { + ev.error(ErrTooManySamples(env)) + } + out = append(out, Point{T: t, V: v}) + ev.currentSamples++ } - out = append(out, Point{T: t, V: v}) - ev.currentSamples++ } } return out diff --git a/promql/engine_test.go b/promql/engine_test.go index c4eb07d523..88eed7f8bb 100644 --- a/promql/engine_test.go +++ b/promql/engine_test.go @@ -16,6 +16,7 @@ package promql import ( "context" "errors" + "fmt" "io/ioutil" "os" "sort" @@ -30,6 +31,7 @@ import ( "github.com/prometheus/prometheus/pkg/timestamp" "github.com/prometheus/prometheus/promql/parser" "github.com/prometheus/prometheus/storage" + "github.com/prometheus/prometheus/tsdb" ) func TestMain(m *testing.M) { @@ -2429,3 +2431,33 @@ func TestRangeQuery(t *testing.T) { }) } } + +func TestSparseHistogramRate(t *testing.T) { + // Currently, this test it to only find panics or errors in the engine execution path. + // The panic stack trace will mostly tell you what code path is breaking and needs fixing for + // fetching the raw histograms and passing it rightly upto the rate() function implementation. + // TODO: Check the result for correctness once implementation is ready. + + test, err := NewTest(t, "") + require.NoError(t, err) + defer test.Close() + + seriesName := "sparse_histogram_series" + lbls := labels.FromStrings("__name__", seriesName) + + app := test.Storage().Appender(context.TODO()) + for i, h := range tsdb.GenerateTestHistograms(100) { + _, err := app.AppendHistogram(0, lbls, int64(i)*int64(15*time.Second/time.Millisecond), h) + require.NoError(t, err) + } + require.NoError(t, app.Commit()) + + require.NoError(t, test.Run()) + engine := test.QueryEngine() + + queryString := fmt.Sprintf("rate(%s[1m])", seriesName) + qry, err := engine.NewInstantQuery(test.Queryable(), queryString, timestamp.Time(int64(5*time.Minute/time.Millisecond))) + require.NoError(t, err) + res := qry.Exec(test.Context()) + require.NoError(t, res.Err) +} diff --git a/promql/test_test.go b/promql/test_test.go index ec50e57532..ec2bac1b1e 100644 --- a/promql/test_test.go +++ b/promql/test_test.go @@ -47,7 +47,7 @@ func TestLazyLoader_WithSamplesTill(t *testing.T) { { Metric: labels.FromStrings("__name__", "metric1"), Points: []Point{ - {0, 1}, {10000, 2}, {20000, 3}, {30000, 4}, {40000, 5}, + {0, 1, nil}, {10000, 2, nil}, {20000, 3, nil}, {30000, 4, nil}, {40000, 5, nil}, }, }, }, @@ -58,7 +58,7 @@ func TestLazyLoader_WithSamplesTill(t *testing.T) { { Metric: labels.FromStrings("__name__", "metric1"), Points: []Point{ - {0, 1}, {10000, 2}, {20000, 3}, {30000, 4}, {40000, 5}, + {0, 1, nil}, {10000, 2, nil}, {20000, 3, nil}, {30000, 4, nil}, {40000, 5, nil}, }, }, }, @@ -69,7 +69,7 @@ func TestLazyLoader_WithSamplesTill(t *testing.T) { { Metric: labels.FromStrings("__name__", "metric1"), Points: []Point{ - {0, 1}, {10000, 2}, {20000, 3}, {30000, 4}, {40000, 5}, {50000, 6}, {60000, 7}, + {0, 1, nil}, {10000, 2, nil}, {20000, 3, nil}, {30000, 4, nil}, {40000, 5, nil}, {50000, 6, nil}, {60000, 7, nil}, }, }, }, @@ -89,13 +89,13 @@ func TestLazyLoader_WithSamplesTill(t *testing.T) { { Metric: labels.FromStrings("__name__", "metric1"), Points: []Point{ - {0, 1}, {10000, 1}, {20000, 1}, {30000, 1}, {40000, 1}, {50000, 1}, + {0, 1, nil}, {10000, 1, nil}, {20000, 1, nil}, {30000, 1, nil}, {40000, 1, nil}, {50000, 1, nil}, }, }, { Metric: labels.FromStrings("__name__", "metric2"), Points: []Point{ - {0, 1}, {10000, 2}, {20000, 3}, {30000, 4}, {40000, 5}, {50000, 6}, {60000, 7}, {70000, 8}, + {0, 1, nil}, {10000, 2, nil}, {20000, 3, nil}, {30000, 4, nil}, {40000, 5, nil}, {50000, 6, nil}, {60000, 7, nil}, {70000, 8, nil}, }, }, }, diff --git a/promql/value.go b/promql/value.go index 3a724d0533..63590042b6 100644 --- a/promql/value.go +++ b/promql/value.go @@ -78,9 +78,12 @@ func (s Series) String() string { } // Point represents a single data point for a given timestamp. +// If H is not nil, then this is a histogram point and only (T, H) is valid. +// If H is nil, then only (T, V) is valid. type Point struct { T int64 V float64 + H *histogram.Histogram } func (p Point) String() string { diff --git a/rules/manager.go b/rules/manager.go index fa8cd6763a..a7863a60e3 100644 --- a/rules/manager.go +++ b/rules/manager.go @@ -197,7 +197,7 @@ func EngineQueryFunc(engine *promql.Engine, q storage.Queryable) QueryFunc { return v, nil case promql.Scalar: return promql.Vector{promql.Sample{ - Point: promql.Point(v), + Point: promql.Point{T: v.T, V: v.V}, Metric: labels.Labels{}, }}, nil default: diff --git a/storage/buffer.go b/storage/buffer.go index 2d06d3fb26..45bfe0645c 100644 --- a/storage/buffer.go +++ b/storage/buffer.go @@ -40,8 +40,9 @@ func NewBuffer(delta int64) *BufferedSeriesIterator { // NewBufferIterator returns a new iterator that buffers the values within the // time range of the current element and the duration of delta before. func NewBufferIterator(it chunkenc.Iterator, delta int64) *BufferedSeriesIterator { + // TODO(codesome): based on encoding, allocate different buffer. bit := &BufferedSeriesIterator{ - buf: newSampleRing(delta, 16), + buf: newSampleRing(delta, 16, it.ChunkEncoding()), delta: delta, } bit.Reset(it) @@ -67,8 +68,9 @@ func (b *BufferedSeriesIterator) ReduceDelta(delta int64) bool { // PeekBack returns the nth previous element of the iterator. If there is none buffered, // ok is false. -func (b *BufferedSeriesIterator) PeekBack(n int) (t int64, v float64, ok bool) { - return b.buf.nthLast(n) +func (b *BufferedSeriesIterator) PeekBack(n int) (t int64, v float64, h *histogram.Histogram, ok bool) { + s, ok := b.buf.nthLast(n) + return s.t, s.v, s.h, ok } // Buffer returns an iterator over the buffered data. Invalidates previously @@ -90,7 +92,11 @@ func (b *BufferedSeriesIterator) Seek(t int64) bool { if !b.ok { return false } - b.lastTime, _ = b.Values() + if b.it.ChunkEncoding() == chunkenc.EncHistogram { + b.lastTime, _ = b.HistogramValues() + } else { + b.lastTime, _ = b.Values() + } } if b.lastTime >= t { @@ -112,11 +118,21 @@ func (b *BufferedSeriesIterator) Next() bool { } // Add current element to buffer before advancing. - b.buf.add(b.it.At()) + if b.it.ChunkEncoding() == chunkenc.EncHistogram { + t, h := b.it.AtHistogram() + b.buf.add(sample{t: t, h: &h}) + } else { + t, v := b.it.At() + b.buf.add(sample{t: t, v: v}) + } b.ok = b.it.Next() if b.ok { - b.lastTime, _ = b.Values() + if b.it.ChunkEncoding() == chunkenc.EncHistogram { + b.lastTime, _ = b.HistogramValues() + } else { + b.lastTime, _ = b.Values() + } } return b.ok @@ -127,6 +143,16 @@ func (b *BufferedSeriesIterator) Values() (int64, float64) { return b.it.At() } +// HistogramValues returns the current histogram element of the iterator. +func (b *BufferedSeriesIterator) HistogramValues() (int64, histogram.Histogram) { + return b.it.AtHistogram() +} + +// ChunkEncoding return the chunk encoding of the underlying iterator. +func (b *BufferedSeriesIterator) ChunkEncoding() chunkenc.Encoding { + return b.it.ChunkEncoding() +} + // Err returns the last encountered error. func (b *BufferedSeriesIterator) Err() error { return b.it.Err() @@ -135,6 +161,7 @@ func (b *BufferedSeriesIterator) Err() error { type sample struct { t int64 v float64 + h *histogram.Histogram } func (s sample) T() int64 { @@ -145,9 +172,14 @@ func (s sample) V() float64 { return s.v } +func (s sample) H() *histogram.Histogram { + return s.h +} + type sampleRing struct { delta int64 + enc chunkenc.Encoding buf []sample // lookback buffer i int // position of most recent element in ring buffer f int // position of first element in ring buffer @@ -156,8 +188,8 @@ type sampleRing struct { it sampleRingIterator } -func newSampleRing(delta int64, sz int) *sampleRing { - r := &sampleRing{delta: delta, buf: make([]sample, sz)} +func newSampleRing(delta int64, sz int, enc chunkenc.Encoding) *sampleRing { + r := &sampleRing{delta: delta, buf: make([]sample, sz), enc: enc} r.reset() return r @@ -200,13 +232,12 @@ func (it *sampleRingIterator) At() (int64, float64) { // AtHistogram always returns (0, histogram.Histogram{}) because there is no // support for histogram values yet. -// TODO(beorn7): Fix that for histogram support in PromQL. func (it *sampleRingIterator) AtHistogram() (int64, histogram.Histogram) { - return 0, histogram.Histogram{} + return it.r.atHistogram(it.i) } func (it *sampleRingIterator) ChunkEncoding() chunkenc.Encoding { - return chunkenc.EncXOR + return it.r.enc } func (r *sampleRing) at(i int) (int64, float64) { @@ -215,9 +246,20 @@ func (r *sampleRing) at(i int) (int64, float64) { return s.t, s.v } +func (r *sampleRing) atHistogram(i int) (int64, histogram.Histogram) { + j := (r.f + i) % len(r.buf) + s := r.buf[j] + return s.t, *s.h +} + +func (r *sampleRing) atSample(i int) sample { + j := (r.f + i) % len(r.buf) + return r.buf[j] +} + // add adds a sample to the ring buffer and frees all samples that fall // out of the delta range. -func (r *sampleRing) add(t int64, v float64) { +func (r *sampleRing) add(s sample) { l := len(r.buf) // Grow the ring buffer if it fits no more elements. if l == r.l { @@ -236,11 +278,11 @@ func (r *sampleRing) add(t int64, v float64) { } } - r.buf[r.i] = sample{t: t, v: v} + r.buf[r.i] = s r.l++ // Free head of the buffer of samples that just fell out of the range. - tmin := t - r.delta + tmin := s.t - r.delta for r.buf[r.f].t < tmin { r.f++ if r.f >= l { @@ -276,12 +318,11 @@ func (r *sampleRing) reduceDelta(delta int64) bool { } // nthLast returns the nth most recent element added to the ring. -func (r *sampleRing) nthLast(n int) (int64, float64, bool) { +func (r *sampleRing) nthLast(n int) (sample, bool) { if n > r.l { - return 0, 0, false + return sample{}, false } - t, v := r.at(r.l - n) - return t, v, true + return r.atSample(r.l - n), true } func (r *sampleRing) samples() []sample { diff --git a/storage/series.go b/storage/series.go index 60541164b4..d597a3b90e 100644 --- a/storage/series.go +++ b/storage/series.go @@ -297,19 +297,26 @@ func (e errChunksIterator) Err() error { return e.err } // ExpandSamples iterates over all samples in the iterator, buffering all in slice. // Optionally it takes samples constructor, useful when you want to compare sample slices with different // sample implementations. if nil, sample type from this package will be used. -func ExpandSamples(iter chunkenc.Iterator, newSampleFn func(t int64, v float64) tsdbutil.Sample) ([]tsdbutil.Sample, error) { +func ExpandSamples(iter chunkenc.Iterator, newSampleFn func(t int64, v float64, h *histogram.Histogram) tsdbutil.Sample) ([]tsdbutil.Sample, error) { if newSampleFn == nil { - newSampleFn = func(t int64, v float64) tsdbutil.Sample { return sample{t, v} } + newSampleFn = func(t int64, v float64, h *histogram.Histogram) tsdbutil.Sample { return sample{t, v, h} } } var result []tsdbutil.Sample - for iter.Next() { - t, v := iter.At() - // NaNs can't be compared normally, so substitute for another value. - if math.IsNaN(v) { - v = -42 + if iter.ChunkEncoding() == chunkenc.EncHistogram { + for iter.Next() { + t, h := iter.AtHistogram() + result = append(result, newSampleFn(t, 0, &h)) + } + } else { + for iter.Next() { + t, v := iter.At() + // NaNs can't be compared normally, so substitute for another value. + if math.IsNaN(v) { + v = -42 + } + result = append(result, newSampleFn(t, v, nil)) } - result = append(result, newSampleFn(t, v)) } return result, iter.Err() } diff --git a/tsdb/compact_test.go b/tsdb/compact_test.go index e9ac42077e..5f3d2374b6 100644 --- a/tsdb/compact_test.go +++ b/tsdb/compact_test.go @@ -1336,7 +1336,7 @@ func TestHeadCompactionWithHistograms(t *testing.T) { timeStep := DefaultBlockDuration / int64(numHistograms) expHists := make([]timedHist, 0, numHistograms) l := labels.Labels{{Name: "a", Value: "b"}} - for i, h := range generateHistograms(numHistograms) { + for i, h := range GenerateTestHistograms(numHistograms) { _, err := app.AppendHistogram(0, l, int64(i)*timeStep, h) require.NoError(t, err) expHists = append(expHists, timedHist{int64(i) * timeStep, h}) @@ -1715,8 +1715,8 @@ func TestSparseHistogramCompactionAndQuery(t *testing.T) { } expHists := make(map[string][]timedHist) - series1Histograms := generateHistograms(20) - series2Histograms := generateHistograms(20) + series1Histograms := GenerateTestHistograms(20) + series2Histograms := GenerateTestHistograms(20) idx1, idx2 := -1, -1 addNextHists := func(ts int64, app storage.Appender) { lbls1 := labels.Labels{{Name: "a", Value: "b"}} diff --git a/tsdb/head.go b/tsdb/head.go index 989049d27f..93e6d568b2 100644 --- a/tsdb/head.go +++ b/tsdb/head.go @@ -1496,11 +1496,13 @@ type histogramSample struct { type sample struct { t int64 v float64 + h *histogram.Histogram } -func newSample(t int64, v float64) tsdbutil.Sample { return sample{t, v} } +func newSample(t int64, v float64) tsdbutil.Sample { return sample{t, v, nil} } func (s sample) T() int64 { return s.t } func (s sample) V() float64 { return s.v } +func (s sample) H() *histogram.Histogram { return s.h } // memSeries is the in-memory representation of a series. None of its methods // are goroutine safe and it is the caller's responsibility to lock it. @@ -1658,3 +1660,22 @@ func (h *Head) updateWALReplayStatusRead(current int) { h.stats.WALReplayStatus.Current = current } + +func GenerateTestHistograms(n int) (r []histogram.Histogram) { + for i := 0; i < n; i++ { + r = append(r, histogram.Histogram{ + Count: 5 + uint64(i*4), + ZeroCount: 2 + uint64(i), + ZeroThreshold: 0.001, + Sum: 18.4 * float64(i+1), + Schema: 1, + PositiveSpans: []histogram.Span{ + {Offset: 0, Length: 2}, + {Offset: 1, Length: 2}, + }, + PositiveBuckets: []int64{int64(i + 1), 1, -1, 0}, + }) + } + + return r +} diff --git a/tsdb/head_read.go b/tsdb/head_read.go index c9d496f784..7c20c43ad7 100644 --- a/tsdb/head_read.go +++ b/tsdb/head_read.go @@ -444,6 +444,7 @@ func (s *memSeries) iterator(id int, isoState *isolationState, chunkDiskMapper * msIter.stopAfter = stopAfter msIter.buf = s.sampleBuf msIter.histogramBuf = s.histogramBuf + msIter.histogramSeries = s.histogramSeries return msIter } return &memSafeIterator{ @@ -452,18 +453,20 @@ func (s *memSeries) iterator(id int, isoState *isolationState, chunkDiskMapper * i: -1, stopAfter: stopAfter, }, - total: numSamples, - buf: s.sampleBuf, - histogramBuf: s.histogramBuf, + total: numSamples, + buf: s.sampleBuf, + histogramBuf: s.histogramBuf, + histogramSeries: s.histogramSeries, } } type memSafeIterator struct { stopIterator - total int - buf [4]sample - histogramBuf [4]histogramSample + histogramSeries bool + total int + buf [4]sample + histogramBuf [4]histogramSample } func (it *memSafeIterator) Seek(t int64) bool { @@ -471,15 +474,29 @@ func (it *memSafeIterator) Seek(t int64) bool { return false } - ts, _ := it.At() - - for t > ts || it.i == -1 { - if !it.Next() { - return false - } + var ts int64 + if it.histogramSeries { + ts, _ = it.AtHistogram() + } else { ts, _ = it.At() } + if it.histogramSeries { + for t > ts || it.i == -1 { + if !it.Next() { + return false + } + ts, _ = it.AtHistogram() + } + } else { + for t > ts || it.i == -1 { + if !it.Next() { + return false + } + ts, _ = it.At() + } + } + return true } diff --git a/tsdb/head_test.go b/tsdb/head_test.go index 4eac434d3d..7831ad9d85 100644 --- a/tsdb/head_test.go +++ b/tsdb/head_test.go @@ -2544,7 +2544,7 @@ func TestAppendHistogram(t *testing.T) { h histogram.Histogram } expHistograms := make([]timedHistogram, 0, numHistograms) - for i, h := range generateHistograms(numHistograms) { + for i, h := range GenerateTestHistograms(numHistograms) { _, err := app.AppendHistogram(0, l, int64(i), h) require.NoError(t, err) expHistograms = append(expHistograms, timedHistogram{int64(i), h}) @@ -2591,7 +2591,7 @@ func TestHistogramInWAL(t *testing.T) { h histogram.Histogram } expHistograms := make([]timedHistogram, 0, numHistograms) - for i, h := range generateHistograms(numHistograms) { + for i, h := range GenerateTestHistograms(numHistograms) { h.NegativeSpans = h.PositiveSpans h.NegativeBuckets = h.PositiveBuckets _, err := app.AppendHistogram(0, l, int64(i), h) @@ -2630,25 +2630,6 @@ func TestHistogramInWAL(t *testing.T) { require.Equal(t, expHistograms, actHistograms) } -func generateHistograms(n int) (r []histogram.Histogram) { - for i := 0; i < n; i++ { - r = append(r, histogram.Histogram{ - Count: 5 + uint64(i*4), - ZeroCount: 2 + uint64(i), - ZeroThreshold: 0.001, - Sum: 18.4 * float64(i+1), - Schema: 1, - PositiveSpans: []histogram.Span{ - {Offset: 0, Length: 2}, - {Offset: 1, Length: 2}, - }, - PositiveBuckets: []int64{int64(i + 1), 1, -1, 0}, - }) - } - - return r -} - func TestChunkSnapshot(t *testing.T) { head, _ := newTestHead(t, 120*4, false) defer func() { @@ -2962,7 +2943,7 @@ func TestHistogramMetrics(t *testing.T) { for x := 0; x < 5; x++ { expHSeries++ l := labels.Labels{{Name: "a", Value: fmt.Sprintf("b%d", x)}} - for i, h := range generateHistograms(10) { + for i, h := range GenerateTestHistograms(10) { app := head.Appender(context.Background()) _, err := app.AppendHistogram(0, l, int64(i), h) require.NoError(t, err) @@ -3039,7 +3020,7 @@ func TestHistogramStaleSample(t *testing.T) { // Adding stale in the same appender. app := head.Appender(context.Background()) - for _, h := range generateHistograms(numHistograms) { + for _, h := range GenerateTestHistograms(numHistograms) { _, err := app.AppendHistogram(0, l, 100*int64(len(expHistograms)), h) require.NoError(t, err) expHistograms = append(expHistograms, timedHistogram{100 * int64(len(expHistograms)), h}) @@ -3058,7 +3039,7 @@ func TestHistogramStaleSample(t *testing.T) { // Adding stale in different appender and continuing series after a stale sample. app = head.Appender(context.Background()) - for _, h := range generateHistograms(2 * numHistograms)[numHistograms:] { + for _, h := range GenerateTestHistograms(2 * numHistograms)[numHistograms:] { _, err := app.AppendHistogram(0, l, 100*int64(len(expHistograms)), h) require.NoError(t, err) expHistograms = append(expHistograms, timedHistogram{100 * int64(len(expHistograms)), h}) @@ -3112,7 +3093,7 @@ func TestHistogramCounterResetHeader(t *testing.T) { require.Equal(t, expHeaders[len(expHeaders)-1], ms.headChunk.chunk.(*chunkenc.HistogramChunk).GetCounterResetHeader()) } - h := generateHistograms(1)[0] + h := GenerateTestHistograms(1)[0] if len(h.NegativeBuckets) == 0 { h.NegativeSpans = append([]histogram.Span{}, h.PositiveSpans...) h.NegativeBuckets = append([]int64{}, h.PositiveBuckets...) diff --git a/tsdb/tsdbutil/buffer.go b/tsdb/tsdbutil/buffer.go index 2c9bbb10bd..5dde74835d 100644 --- a/tsdb/tsdbutil/buffer.go +++ b/tsdb/tsdbutil/buffer.go @@ -102,6 +102,7 @@ func (b *BufferedSeriesIterator) Err() error { type sample struct { t int64 v float64 + h *histogram.Histogram } func (s sample) T() int64 { @@ -112,6 +113,10 @@ func (s sample) V() float64 { return s.v } +func (s sample) H() *histogram.Histogram { + return s.h +} + type sampleRing struct { delta int64 diff --git a/tsdb/tsdbutil/chunks.go b/tsdb/tsdbutil/chunks.go index 5ae58b0a8c..5b4e954ae5 100644 --- a/tsdb/tsdbutil/chunks.go +++ b/tsdb/tsdbutil/chunks.go @@ -14,6 +14,7 @@ package tsdbutil import ( + "github.com/prometheus/prometheus/model/histogram" "github.com/prometheus/prometheus/tsdb/chunkenc" "github.com/prometheus/prometheus/tsdb/chunks" ) @@ -26,6 +27,7 @@ type Samples interface { type Sample interface { T() int64 V() float64 + H() *histogram.Histogram } type SampleSlice []Sample @@ -61,7 +63,7 @@ func ChunkFromSamplesGeneric(s Samples) chunks.Meta { func PopulatedChunk(numSamples int, minTime int64) chunks.Meta { samples := make([]Sample, numSamples) for i := 0; i < numSamples; i++ { - samples[i] = sample{minTime + int64(i*1000), 1.0} + samples[i] = sample{t: minTime + int64(i*1000), v: 1.0} } return ChunkFromSamples(samples) }