diff --git a/promql/bench_test.go b/promql/bench_test.go index 4353d25bb..a1501a3cb 100644 --- a/promql/bench_test.go +++ b/promql/bench_test.go @@ -175,6 +175,10 @@ func BenchmarkRangeQuery(b *testing.B) { { expr: "histogram_quantile(0.9, rate(h_X[5m]))", }, + // Many-to-one join. + { + expr: "a_X + on(l) group_right a_one", + }, } // X in an expr will be replaced by different metric sizes. diff --git a/promql/engine.go b/promql/engine.go index bd387f2c5..8379bf48e 100644 --- a/promql/engine.go +++ b/promql/engine.go @@ -913,6 +913,8 @@ func (ev *evaluator) Eval(expr parser.Expr) (v parser.Value, ws storage.Warnings type EvalSeriesHelper struct { // The grouping key used by aggregation. groupingKey uint64 + // Used to map left-hand to right-hand in binary operations. + signature string } // EvalNodeHelper stores extra information and caches for evaluating a single node across steps. @@ -925,8 +927,6 @@ type EvalNodeHelper struct { // Caches. // DropMetricName and label_*. Dmn map[uint64]labels.Labels - // signatureFunc. - sigf map[string]string // funcHistogramQuantile. signatureToMetricWithBuckets map[string]*metricWithBuckets // label_replace. @@ -957,23 +957,6 @@ func (enh *EvalNodeHelper) DropMetricName(l labels.Labels) labels.Labels { return ret } -func (enh *EvalNodeHelper) signatureFunc(on bool, names ...string) func(labels.Labels) string { - if enh.sigf == nil { - enh.sigf = make(map[string]string, len(enh.Out)) - } - f := signatureFunc(on, enh.lblBuf, names...) - return func(l labels.Labels) string { - enh.lblBuf = l.Bytes(enh.lblBuf) - ret, ok := enh.sigf[string(enh.lblBuf)] - if ok { - return ret - } - ret = f(l) - enh.sigf[string(enh.lblBuf)] = ret - return ret - } -} - // rangeEval evaluates the given expressions, and then for each step calls // the given funcCall with the values computed for each expression at that // step. The return value is the combination into time series of all the @@ -1432,22 +1415,28 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, storage.Warnings) { return append(enh.Out, Sample{Point: Point{V: val}}), nil }, e.LHS, e.RHS) case lt == parser.ValueTypeVector && rt == parser.ValueTypeVector: + // Function to compute the join signature for each series. + buf := make([]byte, 0, 1024) + sigf := signatureFunc(e.VectorMatching.On, buf, e.VectorMatching.MatchingLabels...) + initSignatures := func(series labels.Labels, h *EvalSeriesHelper) { + h.signature = sigf(series) + } switch e.Op { case parser.LAND: - return ev.rangeEval(nil, func(v []parser.Value, _ [][]EvalSeriesHelper, enh *EvalNodeHelper) (Vector, storage.Warnings) { - return ev.VectorAnd(v[0].(Vector), v[1].(Vector), e.VectorMatching, enh), nil + return ev.rangeEval(initSignatures, func(v []parser.Value, sh [][]EvalSeriesHelper, enh *EvalNodeHelper) (Vector, storage.Warnings) { + return ev.VectorAnd(v[0].(Vector), v[1].(Vector), e.VectorMatching, sh[0], sh[1], enh), nil }, e.LHS, e.RHS) case parser.LOR: - return ev.rangeEval(nil, func(v []parser.Value, _ [][]EvalSeriesHelper, enh *EvalNodeHelper) (Vector, storage.Warnings) { - return ev.VectorOr(v[0].(Vector), v[1].(Vector), e.VectorMatching, enh), nil + return ev.rangeEval(initSignatures, func(v []parser.Value, sh [][]EvalSeriesHelper, enh *EvalNodeHelper) (Vector, storage.Warnings) { + return ev.VectorOr(v[0].(Vector), v[1].(Vector), e.VectorMatching, sh[0], sh[1], enh), nil }, e.LHS, e.RHS) case parser.LUNLESS: - return ev.rangeEval(nil, func(v []parser.Value, _ [][]EvalSeriesHelper, enh *EvalNodeHelper) (Vector, storage.Warnings) { - return ev.VectorUnless(v[0].(Vector), v[1].(Vector), e.VectorMatching, enh), nil + return ev.rangeEval(initSignatures, func(v []parser.Value, sh [][]EvalSeriesHelper, enh *EvalNodeHelper) (Vector, storage.Warnings) { + return ev.VectorUnless(v[0].(Vector), v[1].(Vector), e.VectorMatching, sh[0], sh[1], enh), nil }, e.LHS, e.RHS) default: - return ev.rangeEval(nil, func(v []parser.Value, _ [][]EvalSeriesHelper, enh *EvalNodeHelper) (Vector, storage.Warnings) { - return ev.VectorBinop(e.Op, v[0].(Vector), v[1].(Vector), e.VectorMatching, e.ReturnBool, enh), nil + return ev.rangeEval(initSignatures, func(v []parser.Value, sh [][]EvalSeriesHelper, enh *EvalNodeHelper) (Vector, storage.Warnings) { + return ev.VectorBinop(e.Op, v[0].(Vector), v[1].(Vector), e.VectorMatching, e.ReturnBool, sh[0], sh[1], enh), nil }, e.LHS, e.RHS) } @@ -1774,62 +1763,59 @@ func (ev *evaluator) matrixIterSlice(it *storage.BufferedSeriesIterator, mint, m return out } -func (ev *evaluator) VectorAnd(lhs, rhs Vector, matching *parser.VectorMatching, enh *EvalNodeHelper) Vector { +func (ev *evaluator) VectorAnd(lhs, rhs Vector, matching *parser.VectorMatching, lhsh, rhsh []EvalSeriesHelper, enh *EvalNodeHelper) Vector { if matching.Card != parser.CardManyToMany { panic("set operations must only use many-to-many matching") } - sigf := enh.signatureFunc(matching.On, matching.MatchingLabels...) // The set of signatures for the right-hand side Vector. rightSigs := map[string]struct{}{} // Add all rhs samples to a map so we can easily find matches later. - for _, rs := range rhs { - rightSigs[sigf(rs.Metric)] = struct{}{} + for _, sh := range rhsh { + rightSigs[sh.signature] = struct{}{} } - for _, ls := range lhs { + for i, ls := range lhs { // If there's a matching entry in the right-hand side Vector, add the sample. - if _, ok := rightSigs[sigf(ls.Metric)]; ok { + if _, ok := rightSigs[lhsh[i].signature]; ok { enh.Out = append(enh.Out, ls) } } return enh.Out } -func (ev *evaluator) VectorOr(lhs, rhs Vector, matching *parser.VectorMatching, enh *EvalNodeHelper) Vector { +func (ev *evaluator) VectorOr(lhs, rhs Vector, matching *parser.VectorMatching, lhsh, rhsh []EvalSeriesHelper, enh *EvalNodeHelper) Vector { if matching.Card != parser.CardManyToMany { panic("set operations must only use many-to-many matching") } - sigf := enh.signatureFunc(matching.On, matching.MatchingLabels...) leftSigs := map[string]struct{}{} // Add everything from the left-hand-side Vector. - for _, ls := range lhs { - leftSigs[sigf(ls.Metric)] = struct{}{} + for i, ls := range lhs { + leftSigs[lhsh[i].signature] = struct{}{} enh.Out = append(enh.Out, ls) } // Add all right-hand side elements which have not been added from the left-hand side. - for _, rs := range rhs { - if _, ok := leftSigs[sigf(rs.Metric)]; !ok { + for j, rs := range rhs { + if _, ok := leftSigs[rhsh[j].signature]; !ok { enh.Out = append(enh.Out, rs) } } return enh.Out } -func (ev *evaluator) VectorUnless(lhs, rhs Vector, matching *parser.VectorMatching, enh *EvalNodeHelper) Vector { +func (ev *evaluator) VectorUnless(lhs, rhs Vector, matching *parser.VectorMatching, lhsh, rhsh []EvalSeriesHelper, enh *EvalNodeHelper) Vector { if matching.Card != parser.CardManyToMany { panic("set operations must only use many-to-many matching") } - sigf := enh.signatureFunc(matching.On, matching.MatchingLabels...) rightSigs := map[string]struct{}{} - for _, rs := range rhs { - rightSigs[sigf(rs.Metric)] = struct{}{} + for _, sh := range rhsh { + rightSigs[sh.signature] = struct{}{} } - for _, ls := range lhs { - if _, ok := rightSigs[sigf(ls.Metric)]; !ok { + for i, ls := range lhs { + if _, ok := rightSigs[lhsh[i].signature]; !ok { enh.Out = append(enh.Out, ls) } } @@ -1837,17 +1823,17 @@ func (ev *evaluator) VectorUnless(lhs, rhs Vector, matching *parser.VectorMatchi } // VectorBinop evaluates a binary operation between two Vectors, excluding set operators. -func (ev *evaluator) VectorBinop(op parser.ItemType, lhs, rhs Vector, matching *parser.VectorMatching, returnBool bool, enh *EvalNodeHelper) Vector { +func (ev *evaluator) VectorBinop(op parser.ItemType, lhs, rhs Vector, matching *parser.VectorMatching, returnBool bool, lhsh, rhsh []EvalSeriesHelper, enh *EvalNodeHelper) Vector { if matching.Card == parser.CardManyToMany { panic("many-to-many only allowed for set operators") } - sigf := enh.signatureFunc(matching.On, matching.MatchingLabels...) // The control flow below handles one-to-one or many-to-one matching. // For one-to-many, swap sidedness and account for the swap when calculating // values. if matching.Card == parser.CardOneToMany { lhs, rhs = rhs, lhs + lhsh, rhsh = rhsh, lhsh } // All samples from the rhs hashed by the matching label/values. @@ -1861,8 +1847,8 @@ func (ev *evaluator) VectorBinop(op parser.ItemType, lhs, rhs Vector, matching * rightSigs := enh.rightSigs // Add all rhs samples to a map so we can easily find matches later. - for _, rs := range rhs { - sig := sigf(rs.Metric) + for i, rs := range rhs { + sig := rhsh[i].signature // The rhs is guaranteed to be the 'one' side. Having multiple samples // with the same signature means that the matching is many-to-many. if duplSample, found := rightSigs[sig]; found { @@ -1892,8 +1878,8 @@ func (ev *evaluator) VectorBinop(op parser.ItemType, lhs, rhs Vector, matching * // For all lhs samples find a respective rhs sample and perform // the binary operation. - for _, ls := range lhs { - sig := sigf(ls.Metric) + for i, ls := range lhs { + sig := lhsh[i].signature rs, found := rightSigs[sig] // Look for a match in the rhs Vector. if !found {