diff --git a/promql/engine.go b/promql/engine.go index 33c148056..603776d9d 100644 --- a/promql/engine.go +++ b/promql/engine.go @@ -1076,6 +1076,10 @@ func (ev *evaluator) aggregation(op itemType, grouping model.LabelNames, without return vector{} } } + var q float64 + if op == itemQuantile { + q = ev.evalFloat(param) + } var valueLabel model.LabelName if op == itemCountValues { valueLabel = model.LabelName(ev.evalString(param).Value) @@ -1133,7 +1137,7 @@ func (ev *evaluator) aggregation(op itemType, grouping model.LabelNames, without valuesSquaredSum: s.Value * s.Value, groupCount: 1, } - if op == itemTopK { + if op == itemTopK || op == itemQuantile { result[groupingKey].heap = make(vectorByValueHeap, 0, k) heap.Push(&result[groupingKey].heap, &sample{Value: s.Value, Metric: s.Metric}) } else if op == itemBottomK { @@ -1181,6 +1185,8 @@ func (ev *evaluator) aggregation(op itemType, grouping model.LabelNames, without } heap.Push(&groupedResult.reverseHeap, &sample{Value: s.Value, Metric: s.Metric}) } + case itemQuantile: + groupedResult.heap = append(groupedResult.heap, s) default: panic(fmt.Errorf("expected aggregation operator but got %q", op)) } @@ -1223,6 +1229,8 @@ func (ev *evaluator) aggregation(op itemType, grouping model.LabelNames, without }) } continue // Bypass default append. + case itemQuantile: + aggr.value = model.SampleValue(quantile(q, aggr.heap)) default: // For other aggregations, we already have the right value. 
} diff --git a/promql/functions.go b/promql/functions.go index 6505cab47..ff54e24d7 100644 --- a/promql/functions.go +++ b/promql/functions.go @@ -478,6 +478,31 @@ func funcSumOverTime(ev *evaluator, args Expressions) model.Value { }) } +// === quantile_over_time(q model.ValScalar, matrix model.ValMatrix) Vector === +func funcQuantileOverTime(ev *evaluator, args Expressions) model.Value { + q := ev.evalFloat(args[0]) + mat := ev.evalMatrix(args[1]) + resultVector := vector{} + + for _, el := range mat { + if len(el.Values) == 0 { + continue + } + + el.Metric.Del(model.MetricNameLabel) + values := make(vectorByValueHeap, 0, len(el.Values)) + for _, v := range el.Values { + values = append(values, &sample{Value: v.Value}) + } + resultVector = append(resultVector, &sample{ + Metric: el.Metric, + Value: model.SampleValue(quantile(q, values)), + Timestamp: ev.Timestamp, + }) + } + return resultVector +} + // === stddev_over_time(matrix model.ValMatrix) Vector === func funcStddevOverTime(ev *evaluator, args Expressions) model.Value { return aggrOverTime(ev, args, func(values []model.SamplePair) model.SampleValue { @@ -705,7 +730,7 @@ func funcHistogramQuantile(ev *evaluator, args Expressions) model.Value { for _, mb := range signatureToMetricWithBuckets { outVec = append(outVec, &sample{ Metric: mb.metric, - Value: model.SampleValue(quantile(q, mb.buckets)), + Value: model.SampleValue(bucketQuantile(q, mb.buckets)), Timestamp: ev.Timestamp, }) } @@ -973,6 +998,12 @@ var functions = map[string]*Function{ ReturnType: model.ValVector, Call: funcPredictLinear, }, + "quantile_over_time": { + Name: "quantile_over_time", + ArgTypes: []model.ValueType{model.ValScalar, model.ValMatrix}, + ReturnType: model.ValVector, + Call: funcQuantileOverTime, + }, "rate": { Name: "rate", ArgTypes: []model.ValueType{model.ValMatrix}, diff --git a/promql/lex.go b/promql/lex.go index dfffef20f..5bbe84364 100644 --- a/promql/lex.go +++ b/promql/lex.go @@ -59,7 +59,7 @@ func (i itemType) isAggregator() bool { return 
i > aggregatorsStart && i < aggre // isAggregator returns true if the item is an aggregator that takes a parameter. // Returns false otherwise func (i itemType) isAggregatorWithParam() bool { - return i == itemTopK || i == itemBottomK || i == itemCountValues + return i == itemTopK || i == itemBottomK || i == itemCountValues || i == itemQuantile } // isKeyword returns true if the item corresponds to a keyword. @@ -177,6 +177,7 @@ const ( itemTopK itemBottomK itemCountValues + itemQuantile aggregatorsEnd keywordsStart @@ -215,6 +216,7 @@ var key = map[string]itemType{ "topk": itemTopK, "bottomk": itemBottomK, "count_values": itemCountValues, + "quantile": itemQuantile, // Keywords. "alert": itemAlert, diff --git a/promql/parse.go b/promql/parse.go index 92f20918b..dbdc662ec 100644 --- a/promql/parse.go +++ b/promql/parse.go @@ -1042,7 +1042,7 @@ func (p *parser) checkType(node Node) (typ model.ValueType) { p.errorf("aggregation operator expected in aggregation expression but got %q", n.Op) } p.expectType(n.Expr, model.ValVector, "aggregation expression") - if n.Op == itemTopK || n.Op == itemBottomK { + if n.Op == itemTopK || n.Op == itemBottomK || n.Op == itemQuantile { p.expectType(n.Param, model.ValScalar, "aggregation parameter") } if n.Op == itemCountValues { diff --git a/promql/quantile.go b/promql/quantile.go index ca7af50d5..a4a6f136b 100644 --- a/promql/quantile.go +++ b/promql/quantile.go @@ -48,16 +48,16 @@ type metricWithBuckets struct { buckets buckets } -// quantile calculates the quantile 'q' based on the given buckets. The buckets -// will be sorted by upperBound by this function (i.e. no sorting needed before -// calling this function). The quantile value is interpolated assuming a linear -// distribution within a bucket. However, if the quantile falls into the highest -// bucket, the upper bound of the 2nd highest bucket is returned. A natural -// lower bound of 0 is assumed if the upper bound of the lowest bucket is -// greater 0. 
In that case, interpolation in the lowest bucket happens linearly -// between 0 and the upper bound of the lowest bucket. However, if the lowest -// bucket has an upper bound less or equal 0, this upper bound is returned if -// the quantile falls into the lowest bucket. +// bucketQuantile calculates the quantile 'q' based on the given buckets. The +// buckets will be sorted by upperBound by this function (i.e. no sorting +// needed before calling this function). The quantile value is interpolated +// assuming a linear distribution within a bucket. However, if the quantile +// falls into the highest bucket, the upper bound of the 2nd highest bucket is +// returned. A natural lower bound of 0 is assumed if the upper bound of the +// lowest bucket is greater 0. In that case, interpolation in the lowest bucket +// happens linearly between 0 and the upper bound of the lowest bucket. +// However, if the lowest bucket has an upper bound less or equal 0, this upper +// bound is returned if the quantile falls into the lowest bucket. // // There are a number of special cases (once we have a way to report errors // happening during evaluations of AST functions, we should report those @@ -70,7 +70,7 @@ type metricWithBuckets struct { // If q<0, -Inf is returned. // // If q>1, +Inf is returned. -func quantile(q model.SampleValue, buckets buckets) float64 { +func bucketQuantile(q model.SampleValue, buckets buckets) float64 { if q < 0 { return math.Inf(-1) } @@ -106,3 +106,33 @@ func quantile(q model.SampleValue, buckets buckets) float64 { } return bucketStart + (bucketEnd-bucketStart)*float64(rank/count) } + +// quantile calculates the given quantile of a vector of samples. +// +// The vector will be sorted. +// If 'values' has zero elements, NaN is returned. +// If q<0, -Inf is returned. +// If q>1, +Inf is returned. 
+func quantile(q float64, values vectorByValueHeap) float64 { + if len(values) == 0 { + return math.NaN() + } + if q < 0 { + return math.Inf(-1) + } + if q > 1 { + return math.Inf(+1) + } + sort.Sort(values) + + n := float64(len(values)) + // When the quantile lies between two samples, + // we use a weighted average of the two samples. + rank := q * (n - 1) + + lowerIndex := math.Max(0, math.Floor(rank)) + upperIndex := math.Min(n-1, lowerIndex+1) + + weight := rank - math.Floor(rank) + return float64(values[int(lowerIndex)].Value)*(1-weight) + float64(values[int(upperIndex)].Value)*weight +} diff --git a/promql/testdata/aggregators.test b/promql/testdata/aggregators.test index 0d5c10ac1..2f590e456 100644 --- a/promql/testdata/aggregators.test +++ b/promql/testdata/aggregators.test @@ -220,3 +220,22 @@ eval instant at 5m count_values by (job, group)("job", version) {job="6", group="production"} 5 {job="8", group="canary"} 2 {job="7", group="canary"} 2 + + +# Tests for quantile. +clear + +load 10s + data{test="two samples",point="a"} 0 + data{test="two samples",point="b"} 1 + data{test="three samples",point="a"} 0 + data{test="three samples",point="b"} 1 + data{test="three samples",point="c"} 2 + data{test="uneven samples",point="a"} 0 + data{test="uneven samples",point="b"} 1 + data{test="uneven samples",point="c"} 4 + +eval instant at 1m quantile without(point)(0.8, data) + {test="two samples"} 0.8 + {test="three samples"} 1.6 + {test="uneven samples"} 2.8 diff --git a/promql/testdata/functions.test b/promql/testdata/functions.test index 4b73e2b8c..58b1ee572 100644 --- a/promql/testdata/functions.test +++ b/promql/testdata/functions.test @@ -276,7 +276,6 @@ eval instant at 8000s holt_winters(http_requests[1m], 0.01, 0.1) {job="api-server", instance="0", group="canary"} 24000 {job="api-server", instance="1", group="canary"} -32000 - # Tests for stddev_over_time and stdvar_over_time. 
clear load 10s @@ -287,3 +286,46 @@ eval instant at 1m stdvar_over_time(metric[1m]) eval instant at 1m stddev_over_time(metric[1m]) {} 3.249615 + +# Tests for quantile_over_time +clear + +load 10s + data{test="two samples"} 0 1 + data{test="three samples"} 0 1 2 + data{test="uneven samples"} 0 1 4 + +eval instant at 1m quantile_over_time(0, data[1m]) + {test="two samples"} 0 + {test="three samples"} 0 + {test="uneven samples"} 0 + +eval instant at 1m quantile_over_time(0.5, data[1m]) + {test="two samples"} 0.5 + {test="three samples"} 1 + {test="uneven samples"} 1 + +eval instant at 1m quantile_over_time(0.75, data[1m]) + {test="two samples"} 0.75 + {test="three samples"} 1.5 + {test="uneven samples"} 2.5 + +eval instant at 1m quantile_over_time(0.8, data[1m]) + {test="two samples"} 0.8 + {test="three samples"} 1.6 + {test="uneven samples"} 2.8 + +eval instant at 1m quantile_over_time(1, data[1m]) + {test="two samples"} 1 + {test="three samples"} 2 + {test="uneven samples"} 4 + +eval instant at 1m quantile_over_time(-1, data[1m]) + {test="two samples"} -Inf + {test="three samples"} -Inf + {test="uneven samples"} -Inf + +eval instant at 1m quantile_over_time(2, data[1m]) + {test="two samples"} +Inf + {test="three samples"} +Inf + {test="uneven samples"} +Inf