prometheus/rules/ast/ast.go

1172 lines
33 KiB
Go
Raw Normal View History

// Copyright 2013 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package ast
import (
"errors"
"flag"
2013-01-17 15:07:00 -08:00
"fmt"
"math"
"time"
clientmodel "github.com/prometheus/client_golang/model"
"github.com/prometheus/prometheus/stats"
"github.com/prometheus/prometheus/storage/local"
"github.com/prometheus/prometheus/storage/metric"
)
var (
stalenessDelta = flag.Duration("query.staleness-delta", 300*time.Second, "Staleness delta allowance during expression evaluations.")
queryTimeout = flag.Duration("query.timeout", 2*time.Minute, "Maximum time a query may take before being aborted.")
)
// queryTimeoutError is returned when a query has been running for longer than
// the configured query timeout.
type queryTimeoutError struct {
	// timeoutAfter is the elapsed evaluation time at which the query was
	// aborted.
	timeoutAfter time.Duration
}

// Error implements the error interface and reports how long the query had
// been running when it was aborted.
func (e queryTimeoutError) Error() string {
	elapsed := e.timeoutAfter
	return fmt.Sprintf("query timeout after %v", elapsed)
}
// ----------------------------------------------------------------------------
// Raw data value types.
// SampleStream is a stream of Values belonging to an attached COWMetric.
// It is the per-series element of a Matrix and is serialized to JSON with the
// keys "metric" and "values".
type SampleStream struct {
// Metric identifies the series the values belong to.
Metric clientmodel.COWMetric `json:"metric"`
// Values holds the sample pairs (value and timestamp) of the series.
Values metric.Values `json:"values"`
}
// Sample is a single sample belonging to a COWMetric. It is the element type
// of a Vector and is serialized to JSON with the keys "metric", "value" and
// "timestamp".
type Sample struct {
// Metric identifies the series the sample belongs to.
Metric clientmodel.COWMetric `json:"metric"`
// Value is the sample value.
Value clientmodel.SampleValue `json:"value"`
// Timestamp is the time the value is reported for.
Timestamp clientmodel.Timestamp `json:"timestamp"`
}
// Vector is a slice of pointers to Samples. The contract is that in a
// Vector, all Samples have the same timestamp.
type Vector []*Sample
// Matrix is a slice of SampleStreams that implements sort.Interface and
// has a String method.
// NOTE(review): the sort.Interface and String implementations are not part of
// this section of the file - confirm they still exist before relying on them.
// BUG(julius): Pointerize this.
type Matrix []SampleStream
// groupedAggregation is the running state of one output group while a
// VectorAggregation is being evaluated.
type groupedAggregation struct {
// labels is the label set reported for the group in the result vector.
labels clientmodel.COWMetric
// value is the running aggregate (sum, minimum or maximum, depending on the
// aggregation type). For Avg it holds the sum until it is divided by
// groupCount; for Count it is replaced by groupCount at the end.
value clientmodel.SampleValue
// groupCount is the number of samples counted for the group. It starts at 1
// and is only incremented for the Avg and Count aggregations.
groupCount int
}
// ----------------------------------------------------------------------------
// Enums.
// ExprType is an enum for the rule language expression types. Every Node
// reports exactly one of these through its Type method.
type ExprType int
// Possible language expression types. We define these as integer constants
// because sometimes we need to pass around just the type without an object of
// that type.
const (
// ScalarType is the type of nodes evaluating to a single numeric value.
ScalarType ExprType = iota
// VectorType is the type of nodes evaluating to a Vector.
VectorType
// MatrixType is the type of nodes evaluating to a Matrix.
MatrixType
// StringType is the type of nodes evaluating to a string.
StringType
)
// BinOpType enumerates the binary operators of the rule language.
type BinOpType int

// Possible binary operator types: the arithmetic operators come first,
// followed by the comparison operators and the set operators.
const (
	Add BinOpType = iota
	Sub
	Mul
	Div
	Mod
	NE
	EQ
	GT
	LT
	GE
	LE
	And
	Or
)

// shouldDropMetric reports whether the metric name should be removed from the
// result after applying this operator to a vector. This is the case for the
// arithmetic operators only; comparison and set operators keep the name.
func (opType BinOpType) shouldDropMetric() bool {
	isArithmetic := opType == Add ||
		opType == Sub ||
		opType == Mul ||
		opType == Div ||
		opType == Mod
	return isArithmetic
}
// AggrType is an enum for aggregation types.
type AggrType int
// Possible aggregation types.
const (
// Sum adds up the sample values of each group.
Sum AggrType = iota
// Avg divides the sum of each group's sample values by the group's sample count.
Avg
// Min keeps the smallest sample value of each group.
Min
// Max keeps the largest sample value of each group.
Max
// Count reports the number of samples in each group.
Count
)
// ----------------------------------------------------------------------------
// Interfaces.
// Nodes is a slice of any mix of node types as all node types
// implement the Node interface. It is used for the children of a node and
// for function call arguments.
type Nodes []Node
// Node is the top-level interface for any kind of nodes. Each node
// type implements one of the ...Node interfaces, each of which embeds
// this Node interface.
type Node interface {
// Type returns the expression type the node evaluates to.
Type() ExprType
// Children returns the direct child nodes; leaf nodes return an empty slice.
Children() Nodes
// NodeTreeToDotGraph presumably renders the tree rooted at this node in
// Graphviz dot format - implementation not shown here, verify.
NodeTreeToDotGraph() string
// String returns a string representation of the node.
String() string
}
// ScalarNode is a Node for scalar values.
type ScalarNode interface {
Node
// Eval evaluates and returns the value of the scalar represented by this node
// at the given timestamp.
Eval(timestamp clientmodel.Timestamp) clientmodel.SampleValue
}
// VectorNode is a Node for vector values.
type VectorNode interface {
Node
// Eval evaluates the node recursively at the given timestamp and returns
// the result as a Vector (i.e. a slice of Samples all at the given
// Timestamp).
Eval(timestamp clientmodel.Timestamp) Vector
}
// MatrixNode is a Node for matrix values.
type MatrixNode interface {
Node
// Eval evaluates the node recursively and returns the result as a Matrix.
Eval(timestamp clientmodel.Timestamp) Matrix
// EvalBoundaries evaluates the node recursively and returns the result
// as a Matrix that only contains the boundary values.
EvalBoundaries(timestamp clientmodel.Timestamp) Matrix
}
// StringNode is a Node for string values.
type StringNode interface {
Node
// Eval evaluates and returns the value of the string
// represented by this node at the given timestamp.
Eval(timestamp clientmodel.Timestamp) string
}
// ----------------------------------------------------------------------------
// ScalarNode types.
type (
// ScalarLiteral represents a numeric literal. It evaluates to the same
// value regardless of the evaluation timestamp.
ScalarLiteral struct {
// value is the literal's numeric value.
value clientmodel.SampleValue
}
// ScalarFunctionCall represents a function with a numeric
// return type.
ScalarFunctionCall struct {
// function is the function to call.
function *Function
// args are the argument nodes passed to the function.
args Nodes
}
// ScalarArithExpr represents an arithmetic expression of
// numeric type.
ScalarArithExpr struct {
// opType is the binary operator applied to the two operands.
opType BinOpType
// lhs is the left-hand operand.
lhs ScalarNode
// rhs is the right-hand operand.
rhs ScalarNode
}
)
// ----------------------------------------------------------------------------
// VectorNode types.
type (
// A VectorSelector represents a metric name plus labelset.
VectorSelector struct {
labelMatchers metric.LabelMatchers
Implement offset operator. This allows changing the time offset for individual instant and range vectors in a query. For example, this returns the value of `foo` 5 minutes in the past relative to the current query evaluation time: foo offset 5m Note that the `offset` modifier always needs to follow the selector immediately. I.e. the following would be correct: sum(foo offset 5m) // GOOD. While the following would be *incorrect*: sum(foo) offset 5m // INVALID. The same works for range vectors. This returns the 5-minutes-rate that `foo` had a week ago: rate(foo[5m] offset 1w) This change touches the following components: * Lexer/parser: additions to correctly parse the new `offset`/`OFFSET` keyword. * AST: vector and matrix nodes now have an additional `offset` field. This is used during their evaluation to adjust query and result times appropriately. * Query analyzer: now works on separate sets of ranges and instants per offset. Isolating different offsets from each other completely in this way keeps the preloading code relatively simple. No storage engine changes were needed by this change. The rules tests have been changed to not probe the internal implementation details of the query analyzer anymore (how many instants and ranges have been preloaded). This would also become too cumbersome to test with the new model, and measuring the result of the query should be sufficient. This fixes https://github.com/prometheus/prometheus/issues/529 This fixed https://github.com/prometheus/promdash/issues/201
2015-02-17 17:30:41 -08:00
offset time.Duration
// The series iterators are populated at query analysis time.
iterators map[clientmodel.Fingerprint]local.SeriesIterator
metrics map[clientmodel.Fingerprint]clientmodel.COWMetric
// Fingerprints are populated from label matchers at query analysis time.
fingerprints clientmodel.Fingerprints
}
// VectorFunctionCall represents a function with vector return
// type.
VectorFunctionCall struct {
function *Function
args Nodes
}
// A VectorAggregation with vector return type.
VectorAggregation struct {
aggrType AggrType
groupBy clientmodel.LabelNames
keepExtraLabels bool
vector VectorNode
}
// VectorArithExpr represents an arithmetic expression of vector type. At
// least one of the two operand Nodes must be a VectorNode. The other may be
// a VectorNode or ScalarNode. Both criteria are checked at runtime.
VectorArithExpr struct {
opType BinOpType
lhs Node
rhs Node
matchCardinality VectorMatchCardinality
matchOn clientmodel.LabelNames
includeLabels clientmodel.LabelNames
}
)
// VectorMatchCardinality describes how the samples of the two operands of a
// vector-to-vector binary operation are allowed to pair up.
type VectorMatchCardinality int
// Possible vector matching cardinalities.
const (
// MatchOneToOne requires every match group to contain at most one sample
// on each side.
MatchOneToOne VectorMatchCardinality = iota
// MatchManyToOne allows several left-hand samples to match a single
// right-hand sample.
MatchManyToOne
// MatchOneToMany allows a single left-hand sample to match several
// right-hand samples; evaluation swaps the sides to handle it.
MatchOneToMany
// MatchManyToMany allows several samples per match group on both sides.
// NOTE(review): presumably only meaningful for the And/Or set operators - confirm
// against the parser.
MatchManyToMany
)
// ----------------------------------------------------------------------------
// MatrixNode types.
type (
// A MatrixSelector represents a metric name plus labelset and
// timerange.
MatrixSelector struct {
labelMatchers metric.LabelMatchers
// The series iterators are populated at query analysis time.
iterators map[clientmodel.Fingerprint]local.SeriesIterator
metrics map[clientmodel.Fingerprint]clientmodel.COWMetric
// Fingerprints are populated from label matchers at query analysis time.
fingerprints clientmodel.Fingerprints
2013-03-26 04:33:48 -07:00
interval time.Duration
Implement offset operator. This allows changing the time offset for individual instant and range vectors in a query. For example, this returns the value of `foo` 5 minutes in the past relative to the current query evaluation time: foo offset 5m Note that the `offset` modifier always needs to follow the selector immediately. I.e. the following would be correct: sum(foo offset 5m) // GOOD. While the following would be *incorrect*: sum(foo) offset 5m // INVALID. The same works for range vectors. This returns the 5-minutes-rate that `foo` had a week ago: rate(foo[5m] offset 1w) This change touches the following components: * Lexer/parser: additions to correctly parse the new `offset`/`OFFSET` keyword. * AST: vector and matrix nodes now have an additional `offset` field. This is used during their evaluation to adjust query and result times appropriately. * Query analyzer: now works on separate sets of ranges and instants per offset. Isolating different offsets from each other completely in this way keeps the preloading code relatively simple. No storage engine changes were needed by this change. The rules tests have been changed to not probe the internal implementation details of the query analyzer anymore (how many instants and ranges have been preloaded). This would also become too cumbersome to test with the new model, and measuring the result of the query should be sufficient. This fixes https://github.com/prometheus/prometheus/issues/529 This fixed https://github.com/prometheus/promdash/issues/201
2015-02-17 17:30:41 -08:00
offset time.Duration
}
)
// ----------------------------------------------------------------------------
// StringNode types.
type (
// A StringLiteral is a node holding a constant string value.
StringLiteral struct {
// str is the literal's value.
str string
}
// StringFunctionCall represents a function with string return
// type.
StringFunctionCall struct {
// function is the function to call.
function *Function
// args are the argument nodes passed to the function.
args Nodes
}
)
// ----------------------------------------------------------------------------
// Implementations.
// Type implements the Node interface and reports ScalarType.
func (ScalarLiteral) Type() ExprType {
	return ScalarType
}

// Type implements the Node interface and reports ScalarType.
func (ScalarFunctionCall) Type() ExprType {
	return ScalarType
}

// Type implements the Node interface and reports ScalarType.
func (ScalarArithExpr) Type() ExprType {
	return ScalarType
}

// Type implements the Node interface and reports VectorType.
func (VectorSelector) Type() ExprType {
	return VectorType
}

// Type implements the Node interface and reports VectorType.
func (VectorFunctionCall) Type() ExprType {
	return VectorType
}

// Type implements the Node interface and reports VectorType.
func (VectorAggregation) Type() ExprType {
	return VectorType
}

// Type implements the Node interface and reports VectorType.
func (VectorArithExpr) Type() ExprType {
	return VectorType
}

// Type implements the Node interface and reports MatrixType.
func (MatrixSelector) Type() ExprType {
	return MatrixType
}

// Type implements the Node interface and reports StringType.
func (StringLiteral) Type() ExprType {
	return StringType
}

// Type implements the Node interface and reports StringType.
func (StringFunctionCall) Type() ExprType {
	return StringType
}
// Children implements the Node interface. A literal is a leaf, so the
// returned slice is empty.
func (ScalarLiteral) Children() Nodes {
	return Nodes{}
}

// Children implements the Node interface and returns the arguments of the
// function call.
func (node ScalarFunctionCall) Children() Nodes {
	return node.args
}

// Children implements the Node interface and returns the left-hand and the
// right-hand operand, in that order.
func (node ScalarArithExpr) Children() Nodes {
	operands := Nodes{node.lhs, node.rhs}
	return operands
}

// Children implements the Node interface. A selector is a leaf, so the
// returned slice is empty.
func (VectorSelector) Children() Nodes {
	return Nodes{}
}

// Children implements the Node interface and returns the arguments of the
// function call.
func (node VectorFunctionCall) Children() Nodes {
	return node.args
}

// Children implements the Node interface and returns the vector node that
// is being aggregated as the only child.
func (node VectorAggregation) Children() Nodes {
	aggregated := Nodes{node.vector}
	return aggregated
}

// Children implements the Node interface and returns the left-hand and the
// right-hand operand, in that order.
func (node VectorArithExpr) Children() Nodes {
	operands := Nodes{node.lhs, node.rhs}
	return operands
}

// Children implements the Node interface. A selector is a leaf, so the
// returned slice is empty.
func (MatrixSelector) Children() Nodes {
	return Nodes{}
}

// Children implements the Node interface. A literal is a leaf, so the
// returned slice is empty.
func (StringLiteral) Children() Nodes {
	return Nodes{}
}

// Children implements the Node interface and returns the arguments of the
// function call.
func (node StringFunctionCall) Children() Nodes {
	return node.args
}
// Eval implements the ScalarNode interface. A literal evaluates to its
// constant value at every timestamp.
func (node *ScalarLiteral) Eval(timestamp clientmodel.Timestamp) clientmodel.SampleValue {
	return node.value
}

// Eval implements the ScalarNode interface. It evaluates the left-hand
// operand, then the right-hand operand, and applies the node's operator to
// the two results.
func (node *ScalarArithExpr) Eval(timestamp clientmodel.Timestamp) clientmodel.SampleValue {
	return evalScalarBinop(
		node.opType,
		node.lhs.Eval(timestamp),
		node.rhs.Eval(timestamp),
	)
}

// Eval implements the ScalarNode interface. It calls the node's function
// with the argument nodes and panics if the function does not return a
// clientmodel.SampleValue.
func (node *ScalarFunctionCall) Eval(timestamp clientmodel.Timestamp) clientmodel.SampleValue {
	result := node.function.callFn(timestamp, node.args)
	return result.(clientmodel.SampleValue)
}
// EvalVectorInstant evaluates a VectorNode at a single point in time.
//
// The query is prepared against storage first. If preparation fails, its
// error is returned. If preparation alone already took longer than the
// configured query timeout, a queryTimeoutError is returned instead of
// evaluating the node. The total time spent is recorded in queryStats.
func EvalVectorInstant(node VectorNode, timestamp clientmodel.Timestamp, storage local.Storage, queryStats *stats.TimerGroup) (Vector, error) {
	timer := queryStats.GetTimer(stats.TotalEvalTime).Start()
	defer timer.Stop()

	closer, err := prepareInstantQuery(node, timestamp, storage, queryStats)
	if err != nil {
		return nil, err
	}
	defer closer.Close()

	elapsed := timer.ElapsedTime()
	if elapsed > *queryTimeout {
		return nil, queryTimeoutError{timeoutAfter: elapsed}
	}

	return node.Eval(timestamp), nil
}
// EvalVectorRange evaluates a VectorNode with a range query.
func EvalVectorRange(node VectorNode, start clientmodel.Timestamp, end clientmodel.Timestamp, interval time.Duration, storage local.Storage, queryStats *stats.TimerGroup) (Matrix, error) {
totalEvalTimer := queryStats.GetTimer(stats.TotalEvalTime).Start()
defer totalEvalTimer.Stop()
// Explicitly initialize to an empty matrix since a nil Matrix encodes to
// null in JSON.
matrix := Matrix{}
prepareTimer := queryStats.GetTimer(stats.TotalQueryPreparationTime).Start()
closer, err := prepareRangeQuery(node, start, end, interval, storage, queryStats)
prepareTimer.Stop()
if err != nil {
return nil, err
}
defer closer.Close()
evalTimer := queryStats.GetTimer(stats.InnerEvalTime).Start()
sampleStreams := map[clientmodel.Fingerprint]*SampleStream{}
for t := start; !t.After(end); t = t.Add(interval) {
if et := totalEvalTimer.ElapsedTime(); et > *queryTimeout {
evalTimer.Stop()
return nil, queryTimeoutError{et}
}
vector := node.Eval(t)
2013-01-17 15:07:00 -08:00
for _, sample := range vector {
samplePair := metric.SamplePair{
2013-01-17 15:07:00 -08:00
Value: sample.Value,
Timestamp: sample.Timestamp,
}
fp := sample.Metric.Metric.Fingerprint()
if sampleStreams[fp] == nil {
sampleStreams[fp] = &SampleStream{
2013-01-17 15:07:00 -08:00
Metric: sample.Metric,
Values: metric.Values{samplePair},
2013-01-17 15:07:00 -08:00
}
} else {
sampleStreams[fp].Values = append(sampleStreams[fp].Values, samplePair)
2013-01-17 15:07:00 -08:00
}
}
}
evalTimer.Stop()
2013-01-17 15:07:00 -08:00
appendTimer := queryStats.GetTimer(stats.ResultAppendTime).Start()
for _, sampleStream := range sampleStreams {
matrix = append(matrix, *sampleStream)
2013-01-17 15:07:00 -08:00
}
appendTimer.Stop()
return matrix, nil
2013-01-15 02:30:55 -08:00
}
// labelIntersection returns metric1 reduced to the labels that metric2
// carries with the same value. Labels are removed through metric1.Delete.
func labelIntersection(metric1, metric2 clientmodel.COWMetric) clientmodel.COWMetric {
	for name, own := range metric1.Metric {
		if other := metric2.Metric[name]; other == own {
			continue
		}
		metric1.Delete(name)
	}
	return metric1
}
Use custom timestamp type for sample timestamps and related code. So far we've been using Go's native time.Time for anything related to sample timestamps. Since the range of time.Time is much bigger than what we need, this has created two problems: - there could be time.Time values which were out of the range/precision of the time type that we persist to disk, therefore causing incorrectly ordered keys. One bug caused by this was: https://github.com/prometheus/prometheus/issues/367 It would be good to use a timestamp type that's more closely aligned with what the underlying storage supports. - sizeof(time.Time) is 192, while Prometheus should be ok with a single 64-bit Unix timestamp (possibly even a 32-bit one). Since we store samples in large numbers, this seriously affects memory usage. Furthermore, copying/working with the data will be faster if it's smaller. *MEMORY USAGE RESULTS* Initial memory usage comparisons for a running Prometheus with 1 timeseries and 100,000 samples show roughly a 13% decrease in total (VIRT) memory usage. In my tests, this advantage for some reason decreased a bit the more samples the timeseries had (to 5-7% for millions of samples). This I can't fully explain, but perhaps garbage collection issues were involved. *WHEN TO USE THE NEW TIMESTAMP TYPE* The new clientmodel.Timestamp type should be used whenever time calculations are either directly or indirectly related to sample timestamps. For example: - the timestamp of a sample itself - all kinds of watermarks - anything that may become or is compared to a sample timestamp (like the timestamp passed into Target.Scrape()). When to still use time.Time: - for measuring durations/times not related to sample timestamps, like duration telemetry exporting, timers that indicate how frequently to execute some action, etc. *NOTE ON OPERATOR OPTIMIZATION TESTS* We don't use operator optimization code anymore, but it still lives in the code as dead code. 
It still has tests, but I couldn't get all of them to pass with the new timestamp format. I commented out the failing cases for now, but we should probably remove the dead code soon. I just didn't want to do that in the same change as this. Change-Id: I821787414b0debe85c9fffaeb57abd453727af0f
2013-10-28 06:35:02 -07:00
// groupedAggregationsToVector turns the per-group aggregation state into the
// result vector, with every sample stamped with the given timestamp.
//
// Avg and Count are finalized here: Avg divides the accumulated sum by the
// group's sample count, Count reports the sample count itself. All other
// aggregation types already hold their final value. The finalized value is
// written back into the passed aggregations.
func (node *VectorAggregation) groupedAggregationsToVector(aggregations map[uint64]*groupedAggregation, timestamp clientmodel.Timestamp) Vector {
	out := Vector{}
	for _, group := range aggregations {
		members := clientmodel.SampleValue(group.groupCount)
		if node.aggrType == Avg {
			group.value = group.value / members
		} else if node.aggrType == Count {
			group.value = members
		}
		out = append(out, &Sample{
			Metric:    group.labels,
			Value:     group.value,
			Timestamp: timestamp,
		})
	}
	return out
}
// Eval implements the VectorNode interface and returns the aggregated
// Vector.
//
// The child vector is evaluated at timestamp and its samples are bucketed by
// the signature of their groupBy labels. The first sample of a bucket seeds
// the group: with keepExtraLabels the group starts with all of the sample's
// labels except the metric name, otherwise with only the groupBy labels the
// sample has. Every further sample updates the running aggregate and, with
// keepExtraLabels, narrows the group's labels to those shared with it.
// It panics on an unknown aggregation type.
func (node *VectorAggregation) Eval(timestamp clientmodel.Timestamp) Vector {
	groups := map[uint64]*groupedAggregation{}

	for _, sample := range node.vector.Eval(timestamp) {
		key := clientmodel.SignatureForLabels(sample.Metric.Metric, node.groupBy)

		group, seen := groups[key]
		if !seen {
			var labels clientmodel.COWMetric
			if node.keepExtraLabels {
				labels = sample.Metric
				labels.Delete(clientmodel.MetricNameLabel)
			} else {
				labels = clientmodel.COWMetric{
					Metric: clientmodel.Metric{},
					Copied: true,
				}
				for _, name := range node.groupBy {
					if value, present := sample.Metric.Metric[name]; present {
						labels.Set(name, value)
					}
				}
			}
			groups[key] = &groupedAggregation{
				labels:     labels,
				value:      sample.Value,
				groupCount: 1,
			}
			continue
		}

		if node.keepExtraLabels {
			group.labels = labelIntersection(group.labels, sample.Metric)
		}

		switch node.aggrType {
		case Count:
			group.groupCount++
		case Avg:
			group.value += sample.Value
			group.groupCount++
		case Sum:
			group.value += sample.Value
		case Min:
			if group.value > sample.Value {
				group.value = sample.Value
			}
		case Max:
			if group.value < sample.Value {
				group.value = sample.Value
			}
		default:
			panic("Unknown aggregation type")
		}
	}

	return node.groupedAggregationsToVector(groups, timestamp)
}
// Eval implements the VectorNode interface and returns the value of
// the selector.
func (node *VectorSelector) Eval(timestamp clientmodel.Timestamp) Vector {
//// timer := v.stats.GetTimer(stats.GetValueAtTimeTime).Start()
samples := Vector{}
for fp, it := range node.iterators {
Implement offset operator. This allows changing the time offset for individual instant and range vectors in a query. For example, this returns the value of `foo` 5 minutes in the past relative to the current query evaluation time: foo offset 5m Note that the `offset` modifier always needs to follow the selector immediately. I.e. the following would be correct: sum(foo offset 5m) // GOOD. While the following would be *incorrect*: sum(foo) offset 5m // INVALID. The same works for range vectors. This returns the 5-minutes-rate that `foo` had a week ago: rate(foo[5m] offset 1w) This change touches the following components: * Lexer/parser: additions to correctly parse the new `offset`/`OFFSET` keyword. * AST: vector and matrix nodes now have an additional `offset` field. This is used during their evaluation to adjust query and result times appropriately. * Query analyzer: now works on separate sets of ranges and instants per offset. Isolating different offsets from each other completely in this way keeps the preloading code relatively simple. No storage engine changes were needed by this change. The rules tests have been changed to not probe the internal implementation details of the query analyzer anymore (how many instants and ranges have been preloaded). This would also become too cumbersome to test with the new model, and measuring the result of the query should be sufficient. This fixes https://github.com/prometheus/prometheus/issues/529 This fixed https://github.com/prometheus/promdash/issues/201
2015-02-17 17:30:41 -08:00
sampleCandidates := it.GetValueAtTime(timestamp.Add(-node.offset))
samplePair := chooseClosestSample(sampleCandidates, timestamp.Add(-node.offset))
if samplePair != nil {
samples = append(samples, &Sample{
Metric: node.metrics[fp],
Value: samplePair.Value,
Timestamp: timestamp,
})
}
}
//// timer.Stop()
return samples
}
// chooseClosestSample picks the value to report for a target time out of a
// list of samples surrounding that time.
//
// Samples further away from the target than the staleness delta are ignored.
// Of the remaining ones, the latest sample before the target and the earliest
// sample at or after the target are kept. If both exist, the value is
// interpolated between them; if only one exists, it is returned verbatim; if
// none exists, nil is returned.
func chooseClosestSample(samples metric.Values, timestamp clientmodel.Timestamp) *metric.SamplePair {
	var before, after *metric.SamplePair

	for _, candidate := range samples {
		// Take a copy so that the pointers kept below do not alias the loop
		// variable.
		current := candidate

		switch delta := current.Timestamp.Sub(timestamp); {
		case delta < 0:
			// Sample lies before the target time.
			if -delta > *stalenessDelta {
				continue
			}
			if before != nil && current.Timestamp.Before(before.Timestamp) {
				continue
			}
			before = &current
		default:
			// Sample lies at or after the target time.
			if delta > *stalenessDelta {
				continue
			}
			if after != nil && current.Timestamp.After(after.Timestamp) {
				continue
			}
			after = &current
		}
	}

	if before != nil && after != nil {
		return interpolateSamples(before, after, timestamp)
	}
	if before != nil {
		return before
	}
	return after
}
// interpolateSamples linearly interpolates a value at the target time
// between two provided sample pairs and returns it as a new sample pair
// stamped with the target time. The two samples must have different
// timestamps.
func interpolateSamples(first, second *metric.SamplePair, timestamp clientmodel.Timestamp) *metric.SamplePair {
	// Change in value per unit of time between the two samples.
	slope := (second.Value - first.Value) / clientmodel.SampleValue(second.Timestamp.Sub(first.Timestamp))
	// Distance of the target time from the first sample.
	elapsed := clientmodel.SampleValue(timestamp.Sub(first.Timestamp))

	interpolated := metric.SamplePair{
		Value:     first.Value + (elapsed * slope),
		Timestamp: timestamp,
	}
	return &interpolated
}
// Eval implements the VectorNode interface. It calls the node's function
// with the argument nodes and panics if the function does not return a
// Vector.
func (node *VectorFunctionCall) Eval(timestamp clientmodel.Timestamp) Vector {
	result := node.function.callFn(timestamp, node.args)
	return result.(Vector)
}
// evalScalarBinop applies a binary operator to two scalar values.
//
// Arithmetic operators return the arithmetic result. Mod works on the
// operands truncated to integers and yields NaN when the truncated divisor
// is zero. Comparison operators return 1 if the comparison holds and 0
// otherwise. It panics for operators that are not defined on scalars (And
// and Or).
func evalScalarBinop(opType BinOpType,
	lhs clientmodel.SampleValue,
	rhs clientmodel.SampleValue) clientmodel.SampleValue {
	switch opType {
	case Add:
		return lhs + rhs
	case Sub:
		return lhs - rhs
	case Mul:
		return lhs * rhs
	case Div:
		return lhs / rhs
	case Mod:
		// The check has to be made on the truncated divisor: a fractional
		// divisor such as 0.5 is non-zero as a float but truncates to 0, and
		// an integer modulo by zero panics at run time.
		if divisor := int(rhs); divisor != 0 {
			return clientmodel.SampleValue(int(lhs) % divisor)
		}
		return clientmodel.SampleValue(math.NaN())
	case EQ:
		if lhs == rhs {
			return 1
		}
		return 0
	case NE:
		if lhs != rhs {
			return 1
		}
		return 0
	case GT:
		if lhs > rhs {
			return 1
		}
		return 0
	case LT:
		if lhs < rhs {
			return 1
		}
		return 0
	case GE:
		if lhs >= rhs {
			return 1
		}
		return 0
	case LE:
		if lhs <= rhs {
			return 1
		}
		return 0
	}
	panic("Not all enum values enumerated in switch")
}
// evalVectorBinop applies a binary operator to a pair of sample values from
// a vector operation. The second return value reports whether the sample
// should be kept in the result.
//
// Arithmetic operators always keep the sample and return the arithmetic
// result. Mod works on the operands truncated to integers and yields NaN
// when the truncated divisor is zero. Comparison operators act as filters:
// if the comparison holds, the left-hand value is returned and kept,
// otherwise the sample is dropped. It panics for operators that are not
// handled here (And and Or).
func evalVectorBinop(opType BinOpType,
	lhs clientmodel.SampleValue,
	rhs clientmodel.SampleValue) (clientmodel.SampleValue, bool) {
	switch opType {
	case Add:
		return lhs + rhs, true
	case Sub:
		return lhs - rhs, true
	case Mul:
		return lhs * rhs, true
	case Div:
		return lhs / rhs, true
	case Mod:
		// The check has to be made on the truncated divisor: a fractional
		// divisor such as 0.5 is non-zero as a float but truncates to 0, and
		// an integer modulo by zero panics at run time.
		if divisor := int(rhs); divisor != 0 {
			return clientmodel.SampleValue(int(lhs) % divisor), true
		}
		return clientmodel.SampleValue(math.NaN()), true
	case EQ:
		if lhs == rhs {
			return lhs, true
		}
		return 0, false
	case NE:
		if lhs != rhs {
			return lhs, true
		}
		return 0, false
	case GT:
		if lhs > rhs {
			return lhs, true
		}
		return 0, false
	case LT:
		if lhs < rhs {
			return lhs, true
		}
		return 0, false
	case GE:
		if lhs >= rhs {
			return lhs, true
		}
		return 0, false
	case LE:
		if lhs <= rhs {
			return lhs, true
		}
		return 0, false
	}
	panic("Not all enum values enumerated in switch")
}
// labelsEqual reports whether every label of labels1, ignoring the metric
// name, is present in labels2 with the same value.
// NOTE(review): the check is one-directional - labels that only exist in
// labels2 are not detected. Confirm whether callers rely on this.
func labelsEqual(labels1, labels2 clientmodel.Metric) bool {
	for name, want := range labels1 {
		if name == clientmodel.MetricNameLabel {
			continue
		}
		if labels2[name] != want {
			return false
		}
	}
	return true
}
// Eval implements the VectorNode interface and returns the result of
// the expression.
//
// If both operands are vectors, the work is delegated to evalVectors.
// Otherwise one operand has to be a vector and the other a scalar (a failed
// type assertion panics). The operator is then applied between every vector
// sample and the scalar, respecting the original operand order. Samples
// rejected by a comparison operator are dropped, and arithmetic operators
// strip the metric name. The samples of the evaluated vector are updated in
// place and reused in the result.
func (node *VectorArithExpr) Eval(timestamp clientmodel.Timestamp) Vector {
	lhsType, rhsType := node.lhs.Type(), node.rhs.Type()

	// Vector-to-vector operation.
	if lhsType == VectorType && rhsType == VectorType {
		left := node.lhs.(VectorNode).Eval(timestamp)
		right := node.rhs.(VectorNode).Eval(timestamp)
		return node.evalVectors(timestamp, left, right)
	}

	// Vector-to-scalar operation. The vector is always evaluated first.
	scalarOnLeft := lhsType == ScalarType && rhsType == VectorType
	var (
		vec    Vector
		scalar clientmodel.SampleValue
	)
	if scalarOnLeft {
		vec = node.rhs.(VectorNode).Eval(timestamp)
		scalar = node.lhs.(ScalarNode).Eval(timestamp)
	} else {
		vec = node.lhs.(VectorNode).Eval(timestamp)
		scalar = node.rhs.(ScalarNode).Eval(timestamp)
	}

	out := make(Vector, 0, len(vec))
	for _, sample := range vec {
		// Restore the operand order of the original expression.
		left, right := sample.Value, scalar
		if scalarOnLeft {
			left, right = scalar, sample.Value
		}
		value, keep := evalVectorBinop(node.opType, left, right)
		if !keep {
			continue
		}
		sample.Value = value
		if node.opType.shouldDropMetric() {
			sample.Metric.Delete(clientmodel.MetricNameLabel)
		}
		out = append(out, sample)
	}
	return out
}
// evalVectors evaluates the binary operation for the given vectors.
//
// Samples of both sides are paired up by the hash that hashForMetric
// computes for their metric. For Or, all lhs samples and all rhs samples
// without a matching lhs sample are returned. For And, the lhs samples that
// have a matching rhs sample are returned with their own value. For all
// other operators, the operator is applied to each matching pair and the
// result is reported under the metric computed by resultMetric.
//
// Matching violations (duplicate samples on the "one" side, implicit
// many-to-one matches, or duplicate result label sets in grouped matching)
// currently yield an empty Vector instead of an error.
func (node *VectorArithExpr) evalVectors(timestamp clientmodel.Timestamp, lhs, rhs Vector) Vector {
// Note that the capacity is taken from rhs before a potential swap below.
result := make(Vector, 0, len(rhs))
// The control flow below handles one-to-one or many-to-one matching.
// For one-to-many, swap sidedness and account for the swap when calculating
// values.
if node.matchCardinality == MatchOneToMany {
lhs, rhs = rhs, lhs
}
// All samples from the rhs hashed by the matching label/values.
rm := make(map[uint64]*Sample)
// Maps the hash of the label values used for matching to the hashes of the label
// values of the include labels (if any). It is used to keep track of already
// inserted samples.
added := make(map[uint64][]uint64)
// Add all rhs samples to a map so we can easily find matches later.
for _, rs := range rhs {
hash := node.hashForMetric(rs.Metric.Metric)
// The rhs is guaranteed to be the 'one' side. Having multiple samples
// with the same hash means that the matching is many-to-many,
// which is not supported.
if _, found := rm[hash]; node.matchCardinality != MatchManyToMany && found {
// Many-to-many matching not allowed.
// TODO(fabxc): Return a query error here once AST nodes support that.
return Vector{}
}
// In many-to-many matching the entry is simply overwritten. It can thus only
// be used to check whether any matching rhs entry exists but not retrieve them all.
rm[hash] = rs
}
// For all lhs samples find a respective rhs sample and perform
// the binary operation.
for _, ls := range lhs {
hash := node.hashForMetric(ls.Metric.Metric)
// Any lhs sample we encounter in an OR operation belongs to the result.
if node.opType == Or {
ls.Metric = node.resultMetric(ls, nil)
result = append(result, ls)
added[hash] = nil // Ensure matching rhs sample is not added later.
continue
}
rs, found := rm[hash] // Look for a match in the rhs vector.
if !found {
continue
}
var value clientmodel.SampleValue
var keep bool
if node.opType == And {
// AND keeps the lhs sample unchanged whenever a rhs match exists.
value = ls.Value
keep = true
} else {
if _, exists := added[hash]; node.matchCardinality == MatchOneToOne && exists {
// Many-to-one matching must be explicit.
// TODO(fabxc): Return a query error here once AST nodes support that.
return Vector{}
}
// Account for potentially swapped sidedness.
vl, vr := ls.Value, rs.Value
if node.matchCardinality == MatchOneToMany {
vl, vr = vr, vl
}
value, keep = evalVectorBinop(node.opType, vl, vr)
}
if keep {
metric := node.resultMetric(ls, rs)
// Check if the same label set has been added for a many-to-one matching before.
if node.matchCardinality == MatchManyToOne || node.matchCardinality == MatchOneToMany {
insHash := clientmodel.SignatureForLabels(metric.Metric, node.includeLabels)
if ihs, exists := added[hash]; exists {
for _, ih := range ihs {
if ih == insHash {
// TODO(fabxc): Return a query error here once AST nodes support that.
return Vector{}
}
}
added[hash] = append(ihs, insHash)
} else {
added[hash] = []uint64{insHash}
}
}
ns := &Sample{
Metric: metric,
Value: value,
Timestamp: timestamp,
}
result = append(result, ns)
// The self-assignment creates the map entry (with a nil value) if it
// does not exist yet, which is what the existence checks above test for.
added[hash] = added[hash] // Set existence to true.
}
}
// Add all remaining samples in the rhs in an OR operation if they
// have not been matched up with a lhs sample.
if node.opType == Or {
for hash, rs := range rm {
if _, exists := added[hash]; !exists {
rs.Metric = node.resultMetric(rs, nil)
result = append(result, rs)
}
}
}
return result
}
// resultMetric returns the metric for the given sample(s) based on the vector
// binary operation and the matching options.
//
// Without explicit matching labels, and for the And and Or operators, the
// metric of ls is returned; arithmetic operators remove the metric name from
// it first (this modifies ls.Metric). With explicit matching labels, a new
// metric is built from the matchOn labels of ls plus those includeLabels
// that are present on ls. rs is currently not consulted.
func (node *VectorArithExpr) resultMetric(ls, rs *Sample) clientmodel.COWMetric {
	if len(node.matchOn) == 0 || node.opType == Or || node.opType == And {
		if node.opType.shouldDropMetric() {
			ls.Metric.Delete(clientmodel.MetricNameLabel)
		}
		return ls.Metric
	}

	m := clientmodel.Metric{}
	for _, ln := range node.matchOn {
		m[ln] = ls.Metric.Metric[ln]
	}

	for _, ln := range node.includeLabels {
		// Included labels from the `group_x` modifier are taken from the "many"-side.
		if v, ok := ls.Metric.Metric[ln]; ok {
			m[ln] = v
		}
	}
	// Keyed fields keep this literal correct if the field order of the
	// imported struct ever changes.
	return clientmodel.COWMetric{Copied: false, Metric: m}
}
// hashForMetric calculates a hash value for the given metric based on the
// matching options for the binary operation.
//
// Without matchOn labels the hash covers every label except the metric name.
// With matchOn labels the hash covers exactly those labels, provided the
// metric carries all of them; otherwise it covers the whole metric (metric
// name included) so that the metric gets a hash of its own.
func (node *VectorArithExpr) hashForMetric(metric clientmodel.Metric) uint64 {
	if len(node.matchOn) == 0 {
		withoutName := make(clientmodel.LabelNames, 0, len(metric))
		for name := range metric {
			if name == clientmodel.MetricNameLabel {
				continue
			}
			withoutName = append(withoutName, name)
		}
		return clientmodel.SignatureForLabels(metric, withoutName)
	}

	hasAllMatchLabels := true
	for _, name := range node.matchOn {
		if _, present := metric[name]; !present {
			hasAllMatchLabels = false
			break
		}
	}
	if hasAllMatchLabels {
		return clientmodel.SignatureForLabels(metric, node.matchOn)
	}

	// The metric does not contain the labels to match on: build the hash
	// over the whole metric instead.
	allLabels := make(clientmodel.LabelNames, 0, len(metric))
	for name := range metric {
		allLabels = append(allLabels, name)
	}
	return clientmodel.SignatureForLabels(metric, allLabels)
}
// Eval implements the MatrixNode interface and returns the value of
// the selector.
func (node *MatrixSelector) Eval(timestamp clientmodel.Timestamp) Matrix {
interval := &metric.Interval{
Implement offset operator. This allows changing the time offset for individual instant and range vectors in a query. For example, this returns the value of `foo` 5 minutes in the past relative to the current query evaluation time: foo offset 5m Note that the `offset` modifier always needs to follow the selector immediately. I.e. the following would be correct: sum(foo offset 5m) // GOOD. While the following would be *incorrect*: sum(foo) offset 5m // INVALID. The same works for range vectors. This returns the 5-minutes-rate that `foo` had a week ago: rate(foo[5m] offset 1w) This change touches the following components: * Lexer/parser: additions to correctly parse the new `offset`/`OFFSET` keyword. * AST: vector and matrix nodes now have an additional `offset` field. This is used during their evaluation to adjust query and result times appropriately. * Query analyzer: now works on separate sets of ranges and instants per offset. Isolating different offsets from each other completely in this way keeps the preloading code relatively simple. No storage engine changes were needed by this change. The rules tests have been changed to not probe the internal implementation details of the query analyzer anymore (how many instants and ranges have been preloaded). This would also become too cumbersome to test with the new model, and measuring the result of the query should be sufficient. This fixes https://github.com/prometheus/prometheus/issues/529 This fixed https://github.com/prometheus/promdash/issues/201
2015-02-17 17:30:41 -08:00
OldestInclusive: timestamp.Add(-node.interval - node.offset),
NewestInclusive: timestamp.Add(-node.offset),
}
//// timer := v.stats.GetTimer(stats.GetRangeValuesTime).Start()
sampleStreams := []SampleStream{}
for fp, it := range node.iterators {
samplePairs := it.GetRangeValues(*interval)
if len(samplePairs) == 0 {
continue
}
Implement offset operator. This allows changing the time offset for individual instant and range vectors in a query. For example, this returns the value of `foo` 5 minutes in the past relative to the current query evaluation time: foo offset 5m Note that the `offset` modifier always needs to follow the selector immediately. I.e. the following would be correct: sum(foo offset 5m) // GOOD. While the following would be *incorrect*: sum(foo) offset 5m // INVALID. The same works for range vectors. This returns the 5-minutes-rate that `foo` had a week ago: rate(foo[5m] offset 1w) This change touches the following components: * Lexer/parser: additions to correctly parse the new `offset`/`OFFSET` keyword. * AST: vector and matrix nodes now have an additional `offset` field. This is used during their evaluation to adjust query and result times appropriately. * Query analyzer: now works on separate sets of ranges and instants per offset. Isolating different offsets from each other completely in this way keeps the preloading code relatively simple. No storage engine changes were needed by this change. The rules tests have been changed to not probe the internal implementation details of the query analyzer anymore (how many instants and ranges have been preloaded). This would also become too cumbersome to test with the new model, and measuring the result of the query should be sufficient. This fixes https://github.com/prometheus/prometheus/issues/529 This fixed https://github.com/prometheus/promdash/issues/201
2015-02-17 17:30:41 -08:00
if node.offset != 0 {
for _, sp := range samplePairs {
sp.Timestamp = sp.Timestamp.Add(node.offset)
}
}
sampleStream := SampleStream{
Metric: node.metrics[fp],
Values: samplePairs,
}
sampleStreams = append(sampleStreams, sampleStream)
}
//// timer.Stop()
return sampleStreams
}
// EvalBoundaries implements the MatrixNode interface and returns the
// boundary values of the selector.
//
// The node's offset is honored in the same way as in Eval: boundary values
// are fetched for the window [timestamp-interval-offset, timestamp-offset],
// and, if the offset is non-zero, the timestamps of the returned samples are
// moved forward by the offset. Iterators that yield no samples are skipped.
func (node *MatrixSelector) EvalBoundaries(timestamp clientmodel.Timestamp) Matrix {
	interval := metric.Interval{
		OldestInclusive: timestamp.Add(-node.interval - node.offset),
		NewestInclusive: timestamp.Add(-node.offset),
	}

	//// timer := v.stats.GetTimer(stats.GetBoundaryValuesTime).Start()
	sampleStreams := []SampleStream{}
	for fp, it := range node.iterators {
		samplePairs := it.GetBoundaryValues(interval)
		if len(samplePairs) == 0 {
			continue
		}

		if node.offset != 0 {
			// Shift into a fresh slice: the range variable is a copy, and
			// the slice handed out by the iterator is left untouched.
			shifted := make(metric.Values, len(samplePairs))
			for i, sp := range samplePairs {
				sp.Timestamp = sp.Timestamp.Add(node.offset)
				shifted[i] = sp
			}
			samplePairs = shifted
		}

		sampleStreams = append(sampleStreams, SampleStream{
			Metric: node.metrics[fp],
			Values: samplePairs,
		})
	}
	//// timer.Stop()
	return sampleStreams
}
// Len implements sort.Interface.
2013-01-15 02:30:55 -08:00
func (matrix Matrix) Len() int {
2013-01-17 15:07:00 -08:00
return len(matrix)
2013-01-15 02:30:55 -08:00
}
// Less implements sort.Interface.
2013-01-15 02:30:55 -08:00
func (matrix Matrix) Less(i, j int) bool {
return matrix[i].Metric.String() < matrix[j].Metric.String()
2013-01-15 02:30:55 -08:00
}
// Swap implements sort.Interface.
2013-01-15 02:30:55 -08:00
func (matrix Matrix) Swap(i, j int) {
2013-01-17 15:07:00 -08:00
matrix[i], matrix[j] = matrix[j], matrix[i]
2013-01-15 02:30:55 -08:00
}
// Eval implements the StringNode interface and returns the string held by
// the literal. The timestamp is not used.
func (node *StringLiteral) Eval(timestamp clientmodel.Timestamp) string {
	value := node.str
	return value
}
// Eval implements the StringNode interface and returns the result of
// the function call. The function's callFn is invoked with the timestamp and
// the node's arguments; its result is type-asserted to string, which panics
// if the function produced a value of another type.
func (node *StringFunctionCall) Eval(timestamp clientmodel.Timestamp) string {
	result := node.function.callFn(timestamp, node.args)
	return result.(string)
}
// ----------------------------------------------------------------------------
// Constructors.
// NewScalarLiteral returns a ScalarLiteral holding the given value.
func NewScalarLiteral(value clientmodel.SampleValue) *ScalarLiteral {
	literal := new(ScalarLiteral)
	literal.value = value
	return literal
}
// NewVectorSelector returns a (not yet evaluated) VectorSelector with
// the given LabelSet.
Implement offset operator. This allows changing the time offset for individual instant and range vectors in a query. For example, this returns the value of `foo` 5 minutes in the past relative to the current query evaluation time: foo offset 5m Note that the `offset` modifier always needs to follow the selector immediately. I.e. the following would be correct: sum(foo offset 5m) // GOOD. While the following would be *incorrect*: sum(foo) offset 5m // INVALID. The same works for range vectors. This returns the 5-minutes-rate that `foo` had a week ago: rate(foo[5m] offset 1w) This change touches the following components: * Lexer/parser: additions to correctly parse the new `offset`/`OFFSET` keyword. * AST: vector and matrix nodes now have an additional `offset` field. This is used during their evaluation to adjust query and result times appropriately. * Query analyzer: now works on separate sets of ranges and instants per offset. Isolating different offsets from each other completely in this way keeps the preloading code relatively simple. No storage engine changes were needed by this change. The rules tests have been changed to not probe the internal implementation details of the query analyzer anymore (how many instants and ranges have been preloaded). This would also become too cumbersome to test with the new model, and measuring the result of the query should be sufficient. This fixes https://github.com/prometheus/prometheus/issues/529 This fixed https://github.com/prometheus/promdash/issues/201
2015-02-17 17:30:41 -08:00
func NewVectorSelector(m metric.LabelMatchers, offset time.Duration) *VectorSelector {
return &VectorSelector{
labelMatchers: m,
Implement offset operator. This allows changing the time offset for individual instant and range vectors in a query. For example, this returns the value of `foo` 5 minutes in the past relative to the current query evaluation time: foo offset 5m Note that the `offset` modifier always needs to follow the selector immediately. I.e. the following would be correct: sum(foo offset 5m) // GOOD. While the following would be *incorrect*: sum(foo) offset 5m // INVALID. The same works for range vectors. This returns the 5-minutes-rate that `foo` had a week ago: rate(foo[5m] offset 1w) This change touches the following components: * Lexer/parser: additions to correctly parse the new `offset`/`OFFSET` keyword. * AST: vector and matrix nodes now have an additional `offset` field. This is used during their evaluation to adjust query and result times appropriately. * Query analyzer: now works on separate sets of ranges and instants per offset. Isolating different offsets from each other completely in this way keeps the preloading code relatively simple. No storage engine changes were needed by this change. The rules tests have been changed to not probe the internal implementation details of the query analyzer anymore (how many instants and ranges have been preloaded). This would also become too cumbersome to test with the new model, and measuring the result of the query should be sufficient. This fixes https://github.com/prometheus/prometheus/issues/529 This fixed https://github.com/prometheus/promdash/issues/201
2015-02-17 17:30:41 -08:00
offset: offset,
iterators: map[clientmodel.Fingerprint]local.SeriesIterator{},
metrics: map[clientmodel.Fingerprint]clientmodel.COWMetric{},
}
}
// NewVectorAggregation returns a (not yet evaluated) VectorAggregation
// that aggregates the given VectorNode using the given AggrType, grouping
// by the given LabelNames. keepExtraLabels is stored on the node unchanged.
func NewVectorAggregation(aggrType AggrType, vector VectorNode, groupBy clientmodel.LabelNames, keepExtraLabels bool) *VectorAggregation {
	aggregation := new(VectorAggregation)
	aggregation.aggrType = aggrType
	aggregation.vector = vector
	aggregation.groupBy = groupBy
	aggregation.keepExtraLabels = keepExtraLabels
	return aggregation
}
// NewFunctionCall returns a (not yet evaluated) function call node
// (of type ScalarFunctionCall, VectorFunctionCall, or
// StringFunctionCall), chosen by the function's return type.
//
// The error reported by function.CheckArgTypes for the given arguments, if
// any, is returned as is. A function whose return type is none of scalar,
// vector, or string causes a panic.
func NewFunctionCall(function *Function, args Nodes) (Node, error) {
	err := function.CheckArgTypes(args)
	if err != nil {
		return nil, err
	}

	switch function.returnType {
	case ScalarType:
		call := &ScalarFunctionCall{function: function, args: args}
		return call, nil
	case VectorType:
		call := &VectorFunctionCall{function: function, args: args}
		return call, nil
	case StringType:
		call := &StringFunctionCall{function: function, args: args}
		return call, nil
	default:
		panic("Function with invalid return type")
	}
}
// nodesHaveTypes reports whether the type of every node in nodes equals at
// least one of exprTypes. It returns true for an empty list of nodes, and
// false as soon as a node matches none of the given types.
func nodesHaveTypes(nodes Nodes, exprTypes []ExprType) bool {
nextNode:
	for _, candidate := range nodes {
		for _, allowed := range exprTypes {
			if candidate.Type() == allowed {
				continue nextNode
			}
		}
		return false
	}
	return true
}
// NewArithExpr returns a (not yet evaluated) expression node (of type
// VectorArithExpr or ScalarArithExpr).
//
// A VectorArithExpr is returned if at least one operand is of vector type,
// a ScalarArithExpr otherwise. An error is returned if
//   - an operand is neither of scalar nor of vector type,
//   - And or Or is applied to a scalar operand or combined with include
//     labels (a group modifier), or
//   - vector matching options are given although not both operands are
//     vectors.
func NewArithExpr(opType BinOpType, lhs Node, rhs Node, matchCard VectorMatchCardinality, matchOn, include clientmodel.LabelNames) (Node, error) {
	allowedTypes := []ExprType{ScalarType, VectorType}
	if !nodesHaveTypes(Nodes{lhs, rhs}, allowedTypes) {
		return nil, errors.New("binary operands must be of vector or scalar type")
	}
	lhsType, rhsType := lhs.Type(), rhs.Type()

	if opType == And || opType == Or {
		switch {
		case lhsType == ScalarType || rhsType == ScalarType:
			return nil, errors.New("AND and OR operators may only be used between vectors")
		case len(include) > 0:
			// Logical operations must never be used with group modifiers.
			return nil, errors.New("AND and OR operators must not have a group modifier")
		}
	}

	bothVectors := lhsType == VectorType && rhsType == VectorType
	hasMatchingOptions := matchCard != MatchOneToOne || matchOn != nil || include != nil
	if !bothVectors && hasMatchingOptions {
		return nil, errors.New("binary scalar expressions cannot have vector matching options")
	}

	if lhsType != VectorType && rhsType != VectorType {
		scalarExpr := &ScalarArithExpr{
			opType: opType,
			lhs:    lhs.(ScalarNode),
			rhs:    rhs.(ScalarNode),
		}
		return scalarExpr, nil
	}

	vectorExpr := &VectorArithExpr{
		opType:           opType,
		lhs:              lhs,
		rhs:              rhs,
		matchCardinality: matchCard,
		matchOn:          matchOn,
		includeLabels:    include,
	}
	return vectorExpr, nil
}
// NewMatrixSelector returns a (not yet evaluated) MatrixSelector with
// the given VectorSelector and Duration.
Implement offset operator. This allows changing the time offset for individual instant and range vectors in a query. For example, this returns the value of `foo` 5 minutes in the past relative to the current query evaluation time: foo offset 5m Note that the `offset` modifier always needs to follow the selector immediately. I.e. the following would be correct: sum(foo offset 5m) // GOOD. While the following would be *incorrect*: sum(foo) offset 5m // INVALID. The same works for range vectors. This returns the 5-minutes-rate that `foo` had a week ago: rate(foo[5m] offset 1w) This change touches the following components: * Lexer/parser: additions to correctly parse the new `offset`/`OFFSET` keyword. * AST: vector and matrix nodes now have an additional `offset` field. This is used during their evaluation to adjust query and result times appropriately. * Query analyzer: now works on separate sets of ranges and instants per offset. Isolating different offsets from each other completely in this way keeps the preloading code relatively simple. No storage engine changes were needed by this change. The rules tests have been changed to not probe the internal implementation details of the query analyzer anymore (how many instants and ranges have been preloaded). This would also become too cumbersome to test with the new model, and measuring the result of the query should be sufficient. This fixes https://github.com/prometheus/prometheus/issues/529 This fixed https://github.com/prometheus/promdash/issues/201
2015-02-17 17:30:41 -08:00
func NewMatrixSelector(vector *VectorSelector, interval time.Duration, offset time.Duration) *MatrixSelector {
return &MatrixSelector{
labelMatchers: vector.labelMatchers,
interval: interval,
Implement offset operator. This allows changing the time offset for individual instant and range vectors in a query. For example, this returns the value of `foo` 5 minutes in the past relative to the current query evaluation time: foo offset 5m Note that the `offset` modifier always needs to follow the selector immediately. I.e. the following would be correct: sum(foo offset 5m) // GOOD. While the following would be *incorrect*: sum(foo) offset 5m // INVALID. The same works for range vectors. This returns the 5-minutes-rate that `foo` had a week ago: rate(foo[5m] offset 1w) This change touches the following components: * Lexer/parser: additions to correctly parse the new `offset`/`OFFSET` keyword. * AST: vector and matrix nodes now have an additional `offset` field. This is used during their evaluation to adjust query and result times appropriately. * Query analyzer: now works on separate sets of ranges and instants per offset. Isolating different offsets from each other completely in this way keeps the preloading code relatively simple. No storage engine changes were needed by this change. The rules tests have been changed to not probe the internal implementation details of the query analyzer anymore (how many instants and ranges have been preloaded). This would also become too cumbersome to test with the new model, and measuring the result of the query should be sufficient. This fixes https://github.com/prometheus/prometheus/issues/529 This fixed https://github.com/prometheus/promdash/issues/201
2015-02-17 17:30:41 -08:00
offset: offset,
iterators: map[clientmodel.Fingerprint]local.SeriesIterator{},
metrics: map[clientmodel.Fingerprint]clientmodel.COWMetric{},
}
}
// NewStringLiteral returns a StringLiteral holding the given string as its
// value.
func NewStringLiteral(str string) *StringLiteral {
	literal := new(StringLiteral)
	literal.str = str
	return literal
}