promql: Add pooling to make polymorphism less costly

Unfortunately, this doesn't make the dent I was hoping for.

Signed-off-by: beorn7 <beorn@grafana.com>
This commit is contained in:
beorn7 2021-11-23 16:03:32 +01:00
parent d57b3e3f73
commit e7aed6eb44
2 changed files with 57 additions and 26 deletions

View file

@ -1107,7 +1107,7 @@ func (ev *evaluator) rangeEval(prepSeries func(labels.Labels, *EvalSeriesHelper)
Points: getPointSlice(numSteps),
}
}
ss.Points = append(ss.Points, sample.Point.SetTimestamp(ts))
ss.Points = append(ss.Points, sample.Point.Copy().SetTimestamp(ts))
seriess[h] = ss
}
@ -1384,7 +1384,7 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, storage.Warnings) {
newp := make([]Point, 0, steps-len(found))
for ts := ev.startTimestamp; ts <= ev.endTimestamp; ts += ev.interval {
if _, ok := found[ts]; !ok {
newp = append(newp, &FloatPoint{T: ts, V: 1})
newp = append(newp, getFloatPoint(ts, 1))
}
}
@ -1426,7 +1426,7 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, storage.Warnings) {
case lt == parser.ValueTypeScalar && rt == parser.ValueTypeScalar:
return ev.rangeEval(nil, func(v []parser.Value, _ [][]EvalSeriesHelper, enh *EvalNodeHelper) (Vector, storage.Warnings) {
val := scalarBinop(e.Op, v[0].(Vector)[0].Point.(*FloatPoint).V, v[1].(Vector)[0].Point.(*FloatPoint).V)
return append(enh.Out, Sample{Point: &FloatPoint{V: val}}), nil
return append(enh.Out, Sample{Point: getFloatPoint(0, val)}), nil
}, e.LHS, e.RHS)
case lt == parser.ValueTypeVector && rt == parser.ValueTypeVector:
// Function to compute the join signature for each series.
@ -1467,7 +1467,7 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, storage.Warnings) {
case *parser.NumberLiteral:
return ev.rangeEval(nil, func(v []parser.Value, _ [][]EvalSeriesHelper, enh *EvalNodeHelper) (Vector, storage.Warnings) {
return append(enh.Out, Sample{Point: &FloatPoint{V: e.Val}}), nil
return append(enh.Out, Sample{Point: getFloatPoint(0, e.Val)}), nil
})
case *parser.StringLiteral:
@ -1493,9 +1493,9 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, storage.Warnings) {
if ev.currentSamples < ev.maxSamples {
var p Point
if h == nil {
p = &FloatPoint{ts, v}
p = getFloatPoint(ts, v)
} else {
p = &HistogramPoint{ts, h}
p = getHistogramPoint(ts, h)
}
ss.Points = append(ss.Points, p)
ev.currentSamples++
@ -1621,9 +1621,9 @@ func (ev *evaluator) vectorSelector(node *parser.VectorSelector, ts int64) (Vect
if ok {
var p Point
if h == nil {
p = &FloatPoint{t, v}
p = getFloatPoint(t, v)
} else {
p = &HistogramPoint{t, h}
p = getHistogramPoint(t, h)
}
vec = append(vec, Sample{
Metric: node.Series[i].Labels(),
@ -1673,19 +1673,49 @@ func (ev *evaluator) vectorSelectorSingle(it *storage.MemoizedSeriesIterator, no
return t, v, h, true
}
var pointPool = sync.Pool{}
var pointSlicePool, floatPointPool, histogramPointPool sync.Pool
func getFloatPoint(t int64, v float64) *FloatPoint {
p := floatPointPool.Get()
if p != nil {
fp := p.(*FloatPoint)
fp.T = t
fp.V = v
return fp
}
return &FloatPoint{T: t, V: v}
}
func getHistogramPoint(t int64, v *histogram.Histogram) *HistogramPoint {
p := histogramPointPool.Get()
if p != nil {
fp := p.(*HistogramPoint)
fp.T = t
fp.V = v
return fp
}
return &HistogramPoint{T: t, V: v}
}
func getPointSlice(sz int) []Point {
p := pointPool.Get()
p := pointSlicePool.Get()
if p != nil {
return p.([]Point)
}
return make([]Point, 0, sz)
}
func putPointSlice(p []Point) {
func putPointSlice(points []Point) {
for _, p := range points {
switch p.(type) {
case *FloatPoint:
floatPointPool.Put(p)
case *HistogramPoint:
histogramPointPool.Put(p)
}
}
//nolint:staticcheck // Ignore SA6002 relax staticcheck verification.
pointPool.Put(p[:0])
pointSlicePool.Put(points[:0])
}
// matrixSelector evaluates a *parser.MatrixSelector expression.
@ -1774,7 +1804,7 @@ func (ev *evaluator) matrixIterSlice(it *storage.BufferedSeriesIterator, mint, m
ev.error(ErrTooManySamples(env))
}
ev.currentSamples++
out = append(out, &HistogramPoint{T: t, V: h})
out = append(out, getHistogramPoint(t, h))
}
} else {
t, v := buf.At()
@ -1787,7 +1817,7 @@ func (ev *evaluator) matrixIterSlice(it *storage.BufferedSeriesIterator, mint, m
ev.error(ErrTooManySamples(env))
}
ev.currentSamples++
out = append(out, &FloatPoint{T: t, V: v})
out = append(out, getFloatPoint(t, v))
}
}
}
@ -1799,7 +1829,7 @@ func (ev *evaluator) matrixIterSlice(it *storage.BufferedSeriesIterator, mint, m
if ev.currentSamples >= ev.maxSamples {
ev.error(ErrTooManySamples(env))
}
out = append(out, &HistogramPoint{T: t, V: h})
out = append(out, getHistogramPoint(t, h))
ev.currentSamples++
}
} else {
@ -1808,7 +1838,7 @@ func (ev *evaluator) matrixIterSlice(it *storage.BufferedSeriesIterator, mint, m
if ev.currentSamples >= ev.maxSamples {
ev.error(ErrTooManySamples(env))
}
out = append(out, &FloatPoint{T: t, V: v})
out = append(out, getFloatPoint(t, v))
ev.currentSamples++
}
}
@ -2001,7 +2031,7 @@ func (ev *evaluator) VectorBinop(op parser.ItemType, lhs, rhs Vector, matching *
enh.Out = append(enh.Out, Sample{
Metric: metric,
Point: &FloatPoint{V: value},
Point: getFloatPoint(0, value),
})
}
return enh.Out
@ -2289,13 +2319,13 @@ func (ev *evaluator) aggregation(op parser.ItemType, grouping []string, without
case parser.TOPK, parser.QUANTILE:
result[groupingKey].heap = make(vectorByValueHeap, 1, resultSize)
result[groupingKey].heap[0] = Sample{
Point: &FloatPoint{V: value},
Point: getFloatPoint(0, value),
Metric: s.Metric,
}
case parser.BOTTOMK:
result[groupingKey].reverseHeap = make(vectorByReverseValueHeap, 1, resultSize)
result[groupingKey].reverseHeap[0] = Sample{
Point: &FloatPoint{V: value},
Point: getFloatPoint(0, value),
Metric: s.Metric,
}
case parser.GROUP:
@ -2358,7 +2388,7 @@ func (ev *evaluator) aggregation(op parser.ItemType, grouping []string, without
if int64(len(group.heap)) == k {
if k == 1 { // For k==1 we can replace in-situ.
group.heap[0] = Sample{
Point: &FloatPoint{V: value},
Point: getFloatPoint(0, value),
Metric: s.Metric,
}
break
@ -2366,7 +2396,7 @@ func (ev *evaluator) aggregation(op parser.ItemType, grouping []string, without
heap.Pop(&group.heap)
}
heap.Push(&group.heap, &Sample{
Point: &FloatPoint{V: value},
Point: getFloatPoint(0, value),
Metric: s.Metric,
})
}
@ -2377,7 +2407,7 @@ func (ev *evaluator) aggregation(op parser.ItemType, grouping []string, without
if int64(len(group.reverseHeap)) == k {
if k == 1 { // For k==1 we can replace in-situ.
group.reverseHeap[0] = Sample{
Point: &FloatPoint{V: value},
Point: getFloatPoint(0, value),
Metric: s.Metric,
}
break
@ -2385,7 +2415,7 @@ func (ev *evaluator) aggregation(op parser.ItemType, grouping []string, without
heap.Pop(&group.reverseHeap)
}
heap.Push(&group.reverseHeap, &Sample{
Point: &FloatPoint{V: value},
Point: getFloatPoint(0, value),
Metric: s.Metric,
})
}
@ -2422,7 +2452,7 @@ func (ev *evaluator) aggregation(op parser.ItemType, grouping []string, without
enh.Out = append(enh.Out, Sample{
Metric: v.Metric,
// TODO(beorn7): Handle histogram gracefully.
Point: &FloatPoint{V: v.Point.(*FloatPoint).V},
Point: getFloatPoint(0, v.Point.(*FloatPoint).V),
})
}
continue // Bypass default append.
@ -2436,7 +2466,7 @@ func (ev *evaluator) aggregation(op parser.ItemType, grouping []string, without
enh.Out = append(enh.Out, Sample{
Metric: v.Metric,
// TODO(beorn7): Handle histogram gracefully.
Point: &FloatPoint{V: v.Point.(*FloatPoint).V},
Point: getFloatPoint(0, v.Point.(*FloatPoint).V),
})
}
continue // Bypass default append.
@ -2450,7 +2480,7 @@ func (ev *evaluator) aggregation(op parser.ItemType, grouping []string, without
enh.Out = append(enh.Out, Sample{
Metric: aggr.labels,
Point: &FloatPoint{V: aggr.value},
Point: getFloatPoint(0, aggr.value),
})
}
return enh.Out

View file

@ -143,6 +143,7 @@ func (p HistogramPoint) String() string { return fmt.Sprintf("%v @[%v]", p.
func (p GaugeHistogramPoint) String() string { return "implemente me" } // TODO(beorn7): Implement
// MarshalJSON implements json.Marshaler.
// TODO(beorn7): Weirdly, the API uses its own marshaling, overriding this one. Shouldn't we have just one?
func (p FloatPoint) MarshalJSON() ([]byte, error) {
v := strconv.FormatFloat(p.V, 'f', -1, 64)
return json.Marshal([...]interface{}{float64(p.T) / 1000, v})