prometheusremotewrite: Support resource attribute promotion

Signed-off-by: Arve Knudsen <arve.knudsen@gmail.com>
This commit is contained in:
Arve Knudsen 2024-06-03 18:02:26 +02:00
parent e892483768
commit a25b626792
10 changed files with 75 additions and 28 deletions

View file

@ -227,6 +227,9 @@ var (
DefaultExemplarsConfig = ExemplarsConfig{ DefaultExemplarsConfig = ExemplarsConfig{
MaxExemplars: 100000, MaxExemplars: 100000,
} }
// DefaultOTLPConfig is the default OTLP configuration.
DefaultOTLPConfig = OTLPConfig{}
) )
// Config is the top-level configuration for Prometheus's config files. // Config is the top-level configuration for Prometheus's config files.
@ -242,6 +245,7 @@ type Config struct {
RemoteWriteConfigs []*RemoteWriteConfig `yaml:"remote_write,omitempty"` RemoteWriteConfigs []*RemoteWriteConfig `yaml:"remote_write,omitempty"`
RemoteReadConfigs []*RemoteReadConfig `yaml:"remote_read,omitempty"` RemoteReadConfigs []*RemoteReadConfig `yaml:"remote_read,omitempty"`
OTLPConfig OTLPConfig `yaml:"otlp,omitempty"`
} }
// SetDirectory joins any relative file paths with dir. // SetDirectory joins any relative file paths with dir.
@ -1304,3 +1308,15 @@ func getGoGCEnv() int {
} }
return DefaultRuntimeConfig.GoGC return DefaultRuntimeConfig.GoGC
} }
// OTLPConfig is the configuration for writing to the OTLP endpoint.
type OTLPConfig struct {
PromoteResourceAttributes []string `yaml:"promote_resource_attributes,omitempty"`
}
// UnmarshalYAML implements the yaml.Unmarshaler interface.
func (c *OTLPConfig) UnmarshalYAML(unmarshal func(interface{}) error) error {
*c = DefaultOTLPConfig
type plain OTLPConfig
return unmarshal((*plain)(c))
}

View file

@ -152,6 +152,10 @@ alerting:
remote_write: remote_write:
[ - <remote_write> ... ] [ - <remote_write> ... ]
# Settings related to the OTLP receiver feature.
otlp:
[ promote_resource_attributes: [<string>, ...] | default = [ ] ]
# Settings related to the remote read feature. # Settings related to the remote read feature.
remote_read: remote_read:
[ - <remote_read> ... ] [ - <remote_read> ... ]

View file

@ -65,14 +65,14 @@ type bucketBoundsData struct {
bound float64 bound float64
} }
// byBucketBoundsData enables the usage of sort.Sort() with a slice of bucket bounds // byBucketBoundsData enables the usage of sort.Sort() with a slice of bucket bounds.
type byBucketBoundsData []bucketBoundsData type byBucketBoundsData []bucketBoundsData
func (m byBucketBoundsData) Len() int { return len(m) } func (m byBucketBoundsData) Len() int { return len(m) }
func (m byBucketBoundsData) Less(i, j int) bool { return m[i].bound < m[j].bound } func (m byBucketBoundsData) Less(i, j int) bool { return m[i].bound < m[j].bound }
func (m byBucketBoundsData) Swap(i, j int) { m[i], m[j] = m[j], m[i] } func (m byBucketBoundsData) Swap(i, j int) { m[i], m[j] = m[j], m[i] }
// ByLabelName enables the usage of sort.Sort() with a slice of labels // ByLabelName enables the usage of sort.Sort() with a slice of labels.
type ByLabelName []prompb.Label type ByLabelName []prompb.Label
func (a ByLabelName) Len() int { return len(a) } func (a ByLabelName) Len() int { return len(a) }
@ -115,14 +115,23 @@ var seps = []byte{'\xff'}
// createAttributes creates a slice of Prometheus Labels with OTLP attributes and pairs of string values. // createAttributes creates a slice of Prometheus Labels with OTLP attributes and pairs of string values.
// Unpaired string values are ignored. String pairs overwrite OTLP labels if collisions happen and // Unpaired string values are ignored. String pairs overwrite OTLP labels if collisions happen and
// if logOnOverwrite is true, the overwrite is logged. Resulting label names are sanitized. // if logOnOverwrite is true, the overwrite is logged. Resulting label names are sanitized.
func createAttributes(resource pcommon.Resource, attributes pcommon.Map, externalLabels map[string]string, // If settings.PromoteResourceAttributes is not empty, it's a set of resource attributes that should be promoted to labels.
func createAttributes(resource pcommon.Resource, attributes pcommon.Map, settings Settings,
ignoreAttrs []string, logOnOverwrite bool, extras ...string) []prompb.Label { ignoreAttrs []string, logOnOverwrite bool, extras ...string) []prompb.Label {
resourceAttrs := resource.Attributes() resourceAttrs := resource.Attributes()
serviceName, haveServiceName := resourceAttrs.Get(conventions.AttributeServiceName) serviceName, haveServiceName := resourceAttrs.Get(conventions.AttributeServiceName)
instance, haveInstanceID := resourceAttrs.Get(conventions.AttributeServiceInstanceID) instance, haveInstanceID := resourceAttrs.Get(conventions.AttributeServiceInstanceID)
promotedAttrs := make([]prompb.Label, 0, len(settings.PromoteResourceAttributes))
for _, name := range settings.PromoteResourceAttributes {
if value, exists := resourceAttrs.Get(name); exists {
promotedAttrs = append(promotedAttrs, prompb.Label{Name: name, Value: value.AsString()})
}
}
sort.Stable(ByLabelName(promotedAttrs))
// Calculate the maximum possible number of labels we could return so we can preallocate l // Calculate the maximum possible number of labels we could return so we can preallocate l
maxLabelCount := attributes.Len() + len(externalLabels) + len(extras)/2 maxLabelCount := attributes.Len() + len(settings.ExternalLabels) + len(promotedAttrs) + len(extras)/2
if haveServiceName { if haveServiceName {
maxLabelCount++ maxLabelCount++
@ -132,9 +141,6 @@ func createAttributes(resource pcommon.Resource, attributes pcommon.Map, externa
maxLabelCount++ maxLabelCount++
} }
// map ensures no duplicate label name
l := make(map[string]string, maxLabelCount)
// Ensure attributes are sorted by key for consistent merging of keys which // Ensure attributes are sorted by key for consistent merging of keys which
// collide when sanitized. // collide when sanitized.
labels := make([]prompb.Label, 0, maxLabelCount) labels := make([]prompb.Label, 0, maxLabelCount)
@ -148,6 +154,8 @@ func createAttributes(resource pcommon.Resource, attributes pcommon.Map, externa
}) })
sort.Stable(ByLabelName(labels)) sort.Stable(ByLabelName(labels))
// map ensures no duplicate label names.
l := make(map[string]string, maxLabelCount)
for _, label := range labels { for _, label := range labels {
var finalKey = prometheustranslator.NormalizeLabel(label.Name) var finalKey = prometheustranslator.NormalizeLabel(label.Name)
if existingValue, alreadyExists := l[finalKey]; alreadyExists { if existingValue, alreadyExists := l[finalKey]; alreadyExists {
@ -157,6 +165,13 @@ func createAttributes(resource pcommon.Resource, attributes pcommon.Map, externa
} }
} }
for _, lbl := range promotedAttrs {
normalized := prometheustranslator.NormalizeLabel(lbl.Name)
if _, exists := l[normalized]; !exists {
l[normalized] = lbl.Value
}
}
// Map service.name + service.namespace to job // Map service.name + service.namespace to job
if haveServiceName { if haveServiceName {
val := serviceName.AsString() val := serviceName.AsString()
@ -169,7 +184,7 @@ func createAttributes(resource pcommon.Resource, attributes pcommon.Map, externa
if haveInstanceID { if haveInstanceID {
l[model.InstanceLabel] = instance.AsString() l[model.InstanceLabel] = instance.AsString()
} }
for key, value := range externalLabels { for key, value := range settings.ExternalLabels {
// External labels have already been sanitized // External labels have already been sanitized
if _, alreadyExists := l[key]; alreadyExists { if _, alreadyExists := l[key]; alreadyExists {
// Skip external labels if they are overridden by metric attributes // Skip external labels if they are overridden by metric attributes
@ -232,7 +247,7 @@ func (c *PrometheusConverter) addHistogramDataPoints(dataPoints pmetric.Histogra
for x := 0; x < dataPoints.Len(); x++ { for x := 0; x < dataPoints.Len(); x++ {
pt := dataPoints.At(x) pt := dataPoints.At(x)
timestamp := convertTimeStamp(pt.Timestamp()) timestamp := convertTimeStamp(pt.Timestamp())
baseLabels := createAttributes(resource, pt.Attributes(), settings.ExternalLabels, nil, false) baseLabels := createAttributes(resource, pt.Attributes(), settings, nil, false)
// If the sum is unset, it indicates the _sum metric point should be // If the sum is unset, it indicates the _sum metric point should be
// omitted // omitted
@ -408,7 +423,7 @@ func (c *PrometheusConverter) addSummaryDataPoints(dataPoints pmetric.SummaryDat
for x := 0; x < dataPoints.Len(); x++ { for x := 0; x < dataPoints.Len(); x++ {
pt := dataPoints.At(x) pt := dataPoints.At(x)
timestamp := convertTimeStamp(pt.Timestamp()) timestamp := convertTimeStamp(pt.Timestamp())
baseLabels := createAttributes(resource, pt.Attributes(), settings.ExternalLabels, nil, false) baseLabels := createAttributes(resource, pt.Attributes(), settings, nil, false)
// treat sum as a sample in an individual TimeSeries // treat sum as a sample in an individual TimeSeries
sum := &prompb.Sample{ sum := &prompb.Sample{
@ -554,7 +569,8 @@ func addResourceTargetInfo(resource pcommon.Resource, settings Settings, timesta
name = settings.Namespace + "_" + name name = settings.Namespace + "_" + name
} }
labels := createAttributes(resource, attributes, settings.ExternalLabels, identifyingAttrs, false, model.MetricNameLabel, name) settings.PromoteResourceAttributes = nil
labels := createAttributes(resource, attributes, settings, identifyingAttrs, false, model.MetricNameLabel, name)
haveIdentifier := false haveIdentifier := false
for _, l := range labels { for _, l := range labels {
if l.Name == model.JobLabel || l.Name == model.InstanceLabel { if l.Name == model.JobLabel || l.Name == model.InstanceLabel {

View file

@ -45,7 +45,7 @@ func (c *PrometheusConverter) addExponentialHistogramDataPoints(dataPoints pmetr
lbls := createAttributes( lbls := createAttributes(
resource, resource,
pt.Attributes(), pt.Attributes(),
settings.ExternalLabels, settings,
nil, nil,
true, true,
model.MetricNameLabel, model.MetricNameLabel,

View file

@ -30,12 +30,13 @@ import (
) )
type Settings struct { type Settings struct {
Namespace string Namespace string
ExternalLabels map[string]string ExternalLabels map[string]string
DisableTargetInfo bool DisableTargetInfo bool
ExportCreatedMetric bool ExportCreatedMetric bool
AddMetricSuffixes bool AddMetricSuffixes bool
SendMetadata bool SendMetadata bool
PromoteResourceAttributes []string
} }
// PrometheusConverter converts from OTel write format to Prometheus remote write format. // PrometheusConverter converts from OTel write format to Prometheus remote write format.

View file

@ -34,7 +34,7 @@ func (c *PrometheusConverter) addGaugeNumberDataPoints(dataPoints pmetric.Number
labels := createAttributes( labels := createAttributes(
resource, resource,
pt.Attributes(), pt.Attributes(),
settings.ExternalLabels, settings,
nil, nil,
true, true,
model.MetricNameLabel, model.MetricNameLabel,
@ -64,7 +64,7 @@ func (c *PrometheusConverter) addSumNumberDataPoints(dataPoints pmetric.NumberDa
lbls := createAttributes( lbls := createAttributes(
resource, resource,
pt.Attributes(), pt.Attributes(),
settings.ExternalLabels, settings,
nil, nil,
true, true,
model.MetricNameLabel, model.MetricNameLabel,

View file

@ -490,21 +490,23 @@ func (h *writeHandler) appendV2(app storage.Appender, req *writev2.Request, rs *
// NewOTLPWriteHandler creates a http.Handler that accepts OTLP write requests and // NewOTLPWriteHandler creates a http.Handler that accepts OTLP write requests and
// writes them to the provided appendable. // writes them to the provided appendable.
func NewOTLPWriteHandler(logger log.Logger, appendable storage.Appendable) http.Handler { func NewOTLPWriteHandler(logger log.Logger, appendable storage.Appendable, configFunc func() config.Config) http.Handler {
rwHandler := &writeHandler{ rwHandler := &writeHandler{
logger: logger, logger: logger,
appendable: appendable, appendable: appendable,
} }
return &otlpWriteHandler{ return &otlpWriteHandler{
logger: logger, logger: logger,
rwHandler: rwHandler, rwHandler: rwHandler,
configFunc: configFunc,
} }
} }
type otlpWriteHandler struct { type otlpWriteHandler struct {
logger log.Logger logger log.Logger
rwHandler *writeHandler rwHandler *writeHandler
configFunc func() config.Config
} }
func (h *otlpWriteHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { func (h *otlpWriteHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
@ -515,9 +517,12 @@ func (h *otlpWriteHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
return return
} }
otlpCfg := h.configFunc().OTLPConfig
converter := otlptranslator.NewPrometheusConverter() converter := otlptranslator.NewPrometheusConverter()
if err := converter.FromMetrics(req.Metrics(), otlptranslator.Settings{ if err := converter.FromMetrics(req.Metrics(), otlptranslator.Settings{
AddMetricSuffixes: true, AddMetricSuffixes: true,
PromoteResourceAttributes: otlpCfg.PromoteResourceAttributes,
}); err != nil { }); err != nil {
level.Warn(h.logger).Log("msg", "Error translating OTLP metrics to Prometheus write request", "err", err) level.Warn(h.logger).Log("msg", "Error translating OTLP metrics to Prometheus write request", "err", err)
} }

View file

@ -379,7 +379,11 @@ func TestOTLPWriteHandler(t *testing.T) {
req.Header.Set("Content-Type", "application/x-protobuf") req.Header.Set("Content-Type", "application/x-protobuf")
appendable := &mockAppendable{} appendable := &mockAppendable{}
handler := NewOTLPWriteHandler(nil, appendable) handler := NewOTLPWriteHandler(nil, appendable, func() config.Config {
return config.Config{
OTLPConfig: config.DefaultOTLPConfig,
}
})
recorder := httptest.NewRecorder() recorder := httptest.NewRecorder()
handler.ServeHTTP(recorder, req) handler.ServeHTTP(recorder, req)

View file

@ -295,7 +295,7 @@ func NewAPI(
a.remoteWriteHandler = remote.NewWriteHandler(logger, registerer, ap, acceptRemoteWriteProtoMsgs) a.remoteWriteHandler = remote.NewWriteHandler(logger, registerer, ap, acceptRemoteWriteProtoMsgs)
} }
if otlpEnabled { if otlpEnabled {
a.otlpWriteHandler = remote.NewOTLPWriteHandler(logger, ap) a.otlpWriteHandler = remote.NewOTLPWriteHandler(logger, ap, configFunc)
} }
return a return a

View file

@ -359,6 +359,7 @@ var samplePrometheusCfg = config.Config{
ScrapeConfigs: []*config.ScrapeConfig{}, ScrapeConfigs: []*config.ScrapeConfig{},
RemoteWriteConfigs: []*config.RemoteWriteConfig{}, RemoteWriteConfigs: []*config.RemoteWriteConfig{},
RemoteReadConfigs: []*config.RemoteReadConfig{}, RemoteReadConfigs: []*config.RemoteReadConfig{},
OTLPConfig: config.OTLPConfig{},
} }
var sampleFlagMap = map[string]string{ var sampleFlagMap = map[string]string{