diff --git a/storage/remote/queue_manager.go b/storage/remote/queue_manager.go index 71d34e0d32..c3d97430db 100644 --- a/storage/remote/queue_manager.go +++ b/storage/remote/queue_manager.go @@ -214,8 +214,6 @@ type QueueManager struct { wg sync.WaitGroup samplesIn, samplesDropped, samplesOut, samplesOutDuration *ewmaRate - integralAccumulator float64 - startedAt time.Time highestSentTimestampMetric *maxGauge pendingSamplesMetric prometheus.Gauge @@ -316,8 +314,6 @@ outer: // Start the queue manager sending samples to the remote storage. // Does not block. func (t *QueueManager) Start() { - t.startedAt = time.Now() - // Setup the QueueManagers metrics. We do this here rather than in the // constructor because of the ordering of creating Queue Managers's, stopping them, // and then starting new ones in storage/remote/storage.go ApplyConfig. @@ -537,19 +533,6 @@ func (t *QueueManager) calculateDesiredShards() int { return t.numShards } - // We use an integral accumulator, like in a PID, to help dampen - // oscillation. The accumulator will correct for any errors not accounted - // for in the desired shard calculation by adjusting for pending samples. - const integralGain = 0.2 - // Initialise the integral accumulator as the average rate of samples - // pending. This accounts for pending samples that were created while the - // WALWatcher starts up. - if t.integralAccumulator == 0 { - elapsed := time.Since(t.startedAt) / time.Second - t.integralAccumulator = integralGain * samplesPending / float64(elapsed) - } - t.integralAccumulator += samplesPendingRate * integralGain - // We shouldn't reshard if Prometheus hasn't been able to send to the // remote endpoint successfully within some period of time. minSendTimestamp := time.Now().Add(-2 * time.Duration(t.cfg.BatchSendDeadline)).Unix() @@ -559,9 +542,14 @@ func (t *QueueManager) calculateDesiredShards() int { return t.numShards } + // When behind we will try to catch up on a proporation of samples per tick. + // This works similarly to an integral accumulator in that pending samples + // is the result of the error integral. + const integralGain = 0.1 / float64(shardUpdateDuration/time.Second) + var ( timePerSample = samplesOutDuration / samplesOutRate - desiredShards = timePerSample * (samplesInRate + t.integralAccumulator) + desiredShards = timePerSample * (samplesInRate*samplesKeptRatio + integralGain*samplesPending) ) t.desiredNumShards.Set(desiredShards) level.Debug(t.logger).Log("msg", "QueueManager.calculateDesiredShards", @@ -575,7 +563,6 @@ func (t *QueueManager) calculateDesiredShards() int { "desiredShards", desiredShards, "highestSent", highestSent, "highestRecv", highestRecv, - "integralAccumulator", t.integralAccumulator, ) // Changes in the number of shards must be greater than shardToleranceFraction. diff --git a/storage/remote/queue_manager_test.go b/storage/remote/queue_manager_test.go index 68a32ff041..82206f3744 100644 --- a/storage/remote/queue_manager_test.go +++ b/storage/remote/queue_manager_test.go @@ -624,7 +624,6 @@ func TestCalculateDesiredShards(t *testing.T) { } ts := time.Duration(0) - m.startedAt = startedAt for ; ts < 120*time.Second; ts += shardUpdateDuration { addSamples(inputRate*int64(shardUpdateDuration/time.Second), ts) m.numShards = m.calculateDesiredShards()