mirror of
https://github.com/prometheus/prometheus.git
synced 2025-01-12 06:17:27 -08:00
Fix a deadlock between Batch and FlushAndShutdown (#10608)
If FlushAndShutdown is called with a full batchQueue, and then Batch is called rather than the normal path of reading from a queue a deadlock might be encountered. Rather than having FlushAndShutdown having blocking code while holding a lock retry sending the batch every second. Signed-off-by: Chris Marchbanks <csmarchbanks@gmail.com>
This commit is contained in:
parent
5b80eaa3ca
commit
a11e73edda
|
@ -1116,21 +1116,35 @@ func (q *queue) ReturnForReuse(batch []sampleOrExemplar) {
|
|||
// FlushAndShutdown stops the queue and flushes any samples. No appends can be
|
||||
// made after this is called.
|
||||
func (q *queue) FlushAndShutdown(done <-chan struct{}) {
|
||||
q.batchMtx.Lock()
|
||||
defer q.batchMtx.Unlock()
|
||||
|
||||
if len(q.batch) > 0 {
|
||||
select {
|
||||
case q.batchQueue <- q.batch:
|
||||
case <-done:
|
||||
// The shard has been hard shut down, so no more samples can be
|
||||
// sent. Drop everything left in the queue.
|
||||
}
|
||||
for q.tryEnqueueingBatch(done) {
|
||||
time.Sleep(time.Second)
|
||||
}
|
||||
q.batch = nil
|
||||
close(q.batchQueue)
|
||||
}
|
||||
|
||||
// tryEnqueueingBatch tries to send a batch if necessary. If sending needs to
|
||||
// be retried it will return true.
|
||||
func (q *queue) tryEnqueueingBatch(done <-chan struct{}) bool {
|
||||
q.batchMtx.Lock()
|
||||
defer q.batchMtx.Unlock()
|
||||
if len(q.batch) == 0 {
|
||||
return false
|
||||
}
|
||||
|
||||
select {
|
||||
case q.batchQueue <- q.batch:
|
||||
return false
|
||||
case <-done:
|
||||
// The shard has been hard shut down, so no more samples can be sent.
|
||||
// No need to try again as we will drop everything left in the queue.
|
||||
return false
|
||||
default:
|
||||
// The batchQueue is full, so we need to try again later.
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
||||
func (q *queue) newBatch(capacity int) []sampleOrExemplar {
|
||||
q.poolMtx.Lock()
|
||||
defer q.poolMtx.Unlock()
|
||||
|
|
|
@ -1183,3 +1183,29 @@ func TestQueueManagerMetrics(t *testing.T) {
|
|||
err = client_testutil.GatherAndCompare(reg, strings.NewReader(""))
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
func TestQueue_FlushAndShutdownDoesNotDeadlock(t *testing.T) {
|
||||
capacity := 100
|
||||
batchSize := 10
|
||||
queue := newQueue(batchSize, capacity)
|
||||
for i := 0; i < capacity+batchSize; i++ {
|
||||
queue.Append(sampleOrExemplar{})
|
||||
}
|
||||
|
||||
done := make(chan struct{})
|
||||
go queue.FlushAndShutdown(done)
|
||||
go func() {
|
||||
// Give enough time for FlushAndShutdown to acquire the lock. queue.Batch()
|
||||
// should not block forever even if the lock is acquired.
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
queue.Batch()
|
||||
close(done)
|
||||
}()
|
||||
select {
|
||||
case <-done:
|
||||
case <-time.After(2 * time.Second):
|
||||
t.Error("Deadlock in FlushAndShutdown detected")
|
||||
pprof.Lookup("goroutine").WriteTo(os.Stdout, 1)
|
||||
t.FailNow()
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue