postings: use Loser Tree for merge

It's faster.

Note change to test - instead of requiring that the data structure is
identical to `EmptyPostings()`, check that calling `Next()` returns
false, which implies it was empty.

Also the check for context cancellation during initialization was
removed. Initialization should be a small portion of the work done
during merge, so it's not worth plumbing a context argument through.

Signed-off-by: Bryan Boreham <bjboreham@gmail.com>
This commit is contained in:
Bryan Boreham 2023-09-04 15:23:30 +01:00
parent ee700151a3
commit ab3a47b489
3 changed files with 32 additions and 96 deletions

1
go.mod
View file

@ -10,6 +10,7 @@ require (
github.com/alecthomas/kingpin/v2 v2.4.0
github.com/alecthomas/units v0.0.0-20231202071711-9a357b53e9c9
github.com/aws/aws-sdk-go v1.48.14
github.com/bboreham/go-loser v0.0.0-20230920113527-fcc2c21820a3
github.com/cespare/xxhash/v2 v2.2.0
github.com/dennwc/varint v1.0.0
github.com/digitalocean/godo v1.106.0

2
go.sum
View file

@ -94,6 +94,8 @@ github.com/aws/aws-sdk-go v1.38.35/go.mod h1:hcU610XS61/+aQV88ixoOzUoG7v3b31pl2z
github.com/aws/aws-sdk-go v1.48.14 h1:nVLrp+F84SG+xGiFMfe1TE6ZV6smF+42tuuNgYGV30s=
github.com/aws/aws-sdk-go v1.48.14/go.mod h1:LF8svs817+Nz+DmiMQKTO3ubZ/6IaTpq3TjupRn3Eqk=
github.com/aws/aws-sdk-go-v2 v0.18.0/go.mod h1:JWVYvqSMppoMJC0x5wdwiImzgXTI9FuZwxzkQq9wy+g=
github.com/bboreham/go-loser v0.0.0-20230920113527-fcc2c21820a3 h1:6df1vn4bBlDDo4tARvBm7l6KA9iVMnE3NWizDeWSrps=
github.com/bboreham/go-loser v0.0.0-20230920113527-fcc2c21820a3/go.mod h1:CIWtjkly68+yqLPbvwwR/fjNJA/idrtULjZWh2v1ys0=
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=

View file

@ -18,11 +18,13 @@ import (
"context"
"encoding/binary"
"fmt"
"math"
"runtime"
"sort"
"strings"
"sync"
"github.com/bboreham/go-loser"
"golang.org/x/exp/slices"
"github.com/prometheus/prometheus/model/labels"
@ -525,7 +527,7 @@ func (it *intersectPostings) Err() error {
}
// Merge returns a new iterator over the union of the input iterators.
func Merge(ctx context.Context, its ...Postings) Postings {
func Merge(_ context.Context, its ...Postings) Postings {
if len(its) == 0 {
return EmptyPostings()
}
@ -533,122 +535,48 @@ func Merge(ctx context.Context, its ...Postings) Postings {
return its[0]
}
p, ok := newMergedPostings(ctx, its)
p, ok := newMergedPostings(its)
if !ok {
return EmptyPostings()
}
return p
}
type postingsHeap []Postings
func (h postingsHeap) Len() int { return len(h) }
func (h postingsHeap) Less(i, j int) bool { return h[i].At() < h[j].At() }
func (h *postingsHeap) Swap(i, j int) { (*h)[i], (*h)[j] = (*h)[j], (*h)[i] }
func (h *postingsHeap) Push(x interface{}) {
*h = append(*h, x.(Postings))
}
func (h *postingsHeap) Pop() interface{} {
old := *h
n := len(old)
x := old[n-1]
*h = old[0 : n-1]
return x
}
type mergedPostings struct {
h postingsHeap
initialized bool
cur storage.SeriesRef
err error
p []Postings
h *loser.Tree[storage.SeriesRef, Postings]
cur storage.SeriesRef
}
func newMergedPostings(ctx context.Context, p []Postings) (m *mergedPostings, nonEmpty bool) {
ph := make(postingsHeap, 0, len(p))
for _, it := range p {
// NOTE: mergedPostings struct requires the user to issue an initial Next.
switch {
case ctx.Err() != nil:
return &mergedPostings{err: ctx.Err()}, true
case it.Next():
ph = append(ph, it)
case it.Err() != nil:
return &mergedPostings{err: it.Err()}, true
}
}
if len(ph) == 0 {
return nil, false
}
return &mergedPostings{h: ph}, true
func newMergedPostings(p []Postings) (m *mergedPostings, nonEmpty bool) {
const maxVal = storage.SeriesRef(math.MaxUint64) // This value must be higher than all real values used in the tree.
lt := loser.New(p, maxVal)
return &mergedPostings{p: p, h: lt}, true
}
func (it *mergedPostings) Next() bool {
if it.h.Len() == 0 || it.err != nil {
return false
}
// The user must issue an initial Next.
if !it.initialized {
heap.Init(&it.h)
it.cur = it.h[0].At()
it.initialized = true
return true
}
for {
cur := it.h[0]
if !cur.Next() {
heap.Pop(&it.h)
if cur.Err() != nil {
it.err = cur.Err()
return false
}
if it.h.Len() == 0 {
return false
}
} else {
// Value of top of heap has changed, re-heapify.
heap.Fix(&it.h, 0)
if !it.h.Next() {
return false
}
if it.h[0].At() != it.cur {
it.cur = it.h[0].At()
// Remove duplicate entries.
newItem := it.h.At()
if newItem != it.cur {
it.cur = newItem
return true
}
}
}
func (it *mergedPostings) Seek(id storage.SeriesRef) bool {
if it.h.Len() == 0 || it.err != nil {
for !it.h.IsEmpty() && it.h.At() < id {
finished := !it.h.Winner().Seek(id)
it.h.Fix(finished)
}
if it.h.IsEmpty() {
return false
}
if !it.initialized {
if !it.Next() {
return false
}
}
for it.cur < id {
cur := it.h[0]
if !cur.Seek(id) {
heap.Pop(&it.h)
if cur.Err() != nil {
it.err = cur.Err()
return false
}
if it.h.Len() == 0 {
return false
}
} else {
// Value of top of heap has changed, re-heapify.
heap.Fix(&it.h, 0)
}
it.cur = it.h[0].At()
}
it.cur = it.h.At()
return true
}
@ -657,7 +585,12 @@ func (it mergedPostings) At() storage.SeriesRef {
}
func (it mergedPostings) Err() error {
return it.err
for _, p := range it.p {
if err := p.Err(); err != nil {
return err
}
}
return nil
}
// Without returns a new postings list that contains all elements from the full list that