Do WBL mmap marker replay concurrently (#12801)

* Benchmark WBL

Extended the WAL benchmark test to cover the WBL as well - added basic
cases for OOO handling - a percentage of series have a percentage of
their samples written as OOO ones.

Signed-off-by: Fiona Liao <fiona.y.liao@gmail.com>
Fiona Liao 2023-09-12 20:31:10 +01:00 committed by GitHub
parent d3a1044354
commit 4419399e4e
2 changed files with 127 additions and 54 deletions
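
For orientation before the diffs: the replay change below stops using waitUntilIdle() plus a mutex to clear a series' OOO head chunk and instead sends the affected series through the same per-shard input channel as the sample batches, so channel ordering alone guarantees that all previously queued samples for that shard were processed first. The following is a minimal, self-contained sketch of that pattern; the names (series, sample, inputItem, worker) are illustrative stand-ins, not the actual Prometheus types (those are wblSubsetProcessor and wblSubsetProcessorInputItem in tsdb/head_wal.go below).

package main

import (
    "fmt"
    "sync"
)

type series struct{ headChunk []int64 }

type sample struct {
    ref int
    t   int64
}

// inputItem mirrors the idea of wblSubsetProcessorInputItem: exactly one of the
// two fields is set, so "clear this series" requests and sample batches travel
// through the same ordered channel.
type inputItem struct {
    resetSeries *series
    samples     []sample
}

func worker(in <-chan inputItem, apply func(sample), wg *sync.WaitGroup) {
    defer wg.Done()
    for item := range in {
        if item.resetSeries != nil {
            // Because this item arrives on the same channel as the samples, every
            // previously queued sample for this shard has already been applied.
            // No waitUntilIdle/mutex is needed.
            item.resetSeries.headChunk = nil
            continue
        }
        for _, s := range item.samples {
            apply(s)
        }
    }
}

func main() {
    in := make(chan inputItem, 4)
    var wg sync.WaitGroup
    wg.Add(1)
    ser := &series{}
    go worker(in, func(s sample) { ser.headChunk = append(ser.headChunk, s.t) }, &wg)

    in <- inputItem{samples: []sample{{ref: 1, t: 10}, {ref: 1, t: 20}}}
    in <- inputItem{resetSeries: ser}                  // replaces the old waitUntilIdle + lock dance
    in <- inputItem{samples: []sample{{ref: 1, t: 5}}} // an OOO sample arriving after the reset
    close(in)
    wg.Wait()
    fmt.Println(ser.headChunk) // [5]
}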

tsdb/head_test.go

@@ -135,7 +135,7 @@ func BenchmarkHeadAppender_Append_Commit_ExistingSeries(b *testing.B) {
     }
 }
 
-func populateTestWAL(t testing.TB, w *wlog.WL, recs []interface{}) {
+func populateTestWL(t testing.TB, w *wlog.WL, recs []interface{}) {
     var enc record.Encoder
     for _, r := range recs {
         switch v := r.(type) {
@@ -147,6 +147,8 @@ func populateTestWAL(t testing.TB, w *wlog.WL, recs []interface{}) {
             require.NoError(t, w.Log(enc.Tombstones(v, nil)))
         case []record.RefExemplar:
             require.NoError(t, w.Log(enc.Exemplars(v, nil)))
+        case []record.RefMmapMarker:
+            require.NoError(t, w.Log(enc.MmapMarkers(v, nil)))
         }
     }
 }
@@ -197,13 +199,18 @@ func readTestWAL(t testing.TB, dir string) (recs []interface{}) {
     return recs
 }
 
-func BenchmarkLoadWAL(b *testing.B) {
+func BenchmarkLoadWLs(b *testing.B) {
     cases := []struct {
         // Total series is (batches*seriesPerBatch).
         batches          int
         seriesPerBatch   int
         samplesPerSeries int
         mmappedChunkT    int64
+        // The first oooSeriesPct*seriesPerBatch series in a batch are selected as "OOO" series.
+        oooSeriesPct float64
+        // The first oooSamplesPct*samplesPerSeries samples in an OOO series are written as OOO samples.
+        oooSamplesPct float64
+        oooCapMax     int64
     }{
         { // Less series and more samples. 2 hour WAL with 1 second scrape interval.
             batches:          10,
@@ -226,6 +233,31 @@ func BenchmarkLoadWAL(b *testing.B) {
             samplesPerSeries: 480,
             mmappedChunkT:    3800,
         },
+        { // A lot of OOO samples (50% series with 50% of samples being OOO).
+            batches:          10,
+            seriesPerBatch:   1000,
+            samplesPerSeries: 480,
+            oooSeriesPct:     0.5,
+            oooSamplesPct:    0.5,
+            oooCapMax:        DefaultOutOfOrderCapMax,
+        },
+        { // Fewer OOO samples (10% of series with 10% of samples being OOO).
+            batches:          10,
+            seriesPerBatch:   1000,
+            samplesPerSeries: 480,
+            oooSeriesPct:     0.1,
+            oooSamplesPct:    0.1,
+        },
+        { // 2 hour WAL with 15 second scrape interval, and mmapped chunks up to last 100 samples.
+            // Four mmap markers per OOO series: 480 * 0.3 = 144, 144 / 32 (DefaultOutOfOrderCapMax) = 4.
+            batches:          100,
+            seriesPerBatch:   1000,
+            samplesPerSeries: 480,
+            mmappedChunkT:    3800,
+            oooSeriesPct:     0.2,
+            oooSamplesPct:    0.3,
+            oooCapMax:        DefaultOutOfOrderCapMax,
+        },
     }
 
     labelsPerSeries := 5
@@ -241,12 +273,17 @@ func BenchmarkLoadWAL(b *testing.B) {
                 continue
             }
             lastExemplarsPerSeries = exemplarsPerSeries
-            b.Run(fmt.Sprintf("batches=%d,seriesPerBatch=%d,samplesPerSeries=%d,exemplarsPerSeries=%d,mmappedChunkT=%d", c.batches, c.seriesPerBatch, c.samplesPerSeries, exemplarsPerSeries, c.mmappedChunkT),
+            b.Run(fmt.Sprintf("batches=%d,seriesPerBatch=%d,samplesPerSeries=%d,exemplarsPerSeries=%d,mmappedChunkT=%d,oooSeriesPct=%.3f,oooSamplesPct=%.3f,oooCapMax=%d", c.batches, c.seriesPerBatch, c.samplesPerSeries, exemplarsPerSeries, c.mmappedChunkT, c.oooSeriesPct, c.oooSamplesPct, c.oooCapMax),
                 func(b *testing.B) {
                     dir := b.TempDir()
 
-                    w, err := wlog.New(nil, nil, dir, wlog.CompressionNone)
+                    wal, err := wlog.New(nil, nil, dir, wlog.CompressionNone)
                     require.NoError(b, err)
+                    var wbl *wlog.WL
+                    if c.oooSeriesPct != 0 {
+                        wbl, err = wlog.New(nil, nil, dir, wlog.CompressionNone)
+                        require.NoError(b, err)
+                    }
 
                     // Write series.
                     refSeries := make([]record.RefSeries, 0, c.seriesPerBatch)
@@ -260,22 +297,33 @@ func BenchmarkLoadWAL(b *testing.B) {
                             }
                             refSeries = append(refSeries, record.RefSeries{Ref: chunks.HeadSeriesRef(i) * 101, Labels: labels.FromMap(lbls)})
                         }
-                        populateTestWAL(b, w, []interface{}{refSeries})
+                        populateTestWL(b, wal, []interface{}{refSeries})
                     }
 
                     // Write samples.
                     refSamples := make([]record.RefSample, 0, c.seriesPerBatch)
+                    oooSeriesPerBatch := int(float64(c.seriesPerBatch) * c.oooSeriesPct)
+                    oooSamplesPerSeries := int(float64(c.samplesPerSeries) * c.oooSamplesPct)
                     for i := 0; i < c.samplesPerSeries; i++ {
                         for j := 0; j < c.batches; j++ {
                             refSamples = refSamples[:0]
-                            for k := j * c.seriesPerBatch; k < (j+1)*c.seriesPerBatch; k++ {
+                            k := j * c.seriesPerBatch
+                            // Skip appending the first oooSamplesPerSeries samples for the series in the batch that
+                            // should have OOO samples. OOO samples are appended after all the in-order samples.
+                            if i < oooSamplesPerSeries {
+                                k += oooSeriesPerBatch
+                            }
+                            for ; k < (j+1)*c.seriesPerBatch; k++ {
                                 refSamples = append(refSamples, record.RefSample{
                                     Ref: chunks.HeadSeriesRef(k) * 101,
                                     T:   int64(i) * 10,
                                     V:   float64(i) * 100,
                                 })
                             }
-                            populateTestWAL(b, w, []interface{}{refSamples})
+                            populateTestWL(b, wal, []interface{}{refSamples})
                         }
                     }
@@ -292,6 +340,10 @@ func BenchmarkLoadWAL(b *testing.B) {
                             // Create one mmapped chunk per series, with one sample at the given time.
                             s := newMemSeries(labels.Labels{}, chunks.HeadSeriesRef(k)*101, defaultIsolationDisabled)
                             s.append(c.mmappedChunkT, 42, 0, cOpts)
+                            // There's only one head chunk because only a single sample is appended. mmapChunks()
+                            // ignores the latest chunk, so we need to cut a new head chunk to guarantee the chunk with
+                            // the sample at c.mmappedChunkT is mmapped.
+                            s.cutNewHeadChunk(c.mmappedChunkT, chunkenc.EncXOR, c.mmappedChunkT)
                             s.mmapChunks(chunkDiskMapper)
                         }
                         require.NoError(b, chunkDiskMapper.Close())
@@ -310,7 +362,39 @@ func BenchmarkLoadWAL(b *testing.B) {
                                     Labels: labels.FromStrings("traceID", fmt.Sprintf("trace-%d", i)),
                                 })
                             }
-                            populateTestWAL(b, w, []interface{}{refExemplars})
+                            populateTestWL(b, wal, []interface{}{refExemplars})
+                        }
+                    }
+
+                    // Write OOO samples and mmap markers.
+                    refMarkers := make([]record.RefMmapMarker, 0, oooSeriesPerBatch)
+                    refSamples = make([]record.RefSample, 0, oooSeriesPerBatch)
+                    for i := 0; i < oooSamplesPerSeries; i++ {
+                        shouldAddMarkers := c.oooCapMax != 0 && i != 0 && int64(i)%c.oooCapMax == 0
+                        for j := 0; j < c.batches; j++ {
+                            refSamples = refSamples[:0]
+                            if shouldAddMarkers {
+                                refMarkers = refMarkers[:0]
+                            }
+                            for k := j * c.seriesPerBatch; k < (j*c.seriesPerBatch)+oooSeriesPerBatch; k++ {
+                                ref := chunks.HeadSeriesRef(k) * 101
+                                if shouldAddMarkers {
+                                    // loadWBL() checks that the marker's MmapRef is less than or equal to the ref
+                                    // for the last mmap chunk. Setting MmapRef to 0 to always pass that check.
+                                    refMarkers = append(refMarkers, record.RefMmapMarker{Ref: ref, MmapRef: 0})
+                                }
+                                refSamples = append(refSamples, record.RefSample{
+                                    Ref: ref,
+                                    T:   int64(i) * 10,
+                                    V:   float64(i) * 100,
+                                })
+                            }
+                            if shouldAddMarkers {
+                                populateTestWL(b, wbl, []interface{}{refMarkers})
+                            }
+                            populateTestWL(b, wal, []interface{}{refSamples})
+                            populateTestWL(b, wbl, []interface{}{refSamples})
                         }
                     }
@@ -320,13 +404,19 @@ func BenchmarkLoadWAL(b *testing.B) {
                     for i := 0; i < b.N; i++ {
                         opts := DefaultHeadOptions()
                         opts.ChunkRange = 1000
-                        opts.ChunkDirRoot = w.Dir()
-                        h, err := NewHead(nil, nil, w, nil, opts, nil)
+                        opts.ChunkDirRoot = dir
+                        if c.oooCapMax > 0 {
+                            opts.OutOfOrderCapMax.Store(c.oooCapMax)
+                        }
+                        h, err := NewHead(nil, nil, wal, wbl, opts, nil)
                         require.NoError(b, err)
                         h.Init(0)
                     }
                     b.StopTimer()
-                    w.Close()
+                    wal.Close()
+                    if wbl != nil {
+                        wbl.Close()
+                    }
                 })
         }
     }
@@ -563,7 +653,7 @@ func TestHead_ReadWAL(t *testing.T) {
                 require.NoError(t, head.Close())
             }()
 
-            populateTestWAL(t, w, entries)
+            populateTestWL(t, w, entries)
 
             require.NoError(t, head.Init(math.MinInt64))
             require.Equal(t, uint64(101), head.lastSeriesID.Load())
@@ -1037,7 +1127,7 @@ func TestHeadDeleteSeriesWithoutSamples(t *testing.T) {
                 require.NoError(t, head.Close())
             }()
 
-            populateTestWAL(t, w, entries)
+            populateTestWL(t, w, entries)
 
             require.NoError(t, head.Init(math.MinInt64))
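
As a side note on the third new benchmark case above (batches=100, oooSeriesPct=0.2, oooSamplesPct=0.3, oooCapMax=DefaultOutOfOrderCapMax): taking DefaultOutOfOrderCapMax = 32 as the case comment states, the per-batch numbers work out as shown in this small verification snippet. It is not part of the change; the variable names merely mirror the benchmark.

package main

import "fmt"

func main() {
    seriesPerBatch, samplesPerSeries := 1000, 480
    oooSeriesPct, oooSamplesPct := 0.2, 0.3
    oooCapMax := int64(32) // DefaultOutOfOrderCapMax per the case comment

    oooSeriesPerBatch := int(float64(seriesPerBatch) * oooSeriesPct)      // 200 OOO series per batch
    oooSamplesPerSeries := int(float64(samplesPerSeries) * oooSamplesPct) // 144 OOO samples per OOO series

    // Same condition as shouldAddMarkers in the benchmark: a marker batch is
    // written whenever i is a non-zero multiple of oooCapMax (i = 32, 64, 96, 128).
    markers := 0
    for i := 0; i < oooSamplesPerSeries; i++ {
        if i != 0 && int64(i)%oooCapMax == 0 {
            markers++
        }
    }
    fmt.Println(oooSeriesPerBatch, oooSamplesPerSeries, markers) // 200 144 4
}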

tsdb/head_wal.go

@@ -762,7 +762,7 @@ func (h *Head) loadWBL(r *wlog.Reader, multiRef map[chunks.HeadSeriesRef]chunks.
             }
             for i := 0; i < concurrency; i++ {
                 if len(shards[i]) > 0 {
-                    processors[i].input <- shards[i]
+                    processors[i].input <- wblSubsetProcessorInputItem{samples: shards[i]}
                     shards[i] = nil
                 }
             }
@@ -790,23 +790,7 @@ func (h *Head) loadWBL(r *wlog.Reader, multiRef map[chunks.HeadSeriesRef]chunks.
                     continue
                 }
                 idx := uint64(ms.ref) % uint64(concurrency)
-                // It is possible that some old sample is being processed in processWALSamples that
-                // could cause race below. So we wait for the goroutine to empty input the buffer and finish
-                // processing all old samples after emptying the buffer.
-                processors[idx].waitUntilIdle()
-                // Lock the subset so we can modify the series object
-                processors[idx].mx.Lock()
-
-                // All samples till now have been m-mapped. Hence clear out the headChunk.
-                // In case some samples slipped through and went into m-map chunks because of changed
-                // chunk size parameters, we are not taking care of that here.
-                // TODO(codesome): see if there is a way to avoid duplicate m-map chunks if
-                // the size of ooo chunk was reduced between restart.
-                if ms.ooo != nil {
-                    ms.ooo.oooHeadChunk = nil
-                }
-
-                processors[idx].mx.Unlock()
+                processors[idx].input <- wblSubsetProcessorInputItem{mmappedSeries: ms}
             }
         default:
             panic(fmt.Errorf("unexpected decodedCh type: %T", d))
@@ -858,14 +842,18 @@ func isErrLoadOOOWal(err error) bool {
 }
 
 type wblSubsetProcessor struct {
-    mx     sync.Mutex // Take this lock while modifying series in the subset.
-    input  chan []record.RefSample
+    input  chan wblSubsetProcessorInputItem
     output chan []record.RefSample
 }
 
+type wblSubsetProcessorInputItem struct {
+    mmappedSeries *memSeries
+    samples       []record.RefSample
+}
+
 func (wp *wblSubsetProcessor) setup() {
     wp.output = make(chan []record.RefSample, 300)
-    wp.input = make(chan []record.RefSample, 300)
+    wp.input = make(chan wblSubsetProcessorInputItem, 300)
 }
 
 func (wp *wblSubsetProcessor) closeAndDrain() {
@@ -892,9 +880,17 @@ func (wp *wblSubsetProcessor) processWBLSamples(h *Head) (unknownRefs uint64) {
     oooCapMax := h.opts.OutOfOrderCapMax.Load()
     // We don't check for minValidTime for ooo samples.
     mint, maxt := int64(math.MaxInt64), int64(math.MinInt64)
-    for samples := range wp.input {
-        wp.mx.Lock()
-        for _, s := range samples {
+    for in := range wp.input {
+        if in.mmappedSeries != nil && in.mmappedSeries.ooo != nil {
+            // All samples till now have been m-mapped. Hence clear out the headChunk.
+            // In case some samples slipped through and went into m-map chunks because of changed
+            // chunk size parameters, we are not taking care of that here.
+            // TODO(codesome): see if there is a way to avoid duplicate m-map chunks if
+            // the size of ooo chunk was reduced between restart.
+            in.mmappedSeries.ooo.oooHeadChunk = nil
+            continue
+        }
+        for _, s := range in.samples {
             ms := h.series.getByID(s.Ref)
             if ms == nil {
                 unknownRefs++
@@ -914,8 +910,10 @@ func (wp *wblSubsetProcessor) processWBLSamples(h *Head) (unknownRefs uint64) {
                 }
             }
         }
-        wp.mx.Unlock()
+        select {
+        case wp.output <- in.samples:
+        default:
+        }
     }
 
     h.updateMinOOOMaxOOOTime(mint, maxt)
@@ -923,21 +921,6 @@ func (wp *wblSubsetProcessor) processWBLSamples(h *Head) (unknownRefs uint64) {
     return unknownRefs
 }
 
-func (wp *wblSubsetProcessor) waitUntilIdle() {
-    select {
-    case <-wp.output: // Allow output side to drain to avoid deadlock.
-    default:
-    }
-    wp.input <- []record.RefSample{}
-    for len(wp.input) != 0 {
-        time.Sleep(10 * time.Microsecond)
-        select {
-        case <-wp.output: // Allow output side to drain to avoid deadlock.
-        default:
-        }
-    }
-}
-
 const (
     chunkSnapshotRecordTypeSeries     uint8 = 1
     chunkSnapshotRecordTypeTombstones uint8 = 2