diff --git a/tsdb/querier.go b/tsdb/querier.go index dad559549..ba2f24677 100644 --- a/tsdb/querier.go +++ b/tsdb/querier.go @@ -346,6 +346,8 @@ func inversePostingsForMatcher(ix IndexPostingsReader, m *labels.Matcher) (index return ix.Postings(m.Name, res...) } +const maxExpandedPostingsFactor = 100 // Division factor for maximum number of matched series. + func labelValuesWithMatchers(r IndexReader, name string, matchers ...*labels.Matcher) ([]string, error) { p, err := PostingsForMatchers(r, matchers...) if err != nil { @@ -376,6 +378,34 @@ func labelValuesWithMatchers(r IndexReader, name string, matchers ...*labels.Mat allValues = filteredValues } + // Let's see if expanded postings for matchers have smaller cardinality than label values. + // Since computing label values from series is expensive, we apply a limit on number of expanded + // postings (and series). + maxExpandedPostings := len(allValues) / maxExpandedPostingsFactor + if maxExpandedPostings > 0 { + // Add space for one extra posting when checking if we expanded all postings. + expanded := make([]storage.SeriesRef, 0, maxExpandedPostings+1) + + // Call p.Next() even if len(expanded) == maxExpandedPostings. This tells us if there are more postings or not. + for len(expanded) <= maxExpandedPostings && p.Next() { + expanded = append(expanded, p.At()) + } + + if len(expanded) <= maxExpandedPostings { + // When we're here, p.Next() must have returned false, so we need to check for errors. + if err := p.Err(); err != nil { + return nil, errors.Wrap(err, "expanding postings for matchers") + } + + // We have expanded all the postings -- all returned label values will be from these series only. + // (We supply allValues as a buffer for storing results. It should be big enough already, since it holds all possible label values.) + return labelValuesFromSeries(r, name, expanded, allValues) + } + + // If we haven't reached end of postings, we prepend our expanded postings to "p", and continue. + p = newPrependPostings(expanded, p) + } + valuesPostings := make([]index.Postings, len(allValues)) for i, value := range allValues { valuesPostings[i], err = r.Postings(name, value) @@ -396,6 +426,83 @@ func labelValuesWithMatchers(r IndexReader, name string, matchers ...*labels.Mat return values, nil } +// labelValuesFromSeries returns all unique label values from for given label name from supplied series. Values are not sorted. +// buf is space for holding result (if it isn't big enough, it will be ignored), may be nil. +func labelValuesFromSeries(r IndexReader, labelName string, refs []storage.SeriesRef, buf []string) ([]string, error) { + values := map[string]struct{}{} + + var builder labels.ScratchBuilder + for _, ref := range refs { + err := r.Series(ref, &builder, nil) + if err != nil { + return nil, errors.Wrapf(err, "label values for label %s", labelName) + } + + v := builder.Labels().Get(labelName) + if v != "" { + values[v] = struct{}{} + } + } + + if cap(buf) >= len(values) { + buf = buf[:0] + } else { + buf = make([]string, 0, len(values)) + } + for v := range values { + buf = append(buf, v) + } + return buf, nil +} + +func newPrependPostings(a []storage.SeriesRef, b index.Postings) index.Postings { + return &prependPostings{ + ix: -1, + prefix: a, + rest: b, + } +} + +// prependPostings returns series references from "prefix" before using "rest" postings. +type prependPostings struct { + ix int + prefix []storage.SeriesRef + rest index.Postings +} + +func (p *prependPostings) Next() bool { + p.ix++ + if p.ix < len(p.prefix) { + return true + } + return p.rest.Next() +} + +func (p *prependPostings) Seek(v storage.SeriesRef) bool { + for p.ix < len(p.prefix) { + if p.ix >= 0 && p.prefix[p.ix] >= v { + return true + } + p.ix++ + } + + return p.rest.Seek(v) +} + +func (p *prependPostings) At() storage.SeriesRef { + if p.ix >= 0 && p.ix < len(p.prefix) { + return p.prefix[p.ix] + } + return p.rest.At() +} + +func (p *prependPostings) Err() error { + if p.ix >= 0 && p.ix < len(p.prefix) { + return nil + } + return p.rest.Err() +} + func labelNamesWithMatchers(r IndexReader, matchers ...*labels.Matcher) ([]string, error) { p, err := r.PostingsForMatchers(false, matchers...) if err != nil { diff --git a/tsdb/querier_bench_test.go b/tsdb/querier_bench_test.go index cfe4d6fe9..71aac0fb1 100644 --- a/tsdb/querier_bench_test.go +++ b/tsdb/querier_bench_test.go @@ -50,7 +50,7 @@ func BenchmarkQuerier(b *testing.B) { for n := 0; n < 10; n++ { for i := 0; i < 100000; i++ { - addSeries(labels.FromStrings("i", strconv.Itoa(i)+postingsBenchSuffix, "n", strconv.Itoa(n)+postingsBenchSuffix, "j", "foo")) + addSeries(labels.FromStrings("i", strconv.Itoa(i)+postingsBenchSuffix, "n", strconv.Itoa(n)+postingsBenchSuffix, "j", "foo", "i_times_n", strconv.Itoa(i*n))) // Have some series that won't be matched, to properly test inverted matches. addSeries(labels.FromStrings("i", strconv.Itoa(i)+postingsBenchSuffix, "n", strconv.Itoa(n)+postingsBenchSuffix, "j", "bar")) addSeries(labels.FromStrings("i", strconv.Itoa(i)+postingsBenchSuffix, "n", "0_"+strconv.Itoa(n)+postingsBenchSuffix, "j", "bar")) @@ -184,6 +184,9 @@ func benchmarkLabelValuesWithMatchers(b *testing.B, ir IndexReader) { n1 := labels.MustNewMatcher(labels.MatchEqual, "n", "1"+postingsBenchSuffix) nX := labels.MustNewMatcher(labels.MatchNotEqual, "n", "X"+postingsBenchSuffix) nPlus := labels.MustNewMatcher(labels.MatchRegexp, "i", "^.+$") + primesTimes := labels.MustNewMatcher(labels.MatchEqual, "i_times_n", "533701") // = 76243*7, ie. multiplication of primes. It will match single i*n combination. + nonPrimesTimes := labels.MustNewMatcher(labels.MatchEqual, "i_times_n", "20") // 1*20, 2*10, 4*5, 5*4 + times12 := labels.MustNewMatcher(labels.MatchRegexp, "i_times_n", "12.*") cases := []struct { name string @@ -199,6 +202,9 @@ func benchmarkLabelValuesWithMatchers(b *testing.B, ir IndexReader) { {`i with n="1",j=~"XXX|YYY"`, "i", []*labels.Matcher{n1, jXXXYYY}}, {`i with n="X",j!="foo"`, "i", []*labels.Matcher{nX, jNotFoo}}, {`i with n="1",i=~"^.*$",j!="foo"`, "i", []*labels.Matcher{n1, iStar, jNotFoo}}, + {`i with i_times_n=533701`, "i", []*labels.Matcher{primesTimes}}, + {`i with i_times_n=20`, "i", []*labels.Matcher{nonPrimesTimes}}, + {`i with i_times_n=~"12.*""`, "i", []*labels.Matcher{times12}}, // n has 10 values. {`n with j!="foo"`, "n", []*labels.Matcher{jNotFoo}}, {`n with i="1"`, "n", []*labels.Matcher{i1}}, diff --git a/tsdb/querier_test.go b/tsdb/querier_test.go index 128db31a5..a81809b8d 100644 --- a/tsdb/querier_test.go +++ b/tsdb/querier_test.go @@ -2733,3 +2733,115 @@ func TestQueryWithDeletedHistograms(t *testing.T) { }) } } + +func TestPrependPostings(t *testing.T) { + t.Run("empty", func(t *testing.T) { + p := newPrependPostings(nil, index.NewListPostings(nil)) + require.False(t, p.Next()) + }) + + t.Run("next+At", func(t *testing.T) { + p := newPrependPostings([]storage.SeriesRef{10, 20, 30}, index.NewListPostings([]storage.SeriesRef{200, 300, 500})) + + for _, s := range []storage.SeriesRef{10, 20, 30, 200, 300, 500} { + require.True(t, p.Next()) + require.Equal(t, s, p.At()) + require.Equal(t, s, p.At()) // Multiple calls return same value. + } + require.False(t, p.Next()) + }) + + t.Run("seek+At", func(t *testing.T) { + p := newPrependPostings([]storage.SeriesRef{10, 20, 30}, index.NewListPostings([]storage.SeriesRef{200, 300, 500})) + + require.True(t, p.Seek(5)) + require.Equal(t, storage.SeriesRef(10), p.At()) + require.Equal(t, storage.SeriesRef(10), p.At()) + + require.True(t, p.Seek(15)) + require.Equal(t, storage.SeriesRef(20), p.At()) + require.Equal(t, storage.SeriesRef(20), p.At()) + + require.True(t, p.Seek(20)) // Seeking to "current" value doesn't move postings iterator. + require.Equal(t, storage.SeriesRef(20), p.At()) + require.Equal(t, storage.SeriesRef(20), p.At()) + + require.True(t, p.Seek(50)) + require.Equal(t, storage.SeriesRef(200), p.At()) + require.Equal(t, storage.SeriesRef(200), p.At()) + + require.False(t, p.Seek(1000)) + require.False(t, p.Next()) + }) + + t.Run("err", func(t *testing.T) { + err := fmt.Errorf("error") + p := newPrependPostings([]storage.SeriesRef{10, 20, 30}, index.ErrPostings(err)) + + for _, s := range []storage.SeriesRef{10, 20, 30} { + require.True(t, p.Next()) + require.Equal(t, s, p.At()) + require.NoError(t, p.Err()) + } + // Advancing after prepended values returns false, and gives us access to error. + require.False(t, p.Next()) + require.Equal(t, err, p.Err()) + }) +} + +func TestLabelsValuesWithMatchersOptimization(t *testing.T) { + dir := t.TempDir() + opts := DefaultHeadOptions() + opts.ChunkRange = 1000 + opts.ChunkDirRoot = dir + h, err := NewHead(nil, nil, nil, nil, opts, nil) + require.NoError(t, err) + defer func() { + require.NoError(t, h.Close()) + }() + + app := h.Appender(context.Background()) + addSeries := func(l labels.Labels) { + app.Append(0, l, 0, 0) + } + + const maxI = 10 * maxExpandedPostingsFactor + + allValuesOfI := make([]string, 0, maxI) + for i := 0; i < maxI; i++ { + allValuesOfI = append(allValuesOfI, strconv.Itoa(i)) + } + + for n := 0; n < 10; n++ { + for i := 0; i < maxI; i++ { + addSeries(labels.FromStrings("i", allValuesOfI[i], "n", strconv.Itoa(n), "j", "foo", "i_times_n", strconv.Itoa(i*n))) + } + } + require.NoError(t, app.Commit()) + + ir, err := h.Index() + require.NoError(t, err) + + primesTimes := labels.MustNewMatcher(labels.MatchEqual, "i_times_n", "23") // It will match single i*n combination (n < 10) + nonPrimesTimes := labels.MustNewMatcher(labels.MatchEqual, "i_times_n", "20") + n3 := labels.MustNewMatcher(labels.MatchEqual, "n", "3") + + cases := []struct { + name string + labelName string + matchers []*labels.Matcher + expectedResults []string + }{ + {name: `i with i_times_n=23`, labelName: "i", matchers: []*labels.Matcher{primesTimes}, expectedResults: []string{"23"}}, + {name: `i with i_times_n=20`, labelName: "i", matchers: []*labels.Matcher{nonPrimesTimes}, expectedResults: []string{"4", "5", "10", "20"}}, + {name: `n with n="3"`, labelName: "i", matchers: []*labels.Matcher{n3}, expectedResults: allValuesOfI}, + } + + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + values, err := labelValuesWithMatchers(ir, c.labelName, c.matchers...) + require.NoError(t, err) + require.ElementsMatch(t, c.expectedResults, values) + }) + } +}