From f1db699dffcdba6b27d81f9089d627635b888015 Mon Sep 17 00:00:00 2001 From: Ganesh Vernekar Date: Thu, 2 Aug 2018 15:48:24 +0530 Subject: [PATCH] Persist alert 'for' state across restarts (#4061) Signed-off-by: Ganesh Vernekar --- cmd/prometheus/main.go | 25 ++- rules/alerting.go | 56 ++++++- rules/alerting_test.go | 2 +- rules/manager.go | 180 +++++++++++++++++++-- rules/manager_test.go | 348 ++++++++++++++++++++++++++++++++++++++++- web/api/v1/api_test.go | 4 +- 6 files changed, 582 insertions(+), 33 deletions(-) diff --git a/cmd/prometheus/main.go b/cmd/prometheus/main.go index 5a44e21f7..fe3b78f0c 100644 --- a/cmd/prometheus/main.go +++ b/cmd/prometheus/main.go @@ -86,6 +86,8 @@ func main() { localStoragePath string notifier notifier.Options notifierTimeout model.Duration + forGracePeriod model.Duration + outageTolerance model.Duration web web.Options tsdb tsdb.Options lookbackDelta model.Duration @@ -164,6 +166,12 @@ func main() { a.Flag("storage.remote.flush-deadline", "How long to wait flushing sample on shutdown or config reload."). Default("1m").PlaceHolder("").SetValue(&cfg.RemoteFlushDeadline) + a.Flag("rules.alert.for-outage-tolerance", "Max time to tolerate prometheus outage for restoring 'for' state of alert."). + Default("1h").SetValue(&cfg.outageTolerance) + + a.Flag("rules.alert.for-grace-period", "Minimum duration between alert and restored 'for' state. This is maintained only for alerts with configured 'for' time greater than grace period."). + Default("10m").SetValue(&cfg.forGracePeriod) + a.Flag("alertmanager.notification-queue-capacity", "The capacity of the queue for pending Alertmanager notifications."). Default("10000").IntVar(&cfg.notifier.QueueCapacity) @@ -252,13 +260,16 @@ func main() { ) ruleManager = rules.NewManager(&rules.ManagerOptions{ - Appendable: fanoutStorage, - QueryFunc: rules.EngineQueryFunc(queryEngine, fanoutStorage), - NotifyFunc: sendAlerts(notifier, cfg.web.ExternalURL.String()), - Context: ctxRule, - ExternalURL: cfg.web.ExternalURL, - Registerer: prometheus.DefaultRegisterer, - Logger: log.With(logger, "component", "rule manager"), + Appendable: fanoutStorage, + TSDB: localStorage, + QueryFunc: rules.EngineQueryFunc(queryEngine, fanoutStorage), + NotifyFunc: sendAlerts(notifier, cfg.web.ExternalURL.String()), + Context: ctxRule, + ExternalURL: cfg.web.ExternalURL, + Registerer: prometheus.DefaultRegisterer, + Logger: log.With(logger, "component", "rule manager"), + OutageTolerance: time.Duration(cfg.outageTolerance), + ForGracePeriod: time.Duration(cfg.forGracePeriod), }) ) diff --git a/rules/alerting.go b/rules/alerting.go index 015ae3a91..a702126ff 100644 --- a/rules/alerting.go +++ b/rules/alerting.go @@ -39,6 +39,8 @@ import ( const ( // AlertMetricName is the metric name for synthetic alert timeseries. alertMetricName = "ALERTS" + // AlertForStateMetricName is the metric name for 'for' state of alert. + alertForStateMetricName = "ALERTS_FOR_STATE" // AlertNameLabel is the label name indicating the name of an alert. alertNameLabel = "alertname" @@ -103,6 +105,9 @@ type AlertingRule struct { annotations labels.Labels // Time in seconds taken to evaluate rule. evaluationDuration time.Duration + // true if old state has been restored. We start persisting samples for ALERT_FOR_STATE + // only after the restoration. + restored bool // Protects the below. mtx sync.Mutex @@ -114,7 +119,7 @@ type AlertingRule struct { } // NewAlertingRule constructs a new AlertingRule. 
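+// The restored argument seeds the restoration state of the rule; output samples
+// (ALERTS and ALERTS_FOR_STATE) are only written once the 'for' state is restored.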
-func NewAlertingRule(name string, vec promql.Expr, hold time.Duration, lbls, anns labels.Labels, logger log.Logger) *AlertingRule { +func NewAlertingRule(name string, vec promql.Expr, hold time.Duration, lbls, anns labels.Labels, restored bool, logger log.Logger) *AlertingRule { return &AlertingRule{ name: name, vector: vec, @@ -123,6 +128,7 @@ func NewAlertingRule(name string, vec promql.Expr, hold time.Duration, lbls, ann annotations: anns, active: map[uint64]*Alert{}, logger: logger, + restored: restored, } } @@ -173,6 +179,24 @@ func (r *AlertingRule) sample(alert *Alert, ts time.Time) promql.Sample { return s } +// forStateSample returns the sample for ALERTS_FOR_STATE. +func (r *AlertingRule) forStateSample(alert *Alert, ts time.Time, v float64) promql.Sample { + lb := labels.NewBuilder(r.labels) + + for _, l := range alert.Labels { + lb.Set(l.Name, l.Value) + } + + lb.Set(labels.MetricName, alertForStateMetricName) + lb.Set(labels.AlertName, r.name) + + s := promql.Sample{ + Metric: lb.Labels(), + Point: promql.Point{T: timestamp.FromTime(ts), V: v}, + } + return s +} + // SetEvaluationDuration updates evaluationDuration to the duration it took to evaluate the rule on its last evaluation. func (r *AlertingRule) SetEvaluationDuration(dur time.Duration) { r.mtx.Lock() @@ -187,6 +211,11 @@ func (r *AlertingRule) GetEvaluationDuration() time.Duration { return r.evaluationDuration } +// SetRestored updates the restoration state of the alerting rule. +func (r *AlertingRule) SetRestored(restored bool) { + r.restored = restored +} + // resolvedRetention is the duration for which a resolved alert instance // is kept in memory state and consequentally repeatedly sent to the AlertManager. const resolvedRetention = 15 * time.Minute @@ -206,6 +235,7 @@ func (r *AlertingRule) Eval(ctx context.Context, ts time.Time, query QueryFunc, // or update the expression value for existing elements. resultFPs := map[uint64]struct{}{} + var vec promql.Vector for _, smpl := range res { // Provide the alert information to the template. l := make(map[string]string, len(smpl.Metric)) @@ -274,7 +304,6 @@ func (r *AlertingRule) Eval(ctx context.Context, ts time.Time, query QueryFunc, } } - var vec promql.Vector // Check if any pending alerts should be removed or fire now. Write out alert timeseries. for fp, a := range r.active { if _, ok := resultFPs[fp]; !ok { @@ -295,7 +324,10 @@ func (r *AlertingRule) Eval(ctx context.Context, ts time.Time, query QueryFunc, a.FiredAt = ts } - vec = append(vec, r.sample(a, ts)) + if r.restored { + vec = append(vec, r.sample(a, ts)) + vec = append(vec, r.forStateSample(a, ts, float64(a.ActiveAt.Unix()))) + } } return vec, nil @@ -342,6 +374,19 @@ func (r *AlertingRule) currentAlerts() []*Alert { return alerts } +// ForEachActiveAlert runs the given function on each alert. +// This should be used when you want to use the actual alerts from the AlertingRule +// and not on its copy. +// If you want to run on a copy of alerts then don't use this, get the alerts from 'ActiveAlerts()'. +func (r *AlertingRule) ForEachActiveAlert(f func(*Alert)) { + r.mtx.Lock() + defer r.mtx.Unlock() + + for _, a := range r.active { + f(a) + } +} + func (r *AlertingRule) String() string { ar := rulefmt.Rule{ Alert: r.name, @@ -392,3 +437,8 @@ func (r *AlertingRule) HTMLSnippet(pathPrefix string) html_template.HTML { } return html_template.HTML(byt) } + +// HoldDuration returns the holdDuration of the alerting rule. 
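+// It is used by Group.RestoreForState to decide whether restoring the 'for'
+// state is worthwhile and how far ActiveAt needs to be shifted.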
+func (r *AlertingRule) HoldDuration() time.Duration { + return r.holdDuration +} diff --git a/rules/alerting_test.go b/rules/alerting_test.go index 96c317ec5..e3ec1da09 100644 --- a/rules/alerting_test.go +++ b/rules/alerting_test.go @@ -24,7 +24,7 @@ import ( func TestAlertingRuleHTMLSnippet(t *testing.T) { expr, err := promql.ParseExpr(`foo{html="BOLD"}`) testutil.Ok(t, err) - rule := NewAlertingRule("testrule", expr, 0, labels.FromStrings("html", "BOLD"), labels.FromStrings("html", "BOLD"), nil) + rule := NewAlertingRule("testrule", expr, 0, labels.FromStrings("html", "BOLD"), labels.FromStrings("html", "BOLD"), false, nil) const want = `alert: testrule expr: foo{html="<b>BOLD<b>"} diff --git a/rules/manager.go b/rules/manager.go index 7ddca7432..ad78704c0 100644 --- a/rules/manager.go +++ b/rules/manager.go @@ -30,6 +30,7 @@ import ( opentracing "github.com/opentracing/opentracing-go" "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/common/model" "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/pkg/rulefmt" "github.com/prometheus/prometheus/pkg/timestamp" @@ -158,6 +159,8 @@ type Group struct { evaluationDuration time.Duration mtx sync.Mutex + shouldRestore bool + done chan struct{} terminated chan struct{} @@ -165,12 +168,13 @@ type Group struct { } // NewGroup makes a new Group with the given name, options, and rules. -func NewGroup(name, file string, interval time.Duration, rules []Rule, opts *ManagerOptions) *Group { +func NewGroup(name, file string, interval time.Duration, rules []Rule, shouldRestore bool, opts *ManagerOptions) *Group { return &Group{ name: name, file: file, interval: interval, rules: rules, + shouldRestore: shouldRestore, opts: opts, seriesInPreviousEval: make([]map[string]labels.Labels, len(rules)), done: make(chan struct{}), @@ -220,6 +224,28 @@ func (g *Group) run(ctx context.Context) { defer tick.Stop() iter() + if g.shouldRestore { + // If we have to restore, we wait for another Eval to finish. + // The reason behind this is, during first eval (or before it) + // we might not have enough data scraped, and recording rules would not + // have updated the latest values, on which some alerts might depend. + select { + case <-g.done: + return + case <-tick.C: + missed := (time.Since(evalTimestamp) / g.interval) - 1 + if missed > 0 { + iterationsMissed.Add(float64(missed)) + iterationsScheduled.Add(float64(missed)) + } + evalTimestamp = evalTimestamp.Add((missed + 1) * g.interval) + iter() + } + + g.RestoreForState(time.Now()) + g.shouldRestore = false + } + for { select { case <-g.done: @@ -410,12 +436,133 @@ func (g *Group) Eval(ctx context.Context, ts time.Time) { } } +// RestoreForState restores the 'for' state of the alerts +// by looking up last ActiveAt from storage. +func (g *Group) RestoreForState(ts time.Time) { + maxtMS := int64(model.TimeFromUnixNano(ts.UnixNano())) + // We allow restoration only if alerts were active before after certain time. 
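+	// Concretely, only the window [ts - OutageTolerance, ts] is queried below:
+	// if the last ALERTS_FOR_STATE sample of an alert is older than the outage
+	// tolerance, nothing is found and its 'for' state is not restored.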
+ mint := ts.Add(-g.opts.OutageTolerance) + mintMS := int64(model.TimeFromUnixNano(mint.UnixNano())) + q, err := g.opts.TSDB.Querier(g.opts.Context, mintMS, maxtMS) + if err != nil { + level.Error(g.logger).Log("msg", "Failed to get Querier", "err", err) + return + } + + for _, rule := range g.Rules() { + alertRule, ok := rule.(*AlertingRule) + if !ok { + continue + } + + alertHoldDuration := alertRule.HoldDuration() + if alertHoldDuration < g.opts.ForGracePeriod { + // If alertHoldDuration is already less than grace period, we would not + // like to make it wait for `g.opts.ForGracePeriod` time before firing. + // Hence we skip restoration, which will make it wait for alertHoldDuration. + alertRule.SetRestored(true) + continue + } + + alertRule.ForEachActiveAlert(func(a *Alert) { + smpl := alertRule.forStateSample(a, time.Now(), 0) + var matchers []*labels.Matcher + for _, l := range smpl.Metric { + mt, _ := labels.NewMatcher(labels.MatchEqual, l.Name, l.Value) + matchers = append(matchers, mt) + } + + sset, err := q.Select(nil, matchers...) + if err != nil { + level.Error(g.logger).Log("msg", "Failed to restore 'for' state", + labels.AlertName, alertRule.Name(), "stage", "Select", "err", err) + return + } + + seriesFound := false + var s storage.Series + for sset.Next() { + // Query assures that smpl.Metric is included in sset.At().Labels(), + // hence just checking the length would act like equality. + // (This is faster than calling labels.Compare again as we already have some info). + if len(sset.At().Labels()) == len(smpl.Metric) { + s = sset.At() + seriesFound = true + break + } + } + + if !seriesFound { + return + } + + // Series found for the 'for' state. + var t int64 + var v float64 + it := s.Iterator() + for it.Next() { + t, v = it.At() + } + if it.Err() != nil { + level.Error(g.logger).Log("msg", "Failed to restore 'for' state", + labels.AlertName, alertRule.Name(), "stage", "Iterator", "err", it.Err()) + return + } + if value.IsStaleNaN(v) { // Alert was not active. + return + } + + downAt := time.Unix(t/1000, 0) + restoredActiveAt := time.Unix(int64(v), 0) + timeSpentPending := downAt.Sub(restoredActiveAt) + timeRemainingPending := alertHoldDuration - timeSpentPending + + if timeRemainingPending <= 0 { + // It means that alert was firing when prometheus went down. + // In the next Eval, the state of this alert will be set back to + // firing again if it's still firing in that Eval. + // Nothing to be done in this case. + } else if timeRemainingPending < g.opts.ForGracePeriod { + // (new) restoredActiveAt = (ts + m.opts.ForGracePeriod) - alertHoldDuration + // /* new firing time */ /* moving back by hold duration */ + // + // Proof of correctness: + // firingTime = restoredActiveAt.Add(alertHoldDuration) + // = ts + m.opts.ForGracePeriod - alertHoldDuration + alertHoldDuration + // = ts + m.opts.ForGracePeriod + // + // Time remaining to fire = firingTime.Sub(ts) + // = (ts + m.opts.ForGracePeriod) - ts + // = m.opts.ForGracePeriod + restoredActiveAt = ts.Add(g.opts.ForGracePeriod).Add(-alertHoldDuration) + } else { + // By shifting ActiveAt to the future (ActiveAt + some_duration), + // the total pending time from the original ActiveAt + // would be `alertHoldDuration + some_duration`. + // Here, some_duration = downDuration. 
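+				// For example: with 'for: 25m', an alert that had been pending for 10m
+				// when Prometheus went down and is restored 5m later gets ActiveAt
+				// shifted forward by those 5m, so it still waits the remaining 15m
+				// of pending time before firing.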
+ downDuration := ts.Sub(downAt) + restoredActiveAt = restoredActiveAt.Add(downDuration) + } + + a.ActiveAt = restoredActiveAt + level.Debug(g.logger).Log("msg", "'for' state restored", + labels.AlertName, alertRule.Name(), "restored_time", a.ActiveAt.Format(time.RFC850), + "labels", a.Labels.String()) + + }) + + alertRule.SetRestored(true) + } + +} + // The Manager manages recording and alerting rules. type Manager struct { - opts *ManagerOptions - groups map[string]*Group - mtx sync.RWMutex - block chan struct{} + opts *ManagerOptions + groups map[string]*Group + mtx sync.RWMutex + block chan struct{} + restored bool logger log.Logger } @@ -430,13 +577,16 @@ type NotifyFunc func(ctx context.Context, expr string, alerts ...*Alert) error // ManagerOptions bundles options for the Manager. type ManagerOptions struct { - ExternalURL *url.URL - QueryFunc QueryFunc - NotifyFunc NotifyFunc - Context context.Context - Appendable Appendable - Logger log.Logger - Registerer prometheus.Registerer + ExternalURL *url.URL + QueryFunc QueryFunc + NotifyFunc NotifyFunc + Context context.Context + Appendable Appendable + TSDB storage.Storage + Logger log.Logger + Registerer prometheus.Registerer + OutageTolerance time.Duration + ForGracePeriod time.Duration } // NewManager returns an implementation of Manager, ready to be started @@ -486,6 +636,7 @@ func (m *Manager) Update(interval time.Duration, files []string) error { } return errors.New("error loading rules, previous rule set restored") } + m.restored = true var wg sync.WaitGroup @@ -531,6 +682,8 @@ func (m *Manager) Update(interval time.Duration, files []string) error { func (m *Manager) loadGroups(interval time.Duration, filenames ...string) (map[string]*Group, []error) { groups := make(map[string]*Group) + shouldRestore := !m.restored + for _, fn := range filenames { rgs, errs := rulefmt.ParseFile(fn) if errs != nil { @@ -557,6 +710,7 @@ func (m *Manager) loadGroups(interval time.Duration, filenames ...string) (map[s time.Duration(r.For), labels.FromMap(r.Labels), labels.FromMap(r.Annotations), + m.restored, log.With(m.logger, "alert", r.Alert), )) continue @@ -568,7 +722,7 @@ func (m *Manager) loadGroups(interval time.Duration, filenames ...string) (map[s )) } - groups[groupKey(rg.Name, fn)] = NewGroup(rg.Name, fn, itv, rules, m.opts) + groups[groupKey(rg.Name, fn)] = NewGroup(rg.Name, fn, itv, rules, shouldRestore, m.opts) } } diff --git a/rules/manager_test.go b/rules/manager_test.go index 9860f098b..89a1620c5 100644 --- a/rules/manager_test.go +++ b/rules/manager_test.go @@ -51,7 +51,7 @@ func TestAlertingRule(t *testing.T) { expr, time.Minute, labels.FromStrings("severity", "{{\"c\"}}ritical"), - nil, nil, + nil, true, nil, ) result := promql.Vector{ { @@ -146,15 +146,25 @@ func TestAlertingRule(t *testing.T) { res, err := rule.Eval(suite.Context(), evalTime, EngineQueryFunc(suite.QueryEngine(), suite.Storage()), nil) testutil.Ok(t, err) + var filteredRes promql.Vector // After removing 'ALERTS_FOR_STATE' samples. + for _, smpl := range res { + smplName := smpl.Metric.Get("__name__") + if smplName == "ALERTS" { + filteredRes = append(filteredRes, smpl) + } else { + // If not 'ALERTS', it has to be 'ALERTS_FOR_STATE'. + testutil.Equals(t, smplName, "ALERTS_FOR_STATE") + } + } for i := range test.result { test.result[i].T = timestamp.FromTime(evalTime) } - testutil.Assert(t, len(test.result) == len(res), "%d. Number of samples in expected and actual output don't match (%d vs. 
%d)", i, len(test.result), len(res)) + testutil.Assert(t, len(test.result) == len(filteredRes), "%d. Number of samples in expected and actual output don't match (%d vs. %d)", i, len(test.result), len(res)) - sort.Slice(res, func(i, j int) bool { - return labels.Compare(res[i].Metric, res[j].Metric) < 0 + sort.Slice(filteredRes, func(i, j int) bool { + return labels.Compare(filteredRes[i].Metric, filteredRes[j].Metric) < 0 }) - testutil.Equals(t, test.result, res) + testutil.Equals(t, test.result, filteredRes) for _, aa := range rule.ActiveAlerts() { testutil.Assert(t, aa.Labels.Get(model.MetricNameLabel) == "", "%s label set on active alert: %s", model.MetricNameLabel, aa.Labels) @@ -162,6 +172,327 @@ func TestAlertingRule(t *testing.T) { } } +func TestForStateAddSamples(t *testing.T) { + suite, err := promql.NewTest(t, ` + load 5m + http_requests{job="app-server", instance="0", group="canary", severity="overwrite-me"} 75 85 95 105 105 95 85 + http_requests{job="app-server", instance="1", group="canary", severity="overwrite-me"} 80 90 100 110 120 130 140 + `) + testutil.Ok(t, err) + defer suite.Close() + + err = suite.Run() + testutil.Ok(t, err) + + expr, err := promql.ParseExpr(`http_requests{group="canary", job="app-server"} < 100`) + testutil.Ok(t, err) + + rule := NewAlertingRule( + "HTTPRequestRateLow", + expr, + time.Minute, + labels.FromStrings("severity", "{{\"c\"}}ritical"), + nil, true, nil, + ) + result := promql.Vector{ + { + Metric: labels.FromStrings( + "__name__", "ALERTS_FOR_STATE", + "alertname", "HTTPRequestRateLow", + "group", "canary", + "instance", "0", + "job", "app-server", + "severity", "critical", + ), + Point: promql.Point{V: 1}, + }, + { + Metric: labels.FromStrings( + "__name__", "ALERTS_FOR_STATE", + "alertname", "HTTPRequestRateLow", + "group", "canary", + "instance", "1", + "job", "app-server", + "severity", "critical", + ), + Point: promql.Point{V: 1}, + }, + { + Metric: labels.FromStrings( + "__name__", "ALERTS_FOR_STATE", + "alertname", "HTTPRequestRateLow", + "group", "canary", + "instance", "0", + "job", "app-server", + "severity", "critical", + ), + Point: promql.Point{V: 1}, + }, + { + Metric: labels.FromStrings( + "__name__", "ALERTS_FOR_STATE", + "alertname", "HTTPRequestRateLow", + "group", "canary", + "instance", "1", + "job", "app-server", + "severity", "critical", + ), + Point: promql.Point{V: 1}, + }, + } + + baseTime := time.Unix(0, 0) + + var tests = []struct { + time time.Duration + result promql.Vector + persistThisTime bool // If true, it means this 'time' is persisted for 'for'. 
+ }{ + { + time: 0, + result: append(promql.Vector{}, result[:2]...), + persistThisTime: true, + }, + { + time: 5 * time.Minute, + result: append(promql.Vector{}, result[2:]...), + }, + { + time: 10 * time.Minute, + result: append(promql.Vector{}, result[2:3]...), + }, + { + time: 15 * time.Minute, + result: nil, + }, + { + time: 20 * time.Minute, + result: nil, + }, + { + time: 25 * time.Minute, + result: append(promql.Vector{}, result[:1]...), + persistThisTime: true, + }, + { + time: 30 * time.Minute, + result: append(promql.Vector{}, result[2:3]...), + }, + } + + var forState float64 + for i, test := range tests { + t.Logf("case %d", i) + evalTime := baseTime.Add(test.time) + + if test.persistThisTime { + forState = float64(evalTime.Unix()) + } + if test.result == nil { + forState = float64(value.StaleNaN) + } + + res, err := rule.Eval(suite.Context(), evalTime, EngineQueryFunc(suite.QueryEngine(), suite.Storage()), nil) + testutil.Ok(t, err) + + var filteredRes promql.Vector // After removing 'ALERTS' samples. + for _, smpl := range res { + smplName := smpl.Metric.Get("__name__") + if smplName == "ALERTS_FOR_STATE" { + filteredRes = append(filteredRes, smpl) + } else { + // If not 'ALERTS_FOR_STATE', it has to be 'ALERTS'. + testutil.Equals(t, smplName, "ALERTS") + } + } + for i := range test.result { + test.result[i].T = timestamp.FromTime(evalTime) + // Updating the expected 'for' state. + if test.result[i].V >= 0 { + test.result[i].V = forState + } + } + testutil.Assert(t, len(test.result) == len(filteredRes), "%d. Number of samples in expected and actual output don't match (%d vs. %d)", i, len(test.result), len(res)) + + sort.Slice(filteredRes, func(i, j int) bool { + return labels.Compare(filteredRes[i].Metric, filteredRes[j].Metric) < 0 + }) + testutil.Equals(t, test.result, filteredRes) + + for _, aa := range rule.ActiveAlerts() { + testutil.Assert(t, aa.Labels.Get(model.MetricNameLabel) == "", "%s label set on active alert: %s", model.MetricNameLabel, aa.Labels) + } + + } +} + +func TestForStateRestore(t *testing.T) { + + suite, err := promql.NewTest(t, ` + load 5m + http_requests{job="app-server", instance="0", group="canary", severity="overwrite-me"} 75 85 50 0 0 25 0 0 40 0 120 + http_requests{job="app-server", instance="1", group="canary", severity="overwrite-me"} 125 90 60 0 0 25 0 0 40 0 130 + `) + testutil.Ok(t, err) + defer suite.Close() + + err = suite.Run() + testutil.Ok(t, err) + + expr, err := promql.ParseExpr(`http_requests{group="canary", job="app-server"} < 100`) + testutil.Ok(t, err) + + opts := &ManagerOptions{ + QueryFunc: EngineQueryFunc(suite.QueryEngine(), suite.Storage()), + Appendable: suite.Storage(), + TSDB: suite.Storage(), + Context: context.Background(), + Logger: log.NewNopLogger(), + NotifyFunc: func(ctx context.Context, expr string, alerts ...*Alert) error { + return nil + }, + OutageTolerance: 30 * time.Minute, + ForGracePeriod: 10 * time.Minute, + } + + alertForDuration := 25 * time.Minute + // Initial run before prometheus goes down. 
+ rule := NewAlertingRule( + "HTTPRequestRateLow", + expr, + alertForDuration, + labels.FromStrings("severity", "critical"), + nil, true, nil, + ) + + group := NewGroup("default", "", time.Second, []Rule{rule}, true, opts) + + groups := make(map[string]*Group) + groups["default;"] = group + + initialRuns := []time.Duration{0, 5 * time.Minute} + + baseTime := time.Unix(0, 0) + for _, duration := range initialRuns { + evalTime := baseTime.Add(duration) + group.Eval(suite.Context(), evalTime) + } + + exp := rule.ActiveAlerts() + for _, aa := range exp { + testutil.Assert(t, aa.Labels.Get(model.MetricNameLabel) == "", "%s label set on active alert: %s", model.MetricNameLabel, aa.Labels) + } + sort.Slice(exp, func(i, j int) bool { + return labels.Compare(exp[i].Labels, exp[j].Labels) < 0 + }) + + // Prometheus goes down here. We create new rules and groups. + + type testInput struct { + restoreDuration time.Duration + alerts []*Alert + + num int + noRestore bool + gracePeriod bool + downDuration time.Duration + } + + tests := []testInput{ + { + // Normal restore (alerts were not firing). + restoreDuration: 10 * time.Minute, + alerts: rule.ActiveAlerts(), + downDuration: 5 * time.Minute, + }, + { + // Testing Outage Tolerance. + restoreDuration: 40 * time.Minute, + noRestore: true, + num: 2, + }, + { + // No active alerts. + restoreDuration: 50 * time.Minute, + alerts: []*Alert{}, + }, + } + + testFunc := func(tst testInput) { + newRule := NewAlertingRule( + "HTTPRequestRateLow", + expr, + alertForDuration, + labels.FromStrings("severity", "critical"), + nil, false, nil, + ) + newGroup := NewGroup("default", "", time.Second, []Rule{newRule}, true, opts) + + newGroups := make(map[string]*Group) + newGroups["default;"] = newGroup + + m := NewManager(opts) + m.mtx.Lock() + m.groups = newGroups + m.mtx.Unlock() + + restoreTime := baseTime.Add(tst.restoreDuration) + // First eval before restoration. + newGroup.Eval(suite.Context(), restoreTime) + // Restore happens here. + newGroup.RestoreForState(restoreTime) + + got := newRule.ActiveAlerts() + for _, aa := range got { + testutil.Assert(t, aa.Labels.Get(model.MetricNameLabel) == "", "%s label set on active alert: %s", model.MetricNameLabel, aa.Labels) + } + sort.Slice(got, func(i, j int) bool { + return labels.Compare(got[i].Labels, got[j].Labels) < 0 + }) + + // Checking if we have restored it correctly. + + if tst.noRestore { + testutil.Equals(t, tst.num, len(got)) + for _, e := range got { + testutil.Equals(t, e.ActiveAt, restoreTime) + } + } else if tst.gracePeriod { + testutil.Equals(t, tst.num, len(got)) + for _, e := range got { + testutil.Equals(t, opts.ForGracePeriod, e.ActiveAt.Add(alertForDuration).Sub(restoreTime)) + } + } else { + exp := tst.alerts + testutil.Equals(t, len(exp), len(got)) + for i, e := range exp { + testutil.Equals(t, e.Labels, got[i].Labels) + + // Difference in time should be within 1e6 ns, i.e. 1ms + // (due to conversion between ns & ms, float64 & int64). + activeAtDiff := float64(e.ActiveAt.Unix() + int64(tst.downDuration/time.Second) - got[i].ActiveAt.Unix()) + testutil.Assert(t, math.Abs(activeAtDiff) == 0, "'for' state restored time is wrong") + } + } + } + + for _, tst := range tests { + testFunc(tst) + } + + // Testing the grace period. 
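+	// The evaluations below extend the persisted pending time to 20m, so after the
+	// "restart" at 25m only 5m of the 25m 'for' duration is left. That is less than
+	// the 10m grace period, so ActiveAt is shifted such that the alert fires
+	// ForGracePeriod after the restore.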
+ for _, duration := range []time.Duration{10 * time.Minute, 15 * time.Minute, 20 * time.Minute} { + evalTime := baseTime.Add(duration) + group.Eval(suite.Context(), evalTime) + } + testFunc(testInput{ + restoreDuration: 25 * time.Minute, + alerts: []*Alert{}, + gracePeriod: true, + num: 2, + }) +} + func TestStaleness(t *testing.T) { storage := testutil.NewStorage(t) defer storage.Close() @@ -169,6 +500,7 @@ func TestStaleness(t *testing.T) { opts := &ManagerOptions{ QueryFunc: EngineQueryFunc(engine, storage), Appendable: storage, + TSDB: storage, Context: context.Background(), Logger: log.NewNopLogger(), } @@ -176,7 +508,7 @@ func TestStaleness(t *testing.T) { expr, err := promql.ParseExpr("a + 1") testutil.Ok(t, err) rule := NewRecordingRule("a_plus_one", expr, labels.Labels{}) - group := NewGroup("default", "", time.Second, []Rule{rule}, opts) + group := NewGroup("default", "", time.Second, []Rule{rule}, true, opts) // A time series that has two samples and then goes stale. app, _ := storage.Appender() @@ -244,7 +576,7 @@ func readSeriesSet(ss storage.SeriesSet) (map[string][]promql.Point, error) { func TestCopyState(t *testing.T) { oldGroup := &Group{ rules: []Rule{ - NewAlertingRule("alert", nil, 0, nil, nil, nil), + NewAlertingRule("alert", nil, 0, nil, nil, true, nil), NewRecordingRule("rule1", nil, nil), NewRecordingRule("rule2", nil, nil), NewRecordingRule("rule3", nil, nil), @@ -265,7 +597,7 @@ func TestCopyState(t *testing.T) { NewRecordingRule("rule3", nil, nil), NewRecordingRule("rule3", nil, nil), NewRecordingRule("rule3", nil, nil), - NewAlertingRule("alert", nil, 0, nil, nil, nil), + NewAlertingRule("alert", nil, 0, nil, nil, true, nil), NewRecordingRule("rule1", nil, nil), NewRecordingRule("rule4", nil, nil), }, diff --git a/web/api/v1/api_test.go b/web/api/v1/api_test.go index e0d0f23c5..53ee38987 100644 --- a/web/api/v1/api_test.go +++ b/web/api/v1/api_test.go @@ -121,6 +121,7 @@ func (m rulesRetrieverMock) AlertingRules() []*rules.AlertingRule { time.Second, labels.Labels{}, labels.Labels{}, + true, log.NewNopLogger(), ) rule2 := rules.NewAlertingRule( @@ -129,6 +130,7 @@ func (m rulesRetrieverMock) AlertingRules() []*rules.AlertingRule { time.Second, labels.Labels{}, labels.Labels{}, + true, log.NewNopLogger(), ) var r []*rules.AlertingRule @@ -164,7 +166,7 @@ func (m rulesRetrieverMock) RuleGroups() []*rules.Group { recordingRule := rules.NewRecordingRule("recording-rule-1", recordingExpr, labels.Labels{}) r = append(r, recordingRule) - group := rules.NewGroup("grp", "/path/to/file", time.Second, r, opts) + group := rules.NewGroup("grp", "/path/to/file", time.Second, r, false, opts) return []*rules.Group{group} }
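Note (not part of the patch): below is a minimal sketch of how a caller might wire a rules.Manager so that the new 'for' state restoration can run, mirroring the cmd/prometheus/main.go changes above. The package and helper names are hypothetical, the concrete storage, query, notify, URL and logger values are assumed to come from the caller, and the two durations simply mirror the defaults of the new --rules.alert.for-outage-tolerance and --rules.alert.for-grace-period flags.

package rulewiring // hypothetical helper package, for illustration only

import (
	"context"
	"net/url"
	"time"

	"github.com/go-kit/kit/log"

	"github.com/prometheus/prometheus/rules"
	"github.com/prometheus/prometheus/storage"
)

// NewRestoringManager builds a rule manager whose groups persist and restore
// the 'for' state of alerts. The same storage is used for appending the
// ALERTS/ALERTS_FOR_STATE series and for reading them back in RestoreForState.
func NewRestoringManager(st storage.Storage, qf rules.QueryFunc, nf rules.NotifyFunc, extURL *url.URL, logger log.Logger) *rules.Manager {
	return rules.NewManager(&rules.ManagerOptions{
		Appendable:      st, // ALERTS and ALERTS_FOR_STATE samples are appended here.
		TSDB:            st, // RestoreForState reads the last ALERTS_FOR_STATE value from here.
		QueryFunc:       qf,
		NotifyFunc:      nf,
		Context:         context.Background(),
		ExternalURL:     extURL,
		Logger:          logger,
		OutageTolerance: time.Hour,        // --rules.alert.for-outage-tolerance default
		ForGracePeriod:  10 * time.Minute, // --rules.alert.for-grace-period default
	})
}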