Mirror of https://github.com/prometheus/prometheus.git, synced 2025-03-05 20:59:13 -08:00
Merge pull request #442 from prometheus/beorn7/fix-crash-recovery
Fix ALL the crash-recovery related problems.
This commit is contained in commit cca2e58f20
main.go
@@ -58,7 +58,8 @@ var (
 
 	storageRetentionPeriod = flag.Duration("storage.local.retention", 15*24*time.Hour, "How long to retain samples in the local storage.")
 	checkpointInterval     = flag.Duration("storage.local.checkpoint-interval", 5*time.Minute, "The period at which the in-memory index of time series is checkpointed.")
+	checkpointDirtySeriesLimit = flag.Int("storage.local.checkpoint-dirty-series-limit", 5000, "If approx. that many time series are in a state that would require a recovery operation after a crash, a checkpoint is triggered, even if the checkpoint interval hasn't passed yet. A recovery operation requires a disk seek. The default limit intends to keep the recovery time below 1min even on spinning disks. With SSD, recovery is much faster, so you might want to increase this value in that case to avoid overly frequent checkpoints.")
 
 	storageDirty = flag.Bool("storage.local.dirty", false, "If set, the local storage layer will perform crash recovery even if the last shutdown appears to be clean.")
 
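The default of 5000 lines up with the "below 1min even on spinning disks" claim in the flag description. A minimal back-of-the-envelope sketch, assuming roughly 10ms per disk seek (the seek time is an assumption, not taken from the commit):

package main

import (
	"fmt"
	"time"
)

func main() {
	const dirtySeriesLimit = 5000            // default of storage.local.checkpoint-dirty-series-limit
	const seekTime = 10 * time.Millisecond   // assumed cost of one recovery operation (one disk seek)
	fmt.Println(seekTime * dirtySeriesLimit) // prints 50s, comfortably below the 1min target
}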
@@ -118,7 +119,8 @@ func NewPrometheus() *prometheus {
 		PersistenceStoragePath:     *metricsStoragePath,
 		PersistenceRetentionPeriod: *storageRetentionPeriod,
 		CheckpointInterval:         *checkpointInterval,
+		CheckpointDirtySeriesLimit: *checkpointDirtySeriesLimit,
 		Dirty:                      *storageDirty,
 	}
 	memStorage, err := local.NewMemorySeriesStorage(o)
 	if err != nil {
@@ -14,9 +14,9 @@
 package local
 
 import (
+	"time"
+
 	clientmodel "github.com/prometheus/client_golang/model"
 	"github.com/prometheus/client_golang/prometheus"
-	"time"
-
 	"github.com/prometheus/prometheus/storage/metric"
 )
@@ -85,7 +85,7 @@ type indexingOp struct {
 // A Persistence is used by a Storage implementation to store samples
 // persistently across restarts. The methods are only goroutine-safe if
 // explicitly marked as such below. The chunk-related methods PersistChunk,
-// DropChunks, LoadChunks, and LoadChunkDescs can be called concurrently with
+// dropChunks, loadChunks, and loadChunkDescs can be called concurrently with
 // each other if each call refers to a different fingerprint.
 type persistence struct {
 	basePath string
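The per-fingerprint concurrency contract above is what the storage's fingerprintLocker (see newFingerprintLocker(256) further down) enforces. A stand-alone sketch of striped per-fingerprint locking, as an illustration under assumed names rather than the package's actual implementation:

package main

import (
	"fmt"
	"sync"
)

// fpLocker is a striped lock: fingerprints map onto a fixed number of mutexes,
// so calls for different fingerprints usually run in parallel, while calls for
// the same fingerprint always serialize.
type fpLocker struct {
	mtxs []sync.Mutex
}

func newFPLocker(stripes int) *fpLocker {
	return &fpLocker{mtxs: make([]sync.Mutex, stripes)}
}

func (l *fpLocker) Lock(fp uint64)   { l.mtxs[fp%uint64(len(l.mtxs))].Lock() }
func (l *fpLocker) Unlock(fp uint64) { l.mtxs[fp%uint64(len(l.mtxs))].Unlock() }

func main() {
	locker := newFPLocker(256) // same stripe count as newFingerprintLocker(256) below
	fp := uint64(0xdeadbeef)
	locker.Lock(fp)
	fmt.Println("chunk-related work for fingerprint", fp)
	locker.Unlock(fp)
}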
@@ -350,26 +350,31 @@ func (p *persistence) recoverFromCrash(fingerprintToSeries map[clientmodel.Finge
 	return nil
 }
 
-// sanitizeSeries sanitizes a series based on its series file as defined by the provided directory and FileInfo.
-// The method returns the fingerprint as derived from the directory and file name, and whether the provided
-// file has been sanitized. A file that failed to be sanitized is deleted, if possible.
+// sanitizeSeries sanitizes a series based on its series file as defined by the
+// provided directory and FileInfo. The method returns the fingerprint as
+// derived from the directory and file name, and whether the provided file has
+// been sanitized. A file that failed to be sanitized is deleted, if possible.
 //
 // The following steps are performed:
 //
-// - A file whose name doesn't comply with the naming scheme of a series file is simply deleted.
+// - A file whose name doesn't comply with the naming scheme of a series file is
+// simply deleted.
 //
-// - If the size of the series file isn't a multiple of the chunk size, extraneous bytes are truncated.
-// If the truncation fails, the file is deleted instead.
+// - If the size of the series file isn't a multiple of the chunk size,
+// extraneous bytes are truncated. If the truncation fails, the file is
+// deleted instead.
 //
 // - A file that is empty (after truncation) is deleted.
 //
-// - A series that is not archived (i.e. it is in the fingerprintToSeries map) is checked for consistency of
-// its various parameters (like head-chunk persistence state, offset of chunkDescs etc.). In particular,
-// overlap between an in-memory head chunk with the most recent persisted chunk is checked. Inconsistencies
-// are rectified.
+// - A series that is not archived (i.e. it is in the fingerprintToSeries map)
+// is checked for consistency of its various parameters (like head-chunk
+// persistence state, offset of chunkDescs etc.). In particular, overlap
+// between an in-memory head chunk with the most recent persisted chunk is
+// checked. Inconsistencies are rectified.
 //
-// - A series that is archived (i.e. it is not in the fingerprintToSeries map) is checked for its presence
-// in the index of archived series. If it cannot be found there, it is deleted.
+// - A series that is archived (i.e. it is not in the fingerprintToSeries map)
+// is checked for its presence in the index of archived series. If it cannot
+// be found there, it is deleted.
 func (p *persistence) sanitizeSeries(dirname string, fi os.FileInfo, fingerprintToSeries map[clientmodel.Fingerprint]*memorySeries) (clientmodel.Fingerprint, bool) {
 	filename := path.Join(dirname, fi.Name())
 	purge := func() {
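The truncation step described above can be illustrated in isolation. A minimal sketch, assuming a fixed on-disk chunk length; chunkLen and the file name are illustrative placeholders, not the package's actual values:

package main

import (
	"fmt"
	"log"
	"os"
)

const chunkLen = 1024 // assumed size of one chunk on disk

func truncateToChunkMultiple(path string) error {
	fi, err := os.Stat(path)
	if err != nil {
		return err
	}
	extraneous := fi.Size() % chunkLen
	if extraneous == 0 {
		return nil // already a whole number of chunks
	}
	// Drop the trailing partial chunk; if this fails, the caller would
	// delete the file instead (the purge path in sanitizeSeries).
	return os.Truncate(path, fi.Size()-extraneous)
}

func main() {
	if err := truncateToChunkMultiple("series.db"); err != nil {
		log.Println("truncation failed, file would be purged:", err)
		return
	}
	fmt.Println("series file sanitized")
}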
@@ -397,7 +397,8 @@ func (s *memorySeries) newIterator(lockFunc, unlockFunc func()) SeriesIterator {
 }
 
 // head returns a pointer to the head chunk descriptor. The caller must have
-// locked the fingerprint of the memorySeries.
+// locked the fingerprint of the memorySeries. This method will panic if this
+// series has no chunk descriptors.
 func (s *memorySeries) head() *chunkDesc {
 	return s.chunkDescs[len(s.chunkDescs)-1]
 }
@@ -411,12 +412,6 @@ func (s *memorySeries) firstTime() clientmodel.Timestamp {
 	return s.savedFirstTime
 }
 
-// lastTime returns the timestamp of the last sample in the series. The caller
-// must have locked the fingerprint of the memorySeries.
-func (s *memorySeries) lastTime() clientmodel.Timestamp {
-	return s.head().lastTime()
-}
-
 // memorySeriesIterator implements SeriesIterator.
 type memorySeriesIterator struct {
 	lock, unlock func()
@@ -16,6 +16,7 @@ package local
 
 import (
 	"container/list"
+	"math"
 	"sync/atomic"
 	"time"
 
@@ -63,15 +64,18 @@ type memorySeriesStorage struct {
 	fpLocker   *fingerprintLocker
 	fpToSeries *seriesMap
 
 	loopStopping, loopStopped  chan struct{}
 	maxMemoryChunks            int
 	purgeAfter                 time.Duration
 	checkpointInterval         time.Duration
+	checkpointDirtySeriesLimit int
 
 	persistQueue   chan persistRequest
 	persistStopped chan struct{}
 	persistence    *persistence
 
+	countPersistedHeadChunks chan struct{}
+
 	evictList                   *list.List
 	evictRequests               chan evictRequest
 	evictStopping, evictStopped chan struct{}
@@ -94,6 +98,7 @@ type MemorySeriesStorageOptions struct {
 	PersistenceStoragePath     string        // Location of persistence files.
 	PersistenceRetentionPeriod time.Duration // Chunks at least that old are purged.
 	CheckpointInterval         time.Duration // How often to checkpoint the series map and head chunks.
+	CheckpointDirtySeriesLimit int           // How many dirty series will trigger an early checkpoint.
 	Dirty                      bool          // Force the storage to consider itself dirty on startup.
 }
 
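For context, this is roughly how a caller wires up the new option, mirroring the main.go hunk above. The concrete values and the import path are illustrative, based on the package layout at the time of this commit:

package main

import (
	"log"
	"time"

	"github.com/prometheus/prometheus/storage/local"
)

func main() {
	o := &local.MemorySeriesStorageOptions{
		PersistenceStoragePath:     "/tmp/metrics",
		PersistenceRetentionPeriod: 15 * 24 * time.Hour,
		CheckpointInterval:         5 * time.Minute,
		CheckpointDirtySeriesLimit: 5000, // new: early checkpoint after ~5000 dirty series
		Dirty:                      false,
	}
	memStorage, err := local.NewMemorySeriesStorage(o)
	if err != nil {
		log.Fatal(err)
	}
	_ = memStorage
}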
@@ -122,16 +127,19 @@ func NewMemorySeriesStorage(o *MemorySeriesStorageOptions) (Storage, error) {
 		fpLocker:   newFingerprintLocker(256),
 		fpToSeries: fpToSeries,
 
 		loopStopping:               make(chan struct{}),
 		loopStopped:                make(chan struct{}),
 		maxMemoryChunks:            o.MemoryChunks,
 		purgeAfter:                 o.PersistenceRetentionPeriod,
 		checkpointInterval:         o.CheckpointInterval,
+		checkpointDirtySeriesLimit: o.CheckpointDirtySeriesLimit,
 
 		persistQueue:   make(chan persistRequest, persistQueueCap),
 		persistStopped: make(chan struct{}),
 		persistence:    p,
 
+		countPersistedHeadChunks: make(chan struct{}, 1024),
+
 		evictList:     list.New(),
 		evictRequests: make(chan evictRequest, evictRequestsCap),
 		evictStopping: make(chan struct{}),
@@ -362,11 +370,20 @@ func (s *memorySeriesStorage) appendSample(sample *clientmodel.Sample) {
 		Timestamp: sample.Timestamp,
 	})
 	s.fpLocker.Unlock(fp)
+	if len(chunkDescsToPersist) == 0 {
+		return
+	}
 	// Queue only outside of the locked area, processing the persistQueue
 	// requires the same lock!
 	for _, cd := range chunkDescsToPersist {
 		s.persistQueue <- persistRequest{fp, cd}
 	}
+	// Count that a head chunk was persisted, but only best effort, i.e. we
+	// don't want to block here.
+	select {
+	case s.countPersistedHeadChunks <- struct{}{}: // Counted.
+	default: // Meh...
+	}
 }
 
 func (s *memorySeriesStorage) getOrCreateSeries(fp clientmodel.Fingerprint, m clientmodel.Metric) *memorySeries {
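The select with an empty default above is a non-blocking, "best effort" send: if the buffered countPersistedHeadChunks channel is full, the event is simply dropped instead of stalling the append path. A stand-alone sketch of the same pattern (all names here are placeholders):

package main

import "fmt"

func main() {
	counted := make(chan struct{}, 1024) // stands in for countPersistedHeadChunks

	// Producer side: record the event if possible, otherwise move on.
	for i := 0; i < 3; i++ {
		select {
		case counted <- struct{}{}: // counted
		default: // buffer full; losing a count is acceptable here
		}
	}

	// Consumer side: drain what was counted.
	n := 0
	for {
		select {
		case <-counted:
			n++
		default:
			fmt.Println("events counted:", n)
			return
		}
	}
}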
@@ -641,10 +658,19 @@ func (s *memorySeriesStorage) cycleThroughArchivedFingerprints() chan clientmode
 }
 
 func (s *memorySeriesStorage) loop() {
-	checkpointTicker := time.NewTicker(s.checkpointInterval)
+	checkpointTimer := time.NewTimer(s.checkpointInterval)
+
+	// We take the number of head chunks persisted since the last checkpoint
+	// as an approximation for the number of series that are "dirty",
+	// i.e. whose head chunk is different from the one in the most recent
+	// checkpoint or for which the fact that the head chunk has been
+	// persisted is not reflected in the most recent checkpoint. This count
+	// could overestimate the number of dirty series, but it's good enough
+	// as a heuristics.
+	headChunksPersistedSinceLastCheckpoint := 0
 
 	defer func() {
-		checkpointTicker.Stop()
+		checkpointTimer.Stop()
 		glog.Info("Maintenance loop stopped.")
 		close(s.loopStopped)
 	}()
@@ -657,15 +683,21 @@ loop:
 		select {
 		case <-s.loopStopping:
 			break loop
-		case <-checkpointTicker.C:
+		case <-checkpointTimer.C:
 			s.persistence.checkpointSeriesMapAndHeads(s.fpToSeries, s.fpLocker)
+			headChunksPersistedSinceLastCheckpoint = 0
+			checkpointTimer.Reset(s.checkpointInterval)
 		case fp := <-memoryFingerprints:
 			s.purgeSeries(fp, clientmodel.TimestampFromTime(time.Now()).Add(-1*s.purgeAfter))
-			s.maintainSeries(fp)
 			s.seriesOps.WithLabelValues(memoryMaintenance).Inc()
 		case fp := <-archivedFingerprints:
 			s.purgeSeries(fp, clientmodel.TimestampFromTime(time.Now()).Add(-1*s.purgeAfter))
 			s.seriesOps.WithLabelValues(archiveMaintenance).Inc()
+		case <-s.countPersistedHeadChunks:
+			headChunksPersistedSinceLastCheckpoint++
+			if headChunksPersistedSinceLastCheckpoint >= s.checkpointDirtySeriesLimit {
+				checkpointTimer.Reset(0)
+			}
 		}
 	}
 	// Wait until both channels are closed.
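Taken together, the two hunks above replace the fixed checkpoint Ticker with a resettable Timer: the regular interval still applies, but once enough head chunks have been persisted since the last checkpoint, Reset(0) forces a checkpoint immediately. A reduced, self-contained sketch of that control flow; the interval, limit, channel, and counter names are placeholders:

package main

import (
	"fmt"
	"time"
)

func main() {
	const (
		checkpointInterval = 500 * time.Millisecond // stand-in for s.checkpointInterval
		dirtySeriesLimit   = 3                      // stand-in for s.checkpointDirtySeriesLimit
	)
	headChunkPersisted := make(chan struct{}, 16) // stand-in for s.countPersistedHeadChunks

	// Pretend five head chunks get persisted right away: the limit of 3 is
	// crossed long before the regular interval, so the first checkpoint fires early.
	for i := 0; i < 5; i++ {
		headChunkPersisted <- struct{}{}
	}

	checkpointTimer := time.NewTimer(checkpointInterval)
	defer checkpointTimer.Stop()
	dirty, checkpoints := 0, 0

	for checkpoints < 2 {
		select {
		case <-checkpointTimer.C:
			fmt.Println("checkpoint after", dirty, "dirty series")
			dirty = 0
			checkpoints++
			checkpointTimer.Reset(checkpointInterval) // back to the regular cadence
		case <-headChunkPersisted:
			dirty++
			if dirty >= dirtySeriesLimit {
				checkpointTimer.Reset(0) // force an immediate checkpoint
			}
		}
	}
}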
@@ -676,7 +708,8 @@ loop:
 }
 
 // maintainSeries closes the head chunk if not touched in a while. It archives a
-// series if all chunks are evicted. It evicts chunkDescs if there are too many.
+// series if all chunks are evicted. It evicts chunkDescs if there are too
+// many.
 func (s *memorySeriesStorage) maintainSeries(fp clientmodel.Fingerprint) {
 	var headChunkToPersist *chunkDesc
 	s.fpLocker.Lock(fp)
@@ -686,6 +719,12 @@ func (s *memorySeriesStorage) maintainSeries(fp clientmodel.Fingerprint) {
 		if headChunkToPersist != nil {
 			s.persistQueue <- persistRequest{fp, headChunkToPersist}
 		}
+		// Count that a head chunk was persisted, but only best effort, i.e. we
+		// don't want to block here.
+		select {
+		case s.countPersistedHeadChunks <- struct{}{}: // Counted.
+		default: // Meh...
+		}
 	}()
 
 	series, ok := s.fpToSeries.get(fp)
@@ -704,13 +743,23 @@ func (s *memorySeriesStorage) maintainSeries(fp clientmodel.Fingerprint) {
 		if iOldestNotEvicted == -1 {
 			s.fpToSeries.del(fp)
 			s.numSeries.Dec()
+			// Make sure we have a head chunk descriptor (a freshly
+			// unarchived series has none).
+			if len(series.chunkDescs) == 0 {
+				cds, err := s.loadChunkDescs(fp, math.MaxInt64)
+				if err != nil {
+					glog.Errorf("Could not load chunk descriptors prior to archiving metric %v, metric will not be archived: %v", series.metric, err)
+					return
+				}
+				series.chunkDescs = cds
+			}
 			if err := s.persistence.archiveMetric(
-				fp, series.metric, series.firstTime(), series.lastTime(),
+				fp, series.metric, series.firstTime(), series.head().lastTime(),
 			); err != nil {
 				glog.Errorf("Error archiving metric %v: %v", series.metric, err)
-			} else {
-				s.seriesOps.WithLabelValues(archive).Inc()
+				return
 			}
+			s.seriesOps.WithLabelValues(archive).Inc()
 			return
 		}
 		// If we are here, the series is not archived, so check for chunkDesc
@@ -382,7 +382,7 @@ func TestEvictAndPurgeSeries(t *testing.T) {
 	// Archive metrics.
 	ms.fpToSeries.del(fp)
 	if err := ms.persistence.archiveMetric(
-		fp, series.metric, series.firstTime(), series.lastTime(),
+		fp, series.metric, series.firstTime(), series.head().lastTime(),
 	); err != nil {
 		t.Fatal(err)
 	}