From b02811233170bc40fd43fc8a65313f0b64ee6191 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C4=90urica=20Yuri=20Nikoli=C4=87?= Date: Tue, 18 Apr 2023 12:13:05 +0200 Subject: [PATCH] Making the number of CPU cores used for sorting postings lists editable (#12247) Signed-off-by: Yuri Nikolic --- tsdb/head.go | 2 +- tsdb/index/postings.go | 14 ++++++++++---- tsdb/index/postings_test.go | 4 ++-- 3 files changed, 13 insertions(+), 7 deletions(-) diff --git a/tsdb/head.go b/tsdb/head.go index 4696884f2..1d65d3229 100644 --- a/tsdb/head.go +++ b/tsdb/head.go @@ -574,7 +574,7 @@ const cardinalityCacheExpirationTime = time.Duration(30) * time.Second func (h *Head) Init(minValidTime int64) error { h.minValidTime.Store(minValidTime) defer func() { - h.postings.EnsureOrder() + h.postings.EnsureOrder(h.opts.WALReplayConcurrency) }() defer h.gc() // After loading the wal remove the obsolete data from the head. defer func() { diff --git a/tsdb/index/postings.go b/tsdb/index/postings.go index b55d70df0..15df374fc 100644 --- a/tsdb/index/postings.go +++ b/tsdb/index/postings.go @@ -224,7 +224,10 @@ func (p *MemPostings) All() Postings { // EnsureOrder ensures that all postings lists are sorted. After it returns all further // calls to add and addFor will insert new IDs in a sorted manner. -func (p *MemPostings) EnsureOrder() { +// Parameter numberOfConcurrentProcesses is used to specify the maximal number of +// CPU cores used for this operation. If it is <= 0, GOMAXPROCS is used. +// GOMAXPROCS was the default before introducing this parameter. +func (p *MemPostings) EnsureOrder(numberOfConcurrentProcesses int) { p.mtx.Lock() defer p.mtx.Unlock() @@ -232,13 +235,16 @@ func (p *MemPostings) EnsureOrder() { return } - n := runtime.GOMAXPROCS(0) + concurrency := numberOfConcurrentProcesses + if concurrency <= 0 { + concurrency = runtime.GOMAXPROCS(0) + } workc := make(chan *[][]storage.SeriesRef) var wg sync.WaitGroup - wg.Add(n) + wg.Add(concurrency) - for i := 0; i < n; i++ { + for i := 0; i < concurrency; i++ { go func() { for job := range workc { for _, l := range *job { diff --git a/tsdb/index/postings_test.go b/tsdb/index/postings_test.go index 1b1ecd3c3..a34f3c12d 100644 --- a/tsdb/index/postings_test.go +++ b/tsdb/index/postings_test.go @@ -54,7 +54,7 @@ func TestMemPostings_ensureOrder(t *testing.T) { p.m["a"][v] = l } - p.EnsureOrder() + p.EnsureOrder(0) for _, e := range p.m { for _, l := range e { @@ -114,7 +114,7 @@ func BenchmarkMemPostings_ensureOrder(b *testing.B) { b.ResetTimer() for n := 0; n < b.N; n++ { - p.EnsureOrder() + p.EnsureOrder(0) p.ordered = false } })