From 527831ec6e5156ea82c758a8747ce0d8bd3bd4e5 Mon Sep 17 00:00:00 2001 From: Julius Volz Date: Mon, 28 Jul 2014 16:12:58 +0200 Subject: [PATCH] Add abs() and over-time aggregation functions. This implements aggregation functions over time as request in https://github.com/prometheus/prometheus/issues/383. Change-Id: Ifd69b850de8cfdf6e7a6c0e042056fa4c672410e --- rules/ast/functions.go | 117 +++++++++++++++++++++++++++++++++++++++++ rules/rules_test.go | 56 +++++++++++++++++++- 2 files changed, 172 insertions(+), 1 deletion(-) diff --git a/rules/ast/functions.go b/rules/ast/functions.go index f4577780c..a27bc87ce 100644 --- a/rules/ast/functions.go +++ b/rules/ast/functions.go @@ -21,6 +21,7 @@ import ( clientmodel "github.com/prometheus/client_golang/model" + "github.com/prometheus/prometheus/storage/metric" "github.com/prometheus/prometheus/utility" ) @@ -275,7 +276,105 @@ func countScalarImpl(timestamp clientmodel.Timestamp, view *viewAdapter, args [] return clientmodel.SampleValue(len(args[0].(VectorNode).Eval(timestamp, view))) } +func aggrOverTime(timestamp clientmodel.Timestamp, view *viewAdapter, args []Node, aggrFn func(metric.Values) clientmodel.SampleValue) interface{} { + n := args[0].(MatrixNode) + matrixVal := n.Eval(timestamp, view) + resultVector := Vector{} + + for _, el := range matrixVal { + if len(el.Values) == 0 { + continue + } + + resultVector = append(resultVector, &clientmodel.Sample{ + Metric: el.Metric, + Value: aggrFn(el.Values), + Timestamp: timestamp, + }) + } + return resultVector +} + +// === avg_over_time(matrix MatrixNode) Vector === +func avgOverTimeImpl(timestamp clientmodel.Timestamp, view *viewAdapter, args []Node) interface{} { + return aggrOverTime(timestamp, view, args, func(values metric.Values) clientmodel.SampleValue { + var sum clientmodel.SampleValue + for _, v := range values { + sum += v.Value + } + return sum / clientmodel.SampleValue(len(values)) + }) +} + +// === count_over_time(matrix MatrixNode) Vector === +func countOverTimeImpl(timestamp clientmodel.Timestamp, view *viewAdapter, args []Node) interface{} { + return aggrOverTime(timestamp, view, args, func(values metric.Values) clientmodel.SampleValue { + return clientmodel.SampleValue(len(values)) + }) +} + +// === max_over_time(matrix MatrixNode) Vector === +func maxOverTimeImpl(timestamp clientmodel.Timestamp, view *viewAdapter, args []Node) interface{} { + return aggrOverTime(timestamp, view, args, func(values metric.Values) clientmodel.SampleValue { + max := math.Inf(-1) + for _, v := range values { + max = math.Max(max, float64(v.Value)) + } + return clientmodel.SampleValue(max) + }) +} + +// === min_over_time(matrix MatrixNode) Vector === +func minOverTimeImpl(timestamp clientmodel.Timestamp, view *viewAdapter, args []Node) interface{} { + return aggrOverTime(timestamp, view, args, func(values metric.Values) clientmodel.SampleValue { + min := math.Inf(1) + for _, v := range values { + min = math.Min(min, float64(v.Value)) + } + return clientmodel.SampleValue(min) + }) +} + +// === sum_over_time(matrix MatrixNode) Vector === +func sumOverTimeImpl(timestamp clientmodel.Timestamp, view *viewAdapter, args []Node) interface{} { + return aggrOverTime(timestamp, view, args, func(values metric.Values) clientmodel.SampleValue { + var sum clientmodel.SampleValue + for _, v := range values { + sum += v.Value + } + return sum + }) +} + +// === abs(vector VectorNode) Vector === +func absImpl(timestamp clientmodel.Timestamp, view *viewAdapter, args []Node) interface{} { + n := args[0].(VectorNode) + vector := n.Eval(timestamp, view) + for _, el := range vector { + el.Value = clientmodel.SampleValue(math.Abs(float64(el.Value))) + } + return vector +} + var functions = map[string]*Function{ + "abs": { + name: "abs", + argTypes: []ExprType{VECTOR}, + returnType: VECTOR, + callFn: absImpl, + }, + "avg_over_time": { + name: "avg_over_time", + argTypes: []ExprType{MATRIX}, + returnType: VECTOR, + callFn: avgOverTimeImpl, + }, + "count_over_time": { + name: "count_over_time", + argTypes: []ExprType{MATRIX}, + returnType: VECTOR, + callFn: countOverTimeImpl, + }, "count_scalar": { name: "count_scalar", argTypes: []ExprType{VECTOR}, @@ -288,6 +387,18 @@ var functions = map[string]*Function{ returnType: VECTOR, callFn: deltaImpl, }, + "max_over_time": { + name: "max_over_time", + argTypes: []ExprType{MATRIX}, + returnType: VECTOR, + callFn: maxOverTimeImpl, + }, + "min_over_time": { + name: "min_over_time", + argTypes: []ExprType{MATRIX}, + returnType: VECTOR, + callFn: minOverTimeImpl, + }, "rate": { name: "rate", argTypes: []ExprType{MATRIX}, @@ -318,6 +429,12 @@ var functions = map[string]*Function{ returnType: VECTOR, callFn: sortDescImpl, }, + "sum_over_time": { + name: "sum_over_time", + argTypes: []ExprType{MATRIX}, + returnType: VECTOR, + callFn: sumOverTimeImpl, + }, "time": { name: "time", argTypes: []ExprType{}, diff --git a/rules/rules_test.go b/rules/rules_test.go index 22ddea906..d29a463af 100644 --- a/rules/rules_test.go +++ b/rules/rules_test.go @@ -436,12 +436,66 @@ func TestExpressions(t *testing.T) { }, { expr: `http_requests{group="production",job=~"^api"}`, output: []string{ - `http_requests{group="production", instance="1", job="api-server"} => 200 @[%v]`, `http_requests{group="production", instance="0", job="api-server"} => 100 @[%v]`, + `http_requests{group="production", instance="1", job="api-server"} => 200 @[%v]`, }, fullRanges: 0, intervalRanges: 2, }, + { + expr: `abs(-1 * http_requests{group="production",job="api-server"})`, + output: []string{ + `http_requests{group="production", instance="0", job="api-server"} => 100 @[%v]`, + `http_requests{group="production", instance="1", job="api-server"} => 200 @[%v]`, + }, + fullRanges: 0, + intervalRanges: 2, + }, + { + expr: `avg_over_time(http_requests{group="production",job="api-server"}[1h])`, + output: []string{ + `http_requests{group="production", instance="0", job="api-server"} => 50 @[%v]`, + `http_requests{group="production", instance="1", job="api-server"} => 100 @[%v]`, + }, + fullRanges: 2, + intervalRanges: 0, + }, + { + expr: `count_over_time(http_requests{group="production",job="api-server"}[1h])`, + output: []string{ + `http_requests{group="production", instance="0", job="api-server"} => 11 @[%v]`, + `http_requests{group="production", instance="1", job="api-server"} => 11 @[%v]`, + }, + fullRanges: 2, + intervalRanges: 0, + }, + { + expr: `max_over_time(http_requests{group="production",job="api-server"}[1h])`, + output: []string{ + `http_requests{group="production", instance="0", job="api-server"} => 100 @[%v]`, + `http_requests{group="production", instance="1", job="api-server"} => 200 @[%v]`, + }, + fullRanges: 2, + intervalRanges: 0, + }, + { + expr: `min_over_time(http_requests{group="production",job="api-server"}[1h])`, + output: []string{ + `http_requests{group="production", instance="0", job="api-server"} => 0 @[%v]`, + `http_requests{group="production", instance="1", job="api-server"} => 0 @[%v]`, + }, + fullRanges: 2, + intervalRanges: 0, + }, + { + expr: `sum_over_time(http_requests{group="production",job="api-server"}[1h])`, + output: []string{ + `http_requests{group="production", instance="0", job="api-server"} => 550 @[%v]`, + `http_requests{group="production", instance="1", job="api-server"} => 1100 @[%v]`, + }, + fullRanges: 2, + intervalRanges: 0, + }, } tieredStorage, closer := newTestStorage(t)