Merge branch 'master' into dev-2.0

This commit is contained in:
Fabian Reinartz 2017-04-04 14:44:39 +02:00
commit 8ffc851147
52 changed files with 3926 additions and 2946 deletions

View file

@ -1,3 +1,5 @@
[NOTICE]: <> (If your question is around usage and not a bug in Prometheus please use: https://groups.google.com/forum/#!forum/prometheus-users)
**What did you do?**
**What did you expect to see?**

View file

@ -3,7 +3,7 @@ sudo: false
language: go
go:
- 1.7.4
- 1.8
go_import_path: github.com/prometheus/prometheus

View file

@ -12,7 +12,7 @@
# limitations under the License.
GO := GO15VENDOREXPERIMENT=1 go
FIRST_GOPATH := $(firstword $(subst :, ,$(GOPATH)))
FIRST_GOPATH := $(firstword $(subst :, ,$(shell $(GO) env GOPATH)))
PROMU := $(FIRST_GOPATH)/bin/promu
pkgs = $(shell $(GO) list ./... | grep -v /vendor/)

View file

@ -28,7 +28,7 @@ Prometheus' main distinguishing features as compared to other monitoring systems
## Architecture overview
![](https://cdn.rawgit.com/prometheus/prometheus/e761f0d/documentation/images/architecture.svg)
![](https://cdn.rawgit.com/prometheus/prometheus/c34257d069c630685da35bcef084632ffd5d6209/documentation/images/architecture.svg)
## Install

View file

@ -2,7 +2,7 @@ machine:
environment:
DOCKER_IMAGE_NAME: prom/prometheus
QUAY_IMAGE_NAME: quay.io/prometheus/prometheus
DOCKER_TEST_IMAGE_NAME: quay.io/prometheus/golang-builder:1.7-base
DOCKER_TEST_IMAGE_NAME: quay.io/prometheus/golang-builder:1.8-base
REPO_PATH: github.com/prometheus/prometheus
pre:
- sudo curl -L -o /usr/bin/docker 'https://s3-external-1.amazonaws.com/circle-downloads/docker-1.9.1-circleci'

View file

@ -26,6 +26,7 @@ import (
"unicode"
"github.com/asaskevich/govalidator"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/log"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/config"
@ -53,8 +54,15 @@ var cfg = struct {
alertmanagerURLs stringset
prometheusURL string
// Deprecated storage flags, kept for backwards compatibility.
deprecatedMemoryChunks uint64
deprecatedMaxChunksToPersist uint64
}{
alertmanagerURLs: stringset{},
notifier: notifier.Options{
Registerer: prometheus.DefaultRegisterer,
},
}
func init() {

View file

@ -20,6 +20,7 @@ import (
_ "net/http/pprof" // Comment this line to disable pprof endpoint.
"os"
"os/signal"
"runtime/debug"
"syscall"
"time"
@ -41,6 +42,13 @@ func main() {
os.Exit(Main())
}
// defaultGCPercent is the value used to to call SetGCPercent if the GOGC
// environment variable is not set or empty. The value here is intended to hit
// the sweet spot between memory utilization and GC effort. It is lower than the
// usual default of 100 as a lot of the heap in Prometheus is used to cache
// memory chunks, which have a lifetime of hours if not days or weeks.
const defaultGCPercent = 40
var (
configSuccess = prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: "prometheus",
@ -70,6 +78,10 @@ func Main() int {
return 0
}
if os.Getenv("GOGC") == "" {
debug.SetGCPercent(defaultGCPercent)
}
log.Infoln("Starting prometheus", version.Info())
log.Infoln("Build context", version.BuildContext())
@ -167,7 +179,6 @@ func Main() int {
// defer remoteStorage.Stop()
prometheus.MustRegister(notifier)
prometheus.MustRegister(configSuccess)
prometheus.MustRegister(configSuccessTime)

View file

@ -167,6 +167,11 @@ var (
DefaultRemoteWriteConfig = RemoteWriteConfig{
RemoteTimeout: model.Duration(30 * time.Second),
}
// DefaultRemoteReadConfig is the default remote read configuration.
DefaultRemoteReadConfig = RemoteReadConfig{
RemoteTimeout: model.Duration(1 * time.Minute),
}
)
// URL is a custom URL type that allows validation at configuration load time.
@ -205,6 +210,7 @@ type Config struct {
ScrapeConfigs []*ScrapeConfig `yaml:"scrape_configs,omitempty"`
RemoteWriteConfigs []*RemoteWriteConfig `yaml:"remote_write,omitempty"`
RemoteReadConfigs []*RemoteReadConfig `yaml:"remote_read,omitempty"`
// Catches all undefined fields and must be empty after parsing.
XXX map[string]interface{} `yaml:",inline"`
@ -1296,15 +1302,16 @@ func (re Regexp) MarshalYAML() (interface{}, error) {
return nil, nil
}
// RemoteWriteConfig is the configuration for remote storage.
// RemoteWriteConfig is the configuration for writing to remote storage.
type RemoteWriteConfig struct {
URL *URL `yaml:"url,omitempty"`
RemoteTimeout model.Duration `yaml:"remote_timeout,omitempty"`
BasicAuth *BasicAuth `yaml:"basic_auth,omitempty"`
TLSConfig TLSConfig `yaml:"tls_config,omitempty"`
ProxyURL URL `yaml:"proxy_url,omitempty"`
WriteRelabelConfigs []*RelabelConfig `yaml:"write_relabel_configs,omitempty"`
// We cannot do proper Go type embedding below as the parser will then parse
// values arbitrarily into the overflow maps of further-down types.
HTTPClientConfig HTTPClientConfig `yaml:",inline"`
// Catches all undefined fields and must be empty after parsing.
XXX map[string]interface{} `yaml:",inline"`
}
@ -1321,3 +1328,29 @@ func (c *RemoteWriteConfig) UnmarshalYAML(unmarshal func(interface{}) error) err
}
return nil
}
// RemoteReadConfig is the configuration for reading from remote storage.
type RemoteReadConfig struct {
URL *URL `yaml:"url,omitempty"`
RemoteTimeout model.Duration `yaml:"remote_timeout,omitempty"`
// We cannot do proper Go type embedding below as the parser will then parse
// values arbitrarily into the overflow maps of further-down types.
HTTPClientConfig HTTPClientConfig `yaml:",inline"`
// Catches all undefined fields and must be empty after parsing.
XXX map[string]interface{} `yaml:",inline"`
}
// UnmarshalYAML implements the yaml.Unmarshaler interface.
func (c *RemoteReadConfig) UnmarshalYAML(unmarshal func(interface{}) error) error {
*c = DefaultRemoteReadConfig
type plain RemoteReadConfig
if err := unmarshal((*plain)(c)); err != nil {
return err
}
if err := checkOverflow(c.XXX, "remote_read"); err != nil {
return err
}
return nil
}

View file

@ -1,10 +1,13 @@
# Remote storage bridge
This is a bridge that receives samples in Prometheus's remote storage
format and forwards them to Graphite, InfluxDB, or OpenTSDB. It is meant
This is a bridge that receives samples via Prometheus's remote write
protocol and stores them in Graphite, InfluxDB, or OpenTSDB. It is meant
as a replacement for the built-in specific remote storage implementations
that have been removed from Prometheus.
For InfluxDB, this bridge also supports reading back data through
Prometheus via Prometheus's remote read protocol.
## Building
```
@ -13,10 +16,22 @@ go build
## Running
Example:
Graphite example:
```
./remote_storage_bridge -graphite-address=localhost:8080 -opentsdb-url=http://localhost:8081/
./remote_storage_bridge -graphite-address=localhost:8080
```
OpenTSDB example:
```
./remote_storage_bridge -opentsdb-url=http://localhost:8081/
```
InfluxDB example:
```
./remote_storage_bridge -influxdb-url=http://localhost:8086/ -influxdb.database=prometheus -influxdb.retention-policy=autogen
```
To show all flags:
@ -30,6 +45,11 @@ To show all flags:
To configure Prometheus to send samples to this bridge, add the following to your `prometheus.yml`:
```yaml
# Remote write configuration (for Graphite, OpenTSDB, or InfluxDB).
remote_write:
url: "http://localhost:9201/receive"
- url: "http://localhost:9201/write"
# Remote read configuration (for InfluxDB only at the moment).
remote_read:
- url: "http://localhost:9201/read"
```

View file

@ -72,8 +72,8 @@ func pathFromMetric(m model.Metric, prefix string) string {
return buffer.String()
}
// Store sends a batch of samples to Graphite.
func (c *Client) Store(samples model.Samples) error {
// Write sends a batch of samples to Graphite.
func (c *Client) Write(samples model.Samples) error {
conn, err := net.DialTimeout(c.transport, c.address, c.timeout)
if err != nil {
return err

View file

@ -14,26 +14,30 @@
package influxdb
import (
"encoding/json"
"fmt"
"math"
"strings"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/log"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/storage/remote"
influx "github.com/influxdb/influxdb/client"
influx "github.com/influxdata/influxdb/client/v2"
)
// Client allows sending batches of Prometheus samples to InfluxDB.
type Client struct {
client *influx.Client
client influx.Client
database string
retentionPolicy string
ignoredSamples prometheus.Counter
}
// NewClient creates a new Client.
func NewClient(conf influx.Config, db string, rp string) *Client {
c, err := influx.NewClient(conf)
func NewClient(conf influx.HTTPConfig, db string, rp string) *Client {
c, err := influx.NewHTTPClient(conf)
// Currently influx.NewClient() *should* never return an error.
if err != nil {
log.Fatal(err)
@ -63,9 +67,9 @@ func tagsFromMetric(m model.Metric) map[string]string {
return tags
}
// Store sends a batch of samples to InfluxDB via its HTTP API.
func (c *Client) Store(samples model.Samples) error {
points := make([]influx.Point, 0, len(samples))
// Write sends a batch of samples to InfluxDB via its HTTP API.
func (c *Client) Write(samples model.Samples) error {
points := make([]*influx.Point, 0, len(samples))
for _, s := range samples {
v := float64(s.Value)
if math.IsNaN(v) || math.IsInf(v, 0) {
@ -73,24 +77,221 @@ func (c *Client) Store(samples model.Samples) error {
c.ignoredSamples.Inc()
continue
}
points = append(points, influx.Point{
Measurement: string(s.Metric[model.MetricNameLabel]),
Tags: tagsFromMetric(s.Metric),
Time: s.Timestamp.Time(),
Precision: "ms",
Fields: map[string]interface{}{
"value": v,
},
})
p, err := influx.NewPoint(
string(s.Metric[model.MetricNameLabel]),
tagsFromMetric(s.Metric),
map[string]interface{}{"value": v},
s.Timestamp.Time(),
)
if err != nil {
return err
}
points = append(points, p)
}
bps := influx.BatchPoints{
Points: points,
bps, err := influx.NewBatchPoints(influx.BatchPointsConfig{
Precision: "ms",
Database: c.database,
RetentionPolicy: c.retentionPolicy,
})
if err != nil {
return err
}
_, err := c.client.Write(bps)
return err
bps.AddPoints(points)
return c.client.Write(bps)
}
func (c *Client) Read(req *remote.ReadRequest) (*remote.ReadResponse, error) {
labelsToSeries := map[string]*remote.TimeSeries{}
for _, q := range req.Queries {
command, err := buildCommand(q)
if err != nil {
return nil, err
}
query := influx.NewQuery(command, c.database, "ms")
resp, err := c.client.Query(query)
if err != nil {
return nil, err
}
if resp.Err != "" {
return nil, fmt.Errorf(resp.Err)
}
if err = mergeResult(labelsToSeries, resp.Results); err != nil {
return nil, err
}
}
resp := remote.ReadResponse{
Timeseries: make([]*remote.TimeSeries, 0, len(labelsToSeries)),
}
for _, ts := range labelsToSeries {
resp.Timeseries = append(resp.Timeseries, ts)
}
return &resp, nil
}
func buildCommand(q *remote.Query) (string, error) {
matchers := make([]string, 0, len(q.Matchers))
// If we don't find a metric name matcher, query all metrics
// (InfluxDB measurements) by default.
from := "FROM /.+/"
for _, m := range q.Matchers {
if m.Name == model.MetricNameLabel {
switch m.Type {
case remote.MatchType_EQUAL:
from = fmt.Sprintf("FROM %q", m.Value)
case remote.MatchType_REGEX_MATCH:
from = fmt.Sprintf("FROM /^%s$/", escapeSlashes(m.Value))
default:
// TODO: Figure out how to support these efficiently.
return "", fmt.Errorf("non-equal or regex-non-equal matchers are not supported on the metric name yet")
}
continue
}
switch m.Type {
case remote.MatchType_EQUAL:
matchers = append(matchers, fmt.Sprintf("%q = '%s'", m.Name, escapeSingleQuotes(m.Value)))
case remote.MatchType_NOT_EQUAL:
matchers = append(matchers, fmt.Sprintf("%q != '%s'", m.Name, escapeSingleQuotes(m.Value)))
case remote.MatchType_REGEX_MATCH:
matchers = append(matchers, fmt.Sprintf("%q =~ /^%s$/", m.Name, escapeSlashes(m.Value)))
case remote.MatchType_REGEX_NO_MATCH:
matchers = append(matchers, fmt.Sprintf("%q !~ /^%s$/", m.Name, escapeSlashes(m.Value)))
default:
return "", fmt.Errorf("unknown match type %v", m.Type)
}
}
matchers = append(matchers, fmt.Sprintf("time >= %vms", q.StartTimestampMs))
matchers = append(matchers, fmt.Sprintf("time <= %vms", q.EndTimestampMs))
return fmt.Sprintf("SELECT value %s WHERE %v GROUP BY *", from, strings.Join(matchers, " AND ")), nil
}
func escapeSingleQuotes(str string) string {
return strings.Replace(str, `'`, `\'`, -1)
}
func escapeSlashes(str string) string {
return strings.Replace(str, `/`, `\/`, -1)
}
func mergeResult(labelsToSeries map[string]*remote.TimeSeries, results []influx.Result) error {
for _, r := range results {
for _, s := range r.Series {
k := concatLabels(s.Tags)
ts, ok := labelsToSeries[k]
if !ok {
ts = &remote.TimeSeries{
Labels: tagsToLabelPairs(s.Name, s.Tags),
}
labelsToSeries[k] = ts
}
samples, err := valuesToSamples(s.Values)
if err != nil {
return err
}
ts.Samples = mergeSamples(ts.Samples, samples)
}
}
return nil
}
func concatLabels(labels map[string]string) string {
// 0xff cannot cannot occur in valid UTF-8 sequences, so use it
// as a separator here.
separator := "\xff"
pairs := make([]string, 0, len(labels))
for k, v := range labels {
pairs = append(pairs, k+separator+v)
}
return strings.Join(pairs, separator)
}
func tagsToLabelPairs(name string, tags map[string]string) []*remote.LabelPair {
pairs := make([]*remote.LabelPair, 0, len(tags))
for k, v := range tags {
if v == "" {
// If we select metrics with different sets of labels names,
// InfluxDB returns *all* possible tag names on all returned
// series, with empty tag values on series where they don't
// apply. In Prometheus, an empty label value is equivalent
// to a non-existent label, so we just skip empty ones here
// to make the result correct.
continue
}
pairs = append(pairs, &remote.LabelPair{
Name: k,
Value: v,
})
}
pairs = append(pairs, &remote.LabelPair{
Name: model.MetricNameLabel,
Value: name,
})
return pairs
}
func valuesToSamples(values [][]interface{}) ([]*remote.Sample, error) {
samples := make([]*remote.Sample, 0, len(values))
for _, v := range values {
if len(v) != 2 {
return nil, fmt.Errorf("bad sample tuple length, expected [<timestamp>, <value>], got %v", v)
}
jsonTimestamp, ok := v[0].(json.Number)
if !ok {
return nil, fmt.Errorf("bad timestamp: %v", v[0])
}
jsonValue, ok := v[1].(json.Number)
if !ok {
return nil, fmt.Errorf("bad sample value: %v", v[1])
}
timestamp, err := jsonTimestamp.Int64()
if err != nil {
return nil, fmt.Errorf("unable to convert sample timestamp to int64: %v", err)
}
value, err := jsonValue.Float64()
if err != nil {
return nil, fmt.Errorf("unable to convert sample value to float64: %v", err)
}
samples = append(samples, &remote.Sample{
TimestampMs: timestamp,
Value: value,
})
}
return samples, nil
}
// mergeSamples merges two lists of sample pairs and removes duplicate
// timestamps. It assumes that both lists are sorted by timestamp.
func mergeSamples(a, b []*remote.Sample) []*remote.Sample {
result := make([]*remote.Sample, 0, len(a)+len(b))
i, j := 0, 0
for i < len(a) && j < len(b) {
if a[i].TimestampMs < b[j].TimestampMs {
result = append(result, a[i])
i++
} else if a[i].TimestampMs > b[j].TimestampMs {
result = append(result, b[j])
j++
} else {
result = append(result, a[i])
i++
j++
}
}
result = append(result, a[i:]...)
result = append(result, b[j:]...)
return result
}
// Name identifies the client as an InfluxDB client.

View file

@ -22,7 +22,7 @@ import (
"testing"
"time"
influx "github.com/influxdb/influxdb/client"
influx "github.com/influxdata/influxdb/client/v2"
"github.com/prometheus/common/model"
)
@ -68,8 +68,8 @@ func TestClient(t *testing.T) {
},
}
expectedBody := `testmetric,test_label=test_label_value1 value=1.23 123456789123000000
testmetric,test_label=test_label_value2 value=5.1234 123456789123000000
expectedBody := `testmetric,test_label=test_label_value1 value=1.23 123456789123
testmetric,test_label=test_label_value2 value=5.1234 123456789123
`
server := httptest.NewServer(http.HandlerFunc(
@ -97,15 +97,15 @@ testmetric,test_label=test_label_value2 value=5.1234 123456789123000000
t.Fatalf("Unable to parse server URL %s: %s", server.URL, err)
}
conf := influx.Config{
URL: *serverURL,
conf := influx.HTTPConfig{
Addr: serverURL.String(),
Username: "testuser",
Password: "testpass",
Timeout: time.Minute,
}
c := NewClient(conf, "test_db", "default")
if err := c.Store(samples); err != nil {
if err := c.Write(samples); err != nil {
t.Fatalf("Error sending samples: %s", err)
}
}

View file

@ -16,6 +16,7 @@ package main
import (
"flag"
"fmt"
"io/ioutil"
"net/http"
_ "net/http/pprof"
@ -30,7 +31,7 @@ import (
"github.com/prometheus/common/log"
"github.com/prometheus/common/model"
influx "github.com/influxdb/influxdb/client"
influx "github.com/influxdata/influxdb/client/v2"
"github.com/prometheus/prometheus/documentation/examples/remote_storage/remote_storage_bridge/graphite"
"github.com/prometheus/prometheus/documentation/examples/remote_storage/remote_storage_bridge/influxdb"
@ -95,8 +96,8 @@ func main() {
cfg := parseFlags()
http.Handle(cfg.telemetryPath, prometheus.Handler())
clients := buildClients(cfg)
serve(cfg.listenAddr, clients)
writers, readers := buildClients(cfg)
serve(cfg.listenAddr, writers, readers)
}
func parseFlags() *config {
@ -119,7 +120,7 @@ func parseFlags() *config {
flag.StringVar(&cfg.influxdbURL, "influxdb-url", "",
"The URL of the remote InfluxDB server to send samples to. None, if empty.",
)
flag.StringVar(&cfg.influxdbRetentionPolicy, "influxdb.retention-policy", "default",
flag.StringVar(&cfg.influxdbRetentionPolicy, "influxdb.retention-policy", "autogen",
"The InfluxDB retention policy to use.",
)
flag.StringVar(&cfg.influxdbUsername, "influxdb.username", "",
@ -139,38 +140,50 @@ func parseFlags() *config {
return cfg
}
func buildClients(cfg *config) []remote.StorageClient {
var clients []remote.StorageClient
type writer interface {
Write(samples model.Samples) error
Name() string
}
type reader interface {
Read(req *remote.ReadRequest) (*remote.ReadResponse, error)
Name() string
}
func buildClients(cfg *config) ([]writer, []reader) {
var writers []writer
var readers []reader
if cfg.graphiteAddress != "" {
c := graphite.NewClient(
cfg.graphiteAddress, cfg.graphiteTransport,
cfg.remoteTimeout, cfg.graphitePrefix)
clients = append(clients, c)
writers = append(writers, c)
}
if cfg.opentsdbURL != "" {
c := opentsdb.NewClient(cfg.opentsdbURL, cfg.remoteTimeout)
clients = append(clients, c)
writers = append(writers, c)
}
if cfg.influxdbURL != "" {
url, err := url.Parse(cfg.influxdbURL)
if err != nil {
log.Fatalf("Failed to parse InfluxDB URL %q: %v", cfg.influxdbURL, err)
}
conf := influx.Config{
URL: *url,
conf := influx.HTTPConfig{
Addr: url.String(),
Username: cfg.influxdbUsername,
Password: cfg.influxdbPassword,
Timeout: cfg.remoteTimeout,
}
c := influxdb.NewClient(conf, cfg.influxdbDatabase, cfg.influxdbRetentionPolicy)
prometheus.MustRegister(c)
clients = append(clients, c)
writers = append(writers, c)
readers = append(readers, c)
}
return clients
return writers, readers
}
func serve(addr string, clients []remote.StorageClient) error {
http.HandleFunc("/receive", func(w http.ResponseWriter, r *http.Request) {
func serve(addr string, writers []writer, readers []reader) error {
http.HandleFunc("/write", func(w http.ResponseWriter, r *http.Request) {
reqBuf, err := ioutil.ReadAll(snappy.NewReader(r.Body))
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
@ -187,16 +200,57 @@ func serve(addr string, clients []remote.StorageClient) error {
receivedSamples.Add(float64(len(samples)))
var wg sync.WaitGroup
for _, c := range clients {
for _, w := range writers {
wg.Add(1)
go func(rc remote.StorageClient) {
sendSamples(rc, samples)
go func(rw writer) {
sendSamples(rw, samples)
wg.Done()
}(c)
}(w)
}
wg.Wait()
})
http.HandleFunc("/read", func(w http.ResponseWriter, r *http.Request) {
reqBuf, err := ioutil.ReadAll(snappy.NewReader(r.Body))
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
var req remote.ReadRequest
if err := proto.Unmarshal(reqBuf, &req); err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
// TODO: Support reading from more than one reader and merging the results.
if len(readers) != 1 {
http.Error(w, fmt.Sprintf("expected exactly one reader, found %d readers", len(readers)), http.StatusInternalServerError)
return
}
reader := readers[0]
var resp *remote.ReadResponse
resp, err = reader.Read(&req)
if err != nil {
log.With("query", req).With("storage", reader.Name()).With("err", err).Warnf("Error executing query")
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
data, err := proto.Marshal(resp)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
w.Header().Set("Content-Type", "application/x-protobuf")
if _, err := snappy.NewWriter(w).Write(data); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
})
return http.ListenAndServe(addr, nil)
}
@ -219,14 +273,14 @@ func protoToSamples(req *remote.WriteRequest) model.Samples {
return samples
}
func sendSamples(c remote.StorageClient, samples model.Samples) {
func sendSamples(w writer, samples model.Samples) {
begin := time.Now()
err := c.Store(samples)
err := w.Write(samples)
duration := time.Since(begin).Seconds()
if err != nil {
log.Warnf("Error sending %d samples to remote storage %q: %v", len(samples), c.Name(), err)
failedSamples.WithLabelValues(c.Name()).Add(float64(len(samples)))
log.With("num_samples", len(samples)).With("storage", w.Name()).With("err", err).Warnf("Error sending samples to remote storage")
failedSamples.WithLabelValues(w.Name()).Add(float64(len(samples)))
}
sentSamples.WithLabelValues(c.Name()).Add(float64(len(samples)))
sentBatchDuration.WithLabelValues(c.Name()).Observe(duration)
sentSamples.WithLabelValues(w.Name()).Add(float64(len(samples)))
sentBatchDuration.WithLabelValues(w.Name()).Observe(duration)
}

View file

@ -24,10 +24,9 @@ import (
"time"
"github.com/prometheus/common/log"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/util/httputil"
"golang.org/x/net/context"
"golang.org/x/net/context/ctxhttp"
)
const (
@ -37,15 +36,15 @@ const (
// Client allows sending batches of Prometheus samples to OpenTSDB.
type Client struct {
url string
httpClient *http.Client
url string
timeout time.Duration
}
// NewClient creates a new Client.
func NewClient(url string, timeout time.Duration) *Client {
return &Client{
url: url,
httpClient: httputil.NewDeadlineClient(timeout, nil),
url: url,
timeout: timeout,
}
}
@ -70,8 +69,8 @@ func tagsFromMetric(m model.Metric) map[string]TagValue {
return tags
}
// Store sends a batch of samples to OpenTSDB via its HTTP API.
func (c *Client) Store(samples model.Samples) error {
// Write sends a batch of samples to OpenTSDB via its HTTP API.
func (c *Client) Write(samples model.Samples) error {
reqs := make([]StoreSamplesRequest, 0, len(samples))
for _, s := range samples {
v := float64(s.Value)
@ -100,11 +99,10 @@ func (c *Client) Store(samples model.Samples) error {
return err
}
resp, err := c.httpClient.Post(
u.String(),
contentTypeJSON,
bytes.NewBuffer(buf),
)
ctx, cancel := context.WithTimeout(context.Background(), c.timeout)
defer cancel()
resp, err := ctxhttp.Post(ctx, http.DefaultClient, u.String(), contentTypeJSON, bytes.NewBuffer(buf))
if err != nil {
return err
}

File diff suppressed because one or more lines are too long

Before

Width:  |  Height:  |  Size: 25 KiB

After

Width:  |  Height:  |  Size: 25 KiB

View file

@ -1 +1 @@
<mxfile userAgent="Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_2) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/45.0.2454.85 Safari/537.36" type="device"><diagram>7R1dd+q48dfkMTkQvpLHEJK7be9u09KebR8dEOBeY1FjbnL31+9I1hh7JHNtPDYhCZzDwWMhSzOj+Za46N2vX79E3mb1q5yL4OK6M3+96E0urq9vB334VIAfCWA46CWAZeTPE1B3D5j6fwgD7Bjozp+Lba5hLGUQ+5s8cCbDUMziHGwhg/wjNt4Su98DpjMvsKG/+/N4lUBvrod7+C/CX67wMd3hbXJnG//APuZi4e2C+FKD4J66vfawLz2r3gMgLJISulHf1q/3IlBIQ4QkU38suJsOMhKhGcjhH/TNKL57wc4M0hr1y8qPxXTjzdT1C1DyojdexesArrrwdbvy5vIFLjpwsQy8rUK++r6NI/lN3MtARrqj3k1HvdM7iEbVycIPgkzLh6F6q/4ib+7DXPBeKEMYxdgMWkSxMMzkmLkGmWl/EXIt4ugHNDE/uL41XGSYrzc01y976l4jbJWh7MjAPMNky7TrPZ7hi0G1G+31sS4jL1QMSzH3+Phwfw+jKMLcEdSae9uVUANXz20A8+l6/hnmuwMG1BuZcwj1gKON+gpT8oJABL960TcBWMlxPUVXf6LeTBgaGkGIGEKey2CoayaSRRAHfgYl8GMt37w40MhbvwIHblZXi0C+zFZeFF9tIjkTit/GLt5mQBsKM1zRLqwZeZhDW48BbUYNnCPaBgRtsOhbQ9vofNE2JGhDqdYG2m4sLIk5WCvmUkbxSi5l6AUPe+g4krtwruW4EuoZHIpwfhdFWik8B3L2LQE9gl4pKevUsw8jEQwzL1oKFFNuvEYi8GL/e74rF5bMT5+kDw/ZC80OUSvIx9hFMgTzq6xNRDtC3VPU0VbuopmwOgIkemoe2GyjGmyLB9xFvUees+eBpMc9R6TIKsUktv15TkyiBSo/k1Da4qrlpi2ufPIcLtoi65wpcbXYb5y4PZQI3MRFEU6ew0ZcM/M6TpmlNMsb/mP95lGSKQlwHQyMfZnRkmimZZUkXZbHKMkug29robE9zKHWOYA5bMKOOeN+nCfm+shQp+C5Eu5TZcxZLv4jBEh4cEVc8VSyZY1YhGVx1efA1Rkb/+hppxrARhtKPm7bv2uM/7P0mfJoQy5qBW285vCrH/9Hga8G5uq/mTtPIvJhgCpmpH/Yin2VGDdZHmE3sKjHkoqGn7hYR9hAaD9zGbjw4Ay91GVKsLaN30RH8Fu/VO3RQDWX9YvqlDyHz7Wxzd/NTjdRvfgzGNswAFyO5/53+LpUXxH0HFEIPM7R7qifEn4EMajgGTaLxNb/w3vWDRQHGlRC68H4YqA5pjf2An8ZAmwGPKKjykqi+pBoujM31v58rtk58J5FMPZm35aasUkkP8/s5c37lHtLZGEerx8P5BIWMoxNVg5cluS6UkTcJOYMxtJkVW4BZdJiLo1y2bnqYyjcMOSl6f3Y9YRN5GKxhTVbT471mvDlsvQlNBuM1JvJkO6bnxwKqLv0NUceoufy3pJ1qfhMcw/Oevj/nUqYwpx7C/3KgpLV+8sE2sGwHuFzOlW/wSWe9FZ6jefXE66EDBEMqPwyd5FfQmtllwFkBe0E/KB4iXOQGngmnwSwfaahaZKlNK7YWpS2U3JPu+1q6cXiRWmmD0qSdGWhseWyltEgY6eJ7cdOV2B0XQYgUOdw43/yWdsIH5I0w7wF1LfXCvrV7HSx84x/1ZQwou3hdQNUEtHHpc2IpuVs4jh1Fgt17IjGNIZKDfAPPio50rQoeguOMhtXrQcLOexIyT+VByEA8HEJQiPyDoK4oqMsBDExmKymj+T6H18/rt1FnfYWqYHWvcPC1m5vhh5oTSuyXBoM30GLQCyU7Uxs7dSjdvYR+KG4xLmoTrpX6rmkD8UXIl6JndJtUxEBljM2O/Rc01V/p4ubprc7Du3nKkphqaxzZY1STkgo8xvUALvp+CmOCx2vpozJfolaSLbiX2C7Ccy1Sn6rueLfIQ1ku3wr1zrB9VUL7Qw5wgaifbe3UDh8INrHUddKtJ0L6yiJcoW/SK5aaLddJ6VXfEAxsKa/namlDz02GSB+nzIsrUMo9oedSWQWGWZcrjMthLptpsyth2FnWgLxk1zQTzvq0qRSQcavalKpaMB8SSWnceKt1XoJn7dJfhvm1ipo4YcQS6PNmEzwE8+tHChhnzSj95lxewMZt0SkFkp7SKHnQ2x1820oAnKdEnnFkovDcbdeOdNgAVdqL6HudeXoXMqXo6YG92u+K4TSGiXXdpjGEOpIhbkCIW/Y0CwpJDkoRb0Jh+vcWIJsYLyJs7E7m6+2p05aWRsz3cxARRizjUmfgwOuOi4umxRT7ue6i0sL/wbYKm8I9OhqZSpjS6UAeQ4XdXF1cNeb3mbqTffVp7Ti9JANCuoVOQU3hSuYYRVSr2pa7FlpL7yrFqVq3cbv6tJ8FiZ8uT3Uguew8UuJQjGbLhkWyW3TVxdPXgwMoZQ4dN3pu7iio1+WZwI4qKFXMurXFcxDWO0QR74II72uHuIg9axoRrCHOMiA8+qnPgOV8G4cSuLQmQYHDuzI65ZMffX1jTLE0wrrztVQe6oaQKXUkfXTBYqnJb7D6GZds4emfq2OmPiuTxUdN9+VcAJb4LvBSFW9ZvhuMGLmu+R4iVPxXb/Aeq3MdwOacaQGFhffUfOZeZfs0PaV7wLwJ9deCAVXn04yagWS6XQUmDRWD4d1ko1VmGw3XlijyGQDZcZqioptkhpKtki3O6gPCEuGXHqPyqlRdDD52fZcKmdgGTwu/J4oFYhLH95NVtG9quqq2wrxcMriOE/NKPCc8qvrvZWWGXaa/nehKsn+/ZeqVRLN52vaKFWhJ4m4AvyuKCfHFm3c8kHi0RPwLj+JkTXCWiGGXc4M52ouwNx5k7SAV0NIR4u6FaTbJct3TyCJOrNAsZi9z+J9o951yFlTqEdBxxxBteOnqM5Tf9Hc66kDSZwOZOta/jg9jro0q8gLnNhq3mmxL1gjPY87d5qmN0cEvGwktQbtmjkja0CPFaKn3jB5/0MaZTDp8aKoBG3PfWDayESpzpK5+AVDM+eDnIy58pnZ9pnLhKI+JdfeZGJnLovotBa/IebCOCYbs9Qv/z/WiCx/AHtqRDbnSw2IbenYDOA6347l1GE7xvCkYsaTXfy2zxtoL25sUafc6YMcUeOR7XJ9yAVCNym1uECw349OghHVBTYJEMROArse5GHt+W9743qLeS1yBq9DPrkowyGfbpux9nInCSaWXJpmAJvh1HmGI2u+DK5yhz/XtQ9LE8r1pwxJ3mgH7fKZpMCnkEPHTPXn6m1nqia/TQuPlkrB9qN4Hv633TPUe0GYKrufpuUx3Mtwq5F7oudfXV2dcPK7bSzXigNBoIJWin2p8qOVRwPALIO6BX6hsK4hmBvcunQ4T5ncldFcRORORlSqif8dlJAPVnpvoo0E/bsnufU1qh2q7CtpkGIJlZ/OOOs0+cwPl3ioHtlnZe7+S4Lim1yqQiMD+ap/PbmEs4xT2FjGigsAqmouK2/ROn47tSsz6KpIov9ncZRs5S1eL6MET68Dq22/Y9GVLGfFVg6BEMbKh8vssB/ZCcIcXbvl/bebQlZ7l+zEHoYbkcBAQ5t1KrIgHVXF5uxHGBcfXlq2EirS8rrBsq7TbwYPZewv0hM3StaTfW4JP8GW8EQEHzqEeXAD2fI3ewpzWuGaWZDaUzj/WEpt+teIhjUWc4HL/X/gJvTe/4Nw7+FP</diagram></mxfile>
<mxfile userAgent="Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/57.0.2987.98 Safari/537.36" type="device" version="6.4.0" editor="www.draw.io"><diagram name="Page-1">7R1bm6I49tf4WH7ckcdSy57Z7e6tXXe/mXlEico0Ehewq2p+/SSQICRBowTUttuHhkNIwjkn557UwJxs3z8l/m7zBQYgGhha8D4wpwPD0C3DQf9hyEcB8TS7AKyTMCCNDoB5+BcgQI1A92EA0lrDDMIoC3d14BLGMVhmNZifJPCt3mwFo/qoO38NOMB86Uc89LcwyDYFdEQ/C8N/AeF6Q0fWHa94svCX39YJ3MdkvIFhrvJ/xeOtT/siH5pu/AC+VUDmy8CcJBBmxdX2fQIijFuKtuK9WcPTct4JiDOZFyyjeOO7H+0BnXI+seyDIuNtE2ZgvvOX+P4NEXxgjjfZNkJ3OrosP0FDN+vIT1NynWYJ/AYmMIJJ3pE50vCvfEIxiztZhVFUafni4B/uL/GDEH0LfRbDGM1izH8m+fLvIMnAewVEPvsTgFuQJR+oCXmKeLJ4hbCo6ZD7twPBDQrbVIjtEphPeGxddn3AM7ogqBajvT3WYeLHmIdZzM1mL5PJrBlzF1Ar8NMNCMi4HWC+XPWnMK/bClBvnUY9wtEOX6Kv9KMIRF/85BtIGK5n0WVN8U8RhhyzjiGXx5Bu8QhSgR9bAj/c8q2Lgxx52/c11g3DVQTflhs/yYa7BC4B5rexiLcVoI0KM7qiRVgzBHxlKkCbc79osxm0jXpEm3u/aHMYtHk9om3EYQkEyIAhtzDJNnANYz96OUDHuVWSy3GtjkMQB8/YZkK3iwguvxWgWRi1kHWZn6wBlUoFCM/wKF4TEPlZ+L1ugomwRF59hSEa9SA0NUatjBhEF3Mib1VtIrYj50RHKdwnS8B1hJDof1Sa7XCDtHnCui4e58ADRY8HjiiRJcUk3j0xidMTk7C09TqirSEeRxVtKevcB3Hd6xDX1DoirikeRxlxqS5q4ZRxSlPe8B/nPzVK0mSFsm1zWtIVKEl2WV6iJHUFvi2Hxv4wNzqNuVFXmDPvGXOWe0Wek3CfzsYc5+LPzDw4ogBXjCteSraqEWsKcGWpwNUdG/82qwF4tGkd2f766H7Rxihoq0+0qTWH38Psdwwe2uTuj8qTV5CEaII4ZpS/2IV9VdgydZbo2sBiPRadJUyDi3WBDWQoNnDRwBV64duSYB0bv3pf/i+r9thAtSrrl6pTZhx1rg1v/u72eRPcS7hEc3OiDAemw+/ocp3llChAi4SFoOEE7S56leFHJPOyOpslIA3/8hd5A8yBBJWotT0e2DnHmGM/Ctcxgi0R0+RRZSw+w6UfPZMH2zAIcnaO/AWIxmVGiYnk15ld3rwvuVciCzMzZkdyCSsYZyR3p+vkXm6Z0NXNaY4ypUewOKimxUQa5UkbWjQUThjyifR06XqiTeBqlYK2cszswper0pehme3inyJD2tLrVo4ooC7S1yryEKbIeyvWJeaznFPoVzv/38NiiZbZ1gOoWL2/TFE7DY9uaPP5tLLEi96k13h9PdGVUCECAckvcxH5IWqN7TIE2aB2ID62xFWQ2nbrpB7xPpPj8pQ2VFCaT8m97tPN2s/Am//xsCTRmdCJLrKWdcHqU0IT3o+db5DR9RQhgRqgB3/CRfqwpHHqFpDFrxWvK7rwecZ/5JQgou3lfYeoBJLHpY1r1AWZxxNHqLOUUIePaMwzmOBio0clh8kWewjKbES1HkrIwUdK/oM9CIAAj0sQNiIvIIgoOqqEIB6v6RO4/ffnh6UGF6vukRqWyDWpuMcVelBrGpPliWD4GbWIwCrjbe3Soxb2EYUxeKLfgjvRhwbfB+YLkG3AHuu2OUgQSis2+6K1q/5jspPJprc1gfYTFaUoqawTZY1KTigo8xUGQEzHR6UYI45FjldXxqQlUQuprPgXsd1Ukyr+LfNb3RX/OmwgW+RbidaJowLtCnKEHUT7PG8yORbtU4B2NkQtwrotwHrp5rZCO+86Yb0SIhQj1gzTJV76H90GiH9MGWYwpQoCf1iYRFYiw1wO7TdcCOX1VOZmmvXIbVkCcSIXdLIjnU0qNWT8zk0qNU1YXVJJaJz4W7xe4kVa5Le1nkGrMA74ZopM8Ct/mxyoYJ/0YoH6M+MmmXGjcrJ9xk0bMhsG2ubbqAiodcrIKyW5ODrv3itnOizg0plKJEuUoxMpXxU1NbZEneDdIZStURJth+kMoYJUmCgQcsOGpqSQVEEp1psQuM6dJchs59btzt6r7VknTdbGNPQGEabYxmTHoRM+d16qbFL7rlwXKuu7Z6u6IWCyq1VRGVspBZhxVFGXrg7V9abe4FBveqg+ZStOj9mgSL1STqGbwjGMsApTr0paHFjpqPCuFqVa/fALl8/yGNNRlYfaMI4yfpEoFOPpUmGR2jZ9fPPqZ4gh4hxiaJaIK7T8H+eZGK30SkX9ioJ5FNY6xFEvwijvzw9xMPWso44YiJ2wrljgSHg3AiVx7EyDIwd21HVLpb7aGDmDSoW1NnT0sgUrpaQUjy6reHriO0tMxrP5jk39ch0p4juLVXSq+U7CCeyB72zXrfOd7bbjO3qaxI3wndVgvZ7NdzabcWQNLFV8x5rPinfJOryv/Bwh53Hrx/76p5NcnlLBZDoFBSad1cM5okynygqTdOfHwm7kikx2+3SDPxGzTVFDqSzSLQ7qI4QVU5beo3JtFB1Nfvb9LWdnYBV4XPS6UCpGuX2sYTfZme7Vua46rxCPpywaJUjVUzMEyk+59yYtM/g0/W8AV5L971eOvCeqJLrP1/RRqsKeJCIK8IuinCq2aDt8QfOnxF8hDXuTtJh1XL/C0qK0wXqhBV/N/PyK1oS2jPDX8hX/t0ORLlBv94h6uuQUx/L4SB5VLKXnQp6ZZpMr07u+kdIoVIhXVYqsO3Wen9TslbRIFNM9JF3TW0UsVjamJ0+7nk5rstkDbtjzVxT5oQ7r75rH0z5se9VHd7nmHTNXa8HQ00kVV2Mu98rM1b62+FK7QP505x4sNZsxFwSVxqLDs5Qcaco7MK84IDXdZ7e9mbm/oBRHHbmjzVSEpFx+C9pDLhB2B0SPC4T2++gkcLWTJLC6IgGfbH7Z+uFt74rtMWhunpRPIsqokE+exdFA+TFlWj2G6V4/iNlIs1pBCUFN7WRZIs77j1N6ohPfi6D0PmLD1FHIQo6dYWMF+MeHwadf543n1pRgfig1g/9zvwBJDDKQXm8OExinOXKvNP5wOLzix+/TDG4xByKBirRSFsL4ktkgYJVBxQK/UVi3EMwd7os4ngQpnsIkAAnzpCIq8Yf/CymhMMNCPzcS8vdeYRrmqBaoss9MgxJLVPnl6aw8B7cM4zXBA7uJgzz9L0SKb/rkHiCf87enT3oFNoYZ5gIEtQbn7P9ovVdTlHYQlTuwh+VfJFvVVsbKKMHr68DL9/bI6spuDqI8O2TCMJZ3PGJiO8eatw6YeGr/lEYjq/0Y7NR12M41xEJHddTuPBZkZ3Vmc+XnoxqNlqdsmUWSy+sOa0auv9M0hlm4+hicV6xy0vv9ud9UUohQuarkhFd7ZNY3hdzUEa9l+VxlQeaewv3HUlorkRbRsM5iLuj28Ac2C3of/oqp+fI3</diagram></mxfile>

View file

@ -36,7 +36,7 @@ import (
"github.com/prometheus/prometheus/discovery"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/pkg/relabel"
"github.com/prometheus/prometheus/retrieval"
"github.com/prometheus/prometheus/util/httputil"
)
const (
@ -104,18 +104,13 @@ type Notifier struct {
queue []*Alert
opts *Options
metrics *alertMetrics
more chan struct{}
mtx sync.RWMutex
ctx context.Context
cancel func()
latency *prometheus.SummaryVec
errors *prometheus.CounterVec
sent *prometheus.CounterVec
dropped prometheus.Counter
queueLength prometheus.Gauge
queueCapacity prometheus.Metric
alertmanagers []*alertmanagerSet
cancelDiscovery func()
}
@ -127,23 +122,21 @@ type Options struct {
RelabelConfigs []*config.RelabelConfig
// Used for sending HTTP requests to the Alertmanager.
Do func(ctx context.Context, client *http.Client, req *http.Request) (*http.Response, error)
Registerer prometheus.Registerer
}
// New constructs a new Notifier.
func New(o *Options) *Notifier {
ctx, cancel := context.WithCancel(context.Background())
if o.Do == nil {
o.Do = ctxhttp.Do
}
return &Notifier{
queue: make([]*Alert, 0, o.QueueCapacity),
ctx: ctx,
cancel: cancel,
more: make(chan struct{}, 1),
opts: o,
type alertMetrics struct {
latency *prometheus.SummaryVec
errors *prometheus.CounterVec
sent *prometheus.CounterVec
dropped prometheus.Counter
queueLength prometheus.GaugeFunc
queueCapacity prometheus.Gauge
}
func newAlertMetrics(r prometheus.Registerer, queueCap int, queueLen func() float64) *alertMetrics {
m := &alertMetrics{
latency: prometheus.NewSummaryVec(prometheus.SummaryOpts{
Namespace: namespace,
Subsystem: subsystem,
@ -174,22 +167,55 @@ func New(o *Options) *Notifier {
Name: "dropped_total",
Help: "Total number of alerts dropped due to errors when sending to Alertmanager.",
}),
queueLength: prometheus.NewGauge(prometheus.GaugeOpts{
queueLength: prometheus.NewGaugeFunc(prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "queue_length",
Help: "The number of alert notifications in the queue.",
}, queueLen),
queueCapacity: prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "queue_capacity",
Help: "The capacity of the alert notifications queue.",
}),
queueCapacity: prometheus.MustNewConstMetric(
prometheus.NewDesc(
prometheus.BuildFQName(namespace, subsystem, "queue_capacity"),
"The capacity of the alert notifications queue.",
nil, nil,
),
prometheus.GaugeValue,
float64(o.QueueCapacity),
),
}
m.queueCapacity.Set(float64(queueCap))
if r != nil {
r.MustRegister(
m.latency,
m.errors,
m.sent,
m.dropped,
m.queueLength,
m.queueCapacity,
)
}
return m
}
// New constructs a new Notifier.
func New(o *Options) *Notifier {
ctx, cancel := context.WithCancel(context.Background())
if o.Do == nil {
o.Do = ctxhttp.Do
}
n := &Notifier{
queue: make([]*Alert, 0, o.QueueCapacity),
ctx: ctx,
cancel: cancel,
more: make(chan struct{}, 1),
opts: o,
}
queueLenFunc := func() float64 { return float64(n.queueLen()) }
n.metrics = newAlertMetrics(o.Registerer, o.QueueCapacity, queueLenFunc)
return n
}
// ApplyConfig updates the status state as the new config requires.
@ -208,6 +234,9 @@ func (n *Notifier) ApplyConfig(conf *config.Config) error {
if err != nil {
return err
}
ams.metrics = n.metrics
amSets = append(amSets, ams)
}
@ -264,7 +293,7 @@ func (n *Notifier) Run() {
alerts := n.nextBatch()
if !n.sendAll(alerts...) {
n.dropped.Add(float64(len(alerts)))
n.metrics.dropped.Add(float64(len(alerts)))
}
// If the queue still has items left, kick off the next iteration.
if n.queueLen() > 0 {
@ -300,7 +329,7 @@ func (n *Notifier) Send(alerts ...*Alert) {
alerts = alerts[d:]
log.Warnf("Alert batch larger than queue capacity, dropping %d alerts", d)
n.dropped.Add(float64(d))
n.metrics.dropped.Add(float64(d))
}
// If the queue is full, remove the oldest alerts in favor
@ -309,7 +338,7 @@ func (n *Notifier) Send(alerts ...*Alert) {
n.queue = n.queue[d:]
log.Warnf("Alert notification queue full, dropping %d alerts", d)
n.dropped.Add(float64(d))
n.metrics.dropped.Add(float64(d))
}
n.queue = append(n.queue, alerts...)
@ -392,12 +421,12 @@ func (n *Notifier) sendAll(alerts ...*Alert) bool {
if err := n.sendOne(ctx, ams.client, u, b); err != nil {
log.With("alertmanager", u).With("count", len(alerts)).Errorf("Error sending alerts: %s", err)
n.errors.WithLabelValues(u).Inc()
n.metrics.errors.WithLabelValues(u).Inc()
} else {
atomic.AddUint64(&numSuccess, 1)
}
n.latency.WithLabelValues(u).Observe(time.Since(begin).Seconds())
n.sent.WithLabelValues(u).Add(float64(len(alerts)))
n.metrics.latency.WithLabelValues(u).Observe(time.Since(begin).Seconds())
n.metrics.sent.WithLabelValues(u).Add(float64(len(alerts)))
wg.Done()
}(am)
@ -434,30 +463,6 @@ func (n *Notifier) Stop() {
n.cancel()
}
// Describe implements prometheus.Collector.
func (n *Notifier) Describe(ch chan<- *prometheus.Desc) {
n.latency.Describe(ch)
n.errors.Describe(ch)
n.sent.Describe(ch)
ch <- n.dropped.Desc()
ch <- n.queueLength.Desc()
ch <- n.queueCapacity.Desc()
}
// Collect implements prometheus.Collector.
func (n *Notifier) Collect(ch chan<- prometheus.Metric) {
n.queueLength.Set(float64(n.queueLen()))
n.latency.Collect(ch)
n.errors.Collect(ch)
n.sent.Collect(ch)
ch <- n.dropped
ch <- n.queueLength
ch <- n.queueCapacity
}
// alertmanager holds Alertmanager endpoint information.
type alertmanager interface {
url() string
@ -483,12 +488,14 @@ type alertmanagerSet struct {
cfg *config.AlertmanagerConfig
client *http.Client
metrics *alertMetrics
mtx sync.RWMutex
ams []alertmanager
}
func newAlertmanagerSet(cfg *config.AlertmanagerConfig) (*alertmanagerSet, error) {
client, err := retrieval.NewHTTPClient(cfg.HTTPClientConfig)
client, err := httputil.NewClientFromConfig(cfg.HTTPClientConfig)
if err != nil {
return nil, err
}
@ -527,6 +534,10 @@ func (s *alertmanagerSet) Sync(tgs []*config.TargetGroup) {
continue
}
// This will initialise the Counters for the AM to 0.
s.metrics.sent.WithLabelValues(us)
s.metrics.errors.WithLabelValues(us)
seen[us] = struct{}{}
s.ams = append(s.ams, am)
}

View file

@ -14,9 +14,10 @@ const sep = '\xff'
// Well-known label names used by Prometheus components.
const (
MetricName = "__name__"
AlertName = "alertname"
BucketLabel = "le"
MetricName = "__name__"
AlertName = "alertname"
BucketLabel = "le"
InstanceName = "instance"
)
// Label is a key/value pair of strings.

View file

@ -36,6 +36,7 @@ import (
"github.com/prometheus/prometheus/pkg/textparse"
"github.com/prometheus/prometheus/pkg/timestamp"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/util/httputil"
)
const (
@ -166,7 +167,7 @@ func (sp *scrapePool) reload(cfg *config.ScrapeConfig) {
sp.mtx.Lock()
defer sp.mtx.Unlock()
client, err := NewHTTPClient(cfg.HTTPClientConfig)
client, err := httputil.NewClientFromConfig(cfg.HTTPClientConfig)
if err != nil {
// Any errors that could occur here should be caught during config validation.
log.Errorf("Error creating HTTP client for job %q: %s", cfg.JobName, err)

View file

@ -30,6 +30,7 @@ import (
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/util/httputil"
)
const (
@ -150,7 +151,7 @@ func TestNewHTTPBearerToken(t *testing.T) {
cfg := config.HTTPClientConfig{
BearerToken: "1234",
}
c, err := NewHTTPClient(cfg)
c, err := httputil.NewClientFromConfig(cfg)
if err != nil {
t.Fatal(err)
}
@ -177,7 +178,7 @@ func TestNewHTTPBearerTokenFile(t *testing.T) {
cfg := config.HTTPClientConfig{
BearerTokenFile: "testdata/bearertoken.txt",
}
c, err := NewHTTPClient(cfg)
c, err := httputil.NewClientFromConfig(cfg)
if err != nil {
t.Fatal(err)
}
@ -206,7 +207,7 @@ func TestNewHTTPBasicAuth(t *testing.T) {
Password: "password123",
},
}
c, err := NewHTTPClient(cfg)
c, err := httputil.NewClientFromConfig(cfg)
if err != nil {
t.Fatal(err)
}
@ -234,7 +235,7 @@ func TestNewHTTPCACert(t *testing.T) {
CAFile: caCertPath,
},
}
c, err := NewHTTPClient(cfg)
c, err := httputil.NewClientFromConfig(cfg)
if err != nil {
t.Fatal(err)
}
@ -268,7 +269,7 @@ func TestNewHTTPClientCert(t *testing.T) {
KeyFile: "testdata/client.key",
},
}
c, err := NewHTTPClient(cfg)
c, err := httputil.NewClientFromConfig(cfg)
if err != nil {
t.Fatal(err)
}
@ -297,7 +298,7 @@ func TestNewHTTPWithServerName(t *testing.T) {
ServerName: "prometheus.rocks",
},
}
c, err := NewHTTPClient(cfg)
c, err := httputil.NewClientFromConfig(cfg)
if err != nil {
t.Fatal(err)
}
@ -326,7 +327,7 @@ func TestNewHTTPWithBadServerName(t *testing.T) {
ServerName: "badname",
},
}
c, err := NewHTTPClient(cfg)
c, err := httputil.NewClientFromConfig(cfg)
if err != nil {
t.Fatal(err)
}
@ -365,7 +366,7 @@ func TestNewClientWithBadTLSConfig(t *testing.T) {
KeyFile: "testdata/nonexistent_client.key",
},
}
_, err := NewHTTPClient(cfg)
_, err := httputil.NewClientFromConfig(cfg)
if err == nil {
t.Fatalf("Expected error, got nil.")
}

View file

@ -135,7 +135,7 @@ func (tm *TargetManager) reload() {
}
}
// Targets returns the targets currently being scraped bucketed by their job name.
// Targets returns the targets currently being scraped.
func (tm *TargetManager) Targets() []*Target {
tm.mtx.RLock()
defer tm.mtx.RUnlock()

View file

@ -74,10 +74,15 @@ var (
Name: "evaluator_iterations_skipped_total",
Help: "The total number of rule group evaluations skipped due to throttled metric storage.",
})
iterationsMissed = prometheus.NewCounter(prometheus.CounterOpts{
Namespace: namespace,
Name: "evaluator_iterations_missed_total",
Help: "The total number of rule group evaluations missed due to slow rule group evaluation.",
})
iterationsScheduled = prometheus.NewCounter(prometheus.CounterOpts{
Namespace: namespace,
Name: "evaluator_iterations_total",
Help: "The total number of scheduled rule group evaluations, whether skipped or executed.",
Help: "The total number of scheduled rule group evaluations, whether executed, missed or skipped.",
})
)
@ -88,7 +93,9 @@ func init() {
evalFailures.WithLabelValues(string(ruleTypeRecording))
prometheus.MustRegister(iterationDuration)
prometheus.MustRegister(iterationsScheduled)
prometheus.MustRegister(iterationsSkipped)
prometheus.MustRegister(iterationsMissed)
prometheus.MustRegister(evalFailures)
prometheus.MustRegister(evalDuration)
}
@ -154,6 +161,7 @@ func (g *Group) run() {
iterationDuration.Observe(time.Since(start).Seconds())
}
lastTriggered := time.Now()
iter()
tick := time.NewTicker(g.interval)
@ -168,6 +176,12 @@ func (g *Group) run() {
case <-g.done:
return
case <-tick.C:
missed := (time.Since(lastTriggered).Nanoseconds() / g.interval.Nanoseconds()) - 1
if missed > 0 {
iterationsMissed.Add(float64(missed))
iterationsScheduled.Add(float64(missed))
}
lastTriggered = time.Now()
iter()
}
}

View file

@ -1,122 +0,0 @@
// Copyright 2016 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package remote
import (
"bytes"
"fmt"
"net/http"
"time"
"github.com/golang/protobuf/proto"
"github.com/golang/snappy"
"golang.org/x/net/context"
"golang.org/x/net/context/ctxhttp"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/util/httputil"
)
// Client allows sending batches of Prometheus samples to an HTTP endpoint.
type Client struct {
index int // Used to differentiate metrics.
url config.URL
client *http.Client
timeout time.Duration
}
// NewClient creates a new Client.
func NewClient(index int, conf *config.RemoteWriteConfig) (*Client, error) {
tlsConfig, err := httputil.NewTLSConfig(conf.TLSConfig)
if err != nil {
return nil, err
}
// The only timeout we care about is the configured push timeout.
// It is applied on request. So we leave out any timings here.
var rt http.RoundTripper = &http.Transport{
Proxy: http.ProxyURL(conf.ProxyURL.URL),
TLSClientConfig: tlsConfig,
}
if conf.BasicAuth != nil {
rt = httputil.NewBasicAuthRoundTripper(conf.BasicAuth.Username, conf.BasicAuth.Password, rt)
}
return &Client{
index: index,
url: *conf.URL,
client: httputil.NewClient(rt),
timeout: time.Duration(conf.RemoteTimeout),
}, nil
}
// Store sends a batch of samples to the HTTP endpoint.
func (c *Client) Store(samples model.Samples) error {
req := &WriteRequest{
Timeseries: make([]*TimeSeries, 0, len(samples)),
}
for _, s := range samples {
ts := &TimeSeries{
Labels: make([]*LabelPair, 0, len(s.Metric)),
}
for k, v := range s.Metric {
ts.Labels = append(ts.Labels,
&LabelPair{
Name: string(k),
Value: string(v),
})
}
ts.Samples = []*Sample{
{
Value: float64(s.Value),
TimestampMs: int64(s.Timestamp),
},
}
req.Timeseries = append(req.Timeseries, ts)
}
data, err := proto.Marshal(req)
if err != nil {
return err
}
buf := bytes.Buffer{}
if _, err := snappy.NewWriter(&buf).Write(data); err != nil {
return err
}
httpReq, err := http.NewRequest("POST", c.url.String(), &buf)
if err != nil {
return err
}
httpReq.Header.Add("Content-Encoding", "snappy")
ctx, _ := context.WithTimeout(context.Background(), c.timeout)
httpResp, err := ctxhttp.Do(ctx, c.client, httpReq)
if err != nil {
return err
}
defer httpResp.Body.Close()
if httpResp.StatusCode/100 != 2 {
return fmt.Errorf("server returned HTTP status %s", httpResp.Status)
}
return nil
}
// Name identifies the client.
func (c Client) Name() string {
return fmt.Sprintf("%d:%s", c.index, c.url)
}

View file

@ -1,66 +0,0 @@
// Copyright 2013 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package remote
import (
"sync"
"sync/atomic"
"time"
)
// ewmaRate tracks an exponentially weighted moving average of a per-second rate.
type ewmaRate struct {
newEvents int64
alpha float64
interval time.Duration
lastRate float64
init bool
mutex sync.Mutex
}
func newEWMARate(alpha float64, interval time.Duration) ewmaRate {
return ewmaRate{
alpha: alpha,
interval: interval,
}
}
// rate returns the per-second rate.
func (r *ewmaRate) rate() float64 {
r.mutex.Lock()
defer r.mutex.Unlock()
return r.lastRate
}
// tick assumes to be called every r.interval.
func (r *ewmaRate) tick() {
newEvents := atomic.LoadInt64(&r.newEvents)
atomic.AddInt64(&r.newEvents, -newEvents)
instantRate := float64(newEvents) / r.interval.Seconds()
r.mutex.Lock()
defer r.mutex.Unlock()
if r.init {
r.lastRate += r.alpha * (instantRate - r.lastRate)
} else {
r.init = true
r.lastRate = instantRate
}
}
// inc counts one event.
func (r *ewmaRate) incr(incr int64) {
atomic.AddInt64(&r.newEvents, incr)
}

View file

@ -1,487 +0,0 @@
// Copyright 2013 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package remote
import (
"math"
"sync"
"time"
"golang.org/x/time/rate"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/log"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/relabel"
)
// String constants for instrumentation.
const (
namespace = "prometheus"
subsystem = "remote_storage"
queue = "queue"
// With a maximum of 1000 shards, assuming an average of 100ms remote write
// time and 100 samples per batch, we will be able to push 1M samples/s.
defaultMaxShards = 1000
defaultMaxSamplesPerSend = 100
// defaultQueueCapacity is per shard - at 1000 shards, this will buffer
// 100M samples. It is configured to buffer 1000 batches, which at 100ms
// per batch is 1:40mins.
defaultQueueCapacity = defaultMaxSamplesPerSend * 1000
defaultBatchSendDeadline = 5 * time.Second
// We track samples in/out and how long pushes take using an Exponentially
// Weighted Moving Average.
ewmaWeight = 0.2
shardUpdateDuration = 10 * time.Second
// Allow 30% too many shards before scaling down.
shardToleranceFraction = 0.3
// Limit to 1 log event every 10s
logRateLimit = 0.1
logBurst = 10
)
var (
sentSamplesTotal = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "sent_samples_total",
Help: "Total number of processed samples sent to remote storage.",
},
[]string{queue},
)
failedSamplesTotal = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "failed_samples_total",
Help: "Total number of processed samples which failed on send to remote storage.",
},
[]string{queue},
)
droppedSamplesTotal = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "dropped_samples_total",
Help: "Total number of samples which were dropped due to the queue being full.",
},
[]string{queue},
)
sentBatchDuration = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "sent_batch_duration_seconds",
Help: "Duration of sample batch send calls to the remote storage.",
Buckets: prometheus.DefBuckets,
},
[]string{queue},
)
queueLength = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "queue_length",
Help: "The number of processed samples queued to be sent to the remote storage.",
},
[]string{queue},
)
queueCapacity = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "queue_capacity",
Help: "The capacity of the queue of samples to be sent to the remote storage.",
},
[]string{queue},
)
numShards = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "shards",
Help: "The number of shards used for parallel sending to the remote storage.",
},
[]string{queue},
)
)
func init() {
prometheus.MustRegister(sentSamplesTotal)
prometheus.MustRegister(failedSamplesTotal)
prometheus.MustRegister(droppedSamplesTotal)
prometheus.MustRegister(sentBatchDuration)
prometheus.MustRegister(queueLength)
prometheus.MustRegister(queueCapacity)
prometheus.MustRegister(numShards)
}
// StorageClient defines an interface for sending a batch of samples to an
// external timeseries database.
type StorageClient interface {
// Store stores the given samples in the remote storage.
Store(model.Samples) error
// Name identifies the remote storage implementation.
Name() string
}
// QueueManagerConfig configures a storage queue.
type QueueManagerConfig struct {
QueueCapacity int // Number of samples to buffer per shard before we start dropping them.
MaxShards int // Max number of shards, i.e. amount of concurrency.
MaxSamplesPerSend int // Maximum number of samples per send.
BatchSendDeadline time.Duration // Maximum time sample will wait in buffer.
ExternalLabels model.LabelSet
RelabelConfigs []*config.RelabelConfig
Client StorageClient
}
// QueueManager manages a queue of samples to be sent to the Storage
// indicated by the provided StorageClient.
type QueueManager struct {
cfg QueueManagerConfig
queueName string
logLimiter *rate.Limiter
shardsMtx sync.Mutex
shards *shards
numShards int
reshardChan chan int
quit chan struct{}
wg sync.WaitGroup
samplesIn, samplesOut, samplesOutDuration ewmaRate
integralAccumulator float64
}
// NewQueueManager builds a new QueueManager.
func NewQueueManager(cfg QueueManagerConfig) *QueueManager {
if cfg.QueueCapacity == 0 {
cfg.QueueCapacity = defaultQueueCapacity
}
if cfg.MaxShards == 0 {
cfg.MaxShards = defaultMaxShards
}
if cfg.MaxSamplesPerSend == 0 {
cfg.MaxSamplesPerSend = defaultMaxSamplesPerSend
}
if cfg.BatchSendDeadline == 0 {
cfg.BatchSendDeadline = defaultBatchSendDeadline
}
t := &QueueManager{
cfg: cfg,
queueName: cfg.Client.Name(),
logLimiter: rate.NewLimiter(logRateLimit, logBurst),
numShards: 1,
reshardChan: make(chan int),
quit: make(chan struct{}),
samplesIn: newEWMARate(ewmaWeight, shardUpdateDuration),
samplesOut: newEWMARate(ewmaWeight, shardUpdateDuration),
samplesOutDuration: newEWMARate(ewmaWeight, shardUpdateDuration),
}
t.shards = t.newShards(t.numShards)
numShards.WithLabelValues(t.queueName).Set(float64(t.numShards))
queueCapacity.WithLabelValues(t.queueName).Set(float64(t.cfg.QueueCapacity))
return t
}
// Append queues a sample to be sent to the remote storage. It drops the
// sample on the floor if the queue is full.
// Always returns nil.
func (t *QueueManager) Append(s *model.Sample) error {
var snew model.Sample
snew = *s
snew.Metric = s.Metric.Clone()
for ln, lv := range t.cfg.ExternalLabels {
if _, ok := s.Metric[ln]; !ok {
snew.Metric[ln] = lv
}
}
snew.Metric = model.Metric(
relabel.Process(model.LabelSet(snew.Metric), t.cfg.RelabelConfigs...))
if snew.Metric == nil {
return nil
}
t.shardsMtx.Lock()
enqueued := t.shards.enqueue(&snew)
t.shardsMtx.Unlock()
if enqueued {
queueLength.WithLabelValues(t.queueName).Inc()
} else {
droppedSamplesTotal.WithLabelValues(t.queueName).Inc()
if t.logLimiter.Allow() {
log.Warn("Remote storage queue full, discarding sample. Multiple subsequent messages of this kind may be suppressed.")
}
}
return nil
}
// NeedsThrottling implements storage.SampleAppender. It will always return
// false as a remote storage drops samples on the floor if backlogging instead
// of asking for throttling.
func (*QueueManager) NeedsThrottling() bool {
return false
}
// Start the queue manager sending samples to the remote storage.
// Does not block.
func (t *QueueManager) Start() {
t.wg.Add(2)
go t.updateShardsLoop()
go t.reshardLoop()
t.shardsMtx.Lock()
defer t.shardsMtx.Unlock()
t.shards.start()
}
// Stop stops sending samples to the remote storage and waits for pending
// sends to complete.
func (t *QueueManager) Stop() {
log.Infof("Stopping remote storage...")
close(t.quit)
t.wg.Wait()
t.shardsMtx.Lock()
defer t.shardsMtx.Unlock()
t.shards.stop()
log.Info("Remote storage stopped.")
}
func (t *QueueManager) updateShardsLoop() {
defer t.wg.Done()
ticker := time.Tick(shardUpdateDuration)
for {
select {
case <-ticker:
t.calculateDesiredShards()
case <-t.quit:
return
}
}
}
func (t *QueueManager) calculateDesiredShards() {
t.samplesIn.tick()
t.samplesOut.tick()
t.samplesOutDuration.tick()
// We use the number of incoming samples as a prediction of how much work we
// will need to do next iteration. We add to this any pending samples
// (received - send) so we can catch up with any backlog. We use the average
// outgoing batch latency to work out how many shards we need.
var (
samplesIn = t.samplesIn.rate()
samplesOut = t.samplesOut.rate()
samplesPending = samplesIn - samplesOut
samplesOutDuration = t.samplesOutDuration.rate()
)
// We use an integral accumulator, like in a PID, to help dampen oscillation.
t.integralAccumulator = t.integralAccumulator + (samplesPending * 0.1)
if samplesOut <= 0 {
return
}
var (
timePerSample = samplesOutDuration / samplesOut
desiredShards = (timePerSample * (samplesIn + samplesPending + t.integralAccumulator)) / float64(time.Second)
)
log.Debugf("QueueManager.caclulateDesiredShards samplesIn=%f, samplesOut=%f, samplesPending=%f, desiredShards=%f",
samplesIn, samplesOut, samplesPending, desiredShards)
// Changes in the number of shards must be greater than shardToleranceFraction.
var (
lowerBound = float64(t.numShards) * (1. - shardToleranceFraction)
upperBound = float64(t.numShards) * (1. + shardToleranceFraction)
)
log.Debugf("QueueManager.updateShardsLoop %f <= %f <= %f", lowerBound, desiredShards, upperBound)
if lowerBound <= desiredShards && desiredShards <= upperBound {
return
}
numShards := int(math.Ceil(desiredShards))
if numShards > t.cfg.MaxShards {
numShards = t.cfg.MaxShards
}
if numShards == t.numShards {
return
}
// Resharding can take some time, and we want this loop
// to stay close to shardUpdateDuration.
select {
case t.reshardChan <- numShards:
log.Infof("Remote storage resharding from %d to %d shards.", t.numShards, numShards)
t.numShards = numShards
default:
log.Infof("Currently resharding, skipping.")
}
}
func (t *QueueManager) reshardLoop() {
defer t.wg.Done()
for {
select {
case numShards := <-t.reshardChan:
t.reshard(numShards)
case <-t.quit:
return
}
}
}
func (t *QueueManager) reshard(n int) {
numShards.WithLabelValues(t.queueName).Set(float64(n))
t.shardsMtx.Lock()
newShards := t.newShards(n)
oldShards := t.shards
t.shards = newShards
t.shardsMtx.Unlock()
oldShards.stop()
// We start the newShards after we have stopped (the therefore completely
// flushed) the oldShards, to guarantee we only every deliver samples in
// order.
newShards.start()
}
type shards struct {
qm *QueueManager
queues []chan *model.Sample
done chan struct{}
wg sync.WaitGroup
}
func (t *QueueManager) newShards(numShards int) *shards {
queues := make([]chan *model.Sample, numShards)
for i := 0; i < numShards; i++ {
queues[i] = make(chan *model.Sample, t.cfg.QueueCapacity)
}
s := &shards{
qm: t,
queues: queues,
done: make(chan struct{}),
}
s.wg.Add(numShards)
return s
}
func (s *shards) len() int {
return len(s.queues)
}
func (s *shards) start() {
for i := 0; i < len(s.queues); i++ {
go s.runShard(i)
}
}
func (s *shards) stop() {
for _, shard := range s.queues {
close(shard)
}
s.wg.Wait()
}
func (s *shards) enqueue(sample *model.Sample) bool {
s.qm.samplesIn.incr(1)
fp := sample.Metric.FastFingerprint()
shard := uint64(fp) % uint64(len(s.queues))
select {
case s.queues[shard] <- sample:
return true
default:
return false
}
}
func (s *shards) runShard(i int) {
defer s.wg.Done()
queue := s.queues[i]
// Send batches of at most MaxSamplesPerSend samples to the remote storage.
// If we have fewer samples than that, flush them out after a deadline
// anyways.
pendingSamples := model.Samples{}
for {
select {
case sample, ok := <-queue:
if !ok {
if len(pendingSamples) > 0 {
log.Debugf("Flushing %d samples to remote storage...", len(pendingSamples))
s.sendSamples(pendingSamples)
log.Debugf("Done flushing.")
}
return
}
queueLength.WithLabelValues(s.qm.queueName).Dec()
pendingSamples = append(pendingSamples, sample)
for len(pendingSamples) >= s.qm.cfg.MaxSamplesPerSend {
s.sendSamples(pendingSamples[:s.qm.cfg.MaxSamplesPerSend])
pendingSamples = pendingSamples[s.qm.cfg.MaxSamplesPerSend:]
}
case <-time.After(s.qm.cfg.BatchSendDeadline):
if len(pendingSamples) > 0 {
s.sendSamples(pendingSamples)
pendingSamples = pendingSamples[:0]
}
}
}
}
func (s *shards) sendSamples(samples model.Samples) {
// Samples are sent to the remote storage on a best-effort basis. If a
// sample isn't sent correctly the first time, it's simply dropped on the
// floor.
begin := time.Now()
err := s.qm.cfg.Client.Store(samples)
duration := time.Since(begin)
if err != nil {
log.Warnf("error sending %d samples to remote storage: %s", len(samples), err)
failedSamplesTotal.WithLabelValues(s.qm.queueName).Add(float64(len(samples)))
} else {
sentSamplesTotal.WithLabelValues(s.qm.queueName).Add(float64(len(samples)))
}
sentBatchDuration.WithLabelValues(s.qm.queueName).Observe(duration.Seconds())
s.qm.samplesOut.incr(int64(len(samples)))
s.qm.samplesOutDuration.incr(int64(duration))
}

View file

@ -1,259 +0,0 @@
// Copyright 2013 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package remote
import (
"fmt"
"sync"
"sync/atomic"
"testing"
"time"
"github.com/prometheus/common/model"
)
type TestStorageClient struct {
receivedSamples map[string]model.Samples
expectedSamples map[string]model.Samples
wg sync.WaitGroup
mtx sync.Mutex
}
func NewTestStorageClient() *TestStorageClient {
return &TestStorageClient{
receivedSamples: map[string]model.Samples{},
expectedSamples: map[string]model.Samples{},
}
}
func (c *TestStorageClient) expectSamples(ss model.Samples) {
c.mtx.Lock()
defer c.mtx.Unlock()
for _, s := range ss {
ts := s.Metric.String()
c.expectedSamples[ts] = append(c.expectedSamples[ts], s)
}
c.wg.Add(len(ss))
}
func (c *TestStorageClient) waitForExpectedSamples(t *testing.T) {
c.wg.Wait()
c.mtx.Lock()
defer c.mtx.Unlock()
for ts, expectedSamples := range c.expectedSamples {
for i, expected := range expectedSamples {
if !expected.Equal(c.receivedSamples[ts][i]) {
t.Fatalf("%d. Expected %v, got %v", i, expected, c.receivedSamples[ts][i])
}
}
}
}
func (c *TestStorageClient) Store(ss model.Samples) error {
c.mtx.Lock()
defer c.mtx.Unlock()
for _, s := range ss {
ts := s.Metric.String()
c.receivedSamples[ts] = append(c.receivedSamples[ts], s)
}
c.wg.Add(-len(ss))
return nil
}
func (c *TestStorageClient) Name() string {
return "teststorageclient"
}
func TestSampleDelivery(t *testing.T) {
// Let's create an even number of send batches so we don't run into the
// batch timeout case.
n := defaultQueueCapacity * 2
samples := make(model.Samples, 0, n)
for i := 0; i < n; i++ {
name := model.LabelValue(fmt.Sprintf("test_metric_%d", i))
samples = append(samples, &model.Sample{
Metric: model.Metric{
model.MetricNameLabel: name,
},
Value: model.SampleValue(i),
})
}
c := NewTestStorageClient()
c.expectSamples(samples[:len(samples)/2])
m := NewQueueManager(QueueManagerConfig{
Client: c,
MaxShards: 1,
})
// These should be received by the client.
for _, s := range samples[:len(samples)/2] {
m.Append(s)
}
// These will be dropped because the queue is full.
for _, s := range samples[len(samples)/2:] {
m.Append(s)
}
m.Start()
defer m.Stop()
c.waitForExpectedSamples(t)
}
func TestSampleDeliveryOrder(t *testing.T) {
ts := 10
n := defaultMaxSamplesPerSend * ts
samples := make(model.Samples, 0, n)
for i := 0; i < n; i++ {
name := model.LabelValue(fmt.Sprintf("test_metric_%d", i%ts))
samples = append(samples, &model.Sample{
Metric: model.Metric{
model.MetricNameLabel: name,
},
Value: model.SampleValue(i),
Timestamp: model.Time(i),
})
}
c := NewTestStorageClient()
c.expectSamples(samples)
m := NewQueueManager(QueueManagerConfig{
Client: c,
// Ensure we don't drop samples in this test.
QueueCapacity: n,
})
// These should be received by the client.
for _, s := range samples {
m.Append(s)
}
m.Start()
defer m.Stop()
c.waitForExpectedSamples(t)
}
// TestBlockingStorageClient is a queue_manager StorageClient which will block
// on any calls to Store(), until the `block` channel is closed, at which point
// the `numCalls` property will contain a count of how many times Store() was
// called.
type TestBlockingStorageClient struct {
numCalls uint64
block chan bool
}
func NewTestBlockedStorageClient() *TestBlockingStorageClient {
return &TestBlockingStorageClient{
block: make(chan bool),
numCalls: 0,
}
}
func (c *TestBlockingStorageClient) Store(s model.Samples) error {
atomic.AddUint64(&c.numCalls, 1)
<-c.block
return nil
}
func (c *TestBlockingStorageClient) NumCalls() uint64 {
return atomic.LoadUint64(&c.numCalls)
}
func (c *TestBlockingStorageClient) unlock() {
close(c.block)
}
func (c *TestBlockingStorageClient) Name() string {
return "testblockingstorageclient"
}
func (t *QueueManager) queueLen() int {
t.shardsMtx.Lock()
defer t.shardsMtx.Unlock()
queueLength := 0
for _, shard := range t.shards.queues {
queueLength += len(shard)
}
return queueLength
}
func TestSpawnNotMoreThanMaxConcurrentSendsGoroutines(t *testing.T) {
// Our goal is to fully empty the queue:
// `MaxSamplesPerSend*Shards` samples should be consumed by the
// per-shard goroutines, and then another `MaxSamplesPerSend`
// should be left on the queue.
n := defaultMaxSamplesPerSend*1 + defaultMaxSamplesPerSend
samples := make(model.Samples, 0, n)
for i := 0; i < n; i++ {
name := model.LabelValue(fmt.Sprintf("test_metric_%d", i))
samples = append(samples, &model.Sample{
Metric: model.Metric{
model.MetricNameLabel: name,
},
Value: model.SampleValue(i),
})
}
c := NewTestBlockedStorageClient()
m := NewQueueManager(QueueManagerConfig{
Client: c,
QueueCapacity: n,
MaxShards: 1,
})
m.Start()
defer func() {
c.unlock()
m.Stop()
}()
for _, s := range samples {
m.Append(s)
}
// Wait until the runShard() loops drain the queue. If things went right, it
// should then immediately block in sendSamples(), but, in case of error,
// it would spawn too many goroutines, and thus we'd see more calls to
// client.Store()
//
// The timed wait is maybe non-ideal, but, in order to verify that we're
// not spawning too many concurrent goroutines, we have to wait on the
// Run() loop to consume a specific number of elements from the
// queue... and it doesn't signal that in any obvious way, except by
// draining the queue. We cap the waiting at 1 second -- that should give
// plenty of time, and keeps the failure fairly quick if we're not draining
// the queue properly.
for i := 0; i < 100 && m.queueLen() > 0; i++ {
time.Sleep(10 * time.Millisecond)
}
if m.queueLen() != defaultMaxSamplesPerSend {
t.Fatalf("Failed to drain QueueManager queue, %d elements left",
m.queueLen(),
)
}
numCalls := c.NumCalls()
if numCalls != uint64(1) {
t.Errorf("Saw %d concurrent sends, expected 1", numCalls)
}
}

View file

@ -1,84 +0,0 @@
// Copyright 2017 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package remote
import (
"sync"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/config"
)
// Storage allows queueing samples for remote writes.
type Storage struct {
mtx sync.RWMutex
queues []*QueueManager
}
// ApplyConfig updates the state as the new config requires.
func (s *Storage) ApplyConfig(conf *config.Config) error {
s.mtx.Lock()
defer s.mtx.Unlock()
newQueues := []*QueueManager{}
// TODO: we should only stop & recreate queues which have changes,
// as this can be quite disruptive.
for i, rwConf := range conf.RemoteWriteConfigs {
c, err := NewClient(i, rwConf)
if err != nil {
return err
}
newQueues = append(newQueues, NewQueueManager(QueueManagerConfig{
Client: c,
ExternalLabels: conf.GlobalConfig.ExternalLabels,
RelabelConfigs: rwConf.WriteRelabelConfigs,
}))
}
for _, q := range s.queues {
q.Stop()
}
s.queues = newQueues
for _, q := range s.queues {
q.Start()
}
return nil
}
// Stop the background processing of the storage queues.
func (s *Storage) Stop() {
for _, q := range s.queues {
q.Stop()
}
}
// Append implements storage.SampleAppender. Always returns nil.
func (s *Storage) Append(smpl *model.Sample) error {
s.mtx.RLock()
defer s.mtx.RUnlock()
for _, q := range s.queues {
q.Append(smpl)
}
return nil
}
// NeedsThrottling implements storage.SampleAppender. It will always return
// false as a remote storage drops samples on the floor if backlogging instead
// of asking for throttling.
func (s *Storage) NeedsThrottling() bool {
return false
}

View file

@ -1,120 +0,0 @@
// Code generated by protoc-gen-go.
// source: remote.proto
// DO NOT EDIT!
/*
Package remote is a generated protocol buffer package.
It is generated from these files:
remote.proto
It has these top-level messages:
Sample
LabelPair
TimeSeries
WriteRequest
*/
package remote
import proto "github.com/golang/protobuf/proto"
import fmt "fmt"
import math "math"
// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal
var _ = fmt.Errorf
var _ = math.Inf
// This is a compile-time assertion to ensure that this generated file
// is compatible with the proto package it is being compiled against.
// A compilation error at this line likely means your copy of the
// proto package needs to be updated.
const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package
type Sample struct {
Value float64 `protobuf:"fixed64,1,opt,name=value" json:"value,omitempty"`
TimestampMs int64 `protobuf:"varint,2,opt,name=timestamp_ms,json=timestampMs" json:"timestamp_ms,omitempty"`
}
func (m *Sample) Reset() { *m = Sample{} }
func (m *Sample) String() string { return proto.CompactTextString(m) }
func (*Sample) ProtoMessage() {}
func (*Sample) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{0} }
type LabelPair struct {
Name string `protobuf:"bytes,1,opt,name=name" json:"name,omitempty"`
Value string `protobuf:"bytes,2,opt,name=value" json:"value,omitempty"`
}
func (m *LabelPair) Reset() { *m = LabelPair{} }
func (m *LabelPair) String() string { return proto.CompactTextString(m) }
func (*LabelPair) ProtoMessage() {}
func (*LabelPair) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{1} }
type TimeSeries struct {
Labels []*LabelPair `protobuf:"bytes,1,rep,name=labels" json:"labels,omitempty"`
// Sorted by time, oldest sample first.
Samples []*Sample `protobuf:"bytes,2,rep,name=samples" json:"samples,omitempty"`
}
func (m *TimeSeries) Reset() { *m = TimeSeries{} }
func (m *TimeSeries) String() string { return proto.CompactTextString(m) }
func (*TimeSeries) ProtoMessage() {}
func (*TimeSeries) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{2} }
func (m *TimeSeries) GetLabels() []*LabelPair {
if m != nil {
return m.Labels
}
return nil
}
func (m *TimeSeries) GetSamples() []*Sample {
if m != nil {
return m.Samples
}
return nil
}
type WriteRequest struct {
Timeseries []*TimeSeries `protobuf:"bytes,1,rep,name=timeseries" json:"timeseries,omitempty"`
}
func (m *WriteRequest) Reset() { *m = WriteRequest{} }
func (m *WriteRequest) String() string { return proto.CompactTextString(m) }
func (*WriteRequest) ProtoMessage() {}
func (*WriteRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{3} }
func (m *WriteRequest) GetTimeseries() []*TimeSeries {
if m != nil {
return m.Timeseries
}
return nil
}
func init() {
proto.RegisterType((*Sample)(nil), "remote.Sample")
proto.RegisterType((*LabelPair)(nil), "remote.LabelPair")
proto.RegisterType((*TimeSeries)(nil), "remote.TimeSeries")
proto.RegisterType((*WriteRequest)(nil), "remote.WriteRequest")
}
func init() { proto.RegisterFile("remote.proto", fileDescriptor0) }
var fileDescriptor0 = []byte{
// 216 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0x4c, 0x90, 0x3f, 0x4f, 0x80, 0x30,
0x10, 0xc5, 0x03, 0x68, 0x0d, 0x07, 0x31, 0xf1, 0xe2, 0xc0, 0xa8, 0x9d, 0x70, 0x61, 0xc0, 0xf8,
0x01, 0x74, 0xd6, 0xc4, 0x14, 0x13, 0x47, 0x53, 0x92, 0x1b, 0x9a, 0xb4, 0x82, 0x6d, 0xf1, 0xf3,
0x5b, 0x5a, 0xfe, 0xb8, 0xf5, 0xdd, 0xbd, 0x7b, 0xf7, 0xeb, 0x41, 0x6d, 0xc9, 0x4c, 0x9e, 0xba,
0xd9, 0x4e, 0x7e, 0x42, 0x96, 0x14, 0x7f, 0x06, 0x36, 0x48, 0x33, 0x6b, 0xc2, 0x5b, 0xb8, 0xfc,
0x95, 0x7a, 0xa1, 0x26, 0xbb, 0xcb, 0xda, 0x4c, 0x24, 0x81, 0xf7, 0x50, 0x7b, 0x65, 0xc8, 0xf9,
0x60, 0xfa, 0x32, 0xae, 0xc9, 0x43, 0xb3, 0x10, 0xd5, 0x51, 0x7b, 0x73, 0xfc, 0x09, 0xca, 0x57,
0x39, 0x92, 0x7e, 0x97, 0xca, 0x22, 0xc2, 0xc5, 0xb7, 0x34, 0x29, 0xa4, 0x14, 0xf1, 0x7d, 0x26,
0xe7, 0xb1, 0x98, 0x04, 0x97, 0x00, 0x1f, 0x21, 0x65, 0x20, 0xab, 0xc8, 0xe1, 0x03, 0x30, 0xbd,
0x86, 0xb8, 0x30, 0x59, 0xb4, 0x55, 0x7f, 0xd3, 0x6d, 0xb8, 0x47, 0xb4, 0xd8, 0x0c, 0xd8, 0xc2,
0x95, 0x8b, 0xc8, 0x2b, 0xcd, 0xea, 0xbd, 0xde, 0xbd, 0xe9, 0x27, 0x62, 0x6f, 0xf3, 0x17, 0xa8,
0x3f, 0xad, 0xf2, 0x24, 0xe8, 0x67, 0x09, 0xb8, 0xd8, 0x03, 0x44, 0xf0, 0xb8, 0x72, 0x5b, 0x84,
0xfb, 0xf0, 0x09, 0x23, 0xfe, 0xb9, 0x46, 0x16, 0xef, 0xf5, 0xf8, 0x17, 0x00, 0x00, 0xff, 0xff,
0x73, 0xb4, 0xd1, 0xb6, 0x3f, 0x01, 0x00, 0x00,
}

View file

@ -1,36 +0,0 @@
// Copyright 2016 Prometheus Team
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
syntax = "proto3";
package remote;
message Sample {
double value = 1;
int64 timestamp_ms = 2;
}
message LabelPair {
string name = 1;
string value = 2;
}
message TimeSeries {
repeated LabelPair labels = 1;
// Sorted by time, oldest sample first.
repeated Sample samples = 2;
}
message WriteRequest {
repeated TimeSeries timeseries = 1;
}

View file

@ -21,6 +21,7 @@ import (
"net"
"net/http"
"net/url"
"strings"
"time"
"github.com/prometheus/prometheus/config"
@ -31,10 +32,42 @@ func NewClient(rt http.RoundTripper) *http.Client {
return &http.Client{Transport: rt}
}
// NewDeadlineClient returns a new http.Client which will time out long running
// requests.
func NewDeadlineClient(timeout time.Duration, proxyURL *url.URL) *http.Client {
return NewClient(NewDeadlineRoundTripper(timeout, proxyURL))
// NewClientFromConfig returns a new HTTP client configured for the
// given config.HTTPClientConfig.
func NewClientFromConfig(cfg config.HTTPClientConfig) (*http.Client, error) {
tlsConfig, err := NewTLSConfig(cfg.TLSConfig)
if err != nil {
return nil, err
}
// The only timeout we care about is the configured scrape timeout.
// It is applied on request. So we leave out any timings here.
var rt http.RoundTripper = &http.Transport{
Proxy: http.ProxyURL(cfg.ProxyURL.URL),
DisableKeepAlives: true,
TLSClientConfig: tlsConfig,
}
// If a bearer token is provided, create a round tripper that will set the
// Authorization header correctly on each request.
bearerToken := cfg.BearerToken
if len(bearerToken) == 0 && len(cfg.BearerTokenFile) > 0 {
b, err := ioutil.ReadFile(cfg.BearerTokenFile)
if err != nil {
return nil, fmt.Errorf("unable to read bearer token file %s: %s", cfg.BearerTokenFile, err)
}
bearerToken = strings.TrimSpace(string(b))
}
if len(bearerToken) > 0 {
rt = NewBearerAuthRoundTripper(bearerToken, rt)
}
if cfg.BasicAuth != nil {
rt = NewBasicAuthRoundTripper(cfg.BasicAuth.Username, cfg.BasicAuth.Password, rt)
}
// Return a new client with the configured round tripper.
return NewClient(rt), nil
}
// NewDeadlineRoundTripper returns a new http.RoundTripper which will time out
@ -119,6 +152,7 @@ func cloneRequest(r *http.Request) *http.Request {
return r2
}
// NewTLSConfig creates a new tls.Config from the given config.TLSConfig.
func NewTLSConfig(cfg config.TLSConfig) (*tls.Config, error) {
tlsConfig := &tls.Config{InsecureSkipVerify: cfg.InsecureSkipVerify}

37
vendor/github.com/dgryski/go-bits/clz.go generated vendored Normal file
View file

@ -0,0 +1,37 @@
// +build !amd64 appengine
package bits
// Clz counts leading zeroes
func Clz(x uint64) uint64 {
var n uint64
n = 1
if (x >> 32) == 0 {
n = n + 32
x = x << 32
}
if (x >> (32 + 16)) == 0 {
n = n + 16
x = x << 16
}
if (x >> (32 + 16 + 8)) == 0 {
n = n + 8
x = x << 8
}
if (x >> (32 + 16 + 8 + 4)) == 0 {
n = n + 4
x = x << 4
}
if (x >> (32 + 16 + 8 + 4 + 2)) == 0 {
n = n + 2
x = x << 2
}
n = n - (x >> 63)
return uint64(n)
}

39
vendor/github.com/dgryski/go-bits/ctz.go generated vendored Normal file
View file

@ -0,0 +1,39 @@
// +build !amd64 appengine
package bits
// Ctz counts trailing zeroes
func Ctz(x uint64) uint64 {
if x == 0 {
return 64
}
var n uint64
if (x & 0x00000000FFFFFFFF) == 0 {
n = n + 32
x = x >> 32
}
if (x & 0x000000000000FFFF) == 0 {
n = n + 16
x = x >> 16
}
if (x & 0x00000000000000FF) == 0 {
n = n + 8
x = x >> 8
}
if (x & 0x000000000000000F) == 0 {
n = n + 4
x = x >> 4
}
if (x & 0x0000000000000003) == 0 {
n = n + 2
x = x >> 2
}
if (x & 0x0000000000000001) == 0 {
n = n + 1
}
return n
}

15
vendor/github.com/dgryski/go-bits/popcnt.go generated vendored Normal file
View file

@ -0,0 +1,15 @@
// +build !amd64 appengine popcntgo
package bits
// Popcnt counts the number of bits set
func Popcnt(x uint64) uint64 {
// bit population count, see
// http://graphics.stanford.edu/~seander/bithacks.html#CountBitsSetParallel
x -= (x >> 1) & 0x5555555555555555
x = (x>>2)&0x3333333333333333 + x&0x3333333333333333
x += x >> 4
x &= 0x0f0f0f0f0f0f0f0f
x *= 0x0101010101010101
return x >> 56
}

View file

@ -0,0 +1,609 @@
// Package client (v2) is the current official Go client for InfluxDB.
package client // import "github.com/influxdata/influxdb/client/v2"
import (
"bytes"
"crypto/tls"
"encoding/json"
"errors"
"fmt"
"io"
"io/ioutil"
"net/http"
"net/url"
"strconv"
"strings"
"time"
"github.com/influxdata/influxdb/models"
)
// HTTPConfig is the config data needed to create an HTTP Client.
type HTTPConfig struct {
// Addr should be of the form "http://host:port"
// or "http://[ipv6-host%zone]:port".
Addr string
// Username is the influxdb username, optional.
Username string
// Password is the influxdb password, optional.
Password string
// UserAgent is the http User Agent, defaults to "InfluxDBClient".
UserAgent string
// Timeout for influxdb writes, defaults to no timeout.
Timeout time.Duration
// InsecureSkipVerify gets passed to the http client, if true, it will
// skip https certificate verification. Defaults to false.
InsecureSkipVerify bool
// TLSConfig allows the user to set their own TLS config for the HTTP
// Client. If set, this option overrides InsecureSkipVerify.
TLSConfig *tls.Config
}
// BatchPointsConfig is the config data needed to create an instance of the BatchPoints struct.
type BatchPointsConfig struct {
// Precision is the write precision of the points, defaults to "ns".
Precision string
// Database is the database to write points to.
Database string
// RetentionPolicy is the retention policy of the points.
RetentionPolicy string
// Write consistency is the number of servers required to confirm write.
WriteConsistency string
}
// Client is a client interface for writing & querying the database.
type Client interface {
// Ping checks that status of cluster, and will always return 0 time and no
// error for UDP clients.
Ping(timeout time.Duration) (time.Duration, string, error)
// Write takes a BatchPoints object and writes all Points to InfluxDB.
Write(bp BatchPoints) error
// Query makes an InfluxDB Query on the database. This will fail if using
// the UDP client.
Query(q Query) (*Response, error)
// Close releases any resources a Client may be using.
Close() error
}
// NewHTTPClient returns a new Client from the provided config.
// Client is safe for concurrent use by multiple goroutines.
func NewHTTPClient(conf HTTPConfig) (Client, error) {
if conf.UserAgent == "" {
conf.UserAgent = "InfluxDBClient"
}
u, err := url.Parse(conf.Addr)
if err != nil {
return nil, err
} else if u.Scheme != "http" && u.Scheme != "https" {
m := fmt.Sprintf("Unsupported protocol scheme: %s, your address"+
" must start with http:// or https://", u.Scheme)
return nil, errors.New(m)
}
tr := &http.Transport{
TLSClientConfig: &tls.Config{
InsecureSkipVerify: conf.InsecureSkipVerify,
},
}
if conf.TLSConfig != nil {
tr.TLSClientConfig = conf.TLSConfig
}
return &client{
url: *u,
username: conf.Username,
password: conf.Password,
useragent: conf.UserAgent,
httpClient: &http.Client{
Timeout: conf.Timeout,
Transport: tr,
},
transport: tr,
}, nil
}
// Ping will check to see if the server is up with an optional timeout on waiting for leader.
// Ping returns how long the request took, the version of the server it connected to, and an error if one occurred.
func (c *client) Ping(timeout time.Duration) (time.Duration, string, error) {
now := time.Now()
u := c.url
u.Path = "ping"
req, err := http.NewRequest("GET", u.String(), nil)
if err != nil {
return 0, "", err
}
req.Header.Set("User-Agent", c.useragent)
if c.username != "" {
req.SetBasicAuth(c.username, c.password)
}
if timeout > 0 {
params := req.URL.Query()
params.Set("wait_for_leader", fmt.Sprintf("%.0fs", timeout.Seconds()))
req.URL.RawQuery = params.Encode()
}
resp, err := c.httpClient.Do(req)
if err != nil {
return 0, "", err
}
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return 0, "", err
}
if resp.StatusCode != http.StatusNoContent {
var err = fmt.Errorf(string(body))
return 0, "", err
}
version := resp.Header.Get("X-Influxdb-Version")
return time.Since(now), version, nil
}
// Close releases the client's resources.
func (c *client) Close() error {
c.transport.CloseIdleConnections()
return nil
}
// client is safe for concurrent use as the fields are all read-only
// once the client is instantiated.
type client struct {
// N.B - if url.UserInfo is accessed in future modifications to the
// methods on client, you will need to syncronise access to url.
url url.URL
username string
password string
useragent string
httpClient *http.Client
transport *http.Transport
}
// BatchPoints is an interface into a batched grouping of points to write into
// InfluxDB together. BatchPoints is NOT thread-safe, you must create a separate
// batch for each goroutine.
type BatchPoints interface {
// AddPoint adds the given point to the Batch of points.
AddPoint(p *Point)
// AddPoints adds the given points to the Batch of points.
AddPoints(ps []*Point)
// Points lists the points in the Batch.
Points() []*Point
// Precision returns the currently set precision of this Batch.
Precision() string
// SetPrecision sets the precision of this batch.
SetPrecision(s string) error
// Database returns the currently set database of this Batch.
Database() string
// SetDatabase sets the database of this Batch.
SetDatabase(s string)
// WriteConsistency returns the currently set write consistency of this Batch.
WriteConsistency() string
// SetWriteConsistency sets the write consistency of this Batch.
SetWriteConsistency(s string)
// RetentionPolicy returns the currently set retention policy of this Batch.
RetentionPolicy() string
// SetRetentionPolicy sets the retention policy of this Batch.
SetRetentionPolicy(s string)
}
// NewBatchPoints returns a BatchPoints interface based on the given config.
func NewBatchPoints(conf BatchPointsConfig) (BatchPoints, error) {
if conf.Precision == "" {
conf.Precision = "ns"
}
if _, err := time.ParseDuration("1" + conf.Precision); err != nil {
return nil, err
}
bp := &batchpoints{
database: conf.Database,
precision: conf.Precision,
retentionPolicy: conf.RetentionPolicy,
writeConsistency: conf.WriteConsistency,
}
return bp, nil
}
type batchpoints struct {
points []*Point
database string
precision string
retentionPolicy string
writeConsistency string
}
func (bp *batchpoints) AddPoint(p *Point) {
bp.points = append(bp.points, p)
}
func (bp *batchpoints) AddPoints(ps []*Point) {
bp.points = append(bp.points, ps...)
}
func (bp *batchpoints) Points() []*Point {
return bp.points
}
func (bp *batchpoints) Precision() string {
return bp.precision
}
func (bp *batchpoints) Database() string {
return bp.database
}
func (bp *batchpoints) WriteConsistency() string {
return bp.writeConsistency
}
func (bp *batchpoints) RetentionPolicy() string {
return bp.retentionPolicy
}
func (bp *batchpoints) SetPrecision(p string) error {
if _, err := time.ParseDuration("1" + p); err != nil {
return err
}
bp.precision = p
return nil
}
func (bp *batchpoints) SetDatabase(db string) {
bp.database = db
}
func (bp *batchpoints) SetWriteConsistency(wc string) {
bp.writeConsistency = wc
}
func (bp *batchpoints) SetRetentionPolicy(rp string) {
bp.retentionPolicy = rp
}
// Point represents a single data point.
type Point struct {
pt models.Point
}
// NewPoint returns a point with the given timestamp. If a timestamp is not
// given, then data is sent to the database without a timestamp, in which case
// the server will assign local time upon reception. NOTE: it is recommended to
// send data with a timestamp.
func NewPoint(
name string,
tags map[string]string,
fields map[string]interface{},
t ...time.Time,
) (*Point, error) {
var T time.Time
if len(t) > 0 {
T = t[0]
}
pt, err := models.NewPoint(name, models.NewTags(tags), fields, T)
if err != nil {
return nil, err
}
return &Point{
pt: pt,
}, nil
}
// String returns a line-protocol string of the Point.
func (p *Point) String() string {
return p.pt.String()
}
// PrecisionString returns a line-protocol string of the Point,
// with the timestamp formatted for the given precision.
func (p *Point) PrecisionString(precison string) string {
return p.pt.PrecisionString(precison)
}
// Name returns the measurement name of the point.
func (p *Point) Name() string {
return p.pt.Name()
}
// Tags returns the tags associated with the point.
func (p *Point) Tags() map[string]string {
return p.pt.Tags().Map()
}
// Time return the timestamp for the point.
func (p *Point) Time() time.Time {
return p.pt.Time()
}
// UnixNano returns timestamp of the point in nanoseconds since Unix epoch.
func (p *Point) UnixNano() int64 {
return p.pt.UnixNano()
}
// Fields returns the fields for the point.
func (p *Point) Fields() (map[string]interface{}, error) {
return p.pt.Fields()
}
// NewPointFrom returns a point from the provided models.Point.
func NewPointFrom(pt models.Point) *Point {
return &Point{pt: pt}
}
func (c *client) Write(bp BatchPoints) error {
var b bytes.Buffer
for _, p := range bp.Points() {
if _, err := b.WriteString(p.pt.PrecisionString(bp.Precision())); err != nil {
return err
}
if err := b.WriteByte('\n'); err != nil {
return err
}
}
u := c.url
u.Path = "write"
req, err := http.NewRequest("POST", u.String(), &b)
if err != nil {
return err
}
req.Header.Set("Content-Type", "")
req.Header.Set("User-Agent", c.useragent)
if c.username != "" {
req.SetBasicAuth(c.username, c.password)
}
params := req.URL.Query()
params.Set("db", bp.Database())
params.Set("rp", bp.RetentionPolicy())
params.Set("precision", bp.Precision())
params.Set("consistency", bp.WriteConsistency())
req.URL.RawQuery = params.Encode()
resp, err := c.httpClient.Do(req)
if err != nil {
return err
}
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return err
}
if resp.StatusCode != http.StatusNoContent && resp.StatusCode != http.StatusOK {
var err = fmt.Errorf(string(body))
return err
}
return nil
}
// Query defines a query to send to the server.
type Query struct {
Command string
Database string
Precision string
Chunked bool
ChunkSize int
Parameters map[string]interface{}
}
// NewQuery returns a query object.
// The database and precision arguments can be empty strings if they are not needed for the query.
func NewQuery(command, database, precision string) Query {
return Query{
Command: command,
Database: database,
Precision: precision,
Parameters: make(map[string]interface{}),
}
}
// NewQueryWithParameters returns a query object.
// The database and precision arguments can be empty strings if they are not needed for the query.
// parameters is a map of the parameter names used in the command to their values.
func NewQueryWithParameters(command, database, precision string, parameters map[string]interface{}) Query {
return Query{
Command: command,
Database: database,
Precision: precision,
Parameters: parameters,
}
}
// Response represents a list of statement results.
type Response struct {
Results []Result
Err string `json:"error,omitempty"`
}
// Error returns the first error from any statement.
// It returns nil if no errors occurred on any statements.
func (r *Response) Error() error {
if r.Err != "" {
return fmt.Errorf(r.Err)
}
for _, result := range r.Results {
if result.Err != "" {
return fmt.Errorf(result.Err)
}
}
return nil
}
// Message represents a user message.
type Message struct {
Level string
Text string
}
// Result represents a resultset returned from a single statement.
type Result struct {
Series []models.Row
Messages []*Message
Err string `json:"error,omitempty"`
}
// Query sends a command to the server and returns the Response.
func (c *client) Query(q Query) (*Response, error) {
u := c.url
u.Path = "query"
jsonParameters, err := json.Marshal(q.Parameters)
if err != nil {
return nil, err
}
req, err := http.NewRequest("POST", u.String(), nil)
if err != nil {
return nil, err
}
req.Header.Set("Content-Type", "")
req.Header.Set("User-Agent", c.useragent)
if c.username != "" {
req.SetBasicAuth(c.username, c.password)
}
params := req.URL.Query()
params.Set("q", q.Command)
params.Set("db", q.Database)
params.Set("params", string(jsonParameters))
if q.Chunked {
params.Set("chunked", "true")
if q.ChunkSize > 0 {
params.Set("chunk_size", strconv.Itoa(q.ChunkSize))
}
}
if q.Precision != "" {
params.Set("epoch", q.Precision)
}
req.URL.RawQuery = params.Encode()
resp, err := c.httpClient.Do(req)
if err != nil {
return nil, err
}
defer resp.Body.Close()
var response Response
if q.Chunked {
cr := NewChunkedResponse(resp.Body)
for {
r, err := cr.NextResponse()
if err != nil {
// If we got an error while decoding the response, send that back.
return nil, err
}
if r == nil {
break
}
response.Results = append(response.Results, r.Results...)
if r.Err != "" {
response.Err = r.Err
break
}
}
} else {
dec := json.NewDecoder(resp.Body)
dec.UseNumber()
decErr := dec.Decode(&response)
// ignore this error if we got an invalid status code
if decErr != nil && decErr.Error() == "EOF" && resp.StatusCode != http.StatusOK {
decErr = nil
}
// If we got a valid decode error, send that back
if decErr != nil {
return nil, fmt.Errorf("unable to decode json: received status code %d err: %s", resp.StatusCode, decErr)
}
}
// If we don't have an error in our json response, and didn't get statusOK
// then send back an error
if resp.StatusCode != http.StatusOK && response.Error() == nil {
return &response, fmt.Errorf("received status code %d from server",
resp.StatusCode)
}
return &response, nil
}
// duplexReader reads responses and writes it to another writer while
// satisfying the reader interface.
type duplexReader struct {
r io.Reader
w io.Writer
}
func (r *duplexReader) Read(p []byte) (n int, err error) {
n, err = r.r.Read(p)
if err == nil {
r.w.Write(p[:n])
}
return n, err
}
// ChunkedResponse represents a response from the server that
// uses chunking to stream the output.
type ChunkedResponse struct {
dec *json.Decoder
duplex *duplexReader
buf bytes.Buffer
}
// NewChunkedResponse reads a stream and produces responses from the stream.
func NewChunkedResponse(r io.Reader) *ChunkedResponse {
resp := &ChunkedResponse{}
resp.duplex = &duplexReader{r: r, w: &resp.buf}
resp.dec = json.NewDecoder(resp.duplex)
resp.dec.UseNumber()
return resp
}
// NextResponse reads the next line of the stream and returns a response.
func (r *ChunkedResponse) NextResponse() (*Response, error) {
var response Response
if err := r.dec.Decode(&response); err != nil {
if err == io.EOF {
return nil, nil
}
// A decoding error happened. This probably means the server crashed
// and sent a last-ditch error message to us. Ensure we have read the
// entirety of the connection to get any remaining error text.
io.Copy(ioutil.Discard, r.duplex)
return nil, errors.New(strings.TrimSpace(r.buf.String()))
}
r.buf.Reset()
return &response, nil
}

112
vendor/github.com/influxdata/influxdb/client/v2/udp.go generated vendored Normal file
View file

@ -0,0 +1,112 @@
package client
import (
"fmt"
"io"
"net"
"time"
)
const (
// UDPPayloadSize is a reasonable default payload size for UDP packets that
// could be travelling over the internet.
UDPPayloadSize = 512
)
// UDPConfig is the config data needed to create a UDP Client.
type UDPConfig struct {
// Addr should be of the form "host:port"
// or "[ipv6-host%zone]:port".
Addr string
// PayloadSize is the maximum size of a UDP client message, optional
// Tune this based on your network. Defaults to UDPPayloadSize.
PayloadSize int
}
// NewUDPClient returns a client interface for writing to an InfluxDB UDP
// service from the given config.
func NewUDPClient(conf UDPConfig) (Client, error) {
var udpAddr *net.UDPAddr
udpAddr, err := net.ResolveUDPAddr("udp", conf.Addr)
if err != nil {
return nil, err
}
conn, err := net.DialUDP("udp", nil, udpAddr)
if err != nil {
return nil, err
}
payloadSize := conf.PayloadSize
if payloadSize == 0 {
payloadSize = UDPPayloadSize
}
return &udpclient{
conn: conn,
payloadSize: payloadSize,
}, nil
}
// Close releases the udpclient's resources.
func (uc *udpclient) Close() error {
return uc.conn.Close()
}
type udpclient struct {
conn io.WriteCloser
payloadSize int
}
func (uc *udpclient) Write(bp BatchPoints) error {
var b = make([]byte, 0, uc.payloadSize) // initial buffer size, it will grow as needed
var d, _ = time.ParseDuration("1" + bp.Precision())
var delayedError error
var checkBuffer = func(n int) {
if len(b) > 0 && len(b)+n > uc.payloadSize {
if _, err := uc.conn.Write(b); err != nil {
delayedError = err
}
b = b[:0]
}
}
for _, p := range bp.Points() {
p.pt.Round(d)
pointSize := p.pt.StringSize() + 1 // include newline in size
//point := p.pt.RoundedString(d) + "\n"
checkBuffer(pointSize)
if p.Time().IsZero() || pointSize <= uc.payloadSize {
b = p.pt.AppendString(b)
b = append(b, '\n')
continue
}
points := p.pt.Split(uc.payloadSize - 1) // account for newline character
for _, sp := range points {
checkBuffer(sp.StringSize() + 1)
b = sp.AppendString(b)
b = append(b, '\n')
}
}
if len(b) > 0 {
if _, err := uc.conn.Write(b); err != nil {
return err
}
}
return delayedError
}
func (uc *udpclient) Query(q Query) (*Response, error) {
return nil, fmt.Errorf("Querying via UDP is not supported")
}
func (uc *udpclient) Ping(timeout time.Duration) (time.Duration, string, error) {
return 0, "", nil
}

View file

@ -0,0 +1,48 @@
package models
import (
"errors"
"strings"
)
// ConsistencyLevel represent a required replication criteria before a write can
// be returned as successful.
//
// The consistency level is handled in open-source InfluxDB but only applicable to clusters.
type ConsistencyLevel int
const (
// ConsistencyLevelAny allows for hinted handoff, potentially no write happened yet.
ConsistencyLevelAny ConsistencyLevel = iota
// ConsistencyLevelOne requires at least one data node acknowledged a write.
ConsistencyLevelOne
// ConsistencyLevelQuorum requires a quorum of data nodes to acknowledge a write.
ConsistencyLevelQuorum
// ConsistencyLevelAll requires all data nodes to acknowledge a write.
ConsistencyLevelAll
)
var (
// ErrInvalidConsistencyLevel is returned when parsing the string version
// of a consistency level.
ErrInvalidConsistencyLevel = errors.New("invalid consistency level")
)
// ParseConsistencyLevel converts a consistency level string to the corresponding ConsistencyLevel const.
func ParseConsistencyLevel(level string) (ConsistencyLevel, error) {
switch strings.ToLower(level) {
case "any":
return ConsistencyLevelAny, nil
case "one":
return ConsistencyLevelOne, nil
case "quorum":
return ConsistencyLevelQuorum, nil
case "all":
return ConsistencyLevelAll, nil
default:
return 0, ErrInvalidConsistencyLevel
}
}

View file

@ -0,0 +1,32 @@
package models // import "github.com/influxdata/influxdb/models"
// from stdlib hash/fnv/fnv.go
const (
prime64 = 1099511628211
offset64 = 14695981039346656037
)
// InlineFNV64a is an alloc-free port of the standard library's fnv64a.
// See https://en.wikipedia.org/wiki/Fowler%E2%80%93Noll%E2%80%93Vo_hash_function.
type InlineFNV64a uint64
// NewInlineFNV64a returns a new instance of InlineFNV64a.
func NewInlineFNV64a() InlineFNV64a {
return offset64
}
// Write adds data to the running hash.
func (s *InlineFNV64a) Write(data []byte) (int, error) {
hash := uint64(*s)
for _, c := range data {
hash ^= uint64(c)
hash *= prime64
}
*s = InlineFNV64a(hash)
return len(data), nil
}
// Sum64 returns the uint64 of the current resulting hash.
func (s *InlineFNV64a) Sum64() uint64 {
return uint64(*s)
}

View file

@ -0,0 +1,38 @@
package models // import "github.com/influxdata/influxdb/models"
import (
"reflect"
"strconv"
"unsafe"
)
// parseIntBytes is a zero-alloc wrapper around strconv.ParseInt.
func parseIntBytes(b []byte, base int, bitSize int) (i int64, err error) {
s := unsafeBytesToString(b)
return strconv.ParseInt(s, base, bitSize)
}
// parseFloatBytes is a zero-alloc wrapper around strconv.ParseFloat.
func parseFloatBytes(b []byte, bitSize int) (float64, error) {
s := unsafeBytesToString(b)
return strconv.ParseFloat(s, bitSize)
}
// parseBoolBytes is a zero-alloc wrapper around strconv.ParseBool.
func parseBoolBytes(b []byte) (bool, error) {
return strconv.ParseBool(unsafeBytesToString(b))
}
// unsafeBytesToString converts a []byte to a string without a heap allocation.
//
// It is unsafe, and is intended to prepare input to short-lived functions
// that require strings.
func unsafeBytesToString(in []byte) string {
src := *(*reflect.SliceHeader)(unsafe.Pointer(&in))
dst := reflect.StringHeader{
Data: src.Data,
Len: src.Len,
}
s := *(*string)(unsafe.Pointer(&dst))
return s
}

2035
vendor/github.com/influxdata/influxdb/models/points.go generated vendored Normal file

File diff suppressed because it is too large Load diff

62
vendor/github.com/influxdata/influxdb/models/rows.go generated vendored Normal file
View file

@ -0,0 +1,62 @@
package models
import (
"sort"
)
// Row represents a single row returned from the execution of a statement.
type Row struct {
Name string `json:"name,omitempty"`
Tags map[string]string `json:"tags,omitempty"`
Columns []string `json:"columns,omitempty"`
Values [][]interface{} `json:"values,omitempty"`
Partial bool `json:"partial,omitempty"`
}
// SameSeries returns true if r contains values for the same series as o.
func (r *Row) SameSeries(o *Row) bool {
return r.tagsHash() == o.tagsHash() && r.Name == o.Name
}
// tagsHash returns a hash of tag key/value pairs.
func (r *Row) tagsHash() uint64 {
h := NewInlineFNV64a()
keys := r.tagsKeys()
for _, k := range keys {
h.Write([]byte(k))
h.Write([]byte(r.Tags[k]))
}
return h.Sum64()
}
// tagKeys returns a sorted list of tag keys.
func (r *Row) tagsKeys() []string {
a := make([]string, 0, len(r.Tags))
for k := range r.Tags {
a = append(a, k)
}
sort.Strings(a)
return a
}
// Rows represents a collection of rows. Rows implements sort.Interface.
type Rows []*Row
// Len implements sort.Interface.
func (p Rows) Len() int { return len(p) }
// Less implements sort.Interface.
func (p Rows) Less(i, j int) bool {
// Sort by name first.
if p[i].Name != p[j].Name {
return p[i].Name < p[j].Name
}
// Sort by tag set hash. Tags don't have a meaningful sort order so we
// just compute a hash and sort by that instead. This allows the tests
// to receive rows in a predictable order every time.
return p[i].tagsHash() < p[j].tagsHash()
}
// Swap implements sort.Interface.
func (p Rows) Swap(i, j int) { p[i], p[j] = p[j], p[i] }

View file

@ -0,0 +1,42 @@
package models
// Statistic is the representation of a statistic used by the monitoring service.
type Statistic struct {
Name string `json:"name"`
Tags map[string]string `json:"tags"`
Values map[string]interface{} `json:"values"`
}
// NewStatistic returns an initialized Statistic.
func NewStatistic(name string) Statistic {
return Statistic{
Name: name,
Tags: make(map[string]string),
Values: make(map[string]interface{}),
}
}
// StatisticTags is a map that can be merged with others without causing
// mutations to either map.
type StatisticTags map[string]string
// Merge creates a new map containing the merged contents of tags and t.
// If both tags and the receiver map contain the same key, the value in tags
// is used in the resulting map.
//
// Merge always returns a usable map.
func (t StatisticTags) Merge(tags map[string]string) map[string]string {
// Add everything in tags to the result.
out := make(map[string]string, len(tags))
for k, v := range tags {
out[k] = v
}
// Only add values from t that don't appear in tags.
for k, v := range t {
if _, ok := tags[k]; !ok {
out[k] = v
}
}
return out
}

74
vendor/github.com/influxdata/influxdb/models/time.go generated vendored Normal file
View file

@ -0,0 +1,74 @@
package models
// Helper time methods since parsing time can easily overflow and we only support a
// specific time range.
import (
"fmt"
"math"
"time"
)
const (
// MinNanoTime is the minumum time that can be represented.
//
// 1677-09-21 00:12:43.145224194 +0000 UTC
//
// The two lowest minimum integers are used as sentinel values. The
// minimum value needs to be used as a value lower than any other value for
// comparisons and another separate value is needed to act as a sentinel
// default value that is unusable by the user, but usable internally.
// Because these two values need to be used for a special purpose, we do
// not allow users to write points at these two times.
MinNanoTime = int64(math.MinInt64) + 2
// MaxNanoTime is the maximum time that can be represented.
//
// 2262-04-11 23:47:16.854775806 +0000 UTC
//
// The highest time represented by a nanosecond needs to be used for an
// exclusive range in the shard group, so the maximum time needs to be one
// less than the possible maximum number of nanoseconds representable by an
// int64 so that we don't lose a point at that one time.
MaxNanoTime = int64(math.MaxInt64) - 1
)
var (
minNanoTime = time.Unix(0, MinNanoTime).UTC()
maxNanoTime = time.Unix(0, MaxNanoTime).UTC()
// ErrTimeOutOfRange gets returned when time is out of the representable range using int64 nanoseconds since the epoch.
ErrTimeOutOfRange = fmt.Errorf("time outside range %d - %d", MinNanoTime, MaxNanoTime)
)
// SafeCalcTime safely calculates the time given. Will return error if the time is outside the
// supported range.
func SafeCalcTime(timestamp int64, precision string) (time.Time, error) {
mult := GetPrecisionMultiplier(precision)
if t, ok := safeSignedMult(timestamp, mult); ok {
tme := time.Unix(0, t).UTC()
return tme, CheckTime(tme)
}
return time.Time{}, ErrTimeOutOfRange
}
// CheckTime checks that a time is within the safe range.
func CheckTime(t time.Time) error {
if t.Before(minNanoTime) || t.After(maxNanoTime) {
return ErrTimeOutOfRange
}
return nil
}
// Perform the multiplication and check to make sure it didn't overflow.
func safeSignedMult(a, b int64) (int64, bool) {
if a == 0 || b == 0 || a == 1 || b == 1 {
return a * b, true
}
if a == MinNanoTime || b == MaxNanoTime {
return 0, false
}
c := a * b
return c, c/b == a
}

View file

@ -0,0 +1,111 @@
// Package escape contains utilities for escaping parts of InfluxQL
// and InfluxDB line protocol.
package escape // import "github.com/influxdata/influxdb/pkg/escape"
import (
"bytes"
"strings"
)
// Codes is a map of bytes to be escaped.
var Codes = map[byte][]byte{
',': []byte(`\,`),
'"': []byte(`\"`),
' ': []byte(`\ `),
'=': []byte(`\=`),
}
// Bytes escapes characters on the input slice, as defined by Codes.
func Bytes(in []byte) []byte {
for b, esc := range Codes {
in = bytes.Replace(in, []byte{b}, esc, -1)
}
return in
}
const escapeChars = `," =`
// IsEscaped returns whether b has any escaped characters,
// i.e. whether b seems to have been processed by Bytes.
func IsEscaped(b []byte) bool {
for len(b) > 0 {
i := bytes.IndexByte(b, '\\')
if i < 0 {
return false
}
if i+1 < len(b) && strings.IndexByte(escapeChars, b[i+1]) >= 0 {
return true
}
b = b[i+1:]
}
return false
}
// AppendUnescaped appends the unescaped version of src to dst
// and returns the resulting slice.
func AppendUnescaped(dst, src []byte) []byte {
var pos int
for len(src) > 0 {
next := bytes.IndexByte(src[pos:], '\\')
if next < 0 || pos+next+1 >= len(src) {
return append(dst, src...)
}
if pos+next+1 < len(src) && strings.IndexByte(escapeChars, src[pos+next+1]) >= 0 {
if pos+next > 0 {
dst = append(dst, src[:pos+next]...)
}
src = src[pos+next+1:]
pos = 0
} else {
pos += next + 1
}
}
return dst
}
// Unescape returns a new slice containing the unescaped version of in.
func Unescape(in []byte) []byte {
if len(in) == 0 {
return nil
}
if bytes.IndexByte(in, '\\') == -1 {
return in
}
i := 0
inLen := len(in)
var out []byte
for {
if i >= inLen {
break
}
if in[i] == '\\' && i+1 < inLen {
switch in[i+1] {
case ',':
out = append(out, ',')
i += 2
continue
case '"':
out = append(out, '"')
i += 2
continue
case ' ':
out = append(out, ' ')
i += 2
continue
case '=':
out = append(out, '=')
i += 2
continue
}
}
out = append(out, in[i])
i += 1
}
return out
}

View file

@ -0,0 +1,21 @@
package escape
import "strings"
var (
escaper = strings.NewReplacer(`,`, `\,`, `"`, `\"`, ` `, `\ `, `=`, `\=`)
unescaper = strings.NewReplacer(`\,`, `,`, `\"`, `"`, `\ `, ` `, `\=`, `=`)
)
// UnescapeString returns unescaped version of in.
func UnescapeString(in string) string {
if strings.IndexByte(in, '\\') == -1 {
return in
}
return unescaper.Replace(in)
}
// String returns the escaped version of in.
func String(in string) string {
return escaper.Replace(in)
}

View file

@ -1,180 +0,0 @@
package client
import (
"bytes"
"fmt"
"io/ioutil"
"net/http"
"net/url"
"time"
"github.com/influxdb/influxdb/tsdb"
)
const (
// DefaultTimeout is the default connection timeout used to connect to an InfluxDB instance
DefaultTimeout = 0
)
// Config is used to specify what server to connect to.
// URL: The URL of the server connecting to.
// Username/Password are optional. They will be passed via basic auth if provided.
// UserAgent: If not provided, will default "InfluxDBClient",
// Timeout: If not provided, will default to 0 (no timeout)
type Config struct {
URL url.URL
Username string
Password string
UserAgent string
Timeout time.Duration
Precision string
}
// NewConfig will create a config to be used in connecting to the client
func NewConfig() Config {
return Config{
Timeout: DefaultTimeout,
}
}
// Client is used to make calls to the server.
type Client struct {
url url.URL
username string
password string
httpClient *http.Client
userAgent string
precision string
}
const (
ConsistencyOne = "one"
ConsistencyAll = "all"
ConsistencyQuorum = "quorum"
ConsistencyAny = "any"
)
// NewClient will instantiate and return a connected client to issue commands to the server.
func NewClient(c Config) (*Client, error) {
client := Client{
url: c.URL,
username: c.Username,
password: c.Password,
httpClient: &http.Client{Timeout: c.Timeout},
userAgent: c.UserAgent,
precision: c.Precision,
}
if client.userAgent == "" {
client.userAgent = "InfluxDBClient"
}
return &client, nil
}
// Write takes BatchPoints and allows for writing of multiple points with defaults
// If successful, error is nil and Response is nil
// If an error occurs, Response may contain additional information if populated.
func (c *Client) Write(bp BatchPoints) (*Response, error) {
u := c.url
u.Path = "write"
var b bytes.Buffer
for _, p := range bp.Points {
if p.Raw != "" {
if _, err := b.WriteString(p.Raw); err != nil {
return nil, err
}
} else {
for k, v := range bp.Tags {
if p.Tags == nil {
p.Tags = make(map[string]string, len(bp.Tags))
}
p.Tags[k] = v
}
if _, err := b.WriteString(p.MarshalString()); err != nil {
return nil, err
}
}
if err := b.WriteByte('\n'); err != nil {
return nil, err
}
}
req, err := http.NewRequest("POST", u.String(), &b)
if err != nil {
return nil, err
}
req.Header.Set("Content-Type", "")
req.Header.Set("User-Agent", c.userAgent)
if c.username != "" {
req.SetBasicAuth(c.username, c.password)
}
params := req.URL.Query()
params.Set("db", bp.Database)
params.Set("rp", bp.RetentionPolicy)
params.Set("precision", bp.Precision)
params.Set("consistency", bp.WriteConsistency)
req.URL.RawQuery = params.Encode()
resp, err := c.httpClient.Do(req)
if err != nil {
return nil, err
}
defer resp.Body.Close()
var response Response
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return nil, err
}
if resp.StatusCode != http.StatusNoContent && resp.StatusCode != http.StatusOK {
var err = fmt.Errorf(string(body))
response.Err = err
return &response, err
}
return nil, nil
}
// Structs
// Response represents a list of statement results.
type Response struct {
Err error
}
// Point defines the fields that will be written to the database
// Measurement, Time, and Fields are required
// Precision can be specified if the time is in epoch format (integer).
// Valid values for Precision are n, u, ms, s, m, and h
type Point struct {
Measurement string
Tags map[string]string
Time time.Time
Fields map[string]interface{}
Precision string
Raw string
}
func (p *Point) MarshalString() string {
return tsdb.NewPoint(p.Measurement, p.Tags, p.Fields, p.Time).String()
}
// BatchPoints is used to send batched data in a single write.
// Database and Points are required
// If no retention policy is specified, it will use the databases default retention policy.
// If tags are specified, they will be "merged" with all points. If a point already has that tag, it is ignored.
// If time is specified, it will be applied to any point with an empty time.
// Precision can be specified if the time is in epoch format (integer).
// Valid values for Precision are n, u, ms, s, m, and h
type BatchPoints struct {
Points []Point `json:"points,omitempty"`
Database string `json:"database,omitempty"`
RetentionPolicy string `json:"retentionPolicy,omitempty"`
Tags map[string]string `json:"tags,omitempty"`
Time time.Time `json:"time,omitempty"`
Precision string `json:"precision,omitempty"`
WriteConsistency string `json:"-"`
}

File diff suppressed because it is too large Load diff

15
vendor/vendor.json vendored
View file

@ -490,6 +490,21 @@
"revision": "1d4fa605f6ff3ed628d7ae5eda7c0e56803e72a5",
"revisionTime": "2016-10-07T00:41:22Z"
},
{
"path": "github.com/influxdata/influxdb/client/v2",
"revision": "15e594fc09f112cb696c084a20beaca25538a5fa",
"revisionTime": "2017-03-31T16:09:02-05:00"
},
{
"path": "github.com/influxdata/influxdb/models",
"revision": "15e594fc09f112cb696c084a20beaca25538a5fa",
"revisionTime": "2017-03-31T16:09:02-05:00"
},
{
"path": "github.com/influxdata/influxdb/pkg/escape",
"revision": "15e594fc09f112cb696c084a20beaca25538a5fa",
"revisionTime": "2017-03-31T16:09:02-05:00"
},
{
"checksumSHA1": "0ZrwvB6KoGPj2PoDNSEJwxQ6Mog=",
"path": "github.com/jmespath/go-jmespath",

View file

@ -113,6 +113,16 @@ func (h *Handler) federation(w http.ResponseWriter, req *http.Request) {
sort.Sort(byName(vec))
externalLabels := h.externalLabels.Clone()
if _, ok := externalLabels[model.InstanceLabel]; !ok {
externalLabels[model.InstanceLabel] = ""
}
externalLabelNames := make(model.LabelNames, 0, len(externalLabels))
for ln := range externalLabels {
externalLabelNames = append(externalLabelNames, ln)
}
sort.Sort(externalLabelNames)
var (
lastMetricName string
protMetricFam *dto.MetricFamily

View file

@ -27,10 +27,11 @@ import (
)
var scenarios = map[string]struct {
params string
accept string
code int
body string
params string
accept string
externalLabels model.LabelSet
code int
body string
}{
"empty": {
params: "",
@ -58,72 +59,100 @@ var scenarios = map[string]struct {
params: "match[]=test_metric1",
code: 200,
body: `# TYPE test_metric1 untyped
test_metric1{foo="bar"} 10000 6000000
test_metric1{foo="boo"} 1 6000000
test_metric1{foo="bar",instance="i"} 10000 6000000
test_metric1{foo="boo",instance="i"} 1 6000000
`,
},
"test_metric2": {
params: "match[]=test_metric2",
code: 200,
body: `# TYPE test_metric2 untyped
test_metric2{foo="boo"} 1 6000000
test_metric2{foo="boo",instance="i"} 1 6000000
`,
},
"test_metric_without_labels": {
params: "match[]=test_metric_without_labels",
code: 200,
body: `# TYPE test_metric_without_labels untyped
test_metric_without_labels 1001 6000000
test_metric_without_labels{instance=""} 1001 6000000
`,
},
"{foo='boo'}": {
params: "match[]={foo='boo'}",
code: 200,
body: `# TYPE test_metric1 untyped
test_metric1{foo="boo"} 1 6000000
test_metric1{foo="boo",instance="i"} 1 6000000
# TYPE test_metric2 untyped
test_metric2{foo="boo"} 1 6000000
test_metric2{foo="boo",instance="i"} 1 6000000
`,
},
"two matchers": {
params: "match[]=test_metric1&match[]=test_metric2",
code: 200,
body: `# TYPE test_metric1 untyped
test_metric1{foo="bar"} 10000 6000000
test_metric1{foo="boo"} 1 6000000
test_metric1{foo="bar",instance="i"} 10000 6000000
test_metric1{foo="boo",instance="i"} 1 6000000
# TYPE test_metric2 untyped
test_metric2{foo="boo"} 1 6000000
test_metric2{foo="boo",instance="i"} 1 6000000
`,
},
"everything": {
params: "match[]={__name__=~'.%2b'}", // '%2b' is an URL-encoded '+'.
code: 200,
body: `# TYPE test_metric1 untyped
test_metric1{foo="bar"} 10000 6000000
test_metric1{foo="boo"} 1 6000000
test_metric1{foo="bar",instance="i"} 10000 6000000
test_metric1{foo="boo",instance="i"} 1 6000000
# TYPE test_metric2 untyped
test_metric2{foo="boo"} 1 6000000
test_metric2{foo="boo",instance="i"} 1 6000000
# TYPE test_metric_without_labels untyped
test_metric_without_labels 1001 6000000
test_metric_without_labels{instance=""} 1001 6000000
`,
},
"empty label value matches everything that doesn't have that label": {
params: "match[]={foo='',__name__=~'.%2b'}",
code: 200,
body: `# TYPE test_metric_without_labels untyped
test_metric_without_labels 1001 6000000
test_metric_without_labels{instance=""} 1001 6000000
`,
},
"empty label value for a label that doesn't exist at all, matches everything": {
params: "match[]={bar='',__name__=~'.%2b'}",
code: 200,
body: `# TYPE test_metric1 untyped
test_metric1{foo="bar"} 10000 6000000
test_metric1{foo="boo"} 1 6000000
test_metric1{foo="bar",instance="i"} 10000 6000000
test_metric1{foo="boo",instance="i"} 1 6000000
# TYPE test_metric2 untyped
test_metric2{foo="boo"} 1 6000000
test_metric2{foo="boo",instance="i"} 1 6000000
# TYPE test_metric_without_labels untyped
test_metric_without_labels 1001 6000000
test_metric_without_labels{instance=""} 1001 6000000
`,
},
"external labels are added if not already present": {
params: "match[]={__name__=~'.%2b'}", // '%2b' is an URL-encoded '+'.
externalLabels: model.LabelSet{"zone": "ie", "foo": "baz"},
code: 200,
body: `# TYPE test_metric1 untyped
test_metric1{foo="bar",instance="i",zone="ie"} 10000 6000000
test_metric1{foo="boo",instance="i",zone="ie"} 1 6000000
# TYPE test_metric2 untyped
test_metric2{foo="boo",instance="i",zone="ie"} 1 6000000
# TYPE test_metric_without_labels untyped
test_metric_without_labels{foo="baz",instance="",zone="ie"} 1001 6000000
`,
},
"instance is an external label": {
// This makes no sense as a configuration, but we should
// know what it does anyway.
params: "match[]={__name__=~'.%2b'}", // '%2b' is an URL-encoded '+'.
externalLabels: model.LabelSet{"instance": "baz"},
code: 200,
body: `# TYPE test_metric1 untyped
test_metric1{foo="bar",instance="i"} 10000 6000000
test_metric1{foo="boo",instance="i"} 1 6000000
# TYPE test_metric2 untyped
test_metric2{foo="boo",instance="i"} 1 6000000
# TYPE test_metric_without_labels untyped
test_metric_without_labels{instance="baz"} 1001 6000000
`,
},
}
@ -131,9 +160,9 @@ test_metric_without_labels 1001 6000000
func TestFederation(t *testing.T) {
suite, err := promql.NewTest(t, `
load 1m
test_metric1{foo="bar"} 0+100x100
test_metric1{foo="boo"} 1+0x100
test_metric2{foo="boo"} 1+0x100
test_metric1{foo="bar",instance="i"} 0+100x100
test_metric1{foo="boo",instance="i"} 1+0x100
test_metric2{foo="boo",instance="i"} 1+0x100
test_metric_without_labels 1+10x100
`)
if err != nil {
@ -152,6 +181,7 @@ func TestFederation(t *testing.T) {
}
for name, scenario := range scenarios {
h.externalLabels = scenario.externalLabels
req, err := http.ReadRequest(bufio.NewReader(strings.NewReader(
"GET http://example.org/federate?" + scenario.params + " HTTP/1.0\r\n\r\n",
)))
@ -174,7 +204,7 @@ func TestFederation(t *testing.T) {
t.Errorf("Scenario %q: got code %d, want %d", name, got, want)
}
if got, want := normalizeBody(res.Body), scenario.body; got != want {
t.Errorf("Scenario %q: got body %q, want %q", name, got, want)
t.Errorf("Scenario %q: got body %s, want %s", name, got, want)
}
}
}

File diff suppressed because one or more lines are too long

View file

@ -415,7 +415,8 @@ Prometheus.Graph.prototype.submitQuery = function() {
return;
}
var duration = new Date().getTime() - startTime;
self.evalStats.html("Load time: " + duration + "ms <br /> Resolution: " + resolution + "s");
var totalTimeSeries = xhr.responseJSON.data.result.length;
self.evalStats.html("Load time: " + duration + "ms <br /> Resolution: " + resolution + "s <br />" + "Total time series: " + totalTimeSeries);
self.spinner.hide();
}
});

View file

@ -41,6 +41,7 @@ import (
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/notifier"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/promql"
"github.com/prometheus/prometheus/retrieval"
"github.com/prometheus/prometheus/rules"
@ -379,6 +380,12 @@ func (h *Handler) targets(w http.ResponseWriter, r *http.Request) {
tps[job] = append(tps[job], t)
}
for _, targets := range tps {
sort.Slice(targets, func(i, j int) bool {
return targets[i].Labels().Get(labels.InstanceName) < targets[j].Labels().Get(labels.InstanceName)
})
}
h.executeTemplate(w, "targets.html", struct {
TargetPools map[string][]*retrieval.Target
}{