diff --git a/storage/remote/queue_manager.go b/storage/remote/queue_manager.go index fb931c2f2..abe0caf7c 100644 --- a/storage/remote/queue_manager.go +++ b/storage/remote/queue_manager.go @@ -16,6 +16,7 @@ package remote import ( "math" "sync" + "sync/atomic" "time" "golang.org/x/time/rate" @@ -372,10 +373,10 @@ func (t *QueueManager) reshard(n int) { } type shards struct { - qm *QueueManager - queues []chan *model.Sample - done chan struct{} - wg sync.WaitGroup + qm *QueueManager + queues []chan *model.Sample + done chan struct{} + running int32 } func (t *QueueManager) newShards(numShards int) *shards { @@ -384,11 +385,11 @@ func (t *QueueManager) newShards(numShards int) *shards { queues[i] = make(chan *model.Sample, t.cfg.Capacity) } s := &shards{ - qm: t, - queues: queues, - done: make(chan struct{}), + qm: t, + queues: queues, + done: make(chan struct{}), + running: int32(numShards), } - s.wg.Add(numShards) return s } @@ -407,14 +408,8 @@ func (s *shards) stop() { close(shard) } - done := make(chan struct{}) - go func() { - s.wg.Wait() - close(done) - }() - select { - case <-done: + case <-s.done: case <-time.After(stopFlushDeadline): level.Error(s.qm.logger).Log("msg", "Failed to flush all samples on shutdown") } @@ -435,7 +430,12 @@ func (s *shards) enqueue(sample *model.Sample) bool { } func (s *shards) runShard(i int) { - defer s.wg.Done() + defer func() { + if atomic.AddInt32(&s.running, -1) == 0 { + close(s.done) + } + }() + queue := s.queues[i] // Send batches of at most MaxSamplesPerSend samples to the remote storage.