mirror of
https://github.com/prometheus/prometheus.git
synced 2025-01-11 13:57:36 -08:00
Label values with matchers by intersecting postings (#9907)
* LabelValues w/matchers by intersecting postings Instead of iterating all matched series to find the values, this checks if each one of the label values is present in the matched series (postings). Pending to be benchmarked. Signed-off-by: Oleg Zaytsev <mail@olegzaytsev.com> * Benchmark labelValuesWithMatchers name old time/op new time/op Querier/Head/labelValuesWithMatchers/i_with_n="1" 157ms ± 0% 48ms ± 0% Querier/Head/labelValuesWithMatchers/i_with_n="^.+$" 1.80s ± 0% 0.46s ± 0% Querier/Head/labelValuesWithMatchers/i_with_n="1",j!="foo" 144ms ± 0% 57ms ± 0% Querier/Head/labelValuesWithMatchers/i_with_n="1",i=~"^.*$",j!="foo" 304ms ± 0% 111ms ± 0% Querier/Head/labelValuesWithMatchers/n_with_j!="foo" 761ms ± 0% 164ms ± 0% Querier/Head/labelValuesWithMatchers/n_with_i="1" 6.11µs ± 0% 6.62µs ± 0% Querier/Block/labelValuesWithMatchers/i_with_n="1" 117ms ± 0% 62ms ± 0% Querier/Block/labelValuesWithMatchers/i_with_n="^.+$" 1.44s ± 0% 0.24s ± 0% Querier/Block/labelValuesWithMatchers/i_with_n="1",j!="foo" 92.1ms ± 0% 70.3ms ± 0% Querier/Block/labelValuesWithMatchers/i_with_n="1",i=~"^.*$",j!="foo" 196ms ± 0% 115ms ± 0% Querier/Block/labelValuesWithMatchers/n_with_j!="foo" 1.23s ± 0% 0.21s ± 0% Querier/Block/labelValuesWithMatchers/n_with_i="1" 1.06ms ± 0% 0.88ms ± 0% name old alloc/op new alloc/op Querier/Head/labelValuesWithMatchers/i_with_n="1" 29.5MB ± 0% 26.9MB ± 0% Querier/Head/labelValuesWithMatchers/i_with_n="^.+$" 46.8MB ± 0% 251.5MB ± 0% Querier/Head/labelValuesWithMatchers/i_with_n="1",j!="foo" 29.5MB ± 0% 22.3MB ± 0% Querier/Head/labelValuesWithMatchers/i_with_n="1",i=~"^.*$",j!="foo" 46.8MB ± 0% 23.9MB ± 0% Querier/Head/labelValuesWithMatchers/n_with_j!="foo" 10.3kB ± 0% 138535.2kB ± 0% Querier/Head/labelValuesWithMatchers/n_with_i="1" 5.54kB ± 0% 7.09kB ± 0% Querier/Block/labelValuesWithMatchers/i_with_n="1" 39.1MB ± 0% 28.5MB ± 0% Querier/Block/labelValuesWithMatchers/i_with_n="^.+$" 287MB ± 0% 253MB ± 0% Querier/Block/labelValuesWithMatchers/i_with_n="1",j!="foo" 34.3MB ± 0% 23.9MB ± 0% Querier/Block/labelValuesWithMatchers/i_with_n="1",i=~"^.*$",j!="foo" 51.6MB ± 0% 25.5MB ± 0% Querier/Block/labelValuesWithMatchers/n_with_j!="foo" 144MB ± 0% 139MB ± 0% Querier/Block/labelValuesWithMatchers/n_with_i="1" 6.43kB ± 0% 8.66kB ± 0% name old allocs/op new allocs/op Querier/Head/labelValuesWithMatchers/i_with_n="1" 104k ± 0% 500k ± 0% Querier/Head/labelValuesWithMatchers/i_with_n="^.+$" 204k ± 0% 600k ± 0% Querier/Head/labelValuesWithMatchers/i_with_n="1",j!="foo" 104k ± 0% 500k ± 0% Querier/Head/labelValuesWithMatchers/i_with_n="1",i=~"^.*$",j!="foo" 204k ± 0% 500k ± 0% Querier/Head/labelValuesWithMatchers/n_with_j!="foo" 66.0 ± 0% 255.0 ± 0% Querier/Head/labelValuesWithMatchers/n_with_i="1" 61.0 ± 0% 205.0 ± 0% Querier/Block/labelValuesWithMatchers/i_with_n="1" 304k ± 0% 600k ± 0% Querier/Block/labelValuesWithMatchers/i_with_n="^.+$" 5.20M ± 0% 0.70M ± 0% Querier/Block/labelValuesWithMatchers/i_with_n="1",j!="foo" 204k ± 0% 600k ± 0% Querier/Block/labelValuesWithMatchers/i_with_n="1",i=~"^.*$",j!="foo" 304k ± 0% 600k ± 0% Querier/Block/labelValuesWithMatchers/n_with_j!="foo" 3.00M ± 0% 0.00M ± 0% Querier/Block/labelValuesWithMatchers/n_with_i="1" 61.0 ± 0% 247.0 ± 0% Signed-off-by: Oleg Zaytsev <mail@olegzaytsev.com> * Don't expand postings to intersect them Using a min heap we can check whether matched postings intersect with each one of the label values postings. This avoid expanding postings (and thus having all of them in memory at any point). Slightly slower than the expanding postings version for some cases, but definitely pays the price once the cardinality grows. Still offers 10x latency improvement where previous latencies were reaching 1s. Benchmark results: name \ time/op old.txt intersect.txt intersect_noexpand.txt Querier/Head/labelValuesWithMatchers/i_with_n="1" 157ms ± 0% 48ms ± 0% 110ms ± 0% Querier/Head/labelValuesWithMatchers/i_with_n="^.+$" 1.80s ± 0% 0.46s ± 0% 0.18s ± 0% Querier/Head/labelValuesWithMatchers/i_with_n="1",j!="foo" 144ms ± 0% 57ms ± 0% 125ms ± 0% Querier/Head/labelValuesWithMatchers/i_with_n="1",i=~"^.*$",j!="foo" 304ms ± 0% 111ms ± 0% 177ms ± 0% Querier/Head/labelValuesWithMatchers/n_with_j!="foo" 761ms ± 0% 164ms ± 0% 134ms ± 0% Querier/Head/labelValuesWithMatchers/n_with_i="1" 6.11µs ± 0% 6.62µs ± 0% 4.29µs ± 0% Querier/Block/labelValuesWithMatchers/i_with_n="1" 117ms ± 0% 62ms ± 0% 120ms ± 0% Querier/Block/labelValuesWithMatchers/i_with_n="^.+$" 1.44s ± 0% 0.24s ± 0% 0.15s ± 0% Querier/Block/labelValuesWithMatchers/i_with_n="1",j!="foo" 92.1ms ± 0% 70.3ms ± 0% 125.4ms ± 0% Querier/Block/labelValuesWithMatchers/i_with_n="1",i=~"^.*$",j!="foo" 196ms ± 0% 115ms ± 0% 170ms ± 0% Querier/Block/labelValuesWithMatchers/n_with_j!="foo" 1.23s ± 0% 0.21s ± 0% 0.14s ± 0% Querier/Block/labelValuesWithMatchers/n_with_i="1" 1.06ms ± 0% 0.88ms ± 0% 0.92ms ± 0% name \ alloc/op old.txt intersect.txt intersect_noexpand.txt Querier/Head/labelValuesWithMatchers/i_with_n="1" 29.5MB ± 0% 26.9MB ± 0% 19.1MB ± 0% Querier/Head/labelValuesWithMatchers/i_with_n="^.+$" 46.8MB ± 0% 251.5MB ± 0% 36.3MB ± 0% Querier/Head/labelValuesWithMatchers/i_with_n="1",j!="foo" 29.5MB ± 0% 22.3MB ± 0% 19.1MB ± 0% Querier/Head/labelValuesWithMatchers/i_with_n="1",i=~"^.*$",j!="foo" 46.8MB ± 0% 23.9MB ± 0% 20.7MB ± 0% Querier/Head/labelValuesWithMatchers/n_with_j!="foo" 10.3kB ± 0% 138535.2kB ± 0% 6.4kB ± 0% Querier/Head/labelValuesWithMatchers/n_with_i="1" 5.54kB ± 0% 7.09kB ± 0% 4.30kB ± 0% Querier/Block/labelValuesWithMatchers/i_with_n="1" 39.1MB ± 0% 28.5MB ± 0% 20.7MB ± 0% Querier/Block/labelValuesWithMatchers/i_with_n="^.+$" 287MB ± 0% 253MB ± 0% 38MB ± 0% Querier/Block/labelValuesWithMatchers/i_with_n="1",j!="foo" 34.3MB ± 0% 23.9MB ± 0% 20.7MB ± 0% Querier/Block/labelValuesWithMatchers/i_with_n="1",i=~"^.*$",j!="foo" 51.6MB ± 0% 25.5MB ± 0% 22.3MB ± 0% Querier/Block/labelValuesWithMatchers/n_with_j!="foo" 144MB ± 0% 139MB ± 0% 0MB ± 0% Querier/Block/labelValuesWithMatchers/n_with_i="1" 6.43kB ± 0% 8.66kB ± 0% 5.86kB ± 0% name \ allocs/op old.txt intersect.txt intersect_noexpand.txt Querier/Head/labelValuesWithMatchers/i_with_n="1" 104k ± 0% 500k ± 0% 300k ± 0% Querier/Head/labelValuesWithMatchers/i_with_n="^.+$" 204k ± 0% 600k ± 0% 400k ± 0% Querier/Head/labelValuesWithMatchers/i_with_n="1",j!="foo" 104k ± 0% 500k ± 0% 300k ± 0% Querier/Head/labelValuesWithMatchers/i_with_n="1",i=~"^.*$",j!="foo" 204k ± 0% 500k ± 0% 300k ± 0% Querier/Head/labelValuesWithMatchers/n_with_j!="foo" 66.0 ± 0% 255.0 ± 0% 139.0 ± 0% Querier/Head/labelValuesWithMatchers/n_with_i="1" 61.0 ± 0% 205.0 ± 0% 87.0 ± 0% Querier/Block/labelValuesWithMatchers/i_with_n="1" 304k ± 0% 600k ± 0% 400k ± 0% Querier/Block/labelValuesWithMatchers/i_with_n="^.+$" 5.20M ± 0% 0.70M ± 0% 0.50M ± 0% Querier/Block/labelValuesWithMatchers/i_with_n="1",j!="foo" 204k ± 0% 600k ± 0% 400k ± 0% Querier/Block/labelValuesWithMatchers/i_with_n="1",i=~"^.*$",j!="foo" 304k ± 0% 600k ± 0% 400k ± 0% Querier/Block/labelValuesWithMatchers/n_with_j!="foo" 3.00M ± 0% 0.00M ± 0% 0.00M ± 0% Querier/Block/labelValuesWithMatchers/n_with_i="1" 61.0 ± 0% 247.0 ± 0% 129.0 ± 0% Signed-off-by: Oleg Zaytsev <mail@olegzaytsev.com> * Apply comment suggestions from the code review Signed-off-by: Oleg Zaytsev <mail@olegzaytsev.com> Co-authored-by: Ganesh Vernekar <15064823+codesome@users.noreply.github.com> * Change else { if } to else if Signed-off-by: Oleg Zaytsev <mail@olegzaytsev.com> * Remove sorting of label values We were not sorting them before, so no need to sort them now Signed-off-by: Oleg Zaytsev <mail@olegzaytsev.com> Co-authored-by: Ganesh Vernekar <15064823+codesome@users.noreply.github.com>
This commit is contained in:
parent
d5845f8333
commit
3947238ce0
|
@ -103,11 +103,18 @@ func (h *headIndexReader) LabelNames(matchers ...*labels.Matcher) ([]string, err
|
||||||
|
|
||||||
// Postings returns the postings list iterator for the label pairs.
|
// Postings returns the postings list iterator for the label pairs.
|
||||||
func (h *headIndexReader) Postings(name string, values ...string) (index.Postings, error) {
|
func (h *headIndexReader) Postings(name string, values ...string) (index.Postings, error) {
|
||||||
res := make([]index.Postings, 0, len(values))
|
switch len(values) {
|
||||||
for _, value := range values {
|
case 0:
|
||||||
res = append(res, h.head.postings.Get(name, value))
|
return index.EmptyPostings(), nil
|
||||||
|
case 1:
|
||||||
|
return h.head.postings.Get(name, values[0]), nil
|
||||||
|
default:
|
||||||
|
res := make([]index.Postings, 0, len(values))
|
||||||
|
for _, value := range values {
|
||||||
|
res = append(res, h.head.postings.Get(name, value))
|
||||||
|
}
|
||||||
|
return index.Merge(res...), nil
|
||||||
}
|
}
|
||||||
return index.Merge(res...), nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *headIndexReader) SortedPostings(p index.Postings) index.Postings {
|
func (h *headIndexReader) SortedPostings(p index.Postings) index.Postings {
|
||||||
|
|
|
@ -20,6 +20,8 @@ import (
|
||||||
"sort"
|
"sort"
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
|
"github.com/pkg/errors"
|
||||||
|
|
||||||
"github.com/prometheus/prometheus/model/labels"
|
"github.com/prometheus/prometheus/model/labels"
|
||||||
"github.com/prometheus/prometheus/storage"
|
"github.com/prometheus/prometheus/storage"
|
||||||
)
|
)
|
||||||
|
@ -831,3 +833,78 @@ type seriesRefSlice []storage.SeriesRef
|
||||||
func (x seriesRefSlice) Len() int { return len(x) }
|
func (x seriesRefSlice) Len() int { return len(x) }
|
||||||
func (x seriesRefSlice) Less(i, j int) bool { return x[i] < x[j] }
|
func (x seriesRefSlice) Less(i, j int) bool { return x[i] < x[j] }
|
||||||
func (x seriesRefSlice) Swap(i, j int) { x[i], x[j] = x[j], x[i] }
|
func (x seriesRefSlice) Swap(i, j int) { x[i], x[j] = x[j], x[i] }
|
||||||
|
|
||||||
|
// FindIntersectingPostings checks the intersection of p and candidates[i] for each i in candidates,
|
||||||
|
// if intersection is non empty, then i is added to the indexes returned.
|
||||||
|
// Returned indexes are not sorted.
|
||||||
|
func FindIntersectingPostings(p Postings, candidates []Postings) (indexes []int, err error) {
|
||||||
|
h := make(postingsWithIndexHeap, 0, len(candidates))
|
||||||
|
for idx, it := range candidates {
|
||||||
|
if it.Next() {
|
||||||
|
h = append(h, postingsWithIndex{index: idx, p: it})
|
||||||
|
} else if it.Err() != nil {
|
||||||
|
return nil, it.Err()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if len(h) == 0 {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
h.Init()
|
||||||
|
|
||||||
|
for len(h) > 0 {
|
||||||
|
if !p.Seek(h.At()) {
|
||||||
|
return indexes, p.Err()
|
||||||
|
}
|
||||||
|
if p.At() == h.At() {
|
||||||
|
found := heap.Pop(&h).(postingsWithIndex)
|
||||||
|
indexes = append(indexes, found.index)
|
||||||
|
} else if err := h.Next(); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return indexes, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
type postingsWithIndex struct {
|
||||||
|
index int
|
||||||
|
p Postings
|
||||||
|
}
|
||||||
|
|
||||||
|
type postingsWithIndexHeap []postingsWithIndex
|
||||||
|
|
||||||
|
func (h *postingsWithIndexHeap) Init() { heap.Init(h) }
|
||||||
|
func (h postingsWithIndexHeap) At() storage.SeriesRef { return h[0].p.At() }
|
||||||
|
func (h *postingsWithIndexHeap) Next() error {
|
||||||
|
pi := (*h)[0]
|
||||||
|
next := pi.p.Next()
|
||||||
|
if next {
|
||||||
|
heap.Fix(h, 0)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := pi.p.Err(); err != nil {
|
||||||
|
return errors.Wrapf(err, "postings %d", pi.index)
|
||||||
|
}
|
||||||
|
|
||||||
|
heap.Pop(h)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h postingsWithIndexHeap) Len() int { return len(h) }
|
||||||
|
func (h postingsWithIndexHeap) Less(i, j int) bool { return h[i].p.At() < h[j].p.At() }
|
||||||
|
func (h *postingsWithIndexHeap) Swap(i, j int) { (*h)[i], (*h)[j] = (*h)[j], (*h)[i] }
|
||||||
|
|
||||||
|
func (h *postingsWithIndexHeap) Push(x interface{}) {
|
||||||
|
*h = append(*h, x.(postingsWithIndex))
|
||||||
|
}
|
||||||
|
|
||||||
|
// Pop implements heap.Interface and pops the last element, which is NOT the min element,
|
||||||
|
// so this doesn't return the same heap.Pop()
|
||||||
|
func (h *postingsWithIndexHeap) Pop() interface{} {
|
||||||
|
old := *h
|
||||||
|
n := len(old)
|
||||||
|
x := old[n-1]
|
||||||
|
*h = old[0 : n-1]
|
||||||
|
return x
|
||||||
|
}
|
||||||
|
|
|
@ -14,6 +14,8 @@
|
||||||
package index
|
package index
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"container/heap"
|
||||||
|
"context"
|
||||||
"encoding/binary"
|
"encoding/binary"
|
||||||
"fmt"
|
"fmt"
|
||||||
"math/rand"
|
"math/rand"
|
||||||
|
@ -21,6 +23,7 @@ import (
|
||||||
"strconv"
|
"strconv"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
|
"github.com/pkg/errors"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
|
|
||||||
"github.com/prometheus/prometheus/model/labels"
|
"github.com/prometheus/prometheus/model/labels"
|
||||||
|
@ -929,3 +932,142 @@ func TestMemPostings_Delete(t *testing.T) {
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.Equal(t, 0, len(expanded), "expected empty postings, got %v", expanded)
|
require.Equal(t, 0, len(expanded), "expected empty postings, got %v", expanded)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestFindIntersectingPostings(t *testing.T) {
|
||||||
|
t.Run("multiple intersections", func(t *testing.T) {
|
||||||
|
p := NewListPostings([]storage.SeriesRef{10, 15, 20, 25, 30, 35, 40, 45, 50})
|
||||||
|
candidates := []Postings{
|
||||||
|
0: NewListPostings([]storage.SeriesRef{7, 13, 14, 27}), // Does not intersect.
|
||||||
|
1: NewListPostings([]storage.SeriesRef{10, 20}), // Does intersect.
|
||||||
|
2: NewListPostings([]storage.SeriesRef{29, 30, 31}), // Does intersect.
|
||||||
|
3: NewListPostings([]storage.SeriesRef{29, 30, 31}), // Does intersect (same again).
|
||||||
|
4: NewListPostings([]storage.SeriesRef{60}), // Does not intersect.
|
||||||
|
5: NewListPostings([]storage.SeriesRef{45}), // Does intersect.
|
||||||
|
6: EmptyPostings(), // Does not intersect.
|
||||||
|
}
|
||||||
|
|
||||||
|
indexes, err := FindIntersectingPostings(p, candidates)
|
||||||
|
require.NoError(t, err)
|
||||||
|
sort.Ints(indexes)
|
||||||
|
require.Equal(t, []int{1, 2, 3, 5}, indexes)
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("no intersections", func(t *testing.T) {
|
||||||
|
p := NewListPostings([]storage.SeriesRef{10, 15, 20, 25, 30, 35, 40, 45, 50})
|
||||||
|
candidates := []Postings{
|
||||||
|
0: NewListPostings([]storage.SeriesRef{7, 13, 14, 27}), // Does not intersect.
|
||||||
|
1: NewListPostings([]storage.SeriesRef{60}), // Does not intersect.
|
||||||
|
2: EmptyPostings(), // Does not intersect.
|
||||||
|
}
|
||||||
|
|
||||||
|
indexes, err := FindIntersectingPostings(p, candidates)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Empty(t, indexes)
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("p is ErrPostings", func(t *testing.T) {
|
||||||
|
p := ErrPostings(context.Canceled)
|
||||||
|
candidates := []Postings{NewListPostings([]storage.SeriesRef{1})}
|
||||||
|
_, err := FindIntersectingPostings(p, candidates)
|
||||||
|
require.Error(t, err)
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("one of the candidates is ErrPostings", func(t *testing.T) {
|
||||||
|
p := NewListPostings([]storage.SeriesRef{1})
|
||||||
|
candidates := []Postings{
|
||||||
|
NewListPostings([]storage.SeriesRef{1}),
|
||||||
|
ErrPostings(context.Canceled),
|
||||||
|
}
|
||||||
|
_, err := FindIntersectingPostings(p, candidates)
|
||||||
|
require.Error(t, err)
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("one of the candidates fails on nth call", func(t *testing.T) {
|
||||||
|
p := NewListPostings([]storage.SeriesRef{10, 20, 30, 40, 50, 60, 70})
|
||||||
|
candidates := []Postings{
|
||||||
|
NewListPostings([]storage.SeriesRef{7, 13, 14, 27}),
|
||||||
|
&postingsFailingAfterNthCall{2, NewListPostings([]storage.SeriesRef{29, 30, 31, 40})},
|
||||||
|
}
|
||||||
|
_, err := FindIntersectingPostings(p, candidates)
|
||||||
|
require.Error(t, err)
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("p fails on the nth call", func(t *testing.T) {
|
||||||
|
p := &postingsFailingAfterNthCall{2, NewListPostings([]storage.SeriesRef{10, 20, 30, 40, 50, 60, 70})}
|
||||||
|
candidates := []Postings{
|
||||||
|
NewListPostings([]storage.SeriesRef{7, 13, 14, 27}),
|
||||||
|
NewListPostings([]storage.SeriesRef{29, 30, 31, 40}),
|
||||||
|
}
|
||||||
|
_, err := FindIntersectingPostings(p, candidates)
|
||||||
|
require.Error(t, err)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
type postingsFailingAfterNthCall struct {
|
||||||
|
ttl int
|
||||||
|
Postings
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *postingsFailingAfterNthCall) Seek(v storage.SeriesRef) bool {
|
||||||
|
p.ttl--
|
||||||
|
if p.ttl <= 0 {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
return p.Postings.Seek(v)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *postingsFailingAfterNthCall) Next() bool {
|
||||||
|
p.ttl--
|
||||||
|
if p.ttl <= 0 {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
return p.Postings.Next()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *postingsFailingAfterNthCall) Err() error {
|
||||||
|
if p.ttl <= 0 {
|
||||||
|
return errors.New("ttl exceeded")
|
||||||
|
}
|
||||||
|
return p.Postings.Err()
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestPostingsWithIndexHeap(t *testing.T) {
|
||||||
|
t.Run("iterate", func(t *testing.T) {
|
||||||
|
h := postingsWithIndexHeap{
|
||||||
|
{index: 0, p: NewListPostings([]storage.SeriesRef{10, 20, 30})},
|
||||||
|
{index: 1, p: NewListPostings([]storage.SeriesRef{1, 5})},
|
||||||
|
{index: 2, p: NewListPostings([]storage.SeriesRef{25, 50})},
|
||||||
|
}
|
||||||
|
for _, node := range h {
|
||||||
|
node.p.Next()
|
||||||
|
}
|
||||||
|
h.Init()
|
||||||
|
|
||||||
|
for _, expected := range []storage.SeriesRef{1, 5, 10, 20, 25, 30, 50} {
|
||||||
|
require.Equal(t, expected, h.At())
|
||||||
|
require.NoError(t, h.Next())
|
||||||
|
}
|
||||||
|
require.Empty(t, h)
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("pop", func(t *testing.T) {
|
||||||
|
h := postingsWithIndexHeap{
|
||||||
|
{index: 0, p: NewListPostings([]storage.SeriesRef{10, 20, 30})},
|
||||||
|
{index: 1, p: NewListPostings([]storage.SeriesRef{1, 5})},
|
||||||
|
{index: 2, p: NewListPostings([]storage.SeriesRef{25, 50})},
|
||||||
|
}
|
||||||
|
for _, node := range h {
|
||||||
|
node.p.Next()
|
||||||
|
}
|
||||||
|
h.Init()
|
||||||
|
|
||||||
|
for _, expected := range []storage.SeriesRef{1, 5, 10, 20} {
|
||||||
|
require.Equal(t, expected, h.At())
|
||||||
|
require.NoError(t, h.Next())
|
||||||
|
}
|
||||||
|
require.Equal(t, storage.SeriesRef(25), h.At())
|
||||||
|
node := heap.Pop(&h).(postingsWithIndex)
|
||||||
|
require.Equal(t, 2, node.index)
|
||||||
|
require.Equal(t, storage.SeriesRef(25), node.p.At())
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
|
@ -375,38 +375,30 @@ func inversePostingsForMatcher(ix IndexReader, m *labels.Matcher) (index.Posting
|
||||||
}
|
}
|
||||||
|
|
||||||
func labelValuesWithMatchers(r IndexReader, name string, matchers ...*labels.Matcher) ([]string, error) {
|
func labelValuesWithMatchers(r IndexReader, name string, matchers ...*labels.Matcher) ([]string, error) {
|
||||||
// We're only interested in metrics which have the label <name>.
|
p, err := PostingsForMatchers(r, matchers...)
|
||||||
requireLabel, err := labels.NewMatcher(labels.MatchNotEqual, name, "")
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, errors.Wrapf(err, "Failed to instantiate label matcher")
|
return nil, errors.Wrap(err, "fetching postings for matchers")
|
||||||
}
|
}
|
||||||
|
|
||||||
var p index.Postings
|
allValues, err := r.LabelValues(name)
|
||||||
p, err = PostingsForMatchers(r, append(matchers, requireLabel)...)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, errors.Wrapf(err, "fetching values of label %s", name)
|
||||||
}
|
}
|
||||||
|
valuesPostings := make([]index.Postings, len(allValues))
|
||||||
dedupe := map[string]interface{}{}
|
for i, value := range allValues {
|
||||||
for p.Next() {
|
valuesPostings[i], err = r.Postings(name, value)
|
||||||
v, err := r.LabelValueFor(p.At(), name)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if err == storage.ErrNotFound {
|
return nil, errors.Wrapf(err, "fetching postings for %s=%q", name, value)
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil, err
|
|
||||||
}
|
}
|
||||||
dedupe[v] = nil
|
}
|
||||||
|
indexes, err := index.FindIntersectingPostings(p, valuesPostings)
|
||||||
|
if err != nil {
|
||||||
|
return nil, errors.Wrap(err, "intersecting postings")
|
||||||
}
|
}
|
||||||
|
|
||||||
if err = p.Err(); err != nil {
|
values := make([]string, 0, len(indexes))
|
||||||
return nil, err
|
for _, idx := range indexes {
|
||||||
}
|
values = append(values, allValues[idx])
|
||||||
|
|
||||||
values := make([]string, 0, len(dedupe))
|
|
||||||
for value := range dedupe {
|
|
||||||
values = append(values, value)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return values, nil
|
return values, nil
|
||||||
|
|
|
@ -31,7 +31,7 @@ const (
|
||||||
postingsBenchSuffix = "aaaaaaaaaabbbbbbbbbbccccccccccdddddddddd"
|
postingsBenchSuffix = "aaaaaaaaaabbbbbbbbbbccccccccccdddddddddd"
|
||||||
)
|
)
|
||||||
|
|
||||||
func BenchmarkPostingsForMatchers(b *testing.B) {
|
func BenchmarkQuerier(b *testing.B) {
|
||||||
chunkDir, err := ioutil.TempDir("", "chunk_dir")
|
chunkDir, err := ioutil.TempDir("", "chunk_dir")
|
||||||
require.NoError(b, err)
|
require.NoError(b, err)
|
||||||
defer func() {
|
defer func() {
|
||||||
|
@ -66,7 +66,12 @@ func BenchmarkPostingsForMatchers(b *testing.B) {
|
||||||
ir, err := h.Index()
|
ir, err := h.Index()
|
||||||
require.NoError(b, err)
|
require.NoError(b, err)
|
||||||
b.Run("Head", func(b *testing.B) {
|
b.Run("Head", func(b *testing.B) {
|
||||||
benchmarkPostingsForMatchers(b, ir)
|
b.Run("PostingsForMatchers", func(b *testing.B) {
|
||||||
|
benchmarkPostingsForMatchers(b, ir)
|
||||||
|
})
|
||||||
|
b.Run("labelValuesWithMatchers", func(b *testing.B) {
|
||||||
|
benchmarkLabelValuesWithMatchers(b, ir)
|
||||||
|
})
|
||||||
})
|
})
|
||||||
|
|
||||||
tmpdir, err := ioutil.TempDir("", "test_benchpostingsformatchers")
|
tmpdir, err := ioutil.TempDir("", "test_benchpostingsformatchers")
|
||||||
|
@ -85,7 +90,12 @@ func BenchmarkPostingsForMatchers(b *testing.B) {
|
||||||
require.NoError(b, err)
|
require.NoError(b, err)
|
||||||
defer ir.Close()
|
defer ir.Close()
|
||||||
b.Run("Block", func(b *testing.B) {
|
b.Run("Block", func(b *testing.B) {
|
||||||
benchmarkPostingsForMatchers(b, ir)
|
b.Run("PostingsForMatchers", func(b *testing.B) {
|
||||||
|
benchmarkPostingsForMatchers(b, ir)
|
||||||
|
})
|
||||||
|
b.Run("labelValuesWithMatchers", func(b *testing.B) {
|
||||||
|
benchmarkLabelValuesWithMatchers(b, ir)
|
||||||
|
})
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -103,7 +113,7 @@ func benchmarkPostingsForMatchers(b *testing.B, ir IndexReader) {
|
||||||
i1Plus := labels.MustNewMatcher(labels.MatchRegexp, "i", "^1.+$")
|
i1Plus := labels.MustNewMatcher(labels.MatchRegexp, "i", "^1.+$")
|
||||||
iEmptyRe := labels.MustNewMatcher(labels.MatchRegexp, "i", "^$")
|
iEmptyRe := labels.MustNewMatcher(labels.MatchRegexp, "i", "^$")
|
||||||
iNotEmpty := labels.MustNewMatcher(labels.MatchNotEqual, "i", "")
|
iNotEmpty := labels.MustNewMatcher(labels.MatchNotEqual, "i", "")
|
||||||
iNot2 := labels.MustNewMatcher(labels.MatchNotEqual, "n", "2"+postingsBenchSuffix)
|
iNot2 := labels.MustNewMatcher(labels.MatchNotEqual, "i", "2"+postingsBenchSuffix)
|
||||||
iNot2Star := labels.MustNewMatcher(labels.MatchNotRegexp, "i", "^2.*$")
|
iNot2Star := labels.MustNewMatcher(labels.MatchNotRegexp, "i", "^2.*$")
|
||||||
iNotStar2Star := labels.MustNewMatcher(labels.MatchNotRegexp, "i", "^.*2.*$")
|
iNotStar2Star := labels.MustNewMatcher(labels.MatchNotRegexp, "i", "^.*2.*$")
|
||||||
|
|
||||||
|
@ -143,6 +153,38 @@ func benchmarkPostingsForMatchers(b *testing.B, ir IndexReader) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func benchmarkLabelValuesWithMatchers(b *testing.B, ir IndexReader) {
|
||||||
|
i1 := labels.MustNewMatcher(labels.MatchEqual, "i", "1")
|
||||||
|
iStar := labels.MustNewMatcher(labels.MatchRegexp, "i", "^.*$")
|
||||||
|
jNotFoo := labels.MustNewMatcher(labels.MatchNotEqual, "j", "foo")
|
||||||
|
n1 := labels.MustNewMatcher(labels.MatchEqual, "n", "1"+postingsBenchSuffix)
|
||||||
|
nPlus := labels.MustNewMatcher(labels.MatchRegexp, "i", "^.+$")
|
||||||
|
|
||||||
|
cases := []struct {
|
||||||
|
name string
|
||||||
|
labelName string
|
||||||
|
matchers []*labels.Matcher
|
||||||
|
}{
|
||||||
|
// i has 100k values.
|
||||||
|
{`i with n="1"`, "i", []*labels.Matcher{n1}},
|
||||||
|
{`i with n="^.+$"`, "i", []*labels.Matcher{nPlus}},
|
||||||
|
{`i with n="1",j!="foo"`, "i", []*labels.Matcher{n1, jNotFoo}},
|
||||||
|
{`i with n="1",i=~"^.*$",j!="foo"`, "i", []*labels.Matcher{n1, iStar, jNotFoo}},
|
||||||
|
// n has 10 values.
|
||||||
|
{`n with j!="foo"`, "n", []*labels.Matcher{jNotFoo}},
|
||||||
|
{`n with i="1"`, "n", []*labels.Matcher{i1}},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, c := range cases {
|
||||||
|
b.Run(c.name, func(b *testing.B) {
|
||||||
|
for i := 0; i < b.N; i++ {
|
||||||
|
_, err := labelValuesWithMatchers(ir, c.labelName, c.matchers...)
|
||||||
|
require.NoError(b, err)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func BenchmarkQuerierSelect(b *testing.B) {
|
func BenchmarkQuerierSelect(b *testing.B) {
|
||||||
chunkDir, err := ioutil.TempDir("", "chunk_dir")
|
chunkDir, err := ioutil.TempDir("", "chunk_dir")
|
||||||
require.NoError(b, err)
|
require.NoError(b, err)
|
||||||
|
|
Loading…
Reference in a new issue