This commit is contained in:
Callum Styan 2024-09-19 19:54:40 +00:00 committed by GitHub
commit ae832ceb1e
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
5 changed files with 89 additions and 23 deletions

View file

@ -959,11 +959,13 @@ func (t *QueueManager) Stop() {
}
// On shutdown, release the strings in the labels from the intern pool.
t.seriesMtx.Lock()
for _, labels := range t.seriesLabels {
t.releaseLabels(labels)
if t.interner != nil {
t.seriesMtx.Lock()
for _, labels := range t.seriesLabels {
t.releaseLabels(labels)
}
t.seriesMtx.Unlock()
}
t.seriesMtx.Unlock()
t.metrics.unregister()
}
@ -985,14 +987,17 @@ func (t *QueueManager) StoreSeries(series []record.RefSeries, index int) {
continue
}
lbls := t.builder.Labels()
t.internLabels(lbls)
if t.interner != nil {
t.internLabels(lbls)
// We should not ever be replacing a series labels in the map, but just
// in case we do we need to ensure we do not leak the replaced interned
// strings.
if orig, ok := t.seriesLabels[s.Ref]; ok {
t.releaseLabels(orig)
// We should not ever be replacing a series labels in the map, but just
// in case we do we need to ensure we do not leak the replaced interned
// strings.
if orig, ok := t.seriesLabels[s.Ref]; ok {
t.releaseLabels(orig)
}
}
t.seriesLabels[s.Ref] = lbls
}
}
@ -1037,10 +1042,13 @@ func (t *QueueManager) SeriesReset(index int) {
for k, v := range t.seriesSegmentIndexes {
if v < index {
delete(t.seriesSegmentIndexes, k)
t.releaseLabels(t.seriesLabels[k])
delete(t.seriesLabels, k)
delete(t.seriesMetadata, k)
delete(t.droppedSeries, k)
if t.interner != nil {
t.releaseLabels(t.seriesLabels[k])
}
}
}
}

View file

@ -294,7 +294,6 @@ func TestSampleDelivery(t *testing.T) {
qm.AppendHistograms(histograms[:len(histograms)/2])
qm.AppendFloatHistograms(floatHistograms[:len(floatHistograms)/2])
c.waitForExpectedData(t, 30*time.Second)
// Send second half of data.
c.expectSamples(samples[len(samples)/2:], series)
c.expectExemplars(exemplars[len(exemplars)/2:], series)
@ -1395,6 +1394,64 @@ func BenchmarkStoreSeries(b *testing.B) {
}
}
// Check how long it takes to add N series, including external labels processing, simulate having two remote writes configured.
func BenchmarkStoreSeries_TwoEndpoints(b *testing.B) {
externalLabels := []labels.Label{
{Name: "cluster", Value: "mycluster"},
{Name: "replica", Value: "1"},
}
relabelConfigs := []*relabel.Config{{
SourceLabels: model.LabelNames{"namespace"},
Separator: ";",
Regex: relabel.MustNewRegexp("kube.*"),
TargetLabel: "job",
Replacement: "$1",
Action: relabel.Replace,
}}
testCases := []struct {
name string
externalLabels []labels.Label
ts []prompb.TimeSeries
relabelConfigs []*relabel.Config
}{
{name: "plain"},
{name: "externalLabels", externalLabels: externalLabels},
{name: "relabel", relabelConfigs: relabelConfigs},
{
name: "externalLabels+relabel",
externalLabels: externalLabels,
relabelConfigs: relabelConfigs,
},
}
// numSeries chosen to be big enough that StoreSeries dominates creating a new queue manager.
const numSeries = 1000
_, series := createTimeseries(0, numSeries, extraLabels...)
for _, tc := range testCases {
b.Run(tc.name, func(b *testing.B) {
for i := 0; i < b.N; i++ {
c := NewTestWriteClient(config.RemoteWriteProtoMsgV1)
dir := b.TempDir()
cfg := config.DefaultQueueConfig
mcfg := config.DefaultMetadataConfig
metrics := newQueueManagerMetrics(nil, "", "")
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, config.RemoteWriteProtoMsgV1)
m.externalLabels = tc.externalLabels
m.relabelConfigs = tc.relabelConfigs
m.StoreSeries(series, 0)
m2 := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, config.RemoteWriteProtoMsgV1)
m2.externalLabels = tc.externalLabels
m2.relabelConfigs = tc.relabelConfigs
m2.StoreSeries(series, 0)
}
})
}
}
func BenchmarkStartup(b *testing.B) {
dir := os.Getenv("WALDIR")
if dir == "" {

View file

@ -72,7 +72,7 @@ func NewStorage(l log.Logger, reg prometheus.Registerer, stCallback startTimeCal
logger: logger,
localStartTimeCallback: stCallback,
}
s.rws = NewWriteStorage(s.logger, reg, walDir, flushDeadline, sm, metadataInWAL)
s.rws = NewWriteStorage(s.logger, reg, walDir, flushDeadline, sm, metadataInWAL, true)
return s
}

View file

@ -78,7 +78,7 @@ type WriteStorage struct {
}
// NewWriteStorage creates and runs a WriteStorage.
func NewWriteStorage(logger log.Logger, reg prometheus.Registerer, dir string, flushDeadline time.Duration, sm ReadyScrapeManager, metadataInWal bool) *WriteStorage {
func NewWriteStorage(logger log.Logger, reg prometheus.Registerer, dir string, flushDeadline time.Duration, sm ReadyScrapeManager, metadataInWal, shouldIntern bool) *WriteStorage {
if logger == nil {
logger = log.NewNopLogger()
}
@ -91,7 +91,6 @@ func NewWriteStorage(logger log.Logger, reg prometheus.Registerer, dir string, f
flushDeadline: flushDeadline,
samplesIn: newEWMARate(ewmaWeight, shardUpdateDuration),
dir: dir,
interner: newPool(),
scraper: sm,
quit: make(chan struct{}),
metadataInWAL: metadataInWal,
@ -104,6 +103,9 @@ func NewWriteStorage(logger log.Logger, reg prometheus.Registerer, dir string, f
}),
},
}
if shouldIntern {
rws.interner = newPool()
}
if reg != nil {
reg.MustRegister(rws.highestTimestamp)
}
@ -226,7 +228,6 @@ func (rws *WriteStorage) ApplyConfig(conf *config.Config) error {
for _, q := range rws.queues {
q.Stop()
}
for _, hash := range newHashes {
newQueues[hash].Start()
}

View file

@ -105,7 +105,7 @@ func TestWriteStorageApplyConfig_NoDuplicateWriteConfigs(t *testing.T) {
},
} {
t.Run("", func(t *testing.T) {
s := NewWriteStorage(nil, nil, dir, time.Millisecond, nil, false)
s := NewWriteStorage(nil, nil, dir, time.Millisecond, nil, false, true)
conf := &config.Config{
GlobalConfig: config.DefaultGlobalConfig,
RemoteWriteConfigs: tc.cfgs,
@ -131,7 +131,7 @@ func TestWriteStorageApplyConfig_RestartOnNameChange(t *testing.T) {
hash, err := toHash(cfg)
require.NoError(t, err)
s := NewWriteStorage(nil, nil, dir, time.Millisecond, nil, false)
s := NewWriteStorage(nil, nil, dir, time.Millisecond, nil, false, true)
conf := &config.Config{
GlobalConfig: config.DefaultGlobalConfig,
@ -153,7 +153,7 @@ func TestWriteStorageApplyConfig_RestartOnNameChange(t *testing.T) {
func TestWriteStorageApplyConfig_UpdateWithRegisterer(t *testing.T) {
dir := t.TempDir()
s := NewWriteStorage(nil, prometheus.NewRegistry(), dir, time.Millisecond, nil, false)
s := NewWriteStorage(nil, prometheus.NewRegistry(), dir, time.Millisecond, nil, false, true)
c1 := &config.RemoteWriteConfig{
Name: "named",
URL: &common_config.URL{
@ -194,7 +194,7 @@ func TestWriteStorageApplyConfig_UpdateWithRegisterer(t *testing.T) {
func TestWriteStorageApplyConfig_Lifecycle(t *testing.T) {
dir := t.TempDir()
s := NewWriteStorage(nil, nil, dir, defaultFlushDeadline, nil, false)
s := NewWriteStorage(nil, nil, dir, defaultFlushDeadline, nil, false, true)
conf := &config.Config{
GlobalConfig: config.DefaultGlobalConfig,
RemoteWriteConfigs: []*config.RemoteWriteConfig{
@ -210,7 +210,7 @@ func TestWriteStorageApplyConfig_Lifecycle(t *testing.T) {
func TestWriteStorageApplyConfig_UpdateExternalLabels(t *testing.T) {
dir := t.TempDir()
s := NewWriteStorage(nil, prometheus.NewRegistry(), dir, time.Second, nil, false)
s := NewWriteStorage(nil, prometheus.NewRegistry(), dir, time.Second, nil, false, true)
externalLabels := labels.FromStrings("external", "true")
conf := &config.Config{
@ -238,7 +238,7 @@ func TestWriteStorageApplyConfig_UpdateExternalLabels(t *testing.T) {
func TestWriteStorageApplyConfig_Idempotent(t *testing.T) {
dir := t.TempDir()
s := NewWriteStorage(nil, nil, dir, defaultFlushDeadline, nil, false)
s := NewWriteStorage(nil, nil, dir, defaultFlushDeadline, nil, false, true)
conf := &config.Config{
GlobalConfig: config.GlobalConfig{},
RemoteWriteConfigs: []*config.RemoteWriteConfig{
@ -262,7 +262,7 @@ func TestWriteStorageApplyConfig_Idempotent(t *testing.T) {
func TestWriteStorageApplyConfig_PartialUpdate(t *testing.T) {
dir := t.TempDir()
s := NewWriteStorage(nil, nil, dir, defaultFlushDeadline, nil, false)
s := NewWriteStorage(nil, nil, dir, defaultFlushDeadline, nil, false, true)
c0 := &config.RemoteWriteConfig{
RemoteTimeout: model.Duration(10 * time.Second),