diff --git a/promql/engine.go b/promql/engine.go index 2f7dcb222e..b8a8ea0959 100644 --- a/promql/engine.go +++ b/promql/engine.go @@ -1067,8 +1067,6 @@ func (ev *evaluator) Eval(expr parser.Expr) (v parser.Value, ws annotations.Anno // EvalSeriesHelper stores extra information about a series. type EvalSeriesHelper struct { - // The grouping key used by aggregation. - groupingKey uint64 // Used to map left-hand to right-hand in binary operations. signature string } @@ -1259,17 +1257,7 @@ func (ev *evaluator) rangeEval(prepSeries func(labels.Labels, *EvalSeriesHelper) } else { ss = seriesAndTimestamp{Series{Metric: sample.Metric}, ts} } - if sample.H == nil { - if ss.Floats == nil { - ss.Floats = getFPointSlice(numSteps) - } - ss.Floats = append(ss.Floats, FPoint{T: ts, F: sample.F}) - } else { - if ss.Histograms == nil { - ss.Histograms = getHPointSlice(numSteps) - } - ss.Histograms = append(ss.Histograms, HPoint{T: ts, H: sample.H}) - } + addToSeries(&ss.Series, enh.Ts, sample.F, sample.H, numSteps) seriess[h] = ss } } @@ -1291,6 +1279,116 @@ func (ev *evaluator) rangeEval(prepSeries func(labels.Labels, *EvalSeriesHelper) return mat, warnings } +func (ev *evaluator) rangeEvalAgg(aggExpr *parser.AggregateExpr, sortedGrouping []string, inputMatrix Matrix, param float64) (Matrix, annotations.Annotations) { + // Keep a copy of the original point slice so that it can be returned to the pool. + origMatrix := slices.Clone(inputMatrix) + defer func() { + for _, s := range origMatrix { + putFPointSlice(s.Floats) + putHPointSlice(s.Histograms) + } + }() + + var warnings annotations.Annotations + + enh := &EvalNodeHelper{} + tempNumSamples := ev.currentSamples + + // Create a mapping from input series to output groups. + buf := make([]byte, 0, 1024) + groupToResultIndex := make(map[uint64]int) + seriesToResult := make([]int, len(inputMatrix)) + var result Matrix + + groupCount := 0 + for si, series := range inputMatrix { + var groupingKey uint64 + groupingKey, buf = generateGroupingKey(series.Metric, sortedGrouping, aggExpr.Without, buf) + index, ok := groupToResultIndex[groupingKey] + // Add a new group if it doesn't exist. + if !ok { + if aggExpr.Op != parser.TOPK && aggExpr.Op != parser.BOTTOMK { + m := generateGroupingLabels(enh, series.Metric, aggExpr.Without, sortedGrouping) + result = append(result, Series{Metric: m}) + } + index = groupCount + groupToResultIndex[groupingKey] = index + groupCount++ + } + seriesToResult[si] = index + } + groups := make([]groupedAggregation, groupCount) + + var k int + var seriess map[uint64]Series + switch aggExpr.Op { + case parser.TOPK, parser.BOTTOMK: + if !convertibleToInt64(param) { + ev.errorf("Scalar value %v overflows int64", param) + } + k = int(param) + if k > len(inputMatrix) { + k = len(inputMatrix) + } + if k < 1 { + return nil, warnings + } + seriess = make(map[uint64]Series, len(inputMatrix)) // Output series by series hash. + case parser.QUANTILE: + if math.IsNaN(param) || param < 0 || param > 1 { + warnings.Add(annotations.NewInvalidQuantileWarning(param, aggExpr.Param.PositionRange())) + } + } + + for ts := ev.startTimestamp; ts <= ev.endTimestamp; ts += ev.interval { + if err := contextDone(ev.ctx, "expression evaluation"); err != nil { + ev.error(err) + } + // Reset number of samples in memory after each timestamp. + ev.currentSamples = tempNumSamples + + // Make the function call. + enh.Ts = ts + var ws annotations.Annotations + switch aggExpr.Op { + case parser.TOPK, parser.BOTTOMK: + result, ws = ev.aggregationK(aggExpr, k, inputMatrix, seriesToResult, groups, enh, seriess) + // If this could be an instant query, shortcut so as not to change sort order. + if ev.endTimestamp == ev.startTimestamp { + return result, ws + } + default: + ws = ev.aggregation(aggExpr, param, inputMatrix, result, seriesToResult, groups, enh) + } + + warnings.Merge(ws) + + if ev.currentSamples > ev.maxSamples { + ev.error(ErrTooManySamples(env)) + } + } + + // Assemble the output matrix. By the time we get here we know we don't have too many samples. + switch aggExpr.Op { + case parser.TOPK, parser.BOTTOMK: + result = make(Matrix, 0, len(seriess)) + for _, ss := range seriess { + result = append(result, ss) + } + default: + // Remove empty result rows. + dst := 0 + for _, series := range result { + if len(series.Floats) > 0 || len(series.Histograms) > 0 { + result[dst] = series + dst++ + } + } + result = result[:dst] + } + return result, warnings +} + // evalSubquery evaluates given SubqueryExpr and returns an equivalent // evaluated MatrixSelector in its place. Note that the Name and LabelMatchers are not set. func (ev *evaluator) evalSubquery(subq *parser.SubqueryExpr) (*parser.MatrixSelector, int, annotations.Annotations) { @@ -1343,28 +1441,44 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, annotations.Annotatio sortedGrouping := e.Grouping slices.Sort(sortedGrouping) - // Prepare a function to initialise series helpers with the grouping key. - buf := make([]byte, 0, 1024) - initSeries := func(series labels.Labels, h *EvalSeriesHelper) { - h.groupingKey, buf = generateGroupingKey(series, sortedGrouping, e.Without, buf) - } - unwrapParenExpr(&e.Param) param := unwrapStepInvariantExpr(e.Param) unwrapParenExpr(¶m) - if s, ok := param.(*parser.StringLiteral); ok { - return ev.rangeEval(initSeries, func(v []parser.Value, sh [][]EvalSeriesHelper, enh *EvalNodeHelper) (Vector, annotations.Annotations) { - return ev.aggregation(e, sortedGrouping, s.Val, v[0].(Vector), sh[0], enh) + + if e.Op == parser.COUNT_VALUES { + valueLabel := param.(*parser.StringLiteral) + if !model.LabelName(valueLabel.Val).IsValid() { + ev.errorf("invalid label name %q", valueLabel) + } + if !e.Without { + sortedGrouping = append(sortedGrouping, valueLabel.Val) + slices.Sort(sortedGrouping) + } + return ev.rangeEval(nil, func(v []parser.Value, _ [][]EvalSeriesHelper, enh *EvalNodeHelper) (Vector, annotations.Annotations) { + return ev.aggregationCountValues(e, sortedGrouping, valueLabel.Val, v[0].(Vector), enh) }, e.Expr) } - return ev.rangeEval(initSeries, func(v []parser.Value, sh [][]EvalSeriesHelper, enh *EvalNodeHelper) (Vector, annotations.Annotations) { - var param float64 - if e.Param != nil { - param = v[0].(Vector)[0].F - } - return ev.aggregation(e, sortedGrouping, param, v[1].(Vector), sh[1], enh) - }, e.Param, e.Expr) + var warnings annotations.Annotations + originalNumSamples := ev.currentSamples + // param is the number k for topk/bottomk, or q for quantile. + var fParam float64 + if param != nil { + val, ws := ev.eval(param) + warnings.Merge(ws) + fParam = val.(Matrix)[0].Floats[0].F + } + // Now fetch the data to be aggregated. + val, ws := ev.eval(e.Expr) + warnings.Merge(ws) + inputMatrix := val.(Matrix) + + result, ws := ev.rangeEvalAgg(e, sortedGrouping, inputMatrix, fParam) + warnings.Merge(ws) + ev.currentSamples = originalNumSamples + result.TotalSamples() + ev.samplesStats.UpdatePeak(ev.currentSamples) + + return result, warnings case *parser.Call: call := FunctionCalls[e.Func.Name] @@ -2614,171 +2728,85 @@ func vectorElemBinop(op parser.ItemType, lhs, rhs float64, hlhs, hrhs *histogram } type groupedAggregation struct { + seen bool // Was this output groups seen in the input at this timestamp. hasFloat bool // Has at least 1 float64 sample aggregated. hasHistogram bool // Has at least 1 histogram sample aggregated. - labels labels.Labels floatValue float64 histogramValue *histogram.FloatHistogram floatMean float64 - histogramMean *histogram.FloatHistogram groupCount int heap vectorByValueHeap - reverseHeap vectorByReverseValueHeap } -// aggregation evaluates an aggregation operation on a Vector. The provided grouping labels -// must be sorted. -func (ev *evaluator) aggregation(e *parser.AggregateExpr, grouping []string, param interface{}, vec Vector, seriesHelper []EvalSeriesHelper, enh *EvalNodeHelper) (Vector, annotations.Annotations) { +// aggregation evaluates sum, avg, count, stdvar, stddev or quantile at one timestep on inputMatrix. +// These functions produce one output series for each group specified in the expression, with just the labels from `by(...)`. +// outputMatrix should be already populated with grouping labels; groups is one-to-one with outputMatrix. +// seriesToResult maps inputMatrix indexes to outputMatrix indexes. +func (ev *evaluator) aggregation(e *parser.AggregateExpr, q float64, inputMatrix, outputMatrix Matrix, seriesToResult []int, groups []groupedAggregation, enh *EvalNodeHelper) annotations.Annotations { op := e.Op - without := e.Without var annos annotations.Annotations - result := map[uint64]*groupedAggregation{} - orderedResult := []*groupedAggregation{} - var k int64 - if op == parser.TOPK || op == parser.BOTTOMK { - f := param.(float64) - if !convertibleToInt64(f) { - ev.errorf("Scalar value %v overflows int64", f) - } - k = int64(f) - if k < 1 { - return Vector{}, annos - } - } - var q float64 - if op == parser.QUANTILE { - q = param.(float64) - } - var valueLabel string - var recomputeGroupingKey bool - if op == parser.COUNT_VALUES { - valueLabel = param.(string) - if !model.LabelName(valueLabel).IsValid() { - ev.errorf("invalid label name %q", valueLabel) - } - if !without { - // We're changing the grouping labels so we have to ensure they're still sorted - // and we have to flag to recompute the grouping key. Considering the count_values() - // operator is less frequently used than other aggregations, we're fine having to - // re-compute the grouping key on each step for this case. - grouping = append(grouping, valueLabel) - slices.Sort(grouping) - recomputeGroupingKey = true - } + for i := range groups { + groups[i].seen = false } - var buf []byte - for si, s := range vec { - metric := s.Metric - - if op == parser.COUNT_VALUES { - enh.resetBuilder(metric) - enh.lb.Set(valueLabel, strconv.FormatFloat(s.F, 'f', -1, 64)) - metric = enh.lb.Labels() - - // We've changed the metric so we have to recompute the grouping key. - recomputeGroupingKey = true - } - - // We can use the pre-computed grouping key unless grouping labels have changed. - var groupingKey uint64 - if !recomputeGroupingKey { - groupingKey = seriesHelper[si].groupingKey - } else { - groupingKey, buf = generateGroupingKey(metric, grouping, without, buf) - } - - group, ok := result[groupingKey] - // Add a new group if it doesn't exist. + for si := range inputMatrix { + f, h, ok := ev.nextValues(enh.Ts, &inputMatrix[si]) if !ok { - var m labels.Labels - enh.resetBuilder(metric) - switch { - case without: - enh.lb.Del(grouping...) - enh.lb.Del(labels.MetricName) - m = enh.lb.Labels() - case len(grouping) > 0: - enh.lb.Keep(grouping...) - m = enh.lb.Labels() - default: - m = labels.EmptyLabels() - } - newAgg := &groupedAggregation{ - labels: m, - floatValue: s.F, - floatMean: s.F, + continue + } + + group := &groups[seriesToResult[si]] + // Initialize this group if it's the first time we've seen it. + if !group.seen { + *group = groupedAggregation{ + seen: true, + floatValue: f, + floatMean: f, groupCount: 1, } - switch { - case s.H == nil: - newAgg.hasFloat = true - case op == parser.SUM: - newAgg.histogramValue = s.H.Copy() - newAgg.hasHistogram = true - case op == parser.AVG: - newAgg.histogramMean = s.H.Copy() - newAgg.hasHistogram = true - case op == parser.STDVAR || op == parser.STDDEV: - newAgg.groupCount = 0 - } - - result[groupingKey] = newAgg - orderedResult = append(orderedResult, newAgg) - - inputVecLen := int64(len(vec)) - resultSize := k - switch { - case k > inputVecLen: - resultSize = inputVecLen - case k == 0: - resultSize = 1 - } switch op { + case parser.SUM, parser.AVG: + if h == nil { + group.hasFloat = true + } else { + group.histogramValue = h.Copy() + group.hasHistogram = true + } case parser.STDVAR, parser.STDDEV: - result[groupingKey].floatValue = 0 - case parser.TOPK, parser.QUANTILE: - result[groupingKey].heap = make(vectorByValueHeap, 1, resultSize) - result[groupingKey].heap[0] = Sample{ - F: s.F, - Metric: s.Metric, - } - case parser.BOTTOMK: - result[groupingKey].reverseHeap = make(vectorByReverseValueHeap, 1, resultSize) - result[groupingKey].reverseHeap[0] = Sample{ - F: s.F, - Metric: s.Metric, - } + group.floatValue = 0 + case parser.QUANTILE: + group.heap = make(vectorByValueHeap, 1) + group.heap[0] = Sample{F: f} case parser.GROUP: - result[groupingKey].floatValue = 1 + group.floatValue = 1 } continue } switch op { case parser.SUM: - if s.H != nil { + if h != nil { group.hasHistogram = true if group.histogramValue != nil { - group.histogramValue.Add(s.H) + group.histogramValue.Add(h) } // Otherwise the aggregation contained floats // previously and will be invalid anyway. No // point in copying the histogram in that case. } else { group.hasFloat = true - group.floatValue += s.F + group.floatValue += f } case parser.AVG: group.groupCount++ - if s.H != nil { + if h != nil { group.hasHistogram = true - if group.histogramMean != nil { - left := s.H.Copy().Div(float64(group.groupCount)) - right := group.histogramMean.Copy().Div(float64(group.groupCount)) + if group.histogramValue != nil { + left := h.Copy().Div(float64(group.groupCount)) + right := group.histogramValue.Copy().Div(float64(group.groupCount)) toAdd := left.Sub(right) - group.histogramMean.Add(toAdd) + group.histogramValue.Add(toAdd) } // Otherwise the aggregation contained floats // previously and will be invalid anyway. No @@ -2786,13 +2814,13 @@ func (ev *evaluator) aggregation(e *parser.AggregateExpr, grouping []string, par } else { group.hasFloat = true if math.IsInf(group.floatMean, 0) { - if math.IsInf(s.F, 0) && (group.floatMean > 0) == (s.F > 0) { + if math.IsInf(f, 0) && (group.floatMean > 0) == (f > 0) { // The `floatMean` and `s.F` values are `Inf` of the same sign. They // can't be subtracted, but the value of `floatMean` is correct // already. break } - if !math.IsInf(s.F, 0) && !math.IsNaN(s.F) { + if !math.IsInf(f, 0) && !math.IsNaN(f) { // At this stage, the mean is an infinite. If the added // value is neither an Inf or a Nan, we can keep that mean // value. @@ -2803,81 +2831,48 @@ func (ev *evaluator) aggregation(e *parser.AggregateExpr, grouping []string, par } } // Divide each side of the `-` by `group.groupCount` to avoid float64 overflows. - group.floatMean += s.F/float64(group.groupCount) - group.floatMean/float64(group.groupCount) + group.floatMean += f/float64(group.groupCount) - group.floatMean/float64(group.groupCount) } case parser.GROUP: // Do nothing. Required to avoid the panic in `default:` below. case parser.MAX: - if group.floatValue < s.F || math.IsNaN(group.floatValue) { - group.floatValue = s.F + if group.floatValue < f || math.IsNaN(group.floatValue) { + group.floatValue = f } case parser.MIN: - if group.floatValue > s.F || math.IsNaN(group.floatValue) { - group.floatValue = s.F + if group.floatValue > f || math.IsNaN(group.floatValue) { + group.floatValue = f } - case parser.COUNT, parser.COUNT_VALUES: + case parser.COUNT: group.groupCount++ case parser.STDVAR, parser.STDDEV: - if s.H == nil { // Ignore native histograms. + if h == nil { // Ignore native histograms. group.groupCount++ - delta := s.F - group.floatMean + delta := f - group.floatMean group.floatMean += delta / float64(group.groupCount) - group.floatValue += delta * (s.F - group.floatMean) - } - - case parser.TOPK: - // We build a heap of up to k elements, with the smallest element at heap[0]. - switch { - case int64(len(group.heap)) < k: - heap.Push(&group.heap, &Sample{ - F: s.F, - Metric: s.Metric, - }) - case group.heap[0].F < s.F || (math.IsNaN(group.heap[0].F) && !math.IsNaN(s.F)): - // This new element is bigger than the previous smallest element - overwrite that. - group.heap[0] = Sample{ - F: s.F, - Metric: s.Metric, - } - if k > 1 { - heap.Fix(&group.heap, 0) // Maintain the heap invariant. - } - } - - case parser.BOTTOMK: - // We build a heap of up to k elements, with the biggest element at heap[0]. - switch { - case int64(len(group.reverseHeap)) < k: - heap.Push(&group.reverseHeap, &Sample{ - F: s.F, - Metric: s.Metric, - }) - case group.reverseHeap[0].F > s.F || (math.IsNaN(group.reverseHeap[0].F) && !math.IsNaN(s.F)): - // This new element is smaller than the previous biggest element - overwrite that. - group.reverseHeap[0] = Sample{ - F: s.F, - Metric: s.Metric, - } - if k > 1 { - heap.Fix(&group.reverseHeap, 0) // Maintain the heap invariant. - } + group.floatValue += delta * (f - group.floatMean) } case parser.QUANTILE: - group.heap = append(group.heap, s) + group.heap = append(group.heap, Sample{F: f}) default: panic(fmt.Errorf("expected aggregation operator but got %q", op)) } } - // Construct the result Vector from the aggregated groups. - for _, aggr := range orderedResult { + // Construct the output matrix from the aggregated groups. + numSteps := int((ev.endTimestamp-ev.startTimestamp)/ev.interval) + 1 + + for ri, aggr := range groups { + if !aggr.seen { + continue + } switch op { case parser.AVG: if aggr.hasFloat && aggr.hasHistogram { @@ -2886,12 +2881,12 @@ func (ev *evaluator) aggregation(e *parser.AggregateExpr, grouping []string, par continue } if aggr.hasHistogram { - aggr.histogramValue = aggr.histogramMean.Compact(0) + aggr.histogramValue = aggr.histogramValue.Compact(0) } else { aggr.floatValue = aggr.floatMean } - case parser.COUNT, parser.COUNT_VALUES: + case parser.COUNT: aggr.floatValue = float64(aggr.groupCount) case parser.STDVAR: @@ -2900,36 +2895,7 @@ func (ev *evaluator) aggregation(e *parser.AggregateExpr, grouping []string, par case parser.STDDEV: aggr.floatValue = math.Sqrt(aggr.floatValue / float64(aggr.groupCount)) - case parser.TOPK: - // The heap keeps the lowest value on top, so reverse it. - if len(aggr.heap) > 1 { - sort.Sort(sort.Reverse(aggr.heap)) - } - for _, v := range aggr.heap { - enh.Out = append(enh.Out, Sample{ - Metric: v.Metric, - F: v.F, - }) - } - continue // Bypass default append. - - case parser.BOTTOMK: - // The heap keeps the highest value on top, so reverse it. - if len(aggr.reverseHeap) > 1 { - sort.Sort(sort.Reverse(aggr.reverseHeap)) - } - for _, v := range aggr.reverseHeap { - enh.Out = append(enh.Out, Sample{ - Metric: v.Metric, - F: v.F, - }) - } - continue // Bypass default append. - case parser.QUANTILE: - if math.IsNaN(q) || q < 0 || q > 1 { - annos.Add(annotations.NewInvalidQuantileWarning(q, e.Param.PositionRange())) - } aggr.floatValue = quantile(q, aggr.heap) case parser.SUM: @@ -2945,13 +2911,196 @@ func (ev *evaluator) aggregation(e *parser.AggregateExpr, grouping []string, par // For other aggregations, we already have the right value. } + ss := &outputMatrix[ri] + addToSeries(ss, enh.Ts, aggr.floatValue, aggr.histogramValue, numSteps) + } + + return annos +} + +// aggregationK evaluates topk or bottomk at one timestep on inputMatrix. +// Output that has the same labels as the input, but just k of them per group. +// seriesToResult maps inputMatrix indexes to groups indexes. +// For an instant query, returns a Matrix in descending order for topk or ascending for bottomk. +// For a range query, aggregates output in the seriess map. +func (ev *evaluator) aggregationK(e *parser.AggregateExpr, k int, inputMatrix Matrix, seriesToResult []int, groups []groupedAggregation, enh *EvalNodeHelper, seriess map[uint64]Series) (Matrix, annotations.Annotations) { + op := e.Op + var s Sample + var annos annotations.Annotations + for i := range groups { + groups[i].seen = false + } + + for si := range inputMatrix { + f, _, ok := ev.nextValues(enh.Ts, &inputMatrix[si]) + if !ok { + continue + } + s = Sample{Metric: inputMatrix[si].Metric, F: f} + + group := &groups[seriesToResult[si]] + // Initialize this group if it's the first time we've seen it. + if !group.seen { + *group = groupedAggregation{ + seen: true, + heap: make(vectorByValueHeap, 1, k), + } + group.heap[0] = s + continue + } + + switch op { + case parser.TOPK: + // We build a heap of up to k elements, with the smallest element at heap[0]. + switch { + case len(group.heap) < k: + heap.Push(&group.heap, &s) + case group.heap[0].F < s.F || (math.IsNaN(group.heap[0].F) && !math.IsNaN(s.F)): + // This new element is bigger than the previous smallest element - overwrite that. + group.heap[0] = s + if k > 1 { + heap.Fix(&group.heap, 0) // Maintain the heap invariant. + } + } + + case parser.BOTTOMK: + // We build a heap of up to k elements, with the biggest element at heap[0]. + switch { + case len(group.heap) < k: + heap.Push((*vectorByReverseValueHeap)(&group.heap), &s) + case group.heap[0].F > s.F || (math.IsNaN(group.heap[0].F) && !math.IsNaN(s.F)): + // This new element is smaller than the previous biggest element - overwrite that. + group.heap[0] = s + if k > 1 { + heap.Fix((*vectorByReverseValueHeap)(&group.heap), 0) // Maintain the heap invariant. + } + } + + default: + panic(fmt.Errorf("expected aggregation operator but got %q", op)) + } + } + + // Construct the result from the aggregated groups. + numSteps := int((ev.endTimestamp-ev.startTimestamp)/ev.interval) + 1 + var mat Matrix + if ev.endTimestamp == ev.startTimestamp { + mat = make(Matrix, 0, len(groups)) + } + + add := func(lbls labels.Labels, f float64) { + // If this could be an instant query, add directly to the matrix so the result is in consistent order. + if ev.endTimestamp == ev.startTimestamp { + mat = append(mat, Series{Metric: lbls, Floats: []FPoint{{T: enh.Ts, F: f}}}) + } else { + // Otherwise the results are added into seriess elements. + hash := lbls.Hash() + ss, ok := seriess[hash] + if !ok { + ss = Series{Metric: lbls} + } + addToSeries(&ss, enh.Ts, f, nil, numSteps) + seriess[hash] = ss + } + } + for _, aggr := range groups { + if !aggr.seen { + continue + } + switch op { + case parser.TOPK: + // The heap keeps the lowest value on top, so reverse it. + if len(aggr.heap) > 1 { + sort.Sort(sort.Reverse(aggr.heap)) + } + for _, v := range aggr.heap { + add(v.Metric, v.F) + } + + case parser.BOTTOMK: + // The heap keeps the highest value on top, so reverse it. + if len(aggr.heap) > 1 { + sort.Sort(sort.Reverse((*vectorByReverseValueHeap)(&aggr.heap))) + } + for _, v := range aggr.heap { + add(v.Metric, v.F) + } + } + } + + return mat, annos +} + +// aggregationK evaluates count_values on vec. +// Outputs as many series per group as there are values in the input. +func (ev *evaluator) aggregationCountValues(e *parser.AggregateExpr, grouping []string, valueLabel string, vec Vector, enh *EvalNodeHelper) (Vector, annotations.Annotations) { + type groupCount struct { + labels labels.Labels + count int + } + result := map[uint64]*groupCount{} + + var buf []byte + for _, s := range vec { + enh.resetBuilder(s.Metric) + enh.lb.Set(valueLabel, strconv.FormatFloat(s.F, 'f', -1, 64)) + metric := enh.lb.Labels() + + // Considering the count_values() + // operator is less frequently used than other aggregations, we're fine having to + // re-compute the grouping key on each step for this case. + var groupingKey uint64 + groupingKey, buf = generateGroupingKey(metric, grouping, e.Without, buf) + + group, ok := result[groupingKey] + // Add a new group if it doesn't exist. + if !ok { + result[groupingKey] = &groupCount{ + labels: generateGroupingLabels(enh, metric, e.Without, grouping), + count: 1, + } + continue + } + + group.count++ + } + + // Construct the result Vector from the aggregated groups. + for _, aggr := range result { enh.Out = append(enh.Out, Sample{ Metric: aggr.labels, - F: aggr.floatValue, - H: aggr.histogramValue, + F: float64(aggr.count), }) } - return enh.Out, annos + return enh.Out, nil +} + +func addToSeries(ss *Series, ts int64, f float64, h *histogram.FloatHistogram, numSteps int) { + if h == nil { + if ss.Floats == nil { + ss.Floats = getFPointSlice(numSteps) + } + ss.Floats = append(ss.Floats, FPoint{T: ts, F: f}) + return + } + if ss.Histograms == nil { + ss.Histograms = getHPointSlice(numSteps) + } + ss.Histograms = append(ss.Histograms, HPoint{T: ts, H: h}) +} + +func (ev *evaluator) nextValues(ts int64, series *Series) (f float64, h *histogram.FloatHistogram, b bool) { + switch { + case len(series.Floats) > 0 && series.Floats[0].T == ts: + f = series.Floats[0].F + series.Floats = series.Floats[1:] // Move input vectors forward + case len(series.Histograms) > 0 && series.Histograms[0].T == ts: + h = series.Histograms[0].H + series.Histograms = series.Histograms[1:] + default: + return f, h, false + } + return f, h, true } // groupingKey builds and returns the grouping key for the given metric and @@ -2969,6 +3118,21 @@ func generateGroupingKey(metric labels.Labels, grouping []string, without bool, return metric.HashForLabels(buf, grouping...) } +func generateGroupingLabels(enh *EvalNodeHelper, metric labels.Labels, without bool, grouping []string) labels.Labels { + enh.resetBuilder(metric) + switch { + case without: + enh.lb.Del(grouping...) + enh.lb.Del(labels.MetricName) + return enh.lb.Labels() + case len(grouping) > 0: + enh.lb.Keep(grouping...) + return enh.lb.Labels() + default: + return labels.EmptyLabels() + } +} + // btos returns 1 if b is true, 0 otherwise. func btos(b bool) float64 { if b { diff --git a/promql/engine_test.go b/promql/engine_test.go index 13731efd45..0202c15ae1 100644 --- a/promql/engine_test.go +++ b/promql/engine_test.go @@ -966,7 +966,7 @@ load 10s { Query: "sum by (b) (max_over_time(metricWith3SampleEvery10Seconds[60s] @ 30))", Start: time.Unix(201, 0), - PeakSamples: 8, + PeakSamples: 7, TotalSamples: 12, // @ modifier force the evaluation to at 30 seconds - So it brings 4 datapoints (0, 10, 20, 30 seconds) * 3 series TotalSamplesPerStep: stats.TotalSamplesPerStep{ 201000: 12,