Add quantile aggregator.

Author: Brian Brazil
Date:   2016-07-08 13:48:48 +01:00
parent 15f9fe0a45
commit 0303ccc6a7
7 changed files with 47 additions and 12 deletions

promql/engine.go

@@ -1076,6 +1076,10 @@ func (ev *evaluator) aggregation(op itemType, grouping model.LabelNames, without
 			return vector{}
 		}
 	}
+	var q float64
+	if op == itemQuantile {
+		q = ev.evalFloat(param)
+	}
 	var valueLabel model.LabelName
 	if op == itemCountValues {
 		valueLabel = model.LabelName(ev.evalString(param).Value)
@@ -1133,7 +1137,7 @@ func (ev *evaluator) aggregation(op itemType, grouping model.LabelNames, without
 				valuesSquaredSum: s.Value * s.Value,
 				groupCount:       1,
 			}
-			if op == itemTopK {
+			if op == itemTopK || op == itemQuantile {
 				result[groupingKey].heap = make(vectorByValueHeap, 0, k)
 				heap.Push(&result[groupingKey].heap, &sample{Value: s.Value, Metric: s.Metric})
 			} else if op == itemBottomK {
@@ -1181,6 +1185,8 @@ func (ev *evaluator) aggregation(op itemType, grouping model.LabelNames, without
 				}
 				heap.Push(&groupedResult.reverseHeap, &sample{Value: s.Value, Metric: s.Metric})
 			}
+		case itemQuantile:
+			groupedResult.heap = append(groupedResult.heap, s)
 		default:
 			panic(fmt.Errorf("expected aggregation operator but got %q", op))
 		}
@@ -1223,6 +1229,8 @@ func (ev *evaluator) aggregation(op itemType, grouping model.LabelNames, without
 			})
 		}
 		continue // Bypass default append.
+	case itemQuantile:
+		aggr.value = model.SampleValue(quantile(q, aggr.heap))
 	default:
 		// For other aggregations, we already have the right value.
 	}
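Taken together, the engine changes mirror the existing topk plumbing: the quantile parameter q is evaluated once up front, each group's samples are appended to the heap slice (used here as a plain collector, not kept in heap order), and the group's result is computed at the end as quantile(q, aggr.heap). This enables expressions such as quantile without(point)(0.8, data), as exercised by the new test file further down.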

promql/functions.go

@@ -490,9 +490,9 @@ func funcQuantileOverTime(ev *evaluator, args Expressions) model.Value {
 		}
 
 		el.Metric.Del(model.MetricNameLabel)
-		values := make([]float64, 0, len(el.Values))
+		values := make(vectorByValueHeap, 0, len(el.Values))
 		for _, v := range el.Values {
-			values = append(values, float64(v.Value))
+			values = append(values, &sample{Value: v.Value})
 		}
 		resultVector = append(resultVector, &sample{
 			Metric: el.Metric,
@@ -529,7 +529,7 @@ func funcStdvarOverTime(ev *evaluator, args Expressions) model.Value {
 		avg := sum / count
 		return squaredSum/count - avg*avg
 	})
 }
 
 // === abs(vector model.ValVector) Vector ===
 func funcAbs(ev *evaluator, args Expressions) model.Value {
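Switching quantile_over_time from []float64 to vectorByValueHeap lets it share the single quantile() helper with the new aggregator: the helper now sorts samples via sort.Sort and reads their Value fields instead of raw floats.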

promql/lex.go

@@ -59,7 +59,7 @@ func (i itemType) isAggregator() bool { return i > aggregatorsStart && i < aggre
 // isAggregator returns true if the item is an aggregator that takes a parameter.
 // Returns false otherwise
 func (i itemType) isAggregatorWithParam() bool {
-	return i == itemTopK || i == itemBottomK || i == itemCountValues
+	return i == itemTopK || i == itemBottomK || i == itemCountValues || i == itemQuantile
 }
 
 // isKeyword returns true if the item corresponds to a keyword.
@@ -177,6 +177,7 @@ const (
 	itemTopK
 	itemBottomK
 	itemCountValues
+	itemQuantile
 	aggregatorsEnd
 
 	keywordsStart
@@ -215,6 +216,7 @@ var key = map[string]itemType{
 	"topk":         itemTopK,
 	"bottomk":      itemBottomK,
 	"count_values": itemCountValues,
+	"quantile":     itemQuantile,
 
 	// Keywords.
 	"alert": itemAlert,

promql/parse.go

@@ -1042,7 +1042,7 @@ func (p *parser) checkType(node Node) (typ model.ValueType) {
 			p.errorf("aggregation operator expected in aggregation expression but got %q", n.Op)
 		}
 		p.expectType(n.Expr, model.ValVector, "aggregation expression")
-		if n.Op == itemTopK || n.Op == itemBottomK {
+		if n.Op == itemTopK || n.Op == itemBottomK || n.Op == itemQuantile {
 			p.expectType(n.Param, model.ValScalar, "aggregation parameter")
 		}
 		if n.Op == itemCountValues {

promql/quantile.go

@@ -107,16 +107,23 @@ func bucketQuantile(q model.SampleValue, buckets buckets) float64 {
 	return bucketStart + (bucketEnd-bucketStart)*float64(rank/count)
 }
 
-// qauntile calculates the given quantile of a slice of floats.
-// The slice will be sorted.
-func quantile(q float64, values []float64) float64 {
+// qauntile calculates the given quantile of a vector of samples.
+//
+// The vector will be sorted.
+// If 'values' has zero elements, NaN is returned.
+// If q<0, -Inf is returned.
+// If q>1, +Inf is returned.
+func quantile(q float64, values vectorByValueHeap) float64 {
+	if len(values) == 0 {
+		return math.NaN()
+	}
 	if q < 0 {
 		return math.Inf(-1)
 	}
 	if q > 1 {
 		return math.Inf(+1)
 	}
-	sort.Float64s(values)
+	sort.Sort(values)
 
 	n := float64(len(values))
 	// When the quantile lies between two samples,
@@ -127,5 +134,5 @@ func quantile(q float64, values []float64) float64 {
 	upperIndex := math.Min(n-1, lowerIndex+1)
 	weight := rank - math.Floor(rank)
-	return values[int(lowerIndex)]*(1-weight) + values[int(upperIndex)]*weight
+	return float64(values[int(lowerIndex)].Value)*(1-weight) + float64(values[int(upperIndex)].Value)*weight
 }
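For reference, a minimal stand-alone sketch of the same interpolation, written against a plain []float64 rather than vectorByValueHeap so it runs on its own. The name quantileSketch and the rank/lowerIndex lines (which sit in the unchanged region between the two hunks above) are illustrative assumptions, not part of this change:

// Illustrative sketch only: the same rank/interpolation scheme as quantile()
// above, but on a plain []float64 so it is self-contained and runnable.
package main

import (
	"fmt"
	"math"
	"sort"
)

func quantileSketch(q float64, values []float64) float64 {
	if len(values) == 0 {
		return math.NaN()
	}
	if q < 0 {
		return math.Inf(-1)
	}
	if q > 1 {
		return math.Inf(+1)
	}
	sort.Float64s(values)

	n := float64(len(values))
	// Fractional rank of the requested quantile within the sorted values.
	rank := q * (n - 1)
	lowerIndex := math.Max(0, math.Floor(rank))
	upperIndex := math.Min(n-1, lowerIndex+1)
	// Interpolate linearly between the two neighbouring samples.
	weight := rank - math.Floor(rank)
	return values[int(lowerIndex)]*(1-weight) + values[int(upperIndex)]*weight
}

func main() {
	// 0.8-quantile of {0, 1, 4}: rank 1.6, i.e. 1*0.4 + 4*0.6 = 2.8
	// (modulo float rounding), matching the "uneven samples" case
	// in the new aggregators.test block.
	fmt.Printf("%.1f\n", quantileSketch(0.8, []float64{0, 1, 4}))
}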

promql/testdata/aggregators.test

@@ -220,3 +220,22 @@ eval instant at 5m count_values by (job, group)("job", version)
 	{job="6", group="production"} 5
 	{job="8", group="canary"} 2
 	{job="7", group="canary"} 2
+
+# Tests for quantile.
+
+clear
+
+load 10s
+	data{test="two samples",point="a"} 0
+	data{test="two samples",point="b"} 1
+	data{test="three samples",point="a"} 0
+	data{test="three samples",point="b"} 1
+	data{test="three samples",point="c"} 2
+	data{test="uneven samples",point="a"} 0
+	data{test="uneven samples",point="b"} 1
+	data{test="uneven samples",point="c"} 4
+
+eval instant at 1m quantile without(point)(0.8, data)
+	{test="two samples"} 0.8
+	{test="three samples"} 1.6
+	{test="uneven samples"} 2.8

promql/testdata/functions.test

@@ -329,4 +329,3 @@ eval instant at 1m quantile_over_time(2, data[1m])
 	{test="two samples"} +Inf
 	{test="three samples"} +Inf
 	{test="uneven samples"} +Inf
->>>>>>> Add quantile_over_time function