Remove the multi-op-per-fingerprint capability.

Currently, rendering a view is capable of handling multiple ops for
the same fingerprint efficiently. However, this capability requires a
lot of complexity in the code, which we are not using at all because
the way we assemble a viewRequest will never have more than one
operation per fingerprint.

This commit weeds out the said capability, along with all the code
needed for it. It is still possible to have more than one operation
for the same fingerprint, it will just be handled in a less efficient
way (as proven by the unit tests).

As a result, scanjob.go could be removed entirely.

This commit also contains a few related refactorings and removals of
dead code in operation.go, view,go, and freelist.go. Also, the
docstrings received some love.

Change-Id: I032b976e0880151c3f3fdb3234fb65e484f0e2e5
This commit is contained in:
Bjoern Rabenstein 2014-02-27 20:09:00 +01:00
parent 817d9b0e97
commit 9ea9189dd1
8 changed files with 389 additions and 2072 deletions

View file

@ -14,8 +14,10 @@
package metric
import (
"time"
"github.com/prometheus/prometheus/utility"
clientmodel "github.com/prometheus/client_golang/model"
dto "github.com/prometheus/prometheus/model/generated"
)
@ -81,12 +83,17 @@ type valueAtTimeList struct {
l utility.FreeList
}
func (l *valueAtTimeList) Get() (*getValuesAtTimeOp, bool) {
if v, ok := l.l.Get(); ok {
return v.(*getValuesAtTimeOp), ok
func (l *valueAtTimeList) Get(fp *clientmodel.Fingerprint, time clientmodel.Timestamp) *getValuesAtTimeOp {
var op *getValuesAtTimeOp
v, ok := l.l.Get()
if ok {
op = v.(*getValuesAtTimeOp)
} else {
op = &getValuesAtTimeOp{}
}
return &getValuesAtTimeOp{}, false
op.fp = *fp
op.current = time
return op
}
var pGetValuesAtTimeOp = &getValuesAtTimeOp{}
@ -107,12 +114,19 @@ type valueAtIntervalList struct {
l utility.FreeList
}
func (l *valueAtIntervalList) Get() (*getValuesAtIntervalOp, bool) {
if v, ok := l.l.Get(); ok {
return v.(*getValuesAtIntervalOp), ok
func (l *valueAtIntervalList) Get(fp *clientmodel.Fingerprint, from, through clientmodel.Timestamp, interval time.Duration) *getValuesAtIntervalOp {
var op *getValuesAtIntervalOp
v, ok := l.l.Get()
if ok {
op = v.(*getValuesAtIntervalOp)
} else {
op = &getValuesAtIntervalOp{}
}
return &getValuesAtIntervalOp{}, false
op.fp = *fp
op.current = from
op.through = through
op.interval = interval
return op
}
var pGetValuesAtIntervalOp = &getValuesAtIntervalOp{}
@ -133,12 +147,18 @@ type valueAlongRangeList struct {
l utility.FreeList
}
func (l *valueAlongRangeList) Get() (*getValuesAlongRangeOp, bool) {
if v, ok := l.l.Get(); ok {
return v.(*getValuesAlongRangeOp), ok
func (l *valueAlongRangeList) Get(fp *clientmodel.Fingerprint, from, through clientmodel.Timestamp) *getValuesAlongRangeOp {
var op *getValuesAlongRangeOp
v, ok := l.l.Get()
if ok {
op = v.(*getValuesAlongRangeOp)
} else {
op = &getValuesAlongRangeOp{}
}
return &getValuesAlongRangeOp{}, false
op.fp = *fp
op.current = from
op.through = through
return op
}
var pGetValuesAlongRangeOp = &getValuesAlongRangeOp{}
@ -159,12 +179,21 @@ type valueAtIntervalAlongRangeList struct {
l utility.FreeList
}
func (l *valueAtIntervalAlongRangeList) Get() (*getValueRangeAtIntervalOp, bool) {
if v, ok := l.l.Get(); ok {
return v.(*getValueRangeAtIntervalOp), ok
func (l *valueAtIntervalAlongRangeList) Get(fp *clientmodel.Fingerprint, from, through clientmodel.Timestamp, interval, rangeDuration time.Duration) *getValueRangeAtIntervalOp {
var op *getValueRangeAtIntervalOp
v, ok := l.l.Get()
if ok {
op = v.(*getValueRangeAtIntervalOp)
} else {
op = &getValueRangeAtIntervalOp{}
}
return &getValueRangeAtIntervalOp{}, false
op.fp = *fp
op.current = from
op.rangeThrough = from.Add(rangeDuration)
op.rangeDuration = rangeDuration
op.interval = interval
op.through = through
return op
}
var pGetValueRangeAtIntervalOp = &getValueRangeAtIntervalOp{}

View file

@ -15,7 +15,6 @@ package metric
import (
"fmt"
"math"
"sort"
"time"
@ -24,117 +23,116 @@ import (
// op encapsulates a primitive query operation.
type op interface {
// The time at which this operation starts.
StartsAt() clientmodel.Timestamp
// Extract samples from stream of values and advance operation time.
// Fingerprint returns the fingerprint of the metric this operation
// operates on.
Fingerprint() *clientmodel.Fingerprint
// ExtractSamples extracts samples from a stream of values and advances
// the operation time.
ExtractSamples(Values) Values
// Return whether the operator has consumed all data it needs.
// Consumed returns whether the operator has consumed all data it needs.
Consumed() bool
// Get current operation time or nil if no subsequent work associated
// with this operator remains.
// CurrentTime gets the current operation time. In a newly created op,
// this is the starting time of the operation. During ongoing execution
// of the op, the current time is advanced accordingly. Once no
// subsequent work associated with the operation remains, nil is
// returned.
CurrentTime() clientmodel.Timestamp
// GreedierThan indicates whether this present operation should take
// precedence over the other operation due to greediness.
//
// A critical assumption is that this operator and the other occur at
// the same time: this.StartsAt().Equal(op.StartsAt()).
GreedierThan(op) bool
}
// Provides a sortable collection of operations.
// durationOperator encapsulates a general operation that occurs over a
// duration.
type durationOperator interface {
op
Through() clientmodel.Timestamp
}
// ops is a heap of operations, primary sorting key is the fingerprint.
type ops []op
// Len implements sort.Interface and heap.Interface.
func (o ops) Len() int {
return len(o)
}
// startsAtSort implements sort.Interface and allows operator to be sorted in
// chronological order by when they start.
type startsAtSort struct {
// Less implements sort.Interface and heap.Interface. It compares the
// fingerprints. If they are equal, the comparison is delegated to
// currentTimeSort.
func (o ops) Less(i, j int) bool {
fpi := o[i].Fingerprint()
fpj := o[j].Fingerprint()
if fpi.Equal(fpj) {
return currentTimeSort{o}.Less(i, j)
}
return fpi.Less(fpj)
}
// Swap implements sort.Interface and heap.Interface.
func (o ops) Swap(i, j int) {
o[i], o[j] = o[j], o[i]
}
// Push implements heap.Interface.
func (o *ops) Push(x interface{}) {
// Push and Pop use pointer receivers because they modify the slice's
// length, not just its contents.
*o = append(*o, x.(op))
}
// Push implements heap.Interface.
func (o *ops) Pop() interface{} {
old := *o
n := len(old)
x := old[n-1]
*o = old[0 : n-1]
return x
}
// currentTimeSort is a wrapper for ops with customized sorting order.
type currentTimeSort struct {
ops
}
func (s startsAtSort) Less(i, j int) bool {
return s.ops[i].StartsAt().Before(s.ops[j].StartsAt())
// currentTimeSort implements sort.Interface and sorts the operations in
// chronological order by their current time.
func (s currentTimeSort) Less(i, j int) bool {
return s.ops[i].CurrentTime().Before(s.ops[j].CurrentTime())
}
func (o ops) Swap(i, j int) {
o[i], o[j] = o[j], o[i]
// baseOp contains the implementations and fields shared between different op
// types.
type baseOp struct {
fp clientmodel.Fingerprint
current clientmodel.Timestamp
}
func (g *baseOp) Fingerprint() *clientmodel.Fingerprint {
return &g.fp
}
func (g *baseOp) CurrentTime() clientmodel.Timestamp {
return g.current
}
// getValuesAtTimeOp encapsulates getting values at or adjacent to a specific
// time.
type getValuesAtTimeOp struct {
time clientmodel.Timestamp
baseOp
consumed bool
}
func (g *getValuesAtTimeOp) String() string {
return fmt.Sprintf("getValuesAtTimeOp at %s", g.time)
}
func (g *getValuesAtTimeOp) StartsAt() clientmodel.Timestamp {
return g.time
return fmt.Sprintf("getValuesAtTimeOp at %s", g.current)
}
func (g *getValuesAtTimeOp) ExtractSamples(in Values) (out Values) {
if len(in) == 0 {
return
}
out = extractValuesAroundTime(g.time, in)
out = extractValuesAroundTime(g.current, in)
g.consumed = true
return
}
func (g *getValuesAtTimeOp) GreedierThan(op op) (superior bool) {
switch op.(type) {
case *getValuesAtTimeOp:
superior = true
case durationOperator:
superior = false
default:
panic("unknown operation")
}
return
}
// extractValuesAroundTime searches for the provided time in the list of
// available samples and emits a slice containing the data points that
// are adjacent to it.
//
// An assumption of this is that the provided samples are already sorted!
func extractValuesAroundTime(t clientmodel.Timestamp, in Values) (out Values) {
i := sort.Search(len(in), func(i int) bool {
return !in[i].Timestamp.Before(t)
})
if i == len(in) {
// Target time is past the end, return only the last sample.
out = in[len(in)-1:]
} else {
if in[i].Timestamp.Equal(t) && len(in) > i+1 {
// We hit exactly the current sample time. Very unlikely
// in practice. Return only the current sample.
out = in[i : i+1]
} else {
if i == 0 {
// We hit before the first sample time. Return
// only the first sample.
out = in[0:1]
} else {
// We hit between two samples. Return both
// surrounding samples.
out = in[i-1 : i+1]
}
}
}
return
}
func (g getValuesAtTimeOp) CurrentTime() clientmodel.Timestamp {
return g.time
}
func (g getValuesAtTimeOp) Consumed() bool {
return g.consumed
}
@ -142,17 +140,13 @@ func (g getValuesAtTimeOp) Consumed() bool {
// getValuesAtIntervalOp encapsulates getting values at a given interval over a
// duration.
type getValuesAtIntervalOp struct {
from clientmodel.Timestamp
baseOp
through clientmodel.Timestamp
interval time.Duration
}
func (g *getValuesAtIntervalOp) String() string {
return fmt.Sprintf("getValuesAtIntervalOp from %s each %s through %s", g.from, g.interval, g.through)
}
func (g *getValuesAtIntervalOp) StartsAt() clientmodel.Timestamp {
return g.from
return fmt.Sprintf("getValuesAtIntervalOp from %s each %s through %s", g.current, g.interval, g.through)
}
func (g *getValuesAtIntervalOp) Through() clientmodel.Timestamp {
@ -165,56 +159,36 @@ func (g *getValuesAtIntervalOp) ExtractSamples(in Values) (out Values) {
}
lastChunkTime := in[len(in)-1].Timestamp
for len(in) > 0 {
out = append(out, extractValuesAroundTime(g.from, in)...)
out = append(out, extractValuesAroundTime(g.current, in)...)
lastExtractedTime := out[len(out)-1].Timestamp
in = in.TruncateBefore(lastExtractedTime.Add(clientmodel.MinimumTick))
g.from = g.from.Add(g.interval)
for !g.from.After(lastExtractedTime) {
g.from = g.from.Add(g.interval)
in = in.TruncateBefore(lastExtractedTime.Add(
clientmodel.MinimumTick))
g.current = g.current.Add(g.interval)
for !g.current.After(lastExtractedTime) {
g.current = g.current.Add(g.interval)
}
if lastExtractedTime.Equal(lastChunkTime) {
break
}
if g.from.After(g.through) {
if g.current.After(g.through) {
break
}
}
return
}
func (g *getValuesAtIntervalOp) CurrentTime() clientmodel.Timestamp {
return g.from
}
func (g *getValuesAtIntervalOp) Consumed() bool {
return g.from.After(g.through)
}
func (g *getValuesAtIntervalOp) GreedierThan(op op) (superior bool) {
switch o := op.(type) {
case *getValuesAtTimeOp:
superior = true
case durationOperator:
superior = g.Through().After(o.Through())
default:
panic("unknown operation")
}
return
return g.current.After(g.through)
}
// getValuesAlongRangeOp encapsulates getting all values in a given range.
type getValuesAlongRangeOp struct {
from clientmodel.Timestamp
baseOp
through clientmodel.Timestamp
}
func (g *getValuesAlongRangeOp) String() string {
return fmt.Sprintf("getValuesAlongRangeOp from %s through %s", g.from, g.through)
}
func (g *getValuesAlongRangeOp) StartsAt() clientmodel.Timestamp {
return g.from
return fmt.Sprintf("getValuesAlongRangeOp from %s through %s", g.current, g.through)
}
func (g *getValuesAlongRangeOp) Through() clientmodel.Timestamp {
@ -225,15 +199,15 @@ func (g *getValuesAlongRangeOp) ExtractSamples(in Values) (out Values) {
if len(in) == 0 {
return
}
// Find the first sample where time >= g.from.
// Find the first sample where time >= g.current.
firstIdx := sort.Search(len(in), func(i int) bool {
return !in[i].Timestamp.Before(g.from)
return !in[i].Timestamp.Before(g.current)
})
if firstIdx == len(in) {
// No samples at or after operator start time. This can only happen if we
// try applying the operator to a time after the last recorded sample. In
// this case, we're finished.
g.from = g.through.Add(clientmodel.MinimumTick)
// No samples at or after operator start time. This can only
// happen if we try applying the operator to a time after the
// last recorded sample. In this case, we're finished.
g.current = g.through.Add(clientmodel.MinimumTick)
return
}
@ -242,37 +216,20 @@ func (g *getValuesAlongRangeOp) ExtractSamples(in Values) (out Values) {
return in[i].Timestamp.After(g.through)
})
if lastIdx == firstIdx {
g.from = g.through.Add(clientmodel.MinimumTick)
g.current = g.through.Add(clientmodel.MinimumTick)
return
}
lastSampleTime := in[lastIdx-1].Timestamp
// Sample times are stored with a maximum time resolution of one second, so
// we have to add exactly that to target the next chunk on the next op
// iteration.
g.from = lastSampleTime.Add(time.Second)
// Sample times are stored with a maximum time resolution of one second,
// so we have to add exactly that to target the next chunk on the next
// op iteration.
g.current = lastSampleTime.Add(time.Second)
return in[firstIdx:lastIdx]
}
func (g *getValuesAlongRangeOp) CurrentTime() clientmodel.Timestamp {
return g.from
}
func (g *getValuesAlongRangeOp) Consumed() bool {
return g.from.After(g.through)
}
func (g *getValuesAlongRangeOp) GreedierThan(op op) (superior bool) {
switch o := op.(type) {
case *getValuesAtTimeOp:
superior = true
case durationOperator:
superior = g.Through().After(o.Through())
default:
panic("unknown operation")
}
return
return g.current.After(g.through)
}
// getValueRangeAtIntervalOp encapsulates getting all values from ranges along
@ -282,7 +239,7 @@ func (g *getValuesAlongRangeOp) GreedierThan(op op) (superior bool) {
// incremented by interval and from is reset to through-rangeDuration. Returns
// current time nil when from > totalThrough.
type getValueRangeAtIntervalOp struct {
rangeFrom clientmodel.Timestamp
baseOp
rangeThrough clientmodel.Timestamp
rangeDuration time.Duration
interval time.Duration
@ -290,11 +247,7 @@ type getValueRangeAtIntervalOp struct {
}
func (g *getValueRangeAtIntervalOp) String() string {
return fmt.Sprintf("getValueRangeAtIntervalOp range %s from %s each %s through %s", g.rangeDuration, g.rangeFrom, g.interval, g.through)
}
func (g *getValueRangeAtIntervalOp) StartsAt() clientmodel.Timestamp {
return g.rangeFrom
return fmt.Sprintf("getValueRangeAtIntervalOp range %s from %s each %s through %s", g.rangeDuration, g.current, g.interval, g.through)
}
func (g *getValueRangeAtIntervalOp) Through() clientmodel.Timestamp {
@ -303,22 +256,22 @@ func (g *getValueRangeAtIntervalOp) Through() clientmodel.Timestamp {
func (g *getValueRangeAtIntervalOp) advanceToNextInterval() {
g.rangeThrough = g.rangeThrough.Add(g.interval)
g.rangeFrom = g.rangeThrough.Add(-g.rangeDuration)
g.current = g.rangeThrough.Add(-g.rangeDuration)
}
func (g *getValueRangeAtIntervalOp) ExtractSamples(in Values) (out Values) {
if len(in) == 0 {
return
}
// Find the first sample where time >= g.from.
// Find the first sample where time >= g.current.
firstIdx := sort.Search(len(in), func(i int) bool {
return !in[i].Timestamp.Before(g.rangeFrom)
return !in[i].Timestamp.Before(g.current)
})
if firstIdx == len(in) {
// No samples at or after operator start time. This can only happen if we
// try applying the operator to a time after the last recorded sample. In
// this case, we're finished.
g.rangeFrom = g.through.Add(clientmodel.MinimumTick)
// No samples at or after operator start time. This can only
// happen if we try applying the operator to a time after the
// last recorded sample. In this case, we're finished.
g.current = g.through.Add(clientmodel.MinimumTick)
return
}
@ -327,76 +280,30 @@ func (g *getValueRangeAtIntervalOp) ExtractSamples(in Values) (out Values) {
return in[i].Timestamp.After(g.rangeThrough)
})
// This only happens when there is only one sample and it is both after
// g.rangeFrom and after g.rangeThrough. In this case, both indexes are 0.
// g.current and after g.rangeThrough. In this case, both indexes are 0.
if lastIdx == firstIdx {
g.advanceToNextInterval()
return
}
lastSampleTime := in[lastIdx-1].Timestamp
// Sample times are stored with a maximum time resolution of one second, so
// we have to add exactly that to target the next chunk on the next op
// iteration.
g.rangeFrom = lastSampleTime.Add(time.Second)
if g.rangeFrom.After(g.rangeThrough) {
// Sample times are stored with a maximum time resolution of one second,
// so we have to add exactly that to target the next chunk on the next
// op iteration.
g.current = lastSampleTime.Add(time.Second)
if g.current.After(g.rangeThrough) {
g.advanceToNextInterval()
}
return in[firstIdx:lastIdx]
}
func (g *getValueRangeAtIntervalOp) CurrentTime() clientmodel.Timestamp {
return g.rangeFrom
}
func (g *getValueRangeAtIntervalOp) Consumed() bool {
return g.rangeFrom.After(g.through)
return g.current.After(g.through)
}
func (g *getValueRangeAtIntervalOp) GreedierThan(op op) bool {
panic("not implemented")
}
// Provides a collection of getMetricRangeOperation.
type getMetricRangeOperations []*getValuesAlongRangeOp
func (s getMetricRangeOperations) Len() int {
return len(s)
}
func (s getMetricRangeOperations) Swap(i, j int) {
s[i], s[j] = s[j], s[i]
}
// Sorts getMetricRangeOperation according to duration in descending order.
type rangeDurationSorter struct {
getMetricRangeOperations
}
func (s rangeDurationSorter) Less(i, j int) bool {
l := s.getMetricRangeOperations[i]
r := s.getMetricRangeOperations[j]
return !l.through.Before(r.through)
}
// Encapsulates a general operation that occurs over a duration.
type durationOperator interface {
op
Through() clientmodel.Timestamp
}
// greedinessSort sorts the operations in descending order by level of
// greediness.
type greedinessSort struct {
ops
}
func (g greedinessSort) Less(i, j int) bool {
return g.ops[i].GreedierThan(g.ops[j])
}
// Contains getValuesAtIntervalOp operations.
// getValuesAtIntervalOps contains getValuesAtIntervalOp operations. It
// implements sort.Interface and sorts the operations in ascending order by
// their frequency.
type getValuesAtIntervalOps []*getValuesAtIntervalOp
func (s getValuesAtIntervalOps) Len() int {
@ -407,382 +314,33 @@ func (s getValuesAtIntervalOps) Swap(i, j int) {
s[i], s[j] = s[j], s[i]
}
// Sorts durationOperator by the operation's duration in descending order.
type intervalDurationSorter struct {
getValuesAtIntervalOps
func (s getValuesAtIntervalOps) Less(i, j int) bool {
return s[i].interval < s[j].interval
}
func (s intervalDurationSorter) Less(i, j int) bool {
l := s.getValuesAtIntervalOps[i]
r := s.getValuesAtIntervalOps[j]
return !l.through.Before(r.through)
}
// Sorts getValuesAtIntervalOp operations in ascending order by their
// frequency.
type frequencySorter struct {
getValuesAtIntervalOps
}
func (s frequencySorter) Less(i, j int) bool {
l := s.getValuesAtIntervalOps[i]
r := s.getValuesAtIntervalOps[j]
return l.interval < r.interval
}
// Selects and returns all operations that are getValuesAtIntervalOp operations
// in a map whereby the operation interval is the key and the value are the
// operations sorted by respective level of greediness.
func collectIntervals(o ops) (intervals map[time.Duration]ops) {
intervals = make(map[time.Duration]ops)
for _, operation := range o {
switch t := operation.(type) {
case *getValuesAtIntervalOp:
operations, _ := intervals[t.interval]
operations = append(operations, t)
intervals[t.interval] = operations
}
}
return
}
// Selects and returns all operations that are getValuesAlongRangeOp operations.
func collectRanges(ops ops) (ranges ops) {
for _, operation := range ops {
switch t := operation.(type) {
case *getValuesAlongRangeOp:
ranges = append(ranges, t)
}
}
return
}
// optimizeForward iteratively scans operations and peeks ahead to subsequent
// ones to find candidates that can either be removed or truncated through
// simplification. For instance, if a range query happens to overlap a get-a-
// value-at-a-certain-point-request, the range query should flatten and subsume
// the other.
func optimizeForward(unoptimized ops) ops {
if len(unoptimized) <= 1 {
return unoptimized
}
head := unoptimized[0]
unoptimized = unoptimized[1:]
optimized := ops{}
switch headOp := head.(type) {
case *getValuesAtTimeOp:
optimized = ops{head}
case *getValuesAtIntervalOp:
optimized, unoptimized = optimizeForwardGetValuesAtInterval(headOp, unoptimized)
case *getValuesAlongRangeOp:
optimized, unoptimized = optimizeForwardGetValuesAlongRange(headOp, unoptimized)
default:
panic("unknown operation type")
}
tail := optimizeForward(unoptimized)
return append(optimized, tail...)
}
func optimizeForwardGetValuesAtInterval(headOp *getValuesAtIntervalOp, unoptimized ops) (ops, ops) {
// If the last value was a scan at a given frequency along an interval,
// several optimizations may exist.
for _, peekOperation := range unoptimized {
if peekOperation.StartsAt().After(headOp.Through()) {
break
}
// If the type is not a range request, we can't do anything.
switch next := peekOperation.(type) {
case *getValuesAlongRangeOp:
if !next.GreedierThan(headOp) {
after := getValuesAtIntervalOp(*headOp)
headOp.through = next.from
// Truncate the get value at interval request if a range request cuts
// it off somewhere.
from := next.from
for !from.After(next.through) {
from = from.Add(headOp.interval)
}
after.from = from
unoptimized = append(ops{&after}, unoptimized...)
sort.Sort(startsAtSort{unoptimized})
return ops{headOp}, unoptimized
}
}
}
return ops{headOp}, unoptimized
}
func optimizeForwardGetValuesAlongRange(headOp *getValuesAlongRangeOp, unoptimized ops) (ops, ops) {
optimized := ops{}
for _, peekOperation := range unoptimized {
if peekOperation.StartsAt().After(headOp.Through()) {
optimized = ops{headOp}
break
}
switch next := peekOperation.(type) {
// All values at a specific time may be elided into the range query.
case *getValuesAtTimeOp:
optimized = ops{headOp}
unoptimized = unoptimized[1:]
case *getValuesAlongRangeOp:
// Range queries should be concatenated if they overlap.
if next.GreedierThan(headOp) {
next.from = headOp.from
return optimized, unoptimized
}
optimized = ops{headOp}
unoptimized = unoptimized[1:]
case *getValuesAtIntervalOp:
optimized = ops{headOp}
unoptimized = unoptimized[1:]
if next.GreedierThan(headOp) {
nextStart := next.from
for !nextStart.After(headOp.through) {
nextStart = nextStart.Add(next.interval)
}
next.from = nextStart
unoptimized = append(ops{next}, unoptimized...)
}
default:
panic("unknown operation type")
}
}
return optimized, unoptimized
}
// selectQueriesForTime chooses all subsequent operations from the slice that
// have the same start time as the provided time and emits them.
func selectQueriesForTime(time clientmodel.Timestamp, queries ops) (out ops) {
if len(queries) == 0 {
return
}
if !queries[0].StartsAt().Equal(time) {
return
}
out = append(out, queries[0])
tail := selectQueriesForTime(time, queries[1:])
return append(out, tail...)
}
// selectGreediestRange scans through the various getValuesAlongRangeOp
// operations and emits the one that is the greediest.
func selectGreediestRange(in ops) (o durationOperator) {
if len(in) == 0 {
return
}
sort.Sort(greedinessSort{in})
o = in[0].(*getValuesAlongRangeOp)
return
}
// selectGreediestIntervals scans through the various getValuesAtIntervalOp
// operations and emits a map of the greediest operation keyed by its start
// time.
func selectGreediestIntervals(in map[time.Duration]ops) (out map[time.Duration]durationOperator) {
if len(in) == 0 {
return
}
out = make(map[time.Duration]durationOperator)
for i, ops := range in {
sort.Sort(greedinessSort{ops})
out[i] = ops[0].(*getValuesAtIntervalOp)
}
return
}
// rewriteForGreediestRange rewrites the current pending operation such that the
// greediest range operation takes precedence over all other operators in this
// time group.
// extractValuesAroundTime searches for the provided time in the list of
// available samples and emits a slice containing the data points that
// are adjacent to it.
//
// Between two range operations O1 and O2, they both start at the same time;
// however, O2 extends for a longer duration than O1. Thusly, O1 should be
// deleted with O2.
//
// O1------>|
// T1 T4
//
// O2------------>|
// T1 T7
//
// Thusly O1 can be squashed into O2 without having side-effects.
func rewriteForGreediestRange(greediestRange durationOperator) ops {
return ops{greediestRange}
}
// rewriteForGreediestInterval rewrites teh current pending interval operations
// such that the interval operation with the smallest collection period is
// invoked first, for it will skip around the soonest of any of the remaining
// other operators.
//
// Between two interval operations O1 and O2, they both start at the same time;
// however, O2's period is shorter than O1, meaning it will sample far more
// frequently from the underlying time series. Thusly, O2 should start before
// O1.
//
// O1---->|---->|
// T1 T5
//
// O2->|->|->|->|
// T1 T5
//
// The rewriter presently does not scan and compact for common divisors in the
// periods, though this may be nice to have. For instance, if O1 has a period
// of 2 and O2 has a period of 4, O2 would be dropped for O1 would implicitly
// cover its period.
func rewriteForGreediestInterval(greediestIntervals map[time.Duration]durationOperator) ops {
var (
memo getValuesAtIntervalOps
out ops
)
for _, o := range greediestIntervals {
memo = append(memo, o.(*getValuesAtIntervalOp))
// An assumption of this is that the provided samples are already sorted!
func extractValuesAroundTime(t clientmodel.Timestamp, in Values) Values {
i := sort.Search(len(in), func(i int) bool {
return !in[i].Timestamp.Before(t)
})
if i == len(in) {
// Target time is past the end, return only the last sample.
return in[len(in)-1:]
}
sort.Sort(frequencySorter{memo})
for _, o := range memo {
out = append(out, o)
if in[i].Timestamp.Equal(t) && len(in) > i+1 {
// We hit exactly the current sample time. Very unlikely in
// practice. Return only the current sample.
return in[i : i+1]
}
return out
}
// rewriteForRangeAndInterval examines the existence of a range operation and a
// set of interval operations that start at the same time and deletes all
// interval operations that start and finish before the range operation
// completes and rewrites all interval operations that continue longer than
// the range operation to start at the next best increment after the range.
//
// Assume that we have a range operator O1 and two interval operations O2 and
// O3. O2 and O3 have the same period (i.e., sampling interval), but O2
// terminates before O1 and O3 continue beyond O1.
//
// O1------------>|
// T1------------T7
//
// O2-->|-->|-->|
// T1----------T6
//
// O3-->|-->|-->|-->|-->|
// T1------------------T10
//
// This scenario will be rewritten such that O2 is deleted and O3 is truncated
// from T1 through T7, and O3's new starting time is at T7 and runs through T10:
//
// O1------------>|
// T1------------T7
//
// O2>|-->|
// T7---T10
//
// All rewritten interval operators will respect their original start time
// multipliers.
func rewriteForRangeAndInterval(greediestRange durationOperator, greediestIntervals map[time.Duration]durationOperator) (out ops) {
out = append(out, greediestRange)
for _, op := range greediestIntervals {
if !op.GreedierThan(greediestRange) {
continue
}
// The range operation does not exceed interval. Leave a snippet of
// interval.
var (
truncated = op.(*getValuesAtIntervalOp)
newIntervalOperation getValuesAtIntervalOp
// Refactor
remainingSlice = greediestRange.Through().Sub(greediestRange.StartsAt()) / time.Second
nextIntervalPoint = time.Duration(math.Ceil(float64(remainingSlice)/float64(truncated.interval)) * float64(truncated.interval/time.Second))
nextStart = greediestRange.Through().Add(nextIntervalPoint)
)
newIntervalOperation.from = nextStart
newIntervalOperation.interval = truncated.interval
newIntervalOperation.through = truncated.Through()
// Added back to the pending because additional curation could be
// necessary.
out = append(out, &newIntervalOperation)
if i == 0 {
// We hit before the first sample time. Return only the first
// sample.
return in[0:1]
}
return
}
// Flattens queries that occur at the same time according to duration and level
// of greed. Consult the various rewriter functions for their respective modes
// of operation.
func optimizeTimeGroup(group ops) (out ops) {
var (
greediestRange = selectGreediestRange(collectRanges(group))
greediestIntervals = selectGreediestIntervals(collectIntervals(group))
containsRange = greediestRange != nil
containsInterval = len(greediestIntervals) > 0
)
switch {
case containsRange && !containsInterval:
out = rewriteForGreediestRange(greediestRange)
case !containsRange && containsInterval:
out = rewriteForGreediestInterval(greediestIntervals)
case containsRange && containsInterval:
out = rewriteForRangeAndInterval(greediestRange, greediestIntervals)
default:
// Operation is OK as-is.
out = append(out, group[0])
}
return
}
// Flattens all groups of time according to greed.
func optimizeTimeGroups(pending ops) (out ops) {
if len(pending) == 0 {
return
}
sort.Sort(startsAtSort{pending})
var (
nextOperation = pending[0]
groupedQueries = selectQueriesForTime(nextOperation.StartsAt(), pending)
)
out = optimizeTimeGroup(groupedQueries)
pending = pending[len(groupedQueries):]
tail := optimizeTimeGroups(pending)
return append(out, tail...)
}
func optimize(pending ops) (out ops) {
return optimizeForward(optimizeTimeGroups(pending))
// We hit between two samples. Return both surrounding samples.
return in[i-1 : i+1]
}

File diff suppressed because it is too large Load diff

View file

@ -1,55 +0,0 @@
// Copyright 2013 Prometheus Team
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package metric
import (
"bytes"
"fmt"
clientmodel "github.com/prometheus/client_golang/model"
)
// scanJob models a range of queries.
type scanJob struct {
fingerprint *clientmodel.Fingerprint
operations ops
}
func (s scanJob) String() string {
buffer := &bytes.Buffer{}
fmt.Fprintf(buffer, "Scan Job { fingerprint=%s ", s.fingerprint)
fmt.Fprintf(buffer, " with %d operations [", len(s.operations))
for _, operation := range s.operations {
fmt.Fprintf(buffer, "%s", operation)
}
fmt.Fprintf(buffer, "] }")
return buffer.String()
}
type scanJobs []scanJob
func (s scanJobs) Len() int {
return len(s)
}
func (s scanJobs) Less(i, j int) (less bool) {
less = s[i].fingerprint.Less(s[j].fingerprint)
return
}
func (s scanJobs) Swap(i, j int) {
s[i], s[j] = s[j], s[i]
}

View file

@ -115,7 +115,13 @@ const (
const watermarkCacheLimit = 1024 * 1024
// NewTieredStorage returns a TieredStorage object ready to use.
func NewTieredStorage(appendToDiskQueueDepth, viewQueueDepth uint, flushMemoryInterval time.Duration, memoryTTL time.Duration, rootDirectory string) (*TieredStorage, error) {
func NewTieredStorage(
appendToDiskQueueDepth,
viewQueueDepth uint,
flushMemoryInterval time.Duration,
memoryTTL time.Duration,
rootDirectory string,
) (*TieredStorage, error) {
if isDir, _ := utility.IsDir(rootDirectory); !isDir {
if err := os.MkdirAll(rootDirectory, 0755); err != nil {
return nil, fmt.Errorf("could not find or create metrics directory %s: %s", rootDirectory, err)
@ -389,11 +395,15 @@ func (t *TieredStorage) renderView(viewJob viewJob) {
duration := time.Since(begin)
recordOutcome(duration, err, map[string]string{operation: renderView, result: success}, map[string]string{operation: renderView, result: failure})
recordOutcome(
duration,
err,
map[string]string{operation: renderView, result: success},
map[string]string{operation: renderView, result: failure},
)
}()
scanJobsTimer := viewJob.stats.GetTimer(stats.ViewScanJobsTime).Start()
scans := viewJob.builder.ScanJobs()
scanJobsTimer.Stop()
view := newView()
@ -410,135 +420,104 @@ func (t *TieredStorage) renderView(viewJob viewJob) {
defer t.dtoSampleKeys.Give(sampleKeyDto)
extractionTimer := viewJob.stats.GetTimer(stats.ViewDataExtractionTime).Start()
for _, scanJob := range scans {
old, err := t.seriesTooOld(scanJob.fingerprint, scanJob.operations[0].CurrentTime())
for viewJob.builder.HasOp() {
op := viewJob.builder.PopOp()
fp := op.Fingerprint()
old, err := t.seriesTooOld(fp, op.CurrentTime())
if err != nil {
glog.Errorf("Error getting watermark from cache for %s: %s", scanJob.fingerprint, err)
glog.Errorf("Error getting watermark from cache for %s: %s", fp, err)
continue
}
if old {
continue
}
standingOps := scanJob.operations
memValues := t.memoryArena.CloneSamples(scanJob.fingerprint)
memValues := t.memoryArena.CloneSamples(fp)
for len(standingOps) > 0 {
// Abort the view rendering if the caller (MakeView) has timed out.
if len(viewJob.abort) > 0 {
return
}
// Abort the view rendering if the caller (MakeView) has timed out.
if len(viewJob.abort) > 0 {
return
}
// Load data value chunk(s) around the first standing op's current time.
targetTime := standingOps[0].CurrentTime()
// Load data value chunk(s) around the current time.
targetTime := op.CurrentTime()
currentChunk := chunk{}
// If we aimed before the oldest value in memory, load more data from disk.
if (len(memValues) == 0 || memValues.FirstTimeAfter(targetTime)) && diskPresent {
if iterator == nil {
// Get a single iterator that will be used for all data extraction
// below.
iterator, _ = t.DiskStorage.MetricSamples.NewIterator(true)
defer iterator.Close()
if diskPresent = iterator.SeekToLast(); diskPresent {
currentChunk := chunk{}
// If we aimed before the oldest value in memory, load more data from disk.
if (len(memValues) == 0 || memValues.FirstTimeAfter(targetTime)) && diskPresent {
if iterator == nil {
// Get a single iterator that will be used for all data extraction
// below.
iterator, _ = t.DiskStorage.MetricSamples.NewIterator(true)
defer iterator.Close()
if diskPresent = iterator.SeekToLast(); diskPresent {
if err := iterator.Key(sampleKeyDto); err != nil {
panic(err)
}
lastBlock.Load(sampleKeyDto)
if !iterator.SeekToFirst() {
diskPresent = false
} else {
if err := iterator.Key(sampleKeyDto); err != nil {
panic(err)
}
lastBlock.Load(sampleKeyDto)
if !iterator.SeekToFirst() {
diskPresent = false
} else {
if err := iterator.Key(sampleKeyDto); err != nil {
panic(err)
}
firstBlock.Load(sampleKeyDto)
}
firstBlock.Load(sampleKeyDto)
}
}
}
if diskPresent {
diskTimer := viewJob.stats.GetTimer(stats.ViewDiskExtractionTime).Start()
diskValues, expired := t.loadChunkAroundTime(iterator, scanJob.fingerprint, targetTime, firstBlock, lastBlock)
if expired {
diskPresent = false
}
diskTimer.Stop()
if diskPresent {
diskTimer := viewJob.stats.GetTimer(stats.ViewDiskExtractionTime).Start()
diskValues, expired := t.loadChunkAroundTime(
iterator,
fp,
targetTime,
firstBlock,
lastBlock,
)
if expired {
diskPresent = false
}
diskTimer.Stop()
// If we aimed past the newest value on disk, combine it with the next value from memory.
if len(diskValues) == 0 {
currentChunk = chunk(memValues)
} else {
if len(memValues) > 0 && diskValues.LastTimeBefore(targetTime) {
latestDiskValue := diskValues[len(diskValues)-1:]
currentChunk = append(chunk(latestDiskValue), chunk(memValues)...)
} else {
currentChunk = chunk(diskValues)
}
}
} else {
// If we aimed past the newest value on disk,
// combine it with the next value from memory.
if len(diskValues) == 0 {
currentChunk = chunk(memValues)
} else {
if len(memValues) > 0 && diskValues.LastTimeBefore(targetTime) {
latestDiskValue := diskValues[len(diskValues)-1:]
currentChunk = append(chunk(latestDiskValue), chunk(memValues)...)
} else {
currentChunk = chunk(diskValues)
}
}
} else {
currentChunk = chunk(memValues)
}
} else {
currentChunk = chunk(memValues)
}
// There's no data at all for this fingerprint, so stop processing ops for it.
if len(currentChunk) == 0 {
break
}
// There's no data at all for this fingerprint, so stop processing.
if len(currentChunk) == 0 {
continue
}
currentChunk = currentChunk.TruncateBefore(targetTime)
currentChunk = currentChunk.TruncateBefore(targetTime)
lastChunkTime := currentChunk[len(currentChunk)-1].Timestamp
if lastChunkTime.After(targetTime) {
targetTime = lastChunkTime
}
lastChunkTime := currentChunk[len(currentChunk)-1].Timestamp
if lastChunkTime.After(targetTime) {
targetTime = lastChunkTime
}
// For each op, extract all needed data from the current chunk.
out := Values{}
for _, op := range standingOps {
if op.CurrentTime().After(targetTime) {
break
}
currentChunk = currentChunk.TruncateBefore(op.CurrentTime())
for !op.Consumed() && !op.CurrentTime().After(targetTime) {
out = op.ExtractSamples(Values(currentChunk))
// Append the extracted samples to the materialized view.
view.appendSamples(scanJob.fingerprint, out)
}
}
// Throw away standing ops which are finished.
filteredOps := ops{}
for _, op := range standingOps {
if !op.Consumed() {
filteredOps = append(filteredOps, op)
continue
}
giveBackOp(op)
}
standingOps = filteredOps
// Sort ops by start time again, since they might be slightly off now.
// For example, consider a current chunk of values and two interval ops
// with different interval lengths. Their states after the cycle above
// could be:
//
// (C = current op time)
//
// Chunk: [ X X X X X ]
// Op 1: [ X X C . . . ]
// Op 2: [ X X C . . .]
//
// Op 2 now has an earlier current time than Op 1.
sort.Sort(startsAtSort{standingOps})
// Extract all needed data from the current chunk and append the
// extracted samples to the materialized view.
for !op.Consumed() && !op.CurrentTime().After(targetTime) {
view.appendSamples(fp, op.ExtractSamples(Values(currentChunk)))
}
}
extractionTimer.Stop()
@ -547,7 +526,13 @@ func (t *TieredStorage) renderView(viewJob viewJob) {
return
}
func (t *TieredStorage) loadChunkAroundTime(iterator leveldb.Iterator, fingerprint *clientmodel.Fingerprint, ts clientmodel.Timestamp, firstBlock, lastBlock *SampleKey) (chunk Values, expired bool) {
func (t *TieredStorage) loadChunkAroundTime(
iterator leveldb.Iterator,
fingerprint *clientmodel.Fingerprint,
ts clientmodel.Timestamp,
firstBlock,
lastBlock *SampleKey,
) (chunk Values, expired bool) {
if fingerprint.Less(firstBlock.Fingerprint) {
return nil, false
}

View file

@ -68,7 +68,7 @@ func testMakeView(t test.Tester, flushToDisk bool) {
in: in{
atTime: []getValuesAtTimeOp{
{
time: instant,
baseOp: baseOp{current: instant},
},
},
},
@ -88,7 +88,7 @@ func testMakeView(t test.Tester, flushToDisk bool) {
in: in{
atTime: []getValuesAtTimeOp{
{
time: instant,
baseOp: baseOp{current: instant},
},
},
},
@ -120,7 +120,7 @@ func testMakeView(t test.Tester, flushToDisk bool) {
in: in{
atTime: []getValuesAtTimeOp{
{
time: instant,
baseOp: baseOp{current: instant},
},
},
},
@ -147,7 +147,7 @@ func testMakeView(t test.Tester, flushToDisk bool) {
in: in{
atTime: []getValuesAtTimeOp{
{
time: instant.Add(time.Second),
baseOp: baseOp{current: instant.Add(time.Second)},
},
},
},
@ -179,7 +179,7 @@ func testMakeView(t test.Tester, flushToDisk bool) {
in: in{
atTime: []getValuesAtTimeOp{
{
time: instant,
baseOp: baseOp{current: instant},
},
},
},
@ -216,7 +216,7 @@ func testMakeView(t test.Tester, flushToDisk bool) {
in: in{
atTime: []getValuesAtTimeOp{
{
time: instant.Add(time.Second),
baseOp: baseOp{current: instant.Add(time.Second)},
},
},
},
@ -253,7 +253,7 @@ func testMakeView(t test.Tester, flushToDisk bool) {
in: in{
atTime: []getValuesAtTimeOp{
{
time: instant.Add(time.Second),
baseOp: baseOp{current: instant.Add(time.Second)},
},
},
},
@ -294,7 +294,7 @@ func testMakeView(t test.Tester, flushToDisk bool) {
in: in{
atTime: []getValuesAtTimeOp{
{
time: instant.Add(time.Second * 3),
baseOp: baseOp{current: instant.Add(time.Second * 3)},
},
},
},
@ -319,7 +319,7 @@ func testMakeView(t test.Tester, flushToDisk bool) {
in: in{
atTime: []getValuesAtTimeOp{
{
time: instant.Add(time.Second*time.Duration(*leveldbChunkSize*2) + clientmodel.MinimumTick),
baseOp: baseOp{current: instant.Add(time.Second*time.Duration(*leveldbChunkSize*2) + clientmodel.MinimumTick)},
},
},
},
@ -356,15 +356,15 @@ func testMakeView(t test.Tester, flushToDisk bool) {
requestBuilder := NewViewRequestBuilder()
for _, atTime := range scenario.in.atTime {
requestBuilder.GetMetricAtTime(fingerprint, atTime.time)
requestBuilder.GetMetricAtTime(fingerprint, atTime.current)
}
for _, atInterval := range scenario.in.atInterval {
requestBuilder.GetMetricAtInterval(fingerprint, atInterval.from, atInterval.through, atInterval.interval)
requestBuilder.GetMetricAtInterval(fingerprint, atInterval.current, atInterval.through, atInterval.interval)
}
for _, alongRange := range scenario.in.alongRange {
requestBuilder.GetMetricRange(fingerprint, alongRange.from, alongRange.through)
requestBuilder.GetMetricRange(fingerprint, alongRange.current, alongRange.through)
}
v, err := tiered.MakeView(requestBuilder, time.Second*5, stats.NewTimerGroup())
@ -374,7 +374,7 @@ func testMakeView(t test.Tester, flushToDisk bool) {
}
for j, atTime := range scenario.in.atTime {
actual := v.GetValueAtTime(fingerprint, atTime.time)
actual := v.GetValueAtTime(fingerprint, atTime.current)
if len(actual) != len(scenario.out.atTime[j]) {
t.Fatalf("%d.%d. expected %d output, got %d", i, j, len(scenario.out.atTime[j]), len(actual))

View file

@ -14,7 +14,7 @@
package metric
import (
"sort"
"container/heap"
"time"
clientmodel "github.com/prometheus/client_golang/model"
@ -28,110 +28,87 @@ var (
)
// ViewRequestBuilder represents the summation of all datastore queries that
// shall be performed to extract values. Each operation mutates the state of
// the builder.
// shall be performed to extract values. Call the Get... methods to record the
// queries. Once done, use HasOp and PopOp to retrieve the resulting
// operations. The operations are sorted by their fingerprint (and, for equal
// fingerprints, by the StartsAt timestamp of their operation).
type ViewRequestBuilder interface {
// GetMetricAtTime records a query to get, for the given Fingerprint,
// either the value at that time if there is a match or the one or two
// values adjacent thereto.
GetMetricAtTime(fingerprint *clientmodel.Fingerprint, time clientmodel.Timestamp)
// GetMetricAtInterval records a query to get, for the given
// Fingerprint, either the value at that interval from From through
// Through if there is a match or the one or two values adjacent for
// each point.
GetMetricAtInterval(fingerprint *clientmodel.Fingerprint, from, through clientmodel.Timestamp, interval time.Duration)
// GetMetricRange records a query to get, for the given Fingerprint, the
// values that occur inclusively from From through Through.
GetMetricRange(fingerprint *clientmodel.Fingerprint, from, through clientmodel.Timestamp)
ScanJobs() scanJobs
// GetMetricRangeAtInterval records a query to get value ranges at
// intervals for the given Fingerprint:
//
// |----| |----| |----| |----|
// ^ ^ ^ ^ ^ ^
// | \------------/ \----/ |
// from interval rangeDuration through
GetMetricRangeAtInterval(fp *clientmodel.Fingerprint, from, through clientmodel.Timestamp, interval, rangeDuration time.Duration)
// PopOp emits the next operation in the queue (sorted by
// fingerprint). If called while HasOps returns false, the
// behavior is undefined.
PopOp() op
// HasOp returns true if there is at least one more operation in the
// queue.
HasOp() bool
}
// viewRequestBuilder contains the various unoptimized requests for data.
// viewRequestBuilder contains the various requests for data.
type viewRequestBuilder struct {
operations map[clientmodel.Fingerprint]ops
operations ops
}
// NewViewRequestBuilder furnishes a ViewRequestBuilder for remarking what types
// of queries to perform.
func NewViewRequestBuilder() *viewRequestBuilder {
return &viewRequestBuilder{
operations: make(map[clientmodel.Fingerprint]ops),
}
return &viewRequestBuilder{}
}
var getValuesAtTimes = newValueAtTimeList(10 * 1024)
// GetMetricAtTime gets for the given Fingerprint either the value at that time
// if there is an match or the one or two values adjacent thereto.
func (v *viewRequestBuilder) GetMetricAtTime(fingerprint *clientmodel.Fingerprint, time clientmodel.Timestamp) {
ops := v.operations[*fingerprint]
op, _ := getValuesAtTimes.Get()
op.time = time
ops = append(ops, op)
v.operations[*fingerprint] = ops
// GetMetricAtTime implements ViewRequestBuilder.
func (v *viewRequestBuilder) GetMetricAtTime(fp *clientmodel.Fingerprint, time clientmodel.Timestamp) {
heap.Push(&v.operations, getValuesAtTimes.Get(fp, time))
}
var getValuesAtIntervals = newValueAtIntervalList(10 * 1024)
// GetMetricAtInterval gets for the given Fingerprint either the value at that
// interval from From through Through if there is an match or the one or two
// values adjacent for each point.
func (v *viewRequestBuilder) GetMetricAtInterval(fingerprint *clientmodel.Fingerprint, from, through clientmodel.Timestamp, interval time.Duration) {
ops := v.operations[*fingerprint]
op, _ := getValuesAtIntervals.Get()
op.from = from
op.through = through
op.interval = interval
ops = append(ops, op)
v.operations[*fingerprint] = ops
// GetMetricAtInterval implements ViewRequestBuilder.
func (v *viewRequestBuilder) GetMetricAtInterval(fp *clientmodel.Fingerprint, from, through clientmodel.Timestamp, interval time.Duration) {
heap.Push(&v.operations, getValuesAtIntervals.Get(fp, from, through, interval))
}
var getValuesAlongRanges = newValueAlongRangeList(10 * 1024)
// GetMetricRange gets for the given Fingerprint the values that occur
// inclusively from From through Through.
func (v *viewRequestBuilder) GetMetricRange(fingerprint *clientmodel.Fingerprint, from, through clientmodel.Timestamp) {
ops := v.operations[*fingerprint]
op, _ := getValuesAlongRanges.Get()
op.from = from
op.through = through
ops = append(ops, op)
v.operations[*fingerprint] = ops
// GetMetricRange implements ViewRequestBuilder.
func (v *viewRequestBuilder) GetMetricRange(fp *clientmodel.Fingerprint, from, through clientmodel.Timestamp) {
heap.Push(&v.operations, getValuesAlongRanges.Get(fp, from, through))
}
var getValuesAtIntervalAlongRanges = newValueAtIntervalAlongRangeList(10 * 1024)
// GetMetricRangeAtInterval gets value ranges at intervals for the given
// Fingerprint:
//
// |----| |----| |----| |----|
// ^ ^ ^ ^ ^ ^
// | \------------/ \----/ |
// from interval rangeDuration through
func (v *viewRequestBuilder) GetMetricRangeAtInterval(fingerprint *clientmodel.Fingerprint, from, through clientmodel.Timestamp, interval, rangeDuration time.Duration) {
ops := v.operations[*fingerprint]
op, _ := getValuesAtIntervalAlongRanges.Get()
op.rangeFrom = from
op.rangeThrough = from.Add(rangeDuration)
op.rangeDuration = rangeDuration
op.interval = interval
op.through = through
ops = append(ops, op)
v.operations[*fingerprint] = ops
// GetMetricRangeAtInterval implements ViewRequestBuilder.
func (v *viewRequestBuilder) GetMetricRangeAtInterval(fp *clientmodel.Fingerprint, from, through clientmodel.Timestamp, interval, rangeDuration time.Duration) {
heap.Push(&v.operations, getValuesAtIntervalAlongRanges.Get(fp, from, through, interval, rangeDuration))
}
// ScanJobs emits the optimized scans that will occur in the data store. This
// effectively resets the ViewRequestBuilder back to a pristine state.
func (v *viewRequestBuilder) ScanJobs() (j scanJobs) {
for fingerprint, operations := range v.operations {
sort.Sort(startsAtSort{operations})
// PopOp implements ViewRequestBuilder.
func (v *viewRequestBuilder) PopOp() op {
return heap.Pop(&v.operations).(op)
}
fpCopy := fingerprint
j = append(j, scanJob{
fingerprint: &fpCopy,
// BUG: Evaluate whether we need to implement an optimize() working also
// for getValueRangeAtIntervalOp and use it here instead of just passing
// through the list of ops as-is.
operations: operations,
})
delete(v.operations, fingerprint)
}
sort.Sort(j)
return
// HasOp implements ViewRequestBuilder.
func (v *viewRequestBuilder) HasOp() bool {
return v.operations.Len() > 0
}
type view struct {

View file

@ -105,6 +105,12 @@ func testBuilder(t test.Tester) {
{
fingerprint: "00000000000000000000-a-4-a",
},
{
fingerprint: "00000000000000000000-a-4-a",
},
{
fingerprint: "00000000000000001111-a-4-a",
},
{
fingerprint: "00000000000000001111-a-4-a",
},
@ -136,14 +142,18 @@ func testBuilder(t test.Tester) {
{
fingerprint: "00000000000000001111-a-4-a",
},
{
fingerprint: "00000000000000001111-a-4-a",
},
{
fingerprint: "00000000000000001111-a-4-a",
},
},
},
}
for i, scenario := range scenarios {
builder := viewRequestBuilder{
operations: map[clientmodel.Fingerprint]ops{},
}
builder := NewViewRequestBuilder()
for _, atTime := range scenario.in.atTimes {
fingerprint := &clientmodel.Fingerprint{}
@ -163,17 +173,15 @@ func testBuilder(t test.Tester) {
builder.GetMetricRange(fingerprint, atRange.from, atRange.through)
}
jobs := builder.ScanJobs()
if len(scenario.out) != len(jobs) {
t.Fatalf("%d. expected job length of %d, got %d", i, len(scenario.out), len(jobs))
}
for j, job := range scenario.out {
if jobs[j].fingerprint.String() != job.fingerprint {
t.Fatalf("%d.%d. expected fingerprint %s, got %s", i, j, job.fingerprint, jobs[j].fingerprint)
got := builder.PopOp()
if got.Fingerprint().String() != job.fingerprint {
t.Errorf("%d.%d. expected fingerprint %s, got %s", i, j, job.fingerprint, got.Fingerprint())
}
}
if builder.HasOp() {
t.Error("Expected builder to have no scan jobs left.")
}
}
}