promql: refactor: simplify internal data structures

Signed-off-by: Bryan Boreham <bjboreham@gmail.com>
This commit is contained in:
Bryan Boreham 2024-02-27 07:00:35 +00:00
parent 5f10d17cef
commit bd9bdccb22

View file

@ -1293,29 +1293,27 @@ func (ev *evaluator) rangeEval(prepSeries func(labels.Labels, *EvalSeriesHelper)
func (ev *evaluator) rangeEvalAgg(aggExpr *parser.AggregateExpr, sortedGrouping []string) (Matrix, annotations.Annotations) {
numSteps := int((ev.endTimestamp-ev.startTimestamp)/ev.interval) + 1
matrixes := make([]Matrix, 2)
origMatrixes := make([]Matrix, 2)
originalNumSamples := ev.currentSamples
var warnings annotations.Annotations
for i, e := range []parser.Expr{aggExpr.Param, aggExpr.Expr} {
// Functions will take string arguments from the expressions, not the values.
if e != nil && e.Type() != parser.ValueTypeString {
// ev.currentSamples will be updated to the correct value within the ev.eval call.
val, ws := ev.eval(e)
// param is the number k for topk/bottomk.
var param float64
if aggExpr.Param != nil {
val, ws := ev.eval(aggExpr.Param)
warnings.Merge(ws)
matrixes[i] = val.(Matrix)
// Keep a copy of the original point slices so that they
// can be returned to the pool.
origMatrixes[i] = make(Matrix, len(matrixes[i]))
copy(origMatrixes[i], matrixes[i])
}
param = val.(Matrix)[0].Floats[0].F
}
// Now fetch the data to be aggregated.
// ev.currentSamples will be updated to the correct value within the ev.eval call.
val, ws := ev.eval(aggExpr.Expr)
warnings.Merge(ws)
inputMatrix := val.(Matrix)
vectors := make([]Vector, 2) // Input vectors for the function.
args := make([]parser.Value, 2) // Argument to function.
biggestLen := len(matrixes[1])
// Keep a copy of the original point slice so that it can be returned to the pool.
origMatrix := inputMatrix
var vector Vector // Input vectors for the function.
biggestLen := len(inputMatrix)
enh := &EvalNodeHelper{Out: make(Vector, 0, biggestLen)}
type seriesAndTimestamp struct {
Series
@ -1324,16 +1322,14 @@ func (ev *evaluator) rangeEvalAgg(aggExpr *parser.AggregateExpr, sortedGrouping
seriess := make(map[uint64]seriesAndTimestamp, biggestLen) // Output series by series hash.
tempNumSamples := ev.currentSamples
seriesHelpers := make([][]EvalSeriesHelper, 2)
bufHelpers := make([][]EvalSeriesHelper, 2)
// Prepare a function to initialise series helpers with the grouping key.
// Initialise series helpers with the grouping key.
buf := make([]byte, 0, 1024)
seriesHelpers[1] = make([]EvalSeriesHelper, len(matrixes[1]))
bufHelpers[1] = make([]EvalSeriesHelper, len(matrixes[1]))
seriesHelper := make([]EvalSeriesHelper, len(inputMatrix))
bufHelper := make([]EvalSeriesHelper, len(inputMatrix))
for si, series := range matrixes[1] {
seriesHelpers[1][si].groupingKey, buf = generateGroupingKey(series.Metric, sortedGrouping, aggExpr.Without, buf)
for si, series := range inputMatrix {
seriesHelper[si].groupingKey, buf = generateGroupingKey(series.Metric, sortedGrouping, aggExpr.Without, buf)
}
for ts := ev.startTimestamp; ts <= ev.endTimestamp; ts += ev.interval {
@ -1343,42 +1339,35 @@ func (ev *evaluator) rangeEvalAgg(aggExpr *parser.AggregateExpr, sortedGrouping
// Reset number of samples in memory after each timestamp.
ev.currentSamples = tempNumSamples
// Gather input vectors for this timestamp.
for i := range []parser.Expr{aggExpr.Param, aggExpr.Expr} {
vectors[i] = vectors[i][:0]
bufHelpers[i] = bufHelpers[i][:0]
{
vector = vector[:0]
bufHelper = bufHelper[:0]
for si, series := range matrixes[i] {
for si, series := range inputMatrix {
switch {
case len(series.Floats) > 0 && series.Floats[0].T == ts:
vectors[i] = append(vectors[i], Sample{Metric: series.Metric, F: series.Floats[0].F, T: ts})
vector = append(vector, Sample{Metric: series.Metric, F: series.Floats[0].F, T: ts})
// Move input vectors forward so we don't have to re-scan the same
// past points at the next step.
matrixes[i][si].Floats = series.Floats[1:]
inputMatrix[si].Floats = series.Floats[1:]
case len(series.Histograms) > 0 && series.Histograms[0].T == ts:
vectors[i] = append(vectors[i], Sample{Metric: series.Metric, H: series.Histograms[0].H, T: ts})
matrixes[i][si].Histograms = series.Histograms[1:]
vector = append(vector, Sample{Metric: series.Metric, H: series.Histograms[0].H, T: ts})
inputMatrix[si].Histograms = series.Histograms[1:]
default:
continue
}
if seriesHelpers[i] != nil {
bufHelpers[i] = append(bufHelpers[i], seriesHelpers[i][si])
}
bufHelper = append(bufHelper, seriesHelper[si])
ev.currentSamples++
if ev.currentSamples > ev.maxSamples {
ev.error(ErrTooManySamples(env))
}
}
args[i] = vectors[i]
ev.samplesStats.UpdatePeak(ev.currentSamples)
}
// Make the function call.
enh.Ts = ts
var param float64
if aggExpr.Param != nil {
param = args[0].(Vector)[0].F
}
result, ws := ev.aggregation(aggExpr, sortedGrouping, param, args[1].(Vector), bufHelpers[1], enh)
result, ws := ev.aggregation(aggExpr, sortedGrouping, param, vector, bufHelper, enh)
enh.Out = result[:0] // Reuse result vector.
warnings.Merge(ws)
@ -1440,13 +1429,11 @@ func (ev *evaluator) rangeEvalAgg(aggExpr *parser.AggregateExpr, sortedGrouping
}
}
// Reuse the original point slices.
for _, m := range origMatrixes {
for _, s := range m {
// Reuse the original point slice.
for _, s := range origMatrix {
putFPointSlice(s.Floats)
putHPointSlice(s.Histograms)
}
}
// Assemble the output matrix. By the time we get here we know we don't have too many samples.
mat := make(Matrix, 0, len(seriess))
for _, ss := range seriess {