Feature: Allow configuration of a rule evaluation delay (#14061)

* [PATCH] Allow having evaluation delay for rule groups

Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com>

* [PATCH] Fix lint

Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com>

* [PATCH] Move the option to ManagerOptions

Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com>

* [PATCH] Include evaluation_delay in the group config

Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com>

* Fix comments

Signed-off-by: gotjosh <josue.abreu@gmail.com>

* Add a server configuration option.

Signed-off-by: gotjosh <josue.abreu@gmail.com>

* Appease the linter #1

Signed-off-by: gotjosh <josue.abreu@gmail.com>

* Add the new server flag documentation

Signed-off-by: gotjosh <josue.abreu@gmail.com>

* Improve documentation of the new flag and configuration

Signed-off-by: gotjosh <josue.abreu@gmail.com>

* Use named parameters for clarity on the `Rule` interface

Signed-off-by: gotjosh <josue.abreu@gmail.com>

* Add `initial` to the flag help

Signed-off-by: gotjosh <josue.abreu@gmail.com>

* Change the CHANGELOG area from `ruler` to `rules`

Signed-off-by: gotjosh <josue.abreu@gmail.com>

* Rename evaluation_delay to `rule_query_offset`/`query_offset` and make it a global configuration option.

Signed-off-by: gotjosh <josue.abreu@gmail.com>

E Your branch is up to date with 'origin/gotjosh/evaluation-delay'.

* more docs

Signed-off-by: gotjosh <josue.abreu@gmail.com>

* Improve wording on CHANGELOG

Signed-off-by: gotjosh <josue.abreu@gmail.com>

* Add `RuleQueryOffset` to the default config in tests in case it changes

Signed-off-by: gotjosh <josue.abreu@gmail.com>

* Update docs/configuration/recording_rules.md

Co-authored-by: Julius Volz <julius.volz@gmail.com>
Signed-off-by: gotjosh <josue.abreu@gmail.com>

* Rename `RuleQueryOffset` to `QueryOffset` when in the group context.

Signed-off-by: gotjosh <josue.abreu@gmail.com>

* Improve docstring and documentation on the `rule_query_offset`

Signed-off-by: gotjosh <josue.abreu@gmail.com>

---------

Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com>
Signed-off-by: gotjosh <josue.abreu@gmail.com>
Co-authored-by: Ganesh Vernekar <ganeshvern@gmail.com>
Co-authored-by: Julius Volz <julius.volz@gmail.com>
This commit is contained in:
gotjosh 2024-05-30 11:49:50 +01:00 committed by GitHub
parent 6683895620
commit 37b408c6cd
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
15 changed files with 471 additions and 380 deletions

View file

@ -3,6 +3,7 @@
## unreleased ## unreleased
* [CHANGE] Rules: Execute 1 query instead of N (where N is the number of alerts within alert rule) when restoring alerts. #13980 * [CHANGE] Rules: Execute 1 query instead of N (where N is the number of alerts within alert rule) when restoring alerts. #13980
* [FEATURE] Rules: Add new option `query_offset` for each rule group via rule group configuration file and `rule_query_offset` as part of the global configuration to have more resilience for remote write delays. #14061
* [ENHANCEMENT] Rules: Add `rule_group_last_restore_duration_seconds` to measure the time it takes to restore a rule group. #13974 * [ENHANCEMENT] Rules: Add `rule_group_last_restore_duration_seconds` to measure the time it takes to restore a rule group. #13974
* [ENHANCEMENT] OTLP: Improve remote write format translation performance by using label set hashes for metric identifiers instead of string based ones. #14006 #13991 * [ENHANCEMENT] OTLP: Improve remote write format translation performance by using label set hashes for metric identifiers instead of string based ones. #14006 #13991
* [ENHANCEMENT] TSDB: Optimize querying with regexp matchers. #13620 * [ENHANCEMENT] TSDB: Optimize querying with regexp matchers. #13620

View file

@ -785,6 +785,9 @@ func main() {
ResendDelay: time.Duration(cfg.resendDelay), ResendDelay: time.Duration(cfg.resendDelay),
MaxConcurrentEvals: cfg.maxConcurrentEvals, MaxConcurrentEvals: cfg.maxConcurrentEvals,
ConcurrentEvalsEnabled: cfg.enableConcurrentRuleEval, ConcurrentEvalsEnabled: cfg.enableConcurrentRuleEval,
DefaultRuleQueryOffset: func() time.Duration {
return time.Duration(cfgFile.GlobalConfig.RuleQueryOffset)
},
}) })
} }

View file

@ -145,6 +145,7 @@ var (
ScrapeInterval: model.Duration(1 * time.Minute), ScrapeInterval: model.Duration(1 * time.Minute),
ScrapeTimeout: model.Duration(10 * time.Second), ScrapeTimeout: model.Duration(10 * time.Second),
EvaluationInterval: model.Duration(1 * time.Minute), EvaluationInterval: model.Duration(1 * time.Minute),
RuleQueryOffset: model.Duration(0 * time.Minute),
// When native histogram feature flag is enabled, ScrapeProtocols default // When native histogram feature flag is enabled, ScrapeProtocols default
// changes to DefaultNativeHistogramScrapeProtocols. // changes to DefaultNativeHistogramScrapeProtocols.
ScrapeProtocols: DefaultScrapeProtocols, ScrapeProtocols: DefaultScrapeProtocols,
@ -397,6 +398,8 @@ type GlobalConfig struct {
ScrapeProtocols []ScrapeProtocol `yaml:"scrape_protocols,omitempty"` ScrapeProtocols []ScrapeProtocol `yaml:"scrape_protocols,omitempty"`
// How frequently to evaluate rules by default. // How frequently to evaluate rules by default.
EvaluationInterval model.Duration `yaml:"evaluation_interval,omitempty"` EvaluationInterval model.Duration `yaml:"evaluation_interval,omitempty"`
// Offset the rule evaluation timestamp of this particular group by the specified duration into the past to ensure the underlying metrics have been received.
RuleQueryOffset model.Duration `yaml:"rule_query_offset"`
// File to which PromQL queries are logged. // File to which PromQL queries are logged.
QueryLogFile string `yaml:"query_log_file,omitempty"` QueryLogFile string `yaml:"query_log_file,omitempty"`
// The labels to add to any timeseries that this Prometheus instance scrapes. // The labels to add to any timeseries that this Prometheus instance scrapes.
@ -556,6 +559,7 @@ func (c *GlobalConfig) isZero() bool {
c.ScrapeInterval == 0 && c.ScrapeInterval == 0 &&
c.ScrapeTimeout == 0 && c.ScrapeTimeout == 0 &&
c.EvaluationInterval == 0 && c.EvaluationInterval == 0 &&
c.RuleQueryOffset == 0 &&
c.QueryLogFile == "" && c.QueryLogFile == "" &&
c.ScrapeProtocols == nil c.ScrapeProtocols == nil
} }

View file

@ -71,6 +71,10 @@ global:
# How frequently to evaluate rules. # How frequently to evaluate rules.
[ evaluation_interval: <duration> | default = 1m ] [ evaluation_interval: <duration> | default = 1m ]
# Offset the rule evaluation timestamp of this particular group by the specified duration into the past to ensure the underlying metrics have been received.
# Metric availability delays are more likely to occur when Prometheus is running as a remote write target, but can also occur when there's anomalies with scraping.
[ rule_query_offset: <duration> | default = 0s ]
# The labels to add to any time series or alerts when communicating with # The labels to add to any time series or alerts when communicating with
# external systems (federation, remote storage, Alertmanager). # external systems (federation, remote storage, Alertmanager).
external_labels: external_labels:

View file

@ -86,6 +86,9 @@ name: <string>
# rule can produce. 0 is no limit. # rule can produce. 0 is no limit.
[ limit: <int> | default = 0 ] [ limit: <int> | default = 0 ]
# Offset the rule evaluation timestamp of this particular group by the specified duration into the past.
[ query_offset: <duration> | default = global.rule_query_offset ]
rules: rules:
[ - <rule> ... ] [ - <rule> ... ]
``` ```
@ -148,6 +151,9 @@ the rule, active, pending, or inactive, are cleared as well. The event will be
recorded as an error in the evaluation, and as such no stale markers are recorded as an error in the evaluation, and as such no stale markers are
written. written.
# Rule query offset
This is useful to ensure the underlying metrics have been received and stored in Prometheus. Metric availability delays are more likely to occur when Prometheus is running as a remote write target due to the nature of distributed systems, but can also occur when there's anomalies with scraping and/or short evaluation intervals.
# Failed rule evaluations due to slow evaluation # Failed rule evaluations due to slow evaluation
If a rule group hasn't finished evaluating before its next evaluation is supposed to start (as defined by the `evaluation_interval`), the next evaluation will be skipped. Subsequent evaluations of the rule group will continue to be skipped until the initial evaluation either completes or times out. When this happens, there will be a gap in the metric produced by the recording rule. The `rule_group_iterations_missed_total` metric will be incremented for each missed iteration of the rule group. If a rule group hasn't finished evaluating before its next evaluation is supposed to start (as defined by the `evaluation_interval`), the next evaluation will be skipped. Subsequent evaluations of the rule group will continue to be skipped until the initial evaluation either completes or times out. When this happens, there will be a gap in the metric produced by the recording rule. The `rule_group_iterations_missed_total` metric will be incremented for each missed iteration of the rule group.

View file

@ -138,6 +138,7 @@ func (g *RuleGroups) Validate(node ruleGroups) (errs []error) {
type RuleGroup struct { type RuleGroup struct {
Name string `yaml:"name"` Name string `yaml:"name"`
Interval model.Duration `yaml:"interval,omitempty"` Interval model.Duration `yaml:"interval,omitempty"`
QueryOffset *model.Duration `yaml:"query_offset,omitempty"`
Limit int `yaml:"limit,omitempty"` Limit int `yaml:"limit,omitempty"`
Rules []RuleNode `yaml:"rules"` Rules []RuleNode `yaml:"rules"`
} }

View file

@ -338,10 +338,9 @@ const resolvedRetention = 15 * time.Minute
// Eval evaluates the rule expression and then creates pending alerts and fires // Eval evaluates the rule expression and then creates pending alerts and fires
// or removes previously pending alerts accordingly. // or removes previously pending alerts accordingly.
func (r *AlertingRule) Eval(ctx context.Context, ts time.Time, query QueryFunc, externalURL *url.URL, limit int) (promql.Vector, error) { func (r *AlertingRule) Eval(ctx context.Context, queryOffset time.Duration, ts time.Time, query QueryFunc, externalURL *url.URL, limit int) (promql.Vector, error) {
ctx = NewOriginContext(ctx, NewRuleDetail(r)) ctx = NewOriginContext(ctx, NewRuleDetail(r))
res, err := query(ctx, r.vector.String(), ts.Add(-queryOffset))
res, err := query(ctx, r.vector.String(), ts)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -484,8 +483,8 @@ func (r *AlertingRule) Eval(ctx context.Context, ts time.Time, query QueryFunc,
} }
if r.restored.Load() { if r.restored.Load() {
vec = append(vec, r.sample(a, ts)) vec = append(vec, r.sample(a, ts.Add(-queryOffset)))
vec = append(vec, r.forStateSample(a, ts, float64(a.ActiveAt.Unix()))) vec = append(vec, r.forStateSample(a, ts.Add(-queryOffset), float64(a.ActiveAt.Unix())))
} }
} }

View file

@ -123,7 +123,7 @@ func TestAlertingRuleTemplateWithHistogram(t *testing.T) {
) )
evalTime := time.Now() evalTime := time.Now()
res, err := rule.Eval(context.TODO(), evalTime, q, nil, 0) res, err := rule.Eval(context.TODO(), 0, evalTime, q, nil, 0)
require.NoError(t, err) require.NoError(t, err)
require.Len(t, res, 2) require.Len(t, res, 2)
@ -230,7 +230,7 @@ func TestAlertingRuleLabelsUpdate(t *testing.T) {
t.Logf("case %d", i) t.Logf("case %d", i)
evalTime := baseTime.Add(time.Duration(i) * time.Minute) evalTime := baseTime.Add(time.Duration(i) * time.Minute)
result[0].T = timestamp.FromTime(evalTime) result[0].T = timestamp.FromTime(evalTime)
res, err := rule.Eval(context.TODO(), evalTime, EngineQueryFunc(testEngine, storage), nil, 0) res, err := rule.Eval(context.TODO(), 0, evalTime, EngineQueryFunc(testEngine, storage), nil, 0)
require.NoError(t, err) require.NoError(t, err)
var filteredRes promql.Vector // After removing 'ALERTS_FOR_STATE' samples. var filteredRes promql.Vector // After removing 'ALERTS_FOR_STATE' samples.
@ -247,7 +247,7 @@ func TestAlertingRuleLabelsUpdate(t *testing.T) {
testutil.RequireEqual(t, result, filteredRes) testutil.RequireEqual(t, result, filteredRes)
} }
evalTime := baseTime.Add(time.Duration(len(results)) * time.Minute) evalTime := baseTime.Add(time.Duration(len(results)) * time.Minute)
res, err := rule.Eval(context.TODO(), evalTime, EngineQueryFunc(testEngine, storage), nil, 0) res, err := rule.Eval(context.TODO(), 0, evalTime, EngineQueryFunc(testEngine, storage), nil, 0)
require.NoError(t, err) require.NoError(t, err)
require.Empty(t, res) require.Empty(t, res)
} }
@ -315,7 +315,7 @@ func TestAlertingRuleExternalLabelsInTemplate(t *testing.T) {
var filteredRes promql.Vector // After removing 'ALERTS_FOR_STATE' samples. var filteredRes promql.Vector // After removing 'ALERTS_FOR_STATE' samples.
res, err := ruleWithoutExternalLabels.Eval( res, err := ruleWithoutExternalLabels.Eval(
context.TODO(), evalTime, EngineQueryFunc(testEngine, storage), nil, 0, context.TODO(), 0, evalTime, EngineQueryFunc(testEngine, storage), nil, 0,
) )
require.NoError(t, err) require.NoError(t, err)
for _, smpl := range res { for _, smpl := range res {
@ -329,7 +329,7 @@ func TestAlertingRuleExternalLabelsInTemplate(t *testing.T) {
} }
res, err = ruleWithExternalLabels.Eval( res, err = ruleWithExternalLabels.Eval(
context.TODO(), evalTime, EngineQueryFunc(testEngine, storage), nil, 0, context.TODO(), 0, evalTime, EngineQueryFunc(testEngine, storage), nil, 0,
) )
require.NoError(t, err) require.NoError(t, err)
for _, smpl := range res { for _, smpl := range res {
@ -408,7 +408,7 @@ func TestAlertingRuleExternalURLInTemplate(t *testing.T) {
var filteredRes promql.Vector // After removing 'ALERTS_FOR_STATE' samples. var filteredRes promql.Vector // After removing 'ALERTS_FOR_STATE' samples.
res, err := ruleWithoutExternalURL.Eval( res, err := ruleWithoutExternalURL.Eval(
context.TODO(), evalTime, EngineQueryFunc(testEngine, storage), nil, 0, context.TODO(), 0, evalTime, EngineQueryFunc(testEngine, storage), nil, 0,
) )
require.NoError(t, err) require.NoError(t, err)
for _, smpl := range res { for _, smpl := range res {
@ -422,7 +422,7 @@ func TestAlertingRuleExternalURLInTemplate(t *testing.T) {
} }
res, err = ruleWithExternalURL.Eval( res, err = ruleWithExternalURL.Eval(
context.TODO(), evalTime, EngineQueryFunc(testEngine, storage), nil, 0, context.TODO(), 0, evalTime, EngineQueryFunc(testEngine, storage), nil, 0,
) )
require.NoError(t, err) require.NoError(t, err)
for _, smpl := range res { for _, smpl := range res {
@ -477,7 +477,7 @@ func TestAlertingRuleEmptyLabelFromTemplate(t *testing.T) {
var filteredRes promql.Vector // After removing 'ALERTS_FOR_STATE' samples. var filteredRes promql.Vector // After removing 'ALERTS_FOR_STATE' samples.
res, err := rule.Eval( res, err := rule.Eval(
context.TODO(), evalTime, EngineQueryFunc(testEngine, storage), nil, 0, context.TODO(), 0, evalTime, EngineQueryFunc(testEngine, storage), nil, 0,
) )
require.NoError(t, err) require.NoError(t, err)
for _, smpl := range res { for _, smpl := range res {
@ -544,7 +544,7 @@ instance: {{ $v.Labels.instance }}, value: {{ printf "%.0f" $v.Value }};
close(getDoneCh) close(getDoneCh)
}() }()
_, err = ruleWithQueryInTemplate.Eval( _, err = ruleWithQueryInTemplate.Eval(
context.TODO(), evalTime, slowQueryFunc, nil, 0, context.TODO(), 0, evalTime, slowQueryFunc, nil, 0,
) )
require.NoError(t, err) require.NoError(t, err)
} }
@ -596,7 +596,7 @@ func TestAlertingRuleDuplicate(t *testing.T) {
"", "",
true, log.NewNopLogger(), true, log.NewNopLogger(),
) )
_, err := rule.Eval(ctx, now, EngineQueryFunc(engine, storage), nil, 0) _, err := rule.Eval(ctx, 0, now, EngineQueryFunc(engine, storage), nil, 0)
require.Error(t, err) require.Error(t, err)
require.EqualError(t, err, "vector contains metrics with the same labelset after applying alert labels") require.EqualError(t, err, "vector contains metrics with the same labelset after applying alert labels")
} }
@ -644,7 +644,7 @@ func TestAlertingRuleLimit(t *testing.T) {
evalTime := time.Unix(0, 0) evalTime := time.Unix(0, 0)
for _, test := range tests { for _, test := range tests {
switch _, err := rule.Eval(context.TODO(), evalTime, EngineQueryFunc(testEngine, storage), nil, test.limit); { switch _, err := rule.Eval(context.TODO(), 0, evalTime, EngineQueryFunc(testEngine, storage), nil, test.limit); {
case err != nil: case err != nil:
require.EqualError(t, err, test.err) require.EqualError(t, err, test.err)
case test.err != "": case test.err != "":
@ -871,7 +871,7 @@ func TestKeepFiringFor(t *testing.T) {
t.Logf("case %d", i) t.Logf("case %d", i)
evalTime := baseTime.Add(time.Duration(i) * time.Minute) evalTime := baseTime.Add(time.Duration(i) * time.Minute)
result[0].T = timestamp.FromTime(evalTime) result[0].T = timestamp.FromTime(evalTime)
res, err := rule.Eval(context.TODO(), evalTime, EngineQueryFunc(testEngine, storage), nil, 0) res, err := rule.Eval(context.TODO(), 0, evalTime, EngineQueryFunc(testEngine, storage), nil, 0)
require.NoError(t, err) require.NoError(t, err)
var filteredRes promql.Vector // After removing 'ALERTS_FOR_STATE' samples. var filteredRes promql.Vector // After removing 'ALERTS_FOR_STATE' samples.
@ -888,7 +888,7 @@ func TestKeepFiringFor(t *testing.T) {
testutil.RequireEqual(t, result, filteredRes) testutil.RequireEqual(t, result, filteredRes)
} }
evalTime := baseTime.Add(time.Duration(len(results)) * time.Minute) evalTime := baseTime.Add(time.Duration(len(results)) * time.Minute)
res, err := rule.Eval(context.TODO(), evalTime, EngineQueryFunc(testEngine, storage), nil, 0) res, err := rule.Eval(context.TODO(), 0, evalTime, EngineQueryFunc(testEngine, storage), nil, 0)
require.NoError(t, err) require.NoError(t, err)
require.Empty(t, res) require.Empty(t, res)
} }
@ -925,7 +925,7 @@ func TestPendingAndKeepFiringFor(t *testing.T) {
baseTime := time.Unix(0, 0) baseTime := time.Unix(0, 0)
result.T = timestamp.FromTime(baseTime) result.T = timestamp.FromTime(baseTime)
res, err := rule.Eval(context.TODO(), baseTime, EngineQueryFunc(testEngine, storage), nil, 0) res, err := rule.Eval(context.TODO(), 0, baseTime, EngineQueryFunc(testEngine, storage), nil, 0)
require.NoError(t, err) require.NoError(t, err)
require.Len(t, res, 2) require.Len(t, res, 2)
@ -940,7 +940,7 @@ func TestPendingAndKeepFiringFor(t *testing.T) {
} }
evalTime := baseTime.Add(time.Minute) evalTime := baseTime.Add(time.Minute)
res, err = rule.Eval(context.TODO(), evalTime, EngineQueryFunc(testEngine, storage), nil, 0) res, err = rule.Eval(context.TODO(), 0, evalTime, EngineQueryFunc(testEngine, storage), nil, 0)
require.NoError(t, err) require.NoError(t, err)
require.Empty(t, res) require.Empty(t, res)
} }
@ -974,7 +974,7 @@ func TestAlertingEvalWithOrigin(t *testing.T) {
true, log.NewNopLogger(), true, log.NewNopLogger(),
) )
_, err = rule.Eval(ctx, now, func(ctx context.Context, qs string, _ time.Time) (promql.Vector, error) { _, err = rule.Eval(ctx, 0, now, func(ctx context.Context, qs string, _ time.Time) (promql.Vector, error) {
detail = FromOriginContext(ctx) detail = FromOriginContext(ctx)
return nil, nil return nil, nil
}, nil, 0) }, nil, 0)

View file

@ -47,6 +47,7 @@ type Group struct {
name string name string
file string file string
interval time.Duration interval time.Duration
queryOffset *time.Duration
limit int limit int
rules []Rule rules []Rule
seriesInPreviousEval []map[string]labels.Labels // One per Rule. seriesInPreviousEval []map[string]labels.Labels // One per Rule.
@ -90,6 +91,7 @@ type GroupOptions struct {
Rules []Rule Rules []Rule
ShouldRestore bool ShouldRestore bool
Opts *ManagerOptions Opts *ManagerOptions
QueryOffset *time.Duration
done chan struct{} done chan struct{}
EvalIterationFunc GroupEvalIterationFunc EvalIterationFunc GroupEvalIterationFunc
} }
@ -126,6 +128,7 @@ func NewGroup(o GroupOptions) *Group {
name: o.Name, name: o.Name,
file: o.File, file: o.File,
interval: o.Interval, interval: o.Interval,
queryOffset: o.QueryOffset,
limit: o.Limit, limit: o.Limit,
rules: o.Rules, rules: o.Rules,
shouldRestore: o.ShouldRestore, shouldRestore: o.ShouldRestore,
@ -443,6 +446,8 @@ func (g *Group) Eval(ctx context.Context, ts time.Time) {
wg sync.WaitGroup wg sync.WaitGroup
) )
ruleQueryOffset := g.QueryOffset()
for i, rule := range g.rules { for i, rule := range g.rules {
select { select {
case <-g.done: case <-g.done:
@ -473,7 +478,7 @@ func (g *Group) Eval(ctx context.Context, ts time.Time) {
g.metrics.EvalTotal.WithLabelValues(GroupKey(g.File(), g.Name())).Inc() g.metrics.EvalTotal.WithLabelValues(GroupKey(g.File(), g.Name())).Inc()
vector, err := rule.Eval(ctx, ts, g.opts.QueryFunc, g.opts.ExternalURL, g.Limit()) vector, err := rule.Eval(ctx, ruleQueryOffset, ts, g.opts.QueryFunc, g.opts.ExternalURL, g.Limit())
if err != nil { if err != nil {
rule.SetHealth(HealthBad) rule.SetHealth(HealthBad)
rule.SetLastError(err) rule.SetLastError(err)
@ -562,7 +567,7 @@ func (g *Group) Eval(ctx context.Context, ts time.Time) {
for metric, lset := range g.seriesInPreviousEval[i] { for metric, lset := range g.seriesInPreviousEval[i] {
if _, ok := seriesReturned[metric]; !ok { if _, ok := seriesReturned[metric]; !ok {
// Series no longer exposed, mark it stale. // Series no longer exposed, mark it stale.
_, err = app.Append(0, lset, timestamp.FromTime(ts), math.Float64frombits(value.StaleNaN)) _, err = app.Append(0, lset, timestamp.FromTime(ts.Add(-ruleQueryOffset)), math.Float64frombits(value.StaleNaN))
unwrappedErr := errors.Unwrap(err) unwrappedErr := errors.Unwrap(err)
if unwrappedErr == nil { if unwrappedErr == nil {
unwrappedErr = err unwrappedErr = err
@ -601,14 +606,27 @@ func (g *Group) Eval(ctx context.Context, ts time.Time) {
g.cleanupStaleSeries(ctx, ts) g.cleanupStaleSeries(ctx, ts)
} }
func (g *Group) QueryOffset() time.Duration {
if g.queryOffset != nil {
return *g.queryOffset
}
if g.opts.DefaultRuleQueryOffset != nil {
return g.opts.DefaultRuleQueryOffset()
}
return time.Duration(0)
}
func (g *Group) cleanupStaleSeries(ctx context.Context, ts time.Time) { func (g *Group) cleanupStaleSeries(ctx context.Context, ts time.Time) {
if len(g.staleSeries) == 0 { if len(g.staleSeries) == 0 {
return return
} }
app := g.opts.Appendable.Appender(ctx) app := g.opts.Appendable.Appender(ctx)
queryOffset := g.QueryOffset()
for _, s := range g.staleSeries { for _, s := range g.staleSeries {
// Rule that produced series no longer configured, mark it stale. // Rule that produced series no longer configured, mark it stale.
_, err := app.Append(0, s, timestamp.FromTime(ts), math.Float64frombits(value.StaleNaN)) _, err := app.Append(0, s, timestamp.FromTime(ts.Add(-queryOffset)), math.Float64frombits(value.StaleNaN))
unwrappedErr := errors.Unwrap(err) unwrappedErr := errors.Unwrap(err)
if unwrappedErr == nil { if unwrappedErr == nil {
unwrappedErr = err unwrappedErr = err

View file

@ -116,6 +116,7 @@ type ManagerOptions struct {
ForGracePeriod time.Duration ForGracePeriod time.Duration
ResendDelay time.Duration ResendDelay time.Duration
GroupLoader GroupLoader GroupLoader GroupLoader
DefaultRuleQueryOffset func() time.Duration
MaxConcurrentEvals int64 MaxConcurrentEvals int64
ConcurrentEvalsEnabled bool ConcurrentEvalsEnabled bool
RuleConcurrencyController RuleConcurrencyController RuleConcurrencyController RuleConcurrencyController
@ -336,6 +337,7 @@ func (m *Manager) LoadGroups(
Rules: rules, Rules: rules,
ShouldRestore: shouldRestore, ShouldRestore: shouldRestore,
Opts: m.opts, Opts: m.opts,
QueryOffset: (*time.Duration)(rg.QueryOffset),
done: m.done, done: m.done,
EvalIterationFunc: groupEvalIterationFunc, EvalIterationFunc: groupEvalIterationFunc,
}) })

View file

@ -16,8 +16,10 @@ package rules
import ( import (
"context" "context"
"fmt" "fmt"
"io/fs"
"math" "math"
"os" "os"
"path"
"sort" "sort"
"strconv" "strconv"
"sync" "sync"
@ -162,7 +164,7 @@ func TestAlertingRule(t *testing.T) {
evalTime := baseTime.Add(test.time) evalTime := baseTime.Add(test.time)
res, err := rule.Eval(context.TODO(), evalTime, EngineQueryFunc(testEngine, storage), nil, 0) res, err := rule.Eval(context.TODO(), 0, evalTime, EngineQueryFunc(testEngine, storage), nil, 0)
require.NoError(t, err) require.NoError(t, err)
var filteredRes promql.Vector // After removing 'ALERTS_FOR_STATE' samples. var filteredRes promql.Vector // After removing 'ALERTS_FOR_STATE' samples.
@ -192,6 +194,8 @@ func TestAlertingRule(t *testing.T) {
} }
func TestForStateAddSamples(t *testing.T) { func TestForStateAddSamples(t *testing.T) {
for _, queryOffset := range []time.Duration{0, time.Minute} {
t.Run(fmt.Sprintf("queryOffset %s", queryOffset.String()), func(t *testing.T) {
storage := promqltest.LoadedStorage(t, ` storage := promqltest.LoadedStorage(t, `
load 5m load 5m
http_requests{job="app-server", instance="0", group="canary", severity="overwrite-me"} 75 85 95 105 105 95 85 http_requests{job="app-server", instance="0", group="canary", severity="overwrite-me"} 75 85 95 105 105 95 85
@ -299,7 +303,7 @@ func TestForStateAddSamples(t *testing.T) {
var forState float64 var forState float64
for i, test := range tests { for i, test := range tests {
t.Logf("case %d", i) t.Logf("case %d", i)
evalTime := baseTime.Add(test.time) evalTime := baseTime.Add(test.time).Add(queryOffset)
if test.persistThisTime { if test.persistThisTime {
forState = float64(evalTime.Unix()) forState = float64(evalTime.Unix())
@ -308,7 +312,7 @@ func TestForStateAddSamples(t *testing.T) {
forState = float64(value.StaleNaN) forState = float64(value.StaleNaN)
} }
res, err := rule.Eval(context.TODO(), evalTime, EngineQueryFunc(testEngine, storage), nil, 0) res, err := rule.Eval(context.TODO(), queryOffset, evalTime, EngineQueryFunc(testEngine, storage), nil, 0)
require.NoError(t, err) require.NoError(t, err)
var filteredRes promql.Vector // After removing 'ALERTS' samples. var filteredRes promql.Vector // After removing 'ALERTS' samples.
@ -322,7 +326,7 @@ func TestForStateAddSamples(t *testing.T) {
} }
} }
for i := range test.result { for i := range test.result {
test.result[i].T = timestamp.FromTime(evalTime) test.result[i].T = timestamp.FromTime(evalTime.Add(-queryOffset))
// Updating the expected 'for' state. // Updating the expected 'for' state.
if test.result[i].F >= 0 { if test.result[i].F >= 0 {
test.result[i].F = forState test.result[i].F = forState
@ -339,6 +343,8 @@ func TestForStateAddSamples(t *testing.T) {
require.Zero(t, aa.Labels.Get(model.MetricNameLabel), "%s label set on active alert: %s", model.MetricNameLabel, aa.Labels) require.Zero(t, aa.Labels.Get(model.MetricNameLabel), "%s label set on active alert: %s", model.MetricNameLabel, aa.Labels)
} }
} }
})
}
} }
// sortAlerts sorts `[]*Alert` w.r.t. the Labels. // sortAlerts sorts `[]*Alert` w.r.t. the Labels.
@ -349,6 +355,8 @@ func sortAlerts(items []*Alert) {
} }
func TestForStateRestore(t *testing.T) { func TestForStateRestore(t *testing.T) {
for _, queryOffset := range []time.Duration{0, time.Minute} {
t.Run(fmt.Sprintf("queryOffset %s", queryOffset.String()), func(t *testing.T) {
storage := promqltest.LoadedStorage(t, ` storage := promqltest.LoadedStorage(t, `
load 5m load 5m
http_requests{job="app-server", instance="0", group="canary", severity="overwrite-me"} 75 85 50 0 0 25 0 0 40 0 120 http_requests{job="app-server", instance="0", group="canary", severity="overwrite-me"} 75 85 50 0 0 25 0 0 40 0 120
@ -465,12 +473,13 @@ func TestForStateRestore(t *testing.T) {
Rules: []Rule{newRule}, Rules: []Rule{newRule},
ShouldRestore: true, ShouldRestore: true,
Opts: opts, Opts: opts,
QueryOffset: &queryOffset,
}) })
newGroups := make(map[string]*Group) newGroups := make(map[string]*Group)
newGroups["default;"] = newGroup newGroups["default;"] = newGroup
restoreTime := baseTime.Add(tt.restoreDuration) restoreTime := baseTime.Add(tt.restoreDuration).Add(queryOffset)
// First eval before restoration. // First eval before restoration.
newGroup.Eval(context.TODO(), restoreTime) newGroup.Eval(context.TODO(), restoreTime)
// Restore happens here. // Restore happens here.
@ -510,15 +519,18 @@ func TestForStateRestore(t *testing.T) {
// Difference in time should be within 1e6 ns, i.e. 1ms // Difference in time should be within 1e6 ns, i.e. 1ms
// (due to conversion between ns & ms, float64 & int64). // (due to conversion between ns & ms, float64 & int64).
activeAtDiff := float64(e.ActiveAt.Unix() + int64(tt.downDuration/time.Second) - got[i].ActiveAt.Unix()) activeAtDiff := queryOffset.Seconds() + float64(e.ActiveAt.Unix()+int64(tt.downDuration/time.Second)-got[i].ActiveAt.Unix())
require.Equal(t, 0.0, math.Abs(activeAtDiff), "'for' state restored time is wrong") require.Equal(t, 0.0, math.Abs(activeAtDiff), "'for' state restored time is wrong")
} }
} }
}) })
} }
})
}
} }
func TestStaleness(t *testing.T) { func TestStaleness(t *testing.T) {
for _, queryOffset := range []time.Duration{0, time.Minute} {
st := teststorage.New(t) st := teststorage.New(t)
defer st.Close() defer st.Close()
engineOpts := promql.EngineOpts{ engineOpts := promql.EngineOpts{
@ -545,6 +557,7 @@ func TestStaleness(t *testing.T) {
Rules: []Rule{rule}, Rules: []Rule{rule},
ShouldRestore: true, ShouldRestore: true,
Opts: opts, Opts: opts,
QueryOffset: &queryOffset,
}) })
// A time series that has two samples and then goes stale. // A time series that has two samples and then goes stale.
@ -559,9 +572,9 @@ func TestStaleness(t *testing.T) {
ctx := context.Background() ctx := context.Background()
// Execute 3 times, 1 second apart. // Execute 3 times, 1 second apart.
group.Eval(ctx, time.Unix(0, 0)) group.Eval(ctx, time.Unix(0, 0).Add(queryOffset))
group.Eval(ctx, time.Unix(1, 0)) group.Eval(ctx, time.Unix(1, 0).Add(queryOffset))
group.Eval(ctx, time.Unix(2, 0)) group.Eval(ctx, time.Unix(2, 0).Add(queryOffset))
querier, err := st.Querier(0, 2000) querier, err := st.Querier(0, 2000)
require.NoError(t, err) require.NoError(t, err)
@ -586,6 +599,7 @@ func TestStaleness(t *testing.T) {
} }
require.Equal(t, want, samples) require.Equal(t, want, samples)
}
} }
// Convert a SeriesSet into a form usable with require.Equal. // Convert a SeriesSet into a form usable with require.Equal.
@ -609,6 +623,46 @@ func readSeriesSet(ss storage.SeriesSet) (map[string][]promql.FPoint, error) {
return result, ss.Err() return result, ss.Err()
} }
func TestGroup_QueryOffset(t *testing.T) {
config := `
groups:
- name: group1
query_offset: 2m
- name: group2
query_offset: 0s
- name: group3
`
dir := t.TempDir()
fname := path.Join(dir, "rules.yaml")
err := os.WriteFile(fname, []byte(config), fs.ModePerm)
require.NoError(t, err)
m := NewManager(&ManagerOptions{
Logger: log.NewNopLogger(),
DefaultRuleQueryOffset: func() time.Duration {
return time.Minute
},
})
m.start()
err = m.Update(time.Second, []string{fname}, labels.EmptyLabels(), "", nil)
require.NoError(t, err)
rgs := m.RuleGroups()
sort.Slice(rgs, func(i, j int) bool {
return rgs[i].Name() < rgs[j].Name()
})
// From config.
require.Equal(t, 2*time.Minute, rgs[0].QueryOffset())
// Setting 0 in config is detected.
require.Equal(t, time.Duration(0), rgs[1].QueryOffset())
// Default when nothing is set.
require.Equal(t, time.Minute, rgs[2].QueryOffset())
m.Stop()
}
func TestCopyState(t *testing.T) { func TestCopyState(t *testing.T) {
oldGroup := &Group{ oldGroup := &Group{
rules: []Rule{ rules: []Rule{

View file

@ -31,7 +31,7 @@ type unknownRule struct{}
func (u unknownRule) Name() string { return "" } func (u unknownRule) Name() string { return "" }
func (u unknownRule) Labels() labels.Labels { return labels.EmptyLabels() } func (u unknownRule) Labels() labels.Labels { return labels.EmptyLabels() }
func (u unknownRule) Eval(context.Context, time.Time, QueryFunc, *url.URL, int) (promql.Vector, error) { func (u unknownRule) Eval(context.Context, time.Duration, time.Time, QueryFunc, *url.URL, int) (promql.Vector, error) {
return nil, nil return nil, nil
} }
func (u unknownRule) String() string { return "" } func (u unknownRule) String() string { return "" }

View file

@ -77,10 +77,9 @@ func (rule *RecordingRule) Labels() labels.Labels {
} }
// Eval evaluates the rule and then overrides the metric names and labels accordingly. // Eval evaluates the rule and then overrides the metric names and labels accordingly.
func (rule *RecordingRule) Eval(ctx context.Context, ts time.Time, query QueryFunc, _ *url.URL, limit int) (promql.Vector, error) { func (rule *RecordingRule) Eval(ctx context.Context, queryOffset time.Duration, ts time.Time, query QueryFunc, _ *url.URL, limit int) (promql.Vector, error) {
ctx = NewOriginContext(ctx, NewRuleDetail(rule)) ctx = NewOriginContext(ctx, NewRuleDetail(rule))
vector, err := query(ctx, rule.vector.String(), ts.Add(-queryOffset))
vector, err := query(ctx, rule.vector.String(), ts)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View file

@ -126,7 +126,7 @@ func TestRuleEval(t *testing.T) {
for _, scenario := range ruleEvalTestScenarios { for _, scenario := range ruleEvalTestScenarios {
t.Run(scenario.name, func(t *testing.T) { t.Run(scenario.name, func(t *testing.T) {
rule := NewRecordingRule("test_rule", scenario.expr, scenario.ruleLabels) rule := NewRecordingRule("test_rule", scenario.expr, scenario.ruleLabels)
result, err := rule.Eval(context.TODO(), ruleEvaluationTime, EngineQueryFunc(testEngine, storage), nil, 0) result, err := rule.Eval(context.TODO(), 0, ruleEvaluationTime, EngineQueryFunc(testEngine, storage), nil, 0)
require.NoError(t, err) require.NoError(t, err)
testutil.RequireEqual(t, scenario.expected, result) testutil.RequireEqual(t, scenario.expected, result)
}) })
@ -144,7 +144,7 @@ func BenchmarkRuleEval(b *testing.B) {
b.ResetTimer() b.ResetTimer()
for i := 0; i < b.N; i++ { for i := 0; i < b.N; i++ {
_, err := rule.Eval(context.TODO(), ruleEvaluationTime, EngineQueryFunc(testEngine, storage), nil, 0) _, err := rule.Eval(context.TODO(), 0, ruleEvaluationTime, EngineQueryFunc(testEngine, storage), nil, 0)
if err != nil { if err != nil {
require.NoError(b, err) require.NoError(b, err)
} }
@ -173,7 +173,7 @@ func TestRuleEvalDuplicate(t *testing.T) {
expr, _ := parser.ParseExpr(`vector(0) or label_replace(vector(0),"test","x","","")`) expr, _ := parser.ParseExpr(`vector(0) or label_replace(vector(0),"test","x","","")`)
rule := NewRecordingRule("foo", expr, labels.FromStrings("test", "test")) rule := NewRecordingRule("foo", expr, labels.FromStrings("test", "test"))
_, err := rule.Eval(ctx, now, EngineQueryFunc(engine, storage), nil, 0) _, err := rule.Eval(ctx, 0, now, EngineQueryFunc(engine, storage), nil, 0)
require.Error(t, err) require.Error(t, err)
require.EqualError(t, err, "vector contains metrics with the same labelset after applying rule labels") require.EqualError(t, err, "vector contains metrics with the same labelset after applying rule labels")
} }
@ -215,7 +215,7 @@ func TestRecordingRuleLimit(t *testing.T) {
evalTime := time.Unix(0, 0) evalTime := time.Unix(0, 0)
for _, test := range tests { for _, test := range tests {
switch _, err := rule.Eval(context.TODO(), evalTime, EngineQueryFunc(testEngine, storage), nil, test.limit); { switch _, err := rule.Eval(context.TODO(), 0, evalTime, EngineQueryFunc(testEngine, storage), nil, test.limit); {
case err != nil: case err != nil:
require.EqualError(t, err, test.err) require.EqualError(t, err, test.err)
case test.err != "": case test.err != "":
@ -243,7 +243,7 @@ func TestRecordingEvalWithOrigin(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
rule := NewRecordingRule(name, expr, lbs) rule := NewRecordingRule(name, expr, lbs)
_, err = rule.Eval(ctx, now, func(ctx context.Context, qs string, _ time.Time) (promql.Vector, error) { _, err = rule.Eval(ctx, 0, now, func(ctx context.Context, qs string, _ time.Time) (promql.Vector, error) {
detail = FromOriginContext(ctx) detail = FromOriginContext(ctx)
return nil, nil return nil, nil
}, nil, 0) }, nil, 0)

View file

@ -40,7 +40,7 @@ type Rule interface {
// Labels of the rule. // Labels of the rule.
Labels() labels.Labels Labels() labels.Labels
// Eval evaluates the rule, including any associated recording or alerting actions. // Eval evaluates the rule, including any associated recording or alerting actions.
Eval(context.Context, time.Time, QueryFunc, *url.URL, int) (promql.Vector, error) Eval(ctx context.Context, queryOffset time.Duration, evaluationTime time.Time, queryFunc QueryFunc, externalURL *url.URL, limit int) (promql.Vector, error)
// String returns a human-readable string representation of the rule. // String returns a human-readable string representation of the rule.
String() string String() string
// Query returns the rule query expression. // Query returns the rule query expression.