diff --git a/storage/metric/freelist.go b/storage/metric/freelist.go index 06bc9c2c6c..b260dc0ed2 100644 --- a/storage/metric/freelist.go +++ b/storage/metric/freelist.go @@ -14,8 +14,10 @@ package metric import ( + "time" "github.com/prometheus/prometheus/utility" + clientmodel "github.com/prometheus/client_golang/model" dto "github.com/prometheus/prometheus/model/generated" ) @@ -81,12 +83,17 @@ type valueAtTimeList struct { l utility.FreeList } -func (l *valueAtTimeList) Get() (*getValuesAtTimeOp, bool) { - if v, ok := l.l.Get(); ok { - return v.(*getValuesAtTimeOp), ok +func (l *valueAtTimeList) Get(fp *clientmodel.Fingerprint, time clientmodel.Timestamp) *getValuesAtTimeOp { + var op *getValuesAtTimeOp + v, ok := l.l.Get() + if ok { + op = v.(*getValuesAtTimeOp) + } else { + op = &getValuesAtTimeOp{} } - - return &getValuesAtTimeOp{}, false + op.fp = *fp + op.current = time + return op } var pGetValuesAtTimeOp = &getValuesAtTimeOp{} @@ -107,12 +114,19 @@ type valueAtIntervalList struct { l utility.FreeList } -func (l *valueAtIntervalList) Get() (*getValuesAtIntervalOp, bool) { - if v, ok := l.l.Get(); ok { - return v.(*getValuesAtIntervalOp), ok +func (l *valueAtIntervalList) Get(fp *clientmodel.Fingerprint, from, through clientmodel.Timestamp, interval time.Duration) *getValuesAtIntervalOp { + var op *getValuesAtIntervalOp + v, ok := l.l.Get() + if ok { + op = v.(*getValuesAtIntervalOp) + } else { + op = &getValuesAtIntervalOp{} } - - return &getValuesAtIntervalOp{}, false + op.fp = *fp + op.current = from + op.through = through + op.interval = interval + return op } var pGetValuesAtIntervalOp = &getValuesAtIntervalOp{} @@ -133,12 +147,18 @@ type valueAlongRangeList struct { l utility.FreeList } -func (l *valueAlongRangeList) Get() (*getValuesAlongRangeOp, bool) { - if v, ok := l.l.Get(); ok { - return v.(*getValuesAlongRangeOp), ok +func (l *valueAlongRangeList) Get(fp *clientmodel.Fingerprint, from, through clientmodel.Timestamp) *getValuesAlongRangeOp { + var op *getValuesAlongRangeOp + v, ok := l.l.Get() + if ok { + op = v.(*getValuesAlongRangeOp) + } else { + op = &getValuesAlongRangeOp{} } - - return &getValuesAlongRangeOp{}, false + op.fp = *fp + op.current = from + op.through = through + return op } var pGetValuesAlongRangeOp = &getValuesAlongRangeOp{} @@ -159,12 +179,21 @@ type valueAtIntervalAlongRangeList struct { l utility.FreeList } -func (l *valueAtIntervalAlongRangeList) Get() (*getValueRangeAtIntervalOp, bool) { - if v, ok := l.l.Get(); ok { - return v.(*getValueRangeAtIntervalOp), ok +func (l *valueAtIntervalAlongRangeList) Get(fp *clientmodel.Fingerprint, from, through clientmodel.Timestamp, interval, rangeDuration time.Duration) *getValueRangeAtIntervalOp { + var op *getValueRangeAtIntervalOp + v, ok := l.l.Get() + if ok { + op = v.(*getValueRangeAtIntervalOp) + } else { + op = &getValueRangeAtIntervalOp{} } - - return &getValueRangeAtIntervalOp{}, false + op.fp = *fp + op.current = from + op.rangeThrough = from.Add(rangeDuration) + op.rangeDuration = rangeDuration + op.interval = interval + op.through = through + return op } var pGetValueRangeAtIntervalOp = &getValueRangeAtIntervalOp{} diff --git a/storage/metric/operation.go b/storage/metric/operation.go index ef341d8f93..786a755845 100644 --- a/storage/metric/operation.go +++ b/storage/metric/operation.go @@ -15,7 +15,6 @@ package metric import ( "fmt" - "math" "sort" "time" @@ -24,117 +23,116 @@ import ( // op encapsulates a primitive query operation. type op interface { - // The time at which this operation starts. - StartsAt() clientmodel.Timestamp - // Extract samples from stream of values and advance operation time. + // Fingerprint returns the fingerprint of the metric this operation + // operates on. + Fingerprint() *clientmodel.Fingerprint + // ExtractSamples extracts samples from a stream of values and advances + // the operation time. ExtractSamples(Values) Values - // Return whether the operator has consumed all data it needs. + // Consumed returns whether the operator has consumed all data it needs. Consumed() bool - // Get current operation time or nil if no subsequent work associated - // with this operator remains. + // CurrentTime gets the current operation time. In a newly created op, + // this is the starting time of the operation. During ongoing execution + // of the op, the current time is advanced accordingly. Once no + // subsequent work associated with the operation remains, nil is + // returned. CurrentTime() clientmodel.Timestamp - // GreedierThan indicates whether this present operation should take - // precedence over the other operation due to greediness. - // - // A critical assumption is that this operator and the other occur at - // the same time: this.StartsAt().Equal(op.StartsAt()). - GreedierThan(op) bool } -// Provides a sortable collection of operations. +// durationOperator encapsulates a general operation that occurs over a +// duration. +type durationOperator interface { + op + Through() clientmodel.Timestamp +} + +// ops is a heap of operations, primary sorting key is the fingerprint. type ops []op +// Len implements sort.Interface and heap.Interface. func (o ops) Len() int { return len(o) } -// startsAtSort implements sort.Interface and allows operator to be sorted in -// chronological order by when they start. -type startsAtSort struct { +// Less implements sort.Interface and heap.Interface. It compares the +// fingerprints. If they are equal, the comparison is delegated to +// currentTimeSort. +func (o ops) Less(i, j int) bool { + fpi := o[i].Fingerprint() + fpj := o[j].Fingerprint() + if fpi.Equal(fpj) { + return currentTimeSort{o}.Less(i, j) + } + return fpi.Less(fpj) +} + +// Swap implements sort.Interface and heap.Interface. +func (o ops) Swap(i, j int) { + o[i], o[j] = o[j], o[i] +} + +// Push implements heap.Interface. +func (o *ops) Push(x interface{}) { + // Push and Pop use pointer receivers because they modify the slice's + // length, not just its contents. + *o = append(*o, x.(op)) +} + +// Push implements heap.Interface. +func (o *ops) Pop() interface{} { + old := *o + n := len(old) + x := old[n-1] + *o = old[0 : n-1] + return x +} + +// currentTimeSort is a wrapper for ops with customized sorting order. +type currentTimeSort struct { ops } -func (s startsAtSort) Less(i, j int) bool { - return s.ops[i].StartsAt().Before(s.ops[j].StartsAt()) +// currentTimeSort implements sort.Interface and sorts the operations in +// chronological order by their current time. +func (s currentTimeSort) Less(i, j int) bool { + return s.ops[i].CurrentTime().Before(s.ops[j].CurrentTime()) } -func (o ops) Swap(i, j int) { - o[i], o[j] = o[j], o[i] +// baseOp contains the implementations and fields shared between different op +// types. +type baseOp struct { + fp clientmodel.Fingerprint + current clientmodel.Timestamp +} + +func (g *baseOp) Fingerprint() *clientmodel.Fingerprint { + return &g.fp +} + +func (g *baseOp) CurrentTime() clientmodel.Timestamp { + return g.current } // getValuesAtTimeOp encapsulates getting values at or adjacent to a specific // time. type getValuesAtTimeOp struct { - time clientmodel.Timestamp + baseOp consumed bool } func (g *getValuesAtTimeOp) String() string { - return fmt.Sprintf("getValuesAtTimeOp at %s", g.time) -} - -func (g *getValuesAtTimeOp) StartsAt() clientmodel.Timestamp { - return g.time + return fmt.Sprintf("getValuesAtTimeOp at %s", g.current) } func (g *getValuesAtTimeOp) ExtractSamples(in Values) (out Values) { if len(in) == 0 { return } - out = extractValuesAroundTime(g.time, in) + out = extractValuesAroundTime(g.current, in) g.consumed = true return } -func (g *getValuesAtTimeOp) GreedierThan(op op) (superior bool) { - switch op.(type) { - case *getValuesAtTimeOp: - superior = true - case durationOperator: - superior = false - default: - panic("unknown operation") - } - - return -} - -// extractValuesAroundTime searches for the provided time in the list of -// available samples and emits a slice containing the data points that -// are adjacent to it. -// -// An assumption of this is that the provided samples are already sorted! -func extractValuesAroundTime(t clientmodel.Timestamp, in Values) (out Values) { - i := sort.Search(len(in), func(i int) bool { - return !in[i].Timestamp.Before(t) - }) - if i == len(in) { - // Target time is past the end, return only the last sample. - out = in[len(in)-1:] - } else { - if in[i].Timestamp.Equal(t) && len(in) > i+1 { - // We hit exactly the current sample time. Very unlikely - // in practice. Return only the current sample. - out = in[i : i+1] - } else { - if i == 0 { - // We hit before the first sample time. Return - // only the first sample. - out = in[0:1] - } else { - // We hit between two samples. Return both - // surrounding samples. - out = in[i-1 : i+1] - } - } - } - return -} - -func (g getValuesAtTimeOp) CurrentTime() clientmodel.Timestamp { - return g.time -} - func (g getValuesAtTimeOp) Consumed() bool { return g.consumed } @@ -142,17 +140,13 @@ func (g getValuesAtTimeOp) Consumed() bool { // getValuesAtIntervalOp encapsulates getting values at a given interval over a // duration. type getValuesAtIntervalOp struct { - from clientmodel.Timestamp + baseOp through clientmodel.Timestamp interval time.Duration } func (g *getValuesAtIntervalOp) String() string { - return fmt.Sprintf("getValuesAtIntervalOp from %s each %s through %s", g.from, g.interval, g.through) -} - -func (g *getValuesAtIntervalOp) StartsAt() clientmodel.Timestamp { - return g.from + return fmt.Sprintf("getValuesAtIntervalOp from %s each %s through %s", g.current, g.interval, g.through) } func (g *getValuesAtIntervalOp) Through() clientmodel.Timestamp { @@ -165,56 +159,36 @@ func (g *getValuesAtIntervalOp) ExtractSamples(in Values) (out Values) { } lastChunkTime := in[len(in)-1].Timestamp for len(in) > 0 { - out = append(out, extractValuesAroundTime(g.from, in)...) + out = append(out, extractValuesAroundTime(g.current, in)...) lastExtractedTime := out[len(out)-1].Timestamp - in = in.TruncateBefore(lastExtractedTime.Add(clientmodel.MinimumTick)) - g.from = g.from.Add(g.interval) - for !g.from.After(lastExtractedTime) { - g.from = g.from.Add(g.interval) + in = in.TruncateBefore(lastExtractedTime.Add( + clientmodel.MinimumTick)) + g.current = g.current.Add(g.interval) + for !g.current.After(lastExtractedTime) { + g.current = g.current.Add(g.interval) } if lastExtractedTime.Equal(lastChunkTime) { break } - if g.from.After(g.through) { + if g.current.After(g.through) { break } } return } -func (g *getValuesAtIntervalOp) CurrentTime() clientmodel.Timestamp { - return g.from -} - func (g *getValuesAtIntervalOp) Consumed() bool { - return g.from.After(g.through) -} - -func (g *getValuesAtIntervalOp) GreedierThan(op op) (superior bool) { - switch o := op.(type) { - case *getValuesAtTimeOp: - superior = true - case durationOperator: - superior = g.Through().After(o.Through()) - default: - panic("unknown operation") - } - - return + return g.current.After(g.through) } // getValuesAlongRangeOp encapsulates getting all values in a given range. type getValuesAlongRangeOp struct { - from clientmodel.Timestamp + baseOp through clientmodel.Timestamp } func (g *getValuesAlongRangeOp) String() string { - return fmt.Sprintf("getValuesAlongRangeOp from %s through %s", g.from, g.through) -} - -func (g *getValuesAlongRangeOp) StartsAt() clientmodel.Timestamp { - return g.from + return fmt.Sprintf("getValuesAlongRangeOp from %s through %s", g.current, g.through) } func (g *getValuesAlongRangeOp) Through() clientmodel.Timestamp { @@ -225,15 +199,15 @@ func (g *getValuesAlongRangeOp) ExtractSamples(in Values) (out Values) { if len(in) == 0 { return } - // Find the first sample where time >= g.from. + // Find the first sample where time >= g.current. firstIdx := sort.Search(len(in), func(i int) bool { - return !in[i].Timestamp.Before(g.from) + return !in[i].Timestamp.Before(g.current) }) if firstIdx == len(in) { - // No samples at or after operator start time. This can only happen if we - // try applying the operator to a time after the last recorded sample. In - // this case, we're finished. - g.from = g.through.Add(clientmodel.MinimumTick) + // No samples at or after operator start time. This can only + // happen if we try applying the operator to a time after the + // last recorded sample. In this case, we're finished. + g.current = g.through.Add(clientmodel.MinimumTick) return } @@ -242,37 +216,20 @@ func (g *getValuesAlongRangeOp) ExtractSamples(in Values) (out Values) { return in[i].Timestamp.After(g.through) }) if lastIdx == firstIdx { - g.from = g.through.Add(clientmodel.MinimumTick) + g.current = g.through.Add(clientmodel.MinimumTick) return } lastSampleTime := in[lastIdx-1].Timestamp - // Sample times are stored with a maximum time resolution of one second, so - // we have to add exactly that to target the next chunk on the next op - // iteration. - g.from = lastSampleTime.Add(time.Second) + // Sample times are stored with a maximum time resolution of one second, + // so we have to add exactly that to target the next chunk on the next + // op iteration. + g.current = lastSampleTime.Add(time.Second) return in[firstIdx:lastIdx] } -func (g *getValuesAlongRangeOp) CurrentTime() clientmodel.Timestamp { - return g.from -} - func (g *getValuesAlongRangeOp) Consumed() bool { - return g.from.After(g.through) -} - -func (g *getValuesAlongRangeOp) GreedierThan(op op) (superior bool) { - switch o := op.(type) { - case *getValuesAtTimeOp: - superior = true - case durationOperator: - superior = g.Through().After(o.Through()) - default: - panic("unknown operation") - } - - return + return g.current.After(g.through) } // getValueRangeAtIntervalOp encapsulates getting all values from ranges along @@ -282,7 +239,7 @@ func (g *getValuesAlongRangeOp) GreedierThan(op op) (superior bool) { // incremented by interval and from is reset to through-rangeDuration. Returns // current time nil when from > totalThrough. type getValueRangeAtIntervalOp struct { - rangeFrom clientmodel.Timestamp + baseOp rangeThrough clientmodel.Timestamp rangeDuration time.Duration interval time.Duration @@ -290,11 +247,7 @@ type getValueRangeAtIntervalOp struct { } func (g *getValueRangeAtIntervalOp) String() string { - return fmt.Sprintf("getValueRangeAtIntervalOp range %s from %s each %s through %s", g.rangeDuration, g.rangeFrom, g.interval, g.through) -} - -func (g *getValueRangeAtIntervalOp) StartsAt() clientmodel.Timestamp { - return g.rangeFrom + return fmt.Sprintf("getValueRangeAtIntervalOp range %s from %s each %s through %s", g.rangeDuration, g.current, g.interval, g.through) } func (g *getValueRangeAtIntervalOp) Through() clientmodel.Timestamp { @@ -303,22 +256,22 @@ func (g *getValueRangeAtIntervalOp) Through() clientmodel.Timestamp { func (g *getValueRangeAtIntervalOp) advanceToNextInterval() { g.rangeThrough = g.rangeThrough.Add(g.interval) - g.rangeFrom = g.rangeThrough.Add(-g.rangeDuration) + g.current = g.rangeThrough.Add(-g.rangeDuration) } func (g *getValueRangeAtIntervalOp) ExtractSamples(in Values) (out Values) { if len(in) == 0 { return } - // Find the first sample where time >= g.from. + // Find the first sample where time >= g.current. firstIdx := sort.Search(len(in), func(i int) bool { - return !in[i].Timestamp.Before(g.rangeFrom) + return !in[i].Timestamp.Before(g.current) }) if firstIdx == len(in) { - // No samples at or after operator start time. This can only happen if we - // try applying the operator to a time after the last recorded sample. In - // this case, we're finished. - g.rangeFrom = g.through.Add(clientmodel.MinimumTick) + // No samples at or after operator start time. This can only + // happen if we try applying the operator to a time after the + // last recorded sample. In this case, we're finished. + g.current = g.through.Add(clientmodel.MinimumTick) return } @@ -327,76 +280,30 @@ func (g *getValueRangeAtIntervalOp) ExtractSamples(in Values) (out Values) { return in[i].Timestamp.After(g.rangeThrough) }) // This only happens when there is only one sample and it is both after - // g.rangeFrom and after g.rangeThrough. In this case, both indexes are 0. + // g.current and after g.rangeThrough. In this case, both indexes are 0. if lastIdx == firstIdx { g.advanceToNextInterval() return } lastSampleTime := in[lastIdx-1].Timestamp - // Sample times are stored with a maximum time resolution of one second, so - // we have to add exactly that to target the next chunk on the next op - // iteration. - g.rangeFrom = lastSampleTime.Add(time.Second) - if g.rangeFrom.After(g.rangeThrough) { + // Sample times are stored with a maximum time resolution of one second, + // so we have to add exactly that to target the next chunk on the next + // op iteration. + g.current = lastSampleTime.Add(time.Second) + if g.current.After(g.rangeThrough) { g.advanceToNextInterval() } return in[firstIdx:lastIdx] } -func (g *getValueRangeAtIntervalOp) CurrentTime() clientmodel.Timestamp { - return g.rangeFrom -} - func (g *getValueRangeAtIntervalOp) Consumed() bool { - return g.rangeFrom.After(g.through) + return g.current.After(g.through) } -func (g *getValueRangeAtIntervalOp) GreedierThan(op op) bool { - panic("not implemented") -} - -// Provides a collection of getMetricRangeOperation. -type getMetricRangeOperations []*getValuesAlongRangeOp - -func (s getMetricRangeOperations) Len() int { - return len(s) -} - -func (s getMetricRangeOperations) Swap(i, j int) { - s[i], s[j] = s[j], s[i] -} - -// Sorts getMetricRangeOperation according to duration in descending order. -type rangeDurationSorter struct { - getMetricRangeOperations -} - -func (s rangeDurationSorter) Less(i, j int) bool { - l := s.getMetricRangeOperations[i] - r := s.getMetricRangeOperations[j] - - return !l.through.Before(r.through) -} - -// Encapsulates a general operation that occurs over a duration. -type durationOperator interface { - op - - Through() clientmodel.Timestamp -} - -// greedinessSort sorts the operations in descending order by level of -// greediness. -type greedinessSort struct { - ops -} - -func (g greedinessSort) Less(i, j int) bool { - return g.ops[i].GreedierThan(g.ops[j]) -} - -// Contains getValuesAtIntervalOp operations. +// getValuesAtIntervalOps contains getValuesAtIntervalOp operations. It +// implements sort.Interface and sorts the operations in ascending order by +// their frequency. type getValuesAtIntervalOps []*getValuesAtIntervalOp func (s getValuesAtIntervalOps) Len() int { @@ -407,382 +314,33 @@ func (s getValuesAtIntervalOps) Swap(i, j int) { s[i], s[j] = s[j], s[i] } -// Sorts durationOperator by the operation's duration in descending order. -type intervalDurationSorter struct { - getValuesAtIntervalOps +func (s getValuesAtIntervalOps) Less(i, j int) bool { + return s[i].interval < s[j].interval } -func (s intervalDurationSorter) Less(i, j int) bool { - l := s.getValuesAtIntervalOps[i] - r := s.getValuesAtIntervalOps[j] - - return !l.through.Before(r.through) -} - -// Sorts getValuesAtIntervalOp operations in ascending order by their -// frequency. -type frequencySorter struct { - getValuesAtIntervalOps -} - -func (s frequencySorter) Less(i, j int) bool { - l := s.getValuesAtIntervalOps[i] - r := s.getValuesAtIntervalOps[j] - - return l.interval < r.interval -} - -// Selects and returns all operations that are getValuesAtIntervalOp operations -// in a map whereby the operation interval is the key and the value are the -// operations sorted by respective level of greediness. -func collectIntervals(o ops) (intervals map[time.Duration]ops) { - intervals = make(map[time.Duration]ops) - - for _, operation := range o { - switch t := operation.(type) { - case *getValuesAtIntervalOp: - operations, _ := intervals[t.interval] - - operations = append(operations, t) - intervals[t.interval] = operations - } - } - - return -} - -// Selects and returns all operations that are getValuesAlongRangeOp operations. -func collectRanges(ops ops) (ranges ops) { - for _, operation := range ops { - switch t := operation.(type) { - case *getValuesAlongRangeOp: - ranges = append(ranges, t) - } - } - - return -} - -// optimizeForward iteratively scans operations and peeks ahead to subsequent -// ones to find candidates that can either be removed or truncated through -// simplification. For instance, if a range query happens to overlap a get-a- -// value-at-a-certain-point-request, the range query should flatten and subsume -// the other. -func optimizeForward(unoptimized ops) ops { - if len(unoptimized) <= 1 { - return unoptimized - } - - head := unoptimized[0] - unoptimized = unoptimized[1:] - optimized := ops{} - - switch headOp := head.(type) { - case *getValuesAtTimeOp: - optimized = ops{head} - case *getValuesAtIntervalOp: - optimized, unoptimized = optimizeForwardGetValuesAtInterval(headOp, unoptimized) - case *getValuesAlongRangeOp: - optimized, unoptimized = optimizeForwardGetValuesAlongRange(headOp, unoptimized) - default: - panic("unknown operation type") - } - - tail := optimizeForward(unoptimized) - - return append(optimized, tail...) -} - -func optimizeForwardGetValuesAtInterval(headOp *getValuesAtIntervalOp, unoptimized ops) (ops, ops) { - // If the last value was a scan at a given frequency along an interval, - // several optimizations may exist. - for _, peekOperation := range unoptimized { - if peekOperation.StartsAt().After(headOp.Through()) { - break - } - - // If the type is not a range request, we can't do anything. - switch next := peekOperation.(type) { - case *getValuesAlongRangeOp: - if !next.GreedierThan(headOp) { - after := getValuesAtIntervalOp(*headOp) - - headOp.through = next.from - - // Truncate the get value at interval request if a range request cuts - // it off somewhere. - from := next.from - - for !from.After(next.through) { - from = from.Add(headOp.interval) - } - - after.from = from - - unoptimized = append(ops{&after}, unoptimized...) - sort.Sort(startsAtSort{unoptimized}) - return ops{headOp}, unoptimized - } - } - } - return ops{headOp}, unoptimized -} - -func optimizeForwardGetValuesAlongRange(headOp *getValuesAlongRangeOp, unoptimized ops) (ops, ops) { - optimized := ops{} - for _, peekOperation := range unoptimized { - if peekOperation.StartsAt().After(headOp.Through()) { - optimized = ops{headOp} - break - } - - switch next := peekOperation.(type) { - // All values at a specific time may be elided into the range query. - case *getValuesAtTimeOp: - optimized = ops{headOp} - unoptimized = unoptimized[1:] - case *getValuesAlongRangeOp: - // Range queries should be concatenated if they overlap. - if next.GreedierThan(headOp) { - next.from = headOp.from - return optimized, unoptimized - } - optimized = ops{headOp} - unoptimized = unoptimized[1:] - case *getValuesAtIntervalOp: - optimized = ops{headOp} - unoptimized = unoptimized[1:] - - if next.GreedierThan(headOp) { - nextStart := next.from - - for !nextStart.After(headOp.through) { - nextStart = nextStart.Add(next.interval) - } - - next.from = nextStart - unoptimized = append(ops{next}, unoptimized...) - } - default: - panic("unknown operation type") - } - } - return optimized, unoptimized -} - -// selectQueriesForTime chooses all subsequent operations from the slice that -// have the same start time as the provided time and emits them. -func selectQueriesForTime(time clientmodel.Timestamp, queries ops) (out ops) { - if len(queries) == 0 { - return - } - - if !queries[0].StartsAt().Equal(time) { - return - } - - out = append(out, queries[0]) - tail := selectQueriesForTime(time, queries[1:]) - - return append(out, tail...) -} - -// selectGreediestRange scans through the various getValuesAlongRangeOp -// operations and emits the one that is the greediest. -func selectGreediestRange(in ops) (o durationOperator) { - if len(in) == 0 { - return - } - - sort.Sort(greedinessSort{in}) - - o = in[0].(*getValuesAlongRangeOp) - - return -} - -// selectGreediestIntervals scans through the various getValuesAtIntervalOp -// operations and emits a map of the greediest operation keyed by its start -// time. -func selectGreediestIntervals(in map[time.Duration]ops) (out map[time.Duration]durationOperator) { - if len(in) == 0 { - return - } - - out = make(map[time.Duration]durationOperator) - - for i, ops := range in { - sort.Sort(greedinessSort{ops}) - - out[i] = ops[0].(*getValuesAtIntervalOp) - } - - return -} - -// rewriteForGreediestRange rewrites the current pending operation such that the -// greediest range operation takes precedence over all other operators in this -// time group. +// extractValuesAroundTime searches for the provided time in the list of +// available samples and emits a slice containing the data points that +// are adjacent to it. // -// Between two range operations O1 and O2, they both start at the same time; -// however, O2 extends for a longer duration than O1. Thusly, O1 should be -// deleted with O2. -// -// O1------>| -// T1 T4 -// -// O2------------>| -// T1 T7 -// -// Thusly O1 can be squashed into O2 without having side-effects. -func rewriteForGreediestRange(greediestRange durationOperator) ops { - return ops{greediestRange} -} - -// rewriteForGreediestInterval rewrites teh current pending interval operations -// such that the interval operation with the smallest collection period is -// invoked first, for it will skip around the soonest of any of the remaining -// other operators. -// -// Between two interval operations O1 and O2, they both start at the same time; -// however, O2's period is shorter than O1, meaning it will sample far more -// frequently from the underlying time series. Thusly, O2 should start before -// O1. -// -// O1---->|---->| -// T1 T5 -// -// O2->|->|->|->| -// T1 T5 -// -// The rewriter presently does not scan and compact for common divisors in the -// periods, though this may be nice to have. For instance, if O1 has a period -// of 2 and O2 has a period of 4, O2 would be dropped for O1 would implicitly -// cover its period. -func rewriteForGreediestInterval(greediestIntervals map[time.Duration]durationOperator) ops { - var ( - memo getValuesAtIntervalOps - out ops - ) - - for _, o := range greediestIntervals { - memo = append(memo, o.(*getValuesAtIntervalOp)) +// An assumption of this is that the provided samples are already sorted! +func extractValuesAroundTime(t clientmodel.Timestamp, in Values) Values { + i := sort.Search(len(in), func(i int) bool { + return !in[i].Timestamp.Before(t) + }) + if i == len(in) { + // Target time is past the end, return only the last sample. + return in[len(in)-1:] } - - sort.Sort(frequencySorter{memo}) - - for _, o := range memo { - out = append(out, o) + if in[i].Timestamp.Equal(t) && len(in) > i+1 { + // We hit exactly the current sample time. Very unlikely in + // practice. Return only the current sample. + return in[i : i+1] } - - return out -} - -// rewriteForRangeAndInterval examines the existence of a range operation and a -// set of interval operations that start at the same time and deletes all -// interval operations that start and finish before the range operation -// completes and rewrites all interval operations that continue longer than -// the range operation to start at the next best increment after the range. -// -// Assume that we have a range operator O1 and two interval operations O2 and -// O3. O2 and O3 have the same period (i.e., sampling interval), but O2 -// terminates before O1 and O3 continue beyond O1. -// -// O1------------>| -// T1------------T7 -// -// O2-->|-->|-->| -// T1----------T6 -// -// O3-->|-->|-->|-->|-->| -// T1------------------T10 -// -// This scenario will be rewritten such that O2 is deleted and O3 is truncated -// from T1 through T7, and O3's new starting time is at T7 and runs through T10: -// -// O1------------>| -// T1------------T7 -// -// O2>|-->| -// T7---T10 -// -// All rewritten interval operators will respect their original start time -// multipliers. -func rewriteForRangeAndInterval(greediestRange durationOperator, greediestIntervals map[time.Duration]durationOperator) (out ops) { - out = append(out, greediestRange) - for _, op := range greediestIntervals { - if !op.GreedierThan(greediestRange) { - continue - } - - // The range operation does not exceed interval. Leave a snippet of - // interval. - var ( - truncated = op.(*getValuesAtIntervalOp) - newIntervalOperation getValuesAtIntervalOp - // Refactor - remainingSlice = greediestRange.Through().Sub(greediestRange.StartsAt()) / time.Second - nextIntervalPoint = time.Duration(math.Ceil(float64(remainingSlice)/float64(truncated.interval)) * float64(truncated.interval/time.Second)) - nextStart = greediestRange.Through().Add(nextIntervalPoint) - ) - - newIntervalOperation.from = nextStart - newIntervalOperation.interval = truncated.interval - newIntervalOperation.through = truncated.Through() - // Added back to the pending because additional curation could be - // necessary. - out = append(out, &newIntervalOperation) + if i == 0 { + // We hit before the first sample time. Return only the first + // sample. + return in[0:1] } - - return -} - -// Flattens queries that occur at the same time according to duration and level -// of greed. Consult the various rewriter functions for their respective modes -// of operation. -func optimizeTimeGroup(group ops) (out ops) { - var ( - greediestRange = selectGreediestRange(collectRanges(group)) - greediestIntervals = selectGreediestIntervals(collectIntervals(group)) - containsRange = greediestRange != nil - containsInterval = len(greediestIntervals) > 0 - ) - - switch { - case containsRange && !containsInterval: - out = rewriteForGreediestRange(greediestRange) - case !containsRange && containsInterval: - out = rewriteForGreediestInterval(greediestIntervals) - case containsRange && containsInterval: - out = rewriteForRangeAndInterval(greediestRange, greediestIntervals) - default: - // Operation is OK as-is. - out = append(out, group[0]) - } - return -} - -// Flattens all groups of time according to greed. -func optimizeTimeGroups(pending ops) (out ops) { - if len(pending) == 0 { - return - } - - sort.Sort(startsAtSort{pending}) - - var ( - nextOperation = pending[0] - groupedQueries = selectQueriesForTime(nextOperation.StartsAt(), pending) - ) - - out = optimizeTimeGroup(groupedQueries) - pending = pending[len(groupedQueries):] - - tail := optimizeTimeGroups(pending) - - return append(out, tail...) -} - -func optimize(pending ops) (out ops) { - return optimizeForward(optimizeTimeGroups(pending)) + // We hit between two samples. Return both surrounding samples. + return in[i-1 : i+1] } diff --git a/storage/metric/operation_test.go b/storage/metric/operation_test.go index 99cd7fdd59..3441dc4fb0 100644 --- a/storage/metric/operation_test.go +++ b/storage/metric/operation_test.go @@ -14,1195 +14,10 @@ package metric import ( - "sort" "testing" "time" - - "github.com/prometheus/prometheus/utility/test" ) -func testOptimizeTimeGroups(t test.Tester) { - var ( - out ops - - scenarios = []struct { - in ops - out ops - }{ - // Empty set; return empty set. - { - in: ops{}, - out: ops{}, - }, - // Single time; return single time. - { - in: ops{ - &getValuesAtTimeOp{ - time: testInstant, - }, - }, - out: ops{ - &getValuesAtTimeOp{ - time: testInstant, - }, - }, - }, - // Single range; return single range. - { - in: ops{ - &getValuesAlongRangeOp{ - from: testInstant, - through: testInstant.Add(1 * time.Minute), - }, - }, - out: ops{ - &getValuesAlongRangeOp{ - from: testInstant, - through: testInstant.Add(1 * time.Minute), - }, - }, - }, - // Single interval; return single interval. - { - in: ops{ - &getValuesAtIntervalOp{ - from: testInstant, - through: testInstant.Add(1 * time.Minute), - interval: time.Second * 5, - }, - }, - out: ops{ - &getValuesAtIntervalOp{ - from: testInstant, - through: testInstant.Add(1 * time.Minute), - interval: time.Second * 5, - }, - }, - }, - // Duplicate points; return single point. - { - in: ops{ - &getValuesAtTimeOp{ - time: testInstant, - }, - &getValuesAtTimeOp{ - time: testInstant, - }, - }, - out: ops{ - &getValuesAtTimeOp{ - time: testInstant, - }, - }, - }, - // Duplicate ranges; return single range. - { - in: ops{ - &getValuesAlongRangeOp{ - from: testInstant, - through: testInstant.Add(1 * time.Minute), - }, - &getValuesAlongRangeOp{ - from: testInstant, - through: testInstant.Add(1 * time.Minute), - }, - }, - out: ops{ - &getValuesAlongRangeOp{ - from: testInstant, - through: testInstant.Add(1 * time.Minute), - }, - }, - }, - // Duplicate intervals; return single interval. - { - in: ops{ - &getValuesAtIntervalOp{ - from: testInstant, - through: testInstant.Add(1 * time.Minute), - interval: time.Second * 5, - }, - &getValuesAtIntervalOp{ - from: testInstant, - through: testInstant.Add(1 * time.Minute), - interval: time.Second * 5, - }, - }, - out: ops{ - &getValuesAtIntervalOp{ - from: testInstant, - through: testInstant.Add(1 * time.Minute), - interval: time.Second * 5, - }, - }, - }, - // Subordinate interval; return master. - { - in: ops{ - &getValuesAtIntervalOp{ - from: testInstant, - through: testInstant.Add(1 * time.Minute), - interval: time.Second * 5, - }, - &getValuesAtIntervalOp{ - from: testInstant, - through: testInstant.Add(2 * time.Minute), - interval: time.Second * 5, - }, - }, - out: ops{ - &getValuesAtIntervalOp{ - from: testInstant, - through: testInstant.Add(2 * time.Minute), - interval: time.Second * 5, - }, - }, - }, - // Subordinate range; return master. - { - in: ops{ - &getValuesAlongRangeOp{ - from: testInstant, - through: testInstant.Add(1 * time.Minute), - }, - &getValuesAlongRangeOp{ - from: testInstant, - through: testInstant.Add(2 * time.Minute), - }, - }, - out: ops{ - &getValuesAlongRangeOp{ - from: testInstant, - through: testInstant.Add(2 * time.Minute), - }, - }, - }, - // Equal range with different interval; return both. - { - in: ops{ - &getValuesAtIntervalOp{ - from: testInstant, - through: testInstant.Add(1 * time.Minute), - interval: time.Second * 10, - }, - &getValuesAtIntervalOp{ - from: testInstant, - through: testInstant.Add(1 * time.Minute), - interval: time.Second * 5, - }, - }, - out: ops{ - &getValuesAtIntervalOp{ - from: testInstant, - through: testInstant.Add(1 * time.Minute), - interval: time.Second * 5, - }, - &getValuesAtIntervalOp{ - from: testInstant, - through: testInstant.Add(1 * time.Minute), - interval: time.Second * 10, - }, - }, - }, - // Different range with different interval; return best. - { - in: ops{ - &getValuesAtIntervalOp{ - from: testInstant, - through: testInstant.Add(2 * time.Minute), - interval: time.Second * 10, - }, - &getValuesAtIntervalOp{ - from: testInstant, - through: testInstant.Add(1 * time.Minute), - interval: time.Second * 10, - }, - &getValuesAtIntervalOp{ - from: testInstant, - through: testInstant.Add(1 * time.Minute), - interval: time.Second * 5, - }, - &getValuesAtIntervalOp{ - from: testInstant, - through: testInstant.Add(2 * time.Minute), - interval: time.Second * 5, - }, - }, - out: ops{ - &getValuesAtIntervalOp{ - from: testInstant, - through: testInstant.Add(2 * time.Minute), - interval: time.Second * 5, - }, - &getValuesAtIntervalOp{ - from: testInstant, - through: testInstant.Add(2 * time.Minute), - interval: time.Second * 10, - }, - }, - }, - /* - // Include Truncated Intervals with Range. - { - in: ops{ - &getValuesAtIntervalOp{ - from: testInstant, - through: testInstant.Add(2 * time.Minute), - interval: time.Second * 10, - }, - &getValuesAtIntervalOp{ - from: testInstant, - through: testInstant.Add(1 * time.Minute), - interval: time.Second * 10, - }, - &getValuesAlongRangeOp{ - from: testInstant, - through: testInstant.Add(30 * time.Second), - }, - }, - out: ops{ - &getValuesAlongRangeOp{ - from: testInstant, - through: testInstant.Add(30 * time.Second), - }, - &getValuesAtIntervalOp{ - from: testInstant.Add(30 * time.Second), - through: testInstant.Add(2 * time.Minute), - interval: time.Second * 10, - }, - }, - }, - // Compacted Forward Truncation - { - in: ops{ - &getValuesAtIntervalOp{ - from: testInstant, - through: testInstant.Add(3 * time.Minute), - interval: time.Second * 10, - }, - &getValuesAlongRangeOp{ - from: testInstant, - through: testInstant.Add(2 * time.Minute), - }, - }, - out: ops{ - &getValuesAlongRangeOp{ - from: testInstant, - through: testInstant.Add(2 * time.Minute), - }, - &getValuesAtIntervalOp{ - from: testInstant.Add(2 * time.Minute), - through: testInstant.Add(3 * time.Minute), - interval: time.Second * 10, - }, - }, - }, - // Compacted Tail Truncation - { - in: ops{ - &getValuesAtIntervalOp{ - from: testInstant, - through: testInstant.Add(3 * time.Minute), - interval: time.Second * 10, - }, - &getValuesAlongRangeOp{ - from: testInstant, - through: testInstant.Add(2 * time.Minute), - }, - }, - out: ops{ - &getValuesAlongRangeOp{ - from: testInstant, - through: testInstant.Add(2 * time.Minute), - }, - &getValuesAtIntervalOp{ - from: testInstant.Add(2 * time.Minute), - through: testInstant.Add(3 * time.Minute), - interval: time.Second * 10, - }, - }, - }, - */ - // Regression Validation 1: Multiple Overlapping Interval Requests - // This one specific case expects no mutation. - { - in: ops{ - &getValuesAlongRangeOp{ - from: testInstant, - through: testInstant.Add(5 * time.Minute), - }, - &getValuesAlongRangeOp{ - from: testInstant.Add(15 * time.Second), - through: testInstant.Add(15 * time.Second).Add(5 * time.Minute), - }, - &getValuesAlongRangeOp{ - from: testInstant.Add(30 * time.Second), - through: testInstant.Add(30 * time.Second).Add(5 * time.Minute), - }, - &getValuesAlongRangeOp{ - from: testInstant.Add(45 * time.Second), - through: testInstant.Add(45 * time.Second).Add(5 * time.Minute), - }, - }, - out: ops{ - &getValuesAlongRangeOp{ - from: testInstant, - through: testInstant.Add(5 * time.Minute), - }, - &getValuesAlongRangeOp{ - from: testInstant.Add(15 * time.Second), - through: testInstant.Add(15 * time.Second).Add(5 * time.Minute), - }, - &getValuesAlongRangeOp{ - from: testInstant.Add(30 * time.Second), - through: testInstant.Add(30 * time.Second).Add(5 * time.Minute), - }, - &getValuesAlongRangeOp{ - from: testInstant.Add(45 * time.Second), - through: testInstant.Add(45 * time.Second).Add(5 * time.Minute), - }, - }, - }, - } - ) - - for i, scenario := range scenarios { - // The compaction system assumes that values are sorted on input. - sort.Sort(startsAtSort{scenario.in}) - - out = optimizeTimeGroups(scenario.in) - - if len(out) != len(scenario.out) { - t.Fatalf("%d. expected length of %d, got %d", i, len(scenario.out), len(out)) - } - - for j, op := range out { - - if actual, ok := op.(*getValuesAtTimeOp); ok { - - if expected, ok := scenario.out[j].(*getValuesAtTimeOp); ok { - if expected.time.Unix() != actual.time.Unix() { - t.Fatalf("%d.%d. expected time %s, got %s", i, j, expected.time, actual.time) - } - } else { - t.Fatalf("%d.%d. expected getValuesAtTimeOp, got %s", i, j, actual) - } - - } else if actual, ok := op.(*getValuesAtIntervalOp); ok { - - if expected, ok := scenario.out[j].(*getValuesAtIntervalOp); ok { - // Shaving off nanoseconds. - if expected.from.Unix() != actual.from.Unix() { - t.Fatalf("%d.%d. expected from %s, got %s", i, j, expected.from, actual.from) - } - if expected.through.Unix() != actual.through.Unix() { - t.Fatalf("%d.%d. expected through %s, got %s", i, j, expected.through, actual.through) - } - if expected.interval != (actual.interval) { - t.Fatalf("%d.%d. expected interval %s, got %s", i, j, expected.interval, actual.interval) - } - } else { - t.Fatalf("%d.%d. expected getValuesAtIntervalOp, got %s", i, j, actual) - } - - } else if actual, ok := op.(*getValuesAlongRangeOp); ok { - - if expected, ok := scenario.out[j].(*getValuesAlongRangeOp); ok { - if expected.from.Unix() != actual.from.Unix() { - t.Fatalf("%d.%d. expected from %s, got %s", i, j, expected.from, actual.from) - } - if expected.through.Unix() != actual.through.Unix() { - t.Fatalf("%d.%d. expected through %s, got %s", i, j, expected.through, actual.through) - } - } else { - t.Fatalf("%d.%d. expected getValuesAlongRangeOp, got %s", i, j, actual) - } - - } - - } - } -} - -func TestOptimizeTimeGroups(t *testing.T) { - testOptimizeTimeGroups(t) -} - -func BenchmarkOptimizeTimeGroups(b *testing.B) { - for i := 0; i < b.N; i++ { - testOptimizeTimeGroups(b) - } -} - -func testOptimizeForward(t test.Tester) { - var ( - out ops - - scenarios = []struct { - in ops - out ops - }{ - // Compact Interval with Subservient Range - { - in: ops{ - &getValuesAtIntervalOp{ - from: testInstant.Add(1 * time.Minute), - through: testInstant.Add(2 * time.Minute), - interval: time.Second * 10, - }, - &getValuesAlongRangeOp{ - from: testInstant, - through: testInstant.Add(3 * time.Minute), - }, - }, - out: ops{ - &getValuesAlongRangeOp{ - from: testInstant, - through: testInstant.Add(3 * time.Minute), - }, - }, - }, - // Compact Ranges with Subservient Range - { - in: ops{ - &getValuesAlongRangeOp{ - from: testInstant.Add(1 * time.Minute), - through: testInstant.Add(2 * time.Minute), - }, - &getValuesAlongRangeOp{ - from: testInstant, - through: testInstant.Add(3 * time.Minute), - }, - }, - out: ops{ - &getValuesAlongRangeOp{ - from: testInstant, - through: testInstant.Add(3 * time.Minute), - }, - }, - }, - // Carving Middle Elements - { - in: ops{ - &getValuesAtIntervalOp{ - from: testInstant, - through: testInstant.Add(5 * time.Minute), - interval: time.Second * 10, - }, - &getValuesAlongRangeOp{ - from: testInstant.Add(2 * time.Minute), - through: testInstant.Add(3 * time.Minute), - }, - }, - out: ops{ - &getValuesAtIntervalOp{ - from: testInstant, - through: testInstant.Add(2 * time.Minute), - interval: time.Second * 10, - }, - &getValuesAlongRangeOp{ - from: testInstant.Add(2 * time.Minute), - through: testInstant.Add(3 * time.Minute), - }, - &getValuesAtIntervalOp{ - // Since the range operation consumes Now() + 3 Minutes, we start - // an additional ten seconds later. - from: testInstant.Add(3 * time.Minute).Add(10 * time.Second), - through: testInstant.Add(5 * time.Minute), - interval: time.Second * 10, - }, - }, - }, - // Compact Subservient Points with Range - // The points are at half-minute offsets due to optimizeTimeGroups - // work. - { - in: ops{ - &getValuesAtTimeOp{ - time: testInstant.Add(30 * time.Second), - }, - &getValuesAtTimeOp{ - time: testInstant.Add(1 * time.Minute).Add(30 * time.Second), - }, - &getValuesAtTimeOp{ - time: testInstant.Add(2 * time.Minute).Add(30 * time.Second), - }, - &getValuesAtTimeOp{ - time: testInstant.Add(3 * time.Minute).Add(30 * time.Second), - }, - &getValuesAtTimeOp{ - time: testInstant.Add(4 * time.Minute).Add(30 * time.Second), - }, - &getValuesAtTimeOp{ - time: testInstant.Add(5 * time.Minute).Add(30 * time.Second), - }, - &getValuesAtTimeOp{ - time: testInstant.Add(6 * time.Minute).Add(30 * time.Second), - }, - &getValuesAlongRangeOp{ - from: testInstant.Add(1 * time.Minute), - through: testInstant.Add(5 * time.Minute), - }, - }, - out: ops{ - &getValuesAtTimeOp{ - time: testInstant.Add(30 * time.Second), - }, - &getValuesAlongRangeOp{ - from: testInstant.Add(1 * time.Minute), - through: testInstant.Add(5 * time.Minute), - }, - &getValuesAtTimeOp{ - time: testInstant.Add(5 * time.Minute).Add(30 * time.Second), - }, - &getValuesAtTimeOp{ - time: testInstant.Add(6 * time.Minute).Add(30 * time.Second), - }, - }, - }, - // Regression Validation 1: Multiple Overlapping Interval Requests - // We expect to find compaction. - { - in: ops{ - &getValuesAlongRangeOp{ - from: testInstant, - through: testInstant.Add(5 * time.Minute), - }, - &getValuesAlongRangeOp{ - from: testInstant.Add(15 * time.Second), - through: testInstant.Add(15 * time.Second).Add(5 * time.Minute), - }, - &getValuesAlongRangeOp{ - from: testInstant.Add(30 * time.Second), - through: testInstant.Add(30 * time.Second).Add(5 * time.Minute), - }, - &getValuesAlongRangeOp{ - from: testInstant.Add(45 * time.Second), - through: testInstant.Add(45 * time.Second).Add(5 * time.Minute), - }, - }, - out: ops{ - &getValuesAlongRangeOp{ - from: testInstant, - through: testInstant.Add(45 * time.Second).Add(5 * time.Minute), - }, - }, - }, - // Range with subsequent overlapping interval. - { - in: ops{ - &getValuesAlongRangeOp{ - from: testInstant, - through: testInstant.Add(3 * time.Minute), - }, - &getValuesAtIntervalOp{ - from: testInstant.Add(1 * time.Minute), - through: testInstant.Add(4 * time.Minute), - interval: time.Second * 10, - }, - }, - out: ops{ - &getValuesAlongRangeOp{ - from: testInstant, - through: testInstant.Add(3 * time.Minute), - }, - &getValuesAtIntervalOp{ - from: testInstant.Add(3*time.Minute + 10*time.Second), - through: testInstant.Add(4 * time.Minute), - interval: time.Second * 10, - }, - }, - }, - } - ) - - for i, scenario := range scenarios { - // The compaction system assumes that values are sorted on input. - sort.Sort(startsAtSort{scenario.in}) - - out = optimizeForward(scenario.in) - - if len(out) != len(scenario.out) { - t.Fatalf("%d. expected length of %d, got %d", i, len(scenario.out), len(out)) - } - - for j, op := range out { - - if actual, ok := op.(*getValuesAtTimeOp); ok { - - if expected, ok := scenario.out[j].(*getValuesAtTimeOp); ok { - if expected.time.Unix() != actual.time.Unix() { - t.Fatalf("%d.%d. expected time %s, got %s", i, j, expected.time, actual.time) - } - } else { - t.Fatalf("%d.%d. expected getValuesAtTimeOp, got %s", i, j, actual) - } - - } else if actual, ok := op.(*getValuesAtIntervalOp); ok { - - if expected, ok := scenario.out[j].(*getValuesAtIntervalOp); ok { - // Shaving off nanoseconds. - if expected.from.Unix() != actual.from.Unix() { - t.Fatalf("%d.%d. expected from %s, got %s", i, j, expected.from, actual.from) - } - if expected.through.Unix() != actual.through.Unix() { - t.Fatalf("%d.%d. expected through %s, got %s", i, j, expected.through, actual.through) - } - if expected.interval != (actual.interval) { - t.Fatalf("%d.%d. expected interval %s, got %s", i, j, expected.interval, actual.interval) - } - } else { - t.Fatalf("%d.%d. expected getValuesAtIntervalOp, got %s", i, j, actual) - } - - } else if actual, ok := op.(*getValuesAlongRangeOp); ok { - - if expected, ok := scenario.out[j].(*getValuesAlongRangeOp); ok { - if expected.from.Unix() != actual.from.Unix() { - t.Fatalf("%d.%d. expected from %s, got %s", i, j, expected.from, actual.from) - } - if expected.through.Unix() != actual.through.Unix() { - t.Fatalf("%d.%d. expected through %s, got %s", i, j, expected.through, actual.through) - } - } else { - t.Fatalf("%d.%d. expected getValuesAlongRangeOp, got %s", i, j, actual) - } - - } - - } - } -} - -func TestOptimizeForward(t *testing.T) { - testOptimizeForward(t) -} - -func BenchmarkOptimizeForward(b *testing.B) { - for i := 0; i < b.N; i++ { - testOptimizeForward(b) - } -} - -func testOptimize(t test.Tester) { - var ( - out ops - - scenarios = []struct { - in ops - out ops - }{ - // Empty set; return empty set. - { - in: ops{}, - out: ops{}, - }, - // Single time; return single time. - { - in: ops{ - &getValuesAtTimeOp{ - time: testInstant, - }, - }, - out: ops{ - &getValuesAtTimeOp{ - time: testInstant, - }, - }, - }, - // Single range; return single range. - { - in: ops{ - &getValuesAlongRangeOp{ - from: testInstant, - through: testInstant.Add(1 * time.Minute), - }, - }, - out: ops{ - &getValuesAlongRangeOp{ - from: testInstant, - through: testInstant.Add(1 * time.Minute), - }, - }, - }, - // Single interval; return single interval. - { - in: ops{ - &getValuesAtIntervalOp{ - from: testInstant, - through: testInstant.Add(1 * time.Minute), - interval: time.Second * 5, - }, - }, - out: ops{ - &getValuesAtIntervalOp{ - from: testInstant, - through: testInstant.Add(1 * time.Minute), - interval: time.Second * 5, - }, - }, - }, - // Duplicate points; return single point. - { - in: ops{ - &getValuesAtTimeOp{ - time: testInstant, - }, - &getValuesAtTimeOp{ - time: testInstant, - }, - }, - out: ops{ - &getValuesAtTimeOp{ - time: testInstant, - }, - }, - }, - // Duplicate ranges; return single range. - { - in: ops{ - &getValuesAlongRangeOp{ - from: testInstant, - through: testInstant.Add(1 * time.Minute), - }, - &getValuesAlongRangeOp{ - from: testInstant, - through: testInstant.Add(1 * time.Minute), - }, - }, - out: ops{ - &getValuesAlongRangeOp{ - from: testInstant, - through: testInstant.Add(1 * time.Minute), - }, - }, - }, - // Duplicate intervals; return single interval. - { - in: ops{ - &getValuesAtIntervalOp{ - from: testInstant, - through: testInstant.Add(1 * time.Minute), - interval: time.Second * 5, - }, - &getValuesAtIntervalOp{ - from: testInstant, - through: testInstant.Add(1 * time.Minute), - interval: time.Second * 5, - }, - }, - out: ops{ - &getValuesAtIntervalOp{ - from: testInstant, - through: testInstant.Add(1 * time.Minute), - interval: time.Second * 5, - }, - }, - }, - // Subordinate interval; return master. - { - in: ops{ - &getValuesAtIntervalOp{ - from: testInstant, - through: testInstant.Add(1 * time.Minute), - interval: time.Second * 5, - }, - &getValuesAtIntervalOp{ - from: testInstant, - through: testInstant.Add(2 * time.Minute), - interval: time.Second * 5, - }, - }, - out: ops{ - &getValuesAtIntervalOp{ - from: testInstant, - through: testInstant.Add(2 * time.Minute), - interval: time.Second * 5, - }, - }, - }, - // Subordinate range; return master. - { - in: ops{ - &getValuesAlongRangeOp{ - from: testInstant, - through: testInstant.Add(1 * time.Minute), - }, - &getValuesAlongRangeOp{ - from: testInstant, - through: testInstant.Add(2 * time.Minute), - }, - }, - out: ops{ - &getValuesAlongRangeOp{ - from: testInstant, - through: testInstant.Add(2 * time.Minute), - }, - }, - }, - // Equal range with different interval; return both. - { - in: ops{ - &getValuesAtIntervalOp{ - from: testInstant, - through: testInstant.Add(1 * time.Minute), - interval: time.Second * 10, - }, - &getValuesAtIntervalOp{ - from: testInstant, - through: testInstant.Add(1 * time.Minute), - interval: time.Second * 5, - }, - }, - out: ops{ - &getValuesAtIntervalOp{ - from: testInstant, - through: testInstant.Add(1 * time.Minute), - interval: time.Second * 5, - }, - &getValuesAtIntervalOp{ - from: testInstant, - through: testInstant.Add(1 * time.Minute), - interval: time.Second * 10, - }, - }, - }, - /* - // Different range with different interval; return best. - { - in: ops{ - &getValuesAtIntervalOp{ - from: testInstant, - through: testInstant.Add(2 * time.Minute), - interval: time.Second * 10, - }, - &getValuesAtIntervalOp{ - from: testInstant, - through: testInstant.Add(1 * time.Minute), - interval: time.Second * 10, - }, - &getValuesAtIntervalOp{ - from: testInstant, - through: testInstant.Add(1 * time.Minute), - interval: time.Second * 5, - }, - &getValuesAtIntervalOp{ - from: testInstant, - through: testInstant.Add(2 * time.Minute), - interval: time.Second * 5, - }, - }, - out: ops{ - &getValuesAtIntervalOp{ - from: testInstant, - through: testInstant.Add(2 * time.Minute), - interval: time.Second * 5, - }, - &getValuesAtIntervalOp{ - from: testInstant, - through: testInstant.Add(2 * time.Minute), - interval: time.Second * 10, - }, - }, - }, - // Include Truncated Intervals with Range. - { - in: ops{ - &getValuesAtIntervalOp{ - from: testInstant, - through: testInstant.Add(2 * time.Minute), - interval: time.Second * 10, - }, - &getValuesAtIntervalOp{ - from: testInstant, - through: testInstant.Add(1 * time.Minute), - interval: time.Second * 10, - }, - &getValuesAlongRangeOp{ - from: testInstant, - through: testInstant.Add(30 * time.Second), - }, - }, - out: ops{ - &getValuesAlongRangeOp{ - from: testInstant, - through: testInstant.Add(30 * time.Second), - }, - &getValuesAtIntervalOp{ - from: testInstant.Add(30 * time.Second), - through: testInstant.Add(2 * time.Minute), - interval: time.Second * 10, - }, - }, - }, - // Compacted Forward Truncation - { - in: ops{ - &getValuesAtIntervalOp{ - from: testInstant, - through: testInstant.Add(3 * time.Minute), - interval: time.Second * 10, - }, - &getValuesAlongRangeOp{ - from: testInstant, - through: testInstant.Add(2 * time.Minute), - }, - }, - out: ops{ - &getValuesAlongRangeOp{ - from: testInstant, - through: testInstant.Add(2 * time.Minute), - }, - &getValuesAtIntervalOp{ - from: testInstant.Add(2 * time.Minute), - through: testInstant.Add(3 * time.Minute), - interval: time.Second * 10, - }, - }, - }, - // Compacted Tail Truncation - { - in: ops{ - &getValuesAtIntervalOp{ - from: testInstant, - through: testInstant.Add(3 * time.Minute), - interval: time.Second * 10, - }, - &getValuesAlongRangeOp{ - from: testInstant, - through: testInstant.Add(2 * time.Minute), - }, - }, - out: ops{ - &getValuesAlongRangeOp{ - from: testInstant, - through: testInstant.Add(2 * time.Minute), - }, - &getValuesAtIntervalOp{ - from: testInstant.Add(2 * time.Minute), - through: testInstant.Add(3 * time.Minute), - interval: time.Second * 10, - }, - }, - }, - */ - // Compact Interval with Subservient Range - { - in: ops{ - &getValuesAtIntervalOp{ - from: testInstant.Add(1 * time.Minute), - through: testInstant.Add(2 * time.Minute), - interval: time.Second * 10, - }, - &getValuesAlongRangeOp{ - from: testInstant, - through: testInstant.Add(3 * time.Minute), - }, - }, - out: ops{ - &getValuesAlongRangeOp{ - from: testInstant, - through: testInstant.Add(3 * time.Minute), - }, - }, - }, - // Compact Ranges with Subservient Range - { - in: ops{ - &getValuesAlongRangeOp{ - from: testInstant.Add(1 * time.Minute), - through: testInstant.Add(2 * time.Minute), - }, - &getValuesAlongRangeOp{ - from: testInstant, - through: testInstant.Add(3 * time.Minute), - }, - }, - out: ops{ - &getValuesAlongRangeOp{ - from: testInstant, - through: testInstant.Add(3 * time.Minute), - }, - }, - }, - // Carving Middle Elements - { - in: ops{ - &getValuesAtIntervalOp{ - from: testInstant, - through: testInstant.Add(5 * time.Minute), - interval: time.Second * 10, - }, - &getValuesAlongRangeOp{ - from: testInstant.Add(2 * time.Minute), - through: testInstant.Add(3 * time.Minute), - }, - }, - out: ops{ - &getValuesAtIntervalOp{ - from: testInstant, - through: testInstant.Add(2 * time.Minute), - interval: time.Second * 10, - }, - &getValuesAlongRangeOp{ - from: testInstant.Add(2 * time.Minute), - through: testInstant.Add(3 * time.Minute), - }, - &getValuesAtIntervalOp{ - // Since the range operation consumes Now() + 3 Minutes, we start - // an additional ten seconds later. - from: testInstant.Add(3 * time.Minute).Add(10 * time.Second), - through: testInstant.Add(5 * time.Minute), - interval: time.Second * 10, - }, - }, - }, - // Compact Subservient Points with Range - // The points are at half-minute offsets due to optimizeTimeGroups - // work. - { - in: ops{ - &getValuesAtTimeOp{ - time: testInstant.Add(30 * time.Second), - }, - &getValuesAtTimeOp{ - time: testInstant.Add(1 * time.Minute).Add(30 * time.Second), - }, - &getValuesAtTimeOp{ - time: testInstant.Add(2 * time.Minute).Add(30 * time.Second), - }, - &getValuesAtTimeOp{ - time: testInstant.Add(3 * time.Minute).Add(30 * time.Second), - }, - &getValuesAtTimeOp{ - time: testInstant.Add(4 * time.Minute).Add(30 * time.Second), - }, - &getValuesAtTimeOp{ - time: testInstant.Add(5 * time.Minute).Add(30 * time.Second), - }, - &getValuesAtTimeOp{ - time: testInstant.Add(6 * time.Minute).Add(30 * time.Second), - }, - &getValuesAlongRangeOp{ - from: testInstant.Add(1 * time.Minute), - through: testInstant.Add(5 * time.Minute), - }, - }, - out: ops{ - &getValuesAtTimeOp{ - time: testInstant.Add(30 * time.Second), - }, - &getValuesAlongRangeOp{ - from: testInstant.Add(1 * time.Minute), - through: testInstant.Add(5 * time.Minute), - }, - &getValuesAtTimeOp{ - time: testInstant.Add(5 * time.Minute).Add(30 * time.Second), - }, - &getValuesAtTimeOp{ - time: testInstant.Add(6 * time.Minute).Add(30 * time.Second), - }, - }, - }, - // Regression Validation 1: Multiple Overlapping Interval Requests - // We expect to find compaction. - { - in: ops{ - &getValuesAlongRangeOp{ - from: testInstant, - through: testInstant.Add(5 * time.Minute), - }, - &getValuesAlongRangeOp{ - from: testInstant.Add(15 * time.Second), - through: testInstant.Add(15 * time.Second).Add(5 * time.Minute), - }, - &getValuesAlongRangeOp{ - from: testInstant.Add(30 * time.Second), - through: testInstant.Add(30 * time.Second).Add(5 * time.Minute), - }, - &getValuesAlongRangeOp{ - from: testInstant.Add(45 * time.Second), - through: testInstant.Add(45 * time.Second).Add(5 * time.Minute), - }, - }, - out: ops{ - &getValuesAlongRangeOp{ - from: testInstant, - through: testInstant.Add(45 * time.Second).Add(5 * time.Minute), - }, - }, - }, - } - ) - - for i, scenario := range scenarios { - // The compaction system assumes that values are sorted on input. - sort.Sort(startsAtSort{scenario.in}) - - out = optimize(scenario.in) - - if len(out) != len(scenario.out) { - t.Fatalf("%d. expected length of %d, got %d", i, len(scenario.out), len(out)) - } - - for j, op := range out { - - if actual, ok := op.(*getValuesAtTimeOp); ok { - - if expected, ok := scenario.out[j].(*getValuesAtTimeOp); ok { - if expected.time.Unix() != actual.time.Unix() { - t.Fatalf("%d.%d. expected time %s, got %s", i, j, expected.time, actual.time) - } - } else { - t.Fatalf("%d.%d. expected getValuesAtTimeOp, got %s", i, j, actual) - } - - } else if actual, ok := op.(*getValuesAtIntervalOp); ok { - - if expected, ok := scenario.out[j].(*getValuesAtIntervalOp); ok { - // Shaving off nanoseconds. - if expected.from.Unix() != actual.from.Unix() { - t.Fatalf("%d.%d. expected from %s, got %s", i, j, expected.from, actual.from) - } - if expected.through.Unix() != actual.through.Unix() { - t.Fatalf("%d.%d. expected through %s, got %s", i, j, expected.through, actual.through) - } - if expected.interval != (actual.interval) { - t.Fatalf("%d.%d. expected interval %s, got %s", i, j, expected.interval, actual.interval) - } - } else { - t.Fatalf("%d.%d. expected getValuesAtIntervalOp, got %s", i, j, actual) - } - - } else if actual, ok := op.(*getValuesAlongRangeOp); ok { - - if expected, ok := scenario.out[j].(*getValuesAlongRangeOp); ok { - if expected.from.Unix() != actual.from.Unix() { - t.Fatalf("%d.%d. expected from %s, got %s", i, j, expected.from, actual.from) - } - if expected.through.Unix() != actual.through.Unix() { - t.Fatalf("%d.%d. expected through %s, got %s", i, j, expected.through, actual.through) - } - } else { - t.Fatalf("%d.%d. expected getValuesAlongRangeOp, got %s", i, j, actual) - } - - } - - } - } -} - -func TestOptimize(t *testing.T) { - testOptimize(t) -} - -func BenchmarkOptimize(b *testing.B) { - for i := 0; i < b.N; i++ { - testOptimize(b) - } -} - func TestGetValuesAtTimeOp(t *testing.T) { var scenarios = []struct { op getValuesAtTimeOp @@ -1212,13 +27,13 @@ func TestGetValuesAtTimeOp(t *testing.T) { // No values. { op: getValuesAtTimeOp{ - time: testInstant, + baseOp: baseOp{current: testInstant}, }, }, // Operator time before single value. { op: getValuesAtTimeOp{ - time: testInstant, + baseOp: baseOp{current: testInstant}, }, in: Values{ { @@ -1236,7 +51,7 @@ func TestGetValuesAtTimeOp(t *testing.T) { // Operator time exactly at single value. { op: getValuesAtTimeOp{ - time: testInstant.Add(1 * time.Minute), + baseOp: baseOp{current: testInstant.Add(1 * time.Minute)}, }, in: Values{ { @@ -1254,7 +69,7 @@ func TestGetValuesAtTimeOp(t *testing.T) { // Operator time after single value. { op: getValuesAtTimeOp{ - time: testInstant.Add(2 * time.Minute), + baseOp: baseOp{current: testInstant.Add(2 * time.Minute)}, }, in: Values{ { @@ -1272,7 +87,7 @@ func TestGetValuesAtTimeOp(t *testing.T) { // Operator time before two values. { op: getValuesAtTimeOp{ - time: testInstant, + baseOp: baseOp{current: testInstant}, }, in: Values{ { @@ -1294,7 +109,7 @@ func TestGetValuesAtTimeOp(t *testing.T) { // Operator time at first of two values. { op: getValuesAtTimeOp{ - time: testInstant.Add(1 * time.Minute), + baseOp: baseOp{current: testInstant.Add(1 * time.Minute)}, }, in: Values{ { @@ -1316,7 +131,7 @@ func TestGetValuesAtTimeOp(t *testing.T) { // Operator time between first and second of two values. { op: getValuesAtTimeOp{ - time: testInstant.Add(90 * time.Second), + baseOp: baseOp{current: testInstant.Add(90 * time.Second)}, }, in: Values{ { @@ -1342,7 +157,7 @@ func TestGetValuesAtTimeOp(t *testing.T) { // Operator time at second of two values. { op: getValuesAtTimeOp{ - time: testInstant.Add(2 * time.Minute), + baseOp: baseOp{current: testInstant.Add(2 * time.Minute)}, }, in: Values{ { @@ -1368,7 +183,7 @@ func TestGetValuesAtTimeOp(t *testing.T) { // Operator time after second of two values. { op: getValuesAtTimeOp{ - time: testInstant.Add(3 * time.Minute), + baseOp: baseOp{current: testInstant.Add(3 * time.Minute)}, }, in: Values{ { @@ -1412,7 +227,7 @@ func TestGetValuesAtIntervalOp(t *testing.T) { // No values. { op: getValuesAtIntervalOp{ - from: testInstant, + baseOp: baseOp{current: testInstant}, through: testInstant.Add(1 * time.Minute), interval: 30 * time.Second, }, @@ -1420,7 +235,7 @@ func TestGetValuesAtIntervalOp(t *testing.T) { // Entire operator range before first value. { op: getValuesAtIntervalOp{ - from: testInstant, + baseOp: baseOp{current: testInstant}, through: testInstant.Add(1 * time.Minute), interval: 30 * time.Second, }, @@ -1444,7 +259,7 @@ func TestGetValuesAtIntervalOp(t *testing.T) { // Operator range starts before first value, ends within available values. { op: getValuesAtIntervalOp{ - from: testInstant, + baseOp: baseOp{current: testInstant}, through: testInstant.Add(2 * time.Minute), interval: 30 * time.Second, }, @@ -1472,7 +287,7 @@ func TestGetValuesAtIntervalOp(t *testing.T) { // Entire operator range is within available values. { op: getValuesAtIntervalOp{ - from: testInstant.Add(1 * time.Minute), + baseOp: baseOp{current: testInstant.Add(1 * time.Minute)}, through: testInstant.Add(2 * time.Minute), interval: 30 * time.Second, }, @@ -1504,7 +319,7 @@ func TestGetValuesAtIntervalOp(t *testing.T) { // Operator range begins before first value, ends after last. { op: getValuesAtIntervalOp{ - from: testInstant, + baseOp: baseOp{current: testInstant}, through: testInstant.Add(3 * time.Minute), interval: 30 * time.Second, }, @@ -1532,7 +347,7 @@ func TestGetValuesAtIntervalOp(t *testing.T) { // Operator range begins within available values, ends after the last value. { op: getValuesAtIntervalOp{ - from: testInstant.Add(2 * time.Minute), + baseOp: baseOp{current: testInstant.Add(2 * time.Minute)}, through: testInstant.Add(4 * time.Minute), interval: 30 * time.Second, }, @@ -1568,7 +383,7 @@ func TestGetValuesAtIntervalOp(t *testing.T) { // Entire operator range after the last available value. { op: getValuesAtIntervalOp{ - from: testInstant.Add(2 * time.Minute), + baseOp: baseOp{current: testInstant.Add(2 * time.Minute)}, through: testInstant.Add(3 * time.Minute), interval: 30 * time.Second, }, @@ -1596,7 +411,7 @@ func TestGetValuesAtIntervalOp(t *testing.T) { // skip over values for the test). { op: getValuesAtIntervalOp{ - from: testInstant.Add(30 * time.Second), + baseOp: baseOp{current: testInstant.Add(30 * time.Second)}, through: testInstant.Add(4 * time.Minute), interval: 3 * time.Minute, }, @@ -1665,14 +480,14 @@ func TestGetValuesAlongRangeOp(t *testing.T) { // No values. { op: getValuesAlongRangeOp{ - from: testInstant, + baseOp: baseOp{current: testInstant}, through: testInstant.Add(1 * time.Minute), }, }, // Entire operator range before first value. { op: getValuesAlongRangeOp{ - from: testInstant, + baseOp: baseOp{current: testInstant}, through: testInstant.Add(1 * time.Minute), }, in: Values{ @@ -1690,7 +505,7 @@ func TestGetValuesAlongRangeOp(t *testing.T) { // Operator range starts before first value, ends within available values. { op: getValuesAlongRangeOp{ - from: testInstant, + baseOp: baseOp{current: testInstant}, through: testInstant.Add(2 * time.Minute), }, in: Values{ @@ -1713,7 +528,7 @@ func TestGetValuesAlongRangeOp(t *testing.T) { // Entire operator range is within available values. { op: getValuesAlongRangeOp{ - from: testInstant.Add(1 * time.Minute), + baseOp: baseOp{current: testInstant.Add(1 * time.Minute)}, through: testInstant.Add(2 * time.Minute), }, in: Values{ @@ -1740,7 +555,7 @@ func TestGetValuesAlongRangeOp(t *testing.T) { // Operator range begins before first value, ends after last. { op: getValuesAlongRangeOp{ - from: testInstant, + baseOp: baseOp{current: testInstant}, through: testInstant.Add(3 * time.Minute), }, in: Values{ @@ -1767,7 +582,7 @@ func TestGetValuesAlongRangeOp(t *testing.T) { // Operator range begins within available values, ends after the last value. { op: getValuesAlongRangeOp{ - from: testInstant.Add(2 * time.Minute), + baseOp: baseOp{current: testInstant.Add(2 * time.Minute)}, through: testInstant.Add(4 * time.Minute), }, in: Values{ @@ -1802,7 +617,7 @@ func TestGetValuesAlongRangeOp(t *testing.T) { // Entire operator range after the last available value. { op: getValuesAlongRangeOp{ - from: testInstant.Add(2 * time.Minute), + baseOp: baseOp{current: testInstant.Add(2 * time.Minute)}, through: testInstant.Add(3 * time.Minute), }, in: Values{ @@ -1833,7 +648,7 @@ func TestGetValuesAlongRangeOp(t *testing.T) { func TestGetValueRangeAtIntervalOp(t *testing.T) { testOp := getValueRangeAtIntervalOp{ - rangeFrom: testInstant.Add(-2 * time.Minute), + baseOp: baseOp{current: testInstant.Add(-2 * time.Minute)}, rangeThrough: testInstant, rangeDuration: 2 * time.Minute, interval: 10 * time.Minute, diff --git a/storage/metric/scanjob.go b/storage/metric/scanjob.go deleted file mode 100644 index 717cc3780a..0000000000 --- a/storage/metric/scanjob.go +++ /dev/null @@ -1,55 +0,0 @@ -// Copyright 2013 Prometheus Team -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package metric - -import ( - "bytes" - "fmt" - - clientmodel "github.com/prometheus/client_golang/model" -) - -// scanJob models a range of queries. -type scanJob struct { - fingerprint *clientmodel.Fingerprint - operations ops -} - -func (s scanJob) String() string { - buffer := &bytes.Buffer{} - fmt.Fprintf(buffer, "Scan Job { fingerprint=%s ", s.fingerprint) - fmt.Fprintf(buffer, " with %d operations [", len(s.operations)) - for _, operation := range s.operations { - fmt.Fprintf(buffer, "%s", operation) - } - fmt.Fprintf(buffer, "] }") - - return buffer.String() -} - -type scanJobs []scanJob - -func (s scanJobs) Len() int { - return len(s) -} - -func (s scanJobs) Less(i, j int) (less bool) { - less = s[i].fingerprint.Less(s[j].fingerprint) - - return -} - -func (s scanJobs) Swap(i, j int) { - s[i], s[j] = s[j], s[i] -} diff --git a/storage/metric/tiered.go b/storage/metric/tiered.go index 44638b04db..54b2f0c5d4 100644 --- a/storage/metric/tiered.go +++ b/storage/metric/tiered.go @@ -115,7 +115,13 @@ const ( const watermarkCacheLimit = 1024 * 1024 // NewTieredStorage returns a TieredStorage object ready to use. -func NewTieredStorage(appendToDiskQueueDepth, viewQueueDepth uint, flushMemoryInterval time.Duration, memoryTTL time.Duration, rootDirectory string) (*TieredStorage, error) { +func NewTieredStorage( + appendToDiskQueueDepth, + viewQueueDepth uint, + flushMemoryInterval time.Duration, + memoryTTL time.Duration, + rootDirectory string, +) (*TieredStorage, error) { if isDir, _ := utility.IsDir(rootDirectory); !isDir { if err := os.MkdirAll(rootDirectory, 0755); err != nil { return nil, fmt.Errorf("could not find or create metrics directory %s: %s", rootDirectory, err) @@ -389,11 +395,15 @@ func (t *TieredStorage) renderView(viewJob viewJob) { duration := time.Since(begin) - recordOutcome(duration, err, map[string]string{operation: renderView, result: success}, map[string]string{operation: renderView, result: failure}) + recordOutcome( + duration, + err, + map[string]string{operation: renderView, result: success}, + map[string]string{operation: renderView, result: failure}, + ) }() scanJobsTimer := viewJob.stats.GetTimer(stats.ViewScanJobsTime).Start() - scans := viewJob.builder.ScanJobs() scanJobsTimer.Stop() view := newView() @@ -410,135 +420,104 @@ func (t *TieredStorage) renderView(viewJob viewJob) { defer t.dtoSampleKeys.Give(sampleKeyDto) extractionTimer := viewJob.stats.GetTimer(stats.ViewDataExtractionTime).Start() - for _, scanJob := range scans { - old, err := t.seriesTooOld(scanJob.fingerprint, scanJob.operations[0].CurrentTime()) + for viewJob.builder.HasOp() { + op := viewJob.builder.PopOp() + fp := op.Fingerprint() + old, err := t.seriesTooOld(fp, op.CurrentTime()) if err != nil { - glog.Errorf("Error getting watermark from cache for %s: %s", scanJob.fingerprint, err) + glog.Errorf("Error getting watermark from cache for %s: %s", fp, err) continue } if old { continue } - standingOps := scanJob.operations - memValues := t.memoryArena.CloneSamples(scanJob.fingerprint) + memValues := t.memoryArena.CloneSamples(fp) - for len(standingOps) > 0 { - // Abort the view rendering if the caller (MakeView) has timed out. - if len(viewJob.abort) > 0 { - return - } + // Abort the view rendering if the caller (MakeView) has timed out. + if len(viewJob.abort) > 0 { + return + } - // Load data value chunk(s) around the first standing op's current time. - targetTime := standingOps[0].CurrentTime() + // Load data value chunk(s) around the current time. + targetTime := op.CurrentTime() - currentChunk := chunk{} - // If we aimed before the oldest value in memory, load more data from disk. - if (len(memValues) == 0 || memValues.FirstTimeAfter(targetTime)) && diskPresent { - if iterator == nil { - // Get a single iterator that will be used for all data extraction - // below. - iterator, _ = t.DiskStorage.MetricSamples.NewIterator(true) - defer iterator.Close() - if diskPresent = iterator.SeekToLast(); diskPresent { + currentChunk := chunk{} + // If we aimed before the oldest value in memory, load more data from disk. + if (len(memValues) == 0 || memValues.FirstTimeAfter(targetTime)) && diskPresent { + if iterator == nil { + // Get a single iterator that will be used for all data extraction + // below. + iterator, _ = t.DiskStorage.MetricSamples.NewIterator(true) + defer iterator.Close() + if diskPresent = iterator.SeekToLast(); diskPresent { + if err := iterator.Key(sampleKeyDto); err != nil { + panic(err) + } + + lastBlock.Load(sampleKeyDto) + + if !iterator.SeekToFirst() { + diskPresent = false + } else { if err := iterator.Key(sampleKeyDto); err != nil { panic(err) } - lastBlock.Load(sampleKeyDto) - - if !iterator.SeekToFirst() { - diskPresent = false - } else { - if err := iterator.Key(sampleKeyDto); err != nil { - panic(err) - } - - firstBlock.Load(sampleKeyDto) - } + firstBlock.Load(sampleKeyDto) } } + } - if diskPresent { - diskTimer := viewJob.stats.GetTimer(stats.ViewDiskExtractionTime).Start() - diskValues, expired := t.loadChunkAroundTime(iterator, scanJob.fingerprint, targetTime, firstBlock, lastBlock) - if expired { - diskPresent = false - } - diskTimer.Stop() + if diskPresent { + diskTimer := viewJob.stats.GetTimer(stats.ViewDiskExtractionTime).Start() + diskValues, expired := t.loadChunkAroundTime( + iterator, + fp, + targetTime, + firstBlock, + lastBlock, + ) + if expired { + diskPresent = false + } + diskTimer.Stop() - // If we aimed past the newest value on disk, combine it with the next value from memory. - if len(diskValues) == 0 { - currentChunk = chunk(memValues) - } else { - if len(memValues) > 0 && diskValues.LastTimeBefore(targetTime) { - latestDiskValue := diskValues[len(diskValues)-1:] - currentChunk = append(chunk(latestDiskValue), chunk(memValues)...) - } else { - currentChunk = chunk(diskValues) - } - } - } else { + // If we aimed past the newest value on disk, + // combine it with the next value from memory. + if len(diskValues) == 0 { currentChunk = chunk(memValues) + } else { + if len(memValues) > 0 && diskValues.LastTimeBefore(targetTime) { + latestDiskValue := diskValues[len(diskValues)-1:] + currentChunk = append(chunk(latestDiskValue), chunk(memValues)...) + } else { + currentChunk = chunk(diskValues) + } } } else { currentChunk = chunk(memValues) } + } else { + currentChunk = chunk(memValues) + } - // There's no data at all for this fingerprint, so stop processing ops for it. - if len(currentChunk) == 0 { - break - } + // There's no data at all for this fingerprint, so stop processing. + if len(currentChunk) == 0 { + continue + } - currentChunk = currentChunk.TruncateBefore(targetTime) + currentChunk = currentChunk.TruncateBefore(targetTime) - lastChunkTime := currentChunk[len(currentChunk)-1].Timestamp - if lastChunkTime.After(targetTime) { - targetTime = lastChunkTime - } + lastChunkTime := currentChunk[len(currentChunk)-1].Timestamp + if lastChunkTime.After(targetTime) { + targetTime = lastChunkTime + } - // For each op, extract all needed data from the current chunk. - out := Values{} - for _, op := range standingOps { - if op.CurrentTime().After(targetTime) { - break - } - - currentChunk = currentChunk.TruncateBefore(op.CurrentTime()) - - for !op.Consumed() && !op.CurrentTime().After(targetTime) { - out = op.ExtractSamples(Values(currentChunk)) - - // Append the extracted samples to the materialized view. - view.appendSamples(scanJob.fingerprint, out) - } - } - - // Throw away standing ops which are finished. - filteredOps := ops{} - for _, op := range standingOps { - if !op.Consumed() { - filteredOps = append(filteredOps, op) - continue - } - - giveBackOp(op) - } - standingOps = filteredOps - - // Sort ops by start time again, since they might be slightly off now. - // For example, consider a current chunk of values and two interval ops - // with different interval lengths. Their states after the cycle above - // could be: - // - // (C = current op time) - // - // Chunk: [ X X X X X ] - // Op 1: [ X X C . . . ] - // Op 2: [ X X C . . .] - // - // Op 2 now has an earlier current time than Op 1. - sort.Sort(startsAtSort{standingOps}) + // Extract all needed data from the current chunk and append the + // extracted samples to the materialized view. + for !op.Consumed() && !op.CurrentTime().After(targetTime) { + view.appendSamples(fp, op.ExtractSamples(Values(currentChunk))) } } extractionTimer.Stop() @@ -547,7 +526,13 @@ func (t *TieredStorage) renderView(viewJob viewJob) { return } -func (t *TieredStorage) loadChunkAroundTime(iterator leveldb.Iterator, fingerprint *clientmodel.Fingerprint, ts clientmodel.Timestamp, firstBlock, lastBlock *SampleKey) (chunk Values, expired bool) { +func (t *TieredStorage) loadChunkAroundTime( + iterator leveldb.Iterator, + fingerprint *clientmodel.Fingerprint, + ts clientmodel.Timestamp, + firstBlock, + lastBlock *SampleKey, +) (chunk Values, expired bool) { if fingerprint.Less(firstBlock.Fingerprint) { return nil, false } diff --git a/storage/metric/tiered_test.go b/storage/metric/tiered_test.go index 0d8ac5b5f1..b129be1cde 100644 --- a/storage/metric/tiered_test.go +++ b/storage/metric/tiered_test.go @@ -68,7 +68,7 @@ func testMakeView(t test.Tester, flushToDisk bool) { in: in{ atTime: []getValuesAtTimeOp{ { - time: instant, + baseOp: baseOp{current: instant}, }, }, }, @@ -88,7 +88,7 @@ func testMakeView(t test.Tester, flushToDisk bool) { in: in{ atTime: []getValuesAtTimeOp{ { - time: instant, + baseOp: baseOp{current: instant}, }, }, }, @@ -120,7 +120,7 @@ func testMakeView(t test.Tester, flushToDisk bool) { in: in{ atTime: []getValuesAtTimeOp{ { - time: instant, + baseOp: baseOp{current: instant}, }, }, }, @@ -147,7 +147,7 @@ func testMakeView(t test.Tester, flushToDisk bool) { in: in{ atTime: []getValuesAtTimeOp{ { - time: instant.Add(time.Second), + baseOp: baseOp{current: instant.Add(time.Second)}, }, }, }, @@ -179,7 +179,7 @@ func testMakeView(t test.Tester, flushToDisk bool) { in: in{ atTime: []getValuesAtTimeOp{ { - time: instant, + baseOp: baseOp{current: instant}, }, }, }, @@ -216,7 +216,7 @@ func testMakeView(t test.Tester, flushToDisk bool) { in: in{ atTime: []getValuesAtTimeOp{ { - time: instant.Add(time.Second), + baseOp: baseOp{current: instant.Add(time.Second)}, }, }, }, @@ -253,7 +253,7 @@ func testMakeView(t test.Tester, flushToDisk bool) { in: in{ atTime: []getValuesAtTimeOp{ { - time: instant.Add(time.Second), + baseOp: baseOp{current: instant.Add(time.Second)}, }, }, }, @@ -294,7 +294,7 @@ func testMakeView(t test.Tester, flushToDisk bool) { in: in{ atTime: []getValuesAtTimeOp{ { - time: instant.Add(time.Second * 3), + baseOp: baseOp{current: instant.Add(time.Second * 3)}, }, }, }, @@ -319,7 +319,7 @@ func testMakeView(t test.Tester, flushToDisk bool) { in: in{ atTime: []getValuesAtTimeOp{ { - time: instant.Add(time.Second*time.Duration(*leveldbChunkSize*2) + clientmodel.MinimumTick), + baseOp: baseOp{current: instant.Add(time.Second*time.Duration(*leveldbChunkSize*2) + clientmodel.MinimumTick)}, }, }, }, @@ -356,15 +356,15 @@ func testMakeView(t test.Tester, flushToDisk bool) { requestBuilder := NewViewRequestBuilder() for _, atTime := range scenario.in.atTime { - requestBuilder.GetMetricAtTime(fingerprint, atTime.time) + requestBuilder.GetMetricAtTime(fingerprint, atTime.current) } for _, atInterval := range scenario.in.atInterval { - requestBuilder.GetMetricAtInterval(fingerprint, atInterval.from, atInterval.through, atInterval.interval) + requestBuilder.GetMetricAtInterval(fingerprint, atInterval.current, atInterval.through, atInterval.interval) } for _, alongRange := range scenario.in.alongRange { - requestBuilder.GetMetricRange(fingerprint, alongRange.from, alongRange.through) + requestBuilder.GetMetricRange(fingerprint, alongRange.current, alongRange.through) } v, err := tiered.MakeView(requestBuilder, time.Second*5, stats.NewTimerGroup()) @@ -374,7 +374,7 @@ func testMakeView(t test.Tester, flushToDisk bool) { } for j, atTime := range scenario.in.atTime { - actual := v.GetValueAtTime(fingerprint, atTime.time) + actual := v.GetValueAtTime(fingerprint, atTime.current) if len(actual) != len(scenario.out.atTime[j]) { t.Fatalf("%d.%d. expected %d output, got %d", i, j, len(scenario.out.atTime[j]), len(actual)) diff --git a/storage/metric/view.go b/storage/metric/view.go index 6311991edf..5b1c51b707 100644 --- a/storage/metric/view.go +++ b/storage/metric/view.go @@ -14,7 +14,7 @@ package metric import ( - "sort" + "container/heap" "time" clientmodel "github.com/prometheus/client_golang/model" @@ -28,110 +28,87 @@ var ( ) // ViewRequestBuilder represents the summation of all datastore queries that -// shall be performed to extract values. Each operation mutates the state of -// the builder. +// shall be performed to extract values. Call the Get... methods to record the +// queries. Once done, use HasOp and PopOp to retrieve the resulting +// operations. The operations are sorted by their fingerprint (and, for equal +// fingerprints, by the StartsAt timestamp of their operation). type ViewRequestBuilder interface { + // GetMetricAtTime records a query to get, for the given Fingerprint, + // either the value at that time if there is a match or the one or two + // values adjacent thereto. GetMetricAtTime(fingerprint *clientmodel.Fingerprint, time clientmodel.Timestamp) + // GetMetricAtInterval records a query to get, for the given + // Fingerprint, either the value at that interval from From through + // Through if there is a match or the one or two values adjacent for + // each point. GetMetricAtInterval(fingerprint *clientmodel.Fingerprint, from, through clientmodel.Timestamp, interval time.Duration) + // GetMetricRange records a query to get, for the given Fingerprint, the + // values that occur inclusively from From through Through. GetMetricRange(fingerprint *clientmodel.Fingerprint, from, through clientmodel.Timestamp) - ScanJobs() scanJobs + // GetMetricRangeAtInterval records a query to get value ranges at + // intervals for the given Fingerprint: + // + // |----| |----| |----| |----| + // ^ ^ ^ ^ ^ ^ + // | \------------/ \----/ | + // from interval rangeDuration through + GetMetricRangeAtInterval(fp *clientmodel.Fingerprint, from, through clientmodel.Timestamp, interval, rangeDuration time.Duration) + // PopOp emits the next operation in the queue (sorted by + // fingerprint). If called while HasOps returns false, the + // behavior is undefined. + PopOp() op + // HasOp returns true if there is at least one more operation in the + // queue. + HasOp() bool } -// viewRequestBuilder contains the various unoptimized requests for data. +// viewRequestBuilder contains the various requests for data. type viewRequestBuilder struct { - operations map[clientmodel.Fingerprint]ops + operations ops } // NewViewRequestBuilder furnishes a ViewRequestBuilder for remarking what types // of queries to perform. func NewViewRequestBuilder() *viewRequestBuilder { - return &viewRequestBuilder{ - operations: make(map[clientmodel.Fingerprint]ops), - } + return &viewRequestBuilder{} } var getValuesAtTimes = newValueAtTimeList(10 * 1024) -// GetMetricAtTime gets for the given Fingerprint either the value at that time -// if there is an match or the one or two values adjacent thereto. -func (v *viewRequestBuilder) GetMetricAtTime(fingerprint *clientmodel.Fingerprint, time clientmodel.Timestamp) { - ops := v.operations[*fingerprint] - op, _ := getValuesAtTimes.Get() - op.time = time - ops = append(ops, op) - v.operations[*fingerprint] = ops +// GetMetricAtTime implements ViewRequestBuilder. +func (v *viewRequestBuilder) GetMetricAtTime(fp *clientmodel.Fingerprint, time clientmodel.Timestamp) { + heap.Push(&v.operations, getValuesAtTimes.Get(fp, time)) } var getValuesAtIntervals = newValueAtIntervalList(10 * 1024) -// GetMetricAtInterval gets for the given Fingerprint either the value at that -// interval from From through Through if there is an match or the one or two -// values adjacent for each point. -func (v *viewRequestBuilder) GetMetricAtInterval(fingerprint *clientmodel.Fingerprint, from, through clientmodel.Timestamp, interval time.Duration) { - ops := v.operations[*fingerprint] - op, _ := getValuesAtIntervals.Get() - op.from = from - op.through = through - op.interval = interval - ops = append(ops, op) - v.operations[*fingerprint] = ops +// GetMetricAtInterval implements ViewRequestBuilder. +func (v *viewRequestBuilder) GetMetricAtInterval(fp *clientmodel.Fingerprint, from, through clientmodel.Timestamp, interval time.Duration) { + heap.Push(&v.operations, getValuesAtIntervals.Get(fp, from, through, interval)) } var getValuesAlongRanges = newValueAlongRangeList(10 * 1024) -// GetMetricRange gets for the given Fingerprint the values that occur -// inclusively from From through Through. -func (v *viewRequestBuilder) GetMetricRange(fingerprint *clientmodel.Fingerprint, from, through clientmodel.Timestamp) { - ops := v.operations[*fingerprint] - op, _ := getValuesAlongRanges.Get() - op.from = from - op.through = through - ops = append(ops, op) - v.operations[*fingerprint] = ops +// GetMetricRange implements ViewRequestBuilder. +func (v *viewRequestBuilder) GetMetricRange(fp *clientmodel.Fingerprint, from, through clientmodel.Timestamp) { + heap.Push(&v.operations, getValuesAlongRanges.Get(fp, from, through)) } var getValuesAtIntervalAlongRanges = newValueAtIntervalAlongRangeList(10 * 1024) -// GetMetricRangeAtInterval gets value ranges at intervals for the given -// Fingerprint: -// -// |----| |----| |----| |----| -// ^ ^ ^ ^ ^ ^ -// | \------------/ \----/ | -// from interval rangeDuration through -func (v *viewRequestBuilder) GetMetricRangeAtInterval(fingerprint *clientmodel.Fingerprint, from, through clientmodel.Timestamp, interval, rangeDuration time.Duration) { - ops := v.operations[*fingerprint] - op, _ := getValuesAtIntervalAlongRanges.Get() - op.rangeFrom = from - op.rangeThrough = from.Add(rangeDuration) - op.rangeDuration = rangeDuration - op.interval = interval - op.through = through - ops = append(ops, op) - v.operations[*fingerprint] = ops +// GetMetricRangeAtInterval implements ViewRequestBuilder. +func (v *viewRequestBuilder) GetMetricRangeAtInterval(fp *clientmodel.Fingerprint, from, through clientmodel.Timestamp, interval, rangeDuration time.Duration) { + heap.Push(&v.operations, getValuesAtIntervalAlongRanges.Get(fp, from, through, interval, rangeDuration)) } -// ScanJobs emits the optimized scans that will occur in the data store. This -// effectively resets the ViewRequestBuilder back to a pristine state. -func (v *viewRequestBuilder) ScanJobs() (j scanJobs) { - for fingerprint, operations := range v.operations { - sort.Sort(startsAtSort{operations}) +// PopOp implements ViewRequestBuilder. +func (v *viewRequestBuilder) PopOp() op { + return heap.Pop(&v.operations).(op) +} - fpCopy := fingerprint - j = append(j, scanJob{ - fingerprint: &fpCopy, - // BUG: Evaluate whether we need to implement an optimize() working also - // for getValueRangeAtIntervalOp and use it here instead of just passing - // through the list of ops as-is. - operations: operations, - }) - - delete(v.operations, fingerprint) - } - - sort.Sort(j) - - return +// HasOp implements ViewRequestBuilder. +func (v *viewRequestBuilder) HasOp() bool { + return v.operations.Len() > 0 } type view struct { diff --git a/storage/metric/view_test.go b/storage/metric/view_test.go index 60458e9dd8..0aca240a68 100644 --- a/storage/metric/view_test.go +++ b/storage/metric/view_test.go @@ -105,6 +105,12 @@ func testBuilder(t test.Tester) { { fingerprint: "00000000000000000000-a-4-a", }, + { + fingerprint: "00000000000000000000-a-4-a", + }, + { + fingerprint: "00000000000000001111-a-4-a", + }, { fingerprint: "00000000000000001111-a-4-a", }, @@ -136,14 +142,18 @@ func testBuilder(t test.Tester) { { fingerprint: "00000000000000001111-a-4-a", }, + { + fingerprint: "00000000000000001111-a-4-a", + }, + { + fingerprint: "00000000000000001111-a-4-a", + }, }, }, } for i, scenario := range scenarios { - builder := viewRequestBuilder{ - operations: map[clientmodel.Fingerprint]ops{}, - } + builder := NewViewRequestBuilder() for _, atTime := range scenario.in.atTimes { fingerprint := &clientmodel.Fingerprint{} @@ -163,17 +173,15 @@ func testBuilder(t test.Tester) { builder.GetMetricRange(fingerprint, atRange.from, atRange.through) } - jobs := builder.ScanJobs() - - if len(scenario.out) != len(jobs) { - t.Fatalf("%d. expected job length of %d, got %d", i, len(scenario.out), len(jobs)) - } - for j, job := range scenario.out { - if jobs[j].fingerprint.String() != job.fingerprint { - t.Fatalf("%d.%d. expected fingerprint %s, got %s", i, j, job.fingerprint, jobs[j].fingerprint) + got := builder.PopOp() + if got.Fingerprint().String() != job.fingerprint { + t.Errorf("%d.%d. expected fingerprint %s, got %s", i, j, job.fingerprint, got.Fingerprint()) } } + if builder.HasOp() { + t.Error("Expected builder to have no scan jobs left.") + } } }