Add InfluxDB read-back support to remote storage bridge

This commit is contained in:
Julius Volz 2017-04-02 11:59:11 +02:00
parent 5a896033e3
commit b391cbb808
5 changed files with 309 additions and 54 deletions

View file

@ -72,8 +72,8 @@ func pathFromMetric(m model.Metric, prefix string) string {
return buffer.String() return buffer.String()
} }
// Store sends a batch of samples to Graphite. // Write sends a batch of samples to Graphite.
func (c *Client) Store(samples model.Samples) error { func (c *Client) Write(samples model.Samples) error {
conn, err := net.DialTimeout(c.transport, c.address, c.timeout) conn, err := net.DialTimeout(c.transport, c.address, c.timeout)
if err != nil { if err != nil {
return err return err

View file

@ -14,26 +14,30 @@
package influxdb package influxdb
import ( import (
"encoding/json"
"fmt"
"math" "math"
"strings"
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/log" "github.com/prometheus/common/log"
"github.com/prometheus/common/model" "github.com/prometheus/common/model"
"github.com/prometheus/prometheus/storage/remote"
influx "github.com/influxdb/influxdb/client" influx "github.com/influxdata/influxdb/client/v2"
) )
// Client allows sending batches of Prometheus samples to InfluxDB. // Client allows sending batches of Prometheus samples to InfluxDB.
type Client struct { type Client struct {
client *influx.Client client influx.Client
database string database string
retentionPolicy string retentionPolicy string
ignoredSamples prometheus.Counter ignoredSamples prometheus.Counter
} }
// NewClient creates a new Client. // NewClient creates a new Client.
func NewClient(conf influx.Config, db string, rp string) *Client { func NewClient(conf influx.HTTPConfig, db string, rp string) *Client {
c, err := influx.NewClient(conf) c, err := influx.NewHTTPClient(conf)
// Currently influx.NewClient() *should* never return an error. // Currently influx.NewClient() *should* never return an error.
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)
@ -63,9 +67,9 @@ func tagsFromMetric(m model.Metric) map[string]string {
return tags return tags
} }
// Store sends a batch of samples to InfluxDB via its HTTP API. // Write sends a batch of samples to InfluxDB via its HTTP API.
func (c *Client) Store(samples model.Samples) error { func (c *Client) Write(samples model.Samples) error {
points := make([]influx.Point, 0, len(samples)) points := make([]*influx.Point, 0, len(samples))
for _, s := range samples { for _, s := range samples {
v := float64(s.Value) v := float64(s.Value)
if math.IsNaN(v) || math.IsInf(v, 0) { if math.IsNaN(v) || math.IsInf(v, 0) {
@ -73,24 +77,221 @@ func (c *Client) Store(samples model.Samples) error {
c.ignoredSamples.Inc() c.ignoredSamples.Inc()
continue continue
} }
points = append(points, influx.Point{ p, err := influx.NewPoint(
Measurement: string(s.Metric[model.MetricNameLabel]), string(s.Metric[model.MetricNameLabel]),
Tags: tagsFromMetric(s.Metric), tagsFromMetric(s.Metric),
Time: s.Timestamp.Time(), map[string]interface{}{"value": v},
Precision: "ms", s.Timestamp.Time(),
Fields: map[string]interface{}{ )
"value": v, if err != nil {
}, return err
}) }
points = append(points, p)
} }
bps := influx.BatchPoints{ bps, err := influx.NewBatchPoints(influx.BatchPointsConfig{
Points: points, Precision: "ms",
Database: c.database, Database: c.database,
RetentionPolicy: c.retentionPolicy, RetentionPolicy: c.retentionPolicy,
})
if err != nil {
return err
} }
_, err := c.client.Write(bps) bps.AddPoints(points)
return err return c.client.Write(bps)
}
func (c *Client) Read(req *remote.ReadRequest) (*remote.ReadResponse, error) {
labelsToSeries := map[string]*remote.TimeSeries{}
for _, q := range req.Queries {
command, err := buildCommand(q)
if err != nil {
return nil, err
}
query := influx.NewQuery(command, c.database, "ms")
resp, err := c.client.Query(query)
if err != nil {
return nil, err
}
if resp.Err != "" {
return nil, fmt.Errorf(resp.Err)
}
if err = mergeResult(labelsToSeries, resp.Results); err != nil {
return nil, err
}
}
resp := remote.ReadResponse{
Timeseries: make([]*remote.TimeSeries, 0, len(labelsToSeries)),
}
for _, ts := range labelsToSeries {
resp.Timeseries = append(resp.Timeseries, ts)
}
return &resp, nil
}
func buildCommand(q *remote.Query) (string, error) {
matchers := make([]string, 0, len(q.Matchers))
// If we don't find a metric name matcher, query all metrics
// (InfluxDB measurements) by default.
from := "FROM /.+/"
for _, m := range q.Matchers {
if m.Name == model.MetricNameLabel {
switch m.Type {
case remote.MatchType_EQUAL:
from = fmt.Sprintf("FROM %q", m.Value)
case remote.MatchType_REGEX_MATCH:
from = fmt.Sprintf("FROM /^%s$/", escapeSlashes(m.Value))
default:
// TODO: Figure out how to support these efficiently.
return "", fmt.Errorf("non-equal or regex-non-equal matchers are not supported on the metric name yet")
}
continue
}
switch m.Type {
case remote.MatchType_EQUAL:
matchers = append(matchers, fmt.Sprintf("%q = '%s'", m.Name, escapeSingleQuotes(m.Value)))
case remote.MatchType_NOT_EQUAL:
matchers = append(matchers, fmt.Sprintf("%q != '%s'", m.Name, escapeSingleQuotes(m.Value)))
case remote.MatchType_REGEX_MATCH:
matchers = append(matchers, fmt.Sprintf("%q =~ /^%s$/", m.Name, escapeSlashes(m.Value)))
case remote.MatchType_REGEX_NO_MATCH:
matchers = append(matchers, fmt.Sprintf("%q !~ /^%s$/", m.Name, escapeSlashes(m.Value)))
default:
return "", fmt.Errorf("unknown match type %v", m.Type)
}
}
matchers = append(matchers, fmt.Sprintf("time >= %vms", q.StartTimestampMs))
matchers = append(matchers, fmt.Sprintf("time <= %vms", q.EndTimestampMs))
return fmt.Sprintf("SELECT value %s WHERE %v GROUP BY *", from, strings.Join(matchers, " AND ")), nil
}
func escapeSingleQuotes(str string) string {
return strings.Replace(str, `'`, `\'`, -1)
}
func escapeSlashes(str string) string {
return strings.Replace(str, `/`, `\/`, -1)
}
func mergeResult(labelsToSeries map[string]*remote.TimeSeries, results []influx.Result) error {
for _, r := range results {
for _, s := range r.Series {
k := concatLabels(s.Tags)
ts, ok := labelsToSeries[k]
if !ok {
ts = &remote.TimeSeries{
Labels: tagsToLabelPairs(s.Name, s.Tags),
}
labelsToSeries[k] = ts
}
samples, err := valuesToSamples(s.Values)
if err != nil {
return err
}
ts.Samples = mergeSamples(ts.Samples, samples)
}
}
return nil
}
func concatLabels(labels map[string]string) string {
// 0xff cannot cannot occur in valid UTF-8 sequences, so use it
// as a separator here.
separator := "\xff"
pairs := make([]string, 0, len(labels))
for k, v := range labels {
pairs = append(pairs, k+separator+v)
}
return strings.Join(pairs, separator)
}
func tagsToLabelPairs(name string, tags map[string]string) []*remote.LabelPair {
pairs := make([]*remote.LabelPair, 0, len(tags))
for k, v := range tags {
if v == "" {
// If we select metrics with different sets of labels names,
// InfluxDB returns *all* possible tag names on all returned
// series, with empty tag values on series where they don't
// apply. In Prometheus, an empty label value is equivalent
// to a non-existent label, so we just skip empty ones here
// to make the result correct.
continue
}
pairs = append(pairs, &remote.LabelPair{
Name: k,
Value: v,
})
}
pairs = append(pairs, &remote.LabelPair{
Name: model.MetricNameLabel,
Value: name,
})
return pairs
}
func valuesToSamples(values [][]interface{}) ([]*remote.Sample, error) {
samples := make([]*remote.Sample, 0, len(values))
for _, v := range values {
if len(v) != 2 {
return nil, fmt.Errorf("bad sample tuple length, expected [<timestamp>, <value>], got %v", v)
}
jsonTimestamp, ok := v[0].(json.Number)
if !ok {
return nil, fmt.Errorf("bad timestamp: %v", v[0])
}
jsonValue, ok := v[1].(json.Number)
if !ok {
return nil, fmt.Errorf("bad sample value: %v", v[1])
}
timestamp, err := jsonTimestamp.Int64()
if err != nil {
return nil, fmt.Errorf("unable to convert sample timestamp to int64: %v", err)
}
value, err := jsonValue.Float64()
if err != nil {
return nil, fmt.Errorf("unable to convert sample value to float64: %v", err)
}
samples = append(samples, &remote.Sample{
TimestampMs: timestamp,
Value: value,
})
}
return samples, nil
}
// mergeSamples merges two lists of sample pairs and removes duplicate
// timestamps. It assumes that both lists are sorted by timestamp.
func mergeSamples(a, b []*remote.Sample) []*remote.Sample {
result := make([]*remote.Sample, 0, len(a)+len(b))
i, j := 0, 0
for i < len(a) && j < len(b) {
if a[i].TimestampMs < b[j].TimestampMs {
result = append(result, a[i])
i++
} else if a[i].TimestampMs > b[j].TimestampMs {
result = append(result, b[j])
j++
} else {
result = append(result, a[i])
i++
j++
}
}
result = append(result, a[i:]...)
result = append(result, b[j:]...)
return result
} }
// Name identifies the client as an InfluxDB client. // Name identifies the client as an InfluxDB client.

View file

@ -22,7 +22,7 @@ import (
"testing" "testing"
"time" "time"
influx "github.com/influxdb/influxdb/client" influx "github.com/influxdata/influxdb/client/v2"
"github.com/prometheus/common/model" "github.com/prometheus/common/model"
) )
@ -68,8 +68,8 @@ func TestClient(t *testing.T) {
}, },
} }
expectedBody := `testmetric,test_label=test_label_value1 value=1.23 123456789123000000 expectedBody := `testmetric,test_label=test_label_value1 value=1.23 123456789123
testmetric,test_label=test_label_value2 value=5.1234 123456789123000000 testmetric,test_label=test_label_value2 value=5.1234 123456789123
` `
server := httptest.NewServer(http.HandlerFunc( server := httptest.NewServer(http.HandlerFunc(
@ -97,15 +97,15 @@ testmetric,test_label=test_label_value2 value=5.1234 123456789123000000
t.Fatalf("Unable to parse server URL %s: %s", server.URL, err) t.Fatalf("Unable to parse server URL %s: %s", server.URL, err)
} }
conf := influx.Config{ conf := influx.HTTPConfig{
URL: *serverURL, Addr: serverURL.String(),
Username: "testuser", Username: "testuser",
Password: "testpass", Password: "testpass",
Timeout: time.Minute, Timeout: time.Minute,
} }
c := NewClient(conf, "test_db", "default") c := NewClient(conf, "test_db", "default")
if err := c.Store(samples); err != nil { if err := c.Write(samples); err != nil {
t.Fatalf("Error sending samples: %s", err) t.Fatalf("Error sending samples: %s", err)
} }
} }

View file

@ -16,6 +16,7 @@ package main
import ( import (
"flag" "flag"
"fmt"
"io/ioutil" "io/ioutil"
"net/http" "net/http"
_ "net/http/pprof" _ "net/http/pprof"
@ -30,7 +31,7 @@ import (
"github.com/prometheus/common/log" "github.com/prometheus/common/log"
"github.com/prometheus/common/model" "github.com/prometheus/common/model"
influx "github.com/influxdb/influxdb/client" influx "github.com/influxdata/influxdb/client/v2"
"github.com/prometheus/prometheus/documentation/examples/remote_storage/remote_storage_bridge/graphite" "github.com/prometheus/prometheus/documentation/examples/remote_storage/remote_storage_bridge/graphite"
"github.com/prometheus/prometheus/documentation/examples/remote_storage/remote_storage_bridge/influxdb" "github.com/prometheus/prometheus/documentation/examples/remote_storage/remote_storage_bridge/influxdb"
@ -95,8 +96,8 @@ func main() {
cfg := parseFlags() cfg := parseFlags()
http.Handle(cfg.telemetryPath, prometheus.Handler()) http.Handle(cfg.telemetryPath, prometheus.Handler())
clients := buildClients(cfg) writers, readers := buildClients(cfg)
serve(cfg.listenAddr, clients) serve(cfg.listenAddr, writers, readers)
} }
func parseFlags() *config { func parseFlags() *config {
@ -119,7 +120,7 @@ func parseFlags() *config {
flag.StringVar(&cfg.influxdbURL, "influxdb-url", "", flag.StringVar(&cfg.influxdbURL, "influxdb-url", "",
"The URL of the remote InfluxDB server to send samples to. None, if empty.", "The URL of the remote InfluxDB server to send samples to. None, if empty.",
) )
flag.StringVar(&cfg.influxdbRetentionPolicy, "influxdb.retention-policy", "default", flag.StringVar(&cfg.influxdbRetentionPolicy, "influxdb.retention-policy", "autogen",
"The InfluxDB retention policy to use.", "The InfluxDB retention policy to use.",
) )
flag.StringVar(&cfg.influxdbUsername, "influxdb.username", "", flag.StringVar(&cfg.influxdbUsername, "influxdb.username", "",
@ -139,38 +140,50 @@ func parseFlags() *config {
return cfg return cfg
} }
func buildClients(cfg *config) []remote.StorageClient { type writer interface {
var clients []remote.StorageClient Write(samples model.Samples) error
Name() string
}
type reader interface {
Read(req *remote.ReadRequest) (*remote.ReadResponse, error)
Name() string
}
func buildClients(cfg *config) ([]writer, []reader) {
var writers []writer
var readers []reader
if cfg.graphiteAddress != "" { if cfg.graphiteAddress != "" {
c := graphite.NewClient( c := graphite.NewClient(
cfg.graphiteAddress, cfg.graphiteTransport, cfg.graphiteAddress, cfg.graphiteTransport,
cfg.remoteTimeout, cfg.graphitePrefix) cfg.remoteTimeout, cfg.graphitePrefix)
clients = append(clients, c) writers = append(writers, c)
} }
if cfg.opentsdbURL != "" { if cfg.opentsdbURL != "" {
c := opentsdb.NewClient(cfg.opentsdbURL, cfg.remoteTimeout) c := opentsdb.NewClient(cfg.opentsdbURL, cfg.remoteTimeout)
clients = append(clients, c) writers = append(writers, c)
} }
if cfg.influxdbURL != "" { if cfg.influxdbURL != "" {
url, err := url.Parse(cfg.influxdbURL) url, err := url.Parse(cfg.influxdbURL)
if err != nil { if err != nil {
log.Fatalf("Failed to parse InfluxDB URL %q: %v", cfg.influxdbURL, err) log.Fatalf("Failed to parse InfluxDB URL %q: %v", cfg.influxdbURL, err)
} }
conf := influx.Config{ conf := influx.HTTPConfig{
URL: *url, Addr: url.String(),
Username: cfg.influxdbUsername, Username: cfg.influxdbUsername,
Password: cfg.influxdbPassword, Password: cfg.influxdbPassword,
Timeout: cfg.remoteTimeout, Timeout: cfg.remoteTimeout,
} }
c := influxdb.NewClient(conf, cfg.influxdbDatabase, cfg.influxdbRetentionPolicy) c := influxdb.NewClient(conf, cfg.influxdbDatabase, cfg.influxdbRetentionPolicy)
prometheus.MustRegister(c) prometheus.MustRegister(c)
clients = append(clients, c) writers = append(writers, c)
readers = append(readers, c)
} }
return clients return writers, readers
} }
func serve(addr string, clients []remote.StorageClient) error { func serve(addr string, writers []writer, readers []reader) error {
http.HandleFunc("/receive", func(w http.ResponseWriter, r *http.Request) { http.HandleFunc("/write", func(w http.ResponseWriter, r *http.Request) {
reqBuf, err := ioutil.ReadAll(snappy.NewReader(r.Body)) reqBuf, err := ioutil.ReadAll(snappy.NewReader(r.Body))
if err != nil { if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest) http.Error(w, err.Error(), http.StatusBadRequest)
@ -187,16 +200,57 @@ func serve(addr string, clients []remote.StorageClient) error {
receivedSamples.Add(float64(len(samples))) receivedSamples.Add(float64(len(samples)))
var wg sync.WaitGroup var wg sync.WaitGroup
for _, c := range clients { for _, w := range writers {
wg.Add(1) wg.Add(1)
go func(rc remote.StorageClient) { go func(rw writer) {
sendSamples(rc, samples) sendSamples(rw, samples)
wg.Done() wg.Done()
}(c) }(w)
} }
wg.Wait() wg.Wait()
}) })
http.HandleFunc("/read", func(w http.ResponseWriter, r *http.Request) {
reqBuf, err := ioutil.ReadAll(snappy.NewReader(r.Body))
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
var req remote.ReadRequest
if err := proto.Unmarshal(reqBuf, &req); err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
// TODO: Support reading from more than one reader and merging the results.
if len(readers) != 1 {
http.Error(w, fmt.Sprintf("expected exactly one reader, found %d readers", len(readers)), http.StatusInternalServerError)
return
}
reader := readers[0]
var resp *remote.ReadResponse
resp, err = reader.Read(&req)
if err != nil {
log.With("query", req).With("storage", reader.Name()).With("err", err).Warnf("Error executing query")
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
data, err := proto.Marshal(resp)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
w.Header().Set("Content-Type", "application/x-protobuf")
if _, err := snappy.NewWriter(w).Write(data); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
})
return http.ListenAndServe(addr, nil) return http.ListenAndServe(addr, nil)
} }
@ -219,14 +273,14 @@ func protoToSamples(req *remote.WriteRequest) model.Samples {
return samples return samples
} }
func sendSamples(c remote.StorageClient, samples model.Samples) { func sendSamples(w writer, samples model.Samples) {
begin := time.Now() begin := time.Now()
err := c.Store(samples) err := w.Write(samples)
duration := time.Since(begin).Seconds() duration := time.Since(begin).Seconds()
if err != nil { if err != nil {
log.Warnf("Error sending %d samples to remote storage %q: %v", len(samples), c.Name(), err) log.With("num_samples", len(samples)).With("storage", w.Name()).With("err", err).Warnf("Error sending samples to remote storage")
failedSamples.WithLabelValues(c.Name()).Add(float64(len(samples))) failedSamples.WithLabelValues(w.Name()).Add(float64(len(samples)))
} }
sentSamples.WithLabelValues(c.Name()).Add(float64(len(samples))) sentSamples.WithLabelValues(w.Name()).Add(float64(len(samples)))
sentBatchDuration.WithLabelValues(c.Name()).Observe(duration) sentBatchDuration.WithLabelValues(w.Name()).Observe(duration)
} }

View file

@ -69,8 +69,8 @@ func tagsFromMetric(m model.Metric) map[string]TagValue {
return tags return tags
} }
// Store sends a batch of samples to OpenTSDB via its HTTP API. // Write sends a batch of samples to OpenTSDB via its HTTP API.
func (c *Client) Store(samples model.Samples) error { func (c *Client) Write(samples model.Samples) error {
reqs := make([]StoreSamplesRequest, 0, len(samples)) reqs := make([]StoreSamplesRequest, 0, len(samples))
for _, s := range samples { for _, s := range samples {
v := float64(s.Value) v := float64(s.Value)