From f8c9d375b6fb273157ebf2ae368ed7755d791af6 Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Fri, 9 Mar 2018 12:00:26 +0000 Subject: [PATCH 1/4] Correctly stop the timer used in the remote write path. --- storage/remote/queue_manager.go | 21 +++++++++++++++++---- 1 file changed, 17 insertions(+), 4 deletions(-) diff --git a/storage/remote/queue_manager.go b/storage/remote/queue_manager.go index 54495ca0d9..01537aa3dc 100644 --- a/storage/remote/queue_manager.go +++ b/storage/remote/queue_manager.go @@ -430,6 +430,14 @@ func (s *shards) runShard(i int) { pendingSamples := model.Samples{} timer := time.NewTimer(s.qm.cfg.BatchSendDeadline) + defer func() { + if !timer.Stop() { + select { + case <-timer.C: + default: + } + } + }() for { select { @@ -449,11 +457,16 @@ func (s *shards) runShard(i int) { for len(pendingSamples) >= s.qm.cfg.MaxSamplesPerSend { s.sendSamples(pendingSamples[:s.qm.cfg.MaxSamplesPerSend]) pendingSamples = pendingSamples[s.qm.cfg.MaxSamplesPerSend:] + + if !timer.Stop() { + select { + case <-timer.C: + default: + } + } + timer.Reset(s.qm.cfg.BatchSendDeadline) } - if !timer.Stop() { - <-timer.C - } - timer.Reset(s.qm.cfg.BatchSendDeadline) + case <-timer.C: if len(pendingSamples) > 0 { s.sendSamples(pendingSamples) From 22d820ef8e3178b1af0e8e4b98d5cda1c7887629 Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Mon, 12 Mar 2018 14:27:48 +0000 Subject: [PATCH 2/4] Review feedback. --- storage/remote/queue_manager.go | 15 ++++++--------- 1 file changed, 6 insertions(+), 9 deletions(-) diff --git a/storage/remote/queue_manager.go b/storage/remote/queue_manager.go index 01537aa3dc..7d52ed1eff 100644 --- a/storage/remote/queue_manager.go +++ b/storage/remote/queue_manager.go @@ -430,14 +430,15 @@ func (s *shards) runShard(i int) { pendingSamples := model.Samples{} timer := time.NewTimer(s.qm.cfg.BatchSendDeadline) - defer func() { + stop := func() { if !timer.Stop() { select { case <-timer.C: default: } } - }() + } + defer stop() for { select { @@ -454,16 +455,11 @@ func (s *shards) runShard(i int) { queueLength.WithLabelValues(s.qm.queueName).Dec() pendingSamples = append(pendingSamples, sample) - for len(pendingSamples) >= s.qm.cfg.MaxSamplesPerSend { + if len(pendingSamples) >= s.qm.cfg.MaxSamplesPerSend { s.sendSamples(pendingSamples[:s.qm.cfg.MaxSamplesPerSend]) pendingSamples = pendingSamples[s.qm.cfg.MaxSamplesPerSend:] - if !timer.Stop() { - select { - case <-timer.C: - default: - } - } + stop() timer.Reset(s.qm.cfg.BatchSendDeadline) } @@ -472,6 +468,7 @@ func (s *shards) runShard(i int) { s.sendSamples(pendingSamples) pendingSamples = pendingSamples[:0] } + timer.Reset(s.qm.cfg.BatchSendDeadline) } } } From 390b018c90d019d091c0aa43787432870a89b167 Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Mon, 12 Mar 2018 15:35:43 +0000 Subject: [PATCH 3/4] Test sample timeout delivery. --- storage/remote/queue_manager_test.go | 41 ++++++++++++++++++++++++++++ 1 file changed, 41 insertions(+) diff --git a/storage/remote/queue_manager_test.go b/storage/remote/queue_manager_test.go index 9d2e7a34b3..d81eaf8218 100644 --- a/storage/remote/queue_manager_test.go +++ b/storage/remote/queue_manager_test.go @@ -44,6 +44,9 @@ func (c *TestStorageClient) expectSamples(ss model.Samples) { c.mtx.Lock() defer c.mtx.Unlock() + c.expectedSamples = map[string][]*prompb.Sample{} + c.receivedSamples = map[string][]*prompb.Sample{} + for _, s := range ss { ts := labelProtosToLabels(MetricToLabelProtos(s.Metric)).String() c.expectedSamples[ts] = append(c.expectedSamples[ts], &prompb.Sample{ @@ -122,6 +125,44 @@ func TestSampleDelivery(t *testing.T) { c.waitForExpectedSamples(t) } +func TestSampleDeliveryTimeout(t *testing.T) { + // Let's send on less sample than batch size, and wait the timeout duration + n := config.DefaultQueueConfig.Capacity - 1 + + samples := make(model.Samples, 0, n) + for i := 0; i < n; i++ { + name := model.LabelValue(fmt.Sprintf("test_metric_%d", i)) + samples = append(samples, &model.Sample{ + Metric: model.Metric{ + model.MetricNameLabel: name, + }, + Value: model.SampleValue(i), + }) + } + + c := NewTestStorageClient() + + cfg := config.DefaultQueueConfig + cfg.MaxShards = 1 + cfg.BatchSendDeadline = 100 * time.Millisecond + m := NewQueueManager(nil, cfg, nil, nil, c) + m.Start() + defer m.Stop() + + // Send the samples twice, waiting for the samples in the meantime. + c.expectSamples(samples) + for _, s := range samples { + m.Append(s) + } + c.waitForExpectedSamples(t) + + c.expectSamples(samples) + for _, s := range samples { + m.Append(s) + } + c.waitForExpectedSamples(t) +} + func TestSampleDeliveryOrder(t *testing.T) { ts := 10 n := config.DefaultQueueConfig.MaxSamplesPerSend * ts From dc860e7d0eb78520a4860e2004bc23b3befe0d71 Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Mon, 12 Mar 2018 16:48:51 +0000 Subject: [PATCH 4/4] Fix nit. --- storage/remote/queue_manager_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/storage/remote/queue_manager_test.go b/storage/remote/queue_manager_test.go index d81eaf8218..e2a5b19cb5 100644 --- a/storage/remote/queue_manager_test.go +++ b/storage/remote/queue_manager_test.go @@ -126,7 +126,7 @@ func TestSampleDelivery(t *testing.T) { } func TestSampleDeliveryTimeout(t *testing.T) { - // Let's send on less sample than batch size, and wait the timeout duration + // Let's send one less sample than batch size, and wait the timeout duration n := config.DefaultQueueConfig.Capacity - 1 samples := make(model.Samples, 0, n)