Merge pull request #14200 from aknuds1/feat/promote-attributes

OTLP Translator prometheusremotewrite: Support resource attribute promotion
This commit is contained in:
Goutham Veeramachaneni 2024-07-21 13:27:23 +02:00 committed by GitHub
commit 1fa9ba838a
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
16 changed files with 290 additions and 28 deletions

View file

@ -2,6 +2,7 @@
## unreleased ## unreleased
* [FEATURE] OTLP receiver: Add new option `otlp.promote_resource_attributes`, for any OTel resource attributes that should be promoted to metric labels. #14200
* [FEATURE] Remote-Write: Add sender and receiver support for [Remote Write 2.0-rc.2](https://prometheus.io/docs/specs/remote_write_spec_2_0/) specification #14395 #14427 #14444 * [FEATURE] Remote-Write: Add sender and receiver support for [Remote Write 2.0-rc.2](https://prometheus.io/docs/specs/remote_write_spec_2_0/) specification #14395 #14427 #14444
* [ENHANCEMENT] Remote-Write: 1.x messages against Remote Write 2.x Receivers will have now correct values for `prometheus_storage_<samples|histograms|exemplar>_failed_total` in case of partial errors #14444 * [ENHANCEMENT] Remote-Write: 1.x messages against Remote Write 2.x Receivers will have now correct values for `prometheus_storage_<samples|histograms|exemplar>_failed_total` in case of partial errors #14444

View file

@ -227,6 +227,9 @@ var (
DefaultExemplarsConfig = ExemplarsConfig{ DefaultExemplarsConfig = ExemplarsConfig{
MaxExemplars: 100000, MaxExemplars: 100000,
} }
// DefaultOTLPConfig is the default OTLP configuration.
DefaultOTLPConfig = OTLPConfig{}
) )
// Config is the top-level configuration for Prometheus's config files. // Config is the top-level configuration for Prometheus's config files.
@ -242,6 +245,7 @@ type Config struct {
RemoteWriteConfigs []*RemoteWriteConfig `yaml:"remote_write,omitempty"` RemoteWriteConfigs []*RemoteWriteConfig `yaml:"remote_write,omitempty"`
RemoteReadConfigs []*RemoteReadConfig `yaml:"remote_read,omitempty"` RemoteReadConfigs []*RemoteReadConfig `yaml:"remote_read,omitempty"`
OTLPConfig OTLPConfig `yaml:"otlp,omitempty"`
} }
// SetDirectory joins any relative file paths with dir. // SetDirectory joins any relative file paths with dir.
@ -1304,3 +1308,35 @@ func getGoGCEnv() int {
} }
return DefaultRuntimeConfig.GoGC return DefaultRuntimeConfig.GoGC
} }
// OTLPConfig is the configuration for writing to the OTLP endpoint.
type OTLPConfig struct {
PromoteResourceAttributes []string `yaml:"promote_resource_attributes,omitempty"`
}
// UnmarshalYAML implements the yaml.Unmarshaler interface.
func (c *OTLPConfig) UnmarshalYAML(unmarshal func(interface{}) error) error {
*c = DefaultOTLPConfig
type plain OTLPConfig
if err := unmarshal((*plain)(c)); err != nil {
return err
}
seen := map[string]struct{}{}
var err error
for i, attr := range c.PromoteResourceAttributes {
attr = strings.TrimSpace(attr)
if attr == "" {
err = errors.Join(err, fmt.Errorf("empty promoted OTel resource attribute"))
continue
}
if _, exists := seen[attr]; exists {
err = errors.Join(err, fmt.Errorf("duplicated promoted OTel resource attribute %q", attr))
continue
}
seen[attr] = struct{}{}
c.PromoteResourceAttributes[i] = attr
}
return err
}

View file

@ -156,6 +156,12 @@ var expectedConf = &Config{
}, },
}, },
OTLPConfig: OTLPConfig{
PromoteResourceAttributes: []string{
"k8s.cluster.name", "k8s.job.name", "k8s.namespace.name",
},
},
RemoteReadConfigs: []*RemoteReadConfig{ RemoteReadConfigs: []*RemoteReadConfig{
{ {
URL: mustParseURL("http://remote1/read"), URL: mustParseURL("http://remote1/read"),
@ -1471,6 +1477,26 @@ func TestRemoteWriteRetryOnRateLimit(t *testing.T) {
require.False(t, got.RemoteWriteConfigs[1].QueueConfig.RetryOnRateLimit) require.False(t, got.RemoteWriteConfigs[1].QueueConfig.RetryOnRateLimit)
} }
func TestOTLPSanitizeResourceAttributes(t *testing.T) {
t.Run("good config", func(t *testing.T) {
want, err := LoadFile(filepath.Join("testdata", "otlp_sanitize_resource_attributes.good.yml"), false, false, log.NewNopLogger())
require.NoError(t, err)
out, err := yaml.Marshal(want)
require.NoError(t, err)
var got Config
require.NoError(t, yaml.UnmarshalStrict(out, &got))
require.Equal(t, []string{"k8s.cluster.name", "k8s.job.name", "k8s.namespace.name"}, got.OTLPConfig.PromoteResourceAttributes)
})
t.Run("bad config", func(t *testing.T) {
_, err := LoadFile(filepath.Join("testdata", "otlp_sanitize_resource_attributes.bad.yml"), false, false, log.NewNopLogger())
require.ErrorContains(t, err, `duplicated promoted OTel resource attribute "k8s.job.name"`)
require.ErrorContains(t, err, `empty promoted OTel resource attribute`)
})
}
func TestLoadConfig(t *testing.T) { func TestLoadConfig(t *testing.T) {
// Parse a valid file that sets a global scrape timeout. This tests whether parsing // Parse a valid file that sets a global scrape timeout. This tests whether parsing
// an overwritten default field in the global config permanently changes the default. // an overwritten default field in the global config permanently changes the default.

View file

@ -45,6 +45,9 @@ remote_write:
headers: headers:
name: value name: value
otlp:
promote_resource_attributes: ["k8s.cluster.name", "k8s.job.name", "k8s.namespace.name"]
remote_read: remote_read:
- url: http://remote1/read - url: http://remote1/read
read_recent: true read_recent: true

View file

@ -0,0 +1,2 @@
otlp:
promote_resource_attributes: ["k8s.cluster.name", " k8s.job.name ", "k8s.namespace.name", "k8s.job.name", ""]

View file

@ -0,0 +1,2 @@
otlp:
promote_resource_attributes: ["k8s.cluster.name", " k8s.job.name ", "k8s.namespace.name"]

View file

@ -152,6 +152,10 @@ alerting:
remote_write: remote_write:
[ - <remote_write> ... ] [ - <remote_write> ... ]
# Settings related to the OTLP receiver feature.
otlp:
[ promote_resource_attributes: [<string>, ...] | default = [ ] ]
# Settings related to the remote read feature. # Settings related to the remote read feature.
remote_read: remote_read:
[ - <remote_read> ... ] [ - <remote_read> ... ]

View file

@ -65,14 +65,14 @@ type bucketBoundsData struct {
bound float64 bound float64
} }
// byBucketBoundsData enables the usage of sort.Sort() with a slice of bucket bounds // byBucketBoundsData enables the usage of sort.Sort() with a slice of bucket bounds.
type byBucketBoundsData []bucketBoundsData type byBucketBoundsData []bucketBoundsData
func (m byBucketBoundsData) Len() int { return len(m) } func (m byBucketBoundsData) Len() int { return len(m) }
func (m byBucketBoundsData) Less(i, j int) bool { return m[i].bound < m[j].bound } func (m byBucketBoundsData) Less(i, j int) bool { return m[i].bound < m[j].bound }
func (m byBucketBoundsData) Swap(i, j int) { m[i], m[j] = m[j], m[i] } func (m byBucketBoundsData) Swap(i, j int) { m[i], m[j] = m[j], m[i] }
// ByLabelName enables the usage of sort.Sort() with a slice of labels // ByLabelName enables the usage of sort.Sort() with a slice of labels.
type ByLabelName []prompb.Label type ByLabelName []prompb.Label
func (a ByLabelName) Len() int { return len(a) } func (a ByLabelName) Len() int { return len(a) }
@ -115,14 +115,23 @@ var seps = []byte{'\xff'}
// createAttributes creates a slice of Prometheus Labels with OTLP attributes and pairs of string values. // createAttributes creates a slice of Prometheus Labels with OTLP attributes and pairs of string values.
// Unpaired string values are ignored. String pairs overwrite OTLP labels if collisions happen and // Unpaired string values are ignored. String pairs overwrite OTLP labels if collisions happen and
// if logOnOverwrite is true, the overwrite is logged. Resulting label names are sanitized. // if logOnOverwrite is true, the overwrite is logged. Resulting label names are sanitized.
func createAttributes(resource pcommon.Resource, attributes pcommon.Map, externalLabels map[string]string, // If settings.PromoteResourceAttributes is not empty, it's a set of resource attributes that should be promoted to labels.
func createAttributes(resource pcommon.Resource, attributes pcommon.Map, settings Settings,
ignoreAttrs []string, logOnOverwrite bool, extras ...string) []prompb.Label { ignoreAttrs []string, logOnOverwrite bool, extras ...string) []prompb.Label {
resourceAttrs := resource.Attributes() resourceAttrs := resource.Attributes()
serviceName, haveServiceName := resourceAttrs.Get(conventions.AttributeServiceName) serviceName, haveServiceName := resourceAttrs.Get(conventions.AttributeServiceName)
instance, haveInstanceID := resourceAttrs.Get(conventions.AttributeServiceInstanceID) instance, haveInstanceID := resourceAttrs.Get(conventions.AttributeServiceInstanceID)
promotedAttrs := make([]prompb.Label, 0, len(settings.PromoteResourceAttributes))
for _, name := range settings.PromoteResourceAttributes {
if value, exists := resourceAttrs.Get(name); exists {
promotedAttrs = append(promotedAttrs, prompb.Label{Name: name, Value: value.AsString()})
}
}
sort.Stable(ByLabelName(promotedAttrs))
// Calculate the maximum possible number of labels we could return so we can preallocate l // Calculate the maximum possible number of labels we could return so we can preallocate l
maxLabelCount := attributes.Len() + len(externalLabels) + len(extras)/2 maxLabelCount := attributes.Len() + len(settings.ExternalLabels) + len(promotedAttrs) + len(extras)/2
if haveServiceName { if haveServiceName {
maxLabelCount++ maxLabelCount++
@ -132,9 +141,6 @@ func createAttributes(resource pcommon.Resource, attributes pcommon.Map, externa
maxLabelCount++ maxLabelCount++
} }
// map ensures no duplicate label name
l := make(map[string]string, maxLabelCount)
// Ensure attributes are sorted by key for consistent merging of keys which // Ensure attributes are sorted by key for consistent merging of keys which
// collide when sanitized. // collide when sanitized.
labels := make([]prompb.Label, 0, maxLabelCount) labels := make([]prompb.Label, 0, maxLabelCount)
@ -148,6 +154,8 @@ func createAttributes(resource pcommon.Resource, attributes pcommon.Map, externa
}) })
sort.Stable(ByLabelName(labels)) sort.Stable(ByLabelName(labels))
// map ensures no duplicate label names.
l := make(map[string]string, maxLabelCount)
for _, label := range labels { for _, label := range labels {
var finalKey = prometheustranslator.NormalizeLabel(label.Name) var finalKey = prometheustranslator.NormalizeLabel(label.Name)
if existingValue, alreadyExists := l[finalKey]; alreadyExists { if existingValue, alreadyExists := l[finalKey]; alreadyExists {
@ -157,6 +165,13 @@ func createAttributes(resource pcommon.Resource, attributes pcommon.Map, externa
} }
} }
for _, lbl := range promotedAttrs {
normalized := prometheustranslator.NormalizeLabel(lbl.Name)
if _, exists := l[normalized]; !exists {
l[normalized] = lbl.Value
}
}
// Map service.name + service.namespace to job // Map service.name + service.namespace to job
if haveServiceName { if haveServiceName {
val := serviceName.AsString() val := serviceName.AsString()
@ -169,7 +184,7 @@ func createAttributes(resource pcommon.Resource, attributes pcommon.Map, externa
if haveInstanceID { if haveInstanceID {
l[model.InstanceLabel] = instance.AsString() l[model.InstanceLabel] = instance.AsString()
} }
for key, value := range externalLabels { for key, value := range settings.ExternalLabels {
// External labels have already been sanitized // External labels have already been sanitized
if _, alreadyExists := l[key]; alreadyExists { if _, alreadyExists := l[key]; alreadyExists {
// Skip external labels if they are overridden by metric attributes // Skip external labels if they are overridden by metric attributes
@ -232,7 +247,7 @@ func (c *PrometheusConverter) addHistogramDataPoints(dataPoints pmetric.Histogra
for x := 0; x < dataPoints.Len(); x++ { for x := 0; x < dataPoints.Len(); x++ {
pt := dataPoints.At(x) pt := dataPoints.At(x)
timestamp := convertTimeStamp(pt.Timestamp()) timestamp := convertTimeStamp(pt.Timestamp())
baseLabels := createAttributes(resource, pt.Attributes(), settings.ExternalLabels, nil, false) baseLabels := createAttributes(resource, pt.Attributes(), settings, nil, false)
// If the sum is unset, it indicates the _sum metric point should be // If the sum is unset, it indicates the _sum metric point should be
// omitted // omitted
@ -408,7 +423,7 @@ func (c *PrometheusConverter) addSummaryDataPoints(dataPoints pmetric.SummaryDat
for x := 0; x < dataPoints.Len(); x++ { for x := 0; x < dataPoints.Len(); x++ {
pt := dataPoints.At(x) pt := dataPoints.At(x)
timestamp := convertTimeStamp(pt.Timestamp()) timestamp := convertTimeStamp(pt.Timestamp())
baseLabels := createAttributes(resource, pt.Attributes(), settings.ExternalLabels, nil, false) baseLabels := createAttributes(resource, pt.Attributes(), settings, nil, false)
// treat sum as a sample in an individual TimeSeries // treat sum as a sample in an individual TimeSeries
sum := &prompb.Sample{ sum := &prompb.Sample{
@ -554,7 +569,8 @@ func addResourceTargetInfo(resource pcommon.Resource, settings Settings, timesta
name = settings.Namespace + "_" + name name = settings.Namespace + "_" + name
} }
labels := createAttributes(resource, attributes, settings.ExternalLabels, identifyingAttrs, false, model.MetricNameLabel, name) settings.PromoteResourceAttributes = nil
labels := createAttributes(resource, attributes, settings, identifyingAttrs, false, model.MetricNameLabel, name)
haveIdentifier := false haveIdentifier := false
for _, l := range labels { for _, l := range labels {
if l.Name == model.JobLabel || l.Name == model.InstanceLabel { if l.Name == model.JobLabel || l.Name == model.InstanceLabel {

View file

@ -0,0 +1,161 @@
// Copyright 2024 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package prometheusremotewrite
import (
"testing"
"github.com/stretchr/testify/assert"
"go.opentelemetry.io/collector/pdata/pcommon"
"github.com/prometheus/prometheus/prompb"
)
func TestCreateAttributes(t *testing.T) {
resourceAttrs := map[string]string{
"service.name": "service name",
"service.instance.id": "service ID",
"existent-attr": "resource value",
// This one is for testing conflict with metric attribute.
"metric-attr": "resource value",
// This one is for testing conflict with auto-generated job attribute.
"job": "resource value",
// This one is for testing conflict with auto-generated instance attribute.
"instance": "resource value",
}
resource := pcommon.NewResource()
for k, v := range resourceAttrs {
resource.Attributes().PutStr(k, v)
}
attrs := pcommon.NewMap()
attrs.PutStr("__name__", "test_metric")
attrs.PutStr("metric-attr", "metric value")
testCases := []struct {
name string
promoteResourceAttributes []string
expectedLabels []prompb.Label
}{
{
name: "Successful conversion without resource attribute promotion",
promoteResourceAttributes: nil,
expectedLabels: []prompb.Label{
{
Name: "__name__",
Value: "test_metric",
},
{
Name: "instance",
Value: "service ID",
},
{
Name: "job",
Value: "service name",
},
{
Name: "metric_attr",
Value: "metric value",
},
},
},
{
name: "Successful conversion with resource attribute promotion",
promoteResourceAttributes: []string{"non-existent-attr", "existent-attr"},
expectedLabels: []prompb.Label{
{
Name: "__name__",
Value: "test_metric",
},
{
Name: "instance",
Value: "service ID",
},
{
Name: "job",
Value: "service name",
},
{
Name: "metric_attr",
Value: "metric value",
},
{
Name: "existent_attr",
Value: "resource value",
},
},
},
{
name: "Successful conversion with resource attribute promotion, conflicting resource attributes are ignored",
promoteResourceAttributes: []string{"non-existent-attr", "existent-attr", "metric-attr", "job", "instance"},
expectedLabels: []prompb.Label{
{
Name: "__name__",
Value: "test_metric",
},
{
Name: "instance",
Value: "service ID",
},
{
Name: "job",
Value: "service name",
},
{
Name: "existent_attr",
Value: "resource value",
},
{
Name: "metric_attr",
Value: "metric value",
},
},
},
{
name: "Successful conversion with resource attribute promotion, attributes are only promoted once",
promoteResourceAttributes: []string{"existent-attr", "existent-attr"},
expectedLabels: []prompb.Label{
{
Name: "__name__",
Value: "test_metric",
},
{
Name: "instance",
Value: "service ID",
},
{
Name: "job",
Value: "service name",
},
{
Name: "existent_attr",
Value: "resource value",
},
{
Name: "metric_attr",
Value: "metric value",
},
},
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
settings := Settings{
PromoteResourceAttributes: tc.promoteResourceAttributes,
}
lbls := createAttributes(resource, attrs, settings, nil, false)
assert.ElementsMatch(t, lbls, tc.expectedLabels)
})
}
}

View file

@ -45,7 +45,7 @@ func (c *PrometheusConverter) addExponentialHistogramDataPoints(dataPoints pmetr
lbls := createAttributes( lbls := createAttributes(
resource, resource,
pt.Attributes(), pt.Attributes(),
settings.ExternalLabels, settings,
nil, nil,
true, true,
model.MetricNameLabel, model.MetricNameLabel,

View file

@ -36,6 +36,7 @@ type Settings struct {
ExportCreatedMetric bool ExportCreatedMetric bool
AddMetricSuffixes bool AddMetricSuffixes bool
SendMetadata bool SendMetadata bool
PromoteResourceAttributes []string
} }
// PrometheusConverter converts from OTel write format to Prometheus remote write format. // PrometheusConverter converts from OTel write format to Prometheus remote write format.

View file

@ -34,7 +34,7 @@ func (c *PrometheusConverter) addGaugeNumberDataPoints(dataPoints pmetric.Number
labels := createAttributes( labels := createAttributes(
resource, resource,
pt.Attributes(), pt.Attributes(),
settings.ExternalLabels, settings,
nil, nil,
true, true,
model.MetricNameLabel, model.MetricNameLabel,
@ -64,7 +64,7 @@ func (c *PrometheusConverter) addSumNumberDataPoints(dataPoints pmetric.NumberDa
lbls := createAttributes( lbls := createAttributes(
resource, resource,
pt.Attributes(), pt.Attributes(),
settings.ExternalLabels, settings,
nil, nil,
true, true,
model.MetricNameLabel, model.MetricNameLabel,

View file

@ -472,7 +472,7 @@ func (h *writeHandler) appendV2(app storage.Appender, req *writev2.Request, rs *
// NewOTLPWriteHandler creates a http.Handler that accepts OTLP write requests and // NewOTLPWriteHandler creates a http.Handler that accepts OTLP write requests and
// writes them to the provided appendable. // writes them to the provided appendable.
func NewOTLPWriteHandler(logger log.Logger, appendable storage.Appendable) http.Handler { func NewOTLPWriteHandler(logger log.Logger, appendable storage.Appendable, configFunc func() config.Config) http.Handler {
rwHandler := &writeHandler{ rwHandler := &writeHandler{
logger: logger, logger: logger,
appendable: appendable, appendable: appendable,
@ -481,12 +481,14 @@ func NewOTLPWriteHandler(logger log.Logger, appendable storage.Appendable) http.
return &otlpWriteHandler{ return &otlpWriteHandler{
logger: logger, logger: logger,
rwHandler: rwHandler, rwHandler: rwHandler,
configFunc: configFunc,
} }
} }
type otlpWriteHandler struct { type otlpWriteHandler struct {
logger log.Logger logger log.Logger
rwHandler *writeHandler rwHandler *writeHandler
configFunc func() config.Config
} }
func (h *otlpWriteHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { func (h *otlpWriteHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
@ -497,9 +499,12 @@ func (h *otlpWriteHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
return return
} }
otlpCfg := h.configFunc().OTLPConfig
converter := otlptranslator.NewPrometheusConverter() converter := otlptranslator.NewPrometheusConverter()
if err := converter.FromMetrics(req.Metrics(), otlptranslator.Settings{ if err := converter.FromMetrics(req.Metrics(), otlptranslator.Settings{
AddMetricSuffixes: true, AddMetricSuffixes: true,
PromoteResourceAttributes: otlpCfg.PromoteResourceAttributes,
}); err != nil { }); err != nil {
level.Warn(h.logger).Log("msg", "Error translating OTLP metrics to Prometheus write request", "err", err) level.Warn(h.logger).Log("msg", "Error translating OTLP metrics to Prometheus write request", "err", err)
} }

View file

@ -379,7 +379,11 @@ func TestOTLPWriteHandler(t *testing.T) {
req.Header.Set("Content-Type", "application/x-protobuf") req.Header.Set("Content-Type", "application/x-protobuf")
appendable := &mockAppendable{} appendable := &mockAppendable{}
handler := NewOTLPWriteHandler(nil, appendable) handler := NewOTLPWriteHandler(nil, appendable, func() config.Config {
return config.Config{
OTLPConfig: config.DefaultOTLPConfig,
}
})
recorder := httptest.NewRecorder() recorder := httptest.NewRecorder()
handler.ServeHTTP(recorder, req) handler.ServeHTTP(recorder, req)

View file

@ -295,7 +295,7 @@ func NewAPI(
a.remoteWriteHandler = remote.NewWriteHandler(logger, registerer, ap, acceptRemoteWriteProtoMsgs) a.remoteWriteHandler = remote.NewWriteHandler(logger, registerer, ap, acceptRemoteWriteProtoMsgs)
} }
if otlpEnabled { if otlpEnabled {
a.otlpWriteHandler = remote.NewOTLPWriteHandler(logger, ap) a.otlpWriteHandler = remote.NewOTLPWriteHandler(logger, ap, configFunc)
} }
return a return a

View file

@ -359,6 +359,7 @@ var samplePrometheusCfg = config.Config{
ScrapeConfigs: []*config.ScrapeConfig{}, ScrapeConfigs: []*config.ScrapeConfig{},
RemoteWriteConfigs: []*config.RemoteWriteConfig{}, RemoteWriteConfigs: []*config.RemoteWriteConfig{},
RemoteReadConfigs: []*config.RemoteReadConfig{}, RemoteReadConfigs: []*config.RemoteReadConfig{},
OTLPConfig: config.OTLPConfig{},
} }
var sampleFlagMap = map[string]string{ var sampleFlagMap = map[string]string{