RuleConcurrencyController: Add SplitGroupIntoBatches method (#15681)
Some checks are pending
buf.build / lint and publish (push) Waiting to run
CI / Go tests (push) Waiting to run
CI / More Go tests (push) Waiting to run
CI / Go tests with previous Go version (push) Waiting to run
CI / UI tests (push) Waiting to run
CI / Go tests on Windows (push) Waiting to run
CI / Mixins tests (push) Waiting to run
CI / Build Prometheus for common architectures (0) (push) Waiting to run
CI / Build Prometheus for common architectures (1) (push) Waiting to run
CI / Build Prometheus for common architectures (2) (push) Waiting to run
CI / Build Prometheus for all architectures (0) (push) Waiting to run
CI / Build Prometheus for all architectures (1) (push) Waiting to run
CI / Build Prometheus for all architectures (10) (push) Waiting to run
CI / Build Prometheus for all architectures (11) (push) Waiting to run
CI / Build Prometheus for all architectures (2) (push) Waiting to run
CI / Build Prometheus for all architectures (3) (push) Waiting to run
CI / Build Prometheus for all architectures (4) (push) Waiting to run
CI / Build Prometheus for all architectures (5) (push) Waiting to run
CI / Build Prometheus for all architectures (6) (push) Waiting to run
CI / Build Prometheus for all architectures (7) (push) Waiting to run
CI / Build Prometheus for all architectures (8) (push) Waiting to run
CI / Build Prometheus for all architectures (9) (push) Waiting to run
CI / Report status of build Prometheus for all architectures (push) Blocked by required conditions
CI / Check generated parser (push) Waiting to run
CI / golangci-lint (push) Waiting to run
CI / fuzzing (push) Waiting to run
CI / codeql (push) Waiting to run
CI / Publish main branch artifacts (push) Blocked by required conditions
CI / Publish release artefacts (push) Blocked by required conditions
CI / Publish UI on npm Registry (push) Blocked by required conditions
Scorecards supply-chain security / Scorecards analysis (push) Waiting to run

* `RuleConcurrencyController`: Add `SplitGroupIntoBatches` method
The concurrency implementation can now return a slice of concurrent rule batches
This allows for additional concurrency as opposed to the current interface which is limited by the order in which the rules have been loaded

Also, I removed the `concurrencyController` attribute from the group. That information is duplicated in the opts.RuleConcurrencyController` attribute, leading to some confusing behavior, especially in tests.

Signed-off-by: Julien Duchesne <julien.duchesne@grafana.com>

* Address PR comments

Signed-off-by: Julien Duchesne <julien.duchesne@grafana.com>

* Apply suggestions from code review

Co-authored-by: gotjosh <josue.abreu@gmail.com>
Signed-off-by: Julien Duchesne <julienduchesne@live.com>

---------

Signed-off-by: Julien Duchesne <julien.duchesne@grafana.com>
Signed-off-by: Julien Duchesne <julienduchesne@live.com>
Co-authored-by: gotjosh <josue.abreu@gmail.com>
This commit is contained in:
Julien Duchesne 2025-01-06 13:51:19 -05:00 committed by GitHub
parent 5fdec31401
commit 8067f27971
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
6 changed files with 256 additions and 50 deletions

View file

@ -0,0 +1,22 @@
groups:
- name: chain
rules:
# Evaluated concurrently, no dependencies
- record: job:http_requests:rate1m
expr: sum by (job)(rate(http_requests_total[1m]))
- record: job:http_requests:rate5m
expr: sum by (job)(rate(http_requests_total[1m]))
# Evaluated sequentially, dependents and dependencies
- record: job1:http_requests:rate1m
expr: job:http_requests:rate1m{job="job1"}
- record: job1_cluster1:http_requests:rate1m
expr: job1:http_requests:rate1m{cluster="cluster1"}
# Evaluated concurrently, no dependents
- record: job1_cluster2:http_requests:rate1m
expr: job1:http_requests:rate1m{cluster="cluster2"}
- record: job1_cluster1_namespace1:http_requests:rate1m
expr: job1_cluster1:http_requests:rate1m{namespace="namespace1"}
- record: job1_cluster1_namespace2:http_requests:rate1m
expr: job1_cluster1:http_requests:rate1m{namespace="namespace2"}

View file

@ -0,0 +1,21 @@
groups:
- name: concurrent_dependents
rules:
# 3 dependents on the same base
- record: job:http_requests:rate1m
expr: sum by (job)(rate(http_requests_total[1m]))
- record: job1:http_requests:rate1m
expr: job:http_requests:rate1m{job="job1"}
- record: job2:http_requests:rate1m
expr: job:http_requests:rate1m{job="job2"}
- record: job3:http_requests:rate1m
expr: job:http_requests:rate1m{job="job3"}
# another 3 dependents on the same base
- record: job:http_requests:rate5m
expr: sum by (job)(rate(http_requests_total[5m]))
- record: job1:http_requests:rate5m
expr: job:http_requests:rate5m{job="job1"}
- record: job2:http_requests:rate5m
expr: job:http_requests:rate5m{job="job2"}
- record: job3:http_requests:rate5m
expr: job:http_requests:rate5m{job="job3"}

View file

@ -6,6 +6,8 @@ groups:
expr: sum by (job)(rate(http_requests_total[1m]))
- record: job:http_requests:rate5m
expr: sum by (job)(rate(http_requests_total[5m]))
- record: job:http_requests:rate10m
expr: sum by (job)(rate(http_requests_total[10m]))
# dependents
- record: job:http_requests:rate15m
@ -20,6 +22,8 @@ groups:
expr: sum by (job)(rate(grpc_requests_total[1m]))
- record: job:grpc_requests:rate5m
expr: sum by (job)(rate(grpc_requests_total[5m]))
- record: job:grpc_requests:rate10m
expr: sum by (job)(rate(grpc_requests_total[10m]))
# dependents
- record: job:grpc_requests:rate15m

View file

@ -74,9 +74,7 @@ type Group struct {
// defaults to DefaultEvalIterationFunc.
evalIterationFunc GroupEvalIterationFunc
// concurrencyController controls the rules evaluation concurrency.
concurrencyController RuleConcurrencyController
appOpts *storage.AppendOptions
appOpts *storage.AppendOptions
}
// GroupEvalIterationFunc is used to implement and extend rule group
@ -126,33 +124,27 @@ func NewGroup(o GroupOptions) *Group {
evalIterationFunc = DefaultEvalIterationFunc
}
concurrencyController := opts.RuleConcurrencyController
if concurrencyController == nil {
concurrencyController = sequentialRuleEvalController{}
}
if opts.Logger == nil {
opts.Logger = promslog.NewNopLogger()
}
return &Group{
name: o.Name,
file: o.File,
interval: o.Interval,
queryOffset: o.QueryOffset,
limit: o.Limit,
rules: o.Rules,
shouldRestore: o.ShouldRestore,
opts: opts,
seriesInPreviousEval: make([]map[string]labels.Labels, len(o.Rules)),
done: make(chan struct{}),
managerDone: o.done,
terminated: make(chan struct{}),
logger: opts.Logger.With("file", o.File, "group", o.Name),
metrics: metrics,
evalIterationFunc: evalIterationFunc,
concurrencyController: concurrencyController,
appOpts: &storage.AppendOptions{DiscardOutOfOrder: true},
name: o.Name,
file: o.File,
interval: o.Interval,
queryOffset: o.QueryOffset,
limit: o.Limit,
rules: o.Rules,
shouldRestore: o.ShouldRestore,
opts: opts,
seriesInPreviousEval: make([]map[string]labels.Labels, len(o.Rules)),
done: make(chan struct{}),
managerDone: o.done,
terminated: make(chan struct{}),
logger: opts.Logger.With("file", o.File, "group", o.Name),
metrics: metrics,
evalIterationFunc: evalIterationFunc,
appOpts: &storage.AppendOptions{DiscardOutOfOrder: true},
}
}
@ -647,25 +639,33 @@ func (g *Group) Eval(ctx context.Context, ts time.Time) {
}
var wg sync.WaitGroup
for i, rule := range g.rules {
select {
case <-g.done:
return
default:
}
if ctrl := g.concurrencyController; ctrl.Allow(ctx, g, rule) {
wg.Add(1)
go eval(i, rule, func() {
wg.Done()
ctrl.Done(ctx)
})
} else {
eval(i, rule, nil)
}
ctrl := g.opts.RuleConcurrencyController
if ctrl == nil {
ctrl = sequentialRuleEvalController{}
}
for _, batch := range ctrl.SplitGroupIntoBatches(ctx, g) {
for _, ruleIndex := range batch {
select {
case <-g.done:
return
default:
}
rule := g.rules[ruleIndex]
if len(batch) > 1 && ctrl.Allow(ctx, g, rule) {
wg.Add(1)
go eval(ruleIndex, rule, func() {
wg.Done()
ctrl.Done(ctx)
})
} else {
eval(ruleIndex, rule, nil)
}
}
// It is important that we finish processing any rules in this current batch - before we move into the next one.
wg.Wait()
}
wg.Wait()
g.metrics.GroupSamples.WithLabelValues(GroupKey(g.File(), g.Name())).Set(samplesTotal.Load())
g.cleanupStaleSeries(ctx, ts)

View file

@ -465,10 +465,17 @@ func (c ruleDependencyController) AnalyseRules(rules []Rule) {
}
}
// ConcurrentRules represents a slice of indexes of rules that can be evaluated concurrently.
type ConcurrentRules []int
// RuleConcurrencyController controls concurrency for rules that are safe to be evaluated concurrently.
// Its purpose is to bound the amount of concurrency in rule evaluations to avoid overwhelming the Prometheus
// server with additional query load. Concurrency is controlled globally, not on a per-group basis.
type RuleConcurrencyController interface {
// SplitGroupIntoBatches returns an ordered slice of of ConcurrentRules, which are batches of rules that can be evaluated concurrently.
// The rules are represented by their index from the input rule group.
SplitGroupIntoBatches(ctx context.Context, group *Group) []ConcurrentRules
// Allow determines if the given rule is allowed to be evaluated concurrently.
// If Allow() returns true, then Done() must be called to release the acquired slot and corresponding cleanup is done.
// It is important that both *Group and Rule are not retained and only be used for the duration of the call.
@ -490,21 +497,51 @@ func newRuleConcurrencyController(maxConcurrency int64) RuleConcurrencyControlle
}
func (c *concurrentRuleEvalController) Allow(_ context.Context, _ *Group, rule Rule) bool {
// To allow a rule to be executed concurrently, we need 3 conditions:
// 1. The rule must not have any rules that depend on it.
// 2. The rule itself must not depend on any other rules.
// 3. If 1 & 2 are true, then and only then we should try to acquire the concurrency slot.
if rule.NoDependentRules() && rule.NoDependencyRules() {
return c.sema.TryAcquire(1)
return c.sema.TryAcquire(1)
}
func (c *concurrentRuleEvalController) SplitGroupIntoBatches(_ context.Context, g *Group) []ConcurrentRules {
// Using the rule dependency controller information (rules being identified as having no dependencies or no dependants),
// we can safely run the following concurrent groups:
// 1. Concurrently, all rules that have no dependencies
// 2. Sequentially, all rules that have both dependencies and dependants
// 3. Concurrently, all rules that have no dependants
var noDependencies []int
var dependenciesAndDependants []int
var noDependants []int
for i, r := range g.rules {
switch {
case r.NoDependencyRules():
noDependencies = append(noDependencies, i)
case !r.NoDependentRules() && !r.NoDependencyRules():
dependenciesAndDependants = append(dependenciesAndDependants, i)
case r.NoDependentRules():
noDependants = append(noDependants, i)
}
}
return false
var order []ConcurrentRules
if len(noDependencies) > 0 {
order = append(order, noDependencies)
}
for _, r := range dependenciesAndDependants {
order = append(order, []int{r})
}
if len(noDependants) > 0 {
order = append(order, noDependants)
}
return order
}
func (c *concurrentRuleEvalController) Done(_ context.Context) {
c.sema.Release(1)
}
var _ RuleConcurrencyController = &sequentialRuleEvalController{}
// sequentialRuleEvalController is a RuleConcurrencyController that runs every rule sequentially.
type sequentialRuleEvalController struct{}
@ -512,6 +549,14 @@ func (c sequentialRuleEvalController) Allow(_ context.Context, _ *Group, _ Rule)
return false
}
func (c sequentialRuleEvalController) SplitGroupIntoBatches(_ context.Context, g *Group) []ConcurrentRules {
order := make([]ConcurrentRules, len(g.rules))
for i := range g.rules {
order[i] = []int{i}
}
return order
}
func (c sequentialRuleEvalController) Done(_ context.Context) {}
// FromMaps returns new sorted Labels from the given maps, overriding each other in order.

View file

@ -1987,6 +1987,15 @@ func TestAsyncRuleEvaluation(t *testing.T) {
start := time.Now()
DefaultEvalIterationFunc(ctx, group, start)
// Expected evaluation order
order := group.opts.RuleConcurrencyController.SplitGroupIntoBatches(ctx, group)
require.Equal(t, []ConcurrentRules{
{0},
{1},
{2},
{3},
}, order)
// Never expect more than 1 inflight query at a time.
require.EqualValues(t, 1, maxInflight.Load())
// Each rule should take at least 1 second to execute sequentially.
@ -2065,6 +2074,12 @@ func TestAsyncRuleEvaluation(t *testing.T) {
start := time.Now()
DefaultEvalIterationFunc(ctx, group, start)
// Expected evaluation order (isn't affected by concurrency settings)
order := group.opts.RuleConcurrencyController.SplitGroupIntoBatches(ctx, group)
require.Equal(t, []ConcurrentRules{
{0, 1, 2, 3, 4, 5},
}, order)
// Max inflight can be 1 synchronous eval and up to MaxConcurrentEvals concurrent evals.
require.EqualValues(t, opts.MaxConcurrentEvals+1, maxInflight.Load())
// Some rules should execute concurrently so should complete quicker.
@ -2104,6 +2119,12 @@ func TestAsyncRuleEvaluation(t *testing.T) {
DefaultEvalIterationFunc(ctx, group, start)
// Expected evaluation order
order := group.opts.RuleConcurrencyController.SplitGroupIntoBatches(ctx, group)
require.Equal(t, []ConcurrentRules{
{0, 1, 2, 3, 4, 5},
}, order)
// Max inflight can be up to MaxConcurrentEvals concurrent evals, since there is sufficient concurrency to run all rules at once.
require.LessOrEqual(t, int64(maxInflight.Load()), opts.MaxConcurrentEvals)
// Some rules should execute concurrently so should complete quicker.
@ -2153,6 +2174,99 @@ func TestAsyncRuleEvaluation(t *testing.T) {
require.EqualValues(t, ruleCount, testutil.ToFloat64(group.metrics.GroupSamples))
}
})
t.Run("asynchronous evaluation of rules that benefit from reordering", func(t *testing.T) {
t.Parallel()
storage := teststorage.New(t)
t.Cleanup(func() { storage.Close() })
inflightQueries := atomic.Int32{}
maxInflight := atomic.Int32{}
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)
ruleCount := 8
opts := optsFactory(storage, &maxInflight, &inflightQueries, 0)
// Configure concurrency settings.
opts.ConcurrentEvalsEnabled = true
opts.MaxConcurrentEvals = int64(ruleCount) * 2
opts.RuleConcurrencyController = nil
ruleManager := NewManager(opts)
groups, errs := ruleManager.LoadGroups(time.Second, labels.EmptyLabels(), "", nil, []string{"fixtures/rules_multiple_dependents_on_base.yaml"}...)
require.Empty(t, errs)
require.Len(t, groups, 1)
var group *Group
for _, g := range groups {
group = g
}
start := time.Now()
// Expected evaluation order
order := group.opts.RuleConcurrencyController.SplitGroupIntoBatches(ctx, group)
require.Equal(t, []ConcurrentRules{
{0, 4},
{1, 2, 3, 5, 6, 7},
}, order)
group.Eval(ctx, start)
// Inflight queries should be equal to 6. This is the size of the second batch of rules that can be executed concurrently.
require.EqualValues(t, 6, maxInflight.Load())
// Some rules should execute concurrently so should complete quicker.
require.Less(t, time.Since(start).Seconds(), (time.Duration(ruleCount) * artificialDelay).Seconds())
// Each rule produces one vector.
require.EqualValues(t, ruleCount, testutil.ToFloat64(group.metrics.GroupSamples))
})
t.Run("attempted asynchronous evaluation of chained rules", func(t *testing.T) {
t.Parallel()
storage := teststorage.New(t)
t.Cleanup(func() { storage.Close() })
inflightQueries := atomic.Int32{}
maxInflight := atomic.Int32{}
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)
ruleCount := 7
opts := optsFactory(storage, &maxInflight, &inflightQueries, 0)
// Configure concurrency settings.
opts.ConcurrentEvalsEnabled = true
opts.MaxConcurrentEvals = int64(ruleCount) * 2
opts.RuleConcurrencyController = nil
ruleManager := NewManager(opts)
groups, errs := ruleManager.LoadGroups(time.Second, labels.EmptyLabels(), "", nil, []string{"fixtures/rules_chain.yaml"}...)
require.Empty(t, errs)
require.Len(t, groups, 1)
var group *Group
for _, g := range groups {
group = g
}
start := time.Now()
// Expected evaluation order
order := group.opts.RuleConcurrencyController.SplitGroupIntoBatches(ctx, group)
require.Equal(t, []ConcurrentRules{
{0, 1},
{2},
{3},
{4, 5, 6},
}, order)
group.Eval(ctx, start)
require.EqualValues(t, 3, maxInflight.Load())
// Some rules should execute concurrently so should complete quicker.
require.Less(t, time.Since(start).Seconds(), (time.Duration(ruleCount) * artificialDelay).Seconds())
// Each rule produces one vector.
require.EqualValues(t, ruleCount, testutil.ToFloat64(group.metrics.GroupSamples))
})
}
func TestBoundedRuleEvalConcurrency(t *testing.T) {