diff --git a/tsdb/head_read.go b/tsdb/head_read.go index 8d34a54439..cd19c0ef58 100644 --- a/tsdb/head_read.go +++ b/tsdb/head_read.go @@ -103,11 +103,18 @@ func (h *headIndexReader) LabelNames(matchers ...*labels.Matcher) ([]string, err // Postings returns the postings list iterator for the label pairs. func (h *headIndexReader) Postings(name string, values ...string) (index.Postings, error) { - res := make([]index.Postings, 0, len(values)) - for _, value := range values { - res = append(res, h.head.postings.Get(name, value)) + switch len(values) { + case 0: + return index.EmptyPostings(), nil + case 1: + return h.head.postings.Get(name, values[0]), nil + default: + res := make([]index.Postings, 0, len(values)) + for _, value := range values { + res = append(res, h.head.postings.Get(name, value)) + } + return index.Merge(res...), nil } - return index.Merge(res...), nil } func (h *headIndexReader) SortedPostings(p index.Postings) index.Postings { diff --git a/tsdb/index/postings.go b/tsdb/index/postings.go index 10312d84d8..69a20b08b6 100644 --- a/tsdb/index/postings.go +++ b/tsdb/index/postings.go @@ -20,6 +20,8 @@ import ( "sort" "sync" + "github.com/pkg/errors" + "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/storage" ) @@ -831,3 +833,78 @@ type seriesRefSlice []storage.SeriesRef func (x seriesRefSlice) Len() int { return len(x) } func (x seriesRefSlice) Less(i, j int) bool { return x[i] < x[j] } func (x seriesRefSlice) Swap(i, j int) { x[i], x[j] = x[j], x[i] } + +// FindIntersectingPostings checks the intersection of p and candidates[i] for each i in candidates, +// if intersection is non empty, then i is added to the indexes returned. +// Returned indexes are not sorted. +func FindIntersectingPostings(p Postings, candidates []Postings) (indexes []int, err error) { + h := make(postingsWithIndexHeap, 0, len(candidates)) + for idx, it := range candidates { + if it.Next() { + h = append(h, postingsWithIndex{index: idx, p: it}) + } else if it.Err() != nil { + return nil, it.Err() + } + } + if len(h) == 0 { + return nil, nil + } + h.Init() + + for len(h) > 0 { + if !p.Seek(h.At()) { + return indexes, p.Err() + } + if p.At() == h.At() { + found := heap.Pop(&h).(postingsWithIndex) + indexes = append(indexes, found.index) + } else if err := h.Next(); err != nil { + return nil, err + } + } + + return indexes, nil +} + +type postingsWithIndex struct { + index int + p Postings +} + +type postingsWithIndexHeap []postingsWithIndex + +func (h *postingsWithIndexHeap) Init() { heap.Init(h) } +func (h postingsWithIndexHeap) At() storage.SeriesRef { return h[0].p.At() } +func (h *postingsWithIndexHeap) Next() error { + pi := (*h)[0] + next := pi.p.Next() + if next { + heap.Fix(h, 0) + return nil + } + + if err := pi.p.Err(); err != nil { + return errors.Wrapf(err, "postings %d", pi.index) + } + + heap.Pop(h) + return nil +} + +func (h postingsWithIndexHeap) Len() int { return len(h) } +func (h postingsWithIndexHeap) Less(i, j int) bool { return h[i].p.At() < h[j].p.At() } +func (h *postingsWithIndexHeap) Swap(i, j int) { (*h)[i], (*h)[j] = (*h)[j], (*h)[i] } + +func (h *postingsWithIndexHeap) Push(x interface{}) { + *h = append(*h, x.(postingsWithIndex)) +} + +// Pop implements heap.Interface and pops the last element, which is NOT the min element, +// so this doesn't return the same heap.Pop() +func (h *postingsWithIndexHeap) Pop() interface{} { + old := *h + n := len(old) + x := old[n-1] + *h = old[0 : n-1] + return x +} diff --git a/tsdb/index/postings_test.go b/tsdb/index/postings_test.go index 8cb76b0ad1..4eaa9a080f 100644 --- a/tsdb/index/postings_test.go +++ b/tsdb/index/postings_test.go @@ -14,6 +14,8 @@ package index import ( + "container/heap" + "context" "encoding/binary" "fmt" "math/rand" @@ -21,6 +23,7 @@ import ( "strconv" "testing" + "github.com/pkg/errors" "github.com/stretchr/testify/require" "github.com/prometheus/prometheus/model/labels" @@ -929,3 +932,142 @@ func TestMemPostings_Delete(t *testing.T) { require.NoError(t, err) require.Equal(t, 0, len(expanded), "expected empty postings, got %v", expanded) } + +func TestFindIntersectingPostings(t *testing.T) { + t.Run("multiple intersections", func(t *testing.T) { + p := NewListPostings([]storage.SeriesRef{10, 15, 20, 25, 30, 35, 40, 45, 50}) + candidates := []Postings{ + 0: NewListPostings([]storage.SeriesRef{7, 13, 14, 27}), // Does not intersect. + 1: NewListPostings([]storage.SeriesRef{10, 20}), // Does intersect. + 2: NewListPostings([]storage.SeriesRef{29, 30, 31}), // Does intersect. + 3: NewListPostings([]storage.SeriesRef{29, 30, 31}), // Does intersect (same again). + 4: NewListPostings([]storage.SeriesRef{60}), // Does not intersect. + 5: NewListPostings([]storage.SeriesRef{45}), // Does intersect. + 6: EmptyPostings(), // Does not intersect. + } + + indexes, err := FindIntersectingPostings(p, candidates) + require.NoError(t, err) + sort.Ints(indexes) + require.Equal(t, []int{1, 2, 3, 5}, indexes) + }) + + t.Run("no intersections", func(t *testing.T) { + p := NewListPostings([]storage.SeriesRef{10, 15, 20, 25, 30, 35, 40, 45, 50}) + candidates := []Postings{ + 0: NewListPostings([]storage.SeriesRef{7, 13, 14, 27}), // Does not intersect. + 1: NewListPostings([]storage.SeriesRef{60}), // Does not intersect. + 2: EmptyPostings(), // Does not intersect. + } + + indexes, err := FindIntersectingPostings(p, candidates) + require.NoError(t, err) + require.Empty(t, indexes) + }) + + t.Run("p is ErrPostings", func(t *testing.T) { + p := ErrPostings(context.Canceled) + candidates := []Postings{NewListPostings([]storage.SeriesRef{1})} + _, err := FindIntersectingPostings(p, candidates) + require.Error(t, err) + }) + + t.Run("one of the candidates is ErrPostings", func(t *testing.T) { + p := NewListPostings([]storage.SeriesRef{1}) + candidates := []Postings{ + NewListPostings([]storage.SeriesRef{1}), + ErrPostings(context.Canceled), + } + _, err := FindIntersectingPostings(p, candidates) + require.Error(t, err) + }) + + t.Run("one of the candidates fails on nth call", func(t *testing.T) { + p := NewListPostings([]storage.SeriesRef{10, 20, 30, 40, 50, 60, 70}) + candidates := []Postings{ + NewListPostings([]storage.SeriesRef{7, 13, 14, 27}), + &postingsFailingAfterNthCall{2, NewListPostings([]storage.SeriesRef{29, 30, 31, 40})}, + } + _, err := FindIntersectingPostings(p, candidates) + require.Error(t, err) + }) + + t.Run("p fails on the nth call", func(t *testing.T) { + p := &postingsFailingAfterNthCall{2, NewListPostings([]storage.SeriesRef{10, 20, 30, 40, 50, 60, 70})} + candidates := []Postings{ + NewListPostings([]storage.SeriesRef{7, 13, 14, 27}), + NewListPostings([]storage.SeriesRef{29, 30, 31, 40}), + } + _, err := FindIntersectingPostings(p, candidates) + require.Error(t, err) + }) +} + +type postingsFailingAfterNthCall struct { + ttl int + Postings +} + +func (p *postingsFailingAfterNthCall) Seek(v storage.SeriesRef) bool { + p.ttl-- + if p.ttl <= 0 { + return false + } + return p.Postings.Seek(v) +} + +func (p *postingsFailingAfterNthCall) Next() bool { + p.ttl-- + if p.ttl <= 0 { + return false + } + return p.Postings.Next() +} + +func (p *postingsFailingAfterNthCall) Err() error { + if p.ttl <= 0 { + return errors.New("ttl exceeded") + } + return p.Postings.Err() +} + +func TestPostingsWithIndexHeap(t *testing.T) { + t.Run("iterate", func(t *testing.T) { + h := postingsWithIndexHeap{ + {index: 0, p: NewListPostings([]storage.SeriesRef{10, 20, 30})}, + {index: 1, p: NewListPostings([]storage.SeriesRef{1, 5})}, + {index: 2, p: NewListPostings([]storage.SeriesRef{25, 50})}, + } + for _, node := range h { + node.p.Next() + } + h.Init() + + for _, expected := range []storage.SeriesRef{1, 5, 10, 20, 25, 30, 50} { + require.Equal(t, expected, h.At()) + require.NoError(t, h.Next()) + } + require.Empty(t, h) + }) + + t.Run("pop", func(t *testing.T) { + h := postingsWithIndexHeap{ + {index: 0, p: NewListPostings([]storage.SeriesRef{10, 20, 30})}, + {index: 1, p: NewListPostings([]storage.SeriesRef{1, 5})}, + {index: 2, p: NewListPostings([]storage.SeriesRef{25, 50})}, + } + for _, node := range h { + node.p.Next() + } + h.Init() + + for _, expected := range []storage.SeriesRef{1, 5, 10, 20} { + require.Equal(t, expected, h.At()) + require.NoError(t, h.Next()) + } + require.Equal(t, storage.SeriesRef(25), h.At()) + node := heap.Pop(&h).(postingsWithIndex) + require.Equal(t, 2, node.index) + require.Equal(t, storage.SeriesRef(25), node.p.At()) + }) +} diff --git a/tsdb/querier.go b/tsdb/querier.go index 4719470a74..522adb87cd 100644 --- a/tsdb/querier.go +++ b/tsdb/querier.go @@ -375,38 +375,30 @@ func inversePostingsForMatcher(ix IndexReader, m *labels.Matcher) (index.Posting } func labelValuesWithMatchers(r IndexReader, name string, matchers ...*labels.Matcher) ([]string, error) { - // We're only interested in metrics which have the label . - requireLabel, err := labels.NewMatcher(labels.MatchNotEqual, name, "") + p, err := PostingsForMatchers(r, matchers...) if err != nil { - return nil, errors.Wrapf(err, "Failed to instantiate label matcher") + return nil, errors.Wrap(err, "fetching postings for matchers") } - var p index.Postings - p, err = PostingsForMatchers(r, append(matchers, requireLabel)...) + allValues, err := r.LabelValues(name) if err != nil { - return nil, err + return nil, errors.Wrapf(err, "fetching values of label %s", name) } - - dedupe := map[string]interface{}{} - for p.Next() { - v, err := r.LabelValueFor(p.At(), name) + valuesPostings := make([]index.Postings, len(allValues)) + for i, value := range allValues { + valuesPostings[i], err = r.Postings(name, value) if err != nil { - if err == storage.ErrNotFound { - continue - } - - return nil, err + return nil, errors.Wrapf(err, "fetching postings for %s=%q", name, value) } - dedupe[v] = nil + } + indexes, err := index.FindIntersectingPostings(p, valuesPostings) + if err != nil { + return nil, errors.Wrap(err, "intersecting postings") } - if err = p.Err(); err != nil { - return nil, err - } - - values := make([]string, 0, len(dedupe)) - for value := range dedupe { - values = append(values, value) + values := make([]string, 0, len(indexes)) + for _, idx := range indexes { + values = append(values, allValues[idx]) } return values, nil diff --git a/tsdb/querier_bench_test.go b/tsdb/querier_bench_test.go index 71dfef35bb..95ad739039 100644 --- a/tsdb/querier_bench_test.go +++ b/tsdb/querier_bench_test.go @@ -31,7 +31,7 @@ const ( postingsBenchSuffix = "aaaaaaaaaabbbbbbbbbbccccccccccdddddddddd" ) -func BenchmarkPostingsForMatchers(b *testing.B) { +func BenchmarkQuerier(b *testing.B) { chunkDir, err := ioutil.TempDir("", "chunk_dir") require.NoError(b, err) defer func() { @@ -66,7 +66,12 @@ func BenchmarkPostingsForMatchers(b *testing.B) { ir, err := h.Index() require.NoError(b, err) b.Run("Head", func(b *testing.B) { - benchmarkPostingsForMatchers(b, ir) + b.Run("PostingsForMatchers", func(b *testing.B) { + benchmarkPostingsForMatchers(b, ir) + }) + b.Run("labelValuesWithMatchers", func(b *testing.B) { + benchmarkLabelValuesWithMatchers(b, ir) + }) }) tmpdir, err := ioutil.TempDir("", "test_benchpostingsformatchers") @@ -85,7 +90,12 @@ func BenchmarkPostingsForMatchers(b *testing.B) { require.NoError(b, err) defer ir.Close() b.Run("Block", func(b *testing.B) { - benchmarkPostingsForMatchers(b, ir) + b.Run("PostingsForMatchers", func(b *testing.B) { + benchmarkPostingsForMatchers(b, ir) + }) + b.Run("labelValuesWithMatchers", func(b *testing.B) { + benchmarkLabelValuesWithMatchers(b, ir) + }) }) } @@ -103,7 +113,7 @@ func benchmarkPostingsForMatchers(b *testing.B, ir IndexReader) { i1Plus := labels.MustNewMatcher(labels.MatchRegexp, "i", "^1.+$") iEmptyRe := labels.MustNewMatcher(labels.MatchRegexp, "i", "^$") iNotEmpty := labels.MustNewMatcher(labels.MatchNotEqual, "i", "") - iNot2 := labels.MustNewMatcher(labels.MatchNotEqual, "n", "2"+postingsBenchSuffix) + iNot2 := labels.MustNewMatcher(labels.MatchNotEqual, "i", "2"+postingsBenchSuffix) iNot2Star := labels.MustNewMatcher(labels.MatchNotRegexp, "i", "^2.*$") iNotStar2Star := labels.MustNewMatcher(labels.MatchNotRegexp, "i", "^.*2.*$") @@ -143,6 +153,38 @@ func benchmarkPostingsForMatchers(b *testing.B, ir IndexReader) { } } +func benchmarkLabelValuesWithMatchers(b *testing.B, ir IndexReader) { + i1 := labels.MustNewMatcher(labels.MatchEqual, "i", "1") + iStar := labels.MustNewMatcher(labels.MatchRegexp, "i", "^.*$") + jNotFoo := labels.MustNewMatcher(labels.MatchNotEqual, "j", "foo") + n1 := labels.MustNewMatcher(labels.MatchEqual, "n", "1"+postingsBenchSuffix) + nPlus := labels.MustNewMatcher(labels.MatchRegexp, "i", "^.+$") + + cases := []struct { + name string + labelName string + matchers []*labels.Matcher + }{ + // i has 100k values. + {`i with n="1"`, "i", []*labels.Matcher{n1}}, + {`i with n="^.+$"`, "i", []*labels.Matcher{nPlus}}, + {`i with n="1",j!="foo"`, "i", []*labels.Matcher{n1, jNotFoo}}, + {`i with n="1",i=~"^.*$",j!="foo"`, "i", []*labels.Matcher{n1, iStar, jNotFoo}}, + // n has 10 values. + {`n with j!="foo"`, "n", []*labels.Matcher{jNotFoo}}, + {`n with i="1"`, "n", []*labels.Matcher{i1}}, + } + + for _, c := range cases { + b.Run(c.name, func(b *testing.B) { + for i := 0; i < b.N; i++ { + _, err := labelValuesWithMatchers(ir, c.labelName, c.matchers...) + require.NoError(b, err) + } + }) + } +} + func BenchmarkQuerierSelect(b *testing.B) { chunkDir, err := ioutil.TempDir("", "chunk_dir") require.NoError(b, err)