From 73858d7f82d345aa45ccecaeed26fb640ef3117b Mon Sep 17 00:00:00 2001 From: beorn7 Date: Mon, 15 Nov 2021 21:49:25 +0100 Subject: [PATCH] storage: histogram support in memoized_iterator Signed-off-by: beorn7 --- model/histogram/histogram.go | 2 +- promql/engine.go | 41 ++++++++-------- promql/engine_test.go | 6 +-- storage/memoized_iterator.go | 77 +++++++++++++++++++++---------- storage/memoized_iterator_test.go | 21 +++++---- 5 files changed, 91 insertions(+), 56 deletions(-) diff --git a/model/histogram/histogram.go b/model/histogram/histogram.go index 42ce8eb99..91c1aa15c 100644 --- a/model/histogram/histogram.go +++ b/model/histogram/histogram.go @@ -47,7 +47,7 @@ type Histogram struct { ZeroCount uint64 // Total number of observations. Count uint64 - // Sum of observations. + // Sum of observations. This is also used as the stale marker. Sum float64 // Spans for positive and negative buckets (see Span below). PositiveSpans, NegativeSpans []Span diff --git a/promql/engine.go b/promql/engine.go index afe870f2d..a5400cb96 100644 --- a/promql/engine.go +++ b/promql/engine.go @@ -35,6 +35,7 @@ import ( "github.com/prometheus/common/model" "github.com/uber/jaeger-client-go" + "github.com/prometheus/prometheus/model/histogram" "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/pkg/timestamp" "github.com/prometheus/prometheus/pkg/value" @@ -1475,10 +1476,10 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, storage.Warnings) { } for ts := ev.startTimestamp; ts <= ev.endTimestamp; ts += ev.interval { - _, v, ok := ev.vectorSelectorSingle(it, e, ts) + _, v, h, ok := ev.vectorSelectorSingle(it, e, ts) if ok { if ev.currentSamples < ev.maxSamples { - ss.Points = append(ss.Points, Point{V: v, T: ts}) + ss.Points = append(ss.Points, Point{V: v, H: h, T: ts}) ev.currentSamples++ } else { ev.error(ErrTooManySamples(env)) @@ -1601,11 +1602,11 @@ func (ev *evaluator) vectorSelector(node *parser.VectorSelector, ts int64) (Vect for i, s := range node.Series { it.Reset(s.Iterator()) - t, v, ok := ev.vectorSelectorSingle(it, node, ts) + t, v, h, ok := ev.vectorSelectorSingle(it, node, ts) if ok { vec = append(vec, Sample{ Metric: node.Series[i].Labels(), - Point: Point{V: v, T: t}, + Point: Point{V: v, H: h, T: t}, }) ev.currentSamples++ @@ -1618,33 +1619,37 @@ func (ev *evaluator) vectorSelector(node *parser.VectorSelector, ts int64) (Vect return vec, ws } -// vectorSelectorSingle evaluates a instant vector for the iterator of one time series. -func (ev *evaluator) vectorSelectorSingle(it *storage.MemoizedSeriesIterator, node *parser.VectorSelector, ts int64) (int64, float64, bool) { +// vectorSelectorSingle evaluates an instant vector for the iterator of one time series. +func (ev *evaluator) vectorSelectorSingle(it *storage.MemoizedSeriesIterator, node *parser.VectorSelector, ts int64) (int64, float64, *histogram.Histogram, bool) { refTime := ts - durationMilliseconds(node.Offset) var t int64 var v float64 + var h *histogram.Histogram - ok := it.Seek(refTime) - if !ok { + valueType := it.Seek(refTime) + switch valueType { + case storage.ValNone: if it.Err() != nil { ev.error(it.Err()) } - } - - if ok { + case storage.ValFloat: t, v = it.Values() + case storage.ValHistogram: + t, h = it.HistogramValues() + default: + panic(fmt.Errorf("unknown value type %v", valueType)) } - - if !ok || t > refTime { - t, v, ok = it.PeekPrev() + if valueType == storage.ValNone || t > refTime { + var ok bool + t, v, h, ok = it.PeekPrev() if !ok || t < refTime-durationMilliseconds(ev.lookbackDelta) { - return 0, 0, false + return 0, 0, nil, false } } - if value.IsStaleNaN(v) { - return 0, 0, false + if value.IsStaleNaN(v) || (h != nil && value.IsStaleNaN(h.Sum)) { + return 0, 0, nil, false } - return t, v, true + return t, v, h, true } var pointPool = sync.Pool{} diff --git a/promql/engine_test.go b/promql/engine_test.go index 886d2a92e..f484eb1e4 100644 --- a/promql/engine_test.go +++ b/promql/engine_test.go @@ -2455,11 +2455,11 @@ func TestSparseHistogramRate(t *testing.T) { require.NoError(t, test.Run()) engine := test.QueryEngine() - //queryString := fmt.Sprintf("rate(%s[1m])", seriesName) - queryString := fmt.Sprintf("%s", seriesName) + queryString := fmt.Sprintf("rate(%s[1m])", seriesName) + // TODO(beorn7): "%s" returns a float but "%s[1m]" returns matrix of histograms. qry, err := engine.NewInstantQuery(test.Queryable(), queryString, timestamp.Time(int64(5*time.Minute/time.Millisecond))) require.NoError(t, err) res := qry.Exec(test.Context()) require.NoError(t, res.Err) - fmt.Println(res) + //fmt.Println(res) } diff --git a/storage/memoized_iterator.go b/storage/memoized_iterator.go index 9d4cc5207..7701238cd 100644 --- a/storage/memoized_iterator.go +++ b/storage/memoized_iterator.go @@ -16,20 +16,31 @@ package storage import ( "math" + "github.com/prometheus/prometheus/model/histogram" "github.com/prometheus/prometheus/tsdb/chunkenc" ) +// ValueType defines the type of a value in the storage. +type ValueType int + +const ( + ValNone ValueType = iota + ValFloat + ValHistogram +) + // MemoizedSeriesIterator wraps an iterator with a buffer to look back the previous element. type MemoizedSeriesIterator struct { it chunkenc.Iterator delta int64 - lastTime int64 - ok bool + lastTime int64 + valueType ValueType // Keep track of the previously returned value. - prevTime int64 - prevValue float64 + prevTime int64 + prevValue float64 + prevHistogram *histogram.Histogram } // NewMemoizedEmptyIterator is like NewMemoizedIterator but it's initialised with an empty iterator. @@ -53,22 +64,26 @@ func NewMemoizedIterator(it chunkenc.Iterator, delta int64) *MemoizedSeriesItera func (b *MemoizedSeriesIterator) Reset(it chunkenc.Iterator) { b.it = it b.lastTime = math.MinInt64 - b.ok = true b.prevTime = math.MinInt64 it.Next() + if it.ChunkEncoding() == chunkenc.EncHistogram { + b.valueType = ValHistogram + } else { + b.valueType = ValFloat + } } // PeekPrev returns the previous element of the iterator. If there is none buffered, // ok is false. -func (b *MemoizedSeriesIterator) PeekPrev() (t int64, v float64, ok bool) { +func (b *MemoizedSeriesIterator) PeekPrev() (t int64, v float64, h *histogram.Histogram, ok bool) { if b.prevTime == math.MinInt64 { - return 0, 0, false + return 0, 0, nil, false } - return b.prevTime, b.prevValue, true + return b.prevTime, b.prevValue, b.prevHistogram, true } // Seek advances the iterator to the element at time t or greater. -func (b *MemoizedSeriesIterator) Seek(t int64) bool { +func (b *MemoizedSeriesIterator) Seek(t int64) ValueType { t0 := t - b.delta if t0 > b.lastTime { @@ -76,52 +91,61 @@ func (b *MemoizedSeriesIterator) Seek(t int64) bool { // more than the delta. b.prevTime = math.MinInt64 - b.ok = b.it.Seek(t0) - if !b.ok { - return false + ok := b.it.Seek(t0) + if !ok { + b.valueType = ValNone + return ValNone } if b.it.ChunkEncoding() == chunkenc.EncHistogram { + b.valueType = ValHistogram b.lastTime, _ = b.it.AtHistogram() } else { + b.valueType = ValFloat b.lastTime, _ = b.it.At() } } if b.lastTime >= t { - return true + return b.valueType } - for b.Next() { + for b.Next() != ValNone { if b.lastTime >= t { - return true + return b.valueType } } - return false + return ValNone } // Next advances the iterator to the next element. -func (b *MemoizedSeriesIterator) Next() bool { - if !b.ok { - return false +func (b *MemoizedSeriesIterator) Next() ValueType { + if b.valueType == ValNone { + return ValNone } // Keep track of the previous element. if b.it.ChunkEncoding() == chunkenc.EncHistogram { - b.prevTime, b.prev + b.prevTime, b.prevHistogram = b.it.AtHistogram() + b.prevValue = 0 } else { b.prevTime, b.prevValue = b.it.At() + b.prevHistogram = nil } - b.ok = b.it.Next() - if b.ok { + ok := b.it.Next() + if ok { if b.it.ChunkEncoding() == chunkenc.EncHistogram { b.lastTime, _ = b.it.AtHistogram() + b.valueType = ValHistogram + } else { b.lastTime, _ = b.it.At() + b.valueType = ValFloat } + } else { + b.valueType = ValNone } - - return b.ok + return b.valueType } // Values returns the current element of the iterator. @@ -129,6 +153,11 @@ func (b *MemoizedSeriesIterator) Values() (int64, float64) { return b.it.At() } +// Values returns the current element of the iterator. +func (b *MemoizedSeriesIterator) HistogramValues() (int64, *histogram.Histogram) { + return b.it.AtHistogram() +} + // Err returns the last encountered error. func (b *MemoizedSeriesIterator) Err() error { return b.it.Err() diff --git a/storage/memoized_iterator_test.go b/storage/memoized_iterator_test.go index 849256587..ee5155199 100644 --- a/storage/memoized_iterator_test.go +++ b/storage/memoized_iterator_test.go @@ -20,6 +20,7 @@ import ( ) func TestMemoizedSeriesIterator(t *testing.T) { + // TODO(beorn7): Include histograms in testing. var it *MemoizedSeriesIterator sampleEq := func(ets int64, ev float64) { @@ -28,7 +29,7 @@ func TestMemoizedSeriesIterator(t *testing.T) { require.Equal(t, ev, v, "value mismatch") } prevSampleEq := func(ets int64, ev float64, eok bool) { - ts, v, ok := it.PeekPrev() + ts, v, _, ok := it.PeekPrev() require.Equal(t, eok, ok, "exist mismatch") require.Equal(t, ets, ts, "timestamp mismatch") require.Equal(t, ev, v, "value mismatch") @@ -45,29 +46,29 @@ func TestMemoizedSeriesIterator(t *testing.T) { sample{t: 101, v: 10}, }), 2) - require.True(t, it.Seek(-123), "seek failed") + require.Equal(t, it.Seek(-123), ValFloat, "seek failed") sampleEq(1, 2) prevSampleEq(0, 0, false) - require.True(t, it.Next(), "next failed") + require.Equal(t, it.Next(), ValFloat, "next failed") sampleEq(2, 3) prevSampleEq(1, 2, true) - require.True(t, it.Next(), "next failed") - require.True(t, it.Next(), "next failed") - require.True(t, it.Next(), "next failed") + require.Equal(t, it.Next(), ValFloat, "next failed") + require.Equal(t, it.Next(), ValFloat, "next failed") + require.Equal(t, it.Next(), ValFloat, "next failed") sampleEq(5, 6) prevSampleEq(4, 5, true) - require.True(t, it.Seek(5), "seek failed") + require.Equal(t, it.Seek(5), ValFloat, "seek failed") sampleEq(5, 6) prevSampleEq(4, 5, true) - require.True(t, it.Seek(101), "seek failed") + require.Equal(t, it.Seek(101), ValFloat, "seek failed") sampleEq(101, 10) prevSampleEq(100, 9, true) - require.False(t, it.Next(), "next succeeded unexpectedly") + require.Equal(t, it.Next(), ValNone, "next succeeded unexpectedly") } func BenchmarkMemoizedSeriesIterator(b *testing.B) { @@ -78,7 +79,7 @@ func BenchmarkMemoizedSeriesIterator(b *testing.B) { b.ReportAllocs() b.ResetTimer() - for it.Next() { + for it.Next() != ValNone { // scan everything } require.NoError(b, it.Err())