Merge pull request #15316 from bboreham/revert-15141
Some checks are pending
buf.build / lint and publish (push) Waiting to run
CI / Go tests (push) Waiting to run
CI / More Go tests (push) Waiting to run
CI / Go tests with previous Go version (push) Waiting to run
CI / UI tests (push) Waiting to run
CI / Go tests on Windows (push) Waiting to run
CI / Mixins tests (push) Waiting to run
CI / Build Prometheus for common architectures (0) (push) Waiting to run
CI / Build Prometheus for common architectures (1) (push) Waiting to run
CI / Build Prometheus for common architectures (2) (push) Waiting to run
CI / Build Prometheus for all architectures (0) (push) Waiting to run
CI / Build Prometheus for all architectures (1) (push) Waiting to run
CI / Build Prometheus for all architectures (10) (push) Waiting to run
CI / Build Prometheus for all architectures (11) (push) Waiting to run
CI / Build Prometheus for all architectures (2) (push) Waiting to run
CI / Build Prometheus for all architectures (3) (push) Waiting to run
CI / Build Prometheus for all architectures (4) (push) Waiting to run
CI / Build Prometheus for all architectures (5) (push) Waiting to run
CI / Build Prometheus for all architectures (6) (push) Waiting to run
CI / Build Prometheus for all architectures (7) (push) Waiting to run
CI / Build Prometheus for all architectures (8) (push) Waiting to run
CI / Build Prometheus for all architectures (9) (push) Waiting to run
CI / Report status of build Prometheus for all architectures (push) Blocked by required conditions
CI / Check generated parser (push) Waiting to run
CI / golangci-lint (push) Waiting to run
CI / fuzzing (push) Waiting to run
CI / codeql (push) Waiting to run
CI / Publish main branch artifacts (push) Blocked by required conditions
CI / Publish release artefacts (push) Blocked by required conditions
CI / Publish UI on npm Registry (push) Blocked by required conditions
Scorecards supply-chain security / Scorecards analysis (push) Waiting to run

Revert "Fix `MemPostings.Add` and `MemPostings.Get` data race (#15141)"
This commit is contained in:
Bryan Boreham 2024-11-03 14:49:27 +00:00 committed by GitHub
commit a1a78a151d
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 12 additions and 78 deletions

View file

@ -43,7 +43,7 @@ func BenchmarkHeadStripeSeriesCreate(b *testing.B) {
defer h.Close() defer h.Close()
for i := 0; i < b.N; i++ { for i := 0; i < b.N; i++ {
h.getOrCreate(uint64(i), labels.FromStrings(labels.MetricName, "test", "a", strconv.Itoa(i), "b", strconv.Itoa(i%10), "c", strconv.Itoa(i%100), "d", strconv.Itoa(i/2), "e", strconv.Itoa(i/4))) h.getOrCreate(uint64(i), labels.FromStrings("a", strconv.Itoa(i)))
} }
} }
@ -61,8 +61,8 @@ func BenchmarkHeadStripeSeriesCreateParallel(b *testing.B) {
b.RunParallel(func(pb *testing.PB) { b.RunParallel(func(pb *testing.PB) {
for pb.Next() { for pb.Next() {
i := int(count.Inc()) i := count.Inc()
h.getOrCreate(uint64(i), labels.FromStrings(labels.MetricName, "test", "a", strconv.Itoa(i), "b", strconv.Itoa(i%10), "c", strconv.Itoa(i%100), "d", strconv.Itoa(i/2), "e", strconv.Itoa(i/4))) h.getOrCreate(uint64(i), labels.FromStrings("a", strconv.Itoa(int(i))))
} }
}) })
} }
@ -82,7 +82,7 @@ func BenchmarkHeadStripeSeriesCreate_PreCreationFailure(b *testing.B) {
defer h.Close() defer h.Close()
for i := 0; i < b.N; i++ { for i := 0; i < b.N; i++ {
h.getOrCreate(uint64(i), labels.FromStrings(labels.MetricName, "test", "a", strconv.Itoa(i), "b", strconv.Itoa(i%10), "c", strconv.Itoa(i%100), "d", strconv.Itoa(i/2), "e", strconv.Itoa(i/4))) h.getOrCreate(uint64(i), labels.FromStrings("a", strconv.Itoa(i)))
} }
} }

View file

@ -345,14 +345,13 @@ func (p *MemPostings) Add(id storage.SeriesRef, lset labels.Labels) {
p.mtx.Unlock() p.mtx.Unlock()
} }
func appendWithExponentialGrowth[T any](a []T, v T) (_ []T, copied bool) { func appendWithExponentialGrowth[T any](a []T, v T) []T {
if cap(a) < len(a)+1 { if cap(a) < len(a)+1 {
newList := make([]T, len(a), len(a)*2+1) newList := make([]T, len(a), len(a)*2+1)
copy(newList, a) copy(newList, a)
a = newList a = newList
copied = true
} }
return append(a, v), copied return append(a, v)
} }
func (p *MemPostings) addFor(id storage.SeriesRef, l labels.Label) { func (p *MemPostings) addFor(id storage.SeriesRef, l labels.Label) {
@ -361,26 +360,16 @@ func (p *MemPostings) addFor(id storage.SeriesRef, l labels.Label) {
nm = map[string][]storage.SeriesRef{} nm = map[string][]storage.SeriesRef{}
p.m[l.Name] = nm p.m[l.Name] = nm
} }
list, copied := appendWithExponentialGrowth(nm[l.Value], id) list := appendWithExponentialGrowth(nm[l.Value], id)
nm[l.Value] = list nm[l.Value] = list
// Return if it shouldn't be ordered, if it only has one element or if it's already ordered. if !p.ordered {
// The invariant is that the first n-1 items in the list are already sorted.
if !p.ordered || len(list) == 1 || list[len(list)-1] >= list[len(list)-2] {
return return
} }
// There is no guarantee that no higher ID was inserted before as they may
if !copied { // be generated independently before adding them to postings.
// We have appended to the existing slice, // We repair order violations on insert. The invariant is that the first n-1
// and readers may already have a copy of this postings slice, // items in the list are already sorted.
// so we need to copy it before sorting.
old := list
list = make([]storage.SeriesRef, len(old), cap(old))
copy(list, old)
nm[l.Value] = list
}
// Repair order violations.
for i := len(list) - 1; i >= 1; i-- { for i := len(list) - 1; i >= 1; i-- {
if list[i] >= list[i-1] { if list[i] >= list[i-1] {
break break

View file

@ -1475,58 +1475,3 @@ func TestMemPostings_PostingsForLabelMatchingHonorsContextCancel(t *testing.T) {
require.Error(t, p.Err()) require.Error(t, p.Err())
require.Equal(t, failAfter+1, ctx.Count()) // Plus one for the Err() call that puts the error in the result. require.Equal(t, failAfter+1, ctx.Count()) // Plus one for the Err() call that puts the error in the result.
} }
func TestMemPostings_Unordered_Add_Get(t *testing.T) {
mp := NewMemPostings()
for ref := storage.SeriesRef(1); ref < 8; ref += 2 {
// First, add next series.
next := ref + 1
mp.Add(next, labels.FromStrings(labels.MetricName, "test", "series", strconv.Itoa(int(next))))
nextPostings := mp.Get(labels.MetricName, "test")
// Now add current ref.
mp.Add(ref, labels.FromStrings(labels.MetricName, "test", "series", strconv.Itoa(int(ref))))
// Next postings should still reference the next series.
nextExpanded, err := ExpandPostings(nextPostings)
require.NoError(t, err)
require.Len(t, nextExpanded, int(ref))
require.Equal(t, next, nextExpanded[len(nextExpanded)-1])
}
}
func TestMemPostings_Concurrent_Add_Get(t *testing.T) {
refs := make(chan storage.SeriesRef)
wg := sync.WaitGroup{}
wg.Add(1)
t.Cleanup(wg.Wait)
t.Cleanup(func() { close(refs) })
mp := NewMemPostings()
go func() {
defer wg.Done()
for ref := range refs {
mp.Add(ref, labels.FromStrings(labels.MetricName, "test", "series", strconv.Itoa(int(ref))))
p := mp.Get(labels.MetricName, "test")
_, err := ExpandPostings(p)
if err != nil {
t.Errorf("unexpected error: %s", err)
}
}
}()
for ref := storage.SeriesRef(1); ref < 8; ref += 2 {
// Add next ref in another goroutine so they would race.
refs <- ref + 1
// Add current ref here
mp.Add(ref, labels.FromStrings(labels.MetricName, "test", "series", strconv.Itoa(int(ref))))
// We don't read the value of the postings here,
// this is tested in TestMemPostings_Unordered_Add_Get where it's easier to achieve the determinism.
// This test just checks that there's no data race.
p := mp.Get(labels.MetricName, "test")
_, err := ExpandPostings(p)
require.NoError(t, err)
}
}