Add merge postings

This commit is contained in:
Fabian Reinartz 2016-12-28 11:02:19 +01:00
parent ab7fbc05ad
commit 286293802b
5 changed files with 145 additions and 17 deletions

View file

@ -24,15 +24,15 @@ type Matcher interface {
}
type EqualMatcher struct {
LabelName, Value string
name, value string
}
func (m *EqualMatcher) Name() string { return m.LabelName }
func (m *EqualMatcher) Matches(v string) bool { return v == m.Value }
func (m *EqualMatcher) Name() string { return m.name }
func (m *EqualMatcher) Matches(v string) bool { return v == m.value }
// NewEqualMatcher returns a new matcher matching an exact label value.
func NewEqualMatcher(name, value string) Matcher {
return &EqualMatcher{LabelName: name, Value: value}
return &EqualMatcher{name: name, value: value}
}
type regexpMatcher struct {
@ -58,9 +58,7 @@ type notMatcher struct {
Matcher
}
func (m *notMatcher) Matches(v string) bool {
return !m.Matcher.Matches(v)
}
func (m *notMatcher) Matches(v string) bool { return !m.Matcher.Matches(v) }
// Not inverts the matcher's matching result.
func Not(m Matcher) Matcher {

View file

@ -73,9 +73,10 @@ func Intersect(its ...Postings) Postings {
return a
}
var emptyPostings = errPostings{}
type intersectPostings struct {
a, b Postings
av, bc uint32
aok, bok bool
cur uint32
}
@ -133,29 +134,75 @@ func Merge(its ...Postings) Postings {
a := its[0]
for _, b := range its[1:] {
a = &mergePostings{a: a, b: b}
a = newMergePostings(a, b)
}
return a
}
type mergePostings struct {
a, b Postings
a, b Postings
aok, bok bool
cur uint32
}
func newMergePostings(a, b Postings) *mergePostings {
it := &mergePostings{a: a, b: b}
it.aok = it.a.Next()
it.bok = it.b.Next()
return it
}
func (it *mergePostings) Value() uint32 {
return 0
return it.cur
}
func (it *mergePostings) Next() bool {
return false
if !it.aok && !it.bok {
return false
}
if !it.aok {
it.cur = it.b.Value()
it.bok = it.b.Next()
return true
}
if !it.bok {
it.cur = it.a.Value()
it.aok = it.a.Next()
return true
}
acur, bcur := it.a.Value(), it.b.Value()
if acur < bcur {
it.cur = acur
it.aok = it.a.Next()
return true
}
if bcur < acur {
it.cur = bcur
it.bok = it.b.Next()
return true
}
it.cur = acur
it.aok = it.a.Next()
it.bok = it.b.Next()
return true
}
func (it *mergePostings) Seek(id uint32) bool {
return false
it.aok = it.a.Seek(id)
it.bok = it.b.Seek(id)
return it.Next()
}
func (it *mergePostings) Err() error {
return nil
if it.a.Err() != nil {
return it.a.Err()
}
return it.b.Err()
}
// listPostings implements the Postings interface over a plain list.

View file

@ -17,7 +17,7 @@ func (m *mockPostings) Seek(v uint32) bool { return m.seek(v) }
func (m *mockPostings) Value() uint32 { return m.value() }
func (m *mockPostings) Err() error { return m.err() }
func TestIntersectIterator(t *testing.T) {
func TestIntersect(t *testing.T) {
var cases = []struct {
a, b []uint32
res []uint32
@ -117,3 +117,67 @@ func BenchmarkIntersect(t *testing.B) {
}
}
}
func TestMultiMerge(t *testing.T) {
var cases = []struct {
a, b, c []uint32
res []uint32
}{
{
a: []uint32{1, 2, 3, 4, 5, 6, 1000, 1001},
b: []uint32{2, 4, 5, 6, 7, 8, 999, 1001},
c: []uint32{1, 2, 5, 6, 7, 8, 1001, 1200},
res: []uint32{1, 2, 3, 4, 5, 6, 7, 8, 999, 1000, 1001, 1200},
},
}
for _, c := range cases {
i1 := newListPostings(c.a)
i2 := newListPostings(c.b)
i3 := newListPostings(c.c)
res, err := expandPostings(Merge(i1, i2, i3))
if err != nil {
t.Fatalf("Unexpected error: %s", err)
}
if !reflect.DeepEqual(res, c.res) {
t.Fatalf("Expected %v but got %v", c.res, res)
}
}
}
func TestMerge(t *testing.T) {
var cases = []struct {
a, b []uint32
res []uint32
}{
{
a: []uint32{1, 2, 3, 4, 5},
b: []uint32{6, 7, 8, 9, 10},
res: []uint32{1, 2, 3, 4, 5, 6, 7, 8, 9, 10},
},
{
a: []uint32{1, 2, 3, 4, 5},
b: []uint32{4, 5, 6, 7, 8},
res: []uint32{1, 2, 3, 4, 5, 6, 7, 8},
},
{
a: []uint32{1, 2, 3, 4, 9, 10},
b: []uint32{1, 4, 5, 6, 7, 8, 10, 11},
res: []uint32{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11},
},
}
for _, c := range cases {
a := newListPostings(c.a)
b := newListPostings(c.b)
res, err := expandPostings(newMergePostings(a, b))
if err != nil {
t.Fatalf("Unexpected error: %s", err)
}
if !reflect.DeepEqual(res, c.res) {
t.Fatalf("Expected %v but got %v", c.res, res)
}
}
}

View file

@ -236,7 +236,7 @@ func (q *blockQuerier) selectSingle(m labels.Matcher) Postings {
}
if len(res) == 0 {
return errPostings{err: nil}
return emptyPostings
}
var rit []Postings
@ -249,7 +249,7 @@ func (q *blockQuerier) selectSingle(m labels.Matcher) Postings {
rit = append(rit, it)
}
return Intersect(rit...)
return Merge(rit...)
}
func (q *blockQuerier) LabelValues(name string) ([]string, error) {

View file

@ -24,3 +24,22 @@ func BenchmarkMapConversion(b *testing.B) {
}
}
}
func BenchmarkListIter(b *testing.B) {
var list []uint32
for i := 0; i < 1e4; i++ {
list = append(list, uint32(i))
}
b.ResetTimer()
total := uint32(0)
for j := 0; j < b.N; j++ {
sum := uint32(0)
for _, k := range list {
sum += k
}
total += sum
}
}