From c5b3f0221f2884f1d70fdf86866015aeb4cf9b60 Mon Sep 17 00:00:00 2001
From: Chris Marchbanks
Date: Thu, 7 Nov 2019 09:26:45 -0700
Subject: [PATCH] Decode WAL in Separate Goroutine (#6230)

* Make WAL replay benchmark more representative

Signed-off-by: Chris Marchbanks

* Move decoding records from the WAL into goroutine

Decoding the WAL records accounts for a significant amount of time on
startup, and can be done in parallel with creating series/samples to
speed up startup. However, records still must be handled in order, so
only a single goroutine can do the decoding.

benchmark                                                                old ns/op    new ns/op    delta
BenchmarkLoadWAL/batches=10,seriesPerBatch=100,samplesPerSeries=7200-8   481607033    391971490    -18.61%
BenchmarkLoadWAL/batches=10,seriesPerBatch=10000,samplesPerSeries=50-8   836394378    629067006    -24.79%
BenchmarkLoadWAL/batches=10,seriesPerBatch=1000,samplesPerSeries=480-8   348238658    234218667    -32.74%

Signed-off-by: Chris Marchbanks
---
 tsdb/head.go      | 127 ++++++++++++++++++++++++++++++++--------------
 tsdb/head_test.go |  17 ++++---
 2 files changed, 97 insertions(+), 47 deletions(-)

diff --git a/tsdb/head.go b/tsdb/head.go
index 942eceee17..ea4bde3cb3 100644
--- a/tsdb/head.go
+++ b/tsdb/head.go
@@ -389,9 +389,6 @@ func (h *Head) loadWAL(r *wal.Reader, multiRef map[uint64]uint64) (err error) {
 
 	var (
 		dec       record.Decoder
-		series    []record.RefSeries
-		samples   []record.RefSample
-		tstones   []tombstones.Stone
 		allStones = tombstones.NewMemTombstones()
 		shards    = make([][]record.RefSample, n)
 	)
@@ -400,21 +397,82 @@ func (h *Head) loadWAL(r *wal.Reader, multiRef map[uint64]uint64) (err error) {
 			level.Warn(h.logger).Log("msg", "closing memTombstones during wal read", "err", err)
 		}
 	}()
 
-	for r.Next() {
-		series, samples, tstones = series[:0], samples[:0], tstones[:0]
-		rec := r.Record()
-		switch dec.Type(rec) {
-		case record.Series:
-			series, err = dec.Series(rec, series)
-			if err != nil {
-				return &wal.CorruptionErr{
-					Err:     errors.Wrap(err, "decode series"),
+	var (
+		decoded    = make(chan interface{}, 10)
+		errCh      = make(chan error, 1)
+		seriesPool = sync.Pool{
+			New: func() interface{} {
+				return []record.RefSeries{}
+			},
+		}
+		samplesPool = sync.Pool{
+			New: func() interface{} {
+				return []record.RefSample{}
+			},
+		}
+		tstonesPool = sync.Pool{
+			New: func() interface{} {
+				return []tombstones.Stone{}
+			},
+		}
+	)
+	go func() {
+		defer close(decoded)
+		for r.Next() {
+			rec := r.Record()
+			switch dec.Type(rec) {
+			case record.Series:
+				series := seriesPool.Get().([]record.RefSeries)[:0]
+				series, err = dec.Series(rec, series)
+				if err != nil {
+					errCh <- &wal.CorruptionErr{
+						Err:     errors.Wrap(err, "decode series"),
+						Segment: r.Segment(),
+						Offset:  r.Offset(),
+					}
+					return
+				}
+				decoded <- series
+			case record.Samples:
+				samples := samplesPool.Get().([]record.RefSample)[:0]
+				samples, err = dec.Samples(rec, samples)
+				if err != nil {
+					errCh <- &wal.CorruptionErr{
+						Err:     errors.Wrap(err, "decode samples"),
+						Segment: r.Segment(),
+						Offset:  r.Offset(),
+					}
+					return
+				}
+				decoded <- samples
+			case record.Tombstones:
+				tstones := tstonesPool.Get().([]tombstones.Stone)[:0]
+				tstones, err = dec.Tombstones(rec, tstones)
+				if err != nil {
+					errCh <- &wal.CorruptionErr{
+						Err:     errors.Wrap(err, "decode tombstones"),
+						Segment: r.Segment(),
+						Offset:  r.Offset(),
+					}
+					return
+				}
+				decoded <- tstones
+			default:
+				errCh <- &wal.CorruptionErr{
+					Err:     errors.Errorf("invalid record type %v", dec.Type(rec)),
 					Segment: r.Segment(),
 					Offset:  r.Offset(),
 				}
+				return
 			}
-			for _, s := range series {
+		}
+	}()
+
+	for d := range decoded {
+		switch v := d.(type) {
+		case []record.RefSeries:
+			for _, s := range v {
 				series, created := h.getOrCreateWithID(s.Ref, s.Labels.Hash(), s.Labels)
 
 				if !created {
@@ -426,16 +484,10 @@ func (h *Head) loadWAL(r *wal.Reader, multiRef map[uint64]uint64) (err error) {
 					h.lastSeriesID = s.Ref
 				}
 			}
-		case record.Samples:
-			samples, err = dec.Samples(rec, samples)
-			s := samples
-			if err != nil {
-				return &wal.CorruptionErr{
-					Err:     errors.Wrap(err, "decode samples"),
-					Segment: r.Segment(),
-					Offset:  r.Offset(),
-				}
-			}
+			//lint:ignore SA6002 relax staticcheck verification.
+			seriesPool.Put(v)
+		case []record.RefSample:
+			samples := v
 			// We split up the samples into chunks of 5000 samples or less.
 			// With O(300 * #cores) in-flight sample batches, large scrapes could otherwise
 			// cause thousands of very large in flight buffers occupying large amounts
@@ -465,17 +517,10 @@ func (h *Head) loadWAL(r *wal.Reader, multiRef map[uint64]uint64) (err error) {
 				}
 				samples = samples[m:]
 			}
-			samples = s // Keep whole slice for reuse.
-		case record.Tombstones:
-			tstones, err = dec.Tombstones(rec, tstones)
-			if err != nil {
-				return &wal.CorruptionErr{
-					Err:     errors.Wrap(err, "decode tombstones"),
-					Segment: r.Segment(),
-					Offset:  r.Offset(),
-				}
-			}
-			for _, s := range tstones {
+			//lint:ignore SA6002 relax staticcheck verification.
+			samplesPool.Put(v)
+		case []tombstones.Stone:
+			for _, s := range v {
 				for _, itv := range s.Intervals {
 					if itv.Maxt < h.minValidTime {
 						continue
@@ -487,15 +532,19 @@ func (h *Head) loadWAL(r *wal.Reader, multiRef map[uint64]uint64) (err error) {
 					allStones.AddInterval(s.Ref, itv)
 				}
 			}
+			//lint:ignore SA6002 relax staticcheck verification.
+			tstonesPool.Put(v)
 		default:
-			return &wal.CorruptionErr{
-				Err:     errors.Errorf("invalid record type %v", dec.Type(rec)),
-				Segment: r.Segment(),
-				Offset:  r.Offset(),
-			}
+			panic(fmt.Errorf("unexpected decoded type: %T", d))
 		}
 	}
 
+	select {
+	case err := <-errCh:
+		return err
+	default:
+	}
+
 	// Signal termination to each worker and wait for it to close its output channel.
 	for i := 0; i < n; i++ {
 		close(inputs[i])
diff --git a/tsdb/head_test.go b/tsdb/head_test.go
index 191edef39f..39350f08ff 100644
--- a/tsdb/head_test.go
+++ b/tsdb/head_test.go
@@ -106,20 +106,20 @@ func BenchmarkLoadWAL(b *testing.B) {
 		seriesPerBatch   int
 		samplesPerSeries int
 	}{
-		{ // Less series and more samples.
+		{ // Less series and more samples. 2 hour WAL with 1 second scrape interval.
 			batches:          10,
 			seriesPerBatch:   100,
-			samplesPerSeries: 100000,
+			samplesPerSeries: 7200,
 		},
 		{ // More series and less samples.
 			batches:          10,
 			seriesPerBatch:   10000,
-			samplesPerSeries: 100,
+			samplesPerSeries: 50,
 		},
 		{ // In between.
 			batches:          10,
 			seriesPerBatch:   1000,
-			samplesPerSeries: 10000,
+			samplesPerSeries: 480,
 		},
 	}
 
@@ -167,13 +167,14 @@ func BenchmarkLoadWAL(b *testing.B) {
 				}
 			}
 
-			h, err := NewHead(nil, nil, w, 1000)
-			testutil.Ok(b, err)
-
 			b.ResetTimer()
 
 			// Load the WAL.
-			h.Init(0)
+			for i := 0; i < b.N; i++ {
+				h, err := NewHead(nil, nil, w, 1000)
+				testutil.Ok(b, err)
+				h.Init(0)
+			}
 		})
 	}
 }
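The core pattern in the head.go change, shown in isolation: a single producer goroutine decodes records and sends each decoded slice, still in order, over a buffered channel; the consumer applies the batches as they arrive and recycles the slices through a sync.Pool, and a one-slot error channel carries the first decode failure, checked with a non-blocking select once the channel is drained. The sketch below is illustrative only and uses hypothetical sample, decode, and handle stand-ins rather than the Prometheus record/wal APIs.

// decode_sketch.go - minimal, self-contained sketch of the pattern above.
package main

import (
	"fmt"
	"sync"
)

// sample stands in for record.RefSample; decode and handle are hypothetical
// helpers, not Prometheus APIs.
type sample struct {
	ref uint64
	t   int64
	v   float64
}

// decode is a stand-in for record.Decoder: it appends fake samples to buf.
func decode(rec byte, buf []sample) ([]sample, error) {
	for i := 0; i < 2; i++ {
		buf = append(buf, sample{ref: uint64(rec), t: int64(i), v: float64(rec)})
	}
	return buf, nil
}

// handle is a stand-in for feeding one sample into the in-memory head.
func handle(s sample) { _ = s }

func main() {
	records := []byte{1, 2, 3} // stand-in for what a WAL reader yields

	var (
		decoded    = make(chan []sample, 10) // small buffer lets the decoder run ahead
		errCh      = make(chan error, 1)     // first decode error, if any
		samplePool = sync.Pool{
			New: func() interface{} { return []sample{} },
		}
	)

	// Producer: the only goroutine that decodes, so record order is preserved.
	go func() {
		defer close(decoded)
		for _, rec := range records {
			buf := samplePool.Get().([]sample)[:0]
			buf, err := decode(rec, buf)
			if err != nil {
				errCh <- err
				return
			}
			decoded <- buf
		}
	}()

	// Consumer: handles batches in arrival order, then returns slices to the pool.
	for batch := range decoded {
		for _, s := range batch {
			handle(s)
		}
		samplePool.Put(batch) // same SA6002 trade-off the patch accepts
	}

	// Mirrors the non-blocking select in the patch: any error was sent
	// before the producer closed the decoded channel.
	select {
	case err := <-errCh:
		fmt.Println("replay failed:", err)
	default:
		fmt.Println("replay finished")
	}
}

Keeping a single decoder preserves WAL ordering while still overlapping decoding with downstream work; the small channel buffer bounds in-flight memory in the same way the patch's buffer of 10 does.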