From e970acb085eb82d15b4cb7cfd8676f674541ddc6 Mon Sep 17 00:00:00 2001 From: Chris Marchbanks Date: Thu, 3 Mar 2022 10:40:09 -0700 Subject: [PATCH] Fix deadlock between adding to queue and getting batch Do not block when trying to write a batch to the queue. This can cause appends to lock forever if the only thing reading from the queue needs the mutex to write. Instead, if batchQueue is full pop the sample that was just added from the partial batch and return false. The code doing the appending already handles retries with backoff. Signed-off-by: Chris Marchbanks --- storage/remote/queue_manager.go | 77 +++++++++++++++------------------ 1 file changed, 36 insertions(+), 41 deletions(-) diff --git a/storage/remote/queue_manager.go b/storage/remote/queue_manager.go index 22da78dca8..17ad7525eb 100644 --- a/storage/remote/queue_manager.go +++ b/storage/remote/queue_manager.go @@ -522,8 +522,11 @@ outer: continue } t.seriesMtx.Unlock() - // This will only loop if the queues are being resharded. - backoff := t.cfg.MinBackoff + // Start with a very small backoff. This should not be t.cfg.MinBackoff + // as it can happen without errors, and we want to pickup work after + // filling a queue/resharding as quickly as possible. + // TODO: Consider using the average duration of a request as the backoff. + backoff := model.Duration(5 * time.Millisecond) for { select { case <-t.quit: @@ -542,6 +545,8 @@ outer: t.metrics.enqueueRetriesTotal.Inc() time.Sleep(time.Duration(backoff)) backoff = backoff * 2 + // It is reasonable to use t.cfg.MaxBackoff here, as if we have hit + // the full backoff we are likely waiting for external resources. if backoff > t.cfg.MaxBackoff { backoff = t.cfg.MaxBackoff } @@ -906,8 +911,7 @@ func (t *QueueManager) newShards() *shards { } type shards struct { - mtx sync.RWMutex // With the WAL, this is never actually contended. - writeMtx sync.Mutex + mtx sync.RWMutex // With the WAL, this is never actually contended. qm *QueueManager queues []*queue @@ -994,26 +998,21 @@ func (s *shards) stop() { } } -// enqueue data (sample or exemplar). If we are currently in the process of shutting down or resharding, -// will return false; in this case, you should back off and retry. +// enqueue data (sample or exemplar). If the shard is full, shutting down, or +// resharding, it will return false; in this case, you should back off and +// retry. A shard is full when its configured capacity has been reached, +// specifically, when s.queues[shard] has filled its batchQueue channel and the +// partial batch has also been filled. func (s *shards) enqueue(ref chunks.HeadSeriesRef, data sampleOrExemplar) bool { s.mtx.RLock() defer s.mtx.RUnlock() - s.writeMtx.Lock() - defer s.writeMtx.Unlock() - - select { - case <-s.softShutdown: - return false - default: - } shard := uint64(ref) % uint64(len(s.queues)) select { case <-s.softShutdown: return false default: - appended := s.queues[shard].Append(data, s.softShutdown) + appended := s.queues[shard].Append(data) if !appended { return false } @@ -1029,9 +1028,7 @@ func (s *shards) enqueue(ref chunks.HeadSeriesRef, data sampleOrExemplar) bool { } type queue struct { - // batchMtx covers sending to the batchQueue and batch operations other - // than appending a sample. It is mainly to make sure (*queue).Batch() and - // (*queue).FlushAndShutdown() are not called concurrently. + // batchMtx covers operations appending to or publishing the partial batch. batchMtx sync.Mutex batch []sampleOrExemplar batchQueue chan []sampleOrExemplar @@ -1053,6 +1050,11 @@ type sampleOrExemplar struct { func newQueue(batchSize, capacity int) *queue { batches := capacity / batchSize + // Always create an unbuffered channel even if capacity is configured to be + // less than max_samples_per_send. + if batches == 0 { + batches = 1 + } return &queue{ batch: make([]sampleOrExemplar, 0, batchSize), batchQueue: make(chan []sampleOrExemplar, batches), @@ -1062,14 +1064,18 @@ func newQueue(batchSize, capacity int) *queue { } } -func (q *queue) Append(datum sampleOrExemplar, stop <-chan struct{}) bool { +// Append the sampleOrExemplar to the buffered batch. Returns false if it +// cannot be added and must be retried. +func (q *queue) Append(datum sampleOrExemplar) bool { + q.batchMtx.Lock() + defer q.batchMtx.Unlock() q.batch = append(q.batch, datum) if len(q.batch) == cap(q.batch) { select { case q.batchQueue <- q.batch: q.batch = q.newBatch(cap(q.batch)) return true - case <-stop: + default: // Remove the sample we just appended. It will get retried. q.batch = q.batch[:len(q.batch)-1] return false @@ -1082,15 +1088,19 @@ func (q *queue) Chan() <-chan []sampleOrExemplar { return q.batchQueue } -// Batch returns the current batch and allocates a new batch. Must not be -// called concurrently with Append. +// Batch returns the current batch and allocates a new batch. func (q *queue) Batch() []sampleOrExemplar { q.batchMtx.Lock() defer q.batchMtx.Unlock() - batch := q.batch - q.batch = q.newBatch(cap(batch)) - return batch + select { + case batch := <-q.batchQueue: + return batch + default: + batch := q.batch + q.batch = q.newBatch(cap(batch)) + return batch + } } // ReturnForReuse adds the batch buffer back to the internal pool. @@ -1201,22 +1211,7 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue *queue) { timer.Reset(time.Duration(s.qm.cfg.BatchSendDeadline)) case <-timer.C: - // We need to take the write lock when getting a batch to avoid - // concurrent Appends. Generally this will only happen on low - // traffic instances or during resharding. We have to use writeMtx - // and not the batchMtx on a queue because we do not want to have - // to lock each queue for each sample, and cannot call - // queue.Batch() while an Append happens. - s.writeMtx.Lock() - // First, we need to see if we can happen to get a batch from the - // queue if it filled while acquiring the lock. - var batch []sampleOrExemplar - select { - case batch = <-batchQueue: - default: - batch = queue.Batch() - } - s.writeMtx.Unlock() + batch := queue.Batch() if len(batch) > 0 { nPendingSamples, nPendingExemplars := s.populateTimeSeries(batch, pendingData) n := nPendingSamples + nPendingExemplars