PromQL.Engine: Refactor Matrix expansion into a method

Add utility method promql.evaluator.expandSeriesToMatrix, for expanding a slice
of storage.Series into a promql.Matrix.

Signed-off-by: Arve Knudsen <arve.knudsen@gmail.com>
This commit is contained in:
Arve Knudsen 2024-09-11 14:06:07 +02:00
parent 73e94dc9f7
commit 96845f9b66

View file

@ -1004,6 +1004,8 @@ func extractGroupsFromPath(p []parser.Node) (bool, []string) {
return false, nil
}
// checkAndExpandSeriesSet expands expr's UnexpandedSeriesSet into expr's Series.
// If the Series field is already non-nil, it's a no-op.
func checkAndExpandSeriesSet(ctx context.Context, expr parser.Expr) (annotations.Annotations, error) {
switch e := expr.(type) {
case *parser.MatrixSelector:
@ -1455,6 +1457,66 @@ func (ev *evaluator) rangeEvalAgg(ctx context.Context, aggExpr *parser.Aggregate
return result, warnings
}
// expandSeriesToMatrix expands vs.Series into a Matrix.
func (ev *evaluator) expandSeriesToMatrix(ctx context.Context, vs *parser.VectorSelector, start, end, interval int64) Matrix {
numSteps := int((end-start)/interval) + 1
mat := make(Matrix, 0, len(vs.Series))
var prevSS *Series
it := storage.NewMemoizedEmptyIterator(durationMilliseconds(ev.lookbackDelta))
var chkIter chunkenc.Iterator
for _, s := range vs.Series {
if err := contextDone(ctx, "expression evaluation"); err != nil {
ev.error(err)
}
chkIter = s.Iterator(chkIter)
it.Reset(chkIter)
ss := Series{
Metric: s.Labels(),
}
for ts, step := start, -1; ts <= end; ts += interval {
step++
_, f, h, ok := ev.vectorSelectorSingle(it, vs, ts)
if !ok {
continue
}
if h == nil {
ev.currentSamples++
ev.samplesStats.IncrementSamplesAtStep(step, 1)
if ev.currentSamples > ev.maxSamples {
ev.error(ErrTooManySamples(env))
}
if ss.Floats == nil {
ss.Floats = reuseOrGetFPointSlices(prevSS, numSteps)
}
ss.Floats = append(ss.Floats, FPoint{F: f, T: ts})
} else {
point := HPoint{H: h, T: ts}
histSize := point.size()
ev.currentSamples += histSize
ev.samplesStats.IncrementSamplesAtStep(step, int64(histSize))
if ev.currentSamples > ev.maxSamples {
ev.error(ErrTooManySamples(env))
}
if ss.Histograms == nil {
ss.Histograms = reuseOrGetHPointSlices(prevSS, numSteps)
}
ss.Histograms = append(ss.Histograms, point)
}
}
if len(ss.Floats)+len(ss.Histograms) > 0 {
mat = append(mat, ss)
prevSS = &mat[len(mat)-1]
}
}
ev.samplesStats.UpdatePeak(ev.currentSamples)
return mat
}
// evalSubquery evaluates given SubqueryExpr and returns an equivalent
// evaluated MatrixSelector in its place. Note that the Name and LabelMatchers are not set.
func (ev *evaluator) evalSubquery(ctx context.Context, subq *parser.SubqueryExpr) (*parser.MatrixSelector, int, annotations.Annotations) {
@ -1893,56 +1955,7 @@ func (ev *evaluator) eval(ctx context.Context, expr parser.Expr) (parser.Value,
if err != nil {
ev.error(errWithWarnings{fmt.Errorf("expanding series: %w", err), ws})
}
mat := make(Matrix, 0, len(e.Series))
var prevSS *Series
it := storage.NewMemoizedEmptyIterator(durationMilliseconds(ev.lookbackDelta))
var chkIter chunkenc.Iterator
for i, s := range e.Series {
if err := contextDone(ctx, "expression evaluation"); err != nil {
ev.error(err)
}
chkIter = s.Iterator(chkIter)
it.Reset(chkIter)
ss := Series{
Metric: e.Series[i].Labels(),
}
for ts, step := ev.startTimestamp, -1; ts <= ev.endTimestamp; ts += ev.interval {
step++
_, f, h, ok := ev.vectorSelectorSingle(it, e, ts)
if ok {
if h == nil {
ev.currentSamples++
ev.samplesStats.IncrementSamplesAtStep(step, 1)
if ev.currentSamples > ev.maxSamples {
ev.error(ErrTooManySamples(env))
}
if ss.Floats == nil {
ss.Floats = reuseOrGetFPointSlices(prevSS, numSteps)
}
ss.Floats = append(ss.Floats, FPoint{F: f, T: ts})
} else {
point := HPoint{H: h, T: ts}
histSize := point.size()
ev.currentSamples += histSize
ev.samplesStats.IncrementSamplesAtStep(step, int64(histSize))
if ev.currentSamples > ev.maxSamples {
ev.error(ErrTooManySamples(env))
}
if ss.Histograms == nil {
ss.Histograms = reuseOrGetHPointSlices(prevSS, numSteps)
}
ss.Histograms = append(ss.Histograms, point)
}
}
}
if len(ss.Floats)+len(ss.Histograms) > 0 {
mat = append(mat, ss)
prevSS = &mat[len(mat)-1]
}
}
ev.samplesStats.UpdatePeak(ev.currentSamples)
mat := ev.expandSeriesToMatrix(ctx, e, ev.startTimestamp, ev.endTimestamp, ev.interval)
return mat, ws
case *parser.MatrixSelector: