Track staleness per rule rather than per group.

This commit is contained in:
Brian Brazil 2017-05-19 13:42:07 +01:00
parent 0451d6d31b
commit 9bc68db7e6

View file

@ -129,8 +129,8 @@ type Group struct {
name string
interval time.Duration
rules []Rule
seriesInPreviousEval []map[string]labels.Labels // One per Rule.
opts *ManagerOptions
seriesInPreviousEval map[string]labels.Labels
done chan struct{}
terminated chan struct{}
@ -143,7 +143,7 @@ func NewGroup(name string, interval time.Duration, rules []Rule, opts *ManagerOp
interval: interval,
rules: rules,
opts: opts,
seriesInPreviousEval: map[string]labels.Labels{},
seriesInPreviousEval: make([]map[string]labels.Labels, len(rules)),
done: make(chan struct{}),
terminated: make(chan struct{}),
}
@ -259,17 +259,15 @@ func typeForRule(r Rule) ruleType {
// rule dependency.
func (g *Group) Eval(ts time.Time) {
var (
wg sync.WaitGroup
mu sync.Mutex
seriesReturned = make(map[string]labels.Labels, len(g.seriesInPreviousEval))
wg sync.WaitGroup
)
for _, rule := range g.rules {
for i, rule := range g.rules {
rtyp := string(typeForRule(rule))
wg.Add(1)
// BUG(julius): Look at fixing thundering herd.
go func(rule Rule) {
go func(i int, rule Rule) {
defer wg.Done()
defer func(t time.Time) {
@ -303,6 +301,7 @@ func (g *Group) Eval(ts time.Time) {
return
}
seriesReturned := make(map[string]labels.Labels, len(g.seriesInPreviousEval[i]))
for _, s := range vector {
if _, err := app.Add(s.Metric, s.T, s.V); err != nil {
switch err {
@ -316,9 +315,7 @@ func (g *Group) Eval(ts time.Time) {
log.With("sample", s).With("err", err).Warn("Rule evaluation result discarded")
}
} else {
mu.Lock()
seriesReturned[s.Metric.String()] = s.Metric
mu.Unlock()
}
}
if numOutOfOrder > 0 {
@ -327,41 +324,29 @@ func (g *Group) Eval(ts time.Time) {
if numDuplicates > 0 {
log.With("numDropped", numDuplicates).Warn("Error on ingesting results from rule evaluation with different value but same timestamp")
}
if err := app.Commit(); err != nil {
log.With("err", err).Warn("rule sample appending failed")
}
}(rule)
}
wg.Wait()
// TODO(bbrazil): This should apply per-rule.
app, err := g.opts.Appendable.Appender()
if err != nil {
log.With("err", err).Warn("creating appender failed")
return
}
for metric, lset := range g.seriesInPreviousEval {
if _, ok := seriesReturned[metric]; !ok {
// Series no longer exposed, mark it stale.
_, err = app.Add(lset, timestamp.FromTime(ts), math.Float64frombits(value.StaleNaN))
switch err {
case nil:
case storage.ErrOutOfOrderSample, storage.ErrDuplicateSampleForTimestamp:
// Do not count these in logging, as this is expected if series
// is exposed from a different group.
continue
default:
log.With("sample", metric).With("err", err).Warn("adding stale sample failed")
if err := app.Rollback(); err != nil {
log.With("err", err).Warn("rule stale sample rollback failed")
for metric, lset := range g.seriesInPreviousEval[i] {
if _, ok := seriesReturned[metric]; !ok {
// Series no longer exposed, mark it stale.
_, err = app.Add(lset, timestamp.FromTime(ts), math.Float64frombits(value.StaleNaN))
switch err {
case nil:
case storage.ErrOutOfOrderSample, storage.ErrDuplicateSampleForTimestamp:
// Do not count these in logging, as this is expected if series
// is exposed from a different rule.
default:
log.With("sample", metric).With("err", err).Warn("adding stale sample failed")
}
}
}
}
if err := app.Commit(); err != nil {
log.With("err", err).Warn("rule sample appending failed")
} else {
g.seriesInPreviousEval[i] = seriesReturned
}
}(i, rule)
}
if err := app.Commit(); err != nil {
log.With("err", err).Warn("rule stale sample appending failed")
}
g.seriesInPreviousEval = seriesReturned
wg.Wait()
}
// sendAlerts sends alert notifications for the given rule.