POC parsing a compact native histogram format

Signed-off-by: Chris Marchbanks <csmarchbanks@gmail.com>
This commit is contained in:
Chris Marchbanks 2024-01-05 13:22:59 -07:00
parent a87c7b5084
commit 93ccaa5f70
No known key found for this signature in database
GPG key ID: B7FD940BC86A8E7A
2 changed files with 183 additions and 7 deletions

View file

@ -25,7 +25,6 @@ import (
"strings" "strings"
"unicode/utf8" "unicode/utf8"
"github.com/gogo/protobuf/jsonpb"
"github.com/prometheus/common/model" "github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/exemplar" "github.com/prometheus/prometheus/model/exemplar"
@ -504,9 +503,7 @@ func (p *OpenMetricsParser) getHistogramValue(t token) (*histogram.Histogram, er
return nil, p.parseError("expected value after metric", t) return nil, p.parseError("expected value after metric", t)
} }
h := dto.Histogram{} h, err := parseHistogram(p.l.buf()[1:])
unparsed := yoloString(p.l.buf()[1:])
err := jsonpb.UnmarshalString(unparsed, &h)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -515,6 +512,185 @@ func (p *OpenMetricsParser) getHistogramValue(t token) (*histogram.Histogram, er
if p.mtype == model.MetricTypeGaugeHistogram { if p.mtype == model.MetricTypeGaugeHistogram {
ht = dto.MetricType_GAUGE_HISTOGRAM ht = dto.MetricType_GAUGE_HISTOGRAM
} }
sh := convertHistogram(&h, ht) sh := convertHistogram(h, ht)
return &sh, nil return &sh, nil
} }
// parseHistogram decodes the compact text form of a native histogram, e.g.
//
//	{sample_count:24,schema:0,positive_span:[0:2,1:2],positive_delta:[2,1,-2,3]}
//
// val must begin with '{'. Fields are then read as key/value pairs until
// readKey reports io.EOF. Scalar values are scanned with fmt.Fscanf; span and
// delta lists are delegated to parseSpans and parseDeltas.
//
// It returns a nil histogram and an error if the opening brace is missing, a
// key is not recognised, or a value cannot be scanned.
func parseHistogram(val []byte) (*dto.Histogram, error) {
	r := bytes.NewReader(val)
	ch, _, err := r.ReadRune()
	if err != nil {
		return nil, fmt.Errorf("expected histogram to start with '{': %w", err)
	}
	if ch != '{' {
		// err is nil on this path, so it must not be wrapped with %w;
		// report the rune that was found instead.
		return nil, fmt.Errorf("expected histogram to start with '{', got %q", ch)
	}

	h := dto.Histogram{}
	for {
		key, err := readKey(r)
		if errors.Is(err, io.EOF) {
			// io.EOF is the normal terminator: no more keys to read.
			return &h, nil
		}
		if err != nil {
			// Anything else is a genuine failure and must not be
			// mistaken for a cleanly finished histogram.
			return nil, err
		}

		switch key {
		case "schema":
			if _, err := fmt.Fscanf(r, "%d", &h.Schema); err != nil {
				return nil, err
			}
		case "zero_threshold":
			if _, err := fmt.Fscanf(r, "%f", &h.ZeroThreshold); err != nil {
				return nil, err
			}
		case "zero_count":
			if _, err := fmt.Fscanf(r, "%d", &h.ZeroCount); err != nil {
				return nil, err
			}
		case "sample_count":
			if _, err := fmt.Fscanf(r, "%d", &h.SampleCount); err != nil {
				return nil, err
			}
		case "sample_sum":
			if _, err := fmt.Fscanf(r, "%f", &h.SampleSum); err != nil {
				return nil, err
			}
		case "positive_span":
			if h.PositiveSpan, err = parseSpans(r); err != nil {
				return nil, err
			}
		case "negative_span":
			if h.NegativeSpan, err = parseSpans(r); err != nil {
				return nil, err
			}
		case "positive_delta":
			if h.PositiveDelta, err = parseDeltas(r); err != nil {
				return nil, err
			}
		case "negative_delta":
			if h.NegativeDelta, err = parseDeltas(r); err != nil {
				return nil, err
			}
		default:
			return nil, fmt.Errorf("unknown key: '%s'", key)
		}
	}
}
// readKey reads the next field name from r.
//
// Commas are skipped. Runes accepted by isKeyRune are accumulated; the first
// other rune is consumed and ends the key, which is then returned. If that
// rune is '}', readKey returns io.EOF instead of a key. Errors from the
// reader are returned unchanged.
func readKey(r *bytes.Reader) (string, error) {
	var key strings.Builder
	for {
		c, _, err := r.ReadRune()
		switch {
		case err != nil:
			return "", err
		case c == ',':
			// Separator between fields: nothing to record.
		case isKeyRune(c):
			if _, err := key.WriteRune(c); err != nil {
				return "", err
			}
		case c == '}':
			return "", io.EOF
		default:
			return key.String(), nil
		}
	}
}
// isKeyRune reports whether ch is an ASCII letter or an underscore, the only
// runes accepted as part of a field key.
func isKeyRune(ch rune) bool {
	switch {
	case 'a' <= ch && ch <= 'z', 'A' <= ch && ch <= 'Z':
		return true
	default:
		return ch == '_'
	}
}
// parseSpans reads a bracketed list of spans of the form [offset:length,...]
// from r, for example [0:2,1:2].
//
// The first rune must be '['. Commas between entries are skipped and ']'
// ends the list. An empty list yields an empty, non-nil slice. Reader errors
// (including running out of input before ']') are returned as-is; an entry
// that does not scan as "%d:%d" produces a wrapped error.
func parseSpans(r *bytes.Reader) ([]dto.BucketSpan, error) {
	open, _, err := r.ReadRune()
	switch {
	case err != nil:
		return nil, err
	case open != '[':
		return nil, errors.New("expected spans to begin with '['")
	}

	result := []dto.BucketSpan{}
	for {
		c, _, err := r.ReadRune()
		if err != nil {
			return nil, err
		}
		switch c {
		case ',':
			continue
		case ']':
			return result, nil
		}

		// c is the first rune of an entry; hand it back so Fscanf sees
		// the whole "offset:length" pair.
		if err := r.UnreadRune(); err != nil {
			return nil, err
		}
		var span dto.BucketSpan
		if _, err := fmt.Fscanf(r, "%d:%d", &span.Offset, &span.Length); err != nil {
			return nil, fmt.Errorf("could not parse bucket: %w", err)
		}
		result = append(result, span)
	}
}
// parseDeltas reads a bracketed, comma-separated list of signed integers from
// r, for example [2,1,-2,3].
//
// The first rune must be '['. Commas between values are skipped and ']' ends
// the list. An empty list yields an empty, non-nil slice. Reader errors
// (including running out of input before ']') are returned as-is; a value
// that is not an integer produces a wrapped error.
func parseDeltas(r *bytes.Reader) ([]int64, error) {
	ch, _, err := r.ReadRune()
	if err != nil {
		return nil, err
	}
	if ch != '[' {
		return nil, errors.New("expected deltas to begin with '['")
	}

	deltas := []int64{}
	for {
		ch, _, err := r.ReadRune()
		if err != nil {
			return nil, err
		}
		if ch == ',' {
			continue
		}
		if ch == ']' {
			return deltas, nil
		}
		// Unread the first character of the number before parsing the value.
		if err = r.UnreadRune(); err != nil {
			return nil, err
		}
		var delta int64
		// The scan error must be checked: on a non-numeric rune Fscanf
		// consumes nothing, so ignoring the failure would re-read the same
		// rune forever while appending zeros.
		if _, err = fmt.Fscanf(r, "%d", &delta); err != nil {
			return nil, fmt.Errorf("could not parse delta: %w", err)
		}
		deltas = append(deltas, delta)
	}
}

View file

@ -68,7 +68,7 @@ testmetric{label="\"bar\""} 1
# TYPE foo counter # TYPE foo counter
foo_total 17.0 1520879607.789 # {id="counter-test"} 5 foo_total 17.0 1520879607.789 # {id="counter-test"} 5
# TYPE nativehistogram histogram # TYPE nativehistogram histogram
nativehistogram {"sample_count":24,"sample_sum":100,"schema":0,"zero_threshold":0.001,"zero_count":4,"positive_span":[{"offset":0,"length":2},{"offset":1,"length":2}],"negative_span":[{"offset":0,"length":2},{"offset":1,"length":2}],"positive_delta":[2,1,-2,3],"negative_delta":[2,1,-2,3]}` nativehistogram {sample_count:24,sample_sum:100,schema:0,zero_threshold:0.001,zero_count:4,positive_span:[0:2,1:2],negative_span:[0:2,1:2],positive_delta:[2,1,-2,3],negative_delta:[2,1,-2,3]}`
input += "\n# HELP metric foo\x00bar" input += "\n# HELP metric foo\x00bar"
input += "\nnull_byte_metric{a=\"abc\x00\"} 1" input += "\nnull_byte_metric{a=\"abc\x00\"} 1"
@ -645,7 +645,7 @@ func TestOMNullByteHandling(t *testing.T) {
}, },
{ {
input: "a{b\x00=\"hiih\"} 1", input: "a{b\x00=\"hiih\"} 1",
err: "expected equal, got \"\\x00\" (\"INVALID\") while parsing: \"a{b\\x00\"", err: "expected equal, got \"\\x00\" (\"INVALID\") while parsing: \"a{b\\x00\"",
}, },
{ {
input: "a\x00{b=\"ddd\"} 1", input: "a\x00{b=\"ddd\"} 1",