Merge pull request #195 from prometheus/refactor/processor002

Refactor processor for 0.0.2 schema
This commit is contained in:
Bernerd Schaefer 2013-04-26 05:56:48 -07:00
commit 90ee67a038
4 changed files with 104 additions and 117 deletions

View file

@ -34,6 +34,22 @@ const (
// match.
type LabelSet map[LabelName]LabelValue
// Merge returns a new LabelSet containing the union of l and other.
// On key collisions the value from other wins. Neither the receiver
// nor the argument is modified.
func (l LabelSet) Merge(other LabelSet) LabelSet {
	// Pre-size for the common case of disjoint key sets so the map
	// never has to grow while copying.
	result := make(LabelSet, len(l)+len(other))
	for k, v := range l {
		result[k] = v
	}
	for k, v := range other {
		result[k] = v
	}
	return result
}
func (l LabelSet) String() string {
	var (
		buffer bytes.Buffer

View file

@ -26,3 +26,24 @@ type Processor interface {
// Process performs the work on the input and closes the incoming stream.
Process(stream io.ReadCloser, timestamp time.Time, baseLabels model.LabelSet, results chan Result) (err error)
}
// The ProcessorFunc type allows the use of ordinary functions for processors.
type ProcessorFunc func(io.ReadCloser, time.Time, model.LabelSet, chan Result) error

// Process implements the Processor interface by calling f itself with the
// given arguments.
func (f ProcessorFunc) Process(stream io.ReadCloser, timestamp time.Time, baseLabels model.LabelSet, results chan Result) error {
	return f(stream, timestamp, baseLabels, results)
}
// LabelSet converts a map[string]string into a model.LabelSet.
//
// NOTE: This should be deleted when support for go 1.0.3 is removed; 1.1 is
// smart enough to unmarshal JSON objects into model.LabelSet directly.
func LabelSet(labels map[string]string) model.LabelSet {
	out := make(model.LabelSet, len(labels))
	for name, value := range labels {
		out[model.LabelName(name)] = model.LabelValue(value)
	}
	return out
}

View file

@ -17,139 +17,97 @@ import (
"encoding/json"
"fmt"
"github.com/prometheus/prometheus/model"
"io"
"time"
)
const ( // Processor for telemetry schema version 0.0.2.
baseLabels002 = "baseLabels" var Processor002 ProcessorFunc = func(stream io.ReadCloser, timestamp time.Time, baseLabels model.LabelSet, results chan Result) error {
counter002 = "counter" // container for telemetry data
docstring002 = "docstring" var entities []struct {
gauge002 = "gauge"
histogram002 = "histogram"
labels002 = "labels"
metric002 = "metric"
type002 = "type"
value002 = "value"
percentile002 = "percentile"
)
var (
Processor002 Processor = &processor002{}
)
// processor002 is responsible for handling API version 0.0.2.
type processor002 struct {
time utility.Time
}
// entity002 represents a the JSON structure that 0.0.2 uses.
type entity002 []struct {
BaseLabels map[string]string `json:"baseLabels"` BaseLabels map[string]string `json:"baseLabels"`
Docstring string `json:"docstring"` Docstring string `json:"docstring"`
Metric struct { Metric struct {
MetricType string `json:"type"` Type string `json:"type"`
Value []struct { Values json.RawMessage `json:"value"`
Labels map[string]string `json:"labels"`
Value interface{} `json:"value"`
} `json:"value"`
} `json:"metric"` } `json:"metric"`
} }
// concrete type for histogram values
type histogram struct {
Labels map[string]string `json:"labels"`
Values map[string]model.SampleValue `json:"value"`
}
// concrete type for counter and gauge values
type counter struct {
Labels map[string]string `json:"labels"`
Value model.SampleValue `json:"value"`
}
func (p *processor002) Process(stream io.ReadCloser, timestamp time.Time, baseLabels model.LabelSet, results chan Result) (err error) {
// TODO(matt): Replace with plain-jane JSON unmarshalling.
defer stream.Close() defer stream.Close()
buffer, err := ioutil.ReadAll(stream) if err := json.NewDecoder(stream).Decode(&entities); err != nil {
if err != nil { return err
return
} }
entities := entity002{}
err = json.Unmarshal(buffer, &entities)
if err != nil {
return
}
// TODO(matt): This outer loop is a great basis for parallelization.
for _, entity := range entities { for _, entity := range entities {
for _, value := range entity.Metric.Value { entityLabels := baseLabels.Merge(LabelSet(entity.BaseLabels))
metric := model.Metric{}
for label, labelValue := range baseLabels {
metric[label] = labelValue
}
for label, labelValue := range entity.BaseLabels { switch entity.Metric.Type {
metric[model.LabelName(label)] = model.LabelValue(labelValue) case "counter", "gauge":
} var values []counter
for label, labelValue := range value.Labels { if err := json.Unmarshal(entity.Metric.Values, &values); err != nil {
metric[model.LabelName(label)] = model.LabelValue(labelValue) results <- Result{
Err: fmt.Errorf("Could not extract %s value: %s", entity.Metric.Type, err),
} }
switch entity.Metric.MetricType {
case gauge002, counter002:
sampleValue, ok := value.Value.(float64)
if !ok {
err = fmt.Errorf("Could not convert value from %s %s to float64.", entity, value)
continue continue
} }
sample := model.Sample{ for _, counter := range values {
Metric: metric, labels := entityLabels.Merge(LabelSet(counter.Labels))
Timestamp: timestamp,
Value: model.SampleValue(sampleValue),
}
results <- Result{ results <- Result{
Err: err, Sample: model.Sample{
Sample: sample, Metric: model.Metric(labels),
}
break
case histogram002:
sampleValue, ok := value.Value.(map[string]interface{})
if !ok {
err = fmt.Errorf("Could not convert value from %q to a map[string]interface{}.", value.Value)
continue
}
for percentile, percentileValue := range sampleValue {
individualValue, ok := percentileValue.(float64)
if !ok {
err = fmt.Errorf("Could not convert value from %q to a float64.", percentileValue)
continue
}
childMetric := make(map[model.LabelName]model.LabelValue, len(metric)+1)
for k, v := range metric {
childMetric[k] = v
}
childMetric[model.LabelName(percentile002)] = model.LabelValue(percentile)
sample := model.Sample{
Metric: childMetric,
Timestamp: timestamp, Timestamp: timestamp,
Value: model.SampleValue(individualValue), Value: counter.Value,
},
} }
}
case "histogram":
var values []histogram
if err := json.Unmarshal(entity.Metric.Values, &values); err != nil {
results <- Result{
Err: fmt.Errorf("Could not extract %s value: %s", entity.Metric.Type, err),
}
continue
}
for _, histogram := range values {
for percentile, value := range histogram.Values {
labels := entityLabels.Merge(LabelSet(histogram.Labels))
labels[model.LabelName("percentile")] = model.LabelValue(percentile)
results <- Result{ results <- Result{
Err: err, Sample: model.Sample{
Sample: sample, Metric: model.Metric(labels),
Timestamp: timestamp,
Value: value,
},
}
} }
} }
break
default: default:
results <- Result{
Err: fmt.Errorf("Unknown metric type %q", entity.Metric.Type),
} }
} }
} }
return return nil
} }

View file

@ -31,10 +31,10 @@ func testProcessor002Process(t test.Tester) {
err error err error
}{ }{
{ {
err: fmt.Errorf("unexpected end of JSON input"), err: fmt.Errorf("EOF"),
}, },
{ {
in: "[{\"baseLabels\":{\"name\":\"rpc_calls_total\"},\"docstring\":\"RPC calls.\",\"metric\":{\"type\":\"counter\",\"value\":[{\"labels\":{\"service\":\"zed\"},\"value\":25},{\"labels\":{\"service\":\"bar\"},\"value\":25},{\"labels\":{\"service\":\"foo\"},\"value\":25}]}},{\"baseLabels\":{\"name\":\"rpc_latency_microseconds\"},\"docstring\":\"RPC latency.\",\"metric\":{\"type\":\"histogram\",\"value\":[{\"labels\":{\"service\":\"foo\"},\"value\":{\"0.010000\":15.890724674774395,\"0.050000\":15.890724674774395,\"0.500000\":84.63044031436561,\"0.900000\":160.21100853053224,\"0.990000\":172.49828748957728}},{\"labels\":{\"service\":\"zed\"},\"value\":{\"0.010000\":0.0459814091918713,\"0.050000\":0.0459814091918713,\"0.500000\":0.6120456642749681,\"0.900000\":1.355915069887731,\"0.990000\":1.772733213161236}},{\"labels\":{\"service\":\"bar\"},\"value\":{\"0.010000\":78.48563317257356,\"0.050000\":78.48563317257356,\"0.500000\":97.31798360385088,\"0.900000\":109.89202084295582,\"0.990000\":109.99626121011262}}]}}]", in: `[{"baseLabels":{"name":"rpc_calls_total"},"docstring":"RPC calls.","metric":{"type":"counter","value":[{"labels":{"service":"zed"},"value":25},{"labels":{"service":"bar"},"value":25},{"labels":{"service":"foo"},"value":25}]}},{"baseLabels":{"name":"rpc_latency_microseconds"},"docstring":"RPC latency.","metric":{"type":"histogram","value":[{"labels":{"service":"foo"},"value":{"0.010000":15.890724674774395,"0.050000":15.890724674774395,"0.500000":84.63044031436561,"0.900000":160.21100853053224,"0.990000":172.49828748957728}},{"labels":{"service":"zed"},"value":{"0.010000":0.0459814091918713,"0.050000":0.0459814091918713,"0.500000":0.6120456642749681,"0.900000":1.355915069887731,"0.990000":1.772733213161236}},{"labels":{"service":"bar"},"value":{"0.010000":78.48563317257356,"0.050000":78.48563317257356,"0.500000":97.31798360385088,"0.900000":109.89202084295582,"0.990000":109.99626121011262}}]}}]`,
out: []Result{ out: []Result{
{ {
Sample: model.Sample{ Sample: model.Sample{
@ -179,14 +179,6 @@ func testProcessor002Process(t test.Tester) {
continue continue
} }
if scenario.err != nil && err != nil {
if scenario.err.Error() != err.Error() {
t.Errorf("%d. expected err of %s, got %s", i, scenario.err, err)
}
} else if scenario.err != err {
t.Errorf("%d. expected err of %s, got %s", i, scenario.err, err)
}
delivered := make([]Result, 0) delivered := make([]Result, 0)
for len(inputChannel) != 0 { for len(inputChannel) != 0 {