Refactor processor for 0.0.2 schema

Primary changes:

* Strictly typed unmarshalling of metric values
* Schema types are contained by the processor (no "type entity002")

Minor changes:

* Added ProcessorFunc type for expressing processors as simple
  functions.
* Added non-destructive `Merge` method to `model.LabelSet`
Bernerd Schaefer 2013-04-26 11:52:26 +02:00
parent 7c3e04c546
commit dfd5c9ce28
4 changed files with 90 additions and 117 deletions

View file

@@ -34,6 +34,22 @@ const (
 // match.
 type LabelSet map[LabelName]LabelValue
 
+// Helper function to non-destructively merge two label sets.
+func (l LabelSet) Merge(other LabelSet) LabelSet {
+	result := make(LabelSet, len(l))
+
+	for k, v := range l {
+		result[k] = v
+	}
+
+	for k, v := range other {
+		result[k] = v
+	}
+
+	return result
+}
+
 func (l LabelSet) String() string {
 	var (
 		buffer bytes.Buffer
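
The new method copies the receiver before applying the argument, so the argument's values win on key collisions and the original set is untouched. A minimal usage sketch (the label names and values here are invented for illustration, not part of the commit):

	package main

	import (
		"fmt"

		"github.com/prometheus/prometheus/model"
	)

	func main() {
		base := model.LabelSet{"job": "api-server"}
		extra := model.LabelSet{"job": "blackbox", "instance": "a"}

		merged := base.Merge(extra)

		fmt.Println(merged) // contains both keys; "job" takes the value from extra
		fmt.Println(base)   // still only job=api-server; the receiver is not modified
	}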

View file

@@ -26,3 +26,10 @@ type Processor interface {
 	// Process performs the work on the input and closes the incoming stream.
 	Process(stream io.ReadCloser, timestamp time.Time, baseLabels model.LabelSet, results chan Result) (err error)
 }
+
+// The ProcessorFunc type allows the use of ordinary functions for processors.
+type ProcessorFunc func(io.ReadCloser, time.Time, model.LabelSet, chan Result) error
+
+func (f ProcessorFunc) Process(stream io.ReadCloser, timestamp time.Time, baseLabels model.LabelSet, results chan Result) error {
+	return f(stream, timestamp, baseLabels, results)
+}
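
ProcessorFunc follows the familiar adapter pattern: any function with the right signature can be converted into a value that satisfies the Processor interface, which is how Processor002 is defined in the next file. A hedged sketch, assuming it lives in the same package as Processor; the names discard and DiscardProcessor are invented for illustration:

	// discard consumes nothing and reports success; useful only as an illustration.
	func discard(stream io.ReadCloser, timestamp time.Time, baseLabels model.LabelSet, results chan Result) error {
		defer stream.Close()
		return nil
	}

	// The conversion is all that is needed to use the plain function wherever
	// a Processor is expected.
	var DiscardProcessor Processor = ProcessorFunc(discard)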

View file

@@ -17,139 +17,97 @@ import (
 	"encoding/json"
 	"fmt"
 	"github.com/prometheus/prometheus/model"
-	"github.com/prometheus/prometheus/utility"
 	"io"
-	"io/ioutil"
 	"time"
 )
 
-const (
-	baseLabels002 = "baseLabels"
-	counter002    = "counter"
-	docstring002  = "docstring"
-	gauge002      = "gauge"
-	histogram002  = "histogram"
-	labels002     = "labels"
-	metric002     = "metric"
-	type002       = "type"
-	value002      = "value"
-	percentile002 = "percentile"
-)
-
-var (
-	Processor002 Processor = &processor002{}
-)
-
-// processor002 is responsible for handling API version 0.0.2.
-type processor002 struct {
-	time utility.Time
-}
-
-// entity002 represents a the JSON structure that 0.0.2 uses.
-type entity002 []struct {
-	BaseLabels map[string]string `json:"baseLabels"`
-	Docstring  string            `json:"docstring"`
-	Metric     struct {
-		MetricType string `json:"type"`
-		Value      []struct {
-			Labels map[string]string `json:"labels"`
-			Value  interface{}       `json:"value"`
-		} `json:"value"`
-	} `json:"metric"`
-}
-
-func (p *processor002) Process(stream io.ReadCloser, timestamp time.Time, baseLabels model.LabelSet, results chan Result) (err error) {
-	// TODO(matt): Replace with plain-jane JSON unmarshalling.
-	defer stream.Close()
-
-	buffer, err := ioutil.ReadAll(stream)
-	if err != nil {
-		return
-	}
-
-	entities := entity002{}
-
-	err = json.Unmarshal(buffer, &entities)
-	if err != nil {
-		return
-	}
-
-	// TODO(matt): This outer loop is a great basis for parallelization.
-	for _, entity := range entities {
-		for _, value := range entity.Metric.Value {
-			metric := model.Metric{}
-			for label, labelValue := range baseLabels {
-				metric[label] = labelValue
-			}
-
-			for label, labelValue := range entity.BaseLabels {
-				metric[model.LabelName(label)] = model.LabelValue(labelValue)
-			}
-
-			for label, labelValue := range value.Labels {
-				metric[model.LabelName(label)] = model.LabelValue(labelValue)
-			}
-
-			switch entity.Metric.MetricType {
-			case gauge002, counter002:
-				sampleValue, ok := value.Value.(float64)
-				if !ok {
-					err = fmt.Errorf("Could not convert value from %s %s to float64.", entity, value)
-					continue
-				}
-
-				sample := model.Sample{
-					Metric:    metric,
-					Timestamp: timestamp,
-					Value:     model.SampleValue(sampleValue),
-				}
-
-				results <- Result{
-					Err:    err,
-					Sample: sample,
-				}
-
-				break
-
-			case histogram002:
-				sampleValue, ok := value.Value.(map[string]interface{})
-				if !ok {
-					err = fmt.Errorf("Could not convert value from %q to a map[string]interface{}.", value.Value)
-					continue
-				}
-
-				for percentile, percentileValue := range sampleValue {
-					individualValue, ok := percentileValue.(float64)
-					if !ok {
-						err = fmt.Errorf("Could not convert value from %q to a float64.", percentileValue)
-						continue
-					}
-
-					childMetric := make(map[model.LabelName]model.LabelValue, len(metric)+1)
-
-					for k, v := range metric {
-						childMetric[k] = v
-					}
-
-					childMetric[model.LabelName(percentile002)] = model.LabelValue(percentile)
-
-					sample := model.Sample{
-						Metric:    childMetric,
-						Timestamp: timestamp,
-						Value:     model.SampleValue(individualValue),
-					}
-
-					results <- Result{
-						Err:    err,
-						Sample: sample,
-					}
-				}
-
-				break
-
-			default:
-			}
-		}
-	}
-
-	return
-}
+// Processor for telemetry schema version 0.0.2.
+var Processor002 ProcessorFunc = func(stream io.ReadCloser, timestamp time.Time, baseLabels model.LabelSet, results chan Result) error {
+	// container for telemetry data
+	var entities []struct {
+		BaseLabels model.LabelSet `json:"baseLabels"`
+		Docstring  string         `json:"docstring"`
+		Metric     struct {
+			Type   string          `json:"type"`
+			Values json.RawMessage `json:"value"`
+		} `json:"metric"`
+	}
+
+	// concrete type for histogram values
+	type histogram struct {
+		Labels model.LabelSet               `json:"labels"`
+		Values map[string]model.SampleValue `json:"value"`
+	}
+
+	// concrete type for counter and gauge values
+	type counter struct {
+		Labels model.LabelSet    `json:"labels"`
+		Value  model.SampleValue `json:"value"`
+	}
+
+	defer stream.Close()
+
+	if err := json.NewDecoder(stream).Decode(&entities); err != nil {
+		return err
+	}
+
+	for _, entity := range entities {
+		entityLabels := baseLabels.Merge(entity.BaseLabels)
+
+		switch entity.Metric.Type {
+		case "counter", "gauge":
+			var values []counter
+
+			if err := json.Unmarshal(entity.Metric.Values, &values); err != nil {
+				results <- Result{
+					Err: fmt.Errorf("Could not extract %s value: %s", entity.Metric.Type, err),
+				}
+				continue
+			}
+
+			for _, counter := range values {
+				labels := entityLabels.Merge(counter.Labels)
+
+				results <- Result{
+					Sample: model.Sample{
+						Metric:    model.Metric(labels),
+						Timestamp: timestamp,
+						Value:     counter.Value,
+					},
+				}
+			}
+
+		case "histogram":
+			var values []histogram
+
+			if err := json.Unmarshal(entity.Metric.Values, &values); err != nil {
+				results <- Result{
+					Err: fmt.Errorf("Could not extract %s value: %s", entity.Metric.Type, err),
+				}
+				continue
+			}
+
+			for _, histogram := range values {
+				for percentile, value := range histogram.Values {
+					labels := entityLabels.Merge(histogram.Labels)
+					labels[model.LabelName("percentile")] = model.LabelValue(percentile)
+
+					results <- Result{
+						Sample: model.Sample{
+							Metric:    model.Metric(labels),
+							Timestamp: timestamp,
+							Value:     value,
+						},
+					}
+				}
+			}
+
+		default:
+			results <- Result{
+				Err: fmt.Errorf("Unknown metric type %q", entity.Metric.Type),
+			}
+		}
+	}
+
+	return nil
+}
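
The strict typing comes from decoding in two passes: the entity envelope is decoded first with the metric values held as json.RawMessage, and only once the "type" field is known are those raw bytes unmarshalled into the concrete counter or histogram shape. A standalone sketch of the same idea (simplified stand-in types, not the processor's own):

	package main

	import (
		"encoding/json"
		"fmt"
	)

	func main() {
		payload := []byte(`[{"baseLabels":{"name":"rpc_calls_total"},"metric":{"type":"counter","value":[{"labels":{"service":"foo"},"value":25}]}}]`)

		// First pass: the envelope, with the values kept as raw JSON.
		var entities []struct {
			BaseLabels map[string]string `json:"baseLabels"`
			Metric     struct {
				Type   string          `json:"type"`
				Values json.RawMessage `json:"value"`
			} `json:"metric"`
		}
		if err := json.Unmarshal(payload, &entities); err != nil {
			panic(err)
		}

		// Second pass: a concrete type chosen by the "type" field.
		for _, e := range entities {
			switch e.Metric.Type {
			case "counter", "gauge":
				var values []struct {
					Labels map[string]string `json:"labels"`
					Value  float64           `json:"value"`
				}
				if err := json.Unmarshal(e.Metric.Values, &values); err != nil {
					panic(err)
				}
				fmt.Println(e.BaseLabels["name"], values[0].Labels["service"], values[0].Value)
			}
		}
	}

A side effect of switching from ioutil.ReadAll plus json.Unmarshal to json.NewDecoder(stream).Decode is that an empty stream now surfaces io.EOF, which is why the test below expects "EOF" rather than "unexpected end of JSON input".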

View file

@@ -31,10 +31,10 @@ func testProcessor002Process(t test.Tester) {
 		err error
 	}{
 		{
-			err: fmt.Errorf("unexpected end of JSON input"),
+			err: fmt.Errorf("EOF"),
 		},
 		{
-			in: "[{\"baseLabels\":{\"name\":\"rpc_calls_total\"},\"docstring\":\"RPC calls.\",\"metric\":{\"type\":\"counter\",\"value\":[{\"labels\":{\"service\":\"zed\"},\"value\":25},{\"labels\":{\"service\":\"bar\"},\"value\":25},{\"labels\":{\"service\":\"foo\"},\"value\":25}]}},{\"baseLabels\":{\"name\":\"rpc_latency_microseconds\"},\"docstring\":\"RPC latency.\",\"metric\":{\"type\":\"histogram\",\"value\":[{\"labels\":{\"service\":\"foo\"},\"value\":{\"0.010000\":15.890724674774395,\"0.050000\":15.890724674774395,\"0.500000\":84.63044031436561,\"0.900000\":160.21100853053224,\"0.990000\":172.49828748957728}},{\"labels\":{\"service\":\"zed\"},\"value\":{\"0.010000\":0.0459814091918713,\"0.050000\":0.0459814091918713,\"0.500000\":0.6120456642749681,\"0.900000\":1.355915069887731,\"0.990000\":1.772733213161236}},{\"labels\":{\"service\":\"bar\"},\"value\":{\"0.010000\":78.48563317257356,\"0.050000\":78.48563317257356,\"0.500000\":97.31798360385088,\"0.900000\":109.89202084295582,\"0.990000\":109.99626121011262}}]}}]",
+			in: `[{"baseLabels":{"name":"rpc_calls_total"},"docstring":"RPC calls.","metric":{"type":"counter","value":[{"labels":{"service":"zed"},"value":25},{"labels":{"service":"bar"},"value":25},{"labels":{"service":"foo"},"value":25}]}},{"baseLabels":{"name":"rpc_latency_microseconds"},"docstring":"RPC latency.","metric":{"type":"histogram","value":[{"labels":{"service":"foo"},"value":{"0.010000":15.890724674774395,"0.050000":15.890724674774395,"0.500000":84.63044031436561,"0.900000":160.21100853053224,"0.990000":172.49828748957728}},{"labels":{"service":"zed"},"value":{"0.010000":0.0459814091918713,"0.050000":0.0459814091918713,"0.500000":0.6120456642749681,"0.900000":1.355915069887731,"0.990000":1.772733213161236}},{"labels":{"service":"bar"},"value":{"0.010000":78.48563317257356,"0.050000":78.48563317257356,"0.500000":97.31798360385088,"0.900000":109.89202084295582,"0.990000":109.99626121011262}}]}}]`,
 			out: []Result{
 				{
 					Sample: model.Sample{
@@ -179,14 +179,6 @@ func testProcessor002Process(t test.Tester) {
 			continue
 		}
 
-		if scenario.err != nil && err != nil {
-			if scenario.err.Error() != err.Error() {
-				t.Errorf("%d. expected err of %s, got %s", i, scenario.err, err)
-			}
-		} else if scenario.err != err {
-			t.Errorf("%d. expected err of %s, got %s", i, scenario.err, err)
-		}
-
 		delivered := make([]Result, 0)
 
 		for len(inputChannel) != 0 {