mirror of
https://github.com/prometheus/prometheus.git
synced 2025-01-11 13:57:36 -08:00
ead0933dd9
Signed-off-by: czm <zhongming.chang@daocloud.io>
1957 lines
56 KiB
Go
1957 lines
56 KiB
Go
// Copyright 2013 The Prometheus Authors
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package promql
|
|
|
|
import (
|
|
"container/heap"
|
|
"context"
|
|
"fmt"
|
|
"math"
|
|
"regexp"
|
|
"runtime"
|
|
"sort"
|
|
"strconv"
|
|
"sync"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
"github.com/go-kit/kit/log"
|
|
"github.com/go-kit/kit/log/level"
|
|
opentracing "github.com/opentracing/opentracing-go"
|
|
"github.com/pkg/errors"
|
|
"github.com/prometheus/client_golang/prometheus"
|
|
"github.com/prometheus/common/model"
|
|
|
|
"github.com/prometheus/prometheus/pkg/gate"
|
|
"github.com/prometheus/prometheus/pkg/labels"
|
|
"github.com/prometheus/prometheus/pkg/timestamp"
|
|
"github.com/prometheus/prometheus/pkg/value"
|
|
"github.com/prometheus/prometheus/storage"
|
|
"github.com/prometheus/prometheus/util/stats"
|
|
)
|
|
|
|
const (
|
|
namespace = "prometheus"
|
|
subsystem = "engine"
|
|
queryTag = "query"
|
|
env = "query execution"
|
|
|
|
// The largest SampleValue that can be converted to an int64 without overflow.
|
|
maxInt64 = 9223372036854774784
|
|
// The smallest SampleValue that can be converted to an int64 without underflow.
|
|
minInt64 = -9223372036854775808
|
|
)
|
|
|
|
var (
|
|
// LookbackDelta determines the time since the last sample after which a time
|
|
// series is considered stale.
|
|
LookbackDelta = 5 * time.Minute
|
|
|
|
// DefaultEvaluationInterval is the default evaluation interval of
|
|
// a subquery in milliseconds.
|
|
DefaultEvaluationInterval int64
|
|
)
|
|
|
|
// SetDefaultEvaluationInterval sets DefaultEvaluationInterval.
|
|
func SetDefaultEvaluationInterval(ev time.Duration) {
|
|
atomic.StoreInt64(&DefaultEvaluationInterval, durationToInt64Millis(ev))
|
|
}
|
|
|
|
// GetDefaultEvaluationInterval returns the DefaultEvaluationInterval as time.Duration.
|
|
func GetDefaultEvaluationInterval() int64 {
|
|
return atomic.LoadInt64(&DefaultEvaluationInterval)
|
|
}
|
|
|
|
type engineMetrics struct {
|
|
currentQueries prometheus.Gauge
|
|
maxConcurrentQueries prometheus.Gauge
|
|
queryQueueTime prometheus.Summary
|
|
queryPrepareTime prometheus.Summary
|
|
queryInnerEval prometheus.Summary
|
|
queryResultSort prometheus.Summary
|
|
}
|
|
|
|
// convertibleToInt64 returns true if v does not over-/underflow an int64.
|
|
func convertibleToInt64(v float64) bool {
|
|
return v <= maxInt64 && v >= minInt64
|
|
}
|
|
|
|
type (
|
|
// ErrQueryTimeout is returned if a query timed out during processing.
|
|
ErrQueryTimeout string
|
|
// ErrQueryCanceled is returned if a query was canceled during processing.
|
|
ErrQueryCanceled string
|
|
// ErrTooManySamples is returned if a query would load more than the maximum allowed samples into memory.
|
|
ErrTooManySamples string
|
|
// ErrStorage is returned if an error was encountered in the storage layer
|
|
// during query handling.
|
|
ErrStorage struct{ Err error }
|
|
)
|
|
|
|
func (e ErrQueryTimeout) Error() string {
|
|
return fmt.Sprintf("query timed out in %s", string(e))
|
|
}
|
|
func (e ErrQueryCanceled) Error() string {
|
|
return fmt.Sprintf("query was canceled in %s", string(e))
|
|
}
|
|
func (e ErrTooManySamples) Error() string {
|
|
return fmt.Sprintf("query processing would load too many samples into memory in %s", string(e))
|
|
}
|
|
func (e ErrStorage) Error() string {
|
|
return e.Err.Error()
|
|
}
|
|
|
|
// A Query is derived from an a raw query string and can be run against an engine
|
|
// it is associated with.
|
|
type Query interface {
|
|
// Exec processes the query. Can only be called once.
|
|
Exec(ctx context.Context) *Result
|
|
// Close recovers memory used by the query result.
|
|
Close()
|
|
// Statement returns the parsed statement of the query.
|
|
Statement() Statement
|
|
// Stats returns statistics about the lifetime of the query.
|
|
Stats() *stats.QueryTimers
|
|
// Cancel signals that a running query execution should be aborted.
|
|
Cancel()
|
|
}
|
|
|
|
// query implements the Query interface.
|
|
type query struct {
|
|
// Underlying data provider.
|
|
queryable storage.Queryable
|
|
// The original query string.
|
|
q string
|
|
// Statement of the parsed query.
|
|
stmt Statement
|
|
// Timer stats for the query execution.
|
|
stats *stats.QueryTimers
|
|
// Result matrix for reuse.
|
|
matrix Matrix
|
|
// Cancellation function for the query.
|
|
cancel func()
|
|
|
|
// The engine against which the query is executed.
|
|
ng *Engine
|
|
}
|
|
|
|
// Statement implements the Query interface.
|
|
func (q *query) Statement() Statement {
|
|
return q.stmt
|
|
}
|
|
|
|
// Stats implements the Query interface.
|
|
func (q *query) Stats() *stats.QueryTimers {
|
|
return q.stats
|
|
}
|
|
|
|
// Cancel implements the Query interface.
|
|
func (q *query) Cancel() {
|
|
if q.cancel != nil {
|
|
q.cancel()
|
|
}
|
|
}
|
|
|
|
// Close implements the Query interface.
|
|
func (q *query) Close() {
|
|
for _, s := range q.matrix {
|
|
putPointSlice(s.Points)
|
|
}
|
|
}
|
|
|
|
// Exec implements the Query interface.
|
|
func (q *query) Exec(ctx context.Context) *Result {
|
|
if span := opentracing.SpanFromContext(ctx); span != nil {
|
|
span.SetTag(queryTag, q.stmt.String())
|
|
}
|
|
|
|
// Log query in active log.
|
|
var queryIndex int
|
|
if q.ng.activeQueryTracker != nil {
|
|
queryIndex = q.ng.activeQueryTracker.Insert(q.q)
|
|
}
|
|
|
|
// Exec query.
|
|
res, warnings, err := q.ng.exec(ctx, q)
|
|
|
|
// Delete query from active log.
|
|
if q.ng.activeQueryTracker != nil {
|
|
q.ng.activeQueryTracker.Delete(queryIndex)
|
|
}
|
|
return &Result{Err: err, Value: res, Warnings: warnings}
|
|
}
|
|
|
|
// contextDone returns an error if the context was canceled or timed out.
|
|
func contextDone(ctx context.Context, env string) error {
|
|
if err := ctx.Err(); err != nil {
|
|
return contextErr(err, env)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func contextErr(err error, env string) error {
|
|
switch err {
|
|
case context.Canceled:
|
|
return ErrQueryCanceled(env)
|
|
case context.DeadlineExceeded:
|
|
return ErrQueryTimeout(env)
|
|
default:
|
|
return err
|
|
}
|
|
}
|
|
|
|
// EngineOpts contains configuration options used when creating a new Engine.
|
|
type EngineOpts struct {
|
|
Logger log.Logger
|
|
Reg prometheus.Registerer
|
|
MaxConcurrent int
|
|
MaxSamples int
|
|
Timeout time.Duration
|
|
ActiveQueryTracker *ActiveQueryTracker
|
|
}
|
|
|
|
// Engine handles the lifetime of queries from beginning to end.
|
|
// It is connected to a querier.
|
|
type Engine struct {
|
|
logger log.Logger
|
|
metrics *engineMetrics
|
|
timeout time.Duration
|
|
gate *gate.Gate
|
|
maxSamplesPerQuery int
|
|
activeQueryTracker *ActiveQueryTracker
|
|
}
|
|
|
|
// NewEngine returns a new engine.
|
|
func NewEngine(opts EngineOpts) *Engine {
|
|
if opts.Logger == nil {
|
|
opts.Logger = log.NewNopLogger()
|
|
}
|
|
|
|
metrics := &engineMetrics{
|
|
currentQueries: prometheus.NewGauge(prometheus.GaugeOpts{
|
|
Namespace: namespace,
|
|
Subsystem: subsystem,
|
|
Name: "queries",
|
|
Help: "The current number of queries being executed or waiting.",
|
|
}),
|
|
maxConcurrentQueries: prometheus.NewGauge(prometheus.GaugeOpts{
|
|
Namespace: namespace,
|
|
Subsystem: subsystem,
|
|
Name: "queries_concurrent_max",
|
|
Help: "The max number of concurrent queries.",
|
|
}),
|
|
queryQueueTime: prometheus.NewSummary(prometheus.SummaryOpts{
|
|
Namespace: namespace,
|
|
Subsystem: subsystem,
|
|
Name: "query_duration_seconds",
|
|
Help: "Query timings",
|
|
ConstLabels: prometheus.Labels{"slice": "queue_time"},
|
|
Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001},
|
|
}),
|
|
queryPrepareTime: prometheus.NewSummary(prometheus.SummaryOpts{
|
|
Namespace: namespace,
|
|
Subsystem: subsystem,
|
|
Name: "query_duration_seconds",
|
|
Help: "Query timings",
|
|
ConstLabels: prometheus.Labels{"slice": "prepare_time"},
|
|
Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001},
|
|
}),
|
|
queryInnerEval: prometheus.NewSummary(prometheus.SummaryOpts{
|
|
Namespace: namespace,
|
|
Subsystem: subsystem,
|
|
Name: "query_duration_seconds",
|
|
Help: "Query timings",
|
|
ConstLabels: prometheus.Labels{"slice": "inner_eval"},
|
|
Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001},
|
|
}),
|
|
queryResultSort: prometheus.NewSummary(prometheus.SummaryOpts{
|
|
Namespace: namespace,
|
|
Subsystem: subsystem,
|
|
Name: "query_duration_seconds",
|
|
Help: "Query timings",
|
|
ConstLabels: prometheus.Labels{"slice": "result_sort"},
|
|
Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001},
|
|
}),
|
|
}
|
|
metrics.maxConcurrentQueries.Set(float64(opts.MaxConcurrent))
|
|
|
|
if opts.Reg != nil {
|
|
opts.Reg.MustRegister(
|
|
metrics.currentQueries,
|
|
metrics.maxConcurrentQueries,
|
|
metrics.queryQueueTime,
|
|
metrics.queryPrepareTime,
|
|
metrics.queryInnerEval,
|
|
metrics.queryResultSort,
|
|
)
|
|
}
|
|
|
|
return &Engine{
|
|
gate: gate.New(opts.MaxConcurrent),
|
|
timeout: opts.Timeout,
|
|
logger: opts.Logger,
|
|
metrics: metrics,
|
|
maxSamplesPerQuery: opts.MaxSamples,
|
|
activeQueryTracker: opts.ActiveQueryTracker,
|
|
}
|
|
}
|
|
|
|
// NewInstantQuery returns an evaluation query for the given expression at the given time.
|
|
func (ng *Engine) NewInstantQuery(q storage.Queryable, qs string, ts time.Time) (Query, error) {
|
|
expr, err := ParseExpr(qs)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
qry := ng.newQuery(q, expr, ts, ts, 0)
|
|
qry.q = qs
|
|
|
|
return qry, nil
|
|
}
|
|
|
|
// NewRangeQuery returns an evaluation query for the given time range and with
|
|
// the resolution set by the interval.
|
|
func (ng *Engine) NewRangeQuery(q storage.Queryable, qs string, start, end time.Time, interval time.Duration) (Query, error) {
|
|
expr, err := ParseExpr(qs)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if expr.Type() != ValueTypeVector && expr.Type() != ValueTypeScalar {
|
|
return nil, errors.Errorf("invalid expression type %q for range query, must be Scalar or instant Vector", documentedType(expr.Type()))
|
|
}
|
|
qry := ng.newQuery(q, expr, start, end, interval)
|
|
qry.q = qs
|
|
|
|
return qry, nil
|
|
}
|
|
|
|
func (ng *Engine) newQuery(q storage.Queryable, expr Expr, start, end time.Time, interval time.Duration) *query {
|
|
es := &EvalStmt{
|
|
Expr: expr,
|
|
Start: start,
|
|
End: end,
|
|
Interval: interval,
|
|
}
|
|
qry := &query{
|
|
stmt: es,
|
|
ng: ng,
|
|
stats: stats.NewQueryTimers(),
|
|
queryable: q,
|
|
}
|
|
return qry
|
|
}
|
|
|
|
// testStmt is an internal helper statement that allows execution
|
|
// of an arbitrary function during handling. It is used to test the Engine.
|
|
type testStmt func(context.Context) error
|
|
|
|
func (testStmt) String() string { return "test statement" }
|
|
func (testStmt) stmt() {}
|
|
|
|
func (ng *Engine) newTestQuery(f func(context.Context) error) Query {
|
|
qry := &query{
|
|
q: "test statement",
|
|
stmt: testStmt(f),
|
|
ng: ng,
|
|
stats: stats.NewQueryTimers(),
|
|
}
|
|
return qry
|
|
}
|
|
|
|
// exec executes the query.
|
|
//
|
|
// At this point per query only one EvalStmt is evaluated. Alert and record
|
|
// statements are not handled by the Engine.
|
|
func (ng *Engine) exec(ctx context.Context, q *query) (Value, storage.Warnings, error) {
|
|
ng.metrics.currentQueries.Inc()
|
|
defer ng.metrics.currentQueries.Dec()
|
|
|
|
ctx, cancel := context.WithTimeout(ctx, ng.timeout)
|
|
q.cancel = cancel
|
|
|
|
execSpanTimer, ctx := q.stats.GetSpanTimer(ctx, stats.ExecTotalTime)
|
|
defer execSpanTimer.Finish()
|
|
|
|
queueSpanTimer, _ := q.stats.GetSpanTimer(ctx, stats.ExecQueueTime, ng.metrics.queryQueueTime)
|
|
|
|
if err := ng.gate.Start(ctx); err != nil {
|
|
return nil, nil, contextErr(err, "query queue")
|
|
}
|
|
defer ng.gate.Done()
|
|
|
|
queueSpanTimer.Finish()
|
|
|
|
// Cancel when execution is done or an error was raised.
|
|
defer q.cancel()
|
|
|
|
const env = "query execution"
|
|
|
|
evalSpanTimer, ctx := q.stats.GetSpanTimer(ctx, stats.EvalTotalTime)
|
|
defer evalSpanTimer.Finish()
|
|
|
|
// The base context might already be canceled on the first iteration (e.g. during shutdown).
|
|
if err := contextDone(ctx, env); err != nil {
|
|
return nil, nil, err
|
|
}
|
|
|
|
switch s := q.Statement().(type) {
|
|
case *EvalStmt:
|
|
return ng.execEvalStmt(ctx, q, s)
|
|
case testStmt:
|
|
return nil, nil, s(ctx)
|
|
}
|
|
|
|
panic(errors.Errorf("promql.Engine.exec: unhandled statement of type %T", q.Statement()))
|
|
}
|
|
|
|
func timeMilliseconds(t time.Time) int64 {
|
|
return t.UnixNano() / int64(time.Millisecond/time.Nanosecond)
|
|
}
|
|
|
|
func durationMilliseconds(d time.Duration) int64 {
|
|
return int64(d / (time.Millisecond / time.Nanosecond))
|
|
}
|
|
|
|
// execEvalStmt evaluates the expression of an evaluation statement for the given time range.
|
|
func (ng *Engine) execEvalStmt(ctx context.Context, query *query, s *EvalStmt) (Value, storage.Warnings, error) {
|
|
prepareSpanTimer, ctxPrepare := query.stats.GetSpanTimer(ctx, stats.QueryPreparationTime, ng.metrics.queryPrepareTime)
|
|
querier, warnings, err := ng.populateSeries(ctxPrepare, query.queryable, s)
|
|
prepareSpanTimer.Finish()
|
|
|
|
// XXX(fabxc): the querier returned by populateSeries might be instantiated
|
|
// we must not return without closing irrespective of the error.
|
|
// TODO: make this semantically saner.
|
|
if querier != nil {
|
|
defer querier.Close()
|
|
}
|
|
|
|
if err != nil {
|
|
return nil, warnings, err
|
|
}
|
|
|
|
evalSpanTimer, ctxInnerEval := query.stats.GetSpanTimer(ctx, stats.InnerEvalTime, ng.metrics.queryInnerEval)
|
|
// Instant evaluation. This is executed as a range evaluation with one step.
|
|
if s.Start == s.End && s.Interval == 0 {
|
|
start := timeMilliseconds(s.Start)
|
|
evaluator := &evaluator{
|
|
startTimestamp: start,
|
|
endTimestamp: start,
|
|
interval: 1,
|
|
ctx: ctxInnerEval,
|
|
maxSamples: ng.maxSamplesPerQuery,
|
|
defaultEvalInterval: GetDefaultEvaluationInterval(),
|
|
logger: ng.logger,
|
|
}
|
|
val, err := evaluator.Eval(s.Expr)
|
|
if err != nil {
|
|
return nil, warnings, err
|
|
}
|
|
|
|
evalSpanTimer.Finish()
|
|
|
|
mat, ok := val.(Matrix)
|
|
if !ok {
|
|
panic(errors.Errorf("promql.Engine.exec: invalid expression type %q", val.Type()))
|
|
}
|
|
query.matrix = mat
|
|
switch s.Expr.Type() {
|
|
case ValueTypeVector:
|
|
// Convert matrix with one value per series into vector.
|
|
vector := make(Vector, len(mat))
|
|
for i, s := range mat {
|
|
// Point might have a different timestamp, force it to the evaluation
|
|
// timestamp as that is when we ran the evaluation.
|
|
vector[i] = Sample{Metric: s.Metric, Point: Point{V: s.Points[0].V, T: start}}
|
|
}
|
|
return vector, warnings, nil
|
|
case ValueTypeScalar:
|
|
return Scalar{V: mat[0].Points[0].V, T: start}, warnings, nil
|
|
case ValueTypeMatrix:
|
|
return mat, warnings, nil
|
|
default:
|
|
panic(errors.Errorf("promql.Engine.exec: unexpected expression type %q", s.Expr.Type()))
|
|
}
|
|
|
|
}
|
|
|
|
// Range evaluation.
|
|
evaluator := &evaluator{
|
|
startTimestamp: timeMilliseconds(s.Start),
|
|
endTimestamp: timeMilliseconds(s.End),
|
|
interval: durationMilliseconds(s.Interval),
|
|
ctx: ctxInnerEval,
|
|
maxSamples: ng.maxSamplesPerQuery,
|
|
defaultEvalInterval: GetDefaultEvaluationInterval(),
|
|
logger: ng.logger,
|
|
}
|
|
val, err := evaluator.Eval(s.Expr)
|
|
if err != nil {
|
|
return nil, warnings, err
|
|
}
|
|
evalSpanTimer.Finish()
|
|
|
|
mat, ok := val.(Matrix)
|
|
if !ok {
|
|
panic(errors.Errorf("promql.Engine.exec: invalid expression type %q", val.Type()))
|
|
}
|
|
query.matrix = mat
|
|
|
|
if err := contextDone(ctx, "expression evaluation"); err != nil {
|
|
return nil, warnings, err
|
|
}
|
|
|
|
// TODO(fabxc): order ensured by storage?
|
|
// TODO(fabxc): where to ensure metric labels are a copy from the storage internals.
|
|
sortSpanTimer, _ := query.stats.GetSpanTimer(ctx, stats.ResultSortTime, ng.metrics.queryResultSort)
|
|
sort.Sort(mat)
|
|
sortSpanTimer.Finish()
|
|
|
|
return mat, warnings, nil
|
|
}
|
|
|
|
// cumulativeSubqueryOffset returns the sum of range and offset of all subqueries in the path.
|
|
func (ng *Engine) cumulativeSubqueryOffset(path []Node) time.Duration {
|
|
var subqOffset time.Duration
|
|
for _, node := range path {
|
|
switch n := node.(type) {
|
|
case *SubqueryExpr:
|
|
subqOffset += n.Range + n.Offset
|
|
}
|
|
}
|
|
return subqOffset
|
|
}
|
|
|
|
func (ng *Engine) populateSeries(ctx context.Context, q storage.Queryable, s *EvalStmt) (storage.Querier, storage.Warnings, error) {
|
|
var maxOffset time.Duration
|
|
Inspect(s.Expr, func(node Node, path []Node) error {
|
|
subqOffset := ng.cumulativeSubqueryOffset(path)
|
|
switch n := node.(type) {
|
|
case *VectorSelector:
|
|
if maxOffset < LookbackDelta+subqOffset {
|
|
maxOffset = LookbackDelta + subqOffset
|
|
}
|
|
if n.Offset+LookbackDelta+subqOffset > maxOffset {
|
|
maxOffset = n.Offset + LookbackDelta + subqOffset
|
|
}
|
|
case *MatrixSelector:
|
|
if maxOffset < n.Range+subqOffset {
|
|
maxOffset = n.Range + subqOffset
|
|
}
|
|
if n.Offset+n.Range+subqOffset > maxOffset {
|
|
maxOffset = n.Offset + n.Range + subqOffset
|
|
}
|
|
}
|
|
return nil
|
|
})
|
|
|
|
mint := s.Start.Add(-maxOffset)
|
|
|
|
querier, err := q.Querier(ctx, timestamp.FromTime(mint), timestamp.FromTime(s.End))
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
|
|
var warnings storage.Warnings
|
|
|
|
Inspect(s.Expr, func(node Node, path []Node) error {
|
|
var set storage.SeriesSet
|
|
var wrn storage.Warnings
|
|
params := &storage.SelectParams{
|
|
Start: timestamp.FromTime(s.Start),
|
|
End: timestamp.FromTime(s.End),
|
|
Step: durationToInt64Millis(s.Interval),
|
|
}
|
|
|
|
// We need to make sure we select the timerange selected by the subquery.
|
|
// TODO(gouthamve): cumulativeSubqueryOffset gives the sum of range and the offset
|
|
// we can optimise it by separating out the range and offsets, and subtracting the offsets
|
|
// from end also.
|
|
subqOffset := ng.cumulativeSubqueryOffset(path)
|
|
offsetMilliseconds := durationMilliseconds(subqOffset)
|
|
params.Start = params.Start - offsetMilliseconds
|
|
|
|
switch n := node.(type) {
|
|
case *VectorSelector:
|
|
params.Start = params.Start - durationMilliseconds(LookbackDelta)
|
|
params.Func = extractFuncFromPath(path)
|
|
if n.Offset > 0 {
|
|
offsetMilliseconds := durationMilliseconds(n.Offset)
|
|
params.Start = params.Start - offsetMilliseconds
|
|
params.End = params.End - offsetMilliseconds
|
|
}
|
|
|
|
set, wrn, err = querier.Select(params, n.LabelMatchers...)
|
|
warnings = append(warnings, wrn...)
|
|
if err != nil {
|
|
level.Error(ng.logger).Log("msg", "error selecting series set", "err", err)
|
|
return err
|
|
}
|
|
n.unexpandedSeriesSet = set
|
|
|
|
case *MatrixSelector:
|
|
params.Func = extractFuncFromPath(path)
|
|
// For all matrix queries we want to ensure that we have (end-start) + range selected
|
|
// this way we have `range` data before the start time
|
|
params.Start = params.Start - durationMilliseconds(n.Range)
|
|
if n.Offset > 0 {
|
|
offsetMilliseconds := durationMilliseconds(n.Offset)
|
|
params.Start = params.Start - offsetMilliseconds
|
|
params.End = params.End - offsetMilliseconds
|
|
}
|
|
|
|
set, wrn, err = querier.Select(params, n.LabelMatchers...)
|
|
warnings = append(warnings, wrn...)
|
|
if err != nil {
|
|
level.Error(ng.logger).Log("msg", "error selecting series set", "err", err)
|
|
return err
|
|
}
|
|
n.unexpandedSeriesSet = set
|
|
}
|
|
return nil
|
|
})
|
|
return querier, warnings, err
|
|
}
|
|
|
|
// extractFuncFromPath walks up the path and searches for the first instance of
|
|
// a function or aggregation.
|
|
func extractFuncFromPath(p []Node) string {
|
|
if len(p) == 0 {
|
|
return ""
|
|
}
|
|
switch n := p[len(p)-1].(type) {
|
|
case *AggregateExpr:
|
|
return n.Op.String()
|
|
case *Call:
|
|
return n.Func.Name
|
|
case *BinaryExpr:
|
|
// If we hit a binary expression we terminate since we only care about functions
|
|
// or aggregations over a single metric.
|
|
return ""
|
|
}
|
|
return extractFuncFromPath(p[:len(p)-1])
|
|
}
|
|
|
|
func checkForSeriesSetExpansion(ctx context.Context, expr Expr) {
|
|
switch e := expr.(type) {
|
|
case *MatrixSelector:
|
|
if e.series == nil {
|
|
series, err := expandSeriesSet(ctx, e.unexpandedSeriesSet)
|
|
if err != nil {
|
|
panic(err)
|
|
} else {
|
|
e.series = series
|
|
}
|
|
}
|
|
case *VectorSelector:
|
|
if e.series == nil {
|
|
series, err := expandSeriesSet(ctx, e.unexpandedSeriesSet)
|
|
if err != nil {
|
|
panic(err)
|
|
} else {
|
|
e.series = series
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func expandSeriesSet(ctx context.Context, it storage.SeriesSet) (res []storage.Series, err error) {
|
|
for it.Next() {
|
|
select {
|
|
case <-ctx.Done():
|
|
return nil, ctx.Err()
|
|
default:
|
|
}
|
|
res = append(res, it.At())
|
|
}
|
|
return res, it.Err()
|
|
}
|
|
|
|
// An evaluator evaluates given expressions over given fixed timestamps. It
|
|
// is attached to an engine through which it connects to a querier and reports
|
|
// errors. On timeout or cancellation of its context it terminates.
|
|
type evaluator struct {
|
|
ctx context.Context
|
|
|
|
startTimestamp int64 // Start time in milliseconds.
|
|
endTimestamp int64 // End time in milliseconds.
|
|
interval int64 // Interval in milliseconds.
|
|
|
|
maxSamples int
|
|
currentSamples int
|
|
defaultEvalInterval int64
|
|
logger log.Logger
|
|
}
|
|
|
|
// errorf causes a panic with the input formatted into an error.
|
|
func (ev *evaluator) errorf(format string, args ...interface{}) {
|
|
ev.error(errors.Errorf(format, args...))
|
|
}
|
|
|
|
// error causes a panic with the given error.
|
|
func (ev *evaluator) error(err error) {
|
|
panic(err)
|
|
}
|
|
|
|
// recover is the handler that turns panics into returns from the top level of evaluation.
|
|
func (ev *evaluator) recover(errp *error) {
|
|
e := recover()
|
|
if e == nil {
|
|
return
|
|
}
|
|
if err, ok := e.(runtime.Error); ok {
|
|
// Print the stack trace but do not inhibit the running application.
|
|
buf := make([]byte, 64<<10)
|
|
buf = buf[:runtime.Stack(buf, false)]
|
|
|
|
level.Error(ev.logger).Log("msg", "runtime panic in parser", "err", e, "stacktrace", string(buf))
|
|
*errp = errors.Wrap(err, "unexpected error")
|
|
} else {
|
|
*errp = e.(error)
|
|
}
|
|
}
|
|
|
|
func (ev *evaluator) Eval(expr Expr) (v Value, err error) {
|
|
defer ev.recover(&err)
|
|
return ev.eval(expr), nil
|
|
}
|
|
|
|
// EvalNodeHelper stores extra information and caches for evaluating a single node across steps.
|
|
type EvalNodeHelper struct {
|
|
// Evaluation timestamp.
|
|
ts int64
|
|
// Vector that can be used for output.
|
|
out Vector
|
|
|
|
// Caches.
|
|
// dropMetricName and label_*.
|
|
dmn map[uint64]labels.Labels
|
|
// signatureFunc.
|
|
sigf map[uint64]uint64
|
|
// funcHistogramQuantile.
|
|
signatureToMetricWithBuckets map[uint64]*metricWithBuckets
|
|
// label_replace.
|
|
regex *regexp.Regexp
|
|
|
|
// For binary vector matching.
|
|
rightSigs map[uint64]Sample
|
|
matchedSigs map[uint64]map[uint64]struct{}
|
|
resultMetric map[uint64]labels.Labels
|
|
}
|
|
|
|
// dropMetricName is a cached version of dropMetricName.
|
|
func (enh *EvalNodeHelper) dropMetricName(l labels.Labels) labels.Labels {
|
|
if enh.dmn == nil {
|
|
enh.dmn = make(map[uint64]labels.Labels, len(enh.out))
|
|
}
|
|
h := l.Hash()
|
|
ret, ok := enh.dmn[h]
|
|
if ok {
|
|
return ret
|
|
}
|
|
ret = dropMetricName(l)
|
|
enh.dmn[h] = ret
|
|
return ret
|
|
}
|
|
|
|
// signatureFunc is a cached version of signatureFunc.
|
|
func (enh *EvalNodeHelper) signatureFunc(on bool, names ...string) func(labels.Labels) uint64 {
|
|
if enh.sigf == nil {
|
|
enh.sigf = make(map[uint64]uint64, len(enh.out))
|
|
}
|
|
f := signatureFunc(on, names...)
|
|
return func(l labels.Labels) uint64 {
|
|
h := l.Hash()
|
|
ret, ok := enh.sigf[h]
|
|
if ok {
|
|
return ret
|
|
}
|
|
ret = f(l)
|
|
enh.sigf[h] = ret
|
|
return ret
|
|
}
|
|
}
|
|
|
|
// rangeEval evaluates the given expressions, and then for each step calls
|
|
// the given function with the values computed for each expression at that
|
|
// step. The return value is the combination into time series of all the
|
|
// function call results.
|
|
func (ev *evaluator) rangeEval(f func([]Value, *EvalNodeHelper) Vector, exprs ...Expr) Matrix {
|
|
numSteps := int((ev.endTimestamp-ev.startTimestamp)/ev.interval) + 1
|
|
matrixes := make([]Matrix, len(exprs))
|
|
origMatrixes := make([]Matrix, len(exprs))
|
|
originalNumSamples := ev.currentSamples
|
|
|
|
for i, e := range exprs {
|
|
// Functions will take string arguments from the expressions, not the values.
|
|
if e != nil && e.Type() != ValueTypeString {
|
|
// ev.currentSamples will be updated to the correct value within the ev.eval call.
|
|
matrixes[i] = ev.eval(e).(Matrix)
|
|
|
|
// Keep a copy of the original point slices so that they
|
|
// can be returned to the pool.
|
|
origMatrixes[i] = make(Matrix, len(matrixes[i]))
|
|
copy(origMatrixes[i], matrixes[i])
|
|
}
|
|
}
|
|
|
|
vectors := make([]Vector, len(exprs)) // Input vectors for the function.
|
|
args := make([]Value, len(exprs)) // Argument to function.
|
|
// Create an output vector that is as big as the input matrix with
|
|
// the most time series.
|
|
biggestLen := 1
|
|
for i := range exprs {
|
|
vectors[i] = make(Vector, 0, len(matrixes[i]))
|
|
if len(matrixes[i]) > biggestLen {
|
|
biggestLen = len(matrixes[i])
|
|
}
|
|
}
|
|
enh := &EvalNodeHelper{out: make(Vector, 0, biggestLen)}
|
|
seriess := make(map[uint64]Series, biggestLen) // Output series by series hash.
|
|
tempNumSamples := ev.currentSamples
|
|
for ts := ev.startTimestamp; ts <= ev.endTimestamp; ts += ev.interval {
|
|
if err := contextDone(ev.ctx, "expression evaluation"); err != nil {
|
|
ev.error(err)
|
|
}
|
|
// Reset number of samples in memory after each timestamp.
|
|
ev.currentSamples = tempNumSamples
|
|
// Gather input vectors for this timestamp.
|
|
for i := range exprs {
|
|
vectors[i] = vectors[i][:0]
|
|
for si, series := range matrixes[i] {
|
|
for _, point := range series.Points {
|
|
if point.T == ts {
|
|
if ev.currentSamples < ev.maxSamples {
|
|
vectors[i] = append(vectors[i], Sample{Metric: series.Metric, Point: point})
|
|
// Move input vectors forward so we don't have to re-scan the same
|
|
// past points at the next step.
|
|
matrixes[i][si].Points = series.Points[1:]
|
|
ev.currentSamples++
|
|
} else {
|
|
ev.error(ErrTooManySamples(env))
|
|
}
|
|
}
|
|
break
|
|
}
|
|
}
|
|
args[i] = vectors[i]
|
|
}
|
|
// Make the function call.
|
|
enh.ts = ts
|
|
result := f(args, enh)
|
|
if result.ContainsSameLabelset() {
|
|
ev.errorf("vector cannot contain metrics with the same labelset")
|
|
}
|
|
enh.out = result[:0] // Reuse result vector.
|
|
|
|
ev.currentSamples += len(result)
|
|
// When we reset currentSamples to tempNumSamples during the next iteration of the loop it also
|
|
// needs to include the samples from the result here, as they're still in memory.
|
|
tempNumSamples += len(result)
|
|
|
|
if ev.currentSamples > ev.maxSamples {
|
|
ev.error(ErrTooManySamples(env))
|
|
}
|
|
|
|
// If this could be an instant query, shortcut so as not to change sort order.
|
|
if ev.endTimestamp == ev.startTimestamp {
|
|
mat := make(Matrix, len(result))
|
|
for i, s := range result {
|
|
s.Point.T = ts
|
|
mat[i] = Series{Metric: s.Metric, Points: []Point{s.Point}}
|
|
}
|
|
ev.currentSamples = originalNumSamples + mat.TotalSamples()
|
|
return mat
|
|
}
|
|
|
|
// Add samples in output vector to output series.
|
|
for _, sample := range result {
|
|
h := sample.Metric.Hash()
|
|
ss, ok := seriess[h]
|
|
if !ok {
|
|
ss = Series{
|
|
Metric: sample.Metric,
|
|
Points: getPointSlice(numSteps),
|
|
}
|
|
}
|
|
sample.Point.T = ts
|
|
ss.Points = append(ss.Points, sample.Point)
|
|
seriess[h] = ss
|
|
|
|
}
|
|
}
|
|
|
|
// Reuse the original point slices.
|
|
for _, m := range origMatrixes {
|
|
for _, s := range m {
|
|
putPointSlice(s.Points)
|
|
}
|
|
}
|
|
// Assemble the output matrix. By the time we get here we know we don't have too many samples.
|
|
mat := make(Matrix, 0, len(seriess))
|
|
for _, ss := range seriess {
|
|
mat = append(mat, ss)
|
|
}
|
|
ev.currentSamples = originalNumSamples + mat.TotalSamples()
|
|
return mat
|
|
}
|
|
|
|
// evalSubquery evaluates given SubqueryExpr and returns an equivalent
|
|
// evaluated MatrixSelector in its place. Note that the Name and LabelMatchers are not set.
|
|
func (ev *evaluator) evalSubquery(subq *SubqueryExpr) *MatrixSelector {
|
|
val := ev.eval(subq).(Matrix)
|
|
ms := &MatrixSelector{
|
|
Range: subq.Range,
|
|
Offset: subq.Offset,
|
|
series: make([]storage.Series, 0, len(val)),
|
|
}
|
|
for _, s := range val {
|
|
ms.series = append(ms.series, NewStorageSeries(s))
|
|
}
|
|
return ms
|
|
}
|
|
|
|
// eval evaluates the given expression as the given AST expression node requires.
|
|
func (ev *evaluator) eval(expr Expr) Value {
|
|
// This is the top-level evaluation method.
|
|
// Thus, we check for timeout/cancellation here.
|
|
if err := contextDone(ev.ctx, "expression evaluation"); err != nil {
|
|
ev.error(err)
|
|
}
|
|
numSteps := int((ev.endTimestamp-ev.startTimestamp)/ev.interval) + 1
|
|
|
|
switch e := expr.(type) {
|
|
case *AggregateExpr:
|
|
if s, ok := e.Param.(*StringLiteral); ok {
|
|
return ev.rangeEval(func(v []Value, enh *EvalNodeHelper) Vector {
|
|
return ev.aggregation(e.Op, e.Grouping, e.Without, s.Val, v[0].(Vector), enh)
|
|
}, e.Expr)
|
|
}
|
|
return ev.rangeEval(func(v []Value, enh *EvalNodeHelper) Vector {
|
|
var param float64
|
|
if e.Param != nil {
|
|
param = v[0].(Vector)[0].V
|
|
}
|
|
return ev.aggregation(e.Op, e.Grouping, e.Without, param, v[1].(Vector), enh)
|
|
}, e.Param, e.Expr)
|
|
|
|
case *Call:
|
|
if e.Func.Name == "timestamp" {
|
|
// Matrix evaluation always returns the evaluation time,
|
|
// so this function needs special handling when given
|
|
// a vector selector.
|
|
vs, ok := e.Args[0].(*VectorSelector)
|
|
if ok {
|
|
return ev.rangeEval(func(v []Value, enh *EvalNodeHelper) Vector {
|
|
return e.Func.Call([]Value{ev.vectorSelector(vs, enh.ts)}, e.Args, enh)
|
|
})
|
|
}
|
|
}
|
|
|
|
// Check if the function has a matrix argument.
|
|
var matrixArgIndex int
|
|
var matrixArg bool
|
|
for i, a := range e.Args {
|
|
if _, ok := a.(*MatrixSelector); ok {
|
|
matrixArgIndex = i
|
|
matrixArg = true
|
|
break
|
|
}
|
|
// SubqueryExpr can be used in place of MatrixSelector.
|
|
if subq, ok := a.(*SubqueryExpr); ok {
|
|
matrixArgIndex = i
|
|
matrixArg = true
|
|
// Replacing SubqueryExpr with MatrixSelector.
|
|
e.Args[i] = ev.evalSubquery(subq)
|
|
break
|
|
}
|
|
}
|
|
if !matrixArg {
|
|
// Does not have a matrix argument.
|
|
return ev.rangeEval(func(v []Value, enh *EvalNodeHelper) Vector {
|
|
return e.Func.Call(v, e.Args, enh)
|
|
}, e.Args...)
|
|
}
|
|
|
|
inArgs := make([]Value, len(e.Args))
|
|
// Evaluate any non-matrix arguments.
|
|
otherArgs := make([]Matrix, len(e.Args))
|
|
otherInArgs := make([]Vector, len(e.Args))
|
|
for i, e := range e.Args {
|
|
if i != matrixArgIndex {
|
|
otherArgs[i] = ev.eval(e).(Matrix)
|
|
otherInArgs[i] = Vector{Sample{}}
|
|
inArgs[i] = otherInArgs[i]
|
|
}
|
|
}
|
|
|
|
sel := e.Args[matrixArgIndex].(*MatrixSelector)
|
|
checkForSeriesSetExpansion(ev.ctx, sel)
|
|
mat := make(Matrix, 0, len(sel.series)) // Output matrix.
|
|
offset := durationMilliseconds(sel.Offset)
|
|
selRange := durationMilliseconds(sel.Range)
|
|
stepRange := selRange
|
|
if stepRange > ev.interval {
|
|
stepRange = ev.interval
|
|
}
|
|
// Reuse objects across steps to save memory allocations.
|
|
points := getPointSlice(16)
|
|
inMatrix := make(Matrix, 1)
|
|
inArgs[matrixArgIndex] = inMatrix
|
|
enh := &EvalNodeHelper{out: make(Vector, 0, 1)}
|
|
// Process all the calls for one time series at a time.
|
|
it := storage.NewBuffer(selRange)
|
|
for i, s := range sel.series {
|
|
points = points[:0]
|
|
it.Reset(s.Iterator())
|
|
ss := Series{
|
|
// For all range vector functions, the only change to the
|
|
// output labels is dropping the metric name so just do
|
|
// it once here.
|
|
Metric: dropMetricName(sel.series[i].Labels()),
|
|
Points: getPointSlice(numSteps),
|
|
}
|
|
inMatrix[0].Metric = sel.series[i].Labels()
|
|
for ts, step := ev.startTimestamp, -1; ts <= ev.endTimestamp; ts += ev.interval {
|
|
step++
|
|
// Set the non-matrix arguments.
|
|
// They are scalar, so it is safe to use the step number
|
|
// when looking up the argument, as there will be no gaps.
|
|
for j := range e.Args {
|
|
if j != matrixArgIndex {
|
|
otherInArgs[j][0].V = otherArgs[j][0].Points[step].V
|
|
}
|
|
}
|
|
maxt := ts - offset
|
|
mint := maxt - selRange
|
|
// Evaluate the matrix selector for this series for this step.
|
|
points = ev.matrixIterSlice(it, mint, maxt, points)
|
|
if len(points) == 0 {
|
|
continue
|
|
}
|
|
inMatrix[0].Points = points
|
|
enh.ts = ts
|
|
// Make the function call.
|
|
outVec := e.Func.Call(inArgs, e.Args, enh)
|
|
enh.out = outVec[:0]
|
|
if len(outVec) > 0 {
|
|
ss.Points = append(ss.Points, Point{V: outVec[0].Point.V, T: ts})
|
|
}
|
|
// Only buffer stepRange milliseconds from the second step on.
|
|
it.ReduceDelta(stepRange)
|
|
}
|
|
if len(ss.Points) > 0 {
|
|
if ev.currentSamples < ev.maxSamples {
|
|
mat = append(mat, ss)
|
|
ev.currentSamples += len(ss.Points)
|
|
} else {
|
|
ev.error(ErrTooManySamples(env))
|
|
}
|
|
}
|
|
}
|
|
if mat.ContainsSameLabelset() {
|
|
ev.errorf("vector cannot contain metrics with the same labelset")
|
|
}
|
|
|
|
putPointSlice(points)
|
|
return mat
|
|
|
|
case *ParenExpr:
|
|
return ev.eval(e.Expr)
|
|
|
|
case *UnaryExpr:
|
|
mat := ev.eval(e.Expr).(Matrix)
|
|
if e.Op == ItemSUB {
|
|
for i := range mat {
|
|
mat[i].Metric = dropMetricName(mat[i].Metric)
|
|
for j := range mat[i].Points {
|
|
mat[i].Points[j].V = -mat[i].Points[j].V
|
|
}
|
|
}
|
|
if mat.ContainsSameLabelset() {
|
|
ev.errorf("vector cannot contain metrics with the same labelset")
|
|
}
|
|
}
|
|
return mat
|
|
|
|
case *BinaryExpr:
|
|
switch lt, rt := e.LHS.Type(), e.RHS.Type(); {
|
|
case lt == ValueTypeScalar && rt == ValueTypeScalar:
|
|
return ev.rangeEval(func(v []Value, enh *EvalNodeHelper) Vector {
|
|
val := scalarBinop(e.Op, v[0].(Vector)[0].Point.V, v[1].(Vector)[0].Point.V)
|
|
return append(enh.out, Sample{Point: Point{V: val}})
|
|
}, e.LHS, e.RHS)
|
|
case lt == ValueTypeVector && rt == ValueTypeVector:
|
|
switch e.Op {
|
|
case ItemLAND:
|
|
return ev.rangeEval(func(v []Value, enh *EvalNodeHelper) Vector {
|
|
return ev.VectorAnd(v[0].(Vector), v[1].(Vector), e.VectorMatching, enh)
|
|
}, e.LHS, e.RHS)
|
|
case ItemLOR:
|
|
return ev.rangeEval(func(v []Value, enh *EvalNodeHelper) Vector {
|
|
return ev.VectorOr(v[0].(Vector), v[1].(Vector), e.VectorMatching, enh)
|
|
}, e.LHS, e.RHS)
|
|
case ItemLUnless:
|
|
return ev.rangeEval(func(v []Value, enh *EvalNodeHelper) Vector {
|
|
return ev.VectorUnless(v[0].(Vector), v[1].(Vector), e.VectorMatching, enh)
|
|
}, e.LHS, e.RHS)
|
|
default:
|
|
return ev.rangeEval(func(v []Value, enh *EvalNodeHelper) Vector {
|
|
return ev.VectorBinop(e.Op, v[0].(Vector), v[1].(Vector), e.VectorMatching, e.ReturnBool, enh)
|
|
}, e.LHS, e.RHS)
|
|
}
|
|
|
|
case lt == ValueTypeVector && rt == ValueTypeScalar:
|
|
return ev.rangeEval(func(v []Value, enh *EvalNodeHelper) Vector {
|
|
return ev.VectorscalarBinop(e.Op, v[0].(Vector), Scalar{V: v[1].(Vector)[0].Point.V}, false, e.ReturnBool, enh)
|
|
}, e.LHS, e.RHS)
|
|
|
|
case lt == ValueTypeScalar && rt == ValueTypeVector:
|
|
return ev.rangeEval(func(v []Value, enh *EvalNodeHelper) Vector {
|
|
return ev.VectorscalarBinop(e.Op, v[1].(Vector), Scalar{V: v[0].(Vector)[0].Point.V}, true, e.ReturnBool, enh)
|
|
}, e.LHS, e.RHS)
|
|
}
|
|
|
|
case *NumberLiteral:
|
|
return ev.rangeEval(func(v []Value, enh *EvalNodeHelper) Vector {
|
|
return append(enh.out, Sample{Point: Point{V: e.Val}})
|
|
})
|
|
|
|
case *VectorSelector:
|
|
checkForSeriesSetExpansion(ev.ctx, e)
|
|
mat := make(Matrix, 0, len(e.series))
|
|
it := storage.NewBuffer(durationMilliseconds(LookbackDelta))
|
|
for i, s := range e.series {
|
|
it.Reset(s.Iterator())
|
|
ss := Series{
|
|
Metric: e.series[i].Labels(),
|
|
Points: getPointSlice(numSteps),
|
|
}
|
|
|
|
for ts := ev.startTimestamp; ts <= ev.endTimestamp; ts += ev.interval {
|
|
_, v, ok := ev.vectorSelectorSingle(it, e, ts)
|
|
if ok {
|
|
if ev.currentSamples < ev.maxSamples {
|
|
ss.Points = append(ss.Points, Point{V: v, T: ts})
|
|
ev.currentSamples++
|
|
} else {
|
|
ev.error(ErrTooManySamples(env))
|
|
}
|
|
}
|
|
}
|
|
|
|
if len(ss.Points) > 0 {
|
|
mat = append(mat, ss)
|
|
}
|
|
|
|
}
|
|
return mat
|
|
|
|
case *MatrixSelector:
|
|
if ev.startTimestamp != ev.endTimestamp {
|
|
panic(errors.New("cannot do range evaluation of matrix selector"))
|
|
}
|
|
return ev.matrixSelector(e)
|
|
|
|
case *SubqueryExpr:
|
|
offsetMillis := durationToInt64Millis(e.Offset)
|
|
rangeMillis := durationToInt64Millis(e.Range)
|
|
newEv := &evaluator{
|
|
endTimestamp: ev.endTimestamp - offsetMillis,
|
|
interval: ev.defaultEvalInterval,
|
|
ctx: ev.ctx,
|
|
currentSamples: ev.currentSamples,
|
|
maxSamples: ev.maxSamples,
|
|
defaultEvalInterval: ev.defaultEvalInterval,
|
|
logger: ev.logger,
|
|
}
|
|
|
|
if e.Step != 0 {
|
|
newEv.interval = durationToInt64Millis(e.Step)
|
|
}
|
|
|
|
// Start with the first timestamp after (ev.startTimestamp - offset - range)
|
|
// that is aligned with the step (multiple of 'newEv.interval').
|
|
newEv.startTimestamp = newEv.interval * ((ev.startTimestamp - offsetMillis - rangeMillis) / newEv.interval)
|
|
if newEv.startTimestamp < (ev.startTimestamp - offsetMillis - rangeMillis) {
|
|
newEv.startTimestamp += newEv.interval
|
|
}
|
|
|
|
res := newEv.eval(e.Expr)
|
|
ev.currentSamples = newEv.currentSamples
|
|
return res
|
|
}
|
|
|
|
panic(errors.Errorf("unhandled expression of type: %T", expr))
|
|
}
|
|
|
|
func durationToInt64Millis(d time.Duration) int64 {
|
|
return int64(d / time.Millisecond)
|
|
}
|
|
|
|
// vectorSelector evaluates a *VectorSelector expression.
|
|
func (ev *evaluator) vectorSelector(node *VectorSelector, ts int64) Vector {
|
|
checkForSeriesSetExpansion(ev.ctx, node)
|
|
|
|
var (
|
|
vec = make(Vector, 0, len(node.series))
|
|
)
|
|
|
|
it := storage.NewBuffer(durationMilliseconds(LookbackDelta))
|
|
for i, s := range node.series {
|
|
it.Reset(s.Iterator())
|
|
|
|
t, v, ok := ev.vectorSelectorSingle(it, node, ts)
|
|
if ok {
|
|
vec = append(vec, Sample{
|
|
Metric: node.series[i].Labels(),
|
|
Point: Point{V: v, T: t},
|
|
})
|
|
ev.currentSamples++
|
|
}
|
|
|
|
if ev.currentSamples >= ev.maxSamples {
|
|
ev.error(ErrTooManySamples(env))
|
|
}
|
|
}
|
|
return vec
|
|
}
|
|
|
|
// vectorSelectorSingle evaluates a instant vector for the iterator of one time series.
|
|
func (ev *evaluator) vectorSelectorSingle(it *storage.BufferedSeriesIterator, node *VectorSelector, ts int64) (int64, float64, bool) {
|
|
refTime := ts - durationMilliseconds(node.Offset)
|
|
var t int64
|
|
var v float64
|
|
|
|
ok := it.Seek(refTime)
|
|
if !ok {
|
|
if it.Err() != nil {
|
|
ev.error(it.Err())
|
|
}
|
|
}
|
|
|
|
if ok {
|
|
t, v = it.Values()
|
|
}
|
|
|
|
if !ok || t > refTime {
|
|
t, v, ok = it.PeekBack(1)
|
|
if !ok || t < refTime-durationMilliseconds(LookbackDelta) {
|
|
return 0, 0, false
|
|
}
|
|
}
|
|
if value.IsStaleNaN(v) {
|
|
return 0, 0, false
|
|
}
|
|
return t, v, true
|
|
}
|
|
|
|
var pointPool = sync.Pool{}
|
|
|
|
func getPointSlice(sz int) []Point {
|
|
p := pointPool.Get()
|
|
if p != nil {
|
|
return p.([]Point)
|
|
}
|
|
return make([]Point, 0, sz)
|
|
}
|
|
|
|
func putPointSlice(p []Point) {
|
|
//lint:ignore SA6002 relax staticcheck verification.
|
|
pointPool.Put(p[:0])
|
|
}
|
|
|
|
// matrixSelector evaluates a *MatrixSelector expression.
|
|
func (ev *evaluator) matrixSelector(node *MatrixSelector) Matrix {
|
|
checkForSeriesSetExpansion(ev.ctx, node)
|
|
|
|
var (
|
|
offset = durationMilliseconds(node.Offset)
|
|
maxt = ev.startTimestamp - offset
|
|
mint = maxt - durationMilliseconds(node.Range)
|
|
matrix = make(Matrix, 0, len(node.series))
|
|
)
|
|
|
|
it := storage.NewBuffer(durationMilliseconds(node.Range))
|
|
for i, s := range node.series {
|
|
if err := contextDone(ev.ctx, "expression evaluation"); err != nil {
|
|
ev.error(err)
|
|
}
|
|
it.Reset(s.Iterator())
|
|
ss := Series{
|
|
Metric: node.series[i].Labels(),
|
|
}
|
|
|
|
ss.Points = ev.matrixIterSlice(it, mint, maxt, getPointSlice(16))
|
|
|
|
if len(ss.Points) > 0 {
|
|
matrix = append(matrix, ss)
|
|
} else {
|
|
putPointSlice(ss.Points)
|
|
}
|
|
}
|
|
return matrix
|
|
}
|
|
|
|
// matrixIterSlice populates a matrix vector covering the requested range for a
|
|
// single time series, with points retrieved from an iterator.
|
|
//
|
|
// As an optimization, the matrix vector may already contain points of the same
|
|
// time series from the evaluation of an earlier step (with lower mint and maxt
|
|
// values). Any such points falling before mint are discarded; points that fall
|
|
// into the [mint, maxt] range are retained; only points with later timestamps
|
|
// are populated from the iterator.
|
|
func (ev *evaluator) matrixIterSlice(it *storage.BufferedSeriesIterator, mint, maxt int64, out []Point) []Point {
|
|
if len(out) > 0 && out[len(out)-1].T >= mint {
|
|
// There is an overlap between previous and current ranges, retain common
|
|
// points. In most such cases:
|
|
// (a) the overlap is significantly larger than the eval step; and/or
|
|
// (b) the number of samples is relatively small.
|
|
// so a linear search will be as fast as a binary search.
|
|
var drop int
|
|
for drop = 0; out[drop].T < mint; drop++ {
|
|
}
|
|
copy(out, out[drop:])
|
|
out = out[:len(out)-drop]
|
|
// Only append points with timestamps after the last timestamp we have.
|
|
mint = out[len(out)-1].T + 1
|
|
} else {
|
|
out = out[:0]
|
|
}
|
|
|
|
ok := it.Seek(maxt)
|
|
if !ok {
|
|
if it.Err() != nil {
|
|
ev.error(it.Err())
|
|
}
|
|
}
|
|
|
|
buf := it.Buffer()
|
|
for buf.Next() {
|
|
t, v := buf.At()
|
|
if value.IsStaleNaN(v) {
|
|
continue
|
|
}
|
|
// Values in the buffer are guaranteed to be smaller than maxt.
|
|
if t >= mint {
|
|
if ev.currentSamples >= ev.maxSamples {
|
|
ev.error(ErrTooManySamples(env))
|
|
}
|
|
out = append(out, Point{T: t, V: v})
|
|
ev.currentSamples++
|
|
}
|
|
}
|
|
// The seeked sample might also be in the range.
|
|
if ok {
|
|
t, v := it.Values()
|
|
if t == maxt && !value.IsStaleNaN(v) {
|
|
if ev.currentSamples >= ev.maxSamples {
|
|
ev.error(ErrTooManySamples(env))
|
|
}
|
|
out = append(out, Point{T: t, V: v})
|
|
ev.currentSamples++
|
|
}
|
|
}
|
|
return out
|
|
}
|
|
|
|
func (ev *evaluator) VectorAnd(lhs, rhs Vector, matching *VectorMatching, enh *EvalNodeHelper) Vector {
|
|
if matching.Card != CardManyToMany {
|
|
panic("set operations must only use many-to-many matching")
|
|
}
|
|
sigf := enh.signatureFunc(matching.On, matching.MatchingLabels...)
|
|
|
|
// The set of signatures for the right-hand side Vector.
|
|
rightSigs := map[uint64]struct{}{}
|
|
// Add all rhs samples to a map so we can easily find matches later.
|
|
for _, rs := range rhs {
|
|
rightSigs[sigf(rs.Metric)] = struct{}{}
|
|
}
|
|
|
|
for _, ls := range lhs {
|
|
// If there's a matching entry in the right-hand side Vector, add the sample.
|
|
if _, ok := rightSigs[sigf(ls.Metric)]; ok {
|
|
enh.out = append(enh.out, ls)
|
|
}
|
|
}
|
|
return enh.out
|
|
}
|
|
|
|
func (ev *evaluator) VectorOr(lhs, rhs Vector, matching *VectorMatching, enh *EvalNodeHelper) Vector {
|
|
if matching.Card != CardManyToMany {
|
|
panic("set operations must only use many-to-many matching")
|
|
}
|
|
sigf := enh.signatureFunc(matching.On, matching.MatchingLabels...)
|
|
|
|
leftSigs := map[uint64]struct{}{}
|
|
// Add everything from the left-hand-side Vector.
|
|
for _, ls := range lhs {
|
|
leftSigs[sigf(ls.Metric)] = struct{}{}
|
|
enh.out = append(enh.out, ls)
|
|
}
|
|
// Add all right-hand side elements which have not been added from the left-hand side.
|
|
for _, rs := range rhs {
|
|
if _, ok := leftSigs[sigf(rs.Metric)]; !ok {
|
|
enh.out = append(enh.out, rs)
|
|
}
|
|
}
|
|
return enh.out
|
|
}
|
|
|
|
func (ev *evaluator) VectorUnless(lhs, rhs Vector, matching *VectorMatching, enh *EvalNodeHelper) Vector {
|
|
if matching.Card != CardManyToMany {
|
|
panic("set operations must only use many-to-many matching")
|
|
}
|
|
sigf := enh.signatureFunc(matching.On, matching.MatchingLabels...)
|
|
|
|
rightSigs := map[uint64]struct{}{}
|
|
for _, rs := range rhs {
|
|
rightSigs[sigf(rs.Metric)] = struct{}{}
|
|
}
|
|
|
|
for _, ls := range lhs {
|
|
if _, ok := rightSigs[sigf(ls.Metric)]; !ok {
|
|
enh.out = append(enh.out, ls)
|
|
}
|
|
}
|
|
return enh.out
|
|
}
|
|
|
|
// VectorBinop evaluates a binary operation between two Vectors, excluding set operators.
|
|
func (ev *evaluator) VectorBinop(op ItemType, lhs, rhs Vector, matching *VectorMatching, returnBool bool, enh *EvalNodeHelper) Vector {
|
|
if matching.Card == CardManyToMany {
|
|
panic("many-to-many only allowed for set operators")
|
|
}
|
|
sigf := enh.signatureFunc(matching.On, matching.MatchingLabels...)
|
|
|
|
// The control flow below handles one-to-one or many-to-one matching.
|
|
// For one-to-many, swap sidedness and account for the swap when calculating
|
|
// values.
|
|
if matching.Card == CardOneToMany {
|
|
lhs, rhs = rhs, lhs
|
|
}
|
|
|
|
// All samples from the rhs hashed by the matching label/values.
|
|
if enh.rightSigs == nil {
|
|
enh.rightSigs = make(map[uint64]Sample, len(enh.out))
|
|
} else {
|
|
for k := range enh.rightSigs {
|
|
delete(enh.rightSigs, k)
|
|
}
|
|
}
|
|
rightSigs := enh.rightSigs
|
|
|
|
// Add all rhs samples to a map so we can easily find matches later.
|
|
for _, rs := range rhs {
|
|
sig := sigf(rs.Metric)
|
|
// The rhs is guaranteed to be the 'one' side. Having multiple samples
|
|
// with the same signature means that the matching is many-to-many.
|
|
if duplSample, found := rightSigs[sig]; found {
|
|
// oneSide represents which side of the vector represents the 'one' in the many-to-one relationship.
|
|
oneSide := "right"
|
|
if matching.Card == CardOneToMany {
|
|
oneSide = "left"
|
|
}
|
|
matchedLabels := rs.Metric.MatchLabels(matching.On, matching.MatchingLabels...)
|
|
// Many-to-many matching not allowed.
|
|
ev.errorf("found duplicate series for the match group %s on the %s hand-side of the operation: [%s, %s]"+
|
|
";many-to-many matching not allowed: matching labels must be unique on one side", matchedLabels.String(), oneSide, rs.Metric.String(), duplSample.Metric.String())
|
|
}
|
|
rightSigs[sig] = rs
|
|
}
|
|
|
|
// Tracks the match-signature. For one-to-one operations the value is nil. For many-to-one
|
|
// the value is a set of signatures to detect duplicated result elements.
|
|
if enh.matchedSigs == nil {
|
|
enh.matchedSigs = make(map[uint64]map[uint64]struct{}, len(rightSigs))
|
|
} else {
|
|
for k := range enh.matchedSigs {
|
|
delete(enh.matchedSigs, k)
|
|
}
|
|
}
|
|
matchedSigs := enh.matchedSigs
|
|
|
|
// For all lhs samples find a respective rhs sample and perform
|
|
// the binary operation.
|
|
for _, ls := range lhs {
|
|
sig := sigf(ls.Metric)
|
|
|
|
rs, found := rightSigs[sig] // Look for a match in the rhs Vector.
|
|
if !found {
|
|
continue
|
|
}
|
|
|
|
// Account for potentially swapped sidedness.
|
|
vl, vr := ls.V, rs.V
|
|
if matching.Card == CardOneToMany {
|
|
vl, vr = vr, vl
|
|
}
|
|
value, keep := vectorElemBinop(op, vl, vr)
|
|
if returnBool {
|
|
if keep {
|
|
value = 1.0
|
|
} else {
|
|
value = 0.0
|
|
}
|
|
} else if !keep {
|
|
continue
|
|
}
|
|
metric := resultMetric(ls.Metric, rs.Metric, op, matching, enh)
|
|
|
|
insertedSigs, exists := matchedSigs[sig]
|
|
if matching.Card == CardOneToOne {
|
|
if exists {
|
|
ev.errorf("multiple matches for labels: many-to-one matching must be explicit (group_left/group_right)")
|
|
}
|
|
matchedSigs[sig] = nil // Set existence to true.
|
|
} else {
|
|
// In many-to-one matching the grouping labels have to ensure a unique metric
|
|
// for the result Vector. Check whether those labels have already been added for
|
|
// the same matching labels.
|
|
insertSig := metric.Hash()
|
|
|
|
if !exists {
|
|
insertedSigs = map[uint64]struct{}{}
|
|
matchedSigs[sig] = insertedSigs
|
|
} else if _, duplicate := insertedSigs[insertSig]; duplicate {
|
|
ev.errorf("multiple matches for labels: grouping labels must ensure unique matches")
|
|
}
|
|
insertedSigs[insertSig] = struct{}{}
|
|
}
|
|
|
|
enh.out = append(enh.out, Sample{
|
|
Metric: metric,
|
|
Point: Point{V: value},
|
|
})
|
|
}
|
|
return enh.out
|
|
}
|
|
|
|
// signatureFunc returns a function that calculates the signature for a metric
|
|
// ignoring the provided labels. If on, then the given labels are only used instead.
|
|
func signatureFunc(on bool, names ...string) func(labels.Labels) uint64 {
|
|
sort.Strings(names)
|
|
if on {
|
|
return func(lset labels.Labels) uint64 {
|
|
h, _ := lset.HashForLabels(make([]byte, 0, 1024), names...)
|
|
return h
|
|
}
|
|
}
|
|
return func(lset labels.Labels) uint64 {
|
|
h, _ := lset.HashWithoutLabels(make([]byte, 0, 1024), names...)
|
|
return h
|
|
}
|
|
}
|
|
|
|
// resultMetric returns the metric for the given sample(s) based on the Vector
|
|
// binary operation and the matching options.
|
|
func resultMetric(lhs, rhs labels.Labels, op ItemType, matching *VectorMatching, enh *EvalNodeHelper) labels.Labels {
|
|
if enh.resultMetric == nil {
|
|
enh.resultMetric = make(map[uint64]labels.Labels, len(enh.out))
|
|
}
|
|
// op and matching are always the same for a given node, so
|
|
// there's no need to include them in the hash key.
|
|
// If the lhs and rhs are the same then the xor would be 0,
|
|
// so add in one side to protect against that.
|
|
lh := lhs.Hash()
|
|
h := (lh ^ rhs.Hash()) + lh
|
|
if ret, ok := enh.resultMetric[h]; ok {
|
|
return ret
|
|
}
|
|
|
|
lb := labels.NewBuilder(lhs)
|
|
|
|
if shouldDropMetricName(op) {
|
|
lb.Del(labels.MetricName)
|
|
}
|
|
|
|
if matching.Card == CardOneToOne {
|
|
if matching.On {
|
|
Outer:
|
|
for _, l := range lhs {
|
|
for _, n := range matching.MatchingLabels {
|
|
if l.Name == n {
|
|
continue Outer
|
|
}
|
|
}
|
|
lb.Del(l.Name)
|
|
}
|
|
} else {
|
|
lb.Del(matching.MatchingLabels...)
|
|
}
|
|
}
|
|
for _, ln := range matching.Include {
|
|
// Included labels from the `group_x` modifier are taken from the "one"-side.
|
|
if v := rhs.Get(ln); v != "" {
|
|
lb.Set(ln, v)
|
|
} else {
|
|
lb.Del(ln)
|
|
}
|
|
}
|
|
|
|
ret := lb.Labels()
|
|
enh.resultMetric[h] = ret
|
|
return ret
|
|
}
|
|
|
|
// VectorscalarBinop evaluates a binary operation between a Vector and a Scalar.
|
|
func (ev *evaluator) VectorscalarBinop(op ItemType, lhs Vector, rhs Scalar, swap, returnBool bool, enh *EvalNodeHelper) Vector {
|
|
for _, lhsSample := range lhs {
|
|
lv, rv := lhsSample.V, rhs.V
|
|
// lhs always contains the Vector. If the original position was different
|
|
// swap for calculating the value.
|
|
if swap {
|
|
lv, rv = rv, lv
|
|
}
|
|
value, keep := vectorElemBinop(op, lv, rv)
|
|
// Catch cases where the scalar is the LHS in a scalar-vector comparison operation.
|
|
// We want to always keep the vector element value as the output value, even if it's on the RHS.
|
|
if op.isComparisonOperator() && swap {
|
|
value = rv
|
|
}
|
|
if returnBool {
|
|
if keep {
|
|
value = 1.0
|
|
} else {
|
|
value = 0.0
|
|
}
|
|
keep = true
|
|
}
|
|
if keep {
|
|
lhsSample.V = value
|
|
if shouldDropMetricName(op) || returnBool {
|
|
lhsSample.Metric = enh.dropMetricName(lhsSample.Metric)
|
|
}
|
|
enh.out = append(enh.out, lhsSample)
|
|
}
|
|
}
|
|
return enh.out
|
|
}
|
|
|
|
func dropMetricName(l labels.Labels) labels.Labels {
|
|
return labels.NewBuilder(l).Del(labels.MetricName).Labels()
|
|
}
|
|
|
|
// scalarBinop evaluates a binary operation between two Scalars.
|
|
func scalarBinop(op ItemType, lhs, rhs float64) float64 {
|
|
switch op {
|
|
case ItemADD:
|
|
return lhs + rhs
|
|
case ItemSUB:
|
|
return lhs - rhs
|
|
case ItemMUL:
|
|
return lhs * rhs
|
|
case ItemDIV:
|
|
return lhs / rhs
|
|
case ItemPOW:
|
|
return math.Pow(lhs, rhs)
|
|
case ItemMOD:
|
|
return math.Mod(lhs, rhs)
|
|
case ItemEQL:
|
|
return btos(lhs == rhs)
|
|
case ItemNEQ:
|
|
return btos(lhs != rhs)
|
|
case ItemGTR:
|
|
return btos(lhs > rhs)
|
|
case ItemLSS:
|
|
return btos(lhs < rhs)
|
|
case ItemGTE:
|
|
return btos(lhs >= rhs)
|
|
case ItemLTE:
|
|
return btos(lhs <= rhs)
|
|
}
|
|
panic(errors.Errorf("operator %q not allowed for Scalar operations", op))
|
|
}
|
|
|
|
// vectorElemBinop evaluates a binary operation between two Vector elements.
|
|
func vectorElemBinop(op ItemType, lhs, rhs float64) (float64, bool) {
|
|
switch op {
|
|
case ItemADD:
|
|
return lhs + rhs, true
|
|
case ItemSUB:
|
|
return lhs - rhs, true
|
|
case ItemMUL:
|
|
return lhs * rhs, true
|
|
case ItemDIV:
|
|
return lhs / rhs, true
|
|
case ItemPOW:
|
|
return math.Pow(lhs, rhs), true
|
|
case ItemMOD:
|
|
return math.Mod(lhs, rhs), true
|
|
case ItemEQL:
|
|
return lhs, lhs == rhs
|
|
case ItemNEQ:
|
|
return lhs, lhs != rhs
|
|
case ItemGTR:
|
|
return lhs, lhs > rhs
|
|
case ItemLSS:
|
|
return lhs, lhs < rhs
|
|
case ItemGTE:
|
|
return lhs, lhs >= rhs
|
|
case ItemLTE:
|
|
return lhs, lhs <= rhs
|
|
}
|
|
panic(errors.Errorf("operator %q not allowed for operations between Vectors", op))
|
|
}
|
|
|
|
type groupedAggregation struct {
|
|
labels labels.Labels
|
|
value float64
|
|
mean float64
|
|
groupCount int
|
|
heap vectorByValueHeap
|
|
reverseHeap vectorByReverseValueHeap
|
|
}
|
|
|
|
// aggregation evaluates an aggregation operation on a Vector.
|
|
func (ev *evaluator) aggregation(op ItemType, grouping []string, without bool, param interface{}, vec Vector, enh *EvalNodeHelper) Vector {
|
|
|
|
result := map[uint64]*groupedAggregation{}
|
|
var k int64
|
|
if op == ItemTopK || op == ItemBottomK {
|
|
f := param.(float64)
|
|
if !convertibleToInt64(f) {
|
|
ev.errorf("Scalar value %v overflows int64", f)
|
|
}
|
|
k = int64(f)
|
|
if k < 1 {
|
|
return Vector{}
|
|
}
|
|
}
|
|
var q float64
|
|
if op == ItemQuantile {
|
|
q = param.(float64)
|
|
}
|
|
var valueLabel string
|
|
if op == ItemCountValues {
|
|
valueLabel = param.(string)
|
|
if !model.LabelName(valueLabel).IsValid() {
|
|
ev.errorf("invalid label name %q", valueLabel)
|
|
}
|
|
if !without {
|
|
grouping = append(grouping, valueLabel)
|
|
}
|
|
}
|
|
|
|
sort.Strings(grouping)
|
|
lb := labels.NewBuilder(nil)
|
|
buf := make([]byte, 0, 1024)
|
|
for _, s := range vec {
|
|
metric := s.Metric
|
|
|
|
if op == ItemCountValues {
|
|
lb.Reset(metric)
|
|
lb.Set(valueLabel, strconv.FormatFloat(s.V, 'f', -1, 64))
|
|
metric = lb.Labels()
|
|
}
|
|
|
|
var (
|
|
groupingKey uint64
|
|
)
|
|
if without {
|
|
groupingKey, buf = metric.HashWithoutLabels(buf, grouping...)
|
|
} else {
|
|
groupingKey, buf = metric.HashForLabels(buf, grouping...)
|
|
}
|
|
|
|
group, ok := result[groupingKey]
|
|
// Add a new group if it doesn't exist.
|
|
if !ok {
|
|
var m labels.Labels
|
|
|
|
if without {
|
|
lb.Reset(metric)
|
|
lb.Del(grouping...)
|
|
lb.Del(labels.MetricName)
|
|
m = lb.Labels()
|
|
} else {
|
|
m = make(labels.Labels, 0, len(grouping))
|
|
for _, l := range metric {
|
|
for _, n := range grouping {
|
|
if l.Name == n {
|
|
m = append(m, l)
|
|
break
|
|
}
|
|
}
|
|
}
|
|
sort.Sort(m)
|
|
}
|
|
result[groupingKey] = &groupedAggregation{
|
|
labels: m,
|
|
value: s.V,
|
|
mean: s.V,
|
|
groupCount: 1,
|
|
}
|
|
inputVecLen := int64(len(vec))
|
|
resultSize := k
|
|
if k > inputVecLen {
|
|
resultSize = inputVecLen
|
|
}
|
|
if op == ItemStdvar || op == ItemStddev {
|
|
result[groupingKey].value = 0.0
|
|
} else if op == ItemTopK || op == ItemQuantile {
|
|
result[groupingKey].heap = make(vectorByValueHeap, 0, resultSize)
|
|
heap.Push(&result[groupingKey].heap, &Sample{
|
|
Point: Point{V: s.V},
|
|
Metric: s.Metric,
|
|
})
|
|
} else if op == ItemBottomK {
|
|
result[groupingKey].reverseHeap = make(vectorByReverseValueHeap, 0, resultSize)
|
|
heap.Push(&result[groupingKey].reverseHeap, &Sample{
|
|
Point: Point{V: s.V},
|
|
Metric: s.Metric,
|
|
})
|
|
}
|
|
continue
|
|
}
|
|
|
|
switch op {
|
|
case ItemSum:
|
|
group.value += s.V
|
|
|
|
case ItemAvg:
|
|
group.groupCount++
|
|
group.mean += (s.V - group.mean) / float64(group.groupCount)
|
|
|
|
case ItemMax:
|
|
if group.value < s.V || math.IsNaN(group.value) {
|
|
group.value = s.V
|
|
}
|
|
|
|
case ItemMin:
|
|
if group.value > s.V || math.IsNaN(group.value) {
|
|
group.value = s.V
|
|
}
|
|
|
|
case ItemCount, ItemCountValues:
|
|
group.groupCount++
|
|
|
|
case ItemStdvar, ItemStddev:
|
|
group.groupCount++
|
|
delta := s.V - group.mean
|
|
group.mean += delta / float64(group.groupCount)
|
|
group.value += delta * (s.V - group.mean)
|
|
|
|
case ItemTopK:
|
|
if int64(len(group.heap)) < k || group.heap[0].V < s.V || math.IsNaN(group.heap[0].V) {
|
|
if int64(len(group.heap)) == k {
|
|
heap.Pop(&group.heap)
|
|
}
|
|
heap.Push(&group.heap, &Sample{
|
|
Point: Point{V: s.V},
|
|
Metric: s.Metric,
|
|
})
|
|
}
|
|
|
|
case ItemBottomK:
|
|
if int64(len(group.reverseHeap)) < k || group.reverseHeap[0].V > s.V || math.IsNaN(group.reverseHeap[0].V) {
|
|
if int64(len(group.reverseHeap)) == k {
|
|
heap.Pop(&group.reverseHeap)
|
|
}
|
|
heap.Push(&group.reverseHeap, &Sample{
|
|
Point: Point{V: s.V},
|
|
Metric: s.Metric,
|
|
})
|
|
}
|
|
|
|
case ItemQuantile:
|
|
group.heap = append(group.heap, s)
|
|
|
|
default:
|
|
panic(errors.Errorf("expected aggregation operator but got %q", op))
|
|
}
|
|
}
|
|
|
|
// Construct the result Vector from the aggregated groups.
|
|
for _, aggr := range result {
|
|
switch op {
|
|
case ItemAvg:
|
|
aggr.value = aggr.mean
|
|
|
|
case ItemCount, ItemCountValues:
|
|
aggr.value = float64(aggr.groupCount)
|
|
|
|
case ItemStdvar:
|
|
aggr.value = aggr.value / float64(aggr.groupCount)
|
|
|
|
case ItemStddev:
|
|
aggr.value = math.Sqrt(aggr.value / float64(aggr.groupCount))
|
|
|
|
case ItemTopK:
|
|
// The heap keeps the lowest value on top, so reverse it.
|
|
sort.Sort(sort.Reverse(aggr.heap))
|
|
for _, v := range aggr.heap {
|
|
enh.out = append(enh.out, Sample{
|
|
Metric: v.Metric,
|
|
Point: Point{V: v.V},
|
|
})
|
|
}
|
|
continue // Bypass default append.
|
|
|
|
case ItemBottomK:
|
|
// The heap keeps the lowest value on top, so reverse it.
|
|
sort.Sort(sort.Reverse(aggr.reverseHeap))
|
|
for _, v := range aggr.reverseHeap {
|
|
enh.out = append(enh.out, Sample{
|
|
Metric: v.Metric,
|
|
Point: Point{V: v.V},
|
|
})
|
|
}
|
|
continue // Bypass default append.
|
|
|
|
case ItemQuantile:
|
|
aggr.value = quantile(q, aggr.heap)
|
|
|
|
default:
|
|
// For other aggregations, we already have the right value.
|
|
}
|
|
|
|
enh.out = append(enh.out, Sample{
|
|
Metric: aggr.labels,
|
|
Point: Point{V: aggr.value},
|
|
})
|
|
}
|
|
return enh.out
|
|
}
|
|
|
|
// btos returns 1 if b is true, 0 otherwise.
|
|
func btos(b bool) float64 {
|
|
if b {
|
|
return 1
|
|
}
|
|
return 0
|
|
}
|
|
|
|
// shouldDropMetricName returns whether the metric name should be dropped in the
|
|
// result of the op operation.
|
|
func shouldDropMetricName(op ItemType) bool {
|
|
switch op {
|
|
case ItemADD, ItemSUB, ItemDIV, ItemMUL, ItemPOW, ItemMOD:
|
|
return true
|
|
default:
|
|
return false
|
|
}
|
|
}
|
|
|
|
// documentedType returns the internal type to the equivalent
|
|
// user facing terminology as defined in the documentation.
|
|
func documentedType(t ValueType) string {
|
|
switch t {
|
|
case "vector":
|
|
return "instant vector"
|
|
case "matrix":
|
|
return "range vector"
|
|
default:
|
|
return string(t)
|
|
}
|
|
}
|