Merge pull request #2591 from prometheus/beorn7/storage
storage: Several optimizations of checkpointing
commit acd72ae1a7
@@ -171,11 +171,11 @@ func init() {
 	)
 	cfg.fs.DurationVar(
 		&cfg.storage.CheckpointInterval, "storage.local.checkpoint-interval", 5*time.Minute,
-		"The period at which the in-memory metrics and the chunks not yet persisted to series files are checkpointed.",
+		"The time to wait between checkpoints of in-memory metrics and chunks not yet persisted to series files. Note that a checkpoint is never triggered before at least as much time has passed as the last checkpoint took.",
 	)
 	cfg.fs.IntVar(
 		&cfg.storage.CheckpointDirtySeriesLimit, "storage.local.checkpoint-dirty-series-limit", 5000,
-		"If approx. that many time series are in a state that would require a recovery operation after a crash, a checkpoint is triggered, even if the checkpoint interval hasn't passed yet. A recovery operation requires a disk seek. The default limit intends to keep the recovery time below 1min even on spinning disks. With SSD, recovery is much faster, so you might want to increase this value in that case to avoid overly frequent checkpoints.",
+		"If approx. that many time series are in a state that would require a recovery operation after a crash, a checkpoint is triggered, even if the checkpoint interval hasn't passed yet. A recovery operation requires a disk seek. The default limit intends to keep the recovery time below 1min even on spinning disks. With SSD, recovery is much faster, so you might want to increase this value in that case to avoid overly frequent checkpoints. Also note that a checkpoint is never triggered before at least as much time has passed as the last checkpoint took.",
 	)
 	cfg.fs.Var(
 		&cfg.storage.SyncStrategy, "storage.local.series-sync-strategy",
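The two flag descriptions above encode the new policy: the interval flag is a wait between checkpoints rather than a fixed period, the dirty-series limit can request an earlier checkpoint, and neither fires again before at least the duration of the previous checkpoint has elapsed. The following standalone sketch is not part of this commit; it uses the standard library flag package instead of Prometheus's cfg.fs wrapper, and the "last checkpoint took" duration is hypothetical, purely to illustrate the rule stated in the help texts.

package main

import (
	"flag"
	"fmt"
	"time"
)

func main() {
	// Illustrative stand-ins for the two Prometheus flags above.
	checkpointInterval := flag.Duration(
		"storage.local.checkpoint-interval", 5*time.Minute,
		"The time to wait between checkpoints of in-memory metrics and chunks.")
	dirtySeriesLimit := flag.Int(
		"storage.local.checkpoint-dirty-series-limit", 5000,
		"Approximate number of dirty series that triggers an early checkpoint.")
	flag.Parse()

	// Hypothetical duration of the previous checkpoint, for illustration only.
	lastCheckpointTook := 2 * time.Minute

	// A checkpoint is never triggered before at least as much time has passed
	// as the last checkpoint took, even if the dirty-series limit would
	// request one earlier.
	fmt.Println("dirty-series limit:          ", *dirtySeriesLimit)
	fmt.Println("earliest possible checkpoint:", time.Now().Add(lastCheckpointTook))
	fmt.Println("next regular checkpoint:     ", time.Now().Add(*checkpointInterval))
}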
@@ -15,6 +15,7 @@ package local
 
 import (
 	"bufio"
+	"context"
 	"encoding/binary"
 	"fmt"
 	"io"
@@ -626,7 +627,9 @@ func (p *persistence) loadChunkDescs(fp model.Fingerprint, offsetFromEnd int) ([
 // NOTE: Above, varint encoding is used consistently although uvarint would have
 // made more sense in many cases. This was simply a glitch while designing the
 // format.
-func (p *persistence) checkpointSeriesMapAndHeads(fingerprintToSeries *seriesMap, fpLocker *fingerprintLocker) (err error) {
+func (p *persistence) checkpointSeriesMapAndHeads(
+	ctx context.Context, fingerprintToSeries *seriesMap, fpLocker *fingerprintLocker,
+) (err error) {
 	log.Info("Checkpointing in-memory metrics and chunks...")
 	p.checkpointing.Set(1)
 	defer p.checkpointing.Set(0)
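The NOTE in the hunk above concerns the on-disk encoding: the checkpoint format sticks with signed varints even where values are never negative. As a small illustration (not taken from the Prometheus code), the standard library shows the size difference, since PutVarint zig-zag-encodes the sign bit:

package main

import (
	"encoding/binary"
	"fmt"
)

func main() {
	buf := make([]byte, binary.MaxVarintLen64)

	// Signed varint: zig-zag encoding spends a bit on the sign, so a small
	// non-negative value like 100 needs two bytes.
	n := binary.PutVarint(buf, 100)
	fmt.Println("varint(100) uses", n, "bytes") // 2

	// Unsigned uvarint: the same value fits in one byte.
	n = binary.PutUvarint(buf, 100)
	fmt.Println("uvarint(100) uses", n, "bytes") // 1
}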
@@ -637,11 +640,16 @@ func (p *persistence) checkpointSeriesMapAndHeads(fingerprintToSeries *seriesMap
 	}
 
 	defer func() {
-		syncErr := f.Sync()
-		closeErr := f.Close()
+		defer os.Remove(p.headsTempFileName()) // Just in case it was left behind.
+
 		if err != nil {
+			// If we already had an error, do not bother to sync,
+			// just close, ignoring any further error.
+			f.Close()
 			return
 		}
+		syncErr := f.Sync()
+		closeErr := f.Close()
 		err = syncErr
 		if err != nil {
 			return
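The deferred block above is a write-to-temp-file checkpoint: on the error path the file is only closed, otherwise it is synced and closed, the temporary file is removed in any case, and the rest of the function (not visible in this hunk) renames the finished checkpoint into place. A minimal standalone sketch of the same pattern, with hypothetical file names and payload rather than the actual persistence format, could look like this:

package main

import (
	"fmt"
	"os"
)

// writeCheckpoint writes data to tmpName and atomically renames it to
// finalName. The temporary file is removed if anything goes wrong.
func writeCheckpoint(tmpName, finalName string, data []byte) (err error) {
	f, err := os.OpenFile(tmpName, os.O_WRONLY|os.O_TRUNC|os.O_CREATE, 0640)
	if err != nil {
		return err
	}
	defer func() {
		defer os.Remove(tmpName) // Just in case it was left behind.

		if err != nil {
			// We already had an error: do not bother to sync, just close.
			f.Close()
			return
		}
		if err = f.Sync(); err != nil {
			return
		}
		if err = f.Close(); err != nil {
			return
		}
		// Only rename once the data is safely on disk.
		err = os.Rename(tmpName, finalName)
	}()

	_, err = f.Write(data)
	return err
}

func main() {
	if err := writeCheckpoint("heads.db.tmp", "heads.db", []byte("payload")); err != nil {
		fmt.Println("checkpoint failed:", err)
	}
}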
@@ -683,6 +691,11 @@ func (p *persistence) checkpointSeriesMapAndHeads(fingerprintToSeries *seriesMap
 
 	var realNumberOfSeries uint64
 	for m := range iter {
+		select {
+		case <-ctx.Done():
+			return ctx.Err()
+		default:
+		}
 		func() { // Wrapped in function to use defer for unlocking the fp.
 			fpLocker.Lock(m.fp)
 			defer fpLocker.Unlock(m.fp)
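The added select makes the checkpoint cancelable: before each series is processed, a non-blocking check on ctx.Done() lets the function return ctx.Err() instead of finishing a potentially long checkpoint (the cancellation comes from the checkpointCancel() call added further down in storage.go). A minimal sketch of the same idiom over a hypothetical work channel:

package main

import (
	"context"
	"fmt"
)

// drain processes items until the channel is closed or ctx is canceled.
func drain(ctx context.Context, items <-chan int) error {
	for it := range items {
		// Non-blocking cancellation check before each unit of work.
		select {
		case <-ctx.Done():
			return ctx.Err()
		default:
		}
		_ = it // Process the item here.
	}
	return nil
}

func main() {
	items := make(chan int, 3)
	items <- 1
	items <- 2
	items <- 3
	close(items)

	ctx, cancel := context.WithCancel(context.Background())
	cancel() // Cancel up front, like TestCancelCheckpoint below.
	fmt.Println(drain(ctx, items)) // Prints "context canceled".
}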
@@ -15,6 +15,7 @@ package local
 
 import (
 	"bufio"
+	"context"
 	"errors"
 	"os"
 	"path/filepath"
@@ -547,6 +548,27 @@ func TestPersistLoadDropChunksType1(t *testing.T) {
 	testPersistLoadDropChunks(t, 1)
 }
 
+func TestCancelCheckpoint(t *testing.T) {
+	p, closer := newTestPersistence(t, 2)
+	defer closer.Close()
+
+	fpLocker := newFingerprintLocker(10)
+	sm := newSeriesMap()
+	s, _ := newMemorySeries(m1, nil, time.Time{})
+	sm.put(m1.FastFingerprint(), s)
+	sm.put(m2.FastFingerprint(), s)
+	sm.put(m3.FastFingerprint(), s)
+	sm.put(m4.FastFingerprint(), s)
+	sm.put(m5.FastFingerprint(), s)
+
+	ctx, cancel := context.WithCancel(context.Background())
+	// Cancel right now to avoid races.
+	cancel()
+	if err := p.checkpointSeriesMapAndHeads(ctx, sm, fpLocker); err != context.Canceled {
+		t.Fatalf("expected error %v, got %v", context.Canceled, err)
+	}
+}
+
 func testCheckpointAndLoadSeriesMapAndHeads(t *testing.T, encoding chunk.Encoding) {
 	p, closer := newTestPersistence(t, encoding)
 	defer closer.Close()
@@ -584,7 +606,7 @@ func testCheckpointAndLoadSeriesMapAndHeads(t *testing.T, encoding chunk.Encodin
 	sm.put(m4.FastFingerprint(), s4)
 	sm.put(m5.FastFingerprint(), s5)
 
-	if err := p.checkpointSeriesMapAndHeads(sm, fpLocker); err != nil {
+	if err := p.checkpointSeriesMapAndHeads(context.Background(), sm, fpLocker); err != nil {
 		t.Fatal(err)
 	}
 
@@ -462,7 +462,9 @@ func (s *MemorySeriesStorage) Stop() error {
 	<-s.evictStopped
 
 	// One final checkpoint of the series map and the head chunks.
-	if err := s.persistence.checkpointSeriesMapAndHeads(s.fpToSeries, s.fpLocker); err != nil {
+	if err := s.persistence.checkpointSeriesMapAndHeads(
+		context.Background(), s.fpToSeries, s.fpLocker,
+	); err != nil {
 		return err
 	}
 	if err := s.mapper.checkpoint(); err != nil {
@@ -1421,11 +1423,13 @@ func (s *MemorySeriesStorage) cycleThroughArchivedFingerprints() chan model.Fing
 
 func (s *MemorySeriesStorage) loop() {
 	checkpointTimer := time.NewTimer(s.checkpointInterval)
+	checkpointMinTimer := time.NewTimer(0)
 
 	var dirtySeriesCount int64
 
 	defer func() {
 		checkpointTimer.Stop()
+		checkpointMinTimer.Stop()
 		log.Info("Maintenance loop stopped.")
 		close(s.loopStopped)
 	}()
@@ -1433,32 +1437,57 @@ func (s *MemorySeriesStorage) loop() {
 	memoryFingerprints := s.cycleThroughMemoryFingerprints()
 	archivedFingerprints := s.cycleThroughArchivedFingerprints()
 
+	checkpointCtx, checkpointCancel := context.WithCancel(context.Background())
+	checkpointNow := make(chan struct{}, 1)
+
+	doCheckpoint := func() time.Duration {
+		start := time.Now()
+		// We clear this before the checkpoint so that dirtySeriesCount
+		// is an upper bound.
+		atomic.StoreInt64(&dirtySeriesCount, 0)
+		s.dirtySeries.Set(0)
+		select {
+		case <-checkpointNow:
+			// Signal cleared.
+		default:
+			// No signal pending.
+		}
+		err := s.persistence.checkpointSeriesMapAndHeads(
+			checkpointCtx, s.fpToSeries, s.fpLocker,
+		)
+		if err == context.Canceled {
+			log.Info("Checkpoint canceled.")
+		} else if err != nil {
+			s.persistErrors.Inc()
+			log.Errorln("Error while checkpointing:", err)
+		}
+		return time.Since(start)
+	}
+
 	// Checkpoints can happen concurrently with maintenance so even with heavy
 	// checkpointing there will still be sufficient progress on maintenance.
 	checkpointLoopStopped := make(chan struct{})
 	go func() {
 		for {
 			select {
-			case <-s.loopStopping:
+			case <-checkpointCtx.Done():
 				checkpointLoopStopped <- struct{}{}
 				return
-			case <-checkpointTimer.C:
-				// We clear this before the checkpoint so that dirtySeriesCount
-				// is an upper bound.
-				atomic.StoreInt64(&dirtySeriesCount, 0)
-				s.dirtySeries.Set(0)
-				err := s.persistence.checkpointSeriesMapAndHeads(s.fpToSeries, s.fpLocker)
-				if err != nil {
-					s.persistErrors.Inc()
-					log.Errorln("Error while checkpointing:", err)
+			case <-checkpointMinTimer.C:
+				var took time.Duration
+				select {
+				case <-checkpointCtx.Done():
+					checkpointLoopStopped <- struct{}{}
+					return
+				case <-checkpointTimer.C:
+					took = doCheckpoint()
+				case <-checkpointNow:
+					if !checkpointTimer.Stop() {
+						<-checkpointTimer.C
+					}
+					took = doCheckpoint()
 				}
-				// If a checkpoint takes longer than checkpointInterval, unluckily timed
-				// combination with the Reset(0) call below can lead to a case where a
-				// time is lurking in C leading to repeated checkpointing without break.
-				select {
-				case <-checkpointTimer.C: // Get rid of the lurking time.
-				default:
-				}
+				checkpointMinTimer.Reset(took)
 				checkpointTimer.Reset(s.checkpointInterval)
 			}
 		}
@@ -1468,6 +1497,7 @@ loop:
 	for {
 		select {
 		case <-s.loopStopping:
+			checkpointCancel()
 			break loop
 		case fp := <-memoryFingerprints:
 			if s.maintainMemorySeries(fp, model.Now().Add(-s.dropAfter)) {
@@ -1478,10 +1508,15 @@ loop:
 			// would be counterproductive, as it would slow down chunk persisting even more,
 			// while in a situation like that, where we are clearly lacking speed of disk
 			// maintenance, the best we can do for crash recovery is to persist chunks as
-			// quickly as possible. So only checkpoint if the urgency score is < 1.
+			// quickly as possible. So only checkpoint if we are not in rushed mode.
 			if _, rushed := s.getPersistenceUrgencyScore(); !rushed &&
 				dirty >= int64(s.checkpointDirtySeriesLimit) {
-				checkpointTimer.Reset(0)
+				select {
+				case checkpointNow <- struct{}{}:
+					// Signal sent.
+				default:
+					// Signal already pending.
+				}
 			}
 		}
 	case fp := <-archivedFingerprints:
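Taken together, the storage.go changes replace the old checkpointTimer.Reset(0) shortcut with a small scheduling protocol: a buffered signal channel of capacity one requests an early checkpoint without ever blocking the maintenance loop, checkpointMinTimer ensures a new checkpoint never starts sooner than the previous one took, and a stopped interval timer is drained before it is reset. The following is a condensed, self-contained sketch of that protocol, not the Prometheus code itself: names, intervals, and the sleep standing in for the actual checkpoint work are all hypothetical.

package main

import (
	"fmt"
	"time"
)

func main() {
	const interval = 300 * time.Millisecond

	checkpointTimer := time.NewTimer(interval)
	checkpointMinTimer := time.NewTimer(0)     // Fires immediately the first time.
	checkpointNow := make(chan struct{}, 1)    // Capacity 1: at most one pending request.

	doCheckpoint := func() time.Duration {
		start := time.Now()
		// Clear a pending early-checkpoint request, if any.
		select {
		case <-checkpointNow:
			// Signal cleared.
		default:
			// No signal pending.
		}
		time.Sleep(50 * time.Millisecond) // Stand-in for the real checkpoint work.
		return time.Since(start)
	}

	// "Maintenance" requests early checkpoints; it never blocks if one is pending.
	go func() {
		ticker := time.NewTicker(100 * time.Millisecond)
		defer ticker.Stop()
		for range ticker.C {
			select {
			case checkpointNow <- struct{}{}:
				// Signal sent.
			default:
				// Signal already pending.
			}
		}
	}()

	stop := time.After(2 * time.Second)
	for {
		select {
		case <-stop:
			return
		case <-checkpointMinTimer.C:
			var took time.Duration
			select {
			case <-checkpointTimer.C:
				took = doCheckpoint()
			case <-checkpointNow:
				// Drain the interval timer before resetting it, so no stale
				// tick lurks in its channel.
				if !checkpointTimer.Stop() {
					<-checkpointTimer.C
				}
				took = doCheckpoint()
			}
			checkpointMinTimer.Reset(took) // Wait at least as long as the checkpoint took.
			checkpointTimer.Reset(interval)
			fmt.Println("checkpoint took", took)
		}
	}
}

The capacity-one channel is the key design choice: a second early-checkpoint request made while one is already pending is simply dropped, which is exactly the behavior documented by the "Signal already pending." branch in the diff above.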