Simplify duration check for watcher WAL replay.

Signed-off-by: Callum Styan <callumstyan@gmail.com>
This commit is contained in:
Callum Styan 2019-11-26 16:53:11 -08:00
parent 2d3ce3916c
commit 6a24eee340
3 changed files with 26 additions and 17 deletions

View file

@ -35,6 +35,7 @@ import (
"github.com/prometheus/common/model" "github.com/prometheus/common/model"
"github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/pkg/timestamp"
"github.com/prometheus/prometheus/prompb" "github.com/prometheus/prometheus/prompb"
"github.com/prometheus/prometheus/tsdb/record" "github.com/prometheus/prometheus/tsdb/record"
"github.com/prometheus/prometheus/util/testutil" "github.com/prometheus/prometheus/util/testutil"
@ -531,7 +532,7 @@ func BenchmarkStartup(b *testing.B) {
m := NewQueueManager(nil, logger, dir, m := NewQueueManager(nil, logger, dir,
newEWMARate(ewmaWeight, shardUpdateDuration), newEWMARate(ewmaWeight, shardUpdateDuration),
config.DefaultQueueConfig, nil, nil, c, 1*time.Minute) config.DefaultQueueConfig, nil, nil, c, 1*time.Minute)
m.watcher.StartTime = math.MaxInt64 m.watcher.SetStartTime(timestamp.Time(math.MaxInt64))
m.watcher.MaxSegment = segments[len(segments)-2] m.watcher.MaxSegment = segments[len(segments)-2]
err := m.watcher.Run() err := m.watcher.Run()
testutil.Ok(b, err) testutil.Ok(b, err)

View file

@ -68,8 +68,9 @@ type Watcher struct {
metrics *WatcherMetrics metrics *WatcherMetrics
readerMetrics *liveReaderMetrics readerMetrics *liveReaderMetrics
StartTime int64 startTime time.Time
lastSegment int startTimestamp int64 // the start time as a Prometheus timestamp
sendSamples bool
recordsReadMetric *prometheus.CounterVec recordsReadMetric *prometheus.CounterVec
recordDecodeFailsMetric prometheus.Counter recordDecodeFailsMetric prometheus.Counter
@ -192,7 +193,7 @@ func (w *Watcher) loop() {
// We may encounter failures processing the WAL; we should wait and retry. // We may encounter failures processing the WAL; we should wait and retry.
for !isClosed(w.quit) { for !isClosed(w.quit) {
w.StartTime = timestamp.FromTime(time.Now()) w.SetStartTime(time.Now())
if err := w.Run(); err != nil { if err := w.Run(); err != nil {
level.Error(w.logger).Log("msg", "error tailing WAL", "err", err) level.Error(w.logger).Log("msg", "error tailing WAL", "err", err)
} }
@ -212,7 +213,10 @@ func (w *Watcher) Run() error {
if err != nil { if err != nil {
return errors.Wrap(err, "wal.Segments") return errors.Wrap(err, "wal.Segments")
} }
w.lastSegment = lastSegment
// We want to ensure this is false across iterations since
// Run will be called again if there was a failure to read the WAL.
w.sendSamples = false
level.Info(w.logger).Log("msg", "replaying WAL", "queue", w.name) level.Info(w.logger).Log("msg", "replaying WAL", "queue", w.name)
@ -241,7 +245,7 @@ func (w *Watcher) Run() error {
// On start, after reading the existing WAL for series records, we have a pointer to what is the latest segment. // On start, after reading the existing WAL for series records, we have a pointer to what is the latest segment.
// On subsequent calls to this function, currentSegment will have been incremented and we should open that segment. // On subsequent calls to this function, currentSegment will have been incremented and we should open that segment.
if err := w.watch(currentSegment, currentSegment >= w.lastSegment); err != nil { if err := w.watch(currentSegment, currentSegment >= lastSegment); err != nil {
return err return err
} }
@ -455,13 +459,11 @@ func (w *Watcher) garbageCollectSeries(segmentNum int) error {
func (w *Watcher) readSegment(r *LiveReader, segmentNum int, tail bool) error { func (w *Watcher) readSegment(r *LiveReader, segmentNum int, tail bool) error {
var ( var (
dec record.Decoder dec record.Decoder
series []record.RefSeries series []record.RefSeries
samples []record.RefSample samples []record.RefSample
send []record.RefSample send []record.RefSample
sentSamples bool
) )
for r.Next() && !isClosed(w.quit) { for r.Next() && !isClosed(w.quit) {
rec := r.Record() rec := r.Record()
w.recordsReadMetric.WithLabelValues(recordType(dec.Type(rec))).Inc() w.recordsReadMetric.WithLabelValues(recordType(dec.Type(rec))).Inc()
@ -487,10 +489,11 @@ func (w *Watcher) readSegment(r *LiveReader, segmentNum int, tail bool) error {
return err return err
} }
for _, s := range samples { for _, s := range samples {
if s.T > w.StartTime { if s.T > w.startTimestamp {
if !sentSamples && segmentNum == w.lastSegment { if !w.sendSamples {
sentSamples = true w.sendSamples = true
level.Info(w.logger).Log("msg", "done replaying WAL") duration := time.Since(w.startTime)
level.Info(w.logger).Log("msg", "done replaying WAL", "duration", duration)
} }
send = append(send, s) send = append(send, s)
} }
@ -514,6 +517,11 @@ func (w *Watcher) readSegment(r *LiveReader, segmentNum int, tail bool) error {
return r.Err() return r.Err()
} }
func (w *Watcher) SetStartTime(t time.Time) {
w.startTime = t
w.startTimestamp = timestamp.FromTime(t)
}
func recordType(rt record.Type) string { func recordType(rt record.Type) string {
switch rt { switch rt {
case record.Invalid: case record.Invalid:

View file

@ -139,7 +139,7 @@ func TestTailSamples(t *testing.T) {
wt := newWriteToMock() wt := newWriteToMock()
watcher := NewWatcher(nil, wMetrics, nil, "", wt, dir) watcher := NewWatcher(nil, wMetrics, nil, "", wt, dir)
watcher.StartTime = now.UnixNano() watcher.SetStartTime(now)
// Set the Watcher's metrics so they're not nil pointers. // Set the Watcher's metrics so they're not nil pointers.
watcher.setMetrics() watcher.setMetrics()