Instrument series maintenance durations.

beorn7 2015-03-19 17:06:16 +01:00
parent 12ae6e9203
commit 51d35f4481
3 changed files with 39 additions and 9 deletions

@@ -72,6 +72,12 @@ const (
     // Op-types for chunkOps and chunkDescOps.
     evict = "evict"
     load  = "load"
+
+    seriesLocationLabel = "location"
+
+    // Maintenance types for maintainSeriesDuration.
+    maintainInMemory = "memory"
+    maintainArchived = "archived"
 )
 
 func init() {

@@ -111,7 +111,7 @@ type persistence struct {
     indexingQueueLength   prometheus.Gauge
     indexingQueueCapacity prometheus.Metric
     indexingBatchSizes    prometheus.Summary
-    indexingBatchLatency  prometheus.Summary
+    indexingBatchDuration prometheus.Summary
     checkpointDuration    prometheus.Gauge
 
     dirtyMtx sync.Mutex // Protects dirty and becameDirty.
@@ -214,12 +214,12 @@ func newPersistence(basePath string, dirty, pedanticChecks bool, shouldSync sync
                 Help:      "Quantiles for indexing batch sizes (number of metrics per batch).",
             },
         ),
-        indexingBatchLatency: prometheus.NewSummary(
+        indexingBatchDuration: prometheus.NewSummary(
             prometheus.SummaryOpts{
                 Namespace: namespace,
                 Subsystem: subsystem,
-                Name:      "indexing_batch_latency_milliseconds",
-                Help:      "Quantiles for batch indexing latencies in milliseconds.",
+                Name:      "indexing_batch_duration_milliseconds",
+                Help:      "Quantiles for batch indexing duration in milliseconds.",
             },
         ),
         checkpointDuration: prometheus.NewGauge(prometheus.GaugeOpts{
@@ -264,7 +264,7 @@ func (p *persistence) Describe(ch chan<- *prometheus.Desc) {
     ch <- p.indexingQueueLength.Desc()
     ch <- p.indexingQueueCapacity.Desc()
     p.indexingBatchSizes.Describe(ch)
-    p.indexingBatchLatency.Describe(ch)
+    p.indexingBatchDuration.Describe(ch)
     ch <- p.checkpointDuration.Desc()
 }
@@ -275,7 +275,7 @@ func (p *persistence) Collect(ch chan<- prometheus.Metric) {
     ch <- p.indexingQueueLength
     ch <- p.indexingQueueCapacity
     p.indexingBatchSizes.Collect(ch)
-    p.indexingBatchLatency.Collect(ch)
+    p.indexingBatchDuration.Collect(ch)
     ch <- p.checkpointDuration
 }
@@ -1270,7 +1270,9 @@ func (p *persistence) processIndexingQueue() {
     commitBatch := func() {
         p.indexingBatchSizes.Observe(float64(batchSize))
         defer func(begin time.Time) {
-            p.indexingBatchLatency.Observe(float64(time.Since(begin) / time.Millisecond))
+            p.indexingBatchDuration.Observe(
+                float64(time.Since(begin)) / float64(time.Millisecond),
+            )
         }(time.Now())
 
         if err := p.labelPairToFingerprints.IndexBatch(pairToFPs); err != nil {
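Note that the processIndexingQueue hunk above is more than a rename: the old observation, float64(time.Since(begin) / time.Millisecond), divides two time.Duration values first and therefore truncates to whole milliseconds, while the new float64(time.Since(begin)) / float64(time.Millisecond) converts before dividing and keeps sub-millisecond precision. A minimal standalone sketch of the difference (variable names are illustrative only):

package main

import (
    "fmt"
    "time"
)

func main() {
    // Stands in for time.Since(begin) in the hunk above.
    elapsed := 1500 * time.Microsecond // 1.5ms

    // Old style: Duration/Duration is integer division, so precision is
    // lost before the conversion to float64.
    truncated := float64(elapsed / time.Millisecond)

    // New style: convert first, then divide as floats.
    precise := float64(elapsed) / float64(time.Millisecond)

    fmt.Println(truncated, precise) // 1 1.5
}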

@@ -102,6 +102,7 @@ type memorySeriesStorage struct {
     seriesOps                   *prometheus.CounterVec
     ingestedSamplesCount        prometheus.Counter
     invalidPreloadRequestsCount prometheus.Counter
+    maintainSeriesDuration      *prometheus.SummaryVec
 }
 
 // MemorySeriesStorageOptions contains options needed by
@@ -172,6 +173,15 @@ func NewMemorySeriesStorage(o *MemorySeriesStorageOptions) (Storage, error) {
             Name:      "invalid_preload_requests_total",
             Help:      "The total number of preload requests referring to a non-existent series. This is an indication of outdated label indexes.",
         }),
+        maintainSeriesDuration: prometheus.NewSummaryVec(
+            prometheus.SummaryOpts{
+                Namespace: namespace,
+                Subsystem: subsystem,
+                Name:      "maintain_series_duration_milliseconds",
+                Help:      "The duration (in milliseconds) it took to perform maintenance on a series.",
+            },
+            []string{seriesLocationLabel},
+        ),
     }
 
     var syncStrategy syncStrategy
@@ -715,6 +725,12 @@ loop:
 func (s *memorySeriesStorage) maintainMemorySeries(
     fp clientmodel.Fingerprint, beforeTime clientmodel.Timestamp,
 ) (becameDirty bool) {
+    defer func(begin time.Time) {
+        s.maintainSeriesDuration.WithLabelValues(maintainInMemory).Observe(
+            float64(time.Since(begin)) / float64(time.Millisecond),
+        )
+    }(time.Now())
+
     s.fpLocker.Lock(fp)
     defer s.fpLocker.Unlock(fp)
@@ -860,6 +876,12 @@ func (s *memorySeriesStorage) writeMemorySeries(
 // maintainArchivedSeries drops chunks older than beforeTime from an archived
 // series. If the series contains no chunks after that, it is purged entirely.
 func (s *memorySeriesStorage) maintainArchivedSeries(fp clientmodel.Fingerprint, beforeTime clientmodel.Timestamp) {
+    defer func(begin time.Time) {
+        s.maintainSeriesDuration.WithLabelValues(maintainArchived).Observe(
+            float64(time.Since(begin)) / float64(time.Millisecond),
+        )
+    }(time.Now())
+
     s.fpLocker.Lock(fp)
     defer s.fpLocker.Unlock(fp)
@@ -939,8 +961,8 @@ func (s *memorySeriesStorage) Describe(ch chan<- *prometheus.Desc) {
     s.seriesOps.Describe(ch)
     ch <- s.ingestedSamplesCount.Desc()
     ch <- s.invalidPreloadRequestsCount.Desc()
-
     ch <- numMemChunksDesc
+    s.maintainSeriesDuration.Describe(ch)
 }
 
 // Collect implements prometheus.Collector.
@@ -962,10 +984,10 @@ func (s *memorySeriesStorage) Collect(ch chan<- prometheus.Metric) {
     s.seriesOps.Collect(ch)
     ch <- s.ingestedSamplesCount
     ch <- s.invalidPreloadRequestsCount
-
     ch <- prometheus.MustNewConstMetric(
         numMemChunksDesc,
         prometheus.GaugeValue,
         float64(atomic.LoadInt64(&numMemChunks)),
     )
+    s.maintainSeriesDuration.Collect(ch)
 }
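Taken together, the storage changes follow one instrumentation pattern: a SummaryVec keyed by the location label, observed in milliseconds from a deferred closure whose start time is captured on function entry. Below is a self-contained sketch of that pattern. It registers the metric directly with MustRegister for brevity, whereas the commit itself exposes it through the storage's Describe and Collect methods; the package layout, the "example" namespace, and the maintainSeries function are illustrative assumptions, not names from the commit.

package main

import (
    "time"

    "github.com/prometheus/client_golang/prometheus"
)

// A SummaryVec partitioned by a "location" label, mirroring
// maintain_series_duration_milliseconds in the diff above.
var maintainSeriesDuration = prometheus.NewSummaryVec(
    prometheus.SummaryOpts{
        Namespace: "example", // illustrative; the commit uses its own namespace and subsystem constants
        Subsystem: "storage",
        Name:      "maintain_series_duration_milliseconds",
        Help:      "The duration (in milliseconds) it took to perform maintenance on a series.",
    },
    []string{"location"}, // "memory" or "archived", as in the added constants
)

func maintainSeries() {
    // The argument time.Now() is evaluated immediately; the Observe call
    // runs when maintainSeries returns, so the whole function is timed.
    defer func(begin time.Time) {
        maintainSeriesDuration.WithLabelValues("memory").Observe(
            float64(time.Since(begin)) / float64(time.Millisecond),
        )
    }(time.Now())

    time.Sleep(10 * time.Millisecond) // stand-in for the actual maintenance work
}

func main() {
    prometheus.MustRegister(maintainSeriesDuration)
    maintainSeries()
}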