mirror of
https://github.com/prometheus/prometheus.git
synced 2025-01-26 13:11:11 -08:00
Fix chunk desc loading.
If all samples in consecutive chunks have the same timestamp, the way we used to load chunks will fail. With this change, the persist watermark is used to load the right amount of chunkDescs from disk. This bug is a possible reason for the rare storage corruption we have observed.
This commit is contained in:
parent
4203849c92
commit
699946bf32
|
@ -25,6 +25,7 @@ import (
|
|||
"github.com/prometheus/prometheus/storage/metric"
|
||||
)
|
||||
|
||||
// The DefaultChunkEncoding can be changed via a flag.
|
||||
var DefaultChunkEncoding = doubleDelta
|
||||
|
||||
type chunkEncoding byte
|
||||
|
|
|
@ -254,7 +254,7 @@ func (p *persistence) sanitizeSeries(
|
|||
// disk. Treat this series as a freshly unarchived one
|
||||
// by loading the chunkDescs and setting all parameters
|
||||
// based on the loaded chunkDescs.
|
||||
cds, err := p.loadChunkDescs(fp, clientmodel.Latest)
|
||||
cds, err := p.loadChunkDescs(fp, 0)
|
||||
if err != nil {
|
||||
log.Errorf(
|
||||
"Failed to load chunk descriptors for metric %v, fingerprint %v: %s",
|
||||
|
@ -286,8 +286,7 @@ func (p *persistence) sanitizeSeries(
|
|||
// First, throw away the chunkDescs without chunks.
|
||||
s.chunkDescs = s.chunkDescs[s.persistWatermark:]
|
||||
numMemChunkDescs.Sub(float64(s.persistWatermark))
|
||||
// Load all the chunk descs.
|
||||
cds, err := p.loadChunkDescs(fp, clientmodel.Latest)
|
||||
cds, err := p.loadChunkDescs(fp, 0)
|
||||
if err != nil {
|
||||
log.Errorf(
|
||||
"Failed to load chunk descriptors for metric %v, fingerprint %v: %s",
|
||||
|
@ -407,7 +406,7 @@ func (p *persistence) cleanUpArchiveIndexes(
|
|||
if _, err := p.archivedFingerprintToMetrics.Delete(fp); err != nil {
|
||||
return err
|
||||
}
|
||||
cds, err := p.loadChunkDescs(clientmodel.Fingerprint(fp), clientmodel.Latest)
|
||||
cds, err := p.loadChunkDescs(clientmodel.Fingerprint(fp), 0)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
|
@ -33,6 +33,7 @@ const (
|
|||
labelPairToFingerprintsDir = "labelpair_to_fingerprints"
|
||||
)
|
||||
|
||||
// LevelDB cache sizes, changeable via flags.
|
||||
var (
|
||||
FingerprintMetricCacheSize = 10 * 1024 * 1024
|
||||
FingerprintTimeRangeCacheSize = 5 * 1024 * 1024
|
||||
|
|
|
@ -444,10 +444,11 @@ func (p *persistence) loadChunks(fp clientmodel.Fingerprint, indexes []int, inde
|
|||
return chunks, nil
|
||||
}
|
||||
|
||||
// loadChunkDescs loads chunkDescs for a series up until a given time. It is
|
||||
// the caller's responsibility to not persist or drop anything for the same
|
||||
// loadChunkDescs loads the chunkDescs for a series from disk. offsetFromEnd is
|
||||
// the number of chunkDescs to skip from the end of the series file. It is the
|
||||
// caller's responsibility to not persist or drop anything for the same
|
||||
// fingerprint concurrently.
|
||||
func (p *persistence) loadChunkDescs(fp clientmodel.Fingerprint, beforeTime clientmodel.Timestamp) ([]*chunkDesc, error) {
|
||||
func (p *persistence) loadChunkDescs(fp clientmodel.Fingerprint, offsetFromEnd int) ([]*chunkDesc, error) {
|
||||
f, err := p.openChunkFileForReading(fp)
|
||||
if os.IsNotExist(err) {
|
||||
return nil, nil
|
||||
|
@ -469,8 +470,8 @@ func (p *persistence) loadChunkDescs(fp clientmodel.Fingerprint, beforeTime clie
|
|||
)
|
||||
}
|
||||
|
||||
numChunks := int(fi.Size()) / chunkLenWithHeader
|
||||
cds := make([]*chunkDesc, 0, numChunks)
|
||||
numChunks := int(fi.Size())/chunkLenWithHeader - offsetFromEnd
|
||||
cds := make([]*chunkDesc, numChunks)
|
||||
chunkTimesBuf := make([]byte, 16)
|
||||
for i := 0; i < numChunks; i++ {
|
||||
_, err := f.Seek(offsetForChunkIndex(i)+chunkHeaderFirstTimeOffset, os.SEEK_SET)
|
||||
|
@ -482,15 +483,10 @@ func (p *persistence) loadChunkDescs(fp clientmodel.Fingerprint, beforeTime clie
|
|||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
cd := &chunkDesc{
|
||||
cds[i] = &chunkDesc{
|
||||
chunkFirstTime: clientmodel.Timestamp(binary.LittleEndian.Uint64(chunkTimesBuf)),
|
||||
chunkLastTime: clientmodel.Timestamp(binary.LittleEndian.Uint64(chunkTimesBuf[8:])),
|
||||
}
|
||||
if !cd.chunkLastTime.Before(beforeTime) {
|
||||
// From here on, we have chunkDescs in memory already.
|
||||
break
|
||||
}
|
||||
cds = append(cds, cd)
|
||||
}
|
||||
chunkDescOps.WithLabelValues(load).Add(float64(len(cds)))
|
||||
numMemChunkDescs.Add(float64(len(cds)))
|
||||
|
|
|
@ -122,7 +122,7 @@ func testPersistLoadDropChunks(t *testing.T, encoding chunkEncoding) {
|
|||
}
|
||||
}
|
||||
// Load all chunk descs.
|
||||
actualChunkDescs, err := p.loadChunkDescs(fp, 10)
|
||||
actualChunkDescs, err := p.loadChunkDescs(fp, 0)
|
||||
if len(actualChunkDescs) != 10 {
|
||||
t.Errorf("Got %d chunkDescs, want %d.", len(actualChunkDescs), 10)
|
||||
}
|
||||
|
@ -974,7 +974,7 @@ func BenchmarkLoadChunkDescs(b *testing.B) {
|
|||
for i := 0; i < b.N; i++ {
|
||||
for _, s := range fpStrings {
|
||||
fp.LoadFromString(s)
|
||||
cds, err := p.loadChunkDescs(fp, clientmodel.Latest)
|
||||
cds, err := p.loadChunkDescs(fp, 0)
|
||||
if err != nil {
|
||||
b.Error(err)
|
||||
}
|
||||
|
|
|
@ -384,7 +384,7 @@ func (s *memorySeries) preloadChunksForRange(
|
|||
firstChunkDescTime = s.chunkDescs[0].firstTime()
|
||||
}
|
||||
if s.chunkDescsOffset != 0 && from.Before(firstChunkDescTime) {
|
||||
cds, err := mss.loadChunkDescs(fp, firstChunkDescTime)
|
||||
cds, err := mss.loadChunkDescs(fp, s.persistWatermark)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
|
@ -589,7 +589,7 @@ func (s *memorySeriesStorage) getOrCreateSeries(fp clientmodel.Fingerprint, m cl
|
|||
// end up with a series without any chunkDescs for a
|
||||
// while (which is confusing as it makes the series
|
||||
// appear as archived or purged).
|
||||
cds, err = s.loadChunkDescs(fp, clientmodel.Latest)
|
||||
cds, err = s.loadChunkDescs(fp, 0)
|
||||
if err != nil {
|
||||
log.Errorf("Error loading chunk descs for fingerprint %v (metric %v): %v", fp, m, err)
|
||||
}
|
||||
|
@ -1107,8 +1107,8 @@ func (s *memorySeriesStorage) loadChunks(fp clientmodel.Fingerprint, indexes []i
|
|||
}
|
||||
|
||||
// See persistence.loadChunkDescs for detailed explanation.
|
||||
func (s *memorySeriesStorage) loadChunkDescs(fp clientmodel.Fingerprint, beforeTime clientmodel.Timestamp) ([]*chunkDesc, error) {
|
||||
return s.persistence.loadChunkDescs(fp, beforeTime)
|
||||
func (s *memorySeriesStorage) loadChunkDescs(fp clientmodel.Fingerprint, offsetFromEnd int) ([]*chunkDesc, error) {
|
||||
return s.persistence.loadChunkDescs(fp, offsetFromEnd)
|
||||
}
|
||||
|
||||
// getNumChunksToPersist returns numChunksToPersist in a goroutine-safe way.
|
||||
|
|
Loading…
Reference in a new issue