diff --git a/storage/local/storage.go b/storage/local/storage.go
index 1d85c75ef..1e8082fe6 100644
--- a/storage/local/storage.go
+++ b/storage/local/storage.go
@@ -32,9 +32,10 @@ const (
 	chunkLen = 1024
 
 	// See waitForNextFP.
-	fpMaxWaitDuration = 10 * time.Second
 	fpMaxSweepTime    = 6 * time.Hour
+	fpMaxWaitDuration = 10 * time.Second
 
+	// See maybeEvict.
 	maxEvictInterval = time.Minute
 
 	// If numChunksToPersist is this percentage of maxChunksToPersist, we
@@ -532,21 +533,31 @@ func (s *memorySeriesStorage) maybeEvict() {
 // another fingerprint so that we will process all fingerprints in a tenth of
 // s.dropAfter assuming that the system is doing nothing else, e.g. if we want
 // to drop chunks after 40h, we want to cycle through all fingerprints within
-// 4h. However, the maximum sweep time is capped at fpMaxSweepTime. If
-// s.loopStopped is closed, it will return false immediately. The estimation is
-// based on the total number of fingerprints as passed in.
-func (s *memorySeriesStorage) waitForNextFP(numberOfFPs int) bool {
+// 4h. The estimation is based on the total number of fingerprints as passed
+// in. However, the maximum sweep time is capped at fpMaxSweepTime, and the
+// method never waits longer than fpMaxWaitDuration.
+//
+// maxWaitDurationFactor can be used to reduce the wait time if faster
+// processing is required (for example because unpersisted chunks are piling
+// up).
+//
+// Normally, the method returns true once the wait duration has passed. However,
+// if s.loopStopping is closed, it returns false immediately.
+func (s *memorySeriesStorage) waitForNextFP(numberOfFPs int, maxWaitDurationFactor float64) bool {
 	d := fpMaxWaitDuration
 	if numberOfFPs != 0 {
 		sweepTime := s.dropAfter / 10
 		if sweepTime > fpMaxSweepTime {
 			sweepTime = fpMaxSweepTime
 		}
-		d = sweepTime / time.Duration(numberOfFPs)
-		if d > fpMaxWaitDuration {
-			d = fpMaxWaitDuration
+		calculatedWait := time.Duration(float64(sweepTime) / float64(numberOfFPs) * maxWaitDurationFactor)
+		if calculatedWait < d {
+			d = calculatedWait
 		}
 	}
+	if d == 0 {
+		return true
+	}
 	t := time.NewTimer(d)
 	select {
 	case <-t.C:
@@ -575,7 +586,7 @@ func (s *memorySeriesStorage) cycleThroughMemoryFingerprints() chan clientmodel.
 
 	for {
 		// Initial wait, also important if there are no FPs yet.
-		if !s.waitForNextFP(s.fpToSeries.length()) {
+		if !s.waitForNextFP(s.fpToSeries.length(), 1) {
 			return
 		}
 		begin := time.Now()
@@ -587,7 +598,8 @@ func (s *memorySeriesStorage) cycleThroughMemoryFingerprints() chan clientmodel.
 			case <-s.loopStopping:
 				return
 			}
-			s.waitForNextFP(s.fpToSeries.length())
+			// Scale the wait time by the backlog score.
+			s.waitForNextFP(s.fpToSeries.length(), s.persistenceBacklogScore())
 			count++
 		}
 		if count > 0 {
@@ -616,11 +628,11 @@ func (s *memorySeriesStorage) cycleThroughArchivedFingerprints() chan clientmodel.
 		)
 		if err != nil {
 			glog.Error("Failed to lookup archived fingerprint ranges: ", err)
-			s.waitForNextFP(0)
+			s.waitForNextFP(0, 1)
 			continue
 		}
 		// Initial wait, also important if there are no FPs yet.
-		if !s.waitForNextFP(len(archivedFPs)) {
+		if !s.waitForNextFP(len(archivedFPs), 1) {
 			return
 		}
 		begin := time.Now()
@@ -630,7 +642,8 @@ func (s *memorySeriesStorage) cycleThroughArchivedFingerprints() chan clientmodel.
 			case <-s.loopStopping:
 				return
 			}
-			s.waitForNextFP(len(archivedFPs))
+			// Never speed up maintenance of archived FPs.
+			s.waitForNextFP(len(archivedFPs), 1)
 		}
 		if len(archivedFPs) > 0 {
 			glog.Infof(
@@ -945,7 +958,7 @@ func (s *memorySeriesStorage) isDegraded() bool {
 		glog.Warning("Storage has left graceful degradation mode. Things are back to normal.")
 	} else if !s.degraded && nowDegraded {
 		glog.Warningf(
-			"%d chunks waiting for persistence (%d%% of the allowed maximum %d). Storage is now in graceful degradation mode. Series files are not synced anymore if following the adaptive strategy. Checkpoints are not performed more often than every %v.",
+			"%d chunks waiting for persistence (%d%% of the allowed maximum %d). Storage is now in graceful degradation mode. Series files are not synced anymore if following the adaptive strategy. Checkpoints are not performed more often than every %v. Series maintenance happens as frequently as possible.",
 			s.getNumChunksToPersist(),
 			s.getNumChunksToPersist()*100/s.maxChunksToPersist,
 			s.maxChunksToPersist,
@@ -955,6 +968,18 @@ func (s *memorySeriesStorage) isDegraded() bool {
 	return s.degraded
 }
 
+// persistenceBacklogScore works similarly to isDegraded, but returns a score
+// indicating how close we are to degradation. The score is 1.0 if no chunks
+// are waiting for persistence and 0.0 if we are at or above the degradation
+// threshold.
+func (s *memorySeriesStorage) persistenceBacklogScore() float64 {
+	score := 1 - float64(s.getNumChunksToPersist())/float64(s.maxChunksToPersist*percentChunksToPersistForDegradation/100)
+	if score < 0 {
+		return 0
+	}
+	return score
+}
+
 // Describe implements prometheus.Collector.
 func (s *memorySeriesStorage) Describe(ch chan<- *prometheus.Desc) {
 	s.persistence.Describe(ch)
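
Note on the wait computation (illustration, not part of the patch): waitForNextFP now spreads one sweep over a tenth of s.dropAfter (capped at fpMaxSweepTime), divides that evenly among the fingerprints, scales the result by maxWaitDurationFactor, and never waits longer than fpMaxWaitDuration. A minimal standalone sketch of that arithmetic, with the hypothetical free function nextFPWait standing in for the method on memorySeriesStorage:

package main

import (
	"fmt"
	"time"
)

const (
	fpMaxSweepTime    = 6 * time.Hour
	fpMaxWaitDuration = 10 * time.Second
)

// nextFPWait mirrors the patched wait calculation: one sweep is spread over
// a tenth of dropAfter (capped at fpMaxSweepTime), divided evenly among the
// fingerprints, scaled by factor, and clamped at fpMaxWaitDuration.
func nextFPWait(numberOfFPs int, dropAfter time.Duration, factor float64) time.Duration {
	d := fpMaxWaitDuration
	if numberOfFPs != 0 {
		sweepTime := dropAfter / 10
		if sweepTime > fpMaxSweepTime {
			sweepTime = fpMaxSweepTime
		}
		if w := time.Duration(float64(sweepTime) / float64(numberOfFPs) * factor); w < d {
			d = w
		}
	}
	return d
}

func main() {
	// 40h retention -> 4h sweep; 10000 series -> 1.44s between fingerprints.
	fmt.Println(nextFPWait(10000, 40*time.Hour, 1))   // 1.44s
	// Halving the factor halves the wait; a zero factor means no wait.
	fmt.Println(nextFPWait(10000, 40*time.Hour, 0.5)) // 720ms
	fmt.Println(nextFPWait(10000, 40*time.Hour, 0))   // 0s
}

In the zero-wait case the actual method takes the new "if d == 0" shortcut and returns true without arming a timer at all.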
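
Note on the backlog score (same caveat, a sketch only): persistenceBacklogScore is a linear ramp from 1.0 (empty persistence backlog) down to 0.0 at the degradation threshold defined by percentChunksToPersistForDegradation. Here backlogScore is a hypothetical pure stand-in for the method; the integer division in the denominator mirrors the patch:

package main

import "fmt"

// If numChunksToPersist reaches this percentage of maxChunksToPersist,
// the storage counts as degraded and the score bottoms out at 0.
const percentChunksToPersistForDegradation = 80

// backlogScore mirrors persistenceBacklogScore: 1.0 with an empty
// persistence backlog, falling linearly to 0.0 at the threshold and
// clamped there for anything beyond it.
func backlogScore(numChunksToPersist, maxChunksToPersist int) float64 {
	score := 1 - float64(numChunksToPersist)/float64(maxChunksToPersist*percentChunksToPersistForDegradation/100)
	if score < 0 {
		return 0
	}
	return score
}

func main() {
	const max = 1024 * 1024 // hypothetical maxChunksToPersist
	threshold := max * percentChunksToPersistForDegradation / 100
	fmt.Println(backlogScore(0, max))           // 1 (full wait between FPs)
	fmt.Println(backlogScore(threshold/2, max)) // 0.5 (waits halved)
	fmt.Println(backlogScore(threshold, max))   // 0 (no wait between FPs)
	fmt.Println(backlogScore(max, max))         // 0 (clamped)
}

At or above the threshold the memory-fingerprint loop therefore passes a zero factor to waitForNextFP and maintenance runs back to back, which is what the added sentence in the degradation warning ("Series maintenance happens as frequently as possible.") refers to. The archived-fingerprint loop deliberately keeps a factor of 1, since archived series hold no unpersisted chunks.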