diff --git a/promql/engine.go b/promql/engine.go index 06a26e377b..8a0aa23f69 100644 --- a/promql/engine.go +++ b/promql/engine.go @@ -2773,15 +2773,19 @@ func vectorElemBinop(op parser.ItemType, lhs, rhs float64, hlhs, hrhs *histogram } type groupedAggregation struct { + floatValue float64 + histogramValue *histogram.FloatHistogram + floatMean float64 + floatKahanC float64 // "Compensating value" for Kahan summation. + groupCount float64 + heap vectorByValueHeap + + // All bools together for better packing within the struct. seen bool // Was this output groups seen in the input at this timestamp. hasFloat bool // Has at least 1 float64 sample aggregated. hasHistogram bool // Has at least 1 histogram sample aggregated. - floatValue float64 - histogramValue *histogram.FloatHistogram - floatMean float64 // Mean, or "compensating value" for Kahan summation. - groupCount float64 groupAggrComplete bool // Used by LIMITK to short-cut series loop when we've reached K elem on every group. - heap vectorByValueHeap + incrementalMean bool // True after reverting to incremental calculation of the mean value. } // aggregation evaluates sum, avg, count, stdvar, stddev or quantile at one timestep on inputMatrix. @@ -2807,13 +2811,11 @@ func (ev *evaluator) aggregation(e *parser.AggregateExpr, q float64, inputMatrix *group = groupedAggregation{ seen: true, floatValue: f, + floatMean: f, groupCount: 1, } switch op { - case parser.AVG: - group.floatMean = f - fallthrough - case parser.SUM: + case parser.AVG, parser.SUM: if h == nil { group.hasFloat = true } else { @@ -2821,7 +2823,6 @@ func (ev *evaluator) aggregation(e *parser.AggregateExpr, q float64, inputMatrix group.hasHistogram = true } case parser.STDVAR, parser.STDDEV: - group.floatMean = f group.floatValue = 0 case parser.QUANTILE: group.heap = make(vectorByValueHeap, 1) @@ -2847,7 +2848,7 @@ func (ev *evaluator) aggregation(e *parser.AggregateExpr, q float64, inputMatrix // point in copying the histogram in that case. } else { group.hasFloat = true - group.floatValue, group.floatMean = kahanSumInc(f, group.floatValue, group.floatMean) + group.floatValue, group.floatKahanC = kahanSumInc(f, group.floatValue, group.floatKahanC) } case parser.AVG: @@ -2871,6 +2872,22 @@ func (ev *evaluator) aggregation(e *parser.AggregateExpr, q float64, inputMatrix // point in copying the histogram in that case. } else { group.hasFloat = true + if !group.incrementalMean { + newV, newC := kahanSumInc(f, group.floatValue, group.floatKahanC) + if !math.IsInf(newV, 0) { + // The sum doesn't overflow, so we propagate it to the + // group struct and continue with the regular + // calculation of the mean value. + group.floatValue, group.floatKahanC = newV, newC + break + } + // If we are here, we know that the sum _would_ overflow. So + // instead of continue to sum up, we revert to incremental + // calculation of the mean value from here on. + group.incrementalMean = true + group.floatMean = group.floatValue / (group.groupCount - 1) + group.floatKahanC /= group.groupCount - 1 + } if math.IsInf(group.floatMean, 0) { if math.IsInf(f, 0) && (group.floatMean > 0) == (f > 0) { // The `floatMean` and `s.F` values are `Inf` of the same sign. They @@ -2888,8 +2905,13 @@ func (ev *evaluator) aggregation(e *parser.AggregateExpr, q float64, inputMatrix break } } - // Divide each side of the `-` by `group.groupCount` to avoid float64 overflows. - group.floatMean += f/group.groupCount - group.floatMean/group.groupCount + currentMean := group.floatMean + group.floatKahanC + group.floatMean, group.floatKahanC = kahanSumInc( + // Divide each side of the `-` by `group.groupCount` to avoid float64 overflows. + f/group.groupCount-currentMean/group.groupCount, + group.floatMean, + group.floatKahanC, + ) } case parser.GROUP: @@ -2938,10 +2960,13 @@ func (ev *evaluator) aggregation(e *parser.AggregateExpr, q float64, inputMatrix annos.Add(annotations.NewMixedFloatsHistogramsAggWarning(e.Expr.PositionRange())) continue } - if aggr.hasHistogram { + switch { + case aggr.hasHistogram: aggr.histogramValue = aggr.histogramValue.Compact(0) - } else { - aggr.floatValue = aggr.floatMean + case aggr.incrementalMean: + aggr.floatValue = aggr.floatMean + aggr.floatKahanC + default: + aggr.floatValue = (aggr.floatValue + aggr.floatKahanC) / aggr.groupCount } case parser.COUNT: @@ -2965,7 +2990,7 @@ func (ev *evaluator) aggregation(e *parser.AggregateExpr, q float64, inputMatrix if aggr.hasHistogram { aggr.histogramValue.Compact(0) } else { - aggr.floatValue += aggr.floatMean // Add Kahan summation compensating term. + aggr.floatValue += aggr.floatKahanC } default: // For other aggregations, we already have the right value. diff --git a/promql/promqltest/testdata/aggregators.test b/promql/promqltest/testdata/aggregators.test index cbb255a122..68d2e735b3 100644 --- a/promql/promqltest/testdata/aggregators.test +++ b/promql/promqltest/testdata/aggregators.test @@ -503,7 +503,7 @@ eval instant at 1m avg(data{test="-big"}) eval instant at 1m avg(data{test="bigzero"}) {} 0 -# Test summing extreme values. +# Test summing and averaging extreme values. clear load 10s @@ -529,21 +529,39 @@ load 10s eval instant at 1m sum(data{test="ten"}) {} 10 +eval instant at 1m avg(data{test="ten"}) + {} 2.5 + eval instant at 1m sum by (group) (data{test="pos_inf"}) {group="1"} Inf {group="2"} Inf +eval instant at 1m avg by (group) (data{test="pos_inf"}) + {group="1"} Inf + {group="2"} Inf + eval instant at 1m sum by (group) (data{test="neg_inf"}) {group="1"} -Inf {group="2"} -Inf +eval instant at 1m avg by (group) (data{test="neg_inf"}) + {group="1"} -Inf + {group="2"} -Inf + eval instant at 1m sum(data{test="inf_inf"}) {} NaN +eval instant at 1m avg(data{test="inf_inf"}) + {} NaN + eval instant at 1m sum by (group) (data{test="nan"}) {group="1"} NaN {group="2"} NaN +eval instant at 1m avg by (group) (data{test="nan"}) + {group="1"} NaN + {group="2"} NaN + clear # Test that aggregations are deterministic.