Add support for parallel walking of the AST

This commit is contained in:
Thomas Jackson 2023-03-09 14:07:03 -08:00 committed by Rishabh Kumar
parent 7a15d1349f
commit 5d239e85a3
4 changed files with 1138 additions and 1050 deletions

View file

@ -562,10 +562,8 @@ func (ng *Engine) validateOpts(expr *parser.EvalStmt) error {
return nil return nil
} }
var atModifierUsed, negativeOffsetUsed bool _, err := parser.Inspect(context.TODO(), expr, func(node parser.Node, path []parser.Node) error {
var atModifierUsed, negativeOffsetUsed bool
var validationErr error
parser.Inspect(context.TODO(), expr, func(node parser.Node, path []parser.Node) error {
switch n := node.(type) { switch n := node.(type) {
case *parser.VectorSelector: case *parser.VectorSelector:
if n.Timestamp != nil || n.StartOrEnd == parser.START || n.StartOrEnd == parser.END { if n.Timestamp != nil || n.StartOrEnd == parser.START || n.StartOrEnd == parser.END {
@ -594,18 +592,16 @@ func (ng *Engine) validateOpts(expr *parser.EvalStmt) error {
} }
if atModifierUsed && !ng.enableAtModifier { if atModifierUsed && !ng.enableAtModifier {
validationErr = ErrValidationAtModifierDisabled return ErrValidationAtModifierDisabled
return validationErr
} }
if negativeOffsetUsed && !ng.enableNegativeOffset { if negativeOffsetUsed && !ng.enableNegativeOffset {
validationErr = ErrValidationNegativeOffsetDisabled return ErrValidationNegativeOffsetDisabled
return validationErr
} }
return nil return nil
}, nil) }, nil)
return validationErr return err
} }
// NewTestQuery injects special behaviour into Query for testing. // NewTestQuery injects special behaviour into Query for testing.
@ -879,20 +875,38 @@ func FindMinMaxTime(s *parser.EvalStmt) (int64, int64) {
// Whenever a MatrixSelector is evaluated, evalRange is set to the corresponding range. // Whenever a MatrixSelector is evaluated, evalRange is set to the corresponding range.
// The evaluation of the VectorSelector inside then evaluates the given range and unsets // The evaluation of the VectorSelector inside then evaluates the given range and unsets
// the variable. // the variable.
var evalRange time.Duration //var evalRange time.Duration
// Since this fork allows for parallel execution of the tree Walk we need a more
// sophisticated datastructure (to avoid conflicts)
ranges := make([]evalRange, 0, 10) // TODO: better size guess?
// We are dual-purposing the lock for both the `ranges` and the `min/max` timestamp variables
l := sync.RWMutex{}
parser.Inspect(context.TODO(), s, func(node parser.Node, path []parser.Node) error { parser.Inspect(context.TODO(), s, func(node parser.Node, path []parser.Node) error {
switch n := node.(type) { switch n := node.(type) {
case *parser.VectorSelector: case *parser.VectorSelector:
l.RLock()
evalRange := findPathRange(path, ranges)
l.RUnlock()
start, end := getTimeRangesForSelector(s, n, path, evalRange) start, end := getTimeRangesForSelector(s, n, path, evalRange)
l.Lock()
if start < minTimestamp { if start < minTimestamp {
minTimestamp = start minTimestamp = start
} }
if end > maxTimestamp { if end > maxTimestamp {
maxTimestamp = end maxTimestamp = end
} }
evalRange = 0 l.Unlock()
case *parser.MatrixSelector: case *parser.MatrixSelector:
evalRange = n.Range l.Lock()
prefix := make([]posrange.PositionRange, len(path))
for i, p := range path {
prefix[i] = p.PositionRange()
}
ranges = append(ranges, evalRange{Prefix: prefix, Range: n.Range})
l.Unlock()
} }
return nil return nil
}, nil) }, nil)
@ -964,14 +978,20 @@ func (ng *Engine) populateSeries(ctx context.Context, querier storage.Querier, s
// Whenever a MatrixSelector is evaluated, evalRange is set to the corresponding range. // Whenever a MatrixSelector is evaluated, evalRange is set to the corresponding range.
// The evaluation of the VectorSelector inside then evaluates the given range and unsets // The evaluation of the VectorSelector inside then evaluates the given range and unsets
// the variable. // the variable.
var evalRange time.Duration //var evalRange time.Duration
l := sync.Mutex{}
// Since this fork allows for parallel execution of the tree Walk we need a more
// sophisticated datastructure (to avoid conflicts)
ranges := make([]evalRange, 0, 10) // TODO: better size guess?
l := sync.RWMutex{}
n, err := parser.Inspect(ctx, s, func(node parser.Node, path []parser.Node) error { n, err := parser.Inspect(ctx, s, func(node parser.Node, path []parser.Node) error {
l.Lock()
defer l.Unlock()
switch n := node.(type) { switch n := node.(type) {
case *parser.VectorSelector: case *parser.VectorSelector:
l.RLock()
evalRange := findPathRange(path, ranges)
l.RUnlock()
start, end := getTimeRangesForSelector(s, n, path, evalRange) start, end := getTimeRangesForSelector(s, n, path, evalRange)
interval := ng.getLastSubqueryInterval(path) interval := ng.getLastSubqueryInterval(path)
if interval == 0 { if interval == 0 {
@ -984,12 +1004,17 @@ func (ng *Engine) populateSeries(ctx context.Context, querier storage.Querier, s
Range: durationMilliseconds(evalRange), Range: durationMilliseconds(evalRange),
Func: extractFuncFromPath(path), Func: extractFuncFromPath(path),
} }
evalRange = 0
hints.By, hints.Grouping = extractGroupsFromPath(path) hints.By, hints.Grouping = extractGroupsFromPath(path)
n.UnexpandedSeriesSet = querier.Select(ctx, false, hints, n.LabelMatchers...) n.UnexpandedSeriesSet = querier.Select(ctx, false, hints, n.LabelMatchers...)
case *parser.MatrixSelector: case *parser.MatrixSelector:
evalRange = n.Range l.Lock()
prefix := make([]posrange.PositionRange, len(path))
for i, p := range path {
prefix[i] = p.PositionRange()
}
ranges = append(ranges, evalRange{Prefix: prefix, Range: n.Range})
l.Unlock()
} }
return nil return nil
}, ng.NodeReplacer) }, ng.NodeReplacer)

37
promql/engine_extra.go Normal file
View file

@ -0,0 +1,37 @@
package promql
import (
"time"
"github.com/prometheus/prometheus/promql/parser"
"github.com/prometheus/prometheus/promql/parser/posrange"
)
func findPathRange(path []parser.Node, eRanges []evalRange) time.Duration {
var (
evalRange time.Duration
depth int
)
for _, r := range eRanges {
// Check if we are a child
child := true
for i, p := range r.Prefix {
if p != path[i].PositionRange() {
child = false
break
}
}
if child && len(r.Prefix) > depth {
evalRange = r.Range
depth = len(r.Prefix)
}
}
return evalRange
}
// evalRange summarizes a defined evalRange (from a MatrixSelector) within the asg
type evalRange struct {
Prefix []posrange.PositionRange
Range time.Duration
}

View file

@ -16,6 +16,7 @@ package parser
import ( import (
"context" "context"
"fmt" "fmt"
"sync"
"time" "time"
"github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/model/labels"
@ -347,15 +348,35 @@ func Walk(ctx context.Context, v Visitor, s *EvalStmt, node Node, path []Node, n
} }
path = append(path, node) path = append(path, node)
// TODO: parallel execution of children // We parallelize the execution of children
for i, e := range Children(node) { wg := &sync.WaitGroup{}
if childNode, err := Walk(ctx, v, s, e, path, nr); err != nil { children := Children(node)
newChildren := make([]Node, len(children))
errs := make([]error, len(children))
for i, e := range children {
wg.Add(1)
go func(i int, e Node) {
defer wg.Done()
if childNode, childErr := Walk(ctx, v, s, e, append([]Node{}, path...), nr); err != nil {
errs[i] = childErr
} else {
newChildren[i] = childNode
}
}(i, e)
}
wg.Wait()
// If there was an error we return the first one
for _, err := range errs {
if err != nil {
return node, err return node, err
} else {
SetChild(node, i, childNode)
} }
} }
for i, childNode := range newChildren {
SetChild(node, i, childNode)
}
_, err = v.Visit(nil, nil) _, err = v.Visit(nil, nil)
return node, err return node, err
} }

File diff suppressed because it is too large Load diff