diff --git a/storage/remote/queue_manager_test.go b/storage/remote/queue_manager_test.go index e52780925b..4e4e189756 100644 --- a/storage/remote/queue_manager_test.go +++ b/storage/remote/queue_manager_test.go @@ -35,6 +35,7 @@ import ( "github.com/prometheus/common/model" "github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/pkg/labels" + "github.com/prometheus/prometheus/pkg/timestamp" "github.com/prometheus/prometheus/prompb" "github.com/prometheus/prometheus/tsdb/record" "github.com/prometheus/prometheus/util/testutil" @@ -531,7 +532,7 @@ func BenchmarkStartup(b *testing.B) { m := NewQueueManager(nil, logger, dir, newEWMARate(ewmaWeight, shardUpdateDuration), config.DefaultQueueConfig, nil, nil, c, 1*time.Minute) - m.watcher.StartTime = math.MaxInt64 + m.watcher.SetStartTime(timestamp.Time(math.MaxInt64)) m.watcher.MaxSegment = segments[len(segments)-2] err := m.watcher.Run() testutil.Ok(b, err) diff --git a/tsdb/wal/watcher.go b/tsdb/wal/watcher.go index 75403cc4b7..f92386f0de 100644 --- a/tsdb/wal/watcher.go +++ b/tsdb/wal/watcher.go @@ -68,8 +68,9 @@ type Watcher struct { metrics *WatcherMetrics readerMetrics *liveReaderMetrics - StartTime int64 - lastSegment int + startTime time.Time + startTimestamp int64 // the start time as a Prometheus timestamp + sendSamples bool recordsReadMetric *prometheus.CounterVec recordDecodeFailsMetric prometheus.Counter @@ -192,7 +193,7 @@ func (w *Watcher) loop() { // We may encounter failures processing the WAL; we should wait and retry. for !isClosed(w.quit) { - w.StartTime = timestamp.FromTime(time.Now()) + w.SetStartTime(time.Now()) if err := w.Run(); err != nil { level.Error(w.logger).Log("msg", "error tailing WAL", "err", err) } @@ -212,7 +213,10 @@ func (w *Watcher) Run() error { if err != nil { return errors.Wrap(err, "wal.Segments") } - w.lastSegment = lastSegment + + // We want to ensure this is false across iterations since + // Run will be called again if there was a failure to read the WAL. + w.sendSamples = false level.Info(w.logger).Log("msg", "replaying WAL", "queue", w.name) @@ -241,7 +245,7 @@ func (w *Watcher) Run() error { // On start, after reading the existing WAL for series records, we have a pointer to what is the latest segment. // On subsequent calls to this function, currentSegment will have been incremented and we should open that segment. - if err := w.watch(currentSegment, currentSegment >= w.lastSegment); err != nil { + if err := w.watch(currentSegment, currentSegment >= lastSegment); err != nil { return err } @@ -455,13 +459,11 @@ func (w *Watcher) garbageCollectSeries(segmentNum int) error { func (w *Watcher) readSegment(r *LiveReader, segmentNum int, tail bool) error { var ( - dec record.Decoder - series []record.RefSeries - samples []record.RefSample - send []record.RefSample - sentSamples bool + dec record.Decoder + series []record.RefSeries + samples []record.RefSample + send []record.RefSample ) - for r.Next() && !isClosed(w.quit) { rec := r.Record() w.recordsReadMetric.WithLabelValues(recordType(dec.Type(rec))).Inc() @@ -487,10 +489,11 @@ func (w *Watcher) readSegment(r *LiveReader, segmentNum int, tail bool) error { return err } for _, s := range samples { - if s.T > w.StartTime { - if !sentSamples && segmentNum == w.lastSegment { - sentSamples = true - level.Info(w.logger).Log("msg", "done replaying WAL") + if s.T > w.startTimestamp { + if !w.sendSamples { + w.sendSamples = true + duration := time.Since(w.startTime) + level.Info(w.logger).Log("msg", "done replaying WAL", "duration", duration) } send = append(send, s) } @@ -514,6 +517,11 @@ func (w *Watcher) readSegment(r *LiveReader, segmentNum int, tail bool) error { return r.Err() } +func (w *Watcher) SetStartTime(t time.Time) { + w.startTime = t + w.startTimestamp = timestamp.FromTime(t) +} + func recordType(rt record.Type) string { switch rt { case record.Invalid: diff --git a/tsdb/wal/watcher_test.go b/tsdb/wal/watcher_test.go index 9cd9286a24..8378e997d2 100644 --- a/tsdb/wal/watcher_test.go +++ b/tsdb/wal/watcher_test.go @@ -139,7 +139,7 @@ func TestTailSamples(t *testing.T) { wt := newWriteToMock() watcher := NewWatcher(nil, wMetrics, nil, "", wt, dir) - watcher.StartTime = now.UnixNano() + watcher.SetStartTime(now) // Set the Watcher's metrics so they're not nil pointers. watcher.setMetrics()