diff --git a/tsdb/block.go b/tsdb/block.go index aec99788c..6483f8d8b 100644 --- a/tsdb/block.go +++ b/tsdb/block.go @@ -82,6 +82,10 @@ type IndexReader interface { // If no postings are found having at least one matching label, an empty iterator is returned. PostingsForLabelMatching(ctx context.Context, name string, match func(value string) bool) index.Postings + // PostingsForAllLabelValues returns a sorted iterator over all postings having a label with the given name. + // If no postings are found with the label in question, an empty iterator is returned. + PostingsForAllLabelValues(ctx context.Context, name string) index.Postings + // SortedPostings returns a postings list that is reordered to be sorted // by the label set of the underlying series. SortedPostings(index.Postings) index.Postings @@ -531,6 +535,10 @@ func (r blockIndexReader) PostingsForLabelMatching(ctx context.Context, name str return r.ir.PostingsForLabelMatching(ctx, name, match) } +func (r blockIndexReader) PostingsForAllLabelValues(ctx context.Context, name string) index.Postings { + return r.ir.PostingsForAllLabelValues(ctx, name) +} + func (r blockIndexReader) SortedPostings(p index.Postings) index.Postings { return r.ir.SortedPostings(p) } diff --git a/tsdb/head_read.go b/tsdb/head_read.go index 29adc3ee7..79ed0f024 100644 --- a/tsdb/head_read.go +++ b/tsdb/head_read.go @@ -123,6 +123,10 @@ func (h *headIndexReader) PostingsForLabelMatching(ctx context.Context, name str return h.head.postings.PostingsForLabelMatching(ctx, name, match) } +func (h *headIndexReader) PostingsForAllLabelValues(ctx context.Context, name string) index.Postings { + return h.head.postings.PostingsForAllLabelValues(ctx, name) +} + func (h *headIndexReader) SortedPostings(p index.Postings) index.Postings { series := make([]*memSeries, 0, 128) diff --git a/tsdb/index/index.go b/tsdb/index/index.go index 4e199c5bd..2708b07b1 100644 --- a/tsdb/index/index.go +++ b/tsdb/index/index.go @@ -1777,6 +1777,15 @@ func (r *Reader) Postings(ctx context.Context, name string, values ...string) (P } func (r *Reader) PostingsForLabelMatching(ctx context.Context, name string, match func(string) bool) Postings { + return r.postingsForLabelMatching(ctx, name, match) +} + +func (r *Reader) PostingsForAllLabelValues(ctx context.Context, name string) Postings { + return r.postingsForLabelMatching(ctx, name, nil) +} + +// postingsForLabelMatching implements PostingsForLabelMatching if match is non-nil, and PostingsForAllLabelValues otherwise. +func (r *Reader) postingsForLabelMatching(ctx context.Context, name string, match func(string) bool) Postings { if r.version == FormatV1 { return r.postingsForLabelMatchingV1(ctx, name, match) } @@ -1786,11 +1795,17 @@ func (r *Reader) PostingsForLabelMatching(ctx context.Context, name string, matc return EmptyPostings() } + postingsEstimate := 0 + if match == nil { + // The caller wants all postings for name. + postingsEstimate = len(e) * symbolFactor + } + lastVal := e[len(e)-1].value - var its []Postings + its := make([]Postings, 0, postingsEstimate) if err := r.traversePostingOffsets(ctx, e[0].off, func(val string, postingsOff uint64) (bool, error) { - if match(val) { - // We want this postings iterator since the value is a match + if match == nil || match(val) { + // We want this postings iterator since the value is a match. postingsDec := encoding.NewDecbufAt(r.b, int(postingsOff), castagnoliTable) _, p, err := r.dec.DecodePostings(postingsDec) if err != nil { @@ -1819,7 +1834,7 @@ func (r *Reader) postingsForLabelMatchingV1(ctx context.Context, name string, ma return ErrPostings(ctx.Err()) } count++ - if !match(val) { + if match != nil && !match(val) { continue } diff --git a/tsdb/index/index_test.go b/tsdb/index/index_test.go index 0d330cfcb..0a45f3722 100644 --- a/tsdb/index/index_test.go +++ b/tsdb/index/index_test.go @@ -613,6 +613,52 @@ func TestChunksTimeOrdering(t *testing.T) { require.NoError(t, idx.Close()) } +func TestReader_PostingsForLabelMatching(t *testing.T) { + const seriesCount = 9 + var input indexWriterSeriesSlice + for i := 1; i <= seriesCount; i++ { + input = append(input, &indexWriterSeries{ + labels: labels.FromStrings("__name__", strconv.Itoa(i)), + chunks: []chunks.Meta{ + {Ref: 1, MinTime: 0, MaxTime: 10}, + }, + }) + } + ir, _, _ := createFileReader(context.Background(), t, input) + + p := ir.PostingsForLabelMatching(context.Background(), "__name__", func(v string) bool { + iv, err := strconv.Atoi(v) + if err != nil { + panic(err) + } + return iv%2 == 0 + }) + require.NoError(t, p.Err()) + refs, err := ExpandPostings(p) + require.NoError(t, err) + require.Equal(t, []storage.SeriesRef{4, 6, 8, 10}, refs) +} + +func TestReader_PostingsForAllLabelValues(t *testing.T) { + const seriesCount = 9 + var input indexWriterSeriesSlice + for i := 1; i <= seriesCount; i++ { + input = append(input, &indexWriterSeries{ + labels: labels.FromStrings("__name__", strconv.Itoa(i)), + chunks: []chunks.Meta{ + {Ref: 1, MinTime: 0, MaxTime: 10}, + }, + }) + } + ir, _, _ := createFileReader(context.Background(), t, input) + + p := ir.PostingsForAllLabelValues(context.Background(), "__name__") + require.NoError(t, p.Err()) + refs, err := ExpandPostings(p) + require.NoError(t, err) + require.Equal(t, []storage.SeriesRef{3, 4, 5, 6, 7, 8, 9, 10, 11}, refs) +} + func TestReader_PostingsForLabelMatchingHonorsContextCancel(t *testing.T) { const seriesCount = 1000 var input indexWriterSeriesSlice diff --git a/tsdb/index/postings.go b/tsdb/index/postings.go index d7b497e61..43ab1b55a 100644 --- a/tsdb/index/postings.go +++ b/tsdb/index/postings.go @@ -447,6 +447,22 @@ func (p *MemPostings) PostingsForLabelMatching(ctx context.Context, name string, return Merge(ctx, its...) } +func (p *MemPostings) PostingsForAllLabelValues(ctx context.Context, name string) Postings { + p.mtx.RLock() + + e := p.m[name] + its := make([]Postings, 0, len(e)) + for _, refs := range e { + if len(refs) > 0 { + its = append(its, NewListPostings(refs)) + } + } + + // Let the mutex go before merging. + p.mtx.RUnlock() + return Merge(ctx, its...) +} + // labelValues returns a slice of label values for the given label name. // It will take the read lock. func (p *MemPostings) labelValues(name string) []string { diff --git a/tsdb/index/postings_test.go b/tsdb/index/postings_test.go index 96c9ed124..6ff5b9c06 100644 --- a/tsdb/index/postings_test.go +++ b/tsdb/index/postings_test.go @@ -1460,6 +1460,21 @@ func TestMemPostings_PostingsForLabelMatching(t *testing.T) { require.Equal(t, []storage.SeriesRef{2, 4}, refs) } +func TestMemPostings_PostingsForAllLabelValues(t *testing.T) { + mp := NewMemPostings() + mp.Add(1, labels.FromStrings("foo", "1")) + mp.Add(2, labels.FromStrings("foo", "2")) + mp.Add(3, labels.FromStrings("foo", "3")) + mp.Add(4, labels.FromStrings("foo", "4")) + + p := mp.PostingsForAllLabelValues(context.Background(), "foo") + require.NoError(t, p.Err()) + refs, err := ExpandPostings(p) + require.NoError(t, err) + // All postings for the label should be returned. + require.Equal(t, []storage.SeriesRef{1, 2, 3, 4}, refs) +} + func TestMemPostings_PostingsForLabelMatchingHonorsContextCancel(t *testing.T) { memP := NewMemPostings() seriesCount := 10 * checkContextEveryNIterations diff --git a/tsdb/ooo_head_read.go b/tsdb/ooo_head_read.go index 26cd4d057..2a1a44d18 100644 --- a/tsdb/ooo_head_read.go +++ b/tsdb/ooo_head_read.go @@ -450,6 +450,10 @@ func (ir *OOOCompactionHeadIndexReader) PostingsForLabelMatching(context.Context return index.ErrPostings(errors.New("not supported")) } +func (ir *OOOCompactionHeadIndexReader) PostingsForAllLabelValues(context.Context, string) index.Postings { + return index.ErrPostings(errors.New("not supported")) +} + func (ir *OOOCompactionHeadIndexReader) SortedPostings(p index.Postings) index.Postings { // This will already be sorted from the Postings() call above. return p diff --git a/tsdb/querier.go b/tsdb/querier.go index f741c5e28..be2dea2de 100644 --- a/tsdb/querier.go +++ b/tsdb/querier.go @@ -365,29 +365,16 @@ func inversePostingsForMatcher(ctx context.Context, ix IndexReader, m *labels.Ma return ix.Postings(ctx, m.Name, m.Value) } - vals, err := ix.LabelValues(ctx, m.Name) - if err != nil { - return nil, err - } - - res := vals[:0] - // If the match before inversion was !="" or !~"", we just want all the values. + // If the matcher being inverted is =~"" or ="", we just want all the values. if m.Value == "" && (m.Type == labels.MatchRegexp || m.Type == labels.MatchEqual) { - res = vals - } else { - count := 1 - for _, val := range vals { - if count%checkContextEveryNIterations == 0 && ctx.Err() != nil { - return nil, ctx.Err() - } - count++ - if !m.Matches(val) { - res = append(res, val) - } - } + it := ix.PostingsForAllLabelValues(ctx, m.Name) + return it, it.Err() } - return ix.Postings(ctx, m.Name, res...) + it := ix.PostingsForLabelMatching(ctx, m.Name, func(s string) bool { + return !m.Matches(s) + }) + return it, it.Err() } func labelValuesWithMatchers(ctx context.Context, r IndexReader, name string, matchers ...*labels.Matcher) ([]string, error) { diff --git a/tsdb/querier_test.go b/tsdb/querier_test.go index e97aea2fd..8e23bee58 100644 --- a/tsdb/querier_test.go +++ b/tsdb/querier_test.go @@ -2340,6 +2340,16 @@ func (m mockIndex) PostingsForLabelMatching(ctx context.Context, name string, ma return index.Merge(ctx, res...) } +func (m mockIndex) PostingsForAllLabelValues(ctx context.Context, name string) index.Postings { + var res []index.Postings + for l, srs := range m.postings { + if l.Name == name { + res = append(res, index.NewListPostings(srs)) + } + } + return index.Merge(ctx, res...) +} + func (m mockIndex) ShardedPostings(p index.Postings, shardIndex, shardCount uint64) index.Postings { out := make([]storage.SeriesRef, 0, 128) @@ -3327,6 +3337,10 @@ func (m mockMatcherIndex) PostingsForLabelMatching(context.Context, string, func return index.ErrPostings(errors.New("PostingsForLabelMatching called")) } +func (m mockMatcherIndex) PostingsForAllLabelValues(context.Context, string) index.Postings { + return index.ErrPostings(errors.New("PostingsForAllLabelValues called")) +} + func TestPostingsForMatcher(t *testing.T) { ctx := context.Background() @@ -3725,17 +3739,6 @@ func TestReader_PostingsForLabelMatchingHonorsContextCancel(t *testing.T) { require.Equal(t, failAfter+1, ctx.Count()) // Plus one for the Err() call that puts the error in the result. } -func TestReader_InversePostingsForMatcherHonorsContextCancel(t *testing.T) { - ir := mockReaderOfLabels{} - - failAfter := uint64(mockReaderOfLabelsSeriesCount / 2 / checkContextEveryNIterations) - ctx := &testutil.MockContextErrAfter{FailAfter: failAfter} - _, err := inversePostingsForMatcher(ctx, ir, labels.MustNewMatcher(labels.MatchRegexp, "__name__", ".*")) - - require.Error(t, err) - require.Equal(t, failAfter+1, ctx.Count()) // Plus one for the Err() call that puts the error in the result. -} - type mockReaderOfLabels struct{} const mockReaderOfLabelsSeriesCount = checkContextEveryNIterations * 10 @@ -3768,6 +3771,10 @@ func (m mockReaderOfLabels) PostingsForLabelMatching(context.Context, string, fu panic("PostingsForLabelMatching called") } +func (m mockReaderOfLabels) PostingsForAllLabelValues(context.Context, string) index.Postings { + panic("PostingsForAllLabelValues called") +} + func (m mockReaderOfLabels) Postings(context.Context, string, ...string) (index.Postings, error) { panic("Postings called") }