promql: Use buffer pool for matrix allocations

This commit is contained in:
Fabian Reinartz 2017-03-14 10:57:34 +01:00
parent b09b90a940
commit 0ecd205794
3 changed files with 86 additions and 7 deletions

View file

@ -20,6 +20,7 @@ import (
"runtime" "runtime"
"sort" "sort"
"strconv" "strconv"
"sync"
"time" "time"
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
@ -524,6 +525,14 @@ type evaluator struct {
ctx context.Context ctx context.Context
Timestamp int64 // time in milliseconds Timestamp int64 // time in milliseconds
finalizers []func()
}
func (ev *evaluator) close() {
for _, f := range ev.finalizers {
f()
}
} }
// fatalf causes a panic with the input formatted into an error. // fatalf causes a panic with the input formatted into an error.
@ -621,6 +630,7 @@ func (ev *evaluator) evalOneOf(e Expr, t1, t2 ValueType) Value {
func (ev *evaluator) Eval(expr Expr) (v Value, err error) { func (ev *evaluator) Eval(expr Expr) (v Value, err error) {
defer ev.recover(&err) defer ev.recover(&err)
defer ev.close()
return ev.eval(expr), nil return ev.eval(expr), nil
} }
@ -732,19 +742,55 @@ func (ev *evaluator) vectorSelector(node *VectorSelector) Vector {
return vec return vec
} }
var pointPool = sync.Pool{}
func getPointSlice(sz int) []Point {
p := pointPool.Get()
if p != nil {
return p.([]Point)
}
return make([]Point, 0, sz)
}
func putPointSlice(p []Point) {
pointPool.Put(p[:0])
}
var matrixPool = sync.Pool{}
func getMatrix(sz int) Matrix {
m := matrixPool.Get()
if m != nil {
return m.(Matrix)
}
return make(Matrix, 0, sz)
}
func putMatrix(m Matrix) {
matrixPool.Put(m[:0])
}
// matrixSelector evaluates a *MatrixSelector expression. // matrixSelector evaluates a *MatrixSelector expression.
func (ev *evaluator) matrixSelector(node *MatrixSelector) Matrix { func (ev *evaluator) matrixSelector(node *MatrixSelector) Matrix {
var ( var (
offset = durationMilliseconds(node.Offset) offset = durationMilliseconds(node.Offset)
maxt = ev.Timestamp - offset maxt = ev.Timestamp - offset
mint = maxt - durationMilliseconds(node.Range) mint = maxt - durationMilliseconds(node.Range)
Matrix = make(Matrix, 0, len(node.series)) matrix = getMatrix(len(node.series))
// Write all points into a single slice to avoid lots of tiny allocations.
allPoints = getPointSlice(5 * len(matrix))
)
ev.finalizers = append(ev.finalizers,
func() { putPointSlice(allPoints) },
func() { putMatrix(matrix) },
) )
for i, it := range node.iterators { for i, it := range node.iterators {
start := len(allPoints)
ss := Series{ ss := Series{
Metric: node.series[i].Labels(), Metric: node.series[i].Labels(),
Points: make([]Point, 0, 16),
} }
ok := it.Seek(maxt) ok := it.Seek(maxt)
@ -760,19 +806,22 @@ func (ev *evaluator) matrixSelector(node *MatrixSelector) Matrix {
t, v := buf.At() t, v := buf.At()
// Values in the buffer are guaranteed to be smaller than maxt. // Values in the buffer are guaranteed to be smaller than maxt.
if t >= mint { if t >= mint {
ss.Points = append(ss.Points, Point{T: t, V: v}) allPoints = append(allPoints, Point{T: t, V: v})
} }
} }
// The seeked sample might also be in the range. // The seeked sample might also be in the range.
t, v = it.Values() t, v = it.Values()
if t == maxt { if t == maxt {
ss.Points = append(ss.Points, Point{T: t, V: v}) allPoints = append(allPoints, Point{T: t, V: v})
} }
ss.Points = allPoints[start:]
if len(ss.Points) > 0 { if len(ss.Points) > 0 {
Matrix = append(Matrix, ss) matrix = append(matrix, ss)
} }
} }
return Matrix return matrix
} }
func (ev *evaluator) VectorAnd(lhs, rhs Vector, matching *VectorMatching) Vector { func (ev *evaluator) VectorAnd(lhs, rhs Vector, matching *VectorMatching) Vector {

View file

@ -13,11 +13,14 @@ type BufferedSeriesIterator struct {
// NewBuffer returns a new iterator that buffers the values within the time range // NewBuffer returns a new iterator that buffers the values within the time range
// of the current element and the duration of delta before. // of the current element and the duration of delta before.
func NewBuffer(it SeriesIterator, delta int64) *BufferedSeriesIterator { func NewBuffer(it SeriesIterator, delta int64) *BufferedSeriesIterator {
return &BufferedSeriesIterator{ bit := &BufferedSeriesIterator{
it: it, it: it,
buf: newSampleRing(delta, 16), buf: newSampleRing(delta, 16),
lastTime: math.MinInt64, lastTime: math.MinInt64,
} }
it.Next()
return bit
} }
// PeekBack returns the previous element of the iterator. If there is none buffered, // PeekBack returns the previous element of the iterator. If there is none buffered,

View file

@ -124,6 +124,33 @@ func TestBufferedSeriesIterator(t *testing.T) {
require.False(t, it.Next(), "next succeeded unexpectedly") require.False(t, it.Next(), "next succeeded unexpectedly")
} }
func BenchmarkBufferedSeriesIterator(b *testing.B) {
var (
samples []sample
lastT int64
)
for i := 0; i < b.N; i++ {
lastT += 30
samples = append(samples, sample{
t: lastT,
v: 123, // doesn't matter
})
}
// Simulate a 5 minute rate.
it := NewBuffer(newListSeriesIterator(samples), 5*60)
b.SetBytes(int64(b.N * 16))
b.ReportAllocs()
b.ResetTimer()
for it.Next() {
// scan everything
}
require.NoError(b, it.Err())
}
type mockSeriesIterator struct { type mockSeriesIterator struct {
seek func(int64) bool seek func(int64) bool
values func() (int64, float64) values func() (int64, float64)