diff --git a/storage/remote/queue_manager.go b/storage/remote/queue_manager.go index e37ec8c70..4b0c72cf6 100644 --- a/storage/remote/queue_manager.go +++ b/storage/remote/queue_manager.go @@ -413,9 +413,10 @@ type QueueManager struct { clientMtx sync.RWMutex storeClient WriteClient - seriesMtx sync.Mutex // Covers seriesLabels and droppedSeries. + seriesMtx sync.Mutex // Covers seriesLabels, droppedSeries and builder. seriesLabels map[chunks.HeadSeriesRef]labels.Labels droppedSeries map[chunks.HeadSeriesRef]struct{} + builder *labels.Builder seriesSegmentMtx sync.Mutex // Covers seriesSegmentIndexes - if you also lock seriesMtx, take seriesMtx first. seriesSegmentIndexes map[chunks.HeadSeriesRef]int @@ -482,6 +483,7 @@ func NewQueueManager( seriesLabels: make(map[chunks.HeadSeriesRef]labels.Labels), seriesSegmentIndexes: make(map[chunks.HeadSeriesRef]int), droppedSeries: make(map[chunks.HeadSeriesRef]struct{}), + builder: labels.NewBuilder(labels.EmptyLabels()), numShards: cfg.MinShards, reshardChan: make(chan int), @@ -897,12 +899,14 @@ func (t *QueueManager) StoreSeries(series []record.RefSeries, index int) { // Just make sure all the Refs of Series will insert into seriesSegmentIndexes map for tracking. t.seriesSegmentIndexes[s.Ref] = index - ls := processExternalLabels(s.Labels, t.externalLabels) - lbls, keep := relabel.Process(ls, t.relabelConfigs...) - if !keep || lbls.IsEmpty() { + t.builder.Reset(s.Labels) + processExternalLabels(t.builder, t.externalLabels) + keep := relabel.ProcessBuilder(t.builder, t.relabelConfigs...) + if !keep { t.droppedSeries[s.Ref] = struct{}{} continue } + lbls := t.builder.Labels() t.internLabels(lbls) // We should not ever be replacing a series labels in the map, but just @@ -967,30 +971,14 @@ func (t *QueueManager) releaseLabels(ls labels.Labels) { ls.ReleaseStrings(t.interner.release) } -// processExternalLabels merges externalLabels into ls. If ls contains -// a label in externalLabels, the value in ls wins. -func processExternalLabels(ls labels.Labels, externalLabels []labels.Label) labels.Labels { - if len(externalLabels) == 0 { - return ls - } - - b := labels.NewScratchBuilder(ls.Len() + len(externalLabels)) - j := 0 - ls.Range(func(l labels.Label) { - for j < len(externalLabels) && l.Name > externalLabels[j].Name { - b.Add(externalLabels[j].Name, externalLabels[j].Value) - j++ +// processExternalLabels merges externalLabels into b. If b contains +// a label in externalLabels, the value in b wins. +func processExternalLabels(b *labels.Builder, externalLabels []labels.Label) { + for _, el := range externalLabels { + if b.Get(el.Name) == "" { + b.Set(el.Name, el.Value) } - if j < len(externalLabels) && l.Name == externalLabels[j].Name { - j++ - } - b.Add(l.Name, l.Value) - }) - for ; j < len(externalLabels); j++ { - b.Add(externalLabels[j].Name, externalLabels[j].Value) } - - return b.Labels() } func (t *QueueManager) updateShardsLoop() { diff --git a/storage/remote/queue_manager_test.go b/storage/remote/queue_manager_test.go index cd3c39d9d..8a6f68208 100644 --- a/storage/remote/queue_manager_test.go +++ b/storage/remote/queue_manager_test.go @@ -1013,7 +1013,8 @@ func BenchmarkStartup(b *testing.B) { } func TestProcessExternalLabels(t *testing.T) { - for _, tc := range []struct { + b := labels.NewBuilder(labels.EmptyLabels()) + for i, tc := range []struct { labels labels.Labels externalLabels []labels.Label expected labels.Labels @@ -1074,7 +1075,9 @@ func TestProcessExternalLabels(t *testing.T) { expected: labels.FromStrings("a", "b", "c", "d", "e", "f"), }, } { - require.Equal(t, tc.expected, processExternalLabels(tc.labels, tc.externalLabels)) + b.Reset(tc.labels) + processExternalLabels(b, tc.externalLabels) + require.Equal(t, tc.expected, b.Labels(), "test %d", i) } }