Remove storage hook and add metrics for errors

Signed-off-by: Mustafain Ali Khan <mustalik@amazon.com>
Mustafain Ali Khan 2024-07-30 10:18:03 -07:00
parent c53f2ee849
commit 60c2a5b448
6 changed files with 88 additions and 76 deletions
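
Editor's summary of the wiring after this change (a sketch, not part of the diff): NewFileStore now takes a prometheus.Registerer so it can register its error counters, and ManagerOptions only needs AlertStore; the AlertStoreFunc hook is removed because the group persists 'keep_firing_for' state itself through StoreKeepFiringForState. The helper name, logger construction, and import paths below are assumptions; everything else comes from the hunks that follow.

package main

import (
	"log/slog"
	"os"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/prometheus/rules"
)

// newRuleManagerOptions condenses the change: pass a Registerer to the file
// store and hand the store to ManagerOptions; there is no AlertStoreFunc anymore.
func newRuleManagerOptions(storagePath string) *rules.ManagerOptions {
	logger := slog.New(slog.NewTextHandler(os.Stderr, nil))
	alertStore := rules.NewFileStore(
		logger.With("component", "alertStore"),
		storagePath,
		prometheus.DefaultRegisterer,
	)
	return &rules.ManagerOptions{
		// ...all other ManagerOptions fields stay as in main() below...
		AlertStore: alertStore,
	}
}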

View file

@@ -809,7 +809,7 @@ func main() {
queryEngine = promql.NewEngine(opts)
var alertStore rules.AlertStore
if cfg.enableAlertStatePersistence {
alertStore = rules.NewFileStore(logger.With("component", "alertStore"), cfg.alertStoragePath)
alertStore = rules.NewFileStore(logger.With("component", "alertStore"), cfg.alertStoragePath, prometheus.DefaultRegisterer)
}
ruleManager = rules.NewManager(&rules.ManagerOptions{
@@ -829,8 +829,7 @@ func main() {
DefaultRuleQueryOffset: func() time.Duration {
return time.Duration(cfgFile.GlobalConfig.RuleQueryOffset)
},
AlertStore: alertStore,
AlertStoreFunc: rules.DefaultAlertStoreFunc,
AlertStore: alertStore,
})
}

View file

@@ -77,7 +77,6 @@ type Group struct {
// concurrencyController controls the rules evaluation concurrency.
concurrencyController RuleConcurrencyController
appOpts *storage.AppendOptions
alertStoreFunc AlertStateStoreFunc
alertStore AlertStore
}
@@ -98,7 +97,6 @@ type GroupOptions struct {
QueryOffset *time.Duration
done chan struct{}
EvalIterationFunc GroupEvalIterationFunc
AlertStoreFunc AlertStateStoreFunc
AlertStore AlertStore
}
@@ -130,11 +128,6 @@ func NewGroup(o GroupOptions) *Group {
evalIterationFunc = DefaultEvalIterationFunc
}
alertStoreFunc := o.AlertStoreFunc
if alertStoreFunc == nil {
alertStoreFunc = DefaultAlertStoreFunc
}
concurrencyController := opts.RuleConcurrencyController
if concurrencyController == nil {
concurrencyController = sequentialRuleEvalController{}
@@ -160,7 +153,6 @@ func NewGroup(o GroupOptions) *Group {
logger: opts.Logger.With("file", o.File, "group", o.Name),
metrics: metrics,
evalIterationFunc: evalIterationFunc,
alertStoreFunc: alertStoreFunc,
alertStore: o.AlertStore,
concurrencyController: concurrencyController,
appOpts: &storage.AppendOptions{DiscardOutOfOrder: true},
@@ -554,7 +546,7 @@ func (g *Group) Eval(ctx context.Context, ts time.Time) {
restoredAlerts, _ := g.alertStore.GetAlerts(ar.GetFingerprint(GroupKey(g.File(), g.Name())))
if len(restoredAlerts) > 0 {
ar.SetActiveAlerts(restoredAlerts)
logger.Info("Restored alerts from store", "rule", ar.name, "alerts", len(restoredAlerts))
g.logger.Info("Restored alerts from store", "rule", ar.name, "alerts", len(restoredAlerts))
}
}
}
@@ -1157,3 +1149,35 @@ func buildDependencyMap(rules []Rule) dependencyMap {
return dependencies
}
// AlertStore provides persistent storage of alert state.
type AlertStore interface {
// SetAlerts stores the provided list of alerts for a rule.
SetAlerts(key uint64, groupKey string, alerts []*Alert) error
// GetAlerts returns the alerts stored for an alerting rule; the
// rule is identified by a fingerprint of its config.
GetAlerts(key uint64) (map[uint64]*Alert, error)
}
// StoreKeepFiringForState is periodically invoked to store the state of alerting rules using 'keep_firing_for'.
func (g *Group) StoreKeepFiringForState() {
for _, rule := range g.rules {
ar, ok := rule.(*AlertingRule)
if !ok {
continue
}
if ar.KeepFiringFor() != 0 {
alertsToStore := make([]*Alert, 0)
ar.ForEachActiveAlert(func(alert *Alert) {
if !alert.KeepFiringSince.IsZero() {
alertsToStore = append(alertsToStore, alert)
}
})
groupKey := GroupKey(g.File(), g.Name())
err := g.alertStore.SetAlerts(ar.GetFingerprint(groupKey), groupKey, alertsToStore)
if err != nil {
g.logger.Error("Failed to store alerting rule state", "rule", ar.Name(), "err", err)
}
}
}
}
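
Any value with these two methods can be plugged in through ManagerOptions.AlertStore. The skeleton below is an editor's illustration of the interface shape only; it is not in the commit, it assumes it lives in the rules package (it uses *Alert directly), and its GetAlerts returns an empty map, so nothing would ever be restored. A real implementation must return alerts keyed the way SetActiveAlerts expects, which this diff does not show.

// noopAlertStore is a hypothetical AlertStore that discards all state.
type noopAlertStore struct{}

// SetAlerts would persist `alerts` under the rule fingerprint `key`; here it drops them.
func (noopAlertStore) SetAlerts(key uint64, groupKey string, alerts []*Alert) error {
	return nil
}

// GetAlerts would return previously stored alerts for the rule; here it returns none.
func (noopAlertStore) GetAlerts(key uint64) (map[uint64]*Alert, error) {
	return map[uint64]*Alert{}, nil
}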

View file

@@ -89,33 +89,7 @@ func DefaultEvalIterationFunc(ctx context.Context, g *Group, evalTimestamp time.
if g.alertStore != nil {
// feature enabled.
go func() {
g.alertStoreFunc(g)
}()
}
}
// DefaultAlertStoreFunc is the default implementation of
// AlertStateStoreFunc that is periodically invoked to store the state
// of alerting rules in a group at a given point in time.
func DefaultAlertStoreFunc(g *Group) {
for _, rule := range g.rules {
ar, ok := rule.(*AlertingRule)
if !ok {
continue
}
if ar.KeepFiringFor() != 0 {
alertsToStore := make([]*Alert, 0)
ar.ForEachActiveAlert(func(alert *Alert) {
if !alert.KeepFiringSince.IsZero() {
alertsToStore = append(alertsToStore, alert)
}
})
err := g.alertStore.SetAlerts(ar.GetFingerprint(GroupKey(g.File(), g.Name())), alertsToStore)
if err != nil {
g.logger.Error("Failed to store alerting rule state", "rule", ar.Name(), "err", err)
}
}
g.StoreKeepFiringForState()
}
}
@@ -153,9 +127,9 @@ type ManagerOptions struct {
ConcurrentEvalsEnabled bool
RuleConcurrencyController RuleConcurrencyController
RuleDependencyController RuleDependencyController
Metrics *Metrics
AlertStore AlertStore
AlertStoreFunc AlertStateStoreFunc
Metrics *Metrics
}
// NewManager returns an implementation of Manager, ready to be started
@@ -225,8 +199,6 @@ func (m *Manager) Stop() {
m.logger.Info("Rule manager stopped")
}
type AlertStateStoreFunc func(g *Group)
// Update the rule manager's state as the config requires. If
// loading the new rules failed the old rule set is restored.
// This method will no-op in case the manager is already stopped.
@@ -388,7 +360,6 @@ func (m *Manager) LoadGroups(
QueryOffset: (*time.Duration)(rg.QueryOffset),
done: m.done,
EvalIterationFunc: groupEvalIterationFunc,
AlertStoreFunc: m.opts.AlertStoreFunc,
AlertStore: m.opts.AlertStore,
})
}

View file

@@ -2283,8 +2283,8 @@ func TestKeepFiringForStateRestore(t *testing.T) {
},
)
alertStore := NewFileStore(promslog.NewNopLogger(), testStoreFile)
ng := testEngine(t)
alertStore := NewFileStore(promslog.NewNopLogger(), testStoreFile, prometheus.NewRegistry())
opts := &ManagerOptions{
QueryFunc: EngineQueryFunc(ng, testStorage),
Appendable: testStorage,
@@ -2323,13 +2323,12 @@ func TestKeepFiringForStateRestore(t *testing.T) {
)
group := NewGroup(GroupOptions{
Name: "default",
Interval: time.Second,
Rules: []Rule{rule, rule2},
ShouldRestore: true,
Opts: opts,
AlertStoreFunc: DefaultAlertStoreFunc,
AlertStore: alertStore,
Name: "default",
Interval: time.Second,
Rules: []Rule{rule, rule2},
ShouldRestore: true,
Opts: opts,
AlertStore: alertStore,
})
groups := make(map[string]*Group)
@@ -2372,7 +2371,7 @@ func TestKeepFiringForStateRestore(t *testing.T) {
group.Eval(opts.Context, evalTime)
group.setLastEvalTimestamp(evalTime)
// Manager will store alert state.
DefaultAlertStoreFunc(group)
group.StoreKeepFiringForState()
}
exp := rule.ActiveAlerts()
@@ -2398,16 +2397,15 @@ func TestKeepFiringForStateRestore(t *testing.T) {
labels.FromStrings("annotation_test", "rule2"), labels.EmptyLabels(), "", true, nil,
)
// Restart alert store.
newAlertStore := NewFileStore(promslog.NewNopLogger(), testStoreFile)
newAlertStore := NewFileStore(promslog.NewNopLogger(), testStoreFile, prometheus.NewRegistry())
newGroup := NewGroup(GroupOptions{
Name: "default",
Interval: time.Second,
Rules: []Rule{newRule, newRule2},
ShouldRestore: true,
Opts: opts,
AlertStore: newAlertStore,
AlertStoreFunc: DefaultAlertStoreFunc,
Name: "default",
Interval: time.Second,
Rules: []Rule{newRule, newRule2},
ShouldRestore: true,
Opts: opts,
AlertStore: newAlertStore,
})
newGroups := make(map[string]*Group)

View file

@@ -5,41 +5,57 @@ import (
"log/slog"
"os"
"sync"
)
// AlertStore provides persistent storage of alert state.
type AlertStore interface {
// SetAlerts stores the provided list of alerts for a rule.
SetAlerts(key uint64, alerts []*Alert) error
// GetAlerts returns a list of alerts for each alerting rule,
// alerting rule is identified by a fingerprint of its config.
GetAlerts(key uint64) (map[uint64]*Alert, error)
}
"github.com/prometheus/client_golang/prometheus"
)
// FileStore implements the AlertStore interface.
type FileStore struct {
logger *slog.Logger
alertsByRule map[uint64][]*Alert
// protects the `alertsByRule` map.
stateMtx sync.RWMutex
path string
stateMtx sync.RWMutex
path string
registerer prometheus.Registerer
storeInitErrors prometheus.Counter
alertStoreErrors *prometheus.CounterVec
}
func NewFileStore(l *slog.Logger, storagePath string) *FileStore {
func NewFileStore(l *slog.Logger, storagePath string, registerer prometheus.Registerer) *FileStore {
s := &FileStore{
logger: l,
alertsByRule: make(map[uint64][]*Alert),
path: storagePath,
registerer: registerer,
}
s.storeInitErrors = prometheus.NewCounter(
prometheus.CounterOpts{
Namespace: namespace,
Name: "alert_store_init_errors_total",
Help: "The total number of errors starting alert store.",
},
)
s.alertStoreErrors = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: namespace,
Name: "rule_group_alert_store_errors_total",
Help: "The total number of errors in alert store.",
},
[]string{"rule_group"},
)
s.initState()
return s
}
// initState reads the state from file storage into the alertsByRule map.
func (s *FileStore) initState() {
if s.registerer != nil {
s.registerer.MustRegister(s.alertStoreErrors, s.storeInitErrors)
}
file, err := os.OpenFile(s.path, os.O_RDWR|os.O_CREATE, 0o666)
if err != nil {
s.logger.Error("Failed reading alerts state from file", "err", err)
s.storeInitErrors.Inc()
return
}
defer file.Close()
@@ -48,6 +64,7 @@ func (s *FileStore) initState() {
err = json.NewDecoder(file).Decode(&alertsByRule)
if err != nil {
s.logger.Error("Failed reading alerts state from file", "err", err)
s.storeInitErrors.Inc()
}
if alertsByRule == nil {
alertsByRule = make(map[uint64][]*Alert)
@@ -77,7 +94,7 @@ func (s *FileStore) GetAlerts(key uint64) (map[uint64]*Alert, error) {
}
// SetAlerts updates the alertsByRule map and writes state to file storage.
func (s *FileStore) SetAlerts(key uint64, alerts []*Alert) error {
func (s *FileStore) SetAlerts(key uint64, groupKey string, alerts []*Alert) error {
s.stateMtx.Lock()
defer s.stateMtx.Unlock()
@@ -88,6 +105,7 @@ func (s *FileStore) SetAlerts(key uint64, alerts []*Alert) error {
// flush in memory state to file storage
file, err := os.Create(s.path)
if err != nil {
s.alertStoreErrors.WithLabelValues(groupKey).Inc()
return err
}
defer file.Close()
@@ -95,6 +113,7 @@ func (s *FileStore) SetAlerts(key uint64, alerts []*Alert) error {
encoder := json.NewEncoder(file)
err = encoder.Encode(s.alertsByRule)
if err != nil {
s.alertStoreErrors.WithLabelValues(groupKey).Inc()
return err
}
return nil
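
Both new counters are registered on the Registerer passed to NewFileStore. A package-internal test along the following lines could observe the per-group counter. This is an editor's sketch, not part of the commit: it assumes the test sits in the rules package (so the unexported alertStoreErrors field is reachable) and that a failed flush to an unwritable path is the simplest way to trigger the error path.

package rules

import (
	"testing"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/testutil"
	"github.com/prometheus/common/promslog"
	"github.com/stretchr/testify/require"
)

// Hypothetical sketch: a failed flush (unwritable path) should increment the
// per-group error counter added in this commit.
func TestAlertStoreErrorCounterSketch(t *testing.T) {
	store := NewFileStore(promslog.NewNopLogger(), "/no/such/dir/alertstore", prometheus.NewRegistry())
	err := store.SetAlerts(1, "test/group", []*Alert{{}})
	require.Error(t, err)
	require.Equal(t, 1.0, testutil.ToFloat64(store.alertStoreErrors.WithLabelValues("test/group")))
}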

View file

@@ -5,6 +5,7 @@ import (
"testing"
"time"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/require"
"github.com/prometheus/common/promslog"
@@ -12,7 +13,7 @@ import (
)
func TestAlertStore(t *testing.T) {
alertStore := NewFileStore(promslog.NewNopLogger(), "alertstoretest")
alertStore := NewFileStore(promslog.NewNopLogger(), "alertstoretest", prometheus.NewRegistry())
t.Cleanup(func() {
os.Remove("alertstoretest")
})
@@ -29,7 +30,7 @@ func TestAlertStore(t *testing.T) {
for key, alerts := range alertsByRule {
sortAlerts(alerts)
err := alertStore.SetAlerts(key, alerts)
err := alertStore.SetAlerts(key, "test/test1", alerts)
require.NoError(t, err)
got, err := alertStore.GetAlerts(key)