diff --git a/promql/value.go b/promql/value.go
index 4db976e979..10dcdc6f62 100644
--- a/promql/value.go
+++ b/promql/value.go
@@ -17,9 +17,15 @@ import (
 	"encoding/json"
 	"errors"
 	"fmt"
+	"io"
 	"strconv"
 	"strings"
 
+	"github.com/apache/arrow/go/arrow"
+	"github.com/apache/arrow/go/arrow/array"
+	"github.com/apache/arrow/go/arrow/ipc"
+	"github.com/apache/arrow/go/arrow/memory"
+
 	"github.com/prometheus/prometheus/model/histogram"
 	"github.com/prometheus/prometheus/model/labels"
 	"github.com/prometheus/prometheus/promql/parser"
@@ -101,6 +107,45 @@ func (s Series) MarshalJSON() ([]byte, error) {
 	return json.Marshal(series)
 }
 
+// writeArrow encodes the series as a single Arrow record and writes it to w
+// through its own IPC writer. The record has a millisecond timestamp column
+// "t" and a float64 column "v"; the series labels are attached as schema
+// metadata. Only point.T and point.V are written, so histogram points
+// (H != nil) are not represented. It returns the first error from writing
+// the record or closing the writer.
+func (s Series) writeArrow(w io.Writer, pool memory.Allocator) (err error) {
+	fields := []arrow.Field{
+		{Name: "t", Type: &arrow.TimestampType{Unit: arrow.Millisecond}},
+		{Name: "v", Type: arrow.PrimitiveTypes.Float64},
+	}
+	metadata := arrow.MetadataFrom(s.Metric.Map())
+	schema := arrow.NewSchema(fields, &metadata)
+	b := array.NewRecordBuilder(pool, schema)
+	defer b.Release()
+	b.Reserve(len(s.Points))
+
+	writer := ipc.NewWriter(w, ipc.WithAllocator(pool), ipc.WithSchema(schema))
+	defer func() {
+		// Do not drop a Close failure, but never mask an earlier write error.
+		if closeErr := writer.Close(); err == nil {
+			err = closeErr
+		}
+	}()
+
+	// Resolve the typed column builders once instead of once per point.
+	timestamps := b.Field(0).(*array.TimestampBuilder)
+	values := b.Field(1).(*array.Float64Builder)
+	for _, point := range s.Points {
+		// Since we reserve enough data for all points above we can use UnsafeAppend.
+		timestamps.UnsafeAppend(arrow.Timestamp(point.T))
+		values.UnsafeAppend(point.V)
+	}
+	rec := b.NewRecord()
+	defer rec.Release()
+
+	return writer.Write(rec)
+}
+
 // Point represents a single data point for a given timestamp.
 // If H is not nil, then this is a histogram point and only (T, H) is valid.
 // If H is nil, then only (T, V) is valid.
@@ -289,6 +323,41 @@ func (m Matrix) ContainsSameLabelset() bool {
 	}
 }
 
+// WriteArrow encodes every series of the matrix with Arrow and writes the
+// results to w one after another, stopping at the first error. It does not
+// check ArrowEncodeable itself.
+func (m Matrix) WriteArrow(w io.Writer) error {
+	pool := memory.DefaultAllocator
+	for _, series := range m {
+		if err := series.writeArrow(w, pool); err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
+// ArrowEncodeable determines if a specific matrix can be encoded with Arrow or
+// not.
+//
+// Currently native histograms are not handled with Arrow. Just adding a binary
+// type here halves the performance of Arrow. In the future we could consider
+// building a custom type in Arrow for histograms, or return a different schema
+// for native histograms.
+//
+// Every point is inspected, because the encoder only writes float values
+// and would not represent a histogram point appearing later in a series.
+func (m Matrix) ArrowEncodeable() bool {
+	for _, s := range m {
+		for _, p := range s.Points {
+			if p.H != nil {
+				return false
+			}
+		}
+	}
+
+	return true
+}
+
 // Result holds the resulting value of an execution or an error
 // if any occurred.
 type Result struct {
diff --git a/web/api/v1/api.go b/web/api/v1/api.go
index dbb1b4a0b7..c8d7e2af5d 100644
--- a/web/api/v1/api.go
+++ b/web/api/v1/api.go
@@ -16,6 +16,7 @@ package v1
 import (
 	"context"
 	"fmt"
+	"io"
 	"math"
 	"math/rand"
 	"net"
@@ -29,10 +30,6 @@ import (
 	"time"
 	"unsafe"
 
-	"github.com/apache/arrow/go/arrow"
-	"github.com/apache/arrow/go/arrow/array"
-	"github.com/apache/arrow/go/arrow/ipc"
-	"github.com/apache/arrow/go/arrow/memory"
 	"github.com/go-kit/log"
 	"github.com/go-kit/log/level"
 	"github.com/grafana/regexp"
@@ -1554,59 +1551,38 @@ func (api *API) cleanTombstones(r *http.Request) apiFuncResult {
 	return apiFuncResult{nil, nil, nil, nil}
 }
 
+// arrowWriter is implemented by query results that can encode themselves as
+// Arrow. tryArrowResponse consults ArrowEncodeable before calling WriteArrow.
+type arrowWriter interface {
+	WriteArrow(io.Writer) error
+	ArrowEncodeable() bool
+}
+
+// tryArrowResponse writes data to w as an Arrow stream. It returns an error,
+// without touching w, when data is not a *queryData or its result cannot be
+// encoded with Arrow. Once WriteHeader has been called, encoding failures are
+// only logged and nil is returned.
 func (api *API) tryArrowResponse(w http.ResponseWriter, r *http.Request, data interface{}, warnings storage.Warnings) error {
 	result, ok := data.(*queryData)
 	if !ok {
 		return fmt.Errorf("invalid result data type: %T", data)
 	}
 
-	if result.ResultType != parser.ValueTypeMatrix {
+	encoder, ok := result.Result.(arrowWriter)
+	if !ok {
 		return errors.Errorf("arrow not implemented for %s", result.ResultType)
 	}
 
-	matrix := result.Result.(promql.Matrix)
-	// Currently histograms are not handled with Arrow, just adding a binary
-	// type here halves the performance of Arrow. In the future we could
-	// consider building a custom type in Arrow for histograms.
-	for _, series := range matrix {
-		if len(series.Points) > 0 && series.Points[0].H != nil {
-			return fmt.Errorf("arrow not implemented for native histograms")
-		}
+	if !encoder.ArrowEncodeable() {
+		return fmt.Errorf("could not encode this result with arrow")
 	}
 
 	w.Header().Set("Content-Type", mimeTypeArrowStream)
 	w.WriteHeader(http.StatusOK)
-
-	pool := memory.NewGoAllocator()
-	fields := []arrow.Field{
-		{Name: "t", Type: &arrow.TimestampType{Unit: arrow.Millisecond}},
-		{Name: "v", Type: arrow.PrimitiveTypes.Float64},
+	if err := encoder.WriteArrow(w); err != nil {
+		// WriteHeader has already been called, so the failure is only logged.
+		level.Error(api.logger).Log("msg", "error writing arrow response", "err", err)
 	}
-
-	for _, series := range matrix {
-		metadata := arrow.MetadataFrom(series.Metric.Map())
-		seriesSchema := arrow.NewSchema(fields, &metadata)
-		b := array.NewRecordBuilder(pool, seriesSchema)
-		b.Reserve(len(series.Points))
-
-		writer := ipc.NewWriter(w, ipc.WithAllocator(pool), ipc.WithSchema(seriesSchema))
-
-		for _, point := range series.Points {
-			// Since we reserve enough data for all points above we can use UnsafeAppend.
-			b.Field(0).(*array.TimestampBuilder).UnsafeAppend(arrow.Timestamp(point.T))
-			b.Field(1).(*array.Float64Builder).UnsafeAppend(point.V)
-		}
-		rec := b.NewRecord()
-		if err := writer.Write(rec); err != nil {
-			level.Error(api.logger).Log("msg", "error writing arrow response", "err", err)
-			return nil
-		}
-
-		writer.Close()
-		rec.Release()
-		b.Release()
-	}
-
 	return nil
 }
 