Mirror of https://github.com/prometheus/prometheus.git (synced 2025-03-05 20:59:13 -08:00)
remote/otlp: convert delta to cumulative (#15165)
What

Adds support for OTLP delta temporality to the OTLP endpoint. This is done by calling the deltatocumulative processor from the OpenTelemetry Collector during OTLP conversion.

Why

Delta conversion is an inherently stateful process, which requires careful request routing when operated inside a collector. Prometheus is already stateful, and doing the conversion in-server reduces the operational burden on the ingest architecture to a single stateful component.

How

deltatocumulative is an OTel collector component that works as follows:

* pmetric.Metrics come from a receiver, or in this case from the HTTP client.
* It operates as an in-place update loop:
  * For each sample, if it is not delta, leave it unmodified.
  * If it is delta:
    * state += sample, where state is the in-memory sum of all previous samples.
    * sample = state, so the sample value is now cumulative.
* This is supported for sums (counters), gauges, histograms (classic histograms), and exponential histograms (native histograms).

If a series receives no new samples for 5m, its state is removed from memory.

Performance

Delta conversion is a stateful operation and the OTel code is not highly optimized yet; for example, it locks the entire processor for each request. Nonetheless, care has been taken to mitigate those effects:

* Delta conversion is behind a feature flag. If disabled, no conversion code is ever invoked.
* If enabled, conversion is not invoked if the request does not actually contain delta samples.

This leads to no measurable performance difference between default-cumulative and convert-cumulative (cumulative-only data, feature off vs. on). A minimal sketch of the update loop follows this message.

Signed-off-by: sh0rez <me@shorez.de>
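To make the update loop above concrete, here is a minimal, self-contained Go sketch of the same idea: a per-series in-memory running sum with staleness-based eviction. It is illustrative only and does not use the actual deltatocumulative processor types; the names `seriesKey`, `seriesState`, and `deltaAccumulator` are invented for the example, and the real component additionally handles histograms, exponential histograms, out-of-order detection, and request-level locking.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// seriesKey would normally be derived from the metric name plus its attribute
// set; a plain string keeps the sketch simple.
type seriesKey string

type seriesState struct {
	sum      float64   // running cumulative value ("state" in the description above)
	lastSeen time.Time // used to expire series that receive no samples for maxStale
}

// deltaAccumulator mimics the core of the delta-to-cumulative update loop:
// state += sample; sample = state.
type deltaAccumulator struct {
	mu       sync.Mutex
	series   map[seriesKey]*seriesState
	maxStale time.Duration // the default described above is 5m
}

func newDeltaAccumulator(maxStale time.Duration) *deltaAccumulator {
	return &deltaAccumulator{series: make(map[seriesKey]*seriesState), maxStale: maxStale}
}

// consume takes one delta sample and returns the cumulative value for that series.
func (a *deltaAccumulator) consume(key seriesKey, delta float64, at time.Time) float64 {
	a.mu.Lock() // the real processor also serializes requests; this is the contention noted above
	defer a.mu.Unlock()

	st, ok := a.series[key]
	if !ok {
		st = &seriesState{}
		a.series[key] = st
	}
	st.sum += delta // state += sample
	st.lastSeen = at
	return st.sum // sample value is now cumulative
}

// gc drops series that have not been updated within maxStale, mirroring the
// periodic cleanup of inactive series.
func (a *deltaAccumulator) gc(now time.Time) {
	a.mu.Lock()
	defer a.mu.Unlock()
	for k, st := range a.series {
		if now.Sub(st.lastSeen) > a.maxStale {
			delete(a.series, k)
		}
	}
}

func main() {
	acc := newDeltaAccumulator(5 * time.Minute)
	now := time.Now()
	for i, d := range []float64{0, 1, 2} { // deltas +0, +1, +2
		c := acc.consume(`some_delta_total`, d, now.Add(time.Duration(i)*time.Second))
		fmt.Println(c) // prints 0, 1, 3 — the cumulative equivalents
	}
	acc.gc(now.Add(10 * time.Minute)) // series evicted after 5m of inactivity
}
```

Because this state lives only in memory, a restart empties the map and the next delta starts the cumulative series from zero again, which downstream consumers observe as a counter reset (as the added documentation below also notes).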
Parent: e5e8f90119
Commit: 5303e515af
@@ -275,6 +275,9 @@ func (c *flagConfig) setFeatureListOptions(logger *slog.Logger) error {
case "old-ui":
c.web.UseOldUI = true
logger.Info("Serving previous version of the Prometheus web UI.")
case "otlp-deltatocumulative":
c.web.ConvertOTLPDelta = true
logger.Info("Converting delta OTLP metrics to cumulative")
default:
logger.Warn("Unknown option for --enable-feature", "option", o)
}

@@ -516,7 +519,7 @@ func main() {
a.Flag("scrape.discovery-reload-interval", "Interval used by scrape manager to throttle target groups updates.").
Hidden().Default("5s").SetValue(&cfg.scrape.DiscoveryReloadInterval)

a.Flag("enable-feature", "Comma separated feature names to enable. Valid options: exemplar-storage, expand-external-labels, memory-snapshot-on-shutdown, promql-per-step-stats, promql-experimental-functions, extra-scrape-metrics, auto-gomaxprocs, native-histograms, created-timestamp-zero-ingestion, concurrent-rule-eval, delayed-compaction, old-ui. See https://prometheus.io/docs/prometheus/latest/feature_flags/ for more details.").
a.Flag("enable-feature", "Comma separated feature names to enable. Valid options: exemplar-storage, expand-external-labels, memory-snapshot-on-shutdown, promql-per-step-stats, promql-experimental-functions, extra-scrape-metrics, auto-gomaxprocs, native-histograms, created-timestamp-zero-ingestion, concurrent-rule-eval, delayed-compaction, old-ui, otlp-deltatocumulative. See https://prometheus.io/docs/prometheus/latest/feature_flags/ for more details.").
Default("").StringsVar(&cfg.featureList)

a.Flag("agent", "Run Prometheus in 'Agent mode'.").BoolVar(&agentMode)
@@ -60,7 +60,7 @@ The Prometheus monitoring server
| <code class="text-nowrap">--query.timeout</code> | Maximum time a query may take before being aborted. Use with server mode only. | `2m` |
| <code class="text-nowrap">--query.max-concurrency</code> | Maximum number of queries executed concurrently. Use with server mode only. | `20` |
| <code class="text-nowrap">--query.max-samples</code> | Maximum number of samples a single query can load into memory. Note that queries will fail if they try to load more samples than this into memory, so this also limits the number of samples a query can return. Use with server mode only. | `50000000` |
| <code class="text-nowrap">--enable-feature</code> <code class="text-nowrap">...<code class="text-nowrap"> | Comma separated feature names to enable. Valid options: exemplar-storage, expand-external-labels, memory-snapshot-on-shutdown, promql-per-step-stats, promql-experimental-functions, extra-scrape-metrics, auto-gomaxprocs, native-histograms, created-timestamp-zero-ingestion, concurrent-rule-eval, delayed-compaction, old-ui. See https://prometheus.io/docs/prometheus/latest/feature_flags/ for more details. | |
| <code class="text-nowrap">--enable-feature</code> <code class="text-nowrap">...<code class="text-nowrap"> | Comma separated feature names to enable. Valid options: exemplar-storage, expand-external-labels, memory-snapshot-on-shutdown, promql-per-step-stats, promql-experimental-functions, extra-scrape-metrics, auto-gomaxprocs, native-histograms, created-timestamp-zero-ingestion, concurrent-rule-eval, delayed-compaction, old-ui, otlp-deltatocumulative. See https://prometheus.io/docs/prometheus/latest/feature_flags/ for more details. | |
| <code class="text-nowrap">--agent</code> | Run Prometheus in 'Agent mode'. | |
| <code class="text-nowrap">--log.level</code> | Only log messages with the given severity or above. One of: [debug, info, warn, error] | `info` |
| <code class="text-nowrap">--log.format</code> | Output format of log messages. One of: [logfmt, json] | `logfmt` |
@@ -151,3 +151,25 @@ Configuration reloads are triggered by detecting changes in the checksum of the
main configuration file or any referenced files, such as rule and scrape
configurations. To ensure consistency and avoid issues during reloads, it's
recommended to update these files atomically.

## OTLP Delta Conversion

`--enable-feature=otlp-deltatocumulative`

When enabled, Prometheus will convert OTLP metrics from delta temporality to their
cumulative equivalent, instead of dropping them.

This uses
[deltatocumulative][d2c]
from the OTel collector, using its default settings.

Delta conversion keeps in-memory state to aggregate delta changes per-series over time.
When Prometheus restarts, this state is lost, starting the aggregation from zero
again. This results in a counter reset in the cumulative series.

This state is periodically ([`max_stale`][d2c]) cleared of inactive series.

Enabling this _can_ have negative impact on performance, because the in-memory
state is mutex guarded. Cumulative-only OTLP requests are not affected.

[d2c]: https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/processor/deltatocumulativeprocessor
@@ -1424,6 +1424,15 @@ endpoint is `/api/v1/otlp/v1/metrics`.

*New in v2.47*

### OTLP Delta

Prometheus can convert incoming metrics from delta temporality to their cumulative equivalent.
This is done using [deltatocumulative](https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/processor/deltatocumulativeprocessor) from the OpenTelemetry Collector.

To enable, pass `--enable-feature=otlp-deltatocumulative`.

*New in v3.2*

## Notifications

The following endpoints provide information about active status notifications concerning the Prometheus server itself.
go.mod (11 changed lines)

@@ -48,6 +48,7 @@ require (
github.com/nsf/jsondiff v0.0.0-20230430225905-43f6cf3098c1
github.com/oklog/run v1.1.0
github.com/oklog/ulid v1.3.1
github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor v0.116.0
github.com/ovh/go-ovh v1.6.0
github.com/prometheus/alertmanager v0.27.0
github.com/prometheus/client_golang v1.20.5

@@ -60,7 +61,10 @@ require (
github.com/shurcooL/httpfs v0.0.0-20230704072500-f1e31cf0ba5c
github.com/stretchr/testify v1.10.0
github.com/vultr/govultr/v2 v2.17.2
go.opentelemetry.io/collector/component v0.116.0
go.opentelemetry.io/collector/consumer v1.22.0
go.opentelemetry.io/collector/pdata v1.22.0
go.opentelemetry.io/collector/processor v0.116.0
go.opentelemetry.io/collector/semconv v0.116.0
go.opentelemetry.io/contrib/instrumentation/net/http/httptrace/otelhttptrace v0.58.0
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.58.0

@@ -68,6 +72,7 @@ require (
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.33.0
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.33.0
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.33.0
go.opentelemetry.io/otel/metric v1.33.0
go.opentelemetry.io/otel/sdk v1.33.0
go.opentelemetry.io/otel/trace v1.33.0
go.uber.org/atomic v1.11.0

@@ -168,6 +173,8 @@ require (
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/morikuni/aec v1.0.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics v0.116.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil v0.116.0 // indirect
github.com/opencontainers/go-digest v1.0.0 // indirect
github.com/opencontainers/image-spec v1.0.2 // indirect
github.com/opencontainers/runtime-spec v1.0.2 // indirect

@@ -184,8 +191,10 @@ require (
github.com/xhit/go-str2duration/v2 v2.1.0 // indirect
go.mongodb.org/mongo-driver v1.14.0 // indirect
go.opentelemetry.io/auto/sdk v1.1.0 // indirect
go.opentelemetry.io/otel/metric v1.33.0 // indirect
go.opentelemetry.io/collector/config/configtelemetry v0.116.0 // indirect
go.opentelemetry.io/collector/pipeline v0.116.0 // indirect
go.opentelemetry.io/proto/otlp v1.4.0 // indirect
go.uber.org/zap v1.27.0 // indirect
golang.org/x/crypto v0.31.0 // indirect
golang.org/x/exp v0.0.0-20240119083558-1b970713d09a // indirect
golang.org/x/mod v0.22.0 // indirect
go.sum (54 changed lines)

@@ -165,6 +165,8 @@ github.com/go-resty/resty/v2 v2.16.2/go.mod h1:0fHAoK7JoBy/Ch36N8VFeMsK7xQOHhvWa
github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
github.com/go-task/slim-sprig/v3 v3.0.0 h1:sUs3vkvUymDpBKi3qH1YSqBQk9+9D/8M2mN1vB6EwHI=
github.com/go-task/slim-sprig/v3 v3.0.0/go.mod h1:W848ghGpv3Qj3dhTPRyJypKRiqCdHZiAzKg9hl15HA8=
github.com/go-viper/mapstructure/v2 v2.2.1 h1:ZAaOCxANMuZx5RCeg0mBdEZk7DZasvvZIxtHqx8aGss=
github.com/go-viper/mapstructure/v2 v2.2.1/go.mod h1:oJDH3BJKyqBA2TXFhDsKDGDTlndYOZ6rGS0BRZIxGhM=
github.com/go-zookeeper/zk v1.0.4 h1:DPzxraQx7OrPyXq2phlGlNSIyWEsAox0RJmjTseMV6I=
github.com/go-zookeeper/zk v1.0.4/go.mod h1:nOB03cncLtlp4t+UAkGSV+9beXP/akpekBwL+UX1Qcw=
github.com/godbus/dbus/v5 v5.0.4 h1:9349emZab16e7zQvpmsbtjc18ykshndd8y2PG3sgJbA=

@@ -304,6 +306,12 @@ github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/klauspost/compress v1.17.11 h1:In6xLpyWOi1+C7tXUUWv2ot1QvBjxevKAaI6IXrJmUc=
github.com/klauspost/compress v1.17.11/go.mod h1:pMDklpSncoRMuLFrf1W9Ss9KT+0rH90U12bZKk7uwG0=
github.com/knadh/koanf/maps v0.1.1 h1:G5TjmUh2D7G2YWf5SQQqSiHRJEjaicvU0KpypqB3NIs=
github.com/knadh/koanf/maps v0.1.1/go.mod h1:npD/QZY3V6ghQDdcQzl1W4ICNVTkohC8E73eI2xW4yI=
github.com/knadh/koanf/providers/confmap v0.1.0 h1:gOkxhHkemwG4LezxxN8DMOFopOPghxRVp7JbIvdvqzU=
github.com/knadh/koanf/providers/confmap v0.1.0/go.mod h1:2uLhxQzJnyHKfxG927awZC7+fyHFdQkd697K4MdLnIU=
github.com/knadh/koanf/v2 v2.1.2 h1:I2rtLRqXRy1p01m/utEtpZSSA6dcJbgGVuE27kW2PzQ=
github.com/knadh/koanf/v2 v2.1.2/go.mod h1:Gphfaen0q1Fc1HTgJgSTC4oRX9R2R5ErYMZJy8fLJBo=
github.com/kolo/xmlrpc v0.0.0-20220921171641-a4b6fa1dd06b h1:udzkj9S/zlT5X367kqJis0QP7YMxobob6zhzq6Yre00=
github.com/kolo/xmlrpc v0.0.0-20220921171641-a4b6fa1dd06b/go.mod h1:pcaDhQK0/NJZEvtCO0qQPPropqV0sJOJ6YW7X+9kRwM=
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=

@@ -348,6 +356,8 @@ github.com/miekg/dns v1.1.41/go.mod h1:p6aan82bvRIyn+zDIv9xYNUpwa73JcSh9BKwknJys
github.com/miekg/dns v1.1.62 h1:cN8OuEF1/x5Rq6Np+h1epln8OiyPWV+lROx9LxcGgIQ=
github.com/miekg/dns v1.1.62/go.mod h1:mvDlcItzm+br7MToIKqkglaGhlFMHJ9DTNNWONWXbNQ=
github.com/mitchellh/cli v1.1.0/go.mod h1:xcISNoH86gajksDmfB23e/pu+B+GeFRMYmoHXxx3xhI=
github.com/mitchellh/copystructure v1.2.0 h1:vpKXTN4ewci03Vljg/q9QvCGUDttBOGBIa15WveJJGw=
github.com/mitchellh/copystructure v1.2.0/go.mod h1:qLl+cE2AmVv+CoeAwDPye/v+N2HKCj9FbZEVFJRxO9s=
github.com/mitchellh/go-homedir v1.1.0 h1:lukF9ziXFxDFPkA1vsr5zpc1XuPDn/wFntq5mG+4E0Y=
github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0=
github.com/mitchellh/go-testing-interface v1.14.1 h1:jrgshOhYAUVNMAJiKbEu7EqAwgJJ2JqpQmpLJOu07cU=

@@ -355,6 +365,8 @@ github.com/mitchellh/go-testing-interface v1.14.1/go.mod h1:gfgS7OtZj6MA4U1UrDRp
github.com/mitchellh/mapstructure v0.0.0-20160808181253-ca63d7c062ee/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y=
github.com/mitchellh/mapstructure v1.5.0 h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyuac5Z2hdY=
github.com/mitchellh/mapstructure v1.5.0/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo=
github.com/mitchellh/reflectwalk v1.0.2 h1:G2LzWKi524PWgd3mLHV8Y5k7s6XUvT0Gef6zxSIeXaQ=
github.com/mitchellh/reflectwalk v1.0.2/go.mod h1:mSTlrgnPZtwu0c4WaC2kGObEpuNDbx0jmZXqmk4esnw=
github.com/moby/docker-image-spec v1.3.1 h1:jMKff3w6PgbfSa69GfNg+zN/XLhfXJGnEx3Nl2EsFP0=
github.com/moby/docker-image-spec v1.3.1/go.mod h1:eKmb5VW8vQEh/BAr2yvVNvuiJuY6UIocYsFu/DxxRpo=
github.com/moby/term v0.0.0-20210619224110-3f7ff695adc6 h1:dcztxKSvZ4Id8iPpHERQBbIJfabdt4wUm5qy3wOL2Zc=

@@ -383,6 +395,14 @@ github.com/onsi/ginkgo/v2 v2.19.0 h1:9Cnnf7UHo57Hy3k6/m5k3dRfGTMXGvxhHFvkDTCTpvA
github.com/onsi/ginkgo/v2 v2.19.0/go.mod h1:rlwLi9PilAFJ8jCg9UE1QP6VBpd6/xj3SRC0d6TU0To=
github.com/onsi/gomega v1.19.0 h1:4ieX6qQjPP/BfC3mpsAtIGGlxTWPeA3Inl/7DtXw1tw=
github.com/onsi/gomega v1.19.0/go.mod h1:LY+I3pBVzYsTBU1AnDwOSxaYi9WoWiqgwooUqq9yPro=
github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics v0.116.0 h1:Kxk5Ral+Dc6VB9UmTketVjs+rbMZP8JxQ4SXDx4RivQ=
github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics v0.116.0/go.mod h1:ctT6oQmGmWGGGgUIKyx2fDwqz77N9+04gqKkDyAzKCg=
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest v0.116.0 h1:RlEK9MbxWyBHbLel8EJ1L7DbYVLai9dZL6Ljl2cBgyA=
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest v0.116.0/go.mod h1:AVUEyIjPb+0ARr7mhIkZkdNg3fd0ZcRhzAi53oZhl1Q=
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil v0.116.0 h1:jwnZYRBuPJnsKXE5H6ZvTEm91bXW5VP8+tLewzl54eg=
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil v0.116.0/go.mod h1:NT3Ag+DdnIAZQfD7l7OHwlYqnaAJ19SoPZ0nhD9yx4s=
github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor v0.116.0 h1:ZBmLuipJv7BT9fho/2yAFsS8AtMsCOCe4ON8oqkX3n8=
github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor v0.116.0/go.mod h1:f0GdYWGxUunyRZ088gHnoX78pc/gZc3dQlRtidiGXzg=
github.com/opencontainers/go-digest v1.0.0 h1:apOUWs51W5PlhuyGyz9FCeeBIOUDA/6nW8Oi/yOhh5U=
github.com/opencontainers/go-digest v1.0.0/go.mod h1:0JzlMkj0TRzQZfJkVvzbP0HBR3IKzErnv2BNG4W4MAM=
github.com/opencontainers/image-spec v1.0.2 h1:9yCKha/T5XdGtO0q9Q9a6T5NUCsTn/DrBg0D7ufOcFM=

@@ -490,8 +510,36 @@ go.mongodb.org/mongo-driver v1.14.0 h1:P98w8egYRjYe3XDjxhYJagTokP/H6HzlsnojRgZRd
go.mongodb.org/mongo-driver v1.14.0/go.mod h1:Vzb0Mk/pa7e6cWw85R4F/endUC3u0U9jGcNU603k65c=
go.opentelemetry.io/auto/sdk v1.1.0 h1:cH53jehLUN6UFLY71z+NDOiNJqDdPRaXzTel0sJySYA=
go.opentelemetry.io/auto/sdk v1.1.0/go.mod h1:3wSPjt5PWp2RhlCcmmOial7AvC4DQqZb7a7wCow3W8A=
go.opentelemetry.io/collector/component v0.116.0 h1:SQE1YeVfYCN7bw1n4hknUwJE5U/1qJL552sDhAdSlaA=
go.opentelemetry.io/collector/component v0.116.0/go.mod h1:MYgXFZWDTq0uPgF1mkLSFibtpNqksRVAOrmihckOQEs=
go.opentelemetry.io/collector/component/componentstatus v0.116.0 h1:wpgY0H2K9IPBzaNAvavKziK86VZ7TuNFQbS9OC4Z6Cs=
go.opentelemetry.io/collector/component/componentstatus v0.116.0/go.mod h1:ZRlVwHFMGNfcsAywEJqivOn5JzDZkpe3KZVSwMWu4tw=
go.opentelemetry.io/collector/component/componenttest v0.116.0 h1:UIcnx4Rrs/oDRYSAZNHRMUiYs2FBlwgV5Nc0oMYfR6A=
go.opentelemetry.io/collector/component/componenttest v0.116.0/go.mod h1:W40HaKPHdBFMVI7zzHE7dhdWC+CgAnAC9SmWetFBATY=
go.opentelemetry.io/collector/config/configtelemetry v0.116.0 h1:Vl49VCHQwBOeMswDpFwcl2HD8e9y94xlrfII3SR2VeQ=
go.opentelemetry.io/collector/config/configtelemetry v0.116.0/go.mod h1:SlBEwQg0qly75rXZ6W1Ig8jN25KBVBkFIIAUI1GiAAE=
go.opentelemetry.io/collector/confmap v1.22.0 h1:ZKQzRuj5lKu+seKArAAZ1yPRroDPricaIVIREm/jr3w=
go.opentelemetry.io/collector/confmap v1.22.0/go.mod h1:Rrhs+MWoaP6AswZp+ReQ2VO9dfOfcUjdjiSHBsG+nec=
go.opentelemetry.io/collector/consumer v1.22.0 h1:QmfnNizyNZFt0uK3GG/EoT5h6PvZJ0dgVTc5hFEc1l0=
go.opentelemetry.io/collector/consumer v1.22.0/go.mod h1:tiz2khNceFAPokxxfzAuFfIpShBasMT2AL2Sbc7+m0I=
go.opentelemetry.io/collector/consumer/consumertest v0.116.0 h1:pIVR7FtQMNAzfxBUSMEIC2dX5Lfo3O9ZBfx+sAwrrrM=
go.opentelemetry.io/collector/consumer/consumertest v0.116.0/go.mod h1:cV3cNDiPnls5JdhnOJJFVlclrClg9kPs04cXgYP9Gmk=
go.opentelemetry.io/collector/consumer/xconsumer v0.116.0 h1:ZrWvq7HumB0jRYmS2ztZ3hhXRNpUVBWPKMbPhsVGmZM=
go.opentelemetry.io/collector/consumer/xconsumer v0.116.0/go.mod h1:C+VFMk8vLzPun6XK8aMts6h4RaDjmzXHCPaiOxzRQzQ=
go.opentelemetry.io/collector/pdata v1.22.0 h1:3yhjL46NLdTMoP8rkkcE9B0pzjf2973crn0KKhX5UrI=
go.opentelemetry.io/collector/pdata v1.22.0/go.mod h1:nLLf6uDg8Kn5g3WNZwGyu8+kf77SwOqQvMTb5AXEbEY=
go.opentelemetry.io/collector/pdata/pprofile v0.116.0 h1:iE6lqkO7Hi6lTIIml1RI7yQ55CKqW12R2qHinwF5Zuk=
go.opentelemetry.io/collector/pdata/pprofile v0.116.0/go.mod h1:xQiPpjzIiXRFb+1fPxUy/3ygEZgo0Bu/xmLKOWu8vMQ=
go.opentelemetry.io/collector/pdata/testdata v0.116.0 h1:zmn1zpeX2BvzL6vt2dBF4OuAyFF2ml/OXcqflNgFiP0=
go.opentelemetry.io/collector/pdata/testdata v0.116.0/go.mod h1:ytWzICFN4XTDP6o65B4+Ed52JGdqgk9B8CpLHCeCpMo=
go.opentelemetry.io/collector/pipeline v0.116.0 h1:o8eKEuWEszmRpfShy7ElBoQ3Jo6kCi9ucm3yRgdNb9s=
go.opentelemetry.io/collector/pipeline v0.116.0/go.mod h1:qE3DmoB05AW0C3lmPvdxZqd/H4po84NPzd5MrqgtL74=
go.opentelemetry.io/collector/processor v0.116.0 h1:Kyu4tPzTdWNHtZjcxvI/bGNAgyv8L8Kem2r/Mk4IDAw=
go.opentelemetry.io/collector/processor v0.116.0/go.mod h1:+/Ugy48RAxlZEXmN2cw51W8t5wdHS9No+GAoP+moskk=
go.opentelemetry.io/collector/processor/processortest v0.116.0 h1:+IqNEVEE0E2MsO2g7+Y/9dz35sDuvAXRXrLts9NdXrA=
go.opentelemetry.io/collector/processor/processortest v0.116.0/go.mod h1:DLaQDBxzgeeaUO0ULMn/efos9PmHZkmYCHuxwCsiVHI=
go.opentelemetry.io/collector/processor/xprocessor v0.116.0 h1:iin/UwuWvSLB7ZNfINFUYbZ5lxIi1NjZ2brkyyFdiRA=
go.opentelemetry.io/collector/processor/xprocessor v0.116.0/go.mod h1:cnA43/XpKDbaOmd8buqKp/LGJ2l/OoCqbR//u5DMfn8=
go.opentelemetry.io/collector/semconv v0.116.0 h1:63xCZomsKJAWmKGWD3lnORiE3WKW6AO4LjnzcHzGx3Y=
go.opentelemetry.io/collector/semconv v0.116.0/go.mod h1:N6XE8Q0JKgBN2fAhkUQtqK9LT7rEGR6+Wu/Rtbal1iI=
go.opentelemetry.io/contrib/instrumentation/net/http/httptrace/otelhttptrace v0.58.0 h1:xwH3QJv6zL4u+gkPUu59NeT1Gyw9nScWT8FQpKLUJJI=

@@ -510,8 +558,8 @@ go.opentelemetry.io/otel/metric v1.33.0 h1:r+JOocAyeRVXD8lZpjdQjzMadVZp2M4WmQ+5W
go.opentelemetry.io/otel/metric v1.33.0/go.mod h1:L9+Fyctbp6HFTddIxClbQkjtubW6O9QS3Ann/M82u6M=
go.opentelemetry.io/otel/sdk v1.33.0 h1:iax7M131HuAm9QkZotNHEfstof92xM+N8sr3uHXc2IM=
go.opentelemetry.io/otel/sdk v1.33.0/go.mod h1:A1Q5oi7/9XaMlIWzPSxLRWOI8nG3FnzHJNbiENQuihM=
go.opentelemetry.io/otel/sdk/metric v1.31.0 h1:i9hxxLJF/9kkvfHppyLL55aW7iIJz4JjxTeYusH7zMc=
go.opentelemetry.io/otel/sdk/metric v1.31.0/go.mod h1:CRInTMVvNhUKgSAMbKyTMxqOBC0zgyxzW55lZzX43Y8=
go.opentelemetry.io/otel/sdk/metric v1.32.0 h1:rZvFnvmvawYb0alrYkjraqJq0Z4ZUJAiyYCU9snn1CU=
go.opentelemetry.io/otel/sdk/metric v1.32.0/go.mod h1:PWeZlq0zt9YkYAp3gjKZ0eicRYvOh1Gd+X99x6GHpCQ=
go.opentelemetry.io/otel/trace v1.33.0 h1:cCJuF7LRjUFso9LPnEAHJDB2pqzp+hbO8eu1qqW2d/s=
go.opentelemetry.io/otel/trace v1.33.0/go.mod h1:uIcdVUZMpTAmz0tI1z04GoVSezK37CbGV4fr1f2nBck=
go.opentelemetry.io/proto/otlp v1.4.0 h1:TA9WRvW6zMwP+Ssb6fLoUIuirti1gGbP28GcKG1jgeg=

@@ -524,6 +572,8 @@ go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0=
go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y=
go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8=
go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E=
golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20190923035154-9ee001bba392/go.mod h1:/lpIB1dKB+9EgE3H3cr1v9wB50oz8l4C4h62xy7jSTY=
@@ -38,6 +38,13 @@ import (
writev2 "github.com/prometheus/prometheus/prompb/io/prometheus/write/v2"
"github.com/prometheus/prometheus/storage"
otlptranslator "github.com/prometheus/prometheus/storage/remote/otlptranslator/prometheusremotewrite"

deltatocumulative "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/processor"
"go.opentelemetry.io/otel/metric/noop"
)

type writeHandler struct {

@@ -517,56 +524,107 @@ func (h *writeHandler) handleHistogramZeroSample(app storage.Appender, ref stora
return ref, err
}

type OTLPOptions struct {
// Convert delta samples to their cumulative equivalent by aggregating in-memory
ConvertDelta bool
}

// NewOTLPWriteHandler creates a http.Handler that accepts OTLP write requests and
// writes them to the provided appendable.
func NewOTLPWriteHandler(logger *slog.Logger, appendable storage.Appendable, configFunc func() config.Config) http.Handler {
rwHandler := &writeHandler{
logger: logger,
appendable: appendable,
func NewOTLPWriteHandler(logger *slog.Logger, reg prometheus.Registerer, appendable storage.Appendable, configFunc func() config.Config, opts OTLPOptions) http.Handler {
ex := &rwExporter{
writeHandler: &writeHandler{
logger: logger,
appendable: appendable,
},
config: configFunc,
}

return &otlpWriteHandler{
logger: logger,
rwHandler: rwHandler,
configFunc: configFunc,
wh := &otlpWriteHandler{logger: logger, cumul: ex}

if opts.ConvertDelta {
fac := deltatocumulative.NewFactory()
set := processor.Settings{TelemetrySettings: component.TelemetrySettings{MeterProvider: noop.NewMeterProvider()}}
d2c, err := fac.CreateMetrics(context.Background(), set, fac.CreateDefaultConfig(), wh.cumul)
if err != nil {
// fac.CreateMetrics directly calls [deltatocumulativeprocessor.createMetricsProcessor],
// which only errors if:
//   - cfg.(type) != *Config
//   - telemetry.New fails due to bad set.TelemetrySettings
//
// both cannot be the case, as we pass a valid *Config and valid TelemetrySettings.
// as such, we assume this error to never occur.
// if it is, our assumptions are broken in which case a panic seems acceptable.
panic(err)
}
if err := d2c.Start(context.Background(), nil); err != nil {
// deltatocumulative does not error on start. see above for panic reasoning
panic(err)
}
wh.delta = d2c
}

return wh
}

type otlpWriteHandler struct {
logger *slog.Logger
rwHandler *writeHandler
configFunc func() config.Config
type rwExporter struct {
*writeHandler
config func() config.Config
}

func (h *otlpWriteHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
req, err := DecodeOTLPWriteRequest(r)
if err != nil {
h.logger.Error("Error decoding remote write request", "err", err.Error())
http.Error(w, err.Error(), http.StatusBadRequest)
return
}

otlpCfg := h.configFunc().OTLPConfig
func (rw *rwExporter) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) error {
otlpCfg := rw.config().OTLPConfig

converter := otlptranslator.NewPrometheusConverter()
annots, err := converter.FromMetrics(r.Context(), req.Metrics(), otlptranslator.Settings{
annots, err := converter.FromMetrics(ctx, md, otlptranslator.Settings{
AddMetricSuffixes: true,
AllowUTF8: otlpCfg.TranslationStrategy == config.NoUTF8EscapingWithSuffixes,
PromoteResourceAttributes: otlpCfg.PromoteResourceAttributes,
KeepIdentifyingResourceAttributes: otlpCfg.KeepIdentifyingResourceAttributes,
})
if err != nil {
h.logger.Warn("Error translating OTLP metrics to Prometheus write request", "err", err)
rw.logger.Warn("Error translating OTLP metrics to Prometheus write request", "err", err)
}
ws, _ := annots.AsStrings("", 0, 0)
if len(ws) > 0 {
h.logger.Warn("Warnings translating OTLP metrics to Prometheus write request", "warnings", ws)
rw.logger.Warn("Warnings translating OTLP metrics to Prometheus write request", "warnings", ws)
}

err = h.rwHandler.write(r.Context(), &prompb.WriteRequest{
err = rw.write(ctx, &prompb.WriteRequest{
Timeseries: converter.TimeSeries(),
Metadata: converter.Metadata(),
})
return err
}

func (rw *rwExporter) Capabilities() consumer.Capabilities {
return consumer.Capabilities{MutatesData: false}
}

type otlpWriteHandler struct {
logger *slog.Logger

cumul consumer.Metrics // only cumulative
delta consumer.Metrics // delta capable
}

func (h *otlpWriteHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
req, err := DecodeOTLPWriteRequest(r)
if err != nil {
h.logger.Error("Error decoding OTLP write request", "err", err.Error())
http.Error(w, err.Error(), http.StatusBadRequest)
return
}

md := req.Metrics()
// if delta conversion enabled AND delta samples exist, use slower delta capable path
if h.delta != nil && hasDelta(md) {
err = h.delta.ConsumeMetrics(r.Context(), md)
} else {
// deltatocumulative currently holds a sync.Mutex when entering ConsumeMetrics.
// This is slow and not necessary when no delta samples exist anyways
err = h.cumul.ConsumeMetrics(r.Context(), md)
}

switch {
case err == nil:

@@ -583,6 +641,31 @@ func (h *otlpWriteHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
}

func hasDelta(md pmetric.Metrics) bool {
for i := range md.ResourceMetrics().Len() {
sms := md.ResourceMetrics().At(i).ScopeMetrics()
for i := range sms.Len() {
ms := sms.At(i).Metrics()
for i := range ms.Len() {
temporality := pmetric.AggregationTemporalityUnspecified
m := ms.At(i)
switch ms.At(i).Type() {
case pmetric.MetricTypeSum:
temporality = m.Sum().AggregationTemporality()
case pmetric.MetricTypeExponentialHistogram:
temporality = m.ExponentialHistogram().AggregationTemporality()
case pmetric.MetricTypeHistogram:
temporality = m.Histogram().AggregationTemporality()
}
if temporality == pmetric.AggregationTemporalityDelta {
return true
}
}
}
}
return false
}

type timeLimitAppender struct {
storage.Appender
@@ -15,13 +15,23 @@ package remote

import (
"bytes"
"context"
"errors"
"fmt"
"log/slog"
"math/rand/v2"
"net/http"
"net/http/httptest"
"net/url"
"os"
"reflect"
"runtime"
"strconv"
"sync"
"testing"
"time"

"github.com/google/go-cmp/cmp"
"github.com/prometheus/client_golang/prometheus"
common_config "github.com/prometheus/common/config"
"github.com/prometheus/common/model"

@@ -31,8 +41,10 @@ import (
"go.opentelemetry.io/collector/pdata/pmetric/pmetricotlp"

"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/model/histogram"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/relabel"
"github.com/prometheus/prometheus/storage"
)

func testRemoteWriteConfig() *config.RemoteWriteConfig {

@@ -379,11 +391,11 @@ func TestOTLPWriteHandler(t *testing.T) {
req.Header.Set("Content-Type", "application/x-protobuf")

appendable := &mockAppendable{}
handler := NewOTLPWriteHandler(nil, appendable, func() config.Config {
handler := NewOTLPWriteHandler(nil, nil, appendable, func() config.Config {
return config.Config{
OTLPConfig: config.DefaultOTLPConfig,
}
})
}, OTLPOptions{})

recorder := httptest.NewRecorder()
handler.ServeHTTP(recorder, req)

@@ -476,3 +488,364 @@ func generateOTLPWriteRequest() pmetricotlp.ExportRequest {

return pmetricotlp.NewExportRequestFromMetrics(d)
}

func TestOTLPDelta(t *testing.T) {
log := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelWarn}))
appendable := &mockAppendable{}
cfg := func() config.Config {
return config.Config{OTLPConfig: config.DefaultOTLPConfig}
}
handler := NewOTLPWriteHandler(log, nil, appendable, cfg, OTLPOptions{ConvertDelta: true})

md := pmetric.NewMetrics()
ms := md.ResourceMetrics().AppendEmpty().ScopeMetrics().AppendEmpty().Metrics()

m := ms.AppendEmpty()
m.SetName("some.delta.total")

sum := m.SetEmptySum()
sum.SetAggregationTemporality(pmetric.AggregationTemporalityDelta)

ts := time.Date(2000, 1, 2, 3, 4, 0, 0, time.UTC)
for i := range 3 {
dp := sum.DataPoints().AppendEmpty()
dp.SetIntValue(int64(i))
dp.SetTimestamp(pcommon.NewTimestampFromTime(ts.Add(time.Duration(i) * time.Second)))
}

proto, err := pmetricotlp.NewExportRequestFromMetrics(md).MarshalProto()
require.NoError(t, err)

req, err := http.NewRequest("", "", bytes.NewReader(proto))
require.NoError(t, err)
req.Header.Set("Content-Type", "application/x-protobuf")

rec := httptest.NewRecorder()
handler.ServeHTTP(rec, req)
require.Equal(t, http.StatusOK, rec.Result().StatusCode)

ls := labels.FromStrings("__name__", "some_delta_total")
milli := func(sec int) int64 {
return time.Date(2000, 1, 2, 3, 4, sec, 0, time.UTC).UnixMilli()
}

want := []mockSample{
{t: milli(0), l: ls, v: 0}, // +0
{t: milli(1), l: ls, v: 1}, // +1
{t: milli(2), l: ls, v: 3}, // +2
}
if diff := cmp.Diff(want, appendable.samples, cmp.Exporter(func(_ reflect.Type) bool { return true })); diff != "" {
t.Fatal(diff)
}
}

func BenchmarkOTLP(b *testing.B) {
start := time.Date(2000, 1, 2, 3, 4, 5, 0, time.UTC)

type Type struct {
name string
data func(mode pmetric.AggregationTemporality, dpc, epoch int) []pmetric.Metric
}
types := []Type{{
name: "sum",
data: func() func(mode pmetric.AggregationTemporality, dpc, epoch int) []pmetric.Metric {
cumul := make(map[int]float64)
return func(mode pmetric.AggregationTemporality, dpc, epoch int) []pmetric.Metric {
m := pmetric.NewMetric()
sum := m.SetEmptySum()
sum.SetAggregationTemporality(mode)
dps := sum.DataPoints()
for id := range dpc {
dp := dps.AppendEmpty()
dp.SetStartTimestamp(pcommon.NewTimestampFromTime(start))
dp.SetTimestamp(pcommon.NewTimestampFromTime(start.Add(time.Duration(epoch) * time.Minute)))
dp.Attributes().PutStr("id", strconv.Itoa(id))
v := float64(rand.IntN(100)) / 10
switch mode {
case pmetric.AggregationTemporalityDelta:
dp.SetDoubleValue(v)
case pmetric.AggregationTemporalityCumulative:
cumul[id] += v
dp.SetDoubleValue(cumul[id])
}
}
return []pmetric.Metric{m}
}
}(),
}, {
name: "histogram",
data: func() func(mode pmetric.AggregationTemporality, dpc, epoch int) []pmetric.Metric {
bounds := [4]float64{1, 10, 100, 1000}
type state struct {
counts [4]uint64
count uint64
sum float64
}
var cumul []state
return func(mode pmetric.AggregationTemporality, dpc, epoch int) []pmetric.Metric {
if cumul == nil {
cumul = make([]state, dpc)
}
m := pmetric.NewMetric()
hist := m.SetEmptyHistogram()
hist.SetAggregationTemporality(mode)
dps := hist.DataPoints()
for id := range dpc {
dp := dps.AppendEmpty()
dp.SetStartTimestamp(pcommon.NewTimestampFromTime(start))
dp.SetTimestamp(pcommon.NewTimestampFromTime(start.Add(time.Duration(epoch) * time.Minute)))
dp.Attributes().PutStr("id", strconv.Itoa(id))
dp.ExplicitBounds().FromRaw(bounds[:])

var obs *state
switch mode {
case pmetric.AggregationTemporalityDelta:
obs = new(state)
case pmetric.AggregationTemporalityCumulative:
obs = &cumul[id]
}

for i := range obs.counts {
v := uint64(rand.IntN(10))
obs.counts[i] += v
obs.count++
obs.sum += float64(v)
}

dp.SetCount(obs.count)
dp.SetSum(obs.sum)
dp.BucketCounts().FromRaw(obs.counts[:])
}
return []pmetric.Metric{m}
}
}(),
}, {
name: "exponential",
data: func() func(mode pmetric.AggregationTemporality, dpc, epoch int) []pmetric.Metric {
type state struct {
counts [4]uint64
count uint64
sum float64
}
var cumul []state
return func(mode pmetric.AggregationTemporality, dpc, epoch int) []pmetric.Metric {
if cumul == nil {
cumul = make([]state, dpc)
}
m := pmetric.NewMetric()
ex := m.SetEmptyExponentialHistogram()
ex.SetAggregationTemporality(mode)
dps := ex.DataPoints()
for id := range dpc {
dp := dps.AppendEmpty()
dp.SetStartTimestamp(pcommon.NewTimestampFromTime(start))
dp.SetTimestamp(pcommon.NewTimestampFromTime(start.Add(time.Duration(epoch) * time.Minute)))
dp.Attributes().PutStr("id", strconv.Itoa(id))
dp.SetScale(2)

var obs *state
switch mode {
case pmetric.AggregationTemporalityDelta:
obs = new(state)
case pmetric.AggregationTemporalityCumulative:
obs = &cumul[id]
}

for i := range obs.counts {
v := uint64(rand.IntN(10))
obs.counts[i] += v
obs.count++
obs.sum += float64(v)
}

dp.Positive().BucketCounts().FromRaw(obs.counts[:])
dp.SetCount(obs.count)
dp.SetSum(obs.sum)
}

return []pmetric.Metric{m}
}
}(),
}}

modes := []struct {
name string
data func(func(pmetric.AggregationTemporality, int, int) []pmetric.Metric, int) []pmetric.Metric
}{{
name: "cumulative",
data: func(data func(pmetric.AggregationTemporality, int, int) []pmetric.Metric, epoch int) []pmetric.Metric {
return data(pmetric.AggregationTemporalityCumulative, 10, epoch)
},
}, {
name: "delta",
data: func(data func(pmetric.AggregationTemporality, int, int) []pmetric.Metric, epoch int) []pmetric.Metric {
return data(pmetric.AggregationTemporalityDelta, 10, epoch)
},
}, {
name: "mixed",
data: func(data func(pmetric.AggregationTemporality, int, int) []pmetric.Metric, epoch int) []pmetric.Metric {
cumul := data(pmetric.AggregationTemporalityCumulative, 5, epoch)
delta := data(pmetric.AggregationTemporalityDelta, 5, epoch)
out := append(cumul, delta...)
rand.Shuffle(len(out), func(i, j int) { out[i], out[j] = out[j], out[i] })
return out
},
}}

configs := []struct {
name string
opts OTLPOptions
}{
{name: "default"},
{name: "convert", opts: OTLPOptions{ConvertDelta: true}},
}

Workers := runtime.GOMAXPROCS(0)
for _, cs := range types {
for _, mode := range modes {
for _, cfg := range configs {
b.Run(fmt.Sprintf("type=%s/temporality=%s/cfg=%s", cs.name, mode.name, cfg.name), func(b *testing.B) {
if !cfg.opts.ConvertDelta && (mode.name == "delta" || mode.name == "mixed") {
b.Skip("not possible")
}

var total int

// reqs is a [b.N]*http.Request, divided across the workers.
// deltatocumulative requires timestamps to be strictly in
// order on a per-series basis. to ensure this, each reqs[k]
// contains samples of differently named series, sorted
// strictly in time order
reqs := make([][]*http.Request, Workers)
for n := range b.N {
k := n % Workers

md := pmetric.NewMetrics()
ms := md.ResourceMetrics().AppendEmpty().
ScopeMetrics().AppendEmpty().
Metrics()

for i, m := range mode.data(cs.data, n) {
m.SetName(fmt.Sprintf("benchmark_%d_%d", k, i))
m.MoveTo(ms.AppendEmpty())
}

total += sampleCount(md)

ex := pmetricotlp.NewExportRequestFromMetrics(md)
data, err := ex.MarshalProto()
require.NoError(b, err)

req, err := http.NewRequest("", "", bytes.NewReader(data))
require.NoError(b, err)
req.Header.Set("Content-Type", "application/x-protobuf")

reqs[k] = append(reqs[k], req)
}

log := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelWarn}))
mock := new(mockAppendable)
appendable := syncAppendable{Appendable: mock, lock: new(sync.Mutex)}
cfgfn := func() config.Config {
return config.Config{OTLPConfig: config.DefaultOTLPConfig}
}
handler := NewOTLPWriteHandler(log, nil, appendable, cfgfn, cfg.opts)

fail := make(chan struct{})
done := make(chan struct{})

b.ResetTimer()
b.ReportAllocs()

// we use multiple workers to mimic a real-world scenario
// where multiple OTel collectors are sending their
// time-series in parallel.
// this is necessary to exercise potential lock-contention
// in this benchmark
for k := range Workers {
go func() {
rec := httptest.NewRecorder()
for _, req := range reqs[k] {
handler.ServeHTTP(rec, req)
if rec.Result().StatusCode != http.StatusOK {
fail <- struct{}{}
return
}
}
done <- struct{}{}
}()
}

for range Workers {
select {
case <-fail:
b.FailNow()
case <-done:
}
}

require.Equal(b, total, len(mock.samples)+len(mock.histograms))
})
}
}
}
}

func sampleCount(md pmetric.Metrics) int {
var total int
rms := md.ResourceMetrics()
for i := range rms.Len() {
sms := rms.At(i).ScopeMetrics()
for i := range sms.Len() {
ms := sms.At(i).Metrics()
for i := range ms.Len() {
m := ms.At(i)
switch m.Type() {
case pmetric.MetricTypeSum:
total += m.Sum().DataPoints().Len()
case pmetric.MetricTypeGauge:
total += m.Gauge().DataPoints().Len()
case pmetric.MetricTypeHistogram:
dps := m.Histogram().DataPoints()
for i := range dps.Len() {
total += dps.At(i).BucketCounts().Len()
total++ // le=+Inf series
total++ // _sum series
total++ // _count series
}
case pmetric.MetricTypeExponentialHistogram:
total += m.ExponentialHistogram().DataPoints().Len()
case pmetric.MetricTypeSummary:
total += m.Summary().DataPoints().Len()
}
}
}
}
return total
}

type syncAppendable struct {
lock sync.Locker
storage.Appendable
}

type syncAppender struct {
lock sync.Locker
storage.Appender
}

func (s syncAppendable) Appender(ctx context.Context) storage.Appender {
return syncAppender{Appender: s.Appendable.Appender(ctx), lock: s.lock}
}

func (s syncAppender) Append(ref storage.SeriesRef, l labels.Labels, t int64, v float64) (storage.SeriesRef, error) {
s.lock.Lock()
defer s.lock.Unlock()
return s.Appender.Append(ref, l, t, v)
}

func (s syncAppender) AppendHistogram(ref storage.SeriesRef, l labels.Labels, t int64, h *histogram.Histogram, f *histogram.FloatHistogram) (storage.SeriesRef, error) {
s.lock.Lock()
defer s.lock.Unlock()
return s.Appender.AppendHistogram(ref, l, t, h, f)
}
@@ -259,7 +259,7 @@ func NewAPI(
statsRenderer StatsRenderer,
rwEnabled bool,
acceptRemoteWriteProtoMsgs []config.RemoteWriteProtoMsg,
otlpEnabled bool,
otlpEnabled, otlpDeltaToCumulative bool,
ctZeroIngestionEnabled bool,
) *API {
a := &API{

@@ -307,7 +307,7 @@ func NewAPI(
a.remoteWriteHandler = remote.NewWriteHandler(logger, registerer, ap, acceptRemoteWriteProtoMsgs, ctZeroIngestionEnabled)
}
if otlpEnabled {
a.otlpWriteHandler = remote.NewOTLPWriteHandler(logger, ap, configFunc)
a.otlpWriteHandler = remote.NewOTLPWriteHandler(logger, registerer, ap, configFunc, remote.OTLPOptions{ConvertDelta: otlpDeltaToCumulative})
}

return a
@@ -143,6 +143,7 @@ func createPrometheusAPI(t *testing.T, q storage.SampleAndChunkQueryable) *route
config.RemoteWriteProtoMsgs{config.RemoteWriteProtoMsgV1, config.RemoteWriteProtoMsgV2},
false,
false,
false,
)

promRouter := route.New().WithPrefix("/api/v1")
@@ -289,6 +289,7 @@ type Options struct {
RemoteReadBytesInFrame int
EnableRemoteWriteReceiver bool
EnableOTLPWriteReceiver bool
ConvertOTLPDelta bool
IsAgent bool
CTZeroIngestionEnabled bool
AppName string

@@ -387,6 +388,7 @@ func New(logger *slog.Logger, o *Options) *Handler {
o.EnableRemoteWriteReceiver,
o.AcceptRemoteWriteProtoMsgs,
o.EnableOTLPWriteReceiver,
o.ConvertOTLPDelta,
o.CTZeroIngestionEnabled,
)