mirror of
https://github.com/prometheus/prometheus.git
synced 2024-11-10 07:34:04 -08:00
Merge pull request #3950 from prometheus/cherrypick-3941
Cherrypick #3941 "Correctly stop the timer used in the remote write path."
This commit is contained in:
commit
d8cfd8f108
|
@ -430,6 +430,15 @@ func (s *shards) runShard(i int) {
|
|||
pendingSamples := model.Samples{}
|
||||
|
||||
timer := time.NewTimer(s.qm.cfg.BatchSendDeadline)
|
||||
stop := func() {
|
||||
if !timer.Stop() {
|
||||
select {
|
||||
case <-timer.C:
|
||||
default:
|
||||
}
|
||||
}
|
||||
}
|
||||
defer stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
|
@ -446,19 +455,20 @@ func (s *shards) runShard(i int) {
|
|||
queueLength.WithLabelValues(s.qm.queueName).Dec()
|
||||
pendingSamples = append(pendingSamples, sample)
|
||||
|
||||
for len(pendingSamples) >= s.qm.cfg.MaxSamplesPerSend {
|
||||
if len(pendingSamples) >= s.qm.cfg.MaxSamplesPerSend {
|
||||
s.sendSamples(pendingSamples[:s.qm.cfg.MaxSamplesPerSend])
|
||||
pendingSamples = pendingSamples[s.qm.cfg.MaxSamplesPerSend:]
|
||||
|
||||
stop()
|
||||
timer.Reset(s.qm.cfg.BatchSendDeadline)
|
||||
}
|
||||
if !timer.Stop() {
|
||||
<-timer.C
|
||||
}
|
||||
timer.Reset(s.qm.cfg.BatchSendDeadline)
|
||||
|
||||
case <-timer.C:
|
||||
if len(pendingSamples) > 0 {
|
||||
s.sendSamples(pendingSamples)
|
||||
pendingSamples = pendingSamples[:0]
|
||||
}
|
||||
timer.Reset(s.qm.cfg.BatchSendDeadline)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -44,6 +44,9 @@ func (c *TestStorageClient) expectSamples(ss model.Samples) {
|
|||
c.mtx.Lock()
|
||||
defer c.mtx.Unlock()
|
||||
|
||||
c.expectedSamples = map[string][]*prompb.Sample{}
|
||||
c.receivedSamples = map[string][]*prompb.Sample{}
|
||||
|
||||
for _, s := range ss {
|
||||
ts := labelProtosToLabels(MetricToLabelProtos(s.Metric)).String()
|
||||
c.expectedSamples[ts] = append(c.expectedSamples[ts], &prompb.Sample{
|
||||
|
@ -122,6 +125,44 @@ func TestSampleDelivery(t *testing.T) {
|
|||
c.waitForExpectedSamples(t)
|
||||
}
|
||||
|
||||
func TestSampleDeliveryTimeout(t *testing.T) {
|
||||
// Let's send one less sample than batch size, and wait the timeout duration
|
||||
n := config.DefaultQueueConfig.Capacity - 1
|
||||
|
||||
samples := make(model.Samples, 0, n)
|
||||
for i := 0; i < n; i++ {
|
||||
name := model.LabelValue(fmt.Sprintf("test_metric_%d", i))
|
||||
samples = append(samples, &model.Sample{
|
||||
Metric: model.Metric{
|
||||
model.MetricNameLabel: name,
|
||||
},
|
||||
Value: model.SampleValue(i),
|
||||
})
|
||||
}
|
||||
|
||||
c := NewTestStorageClient()
|
||||
|
||||
cfg := config.DefaultQueueConfig
|
||||
cfg.MaxShards = 1
|
||||
cfg.BatchSendDeadline = 100 * time.Millisecond
|
||||
m := NewQueueManager(nil, cfg, nil, nil, c)
|
||||
m.Start()
|
||||
defer m.Stop()
|
||||
|
||||
// Send the samples twice, waiting for the samples in the meantime.
|
||||
c.expectSamples(samples)
|
||||
for _, s := range samples {
|
||||
m.Append(s)
|
||||
}
|
||||
c.waitForExpectedSamples(t)
|
||||
|
||||
c.expectSamples(samples)
|
||||
for _, s := range samples {
|
||||
m.Append(s)
|
||||
}
|
||||
c.waitForExpectedSamples(t)
|
||||
}
|
||||
|
||||
func TestSampleDeliveryOrder(t *testing.T) {
|
||||
ts := 10
|
||||
n := config.DefaultQueueConfig.MaxSamplesPerSend * ts
|
||||
|
|
Loading…
Reference in a new issue