Merge pull request #13628 from bboreham/cleanup-13583

tsdb/wlog: small cleanup of WAL watcher after #13583
This commit is contained in:
Bryan Boreham 2024-02-23 12:39:17 +00:00 committed by GitHub
commit 4d6bb2e0e4
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 11 additions and 26 deletions

View file

@ -262,9 +262,6 @@ func (w *Watcher) loop() {
// Run the watcher, which will tail the WAL until the quit channel is closed // Run the watcher, which will tail the WAL until the quit channel is closed
// or an error case is hit. // or an error case is hit.
func (w *Watcher) Run() error { func (w *Watcher) Run() error {
var lastSegment int
var err error
// We want to ensure this is false across iterations since // We want to ensure this is false across iterations since
// Run will be called again if there was a failure to read the WAL. // Run will be called again if there was a failure to read the WAL.
w.sendSamples = false w.sendSamples = false
@ -289,21 +286,19 @@ func (w *Watcher) Run() error {
return err return err
} }
level.Debug(w.logger).Log("msg", "Tailing WAL", "lastCheckpoint", lastCheckpoint, "checkpointIndex", checkpointIndex, "currentSegment", currentSegment, "lastSegment", lastSegment) level.Debug(w.logger).Log("msg", "Tailing WAL", "lastCheckpoint", lastCheckpoint, "checkpointIndex", checkpointIndex, "currentSegment", currentSegment)
for !isClosed(w.quit) { for !isClosed(w.quit) {
w.currentSegmentMetric.Set(float64(currentSegment)) w.currentSegmentMetric.Set(float64(currentSegment))
level.Debug(w.logger).Log("msg", "Processing segment", "currentSegment", currentSegment)
// Reset the value of lastSegment each iteration, this is to avoid having to wait too long for // Re-check on each iteration in case a new segment was added,
// between reads if we're reading a segment that is not the most recent segment after startup. // because watch() will wait for notifications on the last segment.
_, lastSegment, err = w.firstAndLast() _, lastSegment, err := w.firstAndLast()
if err != nil { if err != nil {
return fmt.Errorf("wal.Segments: %w", err) return fmt.Errorf("wal.Segments: %w", err)
} }
tail := currentSegment >= lastSegment tail := currentSegment >= lastSegment
// On start, after reading the existing WAL for series records, we have a pointer to what is the latest segment. level.Debug(w.logger).Log("msg", "Processing segment", "currentSegment", currentSegment, "lastSegment", lastSegment)
// On subsequent calls to this function, currentSegment will have been incremented and we should open that segment.
if err := w.watch(currentSegment, tail); err != nil && !errors.Is(err, ErrIgnorable) { if err := w.watch(currentSegment, tail); err != nil && !errors.Is(err, ErrIgnorable) {
return err return err
} }

View file

@ -59,46 +59,36 @@ type writeToMock struct {
seriesLock sync.Mutex seriesLock sync.Mutex
seriesSegmentIndexes map[chunks.HeadSeriesRef]int seriesSegmentIndexes map[chunks.HeadSeriesRef]int
// delay reads with a short sleep // If nonzero, delay reads with a short sleep.
delay time.Duration delay time.Duration
} }
func (wtm *writeToMock) Append(s []record.RefSample) bool { func (wtm *writeToMock) Append(s []record.RefSample) bool {
if wtm.delay > 0 { time.Sleep(wtm.delay)
time.Sleep(wtm.delay)
}
wtm.samplesAppended += len(s) wtm.samplesAppended += len(s)
return true return true
} }
func (wtm *writeToMock) AppendExemplars(e []record.RefExemplar) bool { func (wtm *writeToMock) AppendExemplars(e []record.RefExemplar) bool {
if wtm.delay > 0 { time.Sleep(wtm.delay)
time.Sleep(wtm.delay)
}
wtm.exemplarsAppended += len(e) wtm.exemplarsAppended += len(e)
return true return true
} }
func (wtm *writeToMock) AppendHistograms(h []record.RefHistogramSample) bool { func (wtm *writeToMock) AppendHistograms(h []record.RefHistogramSample) bool {
if wtm.delay > 0 { time.Sleep(wtm.delay)
time.Sleep(wtm.delay)
}
wtm.histogramsAppended += len(h) wtm.histogramsAppended += len(h)
return true return true
} }
func (wtm *writeToMock) AppendFloatHistograms(fh []record.RefFloatHistogramSample) bool { func (wtm *writeToMock) AppendFloatHistograms(fh []record.RefFloatHistogramSample) bool {
if wtm.delay > 0 { time.Sleep(wtm.delay)
time.Sleep(wtm.delay)
}
wtm.floatHistogramsAppended += len(fh) wtm.floatHistogramsAppended += len(fh)
return true return true
} }
func (wtm *writeToMock) StoreSeries(series []record.RefSeries, index int) { func (wtm *writeToMock) StoreSeries(series []record.RefSeries, index int) {
if wtm.delay > 0 { time.Sleep(wtm.delay)
time.Sleep(wtm.delay)
}
wtm.UpdateSeriesSegment(series, index) wtm.UpdateSeriesSegment(series, index)
} }