diff --git a/tsdb/index/postings.go b/tsdb/index/postings.go index bfe74c323..25780e4ad 100644 --- a/tsdb/index/postings.go +++ b/tsdb/index/postings.go @@ -26,6 +26,7 @@ import ( "sync" "github.com/bboreham/go-loser" + "github.com/cespare/xxhash/v2" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/storage" @@ -293,6 +294,9 @@ func (p *MemPostings) Delete(deleted map[storage.SeriesRef]struct{}, affected ma p.mtx.Lock() defer p.mtx.Unlock() + // Deleting label names mutates p.m map, so it should be done from a single goroutine after nobody else is reading it. + deleteLabelNames := make(chan string, len(p.m)) + process := func(l labels.Label) { orig := p.m[l.Name][l.Value] repl := make([]storage.SeriesRef, 0, len(orig)) @@ -305,17 +309,46 @@ func (p *MemPostings) Delete(deleted map[storage.SeriesRef]struct{}, affected ma p.m[l.Name][l.Value] = repl } else { delete(p.m[l.Name], l.Value) - // Delete the key if we removed all values. if len(p.m[l.Name]) == 0 { - delete(p.m, l.Name) + // Delete the key if we removed all values. + deleteLabelNames <- l.Name } } } - for l := range affected { - process(l) + // Create GOMAXPROCS workers. + wg := sync.WaitGroup{} + jobs := make([]chan labels.Label, runtime.GOMAXPROCS(0)) + for i := range jobs { + jobs[i] = make(chan labels.Label, 128) + wg.Add(1) + go func(jobs chan labels.Label) { + defer wg.Done() + for l := range jobs { + process(l) + } + }(jobs[i]) + } + + // Process all affected labels and the allPostingsKey. + for l := range affected { + j := int(xxhash.Sum64String(l.Name) % uint64(len(jobs))) + jobs[j] <- l + } + j := int(xxhash.Sum64String(allPostingsKey.Name) % uint64(len(jobs))) + jobs[j] <- allPostingsKey + + // Close jobs channels and wait all workers to finish. + for i := range jobs { + close(jobs[i]) + } + wg.Wait() + + // Close deleteLabelNames channel and delete the label names requested. + close(deleteLabelNames) + for name := range deleteLabelNames { + delete(p.m, name) } - process(allPostingsKey) } // Iter calls f for each postings list. It aborts if f returns an error and returns it. diff --git a/tsdb/index/postings_test.go b/tsdb/index/postings_test.go index 96c9ed124..1802c9e89 100644 --- a/tsdb/index/postings_test.go +++ b/tsdb/index/postings_test.go @@ -1025,7 +1025,7 @@ func BenchmarkMemPostings_Delete(b *testing.B) { return s } - const total = 1e6 + const total = 2e6 allSeries := [total]labels.Labels{} nameValues := make([]string, 0, 100) for i := 0; i < total; i++ {