mirror of
https://github.com/prometheus/prometheus.git
synced 2025-01-26 05:01:23 -08:00
Rationalise retrieval metrics so we have the state (success/failed) on both samples and batches, in a consistent fashion.
Also, report total queue capacity of all queues, i.e. capacity * shards.
This commit is contained in:
parent
ece12bff93
commit
a6931b71e8
|
@ -65,12 +65,10 @@ type StorageQueueManager struct {
|
|||
wg sync.WaitGroup
|
||||
done chan struct{}
|
||||
|
||||
samplesCount *prometheus.CounterVec
|
||||
sendLatency prometheus.Summary
|
||||
failedBatches prometheus.Counter
|
||||
failedSamples prometheus.Counter
|
||||
queueLength prometheus.Gauge
|
||||
queueCapacity prometheus.Metric
|
||||
sentSamplesTotal *prometheus.CounterVec
|
||||
sentBatchDuration *prometheus.HistogramVec
|
||||
queueLength prometheus.Gauge
|
||||
queueCapacity prometheus.Metric
|
||||
}
|
||||
|
||||
// NewStorageQueueManager builds a new StorageQueueManager.
|
||||
|
@ -94,37 +92,27 @@ func NewStorageQueueManager(tsdb StorageClient, cfg *StorageQueueManagerConfig)
|
|||
shards: shards,
|
||||
done: make(chan struct{}),
|
||||
|
||||
samplesCount: prometheus.NewCounterVec(
|
||||
sentSamplesTotal: prometheus.NewCounterVec(
|
||||
prometheus.CounterOpts{
|
||||
Namespace: namespace,
|
||||
Subsystem: subsystem,
|
||||
Name: "sent_samples_total",
|
||||
Help: "Total number of processed samples to be sent to remote storage.",
|
||||
Help: "Total number of processed samples sent to remote storage.",
|
||||
ConstLabels: constLabels,
|
||||
},
|
||||
[]string{result},
|
||||
),
|
||||
sendLatency: prometheus.NewSummary(prometheus.SummaryOpts{
|
||||
Namespace: namespace,
|
||||
Subsystem: subsystem,
|
||||
Name: "send_latency_seconds",
|
||||
Help: "Latency quantiles for sending sample batches to the remote storage.",
|
||||
ConstLabels: constLabels,
|
||||
}),
|
||||
failedBatches: prometheus.NewCounter(prometheus.CounterOpts{
|
||||
Namespace: namespace,
|
||||
Subsystem: subsystem,
|
||||
Name: "failed_batches_total",
|
||||
Help: "Total number of sample batches that encountered an error while being sent to the remote storage.",
|
||||
ConstLabels: constLabels,
|
||||
}),
|
||||
failedSamples: prometheus.NewCounter(prometheus.CounterOpts{
|
||||
Namespace: namespace,
|
||||
Subsystem: subsystem,
|
||||
Name: "failed_samples_total",
|
||||
Help: "Total number of samples that encountered an error while being sent to the remote storage.",
|
||||
ConstLabels: constLabels,
|
||||
}),
|
||||
sentBatchDuration: prometheus.NewHistogramVec(
|
||||
prometheus.HistogramOpts{
|
||||
Namespace: namespace,
|
||||
Subsystem: subsystem,
|
||||
Name: "sent_batch_duration_seconds",
|
||||
Help: "Duration of sample batch send calls to the remote storage.",
|
||||
ConstLabels: constLabels,
|
||||
Buckets: prometheus.DefBuckets,
|
||||
},
|
||||
[]string{result},
|
||||
),
|
||||
queueLength: prometheus.NewGauge(prometheus.GaugeOpts{
|
||||
Namespace: namespace,
|
||||
Subsystem: subsystem,
|
||||
|
@ -140,7 +128,7 @@ func NewStorageQueueManager(tsdb StorageClient, cfg *StorageQueueManagerConfig)
|
|||
constLabels,
|
||||
),
|
||||
prometheus.GaugeValue,
|
||||
float64(cfg.QueueCapacity),
|
||||
float64(cfg.QueueCapacity*cfg.Shards),
|
||||
),
|
||||
}
|
||||
|
||||
|
@ -158,7 +146,7 @@ func (t *StorageQueueManager) Append(s *model.Sample) error {
|
|||
select {
|
||||
case t.shards[shard] <- s:
|
||||
default:
|
||||
t.samplesCount.WithLabelValues(dropped).Inc()
|
||||
t.sentSamplesTotal.WithLabelValues(dropped).Inc()
|
||||
log.Warn("Remote storage queue full, discarding sample.")
|
||||
}
|
||||
return nil
|
||||
|
@ -173,10 +161,8 @@ func (*StorageQueueManager) NeedsThrottling() bool {
|
|||
|
||||
// Describe implements prometheus.Collector.
|
||||
func (t *StorageQueueManager) Describe(ch chan<- *prometheus.Desc) {
|
||||
t.samplesCount.Describe(ch)
|
||||
t.sendLatency.Describe(ch)
|
||||
ch <- t.failedBatches.Desc()
|
||||
ch <- t.failedSamples.Desc()
|
||||
t.sentSamplesTotal.Describe(ch)
|
||||
t.sentBatchDuration.Describe(ch)
|
||||
ch <- t.queueLength.Desc()
|
||||
ch <- t.queueCapacity.Desc()
|
||||
}
|
||||
|
@ -192,11 +178,9 @@ func (t *StorageQueueManager) queueLen() int {
|
|||
|
||||
// Collect implements prometheus.Collector.
|
||||
func (t *StorageQueueManager) Collect(ch chan<- prometheus.Metric) {
|
||||
t.samplesCount.Collect(ch)
|
||||
t.sendLatency.Collect(ch)
|
||||
t.sentSamplesTotal.Collect(ch)
|
||||
t.sentBatchDuration.Collect(ch)
|
||||
t.queueLength.Set(float64(t.queueLen()))
|
||||
ch <- t.failedBatches
|
||||
ch <- t.failedSamples
|
||||
ch <- t.queueLength
|
||||
ch <- t.queueCapacity
|
||||
}
|
||||
|
@ -268,9 +252,7 @@ func (t *StorageQueueManager) sendSamples(s model.Samples) {
|
|||
if err != nil {
|
||||
log.Warnf("error sending %d samples to remote storage: %s", len(s), err)
|
||||
labelValue = failure
|
||||
t.failedBatches.Inc()
|
||||
t.failedSamples.Add(float64(len(s)))
|
||||
}
|
||||
t.samplesCount.WithLabelValues(labelValue).Add(float64(len(s)))
|
||||
t.sendLatency.Observe(duration)
|
||||
t.sentSamplesTotal.WithLabelValues(labelValue).Add(float64(len(s)))
|
||||
t.sentBatchDuration.WithLabelValues(labelValue).Observe(duration)
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue