From 6ffb81244ffa5679e6490298b3b90e7e75ed929b Mon Sep 17 00:00:00 2001 From: Dimitar Dimitrov Date: Thu, 25 Nov 2021 13:44:29 +0100 Subject: [PATCH] Add source_tenants fields to RuleGroup --- model/rulefmt/rulefmt.go | 9 +++--- model/rulefmt/testdata/test.yaml | 1 + rules/manager.go | 51 ++++++++++++++++++++++++-------- rules/manager_test.go | 35 ++++++++++++++-------- 4 files changed, 66 insertions(+), 30 deletions(-) diff --git a/model/rulefmt/rulefmt.go b/model/rulefmt/rulefmt.go index 5332514ed5..a550af7240 100644 --- a/model/rulefmt/rulefmt.go +++ b/model/rulefmt/rulefmt.go @@ -116,10 +116,11 @@ func (g *RuleGroups) Validate(node ruleGroups) (errs []error) { // RuleGroup is a list of sequentially evaluated recording and alerting rules. type RuleGroup struct { - Name string `yaml:"name"` - Interval model.Duration `yaml:"interval,omitempty"` - Limit int `yaml:"limit,omitempty"` - Rules []RuleNode `yaml:"rules"` + Name string `yaml:"name"` + Interval model.Duration `yaml:"interval,omitempty"` + Limit int `yaml:"limit,omitempty"` + Rules []RuleNode `yaml:"rules"` + SourceTenants []string `yaml:"source_tenants,omitempty"` } // Rule describes an alerting or recording rule. diff --git a/model/rulefmt/testdata/test.yaml b/model/rulefmt/testdata/test.yaml index 6810b2cbd3..d208bf1970 100644 --- a/model/rulefmt/testdata/test.yaml +++ b/model/rulefmt/testdata/test.yaml @@ -36,6 +36,7 @@ groups: - name: my-another-name interval: 30s # defaults to global interval + source_tenants: [tenant-1] rules: - alert: HighErrors expr: | diff --git a/rules/manager.go b/rules/manager.go index 7499dbfcaf..69b6cad639 100644 --- a/rules/manager.go +++ b/rules/manager.go @@ -246,6 +246,7 @@ type Group struct { interval time.Duration limit int rules []Rule + sourceTenants []string seriesInPreviousEval []map[string]labels.Labels // One per Rule. staleSeries []labels.Labels opts *ManagerOptions @@ -270,6 +271,7 @@ type GroupOptions struct { Interval time.Duration Limit int Rules []Rule + SourceTenants []string ShouldRestore bool Opts *ManagerOptions done chan struct{} @@ -301,6 +303,7 @@ func NewGroup(o GroupOptions) *Group { rules: o.Rules, shouldRestore: o.ShouldRestore, opts: o.Opts, + sourceTenants: o.SourceTenants, seriesInPreviousEval: make([]map[string]labels.Labels, len(o.Rules)), done: make(chan struct{}), managerDone: o.done, @@ -325,6 +328,9 @@ func (g *Group) Interval() time.Duration { return g.interval } // Limit returns the group's limit. func (g *Group) Limit() int { return g.limit } +// SourceTenants returns the source tenants for the group. +func (g *Group) SourceTenants() []string { return g.sourceTenants } + func (g *Group) run(ctx context.Context) { defer close(g.terminated) @@ -863,6 +869,16 @@ func (g *Group) Equals(ng *Group) bool { return false } + if len(g.sourceTenants) != len(ng.sourceTenants) { + return false + } + + for i, tenant := range g.sourceTenants { + if ng.sourceTenants[i] != tenant { + return false + } + } + for i, gr := range g.rules { if gr.String() != ng.rules[i].String() { return false @@ -887,20 +903,23 @@ type Manager struct { // NotifyFunc sends notifications about a set of alerts generated by the given expression. type NotifyFunc func(ctx context.Context, expr string, alerts ...*Alert) +type ContextFunc func(g *Group) context.Context + // ManagerOptions bundles options for the Manager. type ManagerOptions struct { - ExternalURL *url.URL - QueryFunc QueryFunc - NotifyFunc NotifyFunc - Context context.Context - Appendable storage.Appendable - Queryable storage.Queryable - Logger log.Logger - Registerer prometheus.Registerer - OutageTolerance time.Duration - ForGracePeriod time.Duration - ResendDelay time.Duration - GroupLoader GroupLoader + ExternalURL *url.URL + QueryFunc QueryFunc + NotifyFunc NotifyFunc + Context context.Context + FederatedContextFunc ContextFunc + Appendable storage.Appendable + Queryable storage.Queryable + Logger log.Logger + Registerer prometheus.Registerer + OutageTolerance time.Duration + ForGracePeriod time.Duration + ResendDelay time.Duration + GroupLoader GroupLoader Metrics *Metrics } @@ -992,11 +1011,16 @@ func (m *Manager) Update(interval time.Duration, files []string, externalLabels newg.CopyState(oldg) } wg.Done() + + ctx := m.opts.Context + if len(newg.sourceTenants) > 0 { + ctx = m.opts.FederatedContextFunc(newg) + } // Wait with starting evaluation until the rule manager // is told to run. This is necessary to avoid running // queries against a bootstrapping storage. <-m.block - newg.run(m.opts.Context) + newg.run(ctx) }(newg) } @@ -1097,6 +1121,7 @@ func (m *Manager) LoadGroups( Interval: itv, Limit: rg.Limit, Rules: rules, + SourceTenants: rg.SourceTenants, ShouldRestore: shouldRestore, Opts: m.opts, done: m.done, diff --git a/rules/manager_test.go b/rules/manager_test.go index 5d9c602b9e..ae77caa6b9 100644 --- a/rules/manager_test.go +++ b/rules/manager_test.go @@ -718,11 +718,12 @@ func TestUpdate(t *testing.T) { } engine := promql.NewEngine(opts) ruleManager := NewManager(&ManagerOptions{ - Appendable: st, - Queryable: st, - QueryFunc: EngineQueryFunc(engine, st), - Context: context.Background(), - Logger: log.NewNopLogger(), + Appendable: st, + Queryable: st, + QueryFunc: EngineQueryFunc(engine, st), + Context: context.Background(), + FederatedContextFunc: func(*Group) context.Context { return context.Background() }, + Logger: log.NewNopLogger(), }) ruleManager.start() defer ruleManager.Stop() @@ -787,6 +788,12 @@ func TestUpdate(t *testing.T) { } } reloadAndValidate(rgs, t, tmpFile, ruleManager, expected, ogs) + + // Change group source tenants and reload. + for i := range rgs.Groups { + rgs.Groups[i].SourceTenants = []string{"tenant-2"} + } + reloadAndValidate(rgs, t, tmpFile, ruleManager, expected, ogs) } // ruleGroupsTest for running tests over rules. @@ -796,10 +803,11 @@ type ruleGroupsTest struct { // ruleGroupTest forms a testing struct for running tests over rules. type ruleGroupTest struct { - Name string `yaml:"name"` - Interval model.Duration `yaml:"interval,omitempty"` - Limit int `yaml:"limit,omitempty"` - Rules []rulefmt.Rule `yaml:"rules"` + Name string `yaml:"name"` + Interval model.Duration `yaml:"interval,omitempty"` + Limit int `yaml:"limit,omitempty"` + Rules []rulefmt.Rule `yaml:"rules"` + SourceTenants []string `yaml:"source_tenants,omitempty"` } func formatRules(r *rulefmt.RuleGroups) ruleGroupsTest { @@ -818,10 +826,11 @@ func formatRules(r *rulefmt.RuleGroups) ruleGroupsTest { }) } tmp = append(tmp, ruleGroupTest{ - Name: g.Name, - Interval: g.Interval, - Limit: g.Limit, - Rules: rtmp, + Name: g.Name, + Interval: g.Interval, + Limit: g.Limit, + Rules: rtmp, + SourceTenants: g.SourceTenants, }) } return ruleGroupsTest{