diff --git a/consoles/prometheus-overview.html b/consoles/prometheus-overview.html
index f1f111e9b4..d85c9310f8 100644
--- a/consoles/prometheus-overview.html
+++ b/consoles/prometheus-overview.html
@@ -46,7 +46,7 @@
<td>Checkpoint Duration</td>
- <td>{{ template "prom_query_drilldown" (args (printf "prometheus_local_storage_checkpoint_duration_seconds{job='prometheus',instance='%s'}" .Params.instance) "" "humanizeDuration") }}</td>
+ <td>{{ template "prom_query_drilldown" (args (printf "prometheus_local_storage_checkpoint_last_duration_seconds{job='prometheus',instance='%s'}" .Params.instance) "" "humanizeDuration") }}</td>
diff --git a/storage/local/persistence.go b/storage/local/persistence.go
index 9fbb3f1f59..ef173f2119 100644
--- a/storage/local/persistence.go
+++ b/storage/local/persistence.go
@@ -110,13 +110,18 @@ type persistence struct {
indexingStopped chan struct{}
indexingFlush chan chan int
- indexingQueueLength prometheus.Gauge
- indexingQueueCapacity prometheus.Metric
- indexingBatchSizes prometheus.Summary
- indexingBatchDuration prometheus.Summary
- checkpointDuration prometheus.Gauge
- dirtyCounter prometheus.Counter
- startedDirty prometheus.Gauge
+ indexingQueueLength prometheus.Gauge
+ indexingQueueCapacity prometheus.Metric
+ indexingBatchSizes prometheus.Summary
+ indexingBatchDuration prometheus.Summary
+ checkpointDuration prometheus.Summary
+ checkpointLastDuration prometheus.Gauge
+ checkpointLastSize prometheus.Gauge
+ checkpointChunksWritten prometheus.Summary
+ dirtyCounter prometheus.Counter
+ startedDirty prometheus.Gauge
+ checkpointing prometheus.Gauge
+ seriesChunksPersisted prometheus.Histogram
dirtyMtx sync.Mutex // Protects dirty and becameDirty.
dirty bool // true if persistence was started in dirty state.
@@ -247,11 +252,31 @@ func newPersistence(
Help: "Quantiles for batch indexing duration in seconds.",
},
),
- checkpointDuration: prometheus.NewGauge(prometheus.GaugeOpts{
+ checkpointLastDuration: prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: subsystem,
- Name: "checkpoint_duration_seconds",
- Help: "The duration in seconds it took to checkpoint in-memory metrics and head chunks.",
+ Name: "checkpoint_last_duration_seconds",
+ Help: "The duration in seconds of the last checkpoint of open chunks and chunks yet to be persisted.",
+ }),
+ checkpointDuration: prometheus.NewSummary(prometheus.SummaryOpts{
+ Namespace: namespace,
+ Subsystem: subsystem,
+ Objectives: map[float64]float64{},
+ Name: "checkpoint_duration_seconds",
+ Help: "The duration in seconds taken for checkpointing open chunks and chunks yet to be persisted.",
+ }),
+ checkpointLastSize: prometheus.NewGauge(prometheus.GaugeOpts{
+ Namespace: namespace,
+ Subsystem: subsystem,
+ Name: "checkpoint_last_size_bytes",
+ Help: "The size in bytes of the last checkpoint of open chunks and chunks yet to be persisted.",
+ }),
+ checkpointChunksWritten: prometheus.NewSummary(prometheus.SummaryOpts{
+ Namespace: namespace,
+ Subsystem: subsystem,
+ Objectives: map[float64]float64{},
+ Name: "checkpoint_series_chunks_written",
+ Help: "The number of chunks written per series while checkpointing open chunks and chunks yet to be persisted.",
}),
dirtyCounter: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: namespace,
@@ -265,6 +290,21 @@ func newPersistence(
Name: "started_dirty",
Help: "Whether the local storage was found to be dirty (and crash recovery occurred) during Prometheus startup.",
}),
+ checkpointing: prometheus.NewGauge(prometheus.GaugeOpts{
+ Namespace: namespace,
+ Subsystem: subsystem,
+ Name: "checkpointing",
+ Help: "1 if the storage is checkpointing, 0 otherwise.",
+ }),
+ seriesChunksPersisted: prometheus.NewHistogram(prometheus.HistogramOpts{
+ Namespace: namespace,
+ Subsystem: subsystem,
+ Name: "series_chunks_persisted",
+ Help: "The number of chunks persisted per series.",
+ // Even with 4 bytes per sample, you're not going to get more than 85
+ // chunks in 6 hours for a time series with 1s resolution.
+ Buckets: []float64{1, 2, 4, 8, 16, 32, 64, 128},
+ }),
dirty: dirty,
pedanticChecks: pedanticChecks,
dirtyFileName: dirtyPath,
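
Two details in the constructor above are worth spelling out. A Summary created with an empty Objectives map exports no quantiles, only a _sum and _count pair, so checkpoint_duration_seconds still lets an average be derived at query time while checkpoint_last_duration_seconds keeps the most recent value as a gauge. The histogram's explicit bucket bounds follow from the arithmetic in the comment: assuming the 1 kiB chunk size used by this storage engine, 4 bytes per sample is roughly 256 samples per chunk, and 6 hours at 1s resolution is 21600 samples, i.e. about 85 chunks, which the top bucket of 128 comfortably covers. The standalone sketch below re-creates the gauge-plus-summary pattern in isolation; the metric names and the doCheckpoint stand-in are illustrative and not part of this change.

```go
// Standalone sketch of the gauge-plus-summary duration pattern used in
// newPersistence above. Metric names and doCheckpoint are illustrative only.
package main

import (
	"time"

	"github.com/prometheus/client_golang/prometheus"
)

var (
	checkpointDuration = prometheus.NewSummary(prometheus.SummaryOpts{
		Name:       "example_checkpoint_duration_seconds",
		Help:       "The duration in seconds taken for checkpointing.",
		Objectives: map[float64]float64{}, // no quantiles, only _sum and _count
	})
	checkpointLastDuration = prometheus.NewGauge(prometheus.GaugeOpts{
		Name: "example_checkpoint_last_duration_seconds",
		Help: "The duration in seconds of the last checkpoint.",
	})
)

// doCheckpoint stands in for the real checkpointing work.
func doCheckpoint() { time.Sleep(10 * time.Millisecond) }

func main() {
	prometheus.MustRegister(checkpointDuration, checkpointLastDuration)

	begin := time.Now()
	doCheckpoint()
	seconds := time.Since(begin).Seconds()
	checkpointDuration.Observe(seconds)  // accumulates into _sum and _count
	checkpointLastDuration.Set(seconds)  // overwritten by every checkpoint
}
```

With only _sum and _count exported, the average checkpoint duration over a window can then be computed from those two series at query time.
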
@@ -310,8 +350,13 @@ func (p *persistence) Describe(ch chan<- *prometheus.Desc) {
p.indexingBatchSizes.Describe(ch)
p.indexingBatchDuration.Describe(ch)
ch <- p.checkpointDuration.Desc()
+ ch <- p.checkpointLastDuration.Desc()
+ ch <- p.checkpointLastSize.Desc()
+ ch <- p.checkpointChunksWritten.Desc()
+ ch <- p.checkpointing.Desc()
ch <- p.dirtyCounter.Desc()
ch <- p.startedDirty.Desc()
+ ch <- p.seriesChunksPersisted.Desc()
}
// Collect implements prometheus.Collector.
@@ -323,8 +368,13 @@ func (p *persistence) Collect(ch chan<- prometheus.Metric) {
p.indexingBatchSizes.Collect(ch)
p.indexingBatchDuration.Collect(ch)
ch <- p.checkpointDuration
+ ch <- p.checkpointLastDuration
+ ch <- p.checkpointLastSize
+ ch <- p.checkpointChunksWritten
+ ch <- p.checkpointing
ch <- p.dirtyCounter
ch <- p.startedDirty
+ ch <- p.seriesChunksPersisted
}
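
The Describe and Collect additions follow client_golang's Collector contract: every metric the persistence layer owns should be announced in Describe and must be emitted in Collect for it to appear in the scrape output. Summaries and Histograms implement both Metric and Collector, which is why the patch can send some of them directly over the channel (checkpointDuration, seriesChunksPersisted) and forward others through their own Collect (the indexing summaries). A minimal sketch of that shape, with invented type and metric names rather than the ones from this patch:

```go
// Minimal sketch of a custom prometheus.Collector that forwards the metrics
// it owns, mirroring the Describe/Collect pattern above. Names are
// illustrative only.
package main

import "github.com/prometheus/client_golang/prometheus"

type checkpointMetrics struct {
	lastDuration prometheus.Gauge
	durations    prometheus.Summary
}

func newCheckpointMetrics() *checkpointMetrics {
	return &checkpointMetrics{
		lastDuration: prometheus.NewGauge(prometheus.GaugeOpts{
			Name: "example_last_duration_seconds",
			Help: "Duration of the most recent operation in seconds.",
		}),
		durations: prometheus.NewSummary(prometheus.SummaryOpts{
			Name: "example_duration_seconds",
			Help: "Durations of all operations in seconds.",
		}),
	}
}

// Describe implements prometheus.Collector.
func (m *checkpointMetrics) Describe(ch chan<- *prometheus.Desc) {
	ch <- m.lastDuration.Desc()
	m.durations.Describe(ch) // the Summary describes itself
}

// Collect implements prometheus.Collector.
func (m *checkpointMetrics) Collect(ch chan<- prometheus.Metric) {
	ch <- m.lastDuration
	m.durations.Collect(ch)
}

func main() {
	prometheus.MustRegister(newCheckpointMetrics())
}
```
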
// isDirty returns the dirty flag in a goroutine-safe way.
@@ -559,6 +609,8 @@ func (p *persistence) loadChunkDescs(fp model.Fingerprint, offsetFromEnd int) ([
//
func (p *persistence) checkpointSeriesMapAndHeads(fingerprintToSeries *seriesMap, fpLocker *fingerprintLocker) (err error) {
log.Info("Checkpointing in-memory metrics and chunks...")
+ p.checkpointing.Set(1)
+ defer p.checkpointing.Set(0)
begin := time.Now()
f, err := os.OpenFile(p.headsTempFileName(), os.O_WRONLY|os.O_TRUNC|os.O_CREATE, 0640)
if err != nil {
@@ -581,7 +633,8 @@ func (p *persistence) checkpointSeriesMapAndHeads(fingerprintToSeries *seriesMap
}
err = os.Rename(p.headsTempFileName(), p.headsFileName())
duration := time.Since(begin)
- p.checkpointDuration.Set(duration.Seconds())
+ p.checkpointDuration.Observe(duration.Seconds())
+ p.checkpointLastDuration.Set(duration.Seconds())
log.Infof("Done checkpointing in-memory metrics and chunks in %v.", duration)
}()
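
The checkpointing gauge set at the top of checkpointSeriesMapAndHeads is a plain in-progress flag: Set(1) on entry plus the deferred Set(0) guarantee it drops back to zero on every return path, including early error returns. A short sketch of the idiom, with an invented metric name:

```go
// Sketch of the in-progress flag idiom used above; names are illustrative.
package example

import "github.com/prometheus/client_golang/prometheus"

var checkpointing = prometheus.NewGauge(prometheus.GaugeOpts{
	Name: "example_checkpointing",
	Help: "1 while a checkpoint is running, 0 otherwise.",
})

func runCheckpoint() error {
	checkpointing.Set(1)
	defer checkpointing.Set(0) // runs on every return path, including errors
	// ... the actual checkpoint work would go here ...
	return nil
}
```
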
@@ -677,6 +730,7 @@ func (p *persistence) checkpointSeriesMapAndHeads(fingerprintToSeries *seriesMap
return
}
}
+ p.checkpointChunksWritten.Observe(float64(len(m.series.chunkDescs) - m.series.persistWatermark))
}
// Series is checkpointed now, so declare it clean. In case the entire
// checkpoint fails later on, this is fine, as the storage's series
@@ -704,6 +758,11 @@ func (p *persistence) checkpointSeriesMapAndHeads(fingerprintToSeries *seriesMap
return err
}
}
+ info, err := f.Stat()
+ if err != nil {
+ return err
+ }
+ p.checkpointLastSize.Set(float64(info.Size()))
return err
}
@@ -1520,6 +1579,7 @@ func (p *persistence) writeChunks(w io.Writer, chunks []chunk.Chunk) error {
// would only put back the original buf.
p.bufPool.Put(b)
}()
+ numChunks := len(chunks)
for batchSize := chunkMaxBatchSize; len(chunks) > 0; chunks = chunks[batchSize:] {
if batchSize > len(chunks) {
@@ -1543,6 +1603,7 @@ func (p *persistence) writeChunks(w io.Writer, chunks []chunk.Chunk) error {
return err
}
}
+ p.seriesChunksPersisted.Observe(float64(numChunks))
return nil
}
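
One subtlety in writeChunks: the batching loop re-slices chunks until it is empty, so the per-series chunk count has to be captured in numChunks before the loop; observing len(chunks) afterwards would always record zero. A tiny demonstration of that re-slicing behaviour (values and batch size are made up):

```go
// Tiny demonstration that a batching loop like the one in writeChunks
// consumes its slice; values and batch size are made up.
package main

import "fmt"

func main() {
	chunks := []int{1, 2, 3, 4, 5, 6, 7}
	numChunks := len(chunks) // must be captured before the loop

	for batchSize := 3; len(chunks) > 0; chunks = chunks[batchSize:] {
		if batchSize > len(chunks) {
			batchSize = len(chunks)
		}
		fmt.Println("writing batch:", chunks[:batchSize])
	}

	fmt.Println("total chunks written:", numChunks, "remaining:", len(chunks)) // remaining is 0
}
```
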
diff --git a/storage/local/storage.go b/storage/local/storage.go
index 17f3e9d28f..d772758d9b 100644
--- a/storage/local/storage.go
+++ b/storage/local/storage.go
@@ -178,7 +178,9 @@ type MemorySeriesStorage struct {
quarantineStopping, quarantineStopped chan struct{}
persistErrors prometheus.Counter
+ queuedChunksToPersist prometheus.Counter
numSeries prometheus.Gauge
+ dirtySeries prometheus.Gauge
seriesOps *prometheus.CounterVec
ingestedSamplesCount prometheus.Counter
discardedSamplesCount *prometheus.CounterVec
@@ -240,12 +242,24 @@ func NewMemorySeriesStorage(o *MemorySeriesStorageOptions) *MemorySeriesStorage
Name: "persist_errors_total",
Help: "The total number of errors while persisting chunks.",
}),
+ queuedChunksToPersist: prometheus.NewCounter(prometheus.CounterOpts{
+ Namespace: namespace,
+ Subsystem: subsystem,
+ Name: "queued_chunks_to_persist_total",
+ Help: "The total number of chunks queued for persistence.",
+ }),
numSeries: prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "memory_series",
Help: "The current number of series in memory.",
}),
+ dirtySeries: prometheus.NewGauge(prometheus.GaugeOpts{
+ Namespace: namespace,
+ Subsystem: subsystem,
+ Name: "memory_dirty_series",
+ Help: "The current number of series that would require a disk seek during crash recovery.",
+ }),
seriesOps: prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: namespace,
@@ -1260,6 +1274,7 @@ loop:
log.Errorln("Error while checkpointing:", err)
} else {
dirtySeriesCount = 0
+ s.dirtySeries.Set(0)
}
// If a checkpoint takes longer than checkpointInterval, unluckily timed
// combination with the Reset(0) call below can lead to a case where a
@@ -1272,6 +1287,7 @@ loop:
case fp := <-memoryFingerprints:
if s.maintainMemorySeries(fp, model.Now().Add(-s.dropAfter)) {
dirtySeriesCount++
+ s.dirtySeries.Inc()
// Check if we have enough "dirty" series so that we need an early checkpoint.
// However, if we are already behind persisting chunks, creating a checkpoint
// would be counterproductive, as it would slow down chunk persisting even more,
@@ -1531,6 +1547,9 @@ func (s *MemorySeriesStorage) getNumChunksToPersist() int {
// negative 'by' to decrement.
func (s *MemorySeriesStorage) incNumChunksToPersist(by int) {
atomic.AddInt64(&s.numChunksToPersist, int64(by))
+ if by > 0 {
+ s.queuedChunksToPersist.Add(float64(by))
+ }
}
// calculatePersistenceUrgencyScore calculates and returns an urgency score for
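
incNumChunksToPersist is called with both positive and negative deltas, so numChunksToPersist behaves like a gauge of the current backlog; the new queued_chunks_to_persist_total counter accumulates only the positive side, which keeps it monotonic and therefore meaningful under rate(). A hedged sketch of that split, with invented names:

```go
// Sketch of tracking a level (gauge-like backlog) and a monotonic total side
// by side, as incNumChunksToPersist does above. Names are illustrative.
package main

import (
	"sync/atomic"

	"github.com/prometheus/client_golang/prometheus"
)

var queuedTotal = prometheus.NewCounter(prometheus.CounterOpts{
	Name: "example_queued_chunks_total",
	Help: "Total number of chunks ever queued for persistence.",
})

type storage struct {
	numChunksToPersist int64 // current backlog; goes up and down
}

// incNumChunksToPersist adjusts the backlog; only increments feed the counter,
// because a Prometheus Counter must never decrease.
func (s *storage) incNumChunksToPersist(by int) {
	atomic.AddInt64(&s.numChunksToPersist, int64(by))
	if by > 0 {
		queuedTotal.Add(float64(by))
	}
}

func main() {
	prometheus.MustRegister(queuedTotal)
	s := &storage{}
	s.incNumChunksToPersist(5)  // 5 chunks queued; counter and backlog both rise
	s.incNumChunksToPersist(-3) // 3 chunks persisted; counter is unaffected
}
```
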
@@ -1734,9 +1753,11 @@ func (s *MemorySeriesStorage) Describe(ch chan<- *prometheus.Desc) {
s.mapper.Describe(ch)
ch <- s.persistErrors.Desc()
+ ch <- s.queuedChunksToPersist.Desc()
ch <- maxChunksToPersistDesc
ch <- numChunksToPersistDesc
ch <- s.numSeries.Desc()
+ ch <- s.dirtySeries.Desc()
s.seriesOps.Describe(ch)
ch <- s.ingestedSamplesCount.Desc()
s.discardedSamplesCount.Describe(ch)
@@ -1753,6 +1774,7 @@ func (s *MemorySeriesStorage) Collect(ch chan<- prometheus.Metric) {
s.mapper.Collect(ch)
ch <- s.persistErrors
+ ch <- s.queuedChunksToPersist
ch <- prometheus.MustNewConstMetric(
maxChunksToPersistDesc,
prometheus.GaugeValue,
@@ -1764,6 +1786,7 @@ func (s *MemorySeriesStorage) Collect(ch chan<- prometheus.Metric) {
float64(s.getNumChunksToPersist()),
)
ch <- s.numSeries
+ ch <- s.dirtySeries
s.seriesOps.Collect(ch)
ch <- s.ingestedSamplesCount
s.discardedSamplesCount.Collect(ch)