promql engine: check unique labels using existing map

`ContainsSameLabelset` constructs a map with the same hash key as
the one used to compile the output of `rangeEval`, so we can use that
one and save work.

Need to hold the timestamp so we can be sure we saw the same series
in the same evaluation.

Signed-off-by: Bryan Boreham <bjboreham@gmail.com>
This commit is contained in:
Bryan Boreham 2023-08-13 18:09:10 +01:00
parent 657da2eb98
commit 0670e4771a

View file

@ -1143,7 +1143,11 @@ func (ev *evaluator) rangeEval(prepSeries func(labels.Labels, *EvalSeriesHelper)
}
}
enh := &EvalNodeHelper{Out: make(Vector, 0, biggestLen)}
seriess := make(map[uint64]Series, biggestLen) // Output series by series hash.
type seriesAndTimestamp struct {
Series
ts int64
}
seriess := make(map[uint64]seriesAndTimestamp, biggestLen) // Output series by series hash.
tempNumSamples := ev.currentSamples
var (
@ -1228,9 +1232,6 @@ func (ev *evaluator) rangeEval(prepSeries func(labels.Labels, *EvalSeriesHelper)
// Make the function call.
enh.Ts = ts
result, ws := funcCall(args, bufHelpers, enh)
if result.ContainsSameLabelset() {
ev.errorf("vector cannot contain metrics with the same labelset")
}
enh.Out = result[:0] // Reuse result vector.
warnings = append(warnings, ws...)
@ -1247,6 +1248,9 @@ func (ev *evaluator) rangeEval(prepSeries func(labels.Labels, *EvalSeriesHelper)
// If this could be an instant query, shortcut so as not to change sort order.
if ev.endTimestamp == ev.startTimestamp {
if result.ContainsSameLabelset() {
ev.errorf("vector cannot contain metrics with the same labelset")
}
mat := make(Matrix, len(result))
for i, s := range result {
if s.H == nil {
@ -1264,8 +1268,13 @@ func (ev *evaluator) rangeEval(prepSeries func(labels.Labels, *EvalSeriesHelper)
for _, sample := range result {
h := sample.Metric.Hash()
ss, ok := seriess[h]
if !ok {
ss = Series{Metric: sample.Metric}
if ok {
if ss.ts == ts { // If we've seen this output series before at this timestamp, it's a duplicate.
ev.errorf("vector cannot contain metrics with the same labelset")
}
ss.ts = ts
} else {
ss = seriesAndTimestamp{Series{Metric: sample.Metric}, ts}
}
if sample.H == nil {
if ss.Floats == nil {
@ -1292,7 +1301,7 @@ func (ev *evaluator) rangeEval(prepSeries func(labels.Labels, *EvalSeriesHelper)
// Assemble the output matrix. By the time we get here we know we don't have too many samples.
mat := make(Matrix, 0, len(seriess))
for _, ss := range seriess {
mat = append(mat, ss)
mat = append(mat, ss.Series)
}
ev.currentSamples = originalNumSamples + mat.TotalSamples()
ev.samplesStats.UpdatePeak(ev.currentSamples)