Implement getValueRangeAtIntervalOp for faster range queries.

This also short-circuits optimize() for now, since it is complex to implement
for the new operator, and ops generated by the query layer already fulfill the
needed invariants. We should still investigate later whether to completely
delete operator optimization code or extend it to support
getValueRangeAtIntervalOp operators.
This commit is contained in:
Julius Volz 2013-05-07 14:25:01 +02:00
parent 9726808a0e
commit d2da21121c
5 changed files with 297 additions and 35 deletions

View file

@ -132,9 +132,10 @@ func viewAdapterForRangeQuery(node Node, start time.Time, end time.Time, interva
requestBuildTimer := queryStats.GetTimer(stats.ViewRequestBuildTime).Start()
viewBuilder := metric.NewViewRequestBuilder()
for fingerprint, rangeDuration := range analyzer.FullRanges {
// TODO: we should support GetMetricRangeAtInterval() or similar ops in the view builder.
for t := start; t.Before(end); t = t.Add(interval) {
viewBuilder.GetMetricRange(&fingerprint, t.Add(-rangeDuration), t)
if interval < rangeDuration {
viewBuilder.GetMetricRange(&fingerprint, start.Add(-rangeDuration), end)
} else {
viewBuilder.GetMetricRangeAtInterval(&fingerprint, start.Add(-rangeDuration), end, interval, rangeDuration)
}
}
for fingerprint := range analyzer.IntervalRanges {

View file

@ -25,7 +25,9 @@ type op interface {
// The time at which this operation starts.
StartsAt() time.Time
// Extract samples from stream of values and advance operation time.
ExtractSamples(in Values) (out Values)
ExtractSamples(Values) Values
// Return whether the operator has consumed all data it needs.
Consumed() bool
// Get current operation time. Use Consumed() to find out whether any work
// associated with this operator remains.
CurrentTime() *time.Time
@ -124,11 +126,12 @@ func extractValuesAroundTime(t time.Time, in Values) (out Values) {
return
}
func (g getValuesAtTimeOp) CurrentTime() (currentTime *time.Time) {
if !g.consumed {
currentTime = &g.time
}
return
func (g getValuesAtTimeOp) CurrentTime() *time.Time {
return &g.time
}
// Consumed reports the value of the operator's consumed flag.
func (g getValuesAtTimeOp) Consumed() bool {
return g.consumed
}
// Encapsulates getting values at a given interval over a duration.
@ -173,13 +176,14 @@ func (g *getValuesAtIntervalOp) ExtractSamples(in Values) (out Values) {
return
}
func (g *getValuesAtIntervalOp) CurrentTime() (currentTime *time.Time) {
if g.from.After(g.through) {
return
}
func (g *getValuesAtIntervalOp) CurrentTime() *time.Time {
return &g.from
}
// Consumed reports whether the operator has moved past its end, i.e. whether
// through lies strictly before from.
func (g *getValuesAtIntervalOp) Consumed() bool {
	return g.through.Before(g.from)
}
func (g *getValuesAtIntervalOp) GreedierThan(op op) (superior bool) {
switch o := op.(type) {
case *getValuesAtTimeOp:
@ -193,6 +197,7 @@ func (g *getValuesAtIntervalOp) GreedierThan(op op) (superior bool) {
return
}
// Encapsulates getting all values in a given range.
type getValuesAlongRangeOp struct {
from time.Time
through time.Time
@ -243,13 +248,14 @@ func (g *getValuesAlongRangeOp) ExtractSamples(in Values) (out Values) {
return in[firstIdx:lastIdx]
}
func (g *getValuesAlongRangeOp) CurrentTime() (currentTime *time.Time) {
if g.from.After(g.through) {
return
}
func (g *getValuesAlongRangeOp) CurrentTime() *time.Time {
return &g.from
}
// Consumed reports whether the operator has moved past its end, i.e. whether
// through lies strictly before from.
func (g *getValuesAlongRangeOp) Consumed() bool {
	return g.through.Before(g.from)
}
func (g *getValuesAlongRangeOp) GreedierThan(op op) (superior bool) {
switch o := op.(type) {
case *getValuesAtTimeOp:
@ -263,6 +269,86 @@ func (g *getValuesAlongRangeOp) GreedierThan(op op) (superior bool) {
return
}
// Encapsulates getting all values from ranges along intervals.
//
// Works like getValuesAlongRangeOp within a single range
// [rangeFrom, rangeThrough]. Once extraction moves rangeFrom past
// rangeThrough, rangeThrough is incremented by interval and rangeFrom is
// reset to rangeThrough-rangeDuration. The operator reports itself as
// consumed once rangeFrom lies after through; CurrentTime() never returns nil.
type getValueRangeAtIntervalOp struct {
// Start of the range currently being extracted; advanced during extraction.
rangeFrom time.Time
// End (inclusive) of the range currently being extracted.
rangeThrough time.Time
// Length of each individual range.
rangeDuration time.Duration
// Distance between the ends of two consecutive ranges.
interval time.Duration
// Overall end time; the operator is consumed once rangeFrom is after it.
through time.Time
}
// String returns a human-readable description of the operator: the range
// duration, the start of the current range, the interval and the overall end
// time.
func (g *getValueRangeAtIntervalOp) String() string {
	return fmt.Sprintf("getValueRangeAtIntervalOp range %s from %s each %s through %s", g.rangeDuration, g.rangeFrom, g.interval, g.through)
}
// StartsAt returns rangeFrom, the start of the range the operator is currently
// positioned at. The value changes as ExtractSamples advances the operator.
func (g *getValueRangeAtIntervalOp) StartsAt() time.Time {
return g.rangeFrom
}
// Through is not implemented for this operator; calling it always panics.
func (g *getValueRangeAtIntervalOp) Through() time.Time {
panic("not implemented")
}
// advanceToNextInterval moves the operator to the next range: the range end
// is shifted forward by interval and the range start is placed rangeDuration
// before the new end.
func (g *getValueRangeAtIntervalOp) advanceToNextInterval() {
	nextThrough := g.rangeThrough.Add(g.interval)
	g.rangeThrough = nextThrough
	g.rangeFrom = nextThrough.Add(-g.rangeDuration)
}
// ExtractSamples returns the samples of in that lie within the operator's
// current range [rangeFrom, rangeThrough] (both inclusive) and advances the
// operator state.
//
// in is binary-searched by timestamp, so it is expected to be sorted in
// ascending time order. The returned slice aliases in. nil is returned when in
// is empty or no sample falls into the current range.
//
// State transitions:
//   - in is empty: the operator is left untouched.
//   - in holds no sample at or after rangeFrom: the operator is marked as
//     consumed.
//   - no sample falls into the current range: the operator moves on to the
//     next interval.
//   - otherwise rangeFrom is moved just past the last extracted sample, and
//     the operator moves on to the next interval if that exhausts the
//     current range.
func (g *getValueRangeAtIntervalOp) ExtractSamples(in Values) (out Values) {
	if len(in) == 0 {
		return nil
	}

	// Find the first sample where time >= g.rangeFrom.
	firstIdx := sort.Search(len(in), func(i int) bool {
		return !in[i].Timestamp.Before(g.rangeFrom)
	})
	if firstIdx == len(in) {
		// No samples at or after the start of the current range. This can only
		// happen if we try applying the operator to a time after the last
		// sample in in. In this case, we're finished: push rangeFrom one
		// nanosecond past through so that Consumed() reports true.
		g.rangeFrom = g.through.Add(1)
		return nil
	}

	// Find the first sample where time > g.rangeThrough.
	lastIdx := sort.Search(len(in), func(i int) bool {
		return in[i].Timestamp.After(g.rangeThrough)
	})
	// Both indexes coincide whenever the first sample at or after g.rangeFrom
	// already lies after g.rangeThrough, i.e. the current range contains no
	// samples. Skip ahead to the next range.
	if lastIdx == firstIdx {
		g.advanceToNextInterval()
		return nil
	}

	lastSampleTime := in[lastIdx-1].Timestamp
	// Sample times are stored with a maximum time resolution of one second, so
	// we have to add exactly that to target the next chunk on the next op
	// iteration.
	g.rangeFrom = lastSampleTime.Add(time.Second)
	if g.rangeFrom.After(g.rangeThrough) {
		g.advanceToNextInterval()
	}
	return in[firstIdx:lastIdx]
}
// CurrentTime returns a pointer to the operator's rangeFrom field, the start
// of the range that is extracted next. The result is never nil; use Consumed()
// to find out whether the operator is finished.
func (g *getValueRangeAtIntervalOp) CurrentTime() *time.Time {
return &g.rangeFrom
}
// Consumed reports whether the operator is finished, i.e. whether the overall
// end time through lies strictly before the start of the current range.
func (g *getValueRangeAtIntervalOp) Consumed() bool {
	return g.through.Before(g.rangeFrom)
}
// GreedierThan is not implemented for this operator; calling it always panics.
func (g *getValueRangeAtIntervalOp) GreedierThan(op op) bool {
panic("not implemented")
}
// getMetricRangeOperations is a collection of getValuesAlongRangeOp operators.
type getMetricRangeOperations []*getValuesAlongRangeOp

View file

@ -1633,17 +1633,15 @@ func TestGetValuesAtIntervalOp(t *testing.T) {
for i, scenario := range scenarios {
actual := scenario.op.ExtractSamples(scenario.in)
if len(actual) != len(scenario.out) {
t.Fatalf("%d. expected length %d, got %d: %v", i, len(scenario.out), len(actual), scenario.op)
t.Fatalf("%d. expected length %d, got %d", i, len(scenario.out), len(actual))
t.Fatalf("%d. expected length %d, got %d: %v", i, len(scenario.out), len(actual), actual)
}
if len(scenario.in) < 1 {
continue
}
opTime := scenario.op.CurrentTime()
lastExtractedTime := scenario.out[len(scenario.out)-1].Timestamp
if opTime != nil && opTime.Before(lastExtractedTime) {
t.Fatalf("%d. expected op.CurrentTime() to be nil or after current chunk, %v, %v", i, scenario.op.CurrentTime(), scenario.out)
if !scenario.op.Consumed() && scenario.op.CurrentTime().Before(lastExtractedTime) {
t.Fatalf("%d. expected op to be consumed or with CurrentTime() after current chunk, %v, %v", i, scenario.op.CurrentTime(), scenario.out)
}
for j, out := range scenario.out {
@ -1819,8 +1817,164 @@ func TestGetValuesAlongRangeOp(t *testing.T) {
for i, scenario := range scenarios {
actual := scenario.op.ExtractSamples(scenario.in)
if len(actual) != len(scenario.out) {
t.Fatalf("%d. expected length %d, got %d: %v", i, len(scenario.out), len(actual), scenario.op)
t.Fatalf("%d. expected length %d, got %d", i, len(scenario.out), len(actual))
t.Fatalf("%d. expected length %d, got %d: %v", i, len(scenario.out), len(actual), actual)
}
for j, out := range scenario.out {
if out != actual[j] {
t.Fatalf("%d. expected output %v, got %v", i, scenario.out, actual)
}
}
}
}
func TestGetValueRangeAtIntervalOp(t *testing.T) {
testOp := getValueRangeAtIntervalOp{
rangeFrom: testInstant.Add(-2 * time.Minute),
rangeThrough: testInstant,
rangeDuration: 2 * time.Minute,
interval: 10 * time.Minute,
through: testInstant.Add(20 * time.Minute),
}
var scenarios = []struct {
op getValueRangeAtIntervalOp
in model.Values
out model.Values
}{
// All values before the first range.
{
op: testOp,
in: model.Values{
{
Timestamp: testInstant.Add(-4 * time.Minute),
Value: 1,
},
{
Timestamp: testInstant.Add(-3 * time.Minute),
Value: 2,
},
},
out: model.Values{},
},
// Values starting before first range, ending after last.
{
op: testOp,
in: model.Values{
{
Timestamp: testInstant.Add(-4 * time.Minute),
Value: 1,
},
{
Timestamp: testInstant.Add(-3 * time.Minute),
Value: 2,
},
{
Timestamp: testInstant.Add(-2 * time.Minute),
Value: 3,
},
{
Timestamp: testInstant.Add(-1 * time.Minute),
Value: 4,
},
{
Timestamp: testInstant.Add(0 * time.Minute),
Value: 5,
},
{
Timestamp: testInstant.Add(5 * time.Minute),
Value: 6,
},
{
Timestamp: testInstant.Add(8 * time.Minute),
Value: 7,
},
{
Timestamp: testInstant.Add(9 * time.Minute),
Value: 8,
},
{
Timestamp: testInstant.Add(10 * time.Minute),
Value: 9,
},
{
Timestamp: testInstant.Add(15 * time.Minute),
Value: 10,
},
{
Timestamp: testInstant.Add(18 * time.Minute),
Value: 11,
},
{
Timestamp: testInstant.Add(19 * time.Minute),
Value: 12,
},
{
Timestamp: testInstant.Add(20 * time.Minute),
Value: 13,
},
{
Timestamp: testInstant.Add(21 * time.Minute),
Value: 14,
},
},
out: model.Values{
{
Timestamp: testInstant.Add(-2 * time.Minute),
Value: 3,
},
{
Timestamp: testInstant.Add(-1 * time.Minute),
Value: 4,
},
{
Timestamp: testInstant.Add(0 * time.Minute),
Value: 5,
},
{
Timestamp: testInstant.Add(8 * time.Minute),
Value: 7,
},
{
Timestamp: testInstant.Add(9 * time.Minute),
Value: 8,
},
{
Timestamp: testInstant.Add(10 * time.Minute),
Value: 9,
},
{
Timestamp: testInstant.Add(18 * time.Minute),
Value: 11,
},
{
Timestamp: testInstant.Add(19 * time.Minute),
Value: 12,
},
{
Timestamp: testInstant.Add(20 * time.Minute),
Value: 13,
},
},
},
// Values starting after last range.
{
op: testOp,
in: model.Values{
{
Timestamp: testInstant.Add(21 * time.Minute),
Value: 14,
},
},
out: model.Values{},
},
}
for i, scenario := range scenarios {
actual := model.Values{}
for !scenario.op.Consumed() {
actual = append(actual, scenario.op.ExtractSamples(scenario.in)...)
}
if len(actual) != len(scenario.out) {
t.Fatalf("%d. expected length %d, got %d: %v", i, len(scenario.out), len(actual), actual)
}
for j, out := range scenario.out {
if !out.Equal(actual[j]) {

View file

@ -471,7 +471,7 @@ func (t *TieredStorage) renderView(viewJob viewJob) {
currentChunk = currentChunk.TruncateBefore(*(op.CurrentTime()))
for op.CurrentTime() != nil && !op.CurrentTime().After(targetTime) {
for !op.Consumed() && !op.CurrentTime().After(targetTime) {
out = op.ExtractSamples(Values(currentChunk))
// Append the extracted samples to the materialized view.
@ -482,7 +482,7 @@ func (t *TieredStorage) renderView(viewJob viewJob) {
// Throw away standing ops which are finished.
filteredOps := ops{}
for _, op := range standingOps {
if op.CurrentTime() != nil {
if !op.Consumed() {
filteredOps = append(filteredOps, op)
}
}

View file

@ -42,15 +42,15 @@ type viewRequestBuilder struct {
}
// Furnishes a ViewRequestBuilder for recording what types of queries to perform.
func NewViewRequestBuilder() viewRequestBuilder {
return viewRequestBuilder{
func NewViewRequestBuilder() *viewRequestBuilder {
return &viewRequestBuilder{
operations: make(map[clientmodel.Fingerprint]ops),
}
}
// Gets for the given Fingerprint either the value at that time if there is a
// match or the one or two values adjacent thereto.
func (v viewRequestBuilder) GetMetricAtTime(fingerprint *clientmodel.Fingerprint, time time.Time) {
func (v *viewRequestBuilder) GetMetricAtTime(fingerprint *clientmodel.Fingerprint, time time.Time) {
ops := v.operations[*fingerprint]
ops = append(ops, &getValuesAtTimeOp{
time: time,
@ -61,7 +61,7 @@ func (v viewRequestBuilder) GetMetricAtTime(fingerprint *clientmodel.Fingerprint
// Gets for the given Fingerprint either the value at that interval from From
// through Through if there is a match or the one or two values adjacent
// for each point.
func (v viewRequestBuilder) GetMetricAtInterval(fingerprint *clientmodel.Fingerprint, from, through time.Time, interval time.Duration) {
func (v *viewRequestBuilder) GetMetricAtInterval(fingerprint *clientmodel.Fingerprint, from, through time.Time, interval time.Duration) {
ops := v.operations[*fingerprint]
ops = append(ops, &getValuesAtIntervalOp{
from: from,
@ -71,9 +71,9 @@ func (v viewRequestBuilder) GetMetricAtInterval(fingerprint *clientmodel.Fingerp
v.operations[*fingerprint] = ops
}
// Gets for the given Fingerprint either the values that occur inclusively from
// From through Through.
func (v viewRequestBuilder) GetMetricRange(fingerprint *clientmodel.Fingerprint, from, through time.Time) {
// Gets for the given Fingerprint the values that occur inclusively from From
// through Through.
func (v *viewRequestBuilder) GetMetricRange(fingerprint *clientmodel.Fingerprint, from, through time.Time) {
ops := v.operations[*fingerprint]
ops = append(ops, &getValuesAlongRangeOp{
from: from,
@ -82,16 +82,37 @@ func (v viewRequestBuilder) GetMetricRange(fingerprint *clientmodel.Fingerprint,
v.operations[*fingerprint] = ops
}
// GetMetricRangeAtInterval registers, for the given Fingerprint, an operation
// that gets value ranges of length rangeDuration, repeated every interval:
//
//   |----|       |----|       |----|       |----|
//   ^    ^            ^       ^    ^            ^
//   |    \------------/       \----/            |
//  from     interval       rangeDuration     through
//
// The first range starts at from and ends at from+rangeDuration; through is
// the overall end time of the operation.
func (v *viewRequestBuilder) GetMetricRangeAtInterval(fingerprint *clientmodel.Fingerprint, from, through time.Time, interval, rangeDuration time.Duration) {
	rangeOp := &getValueRangeAtIntervalOp{
		rangeFrom:     from,
		rangeThrough:  from.Add(rangeDuration),
		rangeDuration: rangeDuration,
		interval:      interval,
		through:       through,
	}
	v.operations[*fingerprint] = append(v.operations[*fingerprint], rangeOp)
}
// Emits the optimized scans that will occur in the data store. This
// effectively resets the ViewRequestBuilder back to a pristine state.
func (v viewRequestBuilder) ScanJobs() (j scanJobs) {
func (v *viewRequestBuilder) ScanJobs() (j scanJobs) {
for fingerprint, operations := range v.operations {
sort.Sort(startsAtSort{operations})
fpCopy := fingerprint
j = append(j, scanJob{
fingerprint: &fpCopy,
operations: optimize(operations),
// TODO: Evaluate whether we need to implement an optimize() working also
// for getValueRangeAtIntervalOp and use it here instead of just passing
// through the list of ops as-is.
operations: operations,
})
delete(v.operations, fingerprint)