Mirror of https://github.com/prometheus/prometheus.git

Commit 79376c1e94: Merge branch 'release-2.33' into beorn7/release
CHANGELOG.md
@@ -14,6 +14,12 @@
 * [BUGFIX] UI: Fix bug that sets the range input to the resolution. #10227
 * [BUGFIX] TSDB: Fix a query panic when `memory-snapshot-on-shutdown` is enabled. #10348
 
+## 2.33.5 / 2022-03-08
+
+The binaries published with this release are built with Go1.17.8 to avoid [CVE-2022-24921](https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2022-24921).
+
+* [BUGFIX] Remote-write: Fix deadlock between adding to queue and getting batch. #10395
+
 ## 2.33.4 / 2022-02-22
 
 * [BUGFIX] TSDB: Fix panic when m-mapping head chunks onto the disk. #10316
storage/remote/queue_manager.go
@@ -523,8 +523,11 @@ outer:
 			continue
 		}
 		t.seriesMtx.Unlock()
-		// This will only loop if the queues are being resharded.
-		backoff := t.cfg.MinBackoff
+		// Start with a very small backoff. This should not be t.cfg.MinBackoff
+		// as it can happen without errors, and we want to pickup work after
+		// filling a queue/resharding as quickly as possible.
+		// TODO: Consider using the average duration of a request as the backoff.
+		backoff := model.Duration(5 * time.Millisecond)
 		for {
 			select {
 			case <-t.quit:
@@ -543,6 +546,8 @@ outer:
 			t.metrics.enqueueRetriesTotal.Inc()
 			time.Sleep(time.Duration(backoff))
 			backoff = backoff * 2
+			// It is reasonable to use t.cfg.MaxBackoff here, as if we have hit
+			// the full backoff we are likely waiting for external resources.
 			if backoff > t.cfg.MaxBackoff {
 				backoff = t.cfg.MaxBackoff
 			}
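Net effect of the two hunks above: after a failed enqueue, the retry loop now starts at 5ms instead of t.cfg.MinBackoff and doubles until it reaches t.cfg.MaxBackoff. A minimal standalone sketch of that progression, not the merged code itself; the 5s cap assumes the default MaxBackoff and is only illustrative:

package main

import (
	"fmt"
	"time"
)

func main() {
	const maxBackoff = 5 * time.Second // assumed default MaxBackoff; illustrative only
	backoff := 5 * time.Millisecond    // the new starting point from the diff
	for i := 0; i < 12; i++ {
		fmt.Println(backoff) // 5ms, 10ms, 20ms, ... then pinned at 5s
		backoff *= 2
		if backoff > maxBackoff {
			backoff = maxBackoff
		}
	}
}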
@@ -907,8 +912,7 @@ func (t *QueueManager) newShards() *shards {
 }
 
 type shards struct {
-	mtx      sync.RWMutex // With the WAL, this is never actually contended.
-	writeMtx sync.Mutex
+	mtx sync.RWMutex // With the WAL, this is never actually contended.
 
 	qm     *QueueManager
 	queues []*queue
@@ -995,26 +999,21 @@ func (s *shards) stop() {
 	}
 }
 
-// enqueue data (sample or exemplar). If we are currently in the process of shutting down or resharding,
-// will return false; in this case, you should back off and retry.
+// enqueue data (sample or exemplar). If the shard is full, shutting down, or
+// resharding, it will return false; in this case, you should back off and
+// retry. A shard is full when its configured capacity has been reached,
+// specifically, when s.queues[shard] has filled its batchQueue channel and the
+// partial batch has also been filled.
 func (s *shards) enqueue(ref chunks.HeadSeriesRef, data sampleOrExemplar) bool {
 	s.mtx.RLock()
 	defer s.mtx.RUnlock()
-	s.writeMtx.Lock()
-	defer s.writeMtx.Unlock()
-
-	select {
-	case <-s.softShutdown:
-		return false
-	default:
-	}
 
 	shard := uint64(ref) % uint64(len(s.queues))
 	select {
 	case <-s.softShutdown:
 		return false
 	default:
-		appended := s.queues[shard].Append(data, s.softShutdown)
+		appended := s.queues[shard].Append(data)
 		if !appended {
 			return false
 		}
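This hunk is the heart of the #10395 fix: enqueue no longer takes a write lock before calling into the queue, and (as the later hunks show) the queue's Append no longer blocks on a full batchQueue. The removed shape, one goroutine blocking on a channel send while holding a lock that the channel's only reader must acquire first, is the classic deadlock named in the CHANGELOG entry. A self-contained toy reproduction of that shape, not Prometheus code; all names here are illustrative:

package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	var mu sync.Mutex
	ch := make(chan int, 1)
	ch <- 0 // the channel starts out full

	locked := make(chan struct{})
	go func() { // plays the old enqueue: lock, then send to a full channel
		mu.Lock()
		defer mu.Unlock()
		close(locked)
		ch <- 1 // blocks forever: the only reader is stuck below
	}()

	done := make(chan struct{})
	go func() { // plays the old runShard: needs the lock before it drains
		<-locked
		mu.Lock()
		<-ch
		mu.Unlock()
		close(done)
	}()

	select {
	case <-done:
		fmt.Println("no deadlock")
	case <-time.After(2 * time.Second):
		fmt.Println("deadlock: the sender holds the lock its only reader needs")
	}
}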
@@ -1030,9 +1029,7 @@ func (s *shards) enqueue(ref chunks.HeadSeriesRef, data sampleOrExemplar) bool {
 }
 
 type queue struct {
-	// batchMtx covers sending to the batchQueue and batch operations other
-	// than appending a sample. It is mainly to make sure (*queue).Batch() and
-	// (*queue).FlushAndShutdown() are not called concurrently.
+	// batchMtx covers operations appending to or publishing the partial batch.
 	batchMtx   sync.Mutex
 	batch      []sampleOrExemplar
 	batchQueue chan []sampleOrExemplar
@@ -1054,6 +1051,11 @@ type sampleOrExemplar struct {
 
 func newQueue(batchSize, capacity int) *queue {
 	batches := capacity / batchSize
+	// Always create an unbuffered channel even if capacity is configured to be
+	// less than max_samples_per_send.
+	if batches == 0 {
+		batches = 1
+	}
 	return &queue{
 		batch:      make([]sampleOrExemplar, 0, batchSize),
 		batchQueue: make(chan []sampleOrExemplar, batches),
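The sizing rule above is plain integer division with a floor of one slot. A standalone illustration of the arithmetic; batchQueueLen is a hypothetical helper for this note, not a function in the codebase:

package main

import "fmt"

// batchQueueLen mirrors the channel sizing in newQueue: one slot per full
// batch, clamped to a minimum of one.
func batchQueueLen(batchSize, capacity int) int {
	batches := capacity / batchSize
	if batches == 0 {
		batches = 1
	}
	return batches
}

func main() {
	fmt.Println(batchQueueLen(10, 20))  // 2: the values TestQueueFilledDeadlock below configures
	fmt.Println(batchQueueLen(500, 20)) // 1: a capacity below one full batch still gets a usable channel
}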
@@ -1063,14 +1065,18 @@ func newQueue(batchSize, capacity int) *queue {
 	}
 }
 
-func (q *queue) Append(datum sampleOrExemplar, stop <-chan struct{}) bool {
+// Append the sampleOrExemplar to the buffered batch. Returns false if it
+// cannot be added and must be retried.
+func (q *queue) Append(datum sampleOrExemplar) bool {
+	q.batchMtx.Lock()
+	defer q.batchMtx.Unlock()
 	q.batch = append(q.batch, datum)
 	if len(q.batch) == cap(q.batch) {
 		select {
 		case q.batchQueue <- q.batch:
 			q.batch = q.newBatch(cap(q.batch))
 			return true
-		case <-stop:
+		default:
 			// Remove the sample we just appended. It will get retried.
 			q.batch = q.batch[:len(q.batch)-1]
 			return false
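The new Append hands a full batch to batchQueue non-blockingly: if no slot is free, it undoes the append and reports failure, and the caller falls back to the backoff-and-retry loop from the first hunks. A standalone sketch of that pattern with toy types, not the Prometheus queue:

package main

import "fmt"

type queue struct {
	batch      []int
	batchQueue chan []int
}

func (q *queue) append(v int) bool {
	q.batch = append(q.batch, v)
	if len(q.batch) == cap(q.batch) {
		select {
		case q.batchQueue <- q.batch: // a slot was free: publish the full batch
			q.batch = make([]int, 0, cap(q.batch))
		default: // no slot: remove the value we just appended so it can be retried
			q.batch = q.batch[:len(q.batch)-1]
			return false
		}
	}
	return true
}

func main() {
	q := &queue{batch: make([]int, 0, 2), batchQueue: make(chan []int, 1)}
	for i := 0; i < 6; i++ {
		fmt.Println(i, q.append(i)) // true, true, true, then false once channel and batch are full
	}
}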
@@ -1083,15 +1089,19 @@ func (q *queue) Chan() <-chan []sampleOrExemplar {
 	return q.batchQueue
 }
 
-// Batch returns the current batch and allocates a new batch. Must not be
-// called concurrently with Append.
+// Batch returns the current batch and allocates a new batch.
 func (q *queue) Batch() []sampleOrExemplar {
 	q.batchMtx.Lock()
 	defer q.batchMtx.Unlock()
 
-	batch := q.batch
-	q.batch = q.newBatch(cap(batch))
-	return batch
+	select {
+	case batch := <-q.batchQueue:
+		return batch
+	default:
+		batch := q.batch
+		q.batch = q.newBatch(cap(batch))
+		return batch
+	}
 }
 
 // ReturnForReuse adds the batch buffer back to the internal pool.
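Batch now takes batchMtx itself (so the "must not be called concurrently with Append" caveat can go) and checks batchQueue before cutting the partial batch. My reading, consistent with the runShard comment removed in the next hunk, is that a full batch Append already published should be handed out ahead of the newer partial one. A standalone sketch; takeBatch is a hypothetical stand-in, not the real method:

package main

import "fmt"

func takeBatch(batchQueue chan []int, partial *[]int) []int {
	select {
	case b := <-batchQueue: // a completed batch is already waiting: return it first
		return b
	default: // otherwise cut the partial batch and start a fresh one
		b := *partial
		*partial = make([]int, 0, cap(b))
		return b
	}
}

func main() {
	q := make(chan []int, 1)
	q <- []int{1, 2} // a full batch published by an earlier append
	partial := []int{3}
	fmt.Println(takeBatch(q, &partial)) // [1 2], not the newer partial [3]
	fmt.Println(takeBatch(q, &partial)) // [3]
}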
@@ -1202,22 +1212,7 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue *queue) {
 			timer.Reset(time.Duration(s.qm.cfg.BatchSendDeadline))
 
 		case <-timer.C:
-			// We need to take the write lock when getting a batch to avoid
-			// concurrent Appends. Generally this will only happen on low
-			// traffic instances or during resharding. We have to use writeMtx
-			// and not the batchMtx on a queue because we do not want to have
-			// to lock each queue for each sample, and cannot call
-			// queue.Batch() while an Append happens.
-			s.writeMtx.Lock()
-			// First, we need to see if we can happen to get a batch from the
-			// queue if it filled while acquiring the lock.
-			var batch []sampleOrExemplar
-			select {
-			case batch = <-batchQueue:
-			default:
-				batch = queue.Batch()
-			}
-			s.writeMtx.Unlock()
+			batch := queue.Batch()
 			if len(batch) > 0 {
 				nPendingSamples, nPendingExemplars := s.populateTimeSeries(batch, pendingData)
 				n := nPendingSamples + nPendingExemplars
storage/remote/queue_manager_test.go
@@ -20,6 +20,7 @@ import (
 	"math"
 	"net/url"
 	"os"
+	"runtime/pprof"
 	"sort"
 	"strconv"
 	"strings"
@@ -413,6 +414,7 @@ func TestReshardPartialBatch(t *testing.T) {
 	case <-done:
 	case <-time.After(2 * time.Second):
 		t.Error("Deadlock between sending and stopping detected")
+		pprof.Lookup("goroutine").WriteTo(os.Stdout, 1)
 		t.FailNow()
 	}
 }
@@ -420,6 +422,47 @@ func TestReshardPartialBatch(t *testing.T) {
 	}
 	m.Stop()
 }
 
+// TestQueueFilledDeadlock makes sure the code does not deadlock in the case
+// where a large scrape (> capacity + max samples per send) is appended at the
+// same time as a batch times out according to the batch send deadline.
+func TestQueueFilledDeadlock(t *testing.T) {
+	samples, series := createTimeseries(50, 1)
+
+	c := NewNopWriteClient()
+
+	cfg := config.DefaultQueueConfig
+	mcfg := config.DefaultMetadataConfig
+	cfg.MaxShards = 1
+	cfg.MaxSamplesPerSend = 10
+	cfg.Capacity = 20
+	flushDeadline := time.Second
+	batchSendDeadline := time.Millisecond
+	cfg.BatchSendDeadline = model.Duration(batchSendDeadline)
+
+	metrics := newQueueManagerMetrics(nil, "", "")
+
+	m := NewQueueManager(metrics, nil, nil, nil, t.TempDir(), newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, nil, nil, c, flushDeadline, newPool(), newHighestTimestampMetric(), nil, false)
+	m.StoreSeries(series, 0)
+	m.Start()
+	defer m.Stop()
+
+	for i := 0; i < 100; i++ {
+		done := make(chan struct{})
+		go func() {
+			time.Sleep(batchSendDeadline)
+			m.Append(samples)
+			done <- struct{}{}
+		}()
+		select {
+		case <-done:
+		case <-time.After(2 * time.Second):
+			t.Error("Deadlock between sending and appending detected")
+			pprof.Lookup("goroutine").WriteTo(os.Stdout, 1)
+			t.FailNow()
+		}
+	}
+}
+
 func TestReleaseNoninternedString(t *testing.T) {
 	cfg := config.DefaultQueueConfig
 	mcfg := config.DefaultMetadataConfig