mirror of
https://github.com/prometheus/prometheus.git
synced 2025-03-05 20:59:13 -08:00
Merge pull request #1799 from prometheus/quantile
Implement quantile and quantile_over_time
This commit is contained in:
commit
c3a7941da7
|
@ -1076,6 +1076,10 @@ func (ev *evaluator) aggregation(op itemType, grouping model.LabelNames, without
|
||||||
return vector{}
|
return vector{}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
var q float64
|
||||||
|
if op == itemQuantile {
|
||||||
|
q = ev.evalFloat(param)
|
||||||
|
}
|
||||||
var valueLabel model.LabelName
|
var valueLabel model.LabelName
|
||||||
if op == itemCountValues {
|
if op == itemCountValues {
|
||||||
valueLabel = model.LabelName(ev.evalString(param).Value)
|
valueLabel = model.LabelName(ev.evalString(param).Value)
|
||||||
|
@ -1133,7 +1137,7 @@ func (ev *evaluator) aggregation(op itemType, grouping model.LabelNames, without
|
||||||
valuesSquaredSum: s.Value * s.Value,
|
valuesSquaredSum: s.Value * s.Value,
|
||||||
groupCount: 1,
|
groupCount: 1,
|
||||||
}
|
}
|
||||||
if op == itemTopK {
|
if op == itemTopK || op == itemQuantile {
|
||||||
result[groupingKey].heap = make(vectorByValueHeap, 0, k)
|
result[groupingKey].heap = make(vectorByValueHeap, 0, k)
|
||||||
heap.Push(&result[groupingKey].heap, &sample{Value: s.Value, Metric: s.Metric})
|
heap.Push(&result[groupingKey].heap, &sample{Value: s.Value, Metric: s.Metric})
|
||||||
} else if op == itemBottomK {
|
} else if op == itemBottomK {
|
||||||
|
@ -1181,6 +1185,8 @@ func (ev *evaluator) aggregation(op itemType, grouping model.LabelNames, without
|
||||||
}
|
}
|
||||||
heap.Push(&groupedResult.reverseHeap, &sample{Value: s.Value, Metric: s.Metric})
|
heap.Push(&groupedResult.reverseHeap, &sample{Value: s.Value, Metric: s.Metric})
|
||||||
}
|
}
|
||||||
|
case itemQuantile:
|
||||||
|
groupedResult.heap = append(groupedResult.heap, s)
|
||||||
default:
|
default:
|
||||||
panic(fmt.Errorf("expected aggregation operator but got %q", op))
|
panic(fmt.Errorf("expected aggregation operator but got %q", op))
|
||||||
}
|
}
|
||||||
|
@ -1223,6 +1229,8 @@ func (ev *evaluator) aggregation(op itemType, grouping model.LabelNames, without
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
continue // Bypass default append.
|
continue // Bypass default append.
|
||||||
|
case itemQuantile:
|
||||||
|
aggr.value = model.SampleValue(quantile(q, aggr.heap))
|
||||||
default:
|
default:
|
||||||
// For other aggregations, we already have the right value.
|
// For other aggregations, we already have the right value.
|
||||||
}
|
}
|
||||||
|
|
|
@ -478,6 +478,31 @@ func funcSumOverTime(ev *evaluator, args Expressions) model.Value {
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// === quantile_over_time(matrix model.ValMatrix) Vector ===
|
||||||
|
func funcQuantileOverTime(ev *evaluator, args Expressions) model.Value {
|
||||||
|
q := ev.evalFloat(args[0])
|
||||||
|
mat := ev.evalMatrix(args[1])
|
||||||
|
resultVector := vector{}
|
||||||
|
|
||||||
|
for _, el := range mat {
|
||||||
|
if len(el.Values) == 0 {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
el.Metric.Del(model.MetricNameLabel)
|
||||||
|
values := make(vectorByValueHeap, 0, len(el.Values))
|
||||||
|
for _, v := range el.Values {
|
||||||
|
values = append(values, &sample{Value: v.Value})
|
||||||
|
}
|
||||||
|
resultVector = append(resultVector, &sample{
|
||||||
|
Metric: el.Metric,
|
||||||
|
Value: model.SampleValue(quantile(q, values)),
|
||||||
|
Timestamp: ev.Timestamp,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
return resultVector
|
||||||
|
}
|
||||||
|
|
||||||
// === stddev_over_time(matrix model.ValMatrix) Vector ===
|
// === stddev_over_time(matrix model.ValMatrix) Vector ===
|
||||||
func funcStddevOverTime(ev *evaluator, args Expressions) model.Value {
|
func funcStddevOverTime(ev *evaluator, args Expressions) model.Value {
|
||||||
return aggrOverTime(ev, args, func(values []model.SamplePair) model.SampleValue {
|
return aggrOverTime(ev, args, func(values []model.SamplePair) model.SampleValue {
|
||||||
|
@ -705,7 +730,7 @@ func funcHistogramQuantile(ev *evaluator, args Expressions) model.Value {
|
||||||
for _, mb := range signatureToMetricWithBuckets {
|
for _, mb := range signatureToMetricWithBuckets {
|
||||||
outVec = append(outVec, &sample{
|
outVec = append(outVec, &sample{
|
||||||
Metric: mb.metric,
|
Metric: mb.metric,
|
||||||
Value: model.SampleValue(quantile(q, mb.buckets)),
|
Value: model.SampleValue(bucketQuantile(q, mb.buckets)),
|
||||||
Timestamp: ev.Timestamp,
|
Timestamp: ev.Timestamp,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
@ -973,6 +998,12 @@ var functions = map[string]*Function{
|
||||||
ReturnType: model.ValVector,
|
ReturnType: model.ValVector,
|
||||||
Call: funcPredictLinear,
|
Call: funcPredictLinear,
|
||||||
},
|
},
|
||||||
|
"quantile_over_time": {
|
||||||
|
Name: "quantile_over_time",
|
||||||
|
ArgTypes: []model.ValueType{model.ValScalar, model.ValMatrix},
|
||||||
|
ReturnType: model.ValVector,
|
||||||
|
Call: funcQuantileOverTime,
|
||||||
|
},
|
||||||
"rate": {
|
"rate": {
|
||||||
Name: "rate",
|
Name: "rate",
|
||||||
ArgTypes: []model.ValueType{model.ValMatrix},
|
ArgTypes: []model.ValueType{model.ValMatrix},
|
||||||
|
|
|
@ -59,7 +59,7 @@ func (i itemType) isAggregator() bool { return i > aggregatorsStart && i < aggre
|
||||||
// isAggregatorWithParam returns true if the item is an aggregator that takes a parameter.
|
// isAggregatorWithParam returns true if the item is an aggregator that takes a parameter.
|
||||||
// Returns false otherwise
|
// Returns false otherwise
|
||||||
func (i itemType) isAggregatorWithParam() bool {
|
func (i itemType) isAggregatorWithParam() bool {
|
||||||
return i == itemTopK || i == itemBottomK || i == itemCountValues
|
return i == itemTopK || i == itemBottomK || i == itemCountValues || i == itemQuantile
|
||||||
}
|
}
|
||||||
|
|
||||||
// isKeyword returns true if the item corresponds to a keyword.
|
// isKeyword returns true if the item corresponds to a keyword.
|
||||||
|
@ -177,6 +177,7 @@ const (
|
||||||
itemTopK
|
itemTopK
|
||||||
itemBottomK
|
itemBottomK
|
||||||
itemCountValues
|
itemCountValues
|
||||||
|
itemQuantile
|
||||||
aggregatorsEnd
|
aggregatorsEnd
|
||||||
|
|
||||||
keywordsStart
|
keywordsStart
|
||||||
|
@ -215,6 +216,7 @@ var key = map[string]itemType{
|
||||||
"topk": itemTopK,
|
"topk": itemTopK,
|
||||||
"bottomk": itemBottomK,
|
"bottomk": itemBottomK,
|
||||||
"count_values": itemCountValues,
|
"count_values": itemCountValues,
|
||||||
|
"quantile": itemQuantile,
|
||||||
|
|
||||||
// Keywords.
|
// Keywords.
|
||||||
"alert": itemAlert,
|
"alert": itemAlert,
|
||||||
|
|
|
@ -1042,7 +1042,7 @@ func (p *parser) checkType(node Node) (typ model.ValueType) {
|
||||||
p.errorf("aggregation operator expected in aggregation expression but got %q", n.Op)
|
p.errorf("aggregation operator expected in aggregation expression but got %q", n.Op)
|
||||||
}
|
}
|
||||||
p.expectType(n.Expr, model.ValVector, "aggregation expression")
|
p.expectType(n.Expr, model.ValVector, "aggregation expression")
|
||||||
if n.Op == itemTopK || n.Op == itemBottomK {
|
if n.Op == itemTopK || n.Op == itemBottomK || n.Op == itemQuantile {
|
||||||
p.expectType(n.Param, model.ValScalar, "aggregation parameter")
|
p.expectType(n.Param, model.ValScalar, "aggregation parameter")
|
||||||
}
|
}
|
||||||
if n.Op == itemCountValues {
|
if n.Op == itemCountValues {
|
||||||
|
|
|
@ -48,16 +48,16 @@ type metricWithBuckets struct {
|
||||||
buckets buckets
|
buckets buckets
|
||||||
}
|
}
|
||||||
|
|
||||||
// quantile calculates the quantile 'q' based on the given buckets. The buckets
|
// bucketQuantile calculates the quantile 'q' based on the given buckets. The
|
||||||
// will be sorted by upperBound by this function (i.e. no sorting needed before
|
// buckets will be sorted by upperBound by this function (i.e. no sorting
|
||||||
// calling this function). The quantile value is interpolated assuming a linear
|
// needed before calling this function). The quantile value is interpolated
|
||||||
// distribution within a bucket. However, if the quantile falls into the highest
|
// assuming a linear distribution within a bucket. However, if the quantile
|
||||||
// bucket, the upper bound of the 2nd highest bucket is returned. A natural
|
// falls into the highest bucket, the upper bound of the 2nd highest bucket is
|
||||||
// lower bound of 0 is assumed if the upper bound of the lowest bucket is
|
// returned. A natural lower bound of 0 is assumed if the upper bound of the
|
||||||
// greater 0. In that case, interpolation in the lowest bucket happens linearly
|
// lowest bucket is greater than 0. In that case, interpolation in the lowest bucket
|
||||||
// between 0 and the upper bound of the lowest bucket. However, if the lowest
|
// happens linearly between 0 and the upper bound of the lowest bucket.
|
||||||
// bucket has an upper bound less or equal 0, this upper bound is returned if
|
// However, if the lowest bucket has an upper bound less than or equal to 0, this upper
|
||||||
// the quantile falls into the lowest bucket.
|
// bound is returned if the quantile falls into the lowest bucket.
|
||||||
//
|
//
|
||||||
// There are a number of special cases (once we have a way to report errors
|
// There are a number of special cases (once we have a way to report errors
|
||||||
// happening during evaluations of AST functions, we should report those
|
// happening during evaluations of AST functions, we should report those
|
||||||
|
@ -70,7 +70,7 @@ type metricWithBuckets struct {
|
||||||
// If q<0, -Inf is returned.
|
// If q<0, -Inf is returned.
|
||||||
//
|
//
|
||||||
// If q>1, +Inf is returned.
|
// If q>1, +Inf is returned.
|
||||||
func quantile(q model.SampleValue, buckets buckets) float64 {
|
func bucketQuantile(q model.SampleValue, buckets buckets) float64 {
|
||||||
if q < 0 {
|
if q < 0 {
|
||||||
return math.Inf(-1)
|
return math.Inf(-1)
|
||||||
}
|
}
|
||||||
|
@ -106,3 +106,33 @@ func quantile(q model.SampleValue, buckets buckets) float64 {
|
||||||
}
|
}
|
||||||
return bucketStart + (bucketEnd-bucketStart)*float64(rank/count)
|
return bucketStart + (bucketEnd-bucketStart)*float64(rank/count)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// qauntile calculates the given quantile of a vector of samples.
|
||||||
|
//
|
||||||
|
// The vector will be sorted.
|
||||||
|
// If 'values' has zero elements, NaN is returned.
|
||||||
|
// If q<0, -Inf is returned.
|
||||||
|
// If q>1, +Inf is returned.
|
||||||
|
func quantile(q float64, values vectorByValueHeap) float64 {
|
||||||
|
if len(values) == 0 {
|
||||||
|
return math.NaN()
|
||||||
|
}
|
||||||
|
if q < 0 {
|
||||||
|
return math.Inf(-1)
|
||||||
|
}
|
||||||
|
if q > 1 {
|
||||||
|
return math.Inf(+1)
|
||||||
|
}
|
||||||
|
sort.Sort(values)
|
||||||
|
|
||||||
|
n := float64(len(values))
|
||||||
|
// When the quantile lies between two samples,
|
||||||
|
// we use a weighted average of the two samples.
|
||||||
|
rank := q * (n - 1)
|
||||||
|
|
||||||
|
lowerIndex := math.Max(0, math.Floor(rank))
|
||||||
|
upperIndex := math.Min(n-1, lowerIndex+1)
|
||||||
|
|
||||||
|
weight := rank - math.Floor(rank)
|
||||||
|
return float64(values[int(lowerIndex)].Value)*(1-weight) + float64(values[int(upperIndex)].Value)*weight
|
||||||
|
}
|
||||||
|
|
19
promql/testdata/aggregators.test
vendored
19
promql/testdata/aggregators.test
vendored
|
@ -220,3 +220,22 @@ eval instant at 5m count_values by (job, group)("job", version)
|
||||||
{job="6", group="production"} 5
|
{job="6", group="production"} 5
|
||||||
{job="8", group="canary"} 2
|
{job="8", group="canary"} 2
|
||||||
{job="7", group="canary"} 2
|
{job="7", group="canary"} 2
|
||||||
|
|
||||||
|
|
||||||
|
# Tests for quantile.
|
||||||
|
clear
|
||||||
|
|
||||||
|
load 10s
|
||||||
|
data{test="two samples",point="a"} 0
|
||||||
|
data{test="two samples",point="b"} 1
|
||||||
|
data{test="three samples",point="a"} 0
|
||||||
|
data{test="three samples",point="b"} 1
|
||||||
|
data{test="three samples",point="c"} 2
|
||||||
|
data{test="uneven samples",point="a"} 0
|
||||||
|
data{test="uneven samples",point="b"} 1
|
||||||
|
data{test="uneven samples",point="c"} 4
|
||||||
|
|
||||||
|
eval instant at 1m quantile without(point)(0.8, data)
|
||||||
|
{test="two samples"} 0.8
|
||||||
|
{test="three samples"} 1.6
|
||||||
|
{test="uneven samples"} 2.8
|
||||||
|
|
44
promql/testdata/functions.test
vendored
44
promql/testdata/functions.test
vendored
|
@ -276,7 +276,6 @@ eval instant at 8000s holt_winters(http_requests[1m], 0.01, 0.1)
|
||||||
{job="api-server", instance="0", group="canary"} 24000
|
{job="api-server", instance="0", group="canary"} 24000
|
||||||
{job="api-server", instance="1", group="canary"} -32000
|
{job="api-server", instance="1", group="canary"} -32000
|
||||||
|
|
||||||
|
|
||||||
# Tests for stddev_over_time and stdvar_over_time.
|
# Tests for stddev_over_time and stdvar_over_time.
|
||||||
clear
|
clear
|
||||||
load 10s
|
load 10s
|
||||||
|
@ -287,3 +286,46 @@ eval instant at 1m stdvar_over_time(metric[1m])
|
||||||
|
|
||||||
eval instant at 1m stddev_over_time(metric[1m])
|
eval instant at 1m stddev_over_time(metric[1m])
|
||||||
{} 3.249615
|
{} 3.249615
|
||||||
|
|
||||||
|
# Tests for quantile_over_time
|
||||||
|
clear
|
||||||
|
|
||||||
|
load 10s
|
||||||
|
data{test="two samples"} 0 1
|
||||||
|
data{test="three samples"} 0 1 2
|
||||||
|
data{test="uneven samples"} 0 1 4
|
||||||
|
|
||||||
|
eval instant at 1m quantile_over_time(0, data[1m])
|
||||||
|
{test="two samples"} 0
|
||||||
|
{test="three samples"} 0
|
||||||
|
{test="uneven samples"} 0
|
||||||
|
|
||||||
|
eval instant at 1m quantile_over_time(0.5, data[1m])
|
||||||
|
{test="two samples"} 0.5
|
||||||
|
{test="three samples"} 1
|
||||||
|
{test="uneven samples"} 1
|
||||||
|
|
||||||
|
eval instant at 1m quantile_over_time(0.75, data[1m])
|
||||||
|
{test="two samples"} 0.75
|
||||||
|
{test="three samples"} 1.5
|
||||||
|
{test="uneven samples"} 2.5
|
||||||
|
|
||||||
|
eval instant at 1m quantile_over_time(0.8, data[1m])
|
||||||
|
{test="two samples"} 0.8
|
||||||
|
{test="three samples"} 1.6
|
||||||
|
{test="uneven samples"} 2.8
|
||||||
|
|
||||||
|
eval instant at 1m quantile_over_time(1, data[1m])
|
||||||
|
{test="two samples"} 1
|
||||||
|
{test="three samples"} 2
|
||||||
|
{test="uneven samples"} 4
|
||||||
|
|
||||||
|
eval instant at 1m quantile_over_time(-1, data[1m])
|
||||||
|
{test="two samples"} -Inf
|
||||||
|
{test="three samples"} -Inf
|
||||||
|
{test="uneven samples"} -Inf
|
||||||
|
|
||||||
|
eval instant at 1m quantile_over_time(2, data[1m])
|
||||||
|
{test="two samples"} +Inf
|
||||||
|
{test="three samples"} +Inf
|
||||||
|
{test="uneven samples"} +Inf
|
||||||
|
|
Loading…
Reference in a new issue