New encoding for OpenTSDB tag values (and metric names).

Change-Id: I0f4393f638c6e2bb2b2ce14e58e38b49ce456da8
This commit is contained in:
Bjoern Rabenstein 2014-03-21 12:49:24 +01:00
parent 0a65b691cc
commit caf47b2fbc
4 changed files with 260 additions and 30 deletions

View file

@ -5,11 +5,13 @@ import (
"encoding/json" "encoding/json"
"fmt" "fmt"
"io/ioutil" "io/ioutil"
"math"
"net/http" "net/http"
"net/url" "net/url"
"regexp" "regexp"
"time" "time"
"github.com/golang/glog"
clientmodel "github.com/prometheus/client_golang/model" clientmodel "github.com/prometheus/client_golang/model"
"github.com/prometheus/prometheus/utility" "github.com/prometheus/prometheus/utility"
@ -41,26 +43,20 @@ func NewClient(url string, timeout time.Duration) *Client {
// StoreSamplesRequest is used for building a JSON request for storing samples // StoreSamplesRequest is used for building a JSON request for storing samples
// via the OpenTSDB. // via the OpenTSDB.
type StoreSamplesRequest struct { type StoreSamplesRequest struct {
Metric string `json:"metric"` Metric TagValue `json:"metric"`
Timestamp int64 `json:"timestamp"` Timestamp int64 `json:"timestamp"`
Value clientmodel.SampleValue `json:"value"` Value float64 `json:"value"`
Tags map[string]string `json:"tags"` Tags map[string]TagValue `json:"tags"`
}
// escapeTagValue escapes Prometheus label values to valid tag values for
// OpenTSDB.
func escapeTagValue(l clientmodel.LabelValue) string {
return illegalCharsRE.ReplaceAllString(string(l), "_")
} }
// tagsFromMetric translates Prometheus metric into OpenTSDB tags. // tagsFromMetric translates Prometheus metric into OpenTSDB tags.
func tagsFromMetric(m clientmodel.Metric) map[string]string { func tagsFromMetric(m clientmodel.Metric) map[string]TagValue {
tags := make(map[string]string, len(m)-1) tags := make(map[string]TagValue, len(m)-1)
for l, v := range m { for l, v := range m {
if l == clientmodel.MetricNameLabel { if l == clientmodel.MetricNameLabel {
continue continue
} }
tags[string(l)] = escapeTagValue(v) tags[string(l)] = TagValue(v)
} }
return tags return tags
} }
@ -69,11 +65,16 @@ func tagsFromMetric(m clientmodel.Metric) map[string]string {
func (c *Client) Store(samples clientmodel.Samples) error { func (c *Client) Store(samples clientmodel.Samples) error {
reqs := make([]StoreSamplesRequest, 0, len(samples)) reqs := make([]StoreSamplesRequest, 0, len(samples))
for _, s := range samples { for _, s := range samples {
metric := escapeTagValue(s.Metric[clientmodel.MetricNameLabel]) v := float64(s.Value)
if math.IsNaN(v) || math.IsInf(v, 0) {
glog.Warningf("cannot send value %d to OpenTSDB, skipping sample %#v", v, s)
continue
}
metric := TagValue(s.Metric[clientmodel.MetricNameLabel])
reqs = append(reqs, StoreSamplesRequest{ reqs = append(reqs, StoreSamplesRequest{
Metric: metric, Metric: metric,
Timestamp: s.Timestamp.Unix(), Timestamp: s.Timestamp.Unix(),
Value: s.Value, Value: v,
Tags: tagsFromMetric(s.Metric), Tags: tagsFromMetric(s.Metric),
}) })
} }

View file

@ -14,28 +14,62 @@
package opentsdb package opentsdb
import ( import (
"bytes"
"encoding/json"
"reflect"
"testing" "testing"
clientmodel "github.com/prometheus/client_golang/model" clientmodel "github.com/prometheus/client_golang/model"
) )
func TestTagsFromMetric(t *testing.T) { var (
input := clientmodel.Metric{ metric = clientmodel.Metric{
clientmodel.MetricNameLabel: "testmetric", clientmodel.MetricNameLabel: "test:metric",
"test:label": "test:value", "testlabel": "test:value",
"many_chars": "abc!ABC:012-3!45ö67~89./", "many_chars": "abc!ABC:012-3!45ö67~89./",
} }
expected := map[string]string{ )
"test:label": "test_value",
"many_chars": "abc_ABC_012-3_45_67_89./", func TestTagsFromMetric(t *testing.T) {
expected := map[string]TagValue{
"testlabel": TagValue("test:value"),
"many_chars": TagValue("abc!ABC:012-3!45ö67~89./"),
} }
actual := tagsFromMetric(input) actual := tagsFromMetric(metric)
if len(actual) != len(expected) { if !reflect.DeepEqual(actual, expected) {
t.Fatalf("Expected %v, got %v", expected, actual) t.Errorf("Expected %#v, got %#v", expected, actual)
} }
for k, v := range expected { }
if v != actual[k] {
t.Fatalf("Expected %s => %s, got %s => %s", k, v, k, actual[k]) func TestMarshalStoreSamplesRequest(t *testing.T) {
} request := StoreSamplesRequest{
Metric: TagValue("test:metric"),
Timestamp: 4711,
Value: 3.1415,
Tags: tagsFromMetric(metric),
}
expectedJSON := []byte(`{"metric":"test_.metric","timestamp":4711,"value":3.1415,"tags":{"many_chars":"abc_21ABC_.012-3_2145_C3_B667_7E89./","testlabel":"test_.value"}}`)
resultingJSON, err := json.Marshal(request)
if err != nil {
t.Fatalf("Marshal(request) resulted in err: %s", err)
}
if !bytes.Equal(resultingJSON, expectedJSON) {
t.Errorf(
"Marshal(request) => %q, want %q",
resultingJSON, expectedJSON,
)
}
var unmarshaledRequest StoreSamplesRequest
err = json.Unmarshal(expectedJSON, &unmarshaledRequest)
if err != nil {
t.Fatalf("Unarshal(expectedJSON, &unmarshaledRequest) resulted in err: %s", err)
}
if !reflect.DeepEqual(unmarshaledRequest, request) {
t.Errorf(
"Unarshal(expectedJSON, &unmarshaledRequest) => %#v, want %#v",
unmarshaledRequest, request,
)
} }
} }

View file

@ -0,0 +1,144 @@
package opentsdb
import (
"bytes"
"fmt"
clientmodel "github.com/prometheus/client_golang/model"
)
// TagValue is a clientmodel.LabelValue that implements json.Marshaler and
// json.Unmarshaler. These implementations avoid characters illegal in
// OpenTSDB. See the MarshalJSON for details. TagValue is used for the values of
// OpenTSDB tags as well as for OpenTSDB metric names.
type TagValue clientmodel.LabelValue
// MarshalJSON marshals this TagValue into JSON that only contains runes allowed
// in OpenTSDB. It implements json.Marshaler. The runes allowed in OpenTSDB are
// all single-byte. This function encodes the arbitrary byte sequence found in
// this TagValue in the following way:
//
// - The string that underlies TagValue is scanned byte by byte.
//
// - If a byte represents a legal OpenTSDB rune with the exception of '_', that
// byte is directly copied to the resulting JSON byte slice.
//
// - If '_' is encountered, it is replaced by '__'.
//
// - If ':' is encountered, it is replaced by '_.'.
//
// - All other bytes are replaced by '_' followed by two bytes containing the
// uppercase ASCII representation of their hexadecimal value.
//
// This encoding allows to save arbitrary Go strings in OpenTSDB. That's
// required because Prometheus label values can contain anything, and even
// Prometheus metric names may (and often do) contain ':' (which is disallowed
// in OpenTSDB strings). The encoding uses '_' as an escape character and
// renders a ':' more or less recognizable as '_.'
//
// Examples:
//
// "foo-bar-42" -> "foo-bar-42"
//
// "foo_bar_42" -> "foo__bar__42"
//
// "http://example.org:8080" -> "http_.//example.org_.8080"
//
// "Björn's email: bjoern@soundcloud.com" ->
// "Bj_C3_B6rn_27s_20email_._20bjoern_40soundcloud.com"
//
// "日" -> "_E6_97_A5"
func (tv TagValue) MarshalJSON() ([]byte, error) {
length := len(tv)
// Need at least two more bytes than in tv.
result := bytes.NewBuffer(make([]byte, 1, length+2))
result.WriteByte('"')
for i := 0; i < length; i++ {
b := tv[i]
switch {
case (b >= '-' && b <= '9') || // '-', '.', '/', 0-9
(b >= 'A' && b <= 'Z') ||
(b >= 'a' && b <= 'z'):
result.WriteByte(b)
case b == '_':
result.WriteString("__")
case b == ':':
result.WriteString("_.")
default:
result.WriteString(fmt.Sprintf("_%X", b))
}
}
result.WriteByte('"')
return result.Bytes(), nil
}
// UnmarshalJSON unmarshals JSON strings coming from OpenTSDB into Go strings
// by applying the inverse of what is described for the MarshalJSON method.
func (tv *TagValue) UnmarshalJSON(json []byte) error {
escapeLevel := 0 // How many bytes after '_'.
var parsedByte byte
// Might need fewer bytes, but let's avoid realloc.
result := bytes.NewBuffer(make([]byte, 0, len(json)-2))
for i, b := range json {
if i == 0 {
if b != '"' {
return fmt.Errorf("expected '\"', got %q", b)
}
continue
}
if i == len(json)-1 {
if b != '"' {
return fmt.Errorf("expected '\"', got %q", b)
}
break
}
switch escapeLevel {
case 0:
if b == '_' {
escapeLevel = 1
continue
}
result.WriteByte(b)
case 1:
switch {
case b == '_':
result.WriteByte('_')
escapeLevel = 0
case b == '.':
result.WriteByte(':')
escapeLevel = 0
case b >= '0' && b <= '9':
parsedByte = (b - 48) << 4
escapeLevel = 2
case b >= 'A' && b <= 'F': // A-F
parsedByte = (b - 55) << 4
escapeLevel = 2
default:
return fmt.Errorf(
"illegal escape sequence at byte %d (%c)",
i, b,
)
}
case 2:
switch {
case b >= '0' && b <= '9':
parsedByte += b - 48
case b >= 'A' && b <= 'F': // A-F
parsedByte += b - 55
default:
return fmt.Errorf(
"illegal escape sequence at byte %d (%c)",
i, b,
)
}
result.WriteByte(parsedByte)
escapeLevel = 0
default:
panic("unexpected escape level")
}
}
*tv = TagValue(result.String())
return nil
}

View file

@ -0,0 +1,51 @@
package opentsdb
import (
"bytes"
"encoding/json"
"testing"
)
var stringtests = []struct {
tv TagValue
json []byte
}{
{TagValue("foo-bar-42"), []byte(`"foo-bar-42"`)},
{TagValue("foo_bar_42"), []byte(`"foo__bar__42"`)},
{TagValue("http://example.org:8080"), []byte(`"http_.//example.org_.8080"`)},
{TagValue("Björn's email: bjoern@soundcloud.com"), []byte(`"Bj_C3_B6rn_27s_20email_._20bjoern_40soundcloud.com"`)},
{TagValue("日"), []byte(`"_E6_97_A5"`)},
}
func TestTagValueMarshaling(t *testing.T) {
for i, tt := range stringtests {
json, err := json.Marshal(tt.tv)
if err != nil {
t.Errorf("%d. Marshal(%q) returned err: %s", i, tt.tv, err)
} else {
if !bytes.Equal(json, tt.json) {
t.Errorf(
"%d. Marshal(%q) => %q, want %q",
i, tt.tv, json, tt.json,
)
}
}
}
}
func TestTagValueUnMarshaling(t *testing.T) {
for i, tt := range stringtests {
var tv TagValue
err := json.Unmarshal(tt.json, &tv)
if err != nil {
t.Errorf("%d. Unmarshal(%q, &str) returned err: %s", i, tt.json, err)
} else {
if tv != tt.tv {
t.Errorf(
"%d. Unmarshal(%q, &str) => str==%q, want %q",
i, tt.json, tv, tt.tv,
)
}
}
}
}