diff --git a/go.sum b/go.sum index ab98fd23c..d926f96f0 100644 --- a/go.sum +++ b/go.sum @@ -253,6 +253,7 @@ github.com/shurcooL/httpfs v0.0.0-20171119174359-809beceb2371 h1:SWV2fHctRpRrp49 github.com/shurcooL/httpfs v0.0.0-20171119174359-809beceb2371/go.mod h1:ZY1cvUeJuFPAdZ/B6v7RHavJWZn2YPVFQ1OSXhCGOkg= github.com/shurcooL/vfsgen v0.0.0-20180711163814-62bca832be04 h1:y0cMJ0qjii33BnD6tMGcF/+gHYsoKQ6tbwQpy233OII= github.com/shurcooL/vfsgen v0.0.0-20180711163814-62bca832be04/go.mod h1:TrYk7fJVaAttu97ZZKrO9UbRa8izdowaMIZcxYMbVaw= +github.com/simonpasquier/klog-gokit v0.1.0 h1:l3GGzgwlUF4vC1ApCOEsMsV+6nJPM01VoVCUCZgOIUw= github.com/simonpasquier/klog-gokit v0.1.0/go.mod h1:4lorAA0CyDox4KO34BrvNAJk8J2Ma/M9Q2BDkR38vSI= github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d h1:zE9ykElWQ6/NYmHa3jpm/yHnI4xSofP+UP6SpjHcSeM= github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc= diff --git a/storage/remote/queue_manager.go b/storage/remote/queue_manager.go index bc295cb93..1fdb33e16 100644 --- a/storage/remote/queue_manager.go +++ b/storage/remote/queue_manager.go @@ -216,7 +216,7 @@ type QueueManager struct { } // NewQueueManager builds a new QueueManager. -func NewQueueManager(logger log.Logger, walDir string, samplesIn *ewmaRate, highestTimestampIn *int64, cfg config.QueueConfig, externalLabels model.LabelSet, relabelConfigs []*pkgrelabel.Config, client StorageClient, flushDeadline time.Duration, startTime int64) *QueueManager { +func NewQueueManager(logger log.Logger, walDir string, samplesIn *ewmaRate, highestTimestampIn *int64, cfg config.QueueConfig, externalLabels model.LabelSet, relabelConfigs []*pkgrelabel.Config, client StorageClient, flushDeadline time.Duration) *QueueManager { if logger == nil { logger = log.NewNopLogger() } else { @@ -250,7 +250,7 @@ func NewQueueManager(logger log.Logger, walDir string, samplesIn *ewmaRate, high t.highestSentTimestampMetric = queueHighestSentTimestamp.WithLabelValues(t.queueName) t.pendingSamplesMetric = queuePendingSamples.WithLabelValues(t.queueName) t.enqueueRetriesMetric = enqueueRetriesTotal.WithLabelValues(t.queueName) - t.watcher = NewWALWatcher(logger, client.Name(), t, walDir, startTime) + t.watcher = NewWALWatcher(logger, client.Name(), t, walDir) t.shards = t.newShards() numShards.WithLabelValues(t.queueName).Set(float64(t.numShards)) diff --git a/storage/remote/queue_manager_test.go b/storage/remote/queue_manager_test.go index e34ae048b..ea624d5ba 100644 --- a/storage/remote/queue_manager_test.go +++ b/storage/remote/queue_manager_test.go @@ -56,7 +56,7 @@ func TestSampleDelivery(t *testing.T) { testutil.Ok(t, err) defer os.RemoveAll(dir) - m := NewQueueManager(nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), &temp, cfg, nil, nil, c, defaultFlushDeadline, 0) + m := NewQueueManager(nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), &temp, cfg, nil, nil, c, defaultFlushDeadline) m.seriesLabels = refSeriesToLabelsProto(series) // These should be received by the client. @@ -85,7 +85,7 @@ func TestSampleDeliveryTimeout(t *testing.T) { testutil.Ok(t, err) defer os.RemoveAll(dir) - m := NewQueueManager(nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), &temp, cfg, nil, nil, c, defaultFlushDeadline, 0) + m := NewQueueManager(nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), &temp, cfg, nil, nil, c, defaultFlushDeadline) m.seriesLabels = refSeriesToLabelsProto(series) m.Start() defer m.Stop() @@ -126,7 +126,7 @@ func TestSampleDeliveryOrder(t *testing.T) { testutil.Ok(t, err) defer os.RemoveAll(dir) - m := NewQueueManager(nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), &temp, config.DefaultQueueConfig, nil, nil, c, defaultFlushDeadline, 0) + m := NewQueueManager(nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), &temp, config.DefaultQueueConfig, nil, nil, c, defaultFlushDeadline) m.seriesLabels = refSeriesToLabelsProto(series) m.Start() @@ -146,7 +146,7 @@ func TestShutdown(t *testing.T) { testutil.Ok(t, err) defer os.RemoveAll(dir) - m := NewQueueManager(nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), &temp, config.DefaultQueueConfig, nil, nil, c, deadline, 0) + m := NewQueueManager(nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), &temp, config.DefaultQueueConfig, nil, nil, c, deadline) samples, series := createTimeseries(2 * config.DefaultQueueConfig.MaxSamplesPerSend) m.seriesLabels = refSeriesToLabelsProto(series) m.Start() @@ -183,7 +183,7 @@ func TestSeriesReset(t *testing.T) { testutil.Ok(t, err) defer os.RemoveAll(dir) - m := NewQueueManager(nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), &temp, config.DefaultQueueConfig, nil, nil, c, deadline, 0) + m := NewQueueManager(nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), &temp, config.DefaultQueueConfig, nil, nil, c, deadline) for i := 0; i < numSegments; i++ { series := []tsdb.RefSeries{} for j := 0; j < numSeries; j++ { @@ -213,7 +213,7 @@ func TestReshard(t *testing.T) { testutil.Ok(t, err) defer os.RemoveAll(dir) - m := NewQueueManager(nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), &temp, cfg, nil, nil, c, defaultFlushDeadline, 0) + m := NewQueueManager(nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), &temp, cfg, nil, nil, c, defaultFlushDeadline) m.seriesLabels = refSeriesToLabelsProto(series) m.Start() diff --git a/storage/remote/storage.go b/storage/remote/storage.go index 3c9d834fd..71b5c5465 100644 --- a/storage/remote/storage.go +++ b/storage/remote/storage.go @@ -25,7 +25,6 @@ import ( "github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/pkg/logging" - "github.com/prometheus/prometheus/pkg/timestamp" "github.com/prometheus/prometheus/storage" ) @@ -98,9 +97,6 @@ func (s *Storage) ApplyConfig(conf *config.Config) error { if err != nil { return err } - // Convert to int64 for comparison with timestamps from samples - // we will eventually read from the WAL on startup. - startTime := timestamp.FromTime(time.Now()) newQueues = append(newQueues, NewQueueManager( s.logger, s.walDir, @@ -111,7 +107,6 @@ func (s *Storage) ApplyConfig(conf *config.Config) error { rwConf.WriteRelabelConfigs, c, s.flushDeadline, - startTime, )) } diff --git a/storage/remote/wal_watcher.go b/storage/remote/wal_watcher.go index 2a659b0c3..399338045 100644 --- a/storage/remote/wal_watcher.go +++ b/storage/remote/wal_watcher.go @@ -29,6 +29,7 @@ import ( "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/prometheus/pkg/timestamp" "github.com/prometheus/tsdb" "github.com/prometheus/tsdb/fileutil" "github.com/prometheus/tsdb/wal" @@ -111,18 +112,17 @@ type WALWatcher struct { } // NewWALWatcher creates a new WAL watcher for a given WriteTo. -func NewWALWatcher(logger log.Logger, name string, writer writeTo, walDir string, startTime int64) *WALWatcher { +func NewWALWatcher(logger log.Logger, name string, writer writeTo, walDir string) *WALWatcher { if logger == nil { logger = log.NewNopLogger() } return &WALWatcher{ - logger: logger, - writer: writer, - walDir: path.Join(walDir, "wal"), - startTime: startTime, - name: name, - quit: make(chan struct{}), - done: make(chan struct{}), + logger: logger, + writer: writer, + walDir: path.Join(walDir, "wal"), + name: name, + quit: make(chan struct{}), + done: make(chan struct{}), recordsReadMetric: watcherRecordsRead.MustCurryWith(prometheus.Labels{queue: name}), recordDecodeFailsMetric: watcherRecordDecodeFails.WithLabelValues(name), @@ -148,6 +148,7 @@ func (w *WALWatcher) loop() { // We may encourter failures processing the WAL; we should wait and retry. for !isClosed(w.quit) { + w.startTime = timestamp.FromTime(time.Now()) if err := w.run(); err != nil { level.Error(w.logger).Log("msg", "error tailing WAL", "err", err) } @@ -177,10 +178,9 @@ func (w *WALWatcher) run() error { return err } - level.Info(w.logger).Log("msg", "reading checkpoint", "dir", lastCheckpoint, "startFrom", nextIndex) if err == nil { if err = w.readCheckpoint(lastCheckpoint); err != nil { - return err + return errors.Wrap(err, "readCheckpoint") } } @@ -189,20 +189,21 @@ func (w *WALWatcher) run() error { return err } - level.Info(w.logger).Log("msg", "starting from", "currentSegment", currentSegment, "last", last) - for { + level.Info(w.logger).Log("msg", "tailing WAL", "lastCheckpoint", lastCheckpoint, "startFrom", nextIndex, "currentSegment", currentSegment, "last", last) + for !isClosed(w.quit) { w.currentSegmentMetric.Set(float64(currentSegment)) level.Info(w.logger).Log("msg", "process segment", "segment", currentSegment) // On start, after reading the existing WAL for series records, we have a pointer to what is the latest segment. // On subsequent calls to this function, currentSegment will have been incremented and we should open that segment. if err := w.watch(nw, currentSegment, currentSegment >= last); err != nil { - level.Error(w.logger).Log("msg", "runWatcher is ending", "err", err) return err } currentSegment++ } + + return nil } // findSegmentForIndex finds the first segment greater than or equal to index. @@ -246,7 +247,7 @@ func (w *WALWatcher) watch(wl *wal.WAL, segmentNum int, tail bool) error { } defer segment.Close() - reader := wal.NewLiveReader(segment) + reader := wal.NewLiveReader(w.logger, segment) readTicker := time.NewTicker(readPeriod) defer readTicker.Stop() @@ -274,8 +275,7 @@ func (w *WALWatcher) watch(wl *wal.WAL, segmentNum int, tail bool) error { for { select { case <-w.quit: - level.Info(w.logger).Log("msg", "quitting WAL watcher watch loop") - return errors.New("quit channel") + return nil case <-checkpointTicker.C: // Periodically check if there is a new checkpoint so we can garbage @@ -296,25 +296,40 @@ func (w *WALWatcher) watch(wl *wal.WAL, segmentNum int, tail bool) error { continue } - if err := w.readSegment(reader, segmentNum); err != nil { - // Ignore errors reading to end of segment, as we're going to move to - // next segment now. - level.Error(w.logger).Log("msg", "error reading to end of segment", "err", err) - } + err = w.readSegment(reader, segmentNum) - level.Info(w.logger).Log("msg", "a new segment exists, we should start reading it", "current", fmt.Sprintf("%08d", segmentNum), "new", fmt.Sprintf("%08d", last)) - return nil - - case <-readTicker.C: - err := w.readSegment(reader, segmentNum) - - // If we're reading to completion, stop when we hit an EOF. - if err == io.EOF && !tail { - level.Info(w.logger).Log("msg", "done replaying segment", "segment", segmentNum, "size", size, "read", reader.TotalRead()) + // Ignore errors reading to end of segment whilst replaying the WAL. + if !tail { + if err != nil && err != io.EOF { + level.Warn(w.logger).Log("msg", "ignoring error reading to end of segment, may have dropped data", "err", err) + } else if reader.Offset() != size { + level.Warn(w.logger).Log("msg", "expected to have read whole segment, may have dropped data", "segment", segmentNum, "read", reader.Offset(), "size", size) + } return nil } - if err != nil && err != io.EOF { + // Otherwise, when we are tailing, non-EOFs are fatal. + if err != io.EOF { + return err + } + + return nil + + case <-readTicker.C: + err = w.readSegment(reader, segmentNum) + + // Ignore all errors reading to end of segment whilst replaying the WAL. + if !tail { + if err != nil && err != io.EOF { + level.Warn(w.logger).Log("msg", "ignoring error reading to end of segment, may have dropped data", "segment", segmentNum, "err", err) + } else if reader.Offset() != size { + level.Warn(w.logger).Log("msg", "expected to have read whole segment, may have dropped data", "segment", segmentNum, "read", reader.Offset(), "size", size) + } + return nil + } + + // Otherwise, when we are tailing, non-EOFs are fatal. + if err != io.EOF { return err } } @@ -355,11 +370,8 @@ func (w *WALWatcher) garbageCollectSeries(segmentNum int) error { func (w *WALWatcher) readSegment(r *wal.LiveReader, segmentNum int) error { for r.Next() && !isClosed(w.quit) { - err := w.decodeRecord(r.Record(), segmentNum) - - // Intentionally skip over record decode errors. - if err != nil { - level.Error(w.logger).Log("err", err) + if err := w.decodeRecord(r.Record(), segmentNum); err != nil { + return err } } return r.Err() @@ -450,13 +462,13 @@ func (w *WALWatcher) readCheckpoint(checkpointDir string) error { } // w.readSeriesRecords(wal.NewLiveReader(sr), i, size) - r := wal.NewLiveReader(sr) - if err := w.readSegment(r, index); err != nil { + r := wal.NewLiveReader(w.logger, sr) + if err := w.readSegment(r, index); err != io.EOF { return errors.Wrap(err, "readSegment") } - if r.TotalRead() != size { - level.Warn(w.logger).Log("msg", "may not have read all data from checkpoint", "totalRead", r.TotalRead(), "size", size) + if r.Offset() != size { + level.Warn(w.logger).Log("msg", "may not have read all data from checkpoint", "totalRead", r.Offset(), "size", size) } level.Debug(w.logger).Log("msg", "read series references from checkpoint", "checkpoint", checkpointDir) diff --git a/storage/remote/wal_watcher_test.go b/storage/remote/wal_watcher_test.go index 8e009dce2..9e114f390 100644 --- a/storage/remote/wal_watcher_test.go +++ b/storage/remote/wal_watcher_test.go @@ -22,7 +22,6 @@ import ( "testing" "time" - "github.com/prometheus/prometheus/pkg/timestamp" "github.com/prometheus/prometheus/util/testutil" "github.com/prometheus/tsdb" "github.com/prometheus/tsdb/labels" @@ -141,8 +140,7 @@ func Test_readToEnd_noCheckpoint(t *testing.T) { testutil.Ok(t, err) wt := newWriteToMock() - st := timestamp.FromTime(time.Now()) - watcher := NewWALWatcher(nil, "", wt, dir, st) + watcher := NewWALWatcher(nil, "", wt, dir) go watcher.Start() expected := seriesCount @@ -223,8 +221,7 @@ func Test_readToEnd_withCheckpoint(t *testing.T) { _, _, err = w.Segments() testutil.Ok(t, err) wt := newWriteToMock() - st := timestamp.FromTime(time.Now()) - watcher := NewWALWatcher(nil, "", wt, dir, st) + watcher := NewWALWatcher(nil, "", wt, dir) go watcher.Start() expected := seriesCount * 10 * 2 @@ -285,8 +282,7 @@ func Test_readCheckpoint(t *testing.T) { testutil.Ok(t, err) wt := newWriteToMock() - st := timestamp.FromTime(time.Now()) - watcher := NewWALWatcher(nil, "", wt, dir, st) + watcher := NewWALWatcher(nil, "", wt, dir) go watcher.Start() expected := seriesCount * 10 @@ -342,8 +338,7 @@ func Test_checkpoint_seriesReset(t *testing.T) { testutil.Ok(t, err) wt := newWriteToMock() - st := timestamp.FromTime(time.Now()) - watcher := NewWALWatcher(nil, "", wt, dir, st) + watcher := NewWALWatcher(nil, "", wt, dir) go watcher.Start() expected := seriesCount * 10 @@ -372,7 +367,7 @@ func Test_decodeRecord(t *testing.T) { defer os.RemoveAll(dir) wt := newWriteToMock() - watcher := NewWALWatcher(nil, "", wt, dir, 0) + watcher := NewWALWatcher(nil, "", wt, dir) enc := tsdb.RecordEncoder{} buf := enc.Series([]tsdb.RefSeries{tsdb.RefSeries{Ref: 1234, Labels: labels.Labels{}}}, nil) @@ -394,7 +389,7 @@ func Test_decodeRecord_afterStart(t *testing.T) { defer os.RemoveAll(dir) wt := newWriteToMock() - watcher := NewWALWatcher(nil, "", wt, dir, 1) + watcher := NewWALWatcher(nil, "", wt, dir) enc := tsdb.RecordEncoder{} buf := enc.Series([]tsdb.RefSeries{tsdb.RefSeries{Ref: 1234, Labels: labels.Labels{}}}, nil)