Process MemPostings.Delete() with GOMAXPROCS workers

We are still seeing lock contention on MemPostings.mtx, and MemPostings.Delete() is by far the most expensive operation on that mutex.

This adds parallelism to that method, trying to reduce the amount of time we spend with the mutex held.

Signed-off-by: Oleg Zaytsev <mail@olegzaytsev.com>
This commit is contained in:
Oleg Zaytsev 2024-09-25 10:38:47 +02:00
parent 5bd8988637
commit e196b977af
No known key found for this signature in database
GPG key ID: 7E9FE9FD48F512EF
2 changed files with 39 additions and 6 deletions

View file

@ -26,6 +26,7 @@ import (
"sync"
"github.com/bboreham/go-loser"
"github.com/cespare/xxhash/v2"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/storage"
@ -293,6 +294,9 @@ func (p *MemPostings) Delete(deleted map[storage.SeriesRef]struct{}, affected ma
p.mtx.Lock()
defer p.mtx.Unlock()
// Deleting label names mutates p.m map, so it should be done from a single goroutine after nobody else is reading it.
deleteLabelNames := make(chan string, len(p.m))
process := func(l labels.Label) {
orig := p.m[l.Name][l.Value]
repl := make([]storage.SeriesRef, 0, len(orig))
@ -305,17 +309,46 @@ func (p *MemPostings) Delete(deleted map[storage.SeriesRef]struct{}, affected ma
p.m[l.Name][l.Value] = repl
} else {
delete(p.m[l.Name], l.Value)
// Delete the key if we removed all values.
if len(p.m[l.Name]) == 0 {
delete(p.m, l.Name) // Delete the key if we removed all values.
deleteLabelNames <- l.Name
}
}
}
for l := range affected { // Create GOMAXPROCS workers.
wg := sync.WaitGroup{}
jobs := make([]chan labels.Label, runtime.GOMAXPROCS(0))
for i := range jobs {
jobs[i] = make(chan labels.Label, 128)
wg.Add(1)
go func(jobs chan labels.Label) {
defer wg.Done()
for l := range jobs {
process(l)
}
process(allPostingsKey) }(jobs[i])
}
// Process all affected labels and the allPostingsKey.
for l := range affected {
j := int(xxhash.Sum64String(l.Name) % uint64(len(jobs)))
jobs[j] <- l
}
j := int(xxhash.Sum64String(allPostingsKey.Name) % uint64(len(jobs)))
jobs[j] <- allPostingsKey
// Close jobs channels and wait all workers to finish.
for i := range jobs {
close(jobs[i])
}
wg.Wait()
// Close deleteLabelNames channel and delete the label names requested.
close(deleteLabelNames)
for name := range deleteLabelNames {
delete(p.m, name)
}
}
// Iter calls f for each postings list. It aborts if f returns an error and returns it.

View file

@ -1025,7 +1025,7 @@ func BenchmarkMemPostings_Delete(b *testing.B) {
return s
}
const total = 1e6 const total = 2e6
allSeries := [total]labels.Labels{}
nameValues := make([]string, 0, 100)
for i := 0; i < total; i++ {