MemPostings.Delete(): reduce locking/unlocking (#13286)

* MemPostings: reduce locking/unlocking

MemPostings.Delete is called from Head.gc(), i.e. it gets the IDs of the
series that have churned.

I'd assume that many label values aren't affected by that churn at all,
so it doesn't make sense to touch the lock while checking them.

Signed-off-by: Oleg Zaytsev <mail@olegzaytsev.com>
This commit is contained in:
Oleg Zaytsev 2024-06-10 14:23:22 +02:00 committed by GitHub
parent 08621bebe9
commit 2dc177d8af
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 139 additions and 15 deletions

View file

@ -289,41 +289,67 @@ func (p *MemPostings) EnsureOrder(numberOfConcurrentProcesses int) {
// Delete removes all ids in the given map from the postings lists.
func (p *MemPostings) Delete(deleted map[storage.SeriesRef]struct{}) {
var keys, vals []string
// We will take an optimistic read lock for the entire method,
// and only lock for writing when we actually find something to delete.
//
// Each SeriesRef can appear in several Postings.
// To change each one, we need to know the label name and value that it is indexed under.
// We iterate over all label names, then for each name all values,
// and look for individual series to be deleted.
p.mtx.RLock()
defer p.mtx.RUnlock()
// Collect all keys relevant for deletion once. New keys added afterwards
// can by definition not be affected by any of the given deletes.
p.mtx.RLock()
keys := make([]string, 0, len(p.m))
maxVals := 0
for n := range p.m {
keys = append(keys, n)
if len(p.m[n]) > maxVals {
maxVals = len(p.m[n])
}
}
p.mtx.RUnlock()
vals := make([]string, 0, maxVals)
for _, n := range keys {
p.mtx.RLock()
// Copy the values and iterate the copy: if we unlock in the loop below,
// another goroutine might modify the map while we are part-way through it.
vals = vals[:0]
for v := range p.m[n] {
vals = append(vals, v)
}
p.mtx.RUnlock()
// For each posting we first analyse whether the postings list is affected by the deletes.
// If yes, we actually reallocate a new postings list.
for _, l := range vals {
// Only lock for processing one postings list so we don't block reads for too long.
p.mtx.Lock()
// If no, we remove the label value from the vals list.
// This way we only need to Lock once later.
for i := 0; i < len(vals); {
found := false
for _, id := range p.m[n][l] {
refs := p.m[n][vals[i]]
for _, id := range refs {
if _, ok := deleted[id]; ok {
i++
found = true
break
}
}
if !found {
p.mtx.Unlock()
continue
// Didn't match, bring the last value to this position, make the slice shorter and check again.
// The order of the slice doesn't matter as it comes from a map iteration.
vals[i], vals = vals[len(vals)-1], vals[:len(vals)-1]
}
}
// If no label values have deleted ids, just continue.
if len(vals) == 0 {
continue
}
// The only vals left here are the ones that contain deleted ids.
// Now we take the write lock and remove the ids.
p.mtx.RUnlock()
p.mtx.Lock()
for _, l := range vals {
repl := make([]storage.SeriesRef, 0, len(p.m[n][l]))
for _, id := range p.m[n][l] {
@ -336,13 +362,14 @@ func (p *MemPostings) Delete(deleted map[storage.SeriesRef]struct{}) {
} else {
delete(p.m[n], l)
}
p.mtx.Unlock()
}
p.mtx.Lock()
// Delete the key if we removed all values.
if len(p.m[n]) == 0 {
delete(p.m, n)
}
p.mtx.Unlock()
p.mtx.RLock()
}
}

View file

@ -23,6 +23,7 @@ import (
"sort"
"strconv"
"strings"
"sync"
"testing"
"github.com/grafana/regexp"
@ -1001,6 +1002,102 @@ func TestMemPostings_Delete(t *testing.T) {
require.Empty(t, expanded, "expected empty postings, got %v", expanded)
}
// BenchmarkMemPostings_Delete is quite heavy, so consider running it with
// -benchtime=10x or similar to get more stable and comparable results.
func BenchmarkMemPostings_Delete(b *testing.B) {
internedItoa := map[int]string{}
var mtx sync.RWMutex
itoa := func(i int) string {
mtx.RLock()
s, ok := internedItoa[i]
mtx.RUnlock()
if ok {
return s
}
mtx.Lock()
s = strconv.Itoa(i)
internedItoa[i] = s
mtx.Unlock()
return s
}
const total = 1e6
prepare := func() *MemPostings {
var ref storage.SeriesRef
next := func() storage.SeriesRef {
ref++
return ref
}
p := NewMemPostings()
nameValues := make([]string, 0, 100)
for i := 0; i < total; i++ {
nameValues = nameValues[:0]
// A thousand labels like lbl_x_of_1000, each with total/1000 values
thousand := "lbl_" + itoa(i%1000) + "_of_1000"
nameValues = append(nameValues, thousand, itoa(i/1000))
// A hundred labels like lbl_x_of_100, each with total/100 values.
hundred := "lbl_" + itoa(i%100) + "_of_100"
nameValues = append(nameValues, hundred, itoa(i/100))
if i < 100 {
ten := "lbl_" + itoa(i%10) + "_of_10"
nameValues = append(nameValues, ten, itoa(i%10))
}
p.Add(next(), labels.FromStrings(append(nameValues, "first", "a", "second", "a", "third", "a")...))
}
return p
}
for _, refs := range []int{1, 100, 10_000} {
b.Run(fmt.Sprintf("refs=%d", refs), func(b *testing.B) {
for _, reads := range []int{0, 1, 10} {
b.Run(fmt.Sprintf("readers=%d", reads), func(b *testing.B) {
if b.N > total/refs {
// Just to make sure that benchmark still makes sense.
panic("benchmark not prepared")
}
p := prepare()
stop := make(chan struct{})
wg := sync.WaitGroup{}
for i := 0; i < reads; i++ {
wg.Add(1)
go func(i int) {
lbl := "lbl_" + itoa(i) + "_of_100"
defer wg.Done()
for {
select {
case <-stop:
return
default:
// Get a random value of this label.
p.Get(lbl, itoa(rand.Intn(10000))).Next()
}
}
}(i)
}
b.Cleanup(func() {
close(stop)
wg.Wait()
})
b.ResetTimer()
for n := 0; n < b.N; n++ {
deleted := map[storage.SeriesRef]struct{}{}
for i := 0; i < refs; i++ {
deleted[storage.SeriesRef(n*refs+i)] = struct{}{}
}
p.Delete(deleted)
}
})
}
})
}
}
func TestFindIntersectingPostings(t *testing.T) {
t.Run("multiple intersections", func(t *testing.T) {
p := NewListPostings([]storage.SeriesRef{10, 15, 20, 25, 30, 35, 40, 45, 50})