diff --git a/storage/remote/queue_manager_test.go b/storage/remote/queue_manager_test.go index 3bbd0be4b6..321d586223 100644 --- a/storage/remote/queue_manager_test.go +++ b/storage/remote/queue_manager_test.go @@ -318,6 +318,21 @@ func TestSampleDelivery(t *testing.T) { } } +func newTestClientAndQueueManager(t testing.TB, flushDeadline time.Duration, protoMsg config.RemoteWriteProtoMsg) (*TestWriteClient, *QueueManager) { + c := NewTestWriteClient(protoMsg) + cfg := config.DefaultQueueConfig + mcfg := config.DefaultMetadataConfig + return c, newTestQueueManager(t, cfg, mcfg, flushDeadline, c, protoMsg) +} + +func newTestQueueManager(t testing.TB, cfg config.QueueConfig, mcfg config.MetadataConfig, deadline time.Duration, c WriteClient, protoMsg config.RemoteWriteProtoMsg) *QueueManager { + dir := t.TempDir() + metrics := newQueueManagerMetrics(nil, "", "") + m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, deadline, newPool(), newHighestTimestampMetric(), nil, false, false, protoMsg) + + return m +} + func testDefaultQueueConfig() config.QueueConfig { cfg := config.DefaultQueueConfig // For faster unit tests we don't wait default 5 seconds. @@ -326,15 +341,7 @@ func testDefaultQueueConfig() config.QueueConfig { } func TestMetadataDelivery(t *testing.T) { - c := NewTestWriteClient(config.RemoteWriteProtoMsgV1) - - dir := t.TempDir() - - cfg := testDefaultQueueConfig() - mcfg := config.DefaultMetadataConfig - - metrics := newQueueManagerMetrics(nil, "", "") - m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, config.RemoteWriteProtoMsgV1) + c, m := newTestClientAndQueueManager(t, defaultFlushDeadline, config.RemoteWriteProtoMsgV1) m.Start() defer m.Stop() @@ -355,7 +362,7 @@ func TestMetadataDelivery(t *testing.T) { require.Len(t, c.receivedMetadata, numMetadata) // One more write than the rounded qoutient should be performed in order to get samples that didn't // fit into MaxSamplesPerSend. - require.Equal(t, numMetadata/mcfg.MaxSamplesPerSend+1, c.writesReceived) + require.Equal(t, numMetadata/config.DefaultMetadataConfig.MaxSamplesPerSend+1, c.writesReceived) // Make sure the last samples were sent. require.Equal(t, c.receivedMetadata[metadata[len(metadata)-1].Metric][0].MetricFamilyName, metadata[len(metadata)-1].Metric) } @@ -407,16 +414,12 @@ func TestSampleDeliveryTimeout(t *testing.T) { // Let's send one less sample than batch size, and wait the timeout duration n := 9 samples, series := createTimeseries(n, n) - c := NewTestWriteClient(protoMsg) - cfg := testDefaultQueueConfig() mcfg := config.DefaultMetadataConfig cfg.MaxShards = 1 - dir := t.TempDir() - - metrics := newQueueManagerMetrics(nil, "", "") - m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, protoMsg) + c := NewTestWriteClient(protoMsg) + m := newTestQueueManager(t, cfg, mcfg, defaultFlushDeadline, c, protoMsg) m.StoreSeries(series, 0) m.Start() defer m.Stop() @@ -453,16 +456,8 @@ func TestSampleDeliveryOrder(t *testing.T) { }) } - c := NewTestWriteClient(protoMsg) + c, m := newTestClientAndQueueManager(t, defaultFlushDeadline, protoMsg) c.expectSamples(samples, series) - - dir := t.TempDir() - - cfg := testDefaultQueueConfig() - mcfg := config.DefaultMetadataConfig - - metrics := newQueueManagerMetrics(nil, "", "") - m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, protoMsg) m.StoreSeries(series, 0) m.Start() @@ -478,13 +473,10 @@ func TestShutdown(t *testing.T) { deadline := 1 * time.Second c := NewTestBlockedWriteClient() - dir := t.TempDir() - - cfg := testDefaultQueueConfig() + cfg := config.DefaultQueueConfig mcfg := config.DefaultMetadataConfig - metrics := newQueueManagerMetrics(nil, "", "") - m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, deadline, newPool(), newHighestTimestampMetric(), nil, false, false, config.RemoteWriteProtoMsgV1) + m := newTestQueueManager(t, cfg, mcfg, deadline, c, config.RemoteWriteProtoMsgV1) n := 2 * config.DefaultQueueConfig.MaxSamplesPerSend samples, series := createTimeseries(n, n) m.StoreSeries(series, 0) @@ -517,12 +509,9 @@ func TestSeriesReset(t *testing.T) { numSegments := 4 numSeries := 25 - dir := t.TempDir() - - cfg := testDefaultQueueConfig() + cfg := config.DefaultQueueConfig mcfg := config.DefaultMetadataConfig - metrics := newQueueManagerMetrics(nil, "", "") - m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, deadline, newPool(), newHighestTimestampMetric(), nil, false, false, config.RemoteWriteProtoMsgV1) + m := newTestQueueManager(t, cfg, mcfg, deadline, c, config.RemoteWriteProtoMsgV1) for i := 0; i < numSegments; i++ { series := []record.RefSeries{} for j := 0; j < numSeries; j++ { @@ -543,17 +532,12 @@ func TestReshard(t *testing.T) { nSamples := config.DefaultQueueConfig.Capacity * size samples, series := createTimeseries(nSamples, nSeries) - c := NewTestWriteClient(protoMsg) - c.expectSamples(samples, series) - - cfg := testDefaultQueueConfig() - mcfg := config.DefaultMetadataConfig + cfg := config.DefaultQueueConfig cfg.MaxShards = 1 - dir := t.TempDir() - - metrics := newQueueManagerMetrics(nil, "", "") - m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, protoMsg) + c := NewTestWriteClient(protoMsg) + m := newTestQueueManager(t, cfg, config.DefaultMetadataConfig, defaultFlushDeadline, c, protoMsg) + c.expectSamples(samples, series) m.StoreSeries(series, 0) m.Start() @@ -591,8 +575,8 @@ func TestReshardRaceWithStop(t *testing.T) { exitCh := make(chan struct{}) go func() { for { - metrics := newQueueManagerMetrics(nil, "", "") - m = NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, protoMsg) + m = newTestQueueManager(t, cfg, mcfg, defaultFlushDeadline, c, protoMsg) + m.Start() h.Unlock() h.Lock() @@ -630,8 +614,7 @@ func TestReshardPartialBatch(t *testing.T) { flushDeadline := 10 * time.Millisecond cfg.BatchSendDeadline = model.Duration(batchSendDeadline) - metrics := newQueueManagerMetrics(nil, "", "") - m := NewQueueManager(metrics, nil, nil, nil, t.TempDir(), newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, flushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, protoMsg) + m := newTestQueueManager(t, cfg, mcfg, flushDeadline, c, protoMsg) m.StoreSeries(series, 0) m.Start() @@ -678,9 +661,7 @@ func TestQueueFilledDeadlock(t *testing.T) { batchSendDeadline := time.Millisecond cfg.BatchSendDeadline = model.Duration(batchSendDeadline) - metrics := newQueueManagerMetrics(nil, "", "") - - m := NewQueueManager(metrics, nil, nil, nil, t.TempDir(), newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, flushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, protoMsg) + m := newTestQueueManager(t, cfg, mcfg, flushDeadline, c, protoMsg) m.StoreSeries(series, 0) m.Start() defer m.Stop() @@ -707,11 +688,7 @@ func TestQueueFilledDeadlock(t *testing.T) { func TestReleaseNoninternedString(t *testing.T) { for _, protoMsg := range []config.RemoteWriteProtoMsg{config.RemoteWriteProtoMsgV1, config.RemoteWriteProtoMsgV2} { t.Run(fmt.Sprint(protoMsg), func(t *testing.T) { - cfg := testDefaultQueueConfig() - mcfg := config.DefaultMetadataConfig - metrics := newQueueManagerMetrics(nil, "", "") - c := NewTestWriteClient(protoMsg) - m := NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, protoMsg) + _, m := newTestClientAndQueueManager(t, defaultFlushDeadline, protoMsg) m.Start() defer m.Stop() for i := 1; i < 1000; i++ { @@ -754,13 +731,9 @@ func TestShouldReshard(t *testing.T) { }, } - cfg := testDefaultQueueConfig() - mcfg := config.DefaultMetadataConfig for _, c := range cases { - metrics := newQueueManagerMetrics(nil, "", "") // todo: test with new proto type(s) - client := NewTestWriteClient(config.RemoteWriteProtoMsgV1) - m := NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, client, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, config.RemoteWriteProtoMsgV1) + _, m := newTestClientAndQueueManager(t, defaultFlushDeadline, config.RemoteWriteProtoMsgV1) m.numShards = c.startingShards m.dataIn.incr(c.samplesIn) m.dataOut.incr(c.samplesOut) @@ -1315,11 +1288,8 @@ func BenchmarkSampleSend(b *testing.B) { cfg.MinShards = 20 cfg.MaxShards = 20 - dir := b.TempDir() - - metrics := newQueueManagerMetrics(nil, "", "") // todo: test with new proto type(s) - m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, config.RemoteWriteProtoMsgV1) + m := newTestQueueManager(b, cfg, mcfg, defaultFlushDeadline, c, config.RemoteWriteProtoMsgV1) m.StoreSeries(series, 0) // These should be received by the client. @@ -1496,16 +1466,10 @@ func TestProcessExternalLabels(t *testing.T) { } func TestCalculateDesiredShards(t *testing.T) { - c := NewNopWriteClient() - cfg := testDefaultQueueConfig() - mcfg := config.DefaultMetadataConfig - - dir := t.TempDir() - - metrics := newQueueManagerMetrics(nil, "", "") - samplesIn := newEWMARate(ewmaWeight, shardUpdateDuration) // todo: test with new proto type(s) - m := NewQueueManager(metrics, nil, nil, nil, dir, samplesIn, cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, config.RemoteWriteProtoMsgV1) + cfg := config.DefaultQueueConfig + _, m := newTestClientAndQueueManager(t, defaultFlushDeadline, config.RemoteWriteProtoMsgV1) + samplesIn := m.dataIn // Need to start the queue manager so the proper metrics are initialized. // However we can stop it right away since we don't need to do any actual @@ -1574,16 +1538,9 @@ func TestCalculateDesiredShards(t *testing.T) { } func TestCalculateDesiredShardsDetail(t *testing.T) { - c := NewTestWriteClient(config.RemoteWriteProtoMsgV1) - cfg := config.DefaultQueueConfig - mcfg := config.DefaultMetadataConfig - - dir := t.TempDir() - - metrics := newQueueManagerMetrics(nil, "", "") - samplesIn := newEWMARate(ewmaWeight, shardUpdateDuration) // todo: test with new proto type(s) - m := NewQueueManager(metrics, nil, nil, nil, dir, samplesIn, cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, config.RemoteWriteProtoMsgV1) + _, m := newTestClientAndQueueManager(t, defaultFlushDeadline, config.RemoteWriteProtoMsgV1) + samplesIn := m.dataIn for _, tc := range []struct { name string @@ -1960,10 +1917,7 @@ func TestDropOldTimeSeries(t *testing.T) { mcfg := config.DefaultMetadataConfig cfg.MaxShards = 1 cfg.SampleAgeLimit = model.Duration(60 * time.Second) - dir := t.TempDir() - - metrics := newQueueManagerMetrics(nil, "", "") - m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, config.RemoteWriteProtoMsgV1) + m := newTestQueueManager(t, cfg, mcfg, defaultFlushDeadline, c, config.RemoteWriteProtoMsgV1) m.StoreSeries(series, 0) m.Start()