From a6a55c433cb9735923eebe52292b18db289c7f8a Mon Sep 17 00:00:00 2001 From: Chris Marchbanks Date: Tue, 13 Aug 2019 03:10:21 -0600 Subject: [PATCH] Improve desired shards calculation (#5763) The desired shards calculation now properly keeps track of the rate of pending samples, and uses the previously unused integralAccumulator to adjust for missing information in the desired shards calculation. Also, configure more capacity for each shard. The default 10 capacity causes shards to block on each other while sending remote requests. Default to a 500 sample capacity and explain in the documentation that having more capacity will help throughput. Signed-off-by: Chris Marchbanks --- config/config.go | 11 +++---- docs/configuration/configuration.md | 7 +++-- storage/remote/ewma.go | 5 ++-- storage/remote/queue_manager.go | 45 +++++++++++++++++++--------- storage/remote/queue_manager_test.go | 2 +- 5 files changed, 45 insertions(+), 25 deletions(-) diff --git a/config/config.go b/config/config.go index 2716c8cf3..391d51e31 100644 --- a/config/config.go +++ b/config/config.go @@ -111,10 +111,10 @@ var ( MinShards: 1, MaxSamplesPerSend: 100, - // Each shard will have a max of 10 samples pending in it's channel, plus the pending - // samples that have been enqueued. Theoretically we should only ever have about 110 samples - // per shard pending. At 1000 shards that's 110k. - Capacity: 10, + // Each shard will have a max of 500 samples pending in it's channel, plus the pending + // samples that have been enqueued. Theoretically we should only ever have about 600 samples + // per shard pending. At 1000 shards that's 600k. + Capacity: 500, BatchSendDeadline: model.Duration(5 * time.Second), // Backoff times for retrying a batch of samples on recoverable errors. @@ -616,7 +616,8 @@ func (c *RemoteWriteConfig) UnmarshalYAML(unmarshal func(interface{}) error) err // QueueConfig is the configuration for the queue used to write to remote // storage. type QueueConfig struct { - // Number of samples to buffer per shard before we start dropping them. + // Number of samples to buffer per shard before we block. Defaults to + // MaxSamplesPerSend. Capacity int `yaml:"capacity,omitempty"` // Max number of shards, i.e. amount of concurrency. diff --git a/docs/configuration/configuration.md b/docs/configuration/configuration.md index f168192a2..2770fcfbd 100644 --- a/docs/configuration/configuration.md +++ b/docs/configuration/configuration.md @@ -1294,8 +1294,11 @@ tls_config: # Configures the queue used to write to remote storage. queue_config: - # Number of samples to buffer per shard before we block reading of more samples from the WAL. - [ capacity: | default = 10 ] + # Number of samples to buffer per shard before we block reading of more + # samples from the WAL. It is recommended to have enough capacity in each + # shard to buffer several requests to keep throughput up while processing + # occassional slow remote requests. + [ capacity: | default = 500 ] # Maximum number of shards, i.e. amount of concurrency. [ max_shards: | default = 1000 ] # Minimum number of shards, i.e. amount of concurrency. diff --git a/storage/remote/ewma.go b/storage/remote/ewma.go index 82b6dd101..d1ba27565 100644 --- a/storage/remote/ewma.go +++ b/storage/remote/ewma.go @@ -47,8 +47,7 @@ func (r *ewmaRate) rate() float64 { // tick assumes to be called every r.interval. func (r *ewmaRate) tick() { - newEvents := atomic.LoadInt64(&r.newEvents) - atomic.AddInt64(&r.newEvents, -newEvents) + newEvents := atomic.SwapInt64(&r.newEvents, 0) instantRate := float64(newEvents) / r.interval.Seconds() r.mutex.Lock() @@ -56,7 +55,7 @@ func (r *ewmaRate) tick() { if r.init { r.lastRate += r.alpha * (instantRate - r.lastRate) - } else { + } else if newEvents > 0 { r.init = true r.lastRate = instantRate } diff --git a/storage/remote/queue_manager.go b/storage/remote/queue_manager.go index 7a1ce313a..e5c62bb0f 100644 --- a/storage/remote/queue_manager.go +++ b/storage/remote/queue_manager.go @@ -178,6 +178,7 @@ type QueueManager struct { samplesIn, samplesDropped, samplesOut, samplesOutDuration *ewmaRate integralAccumulator float64 + startedAt time.Time highestSentTimestampMetric *maxGauge pendingSamplesMetric prometheus.Gauge @@ -277,6 +278,8 @@ outer: // Start the queue manager sending samples to the remote storage. // Does not block. func (t *QueueManager) Start() { + t.startedAt = time.Now() + // Setup the QueueManagers metrics. We do this here rather than in the // constructor because of the ordering of creating Queue Managers's, stopping them, // and then starting new ones in storage/remote/storage.go ApplyConfig. @@ -440,36 +443,50 @@ func (t *QueueManager) calculateDesiredShards() { // (received - send) so we can catch up with any backlog. We use the average // outgoing batch latency to work out how many shards we need. var ( - samplesIn = t.samplesIn.rate() - samplesOut = t.samplesOut.rate() - samplesKeptRatio = samplesOut / (t.samplesDropped.rate() + samplesOut) - samplesOutDuration = t.samplesOutDuration.rate() + samplesInRate = t.samplesIn.rate() + samplesOutRate = t.samplesOut.rate() + samplesKeptRatio = samplesOutRate / (t.samplesDropped.rate() + samplesOutRate) + samplesOutDuration = t.samplesOutDuration.rate() / float64(time.Second) + samplesPendingRate = samplesInRate*samplesKeptRatio - samplesOutRate highestSent = t.highestSentTimestampMetric.Get() highestRecv = highestTimestamp.Get() - samplesPending = (highestRecv - highestSent) * samplesIn * samplesKeptRatio + samplesPending = (highestRecv - highestSent) * samplesInRate * samplesKeptRatio ) - // We use an integral accumulator, like in a PID, to help dampen oscillation. - t.integralAccumulator = t.integralAccumulator + (samplesPending * 0.1) - - if samplesOut <= 0 { + if samplesOutRate <= 0 { return } + // We use an integral accumulator, like in a PID, to help dampen + // oscillation. The accumulator will correct for any errors not accounted + // for in the desired shard calculation by adjusting for pending samples. + const integralGain = 0.2 + // Initialise the integral accumulator as the average rate of samples + // pending. This accounts for pending samples that were created while the + // WALWatcher starts up. + if t.integralAccumulator == 0 { + elapsed := time.Since(t.startedAt) / time.Second + t.integralAccumulator = integralGain * samplesPending / float64(elapsed) + } + t.integralAccumulator += samplesPendingRate * integralGain + var ( - timePerSample = samplesOutDuration / samplesOut - desiredShards = (timePerSample * samplesPending) / float64(time.Second) + timePerSample = samplesOutDuration / samplesOutRate + desiredShards = timePerSample * (samplesInRate + t.integralAccumulator) ) level.Debug(t.logger).Log("msg", "QueueManager.calculateDesiredShards", - "samplesIn", samplesIn, - "samplesOut", samplesOut, + "samplesInRate", samplesInRate, + "samplesOutRate", samplesOutRate, "samplesKeptRatio", samplesKeptRatio, + "samplesPendingRate", samplesPendingRate, "samplesPending", samplesPending, "samplesOutDuration", samplesOutDuration, "timePerSample", timePerSample, "desiredShards", desiredShards, "highestSent", highestSent, - "highestRecv", highestRecv) + "highestRecv", highestRecv, + "integralAccumulator", t.integralAccumulator, + ) // Changes in the number of shards must be greater than shardToleranceFraction. var ( diff --git a/storage/remote/queue_manager_test.go b/storage/remote/queue_manager_test.go index 9e3198201..6731be81d 100644 --- a/storage/remote/queue_manager_test.go +++ b/storage/remote/queue_manager_test.go @@ -46,7 +46,7 @@ const defaultFlushDeadline = 1 * time.Minute func TestSampleDelivery(t *testing.T) { // Let's create an even number of send batches so we don't run into the // batch timeout case. - n := config.DefaultQueueConfig.Capacity * 2 + n := config.DefaultQueueConfig.MaxSamplesPerSend * 2 samples, series := createTimeseries(n) c := NewTestStorageClient()