From 7efb830d7021df8b24eb284955432659594334a8 Mon Sep 17 00:00:00 2001
From: Fabian Reinartz
Date: Sat, 7 Oct 2017 15:55:11 +0200
Subject: [PATCH] wal: parallelize sample processing

---
 head.go | 109 +++++++++++++++++++++++++++++++++++++++++++--------------
 wal.go  |   2 +-
 2 files changed, 83 insertions(+), 28 deletions(-)

diff --git a/head.go b/head.go
index 1347b75d91..22221ac73a 100644
--- a/head.go
+++ b/head.go
@@ -15,6 +15,7 @@ package tsdb
 
 import (
 	"math"
+	"runtime"
 	"sort"
 	"sync"
 	"sync/atomic"
@@ -186,29 +187,19 @@ func NewHead(r prometheus.Registerer, l log.Logger, wal WAL, chunkRange int64) (
 	return h, nil
 }
 
-// ReadWAL initializes the head by consuming the write ahead log.
-func (h *Head) ReadWAL() error {
-	defer h.postings.ensureOrder()
+// processWALSamples adds a partition of samples it receives to the head and passes
+// them on to other workers.
+// Samples before the mint timestamp are discarded.
+func (h *Head) processWALSamples(
+	mint int64,
+	partition, total uint64,
+	input <-chan []RefSample, output chan<- []RefSample,
+) (unknownRefs uint64) {
+	defer close(output)
 
-	r := h.wal.Reader()
-	mint := h.MinTime()
-
-	// Track number of samples that referenced a series we don't know about
-	// for error reporting.
-	var unknownRefs int
-
-	seriesFunc := func(series []RefSeries) {
-		for _, s := range series {
-			h.getOrCreateWithID(s.Ref, s.Labels.Hash(), s.Labels)
-
-			if h.lastSeriesID < s.Ref {
-				h.lastSeriesID = s.Ref
-			}
-		}
-	}
-	samplesFunc := func(samples []RefSample) {
+	for samples := range input {
 		for _, s := range samples {
-			if s.T < mint {
+			if s.T < mint || s.Ref%total != partition {
 				continue
 			}
 			ms := h.series.getByID(s.Ref)
@@ -222,6 +213,63 @@ func (h *Head) ReadWAL() error {
 				h.metrics.chunks.Inc()
 			}
 		}
+		output <- samples
+	}
+	return unknownRefs
+}
+
+// ReadWAL initializes the head by consuming the write ahead log.
+func (h *Head) ReadWAL() error {
+	defer h.postings.ensureOrder()
+
+	r := h.wal.Reader()
+	mint := h.MinTime()
+
+	// Track number of samples that referenced a series we don't know about
+	// for error reporting.
+	var unknownRefs uint64
+
+	// Start workers that each process samples for a partition of the series ID space.
+	// They are connected through a ring of channels which ensures that all sample batches
+	// read from the WAL are processed in order.
+	var (
+		n          = runtime.GOMAXPROCS(0)
+		firstInput = make(chan []RefSample, 300)
+		input      = firstInput
+	)
+	for i := 0; i < n; i++ {
+		output := make(chan []RefSample, 300)
+
+		go func(i int, input <-chan []RefSample, output chan<- []RefSample) {
+			unknown := h.processWALSamples(mint, uint64(i), uint64(n), input, output)
+			atomic.AddUint64(&unknownRefs, unknown)
+		}(i, input, output)
+
+		// The output feeds the next worker goroutine. For the last worker,
+		// it feeds the initial input again to reuse the RefSample slices.
+		input = output
+	}
+
+	// TODO(fabxc): series entries spread between samples can starve the sample workers.
+	// Even with buffered channels, this can impact startup time with lots of series churn.
+	// We must not parallelize series creation itself but could make the indexing asynchronous.
+	seriesFunc := func(series []RefSeries) {
+		for _, s := range series {
+			h.getOrCreateWithID(s.Ref, s.Labels.Hash(), s.Labels)
+
+			if h.lastSeriesID < s.Ref {
+				h.lastSeriesID = s.Ref
+			}
+		}
+	}
+	samplesFunc := func(samples []RefSample) {
+		var buf []RefSample
+		select {
+		case buf = <-input:
+		default:
+			buf = make([]RefSample, 0, len(samples)*11/10)
+		}
+		firstInput <- append(buf[:0], samples...)
+	}
 	deletesFunc := func(stones []Stone) {
 		for _, s := range stones {
@@ -234,13 +282,18 @@ func (h *Head) ReadWAL() error {
 		}
 	}
 
+	err := r.Read(seriesFunc, samplesFunc, deletesFunc)
+
+	// Signal termination to first worker and wait for last one to close its output channel.
+	close(firstInput)
+	for range input {
+	}
+	if err != nil {
+		return errors.Wrap(err, "consume WAL")
+	}
 	if unknownRefs > 0 {
 		level.Warn(h.logger).Log("msg", "unknown series references in WAL samples", "count", unknownRefs)
 	}
-
-	if err := r.Read(seriesFunc, samplesFunc, deletesFunc); err != nil {
-		return errors.Wrap(err, "consume WAL")
-	}
 	return nil
 }
 
@@ -1168,10 +1221,12 @@ func (s *memSeries) append(t int64, v float64) (success, chunkCreated bool) {
 		c = s.cut(t)
 		chunkCreated = true
 	}
+	numSamples := c.chunk.NumSamples()
+
 	if c.maxTime >= t {
 		return false, chunkCreated
 	}
-	if c.chunk.NumSamples() > samplesPerChunk/4 && t >= s.nextAt {
+	if numSamples > samplesPerChunk/4 && t >= s.nextAt {
 		c = s.cut(t)
 		chunkCreated = true
 	}
@@ -1179,7 +1234,7 @@ func (s *memSeries) append(t int64, v float64) (success, chunkCreated bool) {
 
 	c.maxTime = t
 
-	if c.chunk.NumSamples() == samplesPerChunk/4 {
+	if numSamples == samplesPerChunk/4 {
 		_, maxt := rangeForTimestamp(c.minTime, s.chunkRange)
 		s.nextAt = computeChunkEndTime(c.minTime, c.maxTime, maxt)
 	}
diff --git a/wal.go b/wal.go
index 9dc45c608a..850ed9c9fc 100644
--- a/wal.go
+++ b/wal.go
@@ -846,7 +846,7 @@ func (r *walReader) Read(
 		deletePool sync.Pool
 	)
 	donec := make(chan struct{})
-	datac := make(chan interface{}, 50)
+	datac := make(chan interface{}, 100)
 
 	go func() {
 		defer close(donec)
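
For reviewers who want to see the worker topology in isolation, the sketch below is a minimal, self-contained illustration of the pattern this patch introduces in ReadWAL: samples are sharded across GOMAXPROCS goroutines by ref modulo the worker count, the workers are chained into a ring of channels so every batch visits each worker in WAL order, and the producer recycles slices handed back by the last worker. It is not tsdb code; the sample type, processPartition, and the atomic counter are simplified, hypothetical stand-ins for RefSample, processWALSamples, and the head append path.

package main

import (
	"fmt"
	"runtime"
	"sync/atomic"
)

// sample is a simplified stand-in for tsdb's RefSample: a series reference plus a value.
type sample struct {
	ref uint64
	v   float64
}

// processPartition consumes batches from input, handles only the samples whose
// ref falls into its partition, and forwards every batch unchanged to output.
// Closing input cascades: the worker closes its own output when it is done.
func processPartition(partition, total uint64, handled *uint64, input <-chan []sample, output chan<- []sample) {
	defer close(output)

	for batch := range input {
		for _, s := range batch {
			if s.ref%total != partition {
				continue
			}
			// Stand-in for appending the sample to the head.
			atomic.AddUint64(handled, 1)
		}
		output <- batch
	}
}

func main() {
	var (
		handled    uint64
		n          = runtime.GOMAXPROCS(0)
		firstInput = make(chan []sample, 300)
		input      = firstInput
	)
	// Chain the workers into a ring: each worker's output feeds the next
	// worker's input, and the last output flows back to the producer.
	for i := 0; i < n; i++ {
		output := make(chan []sample, 300)
		go processPartition(uint64(i), uint64(n), &handled, input, output)
		input = output
	}

	// Produce batches, reusing a slice handed back by the last worker when
	// one is available (mirrors the buf recycling in samplesFunc).
	for b := 0; b < 100; b++ {
		var buf []sample
		select {
		case buf = <-input:
		default:
		}
		buf = buf[:0]
		for j := 0; j < 10; j++ {
			buf = append(buf, sample{ref: uint64(b*10 + j), v: float64(j)})
		}
		firstInput <- buf
	}

	// Closing the first input terminates the ring; draining the last output
	// waits until every worker has processed all batches.
	close(firstInput)
	for range input {
	}
	fmt.Println("samples handled:", atomic.LoadUint64(&handled))
}

The point of partitioning by ref is that each series is only ever appended to by one worker, so WAL replay needs no extra per-series locking, and because batches traverse the ring in read order, samples of a given series are still applied in the order they appear in the WAL.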