mirror of
https://github.com/prometheus/prometheus.git
synced 2025-02-02 08:31:11 -08:00
Making the number of CPU cores used for sorting postings lists editable (#12247)
Signed-off-by: Yuri Nikolic <durica.nikolic@grafana.com>
This commit is contained in:
parent
bb217dded8
commit
b028112331
|
@ -574,7 +574,7 @@ const cardinalityCacheExpirationTime = time.Duration(30) * time.Second
|
|||
func (h *Head) Init(minValidTime int64) error {
|
||||
h.minValidTime.Store(minValidTime)
|
||||
defer func() {
|
||||
h.postings.EnsureOrder()
|
||||
h.postings.EnsureOrder(h.opts.WALReplayConcurrency)
|
||||
}()
|
||||
defer h.gc() // After loading the wal remove the obsolete data from the head.
|
||||
defer func() {
|
||||
|
|
|
@ -224,7 +224,10 @@ func (p *MemPostings) All() Postings {
|
|||
|
||||
// EnsureOrder ensures that all postings lists are sorted. After it returns all further
|
||||
// calls to add and addFor will insert new IDs in a sorted manner.
|
||||
func (p *MemPostings) EnsureOrder() {
|
||||
// Parameter numberOfConcurrentProcesses is used to specify the maximal number of
|
||||
// CPU cores used for this operation. If it is <= 0, GOMAXPROCS is used.
|
||||
// GOMAXPROCS was the default before introducing this parameter.
|
||||
func (p *MemPostings) EnsureOrder(numberOfConcurrentProcesses int) {
|
||||
p.mtx.Lock()
|
||||
defer p.mtx.Unlock()
|
||||
|
||||
|
@ -232,13 +235,16 @@ func (p *MemPostings) EnsureOrder() {
|
|||
return
|
||||
}
|
||||
|
||||
n := runtime.GOMAXPROCS(0)
|
||||
concurrency := numberOfConcurrentProcesses
|
||||
if concurrency <= 0 {
|
||||
concurrency = runtime.GOMAXPROCS(0)
|
||||
}
|
||||
workc := make(chan *[][]storage.SeriesRef)
|
||||
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(n)
|
||||
wg.Add(concurrency)
|
||||
|
||||
for i := 0; i < n; i++ {
|
||||
for i := 0; i < concurrency; i++ {
|
||||
go func() {
|
||||
for job := range workc {
|
||||
for _, l := range *job {
|
||||
|
|
|
@ -54,7 +54,7 @@ func TestMemPostings_ensureOrder(t *testing.T) {
|
|||
p.m["a"][v] = l
|
||||
}
|
||||
|
||||
p.EnsureOrder()
|
||||
p.EnsureOrder(0)
|
||||
|
||||
for _, e := range p.m {
|
||||
for _, l := range e {
|
||||
|
@ -114,7 +114,7 @@ func BenchmarkMemPostings_ensureOrder(b *testing.B) {
|
|||
b.ResetTimer()
|
||||
|
||||
for n := 0; n < b.N; n++ {
|
||||
p.EnsureOrder()
|
||||
p.EnsureOrder(0)
|
||||
p.ordered = false
|
||||
}
|
||||
})
|
||||
|
|
Loading…
Reference in a new issue