From f2fd85df823feba4dcff666d62716a6038c0f33c Mon Sep 17 00:00:00 2001 From: Bryan Boreham Date: Tue, 28 Mar 2023 10:42:15 +0000 Subject: [PATCH] promql: use faster heap method for topk/bottomk Call `Fix()` instead of `Pop()` followed by `Push()`. This is slightly faster. Signed-off-by: Bryan Boreham --- promql/engine.go | 44 ++++++++++++++++++++++---------------------- 1 file changed, 22 insertions(+), 22 deletions(-) diff --git a/promql/engine.go b/promql/engine.go index ddfb26b13..ab3465707 100644 --- a/promql/engine.go +++ b/promql/engine.go @@ -2508,39 +2508,39 @@ func (ev *evaluator) aggregation(op parser.ItemType, grouping []string, without group.value += delta * (s.V - group.mean) case parser.TOPK: - if int64(len(group.heap)) < k || group.heap[0].V < s.V || math.IsNaN(group.heap[0].V) { - if int64(len(group.heap)) == k { - if k == 1 { // For k==1 we can replace in-situ. - group.heap[0] = Sample{ - Point: Point{V: s.V}, - Metric: s.Metric, - } - break - } - heap.Pop(&group.heap) - } + // We build a heap of up to k elements, with the smallest element at heap[0]. + if int64(len(group.heap)) < k { heap.Push(&group.heap, &Sample{ Point: Point{V: s.V}, Metric: s.Metric, }) + } else if group.heap[0].V < s.V || (math.IsNaN(group.heap[0].V) && !math.IsNaN(s.V)) { + // This new element is bigger than the previous smallest element - overwrite that. + group.heap[0] = Sample{ + Point: Point{V: s.V}, + Metric: s.Metric, + } + if k > 1 { + heap.Fix(&group.heap, 0) // Maintain the heap invariant. + } } case parser.BOTTOMK: - if int64(len(group.reverseHeap)) < k || group.reverseHeap[0].V > s.V || math.IsNaN(group.reverseHeap[0].V) { - if int64(len(group.reverseHeap)) == k { - if k == 1 { // For k==1 we can replace in-situ. - group.reverseHeap[0] = Sample{ - Point: Point{V: s.V}, - Metric: s.Metric, - } - break - } - heap.Pop(&group.reverseHeap) - } + // We build a heap of up to k elements, with the biggest element at heap[0]. + if int64(len(group.reverseHeap)) < k { heap.Push(&group.reverseHeap, &Sample{ Point: Point{V: s.V}, Metric: s.Metric, }) + } else if group.reverseHeap[0].V > s.V || (math.IsNaN(group.reverseHeap[0].V) && !math.IsNaN(s.V)) { + // This new element is smaller than the previous biggest element - overwrite that. + group.reverseHeap[0] = Sample{ + Point: Point{V: s.V}, + Metric: s.Metric, + } + if k > 1 { + heap.Fix(&group.reverseHeap, 0) // Maintain the heap invariant. + } } case parser.QUANTILE: