From e722a3923fdf072abdf0be3c67414b846e926ab7 Mon Sep 17 00:00:00 2001 From: Zhang Zhanpeng Date: Sat, 11 Jan 2025 22:00:39 +0800 Subject: [PATCH] upgrade influxdb client to v2 Signed-off-by: Zhang Zhanpeng --- documentation/examples/remote_storage/go.mod | 5 +- documentation/examples/remote_storage/go.sum | 14 +- .../remote_storage_adapter/README.md | 2 +- .../remote_storage_adapter/influxdb/client.go | 219 ++++++++++-------- .../influxdb/client_test.go | 13 +- .../remote_storage_adapter/main.go | 53 ++--- 6 files changed, 160 insertions(+), 146 deletions(-) diff --git a/documentation/examples/remote_storage/go.mod b/documentation/examples/remote_storage/go.mod index 643fbc901c..da8ef55332 100644 --- a/documentation/examples/remote_storage/go.mod +++ b/documentation/examples/remote_storage/go.mod @@ -6,7 +6,7 @@ require ( github.com/alecthomas/kingpin/v2 v2.4.0 github.com/gogo/protobuf v1.3.2 github.com/golang/snappy v0.0.4 - github.com/influxdata/influxdb v1.11.8 + github.com/influxdata/influxdb-client-go/v2 v2.14.0 github.com/prometheus/client_golang v1.20.5 github.com/prometheus/common v0.61.0 github.com/prometheus/prometheus v1.99.0 @@ -19,6 +19,7 @@ require ( github.com/Azure/azure-sdk-for-go/sdk/internal v1.8.0 // indirect github.com/AzureAD/microsoft-authentication-library-for-go v1.2.2 // indirect github.com/alecthomas/units v0.0.0-20231202071711-9a357b53e9c9 // indirect + github.com/apapsch/go-jsonmerge/v2 v2.0.0 // indirect github.com/aws/aws-sdk-go v1.55.5 // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect @@ -32,6 +33,7 @@ require ( github.com/golang-jwt/jwt/v5 v5.2.1 // indirect github.com/google/uuid v1.6.0 // indirect github.com/grafana/regexp v0.0.0-20240518133315-a468a5bfb3bc // indirect + github.com/influxdata/line-protocol v0.0.0-20200327222509-2487e7298839 // indirect github.com/jmespath/go-jmespath v0.4.0 // indirect github.com/jpillora/backoff v1.0.0 // indirect github.com/json-iterator/go v1.1.12 // indirect @@ -41,6 +43,7 @@ require ( github.com/modern-go/reflect2 v1.0.2 // indirect github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f // indirect + github.com/oapi-codegen/runtime v1.0.0 // indirect github.com/pkg/browser v0.0.0-20240102092130-5ac0b6a4141c // indirect github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect github.com/prometheus/client_model v0.6.1 // indirect diff --git a/documentation/examples/remote_storage/go.sum b/documentation/examples/remote_storage/go.sum index 23ac21c12c..53077c877a 100644 --- a/documentation/examples/remote_storage/go.sum +++ b/documentation/examples/remote_storage/go.sum @@ -14,6 +14,7 @@ github.com/Code-Hex/go-generics-cache v1.5.1 h1:6vhZGc5M7Y/YD8cIUcY8kcuQLB4cHR7U github.com/Code-Hex/go-generics-cache v1.5.1/go.mod h1:qxcC9kRVrct9rHeiYpFWSoW1vxyillCVzX13KZG8dl4= github.com/Microsoft/go-winio v0.6.1 h1:9/kr64B9VUZrLm5YYwbGtUJnMgqWVOdUAXu6Migciow= github.com/Microsoft/go-winio v0.6.1/go.mod h1:LRdKpFKfdobln8UmuiYcKPot9D2v6svN5+sAH+4kjUM= +github.com/RaveNoX/go-jsoncommentstrip v1.0.0/go.mod h1:78ihd09MekBnJnxpICcwzCMzGrKSKYe4AqU6PDYYpjk= github.com/alecthomas/kingpin/v2 v2.4.0 h1:f48lwail6p8zpO1bC4TxtqACaGqHYA22qkHjHpqDjYY= github.com/alecthomas/kingpin/v2 v2.4.0/go.mod h1:0gyi0zQnjuFk8xrkNKamJoyUo382HRL7ATRpFZCw6tE= github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= @@ -23,6 +24,8 @@ github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRF github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho= github.com/alecthomas/units v0.0.0-20231202071711-9a357b53e9c9 h1:ez/4by2iGztzR4L0zgAOR8lTQK9VlyBVVd7G4omaOQs= github.com/alecthomas/units v0.0.0-20231202071711-9a357b53e9c9/go.mod h1:OMCwj8VM1Kc9e19TLln2VL61YJF0x1XFtfdL4JdbSyE= +github.com/apapsch/go-jsonmerge/v2 v2.0.0 h1:axGnT1gRIfimI7gJifB699GoE/oq+F2MU7Dml6nw9rQ= +github.com/apapsch/go-jsonmerge/v2 v2.0.0/go.mod h1:lvDnEdqiQrp0O42VQGgmlKpxL1AP2+08jFMw88y4klk= github.com/armon/go-metrics v0.4.1 h1:hR91U9KYmb6bLBYLQjyM+3j+rcd/UhE+G78SFnF8gJA= github.com/armon/go-metrics v0.4.1/go.mod h1:E6amYzXo6aW1tqzoZGT755KkbgrJsSdpwZ+3JqfkOG4= github.com/aws/aws-sdk-go v1.38.35/go.mod h1:hcU610XS61/+aQV88ixoOzUoG7v3b31pl2zKMmprdro= @@ -34,6 +37,7 @@ github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24 github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= +github.com/bmatcuk/doublestar v1.1.1/go.mod h1:UD6OnuiIn0yFxxA2le/rnRU1G4RaI4UvFv1sNto9p6w= github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= @@ -166,8 +170,10 @@ github.com/hetznercloud/hcloud-go/v2 v2.9.0 h1:s0N6R7Zoi2DPfMtUF5o9VeUBzTtHVY6MI github.com/hetznercloud/hcloud-go/v2 v2.9.0/go.mod h1:qtW/TuU7Bs16ibXl/ktJarWqU2LwHr7eGlwoilHxtgg= github.com/imdario/mergo v0.3.16 h1:wwQJbIsHYGMUyLSPrEq1CT16AhnhNJQ51+4fdHUnCl4= github.com/imdario/mergo v0.3.16/go.mod h1:WBLT9ZmE3lPoWsEzCh9LPo3TiwVN+ZKEjmz+hD27ysY= -github.com/influxdata/influxdb v1.11.8 h1:lX8MJDfk91O7nqzzonQkjk87gOeQy9V/Xp3gpELhG1s= -github.com/influxdata/influxdb v1.11.8/go.mod h1:zRTAuk/Ie/V1LGxJUv8jfDmfv+ypz22lxfhc1MxC3rI= +github.com/influxdata/influxdb-client-go/v2 v2.14.0 h1:AjbBfJuq+QoaXNcrova8smSjwJdUHnwvfjMF71M1iI4= +github.com/influxdata/influxdb-client-go/v2 v2.14.0/go.mod h1:Ahpm3QXKMJslpXl3IftVLVezreAUtBOTZssDrjZEFHI= +github.com/influxdata/line-protocol v0.0.0-20200327222509-2487e7298839 h1:W9WBk7wlPfJLvMCdtV4zPulc4uCPrlywQOmbFOhgQNU= +github.com/influxdata/line-protocol v0.0.0-20200327222509-2487e7298839/go.mod h1:xaLFMmpvUxqXtVkUJfg9QmT88cDaCJ3ZKgdZ78oO8Qo= github.com/ionos-cloud/sdk-go/v6 v6.1.11 h1:J/uRN4UWO3wCyGOeDdMKv8LWRzKu6UIkLEaes38Kzh8= github.com/ionos-cloud/sdk-go/v6 v6.1.11/go.mod h1:EzEgRIDxBELvfoa/uBN0kOQaqovLjUWEB7iW4/Q+t4k= github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9YPoQUg= @@ -183,6 +189,7 @@ github.com/json-iterator/go v1.1.10/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/ github.com/json-iterator/go v1.1.11/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4= github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= +github.com/juju/gnuflag v0.0.0-20171113085948-2ce1bb71843d/go.mod h1:2PavIy+JPciBPrBUjwbNvtwB6RQlve+hkpll6QSNmOE= github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w= github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8IZAc4RVcycCCAKdM= github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= @@ -232,6 +239,8 @@ github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8m github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f h1:KUppIJq7/+SVif2QVs3tOP0zanoHgBEVAwHxUSIzRqU= github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= +github.com/oapi-codegen/runtime v1.0.0 h1:P4rqFX5fMFWqRzY9M/3YF9+aPSPPB06IzP2P7oOxrWo= +github.com/oapi-codegen/runtime v1.0.0/go.mod h1:LmCUMQuPB4M/nLXilQXhHw+BLZdDb18B34OO356yJ/A= github.com/oklog/ulid v1.3.1 h1:EGfNDEx6MqHz8B3uNV6QAib1UR2Lm97sHi3ocA6ESJ4= github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn4U= github.com/opencontainers/go-digest v1.0.0 h1:apOUWs51W5PlhuyGyz9FCeeBIOUDA/6nW8Oi/yOhh5U= @@ -285,6 +294,7 @@ github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6Mwd github.com/sirupsen/logrus v1.6.0/go.mod h1:7uNnSEd1DgxDLC74fIahvMZmmYsHGZGEOFrfsX/uA88= github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA= github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= +github.com/spkg/bom v0.0.0-20160624110644-59b7046e48ad/go.mod h1:qLr4V1qq6nMqFKkMo8ZTx3f+BZEkzsRUY10Xsm2mwU0= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.5.2 h1:xuMeJ0Sdp5ZMRXx/aWO6RZxdr3beISkG5/G/aIRr3pY= diff --git a/documentation/examples/remote_storage/remote_storage_adapter/README.md b/documentation/examples/remote_storage/remote_storage_adapter/README.md index ce0735bff5..5f37c4d9eb 100644 --- a/documentation/examples/remote_storage/remote_storage_adapter/README.md +++ b/documentation/examples/remote_storage/remote_storage_adapter/README.md @@ -31,7 +31,7 @@ OpenTSDB example: InfluxDB example: ``` -./remote_storage_adapter --influxdb-url=http://localhost:8086/ --influxdb.database=prometheus --influxdb.retention-policy=autogen +INFLUXDB_AUTH_TOKEN= ./remote_storage_adapter --influxdb-url=http://localhost:8086/ --influxdb.organization= --influxdb.bucket= ``` To show all flags: diff --git a/documentation/examples/remote_storage/remote_storage_adapter/influxdb/client.go b/documentation/examples/remote_storage/remote_storage_adapter/influxdb/client.go index 6ae40f8173..12dc1732a5 100644 --- a/documentation/examples/remote_storage/remote_storage_adapter/influxdb/client.go +++ b/documentation/examples/remote_storage/remote_storage_adapter/influxdb/client.go @@ -14,15 +14,17 @@ package influxdb import ( - "encoding/json" + "context" "errors" "fmt" "log/slog" "math" - "os" "strings" + "time" - influx "github.com/influxdata/influxdb/client/v2" + influx "github.com/influxdata/influxdb-client-go/v2" + "github.com/influxdata/influxdb-client-go/v2/api/query" + "github.com/influxdata/influxdb-client-go/v2/api/write" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/model" "github.com/prometheus/common/promslog" @@ -34,36 +36,39 @@ import ( type Client struct { logger *slog.Logger - client influx.Client - database string - retentionPolicy string - ignoredSamples prometheus.Counter + client influx.Client + organization string + bucket string + ignoredSamples prometheus.Counter + + context context.Context } // NewClient creates a new Client. -func NewClient(logger *slog.Logger, conf influx.HTTPConfig, db, rp string) *Client { - c, err := influx.NewHTTPClient(conf) - // Currently influx.NewClient() *should* never return an error. - if err != nil { - logger.Error("Error creating influx HTTP client", "err", err) - os.Exit(1) - } +func NewClient(logger *slog.Logger, url, authToken, organization, bucket string) *Client { + c := influx.NewClientWithOptions( + url, + authToken, + influx.DefaultOptions().SetPrecision(time.Millisecond), + ) if logger == nil { logger = promslog.NewNopLogger() } return &Client{ - logger: logger, - client: c, - database: db, - retentionPolicy: rp, + logger: logger, + client: c, + organization: organization, + bucket: bucket, ignoredSamples: prometheus.NewCounter( prometheus.CounterOpts{ Name: "prometheus_influxdb_ignored_samples_total", Help: "The total number of samples not sent to InfluxDB due to unsupported float values (Inf, -Inf, NaN).", }, ), + + context: context.Background(), } } @@ -80,39 +85,41 @@ func tagsFromMetric(m model.Metric) map[string]string { // Write sends a batch of samples to InfluxDB via its HTTP API. func (c *Client) Write(samples model.Samples) error { - points := make([]*influx.Point, 0, len(samples)) + points := make([]*write.Point, 0, len(samples)) for _, s := range samples { v := float64(s.Value) if math.IsNaN(v) || math.IsInf(v, 0) { - c.logger.Debug("Cannot send to InfluxDB, skipping sample", "value", v, "sample", s) + c.logger.Debug("Cannot send to InfluxDB, skipping sample", "value", v, "sample", s) c.ignoredSamples.Inc() continue } - p, err := influx.NewPoint( + p := influx.NewPoint( string(s.Metric[model.MetricNameLabel]), tagsFromMetric(s.Metric), map[string]interface{}{"value": v}, s.Timestamp.Time(), ) - if err != nil { - return err - } points = append(points, p) } - bps, err := influx.NewBatchPoints(influx.BatchPointsConfig{ - Precision: "ms", - Database: c.database, - RetentionPolicy: c.retentionPolicy, - }) - if err != nil { + writeAPI := c.client.WriteAPIBlocking(c.organization, c.bucket) + writeAPI.EnableBatching() // default 5_000 + var err error + for _, p := range points { + if err = writeAPI.WritePoint(c.context, p); err != nil { + return err + } + } + if err = writeAPI.Flush(c.context); err != nil { return err } - bps.AddPoints(points) - return c.client.Write(bps) + + return nil } func (c *Client) Read(req *prompb.ReadRequest) (*prompb.ReadResponse, error) { + queryAPI := c.client.QueryAPI(c.organization) + labelsToSeries := map[string]*prompb.TimeSeries{} for _, q := range req.Queries { command, err := c.buildCommand(q) @@ -120,17 +127,18 @@ func (c *Client) Read(req *prompb.ReadRequest) (*prompb.ReadResponse, error) { return nil, err } - query := influx.NewQuery(command, c.database, "ms") - resp, err := c.client.Query(query) + resp, err := queryAPI.Query(c.context, command) if err != nil { return nil, err } - if resp.Err != "" { - return nil, errors.New(resp.Err) + if resp.Err() != nil { + return nil, resp.Err() } - if err = mergeResult(labelsToSeries, resp.Results); err != nil { - return nil, err + for resp.Next() { + if err = mergeResult(labelsToSeries, resp.Record()); err != nil { + return nil, err + } } } @@ -146,17 +154,20 @@ func (c *Client) Read(req *prompb.ReadRequest) (*prompb.ReadResponse, error) { } func (c *Client) buildCommand(q *prompb.Query) (string, error) { - matchers := make([]string, 0, len(q.Matchers)) + rangeInNs := fmt.Sprintf("start: time(v: %v), stop: time(v: %v)", q.StartTimestampMs*time.Millisecond.Nanoseconds(), q.EndTimestampMs*time.Millisecond.Nanoseconds()) + // If we don't find a metric name matcher, query all metrics // (InfluxDB measurements) by default. - from := "FROM /.+/" + measurement := `r._measurement` + matchers := make([]string, 0, len(q.Matchers)) + var joinedMatchers string for _, m := range q.Matchers { if m.Name == model.MetricNameLabel { switch m.Type { case prompb.LabelMatcher_EQ: - from = fmt.Sprintf("FROM %q.%q", c.retentionPolicy, m.Value) + measurement += fmt.Sprintf(" == \"%s\"", m.Value) case prompb.LabelMatcher_RE: - from = fmt.Sprintf("FROM %q./^%s$/", c.retentionPolicy, escapeSlashes(m.Value)) + measurement += fmt.Sprintf(" =~ /%s/", escapeSlashes(m.Value)) default: // TODO: Figure out how to support these efficiently. return "", errors.New("non-equal or regex-non-equal matchers are not supported on the metric name yet") @@ -166,21 +177,28 @@ func (c *Client) buildCommand(q *prompb.Query) (string, error) { switch m.Type { case prompb.LabelMatcher_EQ: - matchers = append(matchers, fmt.Sprintf("%q = '%s'", m.Name, escapeSingleQuotes(m.Value))) + matchers = append(matchers, fmt.Sprintf("r.%s == \"%s\"", m.Name, escapeSingleQuotes(m.Value))) case prompb.LabelMatcher_NEQ: - matchers = append(matchers, fmt.Sprintf("%q != '%s'", m.Name, escapeSingleQuotes(m.Value))) + matchers = append(matchers, fmt.Sprintf("r.%s != \"%s\"", m.Name, escapeSingleQuotes(m.Value))) case prompb.LabelMatcher_RE: - matchers = append(matchers, fmt.Sprintf("%q =~ /^%s$/", m.Name, escapeSlashes(m.Value))) + matchers = append(matchers, fmt.Sprintf("r.%s =~ /%s/", m.Name, escapeSingleQuotes(m.Value))) case prompb.LabelMatcher_NRE: - matchers = append(matchers, fmt.Sprintf("%q !~ /^%s$/", m.Name, escapeSlashes(m.Value))) + matchers = append(matchers, fmt.Sprintf("r.%s !~ /%s/", m.Name, escapeSingleQuotes(m.Value))) default: return "", fmt.Errorf("unknown match type %v", m.Type) } } - matchers = append(matchers, fmt.Sprintf("time >= %vms", q.StartTimestampMs)) - matchers = append(matchers, fmt.Sprintf("time <= %vms", q.EndTimestampMs)) + if len(matchers) > 0 { + joinedMatchers = fmt.Sprintf(" and %s", strings.Join(matchers, " and ")) + } - return fmt.Sprintf("SELECT value %s WHERE %v GROUP BY *", from, strings.Join(matchers, " AND ")), nil + // _measurement must be retained, otherwise "invalid metric name" shall be thrown + command := fmt.Sprintf( + "from(bucket: \"%s\") |> range(%s) |> filter(fn: (r) => %s%s)", + c.bucket, rangeInNs, measurement, joinedMatchers, + ) + + return command, nil } func escapeSingleQuotes(str string) string { @@ -191,44 +209,60 @@ func escapeSlashes(str string) string { return strings.ReplaceAll(str, `/`, `\/`) } -func mergeResult(labelsToSeries map[string]*prompb.TimeSeries, results []influx.Result) error { - for _, r := range results { - for _, s := range r.Series { - k := concatLabels(s.Tags) - ts, ok := labelsToSeries[k] - if !ok { - ts = &prompb.TimeSeries{ - Labels: tagsToLabelPairs(s.Name, s.Tags), - } - labelsToSeries[k] = ts - } +func mergeResult(labelsToSeries map[string]*prompb.TimeSeries, record *query.FluxRecord) error { + builtIntime := record.Time() + builtInvalue := record.Value() + builtInMeasurement := record.Measurement() + labels := record.Values() - samples, err := valuesToSamples(s.Values) - if err != nil { - return err - } + filterOutBuiltInLabels(labels) - ts.Samples = mergeSamples(ts.Samples, samples) + k := concatLabels(labels) + + ts, ok := labelsToSeries[k] + if !ok { + ts = &prompb.TimeSeries{ + Labels: tagsToLabelPairs(builtInMeasurement, labels), } + labelsToSeries[k] = ts } + + sample, err := valuesToSamples(builtIntime, builtInvalue) + if err != nil { + return err + } + + ts.Samples = mergeSamples(ts.Samples, []prompb.Sample{sample}) + return nil } -func concatLabels(labels map[string]string) string { +func filterOutBuiltInLabels(labels map[string]interface{}) { + delete(labels, "table") + delete(labels, "_start") + delete(labels, "_stop") + delete(labels, "_time") + delete(labels, "_value") + delete(labels, "_field") + delete(labels, "result") + delete(labels, "_measurement") +} + +func concatLabels(labels map[string]interface{}) string { // 0xff cannot occur in valid UTF-8 sequences, so use it // as a separator here. separator := "\xff" pairs := make([]string, 0, len(labels)) for k, v := range labels { - pairs = append(pairs, k+separator+v) + pairs = append(pairs, fmt.Sprintf("%s%s%v", k, separator, v)) } return strings.Join(pairs, separator) } -func tagsToLabelPairs(name string, tags map[string]string) []prompb.Label { +func tagsToLabelPairs(name string, tags map[string]interface{}) []prompb.Label { pairs := make([]prompb.Label, 0, len(tags)) for k, v := range tags { - if v == "" { + if v == nil { // If we select metrics with different sets of labels names, // InfluxDB returns *all* possible tag names on all returned // series, with empty tag values on series where they don't @@ -239,7 +273,7 @@ func tagsToLabelPairs(name string, tags map[string]string) []prompb.Label { } pairs = append(pairs, prompb.Label{ Name: k, - Value: v, + Value: fmt.Sprintf("%v", v), }) } pairs = append(pairs, prompb.Label{ @@ -249,39 +283,22 @@ func tagsToLabelPairs(name string, tags map[string]string) []prompb.Label { return pairs } -func valuesToSamples(values [][]interface{}) ([]prompb.Sample, error) { - samples := make([]prompb.Sample, 0, len(values)) - for _, v := range values { - if len(v) != 2 { - return nil, fmt.Errorf("bad sample tuple length, expected [, ], got %v", v) +func valuesToSamples(timestamp time.Time, value interface{}) (prompb.Sample, error) { + var valueFloat64 float64 + var valueInt64 int64 + var ok bool + if valueFloat64, ok = value.(float64); !ok { + if valueInt64, ok = value.(int64); ok { + valueFloat64 = float64(valueInt64) + } else { + return prompb.Sample{}, fmt.Errorf("unable to convert sample value to float64: %v", value) } - - jsonTimestamp, ok := v[0].(json.Number) - if !ok { - return nil, fmt.Errorf("bad timestamp: %v", v[0]) - } - - jsonValue, ok := v[1].(json.Number) - if !ok { - return nil, fmt.Errorf("bad sample value: %v", v[1]) - } - - timestamp, err := jsonTimestamp.Int64() - if err != nil { - return nil, fmt.Errorf("unable to convert sample timestamp to int64: %w", err) - } - - value, err := jsonValue.Float64() - if err != nil { - return nil, fmt.Errorf("unable to convert sample value to float64: %w", err) - } - - samples = append(samples, prompb.Sample{ - Timestamp: timestamp, - Value: value, - }) } - return samples, nil + + return prompb.Sample{ + Timestamp: timestamp.UnixMilli(), + Value: valueFloat64, + }, nil } // mergeSamples merges two lists of sample pairs and removes duplicate diff --git a/documentation/examples/remote_storage/remote_storage_adapter/influxdb/client_test.go b/documentation/examples/remote_storage/remote_storage_adapter/influxdb/client_test.go index a738c01dcd..3f756a9be4 100644 --- a/documentation/examples/remote_storage/remote_storage_adapter/influxdb/client_test.go +++ b/documentation/examples/remote_storage/remote_storage_adapter/influxdb/client_test.go @@ -20,9 +20,7 @@ import ( "net/http/httptest" "net/url" "testing" - "time" - influx "github.com/influxdata/influxdb/client/v2" "github.com/prometheus/common/model" "github.com/stretchr/testify/require" ) @@ -69,13 +67,14 @@ func TestClient(t *testing.T) { } expectedBody := `testmetric,test_label=test_label_value1 value=1.23 123456789123 + testmetric,test_label=test_label_value2 value=5.1234 123456789123 ` server := httptest.NewServer(http.HandlerFunc( func(w http.ResponseWriter, r *http.Request) { require.Equal(t, http.MethodPost, r.Method, "Unexpected method.") - require.Equal(t, "/write", r.URL.Path, "Unexpected path.") + require.Equal(t, "/api/v2/write", r.URL.Path, "Unexpected path.") b, err := io.ReadAll(r.Body) require.NoError(t, err, "Error reading body.") require.Equal(t, expectedBody, string(b), "Unexpected request body.") @@ -86,13 +85,7 @@ testmetric,test_label=test_label_value2 value=5.1234 123456789123 serverURL, err := url.Parse(server.URL) require.NoError(t, err, "Unable to parse server URL.") - conf := influx.HTTPConfig{ - Addr: serverURL.String(), - Username: "testuser", - Password: "testpass", - Timeout: time.Minute, - } - c := NewClient(nil, conf, "test_db", "default") + c := NewClient(nil, serverURL.String(), "auth_token", "test_organization", "test_bucket") err = c.Write(samples) require.NoError(t, err, "Error sending samples.") } diff --git a/documentation/examples/remote_storage/remote_storage_adapter/main.go b/documentation/examples/remote_storage/remote_storage_adapter/main.go index 7f62990d2e..9ea9b8e5f9 100644 --- a/documentation/examples/remote_storage/remote_storage_adapter/main.go +++ b/documentation/examples/remote_storage/remote_storage_adapter/main.go @@ -29,7 +29,6 @@ import ( "github.com/alecthomas/kingpin/v2" "github.com/gogo/protobuf/proto" "github.com/golang/snappy" - influx "github.com/influxdata/influxdb/client/v2" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promhttp" "github.com/prometheus/common/model" @@ -44,19 +43,18 @@ import ( ) type config struct { - graphiteAddress string - graphiteTransport string - graphitePrefix string - opentsdbURL string - influxdbURL string - influxdbRetentionPolicy string - influxdbUsername string - influxdbDatabase string - influxdbPassword string - remoteTimeout time.Duration - listenAddr string - telemetryPath string - promslogConfig promslog.Config + graphiteAddress string + graphiteTransport string + graphitePrefix string + opentsdbURL string + influxdbURL string + bucket string + organization string + influxdbAuthToken string + remoteTimeout time.Duration + listenAddr string + telemetryPath string + promslogConfig promslog.Config } var ( @@ -118,8 +116,8 @@ func parseFlags() *config { a.HelpFlag.Short('h') cfg := &config{ - influxdbPassword: os.Getenv("INFLUXDB_PW"), - promslogConfig: promslog.Config{}, + influxdbAuthToken: os.Getenv("INFLUXDB_AUTH_TOKEN"), + promslogConfig: promslog.Config{}, } a.Flag("graphite-address", "The host:port of the Graphite server to send samples to. None, if empty."). @@ -132,12 +130,10 @@ func parseFlags() *config { Default("").StringVar(&cfg.opentsdbURL) a.Flag("influxdb-url", "The URL of the remote InfluxDB server to send samples to. None, if empty."). Default("").StringVar(&cfg.influxdbURL) - a.Flag("influxdb.retention-policy", "The InfluxDB retention policy to use."). - Default("autogen").StringVar(&cfg.influxdbRetentionPolicy) - a.Flag("influxdb.username", "The username to use when sending samples to InfluxDB. The corresponding password must be provided via the INFLUXDB_PW environment variable."). - Default("").StringVar(&cfg.influxdbUsername) - a.Flag("influxdb.database", "The name of the database to use for storing samples in InfluxDB."). - Default("prometheus").StringVar(&cfg.influxdbDatabase) + a.Flag("influxdb.bucket", "The InfluxDB bucket to use."). + Default("").StringVar(&cfg.bucket) + a.Flag("influxdb.organization", "The name of the organization to use for storing samples in InfluxDB."). + Default("").StringVar(&cfg.organization) a.Flag("send-timeout", "The timeout to use when sending samples to the remote storage."). Default("30s").DurationVar(&cfg.remoteTimeout) a.Flag("web.listen-address", "Address to listen on for web endpoints."). @@ -191,17 +187,12 @@ func buildClients(logger *slog.Logger, cfg *config) ([]writer, []reader) { logger.Error("Failed to parse InfluxDB URL", "url", cfg.influxdbURL, "err", err) os.Exit(1) } - conf := influx.HTTPConfig{ - Addr: url.String(), - Username: cfg.influxdbUsername, - Password: cfg.influxdbPassword, - Timeout: cfg.remoteTimeout, - } c := influxdb.NewClient( logger.With("storage", "InfluxDB"), - conf, - cfg.influxdbDatabase, - cfg.influxdbRetentionPolicy, + url.String(), + cfg.influxdbAuthToken, + cfg.organization, + cfg.bucket, ) prometheus.MustRegister(c) writers = append(writers, c)