Merge pull request #2557 from prometheus/influxdb-read

Add InfluxDB read-back support to remote storage bridge
This commit is contained in:
Julius Volz 2017-04-03 18:29:22 +02:00 committed by GitHub
commit eda4286484
20 changed files with 3529 additions and 1637 deletions

View file

@ -1,10 +1,13 @@
# Remote storage bridge # Remote storage bridge
This is a bridge that receives samples in Prometheus's remote storage This is a bridge that receives samples via Prometheus's remote write
format and forwards them to Graphite, InfluxDB, or OpenTSDB. It is meant protocol and stores them in Graphite, InfluxDB, or OpenTSDB. It is meant
as a replacement for the built-in specific remote storage implementations as a replacement for the built-in specific remote storage implementations
that have been removed from Prometheus. that have been removed from Prometheus.
For InfluxDB, this bridge also supports reading back data through
Prometheus via Prometheus's remote read protocol.
## Building ## Building
``` ```
@ -13,10 +16,22 @@ go build
## Running ## Running
Example: Graphite example:
``` ```
./remote_storage_bridge -graphite-address=localhost:8080 -opentsdb-url=http://localhost:8081/ ./remote_storage_bridge -graphite-address=localhost:8080
```
OpenTSDB example:
```
./remote_storage_bridge -opentsdb-url=http://localhost:8081/
```
InfluxDB example:
```
./remote_storage_bridge -influxdb-url=http://localhost:8086/ -influxdb.database=prometheus -influxdb.retention-policy=autogen
``` ```
To show all flags: To show all flags:
@ -30,6 +45,11 @@ To show all flags:
To configure Prometheus to send samples to this bridge, add the following to your `prometheus.yml`: To configure Prometheus to send samples to this bridge, add the following to your `prometheus.yml`:
```yaml ```yaml
# Remote write configuration (for Graphite, OpenTSDB, or InfluxDB).
remote_write: remote_write:
url: "http://localhost:9201/receive" - url: "http://localhost:9201/write"
# Remote read configuration (for InfluxDB only at the moment).
remote_read:
- url: "http://localhost:9201/read"
``` ```

View file

@ -72,8 +72,8 @@ func pathFromMetric(m model.Metric, prefix string) string {
return buffer.String() return buffer.String()
} }
// Store sends a batch of samples to Graphite. // Write sends a batch of samples to Graphite.
func (c *Client) Store(samples model.Samples) error { func (c *Client) Write(samples model.Samples) error {
conn, err := net.DialTimeout(c.transport, c.address, c.timeout) conn, err := net.DialTimeout(c.transport, c.address, c.timeout)
if err != nil { if err != nil {
return err return err

View file

@ -14,26 +14,30 @@
package influxdb package influxdb
import ( import (
"encoding/json"
"fmt"
"math" "math"
"strings"
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/log" "github.com/prometheus/common/log"
"github.com/prometheus/common/model" "github.com/prometheus/common/model"
"github.com/prometheus/prometheus/storage/remote"
influx "github.com/influxdb/influxdb/client" influx "github.com/influxdata/influxdb/client/v2"
) )
// Client allows sending batches of Prometheus samples to InfluxDB. // Client allows sending batches of Prometheus samples to InfluxDB.
type Client struct { type Client struct {
client *influx.Client client influx.Client
database string database string
retentionPolicy string retentionPolicy string
ignoredSamples prometheus.Counter ignoredSamples prometheus.Counter
} }
// NewClient creates a new Client. // NewClient creates a new Client.
func NewClient(conf influx.Config, db string, rp string) *Client { func NewClient(conf influx.HTTPConfig, db string, rp string) *Client {
c, err := influx.NewClient(conf) c, err := influx.NewHTTPClient(conf)
// Currently influx.NewClient() *should* never return an error. // Currently influx.NewClient() *should* never return an error.
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)
@ -63,9 +67,9 @@ func tagsFromMetric(m model.Metric) map[string]string {
return tags return tags
} }
// Store sends a batch of samples to InfluxDB via its HTTP API. // Write sends a batch of samples to InfluxDB via its HTTP API.
func (c *Client) Store(samples model.Samples) error { func (c *Client) Write(samples model.Samples) error {
points := make([]influx.Point, 0, len(samples)) points := make([]*influx.Point, 0, len(samples))
for _, s := range samples { for _, s := range samples {
v := float64(s.Value) v := float64(s.Value)
if math.IsNaN(v) || math.IsInf(v, 0) { if math.IsNaN(v) || math.IsInf(v, 0) {
@ -73,24 +77,221 @@ func (c *Client) Store(samples model.Samples) error {
c.ignoredSamples.Inc() c.ignoredSamples.Inc()
continue continue
} }
points = append(points, influx.Point{ p, err := influx.NewPoint(
Measurement: string(s.Metric[model.MetricNameLabel]), string(s.Metric[model.MetricNameLabel]),
Tags: tagsFromMetric(s.Metric), tagsFromMetric(s.Metric),
Time: s.Timestamp.Time(), map[string]interface{}{"value": v},
Precision: "ms", s.Timestamp.Time(),
Fields: map[string]interface{}{ )
"value": v, if err != nil {
}, return err
}) }
points = append(points, p)
} }
bps := influx.BatchPoints{ bps, err := influx.NewBatchPoints(influx.BatchPointsConfig{
Points: points, Precision: "ms",
Database: c.database, Database: c.database,
RetentionPolicy: c.retentionPolicy, RetentionPolicy: c.retentionPolicy,
})
if err != nil {
return err
} }
_, err := c.client.Write(bps) bps.AddPoints(points)
return err return c.client.Write(bps)
}
func (c *Client) Read(req *remote.ReadRequest) (*remote.ReadResponse, error) {
labelsToSeries := map[string]*remote.TimeSeries{}
for _, q := range req.Queries {
command, err := buildCommand(q)
if err != nil {
return nil, err
}
query := influx.NewQuery(command, c.database, "ms")
resp, err := c.client.Query(query)
if err != nil {
return nil, err
}
if resp.Err != "" {
return nil, fmt.Errorf(resp.Err)
}
if err = mergeResult(labelsToSeries, resp.Results); err != nil {
return nil, err
}
}
resp := remote.ReadResponse{
Timeseries: make([]*remote.TimeSeries, 0, len(labelsToSeries)),
}
for _, ts := range labelsToSeries {
resp.Timeseries = append(resp.Timeseries, ts)
}
return &resp, nil
}
func buildCommand(q *remote.Query) (string, error) {
matchers := make([]string, 0, len(q.Matchers))
// If we don't find a metric name matcher, query all metrics
// (InfluxDB measurements) by default.
from := "FROM /.+/"
for _, m := range q.Matchers {
if m.Name == model.MetricNameLabel {
switch m.Type {
case remote.MatchType_EQUAL:
from = fmt.Sprintf("FROM %q", m.Value)
case remote.MatchType_REGEX_MATCH:
from = fmt.Sprintf("FROM /^%s$/", escapeSlashes(m.Value))
default:
// TODO: Figure out how to support these efficiently.
return "", fmt.Errorf("non-equal or regex-non-equal matchers are not supported on the metric name yet")
}
continue
}
switch m.Type {
case remote.MatchType_EQUAL:
matchers = append(matchers, fmt.Sprintf("%q = '%s'", m.Name, escapeSingleQuotes(m.Value)))
case remote.MatchType_NOT_EQUAL:
matchers = append(matchers, fmt.Sprintf("%q != '%s'", m.Name, escapeSingleQuotes(m.Value)))
case remote.MatchType_REGEX_MATCH:
matchers = append(matchers, fmt.Sprintf("%q =~ /^%s$/", m.Name, escapeSlashes(m.Value)))
case remote.MatchType_REGEX_NO_MATCH:
matchers = append(matchers, fmt.Sprintf("%q !~ /^%s$/", m.Name, escapeSlashes(m.Value)))
default:
return "", fmt.Errorf("unknown match type %v", m.Type)
}
}
matchers = append(matchers, fmt.Sprintf("time >= %vms", q.StartTimestampMs))
matchers = append(matchers, fmt.Sprintf("time <= %vms", q.EndTimestampMs))
return fmt.Sprintf("SELECT value %s WHERE %v GROUP BY *", from, strings.Join(matchers, " AND ")), nil
}
func escapeSingleQuotes(str string) string {
return strings.Replace(str, `'`, `\'`, -1)
}
func escapeSlashes(str string) string {
return strings.Replace(str, `/`, `\/`, -1)
}
func mergeResult(labelsToSeries map[string]*remote.TimeSeries, results []influx.Result) error {
for _, r := range results {
for _, s := range r.Series {
k := concatLabels(s.Tags)
ts, ok := labelsToSeries[k]
if !ok {
ts = &remote.TimeSeries{
Labels: tagsToLabelPairs(s.Name, s.Tags),
}
labelsToSeries[k] = ts
}
samples, err := valuesToSamples(s.Values)
if err != nil {
return err
}
ts.Samples = mergeSamples(ts.Samples, samples)
}
}
return nil
}
func concatLabels(labels map[string]string) string {
// 0xff cannot cannot occur in valid UTF-8 sequences, so use it
// as a separator here.
separator := "\xff"
pairs := make([]string, 0, len(labels))
for k, v := range labels {
pairs = append(pairs, k+separator+v)
}
return strings.Join(pairs, separator)
}
func tagsToLabelPairs(name string, tags map[string]string) []*remote.LabelPair {
pairs := make([]*remote.LabelPair, 0, len(tags))
for k, v := range tags {
if v == "" {
// If we select metrics with different sets of labels names,
// InfluxDB returns *all* possible tag names on all returned
// series, with empty tag values on series where they don't
// apply. In Prometheus, an empty label value is equivalent
// to a non-existent label, so we just skip empty ones here
// to make the result correct.
continue
}
pairs = append(pairs, &remote.LabelPair{
Name: k,
Value: v,
})
}
pairs = append(pairs, &remote.LabelPair{
Name: model.MetricNameLabel,
Value: name,
})
return pairs
}
func valuesToSamples(values [][]interface{}) ([]*remote.Sample, error) {
samples := make([]*remote.Sample, 0, len(values))
for _, v := range values {
if len(v) != 2 {
return nil, fmt.Errorf("bad sample tuple length, expected [<timestamp>, <value>], got %v", v)
}
jsonTimestamp, ok := v[0].(json.Number)
if !ok {
return nil, fmt.Errorf("bad timestamp: %v", v[0])
}
jsonValue, ok := v[1].(json.Number)
if !ok {
return nil, fmt.Errorf("bad sample value: %v", v[1])
}
timestamp, err := jsonTimestamp.Int64()
if err != nil {
return nil, fmt.Errorf("unable to convert sample timestamp to int64: %v", err)
}
value, err := jsonValue.Float64()
if err != nil {
return nil, fmt.Errorf("unable to convert sample value to float64: %v", err)
}
samples = append(samples, &remote.Sample{
TimestampMs: timestamp,
Value: value,
})
}
return samples, nil
}
// mergeSamples merges two lists of sample pairs and removes duplicate
// timestamps. It assumes that both lists are sorted by timestamp.
func mergeSamples(a, b []*remote.Sample) []*remote.Sample {
result := make([]*remote.Sample, 0, len(a)+len(b))
i, j := 0, 0
for i < len(a) && j < len(b) {
if a[i].TimestampMs < b[j].TimestampMs {
result = append(result, a[i])
i++
} else if a[i].TimestampMs > b[j].TimestampMs {
result = append(result, b[j])
j++
} else {
result = append(result, a[i])
i++
j++
}
}
result = append(result, a[i:]...)
result = append(result, b[j:]...)
return result
} }
// Name identifies the client as an InfluxDB client. // Name identifies the client as an InfluxDB client.

View file

@ -22,7 +22,7 @@ import (
"testing" "testing"
"time" "time"
influx "github.com/influxdb/influxdb/client" influx "github.com/influxdata/influxdb/client/v2"
"github.com/prometheus/common/model" "github.com/prometheus/common/model"
) )
@ -68,8 +68,8 @@ func TestClient(t *testing.T) {
}, },
} }
expectedBody := `testmetric,test_label=test_label_value1 value=1.23 123456789123000000 expectedBody := `testmetric,test_label=test_label_value1 value=1.23 123456789123
testmetric,test_label=test_label_value2 value=5.1234 123456789123000000 testmetric,test_label=test_label_value2 value=5.1234 123456789123
` `
server := httptest.NewServer(http.HandlerFunc( server := httptest.NewServer(http.HandlerFunc(
@ -97,15 +97,15 @@ testmetric,test_label=test_label_value2 value=5.1234 123456789123000000
t.Fatalf("Unable to parse server URL %s: %s", server.URL, err) t.Fatalf("Unable to parse server URL %s: %s", server.URL, err)
} }
conf := influx.Config{ conf := influx.HTTPConfig{
URL: *serverURL, Addr: serverURL.String(),
Username: "testuser", Username: "testuser",
Password: "testpass", Password: "testpass",
Timeout: time.Minute, Timeout: time.Minute,
} }
c := NewClient(conf, "test_db", "default") c := NewClient(conf, "test_db", "default")
if err := c.Store(samples); err != nil { if err := c.Write(samples); err != nil {
t.Fatalf("Error sending samples: %s", err) t.Fatalf("Error sending samples: %s", err)
} }
} }

View file

@ -16,6 +16,7 @@ package main
import ( import (
"flag" "flag"
"fmt"
"io/ioutil" "io/ioutil"
"net/http" "net/http"
_ "net/http/pprof" _ "net/http/pprof"
@ -30,7 +31,7 @@ import (
"github.com/prometheus/common/log" "github.com/prometheus/common/log"
"github.com/prometheus/common/model" "github.com/prometheus/common/model"
influx "github.com/influxdb/influxdb/client" influx "github.com/influxdata/influxdb/client/v2"
"github.com/prometheus/prometheus/documentation/examples/remote_storage/remote_storage_bridge/graphite" "github.com/prometheus/prometheus/documentation/examples/remote_storage/remote_storage_bridge/graphite"
"github.com/prometheus/prometheus/documentation/examples/remote_storage/remote_storage_bridge/influxdb" "github.com/prometheus/prometheus/documentation/examples/remote_storage/remote_storage_bridge/influxdb"
@ -95,8 +96,8 @@ func main() {
cfg := parseFlags() cfg := parseFlags()
http.Handle(cfg.telemetryPath, prometheus.Handler()) http.Handle(cfg.telemetryPath, prometheus.Handler())
clients := buildClients(cfg) writers, readers := buildClients(cfg)
serve(cfg.listenAddr, clients) serve(cfg.listenAddr, writers, readers)
} }
func parseFlags() *config { func parseFlags() *config {
@ -119,7 +120,7 @@ func parseFlags() *config {
flag.StringVar(&cfg.influxdbURL, "influxdb-url", "", flag.StringVar(&cfg.influxdbURL, "influxdb-url", "",
"The URL of the remote InfluxDB server to send samples to. None, if empty.", "The URL of the remote InfluxDB server to send samples to. None, if empty.",
) )
flag.StringVar(&cfg.influxdbRetentionPolicy, "influxdb.retention-policy", "default", flag.StringVar(&cfg.influxdbRetentionPolicy, "influxdb.retention-policy", "autogen",
"The InfluxDB retention policy to use.", "The InfluxDB retention policy to use.",
) )
flag.StringVar(&cfg.influxdbUsername, "influxdb.username", "", flag.StringVar(&cfg.influxdbUsername, "influxdb.username", "",
@ -139,38 +140,50 @@ func parseFlags() *config {
return cfg return cfg
} }
func buildClients(cfg *config) []remote.StorageClient { type writer interface {
var clients []remote.StorageClient Write(samples model.Samples) error
Name() string
}
type reader interface {
Read(req *remote.ReadRequest) (*remote.ReadResponse, error)
Name() string
}
func buildClients(cfg *config) ([]writer, []reader) {
var writers []writer
var readers []reader
if cfg.graphiteAddress != "" { if cfg.graphiteAddress != "" {
c := graphite.NewClient( c := graphite.NewClient(
cfg.graphiteAddress, cfg.graphiteTransport, cfg.graphiteAddress, cfg.graphiteTransport,
cfg.remoteTimeout, cfg.graphitePrefix) cfg.remoteTimeout, cfg.graphitePrefix)
clients = append(clients, c) writers = append(writers, c)
} }
if cfg.opentsdbURL != "" { if cfg.opentsdbURL != "" {
c := opentsdb.NewClient(cfg.opentsdbURL, cfg.remoteTimeout) c := opentsdb.NewClient(cfg.opentsdbURL, cfg.remoteTimeout)
clients = append(clients, c) writers = append(writers, c)
} }
if cfg.influxdbURL != "" { if cfg.influxdbURL != "" {
url, err := url.Parse(cfg.influxdbURL) url, err := url.Parse(cfg.influxdbURL)
if err != nil { if err != nil {
log.Fatalf("Failed to parse InfluxDB URL %q: %v", cfg.influxdbURL, err) log.Fatalf("Failed to parse InfluxDB URL %q: %v", cfg.influxdbURL, err)
} }
conf := influx.Config{ conf := influx.HTTPConfig{
URL: *url, Addr: url.String(),
Username: cfg.influxdbUsername, Username: cfg.influxdbUsername,
Password: cfg.influxdbPassword, Password: cfg.influxdbPassword,
Timeout: cfg.remoteTimeout, Timeout: cfg.remoteTimeout,
} }
c := influxdb.NewClient(conf, cfg.influxdbDatabase, cfg.influxdbRetentionPolicy) c := influxdb.NewClient(conf, cfg.influxdbDatabase, cfg.influxdbRetentionPolicy)
prometheus.MustRegister(c) prometheus.MustRegister(c)
clients = append(clients, c) writers = append(writers, c)
readers = append(readers, c)
} }
return clients return writers, readers
} }
func serve(addr string, clients []remote.StorageClient) error { func serve(addr string, writers []writer, readers []reader) error {
http.HandleFunc("/receive", func(w http.ResponseWriter, r *http.Request) { http.HandleFunc("/write", func(w http.ResponseWriter, r *http.Request) {
reqBuf, err := ioutil.ReadAll(snappy.NewReader(r.Body)) reqBuf, err := ioutil.ReadAll(snappy.NewReader(r.Body))
if err != nil { if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest) http.Error(w, err.Error(), http.StatusBadRequest)
@ -187,16 +200,57 @@ func serve(addr string, clients []remote.StorageClient) error {
receivedSamples.Add(float64(len(samples))) receivedSamples.Add(float64(len(samples)))
var wg sync.WaitGroup var wg sync.WaitGroup
for _, c := range clients { for _, w := range writers {
wg.Add(1) wg.Add(1)
go func(rc remote.StorageClient) { go func(rw writer) {
sendSamples(rc, samples) sendSamples(rw, samples)
wg.Done() wg.Done()
}(c) }(w)
} }
wg.Wait() wg.Wait()
}) })
http.HandleFunc("/read", func(w http.ResponseWriter, r *http.Request) {
reqBuf, err := ioutil.ReadAll(snappy.NewReader(r.Body))
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
var req remote.ReadRequest
if err := proto.Unmarshal(reqBuf, &req); err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
// TODO: Support reading from more than one reader and merging the results.
if len(readers) != 1 {
http.Error(w, fmt.Sprintf("expected exactly one reader, found %d readers", len(readers)), http.StatusInternalServerError)
return
}
reader := readers[0]
var resp *remote.ReadResponse
resp, err = reader.Read(&req)
if err != nil {
log.With("query", req).With("storage", reader.Name()).With("err", err).Warnf("Error executing query")
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
data, err := proto.Marshal(resp)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
w.Header().Set("Content-Type", "application/x-protobuf")
if _, err := snappy.NewWriter(w).Write(data); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
})
return http.ListenAndServe(addr, nil) return http.ListenAndServe(addr, nil)
} }
@ -219,14 +273,14 @@ func protoToSamples(req *remote.WriteRequest) model.Samples {
return samples return samples
} }
func sendSamples(c remote.StorageClient, samples model.Samples) { func sendSamples(w writer, samples model.Samples) {
begin := time.Now() begin := time.Now()
err := c.Store(samples) err := w.Write(samples)
duration := time.Since(begin).Seconds() duration := time.Since(begin).Seconds()
if err != nil { if err != nil {
log.Warnf("Error sending %d samples to remote storage %q: %v", len(samples), c.Name(), err) log.With("num_samples", len(samples)).With("storage", w.Name()).With("err", err).Warnf("Error sending samples to remote storage")
failedSamples.WithLabelValues(c.Name()).Add(float64(len(samples))) failedSamples.WithLabelValues(w.Name()).Add(float64(len(samples)))
} }
sentSamples.WithLabelValues(c.Name()).Add(float64(len(samples))) sentSamples.WithLabelValues(w.Name()).Add(float64(len(samples)))
sentBatchDuration.WithLabelValues(c.Name()).Observe(duration) sentBatchDuration.WithLabelValues(w.Name()).Observe(duration)
} }

View file

@ -69,8 +69,8 @@ func tagsFromMetric(m model.Metric) map[string]TagValue {
return tags return tags
} }
// Store sends a batch of samples to OpenTSDB via its HTTP API. // Write sends a batch of samples to OpenTSDB via its HTTP API.
func (c *Client) Store(samples model.Samples) error { func (c *Client) Write(samples model.Samples) error {
reqs := make([]StoreSamplesRequest, 0, len(samples)) reqs := make([]StoreSamplesRequest, 0, len(samples))
for _, s := range samples { for _, s := range samples {
v := float64(s.Value) v := float64(s.Value)

View file

@ -0,0 +1,609 @@
// Package client (v2) is the current official Go client for InfluxDB.
package client // import "github.com/influxdata/influxdb/client/v2"
import (
"bytes"
"crypto/tls"
"encoding/json"
"errors"
"fmt"
"io"
"io/ioutil"
"net/http"
"net/url"
"strconv"
"strings"
"time"
"github.com/influxdata/influxdb/models"
)
// HTTPConfig is the config data needed to create an HTTP Client.
type HTTPConfig struct {
// Addr should be of the form "http://host:port"
// or "http://[ipv6-host%zone]:port".
Addr string
// Username is the influxdb username, optional.
Username string
// Password is the influxdb password, optional.
Password string
// UserAgent is the http User Agent, defaults to "InfluxDBClient".
UserAgent string
// Timeout for influxdb writes, defaults to no timeout.
Timeout time.Duration
// InsecureSkipVerify gets passed to the http client, if true, it will
// skip https certificate verification. Defaults to false.
InsecureSkipVerify bool
// TLSConfig allows the user to set their own TLS config for the HTTP
// Client. If set, this option overrides InsecureSkipVerify.
TLSConfig *tls.Config
}
// BatchPointsConfig is the config data needed to create an instance of the BatchPoints struct.
type BatchPointsConfig struct {
// Precision is the write precision of the points, defaults to "ns".
Precision string
// Database is the database to write points to.
Database string
// RetentionPolicy is the retention policy of the points.
RetentionPolicy string
// Write consistency is the number of servers required to confirm write.
WriteConsistency string
}
// Client is a client interface for writing & querying the database.
type Client interface {
// Ping checks that status of cluster, and will always return 0 time and no
// error for UDP clients.
Ping(timeout time.Duration) (time.Duration, string, error)
// Write takes a BatchPoints object and writes all Points to InfluxDB.
Write(bp BatchPoints) error
// Query makes an InfluxDB Query on the database. This will fail if using
// the UDP client.
Query(q Query) (*Response, error)
// Close releases any resources a Client may be using.
Close() error
}
// NewHTTPClient returns a new Client from the provided config.
// Client is safe for concurrent use by multiple goroutines.
func NewHTTPClient(conf HTTPConfig) (Client, error) {
if conf.UserAgent == "" {
conf.UserAgent = "InfluxDBClient"
}
u, err := url.Parse(conf.Addr)
if err != nil {
return nil, err
} else if u.Scheme != "http" && u.Scheme != "https" {
m := fmt.Sprintf("Unsupported protocol scheme: %s, your address"+
" must start with http:// or https://", u.Scheme)
return nil, errors.New(m)
}
tr := &http.Transport{
TLSClientConfig: &tls.Config{
InsecureSkipVerify: conf.InsecureSkipVerify,
},
}
if conf.TLSConfig != nil {
tr.TLSClientConfig = conf.TLSConfig
}
return &client{
url: *u,
username: conf.Username,
password: conf.Password,
useragent: conf.UserAgent,
httpClient: &http.Client{
Timeout: conf.Timeout,
Transport: tr,
},
transport: tr,
}, nil
}
// Ping will check to see if the server is up with an optional timeout on waiting for leader.
// Ping returns how long the request took, the version of the server it connected to, and an error if one occurred.
func (c *client) Ping(timeout time.Duration) (time.Duration, string, error) {
now := time.Now()
u := c.url
u.Path = "ping"
req, err := http.NewRequest("GET", u.String(), nil)
if err != nil {
return 0, "", err
}
req.Header.Set("User-Agent", c.useragent)
if c.username != "" {
req.SetBasicAuth(c.username, c.password)
}
if timeout > 0 {
params := req.URL.Query()
params.Set("wait_for_leader", fmt.Sprintf("%.0fs", timeout.Seconds()))
req.URL.RawQuery = params.Encode()
}
resp, err := c.httpClient.Do(req)
if err != nil {
return 0, "", err
}
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return 0, "", err
}
if resp.StatusCode != http.StatusNoContent {
var err = fmt.Errorf(string(body))
return 0, "", err
}
version := resp.Header.Get("X-Influxdb-Version")
return time.Since(now), version, nil
}
// Close releases the client's resources.
func (c *client) Close() error {
c.transport.CloseIdleConnections()
return nil
}
// client is safe for concurrent use as the fields are all read-only
// once the client is instantiated.
type client struct {
// N.B - if url.UserInfo is accessed in future modifications to the
// methods on client, you will need to syncronise access to url.
url url.URL
username string
password string
useragent string
httpClient *http.Client
transport *http.Transport
}
// BatchPoints is an interface into a batched grouping of points to write into
// InfluxDB together. BatchPoints is NOT thread-safe, you must create a separate
// batch for each goroutine.
type BatchPoints interface {
// AddPoint adds the given point to the Batch of points.
AddPoint(p *Point)
// AddPoints adds the given points to the Batch of points.
AddPoints(ps []*Point)
// Points lists the points in the Batch.
Points() []*Point
// Precision returns the currently set precision of this Batch.
Precision() string
// SetPrecision sets the precision of this batch.
SetPrecision(s string) error
// Database returns the currently set database of this Batch.
Database() string
// SetDatabase sets the database of this Batch.
SetDatabase(s string)
// WriteConsistency returns the currently set write consistency of this Batch.
WriteConsistency() string
// SetWriteConsistency sets the write consistency of this Batch.
SetWriteConsistency(s string)
// RetentionPolicy returns the currently set retention policy of this Batch.
RetentionPolicy() string
// SetRetentionPolicy sets the retention policy of this Batch.
SetRetentionPolicy(s string)
}
// NewBatchPoints returns a BatchPoints interface based on the given config.
func NewBatchPoints(conf BatchPointsConfig) (BatchPoints, error) {
if conf.Precision == "" {
conf.Precision = "ns"
}
if _, err := time.ParseDuration("1" + conf.Precision); err != nil {
return nil, err
}
bp := &batchpoints{
database: conf.Database,
precision: conf.Precision,
retentionPolicy: conf.RetentionPolicy,
writeConsistency: conf.WriteConsistency,
}
return bp, nil
}
type batchpoints struct {
points []*Point
database string
precision string
retentionPolicy string
writeConsistency string
}
func (bp *batchpoints) AddPoint(p *Point) {
bp.points = append(bp.points, p)
}
func (bp *batchpoints) AddPoints(ps []*Point) {
bp.points = append(bp.points, ps...)
}
func (bp *batchpoints) Points() []*Point {
return bp.points
}
func (bp *batchpoints) Precision() string {
return bp.precision
}
func (bp *batchpoints) Database() string {
return bp.database
}
func (bp *batchpoints) WriteConsistency() string {
return bp.writeConsistency
}
func (bp *batchpoints) RetentionPolicy() string {
return bp.retentionPolicy
}
func (bp *batchpoints) SetPrecision(p string) error {
if _, err := time.ParseDuration("1" + p); err != nil {
return err
}
bp.precision = p
return nil
}
func (bp *batchpoints) SetDatabase(db string) {
bp.database = db
}
func (bp *batchpoints) SetWriteConsistency(wc string) {
bp.writeConsistency = wc
}
func (bp *batchpoints) SetRetentionPolicy(rp string) {
bp.retentionPolicy = rp
}
// Point represents a single data point.
type Point struct {
pt models.Point
}
// NewPoint returns a point with the given timestamp. If a timestamp is not
// given, then data is sent to the database without a timestamp, in which case
// the server will assign local time upon reception. NOTE: it is recommended to
// send data with a timestamp.
func NewPoint(
name string,
tags map[string]string,
fields map[string]interface{},
t ...time.Time,
) (*Point, error) {
var T time.Time
if len(t) > 0 {
T = t[0]
}
pt, err := models.NewPoint(name, models.NewTags(tags), fields, T)
if err != nil {
return nil, err
}
return &Point{
pt: pt,
}, nil
}
// String returns a line-protocol string of the Point.
func (p *Point) String() string {
return p.pt.String()
}
// PrecisionString returns a line-protocol string of the Point,
// with the timestamp formatted for the given precision.
func (p *Point) PrecisionString(precison string) string {
return p.pt.PrecisionString(precison)
}
// Name returns the measurement name of the point.
func (p *Point) Name() string {
return p.pt.Name()
}
// Tags returns the tags associated with the point.
func (p *Point) Tags() map[string]string {
return p.pt.Tags().Map()
}
// Time return the timestamp for the point.
func (p *Point) Time() time.Time {
return p.pt.Time()
}
// UnixNano returns timestamp of the point in nanoseconds since Unix epoch.
func (p *Point) UnixNano() int64 {
return p.pt.UnixNano()
}
// Fields returns the fields for the point.
func (p *Point) Fields() (map[string]interface{}, error) {
return p.pt.Fields()
}
// NewPointFrom returns a point from the provided models.Point.
func NewPointFrom(pt models.Point) *Point {
return &Point{pt: pt}
}
func (c *client) Write(bp BatchPoints) error {
var b bytes.Buffer
for _, p := range bp.Points() {
if _, err := b.WriteString(p.pt.PrecisionString(bp.Precision())); err != nil {
return err
}
if err := b.WriteByte('\n'); err != nil {
return err
}
}
u := c.url
u.Path = "write"
req, err := http.NewRequest("POST", u.String(), &b)
if err != nil {
return err
}
req.Header.Set("Content-Type", "")
req.Header.Set("User-Agent", c.useragent)
if c.username != "" {
req.SetBasicAuth(c.username, c.password)
}
params := req.URL.Query()
params.Set("db", bp.Database())
params.Set("rp", bp.RetentionPolicy())
params.Set("precision", bp.Precision())
params.Set("consistency", bp.WriteConsistency())
req.URL.RawQuery = params.Encode()
resp, err := c.httpClient.Do(req)
if err != nil {
return err
}
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return err
}
if resp.StatusCode != http.StatusNoContent && resp.StatusCode != http.StatusOK {
var err = fmt.Errorf(string(body))
return err
}
return nil
}
// Query defines a query to send to the server.
type Query struct {
Command string
Database string
Precision string
Chunked bool
ChunkSize int
Parameters map[string]interface{}
}
// NewQuery returns a query object.
// The database and precision arguments can be empty strings if they are not needed for the query.
func NewQuery(command, database, precision string) Query {
return Query{
Command: command,
Database: database,
Precision: precision,
Parameters: make(map[string]interface{}),
}
}
// NewQueryWithParameters returns a query object.
// The database and precision arguments can be empty strings if they are not needed for the query.
// parameters is a map of the parameter names used in the command to their values.
func NewQueryWithParameters(command, database, precision string, parameters map[string]interface{}) Query {
return Query{
Command: command,
Database: database,
Precision: precision,
Parameters: parameters,
}
}
// Response represents a list of statement results.
type Response struct {
Results []Result
Err string `json:"error,omitempty"`
}
// Error returns the first error from any statement.
// It returns nil if no errors occurred on any statements.
func (r *Response) Error() error {
if r.Err != "" {
return fmt.Errorf(r.Err)
}
for _, result := range r.Results {
if result.Err != "" {
return fmt.Errorf(result.Err)
}
}
return nil
}
// Message represents a user message.
type Message struct {
Level string
Text string
}
// Result represents a resultset returned from a single statement.
type Result struct {
Series []models.Row
Messages []*Message
Err string `json:"error,omitempty"`
}
// Query sends a command to the server and returns the Response.
func (c *client) Query(q Query) (*Response, error) {
u := c.url
u.Path = "query"
jsonParameters, err := json.Marshal(q.Parameters)
if err != nil {
return nil, err
}
req, err := http.NewRequest("POST", u.String(), nil)
if err != nil {
return nil, err
}
req.Header.Set("Content-Type", "")
req.Header.Set("User-Agent", c.useragent)
if c.username != "" {
req.SetBasicAuth(c.username, c.password)
}
params := req.URL.Query()
params.Set("q", q.Command)
params.Set("db", q.Database)
params.Set("params", string(jsonParameters))
if q.Chunked {
params.Set("chunked", "true")
if q.ChunkSize > 0 {
params.Set("chunk_size", strconv.Itoa(q.ChunkSize))
}
}
if q.Precision != "" {
params.Set("epoch", q.Precision)
}
req.URL.RawQuery = params.Encode()
resp, err := c.httpClient.Do(req)
if err != nil {
return nil, err
}
defer resp.Body.Close()
var response Response
if q.Chunked {
cr := NewChunkedResponse(resp.Body)
for {
r, err := cr.NextResponse()
if err != nil {
// If we got an error while decoding the response, send that back.
return nil, err
}
if r == nil {
break
}
response.Results = append(response.Results, r.Results...)
if r.Err != "" {
response.Err = r.Err
break
}
}
} else {
dec := json.NewDecoder(resp.Body)
dec.UseNumber()
decErr := dec.Decode(&response)
// ignore this error if we got an invalid status code
if decErr != nil && decErr.Error() == "EOF" && resp.StatusCode != http.StatusOK {
decErr = nil
}
// If we got a valid decode error, send that back
if decErr != nil {
return nil, fmt.Errorf("unable to decode json: received status code %d err: %s", resp.StatusCode, decErr)
}
}
// If we don't have an error in our json response, and didn't get statusOK
// then send back an error
if resp.StatusCode != http.StatusOK && response.Error() == nil {
return &response, fmt.Errorf("received status code %d from server",
resp.StatusCode)
}
return &response, nil
}
// duplexReader reads responses and writes it to another writer while
// satisfying the reader interface.
type duplexReader struct {
r io.Reader
w io.Writer
}
func (r *duplexReader) Read(p []byte) (n int, err error) {
n, err = r.r.Read(p)
if err == nil {
r.w.Write(p[:n])
}
return n, err
}
// ChunkedResponse represents a response from the server that
// uses chunking to stream the output.
type ChunkedResponse struct {
dec *json.Decoder
duplex *duplexReader
buf bytes.Buffer
}
// NewChunkedResponse reads a stream and produces responses from the stream.
func NewChunkedResponse(r io.Reader) *ChunkedResponse {
resp := &ChunkedResponse{}
resp.duplex = &duplexReader{r: r, w: &resp.buf}
resp.dec = json.NewDecoder(resp.duplex)
resp.dec.UseNumber()
return resp
}
// NextResponse reads the next line of the stream and returns a response.
func (r *ChunkedResponse) NextResponse() (*Response, error) {
var response Response
if err := r.dec.Decode(&response); err != nil {
if err == io.EOF {
return nil, nil
}
// A decoding error happened. This probably means the server crashed
// and sent a last-ditch error message to us. Ensure we have read the
// entirety of the connection to get any remaining error text.
io.Copy(ioutil.Discard, r.duplex)
return nil, errors.New(strings.TrimSpace(r.buf.String()))
}
r.buf.Reset()
return &response, nil
}

112
vendor/github.com/influxdata/influxdb/client/v2/udp.go generated vendored Normal file
View file

@ -0,0 +1,112 @@
package client
import (
"fmt"
"io"
"net"
"time"
)
const (
// UDPPayloadSize is a reasonable default payload size for UDP packets that
// could be travelling over the internet.
UDPPayloadSize = 512
)
// UDPConfig is the config data needed to create a UDP Client.
type UDPConfig struct {
// Addr should be of the form "host:port"
// or "[ipv6-host%zone]:port".
Addr string
// PayloadSize is the maximum size of a UDP client message, optional
// Tune this based on your network. Defaults to UDPPayloadSize.
PayloadSize int
}
// NewUDPClient returns a client interface for writing to an InfluxDB UDP
// service from the given config.
func NewUDPClient(conf UDPConfig) (Client, error) {
var udpAddr *net.UDPAddr
udpAddr, err := net.ResolveUDPAddr("udp", conf.Addr)
if err != nil {
return nil, err
}
conn, err := net.DialUDP("udp", nil, udpAddr)
if err != nil {
return nil, err
}
payloadSize := conf.PayloadSize
if payloadSize == 0 {
payloadSize = UDPPayloadSize
}
return &udpclient{
conn: conn,
payloadSize: payloadSize,
}, nil
}
// Close releases the udpclient's resources.
func (uc *udpclient) Close() error {
return uc.conn.Close()
}
type udpclient struct {
conn io.WriteCloser
payloadSize int
}
func (uc *udpclient) Write(bp BatchPoints) error {
var b = make([]byte, 0, uc.payloadSize) // initial buffer size, it will grow as needed
var d, _ = time.ParseDuration("1" + bp.Precision())
var delayedError error
var checkBuffer = func(n int) {
if len(b) > 0 && len(b)+n > uc.payloadSize {
if _, err := uc.conn.Write(b); err != nil {
delayedError = err
}
b = b[:0]
}
}
for _, p := range bp.Points() {
p.pt.Round(d)
pointSize := p.pt.StringSize() + 1 // include newline in size
//point := p.pt.RoundedString(d) + "\n"
checkBuffer(pointSize)
if p.Time().IsZero() || pointSize <= uc.payloadSize {
b = p.pt.AppendString(b)
b = append(b, '\n')
continue
}
points := p.pt.Split(uc.payloadSize - 1) // account for newline character
for _, sp := range points {
checkBuffer(sp.StringSize() + 1)
b = sp.AppendString(b)
b = append(b, '\n')
}
}
if len(b) > 0 {
if _, err := uc.conn.Write(b); err != nil {
return err
}
}
return delayedError
}
func (uc *udpclient) Query(q Query) (*Response, error) {
return nil, fmt.Errorf("Querying via UDP is not supported")
}
func (uc *udpclient) Ping(timeout time.Duration) (time.Duration, string, error) {
return 0, "", nil
}

View file

@ -0,0 +1,48 @@
package models
import (
"errors"
"strings"
)
// ConsistencyLevel represent a required replication criteria before a write can
// be returned as successful.
//
// The consistency level is handled in open-source InfluxDB but only applicable to clusters.
type ConsistencyLevel int
const (
// ConsistencyLevelAny allows for hinted handoff, potentially no write happened yet.
ConsistencyLevelAny ConsistencyLevel = iota
// ConsistencyLevelOne requires at least one data node acknowledged a write.
ConsistencyLevelOne
// ConsistencyLevelQuorum requires a quorum of data nodes to acknowledge a write.
ConsistencyLevelQuorum
// ConsistencyLevelAll requires all data nodes to acknowledge a write.
ConsistencyLevelAll
)
var (
// ErrInvalidConsistencyLevel is returned when parsing the string version
// of a consistency level.
ErrInvalidConsistencyLevel = errors.New("invalid consistency level")
)
// ParseConsistencyLevel converts a consistency level string to the corresponding ConsistencyLevel const.
func ParseConsistencyLevel(level string) (ConsistencyLevel, error) {
switch strings.ToLower(level) {
case "any":
return ConsistencyLevelAny, nil
case "one":
return ConsistencyLevelOne, nil
case "quorum":
return ConsistencyLevelQuorum, nil
case "all":
return ConsistencyLevelAll, nil
default:
return 0, ErrInvalidConsistencyLevel
}
}

View file

@ -0,0 +1,32 @@
package models // import "github.com/influxdata/influxdb/models"
// from stdlib hash/fnv/fnv.go
const (
prime64 = 1099511628211
offset64 = 14695981039346656037
)
// InlineFNV64a is an alloc-free port of the standard library's fnv64a.
// See https://en.wikipedia.org/wiki/Fowler%E2%80%93Noll%E2%80%93Vo_hash_function.
type InlineFNV64a uint64
// NewInlineFNV64a returns a new instance of InlineFNV64a.
func NewInlineFNV64a() InlineFNV64a {
return offset64
}
// Write adds data to the running hash.
func (s *InlineFNV64a) Write(data []byte) (int, error) {
hash := uint64(*s)
for _, c := range data {
hash ^= uint64(c)
hash *= prime64
}
*s = InlineFNV64a(hash)
return len(data), nil
}
// Sum64 returns the uint64 of the current resulting hash.
func (s *InlineFNV64a) Sum64() uint64 {
return uint64(*s)
}

View file

@ -0,0 +1,38 @@
package models // import "github.com/influxdata/influxdb/models"
import (
"reflect"
"strconv"
"unsafe"
)
// parseIntBytes is a zero-alloc wrapper around strconv.ParseInt.
func parseIntBytes(b []byte, base int, bitSize int) (i int64, err error) {
s := unsafeBytesToString(b)
return strconv.ParseInt(s, base, bitSize)
}
// parseFloatBytes is a zero-alloc wrapper around strconv.ParseFloat.
func parseFloatBytes(b []byte, bitSize int) (float64, error) {
s := unsafeBytesToString(b)
return strconv.ParseFloat(s, bitSize)
}
// parseBoolBytes is a zero-alloc wrapper around strconv.ParseBool.
func parseBoolBytes(b []byte) (bool, error) {
return strconv.ParseBool(unsafeBytesToString(b))
}
// unsafeBytesToString converts a []byte to a string without a heap allocation.
//
// It is unsafe, and is intended to prepare input to short-lived functions
// that require strings.
func unsafeBytesToString(in []byte) string {
src := *(*reflect.SliceHeader)(unsafe.Pointer(&in))
dst := reflect.StringHeader{
Data: src.Data,
Len: src.Len,
}
s := *(*string)(unsafe.Pointer(&dst))
return s
}

2035
vendor/github.com/influxdata/influxdb/models/points.go generated vendored Normal file

File diff suppressed because it is too large Load diff

62
vendor/github.com/influxdata/influxdb/models/rows.go generated vendored Normal file
View file

@ -0,0 +1,62 @@
package models
import (
"sort"
)
// Row represents a single row returned from the execution of a statement.
type Row struct {
Name string `json:"name,omitempty"`
Tags map[string]string `json:"tags,omitempty"`
Columns []string `json:"columns,omitempty"`
Values [][]interface{} `json:"values,omitempty"`
Partial bool `json:"partial,omitempty"`
}
// SameSeries returns true if r contains values for the same series as o.
func (r *Row) SameSeries(o *Row) bool {
return r.tagsHash() == o.tagsHash() && r.Name == o.Name
}
// tagsHash returns a hash of tag key/value pairs.
func (r *Row) tagsHash() uint64 {
h := NewInlineFNV64a()
keys := r.tagsKeys()
for _, k := range keys {
h.Write([]byte(k))
h.Write([]byte(r.Tags[k]))
}
return h.Sum64()
}
// tagKeys returns a sorted list of tag keys.
func (r *Row) tagsKeys() []string {
a := make([]string, 0, len(r.Tags))
for k := range r.Tags {
a = append(a, k)
}
sort.Strings(a)
return a
}
// Rows represents a collection of rows. Rows implements sort.Interface.
type Rows []*Row
// Len implements sort.Interface.
func (p Rows) Len() int { return len(p) }
// Less implements sort.Interface.
func (p Rows) Less(i, j int) bool {
// Sort by name first.
if p[i].Name != p[j].Name {
return p[i].Name < p[j].Name
}
// Sort by tag set hash. Tags don't have a meaningful sort order so we
// just compute a hash and sort by that instead. This allows the tests
// to receive rows in a predictable order every time.
return p[i].tagsHash() < p[j].tagsHash()
}
// Swap implements sort.Interface.
func (p Rows) Swap(i, j int) { p[i], p[j] = p[j], p[i] }

View file

@ -0,0 +1,42 @@
package models
// Statistic is the representation of a statistic used by the monitoring service.
type Statistic struct {
Name string `json:"name"`
Tags map[string]string `json:"tags"`
Values map[string]interface{} `json:"values"`
}
// NewStatistic returns an initialized Statistic.
func NewStatistic(name string) Statistic {
return Statistic{
Name: name,
Tags: make(map[string]string),
Values: make(map[string]interface{}),
}
}
// StatisticTags is a map that can be merged with others without causing
// mutations to either map.
type StatisticTags map[string]string
// Merge creates a new map containing the merged contents of tags and t.
// If both tags and the receiver map contain the same key, the value in tags
// is used in the resulting map.
//
// Merge always returns a usable map.
func (t StatisticTags) Merge(tags map[string]string) map[string]string {
// Add everything in tags to the result.
out := make(map[string]string, len(tags))
for k, v := range tags {
out[k] = v
}
// Only add values from t that don't appear in tags.
for k, v := range t {
if _, ok := tags[k]; !ok {
out[k] = v
}
}
return out
}

74
vendor/github.com/influxdata/influxdb/models/time.go generated vendored Normal file
View file

@ -0,0 +1,74 @@
package models
// Helper time methods since parsing time can easily overflow and we only support a
// specific time range.
import (
"fmt"
"math"
"time"
)
const (
// MinNanoTime is the minumum time that can be represented.
//
// 1677-09-21 00:12:43.145224194 +0000 UTC
//
// The two lowest minimum integers are used as sentinel values. The
// minimum value needs to be used as a value lower than any other value for
// comparisons and another separate value is needed to act as a sentinel
// default value that is unusable by the user, but usable internally.
// Because these two values need to be used for a special purpose, we do
// not allow users to write points at these two times.
MinNanoTime = int64(math.MinInt64) + 2
// MaxNanoTime is the maximum time that can be represented.
//
// 2262-04-11 23:47:16.854775806 +0000 UTC
//
// The highest time represented by a nanosecond needs to be used for an
// exclusive range in the shard group, so the maximum time needs to be one
// less than the possible maximum number of nanoseconds representable by an
// int64 so that we don't lose a point at that one time.
MaxNanoTime = int64(math.MaxInt64) - 1
)
var (
minNanoTime = time.Unix(0, MinNanoTime).UTC()
maxNanoTime = time.Unix(0, MaxNanoTime).UTC()
// ErrTimeOutOfRange gets returned when time is out of the representable range using int64 nanoseconds since the epoch.
ErrTimeOutOfRange = fmt.Errorf("time outside range %d - %d", MinNanoTime, MaxNanoTime)
)
// SafeCalcTime safely calculates the time given. Will return error if the time is outside the
// supported range.
func SafeCalcTime(timestamp int64, precision string) (time.Time, error) {
mult := GetPrecisionMultiplier(precision)
if t, ok := safeSignedMult(timestamp, mult); ok {
tme := time.Unix(0, t).UTC()
return tme, CheckTime(tme)
}
return time.Time{}, ErrTimeOutOfRange
}
// CheckTime checks that a time is within the safe range.
func CheckTime(t time.Time) error {
if t.Before(minNanoTime) || t.After(maxNanoTime) {
return ErrTimeOutOfRange
}
return nil
}
// Perform the multiplication and check to make sure it didn't overflow.
func safeSignedMult(a, b int64) (int64, bool) {
if a == 0 || b == 0 || a == 1 || b == 1 {
return a * b, true
}
if a == MinNanoTime || b == MaxNanoTime {
return 0, false
}
c := a * b
return c, c/b == a
}

View file

@ -0,0 +1,111 @@
// Package escape contains utilities for escaping parts of InfluxQL
// and InfluxDB line protocol.
package escape // import "github.com/influxdata/influxdb/pkg/escape"
import (
"bytes"
"strings"
)
// Codes is a map of bytes to be escaped.
var Codes = map[byte][]byte{
',': []byte(`\,`),
'"': []byte(`\"`),
' ': []byte(`\ `),
'=': []byte(`\=`),
}
// Bytes escapes characters on the input slice, as defined by Codes.
func Bytes(in []byte) []byte {
for b, esc := range Codes {
in = bytes.Replace(in, []byte{b}, esc, -1)
}
return in
}
const escapeChars = `," =`
// IsEscaped returns whether b has any escaped characters,
// i.e. whether b seems to have been processed by Bytes.
func IsEscaped(b []byte) bool {
for len(b) > 0 {
i := bytes.IndexByte(b, '\\')
if i < 0 {
return false
}
if i+1 < len(b) && strings.IndexByte(escapeChars, b[i+1]) >= 0 {
return true
}
b = b[i+1:]
}
return false
}
// AppendUnescaped appends the unescaped version of src to dst
// and returns the resulting slice.
func AppendUnescaped(dst, src []byte) []byte {
var pos int
for len(src) > 0 {
next := bytes.IndexByte(src[pos:], '\\')
if next < 0 || pos+next+1 >= len(src) {
return append(dst, src...)
}
if pos+next+1 < len(src) && strings.IndexByte(escapeChars, src[pos+next+1]) >= 0 {
if pos+next > 0 {
dst = append(dst, src[:pos+next]...)
}
src = src[pos+next+1:]
pos = 0
} else {
pos += next + 1
}
}
return dst
}
// Unescape returns a new slice containing the unescaped version of in.
func Unescape(in []byte) []byte {
if len(in) == 0 {
return nil
}
if bytes.IndexByte(in, '\\') == -1 {
return in
}
i := 0
inLen := len(in)
var out []byte
for {
if i >= inLen {
break
}
if in[i] == '\\' && i+1 < inLen {
switch in[i+1] {
case ',':
out = append(out, ',')
i += 2
continue
case '"':
out = append(out, '"')
i += 2
continue
case ' ':
out = append(out, ' ')
i += 2
continue
case '=':
out = append(out, '=')
i += 2
continue
}
}
out = append(out, in[i])
i += 1
}
return out
}

View file

@ -0,0 +1,21 @@
package escape
import "strings"
var (
escaper = strings.NewReplacer(`,`, `\,`, `"`, `\"`, ` `, `\ `, `=`, `\=`)
unescaper = strings.NewReplacer(`\,`, `,`, `\"`, `"`, `\ `, ` `, `\=`, `=`)
)
// UnescapeString returns unescaped version of in.
func UnescapeString(in string) string {
if strings.IndexByte(in, '\\') == -1 {
return in
}
return unescaper.Replace(in)
}
// String returns the escaped version of in.
func String(in string) string {
return escaper.Replace(in)
}

View file

@ -1,180 +0,0 @@
package client
import (
"bytes"
"fmt"
"io/ioutil"
"net/http"
"net/url"
"time"
"github.com/influxdb/influxdb/tsdb"
)
const (
// DefaultTimeout is the default connection timeout used to connect to an InfluxDB instance
DefaultTimeout = 0
)
// Config is used to specify what server to connect to.
// URL: The URL of the server connecting to.
// Username/Password are optional. They will be passed via basic auth if provided.
// UserAgent: If not provided, will default "InfluxDBClient",
// Timeout: If not provided, will default to 0 (no timeout)
type Config struct {
URL url.URL
Username string
Password string
UserAgent string
Timeout time.Duration
Precision string
}
// NewConfig will create a config to be used in connecting to the client
func NewConfig() Config {
return Config{
Timeout: DefaultTimeout,
}
}
// Client is used to make calls to the server.
type Client struct {
url url.URL
username string
password string
httpClient *http.Client
userAgent string
precision string
}
const (
ConsistencyOne = "one"
ConsistencyAll = "all"
ConsistencyQuorum = "quorum"
ConsistencyAny = "any"
)
// NewClient will instantiate and return a connected client to issue commands to the server.
func NewClient(c Config) (*Client, error) {
client := Client{
url: c.URL,
username: c.Username,
password: c.Password,
httpClient: &http.Client{Timeout: c.Timeout},
userAgent: c.UserAgent,
precision: c.Precision,
}
if client.userAgent == "" {
client.userAgent = "InfluxDBClient"
}
return &client, nil
}
// Write takes BatchPoints and allows for writing of multiple points with defaults
// If successful, error is nil and Response is nil
// If an error occurs, Response may contain additional information if populated.
func (c *Client) Write(bp BatchPoints) (*Response, error) {
u := c.url
u.Path = "write"
var b bytes.Buffer
for _, p := range bp.Points {
if p.Raw != "" {
if _, err := b.WriteString(p.Raw); err != nil {
return nil, err
}
} else {
for k, v := range bp.Tags {
if p.Tags == nil {
p.Tags = make(map[string]string, len(bp.Tags))
}
p.Tags[k] = v
}
if _, err := b.WriteString(p.MarshalString()); err != nil {
return nil, err
}
}
if err := b.WriteByte('\n'); err != nil {
return nil, err
}
}
req, err := http.NewRequest("POST", u.String(), &b)
if err != nil {
return nil, err
}
req.Header.Set("Content-Type", "")
req.Header.Set("User-Agent", c.userAgent)
if c.username != "" {
req.SetBasicAuth(c.username, c.password)
}
params := req.URL.Query()
params.Set("db", bp.Database)
params.Set("rp", bp.RetentionPolicy)
params.Set("precision", bp.Precision)
params.Set("consistency", bp.WriteConsistency)
req.URL.RawQuery = params.Encode()
resp, err := c.httpClient.Do(req)
if err != nil {
return nil, err
}
defer resp.Body.Close()
var response Response
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return nil, err
}
if resp.StatusCode != http.StatusNoContent && resp.StatusCode != http.StatusOK {
var err = fmt.Errorf(string(body))
response.Err = err
return &response, err
}
return nil, nil
}
// Structs
// Response represents a list of statement results.
type Response struct {
Err error
}
// Point defines the fields that will be written to the database
// Measurement, Time, and Fields are required
// Precision can be specified if the time is in epoch format (integer).
// Valid values for Precision are n, u, ms, s, m, and h
type Point struct {
Measurement string
Tags map[string]string
Time time.Time
Fields map[string]interface{}
Precision string
Raw string
}
func (p *Point) MarshalString() string {
return tsdb.NewPoint(p.Measurement, p.Tags, p.Fields, p.Time).String()
}
// BatchPoints is used to send batched data in a single write.
// Database and Points are required
// If no retention policy is specified, it will use the databases default retention policy.
// If tags are specified, they will be "merged" with all points. If a point already has that tag, it is ignored.
// If time is specified, it will be applied to any point with an empty time.
// Precision can be specified if the time is in epoch format (integer).
// Valid values for Precision are n, u, ms, s, m, and h
type BatchPoints struct {
Points []Point `json:"points,omitempty"`
Database string `json:"database,omitempty"`
RetentionPolicy string `json:"retentionPolicy,omitempty"`
Tags map[string]string `json:"tags,omitempty"`
Time time.Time `json:"time,omitempty"`
Precision string `json:"precision,omitempty"`
WriteConsistency string `json:"-"`
}

File diff suppressed because it is too large Load diff

17
vendor/vendor.json vendored
View file

@ -445,14 +445,19 @@
"revisionTime": "2016-10-07T00:41:22Z" "revisionTime": "2016-10-07T00:41:22Z"
}, },
{ {
"path": "github.com/influxdb/influxdb/client", "path": "github.com/influxdata/influxdb/client/v2",
"revision": "291aaeb9485b43b16875c238482b2f7d0a22a13b", "revision": "15e594fc09f112cb696c084a20beaca25538a5fa",
"revisionTime": "2015-09-16T14:41:53+02:00" "revisionTime": "2017-03-31T16:09:02-05:00"
}, },
{ {
"path": "github.com/influxdb/influxdb/tsdb", "path": "github.com/influxdata/influxdb/models",
"revision": "291aaeb9485b43b16875c238482b2f7d0a22a13b", "revision": "15e594fc09f112cb696c084a20beaca25538a5fa",
"revisionTime": "2015-09-16T14:41:53+02:00" "revisionTime": "2017-03-31T16:09:02-05:00"
},
{
"path": "github.com/influxdata/influxdb/pkg/escape",
"revision": "15e594fc09f112cb696c084a20beaca25538a5fa",
"revisionTime": "2017-03-31T16:09:02-05:00"
}, },
{ {
"checksumSHA1": "0ZrwvB6KoGPj2PoDNSEJwxQ6Mog=", "checksumSHA1": "0ZrwvB6KoGPj2PoDNSEJwxQ6Mog=",