If reading the WAL fails, try again. Also, read from the segment containing the index for the last checkpoint, not the first segment.

Signed-off-by: Tom Wilkie <tom.wilkie@gmail.com>
This commit is contained in:
Tom Wilkie 2019-02-13 14:47:35 +00:00 committed by Tom Wilkie
parent d6f911b511
commit bdc6b764b0
2 changed files with 68 additions and 39 deletions

View file

@ -60,7 +60,7 @@ func NewStorage(l log.Logger, reg prometheus.Registerer, stCallback startTimeCal
}
shardUpdateDuration := 10 * time.Second
s := &Storage{
logger: logging.RateLimit(logging.Dedupe(l, 1*time.Minute), 1),
logger: logging.Dedupe(l, 1*time.Minute),
localStartTimeCallback: stCallback,
flushDeadline: flushDeadline,
walDir: walDir,

View file

@ -19,6 +19,7 @@ import (
"math"
"os"
"path"
"sort"
"strconv"
"strings"
"time"
@ -182,7 +183,7 @@ func NewWALWatcher(logger log.Logger, name string, writer writeTo, walDir string
func (w *WALWatcher) Start() {
level.Info(w.logger).Log("msg", "starting WAL watcher", "queue", w.name)
go w.runWatcher()
go w.loop()
}
func (w *WALWatcher) Stop() {
@ -190,71 +191,95 @@ func (w *WALWatcher) Stop() {
close(w.quit)
}
func (w *WALWatcher) runWatcher() {
// The WAL dir may not exist when Prometheus first starts up.
func (w *WALWatcher) loop() {
// We may encourter failures processing the WAL; we should wait and retry.
for {
if _, err := os.Stat(w.walDir); os.IsNotExist(err) {
time.Sleep(time.Second)
} else {
break
if err := w.run(); err != nil {
level.Error(w.logger).Log("msg", "error tailing WAL", "err", err)
}
select {
case <-w.quit:
return
case <-time.After(5 * time.Second):
}
}
}
func (w *WALWatcher) run() error {
nw, err := wal.New(nil, nil, w.walDir)
if err != nil {
level.Error(w.logger).Log("err", err)
return
}
first, last, err := nw.Segments()
if err != nil {
level.Error(w.logger).Log("err", err)
return
}
if last == -1 {
level.Error(w.logger).Log("err", err)
return
return errors.Wrap(err, "wal.New")
}
// Backfill from the checkpoint first if it exists.
dir, _, err := tsdb.LastCheckpoint(w.walDir)
var nextIndex int
w.lastCheckpoint, nextIndex, err = tsdb.LastCheckpoint(w.walDir)
if err != nil && err != tsdb.ErrNotFound {
level.Error(w.logger).Log("msg", "error looking for existing checkpoint, some samples may be dropped", "err", errors.Wrap(err, "find last checkpoint"))
return err
}
level.Debug(w.logger).Log("msg", "reading checkpoint", "dir", dir)
level.Debug(w.logger).Log("msg", "reading checkpoint", "dir", w.lastCheckpoint, "startFrom", nextIndex)
if err == nil {
w.lastCheckpoint = dir
err = w.readCheckpoint(dir)
if err != nil {
level.Error(w.logger).Log("msg", "error reading existing checkpoint, some samples may be dropped", "err", err)
if err = w.readCheckpoint(w.lastCheckpoint); err != nil {
return err
}
}
w.currentSegment = first
tail := false
w.currentSegment, err = w.findSegmentForIndex(nextIndex)
if err != nil {
return err
}
level.Debug(w.logger).Log("msg", "starting from", "currentSegment", w.currentSegment)
for {
if w.currentSegment == last {
tail = true
}
w.currentSegmentMetric.Set(float64(w.currentSegment))
level.Info(w.logger).Log("msg", "process segment", "segment", w.currentSegment, "tail", tail)
level.Info(w.logger).Log("msg", "process segment", "segment", w.currentSegment)
// On start, after reading the existing WAL for series records, we have a pointer to what is the latest segment.
// On subsequent calls to this function, currentSegment will have been incremented and we should open that segment.
if err := w.watch(nw, w.currentSegment, tail); err != nil {
if err := w.watch(nw, w.currentSegment, true); err != nil {
level.Error(w.logger).Log("msg", "runWatcher is ending", "err", err)
return
return err
}
w.currentSegment++
}
}
// Use tail true to indicate that the reader is currently on a segment that is
func (w *WALWatcher) findSegmentForIndex(index int) (int, error) {
files, err := fileutil.ReadDir(w.walDir)
if err != nil {
return -1, err
}
var refs []int
var last int
for _, fn := range files {
k, err := strconv.Atoi(fn)
if err != nil {
continue
}
if len(refs) > 0 && k > last+1 {
return -1, errors.New("segments are not sequential")
}
refs = append(refs, k)
last = k
}
sort.Sort(sort.IntSlice(refs))
for _, r := range refs {
if r >= index {
return r, nil
}
}
return -1, errors.New("failed to find segment for index")
}
// Use tail true to indicate thatreader is currently on a segment that is
// actively being written to. If false, assume it's a full segment and we're
// replaying it on start to cache the series records.
func (w *WALWatcher) watch(wl *wal.WAL, segmentNum int, tail bool) error {
@ -444,11 +469,15 @@ func (w *WALWatcher) readCheckpoint(checkpointDir string) error {
// w.readSeriesRecords(wal.NewLiveReader(sr), i, size)
r := wal.NewLiveReader(sr)
w.readSegment(r)
if err := w.readSegment(r); err != nil {
return errors.Wrap(err, "readSegment")
}
if r.TotalRead() != size {
level.Warn(w.logger).Log("msg", "may not have read all data from checkpoint")
}
level.Debug(w.logger).Log("msg", "read series references from checkpoint", "checkpoint", checkpointDir)
return nil
}