Remove storage hook and metrics for errors

Signed-off-by: Mustafain Ali Khan <mustalik@amazon.com>
Author: Mustafain Ali Khan
Date:   2024-07-30 10:18:03 -07:00
parent 5660122c5f
commit c727e0945b

6 changed files with 86 additions and 75 deletions


@@ -813,7 +813,7 @@ func main() {
 		queryEngine = promql.NewEngine(opts)
 		var alertStore rules.AlertStore
 		if cfg.enableAlertStatePersistence {
-			alertStore = rules.NewFileStore(logger.With("component", "alertStore"), cfg.alertStoragePath)
+			alertStore = rules.NewFileStore(logger.With("component", "alertStore"), cfg.alertStoragePath, prometheus.DefaultRegisterer)
 		}
 		ruleManager = rules.NewManager(&rules.ManagerOptions{
@@ -833,8 +833,7 @@ func main() {
 			DefaultRuleQueryOffset: func() time.Duration {
 				return time.Duration(cfgFile.GlobalConfig.RuleQueryOffset)
 			},
 			AlertStore:     alertStore,
-			AlertStoreFunc: rules.DefaultAlertStoreFunc,
 		})
 	}


@@ -75,7 +75,6 @@ type Group struct {
 	evalIterationFunc GroupEvalIterationFunc
 	appOpts           *storage.AppendOptions
-	alertStoreFunc    AlertStateStoreFunc
 	alertStore        AlertStore
 }
@@ -96,7 +95,6 @@ type GroupOptions struct {
 	QueryOffset       *time.Duration
 	done              chan struct{}
 	EvalIterationFunc GroupEvalIterationFunc
-	AlertStoreFunc    AlertStateStoreFunc
 	AlertStore        AlertStore
 }
@@ -128,11 +126,6 @@ func NewGroup(o GroupOptions) *Group {
 		evalIterationFunc = DefaultEvalIterationFunc
 	}
-	alertStoreFunc := o.AlertStoreFunc
-	if alertStoreFunc == nil {
-		alertStoreFunc = DefaultAlertStoreFunc
-	}
 	if opts.Logger == nil {
 		opts.Logger = promslog.NewNopLogger()
 	}
@@ -154,7 +147,6 @@ func NewGroup(o GroupOptions) *Group {
 		metrics:           metrics,
 		evalIterationFunc: evalIterationFunc,
 		appOpts:           &storage.AppendOptions{DiscardOutOfOrder: true},
-		alertStoreFunc:    alertStoreFunc,
 		alertStore:        o.AlertStore,
 	}
 }
@@ -554,7 +546,7 @@ func (g *Group) Eval(ctx context.Context, ts time.Time) {
 			restoredAlerts, _ := g.alertStore.GetAlerts(ar.GetFingerprint(GroupKey(g.File(), g.Name())))
 			if len(restoredAlerts) > 0 {
 				ar.SetActiveAlerts(restoredAlerts)
-				logger.Info("Restored alerts from store", "rule", ar.name, "alerts", len(restoredAlerts))
+				g.logger.Info("Restored alerts from store", "rule", ar.name, "alerts", len(restoredAlerts))
 			}
 		}
 	}
@@ -1190,3 +1182,35 @@ func buildDependencyMap(rules []Rule) dependencyMap {
 	return dependencies
 }
+
+// AlertStore provides persistent storage of alert state.
+type AlertStore interface {
+	// SetAlerts stores the provided list of alerts for a rule.
+	SetAlerts(key uint64, groupKey string, alerts []*Alert) error
+	// GetAlerts returns a list of alerts for each alerting rule,
+	// alerting rule is identified by a fingerprint of its config.
+	GetAlerts(key uint64) (map[uint64]*Alert, error)
+}
+
+// StoreKeepFiringForState is periodically invoked to store the state of alerting rules using 'keep_firing_for'.
+func (g *Group) StoreKeepFiringForState() {
+	for _, rule := range g.rules {
+		ar, ok := rule.(*AlertingRule)
+		if !ok {
+			continue
+		}
+		if ar.KeepFiringFor() != 0 {
+			alertsToStore := make([]*Alert, 0)
+			ar.ForEachActiveAlert(func(alert *Alert) {
+				if !alert.KeepFiringSince.IsZero() {
+					alertsToStore = append(alertsToStore, alert)
+				}
+			})
+			groupKey := GroupKey(g.File(), g.Name())
+			err := g.alertStore.SetAlerts(ar.GetFingerprint(groupKey), groupKey, alertsToStore)
+			if err != nil {
+				g.logger.Error("Failed to store alerting rule state", "rule", ar.Name(), "err", err)
+			}
+		}
+	}
+}
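
With the hook removed, AlertStore is the only extension point left. Below is a minimal sketch of a custom in-memory implementation, assuming the rules package exports AlertStore and Alert as this diff shows; the memStore name and the choice to key GetAlerts results by the alert's label hash are illustrative assumptions, not part of this commit.

package customstore

import (
	"sync"

	"github.com/prometheus/prometheus/rules"
)

// memStore is a hypothetical in-memory AlertStore. The groupKey
// argument only feeds FileStore's error-metric labels, so it is
// ignored here.
type memStore struct {
	mtx    sync.RWMutex
	byRule map[uint64][]*rules.Alert
}

func newMemStore() *memStore {
	return &memStore{byRule: make(map[uint64][]*rules.Alert)}
}

// SetAlerts replaces the stored alerts for the rule identified by key.
func (s *memStore) SetAlerts(key uint64, groupKey string, alerts []*rules.Alert) error {
	s.mtx.Lock()
	defer s.mtx.Unlock()
	s.byRule[key] = alerts
	return nil
}

// GetAlerts returns the alerts stored for one rule, keyed by label
// hash (an assumption about how restored alerts are re-identified).
func (s *memStore) GetAlerts(key uint64) (map[uint64]*rules.Alert, error) {
	s.mtx.RLock()
	defer s.mtx.RUnlock()
	out := make(map[uint64]*rules.Alert, len(s.byRule[key]))
	for _, a := range s.byRule[key] {
		out[a.Labels.Hash()] = a
	}
	return out, nil
}

// Compile-time check that memStore satisfies rules.AlertStore.
var _ rules.AlertStore = (*memStore)(nil)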


@@ -89,33 +89,7 @@ func DefaultEvalIterationFunc(ctx context.Context, g *Group, evalTimestamp time.
 	if g.alertStore != nil {
 		// feature enabled.
-		go func() {
-			g.alertStoreFunc(g)
-		}()
+		g.StoreKeepFiringForState()
 	}
 }
-
-// DefaultAlertStoreFunc is the default implementation of
-// AlertStateStoreFunc that is periodically invoked to store the state
-// of alerting rules in a group at a given point in time.
-func DefaultAlertStoreFunc(g *Group) {
-	for _, rule := range g.rules {
-		ar, ok := rule.(*AlertingRule)
-		if !ok {
-			continue
-		}
-		if ar.KeepFiringFor() != 0 {
-			alertsToStore := make([]*Alert, 0)
-			ar.ForEachActiveAlert(func(alert *Alert) {
-				if !alert.KeepFiringSince.IsZero() {
-					alertsToStore = append(alertsToStore, alert)
-				}
-			})
-			err := g.alertStore.SetAlerts(ar.GetFingerprint(GroupKey(g.File(), g.Name())), alertsToStore)
-			if err != nil {
-				g.logger.Error("Failed to store alerting rule state", "rule", ar.Name(), "err", err)
-			}
-		}
-	}
-}
@@ -155,7 +129,6 @@ type ManagerOptions struct {
 	RuleConcurrencyController RuleConcurrencyController
 	RuleDependencyController  RuleDependencyController
 	AlertStore                AlertStore
-	AlertStoreFunc            AlertStateStoreFunc
 	// At present, manager only restores `for` state when manager is newly created which happens
 	// during restarts. This flag provides an option to restore the `for` state when new rule groups are
 	// added to an existing manager
@@ -239,8 +212,6 @@ func (m *Manager) Stop() {
 	m.logger.Info("Rule manager stopped")
 }
-
-type AlertStateStoreFunc func(g *Group)
-
 // Update the rule manager's state as the config requires. If
 // loading the new rules failed the old rule set is restored.
 // This method will no-op in case the manager is already stopped.
@@ -402,7 +373,6 @@ func (m *Manager) LoadGroups(
 			QueryOffset:       (*time.Duration)(rg.QueryOffset),
 			done:              m.done,
 			EvalIterationFunc: groupEvalIterationFunc,
-			AlertStoreFunc:    m.opts.AlertStoreFunc,
 			AlertStore:        m.opts.AlertStore,
 		})
 	}
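
After this change, enabling persistence is just a matter of constructing the store and setting AlertStore on ManagerOptions; DefaultEvalIterationFunc then calls StoreKeepFiringForState itself, synchronously, instead of spawning the old hook in a goroutine. A minimal wiring sketch: the storage path is illustrative, and a real setup would also set QueryFunc, Appendable, Queryable, and NotifyFunc as main.go does.

package main

import (
	"context"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/common/promslog"

	"github.com/prometheus/prometheus/rules"
)

func main() {
	logger := promslog.NewNopLogger()

	// NewFileStore now takes a Registerer for its error counters.
	store := rules.NewFileStore(logger, "data/alertstore.json", prometheus.NewRegistry())

	// Only AlertStore is set; the AlertStoreFunc hook no longer exists.
	_ = rules.NewManager(&rules.ManagerOptions{
		Context:    context.Background(),
		Logger:     logger,
		AlertStore: store,
	})
}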


@@ -2568,8 +2568,8 @@ func TestKeepFiringForStateRestore(t *testing.T) {
 		},
 	)
-	alertStore := NewFileStore(promslog.NewNopLogger(), testStoreFile)
 	ng := testEngine(t)
+	alertStore := NewFileStore(promslog.NewNopLogger(), testStoreFile, prometheus.NewRegistry())
 	opts := &ManagerOptions{
 		QueryFunc:  EngineQueryFunc(ng, testStorage),
 		Appendable: testStorage,
@@ -2608,13 +2608,12 @@ func TestKeepFiringForStateRestore(t *testing.T) {
 	)
 	group := NewGroup(GroupOptions{
 		Name:          "default",
 		Interval:      time.Second,
 		Rules:         []Rule{rule, rule2},
 		ShouldRestore: true,
 		Opts:          opts,
-		AlertStoreFunc: DefaultAlertStoreFunc,
-		AlertStore:     alertStore,
+		AlertStore: alertStore,
 	})
 	groups := make(map[string]*Group)
@@ -2657,7 +2656,7 @@ func TestKeepFiringForStateRestore(t *testing.T) {
 		group.Eval(opts.Context, evalTime)
 		group.setLastEvalTimestamp(evalTime)
 		// Manager will store alert state.
-		DefaultAlertStoreFunc(group)
+		group.StoreKeepFiringForState()
 	}
 	exp := rule.ActiveAlerts()
@@ -2683,16 +2682,15 @@ func TestKeepFiringForStateRestore(t *testing.T) {
 		labels.FromStrings("annotation_test", "rule2"), labels.EmptyLabels(), "", true, nil,
 	)
 	// Restart alert store.
-	newAlertStore := NewFileStore(promslog.NewNopLogger(), testStoreFile)
+	newAlertStore := NewFileStore(promslog.NewNopLogger(), testStoreFile, prometheus.NewRegistry())
 	newGroup := NewGroup(GroupOptions{
 		Name:          "default",
 		Interval:      time.Second,
 		Rules:         []Rule{newRule, newRule2},
 		ShouldRestore: true,
 		Opts:          opts,
 		AlertStore:    newAlertStore,
-		AlertStoreFunc: DefaultAlertStoreFunc,
 	})
 	newGroups := make(map[string]*Group)


@@ -5,41 +5,57 @@ import (
 	"log/slog"
 	"os"
 	"sync"
-)
 
-// AlertStore provides persistent storage of alert state.
-type AlertStore interface {
-	// SetAlerts stores the provided list of alerts for a rule.
-	SetAlerts(key uint64, alerts []*Alert) error
-	// GetAlerts returns a list of alerts for each alerting rule,
-	// alerting rule is identified by a fingerprint of its config.
-	GetAlerts(key uint64) (map[uint64]*Alert, error)
-}
+	"github.com/prometheus/client_golang/prometheus"
+)
 
 // FileStore implements the AlertStore interface.
 type FileStore struct {
 	logger       *slog.Logger
 	alertsByRule map[uint64][]*Alert
 	// protects the `alertsByRule` map.
 	stateMtx sync.RWMutex
 	path     string
+	registerer       prometheus.Registerer
+	storeInitErrors  prometheus.Counter
+	alertStoreErrors *prometheus.CounterVec
 }
 
-func NewFileStore(l *slog.Logger, storagePath string) *FileStore {
+func NewFileStore(l *slog.Logger, storagePath string, registerer prometheus.Registerer) *FileStore {
 	s := &FileStore{
 		logger:       l,
 		alertsByRule: make(map[uint64][]*Alert),
 		path:         storagePath,
+		registerer:   registerer,
 	}
+	s.storeInitErrors = prometheus.NewCounter(
+		prometheus.CounterOpts{
+			Namespace: namespace,
+			Name:      "alert_store_init_errors_total",
+			Help:      "The total number of errors starting alert store.",
+		},
+	)
+	s.alertStoreErrors = prometheus.NewCounterVec(
+		prometheus.CounterOpts{
+			Namespace: namespace,
+			Name:      "rule_group_alert_store_errors_total",
+			Help:      "The total number of errors in alert store.",
+		},
+		[]string{"rule_group"},
+	)
 	s.initState()
 	return s
 }
 
 // initState reads the state from file storage into the alertsByRule map.
 func (s *FileStore) initState() {
+	if s.registerer != nil {
+		s.registerer.MustRegister(s.alertStoreErrors, s.storeInitErrors)
+	}
 	file, err := os.OpenFile(s.path, os.O_RDWR|os.O_CREATE, 0o666)
 	if err != nil {
 		s.logger.Error("Failed reading alerts state from file", "err", err)
+		s.storeInitErrors.Inc()
 		return
 	}
 	defer file.Close()
@@ -48,6 +64,7 @@ func (s *FileStore) initState() {
 	err = json.NewDecoder(file).Decode(&alertsByRule)
 	if err != nil {
 		s.logger.Error("Failed reading alerts state from file", "err", err)
+		s.storeInitErrors.Inc()
 	}
 	if alertsByRule == nil {
 		alertsByRule = make(map[uint64][]*Alert)
@@ -77,7 +94,7 @@ func (s *FileStore) GetAlerts(key uint64) (map[uint64]*Alert, error) {
 }
 
 // SetAlerts updates the stateByRule map and writes state to file storage.
-func (s *FileStore) SetAlerts(key uint64, alerts []*Alert) error {
+func (s *FileStore) SetAlerts(key uint64, groupKey string, alerts []*Alert) error {
 	s.stateMtx.Lock()
 	defer s.stateMtx.Unlock()
@@ -88,6 +105,7 @@ func (s *FileStore) SetAlerts(key uint64, alerts []*Alert) error {
 	// flush in memory state to file storage
 	file, err := os.Create(s.path)
 	if err != nil {
+		s.alertStoreErrors.WithLabelValues(groupKey).Inc()
 		return err
 	}
 	defer file.Close()
@@ -95,6 +113,7 @@ func (s *FileStore) SetAlerts(key uint64, alerts []*Alert) error {
 	encoder := json.NewEncoder(file)
 	err = encoder.Encode(s.alertsByRule)
 	if err != nil {
+		s.alertStoreErrors.WithLabelValues(groupKey).Inc()
 		return err
 	}
 	return nil
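
The two new counters are created in NewFileStore and registered in initState whenever a non-nil Registerer is passed, so even a failed read of the state file at startup is counted. A sketch of exposing them on a standalone registry follows; the path and port are illustrative, and the final metric names assume the rules package's namespace constant is "prometheus", which would yield prometheus_alert_store_init_errors_total and prometheus_rule_group_alert_store_errors_total{rule_group="..."}.

package main

import (
	"net/http"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promhttp"
	"github.com/prometheus/common/promslog"

	"github.com/prometheus/prometheus/rules"
)

func main() {
	reg := prometheus.NewRegistry()

	// initState registers both counters on reg before reading state.
	store := rules.NewFileStore(promslog.NewNopLogger(), "data/alertstore.json", reg)
	_ = store

	// Serve the registry so the alert-store error counters are scrapable.
	http.Handle("/metrics", promhttp.HandlerFor(reg, promhttp.HandlerOpts{}))
	_ = http.ListenAndServe(":9099", nil)
}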


@@ -5,6 +5,7 @@ import (
 	"testing"
 	"time"
 
+	"github.com/prometheus/client_golang/prometheus"
 	"github.com/stretchr/testify/require"
 
 	"github.com/prometheus/common/promslog"
@@ -12,7 +13,7 @@ import (
 )
 
 func TestAlertStore(t *testing.T) {
-	alertStore := NewFileStore(promslog.NewNopLogger(), "alertstoretest")
+	alertStore := NewFileStore(promslog.NewNopLogger(), "alertstoretest", prometheus.NewRegistry())
 	t.Cleanup(func() {
 		os.Remove("alertstoretest")
 	})
@@ -29,7 +30,7 @@ func TestAlertStore(t *testing.T) {
 	for key, alerts := range alertsByRule {
 		sortAlerts(alerts)
-		err := alertStore.SetAlerts(key, alerts)
+		err := alertStore.SetAlerts(key, "test/test1", alerts)
 		require.NoError(t, err)
 		got, err := alertStore.GetAlerts(key)