Add various persistence-related metrics (#2333)

Add metrics around checkpointing and persistence

* Add a metric to say if checkpointing is happening,
and another to track total checkpoint time and count.

This is a breaking change: the existing prometheus_local_storage_checkpoint_duration_seconds
gauge is renamed to prometheus_local_storage_checkpoint_last_duration_seconds,
since the old name is a better fit for the new summary, which takes it over.
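
For example, the summary's _sum and _count can be combined to get the average
checkpoint duration, and the new gauge shows whether a checkpoint is currently
running (sample queries only, using the prometheus_local_storage_ prefix these
metrics are registered under):

    # Average checkpoint duration over the last hour.
      rate(prometheus_local_storage_checkpoint_duration_seconds_sum[1h])
    /
      rate(prometheus_local_storage_checkpoint_duration_seconds_count[1h])

    # 1 while a checkpoint is running, 0 otherwise.
    prometheus_local_storage_checkpointing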

* Add metric for last checkpoint size.

* Add metric for series/chunks processed by checkpoints.

For long checkpoints it'd be useful to see how they're progressing.
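
The new summary is observed once per checkpointed series, with the number of
chunks written as the value, so queries like these (sketches, same metric
prefix as above) show checkpoint throughput while one is running:

    # Series checkpointed per second.
    rate(prometheus_local_storage_checkpoint_series_chunks_written_count[5m])

    # Chunks checkpointed per second.
    rate(prometheus_local_storage_checkpoint_series_chunks_written_sum[5m])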

* Add a metric for dirty series.

* Add metric for number of chunks persisted per series.

You can get the number of chunks from chunk_ops,
but not the matching number of series. This helps determine
the size of the writes being made.
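
Since series_chunks_persisted is a histogram, the distribution of write sizes
can be queried directly, e.g. (sample queries, same prefix as above):

    # Approximate 90th percentile of chunks persisted per series.
    histogram_quantile(0.9, rate(prometheus_local_storage_series_chunks_persisted_bucket[1h]))

    # Average chunks persisted per series.
      rate(prometheus_local_storage_series_chunks_persisted_sum[1h])
    /
      rate(prometheus_local_storage_series_chunks_persisted_count[1h])

The top bucket bound of 128 comfortably covers the ~85-chunk worst case noted
in the code comment: assuming the local storage's 1024-byte chunks, 4 bytes per
sample gives 256 samples per chunk, and a 1s-resolution series produces
6 * 3600 = 21600 samples in 6 hours, i.e. roughly 85 chunks.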

* Add a metric for chunks queued for persistence.

The chunks-created metric includes both chunks that will need persistence
and chunks read in for queries; this new counter only includes chunks
created for persistence.
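
The rate of this counter can be compared against the persistence backlog to
see whether persisting keeps up with queueing (a sketch;
prometheus_local_storage_chunks_to_persist is assumed to be the existing
backlog gauge):

    # Chunks queued for persistence per second.
    rate(prometheus_local_storage_queued_chunks_to_persist_total[5m])

    # Current persistence backlog, for comparison.
    prometheus_local_storage_chunks_to_persist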

* Address code review comments on the new persistence metrics.
Brian Brazil, 2017-01-11 15:11:19 +00:00, committed by GitHub
parent 6ce97837ab
commit 1dcb7637f5
3 changed files with 96 additions and 12 deletions


@@ -46,7 +46,7 @@
   </tr>
   <tr>
     <td>Checkpoint Duration</td>
-    <td>{{ template "prom_query_drilldown" (args (printf "prometheus_local_storage_checkpoint_duration_seconds{job='prometheus',instance='%s'}" .Params.instance) "" "humanizeDuration") }}</td>
+    <td>{{ template "prom_query_drilldown" (args (printf "prometheus_local_storage_checkpoint_last_duration_seconds{job='prometheus',instance='%s'}" .Params.instance) "" "humanizeDuration") }}</td>
   </tr>
   <tr>


@@ -114,9 +114,14 @@ type persistence struct {
     indexingQueueCapacity prometheus.Metric
     indexingBatchSizes prometheus.Summary
     indexingBatchDuration prometheus.Summary
-    checkpointDuration prometheus.Gauge
+    checkpointDuration prometheus.Summary
+    checkpointLastDuration prometheus.Gauge
+    checkpointLastSize prometheus.Gauge
+    checkpointChunksWritten prometheus.Summary
     dirtyCounter prometheus.Counter
     startedDirty prometheus.Gauge
+    checkpointing prometheus.Gauge
+    seriesChunksPersisted prometheus.Histogram

     dirtyMtx sync.Mutex // Protects dirty and becameDirty.
     dirty bool // true if persistence was started in dirty state.
@@ -247,11 +252,31 @@ func newPersistence(
                 Help: "Quantiles for batch indexing duration in seconds.",
             },
         ),
-        checkpointDuration: prometheus.NewGauge(prometheus.GaugeOpts{
+        checkpointLastDuration: prometheus.NewGauge(prometheus.GaugeOpts{
             Namespace: namespace,
             Subsystem: subsystem,
+            Name:      "checkpoint_last_duration_seconds",
+            Help:      "The duration in seconds it took to last checkpoint open chunks and chunks yet to be persisted.",
+        }),
+        checkpointDuration: prometheus.NewSummary(prometheus.SummaryOpts{
+            Namespace:  namespace,
+            Subsystem:  subsystem,
+            Objectives: map[float64]float64{},
             Name:      "checkpoint_duration_seconds",
-            Help:      "The duration in seconds it took to checkpoint in-memory metrics and head chunks.",
+            Help:      "The duration in seconds taken for checkpointing open chunks and chunks yet to be persisted",
+        }),
+        checkpointLastSize: prometheus.NewGauge(prometheus.GaugeOpts{
+            Namespace: namespace,
+            Subsystem: subsystem,
+            Name:      "checkpoint_last_size_bytes",
+            Help:      "The size of the last checkpoint of open chunks and chunks yet to be persisted",
+        }),
+        checkpointChunksWritten: prometheus.NewSummary(prometheus.SummaryOpts{
+            Namespace:  namespace,
+            Subsystem:  subsystem,
+            Objectives: map[float64]float64{},
+            Name:       "checkpoint_series_chunks_written",
+            Help:       "The number of chunk written per series while checkpointing open chunks and chunks yet to be persisted.",
+        }),
         dirtyCounter: prometheus.NewCounter(prometheus.CounterOpts{
             Namespace: namespace,
@@ -265,6 +290,21 @@ func newPersistence(
             Name:      "started_dirty",
             Help:      "Whether the local storage was found to be dirty (and crash recovery occurred) during Prometheus startup.",
         }),
+        checkpointing: prometheus.NewGauge(prometheus.GaugeOpts{
+            Namespace: namespace,
+            Subsystem: subsystem,
+            Name:      "checkpointing",
+            Help:      "1 if the storage is checkpointing, 0 otherwise.",
+        }),
+        seriesChunksPersisted: prometheus.NewHistogram(prometheus.HistogramOpts{
+            Namespace: namespace,
+            Subsystem: subsystem,
+            Name:      "series_chunks_persisted",
+            Help:      "The number of chunks persisted per series.",
+            // Even with 4 bytes per sample, you're not going to get more than 85
+            // chunks in 6 hours for a time series with 1s resolution.
+            Buckets: []float64{1, 2, 4, 8, 16, 32, 64, 128},
+        }),
         dirty:          dirty,
         pedanticChecks: pedanticChecks,
         dirtyFileName:  dirtyPath,
@@ -310,8 +350,13 @@ func (p *persistence) Describe(ch chan<- *prometheus.Desc) {
     p.indexingBatchSizes.Describe(ch)
     p.indexingBatchDuration.Describe(ch)
     ch <- p.checkpointDuration.Desc()
+    ch <- p.checkpointLastDuration.Desc()
+    ch <- p.checkpointLastSize.Desc()
+    ch <- p.checkpointChunksWritten.Desc()
+    ch <- p.checkpointing.Desc()
     ch <- p.dirtyCounter.Desc()
     ch <- p.startedDirty.Desc()
+    ch <- p.seriesChunksPersisted.Desc()
 }

 // Collect implements prometheus.Collector.
@@ -323,8 +368,13 @@ func (p *persistence) Collect(ch chan<- prometheus.Metric) {
     p.indexingBatchSizes.Collect(ch)
     p.indexingBatchDuration.Collect(ch)
     ch <- p.checkpointDuration
+    ch <- p.checkpointLastDuration
+    ch <- p.checkpointLastSize
+    ch <- p.checkpointChunksWritten
+    ch <- p.checkpointing
     ch <- p.dirtyCounter
     ch <- p.startedDirty
+    ch <- p.seriesChunksPersisted
 }

 // isDirty returns the dirty flag in a goroutine-safe way.
@@ -559,6 +609,8 @@ func (p *persistence) loadChunkDescs(fp model.Fingerprint, offsetFromEnd int) ([
 //
 func (p *persistence) checkpointSeriesMapAndHeads(fingerprintToSeries *seriesMap, fpLocker *fingerprintLocker) (err error) {
     log.Info("Checkpointing in-memory metrics and chunks...")
+    p.checkpointing.Set(1)
+    defer p.checkpointing.Set(0)
     begin := time.Now()
     f, err := os.OpenFile(p.headsTempFileName(), os.O_WRONLY|os.O_TRUNC|os.O_CREATE, 0640)
     if err != nil {
@@ -581,7 +633,8 @@ func (p *persistence) checkpointSeriesMapAndHeads(fingerprintToSeries *seriesMap
         }
         err = os.Rename(p.headsTempFileName(), p.headsFileName())
         duration := time.Since(begin)
-        p.checkpointDuration.Set(duration.Seconds())
+        p.checkpointDuration.Observe(duration.Seconds())
+        p.checkpointLastDuration.Set(duration.Seconds())
         log.Infof("Done checkpointing in-memory metrics and chunks in %v.", duration)
     }()

@@ -677,6 +730,7 @@ func (p *persistence) checkpointSeriesMapAndHeads(fingerprintToSeries *seriesMap
                 return
             }
         }
+        p.checkpointChunksWritten.Observe(float64(len(m.series.chunkDescs) - m.series.persistWatermark))
     }
     // Series is checkpointed now, so declare it clean. In case the entire
     // checkpoint fails later on, this is fine, as the storage's series
@@ -704,6 +758,11 @@ func (p *persistence) checkpointSeriesMapAndHeads(fingerprintToSeries *seriesMap
             return err
         }
     }
+    info, err := f.Stat()
+    if err != nil {
+        return err
+    }
+    p.checkpointLastSize.Set(float64(info.Size()))
     return err
 }

@@ -1520,6 +1579,7 @@ func (p *persistence) writeChunks(w io.Writer, chunks []chunk.Chunk) error {
         // would only put back the original buf.
         p.bufPool.Put(b)
     }()
+    numChunks := len(chunks)

     for batchSize := chunkMaxBatchSize; len(chunks) > 0; chunks = chunks[batchSize:] {
         if batchSize > len(chunks) {
@@ -1543,6 +1603,7 @@ func (p *persistence) writeChunks(w io.Writer, chunks []chunk.Chunk) error {
             return err
         }
     }
+    p.seriesChunksPersisted.Observe(float64(numChunks))
     return nil
 }



@@ -178,7 +178,9 @@ type MemorySeriesStorage struct {
     quarantineStopping, quarantineStopped chan struct{}

     persistErrors prometheus.Counter
+    queuedChunksToPersist prometheus.Counter
     numSeries prometheus.Gauge
+    dirtySeries prometheus.Gauge
     seriesOps *prometheus.CounterVec
     ingestedSamplesCount prometheus.Counter
     discardedSamplesCount *prometheus.CounterVec
@@ -240,12 +242,24 @@ func NewMemorySeriesStorage(o *MemorySeriesStorageOptions) *MemorySeriesStorage
             Name:      "persist_errors_total",
             Help:      "The total number of errors while persisting chunks.",
         }),
+        queuedChunksToPersist: prometheus.NewCounter(prometheus.CounterOpts{
+            Namespace: namespace,
+            Subsystem: subsystem,
+            Name:      "queued_chunks_to_persist_total",
+            Help:      "The total number of chunks queued for persistence.",
+        }),
         numSeries: prometheus.NewGauge(prometheus.GaugeOpts{
             Namespace: namespace,
             Subsystem: subsystem,
             Name:      "memory_series",
             Help:      "The current number of series in memory.",
         }),
+        dirtySeries: prometheus.NewGauge(prometheus.GaugeOpts{
+            Namespace: namespace,
+            Subsystem: subsystem,
+            Name:      "memory_dirty_series",
+            Help:      "The current number of series that would require a disk seek during crash recovery.",
+        }),
         seriesOps: prometheus.NewCounterVec(
             prometheus.CounterOpts{
                 Namespace: namespace,
@@ -1260,6 +1274,7 @@ loop:
                 log.Errorln("Error while checkpointing:", err)
             } else {
                 dirtySeriesCount = 0
+                s.dirtySeries.Set(0)
             }
             // If a checkpoint takes longer than checkpointInterval, unluckily timed
             // combination with the Reset(0) call below can lead to a case where a
@@ -1272,6 +1287,7 @@ loop:
         case fp := <-memoryFingerprints:
             if s.maintainMemorySeries(fp, model.Now().Add(-s.dropAfter)) {
                 dirtySeriesCount++
+                s.dirtySeries.Inc()
                 // Check if we have enough "dirty" series so that we need an early checkpoint.
                 // However, if we are already behind persisting chunks, creating a checkpoint
                 // would be counterproductive, as it would slow down chunk persisting even more,
@@ -1531,6 +1547,9 @@ func (s *MemorySeriesStorage) getNumChunksToPersist() int {
 // negative 'by' to decrement.
 func (s *MemorySeriesStorage) incNumChunksToPersist(by int) {
     atomic.AddInt64(&s.numChunksToPersist, int64(by))
+    if by > 0 {
+        s.queuedChunksToPersist.Add(float64(by))
+    }
 }

 // calculatePersistenceUrgencyScore calculates and returns an urgency score for
@@ -1734,9 +1753,11 @@ func (s *MemorySeriesStorage) Describe(ch chan<- *prometheus.Desc) {
     s.mapper.Describe(ch)

     ch <- s.persistErrors.Desc()
+    ch <- s.queuedChunksToPersist.Desc()
     ch <- maxChunksToPersistDesc
     ch <- numChunksToPersistDesc
     ch <- s.numSeries.Desc()
+    ch <- s.dirtySeries.Desc()
     s.seriesOps.Describe(ch)
     ch <- s.ingestedSamplesCount.Desc()
     s.discardedSamplesCount.Describe(ch)
@@ -1753,6 +1774,7 @@ func (s *MemorySeriesStorage) Collect(ch chan<- prometheus.Metric) {
     s.mapper.Collect(ch)

     ch <- s.persistErrors
+    ch <- s.queuedChunksToPersist
     ch <- prometheus.MustNewConstMetric(
         maxChunksToPersistDesc,
         prometheus.GaugeValue,
@@ -1764,6 +1786,7 @@ func (s *MemorySeriesStorage) Collect(ch chan<- prometheus.Metric) {
         float64(s.getNumChunksToPersist()),
     )
     ch <- s.numSeries
+    ch <- s.dirtySeries
     s.seriesOps.Collect(ch)
     ch <- s.ingestedSamplesCount
     s.discardedSamplesCount.Collect(ch)