rip out interning, except if we have multiple remote writes

Signed-off-by: Callum Styan <callumstyan@gmail.com>
This commit is contained in:
Callum Styan 2024-08-27 10:51:27 -07:00
parent 75e6497ec1
commit 2992aaec8e
7 changed files with 174 additions and 97 deletions

2
go.mod
View file

@ -1,6 +1,6 @@
module github.com/prometheus/prometheus module github.com/prometheus/prometheus
go 1.21.0 go 1.23.0
toolchain go1.22.5 toolchain go1.22.5

View file

@ -0,0 +1,46 @@
goos: linux
goarch: amd64
pkg: github.com/prometheus/prometheus/storage/remote
cpu: AMD Ryzen 9 5950X 16-Core Processor
BenchmarkStoreSeries/plain-32 4168 256049 ns/op 228660 B/op 1596 allocs/op
BenchmarkStoreSeries/plain-32 4500 254302 ns/op 228643 B/op 1596 allocs/op
BenchmarkStoreSeries/plain-32 4552 255622 ns/op 228618 B/op 1596 allocs/op
BenchmarkStoreSeries/plain-32 4460 254980 ns/op 228615 B/op 1596 allocs/op
BenchmarkStoreSeries/plain-32 4506 259763 ns/op 228626 B/op 1596 allocs/op
BenchmarkStoreSeries/externalLabels-32 1262 895862 ns/op 869235 B/op 2596 allocs/op
BenchmarkStoreSeries/externalLabels-32 1324 897306 ns/op 869256 B/op 2597 allocs/op
BenchmarkStoreSeries/externalLabels-32 1297 894041 ns/op 869225 B/op 2597 allocs/op
BenchmarkStoreSeries/externalLabels-32 1315 907147 ns/op 869157 B/op 2596 allocs/op
BenchmarkStoreSeries/externalLabels-32 1207 895866 ns/op 869218 B/op 2597 allocs/op
BenchmarkStoreSeries/relabel-32 1635 784739 ns/op 741725 B/op 5597 allocs/op
BenchmarkStoreSeries/relabel-32 1532 727039 ns/op 741748 B/op 5598 allocs/op
BenchmarkStoreSeries/relabel-32 1604 729110 ns/op 741750 B/op 5597 allocs/op
BenchmarkStoreSeries/relabel-32 1614 729609 ns/op 741696 B/op 5597 allocs/op
BenchmarkStoreSeries/relabel-32 1626 727394 ns/op 741669 B/op 5597 allocs/op
BenchmarkStoreSeries/externalLabels+relabel-32 987 1208797 ns/op 837849 B/op 5597 allocs/op
BenchmarkStoreSeries/externalLabels+relabel-32 985 1197194 ns/op 837867 B/op 5597 allocs/op
BenchmarkStoreSeries/externalLabels+relabel-32 992 1195505 ns/op 837853 B/op 5597 allocs/op
BenchmarkStoreSeries/externalLabels+relabel-32 999 1201181 ns/op 837831 B/op 5597 allocs/op
BenchmarkStoreSeries/externalLabels+relabel-32 1000 1195945 ns/op 837889 B/op 5597 allocs/op
BenchmarkStoreSeries_TwoEndpoints/plain-32 2469 475513 ns/op 436320 B/op 2731 allocs/op
BenchmarkStoreSeries_TwoEndpoints/plain-32 2523 478113 ns/op 436257 B/op 2731 allocs/op
BenchmarkStoreSeries_TwoEndpoints/plain-32 2458 475820 ns/op 436279 B/op 2731 allocs/op
BenchmarkStoreSeries_TwoEndpoints/plain-32 2492 472694 ns/op 436243 B/op 2731 allocs/op
BenchmarkStoreSeries_TwoEndpoints/plain-32 2437 476160 ns/op 436259 B/op 2731 allocs/op
BenchmarkStoreSeries_TwoEndpoints/externalLabels-32 679 1747548 ns/op 1717397 B/op 4731 allocs/op
BenchmarkStoreSeries_TwoEndpoints/externalLabels-32 676 1754213 ns/op 1717468 B/op 4732 allocs/op
BenchmarkStoreSeries_TwoEndpoints/externalLabels-32 674 1739374 ns/op 1717653 B/op 4732 allocs/op
BenchmarkStoreSeries_TwoEndpoints/externalLabels-32 679 1738507 ns/op 1717426 B/op 4732 allocs/op
BenchmarkStoreSeries_TwoEndpoints/externalLabels-32 694 1739284 ns/op 1717384 B/op 4732 allocs/op
BenchmarkStoreSeries_TwoEndpoints/relabel-32 850 1399855 ns/op 1462383 B/op 10733 allocs/op
BenchmarkStoreSeries_TwoEndpoints/relabel-32 834 1405426 ns/op 1462345 B/op 10733 allocs/op
BenchmarkStoreSeries_TwoEndpoints/relabel-32 842 1424322 ns/op 1462449 B/op 10734 allocs/op
BenchmarkStoreSeries_TwoEndpoints/relabel-32 841 1404540 ns/op 1462356 B/op 10733 allocs/op
BenchmarkStoreSeries_TwoEndpoints/relabel-32 843 1414279 ns/op 1462380 B/op 10733 allocs/op
BenchmarkStoreSeries_TwoEndpoints/externalLabels+relabel-32 508 2351248 ns/op 1654492 B/op 10733 allocs/op
BenchmarkStoreSeries_TwoEndpoints/externalLabels+relabel-32 507 2368400 ns/op 1654660 B/op 10734 allocs/op
BenchmarkStoreSeries_TwoEndpoints/externalLabels+relabel-32 505 2347374 ns/op 1654649 B/op 10734 allocs/op
BenchmarkStoreSeries_TwoEndpoints/externalLabels+relabel-32 507 2349636 ns/op 1654516 B/op 10733 allocs/op
BenchmarkStoreSeries_TwoEndpoints/externalLabels+relabel-32 504 2349570 ns/op 1654583 B/op 10733 allocs/op
PASS
ok github.com/prometheus/prometheus/storage/remote 53.470s

View file

@ -23,6 +23,8 @@ import (
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto" "github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/tsdb/chunks"
"go.uber.org/atomic" "go.uber.org/atomic"
) )
@ -35,52 +37,32 @@ var noReferenceReleases = promauto.NewCounter(prometheus.CounterOpts{
type pool struct { type pool struct {
mtx sync.RWMutex mtx sync.RWMutex
pool map[string]*entry pool map[chunks.HeadSeriesRef]*entry
shouldIntern bool
} }
type entry struct { type entry struct {
refs atomic.Int64 refs atomic.Int64
lset labels.Labels
s string
} }
func newEntry(s string) *entry { func newEntry(lset labels.Labels) *entry {
return &entry{s: s} return &entry{lset: lset}
} }
func newPool() *pool { func newPool(shouldIntern bool) *pool {
return &pool{ return &pool{
pool: map[string]*entry{}, pool: map[chunks.HeadSeriesRef]*entry{},
shouldIntern: shouldIntern,
} }
} }
func (p *pool) intern(s string) string { func (p *pool) release(ref chunks.HeadSeriesRef) {
if s == "" { if !p.shouldIntern {
return "" return
} }
p.mtx.RLock() p.mtx.RLock()
interned, ok := p.pool[s] interned, ok := p.pool[ref]
p.mtx.RUnlock()
if ok {
interned.refs.Inc()
return interned.s
}
p.mtx.Lock()
defer p.mtx.Unlock()
if interned, ok := p.pool[s]; ok {
interned.refs.Inc()
return interned.s
}
p.pool[s] = newEntry(s)
p.pool[s].refs.Store(1)
return s
}
func (p *pool) release(s string) {
p.mtx.RLock()
interned, ok := p.pool[s]
p.mtx.RUnlock() p.mtx.RUnlock()
if !ok { if !ok {
@ -98,5 +80,33 @@ func (p *pool) release(s string) {
if interned.refs.Load() != 0 { if interned.refs.Load() != 0 {
return return
} }
delete(p.pool, s) delete(p.pool, ref)
}
func (p *pool) intern(ref chunks.HeadSeriesRef, lset labels.Labels) labels.Labels {
if !p.shouldIntern {
return lset
}
p.mtx.RLock()
interned, ok := p.pool[ref]
p.mtx.RUnlock()
if ok {
interned.refs.Inc()
return interned.lset
}
p.mtx.Lock()
defer p.mtx.Unlock()
if interned, ok := p.pool[ref]; ok {
interned.refs.Inc()
return interned.lset
}
if len(lset) == 0 {
return nil
}
p.pool[ref] = newEntry(lset)
p.pool[ref].refs.Store(1)
return p.pool[ref].lset
} }

View file

@ -24,67 +24,79 @@ import (
"time" "time"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/tsdb/chunks"
) )
func TestIntern(t *testing.T) { func TestIntern(t *testing.T) {
interner := newPool() interner := newPool(true)
testString := "TestIntern" testString := "TestIntern_DeleteRef"
interner.intern(testString) ref := chunks.HeadSeriesRef(1234)
interned, ok := interner.pool[testString]
lset := labels.FromStrings("name", testString)
interner.intern(ref, lset)
interned, ok := interner.pool[ref]
require.True(t, ok) require.True(t, ok)
require.Equal(t, lset, interned.lset)
require.Equal(t, int64(1), interned.refs.Load(), fmt.Sprintf("expected refs to be 1 but it was %d", interned.refs.Load())) require.Equal(t, int64(1), interned.refs.Load(), fmt.Sprintf("expected refs to be 1 but it was %d", interned.refs.Load()))
} }
func TestIntern_MultiRef(t *testing.T) { func TestIntern_MultiRef(t *testing.T) {
interner := newPool() interner := newPool(true)
testString := "TestIntern_MultiRef" testString := "TestIntern_DeleteRef"
ref := chunks.HeadSeriesRef(1234)
interner.intern(testString) lset := labels.FromStrings("name", testString)
interned, ok := interner.pool[testString] interner.intern(ref, lset)
interned, ok := interner.pool[ref]
require.True(t, ok) require.True(t, ok)
require.Equal(t, lset, interned.lset)
require.Equal(t, int64(1), interned.refs.Load(), fmt.Sprintf("expected refs to be 1 but it was %d", interned.refs.Load())) require.Equal(t, int64(1), interned.refs.Load(), fmt.Sprintf("expected refs to be 1 but it was %d", interned.refs.Load()))
interner.intern(testString) interner.intern(ref, lset)
interned, ok = interner.pool[testString] interned, ok = interner.pool[ref]
require.True(t, ok) require.NotNil(t, interned)
require.Equal(t, int64(2), interned.refs.Load(), fmt.Sprintf("expected refs to be 2 but it was %d", interned.refs.Load())) require.Equal(t, int64(2), interned.refs.Load(), fmt.Sprintf("expected refs to be 2 but it was %d", interned.refs.Load()))
} }
func TestIntern_DeleteRef(t *testing.T) { func TestIntern_DeleteRef(t *testing.T) {
interner := newPool() interner := newPool(true)
testString := "TestIntern_DeleteRef" testString := "TestIntern_DeleteRef"
ref := chunks.HeadSeriesRef(1234)
interner.intern(ref, labels.FromStrings("name", testString))
interned, ok := interner.pool[ref]
interner.intern(testString) require.NotNil(t, interned)
interned, ok := interner.pool[testString]
require.True(t, ok)
require.Equal(t, int64(1), interned.refs.Load(), fmt.Sprintf("expected refs to be 1 but it was %d", interned.refs.Load())) require.Equal(t, int64(1), interned.refs.Load(), fmt.Sprintf("expected refs to be 1 but it was %d", interned.refs.Load()))
interner.release(testString) interner.release(ref)
_, ok = interner.pool[testString] _, ok = interner.pool[ref]
require.False(t, ok) require.False(t, ok)
} }
func TestIntern_MultiRef_Concurrent(t *testing.T) { func TestIntern_MultiRef_Concurrent(t *testing.T) {
interner := newPool() interner := newPool(true)
testString := "TestIntern_MultiRef_Concurrent" testString := "TestIntern_MultiRef_Concurrent"
ref := chunks.HeadSeriesRef(1234)
interner.intern(testString) interner.intern(ref, labels.FromStrings("name", testString))
interned, ok := interner.pool[testString] interned, ok := interner.pool[ref]
require.True(t, ok)
require.NotNil(t, interned)
require.Equal(t, int64(1), interned.refs.Load(), fmt.Sprintf("expected refs to be 1 but it was %d", interned.refs.Load())) require.Equal(t, int64(1), interned.refs.Load(), fmt.Sprintf("expected refs to be 1 but it was %d", interned.refs.Load()))
go interner.release(testString) go interner.release(ref)
interner.intern(testString) interner.intern(ref, labels.FromStrings("name", testString))
time.Sleep(time.Millisecond) time.Sleep(time.Millisecond)
interner.mtx.RLock() interner.mtx.RLock()
interned, ok = interner.pool[testString] interned, ok = interner.pool[ref]
interner.mtx.RUnlock() interner.mtx.RUnlock()
require.True(t, ok) require.True(t, ok)
require.Equal(t, int64(1), interned.refs.Load(), fmt.Sprintf("expected refs to be 1 but it was %d", interned.refs.Load())) require.Equal(t, int64(1), interned.refs.Load(), fmt.Sprintf("expected refs to be 1 but it was %d", interned.refs.Load()))

View file

@ -702,8 +702,13 @@ outer:
continue continue
} }
t.seriesMtx.Lock() t.seriesMtx.Lock()
lbls, ok := t.seriesLabels[s.Ref] var lbls labels.Labels
if !ok { if t.interner.shouldIntern {
lbls = t.interner.intern(s.Ref, nil)
} else {
lbls = t.seriesLabels[s.Ref]
}
if len(lbls) == 0 {
t.dataDropped.incr(1) t.dataDropped.incr(1)
if _, ok := t.droppedSeries[s.Ref]; !ok { if _, ok := t.droppedSeries[s.Ref]; !ok {
level.Info(t.logger).Log("msg", "Dropped sample for series that was not explicitly dropped via relabelling", "ref", s.Ref) level.Info(t.logger).Log("msg", "Dropped sample for series that was not explicitly dropped via relabelling", "ref", s.Ref)
@ -764,8 +769,13 @@ outer:
continue continue
} }
t.seriesMtx.Lock() t.seriesMtx.Lock()
lbls, ok := t.seriesLabels[e.Ref] var lbls labels.Labels
if !ok { if t.interner.shouldIntern {
lbls = t.interner.intern(e.Ref, nil)
} else {
lbls = t.seriesLabels[e.Ref]
}
if len(lbls) == 0 {
// Track dropped exemplars in the same EWMA for sharding calc. // Track dropped exemplars in the same EWMA for sharding calc.
t.dataDropped.incr(1) t.dataDropped.incr(1)
if _, ok := t.droppedSeries[e.Ref]; !ok { if _, ok := t.droppedSeries[e.Ref]; !ok {
@ -821,8 +831,13 @@ outer:
continue continue
} }
t.seriesMtx.Lock() t.seriesMtx.Lock()
lbls, ok := t.seriesLabels[h.Ref] var lbls labels.Labels
if !ok { if t.interner.shouldIntern {
lbls = t.interner.intern(h.Ref, nil)
} else {
lbls = t.seriesLabels[h.Ref]
}
if len(lbls) == 0 {
t.dataDropped.incr(1) t.dataDropped.incr(1)
if _, ok := t.droppedSeries[h.Ref]; !ok { if _, ok := t.droppedSeries[h.Ref]; !ok {
level.Info(t.logger).Log("msg", "Dropped histogram for series that was not explicitly dropped via relabelling", "ref", h.Ref) level.Info(t.logger).Log("msg", "Dropped histogram for series that was not explicitly dropped via relabelling", "ref", h.Ref)
@ -876,8 +891,13 @@ outer:
continue continue
} }
t.seriesMtx.Lock() t.seriesMtx.Lock()
lbls, ok := t.seriesLabels[h.Ref] var lbls labels.Labels
if !ok { if t.interner.shouldIntern {
lbls = t.interner.intern(h.Ref, nil)
} else {
lbls = t.seriesLabels[h.Ref]
}
if len(lbls) == 0 {
t.dataDropped.incr(1) t.dataDropped.incr(1)
if _, ok := t.droppedSeries[h.Ref]; !ok { if _, ok := t.droppedSeries[h.Ref]; !ok {
level.Info(t.logger).Log("msg", "Dropped histogram for series that was not explicitly dropped via relabelling", "ref", h.Ref) level.Info(t.logger).Log("msg", "Dropped histogram for series that was not explicitly dropped via relabelling", "ref", h.Ref)
@ -960,9 +980,6 @@ func (t *QueueManager) Stop() {
// On shutdown, release the strings in the labels from the intern pool. // On shutdown, release the strings in the labels from the intern pool.
t.seriesMtx.Lock() t.seriesMtx.Lock()
for _, labels := range t.seriesLabels {
t.releaseLabels(labels)
}
t.seriesMtx.Unlock() t.seriesMtx.Unlock()
t.metrics.unregister() t.metrics.unregister()
} }
@ -985,16 +1002,12 @@ func (t *QueueManager) StoreSeries(series []record.RefSeries, index int) {
continue continue
} }
lbls := t.builder.Labels() lbls := t.builder.Labels()
t.internLabels(lbls) if t.interner.shouldIntern {
t.interner.intern(s.Ref, lbls)
// We should not ever be replacing a series labels in the map, but just } else {
// in case we do we need to ensure we do not leak the replaced interned
// strings.
if orig, ok := t.seriesLabels[s.Ref]; ok {
t.releaseLabels(orig)
}
t.seriesLabels[s.Ref] = lbls t.seriesLabels[s.Ref] = lbls
} }
}
} }
// StoreMetadata keeps track of known series' metadata for lookups when sending samples to remote. // StoreMetadata keeps track of known series' metadata for lookups when sending samples to remote.
@ -1037,8 +1050,9 @@ func (t *QueueManager) SeriesReset(index int) {
for k, v := range t.seriesSegmentIndexes { for k, v := range t.seriesSegmentIndexes {
if v < index { if v < index {
delete(t.seriesSegmentIndexes, k) delete(t.seriesSegmentIndexes, k)
t.releaseLabels(t.seriesLabels[k]) t.interner.release(k)
delete(t.seriesLabels, k) //t.releaseLabels(t.seriesLabels[k])
//delete(t.seriesLabels, k)
delete(t.seriesMetadata, k) delete(t.seriesMetadata, k)
delete(t.droppedSeries, k) delete(t.droppedSeries, k)
} }
@ -1059,14 +1073,6 @@ func (t *QueueManager) client() WriteClient {
return t.storeClient return t.storeClient
} }
func (t *QueueManager) internLabels(lbls labels.Labels) {
lbls.InternStrings(t.interner.intern)
}
func (t *QueueManager) releaseLabels(ls labels.Labels) {
ls.ReleaseStrings(t.interner.release)
}
// processExternalLabels merges externalLabels into b. If b contains // processExternalLabels merges externalLabels into b. If b contains
// a label in externalLabels, the value in b wins. // a label in externalLabels, the value in b wins.
func processExternalLabels(b *labels.Builder, externalLabels []labels.Label) { func processExternalLabels(b *labels.Builder, externalLabels []labels.Label) {

View file

@ -294,7 +294,6 @@ func TestSampleDelivery(t *testing.T) {
qm.AppendHistograms(histograms[:len(histograms)/2]) qm.AppendHistograms(histograms[:len(histograms)/2])
qm.AppendFloatHistograms(floatHistograms[:len(floatHistograms)/2]) qm.AppendFloatHistograms(floatHistograms[:len(floatHistograms)/2])
c.waitForExpectedData(t, 30*time.Second) c.waitForExpectedData(t, 30*time.Second)
// Send second half of data. // Send second half of data.
c.expectSamples(samples[len(samples)/2:], series) c.expectSamples(samples[len(samples)/2:], series)
c.expectExemplars(exemplars[len(exemplars)/2:], series) c.expectExemplars(exemplars[len(exemplars)/2:], series)
@ -319,7 +318,7 @@ func newTestClientAndQueueManager(t testing.TB, flushDeadline time.Duration, pro
func newTestQueueManager(t testing.TB, cfg config.QueueConfig, mcfg config.MetadataConfig, deadline time.Duration, c WriteClient, protoMsg config.RemoteWriteProtoMsg) *QueueManager { func newTestQueueManager(t testing.TB, cfg config.QueueConfig, mcfg config.MetadataConfig, deadline time.Duration, c WriteClient, protoMsg config.RemoteWriteProtoMsg) *QueueManager {
dir := t.TempDir() dir := t.TempDir()
metrics := newQueueManagerMetrics(nil, "", "") metrics := newQueueManagerMetrics(nil, "", "")
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, deadline, newPool(), newHighestTimestampMetric(), nil, false, false, protoMsg) m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, deadline, newPool(false), newHighestTimestampMetric(), nil, false, false, protoMsg)
return m return m
} }
@ -772,7 +771,7 @@ func TestDisableReshardOnRetry(t *testing.T) {
} }
) )
m := NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, client, 0, newPool(), newHighestTimestampMetric(), nil, false, false, config.RemoteWriteProtoMsgV1) m := NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, client, 0, newPool(false), newHighestTimestampMetric(), nil, false, false, config.RemoteWriteProtoMsgV1)
m.StoreSeries(fakeSeries, 0) m.StoreSeries(fakeSeries, 0)
// Attempt to samples while the manager is running. We immediately stop the // Attempt to samples while the manager is running. We immediately stop the
@ -1385,7 +1384,7 @@ func BenchmarkStoreSeries(b *testing.B) {
cfg := config.DefaultQueueConfig cfg := config.DefaultQueueConfig
mcfg := config.DefaultMetadataConfig mcfg := config.DefaultMetadataConfig
metrics := newQueueManagerMetrics(nil, "", "") metrics := newQueueManagerMetrics(nil, "", "")
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, config.RemoteWriteProtoMsgV1) m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(false), newHighestTimestampMetric(), nil, false, false, config.RemoteWriteProtoMsgV1)
m.externalLabels = tc.externalLabels m.externalLabels = tc.externalLabels
m.relabelConfigs = tc.relabelConfigs m.relabelConfigs = tc.relabelConfigs
@ -1437,13 +1436,13 @@ func BenchmarkStoreSeries_TwoEndpoints(b *testing.B) {
cfg := config.DefaultQueueConfig cfg := config.DefaultQueueConfig
mcfg := config.DefaultMetadataConfig mcfg := config.DefaultMetadataConfig
metrics := newQueueManagerMetrics(nil, "", "") metrics := newQueueManagerMetrics(nil, "", "")
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, config.RemoteWriteProtoMsgV1) m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(false), newHighestTimestampMetric(), nil, false, false, config.RemoteWriteProtoMsgV1)
m.externalLabels = tc.externalLabels m.externalLabels = tc.externalLabels
m.relabelConfigs = tc.relabelConfigs m.relabelConfigs = tc.relabelConfigs
m.StoreSeries(series, 0) m.StoreSeries(series, 0)
m2 := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, config.RemoteWriteProtoMsgV1) m2 := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(false), newHighestTimestampMetric(), nil, false, false, config.RemoteWriteProtoMsgV1)
m2.externalLabels = tc.externalLabels m2.externalLabels = tc.externalLabels
m2.relabelConfigs = tc.relabelConfigs m2.relabelConfigs = tc.relabelConfigs
@ -1483,7 +1482,7 @@ func BenchmarkStartup(b *testing.B) {
// todo: test with new proto type(s) // todo: test with new proto type(s)
m := NewQueueManager(metrics, nil, nil, logger, dir, m := NewQueueManager(metrics, nil, nil, logger, dir,
newEWMARate(ewmaWeight, shardUpdateDuration), newEWMARate(ewmaWeight, shardUpdateDuration),
cfg, mcfg, labels.EmptyLabels(), nil, c, 1*time.Minute, newPool(), newHighestTimestampMetric(), nil, false, false, config.RemoteWriteProtoMsgV1) cfg, mcfg, labels.EmptyLabels(), nil, c, 1*time.Minute, newPool(false), newHighestTimestampMetric(), nil, false, false, config.RemoteWriteProtoMsgV1)
m.watcher.SetStartTime(timestamp.Time(math.MaxInt64)) m.watcher.SetStartTime(timestamp.Time(math.MaxInt64))
m.watcher.MaxSegment = segments[len(segments)-2] m.watcher.MaxSegment = segments[len(segments)-2]
err := m.watcher.Run() err := m.watcher.Run()

View file

@ -91,7 +91,7 @@ func NewWriteStorage(logger log.Logger, reg prometheus.Registerer, dir string, f
flushDeadline: flushDeadline, flushDeadline: flushDeadline,
samplesIn: newEWMARate(ewmaWeight, shardUpdateDuration), samplesIn: newEWMARate(ewmaWeight, shardUpdateDuration),
dir: dir, dir: dir,
interner: newPool(), interner: newPool(false),
scraper: sm, scraper: sm,
quit: make(chan struct{}), quit: make(chan struct{}),
metadataInWAL: metadataInWal, metadataInWAL: metadataInWal,
@ -227,6 +227,10 @@ func (rws *WriteStorage) ApplyConfig(conf *config.Config) error {
q.Stop() q.Stop()
} }
if len(newHashes) > 1 {
rws.interner.shouldIntern = true
}
for _, hash := range newHashes { for _, hash := range newHashes {
newQueues[hash].Start() newQueues[hash].Start()
} }