Revert "Move Arrow writing code to the Matrix and Series types"

This reverts commit c74dcecff06ae2e9b98c3c0d7bd27a46cbcf7858.

Signed-off-by: Chris Marchbanks <csmarchbanks@gmail.com>
This commit is contained in:
Chris Marchbanks 2023-01-13 11:18:38 -07:00
parent 61c6792853
commit a6e891966f
No known key found for this signature in database
GPG key ID: B7FD940BC86A8E7A
2 changed files with 43 additions and 73 deletions

View file

@ -17,15 +17,9 @@ import (
"encoding/json"
"errors"
"fmt"
"io"
"strconv"
"strings"
"github.com/apache/arrow/go/arrow"
"github.com/apache/arrow/go/arrow/array"
"github.com/apache/arrow/go/arrow/ipc"
"github.com/apache/arrow/go/arrow/memory"
"github.com/prometheus/prometheus/model/histogram"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/promql/parser"
@ -107,34 +101,6 @@ func (s Series) MarshalJSON() ([]byte, error) {
return json.Marshal(series)
}
func (s Series) writeArrow(w io.Writer, pool memory.Allocator) error {
fields := []arrow.Field{
{Name: "t", Type: &arrow.TimestampType{Unit: arrow.Millisecond}},
{Name: "v", Type: arrow.PrimitiveTypes.Float64},
}
metadata := arrow.MetadataFrom(s.Metric.Map())
schema := arrow.NewSchema(fields, &metadata)
b := array.NewRecordBuilder(pool, schema)
defer b.Release()
b.Reserve(len(s.Points))
writer := ipc.NewWriter(w, ipc.WithAllocator(pool), ipc.WithSchema(schema))
defer writer.Close()
for _, point := range s.Points {
// Since we reserve enough data for all points above we can use UnsafeAppend.
b.Field(0).(*array.TimestampBuilder).UnsafeAppend(arrow.Timestamp(point.T))
b.Field(1).(*array.Float64Builder).UnsafeAppend(point.V)
}
rec := b.NewRecord()
defer rec.Release()
if err := writer.Write(rec); err != nil {
return err
}
return nil
}
// Point represents a single data point for a given timestamp.
// If H is not nil, then this is a histogram point and only (T, H) is valid.
// If H is nil, then only (T, V) is valid.
@ -323,33 +289,6 @@ func (m Matrix) ContainsSameLabelset() bool {
}
}
func (m Matrix) WriteArrow(w io.Writer) error {
pool := memory.DefaultAllocator
for _, series := range m {
if err := series.writeArrow(w, pool); err != nil {
return err
}
}
return nil
}
// ArrowEncodeable determines if a specific matrix can be encoded with Arrow or
// not.
//
// Currently native histograms are not handled with Arrow. Just adding a binary
// type here halves the performance of Arrow. In the future we could consider
// building a custom type in Arrow for histograms, or return a different schema
// for native histograms
func (m Matrix) ArrowEncodeable() bool {
for _, s := range m {
if len(s.Points) > 0 && s.Points[0].H != nil {
return false
}
}
return true
}
// Result holds the resulting value of an execution or an error
// if any occurred.
type Result struct {

View file

@ -16,7 +16,6 @@ package v1
import (
"context"
"fmt"
"io"
"math"
"math/rand"
"net"
@ -30,6 +29,10 @@ import (
"time"
"unsafe"
"github.com/apache/arrow/go/arrow"
"github.com/apache/arrow/go/arrow/array"
"github.com/apache/arrow/go/arrow/ipc"
"github.com/apache/arrow/go/arrow/memory"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/regexp"
@ -1551,31 +1554,59 @@ func (api *API) cleanTombstones(r *http.Request) apiFuncResult {
return apiFuncResult{nil, nil, nil, nil}
}
type arrowWriter interface {
WriteArrow(io.Writer) error
ArrowEncodeable() bool
}
func (api *API) tryArrowResponse(w http.ResponseWriter, r *http.Request, data interface{}, warnings storage.Warnings) error {
result, ok := data.(*queryData)
if !ok {
return fmt.Errorf("invalid result data type: %T", data)
}
arrowWriter, ok := result.Result.(arrowWriter)
if !ok {
if result.ResultType != parser.ValueTypeMatrix {
return errors.Errorf("arrow not implemented for %s", result.ResultType)
}
matrix := result.Result.(promql.Matrix)
if !arrowWriter.ArrowEncodeable() {
return fmt.Errorf("could not encode this result with arrow")
// Currently histograms are not handled with Arrow, just adding a binary
// type here halves the performance of Arrow. In the future we could
// consider building a custom type in Arrow for histograms.
for _, series := range matrix {
if len(series.Points) > 0 && series.Points[0].H != nil {
return fmt.Errorf("arrow not implemented for native histograms")
}
}
w.Header().Set("Content-Type", mimeTypeArrowStream)
w.WriteHeader(http.StatusOK)
if err := arrowWriter.WriteArrow(w); err != nil {
level.Error(api.logger).Log("msg", "error writing arrow response", "err", err)
pool := memory.NewGoAllocator()
fields := []arrow.Field{
{Name: "t", Type: &arrow.TimestampType{Unit: arrow.Millisecond}},
{Name: "v", Type: arrow.PrimitiveTypes.Float64},
}
for _, series := range matrix {
metadata := arrow.MetadataFrom(series.Metric.Map())
seriesSchema := arrow.NewSchema(fields, &metadata)
b := array.NewRecordBuilder(pool, seriesSchema)
b.Reserve(len(series.Points))
writer := ipc.NewWriter(w, ipc.WithAllocator(pool), ipc.WithSchema(seriesSchema))
for _, point := range series.Points {
// Since we reserve enough data for all points above we can use UnsafeAppend.
b.Field(0).(*array.TimestampBuilder).UnsafeAppend(arrow.Timestamp(point.T))
b.Field(1).(*array.Float64Builder).UnsafeAppend(point.V)
}
rec := b.NewRecord()
if err := writer.Write(rec); err != nil {
level.Error(api.logger).Log("msg", "error writing arrow response", "err", err)
return nil
}
writer.Close()
rec.Release()
b.Release()
}
return nil
}