Merge pull request #10604 from prometheus/beorn7/sparsehistogram

Support sparse histograms in the JSON query API
This commit is contained in:
Björn Rabenstein 2022-04-26 15:43:55 +02:00 committed by GitHub
commit bcc919cb19
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 273 additions and 10 deletions

View file

@ -65,8 +65,8 @@ func (s Scalar) MarshalJSON() ([]byte, error) {
// Series is a stream of data points belonging to a metric. // Series is a stream of data points belonging to a metric.
type Series struct { type Series struct {
Metric labels.Labels `json:"metric"` Metric labels.Labels
Points []Point `json:"values"` Points []Point
} }
func (s Series) String() string { func (s Series) String() string {
@ -77,6 +77,31 @@ func (s Series) String() string {
return fmt.Sprintf("%s =>\n%s", s.Metric, strings.Join(vals, "\n")) return fmt.Sprintf("%s =>\n%s", s.Metric, strings.Join(vals, "\n"))
} }
// MarshalJSON is mirrored in web/api/v1/api.go for efficiency reasons.
// This implementation is still provided for debug purposes and usage
// without jsoniter.
func (s Series) MarshalJSON() ([]byte, error) {
// Note that this is rather inefficient because it re-creates the whole
// series, just separated by Histogram Points and Value Points. For API
// purposes, there is a more efficcient jsoniter implementation in
// web/api/v1/api.go.
series := struct {
M labels.Labels `json:"metric"`
V []Point `json:"values,omitempty"`
H []Point `json:"histograms,omitempty"`
}{
M: s.Metric,
}
for _, p := range s.Points {
if p.H == nil {
series.V = append(series.V, p)
continue
}
series.H = append(series.H, p)
}
return json.Marshal(series)
}
// Point represents a single data point for a given timestamp. // Point represents a single data point for a given timestamp.
// If H is not nil, then this is a histogram point and only (T, H) is valid. // If H is not nil, then this is a histogram point and only (T, H) is valid.
// If H is nil, then only (T, V) is valid. // If H is nil, then only (T, V) is valid.
@ -106,10 +131,43 @@ func (p Point) String() string {
// slightly different results in terms of formatting and rounding of the // slightly different results in terms of formatting and rounding of the
// timestamp. // timestamp.
func (p Point) MarshalJSON() ([]byte, error) { func (p Point) MarshalJSON() ([]byte, error) {
// TODO(beorn7): Support histogram. if p.H == nil {
v := strconv.FormatFloat(p.V, 'f', -1, 64) v := strconv.FormatFloat(p.V, 'f', -1, 64)
return json.Marshal([...]interface{}{float64(p.T) / 1000, v}) return json.Marshal([...]interface{}{float64(p.T) / 1000, v})
} }
h := struct {
Count string `json:"count"`
Sum string `json:"sum"`
Buckets [][]interface{} `json:"buckets,omitempty"`
}{
Count: strconv.FormatFloat(p.H.Count, 'f', -1, 64),
Sum: strconv.FormatFloat(p.H.Sum, 'f', -1, 64),
}
it := p.H.AllBucketIterator()
for it.Next() {
bucket := it.At()
boundaries := 2 // Exclusive on both sides AKA open interval.
if bucket.LowerInclusive {
if bucket.UpperInclusive {
boundaries = 3 // Inclusive on both sides AKA closed interval.
} else {
boundaries = 1 // Inclusive only on lower end AKA right open.
}
} else {
if bucket.UpperInclusive {
boundaries = 0 // Inclusive only on upper end AKA left open.
}
}
bucketToMarshal := []interface{}{
boundaries,
strconv.FormatFloat(bucket.Lower, 'f', -1, 64),
strconv.FormatFloat(bucket.Upper, 'f', -1, 64),
strconv.FormatFloat(bucket.Count, 'f', -1, 64),
}
h.Buckets = append(h.Buckets, bucketToMarshal)
}
return json.Marshal([...]interface{}{float64(p.T) / 1000, h})
}
// Sample is a single sample belonging to a metric. // Sample is a single sample belonging to a metric.
type Sample struct { type Sample struct {
@ -122,7 +180,10 @@ func (s Sample) String() string {
return fmt.Sprintf("%s => %s", s.Metric, s.Point) return fmt.Sprintf("%s => %s", s.Metric, s.Point)
} }
// MarshalJSON is mirrored in web/api/v1/api.go with jsoniter because Point
// wouldn't be marshaled with jsoniter in all cases otherwise.
func (s Sample) MarshalJSON() ([]byte, error) { func (s Sample) MarshalJSON() ([]byte, error) {
if s.Point.H == nil {
v := struct { v := struct {
M labels.Labels `json:"metric"` M labels.Labels `json:"metric"`
V Point `json:"value"` V Point `json:"value"`
@ -132,6 +193,15 @@ func (s Sample) MarshalJSON() ([]byte, error) {
} }
return json.Marshal(v) return json.Marshal(v)
} }
h := struct {
M labels.Labels `json:"metric"`
H Point `json:"histogram"`
}{
M: s.Metric,
H: s.Point,
}
return json.Marshal(h)
}
// Vector is basically only an alias for model.Samples, but the // Vector is basically only an alias for model.Samples, but the
// contract is that in a Vector, all Samples have the same timestamp. // contract is that in a Vector, all Samples have the same timestamp.

View file

@ -40,6 +40,7 @@ import (
"github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/model/exemplar" "github.com/prometheus/prometheus/model/exemplar"
"github.com/prometheus/prometheus/model/histogram"
"github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/textparse" "github.com/prometheus/prometheus/model/textparse"
"github.com/prometheus/prometheus/model/timestamp" "github.com/prometheus/prometheus/model/timestamp"
@ -202,6 +203,8 @@ type API struct {
} }
func init() { func init() {
jsoniter.RegisterTypeEncoderFunc("promql.Series", marshalSeriesJSON, marshalSeriesJSONIsEmpty)
jsoniter.RegisterTypeEncoderFunc("promql.Sample", marshalSampleJSON, marshalSampleJSONIsEmpty)
jsoniter.RegisterTypeEncoderFunc("promql.Point", marshalPointJSON, marshalPointJSONIsEmpty) jsoniter.RegisterTypeEncoderFunc("promql.Point", marshalPointJSON, marshalPointJSONIsEmpty)
jsoniter.RegisterTypeEncoderFunc("exemplar.Exemplar", marshalExemplarJSON, marshalExemplarJSONEmpty) jsoniter.RegisterTypeEncoderFunc("exemplar.Exemplar", marshalExemplarJSON, marshalExemplarJSONEmpty)
} }
@ -1813,13 +1816,134 @@ OUTER:
return matcherSets, nil return matcherSets, nil
} }
// marshalSeriesJSON writes something like the following:
//
// {
// "metric" : {
// "__name__" : "up",
// "job" : "prometheus",
// "instance" : "localhost:9090"
// },
// "values": [
// [ 1435781451.781, "1" ],
// < more values>
// ],
// "histograms": [
// [ 1435781451.781, { < histogram, see below > } ],
// < more histograms >
// ],
// },
func marshalSeriesJSON(ptr unsafe.Pointer, stream *jsoniter.Stream) {
s := *((*promql.Series)(ptr))
stream.WriteObjectStart()
stream.WriteObjectField(`metric`)
m, err := s.Metric.MarshalJSON()
if err != nil {
stream.Error = err
return
}
stream.SetBuffer(append(stream.Buffer(), m...))
// We make two passes through the series here: In the first marshaling
// all value points, in the second marshaling all histogram
// points. That's probably cheaper than just one pass in which we copy
// out histogram Points into a newly allocated slice for separate
// marshaling. (Could be benchmarked, though.)
var foundValue, foundHistogram bool
for _, p := range s.Points {
if p.H == nil {
stream.WriteMore()
if !foundValue {
stream.WriteObjectField(`values`)
stream.WriteArrayStart()
}
foundValue = true
marshalPointJSON(unsafe.Pointer(&p), stream)
} else {
foundHistogram = true
}
}
if foundValue {
stream.WriteArrayEnd()
}
if foundHistogram {
firstHistogram := true
for _, p := range s.Points {
if p.H != nil {
stream.WriteMore()
if firstHistogram {
stream.WriteObjectField(`histograms`)
stream.WriteArrayStart()
}
firstHistogram = false
marshalPointJSON(unsafe.Pointer(&p), stream)
}
}
stream.WriteArrayEnd()
}
stream.WriteObjectEnd()
}
func marshalSeriesJSONIsEmpty(ptr unsafe.Pointer) bool {
return false
}
// marshalSampleJSON writes something like the following for normal value samples:
//
// {
// "metric" : {
// "__name__" : "up",
// "job" : "prometheus",
// "instance" : "localhost:9090"
// },
// "value": [ 1435781451.781, "1" ]
// },
//
// For histogram samples, it writes something like this:
//
// {
// "metric" : {
// "__name__" : "up",
// "job" : "prometheus",
// "instance" : "localhost:9090"
// },
// "histogram": [ 1435781451.781, { < histogram, see below > } ]
// },
func marshalSampleJSON(ptr unsafe.Pointer, stream *jsoniter.Stream) {
s := *((*promql.Sample)(ptr))
stream.WriteObjectStart()
stream.WriteObjectField(`metric`)
m, err := s.Metric.MarshalJSON()
if err != nil {
stream.Error = err
return
}
stream.SetBuffer(append(stream.Buffer(), m...))
stream.WriteMore()
if s.Point.H == nil {
stream.WriteObjectField(`value`)
} else {
stream.WriteObjectField(`histogram`)
}
marshalPointJSON(unsafe.Pointer(&s.Point), stream)
stream.WriteObjectEnd()
}
func marshalSampleJSONIsEmpty(ptr unsafe.Pointer) bool {
return false
}
// marshalPointJSON writes `[ts, "val"]`. // marshalPointJSON writes `[ts, "val"]`.
func marshalPointJSON(ptr unsafe.Pointer, stream *jsoniter.Stream) { func marshalPointJSON(ptr unsafe.Pointer, stream *jsoniter.Stream) {
p := *((*promql.Point)(ptr)) p := *((*promql.Point)(ptr))
stream.WriteArrayStart() stream.WriteArrayStart()
marshalTimestamp(p.T, stream) marshalTimestamp(p.T, stream)
stream.WriteMore() stream.WriteMore()
if p.H == nil {
marshalValue(p.V, stream) marshalValue(p.V, stream)
} else {
marshalHistogram(p.H, stream)
}
stream.WriteArrayEnd() stream.WriteArrayEnd()
} }
@ -1827,6 +1951,74 @@ func marshalPointJSONIsEmpty(ptr unsafe.Pointer) bool {
return false return false
} }
// marshalHistogramJSON writes something like:
//
// {
// "count": "42",
// "sum": "34593.34",
// "buckets": [
// [ 3, "-0.25", "0.25", "3"],
// [ 0, "0.25", "0.5", "12"],
// [ 0, "0.5", "1", "21"],
// [ 0, "2", "4", "6"]
// ]
// }
//
// The 1st element in each bucket array determines if the boundaries are
// inclusive (AKA closed) or exclusive (AKA open):
// 0: lower exclusive, upper inclusive
// 1: lower inclusive, upper exclusive
// 2: both exclusive
// 3: both inclusive
//
// The 2nd and 3rd elements are the lower and upper boundary. The 4th element is
// the bucket count.
func marshalHistogram(h *histogram.FloatHistogram, stream *jsoniter.Stream) {
stream.WriteObjectStart()
stream.WriteObjectField(`count`)
marshalValue(h.Count, stream)
stream.WriteMore()
stream.WriteObjectField(`sum`)
marshalValue(h.Sum, stream)
bucketFound := false
it := h.AllBucketIterator()
for it.Next() {
stream.WriteMore()
if !bucketFound {
stream.WriteObjectField(`buckets`)
stream.WriteArrayStart()
}
bucketFound = true
bucket := it.At()
boundaries := 2 // Exclusive on both sides AKA open interval.
if bucket.LowerInclusive {
if bucket.UpperInclusive {
boundaries = 3 // Inclusive on both sides AKA closed interval.
} else {
boundaries = 1 // Inclusive only on lower end AKA right open.
}
} else {
if bucket.UpperInclusive {
boundaries = 0 // Inclusive only on upper end AKA left open.
}
}
stream.WriteArrayStart()
stream.WriteInt(boundaries)
stream.WriteMore()
marshalValue(bucket.Lower, stream)
stream.WriteMore()
marshalValue(bucket.Upper, stream)
stream.WriteMore()
marshalValue(bucket.Count, stream)
stream.WriteArrayEnd()
}
if bucketFound {
stream.WriteArrayEnd()
}
stream.WriteObjectEnd()
}
// marshalExemplarJSON writes. // marshalExemplarJSON writes.
// { // {
// labels: <labels>, // labels: <labels>,

View file

@ -2784,6 +2784,7 @@ func TestRespond(t *testing.T) {
Result: promql.Matrix{ Result: promql.Matrix{
promql.Series{ promql.Series{
Points: []promql.Point{{V: 1, T: 1000}}, Points: []promql.Point{{V: 1, T: 1000}},
// TODO(beorn7): Add histogram points.
Metric: labels.FromStrings("__name__", "foo"), Metric: labels.FromStrings("__name__", "foo"),
}, },
}, },