diff --git a/rules/ast/query_analyzer.go b/rules/ast/query_analyzer.go index e1830e1015..977dac240a 100644 --- a/rules/ast/query_analyzer.go +++ b/rules/ast/query_analyzer.go @@ -132,9 +132,10 @@ func viewAdapterForRangeQuery(node Node, start time.Time, end time.Time, interva requestBuildTimer := queryStats.GetTimer(stats.ViewRequestBuildTime).Start() viewBuilder := metric.NewViewRequestBuilder() for fingerprint, rangeDuration := range analyzer.FullRanges { - // TODO: we should support GetMetricRangeAtInterval() or similar ops in the view builder. - for t := start; t.Before(end); t = t.Add(interval) { - viewBuilder.GetMetricRange(&fingerprint, t.Add(-rangeDuration), t) + if interval < rangeDuration { + viewBuilder.GetMetricRange(&fingerprint, start.Add(-rangeDuration), end) + } else { + viewBuilder.GetMetricRangeAtInterval(&fingerprint, start.Add(-rangeDuration), end, interval, rangeDuration) } } for fingerprint := range analyzer.IntervalRanges { diff --git a/storage/metric/operation.go b/storage/metric/operation.go index 2fef66a5c2..0ce9b9ecf9 100644 --- a/storage/metric/operation.go +++ b/storage/metric/operation.go @@ -25,7 +25,9 @@ type op interface { // The time at which this operation starts. StartsAt() time.Time // Extract samples from stream of values and advance operation time. - ExtractSamples(in Values) (out Values) + ExtractSamples(Values) Values + // Return whether the operator has consumed all data it needs. + Consumed() bool // Get current operation time or nil if no subsequent work associated with // this operator remains. CurrentTime() *time.Time @@ -124,11 +126,12 @@ func extractValuesAroundTime(t time.Time, in Values) (out Values) { return } -func (g getValuesAtTimeOp) CurrentTime() (currentTime *time.Time) { - if !g.consumed { - currentTime = &g.time - } - return +func (g getValuesAtTimeOp) CurrentTime() *time.Time { + return &g.time +} + +func (g getValuesAtTimeOp) Consumed() bool { + return g.consumed } // Encapsulates getting values at a given interval over a duration. @@ -173,13 +176,14 @@ func (g *getValuesAtIntervalOp) ExtractSamples(in Values) (out Values) { return } -func (g *getValuesAtIntervalOp) CurrentTime() (currentTime *time.Time) { - if g.from.After(g.through) { - return - } +func (g *getValuesAtIntervalOp) CurrentTime() *time.Time { return &g.from } +func (g *getValuesAtIntervalOp) Consumed() bool { + return g.from.After(g.through) +} + func (g *getValuesAtIntervalOp) GreedierThan(op op) (superior bool) { switch o := op.(type) { case *getValuesAtTimeOp: @@ -193,6 +197,7 @@ func (g *getValuesAtIntervalOp) GreedierThan(op op) (superior bool) { return } +// Encapsulates getting all values in a given range. type getValuesAlongRangeOp struct { from time.Time through time.Time @@ -243,13 +248,14 @@ func (g *getValuesAlongRangeOp) ExtractSamples(in Values) (out Values) { return in[firstIdx:lastIdx] } -func (g *getValuesAlongRangeOp) CurrentTime() (currentTime *time.Time) { - if g.from.After(g.through) { - return - } +func (g *getValuesAlongRangeOp) CurrentTime() *time.Time { return &g.from } +func (g *getValuesAlongRangeOp) Consumed() bool { + return g.from.After(g.through) +} + func (g *getValuesAlongRangeOp) GreedierThan(op op) (superior bool) { switch o := op.(type) { case *getValuesAtTimeOp: @@ -263,6 +269,86 @@ func (g *getValuesAlongRangeOp) GreedierThan(op op) (superior bool) { return } +// Encapsulates getting all values from ranges along intervals. +// +// Works just like getValuesAlongRangeOp, but when from > through, through is +// incremented by interval and from is reset to through-rangeDuration. Returns +// current time nil when from > totalThrough. +type getValueRangeAtIntervalOp struct { + rangeFrom time.Time + rangeThrough time.Time + rangeDuration time.Duration + interval time.Duration + through time.Time +} + +func (o *getValueRangeAtIntervalOp) String() string { + return fmt.Sprintf("getValueRangeAtIntervalOp range %s from %s each %s through %s", o.rangeDuration, o.rangeFrom, o.interval, o.through) +} + +func (g *getValueRangeAtIntervalOp) StartsAt() time.Time { + return g.rangeFrom +} + +func (g *getValueRangeAtIntervalOp) Through() time.Time { + panic("not implemented") +} + +func (g *getValueRangeAtIntervalOp) advanceToNextInterval() { + g.rangeThrough = g.rangeThrough.Add(g.interval) + g.rangeFrom = g.rangeThrough.Add(-g.rangeDuration) +} + +func (g *getValueRangeAtIntervalOp) ExtractSamples(in Values) (out Values) { + if len(in) == 0 { + return + } + // Find the first sample where time >= g.from. + firstIdx := sort.Search(len(in), func(i int) bool { + return !in[i].Timestamp.Before(g.rangeFrom) + }) + if firstIdx == len(in) { + // No samples at or after operator start time. This can only happen if we + // try applying the operator to a time after the last recorded sample. In + // this case, we're finished. + g.rangeFrom = g.through.Add(1) + return + } + + // Find the first sample where time > g.rangeThrough. + lastIdx := sort.Search(len(in), func(i int) bool { + return in[i].Timestamp.After(g.rangeThrough) + }) + // This only happens when there is only one sample and it is both after + // g.rangeFrom and after g.rangeThrough. In this case, both indexes are 0. + if lastIdx == firstIdx { + g.advanceToNextInterval() + return + } + + lastSampleTime := in[lastIdx-1].Timestamp + // Sample times are stored with a maximum time resolution of one second, so + // we have to add exactly that to target the next chunk on the next op + // iteration. + g.rangeFrom = lastSampleTime.Add(time.Second) + if g.rangeFrom.After(g.rangeThrough) { + g.advanceToNextInterval() + } + return in[firstIdx:lastIdx] +} + +func (g *getValueRangeAtIntervalOp) CurrentTime() *time.Time { + return &g.rangeFrom +} + +func (g *getValueRangeAtIntervalOp) Consumed() bool { + return g.rangeFrom.After(g.through) +} + +func (g *getValueRangeAtIntervalOp) GreedierThan(op op) bool { + panic("not implemented") +} + // Provides a collection of getMetricRangeOperation. type getMetricRangeOperations []*getValuesAlongRangeOp diff --git a/storage/metric/operation_test.go b/storage/metric/operation_test.go index ca0ee774fe..e39cbdeb09 100644 --- a/storage/metric/operation_test.go +++ b/storage/metric/operation_test.go @@ -1633,17 +1633,15 @@ func TestGetValuesAtIntervalOp(t *testing.T) { for i, scenario := range scenarios { actual := scenario.op.ExtractSamples(scenario.in) if len(actual) != len(scenario.out) { - t.Fatalf("%d. expected length %d, got %d: %v", i, len(scenario.out), len(actual), scenario.op) - t.Fatalf("%d. expected length %d, got %d", i, len(scenario.out), len(actual)) + t.Fatalf("%d. expected length %d, got %d: %v", i, len(scenario.out), len(actual), actual) } if len(scenario.in) < 1 { continue } - opTime := scenario.op.CurrentTime() lastExtractedTime := scenario.out[len(scenario.out)-1].Timestamp - if opTime != nil && opTime.Before(lastExtractedTime) { - t.Fatalf("%d. expected op.CurrentTime() to be nil or after current chunk, %v, %v", i, scenario.op.CurrentTime(), scenario.out) + if !scenario.op.Consumed() && scenario.op.CurrentTime().Before(lastExtractedTime) { + t.Fatalf("%d. expected op to be consumed or with CurrentTime() after current chunk, %v, %v", i, scenario.op.CurrentTime(), scenario.out) } for j, out := range scenario.out { @@ -1819,8 +1817,164 @@ func TestGetValuesAlongRangeOp(t *testing.T) { for i, scenario := range scenarios { actual := scenario.op.ExtractSamples(scenario.in) if len(actual) != len(scenario.out) { - t.Fatalf("%d. expected length %d, got %d: %v", i, len(scenario.out), len(actual), scenario.op) - t.Fatalf("%d. expected length %d, got %d", i, len(scenario.out), len(actual)) + t.Fatalf("%d. expected length %d, got %d: %v", i, len(scenario.out), len(actual), actual) + } + for j, out := range scenario.out { + if out != actual[j] { + t.Fatalf("%d. expected output %v, got %v", i, scenario.out, actual) + } + } + } +} + +func TestGetValueRangeAtIntervalOp(t *testing.T) { + testOp := getValueRangeAtIntervalOp{ + rangeFrom: testInstant.Add(-2 * time.Minute), + rangeThrough: testInstant, + rangeDuration: 2 * time.Minute, + interval: 10 * time.Minute, + through: testInstant.Add(20 * time.Minute), + } + + var scenarios = []struct { + op getValueRangeAtIntervalOp + in model.Values + out model.Values + }{ + // All values before the first range. + { + op: testOp, + in: model.Values{ + { + Timestamp: testInstant.Add(-4 * time.Minute), + Value: 1, + }, + { + Timestamp: testInstant.Add(-3 * time.Minute), + Value: 2, + }, + }, + out: model.Values{}, + }, + // Values starting before first range, ending after last. + { + op: testOp, + in: model.Values{ + { + Timestamp: testInstant.Add(-4 * time.Minute), + Value: 1, + }, + { + Timestamp: testInstant.Add(-3 * time.Minute), + Value: 2, + }, + { + Timestamp: testInstant.Add(-2 * time.Minute), + Value: 3, + }, + { + Timestamp: testInstant.Add(-1 * time.Minute), + Value: 4, + }, + { + Timestamp: testInstant.Add(0 * time.Minute), + Value: 5, + }, + { + Timestamp: testInstant.Add(5 * time.Minute), + Value: 6, + }, + { + Timestamp: testInstant.Add(8 * time.Minute), + Value: 7, + }, + { + Timestamp: testInstant.Add(9 * time.Minute), + Value: 8, + }, + { + Timestamp: testInstant.Add(10 * time.Minute), + Value: 9, + }, + { + Timestamp: testInstant.Add(15 * time.Minute), + Value: 10, + }, + { + Timestamp: testInstant.Add(18 * time.Minute), + Value: 11, + }, + { + Timestamp: testInstant.Add(19 * time.Minute), + Value: 12, + }, + { + Timestamp: testInstant.Add(20 * time.Minute), + Value: 13, + }, + { + Timestamp: testInstant.Add(21 * time.Minute), + Value: 14, + }, + }, + out: model.Values{ + { + Timestamp: testInstant.Add(-2 * time.Minute), + Value: 3, + }, + { + Timestamp: testInstant.Add(-1 * time.Minute), + Value: 4, + }, + { + Timestamp: testInstant.Add(0 * time.Minute), + Value: 5, + }, + { + Timestamp: testInstant.Add(8 * time.Minute), + Value: 7, + }, + { + Timestamp: testInstant.Add(9 * time.Minute), + Value: 8, + }, + { + Timestamp: testInstant.Add(10 * time.Minute), + Value: 9, + }, + { + Timestamp: testInstant.Add(18 * time.Minute), + Value: 11, + }, + { + Timestamp: testInstant.Add(19 * time.Minute), + Value: 12, + }, + { + Timestamp: testInstant.Add(20 * time.Minute), + Value: 13, + }, + }, + }, + // Values starting after last range. + { + op: testOp, + in: model.Values{ + { + Timestamp: testInstant.Add(21 * time.Minute), + Value: 14, + }, + }, + out: model.Values{}, + }, + } + for i, scenario := range scenarios { + actual := model.Values{} + for !scenario.op.Consumed() { + actual = append(actual, scenario.op.ExtractSamples(scenario.in)...) + } + if len(actual) != len(scenario.out) { + t.Fatalf("%d. expected length %d, got %d: %v", i, len(scenario.out), len(actual), actual) } for j, out := range scenario.out { if !out.Equal(actual[j]) { diff --git a/storage/metric/tiered.go b/storage/metric/tiered.go index a1bab4d0f6..d2105f2b2e 100644 --- a/storage/metric/tiered.go +++ b/storage/metric/tiered.go @@ -471,7 +471,7 @@ func (t *TieredStorage) renderView(viewJob viewJob) { currentChunk = currentChunk.TruncateBefore(*(op.CurrentTime())) - for op.CurrentTime() != nil && !op.CurrentTime().After(targetTime) { + for !op.Consumed() && !op.CurrentTime().After(targetTime) { out = op.ExtractSamples(Values(currentChunk)) // Append the extracted samples to the materialized view. @@ -482,7 +482,7 @@ func (t *TieredStorage) renderView(viewJob viewJob) { // Throw away standing ops which are finished. filteredOps := ops{} for _, op := range standingOps { - if op.CurrentTime() != nil { + if !op.Consumed() { filteredOps = append(filteredOps, op) } } diff --git a/storage/metric/view.go b/storage/metric/view.go index 80e2e92102..f2e1193e32 100644 --- a/storage/metric/view.go +++ b/storage/metric/view.go @@ -42,15 +42,15 @@ type viewRequestBuilder struct { } // Furnishes a ViewRequestBuilder for remarking what types of queries to perform. -func NewViewRequestBuilder() viewRequestBuilder { - return viewRequestBuilder{ +func NewViewRequestBuilder() *viewRequestBuilder { + return &viewRequestBuilder{ operations: make(map[clientmodel.Fingerprint]ops), } } // Gets for the given Fingerprint either the value at that time if there is an // match or the one or two values adjacent thereto. -func (v viewRequestBuilder) GetMetricAtTime(fingerprint *clientmodel.Fingerprint, time time.Time) { +func (v *viewRequestBuilder) GetMetricAtTime(fingerprint *clientmodel.Fingerprint, time time.Time) { ops := v.operations[*fingerprint] ops = append(ops, &getValuesAtTimeOp{ time: time, @@ -61,7 +61,7 @@ func (v viewRequestBuilder) GetMetricAtTime(fingerprint *clientmodel.Fingerprint // Gets for the given Fingerprint either the value at that interval from From // through Through if there is an match or the one or two values adjacent // for each point. -func (v viewRequestBuilder) GetMetricAtInterval(fingerprint *clientmodel.Fingerprint, from, through time.Time, interval time.Duration) { +func (v *viewRequestBuilder) GetMetricAtInterval(fingerprint *clientmodel.Fingerprint, from, through time.Time, interval time.Duration) { ops := v.operations[*fingerprint] ops = append(ops, &getValuesAtIntervalOp{ from: from, @@ -71,9 +71,9 @@ func (v viewRequestBuilder) GetMetricAtInterval(fingerprint *clientmodel.Fingerp v.operations[*fingerprint] = ops } -// Gets for the given Fingerprint either the values that occur inclusively from -// From through Through. -func (v viewRequestBuilder) GetMetricRange(fingerprint *clientmodel.Fingerprint, from, through time.Time) { +// Gets for the given Fingerprint the values that occur inclusively from From +// through Through. +func (v *viewRequestBuilder) GetMetricRange(fingerprint *clientmodel.Fingerprint, from, through time.Time) { ops := v.operations[*fingerprint] ops = append(ops, &getValuesAlongRangeOp{ from: from, @@ -82,16 +82,37 @@ func (v viewRequestBuilder) GetMetricRange(fingerprint *clientmodel.Fingerprint, v.operations[*fingerprint] = ops } +// Gets value ranges at intervals for the given Fingerprint: +// +// |----| |----| |----| |----| +// ^ ^ ^ ^ ^ ^ +// | \------------/ \----/ | +// from interval rangeDuration through +func (v *viewRequestBuilder) GetMetricRangeAtInterval(fingerprint *clientmodel.Fingerprint, from, through time.Time, interval, rangeDuration time.Duration) { + ops := v.operations[*fingerprint] + ops = append(ops, &getValueRangeAtIntervalOp{ + rangeFrom: from, + rangeThrough: from.Add(rangeDuration), + rangeDuration: rangeDuration, + interval: interval, + through: through, + }) + v.operations[*fingerprint] = ops +} + // Emits the optimized scans that will occur in the data store. This // effectively resets the ViewRequestBuilder back to a pristine state. -func (v viewRequestBuilder) ScanJobs() (j scanJobs) { +func (v *viewRequestBuilder) ScanJobs() (j scanJobs) { for fingerprint, operations := range v.operations { sort.Sort(startsAtSort{operations}) fpCopy := fingerprint j = append(j, scanJob{ fingerprint: &fpCopy, - operations: optimize(operations), + // BUG: Evaluate whether we need to implement an optimize() working also + // for getValueRangeAtIntervalOp and use it here instead of just passing + // through the list of ops as-is. + operations: operations, }) delete(v.operations, fingerprint)