Store alert state using file storage

Signed-off-by: Mustafain Ali Khan <mustalik@amazon.com>

parent 5b5fee08af
commit 1ae0a35e33
@@ -203,11 +203,13 @@ type flagConfig struct {
 	featureList []string
 	// These options are extracted from featureList
 	// for ease of use.
 	enablePerStepStats          bool
 	enableConcurrentRuleEval    bool
+	enableAlertStatePersistence bool

 	prometheusURL   string
 	corsRegexString string
+	alertStoragePath string

 	promqlEnableDelayedNameRemoval bool
@@ -244,6 +246,9 @@ func (c *flagConfig) setFeatureListOptions(logger *slog.Logger) error {
 		case "concurrent-rule-eval":
 			c.enableConcurrentRuleEval = true
 			logger.Info("Experimental concurrent rule evaluation enabled.")
+		case "alert-state-persistence":
+			c.enableAlertStatePersistence = true
+			logger.Info("Experimental alert state persistence storage enabled for alerting rules using keep_firing_for.")
 		case "promql-experimental-functions":
 			parser.EnableExperimentalFunctions = true
 			logger.Info("Experimental PromQL functions enabled.")
@@ -484,6 +489,8 @@ func main() {
 	serverOnlyFlag(a, "rules.alert.resend-delay", "Minimum amount of time to wait before resending an alert to Alertmanager.").
 		Default("1m").SetValue(&cfg.resendDelay)

+	serverOnlyFlag(a, "rules.alert.state-storage-path", "Path for alert state storage.").
+		Default("data/alerts").StringVar(&cfg.alertStoragePath)
+
 	serverOnlyFlag(a, "rules.max-concurrent-evals", "Global concurrency limit for independent rules that can run concurrently. When set, \"query.max-concurrency\" may need to be adjusted accordingly.").
 		Default("4").Int64Var(&cfg.maxConcurrentEvals)
@@ -800,6 +807,10 @@ func main() {
 		}

 		queryEngine = promql.NewEngine(opts)

+		var alertStore rules.AlertStore
+		if cfg.enableAlertStatePersistence {
+			alertStore = rules.NewFileStore(logger.With("component", "alertStore"), cfg.alertStoragePath)
+		}
+
 		ruleManager = rules.NewManager(&rules.ManagerOptions{
 			Appendable: fanoutStorage,
@@ -818,6 +829,8 @@ func main() {
 			DefaultRuleQueryOffset: func() time.Duration {
 				return time.Duration(cfgFile.GlobalConfig.RuleQueryOffset)
 			},
+			AlertStore:     alertStore,
+			AlertStoreFunc: rules.DefaultAlertStoreFunc,
 		})
 	}
@@ -53,6 +53,7 @@ The Prometheus monitoring server
 | <code class="text-nowrap">--rules.alert.for-outage-tolerance</code> | Max time to tolerate prometheus outage for restoring "for" state of alert. Use with server mode only. | `1h` |
 | <code class="text-nowrap">--rules.alert.for-grace-period</code> | Minimum duration between alert and restored "for" state. This is maintained only for alerts with configured "for" time greater than grace period. Use with server mode only. | `10m` |
 | <code class="text-nowrap">--rules.alert.resend-delay</code> | Minimum amount of time to wait before resending an alert to Alertmanager. Use with server mode only. | `1m` |
+| <code class="text-nowrap">--rules.alert.state-storage-path</code> | Path for alert state storage. Use with server mode only. | `data/alerts` |
 | <code class="text-nowrap">--rules.max-concurrent-evals</code> | Global concurrency limit for independent rules that can run concurrently. When set, "query.max-concurrency" may need to be adjusted accordingly. Use with server mode only. | `4` |
 | <code class="text-nowrap">--alertmanager.notification-queue-capacity</code> | The capacity of the queue for pending Alertmanager notifications. Use with server mode only. | `10000` |
 | <code class="text-nowrap">--alertmanager.drain-notification-queue-on-shutdown</code> | Send any outstanding Alertmanager notifications when shutting down. If false, any outstanding Alertmanager notifications will be dropped when shutting down. Use with server mode only. | `true` |
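For context (not part of the diff): given that `alert-state-persistence` is registered as a feature flag above and the storage path defaults to `data/alerts`, enabling the feature presumably comes down to starting the server with both flags, along the lines of:

./prometheus --enable-feature=alert-state-persistence --rules.alert.state-storage-path=data/alerts

`--enable-feature` is Prometheus's existing switch for experimental features; the binary name and path shown here are illustrative.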
@@ -23,6 +23,7 @@ import (
 	"sync"
 	"time"

+	"github.com/cespare/xxhash/v2"
 	"github.com/prometheus/common/model"
 	"go.uber.org/atomic"
 	"gopkg.in/yaml.v2"
@@ -41,7 +42,6 @@ const (
 	alertMetricName = "ALERTS"
 	// AlertForStateMetricName is the metric name for 'for' state of alert.
 	alertForStateMetricName = "ALERTS_FOR_STATE"
-
 	// AlertStateLabel is the label name indicating the state of an alert.
 	alertStateLabel = "alertstate"
 )
@@ -594,3 +594,16 @@ func (r *AlertingRule) String() string {
 	return string(byt)
 }
+
+// GetFingerprint returns a hash to uniquely identify an alerting rule,
+// using a combination of rule config and the groupKey.
+func (r *AlertingRule) GetFingerprint(groupKey string) uint64 {
+	return xxhash.Sum64(append([]byte(r.String()), []byte(groupKey)...))
+}
+
+// SetActiveAlerts updates the active alerts of the alerting rule.
+func (r *AlertingRule) SetActiveAlerts(alerts map[uint64]*Alert) {
+	r.activeMtx.Lock()
+	defer r.activeMtx.Unlock()
+	r.active = alerts
+}
@@ -77,6 +77,8 @@ type Group struct {
 	// concurrencyController controls the rules evaluation concurrency.
 	concurrencyController RuleConcurrencyController
 	appOpts               *storage.AppendOptions
+	alertStoreFunc        AlertStateStoreFunc
+	alertStore            AlertStore
 }

 // GroupEvalIterationFunc is used to implement and extend rule group
@@ -96,6 +98,8 @@ type GroupOptions struct {
 	QueryOffset       *time.Duration
 	done              chan struct{}
 	EvalIterationFunc GroupEvalIterationFunc
+	AlertStoreFunc    AlertStateStoreFunc
+	AlertStore        AlertStore
 }

 // NewGroup makes a new Group with the given name, options, and rules.
@@ -126,6 +130,12 @@ func NewGroup(o GroupOptions) *Group {
 		evalIterationFunc = DefaultEvalIterationFunc
 	}

+	alertStoreFunc := o.AlertStoreFunc
+	if alertStoreFunc == nil {
+		alertStoreFunc = DefaultAlertStoreFunc
+	}
+
 	concurrencyController := opts.RuleConcurrencyController
 	if concurrencyController == nil {
 		concurrencyController = sequentialRuleEvalController{}
@@ -151,6 +161,8 @@ func NewGroup(o GroupOptions) *Group {
 		logger:                opts.Logger.With("file", o.File, "group", o.Name),
 		metrics:               metrics,
 		evalIterationFunc:     evalIterationFunc,
+		alertStoreFunc:        alertStoreFunc,
+		alertStore:            o.AlertStore,
 		concurrencyController: concurrencyController,
 		appOpts:               &storage.AppendOptions{DiscardOutOfOrder: true},
 	}
@@ -537,6 +549,17 @@ func (g *Group) Eval(ctx context.Context, ts time.Time) {
 		g.metrics.EvalTotal.WithLabelValues(GroupKey(g.File(), g.Name())).Inc()

+		if g.alertStore != nil && g.lastEvalTimestamp.IsZero() {
+			// Restore alerts when the feature is enabled and this is the first evaluation for the group.
+			if ar, ok := rule.(*AlertingRule); ok {
+				restoredAlerts, _ := g.alertStore.GetAlerts(ar.GetFingerprint(GroupKey(g.File(), g.Name())))
+				if len(restoredAlerts) > 0 {
+					ar.SetActiveAlerts(restoredAlerts)
+					logger.Info("Restored alerts from store", "rule", ar.name, "alerts", len(restoredAlerts))
+				}
+			}
+		}
+
 		vector, err := rule.Eval(ctx, ruleQueryOffset, ts, g.opts.QueryFunc, g.opts.ExternalURL, g.Limit())
 		if err != nil {
 			rule.SetHealth(HealthBad)
@@ -26,7 +26,6 @@ import (
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/prometheus/common/promslog"
-	"golang.org/x/sync/semaphore"

 	"github.com/prometheus/prometheus/model/labels"
 	"github.com/prometheus/prometheus/model/rulefmt"
@@ -35,6 +34,7 @@ import (
 	"github.com/prometheus/prometheus/promql/parser"
 	"github.com/prometheus/prometheus/storage"
 	"github.com/prometheus/prometheus/util/strutil"
+	"golang.org/x/sync/semaphore"
 )

 // QueryFunc processes PromQL queries.
@@ -86,6 +86,37 @@ func DefaultEvalIterationFunc(ctx context.Context, g *Group, evalTimestamp time.
 	g.setEvaluationTime(timeSinceStart)
 	g.setLastEvaluation(start)
 	g.setLastEvalTimestamp(evalTimestamp)
+
+	if g.alertStore != nil {
+		// Alert state persistence is enabled; persist state in the background.
+		go func() {
+			g.alertStoreFunc(g)
+		}()
+	}
 }

+// DefaultAlertStoreFunc is the default implementation of
+// AlertStateStoreFunc that is periodically invoked to store the state
+// of alerting rules in a group at a given point in time.
+func DefaultAlertStoreFunc(g *Group) {
+	for _, rule := range g.rules {
+		ar, ok := rule.(*AlertingRule)
+		if !ok {
+			continue
+		}
+		if ar.KeepFiringFor() != 0 {
+			alertsToStore := make([]*Alert, 0)
+			ar.ForEachActiveAlert(func(alert *Alert) {
+				if !alert.KeepFiringSince.IsZero() {
+					alertsToStore = append(alertsToStore, alert)
+				}
+			})
+			err := g.alertStore.SetAlerts(ar.GetFingerprint(GroupKey(g.File(), g.Name())), alertsToStore)
+			if err != nil {
+				g.logger.Error("Failed to store alerting rule state", "rule", ar.Name(), "err", err)
+			}
+		}
+	}
+}
+
 // The Manager manages recording and alerting rules.
@@ -122,8 +153,9 @@ type ManagerOptions struct {
 	ConcurrentEvalsEnabled    bool
 	RuleConcurrencyController RuleConcurrencyController
 	RuleDependencyController  RuleDependencyController
 	Metrics                   *Metrics
+	AlertStore                AlertStore
+	AlertStoreFunc            AlertStateStoreFunc
 }

 // NewManager returns an implementation of Manager, ready to be started
@@ -193,6 +225,8 @@ func (m *Manager) Stop() {
 	m.logger.Info("Rule manager stopped")
 }

+type AlertStateStoreFunc func(g *Group)
+
 // Update the rule manager's state as the config requires. If
 // loading the new rules failed the old rule set is restored.
 // This method will no-op in case the manager is already stopped.
@@ -343,7 +377,6 @@ func (m *Manager) LoadGroups(
 		// Check dependencies between rules and store it on the Rule itself.
 		m.opts.RuleDependencyController.AnalyseRules(rules)
-
 		groups[GroupKey(fn, rg.Name)] = NewGroup(GroupOptions{
 			Name: rg.Name,
 			File: fn,
@@ -355,6 +388,8 @@ func (m *Manager) LoadGroups(
 			QueryOffset:       (*time.Duration)(rg.QueryOffset),
 			done:              m.done,
 			EvalIterationFunc: groupEvalIterationFunc,
+			AlertStoreFunc:    m.opts.AlertStoreFunc,
+			AlertStore:        m.opts.AlertStore,
 		})
 	}
 }
@@ -2266,6 +2266,188 @@ func TestLabels_FromMaps(t *testing.T) {
 	require.Equal(t, expected, mLabels, "unexpected labelset")
 }

+func TestKeepFiringForStateRestore(t *testing.T) {
+	testStorage := promqltest.LoadedStorage(t, `
+		load 5m
+		http_requests{job="app-server", instance="0", group="canary", severity="overwrite-me"} 75 0 0 0 0 0 0 0
+		http_requests{job="app-server", instance="1", group="canary", severity="overwrite-me"} 100 0 0 0 0 0 0 0
+		http_requests_5xx{job="app-server", instance="2", group="canary", severity="overwrite-me"} 80 0 0 0 0 0 0 0
+	`)
+
+	testStoreFile := "testalertstore"
+
+	t.Cleanup(func() {
+		testStorage.Close()
+		os.Remove(testStoreFile)
+	})
+
+	alertStore := NewFileStore(promslog.NewNopLogger(), testStoreFile)
+	ng := testEngine(t)
+	opts := &ManagerOptions{
+		QueryFunc:       EngineQueryFunc(ng, testStorage),
+		Appendable:      testStorage,
+		Queryable:       testStorage,
+		Context:         context.Background(),
+		Logger:          promslog.NewNopLogger(),
+		NotifyFunc:      func(ctx context.Context, expr string, alerts ...*Alert) {},
+		OutageTolerance: 30 * time.Minute,
+		ForGracePeriod:  10 * time.Minute,
+		AlertStore:      alertStore,
+	}
+
+	keepFiringForDuration := 30 * time.Minute
+	// Initial run before prometheus goes down.
+	expr, err := parser.ParseExpr(`http_requests{group="canary", job="app-server"} > 0`)
+	require.NoError(t, err)
+	expr2, err := parser.ParseExpr(`http_requests_5xx{group="canary", job="app-server"} > 0`)
+	require.NoError(t, err)
+
+	rule := NewAlertingRule(
+		"HTTPRequestRateLow",
+		expr,
+		0,
+		keepFiringForDuration,
+		labels.FromStrings("severity", "critical"),
+		labels.FromStrings("annotation1", "rule1"), labels.EmptyLabels(), "", true, nil,
+	)
+	keepFiringForDuration2 := 60 * time.Minute
+	rule2 := NewAlertingRule(
+		"HTTPRequestRateLow",
+		expr2,
+		0,
+		keepFiringForDuration2,
+		labels.FromStrings("severity", "critical"),
+		labels.FromStrings("annotation2", "rule2"), labels.EmptyLabels(), "", true, nil,
+	)
+
+	group := NewGroup(GroupOptions{
+		Name:           "default",
+		Interval:       time.Second,
+		Rules:          []Rule{rule, rule2},
+		ShouldRestore:  true,
+		Opts:           opts,
+		AlertStoreFunc: DefaultAlertStoreFunc,
+		AlertStore:     alertStore,
+	})
+
+	groups := make(map[string]*Group)
+	groups["default;"] = group
+
+	type testInput struct {
+		name            string
+		restoreDuration time.Duration
+		initialRuns     []time.Duration
+		alertsExpected  int
+	}
+
+	tests := []testInput{
+		{
+			name:            "normal restore - 3 alerts firing with keep_firing_for duration active",
+			restoreDuration: 30 * time.Minute,
+			initialRuns:     []time.Duration{0, 5 * time.Minute, 10 * time.Minute, 15 * time.Minute, 20 * time.Minute, 25 * time.Minute},
+			alertsExpected:  3,
+		},
+		{
+			name:            "restore after rule 1 keep firing for duration is over - 1 alert with keep_firing_for duration active",
+			restoreDuration: keepFiringForDuration + 10*time.Minute,
+			initialRuns:     []time.Duration{0, 5 * time.Minute, 10 * time.Minute, 15 * time.Minute, 20 * time.Minute, 50 * time.Minute},
+			alertsExpected:  1,
+		},
+		{
+			name:            "restore after keep firing for duration expires - 0 alerts active",
+			restoreDuration: 120 * time.Minute,
+			initialRuns:     []time.Duration{0, 5 * time.Minute, 10 * time.Minute, 15 * time.Minute, 20 * time.Minute, 110 * time.Minute},
+			alertsExpected:  0,
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			baseTime := time.Unix(0, 0)
+			for _, duration := range tt.initialRuns {
+				// Evaluating rule before restarting.
+				evalTime := baseTime.Add(duration)
+				group.Eval(opts.Context, evalTime)
+				group.setLastEvalTimestamp(evalTime)
+				// Manager will store alert state.
+				DefaultAlertStoreFunc(group)
+			}
+
+			exp := rule.ActiveAlerts()
+			exp2 := rule2.ActiveAlerts()
+			// Record alerts before restart.
+			expectedAlerts := [][]*Alert{exp, exp2}
+
+			// Prometheus goes down here. We create new rules and groups.
+			newRule := NewAlertingRule(
+				"HTTPRequestRateLow",
+				expr,
+				0,
+				keepFiringForDuration,
+				labels.FromStrings("severity", "critical"),
+				labels.FromStrings("annotation1", "rule1"), labels.EmptyLabels(), "", false, nil,
+			)
+			newRule2 := NewAlertingRule(
+				"HTTPRequestRateLow",
+				expr2,
+				0,
+				keepFiringForDuration2,
+				labels.FromStrings("severity", "critical"),
+				labels.FromStrings("annotation2", "rule2"), labels.EmptyLabels(), "", true, nil,
+			)
+			// Restart alert store.
+			newAlertStore := NewFileStore(promslog.NewNopLogger(), testStoreFile)
+
+			newGroup := NewGroup(GroupOptions{
+				Name:           "default",
+				Interval:       time.Second,
+				Rules:          []Rule{newRule, newRule2},
+				ShouldRestore:  true,
+				Opts:           opts,
+				AlertStore:     newAlertStore,
+				AlertStoreFunc: DefaultAlertStoreFunc,
+			})
+
+			newGroups := make(map[string]*Group)
+			newGroups["default;"] = newGroup
+
+			restoreTime := baseTime.Add(tt.restoreDuration)
+
+			// First eval after restart.
+			newGroup.Eval(context.TODO(), restoreTime)
+
+			got := newRule.ActiveAlerts()
+			got2 := newRule2.ActiveAlerts()
+			require.Equal(t, len(exp), len(got))
+			require.Equal(t, len(exp2), len(got2))
+			require.Equal(t, tt.alertsExpected, len(got)+len(got2))
+
+			results := [][]*Alert{got, got2}
+
+			for i, result := range results {
+				sort.Slice(result, func(i, j int) bool {
+					return labels.Compare(got[i].Labels, got[j].Labels) < 0
+				})
+				sortAlerts(result)
+				sortAlerts(expectedAlerts[i])
+			}
+
+			for i, expected := range expectedAlerts {
+				got = results[i]
+				require.Equal(t, len(expected), len(got))
+				for j, alert := range expected {
+					require.Equal(t, alert.Labels, got[j].Labels)
+					require.Equal(t, alert.Annotations, got[j].Annotations)
+					require.Equal(t, alert.ActiveAt, got[j].ActiveAt)
+					require.Equal(t, alert.KeepFiringSince, got[j].KeepFiringSince)
+				}
+			}
+		})
+	}
+}
+
 func TestRuleDependencyController_AnalyseRules(t *testing.T) {
 	type expectedDependencies struct {
 		noDependentRules bool
rules/store.go (new file, 101 lines)
@@ -0,0 +1,101 @@
package rules

import (
	"encoding/json"
	"log/slog"
	"os"
	"sync"
)

// AlertStore provides persistent storage of alert state.
type AlertStore interface {
	// SetAlerts stores the provided list of alerts for a rule.
	SetAlerts(key uint64, alerts []*Alert) error
	// GetAlerts returns the stored alerts for an alerting rule,
	// identified by a fingerprint of its config.
	GetAlerts(key uint64) (map[uint64]*Alert, error)
}

// FileStore implements the AlertStore interface.
type FileStore struct {
	logger       *slog.Logger
	alertsByRule map[uint64][]*Alert
	// stateMtx protects the alertsByRule map.
	stateMtx sync.RWMutex
	path     string
}

func NewFileStore(l *slog.Logger, storagePath string) *FileStore {
	s := &FileStore{
		logger:       l,
		alertsByRule: make(map[uint64][]*Alert),
		path:         storagePath,
	}
	s.initState()
	return s
}

// initState reads the state from file storage into the alertsByRule map.
func (s *FileStore) initState() {
	file, err := os.OpenFile(s.path, os.O_RDWR|os.O_CREATE, 0o666)
	if err != nil {
		s.logger.Error("Failed reading alerts state from file", "err", err)
		return
	}
	defer file.Close()

	var alertsByRule map[uint64][]*Alert
	err = json.NewDecoder(file).Decode(&alertsByRule)
	if err != nil {
		s.logger.Error("Failed reading alerts state from file", "err", err)
	}
	if alertsByRule == nil {
		alertsByRule = make(map[uint64][]*Alert)
	}
	s.alertsByRule = alertsByRule
}

// GetAlerts returns the stored alerts for an alerting rule.
// Alert state is read from the in-memory map, which is populated during initialization.
func (s *FileStore) GetAlerts(key uint64) (map[uint64]*Alert, error) {
	s.stateMtx.RLock()
	defer s.stateMtx.RUnlock()

	restoredAlerts, ok := s.alertsByRule[key]
	if !ok {
		return nil, nil
	}
	alerts := make(map[uint64]*Alert)
	for _, alert := range restoredAlerts {
		if alert == nil {
			continue
		}
		h := alert.Labels.Hash()
		alerts[h] = alert
	}
	return alerts, nil
}

// SetAlerts updates the alertsByRule map and writes state to file storage.
func (s *FileStore) SetAlerts(key uint64, alerts []*Alert) error {
	s.stateMtx.Lock()
	defer s.stateMtx.Unlock()

	// Update the in-memory state.
	if alerts != nil {
		s.alertsByRule[key] = alerts
	}
	// Flush the in-memory state to file storage.
	file, err := os.Create(s.path)
	if err != nil {
		return err
	}
	defer file.Close()

	encoder := json.NewEncoder(file)
	err = encoder.Encode(s.alertsByRule)
	if err != nil {
		return err
	}
	return nil
}
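As a quick illustration (not part of the commit) of how the FileStore API above can be exercised in isolation, a sketch along these lines should round-trip alert state through the JSON state file; the file name "alerts.json" and the fingerprint key 42 are arbitrary choices, and it assumes a build with this change applied:

package main

import (
	"fmt"
	"os"

	"github.com/prometheus/common/promslog"

	"github.com/prometheus/prometheus/model/labels"
	"github.com/prometheus/prometheus/rules"
)

func main() {
	// Illustrative values: a throwaway state file and an arbitrary rule fingerprint.
	store := rules.NewFileStore(promslog.NewNopLogger(), "alerts.json")
	defer os.Remove("alerts.json")

	alert := &rules.Alert{
		State:  rules.StateFiring,
		Labels: labels.FromStrings("alertname", "HTTPRequestRateLow"),
	}
	if err := store.SetAlerts(42, []*rules.Alert{alert}); err != nil {
		panic(err)
	}

	// GetAlerts returns the stored alerts keyed by their label hash.
	restored, err := store.GetAlerts(42)
	if err != nil {
		panic(err)
	}
	fmt.Println("restored alerts:", len(restored))
}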
rules/store_test.go (new file, 46 lines)
@@ -0,0 +1,46 @@
package rules

import (
	"os"
	"testing"
	"time"

	"github.com/prometheus/common/promslog"
	"github.com/prometheus/prometheus/model/labels"
	"github.com/stretchr/testify/require"
)

func TestAlertStore(t *testing.T) {
	alertStore := NewFileStore(promslog.NewNopLogger(), "alertstoretest")
	t.Cleanup(func() {
		os.Remove("alertstoretest")
	})

	alertsByRule := make(map[uint64][]*Alert)
	baseTime := time.Now()
	al1 := &Alert{State: StateFiring, Labels: labels.FromStrings("a1", "1"), Annotations: labels.FromStrings("annotation1", "a1"), ActiveAt: baseTime, KeepFiringSince: baseTime}
	al2 := &Alert{State: StateFiring, Labels: labels.FromStrings("a2", "2"), Annotations: labels.FromStrings("annotation2", "a2"), ActiveAt: baseTime, KeepFiringSince: baseTime}

	alertsByRule[1] = []*Alert{al1, al2}
	alertsByRule[2] = []*Alert{al2}
	alertsByRule[3] = []*Alert{al1}
	alertsByRule[4] = []*Alert{}

	for key, alerts := range alertsByRule {
		err := alertStore.SetAlerts(key, alerts)
		require.NoError(t, err)

		got, err := alertStore.GetAlerts(key)
		require.NoError(t, err)
		require.Equal(t, len(alerts), len(got))
		j := 0
		for _, al := range got {
			require.Equal(t, alerts[j].State, al.State)
			require.Equal(t, alerts[j].Labels, al.Labels)
			require.Equal(t, alerts[j].Annotations, al.Annotations)
			require.Equal(t, alerts[j].ActiveAt, al.ActiveAt)
			require.Equal(t, alerts[j].KeepFiringSince, al.KeepFiringSince)
			j++
		}
	}
}