Merge pull request #1107 from prometheus/expfmt

Update expfmt.NewDecoder usage
This commit is contained in:
Fabian Reinartz 2015-09-22 13:22:00 +02:00
commit a8126c9be1
7 changed files with 271 additions and 89 deletions

View file

@ -443,10 +443,7 @@ func (t *Target) scrape(appender storage.SampleAppender) (err error) {
return fmt.Errorf("server returned HTTP status %s", resp.Status) return fmt.Errorf("server returned HTTP status %s", resp.Status)
} }
dec, err := expfmt.NewDecoder(resp.Body, resp.Header) dec := expfmt.NewDecoder(resp.Body, expfmt.ResponseFormat(resp.Header))
if err != nil {
return err
}
sdec := expfmt.SampleDecoder{ sdec := expfmt.SampleDecoder{
Dec: dec, Dec: dec,

View file

@ -36,39 +36,67 @@ type DecodeOptions struct {
Timestamp model.Time Timestamp model.Time
} }
// NewDecoder returns a new decoder based on the HTTP header. // ResponseFormat extracts the correct format from a HTTP response header.
func NewDecoder(r io.Reader, h http.Header) (Decoder, error) { // If no matching format can be found FormatUnknown is returned.
func ResponseFormat(h http.Header) Format {
ct := h.Get(hdrContentType) ct := h.Get(hdrContentType)
mediatype, params, err := mime.ParseMediaType(ct) mediatype, params, err := mime.ParseMediaType(ct)
if err != nil { if err != nil {
return nil, fmt.Errorf("invalid Content-Type header %q: %s", ct, err) return FmtUnknown
} }
const ( const (
protoType = ProtoType + "/" + ProtoSubType textType = "text/plain"
textType = "text/plain" jsonType = "application/json"
) )
switch mediatype { switch mediatype {
case protoType: case ProtoType:
if p := params["proto"]; p != ProtoProtocol { if p, ok := params["proto"]; ok && p != ProtoProtocol {
return nil, fmt.Errorf("unrecognized protocol message %s", p) return FmtUnknown
} }
if e := params["encoding"]; e != "delimited" { if e, ok := params["encoding"]; ok && e != "delimited" {
return nil, fmt.Errorf("unsupported encoding %s", e) return FmtUnknown
} }
return &protoDecoder{r: r}, nil return FmtProtoDelim
case textType: case textType:
if v, ok := params["version"]; ok && v != "0.0.4" { if v, ok := params["version"]; ok && v != TextVersion {
return nil, fmt.Errorf("unrecognized protocol version %s", v) return FmtUnknown
} }
return &textDecoder{r: r}, nil return FmtText
default: case jsonType:
return nil, fmt.Errorf("unsupported media type %q, expected %q or %q", mediatype, protoType, textType) var prometheusAPIVersion string
if params["schema"] == "prometheus/telemetry" && params["version"] != "" {
prometheusAPIVersion = params["version"]
} else {
prometheusAPIVersion = h.Get("X-Prometheus-API-Version")
}
switch prometheusAPIVersion {
case "0.0.2", "":
return fmtJSON2
default:
return FmtUnknown
}
} }
return FmtUnknown
}
// NewDecoder returns a new decoder based on the given input format.
// If the input format does not imply otherwise, a text format decoder is returned.
func NewDecoder(r io.Reader, format Format) Decoder {
switch format {
case FmtProtoDelim:
return &protoDecoder{r: r}
case fmtJSON2:
return newJSON2Decoder(r)
}
return &textDecoder{r: r}
} }
// protoDecoder implements the Decoder interface for protocol buffers. // protoDecoder implements the Decoder interface for protocol buffers.
@ -101,12 +129,15 @@ func (d *textDecoder) Decode(v *dto.MetricFamily) error {
if len(fams) == 0 { if len(fams) == 0 {
return io.EOF return io.EOF
} }
d.fams = make([]*dto.MetricFamily, 0, len(fams))
for _, f := range fams { for _, f := range fams {
d.fams = append(d.fams, f) d.fams = append(d.fams, f)
} }
} }
*v = *d.fams[len(d.fams)-1]
d.fams = d.fams[:len(d.fams)-1] *v = *d.fams[0]
d.fams = d.fams[1:]
return nil return nil
} }
@ -272,33 +303,29 @@ func extractSummary(o *DecodeOptions, f *dto.MetricFamily) model.Vector {
}) })
} }
if m.Summary.SampleSum != nil { lset := make(model.LabelSet, len(m.Label)+1)
lset := make(model.LabelSet, len(m.Label)+1) for _, p := range m.Label {
for _, p := range m.Label { lset[model.LabelName(p.GetName())] = model.LabelValue(p.GetValue())
lset[model.LabelName(p.GetName())] = model.LabelValue(p.GetValue())
}
lset[model.MetricNameLabel] = model.LabelValue(f.GetName() + "_sum")
samples = append(samples, &model.Sample{
Metric: model.Metric(lset),
Value: model.SampleValue(m.Summary.GetSampleSum()),
Timestamp: timestamp,
})
} }
lset[model.MetricNameLabel] = model.LabelValue(f.GetName() + "_sum")
if m.Summary.SampleCount != nil { samples = append(samples, &model.Sample{
lset := make(model.LabelSet, len(m.Label)+1) Metric: model.Metric(lset),
for _, p := range m.Label { Value: model.SampleValue(m.Summary.GetSampleSum()),
lset[model.LabelName(p.GetName())] = model.LabelValue(p.GetValue()) Timestamp: timestamp,
} })
lset[model.MetricNameLabel] = model.LabelValue(f.GetName() + "_count")
samples = append(samples, &model.Sample{ lset = make(model.LabelSet, len(m.Label)+1)
Metric: model.Metric(lset), for _, p := range m.Label {
Value: model.SampleValue(m.Summary.GetSampleCount()), lset[model.LabelName(p.GetName())] = model.LabelValue(p.GetValue())
Timestamp: timestamp,
})
} }
lset[model.MetricNameLabel] = model.LabelValue(f.GetName() + "_count")
samples = append(samples, &model.Sample{
Metric: model.Metric(lset),
Value: model.SampleValue(m.Summary.GetSampleCount()),
Timestamp: timestamp,
})
} }
return samples return samples
@ -338,50 +365,46 @@ func extractHistogram(o *DecodeOptions, f *dto.MetricFamily) model.Vector {
}) })
} }
if m.Histogram.SampleSum != nil { lset := make(model.LabelSet, len(m.Label)+1)
lset := make(model.LabelSet, len(m.Label)+1) for _, p := range m.Label {
lset[model.LabelName(p.GetName())] = model.LabelValue(p.GetValue())
}
lset[model.MetricNameLabel] = model.LabelValue(f.GetName() + "_sum")
samples = append(samples, &model.Sample{
Metric: model.Metric(lset),
Value: model.SampleValue(m.Histogram.GetSampleSum()),
Timestamp: timestamp,
})
lset = make(model.LabelSet, len(m.Label)+1)
for _, p := range m.Label {
lset[model.LabelName(p.GetName())] = model.LabelValue(p.GetValue())
}
lset[model.MetricNameLabel] = model.LabelValue(f.GetName() + "_count")
count := &model.Sample{
Metric: model.Metric(lset),
Value: model.SampleValue(m.Histogram.GetSampleCount()),
Timestamp: timestamp,
}
samples = append(samples, count)
if !infSeen {
// Append an infinity bucket sample.
lset := make(model.LabelSet, len(m.Label)+2)
for _, p := range m.Label { for _, p := range m.Label {
lset[model.LabelName(p.GetName())] = model.LabelValue(p.GetValue()) lset[model.LabelName(p.GetName())] = model.LabelValue(p.GetValue())
} }
lset[model.MetricNameLabel] = model.LabelValue(f.GetName() + "_sum") lset[model.LabelName(model.BucketLabel)] = model.LabelValue("+Inf")
lset[model.MetricNameLabel] = model.LabelValue(f.GetName() + "_bucket")
samples = append(samples, &model.Sample{ samples = append(samples, &model.Sample{
Metric: model.Metric(lset), Metric: model.Metric(lset),
Value: model.SampleValue(m.Histogram.GetSampleSum()), Value: count.Value,
Timestamp: timestamp, Timestamp: timestamp,
}) })
} }
if m.Histogram.SampleCount != nil {
lset := make(model.LabelSet, len(m.Label)+1)
for _, p := range m.Label {
lset[model.LabelName(p.GetName())] = model.LabelValue(p.GetValue())
}
lset[model.MetricNameLabel] = model.LabelValue(f.GetName() + "_count")
count := &model.Sample{
Metric: model.Metric(lset),
Value: model.SampleValue(m.Histogram.GetSampleCount()),
Timestamp: timestamp,
}
samples = append(samples, count)
if !infSeen {
// Append a infinity bucket sample.
lset := make(model.LabelSet, len(m.Label)+2)
for _, p := range m.Label {
lset[model.LabelName(p.GetName())] = model.LabelValue(p.GetValue())
}
lset[model.LabelName(model.BucketLabel)] = model.LabelValue("+Inf")
lset[model.MetricNameLabel] = model.LabelValue(f.GetName() + "_bucket")
samples = append(samples, &model.Sample{
Metric: model.Metric(lset),
Value: count.Value,
Timestamp: timestamp,
})
}
}
} }
return samples return samples

View file

@ -41,7 +41,7 @@ func (e encoder) Encode(v *dto.MetricFamily) error {
func Negotiate(h http.Header) Format { func Negotiate(h http.Header) Format {
for _, ac := range goautoneg.ParseAccept(h.Get(hdrAccept)) { for _, ac := range goautoneg.ParseAccept(h.Get(hdrAccept)) {
// Check for protocol buffer // Check for protocol buffer
if ac.Type == ProtoType && ac.SubType == ProtoSubType && ac.Params["proto"] == ProtoProtocol { if ac.Type+"/"+ac.SubType == ProtoType && ac.Params["proto"] == ProtoProtocol {
switch ac.Params["encoding"] { switch ac.Params["encoding"] {
case "delimited": case "delimited":
return FmtProtoDelim return FmtProtoDelim

View file

@ -19,16 +19,19 @@ type Format string
const ( const (
TextVersion = "0.0.4" TextVersion = "0.0.4"
ProtoType = `application` ProtoType = `application/vnd.google.protobuf`
ProtoSubType = `vnd.google.protobuf`
ProtoProtocol = `io.prometheus.client.MetricFamily` ProtoProtocol = `io.prometheus.client.MetricFamily`
ProtoFmt = ProtoType + "/" + ProtoSubType + "; proto=" + ProtoProtocol + ";" ProtoFmt = ProtoType + "; proto=" + ProtoProtocol + ";"
// The Content-Type values for the different wire protocols. // The Content-Type values for the different wire protocols.
FmtUnknown Format = `<unknown>`
FmtText Format = `text/plain; version=` + TextVersion FmtText Format = `text/plain; version=` + TextVersion
FmtProtoDelim Format = ProtoFmt + ` encoding=delimited` FmtProtoDelim Format = ProtoFmt + ` encoding=delimited`
FmtProtoText Format = ProtoFmt + ` encoding=text` FmtProtoText Format = ProtoFmt + ` encoding=text`
FmtProtoCompact Format = ProtoFmt + ` encoding=compact-text` FmtProtoCompact Format = ProtoFmt + ` encoding=compact-text`
// fmtJSON2 is hidden as it is deprecated.
fmtJSON2 Format = `application/json; version=0.0.2`
) )
const ( const (

View file

@ -0,0 +1,162 @@
// Copyright 2015 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package expfmt
import (
"encoding/json"
"fmt"
"io"
"sort"
"github.com/golang/protobuf/proto"
dto "github.com/prometheus/client_model/go"
"github.com/prometheus/common/model"
)
type json2Decoder struct {
dec *json.Decoder
fams []*dto.MetricFamily
}
func newJSON2Decoder(r io.Reader) Decoder {
return &json2Decoder{
dec: json.NewDecoder(r),
}
}
type histogram002 struct {
Labels model.LabelSet `json:"labels"`
Values map[string]float64 `json:"value"`
}
type counter002 struct {
Labels model.LabelSet `json:"labels"`
Value float64 `json:"value"`
}
func protoLabelSet(base, ext model.LabelSet) []*dto.LabelPair {
labels := base.Clone().Merge(ext)
delete(labels, model.MetricNameLabel)
names := make([]string, 0, len(labels))
for ln := range labels {
names = append(names, string(ln))
}
sort.Strings(names)
pairs := make([]*dto.LabelPair, 0, len(labels))
for _, ln := range names {
lv := labels[model.LabelName(ln)]
pairs = append(pairs, &dto.LabelPair{
Name: proto.String(ln),
Value: proto.String(string(lv)),
})
}
return pairs
}
func (d *json2Decoder) more() error {
var entities []struct {
BaseLabels model.LabelSet `json:"baseLabels"`
Docstring string `json:"docstring"`
Metric struct {
Type string `json:"type"`
Values json.RawMessage `json:"value"`
} `json:"metric"`
}
if err := d.dec.Decode(&entities); err != nil {
return err
}
for _, e := range entities {
f := &dto.MetricFamily{
Name: proto.String(string(e.BaseLabels[model.MetricNameLabel])),
Help: proto.String(e.Docstring),
Type: dto.MetricType_UNTYPED.Enum(),
Metric: []*dto.Metric{},
}
d.fams = append(d.fams, f)
switch e.Metric.Type {
case "counter", "gauge":
var values []counter002
if err := json.Unmarshal(e.Metric.Values, &values); err != nil {
return fmt.Errorf("could not extract %s value: %s", e.Metric.Type, err)
}
for _, ctr := range values {
f.Metric = append(f.Metric, &dto.Metric{
Label: protoLabelSet(e.BaseLabels, ctr.Labels),
Untyped: &dto.Untyped{
Value: proto.Float64(ctr.Value),
},
})
}
case "histogram":
var values []histogram002
if err := json.Unmarshal(e.Metric.Values, &values); err != nil {
return fmt.Errorf("could not extract %s value: %s", e.Metric.Type, err)
}
for _, hist := range values {
quants := make([]string, 0, len(values))
for q := range hist.Values {
quants = append(quants, q)
}
sort.Strings(quants)
for _, q := range quants {
value := hist.Values[q]
// The correct label is "quantile" but to not break old expressions
// this remains "percentile"
hist.Labels["percentile"] = model.LabelValue(q)
f.Metric = append(f.Metric, &dto.Metric{
Label: protoLabelSet(e.BaseLabels, hist.Labels),
Untyped: &dto.Untyped{
Value: proto.Float64(value),
},
})
}
}
default:
return fmt.Errorf("unknown metric type %q", e.Metric.Type)
}
}
return nil
}
// Decode implements the Decoder interface.
func (d *json2Decoder) Decode(v *dto.MetricFamily) error {
if len(d.fams) == 0 {
if err := d.more(); err != nil {
return err
}
}
*v = *d.fams[0]
d.fams = d.fams[1:]
return nil
}

View file

@ -41,9 +41,6 @@ func MetricFamilyToText(out io.Writer, in *dto.MetricFamily) (int, error) {
if name == "" { if name == "" {
return written, fmt.Errorf("MetricFamily has no name: %s", in) return written, fmt.Errorf("MetricFamily has no name: %s", in)
} }
if in.Type == nil {
return written, fmt.Errorf("MetricFamily has no type: %s", in)
}
// Comments, first HELP, then TYPE. // Comments, first HELP, then TYPE.
if in.Help != nil { if in.Help != nil {

4
vendor/vendor.json vendored
View file

@ -74,8 +74,8 @@
}, },
{ {
"path": "github.com/prometheus/common/expfmt", "path": "github.com/prometheus/common/expfmt",
"revision": "e5bcf05f3c1b99df9892e773349e88d038f0e93c", "revision": "51d43993bd4018e9470540600641285cad786840",
"revisionTime": "2015-09-17T12:22:22+02:00" "revisionTime": "2015-09-22T12:03:38+02:00"
}, },
{ {
"path": "github.com/prometheus/common/model", "path": "github.com/prometheus/common/model",