'@ <timestamp>' modifier (#8121)

This commit adds `@ <timestamp>` modifier as per this design doc: https://docs.google.com/document/d/1uSbD3T2beM-iX4-Hp7V074bzBRiRNlqUdcWP6JTDQSs/edit.

An example query:

```
rate(process_cpu_seconds_total[1m]) 
  and
topk(7, rate(process_cpu_seconds_total[1h] @ 1234))
```

which ranks based on last 1h rate and w.r.t. unix timestamp 1234 but actually plots the 1m rate.

Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in>
This commit is contained in:
Ganesh Vernekar 2021-01-20 16:27:39 +05:30 committed by GitHub
parent b7fe028740
commit 9199fcb8d1
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
18 changed files with 3208 additions and 942 deletions

View file

@ -97,6 +97,53 @@ func init() {
}
}
type flagConfig struct {
configFile string
localStoragePath string
notifier notifier.Options
notifierTimeout model.Duration
forGracePeriod model.Duration
outageTolerance model.Duration
resendDelay model.Duration
web web.Options
tsdb tsdbOptions
lookbackDelta model.Duration
webTimeout model.Duration
queryTimeout model.Duration
queryConcurrency int
queryMaxSamples int
RemoteFlushDeadline model.Duration
featureList []string
// These options are extracted from featureList
// for ease of use.
enablePromQLAtModifier bool
prometheusURL string
corsRegexString string
promlogConfig promlog.Config
}
// setFeatureListOptions sets the corresponding options from the featureList.
func (c *flagConfig) setFeatureListOptions(logger log.Logger) error {
for _, f := range c.featureList {
opts := strings.Split(f, ",")
for _, o := range opts {
switch o {
case "promql-at-modifier":
c.enablePromQLAtModifier = true
case "":
continue
default:
level.Warn(logger).Log("msg", "Unknown option for --enable-feature", "option", o)
}
}
}
return nil
}
func main() {
if os.Getenv("DEBUG") != "" {
runtime.SetBlockProfileRate(20)
@ -108,29 +155,7 @@ func main() {
newFlagRetentionDuration model.Duration
)
cfg := struct {
configFile string
localStoragePath string
notifier notifier.Options
notifierTimeout model.Duration
forGracePeriod model.Duration
outageTolerance model.Duration
resendDelay model.Duration
web web.Options
tsdb tsdbOptions
lookbackDelta model.Duration
webTimeout model.Duration
queryTimeout model.Duration
queryConcurrency int
queryMaxSamples int
RemoteFlushDeadline model.Duration
prometheusURL string
corsRegexString string
promlogConfig promlog.Config
}{
cfg := flagConfig{
notifier: notifier.Options{
Registerer: prometheus.DefaultRegisterer,
},
@ -265,6 +290,9 @@ func main() {
a.Flag("query.max-samples", "Maximum number of samples a single query can load into memory. Note that queries will fail if they try to load more samples than this into memory, so this also limits the number of samples a query can return.").
Default("50000000").IntVar(&cfg.queryMaxSamples)
a.Flag("enable-feature", "Comma separated feature names to enable. Valid options: 'promql-at-modifier' to enable the @ modifier. See https://prometheus.io/docs/prometheus/latest/disabled_features/ for more details.").
Default("").StringsVar(&cfg.featureList)
promlogflag.AddFlags(a, &cfg.promlogConfig)
_, err := a.Parse(os.Args[1:])
@ -276,6 +304,11 @@ func main() {
logger := promlog.New(&cfg.promlogConfig)
if err := cfg.setFeatureListOptions(logger); err != nil {
fmt.Fprintln(os.Stderr, errors.Wrapf(err, "Error parsing feature list"))
os.Exit(1)
}
cfg.web.ExternalURL, err = computeExternalURL(cfg.prometheusURL, cfg.web.ListenAddress)
if err != nil {
fmt.Fprintln(os.Stderr, errors.Wrapf(err, "parse external URL %q", cfg.prometheusURL))
@ -400,6 +433,7 @@ func main() {
ActiveQueryTracker: promql.NewActiveQueryTracker(cfg.localStoragePath, cfg.queryConcurrency, log.With(logger, "component", "activeQueryTracker")),
LookbackDelta: time.Duration(cfg.lookbackDelta),
NoStepSubqueryIntervalFn: noStepSubqueryInterval.Get,
EnableAtModifier: cfg.enablePromQLAtModifier,
}
queryEngine = promql.NewEngine(opts)

19
docs/disabled_features.md Normal file
View file

@ -0,0 +1,19 @@
---
title: Disabled Features
sort_rank: 10
---
# Disabled Features
Here is a list of features that are disabled by default since they are breaking changes or are considered experimental.
Their behaviour can change in future releases which will be communicated via the [release changelog](https://github.com/prometheus/prometheus/blob/master/CHANGELOG.md).
You can enable them using the `--enable-feature` flag with a comma separated list of features.
They may be enabled by default in future versions.
## `@` Modifier in PromQL
`--enable-feature=promql-at-modifier`
This feature lets you specify the evaluation time for instant vector selectors,
range vector selectors, and subqueries. More details can be found [here](querying/basics.md#@-modifier).

View file

@ -204,11 +204,51 @@ The same works for range vectors. This returns the 5-minute rate that
rate(http_requests_total[5m] offset 1w)
### @ modifier
The `@` modifier allows changing the evaluation time for individual instant
and range vectors in a query. The time supplied to `@` modifier
is a unix timestamp and described with a float literal.
For example, the following expression returns the value of
`http_requests_total` at `2021-01-04T07:40:00+00:00`:
http_requests_total @ 1609746000
Note that the `@` modifier always needs to follow the selector
immediately, i.e. the following would be correct:
sum(http_requests_total{method="GET"} @ 1609746000) // GOOD.
While the following would be *incorrect*:
sum(http_requests_total{method="GET"}) @ 1609746000 // INVALID.
The same works for range vectors. This returns the 5-minute rate that
`http_requests_total` had at `2021-01-04T07:40:00+00:00`:
rate(http_requests_total[5m] @ 1609746000)
`@` modifier supports all representation of float literals described
above within the limits of `int64`. It can also be used along
with `offset` modifier where the offset is applied relative to the `@`
modifier time irrespective of which modifier is written first.
These 2 queries will produce the same result.
# offset after @
http_requests_total @ 1609746000 offset 5m
# offset before @
http_requests_total offset 5m @ 1609746000
This modifier is disabled by default since it breaks the invariant that PromQL
does not look ahead of the evaluation time for samples. It can be enabled by setting
`--enable-feature=promql-at-modifier` flag. It will be enabled by default in the future.
## Subquery
Subquery allows you to run an instant query for a given range and resolution. The result of a subquery is a range vector.
Syntax: `<instant_query> '[' <range> ':' [<resolution>] ']' [ offset <duration> ]`
Syntax: `<instant_query> '[' <range> ':' [<resolution>] ']' [ @ <float_literal> ] [ offset <duration> ]`
* `<resolution>` is optional. Default is the global evaluation interval.

View file

@ -13,7 +13,10 @@
package timestamp
import "time"
import (
"math"
"time"
)
// FromTime returns a new millisecond timestamp from a time.
func FromTime(t time.Time) int64 {
@ -24,3 +27,8 @@ func FromTime(t time.Time) int64 {
func Time(ts int64) time.Time {
return time.Unix(ts/1000, (ts%1000)*int64(time.Millisecond)).UTC()
}
// FromFloatSeconds returns a millisecond timestamp from float seconds.
func FromFloatSeconds(ts float64) int64 {
return int64(math.Round(ts * 1000))
}

View file

@ -208,6 +208,9 @@ type EngineOpts struct {
// NoStepSubqueryIntervalFn is the default evaluation interval of
// a subquery in milliseconds if no step in range vector was specified `[30m:<step>]`.
NoStepSubqueryIntervalFn func(rangeMillis int64) int64
// EnableAtModifier if true enables @ modifier. Disabled otherwise.
EnableAtModifier bool
}
// Engine handles the lifetime of queries from beginning to end.
@ -222,6 +225,7 @@ type Engine struct {
queryLoggerLock sync.RWMutex
lookbackDelta time.Duration
noStepSubqueryIntervalFn func(rangeMillis int64) int64
enableAtModifier bool
}
// NewEngine returns a new engine.
@ -302,6 +306,7 @@ func NewEngine(opts EngineOpts) *Engine {
activeQueryTracker: opts.ActiveQueryTracker,
lookbackDelta: opts.LookbackDelta,
noStepSubqueryIntervalFn: opts.NoStepSubqueryIntervalFn,
enableAtModifier: opts.EnableAtModifier,
}
}
@ -334,7 +339,10 @@ func (ng *Engine) NewInstantQuery(q storage.Queryable, qs string, ts time.Time)
if err != nil {
return nil, err
}
qry := ng.newQuery(q, expr, ts, ts, 0)
qry, err := ng.newQuery(q, expr, ts, ts, 0)
if err != nil {
return nil, err
}
qry.q = qs
return qry, nil
@ -350,15 +358,22 @@ func (ng *Engine) NewRangeQuery(q storage.Queryable, qs string, start, end time.
if expr.Type() != parser.ValueTypeVector && expr.Type() != parser.ValueTypeScalar {
return nil, errors.Errorf("invalid expression type %q for range query, must be Scalar or instant Vector", parser.DocumentedType(expr.Type()))
}
qry := ng.newQuery(q, expr, start, end, interval)
qry, err := ng.newQuery(q, expr, start, end, interval)
if err != nil {
return nil, err
}
qry.q = qs
return qry, nil
}
func (ng *Engine) newQuery(q storage.Queryable, expr parser.Expr, start, end time.Time, interval time.Duration) *query {
func (ng *Engine) newQuery(q storage.Queryable, expr parser.Expr, start, end time.Time, interval time.Duration) (*query, error) {
if err := ng.validateOpts(expr); err != nil {
return nil, err
}
es := &parser.EvalStmt{
Expr: expr,
Expr: WrapWithStepInvariantExpr(expr),
Start: start,
End: end,
Interval: interval,
@ -369,7 +384,39 @@ func (ng *Engine) newQuery(q storage.Queryable, expr parser.Expr, start, end tim
stats: stats.NewQueryTimers(),
queryable: q,
}
return qry
return qry, nil
}
func (ng *Engine) validateOpts(expr parser.Expr) error {
if ng.enableAtModifier {
return nil
}
var validationErr error
parser.Inspect(expr, func(node parser.Node, path []parser.Node) error {
switch n := node.(type) {
case *parser.VectorSelector:
if n.Timestamp != nil {
validationErr = errors.New("@ modifier is disabled")
return validationErr
}
case *parser.MatrixSelector:
if n.VectorSelector.(*parser.VectorSelector).Timestamp != nil {
validationErr = errors.New("@ modifier is disabled")
return validationErr
}
case *parser.SubqueryExpr:
if n.Timestamp != nil {
validationErr = errors.New("@ modifier is disabled")
return validationErr
}
}
return nil
})
return validationErr
}
func (ng *Engine) newTestQuery(f func(context.Context) error) Query {
@ -477,8 +524,8 @@ func durationMilliseconds(d time.Duration) int64 {
// execEvalStmt evaluates the expression of an evaluation statement for the given time range.
func (ng *Engine) execEvalStmt(ctx context.Context, query *query, s *parser.EvalStmt) (parser.Value, storage.Warnings, error) {
prepareSpanTimer, ctxPrepare := query.stats.GetSpanTimer(ctx, stats.QueryPreparationTime, ng.metrics.queryPrepareTime)
mint := ng.findMinTime(s)
querier, err := query.queryable.Querier(ctxPrepare, timestamp.FromTime(mint), timestamp.FromTime(s.End))
mint, maxt := ng.findMinMaxTime(s)
querier, err := query.queryable.Querier(ctxPrepare, mint, maxt)
if err != nil {
prepareSpanTimer.Finish()
return nil, nil, err
@ -488,6 +535,9 @@ func (ng *Engine) execEvalStmt(ctx context.Context, query *query, s *parser.Eval
ng.populateSeries(querier, s)
prepareSpanTimer.Finish()
// Modify the offset of vector and matrix selectors for the @ modifier
// w.r.t. the start time since only 1 evaluation will be done on them.
setOffsetForAtModifier(timeMilliseconds(s.Start), s.Expr)
evalSpanTimer, ctxInnerEval := query.stats.GetSpanTimer(ctx, stats.InnerEvalTime, ng.metrics.queryInnerEval)
// Instant evaluation. This is executed as a range evaluation with one step.
if s.Start == s.End && s.Interval == 0 {
@ -576,45 +626,102 @@ func (ng *Engine) execEvalStmt(ctx context.Context, query *query, s *parser.Eval
return mat, warnings, nil
}
// subqueryOffsetRange returns the sum of offsets and ranges of all subqueries in the path.
func (ng *Engine) subqueryOffsetRange(path []parser.Node) (time.Duration, time.Duration) {
// subqueryTimes returns the sum of offsets and ranges of all subqueries in the path.
// If the @ modifier is used, then the offset and range is w.r.t. that timestamp
// (i.e. the sum is reset when we have @ modifier).
// The returned *int64 is the closest timestamp that was seen. nil for no @ modifier.
func subqueryTimes(path []parser.Node) (time.Duration, time.Duration, *int64) {
var (
subqOffset time.Duration
subqRange time.Duration
subqOffset, subqRange time.Duration
ts int64 = math.MaxInt64
)
for _, node := range path {
switch n := node.(type) {
case *parser.SubqueryExpr:
subqOffset += n.Offset
subqOffset += n.OriginalOffset
subqRange += n.Range
if n.Timestamp != nil {
// The @ modifier on subquery invalidates all the offset and
// range till now. Hence resetting it here.
subqOffset = n.OriginalOffset
subqRange = n.Range
ts = *n.Timestamp
}
}
}
return subqOffset, subqRange
var tsp *int64
if ts != math.MaxInt64 {
tsp = &ts
}
return subqOffset, subqRange, tsp
}
func (ng *Engine) findMinTime(s *parser.EvalStmt) time.Time {
var maxOffset time.Duration
func (ng *Engine) findMinMaxTime(s *parser.EvalStmt) (int64, int64) {
var minTimestamp, maxTimestamp int64 = math.MaxInt64, math.MinInt64
// Whenever a MatrixSelector is evaluated, evalRange is set to the corresponding range.
// The evaluation of the VectorSelector inside then evaluates the given range and unsets
// the variable.
var evalRange time.Duration
parser.Inspect(s.Expr, func(node parser.Node, path []parser.Node) error {
subqOffset, subqRange := ng.subqueryOffsetRange(path)
switch n := node.(type) {
case *parser.VectorSelector:
if maxOffset < ng.lookbackDelta+subqOffset+subqRange {
maxOffset = ng.lookbackDelta + subqOffset + subqRange
start, end := ng.getTimeRangesForSelector(s, n, path, evalRange)
if start < minTimestamp {
minTimestamp = start
}
if n.Offset+ng.lookbackDelta+subqOffset+subqRange > maxOffset {
maxOffset = n.Offset + ng.lookbackDelta + subqOffset + subqRange
if end > maxTimestamp {
maxTimestamp = end
}
evalRange = 0
case *parser.MatrixSelector:
if maxOffset < n.Range+subqOffset+subqRange {
maxOffset = n.Range + subqOffset + subqRange
}
if m := n.VectorSelector.(*parser.VectorSelector).Offset + n.Range + subqOffset + subqRange; m > maxOffset {
maxOffset = m
}
evalRange = n.Range
}
return nil
})
return s.Start.Add(-maxOffset)
if maxTimestamp == math.MinInt64 {
// This happens when there was no selector. Hence no time range to select.
minTimestamp = 0
maxTimestamp = 0
}
return minTimestamp, maxTimestamp
}
func (ng *Engine) getTimeRangesForSelector(s *parser.EvalStmt, n *parser.VectorSelector, path []parser.Node, evalRange time.Duration) (int64, int64) {
start, end := timestamp.FromTime(s.Start), timestamp.FromTime(s.End)
subqOffset, subqRange, subqTs := subqueryTimes(path)
if subqTs != nil {
// The timestamp on the subquery overrides the eval statement time ranges.
start = *subqTs
end = *subqTs
}
if n.Timestamp != nil {
// The timestamp on the selector overrides everything.
start = *n.Timestamp
end = *n.Timestamp
} else {
offsetMilliseconds := durationMilliseconds(subqOffset)
start = start - offsetMilliseconds - durationMilliseconds(subqRange)
end = end - offsetMilliseconds
}
if evalRange == 0 {
start = start - durationMilliseconds(ng.lookbackDelta)
} else {
// For all matrix queries we want to ensure that we have (end-start) + range selected
// this way we have `range` data before the start time
start = start - durationMilliseconds(evalRange)
}
offsetMilliseconds := durationMilliseconds(n.OriginalOffset)
start = start - offsetMilliseconds
end = end - offsetMilliseconds
return start, end
}
func (ng *Engine) populateSeries(querier storage.Querier, s *parser.EvalStmt) {
@ -626,40 +733,18 @@ func (ng *Engine) populateSeries(querier storage.Querier, s *parser.EvalStmt) {
parser.Inspect(s.Expr, func(node parser.Node, path []parser.Node) error {
switch n := node.(type) {
case *parser.VectorSelector:
start, end := ng.getTimeRangesForSelector(s, n, path, evalRange)
hints := &storage.SelectHints{
Start: timestamp.FromTime(s.Start),
End: timestamp.FromTime(s.End),
Start: start,
End: end,
Step: durationMilliseconds(s.Interval),
Range: durationMilliseconds(evalRange),
Func: extractFuncFromPath(path),
}
// We need to make sure we select the timerange selected by the subquery.
// The subqueryOffsetRange function gives the sum of range and the
// sum of offset.
// TODO(bwplotka): Add support for better hints when subquerying. See: https://github.com/prometheus/prometheus/issues/7630.
subqOffset, subqRange := ng.subqueryOffsetRange(path)
offsetMilliseconds := durationMilliseconds(subqOffset)
hints.Start = hints.Start - offsetMilliseconds - durationMilliseconds(subqRange)
hints.End = hints.End - offsetMilliseconds
if evalRange == 0 {
hints.Start = hints.Start - durationMilliseconds(ng.lookbackDelta)
} else {
hints.Range = durationMilliseconds(evalRange)
// For all matrix queries we want to ensure that we have (end-start) + range selected
// this way we have `range` data before the start time
hints.Start = hints.Start - durationMilliseconds(evalRange)
evalRange = 0
}
hints.Func = extractFuncFromPath(path)
evalRange = 0
hints.By, hints.Grouping = extractGroupsFromPath(path)
if n.Offset > 0 {
offsetMilliseconds := durationMilliseconds(n.Offset)
hints.Start = hints.Start - offsetMilliseconds
hints.End = hints.End - offsetMilliseconds
}
n.UnexpandedSeriesSet = querier.Select(false, hints, n.LabelMatchers...)
case *parser.MatrixSelector:
evalRange = n.Range
}
@ -852,7 +937,7 @@ func (enh *EvalNodeHelper) signatureFunc(on bool, names ...string) func(labels.L
// the given function with the values computed for each expression at that
// step. The return value is the combination into time series of all the
// function call results.
func (ev *evaluator) rangeEval(f func([]parser.Value, *EvalNodeHelper) (Vector, storage.Warnings), exprs ...parser.Expr) (Matrix, storage.Warnings) {
func (ev *evaluator) rangeEval(funcCall func([]parser.Value, *EvalNodeHelper) (Vector, storage.Warnings), exprs ...parser.Expr) (Matrix, storage.Warnings) {
numSteps := int((ev.endTimestamp-ev.startTimestamp)/ev.interval) + 1
matrixes := make([]Matrix, len(exprs))
origMatrixes := make([]Matrix, len(exprs))
@ -917,7 +1002,7 @@ func (ev *evaluator) rangeEval(f func([]parser.Value, *EvalNodeHelper) (Vector,
}
// Make the function call.
enh.Ts = ts
result, ws := f(args, enh)
result, ws := funcCall(args, enh)
if result.ContainsSameLabelset() {
ev.errorf("vector cannot contain metrics with the same labelset")
}
@ -978,21 +1063,30 @@ func (ev *evaluator) rangeEval(f func([]parser.Value, *EvalNodeHelper) (Vector,
// evalSubquery evaluates given SubqueryExpr and returns an equivalent
// evaluated MatrixSelector in its place. Note that the Name and LabelMatchers are not set.
func (ev *evaluator) evalSubquery(subq *parser.SubqueryExpr) (*parser.MatrixSelector, storage.Warnings) {
func (ev *evaluator) evalSubquery(subq *parser.SubqueryExpr) (*parser.MatrixSelector, int, storage.Warnings) {
val, ws := ev.eval(subq)
mat := val.(Matrix)
vs := &parser.VectorSelector{
Offset: subq.Offset,
Series: make([]storage.Series, 0, len(mat)),
OriginalOffset: subq.OriginalOffset,
Offset: subq.Offset,
Series: make([]storage.Series, 0, len(mat)),
Timestamp: subq.Timestamp,
}
if subq.Timestamp != nil {
// The offset of subquery is not modified in case of @ modifier.
// Hence we take care of that here for the result.
vs.Offset = subq.OriginalOffset + time.Duration(ev.startTimestamp-*subq.Timestamp)*time.Millisecond
}
ms := &parser.MatrixSelector{
Range: subq.Range,
VectorSelector: vs,
}
totalSamples := 0
for _, s := range mat {
totalSamples += len(s.Points)
vs.Series = append(vs.Series, NewStorageSeries(s))
}
return ms, ws
return ms, totalSamples, ws
}
// eval evaluates the given expression as the given AST expression node requires.
@ -1007,7 +1101,7 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, storage.Warnings) {
switch e := expr.(type) {
case *parser.AggregateExpr:
unwrapParenExpr(&e.Param)
if s, ok := e.Param.(*parser.StringLiteral); ok {
if s, ok := unwrapStepInvariantExpr(e.Param).(*parser.StringLiteral); ok {
return ev.rangeEval(func(v []parser.Value, enh *EvalNodeHelper) (Vector, storage.Warnings) {
return ev.aggregation(e.Op, e.Grouping, e.Without, s.Val, v[0].(Vector), enh), nil
}, e.Expr)
@ -1022,13 +1116,13 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, storage.Warnings) {
case *parser.Call:
call := FunctionCalls[e.Func.Name]
if e.Func.Name == "timestamp" {
// Matrix evaluation always returns the evaluation time,
// so this function needs special handling when given
// a vector selector.
unwrapParenExpr(&e.Args[0])
vs, ok := e.Args[0].(*parser.VectorSelector)
arg := unwrapStepInvariantExpr(e.Args[0])
vs, ok := arg.(*parser.VectorSelector)
if ok {
return ev.rangeEval(func(v []parser.Value, enh *EvalNodeHelper) (Vector, storage.Warnings) {
val, ws := ev.vectorSelector(vs, enh.Ts)
@ -1045,7 +1139,7 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, storage.Warnings) {
)
for i := range e.Args {
unwrapParenExpr(&e.Args[i])
a := e.Args[i]
a := unwrapStepInvariantExpr(e.Args[i])
if _, ok := a.(*parser.MatrixSelector); ok {
matrixArgIndex = i
matrixArg = true
@ -1056,9 +1150,14 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, storage.Warnings) {
matrixArgIndex = i
matrixArg = true
// Replacing parser.SubqueryExpr with parser.MatrixSelector.
val, ws := ev.evalSubquery(subq)
val, totalSamples, ws := ev.evalSubquery(subq)
e.Args[i] = val
warnings = append(warnings, ws...)
defer func() {
// subquery result takes space in the memory. Get rid of that at the end.
val.VectorSelector.(*parser.VectorSelector).Series = nil
ev.currentSamples -= totalSamples
}()
break
}
}
@ -1083,7 +1182,7 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, storage.Warnings) {
}
}
sel := e.Args[matrixArgIndex].(*parser.MatrixSelector)
sel := unwrapStepInvariantExpr(e.Args[matrixArgIndex]).(*parser.MatrixSelector)
selVS := sel.VectorSelector.(*parser.VectorSelector)
ws, err := checkAndExpandSeriesSet(ev.ctx, sel)
@ -1146,7 +1245,7 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, storage.Warnings) {
it.ReduceDelta(stepRange)
}
if len(ss.Points) > 0 {
if ev.currentSamples < ev.maxSamples {
if ev.currentSamples+len(ss.Points) <= ev.maxSamples {
mat = append(mat, ss)
ev.currentSamples += len(ss.Points)
} else {
@ -1266,6 +1365,9 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, storage.Warnings) {
return append(enh.Out, Sample{Point: Point{V: e.Val}}), nil
})
case *parser.StringLiteral:
return String{V: e.Val, T: ev.startTimestamp}, nil
case *parser.VectorSelector:
ws, err := checkAndExpandSeriesSet(ev.ctx, e)
if err != nil {
@ -1332,11 +1434,65 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, storage.Warnings) {
newEv.startTimestamp += newEv.interval
}
if newEv.startTimestamp != ev.startTimestamp {
// Adjust the offset of selectors based on the new
// start time of the evaluator since the calculation
// of the offset with @ happens w.r.t. the start time.
setOffsetForAtModifier(newEv.startTimestamp, e.Expr)
}
res, ws := newEv.eval(e.Expr)
ev.currentSamples = newEv.currentSamples
return res, ws
case *parser.StringLiteral:
return String{V: e.Val, T: ev.startTimestamp}, nil
case *parser.StepInvariantExpr:
switch ce := e.Expr.(type) {
case *parser.StringLiteral, *parser.NumberLiteral:
return ev.eval(ce)
}
newEv := &evaluator{
startTimestamp: ev.startTimestamp,
endTimestamp: ev.startTimestamp, // Always a single evaluation.
interval: ev.interval,
ctx: ev.ctx,
currentSamples: ev.currentSamples,
maxSamples: ev.maxSamples,
logger: ev.logger,
lookbackDelta: ev.lookbackDelta,
noStepSubqueryIntervalFn: ev.noStepSubqueryIntervalFn,
}
res, ws := newEv.eval(e.Expr)
ev.currentSamples = newEv.currentSamples
switch e.Expr.(type) {
case *parser.MatrixSelector, *parser.SubqueryExpr:
// We do not duplicate results for range selectors since result is a matrix
// with their unique timestamps which does not depend on the step.
return res, ws
}
// For every evaluation while the value remains same, the timestamp for that
// value would change for different eval times. Hence we duplicate the result
// with changed timestamps.
mat, ok := res.(Matrix)
if !ok {
panic(errors.Errorf("unexpected result in StepInvariantExpr evaluation: %T", expr))
}
for i := range mat {
if len(mat[i].Points) != 1 {
panic(errors.Errorf("unexpected number of samples"))
}
for ts := ev.startTimestamp + ev.interval; ts <= ev.endTimestamp; ts = ts + ev.interval {
mat[i].Points = append(mat[i].Points, Point{
T: ts,
V: mat[i].Points[0].V,
})
ev.currentSamples++
if ev.currentSamples > ev.maxSamples {
ev.error(ErrTooManySamples(env))
}
}
}
return res, ws
}
panic(errors.Errorf("unhandled expression of type: %T", expr))
@ -1359,12 +1515,13 @@ func (ev *evaluator) vectorSelector(node *parser.VectorSelector, ts int64) (Vect
Metric: node.Series[i].Labels(),
Point: Point{V: v, T: t},
})
ev.currentSamples++
if ev.currentSamples > ev.maxSamples {
ev.error(ErrTooManySamples(env))
}
}
if ev.currentSamples >= ev.maxSamples {
ev.error(ErrTooManySamples(env))
}
}
return vec, ws
}
@ -1497,8 +1654,8 @@ func (ev *evaluator) matrixIterSlice(it *storage.BufferedSeriesIterator, mint, m
if ev.currentSamples >= ev.maxSamples {
ev.error(ErrTooManySamples(env))
}
out = append(out, Point{T: t, V: v})
ev.currentSamples++
out = append(out, Point{T: t, V: v})
}
}
// The seeked sample might also be in the range.
@ -2141,3 +2298,141 @@ func unwrapParenExpr(e *parser.Expr) {
}
}
}
func unwrapStepInvariantExpr(e parser.Expr) parser.Expr {
if p, ok := e.(*parser.StepInvariantExpr); ok {
return p.Expr
}
return e
}
// WrapWithStepInvariantExpr wraps all possible parts of the given
// expression with StepInvariantExpr wherever valid.
func WrapWithStepInvariantExpr(expr parser.Expr) parser.Expr {
isStepInvariant := wrapWithStepInvariantExprHelper(expr)
if isStepInvariant {
return newStepInvariantExpr(expr)
}
return expr
}
// wrapWithStepInvariantExprHelper wraps the child nodes of the expression
// with a StepInvariantExpr wherever valid. The returned boolean is true if the
// passed expression qualifies to be wrapped by StepInvariantExpr.
func wrapWithStepInvariantExprHelper(expr parser.Expr) bool {
switch n := expr.(type) {
case *parser.VectorSelector:
return n.Timestamp != nil
case *parser.AggregateExpr:
return wrapWithStepInvariantExprHelper(n.Expr)
case *parser.BinaryExpr:
isInvariant1, isInvariant2 := wrapWithStepInvariantExprHelper(n.LHS), wrapWithStepInvariantExprHelper(n.RHS)
if isInvariant1 && isInvariant2 {
return true
}
if isInvariant1 {
n.LHS = newStepInvariantExpr(n.LHS)
}
if isInvariant2 {
n.RHS = newStepInvariantExpr(n.RHS)
}
return false
case *parser.Call:
_, ok := AtModifierUnsafeFunctions[n.Func.Name]
isStepInvariant := !ok
isStepInvariantSlice := make([]bool, len(n.Args))
for i := range n.Args {
isStepInvariantSlice[i] = wrapWithStepInvariantExprHelper(n.Args[i])
isStepInvariant = isStepInvariant && isStepInvariantSlice[i]
}
if isStepInvariant {
// The function and all arguments are step invariant.
return true
}
for i, isi := range isStepInvariantSlice {
if isi {
n.Args[i] = newStepInvariantExpr(n.Args[i])
}
}
return false
case *parser.MatrixSelector:
return n.VectorSelector.(*parser.VectorSelector).Timestamp != nil
case *parser.SubqueryExpr:
// Since we adjust offset for the @ modifier evaluation,
// it gets tricky to adjust it for every subquery step.
// Hence we wrap the inside of subquery irrespective of
// @ on subquery (given it is also step invariant) so that
// it is evaluated only once w.r.t. the start time of subquery.
isInvariant := wrapWithStepInvariantExprHelper(n.Expr)
if isInvariant {
n.Expr = newStepInvariantExpr(n.Expr)
}
return n.Timestamp != nil
case *parser.ParenExpr:
return wrapWithStepInvariantExprHelper(n.Expr)
case *parser.UnaryExpr:
return wrapWithStepInvariantExprHelper(n.Expr)
case *parser.StringLiteral, *parser.NumberLiteral:
return true
}
panic(fmt.Sprintf("found unexpected node %#v", expr))
}
func newStepInvariantExpr(expr parser.Expr) parser.Expr {
if e, ok := expr.(*parser.ParenExpr); ok {
// Wrapping the inside of () makes it easy to unwrap the paren later.
// But this effectively unwraps the paren.
return newStepInvariantExpr(e.Expr)
}
return &parser.StepInvariantExpr{Expr: expr}
}
// setOffsetForAtModifier modifies the offset of vector and matrix selector
// and subquery in the tree to accommodate the timestamp of @ modifier.
// The offset is adjusted w.r.t. the given evaluation time.
func setOffsetForAtModifier(evalTime int64, expr parser.Expr) {
getOffset := func(ts *int64, originalOffset time.Duration, path []parser.Node) time.Duration {
if ts == nil {
return originalOffset
}
subqOffset, _, subqTs := subqueryTimes(path)
if subqTs != nil {
subqOffset += time.Duration(evalTime-*subqTs) * time.Millisecond
}
offsetForTs := time.Duration(evalTime-*ts) * time.Millisecond
offsetDiff := offsetForTs - subqOffset
return originalOffset + offsetDiff
}
parser.Inspect(expr, func(node parser.Node, path []parser.Node) error {
switch n := node.(type) {
case *parser.VectorSelector:
n.Offset = getOffset(n.Timestamp, n.OriginalOffset, path)
case *parser.MatrixSelector:
vs := n.VectorSelector.(*parser.VectorSelector)
vs.Offset = getOffset(vs.Timestamp, vs.OriginalOffset, path)
case *parser.SubqueryExpr:
n.Offset = getOffset(n.Timestamp, n.OriginalOffset, path)
}
return nil
})
}

File diff suppressed because it is too large Load diff

View file

@ -59,7 +59,6 @@ func funcTime(vals []parser.Value, args parser.Expressions, enh *EvalNodeHelper)
func extrapolatedRate(vals []parser.Value, args parser.Expressions, enh *EvalNodeHelper, isCounter bool, isRate bool) Vector {
ms := args[0].(*parser.MatrixSelector)
vs := ms.VectorSelector.(*parser.VectorSelector)
var (
samples = vals[0].(Matrix)[0]
rangeStart = enh.Ts - durationMilliseconds(ms.Range+vs.Offset)
@ -598,7 +597,6 @@ func funcDeriv(vals []parser.Value, args parser.Expressions, enh *EvalNodeHelper
func funcPredictLinear(vals []parser.Value, args parser.Expressions, enh *EvalNodeHelper) Vector {
samples := vals[0].(Matrix)[0]
duration := vals[1].(Vector)[0].V
// No sense in trying to predict anything without at least two points.
// Drop this Vector element.
if len(samples.Points) < 2 {
@ -701,10 +699,10 @@ func funcChanges(vals []parser.Value, args parser.Expressions, enh *EvalNodeHelp
func funcLabelReplace(vals []parser.Value, args parser.Expressions, enh *EvalNodeHelper) Vector {
var (
vector = vals[0].(Vector)
dst = args[1].(*parser.StringLiteral).Val
repl = args[2].(*parser.StringLiteral).Val
src = args[3].(*parser.StringLiteral).Val
regexStr = args[4].(*parser.StringLiteral).Val
dst = stringFromArg(args[1])
repl = stringFromArg(args[2])
src = stringFromArg(args[3])
regexStr = stringFromArg(args[4])
)
if enh.regex == nil {
@ -764,8 +762,8 @@ func funcVector(vals []parser.Value, args parser.Expressions, enh *EvalNodeHelpe
func funcLabelJoin(vals []parser.Value, args parser.Expressions, enh *EvalNodeHelper) Vector {
var (
vector = vals[0].(Vector)
dst = args[1].(*parser.StringLiteral).Val
sep = args[2].(*parser.StringLiteral).Val
dst = stringFromArg(args[1])
sep = stringFromArg(args[2])
srcLabels = make([]string, len(args)-3)
)
@ -774,7 +772,7 @@ func funcLabelJoin(vals []parser.Value, args parser.Expressions, enh *EvalNodeHe
}
for i := 3; i < len(args); i++ {
src := args[i].(*parser.StringLiteral).Val
src := stringFromArg(args[i])
if !model.LabelName(src).IsValid() {
panic(errors.Errorf("invalid source label name in label_join(): %s", src))
}
@ -938,6 +936,21 @@ var FunctionCalls = map[string]FunctionCall{
"year": funcYear,
}
// AtModifierUnsafeFunctions are the functions whose result
// can vary if evaluation time is changed when the arguments are
// step invariant. It also includes functions that use the timestamps
// of the passed instant vector argument to calculate a result since
// that can also change with change in eval time.
var AtModifierUnsafeFunctions = map[string]struct{}{
// Step invariant functions.
"days_in_month": {}, "day_of_month": {}, "day_of_week": {},
"hour": {}, "minute": {}, "month": {}, "year": {},
"predict_linear": {}, "time": {},
// Uses timestamp of the argument for the result,
// hence unsafe to use with @ modifier.
"timestamp": {},
}
type vectorByValueHeap Vector
func (s vectorByValueHeap) Len() int {
@ -1028,3 +1041,7 @@ func createLabelsForAbsentFunction(expr parser.Expr) labels.Labels {
}
return m
}
func stringFromArg(e parser.Expr) string {
return unwrapStepInvariantExpr(e).(*parser.StringLiteral).Val
}

View file

@ -125,10 +125,17 @@ type MatrixSelector struct {
// SubqueryExpr represents a subquery.
type SubqueryExpr struct {
Expr Expr
Range time.Duration
Offset time.Duration
Step time.Duration
Expr Expr
Range time.Duration
// OriginalOffset is the actual offset that was set in the query.
// This never changes.
OriginalOffset time.Duration
// Offset is the offset used during the query execution
// which is calculated using the original offset, at modifier time,
// eval time, and subquery offsets in the AST tree.
Offset time.Duration
Timestamp *int64
Step time.Duration
EndPos Pos
}
@ -162,10 +169,28 @@ type UnaryExpr struct {
StartPos Pos
}
// StepInvariantExpr represents a query which evaluates to the same result
// irrespective of the evaluation time given the raw samples from TSDB remain unchanged.
// Currently this is only used for engine optimisations and the parser does not produce this.
type StepInvariantExpr struct {
Expr Expr
}
func (e *StepInvariantExpr) String() string { return e.Expr.String() }
func (e *StepInvariantExpr) PositionRange() PositionRange { return e.Expr.PositionRange() }
// VectorSelector represents a Vector selection.
type VectorSelector struct {
Name string
Name string
// OriginalOffset is the actual offset that was set in the query.
// This never changes.
OriginalOffset time.Duration
// Offset is the offset used during the query execution
// which is calculated using the original offset, at modifier time,
// eval time, and subquery offsets in the AST tree.
Offset time.Duration
Timestamp *int64
LabelMatchers []*labels.Matcher
// The unexpanded seriesSet populated at query preparation time.
@ -203,17 +228,19 @@ func (e *BinaryExpr) Type() ValueType {
}
return ValueTypeVector
}
func (e *StepInvariantExpr) Type() ValueType { return e.Expr.Type() }
func (*AggregateExpr) PromQLExpr() {}
func (*BinaryExpr) PromQLExpr() {}
func (*Call) PromQLExpr() {}
func (*MatrixSelector) PromQLExpr() {}
func (*SubqueryExpr) PromQLExpr() {}
func (*NumberLiteral) PromQLExpr() {}
func (*ParenExpr) PromQLExpr() {}
func (*StringLiteral) PromQLExpr() {}
func (*UnaryExpr) PromQLExpr() {}
func (*VectorSelector) PromQLExpr() {}
func (*AggregateExpr) PromQLExpr() {}
func (*BinaryExpr) PromQLExpr() {}
func (*Call) PromQLExpr() {}
func (*MatrixSelector) PromQLExpr() {}
func (*SubqueryExpr) PromQLExpr() {}
func (*NumberLiteral) PromQLExpr() {}
func (*ParenExpr) PromQLExpr() {}
func (*StringLiteral) PromQLExpr() {}
func (*UnaryExpr) PromQLExpr() {}
func (*VectorSelector) PromQLExpr() {}
func (*StepInvariantExpr) PromQLExpr() {}
// VectorMatchCardinality describes the cardinality relationship
// of two Vectors in a binary operation.
@ -347,6 +374,8 @@ func Children(node Node) []Node {
return []Node{n.Expr}
case *MatrixSelector:
return []Node{n.VectorSelector}
case *StepInvariantExpr:
return []Node{n.Expr}
case *NumberLiteral, *StringLiteral, *VectorSelector:
// nothing to do
return []Node{}

View file

@ -83,6 +83,7 @@ NEQ
NEQ_REGEX
POW
SUB
AT
%token operatorsEnd
// Aggregators.
@ -137,8 +138,8 @@ START_METRIC_SELECTOR
%type <strings> grouping_label_list grouping_labels maybe_grouping_labels
%type <series> series_item series_values
%type <uint> uint
%type <float> number series_value signed_number
%type <node> aggregate_expr aggregate_modifier bin_modifier binary_expr bool_modifier expr function_call function_call_args function_call_body group_modifiers label_matchers matrix_selector number_literal offset_expr on_or_ignoring paren_expr string_literal subquery_expr unary_expr vector_selector
%type <float> number series_value signed_number signed_or_unsigned_number
%type <node> step_invariant_expr aggregate_expr aggregate_modifier bin_modifier binary_expr bool_modifier expr function_call function_call_args function_call_body group_modifiers label_matchers matrix_selector number_literal offset_expr on_or_ignoring paren_expr string_literal subquery_expr unary_expr vector_selector
%type <duration> duration maybe_duration
%start start
@ -187,6 +188,7 @@ expr :
| subquery_expr
| unary_expr
| vector_selector
| step_invariant_expr
;
/*
@ -200,8 +202,8 @@ aggregate_expr : aggregate_op aggregate_modifier function_call_body
| aggregate_op function_call_body
{ $$ = yylex.(*parser).newAggregateExpr($1, &AggregateExpr{}, $2) }
| aggregate_op error
{
yylex.(*parser).unexpected("aggregation","");
{
yylex.(*parser).unexpected("aggregation","");
$$ = yylex.(*parser).newAggregateExpr($1, &AggregateExpr{}, Expressions{})
}
;
@ -380,6 +382,19 @@ offset_expr: expr OFFSET duration
| expr OFFSET error
{ yylex.(*parser).unexpected("offset", "duration"); $$ = $1 }
;
/*
* @ modifiers.
*/
step_invariant_expr: expr AT signed_or_unsigned_number
{
yylex.(*parser).setTimestamp($1, $3)
$$ = $1
}
| expr AT error
{ yylex.(*parser).unexpected("@", "timestamp"); $$ = $1 }
;
/*
* Subquery and range selectors.
@ -391,8 +406,10 @@ matrix_selector : expr LEFT_BRACKET duration RIGHT_BRACKET
vs, ok := $1.(*VectorSelector)
if !ok{
errMsg = "ranges only allowed for vector selectors"
} else if vs.Offset != 0{
} else if vs.OriginalOffset != 0{
errMsg = "no offset modifiers allowed before range"
} else if vs.Timestamp != nil {
errMsg = "no @ modifiers allowed before range"
}
if errMsg != ""{
@ -664,6 +681,8 @@ signed_number : ADD number { $$ = $2 }
| SUB number { $$ = -$2 }
;
signed_or_unsigned_number: number | signed_number ;
uint : NUMBER
{
var err error

File diff suppressed because it is too large Load diff

View file

@ -440,7 +440,8 @@ func lexStatements(l *Lexer) stateFn {
}
l.emit(RIGHT_BRACKET)
l.bracketOpen = false
case r == '@':
l.emit(AT)
default:
return l.errorf("unexpected character: %q", r)
}

View file

@ -15,6 +15,7 @@ package parser
import (
"fmt"
"math"
"os"
"runtime"
"strconv"
@ -26,6 +27,7 @@ import (
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/pkg/timestamp"
"github.com/prometheus/prometheus/util/strutil"
)
@ -318,7 +320,7 @@ func (p *parser) Lex(lval *yySymType) int {
case EOF:
lval.item.Typ = EOF
p.InjectItem(0)
case RIGHT_BRACE, RIGHT_PAREN, RIGHT_BRACKET, DURATION:
case RIGHT_BRACE, RIGHT_PAREN, RIGHT_BRACKET, DURATION, NUMBER:
p.lastClosing = lval.item.Pos + Pos(len(lval.item.Val))
}
@ -680,34 +682,92 @@ func (p *parser) newLabelMatcher(label Item, operator Item, value Item) *labels.
return m
}
// addOffset is used to set the offset in the generated parser.
func (p *parser) addOffset(e Node, offset time.Duration) {
var offsetp *time.Duration
var orgoffsetp *time.Duration
var endPosp *Pos
switch s := e.(type) {
case *VectorSelector:
offsetp = &s.Offset
orgoffsetp = &s.OriginalOffset
endPosp = &s.PosRange.End
case *MatrixSelector:
if vs, ok := s.VectorSelector.(*VectorSelector); ok {
offsetp = &vs.Offset
vs, ok := s.VectorSelector.(*VectorSelector)
if !ok {
p.addParseErrf(e.PositionRange(), "ranges only allowed for vector selectors")
return
}
orgoffsetp = &vs.OriginalOffset
endPosp = &s.EndPos
case *SubqueryExpr:
offsetp = &s.Offset
orgoffsetp = &s.OriginalOffset
endPosp = &s.EndPos
default:
p.addParseErrf(e.PositionRange(), "offset modifier must be preceded by an instant or range selector, but follows a %T instead", e)
p.addParseErrf(e.PositionRange(), "offset modifier must be preceded by an instant selector vector or range vector selector or a subquery")
return
}
// it is already ensured by parseDuration func that there never will be a zero offset modifier
if *offsetp != 0 {
if *orgoffsetp != 0 {
p.addParseErrf(e.PositionRange(), "offset may not be set multiple times")
} else if offsetp != nil {
*offsetp = offset
} else if orgoffsetp != nil {
*orgoffsetp = offset
}
*endPosp = p.lastClosing
}
// setTimestamp is used to set the timestamp from the @ modifier in the generated parser.
func (p *parser) setTimestamp(e Node, ts float64) {
if math.IsInf(ts, -1) || math.IsInf(ts, 1) || math.IsNaN(ts) ||
ts >= float64(math.MaxInt64) || ts <= float64(math.MinInt64) {
p.addParseErrf(e.PositionRange(), "timestamp out of bounds for @ modifier: %f", ts)
}
var timestampp **int64
var endPosp *Pos
switch s := e.(type) {
case *VectorSelector:
timestampp = &s.Timestamp
endPosp = &s.PosRange.End
case *MatrixSelector:
vs, ok := s.VectorSelector.(*VectorSelector)
if !ok {
p.addParseErrf(e.PositionRange(), "ranges only allowed for vector selectors")
return
}
timestampp = &vs.Timestamp
endPosp = &s.EndPos
case *SubqueryExpr:
timestampp = &s.Timestamp
endPosp = &s.EndPos
default:
p.addParseErrf(e.PositionRange(), "@ modifier must be preceded by an instant selector vector or range vector selector or a subquery")
return
}
if *timestampp != nil {
p.addParseErrf(e.PositionRange(), "@ <timestamp> may not be set multiple times")
} else if timestampp != nil {
*timestampp = new(int64)
**timestampp = timestamp.FromFloatSeconds(ts)
}
*endPosp = p.lastClosing
}
func MustLabelMatcher(mt labels.MatchType, name, val string) *labels.Matcher {
m, err := labels.NewMatcher(mt, name, val)
if err != nil {
panic(err)
}
return m
}
func MustGetFunction(name string) *Function {
f, ok := getFunction(name)
if !ok {
panic(errors.Errorf("function %q does not exist", name))
}
return f
}

File diff suppressed because it is too large Load diff

View file

@ -116,14 +116,24 @@ func (node *MatrixSelector) String() string {
// Copy the Vector selector before changing the offset
vecSelector := *node.VectorSelector.(*VectorSelector)
offset := ""
if vecSelector.Offset != time.Duration(0) {
offset = fmt.Sprintf(" offset %s", model.Duration(vecSelector.Offset))
if vecSelector.OriginalOffset != time.Duration(0) {
offset = fmt.Sprintf(" offset %s", model.Duration(vecSelector.OriginalOffset))
}
at := ""
if vecSelector.Timestamp != nil {
at = fmt.Sprintf(" @ %.3f", float64(*vecSelector.Timestamp)/1000.0)
}
// Do not print the offset twice.
vecSelector.Offset = 0
// Do not print the @ and offset twice.
offsetVal, atVal := vecSelector.OriginalOffset, vecSelector.Timestamp
vecSelector.OriginalOffset = 0
vecSelector.Timestamp = nil
return fmt.Sprintf("%s[%s]%s", vecSelector.String(), model.Duration(node.Range), offset)
str := fmt.Sprintf("%s[%s]%s%s", vecSelector.String(), model.Duration(node.Range), at, offset)
vecSelector.OriginalOffset, vecSelector.Timestamp = offsetVal, atVal
return str
}
func (node *SubqueryExpr) String() string {
@ -132,10 +142,14 @@ func (node *SubqueryExpr) String() string {
step = model.Duration(node.Step).String()
}
offset := ""
if node.Offset != time.Duration(0) {
offset = fmt.Sprintf(" offset %s", model.Duration(node.Offset))
if node.OriginalOffset != time.Duration(0) {
offset = fmt.Sprintf(" offset %s", model.Duration(node.OriginalOffset))
}
return fmt.Sprintf("%s[%s:%s]%s", node.Expr.String(), model.Duration(node.Range), step, offset)
at := ""
if node.Timestamp != nil {
at = fmt.Sprintf(" @ %.3f", float64(*node.Timestamp)/1000.0)
}
return fmt.Sprintf("%s[%s:%s]%s%s", node.Expr.String(), model.Duration(node.Range), step, at, offset)
}
func (node *NumberLiteral) String() string {
@ -164,13 +178,17 @@ func (node *VectorSelector) String() string {
labelStrings = append(labelStrings, matcher.String())
}
offset := ""
if node.Offset != time.Duration(0) {
offset = fmt.Sprintf(" offset %s", model.Duration(node.Offset))
if node.OriginalOffset != time.Duration(0) {
offset = fmt.Sprintf(" offset %s", model.Duration(node.OriginalOffset))
}
at := ""
if node.Timestamp != nil {
at = fmt.Sprintf(" @ %.3f", float64(*node.Timestamp)/1000.0)
}
if len(labelStrings) == 0 {
return fmt.Sprintf("%s%s", node.Name, offset)
return fmt.Sprintf("%s%s%s", node.Name, at, offset)
}
sort.Strings(labelStrings)
return fmt.Sprintf("%s{%s}%s", node.Name, strings.Join(labelStrings, ","), offset)
return fmt.Sprintf("%s{%s}%s%s", node.Name, strings.Join(labelStrings, ","), at, offset)
}

View file

@ -27,6 +27,7 @@ import (
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/pkg/timestamp"
"github.com/prometheus/prometheus/promql/parser"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb"
@ -427,6 +428,74 @@ func (t *Test) Run() error {
return nil
}
type atModifierTestCase struct {
expr string
evalTime time.Time
}
func atModifierTestCases(exprStr string, evalTime time.Time) ([]atModifierTestCase, error) {
expr, err := parser.ParseExpr(exprStr)
if err != nil {
return nil, err
}
ts := timestamp.FromTime(evalTime)
containsNonStepInvariant := false
// Setting the @ timestamp for all selectors to be evalTime.
// If there is a subquery, then the selectors inside it don't get the @ timestamp.
// If any selector already has the @ timestamp set, then it is untouched.
parser.Inspect(expr, func(node parser.Node, path []parser.Node) error {
_, _, subqTs := subqueryTimes(path)
if subqTs != nil {
// There is a subquery with timestamp in the path,
// hence don't change any timestamps further.
return nil
}
switch n := node.(type) {
case *parser.VectorSelector:
if n.Timestamp == nil {
n.Timestamp = makeInt64Pointer(ts)
}
case *parser.MatrixSelector:
if vs := n.VectorSelector.(*parser.VectorSelector); vs.Timestamp == nil {
vs.Timestamp = makeInt64Pointer(ts)
}
case *parser.SubqueryExpr:
if n.Timestamp == nil {
n.Timestamp = makeInt64Pointer(ts)
}
case *parser.Call:
_, ok := AtModifierUnsafeFunctions[n.Func.Name]
containsNonStepInvariant = containsNonStepInvariant || ok
}
return nil
})
if containsNonStepInvariant {
// Since there is a step invariant function, we cannot automatically
// generate step invariant test cases for it sanely.
return nil, nil
}
newExpr := expr.String() // With all the @ evalTime set.
additionalEvalTimes := []int64{-10 * ts, 0, ts / 5, ts, 10 * ts}
if ts == 0 {
additionalEvalTimes = []int64{-1000, -ts, 1000}
}
testCases := make([]atModifierTestCase, 0, len(additionalEvalTimes))
for _, et := range additionalEvalTimes {
testCases = append(testCases, atModifierTestCase{
expr: newExpr,
evalTime: timestamp.Time(et),
})
}
return testCases, nil
}
// exec processes a single step of the test.
func (t *Test) exec(tc testCommand) error {
switch cmd := tc.(type) {
@ -445,59 +514,66 @@ func (t *Test) exec(tc testCommand) error {
}
case *evalCmd:
q, err := t.QueryEngine().NewInstantQuery(t.storage, cmd.expr, cmd.start)
queries, err := atModifierTestCases(cmd.expr, cmd.start)
if err != nil {
return err
}
defer q.Close()
res := q.Exec(t.context)
if res.Err != nil {
if cmd.fail {
return nil
queries = append([]atModifierTestCase{{expr: cmd.expr, evalTime: cmd.start}}, queries...)
for _, iq := range queries {
q, err := t.QueryEngine().NewInstantQuery(t.storage, iq.expr, iq.evalTime)
if err != nil {
return err
}
defer q.Close()
res := q.Exec(t.context)
if res.Err != nil {
if cmd.fail {
continue
}
return errors.Wrapf(res.Err, "error evaluating query %q (line %d)", iq.expr, cmd.line)
}
if res.Err == nil && cmd.fail {
return errors.Errorf("expected error evaluating query %q (line %d) but got none", iq.expr, cmd.line)
}
err = cmd.compareResult(res.Value)
if err != nil {
return errors.Wrapf(err, "error in %s %s", cmd, iq.expr)
}
return errors.Wrapf(res.Err, "error evaluating query %q (line %d)", cmd.expr, cmd.line)
}
if res.Err == nil && cmd.fail {
return errors.Errorf("expected error evaluating query %q (line %d) but got none", cmd.expr, cmd.line)
}
err = cmd.compareResult(res.Value)
if err != nil {
return errors.Wrapf(err, "error in %s %s", cmd, cmd.expr)
}
// Check query returns same result in range mode,
// by checking against the middle step.
q, err = t.queryEngine.NewRangeQuery(t.storage, cmd.expr, cmd.start.Add(-time.Minute), cmd.start.Add(time.Minute), time.Minute)
if err != nil {
return err
}
rangeRes := q.Exec(t.context)
if rangeRes.Err != nil {
return errors.Wrapf(rangeRes.Err, "error evaluating query %q (line %d) in range mode", cmd.expr, cmd.line)
}
defer q.Close()
if cmd.ordered {
// Ordering isn't defined for range queries.
return nil
}
mat := rangeRes.Value.(Matrix)
vec := make(Vector, 0, len(mat))
for _, series := range mat {
for _, point := range series.Points {
if point.T == timeMilliseconds(cmd.start) {
vec = append(vec, Sample{Metric: series.Metric, Point: point})
break
// Check query returns same result in range mode,
// by checking against the middle step.
q, err = t.queryEngine.NewRangeQuery(t.storage, iq.expr, iq.evalTime.Add(-time.Minute), iq.evalTime.Add(time.Minute), time.Minute)
if err != nil {
return err
}
rangeRes := q.Exec(t.context)
if rangeRes.Err != nil {
return errors.Wrapf(rangeRes.Err, "error evaluating query %q (line %d) in range mode", iq.expr, cmd.line)
}
defer q.Close()
if cmd.ordered {
// Ordering isn't defined for range queries.
continue
}
mat := rangeRes.Value.(Matrix)
vec := make(Vector, 0, len(mat))
for _, series := range mat {
for _, point := range series.Points {
if point.T == timeMilliseconds(iq.evalTime) {
vec = append(vec, Sample{Metric: series.Metric, Point: point})
break
}
}
}
}
if _, ok := res.Value.(Scalar); ok {
err = cmd.compareResult(Scalar{V: vec[0].Point.V})
} else {
err = cmd.compareResult(vec)
}
if err != nil {
return errors.Wrapf(err, "error in %s %s (line %d) rande mode", cmd, cmd.expr, cmd.line)
if _, ok := res.Value.(Scalar); ok {
err = cmd.compareResult(Scalar{V: vec[0].Point.V})
} else {
err = cmd.compareResult(vec)
}
if err != nil {
return errors.Wrapf(err, "error in %s %s (line %d) rande mode", cmd, iq.expr, cmd.line)
}
}
default:
@ -524,6 +600,7 @@ func (t *Test) clear() {
MaxSamples: 10000,
Timeout: 100 * time.Second,
NoStepSubqueryIntervalFn: func(int64) int64 { return durationMilliseconds(1 * time.Minute) },
EnableAtModifier: true,
}
t.queryEngine = NewEngine(opts)
@ -633,10 +710,11 @@ func (ll *LazyLoader) clear() {
ll.storage = teststorage.New(ll)
opts := EngineOpts{
Logger: nil,
Reg: nil,
MaxSamples: 10000,
Timeout: 100 * time.Second,
Logger: nil,
Reg: nil,
MaxSamples: 10000,
Timeout: 100 * time.Second,
EnableAtModifier: true,
}
ll.queryEngine = NewEngine(opts)
@ -701,3 +779,9 @@ func (ll *LazyLoader) Close() {
ll.T.Fatalf("closing test storage: %s", err)
}
}
func makeInt64Pointer(val int64) *int64 {
valp := new(int64)
*valp = val
return valp
}

166
promql/testdata/at_modifier.test vendored Normal file
View file

@ -0,0 +1,166 @@
load 10s
metric{job="1"} 0+1x1000
metric{job="2"} 0+2x1000
load 1ms
metric_ms 0+1x10000
# Instant vector selectors.
eval instant at 10s metric @ 100
metric{job="1"} 10
metric{job="2"} 20
eval instant at 10s metric @ 100 offset 50s
metric{job="1"} 5
metric{job="2"} 10
eval instant at 10s metric offset 50s @ 100
metric{job="1"} 5
metric{job="2"} 10
eval instant at 10s -metric @ 100
{job="1"} -10
{job="2"} -20
eval instant at 10s ---metric @ 100
{job="1"} -10
{job="2"} -20
# Millisecond precision.
eval instant at 100s metric_ms @ 1.234
metric_ms 1234
# Range vector selectors.
eval instant at 25s sum_over_time(metric{job="1"}[100s] @ 100)
{job="1"} 55
eval instant at 25s sum_over_time(metric{job="1"}[100s] @ 100 offset 50s)
{job="1"} 15
eval instant at 25s sum_over_time(metric{job="1"}[100s] offset 50s @ 100)
{job="1"} 15
# Different timestamps.
eval instant at 25s metric{job="1"} @ 50 + metric{job="1"} @ 100
{job="1"} 15
eval instant at 25s rate(metric{job="1"}[100s] @ 100) + label_replace(rate(metric{job="2"}[123s] @ 200), "job", "1", "", "")
{job="1"} 0.3
eval instant at 25s sum_over_time(metric{job="1"}[100s] @ 100) + label_replace(sum_over_time(metric{job="2"}[100s] @ 100), "job", "1", "", "")
{job="1"} 165
# Subqueries.
# 10*(1+2+...+9) + 10.
eval instant at 25s sum_over_time(metric{job="1"}[100s:1s] @ 100)
{job="1"} 460
# 10*(1+2+...+7) + 8.
eval instant at 25s sum_over_time(metric{job="1"}[100s:1s] @ 100 offset 20s)
{job="1"} 288
# 10*(1+2+...+7) + 8.
eval instant at 25s sum_over_time(metric{job="1"}[100s:1s] offset 20s @ 100)
{job="1"} 288
# Subquery with different timestamps.
# Since vector selector has timestamp, the result value does not depend on the timestamp of subqueries.
# Inner most sum=1+2+...+10=55.
# With [100s:25s] subquery, it's 55*5.
eval instant at 100s sum_over_time(sum_over_time(metric{job="1"}[100s] @ 100)[100s:25s] @ 50)
{job="1"} 275
# Nested subqueries with different timestamps on both.
# Since vector selector has timestamp, the result value does not depend on the timestamp of subqueries.
# Sum of innermost subquery is 275 as above. The outer subquery repeats it 4 times.
eval instant at 0s sum_over_time(sum_over_time(sum_over_time(metric{job="1"}[100s] @ 100)[100s:25s] @ 50)[3s:1s] @ 3000)
{job="1"} 1100
# Testing the inner subquery timestamp since vector selector does not have @.
# Inner sum for subquery [100s:25s] @ 50 are
# at -50 nothing, at -25 nothing, at 0=0, at 25=2, at 50=4+5=9.
# This sum of 11 is repeated 4 times by outer subquery.
eval instant at 0s sum_over_time(sum_over_time(sum_over_time(metric{job="1"}[10s])[100s:25s] @ 50)[3s:1s] @ 200)
{job="1"} 44
# Inner sum for subquery [100s:25s] @ 200 are
# at 100=9+10, at 125=12, at 150=14+15, at 175=17, at 200=19+20.
# This sum of 116 is repeated 4 times by outer subquery.
eval instant at 0s sum_over_time(sum_over_time(sum_over_time(metric{job="1"}[10s])[100s:25s] @ 200)[3s:1s] @ 50)
{job="1"} 464
# Nested subqueries with timestamp only on outer subquery.
# Outer most subquery:
# at 900=783
# inner subquery: at 870=87+86+85, at 880=88+87+86, at 890=89+88+87
# at 925=537
# inner subquery: at 895=89+88, at 905=90+89, at 915=90+91
# at 950=828
# inner subquery: at 920=92+91+90, at 930=93+92+91, at 940=94+93+92
# at 975=567
# inner subquery: at 945=94+93, at 955=95+94, at 965=96+95
# at 1000=873
# inner subquery: at 970=97+96+95, at 980=98+97+96, at 990=99+98+97
eval instant at 0s sum_over_time(sum_over_time(sum_over_time(metric{job="1"}[20s])[20s:10s] offset 10s)[100s:25s] @ 1000)
{job="1"} 3588
# minute is counted on the value of the sample.
eval instant at 10s minute(metric @ 1500)
{job="1"} 2
{job="2"} 5
# timestamp() takes the time of the sample and not the evaluation time.
eval instant at 10m timestamp(metric{job="1"} @ 10)
{job="1"} 10
# The result of inner timestamp() will have the timestamp as the
# eval time, hence entire expression is not step invariant and depends on eval time.
eval instant at 10m timestamp(timestamp(metric{job="1"} @ 10))
{job="1"} 600
eval instant at 15m timestamp(timestamp(metric{job="1"} @ 10))
{job="1"} 900
# Time functions inside a subquery.
# minute is counted on the value of the sample.
eval instant at 0s sum_over_time(minute(metric @ 1500)[100s:10s])
{job="1"} 22
{job="2"} 55
# If nothing passed, minute() takes eval time.
# Here the eval time is determined by the subquery.
# [50m:1m] at 6000, i.e. 100m, is 50m to 100m.
# sum=50+51+52+...+59+0+1+2+...+40.
eval instant at 0s sum_over_time(minute()[50m:1m] @ 6000)
{} 1365
# sum=45+46+47+...+59+0+1+2+...+35.
eval instant at 0s sum_over_time(minute()[50m:1m] @ 6000 offset 5m)
{} 1410
# time() is the eval time which is determined by subquery here.
# 2900+2901+...+3000 = (3000*3001 - 2899*2900)/2.
eval instant at 0s sum_over_time(vector(time())[100s:1s] @ 3000)
{} 297950
# 2300+2301+...+2400 = (2400*2401 - 2299*2300)/2.
eval instant at 0s sum_over_time(vector(time())[100s:1s] @ 3000 offset 600s)
{} 237350
# timestamp() takes the time of the sample and not the evaluation time.
eval instant at 0s sum_over_time(timestamp(metric{job="1"} @ 10)[100s:10s] @ 3000)
{job="1"} 110
# The result of inner timestamp() will have the timestamp as the
# eval time, hence entire expression is not step invariant and depends on eval time.
# Here eval time is determined by the subquery.
eval instant at 0s sum_over_time(timestamp(timestamp(metric{job="1"} @ 999))[10s:1s] @ 10)
{job="1"} 55
clear

View file

@ -224,6 +224,18 @@ eval instant at 50m deriv(testcounter_reset_middle[100m])
eval instant at 50m predict_linear(testcounter_reset_middle[100m], 3600)
{} 76.81818181818181
# intercept at t = 3000+3600 = 6600
eval instant at 50m predict_linear(testcounter_reset_middle[100m] @ 3000, 3600)
{} 76.81818181818181
# intercept at t = 600+3600 = 4200
eval instant at 10m predict_linear(testcounter_reset_middle[100m] @ 3000, 3600)
{} 51.36363636363637
# intercept at t = 4200+3600 = 7800
eval instant at 70m predict_linear(testcounter_reset_middle[100m] @ 3000, 3600)
{} 89.54545454545455
# With http_requests, there is a sample value exactly at the end of
# the range, and it has exactly the predicted value, so predict_linear
# can be emulated with deriv.
@ -829,8 +841,6 @@ eval instant at 16m absent_over_time(httpd_handshake_failures_total[1m])
eval instant at 16m absent_over_time({instance="127.0.0.1"}[5m])
eval instant at 16m absent_over_time({instance="127.0.0.1"}[5m])
eval instant at 21m absent_over_time({instance="127.0.0.1"}[5m])
{instance="127.0.0.1"} 1

View file

@ -105,7 +105,7 @@ eval instant at 50m histogram_quantile(0.3, avg(rate(request_duration_seconds_bu
eval instant at 50m histogram_quantile(0.5, avg(rate(request_duration_seconds_bucket[5m])) by (le))
{} 0.12777777777777778
# Aggregated histogram: By job.
# Aggregated histogram: By instance.
eval instant at 50m histogram_quantile(0.3, sum(rate(request_duration_seconds_bucket[5m])) by (le, instance))
{instance="ins1"} 0.075
{instance="ins2"} 0.075
@ -114,7 +114,7 @@ eval instant at 50m histogram_quantile(0.5, sum(rate(request_duration_seconds_bu
{instance="ins1"} 0.1333333333
{instance="ins2"} 0.125
# Aggregated histogram: By instance.
# Aggregated histogram: By job.
eval instant at 50m histogram_quantile(0.3, sum(rate(request_duration_seconds_bucket[5m])) by (le, job))
{job="job1"} 0.1
{job="job2"} 0.0642857142857143
@ -190,4 +190,4 @@ load 5m
empty_bucket{le="+Inf", job="job1", instance="ins1"} 0x10
eval instant at 50m histogram_quantile(0.2, rate(empty_bucket[5m]))
{instance="ins1", job="job1"} NaN
{instance="ins1", job="job1"} NaN