allow downstream importers of remote write to turn of the interning code

Signed-off-by: Callum Styan <callumstyan@gmail.com>
This commit is contained in:
Callum Styan 2024-08-30 16:13:07 -07:00
parent 9f3d25f959
commit 6d72d1ad79
3 changed files with 41 additions and 2 deletions

View file

@ -441,6 +441,7 @@ type QueueManager struct {
dataIn, dataDropped, dataOut, dataOutDuration *ewmaRate
metrics *queueManagerMetrics
interner *pool
highestRecvTimestamp *maxTimestamp
}
@ -462,6 +463,7 @@ func NewQueueManager(
relabelConfigs []*relabel.Config,
client WriteClient,
flushDeadline time.Duration,
interner *pool,
highestRecvTimestamp *maxTimestamp,
sm ReadyScrapeManager,
enableExemplarRemoteWrite bool,
@ -506,6 +508,7 @@ func NewQueueManager(
dataOutDuration: newEWMARate(ewmaWeight, shardUpdateDuration),
metrics: metrics,
interner: interner,
highestRecvTimestamp: highestRecvTimestamp,
protoMsg: protoMsg,
@ -955,6 +958,14 @@ func (t *QueueManager) Stop() {
t.metadataWatcher.Stop()
}
// On shutdown, release the strings in the labels from the intern pool.
if t.interner != nil {
t.seriesMtx.Lock()
for _, labels := range t.seriesLabels {
t.releaseLabels(labels)
}
t.seriesMtx.Unlock()
}
t.metrics.unregister()
}
@ -976,6 +987,17 @@ func (t *QueueManager) StoreSeries(series []record.RefSeries, index int) {
continue
}
lbls := t.builder.Labels()
if t.interner != nil {
t.internLabels(lbls)
// We should not ever be replacing a series labels in the map, but just
// in case we do we need to ensure we do not leak the replaced interned
// strings.
if orig, ok := t.seriesLabels[s.Ref]; ok {
t.releaseLabels(orig)
}
}
t.seriesLabels[s.Ref] = lbls
}
}
@ -1023,6 +1045,10 @@ func (t *QueueManager) SeriesReset(index int) {
delete(t.seriesLabels, k)
delete(t.seriesMetadata, k)
delete(t.droppedSeries, k)
if t.interner != nil {
t.releaseLabels(t.seriesLabels[k])
}
}
}
}
@ -1041,6 +1067,14 @@ func (t *QueueManager) client() WriteClient {
return t.storeClient
}
func (t *QueueManager) internLabels(lbls labels.Labels) {
lbls.InternStrings(t.interner.intern)
}
func (t *QueueManager) releaseLabels(ls labels.Labels) {
ls.ReleaseStrings(t.interner.release)
}
// processExternalLabels merges externalLabels into b. If b contains
// a label in externalLabels, the value in b wins.
func processExternalLabels(b *labels.Builder, externalLabels []labels.Label) {

View file

@ -72,7 +72,7 @@ func NewStorage(l log.Logger, reg prometheus.Registerer, stCallback startTimeCal
logger: logger,
localStartTimeCallback: stCallback,
}
s.rws = NewWriteStorage(s.logger, reg, walDir, flushDeadline, sm, metadataInWAL)
s.rws = NewWriteStorage(s.logger, reg, walDir, flushDeadline, sm, metadataInWAL, true)
return s
}

View file

@ -69,6 +69,7 @@ type WriteStorage struct {
metadataInWAL bool
samplesIn *ewmaRate
flushDeadline time.Duration
interner *pool
scraper ReadyScrapeManager
quit chan struct{}
@ -77,7 +78,7 @@ type WriteStorage struct {
}
// NewWriteStorage creates and runs a WriteStorage.
func NewWriteStorage(logger log.Logger, reg prometheus.Registerer, dir string, flushDeadline time.Duration, sm ReadyScrapeManager, metadataInWal bool) *WriteStorage {
func NewWriteStorage(logger log.Logger, reg prometheus.Registerer, dir string, flushDeadline time.Duration, sm ReadyScrapeManager, metadataInWal, shouldIntern bool) *WriteStorage {
if logger == nil {
logger = log.NewNopLogger()
}
@ -102,6 +103,9 @@ func NewWriteStorage(logger log.Logger, reg prometheus.Registerer, dir string, f
}),
},
}
if shouldIntern {
rws.interner = newPool()
}
if reg != nil {
reg.MustRegister(rws.highestTimestamp)
}
@ -208,6 +212,7 @@ func (rws *WriteStorage) ApplyConfig(conf *config.Config) error {
rwConf.WriteRelabelConfigs,
c,
rws.flushDeadline,
rws.interner,
rws.highestTimestamp,
rws.scraper,
rwConf.SendExemplars,