RuleDependencyController: Fix for indeterminate conditions (#15560)

The dependency map being empty meant that all the rules were being set as having no dependencies or dependents. Which is the opposite of what we want
Added two new tests that verify the behavior, they failed before the fix, running all the rules concurrently

Signed-off-by: Julien Duchesne <julien.duchesne@grafana.com>
This commit is contained in:
Julien Duchesne 2024-12-13 11:48:29 -05:00 committed by GitHub
parent 9009724c5b
commit 7802ca263d
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 191 additions and 0 deletions

View file

@ -0,0 +1,18 @@
groups:
- name: indeterminate
rules:
# This shouldn't run in parallel because of the open matcher
- record: job:http_requests:rate1m
expr: sum by (job)(rate(http_requests_total[1m]))
- record: job:http_requests:rate5m
expr: sum by (job)(rate(http_requests_total[5m]))
- record: job:http_requests:rate15m
expr: sum by (job)(rate(http_requests_total[15m]))
- record: job:http_requests:rate30m
expr: sum by (job)(rate(http_requests_total[30m]))
- record: job:http_requests:rate1h
expr: sum by (job)(rate(http_requests_total[1h]))
- record: job:http_requests:rate2h
expr: sum by (job)(rate(http_requests_total[2h]))
- record: matcher
expr: '{job="job1"}'

View file

@ -453,6 +453,11 @@ type ruleDependencyController struct{}
// AnalyseRules implements RuleDependencyController. // AnalyseRules implements RuleDependencyController.
func (c ruleDependencyController) AnalyseRules(rules []Rule) { func (c ruleDependencyController) AnalyseRules(rules []Rule) {
depMap := buildDependencyMap(rules) depMap := buildDependencyMap(rules)
if depMap == nil {
return
}
for _, r := range rules { for _, r := range rules {
r.SetNoDependentRules(depMap.dependents(r) == 0) r.SetNoDependentRules(depMap.dependents(r) == 0)
r.SetNoDependencyRules(depMap.dependencies(r) == 0) r.SetNoDependencyRules(depMap.dependencies(r) == 0)

View file

@ -2110,6 +2110,45 @@ func TestAsyncRuleEvaluation(t *testing.T) {
require.EqualValues(t, ruleCount, testutil.ToFloat64(group.metrics.GroupSamples)) require.EqualValues(t, ruleCount, testutil.ToFloat64(group.metrics.GroupSamples))
} }
}) })
t.Run("asynchronous evaluation of independent rules, with indeterminate. Should be synchronous", func(t *testing.T) {
t.Parallel()
storage := teststorage.New(t)
t.Cleanup(func() { storage.Close() })
inflightQueries := atomic.Int32{}
maxInflight := atomic.Int32{}
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)
ruleCount := 7
opts := optsFactory(storage, &maxInflight, &inflightQueries, 0)
// Configure concurrency settings.
opts.ConcurrentEvalsEnabled = true
opts.MaxConcurrentEvals = int64(ruleCount) * 2
opts.RuleConcurrencyController = nil
ruleManager := NewManager(opts)
groups, errs := ruleManager.LoadGroups(time.Second, labels.EmptyLabels(), "", nil, []string{"fixtures/rules_indeterminates.yaml"}...)
require.Empty(t, errs)
require.Len(t, groups, 1)
for _, group := range groups {
require.Len(t, group.rules, ruleCount)
start := time.Now()
group.Eval(ctx, start)
// Never expect more than 1 inflight query at a time.
require.EqualValues(t, 1, maxInflight.Load())
// Each rule should take at least 1 second to execute sequentially.
require.GreaterOrEqual(t, time.Since(start).Seconds(), (time.Duration(ruleCount) * artificialDelay).Seconds())
// Each rule produces one vector.
require.EqualValues(t, ruleCount, testutil.ToFloat64(group.metrics.GroupSamples))
}
})
} }
func TestBoundedRuleEvalConcurrency(t *testing.T) { func TestBoundedRuleEvalConcurrency(t *testing.T) {
@ -2222,3 +2261,132 @@ func TestLabels_FromMaps(t *testing.T) {
require.Equal(t, expected, mLabels, "unexpected labelset") require.Equal(t, expected, mLabels, "unexpected labelset")
} }
func TestRuleDependencyController_AnalyseRules(t *testing.T) {
type expectedDependencies struct {
noDependentRules bool
noDependencyRules bool
}
testCases := []struct {
name string
ruleFile string
expected map[string]expectedDependencies
}{
{
name: "all independent rules",
ruleFile: "fixtures/rules_multiple_independent.yaml",
expected: map[string]expectedDependencies{
"job:http_requests:rate1m": {
noDependentRules: true,
noDependencyRules: true,
},
"job:http_requests:rate5m": {
noDependentRules: true,
noDependencyRules: true,
},
"job:http_requests:rate15m": {
noDependentRules: true,
noDependencyRules: true,
},
"job:http_requests:rate30m": {
noDependentRules: true,
noDependencyRules: true,
},
"job:http_requests:rate1h": {
noDependentRules: true,
noDependencyRules: true,
},
"job:http_requests:rate2h": {
noDependentRules: true,
noDependencyRules: true,
},
},
},
{
name: "some dependent rules",
ruleFile: "fixtures/rules_multiple.yaml",
expected: map[string]expectedDependencies{
"job:http_requests:rate1m": {
noDependentRules: true,
noDependencyRules: true,
},
"job:http_requests:rate5m": {
noDependentRules: true,
noDependencyRules: true,
},
"job:http_requests:rate15m": {
noDependentRules: false,
noDependencyRules: true,
},
"TooManyRequests": {
noDependentRules: true,
noDependencyRules: false,
},
},
},
{
name: "indeterminate rules",
ruleFile: "fixtures/rules_indeterminates.yaml",
expected: map[string]expectedDependencies{
"job:http_requests:rate1m": {
noDependentRules: false,
noDependencyRules: false,
},
"job:http_requests:rate5m": {
noDependentRules: false,
noDependencyRules: false,
},
"job:http_requests:rate15m": {
noDependentRules: false,
noDependencyRules: false,
},
"job:http_requests:rate30m": {
noDependentRules: false,
noDependencyRules: false,
},
"job:http_requests:rate1h": {
noDependentRules: false,
noDependencyRules: false,
},
"job:http_requests:rate2h": {
noDependentRules: false,
noDependencyRules: false,
},
"matcher": {
noDependentRules: false,
noDependencyRules: false,
},
},
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
storage := teststorage.New(t)
t.Cleanup(func() { storage.Close() })
ruleManager := NewManager(&ManagerOptions{
Context: context.Background(),
Logger: promslog.NewNopLogger(),
Appendable: storage,
QueryFunc: func(ctx context.Context, q string, ts time.Time) (promql.Vector, error) { return nil, nil },
})
groups, errs := ruleManager.LoadGroups(time.Second, labels.EmptyLabels(), "", nil, tc.ruleFile)
require.Empty(t, errs)
require.Len(t, groups, 1)
for _, g := range groups {
ruleManager.opts.RuleDependencyController.AnalyseRules(g.rules)
for _, r := range g.rules {
exp, ok := tc.expected[r.Name()]
require.Truef(t, ok, "rule not found in expected: %s", r.String())
require.Equalf(t, exp.noDependentRules, r.NoDependentRules(), "rule: %s", r.String())
require.Equalf(t, exp.noDependencyRules, r.NoDependencyRules(), "rule: %s", r.String())
}
}
})
}
}