This commit is contained in:
Arve Knudsen 2024-09-19 17:06:08 -04:00 committed by GitHub
commit 847b396bb6
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
7 changed files with 84 additions and 35 deletions

View file

@ -78,6 +78,7 @@ type IndexReader interface {
Postings(ctx context.Context, name string, values ...string) (index.Postings, error)
// PostingsForLabelMatching returns a sorted iterator over postings having a label with the given name and a value for which match returns true.
// If match is nil, all postings for the label name are included.
// If no postings are found having at least one matching label, an empty iterator is returned.
PostingsForLabelMatching(ctx context.Context, name string, match func(value string) bool) index.Postings

View file

@ -1780,11 +1780,17 @@ func (r *Reader) PostingsForLabelMatching(ctx context.Context, name string, matc
return EmptyPostings()
}
postingsCount := 0
if match == nil {
// The caller wants all postings for name.
postingsCount = len(e) * symbolFactor
}
lastVal := e[len(e)-1].value
var its []Postings
its := make([]Postings, 0, postingsCount)
if err := r.traversePostingOffsets(ctx, e[0].off, func(val string, postingsOff uint64) (bool, error) {
if match(val) {
// We want this postings iterator since the value is a match
if match == nil || match(val) {
// We want this postings iterator since the value is a match.
postingsDec := encoding.NewDecbufAt(r.b, int(postingsOff), castagnoliTable)
_, p, err := r.dec.PostingsFromDecbuf(postingsDec)
if err != nil {

View file

@ -612,6 +612,41 @@ func TestChunksTimeOrdering(t *testing.T) {
require.NoError(t, idx.Close())
}
// TestReader_PostingsForLabelMatching exercises PostingsForLabelMatching on the
// index reader returned by createFileReader: first with a match function that
// accepts only even label values, then (as a subtest) with a nil match
// function, which is expected to select every posting for the label name.
func TestReader_PostingsForLabelMatching(t *testing.T) {
	const seriesCount = 9

	ctx := context.Background()

	// Build one series per __name__ value "1" through "9", each with a single chunk.
	var input indexWriterSeriesSlice
	for n := 1; n <= seriesCount; n++ {
		series := &indexWriterSeries{
			labels: labels.FromStrings("__name__", strconv.Itoa(n)),
			chunks: []chunks.Meta{
				{Ref: 1, MinTime: 0, MaxTime: 10},
			},
		}
		input = append(input, series)
	}
	ir, _, _ := createFileReader(ctx, t, input)

	// isEven accepts label values that parse as even integers. Every value
	// written above is numeric, so a parse failure means the fixture is broken.
	isEven := func(v string) bool {
		parsed, err := strconv.Atoi(v)
		if err != nil {
			panic(err)
		}
		return parsed%2 == 0
	}

	evenPostings := ir.PostingsForLabelMatching(ctx, "__name__", isEven)
	require.NoError(t, evenPostings.Err())
	evenRefs, err := ExpandPostings(evenPostings)
	require.NoError(t, err)
	require.Equal(t, []storage.SeriesRef{4, 6, 8, 10}, evenRefs)

	t.Run("nil match function", func(t *testing.T) {
		allPostings := ir.PostingsForLabelMatching(ctx, "__name__", nil)
		require.NoError(t, allPostings.Err())
		allRefs, expandErr := ExpandPostings(allPostings)
		require.NoError(t, expandErr)
		// All postings for the label should be returned.
		require.Equal(t, []storage.SeriesRef{3, 4, 5, 6, 7, 8, 9, 10, 11}, allRefs)
	})
}
func TestReader_PostingsForLabelMatchingHonorsContextCancel(t *testing.T) {
const seriesCount = 1000
var input indexWriterSeriesSlice

View file

@ -370,6 +370,28 @@ func (p *MemPostings) addFor(id storage.SeriesRef, l labels.Label) {
}
func (p *MemPostings) PostingsForLabelMatching(ctx context.Context, name string, match func(string) bool) Postings {
if match == nil {
// All postings are requested, no filtering.
p.mtx.RLock()
e := p.m[name]
if len(e) == 0 {
p.mtx.RUnlock()
return EmptyPostings()
}
its := make([]Postings, 0, len(e))
for _, refs := range e {
if len(refs) > 0 {
its = append(its, NewListPostings(refs))
}
}
// Let the mutex go before merging.
p.mtx.RUnlock()
return Merge(ctx, its...)
}
// We'll copy the values into a slice and then match over that,
// this way we don't need to hold the mutex while we're matching,
// which can be slow (seconds) if the match function is a huge regex.

View file

@ -1458,6 +1458,15 @@ func TestMemPostings_PostingsForLabelMatching(t *testing.T) {
refs, err := ExpandPostings(p)
require.NoError(t, err)
require.Equal(t, []storage.SeriesRef{2, 4}, refs)
t.Run("nil match function", func(t *testing.T) {
p := mp.PostingsForLabelMatching(context.Background(), "foo", nil)
require.NoError(t, p.Err())
refs, err := ExpandPostings(p)
require.NoError(t, err)
// All postings for the label should be returned.
require.Equal(t, []storage.SeriesRef{1, 2, 3, 4}, refs)
})
}
func TestMemPostings_PostingsForLabelMatchingHonorsContextCancel(t *testing.T) {

View file

@ -363,29 +363,16 @@ func inversePostingsForMatcher(ctx context.Context, ix IndexReader, m *labels.Ma
return ix.Postings(ctx, m.Name, m.Value)
}
vals, err := ix.LabelValues(ctx, m.Name)
if err != nil {
return nil, err
// If inverse matching the empty string, we just want all the values.
if (m.Value == "" && (m.Type == labels.MatchRegexp || m.Type == labels.MatchEqual)) || (m.Type == labels.MatchRegexp && m.Value == "^$") {
it := ix.PostingsForLabelMatching(ctx, m.Name, nil)
return it, it.Err()
}
res := vals[:0]
// If the match before inversion was !="" or !~"", we just want all the values.
if m.Value == "" && (m.Type == labels.MatchRegexp || m.Type == labels.MatchEqual) {
res = vals
} else {
count := 1
for _, val := range vals {
if count%checkContextEveryNIterations == 0 && ctx.Err() != nil {
return nil, ctx.Err()
}
count++
if !m.Matches(val) {
res = append(res, val)
}
}
}
return ix.Postings(ctx, m.Name, res...)
it := ix.PostingsForLabelMatching(ctx, m.Name, func(s string) bool {
return !m.Matches(s)
})
return it, it.Err()
}
func labelValuesWithMatchers(ctx context.Context, r IndexReader, name string, matchers ...*labels.Matcher) ([]string, error) {

View file

@ -3725,17 +3725,6 @@ func TestReader_PostingsForLabelMatchingHonorsContextCancel(t *testing.T) {
require.Equal(t, failAfter+1, ctx.Count()) // Plus one for the Err() call that puts the error in the result.
}
func TestReader_InversePostingsForMatcherHonorsContextCancel(t *testing.T) {
ir := mockReaderOfLabels{}
failAfter := uint64(mockReaderOfLabelsSeriesCount / 2 / checkContextEveryNIterations)
ctx := &testutil.MockContextErrAfter{FailAfter: failAfter}
_, err := inversePostingsForMatcher(ctx, ir, labels.MustNewMatcher(labels.MatchRegexp, "__name__", ".*"))
require.Error(t, err)
require.Equal(t, failAfter+1, ctx.Count()) // Plus one for the Err() call that puts the error in the result.
}
// mockReaderOfLabels is a stateless test double; in this file it is passed to
// inversePostingsForMatcher as the index reader. Its methods are defined
// elsewhere and are not visible here.
type mockReaderOfLabels struct{}
// mockReaderOfLabelsSeriesCount is ten times checkContextEveryNIterations.
// NOTE(review): presumably the number of series/label values the mock reader
// exposes, sized so context checks fire several times — confirm against the
// mock's methods.
const mockReaderOfLabelsSeriesCount = checkContextEveryNIterations * 10