From 9e24e1f9e8ea069de30cd576416d95e3e0b9727a Mon Sep 17 00:00:00 2001 From: Chris Marchbanks Date: Mon, 23 Dec 2019 11:03:54 -0700 Subject: [PATCH] Use samplesPending rather than integral The integral accumulator in the remote write sharding code is just a second way of keeping track of the number of samples pending. Remove integralAccumulator and use the samplesPending value we already calculate to calculate the number of shards. This has the added benefit of fixing a bug where the integralAccumulator was not being initialized correctly due to not taking into account the number of ticks being counted, causing the integralAccumulator initial value to be off by an order of magnitude in some cases. Signed-off-by: Chris Marchbanks --- storage/remote/queue_manager.go | 25 ++++++------------------- storage/remote/queue_manager_test.go | 1 - 2 files changed, 6 insertions(+), 20 deletions(-) diff --git a/storage/remote/queue_manager.go b/storage/remote/queue_manager.go index 71d34e0d32..c3d97430db 100644 --- a/storage/remote/queue_manager.go +++ b/storage/remote/queue_manager.go @@ -214,8 +214,6 @@ type QueueManager struct { wg sync.WaitGroup samplesIn, samplesDropped, samplesOut, samplesOutDuration *ewmaRate - integralAccumulator float64 - startedAt time.Time highestSentTimestampMetric *maxGauge pendingSamplesMetric prometheus.Gauge @@ -316,8 +314,6 @@ outer: // Start the queue manager sending samples to the remote storage. // Does not block. func (t *QueueManager) Start() { - t.startedAt = time.Now() - // Setup the QueueManagers metrics. We do this here rather than in the // constructor because of the ordering of creating Queue Managers's, stopping them, // and then starting new ones in storage/remote/storage.go ApplyConfig. @@ -537,19 +533,6 @@ func (t *QueueManager) calculateDesiredShards() int { return t.numShards } - // We use an integral accumulator, like in a PID, to help dampen - // oscillation. The accumulator will correct for any errors not accounted - // for in the desired shard calculation by adjusting for pending samples. - const integralGain = 0.2 - // Initialise the integral accumulator as the average rate of samples - // pending. This accounts for pending samples that were created while the - // WALWatcher starts up. - if t.integralAccumulator == 0 { - elapsed := time.Since(t.startedAt) / time.Second - t.integralAccumulator = integralGain * samplesPending / float64(elapsed) - } - t.integralAccumulator += samplesPendingRate * integralGain - // We shouldn't reshard if Prometheus hasn't been able to send to the // remote endpoint successfully within some period of time. minSendTimestamp := time.Now().Add(-2 * time.Duration(t.cfg.BatchSendDeadline)).Unix() @@ -559,9 +542,14 @@ func (t *QueueManager) calculateDesiredShards() int { return t.numShards } + // When behind we will try to catch up on a proporation of samples per tick. + // This works similarly to an integral accumulator in that pending samples + // is the result of the error integral. + const integralGain = 0.1 / float64(shardUpdateDuration/time.Second) + var ( timePerSample = samplesOutDuration / samplesOutRate - desiredShards = timePerSample * (samplesInRate + t.integralAccumulator) + desiredShards = timePerSample * (samplesInRate*samplesKeptRatio + integralGain*samplesPending) ) t.desiredNumShards.Set(desiredShards) level.Debug(t.logger).Log("msg", "QueueManager.calculateDesiredShards", @@ -575,7 +563,6 @@ func (t *QueueManager) calculateDesiredShards() int { "desiredShards", desiredShards, "highestSent", highestSent, "highestRecv", highestRecv, - "integralAccumulator", t.integralAccumulator, ) // Changes in the number of shards must be greater than shardToleranceFraction. diff --git a/storage/remote/queue_manager_test.go b/storage/remote/queue_manager_test.go index 68a32ff041..82206f3744 100644 --- a/storage/remote/queue_manager_test.go +++ b/storage/remote/queue_manager_test.go @@ -624,7 +624,6 @@ func TestCalculateDesiredShards(t *testing.T) { } ts := time.Duration(0) - m.startedAt = startedAt for ; ts < 120*time.Second; ts += shardUpdateDuration { addSamples(inputRate*int64(shardUpdateDuration/time.Second), ts) m.numShards = m.calculateDesiredShards()