mirror of
https://github.com/prometheus/prometheus.git
synced 2025-01-11 13:57:36 -08:00
RuleConcurrencyController
: Add Sort
method
The concurrency implementation can now return a slice of concurrent rule batches This allows for additional concurrency as opposed to the current interface which is limited by the order in which the rules have been loaded Also, I removed the `concurrencyController` attribute from the group. That information is duplicated in the opts.RuleConcurrencyController` attribute, leading to some confusing behavior, especially in tests. Signed-off-by: Julien Duchesne <julien.duchesne@grafana.com>
This commit is contained in:
parent
615195372d
commit
9fd394072f
22
rules/fixtures/rules_chain.yaml
Normal file
22
rules/fixtures/rules_chain.yaml
Normal file
|
@ -0,0 +1,22 @@
|
|||
groups:
|
||||
- name: chain
|
||||
rules:
|
||||
# Evaluated concurrently, no dependencies
|
||||
- record: job:http_requests:rate1m
|
||||
expr: sum by (job)(rate(http_requests_total[1m]))
|
||||
- record: job:http_requests:rate5m
|
||||
expr: sum by (job)(rate(http_requests_total[1m]))
|
||||
|
||||
# Evaluated sequentially, dependents and dependencies
|
||||
- record: job1:http_requests:rate1m
|
||||
expr: job:http_requests:rate1m{job="job1"}
|
||||
- record: job1_cluster1:http_requests:rate1m
|
||||
expr: job1:http_requests:rate1m{cluster="cluster1"}
|
||||
|
||||
# Evaluated concurrently, no dependents
|
||||
- record: job1_cluster2:http_requests:rate1m
|
||||
expr: job1:http_requests:rate1m{cluster="cluster2"}
|
||||
- record: job1_cluster1_namespace1:http_requests:rate1m
|
||||
expr: job1_cluster1:http_requests:rate1m{namespace="namespace1"}
|
||||
- record: job1_cluster1_namespace2:http_requests:rate1m
|
||||
expr: job1_cluster1:http_requests:rate1m{namespace="namespace2"}
|
21
rules/fixtures/rules_multiple_dependents_on_base.yaml
Normal file
21
rules/fixtures/rules_multiple_dependents_on_base.yaml
Normal file
|
@ -0,0 +1,21 @@
|
|||
groups:
|
||||
- name: concurrent_dependents
|
||||
rules:
|
||||
# 3 dependents on the same base
|
||||
- record: job:http_requests:rate1m
|
||||
expr: sum by (job)(rate(http_requests_total[1m]))
|
||||
- record: job1:http_requests:rate1m
|
||||
expr: job:http_requests:rate1m{job="job1"}
|
||||
- record: job2:http_requests:rate1m
|
||||
expr: job:http_requests:rate1m{job="job2"}
|
||||
- record: job3:http_requests:rate1m
|
||||
expr: job:http_requests:rate1m{job="job3"}
|
||||
# another 3 dependents on the same base
|
||||
- record: job:http_requests:rate5m
|
||||
expr: sum by (job)(rate(http_requests_total[5m]))
|
||||
- record: job1:http_requests:rate5m
|
||||
expr: job:http_requests:rate5m{job="job1"}
|
||||
- record: job2:http_requests:rate5m
|
||||
expr: job:http_requests:rate5m{job="job2"}
|
||||
- record: job3:http_requests:rate5m
|
||||
expr: job:http_requests:rate5m{job="job3"}
|
|
@ -6,6 +6,8 @@ groups:
|
|||
expr: sum by (job)(rate(http_requests_total[1m]))
|
||||
- record: job:http_requests:rate5m
|
||||
expr: sum by (job)(rate(http_requests_total[5m]))
|
||||
- record: job:http_requests:rate10m
|
||||
expr: sum by (job)(rate(http_requests_total[10m]))
|
||||
|
||||
# dependents
|
||||
- record: job:http_requests:rate15m
|
||||
|
@ -20,6 +22,8 @@ groups:
|
|||
expr: sum by (job)(rate(grpc_requests_total[1m]))
|
||||
- record: job:grpc_requests:rate5m
|
||||
expr: sum by (job)(rate(grpc_requests_total[5m]))
|
||||
- record: job:grpc_requests:rate10m
|
||||
expr: sum by (job)(rate(grpc_requests_total[10m]))
|
||||
|
||||
# dependents
|
||||
- record: job:grpc_requests:rate15m
|
||||
|
|
|
@ -74,9 +74,7 @@ type Group struct {
|
|||
// defaults to DefaultEvalIterationFunc.
|
||||
evalIterationFunc GroupEvalIterationFunc
|
||||
|
||||
// concurrencyController controls the rules evaluation concurrency.
|
||||
concurrencyController RuleConcurrencyController
|
||||
appOpts *storage.AppendOptions
|
||||
appOpts *storage.AppendOptions
|
||||
}
|
||||
|
||||
// GroupEvalIterationFunc is used to implement and extend rule group
|
||||
|
@ -126,33 +124,27 @@ func NewGroup(o GroupOptions) *Group {
|
|||
evalIterationFunc = DefaultEvalIterationFunc
|
||||
}
|
||||
|
||||
concurrencyController := opts.RuleConcurrencyController
|
||||
if concurrencyController == nil {
|
||||
concurrencyController = sequentialRuleEvalController{}
|
||||
}
|
||||
|
||||
if opts.Logger == nil {
|
||||
opts.Logger = promslog.NewNopLogger()
|
||||
}
|
||||
|
||||
return &Group{
|
||||
name: o.Name,
|
||||
file: o.File,
|
||||
interval: o.Interval,
|
||||
queryOffset: o.QueryOffset,
|
||||
limit: o.Limit,
|
||||
rules: o.Rules,
|
||||
shouldRestore: o.ShouldRestore,
|
||||
opts: opts,
|
||||
seriesInPreviousEval: make([]map[string]labels.Labels, len(o.Rules)),
|
||||
done: make(chan struct{}),
|
||||
managerDone: o.done,
|
||||
terminated: make(chan struct{}),
|
||||
logger: opts.Logger.With("file", o.File, "group", o.Name),
|
||||
metrics: metrics,
|
||||
evalIterationFunc: evalIterationFunc,
|
||||
concurrencyController: concurrencyController,
|
||||
appOpts: &storage.AppendOptions{DiscardOutOfOrder: true},
|
||||
name: o.Name,
|
||||
file: o.File,
|
||||
interval: o.Interval,
|
||||
queryOffset: o.QueryOffset,
|
||||
limit: o.Limit,
|
||||
rules: o.Rules,
|
||||
shouldRestore: o.ShouldRestore,
|
||||
opts: opts,
|
||||
seriesInPreviousEval: make([]map[string]labels.Labels, len(o.Rules)),
|
||||
done: make(chan struct{}),
|
||||
managerDone: o.done,
|
||||
terminated: make(chan struct{}),
|
||||
logger: opts.Logger.With("file", o.File, "group", o.Name),
|
||||
metrics: metrics,
|
||||
evalIterationFunc: evalIterationFunc,
|
||||
appOpts: &storage.AppendOptions{DiscardOutOfOrder: true},
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -647,25 +639,32 @@ func (g *Group) Eval(ctx context.Context, ts time.Time) {
|
|||
}
|
||||
|
||||
var wg sync.WaitGroup
|
||||
for i, rule := range g.rules {
|
||||
select {
|
||||
case <-g.done:
|
||||
return
|
||||
default:
|
||||
}
|
||||
|
||||
if ctrl := g.concurrencyController; ctrl.Allow(ctx, g, rule) {
|
||||
wg.Add(1)
|
||||
|
||||
go eval(i, rule, func() {
|
||||
wg.Done()
|
||||
ctrl.Done(ctx)
|
||||
})
|
||||
} else {
|
||||
eval(i, rule, nil)
|
||||
}
|
||||
ctrl := g.opts.RuleConcurrencyController
|
||||
if ctrl == nil {
|
||||
ctrl = sequentialRuleEvalController{}
|
||||
}
|
||||
for _, concurrentRules := range ctrl.Sort(ctx, g) {
|
||||
for _, ruleIdx := range concurrentRules {
|
||||
select {
|
||||
case <-g.done:
|
||||
return
|
||||
default:
|
||||
}
|
||||
|
||||
rule := g.rules[ruleIdx]
|
||||
if len(concurrentRules) > 1 && ctrl.Allow(ctx, g, rule) {
|
||||
wg.Add(1)
|
||||
|
||||
go eval(ruleIdx, rule, func() {
|
||||
wg.Done()
|
||||
ctrl.Done(ctx)
|
||||
})
|
||||
} else {
|
||||
eval(ruleIdx, rule, nil)
|
||||
}
|
||||
}
|
||||
wg.Wait()
|
||||
}
|
||||
wg.Wait()
|
||||
|
||||
g.metrics.GroupSamples.WithLabelValues(GroupKey(g.File(), g.Name())).Set(samplesTotal.Load())
|
||||
g.cleanupStaleSeries(ctx, ts)
|
||||
|
|
|
@ -465,10 +465,18 @@ func (c ruleDependencyController) AnalyseRules(rules []Rule) {
|
|||
}
|
||||
}
|
||||
|
||||
// ConcurrentRules represents a slice of indexes of rules that can be evaluated concurrently.
|
||||
type ConcurrentRules []int
|
||||
|
||||
// RuleConcurrencyController controls concurrency for rules that are safe to be evaluated concurrently.
|
||||
// Its purpose is to bound the amount of concurrency in rule evaluations to avoid overwhelming the Prometheus
|
||||
// server with additional query load. Concurrency is controlled globally, not on a per-group basis.
|
||||
type RuleConcurrencyController interface {
|
||||
// Sort returns a slice of ConcurrentRules, which are batches of rules that can be evaluated concurrently.
|
||||
// The rules are represented by their index in the original list of rules of the group.
|
||||
// We need the original index to update the group's state and we don't want to mutate the group's rules.
|
||||
Sort(ctx context.Context, group *Group) []ConcurrentRules
|
||||
|
||||
// Allow determines if the given rule is allowed to be evaluated concurrently.
|
||||
// If Allow() returns true, then Done() must be called to release the acquired slot and corresponding cleanup is done.
|
||||
// It is important that both *Group and Rule are not retained and only be used for the duration of the call.
|
||||
|
@ -490,21 +498,51 @@ func newRuleConcurrencyController(maxConcurrency int64) RuleConcurrencyControlle
|
|||
}
|
||||
|
||||
func (c *concurrentRuleEvalController) Allow(_ context.Context, _ *Group, rule Rule) bool {
|
||||
// To allow a rule to be executed concurrently, we need 3 conditions:
|
||||
// 1. The rule must not have any rules that depend on it.
|
||||
// 2. The rule itself must not depend on any other rules.
|
||||
// 3. If 1 & 2 are true, then and only then we should try to acquire the concurrency slot.
|
||||
if rule.NoDependentRules() && rule.NoDependencyRules() {
|
||||
return c.sema.TryAcquire(1)
|
||||
return c.sema.TryAcquire(1)
|
||||
}
|
||||
|
||||
func (c *concurrentRuleEvalController) Sort(_ context.Context, g *Group) []ConcurrentRules {
|
||||
// Using the rule dependency controller information (rules being identified as having no dependencies or no dependants),
|
||||
// we can safely run the following concurrent groups:
|
||||
// 1. Concurrently, all rules that have no dependencies
|
||||
// 2. Sequentially, all rules that have both dependencies and dependants
|
||||
// 3. Concurrently, all rules that have no dependants
|
||||
|
||||
var noDependencies []int
|
||||
var dependenciesAndDependants []int
|
||||
var noDependants []int
|
||||
|
||||
for i, r := range g.rules {
|
||||
switch {
|
||||
case r.NoDependencyRules():
|
||||
noDependencies = append(noDependencies, i)
|
||||
case !r.NoDependentRules() && !r.NoDependencyRules():
|
||||
dependenciesAndDependants = append(dependenciesAndDependants, i)
|
||||
case r.NoDependentRules():
|
||||
noDependants = append(noDependants, i)
|
||||
}
|
||||
}
|
||||
|
||||
return false
|
||||
var order []ConcurrentRules
|
||||
if len(noDependencies) > 0 {
|
||||
order = append(order, noDependencies)
|
||||
}
|
||||
for _, r := range dependenciesAndDependants {
|
||||
order = append(order, []int{r})
|
||||
}
|
||||
if len(noDependants) > 0 {
|
||||
order = append(order, noDependants)
|
||||
}
|
||||
|
||||
return order
|
||||
}
|
||||
|
||||
func (c *concurrentRuleEvalController) Done(_ context.Context) {
|
||||
c.sema.Release(1)
|
||||
}
|
||||
|
||||
var _ RuleConcurrencyController = &sequentialRuleEvalController{}
|
||||
|
||||
// sequentialRuleEvalController is a RuleConcurrencyController that runs every rule sequentially.
|
||||
type sequentialRuleEvalController struct{}
|
||||
|
||||
|
@ -512,6 +550,14 @@ func (c sequentialRuleEvalController) Allow(_ context.Context, _ *Group, _ Rule)
|
|||
return false
|
||||
}
|
||||
|
||||
func (c sequentialRuleEvalController) Sort(_ context.Context, g *Group) []ConcurrentRules {
|
||||
order := make([]ConcurrentRules, len(g.rules))
|
||||
for i := range g.rules {
|
||||
order[i] = []int{i}
|
||||
}
|
||||
return order
|
||||
}
|
||||
|
||||
func (c sequentialRuleEvalController) Done(_ context.Context) {}
|
||||
|
||||
// FromMaps returns new sorted Labels from the given maps, overriding each other in order.
|
||||
|
|
|
@ -1987,6 +1987,15 @@ func TestAsyncRuleEvaluation(t *testing.T) {
|
|||
start := time.Now()
|
||||
DefaultEvalIterationFunc(ctx, group, start)
|
||||
|
||||
// Expected evaluation order
|
||||
order := group.opts.RuleConcurrencyController.Sort(ctx, group)
|
||||
require.Equal(t, []ConcurrentRules{
|
||||
{0},
|
||||
{1},
|
||||
{2},
|
||||
{3},
|
||||
}, order)
|
||||
|
||||
// Never expect more than 1 inflight query at a time.
|
||||
require.EqualValues(t, 1, maxInflight.Load())
|
||||
// Each rule should take at least 1 second to execute sequentially.
|
||||
|
@ -2065,6 +2074,12 @@ func TestAsyncRuleEvaluation(t *testing.T) {
|
|||
start := time.Now()
|
||||
DefaultEvalIterationFunc(ctx, group, start)
|
||||
|
||||
// Expected evaluation order (isn't affected by concurrency settings)
|
||||
order := group.opts.RuleConcurrencyController.Sort(ctx, group)
|
||||
require.Equal(t, []ConcurrentRules{
|
||||
{0, 1, 2, 3, 4, 5},
|
||||
}, order)
|
||||
|
||||
// Max inflight can be 1 synchronous eval and up to MaxConcurrentEvals concurrent evals.
|
||||
require.EqualValues(t, opts.MaxConcurrentEvals+1, maxInflight.Load())
|
||||
// Some rules should execute concurrently so should complete quicker.
|
||||
|
@ -2104,6 +2119,12 @@ func TestAsyncRuleEvaluation(t *testing.T) {
|
|||
|
||||
DefaultEvalIterationFunc(ctx, group, start)
|
||||
|
||||
// Expected evaluation order
|
||||
order := group.opts.RuleConcurrencyController.Sort(ctx, group)
|
||||
require.Equal(t, []ConcurrentRules{
|
||||
{0, 1, 2, 3, 4, 5},
|
||||
}, order)
|
||||
|
||||
// Max inflight can be up to MaxConcurrentEvals concurrent evals, since there is sufficient concurrency to run all rules at once.
|
||||
require.LessOrEqual(t, int64(maxInflight.Load()), opts.MaxConcurrentEvals)
|
||||
// Some rules should execute concurrently so should complete quicker.
|
||||
|
@ -2153,6 +2174,99 @@ func TestAsyncRuleEvaluation(t *testing.T) {
|
|||
require.EqualValues(t, ruleCount, testutil.ToFloat64(group.metrics.GroupSamples))
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("asynchronous evaluation of rules that benefit from reordering", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
storage := teststorage.New(t)
|
||||
t.Cleanup(func() { storage.Close() })
|
||||
inflightQueries := atomic.Int32{}
|
||||
maxInflight := atomic.Int32{}
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
t.Cleanup(cancel)
|
||||
|
||||
ruleCount := 8
|
||||
opts := optsFactory(storage, &maxInflight, &inflightQueries, 0)
|
||||
|
||||
// Configure concurrency settings.
|
||||
opts.ConcurrentEvalsEnabled = true
|
||||
opts.MaxConcurrentEvals = int64(ruleCount) * 2
|
||||
opts.RuleConcurrencyController = nil
|
||||
ruleManager := NewManager(opts)
|
||||
|
||||
groups, errs := ruleManager.LoadGroups(time.Second, labels.EmptyLabels(), "", nil, []string{"fixtures/rules_multiple_dependents_on_base.yaml"}...)
|
||||
require.Empty(t, errs)
|
||||
require.Len(t, groups, 1)
|
||||
var group *Group
|
||||
for _, g := range groups {
|
||||
group = g
|
||||
}
|
||||
|
||||
start := time.Now()
|
||||
|
||||
// Expected evaluation order
|
||||
order := group.opts.RuleConcurrencyController.Sort(ctx, group)
|
||||
require.Equal(t, []ConcurrentRules{
|
||||
{0, 4},
|
||||
{1, 2, 3, 5, 6, 7},
|
||||
}, order)
|
||||
|
||||
group.Eval(ctx, start)
|
||||
|
||||
// Inflight queries should be equal to 6. This is the size of the second batch of rules that can be executed concurrently.
|
||||
require.EqualValues(t, 6, maxInflight.Load())
|
||||
// Some rules should execute concurrently so should complete quicker.
|
||||
require.Less(t, time.Since(start).Seconds(), (time.Duration(ruleCount) * artificialDelay).Seconds())
|
||||
// Each rule produces one vector.
|
||||
require.EqualValues(t, ruleCount, testutil.ToFloat64(group.metrics.GroupSamples))
|
||||
})
|
||||
|
||||
t.Run("attempted asynchronous evaluation of chained rules", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
storage := teststorage.New(t)
|
||||
t.Cleanup(func() { storage.Close() })
|
||||
inflightQueries := atomic.Int32{}
|
||||
maxInflight := atomic.Int32{}
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
t.Cleanup(cancel)
|
||||
|
||||
ruleCount := 7
|
||||
opts := optsFactory(storage, &maxInflight, &inflightQueries, 0)
|
||||
|
||||
// Configure concurrency settings.
|
||||
opts.ConcurrentEvalsEnabled = true
|
||||
opts.MaxConcurrentEvals = int64(ruleCount) * 2
|
||||
opts.RuleConcurrencyController = nil
|
||||
ruleManager := NewManager(opts)
|
||||
|
||||
groups, errs := ruleManager.LoadGroups(time.Second, labels.EmptyLabels(), "", nil, []string{"fixtures/rules_chain.yaml"}...)
|
||||
require.Empty(t, errs)
|
||||
require.Len(t, groups, 1)
|
||||
var group *Group
|
||||
for _, g := range groups {
|
||||
group = g
|
||||
}
|
||||
|
||||
start := time.Now()
|
||||
|
||||
// Expected evaluation order
|
||||
order := group.opts.RuleConcurrencyController.Sort(ctx, group)
|
||||
require.Equal(t, []ConcurrentRules{
|
||||
{0, 1},
|
||||
{2},
|
||||
{3},
|
||||
{4, 5, 6},
|
||||
}, order)
|
||||
|
||||
group.Eval(ctx, start)
|
||||
|
||||
require.EqualValues(t, 3, maxInflight.Load())
|
||||
// Some rules should execute concurrently so should complete quicker.
|
||||
require.Less(t, time.Since(start).Seconds(), (time.Duration(ruleCount) * artificialDelay).Seconds())
|
||||
// Each rule produces one vector.
|
||||
require.EqualValues(t, ruleCount, testutil.ToFloat64(group.metrics.GroupSamples))
|
||||
})
|
||||
}
|
||||
|
||||
func TestBoundedRuleEvalConcurrency(t *testing.T) {
|
||||
|
|
Loading…
Reference in a new issue