Persist alert 'for' state across restarts (#4061)

Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in>
Ganesh Vernekar 2018-08-02 15:48:24 +05:30 committed by Brian Brazil
parent f978f5bba3
commit f1db699dff
6 changed files with 582 additions and 33 deletions

View file

@ -86,6 +86,8 @@ func main() {
localStoragePath string
notifier notifier.Options
notifierTimeout model.Duration
forGracePeriod model.Duration
outageTolerance model.Duration
web web.Options
tsdb tsdb.Options
lookbackDelta model.Duration
@ -164,6 +166,12 @@ func main() {
a.Flag("storage.remote.flush-deadline", "How long to wait flushing sample on shutdown or config reload."). a.Flag("storage.remote.flush-deadline", "How long to wait flushing sample on shutdown or config reload.").
Default("1m").PlaceHolder("<duration>").SetValue(&cfg.RemoteFlushDeadline) Default("1m").PlaceHolder("<duration>").SetValue(&cfg.RemoteFlushDeadline)
a.Flag("rules.alert.for-outage-tolerance", "Max time to tolerate prometheus outage for restoring 'for' state of alert.").
Default("1h").SetValue(&cfg.outageTolerance)
a.Flag("rules.alert.for-grace-period", "Minimum duration between alert and restored 'for' state. This is maintained only for alerts with configured 'for' time greater than grace period.").
Default("10m").SetValue(&cfg.forGracePeriod)
a.Flag("alertmanager.notification-queue-capacity", "The capacity of the queue for pending Alertmanager notifications."). a.Flag("alertmanager.notification-queue-capacity", "The capacity of the queue for pending Alertmanager notifications.").
Default("10000").IntVar(&cfg.notifier.QueueCapacity) Default("10000").IntVar(&cfg.notifier.QueueCapacity)
@ -252,13 +260,16 @@ func main() {
)
ruleManager = rules.NewManager(&rules.ManagerOptions{
Appendable: fanoutStorage,
TSDB: localStorage,
QueryFunc: rules.EngineQueryFunc(queryEngine, fanoutStorage),
NotifyFunc: sendAlerts(notifier, cfg.web.ExternalURL.String()),
Context: ctxRule,
ExternalURL: cfg.web.ExternalURL,
Registerer: prometheus.DefaultRegisterer,
Logger: log.With(logger, "component", "rule manager"),
OutageTolerance: time.Duration(cfg.outageTolerance),
ForGracePeriod: time.Duration(cfg.forGracePeriod),
})
)

View file

@ -39,6 +39,8 @@ import (
const (
// AlertMetricName is the metric name for synthetic alert timeseries.
alertMetricName = "ALERTS"
// AlertForStateMetricName is the metric name for 'for' state of alert.
alertForStateMetricName = "ALERTS_FOR_STATE"
// AlertNameLabel is the label name indicating the name of an alert.
alertNameLabel = "alertname"
@ -103,6 +105,9 @@ type AlertingRule struct {
annotations labels.Labels
// Time in seconds taken to evaluate rule.
evaluationDuration time.Duration
// true if old state has been restored. We start persisting samples for ALERTS_FOR_STATE
// only after the restoration.
restored bool
// Protects the below.
mtx sync.Mutex
@ -114,7 +119,7 @@ type AlertingRule struct {
}
// NewAlertingRule constructs a new AlertingRule.
func NewAlertingRule(name string, vec promql.Expr, hold time.Duration, lbls, anns labels.Labels, restored bool, logger log.Logger) *AlertingRule {
return &AlertingRule{
name: name,
vector: vec,
@ -123,6 +128,7 @@ func NewAlertingRule(name string, vec promql.Expr, hold time.Duration, lbls, ann
annotations: anns,
active: map[uint64]*Alert{},
logger: logger,
restored: restored,
}
}
@ -173,6 +179,24 @@ func (r *AlertingRule) sample(alert *Alert, ts time.Time) promql.Sample {
return s
}
// forStateSample returns the sample for ALERTS_FOR_STATE.
func (r *AlertingRule) forStateSample(alert *Alert, ts time.Time, v float64) promql.Sample {
lb := labels.NewBuilder(r.labels)
for _, l := range alert.Labels {
lb.Set(l.Name, l.Value)
}
lb.Set(labels.MetricName, alertForStateMetricName)
lb.Set(labels.AlertName, r.name)
s := promql.Sample{
Metric: lb.Labels(),
Point: promql.Point{T: timestamp.FromTime(ts), V: v},
}
return s
}
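
The sample built by forStateSample is a series named ALERTS_FOR_STATE carrying the alert's labels; Eval (below) writes it with the alert's ActiveAt encoded as a Unix timestamp in seconds, and RestoreForState later decodes that value. A minimal stdlib-only Go sketch of this round trip, with an invented timestamp purely for illustration:

package main

import (
	"fmt"
	"time"
)

func main() {
	// What Eval stores in the ALERTS_FOR_STATE sample value (illustrative time).
	activeAt := time.Date(2018, 8, 2, 10, 0, 0, 0, time.UTC)
	v := float64(activeAt.Unix())

	// What RestoreForState decodes; sub-second precision is not retained.
	restored := time.Unix(int64(v), 0).UTC()
	fmt.Println(v, restored) // 1.533204e+09 2018-08-02 10:00:00 +0000 UTC
}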
// SetEvaluationDuration updates evaluationDuration to the duration it took to evaluate the rule on its last evaluation.
func (r *AlertingRule) SetEvaluationDuration(dur time.Duration) {
r.mtx.Lock()
@ -187,6 +211,11 @@ func (r *AlertingRule) GetEvaluationDuration() time.Duration {
return r.evaluationDuration
}
// SetRestored updates the restoration state of the alerting rule.
func (r *AlertingRule) SetRestored(restored bool) {
r.restored = restored
}
// resolvedRetention is the duration for which a resolved alert instance
// is kept in memory state and consequentally repeatedly sent to the AlertManager.
const resolvedRetention = 15 * time.Minute
@ -206,6 +235,7 @@ func (r *AlertingRule) Eval(ctx context.Context, ts time.Time, query QueryFunc,
// or update the expression value for existing elements.
resultFPs := map[uint64]struct{}{}
var vec promql.Vector
for _, smpl := range res {
// Provide the alert information to the template.
l := make(map[string]string, len(smpl.Metric))
@ -274,7 +304,6 @@ func (r *AlertingRule) Eval(ctx context.Context, ts time.Time, query QueryFunc,
}
}
// Check if any pending alerts should be removed or fire now. Write out alert timeseries.
for fp, a := range r.active {
if _, ok := resultFPs[fp]; !ok {
@ -295,7 +324,10 @@ func (r *AlertingRule) Eval(ctx context.Context, ts time.Time, query QueryFunc,
a.FiredAt = ts
}
if r.restored {
vec = append(vec, r.sample(a, ts))
vec = append(vec, r.forStateSample(a, ts, float64(a.ActiveAt.Unix())))
}
}
return vec, nil
@ -342,6 +374,19 @@ func (r *AlertingRule) currentAlerts() []*Alert {
return alerts
}
// ForEachActiveAlert runs the given function on each alert.
// This should be used when you want to use the actual alerts from the AlertingRule
// and not a copy of them.
// If you want to work on a copy of the alerts instead, get them from 'ActiveAlerts()'.
func (r *AlertingRule) ForEachActiveAlert(f func(*Alert)) {
r.mtx.Lock()
defer r.mtx.Unlock()
for _, a := range r.active {
f(a)
}
}
func (r *AlertingRule) String() string {
ar := rulefmt.Rule{
Alert: r.name,
@ -392,3 +437,8 @@ func (r *AlertingRule) HTMLSnippet(pathPrefix string) html_template.HTML {
}
return html_template.HTML(byt)
}
// HoldDuration returns the holdDuration of the alerting rule.
func (r *AlertingRule) HoldDuration() time.Duration {
return r.holdDuration
}

View file

@ -24,7 +24,7 @@ import (
func TestAlertingRuleHTMLSnippet(t *testing.T) {
expr, err := promql.ParseExpr(`foo{html="<b>BOLD<b>"}`)
testutil.Ok(t, err)
rule := NewAlertingRule("testrule", expr, 0, labels.FromStrings("html", "<b>BOLD</b>"), labels.FromStrings("html", "<b>BOLD</b>"), false, nil)
const want = `alert: <a href="/test/prefix/graph?g0.expr=ALERTS%7Balertname%3D%22testrule%22%7D&g0.tab=1">testrule</a>
expr: <a href="/test/prefix/graph?g0.expr=foo%7Bhtml%3D%22%3Cb%3EBOLD%3Cb%3E%22%7D&g0.tab=1">foo{html=&#34;&lt;b&gt;BOLD&lt;b&gt;&#34;}</a>

View file

@ -30,6 +30,7 @@ import (
opentracing "github.com/opentracing/opentracing-go" opentracing "github.com/opentracing/opentracing-go"
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/pkg/rulefmt" "github.com/prometheus/prometheus/pkg/rulefmt"
"github.com/prometheus/prometheus/pkg/timestamp" "github.com/prometheus/prometheus/pkg/timestamp"
@ -158,6 +159,8 @@ type Group struct {
evaluationDuration time.Duration
mtx sync.Mutex
shouldRestore bool
done chan struct{}
terminated chan struct{}
@ -165,12 +168,13 @@ type Group struct {
}
// NewGroup makes a new Group with the given name, options, and rules.
func NewGroup(name, file string, interval time.Duration, rules []Rule, shouldRestore bool, opts *ManagerOptions) *Group {
return &Group{
name: name,
file: file,
interval: interval,
rules: rules,
shouldRestore: shouldRestore,
opts: opts,
seriesInPreviousEval: make([]map[string]labels.Labels, len(rules)),
done: make(chan struct{}),
@ -220,6 +224,28 @@ func (g *Group) run(ctx context.Context) {
defer tick.Stop()
iter()
if g.shouldRestore {
// If we have to restore, we wait for another Eval to finish.
// The reason behind this is that during the first eval (or before it)
// we might not have enough data scraped, and recording rules would not
// have updated the latest values, on which some alerts might depend.
select {
case <-g.done:
return
case <-tick.C:
missed := (time.Since(evalTimestamp) / g.interval) - 1
if missed > 0 {
iterationsMissed.Add(float64(missed))
iterationsScheduled.Add(float64(missed))
}
evalTimestamp = evalTimestamp.Add((missed + 1) * g.interval)
iter()
}
g.RestoreForState(time.Now())
g.shouldRestore = false
}
for {
select {
case <-g.done:
@ -410,12 +436,133 @@ func (g *Group) Eval(ctx context.Context, ts time.Time) {
}
}
// RestoreForState restores the 'for' state of the alerts
// by looking up last ActiveAt from storage.
func (g *Group) RestoreForState(ts time.Time) {
maxtMS := int64(model.TimeFromUnixNano(ts.UnixNano()))
// We allow restoration only from 'for' state samples written within the outage tolerance window, i.e. not older than mint below.
mint := ts.Add(-g.opts.OutageTolerance)
mintMS := int64(model.TimeFromUnixNano(mint.UnixNano()))
q, err := g.opts.TSDB.Querier(g.opts.Context, mintMS, maxtMS)
if err != nil {
level.Error(g.logger).Log("msg", "Failed to get Querier", "err", err)
return
}
for _, rule := range g.Rules() {
alertRule, ok := rule.(*AlertingRule)
if !ok {
continue
}
alertHoldDuration := alertRule.HoldDuration()
if alertHoldDuration < g.opts.ForGracePeriod {
// If alertHoldDuration is already less than grace period, we would not
// like to make it wait for `g.opts.ForGracePeriod` time before firing.
// Hence we skip restoration, which will make it wait for alertHoldDuration.
alertRule.SetRestored(true)
continue
}
alertRule.ForEachActiveAlert(func(a *Alert) {
smpl := alertRule.forStateSample(a, time.Now(), 0)
var matchers []*labels.Matcher
for _, l := range smpl.Metric {
mt, _ := labels.NewMatcher(labels.MatchEqual, l.Name, l.Value)
matchers = append(matchers, mt)
}
sset, err := q.Select(nil, matchers...)
if err != nil {
level.Error(g.logger).Log("msg", "Failed to restore 'for' state",
labels.AlertName, alertRule.Name(), "stage", "Select", "err", err)
return
}
seriesFound := false
var s storage.Series
for sset.Next() {
// Query assures that smpl.Metric is included in sset.At().Labels(),
// hence just checking the length would act like equality.
// (This is faster than calling labels.Compare again as we already have some info).
if len(sset.At().Labels()) == len(smpl.Metric) {
s = sset.At()
seriesFound = true
break
}
}
if !seriesFound {
return
}
// Series found for the 'for' state.
var t int64
var v float64
it := s.Iterator()
for it.Next() {
t, v = it.At()
}
if it.Err() != nil {
level.Error(g.logger).Log("msg", "Failed to restore 'for' state",
labels.AlertName, alertRule.Name(), "stage", "Iterator", "err", it.Err())
return
}
if value.IsStaleNaN(v) { // Alert was not active.
return
}
downAt := time.Unix(t/1000, 0)
restoredActiveAt := time.Unix(int64(v), 0)
timeSpentPending := downAt.Sub(restoredActiveAt)
timeRemainingPending := alertHoldDuration - timeSpentPending
if timeRemainingPending <= 0 {
// It means that alert was firing when prometheus went down.
// In the next Eval, the state of this alert will be set back to
// firing again if it's still firing in that Eval.
// Nothing to be done in this case.
} else if timeRemainingPending < g.opts.ForGracePeriod {
// (new) restoredActiveAt = (ts + m.opts.ForGracePeriod) - alertHoldDuration
// /* new firing time */ /* moving back by hold duration */
//
// Proof of correctness:
// firingTime = restoredActiveAt.Add(alertHoldDuration)
// = ts + m.opts.ForGracePeriod - alertHoldDuration + alertHoldDuration
// = ts + m.opts.ForGracePeriod
//
// Time remaining to fire = firingTime.Sub(ts)
// = (ts + m.opts.ForGracePeriod) - ts
// = m.opts.ForGracePeriod
restoredActiveAt = ts.Add(g.opts.ForGracePeriod).Add(-alertHoldDuration)
} else {
// By shifting ActiveAt to the future (ActiveAt + some_duration),
// the total pending time from the original ActiveAt
// would be `alertHoldDuration + some_duration`.
// Here, some_duration = downDuration.
downDuration := ts.Sub(downAt)
restoredActiveAt = restoredActiveAt.Add(downDuration)
}
a.ActiveAt = restoredActiveAt
level.Debug(g.logger).Log("msg", "'for' state restored",
labels.AlertName, alertRule.Name(), "restored_time", a.ActiveAt.Format(time.RFC850),
"labels", a.Labels.String())
})
alertRule.SetRestored(true)
}
}
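
The three branches above decide where ActiveAt lands after a restart: if the hold duration had already elapsed, the restored ActiveAt is kept; if only a little pending time remains, firing is pushed out so at least the grace period passes after the restart; otherwise ActiveAt is shifted forward by the downtime so the remaining pending time is preserved. A minimal stdlib-only sketch of that arithmetic follows; the helper name adjustActiveAt and the concrete times and durations are invented for illustration and are not part of this change.

package main

import (
	"fmt"
	"time"
)

// adjustActiveAt mirrors the three cases handled in RestoreForState above.
func adjustActiveAt(ts, downAt, restoredActiveAt time.Time, holdDuration, gracePeriod time.Duration) time.Time {
	timeSpentPending := downAt.Sub(restoredActiveAt)
	timeRemainingPending := holdDuration - timeSpentPending

	switch {
	case timeRemainingPending <= 0:
		// Alert was already firing when Prometheus went down; keep the restored ActiveAt.
		return restoredActiveAt
	case timeRemainingPending < gracePeriod:
		// Ensure the alert waits at least the grace period after the restart before firing.
		return ts.Add(gracePeriod).Add(-holdDuration)
	default:
		// Shift ActiveAt forward by the downtime so the remaining pending time is preserved.
		return restoredActiveAt.Add(ts.Sub(downAt))
	}
}

func main() {
	ts := time.Date(2018, 8, 2, 12, 0, 0, 0, time.UTC) // restart time (illustrative)
	downAt := ts.Add(-20 * time.Minute)                // timestamp of the last ALERTS_FOR_STATE sample
	activeAt := downAt.Add(-5 * time.Minute)           // ActiveAt decoded from that sample's value

	// With for: 25m and a 10m grace period, 20m of pending time remains,
	// so ActiveAt moves forward by the 20m of downtime.
	fmt.Println(adjustActiveAt(ts, downAt, activeAt, 25*time.Minute, 10*time.Minute))
}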
// The Manager manages recording and alerting rules.
type Manager struct {
opts *ManagerOptions
groups map[string]*Group
mtx sync.RWMutex
block chan struct{}
restored bool
logger log.Logger
}
@ -430,13 +577,16 @@ type NotifyFunc func(ctx context.Context, expr string, alerts ...*Alert) error
// ManagerOptions bundles options for the Manager.
type ManagerOptions struct {
ExternalURL *url.URL
QueryFunc QueryFunc
NotifyFunc NotifyFunc
Context context.Context
Appendable Appendable
TSDB storage.Storage
Logger log.Logger
Registerer prometheus.Registerer
OutageTolerance time.Duration
ForGracePeriod time.Duration
}
// NewManager returns an implementation of Manager, ready to be started
@ -486,6 +636,7 @@ func (m *Manager) Update(interval time.Duration, files []string) error {
}
return errors.New("error loading rules, previous rule set restored")
}
m.restored = true
var wg sync.WaitGroup
@ -531,6 +682,8 @@ func (m *Manager) Update(interval time.Duration, files []string) error {
func (m *Manager) loadGroups(interval time.Duration, filenames ...string) (map[string]*Group, []error) {
groups := make(map[string]*Group)
shouldRestore := !m.restored
for _, fn := range filenames {
rgs, errs := rulefmt.ParseFile(fn)
if errs != nil {
@ -557,6 +710,7 @@ func (m *Manager) loadGroups(interval time.Duration, filenames ...string) (map[s
time.Duration(r.For),
labels.FromMap(r.Labels),
labels.FromMap(r.Annotations),
m.restored,
log.With(m.logger, "alert", r.Alert),
))
continue
@ -568,7 +722,7 @@ func (m *Manager) loadGroups(interval time.Duration, filenames ...string) (map[s
))
}
groups[groupKey(rg.Name, fn)] = NewGroup(rg.Name, fn, itv, rules, shouldRestore, m.opts)
}
}

View file

@ -51,7 +51,7 @@ func TestAlertingRule(t *testing.T) {
expr,
time.Minute,
labels.FromStrings("severity", "{{\"c\"}}ritical"),
nil, true, nil,
)
result := promql.Vector{
{
@ -146,15 +146,25 @@ func TestAlertingRule(t *testing.T) {
res, err := rule.Eval(suite.Context(), evalTime, EngineQueryFunc(suite.QueryEngine(), suite.Storage()), nil)
testutil.Ok(t, err)
var filteredRes promql.Vector // After removing 'ALERTS_FOR_STATE' samples.
for _, smpl := range res {
smplName := smpl.Metric.Get("__name__")
if smplName == "ALERTS" {
filteredRes = append(filteredRes, smpl)
} else {
// If not 'ALERTS', it has to be 'ALERTS_FOR_STATE'.
testutil.Equals(t, smplName, "ALERTS_FOR_STATE")
}
}
for i := range test.result {
test.result[i].T = timestamp.FromTime(evalTime)
}
testutil.Assert(t, len(test.result) == len(filteredRes), "%d. Number of samples in expected and actual output don't match (%d vs. %d)", i, len(test.result), len(res))
sort.Slice(filteredRes, func(i, j int) bool {
return labels.Compare(filteredRes[i].Metric, filteredRes[j].Metric) < 0
})
testutil.Equals(t, test.result, filteredRes)
for _, aa := range rule.ActiveAlerts() {
testutil.Assert(t, aa.Labels.Get(model.MetricNameLabel) == "", "%s label set on active alert: %s", model.MetricNameLabel, aa.Labels)
@ -162,6 +172,327 @@ func TestAlertingRule(t *testing.T) {
}
}
func TestForStateAddSamples(t *testing.T) {
suite, err := promql.NewTest(t, `
load 5m
http_requests{job="app-server", instance="0", group="canary", severity="overwrite-me"} 75 85 95 105 105 95 85
http_requests{job="app-server", instance="1", group="canary", severity="overwrite-me"} 80 90 100 110 120 130 140
`)
testutil.Ok(t, err)
defer suite.Close()
err = suite.Run()
testutil.Ok(t, err)
expr, err := promql.ParseExpr(`http_requests{group="canary", job="app-server"} < 100`)
testutil.Ok(t, err)
rule := NewAlertingRule(
"HTTPRequestRateLow",
expr,
time.Minute,
labels.FromStrings("severity", "{{\"c\"}}ritical"),
nil, true, nil,
)
result := promql.Vector{
{
Metric: labels.FromStrings(
"__name__", "ALERTS_FOR_STATE",
"alertname", "HTTPRequestRateLow",
"group", "canary",
"instance", "0",
"job", "app-server",
"severity", "critical",
),
Point: promql.Point{V: 1},
},
{
Metric: labels.FromStrings(
"__name__", "ALERTS_FOR_STATE",
"alertname", "HTTPRequestRateLow",
"group", "canary",
"instance", "1",
"job", "app-server",
"severity", "critical",
),
Point: promql.Point{V: 1},
},
{
Metric: labels.FromStrings(
"__name__", "ALERTS_FOR_STATE",
"alertname", "HTTPRequestRateLow",
"group", "canary",
"instance", "0",
"job", "app-server",
"severity", "critical",
),
Point: promql.Point{V: 1},
},
{
Metric: labels.FromStrings(
"__name__", "ALERTS_FOR_STATE",
"alertname", "HTTPRequestRateLow",
"group", "canary",
"instance", "1",
"job", "app-server",
"severity", "critical",
),
Point: promql.Point{V: 1},
},
}
baseTime := time.Unix(0, 0)
var tests = []struct {
time time.Duration
result promql.Vector
persistThisTime bool // If true, it means this 'time' is persisted for 'for'.
}{
{
time: 0,
result: append(promql.Vector{}, result[:2]...),
persistThisTime: true,
},
{
time: 5 * time.Minute,
result: append(promql.Vector{}, result[2:]...),
},
{
time: 10 * time.Minute,
result: append(promql.Vector{}, result[2:3]...),
},
{
time: 15 * time.Minute,
result: nil,
},
{
time: 20 * time.Minute,
result: nil,
},
{
time: 25 * time.Minute,
result: append(promql.Vector{}, result[:1]...),
persistThisTime: true,
},
{
time: 30 * time.Minute,
result: append(promql.Vector{}, result[2:3]...),
},
}
var forState float64
for i, test := range tests {
t.Logf("case %d", i)
evalTime := baseTime.Add(test.time)
if test.persistThisTime {
forState = float64(evalTime.Unix())
}
if test.result == nil {
forState = float64(value.StaleNaN)
}
res, err := rule.Eval(suite.Context(), evalTime, EngineQueryFunc(suite.QueryEngine(), suite.Storage()), nil)
testutil.Ok(t, err)
var filteredRes promql.Vector // After removing 'ALERTS' samples.
for _, smpl := range res {
smplName := smpl.Metric.Get("__name__")
if smplName == "ALERTS_FOR_STATE" {
filteredRes = append(filteredRes, smpl)
} else {
// If not 'ALERTS_FOR_STATE', it has to be 'ALERTS'.
testutil.Equals(t, smplName, "ALERTS")
}
}
for i := range test.result {
test.result[i].T = timestamp.FromTime(evalTime)
// Updating the expected 'for' state.
if test.result[i].V >= 0 {
test.result[i].V = forState
}
}
testutil.Assert(t, len(test.result) == len(filteredRes), "%d. Number of samples in expected and actual output don't match (%d vs. %d)", i, len(test.result), len(res))
sort.Slice(filteredRes, func(i, j int) bool {
return labels.Compare(filteredRes[i].Metric, filteredRes[j].Metric) < 0
})
testutil.Equals(t, test.result, filteredRes)
for _, aa := range rule.ActiveAlerts() {
testutil.Assert(t, aa.Labels.Get(model.MetricNameLabel) == "", "%s label set on active alert: %s", model.MetricNameLabel, aa.Labels)
}
}
}
func TestForStateRestore(t *testing.T) {
suite, err := promql.NewTest(t, `
load 5m
http_requests{job="app-server", instance="0", group="canary", severity="overwrite-me"} 75 85 50 0 0 25 0 0 40 0 120
http_requests{job="app-server", instance="1", group="canary", severity="overwrite-me"} 125 90 60 0 0 25 0 0 40 0 130
`)
testutil.Ok(t, err)
defer suite.Close()
err = suite.Run()
testutil.Ok(t, err)
expr, err := promql.ParseExpr(`http_requests{group="canary", job="app-server"} < 100`)
testutil.Ok(t, err)
opts := &ManagerOptions{
QueryFunc: EngineQueryFunc(suite.QueryEngine(), suite.Storage()),
Appendable: suite.Storage(),
TSDB: suite.Storage(),
Context: context.Background(),
Logger: log.NewNopLogger(),
NotifyFunc: func(ctx context.Context, expr string, alerts ...*Alert) error {
return nil
},
OutageTolerance: 30 * time.Minute,
ForGracePeriod: 10 * time.Minute,
}
alertForDuration := 25 * time.Minute
// Initial run before prometheus goes down.
rule := NewAlertingRule(
"HTTPRequestRateLow",
expr,
alertForDuration,
labels.FromStrings("severity", "critical"),
nil, true, nil,
)
group := NewGroup("default", "", time.Second, []Rule{rule}, true, opts)
groups := make(map[string]*Group)
groups["default;"] = group
initialRuns := []time.Duration{0, 5 * time.Minute}
baseTime := time.Unix(0, 0)
for _, duration := range initialRuns {
evalTime := baseTime.Add(duration)
group.Eval(suite.Context(), evalTime)
}
exp := rule.ActiveAlerts()
for _, aa := range exp {
testutil.Assert(t, aa.Labels.Get(model.MetricNameLabel) == "", "%s label set on active alert: %s", model.MetricNameLabel, aa.Labels)
}
sort.Slice(exp, func(i, j int) bool {
return labels.Compare(exp[i].Labels, exp[j].Labels) < 0
})
// Prometheus goes down here. We create new rules and groups.
type testInput struct {
restoreDuration time.Duration
alerts []*Alert
num int
noRestore bool
gracePeriod bool
downDuration time.Duration
}
tests := []testInput{
{
// Normal restore (alerts were not firing).
restoreDuration: 10 * time.Minute,
alerts: rule.ActiveAlerts(),
downDuration: 5 * time.Minute,
},
{
// Testing Outage Tolerance.
restoreDuration: 40 * time.Minute,
noRestore: true,
num: 2,
},
{
// No active alerts.
restoreDuration: 50 * time.Minute,
alerts: []*Alert{},
},
}
testFunc := func(tst testInput) {
newRule := NewAlertingRule(
"HTTPRequestRateLow",
expr,
alertForDuration,
labels.FromStrings("severity", "critical"),
nil, false, nil,
)
newGroup := NewGroup("default", "", time.Second, []Rule{newRule}, true, opts)
newGroups := make(map[string]*Group)
newGroups["default;"] = newGroup
m := NewManager(opts)
m.mtx.Lock()
m.groups = newGroups
m.mtx.Unlock()
restoreTime := baseTime.Add(tst.restoreDuration)
// First eval before restoration.
newGroup.Eval(suite.Context(), restoreTime)
// Restore happens here.
newGroup.RestoreForState(restoreTime)
got := newRule.ActiveAlerts()
for _, aa := range got {
testutil.Assert(t, aa.Labels.Get(model.MetricNameLabel) == "", "%s label set on active alert: %s", model.MetricNameLabel, aa.Labels)
}
sort.Slice(got, func(i, j int) bool {
return labels.Compare(got[i].Labels, got[j].Labels) < 0
})
// Checking if we have restored it correctly.
if tst.noRestore {
testutil.Equals(t, tst.num, len(got))
for _, e := range got {
testutil.Equals(t, e.ActiveAt, restoreTime)
}
} else if tst.gracePeriod {
testutil.Equals(t, tst.num, len(got))
for _, e := range got {
testutil.Equals(t, opts.ForGracePeriod, e.ActiveAt.Add(alertForDuration).Sub(restoreTime))
}
} else {
exp := tst.alerts
testutil.Equals(t, len(exp), len(got))
for i, e := range exp {
testutil.Equals(t, e.Labels, got[i].Labels)
// Difference in time should be within 1e6 ns, i.e. 1ms
// (due to conversion between ns & ms, float64 & int64).
activeAtDiff := float64(e.ActiveAt.Unix() + int64(tst.downDuration/time.Second) - got[i].ActiveAt.Unix())
testutil.Assert(t, math.Abs(activeAtDiff) == 0, "'for' state restored time is wrong")
}
}
}
for _, tst := range tests {
testFunc(tst)
}
// Testing the grace period.
for _, duration := range []time.Duration{10 * time.Minute, 15 * time.Minute, 20 * time.Minute} {
evalTime := baseTime.Add(duration)
group.Eval(suite.Context(), evalTime)
}
testFunc(testInput{
restoreDuration: 25 * time.Minute,
alerts: []*Alert{},
gracePeriod: true,
num: 2,
})
}
func TestStaleness(t *testing.T) {
storage := testutil.NewStorage(t)
defer storage.Close()
@ -169,6 +500,7 @@ func TestStaleness(t *testing.T) {
opts := &ManagerOptions{
QueryFunc: EngineQueryFunc(engine, storage),
Appendable: storage,
TSDB: storage,
Context: context.Background(),
Logger: log.NewNopLogger(),
}
@ -176,7 +508,7 @@ func TestStaleness(t *testing.T) {
expr, err := promql.ParseExpr("a + 1") expr, err := promql.ParseExpr("a + 1")
testutil.Ok(t, err) testutil.Ok(t, err)
rule := NewRecordingRule("a_plus_one", expr, labels.Labels{}) rule := NewRecordingRule("a_plus_one", expr, labels.Labels{})
group := NewGroup("default", "", time.Second, []Rule{rule}, opts) group := NewGroup("default", "", time.Second, []Rule{rule}, true, opts)
// A time series that has two samples and then goes stale. // A time series that has two samples and then goes stale.
app, _ := storage.Appender() app, _ := storage.Appender()
@ -244,7 +576,7 @@ func readSeriesSet(ss storage.SeriesSet) (map[string][]promql.Point, error) {
func TestCopyState(t *testing.T) {
oldGroup := &Group{
rules: []Rule{
NewAlertingRule("alert", nil, 0, nil, nil, true, nil),
NewRecordingRule("rule1", nil, nil),
NewRecordingRule("rule2", nil, nil),
NewRecordingRule("rule3", nil, nil),
@ -265,7 +597,7 @@ func TestCopyState(t *testing.T) {
NewRecordingRule("rule3", nil, nil), NewRecordingRule("rule3", nil, nil),
NewRecordingRule("rule3", nil, nil), NewRecordingRule("rule3", nil, nil),
NewRecordingRule("rule3", nil, nil), NewRecordingRule("rule3", nil, nil),
NewAlertingRule("alert", nil, 0, nil, nil, nil), NewAlertingRule("alert", nil, 0, nil, nil, true, nil),
NewRecordingRule("rule1", nil, nil), NewRecordingRule("rule1", nil, nil),
NewRecordingRule("rule4", nil, nil), NewRecordingRule("rule4", nil, nil),
}, },

View file

@ -121,6 +121,7 @@ func (m rulesRetrieverMock) AlertingRules() []*rules.AlertingRule {
time.Second,
labels.Labels{},
labels.Labels{},
true,
log.NewNopLogger(),
)
rule2 := rules.NewAlertingRule(
@ -129,6 +130,7 @@ func (m rulesRetrieverMock) AlertingRules() []*rules.AlertingRule {
time.Second,
labels.Labels{},
labels.Labels{},
true,
log.NewNopLogger(),
)
var r []*rules.AlertingRule
@ -164,7 +166,7 @@ func (m rulesRetrieverMock) RuleGroups() []*rules.Group {
recordingRule := rules.NewRecordingRule("recording-rule-1", recordingExpr, labels.Labels{})
r = append(r, recordingRule)
group := rules.NewGroup("grp", "/path/to/file", time.Second, r, false, opts)
return []*rules.Group{group}
}