Merge pull request #10285 from prometheus/release-2.33

This commit is contained in:
Julien Pivotto 2022-02-12 00:02:24 +01:00 committed by GitHub
commit b0d70557b7
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 82 additions and 14 deletions

View file

@ -1,3 +1,12 @@
## 2.33.3 / 2022-02-11
* [BUGFIX] Azure SD: Fix a regression when public IP Address isn't set. #10289
## 2.33.2 / 2022-02-11
* [BUGFIX] Azure SD: Fix panic when public IP Address isn't set. #10280
* [BUGFIX] Remote-write: Fix deadlock when stopping a shard. #10279
## 2.33.1 / 2022-02-02 ## 2.33.1 / 2022-02-02
* [BUGFIX] SD: Fix _no such file or directory_ in K8s SD when not running inside K8s. #10235 * [BUGFIX] SD: Fix _no such file or directory_ in K8s SD when not running inside K8s. #10235

View file

@ -1 +1 @@
2.33.1 2.33.3

View file

@ -371,7 +371,9 @@ func (d *Discovery) refresh(ctx context.Context) ([]*targetgroup.Group, error) {
if *networkInterface.Primary { if *networkInterface.Primary {
for _, ip := range *networkInterface.IPConfigurations { for _, ip := range *networkInterface.IPConfigurations {
if ip.PublicIPAddress != nil && ip.PublicIPAddress.PublicIPAddressPropertiesFormat != nil { // IPAddress is a field defined in PublicIPAddressPropertiesFormat,
// therefore we need to validate that both are not nil.
if ip.PublicIPAddress != nil && ip.PublicIPAddress.PublicIPAddressPropertiesFormat != nil && ip.PublicIPAddress.IPAddress != nil {
labels[azureLabelMachinePublicIP] = model.LabelValue(*ip.PublicIPAddress.IPAddress) labels[azureLabelMachinePublicIP] = model.LabelValue(*ip.PublicIPAddress.IPAddress)
} }
if ip.PrivateIPAddress != nil { if ip.PrivateIPAddress != nil {

View file

@ -907,7 +907,8 @@ func (t *QueueManager) newShards() *shards {
} }
type shards struct { type shards struct {
mtx sync.RWMutex // With the WAL, this is never actually contended. mtx sync.RWMutex // With the WAL, this is never actually contended.
writeMtx sync.Mutex
qm *QueueManager qm *QueueManager
queues []*queue queues []*queue
@ -950,6 +951,8 @@ func (s *shards) start(n int) {
s.softShutdown = make(chan struct{}) s.softShutdown = make(chan struct{})
s.running.Store(int32(n)) s.running.Store(int32(n))
s.done = make(chan struct{}) s.done = make(chan struct{})
s.enqueuedSamples.Store(0)
s.enqueuedExemplars.Store(0)
s.samplesDroppedOnHardShutdown.Store(0) s.samplesDroppedOnHardShutdown.Store(0)
s.exemplarsDroppedOnHardShutdown.Store(0) s.exemplarsDroppedOnHardShutdown.Store(0)
for i := 0; i < n; i++ { for i := 0; i < n; i++ {
@ -997,6 +1000,8 @@ func (s *shards) stop() {
func (s *shards) enqueue(ref chunks.HeadSeriesRef, data sampleOrExemplar) bool { func (s *shards) enqueue(ref chunks.HeadSeriesRef, data sampleOrExemplar) bool {
s.mtx.RLock() s.mtx.RLock()
defer s.mtx.RUnlock() defer s.mtx.RUnlock()
s.writeMtx.Lock()
defer s.writeMtx.Unlock()
select { select {
case <-s.softShutdown: case <-s.softShutdown:
@ -1025,14 +1030,18 @@ func (s *shards) enqueue(ref chunks.HeadSeriesRef, data sampleOrExemplar) bool {
} }
type queue struct { type queue struct {
// batchMtx covers sending to the batchQueue and batch operations other
// than appending a sample. It is mainly to make sure (*queue).Batch() and
// (*queue).FlushAndShutdown() are not called concurrently.
batchMtx sync.Mutex
batch []sampleOrExemplar batch []sampleOrExemplar
batchQueue chan []sampleOrExemplar batchQueue chan []sampleOrExemplar
// Since we know there are a limited number of batches out, using a stack // Since we know there are a limited number of batches out, using a stack
// is easy and safe so a sync.Pool is not necessary. // is easy and safe so a sync.Pool is not necessary.
// poolMtx covers adding and removing batches from the batchPool.
poolMtx sync.Mutex
batchPool [][]sampleOrExemplar batchPool [][]sampleOrExemplar
// This mutex covers adding and removing batches from the batchPool.
poolMux sync.Mutex
} }
type sampleOrExemplar struct { type sampleOrExemplar struct {
@ -1077,6 +1086,9 @@ func (q *queue) Chan() <-chan []sampleOrExemplar {
// Batch returns the current batch and allocates a new batch. Must not be // Batch returns the current batch and allocates a new batch. Must not be
// called concurrently with Append. // called concurrently with Append.
func (q *queue) Batch() []sampleOrExemplar { func (q *queue) Batch() []sampleOrExemplar {
q.batchMtx.Lock()
defer q.batchMtx.Unlock()
batch := q.batch batch := q.batch
q.batch = q.newBatch(cap(batch)) q.batch = q.newBatch(cap(batch))
return batch return batch
@ -1084,8 +1096,8 @@ func (q *queue) Batch() []sampleOrExemplar {
// ReturnForReuse adds the batch buffer back to the internal pool. // ReturnForReuse adds the batch buffer back to the internal pool.
func (q *queue) ReturnForReuse(batch []sampleOrExemplar) { func (q *queue) ReturnForReuse(batch []sampleOrExemplar) {
q.poolMux.Lock() q.poolMtx.Lock()
defer q.poolMux.Unlock() defer q.poolMtx.Unlock()
if len(q.batchPool) < cap(q.batchPool) { if len(q.batchPool) < cap(q.batchPool) {
q.batchPool = append(q.batchPool, batch[:0]) q.batchPool = append(q.batchPool, batch[:0])
} }
@ -1094,6 +1106,9 @@ func (q *queue) ReturnForReuse(batch []sampleOrExemplar) {
// FlushAndShutdown stops the queue and flushes any samples. No appends can be // FlushAndShutdown stops the queue and flushes any samples. No appends can be
// made after this is called. // made after this is called.
func (q *queue) FlushAndShutdown(done <-chan struct{}) { func (q *queue) FlushAndShutdown(done <-chan struct{}) {
q.batchMtx.Lock()
defer q.batchMtx.Unlock()
if len(q.batch) > 0 { if len(q.batch) > 0 {
select { select {
case q.batchQueue <- q.batch: case q.batchQueue <- q.batch:
@ -1107,8 +1122,8 @@ func (q *queue) FlushAndShutdown(done <-chan struct{}) {
} }
func (q *queue) newBatch(capacity int) []sampleOrExemplar { func (q *queue) newBatch(capacity int) []sampleOrExemplar {
q.poolMux.Lock() q.poolMtx.Lock()
defer q.poolMux.Unlock() defer q.poolMtx.Unlock()
batches := len(q.batchPool) batches := len(q.batchPool)
if batches > 0 { if batches > 0 {
batch := q.batchPool[batches-1] batch := q.batchPool[batches-1]
@ -1187,18 +1202,22 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue *queue) {
timer.Reset(time.Duration(s.qm.cfg.BatchSendDeadline)) timer.Reset(time.Duration(s.qm.cfg.BatchSendDeadline))
case <-timer.C: case <-timer.C:
// We need to take the lock when getting a batch to avoid // We need to take the write lock when getting a batch to avoid
// concurrent Appends. Generally this will only happen on low // concurrent Appends. Generally this will only happen on low
// traffic instances. // traffic instances or during resharding. We have to use writeMtx
s.mtx.Lock() // and not the batchMtx on a queue because we do not want to have
// First, we need to see if we can happen to get a batch from the queue if it filled while acquiring the lock. // to lock each queue for each sample, and cannot call
// queue.Batch() while an Append happens.
s.writeMtx.Lock()
// First, we need to see if we can happen to get a batch from the
// queue if it filled while acquiring the lock.
var batch []sampleOrExemplar var batch []sampleOrExemplar
select { select {
case batch = <-batchQueue: case batch = <-batchQueue:
default: default:
batch = queue.Batch() batch = queue.Batch()
} }
s.mtx.Unlock() s.writeMtx.Unlock()
if len(batch) > 0 { if len(batch) > 0 {
nPendingSamples, nPendingExemplars := s.populateTimeSeries(batch, pendingData) nPendingSamples, nPendingExemplars := s.populateTimeSeries(batch, pendingData)
n := nPendingSamples + nPendingExemplars n := nPendingSamples + nPendingExemplars

View file

@ -382,6 +382,44 @@ func TestReshardRaceWithStop(t *testing.T) {
<-exitCh <-exitCh
} }
func TestReshardPartialBatch(t *testing.T) {
samples, series := createTimeseries(1, 10)
c := NewTestBlockedWriteClient()
cfg := config.DefaultQueueConfig
mcfg := config.DefaultMetadataConfig
cfg.MaxShards = 1
batchSendDeadline := time.Millisecond
flushDeadline := 10 * time.Millisecond
cfg.BatchSendDeadline = model.Duration(batchSendDeadline)
metrics := newQueueManagerMetrics(nil, "", "")
m := NewQueueManager(metrics, nil, nil, nil, t.TempDir(), newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, nil, nil, c, flushDeadline, newPool(), newHighestTimestampMetric(), nil, false)
m.StoreSeries(series, 0)
m.Start()
for i := 0; i < 100; i++ {
done := make(chan struct{})
go func() {
m.Append(samples)
time.Sleep(batchSendDeadline)
m.shards.stop()
m.shards.start(1)
done <- struct{}{}
}()
select {
case <-done:
case <-time.After(2 * time.Second):
t.Error("Deadlock between sending and stopping detected")
t.FailNow()
}
}
// We can only call stop if there was not a deadlock.
m.Stop()
}
func TestReleaseNoninternedString(t *testing.T) { func TestReleaseNoninternedString(t *testing.T) {
cfg := config.DefaultQueueConfig cfg := config.DefaultQueueConfig
mcfg := config.DefaultMetadataConfig mcfg := config.DefaultMetadataConfig