Merge pull request #12682 from bboreham/contains-same-label-set

promql engine: check unique labels using existing map

ContainsSameLabelset constructs a map with the same hash key as the one used to compile the output of rangeEval, so we can use that one and save work.

Need to hold the timestamp so we can be sure we saw the same series in the same evaluation.
This commit is contained in:
Bryan Boreham 2023-08-14 14:12:47 +01:00 committed by GitHub
commit 5cea37c069
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -1143,7 +1143,11 @@ func (ev *evaluator) rangeEval(prepSeries func(labels.Labels, *EvalSeriesHelper)
} }
} }
enh := &EvalNodeHelper{Out: make(Vector, 0, biggestLen)} enh := &EvalNodeHelper{Out: make(Vector, 0, biggestLen)}
seriess := make(map[uint64]Series, biggestLen) // Output series by series hash. type seriesAndTimestamp struct {
Series
ts int64
}
seriess := make(map[uint64]seriesAndTimestamp, biggestLen) // Output series by series hash.
tempNumSamples := ev.currentSamples tempNumSamples := ev.currentSamples
var ( var (
@ -1228,9 +1232,6 @@ func (ev *evaluator) rangeEval(prepSeries func(labels.Labels, *EvalSeriesHelper)
// Make the function call. // Make the function call.
enh.Ts = ts enh.Ts = ts
result, ws := funcCall(args, bufHelpers, enh) result, ws := funcCall(args, bufHelpers, enh)
if result.ContainsSameLabelset() {
ev.errorf("vector cannot contain metrics with the same labelset")
}
enh.Out = result[:0] // Reuse result vector. enh.Out = result[:0] // Reuse result vector.
warnings = append(warnings, ws...) warnings = append(warnings, ws...)
@ -1247,6 +1248,9 @@ func (ev *evaluator) rangeEval(prepSeries func(labels.Labels, *EvalSeriesHelper)
// If this could be an instant query, shortcut so as not to change sort order. // If this could be an instant query, shortcut so as not to change sort order.
if ev.endTimestamp == ev.startTimestamp { if ev.endTimestamp == ev.startTimestamp {
if result.ContainsSameLabelset() {
ev.errorf("vector cannot contain metrics with the same labelset")
}
mat := make(Matrix, len(result)) mat := make(Matrix, len(result))
for i, s := range result { for i, s := range result {
if s.H == nil { if s.H == nil {
@ -1264,8 +1268,13 @@ func (ev *evaluator) rangeEval(prepSeries func(labels.Labels, *EvalSeriesHelper)
for _, sample := range result { for _, sample := range result {
h := sample.Metric.Hash() h := sample.Metric.Hash()
ss, ok := seriess[h] ss, ok := seriess[h]
if !ok { if ok {
ss = Series{Metric: sample.Metric} if ss.ts == ts { // If we've seen this output series before at this timestamp, it's a duplicate.
ev.errorf("vector cannot contain metrics with the same labelset")
}
ss.ts = ts
} else {
ss = seriesAndTimestamp{Series{Metric: sample.Metric}, ts}
} }
if sample.H == nil { if sample.H == nil {
if ss.Floats == nil { if ss.Floats == nil {
@ -1292,7 +1301,7 @@ func (ev *evaluator) rangeEval(prepSeries func(labels.Labels, *EvalSeriesHelper)
// Assemble the output matrix. By the time we get here we know we don't have too many samples. // Assemble the output matrix. By the time we get here we know we don't have too many samples.
mat := make(Matrix, 0, len(seriess)) mat := make(Matrix, 0, len(seriess))
for _, ss := range seriess { for _, ss := range seriess {
mat = append(mat, ss) mat = append(mat, ss.Series)
} }
ev.currentSamples = originalNumSamples + mat.TotalSamples() ev.currentSamples = originalNumSamples + mat.TotalSamples()
ev.samplesStats.UpdatePeak(ev.currentSamples) ev.samplesStats.UpdatePeak(ev.currentSamples)