diff --git a/storage/remote/queue_manager.go b/storage/remote/queue_manager.go index a4384e03ee..3e2bff3b58 100644 --- a/storage/remote/queue_manager.go +++ b/storage/remote/queue_manager.go @@ -791,7 +791,10 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue chan sample) { select { case <-ctx.Done(): return + default: + } + select { case sample, ok := <-queue: if !ok { if nPending > 0 { diff --git a/storage/remote/queue_manager_test.go b/storage/remote/queue_manager_test.go index e4917441fc..40584f0147 100644 --- a/storage/remote/queue_manager_test.go +++ b/storage/remote/queue_manager_test.go @@ -177,6 +177,45 @@ func TestShutdown(t *testing.T) { } } +func TestShutdownNoPendingSamples(t *testing.T) { + deadline := 1 * time.Second + c := NewTestBlockedStorageClient() + + dir, err := ioutil.TempDir("", "TestShutdown") + testutil.Ok(t, err) + defer os.RemoveAll(dir) + + metrics := newQueueManagerMetrics(nil) + m := NewQueueManager(nil, metrics, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), config.DefaultQueueConfig, nil, nil, c, deadline) + n := 2 * config.DefaultQueueConfig.MaxSamplesPerSend + samples, series := createTimeseries(n, n) + m.StoreSeries(series, 0) + m.Start() + m.reshardChan <- 10 + + // Append blocks to guarantee delivery, so we do it in the background. + go func() { + m.Append(samples) + }() + time.Sleep(100 * time.Millisecond) + + m.reshardChan <- 1 + // Test to ensure that Stop doesn't block. + start := time.Now() + m.Stop() + // The samples will never be delivered, so duration should + // be at least equal to deadline, otherwise the flush deadline + // was not respected. + duration := time.Since(start) + if duration > time.Duration(deadline+(deadline/10)) { + t.Errorf("Took too long to shutdown: %s > %s", duration, deadline) + } + if duration < time.Duration(deadline) { + t.Errorf("Shutdown occurred before flush deadline: %s < %s", duration, deadline) + } + testutil.Assert(t, client_testutil.ToFloat64(m.pendingSamplesMetric) == 0, "pending samples was not 0 after all shards were stopped: %f", client_testutil.ToFloat64(m.pendingSamplesMetric)) +} + func TestSeriesReset(t *testing.T) { c := NewTestBlockedStorageClient() deadline := 5 * time.Second