diff --git a/promql/functions.go b/promql/functions.go index 0d3ec41951..433c06f023 100644 --- a/promql/functions.go +++ b/promql/functions.go @@ -548,7 +548,7 @@ func linearRegression(samples []model.SamplePair, interceptTime model.Time) (slo slope = covXY / varX intercept = sumY/n - slope*sumX/n - return + return slope, intercept } // === deriv(node model.ValMatrix) Vector === diff --git a/storage/local/preload.go b/storage/local/preload.go index b0113bd6be..fb6a21f64b 100644 --- a/storage/local/preload.go +++ b/storage/local/preload.go @@ -30,7 +30,7 @@ func (p *memorySeriesPreloader) PreloadRange( fp model.Fingerprint, from model.Time, through model.Time, ) SeriesIterator { - cds, iter := p.storage.preloadChunksForRange(fp, from, through, false) + cds, iter := p.storage.preloadChunksForRange(fp, from, through) p.pinnedChunkDescs = append(p.pinnedChunkDescs, cds...) return iter } @@ -40,7 +40,7 @@ func (p *memorySeriesPreloader) PreloadInstant( fp model.Fingerprint, timestamp model.Time, stalenessDelta time.Duration, ) SeriesIterator { - cds, iter := p.storage.preloadChunksForRange(fp, timestamp.Add(-stalenessDelta), timestamp, true) + cds, iter := p.storage.preloadChunksForInstant(fp, timestamp.Add(-stalenessDelta), timestamp) p.pinnedChunkDescs = append(p.pinnedChunkDescs, cds...) return iter } diff --git a/storage/local/series.go b/storage/local/series.go index f76d5ee27b..51ad865a93 100644 --- a/storage/local/series.go +++ b/storage/local/series.go @@ -399,30 +399,40 @@ func (s *memorySeries) newIterator(pinnedChunkDescs []*chunkDesc, quarantine fun } } +// preloadChunksForInstant preloads chunks for the latest value in the given +// range. If the last sample saved in the memorySeries itself is the latest +// value in the given range, it will in fact preload zero chunks and just take +// that value. +func (s *memorySeries) preloadChunksForInstant( + fp model.Fingerprint, + from model.Time, through model.Time, + mss *memorySeriesStorage, +) ([]*chunkDesc, SeriesIterator, error) { + // If we have a lastSamplePair in the series, and thas last samplePair + // is in the interval, just take it in a singleSampleSeriesIterator. No + // need to pin or load anything. + lastSample := s.lastSamplePair() + if !through.Before(lastSample.Timestamp) && + !from.After(lastSample.Timestamp) && + lastSample != ZeroSamplePair { + iter := &boundedIterator{ + it: &singleSampleSeriesIterator{samplePair: lastSample}, + start: model.Now().Add(-mss.dropAfter), + } + return nil, iter, nil + } + // If we are here, we are out of luck and have to delegate to the more + // expensive method. + return s.preloadChunksForRange(fp, from, through, mss) +} + // preloadChunksForRange loads chunks for the given range from the persistence. // The caller must have locked the fingerprint of the series. func (s *memorySeries) preloadChunksForRange( fp model.Fingerprint, from model.Time, through model.Time, - lastSampleOnly bool, mss *memorySeriesStorage, ) ([]*chunkDesc, SeriesIterator, error) { - // If we have to preload for only one sample, and we have a - // lastSamplePair in the series, and thas last samplePair is in the - // interval, just take it in a singleSampleSeriesIterator. No need to - // pin or load anything. - if lastSampleOnly { - lastSample := s.lastSamplePair() - if !through.Before(lastSample.Timestamp) && - !from.After(lastSample.Timestamp) && - lastSample != ZeroSamplePair { - iter := &boundedIterator{ - it: &singleSampleSeriesIterator{samplePair: lastSample}, - start: model.Now().Add(-mss.dropAfter), - } - return nil, iter, nil - } - } firstChunkDescTime := model.Latest if len(s.chunkDescs) > 0 { firstChunkDescTime = s.chunkDescs[0].firstTime() diff --git a/storage/local/storage.go b/storage/local/storage.go index 2d99c073f3..ede1bfc612 100644 --- a/storage/local/storage.go +++ b/storage/local/storage.go @@ -695,46 +695,76 @@ func (s *memorySeriesStorage) getOrCreateSeries(fp model.Fingerprint, m model.Me return series, nil } +// getSeriesForRange is a helper method for preloadChunksForRange and preloadChunksForInstant. +func (s *memorySeriesStorage) getSeriesForRange( + fp model.Fingerprint, + from model.Time, through model.Time, +) *memorySeries { + series, ok := s.fpToSeries.get(fp) + if ok { + return series + } + has, first, last, err := s.persistence.hasArchivedMetric(fp) + if err != nil { + log.With("fingerprint", fp).With("error", err).Error("Archive index error while preloading chunks.") + return nil + } + if !has { + s.invalidPreloadRequestsCount.Inc() + return nil + } + if last.Before(from) || first.After(through) { + return nil + } + metric, err := s.persistence.archivedMetric(fp) + if err != nil { + log.With("fingerprint", fp).With("error", err).Error("Archive index error while preloading chunks.") + return nil + } + series, err = s.getOrCreateSeries(fp, metric) + if err != nil { + // getOrCreateSeries took care of quarantining already. + return nil + } + return series +} + func (s *memorySeriesStorage) preloadChunksForRange( fp model.Fingerprint, from model.Time, through model.Time, - lastSampleOnly bool, ) ([]*chunkDesc, SeriesIterator) { s.fpLocker.Lock(fp) defer s.fpLocker.Unlock(fp) - series, ok := s.fpToSeries.get(fp) - if !ok { - has, first, last, err := s.persistence.hasArchivedMetric(fp) - if err != nil { - log.With("fingerprint", fp).With("error", err).Error("Archive index error while preloading chunks.") - return nil, nopIter - } - if !has { - s.invalidPreloadRequestsCount.Inc() - return nil, nopIter - } - if from.Before(last) && through.After(first) { - metric, err := s.persistence.archivedMetric(fp) - if err != nil { - log.With("fingerprint", fp).With("error", err).Error("Archive index error while preloading chunks.") - return nil, nopIter - } - series, err = s.getOrCreateSeries(fp, metric) - if err != nil { - log.With("fingerprint", fp).With("error", err).Error("Error while retrieving series.") - return nil, nopIter - } - } else { - return nil, nopIter - } + series := s.getSeriesForRange(fp, from, through) + if series == nil { + return nil, nopIter } - cds, it, err := series.preloadChunksForRange(fp, from, through, lastSampleOnly, s) + cds, iter, err := series.preloadChunksForRange(fp, from, through, s) if err != nil { s.quarantineSeries(fp, series.metric, err) return nil, nopIter } - return cds, it + return cds, iter +} + +func (s *memorySeriesStorage) preloadChunksForInstant( + fp model.Fingerprint, + from model.Time, through model.Time, +) ([]*chunkDesc, SeriesIterator) { + s.fpLocker.Lock(fp) + defer s.fpLocker.Unlock(fp) + + series := s.getSeriesForRange(fp, from, through) + if series == nil { + return nil, nopIter + } + cds, iter, err := series.preloadChunksForInstant(fp, from, through, s) + if err != nil { + s.quarantineSeries(fp, series.metric, err) + return nil, nopIter + } + return cds, iter } func (s *memorySeriesStorage) handleEvictList() { diff --git a/storage/local/storage_test.go b/storage/local/storage_test.go index 97fa450ba7..33d1a99183 100644 --- a/storage/local/storage_test.go +++ b/storage/local/storage_test.go @@ -489,12 +489,12 @@ func TestDropMetrics(t *testing.T) { t.Errorf("unexpected number of fingerprints: %d", len(fps2)) } - _, it := s.preloadChunksForRange(fpList[0], model.Earliest, model.Latest, false) + _, it := s.preloadChunksForRange(fpList[0], model.Earliest, model.Latest) if vals := it.RangeValues(metric.Interval{OldestInclusive: insertStart, NewestInclusive: now}); len(vals) != 0 { t.Errorf("unexpected number of samples: %d", len(vals)) } - _, it = s.preloadChunksForRange(fpList[1], model.Earliest, model.Latest, false) + _, it = s.preloadChunksForRange(fpList[1], model.Earliest, model.Latest) if vals := it.RangeValues(metric.Interval{OldestInclusive: insertStart, NewestInclusive: now}); len(vals) != N { t.Errorf("unexpected number of samples: %d", len(vals)) } @@ -516,12 +516,12 @@ func TestDropMetrics(t *testing.T) { t.Errorf("unexpected number of fingerprints: %d", len(fps3)) } - _, it = s.preloadChunksForRange(fpList[0], model.Earliest, model.Latest, false) + _, it = s.preloadChunksForRange(fpList[0], model.Earliest, model.Latest) if vals := it.RangeValues(metric.Interval{OldestInclusive: insertStart, NewestInclusive: now}); len(vals) != 0 { t.Errorf("unexpected number of samples: %d", len(vals)) } - _, it = s.preloadChunksForRange(fpList[1], model.Earliest, model.Latest, false) + _, it = s.preloadChunksForRange(fpList[1], model.Earliest, model.Latest) if vals := it.RangeValues(metric.Interval{OldestInclusive: insertStart, NewestInclusive: now}); len(vals) != 0 { t.Errorf("unexpected number of samples: %d", len(vals)) } @@ -739,7 +739,7 @@ func testValueAtOrBeforeTime(t *testing.T, encoding chunkEncoding) { fp := model.Metric{}.FastFingerprint() - _, it := s.preloadChunksForRange(fp, model.Earliest, model.Latest, false) + _, it := s.preloadChunksForRange(fp, model.Earliest, model.Latest) // #1 Exactly on a sample. for i, expected := range samples { @@ -813,7 +813,7 @@ func benchmarkValueAtOrBeforeTime(b *testing.B, encoding chunkEncoding) { fp := model.Metric{}.FastFingerprint() - _, it := s.preloadChunksForRange(fp, model.Earliest, model.Latest, false) + _, it := s.preloadChunksForRange(fp, model.Earliest, model.Latest) b.ResetTimer() @@ -891,7 +891,7 @@ func testRangeValues(t *testing.T, encoding chunkEncoding) { fp := model.Metric{}.FastFingerprint() - _, it := s.preloadChunksForRange(fp, model.Earliest, model.Latest, false) + _, it := s.preloadChunksForRange(fp, model.Earliest, model.Latest) // #1 Zero length interval at sample. for i, expected := range samples { @@ -1043,7 +1043,7 @@ func benchmarkRangeValues(b *testing.B, encoding chunkEncoding) { fp := model.Metric{}.FastFingerprint() - _, it := s.preloadChunksForRange(fp, model.Earliest, model.Latest, false) + _, it := s.preloadChunksForRange(fp, model.Earliest, model.Latest) b.ResetTimer() @@ -1089,7 +1089,7 @@ func testEvictAndPurgeSeries(t *testing.T, encoding chunkEncoding) { // Drop ~half of the chunks. s.maintainMemorySeries(fp, 10000) - _, it := s.preloadChunksForRange(fp, model.Earliest, model.Latest, false) + _, it := s.preloadChunksForRange(fp, model.Earliest, model.Latest) actual := it.RangeValues(metric.Interval{ OldestInclusive: 0, NewestInclusive: 100000, @@ -1107,7 +1107,7 @@ func testEvictAndPurgeSeries(t *testing.T, encoding chunkEncoding) { // Drop everything. s.maintainMemorySeries(fp, 100000) - _, it = s.preloadChunksForRange(fp, model.Earliest, model.Latest, false) + _, it = s.preloadChunksForRange(fp, model.Earliest, model.Latest) actual = it.RangeValues(metric.Interval{ OldestInclusive: 0, NewestInclusive: 100000,