From c28cc5076cd48c344d8c7b86a9c551f6c87b1fdf Mon Sep 17 00:00:00 2001 From: Goutham Veeramachaneni Date: Wed, 18 Jul 2018 09:45:16 +0530 Subject: [PATCH] Saner defaults and metrics for remote-write (#4279) * Rename queueCapacity to shardCapacity * Saner defaults for remote write * Reduce allocs on retries Signed-off-by: Goutham Veeramachaneni --- config/config.go | 8 ++++---- storage/remote/queue_manager.go | 13 +++++++------ 2 files changed, 11 insertions(+), 10 deletions(-) diff --git a/config/config.go b/config/config.go index b7a566764d..b107d6dc96 100644 --- a/config/config.go +++ b/config/config.go @@ -113,13 +113,13 @@ var ( MaxShards: 1000, MaxSamplesPerSend: 100, - // By default, buffer 1000 batches, which at 100ms per batch is 1:40mins. At - // 1000 shards, this will buffer 100M samples total. - Capacity: 100 * 1000, + // By default, buffer 100 batches, which at 100ms per batch is 10s. At + // 1000 shards, this will buffer 10M samples total. + Capacity: 100 * 100, BatchSendDeadline: 5 * time.Second, // Max number of times to retry a batch on recoverable errors. - MaxRetries: 10, + MaxRetries: 3, MinBackoff: 30 * time.Millisecond, MaxBackoff: 100 * time.Millisecond, } diff --git a/storage/remote/queue_manager.go b/storage/remote/queue_manager.go index 927601d7fa..421a89c209 100644 --- a/storage/remote/queue_manager.go +++ b/storage/remote/queue_manager.go @@ -97,12 +97,12 @@ var ( }, []string{queue}, ) - queueCapacity = prometheus.NewGaugeVec( + shardCapacity = prometheus.NewGaugeVec( prometheus.GaugeOpts{ Namespace: namespace, Subsystem: subsystem, - Name: "queue_capacity", - Help: "The capacity of the queue of samples to be sent to the remote storage.", + Name: "shard_capacity", + Help: "The capacity of each shard of the queue used for parallel sending to the remote storage.", }, []string{queue}, ) @@ -123,7 +123,7 @@ func init() { prometheus.MustRegister(droppedSamplesTotal) prometheus.MustRegister(sentBatchDuration) prometheus.MustRegister(queueLength) - prometheus.MustRegister(queueCapacity) + prometheus.MustRegister(shardCapacity) prometheus.MustRegister(numShards) } @@ -187,7 +187,7 @@ func NewQueueManager(logger log.Logger, cfg config.QueueConfig, externalLabels m } t.shards = t.newShards(t.numShards) numShards.WithLabelValues(t.queueName).Set(float64(t.numShards)) - queueCapacity.WithLabelValues(t.queueName).Set(float64(t.cfg.Capacity)) + shardCapacity.WithLabelValues(t.queueName).Set(float64(t.cfg.Capacity)) // Initialise counter labels to zero. sentBatchDuration.WithLabelValues(t.queueName) @@ -516,9 +516,10 @@ func (s *shards) sendSamples(samples model.Samples) { // sendSamples to the remote storage with backoff for recoverable errors. func (s *shards) sendSamplesWithBackoff(samples model.Samples) { backoff := s.qm.cfg.MinBackoff + req := ToWriteRequest(samples) + for retries := s.qm.cfg.MaxRetries; retries > 0; retries-- { begin := time.Now() - req := ToWriteRequest(samples) err := s.qm.client.Store(s.ctx, req) sentBatchDuration.WithLabelValues(s.qm.queueName).Observe(time.Since(begin).Seconds())