diff --git a/model/rulefmt/rulefmt.go b/model/rulefmt/rulefmt.go index 5332514ed..a550af724 100644 --- a/model/rulefmt/rulefmt.go +++ b/model/rulefmt/rulefmt.go @@ -116,10 +116,11 @@ func (g *RuleGroups) Validate(node ruleGroups) (errs []error) { // RuleGroup is a list of sequentially evaluated recording and alerting rules. type RuleGroup struct { - Name string `yaml:"name"` - Interval model.Duration `yaml:"interval,omitempty"` - Limit int `yaml:"limit,omitempty"` - Rules []RuleNode `yaml:"rules"` + Name string `yaml:"name"` + Interval model.Duration `yaml:"interval,omitempty"` + Limit int `yaml:"limit,omitempty"` + Rules []RuleNode `yaml:"rules"` + SourceTenants []string `yaml:"source_tenants,omitempty"` } // Rule describes an alerting or recording rule. diff --git a/model/rulefmt/testdata/test.yaml b/model/rulefmt/testdata/test.yaml index 6810b2cbd..d208bf197 100644 --- a/model/rulefmt/testdata/test.yaml +++ b/model/rulefmt/testdata/test.yaml @@ -36,6 +36,7 @@ groups: - name: my-another-name interval: 30s # defaults to global interval + source_tenants: [tenant-1] rules: - alert: HighErrors expr: | diff --git a/rules/fixtures/rules_with_source_tenants.yaml b/rules/fixtures/rules_with_source_tenants.yaml new file mode 100644 index 000000000..42f6e7ab4 --- /dev/null +++ b/rules/fixtures/rules_with_source_tenants.yaml @@ -0,0 +1,6 @@ +groups: + - name: test + rules: + - record: job:http_requests:rate5m + expr: sum by (job)(rate(http_requests_total[5m])) + source_tenants: [tenant-1, tenant-2] diff --git a/rules/manager.go b/rules/manager.go index 7499dbfca..da0964b46 100644 --- a/rules/manager.go +++ b/rules/manager.go @@ -246,6 +246,7 @@ type Group struct { interval time.Duration limit int rules []Rule + sourceTenants []string seriesInPreviousEval []map[string]labels.Labels // One per Rule. staleSeries []labels.Labels opts *ManagerOptions @@ -270,6 +271,7 @@ type GroupOptions struct { Interval time.Duration Limit int Rules []Rule + SourceTenants []string ShouldRestore bool Opts *ManagerOptions done chan struct{} @@ -301,6 +303,7 @@ func NewGroup(o GroupOptions) *Group { rules: o.Rules, shouldRestore: o.ShouldRestore, opts: o.Opts, + sourceTenants: o.SourceTenants, seriesInPreviousEval: make([]map[string]labels.Labels, len(o.Rules)), done: make(chan struct{}), managerDone: o.done, @@ -325,6 +328,10 @@ func (g *Group) Interval() time.Duration { return g.interval } // Limit returns the group's limit. func (g *Group) Limit() int { return g.limit } +// SourceTenants returns the source tenants for the group. +// If it's empty or nil, then the owning user/tenant is considered to be the source tenant. +func (g *Group) SourceTenants() []string { return g.sourceTenants } + func (g *Group) run(ctx context.Context) { defer close(g.terminated) @@ -869,6 +876,23 @@ func (g *Group) Equals(ng *Group) bool { } } + // compare source tenants ignoring their order + if len(g.sourceTenants) != len(ng.sourceTenants) { + return false + } + + thisSourceTenants := make(map[string]struct{}, len(g.sourceTenants)) + + for _, tenant := range g.sourceTenants { + thisSourceTenants[tenant] = struct{}{} + } + + for _, tenant := range ng.sourceTenants { + if _, ok := thisSourceTenants[tenant]; !ok { + return false + } + } + return true } @@ -887,20 +911,25 @@ type Manager struct { // NotifyFunc sends notifications about a set of alerts generated by the given expression. type NotifyFunc func(ctx context.Context, expr string, alerts ...*Alert) +type ContextWrapFunc func(ctx context.Context, g *Group) context.Context + // ManagerOptions bundles options for the Manager. type ManagerOptions struct { - ExternalURL *url.URL - QueryFunc QueryFunc - NotifyFunc NotifyFunc - Context context.Context - Appendable storage.Appendable - Queryable storage.Queryable - Logger log.Logger - Registerer prometheus.Registerer - OutageTolerance time.Duration - ForGracePeriod time.Duration - ResendDelay time.Duration - GroupLoader GroupLoader + ExternalURL *url.URL + QueryFunc QueryFunc + NotifyFunc NotifyFunc + Context context.Context + // GroupEvaluationContextFunc will be called to wrap Context based on the group being evaluated. + // Will be skipped if nil. + GroupEvaluationContextFunc ContextWrapFunc + Appendable storage.Appendable + Queryable storage.Queryable + Logger log.Logger + Registerer prometheus.Registerer + OutageTolerance time.Duration + ForGracePeriod time.Duration + ResendDelay time.Duration + GroupLoader GroupLoader Metrics *Metrics } @@ -992,11 +1021,16 @@ func (m *Manager) Update(interval time.Duration, files []string, externalLabels newg.CopyState(oldg) } wg.Done() + + ctx := m.opts.Context + if m.opts.GroupEvaluationContextFunc != nil { + ctx = m.opts.GroupEvaluationContextFunc(ctx, newg) + } // Wait with starting evaluation until the rule manager // is told to run. This is necessary to avoid running // queries against a bootstrapping storage. <-m.block - newg.run(m.opts.Context) + newg.run(ctx) }(newg) } @@ -1097,6 +1131,7 @@ func (m *Manager) LoadGroups( Interval: itv, Limit: rg.Limit, Rules: rules, + SourceTenants: rg.SourceTenants, ShouldRestore: shouldRestore, Opts: m.opts, done: m.done, diff --git a/rules/manager_test.go b/rules/manager_test.go index 5d9c602b9..5d6de382f 100644 --- a/rules/manager_test.go +++ b/rules/manager_test.go @@ -787,6 +787,103 @@ func TestUpdate(t *testing.T) { } } reloadAndValidate(rgs, t, tmpFile, ruleManager, expected, ogs) + + // Change group source tenants and reload. + for i := range rgs.Groups { + rgs.Groups[i].SourceTenants = []string{"tenant-2"} + } + reloadAndValidate(rgs, t, tmpFile, ruleManager, expected, ogs) +} + +func TestUpdateSetsSourceTenants(t *testing.T) { + st := teststorage.New(t) + defer st.Close() + + opts := promql.EngineOpts{ + Logger: nil, + Reg: nil, + MaxSamples: 10, + Timeout: 10 * time.Second, + } + engine := promql.NewEngine(opts) + ruleManager := NewManager(&ManagerOptions{ + Appendable: st, + Queryable: st, + QueryFunc: EngineQueryFunc(engine, st), + Context: context.Background(), + Logger: log.NewNopLogger(), + }) + ruleManager.start() + defer ruleManager.Stop() + + rgs, errs := rulefmt.ParseFile("fixtures/rules_with_source_tenants.yaml") + require.Empty(t, errs, "file parsing failures") + + tmpFile, err := ioutil.TempFile("", "rules.test.*.yaml") + require.NoError(t, err) + defer os.Remove(tmpFile.Name()) + defer tmpFile.Close() + + reloadRules(rgs, t, tmpFile, ruleManager, 0) + + // check that all source tenants were actually set + require.Len(t, ruleManager.groups, len(rgs.Groups)) + + for _, expectedGroup := range rgs.Groups { + actualGroup, ok := ruleManager.groups[GroupKey(tmpFile.Name(), expectedGroup.Name)] + + require.True(t, ok, "actual groups don't contain at one of the expected groups") + require.ElementsMatch(t, expectedGroup.SourceTenants, actualGroup.SourceTenants()) + } +} + +func TestGroupEvaluationContextFuncIsCalledWhenSupplied(t *testing.T) { + type testContextKeyType string + var testContextKey testContextKeyType = "TestGroupEvaluationContextFuncIsCalledWhenSupplied" + oldContextTestValue := context.Background().Value(testContextKey) + + contextTestValueChannel := make(chan interface{}) + mockQueryFunc := func(ctx context.Context, qs string, t time.Time) (promql.Vector, error) { + contextTestValueChannel <- ctx.Value(testContextKey) + return promql.Vector{}, nil + } + + mockContextWrapFunc := func(ctx context.Context, g *Group) context.Context { + return context.WithValue(ctx, testContextKey, 42) + } + + st := teststorage.New(t) + defer st.Close() + + ruleManager := NewManager(&ManagerOptions{ + Appendable: st, + Queryable: st, + QueryFunc: mockQueryFunc, + Context: context.Background(), + Logger: log.NewNopLogger(), + GroupEvaluationContextFunc: mockContextWrapFunc, + }) + + rgs, errs := rulefmt.ParseFile("fixtures/rules_with_source_tenants.yaml") + require.Empty(t, errs, "file parsing failures") + + tmpFile, err := ioutil.TempFile("", "rules.test.*.yaml") + require.NoError(t, err) + defer os.Remove(tmpFile.Name()) + defer tmpFile.Close() + + // no filesystem is harmed when running this test, set the interval low + reloadRules(rgs, t, tmpFile, ruleManager, 10*time.Millisecond) + + ruleManager.start() + defer ruleManager.Stop() + + // check that all source tenants were actually set + require.Len(t, ruleManager.groups, len(rgs.Groups)) + + require.Nil(t, oldContextTestValue, "Context contained test key before the test, impossible") + newContextTestValue := <-contextTestValueChannel + require.Equal(t, 42, newContextTestValue, "Context does not contain the correct value that should be injected") } // ruleGroupsTest for running tests over rules. @@ -796,10 +893,11 @@ type ruleGroupsTest struct { // ruleGroupTest forms a testing struct for running tests over rules. type ruleGroupTest struct { - Name string `yaml:"name"` - Interval model.Duration `yaml:"interval,omitempty"` - Limit int `yaml:"limit,omitempty"` - Rules []rulefmt.Rule `yaml:"rules"` + Name string `yaml:"name"` + Interval model.Duration `yaml:"interval,omitempty"` + Limit int `yaml:"limit,omitempty"` + Rules []rulefmt.Rule `yaml:"rules"` + SourceTenants []string `yaml:"source_tenants,omitempty"` } func formatRules(r *rulefmt.RuleGroups) ruleGroupsTest { @@ -818,10 +916,11 @@ func formatRules(r *rulefmt.RuleGroups) ruleGroupsTest { }) } tmp = append(tmp, ruleGroupTest{ - Name: g.Name, - Interval: g.Interval, - Limit: g.Limit, - Rules: rtmp, + Name: g.Name, + Interval: g.Interval, + Limit: g.Limit, + Rules: rtmp, + SourceTenants: g.SourceTenants, }) } return ruleGroupsTest{ @@ -829,14 +928,23 @@ func formatRules(r *rulefmt.RuleGroups) ruleGroupsTest { } } -func reloadAndValidate(rgs *rulefmt.RuleGroups, t *testing.T, tmpFile *os.File, ruleManager *Manager, expected map[string]labels.Labels, ogs map[string]*Group) { +func reloadRules(rgs *rulefmt.RuleGroups, t *testing.T, tmpFile *os.File, ruleManager *Manager, interval time.Duration) { + if interval == 0 { + interval = 10 * time.Second + } + bs, err := yaml.Marshal(formatRules(rgs)) require.NoError(t, err) - tmpFile.Seek(0, 0) + _, _ = tmpFile.Seek(0, 0) _, err = tmpFile.Write(bs) require.NoError(t, err) - err = ruleManager.Update(10*time.Second, []string{tmpFile.Name()}, nil, "") + err = ruleManager.Update(interval, []string{tmpFile.Name()}, nil, "") require.NoError(t, err) +} + +func reloadAndValidate(rgs *rulefmt.RuleGroups, t *testing.T, tmpFile *os.File, ruleManager *Manager, expected map[string]labels.Labels, ogs map[string]*Group) { + reloadRules(rgs, t, tmpFile, ruleManager, 0) + for h, g := range ruleManager.groups { if ogs[h] == g { t.Fail()