From 699946bf326fb5c668b0801207b66e7a0d28b7e8 Mon Sep 17 00:00:00 2001 From: beorn7 Date: Tue, 7 Jul 2015 01:10:14 +0200 Subject: [PATCH] Fix chunk desc loading. If all samples in consecutive chunks have the same timestamp, the way we used to load chunkDescs will fail. With this change, the persist watermark is used to load the right number of chunkDescs from disk. This bug is a possible reason for the rare storage corruption we have observed. --- storage/local/chunk.go | 1 + storage/local/crashrecovery.go | 7 +++---- storage/local/index/index.go | 1 + storage/local/persistence.go | 18 +++++++----------- storage/local/persistence_test.go | 4 ++-- storage/local/series.go | 2 +- storage/local/storage.go | 6 +++--- 7 files changed, 18 insertions(+), 21 deletions(-) diff --git a/storage/local/chunk.go b/storage/local/chunk.go index 112198c0d..eb4751aa5 100644 --- a/storage/local/chunk.go +++ b/storage/local/chunk.go @@ -25,6 +25,7 @@ import ( "github.com/prometheus/prometheus/storage/metric" ) +// The DefaultChunkEncoding can be changed via a flag. var DefaultChunkEncoding = doubleDelta type chunkEncoding byte diff --git a/storage/local/crashrecovery.go b/storage/local/crashrecovery.go index cbb0061bf..4cccdbfbc 100644 --- a/storage/local/crashrecovery.go +++ b/storage/local/crashrecovery.go @@ -254,7 +254,7 @@ func (p *persistence) sanitizeSeries( // disk. Treat this series as a freshly unarchived one // by loading the chunkDescs and setting all parameters // based on the loaded chunkDescs. - cds, err := p.loadChunkDescs(fp, clientmodel.Latest) + cds, err := p.loadChunkDescs(fp, 0) if err != nil { log.Errorf( "Failed to load chunk descriptors for metric %v, fingerprint %v: %s", @@ -286,8 +286,7 @@ func (p *persistence) sanitizeSeries( // First, throw away the chunkDescs without chunks. s.chunkDescs = s.chunkDescs[s.persistWatermark:] numMemChunkDescs.Sub(float64(s.persistWatermark)) - // Load all the chunk descs. 
- cds, err := p.loadChunkDescs(fp, clientmodel.Latest) + cds, err := p.loadChunkDescs(fp, 0) if err != nil { log.Errorf( "Failed to load chunk descriptors for metric %v, fingerprint %v: %s", @@ -407,7 +406,7 @@ func (p *persistence) cleanUpArchiveIndexes( if _, err := p.archivedFingerprintToMetrics.Delete(fp); err != nil { return err } - cds, err := p.loadChunkDescs(clientmodel.Fingerprint(fp), clientmodel.Latest) + cds, err := p.loadChunkDescs(clientmodel.Fingerprint(fp), 0) if err != nil { return err } diff --git a/storage/local/index/index.go b/storage/local/index/index.go index dd31211fc..5749cba9a 100644 --- a/storage/local/index/index.go +++ b/storage/local/index/index.go @@ -33,6 +33,7 @@ const ( labelPairToFingerprintsDir = "labelpair_to_fingerprints" ) +// LevelDB cache sizes, changeable via flags. var ( FingerprintMetricCacheSize = 10 * 1024 * 1024 FingerprintTimeRangeCacheSize = 5 * 1024 * 1024 diff --git a/storage/local/persistence.go b/storage/local/persistence.go index a629e23f4..d1a6955b1 100644 --- a/storage/local/persistence.go +++ b/storage/local/persistence.go @@ -444,10 +444,11 @@ func (p *persistence) loadChunks(fp clientmodel.Fingerprint, indexes []int, inde return chunks, nil } -// loadChunkDescs loads chunkDescs for a series up until a given time. It is -// the caller's responsibility to not persist or drop anything for the same +// loadChunkDescs loads the chunkDescs for a series from disk. offsetFromEnd is +// the number of chunkDescs to skip from the end of the series file. It is the +// caller's responsibility to not persist or drop anything for the same // fingerprint concurrently. 
-func (p *persistence) loadChunkDescs(fp clientmodel.Fingerprint, beforeTime clientmodel.Timestamp) ([]*chunkDesc, error) { +func (p *persistence) loadChunkDescs(fp clientmodel.Fingerprint, offsetFromEnd int) ([]*chunkDesc, error) { f, err := p.openChunkFileForReading(fp) if os.IsNotExist(err) { return nil, nil @@ -469,8 +470,8 @@ func (p *persistence) loadChunkDescs(fp clientmodel.Fingerprint, beforeTime clie ) } - numChunks := int(fi.Size()) / chunkLenWithHeader - cds := make([]*chunkDesc, 0, numChunks) + numChunks := int(fi.Size())/chunkLenWithHeader - offsetFromEnd + cds := make([]*chunkDesc, numChunks) chunkTimesBuf := make([]byte, 16) for i := 0; i < numChunks; i++ { _, err := f.Seek(offsetForChunkIndex(i)+chunkHeaderFirstTimeOffset, os.SEEK_SET) @@ -482,15 +483,10 @@ func (p *persistence) loadChunkDescs(fp clientmodel.Fingerprint, beforeTime clie if err != nil { return nil, err } - cd := &chunkDesc{ + cds[i] = &chunkDesc{ chunkFirstTime: clientmodel.Timestamp(binary.LittleEndian.Uint64(chunkTimesBuf)), chunkLastTime: clientmodel.Timestamp(binary.LittleEndian.Uint64(chunkTimesBuf[8:])), } - if !cd.chunkLastTime.Before(beforeTime) { - // From here on, we have chunkDescs in memory already. - break - } - cds = append(cds, cd) } chunkDescOps.WithLabelValues(load).Add(float64(len(cds))) numMemChunkDescs.Add(float64(len(cds))) diff --git a/storage/local/persistence_test.go b/storage/local/persistence_test.go index 34bb672b3..0dcd7997e 100644 --- a/storage/local/persistence_test.go +++ b/storage/local/persistence_test.go @@ -122,7 +122,7 @@ func testPersistLoadDropChunks(t *testing.T, encoding chunkEncoding) { } } // Load all chunk descs. 
- actualChunkDescs, err := p.loadChunkDescs(fp, 10) + actualChunkDescs, err := p.loadChunkDescs(fp, 0) if len(actualChunkDescs) != 10 { t.Errorf("Got %d chunkDescs, want %d.", len(actualChunkDescs), 10) } @@ -974,7 +974,7 @@ func BenchmarkLoadChunkDescs(b *testing.B) { for i := 0; i < b.N; i++ { for _, s := range fpStrings { fp.LoadFromString(s) - cds, err := p.loadChunkDescs(fp, clientmodel.Latest) + cds, err := p.loadChunkDescs(fp, 0) if err != nil { b.Error(err) } diff --git a/storage/local/series.go b/storage/local/series.go index 07388a2e5..87a193eef 100644 --- a/storage/local/series.go +++ b/storage/local/series.go @@ -384,7 +384,7 @@ func (s *memorySeries) preloadChunksForRange( firstChunkDescTime = s.chunkDescs[0].firstTime() } if s.chunkDescsOffset != 0 && from.Before(firstChunkDescTime) { - cds, err := mss.loadChunkDescs(fp, firstChunkDescTime) + cds, err := mss.loadChunkDescs(fp, s.persistWatermark) if err != nil { return nil, err } diff --git a/storage/local/storage.go b/storage/local/storage.go index 7e0ef06c3..fa2461275 100644 --- a/storage/local/storage.go +++ b/storage/local/storage.go @@ -589,7 +589,7 @@ func (s *memorySeriesStorage) getOrCreateSeries(fp clientmodel.Fingerprint, m cl // end up with a series without any chunkDescs for a // while (which is confusing as it makes the series // appear as archived or purged). - cds, err = s.loadChunkDescs(fp, clientmodel.Latest) + cds, err = s.loadChunkDescs(fp, 0) if err != nil { log.Errorf("Error loading chunk descs for fingerprint %v (metric %v): %v", fp, m, err) } @@ -1107,8 +1107,8 @@ func (s *memorySeriesStorage) loadChunks(fp clientmodel.Fingerprint, indexes []i } // See persistence.loadChunkDescs for detailed explanation. 
-func (s *memorySeriesStorage) loadChunkDescs(fp clientmodel.Fingerprint, beforeTime clientmodel.Timestamp) ([]*chunkDesc, error) { - return s.persistence.loadChunkDescs(fp, beforeTime) +func (s *memorySeriesStorage) loadChunkDescs(fp clientmodel.Fingerprint, offsetFromEnd int) ([]*chunkDesc, error) { + return s.persistence.loadChunkDescs(fp, offsetFromEnd) } // getNumChunksToPersist returns numChunksToPersist in a goroutine-safe way.