diff --git a/tsdb/db_test.go b/tsdb/db_test.go index a6e85f90c..063a90aba 100644 --- a/tsdb/db_test.go +++ b/tsdb/db_test.go @@ -1013,6 +1013,92 @@ func TestWALSegmentSizeOptions(t *testing.T) { } } +// https://github.com/prometheus/prometheus/issues/9846 +// https://github.com/prometheus/prometheus/issues/9859 +func TestWALReplayRaceOnSamplesLoggedBeforeSeries(t *testing.T) { + const ( + numRuns = 1 + numSamplesBeforeSeriesCreation = 1000 + ) + + // We test both with few and many samples appended after series creation. If samples are < 120 then there's no + // mmap-ed chunk, otherwise there's at least 1 mmap-ed chunk when replaying the WAL. + for _, numSamplesAfterSeriesCreation := range []int{1, 1000} { + for run := 1; run <= numRuns; run++ { + t.Run(fmt.Sprintf("samples after series creation = %d, run = %d", numSamplesAfterSeriesCreation, run), func(t *testing.T) { + testWALReplayRaceOnSamplesLoggedBeforeSeries(t, numSamplesBeforeSeriesCreation, numSamplesAfterSeriesCreation) + }) + } + } +} + +func testWALReplayRaceOnSamplesLoggedBeforeSeries(t *testing.T, numSamplesBeforeSeriesCreation, numSamplesAfterSeriesCreation int) { + const numSeries = 1000 + + db := openTestDB(t, nil, nil) + db.DisableCompactions() + + for seriesRef := 1; seriesRef <= numSeries; seriesRef++ { + // Log samples before the series is logged to the WAL. + var enc record.Encoder + var samples []record.RefSample + + for ts := 0; ts < numSamplesBeforeSeriesCreation; ts++ { + samples = append(samples, record.RefSample{ + Ref: chunks.HeadSeriesRef(uint64(seriesRef)), + T: int64(ts), + V: float64(ts), + }) + } + + err := db.Head().wal.Log(enc.Samples(samples, nil)) + require.NoError(t, err) + + // Add samples via appender so that they're logged after the series in the WAL. 
+ app := db.Appender(context.Background()) + lbls := labels.FromStrings("series_id", strconv.Itoa(seriesRef)) + + for ts := numSamplesBeforeSeriesCreation; ts < numSamplesBeforeSeriesCreation+numSamplesAfterSeriesCreation; ts++ { + _, err := app.Append(0, lbls, int64(ts), float64(ts)) + require.NoError(t, err) + } + require.NoError(t, app.Commit()) + } + + require.NoError(t, db.Close()) + + // Reopen the DB, replaying the WAL. + reopenDB, err := Open(db.Dir(), log.NewLogfmtLogger(os.Stderr), nil, nil, nil) + require.NoError(t, err) + t.Cleanup(func() { + require.NoError(t, reopenDB.Close()) + }) + + // Query back chunks for all series. + q, err := reopenDB.ChunkQuerier(context.Background(), math.MinInt64, math.MaxInt64) + require.NoError(t, err) + + set := q.Select(false, nil, labels.MustNewMatcher(labels.MatchRegexp, "series_id", ".+")) + actualSeries := 0 + + for set.Next() { + actualSeries++ + actualChunks := 0 + + chunksIt := set.At().Iterator() + for chunksIt.Next() { + actualChunks++ + } + require.NoError(t, chunksIt.Err()) + + // We expect 1 chunk every 120 samples after series creation. + require.Equalf(t, (numSamplesAfterSeriesCreation/120)+1, actualChunks, "series: %s", set.At().Labels().String()) + } + + require.NoError(t, set.Err()) + require.Equal(t, numSeries, actualSeries) +} + func TestTombstoneClean(t *testing.T) { numSamples := int64(10) diff --git a/tsdb/head_wal.go b/tsdb/head_wal.go index 5d5c995b3..819128493 100644 --- a/tsdb/head_wal.go +++ b/tsdb/head_wal.go @@ -51,13 +51,10 @@ func (h *Head) loadWAL(r *wal.Reader, multiRef map[chunks.HeadSeriesRef]chunks.H var mmapOverlappingChunks uint64 // Start workers that each process samples for a partition of the series ID space. - // They are connected through a ring of channels which ensures that all sample batches - // read from the WAL are processed in order. 
var ( wg sync.WaitGroup n = runtime.GOMAXPROCS(0) - inputs = make([]chan []record.RefSample, n) - outputs = make([]chan []record.RefSample, n) + processors = make([]walSubsetProcessor, n) exemplarsInput chan record.RefExemplar dec record.Decoder @@ -92,9 +89,7 @@ func (h *Head) loadWAL(r *wal.Reader, multiRef map[chunks.HeadSeriesRef]chunks.H _, ok := err.(*wal.CorruptionErr) if ok || seriesCreationErr != nil { for i := 0; i < n; i++ { - close(inputs[i]) - for range outputs[i] { - } + processors[i].closeAndDrain() } close(exemplarsInput) wg.Wait() @@ -103,14 +98,13 @@ func (h *Head) loadWAL(r *wal.Reader, multiRef map[chunks.HeadSeriesRef]chunks.H wg.Add(n) for i := 0; i < n; i++ { - outputs[i] = make(chan []record.RefSample, 300) - inputs[i] = make(chan []record.RefSample, 300) + processors[i].setup() - go func(input <-chan []record.RefSample, output chan<- []record.RefSample) { - unknown := h.processWALSamples(h.minValidTime.Load(), input, output) + go func(wp *walSubsetProcessor) { + unknown := wp.processWALSamples(h) unknownRefs.Add(unknown) wg.Done() - }(inputs[i], outputs[i]) + }(&processors[i]) } wg.Add(1) @@ -212,11 +206,20 @@ Outer: h.lastSeriesID.Store(uint64(walSeries.Ref)) } + idx := uint64(mSeries.ref) % uint64(n) + // It is possible that some old sample is being processed in processWALSamples that + // could cause race below. So we wait for the goroutine to empty the input buffer and finish + // processing all old samples after emptying the buffer. + processors[idx].waitUntilIdle() + // Lock the subset so we can modify the series object. + processors[idx].mx.Lock() + mmc := mmappedChunks[walSeries.Ref] if created { // This is the first WAL series record for this series.
- h.setMMappedChunks(mSeries, mmc) + h.resetSeriesWithMMappedChunks(mSeries, mmc) + processors[idx].mx.Unlock() continue } @@ -226,23 +229,6 @@ Outer: multiRef[walSeries.Ref] = mSeries.ref - idx := uint64(mSeries.ref) % uint64(n) - // It is possible that some old sample is being processed in processWALSamples that - // could cause race below. So we wait for the goroutine to empty input the buffer and finish - // processing all old samples after emptying the buffer. - select { - case <-outputs[idx]: // allow output side to drain to avoid deadlock - default: - } - inputs[idx] <- []record.RefSample{} - for len(inputs[idx]) != 0 { - time.Sleep(1 * time.Millisecond) - select { - case <-outputs[idx]: // allow output side to drain to avoid deadlock - default: - } - } - // Checking if the new m-mapped chunks overlap with the already existing ones. if len(mSeries.mmappedChunks) > 0 && len(mmc) > 0 { if overlapsClosedInterval( @@ -266,12 +252,9 @@ Outer: } // Replacing m-mapped chunks with the new ones (could be empty). - h.setMMappedChunks(mSeries, mmc) + h.resetSeriesWithMMappedChunks(mSeries, mmc) - // Any samples replayed till now would already be compacted. Resetting the head chunk. - mSeries.nextAt = 0 - mSeries.headChunk = nil - mSeries.app = nil + processors[idx].mx.Unlock() } //nolint:staticcheck // Ignore SA6002 relax staticcheck verification. seriesPool.Put(v) @@ -287,12 +270,7 @@ Outer: m = len(samples) } for i := 0; i < n; i++ { - var buf []record.RefSample - select { - case buf = <-outputs[i]: - default: - } - shards[i] = buf[:0] + shards[i] = processors[i].reuseBuf() } for _, sam := range samples[:m] { if r, ok := multiRef[sam.Ref]; ok { @@ -302,7 +280,7 @@ Outer: shards[mod] = append(shards[mod], sam) } for i := 0; i < n; i++ { - inputs[i] <- shards[i] + processors[i].input <- shards[i] } samples = samples[m:] } @@ -346,9 +324,7 @@ Outer: // Signal termination to each worker and wait for it to close its output channel. 
for i := 0; i < n; i++ { - close(inputs[i]) - for range outputs[i] { - } + processors[i].closeAndDrain() } close(exemplarsInput) wg.Wait() @@ -366,7 +342,8 @@ Outer: return nil } -func (h *Head) setMMappedChunks(mSeries *memSeries, mmc []*mmappedChunk) { +// resetSeriesWithMMappedChunks is only used during the WAL replay. +func (h *Head) resetSeriesWithMMappedChunks(mSeries *memSeries, mmc []*mmappedChunk) { h.metrics.chunksCreated.Add(float64(len(mmc))) h.metrics.chunksRemoved.Add(float64(len(mSeries.mmappedChunks))) h.metrics.chunks.Add(float64(len(mmc) - len(mSeries.mmappedChunks))) @@ -378,20 +355,51 @@ func (h *Head) setMMappedChunks(mSeries *memSeries, mmc []*mmappedChunk) { mSeries.mmMaxTime = mmc[len(mmc)-1].maxTime h.updateMinMaxTime(mmc[0].minTime, mSeries.mmMaxTime) } + + // Any samples replayed till now would already be compacted. Resetting the head chunk. + mSeries.nextAt = 0 + mSeries.headChunk = nil + mSeries.app = nil +} + +type walSubsetProcessor struct { + mx sync.Mutex // Take this lock while modifying series in the subset. + input chan []record.RefSample + output chan []record.RefSample +} + +func (wp *walSubsetProcessor) setup() { + wp.output = make(chan []record.RefSample, 300) + wp.input = make(chan []record.RefSample, 300) +} + +func (wp *walSubsetProcessor) closeAndDrain() { + close(wp.input) + for range wp.output { + } +} + +// If there is a buffer in the output chan, return it for reuse, otherwise return nil. +func (wp *walSubsetProcessor) reuseBuf() []record.RefSample { + select { + case buf := <-wp.output: + return buf[:0] + default: + } + return nil } // processWALSamples adds the samples it receives to the head and passes // the buffer received to an output channel for reuse. // Samples before the minValidTime timestamp are discarded. 
-func (h *Head) processWALSamples( - minValidTime int64, - input <-chan []record.RefSample, output chan<- []record.RefSample, -) (unknownRefs uint64) { - defer close(output) +func (wp *walSubsetProcessor) processWALSamples(h *Head) (unknownRefs uint64) { + defer close(wp.output) + minValidTime := h.minValidTime.Load() mint, maxt := int64(math.MaxInt64), int64(math.MinInt64) - for samples := range input { + for samples := range wp.input { + wp.mx.Lock() for _, s := range samples { if s.T < minValidTime { continue @@ -415,13 +423,29 @@ func (h *Head) processWALSamples( mint = s.T } } - output <- samples + wp.mx.Unlock() + wp.output <- samples } h.updateMinMaxTime(mint, maxt) return unknownRefs } +func (wp *walSubsetProcessor) waitUntilIdle() { + select { + case <-wp.output: // Allow output side to drain to avoid deadlock. + default: + } + wp.input <- []record.RefSample{} + for len(wp.input) != 0 { + time.Sleep(1 * time.Millisecond) + select { + case <-wp.output: // Allow output side to drain to avoid deadlock. + default: + } + } +} + const ( chunkSnapshotRecordTypeSeries uint8 = 1 chunkSnapshotRecordTypeTombstones uint8 = 2