mirror of
https://github.com/prometheus/prometheus.git
synced 2025-01-12 14:27:27 -08:00
Remove symbols map from TSDB head (#9301)
This saves memory, effort and locking. Since every symbol is also added to postings, `Symbols()` can be implemented there instead. This now has to build a map for deduplication, but `Symbols()` is only called for compaction, and `gc()` used to rebuild the symbols map after every compaction so not an additional cost. Signed-off-by: Bryan Boreham <bjboreham@gmail.com>
This commit is contained in:
parent
92a3eeac55
commit
87d909df4a
28
tsdb/head.go
28
tsdb/head.go
|
@ -79,9 +79,6 @@ type Head struct {
|
||||||
// All series addressable by their ID or hash.
|
// All series addressable by their ID or hash.
|
||||||
series *stripeSeries
|
series *stripeSeries
|
||||||
|
|
||||||
symMtx sync.RWMutex
|
|
||||||
symbols map[string]struct{}
|
|
||||||
|
|
||||||
deletedMtx sync.Mutex
|
deletedMtx sync.Mutex
|
||||||
deleted map[uint64]int // Deleted series, and what WAL segment they must be kept until.
|
deleted map[uint64]int // Deleted series, and what WAL segment they must be kept until.
|
||||||
|
|
||||||
|
@ -223,7 +220,6 @@ func (h *Head) resetInMemoryState() error {
|
||||||
h.exemplarMetrics = em
|
h.exemplarMetrics = em
|
||||||
h.exemplars = es
|
h.exemplars = es
|
||||||
h.series = newStripeSeries(h.opts.StripeSize, h.opts.SeriesCallback)
|
h.series = newStripeSeries(h.opts.StripeSize, h.opts.SeriesCallback)
|
||||||
h.symbols = map[string]struct{}{}
|
|
||||||
h.postings = index.NewUnorderedMemPostings()
|
h.postings = index.NewUnorderedMemPostings()
|
||||||
h.tombstones = tombstones.NewMemTombstones()
|
h.tombstones = tombstones.NewMemTombstones()
|
||||||
h.iso = newIsolation()
|
h.iso = newIsolation()
|
||||||
|
@ -1120,22 +1116,6 @@ func (h *Head) gc() int64 {
|
||||||
h.deletedMtx.Unlock()
|
h.deletedMtx.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
// Rebuild symbols and label value indices from what is left in the postings terms.
|
|
||||||
// symMtx ensures that append of symbols and postings is disabled for rebuild time.
|
|
||||||
h.symMtx.Lock()
|
|
||||||
defer h.symMtx.Unlock()
|
|
||||||
|
|
||||||
symbols := make(map[string]struct{}, len(h.symbols))
|
|
||||||
if err := h.postings.Iter(func(l labels.Label, _ index.Postings) error {
|
|
||||||
symbols[l.Name] = struct{}{}
|
|
||||||
symbols[l.Value] = struct{}{}
|
|
||||||
return nil
|
|
||||||
}); err != nil {
|
|
||||||
// This should never happen, as the iteration function only returns nil.
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
h.symbols = symbols
|
|
||||||
|
|
||||||
return actualMint
|
return actualMint
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1234,14 +1214,6 @@ func (h *Head) getOrCreateWithID(id, hash uint64, lset labels.Labels) (*memSerie
|
||||||
h.metrics.seriesCreated.Inc()
|
h.metrics.seriesCreated.Inc()
|
||||||
h.numSeries.Inc()
|
h.numSeries.Inc()
|
||||||
|
|
||||||
h.symMtx.Lock()
|
|
||||||
defer h.symMtx.Unlock()
|
|
||||||
|
|
||||||
for _, l := range lset {
|
|
||||||
h.symbols[l.Name] = struct{}{}
|
|
||||||
h.symbols[l.Value] = struct{}{}
|
|
||||||
}
|
|
||||||
|
|
||||||
h.postings.Add(id, lset)
|
h.postings.Add(id, lset)
|
||||||
return s, true, nil
|
return s, true, nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -54,16 +54,7 @@ func (h *headIndexReader) Close() error {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *headIndexReader) Symbols() index.StringIter {
|
func (h *headIndexReader) Symbols() index.StringIter {
|
||||||
h.head.symMtx.RLock()
|
return h.head.postings.Symbols()
|
||||||
res := make([]string, 0, len(h.head.symbols))
|
|
||||||
|
|
||||||
for s := range h.head.symbols {
|
|
||||||
res = append(res, s)
|
|
||||||
}
|
|
||||||
h.head.symMtx.RUnlock()
|
|
||||||
|
|
||||||
sort.Strings(res)
|
|
||||||
return index.NewStringListIter(res)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// SortedLabelValues returns label values present in the head for the
|
// SortedLabelValues returns label values present in the head for the
|
||||||
|
@ -88,8 +79,6 @@ func (h *headIndexReader) LabelValues(name string, matchers ...*labels.Matcher)
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(matchers) == 0 {
|
if len(matchers) == 0 {
|
||||||
h.head.symMtx.RLock()
|
|
||||||
defer h.head.symMtx.RUnlock()
|
|
||||||
return h.head.postings.LabelValues(name), nil
|
return h.head.postings.LabelValues(name), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -104,10 +93,7 @@ func (h *headIndexReader) LabelNames(matchers ...*labels.Matcher) ([]string, err
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(matchers) == 0 {
|
if len(matchers) == 0 {
|
||||||
h.head.symMtx.RLock()
|
|
||||||
labelNames := h.head.postings.LabelNames()
|
labelNames := h.head.postings.LabelNames()
|
||||||
h.head.symMtx.RUnlock()
|
|
||||||
|
|
||||||
sort.Strings(labelNames)
|
sort.Strings(labelNames)
|
||||||
return labelNames, nil
|
return labelNames, nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -469,13 +469,14 @@ func TestHead_Truncate(t *testing.T) {
|
||||||
require.Nil(t, postingsB2)
|
require.Nil(t, postingsB2)
|
||||||
require.Nil(t, postingsC1)
|
require.Nil(t, postingsC1)
|
||||||
|
|
||||||
require.Equal(t, map[string]struct{}{
|
iter := h.postings.Symbols()
|
||||||
"": {}, // from 'all' postings list
|
symbols := []string{}
|
||||||
"a": {},
|
for iter.Next() {
|
||||||
"b": {},
|
symbols = append(symbols, iter.At())
|
||||||
"1": {},
|
}
|
||||||
"2": {},
|
require.Equal(t,
|
||||||
}, h.symbols)
|
[]string{"" /* from 'all' postings list */, "1", "2", "a", "b"},
|
||||||
|
symbols)
|
||||||
|
|
||||||
values := map[string]map[string]struct{}{}
|
values := map[string]map[string]struct{}{}
|
||||||
for _, name := range h.postings.LabelNames() {
|
for _, name := range h.postings.LabelNames() {
|
||||||
|
|
|
@ -58,6 +58,29 @@ func NewUnorderedMemPostings() *MemPostings {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Symbols returns an iterator over all unique name and value strings, in order.
|
||||||
|
func (p *MemPostings) Symbols() StringIter {
|
||||||
|
p.mtx.RLock()
|
||||||
|
|
||||||
|
// Add all the strings to a map to de-duplicate.
|
||||||
|
symbols := make(map[string]struct{}, 512)
|
||||||
|
for n, e := range p.m {
|
||||||
|
symbols[n] = struct{}{}
|
||||||
|
for v := range e {
|
||||||
|
symbols[v] = struct{}{}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
p.mtx.RUnlock()
|
||||||
|
|
||||||
|
res := make([]string, 0, len(symbols))
|
||||||
|
for k := range symbols {
|
||||||
|
res = append(res, k)
|
||||||
|
}
|
||||||
|
|
||||||
|
sort.Strings(res)
|
||||||
|
return NewStringListIter(res)
|
||||||
|
}
|
||||||
|
|
||||||
// SortedKeys returns a list of sorted label keys of the postings.
|
// SortedKeys returns a list of sorted label keys of the postings.
|
||||||
func (p *MemPostings) SortedKeys() []labels.Label {
|
func (p *MemPostings) SortedKeys() []labels.Label {
|
||||||
p.mtx.RLock()
|
p.mtx.RLock()
|
||||||
|
|
Loading…
Reference in a new issue