WIP cut wal watcher cpu usage by ~40% by using a backoff in the timer

Signed-off-by: Callum Styan <callumstyan@gmail.com>
This commit is contained in:
Callum Styan 2023-02-07 13:02:37 -08:00
parent c70d85baed
commit 9dac334754

View file

@ -35,6 +35,7 @@ import (
const (
readPeriod = 10 * time.Millisecond
maxReadPeriod = 1 * time.Second
checkpointPeriod = 5 * time.Second
segmentCheckPeriod = 100 * time.Millisecond
consumer = "consumer"
@ -83,6 +84,7 @@ type Watcher struct {
startTime time.Time
startTimestamp int64 // the start time as a Prometheus timestamp
sendSamples bool
curBackoff time.Duration
recordsReadMetric *prometheus.CounterVec
recordDecodeFailsMetric prometheus.Counter
@ -401,7 +403,7 @@ func (w *Watcher) watch(segmentNum int, tail bool) error {
continue
}
err = w.readSegment(reader, segmentNum, tail)
_, err = w.readSegment(reader, segmentNum, tail)
// Ignore errors reading to end of segment whilst replaying the WAL.
if !tail {
@ -421,7 +423,7 @@ func (w *Watcher) watch(segmentNum int, tail bool) error {
return nil
case <-readTicker.C:
err = w.readSegment(reader, segmentNum, tail)
ok, err := w.readSegment(reader, segmentNum, tail)
// Ignore all errors reading to end of segment whilst replaying the WAL.
if !tail {
@ -437,7 +439,22 @@ func (w *Watcher) watch(segmentNum int, tail bool) error {
if errors.Cause(err) != io.EOF {
return err
}
if ok {
w.curBackoff = readPeriod
continue
}
fmt.Printf("there was no data, sleeping for %s\n", w.curBackoff)
time.Sleep(w.curBackoff)
w.doubleBackoff()
}
}
}
func (w *Watcher) doubleBackoff() {
w.curBackoff = w.curBackoff * 2
if w.curBackoff > maxReadPeriod {
w.curBackoff = maxReadPeriod
}
}
@ -475,7 +492,7 @@ func (w *Watcher) garbageCollectSeries(segmentNum int) error {
// Read from a segment and pass the details to w.writer.
// Also used with readCheckpoint - implements segmentReadFn.
func (w *Watcher) readSegment(r *LiveReader, segmentNum int, tail bool) error {
func (w *Watcher) readSegment(r *LiveReader, segmentNum int, tail bool) (bool, error) {
var (
dec record.Decoder
series []record.RefSeries
@ -486,6 +503,7 @@ func (w *Watcher) readSegment(r *LiveReader, segmentNum int, tail bool) error {
histogramsToSend []record.RefHistogramSample
floatHistograms []record.RefFloatHistogramSample
floatHistogramsToSend []record.RefFloatHistogramSample
readData bool
)
for r.Next() && !isClosed(w.quit) {
rec := r.Record()
@ -496,9 +514,10 @@ func (w *Watcher) readSegment(r *LiveReader, segmentNum int, tail bool) error {
series, err := dec.Series(rec, series[:0])
if err != nil {
w.recordDecodeFailsMetric.Inc()
return err
return readData, err
}
w.writer.StoreSeries(series, segmentNum)
readData = true
case record.Samples:
// If we're not tailing a segment we can ignore any samples records we see.
@ -509,7 +528,7 @@ func (w *Watcher) readSegment(r *LiveReader, segmentNum int, tail bool) error {
samples, err := dec.Samples(rec, samples[:0])
if err != nil {
w.recordDecodeFailsMetric.Inc()
return err
return readData, err
}
for _, s := range samples {
if s.T > w.startTimestamp {
@ -525,6 +544,7 @@ func (w *Watcher) readSegment(r *LiveReader, segmentNum int, tail bool) error {
w.writer.Append(samplesToSend)
samplesToSend = samplesToSend[:0]
}
readData = true
case record.Exemplars:
// Skip if experimental "exemplars over remote write" is not enabled.
@ -539,9 +559,10 @@ func (w *Watcher) readSegment(r *LiveReader, segmentNum int, tail bool) error {
exemplars, err := dec.Exemplars(rec, exemplars[:0])
if err != nil {
w.recordDecodeFailsMetric.Inc()
return err
return readData, err
}
w.writer.AppendExemplars(exemplars)
readData = true
case record.HistogramSamples:
// Skip if experimental "histograms over remote write" is not enabled.
@ -554,7 +575,7 @@ func (w *Watcher) readSegment(r *LiveReader, segmentNum int, tail bool) error {
histograms, err := dec.HistogramSamples(rec, histograms[:0])
if err != nil {
w.recordDecodeFailsMetric.Inc()
return err
return readData, err
}
for _, h := range histograms {
if h.T > w.startTimestamp {
@ -570,6 +591,8 @@ func (w *Watcher) readSegment(r *LiveReader, segmentNum int, tail bool) error {
w.writer.AppendHistograms(histogramsToSend)
histogramsToSend = histogramsToSend[:0]
}
readData = true
case record.FloatHistogramSamples:
// Skip if experimental "histograms over remote write" is not enabled.
if !w.sendHistograms {
@ -581,7 +604,7 @@ func (w *Watcher) readSegment(r *LiveReader, segmentNum int, tail bool) error {
floatHistograms, err := dec.FloatHistogramSamples(rec, floatHistograms[:0])
if err != nil {
w.recordDecodeFailsMetric.Inc()
return err
return readData, err
}
for _, fh := range floatHistograms {
if fh.T > w.startTimestamp {
@ -597,22 +620,26 @@ func (w *Watcher) readSegment(r *LiveReader, segmentNum int, tail bool) error {
w.writer.AppendFloatHistograms(floatHistogramsToSend)
floatHistogramsToSend = floatHistogramsToSend[:0]
}
readData = true
case record.Tombstones:
readData = true
default:
// Could be corruption, or reading from a WAL from a newer Prometheus.
w.recordDecodeFailsMetric.Inc()
readData = true // should we set this here?
}
}
return errors.Wrapf(r.Err(), "segment %d: %v", segmentNum, r.Err())
return readData, errors.Wrapf(r.Err(), "segment %d: %v", segmentNum, r.Err())
}
// Go through all series in a segment updating the segmentNum, so we can delete older series.
// Used with readCheckpoint - implements segmentReadFn.
func (w *Watcher) readSegmentForGC(r *LiveReader, segmentNum int, _ bool) error {
func (w *Watcher) readSegmentForGC(r *LiveReader, segmentNum int, _ bool) (bool, error) {
var (
dec record.Decoder
series []record.RefSeries
readData bool
)
for r.Next() && !isClosed(w.quit) {
rec := r.Record()
@ -623,8 +650,9 @@ func (w *Watcher) readSegmentForGC(r *LiveReader, segmentNum int, _ bool) error
series, err := dec.Series(rec, series[:0])
if err != nil {
w.recordDecodeFailsMetric.Inc()
return err
return false, err
}
readData = true
w.writer.UpdateSeriesSegment(series, segmentNum)
// Ignore these; we're only interested in series.
@ -637,7 +665,7 @@ func (w *Watcher) readSegmentForGC(r *LiveReader, segmentNum int, _ bool) error
w.recordDecodeFailsMetric.Inc()
}
}
return errors.Wrapf(r.Err(), "segment %d: %v", segmentNum, r.Err())
return readData, errors.Wrapf(r.Err(), "segment %d: %v", segmentNum, r.Err())
}
func (w *Watcher) SetStartTime(t time.Time) {
@ -645,7 +673,7 @@ func (w *Watcher) SetStartTime(t time.Time) {
w.startTimestamp = timestamp.FromTime(t)
}
type segmentReadFn func(w *Watcher, r *LiveReader, segmentNum int, tail bool) error
type segmentReadFn func(w *Watcher, r *LiveReader, segmentNum int, tail bool) (bool, error)
// Read all the series records from a Checkpoint directory.
func (w *Watcher) readCheckpoint(checkpointDir string, readFn segmentReadFn) error {
@ -673,7 +701,7 @@ func (w *Watcher) readCheckpoint(checkpointDir string, readFn segmentReadFn) err
defer sr.Close()
r := NewLiveReader(w.logger, w.readerMetrics, sr)
if err := readFn(w, r, index, false); errors.Cause(err) != io.EOF && err != nil {
if _, err := readFn(w, r, index, false); errors.Cause(err) != io.EOF && err != nil {
return errors.Wrap(err, "readSegment")
}