Merge pull request #15778 from machine424/reuse-pools

feat(tsdb/(head|agent)): reuse pools across segments to reduce garbage during WL replay
This commit is contained in:
Ayoub Mrini 2025-02-17 12:48:17 +01:00 committed by GitHub
commit e04913aea2
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 72 additions and 59 deletions

View file

@ -235,6 +235,12 @@ type DB struct {
appenderPool sync.Pool
bufPool sync.Pool
// These pools are used during WAL replay.
walReplaySeriesPool zeropool.Pool[[]record.RefSeries]
walReplaySamplesPool zeropool.Pool[[]record.RefSample]
walReplayHistogramsPool zeropool.Pool[[]record.RefHistogramSample]
walReplayFloatHistogramsPool zeropool.Pool[[]record.RefFloatHistogramSample]
nextRef *atomic.Uint64
series *stripeSeries
// deleted is a map of (ref IDs that should be deleted from WAL) to (the WAL segment they
@ -426,11 +432,6 @@ func (db *DB) loadWAL(r *wlog.Reader, multiRef map[chunks.HeadSeriesRef]chunks.H
decoded = make(chan interface{}, 10)
errCh = make(chan error, 1)
seriesPool zeropool.Pool[[]record.RefSeries]
samplesPool zeropool.Pool[[]record.RefSample]
histogramsPool zeropool.Pool[[]record.RefHistogramSample]
floatHistogramsPool zeropool.Pool[[]record.RefFloatHistogramSample]
)
go func() {
@ -440,7 +441,7 @@ func (db *DB) loadWAL(r *wlog.Reader, multiRef map[chunks.HeadSeriesRef]chunks.H
rec := r.Record()
switch dec.Type(rec) {
case record.Series:
series := seriesPool.Get()[:0]
series := db.walReplaySeriesPool.Get()[:0]
series, err = dec.Series(rec, series)
if err != nil {
errCh <- &wlog.CorruptionErr{
@ -452,7 +453,7 @@ func (db *DB) loadWAL(r *wlog.Reader, multiRef map[chunks.HeadSeriesRef]chunks.H
}
decoded <- series
case record.Samples:
samples := samplesPool.Get()[:0]
samples := db.walReplaySamplesPool.Get()[:0]
samples, err = dec.Samples(rec, samples)
if err != nil {
errCh <- &wlog.CorruptionErr{
@ -464,7 +465,7 @@ func (db *DB) loadWAL(r *wlog.Reader, multiRef map[chunks.HeadSeriesRef]chunks.H
}
decoded <- samples
case record.HistogramSamples, record.CustomBucketsHistogramSamples:
histograms := histogramsPool.Get()[:0]
histograms := db.walReplayHistogramsPool.Get()[:0]
histograms, err = dec.HistogramSamples(rec, histograms)
if err != nil {
errCh <- &wlog.CorruptionErr{
@ -476,7 +477,7 @@ func (db *DB) loadWAL(r *wlog.Reader, multiRef map[chunks.HeadSeriesRef]chunks.H
}
decoded <- histograms
case record.FloatHistogramSamples, record.CustomBucketsFloatHistogramSamples:
floatHistograms := floatHistogramsPool.Get()[:0]
floatHistograms := db.walReplayFloatHistogramsPool.Get()[:0]
floatHistograms, err = dec.FloatHistogramSamples(rec, floatHistograms)
if err != nil {
errCh <- &wlog.CorruptionErr{
@ -521,7 +522,7 @@ func (db *DB) loadWAL(r *wlog.Reader, multiRef map[chunks.HeadSeriesRef]chunks.H
}
}
}
seriesPool.Put(v)
db.walReplaySeriesPool.Put(v)
case []record.RefSample:
for _, entry := range v {
// Update the lastTs for the series based
@ -535,7 +536,7 @@ func (db *DB) loadWAL(r *wlog.Reader, multiRef map[chunks.HeadSeriesRef]chunks.H
series.lastTs = entry.T
}
}
samplesPool.Put(v)
db.walReplaySamplesPool.Put(v)
case []record.RefHistogramSample:
for _, entry := range v {
// Update the lastTs for the series based
@ -549,7 +550,7 @@ func (db *DB) loadWAL(r *wlog.Reader, multiRef map[chunks.HeadSeriesRef]chunks.H
series.lastTs = entry.T
}
}
histogramsPool.Put(v)
db.walReplayHistogramsPool.Put(v)
case []record.RefFloatHistogramSample:
for _, entry := range v {
// Update the lastTs for the series based
@ -563,7 +564,7 @@ func (db *DB) loadWAL(r *wlog.Reader, multiRef map[chunks.HeadSeriesRef]chunks.H
series.lastTs = entry.T
}
}
floatHistogramsPool.Put(v)
db.walReplayFloatHistogramsPool.Put(v)
default:
panic(fmt.Errorf("unexpected decoded type: %T", d))
}

View file

@ -94,6 +94,16 @@ type Head struct {
bytesPool zeropool.Pool[[]byte]
memChunkPool sync.Pool
// These pools are used during WAL/WBL replay.
wlReplaySeriesPool zeropool.Pool[[]record.RefSeries]
wlReplaySamplesPool zeropool.Pool[[]record.RefSample]
wlReplaytStonesPool zeropool.Pool[[]tombstones.Stone]
wlReplayExemplarsPool zeropool.Pool[[]record.RefExemplar]
wlReplayHistogramsPool zeropool.Pool[[]record.RefHistogramSample]
wlReplayFloatHistogramsPool zeropool.Pool[[]record.RefFloatHistogramSample]
wlReplayMetadataPool zeropool.Pool[[]record.RefMetadata]
wlReplayMmapMarkersPool zeropool.Pool[[]record.RefMmapMarker]
// All series addressable by their ID or hash.
series *stripeSeries

View file

@ -46,6 +46,7 @@ import (
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb/chunkenc"
"github.com/prometheus/prometheus/tsdb/chunks"
"github.com/prometheus/prometheus/tsdb/fileutil"
"github.com/prometheus/prometheus/tsdb/index"
"github.com/prometheus/prometheus/tsdb/record"
"github.com/prometheus/prometheus/tsdb/tombstones"
@ -440,27 +441,41 @@ func BenchmarkLoadWLs(b *testing.B) {
// BenchmarkLoadRealWLs will be skipped unless the BENCHMARK_LOAD_REAL_WLS_DIR environment variable is set.
// BENCHMARK_LOAD_REAL_WLS_DIR should be the folder where `wal` and `chunks_head` are located.
//
// Using an absolute path for BENCHMARK_LOAD_REAL_WLS_DIR is recommended.
//
// Because WLs loading may alter BENCHMARK_LOAD_REAL_WLS_DIR which can affect benchmark results and to ensure consistency,
// a copy of BENCHMARK_LOAD_REAL_WLS_DIR is made for each iteration and deleted at the end.
// Make sure there is sufficient disk space for that.
func BenchmarkLoadRealWLs(b *testing.B) {
dir := os.Getenv("BENCHMARK_LOAD_REAL_WLS_DIR")
if dir == "" {
srcDir := os.Getenv("BENCHMARK_LOAD_REAL_WLS_DIR")
if srcDir == "" {
b.SkipNow()
}
wal, err := wlog.New(nil, nil, filepath.Join(dir, "wal"), wlog.CompressionNone)
require.NoError(b, err)
b.Cleanup(func() { wal.Close() })
wbl, err := wlog.New(nil, nil, filepath.Join(dir, "wbl"), wlog.CompressionNone)
require.NoError(b, err)
b.Cleanup(func() { wbl.Close() })
// Load the WAL.
for i := 0; i < b.N; i++ {
b.StopTimer()
dir := b.TempDir()
require.NoError(b, fileutil.CopyDirs(srcDir, dir))
wal, err := wlog.New(nil, nil, filepath.Join(dir, "wal"), wlog.CompressionNone)
require.NoError(b, err)
b.Cleanup(func() { wal.Close() })
wbl, err := wlog.New(nil, nil, filepath.Join(dir, "wbl"), wlog.CompressionNone)
require.NoError(b, err)
b.Cleanup(func() { wbl.Close() })
b.StartTimer()
opts := DefaultHeadOptions()
opts.ChunkDirRoot = dir
h, err := NewHead(nil, nil, wal, wbl, opts, nil)
require.NoError(b, err)
require.NoError(b, h.Init(0))
b.StopTimer()
require.NoError(b, os.RemoveAll(dir))
}
}

View file

@ -39,7 +39,6 @@ import (
"github.com/prometheus/prometheus/tsdb/record"
"github.com/prometheus/prometheus/tsdb/tombstones"
"github.com/prometheus/prometheus/tsdb/wlog"
"github.com/prometheus/prometheus/util/zeropool"
)
// histogramRecord combines both RefHistogramSample and RefFloatHistogramSample
@ -73,14 +72,6 @@ func (h *Head) loadWAL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch
decoded = make(chan interface{}, 10)
decodeErr, seriesCreationErr error
seriesPool zeropool.Pool[[]record.RefSeries]
samplesPool zeropool.Pool[[]record.RefSample]
tstonesPool zeropool.Pool[[]tombstones.Stone]
exemplarsPool zeropool.Pool[[]record.RefExemplar]
histogramsPool zeropool.Pool[[]record.RefHistogramSample]
floatHistogramsPool zeropool.Pool[[]record.RefFloatHistogramSample]
metadataPool zeropool.Pool[[]record.RefMetadata]
)
defer func() {
@ -140,7 +131,7 @@ func (h *Head) loadWAL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch
rec := r.Record()
switch dec.Type(rec) {
case record.Series:
series := seriesPool.Get()[:0]
series := h.wlReplaySeriesPool.Get()[:0]
series, err = dec.Series(rec, series)
if err != nil {
decodeErr = &wlog.CorruptionErr{
@ -152,7 +143,7 @@ func (h *Head) loadWAL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch
}
decoded <- series
case record.Samples:
samples := samplesPool.Get()[:0]
samples := h.wlReplaySamplesPool.Get()[:0]
samples, err = dec.Samples(rec, samples)
if err != nil {
decodeErr = &wlog.CorruptionErr{
@ -164,7 +155,7 @@ func (h *Head) loadWAL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch
}
decoded <- samples
case record.Tombstones:
tstones := tstonesPool.Get()[:0]
tstones := h.wlReplaytStonesPool.Get()[:0]
tstones, err = dec.Tombstones(rec, tstones)
if err != nil {
decodeErr = &wlog.CorruptionErr{
@ -176,7 +167,7 @@ func (h *Head) loadWAL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch
}
decoded <- tstones
case record.Exemplars:
exemplars := exemplarsPool.Get()[:0]
exemplars := h.wlReplayExemplarsPool.Get()[:0]
exemplars, err = dec.Exemplars(rec, exemplars)
if err != nil {
decodeErr = &wlog.CorruptionErr{
@ -188,7 +179,7 @@ func (h *Head) loadWAL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch
}
decoded <- exemplars
case record.HistogramSamples, record.CustomBucketsHistogramSamples:
hists := histogramsPool.Get()[:0]
hists := h.wlReplayHistogramsPool.Get()[:0]
hists, err = dec.HistogramSamples(rec, hists)
if err != nil {
decodeErr = &wlog.CorruptionErr{
@ -200,7 +191,7 @@ func (h *Head) loadWAL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch
}
decoded <- hists
case record.FloatHistogramSamples, record.CustomBucketsFloatHistogramSamples:
hists := floatHistogramsPool.Get()[:0]
hists := h.wlReplayFloatHistogramsPool.Get()[:0]
hists, err = dec.FloatHistogramSamples(rec, hists)
if err != nil {
decodeErr = &wlog.CorruptionErr{
@ -212,7 +203,7 @@ func (h *Head) loadWAL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch
}
decoded <- hists
case record.Metadata:
meta := metadataPool.Get()[:0]
meta := h.wlReplayMetadataPool.Get()[:0]
meta, err := dec.Metadata(rec, meta)
if err != nil {
decodeErr = &wlog.CorruptionErr{
@ -251,7 +242,7 @@ Outer:
idx := uint64(mSeries.ref) % uint64(concurrency)
processors[idx].input <- walSubsetProcessorInputItem{walSeriesRef: walSeries.Ref, existingSeries: mSeries}
}
seriesPool.Put(v)
h.wlReplaySeriesPool.Put(v)
case []record.RefSample:
samples := v
minValidTime := h.minValidTime.Load()
@ -287,7 +278,7 @@ Outer:
}
samples = samples[m:]
}
samplesPool.Put(v)
h.wlReplaySamplesPool.Put(v)
case []tombstones.Stone:
for _, s := range v {
for _, itv := range s.Intervals {
@ -301,12 +292,12 @@ Outer:
h.tombstones.AddInterval(s.Ref, itv)
}
}
tstonesPool.Put(v)
h.wlReplaytStonesPool.Put(v)
case []record.RefExemplar:
for _, e := range v {
exemplarsInput <- e
}
exemplarsPool.Put(v)
h.wlReplayExemplarsPool.Put(v)
case []record.RefHistogramSample:
samples := v
minValidTime := h.minValidTime.Load()
@ -342,7 +333,7 @@ Outer:
}
samples = samples[m:]
}
histogramsPool.Put(v)
h.wlReplayHistogramsPool.Put(v)
case []record.RefFloatHistogramSample:
samples := v
minValidTime := h.minValidTime.Load()
@ -378,7 +369,7 @@ Outer:
}
samples = samples[m:]
}
floatHistogramsPool.Put(v)
h.wlReplayFloatHistogramsPool.Put(v)
case []record.RefMetadata:
for _, m := range v {
s := h.series.getByID(m.Ref)
@ -392,7 +383,7 @@ Outer:
Help: m.Help,
}
}
metadataPool.Put(v)
h.wlReplayMetadataPool.Put(v)
default:
panic(fmt.Errorf("unexpected decoded type: %T", d))
}
@ -659,12 +650,8 @@ func (h *Head) loadWBL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch
shards = make([][]record.RefSample, concurrency)
histogramShards = make([][]histogramRecord, concurrency)
decodedCh = make(chan interface{}, 10)
decodeErr error
samplesPool zeropool.Pool[[]record.RefSample]
markersPool zeropool.Pool[[]record.RefMmapMarker]
histogramSamplesPool zeropool.Pool[[]record.RefHistogramSample]
floatHistogramSamplesPool zeropool.Pool[[]record.RefFloatHistogramSample]
decodedCh = make(chan interface{}, 10)
decodeErr error
)
defer func() {
@ -700,7 +687,7 @@ func (h *Head) loadWBL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch
rec := r.Record()
switch dec.Type(rec) {
case record.Samples:
samples := samplesPool.Get()[:0]
samples := h.wlReplaySamplesPool.Get()[:0]
samples, err = dec.Samples(rec, samples)
if err != nil {
decodeErr = &wlog.CorruptionErr{
@ -712,7 +699,7 @@ func (h *Head) loadWBL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch
}
decodedCh <- samples
case record.MmapMarkers:
markers := markersPool.Get()[:0]
markers := h.wlReplayMmapMarkersPool.Get()[:0]
markers, err = dec.MmapMarkers(rec, markers)
if err != nil {
decodeErr = &wlog.CorruptionErr{
@ -724,7 +711,7 @@ func (h *Head) loadWBL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch
}
decodedCh <- markers
case record.HistogramSamples, record.CustomBucketsHistogramSamples:
hists := histogramSamplesPool.Get()[:0]
hists := h.wlReplayHistogramsPool.Get()[:0]
hists, err = dec.HistogramSamples(rec, hists)
if err != nil {
decodeErr = &wlog.CorruptionErr{
@ -736,7 +723,7 @@ func (h *Head) loadWBL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch
}
decodedCh <- hists
case record.FloatHistogramSamples, record.CustomBucketsFloatHistogramSamples:
hists := floatHistogramSamplesPool.Get()[:0]
hists := h.wlReplayFloatHistogramsPool.Get()[:0]
hists, err = dec.FloatHistogramSamples(rec, hists)
if err != nil {
decodeErr = &wlog.CorruptionErr{
@ -787,7 +774,7 @@ func (h *Head) loadWBL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch
}
samples = samples[m:]
}
samplesPool.Put(v)
h.wlReplaySamplesPool.Put(v)
case []record.RefMmapMarker:
markers := v
for _, rm := range markers {
@ -842,7 +829,7 @@ func (h *Head) loadWBL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch
}
samples = samples[m:]
}
histogramSamplesPool.Put(v)
h.wlReplayHistogramsPool.Put(v)
case []record.RefFloatHistogramSample:
samples := v
// We split up the samples into chunks of 5000 samples or less.
@ -874,7 +861,7 @@ func (h *Head) loadWBL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch
}
samples = samples[m:]
}
floatHistogramSamplesPool.Put(v)
h.wlReplayFloatHistogramsPool.Put(v)
default:
panic(fmt.Errorf("unexpected decodedCh type: %T", d))
}