opportunistically clean up stale series from the remote write label cache
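
When remote write sees a staleness marker for a series (value.IsStaleNaN on
the sample value), Append now records the series ref and the current time in
a new cleanUpSeries map; a later non-stale sample for the same ref removes
the entry again. A new labelsCacheCleanupLoop goroutine ticks once a minute
and drops the cached labels of every tracked series whose marker is more
than 60s old. Deleting keys does not shrink a Go map, so the loop then
rebuilds seriesLabels from the surviving entries and triggers a GC to
actually return the memory; seriesLabels now holds *labels.Labels so the
rebuild copies pointers rather than whole label sets.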

Signed-off-by: Callum Styan <callumstyan@gmail.com>
Callum Styan 2023-08-29 20:52:38 -07:00
parent 886945cda7
commit 7b70ff5aef


@@ -17,6 +17,7 @@ import (
 	"context"
 	"errors"
 	"math"
+	"runtime"
 	"strconv"
 	"sync"
 	"time"
@@ -35,6 +36,7 @@ import (
 	"github.com/prometheus/prometheus/model/histogram"
 	"github.com/prometheus/prometheus/model/labels"
 	"github.com/prometheus/prometheus/model/relabel"
+	"github.com/prometheus/prometheus/model/value"
 	"github.com/prometheus/prometheus/prompb"
 	"github.com/prometheus/prometheus/scrape"
 	"github.com/prometheus/prometheus/tsdb/chunks"
@@ -407,9 +409,12 @@ type QueueManager struct {
 	storeClient          WriteClient
 	seriesMtx            sync.Mutex // Covers seriesLabels and droppedSeries.
-	seriesLabels         map[chunks.HeadSeriesRef]labels.Labels
+	seriesLabels         map[chunks.HeadSeriesRef]*labels.Labels
 	droppedSeries        map[chunks.HeadSeriesRef]struct{}
+	cleanUpLock          sync.Mutex
+	cleanUpSeries        map[chunks.HeadSeriesRef]int64
 	seriesSegmentMtx     sync.Mutex // Covers seriesSegmentIndexes - if you also lock seriesMtx, take seriesMtx first.
 	seriesSegmentIndexes map[chunks.HeadSeriesRef]int
@@ -472,9 +477,10 @@ func NewQueueManager(
 		sendExemplars:        enableExemplarRemoteWrite,
 		sendNativeHistograms: enableNativeHistogramRemoteWrite,
-		seriesLabels:         make(map[chunks.HeadSeriesRef]labels.Labels),
+		seriesLabels:         make(map[chunks.HeadSeriesRef]*labels.Labels),
 		seriesSegmentIndexes: make(map[chunks.HeadSeriesRef]int),
 		droppedSeries:        make(map[chunks.HeadSeriesRef]struct{}),
+		cleanUpSeries:        make(map[chunks.HeadSeriesRef]int64),
 		numShards:            cfg.MinShards,
 		reshardChan:          make(chan int),
@@ -499,6 +505,48 @@ func NewQueueManager(
 	return t
 }
 
+func (t *QueueManager) labelsCacheCleanupLoop() {
+	defer t.wg.Done()
+	// TODO: make this interval configurable.
+	ticker := time.NewTicker(1 * time.Minute)
+	for {
+		select {
+		case <-ticker.C:
+			if len(t.cleanUpSeries) == 0 {
+				continue
+			}
+			t.cleanUpLock.Lock()
+			t.seriesMtx.Lock()
+			removed := 0
+			newCleanup := make(map[chunks.HeadSeriesRef]int64)
+			for s, ts := range t.cleanUpSeries {
+				// TODO: make this configurable or derive a reasonable time;
+				// is the staleness marker present for 5m?
+				if time.Now().Unix()-ts > 60 {
+					delete(t.seriesLabels, s)
+					removed++
+					continue
+				}
+				newCleanup[s] = ts
+			}
+			if removed > 0 {
+				t.cleanUpSeries = newCleanup
+				// Copy survivors into a fresh map so the deleted buckets can be freed.
+				temp := make(map[chunks.HeadSeriesRef]*labels.Labels)
+				for k, v := range t.seriesLabels {
+					temp[k] = v
+				}
+				t.seriesLabels = temp
+				runtime.GC()
+			}
+			t.cleanUpLock.Unlock()
+			t.seriesMtx.Unlock()
+		case <-t.quit:
+			return
+		}
+	}
+}
+
 // AppendMetadata sends metadata to the remote storage. Metadata is sent in batches, but is not parallelized.
 func (t *QueueManager) AppendMetadata(ctx context.Context, metadata []scrape.MetricMetadata) {
 	mm := make([]prompb.MetricMetadata, 0, len(metadata))
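An aside on the copy-and-GC step above: deleting keys from a Go map does not release the map's buckets, so a cache that once tracked millions of series keeps that memory until the map itself becomes unreachable. A minimal sketch of the idiom outside the QueueManager (the generic shrink helper is illustrative, not part of this change):

package main

import "runtime"

// shrink rebuilds m into a fresh map. Go maps never shrink in place, so the
// buckets left behind by deletions are only reclaimed once the old map is
// garbage.
func shrink[K comparable, V any](m map[K]V) map[K]V {
	fresh := make(map[K]V, len(m))
	for k, v := range m {
		fresh[k] = v
	}
	runtime.GC() // mirror the cleanup loop's nudge to the collector
	return fresh
}

func main() {
	cache := make(map[int][]byte)
	for i := 0; i < 1_000_000; i++ {
		cache[i] = make([]byte, 16)
	}
	for i := 100; i < 1_000_000; i++ {
		delete(cache, i) // bucket memory stays allocated after this
	}
	cache = shrink(cache) // copying the ~100 survivors frees the old buckets
}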
@@ -575,6 +623,14 @@ func (t *QueueManager) sendMetadataWithBackoff(ctx context.Context, metadata []p
 func (t *QueueManager) Append(samples []record.RefSample) bool {
 outer:
 	for _, s := range samples {
+		t.cleanUpLock.Lock()
+		if value.IsStaleNaN(s.V) {
+			t.cleanUpSeries[s.Ref] = time.Now().Unix()
+		} else if _, ok := t.cleanUpSeries[s.Ref]; ok {
+			delete(t.cleanUpSeries, s.Ref)
+		}
+		t.cleanUpLock.Unlock()
+
 		t.seriesMtx.Lock()
 		lbls, ok := t.seriesLabels[s.Ref]
 		if !ok {
@@ -599,7 +655,7 @@ outer:
 			default:
 			}
 			if t.shards.enqueue(s.Ref, timeSeries{
-				seriesLabels: lbls,
+				seriesLabels: *lbls,
 				timestamp:    s.T,
 				value:        s.V,
 				sType:        tSample,
@@ -649,7 +705,7 @@ outer:
 			default:
 			}
 			if t.shards.enqueue(e.Ref, timeSeries{
-				seriesLabels:   lbls,
+				seriesLabels:   *lbls,
 				timestamp:      e.T,
 				value:          e.V,
 				exemplarLabels: e.Labels,
@@ -697,7 +753,7 @@ outer:
 			default:
 			}
 			if t.shards.enqueue(h.Ref, timeSeries{
-				seriesLabels: lbls,
+				seriesLabels: *lbls,
 				timestamp:    h.T,
 				histogram:    h.H,
 				sType:        tHistogram,
@@ -744,7 +800,7 @@ outer:
 			default:
 			}
 			if t.shards.enqueue(h.Ref, timeSeries{
-				seriesLabels:   lbls,
+				seriesLabels:   *lbls,
 				timestamp:      h.T,
 				floatHistogram: h.FH,
 				sType:          tFloatHistogram,
@@ -780,9 +836,10 @@ func (t *QueueManager) Start() {
 		t.metadataWatcher.Start()
 	}
 
-	t.wg.Add(2)
+	t.wg.Add(3)
 	go t.updateShardsLoop()
 	go t.reshardLoop()
+	go t.labelsCacheCleanupLoop()
 }
 
 // Stop stops sending samples to the remote storage and waits for pending
@@ -805,7 +862,7 @@ func (t *QueueManager) Stop() {
 	// On shutdown, release the strings in the labels from the intern pool.
 	t.seriesMtx.Lock()
 	for _, labels := range t.seriesLabels {
-		t.releaseLabels(labels)
+		t.releaseLabels(*labels)
 	}
 	t.seriesMtx.Unlock()
 	t.metrics.unregister()
@@ -833,9 +890,9 @@ func (t *QueueManager) StoreSeries(series []record.RefSeries, index int) {
 		// We should not ever be replacing a series labels in the map, but just
 		// in case we do we need to ensure we do not leak the replaced interned
 		// strings.
 		if orig, ok := t.seriesLabels[s.Ref]; ok {
-			t.releaseLabels(orig)
+			t.releaseLabels(*orig)
 		}
-		t.seriesLabels[s.Ref] = lbls
+		t.seriesLabels[s.Ref] = &lbls
 	}
 }
@@ -862,7 +919,7 @@ func (t *QueueManager) SeriesReset(index int) {
 	for k, v := range t.seriesSegmentIndexes {
 		if v < index {
 			delete(t.seriesSegmentIndexes, k)
-			t.releaseLabels(t.seriesLabels[k])
+			t.releaseLabels(*t.seriesLabels[k])
 			delete(t.seriesLabels, k)
 			delete(t.droppedSeries, k)
 		}
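
A closing note on the trigger condition: a staleness marker is one reserved NaN bit pattern, so value.IsStaleNaN matches only markers written upstream (for example by scrape when a series disappears), never ordinary NaN sample values. A small standalone sketch of the check this patch keys off, using the real model/value package (the main wrapper is only for illustration):

package main

import (
	"fmt"
	"math"

	"github.com/prometheus/prometheus/model/value"
)

func main() {
	// value.StaleNaN is the reserved bit pattern written when a series
	// disappears; value.IsStaleNaN compares float bit patterns exactly.
	marker := math.Float64frombits(value.StaleNaN)

	fmt.Println(value.IsStaleNaN(marker))     // true: Append queues the series for cleanup
	fmt.Println(value.IsStaleNaN(math.NaN())) // false: a plain NaN leaves the cache alone
	fmt.Println(value.IsStaleNaN(42.0))       // false: a real sample cancels pending cleanup
}

The 60s grace period in labelsCacheCleanupLoop (and its TODO) leans on the same mechanism: if the series comes back before the marker ages out, the else-if branch in Append removes it from cleanUpSeries and its cached labels survive.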