mirror of
https://github.com/prometheus/prometheus.git
synced 2025-03-05 20:59:13 -08:00
tsdb: Make sure the cache for postings cardinality properly honors the label name (#12653)
Add a string remembering which label and limit the cache corresponds to. Signed-off-by: Xiaochao Dong (@damnever) <the.xcdong@gmail.com>
This commit is contained in:
parent
1200c89d0c
commit
28d8f1650c
17
tsdb/head.go
17
tsdb/head.go
|
@ -20,6 +20,7 @@ import (
|
||||||
"math"
|
"math"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
"runtime"
|
"runtime"
|
||||||
|
"strconv"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
@ -110,7 +111,8 @@ type Head struct {
|
||||||
|
|
||||||
cardinalityMutex sync.Mutex
|
cardinalityMutex sync.Mutex
|
||||||
cardinalityCache *index.PostingsStats // Posting stats cache which will expire after 30sec.
|
cardinalityCache *index.PostingsStats // Posting stats cache which will expire after 30sec.
|
||||||
lastPostingsStatsCall time.Duration // Last posting stats call (PostingsCardinalityStats()) time for caching.
|
cardinalityCacheKey string
|
||||||
|
lastPostingsStatsCall time.Duration // Last posting stats call (PostingsCardinalityStats()) time for caching.
|
||||||
|
|
||||||
// chunkDiskMapper is used to write and read Head chunks to/from disk.
|
// chunkDiskMapper is used to write and read Head chunks to/from disk.
|
||||||
chunkDiskMapper *chunks.ChunkDiskMapper
|
chunkDiskMapper *chunks.ChunkDiskMapper
|
||||||
|
@ -988,16 +990,23 @@ func (h *Head) DisableNativeHistograms() {
|
||||||
|
|
||||||
// PostingsCardinalityStats returns highest cardinality stats by label and value names.
|
// PostingsCardinalityStats returns highest cardinality stats by label and value names.
|
||||||
func (h *Head) PostingsCardinalityStats(statsByLabelName string, limit int) *index.PostingsStats {
|
func (h *Head) PostingsCardinalityStats(statsByLabelName string, limit int) *index.PostingsStats {
|
||||||
|
cacheKey := statsByLabelName + ";" + strconv.Itoa(limit)
|
||||||
|
|
||||||
h.cardinalityMutex.Lock()
|
h.cardinalityMutex.Lock()
|
||||||
defer h.cardinalityMutex.Unlock()
|
defer h.cardinalityMutex.Unlock()
|
||||||
currentTime := time.Duration(time.Now().Unix()) * time.Second
|
if h.cardinalityCacheKey != cacheKey {
|
||||||
seconds := currentTime - h.lastPostingsStatsCall
|
|
||||||
if seconds > cardinalityCacheExpirationTime {
|
|
||||||
h.cardinalityCache = nil
|
h.cardinalityCache = nil
|
||||||
|
} else {
|
||||||
|
currentTime := time.Duration(time.Now().Unix()) * time.Second
|
||||||
|
seconds := currentTime - h.lastPostingsStatsCall
|
||||||
|
if seconds > cardinalityCacheExpirationTime {
|
||||||
|
h.cardinalityCache = nil
|
||||||
|
}
|
||||||
}
|
}
|
||||||
if h.cardinalityCache != nil {
|
if h.cardinalityCache != nil {
|
||||||
return h.cardinalityCache
|
return h.cardinalityCache
|
||||||
}
|
}
|
||||||
|
h.cardinalityCacheKey = cacheKey
|
||||||
h.cardinalityCache = h.postings.Stats(statsByLabelName, limit)
|
h.cardinalityCache = h.postings.Stats(statsByLabelName, limit)
|
||||||
h.lastPostingsStatsCall = time.Duration(time.Now().Unix()) * time.Second
|
h.lastPostingsStatsCall = time.Duration(time.Now().Unix()) * time.Second
|
||||||
|
|
||||||
|
|
|
@ -5618,3 +5618,26 @@ func TestStripeSeries_gc(t *testing.T) {
|
||||||
got = s.getByHash(hash, ms2.lset)
|
got = s.getByHash(hash, ms2.lset)
|
||||||
require.Nil(t, got)
|
require.Nil(t, got)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestPostingsCardinalityStats(t *testing.T) {
|
||||||
|
head := &Head{postings: index.NewMemPostings()}
|
||||||
|
head.postings.Add(1, labels.FromStrings(labels.MetricName, "t", "n", "v1"))
|
||||||
|
head.postings.Add(2, labels.FromStrings(labels.MetricName, "t", "n", "v2"))
|
||||||
|
|
||||||
|
statsForMetricName := head.PostingsCardinalityStats(labels.MetricName, 10)
|
||||||
|
head.postings.Add(3, labels.FromStrings(labels.MetricName, "t", "n", "v3"))
|
||||||
|
// Using cache.
|
||||||
|
require.Equal(t, statsForMetricName, head.PostingsCardinalityStats(labels.MetricName, 10))
|
||||||
|
|
||||||
|
statsForSomeLabel := head.PostingsCardinalityStats("n", 10)
|
||||||
|
// Cache should be evicted because of the change of label name.
|
||||||
|
require.NotEqual(t, statsForMetricName, statsForSomeLabel)
|
||||||
|
head.postings.Add(4, labels.FromStrings(labels.MetricName, "t", "n", "v4"))
|
||||||
|
// Using cache.
|
||||||
|
require.Equal(t, statsForSomeLabel, head.PostingsCardinalityStats("n", 10))
|
||||||
|
// Cache should be evicted because of the change of limit parameter.
|
||||||
|
statsForSomeLabel1 := head.PostingsCardinalityStats("n", 1)
|
||||||
|
require.NotEqual(t, statsForSomeLabel1, statsForSomeLabel)
|
||||||
|
// Using cache.
|
||||||
|
require.Equal(t, statsForSomeLabel1, head.PostingsCardinalityStats("n", 1))
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in a new issue