diff --git a/cmd/prometheus/main.go b/cmd/prometheus/main.go
index cd0775bbd..250d21a83 100644
--- a/cmd/prometheus/main.go
+++ b/cmd/prometheus/main.go
@@ -203,11 +203,13 @@ type flagConfig struct {
 	featureList []string
 	// These options are extracted from featureList
 	// for ease of use.
-	enablePerStepStats       bool
-	enableConcurrentRuleEval bool
+	enablePerStepStats          bool
+	enableConcurrentRuleEval    bool
+	enableAlertStatePersistence bool
 
-	prometheusURL   string
-	corsRegexString string
+	prometheusURL    string
+	corsRegexString  string
+	alertStoragePath string
 
 	promqlEnableDelayedNameRemoval bool
 
@@ -244,6 +246,9 @@ func (c *flagConfig) setFeatureListOptions(logger *slog.Logger) error {
 		case "concurrent-rule-eval":
 			c.enableConcurrentRuleEval = true
 			logger.Info("Experimental concurrent rule evaluation enabled.")
+		case "alert-state-persistence":
+			c.enableAlertStatePersistence = true
+			logger.Info("Experimental alert state persistence enabled for alerting rules using keep_firing_for.")
 		case "promql-experimental-functions":
 			parser.EnableExperimentalFunctions = true
 			logger.Info("Experimental PromQL functions enabled.")
@@ -484,6 +489,8 @@ func main() {
 	serverOnlyFlag(a, "rules.alert.resend-delay", "Minimum amount of time to wait before resending an alert to Alertmanager.").
 		Default("1m").SetValue(&cfg.resendDelay)
 
+	serverOnlyFlag(a, "rules.alert.state-storage-path", "Path for alert state storage.").
+		Default("data/alerts").StringVar(&cfg.alertStoragePath)
 	serverOnlyFlag(a, "rules.max-concurrent-evals", "Global concurrency limit for independent rules that can run concurrently. When set, \"query.max-concurrency\" may need to be adjusted accordingly.").
 		Default("4").Int64Var(&cfg.maxConcurrentEvals)
 
@@ -800,6 +807,10 @@ func main() {
 		}
 		queryEngine = promql.NewEngine(opts)
 
+		var alertStore rules.AlertStore
+		if cfg.enableAlertStatePersistence {
+			alertStore = rules.NewFileStore(logger.With("component", "alertStore"), cfg.alertStoragePath)
+		}
 		ruleManager = rules.NewManager(&rules.ManagerOptions{
 			Appendable: fanoutStorage,
@@ -818,6 +829,8 @@ func main() {
 			DefaultRuleQueryOffset: func() time.Duration {
 				return time.Duration(cfgFile.GlobalConfig.RuleQueryOffset)
 			},
+			AlertStore:     alertStore,
+			AlertStoreFunc: rules.DefaultAlertStoreFunc,
 		})
 	}
 
diff --git a/docs/command-line/prometheus.md b/docs/command-line/prometheus.md
index dd207dc38..59db490a1 100644
--- a/docs/command-line/prometheus.md
+++ b/docs/command-line/prometheus.md
@@ -53,6 +53,7 @@ The Prometheus monitoring server
 | --rules.alert.for-outage-tolerance | Max time to tolerate prometheus outage for restoring "for" state of alert. Use with server mode only. | `1h` |
 | --rules.alert.for-grace-period | Minimum duration between alert and restored "for" state. This is maintained only for alerts with configured "for" time greater than grace period. Use with server mode only. | `10m` |
 | --rules.alert.resend-delay | Minimum amount of time to wait before resending an alert to Alertmanager. Use with server mode only. | `1m` |
+| --rules.alert.state-storage-path | Path for alert state storage. Use with server mode only. | `data/alerts` |
 | --rules.max-concurrent-evals | Global concurrency limit for independent rules that can run concurrently. When set, "query.max-concurrency" may need to be adjusted accordingly. Use with server mode only. | `4` |
 | --alertmanager.notification-queue-capacity | The capacity of the queue for pending Alertmanager notifications. Use with server mode only. | `10000` |
 | --alertmanager.drain-notification-queue-on-shutdown | Send any outstanding Alertmanager notifications when shutting down. If false, any outstanding Alertmanager notifications will be dropped when shutting down. Use with server mode only. | `true` |
diff --git a/rules/alerting.go b/rules/alerting.go
index e7f15baef..1fa3e2bb5 100644
--- a/rules/alerting.go
+++ b/rules/alerting.go
@@ -23,6 +23,7 @@ import (
 	"sync"
 	"time"
 
+	"github.com/cespare/xxhash/v2"
 	"github.com/prometheus/common/model"
 	"go.uber.org/atomic"
 	"gopkg.in/yaml.v2"
@@ -41,7 +42,6 @@ const (
 	alertMetricName = "ALERTS"
 	// AlertForStateMetricName is the metric name for 'for' state of alert.
 	alertForStateMetricName = "ALERTS_FOR_STATE"
-
 	// AlertStateLabel is the label name indicating the state of an alert.
 	alertStateLabel = "alertstate"
 )
@@ -594,3 +594,16 @@ func (r *AlertingRule) String() string {
 
 	return string(byt)
 }
+
+// GetFingerprint returns a hash that uniquely identifies an alerting rule,
+// using a combination of the rule config and the group key.
+func (r *AlertingRule) GetFingerprint(groupKey string) uint64 {
+	return xxhash.Sum64(append([]byte(r.String()), []byte(groupKey)...))
+}
+
+// SetActiveAlerts updates the active alerts of the alerting rule.
+func (r *AlertingRule) SetActiveAlerts(alerts map[uint64]*Alert) {
+	r.activeMtx.Lock()
+	defer r.activeMtx.Unlock()
+	r.active = alerts
+}
diff --git a/rules/group.go b/rules/group.go
index ecc96d0a1..05c3a7f50 100644
--- a/rules/group.go
+++ b/rules/group.go
@@ -77,6 +77,8 @@ type Group struct {
 	// concurrencyController controls the rules evaluation concurrency.
 	concurrencyController RuleConcurrencyController
 	appOpts               *storage.AppendOptions
+	alertStoreFunc        AlertStateStoreFunc
+	alertStore            AlertStore
 }
 
 // GroupEvalIterationFunc is used to implement and extend rule group
@@ -96,6 +98,8 @@ type GroupOptions struct {
 	QueryOffset       *time.Duration
 	done              chan struct{}
 	EvalIterationFunc GroupEvalIterationFunc
+	AlertStoreFunc    AlertStateStoreFunc
+	AlertStore        AlertStore
 }
 
 // NewGroup makes a new Group with the given name, options, and rules.
@@ -126,6 +130,11 @@ func NewGroup(o GroupOptions) *Group {
 		evalIterationFunc = DefaultEvalIterationFunc
 	}
 
+	alertStoreFunc := o.AlertStoreFunc
+	if alertStoreFunc == nil {
+		alertStoreFunc = DefaultAlertStoreFunc
+	}
+
 	concurrencyController := opts.RuleConcurrencyController
 	if concurrencyController == nil {
 		concurrencyController = sequentialRuleEvalController{}
@@ -151,6 +160,8 @@ func NewGroup(o GroupOptions) *Group {
 		logger:                opts.Logger.With("file", o.File, "group", o.Name),
 		metrics:               metrics,
 		evalIterationFunc:     evalIterationFunc,
+		alertStoreFunc:        alertStoreFunc,
+		alertStore:            o.AlertStore,
 		concurrencyController: concurrencyController,
 		appOpts:               &storage.AppendOptions{DiscardOutOfOrder: true},
 	}
@@ -537,6 +548,17 @@ func (g *Group) Eval(ctx context.Context, ts time.Time) {
 
 			g.metrics.EvalTotal.WithLabelValues(GroupKey(g.File(), g.Name())).Inc()
 
+			if g.alertStore != nil && g.lastEvalTimestamp.IsZero() {
+				// Restore alerts when the feature is enabled and this is the first evaluation of the group.
+				if ar, ok := rule.(*AlertingRule); ok {
+					restoredAlerts, _ := g.alertStore.GetAlerts(ar.GetFingerprint(GroupKey(g.File(), g.Name())))
+					if len(restoredAlerts) > 0 {
+						ar.SetActiveAlerts(restoredAlerts)
+						logger.Info("Restored alerts from store", "rule", ar.name, "alerts", len(restoredAlerts))
+					}
+				}
+			}
+
 			vector, err := rule.Eval(ctx, ruleQueryOffset, ts, g.opts.QueryFunc, g.opts.ExternalURL, g.Limit())
 			if err != nil {
 				rule.SetHealth(HealthBad)
diff --git a/rules/manager.go b/rules/manager.go
index edc67a832..9d178551f 100644
--- a/rules/manager.go
+++ b/rules/manager.go
@@ -26,7 +26,6 @@ import (
 
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/prometheus/common/promslog"
-	"golang.org/x/sync/semaphore"
 
 	"github.com/prometheus/prometheus/model/labels"
 	"github.com/prometheus/prometheus/model/rulefmt"
@@ -35,6 +34,7 @@ import (
 	"github.com/prometheus/prometheus/promql/parser"
 	"github.com/prometheus/prometheus/storage"
 	"github.com/prometheus/prometheus/util/strutil"
+	"golang.org/x/sync/semaphore"
 )
 
 // QueryFunc processes PromQL queries.
@@ -86,6 +86,37 @@ func DefaultEvalIterationFunc(ctx context.Context, g *Group, evalTimestamp time.
 	g.setEvaluationTime(timeSinceStart)
 	g.setLastEvaluation(start)
 	g.setLastEvalTimestamp(evalTimestamp)
+
+	if g.alertStore != nil {
+		// Alert state persistence is enabled; store the state asynchronously.
+		go func() {
+			g.alertStoreFunc(g)
+		}()
+	}
+}
+
+// DefaultAlertStoreFunc is the default implementation of
+// AlertStateStoreFunc that is periodically invoked to store the state
+// of alerting rules in a group at a given point in time.
+func DefaultAlertStoreFunc(g *Group) {
+	for _, rule := range g.rules {
+		ar, ok := rule.(*AlertingRule)
+		if !ok {
+			continue
+		}
+		if ar.KeepFiringFor() != 0 {
+			alertsToStore := make([]*Alert, 0)
+			ar.ForEachActiveAlert(func(alert *Alert) {
+				if !alert.KeepFiringSince.IsZero() {
+					alertsToStore = append(alertsToStore, alert)
+				}
+			})
+			err := g.alertStore.SetAlerts(ar.GetFingerprint(GroupKey(g.File(), g.Name())), alertsToStore)
+			if err != nil {
+				g.logger.Error("Failed to store alerting rule state", "rule", ar.Name(), "err", err)
+			}
+		}
+	}
 }
 
 // The Manager manages recording and alerting rules.
@@ -122,8 +153,9 @@ type ManagerOptions struct {
 	ConcurrentEvalsEnabled    bool
 	RuleConcurrencyController RuleConcurrencyController
 	RuleDependencyController  RuleDependencyController
-
-	Metrics *Metrics
+	Metrics                   *Metrics
+	AlertStore                AlertStore
+	AlertStoreFunc            AlertStateStoreFunc
 }
 
 // NewManager returns an implementation of Manager, ready to be started
@@ -193,6 +225,8 @@ func (m *Manager) Stop() {
 	m.logger.Info("Rule manager stopped")
 }
 
+type AlertStateStoreFunc func(g *Group)
+
 // Update the rule manager's state as the config requires. If
 // loading the new rules failed the old rule set is restored.
 // This method will no-op in case the manager is already stopped.
@@ -343,7 +377,6 @@ func (m *Manager) LoadGroups(
 
 		// Check dependencies between rules and store it on the Rule itself.
 		m.opts.RuleDependencyController.AnalyseRules(rules)
-
 		groups[GroupKey(fn, rg.Name)] = NewGroup(GroupOptions{
 			Name: rg.Name,
 			File: fn,
@@ -355,6 +388,8 @@ func (m *Manager) LoadGroups(
 			QueryOffset:       (*time.Duration)(rg.QueryOffset),
 			done:              m.done,
 			EvalIterationFunc: groupEvalIterationFunc,
+			AlertStoreFunc:    m.opts.AlertStoreFunc,
+			AlertStore:        m.opts.AlertStore,
 		})
 	}
 }
diff --git a/rules/manager_test.go b/rules/manager_test.go
index 94ee1e8b8..2f8cc497a 100644
--- a/rules/manager_test.go
+++ b/rules/manager_test.go
@@ -2266,6 +2266,188 @@ func TestLabels_FromMaps(t *testing.T) {
 	require.Equal(t, expected, mLabels, "unexpected labelset")
 }
 
+func TestKeepFiringForStateRestore(t *testing.T) {
+	testStorage := promqltest.LoadedStorage(t, `
+		load 5m
+		http_requests{job="app-server", instance="0", group="canary", severity="overwrite-me"}    75 0 0 0 0 0 0 0
+		http_requests{job="app-server", instance="1", group="canary", severity="overwrite-me"}    100 0 0 0 0 0 0 0
+		http_requests_5xx{job="app-server", instance="2", group="canary", severity="overwrite-me"}    80 0 0 0 0 0 0 0
+	`)
+
+	testStoreFile := "testalertstore"
+
+	t.Cleanup(
+		func() {
+			testStorage.Close()
+			os.Remove(testStoreFile)
+		},
+	)
+
+	alertStore := NewFileStore(promslog.NewNopLogger(), testStoreFile)
+	ng := testEngine(t)
+	opts := &ManagerOptions{
+		QueryFunc:       EngineQueryFunc(ng, testStorage),
+		Appendable:      testStorage,
+		Queryable:       testStorage,
+		Context:         context.Background(),
+		Logger:          promslog.NewNopLogger(),
+		NotifyFunc:      func(ctx context.Context, expr string, alerts ...*Alert) {},
+		OutageTolerance: 30 * time.Minute,
+		ForGracePeriod:  10 * time.Minute,
+		AlertStore:      alertStore,
+	}
+
+	keepFiringForDuration := 30 * time.Minute
+	// Initial runs before Prometheus goes down.
+	expr, err := parser.ParseExpr(`http_requests{group="canary", job="app-server"} > 0`)
+	require.NoError(t, err)
+	expr2, err := parser.ParseExpr(`http_requests_5xx{group="canary", job="app-server"} > 0`)
+	require.NoError(t, err)
+
+	rule := NewAlertingRule(
+		"HTTPRequestRateLow",
+		expr,
+		0,
+		keepFiringForDuration,
+		labels.FromStrings("severity", "critical"),
+		labels.FromStrings("annotation1", "rule1"), labels.EmptyLabels(), "", true, nil,
+	)
+	keepFiringForDuration2 := 60 * time.Minute
+	rule2 := NewAlertingRule(
+		"HTTPRequestRateLow",
+		expr2,
+		0,
+		keepFiringForDuration2,
+		labels.FromStrings("severity", "critical"),
+		labels.FromStrings("annotation2", "rule2"), labels.EmptyLabels(), "", true, nil,
+	)
+
+	group := NewGroup(GroupOptions{
+		Name:           "default",
+		Interval:       time.Second,
+		Rules:          []Rule{rule, rule2},
+		ShouldRestore:  true,
+		Opts:           opts,
+		AlertStoreFunc: DefaultAlertStoreFunc,
+		AlertStore:     alertStore,
+	})
+
+	groups := make(map[string]*Group)
+	groups["default;"] = group
+
+	type testInput struct {
+		name            string
+		restoreDuration time.Duration
+		initialRuns     []time.Duration
+		alertsExpected  int
+	}
+
+	tests := []testInput{
+		{
+			name:            "normal restore - 3 alerts firing with keep_firing_for duration active",
+			restoreDuration: 30 * time.Minute,
+			initialRuns:     []time.Duration{0, 5 * time.Minute, 10 * time.Minute, 15 * time.Minute, 20 * time.Minute, 25 * time.Minute},
+			alertsExpected:  3,
+		},
+		{
+			name:            "restore after rule 1 keep firing for duration is over - 1 alert with keep_firing_for duration active",
+			restoreDuration: keepFiringForDuration + 10*time.Minute,
+			initialRuns:     []time.Duration{0, 5 * time.Minute, 10 * time.Minute, 15 * time.Minute, 20 * time.Minute, 50 * time.Minute},
+			alertsExpected:  1,
+		},
+		{
+			name:            "restore after keep firing for duration expires - 0 alerts active",
+			restoreDuration: 120 * time.Minute,
+			initialRuns:     []time.Duration{0, 5 * time.Minute, 10 * time.Minute, 15 * time.Minute, 20 * time.Minute, 110 * time.Minute},
+			alertsExpected:  0,
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			baseTime := time.Unix(0, 0)
+			for _, duration := range tt.initialRuns {
+				// Evaluate the rules before restarting.
+				evalTime := baseTime.Add(duration)
+				group.Eval(opts.Context, evalTime)
+				group.setLastEvalTimestamp(evalTime)
+				// The manager stores the alert state.
+				DefaultAlertStoreFunc(group)
+			}
+
+			exp := rule.ActiveAlerts()
+			exp2 := rule2.ActiveAlerts()
+			// Record alerts before restart.
+			expectedAlerts := [][]*Alert{exp, exp2}
+
+			// Prometheus goes down here. We create new rules and groups.
+			newRule := NewAlertingRule(
+				"HTTPRequestRateLow",
+				expr,
+				0,
+				keepFiringForDuration,
+				labels.FromStrings("severity", "critical"),
+				labels.FromStrings("annotation1", "rule1"), labels.EmptyLabels(), "", false, nil,
+			)
+			newRule2 := NewAlertingRule(
+				"HTTPRequestRateLow",
+				expr2,
+				0,
+				keepFiringForDuration2,
+				labels.FromStrings("severity", "critical"),
+				labels.FromStrings("annotation2", "rule2"), labels.EmptyLabels(), "", true, nil,
+			)
+			// Restart the alert store.
+			newAlertStore := NewFileStore(promslog.NewNopLogger(), testStoreFile)
+
+			newGroup := NewGroup(GroupOptions{
+				Name:           "default",
+				Interval:       time.Second,
+				Rules:          []Rule{newRule, newRule2},
+				ShouldRestore:  true,
+				Opts:           opts,
+				AlertStore:     newAlertStore,
+				AlertStoreFunc: DefaultAlertStoreFunc,
+			})
+
+			newGroups := make(map[string]*Group)
+			newGroups["default;"] = newGroup
+
+			restoreTime := baseTime.Add(tt.restoreDuration)
+
+			// First eval after restart.
+			newGroup.Eval(context.TODO(), restoreTime)
+
+			got := newRule.ActiveAlerts()
+			got2 := newRule2.ActiveAlerts()
+			require.Equal(t, len(exp), len(got))
+			require.Equal(t, len(exp2), len(got2))
+			require.Equal(t, tt.alertsExpected, len(got)+len(got2))
+
+			results := [][]*Alert{got, got2}
+
+			for i, result := range results {
+				sort.Slice(result, func(i, j int) bool {
+					return labels.Compare(result[i].Labels, result[j].Labels) < 0
+				})
+				sortAlerts(result)
+				sortAlerts(expectedAlerts[i])
+			}
+
+			for i, expected := range expectedAlerts {
+				got = results[i]
+				require.Equal(t, len(expected), len(got))
+				for j, alert := range expected {
+					require.Equal(t, alert.Labels, got[j].Labels)
+					require.Equal(t, alert.Annotations, got[j].Annotations)
+					require.Equal(t, alert.ActiveAt, got[j].ActiveAt)
+					require.Equal(t, alert.KeepFiringSince, got[j].KeepFiringSince)
+				}
+			}
+		})
+	}
+}
+
 func TestRuleDependencyController_AnalyseRules(t *testing.T) {
 	type expectedDependencies struct {
 		noDependentRules  bool
diff --git a/rules/store.go b/rules/store.go
new file mode 100644
index 000000000..721ece982
--- /dev/null
+++ b/rules/store.go
@@ -0,0 +1,101 @@
+package rules
+
+import (
+	"encoding/json"
+	"log/slog"
+	"os"
+	"sync"
+)
+
+// AlertStore provides persistent storage of alert state.
+type AlertStore interface {
+	// SetAlerts stores the provided list of alerts for a rule.
+	SetAlerts(key uint64, alerts []*Alert) error
+	// GetAlerts returns the alerts stored for an alerting rule,
+	// identified by a fingerprint of its config and group key.
+	GetAlerts(key uint64) (map[uint64]*Alert, error)
+}
+
+// FileStore implements the AlertStore interface.
+type FileStore struct {
+	logger       *slog.Logger
+	alertsByRule map[uint64][]*Alert
+	// stateMtx protects the `alertsByRule` map.
+	stateMtx sync.RWMutex
+	path     string
+}
+
+func NewFileStore(l *slog.Logger, storagePath string) *FileStore {
+	s := &FileStore{
+		logger:       l,
+		alertsByRule: make(map[uint64][]*Alert),
+		path:         storagePath,
+	}
+	s.initState()
+	return s
+}
+
+// initState reads the state from file storage into the alertsByRule map.
+func (s *FileStore) initState() {
+	file, err := os.OpenFile(s.path, os.O_RDWR|os.O_CREATE, 0o666)
+	if err != nil {
+		s.logger.Error("Failed reading alerts state from file", "err", err)
+		return
+	}
+	defer file.Close()
+
+	var alertsByRule map[uint64][]*Alert
+	err = json.NewDecoder(file).Decode(&alertsByRule)
+	if err != nil {
+		s.logger.Error("Failed reading alerts state from file", "err", err)
+	}
+	if alertsByRule == nil {
+		alertsByRule = make(map[uint64][]*Alert)
+	}
+	s.alertsByRule = alertsByRule
+}
+
+// GetAlerts returns the stored alerts for an alerting rule.
+// Alert state is read from the in-memory map, which is populated during initialization.
+func (s *FileStore) GetAlerts(key uint64) (map[uint64]*Alert, error) {
+	s.stateMtx.RLock()
+	defer s.stateMtx.RUnlock()
+
+	restoredAlerts, ok := s.alertsByRule[key]
+	if !ok {
+		return nil, nil
+	}
+	alerts := make(map[uint64]*Alert)
+	for _, alert := range restoredAlerts {
+		if alert == nil {
+			continue
+		}
+		h := alert.Labels.Hash()
+		alerts[h] = alert
+	}
+	return alerts, nil
+}
+
+// SetAlerts updates the alertsByRule map and writes the state to file storage.
+func (s *FileStore) SetAlerts(key uint64, alerts []*Alert) error {
+	s.stateMtx.Lock()
+	defer s.stateMtx.Unlock()
+
+	// Update the in-memory state.
+	if alerts != nil {
+		s.alertsByRule[key] = alerts
+	}
+	// Flush the in-memory state to file storage.
+	file, err := os.Create(s.path)
+	if err != nil {
+		return err
+	}
+	defer file.Close()
+
+	encoder := json.NewEncoder(file)
+	err = encoder.Encode(s.alertsByRule)
+	if err != nil {
+		return err
+	}
+	return nil
+}
diff --git a/rules/store_test.go b/rules/store_test.go
new file mode 100644
index 000000000..96e95efa8
--- /dev/null
+++ b/rules/store_test.go
@@ -0,0 +1,46 @@
+package rules
+
+import (
+	"os"
+	"testing"
+	"time"
+
+	"github.com/prometheus/common/promslog"
+	"github.com/prometheus/prometheus/model/labels"
+	"github.com/stretchr/testify/require"
+)
+
+func TestAlertStore(t *testing.T) {
+	alertStore := NewFileStore(promslog.NewNopLogger(), "alertstoretest")
+	t.Cleanup(func() {
+		os.Remove("alertstoretest")
+	})
+
+	alertsByRule := make(map[uint64][]*Alert)
+	baseTime := time.Now()
+	al1 := &Alert{State: StateFiring, Labels: labels.FromStrings("a1", "1"), Annotations: labels.FromStrings("annotation1", "a1"), ActiveAt: baseTime, KeepFiringSince: baseTime}
+	al2 := &Alert{State: StateFiring, Labels: labels.FromStrings("a2", "2"), Annotations: labels.FromStrings("annotation2", "a2"), ActiveAt: baseTime, KeepFiringSince: baseTime}
+
+	alertsByRule[1] = []*Alert{al1, al2}
+	alertsByRule[2] = []*Alert{al2}
+	alertsByRule[3] = []*Alert{al1}
+	alertsByRule[4] = []*Alert{}
+
+	for key, alerts := range alertsByRule {
+		err := alertStore.SetAlerts(key, alerts)
+		require.NoError(t, err)
+
+		got, err := alertStore.GetAlerts(key)
+		require.NoError(t, err)
+		require.Equal(t, len(alerts), len(got))
+		for _, expected := range alerts {
+			al, ok := got[expected.Labels.Hash()]
+			require.True(t, ok)
+			require.Equal(t, expected.State, al.State)
+			require.Equal(t, expected.Labels, al.Labels)
+			require.Equal(t, expected.Annotations, al.Annotations)
+			require.Equal(t, expected.ActiveAt, al.ActiveAt)
+			require.Equal(t, expected.KeepFiringSince, al.KeepFiringSince)
+		}
+	}
+}
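
Usage note: alert state persistence is gated behind --enable-feature=alert-state-persistence, and the state file location comes from --rules.alert.state-storage-path (default data/alerts). The standalone sketch below is not part of the change; it only illustrates the FileStore round-trip that the rule manager performs, using an illustrative key and file name (in the manager the key comes from AlertingRule.GetFingerprint(GroupKey(file, group)), and persistence happens via DefaultAlertStoreFunc after each evaluation).

package main

import (
	"fmt"
	"time"

	"github.com/prometheus/common/promslog"

	"github.com/prometheus/prometheus/model/labels"
	"github.com/prometheus/prometheus/rules"
)

func main() {
	// Illustrative key and path only; the real key is the rule fingerprint and
	// the real path is the value of --rules.alert.state-storage-path.
	const key = uint64(42)
	store := rules.NewFileStore(promslog.NewNopLogger(), "alerts.json")

	// Persist one firing alert that is inside its keep_firing_for window.
	alerts := []*rules.Alert{{
		State:           rules.StateFiring,
		Labels:          labels.FromStrings("alertname", "HTTPRequestRateLow", "instance", "0"),
		ActiveAt:        time.Now().Add(-10 * time.Minute),
		KeepFiringSince: time.Now().Add(-5 * time.Minute),
	}}
	if err := store.SetAlerts(key, alerts); err != nil {
		fmt.Println("failed to persist alert state:", err)
		return
	}

	// A fresh FileStore (e.g. after a restart) re-reads the same file; the group
	// hands these alerts back to the rule on its first evaluation.
	restored, err := rules.NewFileStore(promslog.NewNopLogger(), "alerts.json").GetAlerts(key)
	fmt.Println(len(restored), err) // expect: 1 <nil>
}

Note that SetAlerts rewrites the whole state file on every call, which is why DefaultAlertStoreFunc only stores alerts with a non-zero KeepFiringSince and is invoked asynchronously from the evaluation loop.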