mirror of
https://github.com/prometheus/prometheus.git
synced 2024-12-24 21:24:05 -08:00
Add an option to restore new rule groups added to existing rule manager
Signed-off-by: Anand Rajagopal <anrajag@amazon.com>
This commit is contained in:
parent
da53bad3f0
commit
ceb2f653ba
6
rules/fixtures/alert_rule.yaml
Normal file
6
rules/fixtures/alert_rule.yaml
Normal file
|
@ -0,0 +1,6 @@
|
|||
groups:
|
||||
- name: test
|
||||
interval: 1s
|
||||
rules:
|
||||
- alert: rule1
|
||||
expr: 1 < bool 2
|
6
rules/fixtures/alert_rule1.yaml
Normal file
6
rules/fixtures/alert_rule1.yaml
Normal file
|
@ -0,0 +1,6 @@
|
|||
groups:
|
||||
- name: test2
|
||||
interval: 1s
|
||||
rules:
|
||||
- alert: rule2
|
||||
expr: 1 < bool 2
|
|
@ -95,6 +95,7 @@ type Manager struct {
|
|||
block chan struct{}
|
||||
done chan struct{}
|
||||
restored bool
|
||||
restoreNewRuleGroups bool
|
||||
|
||||
logger *slog.Logger
|
||||
}
|
||||
|
@ -121,6 +122,10 @@ type ManagerOptions struct {
|
|||
ConcurrentEvalsEnabled bool
|
||||
RuleConcurrencyController RuleConcurrencyController
|
||||
RuleDependencyController RuleDependencyController
|
||||
// At present, manager only restores `for` state when manager is newly created which happens
|
||||
// during restarts. This flag provides an option to restore the `for` state when new rule groups are
|
||||
// added to an existing manager
|
||||
RestoreNewRuleGroups bool
|
||||
|
||||
Metrics *Metrics
|
||||
}
|
||||
|
@ -158,6 +163,7 @@ func NewManager(o *ManagerOptions) *Manager {
|
|||
block: make(chan struct{}),
|
||||
done: make(chan struct{}),
|
||||
logger: o.Logger,
|
||||
restoreNewRuleGroups: o.RestoreNewRuleGroups,
|
||||
}
|
||||
|
||||
return m
|
||||
|
@ -295,7 +301,7 @@ func (m *Manager) LoadGroups(
|
|||
) (map[string]*Group, []error) {
|
||||
groups := make(map[string]*Group)
|
||||
|
||||
shouldRestore := !m.restored
|
||||
shouldRestore := !m.restored || m.restoreNewRuleGroups
|
||||
|
||||
for _, fn := range filenames {
|
||||
rgs, errs := m.opts.GroupLoader.Load(fn)
|
||||
|
@ -328,7 +334,7 @@ func (m *Manager) LoadGroups(
|
|||
labels.FromMap(r.Annotations),
|
||||
externalLabels,
|
||||
externalURL,
|
||||
m.restored,
|
||||
!shouldRestore,
|
||||
m.logger.With("alert", r.Alert),
|
||||
))
|
||||
continue
|
||||
|
|
|
@ -2112,6 +2112,139 @@ func TestAsyncRuleEvaluation(t *testing.T) {
|
|||
})
|
||||
}
|
||||
|
||||
func TestNewRuleGroupRestoration(t *testing.T) {
|
||||
store := teststorage.New(t)
|
||||
t.Cleanup(func() { store.Close() })
|
||||
var (
|
||||
inflightQueries atomic.Int32
|
||||
maxInflight atomic.Int32
|
||||
maxConcurrency int64
|
||||
interval = 60 * time.Second
|
||||
)
|
||||
|
||||
waitForEvaluations := func(t *testing.T, ch <-chan int32, targetCount int32) {
|
||||
for {
|
||||
select {
|
||||
case cnt := <-ch:
|
||||
if cnt == targetCount {
|
||||
return
|
||||
}
|
||||
case <-time.After(5 * time.Second):
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
files := []string{"fixtures/alert_rule.yaml"}
|
||||
|
||||
option := optsFactory(store, &maxInflight, &inflightQueries, maxConcurrency)
|
||||
option.Queryable = store
|
||||
option.Appendable = store
|
||||
option.NotifyFunc = func(ctx context.Context, expr string, alerts ...*Alert) {}
|
||||
|
||||
var evalCount atomic.Int32
|
||||
ch := make(chan int32)
|
||||
noopEvalIterFunc := func(ctx context.Context, g *Group, evalTimestamp time.Time) {
|
||||
evalCount.Inc()
|
||||
ch <- evalCount.Load()
|
||||
}
|
||||
|
||||
ruleManager := NewManager(option)
|
||||
go ruleManager.Run()
|
||||
err := ruleManager.Update(interval, files, labels.EmptyLabels(), "", noopEvalIterFunc)
|
||||
require.NoError(t, err)
|
||||
|
||||
waitForEvaluations(t, ch, 3)
|
||||
require.Equal(t, int32(3), evalCount.Load())
|
||||
ruleGroups := make(map[string]struct{})
|
||||
for _, group := range ruleManager.groups {
|
||||
ruleGroups[group.Name()] = struct{}{}
|
||||
require.False(t, group.shouldRestore)
|
||||
for _, rule := range group.rules {
|
||||
require.True(t, rule.(*AlertingRule).restored.Load())
|
||||
}
|
||||
}
|
||||
|
||||
files = append(files, "fixtures/alert_rule1.yaml")
|
||||
err = ruleManager.Update(interval, files, labels.EmptyLabels(), "", nil)
|
||||
require.NoError(t, err)
|
||||
ruleManager.Stop()
|
||||
for _, group := range ruleManager.groups {
|
||||
// new rule groups added to existing manager will not be restored
|
||||
require.False(t, group.shouldRestore)
|
||||
}
|
||||
}
|
||||
|
||||
func TestNewRuleGroupRestorationWithRestoreNewGroupOption(t *testing.T) {
|
||||
store := teststorage.New(t)
|
||||
t.Cleanup(func() { store.Close() })
|
||||
var (
|
||||
inflightQueries atomic.Int32
|
||||
maxInflight atomic.Int32
|
||||
maxConcurrency int64
|
||||
interval = 60 * time.Second
|
||||
)
|
||||
|
||||
waitForEvaluations := func(t *testing.T, ch <-chan int32, targetCount int32) {
|
||||
for {
|
||||
select {
|
||||
case cnt := <-ch:
|
||||
if cnt == targetCount {
|
||||
return
|
||||
}
|
||||
case <-time.After(5 * time.Second):
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
files := []string{"fixtures/alert_rule.yaml"}
|
||||
|
||||
option := optsFactory(store, &maxInflight, &inflightQueries, maxConcurrency)
|
||||
option.Queryable = store
|
||||
option.Appendable = store
|
||||
option.RestoreNewRuleGroups = true
|
||||
option.NotifyFunc = func(ctx context.Context, expr string, alerts ...*Alert) {}
|
||||
|
||||
var evalCount atomic.Int32
|
||||
ch := make(chan int32)
|
||||
noopEvalIterFunc := func(ctx context.Context, g *Group, evalTimestamp time.Time) {
|
||||
evalCount.Inc()
|
||||
ch <- evalCount.Load()
|
||||
}
|
||||
|
||||
ruleManager := NewManager(option)
|
||||
go ruleManager.Run()
|
||||
err := ruleManager.Update(interval, files, labels.EmptyLabels(), "", noopEvalIterFunc)
|
||||
require.NoError(t, err)
|
||||
|
||||
waitForEvaluations(t, ch, 3)
|
||||
require.Equal(t, int32(3), evalCount.Load())
|
||||
ruleGroups := make(map[string]struct{})
|
||||
for _, group := range ruleManager.groups {
|
||||
ruleGroups[group.Name()] = struct{}{}
|
||||
require.False(t, group.shouldRestore)
|
||||
for _, rule := range group.rules {
|
||||
require.True(t, rule.(*AlertingRule).restored.Load())
|
||||
}
|
||||
}
|
||||
|
||||
files = append(files, "fixtures/alert_rule1.yaml")
|
||||
err = ruleManager.Update(interval, files, labels.EmptyLabels(), "", nil)
|
||||
require.NoError(t, err)
|
||||
// stop eval
|
||||
ruleManager.Stop()
|
||||
for _, group := range ruleManager.groups {
|
||||
if _, OK := ruleGroups[group.Name()]; OK {
|
||||
// already restored
|
||||
require.False(t, group.shouldRestore)
|
||||
continue
|
||||
}
|
||||
// new rule groups added to existing manager will be restored
|
||||
require.True(t, group.shouldRestore)
|
||||
}
|
||||
}
|
||||
|
||||
func TestBoundedRuleEvalConcurrency(t *testing.T) {
|
||||
storage := teststorage.New(t)
|
||||
t.Cleanup(func() { storage.Close() })
|
||||
|
|
Loading…
Reference in a new issue