From 1dcb7637f55bcad31dde690394c6ab92eaccd47d Mon Sep 17 00:00:00 2001
From: Brian Brazil
Date: Wed, 11 Jan 2017 15:11:19 +0000
Subject: [PATCH] Add various persistence related metrics (#2333)

Add metrics around checkpointing and persistence

* Add a metric to say if checkpointing is happening, and another to track
  total checkpoint time and count.

  This breaks the existing prometheus_local_storage_checkpoint_duration_seconds
  by renaming it to prometheus_local_storage_checkpoint_last_duration_seconds
  as the former name is more appropriate for a summary.

* Add metric for last checkpoint size.

* Add metric for series/chunks processed by checkpoints.

  For long checkpoints it'd be useful to see how they're progressing.

* Add metric for dirty series

* Add metric for number of chunks persisted per series.

  You can get the number of chunks from chunk_ops, but not the matching
  number of series. This helps determine the size of the writes being made.

* Add metric for chunks queued for persistence

  Chunks created includes both chunks that'll need persistence and chunks
  read in for queries. This only includes chunks created for persistence.

* Code review comments on new persistence metrics.
---
 consoles/prometheus-overview.html |  2 +-
 storage/local/persistence.go      | 83 +++++++++++++++++++++++++++----
 storage/local/storage.go          | 23 +++++++++
 3 files changed, 96 insertions(+), 12 deletions(-)

diff --git a/consoles/prometheus-overview.html b/consoles/prometheus-overview.html
index f1f111e9b..d85c9310f 100644
--- a/consoles/prometheus-overview.html
+++ b/consoles/prometheus-overview.html
@@ -46,7 +46,7 @@
       Checkpoint Duration
-      {{ template "prom_query_drilldown" (args (printf "prometheus_local_storage_checkpoint_duration_seconds{job='prometheus',instance='%s'}" .Params.instance) "" "humanizeDuration") }}
+      {{ template "prom_query_drilldown" (args (printf "prometheus_local_storage_checkpoint_last_duration_seconds{job='prometheus',instance='%s'}" .Params.instance) "" "humanizeDuration") }}
diff --git a/storage/local/persistence.go b/storage/local/persistence.go
index 9fbb3f1f5..ef173f211 100644
--- a/storage/local/persistence.go
+++ b/storage/local/persistence.go
@@ -110,13 +110,18 @@ type persistence struct {
   indexingStopped chan struct{}
   indexingFlush   chan chan int
 
-  indexingQueueLength   prometheus.Gauge
-  indexingQueueCapacity prometheus.Metric
-  indexingBatchSizes    prometheus.Summary
-  indexingBatchDuration prometheus.Summary
-  checkpointDuration    prometheus.Gauge
-  dirtyCounter          prometheus.Counter
-  startedDirty          prometheus.Gauge
+  indexingQueueLength     prometheus.Gauge
+  indexingQueueCapacity   prometheus.Metric
+  indexingBatchSizes      prometheus.Summary
+  indexingBatchDuration   prometheus.Summary
+  checkpointDuration      prometheus.Summary
+  checkpointLastDuration  prometheus.Gauge
+  checkpointLastSize      prometheus.Gauge
+  checkpointChunksWritten prometheus.Summary
+  dirtyCounter            prometheus.Counter
+  startedDirty            prometheus.Gauge
+  checkpointing           prometheus.Gauge
+  seriesChunksPersisted   prometheus.Histogram
 
   dirtyMtx sync.Mutex // Protects dirty and becameDirty.
   dirty    bool       // true if persistence was started in dirty state.
@@ -247,11 +252,31 @@ func newPersistence(
       Help: "Quantiles for batch indexing duration in seconds.",
     },
   ),
-  checkpointDuration: prometheus.NewGauge(prometheus.GaugeOpts{
+  checkpointLastDuration: prometheus.NewGauge(prometheus.GaugeOpts{
     Namespace: namespace,
     Subsystem: subsystem,
-    Name:      "checkpoint_duration_seconds",
-    Help:      "The duration in seconds it took to checkpoint in-memory metrics and head chunks.",
+    Name:      "checkpoint_last_duration_seconds",
+    Help:      "The duration in seconds it took to last checkpoint open chunks and chunks yet to be persisted.",
+  }),
+  checkpointDuration: prometheus.NewSummary(prometheus.SummaryOpts{
+    Namespace:  namespace,
+    Subsystem:  subsystem,
+    Objectives: map[float64]float64{},
+    Name:       "checkpoint_duration_seconds",
+    Help:       "The duration in seconds taken for checkpointing open chunks and chunks yet to be persisted.",
+  }),
+  checkpointLastSize: prometheus.NewGauge(prometheus.GaugeOpts{
+    Namespace: namespace,
+    Subsystem: subsystem,
+    Name:      "checkpoint_last_size_bytes",
+    Help:      "The size of the last checkpoint of open chunks and chunks yet to be persisted.",
+  }),
+  checkpointChunksWritten: prometheus.NewSummary(prometheus.SummaryOpts{
+    Namespace:  namespace,
+    Subsystem:  subsystem,
+    Objectives: map[float64]float64{},
+    Name:       "checkpoint_series_chunks_written",
+    Help:       "The number of chunks written per series while checkpointing open chunks and chunks yet to be persisted.",
   }),
   dirtyCounter: prometheus.NewCounter(prometheus.CounterOpts{
     Namespace: namespace,
@@ -265,6 +290,21 @@ func newPersistence(
     Name: "started_dirty",
     Help: "Whether the local storage was found to be dirty (and crash recovery occurred) during Prometheus startup.",
   }),
+  checkpointing: prometheus.NewGauge(prometheus.GaugeOpts{
+    Namespace: namespace,
+    Subsystem: subsystem,
+    Name:      "checkpointing",
+    Help:      "1 if the storage is checkpointing, 0 otherwise.",
+  }),
+  seriesChunksPersisted: prometheus.NewHistogram(prometheus.HistogramOpts{
+    Namespace: namespace,
+    Subsystem: subsystem,
+    Name:      "series_chunks_persisted",
+    Help:      "The number of chunks persisted per series.",
+    // Even with 4 bytes per sample, you're not going to get more than 85
+    // chunks in 6 hours for a time series with 1s resolution.
+    Buckets: []float64{1, 2, 4, 8, 16, 32, 64, 128},
+  }),
   dirty:          dirty,
   pedanticChecks: pedanticChecks,
   dirtyFileName:  dirtyPath,
@@ -310,8 +350,13 @@ func (p *persistence) Describe(ch chan<- *prometheus.Desc) {
   p.indexingBatchSizes.Describe(ch)
   p.indexingBatchDuration.Describe(ch)
   ch <- p.checkpointDuration.Desc()
+  ch <- p.checkpointLastDuration.Desc()
+  ch <- p.checkpointLastSize.Desc()
+  ch <- p.checkpointChunksWritten.Desc()
+  ch <- p.checkpointing.Desc()
   ch <- p.dirtyCounter.Desc()
   ch <- p.startedDirty.Desc()
+  ch <- p.seriesChunksPersisted.Desc()
 }
 
 // Collect implements prometheus.Collector.
@@ -323,8 +368,13 @@ func (p *persistence) Collect(ch chan<- prometheus.Metric) {
   p.indexingBatchSizes.Collect(ch)
   p.indexingBatchDuration.Collect(ch)
   ch <- p.checkpointDuration
+  ch <- p.checkpointLastDuration
+  ch <- p.checkpointLastSize
+  ch <- p.checkpointChunksWritten
+  ch <- p.checkpointing
   ch <- p.dirtyCounter
   ch <- p.startedDirty
+  ch <- p.seriesChunksPersisted
 }
 
 // isDirty returns the dirty flag in a goroutine-safe way.
@@ -559,6 +609,8 @@ func (p *persistence) loadChunkDescs(fp model.Fingerprint, offsetFromEnd int) ([
 //
 func (p *persistence) checkpointSeriesMapAndHeads(fingerprintToSeries *seriesMap, fpLocker *fingerprintLocker) (err error) {
   log.Info("Checkpointing in-memory metrics and chunks...")
+  p.checkpointing.Set(1)
+  defer p.checkpointing.Set(0)
   begin := time.Now()
   f, err := os.OpenFile(p.headsTempFileName(), os.O_WRONLY|os.O_TRUNC|os.O_CREATE, 0640)
   if err != nil {
@@ -581,7 +633,8 @@ func (p *persistence) checkpointSeriesMapAndHeads(fingerprintToSeries *seriesMap
     }
     err = os.Rename(p.headsTempFileName(), p.headsFileName())
     duration := time.Since(begin)
-    p.checkpointDuration.Set(duration.Seconds())
+    p.checkpointDuration.Observe(duration.Seconds())
+    p.checkpointLastDuration.Set(duration.Seconds())
     log.Infof("Done checkpointing in-memory metrics and chunks in %v.", duration)
   }()
@@ -677,6 +730,7 @@ func (p *persistence) checkpointSeriesMapAndHeads(fingerprintToSeries *seriesMap
         return
       }
     }
+    p.checkpointChunksWritten.Observe(float64(len(m.series.chunkDescs) - m.series.persistWatermark))
   }
   // Series is checkpointed now, so declare it clean. In case the entire
   // checkpoint fails later on, this is fine, as the storage's series
@@ -704,6 +758,11 @@ func (p *persistence) checkpointSeriesMapAndHeads(fingerprintToSeries *seriesMap
       return err
     }
   }
+  info, err := f.Stat()
+  if err != nil {
+    return err
+  }
+  p.checkpointLastSize.Set(float64(info.Size()))
   return err
 }
@@ -1520,6 +1579,7 @@ func (p *persistence) writeChunks(w io.Writer, chunks []chunk.Chunk) error {
     // would only put back the original buf.
     p.bufPool.Put(b)
   }()
+  numChunks := len(chunks)
 
   for batchSize := chunkMaxBatchSize; len(chunks) > 0; chunks = chunks[batchSize:] {
     if batchSize > len(chunks) {
@@ -1543,6 +1603,7 @@ func (p *persistence) writeChunks(w io.Writer, chunks []chunk.Chunk) error {
       return err
     }
   }
+  p.seriesChunksPersisted.Observe(float64(numChunks))
   return nil
 }
diff --git a/storage/local/storage.go b/storage/local/storage.go
index 17f3e9d28..d772758d9 100644
--- a/storage/local/storage.go
+++ b/storage/local/storage.go
@@ -178,7 +178,9 @@ type MemorySeriesStorage struct {
   quarantineStopping, quarantineStopped chan struct{}
 
   persistErrors         prometheus.Counter
+  queuedChunksToPersist prometheus.Counter
   numSeries             prometheus.Gauge
+  dirtySeries           prometheus.Gauge
   seriesOps             *prometheus.CounterVec
   ingestedSamplesCount  prometheus.Counter
   discardedSamplesCount *prometheus.CounterVec
@@ -240,12 +242,24 @@ func NewMemorySeriesStorage(o *MemorySeriesStorageOptions) *MemorySeriesStorage
     Name: "persist_errors_total",
     Help: "The total number of errors while persisting chunks.",
   }),
+  queuedChunksToPersist: prometheus.NewCounter(prometheus.CounterOpts{
+    Namespace: namespace,
+    Subsystem: subsystem,
+    Name:      "queued_chunks_to_persist_total",
+    Help:      "The total number of chunks queued for persistence.",
+  }),
   numSeries: prometheus.NewGauge(prometheus.GaugeOpts{
     Namespace: namespace,
     Subsystem: subsystem,
     Name:      "memory_series",
     Help:      "The current number of series in memory.",
   }),
+  dirtySeries: prometheus.NewGauge(prometheus.GaugeOpts{
+    Namespace: namespace,
+    Subsystem: subsystem,
+    Name:      "memory_dirty_series",
+    Help:      "The current number of series that would require a disk seek during crash recovery.",
+  }),
   seriesOps: prometheus.NewCounterVec(
     prometheus.CounterOpts{
       Namespace: namespace,
@@ -1260,6 +1274,7 @@ loop:
         log.Errorln("Error while checkpointing:", err)
       } else {
         dirtySeriesCount = 0
+        s.dirtySeries.Set(0)
       }
       // If a checkpoint takes longer than checkpointInterval, unluckily timed
       // combination with the Reset(0) call below can lead to a case where a
@@ -1272,6 +1287,7 @@ loop:
     case fp := <-memoryFingerprints:
       if s.maintainMemorySeries(fp, model.Now().Add(-s.dropAfter)) {
         dirtySeriesCount++
+        s.dirtySeries.Inc()
         // Check if we have enough "dirty" series so that we need an early checkpoint.
         // However, if we are already behind persisting chunks, creating a checkpoint
         // would be counterproductive, as it would slow down chunk persisting even more,
@@ -1531,6 +1547,9 @@ func (s *MemorySeriesStorage) getNumChunksToPersist() int {
 // negative 'by' to decrement.
 func (s *MemorySeriesStorage) incNumChunksToPersist(by int) {
   atomic.AddInt64(&s.numChunksToPersist, int64(by))
+  if by > 0 {
+    s.queuedChunksToPersist.Add(float64(by))
+  }
 }
 
 // calculatePersistenceUrgencyScore calculates and returns an urgency score for
@@ -1734,9 +1753,11 @@ func (s *MemorySeriesStorage) Describe(ch chan<- *prometheus.Desc) {
   s.mapper.Describe(ch)
 
   ch <- s.persistErrors.Desc()
+  ch <- s.queuedChunksToPersist.Desc()
   ch <- maxChunksToPersistDesc
   ch <- numChunksToPersistDesc
   ch <- s.numSeries.Desc()
+  ch <- s.dirtySeries.Desc()
   s.seriesOps.Describe(ch)
   ch <- s.ingestedSamplesCount.Desc()
   s.discardedSamplesCount.Describe(ch)
@@ -1753,6 +1774,7 @@ func (s *MemorySeriesStorage) Collect(ch chan<- prometheus.Metric) {
   s.mapper.Collect(ch)
 
   ch <- s.persistErrors
+  ch <- s.queuedChunksToPersist
   ch <- prometheus.MustNewConstMetric(
     maxChunksToPersistDesc,
     prometheus.GaugeValue,
@@ -1764,6 +1786,7 @@ func (s *MemorySeriesStorage) Collect(ch chan<- prometheus.Metric) {
     float64(s.getNumChunksToPersist()),
   )
   ch <- s.numSeries
+  ch <- s.dirtySeries
   s.seriesOps.Collect(ch)
   ch <- s.ingestedSamplesCount
   s.discardedSamplesCount.Collect(ch)
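
The hunks above all apply the same client_golang instrumentation pattern: a 0/1 gauge flipped around the operation, a summary plus a "last duration" gauge for timing, and a histogram for chunks persisted per series. For reference, a minimal, self-contained sketch of that pattern follows. It is not the storage code itself: the checkpoint body is stubbed out, metric names are shortened (no Namespace/Subsystem split), and the main() registration is hypothetical.

// checkpoint_metrics_sketch.go
package main

import (
  "math/rand"
  "time"

  "github.com/prometheus/client_golang/prometheus"
)

var (
  // 0/1 flag flipped around the operation, as the patch does with p.checkpointing.
  checkpointing = prometheus.NewGauge(prometheus.GaugeOpts{
    Name: "checkpointing",
    Help: "1 if a checkpoint is in progress, 0 otherwise.",
  })
  // Running total of checkpoint time and count (summary) plus the last duration (gauge).
  checkpointDuration = prometheus.NewSummary(prometheus.SummaryOpts{
    Name: "checkpoint_duration_seconds",
    Help: "The duration in seconds taken for checkpointing.",
  })
  checkpointLastDuration = prometheus.NewGauge(prometheus.GaugeOpts{
    Name: "checkpoint_last_duration_seconds",
    Help: "The duration in seconds of the last checkpoint.",
  })
  // Chunks persisted per series, bucketed as in the patch.
  seriesChunksPersisted = prometheus.NewHistogram(prometheus.HistogramOpts{
    Name:    "series_chunks_persisted",
    Help:    "The number of chunks persisted per series.",
    Buckets: []float64{1, 2, 4, 8, 16, 32, 64, 128},
  })
)

// checkpoint stands in for checkpointSeriesMapAndHeads: it marks the
// checkpoint as in progress, times the work, and records both the running
// summary and the last-duration gauge on the way out.
func checkpoint() {
  checkpointing.Set(1)
  defer checkpointing.Set(0)
  begin := time.Now()
  defer func() {
    d := time.Since(begin).Seconds()
    checkpointDuration.Observe(d)
    checkpointLastDuration.Set(d)
  }()

  // Pretend to checkpoint ten series, observing chunks written per series.
  for i := 0; i < 10; i++ {
    seriesChunksPersisted.Observe(float64(1 + rand.Intn(8)))
    time.Sleep(10 * time.Millisecond)
  }
}

func main() {
  prometheus.MustRegister(checkpointing, checkpointDuration,
    checkpointLastDuration, seriesChunksPersisted)
  checkpoint()
}

A real program would also expose the registry over HTTP; that part is omitted here. Because checkpoint_duration_seconds is now a summary, an average checkpoint duration can be derived from its _sum and _count series, e.g. rate(prometheus_local_storage_checkpoint_duration_seconds_sum[1h]) / rate(prometheus_local_storage_checkpoint_duration_seconds_count[1h]), while the renamed prometheus_local_storage_checkpoint_last_duration_seconds gauge is what the updated console drilldown in prometheus-overview.html now queries.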