Merge pull request #12878 from bboreham/loser-tree

postings: use Loser Tree for merge
This commit is contained in:
Bryan Boreham 2023-12-12 21:38:30 +00:00 committed by GitHub
commit d0c2d9c0b9
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 65 additions and 97 deletions

1
go.mod
View file

@ -10,6 +10,7 @@ require (
github.com/alecthomas/kingpin/v2 v2.4.0
github.com/alecthomas/units v0.0.0-20231202071711-9a357b53e9c9
github.com/aws/aws-sdk-go v1.48.14
github.com/bboreham/go-loser v0.0.0-20230920113527-fcc2c21820a3
github.com/cespare/xxhash/v2 v2.2.0
github.com/dennwc/varint v1.0.0
github.com/digitalocean/godo v1.106.0

2
go.sum
View file

@ -94,6 +94,8 @@ github.com/aws/aws-sdk-go v1.38.35/go.mod h1:hcU610XS61/+aQV88ixoOzUoG7v3b31pl2z
github.com/aws/aws-sdk-go v1.48.14 h1:nVLrp+F84SG+xGiFMfe1TE6ZV6smF+42tuuNgYGV30s=
github.com/aws/aws-sdk-go v1.48.14/go.mod h1:LF8svs817+Nz+DmiMQKTO3ubZ/6IaTpq3TjupRn3Eqk=
github.com/aws/aws-sdk-go-v2 v0.18.0/go.mod h1:JWVYvqSMppoMJC0x5wdwiImzgXTI9FuZwxzkQq9wy+g=
github.com/bboreham/go-loser v0.0.0-20230920113527-fcc2c21820a3 h1:6df1vn4bBlDDo4tARvBm7l6KA9iVMnE3NWizDeWSrps=
github.com/bboreham/go-loser v0.0.0-20230920113527-fcc2c21820a3/go.mod h1:CIWtjkly68+yqLPbvwwR/fjNJA/idrtULjZWh2v1ys0=
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=

View file

@ -18,11 +18,13 @@ import (
"context"
"encoding/binary"
"fmt"
"math"
"runtime"
"sort"
"strings"
"sync"
"github.com/bboreham/go-loser"
"golang.org/x/exp/slices"
"github.com/prometheus/prometheus/model/labels"
@ -525,7 +527,7 @@ func (it *intersectPostings) Err() error {
}
// Merge returns a new iterator over the union of the input iterators.
func Merge(ctx context.Context, its ...Postings) Postings {
func Merge(_ context.Context, its ...Postings) Postings {
if len(its) == 0 {
return EmptyPostings()
}
@ -533,122 +535,48 @@ func Merge(ctx context.Context, its ...Postings) Postings {
return its[0]
}
p, ok := newMergedPostings(ctx, its)
p, ok := newMergedPostings(its)
if !ok {
return EmptyPostings()
}
return p
}
type postingsHeap []Postings
func (h postingsHeap) Len() int { return len(h) }
func (h postingsHeap) Less(i, j int) bool { return h[i].At() < h[j].At() }
func (h *postingsHeap) Swap(i, j int) { (*h)[i], (*h)[j] = (*h)[j], (*h)[i] }
func (h *postingsHeap) Push(x interface{}) {
*h = append(*h, x.(Postings))
}
func (h *postingsHeap) Pop() interface{} {
old := *h
n := len(old)
x := old[n-1]
*h = old[0 : n-1]
return x
}
type mergedPostings struct {
h postingsHeap
initialized bool
p []Postings
h *loser.Tree[storage.SeriesRef, Postings]
cur storage.SeriesRef
err error
}
func newMergedPostings(ctx context.Context, p []Postings) (m *mergedPostings, nonEmpty bool) {
ph := make(postingsHeap, 0, len(p))
for _, it := range p {
// NOTE: mergedPostings struct requires the user to issue an initial Next.
switch {
case ctx.Err() != nil:
return &mergedPostings{err: ctx.Err()}, true
case it.Next():
ph = append(ph, it)
case it.Err() != nil:
return &mergedPostings{err: it.Err()}, true
}
}
if len(ph) == 0 {
return nil, false
}
return &mergedPostings{h: ph}, true
func newMergedPostings(p []Postings) (m *mergedPostings, nonEmpty bool) {
const maxVal = storage.SeriesRef(math.MaxUint64) // This value must be higher than all real values used in the tree.
lt := loser.New(p, maxVal)
return &mergedPostings{p: p, h: lt}, true
}
func (it *mergedPostings) Next() bool {
if it.h.Len() == 0 || it.err != nil {
return false
}
// The user must issue an initial Next.
if !it.initialized {
heap.Init(&it.h)
it.cur = it.h[0].At()
it.initialized = true
return true
}
for {
cur := it.h[0]
if !cur.Next() {
heap.Pop(&it.h)
if cur.Err() != nil {
it.err = cur.Err()
if !it.h.Next() {
return false
}
if it.h.Len() == 0 {
return false
}
} else {
// Value of top of heap has changed, re-heapify.
heap.Fix(&it.h, 0)
}
if it.h[0].At() != it.cur {
it.cur = it.h[0].At()
// Remove duplicate entries.
newItem := it.h.At()
if newItem != it.cur {
it.cur = newItem
return true
}
}
}
func (it *mergedPostings) Seek(id storage.SeriesRef) bool {
if it.h.Len() == 0 || it.err != nil {
for !it.h.IsEmpty() && it.h.At() < id {
finished := !it.h.Winner().Seek(id)
it.h.Fix(finished)
}
if it.h.IsEmpty() {
return false
}
if !it.initialized {
if !it.Next() {
return false
}
}
for it.cur < id {
cur := it.h[0]
if !cur.Seek(id) {
heap.Pop(&it.h)
if cur.Err() != nil {
it.err = cur.Err()
return false
}
if it.h.Len() == 0 {
return false
}
} else {
// Value of top of heap has changed, re-heapify.
heap.Fix(&it.h, 0)
}
it.cur = it.h[0].At()
}
it.cur = it.h.At()
return true
}
@ -657,7 +585,12 @@ func (it mergedPostings) At() storage.SeriesRef {
}
func (it mergedPostings) Err() error {
return it.err
for _, p := range it.p {
if err := p.Err(); err != nil {
return err
}
}
return nil
}
// Without returns a new postings list that contains all elements from the full list that

View file

@ -380,6 +380,38 @@ func BenchmarkIntersect(t *testing.B) {
})
}
func BenchmarkMerge(t *testing.B) {
var lps []*ListPostings
var refs [][]storage.SeriesRef
// Create 100000 matchers(k=100000), making sure all memory allocation is done before starting the loop.
for i := 0; i < 100000; i++ {
var temp []storage.SeriesRef
for j := 1; j < 100; j++ {
temp = append(temp, storage.SeriesRef(i+j*100000))
}
lps = append(lps, newListPostings(temp...))
refs = append(refs, temp)
}
its := make([]Postings, len(refs))
for _, nSeries := range []int{1, 10, 100, 1000, 10000, 100000} {
t.Run(fmt.Sprint(nSeries), func(bench *testing.B) {
ctx := context.Background()
for i := 0; i < bench.N; i++ {
// Reset the ListPostings to their original values each time round the loop.
for j := range refs[:nSeries] {
lps[j].list = refs[j]
its[j] = lps[j]
}
if err := consumePostings(Merge(ctx, its[:nSeries]...)); err != nil {
bench.Fatal(err)
}
}
})
}
}
func TestMultiMerge(t *testing.T) {
i1 := newListPostings(1, 2, 3, 4, 5, 6, 1000, 1001)
i2 := newListPostings(2, 4, 5, 6, 7, 8, 999, 1001)
@ -481,7 +513,7 @@ func TestMergedPostings(t *testing.T) {
m := Merge(ctx, c.in...)
if c.res == EmptyPostings() {
require.Equal(t, EmptyPostings(), m)
require.False(t, m.Next())
return
}