Saner defaults and metrics for remote-write (#4279)

* Rename queueCapacity to shardCapacity
* Saner defaults for remote write
* Reduce allocs on retries

Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in>
This commit is contained in:
Goutham Veeramachaneni 2018-07-18 09:45:16 +05:30 committed by Brian Brazil
parent e3b775b78b
commit c28cc5076c
2 changed files with 11 additions and 10 deletions

View file

@ -113,13 +113,13 @@ var (
MaxShards: 1000, MaxShards: 1000,
MaxSamplesPerSend: 100, MaxSamplesPerSend: 100,
// By default, buffer 1000 batches, which at 100ms per batch is 1:40mins. At // By default, buffer 100 batches, which at 100ms per batch is 10s. At
// 1000 shards, this will buffer 100M samples total. // 1000 shards, this will buffer 10M samples total.
Capacity: 100 * 1000, Capacity: 100 * 100,
BatchSendDeadline: 5 * time.Second, BatchSendDeadline: 5 * time.Second,
// Max number of times to retry a batch on recoverable errors. // Max number of times to retry a batch on recoverable errors.
MaxRetries: 10, MaxRetries: 3,
MinBackoff: 30 * time.Millisecond, MinBackoff: 30 * time.Millisecond,
MaxBackoff: 100 * time.Millisecond, MaxBackoff: 100 * time.Millisecond,
} }

View file

@ -97,12 +97,12 @@ var (
}, },
[]string{queue}, []string{queue},
) )
queueCapacity = prometheus.NewGaugeVec( shardCapacity = prometheus.NewGaugeVec(
prometheus.GaugeOpts{ prometheus.GaugeOpts{
Namespace: namespace, Namespace: namespace,
Subsystem: subsystem, Subsystem: subsystem,
Name: "queue_capacity", Name: "shard_capacity",
Help: "The capacity of the queue of samples to be sent to the remote storage.", Help: "The capacity of each shard of the queue used for parallel sending to the remote storage.",
}, },
[]string{queue}, []string{queue},
) )
@ -123,7 +123,7 @@ func init() {
prometheus.MustRegister(droppedSamplesTotal) prometheus.MustRegister(droppedSamplesTotal)
prometheus.MustRegister(sentBatchDuration) prometheus.MustRegister(sentBatchDuration)
prometheus.MustRegister(queueLength) prometheus.MustRegister(queueLength)
prometheus.MustRegister(queueCapacity) prometheus.MustRegister(shardCapacity)
prometheus.MustRegister(numShards) prometheus.MustRegister(numShards)
} }
@ -187,7 +187,7 @@ func NewQueueManager(logger log.Logger, cfg config.QueueConfig, externalLabels m
} }
t.shards = t.newShards(t.numShards) t.shards = t.newShards(t.numShards)
numShards.WithLabelValues(t.queueName).Set(float64(t.numShards)) numShards.WithLabelValues(t.queueName).Set(float64(t.numShards))
queueCapacity.WithLabelValues(t.queueName).Set(float64(t.cfg.Capacity)) shardCapacity.WithLabelValues(t.queueName).Set(float64(t.cfg.Capacity))
// Initialise counter labels to zero. // Initialise counter labels to zero.
sentBatchDuration.WithLabelValues(t.queueName) sentBatchDuration.WithLabelValues(t.queueName)
@ -516,9 +516,10 @@ func (s *shards) sendSamples(samples model.Samples) {
// sendSamples to the remote storage with backoff for recoverable errors. // sendSamples to the remote storage with backoff for recoverable errors.
func (s *shards) sendSamplesWithBackoff(samples model.Samples) { func (s *shards) sendSamplesWithBackoff(samples model.Samples) {
backoff := s.qm.cfg.MinBackoff backoff := s.qm.cfg.MinBackoff
req := ToWriteRequest(samples)
for retries := s.qm.cfg.MaxRetries; retries > 0; retries-- { for retries := s.qm.cfg.MaxRetries; retries > 0; retries-- {
begin := time.Now() begin := time.Now()
req := ToWriteRequest(samples)
err := s.qm.client.Store(s.ctx, req) err := s.qm.client.Store(s.ctx, req)
sentBatchDuration.WithLabelValues(s.qm.queueName).Observe(time.Since(begin).Seconds()) sentBatchDuration.WithLabelValues(s.qm.queueName).Observe(time.Since(begin).Seconds())