Making the number of CPU cores used for sorting postings lists editable (#12247)

Signed-off-by: Yuri Nikolic <durica.nikolic@grafana.com>
This commit is contained in:
Đurica Yuri Nikolić 2023-04-18 12:13:05 +02:00 committed by GitHub
parent bb217dded8
commit b028112331
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 13 additions and 7 deletions

View file

@ -574,7 +574,7 @@ const cardinalityCacheExpirationTime = time.Duration(30) * time.Second
func (h *Head) Init(minValidTime int64) error { func (h *Head) Init(minValidTime int64) error {
h.minValidTime.Store(minValidTime) h.minValidTime.Store(minValidTime)
defer func() { defer func() {
h.postings.EnsureOrder() h.postings.EnsureOrder(h.opts.WALReplayConcurrency)
}() }()
defer h.gc() // After loading the wal remove the obsolete data from the head. defer h.gc() // After loading the wal remove the obsolete data from the head.
defer func() { defer func() {

View file

@ -224,7 +224,10 @@ func (p *MemPostings) All() Postings {
// EnsureOrder ensures that all postings lists are sorted. After it returns all further // EnsureOrder ensures that all postings lists are sorted. After it returns all further
// calls to add and addFor will insert new IDs in a sorted manner. // calls to add and addFor will insert new IDs in a sorted manner.
func (p *MemPostings) EnsureOrder() { // Parameter numberOfConcurrentProcesses is used to specify the maximal number of
// CPU cores used for this operation. If it is <= 0, GOMAXPROCS is used.
// GOMAXPROCS was the default before introducing this parameter.
func (p *MemPostings) EnsureOrder(numberOfConcurrentProcesses int) {
p.mtx.Lock() p.mtx.Lock()
defer p.mtx.Unlock() defer p.mtx.Unlock()
@ -232,13 +235,16 @@ func (p *MemPostings) EnsureOrder() {
return return
} }
n := runtime.GOMAXPROCS(0) concurrency := numberOfConcurrentProcesses
if concurrency <= 0 {
concurrency = runtime.GOMAXPROCS(0)
}
workc := make(chan *[][]storage.SeriesRef) workc := make(chan *[][]storage.SeriesRef)
var wg sync.WaitGroup var wg sync.WaitGroup
wg.Add(n) wg.Add(concurrency)
for i := 0; i < n; i++ { for i := 0; i < concurrency; i++ {
go func() { go func() {
for job := range workc { for job := range workc {
for _, l := range *job { for _, l := range *job {

View file

@ -54,7 +54,7 @@ func TestMemPostings_ensureOrder(t *testing.T) {
p.m["a"][v] = l p.m["a"][v] = l
} }
p.EnsureOrder() p.EnsureOrder(0)
for _, e := range p.m { for _, e := range p.m {
for _, l := range e { for _, l := range e {
@ -114,7 +114,7 @@ func BenchmarkMemPostings_ensureOrder(b *testing.B) {
b.ResetTimer() b.ResetTimer()
for n := 0; n < b.N; n++ { for n := 0; n < b.N; n++ {
p.EnsureOrder() p.EnsureOrder(0)
p.ordered = false p.ordered = false
} }
}) })