Mustafain Ali Khan 2025-03-05 11:06:49 -08:00 committed by GitHub
commit 0fbd4a2f9f
8 changed files with 487 additions and 5 deletions


@@ -203,11 +203,13 @@ type flagConfig struct {
featureList []string
// These options are extracted from featureList
// for ease of use.
enablePerStepStats bool
enableConcurrentRuleEval bool
enablePerStepStats bool
enableConcurrentRuleEval bool
enableAlertStatePersistence bool
prometheusURL string
corsRegexString string
prometheusURL string
corsRegexString string
alertStoragePath string
promqlEnableDelayedNameRemoval bool
@@ -244,6 +246,9 @@ func (c *flagConfig) setFeatureListOptions(logger *slog.Logger) error {
case "concurrent-rule-eval":
c.enableConcurrentRuleEval = true
logger.Info("Experimental concurrent rule evaluation enabled.")
case "alert-state-persistence":
c.enableAlertStatePersistence = true
logger.Info("Experimental alert state persistence storage enabled for alerting rules using keep_firing_for.")
case "promql-experimental-functions":
parser.EnableExperimentalFunctions = true
logger.Info("Experimental PromQL functions enabled.")
@@ -488,6 +493,8 @@ func main() {
serverOnlyFlag(a, "rules.alert.resend-delay", "Minimum amount of time to wait before resending an alert to Alertmanager.").
Default("1m").SetValue(&cfg.resendDelay)
serverOnlyFlag(a, "rules.alert.state-storage-path", "Path for alert state storage.").
Default("data/alerts").StringVar(&cfg.alertStoragePath)
serverOnlyFlag(a, "rules.max-concurrent-evals", "Global concurrency limit for independent rules that can run concurrently. When set, \"query.max-concurrency\" may need to be adjusted accordingly.").
Default("4").Int64Var(&cfg.maxConcurrentEvals)
@@ -804,6 +811,10 @@ func main() {
}
queryEngine = promql.NewEngine(opts)
var alertStore rules.AlertStore
if cfg.enableAlertStatePersistence {
alertStore = rules.NewFileStore(logger.With("component", "alertStore"), cfg.alertStoragePath, prometheus.DefaultRegisterer)
}
ruleManager = rules.NewManager(&rules.ManagerOptions{
Appendable: fanoutStorage,
@@ -822,6 +833,7 @@ func main() {
DefaultRuleQueryOffset: func() time.Duration {
return time.Duration(cfgFile.GlobalConfig.RuleQueryOffset)
},
AlertStore: alertStore,
})
}


@@ -53,6 +53,7 @@ The Prometheus monitoring server
| <code class="text-nowrap">--rules.alert.for-outage-tolerance</code> | Max time to tolerate prometheus outage for restoring "for" state of alert. Use with server mode only. | `1h` |
| <code class="text-nowrap">--rules.alert.for-grace-period</code> | Minimum duration between alert and restored "for" state. This is maintained only for alerts with configured "for" time greater than grace period. Use with server mode only. | `10m` |
| <code class="text-nowrap">--rules.alert.resend-delay</code> | Minimum amount of time to wait before resending an alert to Alertmanager. Use with server mode only. | `1m` |
| <code class="text-nowrap">--rules.alert.state-storage-path</code> | Path for alert state storage. Use with server mode only. | `data/alerts` |
| <code class="text-nowrap">--rules.max-concurrent-evals</code> | Global concurrency limit for independent rules that can run concurrently. When set, "query.max-concurrency" may need to be adjusted accordingly. Use with server mode only. | `4` |
| <code class="text-nowrap">--alertmanager.notification-queue-capacity</code> | The capacity of the queue for pending Alertmanager notifications. Use with server mode only. | `10000` |
| <code class="text-nowrap">--alertmanager.drain-notification-queue-on-shutdown</code> | Send any outstanding Alertmanager notifications when shutting down. If false, any outstanding Alertmanager notifications will be dropped when shutting down. Use with server mode only. | `true` |


@@ -23,6 +23,7 @@ import (
"sync"
"time"
"github.com/cespare/xxhash/v2"
"github.com/prometheus/common/model"
"go.uber.org/atomic"
"gopkg.in/yaml.v2"
@@ -610,6 +611,8 @@ func (r *AlertingRule) sendAlerts(ctx context.Context, ts time.Time, resendDelay
notifyFunc(ctx, r.vector.String(), alerts...)
}
// Note that changing the format of String() changes the value of GetFingerprint(), leading to loss of
// persisted state when the `alert-state-persistence` feature is enabled.
func (r *AlertingRule) String() string {
ar := rulefmt.Rule{
Alert: r.name,
@@ -627,3 +630,16 @@ func (r *AlertingRule) String() string {
return string(byt)
}
// GetFingerprint returns a hash to uniquely identify an alerting rule,
// using a combination of rule config and the groupKey.
func (r *AlertingRule) GetFingerprint(groupKey string) uint64 {
return xxhash.Sum64(append([]byte(r.String()), []byte(groupKey)...))
}
// SetActiveAlerts updates the active alerts of the alerting rule.
func (r *AlertingRule) SetActiveAlerts(alerts map[uint64]*Alert) {
r.activeMtx.Lock()
defer r.activeMtx.Unlock()
r.active = alerts
}
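For illustration, here is a minimal, hypothetical sketch (not part of this commit; it mirrors the constructor arguments used in the tests later in this diff) of how a store key is derived. It shows why the String() format matters: the fingerprint hashes the serialized rule together with the group key, so editing the rule definition or moving it to another group yields a new key and orphans previously persisted state.

package rules

import (
	"fmt"
	"time"

	"github.com/prometheus/prometheus/model/labels"
	"github.com/prometheus/prometheus/promql/parser"
)

// fingerprintExample is a hypothetical helper showing how the alert store key
// for a rule is computed: GetFingerprint hashes String() plus the group key.
func fingerprintExample() (uint64, error) {
	expr, err := parser.ParseExpr(`http_requests{job="app-server"} > 0`)
	if err != nil {
		return 0, err
	}
	rule := NewAlertingRule(
		"HTTPRequestRateLow",
		expr,
		0,              // hold duration ("for")
		30*time.Minute, // keep_firing_for
		labels.FromStrings("severity", "critical"), // labels
		labels.EmptyLabels(), labels.EmptyLabels(), // annotations, external labels
		"",   // external URL
		true, // restored
		nil,  // logger
	)
	key := rule.GetFingerprint(GroupKey("rules.yml", "default"))
	fmt.Println("alert store key:", key)
	return key, nil
}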


@@ -73,6 +73,7 @@ type Group struct {
// Rule group evaluation iteration function,
// defaults to DefaultEvalIterationFunc.
evalIterationFunc GroupEvalIterationFunc
alertStore AlertStore
appOpts *storage.AppendOptions
}
@@ -94,6 +95,7 @@ type GroupOptions struct {
QueryOffset *time.Duration
done chan struct{}
EvalIterationFunc GroupEvalIterationFunc
AlertStore AlertStore
}
// NewGroup makes a new Group with the given name, options, and rules.
@@ -144,6 +146,7 @@ func NewGroup(o GroupOptions) *Group {
logger: opts.Logger.With("file", o.File, "group", o.Name),
metrics: metrics,
evalIterationFunc: evalIterationFunc,
alertStore: o.AlertStore,
appOpts: &storage.AppendOptions{DiscardOutOfOrder: true},
}
}
@@ -537,6 +540,17 @@ func (g *Group) Eval(ctx context.Context, ts time.Time) {
g.metrics.EvalTotal.WithLabelValues(GroupKey(g.File(), g.Name())).Inc()
if g.alertStore != nil && g.lastEvalTimestamp.IsZero() {
// Restore alerts when the feature is enabled and this is the first evaluation for the group.
if ar, ok := rule.(*AlertingRule); ok {
restoredAlerts, _ := g.alertStore.GetAlerts(ar.GetFingerprint(GroupKey(g.File(), g.Name())))
if len(restoredAlerts) > 0 {
ar.SetActiveAlerts(restoredAlerts)
g.logger.Info("Restored alerts from store", "rule", ar.name, "alerts", len(restoredAlerts))
}
}
}
vector, err := rule.Eval(ctx, ruleQueryOffset, ts, g.opts.QueryFunc, g.opts.ExternalURL, g.Limit())
if err != nil {
rule.SetHealth(HealthBad)
@@ -1168,3 +1182,35 @@ func buildDependencyMap(rules []Rule) dependencyMap {
return dependencies
}
// AlertStore provides persistent storage of alert state.
type AlertStore interface {
// SetAlerts stores the provided list of alerts for a rule.
SetAlerts(key uint64, groupKey string, alerts []*Alert) error
// GetAlerts returns the stored alerts for an alerting rule;
// the rule is identified by a fingerprint of its config and group key.
GetAlerts(key uint64) (map[uint64]*Alert, error)
}
// StoreKeepFiringForState is periodically invoked to store the state of alerting rules using 'keep_firing_for'.
func (g *Group) StoreKeepFiringForState() {
for _, rule := range g.rules {
ar, ok := rule.(*AlertingRule)
if !ok {
continue
}
if ar.KeepFiringFor() != 0 {
alertsToStore := make([]*Alert, 0)
ar.ForEachActiveAlert(func(alert *Alert) {
if !alert.KeepFiringSince.IsZero() {
alertsToStore = append(alertsToStore, alert)
}
})
groupKey := GroupKey(g.File(), g.Name())
err := g.alertStore.SetAlerts(ar.GetFingerprint(groupKey), groupKey, alertsToStore)
if err != nil {
g.logger.Error("Failed to store alerting rule state", "rule", ar.Name(), "err", err)
}
}
}
}
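As a mental model of the AlertStore contract above, here is a purely illustrative in-memory implementation (an assumption for exposition, not part of this commit; the commit's real implementation is the FileStore later in this diff): SetAlerts keeps the latest alert slice per rule fingerprint, and GetAlerts hands the alerts back keyed by label hash, which is what the restore path in Eval and SetActiveAlerts expect.

package rules

import "sync"

// memStore is a hypothetical in-memory AlertStore used only to illustrate the
// interface contract.
type memStore struct {
	mtx    sync.RWMutex
	alerts map[uint64][]*Alert
}

func newMemStore() *memStore {
	return &memStore{alerts: make(map[uint64][]*Alert)}
}

// SetAlerts stores the latest alert state for the rule identified by key.
func (s *memStore) SetAlerts(key uint64, _ string, alerts []*Alert) error {
	s.mtx.Lock()
	defer s.mtx.Unlock()
	s.alerts[key] = alerts
	return nil
}

// GetAlerts returns the stored alerts keyed by label hash.
func (s *memStore) GetAlerts(key uint64) (map[uint64]*Alert, error) {
	s.mtx.RLock()
	defer s.mtx.RUnlock()
	out := make(map[uint64]*Alert, len(s.alerts[key]))
	for _, a := range s.alerts[key] {
		if a != nil {
			out[a.Labels.Hash()] = a
		}
	}
	return out, nil
}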


@@ -86,6 +86,11 @@ func DefaultEvalIterationFunc(ctx context.Context, g *Group, evalTimestamp time.
g.setEvaluationTime(timeSinceStart)
g.setLastEvaluation(start)
g.setLastEvalTimestamp(evalTimestamp)
if g.alertStore != nil {
// The alert-state-persistence feature is enabled.
g.StoreKeepFiringForState()
}
}
// The Manager manages recording and alerting rules.
@@ -123,6 +128,7 @@ type ManagerOptions struct {
ConcurrentEvalsEnabled bool
RuleConcurrencyController RuleConcurrencyController
RuleDependencyController RuleDependencyController
AlertStore AlertStore
// At present, manager only restores `for` state when manager is newly created which happens
// during restarts. This flag provides an option to restore the `for` state when new rule groups are
// added to an existing manager
@@ -355,7 +361,6 @@ func (m *Manager) LoadGroups(
// Check dependencies between rules and store it on the Rule itself.
m.opts.RuleDependencyController.AnalyseRules(rules)
groups[GroupKey(fn, rg.Name)] = NewGroup(GroupOptions{
Name: rg.Name,
File: fn,
@@ -367,6 +372,7 @@ func (m *Manager) LoadGroups(
QueryOffset: (*time.Duration)(rg.QueryOffset),
done: m.done,
EvalIterationFunc: groupEvalIterationFunc,
AlertStore: m.opts.AlertStore,
})
}
}
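For completeness, a hedged sketch of how a caller wires the new option, mirroring the main() change earlier in this commit (the wrapper and parameter names here are illustrative): the FileStore is only constructed when the feature flag is enabled; otherwise AlertStore stays nil, and both the restore path in Eval and StoreKeepFiringForState are skipped.

package main

import (
	"log/slog"

	"github.com/prometheus/client_golang/prometheus"

	"github.com/prometheus/prometheus/rules"
)

// newRuleManager is a hypothetical wrapper showing the wiring: the alert store
// is optional and only created when alert-state-persistence is enabled.
func newRuleManager(persistenceEnabled bool, storagePath string, logger *slog.Logger) *rules.Manager {
	var alertStore rules.AlertStore
	if persistenceEnabled {
		alertStore = rules.NewFileStore(logger.With("component", "alertStore"), storagePath, prometheus.DefaultRegisterer)
	}
	return rules.NewManager(&rules.ManagerOptions{
		Logger:     logger,
		AlertStore: alertStore,
		// ... other ManagerOptions fields (Appendable, QueryFunc, etc.) as in main().
	})
}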


@@ -2551,6 +2551,196 @@ func TestLabels_FromMaps(t *testing.T) {
require.Equal(t, expected, mLabels, "unexpected labelset")
}
func TestKeepFiringForStateRestore(t *testing.T) {
testStorage := promqltest.LoadedStorage(t, `
load 5m
http_requests{job="app-server", instance="0", group="canary", severity="overwrite-me"} 75 0 0 0 0 0 0 0
http_requests{job="app-server", instance="1", group="canary", severity="overwrite-me"} 100 0 0 0 0 0 0 0
http_requests_5xx{job="app-server", instance="2", group="canary", severity="overwrite-me"} 80 0 0 0 0 0 0 0
`)
testStoreFile := "testalertstore"
t.Cleanup(
func() {
testStorage.Close()
os.Remove(testStoreFile)
},
)
ng := testEngine(t)
alertStore := NewFileStore(promslog.NewNopLogger(), testStoreFile, prometheus.NewRegistry())
opts := &ManagerOptions{
QueryFunc: EngineQueryFunc(ng, testStorage),
Appendable: testStorage,
Queryable: testStorage,
Context: context.Background(),
Logger: promslog.NewNopLogger(),
NotifyFunc: func(ctx context.Context, expr string, alerts ...*Alert) {},
OutageTolerance: 30 * time.Minute,
ForGracePeriod: 10 * time.Minute,
AlertStore: alertStore,
}
keepFiringForDuration := 30 * time.Minute
// Initial runs before Prometheus goes down.
expr, err := parser.ParseExpr(`http_requests{group="canary", job="app-server"} > 0`)
require.NoError(t, err)
expr2, err := parser.ParseExpr(`http_requests_5xx{group="canary", job="app-server"} > 0`)
require.NoError(t, err)
rule := NewAlertingRule(
"HTTPRequestRateLow",
expr,
0,
keepFiringForDuration,
labels.FromStrings("severity", "critical"),
labels.FromStrings("annotation_test", "rule1"), labels.EmptyLabels(), "", true, nil,
)
keepFiringForDuration2 := 60 * time.Minute
rule2 := NewAlertingRule(
"HTTPRequestRateLow",
expr2,
0,
keepFiringForDuration2,
labels.FromStrings("severity", "critical"),
labels.FromStrings("annotation_test", "rule2"), labels.EmptyLabels(), "", true, nil,
)
group := NewGroup(GroupOptions{
Name: "default",
Interval: time.Second,
Rules: []Rule{rule, rule2},
ShouldRestore: true,
Opts: opts,
AlertStore: alertStore,
})
groups := make(map[string]*Group)
groups["default;"] = group
type testInput struct {
name string
restoreDuration time.Duration
initialRuns []time.Duration
alertsExpected int
}
tests := []testInput{
{
name: "normal restore - 3 alerts firing with keep_firing_for duration active",
restoreDuration: 30 * time.Minute,
initialRuns: []time.Duration{0, 5 * time.Minute, 10 * time.Minute, 15 * time.Minute, 20 * time.Minute, 25 * time.Minute},
alertsExpected: 3,
},
{
name: "restore after rule 1 keep firing for duration is over - 1 alert with keep_firing_for duration active",
restoreDuration: keepFiringForDuration + 10*time.Minute,
initialRuns: []time.Duration{0, 5 * time.Minute, 10 * time.Minute, 15 * time.Minute, 20 * time.Minute, 50 * time.Minute},
alertsExpected: 1,
},
{
name: "restore after keep firing for duration expires - 0 alerts active",
restoreDuration: 120 * time.Minute,
initialRuns: []time.Duration{0, 5 * time.Minute, 10 * time.Minute, 15 * time.Minute, 20 * time.Minute, 110 * time.Minute},
alertsExpected: 0,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
baseTime := time.Unix(0, 0)
for _, duration := range tt.initialRuns {
// Evaluating rule before restarting.
evalTime := baseTime.Add(duration)
group.Eval(opts.Context, evalTime)
group.setLastEvalTimestamp(evalTime)
// Manager will store alert state.
group.StoreKeepFiringForState()
}
exp := rule.ActiveAlerts()
exp2 := rule2.ActiveAlerts()
// Record alerts before restart.
expectedAlerts := [][]*Alert{exp, exp2}
// Prometheus goes down here. We create new rules and groups.
newRule := NewAlertingRule(
"HTTPRequestRateLow",
expr,
0,
keepFiringForDuration,
labels.FromStrings("severity", "critical"),
labels.FromStrings("annotation_test", "rule1"), labels.EmptyLabels(), "", false, nil,
)
newRule2 := NewAlertingRule(
"HTTPRequestRateLow",
expr2,
0,
keepFiringForDuration2,
labels.FromStrings("severity", "critical"),
labels.FromStrings("annotation_test", "rule2"), labels.EmptyLabels(), "", true, nil,
)
// Restart alert store.
newAlertStore := NewFileStore(promslog.NewNopLogger(), testStoreFile, prometheus.NewRegistry())
newGroup := NewGroup(GroupOptions{
Name: "default",
Interval: time.Second,
Rules: []Rule{newRule, newRule2},
ShouldRestore: true,
Opts: opts,
AlertStore: newAlertStore,
})
newGroups := make(map[string]*Group)
newGroups["default;"] = newGroup
restoreTime := baseTime.Add(tt.restoreDuration)
// First eval after restart.
newGroup.Eval(context.TODO(), restoreTime)
got := newRule.ActiveAlerts()
got2 := newRule2.ActiveAlerts()
require.Equal(t, len(exp), len(got))
require.Equal(t, len(exp2), len(got2))
require.Equal(t, tt.alertsExpected, len(got)+len(got2))
results := [][]*Alert{got, got2}
for _, result := range results {
sort.Slice(result, func(i, j int) bool {
return labels.Compare(result[i].Labels, result[j].Labels) < 0
})
sortAlerts(result)
}
for _, alerts := range expectedAlerts {
sort.Slice(alerts, func(i, j int) bool {
return labels.Compare(alerts[i].Labels, alerts[j].Labels) < 0
})
sortAlerts(alerts)
}
for i, expected := range expectedAlerts {
got = results[i]
require.Equal(t, len(expected), len(got))
for j, alert := range expected {
diff := float64(alert.KeepFiringSince.Unix() - got[j].KeepFiringSince.Unix())
require.Equal(t, 0.0, math.Abs(diff), "'keep_firing_for' restored time is wrong")
diff = float64(alert.ActiveAt.Unix() - got[j].ActiveAt.Unix())
require.Equal(t, 0.0, math.Abs(diff), "'keep_firing_for' state restored activeAt time is wrong")
require.Equal(t, alert.Value, got[j].Value)
require.NotEmpty(t, alert.Labels)
require.Equal(t, alert.Labels.Get("instance"), got[j].Labels.Get("instance"))
require.NotEmpty(t, alert.Annotations)
require.Equal(t, alert.Annotations.Get("annotation_test"), got[j].Annotations.Get("annotation_test"))
}
}
})
}
}
func TestRuleDependencyController_AnalyseRules(t *testing.T) {
type expectedDependencies struct {
noDependentRules bool

rules/store.go Normal file

@@ -0,0 +1,142 @@
// Copyright 2025 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package rules
import (
"encoding/json"
"log/slog"
"os"
"sync"
"github.com/prometheus/client_golang/prometheus"
)
// FileStore implements the AlertStore interface.
type FileStore struct {
alertsByRule map[uint64][]*Alert
logger *slog.Logger
// protects the `alertsByRule` map.
stateMtx sync.RWMutex
path string
storeInitErrors prometheus.Counter
alertStoreErrors *prometheus.CounterVec
}
// FileData is the on-disk representation of the alert state persisted by FileStore.
type FileData struct {
Alerts map[uint64][]*Alert `json:"alerts"`
}
// NewFileStore returns a FileStore and initializes its in-memory state from the file at storagePath.
func NewFileStore(l *slog.Logger, storagePath string, registerer prometheus.Registerer) *FileStore {
s := &FileStore{
logger: l,
alertsByRule: make(map[uint64][]*Alert),
path: storagePath,
}
s.storeInitErrors = prometheus.NewCounter(
prometheus.CounterOpts{
Namespace: namespace,
Name: "alert_store_init_errors_total",
Help: "The total number of errors starting alert store.",
},
)
s.alertStoreErrors = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: namespace,
Name: "rule_group_alert_store_errors_total",
Help: "The total number of errors in alert store.",
},
[]string{"rule_group"},
)
s.initState(registerer)
return s
}
// initState reads the state from file storage into the alertsByRule map.
func (s *FileStore) initState(registerer prometheus.Registerer) {
if registerer != nil {
registerer.MustRegister(s.alertStoreErrors, s.storeInitErrors)
}
file, err := os.OpenFile(s.path, os.O_RDWR|os.O_CREATE, 0o644)
if err != nil {
s.logger.Error("Failed reading alerts state from file", "err", err)
s.storeInitErrors.Inc()
return
}
defer file.Close()
var data *FileData
err = json.NewDecoder(file).Decode(&data)
if err != nil {
s.logger.Error("Failed reading alerts state from file", "err", err)
s.storeInitErrors.Inc()
return
}
alertsByRule := make(map[uint64][]*Alert)
if data != nil && data.Alerts != nil {
alertsByRule = data.Alerts
}
s.alertsByRule = alertsByRule
}
// GetAlerts returns the stored alerts for an alerting rule.
// Alert state is read from the in-memory map, which is populated during initialization.
func (s *FileStore) GetAlerts(key uint64) (map[uint64]*Alert, error) {
s.stateMtx.RLock()
defer s.stateMtx.RUnlock()
restoredAlerts, ok := s.alertsByRule[key]
if !ok {
return nil, nil
}
alerts := make(map[uint64]*Alert)
for _, alert := range restoredAlerts {
if alert == nil {
continue
}
h := alert.Labels.Hash()
alerts[h] = alert
}
return alerts, nil
}
// SetAlerts updates the alertsByRule map and writes state to file storage.
func (s *FileStore) SetAlerts(key uint64, groupKey string, alerts []*Alert) error {
s.stateMtx.Lock()
defer s.stateMtx.Unlock()
if alerts == nil {
return nil
}
// Update the in-memory state.
s.alertsByRule[key] = alerts
// Flush the in-memory state to file storage.
file, err := os.Create(s.path)
if err != nil {
s.alertStoreErrors.WithLabelValues(groupKey).Inc()
return err
}
defer file.Close()
encoder := json.NewEncoder(file)
data := FileData{
Alerts: s.alertsByRule,
}
err = encoder.Encode(data)
if err != nil {
s.alertStoreErrors.WithLabelValues(groupKey).Inc()
return err
}
return nil
}
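To make the restart semantics concrete, here is a hypothetical round-trip sketch (not part of this commit; the file path and rule key are illustrative): a first FileStore persists an alert under a rule fingerprint, and a second FileStore opened against the same path reads it back during initState, which is what lets keep_firing_for state survive a restart.

package rules

import (
	"time"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/common/promslog"

	"github.com/prometheus/prometheus/model/labels"
)

// restartRoundTrip is a hypothetical sketch of the persist-then-restore path:
// state written by one FileStore is read back by a fresh FileStore on the same
// path, keyed first by rule fingerprint and then by label hash.
func restartRoundTrip(path string) (map[uint64]*Alert, error) {
	const ruleKey = 42 // stands in for AlertingRule.GetFingerprint(groupKey)

	before := NewFileStore(promslog.NewNopLogger(), path, prometheus.NewRegistry())
	alert := &Alert{
		State:           StateFiring,
		Labels:          labels.FromStrings("instance", "0"),
		ActiveAt:        time.Now().Add(-time.Hour),
		KeepFiringSince: time.Now().Add(-30 * time.Minute),
	}
	if err := before.SetAlerts(ruleKey, "rules.yml;default", []*Alert{alert}); err != nil {
		return nil, err
	}

	// Simulate a restart: a new store against the same file re-reads the state.
	after := NewFileStore(promslog.NewNopLogger(), path, prometheus.NewRegistry())
	return after.GetAlerts(ruleKey)
}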

rules/store_test.go Normal file

@@ -0,0 +1,69 @@
// Copyright 2025 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package rules
import (
"os"
"testing"
"time"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/promslog"
"github.com/stretchr/testify/require"
"github.com/prometheus/prometheus/model/labels"
)
func TestAlertStore(t *testing.T) {
alertStore := NewFileStore(promslog.NewNopLogger(), "alertstoretest", prometheus.NewRegistry())
t.Cleanup(func() {
os.Remove("alertstoretest")
})
alertsByRule := make(map[uint64][]*Alert)
baseTime := time.Now()
al1 := &Alert{State: StateFiring, Labels: labels.FromStrings("a1", "1"), Annotations: labels.FromStrings("annotation1", "a1"), ActiveAt: baseTime, KeepFiringSince: baseTime}
al2 := &Alert{State: StateFiring, Labels: labels.FromStrings("a2", "2"), Annotations: labels.FromStrings("annotation2", "a2"), ActiveAt: baseTime, KeepFiringSince: baseTime}
alertsByRule[1] = []*Alert{al1, al2}
alertsByRule[2] = []*Alert{al2}
alertsByRule[3] = []*Alert{al1}
alertsByRule[4] = []*Alert{}
for key, alerts := range alertsByRule {
sortAlerts(alerts)
err := alertStore.SetAlerts(key, "test/test1", alerts)
require.NoError(t, err)
got, err := alertStore.GetAlerts(key)
require.NoError(t, err)
require.Equal(t, len(alerts), len(got))
result := make([]*Alert, 0, len(got))
for _, value := range got {
result = append(result, value)
}
sortAlerts(result)
j := 0
for _, al := range result {
require.Equal(t, alerts[j].State, al.State)
require.Equal(t, alerts[j].Labels, al.Labels)
require.Equal(t, alerts[j].Annotations, al.Annotations)
require.Equal(t, alerts[j].ActiveAt, al.ActiveAt)
require.Equal(t, alerts[j].KeepFiringSince, al.KeepFiringSince)
j++
}
}
}