Increase resilience of the storage against data corruption - step 2.

Step 2: Add a flag -storage.local.pedantic-checks to check every
series file.

Also, remove countPersistedHeadChunks channel, which is unused.
This commit is contained in:
beorn7 2015-03-19 12:03:09 +01:00
parent 3d8d8928be
commit e25cca823c
5 changed files with 20 additions and 17 deletions

View file

@ -61,6 +61,7 @@ var (
checkpointDirtySeriesLimit = flag.Int("storage.local.checkpoint-dirty-series-limit", 5000, "If approx. that many time series are in a state that would require a recovery operation after a crash, a checkpoint is triggered, even if the checkpoint interval hasn't passed yet. A recovery operation requires a disk seek. The default limit intends to keep the recovery time below 1min even on spinning disks. With SSD, recovery is much faster, so you might want to increase this value in that case to avoid overly frequent checkpoints.") checkpointDirtySeriesLimit = flag.Int("storage.local.checkpoint-dirty-series-limit", 5000, "If approx. that many time series are in a state that would require a recovery operation after a crash, a checkpoint is triggered, even if the checkpoint interval hasn't passed yet. A recovery operation requires a disk seek. The default limit intends to keep the recovery time below 1min even on spinning disks. With SSD, recovery is much faster, so you might want to increase this value in that case to avoid overly frequent checkpoints.")
storageDirty = flag.Bool("storage.local.dirty", false, "If set, the local storage layer will perform crash recovery even if the last shutdown appears to be clean.") storageDirty = flag.Bool("storage.local.dirty", false, "If set, the local storage layer will perform crash recovery even if the last shutdown appears to be clean.")
storagePedanticChecks = flag.Bool("storage.local.pedantic-checks", false, "If set, a crash recovery will perform checks on each series file. This might take a very long time.")
printVersion = flag.Bool("version", false, "Print version information.") printVersion = flag.Bool("version", false, "Print version information.")
) )

View file

@ -221,7 +221,10 @@ func (p *persistence) sanitizeSeries(
if s == nil { if s == nil {
panic("fingerprint mapped to nil pointer") panic("fingerprint mapped to nil pointer")
} }
if bytesToTrim == 0 && s.chunkDescsOffset != -1 && chunksInFile == s.chunkDescsOffset+s.persistWatermark { if !p.pedanticChecks &&
bytesToTrim == 0 &&
s.chunkDescsOffset != -1 &&
chunksInFile == s.chunkDescsOffset+s.persistWatermark {
// Everything is consistent. We are good. // Everything is consistent. We are good.
return fp, true return fp, true
} }

View file

@ -117,12 +117,13 @@ type persistence struct {
dirtyMtx sync.Mutex // Protects dirty and becameDirty. dirtyMtx sync.Mutex // Protects dirty and becameDirty.
dirty bool // true if persistence was started in dirty state. dirty bool // true if persistence was started in dirty state.
becameDirty bool // true if an inconsistency came up during runtime. becameDirty bool // true if an inconsistency came up during runtime.
pedanticChecks bool // true if crash recovery should check each series.
dirtyFileName string // The file used for locking and to mark dirty state. dirtyFileName string // The file used for locking and to mark dirty state.
fLock flock.Releaser // The file lock to protect against concurrent usage. fLock flock.Releaser // The file lock to protect against concurrent usage.
} }
// newPersistence returns a newly allocated persistence backed by local disk storage, ready to use. // newPersistence returns a newly allocated persistence backed by local disk storage, ready to use.
func newPersistence(basePath string, dirty bool) (*persistence, error) { func newPersistence(basePath string, dirty, pedanticChecks bool) (*persistence, error) {
dirtyPath := filepath.Join(basePath, dirtyFileName) dirtyPath := filepath.Join(basePath, dirtyFileName)
versionPath := filepath.Join(basePath, versionFileName) versionPath := filepath.Join(basePath, versionFileName)
@ -226,6 +227,7 @@ func newPersistence(basePath string, dirty bool) (*persistence, error) {
Help: "The duration (in milliseconds) it took to checkpoint in-memory metrics and head chunks.", Help: "The duration (in milliseconds) it took to checkpoint in-memory metrics and head chunks.",
}), }),
dirty: dirty, dirty: dirty,
pedanticChecks: pedanticChecks,
dirtyFileName: dirtyPath, dirtyFileName: dirtyPath,
fLock: fLock, fLock: fLock,
} }

View file

@ -36,7 +36,7 @@ var (
func newTestPersistence(t *testing.T, encoding chunkEncoding) (*persistence, test.Closer) { func newTestPersistence(t *testing.T, encoding chunkEncoding) (*persistence, test.Closer) {
*defaultChunkEncoding = int(encoding) *defaultChunkEncoding = int(encoding)
dir := test.NewTemporaryDirectory("test_persistence", t) dir := test.NewTemporaryDirectory("test_persistence", t)
p, err := newPersistence(dir.Path(), false) p, err := newPersistence(dir.Path(), false, false)
if err != nil { if err != nil {
dir.Close() dir.Close()
t.Fatal(err) t.Fatal(err)

View file

@ -71,8 +71,6 @@ type memorySeriesStorage struct {
persistence *persistence persistence *persistence
countPersistedHeadChunks chan struct{}
evictList *list.List evictList *list.List
evictRequests chan evictRequest evictRequests chan evictRequest
evictStopping, evictStopped chan struct{} evictStopping, evictStopped chan struct{}
@ -95,12 +93,13 @@ type MemorySeriesStorageOptions struct {
CheckpointInterval time.Duration // How often to checkpoint the series map and head chunks. CheckpointInterval time.Duration // How often to checkpoint the series map and head chunks.
CheckpointDirtySeriesLimit int // How many dirty series will trigger an early checkpoint. CheckpointDirtySeriesLimit int // How many dirty series will trigger an early checkpoint.
Dirty bool // Force the storage to consider itself dirty on startup. Dirty bool // Force the storage to consider itself dirty on startup.
PedanticChecks bool // If dirty, perform crash-recovery checks on each series file.
} }
// NewMemorySeriesStorage returns a newly allocated Storage. Storage.Serve still // NewMemorySeriesStorage returns a newly allocated Storage. Storage.Serve still
// has to be called to start the storage. // has to be called to start the storage.
func NewMemorySeriesStorage(o *MemorySeriesStorageOptions) (Storage, error) { func NewMemorySeriesStorage(o *MemorySeriesStorageOptions) (Storage, error) {
p, err := newPersistence(o.PersistenceStoragePath, o.Dirty) p, err := newPersistence(o.PersistenceStoragePath, o.Dirty, o.PedanticChecks)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -133,8 +132,6 @@ func NewMemorySeriesStorage(o *MemorySeriesStorageOptions) (Storage, error) {
numChunksToPersist: numChunksToPersist, numChunksToPersist: numChunksToPersist,
persistence: p, persistence: p,
countPersistedHeadChunks: make(chan struct{}, 100),
evictList: list.New(), evictList: list.New(),
evictRequests: make(chan evictRequest, evictRequestsCap), evictRequests: make(chan evictRequest, evictRequestsCap),
evictStopping: make(chan struct{}), evictStopping: make(chan struct{}),