From 007907b4103f5245fd3f6f18e245cfb0855e5b5e Mon Sep 17 00:00:00 2001
From: Dan Milstein
Date: Tue, 23 Aug 2016 16:26:33 -0400
Subject: [PATCH 1/2] Fix one of the tests for a remote storage QueueManager

Specifically, the TestSpawnNotMoreThanMaxConcurrentSendsGoroutines was
failing on a fresh checkout of master.

The test had a race condition -- it would only pass if one of the
spawned goroutines happened to very quickly pull a set of samples off an
internal queue.

This patch rewrites the test so that it deterministically waits until
all samples have been pulled off that queue.  In case of errors, it also
now reports on the difference between what it expected and what it
found.

I verified that, if the code under test is deliberately broken, the test
successfully reports on that.
---
 storage/remote/queue_manager_test.go | 97 +++++++++++++++++-----------
 1 file changed, 61 insertions(+), 36 deletions(-)

diff --git a/storage/remote/queue_manager_test.go b/storage/remote/queue_manager_test.go
index 9e2522d0b1..2a37ecdd0d 100644
--- a/storage/remote/queue_manager_test.go
+++ b/storage/remote/queue_manager_test.go
@@ -15,7 +15,9 @@ package remote
 
 import (
 	"sync"
+	"sync/atomic"
 	"testing"
+	"time"
 
 	"github.com/prometheus/common/model"
 )
@@ -50,33 +52,6 @@ func (c *TestStorageClient) Name() string {
 	return "teststorageclient"
 }
 
-type TestBlockingStorageClient struct {
-	block   chan bool
-	getData chan bool
-}
-
-func NewTestBlockedStorageClient() *TestBlockingStorageClient {
-	return &TestBlockingStorageClient{
-		block:   make(chan bool),
-		getData: make(chan bool),
-	}
-}
-
-func (c *TestBlockingStorageClient) Store(s model.Samples) error {
-	<-c.getData
-	<-c.block
-	return nil
-}
-
-func (c *TestBlockingStorageClient) unlock() {
-	close(c.getData)
-	close(c.block)
-}
-
-func (c *TestBlockingStorageClient) Name() string {
-	return "testblockingstorageclient"
-}
-
 func TestSampleDelivery(t *testing.T) {
 	// Let's create an even number of send batches so we don't run into the
 	// batch timeout case.
@@ -110,10 +85,47 @@ func TestSampleDelivery(t *testing.T) {
 	c.waitForExpectedSamples(t)
 }
 
+// TestBlockingStorageClient is a queue_manager StorageClient which will block
+// on any calls to Store(), until the `block` channel is closed, at which point
+// the `numCalls` property will contain a count of how many times Store() was
+// called.
+type TestBlockingStorageClient struct {
+	block    chan bool
+	numCalls uint64
+}
+
+func NewTestBlockedStorageClient() *TestBlockingStorageClient {
+	return &TestBlockingStorageClient{
+		block:    make(chan bool),
+		numCalls: 0,
+	}
+}
+
+func (c *TestBlockingStorageClient) Store(s model.Samples) error {
+	atomic.AddUint64(&c.numCalls, 1)
+	<-c.block
+	return nil
+}
+
+func (c *TestBlockingStorageClient) NumCalls() uint64 {
+	return atomic.LoadUint64(&c.numCalls)
+}
+
+func (c *TestBlockingStorageClient) unlock() {
+	close(c.block)
+}
+
+func (c *TestBlockingStorageClient) Name() string {
+	return "testblockingstorageclient"
+}
+
 func TestSpawnNotMoreThanMaxConcurrentSendsGoroutines(t *testing.T) {
-	// `maxSamplesPerSend*maxConcurrentSends + 1` samples should be consumed by
-	// goroutines, `maxSamplesPerSend` should be still in the queue.
-	n := maxSamplesPerSend*maxConcurrentSends + maxSamplesPerSend*2
+	// Our goal is to fully empty the queue:
+	// `maxSamplesPerSend*maxConcurrentSends` samples should be consumed by the
+	// semaphore-controlled goroutines, and then another `maxSamplesPerSend`
+	// should be consumed by the Run() loop calling sendSample and immediately
+	// blocking.
+	n := maxSamplesPerSend*maxConcurrentSends + maxSamplesPerSend
 
 	samples := make(model.Samples, 0, n)
 	for i := 0; i < n; i++ {
@@ -129,20 +141,33 @@ func TestSpawnNotMoreThanMaxConcurrentSendsGoroutines(t *testing.T) {
 	m := NewStorageQueueManager(c, n)
 
 	go m.Run()
+	defer m.Stop()
 
 	for _, s := range samples {
 		m.Append(s)
 	}
 
-	for i := 0; i < maxConcurrentSends; i++ {
-		c.getData <- true // Wait while all goroutines are spawned.
+	// Wait until the Run() loop drains the queue.  If things went right, it
+	// should then immediately block in sendSamples(), but, in case of error,
+	// it would spawn too many goroutines, and thus we'd see more calls to
+	// client.Store()
+	//
+	// The timed wait is maybe non-ideal, but, in order to verify that we're
+	// not spawning too many concurrent goroutines, we have to wait on the
+	// Run() loop to consume a specific number of elements from the
+	// queue... and it doesn't signal that in any obvious way, except by
+	// draining the queue.  Also, this way, if something goes surprisingly
+	// wrong that prevents the queue from being drained (aka, not just spawning
+	// extra goroutines, but something totally unexpected), we deadlock, which
+	// is a clear signal of a problem.
+	for len(m.queue) > 0 {
+		time.Sleep(10 * time.Millisecond)
 	}
 
-	if len(m.queue) != maxSamplesPerSend {
-		t.Errorf("Queue should contain %d samples, it contains 0.", maxSamplesPerSend)
+	numCalls := c.NumCalls()
+	if numCalls != maxConcurrentSends {
+		t.Errorf("Saw %d concurrent sends, expected %d", numCalls, maxConcurrentSends)
 	}
 
 	c.unlock()
-
-	defer m.Stop()
 }

From 764ceaa9394801629542fef00055153e5e03c2ca Mon Sep 17 00:00:00 2001
From: Dan Milstein
Date: Wed, 24 Aug 2016 11:30:38 -0400
Subject: [PATCH 2/2] Add timeout to test, cap waiting at 1 second

---
 storage/remote/queue_manager_test.go | 22 +++++++++++++++-------
 1 file changed, 15 insertions(+), 7 deletions(-)

diff --git a/storage/remote/queue_manager_test.go b/storage/remote/queue_manager_test.go
index 2a37ecdd0d..afe23178f2 100644
--- a/storage/remote/queue_manager_test.go
+++ b/storage/remote/queue_manager_test.go
@@ -141,7 +141,11 @@ func TestSpawnNotMoreThanMaxConcurrentSendsGoroutines(t *testing.T) {
 	m := NewStorageQueueManager(c, n)
 
 	go m.Run()
-	defer m.Stop()
+
+	defer func() {
+		c.unlock()
+		m.Stop()
+	}()
 
 	for _, s := range samples {
 		m.Append(s)
@@ -156,18 +160,22 @@ func TestSpawnNotMoreThanMaxConcurrentSendsGoroutines(t *testing.T) {
 	// not spawning too many concurrent goroutines, we have to wait on the
 	// Run() loop to consume a specific number of elements from the
 	// queue... and it doesn't signal that in any obvious way, except by
-	// draining the queue.  Also, this way, if something goes surprisingly
-	// wrong that prevents the queue from being drained (aka, not just spawning
-	// extra goroutines, but something totally unexpected), we deadlock, which
-	// is a clear signal of a problem.
-	for len(m.queue) > 0 {
+	// draining the queue.  We cap the waiting at 1 second -- that should give
+	// plenty of time, and keeps the failure fairly quick if we're not draining
+	// the queue properly.
+	for i := 0; i < 100 && len(m.queue) > 0; i++ {
 		time.Sleep(10 * time.Millisecond)
 	}
 
+	if len(m.queue) > 0 {
+		t.Fatalf("Failed to drain StorageQueueManager queue, %d elements left",
+			len(m.queue),
+		)
+	}
+
 	numCalls := c.NumCalls()
 	if numCalls != maxConcurrentSends {
 		t.Errorf("Saw %d concurrent sends, expected %d", numCalls, maxConcurrentSends)
 	}
 
-	c.unlock()
 }
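
For reference, not part of either patch: below is a minimal, self-contained
Go sketch of the two techniques these commits combine -- an atomically
counted, deliberately blocking fake "send", plus the capped polling wait from
PATCH 2/2 (check every 10ms, give up after 100 iterations, roughly 1 second).
All names here (numCalls, block, queue) are illustrative stand-ins, not the
actual Prometheus test structure.

package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

func main() {
	const n = 8
	var numCalls uint64
	block := make(chan struct{})
	queue := make(chan int, n)

	// Fill the queue up front, as the test does via m.Append(s).
	for i := 0; i < n; i++ {
		queue <- i
	}

	// Consumer: drain the queue, record a single "send" atomically, then
	// block -- standing in for Run() feeding a Store() that waits on c.block.
	go func() {
		for i := 0; i < n; i++ {
			<-queue
		}
		atomic.AddUint64(&numCalls, 1)
		<-block
	}()

	// Capped polling wait: deterministic on success, and a fast, loud
	// failure (instead of a deadlock) if the queue never drains.
	ok := false
	for i := 0; i < 100; i++ {
		if len(queue) == 0 && atomic.LoadUint64(&numCalls) == 1 {
			ok = true
			break
		}
		time.Sleep(10 * time.Millisecond)
	}
	if !ok {
		fmt.Printf("failed to drain queue, %d elements left\n", len(queue))
		return
	}

	fmt.Println("queue drained; exactly one send is blocked")
	close(block) // release the blocked "send", as c.unlock() does
}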