From 8f525b4ba4eb14f22a54cec4cf9b855d5c8efe4a Mon Sep 17 00:00:00 2001 From: Bryan Boreham Date: Mon, 27 Mar 2023 16:32:25 +0100 Subject: [PATCH] storage/remote tests: refactor: extract function newTestQueueManager To reduce repetition. Signed-off-by: Bryan Boreham --- storage/remote/queue_manager_test.go | 114 ++++++++------------------- 1 file changed, 34 insertions(+), 80 deletions(-) diff --git a/storage/remote/queue_manager_test.go b/storage/remote/queue_manager_test.go index c953266b7..daa89a930 100644 --- a/storage/remote/queue_manager_test.go +++ b/storage/remote/queue_manager_test.go @@ -163,16 +163,23 @@ func TestSampleDelivery(t *testing.T) { } } -func TestMetadataDelivery(t *testing.T) { +func newTestClientAndQueueManager(t testing.TB, flushDeadline time.Duration) (*TestWriteClient, *QueueManager) { c := NewTestWriteClient() - - dir := t.TempDir() - cfg := config.DefaultQueueConfig mcfg := config.DefaultMetadataConfig + return c, newTestQueueManager(t, cfg, mcfg, flushDeadline, c) +} +func newTestQueueManager(t testing.TB, cfg config.QueueConfig, mcfg config.MetadataConfig, deadline time.Duration, c WriteClient) *QueueManager { + dir := t.TempDir() metrics := newQueueManagerMetrics(nil, "", "") - m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false) + m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, deadline, newPool(), newHighestTimestampMetric(), nil, false, false) + + return m +} + +func TestMetadataDelivery(t *testing.T) { + c, m := newTestClientAndQueueManager(t, defaultFlushDeadline) m.Start() defer m.Stop() @@ -192,7 +199,7 @@ func TestMetadataDelivery(t *testing.T) { require.Len(t, c.receivedMetadata, numMetadata) // One more write than the rounded qoutient should be performed in order to get samples that didn't // fit into MaxSamplesPerSend. - require.Equal(t, numMetadata/mcfg.MaxSamplesPerSend+1, c.writesReceived) + require.Equal(t, numMetadata/config.DefaultMetadataConfig.MaxSamplesPerSend+1, c.writesReceived) // Make sure the last samples were sent. require.Equal(t, c.receivedMetadata[metadata[len(metadata)-1].Metric][0].MetricFamilyName, metadata[len(metadata)-1].Metric) } @@ -201,17 +208,13 @@ func TestSampleDeliveryTimeout(t *testing.T) { // Let's send one less sample than batch size, and wait the timeout duration n := 9 samples, series := createTimeseries(n, n) - c := NewTestWriteClient() cfg := config.DefaultQueueConfig - mcfg := config.DefaultMetadataConfig cfg.MaxShards = 1 cfg.BatchSendDeadline = model.Duration(100 * time.Millisecond) - dir := t.TempDir() - - metrics := newQueueManagerMetrics(nil, "", "") - m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false) + c := NewTestWriteClient() + m := newTestQueueManager(t, cfg, config.DefaultMetadataConfig, defaultFlushDeadline, c) m.StoreSeries(series, 0) m.Start() defer m.Stop() @@ -244,16 +247,8 @@ func TestSampleDeliveryOrder(t *testing.T) { }) } - c := NewTestWriteClient() + c, m := newTestClientAndQueueManager(t, defaultFlushDeadline) c.expectSamples(samples, series) - - dir := t.TempDir() - - cfg := config.DefaultQueueConfig - mcfg := config.DefaultMetadataConfig - - metrics := newQueueManagerMetrics(nil, "", "") - m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false) m.StoreSeries(series, 0) m.Start() @@ -267,13 +262,10 @@ func TestShutdown(t *testing.T) { deadline := 1 * time.Second c := NewTestBlockedWriteClient() - dir := t.TempDir() - cfg := config.DefaultQueueConfig mcfg := config.DefaultMetadataConfig - metrics := newQueueManagerMetrics(nil, "", "") - m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, deadline, newPool(), newHighestTimestampMetric(), nil, false, false) + m := newTestQueueManager(t, cfg, mcfg, deadline, c) n := 2 * config.DefaultQueueConfig.MaxSamplesPerSend samples, series := createTimeseries(n, n) m.StoreSeries(series, 0) @@ -306,12 +298,10 @@ func TestSeriesReset(t *testing.T) { numSegments := 4 numSeries := 25 - dir := t.TempDir() - cfg := config.DefaultQueueConfig mcfg := config.DefaultMetadataConfig - metrics := newQueueManagerMetrics(nil, "", "") - m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, deadline, newPool(), newHighestTimestampMetric(), nil, false, false) + m := newTestQueueManager(t, cfg, mcfg, deadline, c) + for i := 0; i < numSegments; i++ { series := []record.RefSeries{} for j := 0; j < numSeries; j++ { @@ -330,17 +320,12 @@ func TestReshard(t *testing.T) { nSamples := config.DefaultQueueConfig.Capacity * size samples, series := createTimeseries(nSamples, nSeries) - c := NewTestWriteClient() - c.expectSamples(samples, series) - cfg := config.DefaultQueueConfig - mcfg := config.DefaultMetadataConfig cfg.MaxShards = 1 - dir := t.TempDir() - - metrics := newQueueManagerMetrics(nil, "", "") - m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false) + c := NewTestWriteClient() + m := newTestQueueManager(t, cfg, config.DefaultMetadataConfig, defaultFlushDeadline, c) + c.expectSamples(samples, series) m.StoreSeries(series, 0) m.Start() @@ -363,7 +348,7 @@ func TestReshard(t *testing.T) { c.waitForExpectedData(t) } -func TestReshardRaceWithStop(*testing.T) { +func TestReshardRaceWithStop(t *testing.T) { c := NewTestWriteClient() var m *QueueManager h := sync.Mutex{} @@ -375,8 +360,7 @@ func TestReshardRaceWithStop(*testing.T) { exitCh := make(chan struct{}) go func() { for { - metrics := newQueueManagerMetrics(nil, "", "") - m = NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false) + m = newTestQueueManager(t, cfg, mcfg, defaultFlushDeadline, c) m.Start() h.Unlock() h.Lock() @@ -410,8 +394,7 @@ func TestReshardPartialBatch(t *testing.T) { flushDeadline := 10 * time.Millisecond cfg.BatchSendDeadline = model.Duration(batchSendDeadline) - metrics := newQueueManagerMetrics(nil, "", "") - m := NewQueueManager(metrics, nil, nil, nil, t.TempDir(), newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, flushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false) + m := newTestQueueManager(t, cfg, mcfg, flushDeadline, c) m.StoreSeries(series, 0) m.Start() @@ -454,9 +437,7 @@ func TestQueueFilledDeadlock(t *testing.T) { batchSendDeadline := time.Millisecond cfg.BatchSendDeadline = model.Duration(batchSendDeadline) - metrics := newQueueManagerMetrics(nil, "", "") - - m := NewQueueManager(metrics, nil, nil, nil, t.TempDir(), newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, flushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false) + m := newTestQueueManager(t, cfg, mcfg, flushDeadline, c) m.StoreSeries(series, 0) m.Start() defer m.Stop() @@ -479,11 +460,7 @@ func TestQueueFilledDeadlock(t *testing.T) { } func TestReleaseNoninternedString(t *testing.T) { - cfg := config.DefaultQueueConfig - mcfg := config.DefaultMetadataConfig - metrics := newQueueManagerMetrics(nil, "", "") - c := NewTestWriteClient() - m := NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false) + _, m := newTestClientAndQueueManager(t, defaultFlushDeadline) m.Start() defer m.Stop() @@ -525,12 +502,8 @@ func TestShouldReshard(t *testing.T) { }, } - cfg := config.DefaultQueueConfig - mcfg := config.DefaultMetadataConfig for _, c := range cases { - metrics := newQueueManagerMetrics(nil, "", "") - client := NewTestWriteClient() - m := NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, client, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false) + _, m := newTestClientAndQueueManager(t, defaultFlushDeadline) m.numShards = c.startingShards m.dataIn.incr(c.samplesIn) m.dataOut.incr(c.samplesOut) @@ -904,10 +877,7 @@ func BenchmarkSampleSend(b *testing.B) { cfg.MinShards = 20 cfg.MaxShards = 20 - dir := b.TempDir() - - metrics := newQueueManagerMetrics(nil, "", "") - m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false) + m := newTestQueueManager(b, cfg, mcfg, defaultFlushDeadline, c) m.StoreSeries(series, 0) // These should be received by the client. @@ -1083,15 +1053,9 @@ func TestProcessExternalLabels(t *testing.T) { } func TestCalculateDesiredShards(t *testing.T) { - c := NewTestWriteClient() cfg := config.DefaultQueueConfig - mcfg := config.DefaultMetadataConfig - - dir := t.TempDir() - - metrics := newQueueManagerMetrics(nil, "", "") - samplesIn := newEWMARate(ewmaWeight, shardUpdateDuration) - m := NewQueueManager(metrics, nil, nil, nil, dir, samplesIn, cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false) + _, m := newTestClientAndQueueManager(t, defaultFlushDeadline) + samplesIn := m.dataIn // Need to start the queue manager so the proper metrics are initialized. // However we can stop it right away since we don't need to do any actual @@ -1160,15 +1124,8 @@ func TestCalculateDesiredShards(t *testing.T) { } func TestCalculateDesiredShardsDetail(t *testing.T) { - c := NewTestWriteClient() - cfg := config.DefaultQueueConfig - mcfg := config.DefaultMetadataConfig - - dir := t.TempDir() - - metrics := newQueueManagerMetrics(nil, "", "") - samplesIn := newEWMARate(ewmaWeight, shardUpdateDuration) - m := NewQueueManager(metrics, nil, nil, nil, dir, samplesIn, cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false) + _, m := newTestClientAndQueueManager(t, defaultFlushDeadline) + samplesIn := m.dataIn for _, tc := range []struct { name string @@ -1393,10 +1350,7 @@ func TestDropOldTimeSeries(t *testing.T) { mcfg := config.DefaultMetadataConfig cfg.MaxShards = 1 cfg.SampleAgeLimit = model.Duration(60 * time.Second) - dir := t.TempDir() - - metrics := newQueueManagerMetrics(nil, "", "") - m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false) + m := newTestQueueManager(t, cfg, mcfg, defaultFlushDeadline, c) m.StoreSeries(series, 0) m.Start()