diff --git a/config/config.go b/config/config.go
index 2716c8cf37..391d51e31d 100644
--- a/config/config.go
+++ b/config/config.go
@@ -111,10 +111,10 @@ var (
 		MinShards:         1,
 		MaxSamplesPerSend: 100,
 
-		// Each shard will have a max of 10 samples pending in it's channel, plus the pending
-		// samples that have been enqueued. Theoretically we should only ever have about 110 samples
-		// per shard pending. At 1000 shards that's 110k.
-		Capacity:          10,
+		// Each shard will have a max of 500 samples pending in its channel, plus the pending
+		// samples that have been enqueued. Theoretically we should only ever have about 600 samples
+		// per shard pending. At 1000 shards that's 600k.
+		Capacity:          500,
 		BatchSendDeadline: model.Duration(5 * time.Second),
 
 		// Backoff times for retrying a batch of samples on recoverable errors.
@@ -616,7 +616,8 @@ func (c *RemoteWriteConfig) UnmarshalYAML(unmarshal func(interface{}) error) err
 // QueueConfig is the configuration for the queue used to write to remote
 // storage.
 type QueueConfig struct {
-	// Number of samples to buffer per shard before we start dropping them.
+	// Number of samples to buffer per shard before we block. Defaults to
+	// MaxSamplesPerSend.
 	Capacity int `yaml:"capacity,omitempty"`
 
 	// Max number of shards, i.e. amount of concurrency.
diff --git a/docs/configuration/configuration.md b/docs/configuration/configuration.md
index f168192a28..2770fcfbd5 100644
--- a/docs/configuration/configuration.md
+++ b/docs/configuration/configuration.md
@@ -1294,8 +1294,11 @@ tls_config:
 
 # Configures the queue used to write to remote storage.
 queue_config:
-  # Number of samples to buffer per shard before we block reading of more samples from the WAL.
-  [ capacity: <int> | default = 10 ]
+  # Number of samples to buffer per shard before we block reading of more
+  # samples from the WAL. It is recommended to have enough capacity in each
+  # shard to buffer several requests to keep throughput up while processing
+  # occasional slow remote requests.
+  [ capacity: <int> | default = 500 ]
   # Maximum number of shards, i.e. amount of concurrency.
   [ max_shards: <int> | default = 1000 ]
   # Minimum number of shards, i.e. amount of concurrency.
diff --git a/storage/remote/ewma.go b/storage/remote/ewma.go
index 82b6dd1014..d1ba275650 100644
--- a/storage/remote/ewma.go
+++ b/storage/remote/ewma.go
@@ -47,8 +47,7 @@ func (r *ewmaRate) rate() float64 {
 
 // tick assumes to be called every r.interval.
 func (r *ewmaRate) tick() {
-	newEvents := atomic.LoadInt64(&r.newEvents)
-	atomic.AddInt64(&r.newEvents, -newEvents)
+	newEvents := atomic.SwapInt64(&r.newEvents, 0)
 	instantRate := float64(newEvents) / r.interval.Seconds()
 
 	r.mutex.Lock()
@@ -56,7 +55,7 @@ func (r *ewmaRate) tick() {
 
 	if r.init {
 		r.lastRate += r.alpha * (instantRate - r.lastRate)
-	} else {
+	} else if newEvents > 0 {
 		r.init = true
 		r.lastRate = instantRate
 	}
diff --git a/storage/remote/queue_manager.go b/storage/remote/queue_manager.go
index 7a1ce313a3..e5c62bb0f1 100644
--- a/storage/remote/queue_manager.go
+++ b/storage/remote/queue_manager.go
@@ -178,6 +178,7 @@ type QueueManager struct {
 	samplesIn, samplesDropped, samplesOut, samplesOutDuration *ewmaRate
 	integralAccumulator                                       float64
+	startedAt                                                 time.Time
 
 	highestSentTimestampMetric *maxGauge
 	pendingSamplesMetric       prometheus.Gauge
@@ -277,6 +278,8 @@ outer:
 // Start the queue manager sending samples to the remote storage.
 // Does not block.
 func (t *QueueManager) Start() {
+	t.startedAt = time.Now()
+
 	// Setup the QueueManagers metrics. We do this here rather than in the
 	// constructor because of the ordering of creating Queue Managers's, stopping them,
 	// and then starting new ones in storage/remote/storage.go ApplyConfig.
@@ -440,36 +443,50 @@ func (t *QueueManager) calculateDesiredShards() {
 	// (received - send) so we can catch up with any backlog. We use the average
 	// outgoing batch latency to work out how many shards we need.
 	var (
-		samplesIn          = t.samplesIn.rate()
-		samplesOut         = t.samplesOut.rate()
-		samplesKeptRatio   = samplesOut / (t.samplesDropped.rate() + samplesOut)
-		samplesOutDuration = t.samplesOutDuration.rate()
+		samplesInRate      = t.samplesIn.rate()
+		samplesOutRate     = t.samplesOut.rate()
+		samplesKeptRatio   = samplesOutRate / (t.samplesDropped.rate() + samplesOutRate)
+		samplesOutDuration = t.samplesOutDuration.rate() / float64(time.Second)
+		samplesPendingRate = samplesInRate*samplesKeptRatio - samplesOutRate
 		highestSent        = t.highestSentTimestampMetric.Get()
 		highestRecv        = highestTimestamp.Get()
-		samplesPending     = (highestRecv - highestSent) * samplesIn * samplesKeptRatio
+		samplesPending     = (highestRecv - highestSent) * samplesInRate * samplesKeptRatio
 	)
 
-	// We use an integral accumulator, like in a PID, to help dampen oscillation.
-	t.integralAccumulator = t.integralAccumulator + (samplesPending * 0.1)
-
-	if samplesOut <= 0 {
+	if samplesOutRate <= 0 {
 		return
 	}
 
+	// We use an integral accumulator, like in a PID, to help dampen
+	// oscillation. The accumulator will correct for any errors not accounted
+	// for in the desired shard calculation by adjusting for pending samples.
+	const integralGain = 0.2
+	// Initialise the integral accumulator as the average rate of samples
+	// pending. This accounts for pending samples that were created while the
+	// WALWatcher starts up.
+	if t.integralAccumulator == 0 {
+		elapsed := time.Since(t.startedAt) / time.Second
+		t.integralAccumulator = integralGain * samplesPending / float64(elapsed)
+	}
+	t.integralAccumulator += samplesPendingRate * integralGain
+
 	var (
-		timePerSample = samplesOutDuration / samplesOut
-		desiredShards = (timePerSample * samplesPending) / float64(time.Second)
+		timePerSample = samplesOutDuration / samplesOutRate
+		desiredShards = timePerSample * (samplesInRate + t.integralAccumulator)
 	)
 	level.Debug(t.logger).Log("msg", "QueueManager.calculateDesiredShards",
-		"samplesIn", samplesIn,
-		"samplesOut", samplesOut,
+		"samplesInRate", samplesInRate,
+		"samplesOutRate", samplesOutRate,
 		"samplesKeptRatio", samplesKeptRatio,
+		"samplesPendingRate", samplesPendingRate,
 		"samplesPending", samplesPending,
 		"samplesOutDuration", samplesOutDuration,
 		"timePerSample", timePerSample,
 		"desiredShards", desiredShards,
 		"highestSent", highestSent,
-		"highestRecv", highestRecv)
+		"highestRecv", highestRecv,
+		"integralAccumulator", t.integralAccumulator,
+	)
 
 	// Changes in the number of shards must be greater than shardToleranceFraction.
 	var (
diff --git a/storage/remote/queue_manager_test.go b/storage/remote/queue_manager_test.go
index 9e31982019..6731be81db 100644
--- a/storage/remote/queue_manager_test.go
+++ b/storage/remote/queue_manager_test.go
@@ -46,7 +46,7 @@ const defaultFlushDeadline = 1 * time.Minute
 
 func TestSampleDelivery(t *testing.T) {
 	// Let's create an even number of send batches so we don't run into the
 	// batch timeout case.
-	n := config.DefaultQueueConfig.Capacity * 2
+	n := config.DefaultQueueConfig.MaxSamplesPerSend * 2
 	samples, series := createTimeseries(n)
 	c := NewTestStorageClient()
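Reviewer note (not part of the patch): below is a minimal, self-contained Go sketch of the new shard calculation, with the QueueManager's EWMA plumbing and metrics replaced by hypothetical toy values. The function name, parameters, and numbers are illustrative assumptions; only the arithmetic mirrors the calculateDesiredShards hunk above.

package main

import (
	"fmt"
	"time"
)

// integralGain matches the constant introduced in the patch.
const integralGain = 0.2

// integralAccumulator persists across ticks, as the QueueManager field does.
var integralAccumulator float64

// desiredShards mirrors the arithmetic of the patched calculateDesiredShards:
// rates are samples/s, samplesOutDurationRate is the EWMA of nanoseconds spent
// sending per second, and highestSent/highestRecv are timestamps in seconds.
func desiredShards(samplesInRate, samplesOutRate, samplesDroppedRate,
	samplesOutDurationRate, highestSent, highestRecv float64, sinceStart time.Duration) float64 {

	samplesKeptRatio := samplesOutRate / (samplesDroppedRate + samplesOutRate)
	samplesOutDuration := samplesOutDurationRate / float64(time.Second)
	samplesPendingRate := samplesInRate*samplesKeptRatio - samplesOutRate
	samplesPending := (highestRecv - highestSent) * samplesInRate * samplesKeptRatio

	if samplesOutRate <= 0 {
		return 0
	}

	// Seed the accumulator with the average pending rate since startup, then
	// integrate the pending-rate error on every tick (the "I" of a PID).
	if integralAccumulator == 0 {
		integralAccumulator = integralGain * samplesPending / sinceStart.Seconds()
	}
	integralAccumulator += samplesPendingRate * integralGain

	timePerSample := samplesOutDuration / samplesOutRate
	return timePerSample * (samplesInRate + integralAccumulator)
}

func main() {
	// Toy tick: 100k samples/s in, 80k samples/s out, none dropped, shards
	// spend a combined 40s sending per second, remote write lags 30s, and the
	// manager started 60s ago. Prints "desired shards: 57.0".
	fmt.Printf("desired shards: %.1f\n",
		desiredShards(100e3, 80e3, 0, 40*float64(time.Second), 0, 30, 60*time.Second))
}

The effect to note: the integral term pushes desiredShards above the steady-state samplesInRate*timePerSample whenever a backlog is building, and pulls it back down once samplesPendingRate goes negative, rather than oscillating on the instantaneous pending count as the old 0.1-gain accumulator did.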