mirror of
https://github.com/prometheus/prometheus.git
synced 2025-01-11 22:07:27 -08:00
Improvement on postings intersection (#616)
* improvement on postings intersection Signed-off-by: naivewong <867245430@qq.com>
This commit is contained in:
parent
e809cb477d
commit
6ab483071a
|
@ -303,68 +303,68 @@ func Intersect(its ...Postings) Postings {
|
|||
if len(its) == 1 {
|
||||
return its[0]
|
||||
}
|
||||
|
||||
l := len(its) / 2
|
||||
a := Intersect(its[:l]...)
|
||||
b := Intersect(its[l:]...)
|
||||
|
||||
if a == EmptyPostings() || b == EmptyPostings() {
|
||||
return EmptyPostings()
|
||||
for _, p := range its {
|
||||
if p == EmptyPostings() {
|
||||
return EmptyPostings()
|
||||
}
|
||||
}
|
||||
return newIntersectPostings(a, b)
|
||||
|
||||
return newIntersectPostings(its...)
|
||||
}
|
||||
|
||||
type intersectPostings struct {
|
||||
a, b Postings
|
||||
cur uint64
|
||||
arr []Postings
|
||||
cur uint64
|
||||
}
|
||||
|
||||
func newIntersectPostings(a, b Postings) *intersectPostings {
|
||||
return &intersectPostings{a: a, b: b}
|
||||
func newIntersectPostings(its ...Postings) *intersectPostings {
|
||||
return &intersectPostings{arr: its}
|
||||
}
|
||||
|
||||
func (it *intersectPostings) At() uint64 {
|
||||
return it.cur
|
||||
}
|
||||
|
||||
func (it *intersectPostings) doNext(id uint64) bool {
|
||||
func (it *intersectPostings) doNext() bool {
|
||||
Loop:
|
||||
for {
|
||||
if !it.b.Seek(id) {
|
||||
return false
|
||||
}
|
||||
if vb := it.b.At(); vb != id {
|
||||
if !it.a.Seek(vb) {
|
||||
for _, p := range it.arr {
|
||||
if !p.Seek(it.cur) {
|
||||
return false
|
||||
}
|
||||
id = it.a.At()
|
||||
if vb != id {
|
||||
continue
|
||||
if p.At() > it.cur {
|
||||
it.cur = p.At()
|
||||
continue Loop
|
||||
}
|
||||
}
|
||||
it.cur = id
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
||||
func (it *intersectPostings) Next() bool {
|
||||
if !it.a.Next() {
|
||||
return false
|
||||
for _, p := range it.arr {
|
||||
if !p.Next() {
|
||||
return false
|
||||
}
|
||||
if p.At() > it.cur {
|
||||
it.cur = p.At()
|
||||
}
|
||||
}
|
||||
return it.doNext(it.a.At())
|
||||
return it.doNext()
|
||||
}
|
||||
|
||||
func (it *intersectPostings) Seek(id uint64) bool {
|
||||
if !it.a.Seek(id) {
|
||||
return false
|
||||
}
|
||||
return it.doNext(it.a.At())
|
||||
it.cur = id
|
||||
return it.doNext()
|
||||
}
|
||||
|
||||
func (it *intersectPostings) Err() error {
|
||||
if it.a.Err() != nil {
|
||||
return it.a.Err()
|
||||
for _, p := range it.arr {
|
||||
if p.Err() != nil {
|
||||
return p.Err()
|
||||
}
|
||||
}
|
||||
return it.b.Err()
|
||||
return nil
|
||||
}
|
||||
|
||||
// Merge returns a new iterator over the union of the input iterators.
|
||||
|
|
|
@ -221,36 +221,90 @@ func TestMultiIntersect(t *testing.T) {
|
|||
}
|
||||
|
||||
func BenchmarkIntersect(t *testing.B) {
|
||||
var a, b, c, d []uint64
|
||||
t.Run("LongPostings1", func(bench *testing.B) {
|
||||
var a, b, c, d []uint64
|
||||
|
||||
for i := 0; i < 10000000; i += 2 {
|
||||
a = append(a, uint64(i))
|
||||
}
|
||||
for i := 5000000; i < 5000100; i += 4 {
|
||||
b = append(b, uint64(i))
|
||||
}
|
||||
for i := 5090000; i < 5090600; i += 4 {
|
||||
b = append(b, uint64(i))
|
||||
}
|
||||
for i := 4990000; i < 5100000; i++ {
|
||||
c = append(c, uint64(i))
|
||||
}
|
||||
for i := 4000000; i < 6000000; i++ {
|
||||
d = append(d, uint64(i))
|
||||
}
|
||||
|
||||
i1 := newListPostings(a...)
|
||||
i2 := newListPostings(b...)
|
||||
i3 := newListPostings(c...)
|
||||
i4 := newListPostings(d...)
|
||||
|
||||
t.ResetTimer()
|
||||
|
||||
for i := 0; i < t.N; i++ {
|
||||
if _, err := ExpandPostings(Intersect(i1, i2, i3, i4)); err != nil {
|
||||
t.Fatal(err)
|
||||
for i := 0; i < 10000000; i += 2 {
|
||||
a = append(a, uint64(i))
|
||||
}
|
||||
}
|
||||
for i := 5000000; i < 5000100; i += 4 {
|
||||
b = append(b, uint64(i))
|
||||
}
|
||||
for i := 5090000; i < 5090600; i += 4 {
|
||||
b = append(b, uint64(i))
|
||||
}
|
||||
for i := 4990000; i < 5100000; i++ {
|
||||
c = append(c, uint64(i))
|
||||
}
|
||||
for i := 4000000; i < 6000000; i++ {
|
||||
d = append(d, uint64(i))
|
||||
}
|
||||
|
||||
i1 := newListPostings(a...)
|
||||
i2 := newListPostings(b...)
|
||||
i3 := newListPostings(c...)
|
||||
i4 := newListPostings(d...)
|
||||
|
||||
bench.ResetTimer()
|
||||
bench.ReportAllocs()
|
||||
for i := 0; i < bench.N; i++ {
|
||||
if _, err := ExpandPostings(Intersect(i1, i2, i3, i4)); err != nil {
|
||||
bench.Fatal(err)
|
||||
}
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("LongPostings2", func(bench *testing.B) {
|
||||
var a, b, c, d []uint64
|
||||
|
||||
for i := 0; i < 12500000; i++ {
|
||||
a = append(a, uint64(i))
|
||||
}
|
||||
for i := 7500000; i < 12500000; i++ {
|
||||
b = append(b, uint64(i))
|
||||
}
|
||||
for i := 9000000; i < 20000000; i++ {
|
||||
c = append(c, uint64(i))
|
||||
}
|
||||
for i := 10000000; i < 12000000; i++ {
|
||||
d = append(d, uint64(i))
|
||||
}
|
||||
|
||||
i1 := newListPostings(a...)
|
||||
i2 := newListPostings(b...)
|
||||
i3 := newListPostings(c...)
|
||||
i4 := newListPostings(d...)
|
||||
|
||||
bench.ResetTimer()
|
||||
bench.ReportAllocs()
|
||||
for i := 0; i < bench.N; i++ {
|
||||
if _, err := ExpandPostings(Intersect(i1, i2, i3, i4)); err != nil {
|
||||
bench.Fatal(err)
|
||||
}
|
||||
}
|
||||
})
|
||||
|
||||
// Many matchers(k >> n).
|
||||
t.Run("ManyPostings", func(bench *testing.B) {
|
||||
var its []Postings
|
||||
|
||||
// 100000 matchers(k=100000).
|
||||
for i := 0; i < 100000; i++ {
|
||||
var temp []uint64
|
||||
for j := 1; j < 100; j++ {
|
||||
temp = append(temp, uint64(j))
|
||||
}
|
||||
its = append(its, newListPostings(temp...))
|
||||
}
|
||||
|
||||
bench.ResetTimer()
|
||||
bench.ReportAllocs()
|
||||
for i := 0; i < bench.N; i++ {
|
||||
if _, err := ExpandPostings(Intersect(its...)); err != nil {
|
||||
bench.Fatal(err)
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func TestMultiMerge(t *testing.T) {
|
||||
|
|
Loading…
Reference in a new issue