diff --git a/rules/fixtures/alert_rule.yaml b/rules/fixtures/alert_rule.yaml
new file mode 100644
index 000000000..0b6d69daf
--- /dev/null
+++ b/rules/fixtures/alert_rule.yaml
@@ -0,0 +1,6 @@
+groups:
+  - name: test
+    interval: 1s
+    rules:
+      - alert: rule1
+        expr: 1 < bool 2
diff --git a/rules/fixtures/alert_rule1.yaml b/rules/fixtures/alert_rule1.yaml
new file mode 100644
index 000000000..306ff41b6
--- /dev/null
+++ b/rules/fixtures/alert_rule1.yaml
@@ -0,0 +1,6 @@
+groups:
+  - name: test2
+    interval: 1s
+    rules:
+      - alert: rule2
+        expr: 1 < bool 2
diff --git a/rules/manager.go b/rules/manager.go
index 6e9bf6469..9bdb66f1b 100644
--- a/rules/manager.go
+++ b/rules/manager.go
@@ -89,12 +89,13 @@ func DefaultEvalIterationFunc(ctx context.Context, g *Group, evalTimestamp time.
 
 // The Manager manages recording and alerting rules.
 type Manager struct {
-	opts     *ManagerOptions
-	groups   map[string]*Group
-	mtx      sync.RWMutex
-	block    chan struct{}
-	done     chan struct{}
-	restored bool
+	opts                 *ManagerOptions
+	groups               map[string]*Group
+	mtx                  sync.RWMutex
+	block                chan struct{}
+	done                 chan struct{}
+	restored             bool
+	restoreNewRuleGroups bool
 
 	logger *slog.Logger
 }
@@ -121,6 +122,10 @@ type ManagerOptions struct {
 	ConcurrentEvalsEnabled    bool
 	RuleConcurrencyController RuleConcurrencyController
 	RuleDependencyController  RuleDependencyController
+	// RestoreNewRuleGroups makes the manager restore the `for` state of rule groups that are
+	// added to an already running manager. Without it, `for` state is restored only when the
+	// manager is newly created, which happens during restarts.
+	RestoreNewRuleGroups bool
 
 	Metrics *Metrics
 }
@@ -153,11 +158,12 @@ func NewManager(o *ManagerOptions) *Manager {
 	}
 
 	m := &Manager{
-		groups: map[string]*Group{},
-		opts:   o,
-		block:  make(chan struct{}),
-		done:   make(chan struct{}),
-		logger: o.Logger,
+		groups:               map[string]*Group{},
+		opts:                 o,
+		block:                make(chan struct{}),
+		done:                 make(chan struct{}),
+		logger:               o.Logger,
+		restoreNewRuleGroups: o.RestoreNewRuleGroups,
 	}
 
 	return m
@@ -295,7 +301,7 @@ func (m *Manager) LoadGroups(
 ) (map[string]*Group, []error) {
 	groups := make(map[string]*Group)
 
-	shouldRestore := !m.restored
+	shouldRestore := !m.restored || m.restoreNewRuleGroups
 
 	for _, fn := range filenames {
 		rgs, errs := m.opts.GroupLoader.Load(fn)
@@ -328,7 +334,7 @@ func (m *Manager) LoadGroups(
 						labels.FromMap(r.Annotations),
 						externalLabels,
 						externalURL,
-						m.restored,
+						!shouldRestore,
 						m.logger.With("alert", r.Alert),
 					))
 					continue
diff --git a/rules/manager_test.go b/rules/manager_test.go
index 6afac993d..bbc0a6023 100644
--- a/rules/manager_test.go
+++ b/rules/manager_test.go
@@ -2112,6 +2112,139 @@ func TestAsyncRuleEvaluation(t *testing.T) {
 	})
 }
 
+func TestNewRuleGroupRestoration(t *testing.T) {
+	store := teststorage.New(t)
+	t.Cleanup(func() { store.Close() })
+	var (
+		inflightQueries atomic.Int32
+		maxInflight     atomic.Int32
+		maxConcurrency  int64
+		interval        = 60 * time.Second
+	)
+
+	waitForEvaluations := func(t *testing.T, ch <-chan int32, targetCount int32) {
+		for {
+			select {
+			case cnt := <-ch:
+				if cnt == targetCount {
+					return
+				}
+			case <-time.After(5 * time.Second):
+				return
+			}
+		}
+	}
+
+	files := []string{"fixtures/alert_rule.yaml"}
+
+	option := optsFactory(store, &maxInflight, &inflightQueries, maxConcurrency)
+	option.Queryable = store
+	option.Appendable = store
+	option.NotifyFunc = func(ctx context.Context, expr string, alerts ...*Alert) {}
+
+	var evalCount atomic.Int32
+	ch := make(chan int32)
+	noopEvalIterFunc := func(ctx context.Context, g *Group, evalTimestamp time.Time) {
+		evalCount.Inc()
+		ch <- evalCount.Load()
+	}
+
+	ruleManager := NewManager(option)
+	go ruleManager.Run()
+	err := ruleManager.Update(interval, files, labels.EmptyLabels(), "", noopEvalIterFunc)
+	require.NoError(t, err)
+
+	waitForEvaluations(t, ch, 3)
+	require.Equal(t, int32(3), evalCount.Load())
+	ruleGroups := make(map[string]struct{})
+	for _, group := range ruleManager.groups {
+		ruleGroups[group.Name()] = struct{}{}
+		require.False(t, group.shouldRestore)
+		for _, rule := range group.rules {
+			require.True(t, rule.(*AlertingRule).restored.Load())
+		}
+	}
+
+	files = append(files, "fixtures/alert_rule1.yaml")
+	err = ruleManager.Update(interval, files, labels.EmptyLabels(), "", nil)
+	require.NoError(t, err)
+	ruleManager.Stop()
+	for _, group := range ruleManager.groups {
+		// Without RestoreNewRuleGroups, groups added to an existing manager are not restored.
+		require.False(t, group.shouldRestore)
+	}
+}
+
+func TestNewRuleGroupRestorationWithRestoreNewGroupOption(t *testing.T) {
+	store := teststorage.New(t)
+	t.Cleanup(func() { store.Close() })
+	var (
+		inflightQueries atomic.Int32
+		maxInflight     atomic.Int32
+		maxConcurrency  int64
+		interval        = 60 * time.Second
+	)
+
+	waitForEvaluations := func(t *testing.T, ch <-chan int32, targetCount int32) {
+		for {
+			select {
+			case cnt := <-ch:
+				if cnt == targetCount {
+					return
+				}
+			case <-time.After(5 * time.Second):
+				return
+			}
+		}
+	}
+
+	files := []string{"fixtures/alert_rule.yaml"}
+
+	option := optsFactory(store, &maxInflight, &inflightQueries, maxConcurrency)
+	option.Queryable = store
+	option.Appendable = store
+	option.RestoreNewRuleGroups = true
+	option.NotifyFunc = func(ctx context.Context, expr string, alerts ...*Alert) {}
+
+	var evalCount atomic.Int32
+	ch := make(chan int32)
+	noopEvalIterFunc := func(ctx context.Context, g *Group, evalTimestamp time.Time) {
+		evalCount.Inc()
+		ch <- evalCount.Load()
+	}
+
+	ruleManager := NewManager(option)
+	go ruleManager.Run()
+	err := ruleManager.Update(interval, files, labels.EmptyLabels(), "", noopEvalIterFunc)
+	require.NoError(t, err)
+
+	waitForEvaluations(t, ch, 3)
+	require.Equal(t, int32(3), evalCount.Load())
+	ruleGroups := make(map[string]struct{})
+	for _, group := range ruleManager.groups {
+		ruleGroups[group.Name()] = struct{}{}
+		require.False(t, group.shouldRestore)
+		for _, rule := range group.rules {
+			require.True(t, rule.(*AlertingRule).restored.Load())
+		}
+	}
+
+	files = append(files, "fixtures/alert_rule1.yaml")
+	err = ruleManager.Update(interval, files, labels.EmptyLabels(), "", nil)
+	require.NoError(t, err)
+	// Stop rule evaluation before inspecting the groups.
+	ruleManager.Stop()
+	for _, group := range ruleManager.groups {
+		if _, OK := ruleGroups[group.Name()]; OK {
+			// Groups recorded after the first Update were already restored.
+			require.False(t, group.shouldRestore)
+			continue
+		}
+		// New rule groups added to the existing manager are marked for restoration.
+		require.True(t, group.shouldRestore)
+	}
+}
+
 func TestBoundedRuleEvalConcurrency(t *testing.T) {
 	storage := teststorage.New(t)
 	t.Cleanup(func() { storage.Close() })