Refactor vars to avoid test failures in storage/remote with -count > 1 (#7934)

* Refactor global vars to avoid failure with run test more than once.

Signed-off-by: Harkishen-Singh <harkishensingh@hotmail.com>

* Register highestRecvTimestamp metric.

Signed-off-by: Harkishen-Singh <harkishensingh@hotmail.com>

* Use local interner vars.

Signed-off-by: Harkishen-Singh <harkishensingh@hotmail.com>

* Declare interner in write storage.

Signed-off-by: Harkishen-Singh <harkishensingh@hotmail.com>
This commit is contained in:
Harkishen Singh 2020-09-25 00:14:18 +05:30 committed by GitHub
parent 3364875ae5
commit 072b9649a3
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 73 additions and 39 deletions

View file

@ -27,7 +27,6 @@ import (
"github.com/prometheus/client_golang/prometheus/promauto" "github.com/prometheus/client_golang/prometheus/promauto"
) )
var interner = newPool()
var noReferenceReleases = promauto.NewCounter(prometheus.CounterOpts{ var noReferenceReleases = promauto.NewCounter(prometheus.CounterOpts{
Namespace: namespace, Namespace: namespace,
Subsystem: subsystem, Subsystem: subsystem,

View file

@ -27,6 +27,7 @@ import (
) )
func TestIntern(t *testing.T) { func TestIntern(t *testing.T) {
interner := newPool()
testString := "TestIntern" testString := "TestIntern"
interner.intern(testString) interner.intern(testString)
interned, ok := interner.pool[testString] interned, ok := interner.pool[testString]
@ -36,6 +37,7 @@ func TestIntern(t *testing.T) {
} }
func TestIntern_MultiRef(t *testing.T) { func TestIntern_MultiRef(t *testing.T) {
interner := newPool()
testString := "TestIntern_MultiRef" testString := "TestIntern_MultiRef"
interner.intern(testString) interner.intern(testString)
@ -52,6 +54,7 @@ func TestIntern_MultiRef(t *testing.T) {
} }
func TestIntern_DeleteRef(t *testing.T) { func TestIntern_DeleteRef(t *testing.T) {
interner := newPool()
testString := "TestIntern_DeleteRef" testString := "TestIntern_DeleteRef"
interner.intern(testString) interner.intern(testString)
@ -66,6 +69,7 @@ func TestIntern_DeleteRef(t *testing.T) {
} }
func TestIntern_MultiRef_Concurrent(t *testing.T) { func TestIntern_MultiRef_Concurrent(t *testing.T) {
interner := newPool()
testString := "TestIntern_MultiRef_Concurrent" testString := "TestIntern_MultiRef_Concurrent"
interner.intern(testString) interner.intern(testString)

View file

@ -260,7 +260,9 @@ type QueueManager struct {
samplesIn, samplesDropped, samplesOut, samplesOutDuration *ewmaRate samplesIn, samplesDropped, samplesOut, samplesOutDuration *ewmaRate
metrics *queueManagerMetrics metrics *queueManagerMetrics
interner *pool
highestRecvTimestamp *maxGauge
} }
// NewQueueManager builds a new QueueManager. // NewQueueManager builds a new QueueManager.
@ -276,6 +278,8 @@ func NewQueueManager(
relabelConfigs []*relabel.Config, relabelConfigs []*relabel.Config,
client WriteClient, client WriteClient,
flushDeadline time.Duration, flushDeadline time.Duration,
interner *pool,
highestRecvTimestamp *maxGauge,
) *QueueManager { ) *QueueManager {
if logger == nil { if logger == nil {
logger = log.NewNopLogger() logger = log.NewNopLogger()
@ -303,7 +307,9 @@ func NewQueueManager(
samplesOut: newEWMARate(ewmaWeight, shardUpdateDuration), samplesOut: newEWMARate(ewmaWeight, shardUpdateDuration),
samplesOutDuration: newEWMARate(ewmaWeight, shardUpdateDuration), samplesOutDuration: newEWMARate(ewmaWeight, shardUpdateDuration),
metrics: metrics, metrics: metrics,
interner: interner,
highestRecvTimestamp: highestRecvTimestamp,
} }
t.watcher = wal.NewWatcher(watcherMetrics, readerMetrics, logger, client.Name(), t, walDir) t.watcher = wal.NewWatcher(watcherMetrics, readerMetrics, logger, client.Name(), t, walDir)
@ -392,7 +398,7 @@ func (t *QueueManager) Stop() {
// On shutdown, release the strings in the labels from the intern pool. // On shutdown, release the strings in the labels from the intern pool.
t.seriesMtx.Lock() t.seriesMtx.Lock()
for _, labels := range t.seriesLabels { for _, labels := range t.seriesLabels {
releaseLabels(labels) t.releaseLabels(labels)
} }
t.seriesMtx.Unlock() t.seriesMtx.Unlock()
t.metrics.unregister() t.metrics.unregister()
@ -410,13 +416,13 @@ func (t *QueueManager) StoreSeries(series []record.RefSeries, index int) {
continue continue
} }
t.seriesSegmentIndexes[s.Ref] = index t.seriesSegmentIndexes[s.Ref] = index
internLabels(lbls) t.internLabels(lbls)
// We should not ever be replacing a series labels in the map, but just // We should not ever be replacing a series labels in the map, but just
// in case we do we need to ensure we do not leak the replaced interned // in case we do we need to ensure we do not leak the replaced interned
// strings. // strings.
if orig, ok := t.seriesLabels[s.Ref]; ok { if orig, ok := t.seriesLabels[s.Ref]; ok {
releaseLabels(orig) t.releaseLabels(orig)
} }
t.seriesLabels[s.Ref] = lbls t.seriesLabels[s.Ref] = lbls
} }
@ -433,7 +439,7 @@ func (t *QueueManager) SeriesReset(index int) {
for k, v := range t.seriesSegmentIndexes { for k, v := range t.seriesSegmentIndexes {
if v < index { if v < index {
delete(t.seriesSegmentIndexes, k) delete(t.seriesSegmentIndexes, k)
releaseLabels(t.seriesLabels[k]) t.releaseLabels(t.seriesLabels[k])
delete(t.seriesLabels, k) delete(t.seriesLabels, k)
delete(t.droppedSeries, k) delete(t.droppedSeries, k)
} }
@ -454,17 +460,17 @@ func (t *QueueManager) client() WriteClient {
return t.storeClient return t.storeClient
} }
func internLabels(lbls labels.Labels) { func (t *QueueManager) internLabels(lbls labels.Labels) {
for i, l := range lbls { for i, l := range lbls {
lbls[i].Name = interner.intern(l.Name) lbls[i].Name = t.interner.intern(l.Name)
lbls[i].Value = interner.intern(l.Value) lbls[i].Value = t.interner.intern(l.Value)
} }
} }
func releaseLabels(ls labels.Labels) { func (t *QueueManager) releaseLabels(ls labels.Labels) {
for _, l := range ls { for _, l := range ls {
interner.release(l.Name) t.interner.release(l.Name)
interner.release(l.Value) t.interner.release(l.Value)
} }
} }
@ -564,7 +570,7 @@ func (t *QueueManager) calculateDesiredShards() int {
samplesOutDuration = t.samplesOutDuration.rate() / float64(time.Second) samplesOutDuration = t.samplesOutDuration.rate() / float64(time.Second)
samplesPendingRate = samplesInRate*samplesKeptRatio - samplesOutRate samplesPendingRate = samplesInRate*samplesKeptRatio - samplesOutRate
highestSent = t.metrics.highestSentTimestamp.Get() highestSent = t.metrics.highestSentTimestamp.Get()
highestRecv = highestTimestamp.Get() highestRecv = t.highestRecvTimestamp.Get()
delay = highestRecv - highestSent delay = highestRecv - highestSent
samplesPending = delay * samplesInRate * samplesKeptRatio samplesPending = delay * samplesInRate * samplesKeptRatio
) )

View file

@ -47,6 +47,17 @@ import (
const defaultFlushDeadline = 1 * time.Minute const defaultFlushDeadline = 1 * time.Minute
func newHighestTimestampMetric() *maxGauge {
return &maxGauge{
Gauge: prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "highest_timestamp_in_seconds",
Help: "Highest timestamp that has come into the remote storage via the Appender interface, in seconds since epoch.",
}),
}
}
func TestSampleDelivery(t *testing.T) { func TestSampleDelivery(t *testing.T) {
// Let's create an even number of send batches so we don't run into the // Let's create an even number of send batches so we don't run into the
// batch timeout case. // batch timeout case.
@ -117,7 +128,7 @@ func TestSampleDeliveryTimeout(t *testing.T) {
}() }()
metrics := newQueueManagerMetrics(nil, "", "") metrics := newQueueManagerMetrics(nil, "", "")
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, nil, nil, c, defaultFlushDeadline) m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, nil, nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric())
m.StoreSeries(series, 0) m.StoreSeries(series, 0)
m.Start() m.Start()
defer m.Stop() defer m.Stop()
@ -160,7 +171,7 @@ func TestSampleDeliveryOrder(t *testing.T) {
}() }()
metrics := newQueueManagerMetrics(nil, "", "") metrics := newQueueManagerMetrics(nil, "", "")
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), config.DefaultQueueConfig, nil, nil, c, defaultFlushDeadline) m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), config.DefaultQueueConfig, nil, nil, c, defaultFlushDeadline, newPool(), nil)
m.StoreSeries(series, 0) m.StoreSeries(series, 0)
m.Start() m.Start()
@ -182,7 +193,7 @@ func TestShutdown(t *testing.T) {
metrics := newQueueManagerMetrics(nil, "", "") metrics := newQueueManagerMetrics(nil, "", "")
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), config.DefaultQueueConfig, nil, nil, c, deadline) m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), config.DefaultQueueConfig, nil, nil, c, deadline, newPool(), newHighestTimestampMetric())
n := 2 * config.DefaultQueueConfig.MaxSamplesPerSend n := 2 * config.DefaultQueueConfig.MaxSamplesPerSend
samples, series := createTimeseries(n, n) samples, series := createTimeseries(n, n)
m.StoreSeries(series, 0) m.StoreSeries(series, 0)
@ -222,7 +233,7 @@ func TestSeriesReset(t *testing.T) {
}() }()
metrics := newQueueManagerMetrics(nil, "", "") metrics := newQueueManagerMetrics(nil, "", "")
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), config.DefaultQueueConfig, nil, nil, c, deadline) m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), config.DefaultQueueConfig, nil, nil, c, deadline, newPool(), newHighestTimestampMetric())
for i := 0; i < numSegments; i++ { for i := 0; i < numSegments; i++ {
series := []record.RefSeries{} series := []record.RefSeries{}
for j := 0; j < numSeries; j++ { for j := 0; j < numSeries; j++ {
@ -254,7 +265,7 @@ func TestReshard(t *testing.T) {
}() }()
metrics := newQueueManagerMetrics(nil, "", "") metrics := newQueueManagerMetrics(nil, "", "")
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, nil, nil, c, defaultFlushDeadline) m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, nil, nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric())
m.StoreSeries(series, 0) m.StoreSeries(series, 0)
m.Start() m.Start()
@ -287,7 +298,7 @@ func TestReshardRaceWithStop(t *testing.T) {
go func() { go func() {
for { for {
metrics := newQueueManagerMetrics(nil, "", "") metrics := newQueueManagerMetrics(nil, "", "")
m = NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), config.DefaultQueueConfig, nil, nil, c, defaultFlushDeadline) m = NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), config.DefaultQueueConfig, nil, nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric())
m.Start() m.Start()
h.Unlock() h.Unlock()
h.Lock() h.Lock()
@ -305,7 +316,7 @@ func TestReshardRaceWithStop(t *testing.T) {
func TestReleaseNoninternedString(t *testing.T) { func TestReleaseNoninternedString(t *testing.T) {
metrics := newQueueManagerMetrics(nil, "", "") metrics := newQueueManagerMetrics(nil, "", "")
c := NewTestWriteClient() c := NewTestWriteClient()
m := NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), config.DefaultQueueConfig, nil, nil, c, defaultFlushDeadline) m := NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), config.DefaultQueueConfig, nil, nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric())
m.Start() m.Start()
for i := 1; i < 1000; i++ { for i := 1; i < 1000; i++ {
@ -353,7 +364,7 @@ func TestShouldReshard(t *testing.T) {
for _, c := range cases { for _, c := range cases {
metrics := newQueueManagerMetrics(nil, "", "") metrics := newQueueManagerMetrics(nil, "", "")
client := NewTestWriteClient() client := NewTestWriteClient()
m := NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), config.DefaultQueueConfig, nil, nil, client, defaultFlushDeadline) m := NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), config.DefaultQueueConfig, nil, nil, client, defaultFlushDeadline, newPool(), newHighestTimestampMetric())
m.numShards = c.startingShards m.numShards = c.startingShards
m.samplesIn.incr(c.samplesIn) m.samplesIn.incr(c.samplesIn)
m.samplesOut.incr(c.samplesOut) m.samplesOut.incr(c.samplesOut)
@ -560,7 +571,7 @@ func BenchmarkSampleDelivery(b *testing.B) {
defer os.RemoveAll(dir) defer os.RemoveAll(dir)
metrics := newQueueManagerMetrics(nil, "", "") metrics := newQueueManagerMetrics(nil, "", "")
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, nil, nil, c, defaultFlushDeadline) m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, nil, nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric())
m.StoreSeries(series, 0) m.StoreSeries(series, 0)
// These should be received by the client. // These should be received by the client.
@ -604,7 +615,7 @@ func BenchmarkStartup(b *testing.B) {
c := NewTestBlockedWriteClient() c := NewTestBlockedWriteClient()
m := NewQueueManager(metrics, nil, nil, logger, dir, m := NewQueueManager(metrics, nil, nil, logger, dir,
newEWMARate(ewmaWeight, shardUpdateDuration), newEWMARate(ewmaWeight, shardUpdateDuration),
config.DefaultQueueConfig, nil, nil, c, 1*time.Minute) config.DefaultQueueConfig, nil, nil, c, 1*time.Minute, newPool(), newHighestTimestampMetric())
m.watcher.SetStartTime(timestamp.Time(math.MaxInt64)) m.watcher.SetStartTime(timestamp.Time(math.MaxInt64))
m.watcher.MaxSegment = segments[len(segments)-2] m.watcher.MaxSegment = segments[len(segments)-2]
err := m.watcher.Run() err := m.watcher.Run()
@ -655,7 +666,7 @@ func TestCalculateDesiredShards(t *testing.T) {
metrics := newQueueManagerMetrics(nil, "", "") metrics := newQueueManagerMetrics(nil, "", "")
samplesIn := newEWMARate(ewmaWeight, shardUpdateDuration) samplesIn := newEWMARate(ewmaWeight, shardUpdateDuration)
m := NewQueueManager(metrics, nil, nil, nil, dir, samplesIn, cfg, nil, nil, c, defaultFlushDeadline) m := NewQueueManager(metrics, nil, nil, nil, dir, samplesIn, cfg, nil, nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric())
// Need to start the queue manager so the proper metrics are initialized. // Need to start the queue manager so the proper metrics are initialized.
// However we can stop it right away since we don't need to do any actual // However we can stop it right away since we don't need to do any actual
@ -675,7 +686,7 @@ func TestCalculateDesiredShards(t *testing.T) {
samplesIn.incr(s) samplesIn.incr(s)
samplesIn.tick() samplesIn.tick()
highestTimestamp.Set(float64(startedAt.Add(ts).Unix())) m.highestRecvTimestamp.Set(float64(startedAt.Add(ts).Unix()))
} }
// helper function for sending samples. // helper function for sending samples.

View file

@ -23,6 +23,7 @@ import (
"testing" "testing"
"github.com/pkg/errors" "github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
config_util "github.com/prometheus/common/config" config_util "github.com/prometheus/common/config"
"github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/pkg/labels"
@ -100,6 +101,7 @@ func TestNoDuplicateReadConfigs(t *testing.T) {
RemoteReadConfigs: tc.cfgs, RemoteReadConfigs: tc.cfgs,
} }
err := s.ApplyConfig(conf) err := s.ApplyConfig(conf)
prometheus.Unregister(s.rws.highestTimestamp)
gotError := err != nil gotError := err != nil
testutil.Equals(t, tc.err, gotError) testutil.Equals(t, tc.err, gotError)
testutil.Ok(t, s.Close()) testutil.Ok(t, s.Close())

View file

@ -35,14 +35,6 @@ var (
Name: "samples_in_total", Name: "samples_in_total",
Help: "Samples in to remote storage, compare to samples out for queue managers.", Help: "Samples in to remote storage, compare to samples out for queue managers.",
}) })
highestTimestamp = maxGauge{
Gauge: promauto.NewGauge(prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "highest_timestamp_in_seconds",
Help: "Highest timestamp that has come into the remote storage via the Appender interface, in seconds since epoch.",
}),
}
) )
// WriteStorage represents all the remote write storage. // WriteStorage represents all the remote write storage.
@ -58,6 +50,10 @@ type WriteStorage struct {
queues map[string]*QueueManager queues map[string]*QueueManager
samplesIn *ewmaRate samplesIn *ewmaRate
flushDeadline time.Duration flushDeadline time.Duration
interner *pool
// For timestampTracker.
highestTimestamp *maxGauge
} }
// NewWriteStorage creates and runs a WriteStorage. // NewWriteStorage creates and runs a WriteStorage.
@ -74,6 +70,18 @@ func NewWriteStorage(logger log.Logger, reg prometheus.Registerer, walDir string
flushDeadline: flushDeadline, flushDeadline: flushDeadline,
samplesIn: newEWMARate(ewmaWeight, shardUpdateDuration), samplesIn: newEWMARate(ewmaWeight, shardUpdateDuration),
walDir: walDir, walDir: walDir,
interner: newPool(),
highestTimestamp: &maxGauge{
Gauge: prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "highest_timestamp_in_seconds",
Help: "Highest timestamp that has come into the remote storage via the Appender interface, in seconds since epoch.",
}),
},
}
if reg != nil {
reg.MustRegister(rws.highestTimestamp)
} }
go rws.run() go rws.run()
return rws return rws
@ -150,6 +158,8 @@ func (rws *WriteStorage) ApplyConfig(conf *config.Config) error {
rwConf.WriteRelabelConfigs, rwConf.WriteRelabelConfigs,
c, c,
rws.flushDeadline, rws.flushDeadline,
rws.interner,
rws.highestTimestamp,
) )
// Keep track of which queues are new so we know which to start. // Keep track of which queues are new so we know which to start.
newHashes = append(newHashes, hash) newHashes = append(newHashes, hash)
@ -173,7 +183,8 @@ func (rws *WriteStorage) ApplyConfig(conf *config.Config) error {
// Appender implements storage.Storage. // Appender implements storage.Storage.
func (rws *WriteStorage) Appender(_ context.Context) storage.Appender { func (rws *WriteStorage) Appender(_ context.Context) storage.Appender {
return &timestampTracker{ return &timestampTracker{
writeStorage: rws, writeStorage: rws,
highestRecvTimestamp: rws.highestTimestamp,
} }
} }
@ -188,9 +199,10 @@ func (rws *WriteStorage) Close() error {
} }
type timestampTracker struct { type timestampTracker struct {
writeStorage *WriteStorage writeStorage *WriteStorage
samples int64 samples int64
highestTimestamp int64 highestTimestamp int64
highestRecvTimestamp *maxGauge
} }
// Add implements storage.Appender. // Add implements storage.Appender.
@ -213,7 +225,7 @@ func (t *timestampTracker) Commit() error {
t.writeStorage.samplesIn.incr(t.samples) t.writeStorage.samplesIn.incr(t.samples)
samplesIn.Add(float64(t.samples)) samplesIn.Add(float64(t.samples))
highestTimestamp.Set(float64(t.highestTimestamp / 1000)) t.highestRecvTimestamp.Set(float64(t.highestTimestamp / 1000))
return nil return nil
} }