Merge pull request #55 from grafana/dimitar/rule-groups-soruce-tenants

Add source_tenants field to RuleGroup
This commit is contained in:
Marco Pracucci 2021-12-01 16:17:18 +01:00 committed by GitHub
commit 415354aeb8
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 179 additions and 28 deletions

View file

@ -116,10 +116,11 @@ func (g *RuleGroups) Validate(node ruleGroups) (errs []error) {
// RuleGroup is a list of sequentially evaluated recording and alerting rules.
type RuleGroup struct {
Name string `yaml:"name"`
Interval model.Duration `yaml:"interval,omitempty"`
Limit int `yaml:"limit,omitempty"`
Rules []RuleNode `yaml:"rules"`
Name string `yaml:"name"`
Interval model.Duration `yaml:"interval,omitempty"`
Limit int `yaml:"limit,omitempty"`
Rules []RuleNode `yaml:"rules"`
SourceTenants []string `yaml:"source_tenants,omitempty"`
}
// Rule describes an alerting or recording rule.

View file

@ -36,6 +36,7 @@ groups:
- name: my-another-name
interval: 30s # defaults to global interval
source_tenants: [tenant-1]
rules:
- alert: HighErrors
expr: |

View file

@ -0,0 +1,6 @@
groups:
- name: test
rules:
- record: job:http_requests:rate5m
expr: sum by (job)(rate(http_requests_total[5m]))
source_tenants: [tenant-1, tenant-2]

View file

@ -246,6 +246,7 @@ type Group struct {
interval time.Duration
limit int
rules []Rule
sourceTenants []string
seriesInPreviousEval []map[string]labels.Labels // One per Rule.
staleSeries []labels.Labels
opts *ManagerOptions
@ -270,6 +271,7 @@ type GroupOptions struct {
Interval time.Duration
Limit int
Rules []Rule
SourceTenants []string
ShouldRestore bool
Opts *ManagerOptions
done chan struct{}
@ -301,6 +303,7 @@ func NewGroup(o GroupOptions) *Group {
rules: o.Rules,
shouldRestore: o.ShouldRestore,
opts: o.Opts,
sourceTenants: o.SourceTenants,
seriesInPreviousEval: make([]map[string]labels.Labels, len(o.Rules)),
done: make(chan struct{}),
managerDone: o.done,
@ -325,6 +328,10 @@ func (g *Group) Interval() time.Duration { return g.interval }
// Limit returns the group's limit.
func (g *Group) Limit() int { return g.limit }
// SourceTenants returns the source tenants for the group.
// If it's empty or nil, then the owning user/tenant is considered to be the source tenant.
func (g *Group) SourceTenants() []string { return g.sourceTenants }
func (g *Group) run(ctx context.Context) {
defer close(g.terminated)
@ -869,6 +876,23 @@ func (g *Group) Equals(ng *Group) bool {
}
}
// compare source tenants ignoring their order
if len(g.sourceTenants) != len(ng.sourceTenants) {
return false
}
thisSourceTenants := make(map[string]struct{}, len(g.sourceTenants))
for _, tenant := range g.sourceTenants {
thisSourceTenants[tenant] = struct{}{}
}
for _, tenant := range ng.sourceTenants {
if _, ok := thisSourceTenants[tenant]; !ok {
return false
}
}
return true
}
@ -887,20 +911,25 @@ type Manager struct {
// NotifyFunc sends notifications about a set of alerts generated by the given expression.
type NotifyFunc func(ctx context.Context, expr string, alerts ...*Alert)
type ContextWrapFunc func(ctx context.Context, g *Group) context.Context
// ManagerOptions bundles options for the Manager.
type ManagerOptions struct {
ExternalURL *url.URL
QueryFunc QueryFunc
NotifyFunc NotifyFunc
Context context.Context
Appendable storage.Appendable
Queryable storage.Queryable
Logger log.Logger
Registerer prometheus.Registerer
OutageTolerance time.Duration
ForGracePeriod time.Duration
ResendDelay time.Duration
GroupLoader GroupLoader
ExternalURL *url.URL
QueryFunc QueryFunc
NotifyFunc NotifyFunc
Context context.Context
// GroupEvaluationContextFunc will be called to wrap Context based on the group being evaluated.
// Will be skipped if nil.
GroupEvaluationContextFunc ContextWrapFunc
Appendable storage.Appendable
Queryable storage.Queryable
Logger log.Logger
Registerer prometheus.Registerer
OutageTolerance time.Duration
ForGracePeriod time.Duration
ResendDelay time.Duration
GroupLoader GroupLoader
Metrics *Metrics
}
@ -992,11 +1021,16 @@ func (m *Manager) Update(interval time.Duration, files []string, externalLabels
newg.CopyState(oldg)
}
wg.Done()
ctx := m.opts.Context
if m.opts.GroupEvaluationContextFunc != nil {
ctx = m.opts.GroupEvaluationContextFunc(ctx, newg)
}
// Wait with starting evaluation until the rule manager
// is told to run. This is necessary to avoid running
// queries against a bootstrapping storage.
<-m.block
newg.run(m.opts.Context)
newg.run(ctx)
}(newg)
}
@ -1097,6 +1131,7 @@ func (m *Manager) LoadGroups(
Interval: itv,
Limit: rg.Limit,
Rules: rules,
SourceTenants: rg.SourceTenants,
ShouldRestore: shouldRestore,
Opts: m.opts,
done: m.done,

View file

@ -787,6 +787,103 @@ func TestUpdate(t *testing.T) {
}
}
reloadAndValidate(rgs, t, tmpFile, ruleManager, expected, ogs)
// Change group source tenants and reload.
for i := range rgs.Groups {
rgs.Groups[i].SourceTenants = []string{"tenant-2"}
}
reloadAndValidate(rgs, t, tmpFile, ruleManager, expected, ogs)
}
func TestUpdateSetsSourceTenants(t *testing.T) {
st := teststorage.New(t)
defer st.Close()
opts := promql.EngineOpts{
Logger: nil,
Reg: nil,
MaxSamples: 10,
Timeout: 10 * time.Second,
}
engine := promql.NewEngine(opts)
ruleManager := NewManager(&ManagerOptions{
Appendable: st,
Queryable: st,
QueryFunc: EngineQueryFunc(engine, st),
Context: context.Background(),
Logger: log.NewNopLogger(),
})
ruleManager.start()
defer ruleManager.Stop()
rgs, errs := rulefmt.ParseFile("fixtures/rules_with_source_tenants.yaml")
require.Empty(t, errs, "file parsing failures")
tmpFile, err := ioutil.TempFile("", "rules.test.*.yaml")
require.NoError(t, err)
defer os.Remove(tmpFile.Name())
defer tmpFile.Close()
reloadRules(rgs, t, tmpFile, ruleManager, 0)
// check that all source tenants were actually set
require.Len(t, ruleManager.groups, len(rgs.Groups))
for _, expectedGroup := range rgs.Groups {
actualGroup, ok := ruleManager.groups[GroupKey(tmpFile.Name(), expectedGroup.Name)]
require.True(t, ok, "actual groups don't contain at one of the expected groups")
require.ElementsMatch(t, expectedGroup.SourceTenants, actualGroup.SourceTenants())
}
}
func TestGroupEvaluationContextFuncIsCalledWhenSupplied(t *testing.T) {
type testContextKeyType string
var testContextKey testContextKeyType = "TestGroupEvaluationContextFuncIsCalledWhenSupplied"
oldContextTestValue := context.Background().Value(testContextKey)
contextTestValueChannel := make(chan interface{})
mockQueryFunc := func(ctx context.Context, qs string, t time.Time) (promql.Vector, error) {
contextTestValueChannel <- ctx.Value(testContextKey)
return promql.Vector{}, nil
}
mockContextWrapFunc := func(ctx context.Context, g *Group) context.Context {
return context.WithValue(ctx, testContextKey, 42)
}
st := teststorage.New(t)
defer st.Close()
ruleManager := NewManager(&ManagerOptions{
Appendable: st,
Queryable: st,
QueryFunc: mockQueryFunc,
Context: context.Background(),
Logger: log.NewNopLogger(),
GroupEvaluationContextFunc: mockContextWrapFunc,
})
rgs, errs := rulefmt.ParseFile("fixtures/rules_with_source_tenants.yaml")
require.Empty(t, errs, "file parsing failures")
tmpFile, err := ioutil.TempFile("", "rules.test.*.yaml")
require.NoError(t, err)
defer os.Remove(tmpFile.Name())
defer tmpFile.Close()
// no filesystem is harmed when running this test, set the interval low
reloadRules(rgs, t, tmpFile, ruleManager, 10*time.Millisecond)
ruleManager.start()
defer ruleManager.Stop()
// check that all source tenants were actually set
require.Len(t, ruleManager.groups, len(rgs.Groups))
require.Nil(t, oldContextTestValue, "Context contained test key before the test, impossible")
newContextTestValue := <-contextTestValueChannel
require.Equal(t, 42, newContextTestValue, "Context does not contain the correct value that should be injected")
}
// ruleGroupsTest for running tests over rules.
@ -796,10 +893,11 @@ type ruleGroupsTest struct {
// ruleGroupTest forms a testing struct for running tests over rules.
type ruleGroupTest struct {
Name string `yaml:"name"`
Interval model.Duration `yaml:"interval,omitempty"`
Limit int `yaml:"limit,omitempty"`
Rules []rulefmt.Rule `yaml:"rules"`
Name string `yaml:"name"`
Interval model.Duration `yaml:"interval,omitempty"`
Limit int `yaml:"limit,omitempty"`
Rules []rulefmt.Rule `yaml:"rules"`
SourceTenants []string `yaml:"source_tenants,omitempty"`
}
func formatRules(r *rulefmt.RuleGroups) ruleGroupsTest {
@ -818,10 +916,11 @@ func formatRules(r *rulefmt.RuleGroups) ruleGroupsTest {
})
}
tmp = append(tmp, ruleGroupTest{
Name: g.Name,
Interval: g.Interval,
Limit: g.Limit,
Rules: rtmp,
Name: g.Name,
Interval: g.Interval,
Limit: g.Limit,
Rules: rtmp,
SourceTenants: g.SourceTenants,
})
}
return ruleGroupsTest{
@ -829,14 +928,23 @@ func formatRules(r *rulefmt.RuleGroups) ruleGroupsTest {
}
}
func reloadAndValidate(rgs *rulefmt.RuleGroups, t *testing.T, tmpFile *os.File, ruleManager *Manager, expected map[string]labels.Labels, ogs map[string]*Group) {
func reloadRules(rgs *rulefmt.RuleGroups, t *testing.T, tmpFile *os.File, ruleManager *Manager, interval time.Duration) {
if interval == 0 {
interval = 10 * time.Second
}
bs, err := yaml.Marshal(formatRules(rgs))
require.NoError(t, err)
tmpFile.Seek(0, 0)
_, _ = tmpFile.Seek(0, 0)
_, err = tmpFile.Write(bs)
require.NoError(t, err)
err = ruleManager.Update(10*time.Second, []string{tmpFile.Name()}, nil, "")
err = ruleManager.Update(interval, []string{tmpFile.Name()}, nil, "")
require.NoError(t, err)
}
func reloadAndValidate(rgs *rulefmt.RuleGroups, t *testing.T, tmpFile *os.File, ruleManager *Manager, expected map[string]labels.Labels, ogs map[string]*Group) {
reloadRules(rgs, t, tmpFile, ruleManager, 0)
for h, g := range ruleManager.groups {
if ogs[h] == g {
t.Fail()