diff --git a/promql/ast.go b/promql/ast.go
index 3f929bbe2..e1d62f8af 100644
--- a/promql/ast.go
+++ b/promql/ast.go
@@ -103,6 +103,7 @@ type Expressions []Expr
 type AggregateExpr struct {
 	Op               itemType         // The used aggregation operation.
 	Expr             Expr             // The vector expression over which is aggregated.
+	Param            Expr             // Parameter used by some aggregators.
 	Grouping         model.LabelNames // The labels by which to group the vector.
 	Without          bool             // Whether to drop the given labels rather than keep them.
 	KeepCommonLabels bool             // Whether to keep common labels among result elements.
diff --git a/promql/engine.go b/promql/engine.go
index 3a9fc8fac..b987b002b 100644
--- a/promql/engine.go
+++ b/promql/engine.go
@@ -14,6 +14,7 @@ package promql
 
 import (
+	"container/heap"
 	"fmt"
 	"math"
 	"runtime"
@@ -610,7 +611,7 @@ func (ev *evaluator) eval(expr Expr) model.Value {
 	switch e := expr.(type) {
 	case *AggregateExpr:
 		vector := ev.evalVector(e.Expr)
-		return ev.aggregation(e.Op, e.Grouping, e.Without, e.KeepCommonLabels, vector)
+		return ev.aggregation(e.Op, e.Grouping, e.Without, e.KeepCommonLabels, e.Param, vector)
 
 	case *BinaryExpr:
 		lhs := ev.evalOneOf(e.LHS, model.ValScalar, model.ValVector)
@@ -1060,15 +1061,24 @@ type groupedAggregation struct {
 	value            model.SampleValue
 	valuesSquaredSum model.SampleValue
 	groupCount       int
+	heap             vectorByValueHeap
+	reverseHeap      vectorByReverseValueHeap
 }
 
 // aggregation evaluates an aggregation operation on a vector.
-func (ev *evaluator) aggregation(op itemType, grouping model.LabelNames, without bool, keepCommon bool, vec vector) vector {
+func (ev *evaluator) aggregation(op itemType, grouping model.LabelNames, without bool, keepCommon bool, param Expr, vec vector) vector {
 
 	result := map[uint64]*groupedAggregation{}
+	var k int
+	if op == itemTopK || op == itemBottomK {
+		k = ev.evalInt(param)
+		if k < 1 {
+			return vector{}
+		}
+	}
 
-	for _, sample := range vec {
-		withoutMetric := sample.Metric
+	for _, s := range vec {
+		withoutMetric := s.Metric
 		if without {
 			for _, l := range grouping {
 				withoutMetric.Del(l)
@@ -1080,7 +1090,7 @@ func (ev *evaluator) aggregation(op itemType, grouping model.LabelNames, without
 		if without {
 			groupingKey = uint64(withoutMetric.Metric.Fingerprint())
 		} else {
-			groupingKey = model.SignatureForLabels(sample.Metric.Metric, grouping...)
+			groupingKey = model.SignatureForLabels(s.Metric.Metric, grouping...)
 		}
 
 		groupedResult, ok := result[groupingKey]
@@ -1088,7 +1098,7 @@ func (ev *evaluator) aggregation(op itemType, grouping model.LabelNames, without
 		if !ok {
 			var m metric.Metric
 			if keepCommon {
-				m = sample.Metric
+				m = s.Metric
 				m.Del(model.MetricNameLabel)
 			} else if without {
 				m = withoutMetric
@@ -1098,44 +1108,65 @@ func (ev *evaluator) aggregation(op itemType, grouping model.LabelNames, without
 					Copied: true,
 				}
 				for _, l := range grouping {
-					if v, ok := sample.Metric.Metric[l]; ok {
+					if v, ok := s.Metric.Metric[l]; ok {
 						m.Set(l, v)
 					}
 				}
 			}
 			result[groupingKey] = &groupedAggregation{
 				labels:           m,
-				value:            sample.Value,
-				valuesSquaredSum: sample.Value * sample.Value,
+				value:            s.Value,
+				valuesSquaredSum: s.Value * s.Value,
 				groupCount:       1,
 			}
+			if op == itemTopK {
+				result[groupingKey].heap = make(vectorByValueHeap, 0, k)
+				heap.Push(&result[groupingKey].heap, &sample{Value: s.Value, Metric: s.Metric})
+			} else if op == itemBottomK {
+				result[groupingKey].reverseHeap = make(vectorByReverseValueHeap, 0, k)
+				heap.Push(&result[groupingKey].reverseHeap, &sample{Value: s.Value, Metric: s.Metric})
+			}
 			continue
 		}
 		// Add the sample to the existing group.
 		if keepCommon {
-			groupedResult.labels = labelIntersection(groupedResult.labels, sample.Metric)
+			groupedResult.labels = labelIntersection(groupedResult.labels, s.Metric)
 		}
 		switch op {
 		case itemSum:
-			groupedResult.value += sample.Value
+			groupedResult.value += s.Value
 		case itemAvg:
-			groupedResult.value += sample.Value
+			groupedResult.value += s.Value
 			groupedResult.groupCount++
 		case itemMax:
-			if groupedResult.value < sample.Value || math.IsNaN(float64(groupedResult.value)) {
-				groupedResult.value = sample.Value
+			if groupedResult.value < s.Value || math.IsNaN(float64(groupedResult.value)) {
+				groupedResult.value = s.Value
 			}
 		case itemMin:
-			if groupedResult.value > sample.Value || math.IsNaN(float64(groupedResult.value)) {
-				groupedResult.value = sample.Value
+			if groupedResult.value > s.Value || math.IsNaN(float64(groupedResult.value)) {
+				groupedResult.value = s.Value
 			}
 		case itemCount:
 			groupedResult.groupCount++
 		case itemStdvar, itemStddev:
-			groupedResult.value += sample.Value
-			groupedResult.valuesSquaredSum += sample.Value * sample.Value
+			groupedResult.value += s.Value
+			groupedResult.valuesSquaredSum += s.Value * s.Value
 			groupedResult.groupCount++
+		case itemTopK:
+			if len(groupedResult.heap) < k || groupedResult.heap[0].Value < s.Value || math.IsNaN(float64(groupedResult.heap[0].Value)) {
+				if len(groupedResult.heap) == k {
+					heap.Pop(&groupedResult.heap)
+				}
+				heap.Push(&groupedResult.heap, &sample{Value: s.Value, Metric: s.Metric})
+			}
+		case itemBottomK:
+			if len(groupedResult.reverseHeap) < k || groupedResult.reverseHeap[0].Value > s.Value || math.IsNaN(float64(groupedResult.reverseHeap[0].Value)) {
+				if len(groupedResult.reverseHeap) == k {
+					heap.Pop(&groupedResult.reverseHeap)
+				}
+				heap.Push(&groupedResult.reverseHeap, &sample{Value: s.Value, Metric: s.Metric})
+			}
 		default:
 			panic(fmt.Errorf("expected aggregation operator but got %q", op))
 		}
@@ -1156,6 +1187,28 @@ func (ev *evaluator) aggregation(op itemType, grouping model.LabelNames, without
 		case itemStddev:
 			avg := float64(aggr.value) / float64(aggr.groupCount)
 			aggr.value = model.SampleValue(math.Sqrt(float64(aggr.valuesSquaredSum)/float64(aggr.groupCount) - avg*avg))
+		case itemTopK:
+			// The heap keeps the lowest value on top, so reverse it.
+			sort.Sort(sort.Reverse(aggr.heap))
+			for _, v := range aggr.heap {
+				resultVector = append(resultVector, &sample{
+					Metric:    v.Metric,
+					Value:     v.Value,
+					Timestamp: ev.Timestamp,
+				})
+			}
+			continue // Bypass default append.
+		case itemBottomK:
+			// The heap keeps the highest value on top, so reverse it.
+			sort.Sort(sort.Reverse(aggr.reverseHeap))
+			for _, v := range aggr.reverseHeap {
+				resultVector = append(resultVector, &sample{
+					Metric:    v.Metric,
+					Value:     v.Value,
+					Timestamp: ev.Timestamp,
+				})
+			}
+			continue // Bypass default append.
 		default:
 			// For other aggregations, we already have the right value.
 		}
diff --git a/promql/functions.go b/promql/functions.go
index 71cb1d1df..8bd50667e 100644
--- a/promql/functions.go
+++ b/promql/functions.go
@@ -14,7 +14,6 @@ package promql
 
 import (
-	"container/heap"
 	"math"
 	"regexp"
 	"sort"
@@ -298,52 +297,6 @@ func funcSortDesc(ev *evaluator, args Expressions) model.Value {
 	return vector(byValueSorter)
 }
 
-// === topk(k model.ValScalar, node model.ValVector) Vector ===
-func funcTopk(ev *evaluator, args Expressions) model.Value {
-	k := ev.evalInt(args[0])
-	if k < 1 {
-		return vector{}
-	}
-	vec := ev.evalVector(args[1])
-
-	topk := make(vectorByValueHeap, 0, k)
-
-	for _, el := range vec {
-		if len(topk) < k || topk[0].Value < el.Value || math.IsNaN(float64(topk[0].Value)) {
-			if len(topk) == k {
-				heap.Pop(&topk)
-			}
-			heap.Push(&topk, el)
-		}
-	}
-	// The heap keeps the lowest value on top, so reverse it.
-	sort.Sort(sort.Reverse(topk))
-	return vector(topk)
-}
-
-// === bottomk(k model.ValScalar, node model.ValVector) Vector ===
-func funcBottomk(ev *evaluator, args Expressions) model.Value {
-	k := ev.evalInt(args[0])
-	if k < 1 {
-		return vector{}
-	}
-	vec := ev.evalVector(args[1])
-
-	bottomk := make(vectorByReverseValueHeap, 0, k)
-
-	for _, el := range vec {
-		if len(bottomk) < k || bottomk[0].Value > el.Value || math.IsNaN(float64(bottomk[0].Value)) {
-			if len(bottomk) == k {
-				heap.Pop(&bottomk)
-			}
-			heap.Push(&bottomk, el)
-		}
-	}
-	// The heap keeps the highest value on top, so reverse it.
-	sort.Sort(sort.Reverse(bottomk))
-	return vector(bottomk)
-}
-
 // === clamp_max(vector model.ValVector, max Scalar) Vector ===
 func funcClampMax(ev *evaluator, args Expressions) model.Value {
 	vec := ev.evalVector(args[0])
@@ -866,12 +819,6 @@ var functions = map[string]*Function{
 		ReturnType: model.ValVector,
 		Call:       funcAvgOverTime,
 	},
-	"bottomk": {
-		Name:       "bottomk",
-		ArgTypes:   []model.ValueType{model.ValScalar, model.ValVector},
-		ReturnType: model.ValVector,
-		Call:       funcBottomk,
-	},
 	"ceil": {
 		Name:       "ceil",
 		ArgTypes:   []model.ValueType{model.ValVector},
@@ -1053,12 +1000,6 @@ var functions = map[string]*Function{
 		ReturnType: model.ValScalar,
 		Call:       funcTime,
 	},
-	"topk": {
-		Name:       "topk",
-		ArgTypes:   []model.ValueType{model.ValScalar, model.ValVector},
-		ReturnType: model.ValVector,
-		Call:       funcTopk,
-	},
 	"vector": {
 		Name:       "vector",
 		ArgTypes:   []model.ValueType{model.ValScalar},
diff --git a/promql/lex.go b/promql/lex.go
index d62501198..2d4f6819f 100644
--- a/promql/lex.go
+++ b/promql/lex.go
@@ -58,6 +58,10 @@ func (i itemType) isOperator() bool { return i > operatorsStart && i < operators
 // Returns false otherwise
 func (i itemType) isAggregator() bool { return i > aggregatorsStart && i < aggregatorsEnd }
 
+// isAggregatorWithParam returns true if the item is an aggregator that takes a parameter.
+// Returns false otherwise.
+func (i itemType) isAggregatorWithParam() bool { return i == itemTopK || i == itemBottomK }
+
 // isKeyword returns true if the item corresponds to a keyword.
 // Returns false otherwise.
 func (i itemType) isKeyword() bool { return i > keywordsStart && i < keywordsEnd }
@@ -170,6 +174,8 @@ const (
 	itemMax
 	itemStddev
 	itemStdvar
+	itemTopK
+	itemBottomK
 	aggregatorsEnd
 
 	keywordsStart
@@ -203,13 +209,15 @@ var key = map[string]itemType{
 	"unless": itemLUnless,
 
 	// Aggregators.
- "sum": itemSum, - "avg": itemAvg, - "count": itemCount, - "min": itemMin, - "max": itemMax, - "stddev": itemStddev, - "stdvar": itemStdvar, + "sum": itemSum, + "avg": itemAvg, + "count": itemCount, + "min": itemMin, + "max": itemMax, + "stddev": itemStddev, + "stdvar": itemStdvar, + "topk": itemTopK, + "bottomk": itemBottomK, // Keywords. "alert": itemAlert, diff --git a/promql/parse.go b/promql/parse.go index da9c383b3..68d2e1851 100644 --- a/promql/parse.go +++ b/promql/parse.go @@ -719,6 +719,11 @@ func (p *parser) aggrExpr() *AggregateExpr { } p.expect(itemLeftParen, ctx) + var param Expr + if agop.typ.isAggregatorWithParam() { + param = p.expr() + p.expect(itemComma, ctx) + } e := p.expr() p.expect(itemRightParen, ctx) @@ -746,6 +751,7 @@ func (p *parser) aggrExpr() *AggregateExpr { return &AggregateExpr{ Op: agop.typ, Expr: e, + Param: param, Grouping: grouping, Without: without, KeepCommonLabels: keepCommon, @@ -1043,6 +1049,9 @@ func (p *parser) checkType(node Node) (typ model.ValueType) { p.errorf("aggregation operator expected in aggregation expression but got %q", n.Op) } p.expectType(n.Expr, model.ValVector, "aggregation expression") + if n.Op == itemTopK || n.Op == itemBottomK { + p.expectType(n.Param, model.ValScalar, "aggregation parameter") + } case *BinaryExpr: lt := p.checkType(n.LHS) diff --git a/promql/parse_test.go b/promql/parse_test.go index 5e65bfd98..7aafed52a 100644 --- a/promql/parse_test.go +++ b/promql/parse_test.go @@ -1201,6 +1201,18 @@ var testExpr = []struct { }, Grouping: model.LabelNames{}, }, + }, { + input: "topk(5, some_metric)", + expected: &AggregateExpr{ + Op: itemTopK, + Expr: &VectorSelector{ + Name: "some_metric", + LabelMatchers: metric.LabelMatchers{ + {Type: metric.Equal, Name: model.MetricNameLabel, Value: "some_metric"}, + }, + }, + Param: &NumberLiteral{5}, + }, }, { input: `sum some_metric by (test)`, fail: true, @@ -1237,6 +1249,14 @@ var testExpr = []struct { input: `sum without (test) (some_metric) by (test)`, fail: true, errMsg: "could not parse remaining input \"by (test)\"...", + }, { + input: `topk(some_metric)`, + fail: true, + errMsg: "parse error at char 17: unexpected \")\" in aggregation, expected \",\"", + }, { + input: `topk(some_metric, other_metric)`, + fail: true, + errMsg: "parse error at char 32: expected type scalar in aggregation parameter, got vector", }, // Test function calls. 
 	{
diff --git a/promql/printer.go b/promql/printer.go
index 8d93af29e..21678a7d5 100644
--- a/promql/printer.go
+++ b/promql/printer.go
@@ -135,7 +135,11 @@ func (es Expressions) String() (s string) {
 }
 
 func (node *AggregateExpr) String() string {
-	aggrString := fmt.Sprintf("%s(%s)", node.Op, node.Expr)
+	aggrString := fmt.Sprintf("%s(", node.Op)
+	if node.Op.isAggregatorWithParam() {
+		aggrString += fmt.Sprintf("%s, ", node.Param)
+	}
+	aggrString += fmt.Sprintf("%s)", node.Expr)
 	if len(node.Grouping) > 0 {
 		var format string
 		if node.Without {
diff --git a/promql/printer_test.go b/promql/printer_test.go
index 3aeda1024..5f9d8237f 100644
--- a/promql/printer_test.go
+++ b/promql/printer_test.go
@@ -71,6 +71,9 @@ func TestExprString(t *testing.T) {
 		{
 			in: `sum(task:errors:rate10s{job="s"}) WITHOUT (instance)`,
 		},
+		{
+			in: `topk(5, task:errors:rate10s{job="s"})`,
+		},
 		{
 			in: `a - ON(b) c`,
 		},
diff --git a/promql/testdata/aggregators.test b/promql/testdata/aggregators.test
index 78bf5192d..d730c89c9 100644
--- a/promql/testdata/aggregators.test
+++ b/promql/testdata/aggregators.test
@@ -126,3 +126,60 @@ eval instant at 0m max by (group) (http_requests)
 eval instant at 0m min by (group) (http_requests)
 	{group="production"} 1
 	{group="canary"} 3
+
+clear
+
+# Tests for topk/bottomk.
+load 5m
+	http_requests{job="api-server", instance="0", group="production"}	0+10x10
+	http_requests{job="api-server", instance="1", group="production"}	0+20x10
+	http_requests{job="api-server", instance="2", group="production"}	NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN
+	http_requests{job="api-server", instance="0", group="canary"}		0+30x10
+	http_requests{job="api-server", instance="1", group="canary"}		0+40x10
+	http_requests{job="app-server", instance="0", group="production"}	0+50x10
+	http_requests{job="app-server", instance="1", group="production"}	0+60x10
+	http_requests{job="app-server", instance="0", group="canary"}		0+70x10
+	http_requests{job="app-server", instance="1", group="canary"}		0+80x10
+
+eval_ordered instant at 50m topk(3, http_requests)
+	http_requests{group="canary", instance="1", job="app-server"} 800
+	http_requests{group="canary", instance="0", job="app-server"} 700
+	http_requests{group="production", instance="1", job="app-server"} 600
+
+eval_ordered instant at 50m topk(5, http_requests{group="canary",job="app-server"})
+	http_requests{group="canary", instance="1", job="app-server"} 800
+	http_requests{group="canary", instance="0", job="app-server"} 700
+
+eval_ordered instant at 50m bottomk(3, http_requests)
+	http_requests{group="production", instance="0", job="api-server"} 100
+	http_requests{group="production", instance="1", job="api-server"} 200
+	http_requests{group="canary", instance="0", job="api-server"} 300
+
+eval_ordered instant at 50m bottomk(5, http_requests{group="canary",job="app-server"})
+	http_requests{group="canary", instance="0", job="app-server"} 700
+	http_requests{group="canary", instance="1", job="app-server"} 800
+
+eval instant at 50m topk by (group) (1, http_requests)
+	http_requests{group="production", instance="1", job="app-server"} 600
+	http_requests{group="canary", instance="1", job="app-server"} 800
+
+eval instant at 50m bottomk by (group) (2, http_requests)
+	http_requests{group="canary", instance="0", job="api-server"} 300
+	http_requests{group="canary", instance="1", job="api-server"} 400
+	http_requests{group="production", instance="0", job="api-server"} 100
+	http_requests{group="production", instance="1", job="api-server"} 200
+
+eval_ordered instant at 50m bottomk by (group) (2, http_requests{group="production"})
+	http_requests{group="production", instance="0", job="api-server"} 100
+	http_requests{group="production", instance="1", job="api-server"} 200
+
+# Test NaN is sorted away from the top/bottom.
+eval_ordered instant at 50m topk(3, http_requests{job="api-server",group="production"})
+	http_requests{job="api-server", instance="1", group="production"} 200
+	http_requests{job="api-server", instance="0", group="production"} 100
+	http_requests{job="api-server", instance="2", group="production"} NaN
+
+eval_ordered instant at 50m bottomk(3, http_requests{job="api-server",group="production"})
+	http_requests{job="api-server", instance="0", group="production"} 100
+	http_requests{job="api-server", instance="1", group="production"} 200
+	http_requests{job="api-server", instance="2", group="production"} NaN
diff --git a/promql/testdata/functions.test b/promql/testdata/functions.test
index a568595a7..15f1affdb 100644
--- a/promql/testdata/functions.test
+++ b/promql/testdata/functions.test
@@ -210,49 +210,6 @@ eval instant at 0m clamp_max(clamp_min(test_clamp, -20), 70)
 	{src="clamp-b"} 0
 	{src="clamp-c"} 70
 
-clear
-
-# Tests for topk/bottomk.
-load 5m
-	http_requests{job="api-server", instance="0", group="production"}	0+10x10
-	http_requests{job="api-server", instance="1", group="production"}	0+20x10
-	http_requests{job="api-server", instance="2", group="production"}	NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN
-	http_requests{job="api-server", instance="0", group="canary"}		0+30x10
-	http_requests{job="api-server", instance="1", group="canary"}		0+40x10
-	http_requests{job="app-server", instance="0", group="production"}	0+50x10
-	http_requests{job="app-server", instance="1", group="production"}	0+60x10
-	http_requests{job="app-server", instance="0", group="canary"}		0+70x10
-	http_requests{job="app-server", instance="1", group="canary"}		0+80x10
-
-eval_ordered instant at 50m topk(3, http_requests)
-	http_requests{group="canary", instance="1", job="app-server"} 800
-	http_requests{group="canary", instance="0", job="app-server"} 700
-	http_requests{group="production", instance="1", job="app-server"} 600
-
-eval_ordered instant at 50m topk(5, http_requests{group="canary",job="app-server"})
-	http_requests{group="canary", instance="1", job="app-server"} 800
-	http_requests{group="canary", instance="0", job="app-server"} 700
-
-eval_ordered instant at 50m bottomk(3, http_requests)
-	http_requests{group="production", instance="0", job="api-server"} 100
-	http_requests{group="production", instance="1", job="api-server"} 200
-	http_requests{group="canary", instance="0", job="api-server"} 300
-
-eval_ordered instant at 50m bottomk(5, http_requests{group="canary",job="app-server"})
-	http_requests{group="canary", instance="0", job="app-server"} 700
-	http_requests{group="canary", instance="1", job="app-server"} 800
-
-# Test NaN is sorted away from the top/bottom.
-eval_ordered instant at 50m topk(3, http_requests{job="api-server",group="production"})
-	http_requests{job="api-server", instance="1", group="production"} 200
-	http_requests{job="api-server", instance="0", group="production"} 100
-	http_requests{job="api-server", instance="2", group="production"} NaN
-
-eval_ordered instant at 50m bottomk(3, http_requests{job="api-server",group="production"})
-	http_requests{job="api-server", instance="0", group="production"} 100
-	http_requests{job="api-server", instance="1", group="production"} 200
-	http_requests{job="api-server", instance="2", group="production"} NaN
-
 # Tests for sort/sort_desc.
 clear
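
For readers following the itemTopK/itemBottomK cases above: the patch keeps one bounded heap per output group and evicts the heap's current minimum whenever a better sample arrives, so only k elements are retained per group. Below is a minimal, self-contained sketch of that bounded-heap selection using only the Go standard library; the names floatMinHeap and topK are illustrative only and do not exist in Prometheus.

package main

import (
	"container/heap"
	"fmt"
	"math"
	"sort"
)

// floatMinHeap keeps its smallest element at index 0; NaN sorts below every
// number, mirroring the math.IsNaN checks in the aggregation code above.
type floatMinHeap []float64

func (h floatMinHeap) Len() int            { return len(h) }
func (h floatMinHeap) Swap(i, j int)       { h[i], h[j] = h[j], h[i] }
func (h floatMinHeap) Less(i, j int) bool  { return math.IsNaN(h[i]) || h[i] < h[j] }
func (h *floatMinHeap) Push(x interface{}) { *h = append(*h, x.(float64)) }
func (h *floatMinHeap) Pop() interface{} {
	old := *h
	x := old[len(old)-1]
	*h = old[:len(old)-1]
	return x
}

// topK returns the k largest values in descending order while holding at most
// k elements in the heap at any time.
func topK(k int, vals []float64) []float64 {
	if k < 1 {
		return nil
	}
	h := make(floatMinHeap, 0, k)
	for _, v := range vals {
		// Same shape as the itemTopK case: push if the heap is not full yet,
		// or if the candidate beats (or displaces a NaN at) the current minimum.
		if len(h) < k || h[0] < v || math.IsNaN(h[0]) {
			if len(h) == k {
				heap.Pop(&h)
			}
			heap.Push(&h, v)
		}
	}
	// The heap keeps the lowest value on top, so sort descending for output.
	sort.Sort(sort.Reverse(h))
	return h
}

func main() {
	fmt.Println(topK(3, []float64{1, 7, math.NaN(), 5, 3, 9})) // [9 7 5]
}

bottomk is symmetric: invert Less so the heap keeps the largest retained value on top and replace it whenever a smaller sample arrives, which is what the vectorByReverseValueHeap in the patch does.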