From 6ffb81244ffa5679e6490298b3b90e7e75ed929b Mon Sep 17 00:00:00 2001 From: Dimitar Dimitrov Date: Thu, 25 Nov 2021 13:44:29 +0100 Subject: [PATCH 1/7] Add source_tenants fields to RuleGroup --- model/rulefmt/rulefmt.go | 9 +++--- model/rulefmt/testdata/test.yaml | 1 + rules/manager.go | 51 ++++++++++++++++++++++++-------- rules/manager_test.go | 35 ++++++++++++++-------- 4 files changed, 66 insertions(+), 30 deletions(-) diff --git a/model/rulefmt/rulefmt.go b/model/rulefmt/rulefmt.go index 5332514ed..a550af724 100644 --- a/model/rulefmt/rulefmt.go +++ b/model/rulefmt/rulefmt.go @@ -116,10 +116,11 @@ func (g *RuleGroups) Validate(node ruleGroups) (errs []error) { // RuleGroup is a list of sequentially evaluated recording and alerting rules. type RuleGroup struct { - Name string `yaml:"name"` - Interval model.Duration `yaml:"interval,omitempty"` - Limit int `yaml:"limit,omitempty"` - Rules []RuleNode `yaml:"rules"` + Name string `yaml:"name"` + Interval model.Duration `yaml:"interval,omitempty"` + Limit int `yaml:"limit,omitempty"` + Rules []RuleNode `yaml:"rules"` + SourceTenants []string `yaml:"source_tenants,omitempty"` } // Rule describes an alerting or recording rule. diff --git a/model/rulefmt/testdata/test.yaml b/model/rulefmt/testdata/test.yaml index 6810b2cbd..d208bf197 100644 --- a/model/rulefmt/testdata/test.yaml +++ b/model/rulefmt/testdata/test.yaml @@ -36,6 +36,7 @@ groups: - name: my-another-name interval: 30s # defaults to global interval + source_tenants: [tenant-1] rules: - alert: HighErrors expr: | diff --git a/rules/manager.go b/rules/manager.go index 7499dbfca..69b6cad63 100644 --- a/rules/manager.go +++ b/rules/manager.go @@ -246,6 +246,7 @@ type Group struct { interval time.Duration limit int rules []Rule + sourceTenants []string seriesInPreviousEval []map[string]labels.Labels // One per Rule. staleSeries []labels.Labels opts *ManagerOptions @@ -270,6 +271,7 @@ type GroupOptions struct { Interval time.Duration Limit int Rules []Rule + SourceTenants []string ShouldRestore bool Opts *ManagerOptions done chan struct{} @@ -301,6 +303,7 @@ func NewGroup(o GroupOptions) *Group { rules: o.Rules, shouldRestore: o.ShouldRestore, opts: o.Opts, + sourceTenants: o.SourceTenants, seriesInPreviousEval: make([]map[string]labels.Labels, len(o.Rules)), done: make(chan struct{}), managerDone: o.done, @@ -325,6 +328,9 @@ func (g *Group) Interval() time.Duration { return g.interval } // Limit returns the group's limit. func (g *Group) Limit() int { return g.limit } +// SourceTenants returns the source tenants for the group. +func (g *Group) SourceTenants() []string { return g.sourceTenants } + func (g *Group) run(ctx context.Context) { defer close(g.terminated) @@ -863,6 +869,16 @@ func (g *Group) Equals(ng *Group) bool { return false } + if len(g.sourceTenants) != len(ng.sourceTenants) { + return false + } + + for i, tenant := range g.sourceTenants { + if ng.sourceTenants[i] != tenant { + return false + } + } + for i, gr := range g.rules { if gr.String() != ng.rules[i].String() { return false @@ -887,20 +903,23 @@ type Manager struct { // NotifyFunc sends notifications about a set of alerts generated by the given expression. type NotifyFunc func(ctx context.Context, expr string, alerts ...*Alert) +type ContextFunc func(g *Group) context.Context + // ManagerOptions bundles options for the Manager. type ManagerOptions struct { - ExternalURL *url.URL - QueryFunc QueryFunc - NotifyFunc NotifyFunc - Context context.Context - Appendable storage.Appendable - Queryable storage.Queryable - Logger log.Logger - Registerer prometheus.Registerer - OutageTolerance time.Duration - ForGracePeriod time.Duration - ResendDelay time.Duration - GroupLoader GroupLoader + ExternalURL *url.URL + QueryFunc QueryFunc + NotifyFunc NotifyFunc + Context context.Context + FederatedContextFunc ContextFunc + Appendable storage.Appendable + Queryable storage.Queryable + Logger log.Logger + Registerer prometheus.Registerer + OutageTolerance time.Duration + ForGracePeriod time.Duration + ResendDelay time.Duration + GroupLoader GroupLoader Metrics *Metrics } @@ -992,11 +1011,16 @@ func (m *Manager) Update(interval time.Duration, files []string, externalLabels newg.CopyState(oldg) } wg.Done() + + ctx := m.opts.Context + if len(newg.sourceTenants) > 0 { + ctx = m.opts.FederatedContextFunc(newg) + } // Wait with starting evaluation until the rule manager // is told to run. This is necessary to avoid running // queries against a bootstrapping storage. <-m.block - newg.run(m.opts.Context) + newg.run(ctx) }(newg) } @@ -1097,6 +1121,7 @@ func (m *Manager) LoadGroups( Interval: itv, Limit: rg.Limit, Rules: rules, + SourceTenants: rg.SourceTenants, ShouldRestore: shouldRestore, Opts: m.opts, done: m.done, diff --git a/rules/manager_test.go b/rules/manager_test.go index 5d9c602b9..ae77caa6b 100644 --- a/rules/manager_test.go +++ b/rules/manager_test.go @@ -718,11 +718,12 @@ func TestUpdate(t *testing.T) { } engine := promql.NewEngine(opts) ruleManager := NewManager(&ManagerOptions{ - Appendable: st, - Queryable: st, - QueryFunc: EngineQueryFunc(engine, st), - Context: context.Background(), - Logger: log.NewNopLogger(), + Appendable: st, + Queryable: st, + QueryFunc: EngineQueryFunc(engine, st), + Context: context.Background(), + FederatedContextFunc: func(*Group) context.Context { return context.Background() }, + Logger: log.NewNopLogger(), }) ruleManager.start() defer ruleManager.Stop() @@ -787,6 +788,12 @@ func TestUpdate(t *testing.T) { } } reloadAndValidate(rgs, t, tmpFile, ruleManager, expected, ogs) + + // Change group source tenants and reload. + for i := range rgs.Groups { + rgs.Groups[i].SourceTenants = []string{"tenant-2"} + } + reloadAndValidate(rgs, t, tmpFile, ruleManager, expected, ogs) } // ruleGroupsTest for running tests over rules. @@ -796,10 +803,11 @@ type ruleGroupsTest struct { // ruleGroupTest forms a testing struct for running tests over rules. type ruleGroupTest struct { - Name string `yaml:"name"` - Interval model.Duration `yaml:"interval,omitempty"` - Limit int `yaml:"limit,omitempty"` - Rules []rulefmt.Rule `yaml:"rules"` + Name string `yaml:"name"` + Interval model.Duration `yaml:"interval,omitempty"` + Limit int `yaml:"limit,omitempty"` + Rules []rulefmt.Rule `yaml:"rules"` + SourceTenants []string `yaml:"source_tenants,omitempty"` } func formatRules(r *rulefmt.RuleGroups) ruleGroupsTest { @@ -818,10 +826,11 @@ func formatRules(r *rulefmt.RuleGroups) ruleGroupsTest { }) } tmp = append(tmp, ruleGroupTest{ - Name: g.Name, - Interval: g.Interval, - Limit: g.Limit, - Rules: rtmp, + Name: g.Name, + Interval: g.Interval, + Limit: g.Limit, + Rules: rtmp, + SourceTenants: g.SourceTenants, }) } return ruleGroupsTest{ From 42a7f1e2106359114ac7486e3919a45f20f5a5fc Mon Sep 17 00:00:00 2001 From: Dimitar Dimitrov Date: Thu, 25 Nov 2021 13:47:32 +0100 Subject: [PATCH 2/7] Add some godocs to ManagerOptions --- rules/manager.go | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/rules/manager.go b/rules/manager.go index 69b6cad63..17b8c4dca 100644 --- a/rules/manager.go +++ b/rules/manager.go @@ -907,10 +907,11 @@ type ContextFunc func(g *Group) context.Context // ManagerOptions bundles options for the Manager. type ManagerOptions struct { - ExternalURL *url.URL - QueryFunc QueryFunc - NotifyFunc NotifyFunc - Context context.Context + ExternalURL *url.URL + QueryFunc QueryFunc + NotifyFunc NotifyFunc + Context context.Context + // FederatedContextFunc will be called to obtain a context when evaluating rules with non-empty SourceTenants FederatedContextFunc ContextFunc Appendable storage.Appendable Queryable storage.Queryable From 75d3c11278d47147e13977f5bca6a26b7b420b8d Mon Sep 17 00:00:00 2001 From: Dimitar Dimitrov Date: Fri, 26 Nov 2021 14:03:40 +0100 Subject: [PATCH 3/7] Repurpose FederatedContextFunc into GroupEvaluationContextFunc --- rules/manager.go | 27 ++++++++++++++------------- rules/manager_test.go | 11 +++++------ 2 files changed, 19 insertions(+), 19 deletions(-) diff --git a/rules/manager.go b/rules/manager.go index 17b8c4dca..973a47161 100644 --- a/rules/manager.go +++ b/rules/manager.go @@ -903,7 +903,7 @@ type Manager struct { // NotifyFunc sends notifications about a set of alerts generated by the given expression. type NotifyFunc func(ctx context.Context, expr string, alerts ...*Alert) -type ContextFunc func(g *Group) context.Context +type ContextWrapFunc func(ctx context.Context, g *Group) context.Context // ManagerOptions bundles options for the Manager. type ManagerOptions struct { @@ -911,16 +911,17 @@ type ManagerOptions struct { QueryFunc QueryFunc NotifyFunc NotifyFunc Context context.Context - // FederatedContextFunc will be called to obtain a context when evaluating rules with non-empty SourceTenants - FederatedContextFunc ContextFunc - Appendable storage.Appendable - Queryable storage.Queryable - Logger log.Logger - Registerer prometheus.Registerer - OutageTolerance time.Duration - ForGracePeriod time.Duration - ResendDelay time.Duration - GroupLoader GroupLoader + // GroupEvaluationContextFunc will be called to wrap Context based on the group being evaluated. + // Will be skipped if nil. + GroupEvaluationContextFunc ContextWrapFunc + Appendable storage.Appendable + Queryable storage.Queryable + Logger log.Logger + Registerer prometheus.Registerer + OutageTolerance time.Duration + ForGracePeriod time.Duration + ResendDelay time.Duration + GroupLoader GroupLoader Metrics *Metrics } @@ -1014,8 +1015,8 @@ func (m *Manager) Update(interval time.Duration, files []string, externalLabels wg.Done() ctx := m.opts.Context - if len(newg.sourceTenants) > 0 { - ctx = m.opts.FederatedContextFunc(newg) + if m.opts.GroupEvaluationContextFunc != nil { + ctx = m.opts.GroupEvaluationContextFunc(ctx, newg) } // Wait with starting evaluation until the rule manager // is told to run. This is necessary to avoid running diff --git a/rules/manager_test.go b/rules/manager_test.go index ae77caa6b..ff2f42dd4 100644 --- a/rules/manager_test.go +++ b/rules/manager_test.go @@ -718,12 +718,11 @@ func TestUpdate(t *testing.T) { } engine := promql.NewEngine(opts) ruleManager := NewManager(&ManagerOptions{ - Appendable: st, - Queryable: st, - QueryFunc: EngineQueryFunc(engine, st), - Context: context.Background(), - FederatedContextFunc: func(*Group) context.Context { return context.Background() }, - Logger: log.NewNopLogger(), + Appendable: st, + Queryable: st, + QueryFunc: EngineQueryFunc(engine, st), + Context: context.Background(), + Logger: log.NewNopLogger(), }) ruleManager.start() defer ruleManager.Stop() From a97576fc00bc821843726faaa48582213a1499c7 Mon Sep 17 00:00:00 2001 From: Dimitar Dimitrov Date: Fri, 26 Nov 2021 14:04:46 +0100 Subject: [PATCH 4/7] Ignore order when comparing the source tenants of two rule groups --- rules/manager.go | 23 +++++++++++++++-------- 1 file changed, 15 insertions(+), 8 deletions(-) diff --git a/rules/manager.go b/rules/manager.go index 973a47161..9b2482951 100644 --- a/rules/manager.go +++ b/rules/manager.go @@ -869,18 +869,25 @@ func (g *Group) Equals(ng *Group) bool { return false } - if len(g.sourceTenants) != len(ng.sourceTenants) { - return false - } - - for i, tenant := range g.sourceTenants { - if ng.sourceTenants[i] != tenant { + for i, gr := range g.rules { + if gr.String() != ng.rules[i].String() { return false } } - for i, gr := range g.rules { - if gr.String() != ng.rules[i].String() { + // compare source tenants ignoring their order + if len(g.sourceTenants) != len(ng.sourceTenants) { + return false + } + + thisSourceTenants := make(map[string]struct{}, len(g.sourceTenants)) + + for _, tenant := range g.sourceTenants { + thisSourceTenants[tenant] = struct{}{} + } + + for _, tenant := range ng.sourceTenants { + if _, ok := thisSourceTenants[tenant]; !ok { return false } } From f17d3a71aa5ef9799d7cac52b26f704d7a87fa92 Mon Sep 17 00:00:00 2001 From: Dimitar Dimitrov Date: Fri, 26 Nov 2021 14:08:21 +0100 Subject: [PATCH 5/7] Improve godoc of Group.SourceTenants() --- rules/manager.go | 1 + 1 file changed, 1 insertion(+) diff --git a/rules/manager.go b/rules/manager.go index 9b2482951..da0964b46 100644 --- a/rules/manager.go +++ b/rules/manager.go @@ -329,6 +329,7 @@ func (g *Group) Interval() time.Duration { return g.interval } func (g *Group) Limit() int { return g.limit } // SourceTenants returns the source tenants for the group. +// If it's empty or nil, then the owning user/tenant is considered to be the source tenant. func (g *Group) SourceTenants() []string { return g.sourceTenants } func (g *Group) run(ctx context.Context) { From a16235f5f48701c1817d273345f3ca71fb14166d Mon Sep 17 00:00:00 2001 From: Dimitar Dimitrov Date: Mon, 29 Nov 2021 13:53:23 +0100 Subject: [PATCH 6/7] Test loading rule groups with source tenants --- rules/fixtures/rules_with_source_tenants.yaml | 6 +++ rules/manager_test.go | 51 ++++++++++++++++++- 2 files changed, 55 insertions(+), 2 deletions(-) create mode 100644 rules/fixtures/rules_with_source_tenants.yaml diff --git a/rules/fixtures/rules_with_source_tenants.yaml b/rules/fixtures/rules_with_source_tenants.yaml new file mode 100644 index 000000000..42f6e7ab4 --- /dev/null +++ b/rules/fixtures/rules_with_source_tenants.yaml @@ -0,0 +1,6 @@ +groups: + - name: test + rules: + - record: job:http_requests:rate5m + expr: sum by (job)(rate(http_requests_total[5m])) + source_tenants: [tenant-1, tenant-2] diff --git a/rules/manager_test.go b/rules/manager_test.go index ff2f42dd4..db2e2b468 100644 --- a/rules/manager_test.go +++ b/rules/manager_test.go @@ -795,6 +795,48 @@ func TestUpdate(t *testing.T) { reloadAndValidate(rgs, t, tmpFile, ruleManager, expected, ogs) } +func TestUpdateSetsSourceTenants(t *testing.T) { + st := teststorage.New(t) + defer st.Close() + + opts := promql.EngineOpts{ + Logger: nil, + Reg: nil, + MaxSamples: 10, + Timeout: 10 * time.Second, + } + engine := promql.NewEngine(opts) + ruleManager := NewManager(&ManagerOptions{ + Appendable: st, + Queryable: st, + QueryFunc: EngineQueryFunc(engine, st), + Context: context.Background(), + Logger: log.NewNopLogger(), + }) + ruleManager.start() + defer ruleManager.Stop() + + rgs, errs := rulefmt.ParseFile("fixtures/rules_with_source_tenants.yaml") + require.Equal(t, 0, len(errs), "file parsing failures") + + tmpFile, err := ioutil.TempFile("", "rules.test.*.yaml") + require.NoError(t, err) + defer os.Remove(tmpFile.Name()) + defer tmpFile.Close() + + reloadRules(rgs, t, tmpFile, ruleManager) + + // check that all source tenants were actually set + require.Len(t, ruleManager.groups, len(rgs.Groups)) + + for _, expectedGroup := range rgs.Groups { + actualGroup, ok := ruleManager.groups[GroupKey(tmpFile.Name(), expectedGroup.Name)] + + require.True(t, ok, "actual groups don't contain at one of the expected groups") + require.ElementsMatch(t, expectedGroup.SourceTenants, actualGroup.SourceTenants()) + } +} + // ruleGroupsTest for running tests over rules. type ruleGroupsTest struct { Groups []ruleGroupTest `yaml:"groups"` @@ -837,14 +879,19 @@ func formatRules(r *rulefmt.RuleGroups) ruleGroupsTest { } } -func reloadAndValidate(rgs *rulefmt.RuleGroups, t *testing.T, tmpFile *os.File, ruleManager *Manager, expected map[string]labels.Labels, ogs map[string]*Group) { +func reloadRules(rgs *rulefmt.RuleGroups, t *testing.T, tmpFile *os.File, ruleManager *Manager) { bs, err := yaml.Marshal(formatRules(rgs)) require.NoError(t, err) - tmpFile.Seek(0, 0) + _, _ = tmpFile.Seek(0, 0) _, err = tmpFile.Write(bs) require.NoError(t, err) err = ruleManager.Update(10*time.Second, []string{tmpFile.Name()}, nil, "") require.NoError(t, err) +} + +func reloadAndValidate(rgs *rulefmt.RuleGroups, t *testing.T, tmpFile *os.File, ruleManager *Manager, expected map[string]labels.Labels, ogs map[string]*Group) { + reloadRules(rgs, t, tmpFile, ruleManager) + for h, g := range ruleManager.groups { if ogs[h] == g { t.Fail() From b55a51a11856482a9a54c3b01c5cc1477932f4eb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gy=C3=B6rgy=20Krajcsovits?= Date: Mon, 29 Nov 2021 18:30:43 +0100 Subject: [PATCH 7/7] Add test of Manager using the supplied context function. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Tests the the query being executed uses a context modified by the function. Also modify reloadRules utility function to take time interval for the Manager so the test runs faster. We'll work on synthetic clock later. Signed-off-by: György Krajcsovits --- rules/manager_test.go | 63 +++++++++++++++++++++++++++++++++++++++---- 1 file changed, 58 insertions(+), 5 deletions(-) diff --git a/rules/manager_test.go b/rules/manager_test.go index db2e2b468..5d6de382f 100644 --- a/rules/manager_test.go +++ b/rules/manager_test.go @@ -817,14 +817,14 @@ func TestUpdateSetsSourceTenants(t *testing.T) { defer ruleManager.Stop() rgs, errs := rulefmt.ParseFile("fixtures/rules_with_source_tenants.yaml") - require.Equal(t, 0, len(errs), "file parsing failures") + require.Empty(t, errs, "file parsing failures") tmpFile, err := ioutil.TempFile("", "rules.test.*.yaml") require.NoError(t, err) defer os.Remove(tmpFile.Name()) defer tmpFile.Close() - reloadRules(rgs, t, tmpFile, ruleManager) + reloadRules(rgs, t, tmpFile, ruleManager, 0) // check that all source tenants were actually set require.Len(t, ruleManager.groups, len(rgs.Groups)) @@ -837,6 +837,55 @@ func TestUpdateSetsSourceTenants(t *testing.T) { } } +func TestGroupEvaluationContextFuncIsCalledWhenSupplied(t *testing.T) { + type testContextKeyType string + var testContextKey testContextKeyType = "TestGroupEvaluationContextFuncIsCalledWhenSupplied" + oldContextTestValue := context.Background().Value(testContextKey) + + contextTestValueChannel := make(chan interface{}) + mockQueryFunc := func(ctx context.Context, qs string, t time.Time) (promql.Vector, error) { + contextTestValueChannel <- ctx.Value(testContextKey) + return promql.Vector{}, nil + } + + mockContextWrapFunc := func(ctx context.Context, g *Group) context.Context { + return context.WithValue(ctx, testContextKey, 42) + } + + st := teststorage.New(t) + defer st.Close() + + ruleManager := NewManager(&ManagerOptions{ + Appendable: st, + Queryable: st, + QueryFunc: mockQueryFunc, + Context: context.Background(), + Logger: log.NewNopLogger(), + GroupEvaluationContextFunc: mockContextWrapFunc, + }) + + rgs, errs := rulefmt.ParseFile("fixtures/rules_with_source_tenants.yaml") + require.Empty(t, errs, "file parsing failures") + + tmpFile, err := ioutil.TempFile("", "rules.test.*.yaml") + require.NoError(t, err) + defer os.Remove(tmpFile.Name()) + defer tmpFile.Close() + + // no filesystem is harmed when running this test, set the interval low + reloadRules(rgs, t, tmpFile, ruleManager, 10*time.Millisecond) + + ruleManager.start() + defer ruleManager.Stop() + + // check that all source tenants were actually set + require.Len(t, ruleManager.groups, len(rgs.Groups)) + + require.Nil(t, oldContextTestValue, "Context contained test key before the test, impossible") + newContextTestValue := <-contextTestValueChannel + require.Equal(t, 42, newContextTestValue, "Context does not contain the correct value that should be injected") +} + // ruleGroupsTest for running tests over rules. type ruleGroupsTest struct { Groups []ruleGroupTest `yaml:"groups"` @@ -879,18 +928,22 @@ func formatRules(r *rulefmt.RuleGroups) ruleGroupsTest { } } -func reloadRules(rgs *rulefmt.RuleGroups, t *testing.T, tmpFile *os.File, ruleManager *Manager) { +func reloadRules(rgs *rulefmt.RuleGroups, t *testing.T, tmpFile *os.File, ruleManager *Manager, interval time.Duration) { + if interval == 0 { + interval = 10 * time.Second + } + bs, err := yaml.Marshal(formatRules(rgs)) require.NoError(t, err) _, _ = tmpFile.Seek(0, 0) _, err = tmpFile.Write(bs) require.NoError(t, err) - err = ruleManager.Update(10*time.Second, []string{tmpFile.Name()}, nil, "") + err = ruleManager.Update(interval, []string{tmpFile.Name()}, nil, "") require.NoError(t, err) } func reloadAndValidate(rgs *rulefmt.RuleGroups, t *testing.T, tmpFile *os.File, ruleManager *Manager, expected map[string]labels.Labels, ogs map[string]*Group) { - reloadRules(rgs, t, tmpFile, ruleManager) + reloadRules(rgs, t, tmpFile, ruleManager, 0) for h, g := range ruleManager.groups { if ogs[h] == g {