mirror of
https://github.com/prometheus/prometheus.git
synced 2024-12-26 06:04:05 -08:00
Optimized MemPostings.EnsureOrder() (#9673)
* Optimizes MemPostings.EnsureOrder() Signed-off-by: Marco Pracucci <marco@pracucci.com> * Ignore linter warning Signed-off-by: Marco Pracucci <marco@pracucci.com>
This commit is contained in:
parent
4bdaea7663
commit
309b094b92
|
@ -30,9 +30,19 @@ func AllPostingsKey() (name, value string) {
|
||||||
return allPostingsKey.Name, allPostingsKey.Value
|
return allPostingsKey.Name, allPostingsKey.Value
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ensureOrderBatchSize is the max number of postings passed to a worker in a single batch in MemPostings.EnsureOrder().
|
||||||
|
const ensureOrderBatchSize = 1024
|
||||||
|
|
||||||
|
// ensureOrderBatchPool is a pool used to recycle batches passed to workers in MemPostings.EnsureOrder().
|
||||||
|
var ensureOrderBatchPool = sync.Pool{
|
||||||
|
New: func() interface{} {
|
||||||
|
return make([][]uint64, 0, ensureOrderBatchSize)
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
// MemPostings holds postings list for series ID per label pair. They may be written
|
// MemPostings holds postings list for series ID per label pair. They may be written
|
||||||
// to out of order.
|
// to out of order.
|
||||||
// ensureOrder() must be called once before any reads are done. This allows for quick
|
// EnsureOrder() must be called once before any reads are done. This allows for quick
|
||||||
// unordered batch fills on startup.
|
// unordered batch fills on startup.
|
||||||
type MemPostings struct {
|
type MemPostings struct {
|
||||||
mtx sync.RWMutex
|
mtx sync.RWMutex
|
||||||
|
@ -49,7 +59,7 @@ func NewMemPostings() *MemPostings {
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewUnorderedMemPostings returns a memPostings that is not safe to be read from
|
// NewUnorderedMemPostings returns a memPostings that is not safe to be read from
|
||||||
// until ensureOrder was called once.
|
// until EnsureOrder() was called once.
|
||||||
func NewUnorderedMemPostings() *MemPostings {
|
func NewUnorderedMemPostings() *MemPostings {
|
||||||
return &MemPostings{
|
return &MemPostings{
|
||||||
m: make(map[string]map[string][]uint64, 512),
|
m: make(map[string]map[string][]uint64, 512),
|
||||||
|
@ -218,25 +228,42 @@ func (p *MemPostings) EnsureOrder() {
|
||||||
}
|
}
|
||||||
|
|
||||||
n := runtime.GOMAXPROCS(0)
|
n := runtime.GOMAXPROCS(0)
|
||||||
workc := make(chan []uint64)
|
workc := make(chan [][]uint64)
|
||||||
|
|
||||||
var wg sync.WaitGroup
|
var wg sync.WaitGroup
|
||||||
wg.Add(n)
|
wg.Add(n)
|
||||||
|
|
||||||
for i := 0; i < n; i++ {
|
for i := 0; i < n; i++ {
|
||||||
go func() {
|
go func() {
|
||||||
for l := range workc {
|
for job := range workc {
|
||||||
sort.Slice(l, func(a, b int) bool { return l[a] < l[b] })
|
for _, l := range job {
|
||||||
|
sort.Sort(uint64Slice(l))
|
||||||
|
}
|
||||||
|
|
||||||
|
job = job[:0]
|
||||||
|
ensureOrderBatchPool.Put(job) //nolint:staticcheck // Ignore SA6002 safe to ignore and actually fixing it has some performance penalty.
|
||||||
}
|
}
|
||||||
wg.Done()
|
wg.Done()
|
||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
nextJob := ensureOrderBatchPool.Get().([][]uint64)
|
||||||
for _, e := range p.m {
|
for _, e := range p.m {
|
||||||
for _, l := range e {
|
for _, l := range e {
|
||||||
workc <- l
|
nextJob = append(nextJob, l)
|
||||||
|
|
||||||
|
if len(nextJob) >= ensureOrderBatchSize {
|
||||||
|
workc <- nextJob
|
||||||
|
nextJob = ensureOrderBatchPool.Get().([][]uint64)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// If the last job was partially filled, we need to push it to workers too.
|
||||||
|
if len(nextJob) > 0 {
|
||||||
|
workc <- nextJob
|
||||||
|
}
|
||||||
|
|
||||||
close(workc)
|
close(workc)
|
||||||
wg.Wait()
|
wg.Wait()
|
||||||
|
|
||||||
|
@ -796,3 +823,10 @@ func (it *bigEndianPostings) Seek(x uint64) bool {
|
||||||
func (it *bigEndianPostings) Err() error {
|
func (it *bigEndianPostings) Err() error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// uint64Slice attaches the methods of sort.Interface to []uint64, sorting in increasing order.
|
||||||
|
type uint64Slice []uint64
|
||||||
|
|
||||||
|
func (x uint64Slice) Len() int { return len(x) }
|
||||||
|
func (x uint64Slice) Less(i, j int) bool { return x[i] < x[j] }
|
||||||
|
func (x uint64Slice) Swap(i, j int) { x[i], x[j] = x[j], x[i] }
|
||||||
|
|
|
@ -18,6 +18,7 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"math/rand"
|
"math/rand"
|
||||||
"sort"
|
"sort"
|
||||||
|
"strconv"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
|
@ -63,6 +64,59 @@ func TestMemPostings_ensureOrder(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func BenchmarkMemPostings_ensureOrder(b *testing.B) {
|
||||||
|
tests := map[string]struct {
|
||||||
|
numLabels int
|
||||||
|
numValuesPerLabel int
|
||||||
|
numRefsPerValue int
|
||||||
|
}{
|
||||||
|
"many values per label": {
|
||||||
|
numLabels: 100,
|
||||||
|
numValuesPerLabel: 10000,
|
||||||
|
numRefsPerValue: 100,
|
||||||
|
},
|
||||||
|
"few values per label": {
|
||||||
|
numLabels: 1000000,
|
||||||
|
numValuesPerLabel: 1,
|
||||||
|
numRefsPerValue: 100,
|
||||||
|
},
|
||||||
|
"few refs per label value": {
|
||||||
|
numLabels: 1000,
|
||||||
|
numValuesPerLabel: 1000,
|
||||||
|
numRefsPerValue: 10,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for testName, testData := range tests {
|
||||||
|
b.Run(testName, func(b *testing.B) {
|
||||||
|
p := NewUnorderedMemPostings()
|
||||||
|
|
||||||
|
// Generate postings.
|
||||||
|
for l := 0; l < testData.numLabels; l++ {
|
||||||
|
labelName := strconv.Itoa(l)
|
||||||
|
p.m[labelName] = map[string][]uint64{}
|
||||||
|
|
||||||
|
for v := 0; v < testData.numValuesPerLabel; v++ {
|
||||||
|
refs := make([]uint64, testData.numRefsPerValue)
|
||||||
|
for j := range refs {
|
||||||
|
refs[j] = rand.Uint64()
|
||||||
|
}
|
||||||
|
|
||||||
|
labelValue := strconv.Itoa(v)
|
||||||
|
p.m[labelName][labelValue] = refs
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
b.ResetTimer()
|
||||||
|
|
||||||
|
for n := 0; n < b.N; n++ {
|
||||||
|
p.EnsureOrder()
|
||||||
|
p.ordered = false
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestIntersect(t *testing.T) {
|
func TestIntersect(t *testing.T) {
|
||||||
a := newListPostings(1, 2, 3)
|
a := newListPostings(1, 2, 3)
|
||||||
b := newListPostings(2, 3, 4)
|
b := newListPostings(2, 3, 4)
|
||||||
|
|
Loading…
Reference in a new issue