promql: misc fixes

This commit is contained in:
Fabian Reinartz 2016-12-28 09:16:48 +01:00
parent fecf9532b9
commit 71fe0c58a8
11 changed files with 394 additions and 344 deletions

View file

@ -205,7 +205,9 @@ func (b *Builder) Labels() Labels {
return b.base
}
res := make(Labels, 0, len(b.base)+len(b.add)-len(b.del))
// In the general case, labels are removed, modified or moved
// rather than added.
res := make(Labels, 0, len(b.base))
Outer:
for _, l := range b.base {
for _, n := range b.del {

View file

@ -184,7 +184,7 @@ func (e *BinaryExpr) Type() ValueType {
if e.LHS.Type() == ValueTypeScalar && e.RHS.Type() == ValueTypeScalar {
return ValueTypeScalar
}
return ValueTypeScalar
return ValueTypeVector
}
func (*AggregateExpr) expr() {}

View file

@ -19,11 +19,12 @@ import (
"math"
"runtime"
"sort"
"strings"
"strconv"
"time"
"github.com/prometheus/common/log"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/pkg/timestamp"
"github.com/prometheus/prometheus/storage"
"golang.org/x/net/context"
@ -42,163 +43,6 @@ func convertibleToInt64(v float64) bool {
return v <= maxInt64 && v >= minInt64
}
// Value is a generic interface for values resulting from a query evaluation.
type Value interface {
Type() ValueType
String() string
}
func (Matrix) Type() ValueType { return ValueTypeMatrix }
func (Vector) Type() ValueType { return ValueTypeVector }
func (Scalar) Type() ValueType { return ValueTypeScalar }
func (String) Type() ValueType { return ValueTypeString }
// ValueType describes a type of a value.
type ValueType string
// The valid value types.
const (
ValueTypeNone = "none"
ValueTypeVector = "vector"
ValueTypeScalar = "scalar"
ValueTypeMatrix = "matrix"
ValueTypeString = "string"
)
// String represents a string value.
type String struct {
V string
T int64
}
func (s String) String() string {
return s.V
}
// Scalar is a data point that's explicitly not associated with a metric.
type Scalar struct {
T int64
V float64
}
func (s Scalar) String() string {
return ""
}
// Series is a stream of data points belonging to a metric.
type Series struct {
Metric labels.Labels
Points []Point
}
func (s Series) String() string {
return ""
}
// Point represents a single data point for a given timestamp.
type Point struct {
T int64
V float64
}
func (s Point) String() string {
return ""
}
// Sample is a single sample belonging to a metric.
type Sample struct {
Point
Metric labels.Labels
}
func (s Sample) String() string {
return ""
}
// Vector is basically only an alias for model.Samples, but the
// contract is that in a Vector, all Samples have the same timestamp.
type Vector []Sample
func (vec Vector) String() string {
entries := make([]string, len(vec))
for i, s := range vec {
entries[i] = s.String()
}
return strings.Join(entries, "\n")
}
// Matrix is a slice of Seriess that implements sort.Interface and
// has a String method.
type Matrix []Series
func (m Matrix) String() string {
// TODO(fabxc): sort, or can we rely on order from the querier?
strs := make([]string, len(m))
for i, ss := range m {
strs[i] = ss.String()
}
return strings.Join(strs, "\n")
}
// Result holds the resulting value of an execution or an error
// if any occurred.
type Result struct {
Err error
Value Value
}
// Vector returns a Vector if the result value is one. An error is returned if
// the result was an error or the result value is not a Vector.
func (r *Result) Vector() (Vector, error) {
if r.Err != nil {
return nil, r.Err
}
v, ok := r.Value.(Vector)
if !ok {
return nil, fmt.Errorf("query result is not a Vector")
}
return v, nil
}
// Matrix returns a Matrix. An error is returned if
// the result was an error or the result value is not a Matrix.
func (r *Result) Matrix() (Matrix, error) {
if r.Err != nil {
return nil, r.Err
}
v, ok := r.Value.(Matrix)
if !ok {
return nil, fmt.Errorf("query result is not a range Vector")
}
return v, nil
}
// Scalar returns a Scalar value. An error is returned if
// the result was an error or the result value is not a Scalar.
func (r *Result) Scalar() (Scalar, error) {
if r.Err != nil {
return Scalar{}, r.Err
}
v, ok := r.Value.(Scalar)
if !ok {
return Scalar{}, fmt.Errorf("query result is not a Scalar")
}
return v, nil
}
func (r *Result) String() string {
if r.Err != nil {
return r.Err.Error()
}
if r.Value == nil {
return ""
}
return r.Value.String()
}
type (
// ErrQueryTimeout is returned if a query timed out during processing.
ErrQueryTimeout string
@ -531,11 +375,17 @@ func (ng *Engine) populateIterators(ctx context.Context, s *EvalStmt) (storage.Q
Inspect(s.Expr, func(node Node) bool {
switch n := node.(type) {
case *VectorSelector:
if n.Offset > maxOffset {
if maxOffset < StalenessDelta {
maxOffset = StalenessDelta
}
if n.Offset+StalenessDelta > maxOffset {
maxOffset = n.Offset + StalenessDelta
}
case *MatrixSelector:
if n.Offset > maxOffset {
if maxOffset < n.Range {
maxOffset = n.Range
}
if n.Offset+n.Range > maxOffset {
maxOffset = n.Offset + n.Range
}
}
@ -544,7 +394,7 @@ func (ng *Engine) populateIterators(ctx context.Context, s *EvalStmt) (storage.Q
mint := s.Start.Add(-maxOffset)
querier, err := ng.queryable.Querier(timeMilliseconds(mint), timeMilliseconds(s.End))
querier, err := ng.queryable.Querier(timestamp.FromTime(mint), timestamp.FromTime(s.End))
if err != nil {
return nil, err
}
@ -554,6 +404,8 @@ func (ng *Engine) populateIterators(ctx context.Context, s *EvalStmt) (storage.Q
case *VectorSelector:
n.series, err = expandSeriesSet(querier.Select(n.LabelMatchers...))
if err != nil {
// TODO(fabxc): use multi-error.
log.Errorln("expand series set:", err)
return false
}
for _, s := range n.series {
@ -564,6 +416,7 @@ func (ng *Engine) populateIterators(ctx context.Context, s *EvalStmt) (storage.Q
case *MatrixSelector:
n.series, err = expandSeriesSet(querier.Select(n.LabelMatchers...))
if err != nil {
log.Errorln("expand series set:", err)
return false
}
for _, s := range n.series {
@ -736,7 +589,7 @@ func (ev *evaluator) eval(expr Expr) Value {
return e.Func.Call(ev, e.Args)
case *MatrixSelector:
return ev.MatrixSelector(e)
return ev.matrixSelector(e)
case *NumberLiteral:
return Scalar{V: e.Val, T: ev.Timestamp}
@ -763,35 +616,33 @@ func (ev *evaluator) eval(expr Expr) Value {
return se
case *VectorSelector:
return ev.VectorSelector(e)
return ev.vectorSelector(e)
}
panic(fmt.Errorf("unhandled expression of type: %T", expr))
}
// VectorSelector evaluates a *VectorSelector expression.
func (ev *evaluator) VectorSelector(node *VectorSelector) Vector {
// vectorSelector evaluates a *VectorSelector expression.
func (ev *evaluator) vectorSelector(node *VectorSelector) Vector {
var (
ok bool
vec = make(Vector, 0, len(node.series))
refTime = ev.Timestamp - durationMilliseconds(node.Offset)
)
for i, it := range node.iterators {
if !it.Seek(refTime) {
ok := it.Seek(refTime)
if !ok {
if it.Err() != nil {
ev.error(it.Err())
}
continue
}
t, v := it.Values()
if t > refTime {
if !ok || t > refTime {
t, v, ok = it.PeekBack()
if !ok || t < refTime-durationMilliseconds(StalenessDelta) {
continue
}
}
vec = append(vec, Sample{
Metric: node.series[i].Labels(),
Point: Point{V: v, T: ev.Timestamp},
@ -800,8 +651,8 @@ func (ev *evaluator) VectorSelector(node *VectorSelector) Vector {
return vec
}
// MatrixSelector evaluates a *MatrixSelector expression.
func (ev *evaluator) MatrixSelector(node *MatrixSelector) Matrix {
// matrixSelector evaluates a *MatrixSelector expression.
func (ev *evaluator) matrixSelector(node *MatrixSelector) Matrix {
var (
offset = durationMilliseconds(node.Offset)
maxt = ev.Timestamp - offset
@ -815,28 +666,30 @@ func (ev *evaluator) MatrixSelector(node *MatrixSelector) Matrix {
Points: make([]Point, 0, 16),
}
if !it.Seek(maxt) {
ok := it.Seek(maxt)
if !ok {
if it.Err() != nil {
ev.error(it.Err())
}
continue
}
t, v := it.Values()
buf := it.Buffer()
for buf.Next() {
t, v := buf.Values()
// Values in the buffer are guaranteed to be smaller than maxt.
if t >= mint {
ss.Points = append(ss.Points, Point{T: t + offset, V: v})
ss.Points = append(ss.Points, Point{T: t, V: v})
}
}
// The seeked sample might also be in the range.
t, v := it.Values()
t, v = it.Values()
if t == maxt {
ss.Points = append(ss.Points, Point{T: t + offset, V: v})
ss.Points = append(ss.Points, Point{T: t, V: v})
}
if len(ss.Points) > 0 {
Matrix = append(Matrix, ss)
}
Matrix = append(Matrix, ss)
}
return Matrix
}
@ -999,7 +852,7 @@ func (ev *evaluator) VectorBinop(op itemType, lhs, rhs Vector, matching *VectorM
}
func hashWithoutLabels(lset labels.Labels, names ...string) uint64 {
cm := make(labels.Labels, 0, len(lset)-len(names)-1)
cm := make(labels.Labels, 0, len(lset))
Outer:
for _, l := range lset {
@ -1226,26 +1079,36 @@ func (ev *evaluator) aggregation(op itemType, grouping []string, without bool, k
for _, s := range vec {
lb := labels.NewBuilder(s.Metric)
if without || keepCommon {
if without {
lb.Del(grouping...)
lb.Del(labels.MetricName)
}
if op == itemCountValues {
lb.Set(valueLabel, fmt.Sprintf("%f", s.V)) // TODO(fabxc): use correct printing.
lb.Set(valueLabel, strconv.FormatFloat(float64(s.V), 'f', -1, 64))
}
var (
groupingKey uint64
metric = lb.Labels()
groupingKey = metric.Hash()
)
if without {
groupingKey = metric.Hash()
} else {
groupingKey = hashForLabels(metric, grouping...)
}
group, ok := result[groupingKey]
// Add a new group if it doesn't exist.
if !ok {
var m labels.Labels
if keepCommon || without {
if keepCommon {
m = lb.Del(labels.MetricName).Labels()
} else if without {
m = metric
} else {
m = make(labels.Labels, 0, len(grouping))
for _, l := range s.Metric {
for _, l := range metric {
for _, n := range grouping {
if l.Name == n {
m = append(m, labels.Label{Name: n, Value: l.Value})
@ -1253,6 +1116,7 @@ func (ev *evaluator) aggregation(op itemType, grouping []string, without bool, k
}
}
}
sort.Sort(m)
}
result[groupingKey] = &groupedAggregation{
labels: m,
@ -1451,10 +1315,10 @@ func (g *queryGate) Done() {
// user facing terminology as defined in the documentation.
func documentedType(t ValueType) string {
switch t {
case "Vector":
return "instant Vector"
case "Matrix":
return "range Vector"
case "vector":
return "instant vector"
case "matrix":
return "range vector"
default:
return string(t)
}

View file

@ -49,13 +49,14 @@ func funcTime(ev *evaluator, args Expressions) Value {
func extrapolatedRate(ev *evaluator, arg Expr, isCounter bool, isRate bool) Value {
ms := arg.(*MatrixSelector)
rangeStart := ev.Timestamp - durationMilliseconds(ms.Range+ms.Offset)
rangeEnd := ev.Timestamp - durationMilliseconds(ms.Offset)
var (
matrix = ev.evalMatrix(ms)
rangeStart = ev.Timestamp - durationMilliseconds(ms.Range+ms.Offset)
rangeEnd = ev.Timestamp - durationMilliseconds(ms.Offset)
resultVector = make(Vector, 0, len(matrix))
)
resultVector := Vector{}
MatrixValue := ev.evalMatrix(ms)
for _, samples := range MatrixValue {
for _, samples := range matrix {
// No sense in trying to compute a rate without at least two points. Drop
// this Vector element.
if len(samples.Points) < 2 {
@ -74,11 +75,11 @@ func extrapolatedRate(ev *evaluator, arg Expr, isCounter bool, isRate bool) Valu
resultValue := lastValue - samples.Points[0].V + counterCorrection
// Duration between first/last samples and boundary of range.
durationToStart := float64(samples.Points[0].T - rangeStart)
durationToEnd := float64(rangeEnd - samples.Points[len(samples.Points)-1].T)
durationToStart := float64(samples.Points[0].T-rangeStart) / 1000
durationToEnd := float64(rangeEnd-samples.Points[len(samples.Points)-1].T) / 1000
sampledInterval := float64(samples.Points[len(samples.Points)-1].T - samples.Points[0].T)
averageDurationBetweenSamples := float64(sampledInterval) / float64(len(samples.Points)-1)
sampledInterval := float64(samples.Points[len(samples.Points)-1].T-samples.Points[0].T) / 1000
averageDurationBetweenSamples := sampledInterval / float64(len(samples.Points)-1)
if isCounter && resultValue > 0 && samples.Points[0].V >= 0 {
// Counters cannot be negative. If we have any slope at
@ -88,7 +89,7 @@ func extrapolatedRate(ev *evaluator, arg Expr, isCounter bool, isRate bool) Valu
// take the zero point as the start of the series,
// thereby avoiding extrapolation to negative counter
// values.
durationToZero := float64(sampledInterval) * float64(samples.Points[0].V/resultValue)
durationToZero := sampledInterval * (samples.Points[0].V / resultValue)
if durationToZero < durationToStart {
durationToStart = durationToZero
}
@ -111,9 +112,9 @@ func extrapolatedRate(ev *evaluator, arg Expr, isCounter bool, isRate bool) Valu
} else {
extrapolateToInterval += averageDurationBetweenSamples / 2
}
resultValue = resultValue * extrapolateToInterval / sampledInterval
resultValue = resultValue * (extrapolateToInterval / sampledInterval)
if isRate {
resultValue = resultValue / 1000 / ms.Range.Seconds()
resultValue = resultValue / ms.Range.Seconds()
}
resultVector = append(resultVector, Sample{
@ -303,7 +304,9 @@ func funcSortDesc(ev *evaluator, args Expressions) Value {
func funcClampMax(ev *evaluator, args Expressions) Value {
vec := ev.evalVector(args[0])
max := ev.evalFloat(args[1])
for _, el := range vec {
for i := range vec {
el := &vec[i]
el.Metric = dropMetricName(el.Metric)
el.V = math.Min(max, float64(el.V))
}
@ -314,7 +317,9 @@ func funcClampMax(ev *evaluator, args Expressions) Value {
func funcClampMin(ev *evaluator, args Expressions) Value {
vec := ev.evalVector(args[0])
min := ev.evalFloat(args[1])
for _, el := range vec {
for i := range vec {
el := &vec[i]
el.Metric = dropMetricName(el.Metric)
el.V = math.Max(min, float64(el.V))
}
@ -356,7 +361,8 @@ func funcDropCommonLabels(ev *evaluator, args Expressions) Value {
cnames = append(cnames, n)
}
for _, el := range vec {
for i := range vec {
el := &vec[i]
el.Metric = labels.NewBuilder(el.Metric).Del(cnames...).Labels()
}
return vec
@ -374,7 +380,9 @@ func funcRound(ev *evaluator, args Expressions) Value {
toNearestInverse := 1.0 / toNearest
vec := ev.evalVector(args[0])
for _, el := range vec {
for i := range vec {
el := &vec[i]
el.Metric = dropMetricName(el.Metric)
el.V = math.Floor(float64(el.V)*toNearestInverse+0.5) / toNearestInverse
}
@ -396,7 +404,7 @@ func funcScalar(ev *evaluator, args Expressions) Value {
}
}
// === count_Scalar(Vector ValueTypeVector) float64 ===
// === count_scalar(Vector ValueTypeVector) float64 ===
func funcCountScalar(ev *evaluator, args Expressions) Value {
return Scalar{
V: float64(len(ev.evalVector(args[0]))),
@ -441,12 +449,14 @@ func funcCountOverTime(ev *evaluator, args Expressions) Value {
// === floor(Vector ValueTypeVector) Vector ===
func funcFloor(ev *evaluator, args Expressions) Value {
Vector := ev.evalVector(args[0])
for _, el := range Vector {
vec := ev.evalVector(args[0])
for i := range vec {
el := &vec[i]
el.Metric = dropMetricName(el.Metric)
el.V = math.Floor(float64(el.V))
}
return Vector
return vec
}
// === max_over_time(Matrix ValueTypeMatrix) Vector ===
@ -536,12 +546,14 @@ func funcStdvarOverTime(ev *evaluator, args Expressions) Value {
// === abs(Vector ValueTypeVector) Vector ===
func funcAbs(ev *evaluator, args Expressions) Value {
Vector := ev.evalVector(args[0])
for _, el := range Vector {
vec := ev.evalVector(args[0])
for i := range vec {
el := &vec[i]
el.Metric = dropMetricName(el.Metric)
el.V = math.Abs(float64(el.V))
}
return Vector
return vec
}
// === absent(Vector ValueTypeVector) Vector ===
@ -568,62 +580,74 @@ func funcAbsent(ev *evaluator, args Expressions) Value {
// === ceil(Vector ValueTypeVector) Vector ===
func funcCeil(ev *evaluator, args Expressions) Value {
Vector := ev.evalVector(args[0])
for _, el := range Vector {
vec := ev.evalVector(args[0])
for i := range vec {
el := &vec[i]
el.Metric = dropMetricName(el.Metric)
el.V = math.Ceil(float64(el.V))
}
return Vector
return vec
}
// === exp(Vector ValueTypeVector) Vector ===
func funcExp(ev *evaluator, args Expressions) Value {
Vector := ev.evalVector(args[0])
for _, el := range Vector {
vec := ev.evalVector(args[0])
for i := range vec {
el := &vec[i]
el.Metric = dropMetricName(el.Metric)
el.V = math.Exp(float64(el.V))
}
return Vector
return vec
}
// === sqrt(Vector VectorNode) Vector ===
func funcSqrt(ev *evaluator, args Expressions) Value {
Vector := ev.evalVector(args[0])
for _, el := range Vector {
vec := ev.evalVector(args[0])
for i := range vec {
el := &vec[i]
el.Metric = dropMetricName(el.Metric)
el.V = math.Sqrt(float64(el.V))
}
return Vector
return vec
}
// === ln(Vector ValueTypeVector) Vector ===
func funcLn(ev *evaluator, args Expressions) Value {
Vector := ev.evalVector(args[0])
for _, el := range Vector {
vec := ev.evalVector(args[0])
for i := range vec {
el := &vec[i]
el.Metric = dropMetricName(el.Metric)
el.V = math.Log(float64(el.V))
}
return Vector
return vec
}
// === log2(Vector ValueTypeVector) Vector ===
func funcLog2(ev *evaluator, args Expressions) Value {
Vector := ev.evalVector(args[0])
for _, el := range Vector {
vec := ev.evalVector(args[0])
for i := range vec {
el := &vec[i]
el.Metric = dropMetricName(el.Metric)
el.V = math.Log2(float64(el.V))
}
return Vector
return vec
}
// === log10(Vector ValueTypeVector) Vector ===
func funcLog10(ev *evaluator, args Expressions) Value {
Vector := ev.evalVector(args[0])
for _, el := range Vector {
vec := ev.evalVector(args[0])
for i := range vec {
el := &vec[i]
el.Metric = dropMetricName(el.Metric)
el.V = math.Log10(float64(el.V))
}
return Vector
return vec
}
// linearRegression performs a least-square linear regression analysis on the
@ -636,7 +660,7 @@ func linearRegression(samples []Point, interceptTime int64) (slope, intercept fl
sumXY, sumX2 float64
)
for _, sample := range samples {
x := float64(sample.T-interceptTime) / 1e6
x := float64(sample.T-interceptTime) / 1e3
n += 1.0
sumY += sample.V
sumX += x
@ -786,7 +810,7 @@ func funcChanges(ev *evaluator, args Expressions) Value {
// === label_replace(Vector ValueTypeVector, dst_label, replacement, src_labelname, regex ValueTypeString) Vector ===
func funcLabelReplace(ev *evaluator, args Expressions) Value {
var (
Vector = ev.evalVector(args[0])
vector = ev.evalVector(args[0])
dst = ev.evalString(args[1]).V
repl = ev.evalString(args[2]).V
src = ev.evalString(args[3]).V
@ -801,8 +825,10 @@ func funcLabelReplace(ev *evaluator, args Expressions) Value {
ev.errorf("invalid destination label name in label_replace(): %s", dst)
}
outSet := make(map[uint64]struct{}, len(Vector))
for _, el := range Vector {
outSet := make(map[uint64]struct{}, len(vector))
for i := range vector {
el := &vector[i]
srcVal := el.Metric.Get(src)
indexes := regex.FindStringSubmatchIndex(srcVal)
// If there is no match, no replacement should take place.
@ -825,7 +851,7 @@ func funcLabelReplace(ev *evaluator, args Expressions) Value {
}
}
return Vector
return vector
}
// === Vector(s Scalar) Vector ===
@ -851,7 +877,9 @@ func dateWrapper(ev *evaluator, args Expressions, f func(time.Time) float64) Val
} else {
v = ev.evalVector(args[0])
}
for _, el := range v {
for i := range v {
el := &v[i]
el.Metric = dropMetricName(el.Metric)
t := time.Unix(int64(el.V), 0).UTC()
el.V = f(t)
@ -957,8 +985,8 @@ var functions = map[string]*Function{
ReturnType: ValueTypeVector,
Call: funcCountOverTime,
},
"count_Scalar": {
Name: "count_Scalar",
"count_scalar": {
Name: "count_scalar",
ArgTypes: []ValueType{ValueTypeVector},
ReturnType: ValueTypeScalar,
Call: funcCountScalar,
@ -1132,8 +1160,8 @@ var functions = map[string]*Function{
ReturnType: ValueTypeVector,
Call: funcRound,
},
"Scalar": {
Name: "Scalar",
"scalar": {
Name: "scalar",
ArgTypes: []ValueType{ValueTypeVector},
ReturnType: ValueTypeScalar,
Call: funcScalar,
@ -1180,8 +1208,8 @@ var functions = map[string]*Function{
ReturnType: ValueTypeScalar,
Call: funcTime,
},
"Vector": {
Name: "Vector",
"vector": {
Name: "vector",
ArgTypes: []ValueType{ValueTypeScalar},
ReturnType: ValueTypeVector,
Call: funcVector,
@ -1219,7 +1247,7 @@ func (s vectorByValueHeap) Swap(i, j int) {
}
func (s *vectorByValueHeap) Push(x interface{}) {
*s = append(*s, x.(Sample))
*s = append(*s, *(x.(*Sample)))
}
func (s *vectorByValueHeap) Pop() interface{} {
@ -1248,7 +1276,7 @@ func (s vectorByReverseValueHeap) Swap(i, j int) {
}
func (s *vectorByReverseValueHeap) Push(x interface{}) {
*s = append(*s, x.(Sample))
*s = append(*s, *(x.(*Sample)))
}
func (s *vectorByReverseValueHeap) Pop() interface{} {

View file

@ -506,7 +506,7 @@ func (p *parser) balance(lhs Expr, op itemType, rhs Expr, vecMatching *VectorMat
if (precd < 0) || (precd == 0 && op.isRightAssociative()) {
balanced := p.balance(lhsBE.RHS, op, rhs, vecMatching, returnBool)
if lhsBE.Op.isComparisonOperator() && !lhsBE.ReturnBool && balanced.Type() == ValueTypeScalar && lhsBE.LHS.Type() == ValueTypeScalar {
p.errorf("comparisons between Scalars must use BOOL modifier")
p.errorf("comparisons between scalars must use BOOL modifier")
}
return &BinaryExpr{
Op: lhsBE.Op,
@ -518,7 +518,7 @@ func (p *parser) balance(lhs Expr, op itemType, rhs Expr, vecMatching *VectorMat
}
}
if op.isComparisonOperator() && !returnBool && rhs.Type() == ValueTypeScalar && lhs.Type() == ValueTypeScalar {
p.errorf("comparisons between Scalars must use BOOL modifier")
p.errorf("comparisons between scalars must use BOOL modifier")
}
return &BinaryExpr{
Op: op,
@ -935,7 +935,7 @@ func (p *parser) offset() time.Duration {
return offset
}
// VectorSelector parses a new (instant) Vector selector.
// VectorSelector parses a new (instant) vector selector.
//
// <metric_identifier> [<label_matchers>]
// [<metric_identifier>] <label_matchers>
@ -962,7 +962,7 @@ func (p *parser) VectorSelector(name string) *VectorSelector {
}
if len(matchers) == 0 {
p.errorf("Vector selector must contain label matchers or metric name")
p.errorf("vector selector must contain label matchers or metric name")
}
// A Vector selector must contain at least one non-empty matcher to prevent
// implicit selection of all metrics (e.g. by a typo).
@ -974,7 +974,7 @@ func (p *parser) VectorSelector(name string) *VectorSelector {
}
}
if !notEmpty {
p.errorf("Vector selector must contain at least one non-empty matcher")
p.errorf("vector selector must contain at least one non-empty matcher")
}
return &VectorSelector{
@ -1028,7 +1028,7 @@ func (p *parser) checkType(node Node) (typ ValueType) {
case *RecordStmt:
ty := p.checkType(n.Expr)
if ty != ValueTypeVector && ty != ValueTypeScalar {
p.errorf("record statement must have a valid expression of type instant Vector or Scalar but got %s", documentedType(ty))
p.errorf("record statement must have a valid expression of type instant vector or scalar but got %s", documentedType(ty))
}
case Expressions:
@ -1058,12 +1058,12 @@ func (p *parser) checkType(node Node) (typ ValueType) {
p.errorf("binary expression does not support operator %q", n.Op)
}
if (lt != ValueTypeScalar && lt != ValueTypeVector) || (rt != ValueTypeScalar && rt != ValueTypeVector) {
p.errorf("binary expression must contain only Scalar and instant Vector types")
p.errorf("binary expression must contain only scalar and instant vector types")
}
if (lt != ValueTypeVector || rt != ValueTypeVector) && n.VectorMatching != nil {
if len(n.VectorMatching.MatchingLabels) > 0 {
p.errorf("Vector matching only allowed between instant Vectors")
p.errorf("vector matching only allowed between instant vectors")
}
n.VectorMatching = nil
} else {
@ -1079,7 +1079,7 @@ func (p *parser) checkType(node Node) (typ ValueType) {
}
if (lt == ValueTypeScalar || rt == ValueTypeScalar) && n.Op.isSetOperator() {
p.errorf("set operator %q not allowed in binary Scalar expression", n.Op)
p.errorf("set operator %q not allowed in binary scalar expression", n.Op)
}
case *Call:
@ -1102,7 +1102,7 @@ func (p *parser) checkType(node Node) (typ ValueType) {
p.errorf("only + and - operators allowed for unary expressions")
}
if t := p.checkType(n.Expr); t != ValueTypeScalar && t != ValueTypeVector {
p.errorf("unary expression only allowed on expressions of type Scalar or instant Vector, got %q", documentedType(t))
p.errorf("unary expression only allowed on expressions of type scalar or instant vector, got %q", documentedType(t))
}
case *NumberLiteral, *MatrixSelector, *StringLiteral, *VectorSelector:

View file

@ -23,6 +23,7 @@ import (
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/stretchr/testify/require"
)
var testExpr = []struct {
@ -31,7 +32,7 @@ var testExpr = []struct {
fail bool // Whether parsing is supposed to fail.
errMsg string // If not empty the parsing error has to contain this string.
}{
// Scalars and Scalar-to-Scalar operations.
// Scalars and scalar-to-scalar operations.
{
input: "1",
expected: &NumberLiteral{1},
@ -212,19 +213,19 @@ var testExpr = []struct {
}, {
input: "1 and 1",
fail: true,
errMsg: "set operator \"and\" not allowed in binary Scalar expression",
errMsg: "set operator \"and\" not allowed in binary scalar expression",
}, {
input: "1 == 1",
fail: true,
errMsg: "parse error at char 7: comparisons between Scalars must use BOOL modifier",
errMsg: "parse error at char 7: comparisons between scalars must use BOOL modifier",
}, {
input: "1 or 1",
fail: true,
errMsg: "set operator \"or\" not allowed in binary Scalar expression",
errMsg: "set operator \"or\" not allowed in binary scalar expression",
}, {
input: "1 unless 1",
fail: true,
errMsg: "set operator \"unless\" not allowed in binary Scalar expression",
errMsg: "set operator \"unless\" not allowed in binary scalar expression",
}, {
input: "1 !~ 1",
fail: true,
@ -236,11 +237,11 @@ var testExpr = []struct {
}, {
input: `-"string"`,
fail: true,
errMsg: `unary expression only allowed on expressions of type Scalar or instant Vector, got "string"`,
errMsg: `unary expression only allowed on expressions of type scalar or instant vector, got "string"`,
}, {
input: `-test[5m]`,
fail: true,
errMsg: `unary expression only allowed on expressions of type Scalar or instant Vector, got "range Vector"`,
errMsg: `unary expression only allowed on expressions of type scalar or instant vector, got "range vector"`,
}, {
input: `*test`,
fail: true,
@ -747,35 +748,35 @@ var testExpr = []struct {
}, {
input: "foo and 1",
fail: true,
errMsg: "set operator \"and\" not allowed in binary Scalar expression",
errMsg: "set operator \"and\" not allowed in binary scalar expression",
}, {
input: "1 and foo",
fail: true,
errMsg: "set operator \"and\" not allowed in binary Scalar expression",
errMsg: "set operator \"and\" not allowed in binary scalar expression",
}, {
input: "foo or 1",
fail: true,
errMsg: "set operator \"or\" not allowed in binary Scalar expression",
errMsg: "set operator \"or\" not allowed in binary scalar expression",
}, {
input: "1 or foo",
fail: true,
errMsg: "set operator \"or\" not allowed in binary Scalar expression",
errMsg: "set operator \"or\" not allowed in binary scalar expression",
}, {
input: "foo unless 1",
fail: true,
errMsg: "set operator \"unless\" not allowed in binary Scalar expression",
errMsg: "set operator \"unless\" not allowed in binary scalar expression",
}, {
input: "1 unless foo",
fail: true,
errMsg: "set operator \"unless\" not allowed in binary Scalar expression",
errMsg: "set operator \"unless\" not allowed in binary scalar expression",
}, {
input: "1 or on(bar) foo",
fail: true,
errMsg: "Vector matching only allowed between instant Vectors",
errMsg: "vector matching only allowed between instant vectors",
}, {
input: "foo == on(bar) 10",
fail: true,
errMsg: "Vector matching only allowed between instant Vectors",
errMsg: "vector matching only allowed between instant vectors",
}, {
input: "foo and on(bar) group_left(baz) bar",
fail: true,
@ -914,23 +915,23 @@ var testExpr = []struct {
}, {
input: `{}`,
fail: true,
errMsg: "Vector selector must contain label matchers or metric name",
errMsg: "vector selector must contain label matchers or metric name",
}, {
input: `{x=""}`,
fail: true,
errMsg: "Vector selector must contain at least one non-empty matcher",
errMsg: "vector selector must contain at least one non-empty matcher",
}, {
input: `{x=~".*"}`,
fail: true,
errMsg: "Vector selector must contain at least one non-empty matcher",
errMsg: "vector selector must contain at least one non-empty matcher",
}, {
input: `{x!~".+"}`,
fail: true,
errMsg: "Vector selector must contain at least one non-empty matcher",
errMsg: "vector selector must contain at least one non-empty matcher",
}, {
input: `{x!="a"}`,
fail: true,
errMsg: "Vector selector must contain at least one non-empty matcher",
errMsg: "vector selector must contain at least one non-empty matcher",
}, {
input: `foo{__name__="bar"}`,
fail: true,
@ -1285,11 +1286,11 @@ var testExpr = []struct {
}, {
input: `topk(some_metric, other_metric)`,
fail: true,
errMsg: "parse error at char 32: expected type Scalar in aggregation parameter, got instant Vector",
errMsg: "parse error at char 32: expected type scalar in aggregation parameter, got instant vector",
}, {
input: `count_values(5, other_metric)`,
fail: true,
errMsg: "parse error at char 30: expected type string in aggregation parameter, got Scalar",
errMsg: "parse error at char 30: expected type string in aggregation parameter, got scalar",
},
// Test function calls.
{
@ -1363,7 +1364,7 @@ var testExpr = []struct {
}, {
input: "floor(1)",
fail: true,
errMsg: "expected type instant Vector in call to function \"floor\", got Scalar",
errMsg: "expected type instant vector in call to function \"floor\", got scalar",
}, {
input: "non_existent_function_far_bar()",
fail: true,
@ -1371,7 +1372,7 @@ var testExpr = []struct {
}, {
input: "rate(some_metric)",
fail: true,
errMsg: "expected type range Vector in call to function \"rate\", got instant Vector",
errMsg: "expected type range vector in call to function \"rate\", got instant vector",
},
// Fuzzing regression tests.
{
@ -1532,7 +1533,7 @@ var testStatement = []struct {
summary = "Global request rate low",
description = "The global request rate is low"
}
foo = bar{label1="value1"}
ALERT BazAlert IF foo > 10
@ -1591,7 +1592,6 @@ var testStatement = []struct {
mustLabelMatcher(labels.MatchEqual, string(model.MetricNameLabel), "bar"),
},
},
Labels: nil,
},
&AlertStmt{
Name: "BazAlert",
@ -1605,7 +1605,6 @@ var testStatement = []struct {
},
RHS: &NumberLiteral{10},
},
Labels: labels.Labels{},
Annotations: labels.FromStrings(
"summary", "Baz",
"description", "BazAlert",
@ -1782,59 +1781,41 @@ func mustGetFunction(name string) *Function {
var testSeries = []struct {
input string
expectedMetric model.Metric
expectedMetric labels.Labels
expectedValues []sequenceValue
fail bool
}{
{
input: `{} 1 2 3`,
expectedMetric: model.Metric{},
expectedMetric: labels.Labels{},
expectedValues: newSeq(1, 2, 3),
}, {
input: `{a="b"} -1 2 3`,
expectedMetric: model.Metric{
"a": "b",
},
input: `{a="b"} -1 2 3`,
expectedMetric: labels.FromStrings("a", "b"),
expectedValues: newSeq(-1, 2, 3),
}, {
input: `my_metric 1 2 3`,
expectedMetric: model.Metric{
model.MetricNameLabel: "my_metric",
},
input: `my_metric 1 2 3`,
expectedMetric: labels.FromStrings(labels.MetricName, "my_metric"),
expectedValues: newSeq(1, 2, 3),
}, {
input: `my_metric{} 1 2 3`,
expectedMetric: model.Metric{
model.MetricNameLabel: "my_metric",
},
input: `my_metric{} 1 2 3`,
expectedMetric: labels.FromStrings(labels.MetricName, "my_metric"),
expectedValues: newSeq(1, 2, 3),
}, {
input: `my_metric{a="b"} 1 2 3`,
expectedMetric: model.Metric{
model.MetricNameLabel: "my_metric",
"a": "b",
},
input: `my_metric{a="b"} 1 2 3`,
expectedMetric: labels.FromStrings(labels.MetricName, "my_metric", "a", "b"),
expectedValues: newSeq(1, 2, 3),
}, {
input: `my_metric{a="b"} 1 2 3-10x4`,
expectedMetric: model.Metric{
model.MetricNameLabel: "my_metric",
"a": "b",
},
input: `my_metric{a="b"} 1 2 3-10x4`,
expectedMetric: labels.FromStrings(labels.MetricName, "my_metric", "a", "b"),
expectedValues: newSeq(1, 2, 3, -7, -17, -27, -37),
}, {
input: `my_metric{a="b"} 1 2 3-0x4`,
expectedMetric: model.Metric{
model.MetricNameLabel: "my_metric",
"a": "b",
},
input: `my_metric{a="b"} 1 2 3-0x4`,
expectedMetric: labels.FromStrings(labels.MetricName, "my_metric", "a", "b"),
expectedValues: newSeq(1, 2, 3, 3, 3, 3, 3),
}, {
input: `my_metric{a="b"} 1 3 _ 5 _x4`,
expectedMetric: model.Metric{
model.MetricNameLabel: "my_metric",
"a": "b",
},
input: `my_metric{a="b"} 1 3 _ 5 _x4`,
expectedMetric: labels.FromStrings(labels.MetricName, "my_metric", "a", "b"),
expectedValues: newSeq(1, 3, none, 5, none, none, none, none),
}, {
input: `my_metric{a="b"} 1 3 _ 5 _a4`,
@ -1884,6 +1865,9 @@ func TestParseSeries(t *testing.T) {
t.Fatalf("failure expected, but passed")
}
require.Equal(t, test.expectedMetric, metric)
require.Equal(t, test.expectedValues, vals)
if !reflect.DeepEqual(vals, test.expectedValues) || !reflect.DeepEqual(metric, test.expectedMetric) {
t.Errorf("error in input: \n\n%s\n", test.input)
t.Fatalf("no match\n\nexpected:\n%s %s\ngot: \n%s %s\n", test.expectedMetric, test.expectedValues, metric, vals)

View file

@ -146,7 +146,7 @@ func (node *AggregateExpr) String() string {
} else {
format = "%s BY (%s)"
}
aggrString = fmt.Sprintf(format, aggrString, node.Grouping)
aggrString = fmt.Sprintf(format, aggrString, strings.Join(node.Grouping, ", "))
}
if node.KeepCommonLabels {
aggrString += " KEEP_COMMON"
@ -164,9 +164,9 @@ func (node *BinaryExpr) String() string {
vm := node.VectorMatching
if vm != nil && (len(vm.MatchingLabels) > 0 || vm.On) {
if vm.On {
matching = fmt.Sprintf(" ON(%s)", vm.MatchingLabels)
matching = fmt.Sprintf(" ON(%s)", strings.Join(vm.MatchingLabels, ", "))
} else {
matching = fmt.Sprintf(" IGNORING(%s)", vm.MatchingLabels)
matching = fmt.Sprintf(" IGNORING(%s)", strings.Join(vm.MatchingLabels, ", "))
}
if vm.Card == CardManyToOne || vm.Card == CardOneToMany {
matching += " GROUP_"
@ -175,7 +175,7 @@ func (node *BinaryExpr) String() string {
} else {
matching += "RIGHT"
}
matching += fmt.Sprintf("(%s)", vm.Include)
matching += fmt.Sprintf("(%s)", strings.Join(vm.Include, ", "))
}
}
return fmt.Sprintf("%s %s%s%s %s", node.LHS, node.Op, returnBool, matching, node.RHS)

View file

@ -42,7 +42,7 @@ const (
epsilon = 0.000001 // Relative error allowed for sample values.
)
var testStartTime = time.Time{}
var testStartTime = time.Unix(0, 0)
// Test is a sequence of read and write commands that are run
// against a test storage.
@ -281,18 +281,13 @@ func (cmd *loadCmd) set(m labels.Labels, vals ...sequenceValue) {
// append the defined time series to the storage.
func (cmd *loadCmd) append(a storage.Appender) {
// TODO(fabxc): commented out until Appender refactoring.
// for fp, samples := range cmd.defs {
// met := cmd.metrics[fp]
// for _, smpl := range samples {
// s := &model.Sample{
// Metric: met,
// Value: smpl.Value,
// Timestamp: smpl.Timestamp,
// }
// a.Append(s)
// }
// }
for h, smpls := range cmd.defs {
m := cmd.metrics[h]
for _, s := range smpls {
a.Add(m, s.T, s.V)
}
}
}
// evalCmd is a command that evaluates an expression for the given time (range)
@ -381,6 +376,7 @@ func (ev *evalCmd) compareResult(result Value) error {
if !ev.instant {
return fmt.Errorf("received instant result on range evaluation")
}
seen := map[uint64]bool{}
for pos, v := range val {
fp := v.Metric.Hash()
@ -391,7 +387,7 @@ func (ev *evalCmd) compareResult(result Value) error {
if ev.ordered && exp.pos != pos+1 {
return fmt.Errorf("expected metric %s with %v at position %d but was at %d", v.Metric, exp.vals, exp.pos, pos+1)
}
if !almostEqual(float64(exp.vals[0].value), float64(v.V)) {
if !almostEqual(exp.vals[0].value, v.V) {
return fmt.Errorf("expected %v for %s but got %v", exp.vals[0].value, v.Metric, v.V)
}
@ -485,7 +481,7 @@ func (t *Test) exec(tc testCommand) error {
if cmd.fail {
return nil
}
return fmt.Errorf("error evaluating query: %s", res.Err)
return fmt.Errorf("error evaluating query %q: %s", cmd.expr, res.Err)
}
if res.Err == nil && cmd.fail {
return fmt.Errorf("expected error evaluating query but got none")
@ -514,8 +510,7 @@ func (t *Test) clear() {
}
t.storage = testutil.NewStorage(t)
// TODO(fabxc): add back
// t.queryEngine = NewEngine(t.storage, nil)
t.queryEngine = NewEngine(t.storage, nil)
t.context, t.cancelCtx = context.WithCancel(context.Background())
}

169
promql/value.go Normal file
View file

@ -0,0 +1,169 @@
package promql
import (
"fmt"
"strings"
"github.com/prometheus/prometheus/pkg/labels"
)
// Value is a generic interface for values resulting from a query evaluation.
type Value interface {
Type() ValueType
String() string
}
func (Matrix) Type() ValueType { return ValueTypeMatrix }
func (Vector) Type() ValueType { return ValueTypeVector }
func (Scalar) Type() ValueType { return ValueTypeScalar }
func (String) Type() ValueType { return ValueTypeString }
// ValueType describes a type of a value.
type ValueType string
// The valid value types.
const (
ValueTypeNone = "none"
ValueTypeVector = "vector"
ValueTypeScalar = "scalar"
ValueTypeMatrix = "matrix"
ValueTypeString = "string"
)
// String represents a string value.
type String struct {
V string
T int64
}
func (s String) String() string {
return s.V
}
// Scalar is a data point that's explicitly not associated with a metric.
type Scalar struct {
T int64
V float64
}
func (s Scalar) String() string {
return fmt.Sprintf("scalar: %v @[%v]", s.V, s.T)
}
// Series is a stream of data points belonging to a metric.
type Series struct {
Metric labels.Labels
Points []Point
}
func (s Series) String() string {
vals := make([]string, len(s.Points))
for i, v := range s.Points {
vals[i] = v.String()
}
return fmt.Sprintf("%s =>\n%s", s.Metric, strings.Join(vals, "\n"))
}
// Point represents a single data point for a given timestamp.
type Point struct {
T int64
V float64
}
func (p Point) String() string {
return fmt.Sprintf("%f @[%d]", p.V, p.T)
}
// Sample is a single sample belonging to a metric.
type Sample struct {
Point
Metric labels.Labels
}
func (s Sample) String() string {
return fmt.Sprintf("%s => %s", s.Metric, s.Point)
}
// Vector is basically only an alias for model.Samples, but the
// contract is that in a Vector, all Samples have the same timestamp.
type Vector []Sample
func (vec Vector) String() string {
entries := make([]string, len(vec))
for i, s := range vec {
entries[i] = s.String()
}
return strings.Join(entries, "\n")
}
// Matrix is a slice of Seriess that implements sort.Interface and
// has a String method.
type Matrix []Series
func (m Matrix) String() string {
// TODO(fabxc): sort, or can we rely on order from the querier?
strs := make([]string, len(m))
for i, ss := range m {
strs[i] = ss.String()
}
return strings.Join(strs, "\n")
}
// Result holds the resulting value of an execution or an error
// if any occurred.
type Result struct {
Err error
Value Value
}
// Vector returns a Vector if the result value is one. An error is returned if
// the result was an error or the result value is not a Vector.
func (r *Result) Vector() (Vector, error) {
if r.Err != nil {
return nil, r.Err
}
v, ok := r.Value.(Vector)
if !ok {
return nil, fmt.Errorf("query result is not a Vector")
}
return v, nil
}
// Matrix returns a Matrix. An error is returned if
// the result was an error or the result value is not a Matrix.
func (r *Result) Matrix() (Matrix, error) {
if r.Err != nil {
return nil, r.Err
}
v, ok := r.Value.(Matrix)
if !ok {
return nil, fmt.Errorf("query result is not a range Vector")
}
return v, nil
}
// Scalar returns a Scalar value. An error is returned if
// the result was an error or the result value is not a Scalar.
func (r *Result) Scalar() (Scalar, error) {
if r.Err != nil {
return Scalar{}, r.Err
}
v, ok := r.Value.(Scalar)
if !ok {
return Scalar{}, fmt.Errorf("query result is not a Scalar")
}
return v, nil
}
func (r *Result) String() string {
if r.Err != nil {
return r.Err.Error()
}
if r.Value == nil {
return ""
}
return r.Value.String()
}

View file

@ -24,6 +24,7 @@ func Open(path string) (storage.Storage, error) {
}
func (a adapter) Querier(mint, maxt int64) (storage.Querier, error) {
// fmt.Println("new querier at", timestamp.Time(mint), timestamp.Time(maxt), maxt-mint)
return querier{q: a.db.Querier(mint, maxt)}, nil
}
@ -48,9 +49,7 @@ func (q querier) Select(oms ...*labels.Matcher) storage.SeriesSet {
ms = append(ms, convertMatcher(om))
}
set := q.q.Select(ms...)
return seriesSet{set: set}
return seriesSet{set: q.q.Select(ms...)}
}
func (q querier) LabelValues(name string) ([]string, error) { return q.q.LabelValues(name) }
@ -75,8 +74,11 @@ type appender struct {
a tsdb.Appender
}
func (a appender) Add(lset labels.Labels, t int64, v float64) { a.a.Add(toTSDBLabels(lset), t, v) }
func (a appender) Commit() error { return a.a.Commit() }
func (a appender) Add(lset labels.Labels, t int64, v float64) {
// fmt.Println("add", lset, timestamp.Time(t), v)
a.a.Add(toTSDBLabels(lset), t, v)
}
func (a appender) Commit() error { return a.a.Commit() }
func convertMatcher(m *labels.Matcher) tsdbLabels.Matcher {
switch m.Type {

View file

@ -4,6 +4,7 @@ import (
"io/ioutil"
"os"
"github.com/prometheus/common/log"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/storage/tsdb"
)
@ -15,6 +16,9 @@ func NewStorage(t T) storage.Storage {
if err != nil {
t.Fatalf("Opening test dir failed: %s", err)
}
log.With("dir", dir).Debugln("opening test storage")
db, err := tsdb.Open(dir)
if err != nil {
t.Fatalf("Opening test storage failed: %s", err)
@ -28,6 +32,8 @@ type testStorage struct {
}
func (s testStorage) Close() error {
log.With("dir", s.dir).Debugln("closing test storage")
if err := s.Storage.Close(); err != nil {
return err
}