From 9dc763cc0341208ee47f5fd41869787f56981d67 Mon Sep 17 00:00:00 2001 From: Alin Sinpalean Date: Fri, 1 Jun 2018 16:23:07 +0200 Subject: [PATCH] Run rule evaluation with timestamps precisely evaluation_interval apart (#4201) * Run rule evaluation with timestamps precisely evaluation_interval apart from one another. Signed-off-by: Alin Sinpalean --- rules/manager.go | 38 +++++++++++++++++++------------------- 1 file changed, 19 insertions(+), 19 deletions(-) diff --git a/rules/manager.go b/rules/manager.go index 35ce82ce80..f48ba75fa2 100644 --- a/rules/manager.go +++ b/rules/manager.go @@ -192,8 +192,9 @@ func (g *Group) run(ctx context.Context) { defer close(g.terminated) // Wait an initial amount to have consistently slotted intervals. + evalTimestamp := g.evalTimestamp().Add(g.interval) select { - case <-time.After(g.offset()): + case <-time.After(time.Until(evalTimestamp)): case <-g.done: return } @@ -202,17 +203,20 @@ func (g *Group) run(ctx context.Context) { iterationsScheduled.Inc() start := time.Now() - g.Eval(ctx, start) + g.Eval(ctx, evalTimestamp) + timeSinceStart := time.Since(start) - iterationDuration.Observe(time.Since(start).Seconds()) - g.SetEvaluationTime(time.Since(start)) + iterationDuration.Observe(timeSinceStart.Seconds()) + g.SetEvaluationTime(timeSinceStart) } - lastTriggered := time.Now() - iter() + // The assumption here is that since the ticker was started after having + // waited for `evalTimestamp` to pass, the ticks will trigger soon + // after each `evalTimestamp + N * g.interval` occurrence. tick := time.NewTicker(g.interval) defer tick.Stop() + iter() for { select { case <-g.done: @@ -222,12 +226,12 @@ func (g *Group) run(ctx context.Context) { case <-g.done: return case <-tick.C: - missed := (time.Since(lastTriggered).Nanoseconds() / g.interval.Nanoseconds()) - 1 + missed := (time.Since(evalTimestamp) / g.interval) - 1 if missed > 0 { iterationsMissed.Add(float64(missed)) iterationsScheduled.Add(float64(missed)) } - lastTriggered = time.Now() + evalTimestamp = evalTimestamp.Add((missed + 1) * g.interval) iter() } } @@ -261,20 +265,16 @@ func (g *Group) SetEvaluationTime(dur time.Duration) { g.evaluationTime = dur } -// offset returns until the next consistently slotted evaluation interval. -func (g *Group) offset() time.Duration { - now := time.Now().UnixNano() - +// evalTimestamp returns the immediately preceding consistently slotted evaluation time. +func (g *Group) evalTimestamp() time.Time { var ( - base = now - (now % int64(g.interval)) - offset = g.hash() % uint64(g.interval) - next = base + int64(offset) + offset = int64(g.hash() % uint64(g.interval)) + now = time.Now().UnixNano() + adjNow = now - offset + base = adjNow - (adjNow % int64(g.interval)) ) - if next < now { - next += int64(g.interval) - } - return time.Duration(next - now) + return time.Unix(0, base+offset) } // copyState copies the alerting rule and staleness related state from the given group.