Merge pull request #14413 from prometheus/beorn7/promql
Some checks are pending
buf.build / lint and publish (push) Waiting to run
CI / Go tests (push) Waiting to run
CI / More Go tests (push) Waiting to run
CI / Go tests with previous Go version (push) Waiting to run
CI / UI tests (push) Waiting to run
CI / Go tests on Windows (push) Waiting to run
CI / Mixins tests (push) Waiting to run
CI / Build Prometheus for common architectures (0) (push) Waiting to run
CI / Build Prometheus for common architectures (1) (push) Waiting to run
CI / Build Prometheus for common architectures (2) (push) Waiting to run
CI / Build Prometheus for all architectures (0) (push) Waiting to run
CI / Build Prometheus for all architectures (1) (push) Waiting to run
CI / Build Prometheus for all architectures (10) (push) Waiting to run
CI / Build Prometheus for all architectures (11) (push) Waiting to run
CI / Build Prometheus for all architectures (2) (push) Waiting to run
CI / Build Prometheus for all architectures (3) (push) Waiting to run
CI / Build Prometheus for all architectures (4) (push) Waiting to run
CI / Build Prometheus for all architectures (5) (push) Waiting to run
CI / Build Prometheus for all architectures (6) (push) Waiting to run
CI / Build Prometheus for all architectures (7) (push) Waiting to run
CI / Build Prometheus for all architectures (8) (push) Waiting to run
CI / Build Prometheus for all architectures (9) (push) Waiting to run
CI / Report status of build Prometheus for all architectures (push) Blocked by required conditions
CI / Check generated parser (push) Waiting to run
CI / golangci-lint (push) Waiting to run
CI / fuzzing (push) Waiting to run
CI / codeql (push) Waiting to run
CI / Publish main branch artifacts (push) Blocked by required conditions
CI / Publish release artefacts (push) Blocked by required conditions
CI / Publish UI on npm Registry (push) Blocked by required conditions
Scorecards supply-chain security / Scorecards analysis (push) Waiting to run

promql: more Kahan summation (avg) and less incremental mean calculation (avg, avg_over_time)
This commit is contained in:
Björn Rabenstein 2024-08-06 19:56:32 +02:00 committed by GitHub
commit ee5bba07c0
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
5 changed files with 99 additions and 32 deletions

View file

@ -165,6 +165,9 @@ func rangeQueryCases() []benchCase {
{ {
expr: "sum(a_X)", expr: "sum(a_X)",
}, },
{
expr: "avg(a_X)",
},
{ {
expr: "sum without (l)(h_X)", expr: "sum without (l)(h_X)",
}, },

View file

@ -2773,15 +2773,19 @@ func vectorElemBinop(op parser.ItemType, lhs, rhs float64, hlhs, hrhs *histogram
} }
type groupedAggregation struct { type groupedAggregation struct {
floatValue float64
histogramValue *histogram.FloatHistogram
floatMean float64
floatKahanC float64 // "Compensating value" for Kahan summation.
groupCount float64
heap vectorByValueHeap
// All bools together for better packing within the struct.
seen bool // Was this output groups seen in the input at this timestamp. seen bool // Was this output groups seen in the input at this timestamp.
hasFloat bool // Has at least 1 float64 sample aggregated. hasFloat bool // Has at least 1 float64 sample aggregated.
hasHistogram bool // Has at least 1 histogram sample aggregated. hasHistogram bool // Has at least 1 histogram sample aggregated.
floatValue float64 groupAggrComplete bool // Used by LIMITK to short-cut series loop when we've reached K elem on every group.
histogramValue *histogram.FloatHistogram incrementalMean bool // True after reverting to incremental calculation of the mean value.
floatMean float64 // Mean, or "compensating value" for Kahan summation.
groupCount int
groupAggrComplete bool // Used by LIMITK to short-cut series loop when we've reached K elem on every group
heap vectorByValueHeap
} }
// aggregation evaluates sum, avg, count, stdvar, stddev or quantile at one timestep on inputMatrix. // aggregation evaluates sum, avg, count, stdvar, stddev or quantile at one timestep on inputMatrix.
@ -2807,13 +2811,11 @@ func (ev *evaluator) aggregation(e *parser.AggregateExpr, q float64, inputMatrix
*group = groupedAggregation{ *group = groupedAggregation{
seen: true, seen: true,
floatValue: f, floatValue: f,
floatMean: f,
groupCount: 1, groupCount: 1,
} }
switch op { switch op {
case parser.AVG: case parser.AVG, parser.SUM:
group.floatMean = f
fallthrough
case parser.SUM:
if h == nil { if h == nil {
group.hasFloat = true group.hasFloat = true
} else { } else {
@ -2821,7 +2823,6 @@ func (ev *evaluator) aggregation(e *parser.AggregateExpr, q float64, inputMatrix
group.hasHistogram = true group.hasHistogram = true
} }
case parser.STDVAR, parser.STDDEV: case parser.STDVAR, parser.STDDEV:
group.floatMean = f
group.floatValue = 0 group.floatValue = 0
case parser.QUANTILE: case parser.QUANTILE:
group.heap = make(vectorByValueHeap, 1) group.heap = make(vectorByValueHeap, 1)
@ -2847,7 +2848,7 @@ func (ev *evaluator) aggregation(e *parser.AggregateExpr, q float64, inputMatrix
// point in copying the histogram in that case. // point in copying the histogram in that case.
} else { } else {
group.hasFloat = true group.hasFloat = true
group.floatValue, group.floatMean = kahanSumInc(f, group.floatValue, group.floatMean) group.floatValue, group.floatKahanC = kahanSumInc(f, group.floatValue, group.floatKahanC)
} }
case parser.AVG: case parser.AVG:
@ -2855,8 +2856,8 @@ func (ev *evaluator) aggregation(e *parser.AggregateExpr, q float64, inputMatrix
if h != nil { if h != nil {
group.hasHistogram = true group.hasHistogram = true
if group.histogramValue != nil { if group.histogramValue != nil {
left := h.Copy().Div(float64(group.groupCount)) left := h.Copy().Div(group.groupCount)
right := group.histogramValue.Copy().Div(float64(group.groupCount)) right := group.histogramValue.Copy().Div(group.groupCount)
toAdd, err := left.Sub(right) toAdd, err := left.Sub(right)
if err != nil { if err != nil {
handleAggregationError(err, e, inputMatrix[si].Metric.Get(model.MetricNameLabel), &annos) handleAggregationError(err, e, inputMatrix[si].Metric.Get(model.MetricNameLabel), &annos)
@ -2871,6 +2872,22 @@ func (ev *evaluator) aggregation(e *parser.AggregateExpr, q float64, inputMatrix
// point in copying the histogram in that case. // point in copying the histogram in that case.
} else { } else {
group.hasFloat = true group.hasFloat = true
if !group.incrementalMean {
newV, newC := kahanSumInc(f, group.floatValue, group.floatKahanC)
if !math.IsInf(newV, 0) {
// The sum doesn't overflow, so we propagate it to the
// group struct and continue with the regular
// calculation of the mean value.
group.floatValue, group.floatKahanC = newV, newC
break
}
// If we are here, we know that the sum _would_ overflow. So
// instead of continue to sum up, we revert to incremental
// calculation of the mean value from here on.
group.incrementalMean = true
group.floatMean = group.floatValue / (group.groupCount - 1)
group.floatKahanC /= group.groupCount - 1
}
if math.IsInf(group.floatMean, 0) { if math.IsInf(group.floatMean, 0) {
if math.IsInf(f, 0) && (group.floatMean > 0) == (f > 0) { if math.IsInf(f, 0) && (group.floatMean > 0) == (f > 0) {
// The `floatMean` and `s.F` values are `Inf` of the same sign. They // The `floatMean` and `s.F` values are `Inf` of the same sign. They
@ -2888,8 +2905,13 @@ func (ev *evaluator) aggregation(e *parser.AggregateExpr, q float64, inputMatrix
break break
} }
} }
currentMean := group.floatMean + group.floatKahanC
group.floatMean, group.floatKahanC = kahanSumInc(
// Divide each side of the `-` by `group.groupCount` to avoid float64 overflows. // Divide each side of the `-` by `group.groupCount` to avoid float64 overflows.
group.floatMean += f/float64(group.groupCount) - group.floatMean/float64(group.groupCount) f/group.groupCount-currentMean/group.groupCount,
group.floatMean,
group.floatKahanC,
)
} }
case parser.GROUP: case parser.GROUP:
@ -2912,7 +2934,7 @@ func (ev *evaluator) aggregation(e *parser.AggregateExpr, q float64, inputMatrix
if h == nil { // Ignore native histograms. if h == nil { // Ignore native histograms.
group.groupCount++ group.groupCount++
delta := f - group.floatMean delta := f - group.floatMean
group.floatMean += delta / float64(group.groupCount) group.floatMean += delta / group.groupCount
group.floatValue += delta * (f - group.floatMean) group.floatValue += delta * (f - group.floatMean)
} }
@ -2938,20 +2960,23 @@ func (ev *evaluator) aggregation(e *parser.AggregateExpr, q float64, inputMatrix
annos.Add(annotations.NewMixedFloatsHistogramsAggWarning(e.Expr.PositionRange())) annos.Add(annotations.NewMixedFloatsHistogramsAggWarning(e.Expr.PositionRange()))
continue continue
} }
if aggr.hasHistogram { switch {
case aggr.hasHistogram:
aggr.histogramValue = aggr.histogramValue.Compact(0) aggr.histogramValue = aggr.histogramValue.Compact(0)
} else { case aggr.incrementalMean:
aggr.floatValue = aggr.floatMean aggr.floatValue = aggr.floatMean + aggr.floatKahanC
default:
aggr.floatValue = (aggr.floatValue + aggr.floatKahanC) / aggr.groupCount
} }
case parser.COUNT: case parser.COUNT:
aggr.floatValue = float64(aggr.groupCount) aggr.floatValue = aggr.groupCount
case parser.STDVAR: case parser.STDVAR:
aggr.floatValue /= float64(aggr.groupCount) aggr.floatValue /= aggr.groupCount
case parser.STDDEV: case parser.STDDEV:
aggr.floatValue = math.Sqrt(aggr.floatValue / float64(aggr.groupCount)) aggr.floatValue = math.Sqrt(aggr.floatValue / aggr.groupCount)
case parser.QUANTILE: case parser.QUANTILE:
aggr.floatValue = quantile(q, aggr.heap) aggr.floatValue = quantile(q, aggr.heap)
@ -2965,7 +2990,7 @@ func (ev *evaluator) aggregation(e *parser.AggregateExpr, q float64, inputMatrix
if aggr.hasHistogram { if aggr.hasHistogram {
aggr.histogramValue.Compact(0) aggr.histogramValue.Compact(0)
} else { } else {
aggr.floatValue += aggr.floatMean // Add Kahan summation compensating term. aggr.floatValue += aggr.floatKahanC
} }
default: default:
// For other aggregations, we already have the right value. // For other aggregations, we already have the right value.

View file

@ -580,9 +580,28 @@ func funcAvgOverTime(vals []parser.Value, args parser.Expressions, enh *EvalNode
return vec, nil return vec, nil
} }
return aggrOverTime(vals, enh, func(s Series) float64 { return aggrOverTime(vals, enh, func(s Series) float64 {
var mean, count, c float64 var (
sum, mean, count, kahanC float64
incrementalMean bool
)
for _, f := range s.Floats { for _, f := range s.Floats {
count++ count++
if !incrementalMean {
newSum, newC := kahanSumInc(f.F, sum, kahanC)
// Perform regular mean calculation as long as
// the sum doesn't overflow and (in any case)
// for the first iteration (even if we start
// with ±Inf) to not run into division-by-zero
// problems below.
if count == 1 || !math.IsInf(newSum, 0) {
sum, kahanC = newSum, newC
continue
}
// Handle overflow by reverting to incremental calculation of the mean value.
incrementalMean = true
mean = sum / (count - 1)
kahanC /= count - 1
}
if math.IsInf(mean, 0) { if math.IsInf(mean, 0) {
if math.IsInf(f.F, 0) && (mean > 0) == (f.F > 0) { if math.IsInf(f.F, 0) && (mean > 0) == (f.F > 0) {
// The `mean` and `f.F` values are `Inf` of the same sign. They // The `mean` and `f.F` values are `Inf` of the same sign. They
@ -600,13 +619,13 @@ func funcAvgOverTime(vals []parser.Value, args parser.Expressions, enh *EvalNode
continue continue
} }
} }
mean, c = kahanSumInc(f.F/count-mean/count, mean, c) correctedMean := mean + kahanC
mean, kahanC = kahanSumInc(f.F/count-correctedMean/count, mean, kahanC)
} }
if incrementalMean {
if math.IsInf(mean, 0) { return mean + kahanC
return mean
} }
return mean + c return (sum + kahanC) / count
}), nil }), nil
} }

View file

@ -503,7 +503,7 @@ eval instant at 1m avg(data{test="-big"})
eval instant at 1m avg(data{test="bigzero"}) eval instant at 1m avg(data{test="bigzero"})
{} 0 {} 0
# Test summing extreme values. # Test summing and averaging extreme values.
clear clear
load 10s load 10s
@ -529,21 +529,39 @@ load 10s
eval instant at 1m sum(data{test="ten"}) eval instant at 1m sum(data{test="ten"})
{} 10 {} 10
eval instant at 1m avg(data{test="ten"})
{} 2.5
eval instant at 1m sum by (group) (data{test="pos_inf"}) eval instant at 1m sum by (group) (data{test="pos_inf"})
{group="1"} Inf {group="1"} Inf
{group="2"} Inf {group="2"} Inf
eval instant at 1m avg by (group) (data{test="pos_inf"})
{group="1"} Inf
{group="2"} Inf
eval instant at 1m sum by (group) (data{test="neg_inf"}) eval instant at 1m sum by (group) (data{test="neg_inf"})
{group="1"} -Inf {group="1"} -Inf
{group="2"} -Inf {group="2"} -Inf
eval instant at 1m avg by (group) (data{test="neg_inf"})
{group="1"} -Inf
{group="2"} -Inf
eval instant at 1m sum(data{test="inf_inf"}) eval instant at 1m sum(data{test="inf_inf"})
{} NaN {} NaN
eval instant at 1m avg(data{test="inf_inf"})
{} NaN
eval instant at 1m sum by (group) (data{test="nan"}) eval instant at 1m sum by (group) (data{test="nan"})
{group="1"} NaN {group="1"} NaN
{group="2"} NaN {group="2"} NaN
eval instant at 1m avg by (group) (data{test="nan"})
{group="1"} NaN
{group="2"} NaN
clear clear
# Test that aggregations are deterministic. # Test that aggregations are deterministic.

View file

@ -748,7 +748,6 @@ eval instant at 1m avg_over_time(metric6c[1m])
eval instant at 1m sum_over_time(metric6c[1m])/count_over_time(metric6c[1m]) eval instant at 1m sum_over_time(metric6c[1m])/count_over_time(metric6c[1m])
{} NaN {} NaN
eval instant at 1m avg_over_time(metric7[1m]) eval instant at 1m avg_over_time(metric7[1m])
{} NaN {} NaN
@ -783,6 +782,9 @@ load 10s
eval instant at 1m sum_over_time(metric[1m]) eval instant at 1m sum_over_time(metric[1m])
{} 2 {} 2
eval instant at 1m avg_over_time(metric[1m])
{} 0.5
# Tests for stddev_over_time and stdvar_over_time. # Tests for stddev_over_time and stdvar_over_time.
clear clear
load 10s load 10s