diff --git a/storage/remote/queue_manager.go b/storage/remote/queue_manager.go
index 3edd31b918..2aa5d055fc 100644
--- a/storage/remote/queue_manager.go
+++ b/storage/remote/queue_manager.go
@@ -17,6 +17,7 @@ import (
 	"context"
 	"errors"
 	"math"
+	"runtime"
 	"strconv"
 	"sync"
 	"time"
@@ -35,6 +36,7 @@ import (
 	"github.com/prometheus/prometheus/model/histogram"
 	"github.com/prometheus/prometheus/model/labels"
 	"github.com/prometheus/prometheus/model/relabel"
+	"github.com/prometheus/prometheus/model/value"
 	"github.com/prometheus/prometheus/prompb"
 	"github.com/prometheus/prometheus/scrape"
 	"github.com/prometheus/prometheus/tsdb/chunks"
@@ -407,9 +409,12 @@ type QueueManager struct {
 	storeClient WriteClient
 
 	seriesMtx     sync.Mutex // Covers seriesLabels and droppedSeries.
-	seriesLabels  map[chunks.HeadSeriesRef]labels.Labels
+	seriesLabels  map[chunks.HeadSeriesRef]*labels.Labels
 	droppedSeries map[chunks.HeadSeriesRef]struct{}
 
+	cleanUpLock   sync.Mutex                     // Covers cleanUpSeries.
+	cleanUpSeries map[chunks.HeadSeriesRef]int64 // Unix time (seconds) at which a stale sample was last appended, per series.
+
 	seriesSegmentMtx     sync.Mutex // Covers seriesSegmentIndexes - if you also lock seriesMtx, take seriesMtx first.
 	seriesSegmentIndexes map[chunks.HeadSeriesRef]int
 
@@ -472,9 +477,10 @@ func NewQueueManager(
 		sendExemplars:        enableExemplarRemoteWrite,
 		sendNativeHistograms: enableNativeHistogramRemoteWrite,
 
-		seriesLabels:         make(map[chunks.HeadSeriesRef]labels.Labels),
+		seriesLabels:         make(map[chunks.HeadSeriesRef]*labels.Labels),
 		seriesSegmentIndexes: make(map[chunks.HeadSeriesRef]int),
 		droppedSeries:        make(map[chunks.HeadSeriesRef]struct{}),
+		cleanUpSeries:        make(map[chunks.HeadSeriesRef]int64),
 
 		numShards:   cfg.MinShards,
 		reshardChan: make(chan int),
@@ -499,6 +505,88 @@ func NewQueueManager(
 	return t
 }
 
+// labelsCacheCleanupLoop periodically evicts the cached labels of series whose
+// most recently appended sample was a staleness marker. It returns once t.quit
+// is closed.
+func (t *QueueManager) labelsCacheCleanupLoop() {
+	defer t.wg.Done()
+
+	// TODO: make these configurable, or derive the grace period from how long
+	// a staleness marker stays relevant.
+	const (
+		cleanupInterval   = 1 * time.Minute
+		staleGraceSeconds = 60
+	)
+
+	ticker := time.NewTicker(cleanupInterval)
+	defer ticker.Stop()
+
+	for {
+		select {
+		case <-ticker.C:
+			if t.evictStaleSeriesLabels(time.Now().Unix(), staleGraceSeconds) > 0 {
+				// The caches were rebuilt to shed their oversized buckets;
+				// collect the old maps now. This runs after the locks are
+				// released so Append is not stalled behind a full GC cycle.
+				runtime.GC()
+			}
+		case <-t.quit:
+			return
+		}
+	}
+}
+
+// evictStaleSeriesLabels removes every series that was marked stale more than
+// graceSeconds before now (Unix seconds) from cleanUpSeries and seriesLabels,
+// and returns how many series were removed.
+//
+// NOTE(review): once a ref is evicted, later samples for it will miss the
+// seriesLabels lookup in Append; confirm that is acceptable for series that
+// resume after the grace period.
+func (t *QueueManager) evictStaleSeriesLabels(now, graceSeconds int64) int {
+	t.cleanUpLock.Lock()
+	defer t.cleanUpLock.Unlock()
+	if len(t.cleanUpSeries) == 0 {
+		return 0
+	}
+	t.seriesMtx.Lock()
+	defer t.seriesMtx.Unlock()
+
+	removed := 0
+	for ref, staleSince := range t.cleanUpSeries {
+		if now-staleSince <= graceSeconds {
+			continue
+		}
+		if lbls, ok := t.seriesLabels[ref]; ok {
+			// Release the interned strings so they are not leaked, as
+			// Stop, StoreSeries and SeriesReset do when dropping labels.
+			t.releaseLabels(*lbls)
+			delete(t.seriesLabels, ref)
+		}
+		delete(t.cleanUpSeries, ref)
+		removed++
+	}
+	if removed == 0 {
+		return 0
+	}
+
+	// Deleting keys does not release a map's bucket memory, so copy the
+	// survivors into right-sized maps and let the old ones be collected.
+	remaining := make(map[chunks.HeadSeriesRef]int64, len(t.cleanUpSeries))
+	for ref, staleSince := range t.cleanUpSeries {
+		remaining[ref] = staleSince
+	}
+	t.cleanUpSeries = remaining
+
+	compacted := make(map[chunks.HeadSeriesRef]*labels.Labels, len(t.seriesLabels))
+	for ref, lbls := range t.seriesLabels {
+		compacted[ref] = lbls
+	}
+	t.seriesLabels = compacted
+
+	return removed
+}
+
 // AppendMetadata sends metadata to the remote storage. Metadata is sent in batches, but is not parallelized.
 func (t *QueueManager) AppendMetadata(ctx context.Context, metadata []scrape.MetricMetadata) {
 	mm := make([]prompb.MetricMetadata, 0, len(metadata))
@@ -575,6 +663,14 @@ func (t *QueueManager) sendMetadataWithBackoff(ctx context.Context, metadata []p
 func (t *QueueManager) Append(samples []record.RefSample) bool {
 outer:
 	for _, s := range samples {
+		t.cleanUpLock.Lock()
+		if value.IsStaleNaN(s.V) {
+			t.cleanUpSeries[s.Ref] = time.Now().Unix()
+		} else {
+			delete(t.cleanUpSeries, s.Ref)
+		}
+		t.cleanUpLock.Unlock()
+
 		t.seriesMtx.Lock()
 		lbls, ok := t.seriesLabels[s.Ref]
 		if !ok {
@@ -599,7 +695,7 @@ outer:
 			default:
 			}
 			if t.shards.enqueue(s.Ref, timeSeries{
-				seriesLabels: lbls,
+				seriesLabels: *lbls,
 				timestamp:    s.T,
 				value:        s.V,
 				sType:        tSample,
@@ -649,7 +745,7 @@ outer:
 			default:
 			}
 			if t.shards.enqueue(e.Ref, timeSeries{
-				seriesLabels:   lbls,
+				seriesLabels:   *lbls,
 				timestamp:      e.T,
 				value:          e.V,
 				exemplarLabels: e.Labels,
@@ -697,7 +793,7 @@ outer:
 			default:
 			}
 			if t.shards.enqueue(h.Ref, timeSeries{
-				seriesLabels: lbls,
+				seriesLabels: *lbls,
 				timestamp:    h.T,
 				histogram:    h.H,
 				sType:        tHistogram,
@@ -744,7 +840,7 @@ outer:
 			default:
 			}
 			if t.shards.enqueue(h.Ref, timeSeries{
-				seriesLabels:   lbls,
+				seriesLabels:   *lbls,
 				timestamp:      h.T,
 				floatHistogram: h.FH,
 				sType:          tFloatHistogram,
@@ -780,9 +876,10 @@ func (t *QueueManager) Start() {
 		t.metadataWatcher.Start()
 	}
 
-	t.wg.Add(2)
+	t.wg.Add(3)
 	go t.updateShardsLoop()
 	go t.reshardLoop()
+	go t.labelsCacheCleanupLoop()
 }
 
 // Stop stops sending samples to the remote storage and waits for pending
@@ -805,7 +902,7 @@ func (t *QueueManager) Stop() {
 	// On shutdown, release the strings in the labels from the intern pool.
 	t.seriesMtx.Lock()
 	for _, labels := range t.seriesLabels {
-		t.releaseLabels(labels)
+		t.releaseLabels(*labels)
 	}
 	t.seriesMtx.Unlock()
 	t.metrics.unregister()
@@ -833,9 +930,9 @@ func (t *QueueManager) StoreSeries(series []record.RefSeries, index int) {
 		// in case we do we need to ensure we do not leak the replaced interned
 		// strings.
 		if orig, ok := t.seriesLabels[s.Ref]; ok {
-			t.releaseLabels(orig)
+			t.releaseLabels(*orig)
 		}
-		t.seriesLabels[s.Ref] = lbls
+		t.seriesLabels[s.Ref] = &lbls
 	}
 }
 
@@ -862,7 +959,11 @@ func (t *QueueManager) SeriesReset(index int) {
 	for k, v := range t.seriesSegmentIndexes {
 		if v < index {
 			delete(t.seriesSegmentIndexes, k)
-			t.releaseLabels(t.seriesLabels[k])
+			// k may be absent from seriesLabels (for example after the cleanup
+			// loop evicted it); the lookup then yields a nil pointer.
+			if lbls, ok := t.seriesLabels[k]; ok {
+				t.releaseLabels(*lbls)
+			}
 			delete(t.seriesLabels, k)
 			delete(t.droppedSeries, k)
 		}