diff --git a/tsdb/index/postings.go b/tsdb/index/postings.go index 7bc5629ac..58f3473da 100644 --- a/tsdb/index/postings.go +++ b/tsdb/index/postings.go @@ -26,7 +26,6 @@ import ( "sync" "github.com/bboreham/go-loser" - "github.com/cespare/xxhash/v2" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/storage" @@ -293,76 +292,30 @@ func (p *MemPostings) EnsureOrder(numberOfConcurrentProcesses int) { func (p *MemPostings) Delete(deleted map[storage.SeriesRef]struct{}, affected map[labels.Label]struct{}) { p.mtx.Lock() defer p.mtx.Unlock() - if len(p.m) == 0 || len(deleted) == 0 { - return + + process := func(l labels.Label) { + orig := p.m[l.Name][l.Value] + repl := make([]storage.SeriesRef, 0, len(orig)) + for _, id := range orig { + if _, ok := deleted[id]; !ok { + repl = append(repl, id) + } + } + if len(repl) > 0 { + p.m[l.Name][l.Value] = repl + } else { + delete(p.m[l.Name], l.Value) + // Delete the key if we removed all values. + if len(p.m[l.Name]) == 0 { + delete(p.m, l.Name) + } + } } - // Deleting label names mutates p.m map, so it should be done from a single goroutine after nobody else is reading it. - deleteLabelNames := make(chan string, len(p.m)) - - process, wait := processWithBoundedParallelismAndConsistentWorkers( - runtime.GOMAXPROCS(0), - func(l labels.Label) uint64 { return xxhash.Sum64String(l.Name) }, - func(l labels.Label) { - orig := p.m[l.Name][l.Value] - repl := make([]storage.SeriesRef, 0, len(orig)) - for _, id := range orig { - if _, ok := deleted[id]; !ok { - repl = append(repl, id) - } - } - if len(repl) > 0 { - p.m[l.Name][l.Value] = repl - } else { - delete(p.m[l.Name], l.Value) - if len(p.m[l.Name]) == 0 { - // Delete the key if we removed all values. - deleteLabelNames <- l.Name - } - } - }, - ) - for l := range affected { process(l) } process(allPostingsKey) - wait() - - // Close deleteLabelNames channel and delete the label names requested. - close(deleteLabelNames) - for name := range deleteLabelNames { - delete(p.m, name) - } -} - -// processWithBoundedParallelismAndConsistentWorkers will call f() with bounded parallelism, -// making sure that elements with same hash(T) will always be processed by the same worker. -// Call process() to add more jobs to process, and once finished adding, call wait() to ensure that all jobs are processed. -func processWithBoundedParallelismAndConsistentWorkers[T any](workers int, hash func(T) uint64, f func(T)) (process func(T), wait func()) { - wg := &sync.WaitGroup{} - jobs := make([]chan T, workers) - for i := 0; i < workers; i++ { - wg.Add(1) - jobs[i] = make(chan T, 128) - go func(jobs <-chan T) { - defer wg.Done() - for l := range jobs { - f(l) - } - }(jobs[i]) - } - - process = func(job T) { - jobs[hash(job)%uint64(workers)] <- job - } - wait = func() { - for i := range jobs { - close(jobs[i]) - } - wg.Wait() - } - return process, wait } // Iter calls f for each postings list. It aborts if f returns an error and returns it. diff --git a/tsdb/index/postings_test.go b/tsdb/index/postings_test.go index 8ee9b9943..7d0b717bf 100644 --- a/tsdb/index/postings_test.go +++ b/tsdb/index/postings_test.go @@ -973,69 +973,37 @@ func TestMemPostingsStats(t *testing.T) { } func TestMemPostings_Delete(t *testing.T) { - t.Run("some postings", func(t *testing.T) { - p := NewMemPostings() - p.Add(1, labels.FromStrings("lbl1", "a")) - p.Add(2, labels.FromStrings("lbl1", "b")) - p.Add(3, labels.FromStrings("lbl2", "a")) + p := NewMemPostings() + p.Add(1, labels.FromStrings("lbl1", "a")) + p.Add(2, labels.FromStrings("lbl1", "b")) + p.Add(3, labels.FromStrings("lbl2", "a")) - before := p.Get(allPostingsKey.Name, allPostingsKey.Value) - deletedRefs := map[storage.SeriesRef]struct{}{ - 2: {}, - } - affectedLabels := map[labels.Label]struct{}{ - {Name: "lbl1", Value: "b"}: {}, - } - p.Delete(deletedRefs, affectedLabels) - after := p.Get(allPostingsKey.Name, allPostingsKey.Value) + before := p.Get(allPostingsKey.Name, allPostingsKey.Value) + deletedRefs := map[storage.SeriesRef]struct{}{ + 2: {}, + } + affectedLabels := map[labels.Label]struct{}{ + {Name: "lbl1", Value: "b"}: {}, + } + p.Delete(deletedRefs, affectedLabels) + after := p.Get(allPostingsKey.Name, allPostingsKey.Value) - // Make sure postings gotten before the delete have the old data when - // iterated over. - expanded, err := ExpandPostings(before) - require.NoError(t, err) - require.Equal(t, []storage.SeriesRef{1, 2, 3}, expanded) + // Make sure postings gotten before the delete have the old data when + // iterated over. + expanded, err := ExpandPostings(before) + require.NoError(t, err) + require.Equal(t, []storage.SeriesRef{1, 2, 3}, expanded) - // Make sure postings gotten after the delete have the new data when - // iterated over. - expanded, err = ExpandPostings(after) - require.NoError(t, err) - require.Equal(t, []storage.SeriesRef{1, 3}, expanded) + // Make sure postings gotten after the delete have the new data when + // iterated over. + expanded, err = ExpandPostings(after) + require.NoError(t, err) + require.Equal(t, []storage.SeriesRef{1, 3}, expanded) - deleted := p.Get("lbl1", "b") - expanded, err = ExpandPostings(deleted) - require.NoError(t, err) - require.Empty(t, expanded, "expected empty postings, got %v", expanded) - }) - - t.Run("all postings", func(t *testing.T) { - p := NewMemPostings() - p.Add(1, labels.FromStrings("lbl1", "a")) - p.Add(2, labels.FromStrings("lbl1", "b")) - p.Add(3, labels.FromStrings("lbl2", "a")) - - deletedRefs := map[storage.SeriesRef]struct{}{1: {}, 2: {}, 3: {}} - affectedLabels := map[labels.Label]struct{}{ - {Name: "lbl1", Value: "a"}: {}, - {Name: "lbl1", Value: "b"}: {}, - {Name: "lbl1", Value: "c"}: {}, - } - p.Delete(deletedRefs, affectedLabels) - after := p.Get(allPostingsKey.Name, allPostingsKey.Value) - expanded, err := ExpandPostings(after) - require.NoError(t, err) - require.Empty(t, expanded) - }) - - t.Run("nothing on empty mempostings", func(t *testing.T) { - p := NewMemPostings() - deletedRefs := map[storage.SeriesRef]struct{}{} - affectedLabels := map[labels.Label]struct{}{} - p.Delete(deletedRefs, affectedLabels) - after := p.Get(allPostingsKey.Name, allPostingsKey.Value) - expanded, err := ExpandPostings(after) - require.NoError(t, err) - require.Empty(t, expanded) - }) + deleted := p.Get("lbl1", "b") + expanded, err = ExpandPostings(deleted) + require.NoError(t, err) + require.Empty(t, expanded, "expected empty postings, got %v", expanded) } // BenchmarkMemPostings_Delete is quite heavy, so consider running it with @@ -1057,7 +1025,7 @@ func BenchmarkMemPostings_Delete(b *testing.B) { return s } - const total = 2e6 + const total = 1e6 allSeries := [total]labels.Labels{} nameValues := make([]string, 0, 100) for i := 0; i < total; i++ {