Merge branch 'prometheus:main' into patch-exemplar_ui

Vanshika 2025-01-15 00:36:44 +05:30 committed by GitHub
commit f74d11c2fd
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
19 changed files with 683 additions and 85 deletions

@@ -195,7 +195,7 @@ jobs:
with:
args: --verbose
# Make sure to sync this with Makefile.common and scripts/golangci-lint.yml.
version: v1.62.0
version: v1.63.4
fuzzing:
uses: ./.github/workflows/fuzzing.yml
if: github.event_name == 'pull_request'

@@ -5,25 +5,28 @@ output:
sort-results: true
linters:
# Keep this list sorted alphabetically
enable:
- depguard
- errorlint
- exptostd
- gocritic
- godot
- gofumpt
- goimports
- loggercheck
- misspell
- nilnesserr
- nolintlint
- perfsprint
- predeclared
- revive
- sloglint
- testifylint
- unconvert
- unused
- usestdlibvars
- whitespace
- loggercheck
- sloglint
issues:
max-issues-per-linter: 0

@@ -61,7 +61,7 @@ PROMU_URL := https://github.com/prometheus/promu/releases/download/v$(PROMU_
SKIP_GOLANGCI_LINT :=
GOLANGCI_LINT :=
GOLANGCI_LINT_OPTS ?=
GOLANGCI_LINT_VERSION ?= v1.62.0
GOLANGCI_LINT_VERSION ?= v1.63.4
# golangci-lint only supports linux, darwin and windows platforms on i386/amd64/arm64.
# windows isn't included here because of the path separator being different.
ifeq ($(GOHOSTOS),$(filter $(GOHOSTOS),linux darwin))

@@ -275,6 +275,9 @@ func (c *flagConfig) setFeatureListOptions(logger *slog.Logger) error {
case "old-ui":
c.web.UseOldUI = true
logger.Info("Serving previous version of the Prometheus web UI.")
case "otlp-deltatocumulative":
c.web.ConvertOTLPDelta = true
logger.Info("Converting delta OTLP metrics to cumulative")
default:
logger.Warn("Unknown option for --enable-feature", "option", o)
}
@@ -516,7 +519,7 @@ func main() {
a.Flag("scrape.discovery-reload-interval", "Interval used by scrape manager to throttle target groups updates.").
Hidden().Default("5s").SetValue(&cfg.scrape.DiscoveryReloadInterval)
a.Flag("enable-feature", "Comma separated feature names to enable. Valid options: exemplar-storage, expand-external-labels, memory-snapshot-on-shutdown, promql-per-step-stats, promql-experimental-functions, extra-scrape-metrics, auto-gomaxprocs, native-histograms, created-timestamp-zero-ingestion, concurrent-rule-eval, delayed-compaction, old-ui. See https://prometheus.io/docs/prometheus/latest/feature_flags/ for more details.").
a.Flag("enable-feature", "Comma separated feature names to enable. Valid options: exemplar-storage, expand-external-labels, memory-snapshot-on-shutdown, promql-per-step-stats, promql-experimental-functions, extra-scrape-metrics, auto-gomaxprocs, native-histograms, created-timestamp-zero-ingestion, concurrent-rule-eval, delayed-compaction, old-ui, otlp-deltatocumulative. See https://prometheus.io/docs/prometheus/latest/feature_flags/ for more details.").
Default("").StringsVar(&cfg.featureList)
a.Flag("agent", "Run Prometheus in 'Agent mode'.").BoolVar(&agentMode)

@@ -60,7 +60,7 @@ The Prometheus monitoring server
| <code class="text-nowrap">--query.timeout</code> | Maximum time a query may take before being aborted. Use with server mode only. | `2m` |
| <code class="text-nowrap">--query.max-concurrency</code> | Maximum number of queries executed concurrently. Use with server mode only. | `20` |
| <code class="text-nowrap">--query.max-samples</code> | Maximum number of samples a single query can load into memory. Note that queries will fail if they try to load more samples than this into memory, so this also limits the number of samples a query can return. Use with server mode only. | `50000000` |
| <code class="text-nowrap">--enable-feature</code> <code class="text-nowrap">...</code> | Comma separated feature names to enable. Valid options: exemplar-storage, expand-external-labels, memory-snapshot-on-shutdown, promql-per-step-stats, promql-experimental-functions, extra-scrape-metrics, auto-gomaxprocs, native-histograms, created-timestamp-zero-ingestion, concurrent-rule-eval, delayed-compaction, old-ui. See https://prometheus.io/docs/prometheus/latest/feature_flags/ for more details. | |
| <code class="text-nowrap">--enable-feature</code> <code class="text-nowrap">...</code> | Comma separated feature names to enable. Valid options: exemplar-storage, expand-external-labels, memory-snapshot-on-shutdown, promql-per-step-stats, promql-experimental-functions, extra-scrape-metrics, auto-gomaxprocs, native-histograms, created-timestamp-zero-ingestion, concurrent-rule-eval, delayed-compaction, old-ui, otlp-deltatocumulative. See https://prometheus.io/docs/prometheus/latest/feature_flags/ for more details. | |
| <code class="text-nowrap">--agent</code> | Run Prometheus in 'Agent mode'. | |
| <code class="text-nowrap">--log.level</code> | Only log messages with the given severity or above. One of: [debug, info, warn, error] | `info` |
| <code class="text-nowrap">--log.format</code> | Output format of log messages. One of: [logfmt, json] | `logfmt` |

@@ -151,3 +151,25 @@ Configuration reloads are triggered by detecting changes in the checksum of the
main configuration file or any referenced files, such as rule and scrape
configurations. To ensure consistency and avoid issues during reloads, it's
recommended to update these files atomically.
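To illustrate, here is a minimal Go sketch of such an atomic update (file names and permissions are illustrative; any write-temp-then-rename scheme works):

```go
package main

import (
	"log"
	"os"
	"path/filepath"
)

// writeConfigAtomic replaces path by writing data to a temporary file in the
// same directory and renaming it over the target. rename(2) is atomic on
// POSIX filesystems, so a concurrent reload never sees a half-written file.
func writeConfigAtomic(path string, data []byte) error {
	tmp := filepath.Join(filepath.Dir(path), ".config.tmp")
	if err := os.WriteFile(tmp, data, 0o644); err != nil {
		return err
	}
	return os.Rename(tmp, path)
}

func main() {
	cfg := []byte("global:\n  scrape_interval: 15s\n")
	if err := writeConfigAtomic("prometheus.yml", cfg); err != nil {
		log.Fatal(err)
	}
}
```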
## OTLP Delta Conversion
`--enable-feature=otlp-deltatocumulative`
When enabled, Prometheus converts OTLP metrics from delta temporality to their
cumulative equivalent, instead of dropping them.
This uses the [deltatocumulative][d2c] processor from the OTel Collector, with
its default settings.
Delta conversion keeps in-memory state to aggregate delta changes per series
over time. When Prometheus restarts, this state is lost and aggregation starts
from zero again, which results in a counter reset in the cumulative series.
This state is periodically ([`max_stale`][d2c]) cleared of inactive series.
Enabling this _can_ have a negative impact on performance, because the
in-memory state is mutex-guarded. Cumulative-only OTLP requests are not affected.
[d2c]: https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/processor/deltatocumulativeprocessor
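For intuition, here is a minimal sketch of the per-series aggregation this feature performs (simplified types and keying; the real state handling, including `max_stale` expiry and locking, lives in the deltatocumulative processor):

```go
package main

import "fmt"

// deltaState accumulates delta increments into cumulative values, keyed by
// series identity. Restarting the process loses this map, which is what
// produces the counter reset described above.
type deltaState struct {
	cumulative map[string]float64
}

// observe folds one delta sample into the running total and returns the
// cumulative equivalent that would be ingested.
func (s *deltaState) observe(series string, delta float64) float64 {
	s.cumulative[series] += delta
	return s.cumulative[series]
}

func main() {
	st := &deltaState{cumulative: map[string]float64{}}
	for _, d := range []float64{0, 1, 2} {
		fmt.Println(st.observe("some_delta_total", d)) // 0, 1, 3
	}
}
```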

@@ -1424,6 +1424,15 @@ endpoint is `/api/v1/otlp/v1/metrics`.
*New in v2.47*
### OTLP Delta
Prometheus can convert incoming metrics from delta temporality to their cumulative equivalent.
This is done using [deltatocumulative](https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/processor/deltatocumulativeprocessor) from the OpenTelemetry Collector.
To enable, pass `--enable-feature=otlp-deltatocumulative`.
*New in v3.2*
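For reference, a minimal Go client that marshals an OTLP export request and posts it to this endpoint, mirroring the marshaling used in the handler tests (the server URL is illustrative):

```go
package main

import (
	"bytes"
	"fmt"
	"net/http"

	"go.opentelemetry.io/collector/pdata/pmetric"
	"go.opentelemetry.io/collector/pdata/pmetric/pmetricotlp"
)

// push serializes md as an OTLP protobuf export request and sends it to the
// Prometheus OTLP receiver.
func push(md pmetric.Metrics) error {
	body, err := pmetricotlp.NewExportRequestFromMetrics(md).MarshalProto()
	if err != nil {
		return err
	}
	resp, err := http.Post(
		"http://localhost:9090/api/v1/otlp/v1/metrics", // illustrative URL
		"application/x-protobuf",
		bytes.NewReader(body),
	)
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		return fmt.Errorf("unexpected status: %s", resp.Status)
	}
	return nil
}

func main() {
	md := pmetric.NewMetrics() // empty payload, just to exercise the endpoint
	if err := push(md); err != nil {
		fmt.Println(err)
	}
}
```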
## Notifications
The following endpoints provide information about active status notifications concerning the Prometheus server itself.

go.mod (11 changes)

@@ -48,6 +48,7 @@ require (
github.com/nsf/jsondiff v0.0.0-20230430225905-43f6cf3098c1
github.com/oklog/run v1.1.0
github.com/oklog/ulid v1.3.1
github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor v0.116.0
github.com/ovh/go-ovh v1.6.0
github.com/prometheus/alertmanager v0.27.0
github.com/prometheus/client_golang v1.20.5
@@ -60,7 +61,10 @@ require (
github.com/shurcooL/httpfs v0.0.0-20230704072500-f1e31cf0ba5c
github.com/stretchr/testify v1.10.0
github.com/vultr/govultr/v2 v2.17.2
go.opentelemetry.io/collector/component v0.116.0
go.opentelemetry.io/collector/consumer v1.22.0
go.opentelemetry.io/collector/pdata v1.22.0
go.opentelemetry.io/collector/processor v0.116.0
go.opentelemetry.io/collector/semconv v0.116.0
go.opentelemetry.io/contrib/instrumentation/net/http/httptrace/otelhttptrace v0.58.0
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.58.0
@@ -68,6 +72,7 @@ require (
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.33.0
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.33.0
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.33.0
go.opentelemetry.io/otel/metric v1.33.0
go.opentelemetry.io/otel/sdk v1.33.0
go.opentelemetry.io/otel/trace v1.33.0
go.uber.org/atomic v1.11.0
@@ -168,6 +173,8 @@ require (
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/morikuni/aec v1.0.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics v0.116.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil v0.116.0 // indirect
github.com/opencontainers/go-digest v1.0.0 // indirect
github.com/opencontainers/image-spec v1.0.2 // indirect
github.com/opencontainers/runtime-spec v1.0.2 // indirect
@@ -184,8 +191,10 @@ require (
github.com/xhit/go-str2duration/v2 v2.1.0 // indirect
go.mongodb.org/mongo-driver v1.14.0 // indirect
go.opentelemetry.io/auto/sdk v1.1.0 // indirect
go.opentelemetry.io/otel/metric v1.33.0 // indirect
go.opentelemetry.io/collector/config/configtelemetry v0.116.0 // indirect
go.opentelemetry.io/collector/pipeline v0.116.0 // indirect
go.opentelemetry.io/proto/otlp v1.4.0 // indirect
go.uber.org/zap v1.27.0 // indirect
golang.org/x/crypto v0.31.0 // indirect
golang.org/x/exp v0.0.0-20240119083558-1b970713d09a // indirect
golang.org/x/mod v0.22.0 // indirect

go.sum (54 changes)

@@ -165,6 +165,8 @@ github.com/go-resty/resty/v2 v2.16.2/go.mod h1:0fHAoK7JoBy/Ch36N8VFeMsK7xQOHhvWa
github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
github.com/go-task/slim-sprig/v3 v3.0.0 h1:sUs3vkvUymDpBKi3qH1YSqBQk9+9D/8M2mN1vB6EwHI=
github.com/go-task/slim-sprig/v3 v3.0.0/go.mod h1:W848ghGpv3Qj3dhTPRyJypKRiqCdHZiAzKg9hl15HA8=
github.com/go-viper/mapstructure/v2 v2.2.1 h1:ZAaOCxANMuZx5RCeg0mBdEZk7DZasvvZIxtHqx8aGss=
github.com/go-viper/mapstructure/v2 v2.2.1/go.mod h1:oJDH3BJKyqBA2TXFhDsKDGDTlndYOZ6rGS0BRZIxGhM=
github.com/go-zookeeper/zk v1.0.4 h1:DPzxraQx7OrPyXq2phlGlNSIyWEsAox0RJmjTseMV6I=
github.com/go-zookeeper/zk v1.0.4/go.mod h1:nOB03cncLtlp4t+UAkGSV+9beXP/akpekBwL+UX1Qcw=
github.com/godbus/dbus/v5 v5.0.4 h1:9349emZab16e7zQvpmsbtjc18ykshndd8y2PG3sgJbA=
@@ -304,6 +306,12 @@ github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/klauspost/compress v1.17.11 h1:In6xLpyWOi1+C7tXUUWv2ot1QvBjxevKAaI6IXrJmUc=
github.com/klauspost/compress v1.17.11/go.mod h1:pMDklpSncoRMuLFrf1W9Ss9KT+0rH90U12bZKk7uwG0=
github.com/knadh/koanf/maps v0.1.1 h1:G5TjmUh2D7G2YWf5SQQqSiHRJEjaicvU0KpypqB3NIs=
github.com/knadh/koanf/maps v0.1.1/go.mod h1:npD/QZY3V6ghQDdcQzl1W4ICNVTkohC8E73eI2xW4yI=
github.com/knadh/koanf/providers/confmap v0.1.0 h1:gOkxhHkemwG4LezxxN8DMOFopOPghxRVp7JbIvdvqzU=
github.com/knadh/koanf/providers/confmap v0.1.0/go.mod h1:2uLhxQzJnyHKfxG927awZC7+fyHFdQkd697K4MdLnIU=
github.com/knadh/koanf/v2 v2.1.2 h1:I2rtLRqXRy1p01m/utEtpZSSA6dcJbgGVuE27kW2PzQ=
github.com/knadh/koanf/v2 v2.1.2/go.mod h1:Gphfaen0q1Fc1HTgJgSTC4oRX9R2R5ErYMZJy8fLJBo=
github.com/kolo/xmlrpc v0.0.0-20220921171641-a4b6fa1dd06b h1:udzkj9S/zlT5X367kqJis0QP7YMxobob6zhzq6Yre00=
github.com/kolo/xmlrpc v0.0.0-20220921171641-a4b6fa1dd06b/go.mod h1:pcaDhQK0/NJZEvtCO0qQPPropqV0sJOJ6YW7X+9kRwM=
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
@@ -348,6 +356,8 @@ github.com/miekg/dns v1.1.41/go.mod h1:p6aan82bvRIyn+zDIv9xYNUpwa73JcSh9BKwknJys
github.com/miekg/dns v1.1.62 h1:cN8OuEF1/x5Rq6Np+h1epln8OiyPWV+lROx9LxcGgIQ=
github.com/miekg/dns v1.1.62/go.mod h1:mvDlcItzm+br7MToIKqkglaGhlFMHJ9DTNNWONWXbNQ=
github.com/mitchellh/cli v1.1.0/go.mod h1:xcISNoH86gajksDmfB23e/pu+B+GeFRMYmoHXxx3xhI=
github.com/mitchellh/copystructure v1.2.0 h1:vpKXTN4ewci03Vljg/q9QvCGUDttBOGBIa15WveJJGw=
github.com/mitchellh/copystructure v1.2.0/go.mod h1:qLl+cE2AmVv+CoeAwDPye/v+N2HKCj9FbZEVFJRxO9s=
github.com/mitchellh/go-homedir v1.1.0 h1:lukF9ziXFxDFPkA1vsr5zpc1XuPDn/wFntq5mG+4E0Y=
github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0=
github.com/mitchellh/go-testing-interface v1.14.1 h1:jrgshOhYAUVNMAJiKbEu7EqAwgJJ2JqpQmpLJOu07cU=
@@ -355,6 +365,8 @@ github.com/mitchellh/go-testing-interface v1.14.1/go.mod h1:gfgS7OtZj6MA4U1UrDRp
github.com/mitchellh/mapstructure v0.0.0-20160808181253-ca63d7c062ee/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y=
github.com/mitchellh/mapstructure v1.5.0 h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyuac5Z2hdY=
github.com/mitchellh/mapstructure v1.5.0/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo=
github.com/mitchellh/reflectwalk v1.0.2 h1:G2LzWKi524PWgd3mLHV8Y5k7s6XUvT0Gef6zxSIeXaQ=
github.com/mitchellh/reflectwalk v1.0.2/go.mod h1:mSTlrgnPZtwu0c4WaC2kGObEpuNDbx0jmZXqmk4esnw=
github.com/moby/docker-image-spec v1.3.1 h1:jMKff3w6PgbfSa69GfNg+zN/XLhfXJGnEx3Nl2EsFP0=
github.com/moby/docker-image-spec v1.3.1/go.mod h1:eKmb5VW8vQEh/BAr2yvVNvuiJuY6UIocYsFu/DxxRpo=
github.com/moby/term v0.0.0-20210619224110-3f7ff695adc6 h1:dcztxKSvZ4Id8iPpHERQBbIJfabdt4wUm5qy3wOL2Zc=
@@ -383,6 +395,14 @@ github.com/onsi/ginkgo/v2 v2.19.0 h1:9Cnnf7UHo57Hy3k6/m5k3dRfGTMXGvxhHFvkDTCTpvA
github.com/onsi/ginkgo/v2 v2.19.0/go.mod h1:rlwLi9PilAFJ8jCg9UE1QP6VBpd6/xj3SRC0d6TU0To=
github.com/onsi/gomega v1.19.0 h1:4ieX6qQjPP/BfC3mpsAtIGGlxTWPeA3Inl/7DtXw1tw=
github.com/onsi/gomega v1.19.0/go.mod h1:LY+I3pBVzYsTBU1AnDwOSxaYi9WoWiqgwooUqq9yPro=
github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics v0.116.0 h1:Kxk5Ral+Dc6VB9UmTketVjs+rbMZP8JxQ4SXDx4RivQ=
github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics v0.116.0/go.mod h1:ctT6oQmGmWGGGgUIKyx2fDwqz77N9+04gqKkDyAzKCg=
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest v0.116.0 h1:RlEK9MbxWyBHbLel8EJ1L7DbYVLai9dZL6Ljl2cBgyA=
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest v0.116.0/go.mod h1:AVUEyIjPb+0ARr7mhIkZkdNg3fd0ZcRhzAi53oZhl1Q=
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil v0.116.0 h1:jwnZYRBuPJnsKXE5H6ZvTEm91bXW5VP8+tLewzl54eg=
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil v0.116.0/go.mod h1:NT3Ag+DdnIAZQfD7l7OHwlYqnaAJ19SoPZ0nhD9yx4s=
github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor v0.116.0 h1:ZBmLuipJv7BT9fho/2yAFsS8AtMsCOCe4ON8oqkX3n8=
github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor v0.116.0/go.mod h1:f0GdYWGxUunyRZ088gHnoX78pc/gZc3dQlRtidiGXzg=
github.com/opencontainers/go-digest v1.0.0 h1:apOUWs51W5PlhuyGyz9FCeeBIOUDA/6nW8Oi/yOhh5U=
github.com/opencontainers/go-digest v1.0.0/go.mod h1:0JzlMkj0TRzQZfJkVvzbP0HBR3IKzErnv2BNG4W4MAM=
github.com/opencontainers/image-spec v1.0.2 h1:9yCKha/T5XdGtO0q9Q9a6T5NUCsTn/DrBg0D7ufOcFM=
@@ -490,8 +510,36 @@ go.mongodb.org/mongo-driver v1.14.0 h1:P98w8egYRjYe3XDjxhYJagTokP/H6HzlsnojRgZRd
go.mongodb.org/mongo-driver v1.14.0/go.mod h1:Vzb0Mk/pa7e6cWw85R4F/endUC3u0U9jGcNU603k65c=
go.opentelemetry.io/auto/sdk v1.1.0 h1:cH53jehLUN6UFLY71z+NDOiNJqDdPRaXzTel0sJySYA=
go.opentelemetry.io/auto/sdk v1.1.0/go.mod h1:3wSPjt5PWp2RhlCcmmOial7AvC4DQqZb7a7wCow3W8A=
go.opentelemetry.io/collector/component v0.116.0 h1:SQE1YeVfYCN7bw1n4hknUwJE5U/1qJL552sDhAdSlaA=
go.opentelemetry.io/collector/component v0.116.0/go.mod h1:MYgXFZWDTq0uPgF1mkLSFibtpNqksRVAOrmihckOQEs=
go.opentelemetry.io/collector/component/componentstatus v0.116.0 h1:wpgY0H2K9IPBzaNAvavKziK86VZ7TuNFQbS9OC4Z6Cs=
go.opentelemetry.io/collector/component/componentstatus v0.116.0/go.mod h1:ZRlVwHFMGNfcsAywEJqivOn5JzDZkpe3KZVSwMWu4tw=
go.opentelemetry.io/collector/component/componenttest v0.116.0 h1:UIcnx4Rrs/oDRYSAZNHRMUiYs2FBlwgV5Nc0oMYfR6A=
go.opentelemetry.io/collector/component/componenttest v0.116.0/go.mod h1:W40HaKPHdBFMVI7zzHE7dhdWC+CgAnAC9SmWetFBATY=
go.opentelemetry.io/collector/config/configtelemetry v0.116.0 h1:Vl49VCHQwBOeMswDpFwcl2HD8e9y94xlrfII3SR2VeQ=
go.opentelemetry.io/collector/config/configtelemetry v0.116.0/go.mod h1:SlBEwQg0qly75rXZ6W1Ig8jN25KBVBkFIIAUI1GiAAE=
go.opentelemetry.io/collector/confmap v1.22.0 h1:ZKQzRuj5lKu+seKArAAZ1yPRroDPricaIVIREm/jr3w=
go.opentelemetry.io/collector/confmap v1.22.0/go.mod h1:Rrhs+MWoaP6AswZp+ReQ2VO9dfOfcUjdjiSHBsG+nec=
go.opentelemetry.io/collector/consumer v1.22.0 h1:QmfnNizyNZFt0uK3GG/EoT5h6PvZJ0dgVTc5hFEc1l0=
go.opentelemetry.io/collector/consumer v1.22.0/go.mod h1:tiz2khNceFAPokxxfzAuFfIpShBasMT2AL2Sbc7+m0I=
go.opentelemetry.io/collector/consumer/consumertest v0.116.0 h1:pIVR7FtQMNAzfxBUSMEIC2dX5Lfo3O9ZBfx+sAwrrrM=
go.opentelemetry.io/collector/consumer/consumertest v0.116.0/go.mod h1:cV3cNDiPnls5JdhnOJJFVlclrClg9kPs04cXgYP9Gmk=
go.opentelemetry.io/collector/consumer/xconsumer v0.116.0 h1:ZrWvq7HumB0jRYmS2ztZ3hhXRNpUVBWPKMbPhsVGmZM=
go.opentelemetry.io/collector/consumer/xconsumer v0.116.0/go.mod h1:C+VFMk8vLzPun6XK8aMts6h4RaDjmzXHCPaiOxzRQzQ=
go.opentelemetry.io/collector/pdata v1.22.0 h1:3yhjL46NLdTMoP8rkkcE9B0pzjf2973crn0KKhX5UrI=
go.opentelemetry.io/collector/pdata v1.22.0/go.mod h1:nLLf6uDg8Kn5g3WNZwGyu8+kf77SwOqQvMTb5AXEbEY=
go.opentelemetry.io/collector/pdata/pprofile v0.116.0 h1:iE6lqkO7Hi6lTIIml1RI7yQ55CKqW12R2qHinwF5Zuk=
go.opentelemetry.io/collector/pdata/pprofile v0.116.0/go.mod h1:xQiPpjzIiXRFb+1fPxUy/3ygEZgo0Bu/xmLKOWu8vMQ=
go.opentelemetry.io/collector/pdata/testdata v0.116.0 h1:zmn1zpeX2BvzL6vt2dBF4OuAyFF2ml/OXcqflNgFiP0=
go.opentelemetry.io/collector/pdata/testdata v0.116.0/go.mod h1:ytWzICFN4XTDP6o65B4+Ed52JGdqgk9B8CpLHCeCpMo=
go.opentelemetry.io/collector/pipeline v0.116.0 h1:o8eKEuWEszmRpfShy7ElBoQ3Jo6kCi9ucm3yRgdNb9s=
go.opentelemetry.io/collector/pipeline v0.116.0/go.mod h1:qE3DmoB05AW0C3lmPvdxZqd/H4po84NPzd5MrqgtL74=
go.opentelemetry.io/collector/processor v0.116.0 h1:Kyu4tPzTdWNHtZjcxvI/bGNAgyv8L8Kem2r/Mk4IDAw=
go.opentelemetry.io/collector/processor v0.116.0/go.mod h1:+/Ugy48RAxlZEXmN2cw51W8t5wdHS9No+GAoP+moskk=
go.opentelemetry.io/collector/processor/processortest v0.116.0 h1:+IqNEVEE0E2MsO2g7+Y/9dz35sDuvAXRXrLts9NdXrA=
go.opentelemetry.io/collector/processor/processortest v0.116.0/go.mod h1:DLaQDBxzgeeaUO0ULMn/efos9PmHZkmYCHuxwCsiVHI=
go.opentelemetry.io/collector/processor/xprocessor v0.116.0 h1:iin/UwuWvSLB7ZNfINFUYbZ5lxIi1NjZ2brkyyFdiRA=
go.opentelemetry.io/collector/processor/xprocessor v0.116.0/go.mod h1:cnA43/XpKDbaOmd8buqKp/LGJ2l/OoCqbR//u5DMfn8=
go.opentelemetry.io/collector/semconv v0.116.0 h1:63xCZomsKJAWmKGWD3lnORiE3WKW6AO4LjnzcHzGx3Y=
go.opentelemetry.io/collector/semconv v0.116.0/go.mod h1:N6XE8Q0JKgBN2fAhkUQtqK9LT7rEGR6+Wu/Rtbal1iI=
go.opentelemetry.io/contrib/instrumentation/net/http/httptrace/otelhttptrace v0.58.0 h1:xwH3QJv6zL4u+gkPUu59NeT1Gyw9nScWT8FQpKLUJJI=
@@ -510,8 +558,8 @@ go.opentelemetry.io/otel/metric v1.33.0 h1:r+JOocAyeRVXD8lZpjdQjzMadVZp2M4WmQ+5W
go.opentelemetry.io/otel/metric v1.33.0/go.mod h1:L9+Fyctbp6HFTddIxClbQkjtubW6O9QS3Ann/M82u6M=
go.opentelemetry.io/otel/sdk v1.33.0 h1:iax7M131HuAm9QkZotNHEfstof92xM+N8sr3uHXc2IM=
go.opentelemetry.io/otel/sdk v1.33.0/go.mod h1:A1Q5oi7/9XaMlIWzPSxLRWOI8nG3FnzHJNbiENQuihM=
go.opentelemetry.io/otel/sdk/metric v1.31.0 h1:i9hxxLJF/9kkvfHppyLL55aW7iIJz4JjxTeYusH7zMc=
go.opentelemetry.io/otel/sdk/metric v1.31.0/go.mod h1:CRInTMVvNhUKgSAMbKyTMxqOBC0zgyxzW55lZzX43Y8=
go.opentelemetry.io/otel/sdk/metric v1.32.0 h1:rZvFnvmvawYb0alrYkjraqJq0Z4ZUJAiyYCU9snn1CU=
go.opentelemetry.io/otel/sdk/metric v1.32.0/go.mod h1:PWeZlq0zt9YkYAp3gjKZ0eicRYvOh1Gd+X99x6GHpCQ=
go.opentelemetry.io/otel/trace v1.33.0 h1:cCJuF7LRjUFso9LPnEAHJDB2pqzp+hbO8eu1qqW2d/s=
go.opentelemetry.io/otel/trace v1.33.0/go.mod h1:uIcdVUZMpTAmz0tI1z04GoVSezK37CbGV4fr1f2nBck=
go.opentelemetry.io/proto/otlp v1.4.0 h1:TA9WRvW6zMwP+Ssb6fLoUIuirti1gGbP28GcKG1jgeg=
@@ -524,6 +572,8 @@ go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0=
go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y=
go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8=
go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E=
golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20190923035154-9ee001bba392/go.mod h1:/lpIB1dKB+9EgE3H3cr1v9wB50oz8l4C4h62xy7jSTY=

@@ -363,17 +363,18 @@ grouping_label_list:
grouping_label : maybe_label
{
if !model.LabelName($1.Val).IsValid() {
yylex.(*parser).unexpected("grouping opts", "label")
yylex.(*parser).addParseErrf($1.PositionRange(), "invalid label name for grouping: %q", $1.Val)
}
$$ = $1
}
| STRING {
if !model.LabelName(yylex.(*parser).unquoteString($1.Val)).IsValid() {
yylex.(*parser).unexpected("grouping opts", "label")
unquoted := yylex.(*parser).unquoteString($1.Val)
if !model.LabelName(unquoted).IsValid() {
yylex.(*parser).addParseErrf($1.PositionRange(), "invalid label name for grouping: %q", unquoted)
}
$$ = $1
$$.Pos++
$$.Val = yylex.(*parser).unquoteString($$.Val)
$$.Val = unquoted
}
| error
{ yylex.(*parser).unexpected("grouping opts", "label"); $$ = Item{} }

@@ -1259,19 +1259,20 @@ yydefault:
yyDollar = yyS[yypt-1 : yypt+1]
{
if !model.LabelName(yyDollar[1].item.Val).IsValid() {
yylex.(*parser).unexpected("grouping opts", "label")
yylex.(*parser).addParseErrf(yyDollar[1].item.PositionRange(), "invalid label name for grouping: %q", yyDollar[1].item.Val)
}
yyVAL.item = yyDollar[1].item
}
case 59:
yyDollar = yyS[yypt-1 : yypt+1]
{
if !model.LabelName(yylex.(*parser).unquoteString(yyDollar[1].item.Val)).IsValid() {
yylex.(*parser).unexpected("grouping opts", "label")
unquoted := yylex.(*parser).unquoteString(yyDollar[1].item.Val)
if !model.LabelName(unquoted).IsValid() {
yylex.(*parser).addParseErrf(yyDollar[1].item.PositionRange(), "invalid label name for grouping: %q", unquoted)
}
yyVAL.item = yyDollar[1].item
yyVAL.item.Pos++
yyVAL.item.Val = yylex.(*parser).unquoteString(yyVAL.item.Val)
yyVAL.item.Val = unquoted
}
case 60:
yyDollar = yyS[yypt-1 : yypt+1]

@@ -46,15 +46,15 @@ type nopAppender struct{}
func (a nopAppender) SetOptions(opts *storage.AppendOptions) {}
func (a nopAppender) Append(storage.SeriesRef, labels.Labels, int64, float64) (storage.SeriesRef, error) {
return 0, nil
return 1, nil
}
func (a nopAppender) AppendExemplar(storage.SeriesRef, labels.Labels, exemplar.Exemplar) (storage.SeriesRef, error) {
return 0, nil
return 2, nil
}
func (a nopAppender) AppendHistogram(storage.SeriesRef, labels.Labels, int64, *histogram.Histogram, *histogram.FloatHistogram) (storage.SeriesRef, error) {
return 0, nil
return 3, nil
}
func (a nopAppender) AppendHistogramCTZeroSample(ref storage.SeriesRef, l labels.Labels, t, ct int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) {
@@ -62,11 +62,11 @@ func (a nopAppender) AppendHistogramCTZeroSample(ref storage.SeriesRef, l labels
}
func (a nopAppender) UpdateMetadata(storage.SeriesRef, labels.Labels, metadata.Metadata) (storage.SeriesRef, error) {
return 0, nil
return 4, nil
}
func (a nopAppender) AppendCTZeroSample(storage.SeriesRef, labels.Labels, int64, int64) (storage.SeriesRef, error) {
return 0, nil
return 5, nil
}
func (a nopAppender) Commit() error { return nil }

@@ -39,6 +39,7 @@ import (
prom_testutil "github.com/prometheus/client_golang/prometheus/testutil"
dto "github.com/prometheus/client_model/go"
config_util "github.com/prometheus/common/config"
"github.com/prometheus/common/expfmt"
"github.com/prometheus/common/model"
"github.com/prometheus/common/promslog"
"github.com/stretchr/testify/require"
@@ -1256,42 +1257,73 @@ func TestScrapeLoopFailLegacyUnderUTF8(t *testing.T) {
func makeTestMetrics(n int) []byte {
// Construct a metrics string to parse
sb := bytes.Buffer{}
fmt.Fprintf(&sb, "# TYPE metric_a gauge\n")
fmt.Fprintf(&sb, "# HELP metric_a help text\n")
for i := 0; i < n; i++ {
fmt.Fprintf(&sb, "# TYPE metric_a gauge\n")
fmt.Fprintf(&sb, "# HELP metric_a help text\n")
fmt.Fprintf(&sb, "metric_a{foo=\"%d\",bar=\"%d\"} 1\n", i, i*100)
}
fmt.Fprintf(&sb, "# EOF\n")
return sb.Bytes()
}
func BenchmarkScrapeLoopAppend(b *testing.B) {
ctx, sl := simpleTestScrapeLoop(b)
func promTextToProto(tb testing.TB, text []byte) []byte {
tb.Helper()
slApp := sl.appender(ctx)
metrics := makeTestMetrics(100)
ts := time.Time{}
d := expfmt.NewDecoder(bytes.NewReader(text), expfmt.TextVersion)
b.ResetTimer()
for i := 0; i < b.N; i++ {
ts = ts.Add(time.Second)
_, _, _, _ = sl.append(slApp, metrics, "text/plain", ts)
pb := &dto.MetricFamily{}
if err := d.Decode(pb); err != nil {
tb.Fatal(err)
}
o, err := proto.Marshal(pb)
if err != nil {
tb.Fatal(err)
}
buf := bytes.Buffer{}
// Write first length, then binary protobuf.
varintBuf := binary.AppendUvarint(nil, uint64(len(o)))
buf.Write(varintBuf)
buf.Write(o)
return buf.Bytes()
}
func BenchmarkScrapeLoopAppendOM(b *testing.B) {
ctx, sl := simpleTestScrapeLoop(b)
/*
export bench=scrape-loop-v1 && go test \
-run '^$' -bench '^BenchmarkScrapeLoopAppend' \
-benchtime 5s -count 6 -cpu 2 -timeout 999m \
| tee ${bench}.txt
*/
func BenchmarkScrapeLoopAppend(b *testing.B) {
metricsText := makeTestMetrics(100)
slApp := sl.appender(ctx)
metrics := makeTestMetrics(100)
ts := time.Time{}
// Create proto representation.
metricsProto := promTextToProto(b, metricsText)
b.ResetTimer()
for _, bcase := range []struct {
name string
contentType string
parsable []byte
}{
{name: "PromText", contentType: "text/plain", parsable: metricsText},
{name: "OMText", contentType: "application/openmetrics-text", parsable: metricsText},
{name: "PromProto", contentType: "application/vnd.google.protobuf", parsable: metricsProto},
} {
b.Run(fmt.Sprintf("fmt=%v", bcase.name), func(b *testing.B) {
ctx, sl := simpleTestScrapeLoop(b)
for i := 0; i < b.N; i++ {
ts = ts.Add(time.Second)
_, _, _, _ = sl.append(slApp, metrics, "application/openmetrics-text", ts)
slApp := sl.appender(ctx)
ts := time.Time{}
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
ts = ts.Add(time.Second)
_, _, _, err := sl.append(slApp, bcase.parsable, bcase.contentType, ts)
if err != nil {
b.Fatal(err)
}
}
})
}
}
@@ -2454,18 +2486,7 @@ metric: <
buf := &bytes.Buffer{}
if test.contentType == "application/vnd.google.protobuf" {
// In case of protobuf, we have to create the binary representation.
pb := &dto.MetricFamily{}
// From text to proto message.
require.NoError(t, proto.UnmarshalText(test.scrapeText, pb))
// From proto message to binary protobuf.
protoBuf, err := proto.Marshal(pb)
require.NoError(t, err)
// Write first length, then binary protobuf.
varintBuf := binary.AppendUvarint(nil, uint64(len(protoBuf)))
buf.Write(varintBuf)
buf.Write(protoBuf)
require.NoError(t, textToProto(test.scrapeText, buf))
} else {
buf.WriteString(test.scrapeText)
}
@@ -2480,6 +2501,26 @@ metric: <
}
}
func textToProto(text string, buf *bytes.Buffer) error {
// In case of protobuf, we have to create the binary representation.
pb := &dto.MetricFamily{}
// From text to proto message.
err := proto.UnmarshalText(text, pb)
if err != nil {
return err
}
// From proto message to binary protobuf.
protoBuf, err := proto.Marshal(pb)
if err != nil {
return err
}
// Write first length, then binary protobuf.
varintBuf := binary.AppendUvarint(nil, uint64(len(protoBuf)))
buf.Write(varintBuf)
buf.Write(protoBuf)
return nil
}
func TestScrapeLoopAppendExemplarSeries(t *testing.T) {
scrapeText := []string{`metric_total{n="1"} 1 # {t="1"} 1.0 10000
# EOF`, `metric_total{n="1"} 2 # {t="2"} 2.0 20000

@@ -36,4 +36,4 @@ jobs:
uses: golangci/golangci-lint-action@971e284b6050e8a5849b72094c50ab08da042db8 # v6.1.1
with:
args: --verbose
version: v1.62.0
version: v1.63.4

@@ -38,6 +38,13 @@ import (
writev2 "github.com/prometheus/prometheus/prompb/io/prometheus/write/v2"
"github.com/prometheus/prometheus/storage"
otlptranslator "github.com/prometheus/prometheus/storage/remote/otlptranslator/prometheusremotewrite"
deltatocumulative "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/processor"
"go.opentelemetry.io/otel/metric/noop"
)
type writeHandler struct {
@@ -517,56 +524,107 @@ func (h *writeHandler) handleHistogramZeroSample(app storage.Appender, ref stora
return ref, err
}
type OTLPOptions struct {
// Convert delta samples to their cumulative equivalent by aggregating in-memory
ConvertDelta bool
}
// NewOTLPWriteHandler creates a http.Handler that accepts OTLP write requests and
// writes them to the provided appendable.
func NewOTLPWriteHandler(logger *slog.Logger, appendable storage.Appendable, configFunc func() config.Config) http.Handler {
rwHandler := &writeHandler{
logger: logger,
appendable: appendable,
func NewOTLPWriteHandler(logger *slog.Logger, reg prometheus.Registerer, appendable storage.Appendable, configFunc func() config.Config, opts OTLPOptions) http.Handler {
ex := &rwExporter{
writeHandler: &writeHandler{
logger: logger,
appendable: appendable,
},
config: configFunc,
}
return &otlpWriteHandler{
logger: logger,
rwHandler: rwHandler,
configFunc: configFunc,
wh := &otlpWriteHandler{logger: logger, cumul: ex}
if opts.ConvertDelta {
fac := deltatocumulative.NewFactory()
set := processor.Settings{TelemetrySettings: component.TelemetrySettings{MeterProvider: noop.NewMeterProvider()}}
d2c, err := fac.CreateMetrics(context.Background(), set, fac.CreateDefaultConfig(), wh.cumul)
if err != nil {
// fac.CreateMetrics directly calls [deltatocumulativeprocessor.createMetricsProcessor],
// which only errors if:
// - cfg.(type) != *Config
// - telemetry.New fails due to bad set.TelemetrySettings
//
// Neither can be the case, as we pass a valid *Config and valid TelemetrySettings.
// As such, we assume this error never occurs; if it does, our assumptions are
// broken, in which case a panic seems acceptable.
panic(err)
}
if err := d2c.Start(context.Background(), nil); err != nil {
// deltatocumulative does not error on start; see above for panic reasoning.
panic(err)
}
wh.delta = d2c
}
return wh
}
type otlpWriteHandler struct {
logger *slog.Logger
rwHandler *writeHandler
configFunc func() config.Config
type rwExporter struct {
*writeHandler
config func() config.Config
}
func (h *otlpWriteHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
req, err := DecodeOTLPWriteRequest(r)
if err != nil {
h.logger.Error("Error decoding remote write request", "err", err.Error())
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
otlpCfg := h.configFunc().OTLPConfig
func (rw *rwExporter) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) error {
otlpCfg := rw.config().OTLPConfig
converter := otlptranslator.NewPrometheusConverter()
annots, err := converter.FromMetrics(r.Context(), req.Metrics(), otlptranslator.Settings{
annots, err := converter.FromMetrics(ctx, md, otlptranslator.Settings{
AddMetricSuffixes: true,
AllowUTF8: otlpCfg.TranslationStrategy == config.NoUTF8EscapingWithSuffixes,
PromoteResourceAttributes: otlpCfg.PromoteResourceAttributes,
KeepIdentifyingResourceAttributes: otlpCfg.KeepIdentifyingResourceAttributes,
})
if err != nil {
h.logger.Warn("Error translating OTLP metrics to Prometheus write request", "err", err)
rw.logger.Warn("Error translating OTLP metrics to Prometheus write request", "err", err)
}
ws, _ := annots.AsStrings("", 0, 0)
if len(ws) > 0 {
h.logger.Warn("Warnings translating OTLP metrics to Prometheus write request", "warnings", ws)
rw.logger.Warn("Warnings translating OTLP metrics to Prometheus write request", "warnings", ws)
}
err = h.rwHandler.write(r.Context(), &prompb.WriteRequest{
err = rw.write(ctx, &prompb.WriteRequest{
Timeseries: converter.TimeSeries(),
Metadata: converter.Metadata(),
})
return err
}
func (rw *rwExporter) Capabilities() consumer.Capabilities {
return consumer.Capabilities{MutatesData: false}
}
type otlpWriteHandler struct {
logger *slog.Logger
cumul consumer.Metrics // only cumulative
delta consumer.Metrics // delta capable
}
func (h *otlpWriteHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
req, err := DecodeOTLPWriteRequest(r)
if err != nil {
h.logger.Error("Error decoding OTLP write request", "err", err.Error())
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
md := req.Metrics()
// If delta conversion is enabled AND delta samples exist, use the slower delta-capable path.
if h.delta != nil && hasDelta(md) {
err = h.delta.ConsumeMetrics(r.Context(), md)
} else {
// deltatocumulative currently holds a sync.Mutex when entering ConsumeMetrics.
// This is slow and unnecessary when no delta samples exist anyway.
err = h.cumul.ConsumeMetrics(r.Context(), md)
}
switch {
case err == nil:
@@ -583,6 +641,31 @@ func (h *otlpWriteHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
}
func hasDelta(md pmetric.Metrics) bool {
for i := range md.ResourceMetrics().Len() {
sms := md.ResourceMetrics().At(i).ScopeMetrics()
for i := range sms.Len() {
ms := sms.At(i).Metrics()
for i := range ms.Len() {
temporality := pmetric.AggregationTemporalityUnspecified
m := ms.At(i)
switch ms.At(i).Type() {
case pmetric.MetricTypeSum:
temporality = m.Sum().AggregationTemporality()
case pmetric.MetricTypeExponentialHistogram:
temporality = m.ExponentialHistogram().AggregationTemporality()
case pmetric.MetricTypeHistogram:
temporality = m.Histogram().AggregationTemporality()
}
if temporality == pmetric.AggregationTemporalityDelta {
return true
}
}
}
}
return false
}
type timeLimitAppender struct {
storage.Appender

@@ -15,13 +15,23 @@ package remote
import (
"bytes"
"context"
"errors"
"fmt"
"log/slog"
"math/rand/v2"
"net/http"
"net/http/httptest"
"net/url"
"os"
"reflect"
"runtime"
"strconv"
"sync"
"testing"
"time"
"github.com/google/go-cmp/cmp"
"github.com/prometheus/client_golang/prometheus"
common_config "github.com/prometheus/common/config"
"github.com/prometheus/common/model"
@@ -31,8 +41,10 @@ import (
"go.opentelemetry.io/collector/pdata/pmetric/pmetricotlp"
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/model/histogram"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/relabel"
"github.com/prometheus/prometheus/storage"
)
func testRemoteWriteConfig() *config.RemoteWriteConfig {
@@ -379,11 +391,11 @@ func TestOTLPWriteHandler(t *testing.T) {
req.Header.Set("Content-Type", "application/x-protobuf")
appendable := &mockAppendable{}
handler := NewOTLPWriteHandler(nil, appendable, func() config.Config {
handler := NewOTLPWriteHandler(nil, nil, appendable, func() config.Config {
return config.Config{
OTLPConfig: config.DefaultOTLPConfig,
}
})
}, OTLPOptions{})
recorder := httptest.NewRecorder()
handler.ServeHTTP(recorder, req)
@@ -476,3 +488,364 @@ func generateOTLPWriteRequest() pmetricotlp.ExportRequest {
return pmetricotlp.NewExportRequestFromMetrics(d)
}
func TestOTLPDelta(t *testing.T) {
log := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelWarn}))
appendable := &mockAppendable{}
cfg := func() config.Config {
return config.Config{OTLPConfig: config.DefaultOTLPConfig}
}
handler := NewOTLPWriteHandler(log, nil, appendable, cfg, OTLPOptions{ConvertDelta: true})
md := pmetric.NewMetrics()
ms := md.ResourceMetrics().AppendEmpty().ScopeMetrics().AppendEmpty().Metrics()
m := ms.AppendEmpty()
m.SetName("some.delta.total")
sum := m.SetEmptySum()
sum.SetAggregationTemporality(pmetric.AggregationTemporalityDelta)
ts := time.Date(2000, 1, 2, 3, 4, 0, 0, time.UTC)
for i := range 3 {
dp := sum.DataPoints().AppendEmpty()
dp.SetIntValue(int64(i))
dp.SetTimestamp(pcommon.NewTimestampFromTime(ts.Add(time.Duration(i) * time.Second)))
}
proto, err := pmetricotlp.NewExportRequestFromMetrics(md).MarshalProto()
require.NoError(t, err)
req, err := http.NewRequest("", "", bytes.NewReader(proto))
require.NoError(t, err)
req.Header.Set("Content-Type", "application/x-protobuf")
rec := httptest.NewRecorder()
handler.ServeHTTP(rec, req)
require.Equal(t, http.StatusOK, rec.Result().StatusCode)
ls := labels.FromStrings("__name__", "some_delta_total")
milli := func(sec int) int64 {
return time.Date(2000, 1, 2, 3, 4, sec, 0, time.UTC).UnixMilli()
}
want := []mockSample{
{t: milli(0), l: ls, v: 0}, // +0
{t: milli(1), l: ls, v: 1}, // +1
{t: milli(2), l: ls, v: 3}, // +2
}
if diff := cmp.Diff(want, appendable.samples, cmp.Exporter(func(_ reflect.Type) bool { return true })); diff != "" {
t.Fatal(diff)
}
}
func BenchmarkOTLP(b *testing.B) {
start := time.Date(2000, 1, 2, 3, 4, 5, 0, time.UTC)
type Type struct {
name string
data func(mode pmetric.AggregationTemporality, dpc, epoch int) []pmetric.Metric
}
types := []Type{{
name: "sum",
data: func() func(mode pmetric.AggregationTemporality, dpc, epoch int) []pmetric.Metric {
cumul := make(map[int]float64)
return func(mode pmetric.AggregationTemporality, dpc, epoch int) []pmetric.Metric {
m := pmetric.NewMetric()
sum := m.SetEmptySum()
sum.SetAggregationTemporality(mode)
dps := sum.DataPoints()
for id := range dpc {
dp := dps.AppendEmpty()
dp.SetStartTimestamp(pcommon.NewTimestampFromTime(start))
dp.SetTimestamp(pcommon.NewTimestampFromTime(start.Add(time.Duration(epoch) * time.Minute)))
dp.Attributes().PutStr("id", strconv.Itoa(id))
v := float64(rand.IntN(100)) / 10
switch mode {
case pmetric.AggregationTemporalityDelta:
dp.SetDoubleValue(v)
case pmetric.AggregationTemporalityCumulative:
cumul[id] += v
dp.SetDoubleValue(cumul[id])
}
}
return []pmetric.Metric{m}
}
}(),
}, {
name: "histogram",
data: func() func(mode pmetric.AggregationTemporality, dpc, epoch int) []pmetric.Metric {
bounds := [4]float64{1, 10, 100, 1000}
type state struct {
counts [4]uint64
count uint64
sum float64
}
var cumul []state
return func(mode pmetric.AggregationTemporality, dpc, epoch int) []pmetric.Metric {
if cumul == nil {
cumul = make([]state, dpc)
}
m := pmetric.NewMetric()
hist := m.SetEmptyHistogram()
hist.SetAggregationTemporality(mode)
dps := hist.DataPoints()
for id := range dpc {
dp := dps.AppendEmpty()
dp.SetStartTimestamp(pcommon.NewTimestampFromTime(start))
dp.SetTimestamp(pcommon.NewTimestampFromTime(start.Add(time.Duration(epoch) * time.Minute)))
dp.Attributes().PutStr("id", strconv.Itoa(id))
dp.ExplicitBounds().FromRaw(bounds[:])
var obs *state
switch mode {
case pmetric.AggregationTemporalityDelta:
obs = new(state)
case pmetric.AggregationTemporalityCumulative:
obs = &cumul[id]
}
for i := range obs.counts {
v := uint64(rand.IntN(10))
obs.counts[i] += v
obs.count++
obs.sum += float64(v)
}
dp.SetCount(obs.count)
dp.SetSum(obs.sum)
dp.BucketCounts().FromRaw(obs.counts[:])
}
return []pmetric.Metric{m}
}
}(),
}, {
name: "exponential",
data: func() func(mode pmetric.AggregationTemporality, dpc, epoch int) []pmetric.Metric {
type state struct {
counts [4]uint64
count uint64
sum float64
}
var cumul []state
return func(mode pmetric.AggregationTemporality, dpc, epoch int) []pmetric.Metric {
if cumul == nil {
cumul = make([]state, dpc)
}
m := pmetric.NewMetric()
ex := m.SetEmptyExponentialHistogram()
ex.SetAggregationTemporality(mode)
dps := ex.DataPoints()
for id := range dpc {
dp := dps.AppendEmpty()
dp.SetStartTimestamp(pcommon.NewTimestampFromTime(start))
dp.SetTimestamp(pcommon.NewTimestampFromTime(start.Add(time.Duration(epoch) * time.Minute)))
dp.Attributes().PutStr("id", strconv.Itoa(id))
dp.SetScale(2)
var obs *state
switch mode {
case pmetric.AggregationTemporalityDelta:
obs = new(state)
case pmetric.AggregationTemporalityCumulative:
obs = &cumul[id]
}
for i := range obs.counts {
v := uint64(rand.IntN(10))
obs.counts[i] += v
obs.count++
obs.sum += float64(v)
}
dp.Positive().BucketCounts().FromRaw(obs.counts[:])
dp.SetCount(obs.count)
dp.SetSum(obs.sum)
}
return []pmetric.Metric{m}
}
}(),
}}
modes := []struct {
name string
data func(func(pmetric.AggregationTemporality, int, int) []pmetric.Metric, int) []pmetric.Metric
}{{
name: "cumulative",
data: func(data func(pmetric.AggregationTemporality, int, int) []pmetric.Metric, epoch int) []pmetric.Metric {
return data(pmetric.AggregationTemporalityCumulative, 10, epoch)
},
}, {
name: "delta",
data: func(data func(pmetric.AggregationTemporality, int, int) []pmetric.Metric, epoch int) []pmetric.Metric {
return data(pmetric.AggregationTemporalityDelta, 10, epoch)
},
}, {
name: "mixed",
data: func(data func(pmetric.AggregationTemporality, int, int) []pmetric.Metric, epoch int) []pmetric.Metric {
cumul := data(pmetric.AggregationTemporalityCumulative, 5, epoch)
delta := data(pmetric.AggregationTemporalityDelta, 5, epoch)
out := append(cumul, delta...)
rand.Shuffle(len(out), func(i, j int) { out[i], out[j] = out[j], out[i] })
return out
},
}}
configs := []struct {
name string
opts OTLPOptions
}{
{name: "default"},
{name: "convert", opts: OTLPOptions{ConvertDelta: true}},
}
Workers := runtime.GOMAXPROCS(0)
for _, cs := range types {
for _, mode := range modes {
for _, cfg := range configs {
b.Run(fmt.Sprintf("type=%s/temporality=%s/cfg=%s", cs.name, mode.name, cfg.name), func(b *testing.B) {
if !cfg.opts.ConvertDelta && (mode.name == "delta" || mode.name == "mixed") {
b.Skip("not possible")
}
var total int
// reqs is a [b.N]*http.Request, divided across the workers.
// deltatocumulative requires timestamps to be strictly in
// order on a per-series basis. To ensure this, each reqs[k]
// contains samples of differently named series, sorted
// strictly in time order.
reqs := make([][]*http.Request, Workers)
for n := range b.N {
k := n % Workers
md := pmetric.NewMetrics()
ms := md.ResourceMetrics().AppendEmpty().
ScopeMetrics().AppendEmpty().
Metrics()
for i, m := range mode.data(cs.data, n) {
m.SetName(fmt.Sprintf("benchmark_%d_%d", k, i))
m.MoveTo(ms.AppendEmpty())
}
total += sampleCount(md)
ex := pmetricotlp.NewExportRequestFromMetrics(md)
data, err := ex.MarshalProto()
require.NoError(b, err)
req, err := http.NewRequest("", "", bytes.NewReader(data))
require.NoError(b, err)
req.Header.Set("Content-Type", "application/x-protobuf")
reqs[k] = append(reqs[k], req)
}
log := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelWarn}))
mock := new(mockAppendable)
appendable := syncAppendable{Appendable: mock, lock: new(sync.Mutex)}
cfgfn := func() config.Config {
return config.Config{OTLPConfig: config.DefaultOTLPConfig}
}
handler := NewOTLPWriteHandler(log, nil, appendable, cfgfn, cfg.opts)
fail := make(chan struct{})
done := make(chan struct{})
b.ResetTimer()
b.ReportAllocs()
// We use multiple workers to mimic a real-world scenario
// where multiple OTel collectors are sending their
// time-series in parallel.
// This is necessary to exercise potential lock contention
// in this benchmark.
for k := range Workers {
go func() {
rec := httptest.NewRecorder()
for _, req := range reqs[k] {
handler.ServeHTTP(rec, req)
if rec.Result().StatusCode != http.StatusOK {
fail <- struct{}{}
return
}
}
done <- struct{}{}
}()
}
for range Workers {
select {
case <-fail:
b.FailNow()
case <-done:
}
}
require.Equal(b, total, len(mock.samples)+len(mock.histograms))
})
}
}
}
}
func sampleCount(md pmetric.Metrics) int {
var total int
rms := md.ResourceMetrics()
for i := range rms.Len() {
sms := rms.At(i).ScopeMetrics()
for i := range sms.Len() {
ms := sms.At(i).Metrics()
for i := range ms.Len() {
m := ms.At(i)
switch m.Type() {
case pmetric.MetricTypeSum:
total += m.Sum().DataPoints().Len()
case pmetric.MetricTypeGauge:
total += m.Gauge().DataPoints().Len()
case pmetric.MetricTypeHistogram:
dps := m.Histogram().DataPoints()
for i := range dps.Len() {
total += dps.At(i).BucketCounts().Len()
total++ // le=+Inf series
total++ // _sum series
total++ // _count series
}
case pmetric.MetricTypeExponentialHistogram:
total += m.ExponentialHistogram().DataPoints().Len()
case pmetric.MetricTypeSummary:
total += m.Summary().DataPoints().Len()
}
}
}
}
return total
}
type syncAppendable struct {
lock sync.Locker
storage.Appendable
}
type syncAppender struct {
lock sync.Locker
storage.Appender
}
func (s syncAppendable) Appender(ctx context.Context) storage.Appender {
return syncAppender{Appender: s.Appendable.Appender(ctx), lock: s.lock}
}
func (s syncAppender) Append(ref storage.SeriesRef, l labels.Labels, t int64, v float64) (storage.SeriesRef, error) {
s.lock.Lock()
defer s.lock.Unlock()
return s.Appender.Append(ref, l, t, v)
}
func (s syncAppender) AppendHistogram(ref storage.SeriesRef, l labels.Labels, t int64, h *histogram.Histogram, f *histogram.FloatHistogram) (storage.SeriesRef, error) {
s.lock.Lock()
defer s.lock.Unlock()
return s.Appender.AppendHistogram(ref, l, t, h, f)
}

@@ -259,7 +259,7 @@ func NewAPI(
statsRenderer StatsRenderer,
rwEnabled bool,
acceptRemoteWriteProtoMsgs []config.RemoteWriteProtoMsg,
otlpEnabled bool,
otlpEnabled, otlpDeltaToCumulative bool,
ctZeroIngestionEnabled bool,
) *API {
a := &API{
@@ -307,7 +307,7 @@
a.remoteWriteHandler = remote.NewWriteHandler(logger, registerer, ap, acceptRemoteWriteProtoMsgs, ctZeroIngestionEnabled)
}
if otlpEnabled {
a.otlpWriteHandler = remote.NewOTLPWriteHandler(logger, ap, configFunc)
a.otlpWriteHandler = remote.NewOTLPWriteHandler(logger, registerer, ap, configFunc, remote.OTLPOptions{ConvertDelta: otlpDeltaToCumulative})
}
return a

@@ -143,6 +143,7 @@ func createPrometheusAPI(t *testing.T, q storage.SampleAndChunkQueryable) *route
config.RemoteWriteProtoMsgs{config.RemoteWriteProtoMsgV1, config.RemoteWriteProtoMsgV2},
false,
false,
false,
)
promRouter := route.New().WithPrefix("/api/v1")

@@ -289,6 +289,7 @@ type Options struct {
RemoteReadBytesInFrame int
EnableRemoteWriteReceiver bool
EnableOTLPWriteReceiver bool
ConvertOTLPDelta bool
IsAgent bool
CTZeroIngestionEnabled bool
AppName string
@@ -387,6 +388,7 @@ func New(logger *slog.Logger, o *Options) *Handler {
o.EnableRemoteWriteReceiver,
o.AcceptRemoteWriteProtoMsgs,
o.EnableOTLPWriteReceiver,
o.ConvertOTLPDelta,
o.CTZeroIngestionEnabled,
)