storage: histogram support in memoized_iterator

Signed-off-by: beorn7 <beorn@grafana.com>
This commit is contained in:
beorn7 2021-11-15 21:49:25 +01:00
parent 9b30ca2598
commit 73858d7f82
5 changed files with 91 additions and 56 deletions

View file

@ -47,7 +47,7 @@ type Histogram struct {
ZeroCount uint64 ZeroCount uint64
// Total number of observations. // Total number of observations.
Count uint64 Count uint64
// Sum of observations. // Sum of observations. This is also used as the stale marker.
Sum float64 Sum float64
// Spans for positive and negative buckets (see Span below). // Spans for positive and negative buckets (see Span below).
PositiveSpans, NegativeSpans []Span PositiveSpans, NegativeSpans []Span

View file

@ -35,6 +35,7 @@ import (
"github.com/prometheus/common/model" "github.com/prometheus/common/model"
"github.com/uber/jaeger-client-go" "github.com/uber/jaeger-client-go"
"github.com/prometheus/prometheus/model/histogram"
"github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/pkg/timestamp" "github.com/prometheus/prometheus/pkg/timestamp"
"github.com/prometheus/prometheus/pkg/value" "github.com/prometheus/prometheus/pkg/value"
@ -1475,10 +1476,10 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, storage.Warnings) {
} }
for ts := ev.startTimestamp; ts <= ev.endTimestamp; ts += ev.interval { for ts := ev.startTimestamp; ts <= ev.endTimestamp; ts += ev.interval {
_, v, ok := ev.vectorSelectorSingle(it, e, ts) _, v, h, ok := ev.vectorSelectorSingle(it, e, ts)
if ok { if ok {
if ev.currentSamples < ev.maxSamples { if ev.currentSamples < ev.maxSamples {
ss.Points = append(ss.Points, Point{V: v, T: ts}) ss.Points = append(ss.Points, Point{V: v, H: h, T: ts})
ev.currentSamples++ ev.currentSamples++
} else { } else {
ev.error(ErrTooManySamples(env)) ev.error(ErrTooManySamples(env))
@ -1601,11 +1602,11 @@ func (ev *evaluator) vectorSelector(node *parser.VectorSelector, ts int64) (Vect
for i, s := range node.Series { for i, s := range node.Series {
it.Reset(s.Iterator()) it.Reset(s.Iterator())
t, v, ok := ev.vectorSelectorSingle(it, node, ts) t, v, h, ok := ev.vectorSelectorSingle(it, node, ts)
if ok { if ok {
vec = append(vec, Sample{ vec = append(vec, Sample{
Metric: node.Series[i].Labels(), Metric: node.Series[i].Labels(),
Point: Point{V: v, T: t}, Point: Point{V: v, H: h, T: t},
}) })
ev.currentSamples++ ev.currentSamples++
@ -1618,33 +1619,37 @@ func (ev *evaluator) vectorSelector(node *parser.VectorSelector, ts int64) (Vect
return vec, ws return vec, ws
} }
// vectorSelectorSingle evaluates a instant vector for the iterator of one time series. // vectorSelectorSingle evaluates an instant vector for the iterator of one time series.
func (ev *evaluator) vectorSelectorSingle(it *storage.MemoizedSeriesIterator, node *parser.VectorSelector, ts int64) (int64, float64, bool) { func (ev *evaluator) vectorSelectorSingle(it *storage.MemoizedSeriesIterator, node *parser.VectorSelector, ts int64) (int64, float64, *histogram.Histogram, bool) {
refTime := ts - durationMilliseconds(node.Offset) refTime := ts - durationMilliseconds(node.Offset)
var t int64 var t int64
var v float64 var v float64
var h *histogram.Histogram
ok := it.Seek(refTime) valueType := it.Seek(refTime)
if !ok { switch valueType {
case storage.ValNone:
if it.Err() != nil { if it.Err() != nil {
ev.error(it.Err()) ev.error(it.Err())
} }
} case storage.ValFloat:
if ok {
t, v = it.Values() t, v = it.Values()
case storage.ValHistogram:
t, h = it.HistogramValues()
default:
panic(fmt.Errorf("unknown value type %v", valueType))
} }
if valueType == storage.ValNone || t > refTime {
if !ok || t > refTime { var ok bool
t, v, ok = it.PeekPrev() t, v, h, ok = it.PeekPrev()
if !ok || t < refTime-durationMilliseconds(ev.lookbackDelta) { if !ok || t < refTime-durationMilliseconds(ev.lookbackDelta) {
return 0, 0, false return 0, 0, nil, false
} }
} }
if value.IsStaleNaN(v) { if value.IsStaleNaN(v) || (h != nil && value.IsStaleNaN(h.Sum)) {
return 0, 0, false return 0, 0, nil, false
} }
return t, v, true return t, v, h, true
} }
var pointPool = sync.Pool{} var pointPool = sync.Pool{}

View file

@ -2455,11 +2455,11 @@ func TestSparseHistogramRate(t *testing.T) {
require.NoError(t, test.Run()) require.NoError(t, test.Run())
engine := test.QueryEngine() engine := test.QueryEngine()
//queryString := fmt.Sprintf("rate(%s[1m])", seriesName) queryString := fmt.Sprintf("rate(%s[1m])", seriesName)
queryString := fmt.Sprintf("%s", seriesName) // TODO(beorn7): "%s" returns a float but "%s[1m]" returns a matrix of histograms.
qry, err := engine.NewInstantQuery(test.Queryable(), queryString, timestamp.Time(int64(5*time.Minute/time.Millisecond))) qry, err := engine.NewInstantQuery(test.Queryable(), queryString, timestamp.Time(int64(5*time.Minute/time.Millisecond)))
require.NoError(t, err) require.NoError(t, err)
res := qry.Exec(test.Context()) res := qry.Exec(test.Context())
require.NoError(t, res.Err) require.NoError(t, res.Err)
fmt.Println(res) //fmt.Println(res)
} }

View file

@ -16,20 +16,31 @@ package storage
import ( import (
"math" "math"
"github.com/prometheus/prometheus/model/histogram"
"github.com/prometheus/prometheus/tsdb/chunkenc" "github.com/prometheus/prometheus/tsdb/chunkenc"
) )
// ValueType defines the type of a value in the storage.
type ValueType int
const (
ValNone ValueType = iota
ValFloat
ValHistogram
)
// MemoizedSeriesIterator wraps an iterator with a buffer to look back at the previous element. // MemoizedSeriesIterator wraps an iterator with a buffer to look back at the previous element.
type MemoizedSeriesIterator struct { type MemoizedSeriesIterator struct {
it chunkenc.Iterator it chunkenc.Iterator
delta int64 delta int64
lastTime int64 lastTime int64
ok bool valueType ValueType
// Keep track of the previously returned value. // Keep track of the previously returned value.
prevTime int64 prevTime int64
prevValue float64 prevValue float64
prevHistogram *histogram.Histogram
} }
// NewMemoizedEmptyIterator is like NewMemoizedIterator but it's initialised with an empty iterator. // NewMemoizedEmptyIterator is like NewMemoizedIterator but it's initialised with an empty iterator.
@ -53,22 +64,26 @@ func NewMemoizedIterator(it chunkenc.Iterator, delta int64) *MemoizedSeriesItera
func (b *MemoizedSeriesIterator) Reset(it chunkenc.Iterator) { func (b *MemoizedSeriesIterator) Reset(it chunkenc.Iterator) {
b.it = it b.it = it
b.lastTime = math.MinInt64 b.lastTime = math.MinInt64
b.ok = true
b.prevTime = math.MinInt64 b.prevTime = math.MinInt64
it.Next() it.Next()
if it.ChunkEncoding() == chunkenc.EncHistogram {
b.valueType = ValHistogram
} else {
b.valueType = ValFloat
}
} }
// PeekPrev returns the previous element of the iterator. If there is none buffered, // PeekPrev returns the previous element of the iterator. If there is none buffered,
// ok is false. // ok is false.
func (b *MemoizedSeriesIterator) PeekPrev() (t int64, v float64, ok bool) { func (b *MemoizedSeriesIterator) PeekPrev() (t int64, v float64, h *histogram.Histogram, ok bool) {
if b.prevTime == math.MinInt64 { if b.prevTime == math.MinInt64 {
return 0, 0, false return 0, 0, nil, false
} }
return b.prevTime, b.prevValue, true return b.prevTime, b.prevValue, b.prevHistogram, true
} }
// Seek advances the iterator to the element at time t or greater. // Seek advances the iterator to the element at time t or greater.
func (b *MemoizedSeriesIterator) Seek(t int64) bool { func (b *MemoizedSeriesIterator) Seek(t int64) ValueType {
t0 := t - b.delta t0 := t - b.delta
if t0 > b.lastTime { if t0 > b.lastTime {
@ -76,52 +91,61 @@ func (b *MemoizedSeriesIterator) Seek(t int64) bool {
// more than the delta. // more than the delta.
b.prevTime = math.MinInt64 b.prevTime = math.MinInt64
b.ok = b.it.Seek(t0) ok := b.it.Seek(t0)
if !b.ok { if !ok {
return false b.valueType = ValNone
return ValNone
} }
if b.it.ChunkEncoding() == chunkenc.EncHistogram { if b.it.ChunkEncoding() == chunkenc.EncHistogram {
b.valueType = ValHistogram
b.lastTime, _ = b.it.AtHistogram() b.lastTime, _ = b.it.AtHistogram()
} else { } else {
b.valueType = ValFloat
b.lastTime, _ = b.it.At() b.lastTime, _ = b.it.At()
} }
} }
if b.lastTime >= t { if b.lastTime >= t {
return true return b.valueType
} }
for b.Next() { for b.Next() != ValNone {
if b.lastTime >= t { if b.lastTime >= t {
return true return b.valueType
} }
} }
return false return ValNone
} }
// Next advances the iterator to the next element. // Next advances the iterator to the next element.
func (b *MemoizedSeriesIterator) Next() bool { func (b *MemoizedSeriesIterator) Next() ValueType {
if !b.ok { if b.valueType == ValNone {
return false return ValNone
} }
// Keep track of the previous element. // Keep track of the previous element.
if b.it.ChunkEncoding() == chunkenc.EncHistogram { if b.it.ChunkEncoding() == chunkenc.EncHistogram {
b.prevTime, b.prev b.prevTime, b.prevHistogram = b.it.AtHistogram()
b.prevValue = 0
} else { } else {
b.prevTime, b.prevValue = b.it.At() b.prevTime, b.prevValue = b.it.At()
b.prevHistogram = nil
} }
b.ok = b.it.Next() ok := b.it.Next()
if b.ok { if ok {
if b.it.ChunkEncoding() == chunkenc.EncHistogram { if b.it.ChunkEncoding() == chunkenc.EncHistogram {
b.lastTime, _ = b.it.AtHistogram() b.lastTime, _ = b.it.AtHistogram()
b.valueType = ValHistogram
} else { } else {
b.lastTime, _ = b.it.At() b.lastTime, _ = b.it.At()
b.valueType = ValFloat
} }
} else {
b.valueType = ValNone
} }
return b.valueType
return b.ok
} }
// Values returns the current element of the iterator. // Values returns the current element of the iterator.
@ -129,6 +153,11 @@ func (b *MemoizedSeriesIterator) Values() (int64, float64) {
return b.it.At() return b.it.At()
} }
// HistogramValues returns the timestamp and histogram of the current element of the iterator.
func (b *MemoizedSeriesIterator) HistogramValues() (int64, *histogram.Histogram) {
return b.it.AtHistogram()
}
// Err returns the last encountered error. // Err returns the last encountered error.
func (b *MemoizedSeriesIterator) Err() error { func (b *MemoizedSeriesIterator) Err() error {
return b.it.Err() return b.it.Err()

View file

@ -20,6 +20,7 @@ import (
) )
func TestMemoizedSeriesIterator(t *testing.T) { func TestMemoizedSeriesIterator(t *testing.T) {
// TODO(beorn7): Include histograms in testing.
var it *MemoizedSeriesIterator var it *MemoizedSeriesIterator
sampleEq := func(ets int64, ev float64) { sampleEq := func(ets int64, ev float64) {
@ -28,7 +29,7 @@ func TestMemoizedSeriesIterator(t *testing.T) {
require.Equal(t, ev, v, "value mismatch") require.Equal(t, ev, v, "value mismatch")
} }
prevSampleEq := func(ets int64, ev float64, eok bool) { prevSampleEq := func(ets int64, ev float64, eok bool) {
ts, v, ok := it.PeekPrev() ts, v, _, ok := it.PeekPrev()
require.Equal(t, eok, ok, "exist mismatch") require.Equal(t, eok, ok, "exist mismatch")
require.Equal(t, ets, ts, "timestamp mismatch") require.Equal(t, ets, ts, "timestamp mismatch")
require.Equal(t, ev, v, "value mismatch") require.Equal(t, ev, v, "value mismatch")
@ -45,29 +46,29 @@ func TestMemoizedSeriesIterator(t *testing.T) {
sample{t: 101, v: 10}, sample{t: 101, v: 10},
}), 2) }), 2)
require.True(t, it.Seek(-123), "seek failed") require.Equal(t, it.Seek(-123), ValFloat, "seek failed")
sampleEq(1, 2) sampleEq(1, 2)
prevSampleEq(0, 0, false) prevSampleEq(0, 0, false)
require.True(t, it.Next(), "next failed") require.Equal(t, it.Next(), ValFloat, "next failed")
sampleEq(2, 3) sampleEq(2, 3)
prevSampleEq(1, 2, true) prevSampleEq(1, 2, true)
require.True(t, it.Next(), "next failed") require.Equal(t, it.Next(), ValFloat, "next failed")
require.True(t, it.Next(), "next failed") require.Equal(t, it.Next(), ValFloat, "next failed")
require.True(t, it.Next(), "next failed") require.Equal(t, it.Next(), ValFloat, "next failed")
sampleEq(5, 6) sampleEq(5, 6)
prevSampleEq(4, 5, true) prevSampleEq(4, 5, true)
require.True(t, it.Seek(5), "seek failed") require.Equal(t, it.Seek(5), ValFloat, "seek failed")
sampleEq(5, 6) sampleEq(5, 6)
prevSampleEq(4, 5, true) prevSampleEq(4, 5, true)
require.True(t, it.Seek(101), "seek failed") require.Equal(t, it.Seek(101), ValFloat, "seek failed")
sampleEq(101, 10) sampleEq(101, 10)
prevSampleEq(100, 9, true) prevSampleEq(100, 9, true)
require.False(t, it.Next(), "next succeeded unexpectedly") require.Equal(t, it.Next(), ValNone, "next succeeded unexpectedly")
} }
func BenchmarkMemoizedSeriesIterator(b *testing.B) { func BenchmarkMemoizedSeriesIterator(b *testing.B) {
@ -78,7 +79,7 @@ func BenchmarkMemoizedSeriesIterator(b *testing.B) {
b.ReportAllocs() b.ReportAllocs()
b.ResetTimer() b.ResetTimer()
for it.Next() { for it.Next() != ValNone {
// scan everything // scan everything
} }
require.NoError(b, it.Err()) require.NoError(b, it.Err())