From 836f1db04c9718bc8164e39d4dc98876c143524a Mon Sep 17 00:00:00 2001 From: beorn7 Date: Wed, 9 Mar 2016 00:09:42 +0100 Subject: [PATCH 1/5] Improve MetricsForLabelMatchers WIP: This needs more tests. It now gets a from and through value, which it may opportunistically use to optimize the retrieval. With possible future range indices, this could be used in a very efficient way. This change merely applies some easy checks, which should nevertheless solve the use case of heavy rule evaluations on servers with a lot of series churn. The idea is the following: - Only archive series that are at least as old as the headChunkTimeout (which was already extremely unlikely to happen). - Then maintain a high watermark for the last archival, i.e. no archived series has a sample more recent than that watermark. - Any query that doesn't reach back to a time before that watermark doesn't have to touch the archive index at all. (A production server at SoundCloud with the aforementioned series churn and heavy rule evaluations spends 50% of its CPU time in archive index lookups. Since rule evaluations usually only touch very recent values, most of those lookups should disappear with this change.) - Federation with a very broad label matcher will profit from this, too. As a byproduct, the unneeded MetricForFingerprint method was removed from the Storage interface. --- promql/analyzer.go | 10 ++- storage/local/interface.go | 39 +++++++---- storage/local/persistence.go | 3 + storage/local/storage.go | 126 ++++++++++++++++++++++++---------- storage/local/storage_test.go | 10 ++- web/api/v1/api.go | 10 ++- web/federate.go | 22 +++--- 7 files changed, 151 insertions(+), 69 deletions(-) diff --git a/promql/analyzer.go b/promql/analyzer.go index bad5fbd92..7243d31db 100644 --- a/promql/analyzer.go +++ b/promql/analyzer.go @@ -79,7 +79,10 @@ func (a *Analyzer) Analyze(ctx context.Context) error { Inspect(a.Expr, func(node Node) bool { switch n := node.(type) { case *VectorSelector: - n.metrics = a.Storage.MetricsForLabelMatchers(n.LabelMatchers...) + n.metrics = a.Storage.MetricsForLabelMatchers( + a.Start.Add(-n.Offset-StalenessDelta), a.End.Add(-n.Offset), + n.LabelMatchers..., + ) n.iterators = make(map[model.Fingerprint]local.SeriesIterator, len(n.metrics)) pt := getPreloadTimes(n.Offset) @@ -95,7 +98,10 @@ func (a *Analyzer) Analyze(ctx context.Context) error { } } case *MatrixSelector: - n.metrics = a.Storage.MetricsForLabelMatchers(n.LabelMatchers...) + n.metrics = a.Storage.MetricsForLabelMatchers( + a.Start.Add(-n.Offset-n.Range), a.End.Add(-n.Offset), + n.LabelMatchers..., + ) n.iterators = make(map[model.Fingerprint]local.SeriesIterator, len(n.metrics)) pt := getPreloadTimes(n.Offset) diff --git a/storage/local/interface.go b/storage/local/interface.go index d9dbc4f21..26bc5325a 100644 --- a/storage/local/interface.go +++ b/storage/local/interface.go @@ -40,20 +40,22 @@ type Storage interface { // NewPreloader returns a new Preloader which allows preloading and pinning // series data into memory for use within a query. NewPreloader() Preloader - // MetricsForLabelMatchers returns the metrics from storage that satisfy the given - // label matchers. At least one label matcher must be specified that does not - // match the empty string. - MetricsForLabelMatchers(...*metric.LabelMatcher) map[model.Fingerprint]metric.Metric - // LastSamplePairForFingerprint returns the last sample pair that has - // been ingested for the provided fingerprint.
If this instance of the + // MetricsForLabelMatchers returns the metrics from storage that satisfy + // the given label matchers. At least one label matcher must be + // specified that does not match the empty string. The times from and + // through are hints for the storage to optimize the search. The storage + // MAY exclude metrics that have no samples in the specified interval + // from the returned map. In doubt, specify model.Earliest for from and + // model.Latest for through. + MetricsForLabelMatchers(from, through model.Time, matchers ...*metric.LabelMatcher) map[model.Fingerprint]metric.Metric + // LastSampleForFingerprint returns the last sample that has been + // ingested for the provided fingerprint. If this instance of the // Storage has never ingested a sample for the provided fingerprint (or // the last ingestion is so long ago that the series has been archived), - // ZeroSamplePair is returned. - LastSamplePairForFingerprint(model.Fingerprint) model.SamplePair + // ZeroSample is returned. + LastSampleForFingerprint(model.Fingerprint) model.Sample // Get all of the label values that are associated with a given label name. LabelValuesForLabelName(model.LabelName) model.LabelValues - // Get the metric associated with the provided fingerprint. - MetricForFingerprint(model.Fingerprint) metric.Metric // Drop all time series associated with the given fingerprints. DropMetricsForFingerprints(...model.Fingerprint) // Run the various maintenance loops in goroutines. Returns when the @@ -89,7 +91,7 @@ type SeriesIterator interface { type Preloader interface { PreloadRange( fp model.Fingerprint, - from model.Time, through model.Time, + from, through model.Time, ) SeriesIterator PreloadInstant( fp model.Fingerprint, @@ -100,8 +102,15 @@ type Preloader interface { } // ZeroSamplePair is the pseudo zero-value of model.SamplePair used by the local -// package to signal a non-existing sample. It is a SamplePair with timestamp -// model.Earliest and value 0.0. Note that the natural zero value of SamplePair -// has a timestamp of 0, which is possible to appear in a real SamplePair and -// thus not suitable to signal a non-existing SamplePair. +// package to signal a non-existing sample pair. It is a SamplePair with +// timestamp model.Earliest and value 0.0. Note that the natural zero value of +// SamplePair has a timestamp of 0, which is possible to appear in a real +// SamplePair and thus not suitable to signal a non-existing SamplePair. var ZeroSamplePair = model.SamplePair{Timestamp: model.Earliest} + +// ZeroSample is the pseudo zero-value of model.Sample used by the local package +// to signal a non-existing sample. It is a Sample with timestamp +// model.Earliest, value 0.0, and metric nil. Note that the natural zero value +// of Sample has a timestamp of 0, which is possible to appear in a real +// Sample and thus not suitable to signal a non-existing Sample. +var ZeroSample = model.Sample{Timestamp: model.Earliest} diff --git a/storage/local/persistence.go b/storage/local/persistence.go index 124a41fb6..a89307bc3 100644 --- a/storage/local/persistence.go +++ b/storage/local/persistence.go @@ -1068,6 +1068,9 @@ func (p *persistence) fingerprintsModifiedBefore(beforeTime model.Time) ([]model // method is goroutine-safe. 
func (p *persistence) archivedMetric(fp model.Fingerprint) (model.Metric, error) { metric, _, err := p.archivedFingerprintToMetrics.Lookup(fp) + if err != nil { + p.setDirty(true, err) + } return metric, err } diff --git a/storage/local/storage.go b/storage/local/storage.go index 201c2ba8b..e6527085b 100644 --- a/storage/local/storage.go +++ b/storage/local/storage.go @@ -129,12 +129,13 @@ const ( type syncStrategy func() bool type memorySeriesStorage struct { - // numChunksToPersist has to be aligned for atomic operations. - numChunksToPersist int64 // The number of chunks waiting for persistence. - maxChunksToPersist int // If numChunksToPersist reaches this threshold, ingestion will be throttled. - rushed bool // Whether the storage is in rushed mode. - rushedMtx sync.Mutex // Protects entering and exiting rushed mode. - throttled chan struct{} // This chan is sent to whenever NeedsThrottling() returns true (for logging). + // archiveHighWatermark and numChunksToPersist have to be aligned for atomic operations. + archiveHighWatermark model.Time // No archived series has samples after this time. + numChunksToPersist int64 // The number of chunks waiting for persistence. + maxChunksToPersist int // If numChunksToPersist reaches this threshold, ingestion will be throttled. + rushed bool // Whether the storage is in rushed mode. + rushedMtx sync.Mutex // Protects entering and exiting rushed mode. + throttled chan struct{} // This chan is sent to whenever NeedsThrottling() returns true (for logging). fpLocker *fingerprintLocker fpToSeries *seriesMap @@ -201,6 +202,7 @@ func NewMemorySeriesStorage(o *MemorySeriesStorageOptions) Storage { dropAfter: o.PersistenceRetentionPeriod, checkpointInterval: o.CheckpointInterval, checkpointDirtySeriesLimit: o.CheckpointDirtySeriesLimit, + archiveHighWatermark: model.Now().Add(-headChunkTimeout), maxChunksToPersist: o.MaxChunksToPersist, @@ -368,15 +370,20 @@ func (s *memorySeriesStorage) WaitForIndexing() { } // LastSampleForFingerprint implements Storage. -func (s *memorySeriesStorage) LastSamplePairForFingerprint(fp model.Fingerprint) model.SamplePair { +func (s *memorySeriesStorage) LastSampleForFingerprint(fp model.Fingerprint) model.Sample { s.fpLocker.Lock(fp) defer s.fpLocker.Unlock(fp) series, ok := s.fpToSeries.get(fp) if !ok { - return ZeroSamplePair + return ZeroSample + } + sp := series.lastSamplePair() + return model.Sample{ + Metric: series.metric, + Value: sp.Value, + Timestamp: sp.Timestamp, } - return series.lastSamplePair() } // boundedIterator wraps a SeriesIterator and does not allow fetching @@ -439,7 +446,10 @@ func (s *memorySeriesStorage) fingerprintsForLabelPairs(pairs ...model.LabelPair } // MetricsForLabelMatchers implements Storage. 
-func (s *memorySeriesStorage) MetricsForLabelMatchers(matchers ...*metric.LabelMatcher) map[model.Fingerprint]metric.Metric { +func (s *memorySeriesStorage) MetricsForLabelMatchers( + from, through model.Time, + matchers ...*metric.LabelMatcher, +) map[model.Fingerprint]metric.Metric { var ( equals []model.LabelPair filters []*metric.LabelMatcher @@ -491,9 +501,11 @@ func (s *memorySeriesStorage) MetricsForLabelMatchers(matchers ...*metric.LabelM filters = remaining } - result := make(map[model.Fingerprint]metric.Metric, len(resFPs)) + result := map[model.Fingerprint]metric.Metric{} for fp := range resFPs { - result[fp] = s.MetricForFingerprint(fp) + if metric, ok := s.metricForFingerprint(fp, from, through); ok { + result[fp] = metric + } } for _, matcher := range filters { for fp, met := range result { @@ -505,6 +517,58 @@ func (s *memorySeriesStorage) MetricsForLabelMatchers(matchers ...*metric.LabelM return result } +// metricForFingerprint returns the metric for the given fingerprint if the +// corresponding time series has samples between 'from' and 'through'. +func (s *memorySeriesStorage) metricForFingerprint( + fp model.Fingerprint, + from, through model.Time, +) (metric.Metric, bool) { + // Lock FP so that no (un-)archiving will happen during lookup. + s.fpLocker.Lock(fp) + defer s.fpLocker.Unlock(fp) + + watermark := model.Time(atomic.LoadInt64((*int64)(&s.archiveHighWatermark))) + + series, ok := s.fpToSeries.get(fp) + if ok { + if series.lastTime.Before(from) || series.savedFirstTime.After(through) { + return metric.Metric{}, false + } + // Wrap the returned metric in a copy-on-write (COW) metric here because + // the caller might mutate it. + return metric.Metric{ + Metric: series.metric, + }, true + } + // From here on, we are only concerned with archived metrics. + // If the high watermark of archived series is before 'from', we are done. + if watermark < from { + return metric.Metric{}, false + } + if from.After(model.Earliest) || through.Before(model.Latest) { + // The range lookup is relatively cheap, so let's do it first. + ok, first, last, err := s.persistence.hasArchivedMetric(fp) + if err != nil { + log.Errorf("Error retrieving archived time range for fingerprint %v: %v", fp, err) + return metric.Metric{}, false + } + if !ok || first.After(through) || last.Before(from) { + return metric.Metric{}, false + } + } + + met, err := s.persistence.archivedMetric(fp) + if err != nil { + log.Errorf("Error retrieving archived metric for fingerprint %v: %v", fp, err) + return metric.Metric{}, false + } + + return metric.Metric{ + Metric: met, + Copied: false, + }, true +} + // LabelValuesForLabelName implements Storage. func (s *memorySeriesStorage) LabelValuesForLabelName(labelName model.LabelName) model.LabelValues { lvs, err := s.persistence.labelValuesForLabelName(labelName) @@ -514,30 +578,6 @@ func (s *memorySeriesStorage) LabelValuesForLabelName(labelName model.LabelName) return lvs } -// MetricForFingerprint implements Storage. -func (s *memorySeriesStorage) MetricForFingerprint(fp model.Fingerprint) metric.Metric { - s.fpLocker.Lock(fp) - defer s.fpLocker.Unlock(fp) - - series, ok := s.fpToSeries.get(fp) - if ok { - // Wrap the returned metric in a copy-on-write (COW) metric here because - // the caller might mutate it. 
- return metric.Metric{ - Metric: series.metric, - } - } - met, err := s.persistence.archivedMetric(fp) - if err != nil { - log.Errorf("Error retrieving archived metric for fingerprint %v: %v", fp, err) - } - - return metric.Metric{ - Metric: met, - Copied: false, - } -} - // DropMetric implements Storage. func (s *memorySeriesStorage) DropMetricsForFingerprints(fps ...model.Fingerprint) { for _, fp := range fps { @@ -1077,8 +1117,9 @@ func (s *memorySeriesStorage) maintainMemorySeries( } } - // Archive if all chunks are evicted. - if iOldestNotEvicted == -1 { + // Archive if all chunks are evicted. Also make sure the last sample has + // an age of at least headChunkTimeout (which is very likely anyway). + if iOldestNotEvicted == -1 && model.Now().Sub(series.lastTime) > headChunkTimeout { s.fpToSeries.del(fp) s.numSeries.Dec() if err := s.persistence.archiveMetric( @@ -1088,6 +1129,15 @@ func (s *memorySeriesStorage) maintainMemorySeries( return } s.seriesOps.WithLabelValues(archive).Inc() + oldWatermark := atomic.LoadInt64((*int64)(&s.archiveHighWatermark)) + if oldWatermark < int64(series.lastTime) { + if !atomic.CompareAndSwapInt64( + (*int64)(&s.archiveHighWatermark), + oldWatermark, int64(series.lastTime), + ) { + panic("s.archiveHighWatermark modified outside of maintainMemorySeries") + } + } return } // If we are here, the series is not archived, so check for chunkDesc diff --git a/storage/local/storage_test.go b/storage/local/storage_test.go index 5fcb39c43..ab5c2a9ed 100644 --- a/storage/local/storage_test.go +++ b/storage/local/storage_test.go @@ -178,7 +178,10 @@ func TestMatches(t *testing.T) { } for _, mt := range matcherTests { - res := storage.MetricsForLabelMatchers(mt.matchers...) + res := storage.MetricsForLabelMatchers( + model.Earliest, model.Latest, + mt.matchers..., + ) if len(mt.expected) != len(res) { t.Fatalf("expected %d matches for %q, found %d", len(mt.expected), mt.matchers, len(res)) } @@ -362,7 +365,10 @@ func BenchmarkLabelMatching(b *testing.B) { for i := 0; i < b.N; i++ { benchLabelMatchingRes = map[model.Fingerprint]metric.Metric{} for _, mt := range matcherTests { - benchLabelMatchingRes = s.MetricsForLabelMatchers(mt...) + benchLabelMatchingRes = s.MetricsForLabelMatchers( + model.Earliest, model.Latest, + mt..., + ) } } // Stop timer to not count the storage closing. diff --git a/web/api/v1/api.go b/web/api/v1/api.go index 823665f3b..6858938fb 100644 --- a/web/api/v1/api.go +++ b/web/api/v1/api.go @@ -226,7 +226,10 @@ func (api *API) series(r *http.Request) (interface{}, *apiError) { if err != nil { return nil, &apiError{errorBadData, err} } - for fp, met := range api.Storage.MetricsForLabelMatchers(matchers...) { + for fp, met := range api.Storage.MetricsForLabelMatchers( + model.Earliest, model.Latest, // Get every series. + matchers..., + ) { res[fp] = met } } @@ -250,7 +253,10 @@ func (api *API) dropSeries(r *http.Request) (interface{}, *apiError) { if err != nil { return nil, &apiError{errorBadData, err} } - for fp := range api.Storage.MetricsForLabelMatchers(matchers...) { + for fp := range api.Storage.MetricsForLabelMatchers( + model.Earliest, model.Latest, // Get every series. 
+ matchers..., + ) { fps[fp] = struct{}{} } } diff --git a/web/federate.go b/web/federate.go index d9baf676b..26f4710eb 100644 --- a/web/federate.go +++ b/web/federate.go @@ -19,7 +19,6 @@ import ( "github.com/golang/protobuf/proto" "github.com/prometheus/prometheus/promql" - "github.com/prometheus/prometheus/storage/metric" "github.com/prometheus/common/expfmt" "github.com/prometheus/common/model" @@ -33,7 +32,7 @@ func (h *Handler) federation(w http.ResponseWriter, req *http.Request) { req.ParseForm() - metrics := map[model.Fingerprint]metric.Metric{} + fps := map[model.Fingerprint]struct{}{} for _, s := range req.Form["match[]"] { matchers, err := promql.ParseMetricSelector(s) @@ -41,8 +40,11 @@ func (h *Handler) federation(w http.ResponseWriter, req *http.Request) { http.Error(w, err.Error(), http.StatusBadRequest) return } - for fp, met := range h.storage.MetricsForLabelMatchers(matchers...) { - metrics[fp] = met + for fp := range h.storage.MetricsForLabelMatchers( + model.Now().Add(-promql.StalenessDelta), model.Latest, + matchers..., + ) { + fps[fp] = struct{}{} } } @@ -62,19 +64,19 @@ func (h *Handler) federation(w http.ResponseWriter, req *http.Request) { Type: dto.MetricType_UNTYPED.Enum(), } - for fp, met := range metrics { + for fp := range fps { globalUsed := map[model.LabelName]struct{}{} - sp := h.storage.LastSamplePairForFingerprint(fp) + s := h.storage.LastSampleForFingerprint(fp) // Discard if sample does not exist or lays before the staleness interval. - if sp.Timestamp.Before(minTimestamp) { + if s.Timestamp.Before(minTimestamp) { continue } // Reset label slice. protMetric.Label = protMetric.Label[:0] - for ln, lv := range met.Metric { + for ln, lv := range s.Metric { if ln == model.MetricNameLabel { protMetricFam.Name = proto.String(string(lv)) continue @@ -98,8 +100,8 @@ func (h *Handler) federation(w http.ResponseWriter, req *http.Request) { } } - protMetric.TimestampMs = proto.Int64(int64(sp.Timestamp)) - protMetric.Untyped.Value = proto.Float64(float64(sp.Value)) + protMetric.TimestampMs = proto.Int64(int64(s.Timestamp)) + protMetric.Untyped.Value = proto.Float64(float64(s.Value)) if err := enc.Encode(protMetricFam); err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) From 47e3c90f9b417938c9caa72e1932f743800d47da Mon Sep 17 00:00:00 2001 From: beorn7 Date: Wed, 9 Mar 2016 18:56:30 +0100 Subject: [PATCH 2/5] Clean up error propagation Only return an error where callers do something with it other than simply logging and ignoring it. All the errors touched in this commit flag the storage as dirty anyway, and that fact is logged anyway. So most of what is being removed here is just log spam. As discussed earlier, the class of errors that flags the storage as dirty signals fundamental corruption, so even bubbling up a one-time warning to the user (e.g. about incomplete results) wouldn't help much because _anything_ happening in the storage has to be doubted from that point on (and in fact retroactively into the past, too). Flagging the storage as dirty, and alerting on it (plus marking the state in the web UI), is the only way I can see right now. As a byproduct, I cleaned up the setDirty method a bit and improved the logged errors.
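The "once dirty, always dirty" behaviour described above boils down to a sticky flag behind a mutex: only the first error is reported as the reason, and crash recovery may clear the flag only if the storage did not become dirty while recovery was running. A minimal, self-contained Go sketch of that semantics, with hypothetical type and method names rather than the actual persistence code:

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// dirtyState is a hypothetical stand-in for the dirty-flag handling described
// above: once the state became dirty at runtime, it stays dirty, and only the
// first error is logged as the reason.
type dirtyState struct {
	mtx         sync.Mutex
	dirty       bool
	becameDirty bool // set once we turn dirty at runtime; never reset
}

// setDirty flags the state as dirty. Every call after the first one is a
// no-op, so the reason that gets logged is the first error encountered.
func (d *dirtyState) setDirty(err error) {
	d.mtx.Lock()
	defer d.mtx.Unlock()
	if d.becameDirty {
		return
	}
	d.dirty = true
	d.becameDirty = true
	fmt.Println("storage is now inconsistent, restart to initiate recovery:", err)
}

// markClean is what a crash-recovery pass would call at the end: it only
// clears the flag if the state did not become dirty in the meantime.
func (d *dirtyState) markClean() {
	d.mtx.Lock()
	defer d.mtx.Unlock()
	if !d.becameDirty {
		d.dirty = false
	}
}

func main() {
	var d dirtyState
	d.setDirty(errors.New("index lookup failed"))
	d.setDirty(errors.New("a later error, not logged")) // no-op: already dirty
	d.markClean()                                       // no-op: became dirty at runtime
	fmt.Println("dirty:", d.dirty)                      // dirty: true
}
```

The sticky becameDirty field is what makes a late markClean call safe: a recovery pass cannot accidentally declare clean a storage that corrupted itself again while recovery was still running.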
--- storage/local/crashrecovery.go | 8 +++- storage/local/persistence.go | 61 ++++++++++++++----------------- storage/local/persistence_test.go | 51 +++++++------------------- storage/local/storage.go | 58 ++++++----------------------- storage/local/storage_test.go | 56 +++++----------------------- 5 files changed, 71 insertions(+), 163 deletions(-) diff --git a/storage/local/crashrecovery.go b/storage/local/crashrecovery.go index f51e54e7b..17626c07c 100644 --- a/storage/local/crashrecovery.go +++ b/storage/local/crashrecovery.go @@ -140,7 +140,13 @@ func (p *persistence) recoverFromCrash(fingerprintToSeries map[model.Fingerprint } } - p.setDirty(false, nil) + p.dirtyMtx.Lock() + // Only declare storage clean if it didn't become dirty during crash recovery. + if !p.becameDirty { + p.dirty = false + } + p.dirtyMtx.Unlock() + log.Warn("Crash recovery complete.") return nil } diff --git a/storage/local/persistence.go b/storage/local/persistence.go index a89307bc3..68bc612f0 100644 --- a/storage/local/persistence.go +++ b/storage/local/persistence.go @@ -312,49 +312,44 @@ func (p *persistence) isDirty() bool { return p.dirty } -// setDirty sets the dirty flag in a goroutine-safe way. Once the dirty flag was -// set to true with this method, it cannot be set to false again. (If we became -// dirty during our runtime, there is no way back. If we were dirty from the -// start, a clean-up might make us clean again.) The provided error will be -// logged as a reason if dirty is true. -func (p *persistence) setDirty(dirty bool, err error) { - if dirty { - p.dirtyCounter.Inc() - } +// setDirty flags the storage as dirty in a goroutine-safe way. The provided +// error will be logged as a reason the first time the storage is flagged as dirty. +func (p *persistence) setDirty(err error) { + p.dirtyCounter.Inc() p.dirtyMtx.Lock() defer p.dirtyMtx.Unlock() if p.becameDirty { return } - p.dirty = dirty - if dirty { - p.becameDirty = true - log.With("error", err).Error("The storage is now inconsistent. Restart Prometheus ASAP to initiate recovery.") - } + p.dirty = true + p.becameDirty = true + log.With("error", err).Error("The storage is now inconsistent. Restart Prometheus ASAP to initiate recovery.") } // fingerprintsForLabelPair returns the fingerprints for the given label // pair. This method is goroutine-safe but take into account that metrics queued // for indexing with IndexMetric might not have made it into the index // yet. (Same applies correspondingly to UnindexMetric.) -func (p *persistence) fingerprintsForLabelPair(lp model.LabelPair) (model.Fingerprints, error) { +func (p *persistence) fingerprintsForLabelPair(lp model.LabelPair) model.Fingerprints { fps, _, err := p.labelPairToFingerprints.Lookup(lp) if err != nil { - return nil, err + p.setDirty(fmt.Errorf("error in method fingerprintsForLabelPair(%v): %s", lp, err)) + return nil } - return fps, nil + return fps } // labelValuesForLabelName returns the label values for the given label // name. This method is goroutine-safe but take into account that metrics queued // for indexing with IndexMetric might not have made it into the index // yet. (Same applies correspondingly to UnindexMetric.) 
-func (p *persistence) labelValuesForLabelName(ln model.LabelName) (model.LabelValues, error) { +func (p *persistence) labelValuesForLabelName(ln model.LabelName) model.LabelValues { lvs, _, err := p.labelNameToLabelValues.Lookup(ln) if err != nil { - return nil, err + p.setDirty(fmt.Errorf("error in method labelValuesForLabelName(%v): %s", ln, err)) + return nil } - return lvs, nil + return lvs } // persistChunks persists a number of consecutive chunks of a series. It is the @@ -1008,29 +1003,28 @@ func (p *persistence) waitForIndexing() { // the metric. The caller must have locked the fingerprint. func (p *persistence) archiveMetric( fp model.Fingerprint, m model.Metric, first, last model.Time, -) error { +) { if err := p.archivedFingerprintToMetrics.Put(codable.Fingerprint(fp), codable.Metric(m)); err != nil { - p.setDirty(true, err) - return err + p.setDirty(fmt.Errorf("error in method archiveMetric inserting fingerprint %v into FingerprintToMetrics: %s", fp, err)) + return } if err := p.archivedFingerprintToTimeRange.Put(codable.Fingerprint(fp), codable.TimeRange{First: first, Last: last}); err != nil { - p.setDirty(true, err) - return err + p.setDirty(fmt.Errorf("error in method archiveMetric inserting fingerprint %v into FingerprintToTimeRange: %s", fp, err)) } - return nil } // hasArchivedMetric returns whether the archived metric for the given // fingerprint exists and if yes, what the first and last timestamp in the // corresponding series is. This method is goroutine-safe. func (p *persistence) hasArchivedMetric(fp model.Fingerprint) ( - hasMetric bool, firstTime, lastTime model.Time, err error, + hasMetric bool, firstTime, lastTime model.Time, ) { - firstTime, lastTime, hasMetric, err = p.archivedFingerprintToTimeRange.Lookup(fp) + firstTime, lastTime, hasMetric, err := p.archivedFingerprintToTimeRange.Lookup(fp) if err != nil { - p.setDirty(true, err) + p.setDirty(fmt.Errorf("error in method hasArchivedMetric(%v): %s", fp, err)) + hasMetric = false } - return + return hasMetric, firstTime, lastTime } // updateArchivedTimeRange updates an archived time range. 
The caller must make @@ -1069,9 +1063,10 @@ func (p *persistence) fingerprintsModifiedBefore(beforeTime model.Time) ([]model func (p *persistence) archivedMetric(fp model.Fingerprint) (model.Metric, error) { metric, _, err := p.archivedFingerprintToMetrics.Lookup(fp) if err != nil { - p.setDirty(true, err) + p.setDirty(fmt.Errorf("error in method archivedMetric(%v): %s", fp, err)) + return nil, err } - return metric, err + return metric, nil } // purgeArchivedMetric deletes an archived fingerprint and its corresponding @@ -1081,7 +1076,7 @@ func (p *persistence) archivedMetric(fp model.Fingerprint) (model.Metric, error) func (p *persistence) purgeArchivedMetric(fp model.Fingerprint) (err error) { defer func() { if err != nil { - p.setDirty(true, fmt.Errorf("error in method purgeArchivedMetric: %s", err)) + p.setDirty(fmt.Errorf("error in method purgeArchivedMetric(%v): %s", fp, err)) } }() diff --git a/storage/local/persistence_test.go b/storage/local/persistence_test.go index e2da77803..692f494d5 100644 --- a/storage/local/persistence_test.go +++ b/storage/local/persistence_test.go @@ -770,58 +770,46 @@ func testDropArchivedMetric(t *testing.T, encoding chunkEncoding) { p.indexMetric(2, m2) p.waitForIndexing() - outFPs, err := p.fingerprintsForLabelPair(model.LabelPair{Name: "n1", Value: "v1"}) - if err != nil { - t.Fatal(err) - } + outFPs := p.fingerprintsForLabelPair(model.LabelPair{Name: "n1", Value: "v1"}) want := model.Fingerprints{1} if !reflect.DeepEqual(outFPs, want) { t.Errorf("want %#v, got %#v", want, outFPs) } - outFPs, err = p.fingerprintsForLabelPair(model.LabelPair{Name: "n2", Value: "v2"}) - if err != nil { - t.Fatal(err) - } + outFPs = p.fingerprintsForLabelPair(model.LabelPair{Name: "n2", Value: "v2"}) want = model.Fingerprints{2} if !reflect.DeepEqual(outFPs, want) { t.Errorf("want %#v, got %#v", want, outFPs) } - if archived, _, _, err := p.hasArchivedMetric(1); err != nil || !archived { + if archived, _, _ := p.hasArchivedMetric(1); !archived { t.Error("want FP 1 archived") } - if archived, _, _, err := p.hasArchivedMetric(2); err != nil || !archived { + if archived, _, _ := p.hasArchivedMetric(2); !archived { t.Error("want FP 2 archived") } - if err != p.purgeArchivedMetric(1) { + if err := p.purgeArchivedMetric(1); err != nil { t.Fatal(err) } - if err != p.purgeArchivedMetric(3) { + if err := p.purgeArchivedMetric(3); err != nil { // Purging something that has not beet archived is not an error. 
t.Fatal(err) } p.waitForIndexing() - outFPs, err = p.fingerprintsForLabelPair(model.LabelPair{Name: "n1", Value: "v1"}) - if err != nil { - t.Fatal(err) - } + outFPs = p.fingerprintsForLabelPair(model.LabelPair{Name: "n1", Value: "v1"}) want = nil if !reflect.DeepEqual(outFPs, want) { t.Errorf("want %#v, got %#v", want, outFPs) } - outFPs, err = p.fingerprintsForLabelPair(model.LabelPair{Name: "n2", Value: "v2"}) - if err != nil { - t.Fatal(err) - } + outFPs = p.fingerprintsForLabelPair(model.LabelPair{Name: "n2", Value: "v2"}) want = model.Fingerprints{2} if !reflect.DeepEqual(outFPs, want) { t.Errorf("want %#v, got %#v", want, outFPs) } - if archived, _, _, err := p.hasArchivedMetric(1); err != nil || archived { + if archived, _, _ := p.hasArchivedMetric(1); archived { t.Error("want FP 1 not archived") } - if archived, _, _, err := p.hasArchivedMetric(2); err != nil || !archived { + if archived, _, _ := p.hasArchivedMetric(2); !archived { t.Error("want FP 2 archived") } } @@ -983,9 +971,7 @@ func testIndexing(t *testing.T, encoding chunkEncoding) { for i, b := range batches { for fp, m := range b.fpToMetric { p.indexMetric(fp, m) - if err := p.archiveMetric(fp, m, 1, 2); err != nil { - t.Fatal(err) - } + p.archiveMetric(fp, m, 1, 2) indexedFpsToMetrics[fp] = m } verifyIndexedState(i, t, b, indexedFpsToMetrics, p) @@ -1029,10 +1015,7 @@ func verifyIndexedState(i int, t *testing.T, b incrementalBatch, indexedFpsToMet } // Check that archived metrics are in membership index. - has, first, last, err := p.hasArchivedMetric(fp) - if err != nil { - t.Fatal(err) - } + has, first, last := p.hasArchivedMetric(fp) if !has { t.Errorf("%d. fingerprint %v not found", i, fp) } @@ -1046,10 +1029,7 @@ func verifyIndexedState(i int, t *testing.T, b incrementalBatch, indexedFpsToMet // Compare label name -> label values mappings. for ln, lvs := range b.expectedLnToLvs { - outLvs, err := p.labelValuesForLabelName(ln) - if err != nil { - t.Fatal(err) - } + outLvs := p.labelValuesForLabelName(ln) outSet := codable.LabelValueSet{} for _, lv := range outLvs { @@ -1063,10 +1043,7 @@ func verifyIndexedState(i int, t *testing.T, b incrementalBatch, indexedFpsToMet // Compare label pair -> fingerprints mappings. for lp, fps := range b.expectedLpToFps { - outFPs, err := p.fingerprintsForLabelPair(lp) - if err != nil { - t.Fatal(err) - } + outFPs := p.fingerprintsForLabelPair(lp) outSet := codable.FingerprintSet{} for _, fp := range outFPs { diff --git a/storage/local/storage.go b/storage/local/storage.go index bc380bd6b..a24387831 100644 --- a/storage/local/storage.go +++ b/storage/local/storage.go @@ -425,10 +425,7 @@ func (s *memorySeriesStorage) fingerprintsForLabelPairs(pairs ...model.LabelPair var result map[model.Fingerprint]struct{} for _, pair := range pairs { intersection := map[model.Fingerprint]struct{}{} - fps, err := s.persistence.fingerprintsForLabelPair(pair) - if err != nil { - log.Error("Error getting fingerprints for label pair: ", err) - } + fps := s.persistence.fingerprintsForLabelPair(pair) if len(fps) == 0 { return nil } @@ -547,19 +544,14 @@ func (s *memorySeriesStorage) metricForFingerprint( } if from.After(model.Earliest) || through.Before(model.Latest) { // The range lookup is relatively cheap, so let's do it first. 
- ok, first, last, err := s.persistence.hasArchivedMetric(fp) - if err != nil { - log.Errorf("Error retrieving archived time range for fingerprint %v: %v", fp, err) - return metric.Metric{}, false - } + ok, first, last := s.persistence.hasArchivedMetric(fp) if !ok || first.After(through) || last.Before(from) { return metric.Metric{}, false } } - met, err := s.persistence.archivedMetric(fp) - if err != nil { - log.Errorf("Error retrieving archived metric for fingerprint %v: %v", fp, err) + met, _ := s.persistence.archivedMetric(fp) // Ignoring error, there is nothing we can do. + if met == nil { return metric.Metric{}, false } @@ -571,11 +563,7 @@ func (s *memorySeriesStorage) metricForFingerprint( // LabelValuesForLabelName implements Storage. func (s *memorySeriesStorage) LabelValuesForLabelName(labelName model.LabelName) model.LabelValues { - lvs, err := s.persistence.labelValuesForLabelName(labelName) - if err != nil { - log.Errorf("Error getting label values for label name %q: %v", labelName, err) - } - return lvs + return s.persistence.labelValuesForLabelName(labelName) } // DropMetric implements Storage. @@ -603,7 +591,7 @@ func (s *memorySeriesStorage) Append(sample *model.Sample) error { s.fpLocker.Unlock(fp) }() // Func wrapper because fp might change below. if err != nil { - s.persistence.setDirty(true, fmt.Errorf("error while mapping fingerprint %v: %s", rawFP, err)) + s.persistence.setDirty(fmt.Errorf("error while mapping fingerprint %v: %s", rawFP, err)) return err } if fp != rawFP { @@ -745,11 +733,7 @@ func (s *memorySeriesStorage) getSeriesForRange( if ok { return series } - has, first, last, err := s.persistence.hasArchivedMetric(fp) - if err != nil { - log.With("fingerprint", fp).With("error", err).Error("Archive index error while preloading chunks.") - return nil - } + has, first, last := s.persistence.hasArchivedMetric(fp) if !has { s.invalidPreloadRequestsCount.Inc() return nil @@ -759,7 +743,7 @@ func (s *memorySeriesStorage) getSeriesForRange( } metric, err := s.persistence.archivedMetric(fp) if err != nil { - log.With("fingerprint", fp).With("error", err).Error("Archive index error while preloading chunks.") + // Error already logged, storage declared dirty by archivedMetric. return nil } series, err = s.getOrCreateSeries(fp, metric) @@ -1152,12 +1136,7 @@ func (s *memorySeriesStorage) maintainMemorySeries( if iOldestNotEvicted == -1 && model.Now().Sub(series.lastTime) > headChunkTimeout { s.fpToSeries.del(fp) s.numSeries.Dec() - if err := s.persistence.archiveMetric( - fp, series.metric, series.firstTime(), series.lastTime, - ); err != nil { - log.Errorf("Error archiving metric %v: %v", series.metric, err) - return - } + s.persistence.archiveMetric(fp, series.metric, series.firstTime(), series.lastTime) s.seriesOps.WithLabelValues(archive).Inc() oldWatermark := atomic.LoadInt64((*int64)(&s.archiveHighWatermark)) if oldWatermark < int64(series.lastTime) { @@ -1278,11 +1257,7 @@ func (s *memorySeriesStorage) maintainArchivedSeries(fp model.Fingerprint, befor s.fpLocker.Lock(fp) defer s.fpLocker.Unlock(fp) - has, firstTime, lastTime, err := s.persistence.hasArchivedMetric(fp) - if err != nil { - log.Error("Error looking up archived time range: ", err) - return - } + has, firstTime, lastTime := s.persistence.hasArchivedMetric(fp) if !has || !firstTime.Before(beforeTime) { // Oldest sample not old enough, or metric purged or unarchived in the meantime. 
return @@ -1295,10 +1270,7 @@ func (s *memorySeriesStorage) maintainArchivedSeries(fp model.Fingerprint, befor log.Error("Error dropping persisted chunks: ", err) } if allDropped { - if err := s.persistence.purgeArchivedMetric(fp); err != nil { - log.Errorf("Error purging archived metric for fingerprint %v: %v", fp, err) - return - } + s.persistence.purgeArchivedMetric(fp) // Ignoring error. Nothing we can do. s.seriesOps.WithLabelValues(archivePurge).Inc() return } @@ -1487,13 +1459,7 @@ func (s *memorySeriesStorage) purgeSeries(fp model.Fingerprint, m model.Metric, s.incNumChunksToPersist(-numChunksNotYetPersisted) } else { - if err := s.persistence.purgeArchivedMetric(fp); err != nil { - log. - With("fingerprint", fp). - With("metric", m). - With("error", err). - Error("Error purging metric from archive.") - } + s.persistence.purgeArchivedMetric(fp) // Ignoring error. There is nothing we can do. } if m != nil { // If we know a metric now, unindex it in any case. diff --git a/storage/local/storage_test.go b/storage/local/storage_test.go index 01f30c28b..291b32918 100644 --- a/storage/local/storage_test.go +++ b/storage/local/storage_test.go @@ -471,11 +471,7 @@ func TestDropMetrics(t *testing.T) { s.maintainMemorySeries(fpToBeArchived, 0) s.fpLocker.Lock(fpToBeArchived) s.fpToSeries.del(fpToBeArchived) - if err := s.persistence.archiveMetric( - fpToBeArchived, m3, 0, insertStart.Add(time.Duration(N-1)*time.Millisecond), - ); err != nil { - t.Error(err) - } + s.persistence.archiveMetric(fpToBeArchived, m3, 0, insertStart.Add(time.Duration(N-1)*time.Millisecond)) s.fpLocker.Unlock(fpToBeArchived) fps := s.fingerprintsForLabelPairs(model.LabelPair{Name: model.MetricNameLabel, Value: "test"}) @@ -582,11 +578,7 @@ func TestQuarantineMetric(t *testing.T) { s.maintainMemorySeries(fpToBeArchived, 0) s.fpLocker.Lock(fpToBeArchived) s.fpToSeries.del(fpToBeArchived) - if err := s.persistence.archiveMetric( - fpToBeArchived, m3, 0, insertStart.Add(time.Duration(N-1)*time.Millisecond), - ); err != nil { - t.Error(err) - } + s.persistence.archiveMetric(fpToBeArchived, m3, 0, insertStart.Add(time.Duration(N-1)*time.Millisecond)) s.fpLocker.Unlock(fpToBeArchived) // Corrupt the series file for m3. @@ -1144,36 +1136,22 @@ func testEvictAndPurgeSeries(t *testing.T, encoding chunkEncoding) { if err != nil { t.Fatal(err) } - if err := s.persistence.archiveMetric( - fp, series.metric, series.firstTime(), lastTime, - ); err != nil { - t.Fatal(err) - } - - archived, _, _, err := s.persistence.hasArchivedMetric(fp) - if err != nil { - t.Fatal(err) - } + s.persistence.archiveMetric(fp, series.metric, series.firstTime(), lastTime) + archived, _, _ := s.persistence.hasArchivedMetric(fp) if !archived { t.Fatal("not archived") } // Drop ~half of the chunks of an archived series. s.maintainArchivedSeries(fp, 10000) - archived, _, _, err = s.persistence.hasArchivedMetric(fp) - if err != nil { - t.Fatal(err) - } + archived, _, _ = s.persistence.hasArchivedMetric(fp) if !archived { t.Fatal("archived series purged although only half of the chunks dropped") } // Drop everything. 
s.maintainArchivedSeries(fp, 100000) - archived, _, _, err = s.persistence.hasArchivedMetric(fp) - if err != nil { - t.Fatal(err) - } + archived, _, _ = s.persistence.hasArchivedMetric(fp) if archived { t.Fatal("archived series not dropped") } @@ -1199,16 +1177,8 @@ func testEvictAndPurgeSeries(t *testing.T, encoding chunkEncoding) { if err != nil { t.Fatal(err) } - if err := s.persistence.archiveMetric( - fp, series.metric, series.firstTime(), lastTime, - ); err != nil { - t.Fatal(err) - } - - archived, _, _, err = s.persistence.hasArchivedMetric(fp) - if err != nil { - t.Fatal(err) - } + s.persistence.archiveMetric(fp, series.metric, series.firstTime(), lastTime) + archived, _, _ = s.persistence.hasArchivedMetric(fp) if !archived { t.Fatal("not archived") } @@ -1220,10 +1190,7 @@ func testEvictAndPurgeSeries(t *testing.T, encoding chunkEncoding) { if !ok { t.Fatal("could not find series") } - archived, _, _, err = s.persistence.hasArchivedMetric(fp) - if err != nil { - t.Fatal(err) - } + archived, _, _ = s.persistence.hasArchivedMetric(fp) if archived { t.Fatal("archived") } @@ -1231,10 +1198,7 @@ func testEvictAndPurgeSeries(t *testing.T, encoding chunkEncoding) { // This will archive again, but must not drop it completely, despite the // memorySeries being empty. s.maintainMemorySeries(fp, 10000) - archived, _, _, err = s.persistence.hasArchivedMetric(fp) - if err != nil { - t.Fatal(err) - } + archived, _, _ = s.persistence.hasArchivedMetric(fp) if !archived { t.Fatal("series purged completely") } From 9445c7053d2203ea8ff1d37dfb33d80331c4d709 Mon Sep 17 00:00:00 2001 From: beorn7 Date: Wed, 9 Mar 2016 20:27:50 +0100 Subject: [PATCH 3/5] Add tests for range-limited label matching While doing so, improve getSeriesForRange. --- storage/local/storage.go | 24 ++++++++---- storage/local/storage_test.go | 72 +++++++++++++++++++++++++++++++++++ 2 files changed, 89 insertions(+), 7 deletions(-) diff --git a/storage/local/storage.go b/storage/local/storage.go index a24387831..734b97ad3 100644 --- a/storage/local/storage.go +++ b/storage/local/storage.go @@ -520,15 +520,12 @@ func (s *memorySeriesStorage) metricForFingerprint( fp model.Fingerprint, from, through model.Time, ) (metric.Metric, bool) { - // Lock FP so that no (un-)archiving will happen during lookup. s.fpLocker.Lock(fp) defer s.fpLocker.Unlock(fp) - watermark := model.Time(atomic.LoadInt64((*int64)(&s.archiveHighWatermark))) - series, ok := s.fpToSeries.get(fp) if ok { - if series.lastTime.Before(from) || series.savedFirstTime.After(through) { + if series.lastTime.Before(from) || series.firstTime().After(through) { return metric.Metric{}, false } // Wrap the returned metric in a copy-on-write (COW) metric here because @@ -539,13 +536,15 @@ func (s *memorySeriesStorage) metricForFingerprint( } // From here on, we are only concerned with archived metrics. // If the high watermark of archived series is before 'from', we are done. + watermark := model.Time(atomic.LoadInt64((*int64)(&s.archiveHighWatermark))) if watermark < from { return metric.Metric{}, false } if from.After(model.Earliest) || through.Before(model.Latest) { - // The range lookup is relatively cheap, so let's do it first. - ok, first, last := s.persistence.hasArchivedMetric(fp) - if !ok || first.After(through) || last.Before(from) { + // The range lookup is relatively cheap, so let's do it first if + // we have a chance the archived metric is not in the range. 
+ has, first, last := s.persistence.hasArchivedMetric(fp) + if !has || first.After(through) || last.Before(from) { return metric.Metric{}, false } } @@ -725,14 +724,25 @@ func (s *memorySeriesStorage) getOrCreateSeries(fp model.Fingerprint, m model.Me } // getSeriesForRange is a helper method for preloadChunksForRange and preloadChunksForInstant. +// +// The caller must have locked the fp. func (s *memorySeriesStorage) getSeriesForRange( fp model.Fingerprint, from model.Time, through model.Time, ) *memorySeries { series, ok := s.fpToSeries.get(fp) if ok { + if series.lastTime.Before(from) || series.firstTime().After(through) { + return nil + } return series } + + watermark := model.Time(atomic.LoadInt64((*int64)(&s.archiveHighWatermark))) + if watermark < from { + return nil + } + has, first, last := s.persistence.hasArchivedMetric(fp) if !has { s.invalidPreloadRequestsCount.Inc() diff --git a/storage/local/storage_test.go b/storage/local/storage_test.go index 291b32918..f305e792f 100644 --- a/storage/local/storage_test.go +++ b/storage/local/storage_test.go @@ -34,6 +34,7 @@ func TestMatches(t *testing.T) { storage, closer := NewTestStorage(t, 1) defer closer.Close() + storage.archiveHighWatermark = 90 samples := make([]*model.Sample, 100) fingerprints := make(model.Fingerprints, 100) @@ -56,6 +57,20 @@ func TestMatches(t *testing.T) { } storage.WaitForIndexing() + // Archive every tenth metric. + for i, fp := range fingerprints { + if i%10 != 0 { + continue + } + s, ok := storage.fpToSeries.get(fp) + if !ok { + t.Fatal("could not retrieve series for fp", fp) + } + storage.fpLocker.Lock(fp) + storage.persistence.archiveMetric(fp, s.metric, s.firstTime(), s.lastTime) + storage.fpLocker.Unlock(fp) + } + newMatcher := func(matchType metric.MatchType, name model.LabelName, value model.LabelValue) *metric.LabelMatcher { lm, err := metric.NewLabelMatcher(matchType, name, value) if err != nil { @@ -197,6 +212,56 @@ func TestMatches(t *testing.T) { t.Errorf("expected fingerprint %s for %q not in result", fp1, mt.matchers) } } + // Smoketest for from/through. + if len(storage.MetricsForLabelMatchers( + model.Earliest, -10000, + mt.matchers..., + )) > 0 { + t.Error("expected no matches with 'through' older than any sample") + } + if len(storage.MetricsForLabelMatchers( + 10000, model.Latest, + mt.matchers..., + )) > 0 { + t.Error("expected no matches with 'from' newer than any sample") + } + // Now the tricky one, cut out something from the middle. + var ( + from model.Time = 25 + through model.Time = 75 + ) + res = storage.MetricsForLabelMatchers( + from, through, + mt.matchers..., + ) + expected := model.Fingerprints{} + for _, fp := range mt.expected { + i := 0 + for ; fingerprints[i] != fp && i < len(fingerprints); i++ { + } + if i == len(fingerprints) { + t.Fatal("expected fingerprint does not exist") + } + if !model.Time(i).Before(from) && !model.Time(i).After(through) { + expected = append(expected, fp) + } + } + if len(expected) != len(res) { + t.Errorf("expected %d range-limited matches for %q, found %d", len(expected), mt.matchers, len(res)) + } + for fp1 := range res { + found := false + for _, fp2 := range expected { + if fp1 == fp2 { + found = true + break + } + } + if !found { + t.Errorf("expected fingerprint %s for %q not in range-limited result", fp1, mt.matchers) + } + } + } } @@ -1195,6 +1260,9 @@ func testEvictAndPurgeSeries(t *testing.T, encoding chunkEncoding) { t.Fatal("archived") } + // Set archiveHighWatermark to a low value so that we can see it increase. 
+ s.archiveHighWatermark = 42 + // This will archive again, but must not drop it completely, despite the // memorySeries being empty. s.maintainMemorySeries(fp, 10000) @@ -1202,6 +1270,10 @@ func testEvictAndPurgeSeries(t *testing.T, encoding chunkEncoding) { if !archived { t.Fatal("series purged completely") } + // archiveHighWatermark must have been set by maintainMemorySeries. + if want, got := model.Time(19998), s.archiveHighWatermark; want != got { + t.Errorf("want archiveHighWatermark %v, got %v", want, got) + } } func TestEvictAndPurgeSeriesChunkType0(t *testing.T) { From e8c1f30ab2020a558cae2f6ec030f1d4fab24421 Mon Sep 17 00:00:00 2001 From: beorn7 Date: Wed, 9 Mar 2016 21:56:15 +0100 Subject: [PATCH 4/5] Merge the parallel logic of getSeriesForRange and metricForFingerprint --- storage/local/storage.go | 89 +++++++++++++---------------------- storage/local/test_helpers.go | 2 + 2 files changed, 34 insertions(+), 57 deletions(-) diff --git a/storage/local/storage.go b/storage/local/storage.go index 734b97ad3..740dede3c 100644 --- a/storage/local/storage.go +++ b/storage/local/storage.go @@ -500,9 +500,11 @@ func (s *memorySeriesStorage) MetricsForLabelMatchers( result := map[model.Fingerprint]metric.Metric{} for fp := range resFPs { - if metric, ok := s.metricForFingerprint(fp, from, through); ok { - result[fp] = metric + s.fpLocker.Lock(fp) + if met, _, ok := s.metricForRange(fp, from, through); ok { + result[fp] = metric.Metric{Metric: met} } + s.fpLocker.Unlock(fp) } for _, matcher := range filters { for fp, met := range result { @@ -514,50 +516,46 @@ func (s *memorySeriesStorage) MetricsForLabelMatchers( return result } -// metricForFingerprint returns the metric for the given fingerprint if the -// corresponding time series has samples between 'from' and 'through'. -func (s *memorySeriesStorage) metricForFingerprint( +// metricForRange returns the metric for the given fingerprint if the +// corresponding time series has samples between 'from' and 'through', together +// with a pointer to the series if it is in memory already. For a series that +// does not have samples between 'from' and 'through', the returned bool is +// false. For an archived series that does contain samples between 'from' and +// 'through', it returns (metric, nil, true). +// +// The caller must have locked the fp. +func (s *memorySeriesStorage) metricForRange( fp model.Fingerprint, from, through model.Time, -) (metric.Metric, bool) { - s.fpLocker.Lock(fp) - defer s.fpLocker.Unlock(fp) - +) (model.Metric, *memorySeries, bool) { series, ok := s.fpToSeries.get(fp) if ok { if series.lastTime.Before(from) || series.firstTime().After(through) { - return metric.Metric{}, false + return nil, nil, false } - // Wrap the returned metric in a copy-on-write (COW) metric here because - // the caller might mutate it. - return metric.Metric{ - Metric: series.metric, - }, true + return series.metric, series, true } // From here on, we are only concerned with archived metrics. // If the high watermark of archived series is before 'from', we are done. watermark := model.Time(atomic.LoadInt64((*int64)(&s.archiveHighWatermark))) if watermark < from { - return metric.Metric{}, false + return nil, nil, false } if from.After(model.Earliest) || through.Before(model.Latest) { // The range lookup is relatively cheap, so let's do it first if // we have a chance the archived metric is not in the range. 
has, first, last := s.persistence.hasArchivedMetric(fp) if !has || first.After(through) || last.Before(from) { - return metric.Metric{}, false + return nil, nil, false } } - met, _ := s.persistence.archivedMetric(fp) // Ignoring error, there is nothing we can do. - if met == nil { - return metric.Metric{}, false + metric, err := s.persistence.archivedMetric(fp) + if err != nil { + // archivedMetric has already flagged the storage as dirty in this case. + return nil, nil, false } - - return metric.Metric{ - Metric: met, - Copied: false, - }, true + return metric, nil, true } // LabelValuesForLabelName implements Storage. @@ -723,43 +721,20 @@ func (s *memorySeriesStorage) getOrCreateSeries(fp model.Fingerprint, m model.Me return series, nil } -// getSeriesForRange is a helper method for preloadChunksForRange and preloadChunksForInstant. +// seriesForRange is a helper method for preloadChunksForRange and preloadChunksForInstant. // // The caller must have locked the fp. -func (s *memorySeriesStorage) getSeriesForRange( +func (s *memorySeriesStorage) seriesForRange( fp model.Fingerprint, from model.Time, through model.Time, ) *memorySeries { - series, ok := s.fpToSeries.get(fp) - if ok { - if series.lastTime.Before(from) || series.firstTime().After(through) { - return nil - } - return series - } - - watermark := model.Time(atomic.LoadInt64((*int64)(&s.archiveHighWatermark))) - if watermark < from { + metric, series, ok := s.metricForRange(fp, from, through) + if !ok { return nil } - - has, first, last := s.persistence.hasArchivedMetric(fp) - if !has { - s.invalidPreloadRequestsCount.Inc() - return nil - } - if last.Before(from) || first.After(through) { - return nil - } - metric, err := s.persistence.archivedMetric(fp) - if err != nil { - // Error already logged, storage declared dirty by archivedMetric. - return nil - } - series, err = s.getOrCreateSeries(fp, metric) - if err != nil { - // getOrCreateSeries took care of quarantining already. - return nil + if series == nil { + series, _ = s.getOrCreateSeries(fp, metric) + // getOrCreateSeries took care of quarantining already, so ignore the error. } return series } @@ -771,7 +746,7 @@ func (s *memorySeriesStorage) preloadChunksForRange( s.fpLocker.Lock(fp) defer s.fpLocker.Unlock(fp) - series := s.getSeriesForRange(fp, from, through) + series := s.seriesForRange(fp, from, through) if series == nil { return nil, nopIter } @@ -790,7 +765,7 @@ func (s *memorySeriesStorage) preloadChunksForInstant( s.fpLocker.Lock(fp) defer s.fpLocker.Unlock(fp) - series := s.getSeriesForRange(fp, from, through) + series := s.seriesForRange(fp, from, through) if series == nil { return nil, nopIter } diff --git a/storage/local/test_helpers.go b/storage/local/test_helpers.go index 4e914b724..1dedf518e 100644 --- a/storage/local/test_helpers.go +++ b/storage/local/test_helpers.go @@ -21,6 +21,7 @@ package local import ( "time" + "github.com/prometheus/common/model" "github.com/prometheus/prometheus/util/testutil" ) @@ -51,6 +52,7 @@ func NewTestStorage(t testutil.T, encoding chunkEncoding) (*memorySeriesStorage, SyncStrategy: Adaptive, } storage := NewMemorySeriesStorage(o) + storage.(*memorySeriesStorage).archiveHighWatermark = model.Latest if err := storage.Start(); err != nil { directory.Close() t.Fatalf("Error creating storage: %s", err) From 199f309a39c6079b3502766225bfaa54cfd434bc Mon Sep 17 00:00:00 2001 From: beorn7 Date: Sun, 13 Mar 2016 11:54:24 +0100 Subject: [PATCH 5/5] Resurrect and rename invalid preload requests count metric. 
It is now also used in label matching, so the name of the metric changed from `prometheus_local_storage_invalid_preload_requests_total` to `non_existent_series_matches_total`. --- storage/local/storage.go | 34 +++++++++++++++++++--------------- 1 file changed, 19 insertions(+), 15 deletions(-) diff --git a/storage/local/storage.go b/storage/local/storage.go index 740dede3c..79ece03d3 100644 --- a/storage/local/storage.go +++ b/storage/local/storage.go @@ -159,15 +159,15 @@ type memorySeriesStorage struct { quarantineRequests chan quarantineRequest quarantineStopping, quarantineStopped chan struct{} - persistErrors prometheus.Counter - numSeries prometheus.Gauge - seriesOps *prometheus.CounterVec - ingestedSamplesCount prometheus.Counter - outOfOrderSamplesCount prometheus.Counter - invalidPreloadRequestsCount prometheus.Counter - maintainSeriesDuration *prometheus.SummaryVec - persistenceUrgencyScore prometheus.Gauge - rushedMode prometheus.Gauge + persistErrors prometheus.Counter + numSeries prometheus.Gauge + seriesOps *prometheus.CounterVec + ingestedSamplesCount prometheus.Counter + outOfOrderSamplesCount prometheus.Counter + nonExistentSeriesMatchesCount prometheus.Counter + maintainSeriesDuration *prometheus.SummaryVec + persistenceUrgencyScore prometheus.Gauge + rushedMode prometheus.Gauge } // MemorySeriesStorageOptions contains options needed by @@ -248,11 +248,11 @@ func NewMemorySeriesStorage(o *MemorySeriesStorageOptions) Storage { Name: "out_of_order_samples_total", Help: "The total number of samples that were discarded because their timestamps were at or before the last received sample for a series.", }), - invalidPreloadRequestsCount: prometheus.NewCounter(prometheus.CounterOpts{ + nonExistentSeriesMatchesCount: prometheus.NewCounter(prometheus.CounterOpts{ Namespace: namespace, Subsystem: subsystem, - Name: "invalid_preload_requests_total", - Help: "The total number of preload requests referring to a non-existent series. This is an indication of outdated label indexes.", + Name: "non_existent_series_matches_total", + Help: "How often a non-existent series was referred to during label matching or chunk preloading. This is an indication of outdated label indexes.", }), maintainSeriesDuration: prometheus.NewSummaryVec( prometheus.SummaryOpts{ @@ -545,7 +545,11 @@ func (s *memorySeriesStorage) metricForRange( // The range lookup is relatively cheap, so let's do it first if // we have a chance the archived metric is not in the range. has, first, last := s.persistence.hasArchivedMetric(fp) - if !has || first.After(through) || last.Before(from) { + if !has { + s.nonExistentSeriesMatchesCount.Inc() + return nil, nil, false + } + if first.After(through) || last.Before(from) { return nil, nil, false } } @@ -1492,7 +1496,7 @@ func (s *memorySeriesStorage) Describe(ch chan<- *prometheus.Desc) { s.seriesOps.Describe(ch) ch <- s.ingestedSamplesCount.Desc() ch <- s.outOfOrderSamplesCount.Desc() - ch <- s.invalidPreloadRequestsCount.Desc() + ch <- s.nonExistentSeriesMatchesCount.Desc() ch <- numMemChunksDesc s.maintainSeriesDuration.Describe(ch) ch <- s.persistenceUrgencyScore.Desc() @@ -1519,7 +1523,7 @@ func (s *memorySeriesStorage) Collect(ch chan<- prometheus.Metric) { s.seriesOps.Collect(ch) ch <- s.ingestedSamplesCount ch <- s.outOfOrderSamplesCount - ch <- s.invalidPreloadRequestsCount + ch <- s.nonExistentSeriesMatchesCount ch <- prometheus.MustNewConstMetric( numMemChunksDesc, prometheus.GaugeValue,
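Taken together, the series maintains one invariant: no archived series has a sample newer than the archive high watermark, so any query whose 'from' time lies after that watermark can skip the archive index entirely. The sketch below illustrates that interplay with illustrative names only; it uses a retrying compare-and-swap, whereas the patch can get away with a single CAS that panics on failure because only the series maintenance loop raises the watermark.

```go
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// archiveWatermark models the archiveHighWatermark idea: no archived series
// has a sample newer than this timestamp. It is kept as Unix milliseconds so
// it can be read and updated atomically.
type archiveWatermark struct {
	ms int64
}

// raise lifts the watermark to lastSample if that is newer, retrying the
// compare-and-swap so concurrent archivals cannot lose an update.
func (w *archiveWatermark) raise(lastSample time.Time) {
	ts := lastSample.UnixMilli()
	for {
		old := atomic.LoadInt64(&w.ms)
		if ts <= old {
			return
		}
		if atomic.CompareAndSwapInt64(&w.ms, old, ts) {
			return
		}
	}
}

// needsArchiveLookup reports whether a query starting at 'from' has to consult
// the archive index at all: if 'from' is newer than the watermark, every
// series with samples in the range must still be in memory.
func (w *archiveWatermark) needsArchiveLookup(from time.Time) bool {
	return from.UnixMilli() <= atomic.LoadInt64(&w.ms)
}

func main() {
	var w archiveWatermark
	// A series whose newest sample is two hours old gets archived.
	w.raise(time.Now().Add(-2 * time.Hour))

	recent := w.needsArchiveLookup(time.Now().Add(-5 * time.Minute))
	longRange := w.needsArchiveLookup(time.Now().Add(-24 * time.Hour))
	fmt.Println("rule-eval-style query (last 5m) touches archive:", recent) // false
	fmt.Println("long range query (last 24h) touches archive:", longRange) // true
}
```

Under a workload like the one described in the first commit message, where rule evaluations only touch very recent samples, such queries stay on the cheap in-memory path and never hit the archive index.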