promql: refactor: extract count_values implementation

The existing aggregation function is very long and covers very different
cases.

`aggregationCountValues` is just for `count_values`, which differs from
other aggregations in that it outputs as many series per group as there
are values in the input.

Remove the top-level switch on string parameter type; use the same `Op`
check there as elswehere.

Pull checking parameters out to caller, where it is only executed once.

Signed-off-by: Bryan Boreham <bjboreham@gmail.com>
This commit is contained in:
Bryan Boreham 2024-01-16 16:22:34 +00:00
parent 8e04ab6dd4
commit 29244fb841

View file

@ -1352,9 +1352,18 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, annotations.Annotatio
unwrapParenExpr(&e.Param)
param := unwrapStepInvariantExpr(e.Param)
unwrapParenExpr(&param)
if s, ok := param.(*parser.StringLiteral); ok {
return ev.rangeEval(initSeries, func(v []parser.Value, sh [][]EvalSeriesHelper, enh *EvalNodeHelper) (Vector, annotations.Annotations) {
return ev.aggregation(e, sortedGrouping, s.Val, v[0].(Vector), sh[0], enh)
if e.Op == parser.COUNT_VALUES {
valueLabel := param.(*parser.StringLiteral)
if !model.LabelName(valueLabel.Val).IsValid() {
ev.errorf("invalid label name %q", valueLabel)
}
if !e.Without {
sortedGrouping = append(sortedGrouping, valueLabel.Val)
slices.Sort(sortedGrouping)
}
return ev.rangeEval(nil, func(v []parser.Value, _ [][]EvalSeriesHelper, enh *EvalNodeHelper) (Vector, annotations.Annotations) {
return ev.aggregationCountValues(e, sortedGrouping, valueLabel.Val, v[0].(Vector), enh)
}, e.Expr)
}
@ -2649,44 +2658,10 @@ func (ev *evaluator) aggregation(e *parser.AggregateExpr, grouping []string, par
if op == parser.QUANTILE {
q = param.(float64)
}
var valueLabel string
var recomputeGroupingKey bool
if op == parser.COUNT_VALUES {
valueLabel = param.(string)
if !model.LabelName(valueLabel).IsValid() {
ev.errorf("invalid label name %q", valueLabel)
}
if !without {
// We're changing the grouping labels so we have to ensure they're still sorted
// and we have to flag to recompute the grouping key. Considering the count_values()
// operator is less frequently used than other aggregations, we're fine having to
// re-compute the grouping key on each step for this case.
grouping = append(grouping, valueLabel)
slices.Sort(grouping)
recomputeGroupingKey = true
}
}
var buf []byte
for si, s := range vec {
metric := s.Metric
if op == parser.COUNT_VALUES {
enh.resetBuilder(metric)
enh.lb.Set(valueLabel, strconv.FormatFloat(s.F, 'f', -1, 64))
metric = enh.lb.Labels()
// We've changed the metric so we have to recompute the grouping key.
recomputeGroupingKey = true
}
// We can use the pre-computed grouping key unless grouping labels have changed.
var groupingKey uint64
if !recomputeGroupingKey {
groupingKey = seriesHelper[si].groupingKey
} else {
groupingKey, buf = generateGroupingKey(metric, grouping, without, buf)
}
groupingKey := seriesHelper[si].groupingKey
group, ok := result[groupingKey]
// Add a new group if it doesn't exist.
@ -2807,7 +2782,7 @@ func (ev *evaluator) aggregation(e *parser.AggregateExpr, grouping []string, par
group.floatValue = s.F
}
case parser.COUNT, parser.COUNT_VALUES:
case parser.COUNT:
group.groupCount++
case parser.STDVAR, parser.STDDEV:
@ -2879,7 +2854,7 @@ func (ev *evaluator) aggregation(e *parser.AggregateExpr, grouping []string, par
aggr.floatValue = aggr.floatMean
}
case parser.COUNT, parser.COUNT_VALUES:
case parser.COUNT:
aggr.floatValue = float64(aggr.groupCount)
case parser.STDVAR:
@ -2942,6 +2917,50 @@ func (ev *evaluator) aggregation(e *parser.AggregateExpr, grouping []string, par
return enh.Out, annos
}
// aggregationK evaluates count_values on vec.
// Outputs as many series per group as there are values in the input.
func (ev *evaluator) aggregationCountValues(e *parser.AggregateExpr, grouping []string, valueLabel string, vec Vector, enh *EvalNodeHelper) (Vector, annotations.Annotations) {
result := map[uint64]*groupedAggregation{}
orderedResult := []*groupedAggregation{}
var buf []byte
for _, s := range vec {
enh.resetBuilder(s.Metric)
enh.lb.Set(valueLabel, strconv.FormatFloat(s.F, 'f', -1, 64))
metric := enh.lb.Labels()
// Considering the count_values()
// operator is less frequently used than other aggregations, we're fine having to
// re-compute the grouping key on each step for this case.
var groupingKey uint64
groupingKey, buf = generateGroupingKey(metric, grouping, e.Without, buf)
group, ok := result[groupingKey]
// Add a new group if it doesn't exist.
if !ok {
newAgg := &groupedAggregation{
labels: generateGroupingLabels(enh, metric, e.Without, grouping),
groupCount: 1,
}
result[groupingKey] = newAgg
orderedResult = append(orderedResult, newAgg)
continue
}
group.groupCount++
}
// Construct the result Vector from the aggregated groups.
for _, aggr := range orderedResult {
enh.Out = append(enh.Out, Sample{
Metric: aggr.labels,
F: float64(aggr.groupCount),
})
}
return enh.Out, nil
}
// groupingKey builds and returns the grouping key for the given metric and
// grouping labels.
func generateGroupingKey(metric labels.Labels, grouping []string, without bool, buf []byte) (uint64, []byte) {