textparse: Optimized protobuf parser with custom streaming unmarshal. (#15731)

* textparse: Optimized protobuf parser with custom streaming decoder.

Signed-off-by: bwplotka <bwplotka@gmail.com>

Update model/textparse/protobufparse.go

Co-authored-by: George Krajcsovits <krajorama@users.noreply.github.com>
Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com>

Addressing comments.

Signed-off-by: bwplotka <bwplotka@gmail.com>

decoder: reuse histograms and summaries.

Signed-off-by: bwplotka <bwplotka@gmail.com>

optimize help returning (5% of mem utilization).

Signed-off-by: bwplotka <bwplotka@gmail.com>

Apply suggestions from code review

Co-authored-by: George Krajcsovits <krajorama@users.noreply.github.com>
Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com>

Update prompb/io/prometheus/client/decoder.go

Co-authored-by: George Krajcsovits <krajorama@users.noreply.github.com>
Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com>

Fix build.

Signed-off-by: bwplotka <bwplotka@gmail.com>

* Update model/textparse/protobufparse.go

Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com>

---------

Signed-off-by: bwplotka <bwplotka@gmail.com>
Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com>
This commit is contained in:
Bartlomiej Plotka 2025-02-13 11:38:35 +01:00 committed by GitHub
parent 9b4c8f6be2
commit 733a5e9eb4
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
8 changed files with 1183 additions and 200 deletions

View file

@ -19,6 +19,7 @@ import (
"testing"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/prometheus/common/model"
"github.com/stretchr/testify/require"
@ -210,6 +211,17 @@ func requireEntries(t *testing.T, exp, got []parsedEntry) {
t.Helper()
testutil.RequireEqualWithOptions(t, exp, got, []cmp.Option{
// We reuse slices so we sometimes have empty vs nil differences
// we need to ignore with cmpopts.EquateEmpty().
// However we have to filter out labels, as only
// one comparer per type has to be specified,
// and RequireEqualWithOptions uses
// cmp.Comparer(labels.Equal).
cmp.FilterValues(func(x, y any) bool {
_, xIsLabels := x.(labels.Labels)
_, yIsLabels := y.(labels.Labels)
return !xIsLabels && !yIsLabels
}, cmpopts.EquateEmpty()),
cmp.AllowUnexported(parsedEntry{}),
})
}
@ -230,15 +242,20 @@ func testParse(t *testing.T, p Parser) (ret []parsedEntry) {
case EntryInvalid:
t.Fatal("entry invalid not expected")
case EntrySeries, EntryHistogram:
var ts *int64
if et == EntrySeries {
m, got.t, got.v = p.Series()
got.m = string(m)
m, ts, got.v = p.Series()
} else {
m, got.t, got.shs, got.fhs = p.Histogram()
got.m = string(m)
m, ts, got.shs, got.fhs = p.Histogram()
}
if ts != nil {
// TODO(bwplotka): Change to 0 in the interface for set check to
// avoid pointer mangling.
got.t = int64p(*ts)
}
got.m = string(m)
p.Labels(&got.lset)
// Parser reuses int pointer.
if ct := p.CreatedTimestamp(); ct != nil {
got.ct = int64p(*ct)

View file

@ -931,7 +931,7 @@ func createTestPromHistogram() string {
return `# HELP test_histogram1 Test histogram 1
# TYPE test_histogram1 histogram
test_histogram1_count 175 1234568
test_histogram1_sum 0.0008280461746287094 1234768
test_histogram1_sum 0.0008280461746287094 1234568
test_histogram1_bucket{le="-0.0004899999999999998"} 2 1234568
test_histogram1_bucket{le="-0.0003899999999999998"} 4 1234568
test_histogram1_bucket{le="-0.0002899999999999998"} 16 1234568

View file

@ -502,6 +502,10 @@ func yoloString(b []byte) string {
return unsafe.String(unsafe.SliceData(b), len(b))
}
func yoloBytes(b string) []byte {
return unsafe.Slice(unsafe.StringData(b), len(b))
}
func parseFloat(s string) (float64, error) {
// Keep to pre-Go 1.13 float formats.
if strings.ContainsAny(s, "pP_") {

View file

@ -15,7 +15,6 @@ package textparse
import (
"bytes"
"encoding/binary"
"errors"
"fmt"
"io"
@ -25,7 +24,6 @@ import (
"sync"
"unicode/utf8"
"github.com/gogo/protobuf/proto"
"github.com/gogo/protobuf/types"
"github.com/prometheus/common/model"
@ -45,24 +43,24 @@ var floatFormatBufPool = sync.Pool{
},
}
// ProtobufParser is a very inefficient way of unmarshaling the old Prometheus
// protobuf format and then present it as it if were parsed by a
// Prometheus-2-style text parser. This is only done so that we can easily plug
// in the protobuf format into Prometheus 2. For future use (with the final
// format that will be used for native histograms), we have to revisit the
// parsing. A lot of the efficiency tricks of the Prometheus-2-style parsing
// could be used in a similar fashion (byte-slice pointers into the raw
// payload), which requires some hand-coded protobuf handling. But the current
// parsers all expect the full series name (metric name plus label pairs) as one
// string, which is not how things are represented in the protobuf format. If
// the re-arrangement work is actually causing problems (which has to be seen),
// that expectation needs to be changed.
// ProtobufParser parses the old Prometheus protobuf format and present it
// as the text-style textparse.Parser interface.
//
// It uses a tailored streaming protobuf dto.MetricStreamingDecoder that
// reuses internal protobuf structs and allows direct unmarshalling to Prometheus
// types like labels.
type ProtobufParser struct {
in []byte // The input to parse.
inPos int // Position within the input.
metricPos int // Position within Metric slice.
dec *dto.MetricStreamingDecoder
// Used for both the string returned by Series and Histogram, as well as,
// metric family for Type, Unit and Help.
entryBytes *bytes.Buffer
lset labels.Labels
builder labels.ScratchBuilder // Held here to reduce allocations when building Labels.
// fieldPos is the position within a Summary or (legacy) Histogram. -2
// is the count. -1 is the sum. Otherwise it is the index within
// is the count. -1 is the sum. Otherwise, it is the index within
// quantiles/buckets.
fieldPos int
fieldsDone bool // true if no more fields of a Summary or (legacy) Histogram to be processed.
@ -78,27 +76,20 @@ type ProtobufParser struct {
// that we have to decode the next MetricFamily.
state Entry
builder labels.ScratchBuilder // held here to reduce allocations when building Labels
mf *dto.MetricFamily
// Whether to also parse a classic histogram that is also present as a
// native histogram.
parseClassicHistograms bool
// The following are just shenanigans to satisfy the Parser interface.
metricBytes *bytes.Buffer // A somewhat fluid representation of the current metric.
}
// NewProtobufParser returns a parser for the payload in the byte slice.
func NewProtobufParser(b []byte, parseClassicHistograms bool, st *labels.SymbolTable) Parser {
return &ProtobufParser{
in: b,
dec: dto.NewMetricStreamingDecoder(b),
entryBytes: &bytes.Buffer{},
builder: labels.NewScratchBuilderWithSymbolTable(st, 16), // TODO(bwplotka): Try base builder.
state: EntryInvalid,
mf: &dto.MetricFamily{},
metricBytes: &bytes.Buffer{},
parseClassicHistograms: parseClassicHistograms,
builder: labels.NewScratchBuilderWithSymbolTable(st, 16),
}
}
@ -106,19 +97,18 @@ func NewProtobufParser(b []byte, parseClassicHistograms bool, st *labels.SymbolT
// value, the timestamp if set, and the value of the current sample.
func (p *ProtobufParser) Series() ([]byte, *int64, float64) {
var (
m = p.mf.GetMetric()[p.metricPos]
ts = m.GetTimestampMs()
ts = &p.dec.TimestampMs // To save memory allocations, never nil.
v float64
)
switch p.mf.GetType() {
switch p.dec.GetType() {
case dto.MetricType_COUNTER:
v = m.GetCounter().GetValue()
v = p.dec.GetCounter().GetValue()
case dto.MetricType_GAUGE:
v = m.GetGauge().GetValue()
v = p.dec.GetGauge().GetValue()
case dto.MetricType_UNTYPED:
v = m.GetUntyped().GetValue()
v = p.dec.GetUntyped().GetValue()
case dto.MetricType_SUMMARY:
s := m.GetSummary()
s := p.dec.GetSummary()
switch p.fieldPos {
case -2:
v = float64(s.GetSampleCount())
@ -133,7 +123,7 @@ func (p *ProtobufParser) Series() ([]byte, *int64, float64) {
}
case dto.MetricType_HISTOGRAM, dto.MetricType_GAUGE_HISTOGRAM:
// This should only happen for a classic histogram.
h := m.GetHistogram()
h := p.dec.GetHistogram()
switch p.fieldPos {
case -2:
v = h.GetSampleCountFloat()
@ -159,8 +149,8 @@ func (p *ProtobufParser) Series() ([]byte, *int64, float64) {
default:
panic("encountered unexpected metric type, this is a bug")
}
if ts != 0 {
return p.metricBytes.Bytes(), &ts, v
if *ts != 0 {
return p.entryBytes.Bytes(), ts, v
}
// TODO(beorn7): We assume here that ts==0 means no timestamp. That's
// not true in general, but proto3 originally has no distinction between
@ -171,7 +161,7 @@ func (p *ProtobufParser) Series() ([]byte, *int64, float64) {
// away from gogo-protobuf to an actively maintained protobuf
// implementation. Once that's done, we can simply use the `optional`
// keyword and check for the unset state explicitly.
return p.metricBytes.Bytes(), nil, v
return p.entryBytes.Bytes(), nil, v
}
// Histogram returns the bytes of a series with a native histogram as a value,
@ -186,47 +176,56 @@ func (p *ProtobufParser) Series() ([]byte, *int64, float64) {
// value.
func (p *ProtobufParser) Histogram() ([]byte, *int64, *histogram.Histogram, *histogram.FloatHistogram) {
var (
m = p.mf.GetMetric()[p.metricPos]
ts = m.GetTimestampMs()
h = m.GetHistogram()
ts = &p.dec.TimestampMs // To save memory allocations, never nil.
h = p.dec.GetHistogram()
)
if p.parseClassicHistograms && len(h.GetBucket()) > 0 {
p.redoClassic = true
}
if h.GetSampleCountFloat() > 0 || h.GetZeroCountFloat() > 0 {
// It is a float histogram.
fh := histogram.FloatHistogram{
Count: h.GetSampleCountFloat(),
Sum: h.GetSampleSum(),
ZeroThreshold: h.GetZeroThreshold(),
ZeroCount: h.GetZeroCountFloat(),
Schema: h.GetSchema(),
Count: h.GetSampleCountFloat(),
Sum: h.GetSampleSum(),
ZeroThreshold: h.GetZeroThreshold(),
ZeroCount: h.GetZeroCountFloat(),
Schema: h.GetSchema(),
// Decoder reuses slices, so we need to copy.
PositiveSpans: make([]histogram.Span, len(h.GetPositiveSpan())),
PositiveBuckets: h.GetPositiveCount(),
PositiveBuckets: make([]float64, len(h.GetPositiveCount())),
NegativeSpans: make([]histogram.Span, len(h.GetNegativeSpan())),
NegativeBuckets: h.GetNegativeCount(),
NegativeBuckets: make([]float64, len(h.GetNegativeCount())),
}
for i, span := range h.GetPositiveSpan() {
fh.PositiveSpans[i].Offset = span.GetOffset()
fh.PositiveSpans[i].Length = span.GetLength()
}
for i, cnt := range h.GetPositiveCount() {
fh.PositiveBuckets[i] = cnt
}
for i, span := range h.GetNegativeSpan() {
fh.NegativeSpans[i].Offset = span.GetOffset()
fh.NegativeSpans[i].Length = span.GetLength()
}
if p.mf.GetType() == dto.MetricType_GAUGE_HISTOGRAM {
for i, cnt := range h.GetNegativeCount() {
fh.NegativeBuckets[i] = cnt
}
if p.dec.GetType() == dto.MetricType_GAUGE_HISTOGRAM {
fh.CounterResetHint = histogram.GaugeType
}
fh.Compact(0)
if ts != 0 {
return p.metricBytes.Bytes(), &ts, nil, &fh
if *ts != 0 {
return p.entryBytes.Bytes(), ts, nil, &fh
}
// Nasty hack: Assume that ts==0 means no timestamp. That's not true in
// general, but proto3 has no distinction between unset and
// default. Need to avoid in the final format.
return p.metricBytes.Bytes(), nil, nil, &fh
return p.entryBytes.Bytes(), nil, nil, &fh
}
// TODO(bwplotka): Create sync.Pool for those structs.
sh := histogram.Histogram{
Count: h.GetSampleCount(),
Sum: h.GetSampleSum(),
@ -234,41 +233,47 @@ func (p *ProtobufParser) Histogram() ([]byte, *int64, *histogram.Histogram, *his
ZeroCount: h.GetZeroCount(),
Schema: h.GetSchema(),
PositiveSpans: make([]histogram.Span, len(h.GetPositiveSpan())),
PositiveBuckets: h.GetPositiveDelta(),
PositiveBuckets: make([]int64, len(h.GetPositiveDelta())),
NegativeSpans: make([]histogram.Span, len(h.GetNegativeSpan())),
NegativeBuckets: h.GetNegativeDelta(),
NegativeBuckets: make([]int64, len(h.GetNegativeDelta())),
}
for i, span := range h.GetPositiveSpan() {
sh.PositiveSpans[i].Offset = span.GetOffset()
sh.PositiveSpans[i].Length = span.GetLength()
}
for i, cnt := range h.GetPositiveDelta() {
sh.PositiveBuckets[i] = cnt
}
for i, span := range h.GetNegativeSpan() {
sh.NegativeSpans[i].Offset = span.GetOffset()
sh.NegativeSpans[i].Length = span.GetLength()
}
if p.mf.GetType() == dto.MetricType_GAUGE_HISTOGRAM {
for i, cnt := range h.GetNegativeDelta() {
sh.NegativeBuckets[i] = cnt
}
if p.dec.GetType() == dto.MetricType_GAUGE_HISTOGRAM {
sh.CounterResetHint = histogram.GaugeType
}
sh.Compact(0)
if ts != 0 {
return p.metricBytes.Bytes(), &ts, &sh, nil
if *ts != 0 {
return p.entryBytes.Bytes(), ts, &sh, nil
}
return p.metricBytes.Bytes(), nil, &sh, nil
return p.entryBytes.Bytes(), nil, &sh, nil
}
// Help returns the metric name and help text in the current entry.
// Must only be called after Next returned a help entry.
// The returned byte slices become invalid after the next call to Next.
func (p *ProtobufParser) Help() ([]byte, []byte) {
return p.metricBytes.Bytes(), []byte(p.mf.GetHelp())
return p.entryBytes.Bytes(), yoloBytes(p.dec.GetHelp())
}
// Type returns the metric name and type in the current entry.
// Must only be called after Next returned a type entry.
// The returned byte slices become invalid after the next call to Next.
func (p *ProtobufParser) Type() ([]byte, model.MetricType) {
n := p.metricBytes.Bytes()
switch p.mf.GetType() {
n := p.entryBytes.Bytes()
switch p.dec.GetType() {
case dto.MetricType_COUNTER:
return n, model.MetricTypeCounter
case dto.MetricType_GAUGE:
@ -287,7 +292,7 @@ func (p *ProtobufParser) Type() ([]byte, model.MetricType) {
// Must only be called after Next returned a unit entry.
// The returned byte slices become invalid after the next call to Next.
func (p *ProtobufParser) Unit() ([]byte, []byte) {
return p.metricBytes.Bytes(), []byte(p.mf.GetUnit())
return p.entryBytes.Bytes(), []byte(p.dec.GetUnit())
}
// Comment always returns nil because comments aren't supported by the protobuf
@ -297,21 +302,8 @@ func (p *ProtobufParser) Comment() []byte {
}
// Labels writes the labels of the current sample into the passed labels.
// It returns the string from which the metric was parsed.
func (p *ProtobufParser) Labels(l *labels.Labels) {
p.builder.Reset()
p.builder.Add(labels.MetricName, p.getMagicName())
for _, lp := range p.mf.GetMetric()[p.metricPos].GetLabel() {
p.builder.Add(lp.GetName(), lp.GetValue())
}
if needed, name, value := p.getMagicLabel(); needed {
p.builder.Add(name, value)
}
// Sort labels to maintain the sorted labels invariant.
p.builder.Sort()
*l = p.builder.Labels()
*l = p.lset.Copy()
}
// Exemplar writes the exemplar of the current sample into the passed
@ -324,15 +316,14 @@ func (p *ProtobufParser) Exemplar(ex *exemplar.Exemplar) bool {
// We only ever return one exemplar per (non-native-histogram) series.
return false
}
m := p.mf.GetMetric()[p.metricPos]
var exProto *dto.Exemplar
switch p.mf.GetType() {
switch p.dec.GetType() {
case dto.MetricType_COUNTER:
exProto = m.GetCounter().GetExemplar()
exProto = p.dec.GetCounter().GetExemplar()
case dto.MetricType_HISTOGRAM, dto.MetricType_GAUGE_HISTOGRAM:
isClassic := p.state == EntrySeries
if !isClassic && len(m.GetHistogram().GetExemplars()) > 0 {
exs := m.GetHistogram().GetExemplars()
if !isClassic && len(p.dec.GetHistogram().GetExemplars()) > 0 {
exs := p.dec.GetHistogram().GetExemplars()
for p.exemplarPos < len(exs) {
exProto = exs[p.exemplarPos]
p.exemplarPos++
@ -344,7 +335,7 @@ func (p *ProtobufParser) Exemplar(ex *exemplar.Exemplar) bool {
return false
}
} else {
bb := m.GetHistogram().GetBucket()
bb := p.dec.GetHistogram().GetBucket()
if p.fieldPos < 0 {
if isClassic {
return false // At _count or _sum.
@ -392,13 +383,13 @@ func (p *ProtobufParser) Exemplar(ex *exemplar.Exemplar) bool {
// invalid (as timestamp e.g. negative value) on counters, summaries or histograms.
func (p *ProtobufParser) CreatedTimestamp() *int64 {
var ct *types.Timestamp
switch p.mf.GetType() {
switch p.dec.GetType() {
case dto.MetricType_COUNTER:
ct = p.mf.GetMetric()[p.metricPos].GetCounter().GetCreatedTimestamp()
ct = p.dec.GetCounter().GetCreatedTimestamp()
case dto.MetricType_SUMMARY:
ct = p.mf.GetMetric()[p.metricPos].GetSummary().GetCreatedTimestamp()
ct = p.dec.GetSummary().GetCreatedTimestamp()
case dto.MetricType_HISTOGRAM, dto.MetricType_GAUGE_HISTOGRAM:
ct = p.mf.GetMetric()[p.metricPos].GetHistogram().GetCreatedTimestamp()
ct = p.dec.GetHistogram().GetCreatedTimestamp()
default:
}
ctAsTime, err := types.TimestampFromProto(ct)
@ -416,31 +407,34 @@ func (p *ProtobufParser) CreatedTimestamp() *int64 {
func (p *ProtobufParser) Next() (Entry, error) {
p.exemplarReturned = false
switch p.state {
// Invalid state occurs on:
// * First Next() call.
// * Recursive call that tells Next to move to the next metric family.
case EntryInvalid:
p.metricPos = 0
p.exemplarPos = 0
p.fieldPos = -2
n, err := readDelimited(p.in[p.inPos:], p.mf)
p.inPos += n
if err != nil {
if err := p.dec.NextMetricFamily(); err != nil {
return p.state, err
}
// Skip empty metric families.
if len(p.mf.GetMetric()) == 0 {
return p.Next()
if err := p.dec.NextMetric(); err != nil {
// Skip empty metric families.
if errors.Is(err, io.EOF) {
return p.Next()
}
return EntryInvalid, err
}
// We are at the beginning of a metric family. Put only the name
// into metricBytes and validate only name, help, and type for now.
name := p.mf.GetName()
// into entryBytes and validate only name, help, and type for now.
name := p.dec.GetName()
if !model.IsValidMetricName(model.LabelValue(name)) {
return EntryInvalid, fmt.Errorf("invalid metric name: %s", name)
}
if help := p.mf.GetHelp(); !utf8.ValidString(help) {
if help := p.dec.GetHelp(); !utf8.ValidString(help) {
return EntryInvalid, fmt.Errorf("invalid help for metric %q: %s", name, help)
}
switch p.mf.GetType() {
switch p.dec.GetType() {
case dto.MetricType_COUNTER,
dto.MetricType_GAUGE,
dto.MetricType_HISTOGRAM,
@ -449,11 +443,11 @@ func (p *ProtobufParser) Next() (Entry, error) {
dto.MetricType_UNTYPED:
// All good.
default:
return EntryInvalid, fmt.Errorf("unknown metric type for metric %q: %s", name, p.mf.GetType())
return EntryInvalid, fmt.Errorf("unknown metric type for metric %q: %s", name, p.dec.GetType())
}
unit := p.mf.GetUnit()
unit := p.dec.GetUnit()
if len(unit) > 0 {
if p.mf.GetType() == dto.MetricType_COUNTER && strings.HasSuffix(name, "_total") {
if p.dec.GetType() == dto.MetricType_COUNTER && strings.HasSuffix(name, "_total") {
if !strings.HasSuffix(name[:len(name)-6], unit) || len(name)-6 < len(unit)+1 || name[len(name)-6-len(unit)-1] != '_' {
return EntryInvalid, fmt.Errorf("unit %q not a suffix of counter %q", unit, name)
}
@ -461,12 +455,11 @@ func (p *ProtobufParser) Next() (Entry, error) {
return EntryInvalid, fmt.Errorf("unit %q not a suffix of metric %q", unit, name)
}
}
p.metricBytes.Reset()
p.metricBytes.WriteString(name)
p.entryBytes.Reset()
p.entryBytes.WriteString(name)
p.state = EntryHelp
case EntryHelp:
if p.mf.Unit != "" {
if p.dec.Unit != "" {
p.state = EntryUnit
} else {
p.state = EntryType
@ -474,48 +467,78 @@ func (p *ProtobufParser) Next() (Entry, error) {
case EntryUnit:
p.state = EntryType
case EntryType:
t := p.mf.GetType()
t := p.dec.GetType()
if (t == dto.MetricType_HISTOGRAM || t == dto.MetricType_GAUGE_HISTOGRAM) &&
isNativeHistogram(p.mf.GetMetric()[0].GetHistogram()) {
isNativeHistogram(p.dec.GetHistogram()) {
p.state = EntryHistogram
} else {
p.state = EntrySeries
}
if err := p.updateMetricBytes(); err != nil {
if err := p.onSeriesOrHistogramUpdate(); err != nil {
return EntryInvalid, err
}
case EntryHistogram, EntrySeries:
if p.redoClassic {
p.redoClassic = false
p.state = EntrySeries
p.fieldPos = -3
p.fieldsDone = false
}
t := p.mf.GetType()
if p.state == EntrySeries && !p.fieldsDone &&
(t == dto.MetricType_SUMMARY ||
t == dto.MetricType_HISTOGRAM ||
t == dto.MetricType_GAUGE_HISTOGRAM) {
p.fieldPos++
} else {
p.metricPos++
case EntrySeries:
// Potentially a second series in the metric family.
t := p.dec.GetType()
if t == dto.MetricType_SUMMARY ||
t == dto.MetricType_HISTOGRAM ||
t == dto.MetricType_GAUGE_HISTOGRAM {
// Non-trivial series (complex metrics, with magic suffixes).
// Did we iterate over all the classic representations fields?
// NOTE: p.fieldsDone is updated on p.onSeriesOrHistogramUpdate.
if !p.fieldsDone {
// Still some fields to iterate over.
p.fieldPos++
if err := p.onSeriesOrHistogramUpdate(); err != nil {
return EntryInvalid, err
}
return p.state, nil
}
// Reset histogram fields.
p.fieldPos = -2
p.fieldsDone = false
p.exemplarPos = 0
// If this is a metric family containing native
// histograms, we have to switch back to native
// histograms after parsing a classic histogram.
if p.state == EntrySeries &&
(t == dto.MetricType_HISTOGRAM || t == dto.MetricType_GAUGE_HISTOGRAM) &&
isNativeHistogram(p.mf.GetMetric()[0].GetHistogram()) {
// histograms, it means we are here thanks to redoClassic state.
// Return to native histograms for the consistent flow.
if (t == dto.MetricType_HISTOGRAM || t == dto.MetricType_GAUGE_HISTOGRAM) &&
isNativeHistogram(p.dec.GetHistogram()) {
p.state = EntryHistogram
}
}
if p.metricPos >= len(p.mf.GetMetric()) {
p.state = EntryInvalid
return p.Next()
// Is there another series?
if err := p.dec.NextMetric(); err != nil {
if errors.Is(err, io.EOF) {
p.state = EntryInvalid
return p.Next()
}
return EntryInvalid, err
}
if err := p.updateMetricBytes(); err != nil {
if err := p.onSeriesOrHistogramUpdate(); err != nil {
return EntryInvalid, err
}
case EntryHistogram:
// Was Histogram() called and parseClassicHistograms is true?
if p.redoClassic {
p.redoClassic = false
p.fieldPos = -3
p.fieldsDone = false
p.state = EntrySeries
return p.Next() // Switch to classic histogram.
}
// Is there another series?
if err := p.dec.NextMetric(); err != nil {
if errors.Is(err, io.EOF) {
p.state = EntryInvalid
return p.Next()
}
return EntryInvalid, err
}
if err := p.onSeriesOrHistogramUpdate(); err != nil {
return EntryInvalid, err
}
default:
@ -524,30 +547,39 @@ func (p *ProtobufParser) Next() (Entry, error) {
return p.state, nil
}
func (p *ProtobufParser) updateMetricBytes() error {
b := p.metricBytes
b.Reset()
b.WriteString(p.getMagicName())
for _, lp := range p.mf.GetMetric()[p.metricPos].GetLabel() {
b.WriteByte(model.SeparatorByte)
n := lp.GetName()
if !model.LabelName(n).IsValid() {
return fmt.Errorf("invalid label name: %s", n)
}
b.WriteString(n)
b.WriteByte(model.SeparatorByte)
v := lp.GetValue()
if !utf8.ValidString(v) {
return fmt.Errorf("invalid label value: %s", v)
}
b.WriteString(v)
// onSeriesOrHistogramUpdate updates internal state before returning
// a series or histogram. It updates:
// * p.lset.
// * p.entryBytes.
// * p.fieldsDone depending on p.fieldPos.
func (p *ProtobufParser) onSeriesOrHistogramUpdate() error {
p.builder.Reset()
p.builder.Add(labels.MetricName, p.getMagicName())
if err := p.dec.Label(&p.builder); err != nil {
return err
}
if needed, n, v := p.getMagicLabel(); needed {
b.WriteByte(model.SeparatorByte)
b.WriteString(n)
b.WriteByte(model.SeparatorByte)
b.WriteString(v)
if needed, name, value := p.getMagicLabel(); needed {
p.builder.Add(name, value)
}
// Sort labels to maintain the sorted labels invariant.
p.builder.Sort()
p.builder.Overwrite(&p.lset)
// entryBytes has to be unique for each series.
p.entryBytes.Reset()
p.lset.Range(func(l labels.Label) {
if l.Name == labels.MetricName {
p.entryBytes.WriteString(l.Value)
return
}
p.entryBytes.WriteByte(model.SeparatorByte)
p.entryBytes.WriteString(l.Name)
p.entryBytes.WriteByte(model.SeparatorByte)
p.entryBytes.WriteString(l.Value)
})
return nil
}
@ -555,36 +587,37 @@ func (p *ProtobufParser) updateMetricBytes() error {
// ("_count", "_sum", "_bucket") if needed according to the current parser
// state.
func (p *ProtobufParser) getMagicName() string {
t := p.mf.GetType()
t := p.dec.GetType()
if p.state == EntryHistogram || (t != dto.MetricType_HISTOGRAM && t != dto.MetricType_GAUGE_HISTOGRAM && t != dto.MetricType_SUMMARY) {
return p.mf.GetName()
return p.dec.GetName()
}
if p.fieldPos == -2 {
return p.mf.GetName() + "_count"
return p.dec.GetName() + "_count"
}
if p.fieldPos == -1 {
return p.mf.GetName() + "_sum"
return p.dec.GetName() + "_sum"
}
if t == dto.MetricType_HISTOGRAM || t == dto.MetricType_GAUGE_HISTOGRAM {
return p.mf.GetName() + "_bucket"
return p.dec.GetName() + "_bucket"
}
return p.mf.GetName()
return p.dec.GetName()
}
// getMagicLabel returns if a magic label ("quantile" or "le") is needed and, if
// so, its name and value. It also sets p.fieldsDone if applicable.
func (p *ProtobufParser) getMagicLabel() (bool, string, string) {
// Native histogram or _count and _sum series.
if p.state == EntryHistogram || p.fieldPos < 0 {
return false, "", ""
}
switch p.mf.GetType() {
switch p.dec.GetType() {
case dto.MetricType_SUMMARY:
qq := p.mf.GetMetric()[p.metricPos].GetSummary().GetQuantile()
qq := p.dec.GetSummary().GetQuantile()
q := qq[p.fieldPos]
p.fieldsDone = p.fieldPos == len(qq)-1
return true, model.QuantileLabel, formatOpenMetricsFloat(q.GetQuantile())
case dto.MetricType_HISTOGRAM, dto.MetricType_GAUGE_HISTOGRAM:
bb := p.mf.GetMetric()[p.metricPos].GetHistogram().GetBucket()
bb := p.dec.GetHistogram().GetBucket()
if p.fieldPos >= len(bb) {
p.fieldsDone = true
return true, model.BucketLabel, "+Inf"
@ -596,29 +629,6 @@ func (p *ProtobufParser) getMagicLabel() (bool, string, string) {
return false, "", ""
}
var errInvalidVarint = errors.New("protobufparse: invalid varint encountered")
// readDelimited is essentially doing what the function of the same name in
// github.com/matttproud/golang_protobuf_extensions/pbutil is doing, but it is
// specific to a MetricFamily, utilizes the more efficient gogo-protobuf
// unmarshaling, and acts on a byte slice directly without any additional
// staging buffers.
func readDelimited(b []byte, mf *dto.MetricFamily) (n int, err error) {
if len(b) == 0 {
return 0, io.EOF
}
messageLength, varIntLength := proto.DecodeVarint(b)
if varIntLength == 0 || varIntLength > binary.MaxVarintLen32 {
return 0, errInvalidVarint
}
totalLength := varIntLength + int(messageLength)
if totalLength > len(b) {
return 0, fmt.Errorf("protobufparse: insufficient length of buffer, expected at least %d bytes, got %d bytes", totalLength, len(b))
}
mf.Reset()
return totalLength, mf.Unmarshal(b[varIntLength:totalLength])
}
// formatOpenMetricsFloat works like the usual Go string formatting of a float
// but appends ".0" if the resulting number would otherwise contain neither a
// "." nor an "e".

View file

@ -1246,7 +1246,7 @@ func TestProtobufParse(t *testing.T) {
),
},
{
m: "rpc_durations_seconds\xffservice\xffexponential\xffquantile\xff0.5",
m: "rpc_durations_seconds\xffquantile\xff0.5\xffservice\xffexponential",
v: 6.442786329648548e-07,
lset: labels.FromStrings(
"__name__", "rpc_durations_seconds",
@ -1255,7 +1255,7 @@ func TestProtobufParse(t *testing.T) {
),
},
{
m: "rpc_durations_seconds\xffservice\xffexponential\xffquantile\xff0.9",
m: "rpc_durations_seconds\xffquantile\xff0.9\xffservice\xffexponential",
v: 1.9435742936658396e-06,
lset: labels.FromStrings(
"__name__", "rpc_durations_seconds",
@ -1264,7 +1264,7 @@ func TestProtobufParse(t *testing.T) {
),
},
{
m: "rpc_durations_seconds\xffservice\xffexponential\xffquantile\xff0.99",
m: "rpc_durations_seconds\xffquantile\xff0.99\xffservice\xffexponential",
v: 4.0471608667037015e-06,
lset: labels.FromStrings(
"__name__", "rpc_durations_seconds",
@ -2199,7 +2199,7 @@ func TestProtobufParse(t *testing.T) {
),
},
{
m: "rpc_durations_seconds\xffservice\xffexponential\xffquantile\xff0.5",
m: "rpc_durations_seconds\xffquantile\xff0.5\xffservice\xffexponential",
v: 6.442786329648548e-07,
lset: labels.FromStrings(
"__name__", "rpc_durations_seconds",
@ -2208,7 +2208,7 @@ func TestProtobufParse(t *testing.T) {
),
},
{
m: "rpc_durations_seconds\xffservice\xffexponential\xffquantile\xff0.9",
m: "rpc_durations_seconds\xffquantile\xff0.9\xffservice\xffexponential",
v: 1.9435742936658396e-06,
lset: labels.FromStrings(
"__name__", "rpc_durations_seconds",
@ -2217,7 +2217,7 @@ func TestProtobufParse(t *testing.T) {
),
},
{
m: "rpc_durations_seconds\xffservice\xffexponential\xffquantile\xff0.99",
m: "rpc_durations_seconds\xffquantile\xff0.99\xffservice\xffexponential",
v: 4.0471608667037015e-06,
lset: labels.FromStrings(
"__name__", "rpc_durations_seconds",

View file

@ -0,0 +1,780 @@
// Copyright 2025 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package io_prometheus_client //nolint:revive
import (
"encoding/binary"
"errors"
"fmt"
"io"
"unicode/utf8"
"unsafe"
proto "github.com/gogo/protobuf/proto"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
)
type MetricStreamingDecoder struct {
in []byte
inPos int
// TODO(bwplotka): Switch to generator/plugin that won't have those fields accessible e.g. OpaqueAPI
// We leverage the fact those two don't collide.
*MetricFamily // Without Metric, guarded by overridden GetMetric method.
*Metric // Without Label, guarded by overridden GetLabel method.
mfData []byte
metrics []pos
metricIndex int
mData []byte
labels []pos
}
// NewMetricStreamingDecoder returns a Go iterator that unmarshals given protobuf bytes one
// metric family and metric at the time, allowing efficient streaming.
//
// Do not modify MetricStreamingDecoder between iterations as it's reused to save allocations.
// GetGauge, GetCounter, etc are also cached, which means GetGauge will work for counter
// if previously gauge was parsed. It's up to the caller to use Type to decide what
// method to use when checking the value.
//
// TODO(bwplotka): io.Reader approach is possible too, but textparse has access to whole scrape for now.
func NewMetricStreamingDecoder(data []byte) *MetricStreamingDecoder {
return &MetricStreamingDecoder{
in: data,
MetricFamily: &MetricFamily{},
Metric: &Metric{},
metrics: make([]pos, 0, 100),
}
}
var errInvalidVarint = errors.New("clientpb: invalid varint encountered")
func (m *MetricStreamingDecoder) NextMetricFamily() error {
b := m.in[m.inPos:]
if len(b) == 0 {
return io.EOF
}
messageLength, varIntLength := proto.DecodeVarint(b) // TODO(bwplotka): Get rid of gogo.
if varIntLength == 0 || varIntLength > binary.MaxVarintLen32 {
return errInvalidVarint
}
totalLength := varIntLength + int(messageLength)
if totalLength > len(b) {
return fmt.Errorf("clientpb: insufficient length of buffer, expected at least %d bytes, got %d bytes", totalLength, len(b))
}
m.resetMetricFamily()
m.mfData = b[varIntLength:totalLength]
m.inPos += totalLength
return m.MetricFamily.unmarshalWithoutMetrics(m, m.mfData)
}
// resetMetricFamily resets all the fields in m to equal the zero value, but re-using slice memory.
func (m *MetricStreamingDecoder) resetMetricFamily() {
m.metrics = m.metrics[:0]
m.metricIndex = 0
m.MetricFamily.Reset()
}
func (m *MetricStreamingDecoder) NextMetric() error {
if m.metricIndex >= len(m.metrics) {
return io.EOF
}
m.resetMetric()
m.mData = m.mfData[m.metrics[m.metricIndex].start:m.metrics[m.metricIndex].end]
if err := m.Metric.unmarshalWithoutLabels(m, m.mData); err != nil {
return err
}
m.metricIndex++
return nil
}
// resetMetric resets all the fields in m to equal the zero value, but re-using slices memory.
func (m *MetricStreamingDecoder) resetMetric() {
m.labels = m.labels[:0]
m.TimestampMs = 0
// TODO(bwplotka): Autogenerate reset functions.
if m.Metric.Counter != nil {
m.Metric.Counter.Value = 0
m.Metric.Counter.CreatedTimestamp = nil
m.Metric.Counter.Exemplar = nil
}
if m.Metric.Gauge != nil {
m.Metric.Gauge.Value = 0
}
if m.Metric.Histogram != nil {
m.Metric.Histogram.SampleCount = 0
m.Metric.Histogram.SampleCountFloat = 0
m.Metric.Histogram.SampleSum = 0
m.Metric.Histogram.Bucket = m.Metric.Histogram.Bucket[:0]
m.Metric.Histogram.CreatedTimestamp = nil
m.Metric.Histogram.Schema = 0
m.Metric.Histogram.ZeroThreshold = 0
m.Metric.Histogram.ZeroCount = 0
m.Metric.Histogram.ZeroCountFloat = 0
m.Metric.Histogram.NegativeSpan = m.Metric.Histogram.NegativeSpan[:0]
m.Metric.Histogram.NegativeDelta = m.Metric.Histogram.NegativeDelta[:0]
m.Metric.Histogram.NegativeCount = m.Metric.Histogram.NegativeCount[:0]
m.Metric.Histogram.PositiveSpan = m.Metric.Histogram.PositiveSpan[:0]
m.Metric.Histogram.PositiveDelta = m.Metric.Histogram.PositiveDelta[:0]
m.Metric.Histogram.PositiveCount = m.Metric.Histogram.PositiveCount[:0]
m.Metric.Histogram.Exemplars = m.Metric.Histogram.Exemplars[:0]
}
if m.Metric.Summary != nil {
m.Metric.Summary.SampleCount = 0
m.Metric.Summary.SampleSum = 0
m.Metric.Summary.Quantile = m.Metric.Summary.Quantile[:0]
m.Metric.Summary.CreatedTimestamp = nil
}
}
func (m *MetricStreamingDecoder) GetMetric() {
panic("don't use GetMetric, use Metric directly")
}
func (m *MetricStreamingDecoder) GetLabel() {
panic("don't use GetLabel, use Label instead")
}
// Label parses labels into labels scratch builder. Metric name is missing
// given the protobuf metric model and has to be deduced from the metric family name.
// TODO: The method name intentionally hide MetricStreamingDecoder.Metric.Label
// field to avoid direct use (it's not parsed). In future generator will generate
// structs tailored for streaming decoding.
func (m *MetricStreamingDecoder) Label(b *labels.ScratchBuilder) error {
for _, l := range m.labels {
if err := parseLabel(m.mData[l.start:l.end], b); err != nil {
return err
}
}
return nil
}
// parseLabels is essentially LabelPair.Unmarshal but directly adding into scratch builder
// and reusing strings.
func parseLabel(dAtA []byte, b *labels.ScratchBuilder) error {
var name, value string
l := len(dAtA)
iNdEx := 0
for iNdEx < l {
preIndex := iNdEx
var wire uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowMetrics
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
wire |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
fieldNum := int32(wire >> 3)
wireType := int(wire & 0x7)
if wireType == 4 {
return errors.New("proto: LabelPair: wiretype end group for non-group")
}
if fieldNum <= 0 {
return fmt.Errorf("proto: LabelPair: illegal tag %d (wire type %d)", fieldNum, wire)
}
switch fieldNum {
case 1:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Name", wireType)
}
var stringLen uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowMetrics
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
stringLen |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
intStringLen := int(stringLen)
if intStringLen < 0 {
return ErrInvalidLengthMetrics
}
postIndex := iNdEx + intStringLen
if postIndex < 0 {
return ErrInvalidLengthMetrics
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
name = yoloString(dAtA[iNdEx:postIndex])
if !model.LabelName(name).IsValid() {
return fmt.Errorf("invalid label name: %s", name)
}
iNdEx = postIndex
case 2:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Value", wireType)
}
var stringLen uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowMetrics
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
stringLen |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
intStringLen := int(stringLen)
if intStringLen < 0 {
return ErrInvalidLengthMetrics
}
postIndex := iNdEx + intStringLen
if postIndex < 0 {
return ErrInvalidLengthMetrics
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
value = yoloString(dAtA[iNdEx:postIndex])
if !utf8.ValidString(value) {
return fmt.Errorf("invalid label value: %s", value)
}
iNdEx = postIndex
default:
iNdEx = preIndex
skippy, err := skipMetrics(dAtA[iNdEx:])
if err != nil {
return err
}
if (skippy < 0) || (iNdEx+skippy) < 0 {
return ErrInvalidLengthMetrics
}
if (iNdEx + skippy) > l {
return io.ErrUnexpectedEOF
}
iNdEx += skippy
}
}
if iNdEx > l {
return io.ErrUnexpectedEOF
}
b.Add(name, value)
return nil
}
func yoloString(b []byte) string {
return unsafe.String(unsafe.SliceData(b), len(b))
}
type pos struct {
start, end int
}
func (m *Metric) unmarshalWithoutLabels(p *MetricStreamingDecoder, dAtA []byte) error {
l := len(dAtA)
iNdEx := 0
for iNdEx < l {
preIndex := iNdEx
var wire uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowMetrics
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
wire |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
fieldNum := int32(wire >> 3)
wireType := int(wire & 0x7)
if wireType == 4 {
return errors.New("proto: Metric: wiretype end group for non-group")
}
if fieldNum <= 0 {
return fmt.Errorf("proto: Metric: illegal tag %d (wire type %d)", fieldNum, wire)
}
switch fieldNum {
case 1:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Label", wireType)
}
var msglen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowMetrics
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
msglen |= int(b&0x7F) << shift
if b < 0x80 {
break
}
}
if msglen < 0 {
return ErrInvalidLengthMetrics
}
postIndex := iNdEx + msglen
if postIndex < 0 {
return ErrInvalidLengthMetrics
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
p.labels = append(p.labels, pos{start: iNdEx, end: postIndex})
iNdEx = postIndex
case 2:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Gauge", wireType)
}
var msglen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowMetrics
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
msglen |= int(b&0x7F) << shift
if b < 0x80 {
break
}
}
if msglen < 0 {
return ErrInvalidLengthMetrics
}
postIndex := iNdEx + msglen
if postIndex < 0 {
return ErrInvalidLengthMetrics
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
if m.Gauge == nil {
m.Gauge = &Gauge{}
}
if err := m.Gauge.Unmarshal(dAtA[iNdEx:postIndex]); err != nil {
return err
}
iNdEx = postIndex
case 3:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Counter", wireType)
}
var msglen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowMetrics
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
msglen |= int(b&0x7F) << shift
if b < 0x80 {
break
}
}
if msglen < 0 {
return ErrInvalidLengthMetrics
}
postIndex := iNdEx + msglen
if postIndex < 0 {
return ErrInvalidLengthMetrics
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
if m.Counter == nil {
m.Counter = &Counter{}
}
if err := m.Counter.Unmarshal(dAtA[iNdEx:postIndex]); err != nil {
return err
}
iNdEx = postIndex
case 4:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Summary", wireType)
}
var msglen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowMetrics
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
msglen |= int(b&0x7F) << shift
if b < 0x80 {
break
}
}
if msglen < 0 {
return ErrInvalidLengthMetrics
}
postIndex := iNdEx + msglen
if postIndex < 0 {
return ErrInvalidLengthMetrics
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
if m.Summary == nil {
m.Summary = &Summary{}
}
if err := m.Summary.Unmarshal(dAtA[iNdEx:postIndex]); err != nil {
return err
}
iNdEx = postIndex
case 5:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Untyped", wireType)
}
var msglen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowMetrics
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
msglen |= int(b&0x7F) << shift
if b < 0x80 {
break
}
}
if msglen < 0 {
return ErrInvalidLengthMetrics
}
postIndex := iNdEx + msglen
if postIndex < 0 {
return ErrInvalidLengthMetrics
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
if m.Untyped == nil {
m.Untyped = &Untyped{}
}
if err := m.Untyped.Unmarshal(dAtA[iNdEx:postIndex]); err != nil {
return err
}
iNdEx = postIndex
case 6:
if wireType != 0 {
return fmt.Errorf("proto: wrong wireType = %d for field TimestampMs", wireType)
}
m.TimestampMs = 0
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowMetrics
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
m.TimestampMs |= int64(b&0x7F) << shift
if b < 0x80 {
break
}
}
case 7:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Histogram", wireType)
}
var msglen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowMetrics
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
msglen |= int(b&0x7F) << shift
if b < 0x80 {
break
}
}
if msglen < 0 {
return ErrInvalidLengthMetrics
}
postIndex := iNdEx + msglen
if postIndex < 0 {
return ErrInvalidLengthMetrics
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
if m.Histogram == nil {
m.Histogram = &Histogram{}
}
if err := m.Histogram.Unmarshal(dAtA[iNdEx:postIndex]); err != nil {
return err
}
iNdEx = postIndex
default:
iNdEx = preIndex
skippy, err := skipMetrics(dAtA[iNdEx:])
if err != nil {
return err
}
if (skippy < 0) || (iNdEx+skippy) < 0 {
return ErrInvalidLengthMetrics
}
if (iNdEx + skippy) > l {
return io.ErrUnexpectedEOF
}
m.XXX_unrecognized = append(m.XXX_unrecognized, dAtA[iNdEx:iNdEx+skippy]...)
iNdEx += skippy
}
}
if iNdEx > l {
return io.ErrUnexpectedEOF
}
return nil
}
func (m *MetricFamily) unmarshalWithoutMetrics(buf *MetricStreamingDecoder, dAtA []byte) error {
l := len(dAtA)
iNdEx := 0
for iNdEx < l {
preIndex := iNdEx
var wire uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowMetrics
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
wire |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
fieldNum := int32(wire >> 3)
wireType := int(wire & 0x7)
if wireType == 4 {
return errors.New("proto: MetricFamily: wiretype end group for non-group")
}
if fieldNum <= 0 {
return fmt.Errorf("proto: MetricFamily: illegal tag %d (wire type %d)", fieldNum, wire)
}
switch fieldNum {
case 1:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Name", wireType)
}
var stringLen uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowMetrics
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
stringLen |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
intStringLen := int(stringLen)
if intStringLen < 0 {
return ErrInvalidLengthMetrics
}
postIndex := iNdEx + intStringLen
if postIndex < 0 {
return ErrInvalidLengthMetrics
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.Name = yoloString(dAtA[iNdEx:postIndex])
iNdEx = postIndex
case 2:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Help", wireType)
}
var stringLen uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowMetrics
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
stringLen |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
intStringLen := int(stringLen)
if intStringLen < 0 {
return ErrInvalidLengthMetrics
}
postIndex := iNdEx + intStringLen
if postIndex < 0 {
return ErrInvalidLengthMetrics
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.Help = yoloString(dAtA[iNdEx:postIndex])
iNdEx = postIndex
case 3:
if wireType != 0 {
return fmt.Errorf("proto: wrong wireType = %d for field Type", wireType)
}
m.Type = 0
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowMetrics
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
m.Type |= MetricType(b&0x7F) << shift
if b < 0x80 {
break
}
}
case 4:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Metric", wireType)
}
var msglen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowMetrics
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
msglen |= int(b&0x7F) << shift
if b < 0x80 {
break
}
}
if msglen < 0 {
return ErrInvalidLengthMetrics
}
postIndex := iNdEx + msglen
if postIndex < 0 {
return ErrInvalidLengthMetrics
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
buf.metrics = append(buf.metrics, pos{start: iNdEx, end: postIndex})
iNdEx = postIndex
case 5:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Unit", wireType)
}
var stringLen uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowMetrics
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
stringLen |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
intStringLen := int(stringLen)
if intStringLen < 0 {
return ErrInvalidLengthMetrics
}
postIndex := iNdEx + intStringLen
if postIndex < 0 {
return ErrInvalidLengthMetrics
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.Unit = yoloString(dAtA[iNdEx:postIndex])
iNdEx = postIndex
default:
iNdEx = preIndex
skippy, err := skipMetrics(dAtA[iNdEx:])
if err != nil {
return err
}
if (skippy < 0) || (iNdEx+skippy) < 0 {
return ErrInvalidLengthMetrics
}
if (iNdEx + skippy) > l {
return io.ErrUnexpectedEOF
}
m.XXX_unrecognized = append(m.XXX_unrecognized, dAtA[iNdEx:iNdEx+skippy]...)
iNdEx += skippy
}
}
if iNdEx > l {
return io.ErrUnexpectedEOF
}
return nil
}

View file

@ -0,0 +1,171 @@
// Copyright 2025 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package io_prometheus_client //nolint:revive
import (
"bytes"
"encoding/binary"
"errors"
"io"
"testing"
"github.com/gogo/protobuf/proto"
"github.com/stretchr/testify/require"
"github.com/prometheus/prometheus/model/labels"
)
const (
testGauge = `name: "go_build_info"
help: "Build information about the main Go module."
type: GAUGE
metric: <
label: <
name: "checksum"
value: ""
>
label: <
name: "path"
value: "github.com/prometheus/client_golang"
>
label: <
name: "version"
value: "(devel)"
>
gauge: <
value: 1
>
>
metric: <
label: <
name: "checksum"
value: ""
>
label: <
name: "path"
value: "github.com/prometheus/prometheus"
>
label: <
name: "version"
value: "v3.0.0"
>
gauge: <
value: 2
>
>
`
testCounter = `name: "go_memstats_alloc_bytes_total"
help: "Total number of bytes allocated, even if freed."
type: COUNTER
unit: "bytes"
metric: <
counter: <
value: 1.546544e+06
exemplar: <
label: <
name: "dummyID"
value: "42"
>
value: 12
timestamp: <
seconds: 1625851151
nanos: 233181499
>
>
>
>
`
)
func TestMetricStreamingDecoder(t *testing.T) {
varintBuf := make([]byte, binary.MaxVarintLen32)
buf := bytes.Buffer{}
for _, m := range []string{testGauge, testCounter} {
mf := &MetricFamily{}
require.NoError(t, proto.UnmarshalText(m, mf))
// From proto message to binary protobuf.
protoBuf, err := proto.Marshal(mf)
require.NoError(t, err)
// Write first length, then binary protobuf.
varintLength := binary.PutUvarint(varintBuf, uint64(len(protoBuf)))
buf.Write(varintBuf[:varintLength])
buf.Write(protoBuf)
}
d := NewMetricStreamingDecoder(buf.Bytes())
require.NoError(t, d.NextMetricFamily())
nextFn := func() error {
for {
err := d.NextMetric()
if errors.Is(err, io.EOF) {
if err := d.NextMetricFamily(); err != nil {
return err
}
continue
}
return err
}
}
var firstMetricLset labels.Labels
{
require.NoError(t, nextFn())
require.Equal(t, "go_build_info", d.GetName())
require.Equal(t, "Build information about the main Go module.", d.GetHelp())
require.Equal(t, MetricType_GAUGE, d.GetType())
require.Equal(t, float64(1), d.GetGauge().GetValue())
b := labels.NewScratchBuilder(0)
require.NoError(t, d.Label(&b))
firstMetricLset = b.Labels()
require.Equal(t, `{checksum="", path="github.com/prometheus/client_golang", version="(devel)"}`, firstMetricLset.String())
}
{
require.NoError(t, nextFn())
require.Equal(t, "go_build_info", d.GetName())
require.Equal(t, "Build information about the main Go module.", d.GetHelp())
require.Equal(t, MetricType_GAUGE, d.GetType())
require.Equal(t, float64(2), d.GetGauge().GetValue())
b := labels.NewScratchBuilder(0)
require.NoError(t, d.Label(&b))
require.Equal(t, `{checksum="", path="github.com/prometheus/prometheus", version="v3.0.0"}`, b.Labels().String())
}
{
// Different mf now.
require.NoError(t, nextFn())
require.Equal(t, "go_memstats_alloc_bytes_total", d.GetName())
require.Equal(t, "Total number of bytes allocated, even if freed.", d.GetHelp())
require.Equal(t, "bytes", d.GetUnit())
require.Equal(t, MetricType_COUNTER, d.GetType())
require.Equal(t, 1.546544e+06, d.Metric.GetCounter().GetValue())
b := labels.NewScratchBuilder(0)
require.NoError(t, d.Label(&b))
require.Equal(t, `{}`, b.Labels().String())
}
require.Equal(t, io.EOF, nextFn())
// Expect labels and metricBytes to be static and reusable even after parsing.
require.Equal(t, `{checksum="", path="github.com/prometheus/client_golang", version="(devel)"}`, firstMetricLset.String())
}

View file

@ -1895,6 +1895,7 @@ func TestScrapeLoopAppend(t *testing.T) {
}
func requireEqual(t *testing.T, expected, actual interface{}, msgAndArgs ...interface{}) {
t.Helper()
testutil.RequireEqualWithOptions(t, expected, actual,
[]cmp.Option{cmp.Comparer(equalFloatSamples), cmp.AllowUnexported(histogramSample{})},
msgAndArgs...)