Rework the way ranges and instants are handled

In a way, our instants were also ranges, just with the staleness delta
as range length. They are now treated equally, the only difference
being that in one case the range length is set to the explicit range
duration and in the other to the staleness delta. However, there are
"real" instants where the start and end time of a query are the same.
In those cases, we only want to return a single value (the one closest
before or at the shared start and end time). If that value is the last
sample in the series, odds are we already have it in the series object.
In that case, there is no need to pin or load any chunks. A special
singleSampleSeriesIterator is created for that. This should greatly
speed up instant queries, as they happen frequently for rule
evaluations.
beorn7 2016-02-19 18:35:30 +01:00
parent b876f8e6a5
commit 454ecf3f52
6 changed files with 118 additions and 49 deletions
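
To make the distinction concrete, here is a minimal, hypothetical sketch of the decision the commit introduces. The helper pickPreload and its signature are made up for illustration and are not part of this change:

package main

import (
	"fmt"
	"time"
)

// pickPreload illustrates the idea: a "true" instant (start == end, no
// explicit range) only needs the single sample at or before start, while
// everything else is preloaded as a range reaching either the explicit
// range duration or the staleness delta into the past.
func pickPreload(start, end time.Time, rangeDuration, stalenessDelta time.Duration) (from time.Time, singleSample bool) {
	if rangeDuration == 0 && start.Equal(end) {
		return start, true // one value at or before start suffices
	}
	d := rangeDuration
	if d == 0 {
		d = stalenessDelta // instant selector evaluated over a range
	}
	return start.Add(-d), false
}

func main() {
	now := time.Now()
	from, single := pickPreload(now, now, 0, 5*time.Minute)
	fmt.Println(from.Equal(now), single) // true true: no chunk range needed
}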

View file

@@ -41,14 +41,20 @@ type Analyzer struct {
// fingerprints. One of these structs is collected for each offset by the query
// analyzer.
type preloadTimes struct {
// Instants require single samples to be loaded along the entire query
// range, with intervals between the samples corresponding to the query
// resolution.
instants map[model.Fingerprint]struct{}
// Ranges require loading a range of samples at each resolution step,
// stretching backwards from the current evaluation timestamp. The length of
// the range into the past is given by the duration, as in "foo[5m]".
// Ranges require loading a range of samples. They can be triggered by
// two types of expressions: First, a range expression AKA matrix
// selector, where the Duration in the ranges map is the length of the
// range in the range expression. Second, an instant expression AKA
// vector selector, where the Duration in the ranges map is the
// StalenessDelta. In preloading, both types of expressions have the
// same effect: preload everything from the specified start time minus
// the Duration in the ranges map up to the specified end time.
ranges map[model.Fingerprint]time.Duration
// Instants require a single sample to be loaded. This only happens for
// instant expressions AKA vector selectors, and only if the specified
// start and end time are the same. Thus, instants is only populated if
// start and end time are the same.
instants map[model.Fingerprint]struct{}
}
// Analyze the provided expression and attach metrics and fingerprints to data-selecting
@@ -57,13 +63,15 @@ func (a *Analyzer) Analyze(ctx context.Context) error {
a.offsetPreloadTimes = map[time.Duration]preloadTimes{}
getPreloadTimes := func(offset time.Duration) preloadTimes {
if _, ok := a.offsetPreloadTimes[offset]; !ok {
a.offsetPreloadTimes[offset] = preloadTimes{
instants: map[model.Fingerprint]struct{}{},
ranges: map[model.Fingerprint]time.Duration{},
}
if pt, ok := a.offsetPreloadTimes[offset]; ok {
return pt
}
return a.offsetPreloadTimes[offset]
pt := preloadTimes{
instants: map[model.Fingerprint]struct{}{},
ranges: map[model.Fingerprint]time.Duration{},
}
a.offsetPreloadTimes[offset] = pt
return pt
}
// Retrieve fingerprints and metrics for the required time range for
@@ -76,11 +84,14 @@ func (a *Analyzer) Analyze(ctx context.Context) error {
pt := getPreloadTimes(n.Offset)
for fp := range n.metrics {
// Only add the fingerprint to the instants if not yet present in the
// ranges. Ranges always contain more points and span more time than
// instants for the same offset.
if _, alreadyInRanges := pt.ranges[fp]; !alreadyInRanges {
r, alreadyInRanges := pt.ranges[fp]
if a.Start.Equal(a.End) && !alreadyInRanges {
// A true instant; we only need one value.
pt.instants[fp] = struct{}{}
continue
}
if r < StalenessDelta {
pt.ranges[fp] = StalenessDelta
}
}
case *MatrixSelector:
@@ -133,18 +144,7 @@ func (a *Analyzer) Prepare(ctx context.Context) (local.Preloader, error) {
if err = contextDone(ctx, env); err != nil {
return nil, err
}
startOfRange := start.Add(-rangeDuration)
if StalenessDelta > rangeDuration {
// Cover a weird corner case: The expression
// mixes up instants and ranges for the same
// series. We'll handle that over-all as
// range. But if the rangeDuration is smaller
// than the StalenessDelta, the range wouldn't
// cover everything potentially needed for the
// instant, so we have to extend startOfRange.
startOfRange = start.Add(-StalenessDelta)
}
iter, err := p.PreloadRange(fp, startOfRange, end)
iter, err := p.PreloadRange(fp, start.Add(-rangeDuration), end)
if err != nil {
return nil, err
}
@@ -154,10 +154,7 @@ func (a *Analyzer) Prepare(ctx context.Context) (local.Preloader, error) {
if err = contextDone(ctx, env); err != nil {
return nil, err
}
// Need to look backwards by StalenessDelta but not
// forward because we always return the closest sample
// _before_ the reference time.
iter, err := p.PreloadRange(fp, start.Add(-StalenessDelta), end)
iter, err := p.PreloadInstant(fp, start, StalenessDelta)
if err != nil {
return nil, err
}
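
The effect of the new VectorSelector handling above can be seen in a small, self-contained sketch. It is illustrative only: fingerprints are plain strings here instead of model.Fingerprint, and the fixed 5m value stands in for the package's StalenessDelta:

package main

import (
	"fmt"
	"time"
)

// Simplified stand-ins for the analyzer's bookkeeping; not the real types.
type preloadTimes struct {
	ranges   map[string]time.Duration
	instants map[string]struct{}
}

const stalenessDelta = 5 * time.Minute

// addVectorSelector mirrors the logic above: a true instant (start == end)
// not already covered by a range goes into instants; otherwise the range
// entry is widened to at least the staleness delta.
func addVectorSelector(pt preloadTimes, fp string, start, end time.Time) {
	r, alreadyInRanges := pt.ranges[fp]
	if start.Equal(end) && !alreadyInRanges {
		pt.instants[fp] = struct{}{}
		return
	}
	if r < stalenessDelta {
		pt.ranges[fp] = stalenessDelta
	}
}

func main() {
	pt := preloadTimes{ranges: map[string]time.Duration{}, instants: map[string]struct{}{}}
	now := time.Now()

	addVectorSelector(pt, "foo", now, now)                // true instant → instants
	pt.ranges["bar"] = 2 * time.Minute                    // e.g. recorded for bar[2m]
	addVectorSelector(pt, "bar", now, now)                // widened to the staleness delta
	addVectorSelector(pt, "baz", now, now.Add(time.Hour)) // range query → ranges

	fmt.Println(pt.instants, pt.ranges)
}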

View file

@@ -14,6 +14,8 @@
package local
import (
"time"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
@@ -92,6 +94,10 @@ type Preloader interface {
fp model.Fingerprint,
from model.Time, through model.Time,
) (SeriesIterator, error)
PreloadInstant(
fp model.Fingerprint,
timestamp model.Time, stalenessDelta time.Duration,
) (SeriesIterator, error)
// Close unpins any previously requested series data from memory.
Close()
}
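
From the caller's side, the split between the two preload methods looks roughly like the sketch below. The helper preloadFor, the example package, and the fixed 5m staleness delta are assumptions for illustration, not code from this commit:

package example

import (
	"time"

	"github.com/prometheus/common/model"
	"github.com/prometheus/prometheus/storage/local"
)

// preloadFor is a made-up helper: a true instant (start == end) uses
// PreloadInstant, everything else preloads a range.
func preloadFor(p local.Preloader, fp model.Fingerprint, start, end model.Time) (local.SeriesIterator, error) {
	const stalenessDelta = 5 * time.Minute
	if start.Equal(end) {
		// Only the latest sample at or before start is needed.
		return p.PreloadInstant(fp, start, stalenessDelta)
	}
	// For simplicity, the lookback here is always the staleness delta; the
	// real analyzer uses whatever Duration it recorded for this fingerprint.
	return p.PreloadRange(fp, start.Add(-stalenessDelta), end)
}

Either way, the caller remains responsible for calling Close on the preloader once the returned iterators are no longer needed, since that is what unpins the chunk data.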

View file

@@ -13,7 +13,11 @@
package local
import "github.com/prometheus/common/model"
import (
"time"
"github.com/prometheus/common/model"
)
// memorySeriesPreloader is a Preloader for the memorySeriesStorage.
type memorySeriesPreloader struct {
@@ -26,7 +30,20 @@ func (p *memorySeriesPreloader) PreloadRange(
fp model.Fingerprint,
from model.Time, through model.Time,
) (SeriesIterator, error) {
cds, iter, err := p.storage.preloadChunksForRange(fp, from, through)
cds, iter, err := p.storage.preloadChunksForRange(fp, from, through, false)
if err != nil {
return iter, err
}
p.pinnedChunkDescs = append(p.pinnedChunkDescs, cds...)
return iter, nil
}
// PreloadInstant implements Preloader
func (p *memorySeriesPreloader) PreloadInstant(
fp model.Fingerprint,
timestamp model.Time, stalenessDelta time.Duration,
) (SeriesIterator, error) {
cds, iter, err := p.storage.preloadChunksForRange(fp, timestamp.Add(-stalenessDelta), timestamp, true)
if err != nil {
return iter, err
}

View file

@@ -367,8 +367,9 @@ func (s *memorySeries) preloadChunks(
}
// newIterator returns a new SeriesIterator for the provided chunkDescs (which
// must be pinned). The caller must have locked the fingerprint of the
// memorySeries.
// must be pinned).
//
// The caller must have locked the fingerprint of the memorySeries.
func (s *memorySeries) newIterator(pinnedChunkDescs []*chunkDesc) SeriesIterator {
chunks := make([]chunk, 0, len(pinnedChunkDescs))
for _, cd := range pinnedChunkDescs {
@@ -385,9 +386,27 @@ func (s *memorySeries) newIterator(pinnedChunkDescs []*chunkDesc) SeriesIterator
// preloadChunksForRange loads chunks for the given range from the persistence.
// The caller must have locked the fingerprint of the series.
func (s *memorySeries) preloadChunksForRange(
fp model.Fingerprint,
from model.Time, through model.Time,
fp model.Fingerprint, mss *memorySeriesStorage,
lastSampleOnly bool,
mss *memorySeriesStorage,
) ([]*chunkDesc, SeriesIterator, error) {
// If we have to preload for only one sample, and we have a
// lastSamplePair in the series, and that last samplePair is in the
// interval, just take it in a singleSampleSeriesIterator. No need to
// pin or load anything.
if lastSampleOnly {
lastSample := s.lastSamplePair()
if !through.Before(lastSample.Timestamp) &&
!from.After(lastSample.Timestamp) &&
lastSample != ZeroSamplePair {
iter := &boundedIterator{
it: &singleSampleSeriesIterator{samplePair: lastSample},
start: model.Now().Add(-mss.dropAfter),
}
return nil, iter, nil
}
}
firstChunkDescTime := model.Latest
if len(s.chunkDescs) > 0 {
firstChunkDescTime = s.chunkDescs[0].firstTime()
@@ -619,6 +638,35 @@ func (it *memorySeriesIterator) chunkIterator(i int) chunkIterator {
return chunkIt
}
// singleSampleSeriesIterator implements SeriesIterator. It is a "shortcut
// iterator" that returns a single sample only. The sample is saved in the
// iterator itself, so no chunks need to be pinned.
type singleSampleSeriesIterator struct {
samplePair model.SamplePair
}
// ValueAtOrBeforeTime implements SeriesIterator.
func (it *singleSampleSeriesIterator) ValueAtOrBeforeTime(t model.Time) model.SamplePair {
if it.samplePair.Timestamp.After(t) {
return ZeroSamplePair
}
return it.samplePair
}
// BoundaryValues implements SeriesIterator.
func (it *singleSampleSeriesIterator) BoundaryValues(in metric.Interval) []model.SamplePair {
return it.RangeValues(in)
}
// RangeValues implements SeriesIterator.
func (it *singleSampleSeriesIterator) RangeValues(in metric.Interval) []model.SamplePair {
if it.samplePair.Timestamp.After(in.NewestInclusive) ||
it.samplePair.Timestamp.Before(in.OldestInclusive) {
return []model.SamplePair{}
}
return []model.SamplePair{it.samplePair}
}
// nopSeriesIterator implements Series Iterator. It never returns any values.
type nopSeriesIterator struct{}
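
For intuition, here is a test-style sketch of the new iterator's contract. It is hypothetical, would have to live inside package local (the type is unexported), and elides the fmt, model, and metric imports:

func demoSingleSampleIterator() {
	sp := model.SamplePair{Timestamp: 1000, Value: 42}
	it := &singleSampleSeriesIterator{samplePair: sp}

	fmt.Println(it.ValueAtOrBeforeTime(1500)) // sp: the closest sample at or before t
	fmt.Println(it.ValueAtOrBeforeTime(500))  // ZeroSamplePair: the sample lies after t

	// RangeValues only yields the sample if the interval contains its timestamp.
	fmt.Println(it.RangeValues(metric.Interval{OldestInclusive: 900, NewestInclusive: 1100}))
	fmt.Println(it.RangeValues(metric.Interval{OldestInclusive: 1100, NewestInclusive: 1200}))
}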

View file

@@ -689,6 +689,7 @@ func (s *memorySeriesStorage) getOrCreateSeries(fp model.Fingerprint, m model.Me
func (s *memorySeriesStorage) preloadChunksForRange(
fp model.Fingerprint,
from model.Time, through model.Time,
lastSampleOnly bool,
) ([]*chunkDesc, SeriesIterator, error) {
s.fpLocker.Lock(fp)
defer s.fpLocker.Unlock(fp)
@@ -713,7 +714,7 @@ func (s *memorySeriesStorage) preloadChunksForRange(
return nil, nopIter, nil
}
}
return series.preloadChunksForRange(from, through, fp, s)
return series.preloadChunksForRange(fp, from, through, lastSampleOnly, s)
}
func (s *memorySeriesStorage) handleEvictList() {

View file

@@ -500,7 +500,7 @@ func TestDropMetrics(t *testing.T) {
t.Errorf("unexpected number of fingerprints: %d", len(fps2))
}
_, it, err := s.preloadChunksForRange(fpList[0], model.Earliest, model.Latest)
_, it, err := s.preloadChunksForRange(fpList[0], model.Earliest, model.Latest, false)
if err != nil {
t.Fatalf("Error preloading everything: %s", err)
}
@@ -508,7 +508,7 @@ func TestDropMetrics(t *testing.T) {
t.Errorf("unexpected number of samples: %d", len(vals))
}
_, it, err = s.preloadChunksForRange(fpList[1], model.Earliest, model.Latest)
_, it, err = s.preloadChunksForRange(fpList[1], model.Earliest, model.Latest, false)
if err != nil {
t.Fatalf("Error preloading everything: %s", err)
}
@@ -533,7 +533,7 @@ func TestDropMetrics(t *testing.T) {
t.Errorf("unexpected number of fingerprints: %d", len(fps3))
}
_, it, err = s.preloadChunksForRange(fpList[0], model.Earliest, model.Latest)
_, it, err = s.preloadChunksForRange(fpList[0], model.Earliest, model.Latest, false)
if err != nil {
t.Fatalf("Error preloading everything: %s", err)
}
@@ -541,7 +541,7 @@ func TestDropMetrics(t *testing.T) {
t.Errorf("unexpected number of samples: %d", len(vals))
}
_, it, err = s.preloadChunksForRange(fpList[1], model.Earliest, model.Latest)
_, it, err = s.preloadChunksForRange(fpList[1], model.Earliest, model.Latest, false)
if err != nil {
t.Fatalf("Error preloading everything: %s", err)
}
@@ -670,7 +670,7 @@ func testValueAtOrBeforeTime(t *testing.T, encoding chunkEncoding) {
fp := model.Metric{}.FastFingerprint()
_, it, err := s.preloadChunksForRange(fp, model.Earliest, model.Latest)
_, it, err := s.preloadChunksForRange(fp, model.Earliest, model.Latest, false)
if err != nil {
t.Fatalf("Error preloading everything: %s", err)
}
@@ -747,7 +747,7 @@ func benchmarkValueAtOrBeforeTime(b *testing.B, encoding chunkEncoding) {
fp := model.Metric{}.FastFingerprint()
_, it, err := s.preloadChunksForRange(fp, model.Earliest, model.Latest)
_, it, err := s.preloadChunksForRange(fp, model.Earliest, model.Latest, false)
if err != nil {
b.Fatalf("Error preloading everything: %s", err)
}
@@ -828,7 +828,7 @@ func testRangeValues(t *testing.T, encoding chunkEncoding) {
fp := model.Metric{}.FastFingerprint()
_, it, err := s.preloadChunksForRange(fp, model.Earliest, model.Latest)
_, it, err := s.preloadChunksForRange(fp, model.Earliest, model.Latest, false)
if err != nil {
t.Fatalf("Error preloading everything: %s", err)
}
@@ -983,7 +983,7 @@ func benchmarkRangeValues(b *testing.B, encoding chunkEncoding) {
fp := model.Metric{}.FastFingerprint()
_, it, err := s.preloadChunksForRange(fp, model.Earliest, model.Latest)
_, it, err := s.preloadChunksForRange(fp, model.Earliest, model.Latest, false)
if err != nil {
b.Fatalf("Error preloading everything: %s", err)
}
@@ -1032,7 +1032,7 @@ func testEvictAndPurgeSeries(t *testing.T, encoding chunkEncoding) {
// Drop ~half of the chunks.
s.maintainMemorySeries(fp, 10000)
_, it, err := s.preloadChunksForRange(fp, model.Earliest, model.Latest)
_, it, err := s.preloadChunksForRange(fp, model.Earliest, model.Latest, false)
if err != nil {
t.Fatalf("Error preloading everything: %s", err)
}
@@ -1053,7 +1053,7 @@ func testEvictAndPurgeSeries(t *testing.T, encoding chunkEncoding) {
// Drop everything.
s.maintainMemorySeries(fp, 100000)
_, it, err = s.preloadChunksForRange(fp, model.Earliest, model.Latest)
_, it, err = s.preloadChunksForRange(fp, model.Earliest, model.Latest, false)
if err != nil {
t.Fatalf("Error preloading everything: %s", err)
}