mirror of
https://github.com/prometheus/prometheus.git
synced 2025-01-01 08:57:26 -08:00
Merge pull request #9027 from prometheus/beorn7/histogram2
Hacky implementation of protobuf parsing
This commit is contained in:
commit
dbab4957f7
|
@ -17,16 +17,22 @@ import (
|
||||||
"mime"
|
"mime"
|
||||||
|
|
||||||
"github.com/prometheus/prometheus/pkg/exemplar"
|
"github.com/prometheus/prometheus/pkg/exemplar"
|
||||||
|
"github.com/prometheus/prometheus/pkg/histogram"
|
||||||
"github.com/prometheus/prometheus/pkg/labels"
|
"github.com/prometheus/prometheus/pkg/labels"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Parser parses samples from a byte slice of samples in the official
|
// Parser parses samples from a byte slice of samples in the official
|
||||||
// Prometheus and OpenMetrics text exposition formats.
|
// Prometheus and OpenMetrics text exposition formats.
|
||||||
type Parser interface {
|
type Parser interface {
|
||||||
// Series returns the bytes of the series, the timestamp if set, and the value
|
// Series returns the bytes of a series with a simple float64 as a
|
||||||
// of the current sample.
|
// value, the timestamp if set, and the value of the current sample.
|
||||||
Series() ([]byte, *int64, float64)
|
Series() ([]byte, *int64, float64)
|
||||||
|
|
||||||
|
// Histogram returns the bytes of a series with a sparse histogram as a
|
||||||
|
// value, the timestamp if set, and the sparse histogram in the current
|
||||||
|
// sample.
|
||||||
|
Histogram() ([]byte, *int64, histogram.SparseHistogram)
|
||||||
|
|
||||||
// Help returns the metric name and help text in the current entry.
|
// Help returns the metric name and help text in the current entry.
|
||||||
// Must only be called after Next returned a help entry.
|
// Must only be called after Next returned a help entry.
|
||||||
// The returned byte slices become invalid after the next call to Next.
|
// The returned byte slices become invalid after the next call to Next.
|
||||||
|
@ -63,22 +69,30 @@ type Parser interface {
|
||||||
// New returns a new parser of the byte slice.
|
// New returns a new parser of the byte slice.
|
||||||
func New(b []byte, contentType string) Parser {
|
func New(b []byte, contentType string) Parser {
|
||||||
mediaType, _, err := mime.ParseMediaType(contentType)
|
mediaType, _, err := mime.ParseMediaType(contentType)
|
||||||
if err == nil && mediaType == "application/openmetrics-text" {
|
if err != nil {
|
||||||
return NewOpenMetricsParser(b)
|
return NewPromParser(b)
|
||||||
|
}
|
||||||
|
switch mediaType {
|
||||||
|
case "application/openmetrics-text":
|
||||||
|
return NewOpenMetricsParser(b)
|
||||||
|
case "application/vnd.google.protobuf":
|
||||||
|
return NewProtobufParser(b)
|
||||||
|
default:
|
||||||
|
return NewPromParser(b)
|
||||||
}
|
}
|
||||||
return NewPromParser(b)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Entry represents the type of a parsed entry.
|
// Entry represents the type of a parsed entry.
|
||||||
type Entry int
|
type Entry int
|
||||||
|
|
||||||
const (
|
const (
|
||||||
EntryInvalid Entry = -1
|
EntryInvalid Entry = -1
|
||||||
EntryType Entry = 0
|
EntryType Entry = 0
|
||||||
EntryHelp Entry = 1
|
EntryHelp Entry = 1
|
||||||
EntrySeries Entry = 2
|
EntrySeries Entry = 2 // A series with a simple float64 as value.
|
||||||
EntryComment Entry = 3
|
EntryComment Entry = 3
|
||||||
EntryUnit Entry = 4
|
EntryUnit Entry = 4
|
||||||
|
EntryHistogram Entry = 5 // A series with a sparse histogram as a value.
|
||||||
)
|
)
|
||||||
|
|
||||||
// MetricType represents metric type values.
|
// MetricType represents metric type values.
|
||||||
|
|
|
@ -28,6 +28,7 @@ import (
|
||||||
"github.com/pkg/errors"
|
"github.com/pkg/errors"
|
||||||
|
|
||||||
"github.com/prometheus/prometheus/pkg/exemplar"
|
"github.com/prometheus/prometheus/pkg/exemplar"
|
||||||
|
"github.com/prometheus/prometheus/pkg/histogram"
|
||||||
"github.com/prometheus/prometheus/pkg/labels"
|
"github.com/prometheus/prometheus/pkg/labels"
|
||||||
"github.com/prometheus/prometheus/pkg/value"
|
"github.com/prometheus/prometheus/pkg/value"
|
||||||
)
|
)
|
||||||
|
@ -113,6 +114,12 @@ func (p *OpenMetricsParser) Series() ([]byte, *int64, float64) {
|
||||||
return p.series, nil, p.val
|
return p.series, nil, p.val
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Histogram always returns (nil, nil, SparseHistogram{}) because OpenMetrics
|
||||||
|
// does not support sparse histograms.
|
||||||
|
func (p *OpenMetricsParser) Histogram() ([]byte, *int64, histogram.SparseHistogram) {
|
||||||
|
return nil, nil, histogram.SparseHistogram{}
|
||||||
|
}
|
||||||
|
|
||||||
// Help returns the metric name and help text in the current entry.
|
// Help returns the metric name and help text in the current entry.
|
||||||
// Must only be called after Next returned a help entry.
|
// Must only be called after Next returned a help entry.
|
||||||
// The returned byte slices become invalid after the next call to Next.
|
// The returned byte slices become invalid after the next call to Next.
|
||||||
|
|
|
@ -29,6 +29,7 @@ import (
|
||||||
"github.com/pkg/errors"
|
"github.com/pkg/errors"
|
||||||
|
|
||||||
"github.com/prometheus/prometheus/pkg/exemplar"
|
"github.com/prometheus/prometheus/pkg/exemplar"
|
||||||
|
"github.com/prometheus/prometheus/pkg/histogram"
|
||||||
"github.com/prometheus/prometheus/pkg/labels"
|
"github.com/prometheus/prometheus/pkg/labels"
|
||||||
"github.com/prometheus/prometheus/pkg/value"
|
"github.com/prometheus/prometheus/pkg/value"
|
||||||
)
|
)
|
||||||
|
@ -168,6 +169,12 @@ func (p *PromParser) Series() ([]byte, *int64, float64) {
|
||||||
return p.series, nil, p.val
|
return p.series, nil, p.val
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Histogram always returns (nil, nil, SparseHistogram{}) because the Prometheus
|
||||||
|
// text format does not support sparse histograms.
|
||||||
|
func (p *PromParser) Histogram() ([]byte, *int64, histogram.SparseHistogram) {
|
||||||
|
return nil, nil, histogram.SparseHistogram{}
|
||||||
|
}
|
||||||
|
|
||||||
// Help returns the metric name and help text in the current entry.
|
// Help returns the metric name and help text in the current entry.
|
||||||
// Must only be called after Next returned a help entry.
|
// Must only be called after Next returned a help entry.
|
||||||
// The returned byte slices become invalid after the next call to Next.
|
// The returned byte slices become invalid after the next call to Next.
|
||||||
|
|
311
pkg/textparse/protobufparse.go
Normal file
311
pkg/textparse/protobufparse.go
Normal file
|
@ -0,0 +1,311 @@
|
||||||
|
// Copyright 2021 The Prometheus Authors
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
package textparse
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"encoding/binary"
|
||||||
|
"io"
|
||||||
|
"sort"
|
||||||
|
"unicode/utf8"
|
||||||
|
|
||||||
|
"github.com/gogo/protobuf/proto"
|
||||||
|
"github.com/pkg/errors"
|
||||||
|
|
||||||
|
"github.com/prometheus/common/model"
|
||||||
|
"github.com/prometheus/prometheus/pkg/exemplar"
|
||||||
|
"github.com/prometheus/prometheus/pkg/histogram"
|
||||||
|
"github.com/prometheus/prometheus/pkg/labels"
|
||||||
|
|
||||||
|
dto "github.com/prometheus/prometheus/prompb/io/prometheus/client"
|
||||||
|
)
|
||||||
|
|
||||||
|
// ProtobufParser is a very inefficient way of unmarshaling the old Prometheus
|
||||||
|
// protobuf format and then present it as it if were parsed by a
|
||||||
|
// Prometheus-2-style text parser. This is only done so that we can easily plug
|
||||||
|
// in the protobuf format into Prometheus 2. For future use (with the final
|
||||||
|
// format that will be used for sparse histograms), we have to revisit the
|
||||||
|
// parsing. A lot of the efficiency tricks of the Prometheus-2-style parsing
|
||||||
|
// could be used in a similar fashion (byte-slice pointers into the raw
|
||||||
|
// payload), which requires some hand-coded protobuf handling. But the current
|
||||||
|
// parsers all expect the full series name (metric name plus label pairs) as one
|
||||||
|
// string, which is not how things are represented in the protobuf format. If
|
||||||
|
// the re-arrangement work is actually causing problems (which has to be seen),
|
||||||
|
// that expectation needs to be changed.
|
||||||
|
//
|
||||||
|
// TODO(beorn7): The parser currently ignores summaries and legacy histograms
|
||||||
|
// (those without sparse buckets) to keep things simple.
|
||||||
|
type ProtobufParser struct {
|
||||||
|
in []byte // The intput to parse.
|
||||||
|
inPos int // Position within the input.
|
||||||
|
state Entry // State is marked by the entry we are
|
||||||
|
// processing. EntryInvalid implies that we have to
|
||||||
|
// decode the next MetricFamily.
|
||||||
|
metricPos int // Position within Metric slice.
|
||||||
|
mf *dto.MetricFamily
|
||||||
|
|
||||||
|
// The following are just shenanigans to satisfy the Parser interface.
|
||||||
|
metricBytes *bytes.Buffer // A somewhat fluid representation of the current metric.
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewProtobufParser(b []byte) Parser {
|
||||||
|
return &ProtobufParser{
|
||||||
|
in: b,
|
||||||
|
state: EntryInvalid,
|
||||||
|
mf: &dto.MetricFamily{},
|
||||||
|
metricBytes: &bytes.Buffer{},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Series returns the bytes of a series with a simple float64 as a
|
||||||
|
// value, the timestamp if set, and the value of the current sample.
|
||||||
|
func (p *ProtobufParser) Series() ([]byte, *int64, float64) {
|
||||||
|
var (
|
||||||
|
m = p.mf.GetMetric()[p.metricPos]
|
||||||
|
ts = m.GetTimestampMs()
|
||||||
|
v float64
|
||||||
|
)
|
||||||
|
switch p.mf.GetType() {
|
||||||
|
case dto.MetricType_COUNTER:
|
||||||
|
v = m.GetCounter().Value
|
||||||
|
case dto.MetricType_GAUGE:
|
||||||
|
v = m.GetGauge().Value
|
||||||
|
case dto.MetricType_UNTYPED:
|
||||||
|
v = m.GetUntyped().Value
|
||||||
|
default:
|
||||||
|
panic("encountered unexpected metric type, this is a bug")
|
||||||
|
}
|
||||||
|
if ts != 0 {
|
||||||
|
return p.metricBytes.Bytes(), &ts, v
|
||||||
|
}
|
||||||
|
// Nasty hack: Assume that ts==0 means no timestamp. That's not true in
|
||||||
|
// general, but proto3 has no distinction between unset and
|
||||||
|
// default. Need to avoid in the final format.
|
||||||
|
return p.metricBytes.Bytes(), nil, v
|
||||||
|
}
|
||||||
|
|
||||||
|
// Histogram returns the bytes of a series with a sparse histogram as a
|
||||||
|
// value, the timestamp if set, and the sparse histogram in the current
|
||||||
|
// sample.
|
||||||
|
func (p *ProtobufParser) Histogram() ([]byte, *int64, histogram.SparseHistogram) {
|
||||||
|
var (
|
||||||
|
m = p.mf.GetMetric()[p.metricPos]
|
||||||
|
ts = m.GetTimestampMs()
|
||||||
|
h = m.GetHistogram()
|
||||||
|
)
|
||||||
|
sh := histogram.SparseHistogram{
|
||||||
|
Count: h.GetSampleCount(),
|
||||||
|
Sum: h.GetSampleSum(),
|
||||||
|
ZeroThreshold: h.GetSbZeroThreshold(),
|
||||||
|
ZeroCount: h.GetSbZeroCount(),
|
||||||
|
Schema: h.GetSbSchema(),
|
||||||
|
PositiveSpans: make([]histogram.Span, len(h.GetSbPositive().GetSpan())),
|
||||||
|
PositiveBuckets: h.GetSbPositive().GetDelta(),
|
||||||
|
NegativeSpans: make([]histogram.Span, len(h.GetSbNegative().GetSpan())),
|
||||||
|
NegativeBuckets: h.GetSbNegative().GetDelta(),
|
||||||
|
}
|
||||||
|
for i, span := range h.GetSbPositive().GetSpan() {
|
||||||
|
sh.PositiveSpans[i].Offset = span.GetOffset()
|
||||||
|
sh.PositiveSpans[i].Length = span.GetLength()
|
||||||
|
}
|
||||||
|
for i, span := range h.GetSbNegative().GetSpan() {
|
||||||
|
sh.NegativeSpans[i].Offset = span.GetOffset()
|
||||||
|
sh.NegativeSpans[i].Length = span.GetLength()
|
||||||
|
}
|
||||||
|
if ts != 0 {
|
||||||
|
return p.metricBytes.Bytes(), &ts, sh
|
||||||
|
}
|
||||||
|
// Nasty hack: Assume that ts==0 means no timestamp. That's not true in
|
||||||
|
// general, but proto3 has no distinction between unset and
|
||||||
|
// default. Need to avoid in the final format.
|
||||||
|
return p.metricBytes.Bytes(), nil, sh
|
||||||
|
}
|
||||||
|
|
||||||
|
// Help returns the metric name and help text in the current entry.
|
||||||
|
// Must only be called after Next returned a help entry.
|
||||||
|
// The returned byte slices become invalid after the next call to Next.
|
||||||
|
func (p *ProtobufParser) Help() ([]byte, []byte) {
|
||||||
|
return p.metricBytes.Bytes(), []byte(p.mf.GetHelp())
|
||||||
|
}
|
||||||
|
|
||||||
|
// Type returns the metric name and type in the current entry.
|
||||||
|
// Must only be called after Next returned a type entry.
|
||||||
|
// The returned byte slices become invalid after the next call to Next.
|
||||||
|
func (p *ProtobufParser) Type() ([]byte, MetricType) {
|
||||||
|
n := p.metricBytes.Bytes()
|
||||||
|
switch p.mf.GetType() {
|
||||||
|
case dto.MetricType_COUNTER:
|
||||||
|
return n, MetricTypeCounter
|
||||||
|
case dto.MetricType_GAUGE:
|
||||||
|
return n, MetricTypeGauge
|
||||||
|
case dto.MetricType_HISTOGRAM:
|
||||||
|
return n, MetricTypeGaugeHistogram
|
||||||
|
}
|
||||||
|
return n, MetricTypeUnknown
|
||||||
|
}
|
||||||
|
|
||||||
|
// Unit always returns (nil, nil) because units aren't supported by the protobuf
|
||||||
|
// format.
|
||||||
|
func (p *ProtobufParser) Unit() ([]byte, []byte) {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Comment always returns nil because comments aren't supported by the protobuf
|
||||||
|
// format.
|
||||||
|
func (p *ProtobufParser) Comment() []byte {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Metric writes the labels of the current sample into the passed labels.
|
||||||
|
// It returns the string from which the metric was parsed.
|
||||||
|
func (p *ProtobufParser) Metric(l *labels.Labels) string {
|
||||||
|
*l = append(*l, labels.Label{
|
||||||
|
Name: labels.MetricName,
|
||||||
|
Value: p.mf.GetName(),
|
||||||
|
})
|
||||||
|
|
||||||
|
for _, lp := range p.mf.GetMetric()[p.metricPos].GetLabel() {
|
||||||
|
*l = append(*l, labels.Label{
|
||||||
|
Name: lp.GetName(),
|
||||||
|
Value: lp.GetValue(),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
// Sort labels to maintain the sorted labels invariant.
|
||||||
|
sort.Sort(*l)
|
||||||
|
|
||||||
|
return p.metricBytes.String()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Exemplar always returns false because exemplars aren't supported yet by the
|
||||||
|
// protobuf format.
|
||||||
|
func (p *ProtobufParser) Exemplar(l *exemplar.Exemplar) bool {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
// Next advances the parser to the next "sample" (emulating the behavior of a
|
||||||
|
// text format parser). It returns (EntryInvalid, io.EOF) if no samples were
|
||||||
|
// read.
|
||||||
|
func (p *ProtobufParser) Next() (Entry, error) {
|
||||||
|
switch p.state {
|
||||||
|
case EntryInvalid:
|
||||||
|
p.metricPos = 0
|
||||||
|
n, err := readDelimited(p.in[p.inPos:], p.mf)
|
||||||
|
p.inPos += n
|
||||||
|
if err != nil {
|
||||||
|
return p.state, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Skip empty metric families. While checking for emptiness, ignore
|
||||||
|
// summaries and legacy histograms for now.
|
||||||
|
metricFound := false
|
||||||
|
metricType := p.mf.GetType()
|
||||||
|
for _, m := range p.mf.GetMetric() {
|
||||||
|
if metricType == dto.MetricType_COUNTER ||
|
||||||
|
metricType == dto.MetricType_GAUGE ||
|
||||||
|
metricType == dto.MetricType_UNTYPED ||
|
||||||
|
(metricType == dto.MetricType_HISTOGRAM &&
|
||||||
|
// A histogram with a non-zero SbZerothreshold
|
||||||
|
// is a sparse histogram.
|
||||||
|
m.GetHistogram().GetSbZeroThreshold() != 0) {
|
||||||
|
metricFound = true
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if !metricFound {
|
||||||
|
return p.Next()
|
||||||
|
}
|
||||||
|
|
||||||
|
// We are at the beginning of a metric family. Put only the name
|
||||||
|
// into metricBytes and validate only name and help for now.
|
||||||
|
name := p.mf.GetName()
|
||||||
|
if !model.IsValidMetricName(model.LabelValue(name)) {
|
||||||
|
return EntryInvalid, errors.Errorf("invalid metric name: %s", name)
|
||||||
|
}
|
||||||
|
if help := p.mf.GetHelp(); !utf8.ValidString(help) {
|
||||||
|
return EntryInvalid, errors.Errorf("invalid help for metric %q: %s", name, help)
|
||||||
|
}
|
||||||
|
p.metricBytes.Reset()
|
||||||
|
p.metricBytes.WriteString(name)
|
||||||
|
|
||||||
|
p.state = EntryHelp
|
||||||
|
case EntryHelp:
|
||||||
|
p.state = EntryType
|
||||||
|
case EntryType:
|
||||||
|
if p.mf.GetType() == dto.MetricType_HISTOGRAM {
|
||||||
|
p.state = EntryHistogram
|
||||||
|
} else {
|
||||||
|
p.state = EntrySeries
|
||||||
|
}
|
||||||
|
if err := p.updateMetricBytes(); err != nil {
|
||||||
|
return EntryInvalid, err
|
||||||
|
}
|
||||||
|
case EntryHistogram, EntrySeries:
|
||||||
|
p.metricPos++
|
||||||
|
if p.metricPos >= len(p.mf.GetMetric()) {
|
||||||
|
p.state = EntryInvalid
|
||||||
|
return p.Next()
|
||||||
|
}
|
||||||
|
if err := p.updateMetricBytes(); err != nil {
|
||||||
|
return EntryInvalid, err
|
||||||
|
}
|
||||||
|
default:
|
||||||
|
return EntryInvalid, errors.Errorf("invalid protobuf parsing state: %d", p.state)
|
||||||
|
}
|
||||||
|
return p.state, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *ProtobufParser) updateMetricBytes() error {
|
||||||
|
b := p.metricBytes
|
||||||
|
b.Reset()
|
||||||
|
b.WriteString(p.mf.GetName())
|
||||||
|
for _, lp := range p.mf.GetMetric()[p.metricPos].GetLabel() {
|
||||||
|
b.WriteByte(model.SeparatorByte)
|
||||||
|
n := lp.GetName()
|
||||||
|
if !model.LabelName(n).IsValid() {
|
||||||
|
return errors.Errorf("invalid label name: %s", n)
|
||||||
|
}
|
||||||
|
b.WriteString(n)
|
||||||
|
b.WriteByte(model.SeparatorByte)
|
||||||
|
v := lp.GetValue()
|
||||||
|
if !utf8.ValidString(v) {
|
||||||
|
return errors.Errorf("invalid label value: %s", v)
|
||||||
|
}
|
||||||
|
b.WriteString(v)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
var errInvalidVarint = errors.New("protobufparse: invalid varint encountered")
|
||||||
|
|
||||||
|
// readDelimited is essentially doing what the function of the same name in
|
||||||
|
// github.com/matttproud/golang_protobuf_extensions/pbutil is doing, but it is
|
||||||
|
// specific to a MetricFamily, utilizes the more efficient gogo-protobuf
|
||||||
|
// unmarshaling, and acts on a byte slice directly without any additional
|
||||||
|
// staging buffers.
|
||||||
|
func readDelimited(b []byte, mf *dto.MetricFamily) (n int, err error) {
|
||||||
|
if len(b) == 0 {
|
||||||
|
return 0, io.EOF
|
||||||
|
}
|
||||||
|
messageLength, varIntLength := proto.DecodeVarint(b)
|
||||||
|
if varIntLength == 0 || varIntLength > binary.MaxVarintLen32 {
|
||||||
|
return 0, errInvalidVarint
|
||||||
|
}
|
||||||
|
totalLength := varIntLength + int(messageLength)
|
||||||
|
if totalLength > len(b) {
|
||||||
|
return 0, errors.Errorf("protobufparse: insufficient length of buffer, expected at least %d bytes, got %d bytes", totalLength, len(b))
|
||||||
|
}
|
||||||
|
mf.Reset()
|
||||||
|
return totalLength, mf.Unmarshal(b[varIntLength:totalLength])
|
||||||
|
}
|
|
@ -40,6 +40,7 @@ import (
|
||||||
"github.com/prometheus/prometheus/config"
|
"github.com/prometheus/prometheus/config"
|
||||||
"github.com/prometheus/prometheus/discovery/targetgroup"
|
"github.com/prometheus/prometheus/discovery/targetgroup"
|
||||||
"github.com/prometheus/prometheus/pkg/exemplar"
|
"github.com/prometheus/prometheus/pkg/exemplar"
|
||||||
|
"github.com/prometheus/prometheus/pkg/histogram"
|
||||||
"github.com/prometheus/prometheus/pkg/labels"
|
"github.com/prometheus/prometheus/pkg/labels"
|
||||||
"github.com/prometheus/prometheus/pkg/pool"
|
"github.com/prometheus/prometheus/pkg/pool"
|
||||||
"github.com/prometheus/prometheus/pkg/relabel"
|
"github.com/prometheus/prometheus/pkg/relabel"
|
||||||
|
@ -706,7 +707,7 @@ type targetScraper struct {
|
||||||
|
|
||||||
var errBodySizeLimit = errors.New("body size limit exceeded")
|
var errBodySizeLimit = errors.New("body size limit exceeded")
|
||||||
|
|
||||||
const acceptHeader = `application/openmetrics-text; version=0.0.1,text/plain;version=0.0.4;q=0.5,*/*;q=0.1`
|
const acceptHeader = `application/vnd.google.protobuf; proto=io.prometheus.client.MetricFamily; encoding=delimited,application/openmetrics-text; version=0.0.1;q=0.5,text/plain;version=0.0.4;q=0.2,*/*;q=0.1`
|
||||||
|
|
||||||
var userAgentHeader = fmt.Sprintf("Prometheus/%s", version.Version)
|
var userAgentHeader = fmt.Sprintf("Prometheus/%s", version.Version)
|
||||||
|
|
||||||
|
@ -1378,9 +1379,13 @@ func (sl *scrapeLoop) append(app storage.Appender, b []byte, contentType string,
|
||||||
loop:
|
loop:
|
||||||
for {
|
for {
|
||||||
var (
|
var (
|
||||||
et textparse.Entry
|
et textparse.Entry
|
||||||
sampleAdded bool
|
sampleAdded, isHistogram bool
|
||||||
e exemplar.Exemplar
|
met []byte
|
||||||
|
parsedTimestamp *int64
|
||||||
|
val float64
|
||||||
|
his histogram.SparseHistogram
|
||||||
|
e exemplar.Exemplar
|
||||||
)
|
)
|
||||||
if et, err = p.Next(); err != nil {
|
if et, err = p.Next(); err != nil {
|
||||||
if err == io.EOF {
|
if err == io.EOF {
|
||||||
|
@ -1400,17 +1405,23 @@ loop:
|
||||||
continue
|
continue
|
||||||
case textparse.EntryComment:
|
case textparse.EntryComment:
|
||||||
continue
|
continue
|
||||||
|
case textparse.EntryHistogram:
|
||||||
|
isHistogram = true
|
||||||
default:
|
default:
|
||||||
}
|
}
|
||||||
total++
|
total++
|
||||||
|
|
||||||
t := defTime
|
t := defTime
|
||||||
met, tp, v := p.Series()
|
if isHistogram {
|
||||||
if !sl.honorTimestamps {
|
met, parsedTimestamp, his = p.Histogram()
|
||||||
tp = nil
|
} else {
|
||||||
|
met, parsedTimestamp, val = p.Series()
|
||||||
}
|
}
|
||||||
if tp != nil {
|
if !sl.honorTimestamps {
|
||||||
t = *tp
|
parsedTimestamp = nil
|
||||||
|
}
|
||||||
|
if parsedTimestamp != nil {
|
||||||
|
t = *parsedTimestamp
|
||||||
}
|
}
|
||||||
|
|
||||||
if sl.cache.getDropped(yoloString(met)) {
|
if sl.cache.getDropped(yoloString(met)) {
|
||||||
|
@ -1453,8 +1464,12 @@ loop:
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
ref, err = app.Append(ref, lset, t, v)
|
if isHistogram {
|
||||||
sampleAdded, err = sl.checkAddError(ce, met, tp, err, &sampleLimitErr, &appErrs)
|
ref, err = app.AppendHistogram(ref, lset, t, his)
|
||||||
|
} else {
|
||||||
|
ref, err = app.Append(ref, lset, t, val)
|
||||||
|
}
|
||||||
|
sampleAdded, err = sl.checkAddError(ce, met, parsedTimestamp, err, &sampleLimitErr, &appErrs)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if err != storage.ErrNotFound {
|
if err != storage.ErrNotFound {
|
||||||
level.Debug(sl.l).Log("msg", "Unexpected error", "series", string(met), "err", err)
|
level.Debug(sl.l).Log("msg", "Unexpected error", "series", string(met), "err", err)
|
||||||
|
@ -1463,7 +1478,7 @@ loop:
|
||||||
}
|
}
|
||||||
|
|
||||||
if !ok {
|
if !ok {
|
||||||
if tp == nil {
|
if parsedTimestamp == nil {
|
||||||
// Bypass staleness logic if there is an explicit timestamp.
|
// Bypass staleness logic if there is an explicit timestamp.
|
||||||
sl.cache.trackStaleness(hash, lset)
|
sl.cache.trackStaleness(hash, lset)
|
||||||
}
|
}
|
||||||
|
@ -1512,6 +1527,7 @@ loop:
|
||||||
if err == nil {
|
if err == nil {
|
||||||
sl.cache.forEachStale(func(lset labels.Labels) bool {
|
sl.cache.forEachStale(func(lset labels.Labels) bool {
|
||||||
// Series no longer exposed, mark it stale.
|
// Series no longer exposed, mark it stale.
|
||||||
|
// TODO(beorn7): Appending staleness markers breaks horribly for histograms.
|
||||||
_, err = app.Append(0, lset, defTime, math.Float64frombits(value.StaleNaN))
|
_, err = app.Append(0, lset, defTime, math.Float64frombits(value.StaleNaN))
|
||||||
switch errors.Cause(err) {
|
switch errors.Cause(err) {
|
||||||
case storage.ErrOutOfOrderSample, storage.ErrDuplicateSampleForTimestamp:
|
case storage.ErrOutOfOrderSample, storage.ErrDuplicateSampleForTimestamp:
|
||||||
|
|
|
@ -1831,8 +1831,8 @@ func TestTargetScraperScrapeOK(t *testing.T) {
|
||||||
server := httptest.NewServer(
|
server := httptest.NewServer(
|
||||||
http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
accept := r.Header.Get("Accept")
|
accept := r.Header.Get("Accept")
|
||||||
if !strings.HasPrefix(accept, "application/openmetrics-text;") {
|
if !strings.HasPrefix(accept, "application/vnd.google.protobuf;") {
|
||||||
t.Errorf("Expected Accept header to prefer application/openmetrics-text, got %q", accept)
|
t.Errorf("Expected Accept header to prefer application/vnd.google.protobuf, got %q", accept)
|
||||||
}
|
}
|
||||||
|
|
||||||
timeout := r.Header.Get("X-Prometheus-Scrape-Timeout-Seconds")
|
timeout := r.Header.Get("X-Prometheus-Scrape-Timeout-Seconds")
|
||||||
|
|
Loading…
Reference in a new issue