Extract OR operation into own eval method.

This commit is contained in:
Fabian Reinartz 2015-05-16 14:00:11 +02:00
parent 2c3e9e2e87
commit 8a109e061b

View file

@ -595,6 +595,9 @@ func (ev *evaluator) eval(expr Expr) Value {
if e.Op == itemLAND {
return ev.vectorAnd(lhs.(Vector), rhs.(Vector), e.VectorMatching)
}
if e.Op == itemLOR {
return ev.vectorOr(lhs.(Vector), rhs.(Vector), e.VectorMatching)
}
return ev.vectorBinop(e.Op, lhs.(Vector), rhs.(Vector), e.VectorMatching)
case lt == ExprVector && rt == ExprScalar:
@ -733,6 +736,37 @@ func (ev *evaluator) vectorAnd(lhs, rhs Vector, matching *VectorMatching) Vector
return result
}
func (ev *evaluator) vectorOr(lhs, rhs Vector, matching *VectorMatching) Vector {
if matching.Card != CardManyToMany {
panic("logical operations must always be many-to-many matching")
}
// If no matching labels are specified, match by all labels.
signature := func(m clientmodel.COWMetric) uint64 {
return clientmodel.SignatureForLabels(m.Metric, matching.On)
}
if len(matching.On) == 0 {
signature = func(m clientmodel.COWMetric) uint64 {
m.Delete(clientmodel.MetricNameLabel)
return uint64(m.Metric.Fingerprint())
}
}
var result Vector
leftSigs := map[uint64]struct{}{}
// Add everything from the left-hand-side vector.
for _, ls := range lhs {
leftSigs[signature(ls.Metric)] = struct{}{}
result = append(result, ls)
}
// Add all right-hand side elements which have not been added from the left-hand side.
for _, rs := range rhs {
if _, ok := leftSigs[signature(rs.Metric)]; !ok {
result = append(result, rs)
}
}
return result
}
// vectorBinop evaluates a binary operation between two vector values.
func (ev *evaluator) vectorBinop(op itemType, lhs, rhs Vector, matching *VectorMatching) Vector {
result := make(Vector, 0, len(rhs))
@ -755,7 +789,7 @@ func (ev *evaluator) vectorBinop(op itemType, lhs, rhs Vector, matching *VectorM
// The rhs is guaranteed to be the 'one' side. Having multiple samples
// with the same hash means that the matching is many-to-many,
// which is not supported.
if _, found := rm[hash]; matching.Card != CardManyToMany && found {
if _, found := rm[hash]; found {
// Many-to-many matching not allowed.
ev.errorf("many-to-many matching not allowed")
}
@ -768,13 +802,6 @@ func (ev *evaluator) vectorBinop(op itemType, lhs, rhs Vector, matching *VectorM
// the binary operation.
for _, ls := range lhs {
hash := hashForMetric(ls.Metric.Metric, matching.On)
// Any lhs sample we encounter in an OR operation belongs to the result.
if op == itemLOR {
ls.Metric = resultMetric(op, ls, nil, matching)
result = append(result, ls)
added[hash] = nil // Ensure matching rhs sample is not added later.
continue
}
rs, found := rm[hash] // Look for a match in the rhs vector.
if !found {
@ -820,16 +847,6 @@ func (ev *evaluator) vectorBinop(op itemType, lhs, rhs Vector, matching *VectorM
}
}
// Add all remaining samples in the rhs in an OR operation if they
// have not been matched up with a lhs sample.
if op == itemLOR {
for hash, rs := range rm {
if _, exists := added[hash]; !exists {
rs.Metric = resultMetric(op, rs, nil, matching)
result = append(result, rs)
}
}
}
return result
}