Fix deadlock between adding to queue and getting batch

Do not block when trying to write a batch to the queue. Blocking there
can cause appends to lock forever when the only goroutine reading from
the queue first needs the mutex that the blocked appender is holding.
Instead, if batchQueue is full, pop the sample that was just added from
the partial batch and return false. The code doing the appending already
handles retries with backoff.
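
Distilled, the new (*queue).Append is a non-blocking channel send with
rollback. A minimal standalone sketch of the pattern (simplified from the
diff below; the real queue also guards the partial batch with batchMtx
and recycles batch slices through a pool):

package main

import "fmt"

type queue struct {
	batch      []float64      // partial batch being filled
	batchQueue chan []float64 // completed batches waiting for a reader
}

// Append adds datum to the partial batch. When the batch fills, it tries a
// non-blocking send to batchQueue; if the channel is full it pops the sample
// it just appended and returns false so the caller can back off and retry.
func (q *queue) Append(datum float64) bool {
	q.batch = append(q.batch, datum)
	if len(q.batch) == cap(q.batch) {
		select {
		case q.batchQueue <- q.batch:
			q.batch = make([]float64, 0, cap(q.batch))
		default:
			q.batch = q.batch[:len(q.batch)-1]
			return false
		}
	}
	return true
}

func main() {
	q := &queue{
		batch:      make([]float64, 0, 2),
		batchQueue: make(chan []float64, 1),
	}
	for i := 0; i < 5; i++ {
		// Once batchQueue holds one unread batch, further fills report false.
		fmt.Println(i, q.Append(float64(i)))
	}
}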

Signed-off-by: Chris Marchbanks <csmarchbanks@gmail.com>
Chris Marchbanks 2022-03-03 10:40:09 -07:00
parent afdc1decac
commit e970acb085

@@ -522,8 +522,11 @@ outer:
 			continue
 		}
 		t.seriesMtx.Unlock()
-		// This will only loop if the queues are being resharded.
-		backoff := t.cfg.MinBackoff
+		// Start with a very small backoff. This should not be t.cfg.MinBackoff
+		// as it can happen without errors, and we want to pickup work after
+		// filling a queue/resharding as quickly as possible.
+		// TODO: Consider using the average duration of a request as the backoff.
+		backoff := model.Duration(5 * time.Millisecond)
 		for {
 			select {
 			case <-t.quit:
@@ -542,6 +545,8 @@ outer:
 				t.metrics.enqueueRetriesTotal.Inc()
 				time.Sleep(time.Duration(backoff))
 				backoff = backoff * 2
+				// It is reasonable to use t.cfg.MaxBackoff here, as if we have hit
+				// the full backoff we are likely waiting for external resources.
 				if backoff > t.cfg.MaxBackoff {
 					backoff = t.cfg.MaxBackoff
 				}
@@ -907,7 +912,6 @@ func (t *QueueManager) newShards() *shards {
 
 type shards struct {
 	mtx sync.RWMutex // With the WAL, this is never actually contended.
-	writeMtx sync.Mutex
 
 	qm     *QueueManager
 	queues []*queue
@@ -994,26 +998,21 @@ func (s *shards) stop() {
 	}
 }
 
-// enqueue data (sample or exemplar). If we are currently in the process of shutting down or resharding,
-// will return false; in this case, you should back off and retry.
+// enqueue data (sample or exemplar). If the shard is full, shutting down, or
+// resharding, it will return false; in this case, you should back off and
+// retry. A shard is full when its configured capacity has been reached,
+// specifically, when s.queues[shard] has filled its batchQueue channel and the
+// partial batch has also been filled.
 func (s *shards) enqueue(ref chunks.HeadSeriesRef, data sampleOrExemplar) bool {
 	s.mtx.RLock()
 	defer s.mtx.RUnlock()
-	s.writeMtx.Lock()
-	defer s.writeMtx.Unlock()
-
-	select {
-	case <-s.softShutdown:
-		return false
-	default:
-	}
 
 	shard := uint64(ref) % uint64(len(s.queues))
 	select {
 	case <-s.softShutdown:
 		return false
 	default:
-		appended := s.queues[shard].Append(data, s.softShutdown)
+		appended := s.queues[shard].Append(data)
 		if !appended {
 			return false
 		}
@@ -1029,9 +1028,7 @@ func (s *shards) enqueue(ref chunks.HeadSeriesRef, data sampleOrExemplar) bool {
 }
 
 type queue struct {
-	// batchMtx covers sending to the batchQueue and batch operations other
-	// than appending a sample. It is mainly to make sure (*queue).Batch() and
-	// (*queue).FlushAndShutdown() are not called concurrently.
+	// batchMtx covers operations appending to or publishing the partial batch.
 	batchMtx   sync.Mutex
 	batch      []sampleOrExemplar
 	batchQueue chan []sampleOrExemplar
@@ -1053,6 +1050,11 @@ type sampleOrExemplar struct {
 
 func newQueue(batchSize, capacity int) *queue {
 	batches := capacity / batchSize
+	// Always create an unbuffered channel even if capacity is configured to be
+	// less than max_samples_per_send.
+	if batches == 0 {
+		batches = 1
+	}
 	return &queue{
 		batch:      make([]sampleOrExemplar, 0, batchSize),
 		batchQueue: make(chan []sampleOrExemplar, batches),
@@ -1062,14 +1064,18 @@ func newQueue(batchSize, capacity int) *queue {
 	}
 }
 
-func (q *queue) Append(datum sampleOrExemplar, stop <-chan struct{}) bool {
+// Append the sampleOrExemplar to the buffered batch. Returns false if it
+// cannot be added and must be retried.
+func (q *queue) Append(datum sampleOrExemplar) bool {
+	q.batchMtx.Lock()
+	defer q.batchMtx.Unlock()
 	q.batch = append(q.batch, datum)
 	if len(q.batch) == cap(q.batch) {
 		select {
 		case q.batchQueue <- q.batch:
 			q.batch = q.newBatch(cap(q.batch))
 			return true
-		case <-stop:
+		default:
 			// Remove the sample we just appended. It will get retried.
 			q.batch = q.batch[:len(q.batch)-1]
 			return false
@@ -1082,15 +1088,19 @@ func (q *queue) Chan() <-chan []sampleOrExemplar {
 	return q.batchQueue
 }
 
-// Batch returns the current batch and allocates a new batch. Must not be
-// called concurrently with Append.
+// Batch returns the current batch and allocates a new batch.
 func (q *queue) Batch() []sampleOrExemplar {
 	q.batchMtx.Lock()
 	defer q.batchMtx.Unlock()
 
-	batch := q.batch
-	q.batch = q.newBatch(cap(batch))
-	return batch
+	select {
+	case batch := <-q.batchQueue:
+		return batch
+	default:
+		batch := q.batch
+		q.batch = q.newBatch(cap(batch))
+		return batch
+	}
 }
 
 // ReturnForReuse adds the batch buffer back to the internal pool.
@@ -1201,22 +1211,7 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue *queue) {
 				timer.Reset(time.Duration(s.qm.cfg.BatchSendDeadline))
 
 			case <-timer.C:
-				// We need to take the write lock when getting a batch to avoid
-				// concurrent Appends. Generally this will only happen on low
-				// traffic instances or during resharding. We have to use writeMtx
-				// and not the batchMtx on a queue because we do not want to have
-				// to lock each queue for each sample, and cannot call
-				// queue.Batch() while an Append happens.
-				s.writeMtx.Lock()
-				// First, we need to see if we can happen to get a batch from the
-				// queue if it filled while acquiring the lock.
-				var batch []sampleOrExemplar
-				select {
-				case batch = <-batchQueue:
-				default:
-					batch = queue.Batch()
-				}
-				s.writeMtx.Unlock()
+				batch := queue.Batch()
 				if len(batch) > 0 {
 					nPendingSamples, nPendingExemplars := s.populateTimeSeries(batch, pendingData)
 					n := nPendingSamples + nPendingExemplars