diff --git a/tsdb/index/postings.go b/tsdb/index/postings.go index a0e7d035ad..ea32ba5632 100644 --- a/tsdb/index/postings.go +++ b/tsdb/index/postings.go @@ -18,6 +18,7 @@ import ( "context" "encoding/binary" "fmt" + "maps" "math" "runtime" "slices" @@ -32,6 +33,8 @@ import ( "github.com/prometheus/prometheus/storage" ) +const exponentialSliceGrowthFactor = 2 + var allPostingsKey = labels.Label{} // AllPostingsKey returns the label key that is used to store the postings list of all existing IDs. @@ -55,15 +58,33 @@ var ensureOrderBatchPool = sync.Pool{ // EnsureOrder() must be called once before any reads are done. This allows for quick // unordered batch fills on startup. type MemPostings struct { - mtx sync.RWMutex - m map[string]map[string][]storage.SeriesRef + mtx sync.RWMutex + + // m holds the postings lists for each label-value pair, indexed first by label name, and then by label value. + // + // mtx must be held when interacting with m (the appropriate one for reading or writing). + // It is safe to retain a reference to a postings list after releasing the lock. + // + // BUG: There's currently a data race in addFor, which might modify the tail of the postings list: + // https://github.com/prometheus/prometheus/issues/15317 + m map[string]map[string][]storage.SeriesRef + + // lvs holds the label values for each label name. + // lvs[name] is essentially an unsorted append-only list of all keys in m[name] + // mtx must be held when interacting with lvs. + // Since it's append-only, it is safe to the label values slice after releasing the lock. + lvs map[string][]string + ordered bool } +const defaultLabelNamesMapSize = 512 + // NewMemPostings returns a memPostings that's ready for reads and writes. func NewMemPostings() *MemPostings { return &MemPostings{ - m: make(map[string]map[string][]storage.SeriesRef, 512), + m: make(map[string]map[string][]storage.SeriesRef, defaultLabelNamesMapSize), + lvs: make(map[string][]string, defaultLabelNamesMapSize), ordered: true, } } @@ -72,7 +93,8 @@ func NewMemPostings() *MemPostings { // until EnsureOrder() was called once. func NewUnorderedMemPostings() *MemPostings { return &MemPostings{ - m: make(map[string]map[string][]storage.SeriesRef, 512), + m: make(map[string]map[string][]storage.SeriesRef, defaultLabelNamesMapSize), + lvs: make(map[string][]string, defaultLabelNamesMapSize), ordered: false, } } @@ -80,16 +102,19 @@ func NewUnorderedMemPostings() *MemPostings { // Symbols returns an iterator over all unique name and value strings, in order. func (p *MemPostings) Symbols() StringIter { p.mtx.RLock() + // Make a quick clone of the map to avoid holding the lock while iterating. + // It's safe to use the values of the map after releasing the lock, as they're append-only slices. + lvs := maps.Clone(p.lvs) + p.mtx.RUnlock() // Add all the strings to a map to de-duplicate. - symbols := make(map[string]struct{}, 512) - for n, e := range p.m { + symbols := make(map[string]struct{}, defaultLabelNamesMapSize) + for n, labelValues := range lvs { symbols[n] = struct{}{} - for v := range e { + for _, v := range labelValues { symbols[v] = struct{}{} } } - p.mtx.RUnlock() res := make([]string, 0, len(symbols)) for k := range symbols { @@ -145,13 +170,14 @@ func (p *MemPostings) LabelNames() []string { // LabelValues returns label values for the given name. func (p *MemPostings) LabelValues(_ context.Context, name string) []string { p.mtx.RLock() - defer p.mtx.RUnlock() + values := p.lvs[name] + p.mtx.RUnlock() - values := make([]string, 0, len(p.m[name])) - for v := range p.m[name] { - values = append(values, v) - } - return values + // The slice from p.lvs[name] is shared between all readers, and it is append-only. + // Since it's shared, we need to make a copy of it before returning it to make + // sure that no caller modifies the original one by sorting it or filtering it. + // Since it's append-only, we can do this while not holding the mutex anymore. + return slices.Clone(values) } // PostingsStats contains cardinality based statistics for postings. @@ -294,6 +320,7 @@ func (p *MemPostings) Delete(deleted map[storage.SeriesRef]struct{}, affected ma p.mtx.Lock() defer p.mtx.Unlock() + affectedLabelNames := map[string]struct{}{} process := func(l labels.Label) { orig := p.m[l.Name][l.Value] repl := make([]storage.SeriesRef, 0, len(orig)) @@ -306,10 +333,7 @@ func (p *MemPostings) Delete(deleted map[storage.SeriesRef]struct{}, affected ma p.m[l.Name][l.Value] = repl } else { delete(p.m[l.Name], l.Value) - // Delete the key if we removed all values. - if len(p.m[l.Name]) == 0 { - delete(p.m, l.Name) - } + affectedLabelNames[l.Name] = struct{}{} } } @@ -323,22 +347,52 @@ func (p *MemPostings) Delete(deleted map[storage.SeriesRef]struct{}, affected ma // Note that a read query will most likely want to read multiple postings lists, say 5, 10 or 20 (depending on the number of matchers) // And that read query will most likely evaluate only one of those matchers before we unpause here, so we want to pause often. if i%512 == 0 { - p.mtx.Unlock() - // While it's tempting to just do a `time.Sleep(time.Millisecond)` here, - // it wouldn't ensure use that readers actually were able to get the read lock, - // because if there are writes waiting on same mutex, readers won't be able to get it. - // So we just grab one RLock ourselves. - p.mtx.RLock() - // We shouldn't wait here, because we would be blocking a potential write for no reason. - // Note that if there's a writer waiting for us to unlock, no reader will be able to get the read lock. - p.mtx.RUnlock() //nolint:staticcheck // SA2001: this is an intentionally empty critical section. - // Now we can wait a little bit just to increase the chance of a reader getting the lock. - // If we were deleting 100M series here, pausing every 512 with 1ms sleeps would be an extra of 200s, which is negligible. - time.Sleep(time.Millisecond) - p.mtx.Lock() + p.unlockWaitAndLockAgain() } } process(allPostingsKey) + + // Now we need to update the label values slices. + i = 0 + for name := range affectedLabelNames { + i++ + // From time to time we want some readers to go through and read their postings. + if i%512 == 0 { + p.unlockWaitAndLockAgain() + } + + if len(p.m[name]) == 0 { + // Delete the label name key if we deleted all values. + delete(p.m, name) + delete(p.lvs, name) + continue + } + + // Create the new slice with enough room to grow without reallocating. + // We have deleted values here, so there's definitely some churn, so be prepared for it. + lvs := make([]string, 0, exponentialSliceGrowthFactor*len(p.m[name])) + for v := range p.m[name] { + lvs = append(lvs, v) + } + p.lvs[name] = lvs + } +} + +// unlockWaitAndLockAgain will unlock an already locked p.mtx.Lock() and then wait a little bit before locking it again, +// letting the RLock()-waiting goroutines to get the lock. +func (p *MemPostings) unlockWaitAndLockAgain() { + p.mtx.Unlock() + // While it's tempting to just do a `time.Sleep(time.Millisecond)` here, + // it wouldn't ensure use that readers actually were able to get the read lock, + // because if there are writes waiting on same mutex, readers won't be able to get it. + // So we just grab one RLock ourselves. + p.mtx.RLock() + // We shouldn't wait here, because we would be blocking a potential write for no reason. + // Note that if there's a writer waiting for us to unlock, no reader will be able to get the read lock. + p.mtx.RUnlock() //nolint:staticcheck // SA2001: this is an intentionally empty critical section. + // Now we can wait a little bit just to increase the chance of a reader getting the lock. + time.Sleep(time.Millisecond) + p.mtx.Lock() } // Iter calls f for each postings list. It aborts if f returns an error and returns it. @@ -370,7 +424,7 @@ func (p *MemPostings) Add(id storage.SeriesRef, lset labels.Labels) { func appendWithExponentialGrowth[T any](a []T, v T) []T { if cap(a) < len(a)+1 { - newList := make([]T, len(a), len(a)*2+1) + newList := make([]T, len(a), len(a)*exponentialSliceGrowthFactor+1) copy(newList, a) a = newList } @@ -383,7 +437,11 @@ func (p *MemPostings) addFor(id storage.SeriesRef, l labels.Label) { nm = map[string][]storage.SeriesRef{} p.m[l.Name] = nm } - list := appendWithExponentialGrowth(nm[l.Value], id) + vm, ok := nm[l.Value] + if !ok { + p.lvs[l.Name] = appendWithExponentialGrowth(p.lvs[l.Name], l.Value) + } + list := appendWithExponentialGrowth(vm, id) nm[l.Value] = list if !p.ordered { @@ -402,25 +460,27 @@ func (p *MemPostings) addFor(id storage.SeriesRef, l labels.Label) { } func (p *MemPostings) PostingsForLabelMatching(ctx context.Context, name string, match func(string) bool) Postings { - // We'll copy the values into a slice and then match over that, + // We'll take the label values slice and then match over that, // this way we don't need to hold the mutex while we're matching, // which can be slow (seconds) if the match function is a huge regex. // Holding this lock prevents new series from being added (slows down the write path) // and blocks the compaction process. - vals := p.labelValues(name) - for i, count := 0, 1; i < len(vals); count++ { - if count%checkContextEveryNIterations == 0 && ctx.Err() != nil { + // + // We just need to make sure we don't modify the slice we took, + // so we'll append matching values to a different one. + p.mtx.RLock() + readOnlyLabelValues := p.lvs[name] + p.mtx.RUnlock() + + vals := make([]string, 0, len(readOnlyLabelValues)) + for i, v := range readOnlyLabelValues { + if i%checkContextEveryNIterations == 0 && ctx.Err() != nil { return ErrPostings(ctx.Err()) } - if match(vals[i]) { - i++ - continue + if match(v) { + vals = append(vals, v) } - - // Didn't match, bring the last value to this position, make the slice shorter and check again. - // The order of the slice doesn't matter as it comes from a map iteration. - vals[i], vals = vals[len(vals)-1], vals[:len(vals)-1] } // If none matched (or this label had no values), no need to grab the lock again. @@ -469,27 +529,6 @@ func (p *MemPostings) PostingsForAllLabelValues(ctx context.Context, name string return Merge(ctx, its...) } -// labelValues returns a slice of label values for the given label name. -// It will take the read lock. -func (p *MemPostings) labelValues(name string) []string { - p.mtx.RLock() - defer p.mtx.RUnlock() - - e := p.m[name] - if len(e) == 0 { - return nil - } - - vals := make([]string, 0, len(e)) - for v, srs := range e { - if len(srs) > 0 { - vals = append(vals, v) - } - } - - return vals -} - // ExpandPostings returns the postings expanded as a slice. func ExpandPostings(p Postings) (res []storage.SeriesRef, err error) { for p.Next() {