diff --git a/model/textparse/interface_test.go b/model/textparse/interface_test.go index 504a4917d9..a5ca12859e 100644 --- a/model/textparse/interface_test.go +++ b/model/textparse/interface_test.go @@ -19,6 +19,7 @@ import ( "testing" "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" "github.com/prometheus/common/model" "github.com/stretchr/testify/require" @@ -210,6 +211,17 @@ func requireEntries(t *testing.T, exp, got []parsedEntry) { t.Helper() testutil.RequireEqualWithOptions(t, exp, got, []cmp.Option{ + // We reuse slices so we sometimes have empty vs nil differences + // we need to ignore with cmpopts.EquateEmpty(). + // However we have to filter out labels, as only + // one comparer per type has to be specified, + // and RequireEqualWithOptions uses + // cmp.Comparer(labels.Equal). + cmp.FilterValues(func(x, y any) bool { + _, xIsLabels := x.(labels.Labels) + _, yIsLabels := y.(labels.Labels) + return !xIsLabels && !yIsLabels + }, cmpopts.EquateEmpty()), cmp.AllowUnexported(parsedEntry{}), }) } @@ -230,15 +242,20 @@ func testParse(t *testing.T, p Parser) (ret []parsedEntry) { case EntryInvalid: t.Fatal("entry invalid not expected") case EntrySeries, EntryHistogram: + var ts *int64 if et == EntrySeries { - m, got.t, got.v = p.Series() - got.m = string(m) + m, ts, got.v = p.Series() } else { - m, got.t, got.shs, got.fhs = p.Histogram() - got.m = string(m) + m, ts, got.shs, got.fhs = p.Histogram() } - + if ts != nil { + // TODO(bwplotka): Change to 0 in the interface for set check to + // avoid pointer mangling. + got.t = int64p(*ts) + } + got.m = string(m) p.Labels(&got.lset) + // Parser reuses int pointer. if ct := p.CreatedTimestamp(); ct != nil { got.ct = int64p(*ct) diff --git a/model/textparse/nhcbparse_test.go b/model/textparse/nhcbparse_test.go index 859bcc1cb7..f75c7f5677 100644 --- a/model/textparse/nhcbparse_test.go +++ b/model/textparse/nhcbparse_test.go @@ -931,7 +931,7 @@ func createTestPromHistogram() string { return `# HELP test_histogram1 Test histogram 1 # TYPE test_histogram1 histogram test_histogram1_count 175 1234568 -test_histogram1_sum 0.0008280461746287094 1234768 +test_histogram1_sum 0.0008280461746287094 1234568 test_histogram1_bucket{le="-0.0004899999999999998"} 2 1234568 test_histogram1_bucket{le="-0.0003899999999999998"} 4 1234568 test_histogram1_bucket{le="-0.0002899999999999998"} 16 1234568 diff --git a/model/textparse/promparse.go b/model/textparse/promparse.go index 844ab2f15d..f3f2571b00 100644 --- a/model/textparse/promparse.go +++ b/model/textparse/promparse.go @@ -502,6 +502,10 @@ func yoloString(b []byte) string { return unsafe.String(unsafe.SliceData(b), len(b)) } +func yoloBytes(b string) []byte { + return unsafe.Slice(unsafe.StringData(b), len(b)) +} + func parseFloat(s string) (float64, error) { // Keep to pre-Go 1.13 float formats. if strings.ContainsAny(s, "pP_") { diff --git a/model/textparse/protobufparse.go b/model/textparse/protobufparse.go index 6d074a2695..380c9918cf 100644 --- a/model/textparse/protobufparse.go +++ b/model/textparse/protobufparse.go @@ -15,7 +15,6 @@ package textparse import ( "bytes" - "encoding/binary" "errors" "fmt" "io" @@ -25,7 +24,6 @@ import ( "sync" "unicode/utf8" - "github.com/gogo/protobuf/proto" "github.com/gogo/protobuf/types" "github.com/prometheus/common/model" @@ -45,24 +43,24 @@ var floatFormatBufPool = sync.Pool{ }, } -// ProtobufParser is a very inefficient way of unmarshaling the old Prometheus -// protobuf format and then present it as it if were parsed by a -// Prometheus-2-style text parser. This is only done so that we can easily plug -// in the protobuf format into Prometheus 2. For future use (with the final -// format that will be used for native histograms), we have to revisit the -// parsing. A lot of the efficiency tricks of the Prometheus-2-style parsing -// could be used in a similar fashion (byte-slice pointers into the raw -// payload), which requires some hand-coded protobuf handling. But the current -// parsers all expect the full series name (metric name plus label pairs) as one -// string, which is not how things are represented in the protobuf format. If -// the re-arrangement work is actually causing problems (which has to be seen), -// that expectation needs to be changed. +// ProtobufParser parses the old Prometheus protobuf format and present it +// as the text-style textparse.Parser interface. +// +// It uses a tailored streaming protobuf dto.MetricStreamingDecoder that +// reuses internal protobuf structs and allows direct unmarshalling to Prometheus +// types like labels. type ProtobufParser struct { - in []byte // The input to parse. - inPos int // Position within the input. - metricPos int // Position within Metric slice. + dec *dto.MetricStreamingDecoder + + // Used for both the string returned by Series and Histogram, as well as, + // metric family for Type, Unit and Help. + entryBytes *bytes.Buffer + + lset labels.Labels + builder labels.ScratchBuilder // Held here to reduce allocations when building Labels. + // fieldPos is the position within a Summary or (legacy) Histogram. -2 - // is the count. -1 is the sum. Otherwise it is the index within + // is the count. -1 is the sum. Otherwise, it is the index within // quantiles/buckets. fieldPos int fieldsDone bool // true if no more fields of a Summary or (legacy) Histogram to be processed. @@ -78,27 +76,20 @@ type ProtobufParser struct { // that we have to decode the next MetricFamily. state Entry - builder labels.ScratchBuilder // held here to reduce allocations when building Labels - - mf *dto.MetricFamily - // Whether to also parse a classic histogram that is also present as a // native histogram. parseClassicHistograms bool - - // The following are just shenanigans to satisfy the Parser interface. - metricBytes *bytes.Buffer // A somewhat fluid representation of the current metric. } // NewProtobufParser returns a parser for the payload in the byte slice. func NewProtobufParser(b []byte, parseClassicHistograms bool, st *labels.SymbolTable) Parser { return &ProtobufParser{ - in: b, + dec: dto.NewMetricStreamingDecoder(b), + entryBytes: &bytes.Buffer{}, + builder: labels.NewScratchBuilderWithSymbolTable(st, 16), // TODO(bwplotka): Try base builder. + state: EntryInvalid, - mf: &dto.MetricFamily{}, - metricBytes: &bytes.Buffer{}, parseClassicHistograms: parseClassicHistograms, - builder: labels.NewScratchBuilderWithSymbolTable(st, 16), } } @@ -106,19 +97,18 @@ func NewProtobufParser(b []byte, parseClassicHistograms bool, st *labels.SymbolT // value, the timestamp if set, and the value of the current sample. func (p *ProtobufParser) Series() ([]byte, *int64, float64) { var ( - m = p.mf.GetMetric()[p.metricPos] - ts = m.GetTimestampMs() + ts = &p.dec.TimestampMs // To save memory allocations, never nil. v float64 ) - switch p.mf.GetType() { + switch p.dec.GetType() { case dto.MetricType_COUNTER: - v = m.GetCounter().GetValue() + v = p.dec.GetCounter().GetValue() case dto.MetricType_GAUGE: - v = m.GetGauge().GetValue() + v = p.dec.GetGauge().GetValue() case dto.MetricType_UNTYPED: - v = m.GetUntyped().GetValue() + v = p.dec.GetUntyped().GetValue() case dto.MetricType_SUMMARY: - s := m.GetSummary() + s := p.dec.GetSummary() switch p.fieldPos { case -2: v = float64(s.GetSampleCount()) @@ -133,7 +123,7 @@ func (p *ProtobufParser) Series() ([]byte, *int64, float64) { } case dto.MetricType_HISTOGRAM, dto.MetricType_GAUGE_HISTOGRAM: // This should only happen for a classic histogram. - h := m.GetHistogram() + h := p.dec.GetHistogram() switch p.fieldPos { case -2: v = h.GetSampleCountFloat() @@ -159,8 +149,8 @@ func (p *ProtobufParser) Series() ([]byte, *int64, float64) { default: panic("encountered unexpected metric type, this is a bug") } - if ts != 0 { - return p.metricBytes.Bytes(), &ts, v + if *ts != 0 { + return p.entryBytes.Bytes(), ts, v } // TODO(beorn7): We assume here that ts==0 means no timestamp. That's // not true in general, but proto3 originally has no distinction between @@ -171,7 +161,7 @@ func (p *ProtobufParser) Series() ([]byte, *int64, float64) { // away from gogo-protobuf to an actively maintained protobuf // implementation. Once that's done, we can simply use the `optional` // keyword and check for the unset state explicitly. - return p.metricBytes.Bytes(), nil, v + return p.entryBytes.Bytes(), nil, v } // Histogram returns the bytes of a series with a native histogram as a value, @@ -186,47 +176,56 @@ func (p *ProtobufParser) Series() ([]byte, *int64, float64) { // value. func (p *ProtobufParser) Histogram() ([]byte, *int64, *histogram.Histogram, *histogram.FloatHistogram) { var ( - m = p.mf.GetMetric()[p.metricPos] - ts = m.GetTimestampMs() - h = m.GetHistogram() + ts = &p.dec.TimestampMs // To save memory allocations, never nil. + h = p.dec.GetHistogram() ) + if p.parseClassicHistograms && len(h.GetBucket()) > 0 { p.redoClassic = true } if h.GetSampleCountFloat() > 0 || h.GetZeroCountFloat() > 0 { // It is a float histogram. fh := histogram.FloatHistogram{ - Count: h.GetSampleCountFloat(), - Sum: h.GetSampleSum(), - ZeroThreshold: h.GetZeroThreshold(), - ZeroCount: h.GetZeroCountFloat(), - Schema: h.GetSchema(), + Count: h.GetSampleCountFloat(), + Sum: h.GetSampleSum(), + ZeroThreshold: h.GetZeroThreshold(), + ZeroCount: h.GetZeroCountFloat(), + Schema: h.GetSchema(), + + // Decoder reuses slices, so we need to copy. PositiveSpans: make([]histogram.Span, len(h.GetPositiveSpan())), - PositiveBuckets: h.GetPositiveCount(), + PositiveBuckets: make([]float64, len(h.GetPositiveCount())), NegativeSpans: make([]histogram.Span, len(h.GetNegativeSpan())), - NegativeBuckets: h.GetNegativeCount(), + NegativeBuckets: make([]float64, len(h.GetNegativeCount())), } for i, span := range h.GetPositiveSpan() { fh.PositiveSpans[i].Offset = span.GetOffset() fh.PositiveSpans[i].Length = span.GetLength() } + for i, cnt := range h.GetPositiveCount() { + fh.PositiveBuckets[i] = cnt + } for i, span := range h.GetNegativeSpan() { fh.NegativeSpans[i].Offset = span.GetOffset() fh.NegativeSpans[i].Length = span.GetLength() } - if p.mf.GetType() == dto.MetricType_GAUGE_HISTOGRAM { + for i, cnt := range h.GetNegativeCount() { + fh.NegativeBuckets[i] = cnt + } + if p.dec.GetType() == dto.MetricType_GAUGE_HISTOGRAM { fh.CounterResetHint = histogram.GaugeType } fh.Compact(0) - if ts != 0 { - return p.metricBytes.Bytes(), &ts, nil, &fh + if *ts != 0 { + return p.entryBytes.Bytes(), ts, nil, &fh } // Nasty hack: Assume that ts==0 means no timestamp. That's not true in // general, but proto3 has no distinction between unset and // default. Need to avoid in the final format. - return p.metricBytes.Bytes(), nil, nil, &fh + return p.entryBytes.Bytes(), nil, nil, &fh } + // TODO(bwplotka): Create sync.Pool for those structs. sh := histogram.Histogram{ Count: h.GetSampleCount(), Sum: h.GetSampleSum(), @@ -234,41 +233,47 @@ func (p *ProtobufParser) Histogram() ([]byte, *int64, *histogram.Histogram, *his ZeroCount: h.GetZeroCount(), Schema: h.GetSchema(), PositiveSpans: make([]histogram.Span, len(h.GetPositiveSpan())), - PositiveBuckets: h.GetPositiveDelta(), + PositiveBuckets: make([]int64, len(h.GetPositiveDelta())), NegativeSpans: make([]histogram.Span, len(h.GetNegativeSpan())), - NegativeBuckets: h.GetNegativeDelta(), + NegativeBuckets: make([]int64, len(h.GetNegativeDelta())), } for i, span := range h.GetPositiveSpan() { sh.PositiveSpans[i].Offset = span.GetOffset() sh.PositiveSpans[i].Length = span.GetLength() } + for i, cnt := range h.GetPositiveDelta() { + sh.PositiveBuckets[i] = cnt + } for i, span := range h.GetNegativeSpan() { sh.NegativeSpans[i].Offset = span.GetOffset() sh.NegativeSpans[i].Length = span.GetLength() } - if p.mf.GetType() == dto.MetricType_GAUGE_HISTOGRAM { + for i, cnt := range h.GetNegativeDelta() { + sh.NegativeBuckets[i] = cnt + } + if p.dec.GetType() == dto.MetricType_GAUGE_HISTOGRAM { sh.CounterResetHint = histogram.GaugeType } sh.Compact(0) - if ts != 0 { - return p.metricBytes.Bytes(), &ts, &sh, nil + if *ts != 0 { + return p.entryBytes.Bytes(), ts, &sh, nil } - return p.metricBytes.Bytes(), nil, &sh, nil + return p.entryBytes.Bytes(), nil, &sh, nil } // Help returns the metric name and help text in the current entry. // Must only be called after Next returned a help entry. // The returned byte slices become invalid after the next call to Next. func (p *ProtobufParser) Help() ([]byte, []byte) { - return p.metricBytes.Bytes(), []byte(p.mf.GetHelp()) + return p.entryBytes.Bytes(), yoloBytes(p.dec.GetHelp()) } // Type returns the metric name and type in the current entry. // Must only be called after Next returned a type entry. // The returned byte slices become invalid after the next call to Next. func (p *ProtobufParser) Type() ([]byte, model.MetricType) { - n := p.metricBytes.Bytes() - switch p.mf.GetType() { + n := p.entryBytes.Bytes() + switch p.dec.GetType() { case dto.MetricType_COUNTER: return n, model.MetricTypeCounter case dto.MetricType_GAUGE: @@ -287,7 +292,7 @@ func (p *ProtobufParser) Type() ([]byte, model.MetricType) { // Must only be called after Next returned a unit entry. // The returned byte slices become invalid after the next call to Next. func (p *ProtobufParser) Unit() ([]byte, []byte) { - return p.metricBytes.Bytes(), []byte(p.mf.GetUnit()) + return p.entryBytes.Bytes(), []byte(p.dec.GetUnit()) } // Comment always returns nil because comments aren't supported by the protobuf @@ -297,21 +302,8 @@ func (p *ProtobufParser) Comment() []byte { } // Labels writes the labels of the current sample into the passed labels. -// It returns the string from which the metric was parsed. func (p *ProtobufParser) Labels(l *labels.Labels) { - p.builder.Reset() - p.builder.Add(labels.MetricName, p.getMagicName()) - - for _, lp := range p.mf.GetMetric()[p.metricPos].GetLabel() { - p.builder.Add(lp.GetName(), lp.GetValue()) - } - if needed, name, value := p.getMagicLabel(); needed { - p.builder.Add(name, value) - } - - // Sort labels to maintain the sorted labels invariant. - p.builder.Sort() - *l = p.builder.Labels() + *l = p.lset.Copy() } // Exemplar writes the exemplar of the current sample into the passed @@ -324,15 +316,14 @@ func (p *ProtobufParser) Exemplar(ex *exemplar.Exemplar) bool { // We only ever return one exemplar per (non-native-histogram) series. return false } - m := p.mf.GetMetric()[p.metricPos] var exProto *dto.Exemplar - switch p.mf.GetType() { + switch p.dec.GetType() { case dto.MetricType_COUNTER: - exProto = m.GetCounter().GetExemplar() + exProto = p.dec.GetCounter().GetExemplar() case dto.MetricType_HISTOGRAM, dto.MetricType_GAUGE_HISTOGRAM: isClassic := p.state == EntrySeries - if !isClassic && len(m.GetHistogram().GetExemplars()) > 0 { - exs := m.GetHistogram().GetExemplars() + if !isClassic && len(p.dec.GetHistogram().GetExemplars()) > 0 { + exs := p.dec.GetHistogram().GetExemplars() for p.exemplarPos < len(exs) { exProto = exs[p.exemplarPos] p.exemplarPos++ @@ -344,7 +335,7 @@ func (p *ProtobufParser) Exemplar(ex *exemplar.Exemplar) bool { return false } } else { - bb := m.GetHistogram().GetBucket() + bb := p.dec.GetHistogram().GetBucket() if p.fieldPos < 0 { if isClassic { return false // At _count or _sum. @@ -392,13 +383,13 @@ func (p *ProtobufParser) Exemplar(ex *exemplar.Exemplar) bool { // invalid (as timestamp e.g. negative value) on counters, summaries or histograms. func (p *ProtobufParser) CreatedTimestamp() *int64 { var ct *types.Timestamp - switch p.mf.GetType() { + switch p.dec.GetType() { case dto.MetricType_COUNTER: - ct = p.mf.GetMetric()[p.metricPos].GetCounter().GetCreatedTimestamp() + ct = p.dec.GetCounter().GetCreatedTimestamp() case dto.MetricType_SUMMARY: - ct = p.mf.GetMetric()[p.metricPos].GetSummary().GetCreatedTimestamp() + ct = p.dec.GetSummary().GetCreatedTimestamp() case dto.MetricType_HISTOGRAM, dto.MetricType_GAUGE_HISTOGRAM: - ct = p.mf.GetMetric()[p.metricPos].GetHistogram().GetCreatedTimestamp() + ct = p.dec.GetHistogram().GetCreatedTimestamp() default: } ctAsTime, err := types.TimestampFromProto(ct) @@ -416,31 +407,34 @@ func (p *ProtobufParser) CreatedTimestamp() *int64 { func (p *ProtobufParser) Next() (Entry, error) { p.exemplarReturned = false switch p.state { + // Invalid state occurs on: + // * First Next() call. + // * Recursive call that tells Next to move to the next metric family. case EntryInvalid: - p.metricPos = 0 p.exemplarPos = 0 p.fieldPos = -2 - n, err := readDelimited(p.in[p.inPos:], p.mf) - p.inPos += n - if err != nil { + + if err := p.dec.NextMetricFamily(); err != nil { return p.state, err } - - // Skip empty metric families. - if len(p.mf.GetMetric()) == 0 { - return p.Next() + if err := p.dec.NextMetric(); err != nil { + // Skip empty metric families. + if errors.Is(err, io.EOF) { + return p.Next() + } + return EntryInvalid, err } // We are at the beginning of a metric family. Put only the name - // into metricBytes and validate only name, help, and type for now. - name := p.mf.GetName() + // into entryBytes and validate only name, help, and type for now. + name := p.dec.GetName() if !model.IsValidMetricName(model.LabelValue(name)) { return EntryInvalid, fmt.Errorf("invalid metric name: %s", name) } - if help := p.mf.GetHelp(); !utf8.ValidString(help) { + if help := p.dec.GetHelp(); !utf8.ValidString(help) { return EntryInvalid, fmt.Errorf("invalid help for metric %q: %s", name, help) } - switch p.mf.GetType() { + switch p.dec.GetType() { case dto.MetricType_COUNTER, dto.MetricType_GAUGE, dto.MetricType_HISTOGRAM, @@ -449,11 +443,11 @@ func (p *ProtobufParser) Next() (Entry, error) { dto.MetricType_UNTYPED: // All good. default: - return EntryInvalid, fmt.Errorf("unknown metric type for metric %q: %s", name, p.mf.GetType()) + return EntryInvalid, fmt.Errorf("unknown metric type for metric %q: %s", name, p.dec.GetType()) } - unit := p.mf.GetUnit() + unit := p.dec.GetUnit() if len(unit) > 0 { - if p.mf.GetType() == dto.MetricType_COUNTER && strings.HasSuffix(name, "_total") { + if p.dec.GetType() == dto.MetricType_COUNTER && strings.HasSuffix(name, "_total") { if !strings.HasSuffix(name[:len(name)-6], unit) || len(name)-6 < len(unit)+1 || name[len(name)-6-len(unit)-1] != '_' { return EntryInvalid, fmt.Errorf("unit %q not a suffix of counter %q", unit, name) } @@ -461,12 +455,11 @@ func (p *ProtobufParser) Next() (Entry, error) { return EntryInvalid, fmt.Errorf("unit %q not a suffix of metric %q", unit, name) } } - p.metricBytes.Reset() - p.metricBytes.WriteString(name) - + p.entryBytes.Reset() + p.entryBytes.WriteString(name) p.state = EntryHelp case EntryHelp: - if p.mf.Unit != "" { + if p.dec.Unit != "" { p.state = EntryUnit } else { p.state = EntryType @@ -474,48 +467,78 @@ func (p *ProtobufParser) Next() (Entry, error) { case EntryUnit: p.state = EntryType case EntryType: - t := p.mf.GetType() + t := p.dec.GetType() if (t == dto.MetricType_HISTOGRAM || t == dto.MetricType_GAUGE_HISTOGRAM) && - isNativeHistogram(p.mf.GetMetric()[0].GetHistogram()) { + isNativeHistogram(p.dec.GetHistogram()) { p.state = EntryHistogram } else { p.state = EntrySeries } - if err := p.updateMetricBytes(); err != nil { + if err := p.onSeriesOrHistogramUpdate(); err != nil { return EntryInvalid, err } - case EntryHistogram, EntrySeries: - if p.redoClassic { - p.redoClassic = false - p.state = EntrySeries - p.fieldPos = -3 - p.fieldsDone = false - } - t := p.mf.GetType() - if p.state == EntrySeries && !p.fieldsDone && - (t == dto.MetricType_SUMMARY || - t == dto.MetricType_HISTOGRAM || - t == dto.MetricType_GAUGE_HISTOGRAM) { - p.fieldPos++ - } else { - p.metricPos++ + case EntrySeries: + // Potentially a second series in the metric family. + t := p.dec.GetType() + if t == dto.MetricType_SUMMARY || + t == dto.MetricType_HISTOGRAM || + t == dto.MetricType_GAUGE_HISTOGRAM { + // Non-trivial series (complex metrics, with magic suffixes). + + // Did we iterate over all the classic representations fields? + // NOTE: p.fieldsDone is updated on p.onSeriesOrHistogramUpdate. + if !p.fieldsDone { + // Still some fields to iterate over. + p.fieldPos++ + if err := p.onSeriesOrHistogramUpdate(); err != nil { + return EntryInvalid, err + } + return p.state, nil + } + + // Reset histogram fields. p.fieldPos = -2 p.fieldsDone = false p.exemplarPos = 0 + // If this is a metric family containing native - // histograms, we have to switch back to native - // histograms after parsing a classic histogram. - if p.state == EntrySeries && - (t == dto.MetricType_HISTOGRAM || t == dto.MetricType_GAUGE_HISTOGRAM) && - isNativeHistogram(p.mf.GetMetric()[0].GetHistogram()) { + // histograms, it means we are here thanks to redoClassic state. + // Return to native histograms for the consistent flow. + if (t == dto.MetricType_HISTOGRAM || t == dto.MetricType_GAUGE_HISTOGRAM) && + isNativeHistogram(p.dec.GetHistogram()) { p.state = EntryHistogram } } - if p.metricPos >= len(p.mf.GetMetric()) { - p.state = EntryInvalid - return p.Next() + // Is there another series? + if err := p.dec.NextMetric(); err != nil { + if errors.Is(err, io.EOF) { + p.state = EntryInvalid + return p.Next() + } + return EntryInvalid, err } - if err := p.updateMetricBytes(); err != nil { + if err := p.onSeriesOrHistogramUpdate(); err != nil { + return EntryInvalid, err + } + case EntryHistogram: + // Was Histogram() called and parseClassicHistograms is true? + if p.redoClassic { + p.redoClassic = false + p.fieldPos = -3 + p.fieldsDone = false + p.state = EntrySeries + return p.Next() // Switch to classic histogram. + } + + // Is there another series? + if err := p.dec.NextMetric(); err != nil { + if errors.Is(err, io.EOF) { + p.state = EntryInvalid + return p.Next() + } + return EntryInvalid, err + } + if err := p.onSeriesOrHistogramUpdate(); err != nil { return EntryInvalid, err } default: @@ -524,30 +547,39 @@ func (p *ProtobufParser) Next() (Entry, error) { return p.state, nil } -func (p *ProtobufParser) updateMetricBytes() error { - b := p.metricBytes - b.Reset() - b.WriteString(p.getMagicName()) - for _, lp := range p.mf.GetMetric()[p.metricPos].GetLabel() { - b.WriteByte(model.SeparatorByte) - n := lp.GetName() - if !model.LabelName(n).IsValid() { - return fmt.Errorf("invalid label name: %s", n) - } - b.WriteString(n) - b.WriteByte(model.SeparatorByte) - v := lp.GetValue() - if !utf8.ValidString(v) { - return fmt.Errorf("invalid label value: %s", v) - } - b.WriteString(v) +// onSeriesOrHistogramUpdate updates internal state before returning +// a series or histogram. It updates: +// * p.lset. +// * p.entryBytes. +// * p.fieldsDone depending on p.fieldPos. +func (p *ProtobufParser) onSeriesOrHistogramUpdate() error { + p.builder.Reset() + p.builder.Add(labels.MetricName, p.getMagicName()) + + if err := p.dec.Label(&p.builder); err != nil { + return err } - if needed, n, v := p.getMagicLabel(); needed { - b.WriteByte(model.SeparatorByte) - b.WriteString(n) - b.WriteByte(model.SeparatorByte) - b.WriteString(v) + + if needed, name, value := p.getMagicLabel(); needed { + p.builder.Add(name, value) } + + // Sort labels to maintain the sorted labels invariant. + p.builder.Sort() + p.builder.Overwrite(&p.lset) + + // entryBytes has to be unique for each series. + p.entryBytes.Reset() + p.lset.Range(func(l labels.Label) { + if l.Name == labels.MetricName { + p.entryBytes.WriteString(l.Value) + return + } + p.entryBytes.WriteByte(model.SeparatorByte) + p.entryBytes.WriteString(l.Name) + p.entryBytes.WriteByte(model.SeparatorByte) + p.entryBytes.WriteString(l.Value) + }) return nil } @@ -555,36 +587,37 @@ func (p *ProtobufParser) updateMetricBytes() error { // ("_count", "_sum", "_bucket") if needed according to the current parser // state. func (p *ProtobufParser) getMagicName() string { - t := p.mf.GetType() + t := p.dec.GetType() if p.state == EntryHistogram || (t != dto.MetricType_HISTOGRAM && t != dto.MetricType_GAUGE_HISTOGRAM && t != dto.MetricType_SUMMARY) { - return p.mf.GetName() + return p.dec.GetName() } if p.fieldPos == -2 { - return p.mf.GetName() + "_count" + return p.dec.GetName() + "_count" } if p.fieldPos == -1 { - return p.mf.GetName() + "_sum" + return p.dec.GetName() + "_sum" } if t == dto.MetricType_HISTOGRAM || t == dto.MetricType_GAUGE_HISTOGRAM { - return p.mf.GetName() + "_bucket" + return p.dec.GetName() + "_bucket" } - return p.mf.GetName() + return p.dec.GetName() } // getMagicLabel returns if a magic label ("quantile" or "le") is needed and, if // so, its name and value. It also sets p.fieldsDone if applicable. func (p *ProtobufParser) getMagicLabel() (bool, string, string) { + // Native histogram or _count and _sum series. if p.state == EntryHistogram || p.fieldPos < 0 { return false, "", "" } - switch p.mf.GetType() { + switch p.dec.GetType() { case dto.MetricType_SUMMARY: - qq := p.mf.GetMetric()[p.metricPos].GetSummary().GetQuantile() + qq := p.dec.GetSummary().GetQuantile() q := qq[p.fieldPos] p.fieldsDone = p.fieldPos == len(qq)-1 return true, model.QuantileLabel, formatOpenMetricsFloat(q.GetQuantile()) case dto.MetricType_HISTOGRAM, dto.MetricType_GAUGE_HISTOGRAM: - bb := p.mf.GetMetric()[p.metricPos].GetHistogram().GetBucket() + bb := p.dec.GetHistogram().GetBucket() if p.fieldPos >= len(bb) { p.fieldsDone = true return true, model.BucketLabel, "+Inf" @@ -596,29 +629,6 @@ func (p *ProtobufParser) getMagicLabel() (bool, string, string) { return false, "", "" } -var errInvalidVarint = errors.New("protobufparse: invalid varint encountered") - -// readDelimited is essentially doing what the function of the same name in -// github.com/matttproud/golang_protobuf_extensions/pbutil is doing, but it is -// specific to a MetricFamily, utilizes the more efficient gogo-protobuf -// unmarshaling, and acts on a byte slice directly without any additional -// staging buffers. -func readDelimited(b []byte, mf *dto.MetricFamily) (n int, err error) { - if len(b) == 0 { - return 0, io.EOF - } - messageLength, varIntLength := proto.DecodeVarint(b) - if varIntLength == 0 || varIntLength > binary.MaxVarintLen32 { - return 0, errInvalidVarint - } - totalLength := varIntLength + int(messageLength) - if totalLength > len(b) { - return 0, fmt.Errorf("protobufparse: insufficient length of buffer, expected at least %d bytes, got %d bytes", totalLength, len(b)) - } - mf.Reset() - return totalLength, mf.Unmarshal(b[varIntLength:totalLength]) -} - // formatOpenMetricsFloat works like the usual Go string formatting of a float // but appends ".0" if the resulting number would otherwise contain neither a // "." nor an "e". diff --git a/model/textparse/protobufparse_test.go b/model/textparse/protobufparse_test.go index 065459a69a..381e6a9355 100644 --- a/model/textparse/protobufparse_test.go +++ b/model/textparse/protobufparse_test.go @@ -1246,7 +1246,7 @@ func TestProtobufParse(t *testing.T) { ), }, { - m: "rpc_durations_seconds\xffservice\xffexponential\xffquantile\xff0.5", + m: "rpc_durations_seconds\xffquantile\xff0.5\xffservice\xffexponential", v: 6.442786329648548e-07, lset: labels.FromStrings( "__name__", "rpc_durations_seconds", @@ -1255,7 +1255,7 @@ func TestProtobufParse(t *testing.T) { ), }, { - m: "rpc_durations_seconds\xffservice\xffexponential\xffquantile\xff0.9", + m: "rpc_durations_seconds\xffquantile\xff0.9\xffservice\xffexponential", v: 1.9435742936658396e-06, lset: labels.FromStrings( "__name__", "rpc_durations_seconds", @@ -1264,7 +1264,7 @@ func TestProtobufParse(t *testing.T) { ), }, { - m: "rpc_durations_seconds\xffservice\xffexponential\xffquantile\xff0.99", + m: "rpc_durations_seconds\xffquantile\xff0.99\xffservice\xffexponential", v: 4.0471608667037015e-06, lset: labels.FromStrings( "__name__", "rpc_durations_seconds", @@ -2199,7 +2199,7 @@ func TestProtobufParse(t *testing.T) { ), }, { - m: "rpc_durations_seconds\xffservice\xffexponential\xffquantile\xff0.5", + m: "rpc_durations_seconds\xffquantile\xff0.5\xffservice\xffexponential", v: 6.442786329648548e-07, lset: labels.FromStrings( "__name__", "rpc_durations_seconds", @@ -2208,7 +2208,7 @@ func TestProtobufParse(t *testing.T) { ), }, { - m: "rpc_durations_seconds\xffservice\xffexponential\xffquantile\xff0.9", + m: "rpc_durations_seconds\xffquantile\xff0.9\xffservice\xffexponential", v: 1.9435742936658396e-06, lset: labels.FromStrings( "__name__", "rpc_durations_seconds", @@ -2217,7 +2217,7 @@ func TestProtobufParse(t *testing.T) { ), }, { - m: "rpc_durations_seconds\xffservice\xffexponential\xffquantile\xff0.99", + m: "rpc_durations_seconds\xffquantile\xff0.99\xffservice\xffexponential", v: 4.0471608667037015e-06, lset: labels.FromStrings( "__name__", "rpc_durations_seconds", diff --git a/prompb/io/prometheus/client/decoder.go b/prompb/io/prometheus/client/decoder.go new file mode 100644 index 0000000000..b21f78cc9c --- /dev/null +++ b/prompb/io/prometheus/client/decoder.go @@ -0,0 +1,780 @@ +// Copyright 2025 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package io_prometheus_client //nolint:revive + +import ( + "encoding/binary" + "errors" + "fmt" + "io" + "unicode/utf8" + "unsafe" + + proto "github.com/gogo/protobuf/proto" + "github.com/prometheus/common/model" + + "github.com/prometheus/prometheus/model/labels" +) + +type MetricStreamingDecoder struct { + in []byte + inPos int + + // TODO(bwplotka): Switch to generator/plugin that won't have those fields accessible e.g. OpaqueAPI + // We leverage the fact those two don't collide. + *MetricFamily // Without Metric, guarded by overridden GetMetric method. + *Metric // Without Label, guarded by overridden GetLabel method. + + mfData []byte + metrics []pos + metricIndex int + + mData []byte + labels []pos +} + +// NewMetricStreamingDecoder returns a Go iterator that unmarshals given protobuf bytes one +// metric family and metric at the time, allowing efficient streaming. +// +// Do not modify MetricStreamingDecoder between iterations as it's reused to save allocations. +// GetGauge, GetCounter, etc are also cached, which means GetGauge will work for counter +// if previously gauge was parsed. It's up to the caller to use Type to decide what +// method to use when checking the value. +// +// TODO(bwplotka): io.Reader approach is possible too, but textparse has access to whole scrape for now. +func NewMetricStreamingDecoder(data []byte) *MetricStreamingDecoder { + return &MetricStreamingDecoder{ + in: data, + MetricFamily: &MetricFamily{}, + Metric: &Metric{}, + metrics: make([]pos, 0, 100), + } +} + +var errInvalidVarint = errors.New("clientpb: invalid varint encountered") + +func (m *MetricStreamingDecoder) NextMetricFamily() error { + b := m.in[m.inPos:] + if len(b) == 0 { + return io.EOF + } + messageLength, varIntLength := proto.DecodeVarint(b) // TODO(bwplotka): Get rid of gogo. + if varIntLength == 0 || varIntLength > binary.MaxVarintLen32 { + return errInvalidVarint + } + totalLength := varIntLength + int(messageLength) + if totalLength > len(b) { + return fmt.Errorf("clientpb: insufficient length of buffer, expected at least %d bytes, got %d bytes", totalLength, len(b)) + } + m.resetMetricFamily() + m.mfData = b[varIntLength:totalLength] + + m.inPos += totalLength + return m.MetricFamily.unmarshalWithoutMetrics(m, m.mfData) +} + +// resetMetricFamily resets all the fields in m to equal the zero value, but re-using slice memory. +func (m *MetricStreamingDecoder) resetMetricFamily() { + m.metrics = m.metrics[:0] + m.metricIndex = 0 + m.MetricFamily.Reset() +} + +func (m *MetricStreamingDecoder) NextMetric() error { + if m.metricIndex >= len(m.metrics) { + return io.EOF + } + + m.resetMetric() + m.mData = m.mfData[m.metrics[m.metricIndex].start:m.metrics[m.metricIndex].end] + if err := m.Metric.unmarshalWithoutLabels(m, m.mData); err != nil { + return err + } + m.metricIndex++ + return nil +} + +// resetMetric resets all the fields in m to equal the zero value, but re-using slices memory. +func (m *MetricStreamingDecoder) resetMetric() { + m.labels = m.labels[:0] + m.TimestampMs = 0 + + // TODO(bwplotka): Autogenerate reset functions. + if m.Metric.Counter != nil { + m.Metric.Counter.Value = 0 + m.Metric.Counter.CreatedTimestamp = nil + m.Metric.Counter.Exemplar = nil + } + if m.Metric.Gauge != nil { + m.Metric.Gauge.Value = 0 + } + if m.Metric.Histogram != nil { + m.Metric.Histogram.SampleCount = 0 + m.Metric.Histogram.SampleCountFloat = 0 + m.Metric.Histogram.SampleSum = 0 + m.Metric.Histogram.Bucket = m.Metric.Histogram.Bucket[:0] + m.Metric.Histogram.CreatedTimestamp = nil + m.Metric.Histogram.Schema = 0 + m.Metric.Histogram.ZeroThreshold = 0 + m.Metric.Histogram.ZeroCount = 0 + m.Metric.Histogram.ZeroCountFloat = 0 + m.Metric.Histogram.NegativeSpan = m.Metric.Histogram.NegativeSpan[:0] + m.Metric.Histogram.NegativeDelta = m.Metric.Histogram.NegativeDelta[:0] + m.Metric.Histogram.NegativeCount = m.Metric.Histogram.NegativeCount[:0] + m.Metric.Histogram.PositiveSpan = m.Metric.Histogram.PositiveSpan[:0] + m.Metric.Histogram.PositiveDelta = m.Metric.Histogram.PositiveDelta[:0] + m.Metric.Histogram.PositiveCount = m.Metric.Histogram.PositiveCount[:0] + m.Metric.Histogram.Exemplars = m.Metric.Histogram.Exemplars[:0] + } + if m.Metric.Summary != nil { + m.Metric.Summary.SampleCount = 0 + m.Metric.Summary.SampleSum = 0 + m.Metric.Summary.Quantile = m.Metric.Summary.Quantile[:0] + m.Metric.Summary.CreatedTimestamp = nil + } +} + +func (m *MetricStreamingDecoder) GetMetric() { + panic("don't use GetMetric, use Metric directly") +} + +func (m *MetricStreamingDecoder) GetLabel() { + panic("don't use GetLabel, use Label instead") +} + +// Label parses labels into labels scratch builder. Metric name is missing +// given the protobuf metric model and has to be deduced from the metric family name. +// TODO: The method name intentionally hide MetricStreamingDecoder.Metric.Label +// field to avoid direct use (it's not parsed). In future generator will generate +// structs tailored for streaming decoding. +func (m *MetricStreamingDecoder) Label(b *labels.ScratchBuilder) error { + for _, l := range m.labels { + if err := parseLabel(m.mData[l.start:l.end], b); err != nil { + return err + } + } + return nil +} + +// parseLabels is essentially LabelPair.Unmarshal but directly adding into scratch builder +// and reusing strings. +func parseLabel(dAtA []byte, b *labels.ScratchBuilder) error { + var name, value string + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowMetrics + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return errors.New("proto: LabelPair: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: LabelPair: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Name", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowMetrics + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthMetrics + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthMetrics + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + name = yoloString(dAtA[iNdEx:postIndex]) + if !model.LabelName(name).IsValid() { + return fmt.Errorf("invalid label name: %s", name) + } + iNdEx = postIndex + case 2: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Value", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowMetrics + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthMetrics + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthMetrics + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + value = yoloString(dAtA[iNdEx:postIndex]) + if !utf8.ValidString(value) { + return fmt.Errorf("invalid label value: %s", value) + } + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipMetrics(dAtA[iNdEx:]) + if err != nil { + return err + } + if (skippy < 0) || (iNdEx+skippy) < 0 { + return ErrInvalidLengthMetrics + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + if iNdEx > l { + return io.ErrUnexpectedEOF + } + b.Add(name, value) + return nil +} + +func yoloString(b []byte) string { + return unsafe.String(unsafe.SliceData(b), len(b)) +} + +type pos struct { + start, end int +} + +func (m *Metric) unmarshalWithoutLabels(p *MetricStreamingDecoder, dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowMetrics + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return errors.New("proto: Metric: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: Metric: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Label", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowMetrics + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthMetrics + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthMetrics + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + p.labels = append(p.labels, pos{start: iNdEx, end: postIndex}) + iNdEx = postIndex + case 2: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Gauge", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowMetrics + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthMetrics + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthMetrics + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + if m.Gauge == nil { + m.Gauge = &Gauge{} + } + if err := m.Gauge.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + case 3: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Counter", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowMetrics + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthMetrics + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthMetrics + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + if m.Counter == nil { + m.Counter = &Counter{} + } + if err := m.Counter.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + case 4: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Summary", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowMetrics + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthMetrics + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthMetrics + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + if m.Summary == nil { + m.Summary = &Summary{} + } + if err := m.Summary.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + case 5: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Untyped", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowMetrics + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthMetrics + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthMetrics + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + if m.Untyped == nil { + m.Untyped = &Untyped{} + } + if err := m.Untyped.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + case 6: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field TimestampMs", wireType) + } + m.TimestampMs = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowMetrics + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.TimestampMs |= int64(b&0x7F) << shift + if b < 0x80 { + break + } + } + case 7: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Histogram", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowMetrics + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthMetrics + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthMetrics + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + if m.Histogram == nil { + m.Histogram = &Histogram{} + } + if err := m.Histogram.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipMetrics(dAtA[iNdEx:]) + if err != nil { + return err + } + if (skippy < 0) || (iNdEx+skippy) < 0 { + return ErrInvalidLengthMetrics + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + m.XXX_unrecognized = append(m.XXX_unrecognized, dAtA[iNdEx:iNdEx+skippy]...) + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} + +func (m *MetricFamily) unmarshalWithoutMetrics(buf *MetricStreamingDecoder, dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowMetrics + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return errors.New("proto: MetricFamily: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: MetricFamily: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Name", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowMetrics + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthMetrics + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthMetrics + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Name = yoloString(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + case 2: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Help", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowMetrics + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthMetrics + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthMetrics + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Help = yoloString(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + case 3: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Type", wireType) + } + m.Type = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowMetrics + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.Type |= MetricType(b&0x7F) << shift + if b < 0x80 { + break + } + } + case 4: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Metric", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowMetrics + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthMetrics + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthMetrics + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + buf.metrics = append(buf.metrics, pos{start: iNdEx, end: postIndex}) + iNdEx = postIndex + case 5: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Unit", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowMetrics + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthMetrics + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthMetrics + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Unit = yoloString(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipMetrics(dAtA[iNdEx:]) + if err != nil { + return err + } + if (skippy < 0) || (iNdEx+skippy) < 0 { + return ErrInvalidLengthMetrics + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + m.XXX_unrecognized = append(m.XXX_unrecognized, dAtA[iNdEx:iNdEx+skippy]...) + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} diff --git a/prompb/io/prometheus/client/decoder_test.go b/prompb/io/prometheus/client/decoder_test.go new file mode 100644 index 0000000000..8697b78fca --- /dev/null +++ b/prompb/io/prometheus/client/decoder_test.go @@ -0,0 +1,171 @@ +// Copyright 2025 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package io_prometheus_client //nolint:revive + +import ( + "bytes" + "encoding/binary" + "errors" + "io" + "testing" + + "github.com/gogo/protobuf/proto" + "github.com/stretchr/testify/require" + + "github.com/prometheus/prometheus/model/labels" +) + +const ( + testGauge = `name: "go_build_info" +help: "Build information about the main Go module." +type: GAUGE +metric: < + label: < + name: "checksum" + value: "" + > + label: < + name: "path" + value: "github.com/prometheus/client_golang" + > + label: < + name: "version" + value: "(devel)" + > + gauge: < + value: 1 + > +> +metric: < + label: < + name: "checksum" + value: "" + > + label: < + name: "path" + value: "github.com/prometheus/prometheus" + > + label: < + name: "version" + value: "v3.0.0" + > + gauge: < + value: 2 + > +> + +` + testCounter = `name: "go_memstats_alloc_bytes_total" +help: "Total number of bytes allocated, even if freed." +type: COUNTER +unit: "bytes" +metric: < + counter: < + value: 1.546544e+06 + exemplar: < + label: < + name: "dummyID" + value: "42" + > + value: 12 + timestamp: < + seconds: 1625851151 + nanos: 233181499 + > + > + > +> + +` +) + +func TestMetricStreamingDecoder(t *testing.T) { + varintBuf := make([]byte, binary.MaxVarintLen32) + buf := bytes.Buffer{} + for _, m := range []string{testGauge, testCounter} { + mf := &MetricFamily{} + require.NoError(t, proto.UnmarshalText(m, mf)) + // From proto message to binary protobuf. + protoBuf, err := proto.Marshal(mf) + require.NoError(t, err) + + // Write first length, then binary protobuf. + varintLength := binary.PutUvarint(varintBuf, uint64(len(protoBuf))) + buf.Write(varintBuf[:varintLength]) + buf.Write(protoBuf) + } + + d := NewMetricStreamingDecoder(buf.Bytes()) + require.NoError(t, d.NextMetricFamily()) + nextFn := func() error { + for { + err := d.NextMetric() + if errors.Is(err, io.EOF) { + if err := d.NextMetricFamily(); err != nil { + return err + } + continue + } + return err + } + } + + var firstMetricLset labels.Labels + { + require.NoError(t, nextFn()) + + require.Equal(t, "go_build_info", d.GetName()) + require.Equal(t, "Build information about the main Go module.", d.GetHelp()) + require.Equal(t, MetricType_GAUGE, d.GetType()) + + require.Equal(t, float64(1), d.GetGauge().GetValue()) + b := labels.NewScratchBuilder(0) + require.NoError(t, d.Label(&b)) + + firstMetricLset = b.Labels() + + require.Equal(t, `{checksum="", path="github.com/prometheus/client_golang", version="(devel)"}`, firstMetricLset.String()) + } + + { + require.NoError(t, nextFn()) + + require.Equal(t, "go_build_info", d.GetName()) + require.Equal(t, "Build information about the main Go module.", d.GetHelp()) + require.Equal(t, MetricType_GAUGE, d.GetType()) + + require.Equal(t, float64(2), d.GetGauge().GetValue()) + b := labels.NewScratchBuilder(0) + require.NoError(t, d.Label(&b)) + require.Equal(t, `{checksum="", path="github.com/prometheus/prometheus", version="v3.0.0"}`, b.Labels().String()) + } + { + // Different mf now. + require.NoError(t, nextFn()) + + require.Equal(t, "go_memstats_alloc_bytes_total", d.GetName()) + require.Equal(t, "Total number of bytes allocated, even if freed.", d.GetHelp()) + require.Equal(t, "bytes", d.GetUnit()) + require.Equal(t, MetricType_COUNTER, d.GetType()) + + require.Equal(t, 1.546544e+06, d.Metric.GetCounter().GetValue()) + b := labels.NewScratchBuilder(0) + require.NoError(t, d.Label(&b)) + require.Equal(t, `{}`, b.Labels().String()) + } + require.Equal(t, io.EOF, nextFn()) + + // Expect labels and metricBytes to be static and reusable even after parsing. + require.Equal(t, `{checksum="", path="github.com/prometheus/client_golang", version="(devel)"}`, firstMetricLset.String()) +} diff --git a/scrape/scrape_test.go b/scrape/scrape_test.go index ddc9b2853f..b30dbfa1b9 100644 --- a/scrape/scrape_test.go +++ b/scrape/scrape_test.go @@ -1895,6 +1895,7 @@ func TestScrapeLoopAppend(t *testing.T) { } func requireEqual(t *testing.T, expected, actual interface{}, msgAndArgs ...interface{}) { + t.Helper() testutil.RequireEqualWithOptions(t, expected, actual, []cmp.Option{cmp.Comparer(equalFloatSamples), cmp.AllowUnexported(histogramSample{})}, msgAndArgs...)