Use zeropool when replaying agent's DB WAL (#12651)

Same as https://github.com/prometheus/prometheus/pull/12189 but for
tsdb/agent/db.go

Signed-off-by: Oleg Zaytsev <mail@olegzaytsev.com>
This commit is contained in:
Oleg Zaytsev 2023-08-04 10:39:55 +02:00 committed by GitHub
parent 5e21b3b2c6
commit 6ea6def0d3
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -42,6 +42,7 @@ import (
"github.com/prometheus/prometheus/tsdb/record"
"github.com/prometheus/prometheus/tsdb/tsdbutil"
"github.com/prometheus/prometheus/tsdb/wlog"
"github.com/prometheus/prometheus/util/zeropool"
)
const (
@ -411,28 +412,13 @@ func (db *DB) loadWAL(r *wlog.Reader, multiRef map[chunks.HeadSeriesRef]chunks.H
dec record.Decoder
lastRef = chunks.HeadSeriesRef(db.nextRef.Load())
decoded = make(chan interface{}, 10)
errCh = make(chan error, 1)
seriesPool = sync.Pool{
New: func() interface{} {
return []record.RefSeries{}
},
}
samplesPool = sync.Pool{
New: func() interface{} {
return []record.RefSample{}
},
}
histogramsPool = sync.Pool{
New: func() interface{} {
return []record.RefHistogramSample{}
},
}
floatHistogramsPool = sync.Pool{
New: func() interface{} {
return []record.RefFloatHistogramSample{}
},
}
decoded = make(chan interface{}, 10)
errCh = make(chan error, 1)
seriesPool zeropool.Pool[[]record.RefSeries]
samplesPool zeropool.Pool[[]record.RefSample]
histogramsPool zeropool.Pool[[]record.RefHistogramSample]
floatHistogramsPool zeropool.Pool[[]record.RefFloatHistogramSample]
)
go func() {
@ -442,7 +428,7 @@ func (db *DB) loadWAL(r *wlog.Reader, multiRef map[chunks.HeadSeriesRef]chunks.H
rec := r.Record()
switch dec.Type(rec) {
case record.Series:
series := seriesPool.Get().([]record.RefSeries)[:0]
series := seriesPool.Get()[:0]
series, err = dec.Series(rec, series)
if err != nil {
errCh <- &wlog.CorruptionErr{
@ -454,7 +440,7 @@ func (db *DB) loadWAL(r *wlog.Reader, multiRef map[chunks.HeadSeriesRef]chunks.H
}
decoded <- series
case record.Samples:
samples := samplesPool.Get().([]record.RefSample)[:0]
samples := samplesPool.Get()[:0]
samples, err = dec.Samples(rec, samples)
if err != nil {
errCh <- &wlog.CorruptionErr{
@ -466,7 +452,7 @@ func (db *DB) loadWAL(r *wlog.Reader, multiRef map[chunks.HeadSeriesRef]chunks.H
}
decoded <- samples
case record.HistogramSamples:
histograms := histogramsPool.Get().([]record.RefHistogramSample)[:0]
histograms := histogramsPool.Get()[:0]
histograms, err = dec.HistogramSamples(rec, histograms)
if err != nil {
errCh <- &wlog.CorruptionErr{
@ -478,7 +464,7 @@ func (db *DB) loadWAL(r *wlog.Reader, multiRef map[chunks.HeadSeriesRef]chunks.H
}
decoded <- histograms
case record.FloatHistogramSamples:
floatHistograms := floatHistogramsPool.Get().([]record.RefFloatHistogramSample)[:0]
floatHistograms := floatHistogramsPool.Get()[:0]
floatHistograms, err = dec.FloatHistogramSamples(rec, floatHistograms)
if err != nil {
errCh <- &wlog.CorruptionErr{
@ -523,8 +509,6 @@ func (db *DB) loadWAL(r *wlog.Reader, multiRef map[chunks.HeadSeriesRef]chunks.H
}
}
}
//nolint:staticcheck
seriesPool.Put(v)
case []record.RefSample:
for _, entry := range v {
@ -539,8 +523,6 @@ func (db *DB) loadWAL(r *wlog.Reader, multiRef map[chunks.HeadSeriesRef]chunks.H
series.lastTs = entry.T
}
}
//nolint:staticcheck
samplesPool.Put(v)
case []record.RefHistogramSample:
for _, entry := range v {
@ -555,7 +537,6 @@ func (db *DB) loadWAL(r *wlog.Reader, multiRef map[chunks.HeadSeriesRef]chunks.H
series.lastTs = entry.T
}
}
//nolint:staticcheck
histogramsPool.Put(v)
case []record.RefFloatHistogramSample:
for _, entry := range v {
@ -570,7 +551,6 @@ func (db *DB) loadWAL(r *wlog.Reader, multiRef map[chunks.HeadSeriesRef]chunks.H
series.lastTs = entry.T
}
}
//nolint:staticcheck
floatHistogramsPool.Put(v)
default:
panic(fmt.Errorf("unexpected decoded type: %T", d))