mirror of
https://github.com/prometheus/prometheus.git
synced 2024-12-24 21:24:05 -08:00
remote: remove hard-coded remote storages
This commit removes the flag-configured remote storage integrations in favor of the generic remote write path.
This commit is contained in:
parent
93b70ee4ea
commit
11a731ba82
|
@ -32,7 +32,6 @@ import (
|
|||
"github.com/prometheus/prometheus/storage/local"
|
||||
"github.com/prometheus/prometheus/storage/local/chunk"
|
||||
"github.com/prometheus/prometheus/storage/local/index"
|
||||
"github.com/prometheus/prometheus/storage/remote"
|
||||
"github.com/prometheus/prometheus/web"
|
||||
)
|
||||
|
||||
|
@ -50,7 +49,6 @@ var cfg = struct {
|
|||
notifierTimeout time.Duration
|
||||
queryEngine promql.EngineOptions
|
||||
web web.Options
|
||||
remote remote.Options
|
||||
|
||||
alertmanagerURLs stringset
|
||||
prometheusURL string
|
||||
|
@ -188,45 +186,6 @@ func init() {
|
|||
"Local storage engine. Supported values are: 'persisted' (full local storage with on-disk persistence) and 'none' (no local storage).",
|
||||
)
|
||||
|
||||
// Remote storage.
|
||||
cfg.fs.StringVar(
|
||||
&cfg.remote.GraphiteAddress, "storage.remote.graphite-address", "",
|
||||
"The host:port of the remote Graphite server to send samples to. None, if empty.",
|
||||
)
|
||||
cfg.fs.StringVar(
|
||||
&cfg.remote.GraphiteTransport, "storage.remote.graphite-transport", "tcp",
|
||||
"Transport protocol to use to communicate with Graphite. 'tcp', if empty.",
|
||||
)
|
||||
cfg.fs.StringVar(
|
||||
&cfg.remote.GraphitePrefix, "storage.remote.graphite-prefix", "",
|
||||
"The prefix to prepend to all metrics exported to Graphite. None, if empty.",
|
||||
)
|
||||
cfg.fs.StringVar(
|
||||
&cfg.remote.OpentsdbURL, "storage.remote.opentsdb-url", "",
|
||||
"The URL of the remote OpenTSDB server to send samples to. None, if empty.",
|
||||
)
|
||||
cfg.fs.StringVar(
|
||||
&cfg.influxdbURL, "storage.remote.influxdb-url", "",
|
||||
"The URL of the remote InfluxDB server to send samples to. None, if empty.",
|
||||
)
|
||||
cfg.fs.StringVar(
|
||||
&cfg.remote.InfluxdbRetentionPolicy, "storage.remote.influxdb.retention-policy", "default",
|
||||
"The InfluxDB retention policy to use.",
|
||||
)
|
||||
cfg.fs.StringVar(
|
||||
&cfg.remote.InfluxdbUsername, "storage.remote.influxdb.username", "",
|
||||
"The username to use when sending samples to InfluxDB. The corresponding password must be provided via the INFLUXDB_PW environment variable.",
|
||||
)
|
||||
cfg.fs.StringVar(
|
||||
&cfg.remote.InfluxdbDatabase, "storage.remote.influxdb.database", "prometheus",
|
||||
"The name of the database to use for storing samples in InfluxDB.",
|
||||
)
|
||||
|
||||
cfg.fs.DurationVar(
|
||||
&cfg.remote.StorageTimeout, "storage.remote.timeout", 30*time.Second,
|
||||
"The timeout to use when sending samples to the remote storage.",
|
||||
)
|
||||
|
||||
// Alertmanager.
|
||||
cfg.fs.Var(
|
||||
&cfg.alertmanagerURLs, "alertmanager.url",
|
||||
|
@ -285,17 +244,12 @@ func parse(args []string) error {
|
|||
// RoutePrefix must always be at least '/'.
|
||||
cfg.web.RoutePrefix = "/" + strings.Trim(cfg.web.RoutePrefix, "/")
|
||||
|
||||
if err := parseInfluxdbURL(); err != nil {
|
||||
return err
|
||||
}
|
||||
for u := range cfg.alertmanagerURLs {
|
||||
if err := validateAlertmanagerURL(u); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
cfg.remote.InfluxdbPassword = os.Getenv("INFLUXDB_PW")
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -330,24 +284,6 @@ func parsePrometheusURL() error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func parseInfluxdbURL() error {
|
||||
if cfg.influxdbURL == "" {
|
||||
return nil
|
||||
}
|
||||
|
||||
if ok := govalidator.IsURL(cfg.influxdbURL); !ok {
|
||||
return fmt.Errorf("invalid InfluxDB URL: %s", cfg.influxdbURL)
|
||||
}
|
||||
|
||||
url, err := url.Parse(cfg.influxdbURL)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
cfg.remote.InfluxdbURL = url
|
||||
return nil
|
||||
}
|
||||
|
||||
func validateAlertmanagerURL(u string) error {
|
||||
if u == "" {
|
||||
return nil
|
||||
|
|
|
@ -94,17 +94,7 @@ func Main() int {
|
|||
return 1
|
||||
}
|
||||
|
||||
remoteStorage, err := remote.New(&cfg.remote)
|
||||
if err != nil {
|
||||
log.Errorf("Error initializing remote storage: %s", err)
|
||||
return 1
|
||||
}
|
||||
if remoteStorage != nil {
|
||||
sampleAppender = append(sampleAppender, remoteStorage)
|
||||
reloadables = append(reloadables, remoteStorage)
|
||||
}
|
||||
|
||||
reloadableRemoteStorage := remote.NewConfigurable()
|
||||
reloadableRemoteStorage := remote.New()
|
||||
sampleAppender = append(sampleAppender, reloadableRemoteStorage)
|
||||
reloadables = append(reloadables, reloadableRemoteStorage)
|
||||
|
||||
|
@ -190,11 +180,6 @@ func Main() int {
|
|||
}
|
||||
}()
|
||||
|
||||
if remoteStorage != nil {
|
||||
remoteStorage.Start()
|
||||
defer remoteStorage.Stop()
|
||||
}
|
||||
|
||||
defer reloadableRemoteStorage.Stop()
|
||||
|
||||
// The storage has to be fully initialized before registering.
|
||||
|
|
|
@ -1,107 +0,0 @@
|
|||
// Copyright 2015 The Prometheus Authors
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package graphite
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
"math"
|
||||
"net"
|
||||
"sort"
|
||||
"time"
|
||||
|
||||
"github.com/prometheus/common/log"
|
||||
"github.com/prometheus/common/model"
|
||||
)
|
||||
|
||||
// Client allows sending batches of Prometheus samples to Graphite.
|
||||
type Client struct {
|
||||
address string
|
||||
transport string
|
||||
timeout time.Duration
|
||||
prefix string
|
||||
}
|
||||
|
||||
// NewClient creates a new Client.
|
||||
func NewClient(address string, transport string, timeout time.Duration, prefix string) *Client {
|
||||
return &Client{
|
||||
address: address,
|
||||
transport: transport,
|
||||
timeout: timeout,
|
||||
prefix: prefix,
|
||||
}
|
||||
}
|
||||
|
||||
func pathFromMetric(m model.Metric, prefix string) string {
|
||||
var buffer bytes.Buffer
|
||||
|
||||
buffer.WriteString(prefix)
|
||||
buffer.WriteString(escape(m[model.MetricNameLabel]))
|
||||
|
||||
// We want to sort the labels.
|
||||
labels := make(model.LabelNames, 0, len(m))
|
||||
for l := range m {
|
||||
labels = append(labels, l)
|
||||
}
|
||||
sort.Sort(labels)
|
||||
|
||||
// For each label, in order, add ".<label>.<value>".
|
||||
for _, l := range labels {
|
||||
v := m[l]
|
||||
|
||||
if l == model.MetricNameLabel || len(l) == 0 {
|
||||
continue
|
||||
}
|
||||
// Since we use '.' instead of '=' to separate label and values
|
||||
// it means that we can't have an '.' in the metric name. Fortunately
|
||||
// this is prohibited in prometheus metrics.
|
||||
buffer.WriteString(fmt.Sprintf(
|
||||
".%s.%s", string(l), escape(v)))
|
||||
}
|
||||
return buffer.String()
|
||||
}
|
||||
|
||||
// Store sends a batch of samples to Graphite.
|
||||
func (c *Client) Store(samples model.Samples) error {
|
||||
conn, err := net.DialTimeout(c.transport, c.address, c.timeout)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer conn.Close()
|
||||
|
||||
var buf bytes.Buffer
|
||||
for _, s := range samples {
|
||||
k := pathFromMetric(s.Metric, c.prefix)
|
||||
t := float64(s.Timestamp.UnixNano()) / 1e9
|
||||
v := float64(s.Value)
|
||||
if math.IsNaN(v) || math.IsInf(v, 0) {
|
||||
log.Warnf("cannot send value %f to Graphite,"+
|
||||
"skipping sample %#v", v, s)
|
||||
continue
|
||||
}
|
||||
fmt.Fprintf(&buf, "%s %f %f\n", k, v, t)
|
||||
}
|
||||
|
||||
_, err = conn.Write(buf.Bytes())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Name identifies the client as a Graphite client.
|
||||
func (c Client) Name() string {
|
||||
return "graphite"
|
||||
}
|
|
@ -1,57 +0,0 @@
|
|||
// Copyright 2015 The Prometheus Authors
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package graphite
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/prometheus/common/model"
|
||||
)
|
||||
|
||||
var (
|
||||
metric = model.Metric{
|
||||
model.MetricNameLabel: "test:metric",
|
||||
"testlabel": "test:value",
|
||||
"many_chars": "abc!ABC:012-3!45ö67~89./(){},=.\"\\",
|
||||
}
|
||||
)
|
||||
|
||||
func TestEscape(t *testing.T) {
|
||||
// Can we correctly keep and escape valid chars.
|
||||
value := "abzABZ019(){},'\"\\"
|
||||
expected := "abzABZ019\\(\\)\\{\\}\\,\\'\\\"\\\\"
|
||||
actual := escape(model.LabelValue(value))
|
||||
if expected != actual {
|
||||
t.Errorf("Expected %s, got %s", expected, actual)
|
||||
}
|
||||
|
||||
// Test percent-encoding.
|
||||
value = "é/|_;:%."
|
||||
expected = "%C3%A9%2F|_;:%25%2E"
|
||||
actual = escape(model.LabelValue(value))
|
||||
if expected != actual {
|
||||
t.Errorf("Expected %s, got %s", expected, actual)
|
||||
}
|
||||
}
|
||||
|
||||
func TestPathFromMetric(t *testing.T) {
|
||||
expected := ("prefix." +
|
||||
"test:metric" +
|
||||
".many_chars.abc!ABC:012-3!45%C3%B667~89%2E%2F\\(\\)\\{\\}\\,%3D%2E\\\"\\\\" +
|
||||
".testlabel.test:value")
|
||||
actual := pathFromMetric(metric, "prefix.")
|
||||
if expected != actual {
|
||||
t.Errorf("Expected %s, got %s", expected, actual)
|
||||
}
|
||||
}
|
|
@ -1,103 +0,0 @@
|
|||
// Copyright 2015 The Prometheus Authors
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package graphite
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
"strings"
|
||||
|
||||
"github.com/prometheus/common/model"
|
||||
)
|
||||
|
||||
const (
|
||||
// From https://github.com/graphite-project/graphite-web/blob/master/webapp/graphite/render/grammar.py#L83
|
||||
symbols = "(){},=.'\"\\"
|
||||
printables = ("0123456789abcdefghijklmnopqrstuvwxyz" +
|
||||
"ABCDEFGHIJKLMNOPQRSTUVWXYZ" +
|
||||
"!\"#$%&\\'()*+,-./:;<=>?@[\\]^_`{|}~")
|
||||
)
|
||||
|
||||
// Graphite doesn't support tags, so label names and values must be
|
||||
// encoded into the metric path. The list of characters that are usable
|
||||
// with Graphite is rather fuzzy. One 'source of truth' might be the grammar
|
||||
// used to parse requests in the webapp:
|
||||
// https://github.com/graphite-project/graphite-web/blob/master/webapp/graphite/render/grammar.py#L83
|
||||
// The list of valid symbols is defined as:
|
||||
// legal = printables - symbols + escaped(symbols)
|
||||
//
|
||||
// The default storage backend for Graphite (whisper) stores data
|
||||
// in filenames, so we also need to use only valid filename characters.
|
||||
// Fortunately on UNIX only '/' isn't, and Windows is completely unsupported
|
||||
// by Graphite: http://graphite.readthedocs.org/en/latest/install.html#windows-users
|
||||
|
||||
// escape escapes a model.LabelValue into runes allowed in Graphite. The runes
|
||||
// allowed in Graphite are all single-byte. This function encodes the arbitrary
|
||||
// byte sequence found in this TagValue in way very similar to the traditional
|
||||
// percent-encoding (https://en.wikipedia.org/wiki/Percent-encoding):
|
||||
//
|
||||
// - The string that underlies TagValue is scanned byte by byte.
|
||||
//
|
||||
// - If a byte represents a legal Graphite rune with the exception of '%', '/',
|
||||
// '=' and '.', that byte is directly copied to the resulting byte slice.
|
||||
// % is used for percent-encoding of other bytes.
|
||||
// / is not usable in filenames.
|
||||
// = is used when generating the path to associate values to labels.
|
||||
// . already means something for Graphite and thus can't be used in a value.
|
||||
//
|
||||
// - If the byte is any of (){},=.'"\, then a '\' will be prepended to it. We
|
||||
// do not percent-encode them since they are explicitly usable in this
|
||||
// way in Graphite.
|
||||
//
|
||||
// - All other bytes are replaced by '%' followed by two bytes containing the
|
||||
// uppercase ASCII representation of their hexadecimal value.
|
||||
//
|
||||
// This encoding allows to save arbitrary Go strings in Graphite. That's
|
||||
// required because Prometheus label values can contain anything. Using
|
||||
// percent encoding makes it easy to unescape, even in javascript.
|
||||
//
|
||||
// Examples:
|
||||
//
|
||||
// "foo-bar-42" -> "foo-bar-42"
|
||||
//
|
||||
// "foo_bar%42" -> "foo_bar%2542"
|
||||
//
|
||||
// "http://example.org:8080" -> "http:%2F%2Fexample%2Eorg:8080"
|
||||
//
|
||||
// "Björn's email: bjoern@soundcloud.com" ->
|
||||
// "Bj%C3%B6rn's%20email:%20bjoern%40soundcloud.com"
|
||||
//
|
||||
// "日" -> "%E6%97%A5"
|
||||
func escape(tv model.LabelValue) string {
|
||||
length := len(tv)
|
||||
result := bytes.NewBuffer(make([]byte, 0, length))
|
||||
for i := 0; i < length; i++ {
|
||||
b := tv[i]
|
||||
switch {
|
||||
// . is reserved by graphite, % is used to escape other bytes.
|
||||
case b == '.' || b == '%' || b == '/' || b == '=':
|
||||
fmt.Fprintf(result, "%%%X", b)
|
||||
// These symbols are ok only if backslash escaped.
|
||||
case strings.IndexByte(symbols, b) != -1:
|
||||
result.WriteString("\\" + string(b))
|
||||
// These are all fine.
|
||||
case strings.IndexByte(printables, b) != -1:
|
||||
result.WriteByte(b)
|
||||
// Defaults to percent-encoding.
|
||||
default:
|
||||
fmt.Fprintf(result, "%%%X", b)
|
||||
}
|
||||
}
|
||||
return result.String()
|
||||
}
|
|
@ -1,109 +0,0 @@
|
|||
// Copyright 2015 The Prometheus Authors
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package influxdb
|
||||
|
||||
import (
|
||||
"math"
|
||||
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"github.com/prometheus/common/log"
|
||||
"github.com/prometheus/common/model"
|
||||
|
||||
influx "github.com/influxdb/influxdb/client"
|
||||
)
|
||||
|
||||
// Client allows sending batches of Prometheus samples to InfluxDB.
|
||||
type Client struct {
|
||||
client *influx.Client
|
||||
database string
|
||||
retentionPolicy string
|
||||
ignoredSamples prometheus.Counter
|
||||
}
|
||||
|
||||
// NewClient creates a new Client.
|
||||
func NewClient(conf influx.Config, db string, rp string) *Client {
|
||||
c, err := influx.NewClient(conf)
|
||||
// Currently influx.NewClient() *should* never return an error.
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
|
||||
return &Client{
|
||||
client: c,
|
||||
database: db,
|
||||
retentionPolicy: rp,
|
||||
ignoredSamples: prometheus.NewCounter(
|
||||
prometheus.CounterOpts{
|
||||
Name: "prometheus_influxdb_ignored_samples_total",
|
||||
Help: "The total number of samples not sent to InfluxDB due to unsupported float values (Inf, -Inf, NaN).",
|
||||
},
|
||||
),
|
||||
}
|
||||
}
|
||||
|
||||
// tagsFromMetric extracts InfluxDB tags from a Prometheus metric.
|
||||
func tagsFromMetric(m model.Metric) map[string]string {
|
||||
tags := make(map[string]string, len(m)-1)
|
||||
for l, v := range m {
|
||||
if l != model.MetricNameLabel {
|
||||
tags[string(l)] = string(v)
|
||||
}
|
||||
}
|
||||
return tags
|
||||
}
|
||||
|
||||
// Store sends a batch of samples to InfluxDB via its HTTP API.
|
||||
func (c *Client) Store(samples model.Samples) error {
|
||||
points := make([]influx.Point, 0, len(samples))
|
||||
for _, s := range samples {
|
||||
v := float64(s.Value)
|
||||
if math.IsNaN(v) || math.IsInf(v, 0) {
|
||||
log.Debugf("cannot send value %f to InfluxDB, skipping sample %#v", v, s)
|
||||
c.ignoredSamples.Inc()
|
||||
continue
|
||||
}
|
||||
points = append(points, influx.Point{
|
||||
Measurement: string(s.Metric[model.MetricNameLabel]),
|
||||
Tags: tagsFromMetric(s.Metric),
|
||||
Time: s.Timestamp.Time(),
|
||||
Precision: "ms",
|
||||
Fields: map[string]interface{}{
|
||||
"value": v,
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
bps := influx.BatchPoints{
|
||||
Points: points,
|
||||
Database: c.database,
|
||||
RetentionPolicy: c.retentionPolicy,
|
||||
}
|
||||
_, err := c.client.Write(bps)
|
||||
return err
|
||||
}
|
||||
|
||||
// Name identifies the client as an InfluxDB client.
|
||||
func (c Client) Name() string {
|
||||
return "influxdb"
|
||||
}
|
||||
|
||||
// Describe implements prometheus.Collector.
|
||||
func (c *Client) Describe(ch chan<- *prometheus.Desc) {
|
||||
ch <- c.ignoredSamples.Desc()
|
||||
}
|
||||
|
||||
// Collect implements prometheus.Collector.
|
||||
func (c *Client) Collect(ch chan<- prometheus.Metric) {
|
||||
ch <- c.ignoredSamples
|
||||
}
|
|
@ -1,111 +0,0 @@
|
|||
// Copyright 2015 The Prometheus Authors
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package influxdb
|
||||
|
||||
import (
|
||||
"io/ioutil"
|
||||
"math"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"net/url"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
influx "github.com/influxdb/influxdb/client"
|
||||
|
||||
"github.com/prometheus/common/model"
|
||||
)
|
||||
|
||||
func TestClient(t *testing.T) {
|
||||
samples := model.Samples{
|
||||
{
|
||||
Metric: model.Metric{
|
||||
model.MetricNameLabel: "testmetric",
|
||||
"test_label": "test_label_value1",
|
||||
},
|
||||
Timestamp: model.Time(123456789123),
|
||||
Value: 1.23,
|
||||
},
|
||||
{
|
||||
Metric: model.Metric{
|
||||
model.MetricNameLabel: "testmetric",
|
||||
"test_label": "test_label_value2",
|
||||
},
|
||||
Timestamp: model.Time(123456789123),
|
||||
Value: 5.1234,
|
||||
},
|
||||
{
|
||||
Metric: model.Metric{
|
||||
model.MetricNameLabel: "nan_value",
|
||||
},
|
||||
Timestamp: model.Time(123456789123),
|
||||
Value: model.SampleValue(math.NaN()),
|
||||
},
|
||||
{
|
||||
Metric: model.Metric{
|
||||
model.MetricNameLabel: "pos_inf_value",
|
||||
},
|
||||
Timestamp: model.Time(123456789123),
|
||||
Value: model.SampleValue(math.Inf(1)),
|
||||
},
|
||||
{
|
||||
Metric: model.Metric{
|
||||
model.MetricNameLabel: "neg_inf_value",
|
||||
},
|
||||
Timestamp: model.Time(123456789123),
|
||||
Value: model.SampleValue(math.Inf(-1)),
|
||||
},
|
||||
}
|
||||
|
||||
expectedBody := `testmetric,test_label=test_label_value1 value=1.23 123456789123000000
|
||||
testmetric,test_label=test_label_value2 value=5.1234 123456789123000000
|
||||
`
|
||||
|
||||
server := httptest.NewServer(http.HandlerFunc(
|
||||
func(w http.ResponseWriter, r *http.Request) {
|
||||
if r.Method != "POST" {
|
||||
t.Fatalf("Unexpected method; expected POST, got %s", r.Method)
|
||||
}
|
||||
if r.URL.Path != "/write" {
|
||||
t.Fatalf("Unexpected path; expected %s, got %s", "/write", r.URL.Path)
|
||||
}
|
||||
b, err := ioutil.ReadAll(r.Body)
|
||||
if err != nil {
|
||||
t.Fatalf("Error reading body: %s", err)
|
||||
}
|
||||
|
||||
if string(b) != expectedBody {
|
||||
t.Fatalf("Unexpected request body; expected:\n\n%s\n\ngot:\n\n%s", expectedBody, string(b))
|
||||
}
|
||||
},
|
||||
))
|
||||
defer server.Close()
|
||||
|
||||
serverURL, err := url.Parse(server.URL)
|
||||
if err != nil {
|
||||
t.Fatalf("Unable to parse server URL %s: %s", server.URL, err)
|
||||
}
|
||||
|
||||
conf := influx.Config{
|
||||
URL: *serverURL,
|
||||
Username: "testuser",
|
||||
Password: "testpass",
|
||||
Timeout: time.Minute,
|
||||
}
|
||||
c := NewClient(conf, "test_db", "default")
|
||||
|
||||
if err := c.Store(samples); err != nil {
|
||||
t.Fatalf("Error sending samples: %s", err)
|
||||
}
|
||||
}
|
|
@ -1,136 +0,0 @@
|
|||
// Copyright 2013 The Prometheus Authors
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package opentsdb
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"math"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"time"
|
||||
|
||||
"github.com/prometheus/common/log"
|
||||
|
||||
"github.com/prometheus/common/model"
|
||||
|
||||
"github.com/prometheus/prometheus/util/httputil"
|
||||
)
|
||||
|
||||
const (
|
||||
putEndpoint = "/api/put"
|
||||
contentTypeJSON = "application/json"
|
||||
)
|
||||
|
||||
// Client allows sending batches of Prometheus samples to OpenTSDB.
|
||||
type Client struct {
|
||||
url string
|
||||
httpClient *http.Client
|
||||
}
|
||||
|
||||
// NewClient creates a new Client.
|
||||
func NewClient(url string, timeout time.Duration) *Client {
|
||||
return &Client{
|
||||
url: url,
|
||||
httpClient: httputil.NewDeadlineClient(timeout, nil),
|
||||
}
|
||||
}
|
||||
|
||||
// StoreSamplesRequest is used for building a JSON request for storing samples
|
||||
// via the OpenTSDB.
|
||||
type StoreSamplesRequest struct {
|
||||
Metric TagValue `json:"metric"`
|
||||
Timestamp int64 `json:"timestamp"`
|
||||
Value float64 `json:"value"`
|
||||
Tags map[string]TagValue `json:"tags"`
|
||||
}
|
||||
|
||||
// tagsFromMetric translates Prometheus metric into OpenTSDB tags.
|
||||
func tagsFromMetric(m model.Metric) map[string]TagValue {
|
||||
tags := make(map[string]TagValue, len(m)-1)
|
||||
for l, v := range m {
|
||||
if l == model.MetricNameLabel {
|
||||
continue
|
||||
}
|
||||
tags[string(l)] = TagValue(v)
|
||||
}
|
||||
return tags
|
||||
}
|
||||
|
||||
// Store sends a batch of samples to OpenTSDB via its HTTP API.
|
||||
func (c *Client) Store(samples model.Samples) error {
|
||||
reqs := make([]StoreSamplesRequest, 0, len(samples))
|
||||
for _, s := range samples {
|
||||
v := float64(s.Value)
|
||||
if math.IsNaN(v) || math.IsInf(v, 0) {
|
||||
log.Warnf("cannot send value %f to OpenTSDB, skipping sample %#v", v, s)
|
||||
continue
|
||||
}
|
||||
metric := TagValue(s.Metric[model.MetricNameLabel])
|
||||
reqs = append(reqs, StoreSamplesRequest{
|
||||
Metric: metric,
|
||||
Timestamp: s.Timestamp.Unix(),
|
||||
Value: v,
|
||||
Tags: tagsFromMetric(s.Metric),
|
||||
})
|
||||
}
|
||||
|
||||
u, err := url.Parse(c.url)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
u.Path = putEndpoint
|
||||
|
||||
buf, err := json.Marshal(reqs)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
resp, err := c.httpClient.Post(
|
||||
u.String(),
|
||||
contentTypeJSON,
|
||||
bytes.NewBuffer(buf),
|
||||
)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
// API returns status code 204 for successful writes.
|
||||
// http://opentsdb.net/docs/build/html/api_http/put.html
|
||||
if resp.StatusCode == http.StatusNoContent {
|
||||
return nil
|
||||
}
|
||||
|
||||
// API returns status code 400 on error, encoding error details in the
|
||||
// response content in JSON.
|
||||
buf, err = ioutil.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
var r map[string]int
|
||||
if err := json.Unmarshal(buf, &r); err != nil {
|
||||
return err
|
||||
}
|
||||
return fmt.Errorf("failed to write %d samples to OpenTSDB, %d succeeded", r["failed"], r["success"])
|
||||
}
|
||||
|
||||
// Name identifies the client as an OpenTSDB client.
|
||||
func (c Client) Name() string {
|
||||
return "opentsdb"
|
||||
}
|
|
@ -1,75 +0,0 @@
|
|||
// Copyright 2013 The Prometheus Authors
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package opentsdb
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"reflect"
|
||||
"testing"
|
||||
|
||||
"github.com/prometheus/common/model"
|
||||
)
|
||||
|
||||
var (
|
||||
metric = model.Metric{
|
||||
model.MetricNameLabel: "test:metric",
|
||||
"testlabel": "test:value",
|
||||
"many_chars": "abc!ABC:012-3!45ö67~89./",
|
||||
}
|
||||
)
|
||||
|
||||
func TestTagsFromMetric(t *testing.T) {
|
||||
expected := map[string]TagValue{
|
||||
"testlabel": TagValue("test:value"),
|
||||
"many_chars": TagValue("abc!ABC:012-3!45ö67~89./"),
|
||||
}
|
||||
actual := tagsFromMetric(metric)
|
||||
if !reflect.DeepEqual(actual, expected) {
|
||||
t.Errorf("Expected %#v, got %#v", expected, actual)
|
||||
}
|
||||
}
|
||||
|
||||
func TestMarshalStoreSamplesRequest(t *testing.T) {
|
||||
request := StoreSamplesRequest{
|
||||
Metric: TagValue("test:metric"),
|
||||
Timestamp: 4711,
|
||||
Value: 3.1415,
|
||||
Tags: tagsFromMetric(metric),
|
||||
}
|
||||
expectedJSON := []byte(`{"metric":"test_.metric","timestamp":4711,"value":3.1415,"tags":{"many_chars":"abc_21ABC_.012-3_2145_C3_B667_7E89./","testlabel":"test_.value"}}`)
|
||||
|
||||
resultingJSON, err := json.Marshal(request)
|
||||
if err != nil {
|
||||
t.Fatalf("Marshal(request) resulted in err: %s", err)
|
||||
}
|
||||
if !bytes.Equal(resultingJSON, expectedJSON) {
|
||||
t.Errorf(
|
||||
"Marshal(request) => %q, want %q",
|
||||
resultingJSON, expectedJSON,
|
||||
)
|
||||
}
|
||||
|
||||
var unmarshaledRequest StoreSamplesRequest
|
||||
err = json.Unmarshal(expectedJSON, &unmarshaledRequest)
|
||||
if err != nil {
|
||||
t.Fatalf("Unmarshal(expectedJSON, &unmarshaledRequest) resulted in err: %s", err)
|
||||
}
|
||||
if !reflect.DeepEqual(unmarshaledRequest, request) {
|
||||
t.Errorf(
|
||||
"Unmarshal(expectedJSON, &unmarshaledRequest) => %#v, want %#v",
|
||||
unmarshaledRequest, request,
|
||||
)
|
||||
}
|
||||
}
|
|
@ -1,157 +0,0 @@
|
|||
// Copyright 2016 The Prometheus Authors
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package opentsdb
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
|
||||
"github.com/prometheus/common/model"
|
||||
)
|
||||
|
||||
// TagValue is a model.LabelValue that implements json.Marshaler and
|
||||
// json.Unmarshaler. These implementations avoid characters illegal in
|
||||
// OpenTSDB. See the MarshalJSON for details. TagValue is used for the values of
|
||||
// OpenTSDB tags as well as for OpenTSDB metric names.
|
||||
type TagValue model.LabelValue
|
||||
|
||||
// MarshalJSON marshals this TagValue into JSON that only contains runes allowed
|
||||
// in OpenTSDB. It implements json.Marshaler. The runes allowed in OpenTSDB are
|
||||
// all single-byte. This function encodes the arbitrary byte sequence found in
|
||||
// this TagValue in the following way:
|
||||
//
|
||||
// - The string that underlies TagValue is scanned byte by byte.
|
||||
//
|
||||
// - If a byte represents a legal OpenTSDB rune with the exception of '_', that
|
||||
// byte is directly copied to the resulting JSON byte slice.
|
||||
//
|
||||
// - If '_' is encountered, it is replaced by '__'.
|
||||
//
|
||||
// - If ':' is encountered, it is replaced by '_.'.
|
||||
//
|
||||
// - All other bytes are replaced by '_' followed by two bytes containing the
|
||||
// uppercase ASCII representation of their hexadecimal value.
|
||||
//
|
||||
// This encoding allows to save arbitrary Go strings in OpenTSDB. That's
|
||||
// required because Prometheus label values can contain anything, and even
|
||||
// Prometheus metric names may (and often do) contain ':' (which is disallowed
|
||||
// in OpenTSDB strings). The encoding uses '_' as an escape character and
|
||||
// renders a ':' more or less recognizable as '_.'
|
||||
//
|
||||
// Examples:
|
||||
//
|
||||
// "foo-bar-42" -> "foo-bar-42"
|
||||
//
|
||||
// "foo_bar_42" -> "foo__bar__42"
|
||||
//
|
||||
// "http://example.org:8080" -> "http_.//example.org_.8080"
|
||||
//
|
||||
// "Björn's email: bjoern@soundcloud.com" ->
|
||||
// "Bj_C3_B6rn_27s_20email_._20bjoern_40soundcloud.com"
|
||||
//
|
||||
// "日" -> "_E6_97_A5"
|
||||
func (tv TagValue) MarshalJSON() ([]byte, error) {
|
||||
length := len(tv)
|
||||
// Need at least two more bytes than in tv.
|
||||
result := bytes.NewBuffer(make([]byte, 0, length+2))
|
||||
result.WriteByte('"')
|
||||
for i := 0; i < length; i++ {
|
||||
b := tv[i]
|
||||
switch {
|
||||
case (b >= '-' && b <= '9') || // '-', '.', '/', 0-9
|
||||
(b >= 'A' && b <= 'Z') ||
|
||||
(b >= 'a' && b <= 'z'):
|
||||
result.WriteByte(b)
|
||||
case b == '_':
|
||||
result.WriteString("__")
|
||||
case b == ':':
|
||||
result.WriteString("_.")
|
||||
default:
|
||||
result.WriteString(fmt.Sprintf("_%X", b))
|
||||
}
|
||||
}
|
||||
result.WriteByte('"')
|
||||
return result.Bytes(), nil
|
||||
}
|
||||
|
||||
// UnmarshalJSON unmarshals JSON strings coming from OpenTSDB into Go strings
|
||||
// by applying the inverse of what is described for the MarshalJSON method.
|
||||
func (tv *TagValue) UnmarshalJSON(json []byte) error {
|
||||
escapeLevel := 0 // How many bytes after '_'.
|
||||
var parsedByte byte
|
||||
|
||||
// Might need fewer bytes, but let's avoid realloc.
|
||||
result := bytes.NewBuffer(make([]byte, 0, len(json)-2))
|
||||
|
||||
for i, b := range json {
|
||||
if i == 0 {
|
||||
if b != '"' {
|
||||
return fmt.Errorf("expected '\"', got %q", b)
|
||||
}
|
||||
continue
|
||||
}
|
||||
if i == len(json)-1 {
|
||||
if b != '"' {
|
||||
return fmt.Errorf("expected '\"', got %q", b)
|
||||
}
|
||||
break
|
||||
}
|
||||
switch escapeLevel {
|
||||
case 0:
|
||||
if b == '_' {
|
||||
escapeLevel = 1
|
||||
continue
|
||||
}
|
||||
result.WriteByte(b)
|
||||
case 1:
|
||||
switch {
|
||||
case b == '_':
|
||||
result.WriteByte('_')
|
||||
escapeLevel = 0
|
||||
case b == '.':
|
||||
result.WriteByte(':')
|
||||
escapeLevel = 0
|
||||
case b >= '0' && b <= '9':
|
||||
parsedByte = (b - 48) << 4
|
||||
escapeLevel = 2
|
||||
case b >= 'A' && b <= 'F': // A-F
|
||||
parsedByte = (b - 55) << 4
|
||||
escapeLevel = 2
|
||||
default:
|
||||
return fmt.Errorf(
|
||||
"illegal escape sequence at byte %d (%c)",
|
||||
i, b,
|
||||
)
|
||||
}
|
||||
case 2:
|
||||
switch {
|
||||
case b >= '0' && b <= '9':
|
||||
parsedByte += b - 48
|
||||
case b >= 'A' && b <= 'F': // A-F
|
||||
parsedByte += b - 55
|
||||
default:
|
||||
return fmt.Errorf(
|
||||
"illegal escape sequence at byte %d (%c)",
|
||||
i, b,
|
||||
)
|
||||
}
|
||||
result.WriteByte(parsedByte)
|
||||
escapeLevel = 0
|
||||
default:
|
||||
panic("unexpected escape level")
|
||||
}
|
||||
}
|
||||
*tv = TagValue(result.String())
|
||||
return nil
|
||||
}
|
|
@ -1,64 +0,0 @@
|
|||
// Copyright 2016 The Prometheus Authors
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package opentsdb
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"testing"
|
||||
)
|
||||
|
||||
var stringtests = []struct {
|
||||
tv TagValue
|
||||
json []byte
|
||||
}{
|
||||
{TagValue("foo-bar-42"), []byte(`"foo-bar-42"`)},
|
||||
{TagValue("foo_bar_42"), []byte(`"foo__bar__42"`)},
|
||||
{TagValue("http://example.org:8080"), []byte(`"http_.//example.org_.8080"`)},
|
||||
{TagValue("Björn's email: bjoern@soundcloud.com"), []byte(`"Bj_C3_B6rn_27s_20email_._20bjoern_40soundcloud.com"`)},
|
||||
{TagValue("日"), []byte(`"_E6_97_A5"`)},
|
||||
}
|
||||
|
||||
func TestTagValueMarshaling(t *testing.T) {
|
||||
for i, tt := range stringtests {
|
||||
json, err := json.Marshal(tt.tv)
|
||||
if err != nil {
|
||||
t.Errorf("%d. Marshal(%q) returned err: %s", i, tt.tv, err)
|
||||
} else {
|
||||
if !bytes.Equal(json, tt.json) {
|
||||
t.Errorf(
|
||||
"%d. Marshal(%q) => %q, want %q",
|
||||
i, tt.tv, json, tt.json,
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestTagValueUnMarshaling(t *testing.T) {
|
||||
for i, tt := range stringtests {
|
||||
var tv TagValue
|
||||
err := json.Unmarshal(tt.json, &tv)
|
||||
if err != nil {
|
||||
t.Errorf("%d. Unmarshal(%q, &str) returned err: %s", i, tt.json, err)
|
||||
} else {
|
||||
if tv != tt.tv {
|
||||
t.Errorf(
|
||||
"%d. Unmarshal(%q, &str) => str==%q, want %q",
|
||||
i, tt.json, tv, tt.tv,
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -1,4 +1,4 @@
|
|||
// Copyright 2015 The Prometheus Authors
|
||||
// Copyright 2016 The Prometheus Authors
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
|
@ -14,101 +14,72 @@
|
|||
package remote
|
||||
|
||||
import (
|
||||
"net/url"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"github.com/prometheus/common/model"
|
||||
|
||||
influx "github.com/influxdb/influxdb/client"
|
||||
|
||||
"github.com/prometheus/prometheus/config"
|
||||
"github.com/prometheus/prometheus/relabel"
|
||||
"github.com/prometheus/prometheus/storage/remote/graphite"
|
||||
"github.com/prometheus/prometheus/storage/remote/influxdb"
|
||||
"github.com/prometheus/prometheus/storage/remote/opentsdb"
|
||||
)
|
||||
|
||||
// Storage collects multiple remote storage queues.
|
||||
type Storage struct {
|
||||
queues []*StorageQueueManager
|
||||
externalLabels model.LabelSet
|
||||
relabelConfigs []*config.RelabelConfig
|
||||
mtx sync.RWMutex
|
||||
externalLabels model.LabelSet
|
||||
conf config.RemoteWriteConfig
|
||||
|
||||
queue *StorageQueueManager
|
||||
}
|
||||
|
||||
// ApplyConfig updates the status state as the new config requires.
|
||||
// New returns a new remote Storage.
|
||||
func New() *Storage {
|
||||
return &Storage{}
|
||||
}
|
||||
|
||||
// ApplyConfig updates the state as the new config requires.
|
||||
func (s *Storage) ApplyConfig(conf *config.Config) error {
|
||||
s.mtx.Lock()
|
||||
defer s.mtx.Unlock()
|
||||
|
||||
// TODO: we should only stop & recreate queues which have changes,
|
||||
// as this can be quite disruptive.
|
||||
var newQueue *StorageQueueManager
|
||||
|
||||
if conf.RemoteWriteConfig.URL != nil {
|
||||
c, err := NewClient(conf.RemoteWriteConfig)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
newQueue = NewStorageQueueManager(c, nil)
|
||||
}
|
||||
|
||||
if s.queue != nil {
|
||||
s.queue.Stop()
|
||||
}
|
||||
s.queue = newQueue
|
||||
s.conf = conf.RemoteWriteConfig
|
||||
s.externalLabels = conf.GlobalConfig.ExternalLabels
|
||||
s.relabelConfigs = conf.RemoteWriteConfig.WriteRelabelConfigs
|
||||
if s.queue != nil {
|
||||
s.queue.Start()
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// New returns a new remote Storage.
|
||||
func New(o *Options) (*Storage, error) {
|
||||
s := &Storage{}
|
||||
if o.GraphiteAddress != "" {
|
||||
c := graphite.NewClient(
|
||||
o.GraphiteAddress, o.GraphiteTransport,
|
||||
o.StorageTimeout, o.GraphitePrefix)
|
||||
s.queues = append(s.queues, NewStorageQueueManager(c, nil))
|
||||
}
|
||||
if o.OpentsdbURL != "" {
|
||||
c := opentsdb.NewClient(o.OpentsdbURL, o.StorageTimeout)
|
||||
s.queues = append(s.queues, NewStorageQueueManager(c, nil))
|
||||
}
|
||||
if o.InfluxdbURL != nil {
|
||||
conf := influx.Config{
|
||||
URL: *o.InfluxdbURL,
|
||||
Username: o.InfluxdbUsername,
|
||||
Password: o.InfluxdbPassword,
|
||||
Timeout: o.StorageTimeout,
|
||||
}
|
||||
c := influxdb.NewClient(conf, o.InfluxdbDatabase, o.InfluxdbRetentionPolicy)
|
||||
prometheus.MustRegister(c)
|
||||
s.queues = append(s.queues, NewStorageQueueManager(c, nil))
|
||||
}
|
||||
if len(s.queues) == 0 {
|
||||
return nil, nil
|
||||
}
|
||||
return s, nil
|
||||
}
|
||||
|
||||
// Options contains configuration parameters for a remote storage.
|
||||
type Options struct {
|
||||
StorageTimeout time.Duration
|
||||
InfluxdbURL *url.URL
|
||||
InfluxdbRetentionPolicy string
|
||||
InfluxdbUsername string
|
||||
InfluxdbPassword string
|
||||
InfluxdbDatabase string
|
||||
OpentsdbURL string
|
||||
GraphiteAddress string
|
||||
GraphiteTransport string
|
||||
GraphitePrefix string
|
||||
}
|
||||
|
||||
// Start starts the background processing of the storage queues.
|
||||
func (s *Storage) Start() {
|
||||
for _, q := range s.queues {
|
||||
q.Start()
|
||||
}
|
||||
}
|
||||
|
||||
// Stop the background processing of the storage queues.
|
||||
func (s *Storage) Stop() {
|
||||
for _, q := range s.queues {
|
||||
q.Stop()
|
||||
if s.queue != nil {
|
||||
s.queue.Stop()
|
||||
}
|
||||
}
|
||||
|
||||
// Append implements storage.SampleAppender. Always returns nil.
|
||||
func (s *Storage) Append(smpl *model.Sample) error {
|
||||
s.mtx.RLock()
|
||||
defer s.mtx.RUnlock()
|
||||
|
||||
if s.queue == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
var snew model.Sample
|
||||
snew = *smpl
|
||||
|
@ -120,16 +91,12 @@ func (s *Storage) Append(smpl *model.Sample) error {
|
|||
}
|
||||
}
|
||||
snew.Metric = model.Metric(
|
||||
relabel.Process(model.LabelSet(snew.Metric), s.relabelConfigs...))
|
||||
s.mtx.RUnlock()
|
||||
relabel.Process(model.LabelSet(snew.Metric), s.conf.WriteRelabelConfigs...))
|
||||
|
||||
if snew.Metric == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
for _, q := range s.queues {
|
||||
q.Append(&snew)
|
||||
}
|
||||
s.queue.Append(&snew)
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
|
@ -1,108 +0,0 @@
|
|||
// Copyright 2016 The Prometheus Authors
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package remote
|
||||
|
||||
import (
|
||||
"sync"
|
||||
|
||||
"github.com/prometheus/common/model"
|
||||
|
||||
"github.com/prometheus/prometheus/config"
|
||||
"github.com/prometheus/prometheus/relabel"
|
||||
)
|
||||
|
||||
// Storage collects multiple remote storage queues.
|
||||
type ReloadableStorage struct {
|
||||
mtx sync.RWMutex
|
||||
externalLabels model.LabelSet
|
||||
conf config.RemoteWriteConfig
|
||||
|
||||
queue *StorageQueueManager
|
||||
}
|
||||
|
||||
// New returns a new remote Storage.
|
||||
func NewConfigurable() *ReloadableStorage {
|
||||
return &ReloadableStorage{}
|
||||
}
|
||||
|
||||
// ApplyConfig updates the state as the new config requires.
|
||||
func (s *ReloadableStorage) ApplyConfig(conf *config.Config) error {
|
||||
s.mtx.Lock()
|
||||
defer s.mtx.Unlock()
|
||||
|
||||
// TODO: we should only stop & recreate queues which have changes,
|
||||
// as this can be quite disruptive.
|
||||
var newQueue *StorageQueueManager
|
||||
|
||||
if conf.RemoteWriteConfig.URL != nil {
|
||||
c, err := NewClient(conf.RemoteWriteConfig)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
newQueue = NewStorageQueueManager(c, nil)
|
||||
}
|
||||
|
||||
if s.queue != nil {
|
||||
s.queue.Stop()
|
||||
}
|
||||
s.queue = newQueue
|
||||
s.conf = conf.RemoteWriteConfig
|
||||
s.externalLabels = conf.GlobalConfig.ExternalLabels
|
||||
if s.queue != nil {
|
||||
s.queue.Start()
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Stop the background processing of the storage queues.
|
||||
func (s *ReloadableStorage) Stop() {
|
||||
if s.queue != nil {
|
||||
s.queue.Stop()
|
||||
}
|
||||
}
|
||||
|
||||
// Append implements storage.SampleAppender. Always returns nil.
|
||||
func (s *ReloadableStorage) Append(smpl *model.Sample) error {
|
||||
s.mtx.RLock()
|
||||
defer s.mtx.RUnlock()
|
||||
|
||||
if s.queue == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
var snew model.Sample
|
||||
snew = *smpl
|
||||
snew.Metric = smpl.Metric.Clone()
|
||||
|
||||
for ln, lv := range s.externalLabels {
|
||||
if _, ok := smpl.Metric[ln]; !ok {
|
||||
snew.Metric[ln] = lv
|
||||
}
|
||||
}
|
||||
snew.Metric = model.Metric(
|
||||
relabel.Process(model.LabelSet(snew.Metric), s.conf.WriteRelabelConfigs...))
|
||||
|
||||
if snew.Metric == nil {
|
||||
return nil
|
||||
}
|
||||
s.queue.Append(&snew)
|
||||
return nil
|
||||
}
|
||||
|
||||
// NeedsThrottling implements storage.SampleAppender. It will always return
|
||||
// false as a remote storage drops samples on the floor if backlogging instead
|
||||
// of asking for throttling.
|
||||
func (s *ReloadableStorage) NeedsThrottling() bool {
|
||||
return false
|
||||
}
|
Loading…
Reference in a new issue