Fix missing postings in Merge and Intersect (#77)

* Test for a previous implematation of Intersect

Before we were moving the postings list everytime we create a new
chained `intersectPostings`. That was causing some postings to be
skipped. This test fails on the older version.

Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in>

* Advance on Seek only when valid.

Issue:
Before in mergedPostings and others we advance everytime we `Seek`,
which causes issues with `Intersect`.

Take the case, where we have a mergedPostings = m merging, a: {10, 20, 30} and
b: {15, 25, 35}. Everytime we `Seek`, we do a.Seek and b.Seek.

Now if we Intersect m with {21, 22, 23, 30}, we would do Seek({21,22,23}) which
would advance a and b beyond 30.

Fix:
Now we advance only when the seeking value is greater than the current
value, as the definition specifies.

Also, posting 0 will not be a valid posting and will be used to signal
finished or un-initialized PostingsList.

Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in>

* Add test for Merge+Intersect edgecase.

Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in>

* Add comments to trivial tests.

Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in>
This commit is contained in:
Goutham Veeramachaneni 2017-05-12 13:14:41 +05:30 committed by Fabian Reinartz
parent 968821e575
commit 2fa647f50b
3 changed files with 76 additions and 30 deletions

View file

@ -112,7 +112,7 @@ func openHeadBlock(dir string, l log.Logger) (*headBlock, error) {
h := &headBlock{
dir: dir,
wal: wal,
series: []*memSeries{},
series: []*memSeries{nil}, // 0 is not a valid posting, filled with nil.
hashes: map[uint64][]*memSeries{},
values: map[string]stringset{},
postings: &memPostings{m: make(map[term][]uint32)},

View file

@ -207,19 +207,15 @@ func (it *mergedPostings) Next() bool {
}
func (it *mergedPostings) Seek(id uint32) bool {
if it.cur >= id {
return true
}
it.aok = it.a.Seek(id)
it.bok = it.b.Seek(id)
it.initialized = true
acur, bcur := it.a.At(), it.b.At()
if acur < bcur {
it.cur = acur
} else if acur > bcur {
it.cur = bcur
} else {
it.cur = acur
it.bok = it.b.Next()
}
return it.aok && it.bok
return it.Next()
}
func (it *mergedPostings) Err() error {
@ -249,10 +245,16 @@ func (it *listPostings) Next() bool {
it.list = it.list[1:]
return true
}
it.cur = 0
return false
}
func (it *listPostings) Seek(x uint32) bool {
// If the current value satisfies, then return.
if it.cur >= x {
return true
}
// Do binary search between current position and end.
i := sort.Search(len(it.list), func(i int) bool {
return it.list[i] >= x
@ -295,6 +297,10 @@ func (it *bigEndianPostings) Next() bool {
}
func (it *bigEndianPostings) Seek(x uint32) bool {
if it.cur >= x {
return true
}
num := len(it.list) / 4
// Do binary search between current position and end.
i := sort.Search(num, func(i int) bool {

View file

@ -78,23 +78,39 @@ func TestIntersect(t *testing.T) {
func TestMultiIntersect(t *testing.T) {
var cases = []struct {
a, b, c []uint32
p [][]uint32
res []uint32
}{
{
a: []uint32{1, 2, 3, 4, 5, 6, 1000, 1001},
b: []uint32{2, 4, 5, 6, 7, 8, 999, 1001},
c: []uint32{1, 2, 5, 6, 7, 8, 1001, 1200},
p: [][]uint32{
{1, 2, 3, 4, 5, 6, 1000, 1001},
{2, 4, 5, 6, 7, 8, 999, 1001},
{1, 2, 5, 6, 7, 8, 1001, 1200},
},
res: []uint32{2, 5, 6, 1001},
},
// One of the reproduceable cases for:
// https://github.com/prometheus/prometheus/issues/2616
// The initialisation of intersectPostings was moving the iterator forward
// prematurely making us miss some postings.
{
p: [][]uint32{
{1, 2},
{1, 2},
{1, 2},
{2},
},
res: []uint32{2},
},
}
for _, c := range cases {
pa := newListPostings(c.a)
pb := newListPostings(c.b)
pc := newListPostings(c.c)
ps := make([]Postings, 0, len(c.p))
for _, postings := range c.p {
ps = append(ps, newListPostings(postings))
}
res, err := expandPostings(Intersect(pa, pb, pc))
res, err := expandPostings(Intersect(ps...))
require.NoError(t, err)
require.Equal(t, c.res, res)
@ -200,12 +216,12 @@ func TestMergedPostingsSeek(t *testing.T) {
res []uint32
}{
{
a: []uint32{1, 2, 3, 4, 5},
a: []uint32{2, 3, 4, 5},
b: []uint32{6, 7, 8, 9, 10},
seek: 0,
seek: 1,
success: true,
res: []uint32{1, 2, 3, 4, 5, 6, 7, 8, 9, 10},
res: []uint32{2, 3, 4, 5, 6, 7, 8, 9, 10},
},
{
a: []uint32{1, 2, 3, 4, 5},
@ -240,10 +256,17 @@ func TestMergedPostingsSeek(t *testing.T) {
p := newMergedPostings(a, b)
require.Equal(t, c.success, p.Seek(c.seek))
// After Seek(), At() should be called.
if c.success {
start := p.At()
lst, err := expandPostings(p)
require.NoError(t, err)
lst = append([]uint32{start}, lst...)
require.Equal(t, c.res, lst)
}
}
return
}
@ -293,16 +316,16 @@ func TestBigEndian(t *testing.T) {
ls[600] + 1, ls[601], true,
},
{
ls[600] + 1, ls[602], true,
ls[600] + 1, ls[601], true,
},
{
ls[600] + 1, ls[603], true,
ls[600] + 1, ls[601], true,
},
{
ls[0], ls[604], true,
ls[0], ls[601], true,
},
{
ls[600], ls[605], true,
ls[600], ls[601], true,
},
{
ls[999], ls[999], true,
@ -321,3 +344,20 @@ func TestBigEndian(t *testing.T) {
}
})
}
func TestIntersectWithMerge(t *testing.T) {
// One of the reproduceable cases for:
// https://github.com/prometheus/prometheus/issues/2616
a := newListPostings([]uint32{21, 22, 23, 24, 25, 30})
b := newMergedPostings(
newListPostings([]uint32{10, 20, 30}),
newListPostings([]uint32{15, 26, 30}),
)
p := Intersect(a, b)
res, err := expandPostings(p)
require.NoError(t, err)
require.Equal(t, []uint32{30}, res)
}