Add the histogram_quantile function.

Since we are now getting really deep into floating point calculation,
the tests had to take into account the precision loss. Since the rule
tests are based on direct line matching in the output, implementing
the "almost equal" semantics was pretty cumbersome, but here we are.
This commit is contained in:
beorn7 2015-02-19 19:56:45 +01:00
parent 452c88964a
commit 9e7c3e3bcd
4 changed files with 620 additions and 7 deletions

View file

@ -18,6 +18,7 @@ import (
"fmt"
"math"
"sort"
"strconv"
"time"
clientmodel "github.com/prometheus/client_golang/model"
@ -498,6 +499,44 @@ func derivImpl(timestamp clientmodel.Timestamp, args []Node) interface{} {
return resultVector
}
// === histogram_quantile(k ScalarNode, vector VectorNode) Vector ===
//
// histogramQuantileImpl evaluates the scalar argument as the quantile and the
// vector argument as a set of bucket samples. Samples are grouped into
// histograms by the fingerprint of their labels after the bucket label and
// the metric name have been removed. For each group, one sample carrying the
// result of quantile() is emitted. Samples whose bucket label is missing or
// cannot be parsed as a float are skipped. The result is built by iterating
// over a map, so its order is not fixed.
func histogramQuantileImpl(timestamp clientmodel.Timestamp, args []Node) interface{} {
	q := args[0].(ScalarNode).Eval(timestamp)
	samples := args[1].(VectorNode).Eval(timestamp)

	histograms := map[clientmodel.Fingerprint]*metricWithBuckets{}
	for _, sample := range samples {
		le := string(sample.Metric.Metric[clientmodel.BucketLabel])
		upperBound, err := strconv.ParseFloat(le, 64)
		if err != nil {
			// Oops, no bucket label or malformed label value. Skip.
			// TODO(beorn7): Issue a warning somehow.
			continue
		}

		// Strip the labels that differ between the buckets of one
		// histogram so that all its buckets share a fingerprint.
		// TODO avoid copying each time by using a custom fingerprint
		sample.Metric.Delete(clientmodel.BucketLabel)
		sample.Metric.Delete(clientmodel.MetricNameLabel)

		fp := sample.Metric.Metric.Fingerprint()
		histogram, seen := histograms[fp]
		if !seen {
			// The first sample of a histogram provides the output metric.
			histogram = &metricWithBuckets{metric: sample.Metric}
			histograms[fp] = histogram
		}
		histogram.buckets = append(histogram.buckets, bucket{
			upperBound: upperBound,
			count:      sample.Value,
		})
	}

	result := Vector{}
	for _, histogram := range histograms {
		result = append(result, &Sample{
			Metric:    histogram.metric,
			Value:     clientmodel.SampleValue(quantile(q, histogram.buckets)),
			Timestamp: timestamp,
		})
	}
	return result
}
var functions = map[string]*Function{
"abs": {
name: "abs",
@ -548,6 +587,12 @@ var functions = map[string]*Function{
returnType: VectorType,
callFn: deltaImpl,
},
"deriv": {
name: "deriv",
argTypes: []ExprType{MatrixType},
returnType: VectorType,
callFn: derivImpl,
},
"drop_common_labels": {
name: "drop_common_labels",
argTypes: []ExprType{VectorType},
@ -560,6 +605,12 @@ var functions = map[string]*Function{
returnType: VectorType,
callFn: floorImpl,
},
"histogram_quantile": {
name: "histogram_quantile",
argTypes: []ExprType{ScalarType, VectorType},
returnType: VectorType,
callFn: histogramQuantileImpl,
},
"max_over_time": {
name: "max_over_time",
argTypes: []ExprType{MatrixType},
@ -621,12 +672,6 @@ var functions = map[string]*Function{
returnType: VectorType,
callFn: topkImpl,
},
"deriv": {
name: "deriv",
argTypes: []ExprType{MatrixType},
returnType: VectorType,
callFn: derivImpl,
},
}
// GetFunction returns a predefined Function object for the given

99
rules/ast/quantile.go Normal file
View file

@ -0,0 +1,99 @@
// Copyright 2015 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package ast
import (
"math"
"sort"
clientmodel "github.com/prometheus/client_golang/model"
)
// Helpers to calculate quantiles.

// bucket is a single histogram bucket as consumed by quantile: an upper bound
// together with the count that belongs to it.
type bucket struct {
// upperBound is the upper bound of the bucket; buckets are ordered by it.
upperBound float64
// count is the count of the bucket. quantile subtracts the count of the
// preceding bucket from it, i.e. it is treated as a cumulative count.
count clientmodel.SampleValue
}
// buckets is a list of histogram buckets. It implements sort.Interface,
// ordering the buckets by ascending upper bound.
type buckets []bucket

// Len returns the number of buckets.
func (bs buckets) Len() int {
	return len(bs)
}

// Swap exchanges the buckets at positions i and j.
func (bs buckets) Swap(i, j int) {
	bs[j], bs[i] = bs[i], bs[j]
}

// Less reports whether the upper bound of bucket i is smaller than the upper
// bound of bucket j.
func (bs buckets) Less(i, j int) bool {
	return bs[j].upperBound > bs[i].upperBound
}
// metricWithBuckets pairs a metric with the histogram buckets that have been
// collected for it.
type metricWithBuckets struct {
metric clientmodel.COWMetric
buckets buckets
}
// quantile calculates the quantile 'q' based on the given buckets. The buckets
// will be sorted by upperBound by this function (i.e. no sorting needed before
// calling this function). The quantile value is interpolated assuming a linear
// distribution within a bucket. However, if the quantile falls into the highest
// bucket, the upper bound of the 2nd highest bucket is returned. A natural
// lower bound of 0 is assumed if the upper bound of the lowest bucket is
// greater than 0. In that case, interpolation in the lowest bucket happens
// linearly between 0 and the upper bound of the lowest bucket. However, if the
// lowest bucket has an upper bound less than or equal to 0, this upper bound
// is returned if the quantile falls into the lowest bucket.
//
// The bucket counts are expected to be cumulative, i.e. the count of a bucket
// includes the counts of all buckets with a smaller upper bound.
//
// There are a number of special cases (once we have a way to report errors
// happening during evaluations of AST functions, we should report those
// explicitly):
//
// If q is NaN, NaN is returned.
//
// If q<0, -Inf is returned.
//
// If q>1, +Inf is returned.
//
// If 'buckets' has fewer than 2 elements, NaN is returned.
//
// If the highest bucket is not +Inf, NaN is returned.
//
// If the highest bucket has a count of 0 (no observations), NaN is returned.
func quantile(q clientmodel.SampleValue, buckets buckets) float64 {
	// A NaN quantile fails both range checks below and would otherwise
	// silently end up in the "highest bucket" case.
	if math.IsNaN(float64(q)) {
		return math.NaN()
	}
	if q < 0 {
		return math.Inf(-1)
	}
	if q > 1 {
		return math.Inf(+1)
	}
	if len(buckets) < 2 {
		return math.NaN()
	}
	sort.Sort(buckets)

	last := len(buckets) - 1
	if !math.IsInf(buckets[last].upperBound, +1) {
		return math.NaN()
	}

	// Without observations there is nothing to rank. Return NaN right away
	// instead of depending on 0/0 further down, which only happens if the
	// lowest upper bound is positive.
	observations := buckets[last].count
	if observations == 0 {
		return math.NaN()
	}

	rank := q * observations
	// Only the buckets below the +Inf bucket are searched, so a result of
	// 'last' means that the rank is not reached by any of them.
	b := sort.Search(last, func(i int) bool { return buckets[i].count >= rank })
	if b == last {
		return buckets[last-1].upperBound
	}
	if b == 0 && buckets[0].upperBound <= 0 {
		return buckets[0].upperBound
	}

	var (
		bucketStart float64
		bucketEnd   = buckets[b].upperBound
		count       = buckets[b].count
	)
	if b > 0 {
		// Make count and rank relative to the bucket the quantile falls into.
		bucketStart = buckets[b-1].upperBound
		count -= buckets[b-1].count
		rank -= buckets[b-1].count
	}
	return bucketStart + (bucketEnd-bucketStart)*float64(rank/count)
}

View file

@ -205,6 +205,224 @@ var testMatrix = ast.Matrix{
},
Values: getTestValueStream(0, 200, 20, testStartTime),
},
// Two histograms with 4 buckets each (*_sum and *_count not included,
// only buckets). Lowest bucket for one histogram < 0, for the other >
// 0. They have the same name, just separated by label. Not useful in
// practice, but can happen (if clients change bucketing), and the
// server has to cope with it.
{
Metric: clientmodel.COWMetric{
Metric: clientmodel.Metric{
clientmodel.MetricNameLabel: "testhistogram_bucket",
"le": "0.1",
"start": "positive",
},
},
Values: getTestValueStream(0, 50, 5, testStartTime),
},
{
Metric: clientmodel.COWMetric{
Metric: clientmodel.Metric{
clientmodel.MetricNameLabel: "testhistogram_bucket",
"le": ".2",
"start": "positive",
},
},
Values: getTestValueStream(0, 70, 7, testStartTime),
},
{
Metric: clientmodel.COWMetric{
Metric: clientmodel.Metric{
clientmodel.MetricNameLabel: "testhistogram_bucket",
"le": "1e0",
"start": "positive",
},
},
Values: getTestValueStream(0, 110, 11, testStartTime),
},
{
Metric: clientmodel.COWMetric{
Metric: clientmodel.Metric{
clientmodel.MetricNameLabel: "testhistogram_bucket",
"le": "+Inf",
"start": "positive",
},
},
Values: getTestValueStream(0, 120, 12, testStartTime),
},
{
Metric: clientmodel.COWMetric{
Metric: clientmodel.Metric{
clientmodel.MetricNameLabel: "testhistogram_bucket",
"le": "-.2",
"start": "negative",
},
},
Values: getTestValueStream(0, 10, 1, testStartTime),
},
{
Metric: clientmodel.COWMetric{
Metric: clientmodel.Metric{
clientmodel.MetricNameLabel: "testhistogram_bucket",
"le": "-0.1",
"start": "negative",
},
},
Values: getTestValueStream(0, 20, 2, testStartTime),
},
{
Metric: clientmodel.COWMetric{
Metric: clientmodel.Metric{
clientmodel.MetricNameLabel: "testhistogram_bucket",
"le": "0.3",
"start": "negative",
},
},
Values: getTestValueStream(0, 20, 2, testStartTime),
},
{
Metric: clientmodel.COWMetric{
Metric: clientmodel.Metric{
clientmodel.MetricNameLabel: "testhistogram_bucket",
"le": "+Inf",
"start": "negative",
},
},
Values: getTestValueStream(0, 30, 3, testStartTime),
},
// Now a more realistic histogram per job and instance to test aggregation.
{
Metric: clientmodel.COWMetric{
Metric: clientmodel.Metric{
clientmodel.MetricNameLabel: "request_duration_seconds_bucket",
clientmodel.JobLabel: "job1",
"instance": "ins1",
"le": "0.1",
},
},
Values: getTestValueStream(0, 10, 1, testStartTime),
},
{
Metric: clientmodel.COWMetric{
Metric: clientmodel.Metric{
clientmodel.MetricNameLabel: "request_duration_seconds_bucket",
clientmodel.JobLabel: "job1",
"instance": "ins1",
"le": "0.2",
},
},
Values: getTestValueStream(0, 30, 3, testStartTime),
},
{
Metric: clientmodel.COWMetric{
Metric: clientmodel.Metric{
clientmodel.MetricNameLabel: "request_duration_seconds_bucket",
clientmodel.JobLabel: "job1",
"instance": "ins1",
"le": "+Inf",
},
},
Values: getTestValueStream(0, 40, 4, testStartTime),
},
{
Metric: clientmodel.COWMetric{
Metric: clientmodel.Metric{
clientmodel.MetricNameLabel: "request_duration_seconds_bucket",
clientmodel.JobLabel: "job1",
"instance": "ins2",
"le": "0.1",
},
},
Values: getTestValueStream(0, 20, 2, testStartTime),
},
{
Metric: clientmodel.COWMetric{
Metric: clientmodel.Metric{
clientmodel.MetricNameLabel: "request_duration_seconds_bucket",
clientmodel.JobLabel: "job1",
"instance": "ins2",
"le": "0.2",
},
},
Values: getTestValueStream(0, 50, 5, testStartTime),
},
{
Metric: clientmodel.COWMetric{
Metric: clientmodel.Metric{
clientmodel.MetricNameLabel: "request_duration_seconds_bucket",
clientmodel.JobLabel: "job1",
"instance": "ins2",
"le": "+Inf",
},
},
Values: getTestValueStream(0, 60, 6, testStartTime),
},
{
Metric: clientmodel.COWMetric{
Metric: clientmodel.Metric{
clientmodel.MetricNameLabel: "request_duration_seconds_bucket",
clientmodel.JobLabel: "job2",
"instance": "ins1",
"le": "0.1",
},
},
Values: getTestValueStream(0, 30, 3, testStartTime),
},
{
Metric: clientmodel.COWMetric{
Metric: clientmodel.Metric{
clientmodel.MetricNameLabel: "request_duration_seconds_bucket",
clientmodel.JobLabel: "job2",
"instance": "ins1",
"le": "0.2",
},
},
Values: getTestValueStream(0, 40, 4, testStartTime),
},
{
Metric: clientmodel.COWMetric{
Metric: clientmodel.Metric{
clientmodel.MetricNameLabel: "request_duration_seconds_bucket",
clientmodel.JobLabel: "job2",
"instance": "ins1",
"le": "+Inf",
},
},
Values: getTestValueStream(0, 60, 6, testStartTime),
},
{
Metric: clientmodel.COWMetric{
Metric: clientmodel.Metric{
clientmodel.MetricNameLabel: "request_duration_seconds_bucket",
clientmodel.JobLabel: "job2",
"instance": "ins2",
"le": "0.1",
},
},
Values: getTestValueStream(0, 40, 4, testStartTime),
},
{
Metric: clientmodel.COWMetric{
Metric: clientmodel.Metric{
clientmodel.MetricNameLabel: "request_duration_seconds_bucket",
clientmodel.JobLabel: "job2",
"instance": "ins2",
"le": "0.2",
},
},
Values: getTestValueStream(0, 70, 7, testStartTime),
},
{
Metric: clientmodel.COWMetric{
Metric: clientmodel.Metric{
clientmodel.MetricNameLabel: "request_duration_seconds_bucket",
clientmodel.JobLabel: "job2",
"instance": "ins2",
"le": "+Inf",
},
},
Values: getTestValueStream(0, 90, 9, testStartTime),
},
}
// testVector is derived from testMatrix by getTestVectorFromTestMatrix.
var testVector = getTestVectorFromTestMatrix(testMatrix)

View file

@ -15,7 +15,10 @@ package rules
import (
"fmt"
"math"
"path"
"regexp"
"strconv"
"strings"
"testing"
"time"
@ -32,6 +35,13 @@ import (
var (
	testEvalTime = testStartTime.Add(testSampleInterval * 10)
	fixturesPath = "fixtures"

	// reSample splits a sample line of the form
	// "<labels> => <value> @[<timestamp>]" into three submatches: labels,
	// value, and timestamp. The value may be a decimal number with an
	// optional (optionally signed) exponent, +Inf, -Inf, or NaN.
	reSample = regexp.MustCompile(`^(.*) \=\> (\-?\d+\.?\d*(?:[eE][+-]?\d+)?|[+-]Inf|NaN) \@\[(\d+)\]$`)
	// minNormal is the smallest positive normal value of type float64.
	minNormal = math.Float64frombits(0x0010000000000000)
)
// Tolerance used by samplesAlmostEqual when comparing sample values.
const (
epsilon = 0.000001 // Relative error allowed for sample values.
)
func annotateWithTime(lines []string, timestamp clientmodel.Timestamp) []string {
@ -53,6 +63,51 @@ func vectorComparisonString(expected []string, actual []string) string {
separator)
}
// samplesAlmostEqual reports whether the two sample lines are the same apart
// from a small relative error (epsilon) in their sample values. Labels and
// timestamps must match exactly. It panics if a line that is not identical to
// the other one does not match reSample or if its value cannot be parsed as a
// float.
func samplesAlmostEqual(a, b string) bool {
	// Fast path: identical lines need no parsing at all.
	if a == b {
		return true
	}

	aFields := reSample.FindStringSubmatch(a)
	if aFields == nil {
		panic(fmt.Errorf("sample %q did not match regular expression", a))
	}
	bFields := reSample.FindStringSubmatch(b)
	if bFields == nil {
		panic(fmt.Errorf("sample %q did not match regular expression", b))
	}

	// Submatch 1 holds the labels, submatch 3 the timestamp. Both have to
	// be exactly the same.
	if aFields[1] != bFields[1] || aFields[3] != bFields[3] {
		return false
	}

	// Only the sample values (submatch 2) are left as the source of the
	// difference. Check if they are almost equal.
	aVal, err := strconv.ParseFloat(aFields[2], 64)
	if err != nil {
		panic(err)
	}
	bVal, err := strconv.ParseFloat(bFields[2], 64)
	if err != nil {
		panic(err)
	}

	// Cf. http://floating-point-gui.de/errors/comparison/
	switch diff := math.Abs(aVal - bVal); {
	case aVal == bVal:
		return true
	case aVal == 0 || bVal == 0 || diff < minNormal:
		// Relative error is meaningless that close to zero.
		return diff < epsilon*minNormal
	default:
		return diff/(math.Abs(aVal)+math.Abs(bVal)) < epsilon
	}
}
func newTestStorage(t testing.TB) (storage local.Storage, closer test.Closer) {
storage, closer = local.NewTestStorage(t)
storeMatrix(storage, testMatrix)
@ -555,6 +610,26 @@ func TestExpressions(t *testing.T) {
`x{y="testvalue"} => 100 @[%v]`,
`label_grouping_test{a="a", b="abb"} => 200 @[%v]`,
`label_grouping_test{a="aa", b="bb"} => 100 @[%v]`,
`testhistogram_bucket{le="0.1", start="positive"} => 50 @[%v]`,
`testhistogram_bucket{le=".2", start="positive"} => 70 @[%v]`,
`testhistogram_bucket{le="1e0", start="positive"} => 110 @[%v]`,
`testhistogram_bucket{le="+Inf", start="positive"} => 120 @[%v]`,
`testhistogram_bucket{le="-.2", start="negative"} => 10 @[%v]`,
`testhistogram_bucket{le="-0.1", start="negative"} => 20 @[%v]`,
`testhistogram_bucket{le="0.3", start="negative"} => 20 @[%v]`,
`testhistogram_bucket{le="+Inf", start="negative"} => 30 @[%v]`,
`request_duration_seconds_bucket{instance="ins1", job="job1", le="0.1"} => 10 @[%v]`,
`request_duration_seconds_bucket{instance="ins1", job="job1", le="0.2"} => 30 @[%v]`,
`request_duration_seconds_bucket{instance="ins1", job="job1", le="+Inf"} => 40 @[%v]`,
`request_duration_seconds_bucket{instance="ins2", job="job1", le="0.1"} => 20 @[%v]`,
`request_duration_seconds_bucket{instance="ins2", job="job1", le="0.2"} => 50 @[%v]`,
`request_duration_seconds_bucket{instance="ins2", job="job1", le="+Inf"} => 60 @[%v]`,
`request_duration_seconds_bucket{instance="ins1", job="job2", le="0.1"} => 30 @[%v]`,
`request_duration_seconds_bucket{instance="ins1", job="job2", le="0.2"} => 40 @[%v]`,
`request_duration_seconds_bucket{instance="ins1", job="job2", le="+Inf"} => 60 @[%v]`,
`request_duration_seconds_bucket{instance="ins2", job="job2", le="0.1"} => 40 @[%v]`,
`request_duration_seconds_bucket{instance="ins2", job="job2", le="0.2"} => 70 @[%v]`,
`request_duration_seconds_bucket{instance="ins2", job="job2", le="+Inf"} => 90 @[%v]`,
},
},
{
@ -651,6 +726,182 @@ func TestExpressions(t *testing.T) {
`{a="aa", b="bb"} => 100 @[%v]`,
},
},
// Quantile too low.
{
expr: `histogram_quantile(-0.1, testhistogram_bucket)`,
output: []string{
`{start="positive"} => -Inf @[%v]`,
`{start="negative"} => -Inf @[%v]`,
},
},
// Quantile too high.
{
expr: `histogram_quantile(1.01, testhistogram_bucket)`,
output: []string{
`{start="positive"} => +Inf @[%v]`,
`{start="negative"} => +Inf @[%v]`,
},
},
// Quantile value in lowest bucket, which is positive.
{
expr: `histogram_quantile(0, testhistogram_bucket{start="positive"})`,
output: []string{
`{start="positive"} => 0 @[%v]`,
},
},
// Quantile value in lowest bucket, which is negative.
{
expr: `histogram_quantile(0, testhistogram_bucket{start="negative"})`,
output: []string{
`{start="negative"} => -0.2 @[%v]`,
},
},
// Quantile value in highest bucket.
{
expr: `histogram_quantile(1, testhistogram_bucket)`,
output: []string{
`{start="positive"} => 1 @[%v]`,
`{start="negative"} => 0.3 @[%v]`,
},
},
// Finally some useful quantiles.
{
expr: `histogram_quantile(0.2, testhistogram_bucket)`,
output: []string{
`{start="positive"} => 0.048 @[%v]`,
`{start="negative"} => -0.2 @[%v]`,
},
},
{
expr: `histogram_quantile(0.5, testhistogram_bucket)`,
output: []string{
`{start="positive"} => 0.15 @[%v]`,
`{start="negative"} => -0.15 @[%v]`,
},
},
{
expr: `histogram_quantile(0.8, testhistogram_bucket)`,
output: []string{
`{start="positive"} => 0.72 @[%v]`,
`{start="negative"} => 0.3 @[%v]`,
},
},
// More realistic with rates.
{
expr: `histogram_quantile(0.2, rate(testhistogram_bucket[5m]))`,
output: []string{
`{start="positive"} => 0.048 @[%v]`,
`{start="negative"} => -0.2 @[%v]`,
},
},
{
expr: `histogram_quantile(0.5, rate(testhistogram_bucket[5m]))`,
output: []string{
`{start="positive"} => 0.15 @[%v]`,
`{start="negative"} => -0.15 @[%v]`,
},
},
{
expr: `histogram_quantile(0.8, rate(testhistogram_bucket[5m]))`,
output: []string{
`{start="positive"} => 0.72 @[%v]`,
`{start="negative"} => 0.3 @[%v]`,
},
},
// Aggregated histogram: Everything in one.
{
expr: `histogram_quantile(0.3, sum(rate(request_duration_seconds_bucket[5m])) by (le))`,
output: []string{
`{} => 0.075 @[%v]`,
},
},
{
expr: `histogram_quantile(0.5, sum(rate(request_duration_seconds_bucket[5m])) by (le))`,
output: []string{
`{} => 0.1277777777777778 @[%v]`,
},
},
// Aggregated histogram: Everything in one. Now with avg, which does not change anything.
{
expr: `histogram_quantile(0.3, avg(rate(request_duration_seconds_bucket[5m])) by (le))`,
output: []string{
`{} => 0.075 @[%v]`,
},
},
{
expr: `histogram_quantile(0.5, avg(rate(request_duration_seconds_bucket[5m])) by (le))`,
output: []string{
`{} => 0.12777777777777778 @[%v]`,
},
},
// Aggregated histogram: By instance.
{
expr: `histogram_quantile(0.3, sum(rate(request_duration_seconds_bucket[5m])) by (le, instance))`,
output: []string{
`{instance="ins1"} => 0.075 @[%v]`,
`{instance="ins2"} => 0.075 @[%v]`,
},
},
{
expr: `histogram_quantile(0.5, sum(rate(request_duration_seconds_bucket[5m])) by (le, instance))`,
output: []string{
`{instance="ins1"} => 0.1333333333 @[%v]`,
`{instance="ins2"} => 0.125 @[%v]`,
},
},
// Aggregated histogram: By job.
{
expr: `histogram_quantile(0.3, sum(rate(request_duration_seconds_bucket[5m])) by (le, job))`,
output: []string{
`{job="job1"} => 0.1 @[%v]`,
`{job="job2"} => 0.0642857142857143 @[%v]`,
},
},
{
expr: `histogram_quantile(0.5, sum(rate(request_duration_seconds_bucket[5m])) by (le, job))`,
output: []string{
`{job="job1"} => 0.14 @[%v]`,
`{job="job2"} => 0.1125 @[%v]`,
},
},
// Aggregated histogram: By job and instance.
{
expr: `histogram_quantile(0.3, sum(rate(request_duration_seconds_bucket[5m])) by (le, job, instance))`,
output: []string{
`{instance="ins1", job="job1"} => 0.11 @[%v]`,
`{instance="ins2", job="job1"} => 0.09 @[%v]`,
`{instance="ins1", job="job2"} => 0.06 @[%v]`,
`{instance="ins2", job="job2"} => 0.0675 @[%v]`,
},
},
{
expr: `histogram_quantile(0.5, sum(rate(request_duration_seconds_bucket[5m])) by (le, job, instance))`,
output: []string{
`{instance="ins1", job="job1"} => 0.15 @[%v]`,
`{instance="ins2", job="job1"} => 0.1333333333333333 @[%v]`,
`{instance="ins1", job="job2"} => 0.1 @[%v]`,
`{instance="ins2", job="job2"} => 0.1166666666666667 @[%v]`,
},
},
// The unaggregated histogram for comparison. Same result as the previous one.
{
expr: `histogram_quantile(0.3, rate(request_duration_seconds_bucket[5m]))`,
output: []string{
`{instance="ins1", job="job1"} => 0.11 @[%v]`,
`{instance="ins2", job="job1"} => 0.09 @[%v]`,
`{instance="ins1", job="job2"} => 0.06 @[%v]`,
`{instance="ins2", job="job2"} => 0.0675 @[%v]`,
},
},
{
expr: `histogram_quantile(0.5, rate(request_duration_seconds_bucket[5m]))`,
output: []string{
`{instance="ins1", job="job1"} => 0.15 @[%v]`,
`{instance="ins2", job="job1"} => 0.13333333333333333 @[%v]`,
`{instance="ins1", job="job2"} => 0.1 @[%v]`,
`{instance="ins2", job="job2"} => 0.11666666666666667 @[%v]`,
},
},
}
storage, closer := newTestStorage(t)
@ -691,7 +942,7 @@ func TestExpressions(t *testing.T) {
for j, expectedSample := range expectedLines {
found := false
for _, actualSample := range resultLines {
if actualSample == expectedSample {
if samplesAlmostEqual(actualSample, expectedSample) {
found = true
}
}