Improve predict_linear

Fixes https://github.com/prometheus/prometheus/issues/1401

This remove the last (and in fact bogus) use of BoundaryValues.

Thus, a whole lot of unused (and arguably sub-optimal / ugly) code can
be removed here, too.
This commit is contained in:
beorn7 2016-02-24 17:16:24 +01:00
parent 4b503ed9a5
commit c740789ce3
8 changed files with 66 additions and 196 deletions

View file

@ -575,15 +575,6 @@ func (ev *evaluator) evalMatrix(e Expr) matrix {
return mat
}
// evalMatrixBounds attempts to evaluate e to matrix boundaries and errors otherwise.
func (ev *evaluator) evalMatrixBounds(e Expr) matrix {
ms, ok := e.(*MatrixSelector)
if !ok {
ev.errorf("matrix bounds can only be evaluated for matrix selectors, got %T", e)
}
return ev.matrixSelectorBounds(ms)
}
// evalString attempts to evaluate e to a string value and errors otherwise.
func (ev *evaluator) evalString(e Expr) *model.String {
val := ev.eval(e)
@ -731,29 +722,6 @@ func (ev *evaluator) matrixSelector(node *MatrixSelector) matrix {
return matrix(sampleStreams)
}
// matrixSelectorBounds evaluates the boundaries of a *MatrixSelector.
func (ev *evaluator) matrixSelectorBounds(node *MatrixSelector) matrix {
interval := metric.Interval{
OldestInclusive: ev.Timestamp.Add(-node.Range - node.Offset),
NewestInclusive: ev.Timestamp.Add(-node.Offset),
}
sampleStreams := make([]*sampleStream, 0, len(node.iterators))
for fp, it := range node.iterators {
samplePairs := it.BoundaryValues(interval)
if len(samplePairs) == 0 {
continue
}
ss := &sampleStream{
Metric: node.metrics[fp],
Values: samplePairs,
}
sampleStreams = append(sampleStreams, ss)
}
return matrix(sampleStreams)
}
func (ev *evaluator) vectorAnd(lhs, rhs vector, matching *VectorMatching) vector {
if matching.Card != CardManyToMany {
panic("logical operations must always be many-to-many matching")

View file

@ -524,10 +524,37 @@ func funcLog10(ev *evaluator, args Expressions) model.Value {
return vector
}
// linearRegression performs a least-square linear regression analysis on the
// provided SamplePairs. It returns the slope, and the intercept value at the
// provided time.
func linearRegression(samples []model.SamplePair, interceptTime model.Time) (slope, intercept model.SampleValue) {
var (
n model.SampleValue
sumX, sumY model.SampleValue
sumXY, sumX2 model.SampleValue
)
for _, sample := range samples {
x := model.SampleValue(
model.Time(sample.Timestamp-interceptTime).UnixNano(),
) / 1e9
n += 1.0
sumY += sample.Value
sumX += x
sumXY += x * sample.Value
sumX2 += x * x
}
covXY := sumXY - sumX*sumY/n
varX := sumX2 - sumX*sumX/n
slope = covXY / varX
intercept = sumY/n - slope*sumX/n
return
}
// === deriv(node model.ValMatrix) Vector ===
func funcDeriv(ev *evaluator, args Expressions) model.Value {
resultVector := vector{}
mat := ev.evalMatrix(args[0])
resultVector := make(vector, 0, len(mat))
for _, samples := range mat {
// No sense in trying to compute a derivative without at least two points.
@ -535,29 +562,10 @@ func funcDeriv(ev *evaluator, args Expressions) model.Value {
if len(samples.Values) < 2 {
continue
}
// Least squares.
var (
n model.SampleValue
sumX, sumY model.SampleValue
sumXY, sumX2 model.SampleValue
)
for _, sample := range samples.Values {
x := model.SampleValue(sample.Timestamp.UnixNano() / 1e9)
n += 1.0
sumY += sample.Value
sumX += x
sumXY += x * sample.Value
sumX2 += x * x
}
numerator := sumXY - sumX*sumY/n
denominator := sumX2 - (sumX*sumX)/n
resultValue := numerator / denominator
slope, _ := linearRegression(samples.Values, 0)
resultSample := &sample{
Metric: samples.Metric,
Value: resultValue,
Value: slope,
Timestamp: ev.Timestamp,
}
resultSample.Metric.Del(model.MetricNameLabel)
@ -568,44 +576,26 @@ func funcDeriv(ev *evaluator, args Expressions) model.Value {
// === predict_linear(node model.ValMatrix, k model.ValScalar) Vector ===
func funcPredictLinear(ev *evaluator, args Expressions) model.Value {
vec := funcDeriv(ev, args[0:1]).(vector)
duration := model.SampleValue(model.SampleValue(ev.evalFloat(args[1])))
mat := ev.evalMatrix(args[0])
resultVector := make(vector, 0, len(mat))
duration := model.SampleValue(ev.evalFloat(args[1]))
excludedLabels := map[model.LabelName]struct{}{
model.MetricNameLabel: {},
}
// Calculate predicted delta over the duration.
signatureToDelta := map[uint64]model.SampleValue{}
for _, el := range vec {
signature := model.SignatureWithoutLabels(el.Metric.Metric, excludedLabels)
signatureToDelta[signature] = el.Value * duration
}
// add predicted delta to last value.
// TODO(beorn7): This is arguably suboptimal. The funcDeriv above has
// given us an estimate over the range. So we should add the delta to
// the value predicted for the end of the range. Also, once this has
// been rectified, we are not using BoundaryValues anywhere anymore, so
// we can kick out a whole lot of code.
matrixBounds := ev.evalMatrixBounds(args[0])
outVec := make(vector, 0, len(signatureToDelta))
for _, samples := range matrixBounds {
for _, samples := range mat {
// No sense in trying to predict anything without at least two points.
// Drop this vector element.
if len(samples.Values) < 2 {
continue
}
signature := model.SignatureWithoutLabels(samples.Metric.Metric, excludedLabels)
delta, ok := signatureToDelta[signature]
if ok {
samples.Metric.Del(model.MetricNameLabel)
outVec = append(outVec, &sample{
Metric: samples.Metric,
Value: delta + samples.Values[1].Value,
Timestamp: ev.Timestamp,
})
slope, intercept := linearRegression(samples.Values, ev.Timestamp)
resultSample := &sample{
Metric: samples.Metric,
Value: slope*duration + intercept,
Timestamp: ev.Timestamp,
}
resultSample.Metric.Del(model.MetricNameLabel)
resultVector = append(resultVector, resultSample)
}
return outVec
return resultVector
}
// === histogram_quantile(k model.ValScalar, vector model.ValVector) Vector ===

View file

@ -102,16 +102,28 @@ eval instant at 50m deriv(testcounter_reset_middle[100m])
{} 0.010606060606060607
# predict_linear should return correct result.
# X/s = [ 0, 300, 600, 900,1200,1500,1800,2100,2400,2700,3000]
# Y = [ 0, 10, 20, 30, 40, 0, 10, 20, 30, 40, 50]
# sumX = 16500
# sumY = 250
# sumXY = 480000
# sumX2 = 34650000
# n = 11
# covXY = 105000
# varX = 9900000
# slope = 0.010606060606060607
# intercept at t=0: 6.818181818181818
# intercept at t=3000: 38.63636363636364
# intercept at t=3000+3600: 76.81818181818181
eval instant at 50m predict_linear(testcounter_reset_middle[100m], 3600)
{} 88.181818181818185200
{} 76.81818181818181
# predict_linear is syntactic sugar around deriv.
# With http_requests, there is a sample value exactly at the end of
# the range, and it has exactly the predicted value, so predict_linear
# can be emulated with deriv.
eval instant at 50m predict_linear(http_requests[50m], 3600) - (http_requests + deriv(http_requests[50m]) * 3600)
{group="canary", instance="1", job="app-server"} 0
eval instant at 50m predict_linear(testcounter_reset_middle[100m], 3600) - (testcounter_reset_middle + deriv(testcounter_reset_middle[100m]) * 3600)
{} 0
clear
# Tests for label_replace.

View file

@ -79,9 +79,6 @@ type SeriesIterator interface {
// exist at precisely the given time, that value is returned. If no
// applicable value exists, ZeroSamplePair is returned.
ValueAtOrBeforeTime(model.Time) model.SamplePair
// Gets the boundary values of an interval: the first and last value
// within a given interval.
BoundaryValues(metric.Interval) []model.SamplePair
// Gets all values contained within a given interval.
RangeValues(metric.Interval) []model.SamplePair
}

View file

@ -540,71 +540,6 @@ func (it *memorySeriesIterator) ValueAtOrBeforeTime(t model.Time) model.SamplePa
return it.chunkIt.valueAtOrBeforeTime(t)
}
// BoundaryValues implements SeriesIterator.
func (it *memorySeriesIterator) BoundaryValues(in metric.Interval) []model.SamplePair {
// Find the first chunk for which the first sample is within the interval.
i := sort.Search(len(it.chunks), func(i int) bool {
return !it.chunks[i].firstTime().Before(in.OldestInclusive)
})
// Only now check the last timestamp of the previous chunk (which is
// fairly expensive).
if i > 0 && !it.chunkIterator(i-1).lastTimestamp().Before(in.OldestInclusive) {
i--
}
values := make([]model.SamplePair, 0, 2)
for j, c := range it.chunks[i:] {
if c.firstTime().After(in.NewestInclusive) {
if len(values) == 1 {
// We found the first value before but are now
// already past the last value. The value we
// want must be the last value of the previous
// chunk. So backtrack...
chunkIt := it.chunkIterator(i + j - 1)
values = append(values, model.SamplePair{
Timestamp: chunkIt.lastTimestamp(),
Value: chunkIt.lastSampleValue(),
})
}
break
}
chunkIt := it.chunkIterator(i + j)
if len(values) == 0 {
for s := range chunkIt.values() {
if len(values) == 0 && !s.Timestamp.Before(in.OldestInclusive) {
values = append(values, *s)
// We cannot just break out here as we have to consume all
// the values to not leak a goroutine. This could obviously
// be made much neater with more suitable methods in the chunk
// interface. But currently, BoundaryValues is only used by
// `predict_linear` so we would pollute the chunk interface
// unduly just for one single corner case. Plus, even that use
// of BoundaryValues is suboptimal and should be replaced.
}
}
}
if chunkIt.lastTimestamp().After(in.NewestInclusive) {
s := chunkIt.valueAtOrBeforeTime(in.NewestInclusive)
if s.Timestamp != model.Earliest {
values = append(values, s)
}
break
}
}
if len(values) == 1 {
// We found exactly one value. In that case, add the most recent we know.
chunkIt := it.chunkIterator(len(it.chunks) - 1)
values = append(values, model.SamplePair{
Timestamp: chunkIt.lastTimestamp(),
Value: chunkIt.lastSampleValue(),
})
}
if len(values) == 2 && values[0].Equal(&values[1]) {
return values[:1]
}
return values
}
// RangeValues implements SeriesIterator.
func (it *memorySeriesIterator) RangeValues(in metric.Interval) []model.SamplePair {
// Find the first chunk for which the first sample is within the interval.
@ -653,11 +588,6 @@ func (it *singleSampleSeriesIterator) ValueAtOrBeforeTime(t model.Time) model.Sa
return it.samplePair
}
// BoundaryValues implements SeriesIterator.
func (it *singleSampleSeriesIterator) BoundaryValues(in metric.Interval) []model.SamplePair {
return it.RangeValues(in)
}
// RangeValues implements SeriesIterator.
func (it *singleSampleSeriesIterator) RangeValues(in metric.Interval) []model.SamplePair {
if it.samplePair.Timestamp.After(in.NewestInclusive) ||
@ -675,11 +605,6 @@ func (i nopSeriesIterator) ValueAtOrBeforeTime(t model.Time) model.SamplePair {
return ZeroSamplePair
}
// BoundaryValues implements SeriesIterator.
func (i nopSeriesIterator) BoundaryValues(in metric.Interval) []model.SamplePair {
return []model.SamplePair{}
}
// RangeValues implements SeriesIterator.
func (i nopSeriesIterator) RangeValues(in metric.Interval) []model.SamplePair {
return []model.SamplePair{}

View file

@ -374,17 +374,6 @@ func (bit *boundedIterator) ValueAtOrBeforeTime(ts model.Time) model.SamplePair
return bit.it.ValueAtOrBeforeTime(ts)
}
// BoundaryValues implements the SeriesIterator interface.
func (bit *boundedIterator) BoundaryValues(interval metric.Interval) []model.SamplePair {
if interval.NewestInclusive < bit.start {
return []model.SamplePair{}
}
if interval.OldestInclusive < bit.start {
interval.OldestInclusive = bit.start
}
return bit.it.BoundaryValues(interval)
}
// RangeValues implements the SeriesIterator interface.
func (bit *boundedIterator) RangeValues(interval metric.Interval) []model.SamplePair {
if interval.NewestInclusive < bit.start {

View file

@ -424,14 +424,6 @@ func TestRetentionCutoff(t *testing.T) {
if expt := now.Add(-1 * time.Hour).Add(time.Minute); vals[0].Timestamp != expt {
t.Errorf("unexpected timestamp for first sample: %v, expected %v", vals[0].Timestamp.Time(), expt.Time())
}
vals = it.BoundaryValues(metric.Interval{OldestInclusive: insertStart, NewestInclusive: now})
if len(vals) != 2 {
t.Errorf("expected 2 values but got %d", len(vals))
}
if expt := now.Add(-1 * time.Hour).Add(time.Minute); vals[0].Timestamp != expt {
t.Errorf("unexpected timestamp for first sample: %v, expected %v", vals[0].Timestamp.Time(), expt.Time())
}
}
func TestDropMetrics(t *testing.T) {
@ -1036,18 +1028,18 @@ func testEvictAndPurgeSeries(t *testing.T, encoding chunkEncoding) {
if err != nil {
t.Fatalf("Error preloading everything: %s", err)
}
actual := it.BoundaryValues(metric.Interval{
actual := it.RangeValues(metric.Interval{
OldestInclusive: 0,
NewestInclusive: 100000,
})
if len(actual) != 2 {
t.Fatal("expected two results after purging half of series")
if len(actual) < 4000 {
t.Fatalf("expected more than %d results after purging half of series, got %d", 4000, len(actual))
}
if actual[0].Timestamp < 6000 || actual[0].Timestamp > 10000 {
t.Errorf("1st timestamp out of expected range: %v", actual[0].Timestamp)
}
want := model.Time(19998)
if actual[1].Timestamp != want {
if actual[len(actual)-1].Timestamp != want {
t.Errorf("2nd timestamp: want %v, got %v", want, actual[1].Timestamp)
}
@ -1057,7 +1049,7 @@ func testEvictAndPurgeSeries(t *testing.T, encoding chunkEncoding) {
if err != nil {
t.Fatalf("Error preloading everything: %s", err)
}
actual = it.BoundaryValues(metric.Interval{
actual = it.RangeValues(metric.Interval{
OldestInclusive: 0,
NewestInclusive: 100000,
})

View file

@ -29,7 +29,6 @@ const (
ResultAppendTime
QueryAnalysisTime
GetValueAtTimeTime
GetBoundaryValuesTime
GetRangeValuesTime
ExecQueueTime
ViewDiskPreparationTime
@ -60,8 +59,6 @@ func (s QueryTiming) String() string {
return "Query analysis time"
case GetValueAtTimeTime:
return "GetValueAtTime() time"
case GetBoundaryValuesTime:
return "GetBoundaryValues() time"
case GetRangeValuesTime:
return "GetRangeValues() time"
case ExecQueueTime: