mirror of
https://github.com/prometheus/prometheus.git
synced 2025-01-11 05:47:27 -08:00
Add SyncForState Implementation for Ruler HA (#10070)
* continuously syncing activeAt for alerts Signed-off-by: Yijie Qin <qinyijie@amazon.com> Signed-off-by: Wilbert Guo <wilbeguo@amazon.com> * add import Signed-off-by: Yijie Qin <qinyijie@amazon.com> Signed-off-by: Wilbert Guo <wilbeguo@amazon.com> * Refactor SyncForState and add unit tests Signed-off-by: Wilbert Guo <wilbeguo@amazon.com> * Format code Signed-off-by: Wilbert Guo <wilbeguo@amazon.com> * Add hook for syncForState Signed-off-by: Wilbert Guo <wilbeguo@amazon.com> Fix go lint Signed-off-by: Wilbert Guo <wilbeguo@amazon.com> Refactor syncForState override implementation Signed-off-by: Wilbert Guo <wilbeguo@amazon.com> Add syncForState override func as argument to Update() Signed-off-by: Wilbert Guo <wilbeguo@amazon.com> Fix go formatting Signed-off-by: Wilbert Guo <wilbeguo@amazon.com> Fix circleci test errors Signed-off-by: Wilbert Guo <wilbeguo@amazon.com> Remove overrideFunc as argument to run() Signed-off-by: Wilbert Guo <wilbeguo@amazon.com> * remove the syncForState Signed-off-by: Yijie Qin <qinyijie@amazon.com> * use the override function to decide if need to replace the activeAt or not Signed-off-by: Yijie Qin <qinyijie@amazon.com> * fix test case Signed-off-by: Yijie Qin <qinyijie@amazon.com> * fix format Signed-off-by: Yijie Qin <qinyijie@amazon.com> * Trigger build Signed-off-by: Yijie Qin <qinyijie@amazon.com> * fixing comments Signed-off-by: Yijie Qin <qinyijie@amazon.com> * return the result of map of alerts instead of single one Signed-off-by: Yijie Qin <qinyijie@amazon.com> * upper case the QueryforStateSeries Signed-off-by: Yijie Qin <qinyijie@amazon.com> * use a more generic rule group post process function type Signed-off-by: Yijie Qin <qinyijie@amazon.com> * fix indentation Signed-off-by: Yijie Qin <qinyijie@amazon.com> * fix gofmt Signed-off-by: Yijie Qin <qinyijie@amazon.com> * fix lint Signed-off-by: Yijie Qin <qinyijie@amazon.com> * fixing naming Signed-off-by: Yijie Qin <qinyijie@amazon.com> * fix comments Signed-off-by: Yijie Qin <qinyijie@amazon.com> * add the lastEvalTimestamp as parameter Signed-off-by: Yijie Qin <qinyijie@amazon.com> * fmt Signed-off-by: Yijie Qin <qinyijie@amazon.com> * change funcType to func Signed-off-by: Yijie Qin <qinyijie@amazon.com> Co-authored-by: Yijie Qin <qinyijie@amazon.com> Co-authored-by: Yijie Qin <63399121+qinxx108@users.noreply.github.com>
This commit is contained in:
parent
b13aec9167
commit
83a2e52bc2
|
@ -724,6 +724,7 @@ func main() {
|
|||
files,
|
||||
cfg.GlobalConfig.ExternalLabels,
|
||||
externalURL,
|
||||
nil,
|
||||
)
|
||||
},
|
||||
}, {
|
||||
|
|
|
@ -70,7 +70,7 @@ func newRuleImporter(logger log.Logger, config ruleImporterConfig, apiClient que
|
|||
|
||||
// loadGroups parses groups from a list of recording rule files.
|
||||
func (importer *ruleImporter) loadGroups(ctx context.Context, filenames []string) (errs []error) {
|
||||
groups, errs := importer.ruleManager.LoadGroups(importer.config.evalInterval, labels.Labels{}, "", filenames...)
|
||||
groups, errs := importer.ruleManager.LoadGroups(importer.config.evalInterval, labels.Labels{}, "", nil, filenames...)
|
||||
if errs != nil {
|
||||
return errs
|
||||
}
|
||||
|
|
|
@ -170,7 +170,7 @@ func (tg *testGroup) test(evalInterval time.Duration, groupOrderMap map[string]i
|
|||
Logger: log.NewNopLogger(),
|
||||
}
|
||||
m := rules.NewManager(opts)
|
||||
groupsMap, ers := m.LoadGroups(time.Duration(tg.Interval), tg.ExternalLabels, tg.ExternalURL, ruleFiles...)
|
||||
groupsMap, ers := m.LoadGroups(time.Duration(tg.Interval), tg.ExternalLabels, tg.ExternalURL, nil, ruleFiles...)
|
||||
if ers != nil {
|
||||
return ers
|
||||
}
|
||||
|
|
|
@ -33,6 +33,7 @@ import (
|
|||
"github.com/prometheus/prometheus/model/timestamp"
|
||||
"github.com/prometheus/prometheus/promql"
|
||||
"github.com/prometheus/prometheus/promql/parser"
|
||||
"github.com/prometheus/prometheus/storage"
|
||||
"github.com/prometheus/prometheus/template"
|
||||
"github.com/prometheus/prometheus/util/strutil"
|
||||
)
|
||||
|
@ -258,6 +259,33 @@ func (r *AlertingRule) forStateSample(alert *Alert, ts time.Time, v float64) pro
|
|||
return s
|
||||
}
|
||||
|
||||
// QueryforStateSeries returns the series for ALERTS_FOR_STATE.
|
||||
func (r *AlertingRule) QueryforStateSeries(alert *Alert, q storage.Querier) (storage.Series, error) {
|
||||
smpl := r.forStateSample(alert, time.Now(), 0)
|
||||
var matchers []*labels.Matcher
|
||||
for _, l := range smpl.Metric {
|
||||
mt, err := labels.NewMatcher(labels.MatchEqual, l.Name, l.Value)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
matchers = append(matchers, mt)
|
||||
}
|
||||
sset := q.Select(false, nil, matchers...)
|
||||
|
||||
var s storage.Series
|
||||
for sset.Next() {
|
||||
// Query assures that smpl.Metric is included in sset.At().Labels(),
|
||||
// hence just checking the length would act like equality.
|
||||
// (This is faster than calling labels.Compare again as we already have some info).
|
||||
if len(sset.At().Labels()) == len(matchers) {
|
||||
s = sset.At()
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
return s, sset.Err()
|
||||
}
|
||||
|
||||
// SetEvaluationDuration updates evaluationDuration to the duration it took to evaluate the rule on its last evaluation.
|
||||
func (r *AlertingRule) SetEvaluationDuration(dur time.Duration) {
|
||||
r.mtx.Lock()
|
||||
|
|
|
@ -20,12 +20,14 @@ import (
|
|||
"time"
|
||||
|
||||
"github.com/go-kit/log"
|
||||
"github.com/pkg/errors"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/prometheus/prometheus/model/labels"
|
||||
"github.com/prometheus/prometheus/model/timestamp"
|
||||
"github.com/prometheus/prometheus/promql"
|
||||
"github.com/prometheus/prometheus/promql/parser"
|
||||
"github.com/prometheus/prometheus/storage"
|
||||
"github.com/prometheus/prometheus/util/teststorage"
|
||||
)
|
||||
|
||||
|
@ -518,3 +520,83 @@ func TestAlertingRuleLimit(t *testing.T) {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestQueryForStateSeries(t *testing.T) {
|
||||
testError := errors.New("test error")
|
||||
|
||||
type testInput struct {
|
||||
selectMockFunction func(sortSeries bool, hints *storage.SelectHints, matchers ...*labels.Matcher) storage.SeriesSet
|
||||
expectedSeries storage.Series
|
||||
expectedError error
|
||||
}
|
||||
|
||||
tests := []testInput{
|
||||
// Test for empty series.
|
||||
{
|
||||
selectMockFunction: func(sortSeries bool, hints *storage.SelectHints, matchers ...*labels.Matcher) storage.SeriesSet {
|
||||
return storage.EmptySeriesSet()
|
||||
},
|
||||
expectedSeries: nil,
|
||||
expectedError: nil,
|
||||
},
|
||||
// Test for error series.
|
||||
{
|
||||
selectMockFunction: func(sortSeries bool, hints *storage.SelectHints, matchers ...*labels.Matcher) storage.SeriesSet {
|
||||
return storage.ErrSeriesSet(testError)
|
||||
},
|
||||
expectedSeries: nil,
|
||||
expectedError: testError,
|
||||
},
|
||||
// Test for mock series.
|
||||
{
|
||||
selectMockFunction: func(sortSeries bool, hints *storage.SelectHints, matchers ...*labels.Matcher) storage.SeriesSet {
|
||||
return storage.TestSeriesSet(storage.MockSeries(
|
||||
[]int64{1, 2, 3},
|
||||
[]float64{1, 2, 3},
|
||||
[]string{"__name__", "ALERTS_FOR_STATE", "alertname", "TestRule", "severity", "critical"},
|
||||
))
|
||||
},
|
||||
expectedSeries: storage.MockSeries(
|
||||
[]int64{1, 2, 3},
|
||||
[]float64{1, 2, 3},
|
||||
[]string{"__name__", "ALERTS_FOR_STATE", "alertname", "TestRule", "severity", "critical"},
|
||||
),
|
||||
expectedError: nil,
|
||||
},
|
||||
}
|
||||
|
||||
testFunc := func(tst testInput) {
|
||||
querier := &storage.MockQuerier{
|
||||
SelectMockFunction: tst.selectMockFunction,
|
||||
}
|
||||
|
||||
rule := NewAlertingRule(
|
||||
"TestRule",
|
||||
nil,
|
||||
time.Minute,
|
||||
labels.FromStrings("severity", "critical"),
|
||||
nil, nil, "", true, nil,
|
||||
)
|
||||
|
||||
alert := &Alert{
|
||||
State: 0,
|
||||
Labels: nil,
|
||||
Annotations: nil,
|
||||
Value: 0,
|
||||
ActiveAt: time.Time{},
|
||||
FiredAt: time.Time{},
|
||||
ResolvedAt: time.Time{},
|
||||
LastSentAt: time.Time{},
|
||||
ValidUntil: time.Time{},
|
||||
}
|
||||
|
||||
series, err := rule.QueryforStateSeries(alert, querier)
|
||||
|
||||
require.Equal(t, tst.expectedSeries, series)
|
||||
require.Equal(t, tst.expectedError, err)
|
||||
}
|
||||
|
||||
for _, tst := range tests {
|
||||
testFunc(tst)
|
||||
}
|
||||
}
|
||||
|
|
118
rules/manager.go
118
rules/manager.go
|
@ -264,16 +264,23 @@ type Group struct {
|
|||
logger log.Logger
|
||||
|
||||
metrics *Metrics
|
||||
|
||||
ruleGroupPostProcessFunc RuleGroupPostProcessFunc
|
||||
}
|
||||
|
||||
// This function will be used before each rule group evaluation if not nil.
|
||||
// Use this function type if the rule group post processing is needed.
|
||||
type RuleGroupPostProcessFunc func(g *Group, lastEvalTimestamp time.Time, log log.Logger) error
|
||||
|
||||
type GroupOptions struct {
|
||||
Name, File string
|
||||
Interval time.Duration
|
||||
Limit int
|
||||
Rules []Rule
|
||||
ShouldRestore bool
|
||||
Opts *ManagerOptions
|
||||
done chan struct{}
|
||||
Name, File string
|
||||
Interval time.Duration
|
||||
Limit int
|
||||
Rules []Rule
|
||||
ShouldRestore bool
|
||||
Opts *ManagerOptions
|
||||
done chan struct{}
|
||||
RuleGroupPostProcessFunc RuleGroupPostProcessFunc
|
||||
}
|
||||
|
||||
// NewGroup makes a new Group with the given name, options, and rules.
|
||||
|
@ -295,19 +302,20 @@ func NewGroup(o GroupOptions) *Group {
|
|||
metrics.GroupInterval.WithLabelValues(key).Set(o.Interval.Seconds())
|
||||
|
||||
return &Group{
|
||||
name: o.Name,
|
||||
file: o.File,
|
||||
interval: o.Interval,
|
||||
limit: o.Limit,
|
||||
rules: o.Rules,
|
||||
shouldRestore: o.ShouldRestore,
|
||||
opts: o.Opts,
|
||||
seriesInPreviousEval: make([]map[string]labels.Labels, len(o.Rules)),
|
||||
done: make(chan struct{}),
|
||||
managerDone: o.done,
|
||||
terminated: make(chan struct{}),
|
||||
logger: log.With(o.Opts.Logger, "file", o.File, "group", o.Name),
|
||||
metrics: metrics,
|
||||
name: o.Name,
|
||||
file: o.File,
|
||||
interval: o.Interval,
|
||||
limit: o.Limit,
|
||||
rules: o.Rules,
|
||||
shouldRestore: o.ShouldRestore,
|
||||
opts: o.Opts,
|
||||
seriesInPreviousEval: make([]map[string]labels.Labels, len(o.Rules)),
|
||||
done: make(chan struct{}),
|
||||
managerDone: o.done,
|
||||
terminated: make(chan struct{}),
|
||||
logger: log.With(o.Opts.Logger, "file", o.File, "group", o.Name),
|
||||
metrics: metrics,
|
||||
ruleGroupPostProcessFunc: o.RuleGroupPostProcessFunc,
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -320,6 +328,12 @@ func (g *Group) File() string { return g.file }
|
|||
// Rules returns the group's rules.
|
||||
func (g *Group) Rules() []Rule { return g.rules }
|
||||
|
||||
// Queryable returns the group's querable.
|
||||
func (g *Group) Queryable() storage.Queryable { return g.opts.Queryable }
|
||||
|
||||
// Context returns the group's context.
|
||||
func (g *Group) Context() context.Context { return g.opts.Context }
|
||||
|
||||
// Interval returns the group's interval.
|
||||
func (g *Group) Interval() time.Duration { return g.interval }
|
||||
|
||||
|
@ -423,12 +437,24 @@ func (g *Group) run(ctx context.Context) {
|
|||
g.metrics.IterationsScheduled.WithLabelValues(GroupKey(g.file, g.name)).Add(float64(missed))
|
||||
}
|
||||
evalTimestamp = evalTimestamp.Add((missed + 1) * g.interval)
|
||||
|
||||
useRuleGroupPostProcessFunc(g, evalTimestamp.Add(-(missed+1)*g.interval))
|
||||
|
||||
iter()
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func useRuleGroupPostProcessFunc(g *Group, lastEvalTimestamp time.Time) {
|
||||
if g.ruleGroupPostProcessFunc != nil {
|
||||
err := g.ruleGroupPostProcessFunc(g, lastEvalTimestamp, g.logger)
|
||||
if err != nil {
|
||||
level.Warn(g.logger).Log("msg", "ruleGroupPostProcessFunc failed", "err", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (g *Group) stop() {
|
||||
close(g.done)
|
||||
<-g.terminated
|
||||
|
@ -744,32 +770,10 @@ func (g *Group) RestoreForState(ts time.Time) {
|
|||
}
|
||||
|
||||
alertRule.ForEachActiveAlert(func(a *Alert) {
|
||||
smpl := alertRule.forStateSample(a, time.Now(), 0)
|
||||
var matchers []*labels.Matcher
|
||||
for _, l := range smpl.Metric {
|
||||
mt, err := labels.NewMatcher(labels.MatchEqual, l.Name, l.Value)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
matchers = append(matchers, mt)
|
||||
}
|
||||
|
||||
sset := q.Select(false, nil, matchers...)
|
||||
|
||||
seriesFound := false
|
||||
var s storage.Series
|
||||
for sset.Next() {
|
||||
// Query assures that smpl.Metric is included in sset.At().Labels(),
|
||||
// hence just checking the length would act like equality.
|
||||
// (This is faster than calling labels.Compare again as we already have some info).
|
||||
if len(sset.At().Labels()) == len(smpl.Metric) {
|
||||
s = sset.At()
|
||||
seriesFound = true
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if err := sset.Err(); err != nil {
|
||||
s, err := alertRule.QueryforStateSeries(a, q)
|
||||
if err != nil {
|
||||
// Querier Warnings are ignored. We do not care unless we have an error.
|
||||
level.Error(g.logger).Log(
|
||||
"msg", "Failed to restore 'for' state",
|
||||
|
@ -780,7 +784,7 @@ func (g *Group) RestoreForState(ts time.Time) {
|
|||
return
|
||||
}
|
||||
|
||||
if !seriesFound {
|
||||
if s == nil {
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -958,11 +962,12 @@ func (m *Manager) Stop() {
|
|||
|
||||
// Update the rule manager's state as the config requires. If
|
||||
// loading the new rules failed the old rule set is restored.
|
||||
func (m *Manager) Update(interval time.Duration, files []string, externalLabels labels.Labels, externalURL string) error {
|
||||
func (m *Manager) Update(interval time.Duration, files []string, externalLabels labels.Labels, externalURL string, ruleGroupPostProcessFunc RuleGroupPostProcessFunc) error {
|
||||
m.mtx.Lock()
|
||||
defer m.mtx.Unlock()
|
||||
|
||||
groups, errs := m.LoadGroups(interval, externalLabels, externalURL, files...)
|
||||
groups, errs := m.LoadGroups(interval, externalLabels, externalURL, ruleGroupPostProcessFunc, files...)
|
||||
|
||||
if errs != nil {
|
||||
for _, e := range errs {
|
||||
level.Error(m.logger).Log("msg", "loading groups failed", "err", e)
|
||||
|
@ -1046,7 +1051,7 @@ func (FileLoader) Parse(query string) (parser.Expr, error) { return parser.Parse
|
|||
|
||||
// LoadGroups reads groups from a list of files.
|
||||
func (m *Manager) LoadGroups(
|
||||
interval time.Duration, externalLabels labels.Labels, externalURL string, filenames ...string,
|
||||
interval time.Duration, externalLabels labels.Labels, externalURL string, ruleGroupPostProcessFunc RuleGroupPostProcessFunc, filenames ...string,
|
||||
) (map[string]*Group, []error) {
|
||||
groups := make(map[string]*Group)
|
||||
|
||||
|
@ -1093,14 +1098,15 @@ func (m *Manager) LoadGroups(
|
|||
}
|
||||
|
||||
groups[GroupKey(fn, rg.Name)] = NewGroup(GroupOptions{
|
||||
Name: rg.Name,
|
||||
File: fn,
|
||||
Interval: itv,
|
||||
Limit: rg.Limit,
|
||||
Rules: rules,
|
||||
ShouldRestore: shouldRestore,
|
||||
Opts: m.opts,
|
||||
done: m.done,
|
||||
Name: rg.Name,
|
||||
File: fn,
|
||||
Interval: itv,
|
||||
Limit: rg.Limit,
|
||||
Rules: rules,
|
||||
ShouldRestore: shouldRestore,
|
||||
Opts: m.opts,
|
||||
done: m.done,
|
||||
RuleGroupPostProcessFunc: ruleGroupPostProcessFunc,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
|
|
@ -727,7 +727,7 @@ func TestUpdate(t *testing.T) {
|
|||
ruleManager.start()
|
||||
defer ruleManager.Stop()
|
||||
|
||||
err := ruleManager.Update(10*time.Second, files, nil, "")
|
||||
err := ruleManager.Update(10*time.Second, files, nil, "", nil)
|
||||
require.NoError(t, err)
|
||||
require.Greater(t, len(ruleManager.groups), 0, "expected non-empty rule groups")
|
||||
ogs := map[string]*Group{}
|
||||
|
@ -738,7 +738,7 @@ func TestUpdate(t *testing.T) {
|
|||
ogs[h] = g
|
||||
}
|
||||
|
||||
err = ruleManager.Update(10*time.Second, files, nil, "")
|
||||
err = ruleManager.Update(10*time.Second, files, nil, "", nil)
|
||||
require.NoError(t, err)
|
||||
for h, g := range ruleManager.groups {
|
||||
for _, actual := range g.seriesInPreviousEval {
|
||||
|
@ -757,7 +757,7 @@ func TestUpdate(t *testing.T) {
|
|||
defer os.Remove(tmpFile.Name())
|
||||
defer tmpFile.Close()
|
||||
|
||||
err = ruleManager.Update(10*time.Second, []string{tmpFile.Name()}, nil, "")
|
||||
err = ruleManager.Update(10*time.Second, []string{tmpFile.Name()}, nil, "", nil)
|
||||
require.NoError(t, err)
|
||||
|
||||
for h, g := range ruleManager.groups {
|
||||
|
@ -835,7 +835,7 @@ func reloadAndValidate(rgs *rulefmt.RuleGroups, t *testing.T, tmpFile *os.File,
|
|||
tmpFile.Seek(0, 0)
|
||||
_, err = tmpFile.Write(bs)
|
||||
require.NoError(t, err)
|
||||
err = ruleManager.Update(10*time.Second, []string{tmpFile.Name()}, nil, "")
|
||||
err = ruleManager.Update(10*time.Second, []string{tmpFile.Name()}, nil, "", nil)
|
||||
require.NoError(t, err)
|
||||
for h, g := range ruleManager.groups {
|
||||
if ogs[h] == g {
|
||||
|
@ -980,7 +980,7 @@ func TestMetricsUpdate(t *testing.T) {
|
|||
}
|
||||
|
||||
for i, c := range cases {
|
||||
err := ruleManager.Update(time.Second, c.files, nil, "")
|
||||
err := ruleManager.Update(time.Second, c.files, nil, "", nil)
|
||||
require.NoError(t, err)
|
||||
time.Sleep(2 * time.Second)
|
||||
require.Equal(t, c.metrics, countMetrics(), "test %d: invalid count of metrics", i)
|
||||
|
@ -1054,7 +1054,7 @@ func TestGroupStalenessOnRemoval(t *testing.T) {
|
|||
|
||||
var totalStaleNaN int
|
||||
for i, c := range cases {
|
||||
err := ruleManager.Update(time.Second, c.files, nil, "")
|
||||
err := ruleManager.Update(time.Second, c.files, nil, "", nil)
|
||||
require.NoError(t, err)
|
||||
time.Sleep(3 * time.Second)
|
||||
totalStaleNaN += c.staleNaN
|
||||
|
@ -1096,11 +1096,11 @@ func TestMetricsStalenessOnManagerShutdown(t *testing.T) {
|
|||
}
|
||||
}()
|
||||
|
||||
err := ruleManager.Update(2*time.Second, files, nil, "")
|
||||
err := ruleManager.Update(2*time.Second, files, nil, "", nil)
|
||||
time.Sleep(4 * time.Second)
|
||||
require.NoError(t, err)
|
||||
start := time.Now()
|
||||
err = ruleManager.Update(3*time.Second, files[:0], nil, "")
|
||||
err = ruleManager.Update(3*time.Second, files[:0], nil, "", nil)
|
||||
require.NoError(t, err)
|
||||
ruleManager.Stop()
|
||||
stopped = true
|
||||
|
@ -1229,3 +1229,100 @@ func TestRuleHealthUpdates(t *testing.T) {
|
|||
require.EqualError(t, rules.LastError(), storage.ErrOutOfOrderSample.Error())
|
||||
require.Equal(t, HealthBad, rules.Health())
|
||||
}
|
||||
|
||||
func TestUpdateMissedEvalMetrics(t *testing.T) {
|
||||
suite, err := promql.NewTest(t, `
|
||||
load 5m
|
||||
http_requests{instance="0"} 75 85 50 0 0 25 0 0 40 0 120
|
||||
`)
|
||||
|
||||
require.NoError(t, err)
|
||||
defer suite.Close()
|
||||
|
||||
err = suite.Run()
|
||||
require.NoError(t, err)
|
||||
|
||||
expr, err := parser.ParseExpr(`http_requests{group="canary", job="app-server"} < 100`)
|
||||
require.NoError(t, err)
|
||||
|
||||
testValue := 1
|
||||
|
||||
overrideFunc := func(g *Group, lastEvalTimestamp time.Time, log log.Logger) error {
|
||||
testValue = 2
|
||||
return nil
|
||||
}
|
||||
|
||||
type testInput struct {
|
||||
overrideFunc func(g *Group, lastEvalTimestamp time.Time, logger log.Logger) error
|
||||
expectedValue int
|
||||
}
|
||||
|
||||
tests := []testInput{
|
||||
// testValue should still have value of 1 since overrideFunc is nil.
|
||||
{
|
||||
overrideFunc: nil,
|
||||
expectedValue: 1,
|
||||
},
|
||||
// testValue should be incremented to 2 since overrideFunc is called.
|
||||
{
|
||||
overrideFunc: overrideFunc,
|
||||
expectedValue: 2,
|
||||
},
|
||||
}
|
||||
|
||||
testFunc := func(tst testInput) {
|
||||
opts := &ManagerOptions{
|
||||
QueryFunc: EngineQueryFunc(suite.QueryEngine(), suite.Storage()),
|
||||
Appendable: suite.Storage(),
|
||||
Queryable: suite.Storage(),
|
||||
Context: context.Background(),
|
||||
Logger: log.NewNopLogger(),
|
||||
NotifyFunc: func(ctx context.Context, expr string, alerts ...*Alert) {},
|
||||
OutageTolerance: 30 * time.Minute,
|
||||
ForGracePeriod: 10 * time.Minute,
|
||||
}
|
||||
|
||||
activeAlert := &Alert{
|
||||
State: StateFiring,
|
||||
ActiveAt: time.Now(),
|
||||
}
|
||||
|
||||
m := map[uint64]*Alert{}
|
||||
m[1] = activeAlert
|
||||
|
||||
rule := &AlertingRule{
|
||||
name: "HTTPRequestRateLow",
|
||||
vector: expr,
|
||||
holdDuration: 5 * time.Minute,
|
||||
labels: labels.FromStrings("severity", "critical"),
|
||||
annotations: nil,
|
||||
externalLabels: nil,
|
||||
externalURL: "",
|
||||
health: HealthUnknown,
|
||||
active: m,
|
||||
logger: nil,
|
||||
restored: true,
|
||||
}
|
||||
|
||||
group := NewGroup(GroupOptions{
|
||||
Name: "default",
|
||||
Interval: time.Second,
|
||||
Rules: []Rule{rule},
|
||||
ShouldRestore: true,
|
||||
Opts: opts,
|
||||
RuleGroupPostProcessFunc: tst.overrideFunc,
|
||||
})
|
||||
|
||||
go func() {
|
||||
group.run(opts.Context)
|
||||
}()
|
||||
|
||||
time.Sleep(3 * time.Second)
|
||||
group.stop()
|
||||
require.Equal(t, tst.expectedValue, testValue)
|
||||
}
|
||||
|
||||
for _, tst := range tests {
|
||||
testFunc(tst)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -82,6 +82,15 @@ type Queryable interface {
|
|||
Querier(ctx context.Context, mint, maxt int64) (Querier, error)
|
||||
}
|
||||
|
||||
// A MockQueryable is used for testing purposes so that a mock Querier can be used.
|
||||
type MockQueryable struct {
|
||||
MockQuerier Querier
|
||||
}
|
||||
|
||||
func (q *MockQueryable) Querier(ctx context.Context, mint, maxt int64) (Querier, error) {
|
||||
return q.MockQuerier, nil
|
||||
}
|
||||
|
||||
// Querier provides querying access over time series data of a fixed time range.
|
||||
type Querier interface {
|
||||
LabelQuerier
|
||||
|
@ -92,6 +101,27 @@ type Querier interface {
|
|||
Select(sortSeries bool, hints *SelectHints, matchers ...*labels.Matcher) SeriesSet
|
||||
}
|
||||
|
||||
// MockQuerier is used for test purposes to mock the selected series that is returned.
|
||||
type MockQuerier struct {
|
||||
SelectMockFunction func(sortSeries bool, hints *SelectHints, matchers ...*labels.Matcher) SeriesSet
|
||||
}
|
||||
|
||||
func (q *MockQuerier) LabelValues(name string, matchers ...*labels.Matcher) ([]string, Warnings, error) {
|
||||
return nil, nil, nil
|
||||
}
|
||||
|
||||
func (q *MockQuerier) LabelNames(matchers ...*labels.Matcher) ([]string, Warnings, error) {
|
||||
return nil, nil, nil
|
||||
}
|
||||
|
||||
func (q *MockQuerier) Close() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (q *MockQuerier) Select(sortSeries bool, hints *SelectHints, matchers ...*labels.Matcher) SeriesSet {
|
||||
return q.SelectMockFunction(sortSeries, hints, matchers...)
|
||||
}
|
||||
|
||||
// A ChunkQueryable handles queries against a storage.
|
||||
// Use it when you need to have access to samples in encoded format.
|
||||
type ChunkQueryable interface {
|
||||
|
@ -240,6 +270,20 @@ func EmptySeriesSet() SeriesSet {
|
|||
return emptySeriesSet
|
||||
}
|
||||
|
||||
type testSeriesSet struct {
|
||||
series Series
|
||||
}
|
||||
|
||||
func (s testSeriesSet) Next() bool { return true }
|
||||
func (s testSeriesSet) At() Series { return s.series }
|
||||
func (s testSeriesSet) Err() error { return nil }
|
||||
func (s testSeriesSet) Warnings() Warnings { return nil }
|
||||
|
||||
// TestSeriesSet returns a mock series set
|
||||
func TestSeriesSet(series Series) SeriesSet {
|
||||
return testSeriesSet{series: series}
|
||||
}
|
||||
|
||||
type errSeriesSet struct {
|
||||
err error
|
||||
}
|
||||
|
@ -281,6 +325,29 @@ type Series interface {
|
|||
SampleIterable
|
||||
}
|
||||
|
||||
type mockSeries struct {
|
||||
timestamps []int64
|
||||
values []float64
|
||||
labelSet []string
|
||||
}
|
||||
|
||||
func (s mockSeries) Labels() labels.Labels {
|
||||
return labels.FromStrings(s.labelSet...)
|
||||
}
|
||||
|
||||
func (s mockSeries) Iterator() chunkenc.Iterator {
|
||||
return chunkenc.MockSeriesIterator(s.timestamps, s.values)
|
||||
}
|
||||
|
||||
// MockSeries returns a series with custom timestamps, values and labelSet.
|
||||
func MockSeries(timestamps []int64, values []float64, labelSet []string) Series {
|
||||
return mockSeries{
|
||||
timestamps: timestamps,
|
||||
values: values,
|
||||
labelSet: labelSet,
|
||||
}
|
||||
}
|
||||
|
||||
// ChunkSeriesSet contains a set of chunked series.
|
||||
type ChunkSeriesSet interface {
|
||||
Next() bool
|
||||
|
|
|
@ -89,6 +89,36 @@ type Iterator interface {
|
|||
Err() error
|
||||
}
|
||||
|
||||
// MockSeriesIterator returns an iterator for a mock series with custom timeStamps and values.
|
||||
func MockSeriesIterator(timestamps []int64, values []float64) Iterator {
|
||||
return &mockSeriesIterator{
|
||||
timeStamps: timestamps,
|
||||
values: values,
|
||||
currIndex: 0,
|
||||
}
|
||||
}
|
||||
|
||||
type mockSeriesIterator struct {
|
||||
timeStamps []int64
|
||||
values []float64
|
||||
currIndex int
|
||||
}
|
||||
|
||||
func (it *mockSeriesIterator) Seek(int64) bool { return false }
|
||||
func (it *mockSeriesIterator) At() (int64, float64) {
|
||||
return it.timeStamps[it.currIndex], it.values[it.currIndex]
|
||||
}
|
||||
|
||||
func (it *mockSeriesIterator) Next() bool {
|
||||
if it.currIndex < len(it.timeStamps)-1 {
|
||||
it.currIndex++
|
||||
return true
|
||||
}
|
||||
|
||||
return false
|
||||
}
|
||||
func (it *mockSeriesIterator) Err() error { return nil }
|
||||
|
||||
// NewNopIterator returns a new chunk iterator that does not hold any data.
|
||||
func NewNopIterator() Iterator {
|
||||
return nopIterator{}
|
||||
|
|
Loading…
Reference in a new issue