mirror of
https://github.com/prometheus/prometheus.git
synced 2025-02-02 08:31:11 -08:00
Load postings in batch on startup
This allows to insert IDs to postings out of order until a trigger function is called. This avoids the insertion sort we usually do which can be very costly since WAL entries are more out of order than regular adds.
This commit is contained in:
parent
4a7c39d9d8
commit
cd2e26b7fc
4
head.go
4
head.go
|
@ -178,7 +178,7 @@ func NewHead(r prometheus.Registerer, l log.Logger, wal WAL, chunkRange int64) (
|
|||
series: newStripeSeries(),
|
||||
values: map[string]stringset{},
|
||||
symbols: map[string]struct{}{},
|
||||
postings: newMemPostings(),
|
||||
postings: newUnorderedMemPostings(),
|
||||
tombstones: newEmptyTombstoneReader(),
|
||||
}
|
||||
h.metrics = newHeadMetrics(h, r)
|
||||
|
@ -188,6 +188,8 @@ func NewHead(r prometheus.Registerer, l log.Logger, wal WAL, chunkRange int64) (
|
|||
|
||||
// ReadWAL initializes the head by consuming the write ahead log.
|
||||
func (h *Head) ReadWAL() error {
|
||||
defer h.postings.ensureOrder()
|
||||
|
||||
r := h.wal.Reader()
|
||||
mint := h.MinTime()
|
||||
|
||||
|
|
60
postings.go
60
postings.go
|
@ -15,6 +15,7 @@ package tsdb
|
|||
|
||||
import (
|
||||
"encoding/binary"
|
||||
"runtime"
|
||||
"sort"
|
||||
"strings"
|
||||
"sync"
|
||||
|
@ -22,14 +23,30 @@ import (
|
|||
"github.com/prometheus/tsdb/labels"
|
||||
)
|
||||
|
||||
// memPostings holds postings list for series ID per label pair. They may be written
|
||||
// to out of order.
|
||||
// ensureOrder() must be called once before any reads are done. This allows for quick
|
||||
// unordered batch fills on startup.
|
||||
type memPostings struct {
|
||||
mtx sync.RWMutex
|
||||
m map[labels.Label][]uint64
|
||||
mtx sync.RWMutex
|
||||
m map[labels.Label][]uint64
|
||||
ordered bool
|
||||
}
|
||||
|
||||
// newMemPoistings returns a memPostings that's ready for reads and writes.
|
||||
func newMemPostings() *memPostings {
|
||||
return &memPostings{
|
||||
m: make(map[labels.Label][]uint64, 512),
|
||||
m: make(map[labels.Label][]uint64, 512),
|
||||
ordered: true,
|
||||
}
|
||||
}
|
||||
|
||||
// newUnorderedMemPostings returns a memPostings that is not safe to be read from
|
||||
// until ensureOrder was called once.
|
||||
func newUnorderedMemPostings() *memPostings {
|
||||
return &memPostings{
|
||||
m: make(map[labels.Label][]uint64, 512),
|
||||
ordered: false,
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -47,6 +64,40 @@ func (p *memPostings) get(name, value string) Postings {
|
|||
|
||||
var allPostingsKey = labels.Label{}
|
||||
|
||||
// ensurePostings ensures that all postings lists are sorted. After it returns all further
|
||||
// calls to add and addFor will insert new IDs in a sorted manner.
|
||||
func (p *memPostings) ensureOrder() {
|
||||
p.mtx.Lock()
|
||||
defer p.mtx.Unlock()
|
||||
|
||||
if p.ordered {
|
||||
return
|
||||
}
|
||||
|
||||
n := runtime.GOMAXPROCS(0)
|
||||
workc := make(chan []uint64)
|
||||
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(n)
|
||||
|
||||
for i := 0; i < n; i++ {
|
||||
go func() {
|
||||
for l := range workc {
|
||||
sort.Slice(l, func(i, j int) bool { return l[i] < l[j] })
|
||||
}
|
||||
wg.Done()
|
||||
}()
|
||||
}
|
||||
|
||||
for _, l := range p.m {
|
||||
workc <- l
|
||||
}
|
||||
close(workc)
|
||||
wg.Wait()
|
||||
|
||||
p.ordered = true
|
||||
}
|
||||
|
||||
// add adds a document to the index. The caller has to ensure that no
|
||||
// term argument appears twice.
|
||||
func (p *memPostings) add(id uint64, lset labels.Labels) {
|
||||
|
@ -64,6 +115,9 @@ func (p *memPostings) addFor(id uint64, l labels.Label) {
|
|||
list := append(p.m[l], id)
|
||||
p.m[l] = list
|
||||
|
||||
if !p.ordered {
|
||||
return
|
||||
}
|
||||
// There is no guarantee that no higher ID was inserted before as they may
|
||||
// be generated independently before adding them to postings.
|
||||
// We repair order violations on insert. The invariant is that the first n-1
|
||||
|
|
|
@ -15,9 +15,12 @@ package tsdb
|
|||
|
||||
import (
|
||||
"encoding/binary"
|
||||
"fmt"
|
||||
"math/rand"
|
||||
"sort"
|
||||
"testing"
|
||||
|
||||
"github.com/prometheus/tsdb/labels"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
|
@ -30,6 +33,31 @@ func TestMemPostings_addFor(t *testing.T) {
|
|||
require.Equal(t, []uint64{1, 2, 3, 4, 5, 6, 7, 8}, p.m[allPostingsKey])
|
||||
}
|
||||
|
||||
func TestMemPostings_ensureOrder(t *testing.T) {
|
||||
p := newUnorderedMemPostings()
|
||||
|
||||
for i := 0; i < 100; i++ {
|
||||
l := make([]uint64, 100)
|
||||
for j := range l {
|
||||
l[j] = rand.Uint64()
|
||||
}
|
||||
v := fmt.Sprintf("%d", i)
|
||||
|
||||
p.m[labels.Label{"a", v}] = l
|
||||
}
|
||||
|
||||
p.ensureOrder()
|
||||
|
||||
for _, l := range p.m {
|
||||
ok := sort.SliceIsSorted(l, func(i, j int) bool {
|
||||
return l[i] < l[j]
|
||||
})
|
||||
if !ok {
|
||||
t.Fatalf("postings list %v is not sorted", l)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
type mockPostings struct {
|
||||
next func() bool
|
||||
seek func(uint64) bool
|
||||
|
|
Loading…
Reference in a new issue