From 972d94433a8caa214bf6fef5c6179b9af46cfd1c Mon Sep 17 00:00:00 2001 From: beorn7 Date: Mon, 25 Jan 2016 18:44:43 +0100 Subject: [PATCH] Introduce a hysteresis for "rushed mode" "Rushed mode" is formerly known as "degraded mode", which is changed with this commit, too. The name "degraded" was very misleading. Also, switch into rushed mode if we have too many chunks in memory and an at least reasonable amount of chunks to persist so that speeding up persisting chunks can help. --- storage/local/storage.go | 126 +++++++++++++++++++++++++++++---------- 1 file changed, 93 insertions(+), 33 deletions(-) diff --git a/storage/local/storage.go b/storage/local/storage.go index 4e75f5f8d..3afc9fb86 100644 --- a/storage/local/storage.go +++ b/storage/local/storage.go @@ -17,6 +17,7 @@ package local import ( "container/list" "fmt" + "math" "sync/atomic" "time" @@ -38,11 +39,28 @@ const ( // See waitForNextFP. maxEvictInterval = time.Minute - // If numChunskToPersist is this percentage of maxChunksToPersist, we - // consider the storage in "graceful degradation mode", i.e. we do not - // checkpoint anymore based on the dirty series count, and we do not - // sync series files anymore if using the adaptive sync strategy. - percentChunksToPersistForDegradation = 80 + // Constants to control the hysteresis of entering and leaving "rushed + // mode". In rushed mode, the dirty series count is ignored for + // checkpointing, and series files are not synced if the adaptive sync + // strategy is used. + // + // If we reach 80% of -storage.local.max-chunks-to-persist, we enter + // "rushed mode". + factorChunksToPersistForEnteringRushedMode = 0.8 + // To leave "rushed mode", we must be below 70% of + // -storage.local.max-chunks-to-persist. + factorChunksToPersistForLeavingRushedMode = 0.7 + // To enter "rushed mode" for other reasons (see below), we must have at + // least 30% of -storage.local.max-chunks-to-persist. + factorMinChunksToPersistToAllowRushedMode = 0.3 + // If the number of chunks in memory reaches 110% of + // -storage.local.memory-chunks, we will enter "rushed mode" (provided + // we have enough chunks to persist at all, see + // factorMinChunksToPersistToAllowRushedMode.) + factorMemChunksForEnteringRushedMode = 1.1 + // To leave "rushed mode", we must be below 105% of + // -storage.local.memory-chunks. + factorMemChunksForLeavingRushedMode = 1.05 ) var ( @@ -110,7 +128,7 @@ type memorySeriesStorage struct { // numChunksToPersist has to be aligned for atomic operations. numChunksToPersist int64 // The number of chunks waiting for persistence. maxChunksToPersist int // If numChunksToPersist reaches this threshold, ingestion will stall. - degraded bool + rushed bool // Whether the storage is in rushed mode. fpLocker *fingerprintLocker fpToSeries *seriesMap @@ -238,7 +256,7 @@ func (s *memorySeriesStorage) Start() (err error) { case Always: syncStrategy = func() bool { return true } case Adaptive: - syncStrategy = func() bool { return !s.isDegraded() } + syncStrategy = func() bool { return !s.inRushedMode() } default: panic("unknown sync strategy") } @@ -898,9 +916,8 @@ loop: // would be counterproductive, as it would slow down chunk persisting even more, // while in a situation like that, where we are clearly lacking speed of disk // maintenance, the best we can do for crash recovery is to persist chunks as - // quickly as possible. So only checkpoint if the storage is not in "graceful - // degradation mode". - if dirtySeriesCount >= s.checkpointDirtySeriesLimit && !s.isDegraded() { + // quickly as possible. So only checkpoint if the storage is not in "rushed mode". + if dirtySeriesCount >= s.checkpointDirtySeriesLimit && !s.inRushedMode() { checkpointTimer.Reset(0) } } @@ -1144,37 +1161,80 @@ func (s *memorySeriesStorage) incNumChunksToPersist(by int) { atomic.AddInt64(&s.numChunksToPersist, int64(by)) } -// isDegraded returns whether the storage is in "graceful degradation mode", -// which is the case if the number of chunks waiting for persistence has reached -// a percentage of maxChunksToPersist that exceeds -// percentChunksToPersistForDegradation. The method is not goroutine safe (but -// only ever called from the goroutine dealing with series maintenance). -// Changes of degradation mode are logged. -func (s *memorySeriesStorage) isDegraded() bool { - nowDegraded := s.getNumChunksToPersist() > s.maxChunksToPersist*percentChunksToPersistForDegradation/100 - if s.degraded && !nowDegraded { - log.Warn("Storage has left graceful degradation mode. Things are back to normal.") - } else if !s.degraded && nowDegraded { - log.Warnf( - "%d chunks waiting for persistence (%d%% of the allowed maximum %d). Storage is now in graceful degradation mode. Series files are not synced anymore if following the adaptive strategy. Checkpoints are not performed more often than every %v. Series maintenance happens as frequently as possible.", - s.getNumChunksToPersist(), - s.getNumChunksToPersist()*100/s.maxChunksToPersist, - s.maxChunksToPersist, - s.checkpointInterval) +// inRushedMode returns whether the storage is in "rushed mode", which is the +// case if there are too many chunks waiting for persistence or there are too +// many chunks in memory. The method is not goroutine safe (but only ever called +// from the goroutine dealing with series maintenance). Changes of degradation +// mode are logged. +func (s *memorySeriesStorage) inRushedMode() bool { + chunksToPersist := float64(s.getNumChunksToPersist()) + memChunks := float64(atomic.LoadInt64(&numMemChunks)) + + if s.rushed { + // We are already in rushed mode, so check if we can get out of + // it, using the lower hysteresis thresholds. + s.rushed = chunksToPersist > float64(s.maxChunksToPersist)*factorChunksToPersistForLeavingRushedMode || + memChunks > float64(s.maxMemoryChunks)*factorMemChunksForLeavingRushedMode + if !s.rushed { + log.Warn("Storage has left rushed mode. Things are back to normal.") + } + return s.rushed } - s.degraded = nowDegraded - return s.degraded + // We are not rushed yet, so check the higher hysteresis threshold if we enter it now. + // First WRT chunksToPersist... + s.rushed = chunksToPersist > float64(s.maxChunksToPersist)*factorChunksToPersistForEnteringRushedMode + if s.rushed { + log.Warnf( + "%.0f chunks waiting for persistence (%.1f%% of the allowed maximum %d). Storage is now in rushed mode. Series files are not synced anymore if following the adaptive strategy. Checkpoints are not performed more often than every %v. Series maintenance happens as frequently as possible.", + chunksToPersist, + chunksToPersist*100/float64(s.maxChunksToPersist), + s.maxChunksToPersist, + s.checkpointInterval, + ) + return true + } + // ...then WRT memChunks. + s.rushed = memChunks > float64(s.maxMemoryChunks)*factorMemChunksForEnteringRushedMode && + chunksToPersist > float64(s.maxChunksToPersist)*factorMinChunksToPersistToAllowRushedMode + if s.rushed { + log.Warnf( + "%.0f chunks in memory (%.1f%% of the allowed maximum %d). Storage is now in rushed mode. Series files are not synced anymore if following the adaptive strategy. Checkpoints are not performed more often than every %v. Series maintenance happens as frequently as possible.", + memChunks, + memChunks*100/float64(s.maxMemoryChunks), + s.maxMemoryChunks, + s.checkpointInterval, + ) + } + return s.rushed } -// persistenceBacklogScore works similar to isDegraded, but returns a score +// persistenceBacklogScore works similar to inRushedMode, but returns a score // about how close we are to degradation. This score is 1.0 if no chunks are -// waiting for persistence and 0.0 if we are at or above the degradation -// threshold. +// waiting for persistence or we are not over the threshold for memory chunks, +// and 0.0 if we are at or above the thresholds. However, the score is always 0 +// if the storage is currently in rushed mode. (Getting out of it has a +// hysteresis, so we might be below thresholds again but still in rushed mode.) func (s *memorySeriesStorage) persistenceBacklogScore() float64 { - score := 1 - float64(s.getNumChunksToPersist())/float64(s.maxChunksToPersist*percentChunksToPersistForDegradation/100) + if s.inRushedMode() { + return 0 + } + + chunksToPersist := float64(s.getNumChunksToPersist()) + score := 1 - chunksToPersist/(float64(s.maxChunksToPersist)*factorChunksToPersistForEnteringRushedMode) + + if chunksToPersist > float64(s.maxChunksToPersist)*factorMinChunksToPersistToAllowRushedMode { + memChunks := float64(atomic.LoadInt64(&numMemChunks)) + score = math.Min( + score, + 1-(memChunks/float64(s.maxMemoryChunks)-1)/(factorMemChunksForEnteringRushedMode-1), + ) + } if score < 0 { return 0 } + if score > 1 { + return 1 + } return score }