prometheus/index/postings.go
Brian Brazil f9bc0817a1
Avoid allocation in mergedPostings.Seek. (#586)
See
https://github.com/prometheus/prometheus/issues/5424#issuecomment-485199651

benchmark                                                            old
ns/op      new ns/op       delta
BenchmarkHeadPostingForMatchers/n="1"-4
5266           5267            +0.02%
BenchmarkHeadPostingForMatchers/n="1",j="foo"-4
6469           7801            +20.59%
BenchmarkHeadPostingForMatchers/j="foo",n="1"-4
4984           4963            -0.42%
BenchmarkHeadPostingForMatchers/n="1",j!="foo"-4
7137           7527            +5.46%
BenchmarkHeadPostingForMatchers/i=~".*"-4
8055830839     8117651746      +0.77%
BenchmarkHeadPostingForMatchers/i=~".+"-4
9369298293     9742710251      +3.99%
BenchmarkHeadPostingForMatchers/i=~""-4
2363120708     2507789029      +6.12%
BenchmarkHeadPostingForMatchers/i!=""-4
9387069195     11740628557     +25.07%
BenchmarkHeadPostingForMatchers/n="1",i=~".*",j="foo"-4
109668312      107644136       -1.85%
BenchmarkHeadPostingForMatchers/n="1",i=~".*",i!="2",j="foo"-4
106022679      105759760       -0.25%
BenchmarkHeadPostingForMatchers/n="1",i!="",j="foo"-4
330201810      266421024       -19.32%
BenchmarkHeadPostingForMatchers/n="1",i=~".+",j="foo"-4
355627801      292774913       -17.67%
BenchmarkHeadPostingForMatchers/n="1",i=~"1.+",j="foo"-4
108132305      106697690       -1.33%
BenchmarkHeadPostingForMatchers/n="1",i=~".+",i!="2",j="foo"-4
358836972      362579998       +1.04%
BenchmarkHeadPostingForMatchers/n="1",i=~".+",i!~"2.*",j="foo"-4
555879392      575858378       +3.59%

benchmark                                                            old
allocs     new allocs     delta
BenchmarkHeadPostingForMatchers/n="1"-4                              8
8              +0.00%
BenchmarkHeadPostingForMatchers/n="1",j="foo"-4                      11
11             +0.00%
BenchmarkHeadPostingForMatchers/j="foo",n="1"-4                      11
11             +0.00%
BenchmarkHeadPostingForMatchers/n="1",j!="foo"-4                     17
17             +0.00%
BenchmarkHeadPostingForMatchers/i=~".*"-4                            58
58             +0.00%
BenchmarkHeadPostingForMatchers/i=~".+"-4
100115         100115         +0.00%
BenchmarkHeadPostingForMatchers/i=~""-4
100125         100125         +0.00%
BenchmarkHeadPostingForMatchers/i!=""-4
100112         100112         +0.00%
BenchmarkHeadPostingForMatchers/n="1",i=~".*",j="foo"-4              19
19             +0.00%
BenchmarkHeadPostingForMatchers/n="1",i=~".*",i!="2",j="foo"-4       22
22             +0.00%
BenchmarkHeadPostingForMatchers/n="1",i!="",j="foo"-4
100109         100079         -0.03%
BenchmarkHeadPostingForMatchers/n="1",i=~".+",j="foo"-4
100110         100079         -0.03%
BenchmarkHeadPostingForMatchers/n="1",i=~"1.+",j="foo"-4
11176          11167          -0.08%
BenchmarkHeadPostingForMatchers/n="1",i=~".+",i!="2",j="foo"-4
100112         100082         -0.03%
BenchmarkHeadPostingForMatchers/n="1",i=~".+",i!~"2.*",j="foo"-4
111307         111277         -0.03%

benchmark                                                            old
bytes     new bytes     delta
BenchmarkHeadPostingForMatchers/n="1"-4
1360          1360          +0.00%
BenchmarkHeadPostingForMatchers/n="1",j="foo"-4
1392          1392          +0.00%
BenchmarkHeadPostingForMatchers/j="foo",n="1"-4
1392          1392          +0.00%
BenchmarkHeadPostingForMatchers/n="1",j!="foo"-4
1520          1520          +0.00%
BenchmarkHeadPostingForMatchers/i=~".*"-4
258254040     258254040     +0.00%
BenchmarkHeadPostingForMatchers/i=~".+"-4
281554280     281554280     +0.00%
BenchmarkHeadPostingForMatchers/i=~""-4
241554888     241554888     +0.00%
BenchmarkHeadPostingForMatchers/i!=""-4
281553664     281553664     +0.00%
BenchmarkHeadPostingForMatchers/n="1",i=~".*",j="foo"-4
1607277       1607277       +0.00%
BenchmarkHeadPostingForMatchers/n="1",i=~".*",i!="2",j="foo"-4
1607389       1607389       +0.00%
BenchmarkHeadPostingForMatchers/n="1",i!="",j="foo"-4
73076560      24907600      -65.92%
BenchmarkHeadPostingForMatchers/n="1",i=~".+",j="foo"-4
73076765      24907723      -65.92%
BenchmarkHeadPostingForMatchers/n="1",i=~"1.+",j="foo"-4
5416925       3794909       -29.94%
BenchmarkHeadPostingForMatchers/n="1",i=~".+",i!="2",j="foo"-4
73076779      24907819      -65.92%
BenchmarkHeadPostingForMatchers/n="1",i=~".+",i!~"2.*",j="foo"-4
99874860      51705900      -48.23%

Signed-off-by: Brian Brazil <brian.brazil@robustperception.io>
2019-04-24 13:12:58 +01:00

705 lines
14 KiB
Go

// Copyright 2017 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package index
import (
"container/heap"
"encoding/binary"
"runtime"
"sort"
"strings"
"sync"
"github.com/prometheus/tsdb/labels"
)
// allPostingsKey is the zero-valued label under which the postings list of
// all IDs is stored.
var allPostingsKey = labels.Label{}

// AllPostingsKey returns the label name and value that are used to store the
// postings list of all existing IDs.
func AllPostingsKey() (name, value string) {
	key := allPostingsKey
	return key.Name, key.Value
}
// MemPostings holds postings list for series ID per label pair. They may be written
// to out of order.
// EnsureOrder() must be called once before any reads are done. This allows for quick
// unordered batch fills on startup.
type MemPostings struct {
	mtx     sync.RWMutex
	m       map[string]map[string][]uint64 // label name -> label value -> series IDs
	ordered bool                           // true once lists are kept sorted on insert
}

// NewMemPostings returns a MemPostings that is ready for reads and writes:
// it starts out in ordered mode.
func NewMemPostings() *MemPostings {
	mp := &MemPostings{ordered: true}
	mp.m = make(map[string]map[string][]uint64, 512)
	return mp
}

// NewUnorderedMemPostings returns a MemPostings that is not safe to be read
// from until EnsureOrder was called once.
func NewUnorderedMemPostings() *MemPostings {
	mp := &MemPostings{}
	mp.m = make(map[string]map[string][]uint64, 512)
	return mp
}
// SortedKeys returns every label pair that has a postings list, ordered by
// label name first and label value second.
func (p *MemPostings) SortedKeys() []labels.Label {
	p.mtx.RLock()
	pairs := make([]labels.Label, 0, len(p.m))
	for name, values := range p.m {
		for value := range values {
			pairs = append(pairs, labels.Label{Name: name, Value: value})
		}
	}
	p.mtx.RUnlock()

	// Sorting happens after the lock is released; pairs is a private copy.
	sort.Slice(pairs, func(a, b int) bool {
		x, y := pairs[a], pairs[b]
		c := strings.Compare(x.Name, y.Name)
		if c < 0 {
			return true
		}
		if c > 0 {
			return false
		}
		return x.Value < y.Value
	})
	return pairs
}
// Get returns a postings list for the given label pair. When no list is
// stored for the pair, the EmptyPostings sentinel is returned.
func (p *MemPostings) Get(name, value string) Postings {
	p.mtx.RLock()
	// Indexing a missing (nil) inner map yields a nil slice, so no separate
	// existence check for the name is needed.
	ids := p.m[name][value]
	p.mtx.RUnlock()

	if ids == nil {
		return EmptyPostings()
	}
	return newListPostings(ids...)
}
// All returns a postings list over all documents ever added, i.e. the list
// stored under the AllPostingsKey label pair.
func (p *MemPostings) All() Postings {
	name, value := AllPostingsKey()
	return p.Get(name, value)
}
// EnsureOrder ensures that all postings lists are sorted. After it returns all further
// calls to add and addFor will insert new IDs in a sorted manner.
// The write lock is held for the whole call; it is a no-op when the postings
// are already ordered.
func (p *MemPostings) EnsureOrder() {
	p.mtx.Lock()
	defer p.mtx.Unlock()

	if p.ordered {
		return
	}

	// Sort the lists in parallel with one worker per GOMAXPROCS slot.
	workers := runtime.GOMAXPROCS(0)
	jobs := make(chan []uint64)

	var wg sync.WaitGroup
	wg.Add(workers)
	for w := 0; w < workers; w++ {
		go func() {
			defer wg.Done()
			for ids := range jobs {
				sort.Slice(ids, func(a, b int) bool { return ids[a] < ids[b] })
			}
		}()
	}

	for _, values := range p.m {
		for _, ids := range values {
			jobs <- ids
		}
	}
	close(jobs)
	wg.Wait()

	p.ordered = true
}
// Delete removes all ids in the given map from the postings lists.
// Lists that become empty are dropped, as are label names left without values.
func (p *MemPostings) Delete(deleted map[uint64]struct{}) {
	// prune rewrites a single postings list under the write lock. The lock is
	// taken per list so that reads are not blocked for too long.
	prune := func(name, value string) {
		p.mtx.Lock()
		defer p.mtx.Unlock()

		// First analyse whether the list is affected by the deletes at all;
		// only then a new list is allocated.
		affected := false
		for _, id := range p.m[name][value] {
			if _, gone := deleted[id]; gone {
				affected = true
				break
			}
		}
		if !affected {
			return
		}

		kept := make([]uint64, 0, len(p.m[name][value]))
		for _, id := range p.m[name][value] {
			if _, gone := deleted[id]; !gone {
				kept = append(kept, id)
			}
		}
		if len(kept) == 0 {
			delete(p.m[name], value)
			return
		}
		p.m[name][value] = kept
	}

	// Collect all label names relevant for deletion once. New names added
	// afterwards can by definition not be affected by any of the given deletes.
	var names, values []string
	p.mtx.RLock()
	for name := range p.m {
		names = append(names, name)
	}
	p.mtx.RUnlock()

	for _, name := range names {
		// Snapshot the values of this name, reusing the scratch slice.
		p.mtx.RLock()
		values = values[:0]
		for value := range p.m[name] {
			values = append(values, value)
		}
		p.mtx.RUnlock()

		for _, value := range values {
			prune(name, value)
		}

		// Drop the name entirely once none of its values is left.
		p.mtx.Lock()
		if len(p.m[name]) == 0 {
			delete(p.m, name)
		}
		p.mtx.Unlock()
	}
}
// Iter calls f for each postings list. It aborts if f returns an error and returns it.
// The read lock is held while f runs.
func (p *MemPostings) Iter(f func(labels.Label, Postings) error) error {
	p.mtx.RLock()
	defer p.mtx.RUnlock()

	for name, values := range p.m {
		for value, ids := range values {
			pair := labels.Label{Name: name, Value: value}
			if err := f(pair, newListPostings(ids...)); err != nil {
				return err
			}
		}
	}
	return nil
}
// Add a label set to the postings index. The ID is recorded under every label
// of lset and additionally under the all-postings key.
func (p *MemPostings) Add(id uint64, lset labels.Labels) {
	p.mtx.Lock()
	defer p.mtx.Unlock()

	for i := range lset {
		p.addFor(id, lset[i])
	}
	p.addFor(id, allPostingsKey)
}
// addFor appends id to the postings list of label l, creating the per-name
// map on first use. It does no locking itself.
func (p *MemPostings) addFor(id uint64, l labels.Label) {
	values, known := p.m[l.Name]
	if !known {
		values = make(map[string][]uint64)
		p.m[l.Name] = values
	}
	ids := append(values[l.Value], id)
	values[l.Value] = ids

	if !p.ordered {
		return
	}
	// There is no guarantee that no higher ID was inserted before as they may
	// be generated independently before adding them to postings.
	// We repair order violations on insert. The invariant is that the first n-1
	// items in the list are already sorted, so the new ID is swapped towards
	// the front until it no longer is smaller than its predecessor.
	for k := len(ids) - 1; k > 0 && ids[k] < ids[k-1]; k-- {
		ids[k-1], ids[k] = ids[k], ids[k-1]
	}
}
// ExpandPostings drains p and returns all of its values as a slice, together
// with the error reported by p once it is exhausted.
func ExpandPostings(p Postings) (res []uint64, err error) {
	for more := p.Next(); more; more = p.Next() {
		res = append(res, p.At())
	}
	err = p.Err()
	return res, err
}
// Postings provides iterative access over a postings list.
// An iterator starts out positioned before its first value.
type Postings interface {
// Next advances the iterator and returns true if another value was found.
// After it returned true the value is available through At.
Next() bool
// Seek advances the iterator to value v or greater and returns
// true if a value was found.
// Several implementations in this file (e.g. ListPostings) return true
// without moving when the current value already is v or greater.
Seek(v uint64) bool
// At returns the value at the current iterator position.
At() uint64
// Err returns the last error of the iterator.
Err() error
}
// errPostings is an iterator that never yields a value and reports err from
// its Err method.
type errPostings struct {
	err error
}

func (errPostings) Next() bool       { return false }
func (errPostings) Seek(uint64) bool { return false }
func (errPostings) At() uint64       { return 0 }
func (p errPostings) Err() error     { return p.err }

// emptyPostings is the shared sentinel handed out by EmptyPostings; it is an
// errPostings without an error.
var emptyPostings = errPostings{}

// EmptyPostings returns a postings list that's always empty.
// NOTE: Returning EmptyPostings sentinel when index.Postings struct has no postings is recommended.
// It triggers optimized flow in other functions like Intersect, Without etc.
func EmptyPostings() Postings {
	var sentinel Postings = emptyPostings
	return sentinel
}

// ErrPostings returns new postings that immediately error.
func ErrPostings(err error) Postings {
	return errPostings{err: err}
}
// Intersect returns a new postings list over the intersection of the
// input postings. The inputs are split in halves recursively; as soon as one
// half is the EmptyPostings sentinel the sentinel itself is returned.
func Intersect(its ...Postings) Postings {
	switch len(its) {
	case 0:
		return EmptyPostings()
	case 1:
		return its[0]
	}

	mid := len(its) / 2
	left := Intersect(its[:mid]...)
	right := Intersect(its[mid:]...)

	if left == EmptyPostings() || right == EmptyPostings() {
		return EmptyPostings()
	}
	return newIntersectPostings(left, right)
}
// intersectPostings yields the values that are present in both a and b.
type intersectPostings struct {
	a, b Postings
	cur  uint64
}

func newIntersectPostings(a, b Postings) *intersectPostings {
	ip := new(intersectPostings)
	ip.a, ip.b = a, b
	return ip
}

// At returns the value found by the last successful Next or Seek.
func (it *intersectPostings) At() uint64 {
	return it.cur
}

// doNext leapfrogs b and a, starting from candidate id (the current value of
// a), until both agree on a value. It returns false once either input has no
// value left.
func (it *intersectPostings) doNext(id uint64) bool {
	for {
		if !it.b.Seek(id) {
			return false
		}
		bv := it.b.At()
		if bv == id {
			break
		}
		// b is ahead of a: move a up to b's value and compare again.
		if !it.a.Seek(bv) {
			return false
		}
		id = it.a.At()
		if id == bv {
			break
		}
	}
	it.cur = id
	return true
}

// Next advances a by one value and then looks for the next common value.
func (it *intersectPostings) Next() bool {
	if it.a.Next() {
		return it.doNext(it.a.At())
	}
	return false
}

// Seek moves a to id or greater and then looks for the next common value.
func (it *intersectPostings) Seek(id uint64) bool {
	if it.a.Seek(id) {
		return it.doNext(it.a.At())
	}
	return false
}

// Err returns the error of a if it has one and the error of b otherwise.
func (it *intersectPostings) Err() error {
	if it.a.Err() == nil {
		return it.b.Err()
	}
	return it.a.Err()
}
// Merge returns a new iterator over the union of the input iterators.
// A single input is returned as is; when no input has any value the
// EmptyPostings sentinel is returned.
func Merge(its ...Postings) Postings {
	switch len(its) {
	case 0:
		return EmptyPostings()
	case 1:
		return its[0]
	}

	merged, nonEmpty := newMergedPostings(its)
	if nonEmpty {
		return merged
	}
	return EmptyPostings()
}
// postingsHeap implements heap.Interface over Postings, ordered by the value
// each iterator is currently positioned at.
type postingsHeap []Postings

func (h postingsHeap) Len() int {
	return len(h)
}

func (h postingsHeap) Less(i, j int) bool {
	return h[i].At() < h[j].At()
}

func (h *postingsHeap) Swap(i, j int) {
	s := *h
	s[i], s[j] = s[j], s[i]
}

// Push appends x, which must be a Postings, to the end of the slice.
func (h *postingsHeap) Push(x interface{}) {
	*h = append(*h, x.(Postings))
}

// Pop removes the last element of the slice and returns it.
func (h *postingsHeap) Pop() interface{} {
	s := *h
	last := len(s) - 1
	item := s[last]
	*h = s[:last]
	return item
}
// mergedPostings iterates over the values of several Postings in ascending
// order, skipping values equal to the one it is currently positioned at.
type mergedPostings struct {
// h holds the inputs that still have values. It is only in heap order while
// heaped is true.
h postingsHeap
// initilized is set once Next has handed out the first value.
initilized bool
// heaped records whether h currently is a valid heap. Seek clears it and
// Next re-establishes the heap before using it.
heaped bool
// cur is the value returned by At.
cur uint64
// err is the error taken from an input; once set, Next and Seek return false.
err error
}
// newMergedPostings advances every input once and keeps those that produced a
// value. nonEmpty is false only when no input has a value and none reported an
// error; an input that fails yields an iterator carrying that error.
func newMergedPostings(p []Postings) (m *mergedPostings, nonEmpty bool) {
	live := make(postingsHeap, 0, len(p))

	for _, in := range p {
		// NOTE: mergedPostings struct requires the user to issue an initial Next.
		if in.Next() {
			live = append(live, in)
			continue
		}
		if in.Err() != nil {
			return &mergedPostings{err: in.Err()}, true
		}
	}

	if len(live) > 0 {
		return &mergedPostings{h: live}, true
	}
	return nil, false
}
// Next moves to the next value that differs from the current one. It returns
// false when no input has values left or an error was recorded.
func (it *mergedPostings) Next() bool {
	if it.err != nil || it.h.Len() == 0 {
		return false
	}

	// Seek leaves the inputs unordered; rebuild the heap lazily here.
	if !it.heaped {
		heap.Init(&it.h)
		it.heaped = true
	}

	// The user must issue an initial Next. The inputs were already advanced
	// once on construction, so the first call only publishes the smallest value.
	if !it.initilized {
		it.initilized = true
		it.cur = it.h[0].At()
		return true
	}

	for {
		top := it.h[0]
		if top.Next() {
			// Value of top of heap has changed, re-heapify.
			heap.Fix(&it.h, 0)
		} else {
			// The input is exhausted (or failed): drop it from the heap.
			heap.Pop(&it.h)
			if top.Err() != nil {
				it.err = top.Err()
				return false
			}
			if it.h.Len() == 0 {
				return false
			}
		}

		// Skip values equal to the current one so duplicates are reported once.
		if it.h[0].At() == it.cur {
			continue
		}
		it.cur = it.h[0].At()
		return true
	}
}
// Seek advances the iterator to the smallest value that is id or greater. It
// returns false when no such value exists or an error was recorded.
func (it *mergedPostings) Seek(id uint64) bool {
	if it.err != nil || it.h.Len() == 0 {
		return false
	}
	if !it.initilized && !it.Next() {
		return false
	}
	if it.cur >= id {
		return true
	}

	// Heapifying when there is lots of Seeks is inefficient,
	// mark to be re-heapified on the Next() call.
	it.heaped = false

	smallest := ^uint64(0)
	// Filter in place: inputs that still have a value >= id are compacted to
	// the front of the existing backing array, so nothing is allocated.
	kept := it.h[:0]
	for _, in := range it.h {
		if !in.Seek(id) {
			if in.Err() != nil {
				it.err = in.Err()
				return false
			}
			continue
		}
		kept = append(kept, in)
		if in.At() < smallest {
			smallest = in.At()
		}
	}
	it.h = kept

	if len(it.h) == 0 {
		return false
	}
	it.cur = smallest
	return true
}
// At returns the value the iterator is currently positioned at.
//
// It uses a pointer receiver like Next and Seek so that all methods of the
// type share one receiver kind and the struct is not copied on every call.
func (it *mergedPostings) At() uint64 {
	return it.cur
}

// Err returns the error recorded from one of the inputs, or nil if there was
// none. It uses a pointer receiver for the same reason as At.
func (it *mergedPostings) Err() error {
	return it.err
}
// Without returns a new postings list that contains all elements from the full list that
// are not in the drop list. The EmptyPostings sentinel short-circuits: an
// empty full list stays empty and an empty drop list returns full unchanged.
func Without(full, drop Postings) Postings {
	switch {
	case full == EmptyPostings():
		return EmptyPostings()
	case drop == EmptyPostings():
		return full
	default:
		return newRemovedPostings(full, drop)
	}
}
// removedPostings yields the values of full that do not occur in remove.
type removedPostings struct {
	full, remove Postings

	cur uint64

	// initialized is set once both inputs have been positioned; fok and rok
	// hold the result of the last Next/Seek call on full and remove.
	initialized bool
	fok, rok    bool
}

func newRemovedPostings(full, remove Postings) *removedPostings {
	rp := new(removedPostings)
	rp.full, rp.remove = full, remove
	return rp
}

// At returns the value found by the last successful Next or Seek.
func (rp *removedPostings) At() uint64 {
	return rp.cur
}

// Next moves to the next value of full that is not contained in remove.
func (rp *removedPostings) Next() bool {
	if !rp.initialized {
		rp.fok = rp.full.Next()
		rp.rok = rp.remove.Next()
		rp.initialized = true
	}

	for rp.fok {
		// Nothing left to remove: everything remaining in full passes.
		if !rp.rok {
			rp.cur = rp.full.At()
			rp.fok = rp.full.Next()
			return true
		}

		fv, rv := rp.full.At(), rp.remove.At()
		switch {
		case fv < rv:
			rp.cur = fv
			rp.fok = rp.full.Next()
			return true
		case rv < fv:
			// Forward the remove postings to the right position.
			rp.rok = rp.remove.Seek(fv)
		default:
			// Skip the current posting.
			rp.fok = rp.full.Next()
		}
	}
	return false
}

// Seek returns true right away when the current value already is id or
// greater. Otherwise it seeks both inputs to id and continues with Next.
func (rp *removedPostings) Seek(id uint64) bool {
	if rp.cur >= id {
		return true
	}

	rp.fok = rp.full.Seek(id)
	rp.rok = rp.remove.Seek(id)
	rp.initialized = true

	return rp.Next()
}

// Err returns the error of full if it has one and the error of remove otherwise.
func (rp *removedPostings) Err() error {
	if rp.full.Err() == nil {
		return rp.remove.Err()
	}
	return rp.full.Err()
}
// ListPostings implements the Postings interface over a plain list.
type ListPostings struct {
	list []uint64 // values not consumed yet
	cur  uint64   // value returned by At
}

// NewListPostings returns a Postings that iterates over list. The slice is
// not copied.
func NewListPostings(list []uint64) Postings {
	return newListPostings(list...)
}

func newListPostings(list ...uint64) *ListPostings {
	lp := new(ListPostings)
	lp.list = list
	return lp
}

// At returns the value set by the last Next or Seek call.
func (it *ListPostings) At() uint64 {
	return it.cur
}

// Next consumes the head of the list. Once the list is used up it resets the
// current value to 0 and returns false.
func (it *ListPostings) Next() bool {
	if len(it.list) == 0 {
		it.cur = 0
		return false
	}
	it.cur, it.list = it.list[0], it.list[1:]
	return true
}

// Seek moves to the first remaining value that is x or greater. It returns
// true without moving when the current value already satisfies that.
func (it *ListPostings) Seek(x uint64) bool {
	// If the current value satisfies, then return.
	if it.cur >= x {
		return true
	}
	rest := it.list
	if len(rest) == 0 {
		return false
	}

	// Do binary search between current position and end.
	idx := sort.Search(len(rest), func(k int) bool {
		return rest[k] >= x
	})
	if idx == len(rest) {
		it.list = nil
		return false
	}
	it.cur = rest[idx]
	it.list = rest[idx+1:]
	return true
}

// Err always returns nil.
func (it *ListPostings) Err() error {
	return nil
}
// bigEndianPostings implements the Postings interface over a byte stream of
// big endian numbers. Every value occupies 4 bytes.
type bigEndianPostings struct {
	list []byte // bytes not consumed yet
	cur  uint32 // value returned by At
}

func newBigEndianPostings(list []byte) *bigEndianPostings {
	return &bigEndianPostings{list: list}
}

// At returns the value set by the last successful Next or Seek call.
func (it *bigEndianPostings) At() uint64 {
	return uint64(it.cur)
}

// Next decodes the next 4-byte value. It returns false when fewer than 4
// bytes are left.
func (it *bigEndianPostings) Next() bool {
	if len(it.list) < 4 {
		return false
	}
	it.cur = binary.BigEndian.Uint32(it.list)
	it.list = it.list[4:]
	return true
}

// Seek moves to the first remaining value that is x or greater. It returns
// true without moving when the current value already satisfies that, and
// false when no remaining value does.
func (it *bigEndianPostings) Seek(x uint64) bool {
	if uint64(it.cur) >= x {
		return true
	}

	num := len(it.list) / 4
	// Do binary search between current position and end.
	i := sort.Search(num, func(i int) bool {
		// Widen the stored value instead of narrowing x: converting x to
		// uint32 would wrap targets above MaxUint32 and make them match
		// small values.
		return uint64(binary.BigEndian.Uint32(it.list[i*4:])) >= x
	})
	if i < num {
		j := i * 4
		it.cur = binary.BigEndian.Uint32(it.list[j:])
		it.list = it.list[j+4:]
		return true
	}
	it.list = nil
	return false
}

// Err always returns nil.
func (it *bigEndianPostings) Err() error {
	return nil
}