diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS
index b75a7896e..b291cf2bc 100644
--- a/.github/CODEOWNERS
+++ b/.github/CODEOWNERS
@@ -1,6 +1,7 @@
 /web/ui @juliusv
 /web/ui/module @juliusv @nexucis
 /storage/remote @csmarchbanks @cstyan @bwplotka @tomwilkie
+/storage/remote/otlptranslator @gouthamve @jesusvazquez
 /discovery/kubernetes @brancz
 /tsdb @jesusvazquez
 /promql @roidelapluie
diff --git a/.golangci.yml b/.golangci.yml
index 4a6daae59..5aa01e48a 100644
--- a/.golangci.yml
+++ b/.golangci.yml
@@ -3,6 +3,9 @@ run:
   skip-files:
     # Skip autogenerated files.
    - ^.*\.(pb|y)\.go$
+  skip-dirs:
+    # Copied from opentelemetry-collector-contrib; see storage/remote/otlptranslator/README.md.
+    - storage/remote/otlptranslator/prometheusremotewrite

 output:
   sort-results: true
diff --git a/cmd/prometheus/main.go b/cmd/prometheus/main.go
index debc0d3f9..d48898b94 100644
--- a/cmd/prometheus/main.go
+++ b/cmd/prometheus/main.go
@@ -170,6 +170,9 @@ func (c *flagConfig) setFeatureListOptions(logger log.Logger) error {
 		case "remote-write-receiver":
 			c.web.EnableRemoteWriteReceiver = true
 			level.Warn(logger).Log("msg", "Remote write receiver enabled via feature flag remote-write-receiver. This is DEPRECATED. Use --web.enable-remote-write-receiver.")
+		case "otlp-write-receiver":
+			c.web.EnableOTLPWriteReceiver = true
+			level.Info(logger).Log("msg", "Experimental OTLP write receiver enabled")
 		case "expand-external-labels":
 			c.enableExpandExternalLabels = true
 			level.Info(logger).Log("msg", "Experimental expand-external-labels enabled")
@@ -420,7 +423,7 @@ func main() {
 	a.Flag("scrape.discovery-reload-interval", "Interval used by scrape manager to throttle target groups updates.").
 		Hidden().Default("5s").SetValue(&cfg.scrape.DiscoveryReloadInterval)

-	a.Flag("enable-feature", "Comma separated feature names to enable. Valid options: agent, exemplar-storage, expand-external-labels, memory-snapshot-on-shutdown, promql-at-modifier, promql-negative-offset, promql-per-step-stats, remote-write-receiver (DEPRECATED), extra-scrape-metrics, new-service-discovery-manager, auto-gomaxprocs, no-default-scrape-port, native-histograms. See https://prometheus.io/docs/prometheus/latest/feature_flags/ for more details.").
+	a.Flag("enable-feature", "Comma separated feature names to enable. Valid options: agent, exemplar-storage, expand-external-labels, memory-snapshot-on-shutdown, promql-at-modifier, promql-negative-offset, promql-per-step-stats, remote-write-receiver (DEPRECATED), extra-scrape-metrics, new-service-discovery-manager, auto-gomaxprocs, no-default-scrape-port, native-histograms, otlp-write-receiver. See https://prometheus.io/docs/prometheus/latest/feature_flags/ for more details.").
 		Default("").StringsVar(&cfg.featureList)

 	promlogflag.AddFlags(a, &cfg.promlogConfig)
diff --git a/docs/command-line/prometheus.md b/docs/command-line/prometheus.md
index 46f286e00..78ec205f2 100644
--- a/docs/command-line/prometheus.md
+++ b/docs/command-line/prometheus.md
@@ -52,7 +52,7 @@ The Prometheus monitoring server
 | --query.timeout | Maximum time a query may take before being aborted. Use with server mode only. | `2m` |
 | --query.max-concurrency | Maximum number of queries executed concurrently. Use with server mode only. | `20` |
 | --query.max-samples | Maximum number of samples a single query can load into memory. Note that queries will fail if they try to load more samples than this into memory, so this also limits the number of samples a query can return. Use with server mode only. | `50000000` |
-| --enable-feature | Comma separated feature names to enable.
Valid options: agent, exemplar-storage, expand-external-labels, memory-snapshot-on-shutdown, promql-at-modifier, promql-negative-offset, promql-per-step-stats, remote-write-receiver (DEPRECATED), extra-scrape-metrics, new-service-discovery-manager, auto-gomaxprocs, no-default-scrape-port, native-histograms. See https://prometheus.io/docs/prometheus/latest/feature_flags/ for more details. | | +| --enable-feature | Comma separated feature names to enable. Valid options: agent, exemplar-storage, expand-external-labels, memory-snapshot-on-shutdown, promql-at-modifier, promql-negative-offset, promql-per-step-stats, remote-write-receiver (DEPRECATED), extra-scrape-metrics, new-service-discovery-manager, auto-gomaxprocs, no-default-scrape-port, native-histograms, otlp-write-receiver. See https://prometheus.io/docs/prometheus/latest/feature_flags/ for more details. | | | --log.level | Only log messages with the given severity or above. One of: [debug, info, warn, error] | `info` | | --log.format | Output format of log messages. One of: [logfmt, json] | `logfmt` | diff --git a/go.mod b/go.mod index 82e53f185..1ddf77184 100644 --- a/go.mod +++ b/go.mod @@ -54,6 +54,8 @@ require ( github.com/shurcooL/httpfs v0.0.0-20230704072500-f1e31cf0ba5c github.com/stretchr/testify v1.8.4 github.com/vultr/govultr/v2 v2.17.2 + go.opentelemetry.io/collector/pdata v0.66.0 + go.opentelemetry.io/collector/semconv v0.81.0 go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.42.0 go.opentelemetry.io/otel v1.16.0 go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.16.0 @@ -64,6 +66,7 @@ require ( go.uber.org/atomic v1.11.0 go.uber.org/automaxprocs v1.5.2 go.uber.org/goleak v1.2.1 + go.uber.org/multierr v1.8.0 golang.org/x/net v0.12.0 golang.org/x/oauth2 v0.10.0 golang.org/x/sync v0.3.0 diff --git a/go.sum b/go.sum index 4a5496485..63c60005a 100644 --- a/go.sum +++ b/go.sum @@ -448,7 +448,7 @@ github.com/hashicorp/go-uuid v1.0.0/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/b github.com/hashicorp/go-uuid v1.0.1/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro= github.com/hashicorp/go-uuid v1.0.3 h1:2gKiV6YVmrJ1i2CKKa9obLvRieoRGviZFL26PcT/Co8= github.com/hashicorp/go-version v1.2.0/go.mod h1:fltr4n8CU8Ke44wwGCBoEymUuxUHl09ZGVZPK5anwXA= -github.com/hashicorp/go-version v1.2.1 h1:zEfKbn2+PDgroKdiOzqiE8rsmLqU2uwi5PB5pBJ3TkI= +github.com/hashicorp/go-version v1.6.0 h1:feTTfFNnjP967rlCxM/I9g701jU+RN74YKx2mOkIeek= github.com/hashicorp/go.net v0.0.1/go.mod h1:hjKkEWcCURg++eb33jQU7oqQcI9XDCnUzHA0oac0k90= github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= @@ -797,6 +797,10 @@ go.opencensus.io v0.22.3/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= go.opencensus.io v0.22.4/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= go.opencensus.io v0.24.0 h1:y73uSU6J157QMP2kn2r30vwW1A2W2WFwSCGnAVxeaD0= go.opencensus.io v0.24.0/go.mod h1:vNK8G9p7aAivkbmorf4v+7Hgx+Zs0yY+0fOtgBfjQKo= +go.opentelemetry.io/collector/pdata v0.66.0 h1:UdE5U6MsDNzuiWaXdjGx2lC3ElVqWmN/hiUE8vyvSuM= +go.opentelemetry.io/collector/pdata v0.66.0/go.mod h1:pqyaznLzk21m+1KL6fwOsRryRELL+zNM0qiVSn0MbVc= +go.opentelemetry.io/collector/semconv v0.81.0 h1:lCYNNo3powDvFIaTPP2jDKIrBiV1T92NK4QgL/aHYXw= +go.opentelemetry.io/collector/semconv v0.81.0/go.mod h1:TlYPtzvsXyHOgr5eATi43qEMqwSmIziivJB2uctKswo= go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.42.0 h1:pginetY7+onl4qN1vl0xW/V/v6OBZ0vVdH+esuJgvmM= 
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.42.0/go.mod h1:XiYsayHc36K3EByOO6nbAXnAWbrUxdjUROCEeeROOH8= go.opentelemetry.io/otel v1.16.0 h1:Z7GVAX/UkAXPKsy94IU+i6thsQS4nb7LviLpnaNeW8s= @@ -820,6 +824,7 @@ go.opentelemetry.io/proto/otlp v1.0.0 h1:T0TX0tmXU8a3CbNXzEKGeU5mIVOdf0oykP+u2lI go.opentelemetry.io/proto/otlp v1.0.0/go.mod h1:Sy6pihPLfYHkr3NkUbEhGHFhINUSI/v80hjKIs5JXpM= go.uber.org/atomic v1.3.2/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= go.uber.org/atomic v1.5.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ= +go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE= go.uber.org/atomic v1.11.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0= go.uber.org/automaxprocs v1.5.2 h1:2LxUOGiR3O6tw8ui5sZa2LAaHnsviZdVOUZw4fvbnME= @@ -828,6 +833,8 @@ go.uber.org/goleak v1.2.1 h1:NBol2c7O1ZokfZ0LEU9K6Whx/KnwvepVetCUhtKja4A= go.uber.org/goleak v1.2.1/go.mod h1:qlT2yGI9QafXHhZZLxlSuNsMw3FFLxBr+tBRlmO1xH4= go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0= go.uber.org/multierr v1.3.0/go.mod h1:VgVr7evmIr6uPjLBxg28wmKNXyqE9akIJ5XnfpiKl+4= +go.uber.org/multierr v1.8.0 h1:dg6GjLku4EH+249NNmoIciG9N/jURbDG+pFlTkhzIC8= +go.uber.org/multierr v1.8.0/go.mod h1:7EAYxJLBy9rStEaz58O2t4Uvip6FSURkq8/ppBp95ak= go.uber.org/tools v0.0.0-20190618225709-2cfd321de3ee/go.mod h1:vJERXedbb3MVM5f9Ejo0C68/HhF8uaILCdgjnY+goOA= go.uber.org/zap v1.10.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q= go.uber.org/zap v1.13.0/go.mod h1:zwrFLgMcdUuIBviXEYEH1YKNaOBnKXsx2IPda5bBwHM= diff --git a/storage/remote/codec.go b/storage/remote/codec.go index 3f426204e..4927c16fd 100644 --- a/storage/remote/codec.go +++ b/storage/remote/codec.go @@ -14,6 +14,7 @@ package remote import ( + "compress/gzip" "errors" "fmt" "io" @@ -26,6 +27,7 @@ import ( "github.com/gogo/protobuf/proto" "github.com/golang/snappy" "github.com/prometheus/common/model" + "go.opentelemetry.io/collector/pdata/pmetric/pmetricotlp" "golang.org/x/exp/slices" "github.com/prometheus/prometheus/model/exemplar" @@ -38,8 +40,13 @@ import ( "github.com/prometheus/prometheus/tsdb/chunks" ) -// decodeReadLimit is the maximum size of a read request body in bytes. -const decodeReadLimit = 32 * 1024 * 1024 +const ( + // decodeReadLimit is the maximum size of a read request body in bytes. + decodeReadLimit = 32 * 1024 * 1024 + + pbContentType = "application/x-protobuf" + jsonContentType = "application/json" +) type HTTPError struct { msg string @@ -806,3 +813,56 @@ func DecodeWriteRequest(r io.Reader) (*prompb.WriteRequest, error) { return &req, nil } + +func DecodeOTLPWriteRequest(r *http.Request) (pmetricotlp.ExportRequest, error) { + contentType := r.Header.Get("Content-Type") + var decoderFunc func(buf []byte) (pmetricotlp.ExportRequest, error) + switch contentType { + case pbContentType: + decoderFunc = func(buf []byte) (pmetricotlp.ExportRequest, error) { + req := pmetricotlp.NewExportRequest() + return req, req.UnmarshalProto(buf) + } + + case jsonContentType: + decoderFunc = func(buf []byte) (pmetricotlp.ExportRequest, error) { + req := pmetricotlp.NewExportRequest() + return req, req.UnmarshalJSON(buf) + } + + default: + return pmetricotlp.NewExportRequest(), fmt.Errorf("unsupported content type: %s, supported: [%s, %s]", contentType, jsonContentType, pbContentType) + } + + reader := r.Body + // Handle compression. 
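+	// Only gzip is supported: any other non-empty Content-Encoding is
+	// rejected in the default branch below, and an empty header means the
+	// body is read as-is.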
+	switch r.Header.Get("Content-Encoding") {
+	case "gzip":
+		gr, err := gzip.NewReader(reader)
+		if err != nil {
+			return pmetricotlp.NewExportRequest(), err
+		}
+		reader = gr
+
+	case "":
+		// No compression.
+
+	default:
+		return pmetricotlp.NewExportRequest(), fmt.Errorf("unsupported compression: %s. Only \"gzip\" or no compression supported", r.Header.Get("Content-Encoding"))
+	}
+
+	body, err := io.ReadAll(reader)
+	if err != nil {
+		r.Body.Close()
+		return pmetricotlp.NewExportRequest(), err
+	}
+	if err = r.Body.Close(); err != nil {
+		return pmetricotlp.NewExportRequest(), err
+	}
+	otlpReq, err := decoderFunc(body)
+	if err != nil {
+		return pmetricotlp.NewExportRequest(), err
+	}
+
+	return otlpReq, nil
+}
diff --git a/storage/remote/otlptranslator/README.md b/storage/remote/otlptranslator/README.md
new file mode 100644
index 000000000..c2b04e5af
--- /dev/null
+++ b/storage/remote/otlptranslator/README.md
@@ -0,0 +1,23 @@
+## Copying from opentelemetry/opentelemetry-collector-contrib
+
+The files in `prometheus/` and `prometheusremotewrite/` are copied from the OpenTelemetry Project[^1].
+
+This is done instead of adding a go.mod dependency because OpenTelemetry depends on `prometheus/prometheus`, so depending on it in turn would create a cyclic dependency. This is a temporary solution; the long-term plan is to move the required packages from OpenTelemetry into `prometheus/prometheus`.
+We don't copy in `./prometheus` through this script because that package imports a collector-specific featuregate package we don't want to import. The featuregate package is currently being removed; once it is gone, we will copy that folder too.
+
+Updating the copied code is a multi-step process:
+1. Vendor the latest `prometheus/prometheus`@`main` into [`opentelemetry/opentelemetry-collector-contrib`](https://github.com/open-telemetry/opentelemetry-collector-contrib)
+1. Update the VERSION in `update-copy.sh`.
+1. Run `./update-copy.sh`.
+
+### Why copy?
+
+The packages we copy depend on the [`prompb`](https://github.com/prometheus/prometheus/blob/main/prompb) package. While that package is relatively stable, it still changes. For example, https://github.com/prometheus/prometheus/pull/11935 changed the types.
+This means that if we depended on the upstream packages directly, we would never be able to make changes like the one above. Hence we're copying the code for now.
+
+### I need to manually change these files
+
+When we do want to make changes to the types in `prompb`, we might need to edit the files directly. That is OK. Please let @gouthamve or @jesusvazquez know, so they can take care of updating the upstream code (by vendoring the changed `prometheus/prometheus` into upstream and resolving conflicts) and then run the copy script again to keep things updated.
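+
+### Using the copied packages
+
+The copied packages are imported like any other internal package. A minimal, illustrative sketch (not part of the copied code; the import alias mirrors the one used in `prometheusremotewrite/helper.go`):
+
+```go
+import (
+	"go.opentelemetry.io/collector/pdata/pmetric"
+
+	prometheustranslator "github.com/prometheus/prometheus/storage/remote/otlptranslator/prometheus"
+)
+
+// promName maps an OTLP metric to its Prometheus-compliant name. For example,
+// a monotonic sum named "system.io" with unit "By" becomes "system_io_bytes_total".
+func promName(m pmetric.Metric) string {
+	return prometheustranslator.BuildPromCompliantName(m, "")
+}
+```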
+
+[^1]: https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/pkg/translator/prometheus and https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/pkg/translator/prometheusremotewrite
\ No newline at end of file
diff --git a/storage/remote/otlptranslator/prometheus/normalize_label.go b/storage/remote/otlptranslator/prometheus/normalize_label.go
new file mode 100644
index 000000000..cfcb5652e
--- /dev/null
+++ b/storage/remote/otlptranslator/prometheus/normalize_label.go
@@ -0,0 +1,41 @@
+// Copyright The OpenTelemetry Authors
+// SPDX-License-Identifier: Apache-2.0
+
+package normalize
+
+import (
+	"strings"
+	"unicode"
+)
+
+// NormalizeLabel normalizes the specified label to follow the Prometheus label name standard.
+//
+// See rules at https://prometheus.io/docs/concepts/data_model/#metric-names-and-labels
+//
+// Labels that start with a digit are prefixed with "key_".
+//
+// An exception is made for double underscores, which are kept as-is.
+func NormalizeLabel(label string) string {
+	// Trivial case
+	if len(label) == 0 {
+		return label
+	}
+
+	// Replace all non-alphanumeric runes with underscores
+	label = strings.Map(sanitizeRune, label)
+
+	// If label starts with a number, prepend with "key_"
+	if unicode.IsDigit(rune(label[0])) {
+		label = "key_" + label
+	}
+
+	return label
+}
+
+// sanitizeRune returns '_' for anything non-alphanumeric.
+func sanitizeRune(r rune) rune {
+	if unicode.IsLetter(r) || unicode.IsDigit(r) {
+		return r
+	}
+	return '_'
+}
diff --git a/storage/remote/otlptranslator/prometheus/normalize_label_test.go b/storage/remote/otlptranslator/prometheus/normalize_label_test.go
new file mode 100644
index 000000000..7346b20f9
--- /dev/null
+++ b/storage/remote/otlptranslator/prometheus/normalize_label_test.go
@@ -0,0 +1,19 @@
+// Copyright The OpenTelemetry Authors
+// SPDX-License-Identifier: Apache-2.0
+
+package normalize
+
+import (
+	"testing"
+
+	"github.com/stretchr/testify/require"
+)
+
+func TestSanitizeDropSanitization(t *testing.T) {
+	require.Equal(t, "", NormalizeLabel(""))
+	require.Equal(t, "_test", NormalizeLabel("_test"))
+	require.Equal(t, "key_0test", NormalizeLabel("0test"))
+	require.Equal(t, "test", NormalizeLabel("test"))
+	require.Equal(t, "test__", NormalizeLabel("test_/"))
+	require.Equal(t, "__test", NormalizeLabel("__test"))
+}
diff --git a/storage/remote/otlptranslator/prometheus/normalize_name.go b/storage/remote/otlptranslator/prometheus/normalize_name.go
new file mode 100644
index 000000000..0b62a28f2
--- /dev/null
+++ b/storage/remote/otlptranslator/prometheus/normalize_name.go
@@ -0,0 +1,251 @@
+// Copyright The OpenTelemetry Authors
+// SPDX-License-Identifier: Apache-2.0
+
+package normalize
+
+import (
+	"strings"
+	"unicode"
+
+	"go.opentelemetry.io/collector/pdata/pmetric"
+)
+
+// The map to translate OTLP units to Prometheus units
+// OTLP metrics use the c/s notation as specified at https://ucum.org/ucum.html
+// (See also https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/semantic_conventions/README.md#instrument-units)
+// Prometheus best practices for units: https://prometheus.io/docs/practices/naming/#base-units
+// OpenMetrics specification for units: https://github.com/OpenObservability/OpenMetrics/blob/main/specification/OpenMetrics.md#units-and-base-units
+var unitMap = map[string]string{
+	// Time
+	"d":   "days",
+	"h":   "hours",
+	"min": "minutes",
+	"s":   "seconds",
+	"ms":  "milliseconds",
+	"us":  "microseconds",
+	"ns":  "nanoseconds",
+
+	// Bytes
+	"By":   "bytes",
+	"KiBy": "kibibytes",
+	"MiBy": "mebibytes",
+	"GiBy": "gibibytes",
+	"TiBy": "tebibytes",
+	"KBy":  "kilobytes",
+	"MBy":  "megabytes",
+	"GBy":  "gigabytes",
+	"TBy":  "terabytes",
+	"B":    "bytes",
+	"KB":   "kilobytes",
+	"MB":   "megabytes",
+	"GB":   "gigabytes",
+	"TB":   "terabytes",
+
+	// SI
+	"m": "meters",
+	"V": "volts",
+	"A": "amperes",
+	"J": "joules",
+	"W": "watts",
+	"g": "grams",
+
+	// Misc
+	"Cel": "celsius",
+	"Hz":  "hertz",
+	"1":   "",
+	"%":   "percent",
+	"$":   "dollars",
+}
+
+// The map that translates the "per" unit
+// Example: s => per second (singular)
+var perUnitMap = map[string]string{
+	"s":  "second",
+	"m":  "minute",
+	"h":  "hour",
+	"d":  "day",
+	"w":  "week",
+	"mo": "month",
+	"y":  "year",
+}
+
+// BuildPromCompliantName builds a Prometheus-compliant metric name for the specified metric.
+//
+// The metric name is prefixed with the specified namespace and an underscore, if a namespace is given.
+// The namespace is not cleaned up. Make sure the specified namespace follows the Prometheus
+// naming convention.
+//
+// See rules at https://prometheus.io/docs/concepts/data_model/#metric-names-and-labels
+// and https://prometheus.io/docs/practices/naming/#metric-and-label-naming
+func BuildPromCompliantName(metric pmetric.Metric, namespace string) string {
+	// Split metric name into "tokens" (remove all non-alphanumeric)
+	nameTokens := strings.FieldsFunc(
+		metric.Name(),
+		func(r rune) bool { return !unicode.IsLetter(r) && !unicode.IsDigit(r) },
+	)
+
+	// Split unit at the '/' if any
+	unitTokens := strings.SplitN(metric.Unit(), "/", 2)
+
+	// Main unit
+	// Append if not blank, doesn't contain '{}', and is not present in metric name already
+	if len(unitTokens) > 0 {
+		mainUnitOtel := strings.TrimSpace(unitTokens[0])
+		if mainUnitOtel != "" && !strings.ContainsAny(mainUnitOtel, "{}") {
+			mainUnitProm := CleanUpString(unitMapGetOrDefault(mainUnitOtel))
+			if mainUnitProm != "" && !contains(nameTokens, mainUnitProm) {
+				nameTokens = append(nameTokens, mainUnitProm)
+			}
+		}
+
+		// Per unit
+		// Append if not blank, doesn't contain '{}', and is not present in metric name already
+		if len(unitTokens) > 1 && unitTokens[1] != "" {
+			perUnitOtel := strings.TrimSpace(unitTokens[1])
+			if perUnitOtel != "" && !strings.ContainsAny(perUnitOtel, "{}") {
+				perUnitProm := CleanUpString(perUnitMapGetOrDefault(perUnitOtel))
+				if perUnitProm != "" && !contains(nameTokens, perUnitProm) {
+					nameTokens = append(append(nameTokens, "per"), perUnitProm)
+				}
+			}
+		}
+
+	}
+
+	// Append _total for Counters
+	if metric.Type() == pmetric.MetricTypeSum && metric.Sum().IsMonotonic() {
+		nameTokens = append(removeItem(nameTokens, "total"), "total")
+	}
+
+	// Append _ratio for metrics with unit "1"
+	// Some OTel receivers improperly use unit "1" for counters of objects
+	// See https://github.com/open-telemetry/opentelemetry-collector-contrib/issues?q=is%3Aissue+some+metric+units+don%27t+follow+otel+semantic+conventions
+	// Until these issues have been fixed, we're appending `_ratio` for gauges ONLY
+	// Theoretically, counters could be ratios as well, but it's absurd (for mathematical reasons)
+	if metric.Unit() == "1" && metric.Type() == pmetric.MetricTypeGauge {
+		nameTokens = append(removeItem(nameTokens, "ratio"), "ratio")
+	}
+
+	// Namespace?
+	if namespace != "" {
+		nameTokens = append([]string{namespace}, nameTokens...)
+	}
+
+	// Build the string from the tokens, separated with underscores
+	normalizedName := strings.Join(nameTokens, "_")
+
+	// Metric name cannot start with a digit, so prefix it with "_" in this case
+	if normalizedName != "" && unicode.IsDigit(rune(normalizedName[0])) {
+		normalizedName = "_" + normalizedName
+	}
+
+	return normalizedName
+}
+
+// TrimPromSuffixes trims type and unit Prometheus suffixes from a metric name.
+// Following the [OpenTelemetry specs] for converting Prometheus Metric points to OTLP.
+//
+// [OpenTelemetry specs]: https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/data-model.md#metric-metadata
+func TrimPromSuffixes(promName string, metricType pmetric.MetricType, unit string) string {
+	nameTokens := strings.Split(promName, "_")
+	if len(nameTokens) == 1 {
+		return promName
+	}
+
+	nameTokens = removeTypeSuffixes(nameTokens, metricType)
+	nameTokens = removeUnitSuffixes(nameTokens, unit)
+
+	return strings.Join(nameTokens, "_")
+}
+
+func removeTypeSuffixes(tokens []string, metricType pmetric.MetricType) []string {
+	switch metricType {
+	case pmetric.MetricTypeSum:
+		// Only counters are expected to have a type suffix at this point.
+		// For other types, suffixes are removed during scrape.
+		return removeSuffix(tokens, "total")
+	default:
+		return tokens
+	}
+}
+
+func removeUnitSuffixes(nameTokens []string, unit string) []string {
+	l := len(nameTokens)
+	unitTokens := strings.Split(unit, "_")
+	lu := len(unitTokens)
+
+	if lu == 0 || l <= lu {
+		return nameTokens
+	}
+
+	suffixed := true
+	for i := range unitTokens {
+		if nameTokens[l-i-1] != unitTokens[lu-i-1] {
+			suffixed = false
+			break
+		}
+	}
+
+	if suffixed {
+		return nameTokens[:l-lu]
+	}
+
+	return nameTokens
+}
+
+func removeSuffix(tokens []string, suffix string) []string {
+	l := len(tokens)
+	if tokens[l-1] == suffix {
+		return tokens[:l-1]
+	}
+
+	return tokens
+}
+
+// CleanUpString cleans up the specified string so it's Prometheus compliant.
+func CleanUpString(s string) string {
+	return strings.Join(strings.FieldsFunc(s, func(r rune) bool { return !unicode.IsLetter(r) && !unicode.IsDigit(r) }), "_")
+}
+
+func RemovePromForbiddenRunes(s string) string {
+	return strings.Join(strings.FieldsFunc(s, func(r rune) bool { return !unicode.IsLetter(r) && !unicode.IsDigit(r) && r != '_' && r != ':' }), "_")
+}
+
+// Retrieve the Prometheus "basic" unit corresponding to the specified "basic" unit.
+// Returns the specified unit if not found in unitMap.
+func unitMapGetOrDefault(unit string) string {
+	if promUnit, ok := unitMap[unit]; ok {
+		return promUnit
+	}
+	return unit
+}
+
+// Retrieve the Prometheus "per" unit corresponding to the specified "per" unit.
+// Returns the specified unit if not found in perUnitMap.
+func perUnitMapGetOrDefault(perUnit string) string {
+	if promPerUnit, ok := perUnitMap[perUnit]; ok {
+		return promPerUnit
+	}
+	return perUnit
+}
+
+// Returns whether the slice contains the specified value.
+func contains(slice []string, value string) bool {
+	for _, sliceEntry := range slice {
+		if sliceEntry == value {
+			return true
+		}
+	}
+	return false
+}
+
+// Remove the specified value from the slice.
+func removeItem(slice []string, value string) []string {
+	newSlice := make([]string, 0, len(slice))
+	for _, sliceEntry := range slice {
+		if sliceEntry != value {
+			newSlice = append(newSlice, sliceEntry)
+		}
+	}
+	return newSlice
+}
diff --git a/storage/remote/otlptranslator/prometheus/normalize_name_test.go
b/storage/remote/otlptranslator/prometheus/normalize_name_test.go new file mode 100644 index 000000000..33910636a --- /dev/null +++ b/storage/remote/otlptranslator/prometheus/normalize_name_test.go @@ -0,0 +1,180 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package normalize + +import ( + "testing" + + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/pdata/pmetric" +) + +func TestByte(t *testing.T) { + require.Equal(t, "system_filesystem_usage_bytes", BuildPromCompliantName(createGauge("system.filesystem.usage", "By"), "")) +} + +func TestByteCounter(t *testing.T) { + require.Equal(t, "system_io_bytes_total", BuildPromCompliantName(createCounter("system.io", "By"), "")) + require.Equal(t, "network_transmitted_bytes_total", BuildPromCompliantName(createCounter("network_transmitted_bytes_total", "By"), "")) +} + +func TestWhiteSpaces(t *testing.T) { + require.Equal(t, "system_filesystem_usage_bytes", BuildPromCompliantName(createGauge("\t system.filesystem.usage ", " By\t"), "")) +} + +func TestNonStandardUnit(t *testing.T) { + require.Equal(t, "system_network_dropped", BuildPromCompliantName(createGauge("system.network.dropped", "{packets}"), "")) +} + +func TestNonStandardUnitCounter(t *testing.T) { + require.Equal(t, "system_network_dropped_total", BuildPromCompliantName(createCounter("system.network.dropped", "{packets}"), "")) +} + +func TestBrokenUnit(t *testing.T) { + require.Equal(t, "system_network_dropped_packets", BuildPromCompliantName(createGauge("system.network.dropped", "packets"), "")) + require.Equal(t, "system_network_packets_dropped", BuildPromCompliantName(createGauge("system.network.packets.dropped", "packets"), "")) + require.Equal(t, "system_network_packets", BuildPromCompliantName(createGauge("system.network.packets", "packets"), "")) +} + +func TestBrokenUnitCounter(t *testing.T) { + require.Equal(t, "system_network_dropped_packets_total", BuildPromCompliantName(createCounter("system.network.dropped", "packets"), "")) + require.Equal(t, "system_network_packets_dropped_total", BuildPromCompliantName(createCounter("system.network.packets.dropped", "packets"), "")) + require.Equal(t, "system_network_packets_total", BuildPromCompliantName(createCounter("system.network.packets", "packets"), "")) +} + +func TestRatio(t *testing.T) { + require.Equal(t, "hw_gpu_memory_utilization_ratio", BuildPromCompliantName(createGauge("hw.gpu.memory.utilization", "1"), "")) + require.Equal(t, "hw_fan_speed_ratio", BuildPromCompliantName(createGauge("hw.fan.speed_ratio", "1"), "")) + require.Equal(t, "objects_total", BuildPromCompliantName(createCounter("objects", "1"), "")) +} + +func TestHertz(t *testing.T) { + require.Equal(t, "hw_cpu_speed_limit_hertz", BuildPromCompliantName(createGauge("hw.cpu.speed_limit", "Hz"), "")) +} + +func TestPer(t *testing.T) { + require.Equal(t, "broken_metric_speed_km_per_hour", BuildPromCompliantName(createGauge("broken.metric.speed", "km/h"), "")) + require.Equal(t, "astro_light_speed_limit_meters_per_second", BuildPromCompliantName(createGauge("astro.light.speed_limit", "m/s"), "")) +} + +func TestPercent(t *testing.T) { + require.Equal(t, "broken_metric_success_ratio_percent", BuildPromCompliantName(createGauge("broken.metric.success_ratio", "%"), "")) + require.Equal(t, "broken_metric_success_percent", BuildPromCompliantName(createGauge("broken.metric.success_percent", "%"), "")) +} + +func TestDollar(t *testing.T) { + require.Equal(t, "crypto_bitcoin_value_dollars", 
BuildPromCompliantName(createGauge("crypto.bitcoin.value", "$"), "")) + require.Equal(t, "crypto_bitcoin_value_dollars", BuildPromCompliantName(createGauge("crypto.bitcoin.value.dollars", "$"), "")) +} + +func TestEmpty(t *testing.T) { + require.Equal(t, "test_metric_no_unit", BuildPromCompliantName(createGauge("test.metric.no_unit", ""), "")) + require.Equal(t, "test_metric_spaces", BuildPromCompliantName(createGauge("test.metric.spaces", " \t "), "")) +} + +func TestUnsupportedRunes(t *testing.T) { + require.Equal(t, "unsupported_metric_temperature_F", BuildPromCompliantName(createGauge("unsupported.metric.temperature", "°F"), "")) + require.Equal(t, "unsupported_metric_weird", BuildPromCompliantName(createGauge("unsupported.metric.weird", "+=.:,!* & #"), "")) + require.Equal(t, "unsupported_metric_redundant_test_per_C", BuildPromCompliantName(createGauge("unsupported.metric.redundant", "__test $/°C"), "")) +} + +func TestOtelReceivers(t *testing.T) { + require.Equal(t, "active_directory_ds_replication_network_io_bytes_total", BuildPromCompliantName(createCounter("active_directory.ds.replication.network.io", "By"), "")) + require.Equal(t, "active_directory_ds_replication_sync_object_pending_total", BuildPromCompliantName(createCounter("active_directory.ds.replication.sync.object.pending", "{objects}"), "")) + require.Equal(t, "active_directory_ds_replication_object_rate_per_second", BuildPromCompliantName(createGauge("active_directory.ds.replication.object.rate", "{objects}/s"), "")) + require.Equal(t, "active_directory_ds_name_cache_hit_rate_percent", BuildPromCompliantName(createGauge("active_directory.ds.name_cache.hit_rate", "%"), "")) + require.Equal(t, "active_directory_ds_ldap_bind_last_successful_time_milliseconds", BuildPromCompliantName(createGauge("active_directory.ds.ldap.bind.last_successful.time", "ms"), "")) + require.Equal(t, "apache_current_connections", BuildPromCompliantName(createGauge("apache.current_connections", "connections"), "")) + require.Equal(t, "apache_workers_connections", BuildPromCompliantName(createGauge("apache.workers", "connections"), "")) + require.Equal(t, "apache_requests_total", BuildPromCompliantName(createCounter("apache.requests", "1"), "")) + require.Equal(t, "bigip_virtual_server_request_count_total", BuildPromCompliantName(createCounter("bigip.virtual_server.request.count", "{requests}"), "")) + require.Equal(t, "system_cpu_utilization_ratio", BuildPromCompliantName(createGauge("system.cpu.utilization", "1"), "")) + require.Equal(t, "system_disk_operation_time_seconds_total", BuildPromCompliantName(createCounter("system.disk.operation_time", "s"), "")) + require.Equal(t, "system_cpu_load_average_15m_ratio", BuildPromCompliantName(createGauge("system.cpu.load_average.15m", "1"), "")) + require.Equal(t, "memcached_operation_hit_ratio_percent", BuildPromCompliantName(createGauge("memcached.operation_hit_ratio", "%"), "")) + require.Equal(t, "mongodbatlas_process_asserts_per_second", BuildPromCompliantName(createGauge("mongodbatlas.process.asserts", "{assertions}/s"), "")) + require.Equal(t, "mongodbatlas_process_journaling_data_files_mebibytes", BuildPromCompliantName(createGauge("mongodbatlas.process.journaling.data_files", "MiBy"), "")) + require.Equal(t, "mongodbatlas_process_network_io_bytes_per_second", BuildPromCompliantName(createGauge("mongodbatlas.process.network.io", "By/s"), "")) + require.Equal(t, "mongodbatlas_process_oplog_rate_gibibytes_per_hour", BuildPromCompliantName(createGauge("mongodbatlas.process.oplog.rate", "GiBy/h"), 
"")) + require.Equal(t, "mongodbatlas_process_db_query_targeting_scanned_per_returned", BuildPromCompliantName(createGauge("mongodbatlas.process.db.query_targeting.scanned_per_returned", "{scanned}/{returned}"), "")) + require.Equal(t, "nginx_requests", BuildPromCompliantName(createGauge("nginx.requests", "requests"), "")) + require.Equal(t, "nginx_connections_accepted", BuildPromCompliantName(createGauge("nginx.connections_accepted", "connections"), "")) + require.Equal(t, "nsxt_node_memory_usage_kilobytes", BuildPromCompliantName(createGauge("nsxt.node.memory.usage", "KBy"), "")) + require.Equal(t, "redis_latest_fork_microseconds", BuildPromCompliantName(createGauge("redis.latest_fork", "us"), "")) +} + +func TestTrimPromSuffixes(t *testing.T) { + require.Equal(t, "active_directory_ds_replication_network_io", TrimPromSuffixes("active_directory_ds_replication_network_io_bytes_total", pmetric.MetricTypeSum, "bytes")) + require.Equal(t, "active_directory_ds_name_cache_hit_rate", TrimPromSuffixes("active_directory_ds_name_cache_hit_rate_percent", pmetric.MetricTypeGauge, "percent")) + require.Equal(t, "active_directory_ds_ldap_bind_last_successful_time", TrimPromSuffixes("active_directory_ds_ldap_bind_last_successful_time_milliseconds", pmetric.MetricTypeGauge, "milliseconds")) + require.Equal(t, "apache_requests", TrimPromSuffixes("apache_requests_total", pmetric.MetricTypeSum, "1")) + require.Equal(t, "system_cpu_utilization", TrimPromSuffixes("system_cpu_utilization_ratio", pmetric.MetricTypeGauge, "ratio")) + require.Equal(t, "mongodbatlas_process_journaling_data_files", TrimPromSuffixes("mongodbatlas_process_journaling_data_files_mebibytes", pmetric.MetricTypeGauge, "mebibytes")) + require.Equal(t, "mongodbatlas_process_network_io", TrimPromSuffixes("mongodbatlas_process_network_io_bytes_per_second", pmetric.MetricTypeGauge, "bytes_per_second")) + require.Equal(t, "mongodbatlas_process_oplog_rate", TrimPromSuffixes("mongodbatlas_process_oplog_rate_gibibytes_per_hour", pmetric.MetricTypeGauge, "gibibytes_per_hour")) + require.Equal(t, "nsxt_node_memory_usage", TrimPromSuffixes("nsxt_node_memory_usage_kilobytes", pmetric.MetricTypeGauge, "kilobytes")) + require.Equal(t, "redis_latest_fork", TrimPromSuffixes("redis_latest_fork_microseconds", pmetric.MetricTypeGauge, "microseconds")) + require.Equal(t, "up", TrimPromSuffixes("up", pmetric.MetricTypeGauge, "")) + + // These are not necessarily valid OM units, only tested for the sake of completeness. 
+ require.Equal(t, "active_directory_ds_replication_sync_object_pending", TrimPromSuffixes("active_directory_ds_replication_sync_object_pending_total", pmetric.MetricTypeSum, "{objects}")) + require.Equal(t, "apache_current", TrimPromSuffixes("apache_current_connections", pmetric.MetricTypeGauge, "connections")) + require.Equal(t, "bigip_virtual_server_request_count", TrimPromSuffixes("bigip_virtual_server_request_count_total", pmetric.MetricTypeSum, "{requests}")) + require.Equal(t, "mongodbatlas_process_db_query_targeting_scanned_per_returned", TrimPromSuffixes("mongodbatlas_process_db_query_targeting_scanned_per_returned", pmetric.MetricTypeGauge, "{scanned}/{returned}")) + require.Equal(t, "nginx_connections_accepted", TrimPromSuffixes("nginx_connections_accepted", pmetric.MetricTypeGauge, "connections")) + require.Equal(t, "apache_workers", TrimPromSuffixes("apache_workers_connections", pmetric.MetricTypeGauge, "connections")) + require.Equal(t, "nginx", TrimPromSuffixes("nginx_requests", pmetric.MetricTypeGauge, "requests")) + + // Units shouldn't be trimmed if the unit is not a direct match with the suffix, i.e, a suffix "_seconds" shouldn't be removed if unit is "sec" or "s" + require.Equal(t, "system_cpu_load_average_15m_ratio", TrimPromSuffixes("system_cpu_load_average_15m_ratio", pmetric.MetricTypeGauge, "1")) + require.Equal(t, "mongodbatlas_process_asserts_per_second", TrimPromSuffixes("mongodbatlas_process_asserts_per_second", pmetric.MetricTypeGauge, "{assertions}/s")) + require.Equal(t, "memcached_operation_hit_ratio_percent", TrimPromSuffixes("memcached_operation_hit_ratio_percent", pmetric.MetricTypeGauge, "%")) + require.Equal(t, "active_directory_ds_replication_object_rate_per_second", TrimPromSuffixes("active_directory_ds_replication_object_rate_per_second", pmetric.MetricTypeGauge, "{objects}/s")) + require.Equal(t, "system_disk_operation_time_seconds", TrimPromSuffixes("system_disk_operation_time_seconds_total", pmetric.MetricTypeSum, "s")) +} + +func TestNamespace(t *testing.T) { + require.Equal(t, "space_test", BuildPromCompliantName(createGauge("test", ""), "space")) + require.Equal(t, "space_test", BuildPromCompliantName(createGauge("#test", ""), "space")) +} + +func TestCleanUpString(t *testing.T) { + require.Equal(t, "", CleanUpString("")) + require.Equal(t, "a_b", CleanUpString("a b")) + require.Equal(t, "hello_world", CleanUpString("hello, world!")) + require.Equal(t, "hello_you_2", CleanUpString("hello you 2")) + require.Equal(t, "1000", CleanUpString("$1000")) + require.Equal(t, "", CleanUpString("*+$^=)")) +} + +func TestUnitMapGetOrDefault(t *testing.T) { + require.Equal(t, "", unitMapGetOrDefault("")) + require.Equal(t, "seconds", unitMapGetOrDefault("s")) + require.Equal(t, "invalid", unitMapGetOrDefault("invalid")) +} + +func TestPerUnitMapGetOrDefault(t *testing.T) { + require.Equal(t, "", perUnitMapGetOrDefault("")) + require.Equal(t, "second", perUnitMapGetOrDefault("s")) + require.Equal(t, "invalid", perUnitMapGetOrDefault("invalid")) +} + +func TestRemoveItem(t *testing.T) { + require.Equal(t, []string{}, removeItem([]string{}, "test")) + require.Equal(t, []string{}, removeItem([]string{}, "")) + require.Equal(t, []string{"a", "b", "c"}, removeItem([]string{"a", "b", "c"}, "d")) + require.Equal(t, []string{"a", "b", "c"}, removeItem([]string{"a", "b", "c"}, "")) + require.Equal(t, []string{"a", "b"}, removeItem([]string{"a", "b", "c"}, "c")) + require.Equal(t, []string{"a", "c"}, removeItem([]string{"a", "b", "c"}, "b")) + require.Equal(t, 
[]string{"b", "c"}, removeItem([]string{"a", "b", "c"}, "a")) +} + +func TestBuildPromCompliantName(t *testing.T) { + require.Equal(t, "system_io_bytes_total", BuildPromCompliantName(createCounter("system.io", "By"), "")) + require.Equal(t, "system_network_io_bytes_total", BuildPromCompliantName(createCounter("network.io", "By"), "system")) + require.Equal(t, "_3_14_digits", BuildPromCompliantName(createGauge("3.14 digits", ""), "")) + require.Equal(t, "envoy_rule_engine_zlib_buf_error", BuildPromCompliantName(createGauge("envoy__rule_engine_zlib_buf_error", ""), "")) + require.Equal(t, "foo_bar", BuildPromCompliantName(createGauge(":foo::bar", ""), "")) + require.Equal(t, "foo_bar_total", BuildPromCompliantName(createCounter(":foo::bar", ""), "")) +} diff --git a/storage/remote/otlptranslator/prometheus/testutils_test.go b/storage/remote/otlptranslator/prometheus/testutils_test.go new file mode 100644 index 000000000..1a45e46b0 --- /dev/null +++ b/storage/remote/otlptranslator/prometheus/testutils_test.go @@ -0,0 +1,34 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package normalize + +import ( + "go.opentelemetry.io/collector/pdata/pmetric" +) + +var ilm pmetric.ScopeMetrics + +func init() { + metrics := pmetric.NewMetrics() + resourceMetrics := metrics.ResourceMetrics().AppendEmpty() + ilm = resourceMetrics.ScopeMetrics().AppendEmpty() +} + +// Returns a new Metric of type "Gauge" with specified name and unit +func createGauge(name, unit string) pmetric.Metric { + gauge := ilm.Metrics().AppendEmpty() + gauge.SetName(name) + gauge.SetUnit(unit) + gauge.SetEmptyGauge() + return gauge +} + +// Returns a new Metric of type Monotonic Sum with specified name and unit +func createCounter(name, unit string) pmetric.Metric { + counter := ilm.Metrics().AppendEmpty() + counter.SetEmptySum().SetIsMonotonic(true) + counter.SetName(name) + counter.SetUnit(unit) + return counter +} diff --git a/storage/remote/otlptranslator/prometheusremotewrite/helper.go b/storage/remote/otlptranslator/prometheusremotewrite/helper.go new file mode 100644 index 000000000..beae5c80e --- /dev/null +++ b/storage/remote/otlptranslator/prometheusremotewrite/helper.go @@ -0,0 +1,559 @@ +// DO NOT EDIT. COPIED AS-IS. 
SEE README.md
+
+// Copyright The OpenTelemetry Authors
+// SPDX-License-Identifier: Apache-2.0
+
+package prometheusremotewrite // import "github.com/prometheus/prometheus/storage/remote/otlptranslator/prometheusremotewrite"
+
+import (
+	"encoding/hex"
+	"fmt"
+	"log"
+	"math"
+	"sort"
+	"strconv"
+	"strings"
+	"time"
+	"unicode/utf8"
+
+	"github.com/prometheus/common/model"
+	"github.com/prometheus/prometheus/model/timestamp"
+	"github.com/prometheus/prometheus/model/value"
+	"github.com/prometheus/prometheus/prompb"
+	"go.opentelemetry.io/collector/pdata/pcommon"
+	"go.opentelemetry.io/collector/pdata/pmetric"
+	conventions "go.opentelemetry.io/collector/semconv/v1.6.1"
+
+	prometheustranslator "github.com/prometheus/prometheus/storage/remote/otlptranslator/prometheus"
+)
+
+const (
+	nameStr       = "__name__"
+	sumStr        = "_sum"
+	countStr      = "_count"
+	bucketStr     = "_bucket"
+	leStr         = "le"
+	quantileStr   = "quantile"
+	pInfStr       = "+Inf"
+	createdSuffix = "_created"
+	// maxExemplarRunes is the maximum number of UTF-8 exemplar characters
+	// according to the OpenMetrics specification
+	// https://github.com/OpenObservability/OpenMetrics/blob/main/specification/OpenMetrics.md#exemplars
+	maxExemplarRunes = 128
+	// Trace and Span id keys are defined as part of the spec:
+	// https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification%2Fmetrics%2Fdatamodel.md#exemplars-2
+	traceIDKey       = "trace_id"
+	spanIDKey        = "span_id"
+	infoType         = "info"
+	targetMetricName = "target_info"
+)
+
+type bucketBoundsData struct {
+	sig   string
+	bound float64
+}
+
+// byBucketBoundsData enables the usage of sort.Sort() with a slice of bucket bounds
+type byBucketBoundsData []bucketBoundsData
+
+func (m byBucketBoundsData) Len() int           { return len(m) }
+func (m byBucketBoundsData) Less(i, j int) bool { return m[i].bound < m[j].bound }
+func (m byBucketBoundsData) Swap(i, j int)      { m[i], m[j] = m[j], m[i] }
+
+// ByLabelName enables the usage of sort.Sort() with a slice of labels
+type ByLabelName []prompb.Label
+
+func (a ByLabelName) Len() int           { return len(a) }
+func (a ByLabelName) Less(i, j int) bool { return a[i].Name < a[j].Name }
+func (a ByLabelName) Swap(i, j int)      { a[i], a[j] = a[j], a[i] }
+
+// addSample finds a TimeSeries in tsMap that corresponds to the label set labels, and adds the sample to that
+// TimeSeries; it creates a new TimeSeries in the map if not found and returns the time series signature.
+// tsMap will be unmodified if either labels or sample is nil.
+func addSample(tsMap map[string]*prompb.TimeSeries, sample *prompb.Sample, labels []prompb.Label,
+	datatype string) string {
+
+	if sample == nil || labels == nil || tsMap == nil {
+		return ""
+	}
+
+	sig := timeSeriesSignature(datatype, &labels)
+	ts, ok := tsMap[sig]
+
+	if ok {
+		ts.Samples = append(ts.Samples, *sample)
+	} else {
+		newTs := &prompb.TimeSeries{
+			Labels:  labels,
+			Samples: []prompb.Sample{*sample},
+		}
+		tsMap[sig] = newTs
+	}
+
+	return sig
+}
+
+// addExemplars finds the bucket bound that corresponds to each exemplar's value and adds the exemplar to the series
+// with that signature; exemplars are only added to series that already have samples.
+// tsMap is unmodified if any of the parameters is nil.
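+// Exemplars are attached to the series of the first (smallest) bucket whose
+// upper bound is at least the exemplar's value; bucket bounds are sorted
+// ascending before matching.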
+func addExemplars(tsMap map[string]*prompb.TimeSeries, exemplars []prompb.Exemplar, bucketBoundsData []bucketBoundsData) {
+	if tsMap == nil || bucketBoundsData == nil || exemplars == nil {
+		return
+	}
+
+	sort.Sort(byBucketBoundsData(bucketBoundsData))
+
+	for _, exemplar := range exemplars {
+		addExemplar(tsMap, bucketBoundsData, exemplar)
+	}
+}
+
+func addExemplar(tsMap map[string]*prompb.TimeSeries, bucketBounds []bucketBoundsData, exemplar prompb.Exemplar) {
+	for _, bucketBound := range bucketBounds {
+		sig := bucketBound.sig
+		bound := bucketBound.bound
+
+		_, ok := tsMap[sig]
+		if ok {
+			if tsMap[sig].Samples != nil {
+				if exemplar.Value <= bound {
+					tsMap[sig].Exemplars = append(tsMap[sig].Exemplars, exemplar)
+					return
+				}
+			}
+		}
+	}
+}
+
+// timeSeriesSignature returns a string signature in the form of:
+//
+//	TYPE-label1-value1- ... -labelN-valueN
+//
+// the label slice should not contain duplicate label names; this method sorts the slice by label name before creating
+// the signature.
+func timeSeriesSignature(datatype string, labels *[]prompb.Label) string {
+	b := strings.Builder{}
+	b.WriteString(datatype)
+
+	sort.Sort(ByLabelName(*labels))
+
+	for _, lb := range *labels {
+		b.WriteString("-")
+		b.WriteString(lb.GetName())
+		b.WriteString("-")
+		b.WriteString(lb.GetValue())
+	}
+
+	return b.String()
+}
+
+// createAttributes creates a slice of prompb.Label from OTLP attributes and extra pairs of string values.
+// An unpaired trailing string value is ignored. String pairs overwrite OTLP labels if a collision happens, and
+// the overwrite is logged. Resulting label names are sanitized.
+func createAttributes(resource pcommon.Resource, attributes pcommon.Map, externalLabels map[string]string, extras ...string) []prompb.Label {
+	// map ensures no duplicate label name
+	l := map[string]prompb.Label{}
+
+	// Ensure attributes are sorted by key for consistent merging of keys which
+	// collide when sanitized.
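+	// For example, the attribute keys "foo.bar" and "foo_bar" both sanitize to
+	// "foo_bar"; sorting first makes the ";"-joined value of such collisions
+	// deterministic.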
+	labels := make([]prompb.Label, 0, attributes.Len())
+	attributes.Range(func(key string, value pcommon.Value) bool {
+		labels = append(labels, prompb.Label{Name: key, Value: value.AsString()})
+		return true
+	})
+	sort.Stable(ByLabelName(labels))
+
+	for _, label := range labels {
+		var finalKey = prometheustranslator.NormalizeLabel(label.Name)
+		if existingLabel, alreadyExists := l[finalKey]; alreadyExists {
+			existingLabel.Value = existingLabel.Value + ";" + label.Value
+			l[finalKey] = existingLabel
+		} else {
+			l[finalKey] = prompb.Label{
+				Name:  finalKey,
+				Value: label.Value,
+			}
+		}
+	}
+
+	// Map service.name + service.namespace to job
+	if serviceName, ok := resource.Attributes().Get(conventions.AttributeServiceName); ok {
+		val := serviceName.AsString()
+		if serviceNamespace, ok := resource.Attributes().Get(conventions.AttributeServiceNamespace); ok {
+			val = fmt.Sprintf("%s/%s", serviceNamespace.AsString(), val)
+		}
+		l[model.JobLabel] = prompb.Label{
+			Name:  model.JobLabel,
+			Value: val,
+		}
+	}
+	// Map service.instance.id to instance
+	if instance, ok := resource.Attributes().Get(conventions.AttributeServiceInstanceID); ok {
+		l[model.InstanceLabel] = prompb.Label{
+			Name:  model.InstanceLabel,
+			Value: instance.AsString(),
+		}
+	}
+	for key, value := range externalLabels {
+		// External labels have already been sanitized
+		if _, alreadyExists := l[key]; alreadyExists {
+			// Skip external labels if they are overridden by metric attributes
+			continue
+		}
+		l[key] = prompb.Label{
+			Name:  key,
+			Value: value,
+		}
+	}
+
+	for i := 0; i < len(extras); i += 2 {
+		if i+1 >= len(extras) {
+			break
+		}
+		_, found := l[extras[i]]
+		if found {
+			log.Println("label " + extras[i] + " is overwritten. Check if Prometheus reserved labels are used.")
+		}
+		// internal labels should be maintained
+		name := extras[i]
+		if !(len(name) > 4 && name[:2] == "__" && name[len(name)-2:] == "__") {
+			name = prometheustranslator.NormalizeLabel(name)
+		}
+		l[name] = prompb.Label{
+			Name:  name,
+			Value: extras[i+1],
+		}
+	}
+
+	s := make([]prompb.Label, 0, len(l))
+	for _, lb := range l {
+		s = append(s, lb)
+	}
+
+	return s
+}
+
+// isValidAggregationTemporality checks whether an OTel metric has a valid
+// aggregation temporality for conversion to a Prometheus metric.
+func isValidAggregationTemporality(metric pmetric.Metric) bool {
+	switch metric.Type() {
+	case pmetric.MetricTypeGauge, pmetric.MetricTypeSummary:
+		return true
+	case pmetric.MetricTypeSum:
+		return metric.Sum().AggregationTemporality() == pmetric.AggregationTemporalityCumulative
+	case pmetric.MetricTypeHistogram:
+		return metric.Histogram().AggregationTemporality() == pmetric.AggregationTemporalityCumulative
+	case pmetric.MetricTypeExponentialHistogram:
+		return metric.ExponentialHistogram().AggregationTemporality() == pmetric.AggregationTemporalityCumulative
+	}
+	return false
+}
+
+// addSingleHistogramDataPoint converts pt to 2 + min(len(ExplicitBounds), len(BucketCounts)) + 1 samples. It
+// ignores extra buckets if len(ExplicitBounds) > len(BucketCounts).
+func addSingleHistogramDataPoint(pt pmetric.HistogramDataPoint, resource pcommon.Resource, metric pmetric.Metric, settings Settings, tsMap map[string]*prompb.TimeSeries) {
+	timestamp := convertTimeStamp(pt.Timestamp())
+	// sum, count, and buckets of the histogram should append suffix to baseName
+	baseName := prometheustranslator.BuildPromCompliantName(metric, settings.Namespace)
+
+	// If the sum is unset, it indicates the _sum metric point should be
+	// omitted
+	if pt.HasSum() {
+		// treat sum as a sample in an individual TimeSeries
+		sum := &prompb.Sample{
+			Value:     pt.Sum(),
+			Timestamp: timestamp,
+		}
+		if pt.Flags().NoRecordedValue() {
+			sum.Value = math.Float64frombits(value.StaleNaN)
+		}
+
+		sumlabels := createAttributes(resource, pt.Attributes(), settings.ExternalLabels, nameStr, baseName+sumStr)
+		addSample(tsMap, sum, sumlabels, metric.Type().String())
+
+	}
+
+	// treat count as a sample in an individual TimeSeries
+	count := &prompb.Sample{
+		Value:     float64(pt.Count()),
+		Timestamp: timestamp,
+	}
+	if pt.Flags().NoRecordedValue() {
+		count.Value = math.Float64frombits(value.StaleNaN)
+	}
+
+	countlabels := createAttributes(resource, pt.Attributes(), settings.ExternalLabels, nameStr, baseName+countStr)
+	addSample(tsMap, count, countlabels, metric.Type().String())
+
+	// cumulative count for conversion to cumulative histogram
+	var cumulativeCount uint64
+
+	promExemplars := getPromExemplars[pmetric.HistogramDataPoint](pt)
+
+	var bucketBounds []bucketBoundsData
+
+	// process each bound, based on histograms proto definition, # of buckets = # of explicit bounds + 1
+	for i := 0; i < pt.ExplicitBounds().Len() && i < pt.BucketCounts().Len(); i++ {
+		bound := pt.ExplicitBounds().At(i)
+		cumulativeCount += pt.BucketCounts().At(i)
+		bucket := &prompb.Sample{
+			Value:     float64(cumulativeCount),
+			Timestamp: timestamp,
+		}
+		if pt.Flags().NoRecordedValue() {
+			bucket.Value = math.Float64frombits(value.StaleNaN)
+		}
+		boundStr := strconv.FormatFloat(bound, 'f', -1, 64)
+		labels := createAttributes(resource, pt.Attributes(), settings.ExternalLabels, nameStr, baseName+bucketStr, leStr, boundStr)
+		sig := addSample(tsMap, bucket, labels, metric.Type().String())
+
+		bucketBounds = append(bucketBounds, bucketBoundsData{sig: sig, bound: bound})
+	}
+	// add le=+Inf bucket
+	infBucket := &prompb.Sample{
+		Timestamp: timestamp,
+	}
+	if pt.Flags().NoRecordedValue() {
+		infBucket.Value = math.Float64frombits(value.StaleNaN)
+	} else {
+		infBucket.Value = float64(pt.Count())
+	}
+	infLabels := createAttributes(resource, pt.Attributes(), settings.ExternalLabels, nameStr, baseName+bucketStr, leStr, pInfStr)
+	sig := addSample(tsMap, infBucket, infLabels, metric.Type().String())
+
+	bucketBounds = append(bucketBounds, bucketBoundsData{sig: sig, bound: math.Inf(1)})
+	addExemplars(tsMap, promExemplars, bucketBounds)
+
+	// add _created time series if needed
+	startTimestamp := pt.StartTimestamp()
+	if settings.ExportCreatedMetric && startTimestamp != 0 {
+		createdLabels := createAttributes(
+			resource,
+			pt.Attributes(),
+			settings.ExternalLabels,
+			nameStr,
+			baseName+createdSuffix,
+		)
+		addCreatedTimeSeriesIfNeeded(tsMap, createdLabels, startTimestamp, metric.Type().String())
+	}
+}
+
+type exemplarType interface {
+	pmetric.ExponentialHistogramDataPoint | pmetric.HistogramDataPoint | pmetric.NumberDataPoint
+	Exemplars() pmetric.ExemplarSlice
+}
+
+func getPromExemplars[T exemplarType](pt T) []prompb.Exemplar {
+	var promExemplars []prompb.Exemplar
+
+	for i := 0; i < pt.Exemplars().Len(); i++ {
+		exemplar := pt.Exemplars().At(i)
+		exemplarRunes := 0
+
+		promExemplar := &prompb.Exemplar{
+			Value:     exemplar.DoubleValue(),
+			Timestamp: timestamp.FromTime(exemplar.Timestamp().AsTime()),
+		}
+		if traceID := exemplar.TraceID(); !traceID.IsEmpty() {
+			val := hex.EncodeToString(traceID[:])
+			exemplarRunes += utf8.RuneCountInString(traceIDKey) + utf8.RuneCountInString(val)
+			promLabel := prompb.Label{
+				Name:  traceIDKey,
+				Value: val,
+			}
+			promExemplar.Labels = append(promExemplar.Labels, promLabel)
+		}
+		if spanID := exemplar.SpanID(); !spanID.IsEmpty() {
+			val := hex.EncodeToString(spanID[:])
+			exemplarRunes += utf8.RuneCountInString(spanIDKey) + utf8.RuneCountInString(val)
+			promLabel := prompb.Label{
+				Name:  spanIDKey,
+				Value: val,
+			}
+			promExemplar.Labels = append(promExemplar.Labels, promLabel)
+		}
+		var labelsFromAttributes []prompb.Label
+
+		exemplar.FilteredAttributes().Range(func(key string, value pcommon.Value) bool {
+			val := value.AsString()
+			exemplarRunes += utf8.RuneCountInString(key) + utf8.RuneCountInString(val)
+			promLabel := prompb.Label{
+				Name:  key,
+				Value: val,
+			}
+
+			labelsFromAttributes = append(labelsFromAttributes, promLabel)
+
+			return true
+		})
+		if exemplarRunes <= maxExemplarRunes {
+			// only append filtered attributes if it does not cause exemplar
+			// labels to exceed the max number of runes
+			promExemplar.Labels = append(promExemplar.Labels, labelsFromAttributes...)
+		}
+
+		promExemplars = append(promExemplars, *promExemplar)
+	}
+
+	return promExemplars
+}
+
+// mostRecentTimestampInMetric returns the latest timestamp among the metric's data points
+func mostRecentTimestampInMetric(metric pmetric.Metric) pcommon.Timestamp {
+	var ts pcommon.Timestamp
+	// handle individual metric based on type
+	switch metric.Type() {
+	case pmetric.MetricTypeGauge:
+		dataPoints := metric.Gauge().DataPoints()
+		for x := 0; x < dataPoints.Len(); x++ {
+			ts = maxTimestamp(ts, dataPoints.At(x).Timestamp())
+		}
+	case pmetric.MetricTypeSum:
+		dataPoints := metric.Sum().DataPoints()
+		for x := 0; x < dataPoints.Len(); x++ {
+			ts = maxTimestamp(ts, dataPoints.At(x).Timestamp())
+		}
+	case pmetric.MetricTypeHistogram:
+		dataPoints := metric.Histogram().DataPoints()
+		for x := 0; x < dataPoints.Len(); x++ {
+			ts = maxTimestamp(ts, dataPoints.At(x).Timestamp())
+		}
+	case pmetric.MetricTypeExponentialHistogram:
+		dataPoints := metric.ExponentialHistogram().DataPoints()
+		for x := 0; x < dataPoints.Len(); x++ {
+			ts = maxTimestamp(ts, dataPoints.At(x).Timestamp())
+		}
+	case pmetric.MetricTypeSummary:
+		dataPoints := metric.Summary().DataPoints()
+		for x := 0; x < dataPoints.Len(); x++ {
+			ts = maxTimestamp(ts, dataPoints.At(x).Timestamp())
+		}
+	}
+	return ts
+}
+
+func maxTimestamp(a, b pcommon.Timestamp) pcommon.Timestamp {
+	if a > b {
+		return a
+	}
+	return b
+}
+
+// addSingleSummaryDataPoint converts pt to len(QuantileValues) + 2 samples.
+func addSingleSummaryDataPoint(pt pmetric.SummaryDataPoint, resource pcommon.Resource, metric pmetric.Metric, settings Settings, + tsMap map[string]*prompb.TimeSeries) { + timestamp := convertTimeStamp(pt.Timestamp()) + // sum and count of the summary should append suffix to baseName + baseName := prometheustranslator.BuildPromCompliantName(metric, settings.Namespace) + // treat sum as a sample in an individual TimeSeries + sum := &prompb.Sample{ + Value: pt.Sum(), + Timestamp: timestamp, + } + if pt.Flags().NoRecordedValue() { + sum.Value = math.Float64frombits(value.StaleNaN) + } + sumlabels := createAttributes(resource, pt.Attributes(), settings.ExternalLabels, nameStr, baseName+sumStr) + addSample(tsMap, sum, sumlabels, metric.Type().String()) + + // treat count as a sample in an individual TimeSeries + count := &prompb.Sample{ + Value: float64(pt.Count()), + Timestamp: timestamp, + } + if pt.Flags().NoRecordedValue() { + count.Value = math.Float64frombits(value.StaleNaN) + } + countlabels := createAttributes(resource, pt.Attributes(), settings.ExternalLabels, nameStr, baseName+countStr) + addSample(tsMap, count, countlabels, metric.Type().String()) + + // process each percentile/quantile + for i := 0; i < pt.QuantileValues().Len(); i++ { + qt := pt.QuantileValues().At(i) + quantile := &prompb.Sample{ + Value: qt.Value(), + Timestamp: timestamp, + } + if pt.Flags().NoRecordedValue() { + quantile.Value = math.Float64frombits(value.StaleNaN) + } + percentileStr := strconv.FormatFloat(qt.Quantile(), 'f', -1, 64) + qtlabels := createAttributes(resource, pt.Attributes(), settings.ExternalLabels, nameStr, baseName, quantileStr, percentileStr) + addSample(tsMap, quantile, qtlabels, metric.Type().String()) + } + + // add _created time series if needed + startTimestamp := pt.StartTimestamp() + if settings.ExportCreatedMetric && startTimestamp != 0 { + createdLabels := createAttributes( + resource, + pt.Attributes(), + settings.ExternalLabels, + nameStr, + baseName+createdSuffix, + ) + addCreatedTimeSeriesIfNeeded(tsMap, createdLabels, startTimestamp, metric.Type().String()) + } +} + +// addCreatedTimeSeriesIfNeeded adds {name}_created time series with a single +// sample. If the series exists, then new samples won't be added. 
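+// The sample's value is the start timestamp converted to milliseconds (the
+// remote-write timestamp unit), so consumers can recover the creation time.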
+func addCreatedTimeSeriesIfNeeded( + series map[string]*prompb.TimeSeries, + labels []prompb.Label, + startTimestamp pcommon.Timestamp, + metricType string, +) { + sig := timeSeriesSignature(metricType, &labels) + if _, ok := series[sig]; !ok { + series[sig] = &prompb.TimeSeries{ + Labels: labels, + Samples: []prompb.Sample{ + { // convert ns to ms + Value: float64(convertTimeStamp(startTimestamp)), + }, + }, + } + } +} + +// addResourceTargetInfo converts the resource to the target info metric +func addResourceTargetInfo(resource pcommon.Resource, settings Settings, timestamp pcommon.Timestamp, tsMap map[string]*prompb.TimeSeries) { + if settings.DisableTargetInfo { + return + } + // Use resource attributes (other than those used for job+instance) as the + // metric labels for the target info metric + attributes := pcommon.NewMap() + resource.Attributes().CopyTo(attributes) + attributes.RemoveIf(func(k string, _ pcommon.Value) bool { + switch k { + case conventions.AttributeServiceName, conventions.AttributeServiceNamespace, conventions.AttributeServiceInstanceID: + // Remove resource attributes used for job + instance + return true + default: + return false + } + }) + if attributes.Len() == 0 { + // If we only have job + instance, then target_info isn't useful, so don't add it. + return + } + // create parameters for addSample + name := targetMetricName + if len(settings.Namespace) > 0 { + name = settings.Namespace + "_" + name + } + labels := createAttributes(resource, attributes, settings.ExternalLabels, nameStr, name) + sample := &prompb.Sample{ + Value: float64(1), + // convert ns to ms + Timestamp: convertTimeStamp(timestamp), + } + addSample(tsMap, sample, labels, infoType) +} + +// convertTimeStamp converts OTLP timestamp in ns to timestamp in ms +func convertTimeStamp(timestamp pcommon.Timestamp) int64 { + return timestamp.AsTime().UnixNano() / (int64(time.Millisecond) / int64(time.Nanosecond)) +} diff --git a/storage/remote/otlptranslator/prometheusremotewrite/histograms.go b/storage/remote/otlptranslator/prometheusremotewrite/histograms.go new file mode 100644 index 000000000..9a4ec6e11 --- /dev/null +++ b/storage/remote/otlptranslator/prometheusremotewrite/histograms.go @@ -0,0 +1,158 @@ +// DO NOT EDIT. COPIED AS-IS. 
+
+// Copyright The OpenTelemetry Authors
+// SPDX-License-Identifier: Apache-2.0
+
+package prometheusremotewrite // import "github.com/prometheus/prometheus/storage/remote/otlptranslator/prometheusremotewrite"
+
+import (
+	"fmt"
+	"math"
+
+	"github.com/prometheus/common/model"
+	"github.com/prometheus/prometheus/model/value"
+	"github.com/prometheus/prometheus/prompb"
+	"go.opentelemetry.io/collector/pdata/pcommon"
+	"go.opentelemetry.io/collector/pdata/pmetric"
+)
+
+const defaultZeroThreshold = 1e-128
+
+func addSingleExponentialHistogramDataPoint(
+	metric string,
+	pt pmetric.ExponentialHistogramDataPoint,
+	resource pcommon.Resource,
+	settings Settings,
+	series map[string]*prompb.TimeSeries,
+) error {
+	labels := createAttributes(
+		resource,
+		pt.Attributes(),
+		settings.ExternalLabels,
+		model.MetricNameLabel, metric,
+	)
+
+	sig := timeSeriesSignature(
+		pmetric.MetricTypeExponentialHistogram.String(),
+		&labels,
+	)
+	ts, ok := series[sig]
+	if !ok {
+		ts = &prompb.TimeSeries{
+			Labels: labels,
+		}
+		series[sig] = ts
+	}
+
+	histogram, err := exponentialToNativeHistogram(pt)
+	if err != nil {
+		return err
+	}
+	ts.Histograms = append(ts.Histograms, histogram)
+
+	exemplars := getPromExemplars[pmetric.ExponentialHistogramDataPoint](pt)
+	ts.Exemplars = append(ts.Exemplars, exemplars...)
+
+	return nil
+}
+
+// exponentialToNativeHistogram translates OTel Exponential Histogram data point
+// to Prometheus Native Histogram.
+func exponentialToNativeHistogram(p pmetric.ExponentialHistogramDataPoint) (prompb.Histogram, error) {
+	scale := p.Scale()
+	if scale < -4 || scale > 8 {
+		return prompb.Histogram{},
+			fmt.Errorf("cannot convert exponential to native histogram."+
+				" Scale must be <= 8 and >= -4, was %d", scale)
+		// TODO: downscale to 8 if scale > 8
+	}
+
+	pSpans, pDeltas := convertBucketsLayout(p.Positive())
+	nSpans, nDeltas := convertBucketsLayout(p.Negative())
+
+	h := prompb.Histogram{
+		Schema: scale,
+
+		ZeroCount: &prompb.Histogram_ZeroCountInt{ZeroCountInt: p.ZeroCount()},
+		// TODO use zero_threshold, if set, see
+		// https://github.com/open-telemetry/opentelemetry-proto/pull/441
+		ZeroThreshold: defaultZeroThreshold,
+
+		PositiveSpans:  pSpans,
+		PositiveDeltas: pDeltas,
+		NegativeSpans:  nSpans,
+		NegativeDeltas: nDeltas,
+
+		Timestamp: convertTimeStamp(p.Timestamp()),
+	}
+
+	if p.Flags().NoRecordedValue() {
+		h.Sum = math.Float64frombits(value.StaleNaN)
+		h.Count = &prompb.Histogram_CountInt{CountInt: value.StaleNaN}
+	} else {
+		if p.HasSum() {
+			h.Sum = p.Sum()
+		}
+		h.Count = &prompb.Histogram_CountInt{CountInt: p.Count()}
+	}
+	return h, nil
+}
+
+// convertBucketsLayout translates OTel Exponential Histogram dense buckets
+// representation to Prometheus Native Histogram sparse bucket representation.
+//
+// The translation logic is taken from the client_golang `histogram.go#makeBuckets`
+// function, see `makeBuckets` https://github.com/prometheus/client_golang/blob/main/prometheus/histogram.go
+// The bucket indexes conversion was adjusted, since OTel exp. histogram bucket
+// index 0 corresponds to the range (1, base] while Prometheus bucket index 0
+// to the range (base^-1, 1].
+func convertBucketsLayout(buckets pmetric.ExponentialHistogramDataPointBuckets) ([]prompb.BucketSpan, []int64) {
+	bucketCounts := buckets.BucketCounts()
+	if bucketCounts.Len() == 0 {
+		return nil, nil
+	}
+
+	var (
+		spans         []prompb.BucketSpan
+		deltas        []int64
+		prevCount     int64
+		nextBucketIdx int32
+	)
+
+	appendDelta := func(count int64) {
+		spans[len(spans)-1].Length++
+		deltas = append(deltas, count-prevCount)
+		prevCount = count
+	}
+
+	for i := 0; i < bucketCounts.Len(); i++ {
+		count := int64(bucketCounts.At(i))
+		if count == 0 {
+			continue
+		}
+
+		// The offset is adjusted by 1 as described above.
+		bucketIdx := int32(i) + buckets.Offset() + 1
+		delta := bucketIdx - nextBucketIdx
+		if i == 0 || delta > 2 {
+			// We have to create a new span, either because we are
+			// at the very beginning, or because we have found a gap
+			// of more than two buckets. The constant 2 is copied from the logic in
+			// https://github.com/prometheus/client_golang/blob/27f0506d6ebbb117b6b697d0552ee5be2502c5f2/prometheus/histogram.go#L1296
+			spans = append(spans, prompb.BucketSpan{
+				Offset: delta,
+				Length: 0,
+			})
+		} else {
+			// We have found a small gap (or no gap at all).
+			// Insert empty buckets as needed.
+			for j := int32(0); j < delta; j++ {
+				appendDelta(0)
+			}
+		}
+		appendDelta(count)
+		nextBucketIdx = bucketIdx + 1
+	}
+
+	return spans, deltas
+}
diff --git a/storage/remote/otlptranslator/prometheusremotewrite/metrics_to_prw.go b/storage/remote/otlptranslator/prometheusremotewrite/metrics_to_prw.go
new file mode 100644
index 000000000..34ee762dd
--- /dev/null
+++ b/storage/remote/otlptranslator/prometheusremotewrite/metrics_to_prw.go
@@ -0,0 +1,114 @@
+// DO NOT EDIT. COPIED AS-IS. SEE README.md
+
+// Copyright The OpenTelemetry Authors
+// SPDX-License-Identifier: Apache-2.0
+
+package prometheusremotewrite // import "github.com/prometheus/prometheus/storage/remote/otlptranslator/prometheusremotewrite"
+
+import (
+	"errors"
+	"fmt"
+
+	"github.com/prometheus/prometheus/prompb"
+	"go.opentelemetry.io/collector/pdata/pcommon"
+	"go.opentelemetry.io/collector/pdata/pmetric"
+	"go.uber.org/multierr"
+
+	prometheustranslator "github.com/prometheus/prometheus/storage/remote/otlptranslator/prometheus"
+)
+
+type Settings struct {
+	Namespace           string
+	ExternalLabels      map[string]string
+	DisableTargetInfo   bool
+	ExportCreatedMetric bool
+}
+
+// FromMetrics converts pmetric.Metrics to prometheus remote write format.
+func FromMetrics(md pmetric.Metrics, settings Settings) (tsMap map[string]*prompb.TimeSeries, errs error) {
+	tsMap = make(map[string]*prompb.TimeSeries)
+
+	resourceMetricsSlice := md.ResourceMetrics()
+	for i := 0; i < resourceMetricsSlice.Len(); i++ {
+		resourceMetrics := resourceMetricsSlice.At(i)
+		resource := resourceMetrics.Resource()
+		scopeMetricsSlice := resourceMetrics.ScopeMetrics()
+		// keep track of the most recent timestamp in the ResourceMetrics for
+		// use with the "target" info metric
+		var mostRecentTimestamp pcommon.Timestamp
+		for j := 0; j < scopeMetricsSlice.Len(); j++ {
+			scopeMetrics := scopeMetricsSlice.At(j)
+			metricSlice := scopeMetrics.Metrics()
+
+			// TODO: decide if instrumentation library information should be exported as labels
+			for k := 0; k < metricSlice.Len(); k++ {
+				metric := metricSlice.At(k)
+				mostRecentTimestamp = maxTimestamp(mostRecentTimestamp, mostRecentTimestampInMetric(metric))
+
+				if !isValidAggregationTemporality(metric) {
+					errs = multierr.Append(errs, fmt.Errorf("invalid temporality and type combination for metric %q", metric.Name()))
+					continue
+				}
+
+				// handle individual metric based on type
+				switch metric.Type() {
+				case pmetric.MetricTypeGauge:
+					dataPoints := metric.Gauge().DataPoints()
+					if dataPoints.Len() == 0 {
+						errs = multierr.Append(errs, fmt.Errorf("empty data points. %s is dropped", metric.Name()))
+					}
+					for x := 0; x < dataPoints.Len(); x++ {
+						addSingleGaugeNumberDataPoint(dataPoints.At(x), resource, metric, settings, tsMap)
+					}
+				case pmetric.MetricTypeSum:
+					dataPoints := metric.Sum().DataPoints()
+					if dataPoints.Len() == 0 {
+						errs = multierr.Append(errs, fmt.Errorf("empty data points. %s is dropped", metric.Name()))
+					}
+					for x := 0; x < dataPoints.Len(); x++ {
+						addSingleSumNumberDataPoint(dataPoints.At(x), resource, metric, settings, tsMap)
+					}
+				case pmetric.MetricTypeHistogram:
+					dataPoints := metric.Histogram().DataPoints()
+					if dataPoints.Len() == 0 {
+						errs = multierr.Append(errs, fmt.Errorf("empty data points. %s is dropped", metric.Name()))
+					}
+					for x := 0; x < dataPoints.Len(); x++ {
+						addSingleHistogramDataPoint(dataPoints.At(x), resource, metric, settings, tsMap)
+					}
+				case pmetric.MetricTypeExponentialHistogram:
+					dataPoints := metric.ExponentialHistogram().DataPoints()
+					if dataPoints.Len() == 0 {
+						errs = multierr.Append(errs, fmt.Errorf("empty data points. %s is dropped", metric.Name()))
+					}
+					name := prometheustranslator.BuildPromCompliantName(metric, settings.Namespace)
+					for x := 0; x < dataPoints.Len(); x++ {
+						errs = multierr.Append(
+							errs,
+							addSingleExponentialHistogramDataPoint(
+								name,
+								dataPoints.At(x),
+								resource,
+								settings,
+								tsMap,
+							),
+						)
+					}
+				case pmetric.MetricTypeSummary:
+					dataPoints := metric.Summary().DataPoints()
+					if dataPoints.Len() == 0 {
+						errs = multierr.Append(errs, fmt.Errorf("empty data points. %s is dropped", metric.Name()))
+					}
+					for x := 0; x < dataPoints.Len(); x++ {
+						addSingleSummaryDataPoint(dataPoints.At(x), resource, metric, settings, tsMap)
+					}
+				default:
+					errs = multierr.Append(errs, errors.New("unsupported metric type"))
+				}
+			}
+		}
+		addResourceTargetInfo(resource, settings, mostRecentTimestamp, tsMap)
+	}
+
+	return
+}
diff --git a/storage/remote/otlptranslator/prometheusremotewrite/number_data_points.go b/storage/remote/otlptranslator/prometheusremotewrite/number_data_points.go
new file mode 100644
index 000000000..3a5d201dd
--- /dev/null
+++ b/storage/remote/otlptranslator/prometheusremotewrite/number_data_points.go
@@ -0,0 +1,104 @@
+// DO NOT EDIT. COPIED AS-IS. SEE README.md
+
+// Copyright The OpenTelemetry Authors
+// SPDX-License-Identifier: Apache-2.0
+
+package prometheusremotewrite // import "github.com/prometheus/prometheus/storage/remote/otlptranslator/prometheusremotewrite"
+
+import (
+	"math"
+
+	"github.com/prometheus/common/model"
+	"github.com/prometheus/prometheus/model/value"
+	"github.com/prometheus/prometheus/prompb"
+	"go.opentelemetry.io/collector/pdata/pcommon"
+	"go.opentelemetry.io/collector/pdata/pmetric"
+
+	prometheustranslator "github.com/prometheus/prometheus/storage/remote/otlptranslator/prometheus"
+)
+
+// addSingleGaugeNumberDataPoint converts the Gauge metric data point to a
+// Prometheus time series with samples and labels. The result is stored in the
+// series map.
+func addSingleGaugeNumberDataPoint(
+	pt pmetric.NumberDataPoint,
+	resource pcommon.Resource,
+	metric pmetric.Metric,
+	settings Settings,
+	series map[string]*prompb.TimeSeries,
+) {
+	name := prometheustranslator.BuildPromCompliantName(metric, settings.Namespace)
+	labels := createAttributes(
+		resource,
+		pt.Attributes(),
+		settings.ExternalLabels,
+		model.MetricNameLabel, name,
+	)
+	sample := &prompb.Sample{
+		// convert ns to ms
+		Timestamp: convertTimeStamp(pt.Timestamp()),
+	}
+	switch pt.ValueType() {
+	case pmetric.NumberDataPointValueTypeInt:
+		sample.Value = float64(pt.IntValue())
+	case pmetric.NumberDataPointValueTypeDouble:
+		sample.Value = pt.DoubleValue()
+	}
+	if pt.Flags().NoRecordedValue() {
+		sample.Value = math.Float64frombits(value.StaleNaN)
+	}
+	addSample(series, sample, labels, metric.Type().String())
+}
+
+// addSingleSumNumberDataPoint converts the Sum metric data point to a Prometheus
+// time series with samples, labels and exemplars. The result is stored in the
+// series map.
+func addSingleSumNumberDataPoint(
+	pt pmetric.NumberDataPoint,
+	resource pcommon.Resource,
+	metric pmetric.Metric,
+	settings Settings,
+	series map[string]*prompb.TimeSeries,
+) {
+	name := prometheustranslator.BuildPromCompliantName(metric, settings.Namespace)
+	labels := createAttributes(
+		resource,
+		pt.Attributes(),
+		settings.ExternalLabels,
+		model.MetricNameLabel, name,
+	)
+	sample := &prompb.Sample{
+		// convert ns to ms
+		Timestamp: convertTimeStamp(pt.Timestamp()),
+	}
+	switch pt.ValueType() {
+	case pmetric.NumberDataPointValueTypeInt:
+		sample.Value = float64(pt.IntValue())
+	case pmetric.NumberDataPointValueTypeDouble:
+		sample.Value = pt.DoubleValue()
+	}
+	if pt.Flags().NoRecordedValue() {
+		sample.Value = math.Float64frombits(value.StaleNaN)
+	}
+	sig := addSample(series, sample, labels, metric.Type().String())
+
+	if ts, ok := series[sig]; sig != "" && ok {
+		exemplars := getPromExemplars[pmetric.NumberDataPoint](pt)
+		ts.Exemplars = append(ts.Exemplars, exemplars...)
+	}
+
+	// add _created time series if needed
+	if settings.ExportCreatedMetric && metric.Sum().IsMonotonic() {
+		startTimestamp := pt.StartTimestamp()
+		if startTimestamp != 0 {
+			createdLabels := createAttributes(
+				resource,
+				pt.Attributes(),
+				settings.ExternalLabels,
+				nameStr,
+				name+createdSuffix,
+			)
+			addCreatedTimeSeriesIfNeeded(series, createdLabels, startTimestamp, metric.Type().String())
+		}
+	}
+}
diff --git a/storage/remote/otlptranslator/update-copy.sh b/storage/remote/otlptranslator/update-copy.sh
new file mode 100755
index 000000000..13a2a7a2e
--- /dev/null
+++ b/storage/remote/otlptranslator/update-copy.sh
@@ -0,0 +1,15 @@
+#!/bin/bash
+
+OTEL_VERSION=v0.81.0
+
+git clone https://github.com/open-telemetry/opentelemetry-collector-contrib ./tmp
+cd ./tmp
+git checkout $OTEL_VERSION
+cd ..
+rm -rf ./prometheusremotewrite/*
+cp -r ./tmp/pkg/translator/prometheusremotewrite/*.go ./prometheusremotewrite
+rm -rf ./prometheusremotewrite/*_test.go
+rm -rf ./tmp
+
+sed -i '' 's#github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/prometheus#github.com/prometheus/prometheus/storage/remote/otlptranslator/prometheus#g' ./prometheusremotewrite/*.go
+sed -i '' '1s#^#// DO NOT EDIT. COPIED AS-IS. SEE README.md\n\n#g' ./prometheusremotewrite/*.go
diff --git a/storage/remote/write_handler.go b/storage/remote/write_handler.go
index 96d8cbd90..6c0cd8a29 100644
--- a/storage/remote/write_handler.go
+++ b/storage/remote/write_handler.go
@@ -27,6 +27,7 @@ import (
 	"github.com/prometheus/prometheus/model/exemplar"
 	"github.com/prometheus/prometheus/prompb"
 	"github.com/prometheus/prometheus/storage"
+	otlptranslator "github.com/prometheus/prometheus/storage/remote/otlptranslator/prometheusremotewrite"
 )
 
 type writeHandler struct {
@@ -178,3 +179,60 @@ func (h *writeHandler) write(ctx context.Context, req *prompb.WriteRequest) (err
 
 	return nil
 }
+
+// NewOTLPWriteHandler creates an http.Handler that accepts OTLP write requests and
+// writes them to the provided appendable.
+func NewOTLPWriteHandler(logger log.Logger, appendable storage.Appendable) http.Handler {
+	rwHandler := &writeHandler{
+		logger:     logger,
+		appendable: appendable,
+	}
+
+	return &otlpWriteHandler{
+		logger:    logger,
+		rwHandler: rwHandler,
+	}
+}
+
+type otlpWriteHandler struct {
+	logger    log.Logger
+	rwHandler *writeHandler
+}
+
+func (h *otlpWriteHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
+	req, err := DecodeOTLPWriteRequest(r)
+	if err != nil {
+		level.Error(h.logger).Log("msg", "Error decoding OTLP write request", "err", err.Error())
+		http.Error(w, err.Error(), http.StatusBadRequest)
+		return
+	}
+
+	prwMetricsMap, errs := otlptranslator.FromMetrics(req.Metrics(), otlptranslator.Settings{})
+	if errs != nil {
+		level.Warn(h.logger).Log("msg", "Error translating OTLP metrics to Prometheus write request", "err", errs)
+	}
+
+	prwMetrics := make([]prompb.TimeSeries, 0, len(prwMetricsMap))
+
+	for _, ts := range prwMetricsMap {
+		prwMetrics = append(prwMetrics, *ts)
+	}
+
+	err = h.rwHandler.write(r.Context(), &prompb.WriteRequest{
+		Timeseries: prwMetrics,
+	})
+
+	switch err {
+	case nil:
+	case storage.ErrOutOfOrderSample, storage.ErrOutOfBounds, storage.ErrDuplicateSampleForTimestamp:
+		// Indicate an out-of-order sample as a bad request to prevent retries.
+		http.Error(w, err.Error(), http.StatusBadRequest)
+		return
+	default:
+		level.Error(h.logger).Log("msg", "Error appending remote write", "err", err.Error())
+		http.Error(w, err.Error(), http.StatusInternalServerError)
+		return
+	}
+
+	w.WriteHeader(http.StatusOK)
+}
diff --git a/storage/remote/write_test.go b/storage/remote/write_test.go
index b1f1acbc7..824e319c2 100644
--- a/storage/remote/write_test.go
+++ b/storage/remote/write_test.go
@@ -14,6 +14,9 @@
 package remote
 
 import (
+	"bytes"
+	"net/http"
+	"net/http/httptest"
 	"net/url"
 	"testing"
 	"time"
@@ -22,6 +25,9 @@ import (
 	common_config "github.com/prometheus/common/config"
 	"github.com/prometheus/common/model"
 	"github.com/stretchr/testify/require"
+	"go.opentelemetry.io/collector/pdata/pcommon"
+	"go.opentelemetry.io/collector/pdata/pmetric"
+	"go.opentelemetry.io/collector/pdata/pmetric/pmetricotlp"
 
 	"github.com/prometheus/prometheus/config"
 	"github.com/prometheus/prometheus/model/labels"
@@ -373,3 +379,107 @@ func TestWriteStorageApplyConfigsPartialUpdate(t *testing.T) {
 	err = s.Close()
 	require.NoError(t, err)
 }
+
+func TestOTLPWriteHandler(t *testing.T) {
+	exportRequest := generateOTLPWriteRequest(t)
+
+	buf, err := exportRequest.MarshalProto()
+	require.NoError(t, err)
+
+	req, err := http.NewRequest("", "", bytes.NewReader(buf))
+	require.NoError(t, err)
+	req.Header.Set("Content-Type", "application/x-protobuf")
+
+	appendable := &mockAppendable{}
+	handler := NewOTLPWriteHandler(nil, appendable)
+
+	recorder := httptest.NewRecorder()
+	handler.ServeHTTP(recorder, req)
+
+	resp := recorder.Result()
+	require.Equal(t, http.StatusOK, resp.StatusCode)
+
+	require.Equal(t, 12, len(appendable.samples))   // 1 (counter) + 1 (gauge) + 1 (target_info) + 7 (hist_bucket) + 2 (hist_sum, hist_count)
+	require.Equal(t, 1, len(appendable.histograms)) // 1 (exponential histogram)
+	require.Equal(t, 1, len(appendable.exemplars))  // 1 (exemplar)
+}
+
+func generateOTLPWriteRequest(t *testing.T) pmetricotlp.ExportRequest {
+	d := pmetric.NewMetrics()
+
+	// Generate One Counter, One Gauge, One Histogram, One Exponential-Histogram
+	// with resource attributes: service.name="test-service", service.instance.id="test-instance", host.name="test-host"
+	// with metric attribute: foo.bar="baz"
+
+	timestamp := time.Now()
+
+	resourceMetric := d.ResourceMetrics().AppendEmpty()
+	resourceMetric.Resource().Attributes().PutStr("service.name", "test-service")
+	resourceMetric.Resource().Attributes().PutStr("service.instance.id", "test-instance")
+	resourceMetric.Resource().Attributes().PutStr("host.name", "test-host")
+
+	scopeMetric := resourceMetric.ScopeMetrics().AppendEmpty()
+
+	// Generate One Counter
+	counterMetric := scopeMetric.Metrics().AppendEmpty()
+	counterMetric.SetName("test-counter")
+	counterMetric.SetDescription("test-counter-description")
+	counterMetric.SetEmptySum()
+	counterMetric.Sum().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)
+	counterMetric.Sum().SetIsMonotonic(true)
+
+	counterDataPoint := counterMetric.Sum().DataPoints().AppendEmpty()
+	counterDataPoint.SetTimestamp(pcommon.NewTimestampFromTime(timestamp))
+	counterDataPoint.SetDoubleValue(10.0)
+	counterDataPoint.Attributes().PutStr("foo.bar", "baz")
+
+	counterExemplar := counterDataPoint.Exemplars().AppendEmpty()
+	counterExemplar.SetTimestamp(pcommon.NewTimestampFromTime(timestamp))
+	counterExemplar.SetDoubleValue(10.0)
+	counterExemplar.SetSpanID(pcommon.SpanID{0, 1, 2, 3, 4, 5, 6, 7})
+	counterExemplar.SetTraceID(pcommon.TraceID{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15})
+
+	// Generate One Gauge
+	gaugeMetric := scopeMetric.Metrics().AppendEmpty()
+	gaugeMetric.SetName("test-gauge")
+	gaugeMetric.SetDescription("test-gauge-description")
+	gaugeMetric.SetEmptyGauge()
+
+	gaugeDataPoint := gaugeMetric.Gauge().DataPoints().AppendEmpty()
+	gaugeDataPoint.SetTimestamp(pcommon.NewTimestampFromTime(timestamp))
+	gaugeDataPoint.SetDoubleValue(10.0)
+	gaugeDataPoint.Attributes().PutStr("foo.bar", "baz")
+
+	// Generate One Histogram
+	histogramMetric := scopeMetric.Metrics().AppendEmpty()
+	histogramMetric.SetName("test-histogram")
+	histogramMetric.SetDescription("test-histogram-description")
+	histogramMetric.SetEmptyHistogram()
+	histogramMetric.Histogram().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)
+
+	histogramDataPoint := histogramMetric.Histogram().DataPoints().AppendEmpty()
+	histogramDataPoint.SetTimestamp(pcommon.NewTimestampFromTime(timestamp))
+	histogramDataPoint.ExplicitBounds().FromRaw([]float64{0.0, 1.0, 2.0, 3.0, 4.0, 5.0})
+	histogramDataPoint.BucketCounts().FromRaw([]uint64{2, 2, 2, 2, 2, 2})
+	histogramDataPoint.SetCount(10)
+	histogramDataPoint.SetSum(30.0)
+	histogramDataPoint.Attributes().PutStr("foo.bar", "baz")
+
+	// Generate One Exponential-Histogram
+	exponentialHistogramMetric := scopeMetric.Metrics().AppendEmpty()
+	exponentialHistogramMetric.SetName("test-exponential-histogram")
+	exponentialHistogramMetric.SetDescription("test-exponential-histogram-description")
+	exponentialHistogramMetric.SetEmptyExponentialHistogram()
+	exponentialHistogramMetric.ExponentialHistogram().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)
+
+	exponentialHistogramDataPoint := exponentialHistogramMetric.ExponentialHistogram().DataPoints().AppendEmpty()
+	exponentialHistogramDataPoint.SetTimestamp(pcommon.NewTimestampFromTime(timestamp))
+	exponentialHistogramDataPoint.SetScale(2.0)
+	exponentialHistogramDataPoint.Positive().BucketCounts().FromRaw([]uint64{2, 2, 2, 2, 2})
+	exponentialHistogramDataPoint.SetZeroCount(2)
+	exponentialHistogramDataPoint.SetCount(10)
+	exponentialHistogramDataPoint.SetSum(30.0)
+	exponentialHistogramDataPoint.Attributes().PutStr("foo.bar", "baz")
+
+	return pmetricotlp.NewExportRequestFromMetrics(d)
+}
diff --git a/web/api/v1/api.go b/web/api/v1/api.go
index 73892aaf6..99589ac46 100644
--- a/web/api/v1/api.go
+++ b/web/api/v1/api.go
@@ -217,6 +217,7 @@ type API struct {
 
 	remoteWriteHandler http.Handler
 	remoteReadHandler  http.Handler
+	otlpWriteHandler   http.Handler
 
 	codecs []Codec
 }
@@ -249,6 +250,8 @@ func NewAPI(
 	gatherer prometheus.Gatherer,
 	registerer prometheus.Registerer,
 	statsRenderer StatsRenderer,
+	rwEnabled bool,
+	otlpEnabled bool,
 ) *API {
 	a := &API{
 		QueryEngine: qe,
@@ -285,9 +288,16 @@ func NewAPI(
 		a.statsRenderer = statsRenderer
 	}
 
-	if ap != nil {
+	if ap == nil && (rwEnabled || otlpEnabled) {
+		panic("remote write or otlp write enabled, but no appender passed in.")
+	}
+
+	if rwEnabled {
 		a.remoteWriteHandler = remote.NewWriteHandler(logger, registerer, ap)
 	}
+	if otlpEnabled {
+		a.otlpWriteHandler = remote.NewOTLPWriteHandler(logger, ap)
+	}
 
 	return a
 }
@@ -380,6 +390,7 @@ func (api *API) Register(r *route.Router) {
 	r.Get("/status/walreplay", api.serveWALReplayStatus)
 	r.Post("/read", api.ready(api.remoteRead))
 	r.Post("/write", api.ready(api.remoteWrite))
+	r.Post("/otlp/v1/metrics", api.ready(api.otlpWrite))
 
 	r.Get("/alerts", wrapAgent(api.alerts))
 	r.Get("/rules", wrapAgent(api.rules))
@@ -1581,6 +1592,14 @@ func (api *API) remoteWrite(w http.ResponseWriter, r *http.Request) {
 	}
 }
 
+func (api *API) otlpWrite(w http.ResponseWriter, r *http.Request) {
+	if api.otlpWriteHandler != nil {
+		api.otlpWriteHandler.ServeHTTP(w, r)
+	} else {
+		http.Error(w, "otlp write receiver needs to be enabled with --enable-feature=otlp-write-receiver", http.StatusNotFound)
+	}
+}
+
 func (api *API) deleteSeries(r *http.Request) apiFuncResult {
 	if !api.enableAdmin {
 		return apiFuncResult{nil, &apiError{errorUnavailable, errors.New("admin APIs disabled")}, nil, nil}
diff --git a/web/api/v1/errors_test.go b/web/api/v1/errors_test.go
index 4947afd81..afdd67337 100644
--- a/web/api/v1/errors_test.go
+++ b/web/api/v1/errors_test.go
@@ -134,6 +134,8 @@ func createPrometheusAPI(q storage.SampleAndChunkQueryable) *route.Router {
 		prometheus.DefaultGatherer,
 		nil,
 		nil,
+		false,
+		false,
 	)
 
 	promRouter := route.New().WithPrefix("/api/v1")
diff --git a/web/web.go b/web/web.go
index 6a3eab327..7022db64e 100644
--- a/web/web.go
+++ b/web/web.go
@@ -259,6 +259,7 @@ type Options struct {
 	RemoteReadConcurrencyLimit int
 	RemoteReadBytesInFrame     int
 	EnableRemoteWriteReceiver  bool
+	EnableOTLPWriteReceiver    bool
 	IsAgent                    bool
 	AppName                    string
 
@@ -317,7 +318,7 @@ func New(logger log.Logger, o *Options) *Handler {
 	FactoryRr := func(_ context.Context) api_v1.RulesRetriever { return h.ruleManager }
 
 	var app storage.Appendable
-	if o.EnableRemoteWriteReceiver {
+	if o.EnableRemoteWriteReceiver || o.EnableOTLPWriteReceiver {
 		app = h.storage
 	}
 
@@ -349,6 +350,8 @@ func New(logger log.Logger, o *Options) *Handler {
 		o.Gatherer,
 		o.Registerer,
 		nil,
+		o.EnableRemoteWriteReceiver,
+		o.EnableOTLPWriteReceiver,
 	)
 
 	if o.RoutePrefix != "/" {
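---

Worked example (illustrative, not part of the diff): addSingleSummaryDataPoint fans a single OTLP summary data point out into one Prometheus series per component. Assuming the package's unexported constants carry the conventional values (sumStr = "_sum", countStr = "_count", quantileStr = "quantile", createdSuffix = "_created"; they are defined in a helper file outside this excerpt), a summary named http_request_duration_seconds with quantiles 0.5 and 0.9 would come out as:

    http_request_duration_seconds_sum
    http_request_duration_seconds_count
    http_request_duration_seconds{quantile="0.5"}
    http_request_duration_seconds{quantile="0.9"}
    http_request_duration_seconds_created   (only when Settings.ExportCreatedMetric is set and the start timestamp is non-zero)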
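Similarly, addResourceTargetInfo keeps non-identity resource attributes on a separate target_info series instead of copying them onto every sample. For the resource built in the test above (service.name="test-service", service.instance.id="test-instance", host.name="test-host"), service.name and service.instance.id feed the job and instance labels (that mapping happens in createAttributes, outside this excerpt), and the one remaining attribute yields, sketching the expected shape:

    target_info{job="test-service", instance="test-instance", host_name="test-host"} 1

with the sample timestamp taken from the most recent data point in the ResourceMetrics. The label name host_name assumes the usual dot-to-underscore sanitization.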
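The dense-to-sparse conversion in convertBucketsLayout is easiest to see on concrete input. The sketch below is a hypothetical in-package test, not part of the change (the copied package deliberately ships without tests; see update-copy.sh). OTel buckets with offset -2 and counts [3, 0, 0, 0, 5] put the first count at OTel index 0, which maps to Prometheus index -1 after the +1 shift, and the three-bucket gap before the second count exceeds the gap limit of 2, so a second span is opened rather than padding with zero deltas.

```go
package prometheusremotewrite

import (
	"testing"

	"github.com/prometheus/prometheus/prompb"
	"github.com/stretchr/testify/require"
	"go.opentelemetry.io/collector/pdata/pmetric"
)

func TestConvertBucketsLayoutSketch(t *testing.T) {
	// Dense OTel layout: counts [3, 0, 0, 0, 5] starting at bucket offset -2.
	buckets := pmetric.NewExponentialHistogramDataPoint().Positive()
	buckets.SetOffset(-2)
	buckets.BucketCounts().FromRaw([]uint64{3, 0, 0, 0, 5})

	spans, deltas := convertBucketsLayout(buckets)

	// Span 1 starts at Prometheus index -1 (OTel index 0 shifted by the
	// offset and the +1 index adjustment); the gap of three empty buckets
	// is > 2, so span 2 opens at a relative offset of 3.
	require.Equal(t, []prompb.BucketSpan{{Offset: -1, Length: 1}, {Offset: 3, Length: 1}}, spans)
	// Deltas are successive count differences: 3-0, then 5-3.
	require.Equal(t, []int64{3, 2}, deltas)
}
```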
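End to end, the new receiver accepts a standard OTLP/HTTP protobuf export on /otlp/v1/metrics under the API router once Prometheus is started with --enable-feature=otlp-write-receiver. A minimal client sketch, assuming a local Prometheus on the default port 9090 and the default /api/v1 route prefix:

```go
package main

import (
	"bytes"
	"fmt"
	"net/http"
	"time"

	"go.opentelemetry.io/collector/pdata/pcommon"
	"go.opentelemetry.io/collector/pdata/pmetric"
	"go.opentelemetry.io/collector/pdata/pmetric/pmetricotlp"
)

func main() {
	md := pmetric.NewMetrics()
	rm := md.ResourceMetrics().AppendEmpty()
	// service.name / service.instance.id become the job / instance labels.
	rm.Resource().Attributes().PutStr("service.name", "demo-service")
	rm.Resource().Attributes().PutStr("service.instance.id", "demo-instance")

	m := rm.ScopeMetrics().AppendEmpty().Metrics().AppendEmpty()
	m.SetName("demo_gauge")
	dp := m.SetEmptyGauge().DataPoints().AppendEmpty()
	dp.SetTimestamp(pcommon.NewTimestampFromTime(time.Now()))
	dp.SetDoubleValue(42)

	// Same wire format the handler's test exercises via DecodeOTLPWriteRequest.
	buf, err := pmetricotlp.NewExportRequestFromMetrics(md).MarshalProto()
	if err != nil {
		panic(err)
	}

	resp, err := http.Post("http://localhost:9090/api/v1/otlp/v1/metrics",
		"application/x-protobuf", bytes.NewReader(buf))
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
	fmt.Println(resp.Status) // expect "200 OK" with the feature flag enabled
}
```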