diff --git a/storage/remote/wal_watcher.go b/storage/remote/wal_watcher.go index b3e93dcb46..b8830b82ae 100644 --- a/storage/remote/wal_watcher.go +++ b/storage/remote/wal_watcher.go @@ -233,53 +233,38 @@ func (w *WALWatcher) runWatcher() { } w.currentSegment = first - w.currentSegmentMetric.Set(float64(w.currentSegment)) - segment, err := wal.OpenReadSegment(wal.SegmentName(w.walDir, w.currentSegment)) - // TODO: callum, is this error really fatal? - if err != nil { - level.Error(w.logger).Log("err", err) - return - } - reader := wal.NewLiveReader(segment) tail := false for { - // If we've replayed the existing WAL, start tailing. if w.currentSegment == last { tail = true } - if tail { - level.Info(w.logger).Log("msg", "watching segment", "segment", w.currentSegment) - } else { - level.Info(w.logger).Log("msg", "replaying segment", "segment", w.currentSegment) - } + + w.currentSegmentMetric.Set(float64(w.currentSegment)) + level.Info(w.logger).Log("msg", "process segment", "segment", w.currentSegment, "tail", tail) // On start, after reading the existing WAL for series records, we have a pointer to what is the latest segment. // On subsequent calls to this function, currentSegment will have been incremented and we should open that segment. - err := w.watch(nw, reader, tail) - segment.Close() - if err != nil { + if err := w.watch(nw, w.currentSegment, tail); err != nil { level.Error(w.logger).Log("msg", "runWatcher is ending", "err", err) return } w.currentSegment++ - w.currentSegmentMetric.Set(float64(w.currentSegment)) - - segment, err = wal.OpenReadSegment(wal.SegmentName(w.walDir, w.currentSegment)) - // TODO: callum, is this error really fatal? - if err != nil { - level.Error(w.logger).Log("err", err) - return - } - reader = wal.NewLiveReader(segment) } } // Use tail true to indicate that the reader is currently on a segment that is // actively being written to. If false, assume it's a full segment and we're // replaying it on start to cache the series records. -func (w *WALWatcher) watch(wl *wal.WAL, reader *wal.LiveReader, tail bool) error { +func (w *WALWatcher) watch(wl *wal.WAL, segmentNum int, tail bool) error { + segment, err := wal.OpenReadSegment(wal.SegmentName(w.walDir, segmentNum)) + if err != nil { + return err + } + defer segment.Close() + + reader := wal.NewLiveReader(segment) readTicker := time.NewTicker(readPeriod) defer readTicker.Stop() @@ -289,6 +274,7 @@ func (w *WALWatcher) watch(wl *wal.WAL, reader *wal.LiveReader, tail bool) error segmentTicker := time.NewTicker(segmentCheckPeriod) defer segmentTicker.Stop() + // If we're replaying the segment we need to know the size of the file to know // when to return from watch and move on to the next segment. size := int64(math.MaxInt64)