diff --git a/main.go b/main.go
index a455a37aa9..17c32bedc4 100644
--- a/main.go
+++ b/main.go
@@ -60,6 +60,8 @@ var (
 	checkpointInterval = flag.Duration("storage.checkpointInterval", 5*time.Minute, "The period at which the in-memory index of time series is checkpointed.")
 
+	storageDirty = flag.Bool("storage.dirty", false, "If set, the storage layer will perform crash recovery even if the last shutdown appears to be clean.")
+
 	notificationQueueCapacity = flag.Int("alertmanager.notificationQueueCapacity", 100, "The size of the queue for pending alert manager notifications.")
 
 	printVersion = flag.Bool("version", false, "print version information")
@@ -107,6 +109,7 @@ func NewPrometheus() *prometheus {
 		PersistencePurgeInterval:   *storagePurgeInterval,
 		PersistenceRetentionPeriod: *storageRetentionPeriod,
 		CheckpointInterval:         *checkpointInterval,
+		Dirty:                      *storageDirty,
 	}
 	memStorage, err := local.NewMemorySeriesStorage(o)
 	if err != nil {
diff --git a/storage/local/index/index.go b/storage/local/index/index.go
index fe272fbe70..2aa55e89d4 100644
--- a/storage/local/index/index.go
+++ b/storage/local/index/index.go
@@ -18,6 +18,7 @@ package index
 
 import (
 	"flag"
+	"os"
 	"path"
 
 	clientmodel "github.com/prometheus/client_golang/model"
@@ -174,6 +175,12 @@ func NewLabelNameLabelValuesIndex(basePath string) (*LabelNameLabelValuesIndex,
 	}, nil
 }
 
+// DeleteLabelNameLabelValuesIndex deletes the LevelDB-backed
+// LabelNameLabelValuesIndex. Use only on an index that has not yet been opened.
+func DeleteLabelNameLabelValuesIndex(basePath string) error {
+	return os.RemoveAll(path.Join(basePath, labelNameToLabelValuesDir))
+}
+
 // LabelPairFingerprintsMapping is an in-memory map of label pairs to
 // fingerprints.
 type LabelPairFingerprintsMapping map[metric.LabelPair]codable.FingerprintSet
@@ -242,6 +249,12 @@ func NewLabelPairFingerprintIndex(basePath string) (*LabelPairFingerprintIndex,
 	}, nil
 }
 
+// DeleteLabelPairFingerprintIndex deletes the LevelDB-backed
+// LabelPairFingerprintIndex. Use only on an index that has not yet been opened.
+func DeleteLabelPairFingerprintIndex(basePath string) error {
+	return os.RemoveAll(path.Join(basePath, labelPairToFingerprintsDir))
+}
+
 // FingerprintTimeRangeIndex models a database tracking the time ranges
 // of metrics by their fingerprints.
 type FingerprintTimeRangeIndex struct {
@@ -259,13 +272,6 @@ func (i *FingerprintTimeRangeIndex) Lookup(fp clientmodel.Fingerprint) (firstTim
 	return tr.First, tr.Last, ok, err
 }
 
-// Has returns true if the given fingerprint is present.
-//
-// This method is goroutine-safe.
-func (i *FingerprintTimeRangeIndex) Has(fp clientmodel.Fingerprint) (ok bool, err error) {
-	return i.KeyValueStore.Has(codable.Fingerprint(fp))
-}
-
 // NewFingerprintTimeRangeIndex returns a LevelDB-backed
 // FingerprintTimeRangeIndex ready to use.
 func NewFingerprintTimeRangeIndex(basePath string) (*FingerprintTimeRangeIndex, error) {
diff --git a/storage/local/persistence.go b/storage/local/persistence.go
index e9d116db2b..525a2b729c 100644
--- a/storage/local/persistence.go
+++ b/storage/local/persistence.go
@@ -18,8 +18,10 @@ import (
 	"encoding/binary"
 	"fmt"
 	"io"
+	"math"
 	"os"
 	"path"
+	"strings"
 	"sync"
 	"sync/atomic"
 	"time"
@@ -52,7 +54,7 @@ const (
 	indexingMaxBatchSize  = 1024 * 1024
 	indexingBatchTimeout  = 500 * time.Millisecond // Commit batch when idle for that long.
-	indexingQueueCapacity = 1024
+	indexingQueueCapacity = 1024 * 16
 )
 
 const (
@@ -103,14 +105,17 @@ type persistence struct {
 	indexingBatchSizes   prometheus.Summary
 	indexingBatchLatency prometheus.Summary
 	checkpointDuration   prometheus.Gauge
+
+	dirtyMtx sync.Mutex // Protects dirty and becameDirty.
+
+	dirty, becameDirty bool
 }
 
 // newPersistence returns a newly allocated persistence backed by local disk storage, ready to use.
-func newPersistence(basePath string, chunkLen int) (*persistence, error) {
+func newPersistence(basePath string, chunkLen int, dirty bool) (*persistence, error) {
 	if err := os.MkdirAll(basePath, 0700); err != nil {
 		return nil, err
 	}
-	var err error
 	archivedFingerprintToMetrics, err := index.NewFingerprintMetricIndex(basePath)
 	if err != nil {
 		return nil, err
@@ -119,14 +124,6 @@ func newPersistence(basePath string, chunkLen int) (*persistence, error) {
 	if err != nil {
 		return nil, err
 	}
-	labelPairToFingerprints, err := index.NewLabelPairFingerprintIndex(basePath)
-	if err != nil {
-		return nil, err
-	}
-	labelNameToLabelValues, err := index.NewLabelNameLabelValuesIndex(basePath)
-	if err != nil {
-		return nil, err
-	}
 
 	p := &persistence{
 		basePath: basePath,
 		archivedFingerprintToMetrics:   archivedFingerprintToMetrics,
 		archivedFingerprintToTimeRange: archivedFingerprintToTimeRange,
-		labelPairToFingerprints:        labelPairToFingerprints,
-		labelNameToLabelValues:         labelNameToLabelValues,
 		indexingQueue:   make(chan indexingOp, indexingQueueCapacity),
 		indexingStopped: make(chan struct{}),
@@ -178,7 +173,36 @@ func newPersistence(basePath string, chunkLen int) (*persistence, error) {
 			Name:      "checkpoint_duration_milliseconds",
 			Help:      "The duration (in milliseconds) it took to checkpoint in-memory metrics and head chunks.",
 		}),
+		dirty: dirty,
 	}
+	if dirtyFile, err := os.OpenFile(p.dirtyFileName(), os.O_CREATE|os.O_EXCL, 0666); err == nil {
+		dirtyFile.Close()
+	} else if os.IsExist(err) {
+		p.dirty = true
+	} else {
+		return nil, err
+	}
+
+	if p.dirty {
+		// Blow away the label indexes. We'll rebuild them later.
+		if err := index.DeleteLabelPairFingerprintIndex(basePath); err != nil {
+			return nil, err
+		}
+		if err := index.DeleteLabelNameLabelValuesIndex(basePath); err != nil {
+			return nil, err
+		}
+	}
+	labelPairToFingerprints, err := index.NewLabelPairFingerprintIndex(basePath)
+	if err != nil {
+		return nil, err
+	}
+	labelNameToLabelValues, err := index.NewLabelNameLabelValuesIndex(basePath)
+	if err != nil {
+		return nil, err
+	}
+	p.labelPairToFingerprints = labelPairToFingerprints
+	p.labelNameToLabelValues = labelNameToLabelValues
+
 	go p.processIndexingQueue()
 	return p, nil
 }
@@ -203,6 +227,374 @@ func (p *persistence) Collect(ch chan<- prometheus.Metric) {
 	ch <- p.checkpointDuration
 }
 
+// dirtyFileName returns the name of the (empty) file used to mark the
+// persistence layer as dirty.
+func (p *persistence) dirtyFileName() string {
+	return path.Join(p.basePath, "DIRTY")
+}
+
+// isDirty returns the dirty flag in a goroutine-safe way.
+func (p *persistence) isDirty() bool {
+	p.dirtyMtx.Lock()
+	defer p.dirtyMtx.Unlock()
+	return p.dirty
+}
+
+// setDirty sets the dirty flag in a goroutine-safe way. Once the dirty flag
+// has been set to true with this method, it cannot be set to false again. (If
+// we became dirty during our runtime, there is no way back. If we were dirty
+// from the start, a clean-up might make us clean again.)
+func (p *persistence) setDirty(dirty bool) {
+	p.dirtyMtx.Lock()
+	defer p.dirtyMtx.Unlock()
+	if p.becameDirty {
+		return
+	}
+	p.dirty = dirty
+	if dirty {
+		p.becameDirty = true
+		glog.Error("The storage is now inconsistent. Restart Prometheus ASAP to initiate recovery.")
+	}
+}
+
+// crashRecovery is called by loadSeriesMapAndHeads if the persistence appears
+// to be dirty after the loading (either because the loading resulted in an
+// error or because the persistence was dirty from the start).
+func (p *persistence) crashRecovery(fingerprintToSeries map[clientmodel.Fingerprint]*memorySeries) error {
+	glog.Warning("Starting crash recovery. Prometheus is unavailable until the recovery is complete.")
+
+	fpsSeen := map[clientmodel.Fingerprint]struct{}{}
+	count := 0
+
+	glog.Info("Scanning files.")
+	for i := 0; i < 256; i++ {
+		dirname := path.Join(p.basePath, fmt.Sprintf("%02x", i))
+		dir, err := os.Open(dirname)
+		if os.IsNotExist(err) {
+			continue
+		}
+		if err != nil {
+			return err
+		}
+		defer dir.Close()
+		var fis []os.FileInfo
+		for ; err != io.EOF; fis, err = dir.Readdir(1024) {
+			if err != nil {
+				return err
+			}
+			for _, fi := range fis {
+				fp, ok := p.sanitizeSeries(dirname, fi, fingerprintToSeries)
+				if ok {
+					fpsSeen[fp] = struct{}{}
+				}
+				count++
+				if count%10000 == 0 {
+					glog.Infof("%d files scanned.", count)
+				}
+			}
+		}
+	}
+	glog.Infof("File scan complete. %d fingerprints found.", len(fpsSeen))
+
+	glog.Info("Checking for series without series file.")
+	for fp, s := range fingerprintToSeries {
+		if _, seen := fpsSeen[fp]; !seen {
+			// fp exists in fingerprintToSeries, but has no representation on disk.
+			if s.headChunkPersisted {
+				// Oops, head chunk was persisted, but nothing on disk.
+				// Thus, we lost that series completely. Clean up the remnants.
+				delete(fingerprintToSeries, fp)
+				if err := p.dropArchivedMetric(fp); err != nil {
+					// Dropping the archived metric didn't work, so try
+					// to unindex it, just in case it's in the indexes.
+					p.unindexMetric(fp, s.metric)
+				}
+				glog.Warningf("Lost series detected: fingerprint %v, metric %v.", fp, s.metric)
+				continue
+			}
+			// If we are here, the only chunk we have is the head chunk.
+			// Adjust things accordingly.
+			if len(s.chunkDescs) > 1 || s.chunkDescsOffset != 0 {
+				glog.Warningf(
+					"Lost at least %d chunks for fingerprint %v, metric %v.",
+					len(s.chunkDescs)+s.chunkDescsOffset-1, fp, s.metric,
+					// If chunkDescsOffset is -1, this will underreport. Oh well...
+				)
+				s.chunkDescs = s.chunkDescs[len(s.chunkDescs)-1:]
+				s.chunkDescsOffset = 0
+			}
+			fpsSeen[fp] = struct{}{} // Add so that fpsSeen is complete.
+		}
+	}
+	glog.Info("Check for series without series file complete.")
+
+	if err := p.cleanUpArchiveIndexes(fingerprintToSeries, fpsSeen); err != nil {
+		return err
+	}
+	if err := p.rebuildLabelIndexes(fingerprintToSeries); err != nil {
+		return err
+	}
+
+	p.setDirty(false)
+	glog.Warning("Crash recovery complete.")
+	return nil
+}
+
+// sanitizeSeries reconciles the series file named by dirname and fi with the
+// in-memory state in fingerprintToSeries (if any). It returns the fingerprint
+// derived from the directory and file name, and whether the series could be
+// brought into a consistent state. Series files that cannot be reconciled are
+// deleted; files with trailing garbage are truncated to whole chunks.
+func (p *persistence) sanitizeSeries(dirname string, fi os.FileInfo, fingerprintToSeries map[clientmodel.Fingerprint]*memorySeries) (clientmodel.Fingerprint, bool) {
+	filename := path.Join(dirname, fi.Name())
+	purge := func() {
+		glog.Warningf("Deleting lost series file %s.", filename) // TODO: Move to lost+found directory?
+		os.Remove(filename)
+	}
+
+	var fp clientmodel.Fingerprint
+	if len(fi.Name()) != 17 || !strings.HasSuffix(fi.Name(), ".db") {
+		glog.Warningf("Unexpected series file name %s.", filename)
+		purge()
+		return fp, false
+	}
+	fp.LoadFromString(path.Base(dirname) + fi.Name()[:14]) // TODO: Panics if that doesn't parse as hex.
+
+	bytesToTrim := fi.Size() % int64(p.chunkLen+chunkHeaderLen)
+	chunksInFile := int(fi.Size()) / (p.chunkLen + chunkHeaderLen)
+	if bytesToTrim != 0 {
+		glog.Warningf(
+			"Truncating file %s to exactly %d chunks, trimming %d extraneous bytes.",
+			filename, chunksInFile, bytesToTrim,
+		)
+		f, err := os.OpenFile(filename, os.O_WRONLY, 0640)
+		if err != nil {
+			glog.Errorf("Could not open file %s: %s", filename, err)
+			purge()
+			return fp, false
+		}
+		if err := f.Truncate(fi.Size() - bytesToTrim); err != nil {
+			glog.Errorf("Failed to truncate file %s: %s", filename, err)
+			purge()
+			return fp, false
+		}
+	}
+	if chunksInFile == 0 {
+		glog.Warningf("No chunks left in file %s.", filename)
+		purge()
+		return fp, false
+	}
+
+	s, ok := fingerprintToSeries[fp]
+	if ok {
+		// This series is not supposed to be archived.
+		if s == nil {
+			panic("fingerprint mapped to nil pointer")
+		}
+		if bytesToTrim == 0 && s.chunkDescsOffset != -1 &&
+			((s.headChunkPersisted && chunksInFile == s.chunkDescsOffset+len(s.chunkDescs)) ||
+				(!s.headChunkPersisted && chunksInFile == s.chunkDescsOffset+len(s.chunkDescs)-1)) {
+			// Everything is consistent. We are good.
+			return fp, true
+		}
+		// If we are here, something's fishy.
+		if s.headChunkPersisted {
+			// This is the easy case as we don't have a head chunk
+			// in heads.db. Treat this series as a freshly
+			// unarchived one. No chunks or chunkDescs in memory, no
+			// current head chunk.
+			glog.Warningf(
+				"Treating recovered metric %v, fingerprint %v, as freshly unarchived, with %d chunks in series file.",
+				s.metric, fp, chunksInFile,
+			)
+			s.chunkDescs = nil
+			s.chunkDescsOffset = -1
+			return fp, true
+		}
+		// This is the tricky one: We have a head chunk from heads.db,
+		// but the very same head chunk might already be in the series
+		// file. Strategy: Compare the first time of the last chunk in
+		// the series file with the first time of the head chunk from
+		// heads.db. If the former is the same or newer, assume the
+		// latest chunk in the series file is the most recent head
+		// chunk. If not, keep the head chunk we got from heads.db.
+		// First, assume the head chunk is not yet persisted.
+		s.chunkDescs = s.chunkDescs[len(s.chunkDescs)-1:]
+		s.chunkDescsOffset = -1
+		// Load all the chunk descs (which assumes we have none from the future).
+		cds, err := p.loadChunkDescs(fp, clientmodel.Now())
+		if err != nil {
+			glog.Errorf(
+				"Failed to load chunk descriptors for metric %v, fingerprint %v: %s",
+				s.metric, fp, err,
+			)
+			purge()
+			return fp, false
+		}
+		if cds[len(cds)-1].firstTime().Before(s.head().firstTime()) {
+			s.chunkDescs = append(cds, s.chunkDescs...)
+			glog.Warningf(
+				"Recovered metric %v, fingerprint %v: recovered %d chunks from series file, recovered head chunk from checkpoint.",
+				s.metric, fp, chunksInFile,
+			)
+		} else {
+			glog.Warningf(
+				"Recovered metric %v, fingerprint %v: head chunk found among the %d recovered chunks in series file.",
+				s.metric, fp, chunksInFile,
+			)
+			s.chunkDescs = cds
+			s.headChunkPersisted = true
+		}
+		s.chunkDescsOffset = 0
+		return fp, true
+	}
+	// This series is supposed to be archived.
+	metric, err := p.getArchivedMetric(fp)
+	if err != nil {
+		glog.Errorf(
+			"Fingerprint %v assumed archived but couldn't be looked up in archived index: %s",
+			fp, err,
+		)
+		purge()
+		return fp, false
+	}
+	if metric == nil {
+		glog.Warningf(
+			"Fingerprint %v assumed archived but couldn't be found in archived index.",
+			fp,
+		)
+		purge()
+		return fp, false
+	}
+	// This series looks like a properly archived one.
+	return fp, true
+}
+
+func (p *persistence) cleanUpArchiveIndexes(
+	fpToSeries map[clientmodel.Fingerprint]*memorySeries,
+	fpsSeen map[clientmodel.Fingerprint]struct{},
+) error {
+	glog.Info("Cleaning up archive indexes.")
+	var fp codable.Fingerprint
+	var m codable.Metric
+	count := 0
+	if err := p.archivedFingerprintToMetrics.ForEach(func(kv index.KeyValueAccessor) error {
+		count++
+		if count%10000 == 0 {
+			glog.Infof("%d archived metrics checked.", count)
+		}
+		if err := kv.Key(&fp); err != nil {
+			return err
+		}
+		_, fpSeen := fpsSeen[clientmodel.Fingerprint(fp)]
+		inMemory := false
+		if fpSeen {
+			_, inMemory = fpToSeries[clientmodel.Fingerprint(fp)]
+		}
+		if !fpSeen || inMemory {
+			if inMemory {
+				glog.Warningf("Archive clean-up: Fingerprint %v is not archived. Purging from archive indexes.", clientmodel.Fingerprint(fp))
+			}
+			if !fpSeen {
+				glog.Warningf("Archive clean-up: Fingerprint %v is unknown. Purging from archive indexes.", clientmodel.Fingerprint(fp))
+			}
+			if err := p.archivedFingerprintToMetrics.Delete(fp); err != nil {
+				return err
+			}
+			// Delete from the time-range index, too.
+			p.archivedFingerprintToTimeRange.Delete(fp)
+			// TODO: The error is ignored here because fp might not be in the
+			// time-range index at all (which is fine), yet a missing entry
+			// would also be reported as an error. The Delete signature could
+			// be changed (like the Get signature) to tell a real error apart.
+			return nil
+		}
+		// fp is legitimately archived. Make sure it is in the time-range index, too.
+		has, err := p.archivedFingerprintToTimeRange.Has(fp)
+		if err != nil {
+			return err
+		}
+		if has {
+			return nil // All good.
+		}
+		glog.Warningf("Archive clean-up: Fingerprint %v is not in the time-range index. Unarchiving it for recovery.", clientmodel.Fingerprint(fp))
+		if err := p.archivedFingerprintToMetrics.Delete(fp); err != nil {
+			return err
+		}
+		if err := kv.Value(&m); err != nil {
+			return err
+		}
+		series := newMemorySeries(clientmodel.Metric(m), false, math.MinInt64)
+		cds, err := p.loadChunkDescs(clientmodel.Fingerprint(fp), clientmodel.Now())
+		if err != nil {
+			return err
+		}
+		series.chunkDescs = cds
+		series.chunkDescsOffset = 0
+		fpToSeries[clientmodel.Fingerprint(fp)] = series
+		return nil
+	}); err != nil {
+		return err
+	}
+	count = 0
+	if err := p.archivedFingerprintToTimeRange.ForEach(func(kv index.KeyValueAccessor) error {
+		count++
+		if count%10000 == 0 {
+			glog.Infof("%d archived time ranges checked.", count)
+		}
+		if err := kv.Key(&fp); err != nil {
+			return err
+		}
+		has, err := p.archivedFingerprintToMetrics.Has(fp)
+		if err != nil {
+			return err
+		}
+		if has {
+			return nil // All good.
+		}
+		glog.Warningf("Archive clean-up: Purging unknown fingerprint %v in time-range index.", fp)
+		if err := p.archivedFingerprintToTimeRange.Delete(fp); err != nil {
+			return err
+		}
+		return nil
+	}); err != nil {
+		return err
+	}
+	glog.Info("Clean-up of archive indexes complete.")
+	return nil
+}
+
+func (p *persistence) rebuildLabelIndexes(
+	fpToSeries map[clientmodel.Fingerprint]*memorySeries,
+) error {
+	count := 0
+	glog.Info("Rebuilding label indexes.")
+	glog.Info("Indexing metrics in memory.")
+	for fp, s := range fpToSeries {
+		p.indexMetric(fp, s.metric)
+		count++
+		if count%10000 == 0 {
+			glog.Infof("%d metrics queued for indexing.", count)
+		}
+	}
+	glog.Info("Indexing archived metrics.")
+	var fp codable.Fingerprint
+	var m codable.Metric
+	if err := p.archivedFingerprintToMetrics.ForEach(func(kv index.KeyValueAccessor) error {
+		if err := kv.Key(&fp); err != nil {
+			return err
+		}
+		if err := kv.Value(&m); err != nil {
+			return err
+		}
+		p.indexMetric(clientmodel.Fingerprint(fp), clientmodel.Metric(m))
+		count++
+		if count%10000 == 0 {
+			glog.Infof("%d metrics queued for indexing.", count)
+		}
+		return nil
+	}); err != nil {
+		return err
+	}
+	glog.Info("All requests for rebuilding the label indexes queued. (Actual processing may lag behind.)")
+	return nil
+}
+
 // getFingerprintsForLabelPair returns the fingerprints for the given label
 // pair. This method is goroutine-safe but take into account that metrics queued
 // for indexing with IndexMetric might not yet made it into the index. (Same
@@ -492,63 +884,100 @@ func (p *persistence) checkpointSeriesMapAndHeads(fingerprintToSeries *seriesMap
 }
 
 // loadSeriesMapAndHeads loads the fingerprint to memory-series mapping and all
-// open (non-full) head chunks. Only call this method during start-up while
-// nothing else is running in storage land. This method is utterly
-// goroutine-unsafe.
-func (p *persistence) loadSeriesMapAndHeads() (*seriesMap, error) {
+// open (non-full) head chunks. If recoverable corruption is detected, or if the
+// dirty flag was set from the beginning, crash recovery is run, which might
+// take a while. If an unrecoverable error is encountered, it is returned. Call
+// this method during start-up while nothing else is running in storage
+// land. This method is utterly goroutine-unsafe.
+func (p *persistence) loadSeriesMapAndHeads() (sm *seriesMap, err error) {
 	var chunksTotal, chunkDescsTotal int64
+	fingerprintToSeries := make(map[clientmodel.Fingerprint]*memorySeries)
+	sm = &seriesMap{m: fingerprintToSeries}
+
+	defer func() {
+		if sm != nil && p.dirty {
+			glog.Warning("Persistence layer appears dirty.")
+			err = p.crashRecovery(fingerprintToSeries)
+			if err != nil {
+				sm = nil
+			}
+		}
+		if err == nil {
+			atomic.AddInt64(&numMemChunks, chunksTotal)
+			atomic.AddInt64(&numMemChunkDescs, chunkDescsTotal)
+		}
+	}()
 
 	f, err := os.Open(p.headsFileName())
 	if os.IsNotExist(err) {
-		return newSeriesMap(), nil
+		return sm, nil
 	}
 	if err != nil {
-		return nil, err
+		glog.Warning("Could not open heads file:", err)
+		p.dirty = true
+		return
 	}
 	defer f.Close()
 	r := bufio.NewReaderSize(f, fileBufSize)
 
 	buf := make([]byte, len(headsMagicString))
 	if _, err := io.ReadFull(r, buf); err != nil {
-		return nil, err
+		glog.Warning("Could not read from heads file:", err)
+		p.dirty = true
+		return sm, nil
 	}
 	magic := string(buf)
 	if magic != headsMagicString {
-		return nil, fmt.Errorf(
+		glog.Warningf(
 			"unexpected magic string, want %q, got %q",
 			headsMagicString, magic,
 		)
+		p.dirty = true
+		return
 	}
 	if version, err := binary.ReadVarint(r); version != headsFormatVersion || err != nil {
-		return nil, fmt.Errorf("unknown heads format version, want %d", headsFormatVersion)
+		glog.Warningf("unknown heads format version, want %d", headsFormatVersion)
+		p.dirty = true
+		return sm, nil
 	}
 	numSeries, err := codable.DecodeUint64(r)
 	if err != nil {
-		return nil, err
+		glog.Warning("Could not decode number of series:", err)
+		p.dirty = true
+		return sm, nil
 	}
-	fingerprintToSeries := make(map[clientmodel.Fingerprint]*memorySeries, numSeries)
 
 	for ; numSeries > 0; numSeries-- {
 		seriesFlags, err := r.ReadByte()
 		if err != nil {
-			return nil, err
+			glog.Warning("Could not read series flags:", err)
+			p.dirty = true
+			return sm, nil
 		}
 		headChunkPersisted := seriesFlags&flagHeadChunkPersisted != 0
 		fp, err := codable.DecodeUint64(r)
 		if err != nil {
-			return nil, err
+			glog.Warning("Could not decode fingerprint:", err)
+			p.dirty = true
+			return sm, nil
 		}
 		var metric codable.Metric
 		if err := metric.UnmarshalFromReader(r); err != nil {
-			return nil, err
+			glog.Warning("Could not decode metric:", err)
+			p.dirty = true
+			return sm, nil
 		}
 		chunkDescsOffset, err := binary.ReadVarint(r)
 		if err != nil {
-			return nil, err
+			glog.Warning("Could not decode chunk descriptor offset:", err)
+			p.dirty = true
+			return sm, nil
 		}
 		numChunkDescs, err := binary.ReadVarint(r)
 		if err != nil {
-			return nil, err
+			glog.Warning("Could not decode number of chunk descriptors:", err)
+			p.dirty = true
+			return sm, nil
 		}
 		chunkDescs := make([]*chunkDesc, numChunkDescs)
 		chunkDescsTotal += numChunkDescs
@@ -557,11 +986,15 @@ func (p *persistence) loadSeriesMapAndHeads() (*seriesMap, error) {
 			if headChunkPersisted || i < numChunkDescs-1 {
 				firstTime, err := binary.ReadVarint(r)
 				if err != nil {
-					return nil, err
+					glog.Warning("Could not decode first time:", err)
+					p.dirty = true
+					return sm, nil
 				}
 				lastTime, err := binary.ReadVarint(r)
 				if err != nil {
-					return nil, err
+					glog.Warning("Could not decode last time:", err)
+					p.dirty = true
+					return sm, nil
 				}
 				chunkDescs[i] = &chunkDesc{
 					chunkFirstTime: clientmodel.Timestamp(firstTime),
@@ -572,11 +1005,15 @@ func (p *persistence) loadSeriesMapAndHeads() (*seriesMap, error) {
 				chunksTotal++
 				chunkType, err := r.ReadByte()
 				if err != nil {
-					return nil, err
+					glog.Warning("Could not decode chunk type:", err)
+					p.dirty = true
+					return sm, nil
 				}
 				chunk := chunkForType(chunkType)
 				if err := chunk.unmarshal(r); err != nil {
-					return nil, err
+					glog.Warning("Could not unmarshal chunk:", err)
+					p.dirty = true
+					return sm, nil
 				}
 				chunkDescs[i] = newChunkDesc(chunk)
 			}
@@ -589,9 +1026,7 @@ func (p *persistence) loadSeriesMapAndHeads() (*seriesMap, error) {
 			headChunkPersisted: headChunkPersisted,
 		}
 	}
-	atomic.AddInt64(&numMemChunks, chunksTotal)
-	atomic.AddInt64(&numMemChunkDescs, chunkDescsTotal)
-	return &seriesMap{m: fingerprintToSeries}, nil
+	return sm, nil
 }
 
 // dropChunks deletes all chunks from a series whose last sample time is before
@@ -778,27 +1213,29 @@ func (p *persistence) dropArchivedMetric(fp clientmodel.Fingerprint) error {
 }
 
 // unarchiveMetric deletes an archived fingerprint and its metric, but (in
-// contrast to dropArchivedMetric) does not un-index the metric. The method
-// returns true if a metric was actually deleted. This method is goroutine-safe.
-func (p *persistence) unarchiveMetric(fp clientmodel.Fingerprint) (bool, error) {
+// contrast to dropArchivedMetric) does not un-index the metric. If a metric
+// was actually deleted, the method returns true and the first time of the
+// deleted metric. This method is goroutine-safe.
+func (p *persistence) unarchiveMetric(fp clientmodel.Fingerprint) (bool, clientmodel.Timestamp, error) {
 	p.archiveMtx.Lock()
 	defer p.archiveMtx.Unlock()
 
-	has, err := p.archivedFingerprintToTimeRange.Has(fp)
+	firstTime, _, has, err := p.archivedFingerprintToTimeRange.Lookup(fp)
 	if err != nil || !has {
-		return false, err
+		return false, firstTime, err
 	}
 	if err := p.archivedFingerprintToMetrics.Delete(codable.Fingerprint(fp)); err != nil {
-		return false, err
+		return false, firstTime, err
 	}
 	if err := p.archivedFingerprintToTimeRange.Delete(codable.Fingerprint(fp)); err != nil {
-		return false, err
+		return false, firstTime, err
 	}
-	return true, nil
+	return true, firstTime, nil
 }
 
 // close flushes the indexing queue and other buffered data and releases any
-// held resources.
+// held resources. It also removes the dirty marker file if successful and if
+// the persistence is currently not marked as dirty.
 func (p *persistence) close() error {
 	close(p.indexingQueue)
 	<-p.indexingStopped
@@ -820,22 +1257,25 @@ func (p *persistence) close() error {
 		lastError = err
 		glog.Error("Error closing labelNameToLabelValues index DB: ", err)
 	}
+	if lastError == nil && !p.isDirty() {
+		lastError = os.Remove(p.dirtyFileName())
+	}
 	return lastError
 }
 
 func (p *persistence) dirNameForFingerprint(fp clientmodel.Fingerprint) string {
 	fpStr := fp.String()
-	return fmt.Sprintf("%s/%c%c", p.basePath, fpStr[0], fpStr[1])
+	return path.Join(p.basePath, fpStr[0:2])
 }
 
 func (p *persistence) fileNameForFingerprint(fp clientmodel.Fingerprint) string {
 	fpStr := fp.String()
-	return fmt.Sprintf("%s/%c%c/%s%s", p.basePath, fpStr[0], fpStr[1], fpStr[2:], seriesFileSuffix)
+	return path.Join(p.basePath, fpStr[0:2], fpStr[2:]+seriesFileSuffix)
}
 
 func (p *persistence) tempFileNameForFingerprint(fp clientmodel.Fingerprint) string {
 	fpStr := fp.String()
-	return fmt.Sprintf("%s/%c%c/%s%s", p.basePath, fpStr[0], fpStr[1], fpStr[2:], seriesTempFileSuffix)
+	return path.Join(p.basePath, fpStr[0:2], fpStr[2:]+seriesTempFileSuffix)
 }
 
 func (p *persistence) openChunkFileForWriting(fp clientmodel.Fingerprint) (*os.File, error) {
diff --git a/storage/local/persistence_test.go b/storage/local/persistence_test.go
index 262c35bd81..0f9e245485 100644
--- a/storage/local/persistence_test.go
+++ b/storage/local/persistence_test.go
@@ -33,7 +33,7 @@ var (
 
 func newTestPersistence(t *testing.T) (*persistence, test.Closer) {
 	dir := test.NewTemporaryDirectory("test_persistence", t)
-	p, err := newPersistence(dir.Path(), 1024)
+	p, err := newPersistence(dir.Path(), 1024, false)
 	if err != nil {
 		dir.Close()
 		t.Fatal(err)
@@ -183,9 +183,9 @@ func TestCheckpointAndLoadSeriesMapAndHeads(t *testing.T) {
 	fpLocker := newFingerprintLocker(10)
 	sm := newSeriesMap()
-	s1 := newMemorySeries(m1, true)
-	s2 := newMemorySeries(m2, false)
-	s3 := newMemorySeries(m3, false)
+	s1 := newMemorySeries(m1, true, 0)
+	s2 := newMemorySeries(m2, false, 0)
+	s3 := newMemorySeries(m3, false, 0)
 	s1.add(m1.Fingerprint(), &metric.SamplePair{Timestamp: 1, Value: 3.14})
 	s3.add(m1.Fingerprint(), &metric.SamplePair{Timestamp: 2, Value: 2.7})
 	s3.headChunkPersisted = true
@@ -269,14 +269,17 @@ func TestGetFingerprintsModifiedBefore(t *testing.T) {
 		}
 	}
 
-	unarchived, err := p.unarchiveMetric(1)
+	unarchived, firstTime, err := p.unarchiveMetric(1)
 	if err != nil {
 		t.Fatal(err)
 	}
 	if !unarchived {
 		t.Fatal("expected actual unarchival")
 	}
-	unarchived, err = p.unarchiveMetric(1)
+	if firstTime != 2 {
+		t.Errorf("expected first time 2, got %v", firstTime)
+	}
+	unarchived, firstTime, err = p.unarchiveMetric(1)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -535,13 +538,16 @@ func TestIndexing(t *testing.T) {
 		verifyIndexedState(i, t, batches[i], indexedFpsToMetrics, p)
 		for fp, m := range b.fpToMetric {
 			p.unindexMetric(fp, m)
-			unarchived, err := p.unarchiveMetric(fp)
+			unarchived, firstTime, err := p.unarchiveMetric(fp)
 			if err != nil {
 				t.Fatal(err)
 			}
 			if !unarchived {
 				t.Errorf("%d. metric not unarchived", i)
 			}
+			if firstTime != 1 {
+				t.Errorf("%d. expected firstTime=1, got %v", i, firstTime)
+			}
 			delete(indexedFpsToMetrics, fp)
 		}
 	}
diff --git a/storage/local/series.go b/storage/local/series.go
index 4edcb24bc2..886914b9e5 100644
--- a/storage/local/series.go
+++ b/storage/local/series.go
@@ -144,8 +144,13 @@ type memorySeries struct {
 	// special case: There are chunks on disk, but the offset to the
 	// chunkDescs in memory is unknown. Also, there is no overlap between
 	// chunks on disk and chunks in memory (implying that upon first
-	// persiting of a chunk in memory, the offset has to be set).
+	// persisting of a chunk in memory, the offset has to be set).
 	chunkDescsOffset int
+	// The savedFirstTime field is used as a fallback when the
+	// chunkDescsOffset is not 0. It can be used to save the firstTime of the
+	// first chunk before its chunk desc is evicted. If in doubt, this field is
+	// just set to the oldest possible timestamp.
+	savedFirstTime clientmodel.Timestamp
 	// Whether the current head chunk has already been scheduled to be
 	// persisted. If true, the current head chunk must not be modified
 	// anymore.
@@ -159,11 +164,17 @@ type memorySeries struct {
 // newMemorySeries returns a pointer to a newly allocated memorySeries for the
 // given metric. reallyNew defines if the memorySeries is a genuinely new series
 // or (if false) a series for a metric being unarchived, i.e. a series that
-// existed before but has been evicted from memory.
-func newMemorySeries(m clientmodel.Metric, reallyNew bool) *memorySeries {
+// existed before but has been evicted from memory. If reallyNew is true,
+// firstTime is ignored (and set to the lowest possible timestamp instead - it
+// will be set properly upon the first eviction of chunkDescs).
+func newMemorySeries(m clientmodel.Metric, reallyNew bool, firstTime clientmodel.Timestamp) *memorySeries {
+	if reallyNew {
+		firstTime = math.MinInt64
+	}
 	s := memorySeries{
 		metric:             m,
 		headChunkPersisted: !reallyNew,
+		savedFirstTime:     firstTime,
 	}
 	if !reallyNew {
 		s.chunkDescsOffset = -1
@@ -252,6 +263,7 @@ func (s *memorySeries) evictOlderThan(t clientmodel.Timestamp) (allEvicted bool,
 	if iOldestNotEvicted != -1 {
 		lenToKeep := chunkDescEvictionFactor * (len(s.chunkDescs) - iOldestNotEvicted)
 		if lenToKeep < len(s.chunkDescs) {
+			s.savedFirstTime = s.firstTime()
 			lenEvicted := len(s.chunkDescs) - lenToKeep
 			s.chunkDescsOffset += lenEvicted
 			chunkDescOps.WithLabelValues(evict).Add(float64(lenEvicted))
@@ -470,7 +482,10 @@ func (s *memorySeries) values() metric.Values {
 // firstTime returns the timestamp of the first sample in the series. The caller
 // must have locked the fingerprint of the memorySeries.
 func (s *memorySeries) firstTime() clientmodel.Timestamp {
-	return s.chunkDescs[0].firstTime()
+	if s.chunkDescsOffset == 0 && len(s.chunkDescs) > 0 {
+		return s.chunkDescs[0].firstTime()
+	}
+	return s.savedFirstTime
 }
 
 // lastTime returns the timestamp of the last sample in the series. The caller
diff --git a/storage/local/storage.go b/storage/local/storage.go
index 15b8b71755..fcb5a4632b 100644
--- a/storage/local/storage.go
+++ b/storage/local/storage.go
@@ -15,7 +15,6 @@ package local
 
 import (
-	"fmt"
 	"sync/atomic"
 	"time"
 
@@ -27,7 +26,10 @@ import (
 	"github.com/prometheus/prometheus/storage/metric"
 )
 
-const persistQueueCap = 1024
+const (
+	persistQueueCap = 1024
+	chunkLen        = 1024
+)
 
 type storageState uint
 
@@ -61,6 +63,7 @@ type memorySeriesStorage struct {
 	numSeries                   prometheus.Gauge
 	seriesOps                   *prometheus.CounterVec
 	ingestedSamplesCount        prometheus.Counter
+	invalidPreloadRequestsCount prometheus.Counter
 
 	purgeDuration, evictDuration prometheus.Gauge
 }
@@ -74,12 +77,13 @@ type MemorySeriesStorageOptions struct {
 	PersistencePurgeInterval   time.Duration // How often to check for purging.
 	PersistenceRetentionPeriod time.Duration // Chunks at least that old are purged.
 	CheckpointInterval         time.Duration // How often to checkpoint the series map and head chunks.
+	Dirty                      bool          // Force the storage to consider itself dirty on startup.
 }
 
 // NewMemorySeriesStorage returns a newly allocated Storage. Storage.Serve still
 // has to be called to start the storage.
 func NewMemorySeriesStorage(o *MemorySeriesStorageOptions) (Storage, error) {
-	p, err := newPersistence(o.PersistenceStoragePath, 1024)
+	p, err := newPersistence(o.PersistenceStoragePath, chunkLen, o.Dirty)
 	if err != nil {
 		return nil, err
 	}
@@ -150,6 +154,12 @@ func NewMemorySeriesStorage(o *MemorySeriesStorageOptions) (Storage, error) {
 			Name:      "ingested_samples_total",
 			Help:      "The total number of samples ingested.",
 		}),
+		invalidPreloadRequestsCount: prometheus.NewCounter(prometheus.CounterOpts{
+			Namespace: namespace,
+			Subsystem: subsystem,
+			Name:      "invalid_preload_requests_total",
+			Help:      "The total number of preload requests referring to a non-existent series. This is an indication of outdated label indexes.",
+		}),
 		purgeDuration: prometheus.NewGauge(prometheus.GaugeOpts{
 			Namespace: namespace,
 			Subsystem: subsystem,
@@ -346,9 +356,10 @@ func (s *memorySeriesStorage) appendSample(sample *clientmodel.Sample) {
 func (s *memorySeriesStorage) getOrCreateSeries(fp clientmodel.Fingerprint, m clientmodel.Metric) *memorySeries {
 	series, ok := s.fpToSeries.get(fp)
 	if !ok {
-		unarchived, err := s.persistence.unarchiveMetric(fp)
+		unarchived, firstTime, err := s.persistence.unarchiveMetric(fp)
 		if err != nil {
 			glog.Errorf("Error unarchiving fingerprint %v: %v", fp, err)
+			s.persistence.setDirty(true)
 		}
 		if unarchived {
 			s.seriesOps.WithLabelValues(unarchive).Inc()
@@ -357,7 +368,7 @@ func (s *memorySeriesStorage) getOrCreateSeries(fp clientmodel.Fingerprint, m cl
 			s.persistence.indexMetric(fp, m)
 			s.seriesOps.WithLabelValues(create).Inc()
 		}
-		series = newMemorySeries(m, !unarchived)
+		series = newMemorySeries(m, !unarchived, firstTime)
 		s.fpToSeries.put(fp, series)
 		s.numSeries.Inc()
 	}
@@ -389,7 +400,8 @@ func (s *memorySeriesStorage) preloadChunksForRange(
 		return nil, err
 	}
 	if !has {
-		return nil, fmt.Errorf("requested preload for non-existent series %v", fp)
+		s.invalidPreloadRequestsCount.Inc()
+		return nil, nil
 	}
 	if from.Add(-stalenessDelta).Before(last) && through.Add(stalenessDelta).After(first) {
 		metric, err := s.persistence.getArchivedMetric(fp)
@@ -421,8 +433,7 @@ func (s *memorySeriesStorage) handlePersistQueue() {
 		if err != nil {
 			s.persistErrors.WithLabelValues(err.Error()).Inc()
 			glog.Error("Error persisting chunk: ", err)
-			glog.Error("The storage is now inconsistent. Prepare for disaster.")
-			// TODO: Remove respective chunkDesc to at least be consistent?
+			s.persistence.setDirty(true)
 			continue
 		}
 		req.chunkDesc.unpin()
@@ -474,6 +485,7 @@ func (s *memorySeriesStorage) loop() {
 					m.fp, m.series.metric, m.series.firstTime(), m.series.lastTime(),
 				); err != nil {
 					glog.Errorf("Error archiving metric %v: %v", m.series.metric, err)
+					s.persistence.setDirty(true)
 				} else {
 					s.seriesOps.WithLabelValues(archive).Inc()
 				}
@@ -535,6 +547,7 @@ func (s *memorySeriesStorage) purgeSeries(fp clientmodel.Fingerprint, beforeTime
 	numDropped, allDropped, err := s.persistence.dropChunks(fp, beforeTime)
 	if err != nil {
 		glog.Error("Error purging persisted chunks: ", err)
+		s.persistence.setDirty(true)
 	}
 
 	// Purge chunks from memory accordingly.
@@ -563,6 +576,7 @@ func (s *memorySeriesStorage) purgeSeries(fp clientmodel.Fingerprint, beforeTime
 	if allDropped {
 		if err := s.persistence.dropArchivedMetric(fp); err != nil {
 			glog.Errorf("Error dropping archived metric for fingerprint %v: %v", fp, err)
+			s.persistence.setDirty(true)
 		} else {
 			s.seriesOps.WithLabelValues(archivePurge).Inc()
 		}
@@ -591,6 +605,7 @@ func (s *memorySeriesStorage) Describe(ch chan<- *prometheus.Desc) {
 	ch <- s.numSeries.Desc()
 	s.seriesOps.Describe(ch)
 	ch <- s.ingestedSamplesCount.Desc()
+	ch <- s.invalidPreloadRequestsCount.Desc()
 	ch <- s.purgeDuration.Desc()
 	ch <- s.evictDuration.Desc()
 
@@ -610,6 +625,7 @@ func (s *memorySeriesStorage) Collect(ch chan<- prometheus.Metric) {
 	ch <- s.numSeries
 	s.seriesOps.Collect(ch)
 	ch <- s.ingestedSamplesCount
+	ch <- s.invalidPreloadRequestsCount
 	ch <- s.purgeDuration
 	ch <- s.evictDuration
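
The core of the dirty handling introduced above is a sentinel file: newPersistence creates a DIRTY file with O_CREATE|O_EXCL, treats a pre-existing file (or an open error of kind "exists") as evidence of an unclean shutdown, and close removes the file only when the persistence is not marked dirty. The following standalone sketch illustrates that pattern in isolation; the helper names (openDirtyMarker, removeDirtyMarker) and the use of os.TempDir are illustrative only and not part of this patch.

package main

import (
	"fmt"
	"os"
	"path/filepath"
)

// openDirtyMarker reports whether a previous run left the marker file behind
// (i.e. the last shutdown was unclean) and creates the marker for this run.
func openDirtyMarker(basePath string) (wasDirty bool, err error) {
	marker := filepath.Join(basePath, "DIRTY")
	f, err := os.OpenFile(marker, os.O_CREATE|os.O_EXCL, 0666)
	if err == nil {
		f.Close()
		return false, nil // Marker freshly created; last shutdown was clean.
	}
	if os.IsExist(err) {
		return true, nil // Marker survived; last shutdown was unclean.
	}
	return false, err
}

// removeDirtyMarker is called on a clean shutdown only, so that the next
// startup finds no marker.
func removeDirtyMarker(basePath string) error {
	return os.Remove(filepath.Join(basePath, "DIRTY"))
}

func main() {
	base := os.TempDir()
	dirty, err := openDirtyMarker(base)
	if err != nil {
		fmt.Println("error creating dirty marker:", err)
		return
	}
	fmt.Println("needs crash recovery:", dirty)
	// ... normal operation would happen here ...
	if err := removeDirtyMarker(base); err != nil {
		fmt.Println("error removing dirty marker:", err)
	}
}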