Merge pull request #10395 from prometheus/csmarchbanks/fix-rw-deadlock-append

Fix deadlock between adding to queue and getting batch
Chris Marchbanks 2022-03-07 18:10:12 -07:00 committed by GitHub
commit 6508df8f57
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 79 additions and 41 deletions
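The deadlock being fixed has a simple shape that the removed code below makes visible: enqueue held the shards-wide writeMtx while queue.Append could block sending a completed batch into a full batchQueue, and the timer path in runShard wanted that same writeMtx before it would drain the channel, so neither side could make progress. Below is a minimal, hypothetical Go sketch of that shape; writeMu, batchQueue, and the producer/consumer goroutines are illustrative stand-ins, not the actual Prometheus types.

package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	var writeMu sync.Mutex
	batchQueue := make(chan []int, 1)
	batchQueue <- []int{1} // the hand-off queue is already full

	done := make(chan struct{})

	// Producer: like the old enqueue path, which held writeMtx while Append
	// could block sending a completed batch into batchQueue.
	go func() {
		writeMu.Lock()
		defer writeMu.Unlock()
		batchQueue <- []int{2} // blocks: the channel is full
	}()

	// Consumer: like the old timer path in runShard, which took writeMtx
	// before it would read from batchQueue.
	go func() {
		time.Sleep(10 * time.Millisecond)
		writeMu.Lock() // blocks: the producer still holds the lock
		<-batchQueue
		writeMu.Unlock()
		close(done)
	}()

	select {
	case <-done:
		fmt.Println("no deadlock")
	case <-time.After(500 * time.Millisecond):
		fmt.Println("deadlock: the producer holds the lock and is stuck on a full channel")
	}
}

The diff below removes the outer lock entirely and makes both the hand-off send and the drain non-blocking under the queue's own batchMtx.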

storage/remote/queue_manager.go

@@ -522,8 +522,11 @@ outer:
 			continue
 		}
 		t.seriesMtx.Unlock()
-		// This will only loop if the queues are being resharded.
-		backoff := t.cfg.MinBackoff
+		// Start with a very small backoff. This should not be t.cfg.MinBackoff
+		// as it can happen without errors, and we want to pickup work after
+		// filling a queue/resharding as quickly as possible.
+		// TODO: Consider using the average duration of a request as the backoff.
+		backoff := model.Duration(5 * time.Millisecond)
 		for {
 			select {
 			case <-t.quit:
@@ -542,6 +545,8 @@ outer:
 				t.metrics.enqueueRetriesTotal.Inc()
 				time.Sleep(time.Duration(backoff))
 				backoff = backoff * 2
+				// It is reasonable to use t.cfg.MaxBackoff here, as if we have hit
+				// the full backoff we are likely waiting for external resources.
 				if backoff > t.cfg.MaxBackoff {
 					backoff = t.cfg.MaxBackoff
 				}
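For illustration, here is a self-contained sketch of the enqueue retry loop that this hunk tunes. Only the 5ms starting value, the doubling, and the cap at MaxBackoff come from the diff; tryEnqueue and maxBackoff are toy stand-ins that assume the queue frees up after a few attempts.

package main

import (
	"fmt"
	"time"
)

func main() {
	const maxBackoff = 5 * time.Second // stands in for t.cfg.MaxBackoff

	// Toy enqueue that pretends the queue frees up on the fourth attempt.
	tryEnqueue := func(attempt int) bool { return attempt >= 3 }

	// Start well below MinBackoff: a full queue is not an error, so we want to
	// pick the work back up quickly once a batch has been drained.
	backoff := 5 * time.Millisecond
	for attempt := 0; ; attempt++ {
		if tryEnqueue(attempt) {
			fmt.Printf("enqueued after %d retries\n", attempt)
			return
		}
		time.Sleep(backoff)
		backoff *= 2
		if backoff > maxBackoff {
			backoff = maxBackoff
		}
	}
}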
@@ -906,8 +911,7 @@ func (t *QueueManager) newShards() *shards {
 }
 
 type shards struct {
 	mtx sync.RWMutex // With the WAL, this is never actually contended.
-	writeMtx sync.Mutex
 
 	qm     *QueueManager
 	queues []*queue
@@ -994,26 +998,21 @@ func (s *shards) stop() {
 	}
 }
 
-// enqueue data (sample or exemplar). If we are currently in the process of shutting down or resharding,
-// will return false; in this case, you should back off and retry.
+// enqueue data (sample or exemplar). If the shard is full, shutting down, or
+// resharding, it will return false; in this case, you should back off and
+// retry. A shard is full when its configured capacity has been reached,
+// specifically, when s.queues[shard] has filled its batchQueue channel and the
+// partial batch has also been filled.
 func (s *shards) enqueue(ref chunks.HeadSeriesRef, data sampleOrExemplar) bool {
 	s.mtx.RLock()
 	defer s.mtx.RUnlock()
-	s.writeMtx.Lock()
-	defer s.writeMtx.Unlock()
-
-	select {
-	case <-s.softShutdown:
-		return false
-	default:
-	}
 
 	shard := uint64(ref) % uint64(len(s.queues))
 	select {
 	case <-s.softShutdown:
 		return false
 	default:
-		appended := s.queues[shard].Append(data, s.softShutdown)
+		appended := s.queues[shard].Append(data)
 		if !appended {
 			return false
 		}
@@ -1029,9 +1028,7 @@ func (s *shards) enqueue(ref chunks.HeadSeriesRef, data sampleOrExemplar) bool {
 }
 
 type queue struct {
-	// batchMtx covers sending to the batchQueue and batch operations other
-	// than appending a sample. It is mainly to make sure (*queue).Batch() and
-	// (*queue).FlushAndShutdown() are not called concurrently.
+	// batchMtx covers operations appending to or publishing the partial batch.
 	batchMtx   sync.Mutex
 	batch      []sampleOrExemplar
 	batchQueue chan []sampleOrExemplar
@@ -1053,6 +1050,11 @@ type sampleOrExemplar struct {
 func newQueue(batchSize, capacity int) *queue {
 	batches := capacity / batchSize
+	// Always create an unbuffered channel even if capacity is configured to be
+	// less than max_samples_per_send.
+	if batches == 0 {
+		batches = 1
+	}
 	return &queue{
 		batch:      make([]sampleOrExemplar, 0, batchSize),
 		batchQueue: make(chan []sampleOrExemplar, batches),
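A small worked example of the batch-count guard above, assuming a hypothetical configuration in which capacity (20) is smaller than max_samples_per_send (30), so integer division alone would produce a channel with no buffer:

package main

import "fmt"

func main() {
	batchSize, capacity := 30, 20 // hypothetical: capacity < max_samples_per_send
	batches := capacity / batchSize
	fmt.Println("capacity / batchSize =", batches) // integer division: 0
	if batches == 0 {
		// A zero here would make batchQueue a channel with no buffer at all,
		// so it is bumped to a single slot.
		batches = 1
	}
	fmt.Println("batchQueue buffer size:", batches) // 1
}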
@@ -1062,14 +1064,18 @@ func newQueue(batchSize, capacity int) *queue {
 	}
 }
 
-func (q *queue) Append(datum sampleOrExemplar, stop <-chan struct{}) bool {
+// Append the sampleOrExemplar to the buffered batch. Returns false if it
+// cannot be added and must be retried.
+func (q *queue) Append(datum sampleOrExemplar) bool {
+	q.batchMtx.Lock()
+	defer q.batchMtx.Unlock()
 	q.batch = append(q.batch, datum)
 	if len(q.batch) == cap(q.batch) {
 		select {
 		case q.batchQueue <- q.batch:
 			q.batch = q.newBatch(cap(q.batch))
 			return true
-		case <-stop:
+		default:
 			// Remove the sample we just appended. It will get retried.
 			q.batch = q.batch[:len(q.batch)-1]
 			return false
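A standalone sketch of the new Append behaviour, using simplified stand-in types (miniQueue with int samples) rather than the real ones: the partial batch is guarded by a mutex, a completed batch is handed off with a non-blocking send, and a failed hand-off is rolled back so the caller backs off and retries. This is also the "shard is full" condition described in the enqueue comment above: the hand-off channel and the partial batch are both full.

package main

import (
	"fmt"
	"sync"
)

type miniQueue struct {
	mu         sync.Mutex
	batch      []int      // partial batch being filled
	batchQueue chan []int // completed batches waiting to be sent
}

func (q *miniQueue) Append(v int) bool {
	q.mu.Lock()
	defer q.mu.Unlock()
	q.batch = append(q.batch, v)
	if len(q.batch) == cap(q.batch) {
		select {
		case q.batchQueue <- q.batch:
			q.batch = make([]int, 0, cap(q.batch))
		default:
			// Hand-off queue is full: undo the append and let the caller back off.
			q.batch = q.batch[:len(q.batch)-1]
			return false
		}
	}
	return true
}

func main() {
	q := &miniQueue{batch: make([]int, 0, 2), batchQueue: make(chan []int, 1)}
	for i := 0; i < 6; i++ {
		fmt.Printf("append %d -> %v\n", i, q.Append(i))
	}
	// The first three appends succeed (one full batch is handed off); once both
	// the channel and the partial batch are full, appends return false.
}

Because the send can no longer block, Append cannot wedge while holding a lock that the consuming side needs.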
@@ -1082,15 +1088,19 @@ func (q *queue) Chan() <-chan []sampleOrExemplar {
 	return q.batchQueue
 }
 
-// Batch returns the current batch and allocates a new batch. Must not be
-// called concurrently with Append.
+// Batch returns the current batch and allocates a new batch.
 func (q *queue) Batch() []sampleOrExemplar {
 	q.batchMtx.Lock()
 	defer q.batchMtx.Unlock()
 
-	batch := q.batch
-	q.batch = q.newBatch(cap(batch))
-	return batch
+	select {
+	case batch := <-q.batchQueue:
+		return batch
+	default:
+		batch := q.batch
+		q.batch = q.newBatch(cap(batch))
+		return batch
+	}
 }
 
 // ReturnForReuse adds the batch buffer back to the internal pool.
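A companion sketch for the new Batch, with the same simplified stand-ins: under the same mutex it first tries a non-blocking receive from the hand-off channel and only then swaps out the partial batch, so a batch that Append has already published is delivered before the newer partial one.

package main

import (
	"fmt"
	"sync"
)

type miniQueue struct {
	mu         sync.Mutex
	batch      []int
	batchQueue chan []int
}

func (q *miniQueue) Batch() []int {
	q.mu.Lock()
	defer q.mu.Unlock()
	select {
	case batch := <-q.batchQueue:
		// A completed batch was already handed off; deliver it first.
		return batch
	default:
		// Otherwise swap out whatever has accumulated in the partial batch.
		batch := q.batch
		q.batch = make([]int, 0, cap(batch))
		return batch
	}
}

func main() {
	q := &miniQueue{batch: []int{4, 5}, batchQueue: make(chan []int, 1)}
	q.batchQueue <- []int{1, 2, 3}
	fmt.Println(q.Batch()) // [1 2 3] - drained from the hand-off channel first
	fmt.Println(q.Batch()) // [4 5]   - then the newer partial batch
	fmt.Println(q.Batch()) // []      - nothing left
}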
@@ -1201,22 +1211,7 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue *queue) {
 			timer.Reset(time.Duration(s.qm.cfg.BatchSendDeadline))
 
 		case <-timer.C:
-			// We need to take the write lock when getting a batch to avoid
-			// concurrent Appends. Generally this will only happen on low
-			// traffic instances or during resharding. We have to use writeMtx
-			// and not the batchMtx on a queue because we do not want to have
-			// to lock each queue for each sample, and cannot call
-			// queue.Batch() while an Append happens.
-			s.writeMtx.Lock()
-			// First, we need to see if we can happen to get a batch from the
-			// queue if it filled while acquiring the lock.
-			var batch []sampleOrExemplar
-			select {
-			case batch = <-batchQueue:
-			default:
-				batch = queue.Batch()
-			}
-			s.writeMtx.Unlock()
+			batch := queue.Batch()
 			if len(batch) > 0 {
 				nPendingSamples, nPendingExemplars := s.populateTimeSeries(batch, pendingData)
 				n := nPendingSamples + nPendingExemplars
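With Batch doing its own locking and draining, the timer path in runShard reduces to "take whatever is ready and send it if non-empty". A minimal sketch of that shape follows; send, the ticker interval, and the stripped-down queue are assumptions standing in for the real shard loop and remote-write send path.

package main

import (
	"fmt"
	"sync"
	"time"
)

type miniQueue struct {
	mu    sync.Mutex
	batch []int
}

func (q *miniQueue) Append(v int) {
	q.mu.Lock()
	defer q.mu.Unlock()
	q.batch = append(q.batch, v)
}

// Batch swaps out and returns the partial batch; it does its own locking, so
// the caller no longer needs a shards-wide write lock around it.
func (q *miniQueue) Batch() []int {
	q.mu.Lock()
	defer q.mu.Unlock()
	b := q.batch
	q.batch = nil
	return b
}

func main() {
	q := &miniQueue{}
	send := func(b []int) { fmt.Println("flushed", b) }

	// Stands in for the BatchSendDeadline timer in runShard.
	ticker := time.NewTicker(20 * time.Millisecond)
	defer ticker.Stop()

	go func() {
		for i := 0; i < 5; i++ {
			q.Append(i)
			time.Sleep(10 * time.Millisecond)
		}
	}()

	for i := 0; i < 4; i++ {
		<-ticker.C
		// Deadline fired: flush whatever has been buffered since the last send.
		if batch := q.Batch(); len(batch) > 0 {
			send(batch)
		}
	}
}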

storage/remote/queue_manager_test.go

@@ -20,6 +20,7 @@ import (
 	"math"
 	"net/url"
 	"os"
+	"runtime/pprof"
 	"sort"
 	"strconv"
 	"strings"
@@ -413,6 +414,7 @@ func TestReshardPartialBatch(t *testing.T) {
 		case <-done:
 		case <-time.After(2 * time.Second):
 			t.Error("Deadlock between sending and stopping detected")
+			pprof.Lookup("goroutine").WriteTo(os.Stdout, 1)
 			t.FailNow()
 		}
 	}
@@ -420,6 +422,47 @@ func TestReshardPartialBatch(t *testing.T) {
 	m.Stop()
 }
 
+// TestQueueFilledDeadlock makes sure the code does not deadlock in the case
+// where a large scrape (> capacity + max samples per send) is appended at the
+// same time as a batch times out according to the batch send deadline.
+func TestQueueFilledDeadlock(t *testing.T) {
+	samples, series := createTimeseries(50, 1)
+
+	c := NewNopWriteClient()
+
+	cfg := config.DefaultQueueConfig
+	mcfg := config.DefaultMetadataConfig
+	cfg.MaxShards = 1
+	cfg.MaxSamplesPerSend = 10
+	cfg.Capacity = 20
+	flushDeadline := time.Second
+	batchSendDeadline := time.Millisecond
+	cfg.BatchSendDeadline = model.Duration(batchSendDeadline)
+
+	metrics := newQueueManagerMetrics(nil, "", "")
+	m := NewQueueManager(metrics, nil, nil, nil, t.TempDir(), newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, nil, nil, c, flushDeadline, newPool(), newHighestTimestampMetric(), nil, false)
+	m.StoreSeries(series, 0)
+	m.Start()
+	defer m.Stop()
+
+	for i := 0; i < 100; i++ {
+		done := make(chan struct{})
+		go func() {
+			time.Sleep(batchSendDeadline)
+			m.Append(samples)
+			done <- struct{}{}
+		}()
+		select {
+		case <-done:
+		case <-time.After(2 * time.Second):
+			t.Error("Deadlock between sending and appending detected")
+			pprof.Lookup("goroutine").WriteTo(os.Stdout, 1)
+			t.FailNow()
+		}
+	}
+}
+
 func TestReleaseNoninternedString(t *testing.T) {
 	cfg := config.DefaultQueueConfig
 	mcfg := config.DefaultMetadataConfig