Improve Arrow Encoding Performance

This commite contains a few changes that improve the performance of
Arrow encoding:
1. Create an allocator based on the prometheus Pool implementation to
   reuse buffers. This results in B/op dropping by three orders of
   magnitude.
2. Reuse specific timestamp and float64 builders across series.

Signed-off-by: Chris Marchbanks <csmarchbanks@gmail.com>
This commit is contained in:
Chris Marchbanks 2023-01-17 16:28:22 -07:00
parent 8d0cb6ae3f
commit 351b70b94d
No known key found for this signature in database
GPG key ID: B7FD940BC86A8E7A
2 changed files with 135 additions and 33 deletions

View file

@ -29,9 +29,6 @@ import (
"time"
"unsafe"
"github.com/apache/arrow/go/v11/arrow"
"github.com/apache/arrow/go/v11/arrow/array"
"github.com/apache/arrow/go/v11/arrow/ipc"
"github.com/apache/arrow/go/v11/arrow/memory"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
@ -214,6 +211,8 @@ type API struct {
remoteWriteHandler http.Handler
remoteReadHandler http.Handler
arrowAllocator memory.Allocator
}
func init() {
@ -1577,37 +1576,11 @@ func (api *API) tryArrowResponse(w http.ResponseWriter, r *http.Request, data in
w.Header().Set("Content-Type", mimeTypeArrowStream)
w.WriteHeader(http.StatusOK)
pool := memory.NewGoAllocator()
fields := []arrow.Field{
{Name: "t", Type: &arrow.TimestampType{Unit: arrow.Millisecond}},
{Name: "v", Type: arrow.PrimitiveTypes.Float64},
if api.arrowAllocator == nil {
api.arrowAllocator = newArrowAllocator()
}
for _, series := range matrix {
metadata := arrow.MetadataFrom(series.Metric.Map())
seriesSchema := arrow.NewSchema(fields, &metadata)
b := array.NewRecordBuilder(pool, seriesSchema)
b.Reserve(len(series.Points))
writer := ipc.NewWriter(w, ipc.WithAllocator(pool), ipc.WithSchema(seriesSchema))
for _, point := range series.Points {
// Since we reserve enough data for all points above we can use UnsafeAppend.
b.Field(0).(*array.TimestampBuilder).UnsafeAppend(arrow.Timestamp(point.T))
b.Field(1).(*array.Float64Builder).UnsafeAppend(point.V)
}
rec := b.NewRecord()
if err := writer.Write(rec); err != nil {
level.Error(api.logger).Log("msg", "error writing arrow response", "err", err)
return nil
}
writer.Close()
rec.Release()
b.Release()
}
return nil
enc := newArrowEncoder(api.logger, w, api.arrowAllocator)
return enc.EncodeMatrix(matrix)
}
func (api *API) respond(w http.ResponseWriter, r *http.Request, data interface{}, warnings storage.Warnings) {

129
web/api/v1/arrow.go Normal file
View file

@ -0,0 +1,129 @@
package v1
import (
"fmt"
"io"
"github.com/apache/arrow/go/v11/arrow"
"github.com/apache/arrow/go/v11/arrow/array"
"github.com/apache/arrow/go/v11/arrow/ipc"
"github.com/apache/arrow/go/v11/arrow/memory"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/prometheus/prometheus/promql"
"github.com/prometheus/prometheus/util/pool"
)
type arrowEncoder struct {
logger log.Logger
w io.Writer
allocator memory.Allocator
}
func newArrowEncoder(logger log.Logger, w io.Writer, allocator memory.Allocator) *arrowEncoder {
return &arrowEncoder{
logger: logger,
w: w,
allocator: allocator,
}
}
func (enc *arrowEncoder) EncodeMatrix(matrix promql.Matrix) error {
longest := 0
for _, series := range matrix {
if len(series.Points) > longest {
longest = len(series.Points)
}
if len(series.Points) > 0 && series.Points[0].H != nil {
return fmt.Errorf("arrow not implemented for native histograms")
}
}
tsBuilder := array.NewTimestampBuilder(enc.allocator, &arrow.TimestampType{Unit: arrow.Millisecond})
tsBuilder.Reserve(longest)
values := array.NewFloat64Builder(enc.allocator)
values.Reserve(longest)
for _, series := range matrix {
err := enc.writeSeries(series, tsBuilder, values)
if err != nil {
level.Error(enc.logger).Log("msg", "error writing arrow response", "err", err)
return nil
}
}
return nil
}
func (enc *arrowEncoder) writeSeries(series promql.Series, timestamps *array.TimestampBuilder, values *array.Float64Builder) error {
for _, point := range series.Points {
timestamps.UnsafeAppend(arrow.Timestamp(point.T))
values.UnsafeAppend(point.V)
}
var fields = []arrow.Field{
{Name: "t", Type: timestamps.Type()},
{Name: "v", Type: values.Type()},
}
metadata := arrow.MetadataFrom(series.Metric.Map())
seriesSchema := arrow.NewSchema(fields, &metadata)
writer := ipc.NewWriter(enc.w, ipc.WithAllocator(enc.allocator), ipc.WithSchema(seriesSchema))
defer writer.Close()
b := array.NewRecordBuilder(enc.allocator, seriesSchema)
defer b.Release()
ts := timestamps.NewArray()
defer ts.Release()
vs := values.NewArray()
defer vs.Release()
rec := array.NewRecord(seriesSchema, []arrow.Array{
ts,
vs,
}, int64(ts.Len()))
defer rec.Release()
if err := writer.Write(rec); err != nil {
return err
}
return nil
}
type arrowAllocator struct {
memory.Allocator
pool *pool.Pool
}
func newArrowAllocator() *arrowAllocator {
base := memory.DefaultAllocator
return &arrowAllocator{
Allocator: memory.DefaultAllocator,
pool: pool.New(1e3, 100e6, 3, func(size int) interface{} {
return base.Allocate(size)
}),
}
}
func (a *arrowAllocator) Allocate(size int) []byte {
b := a.pool.Get(size).([]byte)
return b[:size]
}
func (a *arrowAllocator) Reallocate(size int, b []byte) []byte {
if cap(b) >= size {
return b[:size]
}
a.Free(b)
newB := a.pool.Get(size).([]byte)[:size]
copy(newB, b)
return newB
}
func (a *arrowAllocator) Free(b []byte) {
if b != nil {
a.pool.Put(b)
}
}