Option to align rule group's evaluation time to interval (#400)

* Allow rule groups evaluation timestamp to be aligned on the evaluation interval.

Signed-off-by: Peter Štibraný <pstibrany@gmail.com>
This commit is contained in:
Peter Štibraný 2023-01-19 14:51:26 +01:00 committed by GitHub
parent fa6d2a8ede
commit 806e71e828
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 163 additions and 58 deletions

View file

@ -135,12 +135,13 @@ func (g *RuleGroups) Validate(node ruleGroups) (errs []error) {
// RuleGroup is a list of sequentially evaluated recording and alerting rules.
type RuleGroup struct {
Name string `yaml:"name"`
Interval model.Duration `yaml:"interval,omitempty"`
EvaluationDelay *model.Duration `yaml:"evaluation_delay,omitempty"`
Limit int `yaml:"limit,omitempty"`
Rules []RuleNode `yaml:"rules"`
SourceTenants []string `yaml:"source_tenants,omitempty"`
Name string `yaml:"name"`
Interval model.Duration `yaml:"interval,omitempty"`
EvaluationDelay *model.Duration `yaml:"evaluation_delay,omitempty"`
Limit int `yaml:"limit,omitempty"`
Rules []RuleNode `yaml:"rules"`
SourceTenants []string `yaml:"source_tenants,omitempty"`
AlignExecutionTimeOnInterval bool `yaml:"align_execution_time_on_interval,omitempty"`
}
// Rule describes an alerting or recording rule.

View file

@ -0,0 +1,27 @@
groups:
- name: aligned
align_execution_time_on_interval: true
interval: 5m
rules:
- record: job:http_requests:rate5m
expr: sum by (job)(rate(http_requests_total[5m]))
- name: aligned_with_crazy_interval
align_execution_time_on_interval: true
interval: 1m27s
rules:
- record: job:http_requests:rate5m
expr: sum by (job)(rate(http_requests_total[5m]))
- name: unaligned_default
interval: 5m
rules:
- record: job:http_requests:rate5m
expr: sum by (job)(rate(http_requests_total[5m]))
- name: unaligned_explicit
interval: 5m
align_execution_time_on_interval: false
rules:
- record: job:http_requests:rate5m
expr: sum by (job)(rate(http_requests_total[5m]))

View file

@ -270,7 +270,8 @@ type Group struct {
metrics *Metrics
ruleGroupPostProcessFunc RuleGroupPostProcessFunc
ruleGroupPostProcessFunc RuleGroupPostProcessFunc
alignExecutionTimeOnInterval bool
}
// This function will be used before each rule group evaluation if not nil.
@ -278,16 +279,17 @@ type Group struct {
type RuleGroupPostProcessFunc func(g *Group, lastEvalTimestamp time.Time, log log.Logger) error
type GroupOptions struct {
Name, File string
Interval time.Duration
Limit int
Rules []Rule
SourceTenants []string
ShouldRestore bool
Opts *ManagerOptions
EvaluationDelay *time.Duration
done chan struct{}
RuleGroupPostProcessFunc RuleGroupPostProcessFunc
Name, File string
Interval time.Duration
Limit int
Rules []Rule
SourceTenants []string
ShouldRestore bool
Opts *ManagerOptions
EvaluationDelay *time.Duration
done chan struct{}
RuleGroupPostProcessFunc RuleGroupPostProcessFunc
AlignExecutionTimeOnInterval bool
}
// NewGroup makes a new Group with the given name, options, and rules.
@ -309,22 +311,23 @@ func NewGroup(o GroupOptions) *Group {
metrics.GroupInterval.WithLabelValues(key).Set(o.Interval.Seconds())
return &Group{
name: o.Name,
file: o.File,
interval: o.Interval,
evaluationDelay: o.EvaluationDelay,
limit: o.Limit,
rules: o.Rules,
shouldRestore: o.ShouldRestore,
opts: o.Opts,
sourceTenants: o.SourceTenants,
seriesInPreviousEval: make([]map[string]labels.Labels, len(o.Rules)),
done: make(chan struct{}),
managerDone: o.done,
terminated: make(chan struct{}),
logger: log.With(o.Opts.Logger, "file", o.File, "group", o.Name),
metrics: metrics,
ruleGroupPostProcessFunc: o.RuleGroupPostProcessFunc,
name: o.Name,
file: o.File,
interval: o.Interval,
evaluationDelay: o.EvaluationDelay,
limit: o.Limit,
rules: o.Rules,
shouldRestore: o.ShouldRestore,
opts: o.Opts,
sourceTenants: o.SourceTenants,
seriesInPreviousEval: make([]map[string]labels.Labels, len(o.Rules)),
done: make(chan struct{}),
managerDone: o.done,
terminated: make(chan struct{}),
logger: log.With(o.Opts.Logger, "file", o.File, "group", o.Name),
metrics: metrics,
ruleGroupPostProcessFunc: o.RuleGroupPostProcessFunc,
alignExecutionTimeOnInterval: o.AlignExecutionTimeOnInterval,
}
}
@ -547,11 +550,13 @@ func (g *Group) setLastEvaluation(ts time.Time) {
// EvalTimestamp returns the immediately preceding consistently slotted evaluation time.
func (g *Group) EvalTimestamp(startTime int64) time.Time {
var (
var offset int64
if !g.alignExecutionTimeOnInterval {
offset = int64(g.hash() % uint64(g.interval))
adjNow = startTime - offset
base = adjNow - (adjNow % int64(g.interval))
)
}
adjNow := startTime - offset
base := adjNow - (adjNow % int64(g.interval))
return time.Unix(0, base+offset).UTC()
}
@ -928,6 +933,10 @@ func (g *Group) Equals(ng *Group) bool {
return false
}
if g.alignExecutionTimeOnInterval != ng.alignExecutionTimeOnInterval {
return false
}
for i, gr := range g.rules {
if gr.String() != ng.rules[i].String() {
return false
@ -1196,17 +1205,18 @@ func (m *Manager) LoadGroups(
}
groups[GroupKey(fn, rg.Name)] = NewGroup(GroupOptions{
Name: rg.Name,
File: fn,
Interval: itv,
Limit: rg.Limit,
Rules: rules,
SourceTenants: rg.SourceTenants,
ShouldRestore: shouldRestore,
Opts: m.opts,
EvaluationDelay: (*time.Duration)(rg.EvaluationDelay),
done: m.done,
RuleGroupPostProcessFunc: ruleGroupPostProcessFunc,
Name: rg.Name,
File: fn,
Interval: itv,
Limit: rg.Limit,
Rules: rules,
SourceTenants: rg.SourceTenants,
ShouldRestore: shouldRestore,
Opts: m.opts,
EvaluationDelay: (*time.Duration)(rg.EvaluationDelay),
done: m.done,
RuleGroupPostProcessFunc: ruleGroupPostProcessFunc,
AlignExecutionTimeOnInterval: rg.AlignExecutionTimeOnInterval,
})
}
}

View file

@ -919,6 +919,71 @@ func TestUpdateSetsSourceTenants(t *testing.T) {
}
}
func TestAlignEvaluationTimeOnInterval(t *testing.T) {
st := teststorage.New(t)
defer st.Close()
opts := promql.EngineOpts{
Logger: nil,
Reg: nil,
MaxSamples: 10,
Timeout: 10 * time.Second,
}
engine := promql.NewEngine(opts)
ruleManager := NewManager(&ManagerOptions{
Appendable: st,
Queryable: st,
QueryFunc: EngineQueryFunc(engine, st),
Context: context.Background(),
Logger: log.NewNopLogger(),
})
ruleManager.start()
defer ruleManager.Stop()
rgs, errs := rulefmt.ParseFile("fixtures/rules_with_alignment.yaml")
require.Empty(t, errs, "file parsing failures")
tmpFile, err := os.CreateTemp("", "rules.test.*.yaml")
require.NoError(t, err)
defer os.Remove(tmpFile.Name())
defer tmpFile.Close()
reloadRules(rgs, t, tmpFile, ruleManager, 0)
// Verify that all groups are loaded, and let's check their evaluation times.
loadedGroups := ruleManager.RuleGroups()
require.Len(t, loadedGroups, len(rgs.Groups))
assertGroupEvalTimeAlignedOnIntervalIsHonored := func(groupName string, expectedAligned bool) {
g := (*Group)(nil)
for _, lg := range loadedGroups {
if lg.name == groupName {
g = lg
break
}
}
require.NotNil(t, g, "group not found: %s", groupName)
// When "g.hash() % g.interval == 0" alignment cannot be checked, because aligned and unaligned eval timestamps
// would be the same. This can happen because g.hash() depends on path passed to ruleManager.Update function,
// and this test uses temporary directory for storing rule group files.
if g.hash()%uint64(g.interval) == 0 {
t.Skip("skipping test, because rule group hash is divisible by interval, which makes eval timestamp always aligned to the interval")
}
now := time.Now()
ts := g.EvalTimestamp(now.UnixNano())
aligned := ts.UnixNano()%g.interval.Nanoseconds() == 0
require.Equal(t, expectedAligned, aligned, "group: %s, hash: %d, now: %d", groupName, g.hash(), now.UnixNano())
}
assertGroupEvalTimeAlignedOnIntervalIsHonored("aligned", true)
assertGroupEvalTimeAlignedOnIntervalIsHonored("aligned_with_crazy_interval", true)
assertGroupEvalTimeAlignedOnIntervalIsHonored("unaligned_default", false)
assertGroupEvalTimeAlignedOnIntervalIsHonored("unaligned_explicit", false)
}
func TestGroupEvaluationContextFuncIsCalledWhenSupplied(t *testing.T) {
type testContextKeyType string
var testContextKey testContextKeyType = "TestGroupEvaluationContextFuncIsCalledWhenSupplied"
@ -975,11 +1040,12 @@ type ruleGroupsTest struct {
// ruleGroupTest forms a testing struct for running tests over rules.
type ruleGroupTest struct {
Name string `yaml:"name"`
Interval model.Duration `yaml:"interval,omitempty"`
Limit int `yaml:"limit,omitempty"`
Rules []rulefmt.Rule `yaml:"rules"`
SourceTenants []string `yaml:"source_tenants,omitempty"`
Name string `yaml:"name"`
Interval model.Duration `yaml:"interval,omitempty"`
Limit int `yaml:"limit,omitempty"`
Rules []rulefmt.Rule `yaml:"rules"`
SourceTenants []string `yaml:"source_tenants,omitempty"`
AlignExecutionTimeOnInterval bool `yaml:"align_execution_time_on_interval,omitempty"`
}
func formatRules(r *rulefmt.RuleGroups) ruleGroupsTest {
@ -998,11 +1064,12 @@ func formatRules(r *rulefmt.RuleGroups) ruleGroupsTest {
})
}
tmp = append(tmp, ruleGroupTest{
Name: g.Name,
Interval: g.Interval,
Limit: g.Limit,
Rules: rtmp,
SourceTenants: g.SourceTenants,
Name: g.Name,
Interval: g.Interval,
Limit: g.Limit,
Rules: rtmp,
SourceTenants: g.SourceTenants,
AlignExecutionTimeOnInterval: g.AlignExecutionTimeOnInterval,
})
}
return ruleGroupsTest{