remove option of using the intern pool

Signed-off-by: Callum Styan <callumstyan@gmail.com>
This commit is contained in:
Callum Styan 2024-08-28 12:43:57 -07:00
parent 4fa20fc0fe
commit 455377ce53
5 changed files with 14 additions and 95 deletions

View file

@ -1,46 +0,0 @@
goos: linux
goarch: amd64
pkg: github.com/prometheus/prometheus/storage/remote
cpu: AMD Ryzen 9 5950X 16-Core Processor
BenchmarkStoreSeries/plain-32 4168 256049 ns/op 228660 B/op 1596 allocs/op
BenchmarkStoreSeries/plain-32 4500 254302 ns/op 228643 B/op 1596 allocs/op
BenchmarkStoreSeries/plain-32 4552 255622 ns/op 228618 B/op 1596 allocs/op
BenchmarkStoreSeries/plain-32 4460 254980 ns/op 228615 B/op 1596 allocs/op
BenchmarkStoreSeries/plain-32 4506 259763 ns/op 228626 B/op 1596 allocs/op
BenchmarkStoreSeries/externalLabels-32 1262 895862 ns/op 869235 B/op 2596 allocs/op
BenchmarkStoreSeries/externalLabels-32 1324 897306 ns/op 869256 B/op 2597 allocs/op
BenchmarkStoreSeries/externalLabels-32 1297 894041 ns/op 869225 B/op 2597 allocs/op
BenchmarkStoreSeries/externalLabels-32 1315 907147 ns/op 869157 B/op 2596 allocs/op
BenchmarkStoreSeries/externalLabels-32 1207 895866 ns/op 869218 B/op 2597 allocs/op
BenchmarkStoreSeries/relabel-32 1635 784739 ns/op 741725 B/op 5597 allocs/op
BenchmarkStoreSeries/relabel-32 1532 727039 ns/op 741748 B/op 5598 allocs/op
BenchmarkStoreSeries/relabel-32 1604 729110 ns/op 741750 B/op 5597 allocs/op
BenchmarkStoreSeries/relabel-32 1614 729609 ns/op 741696 B/op 5597 allocs/op
BenchmarkStoreSeries/relabel-32 1626 727394 ns/op 741669 B/op 5597 allocs/op
BenchmarkStoreSeries/externalLabels+relabel-32 987 1208797 ns/op 837849 B/op 5597 allocs/op
BenchmarkStoreSeries/externalLabels+relabel-32 985 1197194 ns/op 837867 B/op 5597 allocs/op
BenchmarkStoreSeries/externalLabels+relabel-32 992 1195505 ns/op 837853 B/op 5597 allocs/op
BenchmarkStoreSeries/externalLabels+relabel-32 999 1201181 ns/op 837831 B/op 5597 allocs/op
BenchmarkStoreSeries/externalLabels+relabel-32 1000 1195945 ns/op 837889 B/op 5597 allocs/op
BenchmarkStoreSeries_TwoEndpoints/plain-32 2469 475513 ns/op 436320 B/op 2731 allocs/op
BenchmarkStoreSeries_TwoEndpoints/plain-32 2523 478113 ns/op 436257 B/op 2731 allocs/op
BenchmarkStoreSeries_TwoEndpoints/plain-32 2458 475820 ns/op 436279 B/op 2731 allocs/op
BenchmarkStoreSeries_TwoEndpoints/plain-32 2492 472694 ns/op 436243 B/op 2731 allocs/op
BenchmarkStoreSeries_TwoEndpoints/plain-32 2437 476160 ns/op 436259 B/op 2731 allocs/op
BenchmarkStoreSeries_TwoEndpoints/externalLabels-32 679 1747548 ns/op 1717397 B/op 4731 allocs/op
BenchmarkStoreSeries_TwoEndpoints/externalLabels-32 676 1754213 ns/op 1717468 B/op 4732 allocs/op
BenchmarkStoreSeries_TwoEndpoints/externalLabels-32 674 1739374 ns/op 1717653 B/op 4732 allocs/op
BenchmarkStoreSeries_TwoEndpoints/externalLabels-32 679 1738507 ns/op 1717426 B/op 4732 allocs/op
BenchmarkStoreSeries_TwoEndpoints/externalLabels-32 694 1739284 ns/op 1717384 B/op 4732 allocs/op
BenchmarkStoreSeries_TwoEndpoints/relabel-32 850 1399855 ns/op 1462383 B/op 10733 allocs/op
BenchmarkStoreSeries_TwoEndpoints/relabel-32 834 1405426 ns/op 1462345 B/op 10733 allocs/op
BenchmarkStoreSeries_TwoEndpoints/relabel-32 842 1424322 ns/op 1462449 B/op 10734 allocs/op
BenchmarkStoreSeries_TwoEndpoints/relabel-32 841 1404540 ns/op 1462356 B/op 10733 allocs/op
BenchmarkStoreSeries_TwoEndpoints/relabel-32 843 1414279 ns/op 1462380 B/op 10733 allocs/op
BenchmarkStoreSeries_TwoEndpoints/externalLabels+relabel-32 508 2351248 ns/op 1654492 B/op 10733 allocs/op
BenchmarkStoreSeries_TwoEndpoints/externalLabels+relabel-32 507 2368400 ns/op 1654660 B/op 10734 allocs/op
BenchmarkStoreSeries_TwoEndpoints/externalLabels+relabel-32 505 2347374 ns/op 1654649 B/op 10734 allocs/op
BenchmarkStoreSeries_TwoEndpoints/externalLabels+relabel-32 507 2349636 ns/op 1654516 B/op 10733 allocs/op
BenchmarkStoreSeries_TwoEndpoints/externalLabels+relabel-32 504 2349570 ns/op 1654583 B/op 10733 allocs/op
PASS
ok github.com/prometheus/prometheus/storage/remote 53.470s

View file

@ -59,6 +59,7 @@ func TestIntern_MultiRef(t *testing.T) {
interner.intern(ref, lset)
interned, ok = interner.pool[ref]
require.True(t, ok)
require.NotNil(t, interned)
require.Equal(t, int64(2), interned.refs.Load(), fmt.Sprintf("expected refs to be 2 but it was %d", interned.refs.Load()))
}
@ -71,6 +72,7 @@ func TestIntern_DeleteRef(t *testing.T) {
interned, ok := interner.pool[ref]
require.NotNil(t, interned)
require.True(t, ok)
require.Equal(t, int64(1), interned.refs.Load(), fmt.Sprintf("expected refs to be 1 but it was %d", interned.refs.Load()))
interner.release(ref)
@ -87,6 +89,7 @@ func TestIntern_MultiRef_Concurrent(t *testing.T) {
interned, ok := interner.pool[ref]
require.NotNil(t, interned)
require.True(t, ok)
require.Equal(t, int64(1), interned.refs.Load(), fmt.Sprintf("expected refs to be 1 but it was %d", interned.refs.Load()))
go interner.release(ref)

View file

@ -441,7 +441,6 @@ type QueueManager struct {
dataIn, dataDropped, dataOut, dataOutDuration *ewmaRate
metrics *queueManagerMetrics
interner *pool
highestRecvTimestamp *maxTimestamp
}
@ -463,7 +462,6 @@ func NewQueueManager(
relabelConfigs []*relabel.Config,
client WriteClient,
flushDeadline time.Duration,
interner *pool,
highestRecvTimestamp *maxTimestamp,
sm ReadyScrapeManager,
enableExemplarRemoteWrite bool,
@ -508,7 +506,6 @@ func NewQueueManager(
dataOutDuration: newEWMARate(ewmaWeight, shardUpdateDuration),
metrics: metrics,
interner: interner,
highestRecvTimestamp: highestRecvTimestamp,
protoMsg: protoMsg,
@ -702,12 +699,7 @@ outer:
continue
}
t.seriesMtx.Lock()
var lbls labels.Labels
if t.interner.shouldIntern {
lbls = t.interner.intern(s.Ref, labels.EmptyLabels())
} else {
lbls = t.seriesLabels[s.Ref]
}
lbls := t.seriesLabels[s.Ref]
if lbls.Len() == 0 {
t.dataDropped.incr(1)
if _, ok := t.droppedSeries[s.Ref]; !ok {
@ -769,12 +761,7 @@ outer:
continue
}
t.seriesMtx.Lock()
var lbls labels.Labels
if t.interner.shouldIntern {
lbls = t.interner.intern(e.Ref, labels.EmptyLabels())
} else {
lbls = t.seriesLabels[e.Ref]
}
lbls := t.seriesLabels[e.Ref]
if lbls.Len() == 0 {
// Track dropped exemplars in the same EWMA for sharding calc.
t.dataDropped.incr(1)
@ -831,12 +818,7 @@ outer:
continue
}
t.seriesMtx.Lock()
var lbls labels.Labels
if t.interner.shouldIntern {
lbls = t.interner.intern(h.Ref, labels.EmptyLabels())
} else {
lbls = t.seriesLabels[h.Ref]
}
lbls := t.seriesLabels[h.Ref]
if lbls.Len() == 0 {
t.dataDropped.incr(1)
if _, ok := t.droppedSeries[h.Ref]; !ok {
@ -891,12 +873,7 @@ outer:
continue
}
t.seriesMtx.Lock()
var lbls labels.Labels
if t.interner.shouldIntern {
lbls = t.interner.intern(h.Ref, labels.EmptyLabels())
} else {
lbls = t.seriesLabels[h.Ref]
}
lbls := t.seriesLabels[h.Ref]
if lbls.Len() == 0 {
t.dataDropped.incr(1)
if _, ok := t.droppedSeries[h.Ref]; !ok {
@ -1002,11 +979,7 @@ func (t *QueueManager) StoreSeries(series []record.RefSeries, index int) {
continue
}
lbls := t.builder.Labels()
if t.interner.shouldIntern {
t.interner.intern(s.Ref, lbls)
} else {
t.seriesLabels[s.Ref] = lbls
}
t.seriesLabels[s.Ref] = lbls
}
}
@ -1050,10 +1023,7 @@ func (t *QueueManager) SeriesReset(index int) {
for k, v := range t.seriesSegmentIndexes {
if v < index {
delete(t.seriesSegmentIndexes, k)
t.interner.release(k)
delete(t.seriesLabels, k)
//t.releaseLabels(t.seriesLabels[k])
//delete(t.seriesLabels, k)
delete(t.seriesMetadata, k)
delete(t.droppedSeries, k)
}

View file

@ -318,7 +318,7 @@ func newTestClientAndQueueManager(t testing.TB, flushDeadline time.Duration, pro
func newTestQueueManager(t testing.TB, cfg config.QueueConfig, mcfg config.MetadataConfig, deadline time.Duration, c WriteClient, protoMsg config.RemoteWriteProtoMsg) *QueueManager {
dir := t.TempDir()
metrics := newQueueManagerMetrics(nil, "", "")
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, deadline, newPool(false), newHighestTimestampMetric(), nil, false, false, protoMsg)
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, deadline, newHighestTimestampMetric(), nil, false, false, protoMsg)
return m
}
@ -771,7 +771,7 @@ func TestDisableReshardOnRetry(t *testing.T) {
}
)
m := NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, client, 0, newPool(false), newHighestTimestampMetric(), nil, false, false, config.RemoteWriteProtoMsgV1)
m := NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, client, 0, newHighestTimestampMetric(), nil, false, false, config.RemoteWriteProtoMsgV1)
m.StoreSeries(fakeSeries, 0)
// Attempt to samples while the manager is running. We immediately stop the
@ -1384,7 +1384,7 @@ func BenchmarkStoreSeries(b *testing.B) {
cfg := config.DefaultQueueConfig
mcfg := config.DefaultMetadataConfig
metrics := newQueueManagerMetrics(nil, "", "")
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(false), newHighestTimestampMetric(), nil, false, false, config.RemoteWriteProtoMsgV1)
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newHighestTimestampMetric(), nil, false, false, config.RemoteWriteProtoMsgV1)
m.externalLabels = tc.externalLabels
m.relabelConfigs = tc.relabelConfigs
@ -1436,13 +1436,13 @@ func BenchmarkStoreSeries_TwoEndpoints(b *testing.B) {
cfg := config.DefaultQueueConfig
mcfg := config.DefaultMetadataConfig
metrics := newQueueManagerMetrics(nil, "", "")
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(false), newHighestTimestampMetric(), nil, false, false, config.RemoteWriteProtoMsgV1)
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newHighestTimestampMetric(), nil, false, false, config.RemoteWriteProtoMsgV1)
m.externalLabels = tc.externalLabels
m.relabelConfigs = tc.relabelConfigs
m.StoreSeries(series, 0)
m2 := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(false), newHighestTimestampMetric(), nil, false, false, config.RemoteWriteProtoMsgV1)
m2 := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newHighestTimestampMetric(), nil, false, false, config.RemoteWriteProtoMsgV1)
m2.externalLabels = tc.externalLabels
m2.relabelConfigs = tc.relabelConfigs
@ -1482,7 +1482,7 @@ func BenchmarkStartup(b *testing.B) {
// todo: test with new proto type(s)
m := NewQueueManager(metrics, nil, nil, logger, dir,
newEWMARate(ewmaWeight, shardUpdateDuration),
cfg, mcfg, labels.EmptyLabels(), nil, c, 1*time.Minute, newPool(false), newHighestTimestampMetric(), nil, false, false, config.RemoteWriteProtoMsgV1)
cfg, mcfg, labels.EmptyLabels(), nil, c, 1*time.Minute, newHighestTimestampMetric(), nil, false, false, config.RemoteWriteProtoMsgV1)
m.watcher.SetStartTime(timestamp.Time(math.MaxInt64))
m.watcher.MaxSegment = segments[len(segments)-2]
err := m.watcher.Run()

View file

@ -69,7 +69,6 @@ type WriteStorage struct {
metadataInWAL bool
samplesIn *ewmaRate
flushDeadline time.Duration
interner *pool
scraper ReadyScrapeManager
quit chan struct{}
@ -91,7 +90,6 @@ func NewWriteStorage(logger log.Logger, reg prometheus.Registerer, dir string, f
flushDeadline: flushDeadline,
samplesIn: newEWMARate(ewmaWeight, shardUpdateDuration),
dir: dir,
interner: newPool(false),
scraper: sm,
quit: make(chan struct{}),
metadataInWAL: metadataInWal,
@ -210,7 +208,6 @@ func (rws *WriteStorage) ApplyConfig(conf *config.Config) error {
rwConf.WriteRelabelConfigs,
c,
rws.flushDeadline,
rws.interner,
rws.highestTimestamp,
rws.scraper,
rwConf.SendExemplars,
@ -226,11 +223,6 @@ func (rws *WriteStorage) ApplyConfig(conf *config.Config) error {
for _, q := range rws.queues {
q.Stop()
}
if len(newHashes) > 1 {
rws.interner.shouldIntern = true
}
for _, hash := range newHashes {
newQueues[hash].Start()
}