[PERF] TSDB: Optimize inverse matching (#14144)

Simple follow-up to #13620. Modify `tsdb.PostingsForMatchers` to also use the optimized `tsdb.IndexReader.PostingsForLabelMatching` method for inverse matching.

Introduce method `PostingsForAllLabelValues`, to avoid changing the existing method.

Performance is much improved for a subset of the cases: up to ~60% less
CPU time and ~12.5% less memory usage.

Remove `TestReader_InversePostingsForMatcherHonorsContextCancel` since
`inversePostingsForMatcher` only passes `ctx` to `IndexReader` implementations now.

Signed-off-by: Arve Knudsen <arve.knudsen@gmail.com>
This commit is contained in:
Arve Knudsen 2024-11-19 16:49:01 +01:00 committed by GitHub
parent 62e6e55c07
commit 06d54fcc6c
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
9 changed files with 137 additions and 35 deletions

View file

@ -82,6 +82,10 @@ type IndexReader interface {
// If no postings are found having at least one matching label, an empty iterator is returned. // If no postings are found having at least one matching label, an empty iterator is returned.
PostingsForLabelMatching(ctx context.Context, name string, match func(value string) bool) index.Postings PostingsForLabelMatching(ctx context.Context, name string, match func(value string) bool) index.Postings
// PostingsForAllLabelValues returns a sorted iterator over all postings having a label with the given name.
// If no postings are found with the label in question, an empty iterator is returned.
PostingsForAllLabelValues(ctx context.Context, name string) index.Postings
// SortedPostings returns a postings list that is reordered to be sorted // SortedPostings returns a postings list that is reordered to be sorted
// by the label set of the underlying series. // by the label set of the underlying series.
SortedPostings(index.Postings) index.Postings SortedPostings(index.Postings) index.Postings
@ -531,6 +535,10 @@ func (r blockIndexReader) PostingsForLabelMatching(ctx context.Context, name str
return r.ir.PostingsForLabelMatching(ctx, name, match) return r.ir.PostingsForLabelMatching(ctx, name, match)
} }
// PostingsForAllLabelValues forwards the request to the reader held in r.ir
// and hands back whatever postings that reader produces for name.
func (r blockIndexReader) PostingsForAllLabelValues(ctx context.Context, name string) index.Postings {
	all := r.ir.PostingsForAllLabelValues(ctx, name)
	return all
}
func (r blockIndexReader) SortedPostings(p index.Postings) index.Postings { func (r blockIndexReader) SortedPostings(p index.Postings) index.Postings {
return r.ir.SortedPostings(p) return r.ir.SortedPostings(p)
} }

View file

@ -123,6 +123,10 @@ func (h *headIndexReader) PostingsForLabelMatching(ctx context.Context, name str
return h.head.postings.PostingsForLabelMatching(ctx, name, match) return h.head.postings.PostingsForLabelMatching(ctx, name, match)
} }
// PostingsForAllLabelValues forwards the request to h.head.postings and
// returns its result for the label name unchanged.
func (h *headIndexReader) PostingsForAllLabelValues(ctx context.Context, name string) index.Postings {
	all := h.head.postings.PostingsForAllLabelValues(ctx, name)
	return all
}
func (h *headIndexReader) SortedPostings(p index.Postings) index.Postings { func (h *headIndexReader) SortedPostings(p index.Postings) index.Postings {
series := make([]*memSeries, 0, 128) series := make([]*memSeries, 0, 128)

View file

@ -1777,6 +1777,15 @@ func (r *Reader) Postings(ctx context.Context, name string, values ...string) (P
} }
func (r *Reader) PostingsForLabelMatching(ctx context.Context, name string, match func(string) bool) Postings { func (r *Reader) PostingsForLabelMatching(ctx context.Context, name string, match func(string) bool) Postings {
return r.postingsForLabelMatching(ctx, name, match)
}
// PostingsForAllLabelValues returns the postings for every value of the label
// called name. It shares its implementation with PostingsForLabelMatching by
// calling postingsForLabelMatching without a match function.
func (r *Reader) PostingsForAllLabelValues(ctx context.Context, name string) Postings {
	// A nil matcher selects the "all label values" behaviour of the shared helper.
	var noMatcher func(string) bool
	return r.postingsForLabelMatching(ctx, name, noMatcher)
}
// postingsForLabelMatching implements PostingsForLabelMatching if match is non-nil, and PostingsForAllLabelValues otherwise.
func (r *Reader) postingsForLabelMatching(ctx context.Context, name string, match func(string) bool) Postings {
if r.version == FormatV1 { if r.version == FormatV1 {
return r.postingsForLabelMatchingV1(ctx, name, match) return r.postingsForLabelMatchingV1(ctx, name, match)
} }
@ -1786,11 +1795,17 @@ func (r *Reader) PostingsForLabelMatching(ctx context.Context, name string, matc
return EmptyPostings() return EmptyPostings()
} }
postingsEstimate := 0
if match == nil {
// The caller wants all postings for name.
postingsEstimate = len(e) * symbolFactor
}
lastVal := e[len(e)-1].value lastVal := e[len(e)-1].value
var its []Postings its := make([]Postings, 0, postingsEstimate)
if err := r.traversePostingOffsets(ctx, e[0].off, func(val string, postingsOff uint64) (bool, error) { if err := r.traversePostingOffsets(ctx, e[0].off, func(val string, postingsOff uint64) (bool, error) {
if match(val) { if match == nil || match(val) {
// We want this postings iterator since the value is a match // We want this postings iterator since the value is a match.
postingsDec := encoding.NewDecbufAt(r.b, int(postingsOff), castagnoliTable) postingsDec := encoding.NewDecbufAt(r.b, int(postingsOff), castagnoliTable)
_, p, err := r.dec.DecodePostings(postingsDec) _, p, err := r.dec.DecodePostings(postingsDec)
if err != nil { if err != nil {
@ -1819,7 +1834,7 @@ func (r *Reader) postingsForLabelMatchingV1(ctx context.Context, name string, ma
return ErrPostings(ctx.Err()) return ErrPostings(ctx.Err())
} }
count++ count++
if !match(val) { if match != nil && !match(val) {
continue continue
} }

View file

@ -613,6 +613,52 @@ func TestChunksTimeOrdering(t *testing.T) {
require.NoError(t, idx.Close()) require.NoError(t, idx.Close())
} }
// TestReader_PostingsForLabelMatching writes nine series whose __name__ values
// are "1" through "9" and checks that a matcher accepting only even values
// yields exactly the expected series references.
func TestReader_PostingsForLabelMatching(t *testing.T) {
	ctx := context.Background()

	const seriesCount = 9
	var input indexWriterSeriesSlice
	for n := 1; n <= seriesCount; n++ {
		series := &indexWriterSeries{
			labels: labels.FromStrings("__name__", strconv.Itoa(n)),
			chunks: []chunks.Meta{
				{Ref: 1, MinTime: 0, MaxTime: 10},
			},
		}
		input = append(input, series)
	}
	ir, _, _ := createFileReader(ctx, t, input)

	// Every label value in this fixture is numeric, so a parse failure means
	// the fixture itself is broken; panic rather than report a mismatch.
	isEven := func(v string) bool {
		num, err := strconv.Atoi(v)
		if err != nil {
			panic(err)
		}
		return num%2 == 0
	}

	p := ir.PostingsForLabelMatching(ctx, "__name__", isEven)
	require.NoError(t, p.Err())

	got, err := ExpandPostings(p)
	require.NoError(t, err)
	// NOTE(review): refs are assigned by createFileReader; the expected values
	// suggest the series named i gets ref i+2 — confirm against the helper.
	require.Equal(t, []storage.SeriesRef{4, 6, 8, 10}, got)
}
// TestReader_PostingsForAllLabelValues writes nine series whose __name__ values
// are "1" through "9" and checks that asking for all values of __name__ yields
// the references of all nine series.
func TestReader_PostingsForAllLabelValues(t *testing.T) {
	ctx := context.Background()

	const seriesCount = 9
	var input indexWriterSeriesSlice
	for n := 1; n <= seriesCount; n++ {
		series := &indexWriterSeries{
			labels: labels.FromStrings("__name__", strconv.Itoa(n)),
			chunks: []chunks.Meta{
				{Ref: 1, MinTime: 0, MaxTime: 10},
			},
		}
		input = append(input, series)
	}
	ir, _, _ := createFileReader(ctx, t, input)

	p := ir.PostingsForAllLabelValues(ctx, "__name__")
	require.NoError(t, p.Err())

	got, err := ExpandPostings(p)
	require.NoError(t, err)
	// NOTE(review): refs are assigned by createFileReader; nine consecutive
	// refs starting at 3 are expected here — confirm against the helper.
	require.Equal(t, []storage.SeriesRef{3, 4, 5, 6, 7, 8, 9, 10, 11}, got)
}
func TestReader_PostingsForLabelMatchingHonorsContextCancel(t *testing.T) { func TestReader_PostingsForLabelMatchingHonorsContextCancel(t *testing.T) {
const seriesCount = 1000 const seriesCount = 1000
var input indexWriterSeriesSlice var input indexWriterSeriesSlice

View file

@ -447,6 +447,22 @@ func (p *MemPostings) PostingsForLabelMatching(ctx context.Context, name string,
return Merge(ctx, its...) return Merge(ctx, its...)
} }
// PostingsForAllLabelValues returns the merged postings of every entry stored
// under the label called name. Entries with no series references are skipped.
// The read lock is held only while the per-entry lists are collected.
func (p *MemPostings) PostingsForAllLabelValues(ctx context.Context, name string) Postings {
	p.mtx.RLock()

	entries := p.m[name]
	lists := make([]Postings, 0, len(entries))
	for _, seriesRefs := range entries {
		if len(seriesRefs) == 0 {
			continue
		}
		lists = append(lists, NewListPostings(seriesRefs))
	}

	// Release the lock first so the merge does not run while holding it.
	p.mtx.RUnlock()

	return Merge(ctx, lists...)
}
// labelValues returns a slice of label values for the given label name. // labelValues returns a slice of label values for the given label name.
// It will take the read lock. // It will take the read lock.
func (p *MemPostings) labelValues(name string) []string { func (p *MemPostings) labelValues(name string) []string {

View file

@ -1460,6 +1460,21 @@ func TestMemPostings_PostingsForLabelMatching(t *testing.T) {
require.Equal(t, []storage.SeriesRef{2, 4}, refs) require.Equal(t, []storage.SeriesRef{2, 4}, refs)
} }
// TestMemPostings_PostingsForAllLabelValues adds four series, each with a
// different value for label "foo", and checks that all four references come
// back when asking for every value of that label.
func TestMemPostings_PostingsForAllLabelValues(t *testing.T) {
	ctx := context.Background()

	mp := NewMemPostings()
	mp.Add(1, labels.FromStrings("foo", "1"))
	mp.Add(2, labels.FromStrings("foo", "2"))
	mp.Add(3, labels.FromStrings("foo", "3"))
	mp.Add(4, labels.FromStrings("foo", "4"))

	all := mp.PostingsForAllLabelValues(ctx, "foo")
	require.NoError(t, all.Err())

	got, err := ExpandPostings(all)
	require.NoError(t, err)

	// Every series carrying the label must be present, whatever its value.
	want := []storage.SeriesRef{1, 2, 3, 4}
	require.Equal(t, want, got)
}
func TestMemPostings_PostingsForLabelMatchingHonorsContextCancel(t *testing.T) { func TestMemPostings_PostingsForLabelMatchingHonorsContextCancel(t *testing.T) {
memP := NewMemPostings() memP := NewMemPostings()
seriesCount := 10 * checkContextEveryNIterations seriesCount := 10 * checkContextEveryNIterations

View file

@ -450,6 +450,10 @@ func (ir *OOOCompactionHeadIndexReader) PostingsForLabelMatching(context.Context
return index.ErrPostings(errors.New("not supported")) return index.ErrPostings(errors.New("not supported"))
} }
// PostingsForAllLabelValues is not implemented for this reader: it always
// returns a postings value carrying a "not supported" error.
func (ir *OOOCompactionHeadIndexReader) PostingsForAllLabelValues(context.Context, string) index.Postings {
	errUnsupported := errors.New("not supported")
	return index.ErrPostings(errUnsupported)
}
func (ir *OOOCompactionHeadIndexReader) SortedPostings(p index.Postings) index.Postings { func (ir *OOOCompactionHeadIndexReader) SortedPostings(p index.Postings) index.Postings {
// This will already be sorted from the Postings() call above. // This will already be sorted from the Postings() call above.
return p return p

View file

@ -365,29 +365,16 @@ func inversePostingsForMatcher(ctx context.Context, ix IndexReader, m *labels.Ma
return ix.Postings(ctx, m.Name, m.Value) return ix.Postings(ctx, m.Name, m.Value)
} }
vals, err := ix.LabelValues(ctx, m.Name) // If the matcher being inverted is =~"" or ="", we just want all the values.
if err != nil {
return nil, err
}
res := vals[:0]
// If the match before inversion was !="" or !~"", we just want all the values.
if m.Value == "" && (m.Type == labels.MatchRegexp || m.Type == labels.MatchEqual) { if m.Value == "" && (m.Type == labels.MatchRegexp || m.Type == labels.MatchEqual) {
res = vals it := ix.PostingsForAllLabelValues(ctx, m.Name)
} else { return it, it.Err()
count := 1
for _, val := range vals {
if count%checkContextEveryNIterations == 0 && ctx.Err() != nil {
return nil, ctx.Err()
}
count++
if !m.Matches(val) {
res = append(res, val)
}
}
} }
return ix.Postings(ctx, m.Name, res...) it := ix.PostingsForLabelMatching(ctx, m.Name, func(s string) bool {
return !m.Matches(s)
})
return it, it.Err()
} }
func labelValuesWithMatchers(ctx context.Context, r IndexReader, name string, matchers ...*labels.Matcher) ([]string, error) { func labelValuesWithMatchers(ctx context.Context, r IndexReader, name string, matchers ...*labels.Matcher) ([]string, error) {

View file

@ -2340,6 +2340,16 @@ func (m mockIndex) PostingsForLabelMatching(ctx context.Context, name string, ma
return index.Merge(ctx, res...) return index.Merge(ctx, res...)
} }
// PostingsForAllLabelValues merges the postings lists of every entry in
// m.postings whose label name equals name.
func (m mockIndex) PostingsForAllLabelValues(ctx context.Context, name string) index.Postings {
	var lists []index.Postings
	for lbl, refs := range m.postings {
		if lbl.Name != name {
			continue
		}
		lists = append(lists, index.NewListPostings(refs))
	}
	return index.Merge(ctx, lists...)
}
func (m mockIndex) ShardedPostings(p index.Postings, shardIndex, shardCount uint64) index.Postings { func (m mockIndex) ShardedPostings(p index.Postings, shardIndex, shardCount uint64) index.Postings {
out := make([]storage.SeriesRef, 0, 128) out := make([]storage.SeriesRef, 0, 128)
@ -3327,6 +3337,10 @@ func (m mockMatcherIndex) PostingsForLabelMatching(context.Context, string, func
return index.ErrPostings(errors.New("PostingsForLabelMatching called")) return index.ErrPostings(errors.New("PostingsForLabelMatching called"))
} }
// PostingsForAllLabelValues always returns a postings value carrying an error
// that names this method, so a test can detect that it was called.
func (m mockMatcherIndex) PostingsForAllLabelValues(context.Context, string) index.Postings {
	errCalled := errors.New("PostingsForAllLabelValues called")
	return index.ErrPostings(errCalled)
}
func TestPostingsForMatcher(t *testing.T) { func TestPostingsForMatcher(t *testing.T) {
ctx := context.Background() ctx := context.Background()
@ -3725,17 +3739,6 @@ func TestReader_PostingsForLabelMatchingHonorsContextCancel(t *testing.T) {
require.Equal(t, failAfter+1, ctx.Count()) // Plus one for the Err() call that puts the error in the result. require.Equal(t, failAfter+1, ctx.Count()) // Plus one for the Err() call that puts the error in the result.
} }
func TestReader_InversePostingsForMatcherHonorsContextCancel(t *testing.T) {
ir := mockReaderOfLabels{}
failAfter := uint64(mockReaderOfLabelsSeriesCount / 2 / checkContextEveryNIterations)
ctx := &testutil.MockContextErrAfter{FailAfter: failAfter}
_, err := inversePostingsForMatcher(ctx, ir, labels.MustNewMatcher(labels.MatchRegexp, "__name__", ".*"))
require.Error(t, err)
require.Equal(t, failAfter+1, ctx.Count()) // Plus one for the Err() call that puts the error in the result.
}
type mockReaderOfLabels struct{} type mockReaderOfLabels struct{}
const mockReaderOfLabelsSeriesCount = checkContextEveryNIterations * 10 const mockReaderOfLabelsSeriesCount = checkContextEveryNIterations * 10
@ -3768,6 +3771,10 @@ func (m mockReaderOfLabels) PostingsForLabelMatching(context.Context, string, fu
panic("PostingsForLabelMatching called") panic("PostingsForLabelMatching called")
} }
// PostingsForAllLabelValues panics unconditionally; any call to it on this
// mock aborts the running test with a message naming the method.
func (m mockReaderOfLabels) PostingsForAllLabelValues(_ context.Context, _ string) index.Postings {
	panic("PostingsForAllLabelValues called")
}
func (m mockReaderOfLabels) Postings(context.Context, string, ...string) (index.Postings, error) { func (m mockReaderOfLabels) Postings(context.Context, string, ...string) (index.Postings, error) {
panic("Postings called") panic("Postings called")
} }