Implementation

NOTE:
Rebased from main after refactor in #13014

Signed-off-by: Danny Kopping <danny.kopping@grafana.com>
This commit is contained in:
Danny Kopping 2023-10-25 22:31:26 +02:00 committed by Marco Pracucci
parent bdc3cfdd5d
commit 940f83a540
No known key found for this signature in database
GPG key ID: 74C1BD403D2DF9B5
8 changed files with 595 additions and 28 deletions

View file

@ -137,6 +137,7 @@ type flagConfig struct {
forGracePeriod model.Duration forGracePeriod model.Duration
outageTolerance model.Duration outageTolerance model.Duration
resendDelay model.Duration resendDelay model.Duration
maxConcurrentEvals int64
web web.Options web web.Options
scrape scrape.Options scrape scrape.Options
tsdb tsdbOptions tsdb tsdbOptions
@ -411,6 +412,9 @@ func main() {
serverOnlyFlag(a, "rules.alert.resend-delay", "Minimum amount of time to wait before resending an alert to Alertmanager."). serverOnlyFlag(a, "rules.alert.resend-delay", "Minimum amount of time to wait before resending an alert to Alertmanager.").
Default("1m").SetValue(&cfg.resendDelay) Default("1m").SetValue(&cfg.resendDelay)
serverOnlyFlag(a, "rules.max-concurrent-rule-evals", "Global concurrency limit for independent rules which can run concurrently.").
Default("4").Int64Var(&cfg.maxConcurrentEvals)
a.Flag("scrape.adjust-timestamps", "Adjust scrape timestamps by up to `scrape.timestamp-tolerance` to align them to the intended schedule. See https://github.com/prometheus/prometheus/issues/7846 for more context. Experimental. This flag will be removed in a future release."). a.Flag("scrape.adjust-timestamps", "Adjust scrape timestamps by up to `scrape.timestamp-tolerance` to align them to the intended schedule. See https://github.com/prometheus/prometheus/issues/7846 for more context. Experimental. This flag will be removed in a future release.").
Hidden().Default("true").BoolVar(&scrape.AlignScrapeTimestamps) Hidden().Default("true").BoolVar(&scrape.AlignScrapeTimestamps)
@ -760,6 +764,7 @@ func main() {
OutageTolerance: time.Duration(cfg.outageTolerance), OutageTolerance: time.Duration(cfg.outageTolerance),
ForGracePeriod: time.Duration(cfg.forGracePeriod), ForGracePeriod: time.Duration(cfg.forGracePeriod),
ResendDelay: time.Duration(cfg.resendDelay), ResendDelay: time.Duration(cfg.resendDelay),
MaxConcurrentEvals: cfg.maxConcurrentEvals,
}) })
} }

View file

@ -48,6 +48,7 @@ The Prometheus monitoring server
| <code class="text-nowrap">--rules.alert.for-outage-tolerance</code> | Max time to tolerate prometheus outage for restoring "for" state of alert. Use with server mode only. | `1h` | | <code class="text-nowrap">--rules.alert.for-outage-tolerance</code> | Max time to tolerate prometheus outage for restoring "for" state of alert. Use with server mode only. | `1h` |
| <code class="text-nowrap">--rules.alert.for-grace-period</code> | Minimum duration between alert and restored "for" state. This is maintained only for alerts with configured "for" time greater than grace period. Use with server mode only. | `10m` | | <code class="text-nowrap">--rules.alert.for-grace-period</code> | Minimum duration between alert and restored "for" state. This is maintained only for alerts with configured "for" time greater than grace period. Use with server mode only. | `10m` |
| <code class="text-nowrap">--rules.alert.resend-delay</code> | Minimum amount of time to wait before resending an alert to Alertmanager. Use with server mode only. | `1m` | | <code class="text-nowrap">--rules.alert.resend-delay</code> | Minimum amount of time to wait before resending an alert to Alertmanager. Use with server mode only. | `1m` |
| <code class="text-nowrap">--rules.max-concurrent-rule-evals</code> | Global concurrency limit for independent rules which can run concurrently. Use with server mode only. | `4` |
| <code class="text-nowrap">--alertmanager.notification-queue-capacity</code> | The capacity of the queue for pending Alertmanager notifications. Use with server mode only. | `10000` | | <code class="text-nowrap">--alertmanager.notification-queue-capacity</code> | The capacity of the queue for pending Alertmanager notifications. Use with server mode only. | `10000` |
| <code class="text-nowrap">--query.lookback-delta</code> | The maximum lookback duration for retrieving metrics during expression evaluations and federation. Use with server mode only. | `5m` | | <code class="text-nowrap">--query.lookback-delta</code> | The maximum lookback duration for retrieving metrics during expression evaluations and federation. Use with server mode only. | `5m` |
| <code class="text-nowrap">--query.timeout</code> | Maximum time a query may take before being aborted. Use with server mode only. | `2m` | | <code class="text-nowrap">--query.timeout</code> | Maximum time a query may take before being aborted. Use with server mode only. | `2m` |

View file

@ -0,0 +1,7 @@
groups:
- name: test
rules:
- record: job:http_requests:rate5m
expr: sum by (job)(rate(http_requests_total[5m]))
- record: HighRequestRate
expr: job:http_requests:rate5m > 100

View file

@ -0,0 +1,14 @@
groups:
- name: test
rules:
# independents
- record: job:http_requests:rate1m
expr: sum by (job)(rate(http_requests_total[1m]))
- record: job:http_requests:rate5m
expr: sum by (job)(rate(http_requests_total[5m]))
# dependents
- record: job:http_requests:rate15m
expr: sum by (job)(rate(http_requests_total[15m]))
- record: TooManyRequests
expr: job:http_requests:rate15m > 100

View file

@ -0,0 +1,28 @@
groups:
- name: http
rules:
# independents
- record: job:http_requests:rate1m
expr: sum by (job)(rate(http_requests_total[1m]))
- record: job:http_requests:rate5m
expr: sum by (job)(rate(http_requests_total[5m]))
# dependents
- record: job:http_requests:rate15m
expr: sum by (job)(rate(http_requests_total[15m]))
- record: TooManyHTTPRequests
expr: job:http_requests:rate15m > 100
- name: grpc
rules:
# independents
- record: job:grpc_requests:rate1m
expr: sum by (job)(rate(grpc_requests_total[1m]))
- record: job:grpc_requests:rate5m
expr: sum by (job)(rate(grpc_requests_total[5m]))
# dependents
- record: job:grpc_requests:rate15m
expr: sum by (job)(rate(grpc_requests_total[15m]))
- record: TooManyGRPCRequests
expr: job:grpc_requests:rate15m > 100

View file

@ -21,8 +21,11 @@ import (
"sync" "sync"
"time" "time"
"go.uber.org/atomic"
"golang.org/x/exp/slices" "golang.org/x/exp/slices"
"github.com/prometheus/prometheus/promql/parser"
"github.com/go-kit/log" "github.com/go-kit/log"
"github.com/go-kit/log/level" "github.com/go-kit/log/level"
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
@ -68,6 +71,7 @@ type Group struct {
// Rule group evaluation iteration function, // Rule group evaluation iteration function,
// defaults to DefaultEvalIterationFunc. // defaults to DefaultEvalIterationFunc.
evalIterationFunc GroupEvalIterationFunc evalIterationFunc GroupEvalIterationFunc
dependencyMap dependencyMap
} }
// GroupEvalIterationFunc is used to implement and extend rule group // GroupEvalIterationFunc is used to implement and extend rule group
@ -126,6 +130,7 @@ func NewGroup(o GroupOptions) *Group {
logger: log.With(o.Opts.Logger, "file", o.File, "group", o.Name), logger: log.With(o.Opts.Logger, "file", o.File, "group", o.Name),
metrics: metrics, metrics: metrics,
evalIterationFunc: evalIterationFunc, evalIterationFunc: evalIterationFunc,
dependencyMap: buildDependencyMap(o.Rules),
} }
} }
@ -421,7 +426,7 @@ func (g *Group) CopyState(from *Group) {
// Eval runs a single evaluation cycle in which all rules are evaluated sequentially. // Eval runs a single evaluation cycle in which all rules are evaluated sequentially.
func (g *Group) Eval(ctx context.Context, ts time.Time) { func (g *Group) Eval(ctx context.Context, ts time.Time) {
var samplesTotal float64 var samplesTotal atomic.Float64
for i, rule := range g.rules { for i, rule := range g.rules {
select { select {
case <-g.done: case <-g.done:
@ -429,7 +434,12 @@ func (g *Group) Eval(ctx context.Context, ts time.Time) {
default: default:
} }
func(i int, rule Rule) { eval := func(i int, rule Rule, async bool) {
if async {
defer func() {
g.opts.ConcurrentEvalSema.Release(1)
}()
}
logger := log.WithPrefix(g.logger, "name", rule.Name(), "index", i) logger := log.WithPrefix(g.logger, "name", rule.Name(), "index", i)
ctx, sp := otel.Tracer("").Start(ctx, "rule") ctx, sp := otel.Tracer("").Start(ctx, "rule")
sp.SetAttributes(attribute.String("name", rule.Name())) sp.SetAttributes(attribute.String("name", rule.Name()))
@ -465,7 +475,7 @@ func (g *Group) Eval(ctx context.Context, ts time.Time) {
} }
rule.SetHealth(HealthGood) rule.SetHealth(HealthGood)
rule.SetLastError(nil) rule.SetLastError(nil)
samplesTotal += float64(len(vector)) samplesTotal.Add(float64(len(vector)))
if ar, ok := rule.(*AlertingRule); ok { if ar, ok := rule.(*AlertingRule); ok {
ar.sendAlerts(ctx, ts, g.opts.ResendDelay, g.interval, g.opts.NotifyFunc) ar.sendAlerts(ctx, ts, g.opts.ResendDelay, g.interval, g.opts.NotifyFunc)
@ -554,10 +564,19 @@ func (g *Group) Eval(ctx context.Context, ts time.Time) {
} }
} }
} }
}(i, rule)
} }
// If the rule has no dependencies, it can run concurrently because no other rules in this group depend on its output.
// Try run concurrently if there are slots available.
if g.dependencyMap.isIndependent(rule) && g.opts.ConcurrentEvalSema != nil && g.opts.ConcurrentEvalSema.TryAcquire(1) {
go eval(i, rule, true)
} else {
eval(i, rule, false)
}
}
if g.metrics != nil { if g.metrics != nil {
g.metrics.GroupSamples.WithLabelValues(GroupKey(g.File(), g.Name())).Set(samplesTotal) g.metrics.GroupSamples.WithLabelValues(GroupKey(g.File(), g.Name())).Set(samplesTotal.Load())
} }
g.cleanupStaleSeries(ctx, ts) g.cleanupStaleSeries(ctx, ts)
} }
@ -866,3 +885,109 @@ func NewGroupMetrics(reg prometheus.Registerer) *Metrics {
return m return m
} }
// dependencyMap is a data-structure which contains the relationships between rules within a group.
// It is used to describe the dependency associations between recording rules in a group whereby one rule uses the
// output metric produced by another recording rule in its expression (i.e. as its "input").
type dependencyMap map[Rule][]Rule
// dependents returns all rules which use the output of the given rule as one of their inputs.
func (m dependencyMap) dependents(r Rule) []Rule {
if len(m) == 0 {
return nil
}
return m[r]
}
// dependencies returns all the rules on which the given rule is dependent for input.
func (m dependencyMap) dependencies(r Rule) []Rule {
if len(m) == 0 {
return nil
}
var parents []Rule
for parent, children := range m {
if len(children) == 0 {
continue
}
for _, child := range children {
if child == r {
parents = append(parents, parent)
}
}
}
return parents
}
func (m dependencyMap) isIndependent(r Rule) bool {
if m == nil {
return false
}
return len(m.dependents(r)) == 0 && len(m.dependencies(r)) == 0
}
// buildDependencyMap builds a data-structure which contains the relationships between rules within a group.
func buildDependencyMap(rules []Rule) dependencyMap {
dependencies := make(dependencyMap)
if len(rules) <= 1 {
// No relationships if group has 1 or fewer rules.
return nil
}
inputs := make(map[string][]Rule, len(rules))
outputs := make(map[string][]Rule, len(rules))
var indeterminate bool
for _, rule := range rules {
rule := rule
name := rule.Name()
outputs[name] = append(outputs[name], rule)
parser.Inspect(rule.Query(), func(node parser.Node, path []parser.Node) error {
if n, ok := node.(*parser.VectorSelector); ok {
// A wildcard metric expression means we cannot reliably determine if this rule depends on any other,
// which means we cannot safely run any rules concurrently.
if n.Name == "" && len(n.LabelMatchers) > 0 {
indeterminate = true
return nil
}
// Rules which depend on "meta-metrics" like ALERTS and ALERTS_FOR_STATE will have undefined behaviour
// if they run concurrently.
if n.Name == alertMetricName || n.Name == alertForStateMetricName {
indeterminate = true
return nil
}
inputs[n.Name] = append(inputs[n.Name], rule)
}
return nil
})
}
if indeterminate {
return nil
}
if len(inputs) == 0 || len(outputs) == 0 {
// No relationships can be inferred.
return nil
}
for output, outRules := range outputs {
for _, outRule := range outRules {
if rs, found := inputs[output]; found && len(rs) > 0 {
dependencies[outRule] = append(dependencies[outRule], rs...)
}
}
}
return dependencies
}

View file

@ -26,6 +26,7 @@ import (
"github.com/go-kit/log/level" "github.com/go-kit/log/level"
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
"golang.org/x/exp/slices" "golang.org/x/exp/slices"
"golang.org/x/sync/semaphore"
"github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/rulefmt" "github.com/prometheus/prometheus/model/rulefmt"
@ -114,6 +115,8 @@ type ManagerOptions struct {
OutageTolerance time.Duration OutageTolerance time.Duration
ForGracePeriod time.Duration ForGracePeriod time.Duration
ResendDelay time.Duration ResendDelay time.Duration
MaxConcurrentEvals int64
ConcurrentEvalSema *semaphore.Weighted
GroupLoader GroupLoader GroupLoader GroupLoader
Metrics *Metrics Metrics *Metrics
@ -130,6 +133,8 @@ func NewManager(o *ManagerOptions) *Manager {
o.GroupLoader = FileLoader{} o.GroupLoader = FileLoader{}
} }
o.ConcurrentEvalSema = semaphore.NewWeighted(o.MaxConcurrentEvals)
m := &Manager{ m := &Manager{
groups: map[string]*Group{}, groups: map[string]*Group{},
opts: o, opts: o,

View file

@ -19,15 +19,18 @@ import (
"math" "math"
"os" "os"
"sort" "sort"
"sync"
"testing" "testing"
"time" "time"
"github.com/go-kit/log" "github.com/go-kit/log"
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/testutil"
"github.com/prometheus/common/model" "github.com/prometheus/common/model"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"go.uber.org/atomic" "go.uber.org/atomic"
"go.uber.org/goleak" "go.uber.org/goleak"
"golang.org/x/sync/semaphore"
"gopkg.in/yaml.v2" "gopkg.in/yaml.v2"
"github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/model/labels"
@ -1402,3 +1405,382 @@ func TestNativeHistogramsInRecordingRules(t *testing.T) {
require.Equal(t, expHist, fh) require.Equal(t, expHist, fh)
require.Equal(t, chunkenc.ValNone, it.Next()) require.Equal(t, chunkenc.ValNone, it.Next())
} }
func TestDependencyMap(t *testing.T) {
ctx := context.Background()
opts := &ManagerOptions{
Context: ctx,
Logger: log.NewNopLogger(),
}
expr, err := parser.ParseExpr("sum by (user) (rate(requests[1m]))")
require.NoError(t, err)
rule := NewRecordingRule("user:requests:rate1m", expr, labels.Labels{})
expr, err = parser.ParseExpr("user:requests:rate1m <= 0")
require.NoError(t, err)
rule2 := NewAlertingRule("ZeroRequests", expr, 0, 0, labels.Labels{}, labels.Labels{}, labels.EmptyLabels(), "", true, log.NewNopLogger())
group := NewGroup(GroupOptions{
Name: "rule_group",
Interval: time.Second,
Rules: []Rule{rule, rule2},
Opts: opts,
})
require.Equal(t, []Rule{rule2}, group.dependencyMap.dependents(rule))
require.Len(t, group.dependencyMap.dependencies(rule), 0)
require.False(t, group.dependencyMap.isIndependent(rule))
require.Len(t, group.dependencyMap.dependents(rule2), 0)
require.Equal(t, []Rule{rule}, group.dependencyMap.dependencies(rule2))
require.False(t, group.dependencyMap.isIndependent(rule2))
}
func TestNoDependency(t *testing.T) {
ctx := context.Background()
opts := &ManagerOptions{
Context: ctx,
Logger: log.NewNopLogger(),
}
expr, err := parser.ParseExpr("sum by (user) (rate(requests[1m]))")
require.NoError(t, err)
rule := NewRecordingRule("user:requests:rate1m", expr, labels.Labels{})
group := NewGroup(GroupOptions{
Name: "rule_group",
Interval: time.Second,
Rules: []Rule{rule},
Opts: opts,
})
// A group with only one rule cannot have dependencies.
require.False(t, group.dependencyMap.isIndependent(rule))
}
func TestNoMetricSelector(t *testing.T) {
ctx := context.Background()
opts := &ManagerOptions{
Context: ctx,
Logger: log.NewNopLogger(),
}
expr, err := parser.ParseExpr("sum by (user) (rate(requests[1m]))")
require.NoError(t, err)
rule := NewRecordingRule("user:requests:rate1m", expr, labels.Labels{})
expr, err = parser.ParseExpr(`count({user="bob"})`)
require.NoError(t, err)
rule2 := NewRecordingRule("user:requests:rate1m", expr, labels.Labels{})
group := NewGroup(GroupOptions{
Name: "rule_group",
Interval: time.Second,
Rules: []Rule{rule, rule2},
Opts: opts,
})
// A rule with no metric selector cannot be reliably determined to have no dependencies on other rules, and therefore
// all rules are not considered independent.
require.False(t, group.dependencyMap.isIndependent(rule))
require.False(t, group.dependencyMap.isIndependent(rule2))
}
func TestDependentRulesWithNonMetricExpression(t *testing.T) {
ctx := context.Background()
opts := &ManagerOptions{
Context: ctx,
Logger: log.NewNopLogger(),
}
expr, err := parser.ParseExpr("sum by (user) (rate(requests[1m]))")
require.NoError(t, err)
rule := NewRecordingRule("user:requests:rate1m", expr, labels.Labels{})
expr, err = parser.ParseExpr("user:requests:rate1m <= 0")
require.NoError(t, err)
rule2 := NewAlertingRule("ZeroRequests", expr, 0, 0, labels.Labels{}, labels.Labels{}, labels.EmptyLabels(), "", true, log.NewNopLogger())
expr, err = parser.ParseExpr("3")
require.NoError(t, err)
rule3 := NewRecordingRule("three", expr, labels.Labels{})
group := NewGroup(GroupOptions{
Name: "rule_group",
Interval: time.Second,
Rules: []Rule{rule, rule2, rule3},
Opts: opts,
})
require.False(t, group.dependencyMap.isIndependent(rule))
require.False(t, group.dependencyMap.isIndependent(rule2))
require.True(t, group.dependencyMap.isIndependent(rule3))
}
func TestRulesDependentOnMetaMetrics(t *testing.T) {
ctx := context.Background()
opts := &ManagerOptions{
Context: ctx,
Logger: log.NewNopLogger(),
}
// This rule is not dependent on any other rules in its group but it does depend on `ALERTS`, which is produced by
// the rule engine, and is therefore not independent.
expr, err := parser.ParseExpr("count(ALERTS)")
require.NoError(t, err)
rule := NewRecordingRule("alert_count", expr, labels.Labels{})
// Create another rule so a dependency map is built (no map is built if a group contains one or fewer rules).
expr, err = parser.ParseExpr("1")
require.NoError(t, err)
rule2 := NewRecordingRule("one", expr, labels.Labels{})
group := NewGroup(GroupOptions{
Name: "rule_group",
Interval: time.Second,
Rules: []Rule{rule, rule2},
Opts: opts,
})
require.False(t, group.dependencyMap.isIndependent(rule))
}
func TestDependencyMapUpdatesOnGroupUpdate(t *testing.T) {
files := []string{"fixtures/rules.yaml"}
ruleManager := NewManager(&ManagerOptions{
Context: context.Background(),
Logger: log.NewNopLogger(),
})
ruleManager.start()
defer ruleManager.Stop()
err := ruleManager.Update(10*time.Second, files, labels.EmptyLabels(), "", nil)
require.NoError(t, err)
require.Greater(t, len(ruleManager.groups), 0, "expected non-empty rule groups")
orig := make(map[string]dependencyMap, len(ruleManager.groups))
for _, g := range ruleManager.groups {
// No dependency map is expected because there is only one rule in the group.
require.Empty(t, g.dependencyMap)
orig[g.Name()] = g.dependencyMap
}
// Update once without changing groups.
err = ruleManager.Update(10*time.Second, files, labels.EmptyLabels(), "", nil)
require.NoError(t, err)
for h, g := range ruleManager.groups {
// Dependency maps are the same because of no updates.
require.Equal(t, orig[h], g.dependencyMap)
}
// Groups will be recreated when updated.
files[0] = "fixtures/rules_dependencies.yaml"
err = ruleManager.Update(10*time.Second, files, labels.EmptyLabels(), "", nil)
require.NoError(t, err)
for h, g := range ruleManager.groups {
// Dependency maps must change because the groups would've been updated.
require.NotEqual(t, orig[h], g.dependencyMap)
// We expect there to be some dependencies since the new rule group contains a dependency.
require.Greater(t, len(g.dependencyMap), 0)
}
}
func TestAsyncRuleEvaluation(t *testing.T) {
storage := teststorage.New(t)
t.Cleanup(func() { storage.Close() })
const artificialDelay = time.Second
var (
inflightQueries atomic.Int32
maxInflight atomic.Int32
)
files := []string{"fixtures/rules_multiple.yaml"}
ruleManager := NewManager(&ManagerOptions{
Context: context.Background(),
Logger: log.NewNopLogger(),
Appendable: storage,
QueryFunc: func(ctx context.Context, q string, ts time.Time) (promql.Vector, error) {
inflightQueries.Add(1)
defer func() {
inflightQueries.Add(-1)
}()
// Artificially delay all query executions to highly concurrent execution improvement.
time.Sleep(artificialDelay)
// return a stub sample
return promql.Vector{
promql.Sample{Metric: labels.FromStrings("__name__", "test"), T: ts.UnixMilli(), F: 12345},
}, nil
},
})
// Evaluate groups manually to show the impact of async rule evaluations.
groups, errs := ruleManager.LoadGroups(time.Second, labels.EmptyLabels(), "", nil, files...)
require.Empty(t, errs)
require.Len(t, groups, 1)
expectedRules := 4
t.Run("synchronous evaluation with independent rules", func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
for _, group := range groups {
require.Len(t, group.rules, expectedRules)
start := time.Now()
// Never expect more than 1 inflight query at a time.
go func() {
for {
select {
case <-ctx.Done():
return
default:
highWatermark := maxInflight.Load()
current := inflightQueries.Load()
if current > highWatermark {
maxInflight.Store(current)
}
time.Sleep(time.Millisecond)
}
}
}()
group.Eval(ctx, start)
require.EqualValues(t, 1, maxInflight.Load())
// Each rule should take at least 1 second to execute sequentially.
require.GreaterOrEqual(t, time.Since(start).Seconds(), (time.Duration(expectedRules) * artificialDelay).Seconds())
// Each rule produces one vector.
require.EqualValues(t, expectedRules, testutil.ToFloat64(group.metrics.GroupSamples))
}
cancel()
})
t.Run("asynchronous evaluation with independent rules", func(t *testing.T) {
// Reset.
inflightQueries.Store(0)
maxInflight.Store(0)
ctx, cancel := context.WithCancel(context.Background())
for _, group := range groups {
// Allow up to 2 concurrent rule evaluations.
group.opts.ConcurrentEvalSema = semaphore.NewWeighted(2)
require.Len(t, group.rules, expectedRules)
start := time.Now()
go func() {
for {
select {
case <-ctx.Done():
return
default:
highWatermark := maxInflight.Load()
current := inflightQueries.Load()
if current > highWatermark {
maxInflight.Store(current)
}
time.Sleep(time.Millisecond)
}
}
}()
group.Eval(ctx, start)
require.EqualValues(t, 3, maxInflight.Load())
// Some rules should execute concurrently so should complete quicker.
require.Less(t, time.Since(start).Seconds(), (time.Duration(expectedRules) * artificialDelay).Seconds())
// Each rule produces one vector.
require.EqualValues(t, expectedRules, testutil.ToFloat64(group.metrics.GroupSamples))
}
cancel()
})
}
func TestBoundedRuleEvalConcurrency(t *testing.T) {
storage := teststorage.New(t)
t.Cleanup(func() { storage.Close() })
const artificialDelay = time.Millisecond * 100
var (
inflightQueries atomic.Int32
maxInflight atomic.Int32
maxConcurrency int64 = 3
groupCount = 2
)
files := []string{"fixtures/rules_multiple_groups.yaml"}
ruleManager := NewManager(&ManagerOptions{
Context: context.Background(),
Logger: log.NewNopLogger(),
Appendable: storage,
MaxConcurrentEvals: maxConcurrency,
QueryFunc: func(ctx context.Context, q string, ts time.Time) (promql.Vector, error) {
inflightQueries.Add(1)
defer func() {
inflightQueries.Add(-1)
}()
// Artificially delay all query executions to highly concurrent execution improvement.
time.Sleep(artificialDelay)
// return a stub sample
return promql.Vector{
promql.Sample{Metric: labels.FromStrings("__name__", "test"), T: ts.UnixMilli(), F: 12345},
}, nil
},
})
// Evaluate groups manually to show the impact of async rule evaluations.
groups, errs := ruleManager.LoadGroups(time.Second, labels.EmptyLabels(), "", nil, files...)
require.Empty(t, errs)
require.Len(t, groups, groupCount)
ctx, cancel := context.WithCancel(context.Background())
go func() {
for {
select {
case <-ctx.Done():
return
default:
highWatermark := maxInflight.Load()
current := inflightQueries.Load()
if current > highWatermark {
maxInflight.Store(current)
}
time.Sleep(time.Millisecond)
}
}
}()
// Evaluate groups concurrently (like they normally do).
var wg sync.WaitGroup
for _, group := range groups {
group := group
wg.Add(1)
go func() {
group.Eval(ctx, time.Now())
wg.Done()
}()
}
wg.Wait()
cancel()
// Synchronous queries also count towards inflight, so at most we can have maxConcurrency+$groupCount inflight evaluations.
require.EqualValues(t, maxInflight.Load(), int32(maxConcurrency)+int32(groupCount))
}