Optimize labelValuesWithMatchers to fetch label values from selected series (#518)

* Optimize labelValues when matchers select small number of series.

Signed-off-by: Peter Štibraný <pstibrany@gmail.com>

* Address review feedback.

Signed-off-by: Peter Štibraný <pstibrany@gmail.com>

* Add missing p.Err()

Signed-off-by: Peter Štibraný <pstibrany@gmail.com>

* Reuse existing slice with all label values.

Signed-off-by: Peter Štibraný <pstibrany@gmail.com>

* Simplify check for expanded postings.

Signed-off-by: Peter Štibraný <pstibrany@gmail.com>

* Fix comment.

Signed-off-by: Peter Štibraný <pstibrany@gmail.com>

---------

Signed-off-by: Peter Štibraný <pstibrany@gmail.com>
This commit is contained in:
Peter Štibraný 2023-07-27 11:44:29 +02:00 committed by GitHub
parent 2a075ced62
commit c8f5ad1492
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 226 additions and 1 deletions

View file

@ -346,6 +346,8 @@ func inversePostingsForMatcher(ix IndexPostingsReader, m *labels.Matcher) (index
return ix.Postings(m.Name, res...)
}
const maxExpandedPostingsFactor = 100 // Division factor for maximum number of matched series.
func labelValuesWithMatchers(r IndexReader, name string, matchers ...*labels.Matcher) ([]string, error) {
p, err := PostingsForMatchers(r, matchers...)
if err != nil {
@ -376,6 +378,34 @@ func labelValuesWithMatchers(r IndexReader, name string, matchers ...*labels.Mat
allValues = filteredValues
}
// Let's see if expanded postings for matchers have smaller cardinality than label values.
// Since computing label values from series is expensive, we apply a limit on number of expanded
// postings (and series).
maxExpandedPostings := len(allValues) / maxExpandedPostingsFactor
if maxExpandedPostings > 0 {
// Add space for one extra posting when checking if we expanded all postings.
expanded := make([]storage.SeriesRef, 0, maxExpandedPostings+1)
// Call p.Next() even if len(expanded) == maxExpandedPostings. This tells us if there are more postings or not.
for len(expanded) <= maxExpandedPostings && p.Next() {
expanded = append(expanded, p.At())
}
if len(expanded) <= maxExpandedPostings {
// When we're here, p.Next() must have returned false, so we need to check for errors.
if err := p.Err(); err != nil {
return nil, errors.Wrap(err, "expanding postings for matchers")
}
// We have expanded all the postings -- all returned label values will be from these series only.
// (We supply allValues as a buffer for storing results. It should be big enough already, since it holds all possible label values.)
return labelValuesFromSeries(r, name, expanded, allValues)
}
// If we haven't reached end of postings, we prepend our expanded postings to "p", and continue.
p = newPrependPostings(expanded, p)
}
valuesPostings := make([]index.Postings, len(allValues))
for i, value := range allValues {
valuesPostings[i], err = r.Postings(name, value)
@ -396,6 +426,83 @@ func labelValuesWithMatchers(r IndexReader, name string, matchers ...*labels.Mat
return values, nil
}
// labelValuesFromSeries returns all unique label values from for given label name from supplied series. Values are not sorted.
// buf is space for holding result (if it isn't big enough, it will be ignored), may be nil.
func labelValuesFromSeries(r IndexReader, labelName string, refs []storage.SeriesRef, buf []string) ([]string, error) {
values := map[string]struct{}{}
var builder labels.ScratchBuilder
for _, ref := range refs {
err := r.Series(ref, &builder, nil)
if err != nil {
return nil, errors.Wrapf(err, "label values for label %s", labelName)
}
v := builder.Labels().Get(labelName)
if v != "" {
values[v] = struct{}{}
}
}
if cap(buf) >= len(values) {
buf = buf[:0]
} else {
buf = make([]string, 0, len(values))
}
for v := range values {
buf = append(buf, v)
}
return buf, nil
}
func newPrependPostings(a []storage.SeriesRef, b index.Postings) index.Postings {
return &prependPostings{
ix: -1,
prefix: a,
rest: b,
}
}
// prependPostings returns series references from "prefix" before using "rest" postings.
type prependPostings struct {
ix int
prefix []storage.SeriesRef
rest index.Postings
}
func (p *prependPostings) Next() bool {
p.ix++
if p.ix < len(p.prefix) {
return true
}
return p.rest.Next()
}
func (p *prependPostings) Seek(v storage.SeriesRef) bool {
for p.ix < len(p.prefix) {
if p.ix >= 0 && p.prefix[p.ix] >= v {
return true
}
p.ix++
}
return p.rest.Seek(v)
}
func (p *prependPostings) At() storage.SeriesRef {
if p.ix >= 0 && p.ix < len(p.prefix) {
return p.prefix[p.ix]
}
return p.rest.At()
}
func (p *prependPostings) Err() error {
if p.ix >= 0 && p.ix < len(p.prefix) {
return nil
}
return p.rest.Err()
}
func labelNamesWithMatchers(r IndexReader, matchers ...*labels.Matcher) ([]string, error) {
p, err := r.PostingsForMatchers(false, matchers...)
if err != nil {

View file

@ -50,7 +50,7 @@ func BenchmarkQuerier(b *testing.B) {
for n := 0; n < 10; n++ {
for i := 0; i < 100000; i++ {
addSeries(labels.FromStrings("i", strconv.Itoa(i)+postingsBenchSuffix, "n", strconv.Itoa(n)+postingsBenchSuffix, "j", "foo"))
addSeries(labels.FromStrings("i", strconv.Itoa(i)+postingsBenchSuffix, "n", strconv.Itoa(n)+postingsBenchSuffix, "j", "foo", "i_times_n", strconv.Itoa(i*n)))
// Have some series that won't be matched, to properly test inverted matches.
addSeries(labels.FromStrings("i", strconv.Itoa(i)+postingsBenchSuffix, "n", strconv.Itoa(n)+postingsBenchSuffix, "j", "bar"))
addSeries(labels.FromStrings("i", strconv.Itoa(i)+postingsBenchSuffix, "n", "0_"+strconv.Itoa(n)+postingsBenchSuffix, "j", "bar"))
@ -184,6 +184,9 @@ func benchmarkLabelValuesWithMatchers(b *testing.B, ir IndexReader) {
n1 := labels.MustNewMatcher(labels.MatchEqual, "n", "1"+postingsBenchSuffix)
nX := labels.MustNewMatcher(labels.MatchNotEqual, "n", "X"+postingsBenchSuffix)
nPlus := labels.MustNewMatcher(labels.MatchRegexp, "i", "^.+$")
primesTimes := labels.MustNewMatcher(labels.MatchEqual, "i_times_n", "533701") // = 76243*7, ie. multiplication of primes. It will match single i*n combination.
nonPrimesTimes := labels.MustNewMatcher(labels.MatchEqual, "i_times_n", "20") // 1*20, 2*10, 4*5, 5*4
times12 := labels.MustNewMatcher(labels.MatchRegexp, "i_times_n", "12.*")
cases := []struct {
name string
@ -199,6 +202,9 @@ func benchmarkLabelValuesWithMatchers(b *testing.B, ir IndexReader) {
{`i with n="1",j=~"XXX|YYY"`, "i", []*labels.Matcher{n1, jXXXYYY}},
{`i with n="X",j!="foo"`, "i", []*labels.Matcher{nX, jNotFoo}},
{`i with n="1",i=~"^.*$",j!="foo"`, "i", []*labels.Matcher{n1, iStar, jNotFoo}},
{`i with i_times_n=533701`, "i", []*labels.Matcher{primesTimes}},
{`i with i_times_n=20`, "i", []*labels.Matcher{nonPrimesTimes}},
{`i with i_times_n=~"12.*""`, "i", []*labels.Matcher{times12}},
// n has 10 values.
{`n with j!="foo"`, "n", []*labels.Matcher{jNotFoo}},
{`n with i="1"`, "n", []*labels.Matcher{i1}},

View file

@ -2733,3 +2733,115 @@ func TestQueryWithDeletedHistograms(t *testing.T) {
})
}
}
func TestPrependPostings(t *testing.T) {
t.Run("empty", func(t *testing.T) {
p := newPrependPostings(nil, index.NewListPostings(nil))
require.False(t, p.Next())
})
t.Run("next+At", func(t *testing.T) {
p := newPrependPostings([]storage.SeriesRef{10, 20, 30}, index.NewListPostings([]storage.SeriesRef{200, 300, 500}))
for _, s := range []storage.SeriesRef{10, 20, 30, 200, 300, 500} {
require.True(t, p.Next())
require.Equal(t, s, p.At())
require.Equal(t, s, p.At()) // Multiple calls return same value.
}
require.False(t, p.Next())
})
t.Run("seek+At", func(t *testing.T) {
p := newPrependPostings([]storage.SeriesRef{10, 20, 30}, index.NewListPostings([]storage.SeriesRef{200, 300, 500}))
require.True(t, p.Seek(5))
require.Equal(t, storage.SeriesRef(10), p.At())
require.Equal(t, storage.SeriesRef(10), p.At())
require.True(t, p.Seek(15))
require.Equal(t, storage.SeriesRef(20), p.At())
require.Equal(t, storage.SeriesRef(20), p.At())
require.True(t, p.Seek(20)) // Seeking to "current" value doesn't move postings iterator.
require.Equal(t, storage.SeriesRef(20), p.At())
require.Equal(t, storage.SeriesRef(20), p.At())
require.True(t, p.Seek(50))
require.Equal(t, storage.SeriesRef(200), p.At())
require.Equal(t, storage.SeriesRef(200), p.At())
require.False(t, p.Seek(1000))
require.False(t, p.Next())
})
t.Run("err", func(t *testing.T) {
err := fmt.Errorf("error")
p := newPrependPostings([]storage.SeriesRef{10, 20, 30}, index.ErrPostings(err))
for _, s := range []storage.SeriesRef{10, 20, 30} {
require.True(t, p.Next())
require.Equal(t, s, p.At())
require.NoError(t, p.Err())
}
// Advancing after prepended values returns false, and gives us access to error.
require.False(t, p.Next())
require.Equal(t, err, p.Err())
})
}
func TestLabelsValuesWithMatchersOptimization(t *testing.T) {
dir := t.TempDir()
opts := DefaultHeadOptions()
opts.ChunkRange = 1000
opts.ChunkDirRoot = dir
h, err := NewHead(nil, nil, nil, nil, opts, nil)
require.NoError(t, err)
defer func() {
require.NoError(t, h.Close())
}()
app := h.Appender(context.Background())
addSeries := func(l labels.Labels) {
app.Append(0, l, 0, 0)
}
const maxI = 10 * maxExpandedPostingsFactor
allValuesOfI := make([]string, 0, maxI)
for i := 0; i < maxI; i++ {
allValuesOfI = append(allValuesOfI, strconv.Itoa(i))
}
for n := 0; n < 10; n++ {
for i := 0; i < maxI; i++ {
addSeries(labels.FromStrings("i", allValuesOfI[i], "n", strconv.Itoa(n), "j", "foo", "i_times_n", strconv.Itoa(i*n)))
}
}
require.NoError(t, app.Commit())
ir, err := h.Index()
require.NoError(t, err)
primesTimes := labels.MustNewMatcher(labels.MatchEqual, "i_times_n", "23") // It will match single i*n combination (n < 10)
nonPrimesTimes := labels.MustNewMatcher(labels.MatchEqual, "i_times_n", "20")
n3 := labels.MustNewMatcher(labels.MatchEqual, "n", "3")
cases := []struct {
name string
labelName string
matchers []*labels.Matcher
expectedResults []string
}{
{name: `i with i_times_n=23`, labelName: "i", matchers: []*labels.Matcher{primesTimes}, expectedResults: []string{"23"}},
{name: `i with i_times_n=20`, labelName: "i", matchers: []*labels.Matcher{nonPrimesTimes}, expectedResults: []string{"4", "5", "10", "20"}},
{name: `n with n="3"`, labelName: "i", matchers: []*labels.Matcher{n3}, expectedResults: allValuesOfI},
}
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
values, err := labelValuesWithMatchers(ir, c.labelName, c.matchers...)
require.NoError(t, err)
require.ElementsMatch(t, c.expectedResults, values)
})
}
}