From 806e71e828af0dc0cfe3c9947fd30540720234af Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Peter=20=C5=A0tibran=C3=BD?= Date: Thu, 19 Jan 2023 14:51:26 +0100 Subject: [PATCH] Option to align rule group's evaluation time to interval (#400) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Allow rule groups evaluation timestamp to be aligned on the evaluation interval. Signed-off-by: Peter Štibraný --- model/rulefmt/rulefmt.go | 13 ++-- rules/fixtures/rules_with_alignment.yaml | 27 +++++++ rules/manager.go | 94 +++++++++++++----------- rules/manager_test.go | 87 +++++++++++++++++++--- 4 files changed, 163 insertions(+), 58 deletions(-) create mode 100644 rules/fixtures/rules_with_alignment.yaml diff --git a/model/rulefmt/rulefmt.go b/model/rulefmt/rulefmt.go index 7c6e7b653..9016ed46c 100644 --- a/model/rulefmt/rulefmt.go +++ b/model/rulefmt/rulefmt.go @@ -135,12 +135,13 @@ func (g *RuleGroups) Validate(node ruleGroups) (errs []error) { // RuleGroup is a list of sequentially evaluated recording and alerting rules. type RuleGroup struct { - Name string `yaml:"name"` - Interval model.Duration `yaml:"interval,omitempty"` - EvaluationDelay *model.Duration `yaml:"evaluation_delay,omitempty"` - Limit int `yaml:"limit,omitempty"` - Rules []RuleNode `yaml:"rules"` - SourceTenants []string `yaml:"source_tenants,omitempty"` + Name string `yaml:"name"` + Interval model.Duration `yaml:"interval,omitempty"` + EvaluationDelay *model.Duration `yaml:"evaluation_delay,omitempty"` + Limit int `yaml:"limit,omitempty"` + Rules []RuleNode `yaml:"rules"` + SourceTenants []string `yaml:"source_tenants,omitempty"` + AlignExecutionTimeOnInterval bool `yaml:"align_execution_time_on_interval,omitempty"` } // Rule describes an alerting or recording rule. diff --git a/rules/fixtures/rules_with_alignment.yaml b/rules/fixtures/rules_with_alignment.yaml new file mode 100644 index 000000000..06a172ad4 --- /dev/null +++ b/rules/fixtures/rules_with_alignment.yaml @@ -0,0 +1,27 @@ +groups: + - name: aligned + align_execution_time_on_interval: true + interval: 5m + rules: + - record: job:http_requests:rate5m + expr: sum by (job)(rate(http_requests_total[5m])) + + - name: aligned_with_crazy_interval + align_execution_time_on_interval: true + interval: 1m27s + rules: + - record: job:http_requests:rate5m + expr: sum by (job)(rate(http_requests_total[5m])) + + - name: unaligned_default + interval: 5m + rules: + - record: job:http_requests:rate5m + expr: sum by (job)(rate(http_requests_total[5m])) + + - name: unaligned_explicit + interval: 5m + align_execution_time_on_interval: false + rules: + - record: job:http_requests:rate5m + expr: sum by (job)(rate(http_requests_total[5m])) diff --git a/rules/manager.go b/rules/manager.go index 4033f22f7..747ebb191 100644 --- a/rules/manager.go +++ b/rules/manager.go @@ -270,7 +270,8 @@ type Group struct { metrics *Metrics - ruleGroupPostProcessFunc RuleGroupPostProcessFunc + ruleGroupPostProcessFunc RuleGroupPostProcessFunc + alignExecutionTimeOnInterval bool } // This function will be used before each rule group evaluation if not nil. @@ -278,16 +279,17 @@ type Group struct { type RuleGroupPostProcessFunc func(g *Group, lastEvalTimestamp time.Time, log log.Logger) error type GroupOptions struct { - Name, File string - Interval time.Duration - Limit int - Rules []Rule - SourceTenants []string - ShouldRestore bool - Opts *ManagerOptions - EvaluationDelay *time.Duration - done chan struct{} - RuleGroupPostProcessFunc RuleGroupPostProcessFunc + Name, File string + Interval time.Duration + Limit int + Rules []Rule + SourceTenants []string + ShouldRestore bool + Opts *ManagerOptions + EvaluationDelay *time.Duration + done chan struct{} + RuleGroupPostProcessFunc RuleGroupPostProcessFunc + AlignExecutionTimeOnInterval bool } // NewGroup makes a new Group with the given name, options, and rules. @@ -309,22 +311,23 @@ func NewGroup(o GroupOptions) *Group { metrics.GroupInterval.WithLabelValues(key).Set(o.Interval.Seconds()) return &Group{ - name: o.Name, - file: o.File, - interval: o.Interval, - evaluationDelay: o.EvaluationDelay, - limit: o.Limit, - rules: o.Rules, - shouldRestore: o.ShouldRestore, - opts: o.Opts, - sourceTenants: o.SourceTenants, - seriesInPreviousEval: make([]map[string]labels.Labels, len(o.Rules)), - done: make(chan struct{}), - managerDone: o.done, - terminated: make(chan struct{}), - logger: log.With(o.Opts.Logger, "file", o.File, "group", o.Name), - metrics: metrics, - ruleGroupPostProcessFunc: o.RuleGroupPostProcessFunc, + name: o.Name, + file: o.File, + interval: o.Interval, + evaluationDelay: o.EvaluationDelay, + limit: o.Limit, + rules: o.Rules, + shouldRestore: o.ShouldRestore, + opts: o.Opts, + sourceTenants: o.SourceTenants, + seriesInPreviousEval: make([]map[string]labels.Labels, len(o.Rules)), + done: make(chan struct{}), + managerDone: o.done, + terminated: make(chan struct{}), + logger: log.With(o.Opts.Logger, "file", o.File, "group", o.Name), + metrics: metrics, + ruleGroupPostProcessFunc: o.RuleGroupPostProcessFunc, + alignExecutionTimeOnInterval: o.AlignExecutionTimeOnInterval, } } @@ -547,11 +550,13 @@ func (g *Group) setLastEvaluation(ts time.Time) { // EvalTimestamp returns the immediately preceding consistently slotted evaluation time. func (g *Group) EvalTimestamp(startTime int64) time.Time { - var ( + var offset int64 + if !g.alignExecutionTimeOnInterval { offset = int64(g.hash() % uint64(g.interval)) - adjNow = startTime - offset - base = adjNow - (adjNow % int64(g.interval)) - ) + } + + adjNow := startTime - offset + base := adjNow - (adjNow % int64(g.interval)) return time.Unix(0, base+offset).UTC() } @@ -928,6 +933,10 @@ func (g *Group) Equals(ng *Group) bool { return false } + if g.alignExecutionTimeOnInterval != ng.alignExecutionTimeOnInterval { + return false + } + for i, gr := range g.rules { if gr.String() != ng.rules[i].String() { return false @@ -1196,17 +1205,18 @@ func (m *Manager) LoadGroups( } groups[GroupKey(fn, rg.Name)] = NewGroup(GroupOptions{ - Name: rg.Name, - File: fn, - Interval: itv, - Limit: rg.Limit, - Rules: rules, - SourceTenants: rg.SourceTenants, - ShouldRestore: shouldRestore, - Opts: m.opts, - EvaluationDelay: (*time.Duration)(rg.EvaluationDelay), - done: m.done, - RuleGroupPostProcessFunc: ruleGroupPostProcessFunc, + Name: rg.Name, + File: fn, + Interval: itv, + Limit: rg.Limit, + Rules: rules, + SourceTenants: rg.SourceTenants, + ShouldRestore: shouldRestore, + Opts: m.opts, + EvaluationDelay: (*time.Duration)(rg.EvaluationDelay), + done: m.done, + RuleGroupPostProcessFunc: ruleGroupPostProcessFunc, + AlignExecutionTimeOnInterval: rg.AlignExecutionTimeOnInterval, }) } } diff --git a/rules/manager_test.go b/rules/manager_test.go index 0816db5a9..a0bb96a4e 100644 --- a/rules/manager_test.go +++ b/rules/manager_test.go @@ -919,6 +919,71 @@ func TestUpdateSetsSourceTenants(t *testing.T) { } } +func TestAlignEvaluationTimeOnInterval(t *testing.T) { + st := teststorage.New(t) + defer st.Close() + + opts := promql.EngineOpts{ + Logger: nil, + Reg: nil, + MaxSamples: 10, + Timeout: 10 * time.Second, + } + engine := promql.NewEngine(opts) + ruleManager := NewManager(&ManagerOptions{ + Appendable: st, + Queryable: st, + QueryFunc: EngineQueryFunc(engine, st), + Context: context.Background(), + Logger: log.NewNopLogger(), + }) + ruleManager.start() + defer ruleManager.Stop() + + rgs, errs := rulefmt.ParseFile("fixtures/rules_with_alignment.yaml") + require.Empty(t, errs, "file parsing failures") + + tmpFile, err := os.CreateTemp("", "rules.test.*.yaml") + require.NoError(t, err) + defer os.Remove(tmpFile.Name()) + defer tmpFile.Close() + + reloadRules(rgs, t, tmpFile, ruleManager, 0) + + // Verify that all groups are loaded, and let's check their evaluation times. + loadedGroups := ruleManager.RuleGroups() + require.Len(t, loadedGroups, len(rgs.Groups)) + + assertGroupEvalTimeAlignedOnIntervalIsHonored := func(groupName string, expectedAligned bool) { + g := (*Group)(nil) + for _, lg := range loadedGroups { + if lg.name == groupName { + g = lg + break + } + } + require.NotNil(t, g, "group not found: %s", groupName) + + // When "g.hash() % g.interval == 0" alignment cannot be checked, because aligned and unaligned eval timestamps + // would be the same. This can happen because g.hash() depends on path passed to ruleManager.Update function, + // and this test uses temporary directory for storing rule group files. + if g.hash()%uint64(g.interval) == 0 { + t.Skip("skipping test, because rule group hash is divisible by interval, which makes eval timestamp always aligned to the interval") + } + + now := time.Now() + ts := g.EvalTimestamp(now.UnixNano()) + + aligned := ts.UnixNano()%g.interval.Nanoseconds() == 0 + require.Equal(t, expectedAligned, aligned, "group: %s, hash: %d, now: %d", groupName, g.hash(), now.UnixNano()) + } + + assertGroupEvalTimeAlignedOnIntervalIsHonored("aligned", true) + assertGroupEvalTimeAlignedOnIntervalIsHonored("aligned_with_crazy_interval", true) + assertGroupEvalTimeAlignedOnIntervalIsHonored("unaligned_default", false) + assertGroupEvalTimeAlignedOnIntervalIsHonored("unaligned_explicit", false) +} + func TestGroupEvaluationContextFuncIsCalledWhenSupplied(t *testing.T) { type testContextKeyType string var testContextKey testContextKeyType = "TestGroupEvaluationContextFuncIsCalledWhenSupplied" @@ -975,11 +1040,12 @@ type ruleGroupsTest struct { // ruleGroupTest forms a testing struct for running tests over rules. type ruleGroupTest struct { - Name string `yaml:"name"` - Interval model.Duration `yaml:"interval,omitempty"` - Limit int `yaml:"limit,omitempty"` - Rules []rulefmt.Rule `yaml:"rules"` - SourceTenants []string `yaml:"source_tenants,omitempty"` + Name string `yaml:"name"` + Interval model.Duration `yaml:"interval,omitempty"` + Limit int `yaml:"limit,omitempty"` + Rules []rulefmt.Rule `yaml:"rules"` + SourceTenants []string `yaml:"source_tenants,omitempty"` + AlignExecutionTimeOnInterval bool `yaml:"align_execution_time_on_interval,omitempty"` } func formatRules(r *rulefmt.RuleGroups) ruleGroupsTest { @@ -998,11 +1064,12 @@ func formatRules(r *rulefmt.RuleGroups) ruleGroupsTest { }) } tmp = append(tmp, ruleGroupTest{ - Name: g.Name, - Interval: g.Interval, - Limit: g.Limit, - Rules: rtmp, - SourceTenants: g.SourceTenants, + Name: g.Name, + Interval: g.Interval, + Limit: g.Limit, + Rules: rtmp, + SourceTenants: g.SourceTenants, + AlignExecutionTimeOnInterval: g.AlignExecutionTimeOnInterval, }) } return ruleGroupsTest{