diff --git a/storage/local/instrumentation.go b/storage/local/instrumentation.go index 5da933e36..3223f904a 100644 --- a/storage/local/instrumentation.go +++ b/storage/local/instrumentation.go @@ -72,6 +72,12 @@ const ( // Op-types for chunkOps and chunkDescOps. evict = "evict" load = "load" + + seriesLocationLabel = "location" + + // Maintenance types for maintainSeriesDuration. + maintainInMemory = "memory" + maintainArchived = "archived" ) func init() { diff --git a/storage/local/persistence.go b/storage/local/persistence.go index 3df444779..7e4fb2824 100644 --- a/storage/local/persistence.go +++ b/storage/local/persistence.go @@ -111,7 +111,7 @@ type persistence struct { indexingQueueLength prometheus.Gauge indexingQueueCapacity prometheus.Metric indexingBatchSizes prometheus.Summary - indexingBatchLatency prometheus.Summary + indexingBatchDuration prometheus.Summary checkpointDuration prometheus.Gauge dirtyMtx sync.Mutex // Protects dirty and becameDirty. @@ -214,12 +214,12 @@ func newPersistence(basePath string, dirty, pedanticChecks bool, shouldSync sync Help: "Quantiles for indexing batch sizes (number of metrics per batch).", }, ), - indexingBatchLatency: prometheus.NewSummary( + indexingBatchDuration: prometheus.NewSummary( prometheus.SummaryOpts{ Namespace: namespace, Subsystem: subsystem, - Name: "indexing_batch_latency_milliseconds", - Help: "Quantiles for batch indexing latencies in milliseconds.", + Name: "indexing_batch_duration_milliseconds", + Help: "Quantiles for batch indexing duration in milliseconds.", }, ), checkpointDuration: prometheus.NewGauge(prometheus.GaugeOpts{ @@ -264,7 +264,7 @@ func (p *persistence) Describe(ch chan<- *prometheus.Desc) { ch <- p.indexingQueueLength.Desc() ch <- p.indexingQueueCapacity.Desc() p.indexingBatchSizes.Describe(ch) - p.indexingBatchLatency.Describe(ch) + p.indexingBatchDuration.Describe(ch) ch <- p.checkpointDuration.Desc() } @@ -275,7 +275,7 @@ func (p *persistence) Collect(ch chan<- prometheus.Metric) { ch <- p.indexingQueueLength ch <- p.indexingQueueCapacity p.indexingBatchSizes.Collect(ch) - p.indexingBatchLatency.Collect(ch) + p.indexingBatchDuration.Collect(ch) ch <- p.checkpointDuration } @@ -1270,7 +1270,9 @@ func (p *persistence) processIndexingQueue() { commitBatch := func() { p.indexingBatchSizes.Observe(float64(batchSize)) defer func(begin time.Time) { - p.indexingBatchLatency.Observe(float64(time.Since(begin) / time.Millisecond)) + p.indexingBatchDuration.Observe( + float64(time.Since(begin)) / float64(time.Millisecond), + ) }(time.Now()) if err := p.labelPairToFingerprints.IndexBatch(pairToFPs); err != nil { diff --git a/storage/local/storage.go b/storage/local/storage.go index 7ac66ab37..efe69cae7 100644 --- a/storage/local/storage.go +++ b/storage/local/storage.go @@ -102,6 +102,7 @@ type memorySeriesStorage struct { seriesOps *prometheus.CounterVec ingestedSamplesCount prometheus.Counter invalidPreloadRequestsCount prometheus.Counter + maintainSeriesDuration *prometheus.SummaryVec } // MemorySeriesStorageOptions contains options needed by @@ -172,6 +173,15 @@ func NewMemorySeriesStorage(o *MemorySeriesStorageOptions) (Storage, error) { Name: "invalid_preload_requests_total", Help: "The total number of preload requests referring to a non-existent series. This is an indication of outdated label indexes.", }), + maintainSeriesDuration: prometheus.NewSummaryVec( + prometheus.SummaryOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "maintain_series_duration_milliseconds", + Help: "The duration (in milliseconds) it took to perform maintenance on a series.", + }, + []string{seriesLocationLabel}, + ), } var syncStrategy syncStrategy @@ -715,6 +725,12 @@ loop: func (s *memorySeriesStorage) maintainMemorySeries( fp clientmodel.Fingerprint, beforeTime clientmodel.Timestamp, ) (becameDirty bool) { + defer func(begin time.Time) { + s.maintainSeriesDuration.WithLabelValues(maintainInMemory).Observe( + float64(time.Since(begin)) / float64(time.Millisecond), + ) + }(time.Now()) + s.fpLocker.Lock(fp) defer s.fpLocker.Unlock(fp) @@ -860,6 +876,12 @@ func (s *memorySeriesStorage) writeMemorySeries( // maintainArchivedSeries drops chunks older than beforeTime from an archived // series. If the series contains no chunks after that, it is purged entirely. func (s *memorySeriesStorage) maintainArchivedSeries(fp clientmodel.Fingerprint, beforeTime clientmodel.Timestamp) { + defer func(begin time.Time) { + s.maintainSeriesDuration.WithLabelValues(maintainArchived).Observe( + float64(time.Since(begin)) / float64(time.Millisecond), + ) + }(time.Now()) + s.fpLocker.Lock(fp) defer s.fpLocker.Unlock(fp) @@ -939,8 +961,8 @@ func (s *memorySeriesStorage) Describe(ch chan<- *prometheus.Desc) { s.seriesOps.Describe(ch) ch <- s.ingestedSamplesCount.Desc() ch <- s.invalidPreloadRequestsCount.Desc() - ch <- numMemChunksDesc + s.maintainSeriesDuration.Describe(ch) } // Collect implements prometheus.Collector. @@ -962,10 +984,10 @@ func (s *memorySeriesStorage) Collect(ch chan<- prometheus.Metric) { s.seriesOps.Collect(ch) ch <- s.ingestedSamplesCount ch <- s.invalidPreloadRequestsCount - ch <- prometheus.MustNewConstMetric( numMemChunksDesc, prometheus.GaugeValue, float64(atomic.LoadInt64(&numMemChunks)), ) + s.maintainSeriesDuration.Collect(ch) }