Introduce evaluation delay for rule groups (#155)

* Allow having evaluation delay for rule groups

Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com>

* Fix lint

Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com>

* Move the option to ManagerOptions

Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com>

* Include evaluation_delay in the group config

Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com>

* Fix comments

Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com>
Ganesh Vernekar 2022-03-14 18:50:07 +05:30 committed by GitHub
parent f8e3195f75
commit 23ce9ad9f0
17 changed files with 361 additions and 292 deletions
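
For orientation before the per-file changes: a minimal, illustrative sketch (not part of this commit) of how a rule file can use the new per-group evaluation_delay setting. It assumes the rulefmt change below and the model/rulefmt import path of recent Prometheus trees (adjust if your copy still uses pkg/rulefmt); the group and rule names are invented. A group that omits evaluation_delay falls back to the manager-wide default described further down.

package main

import (
    "fmt"

    "github.com/prometheus/prometheus/model/rulefmt"
)

func main() {
    // evaluation_delay is optional; nil means "not set in the file" and the
    // manager-wide default (DefaultEvaluationDelay in ManagerOptions) applies.
    ruleFile := []byte(`
groups:
  - name: example
    interval: 1m
    evaluation_delay: 5m
    rules:
      - record: job:http_requests:rate5m
        expr: sum by (job) (rate(http_requests_total[5m]))
`)
    rgs, errs := rulefmt.Parse(ruleFile)
    if len(errs) > 0 {
        panic(errs[0])
    }
    // EvaluationDelay is a *model.Duration added by this commit.
    fmt.Println(rgs.Groups[0].EvaluationDelay) // 5m
}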

View file

@@ -95,7 +95,8 @@ func (importer *ruleImporter) importAll(ctx context.Context) (errs []error) {

// importRule queries a prometheus API to evaluate rules at times in the past.
func (importer *ruleImporter) importRule(ctx context.Context, ruleExpr, ruleName string, ruleLabels labels.Labels, start, end time.Time,
-     maxBlockDuration int64, grp *rules.Group) (err error) {
+     maxBlockDuration int64, grp *rules.Group,
+ ) (err error) {
    blockDuration := getCompatibleBlockDuration(maxBlockDuration)
    startInMs := start.Unix() * int64(time.Second/time.Millisecond)
    endInMs := end.Unix() * int64(time.Second/time.Millisecond)

View file

@@ -50,7 +50,8 @@ type HypervisorDiscovery struct {

// newHypervisorDiscovery returns a new hypervisor discovery.
func newHypervisorDiscovery(provider *gophercloud.ProviderClient, opts *gophercloud.AuthOptions,
-     port int, region string, availability gophercloud.Availability, l log.Logger) *HypervisorDiscovery {
+     port int, region string, availability gophercloud.Availability, l log.Logger,
+ ) *HypervisorDiscovery {
    return &HypervisorDiscovery{
        provider: provider, authOpts: opts,
        region: region, port: port, availability: availability, logger: l,

View file

@@ -59,7 +59,8 @@ type InstanceDiscovery struct {

// NewInstanceDiscovery returns a new instance discovery.
func newInstanceDiscovery(provider *gophercloud.ProviderClient, opts *gophercloud.AuthOptions,
-     port int, region string, allTenants bool, availability gophercloud.Availability, l log.Logger) *InstanceDiscovery {
+     port int, region string, allTenants bool, availability gophercloud.Availability, l log.Logger,
+ ) *InstanceDiscovery {
    if l == nil {
        l = log.NewNopLogger()
    }

View file

@@ -143,21 +143,19 @@ func TestTargetGroupYamlUnmarshal(t *testing.T) {
func TestString(t *testing.T) {
    // String() should return only the source, regardless of other attributes.
-     group1 :=
-         Group{
-             Targets: []model.LabelSet{
-                 {"__address__": "localhost:9090"},
-                 {"__address__": "localhost:9091"},
-             },
-             Source: "<source>",
-             Labels: model.LabelSet{"foo": "bar", "bar": "baz"},
-         }
-     group2 :=
-         Group{
-             Targets: []model.LabelSet{},
-             Source: "<source>",
-             Labels: model.LabelSet{},
-         }
+     group1 := Group{
+         Targets: []model.LabelSet{
+             {"__address__": "localhost:9090"},
+             {"__address__": "localhost:9091"},
+         },
+         Source: "<source>",
+         Labels: model.LabelSet{"foo": "bar", "bar": "baz"},
+     }
+     group2 := Group{
+         Targets: []model.LabelSet{},
+         Source: "<source>",
+         Labels: model.LabelSet{},
+     }

    require.Equal(t, "<source>", group1.String())
    require.Equal(t, "<source>", group2.String())
    require.Equal(t, group1.String(), group2.String())

View file

@@ -84,7 +84,6 @@ func generateTargetGroups(allTargetGroups map[string][]*targetgroup.Group) map[s
        }

        sdGroup := customSD{
-
            Targets: newTargets,
            Labels:  newLabels,
        }

View file

@@ -116,11 +116,12 @@ func (g *RuleGroups) Validate(node ruleGroups) (errs []error) {

// RuleGroup is a list of sequentially evaluated recording and alerting rules.
type RuleGroup struct {
-     Name          string         `yaml:"name"`
-     Interval      model.Duration `yaml:"interval,omitempty"`
-     Limit         int            `yaml:"limit,omitempty"`
-     Rules         []RuleNode     `yaml:"rules"`
-     SourceTenants []string       `yaml:"source_tenants,omitempty"`
+     Name            string          `yaml:"name"`
+     Interval        model.Duration  `yaml:"interval,omitempty"`
+     EvaluationDelay *model.Duration `yaml:"evaluation_delay,omitempty"`
+     Limit           int             `yaml:"limit,omitempty"`
+     Rules           []RuleNode      `yaml:"rules"`
+     SourceTenants   []string        `yaml:"source_tenants,omitempty"`
}

// Rule describes an alerting or recording rule.

View file

@@ -611,7 +611,6 @@ var tests = []struct {
    { // Nested Subquery.
        input: `min_over_time(rate(foo{bar="baz"}[2s])[5m:])[4m:3s]`,
        expected: []Item{
-
            {IDENTIFIER, 0, `min_over_time`},
            {LEFT_PAREN, 13, `(`},
            {IDENTIFIER, 14, `rate`},
@@ -660,7 +659,6 @@ var tests = []struct {
    {
        input: `min_over_time(rate(foo{bar="baz"}[2s])[5m:] offset 6m)[4m:3s]`,
        expected: []Item{
-
            {IDENTIFIER, 0, `min_over_time`},
            {LEFT_PAREN, 13, `(`},
            {IDENTIFIER, 14, `rate`},

View file

@@ -304,8 +304,8 @@ const resolvedRetention = 15 * time.Minute

// Eval evaluates the rule expression and then creates pending alerts and fires
// or removes previously pending alerts accordingly.
- func (r *AlertingRule) Eval(ctx context.Context, ts time.Time, query QueryFunc, externalURL *url.URL, limit int) (promql.Vector, error) {
-     res, err := query(ctx, r.vector.String(), ts)
+ func (r *AlertingRule) Eval(ctx context.Context, evalDelay time.Duration, ts time.Time, query QueryFunc, externalURL *url.URL, limit int) (promql.Vector, error) {
+     res, err := query(ctx, r.vector.String(), ts.Add(-evalDelay))
    if err != nil {
        return nil, err
    }
@@ -419,8 +419,8 @@ func (r *AlertingRule) Eval(ctx context.Context, ts time.Time, query QueryFunc,
        }

        if r.restored {
-             vec = append(vec, r.sample(a, ts))
-             vec = append(vec, r.forStateSample(a, ts, float64(a.ActiveAt.Unix())))
+             vec = append(vec, r.sample(a, ts.Add(-evalDelay)))
+             vec = append(vec, r.forStateSample(a, ts.Add(-evalDelay), float64(a.ActiveAt.Unix())))
        }
    }

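The same ts.Add(-evalDelay) shift recurs throughout this change (restored samples above, recording rules and stale markers below): the group still evaluates on its regular schedule at wall-clock time ts, but the query and every sample it writes use ts minus the configured delay, presumably so that data that arrives late (for example over remote write) is still in range. A toy sketch of that arithmetic, with hypothetical names, not taken from this diff:

package main

import (
    "fmt"
    "time"
)

// effectiveRuleTimestamp illustrates the pattern: rules keep firing on schedule
// at ts, but query and stamp their output at ts minus the evaluation delay.
func effectiveRuleTimestamp(ts time.Time, evalDelay time.Duration) time.Time {
    return ts.Add(-evalDelay)
}

func main() {
    ts := time.Date(2022, 3, 14, 12, 0, 0, 0, time.UTC)
    fmt.Println(effectiveRuleTimestamp(ts, time.Minute)) // 2022-03-14 11:59:00 +0000 UTC
}
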
View file

@@ -170,7 +170,7 @@ func TestAlertingRuleLabelsUpdate(t *testing.T) {
        t.Logf("case %d", i)
        evalTime := baseTime.Add(time.Duration(i) * time.Minute)
        result[0].Point.T = timestamp.FromTime(evalTime)
-         res, err := rule.Eval(suite.Context(), evalTime, EngineQueryFunc(suite.QueryEngine(), suite.Storage()), nil, 0)
+         res, err := rule.Eval(suite.Context(), 0, evalTime, EngineQueryFunc(suite.QueryEngine(), suite.Storage()), nil, 0)
        require.NoError(t, err)

        var filteredRes promql.Vector // After removing 'ALERTS_FOR_STATE' samples.
@@ -252,7 +252,7 @@ func TestAlertingRuleExternalLabelsInTemplate(t *testing.T) {

    var filteredRes promql.Vector // After removing 'ALERTS_FOR_STATE' samples.
    res, err := ruleWithoutExternalLabels.Eval(
-         suite.Context(), evalTime, EngineQueryFunc(suite.QueryEngine(), suite.Storage()), nil, 0,
+         suite.Context(), 0, evalTime, EngineQueryFunc(suite.QueryEngine(), suite.Storage()), nil, 0,
    )
    require.NoError(t, err)
    for _, smpl := range res {
@@ -266,7 +266,7 @@ func TestAlertingRuleExternalLabelsInTemplate(t *testing.T) {
    }

    res, err = ruleWithExternalLabels.Eval(
-         suite.Context(), evalTime, EngineQueryFunc(suite.QueryEngine(), suite.Storage()), nil, 0,
+         suite.Context(), 0, evalTime, EngineQueryFunc(suite.QueryEngine(), suite.Storage()), nil, 0,
    )
    require.NoError(t, err)
    for _, smpl := range res {
@@ -346,7 +346,7 @@ func TestAlertingRuleExternalURLInTemplate(t *testing.T) {

    var filteredRes promql.Vector // After removing 'ALERTS_FOR_STATE' samples.
    res, err := ruleWithoutExternalURL.Eval(
-         suite.Context(), evalTime, EngineQueryFunc(suite.QueryEngine(), suite.Storage()), nil, 0,
+         suite.Context(), 0, evalTime, EngineQueryFunc(suite.QueryEngine(), suite.Storage()), nil, 0,
    )
    require.NoError(t, err)
    for _, smpl := range res {
@@ -360,7 +360,7 @@ func TestAlertingRuleExternalURLInTemplate(t *testing.T) {
    }

    res, err = ruleWithExternalURL.Eval(
-         suite.Context(), evalTime, EngineQueryFunc(suite.QueryEngine(), suite.Storage()), nil, 0,
+         suite.Context(), 0, evalTime, EngineQueryFunc(suite.QueryEngine(), suite.Storage()), nil, 0,
    )
    require.NoError(t, err)
    for _, smpl := range res {
@@ -417,7 +417,7 @@ func TestAlertingRuleEmptyLabelFromTemplate(t *testing.T) {

    var filteredRes promql.Vector // After removing 'ALERTS_FOR_STATE' samples.
    res, err := rule.Eval(
-         suite.Context(), evalTime, EngineQueryFunc(suite.QueryEngine(), suite.Storage()), nil, 0,
+         suite.Context(), 0, evalTime, EngineQueryFunc(suite.QueryEngine(), suite.Storage()), nil, 0,
    )
    require.NoError(t, err)
    for _, smpl := range res {
@@ -460,7 +460,7 @@ func TestAlertingRuleDuplicate(t *testing.T) {
        "",
        true, log.NewNopLogger(),
    )
-     _, err := rule.Eval(ctx, now, EngineQueryFunc(engine, storage), nil, 0)
+     _, err := rule.Eval(ctx, 0, now, EngineQueryFunc(engine, storage), nil, 0)
    require.Error(t, err)
    require.EqualError(t, err, "vector contains metrics with the same labelset after applying alert labels")
}
@@ -510,7 +510,7 @@ func TestAlertingRuleLimit(t *testing.T) {
    evalTime := time.Unix(0, 0)

    for _, test := range tests {
-         _, err := rule.Eval(suite.Context(), evalTime, EngineQueryFunc(suite.QueryEngine(), suite.Storage()), nil, test.limit)
+         _, err := rule.Eval(suite.Context(), 0, evalTime, EngineQueryFunc(suite.QueryEngine(), suite.Storage()), nil, test.limit)
        if err != nil {
            require.EqualError(t, err, test.err)
        } else if test.err != "" {

View file

@@ -212,8 +212,9 @@ type Rule interface {
    Name() string
    // Labels of the rule.
    Labels() labels.Labels
-     // eval evaluates the rule, including any associated recording or alerting actions.
-     Eval(context.Context, time.Time, QueryFunc, *url.URL, int) (promql.Vector, error)
+     // Eval evaluates the rule, including any associated recording or alerting actions.
+     // The duration passed is the evaluation delay.
+     Eval(context.Context, time.Duration, time.Time, QueryFunc, *url.URL, int) (promql.Vector, error)
    // String returns a human-readable string representation of the rule.
    String() string
    // Query returns the rule query expression.
@@ -244,6 +245,7 @@ type Group struct {
    name            string
    file            string
    interval        time.Duration
+     evaluationDelay *time.Duration
    limit           int
    rules           []Rule
    sourceTenants   []string
@@ -267,14 +269,15 @@ type Group struct {
}

type GroupOptions struct {
-     Name, File    string
-     Interval      time.Duration
-     Limit         int
-     Rules         []Rule
-     SourceTenants []string
-     ShouldRestore bool
-     Opts          *ManagerOptions
-     done          chan struct{}
+     Name, File      string
+     Interval        time.Duration
+     Limit           int
+     Rules           []Rule
+     SourceTenants   []string
+     ShouldRestore   bool
+     Opts            *ManagerOptions
+     EvaluationDelay *time.Duration
+     done            chan struct{}
}

// NewGroup makes a new Group with the given name, options, and rules.
@@ -299,6 +302,7 @@ func NewGroup(o GroupOptions) *Group {
        name:            o.Name,
        file:            o.File,
        interval:        o.Interval,
+         evaluationDelay: o.EvaluationDelay,
        limit:           o.Limit,
        rules:           o.Rules,
        shouldRestore:   o.ShouldRestore,
@@ -583,6 +587,7 @@ func (g *Group) CopyState(from *Group) {
// Eval runs a single evaluation cycle in which all rules are evaluated sequentially.
func (g *Group) Eval(ctx context.Context, ts time.Time) {
    var samplesTotal float64
+     evaluationDelay := g.EvaluationDelay()
    for i, rule := range g.rules {
        select {
        case <-g.done:
@@ -604,7 +609,7 @@ func (g *Group) Eval(ctx context.Context, ts time.Time) {

            g.metrics.EvalTotal.WithLabelValues(GroupKey(g.File(), g.Name())).Inc()

-             vector, err := rule.Eval(ctx, ts, g.opts.QueryFunc, g.opts.ExternalURL, g.Limit())
+             vector, err := rule.Eval(ctx, evaluationDelay, ts, g.opts.QueryFunc, g.opts.ExternalURL, g.Limit())
            if err != nil {
                rule.SetHealth(HealthBad)
                rule.SetLastError(err)
@@ -673,7 +678,7 @@ func (g *Group) Eval(ctx context.Context, ts time.Time) {
        for metric, lset := range g.seriesInPreviousEval[i] {
            if _, ok := seriesReturned[metric]; !ok {
                // Series no longer exposed, mark it stale.
-                 _, err = app.Append(0, lset, timestamp.FromTime(ts), math.Float64frombits(value.StaleNaN))
+                 _, err = app.Append(0, lset, timestamp.FromTime(ts.Add(-evaluationDelay)), math.Float64frombits(value.StaleNaN))
                switch errors.Cause(err) {
                case nil:
                case storage.ErrOutOfOrderSample, storage.ErrDuplicateSampleForTimestamp:
@@ -692,14 +697,25 @@ func (g *Group) Eval(ctx context.Context, ts time.Time) {
    g.cleanupStaleSeries(ctx, ts)
}

+ func (g *Group) EvaluationDelay() time.Duration {
+     if g.evaluationDelay != nil {
+         return *g.evaluationDelay
+     }
+     if g.opts.DefaultEvaluationDelay != nil {
+         return g.opts.DefaultEvaluationDelay()
+     }
+     return time.Duration(0)
+ }
+
func (g *Group) cleanupStaleSeries(ctx context.Context, ts time.Time) {
    if len(g.staleSeries) == 0 {
        return
    }
    app := g.opts.Appendable.Appender(ctx)
+     evaluationDelay := g.EvaluationDelay()
    for _, s := range g.staleSeries {
        // Rule that produced series no longer configured, mark it stale.
-         _, err := app.Append(0, s, timestamp.FromTime(ts), math.Float64frombits(value.StaleNaN))
+         _, err := app.Append(0, s, timestamp.FromTime(ts.Add(-evaluationDelay)), math.Float64frombits(value.StaleNaN))
        switch errors.Cause(err) {
        case nil:
        case storage.ErrOutOfOrderSample, storage.ErrDuplicateSampleForTimestamp:
@@ -935,6 +951,7 @@ type ManagerOptions struct {
    ForGracePeriod         time.Duration
    ResendDelay            time.Duration
    GroupLoader            GroupLoader
+     DefaultEvaluationDelay func() time.Duration

    Metrics *Metrics
}
@@ -1131,15 +1148,16 @@ func (m *Manager) LoadGroups(
        }

        groups[GroupKey(fn, rg.Name)] = NewGroup(GroupOptions{
-             Name:          rg.Name,
-             File:          fn,
-             Interval:      itv,
-             Limit:         rg.Limit,
-             Rules:         rules,
-             SourceTenants: rg.SourceTenants,
-             ShouldRestore: shouldRestore,
-             Opts:          m.opts,
-             done:          m.done,
+             Name:            rg.Name,
+             File:            fn,
+             Interval:        itv,
+             Limit:           rg.Limit,
+             Rules:           rules,
+             SourceTenants:   rg.SourceTenants,
+             ShouldRestore:   shouldRestore,
+             Opts:            m.opts,
+             EvaluationDelay: (*time.Duration)(rg.EvaluationDelay),
+             done:            m.done,
        })
    }
}

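For embedders of this rules package, a hypothetical wiring sketch (not part of this diff) of the new DefaultEvaluationDelay hook; the helper name newRuleManager and the 30s value are invented for illustration. A group's own evaluation_delay, when set in the rule file, takes precedence over this callback, and leaving both unset keeps the delay at zero, i.e. the existing behaviour.

package rulesetup

import (
    "context"
    "time"

    "github.com/go-kit/log"

    "github.com/prometheus/prometheus/promql"
    "github.com/prometheus/prometheus/rules"
    "github.com/prometheus/prometheus/storage"
)

func newRuleManager(st storage.Storage, engine *promql.Engine, logger log.Logger) *rules.Manager {
    return rules.NewManager(&rules.ManagerOptions{
        Appendable: st,
        Queryable:  st,
        QueryFunc:  rules.EngineQueryFunc(engine, st),
        Context:    context.Background(),
        Logger:     logger,
        // Applied to every group whose rule file does not set evaluation_delay;
        // the callback form lets the value be reloaded at runtime.
        DefaultEvaluationDelay: func() time.Duration { return 30 * time.Second },
    })
}
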
View file

@@ -16,9 +16,11 @@ package rules
import (
    "context"
    "fmt"
+     "io/fs"
    "io/ioutil"
    "math"
    "os"
+     "path"
    "sort"
    "testing"
    "time"
@@ -158,7 +160,7 @@ func TestAlertingRule(t *testing.T) {
        evalTime := baseTime.Add(test.time)

-         res, err := rule.Eval(suite.Context(), evalTime, EngineQueryFunc(suite.QueryEngine(), suite.Storage()), nil, 0)
+         res, err := rule.Eval(suite.Context(), 0, evalTime, EngineQueryFunc(suite.QueryEngine(), suite.Storage()), nil, 0)
        require.NoError(t, err)

        var filteredRes promql.Vector // After removing 'ALERTS_FOR_STATE' samples.
@@ -188,156 +190,160 @@ func TestAlertingRule(t *testing.T) {
}

func TestForStateAddSamples(t *testing.T) {
+ for _, evalDelay := range []time.Duration{0, time.Minute} {
+     t.Run(fmt.Sprintf("evalDelay %s", evalDelay.String()), func(t *testing.T) {
        suite, err := promql.NewTest(t, `
    load 5m
        http_requests{job="app-server", instance="0", group="canary", severity="overwrite-me"}  75 85 95 105 105 95 85
        http_requests{job="app-server", instance="1", group="canary", severity="overwrite-me"}  80 90 100 110 120 130 140
`)
        require.NoError(t, err)
        defer suite.Close()

        err = suite.Run()
        require.NoError(t, err)

        expr, err := parser.ParseExpr(`http_requests{group="canary", job="app-server"} < 100`)
        require.NoError(t, err)

        rule := NewAlertingRule(
            "HTTPRequestRateLow",
            expr,
            time.Minute,
            labels.FromStrings("severity", "{{\"c\"}}ritical"),
            nil, nil, "", true, nil,
        )
        result := promql.Vector{
            {
                Metric: labels.FromStrings(
                    "__name__", "ALERTS_FOR_STATE",
                    "alertname", "HTTPRequestRateLow",
                    "group", "canary",
                    "instance", "0",
                    "job", "app-server",
                    "severity", "critical",
                ),
                Point: promql.Point{V: 1},
            },
            {
                Metric: labels.FromStrings(
                    "__name__", "ALERTS_FOR_STATE",
                    "alertname", "HTTPRequestRateLow",
                    "group", "canary",
                    "instance", "1",
                    "job", "app-server",
                    "severity", "critical",
                ),
                Point: promql.Point{V: 1},
            },
            {
                Metric: labels.FromStrings(
                    "__name__", "ALERTS_FOR_STATE",
                    "alertname", "HTTPRequestRateLow",
                    "group", "canary",
                    "instance", "0",
                    "job", "app-server",
                    "severity", "critical",
                ),
                Point: promql.Point{V: 1},
            },
            {
                Metric: labels.FromStrings(
                    "__name__", "ALERTS_FOR_STATE",
                    "alertname", "HTTPRequestRateLow",
                    "group", "canary",
                    "instance", "1",
                    "job", "app-server",
                    "severity", "critical",
                ),
                Point: promql.Point{V: 1},
            },
        }

        baseTime := time.Unix(0, 0)

        tests := []struct {
            time            time.Duration
            result          promql.Vector
            persistThisTime bool // If true, it means this 'time' is persisted for 'for'.
        }{
            {
                time:            0,
                result:          append(promql.Vector{}, result[:2]...),
                persistThisTime: true,
            },
            {
                time:   5 * time.Minute,
                result: append(promql.Vector{}, result[2:]...),
            },
            {
                time:   10 * time.Minute,
                result: append(promql.Vector{}, result[2:3]...),
            },
            {
                time:   15 * time.Minute,
                result: nil,
            },
            {
                time:   20 * time.Minute,
                result: nil,
            },
            {
                time:            25 * time.Minute,
                result:          append(promql.Vector{}, result[:1]...),
                persistThisTime: true,
            },
            {
                time:   30 * time.Minute,
                result: append(promql.Vector{}, result[2:3]...),
            },
        }

        var forState float64
        for i, test := range tests {
            t.Logf("case %d", i)
-             evalTime := baseTime.Add(test.time)
+             evalTime := baseTime.Add(test.time).Add(evalDelay)

            if test.persistThisTime {
                forState = float64(evalTime.Unix())
            }
            if test.result == nil {
                forState = float64(value.StaleNaN)
            }

-             res, err := rule.Eval(suite.Context(), evalTime, EngineQueryFunc(suite.QueryEngine(), suite.Storage()), nil, 0)
+             res, err := rule.Eval(suite.Context(), evalDelay, evalTime, EngineQueryFunc(suite.QueryEngine(), suite.Storage()), nil, 0)
            require.NoError(t, err)

            var filteredRes promql.Vector // After removing 'ALERTS' samples.
            for _, smpl := range res {
                smplName := smpl.Metric.Get("__name__")
                if smplName == "ALERTS_FOR_STATE" {
                    filteredRes = append(filteredRes, smpl)
                } else {
                    // If not 'ALERTS_FOR_STATE', it has to be 'ALERTS'.
                    require.Equal(t, smplName, "ALERTS")
                }
            }
            for i := range test.result {
-                 test.result[i].T = timestamp.FromTime(evalTime)
+                 test.result[i].T = timestamp.FromTime(evalTime.Add(-evalDelay))
                // Updating the expected 'for' state.
                if test.result[i].V >= 0 {
                    test.result[i].V = forState
                }
            }

            require.Equal(t, len(test.result), len(filteredRes), "%d. Number of samples in expected and actual output don't match (%d vs. %d)", i, len(test.result), len(res))

            sort.Slice(filteredRes, func(i, j int) bool {
                return labels.Compare(filteredRes[i].Metric, filteredRes[j].Metric) < 0
            })
            require.Equal(t, test.result, filteredRes)

            for _, aa := range rule.ActiveAlerts() {
                require.Zero(t, aa.Labels.Get(model.MetricNameLabel), "%s label set on active alert: %s", model.MetricNameLabel, aa.Labels)
            }
        }
+     })
+ }
}
@@ -441,7 +447,7 @@ func TestForStateRestore(t *testing.T) {
        },
    }

-     testFunc := func(tst testInput) {
+     testFunc := func(tst testInput, evalDelay time.Duration) {
        newRule := NewAlertingRule(
            "HTTPRequestRateLow",
            expr,
@@ -450,17 +456,18 @@ func TestForStateRestore(t *testing.T) {
            nil, nil, "", false, nil,
        )
        newGroup := NewGroup(GroupOptions{
-             Name:          "default",
-             Interval:      time.Second,
-             Rules:         []Rule{newRule},
-             ShouldRestore: true,
-             Opts:          opts,
+             Name:            "default",
+             Interval:        time.Second,
+             Rules:           []Rule{newRule},
+             ShouldRestore:   true,
+             Opts:            opts,
+             EvaluationDelay: &evalDelay,
        })

        newGroups := make(map[string]*Group)
        newGroups["default;"] = newGroup

-         restoreTime := baseTime.Add(tst.restoreDuration)
+         restoreTime := baseTime.Add(tst.restoreDuration).Add(evalDelay)
        // First eval before restoration.
        newGroup.Eval(suite.Context(), restoreTime)
        // Restore happens here.
@@ -495,14 +502,16 @@ func TestForStateRestore(t *testing.T) {

                // Difference in time should be within 1e6 ns, i.e. 1ms
                // (due to conversion between ns & ms, float64 & int64).
-                 activeAtDiff := float64(e.ActiveAt.Unix() + int64(tst.downDuration/time.Second) - got[i].ActiveAt.Unix())
+                 activeAtDiff := evalDelay.Seconds() + float64(e.ActiveAt.Unix()+int64(tst.downDuration/time.Second)-got[i].ActiveAt.Unix())
                require.Equal(t, 0.0, math.Abs(activeAtDiff), "'for' state restored time is wrong")
            }
        }
    }

-     for _, tst := range tests {
-         testFunc(tst)
+     for _, evalDelay := range []time.Duration{0, time.Minute} {
+         for _, tst := range tests {
+             testFunc(tst, evalDelay)
+         }
    }

    // Testing the grace period.
@@ -510,82 +519,88 @@
        evalTime := baseTime.Add(duration)
        group.Eval(suite.Context(), evalTime)
    }
-     testFunc(testInput{
-         restoreDuration: 25 * time.Minute,
-         alerts:          []*Alert{},
-         gracePeriod:     true,
-         num:             2,
-     })
+
+     for _, evalDelay := range []time.Duration{0, time.Minute} {
+         testFunc(testInput{
+             restoreDuration: 25 * time.Minute,
+             alerts:          []*Alert{},
+             gracePeriod:     true,
+             num:             2,
+         }, evalDelay)
+     }
}

func TestStaleness(t *testing.T) {
+ for _, evalDelay := range []time.Duration{0, time.Minute} {
    st := teststorage.New(t)
    defer st.Close()
    engineOpts := promql.EngineOpts{
        Logger:     nil,
        Reg:        nil,
        MaxSamples: 10,
        Timeout:    10 * time.Second,
    }
    engine := promql.NewEngine(engineOpts)
    opts := &ManagerOptions{
        QueryFunc:  EngineQueryFunc(engine, st),
        Appendable: st,
        Queryable:  st,
        Context:    context.Background(),
        Logger:     log.NewNopLogger(),
    }

    expr, err := parser.ParseExpr("a + 1")
    require.NoError(t, err)
    rule := NewRecordingRule("a_plus_one", expr, labels.Labels{})
    group := NewGroup(GroupOptions{
        Name:          "default",
        Interval:      time.Second,
        Rules:         []Rule{rule},
        ShouldRestore: true,
        Opts:          opts,
+         EvaluationDelay: &evalDelay,
    })

    // A time series that has two samples and then goes stale.
    app := st.Appender(context.Background())
    app.Append(0, labels.FromStrings(model.MetricNameLabel, "a"), 0, 1)
    app.Append(0, labels.FromStrings(model.MetricNameLabel, "a"), 1000, 2)
    app.Append(0, labels.FromStrings(model.MetricNameLabel, "a"), 2000, math.Float64frombits(value.StaleNaN))
    err = app.Commit()
    require.NoError(t, err)

    ctx := context.Background()

    // Execute 3 times, 1 second apart.
-     group.Eval(ctx, time.Unix(0, 0))
-     group.Eval(ctx, time.Unix(1, 0))
-     group.Eval(ctx, time.Unix(2, 0))
+     group.Eval(ctx, time.Unix(0, 0).Add(evalDelay))
+     group.Eval(ctx, time.Unix(1, 0).Add(evalDelay))
+     group.Eval(ctx, time.Unix(2, 0).Add(evalDelay))

    querier, err := st.Querier(context.Background(), 0, 2000)
    require.NoError(t, err)
    defer querier.Close()

    matcher, err := labels.NewMatcher(labels.MatchEqual, model.MetricNameLabel, "a_plus_one")
    require.NoError(t, err)

    set := querier.Select(false, nil, matcher)
    samples, err := readSeriesSet(set)
    require.NoError(t, err)

    metric := labels.FromStrings(model.MetricNameLabel, "a_plus_one").String()
    metricSample, ok := samples[metric]

    require.True(t, ok, "Series %s not returned.", metric)
    require.True(t, value.IsStaleNaN(metricSample[2].V), "Appended second sample not as expected. Wanted: stale NaN Got: %x", math.Float64bits(metricSample[2].V))
    metricSample[2].V = 42 // require.Equal cannot handle NaN.

    want := map[string][]promql.Point{
        metric: {{T: 0, V: 2}, {T: 1000, V: 3}, {T: 2000, V: 42}},
    }

    require.Equal(t, want, samples)
+ }
}

// Convert a SeriesSet into a form usable with require.Equal.
@@ -1512,3 +1527,43 @@ func TestGroup_Equals(t *testing.T) {
        })
    }
}
+
+ func TestGroup_EvaluationDelay(t *testing.T) {
+     config := `
+ groups:
+   - name: group1
+     evaluation_delay: 2m
+   - name: group2
+     evaluation_delay: 0s
+   - name: group3
+ `
+
+     dir := t.TempDir()
+     fname := path.Join(dir, "rules.yaml")
+     err := ioutil.WriteFile(fname, []byte(config), fs.ModePerm)
+     require.NoError(t, err)
+
+     m := NewManager(&ManagerOptions{
+         Logger: log.NewNopLogger(),
+         DefaultEvaluationDelay: func() time.Duration {
+             return time.Minute
+         },
+     })
+     m.start()
+     err = m.Update(time.Second, []string{fname}, nil, "")
+     require.NoError(t, err)
+
+     rgs := m.RuleGroups()
+     sort.Slice(rgs, func(i, j int) bool {
+         return rgs[i].Name() < rgs[j].Name()
+     })
+
+     // From config.
+     require.Equal(t, 2*time.Minute, rgs[0].EvaluationDelay())
+     // Setting 0 in config is detected.
+     require.Equal(t, time.Duration(0), rgs[1].EvaluationDelay())
+     // Default when nothing is set.
+     require.Equal(t, time.Minute, rgs[2].EvaluationDelay())
+
+     m.Stop()
+ }

View file

@@ -73,8 +73,8 @@ func (rule *RecordingRule) Labels() labels.Labels {
}

// Eval evaluates the rule and then overrides the metric names and labels accordingly.
- func (rule *RecordingRule) Eval(ctx context.Context, ts time.Time, query QueryFunc, _ *url.URL, limit int) (promql.Vector, error) {
-     vector, err := query(ctx, rule.vector.String(), ts)
+ func (rule *RecordingRule) Eval(ctx context.Context, evalDelay time.Duration, ts time.Time, query QueryFunc, _ *url.URL, limit int) (promql.Vector, error) {
+     vector, err := query(ctx, rule.vector.String(), ts.Add(-evalDelay))
    if err != nil {
        return nil, err
    }

View file

@@ -74,7 +74,7 @@ func TestRuleEval(t *testing.T) {
    for _, test := range suite {
        rule := NewRecordingRule(test.name, test.expr, test.labels)
-         result, err := rule.Eval(ctx, now, EngineQueryFunc(engine, storage), nil, 0)
+         result, err := rule.Eval(ctx, 0, now, EngineQueryFunc(engine, storage), nil, 0)
        if test.err == "" {
            require.NoError(t, err)
        } else {
@@ -119,7 +119,7 @@ func TestRuleEvalDuplicate(t *testing.T) {

    expr, _ := parser.ParseExpr(`vector(0) or label_replace(vector(0),"test","x","","")`)
    rule := NewRecordingRule("foo", expr, labels.FromStrings("test", "test"))
-     _, err := rule.Eval(ctx, now, EngineQueryFunc(engine, storage), nil, 0)
+     _, err := rule.Eval(ctx, 0, now, EngineQueryFunc(engine, storage), nil, 0)
    require.Error(t, err)
    require.EqualError(t, err, "vector contains metrics with the same labelset after applying rule labels")
}
@@ -164,7 +164,7 @@ func TestRecordingRuleLimit(t *testing.T) {
    evalTime := time.Unix(0, 0)

    for _, test := range tests {
-         _, err := rule.Eval(suite.Context(), evalTime, EngineQueryFunc(suite.QueryEngine(), suite.Storage()), nil, test.limit)
+         _, err := rule.Eval(suite.Context(), 0, evalTime, EngineQueryFunc(suite.QueryEngine(), suite.Storage()), nil, test.limit)
        if err != nil {
            require.EqualError(t, err, test.err)
        } else if test.err != "" {

View file

@@ -252,7 +252,7 @@ func TestCheckpointNoTmpFolderAfterError(t *testing.T) {
    // Walk the wal dir to make sure there are no tmp folder left behind after the error.
    err = filepath.Walk(w.Dir(), func(path string, info os.FileInfo, err error) error {
        if err != nil {
-             return errors.Wrapf(err, "access err %q: %v\n", path, err)
+             return errors.Wrapf(err, "access err %q: %v", path, err)
        }
        if info.IsDir() && strings.HasSuffix(info.Name(), ".tmp") {
            return fmt.Errorf("wal dir contains temporary folder:%s", info.Name())

View file

@@ -23,7 +23,6 @@ import (

// Statfs returns the file system type (Unix only)
func Statfs(path string) string {
-
    // Types of file systems that may be returned by `statfs`
    fsTypes := map[int32]string{
        0xadf5: "ADFS_SUPER_MAGIC",

View file

@@ -23,7 +23,6 @@ import (

// Statfs returns the file system type (Unix only)
func Statfs(path string) string {
-
    // Types of file systems that may be returned by `statfs`
    fsTypes := map[uint32]string{
        0xadf5: "ADFS_SUPER_MAGIC",

View file

@@ -150,7 +150,6 @@ type TSDBAdminStats interface {
    CleanTombstones() error
    Delete(mint, maxt int64, ms ...*labels.Matcher) error
    Snapshot(dir string, withHead bool) error
-
    Stats(statsByLabelName string) (*tsdb.Stats, error)
    WALReplayStatus() (tsdb.WALReplayStatus, error)
}