diff --git a/promql/engine.go b/promql/engine.go index 56a7774c68..9074912d66 100644 --- a/promql/engine.go +++ b/promql/engine.go @@ -1352,9 +1352,18 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, annotations.Annotatio unwrapParenExpr(&e.Param) param := unwrapStepInvariantExpr(e.Param) unwrapParenExpr(¶m) - if s, ok := param.(*parser.StringLiteral); ok { - return ev.rangeEval(initSeries, func(v []parser.Value, sh [][]EvalSeriesHelper, enh *EvalNodeHelper) (Vector, annotations.Annotations) { - return ev.aggregation(e, sortedGrouping, s.Val, v[0].(Vector), sh[0], enh) + + if e.Op == parser.COUNT_VALUES { + valueLabel := param.(*parser.StringLiteral) + if !model.LabelName(valueLabel.Val).IsValid() { + ev.errorf("invalid label name %q", valueLabel) + } + if !e.Without { + sortedGrouping = append(sortedGrouping, valueLabel.Val) + slices.Sort(sortedGrouping) + } + return ev.rangeEval(nil, func(v []parser.Value, _ [][]EvalSeriesHelper, enh *EvalNodeHelper) (Vector, annotations.Annotations) { + return ev.aggregationCountValues(e, sortedGrouping, valueLabel.Val, v[0].(Vector), enh) }, e.Expr) } @@ -2649,44 +2658,10 @@ func (ev *evaluator) aggregation(e *parser.AggregateExpr, grouping []string, par if op == parser.QUANTILE { q = param.(float64) } - var valueLabel string - var recomputeGroupingKey bool - if op == parser.COUNT_VALUES { - valueLabel = param.(string) - if !model.LabelName(valueLabel).IsValid() { - ev.errorf("invalid label name %q", valueLabel) - } - if !without { - // We're changing the grouping labels so we have to ensure they're still sorted - // and we have to flag to recompute the grouping key. Considering the count_values() - // operator is less frequently used than other aggregations, we're fine having to - // re-compute the grouping key on each step for this case. - grouping = append(grouping, valueLabel) - slices.Sort(grouping) - recomputeGroupingKey = true - } - } - var buf []byte for si, s := range vec { metric := s.Metric - - if op == parser.COUNT_VALUES { - enh.resetBuilder(metric) - enh.lb.Set(valueLabel, strconv.FormatFloat(s.F, 'f', -1, 64)) - metric = enh.lb.Labels() - - // We've changed the metric so we have to recompute the grouping key. - recomputeGroupingKey = true - } - - // We can use the pre-computed grouping key unless grouping labels have changed. - var groupingKey uint64 - if !recomputeGroupingKey { - groupingKey = seriesHelper[si].groupingKey - } else { - groupingKey, buf = generateGroupingKey(metric, grouping, without, buf) - } + groupingKey := seriesHelper[si].groupingKey group, ok := result[groupingKey] // Add a new group if it doesn't exist. @@ -2807,7 +2782,7 @@ func (ev *evaluator) aggregation(e *parser.AggregateExpr, grouping []string, par group.floatValue = s.F } - case parser.COUNT, parser.COUNT_VALUES: + case parser.COUNT: group.groupCount++ case parser.STDVAR, parser.STDDEV: @@ -2879,7 +2854,7 @@ func (ev *evaluator) aggregation(e *parser.AggregateExpr, grouping []string, par aggr.floatValue = aggr.floatMean } - case parser.COUNT, parser.COUNT_VALUES: + case parser.COUNT: aggr.floatValue = float64(aggr.groupCount) case parser.STDVAR: @@ -2942,6 +2917,50 @@ func (ev *evaluator) aggregation(e *parser.AggregateExpr, grouping []string, par return enh.Out, annos } +// aggregationK evaluates count_values on vec. +// Outputs as many series per group as there are values in the input. +func (ev *evaluator) aggregationCountValues(e *parser.AggregateExpr, grouping []string, valueLabel string, vec Vector, enh *EvalNodeHelper) (Vector, annotations.Annotations) { + result := map[uint64]*groupedAggregation{} + orderedResult := []*groupedAggregation{} + + var buf []byte + for _, s := range vec { + enh.resetBuilder(s.Metric) + enh.lb.Set(valueLabel, strconv.FormatFloat(s.F, 'f', -1, 64)) + metric := enh.lb.Labels() + + // Considering the count_values() + // operator is less frequently used than other aggregations, we're fine having to + // re-compute the grouping key on each step for this case. + var groupingKey uint64 + groupingKey, buf = generateGroupingKey(metric, grouping, e.Without, buf) + + group, ok := result[groupingKey] + // Add a new group if it doesn't exist. + if !ok { + newAgg := &groupedAggregation{ + labels: generateGroupingLabels(enh, metric, e.Without, grouping), + groupCount: 1, + } + + result[groupingKey] = newAgg + orderedResult = append(orderedResult, newAgg) + continue + } + + group.groupCount++ + } + + // Construct the result Vector from the aggregated groups. + for _, aggr := range orderedResult { + enh.Out = append(enh.Out, Sample{ + Metric: aggr.labels, + F: float64(aggr.groupCount), + }) + } + return enh.Out, nil +} + // groupingKey builds and returns the grouping key for the given metric and // grouping labels. func generateGroupingKey(metric labels.Labels, grouping []string, without bool, buf []byte) (uint64, []byte) {