mirror of
https://github.com/prometheus/prometheus.git
synced 2024-12-26 06:04:05 -08:00
Merge pull request #57 from telendt/posings_fix
Fix various postings implementations
This commit is contained in:
commit
968821e575
159
postings.go
159
postings.go
|
@ -33,7 +33,7 @@ func (p *memPostings) get(t term) Postings {
|
||||||
if l == nil {
|
if l == nil {
|
||||||
return emptyPostings
|
return emptyPostings
|
||||||
}
|
}
|
||||||
return &listPostings{list: l, idx: -1}
|
return newListPostings(l)
|
||||||
}
|
}
|
||||||
|
|
||||||
// add adds a document to the index. The caller has to ensure that no
|
// add adds a document to the index. The caller has to ensure that no
|
||||||
|
@ -70,18 +70,13 @@ func (e errPostings) Seek(uint32) bool { return false }
|
||||||
func (e errPostings) At() uint32 { return 0 }
|
func (e errPostings) At() uint32 { return 0 }
|
||||||
func (e errPostings) Err() error { return e.err }
|
func (e errPostings) Err() error { return e.err }
|
||||||
|
|
||||||
func expandPostings(p Postings) (res []uint32, err error) {
|
var emptyPostings = errPostings{}
|
||||||
for p.Next() {
|
|
||||||
res = append(res, p.At())
|
|
||||||
}
|
|
||||||
return res, p.Err()
|
|
||||||
}
|
|
||||||
|
|
||||||
// Intersect returns a new postings list over the intersection of the
|
// Intersect returns a new postings list over the intersection of the
|
||||||
// input postings.
|
// input postings.
|
||||||
func Intersect(its ...Postings) Postings {
|
func Intersect(its ...Postings) Postings {
|
||||||
if len(its) == 0 {
|
if len(its) == 0 {
|
||||||
return errPostings{err: nil}
|
return emptyPostings
|
||||||
}
|
}
|
||||||
a := its[0]
|
a := its[0]
|
||||||
|
|
||||||
|
@ -91,8 +86,6 @@ func Intersect(its ...Postings) Postings {
|
||||||
return a
|
return a
|
||||||
}
|
}
|
||||||
|
|
||||||
var emptyPostings = errPostings{}
|
|
||||||
|
|
||||||
type intersectPostings struct {
|
type intersectPostings struct {
|
||||||
a, b Postings
|
a, b Postings
|
||||||
aok, bok bool
|
aok, bok bool
|
||||||
|
@ -100,41 +93,44 @@ type intersectPostings struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
func newIntersectPostings(a, b Postings) *intersectPostings {
|
func newIntersectPostings(a, b Postings) *intersectPostings {
|
||||||
it := &intersectPostings{a: a, b: b}
|
return &intersectPostings{a: a, b: b}
|
||||||
it.aok = it.a.Next()
|
|
||||||
it.bok = it.b.Next()
|
|
||||||
|
|
||||||
return it
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (it *intersectPostings) At() uint32 {
|
func (it *intersectPostings) At() uint32 {
|
||||||
return it.cur
|
return it.cur
|
||||||
}
|
}
|
||||||
|
|
||||||
func (it *intersectPostings) Next() bool {
|
func (it *intersectPostings) doNext(id uint32) bool {
|
||||||
for {
|
for {
|
||||||
if !it.aok || !it.bok {
|
if !it.b.Seek(id) {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
av, bv := it.a.At(), it.b.At()
|
if vb := it.b.At(); vb != id {
|
||||||
|
if !it.a.Seek(vb) {
|
||||||
if av < bv {
|
return false
|
||||||
it.aok = it.a.Seek(bv)
|
}
|
||||||
} else if bv < av {
|
id = it.a.At()
|
||||||
it.bok = it.b.Seek(av)
|
if vb != id {
|
||||||
} else {
|
continue
|
||||||
it.cur = av
|
}
|
||||||
it.aok = it.a.Next()
|
|
||||||
it.bok = it.b.Next()
|
|
||||||
return true
|
|
||||||
}
|
}
|
||||||
|
it.cur = id
|
||||||
|
return true
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (it *intersectPostings) Next() bool {
|
||||||
|
if !it.a.Next() {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
return it.doNext(it.a.At())
|
||||||
|
}
|
||||||
|
|
||||||
func (it *intersectPostings) Seek(id uint32) bool {
|
func (it *intersectPostings) Seek(id uint32) bool {
|
||||||
it.aok = it.a.Seek(id)
|
if !it.a.Seek(id) {
|
||||||
it.bok = it.b.Seek(id)
|
return false
|
||||||
return it.Next()
|
}
|
||||||
|
return it.doNext(it.a.At())
|
||||||
}
|
}
|
||||||
|
|
||||||
func (it *intersectPostings) Err() error {
|
func (it *intersectPostings) Err() error {
|
||||||
|
@ -158,17 +154,14 @@ func Merge(its ...Postings) Postings {
|
||||||
}
|
}
|
||||||
|
|
||||||
type mergedPostings struct {
|
type mergedPostings struct {
|
||||||
a, b Postings
|
a, b Postings
|
||||||
aok, bok bool
|
initialized bool
|
||||||
cur uint32
|
aok, bok bool
|
||||||
|
cur uint32
|
||||||
}
|
}
|
||||||
|
|
||||||
func newMergedPostings(a, b Postings) *mergedPostings {
|
func newMergedPostings(a, b Postings) *mergedPostings {
|
||||||
it := &mergedPostings{a: a, b: b}
|
return &mergedPostings{a: a, b: b}
|
||||||
it.aok = it.a.Next()
|
|
||||||
it.bok = it.b.Next()
|
|
||||||
|
|
||||||
return it
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (it *mergedPostings) At() uint32 {
|
func (it *mergedPostings) At() uint32 {
|
||||||
|
@ -176,6 +169,12 @@ func (it *mergedPostings) At() uint32 {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (it *mergedPostings) Next() bool {
|
func (it *mergedPostings) Next() bool {
|
||||||
|
if !it.initialized {
|
||||||
|
it.aok = it.a.Next()
|
||||||
|
it.bok = it.b.Next()
|
||||||
|
it.initialized = true
|
||||||
|
}
|
||||||
|
|
||||||
if !it.aok && !it.bok {
|
if !it.aok && !it.bok {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
@ -196,25 +195,31 @@ func (it *mergedPostings) Next() bool {
|
||||||
if acur < bcur {
|
if acur < bcur {
|
||||||
it.cur = acur
|
it.cur = acur
|
||||||
it.aok = it.a.Next()
|
it.aok = it.a.Next()
|
||||||
return true
|
} else if acur > bcur {
|
||||||
}
|
|
||||||
if bcur < acur {
|
|
||||||
it.cur = bcur
|
it.cur = bcur
|
||||||
it.bok = it.b.Next()
|
it.bok = it.b.Next()
|
||||||
return true
|
} else {
|
||||||
|
it.cur = acur
|
||||||
|
it.aok = it.a.Next()
|
||||||
|
it.bok = it.b.Next()
|
||||||
}
|
}
|
||||||
it.cur = acur
|
|
||||||
it.aok = it.a.Next()
|
|
||||||
it.bok = it.b.Next()
|
|
||||||
|
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
func (it *mergedPostings) Seek(id uint32) bool {
|
func (it *mergedPostings) Seek(id uint32) bool {
|
||||||
it.aok = it.a.Seek(id)
|
it.aok = it.a.Seek(id)
|
||||||
it.bok = it.b.Seek(id)
|
it.bok = it.b.Seek(id)
|
||||||
|
it.initialized = true
|
||||||
return it.Next()
|
acur, bcur := it.a.At(), it.b.At()
|
||||||
|
if acur < bcur {
|
||||||
|
it.cur = acur
|
||||||
|
} else if acur > bcur {
|
||||||
|
it.cur = bcur
|
||||||
|
} else {
|
||||||
|
it.cur = acur
|
||||||
|
it.bok = it.b.Next()
|
||||||
|
}
|
||||||
|
return it.aok && it.bok
|
||||||
}
|
}
|
||||||
|
|
||||||
func (it *mergedPostings) Err() error {
|
func (it *mergedPostings) Err() error {
|
||||||
|
@ -227,28 +232,38 @@ func (it *mergedPostings) Err() error {
|
||||||
// listPostings implements the Postings interface over a plain list.
|
// listPostings implements the Postings interface over a plain list.
|
||||||
type listPostings struct {
|
type listPostings struct {
|
||||||
list []uint32
|
list []uint32
|
||||||
idx int
|
cur uint32
|
||||||
}
|
}
|
||||||
|
|
||||||
func newListPostings(list []uint32) *listPostings {
|
func newListPostings(list []uint32) *listPostings {
|
||||||
return &listPostings{list: list, idx: -1}
|
return &listPostings{list: list}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (it *listPostings) At() uint32 {
|
func (it *listPostings) At() uint32 {
|
||||||
return it.list[it.idx]
|
return it.cur
|
||||||
}
|
}
|
||||||
|
|
||||||
func (it *listPostings) Next() bool {
|
func (it *listPostings) Next() bool {
|
||||||
it.idx++
|
if len(it.list) > 0 {
|
||||||
return it.idx < len(it.list)
|
it.cur = it.list[0]
|
||||||
|
it.list = it.list[1:]
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
func (it *listPostings) Seek(x uint32) bool {
|
func (it *listPostings) Seek(x uint32) bool {
|
||||||
// Do binary search between current position and end.
|
// Do binary search between current position and end.
|
||||||
it.idx += sort.Search(len(it.list)-it.idx, func(i int) bool {
|
i := sort.Search(len(it.list), func(i int) bool {
|
||||||
return it.list[i+it.idx] >= x
|
return it.list[i] >= x
|
||||||
})
|
})
|
||||||
return it.idx < len(it.list)
|
if i < len(it.list) {
|
||||||
|
it.cur = it.list[i]
|
||||||
|
it.list = it.list[i+1:]
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
it.list = nil
|
||||||
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
func (it *listPostings) Err() error {
|
func (it *listPostings) Err() error {
|
||||||
|
@ -259,32 +274,40 @@ func (it *listPostings) Err() error {
|
||||||
// big endian numbers.
|
// big endian numbers.
|
||||||
type bigEndianPostings struct {
|
type bigEndianPostings struct {
|
||||||
list []byte
|
list []byte
|
||||||
idx int
|
cur uint32
|
||||||
}
|
}
|
||||||
|
|
||||||
func newBigEndianPostings(list []byte) *bigEndianPostings {
|
func newBigEndianPostings(list []byte) *bigEndianPostings {
|
||||||
return &bigEndianPostings{list: list, idx: -1}
|
return &bigEndianPostings{list: list}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (it *bigEndianPostings) At() uint32 {
|
func (it *bigEndianPostings) At() uint32 {
|
||||||
idx := 4 * it.idx
|
return it.cur
|
||||||
return binary.BigEndian.Uint32(it.list[idx : idx+4])
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (it *bigEndianPostings) Next() bool {
|
func (it *bigEndianPostings) Next() bool {
|
||||||
it.idx++
|
if len(it.list) >= 4 {
|
||||||
return it.idx*4 < len(it.list)
|
it.cur = binary.BigEndian.Uint32(it.list)
|
||||||
|
it.list = it.list[4:]
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
func (it *bigEndianPostings) Seek(x uint32) bool {
|
func (it *bigEndianPostings) Seek(x uint32) bool {
|
||||||
num := len(it.list) / 4
|
num := len(it.list) / 4
|
||||||
// Do binary search between current position and end.
|
// Do binary search between current position and end.
|
||||||
it.idx += sort.Search(num-it.idx, func(i int) bool {
|
i := sort.Search(num, func(i int) bool {
|
||||||
idx := 4 * (it.idx + i)
|
return binary.BigEndian.Uint32(it.list[i*4:]) >= x
|
||||||
val := binary.BigEndian.Uint32(it.list[idx : idx+4])
|
|
||||||
return val >= x
|
|
||||||
})
|
})
|
||||||
return it.idx*4 < len(it.list)
|
if i < num {
|
||||||
|
j := i * 4
|
||||||
|
it.cur = binary.BigEndian.Uint32(it.list[j:])
|
||||||
|
it.list = it.list[j+4:]
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
it.list = nil
|
||||||
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
func (it *bigEndianPostings) Err() error {
|
func (it *bigEndianPostings) Err() error {
|
||||||
|
|
|
@ -33,6 +33,13 @@ func (m *mockPostings) Seek(v uint32) bool { return m.seek(v) }
|
||||||
func (m *mockPostings) Value() uint32 { return m.value() }
|
func (m *mockPostings) Value() uint32 { return m.value() }
|
||||||
func (m *mockPostings) Err() error { return m.err() }
|
func (m *mockPostings) Err() error { return m.err() }
|
||||||
|
|
||||||
|
func expandPostings(p Postings) (res []uint32, err error) {
|
||||||
|
for p.Next() {
|
||||||
|
res = append(res, p.At())
|
||||||
|
}
|
||||||
|
return res, p.Err()
|
||||||
|
}
|
||||||
|
|
||||||
func TestIntersect(t *testing.T) {
|
func TestIntersect(t *testing.T) {
|
||||||
var cases = []struct {
|
var cases = []struct {
|
||||||
a, b []uint32
|
a, b []uint32
|
||||||
|
@ -233,19 +240,9 @@ func TestMergedPostingsSeek(t *testing.T) {
|
||||||
p := newMergedPostings(a, b)
|
p := newMergedPostings(a, b)
|
||||||
|
|
||||||
require.Equal(t, c.success, p.Seek(c.seek))
|
require.Equal(t, c.success, p.Seek(c.seek))
|
||||||
|
lst, err := expandPostings(p)
|
||||||
if c.success {
|
require.NoError(t, err)
|
||||||
// check the current element and then proceed to check the rest.
|
require.Equal(t, c.res, lst)
|
||||||
i := 0
|
|
||||||
require.Equal(t, c.res[i], p.At())
|
|
||||||
|
|
||||||
for p.Next() {
|
|
||||||
i++
|
|
||||||
require.Equal(t, int(c.res[i]), int(p.At()))
|
|
||||||
}
|
|
||||||
|
|
||||||
require.Equal(t, len(c.res)-1, i)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return
|
return
|
||||||
|
@ -296,16 +293,16 @@ func TestBigEndian(t *testing.T) {
|
||||||
ls[600] + 1, ls[601], true,
|
ls[600] + 1, ls[601], true,
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
ls[600] + 1, ls[601], true,
|
ls[600] + 1, ls[602], true,
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
ls[600] + 1, ls[601], true,
|
ls[600] + 1, ls[603], true,
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
ls[0], ls[601], true,
|
ls[0], ls[604], true,
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
ls[600], ls[601], true,
|
ls[600], ls[605], true,
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
ls[999], ls[999], true,
|
ls[999], ls[999], true,
|
||||||
|
@ -316,15 +313,11 @@ func TestBigEndian(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
bep := newBigEndianPostings(beLst)
|
bep := newBigEndianPostings(beLst)
|
||||||
bep.Next()
|
|
||||||
|
|
||||||
for _, v := range table {
|
for _, v := range table {
|
||||||
require.Equal(t, v.found, bep.Seek(v.seek))
|
require.Equal(t, v.found, bep.Seek(v.seek))
|
||||||
// Once you seek beyond, At() will panic.
|
require.Equal(t, v.val, bep.At())
|
||||||
if v.found {
|
require.Nil(t, bep.Err())
|
||||||
require.Equal(t, v.val, bep.At())
|
|
||||||
require.Nil(t, bep.Err())
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue