From 23ce9ad9f0ffce41db3d247e608bc89cce52a3ab Mon Sep 17 00:00:00 2001 From: Ganesh Vernekar <15064823+codesome@users.noreply.github.com> Date: Mon, 14 Mar 2022 18:50:07 +0530 Subject: [PATCH] Introduce evaluation delay for rule groups (#155) * Allow having evaluation delay for rule groups Signed-off-by: Ganesh Vernekar * Fix lint Signed-off-by: Ganesh Vernekar * Move the option to ManagerOptions Signed-off-by: Ganesh Vernekar * Include evaluation_delay in the group config Signed-off-by: Ganesh Vernekar * Fix comments Signed-off-by: Ganesh Vernekar --- cmd/promtool/rules.go | 3 +- discovery/openstack/hypervisor.go | 3 +- discovery/openstack/instance.go | 3 +- discovery/targetgroup/targetgroup_test.go | 28 +- .../examples/custom-sd/adapter/adapter.go | 1 - model/rulefmt/rulefmt.go | 11 +- promql/parser/lex_test.go | 2 - rules/alerting.go | 8 +- rules/alerting_test.go | 16 +- rules/manager.go | 62 ++- rules/manager_test.go | 501 ++++++++++-------- rules/recording.go | 4 +- rules/recording_test.go | 6 +- tsdb/wal/checkpoint_test.go | 2 +- util/runtime/statfs_linux_386.go | 1 - util/runtime/statfs_uint32.go | 1 - web/api/v1/api.go | 1 - 17 files changed, 361 insertions(+), 292 deletions(-) diff --git a/cmd/promtool/rules.go b/cmd/promtool/rules.go index 7afca02f1..5d0633074 100644 --- a/cmd/promtool/rules.go +++ b/cmd/promtool/rules.go @@ -95,7 +95,8 @@ func (importer *ruleImporter) importAll(ctx context.Context) (errs []error) { // importRule queries a prometheus API to evaluate rules at times in the past. func (importer *ruleImporter) importRule(ctx context.Context, ruleExpr, ruleName string, ruleLabels labels.Labels, start, end time.Time, - maxBlockDuration int64, grp *rules.Group) (err error) { + maxBlockDuration int64, grp *rules.Group, +) (err error) { blockDuration := getCompatibleBlockDuration(maxBlockDuration) startInMs := start.Unix() * int64(time.Second/time.Millisecond) endInMs := end.Unix() * int64(time.Second/time.Millisecond) diff --git a/discovery/openstack/hypervisor.go b/discovery/openstack/hypervisor.go index 877b3eb9b..94d47d360 100644 --- a/discovery/openstack/hypervisor.go +++ b/discovery/openstack/hypervisor.go @@ -50,7 +50,8 @@ type HypervisorDiscovery struct { // newHypervisorDiscovery returns a new hypervisor discovery. func newHypervisorDiscovery(provider *gophercloud.ProviderClient, opts *gophercloud.AuthOptions, - port int, region string, availability gophercloud.Availability, l log.Logger) *HypervisorDiscovery { + port int, region string, availability gophercloud.Availability, l log.Logger, +) *HypervisorDiscovery { return &HypervisorDiscovery{ provider: provider, authOpts: opts, region: region, port: port, availability: availability, logger: l, diff --git a/discovery/openstack/instance.go b/discovery/openstack/instance.go index fa4039bea..b4e67825a 100644 --- a/discovery/openstack/instance.go +++ b/discovery/openstack/instance.go @@ -59,7 +59,8 @@ type InstanceDiscovery struct { // NewInstanceDiscovery returns a new instance discovery. 
func newInstanceDiscovery(provider *gophercloud.ProviderClient, opts *gophercloud.AuthOptions, - port int, region string, allTenants bool, availability gophercloud.Availability, l log.Logger) *InstanceDiscovery { + port int, region string, allTenants bool, availability gophercloud.Availability, l log.Logger, +) *InstanceDiscovery { if l == nil { l = log.NewNopLogger() } diff --git a/discovery/targetgroup/targetgroup_test.go b/discovery/targetgroup/targetgroup_test.go index fe9587eb8..d2d2c928c 100644 --- a/discovery/targetgroup/targetgroup_test.go +++ b/discovery/targetgroup/targetgroup_test.go @@ -143,21 +143,19 @@ func TestTargetGroupYamlUnmarshal(t *testing.T) { func TestString(t *testing.T) { // String() should return only the source, regardless of other attributes. - group1 := - Group{ - Targets: []model.LabelSet{ - {"__address__": "localhost:9090"}, - {"__address__": "localhost:9091"}, - }, - Source: "", - Labels: model.LabelSet{"foo": "bar", "bar": "baz"}, - } - group2 := - Group{ - Targets: []model.LabelSet{}, - Source: "", - Labels: model.LabelSet{}, - } + group1 := Group{ + Targets: []model.LabelSet{ + {"__address__": "localhost:9090"}, + {"__address__": "localhost:9091"}, + }, + Source: "", + Labels: model.LabelSet{"foo": "bar", "bar": "baz"}, + } + group2 := Group{ + Targets: []model.LabelSet{}, + Source: "", + Labels: model.LabelSet{}, + } require.Equal(t, "", group1.String()) require.Equal(t, "", group2.String()) require.Equal(t, group1.String(), group2.String()) diff --git a/documentation/examples/custom-sd/adapter/adapter.go b/documentation/examples/custom-sd/adapter/adapter.go index 564a4e83b..abaad7dc7 100644 --- a/documentation/examples/custom-sd/adapter/adapter.go +++ b/documentation/examples/custom-sd/adapter/adapter.go @@ -84,7 +84,6 @@ func generateTargetGroups(allTargetGroups map[string][]*targetgroup.Group) map[s } sdGroup := customSD{ - Targets: newTargets, Labels: newLabels, } diff --git a/model/rulefmt/rulefmt.go b/model/rulefmt/rulefmt.go index a550af724..898a2e39d 100644 --- a/model/rulefmt/rulefmt.go +++ b/model/rulefmt/rulefmt.go @@ -116,11 +116,12 @@ func (g *RuleGroups) Validate(node ruleGroups) (errs []error) { // RuleGroup is a list of sequentially evaluated recording and alerting rules. type RuleGroup struct { - Name string `yaml:"name"` - Interval model.Duration `yaml:"interval,omitempty"` - Limit int `yaml:"limit,omitempty"` - Rules []RuleNode `yaml:"rules"` - SourceTenants []string `yaml:"source_tenants,omitempty"` + Name string `yaml:"name"` + Interval model.Duration `yaml:"interval,omitempty"` + EvaluationDelay *model.Duration `yaml:"evaluation_delay,omitempty"` + Limit int `yaml:"limit,omitempty"` + Rules []RuleNode `yaml:"rules"` + SourceTenants []string `yaml:"source_tenants,omitempty"` } // Rule describes an alerting or recording rule. diff --git a/promql/parser/lex_test.go b/promql/parser/lex_test.go index 8e22f41d9..25c6abfb6 100644 --- a/promql/parser/lex_test.go +++ b/promql/parser/lex_test.go @@ -611,7 +611,6 @@ var tests = []struct { { // Nested Subquery. 
input: `min_over_time(rate(foo{bar="baz"}[2s])[5m:])[4m:3s]`, expected: []Item{ - {IDENTIFIER, 0, `min_over_time`}, {LEFT_PAREN, 13, `(`}, {IDENTIFIER, 14, `rate`}, @@ -660,7 +659,6 @@ var tests = []struct { { input: `min_over_time(rate(foo{bar="baz"}[2s])[5m:] offset 6m)[4m:3s]`, expected: []Item{ - {IDENTIFIER, 0, `min_over_time`}, {LEFT_PAREN, 13, `(`}, {IDENTIFIER, 14, `rate`}, diff --git a/rules/alerting.go b/rules/alerting.go index 929f7586d..0a3c542e2 100644 --- a/rules/alerting.go +++ b/rules/alerting.go @@ -304,8 +304,8 @@ const resolvedRetention = 15 * time.Minute // Eval evaluates the rule expression and then creates pending alerts and fires // or removes previously pending alerts accordingly. -func (r *AlertingRule) Eval(ctx context.Context, ts time.Time, query QueryFunc, externalURL *url.URL, limit int) (promql.Vector, error) { - res, err := query(ctx, r.vector.String(), ts) +func (r *AlertingRule) Eval(ctx context.Context, evalDelay time.Duration, ts time.Time, query QueryFunc, externalURL *url.URL, limit int) (promql.Vector, error) { + res, err := query(ctx, r.vector.String(), ts.Add(-evalDelay)) if err != nil { return nil, err } @@ -419,8 +419,8 @@ func (r *AlertingRule) Eval(ctx context.Context, ts time.Time, query QueryFunc, } if r.restored { - vec = append(vec, r.sample(a, ts)) - vec = append(vec, r.forStateSample(a, ts, float64(a.ActiveAt.Unix()))) + vec = append(vec, r.sample(a, ts.Add(-evalDelay))) + vec = append(vec, r.forStateSample(a, ts.Add(-evalDelay), float64(a.ActiveAt.Unix()))) } } diff --git a/rules/alerting_test.go b/rules/alerting_test.go index 31139298d..922784ea1 100644 --- a/rules/alerting_test.go +++ b/rules/alerting_test.go @@ -170,7 +170,7 @@ func TestAlertingRuleLabelsUpdate(t *testing.T) { t.Logf("case %d", i) evalTime := baseTime.Add(time.Duration(i) * time.Minute) result[0].Point.T = timestamp.FromTime(evalTime) - res, err := rule.Eval(suite.Context(), evalTime, EngineQueryFunc(suite.QueryEngine(), suite.Storage()), nil, 0) + res, err := rule.Eval(suite.Context(), 0, evalTime, EngineQueryFunc(suite.QueryEngine(), suite.Storage()), nil, 0) require.NoError(t, err) var filteredRes promql.Vector // After removing 'ALERTS_FOR_STATE' samples. @@ -252,7 +252,7 @@ func TestAlertingRuleExternalLabelsInTemplate(t *testing.T) { var filteredRes promql.Vector // After removing 'ALERTS_FOR_STATE' samples. res, err := ruleWithoutExternalLabels.Eval( - suite.Context(), evalTime, EngineQueryFunc(suite.QueryEngine(), suite.Storage()), nil, 0, + suite.Context(), 0, evalTime, EngineQueryFunc(suite.QueryEngine(), suite.Storage()), nil, 0, ) require.NoError(t, err) for _, smpl := range res { @@ -266,7 +266,7 @@ func TestAlertingRuleExternalLabelsInTemplate(t *testing.T) { } res, err = ruleWithExternalLabels.Eval( - suite.Context(), evalTime, EngineQueryFunc(suite.QueryEngine(), suite.Storage()), nil, 0, + suite.Context(), 0, evalTime, EngineQueryFunc(suite.QueryEngine(), suite.Storage()), nil, 0, ) require.NoError(t, err) for _, smpl := range res { @@ -346,7 +346,7 @@ func TestAlertingRuleExternalURLInTemplate(t *testing.T) { var filteredRes promql.Vector // After removing 'ALERTS_FOR_STATE' samples. 
res, err := ruleWithoutExternalURL.Eval( - suite.Context(), evalTime, EngineQueryFunc(suite.QueryEngine(), suite.Storage()), nil, 0, + suite.Context(), 0, evalTime, EngineQueryFunc(suite.QueryEngine(), suite.Storage()), nil, 0, ) require.NoError(t, err) for _, smpl := range res { @@ -360,7 +360,7 @@ func TestAlertingRuleExternalURLInTemplate(t *testing.T) { } res, err = ruleWithExternalURL.Eval( - suite.Context(), evalTime, EngineQueryFunc(suite.QueryEngine(), suite.Storage()), nil, 0, + suite.Context(), 0, evalTime, EngineQueryFunc(suite.QueryEngine(), suite.Storage()), nil, 0, ) require.NoError(t, err) for _, smpl := range res { @@ -417,7 +417,7 @@ func TestAlertingRuleEmptyLabelFromTemplate(t *testing.T) { var filteredRes promql.Vector // After removing 'ALERTS_FOR_STATE' samples. res, err := rule.Eval( - suite.Context(), evalTime, EngineQueryFunc(suite.QueryEngine(), suite.Storage()), nil, 0, + suite.Context(), 0, evalTime, EngineQueryFunc(suite.QueryEngine(), suite.Storage()), nil, 0, ) require.NoError(t, err) for _, smpl := range res { @@ -460,7 +460,7 @@ func TestAlertingRuleDuplicate(t *testing.T) { "", true, log.NewNopLogger(), ) - _, err := rule.Eval(ctx, now, EngineQueryFunc(engine, storage), nil, 0) + _, err := rule.Eval(ctx, 0, now, EngineQueryFunc(engine, storage), nil, 0) require.Error(t, err) require.EqualError(t, err, "vector contains metrics with the same labelset after applying alert labels") } @@ -510,7 +510,7 @@ func TestAlertingRuleLimit(t *testing.T) { evalTime := time.Unix(0, 0) for _, test := range tests { - _, err := rule.Eval(suite.Context(), evalTime, EngineQueryFunc(suite.QueryEngine(), suite.Storage()), nil, test.limit) + _, err := rule.Eval(suite.Context(), 0, evalTime, EngineQueryFunc(suite.QueryEngine(), suite.Storage()), nil, test.limit) if err != nil { require.EqualError(t, err, test.err) } else if test.err != "" { diff --git a/rules/manager.go b/rules/manager.go index 15d26b14e..9c02275fb 100644 --- a/rules/manager.go +++ b/rules/manager.go @@ -212,8 +212,9 @@ type Rule interface { Name() string // Labels of the rule. Labels() labels.Labels - // eval evaluates the rule, including any associated recording or alerting actions. - Eval(context.Context, time.Time, QueryFunc, *url.URL, int) (promql.Vector, error) + // Eval evaluates the rule, including any associated recording or alerting actions. + // The duration passed is the evaluation delay. + Eval(context.Context, time.Duration, time.Time, QueryFunc, *url.URL, int) (promql.Vector, error) // String returns a human-readable string representation of the rule. String() string // Query returns the rule query expression. @@ -244,6 +245,7 @@ type Group struct { name string file string interval time.Duration + evaluationDelay *time.Duration limit int rules []Rule sourceTenants []string @@ -267,14 +269,15 @@ type Group struct { } type GroupOptions struct { - Name, File string - Interval time.Duration - Limit int - Rules []Rule - SourceTenants []string - ShouldRestore bool - Opts *ManagerOptions - done chan struct{} + Name, File string + Interval time.Duration + Limit int + Rules []Rule + SourceTenants []string + ShouldRestore bool + Opts *ManagerOptions + EvaluationDelay *time.Duration + done chan struct{} } // NewGroup makes a new Group with the given name, options, and rules. 
@@ -299,6 +302,7 @@ func NewGroup(o GroupOptions) *Group { name: o.Name, file: o.File, interval: o.Interval, + evaluationDelay: o.EvaluationDelay, limit: o.Limit, rules: o.Rules, shouldRestore: o.ShouldRestore, @@ -583,6 +587,7 @@ func (g *Group) CopyState(from *Group) { // Eval runs a single evaluation cycle in which all rules are evaluated sequentially. func (g *Group) Eval(ctx context.Context, ts time.Time) { var samplesTotal float64 + evaluationDelay := g.EvaluationDelay() for i, rule := range g.rules { select { case <-g.done: @@ -604,7 +609,7 @@ func (g *Group) Eval(ctx context.Context, ts time.Time) { g.metrics.EvalTotal.WithLabelValues(GroupKey(g.File(), g.Name())).Inc() - vector, err := rule.Eval(ctx, ts, g.opts.QueryFunc, g.opts.ExternalURL, g.Limit()) + vector, err := rule.Eval(ctx, evaluationDelay, ts, g.opts.QueryFunc, g.opts.ExternalURL, g.Limit()) if err != nil { rule.SetHealth(HealthBad) rule.SetLastError(err) @@ -673,7 +678,7 @@ func (g *Group) Eval(ctx context.Context, ts time.Time) { for metric, lset := range g.seriesInPreviousEval[i] { if _, ok := seriesReturned[metric]; !ok { // Series no longer exposed, mark it stale. - _, err = app.Append(0, lset, timestamp.FromTime(ts), math.Float64frombits(value.StaleNaN)) + _, err = app.Append(0, lset, timestamp.FromTime(ts.Add(-evaluationDelay)), math.Float64frombits(value.StaleNaN)) switch errors.Cause(err) { case nil: case storage.ErrOutOfOrderSample, storage.ErrDuplicateSampleForTimestamp: @@ -692,14 +697,25 @@ func (g *Group) Eval(ctx context.Context, ts time.Time) { g.cleanupStaleSeries(ctx, ts) } +func (g *Group) EvaluationDelay() time.Duration { + if g.evaluationDelay != nil { + return *g.evaluationDelay + } + if g.opts.DefaultEvaluationDelay != nil { + return g.opts.DefaultEvaluationDelay() + } + return time.Duration(0) +} + func (g *Group) cleanupStaleSeries(ctx context.Context, ts time.Time) { if len(g.staleSeries) == 0 { return } app := g.opts.Appendable.Appender(ctx) + evaluationDelay := g.EvaluationDelay() for _, s := range g.staleSeries { // Rule that produced series no longer configured, mark it stale. 
- _, err := app.Append(0, s, timestamp.FromTime(ts), math.Float64frombits(value.StaleNaN)) + _, err := app.Append(0, s, timestamp.FromTime(ts.Add(-evaluationDelay)), math.Float64frombits(value.StaleNaN)) switch errors.Cause(err) { case nil: case storage.ErrOutOfOrderSample, storage.ErrDuplicateSampleForTimestamp: @@ -935,6 +951,7 @@ type ManagerOptions struct { ForGracePeriod time.Duration ResendDelay time.Duration GroupLoader GroupLoader + DefaultEvaluationDelay func() time.Duration Metrics *Metrics } @@ -1131,15 +1148,16 @@ func (m *Manager) LoadGroups( } groups[GroupKey(fn, rg.Name)] = NewGroup(GroupOptions{ - Name: rg.Name, - File: fn, - Interval: itv, - Limit: rg.Limit, - Rules: rules, - SourceTenants: rg.SourceTenants, - ShouldRestore: shouldRestore, - Opts: m.opts, - done: m.done, + Name: rg.Name, + File: fn, + Interval: itv, + Limit: rg.Limit, + Rules: rules, + SourceTenants: rg.SourceTenants, + ShouldRestore: shouldRestore, + Opts: m.opts, + EvaluationDelay: (*time.Duration)(rg.EvaluationDelay), + done: m.done, }) } } diff --git a/rules/manager_test.go b/rules/manager_test.go index 1d5e14ed6..f2901a847 100644 --- a/rules/manager_test.go +++ b/rules/manager_test.go @@ -16,9 +16,11 @@ package rules import ( "context" "fmt" + "io/fs" "io/ioutil" "math" "os" + "path" "sort" "testing" "time" @@ -158,7 +160,7 @@ func TestAlertingRule(t *testing.T) { evalTime := baseTime.Add(test.time) - res, err := rule.Eval(suite.Context(), evalTime, EngineQueryFunc(suite.QueryEngine(), suite.Storage()), nil, 0) + res, err := rule.Eval(suite.Context(), 0, evalTime, EngineQueryFunc(suite.QueryEngine(), suite.Storage()), nil, 0) require.NoError(t, err) var filteredRes promql.Vector // After removing 'ALERTS_FOR_STATE' samples. @@ -188,156 +190,160 @@ func TestAlertingRule(t *testing.T) { } func TestForStateAddSamples(t *testing.T) { - suite, err := promql.NewTest(t, ` + for _, evalDelay := range []time.Duration{0, time.Minute} { + t.Run(fmt.Sprintf("evalDelay %s", evalDelay.String()), func(t *testing.T) { + suite, err := promql.NewTest(t, ` load 5m http_requests{job="app-server", instance="0", group="canary", severity="overwrite-me"} 75 85 95 105 105 95 85 http_requests{job="app-server", instance="1", group="canary", severity="overwrite-me"} 80 90 100 110 120 130 140 `) - require.NoError(t, err) - defer suite.Close() + require.NoError(t, err) + defer suite.Close() - err = suite.Run() - require.NoError(t, err) + err = suite.Run() + require.NoError(t, err) - expr, err := parser.ParseExpr(`http_requests{group="canary", job="app-server"} < 100`) - require.NoError(t, err) + expr, err := parser.ParseExpr(`http_requests{group="canary", job="app-server"} < 100`) + require.NoError(t, err) - rule := NewAlertingRule( - "HTTPRequestRateLow", - expr, - time.Minute, - labels.FromStrings("severity", "{{\"c\"}}ritical"), - nil, nil, "", true, nil, - ) - result := promql.Vector{ - { - Metric: labels.FromStrings( - "__name__", "ALERTS_FOR_STATE", - "alertname", "HTTPRequestRateLow", - "group", "canary", - "instance", "0", - "job", "app-server", - "severity", "critical", - ), - Point: promql.Point{V: 1}, - }, - { - Metric: labels.FromStrings( - "__name__", "ALERTS_FOR_STATE", - "alertname", "HTTPRequestRateLow", - "group", "canary", - "instance", "1", - "job", "app-server", - "severity", "critical", - ), - Point: promql.Point{V: 1}, - }, - { - Metric: labels.FromStrings( - "__name__", "ALERTS_FOR_STATE", - "alertname", "HTTPRequestRateLow", - "group", "canary", - "instance", "0", - "job", "app-server", - "severity", 
"critical", - ), - Point: promql.Point{V: 1}, - }, - { - Metric: labels.FromStrings( - "__name__", "ALERTS_FOR_STATE", - "alertname", "HTTPRequestRateLow", - "group", "canary", - "instance", "1", - "job", "app-server", - "severity", "critical", - ), - Point: promql.Point{V: 1}, - }, - } - - baseTime := time.Unix(0, 0) - - tests := []struct { - time time.Duration - result promql.Vector - persistThisTime bool // If true, it means this 'time' is persisted for 'for'. - }{ - { - time: 0, - result: append(promql.Vector{}, result[:2]...), - persistThisTime: true, - }, - { - time: 5 * time.Minute, - result: append(promql.Vector{}, result[2:]...), - }, - { - time: 10 * time.Minute, - result: append(promql.Vector{}, result[2:3]...), - }, - { - time: 15 * time.Minute, - result: nil, - }, - { - time: 20 * time.Minute, - result: nil, - }, - { - time: 25 * time.Minute, - result: append(promql.Vector{}, result[:1]...), - persistThisTime: true, - }, - { - time: 30 * time.Minute, - result: append(promql.Vector{}, result[2:3]...), - }, - } - - var forState float64 - for i, test := range tests { - t.Logf("case %d", i) - evalTime := baseTime.Add(test.time) - - if test.persistThisTime { - forState = float64(evalTime.Unix()) - } - if test.result == nil { - forState = float64(value.StaleNaN) - } - - res, err := rule.Eval(suite.Context(), evalTime, EngineQueryFunc(suite.QueryEngine(), suite.Storage()), nil, 0) - require.NoError(t, err) - - var filteredRes promql.Vector // After removing 'ALERTS' samples. - for _, smpl := range res { - smplName := smpl.Metric.Get("__name__") - if smplName == "ALERTS_FOR_STATE" { - filteredRes = append(filteredRes, smpl) - } else { - // If not 'ALERTS_FOR_STATE', it has to be 'ALERTS'. - require.Equal(t, smplName, "ALERTS") + rule := NewAlertingRule( + "HTTPRequestRateLow", + expr, + time.Minute, + labels.FromStrings("severity", "{{\"c\"}}ritical"), + nil, nil, "", true, nil, + ) + result := promql.Vector{ + { + Metric: labels.FromStrings( + "__name__", "ALERTS_FOR_STATE", + "alertname", "HTTPRequestRateLow", + "group", "canary", + "instance", "0", + "job", "app-server", + "severity", "critical", + ), + Point: promql.Point{V: 1}, + }, + { + Metric: labels.FromStrings( + "__name__", "ALERTS_FOR_STATE", + "alertname", "HTTPRequestRateLow", + "group", "canary", + "instance", "1", + "job", "app-server", + "severity", "critical", + ), + Point: promql.Point{V: 1}, + }, + { + Metric: labels.FromStrings( + "__name__", "ALERTS_FOR_STATE", + "alertname", "HTTPRequestRateLow", + "group", "canary", + "instance", "0", + "job", "app-server", + "severity", "critical", + ), + Point: promql.Point{V: 1}, + }, + { + Metric: labels.FromStrings( + "__name__", "ALERTS_FOR_STATE", + "alertname", "HTTPRequestRateLow", + "group", "canary", + "instance", "1", + "job", "app-server", + "severity", "critical", + ), + Point: promql.Point{V: 1}, + }, } - } - for i := range test.result { - test.result[i].T = timestamp.FromTime(evalTime) - // Updating the expected 'for' state. - if test.result[i].V >= 0 { - test.result[i].V = forState - } - } - require.Equal(t, len(test.result), len(filteredRes), "%d. Number of samples in expected and actual output don't match (%d vs. %d)", i, len(test.result), len(res)) - sort.Slice(filteredRes, func(i, j int) bool { - return labels.Compare(filteredRes[i].Metric, filteredRes[j].Metric) < 0 + baseTime := time.Unix(0, 0) + + tests := []struct { + time time.Duration + result promql.Vector + persistThisTime bool // If true, it means this 'time' is persisted for 'for'. 
+ }{ + { + time: 0, + result: append(promql.Vector{}, result[:2]...), + persistThisTime: true, + }, + { + time: 5 * time.Minute, + result: append(promql.Vector{}, result[2:]...), + }, + { + time: 10 * time.Minute, + result: append(promql.Vector{}, result[2:3]...), + }, + { + time: 15 * time.Minute, + result: nil, + }, + { + time: 20 * time.Minute, + result: nil, + }, + { + time: 25 * time.Minute, + result: append(promql.Vector{}, result[:1]...), + persistThisTime: true, + }, + { + time: 30 * time.Minute, + result: append(promql.Vector{}, result[2:3]...), + }, + } + + var forState float64 + for i, test := range tests { + t.Logf("case %d", i) + evalTime := baseTime.Add(test.time).Add(evalDelay) + + if test.persistThisTime { + forState = float64(evalTime.Unix()) + } + if test.result == nil { + forState = float64(value.StaleNaN) + } + + res, err := rule.Eval(suite.Context(), evalDelay, evalTime, EngineQueryFunc(suite.QueryEngine(), suite.Storage()), nil, 0) + require.NoError(t, err) + + var filteredRes promql.Vector // After removing 'ALERTS' samples. + for _, smpl := range res { + smplName := smpl.Metric.Get("__name__") + if smplName == "ALERTS_FOR_STATE" { + filteredRes = append(filteredRes, smpl) + } else { + // If not 'ALERTS_FOR_STATE', it has to be 'ALERTS'. + require.Equal(t, smplName, "ALERTS") + } + } + for i := range test.result { + test.result[i].T = timestamp.FromTime(evalTime.Add(-evalDelay)) + // Updating the expected 'for' state. + if test.result[i].V >= 0 { + test.result[i].V = forState + } + } + require.Equal(t, len(test.result), len(filteredRes), "%d. Number of samples in expected and actual output don't match (%d vs. %d)", i, len(test.result), len(res)) + + sort.Slice(filteredRes, func(i, j int) bool { + return labels.Compare(filteredRes[i].Metric, filteredRes[j].Metric) < 0 + }) + require.Equal(t, test.result, filteredRes) + + for _, aa := range rule.ActiveAlerts() { + require.Zero(t, aa.Labels.Get(model.MetricNameLabel), "%s label set on active alert: %s", model.MetricNameLabel, aa.Labels) + } + + } }) - require.Equal(t, test.result, filteredRes) - - for _, aa := range rule.ActiveAlerts() { - require.Zero(t, aa.Labels.Get(model.MetricNameLabel), "%s label set on active alert: %s", model.MetricNameLabel, aa.Labels) - } - } } @@ -441,7 +447,7 @@ func TestForStateRestore(t *testing.T) { }, } - testFunc := func(tst testInput) { + testFunc := func(tst testInput, evalDelay time.Duration) { newRule := NewAlertingRule( "HTTPRequestRateLow", expr, @@ -450,17 +456,18 @@ func TestForStateRestore(t *testing.T) { nil, nil, "", false, nil, ) newGroup := NewGroup(GroupOptions{ - Name: "default", - Interval: time.Second, - Rules: []Rule{newRule}, - ShouldRestore: true, - Opts: opts, + Name: "default", + Interval: time.Second, + Rules: []Rule{newRule}, + ShouldRestore: true, + Opts: opts, + EvaluationDelay: &evalDelay, }) newGroups := make(map[string]*Group) newGroups["default;"] = newGroup - restoreTime := baseTime.Add(tst.restoreDuration) + restoreTime := baseTime.Add(tst.restoreDuration).Add(evalDelay) // First eval before restoration. newGroup.Eval(suite.Context(), restoreTime) // Restore happens here. @@ -495,14 +502,16 @@ func TestForStateRestore(t *testing.T) { // Difference in time should be within 1e6 ns, i.e. 1ms // (due to conversion between ns & ms, float64 & int64). 
- activeAtDiff := float64(e.ActiveAt.Unix() + int64(tst.downDuration/time.Second) - got[i].ActiveAt.Unix()) + activeAtDiff := evalDelay.Seconds() + float64(e.ActiveAt.Unix()+int64(tst.downDuration/time.Second)-got[i].ActiveAt.Unix()) require.Equal(t, 0.0, math.Abs(activeAtDiff), "'for' state restored time is wrong") } } } - for _, tst := range tests { - testFunc(tst) + for _, evalDelay := range []time.Duration{0, time.Minute} { + for _, tst := range tests { + testFunc(tst, evalDelay) + } } // Testing the grace period. @@ -510,82 +519,88 @@ func TestForStateRestore(t *testing.T) { evalTime := baseTime.Add(duration) group.Eval(suite.Context(), evalTime) } - testFunc(testInput{ - restoreDuration: 25 * time.Minute, - alerts: []*Alert{}, - gracePeriod: true, - num: 2, - }) + + for _, evalDelay := range []time.Duration{0, time.Minute} { + testFunc(testInput{ + restoreDuration: 25 * time.Minute, + alerts: []*Alert{}, + gracePeriod: true, + num: 2, + }, evalDelay) + } } func TestStaleness(t *testing.T) { - st := teststorage.New(t) - defer st.Close() - engineOpts := promql.EngineOpts{ - Logger: nil, - Reg: nil, - MaxSamples: 10, - Timeout: 10 * time.Second, + for _, evalDelay := range []time.Duration{0, time.Minute} { + st := teststorage.New(t) + defer st.Close() + engineOpts := promql.EngineOpts{ + Logger: nil, + Reg: nil, + MaxSamples: 10, + Timeout: 10 * time.Second, + } + engine := promql.NewEngine(engineOpts) + opts := &ManagerOptions{ + QueryFunc: EngineQueryFunc(engine, st), + Appendable: st, + Queryable: st, + Context: context.Background(), + Logger: log.NewNopLogger(), + } + + expr, err := parser.ParseExpr("a + 1") + require.NoError(t, err) + rule := NewRecordingRule("a_plus_one", expr, labels.Labels{}) + group := NewGroup(GroupOptions{ + Name: "default", + Interval: time.Second, + Rules: []Rule{rule}, + ShouldRestore: true, + Opts: opts, + EvaluationDelay: &evalDelay, + }) + + // A time series that has two samples and then goes stale. + app := st.Appender(context.Background()) + app.Append(0, labels.FromStrings(model.MetricNameLabel, "a"), 0, 1) + app.Append(0, labels.FromStrings(model.MetricNameLabel, "a"), 1000, 2) + app.Append(0, labels.FromStrings(model.MetricNameLabel, "a"), 2000, math.Float64frombits(value.StaleNaN)) + + err = app.Commit() + require.NoError(t, err) + + ctx := context.Background() + + // Execute 3 times, 1 second apart. + group.Eval(ctx, time.Unix(0, 0).Add(evalDelay)) + group.Eval(ctx, time.Unix(1, 0).Add(evalDelay)) + group.Eval(ctx, time.Unix(2, 0).Add(evalDelay)) + + querier, err := st.Querier(context.Background(), 0, 2000) + require.NoError(t, err) + defer querier.Close() + + matcher, err := labels.NewMatcher(labels.MatchEqual, model.MetricNameLabel, "a_plus_one") + require.NoError(t, err) + + set := querier.Select(false, nil, matcher) + samples, err := readSeriesSet(set) + require.NoError(t, err) + + metric := labels.FromStrings(model.MetricNameLabel, "a_plus_one").String() + metricSample, ok := samples[metric] + + require.True(t, ok, "Series %s not returned.", metric) + require.True(t, value.IsStaleNaN(metricSample[2].V), "Appended second sample not as expected. Wanted: stale NaN Got: %x", math.Float64bits(metricSample[2].V)) + metricSample[2].V = 42 // require.Equal cannot handle NaN. 
+ + want := map[string][]promql.Point{ + metric: {{T: 0, V: 2}, {T: 1000, V: 3}, {T: 2000, V: 42}}, + } + + require.Equal(t, want, samples) } - engine := promql.NewEngine(engineOpts) - opts := &ManagerOptions{ - QueryFunc: EngineQueryFunc(engine, st), - Appendable: st, - Queryable: st, - Context: context.Background(), - Logger: log.NewNopLogger(), - } - - expr, err := parser.ParseExpr("a + 1") - require.NoError(t, err) - rule := NewRecordingRule("a_plus_one", expr, labels.Labels{}) - group := NewGroup(GroupOptions{ - Name: "default", - Interval: time.Second, - Rules: []Rule{rule}, - ShouldRestore: true, - Opts: opts, - }) - - // A time series that has two samples and then goes stale. - app := st.Appender(context.Background()) - app.Append(0, labels.FromStrings(model.MetricNameLabel, "a"), 0, 1) - app.Append(0, labels.FromStrings(model.MetricNameLabel, "a"), 1000, 2) - app.Append(0, labels.FromStrings(model.MetricNameLabel, "a"), 2000, math.Float64frombits(value.StaleNaN)) - - err = app.Commit() - require.NoError(t, err) - - ctx := context.Background() - - // Execute 3 times, 1 second apart. - group.Eval(ctx, time.Unix(0, 0)) - group.Eval(ctx, time.Unix(1, 0)) - group.Eval(ctx, time.Unix(2, 0)) - - querier, err := st.Querier(context.Background(), 0, 2000) - require.NoError(t, err) - defer querier.Close() - - matcher, err := labels.NewMatcher(labels.MatchEqual, model.MetricNameLabel, "a_plus_one") - require.NoError(t, err) - - set := querier.Select(false, nil, matcher) - samples, err := readSeriesSet(set) - require.NoError(t, err) - - metric := labels.FromStrings(model.MetricNameLabel, "a_plus_one").String() - metricSample, ok := samples[metric] - - require.True(t, ok, "Series %s not returned.", metric) - require.True(t, value.IsStaleNaN(metricSample[2].V), "Appended second sample not as expected. Wanted: stale NaN Got: %x", math.Float64bits(metricSample[2].V)) - metricSample[2].V = 42 // require.Equal cannot handle NaN. - - want := map[string][]promql.Point{ - metric: {{T: 0, V: 2}, {T: 1000, V: 3}, {T: 2000, V: 42}}, - } - - require.Equal(t, want, samples) } // Convert a SeriesSet into a form usable with require.Equal. @@ -1512,3 +1527,43 @@ func TestGroup_Equals(t *testing.T) { }) } } + +func TestGroup_EvaluationDelay(t *testing.T) { + config := ` +groups: + - name: group1 + evaluation_delay: 2m + - name: group2 + evaluation_delay: 0s + - name: group3 +` + + dir := t.TempDir() + fname := path.Join(dir, "rules.yaml") + err := ioutil.WriteFile(fname, []byte(config), fs.ModePerm) + require.NoError(t, err) + + m := NewManager(&ManagerOptions{ + Logger: log.NewNopLogger(), + DefaultEvaluationDelay: func() time.Duration { + return time.Minute + }, + }) + m.start() + err = m.Update(time.Second, []string{fname}, nil, "") + require.NoError(t, err) + + rgs := m.RuleGroups() + sort.Slice(rgs, func(i, j int) bool { + return rgs[i].Name() < rgs[j].Name() + }) + + // From config. + require.Equal(t, 2*time.Minute, rgs[0].EvaluationDelay()) + // Setting 0 in config is detected. + require.Equal(t, time.Duration(0), rgs[1].EvaluationDelay()) + // Default when nothing is set. + require.Equal(t, time.Minute, rgs[2].EvaluationDelay()) + + m.Stop() +} diff --git a/rules/recording.go b/rules/recording.go index 0681db9a2..13074158e 100644 --- a/rules/recording.go +++ b/rules/recording.go @@ -73,8 +73,8 @@ func (rule *RecordingRule) Labels() labels.Labels { } // Eval evaluates the rule and then overrides the metric names and labels accordingly. 
-func (rule *RecordingRule) Eval(ctx context.Context, ts time.Time, query QueryFunc, _ *url.URL, limit int) (promql.Vector, error) { - vector, err := query(ctx, rule.vector.String(), ts) +func (rule *RecordingRule) Eval(ctx context.Context, evalDelay time.Duration, ts time.Time, query QueryFunc, _ *url.URL, limit int) (promql.Vector, error) { + vector, err := query(ctx, rule.vector.String(), ts.Add(-evalDelay)) if err != nil { return nil, err } diff --git a/rules/recording_test.go b/rules/recording_test.go index dd06b775f..4a10e7197 100644 --- a/rules/recording_test.go +++ b/rules/recording_test.go @@ -74,7 +74,7 @@ func TestRuleEval(t *testing.T) { for _, test := range suite { rule := NewRecordingRule(test.name, test.expr, test.labels) - result, err := rule.Eval(ctx, now, EngineQueryFunc(engine, storage), nil, 0) + result, err := rule.Eval(ctx, 0, now, EngineQueryFunc(engine, storage), nil, 0) if test.err == "" { require.NoError(t, err) } else { @@ -119,7 +119,7 @@ func TestRuleEvalDuplicate(t *testing.T) { expr, _ := parser.ParseExpr(`vector(0) or label_replace(vector(0),"test","x","","")`) rule := NewRecordingRule("foo", expr, labels.FromStrings("test", "test")) - _, err := rule.Eval(ctx, now, EngineQueryFunc(engine, storage), nil, 0) + _, err := rule.Eval(ctx, 0, now, EngineQueryFunc(engine, storage), nil, 0) require.Error(t, err) require.EqualError(t, err, "vector contains metrics with the same labelset after applying rule labels") } @@ -164,7 +164,7 @@ func TestRecordingRuleLimit(t *testing.T) { evalTime := time.Unix(0, 0) for _, test := range tests { - _, err := rule.Eval(suite.Context(), evalTime, EngineQueryFunc(suite.QueryEngine(), suite.Storage()), nil, test.limit) + _, err := rule.Eval(suite.Context(), 0, evalTime, EngineQueryFunc(suite.QueryEngine(), suite.Storage()), nil, test.limit) if err != nil { require.EqualError(t, err, test.err) } else if test.err != "" { diff --git a/tsdb/wal/checkpoint_test.go b/tsdb/wal/checkpoint_test.go index 554a4b5d2..3f8ca25e0 100644 --- a/tsdb/wal/checkpoint_test.go +++ b/tsdb/wal/checkpoint_test.go @@ -252,7 +252,7 @@ func TestCheckpointNoTmpFolderAfterError(t *testing.T) { // Walk the wal dir to make sure there are no tmp folder left behind after the error. 
err = filepath.Walk(w.Dir(), func(path string, info os.FileInfo, err error) error { if err != nil { - return errors.Wrapf(err, "access err %q: %v\n", path, err) + return errors.Wrapf(err, "access err %q: %v", path, err) } if info.IsDir() && strings.HasSuffix(info.Name(), ".tmp") { return fmt.Errorf("wal dir contains temporary folder:%s", info.Name()) diff --git a/util/runtime/statfs_linux_386.go b/util/runtime/statfs_linux_386.go index b45eecdd3..7494a0adf 100644 --- a/util/runtime/statfs_linux_386.go +++ b/util/runtime/statfs_linux_386.go @@ -23,7 +23,6 @@ import ( // Statfs returns the file system type (Unix only) func Statfs(path string) string { - // Types of file systems that may be returned by `statfs` fsTypes := map[int32]string{ 0xadf5: "ADFS_SUPER_MAGIC", diff --git a/util/runtime/statfs_uint32.go b/util/runtime/statfs_uint32.go index fa10ebc96..72d670a39 100644 --- a/util/runtime/statfs_uint32.go +++ b/util/runtime/statfs_uint32.go @@ -23,7 +23,6 @@ import ( // Statfs returns the file system type (Unix only) func Statfs(path string) string { - // Types of file systems that may be returned by `statfs` fsTypes := map[uint32]string{ 0xadf5: "ADFS_SUPER_MAGIC", diff --git a/web/api/v1/api.go b/web/api/v1/api.go index e6a6daffc..e31acab9c 100644 --- a/web/api/v1/api.go +++ b/web/api/v1/api.go @@ -150,7 +150,6 @@ type TSDBAdminStats interface { CleanTombstones() error Delete(mint, maxt int64, ms ...*labels.Matcher) error Snapshot(dir string, withHead bool) error - Stats(statsByLabelName string) (*tsdb.Stats, error) WALReplayStatus() (tsdb.WALReplayStatus, error) }
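
For reference, a minimal sketch of a rule file exercising the new group-level
option, using only fields already defined in rulefmt plus the `evaluation_delay`
field this patch adds (group names, expressions and delay values are
illustrative): when `evaluation_delay` is set, the group's rules are evaluated
against `ts - evaluation_delay` and the resulting samples, including staleness
markers, are written at that shifted timestamp.

    # rules.yaml (hypothetical example)
    groups:
      - name: example-delayed
        interval: 1m
        evaluation_delay: 2m   # query and append samples 2m behind each evaluation tick
        rules:
          - record: job:http_requests:rate5m
            expr: sum by (job) (rate(http_requests_total[5m]))
      - name: example-no-delay
        rules:
          # No evaluation_delay here: the group falls back to the manager-wide
          # default, or to 0 if none is configured.
          - record: job:up:count
            expr: count by (job) (up)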
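
A group without `evaluation_delay` falls back to the manager-wide default
supplied through the new `ManagerOptions` field. A sketch of how a caller might
wire that up (the remaining options such as QueryFunc, Appendable, Context and
Logger are elided here):

    opts := &rules.ManagerOptions{
        // ... QueryFunc, Appendable, Queryable, Context, Logger, etc. ...
        DefaultEvaluationDelay: func() time.Duration { return 30 * time.Second },
    }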