diff --git a/storage/local/chunk.go b/storage/local/chunk.go index 112198c0de..eb4751aa57 100644 --- a/storage/local/chunk.go +++ b/storage/local/chunk.go @@ -25,6 +25,7 @@ import ( "github.com/prometheus/prometheus/storage/metric" ) +// The DefaultChunkEncoding can be changed via a flag. var DefaultChunkEncoding = doubleDelta type chunkEncoding byte diff --git a/storage/local/crashrecovery.go b/storage/local/crashrecovery.go index cbb0061bf4..4cccdbfbc2 100644 --- a/storage/local/crashrecovery.go +++ b/storage/local/crashrecovery.go @@ -254,7 +254,7 @@ func (p *persistence) sanitizeSeries( // disk. Treat this series as a freshly unarchived one // by loading the chunkDescs and setting all parameters // based on the loaded chunkDescs. - cds, err := p.loadChunkDescs(fp, clientmodel.Latest) + cds, err := p.loadChunkDescs(fp, 0) if err != nil { log.Errorf( "Failed to load chunk descriptors for metric %v, fingerprint %v: %s", @@ -286,8 +286,7 @@ func (p *persistence) sanitizeSeries( // First, throw away the chunkDescs without chunks. s.chunkDescs = s.chunkDescs[s.persistWatermark:] numMemChunkDescs.Sub(float64(s.persistWatermark)) - // Load all the chunk descs. - cds, err := p.loadChunkDescs(fp, clientmodel.Latest) + cds, err := p.loadChunkDescs(fp, 0) if err != nil { log.Errorf( "Failed to load chunk descriptors for metric %v, fingerprint %v: %s", @@ -407,7 +406,7 @@ func (p *persistence) cleanUpArchiveIndexes( if _, err := p.archivedFingerprintToMetrics.Delete(fp); err != nil { return err } - cds, err := p.loadChunkDescs(clientmodel.Fingerprint(fp), clientmodel.Latest) + cds, err := p.loadChunkDescs(clientmodel.Fingerprint(fp), 0) if err != nil { return err } diff --git a/storage/local/index/index.go b/storage/local/index/index.go index dd31211fcf..5749cba9a8 100644 --- a/storage/local/index/index.go +++ b/storage/local/index/index.go @@ -33,6 +33,7 @@ const ( labelPairToFingerprintsDir = "labelpair_to_fingerprints" ) +// LevelDB cache sizes, changeable via flags. var ( FingerprintMetricCacheSize = 10 * 1024 * 1024 FingerprintTimeRangeCacheSize = 5 * 1024 * 1024 diff --git a/storage/local/persistence.go b/storage/local/persistence.go index a629e23f43..d1a6955b1f 100644 --- a/storage/local/persistence.go +++ b/storage/local/persistence.go @@ -444,10 +444,11 @@ func (p *persistence) loadChunks(fp clientmodel.Fingerprint, indexes []int, inde return chunks, nil } -// loadChunkDescs loads chunkDescs for a series up until a given time. It is -// the caller's responsibility to not persist or drop anything for the same +// loadChunkDescs loads the chunkDescs for a series from disk. offsetFromEnd is +// the number of chunkDescs to skip from the end of the series file. It is the +// caller's responsibility to not persist or drop anything for the same // fingerprint concurrently. -func (p *persistence) loadChunkDescs(fp clientmodel.Fingerprint, beforeTime clientmodel.Timestamp) ([]*chunkDesc, error) { +func (p *persistence) loadChunkDescs(fp clientmodel.Fingerprint, offsetFromEnd int) ([]*chunkDesc, error) { f, err := p.openChunkFileForReading(fp) if os.IsNotExist(err) { return nil, nil @@ -469,8 +470,8 @@ func (p *persistence) loadChunkDescs(fp clientmodel.Fingerprint, beforeTime clie ) } - numChunks := int(fi.Size()) / chunkLenWithHeader - cds := make([]*chunkDesc, 0, numChunks) + numChunks := int(fi.Size())/chunkLenWithHeader - offsetFromEnd + cds := make([]*chunkDesc, numChunks) chunkTimesBuf := make([]byte, 16) for i := 0; i < numChunks; i++ { _, err := f.Seek(offsetForChunkIndex(i)+chunkHeaderFirstTimeOffset, os.SEEK_SET) @@ -482,15 +483,10 @@ func (p *persistence) loadChunkDescs(fp clientmodel.Fingerprint, beforeTime clie if err != nil { return nil, err } - cd := &chunkDesc{ + cds[i] = &chunkDesc{ chunkFirstTime: clientmodel.Timestamp(binary.LittleEndian.Uint64(chunkTimesBuf)), chunkLastTime: clientmodel.Timestamp(binary.LittleEndian.Uint64(chunkTimesBuf[8:])), } - if !cd.chunkLastTime.Before(beforeTime) { - // From here on, we have chunkDescs in memory already. - break - } - cds = append(cds, cd) } chunkDescOps.WithLabelValues(load).Add(float64(len(cds))) numMemChunkDescs.Add(float64(len(cds))) diff --git a/storage/local/persistence_test.go b/storage/local/persistence_test.go index 34bb672b39..0dcd7997ed 100644 --- a/storage/local/persistence_test.go +++ b/storage/local/persistence_test.go @@ -122,7 +122,7 @@ func testPersistLoadDropChunks(t *testing.T, encoding chunkEncoding) { } } // Load all chunk descs. - actualChunkDescs, err := p.loadChunkDescs(fp, 10) + actualChunkDescs, err := p.loadChunkDescs(fp, 0) if len(actualChunkDescs) != 10 { t.Errorf("Got %d chunkDescs, want %d.", len(actualChunkDescs), 10) } @@ -974,7 +974,7 @@ func BenchmarkLoadChunkDescs(b *testing.B) { for i := 0; i < b.N; i++ { for _, s := range fpStrings { fp.LoadFromString(s) - cds, err := p.loadChunkDescs(fp, clientmodel.Latest) + cds, err := p.loadChunkDescs(fp, 0) if err != nil { b.Error(err) } diff --git a/storage/local/series.go b/storage/local/series.go index 07388a2e5f..87a193eef2 100644 --- a/storage/local/series.go +++ b/storage/local/series.go @@ -384,7 +384,7 @@ func (s *memorySeries) preloadChunksForRange( firstChunkDescTime = s.chunkDescs[0].firstTime() } if s.chunkDescsOffset != 0 && from.Before(firstChunkDescTime) { - cds, err := mss.loadChunkDescs(fp, firstChunkDescTime) + cds, err := mss.loadChunkDescs(fp, s.persistWatermark) if err != nil { return nil, err } diff --git a/storage/local/storage.go b/storage/local/storage.go index 7e0ef06c31..fa2461275a 100644 --- a/storage/local/storage.go +++ b/storage/local/storage.go @@ -589,7 +589,7 @@ func (s *memorySeriesStorage) getOrCreateSeries(fp clientmodel.Fingerprint, m cl // end up with a series without any chunkDescs for a // while (which is confusing as it makes the series // appear as archived or purged). - cds, err = s.loadChunkDescs(fp, clientmodel.Latest) + cds, err = s.loadChunkDescs(fp, 0) if err != nil { log.Errorf("Error loading chunk descs for fingerprint %v (metric %v): %v", fp, m, err) } @@ -1107,8 +1107,8 @@ func (s *memorySeriesStorage) loadChunks(fp clientmodel.Fingerprint, indexes []i } // See persistence.loadChunkDescs for detailed explanation. -func (s *memorySeriesStorage) loadChunkDescs(fp clientmodel.Fingerprint, beforeTime clientmodel.Timestamp) ([]*chunkDesc, error) { - return s.persistence.loadChunkDescs(fp, beforeTime) +func (s *memorySeriesStorage) loadChunkDescs(fp clientmodel.Fingerprint, offsetFromEnd int) ([]*chunkDesc, error) { + return s.persistence.loadChunkDescs(fp, offsetFromEnd) } // getNumChunksToPersist returns numChunksToPersist in a goroutine-safe way.