mirror of
https://github.com/prometheus/prometheus.git
synced 2024-12-25 05:34:05 -08:00
tsdb: remove duplicate values set to reduce memory usage (map overhead) (#7915)
Signed-off-by: Xiaochao Dong (@damnever) <dxc.wolf@gmail.com>
This commit is contained in:
parent
90fc6be70f
commit
a282d25099
68
tsdb/head.go
68
tsdb/head.go
|
@ -20,7 +20,6 @@ import (
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
"runtime"
|
"runtime"
|
||||||
"sort"
|
"sort"
|
||||||
"strings"
|
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
@ -73,7 +72,6 @@ type Head struct {
|
||||||
|
|
||||||
symMtx sync.RWMutex
|
symMtx sync.RWMutex
|
||||||
symbols map[string]struct{}
|
symbols map[string]struct{}
|
||||||
values map[string]stringset // Label names to possible values.
|
|
||||||
|
|
||||||
deletedMtx sync.Mutex
|
deletedMtx sync.Mutex
|
||||||
deleted map[uint64]int // Deleted series, and what WAL segment they must be kept until.
|
deleted map[uint64]int // Deleted series, and what WAL segment they must be kept until.
|
||||||
|
@ -303,7 +301,6 @@ func NewHead(r prometheus.Registerer, l log.Logger, wal *wal.WAL, chunkRange int
|
||||||
wal: wal,
|
wal: wal,
|
||||||
logger: l,
|
logger: l,
|
||||||
series: newStripeSeries(stripeSize, seriesCallback),
|
series: newStripeSeries(stripeSize, seriesCallback),
|
||||||
values: map[string]stringset{},
|
|
||||||
symbols: map[string]struct{}{},
|
symbols: map[string]struct{}{},
|
||||||
postings: index.NewUnorderedMemPostings(),
|
postings: index.NewUnorderedMemPostings(),
|
||||||
tombstones: tombstones.NewMemTombstones(),
|
tombstones: tombstones.NewMemTombstones(),
|
||||||
|
@ -1345,24 +1342,15 @@ func (h *Head) gc() {
|
||||||
defer h.symMtx.Unlock()
|
defer h.symMtx.Unlock()
|
||||||
|
|
||||||
symbols := make(map[string]struct{}, len(h.symbols))
|
symbols := make(map[string]struct{}, len(h.symbols))
|
||||||
values := make(map[string]stringset, len(h.values))
|
if err := h.postings.Iter(func(l labels.Label, _ index.Postings) error {
|
||||||
if err := h.postings.Iter(func(t labels.Label, _ index.Postings) error {
|
symbols[l.Name] = struct{}{}
|
||||||
symbols[t.Name] = struct{}{}
|
symbols[l.Value] = struct{}{}
|
||||||
symbols[t.Value] = struct{}{}
|
|
||||||
|
|
||||||
ss, ok := values[t.Name]
|
|
||||||
if !ok {
|
|
||||||
ss = stringset{}
|
|
||||||
values[t.Name] = ss
|
|
||||||
}
|
|
||||||
ss.set(t.Value)
|
|
||||||
return nil
|
return nil
|
||||||
}); err != nil {
|
}); err != nil {
|
||||||
// This should never happen, as the iteration function only returns nil.
|
// This should never happen, as the iteration function only returns nil.
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
h.symbols = symbols
|
h.symbols = symbols
|
||||||
h.values = values
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Tombstones returns a new reader over the head's tombstones
|
// Tombstones returns a new reader over the head's tombstones
|
||||||
|
@ -1572,37 +1560,27 @@ func (h *headIndexReader) SortedLabelValues(name string) ([]string, error) {
|
||||||
// specific label name that are within the time range mint to maxt.
|
// specific label name that are within the time range mint to maxt.
|
||||||
func (h *headIndexReader) LabelValues(name string) ([]string, error) {
|
func (h *headIndexReader) LabelValues(name string) ([]string, error) {
|
||||||
h.head.symMtx.RLock()
|
h.head.symMtx.RLock()
|
||||||
|
defer h.head.symMtx.RUnlock()
|
||||||
if h.maxt < h.head.MinTime() || h.mint > h.head.MaxTime() {
|
if h.maxt < h.head.MinTime() || h.mint > h.head.MaxTime() {
|
||||||
h.head.symMtx.RUnlock()
|
|
||||||
return []string{}, nil
|
return []string{}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
sl := make([]string, 0, len(h.head.values[name]))
|
values := h.head.postings.LabelValues(name)
|
||||||
for s := range h.head.values[name] {
|
return values, nil
|
||||||
sl = append(sl, s)
|
|
||||||
}
|
|
||||||
h.head.symMtx.RUnlock()
|
|
||||||
return sl, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// LabelNames returns all the unique label names present in the head
|
// LabelNames returns all the unique label names present in the head
|
||||||
// that are within the time range mint to maxt.
|
// that are within the time range mint to maxt.
|
||||||
func (h *headIndexReader) LabelNames() ([]string, error) {
|
func (h *headIndexReader) LabelNames() ([]string, error) {
|
||||||
h.head.symMtx.RLock()
|
h.head.symMtx.RLock()
|
||||||
defer h.head.symMtx.RUnlock()
|
|
||||||
|
|
||||||
if h.maxt < h.head.MinTime() || h.mint > h.head.MaxTime() {
|
if h.maxt < h.head.MinTime() || h.mint > h.head.MaxTime() {
|
||||||
|
h.head.symMtx.RUnlock()
|
||||||
return []string{}, nil
|
return []string{}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
labelNames := make([]string, 0, len(h.head.values))
|
labelNames := h.head.postings.LabelNames()
|
||||||
for name := range h.head.values {
|
h.head.symMtx.RUnlock()
|
||||||
if name == "" {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
labelNames = append(labelNames, name)
|
|
||||||
}
|
|
||||||
sort.Strings(labelNames)
|
sort.Strings(labelNames)
|
||||||
return labelNames, nil
|
return labelNames, nil
|
||||||
}
|
}
|
||||||
|
@ -1714,13 +1692,6 @@ func (h *Head) getOrCreateWithID(id, hash uint64, lset labels.Labels) (*memSerie
|
||||||
defer h.symMtx.Unlock()
|
defer h.symMtx.Unlock()
|
||||||
|
|
||||||
for _, l := range lset {
|
for _, l := range lset {
|
||||||
valset, ok := h.values[l.Name]
|
|
||||||
if !ok {
|
|
||||||
valset = stringset{}
|
|
||||||
h.values[l.Name] = valset
|
|
||||||
}
|
|
||||||
valset.set(l.Value)
|
|
||||||
|
|
||||||
h.symbols[l.Name] = struct{}{}
|
h.symbols[l.Name] = struct{}{}
|
||||||
h.symbols[l.Value] = struct{}{}
|
h.symbols[l.Value] = struct{}{}
|
||||||
}
|
}
|
||||||
|
@ -2335,25 +2306,6 @@ func (it *memSafeIterator) At() (int64, float64) {
|
||||||
return s.t, s.v
|
return s.t, s.v
|
||||||
}
|
}
|
||||||
|
|
||||||
// stringset is a set of strings, represented as a map whose keys are the
// members and whose values are empty structs.
type stringset map[string]struct{}
|
|
||||||
|
|
||||||
// set adds s to the set. Adding a string that is already a member leaves the
// set unchanged.
func (ss stringset) set(s string) {
|
|
||||||
ss[s] = struct{}{}
|
|
||||||
}
|
|
||||||
|
|
||||||
// String returns the members of the set joined with commas, in the sorted
// order produced by slice.
func (ss stringset) String() string {
|
|
||||||
return strings.Join(ss.slice(), ",")
|
|
||||||
}
|
|
||||||
|
|
||||||
// slice returns the members of the set as a newly allocated slice sorted in
// ascending order. An empty set yields an empty, non-nil slice.
func (ss stringset) slice() []string {
|
|
||||||
slice := make([]string, 0, len(ss))
|
|
||||||
for k := range ss {
|
|
||||||
slice = append(slice, k)
|
|
||||||
}
|
|
||||||
sort.Strings(slice) // Map iteration order is unspecified, so sort for a deterministic result.
|
|
||||||
return slice
|
|
||||||
}
|
|
||||||
|
|
||||||
type mmappedChunk struct {
|
type mmappedChunk struct {
|
||||||
ref uint64
|
ref uint64
|
||||||
numSamples uint16
|
numSamples uint16
|
||||||
|
|
|
@ -387,11 +387,21 @@ func TestHead_Truncate(t *testing.T) {
|
||||||
"2": {},
|
"2": {},
|
||||||
}, h.symbols)
|
}, h.symbols)
|
||||||
|
|
||||||
testutil.Equals(t, map[string]stringset{
|
values := map[string]map[string]struct{}{}
|
||||||
|
for _, name := range h.postings.LabelNames() {
|
||||||
|
ss, ok := values[name]
|
||||||
|
if !ok {
|
||||||
|
ss = map[string]struct{}{}
|
||||||
|
values[name] = ss
|
||||||
|
}
|
||||||
|
for _, value := range h.postings.LabelValues(name) {
|
||||||
|
ss[value] = struct{}{}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
testutil.Equals(t, map[string]map[string]struct{}{
|
||||||
"a": {"1": struct{}{}, "2": struct{}{}},
|
"a": {"1": struct{}{}, "2": struct{}{}},
|
||||||
"b": {"1": struct{}{}},
|
"b": {"1": struct{}{}},
|
||||||
"": {"": struct{}{}},
|
}, values)
|
||||||
}, h.values)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Validate various behaviors brought on by firstChunkID accounting for
|
// Validate various behaviors brought on by firstChunkID accounting for
|
||||||
|
|
|
@ -79,6 +79,36 @@ func (p *MemPostings) SortedKeys() []labels.Label {
|
||||||
return keys
|
return keys
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// LabelNames returns all the unique label names, i.e. every key of p.m
// except allPostingsKey.Name. The read lock p.mtx is held while the map is
// scanned.
//
// It returns nil when p.m is empty. The result follows map iteration order
// and is therefore not sorted; callers that need a stable order must sort it.
|
||||||
|
func (p *MemPostings) LabelNames() []string {
|
||||||
|
p.mtx.RLock()
|
||||||
|
defer p.mtx.RUnlock()
|
||||||
|
n := len(p.m)
|
||||||
|
if n == 0 {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
names := make([]string, 0, n-1) // Capacity hint: one less than len(p.m) because allPostingsKey.Name is skipped below (assumes that key is present — TODO confirm).
|
||||||
|
for name := range p.m {
|
||||||
|
if name != allPostingsKey.Name {
|
||||||
|
names = append(names, name)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return names
|
||||||
|
}
|
||||||
|
|
||||||
|
// LabelValues returns label values for the given name, i.e. every key of
// p.m[name]. The read lock p.mtx is held while the values are collected.
//
// The result follows map iteration order and is therefore not sorted. A name
// with no entry in p.m yields an empty, non-nil slice.
|
||||||
|
func (p *MemPostings) LabelValues(name string) []string {
|
||||||
|
p.mtx.RLock()
|
||||||
|
defer p.mtx.RUnlock()
|
||||||
|
|
||||||
|
values := make([]string, 0, len(p.m[name]))
|
||||||
|
for v := range p.m[name] {
|
||||||
|
values = append(values, v)
|
||||||
|
}
|
||||||
|
return values
|
||||||
|
}
|
||||||
|
|
||||||
// PostingsStats contains cardinality based statistics for postings.
|
// PostingsStats contains cardinality based statistics for postings.
|
||||||
type PostingsStats struct {
|
type PostingsStats struct {
|
||||||
CardinalityMetricsStats []Stat
|
CardinalityMetricsStats []Stat
|
||||||
|
|
|
@ -106,7 +106,7 @@ func createIdxChkReaders(t *testing.T, tc []seriesSamples) (IndexReader, ChunkRe
|
||||||
|
|
||||||
postings := index.NewMemPostings()
|
postings := index.NewMemPostings()
|
||||||
chkReader := mockChunkReader(make(map[uint64]chunkenc.Chunk))
|
chkReader := mockChunkReader(make(map[uint64]chunkenc.Chunk))
|
||||||
lblIdx := make(map[string]stringset)
|
lblIdx := make(map[string]map[string]struct{})
|
||||||
mi := newMockIndex()
|
mi := newMockIndex()
|
||||||
blockMint := int64(math.MaxInt64)
|
blockMint := int64(math.MaxInt64)
|
||||||
blockMaxt := int64(math.MinInt64)
|
blockMaxt := int64(math.MinInt64)
|
||||||
|
@ -145,10 +145,10 @@ func createIdxChkReaders(t *testing.T, tc []seriesSamples) (IndexReader, ChunkRe
|
||||||
for _, l := range ls {
|
for _, l := range ls {
|
||||||
vs, present := lblIdx[l.Name]
|
vs, present := lblIdx[l.Name]
|
||||||
if !present {
|
if !present {
|
||||||
vs = stringset{}
|
vs = map[string]struct{}{}
|
||||||
lblIdx[l.Name] = vs
|
lblIdx[l.Name] = vs
|
||||||
}
|
}
|
||||||
vs.set(l.Value)
|
vs[l.Value] = struct{}{}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue