diff --git a/storage/local/storage.go b/storage/local/storage.go index 78eb557c7..5d4188419 100644 --- a/storage/local/storage.go +++ b/storage/local/storage.go @@ -415,19 +415,19 @@ func (s *MemorySeriesStorage) WaitForIndexing() { // LastSampleForLabelMatchers implements Storage. func (s *MemorySeriesStorage) LastSampleForLabelMatchers(_ context.Context, cutoff model.Time, matcherSets ...metric.LabelMatchers) (model.Vector, error) { - fps := map[model.Fingerprint]struct{}{} + mergedFPs := map[model.Fingerprint]struct{}{} for _, matchers := range matcherSets { - fpToMetric, err := s.metricsForLabelMatchers(cutoff, model.Latest, matchers...) + fps, err := s.fpsForLabelMatchers(cutoff, model.Latest, matchers...) if err != nil { return nil, err } - for fp := range fpToMetric { - fps[fp] = struct{}{} + for fp := range fps { + mergedFPs[fp] = struct{}{} } } - res := make(model.Vector, 0, len(fps)) - for fp := range fps { + res := make(model.Vector, 0, len(mergedFPs)) + for fp := range mergedFPs { s.fpLocker.Lock(fp) series, ok := s.fpToSeries.get(fp) @@ -485,13 +485,13 @@ func (bit *boundedIterator) Close() { // QueryRange implements Storage. func (s *MemorySeriesStorage) QueryRange(_ context.Context, from, through model.Time, matchers ...*metric.LabelMatcher) ([]SeriesIterator, error) { - fpToMetric, err := s.metricsForLabelMatchers(from, through, matchers...) + fpSeriesPairs, err := s.seriesForLabelMatchers(from, through, matchers...) if err != nil { return nil, err } - iterators := make([]SeriesIterator, 0, len(fpToMetric)) - for fp := range fpToMetric { - it := s.preloadChunksForRange(fp, from, through) + iterators := make([]SeriesIterator, 0, len(fpSeriesPairs)) + for _, pair := range fpSeriesPairs { + it := s.preloadChunksForRange(pair, from, through) iterators = append(iterators, it) } return iterators, nil @@ -502,13 +502,13 @@ func (s *MemorySeriesStorage) QueryInstant(_ context.Context, ts model.Time, sta from := ts.Add(-stalenessDelta) through := ts - fpToMetric, err := s.metricsForLabelMatchers(from, through, matchers...) + fpSeriesPairs, err := s.seriesForLabelMatchers(from, through, matchers...) if err != nil { return nil, err } - iterators := make([]SeriesIterator, 0, len(fpToMetric)) - for fp := range fpToMetric { - it := s.preloadChunksForInstant(fp, from, through) + iterators := make([]SeriesIterator, 0, len(fpSeriesPairs)) + for _, pair := range fpSeriesPairs { + it := s.preloadChunksForInstant(pair, from, through) iterators = append(iterators, it) } return iterators, nil @@ -563,7 +563,7 @@ func (s *MemorySeriesStorage) MetricsForLabelMatchers( return metrics, nil } -// returns candidate FPs for given matchers and remaining matchers to be checked +// candidateFPsForLabelMatchers returns candidate FPs for given matchers and remaining matchers to be checked. func (s *MemorySeriesStorage) candidateFPsForLabelMatchers( matchers ...*metric.LabelMatcher, ) (map[model.Fingerprint]struct{}, []*metric.LabelMatcher, error) { @@ -632,6 +632,66 @@ func (s *MemorySeriesStorage) candidateFPsForLabelMatchers( return candidateFPs, matchers[matcherIdx:], nil } +func (s *MemorySeriesStorage) seriesForLabelMatchers( + from, through model.Time, + matchers ...*metric.LabelMatcher, +) ([]fingerprintSeriesPair, error) { + candidateFPs, matchersToCheck, err := s.candidateFPsForLabelMatchers(matchers...) + if err != nil { + return nil, err + } + + result := []fingerprintSeriesPair{} +FPLoop: + for fp := range candidateFPs { + s.fpLocker.Lock(fp) + series := s.seriesForRange(fp, from, through) + s.fpLocker.Unlock(fp) + + if series == nil { + continue FPLoop + } + + for _, m := range matchersToCheck { + if !m.Match(series.metric[m.Name]) { + continue FPLoop + } + } + result = append(result, fingerprintSeriesPair{fp, series}) + } + return result, nil +} + +func (s *MemorySeriesStorage) fpsForLabelMatchers( + from, through model.Time, + matchers ...*metric.LabelMatcher, +) (map[model.Fingerprint]struct{}, error) { + candidateFPs, matchersToCheck, err := s.candidateFPsForLabelMatchers(matchers...) + if err != nil { + return nil, err + } + +FPLoop: + for fp := range candidateFPs { + s.fpLocker.Lock(fp) + met, _, ok := s.metricForRange(fp, from, through) + s.fpLocker.Unlock(fp) + + if !ok { + delete(candidateFPs, fp) + continue FPLoop + } + + for _, m := range matchersToCheck { + if !m.Match(met[m.Name]) { + delete(candidateFPs, fp) + continue FPLoop + } + } + } + return candidateFPs, nil +} + func (s *MemorySeriesStorage) metricsForLabelMatchers( from, through model.Time, matchers ...*metric.LabelMatcher, @@ -643,19 +703,19 @@ func (s *MemorySeriesStorage) metricsForLabelMatchers( } result := map[model.Fingerprint]metric.Metric{} -FP_LOOP: +FPLoop: for fp := range candidateFPs { s.fpLocker.Lock(fp) met, _, ok := s.metricForRange(fp, from, through) s.fpLocker.Unlock(fp) if !ok { - continue FP_LOOP + continue FPLoop } for _, m := range matchersToCheck { if !m.Match(met[m.Name]) { - continue FP_LOOP + continue FPLoop } } result[fp] = metric.Metric{Metric: met} @@ -716,14 +776,14 @@ func (s *MemorySeriesStorage) LabelValuesForLabelName(_ context.Context, labelNa // DropMetricsForLabelMatchers implements Storage. func (s *MemorySeriesStorage) DropMetricsForLabelMatchers(_ context.Context, matchers ...*metric.LabelMatcher) (int, error) { - fpToMetric, err := s.metricsForLabelMatchers(model.Earliest, model.Latest, matchers...) + fps, err := s.fpsForLabelMatchers(model.Earliest, model.Latest, matchers...) if err != nil { return 0, err } - for fp := range fpToMetric { + for fp := range fps { s.purgeSeries(fp, nil, nil) } - return len(fpToMetric), nil + return len(fps), nil } var ( @@ -884,7 +944,7 @@ func (s *MemorySeriesStorage) getOrCreateSeries(fp model.Fingerprint, m model.Me return series, nil } -// seriesForRange is a helper method for preloadChunksForRange and preloadChunksForInstant. +// seriesForRange is a helper method for seriesForLabelMatchers. // // The caller must have locked the fp. func (s *MemorySeriesStorage) seriesForRange( @@ -903,16 +963,17 @@ func (s *MemorySeriesStorage) seriesForRange( } func (s *MemorySeriesStorage) preloadChunksForRange( - fp model.Fingerprint, + pair fingerprintSeriesPair, from model.Time, through model.Time, ) SeriesIterator { - s.fpLocker.Lock(fp) - defer s.fpLocker.Unlock(fp) - - series := s.seriesForRange(fp, from, through) + fp, series := pair.fp, pair.series if series == nil { return nopIter } + + s.fpLocker.Lock(fp) + defer s.fpLocker.Unlock(fp) + iter, err := series.preloadChunksForRange(fp, from, through, s) if err != nil { s.quarantineSeries(fp, series.metric, err) @@ -922,16 +983,17 @@ func (s *MemorySeriesStorage) preloadChunksForRange( } func (s *MemorySeriesStorage) preloadChunksForInstant( - fp model.Fingerprint, + pair fingerprintSeriesPair, from model.Time, through model.Time, ) SeriesIterator { - s.fpLocker.Lock(fp) - defer s.fpLocker.Unlock(fp) - - series := s.seriesForRange(fp, from, through) + fp, series := pair.fp, pair.series if series == nil { return nopIter } + + s.fpLocker.Lock(fp) + defer s.fpLocker.Unlock(fp) + iter, err := series.preloadChunksForInstant(fp, from, through, s) if err != nil { s.quarantineSeries(fp, series.metric, err) diff --git a/storage/local/storage_test.go b/storage/local/storage_test.go index a19f753ae..c0cee6fe0 100644 --- a/storage/local/storage_test.go +++ b/storage/local/storage_test.go @@ -636,12 +636,12 @@ func TestDropMetrics(t *testing.T) { t.Errorf("unexpected number of fingerprints: %d", len(fps2)) } - it := s.preloadChunksForRange(fpList[0], model.Earliest, model.Latest) + it := s.preloadChunksForRange(makeFingerprintSeriesPair(s, fpList[0]), model.Earliest, model.Latest) if vals := it.RangeValues(metric.Interval{OldestInclusive: insertStart, NewestInclusive: now}); len(vals) != 0 { t.Errorf("unexpected number of samples: %d", len(vals)) } - it = s.preloadChunksForRange(fpList[1], model.Earliest, model.Latest) + it = s.preloadChunksForRange(makeFingerprintSeriesPair(s, fpList[1]), model.Earliest, model.Latest) if vals := it.RangeValues(metric.Interval{OldestInclusive: insertStart, NewestInclusive: now}); len(vals) != N { t.Errorf("unexpected number of samples: %d", len(vals)) } @@ -669,12 +669,12 @@ func TestDropMetrics(t *testing.T) { t.Errorf("unexpected number of fingerprints: %d", len(fps3)) } - it = s.preloadChunksForRange(fpList[0], model.Earliest, model.Latest) + it = s.preloadChunksForRange(makeFingerprintSeriesPair(s, fpList[0]), model.Earliest, model.Latest) if vals := it.RangeValues(metric.Interval{OldestInclusive: insertStart, NewestInclusive: now}); len(vals) != 0 { t.Errorf("unexpected number of samples: %d", len(vals)) } - it = s.preloadChunksForRange(fpList[1], model.Earliest, model.Latest) + it = s.preloadChunksForRange(makeFingerprintSeriesPair(s, fpList[1]), model.Earliest, model.Latest) if vals := it.RangeValues(metric.Interval{OldestInclusive: insertStart, NewestInclusive: now}); len(vals) != 0 { t.Errorf("unexpected number of samples: %d", len(vals)) } @@ -752,7 +752,7 @@ func TestQuarantineMetric(t *testing.T) { } // This will access the corrupt file and lead to quarantining. - iter := s.preloadChunksForInstant(fpToBeArchived, now.Add(-2*time.Hour-1*time.Minute), now.Add(-2*time.Hour)) + iter := s.preloadChunksForInstant(makeFingerprintSeriesPair(s, fpToBeArchived), now.Add(-2*time.Hour-1*time.Minute), now.Add(-2*time.Hour)) iter.Close() time.Sleep(time.Second) // Give time to quarantine. TODO(beorn7): Find a better way to wait. s.WaitForIndexing() @@ -894,7 +894,7 @@ func testValueAtOrBeforeTime(t *testing.T, encoding chunkEncoding) { fp := model.Metric{}.FastFingerprint() - it := s.preloadChunksForRange(fp, model.Earliest, model.Latest) + it := s.preloadChunksForRange(makeFingerprintSeriesPair(s, fp), model.Earliest, model.Latest) // #1 Exactly on a sample. for i, expected := range samples { @@ -972,7 +972,7 @@ func benchmarkValueAtOrBeforeTime(b *testing.B, encoding chunkEncoding) { fp := model.Metric{}.FastFingerprint() - it := s.preloadChunksForRange(fp, model.Earliest, model.Latest) + it := s.preloadChunksForRange(makeFingerprintSeriesPair(s, fp), model.Earliest, model.Latest) b.ResetTimer() @@ -1054,7 +1054,7 @@ func testRangeValues(t *testing.T, encoding chunkEncoding) { fp := model.Metric{}.FastFingerprint() - it := s.preloadChunksForRange(fp, model.Earliest, model.Latest) + it := s.preloadChunksForRange(makeFingerprintSeriesPair(s, fp), model.Earliest, model.Latest) // #1 Zero length interval at sample. for i, expected := range samples { @@ -1210,7 +1210,7 @@ func benchmarkRangeValues(b *testing.B, encoding chunkEncoding) { fp := model.Metric{}.FastFingerprint() - it := s.preloadChunksForRange(fp, model.Earliest, model.Latest) + it := s.preloadChunksForRange(makeFingerprintSeriesPair(s, fp), model.Earliest, model.Latest) b.ResetTimer() @@ -1260,7 +1260,7 @@ func testEvictAndPurgeSeries(t *testing.T, encoding chunkEncoding) { // Drop ~half of the chunks. s.maintainMemorySeries(fp, 10000) - it := s.preloadChunksForRange(fp, model.Earliest, model.Latest) + it := s.preloadChunksForRange(makeFingerprintSeriesPair(s, fp), model.Earliest, model.Latest) actual := it.RangeValues(metric.Interval{ OldestInclusive: 0, NewestInclusive: 100000, @@ -1278,7 +1278,7 @@ func testEvictAndPurgeSeries(t *testing.T, encoding chunkEncoding) { // Drop everything. s.maintainMemorySeries(fp, 100000) - it = s.preloadChunksForRange(fp, model.Earliest, model.Latest) + it = s.preloadChunksForRange(makeFingerprintSeriesPair(s, fp), model.Earliest, model.Latest) actual = it.RangeValues(metric.Interval{ OldestInclusive: 0, NewestInclusive: 100000, @@ -1442,7 +1442,7 @@ func testEvictAndLoadChunkDescs(t *testing.T, encoding chunkEncoding) { } // Load everything back. - it := s.preloadChunksForRange(fp, 0, 100000) + it := s.preloadChunksForRange(makeFingerprintSeriesPair(s, fp), 0, 100000) if oldLen != len(series.chunkDescs) { t.Errorf("Expected number of chunkDescs to have reached old value again, old number %d, current number %d.", oldLen, len(series.chunkDescs)) @@ -1770,7 +1770,7 @@ func verifyStorageRandom(t testing.TB, s *MemorySeriesStorage, samples model.Sam for _, i := range rand.Perm(len(samples)) { sample := samples[i] fp := s.mapper.mapFP(sample.Metric.FastFingerprint(), sample.Metric) - it := s.preloadChunksForInstant(fp, sample.Timestamp, sample.Timestamp) + it := s.preloadChunksForInstant(makeFingerprintSeriesPair(s, fp), sample.Timestamp, sample.Timestamp) found := it.ValueAtOrBeforeTime(sample.Timestamp) startTime := it.(*boundedIterator).start switch { @@ -1813,7 +1813,7 @@ func verifyStorageSequential(t testing.TB, s *MemorySeriesStorage, samples model if it != nil { it.Close() } - it = s.preloadChunksForRange(fp, sample.Timestamp, model.Latest) + it = s.preloadChunksForRange(makeFingerprintSeriesPair(s, fp), sample.Timestamp, model.Latest) r = it.RangeValues(metric.Interval{ OldestInclusive: sample.Timestamp, NewestInclusive: model.Latest, @@ -1934,7 +1934,7 @@ func TestAppendOutOfOrder(t *testing.T) { fp := s.mapper.mapFP(m.FastFingerprint(), m) - it := s.preloadChunksForRange(fp, 0, 2) + it := s.preloadChunksForRange(makeFingerprintSeriesPair(s, fp), 0, 2) defer it.Close() want := []model.SamplePair{ diff --git a/storage/local/test_helpers.go b/storage/local/test_helpers.go index 7c2a3a8ee..7e71a1a7d 100644 --- a/storage/local/test_helpers.go +++ b/storage/local/test_helpers.go @@ -65,3 +65,7 @@ func NewTestStorage(t testutil.T, encoding chunkEncoding) (*MemorySeriesStorage, return storage, closer } + +func makeFingerprintSeriesPair(s *MemorySeriesStorage, fp model.Fingerprint) fingerprintSeriesPair { + return fingerprintSeriesPair{fp, s.seriesForRange(fp, model.Earliest, model.Latest)} +}