mirror of
https://github.com/prometheus/prometheus.git
synced 2025-03-05 20:59:13 -08:00
Increase resilience of the storage against data corruption - step 4.
Step 4: Add a configurable sync'ing of series files after modification.
This commit is contained in:
parent
11bd9ce1bd
commit
12ae6e9203
14
main.go
14
main.go
|
@ -59,6 +59,7 @@ var (
|
|||
|
||||
checkpointInterval = flag.Duration("storage.local.checkpoint-interval", 5*time.Minute, "The period at which the in-memory metrics and the chunks not yet persisted to series files are checkpointed.")
|
||||
checkpointDirtySeriesLimit = flag.Int("storage.local.checkpoint-dirty-series-limit", 5000, "If approx. that many time series are in a state that would require a recovery operation after a crash, a checkpoint is triggered, even if the checkpoint interval hasn't passed yet. A recovery operation requires a disk seek. The default limit intends to keep the recovery time below 1min even on spinning disks. With SSD, recovery is much faster, so you might want to increase this value in that case to avoid overly frequent checkpoints.")
|
||||
seriesSyncStrategy = flag.String("storage.local.series-sync-strategy", "adaptive", "When to sync series files after modification. Possible values: 'never', 'always', 'adaptive'. Sync'ing slows down storage performance but reduces the risk of data loss in case of an OS crash. With the 'adaptive' strategy, series files are sync'd for as long as the storage is not too much behind on chunk persistence.")
|
||||
|
||||
storageDirty = flag.Bool("storage.local.dirty", false, "If set, the local storage layer will perform crash recovery even if the last shutdown appears to be clean.")
|
||||
storagePedanticChecks = flag.Bool("storage.local.pedantic-checks", false, "If set, a crash recovery will perform checks on each series file. This might take a very long time.")
|
||||
|
@ -88,6 +89,18 @@ func NewPrometheus() *prometheus {
|
|||
|
||||
notificationHandler := notification.NewNotificationHandler(*alertmanagerURL, *notificationQueueCapacity)
|
||||
|
||||
var syncStrategy local.SyncStrategy
|
||||
switch *seriesSyncStrategy {
|
||||
case "never":
|
||||
syncStrategy = local.Never
|
||||
case "always":
|
||||
syncStrategy = local.Always
|
||||
case "adaptive":
|
||||
syncStrategy = local.Adaptive
|
||||
default:
|
||||
glog.Fatalf("Invalid flag value for 'storage.local.series-sync-strategy': %s", *seriesSyncStrategy)
|
||||
}
|
||||
|
||||
o := &local.MemorySeriesStorageOptions{
|
||||
MemoryChunks: *numMemoryChunks,
|
||||
MaxChunksToPersist: *maxChunksToPersist,
|
||||
|
@ -97,6 +110,7 @@ func NewPrometheus() *prometheus {
|
|||
CheckpointDirtySeriesLimit: *checkpointDirtySeriesLimit,
|
||||
Dirty: *storageDirty,
|
||||
PedanticChecks: *storagePedanticChecks,
|
||||
SyncStrategy: syncStrategy,
|
||||
}
|
||||
memStorage, err := local.NewMemorySeriesStorage(o)
|
||||
if err != nil {
|
||||
|
|
|
@ -120,10 +120,12 @@ type persistence struct {
|
|||
pedanticChecks bool // true if crash recovery should check each series.
|
||||
dirtyFileName string // The file used for locking and to mark dirty state.
|
||||
fLock flock.Releaser // The file lock to protect against concurrent usage.
|
||||
|
||||
shouldSync syncStrategy
|
||||
}
|
||||
|
||||
// newPersistence returns a newly allocated persistence backed by local disk storage, ready to use.
|
||||
func newPersistence(basePath string, dirty, pedanticChecks bool) (*persistence, error) {
|
||||
func newPersistence(basePath string, dirty, pedanticChecks bool, shouldSync syncStrategy) (*persistence, error) {
|
||||
dirtyPath := filepath.Join(basePath, dirtyFileName)
|
||||
versionPath := filepath.Join(basePath, versionFileName)
|
||||
|
||||
|
@ -230,6 +232,7 @@ func newPersistence(basePath string, dirty, pedanticChecks bool) (*persistence,
|
|||
pedanticChecks: pedanticChecks,
|
||||
dirtyFileName: dirtyPath,
|
||||
fLock: fLock,
|
||||
shouldSync: shouldSync,
|
||||
}
|
||||
|
||||
if p.dirty {
|
||||
|
@ -342,7 +345,7 @@ func (p *persistence) persistChunks(fp clientmodel.Fingerprint, chunks []chunk)
|
|||
if err != nil {
|
||||
return -1, err
|
||||
}
|
||||
defer f.Close()
|
||||
defer p.closeChunkFile(f)
|
||||
|
||||
if err := writeChunks(f, chunks); err != nil {
|
||||
return -1, err
|
||||
|
@ -947,7 +950,7 @@ func (p *persistence) dropAndPersistChunks(
|
|||
return
|
||||
}
|
||||
defer func() {
|
||||
temp.Close()
|
||||
p.closeChunkFile(temp)
|
||||
if err == nil {
|
||||
err = os.Rename(p.tempFileNameForFingerprint(fp), p.fileNameForFingerprint(fp))
|
||||
}
|
||||
|
@ -1232,6 +1235,19 @@ func (p *persistence) openChunkFileForWriting(fp clientmodel.Fingerprint) (*os.F
|
|||
// would still be detected.
|
||||
}
|
||||
|
||||
// closeChunkFile first sync's the provided file if mandated so by the sync
|
||||
// strategy. Then it closes the file. Errors are logged.
|
||||
func (p *persistence) closeChunkFile(f *os.File) {
|
||||
if p.shouldSync() {
|
||||
if err := f.Sync(); err != nil {
|
||||
glog.Error("Error sync'ing file:", err)
|
||||
}
|
||||
}
|
||||
if err := f.Close(); err != nil {
|
||||
glog.Error("Error closing chunk file:", err)
|
||||
}
|
||||
}
|
||||
|
||||
func (p *persistence) openChunkFileForReading(fp clientmodel.Fingerprint) (*os.File, error) {
|
||||
return os.Open(p.fileNameForFingerprint(fp))
|
||||
}
|
||||
|
|
|
@ -36,7 +36,7 @@ var (
|
|||
func newTestPersistence(t *testing.T, encoding chunkEncoding) (*persistence, test.Closer) {
|
||||
*defaultChunkEncoding = int(encoding)
|
||||
dir := test.NewTemporaryDirectory("test_persistence", t)
|
||||
p, err := newPersistence(dir.Path(), false, false)
|
||||
p, err := newPersistence(dir.Path(), false, false, func() bool { return false })
|
||||
if err != nil {
|
||||
dir.Close()
|
||||
t.Fatal(err)
|
||||
|
|
|
@ -36,6 +36,12 @@ const (
|
|||
fpMaxSweepTime = 6 * time.Hour
|
||||
|
||||
maxEvictInterval = time.Minute
|
||||
|
||||
// If numChunskToPersist is this percentage of maxChunksToPersist, we
|
||||
// consider the storage in "graceful degradation mode", i.e. we do not
|
||||
// checkpoint anymore based on the dirty series count, and we do not
|
||||
// sync series files anymore if using the adaptive sync strategy.
|
||||
percentChunksToPersistForDegradation = 80
|
||||
)
|
||||
|
||||
var (
|
||||
|
@ -56,6 +62,21 @@ type evictRequest struct {
|
|||
evict bool
|
||||
}
|
||||
|
||||
// SyncStrategy is an enum to select a sync strategy for series files.
|
||||
type SyncStrategy int
|
||||
|
||||
// Possible values for SyncStrategy.
|
||||
const (
|
||||
_ SyncStrategy = iota
|
||||
Never
|
||||
Always
|
||||
Adaptive
|
||||
)
|
||||
|
||||
// A syncStrategy is a function that returns if series files should be sync'd or
|
||||
// not. It does not need to be goroutine safe.
|
||||
type syncStrategy func() bool
|
||||
|
||||
type memorySeriesStorage struct {
|
||||
fpLocker *fingerprintLocker
|
||||
fpToSeries *seriesMap
|
||||
|
@ -68,6 +89,7 @@ type memorySeriesStorage struct {
|
|||
|
||||
numChunksToPersist int64 // The number of chunks waiting for persistence.
|
||||
maxChunksToPersist int // If numChunksToPersist reaches this threshold, ingestion will stall.
|
||||
degraded bool
|
||||
|
||||
persistence *persistence
|
||||
|
||||
|
@ -94,32 +116,14 @@ type MemorySeriesStorageOptions struct {
|
|||
CheckpointDirtySeriesLimit int // How many dirty series will trigger an early checkpoint.
|
||||
Dirty bool // Force the storage to consider itself dirty on startup.
|
||||
PedanticChecks bool // If dirty, perform crash-recovery checks on each series file.
|
||||
SyncStrategy SyncStrategy // Which sync strategy to apply to series files.
|
||||
}
|
||||
|
||||
// NewMemorySeriesStorage returns a newly allocated Storage. Storage.Serve still
|
||||
// has to be called to start the storage.
|
||||
func NewMemorySeriesStorage(o *MemorySeriesStorageOptions) (Storage, error) {
|
||||
p, err := newPersistence(o.PersistenceStoragePath, o.Dirty, o.PedanticChecks)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
glog.Info("Loading series map and head chunks...")
|
||||
fpToSeries, numChunksToPersist, err := p.loadSeriesMapAndHeads()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
glog.Infof("%d series loaded.", fpToSeries.length())
|
||||
numSeries := prometheus.NewGauge(prometheus.GaugeOpts{
|
||||
Namespace: namespace,
|
||||
Subsystem: subsystem,
|
||||
Name: "memory_series",
|
||||
Help: "The current number of series in memory.",
|
||||
})
|
||||
numSeries.Set(float64(fpToSeries.length()))
|
||||
|
||||
s := &memorySeriesStorage{
|
||||
fpLocker: newFingerprintLocker(1024),
|
||||
fpToSeries: fpToSeries,
|
||||
fpLocker: newFingerprintLocker(1024),
|
||||
|
||||
loopStopping: make(chan struct{}),
|
||||
loopStopped: make(chan struct{}),
|
||||
|
@ -129,8 +133,6 @@ func NewMemorySeriesStorage(o *MemorySeriesStorageOptions) (Storage, error) {
|
|||
checkpointDirtySeriesLimit: o.CheckpointDirtySeriesLimit,
|
||||
|
||||
maxChunksToPersist: o.MaxChunksToPersist,
|
||||
numChunksToPersist: numChunksToPersist,
|
||||
persistence: p,
|
||||
|
||||
evictList: list.New(),
|
||||
evictRequests: make(chan evictRequest, evictRequestsCap),
|
||||
|
@ -143,7 +145,12 @@ func NewMemorySeriesStorage(o *MemorySeriesStorageOptions) (Storage, error) {
|
|||
Name: "persist_errors_total",
|
||||
Help: "The total number of errors while persisting chunks.",
|
||||
}),
|
||||
numSeries: numSeries,
|
||||
numSeries: prometheus.NewGauge(prometheus.GaugeOpts{
|
||||
Namespace: namespace,
|
||||
Subsystem: subsystem,
|
||||
Name: "memory_series",
|
||||
Help: "The current number of series in memory.",
|
||||
}),
|
||||
seriesOps: prometheus.NewCounterVec(
|
||||
prometheus.CounterOpts{
|
||||
Namespace: namespace,
|
||||
|
@ -167,6 +174,32 @@ func NewMemorySeriesStorage(o *MemorySeriesStorageOptions) (Storage, error) {
|
|||
}),
|
||||
}
|
||||
|
||||
var syncStrategy syncStrategy
|
||||
switch o.SyncStrategy {
|
||||
case Never:
|
||||
syncStrategy = func() bool { return false }
|
||||
case Always:
|
||||
syncStrategy = func() bool { return true }
|
||||
case Adaptive:
|
||||
syncStrategy = func() bool { return !s.isDegraded() }
|
||||
default:
|
||||
panic("unknown sync strategy")
|
||||
}
|
||||
|
||||
p, err := newPersistence(o.PersistenceStoragePath, o.Dirty, o.PedanticChecks, syncStrategy)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
s.persistence = p
|
||||
|
||||
glog.Info("Loading series map and head chunks...")
|
||||
s.fpToSeries, s.numChunksToPersist, err = p.loadSeriesMapAndHeads()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
glog.Infof("%d series loaded.", s.fpToSeries.length())
|
||||
s.numSeries.Set(float64(s.fpToSeries.length()))
|
||||
|
||||
return s, nil
|
||||
}
|
||||
|
||||
|
@ -626,20 +659,14 @@ loop:
|
|||
case fp := <-memoryFingerprints:
|
||||
if s.maintainMemorySeries(fp, clientmodel.TimestampFromTime(time.Now()).Add(-s.dropAfter)) {
|
||||
dirtySeriesCount++
|
||||
// Check if we have enough "dirty" series so
|
||||
// that we need an early checkpoint. However,
|
||||
// if we are already at 90% capacity of the
|
||||
// persist queue, creating a checkpoint would be
|
||||
// counterproductive, as it would slow down
|
||||
// chunk persisting even more, while in a
|
||||
// situation like that, where we are clearly
|
||||
// lacking speed of disk maintenance, the best
|
||||
// we can do for crash recovery is to work
|
||||
// through the persist queue as quickly as
|
||||
// possible. So only checkpoint if the persist
|
||||
// queue is at most 90% full.
|
||||
if dirtySeriesCount >= s.checkpointDirtySeriesLimit &&
|
||||
s.getNumChunksToPersist() < s.maxChunksToPersist*9/10 {
|
||||
// Check if we have enough "dirty" series so that we need an early checkpoint.
|
||||
// However, if we are already behind persisting chunks, creating a checkpoint
|
||||
// would be counterproductive, as it would slow down chunk persisting even more,
|
||||
// while in a situation like that, where we are clearly lacking speed of disk
|
||||
// maintenance, the best we can do for crash recovery is to persist chunks as
|
||||
// quickly as possible. So only checkpoint if the storage is not in "graceful
|
||||
// degratadion mode".
|
||||
if dirtySeriesCount >= s.checkpointDirtySeriesLimit && !s.isDegraded() {
|
||||
checkpointTimer.Reset(0)
|
||||
}
|
||||
}
|
||||
|
@ -884,6 +911,23 @@ func (s *memorySeriesStorage) incNumChunksToPersist(by int) {
|
|||
atomic.AddInt64(&s.numChunksToPersist, int64(by))
|
||||
}
|
||||
|
||||
// isDegraded returns whether the storage is in "graceful degradation mode",
|
||||
// which is the case if the number of chunks waiting for persistence has reached
|
||||
// a percentage of maxChunksToPersist that exceepds
|
||||
// percentChunksToPersistForDegradation. The method is not goroutine safe (but
|
||||
// only ever called from the goroutine dealing with series maintenance).
|
||||
// Changes of degradation mode are logged.
|
||||
func (s *memorySeriesStorage) isDegraded() bool {
|
||||
nowDegraded := s.getNumChunksToPersist() > s.maxChunksToPersist*percentChunksToPersistForDegradation/100
|
||||
if s.degraded && !nowDegraded {
|
||||
glog.Warning("Storage has left graceful degradation mode.")
|
||||
} else if !s.degraded && nowDegraded {
|
||||
glog.Warning("%d chunks waiting for persistence (allowed maximum %d). Storage is now in graceful degradation mode. Series files are not sync'd anymore. Checkpoints will not be performed more often then every %v.", s.getNumChunksToPersist, s.maxChunksToPersist, s.checkpointInterval)
|
||||
}
|
||||
s.degraded = nowDegraded
|
||||
return s.degraded
|
||||
}
|
||||
|
||||
// Describe implements prometheus.Collector.
|
||||
func (s *memorySeriesStorage) Describe(ch chan<- *prometheus.Desc) {
|
||||
s.persistence.Describe(ch)
|
||||
|
|
|
@ -161,6 +161,7 @@ func TestLoop(t *testing.T) {
|
|||
PersistenceRetentionPeriod: 24 * 7 * time.Hour,
|
||||
PersistenceStoragePath: directory.Path(),
|
||||
CheckpointInterval: 250 * time.Millisecond,
|
||||
SyncStrategy: Adaptive,
|
||||
}
|
||||
storage, err := NewMemorySeriesStorage(o)
|
||||
if err != nil {
|
||||
|
@ -673,6 +674,7 @@ func benchmarkFuzz(b *testing.B, encoding chunkEncoding) {
|
|||
PersistenceRetentionPeriod: time.Hour,
|
||||
PersistenceStoragePath: directory.Path(),
|
||||
CheckpointInterval: time.Second,
|
||||
SyncStrategy: Adaptive,
|
||||
}
|
||||
s, err := NewMemorySeriesStorage(o)
|
||||
if err != nil {
|
||||
|
|
|
@ -46,6 +46,7 @@ func NewTestStorage(t test.T, encoding chunkEncoding) (Storage, test.Closer) {
|
|||
PersistenceRetentionPeriod: 24 * time.Hour * 365 * 100, // Enough to never trigger purging.
|
||||
PersistenceStoragePath: directory.Path(),
|
||||
CheckpointInterval: time.Hour,
|
||||
SyncStrategy: Adaptive,
|
||||
}
|
||||
storage, err := NewMemorySeriesStorage(o)
|
||||
if err != nil {
|
||||
|
|
Loading…
Reference in a new issue