promql: re-use one heap for topk and bottomk

Slightly ugly casting saves memory.

Signed-off-by: Bryan Boreham <bjboreham@gmail.com>
This commit is contained in:
Bryan Boreham 2024-03-09 10:51:16 +00:00
parent 5e3914a27c
commit cfbeb6681b

View file

@ -2735,7 +2735,6 @@ type groupedAggregation struct {
floatMean float64 floatMean float64
groupCount int groupCount int
heap vectorByValueHeap heap vectorByValueHeap
reverseHeap vectorByReverseValueHeap
} }
// aggregation evaluates sum, avg, count, stdvar, stddev or quantile at one timestep on inputMatrix. // aggregation evaluates sum, avg, count, stdvar, stddev or quantile at one timestep on inputMatrix.
@ -2937,16 +2936,10 @@ func (ev *evaluator) aggregationK(e *parser.AggregateExpr, k int, inputMatrix Ma
group := orderedResult[seriesToResult[si]] group := orderedResult[seriesToResult[si]]
// Initialize this group if it's the first time we've seen it. // Initialize this group if it's the first time we've seen it.
if !seen[seriesToResult[si]] { if !seen[seriesToResult[si]] {
*group = groupedAggregation{} *group = groupedAggregation{
heap: make(vectorByValueHeap, 1, k),
switch op {
case parser.TOPK:
group.heap = make(vectorByValueHeap, 1, k)
group.heap[0] = s
case parser.BOTTOMK:
group.reverseHeap = make(vectorByReverseValueHeap, 1, k)
group.reverseHeap[0] = s
} }
group.heap[0] = s
seen[seriesToResult[si]] = true seen[seriesToResult[si]] = true
continue continue
} }
@ -2968,13 +2961,13 @@ func (ev *evaluator) aggregationK(e *parser.AggregateExpr, k int, inputMatrix Ma
case parser.BOTTOMK: case parser.BOTTOMK:
// We build a heap of up to k elements, with the biggest element at heap[0]. // We build a heap of up to k elements, with the biggest element at heap[0].
switch { switch {
case len(group.reverseHeap) < k: case len(group.heap) < k:
heap.Push(&group.reverseHeap, &s) heap.Push((*vectorByReverseValueHeap)(&group.heap), &s)
case group.reverseHeap[0].F > s.F || (math.IsNaN(group.reverseHeap[0].F) && !math.IsNaN(s.F)): case group.heap[0].F > s.F || (math.IsNaN(group.heap[0].F) && !math.IsNaN(s.F)):
// This new element is smaller than the previous biggest element - overwrite that. // This new element is smaller than the previous biggest element - overwrite that.
group.reverseHeap[0] = s group.heap[0] = s
if k > 1 { if k > 1 {
heap.Fix(&group.reverseHeap, 0) // Maintain the heap invariant. heap.Fix((*vectorByReverseValueHeap)(&group.heap), 0) // Maintain the heap invariant.
} }
} }
@ -3021,10 +3014,10 @@ func (ev *evaluator) aggregationK(e *parser.AggregateExpr, k int, inputMatrix Ma
case parser.BOTTOMK: case parser.BOTTOMK:
// The heap keeps the highest value on top, so reverse it. // The heap keeps the highest value on top, so reverse it.
if len(aggr.reverseHeap) > 1 { if len(aggr.heap) > 1 {
sort.Sort(sort.Reverse(aggr.reverseHeap)) sort.Sort(sort.Reverse((*vectorByReverseValueHeap)(&aggr.heap)))
} }
for _, v := range aggr.reverseHeap { for _, v := range aggr.heap {
add(v.Metric, v.F) add(v.Metric, v.F)
} }
} }