diff --git a/storage/local/storage.go b/storage/local/storage.go index a243878310..734b97ad37 100644 --- a/storage/local/storage.go +++ b/storage/local/storage.go @@ -520,15 +520,12 @@ func (s *memorySeriesStorage) metricForFingerprint( fp model.Fingerprint, from, through model.Time, ) (metric.Metric, bool) { - // Lock FP so that no (un-)archiving will happen during lookup. s.fpLocker.Lock(fp) defer s.fpLocker.Unlock(fp) - watermark := model.Time(atomic.LoadInt64((*int64)(&s.archiveHighWatermark))) - series, ok := s.fpToSeries.get(fp) if ok { - if series.lastTime.Before(from) || series.savedFirstTime.After(through) { + if series.lastTime.Before(from) || series.firstTime().After(through) { return metric.Metric{}, false } // Wrap the returned metric in a copy-on-write (COW) metric here because @@ -539,13 +536,15 @@ func (s *memorySeriesStorage) metricForFingerprint( } // From here on, we are only concerned with archived metrics. // If the high watermark of archived series is before 'from', we are done. + watermark := model.Time(atomic.LoadInt64((*int64)(&s.archiveHighWatermark))) if watermark < from { return metric.Metric{}, false } if from.After(model.Earliest) || through.Before(model.Latest) { - // The range lookup is relatively cheap, so let's do it first. - ok, first, last := s.persistence.hasArchivedMetric(fp) - if !ok || first.After(through) || last.Before(from) { + // The range lookup is relatively cheap, so let's do it first if + // we have a chance the archived metric is not in the range. + has, first, last := s.persistence.hasArchivedMetric(fp) + if !has || first.After(through) || last.Before(from) { return metric.Metric{}, false } } @@ -725,14 +724,25 @@ func (s *memorySeriesStorage) getOrCreateSeries(fp model.Fingerprint, m model.Me } // getSeriesForRange is a helper method for preloadChunksForRange and preloadChunksForInstant. +// +// The caller must have locked the fp. func (s *memorySeriesStorage) getSeriesForRange( fp model.Fingerprint, from model.Time, through model.Time, ) *memorySeries { series, ok := s.fpToSeries.get(fp) if ok { + if series.lastTime.Before(from) || series.firstTime().After(through) { + return nil + } return series } + + watermark := model.Time(atomic.LoadInt64((*int64)(&s.archiveHighWatermark))) + if watermark < from { + return nil + } + has, first, last := s.persistence.hasArchivedMetric(fp) if !has { s.invalidPreloadRequestsCount.Inc() diff --git a/storage/local/storage_test.go b/storage/local/storage_test.go index 291b329188..f305e792f6 100644 --- a/storage/local/storage_test.go +++ b/storage/local/storage_test.go @@ -34,6 +34,7 @@ func TestMatches(t *testing.T) { storage, closer := NewTestStorage(t, 1) defer closer.Close() + storage.archiveHighWatermark = 90 samples := make([]*model.Sample, 100) fingerprints := make(model.Fingerprints, 100) @@ -56,6 +57,20 @@ func TestMatches(t *testing.T) { } storage.WaitForIndexing() + // Archive every tenth metric. + for i, fp := range fingerprints { + if i%10 != 0 { + continue + } + s, ok := storage.fpToSeries.get(fp) + if !ok { + t.Fatal("could not retrieve series for fp", fp) + } + storage.fpLocker.Lock(fp) + storage.persistence.archiveMetric(fp, s.metric, s.firstTime(), s.lastTime) + storage.fpLocker.Unlock(fp) + } + newMatcher := func(matchType metric.MatchType, name model.LabelName, value model.LabelValue) *metric.LabelMatcher { lm, err := metric.NewLabelMatcher(matchType, name, value) if err != nil { @@ -197,6 +212,56 @@ func TestMatches(t *testing.T) { t.Errorf("expected fingerprint %s for %q not in result", fp1, mt.matchers) } } + // Smoketest for from/through. + if len(storage.MetricsForLabelMatchers( + model.Earliest, -10000, + mt.matchers..., + )) > 0 { + t.Error("expected no matches with 'through' older than any sample") + } + if len(storage.MetricsForLabelMatchers( + 10000, model.Latest, + mt.matchers..., + )) > 0 { + t.Error("expected no matches with 'from' newer than any sample") + } + // Now the tricky one, cut out something from the middle. + var ( + from model.Time = 25 + through model.Time = 75 + ) + res = storage.MetricsForLabelMatchers( + from, through, + mt.matchers..., + ) + expected := model.Fingerprints{} + for _, fp := range mt.expected { + i := 0 + for ; fingerprints[i] != fp && i < len(fingerprints); i++ { + } + if i == len(fingerprints) { + t.Fatal("expected fingerprint does not exist") + } + if !model.Time(i).Before(from) && !model.Time(i).After(through) { + expected = append(expected, fp) + } + } + if len(expected) != len(res) { + t.Errorf("expected %d range-limited matches for %q, found %d", len(expected), mt.matchers, len(res)) + } + for fp1 := range res { + found := false + for _, fp2 := range expected { + if fp1 == fp2 { + found = true + break + } + } + if !found { + t.Errorf("expected fingerprint %s for %q not in range-limited result", fp1, mt.matchers) + } + } + } } @@ -1195,6 +1260,9 @@ func testEvictAndPurgeSeries(t *testing.T, encoding chunkEncoding) { t.Fatal("archived") } + // Set archiveHighWatermark to a low value so that we can see it increase. + s.archiveHighWatermark = 42 + // This will archive again, but must not drop it completely, despite the // memorySeries being empty. s.maintainMemorySeries(fp, 10000) @@ -1202,6 +1270,10 @@ func testEvictAndPurgeSeries(t *testing.T, encoding chunkEncoding) { if !archived { t.Fatal("series purged completely") } + // archiveHighWatermark must have been set by maintainMemorySeries. + if want, got := model.Time(19998), s.archiveHighWatermark; want != got { + t.Errorf("want archiveHighWatermark %v, got %v", want, got) + } } func TestEvictAndPurgeSeriesChunkType0(t *testing.T) {