diff --git a/storage/remote/queue_manager.go b/storage/remote/queue_manager.go index 54495ca0d..7d52ed1ef 100644 --- a/storage/remote/queue_manager.go +++ b/storage/remote/queue_manager.go @@ -430,6 +430,15 @@ func (s *shards) runShard(i int) { pendingSamples := model.Samples{} timer := time.NewTimer(s.qm.cfg.BatchSendDeadline) + stop := func() { + if !timer.Stop() { + select { + case <-timer.C: + default: + } + } + } + defer stop() for { select { @@ -446,19 +455,20 @@ func (s *shards) runShard(i int) { queueLength.WithLabelValues(s.qm.queueName).Dec() pendingSamples = append(pendingSamples, sample) - for len(pendingSamples) >= s.qm.cfg.MaxSamplesPerSend { + if len(pendingSamples) >= s.qm.cfg.MaxSamplesPerSend { s.sendSamples(pendingSamples[:s.qm.cfg.MaxSamplesPerSend]) pendingSamples = pendingSamples[s.qm.cfg.MaxSamplesPerSend:] + + stop() + timer.Reset(s.qm.cfg.BatchSendDeadline) } - if !timer.Stop() { - <-timer.C - } - timer.Reset(s.qm.cfg.BatchSendDeadline) + case <-timer.C: if len(pendingSamples) > 0 { s.sendSamples(pendingSamples) pendingSamples = pendingSamples[:0] } + timer.Reset(s.qm.cfg.BatchSendDeadline) } } } diff --git a/storage/remote/queue_manager_test.go b/storage/remote/queue_manager_test.go index 9d2e7a34b..e2a5b19cb 100644 --- a/storage/remote/queue_manager_test.go +++ b/storage/remote/queue_manager_test.go @@ -44,6 +44,9 @@ func (c *TestStorageClient) expectSamples(ss model.Samples) { c.mtx.Lock() defer c.mtx.Unlock() + c.expectedSamples = map[string][]*prompb.Sample{} + c.receivedSamples = map[string][]*prompb.Sample{} + for _, s := range ss { ts := labelProtosToLabels(MetricToLabelProtos(s.Metric)).String() c.expectedSamples[ts] = append(c.expectedSamples[ts], &prompb.Sample{ @@ -122,6 +125,44 @@ func TestSampleDelivery(t *testing.T) { c.waitForExpectedSamples(t) } +func TestSampleDeliveryTimeout(t *testing.T) { + // Let's send one less sample than batch size, and wait the timeout duration + n := config.DefaultQueueConfig.Capacity - 1 + + samples := make(model.Samples, 0, n) + for i := 0; i < n; i++ { + name := model.LabelValue(fmt.Sprintf("test_metric_%d", i)) + samples = append(samples, &model.Sample{ + Metric: model.Metric{ + model.MetricNameLabel: name, + }, + Value: model.SampleValue(i), + }) + } + + c := NewTestStorageClient() + + cfg := config.DefaultQueueConfig + cfg.MaxShards = 1 + cfg.BatchSendDeadline = 100 * time.Millisecond + m := NewQueueManager(nil, cfg, nil, nil, c) + m.Start() + defer m.Stop() + + // Send the samples twice, waiting for the samples in the meantime. + c.expectSamples(samples) + for _, s := range samples { + m.Append(s) + } + c.waitForExpectedSamples(t) + + c.expectSamples(samples) + for _, s := range samples { + m.Append(s) + } + c.waitForExpectedSamples(t) +} + func TestSampleDeliveryOrder(t *testing.T) { ts := 10 n := config.DefaultQueueConfig.MaxSamplesPerSend * ts