mirror of
https://github.com/prometheus/prometheus.git
synced 2024-12-28 15:09:39 -08:00
fee88a7a77
Also, resolve a few other TODOs. Change-Id: Icb39b5a5e8ca22ebcb48771cd8951c5d9e112691
599 lines
16 KiB
Go
599 lines
16 KiB
Go
// Copyright 2013 Prometheus Team
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package ast
|
|
|
|
import (
|
|
"container/heap"
|
|
"fmt"
|
|
"math"
|
|
"sort"
|
|
"time"
|
|
|
|
clientmodel "github.com/prometheus/client_golang/model"
|
|
|
|
"github.com/prometheus/prometheus/storage/metric"
|
|
)
|
|
|
|
// Function represents a function of the expression language and is
|
|
// used by function nodes.
|
|
type Function struct {
|
|
name string
|
|
argTypes []ExprType
|
|
returnType ExprType
|
|
callFn func(timestamp clientmodel.Timestamp, args []Node) interface{}
|
|
}
|
|
|
|
// CheckArgTypes returns a non-nil error if the number or types of
|
|
// passed in arg nodes do not match the function's expectations.
|
|
func (function *Function) CheckArgTypes(args []Node) error {
|
|
if len(function.argTypes) != len(args) {
|
|
return fmt.Errorf(
|
|
"wrong number of arguments to function %v(): %v expected, %v given",
|
|
function.name, len(function.argTypes), len(args),
|
|
)
|
|
}
|
|
for idx, argType := range function.argTypes {
|
|
invalidType := false
|
|
var expectedType string
|
|
if _, ok := args[idx].(ScalarNode); argType == SCALAR && !ok {
|
|
invalidType = true
|
|
expectedType = "scalar"
|
|
}
|
|
if _, ok := args[idx].(VectorNode); argType == VECTOR && !ok {
|
|
invalidType = true
|
|
expectedType = "vector"
|
|
}
|
|
if _, ok := args[idx].(MatrixNode); argType == MATRIX && !ok {
|
|
invalidType = true
|
|
expectedType = "matrix"
|
|
}
|
|
if _, ok := args[idx].(StringNode); argType == STRING && !ok {
|
|
invalidType = true
|
|
expectedType = "string"
|
|
}
|
|
|
|
if invalidType {
|
|
return fmt.Errorf(
|
|
"wrong type for argument %v in function %v(), expected %v",
|
|
idx, function.name, expectedType,
|
|
)
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// === time() clientmodel.SampleValue ===
|
|
func timeImpl(timestamp clientmodel.Timestamp, args []Node) interface{} {
|
|
return clientmodel.SampleValue(timestamp.Unix())
|
|
}
|
|
|
|
// === delta(matrix MatrixNode, isCounter ScalarNode) Vector ===
|
|
func deltaImpl(timestamp clientmodel.Timestamp, args []Node) interface{} {
|
|
matrixNode := args[0].(MatrixNode)
|
|
isCounter := args[1].(ScalarNode).Eval(timestamp) > 0
|
|
resultVector := Vector{}
|
|
|
|
// If we treat these metrics as counters, we need to fetch all values
|
|
// in the interval to find breaks in the timeseries' monotonicity.
|
|
// I.e. if a counter resets, we want to ignore that reset.
|
|
var matrixValue Matrix
|
|
if isCounter {
|
|
matrixValue = matrixNode.Eval(timestamp)
|
|
} else {
|
|
matrixValue = matrixNode.EvalBoundaries(timestamp)
|
|
}
|
|
for _, samples := range matrixValue {
|
|
// No sense in trying to compute a delta without at least two points. Drop
|
|
// this vector element.
|
|
if len(samples.Values) < 2 {
|
|
continue
|
|
}
|
|
|
|
counterCorrection := clientmodel.SampleValue(0)
|
|
lastValue := clientmodel.SampleValue(0)
|
|
for _, sample := range samples.Values {
|
|
currentValue := sample.Value
|
|
if isCounter && currentValue < lastValue {
|
|
counterCorrection += lastValue - currentValue
|
|
}
|
|
lastValue = currentValue
|
|
}
|
|
resultValue := lastValue - samples.Values[0].Value + counterCorrection
|
|
|
|
targetInterval := args[0].(*MatrixSelector).interval
|
|
sampledInterval := samples.Values[len(samples.Values)-1].Timestamp.Sub(samples.Values[0].Timestamp)
|
|
if sampledInterval == 0 {
|
|
// Only found one sample. Cannot compute a rate from this.
|
|
continue
|
|
}
|
|
// Correct for differences in target vs. actual delta interval.
|
|
//
|
|
// Above, we didn't actually calculate the delta for the specified target
|
|
// interval, but for an interval between the first and last found samples
|
|
// under the target interval, which will usually have less time between
|
|
// them. Depending on how many samples are found under a target interval,
|
|
// the delta results are distorted and temporal aliasing occurs (ugly
|
|
// bumps). This effect is corrected for below.
|
|
intervalCorrection := clientmodel.SampleValue(targetInterval) / clientmodel.SampleValue(sampledInterval)
|
|
resultValue *= intervalCorrection
|
|
|
|
resultSample := &clientmodel.Sample{
|
|
Metric: samples.Metric,
|
|
Value: resultValue,
|
|
Timestamp: timestamp,
|
|
}
|
|
delete(resultSample.Metric, clientmodel.MetricNameLabel)
|
|
resultVector = append(resultVector, resultSample)
|
|
}
|
|
return resultVector
|
|
}
|
|
|
|
// === rate(node MatrixNode) Vector ===
|
|
func rateImpl(timestamp clientmodel.Timestamp, args []Node) interface{} {
|
|
args = append(args, &ScalarLiteral{value: 1})
|
|
vector := deltaImpl(timestamp, args).(Vector)
|
|
|
|
// TODO: could be other type of MatrixNode in the future (right now, only
|
|
// MatrixSelector exists). Find a better way of getting the duration of a
|
|
// matrix, such as looking at the samples themselves.
|
|
interval := args[0].(*MatrixSelector).interval
|
|
for i := range vector {
|
|
vector[i].Value /= clientmodel.SampleValue(interval / time.Second)
|
|
}
|
|
return vector
|
|
}
|
|
|
|
type vectorByValueHeap Vector
|
|
|
|
func (s vectorByValueHeap) Len() int {
|
|
return len(s)
|
|
}
|
|
|
|
func (s vectorByValueHeap) Less(i, j int) bool {
|
|
return s[i].Value < s[j].Value
|
|
}
|
|
|
|
func (s vectorByValueHeap) Swap(i, j int) {
|
|
s[i], s[j] = s[j], s[i]
|
|
}
|
|
|
|
func (s *vectorByValueHeap) Push(x interface{}) {
|
|
*s = append(*s, x.(*clientmodel.Sample))
|
|
}
|
|
|
|
func (s *vectorByValueHeap) Pop() interface{} {
|
|
old := *s
|
|
n := len(old)
|
|
el := old[n-1]
|
|
*s = old[0 : n-1]
|
|
return el
|
|
}
|
|
|
|
// reverseHeap wraps a heap.Interface and inverts its ordering. All other
// methods are those of the embedded value.
type reverseHeap struct {
	heap.Interface
}

// Less reports the embedded ordering with its arguments exchanged, i.e.
// whether the element at index j sorts before the element at index i.
func (r reverseHeap) Less(i, j int) bool {
	wrapped := r.Interface
	return wrapped.Less(j, i)
}
|
|
|
|
// === sort(node VectorNode) Vector ===
|
|
func sortImpl(timestamp clientmodel.Timestamp, args []Node) interface{} {
|
|
byValueSorter := vectorByValueHeap(args[0].(VectorNode).Eval(timestamp))
|
|
sort.Sort(byValueSorter)
|
|
return Vector(byValueSorter)
|
|
}
|
|
|
|
// === sortDesc(node VectorNode) Vector ===
|
|
func sortDescImpl(timestamp clientmodel.Timestamp, args []Node) interface{} {
|
|
byValueSorter := vectorByValueHeap(args[0].(VectorNode).Eval(timestamp))
|
|
sort.Sort(sort.Reverse(byValueSorter))
|
|
return Vector(byValueSorter)
|
|
}
|
|
|
|
// === topk(k ScalarNode, node VectorNode) Vector ===
|
|
func topkImpl(timestamp clientmodel.Timestamp, args []Node) interface{} {
|
|
k := int(args[0].(ScalarNode).Eval(timestamp))
|
|
if k < 1 {
|
|
return Vector{}
|
|
}
|
|
|
|
topk := make(vectorByValueHeap, 0, k)
|
|
vector := args[1].(VectorNode).Eval(timestamp)
|
|
|
|
for _, el := range vector {
|
|
if len(topk) < k || topk[0].Value < el.Value {
|
|
if len(topk) == k {
|
|
heap.Pop(&topk)
|
|
}
|
|
heap.Push(&topk, el)
|
|
}
|
|
}
|
|
sort.Sort(sort.Reverse(topk))
|
|
return Vector(topk)
|
|
}
|
|
|
|
// === bottomk(k ScalarNode, node VectorNode) Vector ===
|
|
func bottomkImpl(timestamp clientmodel.Timestamp, args []Node) interface{} {
|
|
k := int(args[0].(ScalarNode).Eval(timestamp))
|
|
if k < 1 {
|
|
return Vector{}
|
|
}
|
|
|
|
bottomk := make(vectorByValueHeap, 0, k)
|
|
bkHeap := reverseHeap{Interface: &bottomk}
|
|
vector := args[1].(VectorNode).Eval(timestamp)
|
|
|
|
for _, el := range vector {
|
|
if len(bottomk) < k || bottomk[0].Value > el.Value {
|
|
if len(bottomk) == k {
|
|
heap.Pop(&bkHeap)
|
|
}
|
|
heap.Push(&bkHeap, el)
|
|
}
|
|
}
|
|
sort.Sort(bottomk)
|
|
return Vector(bottomk)
|
|
}
|
|
|
|
// === drop_common_labels(node VectorNode) Vector ===
|
|
func dropCommonLabelsImpl(timestamp clientmodel.Timestamp, args []Node) interface{} {
|
|
vector := args[0].(VectorNode).Eval(timestamp)
|
|
if len(vector) < 1 {
|
|
return Vector{}
|
|
}
|
|
common := clientmodel.LabelSet{}
|
|
for k, v := range vector[0].Metric {
|
|
// TODO(julius): Should we also drop common metric names?
|
|
if k == clientmodel.MetricNameLabel {
|
|
continue
|
|
}
|
|
common[k] = v
|
|
}
|
|
|
|
for _, el := range vector[1:] {
|
|
for k, v := range common {
|
|
if el.Metric[k] != v {
|
|
// Deletion of map entries while iterating over them is safe.
|
|
// From http://golang.org/ref/spec#For_statements:
|
|
// "If map entries that have not yet been reached are deleted during
|
|
// iteration, the corresponding iteration values will not be produced."
|
|
delete(common, k)
|
|
}
|
|
}
|
|
}
|
|
|
|
for _, el := range vector {
|
|
for k := range el.Metric {
|
|
if _, ok := common[k]; ok {
|
|
delete(el.Metric, k)
|
|
}
|
|
}
|
|
}
|
|
return vector
|
|
}
|
|
|
|
// === sampleVectorImpl() Vector ===
|
|
func sampleVectorImpl(timestamp clientmodel.Timestamp, args []Node) interface{} {
|
|
return Vector{
|
|
&clientmodel.Sample{
|
|
Metric: clientmodel.Metric{
|
|
clientmodel.MetricNameLabel: "http_requests",
|
|
clientmodel.JobLabel: "api-server",
|
|
"instance": "0",
|
|
},
|
|
Value: 10,
|
|
Timestamp: timestamp,
|
|
},
|
|
&clientmodel.Sample{
|
|
Metric: clientmodel.Metric{
|
|
clientmodel.MetricNameLabel: "http_requests",
|
|
clientmodel.JobLabel: "api-server",
|
|
"instance": "1",
|
|
},
|
|
Value: 20,
|
|
Timestamp: timestamp,
|
|
},
|
|
&clientmodel.Sample{
|
|
Metric: clientmodel.Metric{
|
|
clientmodel.MetricNameLabel: "http_requests",
|
|
clientmodel.JobLabel: "api-server",
|
|
"instance": "2",
|
|
},
|
|
Value: 30,
|
|
Timestamp: timestamp,
|
|
},
|
|
&clientmodel.Sample{
|
|
Metric: clientmodel.Metric{
|
|
clientmodel.MetricNameLabel: "http_requests",
|
|
clientmodel.JobLabel: "api-server",
|
|
"instance": "3",
|
|
"group": "canary",
|
|
},
|
|
Value: 40,
|
|
Timestamp: timestamp,
|
|
},
|
|
&clientmodel.Sample{
|
|
Metric: clientmodel.Metric{
|
|
clientmodel.MetricNameLabel: "http_requests",
|
|
clientmodel.JobLabel: "api-server",
|
|
"instance": "2",
|
|
"group": "canary",
|
|
},
|
|
Value: 40,
|
|
Timestamp: timestamp,
|
|
},
|
|
&clientmodel.Sample{
|
|
Metric: clientmodel.Metric{
|
|
clientmodel.MetricNameLabel: "http_requests",
|
|
clientmodel.JobLabel: "api-server",
|
|
"instance": "3",
|
|
"group": "mytest",
|
|
},
|
|
Value: 40,
|
|
Timestamp: timestamp,
|
|
},
|
|
&clientmodel.Sample{
|
|
Metric: clientmodel.Metric{
|
|
clientmodel.MetricNameLabel: "http_requests",
|
|
clientmodel.JobLabel: "api-server",
|
|
"instance": "3",
|
|
"group": "mytest",
|
|
},
|
|
Value: 40,
|
|
Timestamp: timestamp,
|
|
},
|
|
}
|
|
}
|
|
|
|
// === scalar(node VectorNode) Scalar ===
|
|
func scalarImpl(timestamp clientmodel.Timestamp, args []Node) interface{} {
|
|
v := args[0].(VectorNode).Eval(timestamp)
|
|
if len(v) != 1 {
|
|
return clientmodel.SampleValue(math.NaN())
|
|
}
|
|
return clientmodel.SampleValue(v[0].Value)
|
|
}
|
|
|
|
// === count_scalar(vector VectorNode) model.SampleValue ===
|
|
func countScalarImpl(timestamp clientmodel.Timestamp, args []Node) interface{} {
|
|
return clientmodel.SampleValue(len(args[0].(VectorNode).Eval(timestamp)))
|
|
}
|
|
|
|
func aggrOverTime(timestamp clientmodel.Timestamp, args []Node, aggrFn func(metric.Values) clientmodel.SampleValue) interface{} {
|
|
n := args[0].(MatrixNode)
|
|
matrixVal := n.Eval(timestamp)
|
|
resultVector := Vector{}
|
|
|
|
for _, el := range matrixVal {
|
|
if len(el.Values) == 0 {
|
|
continue
|
|
}
|
|
|
|
delete(el.Metric, clientmodel.MetricNameLabel)
|
|
resultVector = append(resultVector, &clientmodel.Sample{
|
|
Metric: el.Metric,
|
|
Value: aggrFn(el.Values),
|
|
Timestamp: timestamp,
|
|
})
|
|
}
|
|
return resultVector
|
|
}
|
|
|
|
// === avg_over_time(matrix MatrixNode) Vector ===
|
|
func avgOverTimeImpl(timestamp clientmodel.Timestamp, args []Node) interface{} {
|
|
return aggrOverTime(timestamp, args, func(values metric.Values) clientmodel.SampleValue {
|
|
var sum clientmodel.SampleValue
|
|
for _, v := range values {
|
|
sum += v.Value
|
|
}
|
|
return sum / clientmodel.SampleValue(len(values))
|
|
})
|
|
}
|
|
|
|
// === count_over_time(matrix MatrixNode) Vector ===
|
|
func countOverTimeImpl(timestamp clientmodel.Timestamp, args []Node) interface{} {
|
|
return aggrOverTime(timestamp, args, func(values metric.Values) clientmodel.SampleValue {
|
|
return clientmodel.SampleValue(len(values))
|
|
})
|
|
}
|
|
|
|
// === max_over_time(matrix MatrixNode) Vector ===
|
|
func maxOverTimeImpl(timestamp clientmodel.Timestamp, args []Node) interface{} {
|
|
return aggrOverTime(timestamp, args, func(values metric.Values) clientmodel.SampleValue {
|
|
max := math.Inf(-1)
|
|
for _, v := range values {
|
|
max = math.Max(max, float64(v.Value))
|
|
}
|
|
return clientmodel.SampleValue(max)
|
|
})
|
|
}
|
|
|
|
// === min_over_time(matrix MatrixNode) Vector ===
|
|
func minOverTimeImpl(timestamp clientmodel.Timestamp, args []Node) interface{} {
|
|
return aggrOverTime(timestamp, args, func(values metric.Values) clientmodel.SampleValue {
|
|
min := math.Inf(1)
|
|
for _, v := range values {
|
|
min = math.Min(min, float64(v.Value))
|
|
}
|
|
return clientmodel.SampleValue(min)
|
|
})
|
|
}
|
|
|
|
// === sum_over_time(matrix MatrixNode) Vector ===
|
|
func sumOverTimeImpl(timestamp clientmodel.Timestamp, args []Node) interface{} {
|
|
return aggrOverTime(timestamp, args, func(values metric.Values) clientmodel.SampleValue {
|
|
var sum clientmodel.SampleValue
|
|
for _, v := range values {
|
|
sum += v.Value
|
|
}
|
|
return sum
|
|
})
|
|
}
|
|
|
|
// === abs(vector VectorNode) Vector ===
|
|
func absImpl(timestamp clientmodel.Timestamp, args []Node) interface{} {
|
|
n := args[0].(VectorNode)
|
|
vector := n.Eval(timestamp)
|
|
for _, el := range vector {
|
|
delete(el.Metric, clientmodel.MetricNameLabel)
|
|
el.Value = clientmodel.SampleValue(math.Abs(float64(el.Value)))
|
|
}
|
|
return vector
|
|
}
|
|
|
|
// === absent(vector VectorNode) Vector ===
|
|
func absentImpl(timestamp clientmodel.Timestamp, args []Node) interface{} {
|
|
n := args[0].(VectorNode)
|
|
if len(n.Eval(timestamp)) > 0 {
|
|
return Vector{}
|
|
}
|
|
m := clientmodel.Metric{}
|
|
if vs, ok := n.(*VectorSelector); ok {
|
|
for _, matcher := range vs.labelMatchers {
|
|
if matcher.Type == metric.Equal && matcher.Name != clientmodel.MetricNameLabel {
|
|
m[matcher.Name] = matcher.Value
|
|
}
|
|
}
|
|
}
|
|
return Vector{
|
|
&clientmodel.Sample{
|
|
Metric: m,
|
|
Value: 1,
|
|
Timestamp: timestamp,
|
|
},
|
|
}
|
|
}
|
|
|
|
var functions = map[string]*Function{
|
|
"abs": {
|
|
name: "abs",
|
|
argTypes: []ExprType{VECTOR},
|
|
returnType: VECTOR,
|
|
callFn: absImpl,
|
|
},
|
|
"absent": {
|
|
name: "absent",
|
|
argTypes: []ExprType{VECTOR},
|
|
returnType: VECTOR,
|
|
callFn: absentImpl,
|
|
},
|
|
"avg_over_time": {
|
|
name: "avg_over_time",
|
|
argTypes: []ExprType{MATRIX},
|
|
returnType: VECTOR,
|
|
callFn: avgOverTimeImpl,
|
|
},
|
|
"bottomk": {
|
|
name: "bottomk",
|
|
argTypes: []ExprType{SCALAR, VECTOR},
|
|
returnType: VECTOR,
|
|
callFn: bottomkImpl,
|
|
},
|
|
"count_over_time": {
|
|
name: "count_over_time",
|
|
argTypes: []ExprType{MATRIX},
|
|
returnType: VECTOR,
|
|
callFn: countOverTimeImpl,
|
|
},
|
|
"count_scalar": {
|
|
name: "count_scalar",
|
|
argTypes: []ExprType{VECTOR},
|
|
returnType: SCALAR,
|
|
callFn: countScalarImpl,
|
|
},
|
|
"delta": {
|
|
name: "delta",
|
|
argTypes: []ExprType{MATRIX, SCALAR},
|
|
returnType: VECTOR,
|
|
callFn: deltaImpl,
|
|
},
|
|
"drop_common_labels": {
|
|
name: "drop_common_labels",
|
|
argTypes: []ExprType{VECTOR},
|
|
returnType: VECTOR,
|
|
callFn: dropCommonLabelsImpl,
|
|
},
|
|
"max_over_time": {
|
|
name: "max_over_time",
|
|
argTypes: []ExprType{MATRIX},
|
|
returnType: VECTOR,
|
|
callFn: maxOverTimeImpl,
|
|
},
|
|
"min_over_time": {
|
|
name: "min_over_time",
|
|
argTypes: []ExprType{MATRIX},
|
|
returnType: VECTOR,
|
|
callFn: minOverTimeImpl,
|
|
},
|
|
"rate": {
|
|
name: "rate",
|
|
argTypes: []ExprType{MATRIX},
|
|
returnType: VECTOR,
|
|
callFn: rateImpl,
|
|
},
|
|
"sampleVector": {
|
|
name: "sampleVector",
|
|
argTypes: []ExprType{},
|
|
returnType: VECTOR,
|
|
callFn: sampleVectorImpl,
|
|
},
|
|
"scalar": {
|
|
name: "scalar",
|
|
argTypes: []ExprType{VECTOR},
|
|
returnType: SCALAR,
|
|
callFn: scalarImpl,
|
|
},
|
|
"sort": {
|
|
name: "sort",
|
|
argTypes: []ExprType{VECTOR},
|
|
returnType: VECTOR,
|
|
callFn: sortImpl,
|
|
},
|
|
"sort_desc": {
|
|
name: "sort_desc",
|
|
argTypes: []ExprType{VECTOR},
|
|
returnType: VECTOR,
|
|
callFn: sortDescImpl,
|
|
},
|
|
"sum_over_time": {
|
|
name: "sum_over_time",
|
|
argTypes: []ExprType{MATRIX},
|
|
returnType: VECTOR,
|
|
callFn: sumOverTimeImpl,
|
|
},
|
|
"time": {
|
|
name: "time",
|
|
argTypes: []ExprType{},
|
|
returnType: SCALAR,
|
|
callFn: timeImpl,
|
|
},
|
|
"topk": {
|
|
name: "topk",
|
|
argTypes: []ExprType{SCALAR, VECTOR},
|
|
returnType: VECTOR,
|
|
callFn: topkImpl,
|
|
},
|
|
}
|
|
|
|
// GetFunction returns a predefined Function object for the given
|
|
// name.
|
|
func GetFunction(name string) (*Function, error) {
|
|
function, ok := functions[name]
|
|
if !ok {
|
|
return nil, fmt.Errorf("couldn't find function %v()", name)
|
|
}
|
|
return function, nil
|
|
}
|