Fix panic, out of order chunks, and race warning during WAL replay (#9856)

* Fix panic on WAL replay

Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com>

* Refactor: introduce walSubsetProcessor

walSubsetProcessor packages up the `processWALSamples()` function and
its input and output channels, helping to clarify how these things
relate.

Signed-off-by: Bryan Boreham <bjboreham@gmail.com>

* Refactor: extract more methods onto walSubsetProcessor

This makes the main logic easier to follow.

Signed-off-by: Bryan Boreham <bjboreham@gmail.com>

* Fix race warning by locking processWALSamples

Although we have waited for the processor to finish, we still get a
warning from the race detector because it doesn't know how the different
parts relate.

Add a lock round each batch of samples, so the race detector can see
that we never access series owned by the processor outside of a lock.

Signed-off-by: Bryan Boreham <bjboreham@gmail.com>

* Added test to reproduce issue 9859

Signed-off-by: Marco Pracucci <marco@pracucci.com>

* Remove redundant unit test

Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com>

* Fix out of order chunks during WAL replay

Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com>

* Fix nits

Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com>

Co-authored-by: Ganesh Vernekar <ganeshvern@gmail.com>
Co-authored-by: Marco Pracucci <marco@pracucci.com>
This commit is contained in:
Bryan Boreham 2021-11-25 08:06:14 +00:00 committed by GitHub
parent 427425cc9f
commit 1b74a3812e
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 163 additions and 53 deletions

View file

@ -1013,6 +1013,92 @@ func TestWALSegmentSizeOptions(t *testing.T) {
} }
} }
// https://github.com/prometheus/prometheus/issues/9846
// https://github.com/prometheus/prometheus/issues/9859
func TestWALReplayRaceOnSamplesLoggedBeforeSeries(t *testing.T) {
const (
numRuns = 1
numSamplesBeforeSeriesCreation = 1000
)
// We test both with few and many samples appended after series creation. If samples are < 120 then there's no
// mmap-ed chunk, otherwise there's at least 1 mmap-ed chunk when replaying the WAL.
for _, numSamplesAfterSeriesCreation := range []int{1, 1000} {
for run := 1; run <= numRuns; run++ {
t.Run(fmt.Sprintf("samples after series creation = %d, run = %d", numSamplesAfterSeriesCreation, run), func(t *testing.T) {
testWALReplayRaceOnSamplesLoggedBeforeSeries(t, numSamplesBeforeSeriesCreation, numSamplesAfterSeriesCreation)
})
}
}
}
func testWALReplayRaceOnSamplesLoggedBeforeSeries(t *testing.T, numSamplesBeforeSeriesCreation, numSamplesAfterSeriesCreation int) {
const numSeries = 1000
db := openTestDB(t, nil, nil)
db.DisableCompactions()
for seriesRef := 1; seriesRef <= numSeries; seriesRef++ {
// Log samples before the series is logged to the WAL.
var enc record.Encoder
var samples []record.RefSample
for ts := 0; ts < numSamplesBeforeSeriesCreation; ts++ {
samples = append(samples, record.RefSample{
Ref: chunks.HeadSeriesRef(uint64(seriesRef)),
T: int64(ts),
V: float64(ts),
})
}
err := db.Head().wal.Log(enc.Samples(samples, nil))
require.NoError(t, err)
// Add samples via appender so that they're logged after the series in the WAL.
app := db.Appender(context.Background())
lbls := labels.FromStrings("series_id", strconv.Itoa(seriesRef))
for ts := numSamplesBeforeSeriesCreation; ts < numSamplesBeforeSeriesCreation+numSamplesAfterSeriesCreation; ts++ {
_, err := app.Append(0, lbls, int64(ts), float64(ts))
require.NoError(t, err)
}
require.NoError(t, app.Commit())
}
require.NoError(t, db.Close())
// Reopen the DB, replaying the WAL.
reopenDB, err := Open(db.Dir(), log.NewLogfmtLogger(os.Stderr), nil, nil, nil)
require.NoError(t, err)
t.Cleanup(func() {
require.NoError(t, reopenDB.Close())
})
// Query back chunks for all series.
q, err := reopenDB.ChunkQuerier(context.Background(), math.MinInt64, math.MaxInt64)
require.NoError(t, err)
set := q.Select(false, nil, labels.MustNewMatcher(labels.MatchRegexp, "series_id", ".+"))
actualSeries := 0
for set.Next() {
actualSeries++
actualChunks := 0
chunksIt := set.At().Iterator()
for chunksIt.Next() {
actualChunks++
}
require.NoError(t, chunksIt.Err())
// We expect 1 chunk every 120 samples after series creation.
require.Equalf(t, (numSamplesAfterSeriesCreation/120)+1, actualChunks, "series: %s", set.At().Labels().String())
}
require.NoError(t, set.Err())
require.Equal(t, numSeries, actualSeries)
}
func TestTombstoneClean(t *testing.T) { func TestTombstoneClean(t *testing.T) {
numSamples := int64(10) numSamples := int64(10)

View file

@ -51,13 +51,10 @@ func (h *Head) loadWAL(r *wal.Reader, multiRef map[chunks.HeadSeriesRef]chunks.H
var mmapOverlappingChunks uint64 var mmapOverlappingChunks uint64
// Start workers that each process samples for a partition of the series ID space. // Start workers that each process samples for a partition of the series ID space.
// They are connected through a ring of channels which ensures that all sample batches
// read from the WAL are processed in order.
var ( var (
wg sync.WaitGroup wg sync.WaitGroup
n = runtime.GOMAXPROCS(0) n = runtime.GOMAXPROCS(0)
inputs = make([]chan []record.RefSample, n) processors = make([]walSubsetProcessor, n)
outputs = make([]chan []record.RefSample, n)
exemplarsInput chan record.RefExemplar exemplarsInput chan record.RefExemplar
dec record.Decoder dec record.Decoder
@ -92,9 +89,7 @@ func (h *Head) loadWAL(r *wal.Reader, multiRef map[chunks.HeadSeriesRef]chunks.H
_, ok := err.(*wal.CorruptionErr) _, ok := err.(*wal.CorruptionErr)
if ok || seriesCreationErr != nil { if ok || seriesCreationErr != nil {
for i := 0; i < n; i++ { for i := 0; i < n; i++ {
close(inputs[i]) processors[i].closeAndDrain()
for range outputs[i] {
}
} }
close(exemplarsInput) close(exemplarsInput)
wg.Wait() wg.Wait()
@ -103,14 +98,13 @@ func (h *Head) loadWAL(r *wal.Reader, multiRef map[chunks.HeadSeriesRef]chunks.H
wg.Add(n) wg.Add(n)
for i := 0; i < n; i++ { for i := 0; i < n; i++ {
outputs[i] = make(chan []record.RefSample, 300) processors[i].setup()
inputs[i] = make(chan []record.RefSample, 300)
go func(input <-chan []record.RefSample, output chan<- []record.RefSample) { go func(wp *walSubsetProcessor) {
unknown := h.processWALSamples(h.minValidTime.Load(), input, output) unknown := wp.processWALSamples(h)
unknownRefs.Add(unknown) unknownRefs.Add(unknown)
wg.Done() wg.Done()
}(inputs[i], outputs[i]) }(&processors[i])
} }
wg.Add(1) wg.Add(1)
@ -212,11 +206,20 @@ Outer:
h.lastSeriesID.Store(uint64(walSeries.Ref)) h.lastSeriesID.Store(uint64(walSeries.Ref))
} }
idx := uint64(mSeries.ref) % uint64(n)
// It is possible that some old sample is being processed in processWALSamples that
// could cause race below. So we wait for the goroutine to empty input the buffer and finish
// processing all old samples after emptying the buffer.
processors[idx].waitUntilIdle()
// Lock the subset so we can modify the series object
processors[idx].mx.Lock()
mmc := mmappedChunks[walSeries.Ref] mmc := mmappedChunks[walSeries.Ref]
if created { if created {
// This is the first WAL series record for this series. // This is the first WAL series record for this series.
h.setMMappedChunks(mSeries, mmc) h.resetSeriesWithMMappedChunks(mSeries, mmc)
processors[idx].mx.Unlock()
continue continue
} }
@ -226,23 +229,6 @@ Outer:
multiRef[walSeries.Ref] = mSeries.ref multiRef[walSeries.Ref] = mSeries.ref
idx := uint64(mSeries.ref) % uint64(n)
// It is possible that some old sample is being processed in processWALSamples that
// could cause race below. So we wait for the goroutine to empty input the buffer and finish
// processing all old samples after emptying the buffer.
select {
case <-outputs[idx]: // allow output side to drain to avoid deadlock
default:
}
inputs[idx] <- []record.RefSample{}
for len(inputs[idx]) != 0 {
time.Sleep(1 * time.Millisecond)
select {
case <-outputs[idx]: // allow output side to drain to avoid deadlock
default:
}
}
// Checking if the new m-mapped chunks overlap with the already existing ones. // Checking if the new m-mapped chunks overlap with the already existing ones.
if len(mSeries.mmappedChunks) > 0 && len(mmc) > 0 { if len(mSeries.mmappedChunks) > 0 && len(mmc) > 0 {
if overlapsClosedInterval( if overlapsClosedInterval(
@ -266,12 +252,9 @@ Outer:
} }
// Replacing m-mapped chunks with the new ones (could be empty). // Replacing m-mapped chunks with the new ones (could be empty).
h.setMMappedChunks(mSeries, mmc) h.resetSeriesWithMMappedChunks(mSeries, mmc)
// Any samples replayed till now would already be compacted. Resetting the head chunk. processors[idx].mx.Unlock()
mSeries.nextAt = 0
mSeries.headChunk = nil
mSeries.app = nil
} }
//nolint:staticcheck // Ignore SA6002 relax staticcheck verification. //nolint:staticcheck // Ignore SA6002 relax staticcheck verification.
seriesPool.Put(v) seriesPool.Put(v)
@ -287,12 +270,7 @@ Outer:
m = len(samples) m = len(samples)
} }
for i := 0; i < n; i++ { for i := 0; i < n; i++ {
var buf []record.RefSample shards[i] = processors[i].reuseBuf()
select {
case buf = <-outputs[i]:
default:
}
shards[i] = buf[:0]
} }
for _, sam := range samples[:m] { for _, sam := range samples[:m] {
if r, ok := multiRef[sam.Ref]; ok { if r, ok := multiRef[sam.Ref]; ok {
@ -302,7 +280,7 @@ Outer:
shards[mod] = append(shards[mod], sam) shards[mod] = append(shards[mod], sam)
} }
for i := 0; i < n; i++ { for i := 0; i < n; i++ {
inputs[i] <- shards[i] processors[i].input <- shards[i]
} }
samples = samples[m:] samples = samples[m:]
} }
@ -346,9 +324,7 @@ Outer:
// Signal termination to each worker and wait for it to close its output channel. // Signal termination to each worker and wait for it to close its output channel.
for i := 0; i < n; i++ { for i := 0; i < n; i++ {
close(inputs[i]) processors[i].closeAndDrain()
for range outputs[i] {
}
} }
close(exemplarsInput) close(exemplarsInput)
wg.Wait() wg.Wait()
@ -366,7 +342,8 @@ Outer:
return nil return nil
} }
func (h *Head) setMMappedChunks(mSeries *memSeries, mmc []*mmappedChunk) { // resetSeriesWithMMappedChunks is only used during the WAL replay.
func (h *Head) resetSeriesWithMMappedChunks(mSeries *memSeries, mmc []*mmappedChunk) {
h.metrics.chunksCreated.Add(float64(len(mmc))) h.metrics.chunksCreated.Add(float64(len(mmc)))
h.metrics.chunksRemoved.Add(float64(len(mSeries.mmappedChunks))) h.metrics.chunksRemoved.Add(float64(len(mSeries.mmappedChunks)))
h.metrics.chunks.Add(float64(len(mmc) - len(mSeries.mmappedChunks))) h.metrics.chunks.Add(float64(len(mmc) - len(mSeries.mmappedChunks)))
@ -378,20 +355,51 @@ func (h *Head) setMMappedChunks(mSeries *memSeries, mmc []*mmappedChunk) {
mSeries.mmMaxTime = mmc[len(mmc)-1].maxTime mSeries.mmMaxTime = mmc[len(mmc)-1].maxTime
h.updateMinMaxTime(mmc[0].minTime, mSeries.mmMaxTime) h.updateMinMaxTime(mmc[0].minTime, mSeries.mmMaxTime)
} }
// Any samples replayed till now would already be compacted. Resetting the head chunk.
mSeries.nextAt = 0
mSeries.headChunk = nil
mSeries.app = nil
}
type walSubsetProcessor struct {
mx sync.Mutex // Take this lock while modifying series in the subset.
input chan []record.RefSample
output chan []record.RefSample
}
func (wp *walSubsetProcessor) setup() {
wp.output = make(chan []record.RefSample, 300)
wp.input = make(chan []record.RefSample, 300)
}
func (wp *walSubsetProcessor) closeAndDrain() {
close(wp.input)
for range wp.output {
}
}
// If there is a buffer in the output chan, return it for reuse, otherwise return nil.
func (wp *walSubsetProcessor) reuseBuf() []record.RefSample {
select {
case buf := <-wp.output:
return buf[:0]
default:
}
return nil
} }
// processWALSamples adds the samples it receives to the head and passes // processWALSamples adds the samples it receives to the head and passes
// the buffer received to an output channel for reuse. // the buffer received to an output channel for reuse.
// Samples before the minValidTime timestamp are discarded. // Samples before the minValidTime timestamp are discarded.
func (h *Head) processWALSamples( func (wp *walSubsetProcessor) processWALSamples(h *Head) (unknownRefs uint64) {
minValidTime int64, defer close(wp.output)
input <-chan []record.RefSample, output chan<- []record.RefSample,
) (unknownRefs uint64) {
defer close(output)
minValidTime := h.minValidTime.Load()
mint, maxt := int64(math.MaxInt64), int64(math.MinInt64) mint, maxt := int64(math.MaxInt64), int64(math.MinInt64)
for samples := range input { for samples := range wp.input {
wp.mx.Lock()
for _, s := range samples { for _, s := range samples {
if s.T < minValidTime { if s.T < minValidTime {
continue continue
@ -415,13 +423,29 @@ func (h *Head) processWALSamples(
mint = s.T mint = s.T
} }
} }
output <- samples wp.mx.Unlock()
wp.output <- samples
} }
h.updateMinMaxTime(mint, maxt) h.updateMinMaxTime(mint, maxt)
return unknownRefs return unknownRefs
} }
func (wp *walSubsetProcessor) waitUntilIdle() {
select {
case <-wp.output: // Allow output side to drain to avoid deadlock.
default:
}
wp.input <- []record.RefSample{}
for len(wp.input) != 0 {
time.Sleep(1 * time.Millisecond)
select {
case <-wp.output: // Allow output side to drain to avoid deadlock.
default:
}
}
}
const ( const (
chunkSnapshotRecordTypeSeries uint8 = 1 chunkSnapshotRecordTypeSeries uint8 = 1
chunkSnapshotRecordTypeTombstones uint8 = 2 chunkSnapshotRecordTypeTombstones uint8 = 2