Block until all rules, both sync & async, have completed evaluating

Updated & added tests
Review feedback nits
Return empty map if not indeterminate
Use highWatermark to track inflight requests counter
Appease the linter
Clarify feature flag

Signed-off-by: Danny Kopping <danny.kopping@grafana.com>
Danny Kopping 2024-01-05 22:48:30 +02:00 committed by Marco Pracucci
parent f922534c4d
commit 7aa3b10c3f
No known key found for this signature in database
GPG key ID: 74C1BD403D2DF9B5
5 changed files with 208 additions and 145 deletions


@@ -217,8 +217,9 @@ Besides enabling this feature in Prometheus, created timestamps need to be expos
`--enable-feature=concurrent-rule-eval`
Rule groups execute concurrently, but the rules within a group execute sequentially; this is because rules can use the
By default, rule groups execute concurrently, but the rules within a group execute sequentially; this is because rules can use the
output of a preceding rule as its input. However, if there is no detectable relationship between rules then there is no
reason to run them sequentially. This can improve rule reliability at the expense of adding more concurrent query
load. The number of concurrent rule evaluations can be configured with `--rules.max-concurrent-rule-evals` which is set
to `4` by default.
reason to run them sequentially.
When the `concurrent-rule-eval` feature flag is enabled, rules without any dependency on other rules within a rule group will be evaluated concurrently.
This can improve rule reliability at the expense of adding more concurrent query load. The number of concurrent rule evaluations can be configured
with `--rules.max-concurrent-rule-evals` which is set to `4` by default.
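For illustration, the "detectable relationship" mentioned above is derived from the rule expressions themselves: a rule whose expression reads another rule's recorded series must run after it. A minimal, hypothetical sketch of that detection using the promql/parser package (inputNames is an illustrative helper, not the actual Prometheus API; the real analysis also treats expressions it cannot resolve to a literal metric name as indeterminate, which disables concurrency for the whole group):

package main

import (
	"fmt"

	"github.com/prometheus/prometheus/promql/parser"
)

// inputNames returns the metric names a PromQL expression reads. If any of
// these names is another rule's record name, the two rules are dependent.
func inputNames(query string) ([]string, error) {
	expr, err := parser.ParseExpr(query)
	if err != nil {
		return nil, err
	}
	var names []string
	parser.Inspect(expr, func(node parser.Node, _ []parser.Node) error {
		if vs, ok := node.(*parser.VectorSelector); ok {
			names = append(names, vs.Name)
		}
		return nil
	})
	return names, nil
}

func main() {
	// "job:http_requests:rate5m" reads only the raw series, so it does not
	// depend on any other recording rule in the fixture below.
	names, _ := inputNames(`sum by (job) (rate(http_requests_total[5m]))`)
	fmt.Println(names) // [http_requests_total]
}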


@@ -0,0 +1,15 @@
groups:
- name: independents
rules:
- record: job:http_requests:rate1m
expr: sum by (job)(rate(http_requests_total[1m]))
- record: job:http_requests:rate5m
expr: sum by (job)(rate(http_requests_total[5m]))
- record: job:http_requests:rate15m
expr: sum by (job)(rate(http_requests_total[15m]))
- record: job:http_requests:rate30m
expr: sum by (job)(rate(http_requests_total[30m]))
- record: job:http_requests:rate1h
expr: sum by (job)(rate(http_requests_total[1h]))
- record: job:http_requests:rate2h
expr: sum by (job)(rate(http_requests_total[2h]))


@@ -423,8 +423,13 @@ func (g *Group) CopyState(from *Group) {
}
// Eval runs a single evaluation cycle in which all rules are evaluated sequentially.
// Rules can be evaluated concurrently if the `concurrent-rule-eval` feature flag is enabled.
func (g *Group) Eval(ctx context.Context, ts time.Time) {
var samplesTotal atomic.Float64
var (
samplesTotal atomic.Float64
wg sync.WaitGroup
)
for i, rule := range g.rules {
select {
case <-g.done:
@@ -435,6 +440,7 @@ func (g *Group) Eval(ctx context.Context, ts time.Time) {
eval := func(i int, rule Rule, async bool) {
defer func() {
if async {
wg.Done()
g.opts.RuleConcurrencyController.Done()
}
}()
@@ -569,12 +575,14 @@ func (g *Group) Eval(ctx context.Context, ts time.Time) {
// Try run concurrently if there are slots available.
ctrl := g.opts.RuleConcurrencyController
if ctrl != nil && ctrl.RuleEligible(g, rule) && ctrl.Allow() {
wg.Add(1)
go eval(i, rule, true)
} else {
eval(i, rule, false)
}
}
wg.Wait()
if g.metrics != nil {
g.metrics.GroupSamples.WithLabelValues(GroupKey(g.File(), g.Name())).Set(samplesTotal.Load())
}
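This hunk carries the commit's headline fix: Eval tracks every asynchronously evaluated rule in a sync.WaitGroup and blocks until all of them have finished, so group-level metrics such as GroupSamples are only read once no evaluations remain inflight. A condensed, runnable sketch of the pattern, with a buffered channel standing in for the semaphore-backed controller and eligibility checks omitted (evalRule and slots are illustrative names):

package main

import (
	"fmt"
	"sync"
)

// evalRule stands in for a single rule evaluation.
func evalRule(name string) { fmt.Println("evaluated", name) }

func main() {
	rules := []string{"a", "b", "c", "d"}
	slots := make(chan struct{}, 2) // cap = max concurrent evaluations

	var wg sync.WaitGroup
	for _, r := range rules {
		r := r
		select {
		case slots <- struct{}{}:
			// A slot is free: evaluate concurrently. Add must precede the
			// goroutine so Wait cannot observe a zero counter too early.
			wg.Add(1)
			go func() {
				defer func() {
					<-slots // release the slot (Done() on the real controller)
					wg.Done()
				}()
				evalRule(r)
			}()
		default:
			// No slot free: evaluate inline, preserving the sequential path.
			evalRule(r)
		}
	}

	// The commit's fix: block until every async evaluation has completed
	// before group-level metrics like GroupSamples are updated.
	wg.Wait()
	fmt.Println("all rules evaluated")
}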
@@ -940,7 +948,7 @@ func buildDependencyMap(rules []Rule) dependencyMap {
if len(rules) <= 1 {
// No relationships if group has 1 or fewer rules.
return nil
return dependencies
}
inputs := make(map[string][]Rule, len(rules))
@@ -949,7 +957,9 @@ func buildDependencyMap(rules []Rule) dependencyMap {
var indeterminate bool
for _, rule := range rules {
rule := rule
if indeterminate {
break
}
name := rule.Name()
outputs[name] = append(outputs[name], rule)
@@ -980,15 +990,10 @@ func buildDependencyMap(rules []Rule) dependencyMap {
return nil
}
if len(inputs) == 0 || len(outputs) == 0 {
// No relationships can be inferred.
return nil
}
for output, outRules := range outputs {
for _, outRule := range outRules {
if rs, found := inputs[output]; found && len(rs) > 0 {
dependencies[outRule] = append(dependencies[outRule], rs...)
if inRules, found := inputs[output]; found && len(inRules) > 0 {
dependencies[outRule] = append(dependencies[outRule], inRules...)
}
}
}
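Taken together, the buildDependencyMap hunks make two behavioral changes: the loop now exits as soon as the analysis turns indeterminate, and a successful analysis that finds no edges returns an empty, non-nil map instead of nil. A simplified sketch of the underlying name-matching idea, using stand-in types (the real map is keyed by Rule values rather than names):

package main

import "fmt"

// rule is a stand-in for the real Rule interface: name is the series the rule
// records, inputs are the series its expression reads.
type rule struct {
	name   string
	inputs []string
}

// buildDeps mirrors the shape of buildDependencyMap: it maps each rule to the
// rules that consume its output (its dependents). Like the patched version, it
// returns an empty map to mean "analysis succeeded, no dependencies".
func buildDeps(rules []rule) map[string][]string {
	deps := make(map[string][]string)
	if len(rules) <= 1 {
		// No relationships if group has 1 or fewer rules.
		return deps
	}
	inputs := make(map[string][]string) // series name -> rules reading it
	for _, r := range rules {
		for _, in := range r.inputs {
			inputs[in] = append(inputs[in], r.name)
		}
	}
	for _, r := range rules {
		// Every rule whose inputs include r's output must run after r.
		if readers, found := inputs[r.name]; found && len(readers) > 0 {
			deps[r.name] = append(deps[r.name], readers...)
		}
	}
	return deps
}

func main() {
	deps := buildDeps([]rule{
		{name: "job:http_requests:rate5m", inputs: []string{"http_requests_total"}},
		{name: "job:http_requests:ratio", inputs: []string{"job:http_requests:rate5m"}},
	})
	fmt.Println(deps) // map[job:http_requests:rate5m:[job:http_requests:ratio]]
}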


@@ -424,6 +424,7 @@ type RuleConcurrencyController interface {
RuleEligible(g *Group, r Rule) bool
// Allow determines whether any concurrent evaluation slots are available.
// If Allow() returns true, then Done() must be called to release the acquired slot.
Allow() bool
// Done releases a concurrent evaluation slot.
@@ -445,15 +446,15 @@ func newRuleConcurrencyController(enabled bool, maxConcurrency int64) RuleConcur
// concurrentRuleEvalController holds a weighted semaphore which controls the concurrent evaluation of rules.
type concurrentRuleEvalController struct {
mu sync.Mutex
enabled bool
sema *semaphore.Weighted
depMapsMu sync.Mutex
depMaps map[*Group]dependencyMap
}
func (c *concurrentRuleEvalController) RuleEligible(g *Group, r Rule) bool {
c.mu.Lock()
defer c.mu.Unlock()
c.depMapsMu.Lock()
defer c.depMapsMu.Unlock()
depMap, found := c.depMaps[g]
if !found {
@@ -481,8 +482,8 @@ func (c *concurrentRuleEvalController) Done() {
}
func (c *concurrentRuleEvalController) Invalidate() {
c.mu.Lock()
defer c.mu.Unlock()
c.depMapsMu.Lock()
defer c.depMapsMu.Unlock()
// Clear out the memoized dependency maps because some or all groups may have been updated.
c.depMaps = map[*Group]dependencyMap{}
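Renaming mu to depMapsMu clarifies that the mutex guards only the memoized per-group dependency maps; the evaluation slots are handled by the weighted semaphore, which is safe for concurrent use on its own. A sketch of the slot-management half, assuming only golang.org/x/sync/semaphore (field and method names mirror the diff):

package main

import (
	"fmt"

	"golang.org/x/sync/semaphore"
)

// controller sketches the slot-management half of concurrentRuleEvalController.
type controller struct {
	sema *semaphore.Weighted
}

func newController(maxConcurrency int64) *controller {
	return &controller{sema: semaphore.NewWeighted(maxConcurrency)}
}

// Allow reports whether a concurrent evaluation slot is free, claiming it if
// so. TryAcquire never blocks, which is what lets Eval fall back to inline,
// sequential evaluation when the semaphore is exhausted.
func (c *controller) Allow() bool {
	return c.sema.TryAcquire(1)
}

// Done releases a slot claimed by a successful Allow, per the interface
// contract documented in the hunk above.
func (c *controller) Done() {
	c.sema.Release(1)
}

func main() {
	c := newController(2)
	fmt.Println(c.Allow(), c.Allow(), c.Allow()) // true true false: only 2 slots
}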


@@ -1498,8 +1498,8 @@ func TestDependenciesEdgeCases(t *testing.T) {
depMap := buildDependencyMap(group.rules)
// A group with no rules has no dependency map, but doesn't panic if the map is queried.
require.Nil(t, depMap)
require.False(t, depMap.isIndependent(rule))
require.Empty(t, depMap)
require.True(t, depMap.isIndependent(rule))
})
t.Run("rules which reference no series", func(t *testing.T) {
@@ -1627,7 +1627,7 @@ func TestDependencyMapUpdatesOnGroupUpdate(t *testing.T) {
err := ruleManager.Update(10*time.Second, files, labels.EmptyLabels(), "", nil)
require.NoError(t, err)
require.Greater(t, len(ruleManager.groups), 0, "expected non-empty rule groups")
require.NotEmpty(t, ruleManager.groups, "expected non-empty rule groups")
orig := make(map[string]dependencyMap, len(ruleManager.groups))
for _, g := range ruleManager.groups {
@@ -1643,9 +1643,15 @@ func TestDependencyMapUpdatesOnGroupUpdate(t *testing.T) {
for h, g := range ruleManager.groups {
depMap := buildDependencyMap(g.rules)
// Dependency maps are the same because of no updates.
if orig[h] == nil {
require.Empty(t, orig[h])
require.Empty(t, depMap)
} else {
require.Equal(t, orig[h], depMap)
}
}
// Groups will be recreated when updated.
files[0] = "fixtures/rules_dependencies.yaml"
err = ruleManager.Update(10*time.Second, files, labels.EmptyLabels(), "", nil)
@@ -1667,7 +1673,7 @@ func TestDependencyMapUpdatesOnGroupUpdate(t *testing.T) {
// Dependency maps must change because the groups would've been updated.
require.NotEqual(t, orig[h], depMap)
// We expect there to be some dependencies since the new rule group contains a dependency.
require.Greater(t, len(depMap), 0)
require.NotEmpty(t, depMap)
require.Equal(t, 1, depMap.dependents(rr))
require.Zero(t, depMap.dependencies(rr))
}
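The flipped assertions above rest on a nil-versus-empty distinction: a nil dependency map means the analysis was indeterminate, so no rule may be treated as independent, while an empty map means the analysis ran and found no relationships, so every rule is. Querying a nil map is safe in Go, which is why the "doesn't panic" comment holds in both cases. A stand-in sketch of that contract (the real isIndependent also checks whether the rule appears as another rule's dependency):

package main

import "fmt"

// depMap is a stand-in for the rules package's dependencyMap.
type depMap map[string][]string

func (m depMap) isIndependent(rule string) bool {
	if m == nil {
		// Indeterminate analysis: play it safe, run everything sequentially.
		return false
	}
	// Reading from a nil or empty map never panics; it returns a zero value.
	return len(m[rule]) == 0
}

func main() {
	var unknown depMap   // nil: analysis was indeterminate
	analyzed := depMap{} // empty: analysis found no relationships
	fmt.Println(unknown.isIndependent("r"))  // false
	fmt.Println(analyzed.isIndependent("r")) // true
}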
@@ -1677,86 +1683,51 @@ func TestAsyncRuleEvaluation(t *testing.T) {
storage := teststorage.New(t)
t.Cleanup(func() { storage.Close() })
const artificialDelay = time.Second
var (
inflightQueries atomic.Int32
maxInflight atomic.Int32
)
files := []string{"fixtures/rules_multiple.yaml"}
opts := &ManagerOptions{
Context: context.Background(),
Logger: log.NewNopLogger(),
Appendable: storage,
QueryFunc: func(ctx context.Context, q string, ts time.Time) (promql.Vector, error) {
inflightQueries.Add(1)
defer func() {
inflightQueries.Add(-1)
}()
// Artificially delay all query executions to highlight concurrent execution improvement.
time.Sleep(artificialDelay)
// Return a stub sample.
return promql.Vector{
promql.Sample{Metric: labels.FromStrings("__name__", "test"), T: ts.UnixMilli(), F: 12345},
}, nil
},
}
inflightTracker := func(ctx context.Context) {
for {
select {
case <-ctx.Done():
return
default:
highWatermark := maxInflight.Load()
current := inflightQueries.Load()
if current > highWatermark {
maxInflight.Store(current)
}
time.Sleep(time.Millisecond)
}
}
}
expectedRules := 4
t.Run("synchronous evaluation with independent rules", func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
ruleManager := NewManager(opts)
groups, errs := ruleManager.LoadGroups(time.Second, labels.EmptyLabels(), "", nil, files...)
require.Empty(t, errs)
require.Len(t, groups, 1)
for _, group := range groups {
require.Len(t, group.rules, expectedRules)
start := time.Now()
// Never expect more than 1 inflight query at a time.
go inflightTracker(ctx)
group.Eval(ctx, start)
require.EqualValues(t, 1, maxInflight.Load())
// Each rule should take at least 1 second to execute sequentially.
require.GreaterOrEqual(t, time.Since(start).Seconds(), (time.Duration(expectedRules) * artificialDelay).Seconds())
// Each rule produces one vector.
require.EqualValues(t, expectedRules, testutil.ToFloat64(group.metrics.GroupSamples))
}
cancel()
})
t.Run("asynchronous evaluation with independent rules", func(t *testing.T) {
// Reset.
inflightQueries.Store(0)
maxInflight.Store(0)
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)
ruleManager := NewManager(optsFactory(storage, &maxInflight, &inflightQueries, 0))
groups, errs := ruleManager.LoadGroups(time.Second, labels.EmptyLabels(), "", nil, []string{"fixtures/rules_multiple.yaml"}...)
require.Empty(t, errs)
require.Len(t, groups, 1)
ruleCount := 4
for _, group := range groups {
require.Len(t, group.rules, ruleCount)
start := time.Now()
group.Eval(ctx, start)
// Never expect more than 1 inflight query at a time.
require.EqualValues(t, 1, maxInflight.Load())
// Each rule should take at least 1 second to execute sequentially.
require.GreaterOrEqual(t, time.Since(start).Seconds(), (time.Duration(ruleCount) * artificialDelay).Seconds())
// Each rule produces one vector.
require.EqualValues(t, ruleCount, testutil.ToFloat64(group.metrics.GroupSamples))
}
})
t.Run("asynchronous evaluation with independent and dependent rules", func(t *testing.T) {
// Reset.
inflightQueries.Store(0)
maxInflight.Store(0)
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)
ruleCount := 4
opts := optsFactory(storage, &maxInflight, &inflightQueries, 0)
// Configure concurrency settings.
opts.ConcurrentEvalsEnabled = true
@@ -1764,28 +1735,97 @@ func TestAsyncRuleEvaluation(t *testing.T) {
opts.RuleConcurrencyController = nil
ruleManager := NewManager(opts)
groups, errs := ruleManager.LoadGroups(time.Second, labels.EmptyLabels(), "", nil, files...)
groups, errs := ruleManager.LoadGroups(time.Second, labels.EmptyLabels(), "", nil, []string{"fixtures/rules_multiple.yaml"}...)
require.Empty(t, errs)
require.Len(t, groups, 1)
for _, group := range groups {
require.Len(t, group.rules, expectedRules)
require.Len(t, group.rules, ruleCount)
start := time.Now()
go inflightTracker(ctx)
group.Eval(ctx, start)
// Max inflight can be 1 synchronous eval and up to MaxConcurrentEvals concurrent evals.
require.EqualValues(t, opts.MaxConcurrentEvals+1, maxInflight.Load())
// Some rules should execute concurrently so should complete quicker.
require.Less(t, time.Since(start).Seconds(), (time.Duration(expectedRules) * artificialDelay).Seconds())
require.Less(t, time.Since(start).Seconds(), (time.Duration(ruleCount) * artificialDelay).Seconds())
// Each rule produces one vector.
require.EqualValues(t, expectedRules, testutil.ToFloat64(group.metrics.GroupSamples))
require.EqualValues(t, ruleCount, testutil.ToFloat64(group.metrics.GroupSamples))
}
})
cancel()
t.Run("asynchronous evaluation of all independent rules, insufficient concurrency", func(t *testing.T) {
// Reset.
inflightQueries.Store(0)
maxInflight.Store(0)
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)
ruleCount := 6
opts := optsFactory(storage, &maxInflight, &inflightQueries, 0)
// Configure concurrency settings.
opts.ConcurrentEvalsEnabled = true
opts.MaxConcurrentEvals = 2
opts.RuleConcurrencyController = nil
ruleManager := NewManager(opts)
groups, errs := ruleManager.LoadGroups(time.Second, labels.EmptyLabels(), "", nil, []string{"fixtures/rules_multiple_independent.yaml"}...)
require.Empty(t, errs)
require.Len(t, groups, 1)
for _, group := range groups {
require.Len(t, group.rules, ruleCount)
start := time.Now()
group.Eval(ctx, start)
// Max inflight can be 1 synchronous eval and up to MaxConcurrentEvals concurrent evals.
require.EqualValues(t, opts.MaxConcurrentEvals+1, maxInflight.Load())
// Some rules should execute concurrently so should complete quicker.
require.Less(t, time.Since(start).Seconds(), (time.Duration(ruleCount) * artificialDelay).Seconds())
// Each rule produces one vector.
require.EqualValues(t, ruleCount, testutil.ToFloat64(group.metrics.GroupSamples))
}
})
t.Run("asynchronous evaluation of all independent rules, sufficient concurrency", func(t *testing.T) {
// Reset.
inflightQueries.Store(0)
maxInflight.Store(0)
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)
ruleCount := 6
opts := optsFactory(storage, &maxInflight, &inflightQueries, 0)
// Configure concurrency settings.
opts.ConcurrentEvalsEnabled = true
opts.MaxConcurrentEvals = int64(ruleCount) * 2
opts.RuleConcurrencyController = nil
ruleManager := NewManager(opts)
groups, errs := ruleManager.LoadGroups(time.Second, labels.EmptyLabels(), "", nil, []string{"fixtures/rules_multiple_independent.yaml"}...)
require.Empty(t, errs)
require.Len(t, groups, 1)
for _, group := range groups {
require.Len(t, group.rules, ruleCount)
start := time.Now()
group.Eval(ctx, start)
// Max inflight can be up to MaxConcurrentEvals concurrent evals, since there is sufficient concurrency to run all rules at once.
require.LessOrEqual(t, int64(maxInflight.Load()), opts.MaxConcurrentEvals)
// Some rules should execute concurrently so should complete quicker.
require.Less(t, time.Since(start).Seconds(), (time.Duration(ruleCount) * artificialDelay).Seconds())
// Each rule produces one vector.
require.EqualValues(t, ruleCount, testutil.ToFloat64(group.metrics.GroupSamples))
}
})
}
@@ -1793,8 +1833,6 @@ func TestBoundedRuleEvalConcurrency(t *testing.T) {
storage := teststorage.New(t)
t.Cleanup(func() { storage.Close() })
const artificialDelay = time.Millisecond * 100
var (
inflightQueries atomic.Int32
maxInflight atomic.Int32
@@ -1803,50 +1841,15 @@ func TestBoundedRuleEvalConcurrency(t *testing.T) {
)
files := []string{"fixtures/rules_multiple_groups.yaml"}
ruleManager := NewManager(&ManagerOptions{
Context: context.Background(),
Logger: log.NewNopLogger(),
Appendable: storage,
ConcurrentEvalsEnabled: true,
MaxConcurrentEvals: maxConcurrency,
QueryFunc: func(ctx context.Context, q string, ts time.Time) (promql.Vector, error) {
inflightQueries.Add(1)
defer func() {
inflightQueries.Add(-1)
}()
// Artificially delay all query executions to highlight concurrent execution improvement.
time.Sleep(artificialDelay)
// Return a stub sample.
return promql.Vector{
promql.Sample{Metric: labels.FromStrings("__name__", "test"), T: ts.UnixMilli(), F: 12345},
}, nil
},
})
ruleManager := NewManager(optsFactory(storage, &maxInflight, &inflightQueries, maxConcurrency))
groups, errs := ruleManager.LoadGroups(time.Second, labels.EmptyLabels(), "", nil, files...)
require.Empty(t, errs)
require.Len(t, groups, groupCount)
ctx, cancel := context.WithCancel(context.Background())
go func() {
for {
select {
case <-ctx.Done():
return
default:
highWatermark := maxInflight.Load()
current := inflightQueries.Load()
if current > highWatermark {
maxInflight.Store(current)
}
time.Sleep(time.Millisecond)
}
}
}()
t.Cleanup(cancel)
// Evaluate groups concurrently (like they normally do).
var wg sync.WaitGroup
@@ -1861,8 +1864,46 @@ func TestBoundedRuleEvalConcurrency(t *testing.T) {
}
wg.Wait()
cancel()
// Synchronous queries also count towards inflight, so at most we can have maxConcurrency+$groupCount inflight evaluations.
require.EqualValues(t, maxInflight.Load(), int32(maxConcurrency)+int32(groupCount))
}
const artificialDelay = 10 * time.Millisecond
func optsFactory(storage storage.Storage, maxInflight, inflightQueries *atomic.Int32, maxConcurrent int64) *ManagerOptions {
var inflightMu sync.Mutex
concurrent := maxConcurrent > 0
return &ManagerOptions{
Context: context.Background(),
Logger: log.NewNopLogger(),
ConcurrentEvalsEnabled: concurrent,
MaxConcurrentEvals: maxConcurrent,
Appendable: storage,
QueryFunc: func(ctx context.Context, q string, ts time.Time) (promql.Vector, error) {
inflightMu.Lock()
current := inflightQueries.Add(1)
defer func() {
inflightQueries.Add(-1)
}()
highWatermark := maxInflight.Load()
if current > highWatermark {
maxInflight.Store(current)
}
inflightMu.Unlock()
// Artificially delay all query executions to highlight concurrent execution improvement.
time.Sleep(artificialDelay)
// Return a stub sample.
return promql.Vector{
promql.Sample{Metric: labels.FromStrings("__name__", "test"), T: ts.UnixMilli(), F: 12345},
}, nil
},
}
}
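optsFactory folds the old polling inflightTracker goroutine into the QueryFunc itself: inflightMu turns the increment, compare, and store into one critical section, so two concurrent queries cannot both read a stale watermark and lose an update. For comparison, a lock-free sketch of the same high-watermark bookkeeping as a compare-and-swap loop over the standard library's sync/atomic (Go 1.19+; a sketch, not what the test uses):

package main

import (
	"fmt"
	"sync/atomic"
)

// recordInflight bumps the inflight counter, raises the high watermark if this
// query set a new maximum, and returns a func that undoes the increment.
func recordInflight(inflight, maxInflight *atomic.Int32) (release func()) {
	current := inflight.Add(1)
	for {
		hw := maxInflight.Load()
		// Stop if we are not above the watermark, or if we won the race to
		// raise it; otherwise another goroutine moved it, so re-check.
		if current <= hw || maxInflight.CompareAndSwap(hw, current) {
			break
		}
	}
	return func() { inflight.Add(-1) }
}

func main() {
	var inflight, maxInflight atomic.Int32
	release := recordInflight(&inflight, &maxInflight)
	defer release()
	fmt.Println(maxInflight.Load()) // 1
}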