diff --git a/promql/functions.go b/promql/functions.go index 0d3ec41951..433c06f023 100644 --- a/promql/functions.go +++ b/promql/functions.go @@ -548,7 +548,7 @@ func linearRegression(samples []model.SamplePair, interceptTime model.Time) (slo slope = covXY / varX intercept = sumY/n - slope*sumX/n - return + return slope, intercept } // === deriv(node model.ValMatrix) Vector === diff --git a/storage/local/chunk.go b/storage/local/chunk.go index b843fa3a71..2e9fcf4cff 100644 --- a/storage/local/chunk.go +++ b/storage/local/chunk.go @@ -260,42 +260,52 @@ type chunk interface { // A chunkIterator enables efficient access to the content of a chunk. It is // generally not safe to use a chunkIterator concurrently with or after chunk -// mutation. The error returned by any of the methods is always the last error -// encountered by the iterator, i.e. once an error has been encountered, no -// method will ever return a nil error again. In general, errors signal data -// corruption in the chunk and require quarantining. +// mutation. type chunkIterator interface { // Gets the last timestamp in the chunk. lastTimestamp() (model.Time, error) - // Gets the value that is closest before the given time. In case a value - // exists at precisely the given time, that value is returned. If no - // applicable value exists, ZeroSamplePair is returned. - valueAtOrBeforeTime(model.Time) (model.SamplePair, error) - // Gets all values contained within a given interval. - rangeValues(metric.Interval) ([]model.SamplePair, error) // Whether a given timestamp is contained between first and last value // in the chunk. contains(model.Time) (bool, error) - // scan, value, and err implement a bufio.Scanner-like interface. The - // scan method advances the iterator to the next value in the chunk and - // returns true if that worked. In that case, the value method will - // return the sample pair the iterator has advanced to. If the scan - // method returns false, either an error has occured or the end of the - // chunk has been reached. In the former case, the err method will - // return the error. In the latter case, the err method will return nil. - // Upon creation, the iterator is at position "minus one". After the - // first scan call, value will return the 1st value in the - // chunk. valueAtOrBeforeTime and rangeValues all modify the iterator - // position, too. They behave as if their return values were retrieved - // after a scan call, i.e. calling the value or err methods after a call - // to those methods will retrieve the same return value again (or the - // last value in the range in case of rangeValues), and subsequent scan - // calls will advance the iterator from there. + // Scans the next value in the chunk. Directly after the iterator has + // been created, the next value is the first value in the + // chunk. Otherwise, it is the value following the last value scanned or + // found (by one of the find... methods). Returns false if either the + // end of the chunk is reached or an error has occurred. scan() bool + // Finds the most recent value at or before the provided time. Returns + // false if either the chunk contains no value at or before the provided + // time, or an error has occurred. + findAtOrBefore(model.Time) bool + // Finds the oldest value at or after the provided time. Returns false + // if either the chunk contains no value at or after the provided time, + // or an error has occurred. + findAtOrAfter(model.Time) bool + // Returns the last value scanned (by the scan method) or found (by one + // of the find... methods). It returns ZeroSamplePair before any of + // those methods were called. value() model.SamplePair + // Returns the last error encountered. In general, an error signal data + // corruption in the chunk and requires quarantining. err() error } +// rangeValues is a utility function that retrieves all values within the given +// range from a chunkIterator. +func rangeValues(it chunkIterator, in metric.Interval) ([]model.SamplePair, error) { + result := []model.SamplePair{} + if !it.findAtOrAfter(in.OldestInclusive) { + return result, it.err() + } + for !it.value().Timestamp.After(in.NewestInclusive) { + result = append(result, it.value()) + if !it.scan() { + break + } + } + return result, it.err() +} + func transcodeAndAdd(dst chunk, src chunk, s model.SamplePair) ([]chunk, error) { chunkOps.WithLabelValues(transcode).Inc() @@ -374,46 +384,6 @@ func (it *indexAccessingChunkIterator) lastTimestamp() (model.Time, error) { return it.acc.timestampAtIndex(it.len - 1), it.acc.err() } -// valueAtOrBeforeTime implements chunkIterator. -func (it *indexAccessingChunkIterator) valueAtOrBeforeTime(t model.Time) (model.SamplePair, error) { - i := sort.Search(it.len, func(i int) bool { - return it.acc.timestampAtIndex(i).After(t) - }) - if i == 0 || it.acc.err() != nil { - return ZeroSamplePair, it.acc.err() - } - it.pos = i - 1 - it.lastValue = model.SamplePair{ - Timestamp: it.acc.timestampAtIndex(i - 1), - Value: it.acc.sampleValueAtIndex(i - 1), - } - return it.lastValue, it.acc.err() -} - -// rangeValues implements chunkIterator. -func (it *indexAccessingChunkIterator) rangeValues(in metric.Interval) ([]model.SamplePair, error) { - oldest := sort.Search(it.len, func(i int) bool { - return !it.acc.timestampAtIndex(i).Before(in.OldestInclusive) - }) - newest := sort.Search(it.len, func(i int) bool { - return it.acc.timestampAtIndex(i).After(in.NewestInclusive) - }) - if oldest == it.len || it.acc.err() != nil { - return nil, it.acc.err() - } - - result := make([]model.SamplePair, 0, newest-oldest) - for i := oldest; i < newest; i++ { - it.pos = i - it.lastValue = model.SamplePair{ - Timestamp: it.acc.timestampAtIndex(i), - Value: it.acc.sampleValueAtIndex(i), - } - result = append(result, it.lastValue) - } - return result, it.acc.err() -} - // contains implements chunkIterator. func (it *indexAccessingChunkIterator) contains(t model.Time) (bool, error) { return !t.Before(it.acc.timestampAtIndex(0)) && @@ -433,6 +403,38 @@ func (it *indexAccessingChunkIterator) scan() bool { return it.acc.err() == nil } +// findAtOrBefore implements chunkIterator. +func (it *indexAccessingChunkIterator) findAtOrBefore(t model.Time) bool { + i := sort.Search(it.len, func(i int) bool { + return it.acc.timestampAtIndex(i).After(t) + }) + if i == 0 || it.acc.err() != nil { + return false + } + it.pos = i - 1 + it.lastValue = model.SamplePair{ + Timestamp: it.acc.timestampAtIndex(i - 1), + Value: it.acc.sampleValueAtIndex(i - 1), + } + return true +} + +// findAtOrAfter implements chunkIterator. +func (it *indexAccessingChunkIterator) findAtOrAfter(t model.Time) bool { + i := sort.Search(it.len, func(i int) bool { + return !it.acc.timestampAtIndex(i).Before(t) + }) + if i == it.len || it.acc.err() != nil { + return false + } + it.pos = i + it.lastValue = model.SamplePair{ + Timestamp: it.acc.timestampAtIndex(i), + Value: it.acc.sampleValueAtIndex(i), + } + return true +} + // value implements chunkIterator. func (it *indexAccessingChunkIterator) value() model.SamplePair { return it.lastValue diff --git a/storage/local/preload.go b/storage/local/preload.go index b0113bd6be..fb6a21f64b 100644 --- a/storage/local/preload.go +++ b/storage/local/preload.go @@ -30,7 +30,7 @@ func (p *memorySeriesPreloader) PreloadRange( fp model.Fingerprint, from model.Time, through model.Time, ) SeriesIterator { - cds, iter := p.storage.preloadChunksForRange(fp, from, through, false) + cds, iter := p.storage.preloadChunksForRange(fp, from, through) p.pinnedChunkDescs = append(p.pinnedChunkDescs, cds...) return iter } @@ -40,7 +40,7 @@ func (p *memorySeriesPreloader) PreloadInstant( fp model.Fingerprint, timestamp model.Time, stalenessDelta time.Duration, ) SeriesIterator { - cds, iter := p.storage.preloadChunksForRange(fp, timestamp.Add(-stalenessDelta), timestamp, true) + cds, iter := p.storage.preloadChunksForInstant(fp, timestamp.Add(-stalenessDelta), timestamp) p.pinnedChunkDescs = append(p.pinnedChunkDescs, cds...) return iter } diff --git a/storage/local/series.go b/storage/local/series.go index f76d5ee27b..0dcad859a8 100644 --- a/storage/local/series.go +++ b/storage/local/series.go @@ -399,30 +399,40 @@ func (s *memorySeries) newIterator(pinnedChunkDescs []*chunkDesc, quarantine fun } } +// preloadChunksForInstant preloads chunks for the latest value in the given +// range. If the last sample saved in the memorySeries itself is the latest +// value in the given range, it will in fact preload zero chunks and just take +// that value. +func (s *memorySeries) preloadChunksForInstant( + fp model.Fingerprint, + from model.Time, through model.Time, + mss *memorySeriesStorage, +) ([]*chunkDesc, SeriesIterator, error) { + // If we have a lastSamplePair in the series, and thas last samplePair + // is in the interval, just take it in a singleSampleSeriesIterator. No + // need to pin or load anything. + lastSample := s.lastSamplePair() + if !through.Before(lastSample.Timestamp) && + !from.After(lastSample.Timestamp) && + lastSample != ZeroSamplePair { + iter := &boundedIterator{ + it: &singleSampleSeriesIterator{samplePair: lastSample}, + start: model.Now().Add(-mss.dropAfter), + } + return nil, iter, nil + } + // If we are here, we are out of luck and have to delegate to the more + // expensive method. + return s.preloadChunksForRange(fp, from, through, mss) +} + // preloadChunksForRange loads chunks for the given range from the persistence. // The caller must have locked the fingerprint of the series. func (s *memorySeries) preloadChunksForRange( fp model.Fingerprint, from model.Time, through model.Time, - lastSampleOnly bool, mss *memorySeriesStorage, ) ([]*chunkDesc, SeriesIterator, error) { - // If we have to preload for only one sample, and we have a - // lastSamplePair in the series, and thas last samplePair is in the - // interval, just take it in a singleSampleSeriesIterator. No need to - // pin or load anything. - if lastSampleOnly { - lastSample := s.lastSamplePair() - if !through.Before(lastSample.Timestamp) && - !from.After(lastSample.Timestamp) && - lastSample != ZeroSamplePair { - iter := &boundedIterator{ - it: &singleSampleSeriesIterator{samplePair: lastSample}, - start: model.Now().Add(-mss.dropAfter), - } - return nil, iter, nil - } - } firstChunkDescTime := model.Latest if len(s.chunkDescs) > 0 { firstChunkDescTime = s.chunkDescs[0].firstTime() @@ -547,12 +557,13 @@ func (it *memorySeriesIterator) ValueAtOrBeforeTime(t model.Time) model.SamplePa return ZeroSamplePair } if containsT { - value, err := it.chunkIt.valueAtOrBeforeTime(t) - if err != nil { - it.quarantine(err) - return ZeroSamplePair + if it.chunkIt.findAtOrBefore(t) { + return it.chunkIt.value() } - return value + if it.chunkIt.err() != nil { + it.quarantine(it.chunkIt.err()) + } + return ZeroSamplePair } } @@ -570,12 +581,13 @@ func (it *memorySeriesIterator) ValueAtOrBeforeTime(t model.Time) model.SamplePa return ZeroSamplePair } it.chunkIt = it.chunkIterator(l - i) - value, err := it.chunkIt.valueAtOrBeforeTime(t) - if err != nil { - it.quarantine(err) - return ZeroSamplePair + if it.chunkIt.findAtOrBefore(t) { + return it.chunkIt.value() } - return value + if it.chunkIt.err() != nil { + it.quarantine(it.chunkIt.err()) + } + return ZeroSamplePair } // RangeValues implements SeriesIterator. @@ -602,7 +614,7 @@ func (it *memorySeriesIterator) RangeValues(in metric.Interval) []model.SamplePa if c.firstTime().After(in.NewestInclusive) { break } - chValues, err := it.chunkIterator(i + j).rangeValues(in) + chValues, err := rangeValues(it.chunkIterator(i+j), in) if err != nil { it.quarantine(err) return nil diff --git a/storage/local/storage.go b/storage/local/storage.go index e6527085be..bc380bd6bd 100644 --- a/storage/local/storage.go +++ b/storage/local/storage.go @@ -736,46 +736,76 @@ func (s *memorySeriesStorage) getOrCreateSeries(fp model.Fingerprint, m model.Me return series, nil } +// getSeriesForRange is a helper method for preloadChunksForRange and preloadChunksForInstant. +func (s *memorySeriesStorage) getSeriesForRange( + fp model.Fingerprint, + from model.Time, through model.Time, +) *memorySeries { + series, ok := s.fpToSeries.get(fp) + if ok { + return series + } + has, first, last, err := s.persistence.hasArchivedMetric(fp) + if err != nil { + log.With("fingerprint", fp).With("error", err).Error("Archive index error while preloading chunks.") + return nil + } + if !has { + s.invalidPreloadRequestsCount.Inc() + return nil + } + if last.Before(from) || first.After(through) { + return nil + } + metric, err := s.persistence.archivedMetric(fp) + if err != nil { + log.With("fingerprint", fp).With("error", err).Error("Archive index error while preloading chunks.") + return nil + } + series, err = s.getOrCreateSeries(fp, metric) + if err != nil { + // getOrCreateSeries took care of quarantining already. + return nil + } + return series +} + func (s *memorySeriesStorage) preloadChunksForRange( fp model.Fingerprint, from model.Time, through model.Time, - lastSampleOnly bool, ) ([]*chunkDesc, SeriesIterator) { s.fpLocker.Lock(fp) defer s.fpLocker.Unlock(fp) - series, ok := s.fpToSeries.get(fp) - if !ok { - has, first, last, err := s.persistence.hasArchivedMetric(fp) - if err != nil { - log.With("fingerprint", fp).With("error", err).Error("Archive index error while preloading chunks.") - return nil, nopIter - } - if !has { - s.invalidPreloadRequestsCount.Inc() - return nil, nopIter - } - if from.Before(last) && through.After(first) { - metric, err := s.persistence.archivedMetric(fp) - if err != nil { - log.With("fingerprint", fp).With("error", err).Error("Archive index error while preloading chunks.") - return nil, nopIter - } - series, err = s.getOrCreateSeries(fp, metric) - if err != nil { - log.With("fingerprint", fp).With("error", err).Error("Error while retrieving series.") - return nil, nopIter - } - } else { - return nil, nopIter - } + series := s.getSeriesForRange(fp, from, through) + if series == nil { + return nil, nopIter } - cds, it, err := series.preloadChunksForRange(fp, from, through, lastSampleOnly, s) + cds, iter, err := series.preloadChunksForRange(fp, from, through, s) if err != nil { s.quarantineSeries(fp, series.metric, err) return nil, nopIter } - return cds, it + return cds, iter +} + +func (s *memorySeriesStorage) preloadChunksForInstant( + fp model.Fingerprint, + from model.Time, through model.Time, +) ([]*chunkDesc, SeriesIterator) { + s.fpLocker.Lock(fp) + defer s.fpLocker.Unlock(fp) + + series := s.getSeriesForRange(fp, from, through) + if series == nil { + return nil, nopIter + } + cds, iter, err := series.preloadChunksForInstant(fp, from, through, s) + if err != nil { + s.quarantineSeries(fp, series.metric, err) + return nil, nopIter + } + return cds, iter } func (s *memorySeriesStorage) handleEvictList() { diff --git a/storage/local/storage_test.go b/storage/local/storage_test.go index ab5c2a9ed5..01f30c28b1 100644 --- a/storage/local/storage_test.go +++ b/storage/local/storage_test.go @@ -495,12 +495,12 @@ func TestDropMetrics(t *testing.T) { t.Errorf("unexpected number of fingerprints: %d", len(fps2)) } - _, it := s.preloadChunksForRange(fpList[0], model.Earliest, model.Latest, false) + _, it := s.preloadChunksForRange(fpList[0], model.Earliest, model.Latest) if vals := it.RangeValues(metric.Interval{OldestInclusive: insertStart, NewestInclusive: now}); len(vals) != 0 { t.Errorf("unexpected number of samples: %d", len(vals)) } - _, it = s.preloadChunksForRange(fpList[1], model.Earliest, model.Latest, false) + _, it = s.preloadChunksForRange(fpList[1], model.Earliest, model.Latest) if vals := it.RangeValues(metric.Interval{OldestInclusive: insertStart, NewestInclusive: now}); len(vals) != N { t.Errorf("unexpected number of samples: %d", len(vals)) } @@ -522,12 +522,12 @@ func TestDropMetrics(t *testing.T) { t.Errorf("unexpected number of fingerprints: %d", len(fps3)) } - _, it = s.preloadChunksForRange(fpList[0], model.Earliest, model.Latest, false) + _, it = s.preloadChunksForRange(fpList[0], model.Earliest, model.Latest) if vals := it.RangeValues(metric.Interval{OldestInclusive: insertStart, NewestInclusive: now}); len(vals) != 0 { t.Errorf("unexpected number of samples: %d", len(vals)) } - _, it = s.preloadChunksForRange(fpList[1], model.Earliest, model.Latest, false) + _, it = s.preloadChunksForRange(fpList[1], model.Earliest, model.Latest) if vals := it.RangeValues(metric.Interval{OldestInclusive: insertStart, NewestInclusive: now}); len(vals) != 0 { t.Errorf("unexpected number of samples: %d", len(vals)) } @@ -746,7 +746,7 @@ func testValueAtOrBeforeTime(t *testing.T, encoding chunkEncoding) { fp := model.Metric{}.FastFingerprint() - _, it := s.preloadChunksForRange(fp, model.Earliest, model.Latest, false) + _, it := s.preloadChunksForRange(fp, model.Earliest, model.Latest) // #1 Exactly on a sample. for i, expected := range samples { @@ -820,7 +820,7 @@ func benchmarkValueAtOrBeforeTime(b *testing.B, encoding chunkEncoding) { fp := model.Metric{}.FastFingerprint() - _, it := s.preloadChunksForRange(fp, model.Earliest, model.Latest, false) + _, it := s.preloadChunksForRange(fp, model.Earliest, model.Latest) b.ResetTimer() @@ -898,7 +898,7 @@ func testRangeValues(t *testing.T, encoding chunkEncoding) { fp := model.Metric{}.FastFingerprint() - _, it := s.preloadChunksForRange(fp, model.Earliest, model.Latest, false) + _, it := s.preloadChunksForRange(fp, model.Earliest, model.Latest) // #1 Zero length interval at sample. for i, expected := range samples { @@ -1050,7 +1050,7 @@ func benchmarkRangeValues(b *testing.B, encoding chunkEncoding) { fp := model.Metric{}.FastFingerprint() - _, it := s.preloadChunksForRange(fp, model.Earliest, model.Latest, false) + _, it := s.preloadChunksForRange(fp, model.Earliest, model.Latest) b.ResetTimer() @@ -1096,7 +1096,7 @@ func testEvictAndPurgeSeries(t *testing.T, encoding chunkEncoding) { // Drop ~half of the chunks. s.maintainMemorySeries(fp, 10000) - _, it := s.preloadChunksForRange(fp, model.Earliest, model.Latest, false) + _, it := s.preloadChunksForRange(fp, model.Earliest, model.Latest) actual := it.RangeValues(metric.Interval{ OldestInclusive: 0, NewestInclusive: 100000, @@ -1114,7 +1114,7 @@ func testEvictAndPurgeSeries(t *testing.T, encoding chunkEncoding) { // Drop everything. s.maintainMemorySeries(fp, 100000) - _, it = s.preloadChunksForRange(fp, model.Earliest, model.Latest, false) + _, it = s.preloadChunksForRange(fp, model.Earliest, model.Latest) actual = it.RangeValues(metric.Interval{ OldestInclusive: 0, NewestInclusive: 100000,