fix topk/bottomk with numbers greater than int maxsize on 32-bit

Signed-off-by: Joel Beckmeyer <joel@beckmeyer.us>
This commit is contained in:
Joel Beckmeyer 2024-12-16 10:42:56 -05:00
parent c8c128b0f1
commit 41dabfb464

View file

@ -1352,7 +1352,7 @@ func (ev *evaluator) rangeEvalAgg(ctx context.Context, aggExpr *parser.Aggregate
} }
groups := make([]groupedAggregation, groupCount) groups := make([]groupedAggregation, groupCount)
var k int var k int64
var ratio float64 var ratio float64
var seriess map[uint64]Series var seriess map[uint64]Series
switch aggExpr.Op { switch aggExpr.Op {
@ -1360,9 +1360,9 @@ func (ev *evaluator) rangeEvalAgg(ctx context.Context, aggExpr *parser.Aggregate
if !convertibleToInt64(param) { if !convertibleToInt64(param) {
ev.errorf("Scalar value %v overflows int64", param) ev.errorf("Scalar value %v overflows int64", param)
} }
k = int(param) k = int64(param)
if k > len(inputMatrix) { if k > int64(len(inputMatrix)) {
k = len(inputMatrix) k = int64(len(inputMatrix))
} }
if k < 1 { if k < 1 {
return nil, warnings return nil, warnings
@ -3172,7 +3172,7 @@ func (ev *evaluator) aggregation(e *parser.AggregateExpr, q float64, inputMatrix
// seriesToResult maps inputMatrix indexes to groups indexes. // seriesToResult maps inputMatrix indexes to groups indexes.
// For an instant query, returns a Matrix in descending order for topk or ascending for bottomk, or without any order for limitk / limit_ratio. // For an instant query, returns a Matrix in descending order for topk or ascending for bottomk, or without any order for limitk / limit_ratio.
// For a range query, aggregates output in the seriess map. // For a range query, aggregates output in the seriess map.
func (ev *evaluator) aggregationK(e *parser.AggregateExpr, k int, r float64, inputMatrix Matrix, seriesToResult []int, groups []groupedAggregation, enh *EvalNodeHelper, seriess map[uint64]Series) (Matrix, annotations.Annotations) { func (ev *evaluator) aggregationK(e *parser.AggregateExpr, k int64, r float64, inputMatrix Matrix, seriesToResult []int, groups []groupedAggregation, enh *EvalNodeHelper, seriess map[uint64]Series) (Matrix, annotations.Annotations) {
op := e.Op op := e.Op
var s Sample var s Sample
var annos annotations.Annotations var annos annotations.Annotations
@ -3243,7 +3243,7 @@ seriesLoop:
case s.H != nil: case s.H != nil:
// Ignore histogram sample and add info annotation. // Ignore histogram sample and add info annotation.
annos.Add(annotations.NewHistogramIgnoredInAggregationInfo("topk", e.PosRange)) annos.Add(annotations.NewHistogramIgnoredInAggregationInfo("topk", e.PosRange))
case len(group.heap) < k: case int64(len(group.heap)) < k:
heap.Push(&group.heap, &s) heap.Push(&group.heap, &s)
case group.heap[0].F < s.F || (math.IsNaN(group.heap[0].F) && !math.IsNaN(s.F)): case group.heap[0].F < s.F || (math.IsNaN(group.heap[0].F) && !math.IsNaN(s.F)):
// This new element is bigger than the previous smallest element - overwrite that. // This new element is bigger than the previous smallest element - overwrite that.
@ -3259,7 +3259,7 @@ seriesLoop:
case s.H != nil: case s.H != nil:
// Ignore histogram sample and add info annotation. // Ignore histogram sample and add info annotation.
annos.Add(annotations.NewHistogramIgnoredInAggregationInfo("bottomk", e.PosRange)) annos.Add(annotations.NewHistogramIgnoredInAggregationInfo("bottomk", e.PosRange))
case len(group.heap) < k: case int64(len(group.heap)) < k:
heap.Push((*vectorByReverseValueHeap)(&group.heap), &s) heap.Push((*vectorByReverseValueHeap)(&group.heap), &s)
case group.heap[0].F > s.F || (math.IsNaN(group.heap[0].F) && !math.IsNaN(s.F)): case group.heap[0].F > s.F || (math.IsNaN(group.heap[0].F) && !math.IsNaN(s.F)):
// This new element is smaller than the previous biggest element - overwrite that. // This new element is smaller than the previous biggest element - overwrite that.
@ -3270,13 +3270,13 @@ seriesLoop:
} }
case parser.LIMITK: case parser.LIMITK:
if len(group.heap) < k { if int64(len(group.heap)) < k {
heap.Push(&group.heap, &s) heap.Push(&group.heap, &s)
} }
// LIMITK optimization: early break if we've added K elem to _every_ group, // LIMITK optimization: early break if we've added K elem to _every_ group,
// especially useful for large timeseries where the user is exploring labels via e.g. // especially useful for large timeseries where the user is exploring labels via e.g.
// limitk(10, my_metric) // limitk(10, my_metric)
if !group.groupAggrComplete && len(group.heap) == k { if !group.groupAggrComplete && int64(len(group.heap)) == k {
group.groupAggrComplete = true group.groupAggrComplete = true
groupsRemaining-- groupsRemaining--
if groupsRemaining == 0 { if groupsRemaining == 0 {