Merge pull request #1087 from prometheus/fix-influxdb

Fix InfluxDB write support to work with InfluxDB 0.9.x.
This commit is contained in:
Julius Volz 2015-09-16 17:40:22 +02:00
commit 681eebfaee
8 changed files with 1737 additions and 127 deletions

View file

@ -48,6 +48,7 @@ var cfg = struct {
remote remote.Options
prometheusURL string
influxdbURL string
}{}
func init() {
@ -167,13 +168,17 @@ func init() {
"The URL of the remote OpenTSDB server to send samples to. None, if empty.",
)
cfg.fs.StringVar(
&cfg.remote.InfluxdbURL, "storage.remote.influxdb-url", "",
&cfg.influxdbURL, "storage.remote.influxdb-url", "",
"The URL of the remote InfluxDB server to send samples to. None, if empty.",
)
cfg.fs.StringVar(
&cfg.remote.InfluxdbRetentionPolicy, "storage.remote.influxdb.retention-policy", "default",
"The InfluxDB retention policy to use.",
)
cfg.fs.StringVar(
&cfg.remote.InfluxdbUsername, "storage.remote.influxdb.username", "",
"The username to use when sending samples to InfluxDB.",
)
cfg.fs.StringVar(
&cfg.remote.InfluxdbDatabase, "storage.remote.influxdb.database", "prometheus",
"The name of the database to use for storing samples in InfluxDB.",
@ -221,6 +226,20 @@ func parse(args []string) error {
return err
}
if err := parsePrometheusURL(); err != nil {
return err
}
if err := parseInfluxdbURL(); err != nil {
return err
}
cfg.remote.InfluxdbPassword = os.Getenv("INFLUXDB_PW")
return nil
}
func parsePrometheusURL() error {
if cfg.prometheusURL == "" {
hostname, err := os.Hostname()
if err != nil {
@ -244,7 +263,20 @@ func parse(args []string) error {
ppref = "/" + ppref
}
cfg.web.ExternalURL.Path = ppref
return nil
}
func parseInfluxdbURL() error {
if cfg.influxdbURL == "" {
return nil
}
url, err := url.Parse(cfg.influxdbURL)
if err != nil {
return err
}
cfg.remote.InfluxdbURL = url
return nil
}

View file

@ -14,148 +14,96 @@
package influxdb
import (
"bytes"
"encoding/json"
"fmt"
"io/ioutil"
"math"
"net/http"
"net/url"
"time"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/prometheus/log"
"github.com/prometheus/prometheus/util/httputil"
)
const (
writeEndpoint = "/write"
contentTypeJSON = "application/json"
influx "github.com/influxdb/influxdb/client"
)
// Client allows sending batches of Prometheus samples to InfluxDB.
type Client struct {
url string
httpClient *http.Client
retentionPolicy string
client *influx.Client
database string
retentionPolicy string
ignoredSamples prometheus.Counter
}
// NewClient creates a new Client.
func NewClient(url string, timeout time.Duration, database, retentionPolicy string) *Client {
func NewClient(conf influx.Config, db string, rp string) *Client {
c, err := influx.NewClient(conf)
// Currently influx.NewClient() *should* never return an error.
if err != nil {
log.Fatal(err)
}
return &Client{
url: url,
httpClient: httputil.NewDeadlineClient(timeout, nil),
retentionPolicy: retentionPolicy,
database: database,
client: c,
database: db,
retentionPolicy: rp,
ignoredSamples: prometheus.NewCounter(
prometheus.CounterOpts{
Name: "prometheus_influxdb_ignored_samples_total",
Help: "The total number of samples not sent to InfluxDB due to unsupported float values (Inf, -Inf, NaN).",
},
),
}
}
// StoreSamplesRequest is used for building a JSON request for storing samples
// in InfluxDB.
type StoreSamplesRequest struct {
Database string `json:"database"`
RetentionPolicy string `json:"retentionPolicy"`
Points []point `json:"points"`
}
// point represents a single InfluxDB measurement.
type point struct {
Timestamp int64 `json:"timestamp"`
Precision string `json:"precision"`
Name model.LabelValue `json:"name"`
Tags model.LabelSet `json:"tags"`
Fields fields `json:"fields"`
}
// fields represents the fields/columns sent to InfluxDB for a given measurement.
type fields struct {
Value model.SampleValue `json:"value"`
}
// tagsFromMetric extracts InfluxDB tags from a Prometheus metric.
func tagsFromMetric(m model.Metric) model.LabelSet {
tags := make(model.LabelSet, len(m)-1)
func tagsFromMetric(m model.Metric) map[string]string {
tags := make(map[string]string, len(m)-1)
for l, v := range m {
if l == model.MetricNameLabel {
continue
if l != model.MetricNameLabel {
tags[string(l)] = string(v)
}
tags[l] = v
}
return tags
}
// Store sends a batch of samples to InfluxDB via its HTTP API.
func (c *Client) Store(samples model.Samples) error {
points := make([]point, 0, len(samples))
points := make([]influx.Point, 0, len(samples))
for _, s := range samples {
v := float64(s.Value)
if math.IsNaN(v) || math.IsInf(v, 0) {
// TODO(julius): figure out if it's possible to insert special float
// values into InfluxDB somehow.
log.Warnf("cannot send value %f to InfluxDB, skipping sample %#v", v, s)
log.Debugf("cannot send value %f to InfluxDB, skipping sample %#v", v, s)
c.ignoredSamples.Inc()
continue
}
metric := s.Metric[model.MetricNameLabel]
points = append(points, point{
Timestamp: s.Timestamp.UnixNano(),
Precision: "n",
Name: metric,
Tags: tagsFromMetric(s.Metric),
Fields: fields{
Value: s.Value,
points = append(points, influx.Point{
Measurement: string(s.Metric[model.MetricNameLabel]),
Tags: tagsFromMetric(s.Metric),
Time: s.Timestamp.Time(),
Precision: "ms",
Fields: map[string]interface{}{
"value": v,
},
})
}
u, err := url.Parse(c.url)
if err != nil {
return err
}
u.Path = writeEndpoint
req := StoreSamplesRequest{
bps := influx.BatchPoints{
Points: points,
Database: c.database,
RetentionPolicy: c.retentionPolicy,
Points: points,
}
buf, err := json.Marshal(req)
if err != nil {
return err
}
resp, err := c.httpClient.Post(
u.String(),
contentTypeJSON,
bytes.NewBuffer(buf),
)
if err != nil {
return err
}
defer resp.Body.Close()
// API returns status code 200 for successful writes.
// http://influxdb.com/docs/v0.9/concepts/reading_and_writing_data.html#response
if resp.StatusCode == http.StatusOK {
return nil
}
// API returns error details in the response content in JSON.
buf, err = ioutil.ReadAll(resp.Body)
if err != nil {
return err
}
var r map[string]string
if err := json.Unmarshal(buf, &r); err != nil {
return err
}
return fmt.Errorf("failed to write samples into InfluxDB. Error: %s", r["error"])
_, err := c.client.Write(bps)
return err
}
// Name identifies the client as an InfluxDB client.
func (c Client) Name() string {
return "influxdb"
}
// Describe implements prometheus.Collector.
func (c *Client) Describe(ch chan<- *prometheus.Desc) {
ch <- c.ignoredSamples.Desc()
}
// Collect implements prometheus.Collector.
func (c *Client) Collect(ch chan<- prometheus.Metric) {
ch <- c.ignoredSamples
}

View file

@ -18,9 +18,12 @@ import (
"math"
"net/http"
"net/http/httptest"
"net/url"
"testing"
"time"
influx "github.com/influxdb/influxdb/client"
"github.com/prometheus/common/model"
)
@ -44,43 +47,63 @@ func TestClient(t *testing.T) {
},
{
Metric: model.Metric{
model.MetricNameLabel: "special_float_value",
model.MetricNameLabel: "nan_value",
},
Timestamp: model.Time(123456789123),
Value: model.SampleValue(math.NaN()),
},
{
Metric: model.Metric{
model.MetricNameLabel: "pos_inf_value",
},
Timestamp: model.Time(123456789123),
Value: model.SampleValue(math.Inf(1)),
},
{
Metric: model.Metric{
model.MetricNameLabel: "neg_inf_value",
},
Timestamp: model.Time(123456789123),
Value: model.SampleValue(math.Inf(-1)),
},
}
expectedJSON := `{"database":"prometheus","retentionPolicy":"default","points":[{"timestamp":123456789123000000,"precision":"n","name":"testmetric","tags":{"test_label":"test_label_value1"},"fields":{"value":"1.23"}},{"timestamp":123456789123000000,"precision":"n","name":"testmetric","tags":{"test_label":"test_label_value2"},"fields":{"value":"5.1234"}}]}`
expectedBody := `testmetric,test_label=test_label_value1 value=1.23 123456789123000000
testmetric,test_label=test_label_value2 value=5.1234 123456789123000000
`
server := httptest.NewServer(http.HandlerFunc(
func(w http.ResponseWriter, r *http.Request) {
if r.Method != "POST" {
t.Fatalf("Unexpected method; expected POST, got %s", r.Method)
}
if r.URL.Path != writeEndpoint {
t.Fatalf("Unexpected path; expected %s, got %s", writeEndpoint, r.URL.Path)
}
ct := r.Header["Content-Type"]
if len(ct) != 1 {
t.Fatalf("Unexpected number of 'Content-Type' headers; got %d, want 1", len(ct))
}
if ct[0] != contentTypeJSON {
t.Fatalf("Unexpected 'Content-type'; expected %s, got %s", contentTypeJSON, ct[0])
if r.URL.Path != "/write" {
t.Fatalf("Unexpected path; expected %s, got %s", "/write", r.URL.Path)
}
b, err := ioutil.ReadAll(r.Body)
if err != nil {
t.Fatalf("Error reading body: %s", err)
}
if string(b) != expectedJSON {
t.Fatalf("Unexpected request body; expected:\n\n%s\n\ngot:\n\n%s", expectedJSON, string(b))
if string(b) != expectedBody {
t.Fatalf("Unexpected request body; expected:\n\n%s\n\ngot:\n\n%s", expectedBody, string(b))
}
},
))
defer server.Close()
c := NewClient(server.URL, time.Minute, "prometheus", "default")
serverURL, err := url.Parse(server.URL)
if err != nil {
t.Fatalf("Unable to parse server URL %s: %s", server.URL, err)
}
conf := influx.Config{
URL: *serverURL,
Username: "testuser",
Password: "testpass",
Timeout: time.Minute,
}
c := NewClient(conf, "test_db", "default")
if err := c.Store(samples); err != nil {
t.Fatalf("Error sending samples: %s", err)

View file

@ -62,7 +62,8 @@ type StorageQueueManager struct {
samplesCount *prometheus.CounterVec
sendLatency prometheus.Summary
sendErrors prometheus.Counter
failedBatches prometheus.Counter
failedSamples prometheus.Counter
queueLength prometheus.Gauge
queueCapacity prometheus.Metric
}
@ -92,15 +93,22 @@ func NewStorageQueueManager(tsdb StorageClient, queueCapacity int) *StorageQueue
sendLatency: prometheus.NewSummary(prometheus.SummaryOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "sent_latency_milliseconds",
Name: "send_latency_seconds",
Help: "Latency quantiles for sending sample batches to the remote storage.",
ConstLabels: constLabels,
}),
sendErrors: prometheus.NewCounter(prometheus.CounterOpts{
failedBatches: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "sent_errors_total",
Help: "Total number of errors sending sample batches to the remote storage.",
Name: "failed_batches_total",
Help: "Total number of sample batches that encountered an error while being sent to the remote storage.",
ConstLabels: constLabels,
}),
failedSamples: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "failed_samples_total",
Help: "Total number of samples that encountered an error while being sent to the remote storage.",
ConstLabels: constLabels,
}),
queueLength: prometheus.NewGauge(prometheus.GaugeOpts{
@ -151,6 +159,8 @@ func (t *StorageQueueManager) Stop() {
func (t *StorageQueueManager) Describe(ch chan<- *prometheus.Desc) {
t.samplesCount.Describe(ch)
t.sendLatency.Describe(ch)
ch <- t.failedBatches.Desc()
ch <- t.failedSamples.Desc()
ch <- t.queueLength.Desc()
ch <- t.queueCapacity.Desc()
}
@ -160,6 +170,8 @@ func (t *StorageQueueManager) Collect(ch chan<- prometheus.Metric) {
t.samplesCount.Collect(ch)
t.sendLatency.Collect(ch)
t.queueLength.Set(float64(len(t.queue)))
ch <- t.failedBatches
ch <- t.failedSamples
ch <- t.queueLength
ch <- t.queueCapacity
}
@ -175,13 +187,14 @@ func (t *StorageQueueManager) sendSamples(s model.Samples) {
// floor.
begin := time.Now()
err := t.tsdb.Store(s)
duration := time.Since(begin) / time.Millisecond
duration := time.Since(begin) / time.Second
labelValue := success
if err != nil {
log.Warnf("error sending %d samples to remote storage: %s", len(s), err)
labelValue = failure
t.sendErrors.Inc()
t.failedBatches.Inc()
t.failedSamples.Add(float64(len(s)))
}
t.samplesCount.WithLabelValues(labelValue).Add(float64(len(s)))
t.sendLatency.Observe(float64(duration))

View file

@ -14,12 +14,15 @@
package remote
import (
"net/url"
"sync"
"time"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
influx "github.com/influxdb/influxdb/client"
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/storage/remote/influxdb"
"github.com/prometheus/prometheus/storage/remote/opentsdb"
@ -49,8 +52,15 @@ func New(o *Options) *Storage {
c := opentsdb.NewClient(o.OpentsdbURL, o.StorageTimeout)
s.queues = append(s.queues, NewStorageQueueManager(c, 100*1024))
}
if o.InfluxdbURL != "" {
c := influxdb.NewClient(o.InfluxdbURL, o.StorageTimeout, o.InfluxdbDatabase, o.InfluxdbRetentionPolicy)
if o.InfluxdbURL != nil {
conf := influx.Config{
URL: *o.InfluxdbURL,
Username: o.InfluxdbUsername,
Password: o.InfluxdbPassword,
Timeout: o.StorageTimeout,
}
c := influxdb.NewClient(conf, o.InfluxdbDatabase, o.InfluxdbRetentionPolicy)
prometheus.MustRegister(c)
s.queues = append(s.queues, NewStorageQueueManager(c, 100*1024))
}
if len(s.queues) == 0 {
@ -62,8 +72,10 @@ func New(o *Options) *Storage {
// Options contains configuration parameters for a remote storage.
type Options struct {
StorageTimeout time.Duration
InfluxdbURL string
InfluxdbURL *url.URL
InfluxdbRetentionPolicy string
InfluxdbUsername string
InfluxdbPassword string
InfluxdbDatabase string
OpentsdbURL string
}

180
vendor/github.com/influxdb/influxdb/client/influxdb.go generated vendored Normal file
View file

@ -0,0 +1,180 @@
package client
import (
"bytes"
"fmt"
"io/ioutil"
"net/http"
"net/url"
"time"
"github.com/influxdb/influxdb/tsdb"
)
const (
// DefaultTimeout is the default connection timeout used to connect to an InfluxDB instance
DefaultTimeout = 0
)
// Config is used to specify what server to connect to.
// URL: The URL of the server connecting to.
// Username/Password are optional. They will be passed via basic auth if provided.
// UserAgent: If not provided, will default "InfluxDBClient",
// Timeout: If not provided, will default to 0 (no timeout)
type Config struct {
URL url.URL
Username string
Password string
UserAgent string
Timeout time.Duration
Precision string
}
// NewConfig will create a config to be used in connecting to the client
func NewConfig() Config {
return Config{
Timeout: DefaultTimeout,
}
}
// Client is used to make calls to the server.
type Client struct {
url url.URL
username string
password string
httpClient *http.Client
userAgent string
precision string
}
const (
ConsistencyOne = "one"
ConsistencyAll = "all"
ConsistencyQuorum = "quorum"
ConsistencyAny = "any"
)
// NewClient will instantiate and return a connected client to issue commands to the server.
func NewClient(c Config) (*Client, error) {
client := Client{
url: c.URL,
username: c.Username,
password: c.Password,
httpClient: &http.Client{Timeout: c.Timeout},
userAgent: c.UserAgent,
precision: c.Precision,
}
if client.userAgent == "" {
client.userAgent = "InfluxDBClient"
}
return &client, nil
}
// Write takes BatchPoints and allows for writing of multiple points with defaults
// If successful, error is nil and Response is nil
// If an error occurs, Response may contain additional information if populated.
func (c *Client) Write(bp BatchPoints) (*Response, error) {
u := c.url
u.Path = "write"
var b bytes.Buffer
for _, p := range bp.Points {
if p.Raw != "" {
if _, err := b.WriteString(p.Raw); err != nil {
return nil, err
}
} else {
for k, v := range bp.Tags {
if p.Tags == nil {
p.Tags = make(map[string]string, len(bp.Tags))
}
p.Tags[k] = v
}
if _, err := b.WriteString(p.MarshalString()); err != nil {
return nil, err
}
}
if err := b.WriteByte('\n'); err != nil {
return nil, err
}
}
req, err := http.NewRequest("POST", u.String(), &b)
if err != nil {
return nil, err
}
req.Header.Set("Content-Type", "")
req.Header.Set("User-Agent", c.userAgent)
if c.username != "" {
req.SetBasicAuth(c.username, c.password)
}
params := req.URL.Query()
params.Set("db", bp.Database)
params.Set("rp", bp.RetentionPolicy)
params.Set("precision", bp.Precision)
params.Set("consistency", bp.WriteConsistency)
req.URL.RawQuery = params.Encode()
resp, err := c.httpClient.Do(req)
if err != nil {
return nil, err
}
defer resp.Body.Close()
var response Response
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return nil, err
}
if resp.StatusCode != http.StatusNoContent && resp.StatusCode != http.StatusOK {
var err = fmt.Errorf(string(body))
response.Err = err
return &response, err
}
return nil, nil
}
// Structs
// Response represents a list of statement results.
type Response struct {
Err error
}
// Point defines the fields that will be written to the database
// Measurement, Time, and Fields are required
// Precision can be specified if the time is in epoch format (integer).
// Valid values for Precision are n, u, ms, s, m, and h
type Point struct {
Measurement string
Tags map[string]string
Time time.Time
Fields map[string]interface{}
Precision string
Raw string
}
func (p *Point) MarshalString() string {
return tsdb.NewPoint(p.Measurement, p.Tags, p.Fields, p.Time).String()
}
// BatchPoints is used to send batched data in a single write.
// Database and Points are required
// If no retention policy is specified, it will use the databases default retention policy.
// If tags are specified, they will be "merged" with all points. If a point already has that tag, it is ignored.
// If time is specified, it will be applied to any point with an empty time.
// Precision can be specified if the time is in epoch format (integer).
// Valid values for Precision are n, u, ms, s, m, and h
type BatchPoints struct {
Points []Point `json:"points,omitempty"`
Database string `json:"database,omitempty"`
RetentionPolicy string `json:"retentionPolicy,omitempty"`
Tags map[string]string `json:"tags,omitempty"`
Time time.Time `json:"time,omitempty"`
Precision string `json:"precision,omitempty"`
WriteConsistency string `json:"-"`
}

1392
vendor/github.com/influxdb/influxdb/tsdb/points.go generated vendored Normal file

File diff suppressed because it is too large Load diff

12
vendor/vendor.json vendored
View file

@ -32,6 +32,16 @@
"revision": "34f98f7bdf2eec7517e3aac44691566963152721",
"revisionTime": "2015-09-08T11:01:49-04:00"
},
{
"path": "github.com/influxdb/influxdb/client",
"revision": "291aaeb9485b43b16875c238482b2f7d0a22a13b",
"revisionTime": "2015-09-16T14:41:53+02:00"
},
{
"path": "github.com/influxdb/influxdb/tsdb",
"revision": "291aaeb9485b43b16875c238482b2f7d0a22a13b",
"revisionTime": "2015-09-16T14:41:53+02:00"
},
{
"path": "github.com/julienschmidt/httprouter",
"revision": "109e267447e95ad1bb48b758e40dd7453eb7b039",
@ -163,4 +173,4 @@
"revisionTime": "2015-06-24T11:29:02+01:00"
}
]
}
}