From e630ffdbedaf276da8c7e1a015af893ddc16d77a Mon Sep 17 00:00:00 2001 From: Bryan Boreham Date: Fri, 26 Jul 2024 18:17:35 +0100 Subject: [PATCH 1/4] TSDB: extend BenchmarkMemPostings_PostingsForLabelMatching to check merge speed We need to create more postings entries so the merger has some work to do. Not material for the regexp ones as they match so few series. Signed-off-by: Bryan Boreham --- tsdb/index/postings_test.go | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) diff --git a/tsdb/index/postings_test.go b/tsdb/index/postings_test.go index 6dd9f25bc0..c4fb1f12f4 100644 --- a/tsdb/index/postings_test.go +++ b/tsdb/index/postings_test.go @@ -1410,12 +1410,15 @@ func BenchmarkMemPostings_PostingsForLabelMatching(b *testing.B) { slowRegexp := "^" + slowRegexpString() + "$" b.Logf("Slow regexp length = %d", len(slowRegexp)) slow := regexp.MustCompile(slowRegexp) + const seriesPerLabel = 10 for _, labelValueCount := range []int{1_000, 10_000, 100_000} { b.Run(fmt.Sprintf("labels=%d", labelValueCount), func(b *testing.B) { mp := NewMemPostings() for i := 0; i < labelValueCount; i++ { - mp.Add(storage.SeriesRef(i), labels.FromStrings("label", strconv.Itoa(i))) + for j := 0; j < seriesPerLabel; j++ { + mp.Add(storage.SeriesRef(i*seriesPerLabel+j), labels.FromStrings("__name__", strconv.Itoa(j), "label", strconv.Itoa(i))) + } } fp, err := ExpandPostings(mp.PostingsForLabelMatching(context.Background(), "label", fast.MatchString)) @@ -1435,6 +1438,18 @@ func BenchmarkMemPostings_PostingsForLabelMatching(b *testing.B) { mp.PostingsForLabelMatching(context.Background(), "label", slow.MatchString).Next() } }) + + b.Run("matcher=all", func(b *testing.B) { + for i := 0; i < b.N; i++ { + // Match everything. + p := mp.PostingsForLabelMatching(context.Background(), "label", func(_ string) bool { return true }) + var sum storage.SeriesRef + // Iterate through all results to exercise merge function. 
+ for p.Next() { + sum += p.At() + } + } + }) }) } } From 1b22242024d09a287a30ad30560dfc58febee966 Mon Sep 17 00:00:00 2001 From: Bryan Boreham Date: Fri, 26 Jul 2024 20:00:03 +0100 Subject: [PATCH 2/4] TSDB BenchmarkMerge: run fewer sizes As long as we run small and big sizes, we don't need all the sizes in between. Signed-off-by: Bryan Boreham --- tsdb/index/postings_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tsdb/index/postings_test.go b/tsdb/index/postings_test.go index c4fb1f12f4..cd7fa9d3c6 100644 --- a/tsdb/index/postings_test.go +++ b/tsdb/index/postings_test.go @@ -393,7 +393,7 @@ func BenchmarkMerge(t *testing.B) { } its := make([]Postings, len(refs)) - for _, nSeries := range []int{1, 10, 100, 1000, 10000, 100000} { + for _, nSeries := range []int{1, 10, 10000, 100000} { t.Run(strconv.Itoa(nSeries), func(bench *testing.B) { ctx := context.Background() for i := 0; i < bench.N; i++ { From 0a8779f46dace4d24dd9c14e81cba065c23e2a88 Mon Sep 17 00:00:00 2001 From: Bryan Boreham Date: Fri, 26 Jul 2024 19:35:58 +0100 Subject: [PATCH 3/4] TSDB: Make mergedPostings generic Now we can call it with more specific types which is more efficient than making everything go through the `Postings` interface. Benchmark the concrete type. Signed-off-by: Bryan Boreham --- tsdb/index/postings.go | 20 ++++++++++---------- tsdb/index/postings_test.go | 2 +- 2 files changed, 11 insertions(+), 11 deletions(-) diff --git a/tsdb/index/postings.go b/tsdb/index/postings.go index a2c5a82239..5384133832 100644 --- a/tsdb/index/postings.go +++ b/tsdb/index/postings.go @@ -660,7 +660,7 @@ func (it *intersectPostings) Err() error { } // Merge returns a new iterator over the union of the input iterators. 
-func Merge(_ context.Context, its ...Postings) Postings { +func Merge[T Postings](_ context.Context, its ...T) Postings { if len(its) == 0 { return EmptyPostings() } @@ -675,19 +675,19 @@ func Merge(_ context.Context, its ...Postings) Postings { return p } -type mergedPostings struct { - p []Postings - h *loser.Tree[storage.SeriesRef, Postings] +type mergedPostings[T Postings] struct { + p []T + h *loser.Tree[storage.SeriesRef, T] cur storage.SeriesRef } -func newMergedPostings(p []Postings) (m *mergedPostings, nonEmpty bool) { +func newMergedPostings[T Postings](p []T) (m *mergedPostings[T], nonEmpty bool) { const maxVal = storage.SeriesRef(math.MaxUint64) // This value must be higher than all real values used in the tree. lt := loser.New(p, maxVal) - return &mergedPostings{p: p, h: lt}, true + return &mergedPostings[T]{p: p, h: lt}, true } -func (it *mergedPostings) Next() bool { +func (it *mergedPostings[T]) Next() bool { for { if !it.h.Next() { return false @@ -701,7 +701,7 @@ func (it *mergedPostings) Next() bool { } } -func (it *mergedPostings) Seek(id storage.SeriesRef) bool { +func (it *mergedPostings[T]) Seek(id storage.SeriesRef) bool { for !it.h.IsEmpty() && it.h.At() < id { finished := !it.h.Winner().Seek(id) it.h.Fix(finished) @@ -713,11 +713,11 @@ func (it *mergedPostings) Seek(id storage.SeriesRef) bool { return true } -func (it mergedPostings) At() storage.SeriesRef { +func (it mergedPostings[T]) At() storage.SeriesRef { return it.cur } -func (it mergedPostings) Err() error { +func (it mergedPostings[T]) Err() error { for _, p := range it.p { if err := p.Err(); err != nil { return err diff --git a/tsdb/index/postings_test.go b/tsdb/index/postings_test.go index cd7fa9d3c6..77d59ec995 100644 --- a/tsdb/index/postings_test.go +++ b/tsdb/index/postings_test.go @@ -392,7 +392,7 @@ func BenchmarkMerge(t *testing.B) { refs = append(refs, temp) } - its := make([]Postings, len(refs)) + its := make([]*ListPostings, len(refs)) for _, nSeries := range []int{1, 
10, 10000, 100000} { t.Run(strconv.Itoa(nSeries), func(bench *testing.B) { ctx := context.Background() From cfa32f3d2847eb9d40ebc16ce9bb8ebfeb0705a1 Mon Sep 17 00:00:00 2001 From: Bryan Boreham Date: Fri, 26 Jul 2024 20:08:51 +0100 Subject: [PATCH 4/4] TSDB: Move merge of head postings into index This enables it to take advantage of a more compact data structure since all postings are known to be `*ListPostings`. Remove the `Get` member which was not used for anything else, and fix up tests. Signed-off-by: Bryan Boreham --- tsdb/head_read.go | 15 +-------------- tsdb/head_test.go | 12 ++++++------ tsdb/index/postings.go | 38 ++++++++++++++++++------------------- tsdb/index/postings_test.go | 8 ++++---- 4 files changed, 30 insertions(+), 43 deletions(-) diff --git a/tsdb/head_read.go b/tsdb/head_read.go index 79ed0f0240..b95257c28a 100644 --- a/tsdb/head_read.go +++ b/tsdb/head_read.go @@ -103,20 +103,7 @@ func (h *headIndexReader) LabelNames(ctx context.Context, matchers ...*labels.Ma // Postings returns the postings list iterator for the label pairs. 
func (h *headIndexReader) Postings(ctx context.Context, name string, values ...string) (index.Postings, error) { - switch len(values) { - case 0: - return index.EmptyPostings(), nil - case 1: - return h.head.postings.Get(name, values[0]), nil - default: - res := make([]index.Postings, 0, len(values)) - for _, value := range values { - if p := h.head.postings.Get(name, value); !index.IsEmptyPostingsType(p) { - res = append(res, p) - } - } - return index.Merge(ctx, res...), nil - } + return h.head.postings.Postings(ctx, name, values...), nil } func (h *headIndexReader) PostingsForLabelMatching(ctx context.Context, name string, match func(string) bool) index.Postings { diff --git a/tsdb/head_test.go b/tsdb/head_test.go index fb158b593c..e3742cbe9c 100644 --- a/tsdb/head_test.go +++ b/tsdb/head_test.go @@ -962,12 +962,12 @@ func TestHead_Truncate(t *testing.T) { require.Nil(t, h.series.getByID(s3.ref)) require.Nil(t, h.series.getByID(s4.ref)) - postingsA1, _ := index.ExpandPostings(h.postings.Get("a", "1")) - postingsA2, _ := index.ExpandPostings(h.postings.Get("a", "2")) - postingsB1, _ := index.ExpandPostings(h.postings.Get("b", "1")) - postingsB2, _ := index.ExpandPostings(h.postings.Get("b", "2")) - postingsC1, _ := index.ExpandPostings(h.postings.Get("c", "1")) - postingsAll, _ := index.ExpandPostings(h.postings.Get("", "")) + postingsA1, _ := index.ExpandPostings(h.postings.Postings(ctx, "a", "1")) + postingsA2, _ := index.ExpandPostings(h.postings.Postings(ctx, "a", "2")) + postingsB1, _ := index.ExpandPostings(h.postings.Postings(ctx, "b", "1")) + postingsB2, _ := index.ExpandPostings(h.postings.Postings(ctx, "b", "2")) + postingsC1, _ := index.ExpandPostings(h.postings.Postings(ctx, "c", "1")) + postingsAll, _ := index.ExpandPostings(h.postings.Postings(ctx, "", "")) require.Equal(t, []storage.SeriesRef{storage.SeriesRef(s1.ref)}, postingsA1) require.Equal(t, []storage.SeriesRef{storage.SeriesRef(s2.ref)}, postingsA2) diff --git a/tsdb/index/postings.go 
b/tsdb/index/postings.go index 5384133832..03e3f7a239 100644 --- a/tsdb/index/postings.go +++ b/tsdb/index/postings.go @@ -235,25 +235,9 @@ func (p *MemPostings) Stats(label string, limit int, labelSizeFunc func(string, } } -// Get returns a postings list for the given label pair. -func (p *MemPostings) Get(name, value string) Postings { - var lp []storage.SeriesRef - p.mtx.RLock() - l := p.m[name] - if l != nil { - lp = l[value] - } - p.mtx.RUnlock() - - if lp == nil { - return EmptyPostings() - } - return newListPostings(lp...) -} - // All returns a postings list over all documents ever added. func (p *MemPostings) All() Postings { - return p.Get(AllPostingsKey()) + return p.Postings(context.Background(), allPostingsKey.Name, allPostingsKey.Value) } // EnsureOrder ensures that all postings lists are sorted. After it returns all further @@ -490,7 +474,7 @@ func (p *MemPostings) PostingsForLabelMatching(ctx context.Context, name string, } // Now `vals` only contains the values that matched, get their postings. - its := make([]Postings, 0, len(vals)) + its := make([]*ListPostings, 0, len(vals)) lps := make([]ListPostings, len(vals)) p.mtx.RLock() e := p.m[name] @@ -510,11 +494,27 @@ func (p *MemPostings) PostingsForLabelMatching(ctx context.Context, name string, return Merge(ctx, its...) } +// Postings returns a postings iterator for the given label values. +func (p *MemPostings) Postings(ctx context.Context, name string, values ...string) Postings { + res := make([]*ListPostings, 0, len(values)) + lps := make([]ListPostings, len(values)) + p.mtx.RLock() + postingsMapForName := p.m[name] + for i, value := range values { + if lp := postingsMapForName[value]; lp != nil { + lps[i] = ListPostings{list: lp} + res = append(res, &lps[i]) + } + } + p.mtx.RUnlock() + return Merge(ctx, res...) 
+} + func (p *MemPostings) PostingsForAllLabelValues(ctx context.Context, name string) Postings { p.mtx.RLock() e := p.m[name] - its := make([]Postings, 0, len(e)) + its := make([]*ListPostings, 0, len(e)) lps := make([]ListPostings, len(e)) i := 0 for _, refs := range e { diff --git a/tsdb/index/postings_test.go b/tsdb/index/postings_test.go index 77d59ec995..cf5ab6c0f8 100644 --- a/tsdb/index/postings_test.go +++ b/tsdb/index/postings_test.go @@ -979,7 +979,7 @@ func TestMemPostings_Delete(t *testing.T) { p.Add(2, labels.FromStrings("lbl1", "b")) p.Add(3, labels.FromStrings("lbl2", "a")) - before := p.Get(allPostingsKey.Name, allPostingsKey.Value) + before := p.Postings(context.Background(), allPostingsKey.Name, allPostingsKey.Value) deletedRefs := map[storage.SeriesRef]struct{}{ 2: {}, } @@ -987,7 +987,7 @@ func TestMemPostings_Delete(t *testing.T) { {Name: "lbl1", Value: "b"}: {}, } p.Delete(deletedRefs, affectedLabels) - after := p.Get(allPostingsKey.Name, allPostingsKey.Value) + after := p.Postings(context.Background(), allPostingsKey.Name, allPostingsKey.Value) // Make sure postings gotten before the delete have the old data when // iterated over. @@ -1001,7 +1001,7 @@ func TestMemPostings_Delete(t *testing.T) { require.NoError(t, err) require.Equal(t, []storage.SeriesRef{1, 3}, expanded) - deleted := p.Get("lbl1", "b") + deleted := p.Postings(context.Background(), "lbl1", "b") expanded, err = ExpandPostings(deleted) require.NoError(t, err) require.Empty(t, expanded, "expected empty postings, got %v", expanded) @@ -1073,7 +1073,7 @@ func BenchmarkMemPostings_Delete(b *testing.B) { return default: // Get a random value of this label. - p.Get(lbl, itoa(rand.Intn(10000))).Next() + p.Postings(context.Background(), lbl, itoa(rand.Intn(10000))).Next() } } }(i)