promql: simplify data collection in aggregations

We don't need a Sample, just the float and histogram values.

Signed-off-by: Bryan Boreham <bjboreham@gmail.com>
This commit is contained in:
Bryan Boreham 2024-03-04 21:05:00 +00:00
parent 2f03acbafc
commit 526ce4ee7a

View file

@ -2740,7 +2740,7 @@ func (ev *evaluator) aggregation(e *parser.AggregateExpr, q float64, inputMatrix
} }
for si := range inputMatrix { for si := range inputMatrix {
s, ok := ev.nextSample(enh.Ts, inputMatrix, si) f, h, ok := ev.nextValues(enh.Ts, &inputMatrix[si])
if !ok { if !ok {
continue continue
} }
@ -2749,18 +2749,18 @@ func (ev *evaluator) aggregation(e *parser.AggregateExpr, q float64, inputMatrix
// Initialize this group if it's the first time we've seen it. // Initialize this group if it's the first time we've seen it.
if !seen[seriesToResult[si]] { if !seen[seriesToResult[si]] {
*group = groupedAggregation{ *group = groupedAggregation{
floatValue: s.F, floatValue: f,
floatMean: s.F, floatMean: f,
groupCount: 1, groupCount: 1,
} }
switch { switch {
case s.H == nil: case h == nil:
group.hasFloat = true group.hasFloat = true
case op == parser.SUM: case op == parser.SUM:
group.histogramValue = s.H.Copy() group.histogramValue = h.Copy()
group.hasHistogram = true group.hasHistogram = true
case op == parser.AVG: case op == parser.AVG:
group.histogramMean = s.H.Copy() group.histogramMean = h.Copy()
group.hasHistogram = true group.hasHistogram = true
case op == parser.STDVAR || op == parser.STDDEV: case op == parser.STDVAR || op == parser.STDDEV:
group.groupCount = 0 group.groupCount = 0
@ -2771,10 +2771,7 @@ func (ev *evaluator) aggregation(e *parser.AggregateExpr, q float64, inputMatrix
group.floatValue = 0 group.floatValue = 0
case parser.QUANTILE: case parser.QUANTILE:
group.heap = make(vectorByValueHeap, 1) group.heap = make(vectorByValueHeap, 1)
group.heap[0] = Sample{ group.heap[0] = Sample{F: f}
F: s.F,
Metric: s.Metric,
}
case parser.GROUP: case parser.GROUP:
group.floatValue = 1 group.floatValue = 1
} }
@ -2784,25 +2781,25 @@ func (ev *evaluator) aggregation(e *parser.AggregateExpr, q float64, inputMatrix
switch op { switch op {
case parser.SUM: case parser.SUM:
if s.H != nil { if h != nil {
group.hasHistogram = true group.hasHistogram = true
if group.histogramValue != nil { if group.histogramValue != nil {
group.histogramValue.Add(s.H) group.histogramValue.Add(h)
} }
// Otherwise the aggregation contained floats // Otherwise the aggregation contained floats
// previously and will be invalid anyway. No // previously and will be invalid anyway. No
// point in copying the histogram in that case. // point in copying the histogram in that case.
} else { } else {
group.hasFloat = true group.hasFloat = true
group.floatValue += s.F group.floatValue += f
} }
case parser.AVG: case parser.AVG:
group.groupCount++ group.groupCount++
if s.H != nil { if h != nil {
group.hasHistogram = true group.hasHistogram = true
if group.histogramMean != nil { if group.histogramMean != nil {
left := s.H.Copy().Div(float64(group.groupCount)) left := h.Copy().Div(float64(group.groupCount))
right := group.histogramMean.Copy().Div(float64(group.groupCount)) right := group.histogramMean.Copy().Div(float64(group.groupCount))
toAdd := left.Sub(right) toAdd := left.Sub(right)
group.histogramMean.Add(toAdd) group.histogramMean.Add(toAdd)
@ -2813,13 +2810,13 @@ func (ev *evaluator) aggregation(e *parser.AggregateExpr, q float64, inputMatrix
} else { } else {
group.hasFloat = true group.hasFloat = true
if math.IsInf(group.floatMean, 0) { if math.IsInf(group.floatMean, 0) {
if math.IsInf(s.F, 0) && (group.floatMean > 0) == (s.F > 0) { if math.IsInf(f, 0) && (group.floatMean > 0) == (f > 0) {
// The `floatMean` and `s.F` values are `Inf` of the same sign. They // The `floatMean` and `s.F` values are `Inf` of the same sign. They
// can't be subtracted, but the value of `floatMean` is correct // can't be subtracted, but the value of `floatMean` is correct
// already. // already.
break break
} }
if !math.IsInf(s.F, 0) && !math.IsNaN(s.F) { if !math.IsInf(f, 0) && !math.IsNaN(f) {
// At this stage, the mean is an infinite. If the added // At this stage, the mean is an infinite. If the added
// value is neither an Inf or a Nan, we can keep that mean // value is neither an Inf or a Nan, we can keep that mean
// value. // value.
@ -2830,35 +2827,35 @@ func (ev *evaluator) aggregation(e *parser.AggregateExpr, q float64, inputMatrix
} }
} }
// Divide each side of the `-` by `group.groupCount` to avoid float64 overflows. // Divide each side of the `-` by `group.groupCount` to avoid float64 overflows.
group.floatMean += s.F/float64(group.groupCount) - group.floatMean/float64(group.groupCount) group.floatMean += f/float64(group.groupCount) - group.floatMean/float64(group.groupCount)
} }
case parser.GROUP: case parser.GROUP:
// Do nothing. Required to avoid the panic in `default:` below. // Do nothing. Required to avoid the panic in `default:` below.
case parser.MAX: case parser.MAX:
if group.floatValue < s.F || math.IsNaN(group.floatValue) { if group.floatValue < f || math.IsNaN(group.floatValue) {
group.floatValue = s.F group.floatValue = f
} }
case parser.MIN: case parser.MIN:
if group.floatValue > s.F || math.IsNaN(group.floatValue) { if group.floatValue > f || math.IsNaN(group.floatValue) {
group.floatValue = s.F group.floatValue = f
} }
case parser.COUNT: case parser.COUNT:
group.groupCount++ group.groupCount++
case parser.STDVAR, parser.STDDEV: case parser.STDVAR, parser.STDDEV:
if s.H == nil { // Ignore native histograms. if h == nil { // Ignore native histograms.
group.groupCount++ group.groupCount++
delta := s.F - group.floatMean delta := f - group.floatMean
group.floatMean += delta / float64(group.groupCount) group.floatMean += delta / float64(group.groupCount)
group.floatValue += delta * (s.F - group.floatMean) group.floatValue += delta * (f - group.floatMean)
} }
case parser.QUANTILE: case parser.QUANTILE:
group.heap = append(group.heap, s) group.heap = append(group.heap, Sample{F: f})
default: default:
panic(fmt.Errorf("expected aggregation operator but got %q", op)) panic(fmt.Errorf("expected aggregation operator but got %q", op))