Remove documentation/examples/remote_storage

Signed-off-by: Julien Pivotto <roidelapluie@inuits.eu>
This commit is contained in:
Julien Pivotto 2022-02-02 11:44:25 +01:00
parent 201aa3a964
commit d819219dd4
13 changed files with 0 additions and 1547 deletions

View file

@ -1,24 +0,0 @@
## Remote Write Adapter Example
This is a simple example of how to write a server to
receive samples from the remote storage output.
To use it:
```
go build
./example_write_adapter
```
...and then add the following to your `prometheus.yml`:
```yaml
remote_write:
- url: "http://localhost:1234/receive"
```
Then start Prometheus:
```
./prometheus
```

View file

@ -1,56 +0,0 @@
// Copyright 2016 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package main
import (
"fmt"
"log"
"net/http"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/storage/remote"
)
func main() {
http.HandleFunc("/receive", func(w http.ResponseWriter, r *http.Request) {
req, err := remote.DecodeWriteRequest(r.Body)
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
for _, ts := range req.Timeseries {
m := make(model.Metric, len(ts.Labels))
for _, l := range ts.Labels {
m[model.LabelName(l.Name)] = model.LabelValue(l.Value)
}
fmt.Println(m)
for _, s := range ts.Samples {
fmt.Printf("\tSample: %f %d\n", s.Value, s.Timestamp)
}
for _, e := range ts.Exemplars {
m := make(model.Metric, len(e.Labels))
for _, l := range e.Labels {
m[model.LabelName(l.Name)] = model.LabelValue(l.Value)
}
fmt.Printf("\tExemplar: %+v %f %d\n", m, e.Value, e.Timestamp)
}
}
})
log.Fatal(http.ListenAndServe(":1234", nil))
}

View file

@ -1,55 +0,0 @@
# Remote storage adapter
This is a write adapter that receives samples via Prometheus's remote write
protocol and stores them in Graphite, InfluxDB, or OpenTSDB. It is meant as a
replacement for the built-in specific remote storage implementations that have
been removed from Prometheus.
For InfluxDB, this binary is also a read adapter that supports reading back
data through Prometheus via Prometheus's remote read protocol.
## Building
```
go build
```
## Running
Graphite example:
```
./remote_storage_adapter --graphite-address=localhost:8080
```
OpenTSDB example:
```
./remote_storage_adapter --opentsdb-url=http://localhost:8081/
```
InfluxDB example:
```
./remote_storage_adapter --influxdb-url=http://localhost:8086/ --influxdb.database=prometheus --influxdb.retention-policy=autogen
```
To show all flags:
```
./remote_storage_adapter -h
```
## Configuring Prometheus
To configure Prometheus to send samples to this binary, add the following to your `prometheus.yml`:
```yaml
# Remote write configuration (for Graphite, OpenTSDB, or InfluxDB).
remote_write:
- url: "http://localhost:9201/write"
# Remote read configuration (for InfluxDB only at the moment).
remote_read:
- url: "http://localhost:9201/read"
```

View file

@ -1,109 +0,0 @@
// Copyright 2015 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package graphite
import (
"bytes"
"fmt"
"math"
"net"
"sort"
"time"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/prometheus/common/model"
)
// Client allows sending batches of Prometheus samples to Graphite.
type Client struct {
logger log.Logger
address string
transport string
timeout time.Duration
prefix string
}
// NewClient creates a new Client.
func NewClient(logger log.Logger, address, transport string, timeout time.Duration, prefix string) *Client {
if logger == nil {
logger = log.NewNopLogger()
}
return &Client{
logger: logger,
address: address,
transport: transport,
timeout: timeout,
prefix: prefix,
}
}
func pathFromMetric(m model.Metric, prefix string) string {
var buffer bytes.Buffer
buffer.WriteString(prefix)
buffer.WriteString(escape(m[model.MetricNameLabel]))
// We want to sort the labels.
labels := make(model.LabelNames, 0, len(m))
for l := range m {
labels = append(labels, l)
}
sort.Sort(labels)
// For each label, in order, add ".<label>.<value>".
for _, l := range labels {
v := m[l]
if l == model.MetricNameLabel || len(l) == 0 {
continue
}
// Since we use '.' instead of '=' to separate label and values
// it means that we can't have an '.' in the metric name. Fortunately
// this is prohibited in prometheus metrics.
buffer.WriteString(fmt.Sprintf(
".%s.%s", string(l), escape(v)))
}
return buffer.String()
}
// Write sends a batch of samples to Graphite.
func (c *Client) Write(samples model.Samples) error {
conn, err := net.DialTimeout(c.transport, c.address, c.timeout)
if err != nil {
return err
}
defer conn.Close()
var buf bytes.Buffer
for _, s := range samples {
k := pathFromMetric(s.Metric, c.prefix)
t := float64(s.Timestamp.UnixNano()) / 1e9
v := float64(s.Value)
if math.IsNaN(v) || math.IsInf(v, 0) {
level.Debug(c.logger).Log("msg", "Cannot send value to Graphite, skipping sample", "value", v, "sample", s)
continue
}
fmt.Fprintf(&buf, "%s %f %f\n", k, v, t)
}
_, err = conn.Write(buf.Bytes())
return err
}
// Name identifies the client as a Graphite client.
func (c Client) Name() string {
return "graphite"
}

View file

@ -1,50 +0,0 @@
// Copyright 2015 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package graphite
import (
"testing"
"github.com/prometheus/common/model"
"github.com/stretchr/testify/require"
)
var metric = model.Metric{
model.MetricNameLabel: "test:metric",
"testlabel": "test:value",
"many_chars": "abc!ABC:012-3!45ö67~89./(){},=.\"\\",
}
func TestEscape(t *testing.T) {
// Can we correctly keep and escape valid chars.
value := "abzABZ019(){},'\"\\"
expected := "abzABZ019\\(\\)\\{\\}\\,\\'\\\"\\\\"
actual := escape(model.LabelValue(value))
require.Equal(t, expected, actual)
// Test percent-encoding.
value = "é/|_;:%."
expected = "%C3%A9%2F|_;:%25%2E"
actual = escape(model.LabelValue(value))
require.Equal(t, expected, actual)
}
func TestPathFromMetric(t *testing.T) {
expected := ("prefix." +
"test:metric" +
".many_chars.abc!ABC:012-3!45%C3%B667~89%2E%2F\\(\\)\\{\\}\\,%3D%2E\\\"\\\\" +
".testlabel.test:value")
actual := pathFromMetric(metric, "prefix.")
require.Equal(t, expected, actual)
}

View file

@ -1,103 +0,0 @@
// Copyright 2015 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package graphite
import (
"bytes"
"fmt"
"strings"
"github.com/prometheus/common/model"
)
const (
// From https://github.com/graphite-project/graphite-web/blob/master/webapp/graphite/render/grammar.py#L83
symbols = "(){},=.'\"\\"
printables = ("0123456789abcdefghijklmnopqrstuvwxyz" +
"ABCDEFGHIJKLMNOPQRSTUVWXYZ" +
"!\"#$%&\\'()*+,-./:;<=>?@[\\]^_`{|}~")
)
// Graphite doesn't support tags, so label names and values must be
// encoded into the metric path. The list of characters that are usable
// with Graphite is rather fuzzy. One 'source of truth' might be the grammar
// used to parse requests in the webapp:
// https://github.com/graphite-project/graphite-web/blob/master/webapp/graphite/render/grammar.py#L83
// The list of valid symbols is defined as:
// legal = printables - symbols + escaped(symbols)
//
// The default storage backend for Graphite (whisper) stores data
// in filenames, so we also need to use only valid filename characters.
// Fortunately on UNIX only '/' isn't, and Windows is completely unsupported
// by Graphite: http://graphite.readthedocs.org/en/latest/install.html#windows-users
// escape escapes a model.LabelValue into runes allowed in Graphite. The runes
// allowed in Graphite are all single-byte. This function encodes the arbitrary
// byte sequence found in this TagValue in way very similar to the traditional
// percent-encoding (https://en.wikipedia.org/wiki/Percent-encoding):
//
// - The string that underlies TagValue is scanned byte by byte.
//
// - If a byte represents a legal Graphite rune with the exception of '%', '/',
// '=' and '.', that byte is directly copied to the resulting byte slice.
// % is used for percent-encoding of other bytes.
// / is not usable in filenames.
// = is used when generating the path to associate values to labels.
// . already means something for Graphite and thus can't be used in a value.
//
// - If the byte is any of (){},=.'"\, then a '\' will be prepended to it. We
// do not percent-encode them since they are explicitly usable in this
// way in Graphite.
//
// - All other bytes are replaced by '%' followed by two bytes containing the
// uppercase ASCII representation of their hexadecimal value.
//
// This encoding allows to save arbitrary Go strings in Graphite. That's
// required because Prometheus label values can contain anything. Using
// percent encoding makes it easy to unescape, even in javascript.
//
// Examples:
//
// "foo-bar-42" -> "foo-bar-42"
//
// "foo_bar%42" -> "foo_bar%2542"
//
// "http://example.org:8080" -> "http:%2F%2Fexample%2Eorg:8080"
//
// "Björn's email: bjoern@soundcloud.com" ->
// "Bj%C3%B6rn's%20email:%20bjoern%40soundcloud.com"
//
// "日" -> "%E6%97%A5"
func escape(tv model.LabelValue) string {
length := len(tv)
result := bytes.NewBuffer(make([]byte, 0, length))
for i := 0; i < length; i++ {
b := tv[i]
switch {
// . is reserved by graphite, % is used to escape other bytes.
case b == '.' || b == '%' || b == '/' || b == '=':
fmt.Fprintf(result, "%%%X", b)
// These symbols are ok only if backslash escaped.
case strings.IndexByte(symbols, b) != -1:
result.WriteString("\\" + string(b))
// These are all fine.
case strings.IndexByte(printables, b) != -1:
result.WriteByte(b)
// Defaults to percent-encoding.
default:
fmt.Fprintf(result, "%%%X", b)
}
}
return result.String()
}

View file

@ -1,323 +0,0 @@
// Copyright 2015 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package influxdb
import (
"encoding/json"
"fmt"
"math"
"os"
"strings"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
influx "github.com/influxdata/influxdb/client/v2"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/prompb"
)
// Client allows sending batches of Prometheus samples to InfluxDB.
type Client struct {
logger log.Logger
client influx.Client
database string
retentionPolicy string
ignoredSamples prometheus.Counter
}
// NewClient creates a new Client.
func NewClient(logger log.Logger, conf influx.HTTPConfig, db, rp string) *Client {
c, err := influx.NewHTTPClient(conf)
// Currently influx.NewClient() *should* never return an error.
if err != nil {
level.Error(logger).Log("err", err)
os.Exit(1)
}
if logger == nil {
logger = log.NewNopLogger()
}
return &Client{
logger: logger,
client: c,
database: db,
retentionPolicy: rp,
ignoredSamples: prometheus.NewCounter(
prometheus.CounterOpts{
Name: "prometheus_influxdb_ignored_samples_total",
Help: "The total number of samples not sent to InfluxDB due to unsupported float values (Inf, -Inf, NaN).",
},
),
}
}
// tagsFromMetric extracts InfluxDB tags from a Prometheus metric.
func tagsFromMetric(m model.Metric) map[string]string {
tags := make(map[string]string, len(m)-1)
for l, v := range m {
if l != model.MetricNameLabel {
tags[string(l)] = string(v)
}
}
return tags
}
// Write sends a batch of samples to InfluxDB via its HTTP API.
func (c *Client) Write(samples model.Samples) error {
points := make([]*influx.Point, 0, len(samples))
for _, s := range samples {
v := float64(s.Value)
if math.IsNaN(v) || math.IsInf(v, 0) {
level.Debug(c.logger).Log("msg", "Cannot send to InfluxDB, skipping sample", "value", v, "sample", s)
c.ignoredSamples.Inc()
continue
}
p, err := influx.NewPoint(
string(s.Metric[model.MetricNameLabel]),
tagsFromMetric(s.Metric),
map[string]interface{}{"value": v},
s.Timestamp.Time(),
)
if err != nil {
return err
}
points = append(points, p)
}
bps, err := influx.NewBatchPoints(influx.BatchPointsConfig{
Precision: "ms",
Database: c.database,
RetentionPolicy: c.retentionPolicy,
})
if err != nil {
return err
}
bps.AddPoints(points)
return c.client.Write(bps)
}
func (c *Client) Read(req *prompb.ReadRequest) (*prompb.ReadResponse, error) {
labelsToSeries := map[string]*prompb.TimeSeries{}
for _, q := range req.Queries {
command, err := c.buildCommand(q)
if err != nil {
return nil, err
}
query := influx.NewQuery(command, c.database, "ms")
resp, err := c.client.Query(query)
if err != nil {
return nil, err
}
if resp.Err != "" {
return nil, errors.New(resp.Err)
}
if err = mergeResult(labelsToSeries, resp.Results); err != nil {
return nil, err
}
}
resp := prompb.ReadResponse{
Results: []*prompb.QueryResult{
{Timeseries: make([]*prompb.TimeSeries, 0, len(labelsToSeries))},
},
}
for _, ts := range labelsToSeries {
resp.Results[0].Timeseries = append(resp.Results[0].Timeseries, ts)
}
return &resp, nil
}
func (c *Client) buildCommand(q *prompb.Query) (string, error) {
matchers := make([]string, 0, len(q.Matchers))
// If we don't find a metric name matcher, query all metrics
// (InfluxDB measurements) by default.
from := "FROM /.+/"
for _, m := range q.Matchers {
if m.Name == model.MetricNameLabel {
switch m.Type {
case prompb.LabelMatcher_EQ:
from = fmt.Sprintf("FROM %q.%q", c.retentionPolicy, m.Value)
case prompb.LabelMatcher_RE:
from = fmt.Sprintf("FROM %q./^%s$/", c.retentionPolicy, escapeSlashes(m.Value))
default:
// TODO: Figure out how to support these efficiently.
return "", errors.New("non-equal or regex-non-equal matchers are not supported on the metric name yet")
}
continue
}
switch m.Type {
case prompb.LabelMatcher_EQ:
matchers = append(matchers, fmt.Sprintf("%q = '%s'", m.Name, escapeSingleQuotes(m.Value)))
case prompb.LabelMatcher_NEQ:
matchers = append(matchers, fmt.Sprintf("%q != '%s'", m.Name, escapeSingleQuotes(m.Value)))
case prompb.LabelMatcher_RE:
matchers = append(matchers, fmt.Sprintf("%q =~ /^%s$/", m.Name, escapeSlashes(m.Value)))
case prompb.LabelMatcher_NRE:
matchers = append(matchers, fmt.Sprintf("%q !~ /^%s$/", m.Name, escapeSlashes(m.Value)))
default:
return "", errors.Errorf("unknown match type %v", m.Type)
}
}
matchers = append(matchers, fmt.Sprintf("time >= %vms", q.StartTimestampMs))
matchers = append(matchers, fmt.Sprintf("time <= %vms", q.EndTimestampMs))
return fmt.Sprintf("SELECT value %s WHERE %v GROUP BY *", from, strings.Join(matchers, " AND ")), nil
}
func escapeSingleQuotes(str string) string {
return strings.Replace(str, `'`, `\'`, -1)
}
func escapeSlashes(str string) string {
return strings.Replace(str, `/`, `\/`, -1)
}
func mergeResult(labelsToSeries map[string]*prompb.TimeSeries, results []influx.Result) error {
for _, r := range results {
for _, s := range r.Series {
k := concatLabels(s.Tags)
ts, ok := labelsToSeries[k]
if !ok {
ts = &prompb.TimeSeries{
Labels: tagsToLabelPairs(s.Name, s.Tags),
}
labelsToSeries[k] = ts
}
samples, err := valuesToSamples(s.Values)
if err != nil {
return err
}
ts.Samples = mergeSamples(ts.Samples, samples)
}
}
return nil
}
func concatLabels(labels map[string]string) string {
// 0xff cannot occur in valid UTF-8 sequences, so use it
// as a separator here.
separator := "\xff"
pairs := make([]string, 0, len(labels))
for k, v := range labels {
pairs = append(pairs, k+separator+v)
}
return strings.Join(pairs, separator)
}
func tagsToLabelPairs(name string, tags map[string]string) []prompb.Label {
pairs := make([]prompb.Label, 0, len(tags))
for k, v := range tags {
if v == "" {
// If we select metrics with different sets of labels names,
// InfluxDB returns *all* possible tag names on all returned
// series, with empty tag values on series where they don't
// apply. In Prometheus, an empty label value is equivalent
// to a non-existent label, so we just skip empty ones here
// to make the result correct.
continue
}
pairs = append(pairs, prompb.Label{
Name: k,
Value: v,
})
}
pairs = append(pairs, prompb.Label{
Name: model.MetricNameLabel,
Value: name,
})
return pairs
}
func valuesToSamples(values [][]interface{}) ([]prompb.Sample, error) {
samples := make([]prompb.Sample, 0, len(values))
for _, v := range values {
if len(v) != 2 {
return nil, errors.Errorf("bad sample tuple length, expected [<timestamp>, <value>], got %v", v)
}
jsonTimestamp, ok := v[0].(json.Number)
if !ok {
return nil, errors.Errorf("bad timestamp: %v", v[0])
}
jsonValue, ok := v[1].(json.Number)
if !ok {
return nil, errors.Errorf("bad sample value: %v", v[1])
}
timestamp, err := jsonTimestamp.Int64()
if err != nil {
return nil, errors.Wrap(err, "unable to convert sample timestamp to int64")
}
value, err := jsonValue.Float64()
if err != nil {
return nil, errors.Wrap(err, "unable to convert sample value to float64")
}
samples = append(samples, prompb.Sample{
Timestamp: timestamp,
Value: value,
})
}
return samples, nil
}
// mergeSamples merges two lists of sample pairs and removes duplicate
// timestamps. It assumes that both lists are sorted by timestamp.
func mergeSamples(a, b []prompb.Sample) []prompb.Sample {
result := make([]prompb.Sample, 0, len(a)+len(b))
i, j := 0, 0
for i < len(a) && j < len(b) {
if a[i].Timestamp < b[j].Timestamp {
result = append(result, a[i])
i++
} else if a[i].Timestamp > b[j].Timestamp {
result = append(result, b[j])
j++
} else {
result = append(result, a[i])
i++
j++
}
}
result = append(result, a[i:]...)
result = append(result, b[j:]...)
return result
}
// Name identifies the client as an InfluxDB client.
func (c Client) Name() string {
return "influxdb"
}
// Describe implements prometheus.Collector.
func (c *Client) Describe(ch chan<- *prometheus.Desc) {
ch <- c.ignoredSamples.Desc()
}
// Collect implements prometheus.Collector.
func (c *Client) Collect(ch chan<- prometheus.Metric) {
ch <- c.ignoredSamples
}

View file

@ -1,98 +0,0 @@
// Copyright 2015 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package influxdb
import (
"io/ioutil"
"math"
"net/http"
"net/http/httptest"
"net/url"
"testing"
"time"
influx "github.com/influxdata/influxdb/client/v2"
"github.com/prometheus/common/model"
"github.com/stretchr/testify/require"
)
func TestClient(t *testing.T) {
samples := model.Samples{
{
Metric: model.Metric{
model.MetricNameLabel: "testmetric",
"test_label": "test_label_value1",
},
Timestamp: model.Time(123456789123),
Value: 1.23,
},
{
Metric: model.Metric{
model.MetricNameLabel: "testmetric",
"test_label": "test_label_value2",
},
Timestamp: model.Time(123456789123),
Value: 5.1234,
},
{
Metric: model.Metric{
model.MetricNameLabel: "nan_value",
},
Timestamp: model.Time(123456789123),
Value: model.SampleValue(math.NaN()),
},
{
Metric: model.Metric{
model.MetricNameLabel: "pos_inf_value",
},
Timestamp: model.Time(123456789123),
Value: model.SampleValue(math.Inf(1)),
},
{
Metric: model.Metric{
model.MetricNameLabel: "neg_inf_value",
},
Timestamp: model.Time(123456789123),
Value: model.SampleValue(math.Inf(-1)),
},
}
expectedBody := `testmetric,test_label=test_label_value1 value=1.23 123456789123
testmetric,test_label=test_label_value2 value=5.1234 123456789123
`
server := httptest.NewServer(http.HandlerFunc(
func(w http.ResponseWriter, r *http.Request) {
require.Equal(t, "POST", r.Method, "Unexpected method.")
require.Equal(t, "/write", r.URL.Path, "Unexpected path.")
b, err := ioutil.ReadAll(r.Body)
require.NoError(t, err, "Error reading body.")
require.Equal(t, expectedBody, string(b), "Unexpected request body.")
},
))
defer server.Close()
serverURL, err := url.Parse(server.URL)
require.NoError(t, err, "Unable to parse server URL.")
conf := influx.HTTPConfig{
Addr: serverURL.String(),
Username: "testuser",
Password: "testpass",
Timeout: time.Minute,
}
c := NewClient(nil, conf, "test_db", "default")
err = c.Write(samples)
require.NoError(t, err, "Error sending samples.")
}

View file

@ -1,320 +0,0 @@
// Copyright 2017 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// The main package for the Prometheus server executable.
package main
import (
"fmt"
"io/ioutil"
"net/http"
_ "net/http/pprof"
"net/url"
"os"
"path/filepath"
"sync"
"time"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/gogo/protobuf/proto"
"github.com/golang/snappy"
influx "github.com/influxdata/influxdb/client/v2"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/prometheus/common/model"
"github.com/prometheus/common/promlog"
"github.com/prometheus/common/promlog/flag"
"gopkg.in/alecthomas/kingpin.v2"
"github.com/prometheus/prometheus/documentation/examples/remote_storage/remote_storage_adapter/graphite"
"github.com/prometheus/prometheus/documentation/examples/remote_storage/remote_storage_adapter/influxdb"
"github.com/prometheus/prometheus/documentation/examples/remote_storage/remote_storage_adapter/opentsdb"
"github.com/prometheus/prometheus/prompb"
"github.com/prometheus/prometheus/storage/remote"
)
type config struct {
graphiteAddress string
graphiteTransport string
graphitePrefix string
opentsdbURL string
influxdbURL string
influxdbRetentionPolicy string
influxdbUsername string
influxdbDatabase string
influxdbPassword string
remoteTimeout time.Duration
listenAddr string
telemetryPath string
promlogConfig promlog.Config
}
var (
receivedSamples = prometheus.NewCounter(
prometheus.CounterOpts{
Name: "received_samples_total",
Help: "Total number of received samples.",
},
)
sentSamples = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "sent_samples_total",
Help: "Total number of processed samples sent to remote storage.",
},
[]string{"remote"},
)
failedSamples = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "failed_samples_total",
Help: "Total number of processed samples which failed on send to remote storage.",
},
[]string{"remote"},
)
sentBatchDuration = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Name: "sent_batch_duration_seconds",
Help: "Duration of sample batch send calls to the remote storage.",
Buckets: prometheus.DefBuckets,
},
[]string{"remote"},
)
)
func init() {
prometheus.MustRegister(receivedSamples)
prometheus.MustRegister(sentSamples)
prometheus.MustRegister(failedSamples)
prometheus.MustRegister(sentBatchDuration)
}
func main() {
cfg := parseFlags()
http.Handle(cfg.telemetryPath, promhttp.Handler())
logger := promlog.New(&cfg.promlogConfig)
writers, readers := buildClients(logger, cfg)
if err := serve(logger, cfg.listenAddr, writers, readers); err != nil {
level.Error(logger).Log("msg", "Failed to listen", "addr", cfg.listenAddr, "err", err)
os.Exit(1)
}
}
func parseFlags() *config {
a := kingpin.New(filepath.Base(os.Args[0]), "Remote storage adapter")
a.HelpFlag.Short('h')
cfg := &config{
influxdbPassword: os.Getenv("INFLUXDB_PW"),
promlogConfig: promlog.Config{},
}
a.Flag("graphite-address", "The host:port of the Graphite server to send samples to. None, if empty.").
Default("").StringVar(&cfg.graphiteAddress)
a.Flag("graphite-transport", "Transport protocol to use to communicate with Graphite. 'tcp', if empty.").
Default("tcp").StringVar(&cfg.graphiteTransport)
a.Flag("graphite-prefix", "The prefix to prepend to all metrics exported to Graphite. None, if empty.").
Default("").StringVar(&cfg.graphitePrefix)
a.Flag("opentsdb-url", "The URL of the remote OpenTSDB server to send samples to. None, if empty.").
Default("").StringVar(&cfg.opentsdbURL)
a.Flag("influxdb-url", "The URL of the remote InfluxDB server to send samples to. None, if empty.").
Default("").StringVar(&cfg.influxdbURL)
a.Flag("influxdb.retention-policy", "The InfluxDB retention policy to use.").
Default("autogen").StringVar(&cfg.influxdbRetentionPolicy)
a.Flag("influxdb.username", "The username to use when sending samples to InfluxDB. The corresponding password must be provided via the INFLUXDB_PW environment variable.").
Default("").StringVar(&cfg.influxdbUsername)
a.Flag("influxdb.database", "The name of the database to use for storing samples in InfluxDB.").
Default("prometheus").StringVar(&cfg.influxdbDatabase)
a.Flag("send-timeout", "The timeout to use when sending samples to the remote storage.").
Default("30s").DurationVar(&cfg.remoteTimeout)
a.Flag("web.listen-address", "Address to listen on for web endpoints.").
Default(":9201").StringVar(&cfg.listenAddr)
a.Flag("web.telemetry-path", "Address to listen on for web endpoints.").
Default("/metrics").StringVar(&cfg.telemetryPath)
flag.AddFlags(a, &cfg.promlogConfig)
_, err := a.Parse(os.Args[1:])
if err != nil {
fmt.Fprintln(os.Stderr, errors.Wrapf(err, "Error parsing commandline arguments"))
a.Usage(os.Args[1:])
os.Exit(2)
}
return cfg
}
type writer interface {
Write(samples model.Samples) error
Name() string
}
type reader interface {
Read(req *prompb.ReadRequest) (*prompb.ReadResponse, error)
Name() string
}
func buildClients(logger log.Logger, cfg *config) ([]writer, []reader) {
var writers []writer
var readers []reader
if cfg.graphiteAddress != "" {
c := graphite.NewClient(
log.With(logger, "storage", "Graphite"),
cfg.graphiteAddress, cfg.graphiteTransport,
cfg.remoteTimeout, cfg.graphitePrefix)
writers = append(writers, c)
}
if cfg.opentsdbURL != "" {
c := opentsdb.NewClient(
log.With(logger, "storage", "OpenTSDB"),
cfg.opentsdbURL,
cfg.remoteTimeout,
)
writers = append(writers, c)
}
if cfg.influxdbURL != "" {
url, err := url.Parse(cfg.influxdbURL)
if err != nil {
level.Error(logger).Log("msg", "Failed to parse InfluxDB URL", "url", cfg.influxdbURL, "err", err)
os.Exit(1)
}
conf := influx.HTTPConfig{
Addr: url.String(),
Username: cfg.influxdbUsername,
Password: cfg.influxdbPassword,
Timeout: cfg.remoteTimeout,
}
c := influxdb.NewClient(
log.With(logger, "storage", "InfluxDB"),
conf,
cfg.influxdbDatabase,
cfg.influxdbRetentionPolicy,
)
prometheus.MustRegister(c)
writers = append(writers, c)
readers = append(readers, c)
}
level.Info(logger).Log("msg", "Starting up...")
return writers, readers
}
func serve(logger log.Logger, addr string, writers []writer, readers []reader) error {
http.HandleFunc("/write", func(w http.ResponseWriter, r *http.Request) {
req, err := remote.DecodeWriteRequest(r.Body)
if err != nil {
level.Error(logger).Log("msg", "Read error", "err", err.Error())
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
samples := protoToSamples(req)
receivedSamples.Add(float64(len(samples)))
var wg sync.WaitGroup
for _, w := range writers {
wg.Add(1)
go func(rw writer) {
sendSamples(logger, rw, samples)
wg.Done()
}(w)
}
wg.Wait()
})
http.HandleFunc("/read", func(w http.ResponseWriter, r *http.Request) {
compressed, err := ioutil.ReadAll(r.Body)
if err != nil {
level.Error(logger).Log("msg", "Read error", "err", err.Error())
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
reqBuf, err := snappy.Decode(nil, compressed)
if err != nil {
level.Error(logger).Log("msg", "Decode error", "err", err.Error())
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
var req prompb.ReadRequest
if err := proto.Unmarshal(reqBuf, &req); err != nil {
level.Error(logger).Log("msg", "Unmarshal error", "err", err.Error())
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
// TODO: Support reading from more than one reader and merging the results.
if len(readers) != 1 {
http.Error(w, fmt.Sprintf("expected exactly one reader, found %d readers", len(readers)), http.StatusInternalServerError)
return
}
reader := readers[0]
var resp *prompb.ReadResponse
resp, err = reader.Read(&req)
if err != nil {
level.Warn(logger).Log("msg", "Error executing query", "query", req, "storage", reader.Name(), "err", err)
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
data, err := proto.Marshal(resp)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
w.Header().Set("Content-Type", "application/x-protobuf")
w.Header().Set("Content-Encoding", "snappy")
compressed = snappy.Encode(nil, data)
if _, err := w.Write(compressed); err != nil {
level.Warn(logger).Log("msg", "Error writing response", "storage", reader.Name(), "err", err)
}
})
return http.ListenAndServe(addr, nil)
}
func protoToSamples(req *prompb.WriteRequest) model.Samples {
var samples model.Samples
for _, ts := range req.Timeseries {
metric := make(model.Metric, len(ts.Labels))
for _, l := range ts.Labels {
metric[model.LabelName(l.Name)] = model.LabelValue(l.Value)
}
for _, s := range ts.Samples {
samples = append(samples, &model.Sample{
Metric: metric,
Value: model.SampleValue(s.Value),
Timestamp: model.Time(s.Timestamp),
})
}
}
return samples
}
func sendSamples(logger log.Logger, w writer, samples model.Samples) {
begin := time.Now()
err := w.Write(samples)
duration := time.Since(begin).Seconds()
if err != nil {
level.Warn(logger).Log("msg", "Error sending samples to remote storage", "err", err, "storage", w.Name(), "num_samples", len(samples))
failedSamples.WithLabelValues(w.Name()).Add(float64(len(samples)))
}
sentSamples.WithLabelValues(w.Name()).Add(float64(len(samples)))
sentBatchDuration.WithLabelValues(w.Name()).Observe(duration)
}

View file

@ -1,146 +0,0 @@
// Copyright 2013 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package opentsdb
import (
"bytes"
"context"
"encoding/json"
"io"
"io/ioutil"
"math"
"net/http"
"net/url"
"time"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/pkg/errors"
"github.com/prometheus/common/model"
)
const (
putEndpoint = "/api/put"
contentTypeJSON = "application/json"
)
// Client allows sending batches of Prometheus samples to OpenTSDB.
type Client struct {
logger log.Logger
url string
timeout time.Duration
}
// NewClient creates a new Client.
func NewClient(logger log.Logger, url string, timeout time.Duration) *Client {
return &Client{
logger: logger,
url: url,
timeout: timeout,
}
}
// StoreSamplesRequest is used for building a JSON request for storing samples
// via the OpenTSDB.
type StoreSamplesRequest struct {
Metric TagValue `json:"metric"`
Timestamp int64 `json:"timestamp"`
Value float64 `json:"value"`
Tags map[string]TagValue `json:"tags"`
}
// tagsFromMetric translates Prometheus metric into OpenTSDB tags.
func tagsFromMetric(m model.Metric) map[string]TagValue {
tags := make(map[string]TagValue, len(m)-1)
for l, v := range m {
if l == model.MetricNameLabel {
continue
}
tags[string(l)] = TagValue(v)
}
return tags
}
// Write sends a batch of samples to OpenTSDB via its HTTP API.
func (c *Client) Write(samples model.Samples) error {
reqs := make([]StoreSamplesRequest, 0, len(samples))
for _, s := range samples {
v := float64(s.Value)
if math.IsNaN(v) || math.IsInf(v, 0) {
level.Debug(c.logger).Log("msg", "Cannot send value to OpenTSDB, skipping sample", "value", v, "sample", s)
continue
}
metric := TagValue(s.Metric[model.MetricNameLabel])
reqs = append(reqs, StoreSamplesRequest{
Metric: metric,
Timestamp: s.Timestamp.Unix(),
Value: v,
Tags: tagsFromMetric(s.Metric),
})
}
u, err := url.Parse(c.url)
if err != nil {
return err
}
u.Path = putEndpoint
buf, err := json.Marshal(reqs)
if err != nil {
return err
}
ctx, cancel := context.WithTimeout(context.Background(), c.timeout)
defer cancel()
req, err := http.NewRequest("POST", u.String(), bytes.NewBuffer(buf))
if err != nil {
return err
}
req.Header.Set("Content-Type", contentTypeJSON)
resp, err := http.DefaultClient.Do(req.WithContext(ctx))
if err != nil {
return err
}
defer func() {
io.Copy(ioutil.Discard, resp.Body)
resp.Body.Close()
}()
// API returns status code 204 for successful writes.
// http://opentsdb.net/docs/build/html/api_http/put.html
if resp.StatusCode == http.StatusNoContent {
return nil
}
// API returns status code 400 on error, encoding error details in the
// response content in JSON.
buf, err = ioutil.ReadAll(resp.Body)
if err != nil {
return err
}
var r map[string]int
if err := json.Unmarshal(buf, &r); err != nil {
return err
}
return errors.Errorf("failed to write %d samples to OpenTSDB, %d succeeded", r["failed"], r["success"])
}
// Name identifies the client as an OpenTSDB client.
func (c Client) Name() string {
return "opentsdb"
}

View file

@ -1,56 +0,0 @@
// Copyright 2013 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package opentsdb
import (
"encoding/json"
"testing"
"github.com/prometheus/common/model"
"github.com/stretchr/testify/require"
)
var metric = model.Metric{
model.MetricNameLabel: "test:metric",
"testlabel": "test:value",
"many_chars": "abc!ABC:012-3!45ö67~89./",
}
func TestTagsFromMetric(t *testing.T) {
expected := map[string]TagValue{
"testlabel": TagValue("test:value"),
"many_chars": TagValue("abc!ABC:012-3!45ö67~89./"),
}
actual := tagsFromMetric(metric)
require.Equal(t, expected, actual)
}
func TestMarshalStoreSamplesRequest(t *testing.T) {
request := StoreSamplesRequest{
Metric: TagValue("test:metric"),
Timestamp: 4711,
Value: 3.1415,
Tags: tagsFromMetric(metric),
}
expectedJSON := []byte(`{"metric":"test_.metric","timestamp":4711,"value":3.1415,"tags":{"many_chars":"abc_21ABC_.012-3_2145_C3_B667_7E89./","testlabel":"test_.value"}}`)
resultingJSON, err := json.Marshal(request)
require.NoError(t, err, "Marshal(request) resulted in err.")
require.Equal(t, expectedJSON, resultingJSON)
var unmarshaledRequest StoreSamplesRequest
err = json.Unmarshal(expectedJSON, &unmarshaledRequest)
require.NoError(t, err, "Unmarshal(expectedJSON, &unmarshaledRequest) resulted in err.")
require.Equal(t, request, unmarshaledRequest)
}

View file

@ -1,158 +0,0 @@
// Copyright 2016 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package opentsdb
import (
"bytes"
"fmt"
"github.com/pkg/errors"
"github.com/prometheus/common/model"
)
// TagValue is a model.LabelValue that implements json.Marshaler and
// json.Unmarshaler. These implementations avoid characters illegal in
// OpenTSDB. See the MarshalJSON for details. TagValue is used for the values of
// OpenTSDB tags as well as for OpenTSDB metric names.
type TagValue model.LabelValue
// MarshalJSON marshals this TagValue into JSON that only contains runes allowed
// in OpenTSDB. It implements json.Marshaler. The runes allowed in OpenTSDB are
// all single-byte. This function encodes the arbitrary byte sequence found in
// this TagValue in the following way:
//
// - The string that underlies TagValue is scanned byte by byte.
//
// - If a byte represents a legal OpenTSDB rune with the exception of '_', that
// byte is directly copied to the resulting JSON byte slice.
//
// - If '_' is encountered, it is replaced by '__'.
//
// - If ':' is encountered, it is replaced by '_.'.
//
// - All other bytes are replaced by '_' followed by two bytes containing the
// uppercase ASCII representation of their hexadecimal value.
//
// This encoding allows to save arbitrary Go strings in OpenTSDB. That's
// required because Prometheus label values can contain anything, and even
// Prometheus metric names may (and often do) contain ':' (which is disallowed
// in OpenTSDB strings). The encoding uses '_' as an escape character and
// renders a ':' more or less recognizable as '_.'
//
// Examples:
//
// "foo-bar-42" -> "foo-bar-42"
//
// "foo_bar_42" -> "foo__bar__42"
//
// "http://example.org:8080" -> "http_.//example.org_.8080"
//
// "Björn's email: bjoern@soundcloud.com" ->
// "Bj_C3_B6rn_27s_20email_._20bjoern_40soundcloud.com"
//
// "日" -> "_E6_97_A5"
func (tv TagValue) MarshalJSON() ([]byte, error) {
length := len(tv)
// Need at least two more bytes than in tv.
result := bytes.NewBuffer(make([]byte, 0, length+2))
result.WriteByte('"')
for i := 0; i < length; i++ {
b := tv[i]
switch {
case (b >= '-' && b <= '9') || // '-', '.', '/', 0-9
(b >= 'A' && b <= 'Z') ||
(b >= 'a' && b <= 'z'):
result.WriteByte(b)
case b == '_':
result.WriteString("__")
case b == ':':
result.WriteString("_.")
default:
result.WriteString(fmt.Sprintf("_%X", b))
}
}
result.WriteByte('"')
return result.Bytes(), nil
}
// UnmarshalJSON unmarshals JSON strings coming from OpenTSDB into Go strings
// by applying the inverse of what is described for the MarshalJSON method.
func (tv *TagValue) UnmarshalJSON(json []byte) error {
escapeLevel := 0 // How many bytes after '_'.
var parsedByte byte
// Might need fewer bytes, but let's avoid realloc.
result := bytes.NewBuffer(make([]byte, 0, len(json)-2))
for i, b := range json {
if i == 0 {
if b != '"' {
return errors.Errorf("expected '\"', got %q", b)
}
continue
}
if i == len(json)-1 {
if b != '"' {
return errors.Errorf("expected '\"', got %q", b)
}
break
}
switch escapeLevel {
case 0:
if b == '_' {
escapeLevel = 1
continue
}
result.WriteByte(b)
case 1:
switch {
case b == '_':
result.WriteByte('_')
escapeLevel = 0
case b == '.':
result.WriteByte(':')
escapeLevel = 0
case b >= '0' && b <= '9':
parsedByte = (b - 48) << 4
escapeLevel = 2
case b >= 'A' && b <= 'F': // A-F
parsedByte = (b - 55) << 4
escapeLevel = 2
default:
return errors.Errorf(
"illegal escape sequence at byte %d (%c)",
i, b,
)
}
case 2:
switch {
case b >= '0' && b <= '9':
parsedByte += b - 48
case b >= 'A' && b <= 'F': // A-F
parsedByte += b - 55
default:
return errors.Errorf(
"illegal escape sequence at byte %d (%c)",
i, b,
)
}
result.WriteByte(parsedByte)
escapeLevel = 0
default:
panic("unexpected escape level")
}
}
*tv = TagValue(result.String())
return nil
}

View file

@ -1,49 +0,0 @@
// Copyright 2016 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package opentsdb
import (
"encoding/json"
"testing"
"github.com/stretchr/testify/require"
)
var stringtests = []struct {
tv TagValue
json []byte
}{
{TagValue("foo-bar-42"), []byte(`"foo-bar-42"`)},
{TagValue("foo_bar_42"), []byte(`"foo__bar__42"`)},
{TagValue("http://example.org:8080"), []byte(`"http_.//example.org_.8080"`)},
{TagValue("Björn's email: bjoern@soundcloud.com"), []byte(`"Bj_C3_B6rn_27s_20email_._20bjoern_40soundcloud.com"`)},
{TagValue("日"), []byte(`"_E6_97_A5"`)},
}
func TestTagValueMarshaling(t *testing.T) {
for i, tt := range stringtests {
got, err := json.Marshal(tt.tv)
require.NoError(t, err, "%d. Marshal(%q) returned error.", i, tt.tv)
require.Equal(t, tt.json, got, "%d. Marshal(%q) not equal.", i, tt.tv)
}
}
func TestTagValueUnMarshaling(t *testing.T) {
for i, tt := range stringtests {
var tv TagValue
err := json.Unmarshal(tt.json, &tv)
require.NoError(t, err, "%d. Unmarshal(%q, &str) returned error.", i, tt.json)
require.Equal(t, tt.tv, tv, "%d. Unmarshal(%q, &str) not equal.", i, tt.json)
}
}