Update wal LiveReader to ensure EOF is correctly propagated.

Signed-off-by: Tom Wilkie <tom.wilkie@gmail.com>
This commit is contained in:
Tom Wilkie 2019-02-14 10:02:54 +00:00 committed by Tom Wilkie
parent d6258aea8f
commit adf5307470
2 changed files with 10 additions and 7 deletions

1
go.sum
View file

@ -253,7 +253,6 @@ github.com/shurcooL/httpfs v0.0.0-20171119174359-809beceb2371 h1:SWV2fHctRpRrp49
github.com/shurcooL/httpfs v0.0.0-20171119174359-809beceb2371/go.mod h1:ZY1cvUeJuFPAdZ/B6v7RHavJWZn2YPVFQ1OSXhCGOkg= github.com/shurcooL/httpfs v0.0.0-20171119174359-809beceb2371/go.mod h1:ZY1cvUeJuFPAdZ/B6v7RHavJWZn2YPVFQ1OSXhCGOkg=
github.com/shurcooL/vfsgen v0.0.0-20180711163814-62bca832be04 h1:y0cMJ0qjii33BnD6tMGcF/+gHYsoKQ6tbwQpy233OII= github.com/shurcooL/vfsgen v0.0.0-20180711163814-62bca832be04 h1:y0cMJ0qjii33BnD6tMGcF/+gHYsoKQ6tbwQpy233OII=
github.com/shurcooL/vfsgen v0.0.0-20180711163814-62bca832be04/go.mod h1:TrYk7fJVaAttu97ZZKrO9UbRa8izdowaMIZcxYMbVaw= github.com/shurcooL/vfsgen v0.0.0-20180711163814-62bca832be04/go.mod h1:TrYk7fJVaAttu97ZZKrO9UbRa8izdowaMIZcxYMbVaw=
github.com/simonpasquier/klog-gokit v0.1.0 h1:l3GGzgwlUF4vC1ApCOEsMsV+6nJPM01VoVCUCZgOIUw=
github.com/simonpasquier/klog-gokit v0.1.0/go.mod h1:4lorAA0CyDox4KO34BrvNAJk8J2Ma/M9Q2BDkR38vSI= github.com/simonpasquier/klog-gokit v0.1.0/go.mod h1:4lorAA0CyDox4KO34BrvNAJk8J2Ma/M9Q2BDkR38vSI=
github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d h1:zE9ykElWQ6/NYmHa3jpm/yHnI4xSofP+UP6SpjHcSeM= github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d h1:zE9ykElWQ6/NYmHa3jpm/yHnI4xSofP+UP6SpjHcSeM=
github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc= github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc=

View file

@ -147,7 +147,7 @@ func (w *WALWatcher) loop() {
defer close(w.done) defer close(w.done)
// We may encourter failures processing the WAL; we should wait and retry. // We may encourter failures processing the WAL; we should wait and retry.
for { for !isClosed(w.quit) {
if err := w.run(); err != nil { if err := w.run(); err != nil {
level.Error(w.logger).Log("msg", "error tailing WAL", "err", err) level.Error(w.logger).Log("msg", "error tailing WAL", "err", err)
} }
@ -205,6 +205,7 @@ func (w *WALWatcher) run() error {
} }
} }
// findSegmentForIndex finds the first segment greater than or equal to index.
func (w *WALWatcher) findSegmentForIndex(index int) (int, error) { func (w *WALWatcher) findSegmentForIndex(index int) (int, error) {
files, err := fileutil.ReadDir(w.walDir) files, err := fileutil.ReadDir(w.walDir)
if err != nil { if err != nil {
@ -305,14 +306,17 @@ func (w *WALWatcher) watch(wl *wal.WAL, segmentNum int, tail bool) error {
return nil return nil
case <-readTicker.C: case <-readTicker.C:
if err := w.readSegment(reader, segmentNum); err != nil && err != io.EOF { err := w.readSegment(reader, segmentNum)
level.Error(w.logger).Log("err", err)
return err // If we're reading to completion, stop when we hit an EOF.
} if err == io.EOF && !tail {
if reader.TotalRead() >= size && !tail {
level.Info(w.logger).Log("msg", "done replaying segment", "segment", segmentNum, "size", size, "read", reader.TotalRead()) level.Info(w.logger).Log("msg", "done replaying segment", "segment", segmentNum, "size", size, "read", reader.TotalRead())
return nil return nil
} }
if err != nil && err != io.EOF {
return err
}
} }
} }
} }