mirror of
https://github.com/prometheus/prometheus.git
synced 2025-03-05 20:59:13 -08:00
storage/remote tests: refactor: extract function newTestQueueManager
To reduce repetition. Signed-off-by: Bryan Boreham <bjboreham@gmail.com> Signed-off-by: Nicolás Pazos <npazosmendez@gmail.com>
This commit is contained in:
parent
b2cc998418
commit
14f71bd096
|
@ -318,6 +318,21 @@ func TestSampleDelivery(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func newTestClientAndQueueManager(t testing.TB, flushDeadline time.Duration, protoMsg config.RemoteWriteProtoMsg) (*TestWriteClient, *QueueManager) {
|
||||||
|
c := NewTestWriteClient(protoMsg)
|
||||||
|
cfg := config.DefaultQueueConfig
|
||||||
|
mcfg := config.DefaultMetadataConfig
|
||||||
|
return c, newTestQueueManager(t, cfg, mcfg, flushDeadline, c, protoMsg)
|
||||||
|
}
|
||||||
|
|
||||||
|
func newTestQueueManager(t testing.TB, cfg config.QueueConfig, mcfg config.MetadataConfig, deadline time.Duration, c WriteClient, protoMsg config.RemoteWriteProtoMsg) *QueueManager {
|
||||||
|
dir := t.TempDir()
|
||||||
|
metrics := newQueueManagerMetrics(nil, "", "")
|
||||||
|
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, deadline, newPool(), newHighestTimestampMetric(), nil, false, false, protoMsg)
|
||||||
|
|
||||||
|
return m
|
||||||
|
}
|
||||||
|
|
||||||
func testDefaultQueueConfig() config.QueueConfig {
|
func testDefaultQueueConfig() config.QueueConfig {
|
||||||
cfg := config.DefaultQueueConfig
|
cfg := config.DefaultQueueConfig
|
||||||
// For faster unit tests we don't wait default 5 seconds.
|
// For faster unit tests we don't wait default 5 seconds.
|
||||||
|
@ -326,15 +341,7 @@ func testDefaultQueueConfig() config.QueueConfig {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestMetadataDelivery(t *testing.T) {
|
func TestMetadataDelivery(t *testing.T) {
|
||||||
c := NewTestWriteClient(config.RemoteWriteProtoMsgV1)
|
c, m := newTestClientAndQueueManager(t, defaultFlushDeadline, config.RemoteWriteProtoMsgV1)
|
||||||
|
|
||||||
dir := t.TempDir()
|
|
||||||
|
|
||||||
cfg := testDefaultQueueConfig()
|
|
||||||
mcfg := config.DefaultMetadataConfig
|
|
||||||
|
|
||||||
metrics := newQueueManagerMetrics(nil, "", "")
|
|
||||||
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, config.RemoteWriteProtoMsgV1)
|
|
||||||
m.Start()
|
m.Start()
|
||||||
defer m.Stop()
|
defer m.Stop()
|
||||||
|
|
||||||
|
@ -355,7 +362,7 @@ func TestMetadataDelivery(t *testing.T) {
|
||||||
require.Len(t, c.receivedMetadata, numMetadata)
|
require.Len(t, c.receivedMetadata, numMetadata)
|
||||||
// One more write than the rounded qoutient should be performed in order to get samples that didn't
|
// One more write than the rounded qoutient should be performed in order to get samples that didn't
|
||||||
// fit into MaxSamplesPerSend.
|
// fit into MaxSamplesPerSend.
|
||||||
require.Equal(t, numMetadata/mcfg.MaxSamplesPerSend+1, c.writesReceived)
|
require.Equal(t, numMetadata/config.DefaultMetadataConfig.MaxSamplesPerSend+1, c.writesReceived)
|
||||||
// Make sure the last samples were sent.
|
// Make sure the last samples were sent.
|
||||||
require.Equal(t, c.receivedMetadata[metadata[len(metadata)-1].Metric][0].MetricFamilyName, metadata[len(metadata)-1].Metric)
|
require.Equal(t, c.receivedMetadata[metadata[len(metadata)-1].Metric][0].MetricFamilyName, metadata[len(metadata)-1].Metric)
|
||||||
}
|
}
|
||||||
|
@ -407,16 +414,12 @@ func TestSampleDeliveryTimeout(t *testing.T) {
|
||||||
// Let's send one less sample than batch size, and wait the timeout duration
|
// Let's send one less sample than batch size, and wait the timeout duration
|
||||||
n := 9
|
n := 9
|
||||||
samples, series := createTimeseries(n, n)
|
samples, series := createTimeseries(n, n)
|
||||||
c := NewTestWriteClient(protoMsg)
|
|
||||||
|
|
||||||
cfg := testDefaultQueueConfig()
|
cfg := testDefaultQueueConfig()
|
||||||
mcfg := config.DefaultMetadataConfig
|
mcfg := config.DefaultMetadataConfig
|
||||||
cfg.MaxShards = 1
|
cfg.MaxShards = 1
|
||||||
|
|
||||||
dir := t.TempDir()
|
c := NewTestWriteClient(protoMsg)
|
||||||
|
m := newTestQueueManager(t, cfg, mcfg, defaultFlushDeadline, c, protoMsg)
|
||||||
metrics := newQueueManagerMetrics(nil, "", "")
|
|
||||||
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, protoMsg)
|
|
||||||
m.StoreSeries(series, 0)
|
m.StoreSeries(series, 0)
|
||||||
m.Start()
|
m.Start()
|
||||||
defer m.Stop()
|
defer m.Stop()
|
||||||
|
@ -453,16 +456,8 @@ func TestSampleDeliveryOrder(t *testing.T) {
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
c := NewTestWriteClient(protoMsg)
|
c, m := newTestClientAndQueueManager(t, defaultFlushDeadline, protoMsg)
|
||||||
c.expectSamples(samples, series)
|
c.expectSamples(samples, series)
|
||||||
|
|
||||||
dir := t.TempDir()
|
|
||||||
|
|
||||||
cfg := testDefaultQueueConfig()
|
|
||||||
mcfg := config.DefaultMetadataConfig
|
|
||||||
|
|
||||||
metrics := newQueueManagerMetrics(nil, "", "")
|
|
||||||
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, protoMsg)
|
|
||||||
m.StoreSeries(series, 0)
|
m.StoreSeries(series, 0)
|
||||||
|
|
||||||
m.Start()
|
m.Start()
|
||||||
|
@ -478,13 +473,10 @@ func TestShutdown(t *testing.T) {
|
||||||
deadline := 1 * time.Second
|
deadline := 1 * time.Second
|
||||||
c := NewTestBlockedWriteClient()
|
c := NewTestBlockedWriteClient()
|
||||||
|
|
||||||
dir := t.TempDir()
|
cfg := config.DefaultQueueConfig
|
||||||
|
|
||||||
cfg := testDefaultQueueConfig()
|
|
||||||
mcfg := config.DefaultMetadataConfig
|
mcfg := config.DefaultMetadataConfig
|
||||||
metrics := newQueueManagerMetrics(nil, "", "")
|
|
||||||
|
|
||||||
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, deadline, newPool(), newHighestTimestampMetric(), nil, false, false, config.RemoteWriteProtoMsgV1)
|
m := newTestQueueManager(t, cfg, mcfg, deadline, c, config.RemoteWriteProtoMsgV1)
|
||||||
n := 2 * config.DefaultQueueConfig.MaxSamplesPerSend
|
n := 2 * config.DefaultQueueConfig.MaxSamplesPerSend
|
||||||
samples, series := createTimeseries(n, n)
|
samples, series := createTimeseries(n, n)
|
||||||
m.StoreSeries(series, 0)
|
m.StoreSeries(series, 0)
|
||||||
|
@ -517,12 +509,9 @@ func TestSeriesReset(t *testing.T) {
|
||||||
numSegments := 4
|
numSegments := 4
|
||||||
numSeries := 25
|
numSeries := 25
|
||||||
|
|
||||||
dir := t.TempDir()
|
cfg := config.DefaultQueueConfig
|
||||||
|
|
||||||
cfg := testDefaultQueueConfig()
|
|
||||||
mcfg := config.DefaultMetadataConfig
|
mcfg := config.DefaultMetadataConfig
|
||||||
metrics := newQueueManagerMetrics(nil, "", "")
|
m := newTestQueueManager(t, cfg, mcfg, deadline, c, config.RemoteWriteProtoMsgV1)
|
||||||
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, deadline, newPool(), newHighestTimestampMetric(), nil, false, false, config.RemoteWriteProtoMsgV1)
|
|
||||||
for i := 0; i < numSegments; i++ {
|
for i := 0; i < numSegments; i++ {
|
||||||
series := []record.RefSeries{}
|
series := []record.RefSeries{}
|
||||||
for j := 0; j < numSeries; j++ {
|
for j := 0; j < numSeries; j++ {
|
||||||
|
@ -543,17 +532,12 @@ func TestReshard(t *testing.T) {
|
||||||
nSamples := config.DefaultQueueConfig.Capacity * size
|
nSamples := config.DefaultQueueConfig.Capacity * size
|
||||||
samples, series := createTimeseries(nSamples, nSeries)
|
samples, series := createTimeseries(nSamples, nSeries)
|
||||||
|
|
||||||
c := NewTestWriteClient(protoMsg)
|
cfg := config.DefaultQueueConfig
|
||||||
c.expectSamples(samples, series)
|
|
||||||
|
|
||||||
cfg := testDefaultQueueConfig()
|
|
||||||
mcfg := config.DefaultMetadataConfig
|
|
||||||
cfg.MaxShards = 1
|
cfg.MaxShards = 1
|
||||||
|
|
||||||
dir := t.TempDir()
|
c := NewTestWriteClient(protoMsg)
|
||||||
|
m := newTestQueueManager(t, cfg, config.DefaultMetadataConfig, defaultFlushDeadline, c, protoMsg)
|
||||||
metrics := newQueueManagerMetrics(nil, "", "")
|
c.expectSamples(samples, series)
|
||||||
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, protoMsg)
|
|
||||||
m.StoreSeries(series, 0)
|
m.StoreSeries(series, 0)
|
||||||
|
|
||||||
m.Start()
|
m.Start()
|
||||||
|
@ -591,8 +575,8 @@ func TestReshardRaceWithStop(t *testing.T) {
|
||||||
exitCh := make(chan struct{})
|
exitCh := make(chan struct{})
|
||||||
go func() {
|
go func() {
|
||||||
for {
|
for {
|
||||||
metrics := newQueueManagerMetrics(nil, "", "")
|
m = newTestQueueManager(t, cfg, mcfg, defaultFlushDeadline, c, protoMsg)
|
||||||
m = NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, protoMsg)
|
|
||||||
m.Start()
|
m.Start()
|
||||||
h.Unlock()
|
h.Unlock()
|
||||||
h.Lock()
|
h.Lock()
|
||||||
|
@ -630,8 +614,7 @@ func TestReshardPartialBatch(t *testing.T) {
|
||||||
flushDeadline := 10 * time.Millisecond
|
flushDeadline := 10 * time.Millisecond
|
||||||
cfg.BatchSendDeadline = model.Duration(batchSendDeadline)
|
cfg.BatchSendDeadline = model.Duration(batchSendDeadline)
|
||||||
|
|
||||||
metrics := newQueueManagerMetrics(nil, "", "")
|
m := newTestQueueManager(t, cfg, mcfg, flushDeadline, c, protoMsg)
|
||||||
m := NewQueueManager(metrics, nil, nil, nil, t.TempDir(), newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, flushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, protoMsg)
|
|
||||||
m.StoreSeries(series, 0)
|
m.StoreSeries(series, 0)
|
||||||
|
|
||||||
m.Start()
|
m.Start()
|
||||||
|
@ -678,9 +661,7 @@ func TestQueueFilledDeadlock(t *testing.T) {
|
||||||
batchSendDeadline := time.Millisecond
|
batchSendDeadline := time.Millisecond
|
||||||
cfg.BatchSendDeadline = model.Duration(batchSendDeadline)
|
cfg.BatchSendDeadline = model.Duration(batchSendDeadline)
|
||||||
|
|
||||||
metrics := newQueueManagerMetrics(nil, "", "")
|
m := newTestQueueManager(t, cfg, mcfg, flushDeadline, c, protoMsg)
|
||||||
|
|
||||||
m := NewQueueManager(metrics, nil, nil, nil, t.TempDir(), newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, flushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, protoMsg)
|
|
||||||
m.StoreSeries(series, 0)
|
m.StoreSeries(series, 0)
|
||||||
m.Start()
|
m.Start()
|
||||||
defer m.Stop()
|
defer m.Stop()
|
||||||
|
@ -707,11 +688,7 @@ func TestQueueFilledDeadlock(t *testing.T) {
|
||||||
func TestReleaseNoninternedString(t *testing.T) {
|
func TestReleaseNoninternedString(t *testing.T) {
|
||||||
for _, protoMsg := range []config.RemoteWriteProtoMsg{config.RemoteWriteProtoMsgV1, config.RemoteWriteProtoMsgV2} {
|
for _, protoMsg := range []config.RemoteWriteProtoMsg{config.RemoteWriteProtoMsgV1, config.RemoteWriteProtoMsgV2} {
|
||||||
t.Run(fmt.Sprint(protoMsg), func(t *testing.T) {
|
t.Run(fmt.Sprint(protoMsg), func(t *testing.T) {
|
||||||
cfg := testDefaultQueueConfig()
|
_, m := newTestClientAndQueueManager(t, defaultFlushDeadline, protoMsg)
|
||||||
mcfg := config.DefaultMetadataConfig
|
|
||||||
metrics := newQueueManagerMetrics(nil, "", "")
|
|
||||||
c := NewTestWriteClient(protoMsg)
|
|
||||||
m := NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, protoMsg)
|
|
||||||
m.Start()
|
m.Start()
|
||||||
defer m.Stop()
|
defer m.Stop()
|
||||||
for i := 1; i < 1000; i++ {
|
for i := 1; i < 1000; i++ {
|
||||||
|
@ -754,13 +731,9 @@ func TestShouldReshard(t *testing.T) {
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
cfg := testDefaultQueueConfig()
|
|
||||||
mcfg := config.DefaultMetadataConfig
|
|
||||||
for _, c := range cases {
|
for _, c := range cases {
|
||||||
metrics := newQueueManagerMetrics(nil, "", "")
|
|
||||||
// todo: test with new proto type(s)
|
// todo: test with new proto type(s)
|
||||||
client := NewTestWriteClient(config.RemoteWriteProtoMsgV1)
|
_, m := newTestClientAndQueueManager(t, defaultFlushDeadline, config.RemoteWriteProtoMsgV1)
|
||||||
m := NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, client, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, config.RemoteWriteProtoMsgV1)
|
|
||||||
m.numShards = c.startingShards
|
m.numShards = c.startingShards
|
||||||
m.dataIn.incr(c.samplesIn)
|
m.dataIn.incr(c.samplesIn)
|
||||||
m.dataOut.incr(c.samplesOut)
|
m.dataOut.incr(c.samplesOut)
|
||||||
|
@ -1315,11 +1288,8 @@ func BenchmarkSampleSend(b *testing.B) {
|
||||||
cfg.MinShards = 20
|
cfg.MinShards = 20
|
||||||
cfg.MaxShards = 20
|
cfg.MaxShards = 20
|
||||||
|
|
||||||
dir := b.TempDir()
|
|
||||||
|
|
||||||
metrics := newQueueManagerMetrics(nil, "", "")
|
|
||||||
// todo: test with new proto type(s)
|
// todo: test with new proto type(s)
|
||||||
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, config.RemoteWriteProtoMsgV1)
|
m := newTestQueueManager(b, cfg, mcfg, defaultFlushDeadline, c, config.RemoteWriteProtoMsgV1)
|
||||||
m.StoreSeries(series, 0)
|
m.StoreSeries(series, 0)
|
||||||
|
|
||||||
// These should be received by the client.
|
// These should be received by the client.
|
||||||
|
@ -1496,16 +1466,10 @@ func TestProcessExternalLabels(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestCalculateDesiredShards(t *testing.T) {
|
func TestCalculateDesiredShards(t *testing.T) {
|
||||||
c := NewNopWriteClient()
|
|
||||||
cfg := testDefaultQueueConfig()
|
|
||||||
mcfg := config.DefaultMetadataConfig
|
|
||||||
|
|
||||||
dir := t.TempDir()
|
|
||||||
|
|
||||||
metrics := newQueueManagerMetrics(nil, "", "")
|
|
||||||
samplesIn := newEWMARate(ewmaWeight, shardUpdateDuration)
|
|
||||||
// todo: test with new proto type(s)
|
// todo: test with new proto type(s)
|
||||||
m := NewQueueManager(metrics, nil, nil, nil, dir, samplesIn, cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, config.RemoteWriteProtoMsgV1)
|
cfg := config.DefaultQueueConfig
|
||||||
|
_, m := newTestClientAndQueueManager(t, defaultFlushDeadline, config.RemoteWriteProtoMsgV1)
|
||||||
|
samplesIn := m.dataIn
|
||||||
|
|
||||||
// Need to start the queue manager so the proper metrics are initialized.
|
// Need to start the queue manager so the proper metrics are initialized.
|
||||||
// However we can stop it right away since we don't need to do any actual
|
// However we can stop it right away since we don't need to do any actual
|
||||||
|
@ -1574,16 +1538,9 @@ func TestCalculateDesiredShards(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestCalculateDesiredShardsDetail(t *testing.T) {
|
func TestCalculateDesiredShardsDetail(t *testing.T) {
|
||||||
c := NewTestWriteClient(config.RemoteWriteProtoMsgV1)
|
|
||||||
cfg := config.DefaultQueueConfig
|
|
||||||
mcfg := config.DefaultMetadataConfig
|
|
||||||
|
|
||||||
dir := t.TempDir()
|
|
||||||
|
|
||||||
metrics := newQueueManagerMetrics(nil, "", "")
|
|
||||||
samplesIn := newEWMARate(ewmaWeight, shardUpdateDuration)
|
|
||||||
// todo: test with new proto type(s)
|
// todo: test with new proto type(s)
|
||||||
m := NewQueueManager(metrics, nil, nil, nil, dir, samplesIn, cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, config.RemoteWriteProtoMsgV1)
|
_, m := newTestClientAndQueueManager(t, defaultFlushDeadline, config.RemoteWriteProtoMsgV1)
|
||||||
|
samplesIn := m.dataIn
|
||||||
|
|
||||||
for _, tc := range []struct {
|
for _, tc := range []struct {
|
||||||
name string
|
name string
|
||||||
|
@ -1960,10 +1917,7 @@ func TestDropOldTimeSeries(t *testing.T) {
|
||||||
mcfg := config.DefaultMetadataConfig
|
mcfg := config.DefaultMetadataConfig
|
||||||
cfg.MaxShards = 1
|
cfg.MaxShards = 1
|
||||||
cfg.SampleAgeLimit = model.Duration(60 * time.Second)
|
cfg.SampleAgeLimit = model.Duration(60 * time.Second)
|
||||||
dir := t.TempDir()
|
m := newTestQueueManager(t, cfg, mcfg, defaultFlushDeadline, c, config.RemoteWriteProtoMsgV1)
|
||||||
|
|
||||||
metrics := newQueueManagerMetrics(nil, "", "")
|
|
||||||
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, config.RemoteWriteProtoMsgV1)
|
|
||||||
m.StoreSeries(series, 0)
|
m.StoreSeries(series, 0)
|
||||||
|
|
||||||
m.Start()
|
m.Start()
|
||||||
|
|
Loading…
Reference in a new issue