Add source_tenants fields to RuleGroup

This commit is contained in:
Dimitar Dimitrov 2021-11-25 13:44:29 +01:00
parent 63537ea653
commit 6ffb81244f
No known key found for this signature in database
GPG key ID: 4541B04E6C90EBC3
4 changed files with 66 additions and 30 deletions

View file

@ -116,10 +116,11 @@ func (g *RuleGroups) Validate(node ruleGroups) (errs []error) {
// RuleGroup is a list of sequentially evaluated recording and alerting rules. // RuleGroup is a list of sequentially evaluated recording and alerting rules.
type RuleGroup struct { type RuleGroup struct {
Name string `yaml:"name"` Name string `yaml:"name"`
Interval model.Duration `yaml:"interval,omitempty"` Interval model.Duration `yaml:"interval,omitempty"`
Limit int `yaml:"limit,omitempty"` Limit int `yaml:"limit,omitempty"`
Rules []RuleNode `yaml:"rules"` Rules []RuleNode `yaml:"rules"`
SourceTenants []string `yaml:"source_tenants,omitempty"`
} }
// Rule describes an alerting or recording rule. // Rule describes an alerting or recording rule.

View file

@ -36,6 +36,7 @@ groups:
- name: my-another-name - name: my-another-name
interval: 30s # defaults to global interval interval: 30s # defaults to global interval
source_tenants: [tenant-1]
rules: rules:
- alert: HighErrors - alert: HighErrors
expr: | expr: |

View file

@ -246,6 +246,7 @@ type Group struct {
interval time.Duration interval time.Duration
limit int limit int
rules []Rule rules []Rule
sourceTenants []string
seriesInPreviousEval []map[string]labels.Labels // One per Rule. seriesInPreviousEval []map[string]labels.Labels // One per Rule.
staleSeries []labels.Labels staleSeries []labels.Labels
opts *ManagerOptions opts *ManagerOptions
@ -270,6 +271,7 @@ type GroupOptions struct {
Interval time.Duration Interval time.Duration
Limit int Limit int
Rules []Rule Rules []Rule
SourceTenants []string
ShouldRestore bool ShouldRestore bool
Opts *ManagerOptions Opts *ManagerOptions
done chan struct{} done chan struct{}
@ -301,6 +303,7 @@ func NewGroup(o GroupOptions) *Group {
rules: o.Rules, rules: o.Rules,
shouldRestore: o.ShouldRestore, shouldRestore: o.ShouldRestore,
opts: o.Opts, opts: o.Opts,
sourceTenants: o.SourceTenants,
seriesInPreviousEval: make([]map[string]labels.Labels, len(o.Rules)), seriesInPreviousEval: make([]map[string]labels.Labels, len(o.Rules)),
done: make(chan struct{}), done: make(chan struct{}),
managerDone: o.done, managerDone: o.done,
@ -325,6 +328,9 @@ func (g *Group) Interval() time.Duration { return g.interval }
// Limit returns the group's limit. // Limit returns the group's limit.
func (g *Group) Limit() int { return g.limit } func (g *Group) Limit() int { return g.limit }
// SourceTenants returns the source tenants for the group.
func (g *Group) SourceTenants() []string { return g.sourceTenants }
func (g *Group) run(ctx context.Context) { func (g *Group) run(ctx context.Context) {
defer close(g.terminated) defer close(g.terminated)
@ -863,6 +869,16 @@ func (g *Group) Equals(ng *Group) bool {
return false return false
} }
if len(g.sourceTenants) != len(ng.sourceTenants) {
return false
}
for i, tenant := range g.sourceTenants {
if ng.sourceTenants[i] != tenant {
return false
}
}
for i, gr := range g.rules { for i, gr := range g.rules {
if gr.String() != ng.rules[i].String() { if gr.String() != ng.rules[i].String() {
return false return false
@ -887,20 +903,23 @@ type Manager struct {
// NotifyFunc sends notifications about a set of alerts generated by the given expression. // NotifyFunc sends notifications about a set of alerts generated by the given expression.
type NotifyFunc func(ctx context.Context, expr string, alerts ...*Alert) type NotifyFunc func(ctx context.Context, expr string, alerts ...*Alert)
type ContextFunc func(g *Group) context.Context
// ManagerOptions bundles options for the Manager. // ManagerOptions bundles options for the Manager.
type ManagerOptions struct { type ManagerOptions struct {
ExternalURL *url.URL ExternalURL *url.URL
QueryFunc QueryFunc QueryFunc QueryFunc
NotifyFunc NotifyFunc NotifyFunc NotifyFunc
Context context.Context Context context.Context
Appendable storage.Appendable FederatedContextFunc ContextFunc
Queryable storage.Queryable Appendable storage.Appendable
Logger log.Logger Queryable storage.Queryable
Registerer prometheus.Registerer Logger log.Logger
OutageTolerance time.Duration Registerer prometheus.Registerer
ForGracePeriod time.Duration OutageTolerance time.Duration
ResendDelay time.Duration ForGracePeriod time.Duration
GroupLoader GroupLoader ResendDelay time.Duration
GroupLoader GroupLoader
Metrics *Metrics Metrics *Metrics
} }
@ -992,11 +1011,16 @@ func (m *Manager) Update(interval time.Duration, files []string, externalLabels
newg.CopyState(oldg) newg.CopyState(oldg)
} }
wg.Done() wg.Done()
ctx := m.opts.Context
if len(newg.sourceTenants) > 0 {
ctx = m.opts.FederatedContextFunc(newg)
}
// Wait with starting evaluation until the rule manager // Wait with starting evaluation until the rule manager
// is told to run. This is necessary to avoid running // is told to run. This is necessary to avoid running
// queries against a bootstrapping storage. // queries against a bootstrapping storage.
<-m.block <-m.block
newg.run(m.opts.Context) newg.run(ctx)
}(newg) }(newg)
} }
@ -1097,6 +1121,7 @@ func (m *Manager) LoadGroups(
Interval: itv, Interval: itv,
Limit: rg.Limit, Limit: rg.Limit,
Rules: rules, Rules: rules,
SourceTenants: rg.SourceTenants,
ShouldRestore: shouldRestore, ShouldRestore: shouldRestore,
Opts: m.opts, Opts: m.opts,
done: m.done, done: m.done,

View file

@ -718,11 +718,12 @@ func TestUpdate(t *testing.T) {
} }
engine := promql.NewEngine(opts) engine := promql.NewEngine(opts)
ruleManager := NewManager(&ManagerOptions{ ruleManager := NewManager(&ManagerOptions{
Appendable: st, Appendable: st,
Queryable: st, Queryable: st,
QueryFunc: EngineQueryFunc(engine, st), QueryFunc: EngineQueryFunc(engine, st),
Context: context.Background(), Context: context.Background(),
Logger: log.NewNopLogger(), FederatedContextFunc: func(*Group) context.Context { return context.Background() },
Logger: log.NewNopLogger(),
}) })
ruleManager.start() ruleManager.start()
defer ruleManager.Stop() defer ruleManager.Stop()
@ -787,6 +788,12 @@ func TestUpdate(t *testing.T) {
} }
} }
reloadAndValidate(rgs, t, tmpFile, ruleManager, expected, ogs) reloadAndValidate(rgs, t, tmpFile, ruleManager, expected, ogs)
// Change group source tenants and reload.
for i := range rgs.Groups {
rgs.Groups[i].SourceTenants = []string{"tenant-2"}
}
reloadAndValidate(rgs, t, tmpFile, ruleManager, expected, ogs)
} }
// ruleGroupsTest for running tests over rules. // ruleGroupsTest for running tests over rules.
@ -796,10 +803,11 @@ type ruleGroupsTest struct {
// ruleGroupTest forms a testing struct for running tests over rules. // ruleGroupTest forms a testing struct for running tests over rules.
type ruleGroupTest struct { type ruleGroupTest struct {
Name string `yaml:"name"` Name string `yaml:"name"`
Interval model.Duration `yaml:"interval,omitempty"` Interval model.Duration `yaml:"interval,omitempty"`
Limit int `yaml:"limit,omitempty"` Limit int `yaml:"limit,omitempty"`
Rules []rulefmt.Rule `yaml:"rules"` Rules []rulefmt.Rule `yaml:"rules"`
SourceTenants []string `yaml:"source_tenants,omitempty"`
} }
func formatRules(r *rulefmt.RuleGroups) ruleGroupsTest { func formatRules(r *rulefmt.RuleGroups) ruleGroupsTest {
@ -818,10 +826,11 @@ func formatRules(r *rulefmt.RuleGroups) ruleGroupsTest {
}) })
} }
tmp = append(tmp, ruleGroupTest{ tmp = append(tmp, ruleGroupTest{
Name: g.Name, Name: g.Name,
Interval: g.Interval, Interval: g.Interval,
Limit: g.Limit, Limit: g.Limit,
Rules: rtmp, Rules: rtmp,
SourceTenants: g.SourceTenants,
}) })
} }
return ruleGroupsTest{ return ruleGroupsTest{