diff --git a/storage/remote/queue_manager_test.go b/storage/remote/queue_manager_test.go index 9d2e7a34b..d81eaf821 100644 --- a/storage/remote/queue_manager_test.go +++ b/storage/remote/queue_manager_test.go @@ -44,6 +44,9 @@ func (c *TestStorageClient) expectSamples(ss model.Samples) { c.mtx.Lock() defer c.mtx.Unlock() + c.expectedSamples = map[string][]*prompb.Sample{} + c.receivedSamples = map[string][]*prompb.Sample{} + for _, s := range ss { ts := labelProtosToLabels(MetricToLabelProtos(s.Metric)).String() c.expectedSamples[ts] = append(c.expectedSamples[ts], &prompb.Sample{ @@ -122,6 +125,44 @@ func TestSampleDelivery(t *testing.T) { c.waitForExpectedSamples(t) } +func TestSampleDeliveryTimeout(t *testing.T) { + // Let's send on less sample than batch size, and wait the timeout duration + n := config.DefaultQueueConfig.Capacity - 1 + + samples := make(model.Samples, 0, n) + for i := 0; i < n; i++ { + name := model.LabelValue(fmt.Sprintf("test_metric_%d", i)) + samples = append(samples, &model.Sample{ + Metric: model.Metric{ + model.MetricNameLabel: name, + }, + Value: model.SampleValue(i), + }) + } + + c := NewTestStorageClient() + + cfg := config.DefaultQueueConfig + cfg.MaxShards = 1 + cfg.BatchSendDeadline = 100 * time.Millisecond + m := NewQueueManager(nil, cfg, nil, nil, c) + m.Start() + defer m.Stop() + + // Send the samples twice, waiting for the samples in the meantime. + c.expectSamples(samples) + for _, s := range samples { + m.Append(s) + } + c.waitForExpectedSamples(t) + + c.expectSamples(samples) + for _, s := range samples { + m.Append(s) + } + c.waitForExpectedSamples(t) +} + func TestSampleDeliveryOrder(t *testing.T) { ts := 10 n := config.DefaultQueueConfig.MaxSamplesPerSend * ts