Make MemPostings nested.

This saves memory, about a quarter of the size of the postings map
itself with high-cardinality labels (not including the post ids).

Signed-off-by: Brian Brazil <brian.brazil@robustperception.io>
This commit is contained in:
Brian Brazil 2018-11-02 14:27:19 +00:00
parent fc99b8bb3a
commit 407e12d051
3 changed files with 91 additions and 57 deletions

View file

@ -320,10 +320,12 @@ func TestPersistence_index_e2e(t *testing.T) {
testutil.Ok(t, err) testutil.Ok(t, err)
mi.WritePostings("", "", newListPostings(all)) mi.WritePostings("", "", newListPostings(all))
for l := range postings.m { for n, e := range postings.m {
err = iw.WritePostings(l.Name, l.Value, postings.Get(l.Name, l.Value)) for v := range e {
testutil.Ok(t, err) err = iw.WritePostings(n, v, postings.Get(n, v))
mi.WritePostings(l.Name, l.Value, postings.Get(l.Name, l.Value)) testutil.Ok(t, err)
mi.WritePostings(n, v, postings.Get(n, v))
}
} }
err = iw.Close() err = iw.Close()

View file

@ -36,14 +36,14 @@ func AllPostingsKey() (name, value string) {
// unordered batch fills on startup. // unordered batch fills on startup.
type MemPostings struct { type MemPostings struct {
mtx sync.RWMutex mtx sync.RWMutex
m map[labels.Label][]uint64 m map[string]map[string][]uint64
ordered bool ordered bool
} }
// NewMemPostings returns a memPostings that's ready for reads and writes. // NewMemPostings returns a memPostings that's ready for reads and writes.
func NewMemPostings() *MemPostings { func NewMemPostings() *MemPostings {
return &MemPostings{ return &MemPostings{
m: make(map[labels.Label][]uint64, 512), m: make(map[string]map[string][]uint64, 512),
ordered: true, ordered: true,
} }
} }
@ -52,7 +52,7 @@ func NewMemPostings() *MemPostings {
// until ensureOrder was called once. // until ensureOrder was called once.
func NewUnorderedMemPostings() *MemPostings { func NewUnorderedMemPostings() *MemPostings {
return &MemPostings{ return &MemPostings{
m: make(map[labels.Label][]uint64, 512), m: make(map[string]map[string][]uint64, 512),
ordered: false, ordered: false,
} }
} }
@ -62,8 +62,10 @@ func (p *MemPostings) SortedKeys() []labels.Label {
p.mtx.RLock() p.mtx.RLock()
keys := make([]labels.Label, 0, len(p.m)) keys := make([]labels.Label, 0, len(p.m))
for l := range p.m { for n, e := range p.m {
keys = append(keys, l) for v := range e {
keys = append(keys, labels.Label{Name: n, Value: v})
}
} }
p.mtx.RUnlock() p.mtx.RUnlock()
@ -78,14 +80,18 @@ func (p *MemPostings) SortedKeys() []labels.Label {
// Get returns a postings list for the given label pair. // Get returns a postings list for the given label pair.
func (p *MemPostings) Get(name, value string) Postings { func (p *MemPostings) Get(name, value string) Postings {
var lp []uint64
p.mtx.RLock() p.mtx.RLock()
l := p.m[labels.Label{Name: name, Value: value}] l := p.m[name]
if l != nil {
lp = l[value]
}
p.mtx.RUnlock() p.mtx.RUnlock()
if l == nil { if lp == nil {
return EmptyPostings() return EmptyPostings()
} }
return newListPostings(l) return newListPostings(lp)
} }
// All returns a postings list over all documents ever added. // All returns a postings list over all documents ever added.
@ -118,8 +124,10 @@ func (p *MemPostings) EnsureOrder() {
}() }()
} }
for _, l := range p.m { for _, e := range p.m {
workc <- l for _, l := range e {
workc <- l
}
} }
close(workc) close(workc)
wg.Wait() wg.Wait()
@ -129,44 +137,58 @@ func (p *MemPostings) EnsureOrder() {
// Delete removes all ids in the given map from the postings lists. // Delete removes all ids in the given map from the postings lists.
func (p *MemPostings) Delete(deleted map[uint64]struct{}) { func (p *MemPostings) Delete(deleted map[uint64]struct{}) {
var keys []labels.Label var keys, vals []string
// Collect all keys relevant for deletion once. New keys added afterwards // Collect all keys relevant for deletion once. New keys added afterwards
// can by definition not be affected by any of the given deletes. // can by definition not be affected by any of the given deletes.
p.mtx.RLock() p.mtx.RLock()
for l := range p.m { for n := range p.m {
keys = append(keys, l) keys = append(keys, n)
} }
p.mtx.RUnlock() p.mtx.RUnlock()
// For each key we first analyse whether the postings list is affected by the deletes. for _, n := range keys {
// If yes, we actually reallocate a new postings list. p.mtx.RLock()
for _, l := range keys { vals = vals[:0]
// Only lock for processing one postings list so we don't block reads for too long. for v := range p.m[n] {
p.mtx.Lock() vals = append(vals, v)
found := false
for _, id := range p.m[l] {
if _, ok := deleted[id]; ok {
found = true
break
}
} }
if !found { p.mtx.RUnlock()
// For each posting we first analyse whether the postings list is affected by the deletes.
// If yes, we actually reallocate a new postings list.
for _, l := range vals {
// Only lock for processing one postings list so we don't block reads for too long.
p.mtx.Lock()
found := false
for _, id := range p.m[n][l] {
if _, ok := deleted[id]; ok {
found = true
break
}
}
if !found {
p.mtx.Unlock()
continue
}
repl := make([]uint64, 0, len(p.m[n][l]))
for _, id := range p.m[n][l] {
if _, ok := deleted[id]; !ok {
repl = append(repl, id)
}
}
if len(repl) > 0 {
p.m[n][l] = repl
} else {
delete(p.m[n], l)
}
p.mtx.Unlock() p.mtx.Unlock()
continue
} }
repl := make([]uint64, 0, len(p.m[l])) p.mtx.Lock()
if len(p.m[n]) == 0 {
for _, id := range p.m[l] { delete(p.m, n)
if _, ok := deleted[id]; !ok {
repl = append(repl, id)
}
}
if len(repl) > 0 {
p.m[l] = repl
} else {
delete(p.m, l)
} }
p.mtx.Unlock() p.mtx.Unlock()
} }
@ -177,9 +199,11 @@ func (p *MemPostings) Iter(f func(labels.Label, Postings) error) error {
p.mtx.RLock() p.mtx.RLock()
defer p.mtx.RUnlock() defer p.mtx.RUnlock()
for l, p := range p.m { for n, e := range p.m {
if err := f(l, newListPostings(p)); err != nil { for v, p := range e {
return err if err := f(labels.Label{Name: n, Value: v}, newListPostings(p)); err != nil {
return err
}
} }
} }
return nil return nil
@ -198,8 +222,13 @@ func (p *MemPostings) Add(id uint64, lset labels.Labels) {
} }
func (p *MemPostings) addFor(id uint64, l labels.Label) { func (p *MemPostings) addFor(id uint64, l labels.Label) {
list := append(p.m[l], id) nm, ok := p.m[l.Name]
p.m[l] = list if !ok {
nm = map[string][]uint64{}
p.m[l.Name] = nm
}
list := append(nm[l.Value], id)
nm[l.Value] = list
if !p.ordered { if !p.ordered {
return return

View file

@ -20,21 +20,22 @@ import (
"sort" "sort"
"testing" "testing"
"github.com/prometheus/tsdb/labels"
"github.com/prometheus/tsdb/testutil" "github.com/prometheus/tsdb/testutil"
) )
func TestMemPostings_addFor(t *testing.T) { func TestMemPostings_addFor(t *testing.T) {
p := NewMemPostings() p := NewMemPostings()
p.m[allPostingsKey] = []uint64{1, 2, 3, 4, 6, 7, 8} p.m[allPostingsKey.Name] = map[string][]uint64{}
p.m[allPostingsKey.Name][allPostingsKey.Value] = []uint64{1, 2, 3, 4, 6, 7, 8}
p.addFor(5, allPostingsKey) p.addFor(5, allPostingsKey)
testutil.Equals(t, []uint64{1, 2, 3, 4, 5, 6, 7, 8}, p.m[allPostingsKey]) testutil.Equals(t, []uint64{1, 2, 3, 4, 5, 6, 7, 8}, p.m[allPostingsKey.Name][allPostingsKey.Value])
} }
func TestMemPostings_ensureOrder(t *testing.T) { func TestMemPostings_ensureOrder(t *testing.T) {
p := NewUnorderedMemPostings() p := NewUnorderedMemPostings()
p.m["a"] = map[string][]uint64{}
for i := 0; i < 100; i++ { for i := 0; i < 100; i++ {
l := make([]uint64, 100) l := make([]uint64, 100)
@ -43,17 +44,19 @@ func TestMemPostings_ensureOrder(t *testing.T) {
} }
v := fmt.Sprintf("%d", i) v := fmt.Sprintf("%d", i)
p.m[labels.Label{"a", v}] = l p.m["a"][v] = l
} }
p.EnsureOrder() p.EnsureOrder()
for _, l := range p.m { for _, e := range p.m {
ok := sort.SliceIsSorted(l, func(i, j int) bool { for _, l := range e {
return l[i] < l[j] ok := sort.SliceIsSorted(l, func(i, j int) bool {
}) return l[i] < l[j]
if !ok { })
t.Fatalf("postings list %v is not sorted", l) if !ok {
t.Fatalf("postings list %v is not sorted", l)
}
} }
} }
} }