From adf5307470391b47d09c7854205e246baff88a38 Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Thu, 14 Feb 2019 10:02:54 +0000 Subject: [PATCH] Update wal LiveReader to ensure EOF is correctly propagated. Signed-off-by: Tom Wilkie --- go.sum | 1 - storage/remote/wal_watcher.go | 16 ++++++++++------ 2 files changed, 10 insertions(+), 7 deletions(-) diff --git a/go.sum b/go.sum index 0a2d3c186f..06dbd4509b 100644 --- a/go.sum +++ b/go.sum @@ -253,7 +253,6 @@ github.com/shurcooL/httpfs v0.0.0-20171119174359-809beceb2371 h1:SWV2fHctRpRrp49 github.com/shurcooL/httpfs v0.0.0-20171119174359-809beceb2371/go.mod h1:ZY1cvUeJuFPAdZ/B6v7RHavJWZn2YPVFQ1OSXhCGOkg= github.com/shurcooL/vfsgen v0.0.0-20180711163814-62bca832be04 h1:y0cMJ0qjii33BnD6tMGcF/+gHYsoKQ6tbwQpy233OII= github.com/shurcooL/vfsgen v0.0.0-20180711163814-62bca832be04/go.mod h1:TrYk7fJVaAttu97ZZKrO9UbRa8izdowaMIZcxYMbVaw= -github.com/simonpasquier/klog-gokit v0.1.0 h1:l3GGzgwlUF4vC1ApCOEsMsV+6nJPM01VoVCUCZgOIUw= github.com/simonpasquier/klog-gokit v0.1.0/go.mod h1:4lorAA0CyDox4KO34BrvNAJk8J2Ma/M9Q2BDkR38vSI= github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d h1:zE9ykElWQ6/NYmHa3jpm/yHnI4xSofP+UP6SpjHcSeM= github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc= diff --git a/storage/remote/wal_watcher.go b/storage/remote/wal_watcher.go index b25c4efcf2..2a659b0c32 100644 --- a/storage/remote/wal_watcher.go +++ b/storage/remote/wal_watcher.go @@ -147,7 +147,7 @@ func (w *WALWatcher) loop() { defer close(w.done) // We may encourter failures processing the WAL; we should wait and retry. - for { + for !isClosed(w.quit) { if err := w.run(); err != nil { level.Error(w.logger).Log("msg", "error tailing WAL", "err", err) } @@ -205,6 +205,7 @@ func (w *WALWatcher) run() error { } } +// findSegmentForIndex finds the first segment greater than or equal to index. func (w *WALWatcher) findSegmentForIndex(index int) (int, error) { files, err := fileutil.ReadDir(w.walDir) if err != nil { @@ -305,14 +306,17 @@ func (w *WALWatcher) watch(wl *wal.WAL, segmentNum int, tail bool) error { return nil case <-readTicker.C: - if err := w.readSegment(reader, segmentNum); err != nil && err != io.EOF { - level.Error(w.logger).Log("err", err) - return err - } - if reader.TotalRead() >= size && !tail { + err := w.readSegment(reader, segmentNum) + + // If we're reading to completion, stop when we hit an EOF. + if err == io.EOF && !tail { level.Info(w.logger).Log("msg", "done replaying segment", "segment", segmentNum, "size", size, "read", reader.TotalRead()) return nil } + + if err != nil && err != io.EOF { + return err + } } } }