Optimise WAL loading by removing extra map and caching min-time (#9160)

* BenchmarkLoadWAL: close WAL after use

So that goroutines are stopped and resources released

Signed-off-by: Bryan Boreham <bjboreham@gmail.com>

* BenchmarkLoadWAL: make series IDs co-prime with #workers

Series are distributed across workers by taking the modulus of the
ID with the number of workers, so multiples of 100 are a poor choice.

Signed-off-by: Bryan Boreham <bjboreham@gmail.com>

* BenchmarkLoadWAL: simulate mmapped chunks

Real Prometheus cuts chunks every 120 samples, then skips those samples
when re-reading the WAL. Simulate this by creating a single mapped chunk
for each series, since the max time is all the reader looks at.

Signed-off-by: Bryan Boreham <bjboreham@gmail.com>

* Fix comment

Signed-off-by: Bryan Boreham <bjboreham@gmail.com>

* Remove series map from processWALSamples()

The locks that the removed comment said this map reduced contention on are now
sharded 32,000 ways, so they won't be contended. Removing the map saves memory
and goes just as fast.

Signed-off-by: Bryan Boreham <bjboreham@gmail.com>

* loadWAL: Cache the last mmapped chunk time

So we can skip calling append() for samples it will reject.

Signed-off-by: Bryan Boreham <bjboreham@gmail.com>

* Improvements from code review

Signed-off-by: Bryan Boreham <bjboreham@gmail.com>

* Full stops and capitals on comments

Signed-off-by: Bryan Boreham <bjboreham@gmail.com>

* Cache max time in both places mmappedChunks is updated

Including refactor to extract function `setMMappedChunks`, to reduce
code duplication.

Signed-off-by: Bryan Boreham <bjboreham@gmail.com>

* Update head min/max time when mmapped chunks added

This ensures we have the correct values if no WAL samples are added for
that series.

Note that `mSeries.maxTime()` was always `math.MinInt64` before, since
that function doesn't consider mmapped chunks.

Signed-off-by: Bryan Boreham <bjboreham@gmail.com>
This commit is contained in:
Bryan Boreham 2021-08-10 10:23:31 +01:00 committed by GitHub
parent 7407457243
commit 040ef175eb
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 50 additions and 23 deletions

View file

@ -1437,6 +1437,7 @@ type memSeries struct {
ref uint64 ref uint64
lset labels.Labels lset labels.Labels
mmappedChunks []*mmappedChunk mmappedChunks []*mmappedChunk
mmMaxTime int64 // Max time of any mmapped chunk, only used during WAL replay.
headChunk *memChunk headChunk *memChunk
chunkRange int64 chunkRange int64
firstChunkID int firstChunkID int

View file

@ -137,6 +137,7 @@ func BenchmarkLoadWAL(b *testing.B) {
batches int batches int
seriesPerBatch int seriesPerBatch int
samplesPerSeries int samplesPerSeries int
mmappedChunkT int64
}{ }{
{ // Less series and more samples. 2 hour WAL with 1 second scrape interval. { // Less series and more samples. 2 hour WAL with 1 second scrape interval.
batches: 10, batches: 10,
@ -153,6 +154,12 @@ func BenchmarkLoadWAL(b *testing.B) {
seriesPerBatch: 1000, seriesPerBatch: 1000,
samplesPerSeries: 480, samplesPerSeries: 480,
}, },
{ // 2 hour WAL with 15 second scrape interval, and mmapped chunks up to last 100 samples.
batches: 100,
seriesPerBatch: 1000,
samplesPerSeries: 480,
mmappedChunkT: 3800,
},
} }
labelsPerSeries := 5 labelsPerSeries := 5
@ -169,7 +176,7 @@ func BenchmarkLoadWAL(b *testing.B) {
} }
lastExemplarsPerSeries = exemplarsPerSeries lastExemplarsPerSeries = exemplarsPerSeries
// fmt.Println("exemplars per series: ", exemplarsPerSeries) // fmt.Println("exemplars per series: ", exemplarsPerSeries)
b.Run(fmt.Sprintf("batches=%d,seriesPerBatch=%d,samplesPerSeries=%d,exemplarsPerSeries=%d", c.batches, c.seriesPerBatch, c.samplesPerSeries, exemplarsPerSeries), b.Run(fmt.Sprintf("batches=%d,seriesPerBatch=%d,samplesPerSeries=%d,exemplarsPerSeries=%d,mmappedChunkT=%d", c.batches, c.seriesPerBatch, c.samplesPerSeries, exemplarsPerSeries, c.mmappedChunkT),
func(b *testing.B) { func(b *testing.B) {
dir, err := ioutil.TempDir("", "test_load_wal") dir, err := ioutil.TempDir("", "test_load_wal")
require.NoError(b, err) require.NoError(b, err)
@ -190,7 +197,7 @@ func BenchmarkLoadWAL(b *testing.B) {
for j := 1; len(lbls) < labelsPerSeries; j++ { for j := 1; len(lbls) < labelsPerSeries; j++ {
lbls[defaultLabelName+strconv.Itoa(j)] = defaultLabelValue + strconv.Itoa(j) lbls[defaultLabelName+strconv.Itoa(j)] = defaultLabelValue + strconv.Itoa(j)
} }
refSeries = append(refSeries, record.RefSeries{Ref: uint64(i) * 100, Labels: labels.FromMap(lbls)}) refSeries = append(refSeries, record.RefSeries{Ref: uint64(i) * 101, Labels: labels.FromMap(lbls)})
} }
populateTestWAL(b, w, []interface{}{refSeries}) populateTestWAL(b, w, []interface{}{refSeries})
} }
@ -202,7 +209,7 @@ func BenchmarkLoadWAL(b *testing.B) {
refSamples = refSamples[:0] refSamples = refSamples[:0]
for k := j * c.seriesPerBatch; k < (j+1)*c.seriesPerBatch; k++ { for k := j * c.seriesPerBatch; k < (j+1)*c.seriesPerBatch; k++ {
refSamples = append(refSamples, record.RefSample{ refSamples = append(refSamples, record.RefSample{
Ref: uint64(k) * 100, Ref: uint64(k) * 101,
T: int64(i) * 10, T: int64(i) * 10,
V: float64(i) * 100, V: float64(i) * 100,
}) })
@ -211,14 +218,27 @@ func BenchmarkLoadWAL(b *testing.B) {
} }
} }
// Write samples. // Write mmapped chunks.
if c.mmappedChunkT != 0 {
chunkDiskMapper, err := chunks.NewChunkDiskMapper(mmappedChunksDir(dir), chunkenc.NewPool(), chunks.DefaultWriteBufferSize)
require.NoError(b, err)
for k := 0; k < c.batches*c.seriesPerBatch; k++ {
// Create one mmapped chunk per series, with one sample at the given time.
s := newMemSeries(labels.Labels{}, uint64(k)*101, c.mmappedChunkT, nil)
s.append(c.mmappedChunkT, 42, 0, chunkDiskMapper)
s.mmapCurrentHeadChunk(chunkDiskMapper)
}
require.NoError(b, chunkDiskMapper.Close())
}
// Write exemplars.
refExemplars := make([]record.RefExemplar, 0, c.seriesPerBatch) refExemplars := make([]record.RefExemplar, 0, c.seriesPerBatch)
for i := 0; i < exemplarsPerSeries; i++ { for i := 0; i < exemplarsPerSeries; i++ {
for j := 0; j < c.batches; j++ { for j := 0; j < c.batches; j++ {
refExemplars = refExemplars[:0] refExemplars = refExemplars[:0]
for k := j * c.seriesPerBatch; k < (j+1)*c.seriesPerBatch; k++ { for k := j * c.seriesPerBatch; k < (j+1)*c.seriesPerBatch; k++ {
refExemplars = append(refExemplars, record.RefExemplar{ refExemplars = append(refExemplars, record.RefExemplar{
Ref: uint64(k) * 100, Ref: uint64(k) * 101,
T: int64(i) * 10, T: int64(i) * 10,
V: float64(i) * 100, V: float64(i) * 100,
Labels: labels.FromStrings("traceID", fmt.Sprintf("trace-%d", i)), Labels: labels.FromStrings("traceID", fmt.Sprintf("trace-%d", i)),
@ -239,6 +259,8 @@ func BenchmarkLoadWAL(b *testing.B) {
require.NoError(b, err) require.NoError(b, err)
h.Init(0) h.Init(0)
} }
b.StopTimer()
w.Close()
}) })
} }
} }

View file

@ -212,9 +212,7 @@ Outer:
if created { if created {
// This is the first WAL series record for this series. // This is the first WAL series record for this series.
h.metrics.chunksCreated.Add(float64(len(mmc))) h.setMMappedChunks(mSeries, mmc)
h.metrics.chunks.Add(float64(len(mmc)))
mSeries.mmappedChunks = mmc
continue continue
} }
@ -258,16 +256,12 @@ Outer:
} }
// Replacing m-mapped chunks with the new ones (could be empty). // Replacing m-mapped chunks with the new ones (could be empty).
h.metrics.chunksCreated.Add(float64(len(mmc))) h.setMMappedChunks(mSeries, mmc)
h.metrics.chunksRemoved.Add(float64(len(mSeries.mmappedChunks)))
h.metrics.chunks.Add(float64(len(mmc) - len(mSeries.mmappedChunks)))
mSeries.mmappedChunks = mmc
// Any samples replayed till now would already be compacted. Resetting the head chunk. // Any samples replayed till now would already be compacted. Resetting the head chunk.
mSeries.nextAt = 0 mSeries.nextAt = 0
mSeries.headChunk = nil mSeries.headChunk = nil
mSeries.app = nil mSeries.app = nil
h.updateMinMaxTime(mSeries.minTime(), mSeries.maxTime())
} }
//nolint:staticcheck // Ignore SA6002 relax staticcheck verification. //nolint:staticcheck // Ignore SA6002 relax staticcheck verification.
seriesPool.Put(v) seriesPool.Put(v)
@ -359,6 +353,20 @@ Outer:
return nil return nil
} }
func (h *Head) setMMappedChunks(mSeries *memSeries, mmc []*mmappedChunk) {
h.metrics.chunksCreated.Add(float64(len(mmc)))
h.metrics.chunksRemoved.Add(float64(len(mSeries.mmappedChunks)))
h.metrics.chunks.Add(float64(len(mmc) - len(mSeries.mmappedChunks)))
mSeries.mmappedChunks = mmc
// Cache the last mmapped chunk time, so we can skip calling append() for samples it will reject.
if len(mmc) == 0 {
mSeries.mmMaxTime = math.MinInt64
} else {
mSeries.mmMaxTime = mmc[len(mmc)-1].maxTime
h.updateMinMaxTime(mmc[0].minTime, mSeries.mmMaxTime)
}
}
// processWALSamples adds the samples it receives to the head and passes // processWALSamples adds the samples it receives to the head and passes
// the buffer received to an output channel for reuse. // the buffer received to an output channel for reuse.
// Samples before the minValidTime timestamp are discarded. // Samples before the minValidTime timestamp are discarded.
@ -368,9 +376,6 @@ func (h *Head) processWALSamples(
) (unknownRefs uint64) { ) (unknownRefs uint64) {
defer close(output) defer close(output)
// Mitigate lock contention in getByID.
refSeries := map[uint64]*memSeries{}
mint, maxt := int64(math.MaxInt64), int64(math.MinInt64) mint, maxt := int64(math.MaxInt64), int64(math.MinInt64)
for samples := range input { for samples := range input {
@ -378,14 +383,13 @@ func (h *Head) processWALSamples(
if s.T < minValidTime { if s.T < minValidTime {
continue continue
} }
ms := refSeries[s.Ref] ms := h.series.getByID(s.Ref)
if ms == nil {
ms = h.series.getByID(s.Ref)
if ms == nil { if ms == nil {
unknownRefs++ unknownRefs++
continue continue
} }
refSeries[s.Ref] = ms if s.T <= ms.mmMaxTime {
continue
} }
if _, chunkCreated := ms.append(s.T, s.V, 0, h.chunkDiskMapper); chunkCreated { if _, chunkCreated := ms.append(s.T, s.V, 0, h.chunkDiskMapper); chunkCreated {
h.metrics.chunksCreated.Inc() h.metrics.chunksCreated.Inc()