This commit is contained in:
Junang Li 2024-07-21 23:01:26 -04:00
parent 1fa9ba838a
commit 47cfc39545
23 changed files with 24964 additions and 12417 deletions

1
go.mod
View file

@ -177,6 +177,7 @@ require (
github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58 // indirect
github.com/pkg/browser v0.0.0-20240102092130-5ac0b6a4141c // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/planetscale/vtprotobuf v0.6.0 // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
github.com/prometheus/procfs v0.15.1 // indirect
github.com/sirupsen/logrus v1.9.3 // indirect

2
go.sum
View file

@ -592,6 +592,8 @@ github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINE
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/profile v1.2.1/go.mod h1:hJw3o1OdXxsrSjjVksARp5W95eeEaEfptyVZyv6JUPA=
github.com/planetscale/vtprotobuf v0.6.0 h1:nBeETjudeJ5ZgBHUz1fVHvbqUKnYOXNhsIEabROxmNA=
github.com/planetscale/vtprotobuf v0.6.0/go.mod h1:t/avpk3KcrXxUnYOhZhMXJlSEyie6gQbtLq5NM3loB8=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U=
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=

View file

@ -23,9 +23,9 @@ import (
"strings"
"unicode/utf8"
"github.com/gogo/protobuf/proto"
"github.com/gogo/protobuf/types"
"github.com/golang/protobuf/proto"
"github.com/prometheus/common/model"
"google.golang.org/protobuf/types/known/timestamppb"
"github.com/prometheus/prometheus/model/exemplar"
"github.com/prometheus/prometheus/model/histogram"
@ -382,7 +382,7 @@ func (p *ProtobufParser) Exemplar(ex *exemplar.Exemplar) bool {
// CreatedTimestamp returns CT or nil if CT is not present or
// invalid (as timestamp e.g. negative value) on counters, summaries or histograms.
func (p *ProtobufParser) CreatedTimestamp() *int64 {
var ct *types.Timestamp
var ct *timestamppb.Timestamp
switch p.mf.GetType() {
case dto.MetricType_COUNTER:
ct = p.mf.GetMetric()[p.metricPos].GetCounter().GetCreatedTimestamp()
@ -392,11 +392,7 @@ func (p *ProtobufParser) CreatedTimestamp() *int64 {
ct = p.mf.GetMetric()[p.metricPos].GetHistogram().GetCreatedTimestamp()
default:
}
ctAsTime, err := types.TimestampFromProto(ct)
if err != nil {
// Errors means ct == nil or invalid timestamp, which we silently ignore.
return nil
}
ctAsTime := ct.AsTime()
ctMilis := ctAsTime.UnixMilli()
return &ctMilis
}
@ -599,7 +595,7 @@ func readDelimited(b []byte, mf *dto.MetricFamily) (n int, err error) {
return 0, fmt.Errorf("protobufparse: insufficient length of buffer, expected at least %d bytes, got %d bytes", totalLength, len(b))
}
mf.Reset()
return totalLength, mf.Unmarshal(b[varIntLength:totalLength])
return totalLength, mf.UnmarshalVT(b[varIntLength:totalLength])
}
// formatOpenMetricsFloat works like the usual Go string formatting of a fleat

18
prompb/buf.gen.yaml Normal file
View file

@ -0,0 +1,18 @@
version: v1
managed:
enabled: true
go_package_prefix:
# <module_name> : name in go.mod
# <relative_path> : where generated code should be output
default: github.com/prometheus/prometheus/prompb
except:
- buf.build/googleapis/googleapis
plugins:
- plugin: buf.build/protocolbuffers/go
out: .
opt:
- paths=source_relative
- plugin: buf.build/community/planetscale-vtprotobuf:v0.6.0
out: .
opt:
- paths=source_relative

View file

@ -35,7 +35,7 @@ func (m ChunkedSeries) ToLabels(b *labels.ScratchBuilder, _ []string) labels.Lab
return labelProtosToLabels(b, m.GetLabels())
}
func labelProtosToLabels(b *labels.ScratchBuilder, labelPairs []Label) labels.Labels {
func labelProtosToLabels(b *labels.ScratchBuilder, labelPairs []*Label) labels.Labels {
b.Reset()
for _, l := range labelPairs {
b.Add(l.Name, l.Value)
@ -126,7 +126,7 @@ func (h Histogram) ToFloatHistogram() *histogram.FloatHistogram {
}
}
func spansProtoToSpans(s []BucketSpan) []histogram.Span {
func spansProtoToSpans(s []*BucketSpan) []histogram.Span {
spans := make([]histogram.Span, len(s))
for i := 0; i < len(s); i++ {
spans[i] = histogram.Span{Offset: s[i].Offset, Length: s[i].Length}
@ -179,10 +179,10 @@ func FromFloatHistogram(timestamp int64, fh *histogram.FloatHistogram) Histogram
}
}
func spansToSpansProto(s []histogram.Span) []BucketSpan {
spans := make([]BucketSpan, len(s))
func spansToSpansProto(s []histogram.Span) []*BucketSpan {
spans := make([]*BucketSpan, len(s))
for i := 0; i < len(s); i++ {
spans[i] = BucketSpan{Offset: s[i].Offset, Length: s[i].Length}
spans[i] = &BucketSpan{Offset: s[i].Offset, Length: s[i].Length}
}
return spans

View file

@ -18,14 +18,14 @@ import (
)
func (r *ChunkedReadResponse) PooledMarshal(p *sync.Pool) ([]byte, error) {
size := r.Size()
size := r.SizeVT()
data, ok := p.Get().(*[]byte)
if ok && cap(*data) >= size {
n, err := r.MarshalToSizedBuffer((*data)[:size])
n, err := r.MarshalToVT((*data)[:size])
if err != nil {
return nil, err
}
return (*data)[:n], nil
}
return r.Marshal()
return r.MarshalVT()
}

File diff suppressed because it is too large Load diff

View file

@ -21,7 +21,6 @@ syntax = "proto3";
package io.prometheus.client;
option go_package = "io_prometheus_client";
import "gogoproto/gogo.proto";
import "google/protobuf/timestamp.proto";
message LabelPair {
@ -63,7 +62,7 @@ message Quantile {
message Summary {
uint64 sample_count = 1;
double sample_sum = 2;
repeated Quantile quantile = 3 [(gogoproto.nullable) = false];
repeated Quantile quantile = 3;
google.protobuf.Timestamp created_timestamp = 4;
}
@ -77,7 +76,7 @@ message Histogram {
double sample_count_float = 4; // Overrides sample_count if > 0.
double sample_sum = 2;
// Buckets for the classic histogram.
repeated Bucket bucket = 3 [(gogoproto.nullable) = false]; // Ordered in increasing order of upper_bound, +Inf bucket is optional.
repeated Bucket bucket = 3; // Ordered in increasing order of upper_bound, +Inf bucket is optional.
google.protobuf.Timestamp created_timestamp = 15;
@ -95,7 +94,7 @@ message Histogram {
double zero_count_float = 8; // Overrides sb_zero_count if > 0.
// Negative buckets for the native histogram.
repeated BucketSpan negative_span = 9 [(gogoproto.nullable) = false];
repeated BucketSpan negative_span = 9;
// Use either "negative_delta" or "negative_count", the former for
// regular histograms with integer counts, the latter for float
// histograms.
@ -106,7 +105,7 @@ message Histogram {
// Use a no-op span (offset 0, length 0) for a native histogram without any
// observations yet and with a zero_threshold of 0. Otherwise, it would be
// indistinguishable from a classic histogram.
repeated BucketSpan positive_span = 12 [(gogoproto.nullable) = false];
repeated BucketSpan positive_span = 12;
// Use either "positive_delta" or "positive_count", the former for
// regular histograms with integer counts, the latter for float
// histograms.
@ -136,13 +135,13 @@ message BucketSpan {
}
message Exemplar {
repeated LabelPair label = 1 [(gogoproto.nullable) = false];
repeated LabelPair label = 1;
double value = 2;
google.protobuf.Timestamp timestamp = 3; // OpenMetrics-style.
}
message Metric {
repeated LabelPair label = 1 [(gogoproto.nullable) = false];
repeated LabelPair label = 1;
Gauge gauge = 2;
Counter counter = 3;
Summary summary = 4;
@ -155,6 +154,6 @@ message MetricFamily {
string name = 1;
string help = 2;
MetricType type = 3;
repeated Metric metric = 4 [(gogoproto.nullable) = false];
repeated Metric metric = 4;
string unit = 5;
}

File diff suppressed because it is too large Load diff

View file

@ -140,7 +140,7 @@ func (h Histogram) ToFloatHistogram() *histogram.FloatHistogram {
}
}
func spansProtoToSpans(s []BucketSpan) []histogram.Span {
func spansProtoToSpans(s []*BucketSpan) []histogram.Span {
spans := make([]histogram.Span, len(s))
for i := 0; i < len(s); i++ {
spans[i] = histogram.Span{Offset: s[i].Offset, Length: s[i].Length}
@ -195,10 +195,10 @@ func FromFloatHistogram(timestamp int64, fh *histogram.FloatHistogram) Histogram
}
}
func spansToSpansProto(s []histogram.Span) []BucketSpan {
spans := make([]BucketSpan, len(s))
func spansToSpansProto(s []histogram.Span) []*BucketSpan {
spans := make([]*BucketSpan, len(s))
for i := 0; i < len(s); i++ {
spans[i] = BucketSpan{Offset: s[i].Offset, Length: s[i].Length}
spans[i] = &BucketSpan{Offset: s[i].Offset, Length: s[i].Length}
}
return spans

View file

@ -14,6 +14,7 @@
package writev2
import (
math_bits "math/bits"
"slices"
)
@ -21,7 +22,7 @@ func (m Sample) T() int64 { return m.Timestamp }
func (m Sample) V() float64 { return m.Value }
func (m *Request) OptimizedMarshal(dst []byte) ([]byte, error) {
siz := m.Size()
siz := m.SizeVT()
if cap(dst) < siz {
dst = make([]byte, siz)
}
@ -39,10 +40,7 @@ func (m *Request) OptimizedMarshalToSizedBuffer(dAtA []byte) (int, error) {
_ = i
var l int
_ = l
if m.XXX_unrecognized != nil {
i -= len(m.XXX_unrecognized)
copy(dAtA[i:], m.XXX_unrecognized)
}
// Removed XXX_unrecognized handling
if len(m.Timeseries) > 0 {
for iNdEx := len(m.Timeseries) - 1; iNdEx >= 0; iNdEx-- {
{
@ -73,20 +71,13 @@ func (m *Request) OptimizedMarshalToSizedBuffer(dAtA []byte) (int, error) {
// but marshals m.LabelsRefs in place without extra allocations.
func (m *TimeSeries) OptimizedMarshalToSizedBuffer(dAtA []byte) (int, error) {
i := len(dAtA)
_ = i
var l int
_ = l
if m.XXX_unrecognized != nil {
i -= len(m.XXX_unrecognized)
copy(dAtA[i:], m.XXX_unrecognized)
}
if m.CreatedTimestamp != 0 {
i = encodeVarintTypes(dAtA, i, uint64(m.CreatedTimestamp))
i--
dAtA[i] = 0x30
}
{
size, err := m.Metadata.MarshalToSizedBuffer(dAtA[:i])
size, err := m.Metadata.MarshalToVT(dAtA[:i]) // Adjusted for vtproto
if err != nil {
return 0, err
}
@ -98,7 +89,7 @@ func (m *TimeSeries) OptimizedMarshalToSizedBuffer(dAtA []byte) (int, error) {
if len(m.Histograms) > 0 {
for iNdEx := len(m.Histograms) - 1; iNdEx >= 0; iNdEx-- {
{
size, err := m.Histograms[iNdEx].MarshalToSizedBuffer(dAtA[:i])
size, err := m.Histograms[iNdEx].MarshalToVT(dAtA[:i]) // Adjusted for vtproto
if err != nil {
return 0, err
}
@ -112,7 +103,7 @@ func (m *TimeSeries) OptimizedMarshalToSizedBuffer(dAtA []byte) (int, error) {
if len(m.Exemplars) > 0 {
for iNdEx := len(m.Exemplars) - 1; iNdEx >= 0; iNdEx-- {
{
size, err := m.Exemplars[iNdEx].MarshalToSizedBuffer(dAtA[:i])
size, err := m.Exemplars[iNdEx].MarshalToVT(dAtA[:i]) // Adjusted for vtproto
if err != nil {
return 0, err
}
@ -126,7 +117,7 @@ func (m *TimeSeries) OptimizedMarshalToSizedBuffer(dAtA []byte) (int, error) {
if len(m.Samples) > 0 {
for iNdEx := len(m.Samples) - 1; iNdEx >= 0; iNdEx-- {
{
size, err := m.Samples[iNdEx].MarshalToSizedBuffer(dAtA[:i])
size, err := m.Samples[iNdEx].MarshalToVT(dAtA[:i]) // Adjusted for vtproto
if err != nil {
return 0, err
}
@ -139,8 +130,6 @@ func (m *TimeSeries) OptimizedMarshalToSizedBuffer(dAtA []byte) (int, error) {
}
if len(m.LabelsRefs) > 0 {
// This is the trick: encode the varints in reverse order to make it easier
// to do it in place. Then reverse the whole thing.
var j10 int
start := i
for _, num := range m.LabelsRefs {
@ -155,11 +144,25 @@ func (m *TimeSeries) OptimizedMarshalToSizedBuffer(dAtA []byte) (int, error) {
j10++
}
slices.Reverse(dAtA[i:start])
// --- end of trick
i = encodeVarintTypes(dAtA, i, uint64(j10))
i--
dAtA[i] = 0xa
}
return len(dAtA) - i, nil
}
func encodeVarintTypes(dAtA []byte, offset int, v uint64) int {
offset -= sovTypes(v)
base := offset
for v >= 1<<7 {
dAtA[offset] = uint8(v&0x7f | 0x80)
v >>= 7
offset++
}
dAtA[offset] = uint8(v)
return base
}
func sovTypes(x uint64) (n int) {
return (math_bits.Len64(x|1) + 6) / 7
}

File diff suppressed because it is too large Load diff

View file

@ -18,8 +18,6 @@ package io.prometheus.write.v2;
option go_package = "writev2";
import "gogoproto/gogo.proto";
// Request represents a request to write the given timeseries to a remote destination.
// This message was introduced in the Remote Write 2.0 specification:
// https://prometheus.io/docs/concepts/remote_write_spec_2_0/
@ -48,7 +46,7 @@ message Request {
// strings is up to the sender. The receiver should not assume any particular encoding.
repeated string symbols = 4;
// timeseries represents an array of distinct series with 0 or more samples.
repeated TimeSeries timeseries = 5 [(gogoproto.nullable) = false];
repeated TimeSeries timeseries = 5;
}
// TimeSeries represents a single series.
@ -67,14 +65,14 @@ message TimeSeries {
// streaming), in healthy cases, there will be only one sample or histogram.
//
// Samples and histograms are sorted by timestamp (older first).
repeated Sample samples = 2 [(gogoproto.nullable) = false];
repeated Histogram histograms = 3 [(gogoproto.nullable) = false];
repeated Sample samples = 2;
repeated Histogram histograms = 3;
// exemplars represents an optional set of exemplars attached to this series' samples.
repeated Exemplar exemplars = 4 [(gogoproto.nullable) = false];
repeated Exemplar exemplars = 4;
// metadata represents the metadata associated with the given series' samples.
Metadata metadata = 5 [(gogoproto.nullable) = false];
Metadata metadata = 5;
// created_timestamp represents an optional created timestamp associated with
// this series' samples in ms format, typically for counter or histogram type
@ -192,7 +190,7 @@ message Histogram {
}
// Negative Buckets.
repeated BucketSpan negative_spans = 8 [(gogoproto.nullable) = false];
repeated BucketSpan negative_spans = 8;
// Use either "negative_deltas" or "negative_counts", the former for
// regular histograms with integer counts, the latter for
// float histograms.
@ -205,7 +203,7 @@ message Histogram {
// * The span offset+length points to an the index of the custom_values array
// or +Inf if pointing to the len of the array.
// * The counts and deltas have the same meaning as for exponential histograms.
repeated BucketSpan positive_spans = 11 [(gogoproto.nullable) = false];
repeated BucketSpan positive_spans = 11;
// Use either "positive_deltas" or "positive_counts", the former for
// regular histograms with integer counts, the latter for
// float histograms.

File diff suppressed because it is too large Load diff

File diff suppressed because it is too large Load diff

View file

@ -17,14 +17,13 @@ package prometheus;
option go_package = "prompb";
import "types.proto";
import "gogoproto/gogo.proto";
message WriteRequest {
repeated prometheus.TimeSeries timeseries = 1 [(gogoproto.nullable) = false];
repeated prometheus.TimeSeries timeseries = 1;
// Cortex uses this field to determine the source of the write request.
// We reserve it to avoid any compatibility issues.
reserved 2;
repeated prometheus.MetricMetadata metadata = 3 [(gogoproto.nullable) = false];
repeated prometheus.MetricMetadata metadata = 3;
}
// ReadRequest represents a remote read request.

2601
prompb/remote_vtproto.pb.go Normal file

File diff suppressed because it is too large Load diff

File diff suppressed because it is too large Load diff

View file

@ -16,7 +16,6 @@ package prometheus;
option go_package = "prompb";
import "gogoproto/gogo.proto";
message MetricMetadata {
enum MetricType {
@ -47,7 +46,7 @@ message Sample {
message Exemplar {
// Optional, can be empty.
repeated Label labels = 1 [(gogoproto.nullable) = false];
repeated Label labels = 1;
double value = 2;
// timestamp is in ms format, see model/timestamp/timestamp.go for
// conversion from time.Time to Prometheus timestamp.
@ -88,7 +87,7 @@ message Histogram {
}
// Negative Buckets.
repeated BucketSpan negative_spans = 8 [(gogoproto.nullable) = false];
repeated BucketSpan negative_spans = 8;
// Use either "negative_deltas" or "negative_counts", the former for
// regular histograms with integer counts, the latter for float
// histograms.
@ -96,7 +95,7 @@ message Histogram {
repeated double negative_counts = 10; // Absolute count of each bucket.
// Positive Buckets.
repeated BucketSpan positive_spans = 11 [(gogoproto.nullable) = false];
repeated BucketSpan positive_spans = 11;
// Use either "positive_deltas" or "positive_counts", the former for
// regular histograms with integer counts, the latter for float
// histograms.
@ -123,10 +122,10 @@ message BucketSpan {
message TimeSeries {
// For a timeseries to be valid, and for the samples and exemplars
// to be ingested by the remote system properly, the labels field is required.
repeated Label labels = 1 [(gogoproto.nullable) = false];
repeated Sample samples = 2 [(gogoproto.nullable) = false];
repeated Exemplar exemplars = 3 [(gogoproto.nullable) = false];
repeated Histogram histograms = 4 [(gogoproto.nullable) = false];
repeated Label labels = 1;
repeated Sample samples = 2;
repeated Exemplar exemplars = 3;
repeated Histogram histograms = 4;
}
message Label {
@ -135,7 +134,7 @@ message Label {
}
message Labels {
repeated Label labels = 1 [(gogoproto.nullable) = false];
repeated Label labels = 1;
}
// Matcher specifies a rule, which can match or set of labels or not.
@ -181,7 +180,7 @@ message Chunk {
// ChunkedSeries represents single, encoded time series.
message ChunkedSeries {
// Labels should be sorted.
repeated Label labels = 1 [(gogoproto.nullable) = false];
repeated Label labels = 1;
// Chunks will be in start time order and may overlap.
repeated Chunk chunks = 2 [(gogoproto.nullable) = false];
repeated Chunk chunks = 2;
}

6972
prompb/types_vtproto.pb.go Normal file

File diff suppressed because it is too large Load diff

View file

@ -73,7 +73,7 @@ func (m byBucketBoundsData) Less(i, j int) bool { return m[i].bound < m[j].bound
func (m byBucketBoundsData) Swap(i, j int) { m[i], m[j] = m[j], m[i] }
// ByLabelName enables the usage of sort.Sort() with a slice of labels.
type ByLabelName []prompb.Label
type ByLabelName []*prompb.Label
func (a ByLabelName) Len() int { return len(a) }
func (a ByLabelName) Less(i, j int) bool { return a[i].Name < a[j].Name }
@ -83,7 +83,7 @@ func (a ByLabelName) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
// The label slice should not contain duplicate label names; this method sorts the slice by label name before creating
// the signature.
// The algorithm is the same as in Prometheus' labels.StableHash function.
func timeSeriesSignature(labels []prompb.Label) uint64 {
func timeSeriesSignature(labels []*prompb.Label) uint64 {
sort.Sort(ByLabelName(labels))
// Use xxhash.Sum64(b) for fast path as it's faster.
@ -117,15 +117,15 @@ var seps = []byte{'\xff'}
// if logOnOverwrite is true, the overwrite is logged. Resulting label names are sanitized.
// If settings.PromoteResourceAttributes is not empty, it's a set of resource attributes that should be promoted to labels.
func createAttributes(resource pcommon.Resource, attributes pcommon.Map, settings Settings,
ignoreAttrs []string, logOnOverwrite bool, extras ...string) []prompb.Label {
ignoreAttrs []string, logOnOverwrite bool, extras ...string) []*prompb.Label {
resourceAttrs := resource.Attributes()
serviceName, haveServiceName := resourceAttrs.Get(conventions.AttributeServiceName)
instance, haveInstanceID := resourceAttrs.Get(conventions.AttributeServiceInstanceID)
promotedAttrs := make([]prompb.Label, 0, len(settings.PromoteResourceAttributes))
promotedAttrs := make([]*prompb.Label, 0, len(settings.PromoteResourceAttributes))
for _, name := range settings.PromoteResourceAttributes {
if value, exists := resourceAttrs.Get(name); exists {
promotedAttrs = append(promotedAttrs, prompb.Label{Name: name, Value: value.AsString()})
promotedAttrs = append(promotedAttrs, &prompb.Label{Name: name, Value: value.AsString()})
}
}
sort.Stable(ByLabelName(promotedAttrs))
@ -143,12 +143,12 @@ func createAttributes(resource pcommon.Resource, attributes pcommon.Map, setting
// Ensure attributes are sorted by key for consistent merging of keys which
// collide when sanitized.
labels := make([]prompb.Label, 0, maxLabelCount)
labels := make([]*prompb.Label, 0, maxLabelCount)
// XXX: Should we always drop service namespace/service name/service instance ID from the labels
// (as they get mapped to other Prometheus labels)?
attributes.Range(func(key string, value pcommon.Value) bool {
if !slices.Contains(ignoreAttrs, key) {
labels = append(labels, prompb.Label{Name: key, Value: value.AsString()})
labels = append(labels, &prompb.Label{Name: key, Value: value.AsString()})
}
return true
})
@ -212,7 +212,7 @@ func createAttributes(resource pcommon.Resource, attributes pcommon.Map, setting
labels = labels[:0]
for k, v := range l {
labels = append(labels, prompb.Label{Name: k, Value: v})
labels = append(labels, &prompb.Label{Name: k, Value: v})
}
return labels
@ -328,20 +328,20 @@ type exemplarType interface {
Exemplars() pmetric.ExemplarSlice
}
func getPromExemplars[T exemplarType](pt T) []prompb.Exemplar {
promExemplars := make([]prompb.Exemplar, 0, pt.Exemplars().Len())
func getPromExemplars[T exemplarType](pt T) []*prompb.Exemplar {
promExemplars := make([]*prompb.Exemplar, 0, pt.Exemplars().Len())
for i := 0; i < pt.Exemplars().Len(); i++ {
exemplar := pt.Exemplars().At(i)
exemplarRunes := 0
promExemplar := prompb.Exemplar{
promExemplar := &prompb.Exemplar{
Value: exemplar.DoubleValue(),
Timestamp: timestamp.FromTime(exemplar.Timestamp().AsTime()),
}
if traceID := exemplar.TraceID(); !traceID.IsEmpty() {
val := hex.EncodeToString(traceID[:])
exemplarRunes += utf8.RuneCountInString(traceIDKey) + utf8.RuneCountInString(val)
promLabel := prompb.Label{
promLabel := &prompb.Label{
Name: traceIDKey,
Value: val,
}
@ -350,7 +350,7 @@ func getPromExemplars[T exemplarType](pt T) []prompb.Exemplar {
if spanID := exemplar.SpanID(); !spanID.IsEmpty() {
val := hex.EncodeToString(spanID[:])
exemplarRunes += utf8.RuneCountInString(spanIDKey) + utf8.RuneCountInString(val)
promLabel := prompb.Label{
promLabel := &prompb.Label{
Name: spanIDKey,
Value: val,
}
@ -358,11 +358,11 @@ func getPromExemplars[T exemplarType](pt T) []prompb.Exemplar {
}
attrs := exemplar.FilteredAttributes()
labelsFromAttributes := make([]prompb.Label, 0, attrs.Len())
labelsFromAttributes := make([]*prompb.Label, 0, attrs.Len())
attrs.Range(func(key string, value pcommon.Value) bool {
val := value.AsString()
exemplarRunes += utf8.RuneCountInString(key) + utf8.RuneCountInString(val)
promLabel := prompb.Label{
promLabel := &prompb.Label{
Name: key,
Value: val,
}
@ -474,24 +474,24 @@ func (c *PrometheusConverter) addSummaryDataPoints(dataPoints pmetric.SummaryDat
// createLabels returns a copy of baseLabels, adding to it the pair model.MetricNameLabel=name.
// If extras are provided, corresponding label pairs are also added to the returned slice.
// If extras is uneven length, the last (unpaired) extra will be ignored.
func createLabels(name string, baseLabels []prompb.Label, extras ...string) []prompb.Label {
func createLabels(name string, baseLabels []*prompb.Label, extras ...string) []*prompb.Label {
extraLabelCount := len(extras) / 2
labels := make([]prompb.Label, len(baseLabels), len(baseLabels)+extraLabelCount+1) // +1 for name
labels := make([]*prompb.Label, len(baseLabels), len(baseLabels)+extraLabelCount+1) // +1 for name
copy(labels, baseLabels)
n := len(extras)
n -= n % 2
for extrasIdx := 0; extrasIdx < n; extrasIdx += 2 {
labels = append(labels, prompb.Label{Name: extras[extrasIdx], Value: extras[extrasIdx+1]})
labels = append(labels, &prompb.Label{Name: extras[extrasIdx], Value: extras[extrasIdx+1]})
}
labels = append(labels, prompb.Label{Name: model.MetricNameLabel, Value: name})
labels = append(labels, &prompb.Label{Name: model.MetricNameLabel, Value: name})
return labels
}
// getOrCreateTimeSeries returns the time series corresponding to the label set if existent, and false.
// Otherwise it creates a new one and returns that, and true.
func (c *PrometheusConverter) getOrCreateTimeSeries(lbls []prompb.Label) (*prompb.TimeSeries, bool) {
func (c *PrometheusConverter) getOrCreateTimeSeries(lbls []*prompb.Label) (*prompb.TimeSeries, bool) {
h := timeSeriesSignature(lbls)
ts := c.unique[h]
if ts != nil {
@ -527,10 +527,10 @@ func (c *PrometheusConverter) getOrCreateTimeSeries(lbls []prompb.Label) (*promp
// addTimeSeriesIfNeeded adds a corresponding time series if it doesn't already exist.
// If the time series doesn't already exist, it gets added with startTimestamp for its value and timestamp for its timestamp,
// both converted to milliseconds.
func (c *PrometheusConverter) addTimeSeriesIfNeeded(lbls []prompb.Label, startTimestamp pcommon.Timestamp, timestamp pcommon.Timestamp) {
func (c *PrometheusConverter) addTimeSeriesIfNeeded(lbls []*prompb.Label, startTimestamp pcommon.Timestamp, timestamp pcommon.Timestamp) {
ts, created := c.getOrCreateTimeSeries(lbls)
if created {
ts.Samples = []prompb.Sample{
ts.Samples = []*prompb.Sample{
{
// convert ns to ms
Value: float64(convertTimeStamp(startTimestamp)),

View file

@ -63,10 +63,10 @@ func (c *PrometheusConverter) addExponentialHistogramDataPoints(dataPoints pmetr
// exponentialToNativeHistogram translates OTel Exponential Histogram data point
// to Prometheus Native Histogram.
func exponentialToNativeHistogram(p pmetric.ExponentialHistogramDataPoint) (prompb.Histogram, error) {
func exponentialToNativeHistogram(p pmetric.ExponentialHistogramDataPoint) (*prompb.Histogram, error) {
scale := p.Scale()
if scale < -4 {
return prompb.Histogram{},
return &prompb.Histogram{},
fmt.Errorf("cannot convert exponential to native histogram."+
" Scale must be >= -4, was %d", scale)
}
@ -80,7 +80,7 @@ func exponentialToNativeHistogram(p pmetric.ExponentialHistogramDataPoint) (prom
pSpans, pDeltas := convertBucketsLayout(p.Positive(), scaleDown)
nSpans, nDeltas := convertBucketsLayout(p.Negative(), scaleDown)
h := prompb.Histogram{
h := &prompb.Histogram{
// The counter reset detection must be compatible with Prometheus to
// safely set ResetHint to NO. This is not ensured currently.
// Sending a sample that triggers counter reset but with ResetHint==NO
@ -128,14 +128,14 @@ func exponentialToNativeHistogram(p pmetric.ExponentialHistogramDataPoint) (prom
// to the range (base 1].
//
// scaleDown is the factor by which the buckets are scaled down. In other words 2^scaleDown buckets will be merged into one.
func convertBucketsLayout(buckets pmetric.ExponentialHistogramDataPointBuckets, scaleDown int32) ([]prompb.BucketSpan, []int64) {
func convertBucketsLayout(buckets pmetric.ExponentialHistogramDataPointBuckets, scaleDown int32) ([]*prompb.BucketSpan, []int64) {
bucketCounts := buckets.BucketCounts()
if bucketCounts.Len() == 0 {
return nil, nil
}
var (
spans []prompb.BucketSpan
spans []*prompb.BucketSpan
deltas []int64
count int64
prevCount int64
@ -153,7 +153,7 @@ func convertBucketsLayout(buckets pmetric.ExponentialHistogramDataPointBuckets,
// The offset is scaled and adjusted by 1 as described above.
bucketIdx := buckets.Offset()>>scaleDown + 1
spans = append(spans, prompb.BucketSpan{
spans = append(spans, &prompb.BucketSpan{
Offset: bucketIdx,
Length: 0,
})
@ -175,7 +175,7 @@ func convertBucketsLayout(buckets pmetric.ExponentialHistogramDataPointBuckets,
// We have to create a new span, because we have found a gap
// of more than two buckets. The constant 2 is copied from the logic in
// https://github.com/prometheus/client_golang/blob/27f0506d6ebbb117b6b697d0552ee5be2502c5f2/prometheus/histogram.go#L1296
spans = append(spans, prompb.BucketSpan{
spans = append(spans, &prompb.BucketSpan{
Offset: gap,
Length: 0,
})
@ -196,7 +196,7 @@ func convertBucketsLayout(buckets pmetric.ExponentialHistogramDataPointBuckets,
// We have to create a new span, because we have found a gap
// of more than two buckets. The constant 2 is copied from the logic in
// https://github.com/prometheus/client_golang/blob/27f0506d6ebbb117b6b697d0552ee5be2502c5f2/prometheus/histogram.go#L1296
spans = append(spans, prompb.BucketSpan{
spans = append(spans, &prompb.BucketSpan{
Offset: gap,
Length: 0,
})

View file

@ -131,7 +131,7 @@ func (c *PrometheusConverter) FromMetrics(md pmetric.Metrics, settings Settings)
return
}
func isSameMetric(ts *prompb.TimeSeries, lbls []prompb.Label) bool {
func isSameMetric(ts *prompb.TimeSeries, lbls []*prompb.Label) bool {
if len(ts.Labels) != len(lbls) {
return false
}
@ -170,13 +170,13 @@ func (c *PrometheusConverter) addExemplars(dataPoint pmetric.HistogramDataPoint,
// If there is no corresponding TimeSeries already, it's created.
// The corresponding TimeSeries is returned.
// If either lbls is nil/empty or sample is nil, nothing is done.
func (c *PrometheusConverter) addSample(sample *prompb.Sample, lbls []prompb.Label) *prompb.TimeSeries {
func (c *PrometheusConverter) addSample(sample *prompb.Sample, lbls []*prompb.Label) *prompb.TimeSeries {
if sample == nil || len(lbls) == 0 {
// This shouldn't happen
return nil
}
ts, _ := c.getOrCreateTimeSeries(lbls)
ts.Samples = append(ts.Samples, *sample)
ts.Samples = append(ts.Samples, sample)
return ts
}