Mirror of https://github.com/prometheus/prometheus.git (synced 2025-01-12 14:27:27 -08:00)
MemPostings: keep a map of label values slices (#15426)
Some checks failed
buf.build / lint and publish (push) Has been cancelled
CI / Go tests (push) Has been cancelled
CI / More Go tests (push) Has been cancelled
CI / Go tests with previous Go version (push) Has been cancelled
CI / UI tests (push) Has been cancelled
CI / Go tests on Windows (push) Has been cancelled
CI / Mixins tests (push) Has been cancelled
CI / Build Prometheus for common architectures (0) (push) Has been cancelled
CI / Build Prometheus for common architectures (1) (push) Has been cancelled
CI / Build Prometheus for common architectures (2) (push) Has been cancelled
CI / Build Prometheus for all architectures (0) (push) Has been cancelled
CI / Build Prometheus for all architectures (1) (push) Has been cancelled
CI / Build Prometheus for all architectures (10) (push) Has been cancelled
CI / Build Prometheus for all architectures (11) (push) Has been cancelled
CI / Build Prometheus for all architectures (2) (push) Has been cancelled
CI / Build Prometheus for all architectures (3) (push) Has been cancelled
CI / Build Prometheus for all architectures (4) (push) Has been cancelled
CI / Build Prometheus for all architectures (5) (push) Has been cancelled
CI / Build Prometheus for all architectures (6) (push) Has been cancelled
CI / Build Prometheus for all architectures (7) (push) Has been cancelled
CI / Build Prometheus for all architectures (8) (push) Has been cancelled
CI / Build Prometheus for all architectures (9) (push) Has been cancelled
CI / Check generated parser (push) Has been cancelled
CI / golangci-lint (push) Has been cancelled
CI / fuzzing (push) Has been cancelled
CI / codeql (push) Has been cancelled
Scorecards supply-chain security / Scorecards analysis (push) Has been cancelled
CI / Report status of build Prometheus for all architectures (push) Has been cancelled
CI / Publish main branch artifacts (push) Has been cancelled
CI / Publish release artefacts (push) Has been cancelled
CI / Publish UI on npm Registry (push) Has been cancelled
While investigating lock contention on `MemPostings`, we saw that a lot of the locking happens in `LabelValues` and `PostingsForLabelMatching`, both of which copy the label values slices while holding the mutex.

This adds an extra map that holds an append-only label values slice for each of the label names. Since the slice is append-only, it can be copied without holding the mutex.

Signed-off-by: Oleg Zaytsev <mail@olegzaytsev.com>
This commit is contained in:
parent ac92cf256e
commit cd1f8ac129
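Before the diff itself, a minimal, self-contained Go sketch of the locking pattern the commit message describes may be useful: the writer only ever appends to the per-name slice, so a reader can take the slice header under the read lock and clone it after releasing the lock. The names below (labelValuesIndex, add, labelValues) are invented for illustration; this is not the MemPostings API, just the idea it relies on.

package main

import (
	"fmt"
	"slices"
	"sync"
)

// labelValuesIndex is a toy structure: values[name] is append-only, so a
// reader can take the slice header under the read lock and keep using
// (or cloning) it after the lock is released.
type labelValuesIndex struct {
	mtx    sync.RWMutex
	values map[string][]string
}

func (ix *labelValuesIndex) add(name, value string) {
	ix.mtx.Lock()
	defer ix.mtx.Unlock()
	// Append-only: existing elements are never reordered, rewritten or removed.
	ix.values[name] = append(ix.values[name], value)
}

func (ix *labelValuesIndex) labelValues(name string) []string {
	ix.mtx.RLock()
	vals := ix.values[name] // cheap: copies only the slice header
	ix.mtx.RUnlock()

	// Cloning happens outside the critical section. The writer only appends
	// past the length this reader observed, so the elements being cloned are
	// never written concurrently, and the caller gets a private copy it can
	// sort or filter.
	return slices.Clone(vals)
}

func main() {
	ix := &labelValuesIndex{values: map[string][]string{}}
	ix.add("job", "api")
	ix.add("job", "db")
	fmt.Println(ix.labelValues("job")) // [api db]
}

The actual change to MemPostings follows in the diff below.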
tsdb/index/postings.go
@@ -18,6 +18,7 @@ import (
 	"context"
 	"encoding/binary"
 	"fmt"
+	"maps"
 	"math"
 	"runtime"
 	"slices"
@@ -32,6 +33,8 @@ import (
 	"github.com/prometheus/prometheus/storage"
 )
 
+const exponentialSliceGrowthFactor = 2
+
 var allPostingsKey = labels.Label{}
 
 // AllPostingsKey returns the label key that is used to store the postings list of all existing IDs.
@@ -56,14 +59,32 @@ var ensureOrderBatchPool = sync.Pool{
 // unordered batch fills on startup.
 type MemPostings struct {
 	mtx sync.RWMutex
+
+	// m holds the postings lists for each label-value pair, indexed first by label name, and then by label value.
+	//
+	// mtx must be held when interacting with m (the appropriate one for reading or writing).
+	// It is safe to retain a reference to a postings list after releasing the lock.
+	//
+	// BUG: There's currently a data race in addFor, which might modify the tail of the postings list:
+	// https://github.com/prometheus/prometheus/issues/15317
 	m map[string]map[string][]storage.SeriesRef
+
+	// lvs holds the label values for each label name.
+	// lvs[name] is essentially an unsorted append-only list of all keys in m[name]
+	// mtx must be held when interacting with lvs.
+	// Since it's append-only, it is safe to read the label values slice after releasing the lock.
+	lvs map[string][]string
+
 	ordered bool
 }
 
+const defaultLabelNamesMapSize = 512
+
 // NewMemPostings returns a memPostings that's ready for reads and writes.
 func NewMemPostings() *MemPostings {
 	return &MemPostings{
-		m:       make(map[string]map[string][]storage.SeriesRef, 512),
+		m:       make(map[string]map[string][]storage.SeriesRef, defaultLabelNamesMapSize),
+		lvs:     make(map[string][]string, defaultLabelNamesMapSize),
 		ordered: true,
 	}
 }
@@ -72,7 +93,8 @@ func NewMemPostings() *MemPostings {
 // until EnsureOrder() was called once.
 func NewUnorderedMemPostings() *MemPostings {
 	return &MemPostings{
-		m:       make(map[string]map[string][]storage.SeriesRef, 512),
+		m:       make(map[string]map[string][]storage.SeriesRef, defaultLabelNamesMapSize),
+		lvs:     make(map[string][]string, defaultLabelNamesMapSize),
 		ordered: false,
 	}
 }
@@ -80,16 +102,19 @@ func NewUnorderedMemPostings() *MemPostings {
 // Symbols returns an iterator over all unique name and value strings, in order.
 func (p *MemPostings) Symbols() StringIter {
 	p.mtx.RLock()
+	// Make a quick clone of the map to avoid holding the lock while iterating.
+	// It's safe to use the values of the map after releasing the lock, as they're append-only slices.
+	lvs := maps.Clone(p.lvs)
+	p.mtx.RUnlock()
 
 	// Add all the strings to a map to de-duplicate.
-	symbols := make(map[string]struct{}, 512)
-	for n, e := range p.m {
+	symbols := make(map[string]struct{}, defaultLabelNamesMapSize)
+	for n, labelValues := range lvs {
 		symbols[n] = struct{}{}
-		for v := range e {
+		for _, v := range labelValues {
 			symbols[v] = struct{}{}
 		}
 	}
-	p.mtx.RUnlock()
 
 	res := make([]string, 0, len(symbols))
 	for k := range symbols {
@@ -145,13 +170,14 @@ func (p *MemPostings) LabelNames() []string {
 // LabelValues returns label values for the given name.
 func (p *MemPostings) LabelValues(_ context.Context, name string) []string {
 	p.mtx.RLock()
-	defer p.mtx.RUnlock()
-
-	values := make([]string, 0, len(p.m[name]))
-	for v := range p.m[name] {
-		values = append(values, v)
-	}
-	return values
+	values := p.lvs[name]
+	p.mtx.RUnlock()
+
+	// The slice from p.lvs[name] is shared between all readers, and it is append-only.
+	// Since it's shared, we need to make a copy of it before returning it to make
+	// sure that no caller modifies the original one by sorting it or filtering it.
+	// Since it's append-only, we can do this while not holding the mutex anymore.
+	return slices.Clone(values)
 }
 
 // PostingsStats contains cardinality based statistics for postings.
@@ -294,6 +320,7 @@ func (p *MemPostings) Delete(deleted map[storage.SeriesRef]struct{}, affected ma
 	p.mtx.Lock()
 	defer p.mtx.Unlock()
 
+	affectedLabelNames := map[string]struct{}{}
 	process := func(l labels.Label) {
 		orig := p.m[l.Name][l.Value]
 		repl := make([]storage.SeriesRef, 0, len(orig))
@@ -306,10 +333,7 @@ func (p *MemPostings) Delete(deleted map[storage.SeriesRef]struct{}, affected ma
 			p.m[l.Name][l.Value] = repl
 		} else {
 			delete(p.m[l.Name], l.Value)
-			// Delete the key if we removed all values.
-			if len(p.m[l.Name]) == 0 {
-				delete(p.m, l.Name)
-			}
+			affectedLabelNames[l.Name] = struct{}{}
 		}
 	}
 
@@ -323,6 +347,40 @@ func (p *MemPostings) Delete(deleted map[storage.SeriesRef]struct{}, affected ma
 		// Note that a read query will most likely want to read multiple postings lists, say 5, 10 or 20 (depending on the number of matchers)
 		// And that read query will most likely evaluate only one of those matchers before we unpause here, so we want to pause often.
 		if i%512 == 0 {
+			p.unlockWaitAndLockAgain()
+		}
+	}
+	process(allPostingsKey)
+
+	// Now we need to update the label values slices.
+	i = 0
+	for name := range affectedLabelNames {
+		i++
+		// From time to time we want some readers to go through and read their postings.
+		if i%512 == 0 {
+			p.unlockWaitAndLockAgain()
+		}
+
+		if len(p.m[name]) == 0 {
+			// Delete the label name key if we deleted all values.
+			delete(p.m, name)
+			delete(p.lvs, name)
+			continue
+		}
+
+		// Create the new slice with enough room to grow without reallocating.
+		// We have deleted values here, so there's definitely some churn, so be prepared for it.
+		lvs := make([]string, 0, exponentialSliceGrowthFactor*len(p.m[name]))
+		for v := range p.m[name] {
+			lvs = append(lvs, v)
+		}
+		p.lvs[name] = lvs
+	}
+}
+
+// unlockWaitAndLockAgain will unlock an already locked p.mtx.Lock() and then wait a little bit before locking it again,
+// letting the RLock()-waiting goroutines to get the lock.
+func (p *MemPostings) unlockWaitAndLockAgain() {
 	p.mtx.Unlock()
 	// While it's tempting to just do a `time.Sleep(time.Millisecond)` here,
 	// it wouldn't ensure use that readers actually were able to get the read lock,
@@ -333,12 +391,8 @@ func (p *MemPostings) Delete(deleted map[storage.SeriesRef]struct{}, affected ma
 	// Note that if there's a writer waiting for us to unlock, no reader will be able to get the read lock.
 	p.mtx.RUnlock() //nolint:staticcheck // SA2001: this is an intentionally empty critical section.
 	// Now we can wait a little bit just to increase the chance of a reader getting the lock.
-	// If we were deleting 100M series here, pausing every 512 with 1ms sleeps would be an extra of 200s, which is negligible.
 	time.Sleep(time.Millisecond)
 	p.mtx.Lock()
-		}
-	}
-	process(allPostingsKey)
 }
 
 // Iter calls f for each postings list. It aborts if f returns an error and returns it.
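The unlockWaitAndLockAgain helper extracted above is a general pattern for long write-locked operations: periodically drop the write lock, take and release the read lock so queued readers demonstrably get a turn, sleep briefly, then re-acquire the write lock. Below is a standalone, illustrative sketch of that pattern; the name pauseForReaders and the loop in main are invented, and the comments paraphrase the reasoning in the diff's own comments.

package main

import (
	"sync"
	"time"
)

// pauseForReaders briefly steps aside so readers queued on the RWMutex can run
// while a long operation holds the write lock.
func pauseForReaders(mtx *sync.RWMutex) {
	mtx.Unlock()
	// A bare time.Sleep here wouldn't guarantee that readers actually got the
	// read lock: if another writer is queued, readers stay blocked. Taking one
	// RLock ourselves ensures read access was possible at this point.
	mtx.RLock()
	mtx.RUnlock() // intentionally empty critical section
	// A short sleep further increases the chance that readers finish their work.
	time.Sleep(time.Millisecond)
	mtx.Lock()
}

func main() {
	var mtx sync.RWMutex
	mtx.Lock()
	for i := 1; i <= 2048; i++ {
		// ... a small chunk of write-locked work would go here ...
		if i%512 == 0 {
			pauseForReaders(&mtx)
		}
	}
	mtx.Unlock()
}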
@@ -370,7 +424,7 @@ func (p *MemPostings) Add(id storage.SeriesRef, lset labels.Labels) {
 
 func appendWithExponentialGrowth[T any](a []T, v T) []T {
 	if cap(a) < len(a)+1 {
-		newList := make([]T, len(a), len(a)*2+1)
+		newList := make([]T, len(a), len(a)*exponentialSliceGrowthFactor+1)
 		copy(newList, a)
 		a = newList
 	}
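For context, here is a self-contained version of appendWithExponentialGrowth with a tiny driver that prints how capacity grows. The hunk above ends before the function's return statement, so the trailing return append(a, v) is an assumption, and the growth factor is written as a literal 2 instead of the package constant.

package main

import "fmt"

func appendWithExponentialGrowth[T any](a []T, v T) []T {
	if cap(a) < len(a)+1 {
		// Grow the capacity by a factor of 2 (plus one to get off zero),
		// keeping the number of reallocations logarithmic in the number of appends.
		newList := make([]T, len(a), len(a)*2+1)
		copy(newList, a)
		a = newList
	}
	return append(a, v) // assumed: the hunk above cuts off before this line
}

func main() {
	var a []int
	for i := 0; i < 10; i++ {
		a = appendWithExponentialGrowth(a, i)
		fmt.Println(len(a), cap(a)) // capacities: 1, 3, 3, 7, 7, 7, 7, 15, 15, 15
	}
}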
@@ -383,7 +437,11 @@ func (p *MemPostings) addFor(id storage.SeriesRef, l labels.Label) {
 		nm = map[string][]storage.SeriesRef{}
 		p.m[l.Name] = nm
 	}
-	list := appendWithExponentialGrowth(nm[l.Value], id)
+	vm, ok := nm[l.Value]
+	if !ok {
+		p.lvs[l.Name] = appendWithExponentialGrowth(p.lvs[l.Name], l.Value)
+	}
+	list := appendWithExponentialGrowth(vm, id)
 	nm[l.Value] = list
 
 	if !p.ordered {
@@ -402,25 +460,27 @@ func (p *MemPostings) addFor(id storage.SeriesRef, l labels.Label) {
 }
 
 func (p *MemPostings) PostingsForLabelMatching(ctx context.Context, name string, match func(string) bool) Postings {
-	// We'll copy the values into a slice and then match over that,
+	// We'll take the label values slice and then match over that,
 	// this way we don't need to hold the mutex while we're matching,
 	// which can be slow (seconds) if the match function is a huge regex.
 	// Holding this lock prevents new series from being added (slows down the write path)
 	// and blocks the compaction process.
-	vals := p.labelValues(name)
-	for i, count := 0, 1; i < len(vals); count++ {
-		if count%checkContextEveryNIterations == 0 && ctx.Err() != nil {
+	//
+	// We just need to make sure we don't modify the slice we took,
+	// so we'll append matching values to a different one.
+	p.mtx.RLock()
+	readOnlyLabelValues := p.lvs[name]
+	p.mtx.RUnlock()
+
+	vals := make([]string, 0, len(readOnlyLabelValues))
+	for i, v := range readOnlyLabelValues {
+		if i%checkContextEveryNIterations == 0 && ctx.Err() != nil {
 			return ErrPostings(ctx.Err())
 		}
 
-		if match(vals[i]) {
-			i++
-			continue
+		if match(v) {
+			vals = append(vals, v)
 		}
-
-		// Didn't match, bring the last value to this position, make the slice shorter and check again.
-		// The order of the slice doesn't matter as it comes from a map iteration.
-		vals[i], vals = vals[len(vals)-1], vals[:len(vals)-1]
 	}
 
 	// If none matched (or this label had no values), no need to grab the lock again.
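The rewritten loop checks ctx.Err() only once every checkContextEveryNIterations values (a constant defined elsewhere in the package, not shown in this diff), so a slow match function can still be cancelled without paying for a context check on every iteration. A minimal standalone sketch of that pattern, with an illustrative constant value of 128 and invented function names:

package main

import (
	"context"
	"errors"
	"fmt"
)

const checkEveryN = 128 // illustrative value; not the package's constant

// filterValues runs a possibly expensive predicate over values, checking for
// cancellation every checkEveryN iterations so huge inputs can be aborted promptly.
func filterValues(ctx context.Context, values []string, match func(string) bool) ([]string, error) {
	out := make([]string, 0, len(values))
	for i, v := range values {
		if i%checkEveryN == 0 && ctx.Err() != nil {
			return nil, ctx.Err()
		}
		if match(v) {
			out = append(out, v)
		}
	}
	return out, nil
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	cancel() // cancel up front to show the early return
	_, err := filterValues(ctx, make([]string, 1000), func(string) bool { return true })
	fmt.Println(errors.Is(err, context.Canceled)) // true
}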
@@ -469,27 +529,6 @@ func (p *MemPostings) PostingsForAllLabelValues(ctx context.Context, name string
 	return Merge(ctx, its...)
 }
 
-// labelValues returns a slice of label values for the given label name.
-// It will take the read lock.
-func (p *MemPostings) labelValues(name string) []string {
-	p.mtx.RLock()
-	defer p.mtx.RUnlock()
-
-	e := p.m[name]
-	if len(e) == 0 {
-		return nil
-	}
-
-	vals := make([]string, 0, len(e))
-	for v, srs := range e {
-		if len(srs) > 0 {
-			vals = append(vals, v)
-		}
-	}
-
-	return vals
-}
-
 // ExpandPostings returns the postings expanded as a slice.
 func ExpandPostings(p Postings) (res []storage.SeriesRef, err error) {
 	for p.Next() {