diff --git a/promql/engine.go b/promql/engine.go index 23a5bb93e8..33b3667146 100644 --- a/promql/engine.go +++ b/promql/engine.go @@ -20,6 +20,7 @@ import ( "runtime" "sort" "strconv" + "sync" "time" "github.com/prometheus/client_golang/prometheus" @@ -524,6 +525,14 @@ type evaluator struct { ctx context.Context Timestamp int64 // time in milliseconds + + finalizers []func() +} + +func (ev *evaluator) close() { + for _, f := range ev.finalizers { + f() + } } // fatalf causes a panic with the input formatted into an error. @@ -621,6 +630,7 @@ func (ev *evaluator) evalOneOf(e Expr, t1, t2 ValueType) Value { func (ev *evaluator) Eval(expr Expr) (v Value, err error) { defer ev.recover(&err) + defer ev.close() return ev.eval(expr), nil } @@ -732,19 +742,55 @@ func (ev *evaluator) vectorSelector(node *VectorSelector) Vector { return vec } +var pointPool = sync.Pool{} + +func getPointSlice(sz int) []Point { + p := pointPool.Get() + if p != nil { + return p.([]Point) + } + return make([]Point, 0, sz) +} + +func putPointSlice(p []Point) { + pointPool.Put(p[:0]) +} + +var matrixPool = sync.Pool{} + +func getMatrix(sz int) Matrix { + m := matrixPool.Get() + if m != nil { + return m.(Matrix) + } + return make(Matrix, 0, sz) +} + +func putMatrix(m Matrix) { + matrixPool.Put(m[:0]) +} + // matrixSelector evaluates a *MatrixSelector expression. func (ev *evaluator) matrixSelector(node *MatrixSelector) Matrix { var ( offset = durationMilliseconds(node.Offset) maxt = ev.Timestamp - offset mint = maxt - durationMilliseconds(node.Range) - Matrix = make(Matrix, 0, len(node.series)) + matrix = getMatrix(len(node.series)) + // Write all points into a single slice to avoid lots of tiny allocations. + allPoints = getPointSlice(5 * len(matrix)) + ) + + ev.finalizers = append(ev.finalizers, + func() { putPointSlice(allPoints) }, + func() { putMatrix(matrix) }, ) for i, it := range node.iterators { + start := len(allPoints) + ss := Series{ Metric: node.series[i].Labels(), - Points: make([]Point, 0, 16), } ok := it.Seek(maxt) @@ -760,19 +806,22 @@ func (ev *evaluator) matrixSelector(node *MatrixSelector) Matrix { t, v := buf.At() // Values in the buffer are guaranteed to be smaller than maxt. if t >= mint { - ss.Points = append(ss.Points, Point{T: t, V: v}) + allPoints = append(allPoints, Point{T: t, V: v}) } } // The seeked sample might also be in the range. t, v = it.Values() if t == maxt { - ss.Points = append(ss.Points, Point{T: t, V: v}) + allPoints = append(allPoints, Point{T: t, V: v}) } + + ss.Points = allPoints[start:] + if len(ss.Points) > 0 { - Matrix = append(Matrix, ss) + matrix = append(matrix, ss) } } - return Matrix + return matrix } func (ev *evaluator) VectorAnd(lhs, rhs Vector, matching *VectorMatching) Vector { diff --git a/storage/buffer.go b/storage/buffer.go index 3076354680..cb989764f5 100644 --- a/storage/buffer.go +++ b/storage/buffer.go @@ -13,11 +13,14 @@ type BufferedSeriesIterator struct { // NewBuffer returns a new iterator that buffers the values within the time range // of the current element and the duration of delta before. func NewBuffer(it SeriesIterator, delta int64) *BufferedSeriesIterator { - return &BufferedSeriesIterator{ + bit := &BufferedSeriesIterator{ it: it, buf: newSampleRing(delta, 16), lastTime: math.MinInt64, } + it.Next() + + return bit } // PeekBack returns the previous element of the iterator. If there is none buffered, diff --git a/storage/buffer_test.go b/storage/buffer_test.go index e433fb9fa6..c1f34cb91c 100644 --- a/storage/buffer_test.go +++ b/storage/buffer_test.go @@ -124,6 +124,33 @@ func TestBufferedSeriesIterator(t *testing.T) { require.False(t, it.Next(), "next succeeded unexpectedly") } +func BenchmarkBufferedSeriesIterator(b *testing.B) { + var ( + samples []sample + lastT int64 + ) + for i := 0; i < b.N; i++ { + lastT += 30 + + samples = append(samples, sample{ + t: lastT, + v: 123, // doesn't matter + }) + } + + // Simulate a 5 minute rate. + it := NewBuffer(newListSeriesIterator(samples), 5*60) + + b.SetBytes(int64(b.N * 16)) + b.ReportAllocs() + b.ResetTimer() + + for it.Next() { + // scan everything + } + require.NoError(b, it.Err()) +} + type mockSeriesIterator struct { seek func(int64) bool values func() (int64, float64)