Merge pull request #1345 from prometheus/beorn7/storage2

Fix multiple issues around chunks to persist.
Björn Rabenstein 2016-01-26 17:47:16 +01:00
commit d7f92a012b
3 changed files with 127 additions and 42 deletions

@@ -113,7 +113,7 @@ func init() {
 		"How long to retain samples in the local storage.",
 	)
 	cfg.fs.IntVar(
-		&cfg.storage.MaxChunksToPersist, "storage.local.max-chunks-to-persist", 1024*1024,
+		&cfg.storage.MaxChunksToPersist, "storage.local.max-chunks-to-persist", 512*1024,
 		"How many chunks can be waiting for persistence before sample ingestion will stop. Many chunks waiting to be persisted will increase the checkpoint size.",
 	)
 	cfg.fs.DurationVar(

@@ -861,6 +861,12 @@ func (p *persistence) loadSeriesMapAndHeads() (sm *seriesMap, chunksToPersist in
 			}
 		}
+		headChunkClosed := persistWatermark >= numChunkDescs
+		if !headChunkClosed {
+			// Head chunk is not ready for persisting yet.
+			chunksToPersist--
+		}
+
 		fingerprintToSeries[model.Fingerprint(fp)] = &memorySeries{
 			metric:           model.Metric(metric),
 			chunkDescs:       chunkDescs,
@@ -869,7 +875,7 @@ func (p *persistence) loadSeriesMapAndHeads() (sm *seriesMap, chunksToPersist in
 			chunkDescsOffset: int(chunkDescsOffset),
 			savedFirstTime:   model.Time(savedFirstTime),
 			lastTime:         chunkDescs[len(chunkDescs)-1].lastTime(),
-			headChunkClosed:  persistWatermark >= numChunkDescs,
+			headChunkClosed:  headChunkClosed,
 		}
 	}
 	return sm, chunksToPersist, nil
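
The hunks above fix the chunks-to-persist bookkeeping at checkpoint load time: an open head chunk stays in memory but is not yet eligible for persistence, so it must not be counted, and the freshly computed headChunkClosed flag is reused for the series field instead of being re-derived. A minimal, self-contained sketch of the counting rule (the seriesCheckpoint type and chunksToPersistFor function are hypothetical illustrations, not the Prometheus code):

package main

import "fmt"

type seriesCheckpoint struct {
	persistWatermark int // chunk descs below this index are already persisted
	numChunkDescs    int // total chunk descs held for this series
}

// chunksToPersistFor counts the chunks of one series that still have to be
// written to disk. The head (last) chunk only counts once it is closed.
func chunksToPersistFor(s seriesCheckpoint) int {
	n := s.numChunkDescs - s.persistWatermark
	headChunkClosed := s.persistWatermark >= s.numChunkDescs
	if !headChunkClosed {
		// Head chunk is not ready for persisting yet.
		n--
	}
	return n
}

func main() {
	fmt.Println(chunksToPersistFor(seriesCheckpoint{persistWatermark: 3, numChunkDescs: 5})) // 1
	fmt.Println(chunksToPersistFor(seriesCheckpoint{persistWatermark: 5, numChunkDescs: 5})) // 0
}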

@@ -17,6 +17,7 @@ package local
 import (
 	"container/list"
 	"fmt"
+	"math"
 	"sync/atomic"
 	"time"
@@ -38,11 +39,23 @@ const (
 	// See waitForNextFP.
 	maxEvictInterval = time.Minute
 
-	// If numChunskToPersist is this percentage of maxChunksToPersist, we
-	// consider the storage in "graceful degradation mode", i.e. we do not
-	// checkpoint anymore based on the dirty series count, and we do not
-	// sync series files anymore if using the adaptive sync strategy.
-	percentChunksToPersistForDegradation = 80
+	// Constants to control the hysteresis of entering and leaving "rushed
+	// mode". In rushed mode, the dirty series count is ignored for
+	// checkpointing, series are maintained as frequently as possible, and
+	// series files are not synced if the adaptive sync strategy is used.
+	persintenceUrgencyScoreForEnteringRushedMode = 0.8
+	persintenceUrgencyScoreForLeavingRushedMode  = 0.7
+
+	// This factor times -storage.local.memory-chunks is the number of
+	// memory chunks we tolerate before suspending ingestion (TODO!). It is
+	// also a basis for calculating the persistenceUrgencyScore.
+	toleranceFactorForMemChunks = 1.1
+	// This factor times -storage.local.max-chunks-to-persist is the minimum
+	// required number of chunks waiting for persistence before the number
+	// of chunks in memory may influence the persistenceUrgencyScore. (In
+	// other words: if there are no chunks to persist, it doesn't help chunk
+	// eviction if we speed up persistence.)
+	factorMinChunksToPersist = 0.2
 )
 
 var (
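
The two thresholds added above implement a simple hysteresis: rushed mode is entered once the urgency score exceeds 0.8 and only left again once the raw score drops below 0.7, so a score oscillating around a single threshold cannot flip the mode back and forth. A small sketch of that latch under hypothetical names (not the commit's code, which lives in calculatePersistenceUrgencyScore further down):

package main

import "fmt"

const (
	enterRushed = 0.8
	leaveRushed = 0.7
)

type hysteresis struct{ rushed bool }

// effectiveScore bumps the score to 1 while rushed mode is latched.
func (h *hysteresis) effectiveScore(raw float64) float64 {
	switch {
	case h.rushed && raw > leaveRushed:
		return 1 // stay rushed
	case h.rushed:
		h.rushed = false // raw <= 0.7: leave rushed mode
	case raw > enterRushed:
		h.rushed = true // raw > 0.8: enter rushed mode
		return 1
	}
	return raw
}

func main() {
	var h hysteresis
	for _, raw := range []float64{0.75, 0.85, 0.75, 0.65} {
		fmt.Println(raw, "->", h.effectiveScore(raw))
	}
	// Prints: 0.75 -> 0.75, 0.85 -> 1, 0.75 -> 1, 0.65 -> 0.65
}
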
@@ -110,7 +123,7 @@ type memorySeriesStorage struct {
 	// numChunksToPersist has to be aligned for atomic operations.
 	numChunksToPersist int64 // The number of chunks waiting for persistence.
 	maxChunksToPersist int   // If numChunksToPersist reaches this threshold, ingestion will stall.
-	degraded           bool
+	rushed             bool // Whether the storage is in rushed mode.
 
 	fpLocker   *fingerprintLocker
 	fpToSeries *seriesMap
@@ -137,6 +150,8 @@ type memorySeriesStorage struct {
 	outOfOrderSamplesCount      prometheus.Counter
 	invalidPreloadRequestsCount prometheus.Counter
 	maintainSeriesDuration      *prometheus.SummaryVec
+	persistenceUrgencyScore     prometheus.Gauge
+	rushedMode                  prometheus.Gauge
 }
 
 // MemorySeriesStorageOptions contains options needed by
@@ -225,6 +240,18 @@ func NewMemorySeriesStorage(o *MemorySeriesStorageOptions) Storage {
 			},
 			[]string{seriesLocationLabel},
 		),
+		persistenceUrgencyScore: prometheus.NewGauge(prometheus.GaugeOpts{
+			Namespace: namespace,
+			Subsystem: subsystem,
+			Name:      "persistence_urgency_score",
+			Help:      "A score of urgency to persist chunks, 0 is least urgent, 1 most.",
+		}),
+		rushedMode: prometheus.NewGauge(prometheus.GaugeOpts{
+			Namespace: namespace,
+			Subsystem: subsystem,
+			Name:      "rushed_mode",
+			Help:      "1 if the storage is in rushed mode, 0 otherwise. In rushed mode, the system behaves as if the persistence_urgency_score is 1.",
+		}),
 	}
 
 	return s
 }
@@ -238,7 +265,7 @@ func (s *memorySeriesStorage) Start() (err error) {
 	case Always:
 		syncStrategy = func() bool { return true }
 	case Adaptive:
-		syncStrategy = func() bool { return !s.isDegraded() }
+		syncStrategy = func() bool { return s.calculatePersistenceUrgencyScore() < 1 }
 	default:
 		panic("unknown sync strategy")
 	}
@@ -805,8 +832,8 @@ func (s *memorySeriesStorage) cycleThroughMemoryFingerprints() chan model.Finger
 			case <-s.loopStopping:
 				return
 			}
-			// Reduce the wait time by the backlog score.
-			s.waitForNextFP(s.fpToSeries.length(), s.persistenceBacklogScore())
+			// Reduce the wait time according to the urgency score.
+			s.waitForNextFP(s.fpToSeries.length(), 1-s.calculatePersistenceUrgencyScore())
 			count++
 		}
 		if count > 0 {
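
The second argument of waitForNextFP now acts as a shrink factor of 1 minus the urgency score, so maintenance sweeps speed up smoothly as urgency grows instead of toggling between two modes. A rough, hypothetical illustration of how such a factor scales a per-fingerprint wait (sweepInterval is an invented helper, not the actual waitForNextFP):

package main

import (
	"fmt"
	"time"
)

const maxEvictInterval = time.Minute

// sweepInterval spreads one sweep over numSeries fingerprints across the base
// interval and then scales the per-fingerprint wait by the given factor
// (1 - urgency score): factor 1 means a leisurely sweep, factor 0 means no
// waiting at all.
func sweepInterval(numSeries int, factor float64) time.Duration {
	if numSeries == 0 {
		return 0
	}
	perFP := maxEvictInterval / time.Duration(numSeries)
	return time.Duration(float64(perFP) * factor)
}

func main() {
	fmt.Println(sweepInterval(600, 1.0)) // 100ms between fingerprints when urgency is 0
	fmt.Println(sweepInterval(600, 0.1)) // 10ms when urgency is 0.9
}
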
@@ -898,9 +925,9 @@ loop:
 			// would be counterproductive, as it would slow down chunk persisting even more,
 			// while in a situation like that, where we are clearly lacking speed of disk
 			// maintenance, the best we can do for crash recovery is to persist chunks as
-			// quickly as possible. So only checkpoint if the storage is not in "graceful
-			// degradation mode".
-			if dirtySeriesCount >= s.checkpointDirtySeriesLimit && !s.isDegraded() {
+			// quickly as possible. So only checkpoint if the urgency score is < 1.
+			if dirtySeriesCount >= s.checkpointDirtySeriesLimit &&
+				s.calculatePersistenceUrgencyScore() < 1 {
 				checkpointTimer.Reset(0)
 			}
 		}
@@ -1144,36 +1171,84 @@ func (s *memorySeriesStorage) incNumChunksToPersist(by int) {
 	atomic.AddInt64(&s.numChunksToPersist, int64(by))
 }
 
-// isDegraded returns whether the storage is in "graceful degradation mode",
-// which is the case if the number of chunks waiting for persistence has reached
-// a percentage of maxChunksToPersist that exceeds
-// percentChunksToPersistForDegradation. The method is not goroutine safe (but
-// only ever called from the goroutine dealing with series maintenance).
-// Changes of degradation mode are logged.
-func (s *memorySeriesStorage) isDegraded() bool {
-	nowDegraded := s.getNumChunksToPersist() > s.maxChunksToPersist*percentChunksToPersistForDegradation/100
-	if s.degraded && !nowDegraded {
-		log.Warn("Storage has left graceful degradation mode. Things are back to normal.")
-	} else if !s.degraded && nowDegraded {
-		log.Warnf(
-			"%d chunks waiting for persistence (%d%% of the allowed maximum %d). Storage is now in graceful degradation mode. Series files are not synced anymore if following the adaptive strategy. Checkpoints are not performed more often than every %v. Series maintenance happens as frequently as possible.",
-			s.getNumChunksToPersist(),
-			s.getNumChunksToPersist()*100/s.maxChunksToPersist,
-			s.maxChunksToPersist,
-			s.checkpointInterval)
-	}
-	s.degraded = nowDegraded
-	return s.degraded
-}
-
-// persistenceBacklogScore works similar to isDegraded, but returns a score
-// about how close we are to degradation. This score is 1.0 if no chunks are
-// waiting for persistence and 0.0 if we are at or above the degradation
-// threshold.
-func (s *memorySeriesStorage) persistenceBacklogScore() float64 {
-	score := 1 - float64(s.getNumChunksToPersist())/float64(s.maxChunksToPersist*percentChunksToPersistForDegradation/100)
-	if score < 0 {
-		return 0
+// calculatePersistenceUrgencyScore calculates and returns an urgency score for
+// the speed of persisting chunks. The score is between 0 and 1, where 0 means
+// no urgency at all and 1 means highest urgency.
+//
+// The score is the maximum of the two following sub-scores:
+//
+// (1) The first sub-score is the number of chunks waiting for persistence
+// divided by the maximum number of chunks allowed to be waiting for
+// persistence.
+//
+// (2) If there are more chunks in memory than allowed AND there are more chunks
+// waiting for persistence than factorMinChunksToPersist times
+// -storage.local.max-chunks-to-persist, then the second sub-score is the
+// fraction the number of memory chunks has reached between
+// -storage.local.memory-chunks and toleranceFactorForMemChunks times
+// -storage.local.memory-chunks.
+//
+// Should the score ever hit persintenceUrgencyScoreForEnteringRushedMode, the
+// storage locks into "rushed mode", in which the returned score is always
+// bumped up to 1 until the non-bumped score is below
+// persintenceUrgencyScoreForLeavingRushedMode.
+//
+// This method is not goroutine-safe, but it is only ever called by the single
+// goroutine that is in charge of series maintenance. According to the returned
+// score, series maintenence should be sped up. If a score of 1 is returned,
+// checkpointing based on dirty-series count should be disabled, and series
+// files should not by synced anymore provided the user has specified the
+// adaptive sync strategy.
+func (s *memorySeriesStorage) calculatePersistenceUrgencyScore() float64 {
+	var (
+		chunksToPersist    = float64(s.getNumChunksToPersist())
+		maxChunksToPersist = float64(s.maxChunksToPersist)
+		memChunks          = float64(atomic.LoadInt64(&numMemChunks))
+		maxMemChunks       = float64(s.maxMemoryChunks)
+	)
+	score := chunksToPersist / maxChunksToPersist
+	if chunksToPersist > maxChunksToPersist*factorMinChunksToPersist {
+		score = math.Max(
+			score,
+			(memChunks/maxMemChunks-1)/(toleranceFactorForMemChunks-1),
+		)
+	}
+	if score > 1 {
+		score = 1
+	}
+	s.persistenceUrgencyScore.Set(score)
+
+	if s.rushed {
+		// We are already in rushed mode. If the score is still above
+		// persintenceUrgencyScoreForLeavingRushedMode, return 1 and
+		// leave things as they are.
+		if score > persintenceUrgencyScoreForLeavingRushedMode {
+			return 1
+		}
+		// We are out of rushed mode!
+		s.rushed = false
+		s.rushedMode.Set(0)
+		log.
+			With("urgencyScore", score).
+			With("chunksToPersist", chunksToPersist).
+			With("maxChunksToPersist", maxChunksToPersist).
+			With("memoryChunks", memChunks).
+			With("maxMemoryChunks", maxMemChunks).
+			Warn("Storage has left rushed mode.")
+		return score
+	}
+	if score > persintenceUrgencyScoreForEnteringRushedMode {
+		// Enter rushed mode.
+		s.rushed = true
+		s.rushedMode.Set(1)
+		log.
+			With("urgencyScore", score).
+			With("chunksToPersist", chunksToPersist).
+			With("maxChunksToPersist", maxChunksToPersist).
+			With("memoryChunks", memChunks).
+			With("maxMemoryChunks", maxMemChunks).
+			Warn("Storage has entered rushed mode.")
+		return 1
 	}
 	return score
 }
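
To make the two sub-scores concrete, here is a standalone sketch of the formula implemented above, evaluated for a pair of illustrative inputs (the urgencyScore helper and the sample numbers are only for illustration; the constants mirror the ones added in this commit):

package main

import (
	"fmt"
	"math"
)

const (
	toleranceFactorForMemChunks = 1.1
	factorMinChunksToPersist    = 0.2
)

// urgencyScore mirrors the calculation above: sub-score (1) is the fraction of
// the persistence backlog in use; sub-score (2) only kicks in once the backlog
// exceeds 20% of its maximum and measures how far the memory-chunk count has
// moved into the 10% tolerance band above -storage.local.memory-chunks.
func urgencyScore(chunksToPersist, maxChunksToPersist, memChunks, maxMemChunks float64) float64 {
	score := chunksToPersist / maxChunksToPersist
	if chunksToPersist > maxChunksToPersist*factorMinChunksToPersist {
		score = math.Max(
			score,
			(memChunks/maxMemChunks-1)/(toleranceFactorForMemChunks-1),
		)
	}
	if score > 1 {
		score = 1
	}
	return score
}

func main() {
	// 200k of 512k chunks waiting, memory usage still below the limit:
	fmt.Println(urgencyScore(200e3, 512e3, 900e3, 1024e3)) // 0.390625, driven by sub-score (1)
	// Same backlog, but 5% above the memory-chunks limit:
	fmt.Println(urgencyScore(200e3, 512e3, 1075200, 1024e3)) // ≈ 0.5, driven by sub-score (2)
}
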
@@ -1193,6 +1268,8 @@ func (s *memorySeriesStorage) Describe(ch chan<- *prometheus.Desc) {
 	ch <- s.invalidPreloadRequestsCount.Desc()
 	ch <- numMemChunksDesc
 	s.maintainSeriesDuration.Describe(ch)
+	ch <- s.persistenceUrgencyScore.Desc()
+	ch <- s.rushedMode.Desc()
 }
 
 // Collect implements prometheus.Collector.
@@ -1222,4 +1299,6 @@ func (s *memorySeriesStorage) Collect(ch chan<- prometheus.Metric) {
 		float64(atomic.LoadInt64(&numMemChunks)),
 	)
 	s.maintainSeriesDuration.Collect(ch)
+	ch <- s.persistenceUrgencyScore
+	ch <- s.rushedMode
 }
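
The Describe/Collect additions follow the usual client_golang pattern for metrics that a custom Collector manages itself: the gauge's Desc() is forwarded in Describe, and the gauge value itself (a prometheus.Metric) in Collect. A minimal sketch of that pattern outside of Prometheus' storage code (example_persistence_urgency_score is an invented metric name, assuming a recent client_golang):

package main

import "github.com/prometheus/client_golang/prometheus"

type storageCollector struct {
	urgency prometheus.Gauge
}

// Describe forwards the description of the manually managed gauge.
func (c *storageCollector) Describe(ch chan<- *prometheus.Desc) {
	ch <- c.urgency.Desc()
}

// Collect emits the gauge itself; a Gauge already implements prometheus.Metric.
func (c *storageCollector) Collect(ch chan<- prometheus.Metric) {
	ch <- c.urgency
}

func main() {
	c := &storageCollector{
		urgency: prometheus.NewGauge(prometheus.GaugeOpts{
			Name: "example_persistence_urgency_score",
			Help: "Example gauge exposed via a custom collector.",
		}),
	}
	prometheus.MustRegister(c)
	c.urgency.Set(0.4)
}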