Limit number of alerts or series produced by a rule (#9260)

* Add limit to rules

Signed-off-by: Levi Harrison <git@leviharrison.dev>
This commit is contained in:
Levi Harrison 2021-09-15 03:48:26 -04:00 committed by GitHub
parent 1ea774f184
commit dc2f1993d8
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
8 changed files with 140 additions and 16 deletions

View file

@ -78,6 +78,10 @@ name: <string>
# How often rules in the group are evaluated. # How often rules in the group are evaluated.
[ interval: <duration> | default = global.evaluation_interval ] [ interval: <duration> | default = global.evaluation_interval ]
# Limit the number of alerts an alerting rule and series a recording rule can
# produce. 0 is no limit.
[ limit: <int> | default = 0 ]
rules: rules:
[ - <rule> ... ] [ - <rule> ... ]
``` ```

View file

@ -107,6 +107,7 @@ func (g *RuleGroups) Validate(node ruleGroups) (errs []error) {
type RuleGroup struct { type RuleGroup struct {
Name string `yaml:"name"` Name string `yaml:"name"`
Interval model.Duration `yaml:"interval,omitempty"` Interval model.Duration `yaml:"interval,omitempty"`
Limit int `yaml:"limit,omitempty"`
Rules []RuleNode `yaml:"rules"` Rules []RuleNode `yaml:"rules"`
} }

View file

@ -297,7 +297,7 @@ const resolvedRetention = 15 * time.Minute
// Eval evaluates the rule expression and then creates pending alerts and fires // Eval evaluates the rule expression and then creates pending alerts and fires
// or removes previously pending alerts accordingly. // or removes previously pending alerts accordingly.
func (r *AlertingRule) Eval(ctx context.Context, ts time.Time, query QueryFunc, externalURL *url.URL) (promql.Vector, error) { func (r *AlertingRule) Eval(ctx context.Context, ts time.Time, query QueryFunc, externalURL *url.URL, limit int) (promql.Vector, error) {
res, err := query(ctx, r.vector.String(), ts) res, err := query(ctx, r.vector.String(), ts)
if err != nil { if err != nil {
return nil, err return nil, err
@ -415,6 +415,12 @@ func (r *AlertingRule) Eval(ctx context.Context, ts time.Time, query QueryFunc,
} }
} }
numActive := len(r.active)
if limit != 0 && numActive > limit {
r.active = map[uint64]*Alert{}
return nil, errors.Errorf("exceeded limit of %d with %d alerts", limit, numActive)
}
return vec, nil return vec, nil
} }

View file

@ -170,7 +170,7 @@ func TestAlertingRuleLabelsUpdate(t *testing.T) {
t.Logf("case %d", i) t.Logf("case %d", i)
evalTime := baseTime.Add(time.Duration(i) * time.Minute) evalTime := baseTime.Add(time.Duration(i) * time.Minute)
result[0].Point.T = timestamp.FromTime(evalTime) result[0].Point.T = timestamp.FromTime(evalTime)
res, err := rule.Eval(suite.Context(), evalTime, EngineQueryFunc(suite.QueryEngine(), suite.Storage()), nil) res, err := rule.Eval(suite.Context(), evalTime, EngineQueryFunc(suite.QueryEngine(), suite.Storage()), nil, 0)
require.NoError(t, err) require.NoError(t, err)
var filteredRes promql.Vector // After removing 'ALERTS_FOR_STATE' samples. var filteredRes promql.Vector // After removing 'ALERTS_FOR_STATE' samples.
@ -252,7 +252,7 @@ func TestAlertingRuleExternalLabelsInTemplate(t *testing.T) {
var filteredRes promql.Vector // After removing 'ALERTS_FOR_STATE' samples. var filteredRes promql.Vector // After removing 'ALERTS_FOR_STATE' samples.
res, err := ruleWithoutExternalLabels.Eval( res, err := ruleWithoutExternalLabels.Eval(
suite.Context(), evalTime, EngineQueryFunc(suite.QueryEngine(), suite.Storage()), nil, suite.Context(), evalTime, EngineQueryFunc(suite.QueryEngine(), suite.Storage()), nil, 0,
) )
require.NoError(t, err) require.NoError(t, err)
for _, smpl := range res { for _, smpl := range res {
@ -266,7 +266,7 @@ func TestAlertingRuleExternalLabelsInTemplate(t *testing.T) {
} }
res, err = ruleWithExternalLabels.Eval( res, err = ruleWithExternalLabels.Eval(
suite.Context(), evalTime, EngineQueryFunc(suite.QueryEngine(), suite.Storage()), nil, suite.Context(), evalTime, EngineQueryFunc(suite.QueryEngine(), suite.Storage()), nil, 0,
) )
require.NoError(t, err) require.NoError(t, err)
for _, smpl := range res { for _, smpl := range res {
@ -346,7 +346,7 @@ func TestAlertingRuleExternalURLInTemplate(t *testing.T) {
var filteredRes promql.Vector // After removing 'ALERTS_FOR_STATE' samples. var filteredRes promql.Vector // After removing 'ALERTS_FOR_STATE' samples.
res, err := ruleWithoutExternalURL.Eval( res, err := ruleWithoutExternalURL.Eval(
suite.Context(), evalTime, EngineQueryFunc(suite.QueryEngine(), suite.Storage()), nil, suite.Context(), evalTime, EngineQueryFunc(suite.QueryEngine(), suite.Storage()), nil, 0,
) )
require.NoError(t, err) require.NoError(t, err)
for _, smpl := range res { for _, smpl := range res {
@ -360,7 +360,7 @@ func TestAlertingRuleExternalURLInTemplate(t *testing.T) {
} }
res, err = ruleWithExternalURL.Eval( res, err = ruleWithExternalURL.Eval(
suite.Context(), evalTime, EngineQueryFunc(suite.QueryEngine(), suite.Storage()), nil, suite.Context(), evalTime, EngineQueryFunc(suite.QueryEngine(), suite.Storage()), nil, 0,
) )
require.NoError(t, err) require.NoError(t, err)
for _, smpl := range res { for _, smpl := range res {
@ -417,7 +417,7 @@ func TestAlertingRuleEmptyLabelFromTemplate(t *testing.T) {
var filteredRes promql.Vector // After removing 'ALERTS_FOR_STATE' samples. var filteredRes promql.Vector // After removing 'ALERTS_FOR_STATE' samples.
res, err := rule.Eval( res, err := rule.Eval(
suite.Context(), evalTime, EngineQueryFunc(suite.QueryEngine(), suite.Storage()), nil, suite.Context(), evalTime, EngineQueryFunc(suite.QueryEngine(), suite.Storage()), nil, 0,
) )
require.NoError(t, err) require.NoError(t, err)
for _, smpl := range res { for _, smpl := range res {
@ -460,7 +460,61 @@ func TestAlertingRuleDuplicate(t *testing.T) {
"", "",
true, log.NewNopLogger(), true, log.NewNopLogger(),
) )
_, err := rule.Eval(ctx, now, EngineQueryFunc(engine, storage), nil) _, err := rule.Eval(ctx, now, EngineQueryFunc(engine, storage), nil, 0)
require.Error(t, err) require.Error(t, err)
require.EqualError(t, err, "vector contains metrics with the same labelset after applying alert labels") require.EqualError(t, err, "vector contains metrics with the same labelset after applying alert labels")
} }
// TestAlertingRuleLimit checks that AlertingRule.Eval enforces its limit
// argument. The expression `1` produces a single alert, so a limit of 0
// (no limit) or 1 must succeed, while -1 is used to force the
// "exceeded limit" error path.
func TestAlertingRuleLimit(t *testing.T) {
	storage := teststorage.New(t)
	defer storage.Close()

	opts := promql.EngineOpts{
		Logger:     nil,
		Reg:        nil,
		MaxSamples: 10,
		Timeout:    10 * time.Second,
	}

	engine := promql.NewEngine(opts)
	ctx, cancelCtx := context.WithCancel(context.Background())
	defer cancelCtx()

	now := time.Now()

	suite := []struct {
		limit int
		err   string
	}{
		{
			limit: 0,
		},
		{
			limit: 1,
		},
		{
			limit: -1,
			err:   "exceeded limit of -1 with 1 alerts",
		},
	}

	for _, test := range suite {
		// Fail loudly if the fixture expression ever stops parsing instead of
		// evaluating a nil expression.
		expr, err := parser.ParseExpr(`1`)
		require.NoError(t, err)

		rule := NewAlertingRule(
			"foo",
			expr,
			time.Minute,
			labels.FromStrings("test", "test"),
			nil,
			nil,
			"",
			true, log.NewNopLogger(),
		)

		_, err = rule.Eval(ctx, now, EngineQueryFunc(engine, storage), nil, test.limit)
		if test.err == "" {
			require.NoError(t, err)
			continue
		}
		// EqualError reports a normal test failure when err is nil, whereas
		// calling err.Error() directly would panic with a nil dereference.
		require.EqualError(t, err, test.err)
	}
}

View file

@ -213,7 +213,7 @@ type Rule interface {
// Labels of the rule. // Labels of the rule.
Labels() labels.Labels Labels() labels.Labels
// eval evaluates the rule, including any associated recording or alerting actions. // eval evaluates the rule, including any associated recording or alerting actions.
Eval(context.Context, time.Time, QueryFunc, *url.URL) (promql.Vector, error) Eval(context.Context, time.Time, QueryFunc, *url.URL, int) (promql.Vector, error)
// String returns a human-readable string representation of the rule. // String returns a human-readable string representation of the rule.
String() string String() string
// Query returns the rule query expression. // Query returns the rule query expression.
@ -244,6 +244,7 @@ type Group struct {
name string name string
file string file string
interval time.Duration interval time.Duration
limit int
rules []Rule rules []Rule
seriesInPreviousEval []map[string]labels.Labels // One per Rule. seriesInPreviousEval []map[string]labels.Labels // One per Rule.
staleSeries []labels.Labels staleSeries []labels.Labels
@ -267,6 +268,7 @@ type Group struct {
type GroupOptions struct { type GroupOptions struct {
Name, File string Name, File string
Interval time.Duration Interval time.Duration
Limit int
Rules []Rule Rules []Rule
ShouldRestore bool ShouldRestore bool
Opts *ManagerOptions Opts *ManagerOptions
@ -295,6 +297,7 @@ func NewGroup(o GroupOptions) *Group {
name: o.Name, name: o.Name,
file: o.File, file: o.File,
interval: o.Interval, interval: o.Interval,
limit: o.Limit,
rules: o.Rules, rules: o.Rules,
shouldRestore: o.ShouldRestore, shouldRestore: o.ShouldRestore,
opts: o.Opts, opts: o.Opts,
@ -319,6 +322,9 @@ func (g *Group) Rules() []Rule { return g.rules }
// Interval returns the group's interval. // Interval returns the group's interval.
func (g *Group) Interval() time.Duration { return g.interval } func (g *Group) Interval() time.Duration { return g.interval }
// Limit returns the group's limit, i.e. the maximum number of alerts or
// series each rule in the group may produce per evaluation. The value is
// handed to every rule's Eval call; 0 means no limit is enforced.
func (g *Group) Limit() int { return g.limit }
func (g *Group) run(ctx context.Context) { func (g *Group) run(ctx context.Context) {
defer close(g.terminated) defer close(g.terminated)
@ -591,7 +597,7 @@ func (g *Group) Eval(ctx context.Context, ts time.Time) {
g.metrics.EvalTotal.WithLabelValues(GroupKey(g.File(), g.Name())).Inc() g.metrics.EvalTotal.WithLabelValues(GroupKey(g.File(), g.Name())).Inc()
vector, err := rule.Eval(ctx, ts, g.opts.QueryFunc, g.opts.ExternalURL) vector, err := rule.Eval(ctx, ts, g.opts.QueryFunc, g.opts.ExternalURL, g.Limit())
if err != nil { if err != nil {
rule.SetHealth(HealthBad) rule.SetHealth(HealthBad)
rule.SetLastError(err) rule.SetLastError(err)
@ -850,6 +856,10 @@ func (g *Group) Equals(ng *Group) bool {
return false return false
} }
if g.limit != ng.limit {
return false
}
if len(g.rules) != len(ng.rules) { if len(g.rules) != len(ng.rules) {
return false return false
} }
@ -1086,6 +1096,7 @@ func (m *Manager) LoadGroups(
Name: rg.Name, Name: rg.Name,
File: fn, File: fn,
Interval: itv, Interval: itv,
Limit: rg.Limit,
Rules: rules, Rules: rules,
ShouldRestore: shouldRestore, ShouldRestore: shouldRestore,
Opts: m.opts, Opts: m.opts,

View file

@ -156,7 +156,7 @@ func TestAlertingRule(t *testing.T) {
evalTime := baseTime.Add(test.time) evalTime := baseTime.Add(test.time)
res, err := rule.Eval(suite.Context(), evalTime, EngineQueryFunc(suite.QueryEngine(), suite.Storage()), nil) res, err := rule.Eval(suite.Context(), evalTime, EngineQueryFunc(suite.QueryEngine(), suite.Storage()), nil, 0)
require.NoError(t, err) require.NoError(t, err)
var filteredRes promql.Vector // After removing 'ALERTS_FOR_STATE' samples. var filteredRes promql.Vector // After removing 'ALERTS_FOR_STATE' samples.
@ -305,7 +305,7 @@ func TestForStateAddSamples(t *testing.T) {
forState = float64(value.StaleNaN) forState = float64(value.StaleNaN)
} }
res, err := rule.Eval(suite.Context(), evalTime, EngineQueryFunc(suite.QueryEngine(), suite.Storage()), nil) res, err := rule.Eval(suite.Context(), evalTime, EngineQueryFunc(suite.QueryEngine(), suite.Storage()), nil, 0)
require.NoError(t, err) require.NoError(t, err)
var filteredRes promql.Vector // After removing 'ALERTS' samples. var filteredRes promql.Vector // After removing 'ALERTS' samples.
@ -773,6 +773,12 @@ func TestUpdate(t *testing.T) {
} }
reloadAndValidate(rgs, t, tmpFile, ruleManager, expected, ogs) reloadAndValidate(rgs, t, tmpFile, ruleManager, expected, ogs)
// Update limit and reload.
for i := range rgs.Groups {
rgs.Groups[i].Limit = 1
}
reloadAndValidate(rgs, t, tmpFile, ruleManager, expected, ogs)
// Change group rules and reload. // Change group rules and reload.
for i, g := range rgs.Groups { for i, g := range rgs.Groups {
for j, r := range g.Rules { for j, r := range g.Rules {
@ -791,6 +797,7 @@ type ruleGroupsTest struct {
type ruleGroupTest struct { type ruleGroupTest struct {
Name string `yaml:"name"` Name string `yaml:"name"`
Interval model.Duration `yaml:"interval,omitempty"` Interval model.Duration `yaml:"interval,omitempty"`
Limit int `yaml:"limit,omitempty"`
Rules []rulefmt.Rule `yaml:"rules"` Rules []rulefmt.Rule `yaml:"rules"`
} }
@ -812,6 +819,7 @@ func formatRules(r *rulefmt.RuleGroups) ruleGroupsTest {
tmp = append(tmp, ruleGroupTest{ tmp = append(tmp, ruleGroupTest{
Name: g.Name, Name: g.Name,
Interval: g.Interval, Interval: g.Interval,
Limit: g.Limit,
Rules: rtmp, Rules: rtmp,
}) })
} }

View file

@ -73,7 +73,7 @@ func (rule *RecordingRule) Labels() labels.Labels {
} }
// Eval evaluates the rule and then overrides the metric names and labels accordingly. // Eval evaluates the rule and then overrides the metric names and labels accordingly.
func (rule *RecordingRule) Eval(ctx context.Context, ts time.Time, query QueryFunc, _ *url.URL) (promql.Vector, error) { func (rule *RecordingRule) Eval(ctx context.Context, ts time.Time, query QueryFunc, _ *url.URL, limit int) (promql.Vector, error) {
vector, err := query(ctx, rule.vector.String(), ts) vector, err := query(ctx, rule.vector.String(), ts)
if err != nil { if err != nil {
return nil, err return nil, err
@ -99,6 +99,13 @@ func (rule *RecordingRule) Eval(ctx context.Context, ts time.Time, query QueryFu
return nil, fmt.Errorf("vector contains metrics with the same labelset after applying rule labels") return nil, fmt.Errorf("vector contains metrics with the same labelset after applying rule labels")
} }
numSamples := len(vector)
if limit != 0 && numSamples > limit {
return nil, fmt.Errorf("exceeded limit %d with %d samples", limit, numSamples)
}
rule.SetHealth(HealthGood)
rule.SetLastError(err)
return vector, nil return vector, nil
} }

View file

@ -49,7 +49,9 @@ func TestRuleEval(t *testing.T) {
name string name string
expr parser.Expr expr parser.Expr
labels labels.Labels labels labels.Labels
limit int
result promql.Vector result promql.Vector
err string
}{ }{
{ {
name: "nolabels", name: "nolabels",
@ -69,12 +71,43 @@ func TestRuleEval(t *testing.T) {
Point: promql.Point{V: 1, T: timestamp.FromTime(now)}, Point: promql.Point{V: 1, T: timestamp.FromTime(now)},
}}, }},
}, },
{
name: "underlimit",
expr: &parser.NumberLiteral{Val: 1},
labels: labels.FromStrings("foo", "bar"),
limit: 2,
result: promql.Vector{promql.Sample{
Metric: labels.FromStrings("__name__", "underlimit", "foo", "bar"),
Point: promql.Point{V: 1, T: timestamp.FromTime(now)},
}},
},
{
name: "atlimit",
expr: &parser.NumberLiteral{Val: 1},
labels: labels.FromStrings("foo", "bar"),
limit: 1,
result: promql.Vector{promql.Sample{
Metric: labels.FromStrings("__name__", "atlimit", "foo", "bar"),
Point: promql.Point{V: 1, T: timestamp.FromTime(now)},
}},
},
{
name: "overlimit",
expr: &parser.NumberLiteral{Val: 1},
labels: labels.FromStrings("foo", "bar"),
limit: -1,
err: "exceeded limit -1 with 1 samples",
},
} }
for _, test := range suite { for _, test := range suite {
rule := NewRecordingRule(test.name, test.expr, test.labels) rule := NewRecordingRule(test.name, test.expr, test.labels)
result, err := rule.Eval(ctx, now, EngineQueryFunc(engine, storage), nil) result, err := rule.Eval(ctx, now, EngineQueryFunc(engine, storage), nil, test.limit)
if test.err == "" {
require.NoError(t, err) require.NoError(t, err)
} else {
require.Equal(t, test.err, err.Error())
}
require.Equal(t, test.result, result) require.Equal(t, test.result, result)
} }
} }
@ -114,7 +147,7 @@ func TestRuleEvalDuplicate(t *testing.T) {
expr, _ := parser.ParseExpr(`vector(0) or label_replace(vector(0),"test","x","","")`) expr, _ := parser.ParseExpr(`vector(0) or label_replace(vector(0),"test","x","","")`)
rule := NewRecordingRule("foo", expr, labels.FromStrings("test", "test")) rule := NewRecordingRule("foo", expr, labels.FromStrings("test", "test"))
_, err := rule.Eval(ctx, now, EngineQueryFunc(engine, storage), nil) _, err := rule.Eval(ctx, now, EngineQueryFunc(engine, storage), nil, 0)
require.Error(t, err) require.Error(t, err)
require.EqualError(t, err, "vector contains metrics with the same labelset after applying rule labels") require.EqualError(t, err, "vector contains metrics with the same labelset after applying rule labels")
} }