From 60c2a5b44830a982226f7642b0d9be7790b657ba Mon Sep 17 00:00:00 2001
From: Mustafain Ali Khan
Date: Tue, 30 Jul 2024 10:18:03 -0700
Subject: [PATCH] Remove storage hook and add metrics for alert store errors

Signed-off-by: Mustafain Ali Khan
---
 cmd/prometheus/main.go | 5 ++---
 rules/group.go | 42 ++++++++++++++++++++++++++++++---------
 rules/manager.go | 35 +++-----------------------------
 rules/manager_test.go | 32 ++++++++++++++----------------
 rules/store.go | 45 ++++++++++++++++++++++++++++++------------
 rules/store_test.go | 5 +++--
 6 files changed, 88 insertions(+), 76 deletions(-)

diff --git a/cmd/prometheus/main.go b/cmd/prometheus/main.go
index 250d21a831..20c652fb19 100644
--- a/cmd/prometheus/main.go
+++ b/cmd/prometheus/main.go
@@ -809,7 +809,7 @@ func main() {
 		queryEngine = promql.NewEngine(opts)
 		var alertStore rules.AlertStore
 		if cfg.enableAlertStatePersistence {
-			alertStore = rules.NewFileStore(logger.With("component", "alertStore"), cfg.alertStoragePath)
+			alertStore = rules.NewFileStore(logger.With("component", "alertStore"), cfg.alertStoragePath, prometheus.DefaultRegisterer)
 		}
 
 		ruleManager = rules.NewManager(&rules.ManagerOptions{
@@ -829,8 +829,7 @@ func main() {
 			DefaultRuleQueryOffset: func() time.Duration {
 				return time.Duration(cfgFile.GlobalConfig.RuleQueryOffset)
 			},
-			AlertStore:     alertStore,
-			AlertStoreFunc: rules.DefaultAlertStoreFunc,
+			AlertStore: alertStore,
 		})
 	}
diff --git a/rules/group.go b/rules/group.go
index ba52bbb91d..5d086dbc00 100644
--- a/rules/group.go
+++ b/rules/group.go
@@ -77,7 +77,6 @@ type Group struct {
 	// concurrencyController controls the rules evaluation concurrency.
 	concurrencyController RuleConcurrencyController
 	appOpts               *storage.AppendOptions
-	alertStoreFunc        AlertStateStoreFunc
 	alertStore            AlertStore
 }
 
@@ -98,7 +97,6 @@ type GroupOptions struct {
 	QueryOffset       *time.Duration
 	done              chan struct{}
 	EvalIterationFunc GroupEvalIterationFunc
-	AlertStoreFunc    AlertStateStoreFunc
 	AlertStore        AlertStore
 }
 
@@ -130,11 +128,6 @@ func NewGroup(o GroupOptions) *Group {
 		evalIterationFunc = DefaultEvalIterationFunc
 	}
 
-	alertStoreFunc := o.AlertStoreFunc
-	if alertStoreFunc == nil {
-		alertStoreFunc = DefaultAlertStoreFunc
-	}
-
 	concurrencyController := opts.RuleConcurrencyController
 	if concurrencyController == nil {
 		concurrencyController = sequentialRuleEvalController{}
@@ -160,7 +153,6 @@ func NewGroup(o GroupOptions) *Group {
 		logger:                opts.Logger.With("file", o.File, "group", o.Name),
 		metrics:               metrics,
 		evalIterationFunc:     evalIterationFunc,
-		alertStoreFunc:        alertStoreFunc,
 		alertStore:            o.AlertStore,
 		concurrencyController: concurrencyController,
 		appOpts:               &storage.AppendOptions{DiscardOutOfOrder: true},
@@ -554,7 +546,7 @@ func (g *Group) Eval(ctx context.Context, ts time.Time) {
 				restoredAlerts, _ := g.alertStore.GetAlerts(ar.GetFingerprint(GroupKey(g.File(), g.Name())))
 				if len(restoredAlerts) > 0 {
 					ar.SetActiveAlerts(restoredAlerts)
-					logger.Info("Restored alerts from store", "rule", ar.name, "alerts", len(restoredAlerts))
+					g.logger.Info("Restored alerts from store", "rule", ar.name, "alerts", len(restoredAlerts))
 				}
 			}
 		}
@@ -1157,3 +1149,35 @@ func buildDependencyMap(rules []Rule) dependencyMap {
 
 	return dependencies
 }
+
+// AlertStore provides persistent storage of alert state.
+type AlertStore interface {
+	// SetAlerts stores the provided list of alerts for a rule; groupKey labels any error metrics.
+	SetAlerts(key uint64, groupKey string, alerts []*Alert) error
+	// GetAlerts returns the alerts stored for an alerting rule; the rule
+	// is identified by a fingerprint of its config.
+ GetAlerts(key uint64) (map[uint64]*Alert, error) +} + +// StoreKeepFiringForState is periodically invoked to store the state of alerting rules using 'keep_firing_for'. +func (g *Group) StoreKeepFiringForState() { + for _, rule := range g.rules { + ar, ok := rule.(*AlertingRule) + if !ok { + continue + } + if ar.KeepFiringFor() != 0 { + alertsToStore := make([]*Alert, 0) + ar.ForEachActiveAlert(func(alert *Alert) { + if !alert.KeepFiringSince.IsZero() { + alertsToStore = append(alertsToStore, alert) + } + }) + groupKey := GroupKey(g.File(), g.Name()) + err := g.alertStore.SetAlerts(ar.GetFingerprint(groupKey), groupKey, alertsToStore) + if err != nil { + g.logger.Error("Failed to store alerting rule state", "rule", ar.Name(), "err", err) + } + } + } +} diff --git a/rules/manager.go b/rules/manager.go index 8c459ad168..ad872679a3 100644 --- a/rules/manager.go +++ b/rules/manager.go @@ -89,33 +89,7 @@ func DefaultEvalIterationFunc(ctx context.Context, g *Group, evalTimestamp time. if g.alertStore != nil { // feature enabled. - go func() { - g.alertStoreFunc(g) - }() - } -} - -// DefaultAlertStoreFunc is the default implementation of -// AlertStateStoreFunc that is periodically invoked to store the state -// of alerting rules in a group at a given point in time. -func DefaultAlertStoreFunc(g *Group) { - for _, rule := range g.rules { - ar, ok := rule.(*AlertingRule) - if !ok { - continue - } - if ar.KeepFiringFor() != 0 { - alertsToStore := make([]*Alert, 0) - ar.ForEachActiveAlert(func(alert *Alert) { - if !alert.KeepFiringSince.IsZero() { - alertsToStore = append(alertsToStore, alert) - } - }) - err := g.alertStore.SetAlerts(ar.GetFingerprint(GroupKey(g.File(), g.Name())), alertsToStore) - if err != nil { - g.logger.Error("Failed to store alerting rule state", "rule", ar.Name(), "err", err) - } - } + g.StoreKeepFiringForState() } } @@ -153,9 +127,9 @@ type ManagerOptions struct { ConcurrentEvalsEnabled bool RuleConcurrencyController RuleConcurrencyController RuleDependencyController RuleDependencyController - Metrics *Metrics AlertStore AlertStore - AlertStoreFunc AlertStateStoreFunc + + Metrics *Metrics } // NewManager returns an implementation of Manager, ready to be started @@ -225,8 +199,6 @@ func (m *Manager) Stop() { m.logger.Info("Rule manager stopped") } -type AlertStateStoreFunc func(g *Group) - // Update the rule manager's state as the config requires. If // loading the new rules failed the old rule set is restored. // This method will no-op in case the manager is already stopped. 
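
Note (illustrative, not part of the patch): the AlertStore interface added above is what a pluggable backend has to satisfy. Below is a minimal in-memory sketch; the memStore name is hypothetical, and keying the returned map by each alert's label hash is an assumption made for this sketch — the diff does not show how FileStore.GetAlerts builds its result map.

// Hypothetical in-memory AlertStore, e.g. for tests. Assumed to live in the
// rules package, alongside the Alert type.
package rules

import "sync"

type memStore struct {
	mtx    sync.RWMutex
	alerts map[uint64][]*Alert
}

func newMemStore() *memStore {
	return &memStore{alerts: make(map[uint64][]*Alert)}
}

// SetAlerts stores the alerts for the rule identified by key. groupKey is
// only used by FileStore for its error metric label, so it is ignored here.
func (s *memStore) SetAlerts(key uint64, groupKey string, alerts []*Alert) error {
	s.mtx.Lock()
	defer s.mtx.Unlock()
	s.alerts[key] = alerts
	return nil
}

// GetAlerts returns the stored alerts for a rule, keyed here by the alert's
// label hash (an assumption of this sketch).
func (s *memStore) GetAlerts(key uint64) (map[uint64]*Alert, error) {
	s.mtx.RLock()
	defer s.mtx.RUnlock()
	restored := make(map[uint64]*Alert, len(s.alerts[key]))
	for _, a := range s.alerts[key] {
		restored[a.Labels.Hash()] = a
	}
	return restored, nil
}

A custom store like this would be wired in exactly as cmd/prometheus/main.go wires FileStore above, via ManagerOptions.AlertStore.
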
@@ -388,7 +360,6 @@ func (m *Manager) LoadGroups( QueryOffset: (*time.Duration)(rg.QueryOffset), done: m.done, EvalIterationFunc: groupEvalIterationFunc, - AlertStoreFunc: m.opts.AlertStoreFunc, AlertStore: m.opts.AlertStore, }) } diff --git a/rules/manager_test.go b/rules/manager_test.go index 8a3cf49740..08ca8710c5 100644 --- a/rules/manager_test.go +++ b/rules/manager_test.go @@ -2283,8 +2283,8 @@ func TestKeepFiringForStateRestore(t *testing.T) { }, ) - alertStore := NewFileStore(promslog.NewNopLogger(), testStoreFile) ng := testEngine(t) + alertStore := NewFileStore(promslog.NewNopLogger(), testStoreFile, prometheus.NewRegistry()) opts := &ManagerOptions{ QueryFunc: EngineQueryFunc(ng, testStorage), Appendable: testStorage, @@ -2323,13 +2323,12 @@ func TestKeepFiringForStateRestore(t *testing.T) { ) group := NewGroup(GroupOptions{ - Name: "default", - Interval: time.Second, - Rules: []Rule{rule, rule2}, - ShouldRestore: true, - Opts: opts, - AlertStoreFunc: DefaultAlertStoreFunc, - AlertStore: alertStore, + Name: "default", + Interval: time.Second, + Rules: []Rule{rule, rule2}, + ShouldRestore: true, + Opts: opts, + AlertStore: alertStore, }) groups := make(map[string]*Group) @@ -2372,7 +2371,7 @@ func TestKeepFiringForStateRestore(t *testing.T) { group.Eval(opts.Context, evalTime) group.setLastEvalTimestamp(evalTime) // Manager will store alert state. - DefaultAlertStoreFunc(group) + group.StoreKeepFiringForState() } exp := rule.ActiveAlerts() @@ -2398,16 +2397,15 @@ func TestKeepFiringForStateRestore(t *testing.T) { labels.FromStrings("annotation_test", "rule2"), labels.EmptyLabels(), "", true, nil, ) // Restart alert store. - newAlertStore := NewFileStore(promslog.NewNopLogger(), testStoreFile) + newAlertStore := NewFileStore(promslog.NewNopLogger(), testStoreFile, prometheus.NewRegistry()) newGroup := NewGroup(GroupOptions{ - Name: "default", - Interval: time.Second, - Rules: []Rule{newRule, newRule2}, - ShouldRestore: true, - Opts: opts, - AlertStore: newAlertStore, - AlertStoreFunc: DefaultAlertStoreFunc, + Name: "default", + Interval: time.Second, + Rules: []Rule{newRule, newRule2}, + ShouldRestore: true, + Opts: opts, + AlertStore: newAlertStore, }) newGroups := make(map[string]*Group) diff --git a/rules/store.go b/rules/store.go index b23afb6c4f..7b7d8345ca 100644 --- a/rules/store.go +++ b/rules/store.go @@ -5,41 +5,57 @@ import ( "log/slog" "os" "sync" -) -// AlertStore provides persistent storage of alert state. -type AlertStore interface { - // SetAlerts stores the provided list of alerts for a rule. - SetAlerts(key uint64, alerts []*Alert) error - // GetAlerts returns a list of alerts for each alerting rule, - // alerting rule is identified by a fingerprint of its config. - GetAlerts(key uint64) (map[uint64]*Alert, error) -} + "github.com/prometheus/client_golang/prometheus" +) // FileStore implements the AlertStore interface. type FileStore struct { logger *slog.Logger alertsByRule map[uint64][]*Alert // protects the `alertsByRule` map. 
-	stateMtx sync.RWMutex
-	path     string
+	stateMtx         sync.RWMutex
+	path             string
+	registerer       prometheus.Registerer
+	storeInitErrors  prometheus.Counter
+	alertStoreErrors *prometheus.CounterVec
 }
 
-func NewFileStore(l *slog.Logger, storagePath string) *FileStore {
+func NewFileStore(l *slog.Logger, storagePath string, registerer prometheus.Registerer) *FileStore {
 	s := &FileStore{
 		logger:       l,
 		alertsByRule: make(map[uint64][]*Alert),
 		path:         storagePath,
+		registerer:   registerer,
 	}
+	s.storeInitErrors = prometheus.NewCounter(
+		prometheus.CounterOpts{
+			Namespace: namespace,
+			Name:      "alert_store_init_errors_total",
+			Help:      "The total number of errors initializing the alert store.",
+		},
+	)
+	s.alertStoreErrors = prometheus.NewCounterVec(
+		prometheus.CounterOpts{
+			Namespace: namespace,
+			Name:      "rule_group_alert_store_errors_total",
+			Help:      "The total number of alert store errors, partitioned by rule group.",
+		},
+		[]string{"rule_group"},
+	)
 	s.initState()
 	return s
 }
 
 // initState reads the state from file storage into the alertsByRule map.
 func (s *FileStore) initState() {
+	if s.registerer != nil {
+		s.registerer.MustRegister(s.alertStoreErrors, s.storeInitErrors)
+	}
 	file, err := os.OpenFile(s.path, os.O_RDWR|os.O_CREATE, 0o666)
 	if err != nil {
 		s.logger.Error("Failed reading alerts state from file", "err", err)
+		s.storeInitErrors.Inc()
 		return
 	}
 	defer file.Close()
@@ -48,6 +64,7 @@ func (s *FileStore) initState() {
 	err = json.NewDecoder(file).Decode(&alertsByRule)
 	if err != nil {
 		s.logger.Error("Failed reading alerts state from file", "err", err)
+		s.storeInitErrors.Inc()
 	}
 	if alertsByRule == nil {
 		alertsByRule = make(map[uint64][]*Alert)
@@ -77,7 +94,7 @@ func (s *FileStore) GetAlerts(key uint64) (map[uint64]*Alert, error) {
 }
 
 // SetAlerts updates the stateByRule map and writes state to file storage.
-func (s *FileStore) SetAlerts(key uint64, alerts []*Alert) error {
+func (s *FileStore) SetAlerts(key uint64, groupKey string, alerts []*Alert) error {
 	s.stateMtx.Lock()
 	defer s.stateMtx.Unlock()
 
@@ -88,6 +105,7 @@
 	// flush in memory state to file storage
 	file, err := os.Create(s.path)
 	if err != nil {
+		s.alertStoreErrors.WithLabelValues(groupKey).Inc()
 		return err
 	}
 	defer file.Close()
@@ -95,6 +113,7 @@
 	encoder := json.NewEncoder(file)
 	err = encoder.Encode(s.alertsByRule)
 	if err != nil {
+		s.alertStoreErrors.WithLabelValues(groupKey).Inc()
 		return err
 	}
 	return nil
diff --git a/rules/store_test.go b/rules/store_test.go
index ba5cf7f018..398c39bb30 100644
--- a/rules/store_test.go
+++ b/rules/store_test.go
@@ -5,6 +5,7 @@ import (
 	"testing"
 	"time"
 
+	"github.com/prometheus/client_golang/prometheus"
 	"github.com/stretchr/testify/require"
 
 	"github.com/prometheus/common/promslog"
@@ -12,7 +13,7 @@ import (
 )
 
 func TestAlertStore(t *testing.T) {
-	alertStore := NewFileStore(promslog.NewNopLogger(), "alertstoretest")
+	alertStore := NewFileStore(promslog.NewNopLogger(), "alertstoretest", prometheus.NewRegistry())
 	t.Cleanup(func() {
 		os.Remove("alertstoretest")
 	})
@@ -29,7 +30,7 @@ func TestAlertStore(t *testing.T) {
 	for key, alerts := range alertsByRule {
 		sortAlerts(alerts)
-		err := alertStore.SetAlerts(key, alerts)
+		err := alertStore.SetAlerts(key, "test/test1", alerts)
 		require.NoError(t, err)
 
 		got, err := alertStore.GetAlerts(key)
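
Note (illustrative, not part of the patch): the two new counters are registered against whatever Registerer is passed to NewFileStore, so they can be observed through a private registry. A sketch, assuming the package's namespace constant is "prometheus"; the example file, function name, and state-file path are arbitrary:

package rules_test // hypothetical example file

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/common/promslog"

	"github.com/prometheus/prometheus/rules"
)

func ExampleNewFileStore_metrics() {
	reg := prometheus.NewRegistry()
	// "alerts.json" is an arbitrary path for this sketch.
	store := rules.NewFileStore(promslog.NewNopLogger(), "alerts.json", reg)
	_ = store.SetAlerts(42, "rules.yml;example", nil)

	families, _ := reg.Gather()
	for _, mf := range families {
		fmt.Println(mf.GetName())
	}
	// Only prometheus_alert_store_init_errors_total is printed here: a plain
	// Counter is exported as soon as it is registered, while the CounterVec
	// gains a rule_group child (and shows up in Gather) only after the first
	// failed write increments it.
}

Because initState calls MustRegister, constructing a second FileStore against the same Registerer panics with a duplicate-registration error; that is why TestKeepFiringForStateRestore above creates a fresh prometheus.NewRegistry() for the restarted store.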