mirror of
https://github.com/prometheus/prometheus.git
synced 2025-01-02 09:27:27 -08:00
7a8bb8222c
A lot of this code was hacked together, literally during a hackathon. This commit intends not to change the code substantially, but just make the code obey the usual style practices. A (possibly incomplete) list of areas: * Generally address linter warnings. * The `pkg` directory is deprecated as per dev-summit. No new packages should be added to it. I moved the new `pkg/histogram` package to `model` anticipating what's proposed in #9478. * Make the naming of the Sparse Histogram more consistent. Including abbreviations, there were just too many names for it: SparseHistogram, Histogram, Histo, hist, his, shs, h. The idea is to call it "Histogram" in general. Only add "Sparse" if it is needed to avoid confusion with conventional Histograms (which is rare because the TSDB really has no notion of conventional Histograms). Use abbreviations only in local scope, and then really abbreviate (not just removing three out of seven letters like in "Histo"). This is in the spirit of https://github.com/golang/go/wiki/CodeReviewComments#variable-names * Several other minor name changes. * A lot of formatting of doc comments. For one, following https://github.com/golang/go/wiki/CodeReviewComments#comment-sentences , but also layout questions, anticipating how things will look when rendered by `godoc` (even where `godoc` doesn't render them right now because they are for unexported types or not a doc comment at all but just a normal code comment - consistency is queen!). * Re-enabled `TestQueryLog` and `TestEndpoints` (they pass now, leaving them disabled was presumably an oversight). * Bucket iterator for histogram.Histogram is now created with a method. * HistogramChunk.iterator now allows iterator recycling. (I think @dieterbe only commented it out because he was confused by the question in the comment.) * HistogramAppender.Append panics now because we decided to treat staleness marker differently. Signed-off-by: beorn7 <beorn@grafana.com>
483 lines
15 KiB
Go
483 lines
15 KiB
Go
// Copyright 2021 The Prometheus Authors
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package textparse
|
|
|
|
import (
|
|
"bytes"
|
|
"encoding/binary"
|
|
"fmt"
|
|
"io"
|
|
"math"
|
|
"sort"
|
|
"strings"
|
|
"unicode/utf8"
|
|
|
|
"github.com/gogo/protobuf/proto"
|
|
"github.com/pkg/errors"
|
|
|
|
"github.com/prometheus/common/model"
|
|
"github.com/prometheus/prometheus/model/histogram"
|
|
"github.com/prometheus/prometheus/pkg/exemplar"
|
|
"github.com/prometheus/prometheus/pkg/labels"
|
|
|
|
dto "github.com/prometheus/prometheus/prompb/io/prometheus/client"
|
|
)
|
|
|
|
// ProtobufParser is a very inefficient way of unmarshaling the old Prometheus
|
|
// protobuf format and then present it as it if were parsed by a
|
|
// Prometheus-2-style text parser. This is only done so that we can easily plug
|
|
// in the protobuf format into Prometheus 2. For future use (with the final
|
|
// format that will be used for sparse histograms), we have to revisit the
|
|
// parsing. A lot of the efficiency tricks of the Prometheus-2-style parsing
|
|
// could be used in a similar fashion (byte-slice pointers into the raw
|
|
// payload), which requires some hand-coded protobuf handling. But the current
|
|
// parsers all expect the full series name (metric name plus label pairs) as one
|
|
// string, which is not how things are represented in the protobuf format. If
|
|
// the re-arrangement work is actually causing problems (which has to be seen),
|
|
// that expectation needs to be changed.
|
|
type ProtobufParser struct {
|
|
in []byte // The intput to parse.
|
|
inPos int // Position within the input.
|
|
metricPos int // Position within Metric slice.
|
|
// fieldPos is the position within a Summary or (legacy) Histogram. -2
|
|
// is the count. -1 is the sum. Otherwise it is the index within
|
|
// quantiles/buckets.
|
|
fieldPos int
|
|
fieldsDone bool // true if no more fields of a Summary or (legacy) Histogram to be processed.
|
|
// state is marked by the entry we are processing. EntryInvalid implies
|
|
// that we have to decode the next MetricFamily.
|
|
state Entry
|
|
|
|
mf *dto.MetricFamily
|
|
|
|
// The following are just shenanigans to satisfy the Parser interface.
|
|
metricBytes *bytes.Buffer // A somewhat fluid representation of the current metric.
|
|
}
|
|
|
|
// NewProtobufParser returns a parser for the payload in the byte slice.
|
|
func NewProtobufParser(b []byte) Parser {
|
|
return &ProtobufParser{
|
|
in: b,
|
|
state: EntryInvalid,
|
|
mf: &dto.MetricFamily{},
|
|
metricBytes: &bytes.Buffer{},
|
|
}
|
|
}
|
|
|
|
// Series returns the bytes of a series with a simple float64 as a
|
|
// value, the timestamp if set, and the value of the current sample.
|
|
func (p *ProtobufParser) Series() ([]byte, *int64, float64) {
|
|
var (
|
|
m = p.mf.GetMetric()[p.metricPos]
|
|
ts = m.GetTimestampMs()
|
|
v float64
|
|
)
|
|
switch p.mf.GetType() {
|
|
case dto.MetricType_COUNTER:
|
|
v = m.GetCounter().GetValue()
|
|
case dto.MetricType_GAUGE:
|
|
v = m.GetGauge().GetValue()
|
|
case dto.MetricType_UNTYPED:
|
|
v = m.GetUntyped().GetValue()
|
|
case dto.MetricType_SUMMARY:
|
|
s := m.GetSummary()
|
|
switch p.fieldPos {
|
|
case -2:
|
|
v = float64(s.GetSampleCount())
|
|
case -1:
|
|
v = s.GetSampleSum()
|
|
// Need to detect a summaries without quantile here.
|
|
if len(s.GetQuantile()) == 0 {
|
|
p.fieldsDone = true
|
|
}
|
|
default:
|
|
v = s.GetQuantile()[p.fieldPos].GetValue()
|
|
}
|
|
case dto.MetricType_HISTOGRAM:
|
|
// This should only happen for a legacy histogram.
|
|
h := m.GetHistogram()
|
|
switch p.fieldPos {
|
|
case -2:
|
|
v = float64(h.GetSampleCount())
|
|
case -1:
|
|
v = h.GetSampleSum()
|
|
default:
|
|
bb := h.GetBucket()
|
|
if p.fieldPos >= len(bb) {
|
|
v = float64(h.GetSampleCount())
|
|
} else {
|
|
v = float64(bb[p.fieldPos].GetCumulativeCount())
|
|
}
|
|
}
|
|
default:
|
|
panic("encountered unexpected metric type, this is a bug")
|
|
}
|
|
if ts != 0 {
|
|
return p.metricBytes.Bytes(), &ts, v
|
|
}
|
|
// Nasty hack: Assume that ts==0 means no timestamp. That's not true in
|
|
// general, but proto3 has no distinction between unset and
|
|
// default. Need to avoid in the final format.
|
|
return p.metricBytes.Bytes(), nil, v
|
|
}
|
|
|
|
// Histogram returns the bytes of a series with a sparse histogram as a
|
|
// value, the timestamp if set, and the sparse histogram in the current
|
|
// sample.
|
|
func (p *ProtobufParser) Histogram() ([]byte, *int64, histogram.Histogram) {
|
|
var (
|
|
m = p.mf.GetMetric()[p.metricPos]
|
|
ts = m.GetTimestampMs()
|
|
h = m.GetHistogram()
|
|
)
|
|
sh := histogram.Histogram{
|
|
Count: h.GetSampleCount(),
|
|
Sum: h.GetSampleSum(),
|
|
ZeroThreshold: h.GetSbZeroThreshold(),
|
|
ZeroCount: h.GetSbZeroCount(),
|
|
Schema: h.GetSbSchema(),
|
|
PositiveSpans: make([]histogram.Span, len(h.GetSbPositive().GetSpan())),
|
|
PositiveBuckets: h.GetSbPositive().GetDelta(),
|
|
NegativeSpans: make([]histogram.Span, len(h.GetSbNegative().GetSpan())),
|
|
NegativeBuckets: h.GetSbNegative().GetDelta(),
|
|
}
|
|
for i, span := range h.GetSbPositive().GetSpan() {
|
|
sh.PositiveSpans[i].Offset = span.GetOffset()
|
|
sh.PositiveSpans[i].Length = span.GetLength()
|
|
}
|
|
for i, span := range h.GetSbNegative().GetSpan() {
|
|
sh.NegativeSpans[i].Offset = span.GetOffset()
|
|
sh.NegativeSpans[i].Length = span.GetLength()
|
|
}
|
|
if ts != 0 {
|
|
return p.metricBytes.Bytes(), &ts, sh
|
|
}
|
|
// Nasty hack: Assume that ts==0 means no timestamp. That's not true in
|
|
// general, but proto3 has no distinction between unset and
|
|
// default. Need to avoid in the final format.
|
|
return p.metricBytes.Bytes(), nil, sh
|
|
}
|
|
|
|
// Help returns the metric name and help text in the current entry.
|
|
// Must only be called after Next returned a help entry.
|
|
// The returned byte slices become invalid after the next call to Next.
|
|
func (p *ProtobufParser) Help() ([]byte, []byte) {
|
|
return p.metricBytes.Bytes(), []byte(p.mf.GetHelp())
|
|
}
|
|
|
|
// Type returns the metric name and type in the current entry.
|
|
// Must only be called after Next returned a type entry.
|
|
// The returned byte slices become invalid after the next call to Next.
|
|
func (p *ProtobufParser) Type() ([]byte, MetricType) {
|
|
n := p.metricBytes.Bytes()
|
|
switch p.mf.GetType() {
|
|
case dto.MetricType_COUNTER:
|
|
return n, MetricTypeCounter
|
|
case dto.MetricType_GAUGE:
|
|
return n, MetricTypeGauge
|
|
case dto.MetricType_HISTOGRAM:
|
|
return n, MetricTypeHistogram
|
|
case dto.MetricType_SUMMARY:
|
|
return n, MetricTypeSummary
|
|
}
|
|
return n, MetricTypeUnknown
|
|
}
|
|
|
|
// Unit always returns (nil, nil) because units aren't supported by the protobuf
|
|
// format.
|
|
func (p *ProtobufParser) Unit() ([]byte, []byte) {
|
|
return nil, nil
|
|
}
|
|
|
|
// Comment always returns nil because comments aren't supported by the protobuf
|
|
// format.
|
|
func (p *ProtobufParser) Comment() []byte {
|
|
return nil
|
|
}
|
|
|
|
// Metric writes the labels of the current sample into the passed labels.
|
|
// It returns the string from which the metric was parsed.
|
|
func (p *ProtobufParser) Metric(l *labels.Labels) string {
|
|
*l = append(*l, labels.Label{
|
|
Name: labels.MetricName,
|
|
Value: p.getMagicName(),
|
|
})
|
|
|
|
for _, lp := range p.mf.GetMetric()[p.metricPos].GetLabel() {
|
|
*l = append(*l, labels.Label{
|
|
Name: lp.GetName(),
|
|
Value: lp.GetValue(),
|
|
})
|
|
}
|
|
if needed, name, value := p.getMagicLabel(); needed {
|
|
*l = append(*l, labels.Label{Name: name, Value: value})
|
|
}
|
|
|
|
// Sort labels to maintain the sorted labels invariant.
|
|
sort.Sort(*l)
|
|
|
|
return p.metricBytes.String()
|
|
}
|
|
|
|
// Exemplar writes the exemplar of the current sample into the passed
|
|
// exemplar. It returns if an exemplar exists or not. In case of a sparse
|
|
// histogram, the legacy bucket section is still used for exemplars. To ingest
|
|
// all examplars, call the Exemplar method repeatedly until it returns false.
|
|
func (p *ProtobufParser) Exemplar(ex *exemplar.Exemplar) bool {
|
|
m := p.mf.GetMetric()[p.metricPos]
|
|
var exProto *dto.Exemplar
|
|
switch p.mf.GetType() {
|
|
case dto.MetricType_COUNTER:
|
|
exProto = m.GetCounter().GetExemplar()
|
|
case dto.MetricType_HISTOGRAM:
|
|
bb := m.GetHistogram().GetBucket()
|
|
if p.fieldPos < 0 {
|
|
if p.state == EntrySeries {
|
|
return false // At _count or _sum.
|
|
}
|
|
p.fieldPos = 0 // Start at 1st bucket for sparse histograms.
|
|
}
|
|
for p.fieldPos < len(bb) {
|
|
exProto = bb[p.fieldPos].GetExemplar()
|
|
if p.state == EntrySeries {
|
|
break
|
|
}
|
|
p.fieldPos++
|
|
if exProto != nil {
|
|
break
|
|
}
|
|
}
|
|
default:
|
|
return false
|
|
}
|
|
if exProto == nil {
|
|
return false
|
|
}
|
|
ex.Value = exProto.GetValue()
|
|
if ts := exProto.GetTimestamp(); ts != nil {
|
|
ex.HasTs = true
|
|
ex.Ts = ts.GetSeconds()*1000 + int64(ts.GetNanos()/1_000_000)
|
|
}
|
|
for _, lp := range exProto.GetLabel() {
|
|
ex.Labels = append(ex.Labels, labels.Label{
|
|
Name: lp.GetName(),
|
|
Value: lp.GetValue(),
|
|
})
|
|
}
|
|
return true
|
|
}
|
|
|
|
// Next advances the parser to the next "sample" (emulating the behavior of a
|
|
// text format parser). It returns (EntryInvalid, io.EOF) if no samples were
|
|
// read.
|
|
func (p *ProtobufParser) Next() (Entry, error) {
|
|
switch p.state {
|
|
case EntryInvalid:
|
|
p.metricPos = 0
|
|
p.fieldPos = -2
|
|
n, err := readDelimited(p.in[p.inPos:], p.mf)
|
|
p.inPos += n
|
|
if err != nil {
|
|
return p.state, err
|
|
}
|
|
|
|
// Skip empty metric families.
|
|
if len(p.mf.GetMetric()) == 0 {
|
|
return p.Next()
|
|
}
|
|
|
|
// We are at the beginning of a metric family. Put only the name
|
|
// into metricBytes and validate only name and help for now.
|
|
name := p.mf.GetName()
|
|
if !model.IsValidMetricName(model.LabelValue(name)) {
|
|
return EntryInvalid, errors.Errorf("invalid metric name: %s", name)
|
|
}
|
|
if help := p.mf.GetHelp(); !utf8.ValidString(help) {
|
|
return EntryInvalid, errors.Errorf("invalid help for metric %q: %s", name, help)
|
|
}
|
|
p.metricBytes.Reset()
|
|
p.metricBytes.WriteString(name)
|
|
|
|
p.state = EntryHelp
|
|
case EntryHelp:
|
|
p.state = EntryType
|
|
case EntryType:
|
|
if p.mf.GetType() == dto.MetricType_HISTOGRAM &&
|
|
isSparseHistogram(p.mf.GetMetric()[0].GetHistogram()) {
|
|
p.state = EntryHistogram
|
|
} else {
|
|
p.state = EntrySeries
|
|
}
|
|
if err := p.updateMetricBytes(); err != nil {
|
|
return EntryInvalid, err
|
|
}
|
|
case EntryHistogram, EntrySeries:
|
|
if p.state == EntrySeries && !p.fieldsDone &&
|
|
(p.mf.GetType() == dto.MetricType_SUMMARY || p.mf.GetType() == dto.MetricType_HISTOGRAM) {
|
|
p.fieldPos++
|
|
} else {
|
|
p.metricPos++
|
|
p.fieldPos = -2
|
|
p.fieldsDone = false
|
|
}
|
|
if p.metricPos >= len(p.mf.GetMetric()) {
|
|
p.state = EntryInvalid
|
|
return p.Next()
|
|
}
|
|
if err := p.updateMetricBytes(); err != nil {
|
|
return EntryInvalid, err
|
|
}
|
|
default:
|
|
return EntryInvalid, errors.Errorf("invalid protobuf parsing state: %d", p.state)
|
|
}
|
|
return p.state, nil
|
|
}
|
|
|
|
func (p *ProtobufParser) updateMetricBytes() error {
|
|
b := p.metricBytes
|
|
b.Reset()
|
|
b.WriteString(p.getMagicName())
|
|
for _, lp := range p.mf.GetMetric()[p.metricPos].GetLabel() {
|
|
b.WriteByte(model.SeparatorByte)
|
|
n := lp.GetName()
|
|
if !model.LabelName(n).IsValid() {
|
|
return errors.Errorf("invalid label name: %s", n)
|
|
}
|
|
b.WriteString(n)
|
|
b.WriteByte(model.SeparatorByte)
|
|
v := lp.GetValue()
|
|
if !utf8.ValidString(v) {
|
|
return errors.Errorf("invalid label value: %s", v)
|
|
}
|
|
b.WriteString(v)
|
|
}
|
|
if needed, n, v := p.getMagicLabel(); needed {
|
|
b.WriteByte(model.SeparatorByte)
|
|
b.WriteString(n)
|
|
b.WriteByte(model.SeparatorByte)
|
|
b.WriteString(v)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// getMagicName usually just returns p.mf.GetType() but adds a magic suffix
|
|
// ("_count", "_sum", "_bucket") if needed according to the current parser
|
|
// state.
|
|
func (p *ProtobufParser) getMagicName() string {
|
|
t := p.mf.GetType()
|
|
if p.state == EntryHistogram || (t != dto.MetricType_HISTOGRAM && t != dto.MetricType_SUMMARY) {
|
|
return p.mf.GetName()
|
|
}
|
|
if p.fieldPos == -2 {
|
|
return p.mf.GetName() + "_count"
|
|
}
|
|
if p.fieldPos == -1 {
|
|
return p.mf.GetName() + "_sum"
|
|
}
|
|
if t == dto.MetricType_HISTOGRAM {
|
|
return p.mf.GetName() + "_bucket"
|
|
}
|
|
return p.mf.GetName()
|
|
}
|
|
|
|
// getMagicLabel returns if a magic label ("quantile" or "le") is needed and, if
|
|
// so, its name and value. It also sets p.fieldsDone if applicable.
|
|
func (p *ProtobufParser) getMagicLabel() (bool, string, string) {
|
|
if p.state == EntryHistogram || p.fieldPos < 0 {
|
|
return false, "", ""
|
|
}
|
|
switch p.mf.GetType() {
|
|
case dto.MetricType_SUMMARY:
|
|
qq := p.mf.GetMetric()[p.metricPos].GetSummary().GetQuantile()
|
|
q := qq[p.fieldPos]
|
|
p.fieldsDone = p.fieldPos == len(qq)-1
|
|
return true, model.QuantileLabel, formatOpenMetricsFloat(q.GetQuantile())
|
|
case dto.MetricType_HISTOGRAM:
|
|
bb := p.mf.GetMetric()[p.metricPos].GetHistogram().GetBucket()
|
|
if p.fieldPos >= len(bb) {
|
|
p.fieldsDone = true
|
|
return true, model.BucketLabel, "+Inf"
|
|
}
|
|
b := bb[p.fieldPos]
|
|
p.fieldsDone = math.IsInf(b.GetUpperBound(), +1)
|
|
return true, model.BucketLabel, formatOpenMetricsFloat(b.GetUpperBound())
|
|
}
|
|
return false, "", ""
|
|
}
|
|
|
|
var errInvalidVarint = errors.New("protobufparse: invalid varint encountered")
|
|
|
|
// readDelimited is essentially doing what the function of the same name in
|
|
// github.com/matttproud/golang_protobuf_extensions/pbutil is doing, but it is
|
|
// specific to a MetricFamily, utilizes the more efficient gogo-protobuf
|
|
// unmarshaling, and acts on a byte slice directly without any additional
|
|
// staging buffers.
|
|
func readDelimited(b []byte, mf *dto.MetricFamily) (n int, err error) {
|
|
if len(b) == 0 {
|
|
return 0, io.EOF
|
|
}
|
|
messageLength, varIntLength := proto.DecodeVarint(b)
|
|
if varIntLength == 0 || varIntLength > binary.MaxVarintLen32 {
|
|
return 0, errInvalidVarint
|
|
}
|
|
totalLength := varIntLength + int(messageLength)
|
|
if totalLength > len(b) {
|
|
return 0, errors.Errorf("protobufparse: insufficient length of buffer, expected at least %d bytes, got %d bytes", totalLength, len(b))
|
|
}
|
|
mf.Reset()
|
|
return totalLength, mf.Unmarshal(b[varIntLength:totalLength])
|
|
}
|
|
|
|
// formatOpenMetricsFloat works like the usual Go string formatting of a float
// but appends ".0" if the resulting number would otherwise contain neither a
// "." nor an "e". NaN and the infinities are rendered as "NaN", "+Inf", and
// "-Inf".
func formatOpenMetricsFloat(f float64) string {
	// Shortcuts for a few common values. Negative zero compares equal to
	// zero and is therefore rendered as "0.0", too.
	switch f {
	case 1:
		return "1.0"
	case 0:
		return "0.0"
	case -1:
		return "-1.0"
	}
	if math.IsNaN(f) {
		return "NaN"
	}
	if math.IsInf(f, 0) {
		if f > 0 {
			return "+Inf"
		}
		return "-Inf"
	}

	formatted := fmt.Sprint(f)
	if !strings.ContainsAny(formatted, "e.") {
		formatted += ".0"
	}
	return formatted
}
|
|
|
|
// isSparseHistogram returns false iff the provided histograms has no
|
|
// SparseBuckets and a zero threshold of 0 and a zero count of 0. In principle,
|
|
// this could still be meant to be a sparse histgram (with a zero threshold of 0
|
|
// and no observations yet), but for now, we'll treat this case as a conventional
|
|
// histogram.
|
|
//
|
|
// TODO(beorn7): In the final format, there should be an unambiguous way of
|
|
// deciding if a histogram should be ingested as a conventional one or a sparse
|
|
// one.
|
|
func isSparseHistogram(h *dto.Histogram) bool {
|
|
return len(h.GetSbNegative().GetDelta()) > 0 ||
|
|
len(h.GetSbPositive().GetDelta()) > 0 ||
|
|
h.GetSbZeroCount() > 0 ||
|
|
h.GetSbZeroThreshold() > 0
|
|
}
|