Initial support for returning histograms with Arrow

Signed-off-by: Chris Marchbanks <csmarchbanks@gmail.com>
Author: Chris Marchbanks
Date: 2023-01-13 16:00:39 -07:00
Parent: a6e891966f
Commit: bd159f63e7
2 changed files with 156 additions and 38 deletions
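
Note: the response body written by this change is a sequence of Arrow IPC streams, one per series, written back to back, with the series labels carried as schema metadata (see tryArrowResponse below). As orientation, a minimal client-side sketch for the float case follows; it is not part of the commit, and the Arrow v10 import paths and the EOF handling are assumptions.

package main

import (
    "errors"
    "fmt"
    "io"
    "os"

    "github.com/apache/arrow/go/v10/arrow/array"
    "github.com/apache/arrow/go/v10/arrow/ipc"
)

// readFloatSeries decodes back-to-back IPC streams (one per series) from r,
// printing timestamps and values. Histogram-shaped records would need the
// extra columns handled as in pointsFromRecord further down.
func readFloatSeries(r io.Reader) error {
    for {
        rdr, err := ipc.NewReader(r)
        if err != nil {
            // Assumption: the reader surfaces EOF once all per-series
            // streams are consumed; adjust for the Arrow version in use.
            if errors.Is(err, io.EOF) {
                return nil
            }
            return err
        }
        fmt.Println("labels:", rdr.Schema().Metadata())
        for rdr.Next() {
            rec := rdr.Record()
            ts := rec.Column(0).(*array.Timestamp)
            vs := rec.Column(1).(*array.Float64)
            for i := 0; i < int(rec.NumRows()); i++ {
                fmt.Println(ts.Value(i), vs.Value(i))
            }
        }
        rdr.Release()
    }
}

func main() {
    if err := readFloatSeries(os.Stdin); err != nil {
        fmt.Fprintln(os.Stderr, err)
    }
}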

@@ -16,6 +16,7 @@ package v1
import (
"context"
"fmt"
"io"
"math"
"math/rand"
"net"
@@ -1565,49 +1566,122 @@ func (api *API) tryArrowResponse(w http.ResponseWriter, r *http.Request, data in
}
matrix := result.Result.(promql.Matrix)
w.Header().Set("Content-Type", mimeTypeArrowStream)
w.WriteHeader(http.StatusOK)
pool := memory.NewGoAllocator()
for _, series := range matrix {
var err error
if len(series.Points) > 0 && series.Points[0].H != nil {
err = api.writeArrowHistogram(w, pool, series)
} else {
err = api.writeArrow(w, pool, series)
}
if err != nil {
level.Error(api.logger).Log("msg", "error writing arrow response", "err", err)
}
}
return nil
}
func (api *API) writeArrow(w io.Writer, pool memory.Allocator, series promql.Series) error {
fields := []arrow.Field{
{Name: "t", Type: &arrow.TimestampType{Unit: arrow.Millisecond}},
{Name: "v", Type: arrow.PrimitiveTypes.Float64},
}
metadata := arrow.MetadataFrom(series.Metric.Map())
seriesSchema := arrow.NewSchema(fields, &metadata)
b := array.NewRecordBuilder(pool, seriesSchema)
defer b.Release()
b.Reserve(len(series.Points))
writer := ipc.NewWriter(w, ipc.WithAllocator(pool), ipc.WithSchema(seriesSchema))
defer writer.Close()
for _, point := range series.Points {
// Since we reserve enough data for all points above we can use UnsafeAppend.
b.Field(0).(*array.TimestampBuilder).UnsafeAppend(arrow.Timestamp(point.T))
b.Field(1).(*array.Float64Builder).UnsafeAppend(point.V)
}
rec := b.NewRecord()
defer rec.Release()
return writer.Write(rec)
}
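
To see the float encoding in isolation, here is a self-contained sketch, not part of the commit, that builds the same two-column record for a few synthetic points, writes a single IPC stream into a buffer the way writeArrow writes one per series, and reads it back. Import paths assume Arrow Go v10; adjust to the module's version.

package main

import (
    "bytes"
    "fmt"

    "github.com/apache/arrow/go/v10/arrow"
    "github.com/apache/arrow/go/v10/arrow/array"
    "github.com/apache/arrow/go/v10/arrow/ipc"
    "github.com/apache/arrow/go/v10/arrow/memory"
)

func main() {
    pool := memory.NewGoAllocator()
    schema := arrow.NewSchema([]arrow.Field{
        {Name: "t", Type: &arrow.TimestampType{Unit: arrow.Millisecond}},
        {Name: "v", Type: arrow.PrimitiveTypes.Float64},
    }, nil)

    b := array.NewRecordBuilder(pool, schema)
    defer b.Release()
    b.Reserve(3)
    for i, v := range []float64{1.5, 2.5, 3.5} {
        // Reserve above makes UnsafeAppend safe, as in writeArrow.
        b.Field(0).(*array.TimestampBuilder).UnsafeAppend(arrow.Timestamp(1000 * i))
        b.Field(1).(*array.Float64Builder).UnsafeAppend(v)
    }
    rec := b.NewRecord()
    defer rec.Release()

    // One IPC stream per series on the wire; here it goes into a buffer.
    var buf bytes.Buffer
    w := ipc.NewWriter(&buf, ipc.WithSchema(schema), ipc.WithAllocator(pool))
    if err := w.Write(rec); err != nil {
        panic(err)
    }
    if err := w.Close(); err != nil {
        panic(err)
    }

    // Read it back and print the columns.
    r, err := ipc.NewReader(&buf)
    if err != nil {
        panic(err)
    }
    defer r.Release()
    for r.Next() {
        got := r.Record()
        fmt.Println(got.Column(0).(*array.Timestamp).TimestampValues())
        fmt.Println(got.Column(1).(*array.Float64).Float64Values())
    }
}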
func (api *API) writeArrowHistogram(w io.Writer, pool memory.Allocator, series promql.Series) error {
metadata := arrow.MetadataFrom(series.Metric.Map())
spanFields := []arrow.Field{
{Name: "offset", Type: arrow.PrimitiveTypes.Int32},
{Name: "length", Type: arrow.PrimitiveTypes.Uint32},
}
fields := []arrow.Field{
{Name: "t", Type: &arrow.TimestampType{Unit: arrow.Millisecond}},
{Name: "counterResetHint", Type: arrow.PrimitiveTypes.Uint8},
{Name: "schema", Type: arrow.PrimitiveTypes.Int32},
{Name: "zeroThreshold", Type: arrow.PrimitiveTypes.Float64},
{Name: "zeroCount", Type: arrow.PrimitiveTypes.Float64},
{Name: "count", Type: arrow.PrimitiveTypes.Float64},
{Name: "sum", Type: arrow.PrimitiveTypes.Float64},
// Spans are flattened into two separate lists for ease of Arrow encoding.
{Name: "positiveSpans", Type: arrow.ListOf(arrow.StructOf(spanFields...))},
{Name: "negativeSpans", Type: arrow.ListOf(arrow.StructOf(spanFields...))},
{Name: "positiveBuckets", Type: arrow.ListOf(arrow.PrimitiveTypes.Float64)},
{Name: "negativeBuckets", Type: arrow.ListOf(arrow.PrimitiveTypes.Float64)},
}
schema := arrow.NewSchema(fields, &metadata)
b := array.NewRecordBuilder(pool, schema)
defer b.Release()
b.Reserve(len(series.Points))
writer := ipc.NewWriter(w, ipc.WithAllocator(pool), ipc.WithSchema(schema))
defer writer.Close()
for _, point := range series.Points {
// Since we reserve enough data for all points above we can use UnsafeAppend.
b.Field(0).(*array.TimestampBuilder).UnsafeAppend(arrow.Timestamp(point.T))
b.Field(1).(*array.Uint8Builder).UnsafeAppend(uint8(point.H.CounterResetHint))
b.Field(2).(*array.Int32Builder).UnsafeAppend(point.H.Schema)
b.Field(3).(*array.Float64Builder).UnsafeAppend(point.H.ZeroThreshold)
b.Field(4).(*array.Float64Builder).UnsafeAppend(point.H.ZeroCount)
b.Field(5).(*array.Float64Builder).UnsafeAppend(point.H.Count)
b.Field(6).(*array.Float64Builder).UnsafeAppend(point.H.Sum)
positiveSpanBuilder := b.Field(7).(*array.ListBuilder)
positiveSpanBuilder.Append(true)
positiveSpanValueBuilder := positiveSpanBuilder.ValueBuilder().(*array.StructBuilder)
positiveSpanValueBuilder.Reserve(len(point.H.PositiveSpans))
for _, span := range point.H.PositiveSpans {
positiveSpanValueBuilder.FieldBuilder(0).(*array.Int32Builder).UnsafeAppend(span.Offset)
positiveSpanValueBuilder.FieldBuilder(1).(*array.Uint32Builder).UnsafeAppend(span.Length)
positiveSpanValueBuilder.Append(true)
}
negativeSpanBuilder := b.Field(8).(*array.ListBuilder)
negativeSpanBuilder.Append(true)
negativeSpanValueBuilder := negativeSpanBuilder.ValueBuilder().(*array.StructBuilder)
negativeSpanValueBuilder.Reserve(len(point.H.NegativeSpans))
for _, span := range point.H.NegativeSpans {
negativeSpanValueBuilder.FieldBuilder(0).(*array.Int32Builder).UnsafeAppend(span.Offset)
negativeSpanValueBuilder.FieldBuilder(1).(*array.Uint32Builder).UnsafeAppend(span.Length)
negativeSpanValueBuilder.Append(true)
}
positiveBucketsBuilder := b.Field(9).(*array.ListBuilder)
positiveBucketsBuilder.Append(true)
positiveBucketsBuilder.ValueBuilder().(*array.Float64Builder).AppendValues(point.H.PositiveBuckets, nil)
negativeBucketsBuilder := b.Field(10).(*array.ListBuilder)
negativeBucketsBuilder.Append(true)
negativeBucketsBuilder.ValueBuilder().(*array.Float64Builder).AppendValues(point.H.NegativeBuckets, nil)
}
rec := b.NewRecord()
defer rec.Release()
return writer.Write(rec)
}
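
For reference, the flattened histogram layout can be inspected on its own. The sketch below, not part of the commit, copies the field definitions from writeArrowHistogram and just prints the resulting schema (import path assumed):

package main

import (
    "fmt"

    "github.com/apache/arrow/go/v10/arrow"
)

func main() {
    spanFields := []arrow.Field{
        {Name: "offset", Type: arrow.PrimitiveTypes.Int32},
        {Name: "length", Type: arrow.PrimitiveTypes.Uint32},
    }
    fields := []arrow.Field{
        {Name: "t", Type: &arrow.TimestampType{Unit: arrow.Millisecond}},
        {Name: "counterResetHint", Type: arrow.PrimitiveTypes.Uint8},
        {Name: "schema", Type: arrow.PrimitiveTypes.Int32},
        {Name: "zeroThreshold", Type: arrow.PrimitiveTypes.Float64},
        {Name: "zeroCount", Type: arrow.PrimitiveTypes.Float64},
        {Name: "count", Type: arrow.PrimitiveTypes.Float64},
        {Name: "sum", Type: arrow.PrimitiveTypes.Float64},
        {Name: "positiveSpans", Type: arrow.ListOf(arrow.StructOf(spanFields...))},
        {Name: "negativeSpans", Type: arrow.ListOf(arrow.StructOf(spanFields...))},
        {Name: "positiveBuckets", Type: arrow.ListOf(arrow.PrimitiveTypes.Float64)},
        {Name: "negativeBuckets", Type: arrow.ListOf(arrow.PrimitiveTypes.Float64)},
    }
    // Prints the one-row-per-point schema: each FloatHistogram becomes scalar
    // columns plus list columns for spans and buckets.
    fmt.Println(arrow.NewSchema(fields, nil))
}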
func (api *API) respond(w http.ResponseWriter, r *http.Request, data interface{}, warnings storage.Warnings) {

@@ -3244,15 +3244,7 @@ func arrowToJSONResponse(reader io.Reader) ([]byte, error) {
if series.Metric == nil {
series.Metric = labelsFromArrowMetadata(schema.Metadata())
}
series.Points = pointsFromRecord(r.Record())
}
data = append(data, series)
}
@@ -3277,6 +3269,58 @@ func labelsFromArrowMetadata(metadata arrow.Metadata) labels.Labels {
return labels.FromStrings(strs...)
}
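
Series labels travel as Arrow schema metadata: the write side calls arrow.MetadataFrom(series.Metric.Map()) and labelsFromArrowMetadata rebuilds labels on the read side. A tiny illustration of that mapping, with made-up labels and assumed import paths:

package main

import (
    "fmt"

    "github.com/apache/arrow/go/v10/arrow"
    "github.com/prometheus/prometheus/model/labels"
)

func main() {
    // Hypothetical labels; the server side uses series.Metric.Map().
    lbls := labels.FromStrings("__name__", "http_requests_total", "job", "api")
    md := arrow.MetadataFrom(lbls.Map())
    // Keys() and Values() are parallel slices; the read side rebuilds
    // labels.Labels from them, as labelsFromArrowMetadata does above.
    for i, k := range md.Keys() {
        fmt.Printf("%s=%s\n", k, md.Values()[i])
    }
}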
func pointsFromRecord(rec array.Record) []promql.Point {
points := make([]promql.Point, 0, rec.NumRows())
isHistogram := !rec.Schema().HasField("v")
for i := 0; i < int(rec.NumRows()); i++ {
t := rec.Column(0).(*array.Timestamp).Value(i)
if isHistogram {
h := &histogram.FloatHistogram{
CounterResetHint: histogram.CounterResetHint(rec.Column(1).(*array.Uint8).Value(i)),
Schema: rec.Column(2).(*array.Int32).Value(i),
ZeroThreshold: rec.Column(3).(*array.Float64).Value(i),
ZeroCount: rec.Column(4).(*array.Float64).Value(i),
Count: rec.Column(5).(*array.Float64).Value(i),
Sum: rec.Column(6).(*array.Float64).Value(i),
PositiveSpans: spans(rec.Column(7).(*array.List), i),
NegativeSpans: spans(rec.Column(8).(*array.List), i),
PositiveBuckets: buckets(rec.Column(9).(*array.List), i),
NegativeBuckets: buckets(rec.Column(10).(*array.List), i),
}
points = append(points, promql.Point{
T: int64(t),
H: h,
})
} else {
v := rec.Column(1).(*array.Float64).Value(i)
points = append(points, promql.Point{
T: int64(t),
V: v,
})
}
}
return points
}
func spans(encoded *array.List, i int) []histogram.Span {
// Row i of the list column covers the flat child values in
// [offsets[i], offsets[i+1]), mirroring the slicing in buckets below.
offsets := encoded.Offsets()
structs := encoded.ListValues().(*array.Struct)
spans := make([]histogram.Span, 0, int(offsets[i+1]-offsets[i]))
for j := int(offsets[i]); j < int(offsets[i+1]); j++ {
spans = append(spans, histogram.Span{
Offset: structs.Field(0).(*array.Int32).Value(j),
Length: structs.Field(1).(*array.Uint32).Value(j),
})
}
return spans
}
func buckets(encoded *array.List, i int) []float64 {
offsets := encoded.Offsets()
return encoded.ListValues().(*array.Float64).Float64Values()[offsets[i]:offsets[i+1]]
}
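
To make the offset arithmetic in buckets (and in spans above) concrete, the sketch below builds a two-row list column and slices each row out of the flat child array. Values are made up and import paths are assumed; it is not part of the commit:

package main

import (
    "fmt"

    "github.com/apache/arrow/go/v10/arrow"
    "github.com/apache/arrow/go/v10/arrow/array"
    "github.com/apache/arrow/go/v10/arrow/memory"
)

func main() {
    pool := memory.NewGoAllocator()
    lb := array.NewListBuilder(pool, arrow.PrimitiveTypes.Float64)
    defer lb.Release()
    vb := lb.ValueBuilder().(*array.Float64Builder)

    // Row 0: [1 2 3], row 1: [4 5]; stored flat as [1 2 3 4 5] with offsets [0 3 5].
    lb.Append(true)
    vb.AppendValues([]float64{1, 2, 3}, nil)
    lb.Append(true)
    vb.AppendValues([]float64{4, 5}, nil)

    list := lb.NewListArray()
    defer list.Release()

    offsets := list.Offsets()
    flat := list.ListValues().(*array.Float64).Float64Values()
    for i := 0; i < list.Len(); i++ {
        // Same slicing as buckets() (and the fixed spans()) above.
        fmt.Println(flat[offsets[i]:offsets[i+1]])
    }
}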
func TestTSDBStatus(t *testing.T) {
tsdb := &fakeDB{}
tsdbStatusAPI := func(api *API) apiFunc { return api.serveTSDBStatus }