diff --git a/cmd/prometheus/main.go b/cmd/prometheus/main.go
index ee974bf710..ad3b6b41fd 100644
--- a/cmd/prometheus/main.go
+++ b/cmd/prometheus/main.go
@@ -813,7 +813,7 @@ func main() {
 		queryEngine = promql.NewEngine(opts)
 		var alertStore rules.AlertStore
 		if cfg.enableAlertStatePersistence {
-			alertStore = rules.NewFileStore(logger.With("component", "alertStore"), cfg.alertStoragePath)
+			alertStore = rules.NewFileStore(logger.With("component", "alertStore"), cfg.alertStoragePath, prometheus.DefaultRegisterer)
 		}
 
 		ruleManager = rules.NewManager(&rules.ManagerOptions{
@@ -833,8 +833,7 @@ func main() {
 			DefaultRuleQueryOffset: func() time.Duration {
 				return time.Duration(cfgFile.GlobalConfig.RuleQueryOffset)
 			},
-			AlertStore:     alertStore,
-			AlertStoreFunc: rules.DefaultAlertStoreFunc,
+			AlertStore: alertStore,
 		})
 	}
 
diff --git a/rules/group.go b/rules/group.go
index 8d3130843b..a00801a01c 100644
--- a/rules/group.go
+++ b/rules/group.go
@@ -75,7 +75,6 @@ type Group struct {
 	evalIterationFunc GroupEvalIterationFunc
 
 	appOpts        *storage.AppendOptions
-	alertStoreFunc AlertStateStoreFunc
 	alertStore     AlertStore
 }
 
@@ -96,7 +95,6 @@ type GroupOptions struct {
 	QueryOffset       *time.Duration
 	done              chan struct{}
 	EvalIterationFunc GroupEvalIterationFunc
-	AlertStoreFunc    AlertStateStoreFunc
 	AlertStore        AlertStore
 }
 
@@ -128,11 +126,6 @@ func NewGroup(o GroupOptions) *Group {
 		evalIterationFunc = DefaultEvalIterationFunc
 	}
 
-	alertStoreFunc := o.AlertStoreFunc
-	if alertStoreFunc == nil {
-		alertStoreFunc = DefaultAlertStoreFunc
-	}
-
 	if opts.Logger == nil {
 		opts.Logger = promslog.NewNopLogger()
 	}
@@ -154,7 +147,6 @@ func NewGroup(o GroupOptions) *Group {
 		metrics:           metrics,
 		evalIterationFunc: evalIterationFunc,
 		appOpts:           &storage.AppendOptions{DiscardOutOfOrder: true},
-		alertStoreFunc:    alertStoreFunc,
 		alertStore:        o.AlertStore,
 	}
 }
@@ -554,7 +546,7 @@ func (g *Group) Eval(ctx context.Context, ts time.Time) {
 				restoredAlerts, _ := g.alertStore.GetAlerts(ar.GetFingerprint(GroupKey(g.File(), g.Name())))
 				if len(restoredAlerts) > 0 {
 					ar.SetActiveAlerts(restoredAlerts)
-					logger.Info("Restored alerts from store", "rule", ar.name, "alerts", len(restoredAlerts))
+					g.logger.Info("Restored alerts from store", "rule", ar.name, "alerts", len(restoredAlerts))
 				}
 			}
 		}
@@ -1190,3 +1182,35 @@ func buildDependencyMap(rules []Rule) dependencyMap {
 
 	return dependencies
 }
+
+// AlertStore provides persistent storage of alert state.
+type AlertStore interface {
+	// SetAlerts stores the provided list of alerts for a rule.
+	SetAlerts(key uint64, groupKey string, alerts []*Alert) error
+	// GetAlerts returns a list of alerts for each alerting rule,
+	// alerting rule is identified by a fingerprint of its config.
+	GetAlerts(key uint64) (map[uint64]*Alert, error)
+}
+
+// StoreKeepFiringForState is periodically invoked to store the state of alerting rules using 'keep_firing_for'.
+func (g *Group) StoreKeepFiringForState() {
+	for _, rule := range g.rules {
+		ar, ok := rule.(*AlertingRule)
+		if !ok {
+			continue
+		}
+		if ar.KeepFiringFor() != 0 {
+			alertsToStore := make([]*Alert, 0)
+			ar.ForEachActiveAlert(func(alert *Alert) {
+				if !alert.KeepFiringSince.IsZero() {
+					alertsToStore = append(alertsToStore, alert)
+				}
+			})
+			groupKey := GroupKey(g.File(), g.Name())
+			err := g.alertStore.SetAlerts(ar.GetFingerprint(groupKey), groupKey, alertsToStore)
+			if err != nil {
+				g.logger.Error("Failed to store alerting rule state", "rule", ar.Name(), "err", err)
+			}
+		}
+	}
+}
diff --git a/rules/manager.go b/rules/manager.go
index 8d52f18b15..c5bd59417c 100644
--- a/rules/manager.go
+++ b/rules/manager.go
@@ -89,33 +89,7 @@ func DefaultEvalIterationFunc(ctx context.Context, g *Group, evalTimestamp time.
 
 	if g.alertStore != nil {
 		// feature enabled.
-		go func() {
-			g.alertStoreFunc(g)
-		}()
-	}
-}
-
-// DefaultAlertStoreFunc is the default implementation of
-// AlertStateStoreFunc that is periodically invoked to store the state
-// of alerting rules in a group at a given point in time.
-func DefaultAlertStoreFunc(g *Group) {
-	for _, rule := range g.rules {
-		ar, ok := rule.(*AlertingRule)
-		if !ok {
-			continue
-		}
-		if ar.KeepFiringFor() != 0 {
-			alertsToStore := make([]*Alert, 0)
-			ar.ForEachActiveAlert(func(alert *Alert) {
-				if !alert.KeepFiringSince.IsZero() {
-					alertsToStore = append(alertsToStore, alert)
-				}
-			})
-			err := g.alertStore.SetAlerts(ar.GetFingerprint(GroupKey(g.File(), g.Name())), alertsToStore)
-			if err != nil {
-				g.logger.Error("Failed to store alerting rule state", "rule", ar.Name(), "err", err)
-			}
-		}
+		g.StoreKeepFiringForState()
 	}
 }
 
@@ -155,7 +129,6 @@ type ManagerOptions struct {
 	RuleConcurrencyController RuleConcurrencyController
 	RuleDependencyController  RuleDependencyController
 	AlertStore                AlertStore
-	AlertStoreFunc            AlertStateStoreFunc
 	// At present, manager only restores `for` state when manager is newly created which happens
 	// during restarts. This flag provides an option to restore the `for` state when new rule groups are
 	// added to an existing manager
@@ -239,8 +212,6 @@ func (m *Manager) Stop() {
 	m.logger.Info("Rule manager stopped")
 }
 
-type AlertStateStoreFunc func(g *Group)
-
 // Update the rule manager's state as the config requires. If
 // loading the new rules failed the old rule set is restored.
 // This method will no-op in case the manager is already stopped.
@@ -402,7 +373,6 @@ func (m *Manager) LoadGroups(
 			QueryOffset:       (*time.Duration)(rg.QueryOffset),
 			done:              m.done,
 			EvalIterationFunc: groupEvalIterationFunc,
-			AlertStoreFunc:    m.opts.AlertStoreFunc,
 			AlertStore:        m.opts.AlertStore,
 		})
 	}
diff --git a/rules/manager_test.go b/rules/manager_test.go
index 904acbfcf9..cf3bf8fdd6 100644
--- a/rules/manager_test.go
+++ b/rules/manager_test.go
@@ -2568,8 +2568,8 @@ func TestKeepFiringForStateRestore(t *testing.T) {
 		},
 	)
 
-	alertStore := NewFileStore(promslog.NewNopLogger(), testStoreFile)
 	ng := testEngine(t)
+	alertStore := NewFileStore(promslog.NewNopLogger(), testStoreFile, prometheus.NewRegistry())
 	opts := &ManagerOptions{
 		QueryFunc:       EngineQueryFunc(ng, testStorage),
 		Appendable:      testStorage,
@@ -2608,13 +2608,12 @@
 	)
 
 	group := NewGroup(GroupOptions{
-		Name:           "default",
-		Interval:       time.Second,
-		Rules:          []Rule{rule, rule2},
-		ShouldRestore:  true,
-		Opts:           opts,
-		AlertStoreFunc: DefaultAlertStoreFunc,
-		AlertStore:     alertStore,
+		Name:          "default",
+		Interval:      time.Second,
+		Rules:         []Rule{rule, rule2},
+		ShouldRestore: true,
+		Opts:          opts,
+		AlertStore:    alertStore,
 	})
 
 	groups := make(map[string]*Group)
@@ -2657,7 +2656,7 @@ func TestKeepFiringForStateRestore(t *testing.T) {
 		group.Eval(opts.Context, evalTime)
 		group.setLastEvalTimestamp(evalTime)
 		// Manager will store alert state.
-		DefaultAlertStoreFunc(group)
+		group.StoreKeepFiringForState()
 	}
 
 	exp := rule.ActiveAlerts()
@@ -2683,16 +2682,15 @@
 		labels.FromStrings("annotation_test", "rule2"), labels.EmptyLabels(), "", true, nil,
 	)
 	// Restart alert store.
-	newAlertStore := NewFileStore(promslog.NewNopLogger(), testStoreFile)
+	newAlertStore := NewFileStore(promslog.NewNopLogger(), testStoreFile, prometheus.NewRegistry())
 
 	newGroup := NewGroup(GroupOptions{
-		Name:           "default",
-		Interval:       time.Second,
-		Rules:          []Rule{newRule, newRule2},
-		ShouldRestore:  true,
-		Opts:           opts,
-		AlertStore:     newAlertStore,
-		AlertStoreFunc: DefaultAlertStoreFunc,
+		Name:          "default",
+		Interval:      time.Second,
+		Rules:         []Rule{newRule, newRule2},
+		ShouldRestore: true,
+		Opts:          opts,
+		AlertStore:    newAlertStore,
 	})
 
 	newGroups := make(map[string]*Group)
diff --git a/rules/store.go b/rules/store.go
index b23afb6c4f..7b7d8345ca 100644
--- a/rules/store.go
+++ b/rules/store.go
@@ -5,41 +5,57 @@ import (
 	"log/slog"
 	"os"
 	"sync"
-)
 
-// AlertStore provides persistent storage of alert state.
-type AlertStore interface {
-	// SetAlerts stores the provided list of alerts for a rule.
-	SetAlerts(key uint64, alerts []*Alert) error
-	// GetAlerts returns a list of alerts for each alerting rule,
-	// alerting rule is identified by a fingerprint of its config.
-	GetAlerts(key uint64) (map[uint64]*Alert, error)
-}
+	"github.com/prometheus/client_golang/prometheus"
+)
 
 // FileStore implements the AlertStore interface.
 type FileStore struct {
 	logger       *slog.Logger
 	alertsByRule map[uint64][]*Alert
 	// protects the `alertsByRule` map.
-	stateMtx sync.RWMutex
-	path     string
+	stateMtx         sync.RWMutex
+	path             string
+	registerer       prometheus.Registerer
+	storeInitErrors  prometheus.Counter
+	alertStoreErrors *prometheus.CounterVec
 }
 
-func NewFileStore(l *slog.Logger, storagePath string) *FileStore {
+func NewFileStore(l *slog.Logger, storagePath string, registerer prometheus.Registerer) *FileStore {
 	s := &FileStore{
 		logger:       l,
 		alertsByRule: make(map[uint64][]*Alert),
 		path:         storagePath,
+		registerer:   registerer,
 	}
+	s.storeInitErrors = prometheus.NewCounter(
+		prometheus.CounterOpts{
+			Namespace: namespace,
+			Name:      "alert_store_init_errors_total",
+			Help:      "The total number of errors starting alert store.",
+		},
+	)
+	s.alertStoreErrors = prometheus.NewCounterVec(
+		prometheus.CounterOpts{
+			Namespace: namespace,
+			Name:      "rule_group_alert_store_errors_total",
+			Help:      "The total number of errors in alert store.",
+		},
+		[]string{"rule_group"},
+	)
 	s.initState()
 	return s
 }
 
 // initState reads the state from file storage into the alertsByRule map.
 func (s *FileStore) initState() {
+	if s.registerer != nil {
+		s.registerer.MustRegister(s.alertStoreErrors, s.storeInitErrors)
+	}
 	file, err := os.OpenFile(s.path, os.O_RDWR|os.O_CREATE, 0o666)
 	if err != nil {
 		s.logger.Error("Failed reading alerts state from file", "err", err)
+		s.storeInitErrors.Inc()
 		return
 	}
 	defer file.Close()
@@ -48,6 +64,7 @@
 	err = json.NewDecoder(file).Decode(&alertsByRule)
 	if err != nil {
 		s.logger.Error("Failed reading alerts state from file", "err", err)
+		s.storeInitErrors.Inc()
 	}
 	if alertsByRule == nil {
 		alertsByRule = make(map[uint64][]*Alert)
@@ -77,7 +94,7 @@ func (s *FileStore) GetAlerts(key uint64) (map[uint64]*Alert, error) {
 }
 
 // SetAlerts updates the stateByRule map and writes state to file storage.
-func (s *FileStore) SetAlerts(key uint64, alerts []*Alert) error {
+func (s *FileStore) SetAlerts(key uint64, groupKey string, alerts []*Alert) error {
 	s.stateMtx.Lock()
 	defer s.stateMtx.Unlock()
 
@@ -88,6 +105,7 @@
 	// flush in memory state to file storage
 	file, err := os.Create(s.path)
 	if err != nil {
+		s.alertStoreErrors.WithLabelValues(groupKey).Inc()
 		return err
 	}
 	defer file.Close()
@@ -95,6 +113,7 @@
 	encoder := json.NewEncoder(file)
 	err = encoder.Encode(s.alertsByRule)
 	if err != nil {
+		s.alertStoreErrors.WithLabelValues(groupKey).Inc()
 		return err
 	}
 	return nil
diff --git a/rules/store_test.go b/rules/store_test.go
index ba5cf7f018..398c39bb30 100644
--- a/rules/store_test.go
+++ b/rules/store_test.go
@@ -5,6 +5,7 @@ import (
 	"testing"
 	"time"
 
+	"github.com/prometheus/client_golang/prometheus"
 	"github.com/stretchr/testify/require"
 
 	"github.com/prometheus/common/promslog"
@@ -12,7 +13,7 @@ import (
 )
 
 func TestAlertStore(t *testing.T) {
-	alertStore := NewFileStore(promslog.NewNopLogger(), "alertstoretest")
+	alertStore := NewFileStore(promslog.NewNopLogger(), "alertstoretest", prometheus.NewRegistry())
 	t.Cleanup(func() {
 		os.Remove("alertstoretest")
 	})
@@ -29,7 +30,7 @@ func TestAlertStore(t *testing.T) {
 
 	for key, alerts := range alertsByRule {
 		sortAlerts(alerts)
-		err := alertStore.SetAlerts(key, alerts)
+		err := alertStore.SetAlerts(key, "test/test1", alerts)
 		require.NoError(t, err)
 
 		got, err := alertStore.GetAlerts(key)
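
Usage sketch (illustrative only, not part of the patch): after this change, NewFileStore takes a prometheus.Registerer so the store can register its error counters, and SetAlerts takes the rule group key, which is used only to label rule_group_alert_store_errors_total when a write fails. The storage path, group name, and the zero fingerprint below are made-up placeholders; in the patch the fingerprint comes from AlertingRule.GetFingerprint and persistence is driven by Group.StoreKeepFiringForState.

package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/common/promslog"

	"github.com/prometheus/prometheus/rules"
)

func main() {
	// The registerer is now passed in; the store registers its init and
	// per-group write error counters on it. The path is a placeholder.
	store := rules.NewFileStore(promslog.NewNopLogger(), "data/alert_state.json", prometheus.NewRegistry())

	// GroupKey combines the rule file and group name; here it only labels
	// the error metric, it is not part of the storage key.
	groupKey := rules.GroupKey("rules.yml", "example")

	// 0 stands in for a rule fingerprint; the rule manager derives the real
	// one from the alerting rule's configuration.
	if err := store.SetAlerts(0, groupKey, nil); err != nil {
		fmt.Println("failed to persist alert state:", err)
	}
}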