diff --git a/cmd/prometheus/config.go b/cmd/prometheus/config.go
index 54aee4e12..c990c9460 100644
--- a/cmd/prometheus/config.go
+++ b/cmd/prometheus/config.go
@@ -171,11 +171,11 @@ func init() {
 	)
 	cfg.fs.DurationVar(
 		&cfg.storage.CheckpointInterval, "storage.local.checkpoint-interval", 5*time.Minute,
-		"The period at which the in-memory metrics and the chunks not yet persisted to series files are checkpointed.",
+		"The time to wait between checkpoints of in-memory metrics and chunks not yet persisted to series files. Note that a checkpoint is never triggered before at least as much time has passed as the last checkpoint took.",
 	)
 	cfg.fs.IntVar(
 		&cfg.storage.CheckpointDirtySeriesLimit, "storage.local.checkpoint-dirty-series-limit", 5000,
-		"If approx. that many time series are in a state that would require a recovery operation after a crash, a checkpoint is triggered, even if the checkpoint interval hasn't passed yet. A recovery operation requires a disk seek. The default limit intends to keep the recovery time below 1min even on spinning disks. With SSD, recovery is much faster, so you might want to increase this value in that case to avoid overly frequent checkpoints.",
+		"If approx. that many time series are in a state that would require a recovery operation after a crash, a checkpoint is triggered, even if the checkpoint interval hasn't passed yet. A recovery operation requires a disk seek. The default limit intends to keep the recovery time below 1min even on spinning disks. With SSD, recovery is much faster, so you might want to increase this value in that case to avoid overly frequent checkpoints. Also note that a checkpoint is never triggered before at least as much time has passed as the last checkpoint took.",
 	)
 	cfg.fs.Var(
 		&cfg.storage.SyncStrategy, "storage.local.series-sync-strategy",
diff --git a/storage/local/persistence.go b/storage/local/persistence.go
index 393aebbef..689900f31 100644
--- a/storage/local/persistence.go
+++ b/storage/local/persistence.go
@@ -15,6 +15,7 @@ package local
 
 import (
 	"bufio"
+	"context"
 	"encoding/binary"
 	"fmt"
 	"io"
@@ -626,7 +627,9 @@ func (p *persistence) loadChunkDescs(fp model.Fingerprint, offsetFromEnd int) ([
 // NOTE: Above, varint encoding is used consistently although uvarint would have
 // made more sense in many cases. This was simply a glitch while designing the
 // format.
-func (p *persistence) checkpointSeriesMapAndHeads(fingerprintToSeries *seriesMap, fpLocker *fingerprintLocker) (err error) {
+func (p *persistence) checkpointSeriesMapAndHeads(
+	ctx context.Context, fingerprintToSeries *seriesMap, fpLocker *fingerprintLocker,
+) (err error) {
 	log.Info("Checkpointing in-memory metrics and chunks...")
 	p.checkpointing.Set(1)
 	defer p.checkpointing.Set(0)
@@ -637,11 +640,16 @@ func (p *persistence) checkpointSeriesMapAndHeads(fingerprintToSeries *seriesMap
 	}
 
 	defer func() {
-		syncErr := f.Sync()
-		closeErr := f.Close()
+		defer os.Remove(p.headsTempFileName()) // Just in case it was left behind.
+
 		if err != nil {
+			// If we already had an error, do not bother to sync,
+			// just close, ignoring any further error.
+			f.Close()
 			return
 		}
+		syncErr := f.Sync()
+		closeErr := f.Close()
 		err = syncErr
 		if err != nil {
 			return
@@ -683,6 +691,11 @@ func (p *persistence) checkpointSeriesMapAndHeads(fingerprintToSeries *seriesMap
 	var realNumberOfSeries uint64
 	for m := range iter {
+		select {
+		case <-ctx.Done():
+			return ctx.Err()
+		default:
+		}
 		func() { // Wrapped in function to use defer for unlocking the fp.
 			fpLocker.Lock(m.fp)
 			defer fpLocker.Unlock(m.fp)
diff --git a/storage/local/persistence_test.go b/storage/local/persistence_test.go
index 33a8195a3..a468d7261 100644
--- a/storage/local/persistence_test.go
+++ b/storage/local/persistence_test.go
@@ -15,6 +15,7 @@ package local
 
 import (
 	"bufio"
+	"context"
 	"errors"
 	"os"
 	"path/filepath"
@@ -547,6 +548,27 @@ func TestPersistLoadDropChunksType1(t *testing.T) {
 	testPersistLoadDropChunks(t, 1)
 }
 
+func TestCancelCheckpoint(t *testing.T) {
+	p, closer := newTestPersistence(t, 2)
+	defer closer.Close()
+
+	fpLocker := newFingerprintLocker(10)
+	sm := newSeriesMap()
+	s, _ := newMemorySeries(m1, nil, time.Time{})
+	sm.put(m1.FastFingerprint(), s)
+	sm.put(m2.FastFingerprint(), s)
+	sm.put(m3.FastFingerprint(), s)
+	sm.put(m4.FastFingerprint(), s)
+	sm.put(m5.FastFingerprint(), s)
+
+	ctx, cancel := context.WithCancel(context.Background())
+	// Cancel right now to avoid races.
+	cancel()
+	if err := p.checkpointSeriesMapAndHeads(ctx, sm, fpLocker); err != context.Canceled {
+		t.Fatalf("expected error %v, got %v", context.Canceled, err)
+	}
+}
+
 func testCheckpointAndLoadSeriesMapAndHeads(t *testing.T, encoding chunk.Encoding) {
 	p, closer := newTestPersistence(t, encoding)
 	defer closer.Close()
@@ -584,7 +606,7 @@ func testCheckpointAndLoadSeriesMapAndHeads(t *testing.T, encoding chunk.Encodin
 	sm.put(m4.FastFingerprint(), s4)
 	sm.put(m5.FastFingerprint(), s5)
 
-	if err := p.checkpointSeriesMapAndHeads(sm, fpLocker); err != nil {
+	if err := p.checkpointSeriesMapAndHeads(context.Background(), sm, fpLocker); err != nil {
 		t.Fatal(err)
 	}
 
diff --git a/storage/local/storage.go b/storage/local/storage.go
index d96185c08..6e799f010 100644
--- a/storage/local/storage.go
+++ b/storage/local/storage.go
@@ -462,7 +462,9 @@ func (s *MemorySeriesStorage) Stop() error {
 	<-s.evictStopped
 
 	// One final checkpoint of the series map and the head chunks.
-	if err := s.persistence.checkpointSeriesMapAndHeads(s.fpToSeries, s.fpLocker); err != nil {
+	if err := s.persistence.checkpointSeriesMapAndHeads(
+		context.Background(), s.fpToSeries, s.fpLocker,
+	); err != nil {
 		return err
 	}
 	if err := s.mapper.checkpoint(); err != nil {
@@ -1421,11 +1423,13 @@ func (s *MemorySeriesStorage) cycleThroughArchivedFingerprints() chan model.Fing
 func (s *MemorySeriesStorage) loop() {
 	checkpointTimer := time.NewTimer(s.checkpointInterval)
+	checkpointMinTimer := time.NewTimer(0)
 
 	var dirtySeriesCount int64
 
 	defer func() {
 		checkpointTimer.Stop()
+		checkpointMinTimer.Stop()
 		log.Info("Maintenance loop stopped.")
 		close(s.loopStopped)
 	}()
 
@@ -1433,32 +1437,57 @@ func (s *MemorySeriesStorage) loop() {
 	memoryFingerprints := s.cycleThroughMemoryFingerprints()
 	archivedFingerprints := s.cycleThroughArchivedFingerprints()
 
+	checkpointCtx, checkpointCancel := context.WithCancel(context.Background())
+	checkpointNow := make(chan struct{}, 1)
+
+	doCheckpoint := func() time.Duration {
+		start := time.Now()
+		// We clear this before the checkpoint so that dirtySeriesCount
+		// is an upper bound.
+		atomic.StoreInt64(&dirtySeriesCount, 0)
+		s.dirtySeries.Set(0)
+		select {
+		case <-checkpointNow:
+			// Signal cleared.
+		default:
+			// No signal pending.
+		}
+		err := s.persistence.checkpointSeriesMapAndHeads(
+			checkpointCtx, s.fpToSeries, s.fpLocker,
+		)
+		if err == context.Canceled {
+			log.Info("Checkpoint canceled.")
+		} else if err != nil {
+			s.persistErrors.Inc()
+			log.Errorln("Error while checkpointing:", err)
+		}
+		return time.Since(start)
+	}
+
 	// Checkpoints can happen concurrently with maintenance so even with heavy
 	// checkpointing there will still be sufficient progress on maintenance.
 	checkpointLoopStopped := make(chan struct{})
 	go func() {
 		for {
 			select {
-			case <-s.loopStopping:
+			case <-checkpointCtx.Done():
 				checkpointLoopStopped <- struct{}{}
 				return
-			case <-checkpointTimer.C:
-				// We clear this before the checkpoint so that dirtySeriesCount
-				// is an upper bound.
-				atomic.StoreInt64(&dirtySeriesCount, 0)
-				s.dirtySeries.Set(0)
-				err := s.persistence.checkpointSeriesMapAndHeads(s.fpToSeries, s.fpLocker)
-				if err != nil {
-					s.persistErrors.Inc()
-					log.Errorln("Error while checkpointing:", err)
-				}
-				// If a checkpoint takes longer than checkpointInterval, unluckily timed
-				// combination with the Reset(0) call below can lead to a case where a
-				// time is lurking in C leading to repeated checkpointing without break.
+			case <-checkpointMinTimer.C:
+				var took time.Duration
 				select {
-				case <-checkpointTimer.C: // Get rid of the lurking time.
-				default:
+				case <-checkpointCtx.Done():
+					checkpointLoopStopped <- struct{}{}
+					return
+				case <-checkpointTimer.C:
+					took = doCheckpoint()
+				case <-checkpointNow:
+					if !checkpointTimer.Stop() {
+						<-checkpointTimer.C
+					}
+					took = doCheckpoint()
 				}
+				checkpointMinTimer.Reset(took)
 				checkpointTimer.Reset(s.checkpointInterval)
 			}
 		}
@@ -1468,6 +1497,7 @@ loop:
 	for {
 		select {
 		case <-s.loopStopping:
+			checkpointCancel()
 			break loop
 		case fp := <-memoryFingerprints:
 			if s.maintainMemorySeries(fp, model.Now().Add(-s.dropAfter)) {
@@ -1478,10 +1508,15 @@
 			// would be counterproductive, as it would slow down chunk persisting even more,
 			// while in a situation like that, where we are clearly lacking speed of disk
 			// maintenance, the best we can do for crash recovery is to persist chunks as
-			// quickly as possible. So only checkpoint if the urgency score is < 1.
+			// quickly as possible. So only checkpoint if we are not in rushed mode.
 			if _, rushed := s.getPersistenceUrgencyScore(); !rushed &&
 				dirty >= int64(s.checkpointDirtySeriesLimit) {
-				checkpointTimer.Reset(0)
+				select {
+				case checkpointNow <- struct{}{}:
+					// Signal sent.
+				default:
+					// Signal already pending.
+				}
 			}
 		}
 	case fp := <-archivedFingerprints:
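
For reviewers who want to play with the new scheduling logic in isolation, below is a minimal, self-contained sketch of the pattern the checkpoint goroutine in storage.go now follows: a second timer enforces a pause at least as long as the previous checkpoint took, the regular interval timer drives periodic checkpoints, a buffered signal channel requests an early checkpoint (as the dirty-series limit does), and a context cancels both the loop and a checkpoint in progress. Everything here (runCheckpointLoop, its parameters, the callback, the timings) is illustrative and not part of the Prometheus code base.

// checkpoint_sketch.go - illustrative stand-alone sketch, NOT Prometheus code.
package main

import (
	"context"
	"fmt"
	"time"
)

// runCheckpointLoop never starts a checkpoint before at least as much time
// has passed as the previous checkpoint took, otherwise checkpoints every
// `interval`, can be triggered early via `now`, and stops when ctx is
// canceled.
func runCheckpointLoop(
	ctx context.Context,
	interval time.Duration,
	now <-chan struct{}, // buffered with capacity 1 by the caller
	checkpoint func(context.Context) error,
) {
	intervalTimer := time.NewTimer(interval)
	minTimer := time.NewTimer(0) // Fires immediately on the first iteration.
	defer intervalTimer.Stop()
	defer minTimer.Stop()

	do := func() time.Duration {
		start := time.Now()
		// Clear a pending "checkpoint now" signal so it does not trigger
		// another checkpoint right after this one.
		select {
		case <-now:
		default:
		}
		if err := checkpoint(ctx); err != nil && err != context.Canceled {
			fmt.Println("checkpoint error:", err)
		}
		return time.Since(start)
	}

	for {
		// Wait until at least as much time has passed as the last
		// checkpoint took.
		select {
		case <-ctx.Done():
			return
		case <-minTimer.C:
		}
		// Then wait for the regular interval or an early trigger.
		var took time.Duration
		select {
		case <-ctx.Done():
			return
		case <-intervalTimer.C:
			took = do()
		case <-now:
			if !intervalTimer.Stop() {
				<-intervalTimer.C // Drain a tick that fired concurrently.
			}
			took = do()
		}
		minTimer.Reset(took)
		intervalTimer.Reset(interval)
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
	defer cancel()

	now := make(chan struct{}, 1)
	go runCheckpointLoop(ctx, time.Second, now, func(ctx context.Context) error {
		fmt.Println("checkpointing at", time.Now().Format("15:04:05.000"))
		time.Sleep(300 * time.Millisecond) // Simulate checkpoint work.
		return nil
	})

	// Ask for an early checkpoint, mimicking the dirty-series limit being hit.
	time.Sleep(1500 * time.Millisecond)
	select {
	case now <- struct{}{}:
		// Signal sent.
	default:
		// Signal already pending.
	}

	<-ctx.Done()
}

The two-timer structure is the part that replaces the old Reset(0)/"lurking tick" workaround: because the next wait always starts from the measured duration of the previous checkpoint, back-to-back checkpoints cannot monopolize the loop even when a checkpoint takes longer than the configured interval.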