diff --git a/storage/local/storage.go b/storage/local/storage.go index d772758d9..8c8ca8d2f 100644 --- a/storage/local/storage.go +++ b/storage/local/storage.go @@ -1252,7 +1252,7 @@ func (s *MemorySeriesStorage) cycleThroughArchivedFingerprints() chan model.Fing func (s *MemorySeriesStorage) loop() { checkpointTimer := time.NewTimer(s.checkpointInterval) - dirtySeriesCount := 0 + var dirtySeriesCount int64 defer func() { checkpointTimer.Stop() @@ -1263,38 +1263,52 @@ func (s *MemorySeriesStorage) loop() { memoryFingerprints := s.cycleThroughMemoryFingerprints() archivedFingerprints := s.cycleThroughArchivedFingerprints() + // Checkpoints can happen concurrently with maintenance so even with heavy + // checkpointing there will still be sufficient progress on maintenance. + checkpointLoopStopped := make(chan struct{}) + go func() { + for { + select { + case <-s.loopStopping: + checkpointLoopStopped <- struct{}{} + return + case <-checkpointTimer.C: + // We clear this before the checkpoint so that dirtySeriesCount + // is an upper bound. + atomic.StoreInt64(&dirtySeriesCount, 0) + s.dirtySeries.Set(0) + err := s.persistence.checkpointSeriesMapAndHeads(s.fpToSeries, s.fpLocker) + if err != nil { + log.Errorln("Error while checkpointing:", err) + } + // If a checkpoint takes longer than checkpointInterval, unluckily timed + // combination with the Reset(0) call below can lead to a case where a + // time is lurking in C leading to repeated checkpointing without break. + select { + case <-checkpointTimer.C: // Get rid of the lurking time. + default: + } + checkpointTimer.Reset(s.checkpointInterval) + } + } + }() + loop: for { select { case <-s.loopStopping: break loop - case <-checkpointTimer.C: - err := s.persistence.checkpointSeriesMapAndHeads(s.fpToSeries, s.fpLocker) - if err != nil { - log.Errorln("Error while checkpointing:", err) - } else { - dirtySeriesCount = 0 - s.dirtySeries.Set(0) - } - // If a checkpoint takes longer than checkpointInterval, unluckily timed - // combination with the Reset(0) call below can lead to a case where a - // time is lurking in C leading to repeated checkpointing without break. - select { - case <-checkpointTimer.C: // Get rid of the lurking time. - default: - } - checkpointTimer.Reset(s.checkpointInterval) case fp := <-memoryFingerprints: if s.maintainMemorySeries(fp, model.Now().Add(-s.dropAfter)) { - dirtySeriesCount++ - s.dirtySeries.Inc() + dirty := atomic.AddInt64(&dirtySeriesCount, 1) + s.dirtySeries.Set(float64(dirty)) // Check if we have enough "dirty" series so that we need an early checkpoint. // However, if we are already behind persisting chunks, creating a checkpoint // would be counterproductive, as it would slow down chunk persisting even more, // while in a situation like that, where we are clearly lacking speed of disk // maintenance, the best we can do for crash recovery is to persist chunks as // quickly as possible. So only checkpoint if the urgency score is < 1. - if dirtySeriesCount >= s.checkpointDirtySeriesLimit && + if dirty >= int64(s.checkpointDirtySeriesLimit) && s.calculatePersistenceUrgencyScore() < 1 { checkpointTimer.Reset(0) } @@ -1308,6 +1322,7 @@ loop: } for range archivedFingerprints { } + <-checkpointLoopStopped } // maintainMemorySeries maintains a series that is in memory (i.e. not