Mirror of https://github.com/prometheus/prometheus.git
Merge pull request #6511 from prometheus/remote-write-delay

Use samplesPending rather than integral

Commit 5a59fd9332
@@ -214,8 +214,6 @@ type QueueManager struct {
 	wg sync.WaitGroup

 	samplesIn, samplesDropped, samplesOut, samplesOutDuration *ewmaRate
-	integralAccumulator                                       float64
-	startedAt                                                 time.Time

 	highestSentTimestampMetric *maxGauge
 	pendingSamplesMetric       prometheus.Gauge
@@ -316,8 +314,6 @@ outer:
 // Start the queue manager sending samples to the remote storage.
 // Does not block.
 func (t *QueueManager) Start() {
-	t.startedAt = time.Now()
-
 	// Setup the QueueManagers metrics. We do this here rather than in the
 	// constructor because of the ordering of creating Queue Managers's, stopping them,
 	// and then starting new ones in storage/remote/storage.go ApplyConfig.
@@ -530,26 +526,14 @@ func (t *QueueManager) calculateDesiredShards() int {
 		samplesPendingRate = samplesInRate*samplesKeptRatio - samplesOutRate
 		highestSent        = t.highestSentTimestampMetric.Get()
 		highestRecv        = highestTimestamp.Get()
-		samplesPending     = (highestRecv - highestSent) * samplesInRate * samplesKeptRatio
+		delay              = highestRecv - highestSent
+		samplesPending     = delay * samplesInRate * samplesKeptRatio
 	)

 	if samplesOutRate <= 0 {
 		return t.numShards
 	}

-	// We use an integral accumulator, like in a PID, to help dampen
-	// oscillation. The accumulator will correct for any errors not accounted
-	// for in the desired shard calculation by adjusting for pending samples.
-	const integralGain = 0.2
-	// Initialise the integral accumulator as the average rate of samples
-	// pending. This accounts for pending samples that were created while the
-	// WALWatcher starts up.
-	if t.integralAccumulator == 0 {
-		elapsed := time.Since(t.startedAt) / time.Second
-		t.integralAccumulator = integralGain * samplesPending / float64(elapsed)
-	}
-	t.integralAccumulator += samplesPendingRate * integralGain
-
 	// We shouldn't reshard if Prometheus hasn't been able to send to the
 	// remote endpoint successfully within some period of time.
 	minSendTimestamp := time.Now().Add(-2 * time.Duration(t.cfg.BatchSendDeadline)).Unix()
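To make the new backlog estimate concrete: the pending-sample count is now approximated as the delay between the highest received and highest sent timestamps multiplied by the effective ingest rate. The standalone Go sketch below is illustrative only and is not code from this commit; the function name and the numbers are assumptions.

package main

import "fmt"

// estimatePendingSamples mirrors the arithmetic introduced above: the backlog
// is approximated as the send delay (in seconds) times the ingest rate that
// survives relabelling. A sketch, not the QueueManager implementation.
func estimatePendingSamples(highestRecv, highestSent, samplesInRate, samplesKeptRatio float64) float64 {
	delay := highestRecv - highestSent
	return delay * samplesInRate * samplesKeptRatio
}

func main() {
	// Hypothetical numbers: remote write is 12s behind, ingesting 50,000
	// samples/s, and 98% of samples survive relabelling.
	fmt.Println(estimatePendingSamples(1_000_012, 1_000_000, 50000, 0.98)) // 588000
}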
@@ -559,9 +543,14 @@ func (t *QueueManager) calculateDesiredShards() int {
 		return t.numShards
 	}

+	// When behind we will try to catch up on a proporation of samples per tick.
+	// This works similarly to an integral accumulator in that pending samples
+	// is the result of the error integral.
+	const integralGain = 0.1 / float64(shardUpdateDuration/time.Second)
+
 	var (
 		timePerSample = samplesOutDuration / samplesOutRate
-		desiredShards = timePerSample * (samplesInRate + t.integralAccumulator)
+		desiredShards = timePerSample * (samplesInRate*samplesKeptRatio + integralGain*samplesPending)
 	)
 	t.desiredNumShards.Set(desiredShards)
 	level.Debug(t.logger).Log("msg", "QueueManager.calculateDesiredShards",
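Worked example of the new shard formula, as a sketch under assumed numbers rather than output from Prometheus: with a 10-second shard update interval the gain works out to 0.1/10 = 0.01, so each tick aims to drain roughly one percent of the backlog on top of keeping up with ingestion.

package main

import "fmt"

func main() {
	// Illustrative values only. timePerSample is the measured cost of sending
	// one sample; 0.0001s means one shard can push roughly 10,000 samples/s.
	const (
		timePerSample    = 0.0001
		samplesInRate    = 50000.0 // samples/s read from the WAL
		samplesKeptRatio = 1.0     // nothing dropped by relabelling
		integralGain     = 0.01    // 0.1 / 10, assuming a 10s shard update interval
		samplesPending   = 600000.0
	)

	// Mirrors: desiredShards = timePerSample * (samplesInRate*samplesKeptRatio + integralGain*samplesPending)
	desiredShards := timePerSample * (samplesInRate*samplesKeptRatio + integralGain*samplesPending)

	// Five shards to keep up with ingestion plus 0.6 of a shard to drain the backlog.
	fmt.Println(desiredShards) // 5.6
}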
@@ -575,7 +564,6 @@ func (t *QueueManager) calculateDesiredShards() int {
 		"desiredShards", desiredShards,
 		"highestSent", highestSent,
 		"highestRecv", highestRecv,
-		"integralAccumulator", t.integralAccumulator,
 	)

 	// Changes in the number of shards must be greater than shardToleranceFraction.
@@ -590,6 +578,12 @@ func (t *QueueManager) calculateDesiredShards() int {
 	}

 	numShards := int(math.Ceil(desiredShards))
+	// Do not downshard if we are more than ten seconds back.
+	if numShards < t.numShards && delay > 10.0 {
+		level.Debug(t.logger).Log("msg", "Not downsharding due to being too far behind")
+		return t.numShards
+	}
+
 	if numShards > t.cfg.MaxShards {
 		numShards = t.cfg.MaxShards
 	} else if numShards < t.cfg.MinShards {
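The guard added above only suppresses shard reductions while remote write is still far behind; it never blocks scaling up. A minimal sketch of that decision follows, with hypothetical names that are not the QueueManager API.

package main

import "fmt"

// clampDownshard keeps the current shard count whenever the desired count is
// lower but the send delay still exceeds ten seconds. Hypothetical helper,
// shown only to illustrate the behaviour of the guard.
func clampDownshard(current, desired int, delaySeconds float64) int {
	if desired < current && delaySeconds > 10.0 {
		return current // too far behind: hold the shards until we catch up
	}
	return desired
}

func main() {
	fmt.Println(clampDownshard(8, 5, 12.0)) // 8: still 12s behind, keep sharding high
	fmt.Println(clampDownshard(8, 5, 3.0))  // 5: close enough to allow downsharding
}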
@@ -577,3 +577,80 @@ func TestProcessExternalLabels(t *testing.T) {
 		testutil.Equals(t, tc.expected, processExternalLabels(tc.labels, tc.externalLabels))
 	}
 }
+
+func TestCalculateDesiredShards(t *testing.T) {
+	c := NewTestStorageClient()
+	cfg := config.DefaultQueueConfig
+
+	dir, err := ioutil.TempDir("", "TestCalculateDesiredShards")
+	testutil.Ok(t, err)
+	defer os.RemoveAll(dir)
+
+	samplesIn := newEWMARate(ewmaWeight, shardUpdateDuration)
+	m := NewQueueManager(nil, nil, dir, samplesIn, cfg, nil, nil, c, defaultFlushDeadline)
+
+	// Need to start the queue manager so the proper metrics are initialized.
+	// However we can stop it right away since we don't need to do any actual
+	// processing.
+	m.Start()
+	m.Stop()
+
+	inputRate := int64(50000)
+	var pendingSamples int64
+
+	// Two minute startup, no samples are sent.
+	startedAt := time.Now().Add(-2 * time.Minute)
+
+	// helper function for adding samples.
+	addSamples := func(s int64, ts time.Duration) {
+		pendingSamples += s
+		samplesIn.incr(s)
+		samplesIn.tick()
+
+		highestTimestamp.Set(float64(startedAt.Add(ts).Unix()))
+	}
+
+	// helper function for sending samples.
+	sendSamples := func(s int64, ts time.Duration) {
+		pendingSamples -= s
+		m.samplesOut.incr(s)
+		m.samplesOutDuration.incr(int64(m.numShards) * int64(shardUpdateDuration))
+
+		// highest sent is how far back pending samples would be at our input rate.
+		highestSent := startedAt.Add(ts - time.Duration(pendingSamples/inputRate)*time.Second)
+		m.highestSentTimestampMetric.Set(float64(highestSent.Unix()))
+
+		atomic.StoreInt64(&m.lastSendTimestamp, time.Now().Unix())
+	}
+
+	ts := time.Duration(0)
+	for ; ts < 120*time.Second; ts += shardUpdateDuration {
+		addSamples(inputRate*int64(shardUpdateDuration/time.Second), ts)
+		m.numShards = m.calculateDesiredShards()
+		testutil.Equals(t, 1, m.numShards)
+	}
+
+	// Assume 100ms per request, or 10 requests per second per shard.
+	// Shard calculation should never drop below barely keeping up.
+	minShards := int(inputRate) / cfg.MaxSamplesPerSend / 10
+	// This test should never go above 200 shards, that would be more resources than needed.
+	maxShards := 200
+
+	for ; ts < 15*time.Minute; ts += shardUpdateDuration {
+		sin := inputRate * int64(shardUpdateDuration/time.Second)
+		addSamples(sin, ts)
+
+		sout := int64(m.numShards*cfg.MaxSamplesPerSend) * int64(shardUpdateDuration/(100*time.Millisecond))
+		// You can't send samples that don't exist so cap at the number of pending samples.
+		if sout > pendingSamples {
+			sout = pendingSamples
+		}
+		sendSamples(sout, ts)
+
+		t.Log("desiredShards", m.numShards, "pendingSamples", pendingSamples)
+		m.numShards = m.calculateDesiredShards()
+		testutil.Assert(t, m.numShards >= minShards, "Shards are too low. desiredShards=%d, minShards=%d, t_seconds=%d", m.numShards, minShards, ts/time.Second)
+		testutil.Assert(t, m.numShards <= maxShards, "Shards are too high. desiredShards=%d, maxShards=%d, t_seconds=%d", m.numShards, maxShards, ts/time.Second)
+	}
+	testutil.Assert(t, pendingSamples == 0, "Remote write never caught up, there are still %d pending samples.", pendingSamples)
+}
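The test's send loop assumes one request takes 100ms, so per shard-update tick each shard completes a fixed number of requests of MaxSamplesPerSend samples. The standalone sketch below redoes that capacity arithmetic with assumed constants; the real values come from config.DefaultQueueConfig and the shardUpdateDuration constant, so treat the numbers as illustrative.

package main

import (
	"fmt"
	"time"
)

func main() {
	// Assumed values mirroring the test's simulation; not read from Prometheus.
	const (
		shardUpdateDuration = 10 * time.Second // assumed tick length
		maxSamplesPerSend   = 100              // assumed queue config default
		inputRate           = 50000            // samples/s, as in the test
	)
	numShards := 5

	// Each shard is assumed to finish one request per 100ms.
	requestsPerTick := int64(shardUpdateDuration / (100 * time.Millisecond)) // 100
	soutPerTick := int64(numShards*maxSamplesPerSend) * requestsPerTick      // samples sent per tick
	sinPerTick := int64(inputRate) * int64(shardUpdateDuration/time.Second)  // samples received per tick

	fmt.Println(requestsPerTick, soutPerTick, sinPerTick) // 100 50000 500000
	// Five shards move only a tenth of each tick's intake here, which is why
	// the test's lower bound is roughly inputRate/maxSamplesPerSend/10 shards.
}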