Improve Merge performance (#531)

Use a heap for Next in merges, and pre-compute the merged
postings list when there are many postings on the
unset-matcher path.

Add posting lookup benchmarks

Signed-off-by: Brian Brazil <brian.brazil@robustperception.io>
This commit is contained in:
Brian Brazil 2019-02-28 17:23:55 +00:00 committed by GitHub
parent df06f9ebc2
commit 62b652fbd0
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 191 additions and 26 deletions

View file

@ -54,19 +54,61 @@ func BenchmarkHeadPostingForMatchers(b *testing.B) {
// Put a series, select it. GC it and then access it. // Put a series, select it. GC it and then access it.
h, err := NewHead(nil, nil, nil, 1000) h, err := NewHead(nil, nil, nil, 1000)
testutil.Ok(b, err) testutil.Ok(b, err)
defer h.Close() defer func() {
testutil.Ok(b, h.Close())
}()
// TODO: vary number of series var hash uint64
for i := 0; i < 1000000; i++ { for n := 0; n < 10; n++ {
h.getOrCreate(uint64(i), labels.FromStrings("a", strconv.Itoa(i))) for i := 0; i < 100000; i++ {
h.getOrCreate(hash, labels.FromStrings("i", strconv.Itoa(i), "n", strconv.Itoa(i), "j", "foo"))
hash++
// Have some series that won't be matched, to properly test inverted matches.
h.getOrCreate(hash, labels.FromStrings("i", strconv.Itoa(i), "n", strconv.Itoa(i), "j", "bar"))
hash++
}
} }
b.ResetTimer() n1 := labels.NewEqualMatcher("n", "1")
all, _ := labels.NewRegexpMatcher("a", ".*") jFoo := labels.NewEqualMatcher("j", "foo")
jNotFoo := labels.Not(jFoo)
iStar := labels.NewMustRegexpMatcher("i", "^.*$")
iPlus := labels.NewMustRegexpMatcher("i", "^.+$")
i1Plus := labels.NewMustRegexpMatcher("i", "^1.+$")
iEmptyRe := labels.NewMustRegexpMatcher("i", "^$")
iNotEmpty := labels.Not(labels.NewEqualMatcher("i", ""))
iNot2 := labels.Not(labels.NewEqualMatcher("n", "2"))
iNot2Star := labels.Not(labels.NewMustRegexpMatcher("i", "^2.*$"))
cases := []struct {
name string
matchers []labels.Matcher
}{
{`n="1"`, []labels.Matcher{n1}},
{`n="1",j="foo"`, []labels.Matcher{n1, jFoo}},
{`j="foo",n="1"`, []labels.Matcher{jFoo, n1}},
{`n="1",j!="foo"`, []labels.Matcher{n1, jNotFoo}},
{`i=~".*"`, []labels.Matcher{iStar}},
{`i=~".+"`, []labels.Matcher{iPlus}},
{`i=~""`, []labels.Matcher{iEmptyRe}},
{`i!=""`, []labels.Matcher{iNotEmpty}},
{`n="1",i=~".*",j="foo"`, []labels.Matcher{n1, iStar, jFoo}},
{`n="1",i=~".*",i!="2",j="foo"`, []labels.Matcher{n1, iStar, iNot2, jFoo}},
{`n="1",i!="",j="foo"`, []labels.Matcher{n1, iNotEmpty, jFoo}},
{`n="1",i=~".+",j="foo"`, []labels.Matcher{n1, iPlus, jFoo}},
{`n="1",i=~"1.+",j="foo"`, []labels.Matcher{n1, i1Plus, jFoo}},
{`n="1",i=~".+",i!="2",j="foo"`, []labels.Matcher{n1, iPlus, iNot2, jFoo}},
{`n="1",i=~".+",i!~"2.*",j="foo"`, []labels.Matcher{n1, iPlus, iNot2Star, jFoo}},
}
for _, c := range cases {
b.Run(c.name, func(b *testing.B) {
for i := 0; i < b.N; i++ { for i := 0; i < b.N; i++ {
_, err := PostingsForMatchers(h.indexRange(0, 1000), all) _, err := PostingsForMatchers(h.indexRange(0, 1000), c.matchers...)
testutil.Ok(b, err) testutil.Ok(b, err)
} }
})
}
} }

View file

@ -14,6 +14,7 @@
package index package index
import ( import (
"container/heap"
"encoding/binary" "encoding/binary"
"runtime" "runtime"
"sort" "sort"
@ -365,25 +366,132 @@ func Merge(its ...Postings) Postings {
if len(its) == 1 { if len(its) == 1 {
return its[0] return its[0]
} }
// All the uses of this function immediately expand it, so return newMergedPostings(its)
// collect everything in a map. This is more efficient }
// when there's 100ks of postings, compared to
// having a tree of merge objects. type postingsHeap []Postings
pm := make(map[uint64]struct{}, len(its))
for _, it := range its { func (h postingsHeap) Len() int { return len(h) }
for it.Next() { func (h postingsHeap) Less(i, j int) bool { return h[i].At() < h[j].At() }
pm[it.At()] = struct{}{} func (h *postingsHeap) Swap(i, j int) { (*h)[i], (*h)[j] = (*h)[j], (*h)[i] }
}
func (h *postingsHeap) Push(x interface{}) {
*h = append(*h, x.(Postings))
}
func (h *postingsHeap) Pop() interface{} {
old := *h
n := len(old)
x := old[n-1]
*h = old[0 : n-1]
return x
}
// mergedPostings iterates the deduplicated, ascending union of several
// Postings iterators, advancing them lazily through a min-heap instead
// of materializing the whole merged list up front.
type mergedPostings struct {
	h postingsHeap // input iterators, each already advanced to a valid position
	initilized bool // true once the first Next has produced a value; NOTE(review): name is a typo of "initialized" — renaming touches Next/Seek too
	heaped bool // whether h currently satisfies the heap invariant (Seek invalidates it)
	cur uint64 // value returned by At
	err error // first error seen from any input; sticky, terminates iteration
}
func newMergedPostings(p []Postings) *mergedPostings {
ph := make(postingsHeap, 0, len(p))
for _, it := range p {
if it.Next() {
ph = append(ph, it)
} else {
if it.Err() != nil { if it.Err() != nil {
return ErrPostings(it.Err()) return &mergedPostings{err: it.Err()}
} }
} }
pl := make([]uint64, 0, len(pm))
for p := range pm {
pl = append(pl, p)
} }
sort.Slice(pl, func(i, j int) bool { return pl[i] < pl[j] }) return &mergedPostings{h: ph}
return newListPostings(pl) }
// Next advances the merged iterator to the next unique posting ID in
// ascending order, returning false on exhaustion or error. It keeps
// advancing the smallest input until the heap minimum moves past the
// previously returned value, which deduplicates IDs shared by inputs.
func (it *mergedPostings) Next() bool {
	if it.h.Len() == 0 || it.err != nil {
		return false
	}

	// Seek clears it.heaped after filtering the inputs; restore the
	// heap invariant lazily here rather than on every Seek.
	if !it.heaped {
		heap.Init(&it.h)
		it.heaped = true
	}
	// The user must issue an initial Next.
	if !it.initilized {
		it.cur = it.h[0].At()
		it.initilized = true
		return true
	}

	for {
		cur := it.h[0]
		if !cur.Next() {
			// This input is exhausted (or failed): remove it.
			heap.Pop(&it.h)
			if cur.Err() != nil {
				// Sticky error: all subsequent calls return false.
				it.err = cur.Err()
				return false
			}
			if it.h.Len() == 0 {
				return false
			}
		} else {
			// Value of top of heap has changed, re-heapify.
			heap.Fix(&it.h, 0)
		}

		// Only stop once the minimum differs from the last returned
		// value; equal values are duplicates to be skipped.
		if it.h[0].At() != it.cur {
			it.cur = it.h[0].At()
			return true
		}
	}
}
// Seek advances the merged iterator to the first posting ID >= id and
// reports whether one exists. It forwards a single Seek to every
// remaining input instead of stepping through Next repeatedly.
func (it *mergedPostings) Seek(id uint64) bool {
	if it.h.Len() == 0 || it.err != nil {
		return false
	}
	if !it.initilized {
		// Establish it.cur (and heapify) via an initial Next.
		if !it.Next() {
			return false
		}
	}
	// Seek never moves backwards; the current position already satisfies it.
	if it.cur >= id {
		return true
	}
	// Heapifying when there is lots of Seeks is inefficient,
	// mark to be re-heapified on the Next() call.
	it.heaped = false
	newH := make(postingsHeap, 0, len(it.h))
	lowest := ^uint64(0) // max uint64: tracks the smallest At() among survivors
	for _, i := range it.h {
		if i.Seek(id) {
			newH = append(newH, i)
			if i.At() < lowest {
				lowest = i.At()
			}
		} else {
			// Exhausted inputs are dropped; a failed one poisons the iterator.
			if i.Err() != nil {
				it.err = i.Err()
				return false
			}
		}
	}
	it.h = newH
	if len(it.h) == 0 {
		return false
	}
	it.cur = lowest
	return true
}
// At returns the posting ID the merged iterator currently points at.
func (it mergedPostings) At() uint64 { return it.cur }
func (it mergedPostings) Err() error {
return it.err
} }
// Without returns a new postings list that contains all elements from the full list that // Without returns a new postings list that contains all elements from the full list that
@ -498,6 +606,9 @@ func (it *listPostings) Seek(x uint64) bool {
if it.cur >= x { if it.cur >= x {
return true return true
} }
if len(it.list) == 0 {
return false
}
// Do binary search between current position and end. // Do binary search between current position and end.
i := sort.Search(len(it.list), func(i int) bool { i := sort.Search(len(it.list), func(i int) bool {

View file

@ -354,11 +354,23 @@ func postingsForUnsetLabelMatcher(ix IndexReader, m labels.Matcher) (index.Posti
rit = append(rit, it) rit = append(rit, it)
} }
merged := index.Merge(rit...)
// With many many postings, it's best to pre-calculate
// the merged list via next rather than have a ton of seeks
// in Without/Intersection.
if len(rit) > 100 {
pl, err := index.ExpandPostings(merged)
if err != nil {
return nil, err
}
merged = index.NewListPostings(pl)
}
allPostings, err := ix.Postings(index.AllPostingsKey()) allPostings, err := ix.Postings(index.AllPostingsKey())
if err != nil { if err != nil {
return nil, err return nil, err
} }
return index.Without(allPostings, index.Merge(rit...)), nil return index.Without(allPostings, merged), nil
} }
func mergeStrings(a, b []string) []string { func mergeStrings(a, b []string) []string {