From d3682d701c66a3ccb3d193d1f66490bb86044b46 Mon Sep 17 00:00:00 2001 From: Fabian Reinartz Date: Fri, 6 Oct 2017 14:06:39 +0200 Subject: [PATCH 1/2] wal: decode and process in separate threads. --- head.go | 9 +- head_test.go | 2 +- wal.go | 243 +++++++++++++++++++++++++++++---------------------- wal_test.go | 27 ++---- 4 files changed, 150 insertions(+), 131 deletions(-) diff --git a/head.go b/head.go index 4f3c60c39..1347b75d9 100644 --- a/head.go +++ b/head.go @@ -197,7 +197,7 @@ func (h *Head) ReadWAL() error { // for error reporting. var unknownRefs int - seriesFunc := func(series []RefSeries) error { + seriesFunc := func(series []RefSeries) { for _, s := range series { h.getOrCreateWithID(s.Ref, s.Labels.Hash(), s.Labels) @@ -205,9 +205,8 @@ func (h *Head) ReadWAL() error { h.lastSeriesID = s.Ref } } - return nil } - samplesFunc := func(samples []RefSample) error { + samplesFunc := func(samples []RefSample) { for _, s := range samples { if s.T < mint { continue @@ -223,9 +222,8 @@ func (h *Head) ReadWAL() error { h.metrics.chunks.Inc() } } - return nil } - deletesFunc := func(stones []Stone) error { + deletesFunc := func(stones []Stone) { for _, s := range stones { for _, itv := range s.intervals { if itv.Maxt < mint { @@ -234,7 +232,6 @@ func (h *Head) ReadWAL() error { h.tombstones.add(s.ref, itv) } } - return nil } if unknownRefs > 0 { diff --git a/head_test.go b/head_test.go index 31713b9c1..308ae3e2b 100644 --- a/head_test.go +++ b/head_test.go @@ -93,7 +93,7 @@ func (w *memoryWAL) Reader() WALReader { return w } -func (w *memoryWAL) Read(series SeriesCB, samples SamplesCB, deletes DeletesCB) error { +func (w *memoryWAL) Read(series func([]RefSeries), samples func([]RefSample), deletes func([]Stone)) error { for _, e := range w.entries { switch v := e.(type) { case []RefSeries: diff --git a/wal.go b/wal.go index 467c4e09b..9dc45c608 100644 --- a/wal.go +++ b/wal.go @@ -27,16 +27,16 @@ import ( "sync" "time" - "github.com/prometheus/tsdb/fileutil" "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" "github.com/pkg/errors" - "github.com/prometheus/tsdb/labels" "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/tsdb/fileutil" + "github.com/prometheus/tsdb/labels" ) // WALEntryType indicates what data a WAL entry contains. -type WALEntryType byte +type WALEntryType uint8 const ( // WALMagic is a 4 byte number every WAL segment file starts with. @@ -54,18 +54,6 @@ const ( WALEntryDeletes WALEntryType = 4 ) -// SamplesCB is the callback after reading samples. The passed slice -// is only valid until the call returns. -type SamplesCB func([]RefSample) error - -// SeriesCB is the callback after reading series. The passed slice -// is only valid until the call returns. -type SeriesCB func([]RefSeries) error - -// DeletesCB is the callback after reading deletes. The passed slice -// is only valid until the call returns. 
-type DeletesCB func([]Stone) error - type walMetrics struct { fsyncDuration prometheus.Summary } @@ -104,17 +92,27 @@ func NopWAL() WAL { type nopWAL struct{} -func (nopWAL) Read(SeriesCB, SamplesCB, DeletesCB) error { return nil } -func (w nopWAL) Reader() WALReader { return w } -func (nopWAL) LogSeries([]RefSeries) error { return nil } -func (nopWAL) LogSamples([]RefSample) error { return nil } -func (nopWAL) LogDeletes([]Stone) error { return nil } -func (nopWAL) Truncate(int64, func(uint64) bool) error { return nil } -func (nopWAL) Close() error { return nil } +func (nopWAL) Read( + seriesf func([]RefSeries), + samplesf func([]RefSample), + deletesf func([]Stone), +) error { + return nil +} +func (w nopWAL) Reader() WALReader { return w } +func (nopWAL) LogSeries([]RefSeries) error { return nil } +func (nopWAL) LogSamples([]RefSample) error { return nil } +func (nopWAL) LogDeletes([]Stone) error { return nil } +func (nopWAL) Truncate(int64, func(uint64) bool) error { return nil } +func (nopWAL) Close() error { return nil } // WALReader reads entries from a WAL. type WALReader interface { - Read(SeriesCB, SamplesCB, DeletesCB) error + Read( + seriesf func([]RefSeries), + samplesf func([]RefSample), + deletesf func([]Stone), + ) error } // RefSeries is the series labels with the series ID. @@ -170,7 +168,7 @@ func newCRC32() hash.Hash32 { // SegmentWAL is a write ahead log for series data. type SegmentWAL struct { - mtx sync.Mutex + mtx sync.Mutex metrics *walMetrics dirFile *os.File @@ -238,12 +236,16 @@ type repairingWALReader struct { r WALReader } -func (r *repairingWALReader) Read(series SeriesCB, samples SamplesCB, deletes DeletesCB) error { - err := r.r.Read(series, samples, deletes) +func (r *repairingWALReader) Read( + seriesf func([]RefSeries), + samplesf func([]RefSample), + deletesf func([]Stone), +) error { + err := r.r.Read(seriesf, samplesf, deletesf) if err == nil { return nil } - cerr, ok := err.(walCorruptionErr) + cerr, ok := errors.Cause(err).(walCorruptionErr) if !ok { return err } @@ -336,6 +338,7 @@ func (w *SegmentWAL) Truncate(mint int64, keep func(uint64) bool) error { var ( csf = newSegmentFile(f) crc32 = newCRC32() + decSeries = []RefSeries{} activeSeries = []RefSeries{} ) @@ -345,13 +348,14 @@ func (w *SegmentWAL) Truncate(mint int64, keep func(uint64) bool) error { if rt != WALEntrySeries { continue } - series, err := r.decodeSeries(flag, byt) + decSeries = decSeries[:0] + activeSeries = activeSeries[:0] + + err := r.decodeSeries(flag, byt, &decSeries) if err != nil { return errors.Wrap(err, "decode samples while truncating") } - activeSeries = activeSeries[:0] - - for _, s := range series { + for _, s := range decSeries { if keep(s.Ref) { activeSeries = append(activeSeries, s) } @@ -807,10 +811,6 @@ type walReader struct { curBuf []byte lastOffset int64 // offset after last successfully read entry - seriesBuf []RefSeries - sampleBuf []RefSample - tombstoneBuf []Stone - err error } @@ -831,70 +831,118 @@ func (r *walReader) Err() error { return r.err } -func (r *walReader) Read(seriesf SeriesCB, samplesf SamplesCB, deletesf DeletesCB) error { - if seriesf == nil { - seriesf = func([]RefSeries) error { return nil } - } - if samplesf == nil { - samplesf = func([]RefSample) error { return nil } - } - if deletesf == nil { - deletesf = func([]Stone) error { return nil } - } +func (r *walReader) Read( + seriesf func([]RefSeries), + samplesf func([]RefSample), + deletesf func([]Stone), +) error { + // Concurrency for replaying the WAL is very limited. 
We at least split out decoding and + // processing into separate threads. + // Historically, the processing is the bottleneck with reading and decoding using only + // 15% of the CPU. + var ( + seriesPool sync.Pool + samplePool sync.Pool + deletePool sync.Pool + ) + donec := make(chan struct{}) + datac := make(chan interface{}, 50) + + go func() { + defer close(donec) + + for x := range datac { + switch v := x.(type) { + case []RefSeries: + if seriesf != nil { + seriesf(v) + } + seriesPool.Put(v[:0]) + case []RefSample: + if samplesf != nil { + samplesf(v) + } + samplePool.Put(v[:0]) + case []Stone: + if deletesf != nil { + deletesf(v) + } + deletePool.Put(v[:0]) + default: + level.Error(r.logger).Log("msg", "unexpected data type") + } + } + }() + + var err error for r.next() { et, flag, b := r.at() + // In decoding below we never return a walCorruptionErr for now. // Those should generally be catched by entry decoding before. switch et { case WALEntrySeries: - series, err := r.decodeSeries(flag, b) + var series []RefSeries + if v := seriesPool.Get(); v == nil { + series = make([]RefSeries, 0, 512) + } else { + series = v.([]RefSeries) + } + + err := r.decodeSeries(flag, b, &series) if err != nil { - return errors.Wrap(err, "decode series entry") - } - if err := seriesf(series); err != nil { - return err + err = errors.Wrap(err, "decode series entry") + break } + datac <- series cf := r.current() - for _, s := range series { if cf.minSeries > s.Ref { cf.minSeries = s.Ref } } - case WALEntrySamples: - samples, err := r.decodeSamples(flag, b) + var samples []RefSample + if v := samplePool.Get(); v == nil { + samples = make([]RefSample, 0, 512) + } else { + samples = v.([]RefSample) + } + + err := r.decodeSamples(flag, b, &samples) if err != nil { - return errors.Wrap(err, "decode samples entry") - } - if err := samplesf(samples); err != nil { - return err + err = errors.Wrap(err, "decode samples entry") + break } + datac <- samples // Update the times for the WAL segment file. cf := r.current() - for _, s := range samples { if cf.maxTime < s.T { cf.maxTime = s.T } } - case WALEntryDeletes: - stones, err := r.decodeDeletes(flag, b) + var deletes []Stone + if v := deletePool.Get(); v == nil { + deletes = make([]Stone, 0, 512) + } else { + deletes = v.([]Stone) + } + + err := r.decodeDeletes(flag, b, &deletes) if err != nil { - return errors.Wrap(err, "decode delete entry") - } - if err := deletesf(stones); err != nil { - return err + err = errors.Wrap(err, "decode delete entry") + break } + datac <- deletes + // Update the times for the WAL segment file. - cf := r.current() - - for _, s := range stones { + for _, s := range deletes { for _, iv := range s.intervals { if cf.maxTime < iv.Maxt { cf.maxTime = iv.Maxt @@ -903,27 +951,16 @@ func (r *walReader) Read(seriesf SeriesCB, samplesf SamplesCB, deletesf DeletesC } } } + close(datac) + <-donec - return r.Err() -} - -// nextEntry retrieves the next entry. It is also used as a testing hook. -func (r *walReader) nextEntry() (WALEntryType, byte, []byte, error) { - if r.cur >= len(r.files) { - return 0, 0, nil, io.EOF + if err != nil { + return err } - cf := r.current() - - et, flag, b, err := r.entry(cf) - // If we reached the end of the reader, advance to the next one and close. - // Do not close on the last one as it will still be appended to. - if err == io.EOF && r.cur < len(r.files)-1 { - // Current reader completed. Leave the file open for later reads - // for truncating. 
- r.cur++ - return r.nextEntry() + if r.Err() != nil { + return errors.Wrap(r.Err(), "read entry") } - return et, flag, b, err + return nil } func (r *walReader) at() (WALEntryType, byte, []byte) { @@ -1043,9 +1080,7 @@ func (r *walReader) entry(cr io.Reader) (WALEntryType, byte, []byte, error) { return etype, flag, buf, nil } -func (r *walReader) decodeSeries(flag byte, b []byte) ([]RefSeries, error) { - r.seriesBuf = r.seriesBuf[:0] - +func (r *walReader) decodeSeries(flag byte, b []byte, res *[]RefSeries) error { dec := decbuf{b: b} for len(dec.b) > 0 && dec.err() == nil { @@ -1059,25 +1094,24 @@ func (r *walReader) decodeSeries(flag byte, b []byte) ([]RefSeries, error) { } sort.Sort(lset) - r.seriesBuf = append(r.seriesBuf, RefSeries{ + *res = append(*res, RefSeries{ Ref: ref, Labels: lset, }) } if dec.err() != nil { - return nil, dec.err() + return dec.err() } if len(dec.b) > 0 { - return r.seriesBuf, errors.Errorf("unexpected %d bytes left in entry", len(dec.b)) + return errors.Errorf("unexpected %d bytes left in entry", len(dec.b)) } - return r.seriesBuf, nil + return nil } -func (r *walReader) decodeSamples(flag byte, b []byte) ([]RefSample, error) { +func (r *walReader) decodeSamples(flag byte, b []byte, res *[]RefSample) error { if len(b) == 0 { - return nil, nil + return nil } - r.sampleBuf = r.sampleBuf[:0] dec := decbuf{b: b} var ( @@ -1090,7 +1124,7 @@ func (r *walReader) decodeSamples(flag byte, b []byte) ([]RefSample, error) { dtime := dec.varint64() val := dec.be64() - r.sampleBuf = append(r.sampleBuf, RefSample{ + *res = append(*res, RefSample{ Ref: uint64(int64(baseRef) + dref), T: baseTime + dtime, V: math.Float64frombits(val), @@ -1098,20 +1132,19 @@ func (r *walReader) decodeSamples(flag byte, b []byte) ([]RefSample, error) { } if dec.err() != nil { - return nil, errors.Wrapf(dec.err(), "decode error after %d samples", len(r.sampleBuf)) + return errors.Wrapf(dec.err(), "decode error after %d samples", len(*res)) } if len(dec.b) > 0 { - return r.sampleBuf, errors.Errorf("unexpected %d bytes left in entry", len(dec.b)) + return errors.Errorf("unexpected %d bytes left in entry", len(dec.b)) } - return r.sampleBuf, nil + return nil } -func (r *walReader) decodeDeletes(flag byte, b []byte) ([]Stone, error) { +func (r *walReader) decodeDeletes(flag byte, b []byte, res *[]Stone) error { dec := &decbuf{b: b} - r.tombstoneBuf = r.tombstoneBuf[:0] for dec.len() > 0 && dec.err() == nil { - r.tombstoneBuf = append(r.tombstoneBuf, Stone{ + *res = append(*res, Stone{ ref: dec.be64(), intervals: Intervals{ {Mint: dec.varint64(), Maxt: dec.varint64()}, @@ -1119,10 +1152,10 @@ func (r *walReader) decodeDeletes(flag byte, b []byte) ([]Stone, error) { }) } if dec.err() != nil { - return nil, dec.err() + return dec.err() } if len(dec.b) > 0 { - return r.tombstoneBuf, errors.Errorf("unexpected %d bytes left in entry", len(dec.b)) + return errors.Errorf("unexpected %d bytes left in entry", len(dec.b)) } - return r.tombstoneBuf, nil + return nil } diff --git a/wal_test.go b/wal_test.go index d469a888c..aadce89d7 100644 --- a/wal_test.go +++ b/wal_test.go @@ -187,9 +187,8 @@ func TestSegmentWAL_Truncate(t *testing.T) { var readSeries []RefSeries r := w.Reader() - r.Read(func(s []RefSeries) error { + r.Read(func(s []RefSeries) { readSeries = append(readSeries, s...) 
- return nil }, nil, nil) require.Equal(t, expected, readSeries) @@ -235,33 +234,27 @@ func TestSegmentWAL_Log_Restore(t *testing.T) { resultDeletes [][]Stone ) - serf := func(series []RefSeries) error { + serf := func(series []RefSeries) { if len(series) > 0 { clsets := make([]RefSeries, len(series)) copy(clsets, series) resultSeries = append(resultSeries, clsets) } - - return nil } - smplf := func(smpls []RefSample) error { + smplf := func(smpls []RefSample) { if len(smpls) > 0 { csmpls := make([]RefSample, len(smpls)) copy(csmpls, smpls) resultSamples = append(resultSamples, csmpls) } - - return nil } - delf := func(stones []Stone) error { + delf := func(stones []Stone) { if len(stones) > 0 { cst := make([]Stone, len(stones)) copy(cst, stones) resultDeletes = append(resultDeletes, cst) } - - return nil } require.NoError(t, r.Read(serf, smplf, delf)) @@ -420,26 +413,22 @@ func TestWALRestoreCorrupted(t *testing.T) { r := w2.Reader() - serf := func(l []RefSeries) error { + serf := func(l []RefSeries) { require.Equal(t, 0, len(l)) - return nil } - delf := func([]Stone) error { return nil } // Weird hack to check order of reads. i := 0 - samplf := func(s []RefSample) error { + samplf := func(s []RefSample) { if i == 0 { require.Equal(t, []RefSample{{T: 1, V: 2}}, s) i++ } else { require.Equal(t, []RefSample{{T: 99, V: 100}}, s) } - - return nil } - require.NoError(t, r.Read(serf, samplf, delf)) + require.NoError(t, r.Read(serf, samplf, nil)) require.NoError(t, w2.LogSamples([]RefSample{{T: 99, V: 100}})) require.NoError(t, w2.Close()) @@ -452,7 +441,7 @@ func TestWALRestoreCorrupted(t *testing.T) { r = w3.Reader() i = 0 - require.NoError(t, r.Read(serf, samplf, delf)) + require.NoError(t, r.Read(serf, samplf, nil)) }) } } From 7efb830d7021df8b24eb284955432659594334a8 Mon Sep 17 00:00:00 2001 From: Fabian Reinartz Date: Sat, 7 Oct 2017 15:55:11 +0200 Subject: [PATCH 2/2] wal: parallelize sample processing --- head.go | 109 ++++++++++++++++++++++++++++++++++++++++++-------------- wal.go | 2 +- 2 files changed, 83 insertions(+), 28 deletions(-) diff --git a/head.go b/head.go index 1347b75d9..22221ac73 100644 --- a/head.go +++ b/head.go @@ -15,6 +15,7 @@ package tsdb import ( "math" + "runtime" "sort" "sync" "sync/atomic" @@ -186,29 +187,19 @@ func NewHead(r prometheus.Registerer, l log.Logger, wal WAL, chunkRange int64) ( return h, nil } -// ReadWAL initializes the head by consuming the write ahead log. -func (h *Head) ReadWAL() error { - defer h.postings.ensureOrder() +// processWALSamples adds a partition of samples it receives to the head and passes +// them on to other workers. +// Samples before the mint timestamp are discarded. +func (h *Head) processWALSamples( + mint int64, + partition, total uint64, + input <-chan []RefSample, output chan<- []RefSample, +) (unknownRefs uint64) { + defer close(output) - r := h.wal.Reader() - mint := h.MinTime() - - // Track number of samples that referenced a series we don't know about - // for error reporting. 
- var unknownRefs int - - seriesFunc := func(series []RefSeries) { - for _, s := range series { - h.getOrCreateWithID(s.Ref, s.Labels.Hash(), s.Labels) - - if h.lastSeriesID < s.Ref { - h.lastSeriesID = s.Ref - } - } - } - samplesFunc := func(samples []RefSample) { + for samples := range input { for _, s := range samples { - if s.T < mint { + if s.T < mint || s.Ref%total != partition { continue } ms := h.series.getByID(s.Ref) @@ -222,6 +213,63 @@ func (h *Head) ReadWAL() error { h.metrics.chunks.Inc() } } + output <- samples + } + return unknownRefs +} + +// ReadWAL initializes the head by consuming the write ahead log. +func (h *Head) ReadWAL() error { + defer h.postings.ensureOrder() + + r := h.wal.Reader() + mint := h.MinTime() + + // Track number of samples that referenced a series we don't know about + // for error reporting. + var unknownRefs uint64 + + // Start workers that each process samples for a partition of the series ID space. + // They are connected through a ring of channels which ensures that all sample batches + // read from the WAL are processed in order. + var ( + n = runtime.GOMAXPROCS(0) + firstInput = make(chan []RefSample, 300) + input = firstInput + ) + for i := 0; i < n; i++ { + output := make(chan []RefSample, 300) + + go func(i int, input <-chan []RefSample, output chan<- []RefSample) { + unknown := h.processWALSamples(mint, uint64(i), uint64(n), input, output) + atomic.AddUint64(&unknownRefs, unknown) + }(i, input, output) + + // The output feeds the next worker goroutine. For the last worker, + // it feeds the initial input again to reuse the RefSample slices. + input = output + } + + // TODO(fabxc): series entries spread between samples can starve the sample workers. + // Even with bufferd channels, this can impact startup time with lots of series churn. + // We must not pralellize series creation itself but could make the indexing asynchronous. + seriesFunc := func(series []RefSeries) { + for _, s := range series { + h.getOrCreateWithID(s.Ref, s.Labels.Hash(), s.Labels) + + if h.lastSeriesID < s.Ref { + h.lastSeriesID = s.Ref + } + } + } + samplesFunc := func(samples []RefSample) { + var buf []RefSample + select { + case buf = <-input: + default: + buf = make([]RefSample, 0, len(samples)*11/10) + } + firstInput <- append(buf[:0], samples...) } deletesFunc := func(stones []Stone) { for _, s := range stones { @@ -234,13 +282,18 @@ func (h *Head) ReadWAL() error { } } + err := r.Read(seriesFunc, samplesFunc, deletesFunc) + + // Signal termination to first worker and wait for last one to close its output channel. 
+ close(firstInput) + for range input { + } + if err != nil { + return errors.Wrap(err, "consume WAL") + } if unknownRefs > 0 { level.Warn(h.logger).Log("msg", "unknown series references in WAL samples", "count", unknownRefs) } - - if err := r.Read(seriesFunc, samplesFunc, deletesFunc); err != nil { - return errors.Wrap(err, "consume WAL") - } return nil } @@ -1168,10 +1221,12 @@ func (s *memSeries) append(t int64, v float64) (success, chunkCreated bool) { c = s.cut(t) chunkCreated = true } + numSamples := c.chunk.NumSamples() + if c.maxTime >= t { return false, chunkCreated } - if c.chunk.NumSamples() > samplesPerChunk/4 && t >= s.nextAt { + if numSamples > samplesPerChunk/4 && t >= s.nextAt { c = s.cut(t) chunkCreated = true } @@ -1179,7 +1234,7 @@ func (s *memSeries) append(t int64, v float64) (success, chunkCreated bool) { c.maxTime = t - if c.chunk.NumSamples() == samplesPerChunk/4 { + if numSamples == samplesPerChunk/4 { _, maxt := rangeForTimestamp(c.minTime, s.chunkRange) s.nextAt = computeChunkEndTime(c.minTime, c.maxTime, maxt) } diff --git a/wal.go b/wal.go index 9dc45c608..850ed9c9f 100644 --- a/wal.go +++ b/wal.go @@ -846,7 +846,7 @@ func (r *walReader) Read( deletePool sync.Pool ) donec := make(chan struct{}) - datac := make(chan interface{}, 50) + datac := make(chan interface{}, 100) go func() { defer close(donec)
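
Note for readers skimming the first patch: the core of the new walReader.Read is a two-goroutine pipeline in which the reading loop decodes entries into slices recycled through sync.Pools and a single consumer goroutine runs the (historically CPU-bound) callbacks. Below is a rough, self-contained sketch of that pattern only; the record type, batch sizes, and the decode/process bodies are placeholders, not the patch's actual API.

```go
package main

import (
	"fmt"
	"sync"
)

// sample is a stand-in for the decoded WAL record types
// (RefSeries, RefSample, Stone) handled by the patch.
type sample struct {
	ref uint64
	t   int64
	v   float64
}

func main() {
	var pool sync.Pool                // recycles batch slices between the two goroutines
	datac := make(chan []sample, 100) // buffered hand-off, as in the patch
	donec := make(chan struct{})

	// Consumer: runs the expensive processing off the decode path and
	// returns each batch's backing array to the pool once it is done with it.
	go func() {
		defer close(donec)
		total := 0
		for batch := range datac {
			total += len(batch) // stand-in for the seriesf/samplesf/deletesf callbacks
			pool.Put(batch[:0])
		}
		fmt.Println("processed", total, "samples")
	}()

	// Producer: decodes entries into pooled slices and ships them off.
	for i := 0; i < 1000; i++ {
		var batch []sample
		if v := pool.Get(); v == nil {
			batch = make([]sample, 0, 512)
		} else {
			batch = v.([]sample)
		}
		for j := 0; j < 100; j++ { // stand-in for decodeSamples
			batch = append(batch, sample{ref: uint64(j), t: int64(i), v: 1})
		}
		datac <- batch
	}
	close(datac)
	<-donec
}
```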
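The second patch's Head.ReadWAL is harder to see at a glance: sample batches flow through a ring of workers, each of which only appends the samples whose series ID falls in its partition (s.Ref % total == partition), and the last worker's output loops back to the reader so slices can be reused. The sketch below illustrates that sharding scheme under simplified assumptions; the names (processPartition, refSample) are illustrative and a counter stands in for the actual head append.

```go
package main

import (
	"fmt"
	"runtime"
	"sync/atomic"
)

// refSample mirrors the shape of the WAL sample records.
type refSample struct {
	ref uint64
	t   int64
	v   float64
}

// processPartition handles the samples owned by one partition and passes
// every batch on unchanged to the next worker in the ring.
func processPartition(partition, total uint64, in <-chan []refSample, out chan<- []refSample, owned *uint64) {
	defer close(out)
	for batch := range in {
		for _, s := range batch {
			if s.ref%total != partition {
				continue // another worker owns this series
			}
			atomic.AddUint64(owned, 1) // stand-in for appending to the in-memory series
		}
		out <- batch
	}
}

func main() {
	var owned uint64
	n := runtime.GOMAXPROCS(0)

	// Chain the workers: each worker's output is the next worker's input,
	// and the last output returns to the reader for slice reuse.
	firstInput := make(chan []refSample, 300)
	input := firstInput
	for i := 0; i < n; i++ {
		output := make(chan []refSample, 300)
		go processPartition(uint64(i), uint64(n), input, output, &owned)
		input = output
	}

	// Reader side: feed batches into the ring. The 100 batches fit in the
	// channel buffers, so reuse of returned slices is elided here for brevity.
	for i := 0; i < 100; i++ {
		batch := make([]refSample, 0, 64)
		for j := 0; j < 64; j++ {
			batch = append(batch, refSample{ref: uint64(j), t: int64(i), v: 1})
		}
		firstInput <- batch
	}
	close(firstInput)
	for range input { // drain the last output; this also waits for every worker to finish
	}

	fmt.Println("samples appended:", atomic.LoadUint64(&owned))
}
```

Because ownership is decided by a modulo over the series reference, no two workers ever touch the same series, yet every batch still traverses all workers in order, which is what lets the WAL be replayed in parallel without per-series locking.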