histogram_quantile for sparse histograms (#9935)

* MergeFloatBucketIterator for []FloatBucketIterator

Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com>

* histogram_quantile for histograms

Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com>

* Fix histogram_quantile

Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com>

* Unit test and enhancements

Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com>

* Iterators to iterate buckets in reverse and all buckets together including zero bucket

Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com>

* Consider all buckets for histogram_quantile and fix the implementation

Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com>

* Remove unneeded code

Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com>

* Fix lint

Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com>
This commit is contained in:
Ganesh Vernekar 2021-12-06 19:17:22 +05:30 committed by GitHub
parent 0e1b9dd308
commit 4a43349aca
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 750 additions and 2 deletions

View file

@ -418,6 +418,27 @@ func (h *FloatHistogram) NegativeBucketIterator() FloatBucketIterator {
return newFloatBucketIterator(h, false)
}
// PositiveReverseBucketIterator returns a FloatBucketIterator to iterate over all
// positive buckets in decending order (starting at the highest bucket and going
// down upto zero bucket).
func (h *FloatHistogram) PositiveReverseBucketIterator() FloatBucketIterator {
return newReverseFloatBucketIterator(h, true)
}
// NegativeReverseBucketIterator returns a FloatBucketIterator to iterate over all
// negative buckets in ascending order (starting at the lowest bucket and doing up
// upto zero bucket).
func (h *FloatHistogram) NegativeReverseBucketIterator() FloatBucketIterator {
return newReverseFloatBucketIterator(h, false)
}
// AllFloatBucketIterator returns a FloatBucketIterator to iterate over all
// negative, zero, and positive buckets in ascending order (starting at the
// lowest bucket and going up).
func (h *FloatHistogram) AllFloatBucketIterator() FloatBucketIterator {
return newAllFloatBucketIterator(h)
}
// CumulativeBucketIterator returns a FloatBucketIterator to iterate over a
// cumulative view of the buckets. This method currently only supports
// FloatHistograms without negative buckets and panics if the FloatHistogram has
@ -549,6 +570,148 @@ func (r *floatBucketIterator) At() FloatBucket {
}
}
type reverseFloatBucketIterator struct {
schema int32
spans []Span
buckets []float64
positive bool // Whether this is for positive buckets.
spansIdx int // Current span within spans slice.
idxInSpan int32 // Index in the current span. 0 <= idxInSpan < span.Length.
bucketsIdx int // Current bucket within buckets slice.
currCount float64 // Count in the current bucket.
currIdx int32 // The actual bucket index.
currLower, currUpper float64 // Limits of the current bucket.
initiated bool
}
func newReverseFloatBucketIterator(h *FloatHistogram, positive bool) *reverseFloatBucketIterator {
r := &reverseFloatBucketIterator{schema: h.Schema, positive: positive}
if positive {
r.spans = h.PositiveSpans
r.buckets = h.PositiveBuckets
} else {
r.spans = h.NegativeSpans
r.buckets = h.NegativeBuckets
}
return r
}
func (r *reverseFloatBucketIterator) Next() bool {
if !r.initiated {
r.initiated = true
r.spansIdx = len(r.spans) - 1
r.bucketsIdx = len(r.buckets) - 1
if r.spansIdx >= 0 {
r.idxInSpan = int32(r.spans[r.spansIdx].Length) - 1
}
r.currIdx = 0
for _, s := range r.spans {
r.currIdx += s.Offset + int32(s.Length)
}
}
r.currIdx--
if r.bucketsIdx < 0 {
return false
}
for r.idxInSpan < 0 {
// We have exhausted the current span and have to find a new
// one. We'll even handle pathologic spans of length 0.
r.spansIdx--
r.idxInSpan = int32(r.spans[r.spansIdx].Length) - 1
r.currIdx -= r.spans[r.spansIdx+1].Offset
}
r.currCount = r.buckets[r.bucketsIdx]
if r.positive {
r.currUpper = getBound(r.currIdx, r.schema)
r.currLower = getBound(r.currIdx-1, r.schema)
} else {
r.currLower = -getBound(r.currIdx, r.schema)
r.currUpper = -getBound(r.currIdx-1, r.schema)
}
r.bucketsIdx--
r.idxInSpan--
return true
}
func (r *reverseFloatBucketIterator) At() FloatBucket {
return FloatBucket{
Count: r.currCount,
Lower: r.currLower,
Upper: r.currUpper,
LowerInclusive: r.currLower < 0,
UpperInclusive: r.currUpper > 0,
Index: r.currIdx,
}
}
type allFloatBucketIterator struct {
h *FloatHistogram
negIter, posIter FloatBucketIterator
// -1 means we are iterating negative buckets.
// 0 means it is time for zero bucket.
// 1 means we are iterating positive buckets.
// Anything else means iteration is over.
state int8
currBucket FloatBucket
}
func newAllFloatBucketIterator(h *FloatHistogram) *allFloatBucketIterator {
return &allFloatBucketIterator{
h: h,
negIter: h.NegativeReverseBucketIterator(),
posIter: h.PositiveBucketIterator(),
state: -1,
}
}
func (r *allFloatBucketIterator) Next() bool {
switch r.state {
case -1:
if r.negIter.Next() {
r.currBucket = r.negIter.At()
return true
}
r.state = 0
return r.Next()
case 0:
r.state = 1
if r.h.ZeroCount > 0 {
r.currBucket = FloatBucket{
Lower: -r.h.ZeroThreshold,
Upper: r.h.ZeroThreshold,
LowerInclusive: true,
UpperInclusive: true,
Count: r.h.ZeroCount,
Index: math.MinInt32, // TODO(codesome): What is the index for this?
}
return true
}
return r.Next()
case 1:
if r.posIter.Next() {
r.currBucket = r.posIter.At()
return true
}
r.state = 42
return false
}
return false
}
func (r *allFloatBucketIterator) At() FloatBucket {
return r.currBucket
}
type cumulativeFloatBucketIterator struct {
h *FloatHistogram

View file

@ -14,6 +14,8 @@
package histogram
import (
"fmt"
"math"
"testing"
"github.com/stretchr/testify/require"
@ -770,3 +772,223 @@ func TestFloatHistogramSub(t *testing.T) {
})
}
}
func TestReverseFloatBucketIterator(t *testing.T) {
h := &FloatHistogram{
Count: 405,
ZeroCount: 102,
ZeroThreshold: 0.001,
Sum: 1008.4,
Schema: 1,
PositiveSpans: []Span{
{Offset: 0, Length: 4},
{Offset: 1, Length: 0},
{Offset: 3, Length: 3},
{Offset: 3, Length: 0},
{Offset: 2, Length: 0},
{Offset: 5, Length: 3},
},
PositiveBuckets: []float64{100, 344, 123, 55, 3, 63, 2, 54, 235, 33},
NegativeSpans: []Span{
{Offset: 0, Length: 3},
{Offset: 1, Length: 0},
{Offset: 3, Length: 0},
{Offset: 3, Length: 4},
{Offset: 2, Length: 0},
{Offset: 5, Length: 3},
},
NegativeBuckets: []float64{10, 34, 1230, 54, 67, 63, 2, 554, 235, 33},
}
// Assuming that the regular iterator is correct.
// Positive buckets.
var expBuckets, actBuckets []FloatBucket
it := h.PositiveBucketIterator()
for it.Next() {
// Append in reverse to check reversed list.
expBuckets = append([]FloatBucket{it.At()}, expBuckets...)
}
it = h.PositiveReverseBucketIterator()
for it.Next() {
actBuckets = append(actBuckets, it.At())
}
require.Greater(t, len(expBuckets), 0)
require.Greater(t, len(actBuckets), 0)
require.Equal(t, expBuckets, actBuckets)
// Negative buckets.
expBuckets = expBuckets[:0]
actBuckets = actBuckets[:0]
it = h.NegativeBucketIterator()
for it.Next() {
// Append in reverse to check reversed list.
expBuckets = append([]FloatBucket{it.At()}, expBuckets...)
}
it = h.NegativeReverseBucketIterator()
for it.Next() {
actBuckets = append(actBuckets, it.At())
}
require.Greater(t, len(expBuckets), 0)
require.Greater(t, len(actBuckets), 0)
require.Equal(t, expBuckets, actBuckets)
}
func TestAllFloatBucketIterator(t *testing.T) {
cases := []struct {
h FloatHistogram
// To determine the expected buckets.
includeNeg, includeZero, includePos bool
}{
{
h: FloatHistogram{
Count: 405,
ZeroCount: 102,
ZeroThreshold: 0.001,
Sum: 1008.4,
Schema: 1,
PositiveSpans: []Span{
{Offset: 0, Length: 4},
{Offset: 1, Length: 0},
{Offset: 3, Length: 3},
{Offset: 3, Length: 0},
{Offset: 2, Length: 0},
{Offset: 5, Length: 3},
},
PositiveBuckets: []float64{100, 344, 123, 55, 3, 63, 2, 54, 235, 33},
NegativeSpans: []Span{
{Offset: 0, Length: 3},
{Offset: 1, Length: 0},
{Offset: 3, Length: 0},
{Offset: 3, Length: 4},
{Offset: 2, Length: 0},
{Offset: 5, Length: 3},
},
NegativeBuckets: []float64{10, 34, 1230, 54, 67, 63, 2, 554, 235, 33},
},
includeNeg: true,
includeZero: true,
includePos: true,
},
{
h: FloatHistogram{
Count: 405,
ZeroCount: 102,
ZeroThreshold: 0.001,
Sum: 1008.4,
Schema: 1,
NegativeSpans: []Span{
{Offset: 0, Length: 3},
{Offset: 1, Length: 0},
{Offset: 3, Length: 0},
{Offset: 3, Length: 4},
{Offset: 2, Length: 0},
{Offset: 5, Length: 3},
},
NegativeBuckets: []float64{10, 34, 1230, 54, 67, 63, 2, 554, 235, 33},
},
includeNeg: true,
includeZero: true,
includePos: false,
},
{
h: FloatHistogram{
Count: 405,
ZeroCount: 102,
ZeroThreshold: 0.001,
Sum: 1008.4,
Schema: 1,
PositiveSpans: []Span{
{Offset: 0, Length: 4},
{Offset: 1, Length: 0},
{Offset: 3, Length: 3},
{Offset: 3, Length: 0},
{Offset: 2, Length: 0},
{Offset: 5, Length: 3},
},
PositiveBuckets: []float64{100, 344, 123, 55, 3, 63, 2, 54, 235, 33},
},
includeNeg: false,
includeZero: true,
includePos: true,
},
{
h: FloatHistogram{
Count: 405,
ZeroCount: 102,
ZeroThreshold: 0.001,
Sum: 1008.4,
Schema: 1,
},
includeNeg: false,
includeZero: true,
includePos: false,
},
{
h: FloatHistogram{
Count: 405,
ZeroCount: 0,
ZeroThreshold: 0.001,
Sum: 1008.4,
Schema: 1,
PositiveSpans: []Span{
{Offset: 0, Length: 4},
{Offset: 1, Length: 0},
{Offset: 3, Length: 3},
{Offset: 3, Length: 0},
{Offset: 2, Length: 0},
{Offset: 5, Length: 3},
},
PositiveBuckets: []float64{100, 344, 123, 55, 3, 63, 2, 54, 235, 33},
NegativeSpans: []Span{
{Offset: 0, Length: 3},
{Offset: 1, Length: 0},
{Offset: 3, Length: 0},
{Offset: 3, Length: 4},
{Offset: 2, Length: 0},
{Offset: 5, Length: 3},
},
NegativeBuckets: []float64{10, 34, 1230, 54, 67, 63, 2, 554, 235, 33},
},
includeNeg: true,
includeZero: false,
includePos: true,
},
}
for i, c := range cases {
t.Run(fmt.Sprintf("%d", i), func(t *testing.T) {
var expBuckets, actBuckets []FloatBucket
if c.includeNeg {
it := c.h.NegativeReverseBucketIterator()
for it.Next() {
expBuckets = append(expBuckets, it.At())
}
}
if c.includeZero {
expBuckets = append(expBuckets, FloatBucket{
Lower: -c.h.ZeroThreshold,
Upper: c.h.ZeroThreshold,
LowerInclusive: true,
UpperInclusive: true,
Count: c.h.ZeroCount,
Index: math.MinInt32,
})
}
if c.includePos {
it := c.h.PositiveBucketIterator()
for it.Next() {
expBuckets = append(expBuckets, it.At())
}
}
it := c.h.AllFloatBucketIterator()
for it.Next() {
actBuckets = append(actBuckets, it.At())
}
require.Equal(t, expBuckets, actBuckets)
})
}
}

View file

@ -935,8 +935,10 @@ type EvalNodeHelper struct {
// Caches.
// DropMetricName and label_*.
Dmn map[uint64]labels.Labels
// funcHistogramQuantile.
// funcHistogramQuantile for conventional histograms.
signatureToMetricWithBuckets map[string]*metricWithBuckets
// funcHistogramQuantile for the new histograms.
signatureToMetricWithHistograms map[string]*metricWithHistograms
// label_replace.
regex *regexp.Regexp

View file

@ -18,6 +18,7 @@ import (
"errors"
"fmt"
"io/ioutil"
"math"
"os"
"sort"
"testing"
@ -2662,3 +2663,239 @@ func TestSparseHistogramRate(t *testing.T) {
}
require.Equal(t, expectedHistogram, actualHistogram)
}
func TestSparseHistogram_HistogramQuantile(t *testing.T) {
// TODO(codesome): Integrate histograms into the PromQL testing framework
// and write more tests there.
type subCase struct {
quantile string
value float64
}
cases := []struct {
text string
// Histogram to test.
h *histogram.Histogram
// Different quantiles to test for this histogram.
subCases []subCase
}{
{
text: "all positive buckets with zero bucket",
h: &histogram.Histogram{
Count: 12,
ZeroCount: 2,
ZeroThreshold: 0.001,
Sum: 100, // Does not matter.
Schema: 0,
PositiveSpans: []histogram.Span{
{Offset: 0, Length: 2},
{Offset: 1, Length: 2},
},
PositiveBuckets: []int64{2, 1, -2, 3},
},
subCases: []subCase{
{
quantile: "1.0001",
value: math.Inf(1),
},
{
quantile: "1",
value: 16,
},
{
quantile: "0.99",
value: 15.759999999999998,
},
{
quantile: "0.9",
value: 13.600000000000001,
},
{
quantile: "0.6",
value: 4.799999999999997,
},
{
quantile: "0.5",
value: 1.6666666666666665,
},
{ // Zero bucket.
quantile: "0.1",
value: 0.0006000000000000001,
},
{
quantile: "0",
value: 0,
},
{
quantile: "-1",
value: math.Inf(-1),
},
},
},
{
text: "all negative buckets with zero bucket",
h: &histogram.Histogram{
Count: 12,
ZeroCount: 2,
ZeroThreshold: 0.001,
Sum: 100, // Does not matter.
Schema: 0,
NegativeSpans: []histogram.Span{
{Offset: 0, Length: 2},
{Offset: 1, Length: 2},
},
NegativeBuckets: []int64{2, 1, -2, 3},
},
subCases: []subCase{
{
quantile: "1.0001",
value: math.Inf(1),
},
{ // Zero bucket.
quantile: "1",
value: 0.001,
},
{ // Zero bucket.
quantile: "0.99",
value: 0.0008799999999999991,
},
{ // Zero bucket.
quantile: "0.9",
value: -0.00019999999999999933,
},
{
quantile: "0.5",
value: -1.6666666666666667,
},
{
quantile: "0.1",
value: -13.6,
},
{
quantile: "0",
value: -16,
},
{
quantile: "-1",
value: math.Inf(-1),
},
},
},
{
text: "both positive and negative buckets with zero bucket",
h: &histogram.Histogram{
Count: 24,
ZeroCount: 4,
ZeroThreshold: 0.001,
Sum: 100, // Does not matter.
Schema: 0,
PositiveSpans: []histogram.Span{
{Offset: 0, Length: 2},
{Offset: 1, Length: 2},
},
PositiveBuckets: []int64{2, 1, -2, 3},
NegativeSpans: []histogram.Span{
{Offset: 0, Length: 2},
{Offset: 1, Length: 2},
},
NegativeBuckets: []int64{2, 1, -2, 3},
},
subCases: []subCase{
{
quantile: "1.0001",
value: math.Inf(1),
},
{
quantile: "1",
value: 16,
},
{
quantile: "0.99",
value: 15.519999999999996,
},
{
quantile: "0.9",
value: 11.200000000000003,
},
{
quantile: "0.7",
value: 1.2666666666666657,
},
{ // Zero bucket.
quantile: "0.55",
value: 0.0006000000000000005,
},
{ // Zero bucket.
quantile: "0.5",
value: 0,
},
{ // Zero bucket.
quantile: "0.45",
value: -0.0005999999999999996,
},
{
quantile: "0.3",
value: -1.266666666666667,
},
{
quantile: "0.1",
value: -11.2,
},
{
quantile: "0.01",
value: -15.52,
},
{
quantile: "0",
value: -16,
},
{
quantile: "-1",
value: math.Inf(-1),
},
},
},
}
for i, c := range cases {
t.Run(c.text, func(t *testing.T) {
// TODO(codesome): Check if TSDB is handling these histograms properly.
// When testing, the 3rd case of both pos neg was getting no histograms in the query engine
// when the storage was shared even with good time gap between all histograms.
// It is possible that the recode is failing. It was fine between first 2 cases where there is
// a change of bucket layout.
test, err := NewTest(t, "")
require.NoError(t, err)
t.Cleanup(test.Close)
seriesName := "sparse_histogram_series"
lbls := labels.FromStrings("__name__", seriesName)
engine := test.QueryEngine()
ts := int64(i+1) * int64(10*time.Minute/time.Millisecond)
app := test.Storage().Appender(context.TODO())
_, err = app.AppendHistogram(0, lbls, ts, c.h)
require.NoError(t, err)
require.NoError(t, app.Commit())
for j, sc := range c.subCases {
t.Run(fmt.Sprintf("%d %s", j, sc.quantile), func(t *testing.T) {
queryString := fmt.Sprintf("histogram_quantile(%s, %s)", sc.quantile, seriesName)
qry, err := engine.NewInstantQuery(test.Queryable(), queryString, timestamp.Time(ts))
require.NoError(t, err)
res := qry.Exec(test.Context())
require.NoError(t, res.Err)
vector, err := res.Vector()
require.NoError(t, err)
require.Len(t, vector, 1)
require.Nil(t, vector[0].H)
require.Equal(t, sc.value, vector[0].V)
})
}
})
}
}

View file

@ -858,6 +858,7 @@ func funcHistogramQuantile(vals []parser.Value, args parser.Expressions, enh *Ev
q := vals[0].(Vector)[0].V
inVec := vals[1].(Vector)
sigf := signatureFunc(false, enh.lblBuf, excludedLabels...)
ignoreSignature := make(map[string]bool) // For signatures having both new and old histograms.
if enh.signatureToMetricWithBuckets == nil {
enh.signatureToMetricWithBuckets = map[string]*metricWithBuckets{}
@ -866,7 +867,41 @@ func funcHistogramQuantile(vals []parser.Value, args parser.Expressions, enh *Ev
v.buckets = v.buckets[:0]
}
}
if enh.signatureToMetricWithHistograms == nil {
enh.signatureToMetricWithHistograms = map[string]*metricWithHistograms{}
} else {
for _, v := range enh.signatureToMetricWithHistograms {
v.histogram = nil
}
}
for _, el := range inVec {
l := sigf(el.Metric)
if ignoreSignature[l] {
continue
}
if el.H != nil { // It's a histogram type.
_, ok := enh.signatureToMetricWithBuckets[l]
if ok {
// This signature exists for both conventional and new histograms which is not supported.
delete(enh.signatureToMetricWithBuckets, l)
delete(enh.signatureToMetricWithHistograms, l)
ignoreSignature[l] = true
continue
}
_, ok = enh.signatureToMetricWithHistograms[l]
if ok {
panic(errors.New("histogram_quantile: vector cannot contain metrics with the same labelset"))
}
el.Metric = labels.NewBuilder(el.Metric).
Del(labels.BucketLabel, labels.MetricName).
Labels()
enh.signatureToMetricWithHistograms[l] = &metricWithHistograms{el.Metric, el.H}
continue
}
upperBound, err := strconv.ParseFloat(
el.Metric.Get(model.BucketLabel), 64,
)
@ -875,7 +910,15 @@ func funcHistogramQuantile(vals []parser.Value, args parser.Expressions, enh *Ev
// TODO(beorn7): Issue a warning somehow.
continue
}
l := sigf(el.Metric)
_, ok := enh.signatureToMetricWithHistograms[l]
if ok {
// This signature exists for both conventional and new histograms which is not supported.
delete(enh.signatureToMetricWithBuckets, l)
delete(enh.signatureToMetricWithHistograms, l)
ignoreSignature[l] = true
continue
}
mb, ok := enh.signatureToMetricWithBuckets[l]
if !ok {
@ -898,6 +941,15 @@ func funcHistogramQuantile(vals []parser.Value, args parser.Expressions, enh *Ev
}
}
for _, mh := range enh.signatureToMetricWithHistograms {
if mh.histogram != nil {
enh.Out = append(enh.Out, Sample{
Metric: mh.metric,
Point: Point{V: histogramQuantile(q, mh.histogram)},
})
}
}
return enh.Out
}

View file

@ -17,6 +17,7 @@ import (
"math"
"sort"
"github.com/prometheus/prometheus/model/histogram"
"github.com/prometheus/prometheus/model/labels"
)
@ -46,6 +47,11 @@ type metricWithBuckets struct {
buckets buckets
}
type metricWithHistograms struct {
metric labels.Labels
histogram *histogram.FloatHistogram
}
// bucketQuantile calculates the quantile 'q' based on the given buckets. The
// buckets will be sorted by upperBound by this function (i.e. no sorting
// needed before calling this function). The quantile value is interpolated
@ -114,6 +120,72 @@ func bucketQuantile(q float64, buckets buckets) float64 {
return bucketStart + (bucketEnd-bucketStart)*(rank/count)
}
// histogramQuantile calculates the quantile 'q' based on the given histogram.
// The quantile value is interpolated assuming a linear distribution within a bucket.
// A natural lower bound of 0 is assumed if the upper bound of the
// lowest bucket is greater 0. In that case, interpolation in the lowest bucket
// happens linearly between 0 and the upper bound of the lowest bucket.
// However, if the lowest bucket has an upper bound less or equal 0, this upper
// bound is returned if the quantile falls into the lowest bucket.
//
// There are a number of special cases (once we have a way to report errors
// happening during evaluations of AST functions, we should report those
// explicitly):
//
// If 'buckets' has 0 observations, NaN is returned.
//
// If q<0, -Inf is returned.
//
// The following special cases are ignored from conventional histograms because
// we don't have a +Inf bucket in new histograms:
// If the highest bucket is not +Inf, NaN is returned.
// If 'buckets' has fewer than 2 elements, NaN is returned.
//
// TODO(codesome): Support negative buckets.
func histogramQuantile(q float64, h *histogram.FloatHistogram) float64 {
if q < 0 {
return math.Inf(-1)
}
if q > 1 {
return math.Inf(+1)
}
if h.Count == 0 {
return math.NaN()
}
var (
bucket *histogram.FloatBucket
count float64
it = h.AllFloatBucketIterator()
rank = q * h.Count
idx = -1
)
// TODO(codesome): Do we need any special handling for negative buckets?
for it.Next() {
idx++
b := it.At()
count += b.Count
if count >= rank {
bucket = &b
break
}
}
if bucket == nil {
panic("histogramQuantile: not possible")
}
if idx == 0 && bucket.Lower < 0 && bucket.Upper > 0 {
// Zero bucket has the result and it happens to be the first bucket of this histogram.
// So we consider 0 to be the lower bound.
bucket.Lower = 0
}
rank -= count - bucket.Count
// TODO(codesome): Use a better estimation than linear.
return bucket.Lower + (bucket.Upper-bucket.Lower)*(rank/bucket.Count)
}
// coalesceBuckets merges buckets with the same upper bound.
//
// The input buckets must be sorted.