From 5de2df752f39de05c6ab53a9ec93f797027da0b9 Mon Sep 17 00:00:00 2001 From: beorn7 Date: Tue, 29 Jun 2021 23:45:23 +0200 Subject: [PATCH] Hacky implementation of protobuf parsing This "brings back" protobuf parsing, with the only goal of playing with the new sparse histograms. The Prom-2.x style parser is highly adapted to the structure of the Prometheus text format (and later OpenMetrics). Some jumping through hoops is required to feed protobuf into it. This is not meant to be a model for the final implementation. It should just enable sparse histogram ingestion at a reasonable efficiency. The following are known shortcomings and flaws: - No tests yet. - Summaries and legacy histograms, i.e. those without sparse buckets, are ignored. - Staleness doesn't work (but this could be fixed in the appender, to be discussed). - No tricks have been tried that would be similar to the tricks the text parsers do (like direct pointers into the HTTP response body). That makes things weird here. Tricky optimizations only make sense once the final format is specified, which will almost certainly not be the old protobuf format. (Interestingly, I expect this implementation to be in fact much more efficient than the original protobuf ingestion in Prom-1.x.) - This is using a proto3 version of metrics.proto (mostly to be consistent with the other protobuf uses). However, proto3 sees no difference between an unset field and a field set to its default value. We would need that distinction to tell an unset timestamp apart from the timestamp 0 (1970-01-01, 00:00:00 UTC). In this experimental code, we just assume that a timestamp is never explicitly specified, and therefore a timestamp of 0 is always interpreted as "not set". 
Signed-off-by: beorn7 --- pkg/textparse/interface.go | 36 ++-- pkg/textparse/openmetricsparse.go | 7 + pkg/textparse/promparse.go | 7 + pkg/textparse/protobufparse.go | 311 ++++++++++++++++++++++++++++++ scrape/scrape.go | 40 ++-- scrape/scrape_test.go | 4 +- 6 files changed, 380 insertions(+), 25 deletions(-) create mode 100644 pkg/textparse/protobufparse.go diff --git a/pkg/textparse/interface.go b/pkg/textparse/interface.go index 557e56662..1dbcc51d8 100644 --- a/pkg/textparse/interface.go +++ b/pkg/textparse/interface.go @@ -17,16 +17,22 @@ import ( "mime" "github.com/prometheus/prometheus/pkg/exemplar" + "github.com/prometheus/prometheus/pkg/histogram" "github.com/prometheus/prometheus/pkg/labels" ) // Parser parses samples from a byte slice of samples in the official // Prometheus and OpenMetrics text exposition formats. type Parser interface { - // Series returns the bytes of the series, the timestamp if set, and the value - // of the current sample. + // Series returns the bytes of a series with a simple float64 as a + // value, the timestamp if set, and the value of the current sample. Series() ([]byte, *int64, float64) + // Histogram returns the bytes of a series with a sparse histogram as a + // value, the timestamp if set, and the sparse histogram in the current + // sample. + Histogram() ([]byte, *int64, histogram.SparseHistogram) + // Help returns the metric name and help text in the current entry. // Must only be called after Next returned a help entry. // The returned byte slices become invalid after the next call to Next. @@ -63,22 +69,30 @@ type Parser interface { // New returns a new parser of the byte slice. 
func New(b []byte, contentType string) Parser { mediaType, _, err := mime.ParseMediaType(contentType) - if err == nil && mediaType == "application/openmetrics-text" { - return NewOpenMetricsParser(b) + if err != nil { + return NewPromParser(b) + } + switch mediaType { + case "application/openmetrics-text": + return NewOpenMetricsParser(b) + case "application/vnd.google.protobuf": + return NewProtobufParser(b) + default: + return NewPromParser(b) } - return NewPromParser(b) } // Entry represents the type of a parsed entry. type Entry int const ( - EntryInvalid Entry = -1 - EntryType Entry = 0 - EntryHelp Entry = 1 - EntrySeries Entry = 2 - EntryComment Entry = 3 - EntryUnit Entry = 4 + EntryInvalid Entry = -1 + EntryType Entry = 0 + EntryHelp Entry = 1 + EntrySeries Entry = 2 // A series with a simple float64 as value. + EntryComment Entry = 3 + EntryUnit Entry = 4 + EntryHistogram Entry = 5 // A series with a sparse histogram as a value. ) // MetricType represents metric type values. diff --git a/pkg/textparse/openmetricsparse.go b/pkg/textparse/openmetricsparse.go index 6cfdd8391..b5925fa58 100644 --- a/pkg/textparse/openmetricsparse.go +++ b/pkg/textparse/openmetricsparse.go @@ -28,6 +28,7 @@ import ( "github.com/pkg/errors" "github.com/prometheus/prometheus/pkg/exemplar" + "github.com/prometheus/prometheus/pkg/histogram" "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/pkg/value" ) @@ -113,6 +114,12 @@ func (p *OpenMetricsParser) Series() ([]byte, *int64, float64) { return p.series, nil, p.val } +// Histogram always returns (nil, nil, SparseHistogram{}) because OpenMetrics +// does not support sparse histograms. +func (p *OpenMetricsParser) Histogram() ([]byte, *int64, histogram.SparseHistogram) { + return nil, nil, histogram.SparseHistogram{} +} + // Help returns the metric name and help text in the current entry. // Must only be called after Next returned a help entry. 
// The returned byte slices become invalid after the next call to Next. diff --git a/pkg/textparse/promparse.go b/pkg/textparse/promparse.go index 3c885af0b..407dcea6d 100644 --- a/pkg/textparse/promparse.go +++ b/pkg/textparse/promparse.go @@ -29,6 +29,7 @@ import ( "github.com/pkg/errors" "github.com/prometheus/prometheus/pkg/exemplar" + "github.com/prometheus/prometheus/pkg/histogram" "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/pkg/value" ) @@ -168,6 +169,12 @@ func (p *PromParser) Series() ([]byte, *int64, float64) { return p.series, nil, p.val } +// Histogram always returns (nil, nil, SparseHistogram{}) because the Prometheus +// text format does not support sparse histograms. +func (p *PromParser) Histogram() ([]byte, *int64, histogram.SparseHistogram) { + return nil, nil, histogram.SparseHistogram{} +} + // Help returns the metric name and help text in the current entry. // Must only be called after Next returned a help entry. // The returned byte slices become invalid after the next call to Next. diff --git a/pkg/textparse/protobufparse.go b/pkg/textparse/protobufparse.go new file mode 100644 index 000000000..eeb6e3c58 --- /dev/null +++ b/pkg/textparse/protobufparse.go @@ -0,0 +1,311 @@ +// Copyright 2021 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package textparse + +import ( + "bytes" + "encoding/binary" + "io" + "sort" + "unicode/utf8" + + "github.com/gogo/protobuf/proto" + "github.com/pkg/errors" + + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/pkg/exemplar" + "github.com/prometheus/prometheus/pkg/histogram" + "github.com/prometheus/prometheus/pkg/labels" + + dto "github.com/prometheus/prometheus/prompb/io/prometheus/client" +) + +// ProtobufParser is a very inefficient way of unmarshaling the old Prometheus +// protobuf format and then present it as it if were parsed by a +// Prometheus-2-style text parser. This is only done so that we can easily plug +// in the protobuf format into Prometheus 2. For future use (with the final +// format that will be used for sparse histograms), we have to revisit the +// parsing. A lot of the efficiency tricks of the Prometheus-2-style parsing +// could be used in a similar fashion (byte-slice pointers into the raw +// payload), which requires some hand-coded protobuf handling. But the current +// parsers all expect the full series name (metric name plus label pairs) as one +// string, which is not how things are represented in the protobuf format. If +// the re-arrangement work is actually causing problems (which has to be seen), +// that expectation needs to be changed. +// +// TODO(beorn7): The parser currently ignores summaries and legacy histograms +// (those without sparse buckets) to keep things simple. +type ProtobufParser struct { + in []byte // The intput to parse. + inPos int // Position within the input. + state Entry // State is marked by the entry we are + // processing. EntryInvalid implies that we have to + // decode the next MetricFamily. + metricPos int // Position within Metric slice. + mf *dto.MetricFamily + + // The following are just shenanigans to satisfy the Parser interface. + metricBytes *bytes.Buffer // A somewhat fluid representation of the current metric. 
+} + +func NewProtobufParser(b []byte) Parser { + return &ProtobufParser{ + in: b, + state: EntryInvalid, + mf: &dto.MetricFamily{}, + metricBytes: &bytes.Buffer{}, + } +} + +// Series returns the bytes of a series with a simple float64 as a +// value, the timestamp if set, and the value of the current sample. +func (p *ProtobufParser) Series() ([]byte, *int64, float64) { + var ( + m = p.mf.GetMetric()[p.metricPos] + ts = m.GetTimestampMs() + v float64 + ) + switch p.mf.GetType() { + case dto.MetricType_COUNTER: + v = m.GetCounter().Value + case dto.MetricType_GAUGE: + v = m.GetGauge().Value + case dto.MetricType_UNTYPED: + v = m.GetUntyped().Value + default: + panic("encountered unexpected metric type, this is a bug") + } + if ts != 0 { + return p.metricBytes.Bytes(), &ts, v + } + // Nasty hack: Assume that ts==0 means no timestamp. That's not true in + // general, but proto3 has no distinction between unset and + // default. Need to avoid in the final format. + return p.metricBytes.Bytes(), nil, v +} + +// Histogram returns the bytes of a series with a sparse histogram as a +// value, the timestamp if set, and the sparse histogram in the current +// sample. 
+func (p *ProtobufParser) Histogram() ([]byte, *int64, histogram.SparseHistogram) { + var ( + m = p.mf.GetMetric()[p.metricPos] + ts = m.GetTimestampMs() + h = m.GetHistogram() + ) + sh := histogram.SparseHistogram{ + Count: h.GetSampleCount(), + Sum: h.GetSampleSum(), + ZeroThreshold: h.GetSbZeroThreshold(), + ZeroCount: h.GetSbZeroCount(), + Schema: h.GetSbSchema(), + PositiveSpans: make([]histogram.Span, len(h.GetSbPositive().GetSpan())), + PositiveBuckets: h.GetSbPositive().GetDelta(), + NegativeSpans: make([]histogram.Span, len(h.GetSbNegative().GetSpan())), + NegativeBuckets: h.GetSbNegative().GetDelta(), + } + for i, span := range h.GetSbPositive().GetSpan() { + sh.PositiveSpans[i].Offset = span.GetOffset() + sh.PositiveSpans[i].Length = span.GetLength() + } + for i, span := range h.GetSbNegative().GetSpan() { + sh.NegativeSpans[i].Offset = span.GetOffset() + sh.NegativeSpans[i].Length = span.GetLength() + } + if ts != 0 { + return p.metricBytes.Bytes(), &ts, sh + } + // Nasty hack: Assume that ts==0 means no timestamp. That's not true in + // general, but proto3 has no distinction between unset and + // default. Need to avoid in the final format. + return p.metricBytes.Bytes(), nil, sh +} + +// Help returns the metric name and help text in the current entry. +// Must only be called after Next returned a help entry. +// The returned byte slices become invalid after the next call to Next. +func (p *ProtobufParser) Help() ([]byte, []byte) { + return p.metricBytes.Bytes(), []byte(p.mf.GetHelp()) +} + +// Type returns the metric name and type in the current entry. +// Must only be called after Next returned a type entry. +// The returned byte slices become invalid after the next call to Next. 
+func (p *ProtobufParser) Type() ([]byte, MetricType) { + n := p.metricBytes.Bytes() + switch p.mf.GetType() { + case dto.MetricType_COUNTER: + return n, MetricTypeCounter + case dto.MetricType_GAUGE: + return n, MetricTypeGauge + case dto.MetricType_HISTOGRAM: + return n, MetricTypeGaugeHistogram + } + return n, MetricTypeUnknown +} + +// Unit always returns (nil, nil) because units aren't supported by the protobuf +// format. +func (p *ProtobufParser) Unit() ([]byte, []byte) { + return nil, nil +} + +// Comment always returns nil because comments aren't supported by the protobuf +// format. +func (p *ProtobufParser) Comment() []byte { + return nil +} + +// Metric writes the labels of the current sample into the passed labels. +// It returns the string from which the metric was parsed. +func (p *ProtobufParser) Metric(l *labels.Labels) string { + *l = append(*l, labels.Label{ + Name: labels.MetricName, + Value: p.mf.GetName(), + }) + + for _, lp := range p.mf.GetMetric()[p.metricPos].GetLabel() { + *l = append(*l, labels.Label{ + Name: lp.GetName(), + Value: lp.GetValue(), + }) + } + + // Sort labels to maintain the sorted labels invariant. + sort.Sort(*l) + + return p.metricBytes.String() +} + +// Exemplar always returns false because exemplars aren't supported yet by the +// protobuf format. +func (p *ProtobufParser) Exemplar(l *exemplar.Exemplar) bool { + return false +} + +// Next advances the parser to the next "sample" (emulating the behavior of a +// text format parser). It returns (EntryInvalid, io.EOF) if no samples were +// read. +func (p *ProtobufParser) Next() (Entry, error) { + switch p.state { + case EntryInvalid: + p.metricPos = 0 + n, err := readDelimited(p.in[p.inPos:], p.mf) + p.inPos += n + if err != nil { + return p.state, err + } + + // Skip empty metric families. While checking for emptiness, ignore + // summaries and legacy histograms for now. 
+ metricFound := false + metricType := p.mf.GetType() + for _, m := range p.mf.GetMetric() { + if metricType == dto.MetricType_COUNTER || + metricType == dto.MetricType_GAUGE || + metricType == dto.MetricType_UNTYPED || + (metricType == dto.MetricType_HISTOGRAM && + // A histogram with a non-zero SbZerothreshold + // is a sparse histogram. + m.GetHistogram().GetSbZeroThreshold() != 0) { + metricFound = true + break + } + } + if !metricFound { + return p.Next() + } + + // We are at the beginning of a metric family. Put only the name + // into metricBytes and validate only name and help for now. + name := p.mf.GetName() + if !model.IsValidMetricName(model.LabelValue(name)) { + return EntryInvalid, errors.Errorf("invalid metric name: %s", name) + } + if help := p.mf.GetHelp(); !utf8.ValidString(help) { + return EntryInvalid, errors.Errorf("invalid help for metric %q: %s", name, help) + } + p.metricBytes.Reset() + p.metricBytes.WriteString(name) + + p.state = EntryHelp + case EntryHelp: + p.state = EntryType + case EntryType: + if p.mf.GetType() == dto.MetricType_HISTOGRAM { + p.state = EntryHistogram + } else { + p.state = EntrySeries + } + if err := p.updateMetricBytes(); err != nil { + return EntryInvalid, err + } + case EntryHistogram, EntrySeries: + p.metricPos++ + if p.metricPos >= len(p.mf.GetMetric()) { + p.state = EntryInvalid + return p.Next() + } + if err := p.updateMetricBytes(); err != nil { + return EntryInvalid, err + } + default: + return EntryInvalid, errors.Errorf("invalid protobuf parsing state: %d", p.state) + } + return p.state, nil +} + +func (p *ProtobufParser) updateMetricBytes() error { + b := p.metricBytes + b.Reset() + b.WriteString(p.mf.GetName()) + for _, lp := range p.mf.GetMetric()[p.metricPos].GetLabel() { + b.WriteByte(model.SeparatorByte) + n := lp.GetName() + if !model.LabelName(n).IsValid() { + return errors.Errorf("invalid label name: %s", n) + } + b.WriteString(n) + b.WriteByte(model.SeparatorByte) + v := lp.GetValue() + if 
!utf8.ValidString(v) { + return errors.Errorf("invalid label value: %s", v) + } + b.WriteString(v) + } + return nil +} + +var errInvalidVarint = errors.New("protobufparse: invalid varint encountered") + +// readDelimited is essentially doing what the function of the same name in +// github.com/matttproud/golang_protobuf_extensions/pbutil is doing, but it is +// specific to a MetricFamily, utilizes the more efficient gogo-protobuf +// unmarshaling, and acts on a byte slice directly without any additional +// staging buffers. +func readDelimited(b []byte, mf *dto.MetricFamily) (n int, err error) { + if len(b) == 0 { + return 0, io.EOF + } + messageLength, varIntLength := proto.DecodeVarint(b) + if varIntLength == 0 || varIntLength > binary.MaxVarintLen32 { + return 0, errInvalidVarint + } + totalLength := varIntLength + int(messageLength) + if totalLength > len(b) { + return 0, errors.Errorf("protobufparse: insufficient length of buffer, expected at least %d bytes, got %d bytes", totalLength, len(b)) + } + mf.Reset() + return totalLength, mf.Unmarshal(b[varIntLength:totalLength]) +} diff --git a/scrape/scrape.go b/scrape/scrape.go index d9ed8a02b..109d89dc3 100644 --- a/scrape/scrape.go +++ b/scrape/scrape.go @@ -40,6 +40,7 @@ import ( "github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/discovery/targetgroup" "github.com/prometheus/prometheus/pkg/exemplar" + "github.com/prometheus/prometheus/pkg/histogram" "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/pkg/pool" "github.com/prometheus/prometheus/pkg/relabel" @@ -706,7 +707,7 @@ type targetScraper struct { var errBodySizeLimit = errors.New("body size limit exceeded") -const acceptHeader = `application/openmetrics-text; version=0.0.1,text/plain;version=0.0.4;q=0.5,*/*;q=0.1` +const acceptHeader = `application/vnd.google.protobuf; proto=io.prometheus.client.MetricFamily; encoding=delimited,application/openmetrics-text; 
version=0.0.1;q=0.5,text/plain;version=0.0.4;q=0.2,*/*;q=0.1` var userAgentHeader = fmt.Sprintf("Prometheus/%s", version.Version) @@ -1378,9 +1379,13 @@ func (sl *scrapeLoop) append(app storage.Appender, b []byte, contentType string, loop: for { var ( - et textparse.Entry - sampleAdded bool - e exemplar.Exemplar + et textparse.Entry + sampleAdded, isHistogram bool + met []byte + parsedTimestamp *int64 + val float64 + his histogram.SparseHistogram + e exemplar.Exemplar ) if et, err = p.Next(); err != nil { if err == io.EOF { @@ -1400,17 +1405,23 @@ loop: continue case textparse.EntryComment: continue + case textparse.EntryHistogram: + isHistogram = true default: } total++ t := defTime - met, tp, v := p.Series() - if !sl.honorTimestamps { - tp = nil + if isHistogram { + met, parsedTimestamp, his = p.Histogram() + } else { + met, parsedTimestamp, val = p.Series() } - if tp != nil { - t = *tp + if !sl.honorTimestamps { + parsedTimestamp = nil + } + if parsedTimestamp != nil { + t = *parsedTimestamp } if sl.cache.getDropped(yoloString(met)) { @@ -1453,8 +1464,12 @@ loop: } } - ref, err = app.Append(ref, lset, t, v) - sampleAdded, err = sl.checkAddError(ce, met, tp, err, &sampleLimitErr, &appErrs) + if isHistogram { + ref, err = app.AppendHistogram(ref, lset, t, his) + } else { + ref, err = app.Append(ref, lset, t, val) + } + sampleAdded, err = sl.checkAddError(ce, met, parsedTimestamp, err, &sampleLimitErr, &appErrs) if err != nil { if err != storage.ErrNotFound { level.Debug(sl.l).Log("msg", "Unexpected error", "series", string(met), "err", err) @@ -1463,7 +1478,7 @@ loop: } if !ok { - if tp == nil { + if parsedTimestamp == nil { // Bypass staleness logic if there is an explicit timestamp. sl.cache.trackStaleness(hash, lset) } @@ -1512,6 +1527,7 @@ loop: if err == nil { sl.cache.forEachStale(func(lset labels.Labels) bool { // Series no longer exposed, mark it stale. + // TODO(beorn7): Appending staleness markers breaks horribly for histograms. 
_, err = app.Append(0, lset, defTime, math.Float64frombits(value.StaleNaN)) switch errors.Cause(err) { case storage.ErrOutOfOrderSample, storage.ErrDuplicateSampleForTimestamp: diff --git a/scrape/scrape_test.go b/scrape/scrape_test.go index 56109d4e2..d87ed1fae 100644 --- a/scrape/scrape_test.go +++ b/scrape/scrape_test.go @@ -1831,8 +1831,8 @@ func TestTargetScraperScrapeOK(t *testing.T) { server := httptest.NewServer( http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { accept := r.Header.Get("Accept") - if !strings.HasPrefix(accept, "application/openmetrics-text;") { - t.Errorf("Expected Accept header to prefer application/openmetrics-text, got %q", accept) + if !strings.HasPrefix(accept, "application/vnd.google.protobuf;") { + t.Errorf("Expected Accept header to prefer application/vnd.google.protobuf, got %q", accept) } timeout := r.Header.Get("X-Prometheus-Scrape-Timeout-Seconds")