diff --git a/storage/local/crashrecovery.go b/storage/local/crashrecovery.go index f51e54e7b..17626c07c 100644 --- a/storage/local/crashrecovery.go +++ b/storage/local/crashrecovery.go @@ -140,7 +140,13 @@ func (p *persistence) recoverFromCrash(fingerprintToSeries map[model.Fingerprint } } - p.setDirty(false, nil) + p.dirtyMtx.Lock() + // Only declare storage clean if it didn't become dirty during crash recovery. + if !p.becameDirty { + p.dirty = false + } + p.dirtyMtx.Unlock() + log.Warn("Crash recovery complete.") return nil } diff --git a/storage/local/persistence.go b/storage/local/persistence.go index a89307bc3..68bc612f0 100644 --- a/storage/local/persistence.go +++ b/storage/local/persistence.go @@ -312,49 +312,44 @@ func (p *persistence) isDirty() bool { return p.dirty } -// setDirty sets the dirty flag in a goroutine-safe way. Once the dirty flag was -// set to true with this method, it cannot be set to false again. (If we became -// dirty during our runtime, there is no way back. If we were dirty from the -// start, a clean-up might make us clean again.) The provided error will be -// logged as a reason if dirty is true. -func (p *persistence) setDirty(dirty bool, err error) { - if dirty { - p.dirtyCounter.Inc() - } +// setDirty flags the storage as dirty in a goroutine-safe way. The provided +// error will be logged as a reason the first time the storage is flagged as dirty. +func (p *persistence) setDirty(err error) { + p.dirtyCounter.Inc() p.dirtyMtx.Lock() defer p.dirtyMtx.Unlock() if p.becameDirty { return } - p.dirty = dirty - if dirty { - p.becameDirty = true - log.With("error", err).Error("The storage is now inconsistent. Restart Prometheus ASAP to initiate recovery.") - } + p.dirty = true + p.becameDirty = true + log.With("error", err).Error("The storage is now inconsistent. Restart Prometheus ASAP to initiate recovery.") } // fingerprintsForLabelPair returns the fingerprints for the given label // pair. This method is goroutine-safe but take into account that metrics queued // for indexing with IndexMetric might not have made it into the index // yet. (Same applies correspondingly to UnindexMetric.) -func (p *persistence) fingerprintsForLabelPair(lp model.LabelPair) (model.Fingerprints, error) { +func (p *persistence) fingerprintsForLabelPair(lp model.LabelPair) model.Fingerprints { fps, _, err := p.labelPairToFingerprints.Lookup(lp) if err != nil { - return nil, err + p.setDirty(fmt.Errorf("error in method fingerprintsForLabelPair(%v): %s", lp, err)) + return nil } - return fps, nil + return fps } // labelValuesForLabelName returns the label values for the given label // name. This method is goroutine-safe but take into account that metrics queued // for indexing with IndexMetric might not have made it into the index // yet. (Same applies correspondingly to UnindexMetric.) -func (p *persistence) labelValuesForLabelName(ln model.LabelName) (model.LabelValues, error) { +func (p *persistence) labelValuesForLabelName(ln model.LabelName) model.LabelValues { lvs, _, err := p.labelNameToLabelValues.Lookup(ln) if err != nil { - return nil, err + p.setDirty(fmt.Errorf("error in method labelValuesForLabelName(%v): %s", ln, err)) + return nil } - return lvs, nil + return lvs } // persistChunks persists a number of consecutive chunks of a series. It is the @@ -1008,29 +1003,28 @@ func (p *persistence) waitForIndexing() { // the metric. The caller must have locked the fingerprint. func (p *persistence) archiveMetric( fp model.Fingerprint, m model.Metric, first, last model.Time, -) error { +) { if err := p.archivedFingerprintToMetrics.Put(codable.Fingerprint(fp), codable.Metric(m)); err != nil { - p.setDirty(true, err) - return err + p.setDirty(fmt.Errorf("error in method archiveMetric inserting fingerprint %v into FingerprintToMetrics: %s", fp, err)) + return } if err := p.archivedFingerprintToTimeRange.Put(codable.Fingerprint(fp), codable.TimeRange{First: first, Last: last}); err != nil { - p.setDirty(true, err) - return err + p.setDirty(fmt.Errorf("error in method archiveMetric inserting fingerprint %v into FingerprintToTimeRange: %s", fp, err)) } - return nil } // hasArchivedMetric returns whether the archived metric for the given // fingerprint exists and if yes, what the first and last timestamp in the // corresponding series is. This method is goroutine-safe. func (p *persistence) hasArchivedMetric(fp model.Fingerprint) ( - hasMetric bool, firstTime, lastTime model.Time, err error, + hasMetric bool, firstTime, lastTime model.Time, ) { - firstTime, lastTime, hasMetric, err = p.archivedFingerprintToTimeRange.Lookup(fp) + firstTime, lastTime, hasMetric, err := p.archivedFingerprintToTimeRange.Lookup(fp) if err != nil { - p.setDirty(true, err) + p.setDirty(fmt.Errorf("error in method hasArchivedMetric(%v): %s", fp, err)) + hasMetric = false } - return + return hasMetric, firstTime, lastTime } // updateArchivedTimeRange updates an archived time range. The caller must make @@ -1069,9 +1063,10 @@ func (p *persistence) fingerprintsModifiedBefore(beforeTime model.Time) ([]model func (p *persistence) archivedMetric(fp model.Fingerprint) (model.Metric, error) { metric, _, err := p.archivedFingerprintToMetrics.Lookup(fp) if err != nil { - p.setDirty(true, err) + p.setDirty(fmt.Errorf("error in method archivedMetric(%v): %s", fp, err)) + return nil, err } - return metric, err + return metric, nil } // purgeArchivedMetric deletes an archived fingerprint and its corresponding @@ -1081,7 +1076,7 @@ func (p *persistence) archivedMetric(fp model.Fingerprint) (model.Metric, error) func (p *persistence) purgeArchivedMetric(fp model.Fingerprint) (err error) { defer func() { if err != nil { - p.setDirty(true, fmt.Errorf("error in method purgeArchivedMetric: %s", err)) + p.setDirty(fmt.Errorf("error in method purgeArchivedMetric(%v): %s", fp, err)) } }() diff --git a/storage/local/persistence_test.go b/storage/local/persistence_test.go index e2da77803..692f494d5 100644 --- a/storage/local/persistence_test.go +++ b/storage/local/persistence_test.go @@ -770,58 +770,46 @@ func testDropArchivedMetric(t *testing.T, encoding chunkEncoding) { p.indexMetric(2, m2) p.waitForIndexing() - outFPs, err := p.fingerprintsForLabelPair(model.LabelPair{Name: "n1", Value: "v1"}) - if err != nil { - t.Fatal(err) - } + outFPs := p.fingerprintsForLabelPair(model.LabelPair{Name: "n1", Value: "v1"}) want := model.Fingerprints{1} if !reflect.DeepEqual(outFPs, want) { t.Errorf("want %#v, got %#v", want, outFPs) } - outFPs, err = p.fingerprintsForLabelPair(model.LabelPair{Name: "n2", Value: "v2"}) - if err != nil { - t.Fatal(err) - } + outFPs = p.fingerprintsForLabelPair(model.LabelPair{Name: "n2", Value: "v2"}) want = model.Fingerprints{2} if !reflect.DeepEqual(outFPs, want) { t.Errorf("want %#v, got %#v", want, outFPs) } - if archived, _, _, err := p.hasArchivedMetric(1); err != nil || !archived { + if archived, _, _ := p.hasArchivedMetric(1); !archived { t.Error("want FP 1 archived") } - if archived, _, _, err := p.hasArchivedMetric(2); err != nil || !archived { + if archived, _, _ := p.hasArchivedMetric(2); !archived { t.Error("want FP 2 archived") } - if err != p.purgeArchivedMetric(1) { + if err := p.purgeArchivedMetric(1); err != nil { t.Fatal(err) } - if err != p.purgeArchivedMetric(3) { + if err := p.purgeArchivedMetric(3); err != nil { // Purging something that has not beet archived is not an error. t.Fatal(err) } p.waitForIndexing() - outFPs, err = p.fingerprintsForLabelPair(model.LabelPair{Name: "n1", Value: "v1"}) - if err != nil { - t.Fatal(err) - } + outFPs = p.fingerprintsForLabelPair(model.LabelPair{Name: "n1", Value: "v1"}) want = nil if !reflect.DeepEqual(outFPs, want) { t.Errorf("want %#v, got %#v", want, outFPs) } - outFPs, err = p.fingerprintsForLabelPair(model.LabelPair{Name: "n2", Value: "v2"}) - if err != nil { - t.Fatal(err) - } + outFPs = p.fingerprintsForLabelPair(model.LabelPair{Name: "n2", Value: "v2"}) want = model.Fingerprints{2} if !reflect.DeepEqual(outFPs, want) { t.Errorf("want %#v, got %#v", want, outFPs) } - if archived, _, _, err := p.hasArchivedMetric(1); err != nil || archived { + if archived, _, _ := p.hasArchivedMetric(1); archived { t.Error("want FP 1 not archived") } - if archived, _, _, err := p.hasArchivedMetric(2); err != nil || !archived { + if archived, _, _ := p.hasArchivedMetric(2); !archived { t.Error("want FP 2 archived") } } @@ -983,9 +971,7 @@ func testIndexing(t *testing.T, encoding chunkEncoding) { for i, b := range batches { for fp, m := range b.fpToMetric { p.indexMetric(fp, m) - if err := p.archiveMetric(fp, m, 1, 2); err != nil { - t.Fatal(err) - } + p.archiveMetric(fp, m, 1, 2) indexedFpsToMetrics[fp] = m } verifyIndexedState(i, t, b, indexedFpsToMetrics, p) @@ -1029,10 +1015,7 @@ func verifyIndexedState(i int, t *testing.T, b incrementalBatch, indexedFpsToMet } // Check that archived metrics are in membership index. - has, first, last, err := p.hasArchivedMetric(fp) - if err != nil { - t.Fatal(err) - } + has, first, last := p.hasArchivedMetric(fp) if !has { t.Errorf("%d. fingerprint %v not found", i, fp) } @@ -1046,10 +1029,7 @@ func verifyIndexedState(i int, t *testing.T, b incrementalBatch, indexedFpsToMet // Compare label name -> label values mappings. for ln, lvs := range b.expectedLnToLvs { - outLvs, err := p.labelValuesForLabelName(ln) - if err != nil { - t.Fatal(err) - } + outLvs := p.labelValuesForLabelName(ln) outSet := codable.LabelValueSet{} for _, lv := range outLvs { @@ -1063,10 +1043,7 @@ func verifyIndexedState(i int, t *testing.T, b incrementalBatch, indexedFpsToMet // Compare label pair -> fingerprints mappings. for lp, fps := range b.expectedLpToFps { - outFPs, err := p.fingerprintsForLabelPair(lp) - if err != nil { - t.Fatal(err) - } + outFPs := p.fingerprintsForLabelPair(lp) outSet := codable.FingerprintSet{} for _, fp := range outFPs { diff --git a/storage/local/storage.go b/storage/local/storage.go index bc380bd6b..a24387831 100644 --- a/storage/local/storage.go +++ b/storage/local/storage.go @@ -425,10 +425,7 @@ func (s *memorySeriesStorage) fingerprintsForLabelPairs(pairs ...model.LabelPair var result map[model.Fingerprint]struct{} for _, pair := range pairs { intersection := map[model.Fingerprint]struct{}{} - fps, err := s.persistence.fingerprintsForLabelPair(pair) - if err != nil { - log.Error("Error getting fingerprints for label pair: ", err) - } + fps := s.persistence.fingerprintsForLabelPair(pair) if len(fps) == 0 { return nil } @@ -547,19 +544,14 @@ func (s *memorySeriesStorage) metricForFingerprint( } if from.After(model.Earliest) || through.Before(model.Latest) { // The range lookup is relatively cheap, so let's do it first. - ok, first, last, err := s.persistence.hasArchivedMetric(fp) - if err != nil { - log.Errorf("Error retrieving archived time range for fingerprint %v: %v", fp, err) - return metric.Metric{}, false - } + ok, first, last := s.persistence.hasArchivedMetric(fp) if !ok || first.After(through) || last.Before(from) { return metric.Metric{}, false } } - met, err := s.persistence.archivedMetric(fp) - if err != nil { - log.Errorf("Error retrieving archived metric for fingerprint %v: %v", fp, err) + met, _ := s.persistence.archivedMetric(fp) // Ignoring error, there is nothing we can do. + if met == nil { return metric.Metric{}, false } @@ -571,11 +563,7 @@ func (s *memorySeriesStorage) metricForFingerprint( // LabelValuesForLabelName implements Storage. func (s *memorySeriesStorage) LabelValuesForLabelName(labelName model.LabelName) model.LabelValues { - lvs, err := s.persistence.labelValuesForLabelName(labelName) - if err != nil { - log.Errorf("Error getting label values for label name %q: %v", labelName, err) - } - return lvs + return s.persistence.labelValuesForLabelName(labelName) } // DropMetric implements Storage. @@ -603,7 +591,7 @@ func (s *memorySeriesStorage) Append(sample *model.Sample) error { s.fpLocker.Unlock(fp) }() // Func wrapper because fp might change below. if err != nil { - s.persistence.setDirty(true, fmt.Errorf("error while mapping fingerprint %v: %s", rawFP, err)) + s.persistence.setDirty(fmt.Errorf("error while mapping fingerprint %v: %s", rawFP, err)) return err } if fp != rawFP { @@ -745,11 +733,7 @@ func (s *memorySeriesStorage) getSeriesForRange( if ok { return series } - has, first, last, err := s.persistence.hasArchivedMetric(fp) - if err != nil { - log.With("fingerprint", fp).With("error", err).Error("Archive index error while preloading chunks.") - return nil - } + has, first, last := s.persistence.hasArchivedMetric(fp) if !has { s.invalidPreloadRequestsCount.Inc() return nil @@ -759,7 +743,7 @@ func (s *memorySeriesStorage) getSeriesForRange( } metric, err := s.persistence.archivedMetric(fp) if err != nil { - log.With("fingerprint", fp).With("error", err).Error("Archive index error while preloading chunks.") + // Error already logged, storage declared dirty by archivedMetric. return nil } series, err = s.getOrCreateSeries(fp, metric) @@ -1152,12 +1136,7 @@ func (s *memorySeriesStorage) maintainMemorySeries( if iOldestNotEvicted == -1 && model.Now().Sub(series.lastTime) > headChunkTimeout { s.fpToSeries.del(fp) s.numSeries.Dec() - if err := s.persistence.archiveMetric( - fp, series.metric, series.firstTime(), series.lastTime, - ); err != nil { - log.Errorf("Error archiving metric %v: %v", series.metric, err) - return - } + s.persistence.archiveMetric(fp, series.metric, series.firstTime(), series.lastTime) s.seriesOps.WithLabelValues(archive).Inc() oldWatermark := atomic.LoadInt64((*int64)(&s.archiveHighWatermark)) if oldWatermark < int64(series.lastTime) { @@ -1278,11 +1257,7 @@ func (s *memorySeriesStorage) maintainArchivedSeries(fp model.Fingerprint, befor s.fpLocker.Lock(fp) defer s.fpLocker.Unlock(fp) - has, firstTime, lastTime, err := s.persistence.hasArchivedMetric(fp) - if err != nil { - log.Error("Error looking up archived time range: ", err) - return - } + has, firstTime, lastTime := s.persistence.hasArchivedMetric(fp) if !has || !firstTime.Before(beforeTime) { // Oldest sample not old enough, or metric purged or unarchived in the meantime. return @@ -1295,10 +1270,7 @@ func (s *memorySeriesStorage) maintainArchivedSeries(fp model.Fingerprint, befor log.Error("Error dropping persisted chunks: ", err) } if allDropped { - if err := s.persistence.purgeArchivedMetric(fp); err != nil { - log.Errorf("Error purging archived metric for fingerprint %v: %v", fp, err) - return - } + s.persistence.purgeArchivedMetric(fp) // Ignoring error. Nothing we can do. s.seriesOps.WithLabelValues(archivePurge).Inc() return } @@ -1487,13 +1459,7 @@ func (s *memorySeriesStorage) purgeSeries(fp model.Fingerprint, m model.Metric, s.incNumChunksToPersist(-numChunksNotYetPersisted) } else { - if err := s.persistence.purgeArchivedMetric(fp); err != nil { - log. - With("fingerprint", fp). - With("metric", m). - With("error", err). - Error("Error purging metric from archive.") - } + s.persistence.purgeArchivedMetric(fp) // Ignoring error. There is nothing we can do. } if m != nil { // If we know a metric now, unindex it in any case. diff --git a/storage/local/storage_test.go b/storage/local/storage_test.go index 01f30c28b..291b32918 100644 --- a/storage/local/storage_test.go +++ b/storage/local/storage_test.go @@ -471,11 +471,7 @@ func TestDropMetrics(t *testing.T) { s.maintainMemorySeries(fpToBeArchived, 0) s.fpLocker.Lock(fpToBeArchived) s.fpToSeries.del(fpToBeArchived) - if err := s.persistence.archiveMetric( - fpToBeArchived, m3, 0, insertStart.Add(time.Duration(N-1)*time.Millisecond), - ); err != nil { - t.Error(err) - } + s.persistence.archiveMetric(fpToBeArchived, m3, 0, insertStart.Add(time.Duration(N-1)*time.Millisecond)) s.fpLocker.Unlock(fpToBeArchived) fps := s.fingerprintsForLabelPairs(model.LabelPair{Name: model.MetricNameLabel, Value: "test"}) @@ -582,11 +578,7 @@ func TestQuarantineMetric(t *testing.T) { s.maintainMemorySeries(fpToBeArchived, 0) s.fpLocker.Lock(fpToBeArchived) s.fpToSeries.del(fpToBeArchived) - if err := s.persistence.archiveMetric( - fpToBeArchived, m3, 0, insertStart.Add(time.Duration(N-1)*time.Millisecond), - ); err != nil { - t.Error(err) - } + s.persistence.archiveMetric(fpToBeArchived, m3, 0, insertStart.Add(time.Duration(N-1)*time.Millisecond)) s.fpLocker.Unlock(fpToBeArchived) // Corrupt the series file for m3. @@ -1144,36 +1136,22 @@ func testEvictAndPurgeSeries(t *testing.T, encoding chunkEncoding) { if err != nil { t.Fatal(err) } - if err := s.persistence.archiveMetric( - fp, series.metric, series.firstTime(), lastTime, - ); err != nil { - t.Fatal(err) - } - - archived, _, _, err := s.persistence.hasArchivedMetric(fp) - if err != nil { - t.Fatal(err) - } + s.persistence.archiveMetric(fp, series.metric, series.firstTime(), lastTime) + archived, _, _ := s.persistence.hasArchivedMetric(fp) if !archived { t.Fatal("not archived") } // Drop ~half of the chunks of an archived series. s.maintainArchivedSeries(fp, 10000) - archived, _, _, err = s.persistence.hasArchivedMetric(fp) - if err != nil { - t.Fatal(err) - } + archived, _, _ = s.persistence.hasArchivedMetric(fp) if !archived { t.Fatal("archived series purged although only half of the chunks dropped") } // Drop everything. s.maintainArchivedSeries(fp, 100000) - archived, _, _, err = s.persistence.hasArchivedMetric(fp) - if err != nil { - t.Fatal(err) - } + archived, _, _ = s.persistence.hasArchivedMetric(fp) if archived { t.Fatal("archived series not dropped") } @@ -1199,16 +1177,8 @@ func testEvictAndPurgeSeries(t *testing.T, encoding chunkEncoding) { if err != nil { t.Fatal(err) } - if err := s.persistence.archiveMetric( - fp, series.metric, series.firstTime(), lastTime, - ); err != nil { - t.Fatal(err) - } - - archived, _, _, err = s.persistence.hasArchivedMetric(fp) - if err != nil { - t.Fatal(err) - } + s.persistence.archiveMetric(fp, series.metric, series.firstTime(), lastTime) + archived, _, _ = s.persistence.hasArchivedMetric(fp) if !archived { t.Fatal("not archived") } @@ -1220,10 +1190,7 @@ func testEvictAndPurgeSeries(t *testing.T, encoding chunkEncoding) { if !ok { t.Fatal("could not find series") } - archived, _, _, err = s.persistence.hasArchivedMetric(fp) - if err != nil { - t.Fatal(err) - } + archived, _, _ = s.persistence.hasArchivedMetric(fp) if archived { t.Fatal("archived") } @@ -1231,10 +1198,7 @@ func testEvictAndPurgeSeries(t *testing.T, encoding chunkEncoding) { // This will archive again, but must not drop it completely, despite the // memorySeries being empty. s.maintainMemorySeries(fp, 10000) - archived, _, _, err = s.persistence.hasArchivedMetric(fp) - if err != nil { - t.Fatal(err) - } + archived, _, _ = s.persistence.hasArchivedMetric(fp) if !archived { t.Fatal("series purged completely") }