diff --git a/storage/local/crashrecovery.go b/storage/local/crashrecovery.go index 16950ff300..78e2e0aed3 100644 --- a/storage/local/crashrecovery.go +++ b/storage/local/crashrecovery.go @@ -42,6 +42,12 @@ func (p *persistence) recoverFromCrash(fingerprintToSeries map[clientmodel.Finge count := 0 seriesDirNameFmt := fmt.Sprintf("%%0%dx", seriesDirNameLen) + // Delete the fingerprint mapping file as it might be stale or + // corrupt. We'll rebuild the mappings as we go. + os.Remove(p.mappingsFileName()) + // The mappings to rebuild. + fpm := fpMappings{} + glog.Info("Scanning files.") for i := 0; i < 1<<(seriesDirNameLen*4); i++ { dirname := path.Join(p.basePath, fmt.Sprintf(seriesDirNameFmt, i)) @@ -58,7 +64,7 @@ func (p *persistence) recoverFromCrash(fingerprintToSeries map[clientmodel.Finge return err } for _, fi := range fis { - fp, ok := p.sanitizeSeries(dirname, fi, fingerprintToSeries) + fp, ok := p.sanitizeSeries(dirname, fi, fingerprintToSeries, fpm) if ok { fpsSeen[fp] = struct{}{} } @@ -75,7 +81,7 @@ func (p *persistence) recoverFromCrash(fingerprintToSeries map[clientmodel.Finge for fp, s := range fingerprintToSeries { if _, seen := fpsSeen[fp]; !seen { // fp exists in fingerprintToSeries, but has no representation on disk. - if s.headChunkClosed { + if s.persistWatermark == len(s.chunkDescs) { // Oops, everything including the head chunk was // already persisted, but nothing on disk. // Thus, we lost that series completely. Clean @@ -112,17 +118,24 @@ func (p *persistence) recoverFromCrash(fingerprintToSeries map[clientmodel.Finge s.persistWatermark = 0 s.chunkDescsOffset = 0 } + maybeAddMapping(fp, s.metric, fpm) fpsSeen[fp] = struct{}{} // Add so that fpsSeen is complete. } } glog.Info("Check for series without series file complete.") - if err := p.cleanUpArchiveIndexes(fingerprintToSeries, fpsSeen); err != nil { + if err := p.cleanUpArchiveIndexes(fingerprintToSeries, fpsSeen, fpm); err != nil { return err } if err := p.rebuildLabelIndexes(fingerprintToSeries); err != nil { return err } + // Finally rewrite the mappings file if there are any mappings. + if len(fpm) > 0 { + if err := p.checkpointFPMappings(fpm); err != nil { + return err + } + } p.setDirty(false) glog.Warning("Crash recovery complete.") @@ -156,7 +169,9 @@ func (p *persistence) recoverFromCrash(fingerprintToSeries map[clientmodel.Finge // is checked for its presence in the index of archived series. If it cannot // be found there, it is moved into the orphaned directory. func (p *persistence) sanitizeSeries( - dirname string, fi os.FileInfo, fingerprintToSeries map[clientmodel.Fingerprint]*memorySeries, + dirname string, fi os.FileInfo, + fingerprintToSeries map[clientmodel.Fingerprint]*memorySeries, + fpm fpMappings, ) (clientmodel.Fingerprint, bool) { filename := path.Join(dirname, fi.Name()) purge := func() { @@ -222,6 +237,7 @@ func (p *persistence) sanitizeSeries( if s == nil { panic("fingerprint mapped to nil pointer") } + maybeAddMapping(fp, s.metric, fpm) if !p.pedanticChecks && bytesToTrim == 0 && s.chunkDescsOffset != -1 && @@ -320,12 +336,14 @@ func (p *persistence) sanitizeSeries( return fp, false } // This series looks like a properly archived one. + maybeAddMapping(fp, metric, fpm) return fp, true } func (p *persistence) cleanUpArchiveIndexes( fpToSeries map[clientmodel.Fingerprint]*memorySeries, fpsSeen map[clientmodel.Fingerprint]struct{}, + fpm fpMappings, ) error { glog.Info("Cleaning up archive indexes.") var fp codable.Fingerprint @@ -359,7 +377,12 @@ func (p *persistence) cleanUpArchiveIndexes( _, err := p.archivedFingerprintToTimeRange.Delete(fp) return err } - // fp is legitimately archived. Make sure it is in timerange index, too. + // fp is legitimately archived. Now we need the metric to check for a mapped fingerprint. + if err := kv.Value(&m); err != nil { + return err + } + maybeAddMapping(clientmodel.Fingerprint(fp), clientmodel.Metric(m), fpm) + // Make sure it is in timerange index, too. has, err := p.archivedFingerprintToTimeRange.Has(fp) if err != nil { return err @@ -372,9 +395,6 @@ func (p *persistence) cleanUpArchiveIndexes( if _, err := p.archivedFingerprintToMetrics.Delete(fp); err != nil { return err } - if err := kv.Value(&m); err != nil { - return err - } series := newMemorySeries(clientmodel.Metric(m), false, clientmodel.Earliest) cds, err := p.loadChunkDescs(clientmodel.Fingerprint(fp), clientmodel.Now()) if err != nil { @@ -455,3 +475,20 @@ func (p *persistence) rebuildLabelIndexes( glog.Info("All requests for rebuilding the label indexes queued. (Actual processing may lag behind.)") return nil } + +// maybeAddMapping adds a fingerprint mapping to fpm if the FastFingerprint of m is different from fp. +func maybeAddMapping(fp clientmodel.Fingerprint, m clientmodel.Metric, fpm fpMappings) { + if rawFP := m.FastFingerprint(); rawFP != fp { + glog.Warningf( + "Metric %v with fingerprint %v is mapped from raw fingerprint %v.", + m, fp, rawFP, + ) + if mappedFPs, ok := fpm[rawFP]; ok { + mappedFPs[metricToUniqueString(m)] = fp + } else { + fpm[rawFP] = map[string]clientmodel.Fingerprint{ + metricToUniqueString(m): fp, + } + } + } +} diff --git a/storage/local/persistence.go b/storage/local/persistence.go index f26ac76fbc..7b2a34cecd 100644 --- a/storage/local/persistence.go +++ b/storage/local/persistence.go @@ -1418,7 +1418,7 @@ loop: // (4.3.1) The uvarint-encoded length of the unique metric string. // (4.3.2) The unique metric string. // (4.3.3) The mapped fingerprint as big-endian uint64. -func (p *persistence) checkpointFPMappings(c fpMappings) (err error) { +func (p *persistence) checkpointFPMappings(fpm fpMappings) (err error) { glog.Info("Checkpointing fingerprint mappings...") begin := time.Now() f, err := os.OpenFile(p.mappingsTempFileName(), os.O_WRONLY|os.O_TRUNC|os.O_CREATE, 0640) @@ -1449,11 +1449,11 @@ func (p *persistence) checkpointFPMappings(c fpMappings) (err error) { if _, err = codable.EncodeUvarint(w, mappingsFormatVersion); err != nil { return } - if _, err = codable.EncodeUvarint(w, uint64(len(c))); err != nil { + if _, err = codable.EncodeUvarint(w, uint64(len(fpm))); err != nil { return } - for fp, mappings := range c { + for fp, mappings := range fpm { if err = codable.EncodeUint64(w, uint64(fp)); err != nil { return }