mirror of
https://github.com/prometheus/prometheus.git
synced 2024-11-12 16:44:05 -08:00
working version
Signed-off-by: Callum Styan <callumstyan@gmail.com>
This commit is contained in:
parent
842d221814
commit
a2fdac1600
|
@ -108,7 +108,7 @@ func parseAndPushMetrics(client *remote.Client, data []byte, labels map[string]s
|
|||
return false
|
||||
}
|
||||
|
||||
raw, err := metricsData.Marshal()
|
||||
raw, err := metricsData.MarshalVT()
|
||||
if err != nil {
|
||||
fmt.Fprintln(os.Stderr, " FAILED:", err)
|
||||
return false
|
||||
|
|
|
@ -45,7 +45,7 @@ type FloatHistogram struct {
|
|||
// Sum of observations. This is also used as the stale marker.
|
||||
Sum float64
|
||||
// Spans for positive and negative buckets (see Span below).
|
||||
PositiveSpans, NegativeSpans []Span
|
||||
PositiveSpans, NegativeSpans []*Span
|
||||
// Observation counts in buckets. Each represents an absolute count and
|
||||
// must be zero or positive.
|
||||
PositiveBuckets, NegativeBuckets []float64
|
||||
|
@ -56,11 +56,11 @@ func (h *FloatHistogram) Copy() *FloatHistogram {
|
|||
c := *h
|
||||
|
||||
if h.PositiveSpans != nil {
|
||||
c.PositiveSpans = make([]Span, len(h.PositiveSpans))
|
||||
c.PositiveSpans = make([]*Span, len(h.PositiveSpans))
|
||||
copy(c.PositiveSpans, h.PositiveSpans)
|
||||
}
|
||||
if h.NegativeSpans != nil {
|
||||
c.NegativeSpans = make([]Span, len(h.NegativeSpans))
|
||||
c.NegativeSpans = make([]*Span, len(h.NegativeSpans))
|
||||
copy(c.NegativeSpans, h.NegativeSpans)
|
||||
}
|
||||
if h.PositiveBuckets != nil {
|
||||
|
@ -156,7 +156,7 @@ func (h *FloatHistogram) TestExpression() string {
|
|||
res = append(res, fmt.Sprintf("z_bucket_w:%g", m.ZeroThreshold))
|
||||
}
|
||||
|
||||
addBuckets := func(kind, bucketsKey, offsetKey string, buckets []float64, spans []Span) []string {
|
||||
addBuckets := func(kind, bucketsKey, offsetKey string, buckets []float64, spans []*Span) []string {
|
||||
if len(spans) > 1 {
|
||||
panic(fmt.Sprintf("histogram with multiple %s spans not supported", kind))
|
||||
}
|
||||
|
@ -780,7 +780,7 @@ func (h *FloatHistogram) floatBucketIterator(
|
|||
|
||||
// reverseFloatBucketIterator is a low-level constructor for reverse bucket iterators.
|
||||
func newReverseFloatBucketIterator(
|
||||
spans []Span, buckets []float64, schema int32, positive bool,
|
||||
spans []*Span, buckets []float64, schema int32, positive bool,
|
||||
) reverseFloatBucketIterator {
|
||||
r := reverseFloatBucketIterator{
|
||||
baseBucketIterator: baseBucketIterator[float64, float64]{
|
||||
|
@ -996,9 +996,9 @@ func targetIdx(idx, originSchema, targetSchema int32) int32 {
|
|||
// If negative is true, the buckets in spansB/bucketsB are subtracted rather than added.
|
||||
func addBuckets(
|
||||
schema int32, threshold float64, negative bool,
|
||||
spansA []Span, bucketsA []float64,
|
||||
spansB []Span, bucketsB []float64,
|
||||
) ([]Span, []float64) {
|
||||
spansA []*Span, bucketsA []float64,
|
||||
spansB []*Span, bucketsB []float64,
|
||||
) ([]*Span, []float64) {
|
||||
var (
|
||||
iSpan int = -1
|
||||
iBucket int = -1
|
||||
|
@ -1035,9 +1035,9 @@ func addBuckets(
|
|||
spansA[0].Offset--
|
||||
goto nextLoop
|
||||
} else {
|
||||
spansA = append(spansA, Span{})
|
||||
spansA = append(spansA, &Span{})
|
||||
copy(spansA[1:], spansA)
|
||||
spansA[0] = Span{Offset: indexB, Length: 1}
|
||||
spansA[0] = &Span{Offset: indexB, Length: 1}
|
||||
if len(spansA) > 1 {
|
||||
// Convert the absolute offset in the formerly
|
||||
// first span to a relative offset.
|
||||
|
@ -1094,9 +1094,9 @@ func addBuckets(
|
|||
if iSpan < len(spansA) {
|
||||
spansA[iSpan].Offset -= deltaIndex + 1
|
||||
}
|
||||
spansA = append(spansA, Span{})
|
||||
spansA = append(spansA, &Span{})
|
||||
copy(spansA[iSpan+1:], spansA[iSpan:])
|
||||
spansA[iSpan] = Span{Length: 1, Offset: deltaIndex}
|
||||
spansA[iSpan] = &Span{Length: 1, Offset: deltaIndex}
|
||||
goto nextLoop
|
||||
}
|
||||
} else {
|
||||
|
|
File diff suppressed because it is too large
Load diff
|
@ -104,7 +104,7 @@ type BucketIterator[BC BucketCount] interface {
|
|||
// code replication.
|
||||
type baseBucketIterator[BC BucketCount, IBC InternalBucketCount] struct {
|
||||
schema int32
|
||||
spans []Span
|
||||
spans []*Span
|
||||
buckets []IBC
|
||||
|
||||
positive bool // Whether this is for positive buckets.
|
||||
|
@ -150,7 +150,7 @@ func (b *baseBucketIterator[BC, IBC]) strippedAt() strippedBucket[BC] {
|
|||
// compactBuckets is a generic function used by both Histogram.Compact and
|
||||
// FloatHistogram.Compact. Set deltaBuckets to true if the provided buckets are
|
||||
// deltas. Set it to false if the buckets contain absolute counts.
|
||||
func compactBuckets[IBC InternalBucketCount](buckets []IBC, spans []Span, maxEmptyBuckets int, deltaBuckets bool) ([]IBC, []Span) {
|
||||
func compactBuckets[IBC InternalBucketCount](buckets []IBC, spans []*Span, maxEmptyBuckets int, deltaBuckets bool) ([]IBC, []*Span) {
|
||||
// Fast path: If there are no empty buckets AND no offset in any span is
|
||||
// <= maxEmptyBuckets AND no span has length 0, there is nothing to do and we can return
|
||||
// immediately. We check that first because it's cheap and presumably
|
||||
|
@ -276,7 +276,7 @@ func compactBuckets[IBC InternalBucketCount](buckets []IBC, spans []Span, maxEmp
|
|||
}
|
||||
// It's in the middle or in the end of the span.
|
||||
// Split the current span.
|
||||
newSpan := Span{
|
||||
newSpan := &Span{
|
||||
Offset: int32(nEmpty),
|
||||
Length: spans[iSpan].Length - posInSpan - uint32(nEmpty),
|
||||
}
|
||||
|
@ -294,7 +294,7 @@ func compactBuckets[IBC InternalBucketCount](buckets []IBC, spans []Span, maxEmp
|
|||
continue
|
||||
}
|
||||
// Insert the new span.
|
||||
spans = append(spans, Span{})
|
||||
spans = append(spans, &Span{})
|
||||
if iSpan+1 < len(spans) {
|
||||
copy(spans[iSpan+1:], spans[iSpan:])
|
||||
}
|
||||
|
@ -356,7 +356,7 @@ func compactBuckets[IBC InternalBucketCount](buckets []IBC, spans []Span, maxEmp
|
|||
return buckets, spans
|
||||
}
|
||||
|
||||
func checkHistogramSpans(spans []Span, numBuckets int) error {
|
||||
func checkHistogramSpans(spans []*Span, numBuckets int) error {
|
||||
var spanBuckets int
|
||||
for n, span := range spans {
|
||||
if n > 0 && span.Offset < 0 {
|
||||
|
@ -608,21 +608,21 @@ var exponentialBounds = [][]float64{
|
|||
// Set inplace to true to reuse input slices and avoid allocations (otherwise
|
||||
// new slices will be allocated for result).
|
||||
func reduceResolution[IBC InternalBucketCount](
|
||||
originSpans []Span,
|
||||
originSpans []*Span,
|
||||
originBuckets []IBC,
|
||||
originSchema,
|
||||
targetSchema int32,
|
||||
deltaBuckets bool,
|
||||
inplace bool,
|
||||
) ([]Span, []IBC) {
|
||||
) ([]*Span, []IBC) {
|
||||
var (
|
||||
targetSpans []Span // The spans in the target schema.
|
||||
targetBuckets []IBC // The bucket counts in the target schema.
|
||||
bucketIdx int32 // The index of bucket in the origin schema.
|
||||
bucketCountIdx int // The position of a bucket in origin bucket count slice `originBuckets`.
|
||||
targetBucketIdx int32 // The index of bucket in the target schema.
|
||||
lastBucketCount IBC // The last visited bucket's count in the origin schema.
|
||||
lastTargetBucketIdx int32 // The index of the last added target bucket.
|
||||
targetSpans []*Span // The spans in the target schema.
|
||||
targetBuckets []IBC // The bucket counts in the target schema.
|
||||
bucketIdx int32 // The index of bucket in the origin schema.
|
||||
bucketCountIdx int // The position of a bucket in origin bucket count slice `originBuckets`.
|
||||
targetBucketIdx int32 // The index of bucket in the target schema.
|
||||
lastBucketCount IBC // The last visited bucket's count in the origin schema.
|
||||
lastTargetBucketIdx int32 // The index of the last added target bucket.
|
||||
lastTargetBucketCount IBC
|
||||
)
|
||||
|
||||
|
@ -643,7 +643,7 @@ func reduceResolution[IBC InternalBucketCount](
|
|||
switch {
|
||||
case len(targetSpans) == 0:
|
||||
// This is the first span in the targetSpans.
|
||||
span := Span{
|
||||
span := &Span{
|
||||
Offset: targetBucketIdx,
|
||||
Length: 1,
|
||||
}
|
||||
|
@ -681,7 +681,7 @@ func reduceResolution[IBC InternalBucketCount](
|
|||
// The current bucket has to go into a new target bucket,
|
||||
// and that bucket is separated by a gap from the previous target bucket,
|
||||
// so we need to add a new target span.
|
||||
span := Span{
|
||||
span := &Span{
|
||||
Offset: targetBucketIdx - lastTargetBucketIdx - 1,
|
||||
Length: 1,
|
||||
}
|
||||
|
|
|
@ -65,7 +65,7 @@ type Histogram struct {
|
|||
// Sum of observations. This is also used as the stale marker.
|
||||
Sum float64
|
||||
// Spans for positive and negative buckets (see Span below).
|
||||
PositiveSpans, NegativeSpans []Span
|
||||
PositiveSpans, NegativeSpans []*Span
|
||||
// Observation counts in buckets. The first element is an absolute
|
||||
// count. All following ones are deltas relative to the previous
|
||||
// element.
|
||||
|
@ -86,11 +86,11 @@ func (h *Histogram) Copy() *Histogram {
|
|||
c := *h
|
||||
|
||||
if len(h.PositiveSpans) != 0 {
|
||||
c.PositiveSpans = make([]Span, len(h.PositiveSpans))
|
||||
c.PositiveSpans = make([]*Span, len(h.PositiveSpans))
|
||||
copy(c.PositiveSpans, h.PositiveSpans)
|
||||
}
|
||||
if len(h.NegativeSpans) != 0 {
|
||||
c.NegativeSpans = make([]Span, len(h.NegativeSpans))
|
||||
c.NegativeSpans = make([]*Span, len(h.NegativeSpans))
|
||||
copy(c.NegativeSpans, h.NegativeSpans)
|
||||
}
|
||||
if len(h.PositiveBuckets) != 0 {
|
||||
|
@ -208,7 +208,7 @@ func (h *Histogram) Equals(h2 *Histogram) bool {
|
|||
|
||||
// spansMatch returns true if both spans represent the same bucket layout
|
||||
// after combining zero length spans with the next non-zero length span.
|
||||
func spansMatch(s1, s2 []Span) bool {
|
||||
func spansMatch(s1, s2 []*Span) bool {
|
||||
if len(s1) == 0 && len(s2) == 0 {
|
||||
return true
|
||||
}
|
||||
|
@ -261,7 +261,7 @@ func spansMatch(s1, s2 []Span) bool {
|
|||
}
|
||||
}
|
||||
|
||||
func allEmptySpans(s []Span) bool {
|
||||
func allEmptySpans(s []*Span) bool {
|
||||
for _, ss := range s {
|
||||
if ss.Length > 0 {
|
||||
return false
|
||||
|
@ -286,15 +286,15 @@ func (h *Histogram) Compact(maxEmptyBuckets int) *Histogram {
|
|||
// deep copy (e.g. spans are not shared).
|
||||
func (h *Histogram) ToFloat() *FloatHistogram {
|
||||
var (
|
||||
positiveSpans, negativeSpans []Span
|
||||
positiveSpans, negativeSpans []*Span
|
||||
positiveBuckets, negativeBuckets []float64
|
||||
)
|
||||
if len(h.PositiveSpans) != 0 {
|
||||
positiveSpans = make([]Span, len(h.PositiveSpans))
|
||||
positiveSpans = make([]*Span, len(h.PositiveSpans))
|
||||
copy(positiveSpans, h.PositiveSpans)
|
||||
}
|
||||
if len(h.NegativeSpans) != 0 {
|
||||
negativeSpans = make([]Span, len(h.NegativeSpans))
|
||||
negativeSpans = make([]*Span, len(h.NegativeSpans))
|
||||
copy(negativeSpans, h.NegativeSpans)
|
||||
}
|
||||
if len(h.PositiveBuckets) != 0 {
|
||||
|
@ -370,7 +370,7 @@ type regularBucketIterator struct {
|
|||
baseBucketIterator[uint64, int64]
|
||||
}
|
||||
|
||||
func newRegularBucketIterator(spans []Span, buckets []int64, schema int32, positive bool) regularBucketIterator {
|
||||
func newRegularBucketIterator(spans []*Span, buckets []int64, schema int32, positive bool) regularBucketIterator {
|
||||
i := baseBucketIterator[uint64, int64]{
|
||||
schema: schema,
|
||||
spans: spans,
|
||||
|
|
|
@ -29,14 +29,14 @@ func GenerateBigTestHistograms(numHistograms, numBuckets int) []*Histogram {
|
|||
ZeroThreshold: 1e-128,
|
||||
Sum: 18.4 * float64(i+1),
|
||||
Schema: 2,
|
||||
NegativeSpans: make([]Span, numSpans),
|
||||
PositiveSpans: make([]Span, numSpans),
|
||||
NegativeSpans: make([]*Span, numSpans),
|
||||
PositiveSpans: make([]*Span, numSpans),
|
||||
NegativeBuckets: make([]int64, bucketsPerSide),
|
||||
PositiveBuckets: make([]int64, bucketsPerSide),
|
||||
}
|
||||
|
||||
for j := 0; j < numSpans; j++ {
|
||||
s := Span{Offset: 1, Length: spanLength}
|
||||
s := &Span{Offset: 1, Length: spanLength}
|
||||
h.NegativeSpans[j] = s
|
||||
h.PositiveSpans[j] = s
|
||||
}
|
||||
|
|
|
@ -31,7 +31,7 @@ import (
|
|||
"github.com/prometheus/prometheus/model/histogram"
|
||||
"github.com/prometheus/prometheus/model/labels"
|
||||
|
||||
dto "github.com/prometheus/prometheus/prompb/io/prometheus/client"
|
||||
dto "github.com/prometheus/client_model/go"
|
||||
)
|
||||
|
||||
// ProtobufParser is a very inefficient way of unmarshaling the old Prometheus
|
||||
|
@ -187,9 +187,9 @@ func (p *ProtobufParser) Histogram() ([]byte, *int64, *histogram.Histogram, *his
|
|||
ZeroThreshold: h.GetZeroThreshold(),
|
||||
ZeroCount: h.GetZeroCountFloat(),
|
||||
Schema: h.GetSchema(),
|
||||
PositiveSpans: make([]histogram.Span, len(h.GetPositiveSpan())),
|
||||
PositiveSpans: make([]*histogram.Span, len(h.GetPositiveSpan())),
|
||||
PositiveBuckets: h.GetPositiveCount(),
|
||||
NegativeSpans: make([]histogram.Span, len(h.GetNegativeSpan())),
|
||||
NegativeSpans: make([]*histogram.Span, len(h.GetNegativeSpan())),
|
||||
NegativeBuckets: h.GetNegativeCount(),
|
||||
}
|
||||
for i, span := range h.GetPositiveSpan() {
|
||||
|
@ -219,9 +219,9 @@ func (p *ProtobufParser) Histogram() ([]byte, *int64, *histogram.Histogram, *his
|
|||
ZeroThreshold: h.GetZeroThreshold(),
|
||||
ZeroCount: h.GetZeroCount(),
|
||||
Schema: h.GetSchema(),
|
||||
PositiveSpans: make([]histogram.Span, len(h.GetPositiveSpan())),
|
||||
PositiveSpans: make([]*histogram.Span, len(h.GetPositiveSpan())),
|
||||
PositiveBuckets: h.GetPositiveDelta(),
|
||||
NegativeSpans: make([]histogram.Span, len(h.GetNegativeSpan())),
|
||||
NegativeSpans: make([]*histogram.Span, len(h.GetNegativeSpan())),
|
||||
NegativeBuckets: h.GetNegativeDelta(),
|
||||
}
|
||||
for i, span := range h.GetPositiveSpan() {
|
||||
|
@ -363,12 +363,13 @@ func (p *ProtobufParser) Exemplar(ex *exemplar.Exemplar) bool {
|
|||
func (p *ProtobufParser) CreatedTimestamp(ct *types.Timestamp) bool {
|
||||
var foundCT *types.Timestamp
|
||||
switch p.mf.GetType() {
|
||||
case dto.MetricType_COUNTER:
|
||||
foundCT = p.mf.GetMetric()[p.metricPos].GetCounter().GetCreatedTimestamp()
|
||||
case dto.MetricType_SUMMARY:
|
||||
foundCT = p.mf.GetMetric()[p.metricPos].GetSummary().GetCreatedTimestamp()
|
||||
case dto.MetricType_HISTOGRAM, dto.MetricType_GAUGE_HISTOGRAM:
|
||||
foundCT = p.mf.GetMetric()[p.metricPos].GetHistogram().GetCreatedTimestamp()
|
||||
// TODO: fix, what to do about timestamp types?
|
||||
//case dto.MetricType_COUNTER:
|
||||
// foundCT = p.mf.GetMetric()[p.metricPos].GetCounter().GetCreatedTimestamp()
|
||||
//case dto.MetricType_SUMMARY:
|
||||
// foundCT = p.mf.GetMetric()[p.metricPos].GetSummary().GetCreatedTimestamp()
|
||||
//case dto.MetricType_HISTOGRAM, dto.MetricType_GAUGE_HISTOGRAM:
|
||||
// foundCT = p.mf.GetMetric()[p.metricPos].GetHistogram().GetCreatedTimestamp()
|
||||
default:
|
||||
}
|
||||
if foundCT == nil {
|
||||
|
@ -566,7 +567,7 @@ func readDelimited(b []byte, mf *dto.MetricFamily) (n int, err error) {
|
|||
return 0, fmt.Errorf("protobufparse: insufficient length of buffer, expected at least %d bytes, got %d bytes", totalLength, len(b))
|
||||
}
|
||||
mf.Reset()
|
||||
return totalLength, mf.Unmarshal(b[varIntLength:totalLength])
|
||||
return totalLength, proto.Unmarshal(b[varIntLength:totalLength], mf)
|
||||
}
|
||||
|
||||
// formatOpenMetricsFloat works like the usual Go string formatting of a fleat
|
||||
|
|
|
@ -26,14 +26,14 @@ func (h Histogram) IsFloatHistogram() bool {
|
|||
}
|
||||
|
||||
func (r *ChunkedReadResponse) PooledMarshal(p *sync.Pool) ([]byte, error) {
|
||||
size := r.Size()
|
||||
size := r.SizeVT()
|
||||
data, ok := p.Get().(*[]byte)
|
||||
if ok && cap(*data) >= size {
|
||||
n, err := r.MarshalToSizedBuffer((*data)[:size])
|
||||
n, err := r.MarshalToVT((*data)[:size])
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return (*data)[:n], nil
|
||||
}
|
||||
return r.Marshal()
|
||||
return r.MarshalVT()
|
||||
}
|
||||
|
|
File diff suppressed because it is too large
Load diff
|
@ -1,156 +0,0 @@
|
|||
// Copyright 2013 Prometheus Team
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
// This is copied and lightly edited from
|
||||
// github.com/prometheus/client_model/io/prometheus/client/metrics.proto
|
||||
// and finally converted to proto3 syntax to make it usable for the
|
||||
// gogo-protobuf approach taken within prometheus/prometheus.
|
||||
|
||||
syntax = "proto3";
|
||||
|
||||
package io.prometheus.client;
|
||||
option go_package = "io_prometheus_client";
|
||||
|
||||
import "gogoproto/gogo.proto";
|
||||
import "google/protobuf/timestamp.proto";
|
||||
|
||||
message LabelPair {
|
||||
string name = 1;
|
||||
string value = 2;
|
||||
}
|
||||
|
||||
enum MetricType {
|
||||
// COUNTER must use the Metric field "counter".
|
||||
COUNTER = 0;
|
||||
// GAUGE must use the Metric field "gauge".
|
||||
GAUGE = 1;
|
||||
// SUMMARY must use the Metric field "summary".
|
||||
SUMMARY = 2;
|
||||
// UNTYPED must use the Metric field "untyped".
|
||||
UNTYPED = 3;
|
||||
// HISTOGRAM must use the Metric field "histogram".
|
||||
HISTOGRAM = 4;
|
||||
// GAUGE_HISTOGRAM must use the Metric field "histogram".
|
||||
GAUGE_HISTOGRAM = 5;
|
||||
}
|
||||
|
||||
message Gauge {
|
||||
double value = 1;
|
||||
}
|
||||
|
||||
message Counter {
|
||||
double value = 1;
|
||||
Exemplar exemplar = 2;
|
||||
|
||||
google.protobuf.Timestamp created_timestamp = 3;
|
||||
}
|
||||
|
||||
message Quantile {
|
||||
double quantile = 1;
|
||||
double value = 2;
|
||||
}
|
||||
|
||||
message Summary {
|
||||
uint64 sample_count = 1;
|
||||
double sample_sum = 2;
|
||||
repeated Quantile quantile = 3 [(gogoproto.nullable) = false];
|
||||
|
||||
google.protobuf.Timestamp created_timestamp = 4;
|
||||
}
|
||||
|
||||
message Untyped {
|
||||
double value = 1;
|
||||
}
|
||||
|
||||
message Histogram {
|
||||
uint64 sample_count = 1;
|
||||
double sample_count_float = 4; // Overrides sample_count if > 0.
|
||||
double sample_sum = 2;
|
||||
// Buckets for the conventional histogram.
|
||||
repeated Bucket bucket = 3 [(gogoproto.nullable) = false]; // Ordered in increasing order of upper_bound, +Inf bucket is optional.
|
||||
|
||||
google.protobuf.Timestamp created_timestamp = 15;
|
||||
|
||||
// Everything below here is for native histograms (also known as sparse histograms).
|
||||
// Native histograms are an experimental feature without stability guarantees.
|
||||
|
||||
// schema defines the bucket schema. Currently, valid numbers are -4 <= n <= 8.
|
||||
// They are all for base-2 bucket schemas, where 1 is a bucket boundary in each case, and
|
||||
// then each power of two is divided into 2^n logarithmic buckets.
|
||||
// Or in other words, each bucket boundary is the previous boundary times 2^(2^-n).
|
||||
// In the future, more bucket schemas may be added using numbers < -4 or > 8.
|
||||
sint32 schema = 5;
|
||||
double zero_threshold = 6; // Breadth of the zero bucket.
|
||||
uint64 zero_count = 7; // Count in zero bucket.
|
||||
double zero_count_float = 8; // Overrides sb_zero_count if > 0.
|
||||
|
||||
// Negative buckets for the native histogram.
|
||||
repeated BucketSpan negative_span = 9 [(gogoproto.nullable) = false];
|
||||
// Use either "negative_delta" or "negative_count", the former for
|
||||
// regular histograms with integer counts, the latter for float
|
||||
// histograms.
|
||||
repeated sint64 negative_delta = 10; // Count delta of each bucket compared to previous one (or to zero for 1st bucket).
|
||||
repeated double negative_count = 11; // Absolute count of each bucket.
|
||||
|
||||
// Positive buckets for the native histogram.
|
||||
// Use a no-op span (offset 0, length 0) for a native histogram without any
|
||||
// observations yet and with a zero_threshold of 0. Otherwise, it would be
|
||||
// indistinguishable from a classic histogram.
|
||||
repeated BucketSpan positive_span = 12 [(gogoproto.nullable) = false];
|
||||
// Use either "positive_delta" or "positive_count", the former for
|
||||
// regular histograms with integer counts, the latter for float
|
||||
// histograms.
|
||||
repeated sint64 positive_delta = 13; // Count delta of each bucket compared to previous one (or to zero for 1st bucket).
|
||||
repeated double positive_count = 14; // Absolute count of each bucket.
|
||||
}
|
||||
|
||||
message Bucket {
|
||||
uint64 cumulative_count = 1; // Cumulative in increasing order.
|
||||
double cumulative_count_float = 4; // Overrides cumulative_count if > 0.
|
||||
double upper_bound = 2; // Inclusive.
|
||||
Exemplar exemplar = 3;
|
||||
}
|
||||
|
||||
// A BucketSpan defines a number of consecutive buckets in a native
|
||||
// histogram with their offset. Logically, it would be more
|
||||
// straightforward to include the bucket counts in the Span. However,
|
||||
// the protobuf representation is more compact in the way the data is
|
||||
// structured here (with all the buckets in a single array separate
|
||||
// from the Spans).
|
||||
message BucketSpan {
|
||||
sint32 offset = 1; // Gap to previous span, or starting point for 1st span (which can be negative).
|
||||
uint32 length = 2; // Length of consecutive buckets.
|
||||
}
|
||||
|
||||
message Exemplar {
|
||||
repeated LabelPair label = 1 [(gogoproto.nullable) = false];
|
||||
double value = 2;
|
||||
google.protobuf.Timestamp timestamp = 3; // OpenMetrics-style.
|
||||
}
|
||||
|
||||
message Metric {
|
||||
repeated LabelPair label = 1 [(gogoproto.nullable) = false];
|
||||
Gauge gauge = 2;
|
||||
Counter counter = 3;
|
||||
Summary summary = 4;
|
||||
Untyped untyped = 5;
|
||||
Histogram histogram = 7;
|
||||
int64 timestamp_ms = 6;
|
||||
}
|
||||
|
||||
message MetricFamily {
|
||||
string name = 1;
|
||||
string help = 2;
|
||||
MetricType type = 3;
|
||||
repeated Metric metric = 4 [(gogoproto.nullable) = false];
|
||||
}
|
|
@ -14,17 +14,16 @@
|
|||
// Code generated by protoc-gen-go. DO NOT EDIT.
|
||||
// versions:
|
||||
// protoc-gen-go v1.31.0
|
||||
// protoc v4.25.1
|
||||
// protoc (unknown)
|
||||
// source: remote.proto
|
||||
|
||||
package prompb
|
||||
|
||||
import (
|
||||
reflect "reflect"
|
||||
sync "sync"
|
||||
|
||||
protoreflect "google.golang.org/protobuf/reflect/protoreflect"
|
||||
protoimpl "google.golang.org/protobuf/runtime/protoimpl"
|
||||
reflect "reflect"
|
||||
sync "sync"
|
||||
)
|
||||
|
||||
const (
|
||||
|
|
1175
prompb/remote_vtproto.pb.go
Normal file
1175
prompb/remote_vtproto.pb.go
Normal file
File diff suppressed because it is too large
Load diff
|
@ -14,17 +14,16 @@
|
|||
// Code generated by protoc-gen-go. DO NOT EDIT.
|
||||
// versions:
|
||||
// protoc-gen-go v1.31.0
|
||||
// protoc v4.25.1
|
||||
// protoc (unknown)
|
||||
// source: types.proto
|
||||
|
||||
package prompb
|
||||
|
||||
import (
|
||||
reflect "reflect"
|
||||
sync "sync"
|
||||
|
||||
protoreflect "google.golang.org/protobuf/reflect/protoreflect"
|
||||
protoimpl "google.golang.org/protobuf/runtime/protoimpl"
|
||||
reflect "reflect"
|
||||
sync "sync"
|
||||
)
|
||||
|
||||
const (
|
||||
|
|
3275
prompb/types_vtproto.pb.go
Normal file
3275
prompb/types_vtproto.pb.go
Normal file
File diff suppressed because it is too large
Load diff
|
@ -560,7 +560,7 @@ func (p *parser) buildHistogramFromMap(desc *map[string]interface{}) *histogram.
|
|||
}
|
||||
|
||||
func (p *parser) buildHistogramBucketsAndSpans(desc *map[string]interface{}, bucketsKey, offsetKey string,
|
||||
) (buckets []float64, spans []histogram.Span) {
|
||||
) (buckets []float64, spans []*histogram.Span) {
|
||||
bucketCount := 0
|
||||
val, ok := (*desc)[bucketsKey]
|
||||
if ok {
|
||||
|
@ -583,7 +583,7 @@ func (p *parser) buildHistogramBucketsAndSpans(desc *map[string]interface{}, buc
|
|||
}
|
||||
}
|
||||
if bucketCount > 0 {
|
||||
spans = []histogram.Span{{Offset: offset, Length: uint32(bucketCount)}}
|
||||
spans = []*histogram.Span{{Offset: offset, Length: uint32(bucketCount)}}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
|
|
@ -10,10 +10,10 @@ if ! [[ "$0" =~ "scripts/genproto.sh" ]]; then
|
|||
exit 255
|
||||
fi
|
||||
|
||||
# if ! [[ $(protoc --version) =~ "3.15.8" ]]; then
|
||||
# echo "could not find protoc 3.15.8, is it installed + in PATH?"
|
||||
# exit 255
|
||||
# fi
|
||||
if ! [[ $(buf --version) =~ 1.28.1 ]]; then
|
||||
echo "could not find buf 1.28.1, is it installed + in PATH?"
|
||||
exit 255
|
||||
fi
|
||||
|
||||
# Since we run go install, go mod download, the go.sum will change.
|
||||
# Make a backup.
|
||||
|
@ -38,47 +38,6 @@ for pkg in "${GET_PKGS[@]}"; do
|
|||
done
|
||||
|
||||
|
||||
MAPPINGS=(
|
||||
"google/protobuf/descriptor.proto=github.com/golang/protobuf/protoc-gen-go/descriptor"
|
||||
)
|
||||
MAPPING_ARG=""
|
||||
for mapping in "${MAPPINGS[@]}"
|
||||
do
|
||||
MAPPING_ARG+="M$mapping"
|
||||
done
|
||||
|
||||
PROM_ROOT="${PWD}"
|
||||
PROM_PATH="${PROM_ROOT}/prompb"
|
||||
# GOGOPROTO_ROOT="$(GO111MODULE=on go list -mod=readonly -f '{{ .Dir }}' -m github.com/gogo/protobuf)"
|
||||
# GOGOPROTO_PATH="${GOGOPROTO_ROOT}:${GOGOPROTO_ROOT}/protobuf"
|
||||
GRPC_GATEWAY_ROOT="$(GO111MODULE=on go list -mod=readonly -f '{{ .Dir }}' -m github.com/grpc-ecosystem/grpc-gateway)"
|
||||
|
||||
DIRS="prompb"
|
||||
|
||||
echo "generating code"
|
||||
for dir in ${DIRS}; do
|
||||
pushd ${dir}
|
||||
protoc \
|
||||
--go_out="$MAPPING_ARG":. \
|
||||
--go_opt=paths=source_relative \
|
||||
--go-grpc_out="$MAPPING_ARG":. \
|
||||
--go-grpc_opt=paths=source_relative \
|
||||
-I=. \
|
||||
-I="${PROM_PATH}" \
|
||||
-I="${GRPC_GATEWAY_ROOT}/third_party/googleapis" \
|
||||
./*.proto
|
||||
# protoc --gogofast_out=Mgoogle/protobuf/timestamp.proto=github.com/gogo/protobuf/types,paths=source_relative:. -I=. \
|
||||
# -I="${GOGOPROTO_PATH}" \
|
||||
# ./io/prometheus/client/*.proto
|
||||
# sed -i.bak -E 's/import _ \"github.com\/gogo\/protobuf\/gogoproto\"//g' -- *.pb.go
|
||||
# sed -i.bak -E 's/import _ \"google\/protobuf\"//g' -- *.pb.go
|
||||
# sed -i.bak -E 's/\t_ \"google\/protobuf\"//g' -- *.pb.go
|
||||
# sed -i.bak -E 's/golang\/protobuf\/descriptor/gogo\/protobuf\/protoc-gen-gogo\/descriptor/g' -- *.go
|
||||
# sed -i.bak -E 's/golang\/protobuf/gogo\/protobuf/g' -- *.go
|
||||
# rm -f -- *.bak
|
||||
rm -f ./*.bak
|
||||
goimports -w ./*.go ./io/prometheus/client/*.go
|
||||
popd
|
||||
done
|
||||
buf generate --verbose
|
||||
|
||||
mv go.sum.bak go.sum
|
||||
|
|
|
@ -132,8 +132,8 @@ func ToQueryResult(ss storage.SeriesSet, sampleLimit int) (*prompb.QueryResult,
|
|||
iter = series.Iterator(iter)
|
||||
|
||||
var (
|
||||
samples []prompb.Sample
|
||||
histograms []prompb.Histogram
|
||||
samples []*prompb.Sample
|
||||
histograms []*prompb.Histogram
|
||||
)
|
||||
|
||||
for valType := iter.Next(); valType != chunkenc.ValNone; valType = iter.Next() {
|
||||
|
@ -148,7 +148,7 @@ func ToQueryResult(ss storage.SeriesSet, sampleLimit int) (*prompb.QueryResult,
|
|||
switch valType {
|
||||
case chunkenc.ValFloat:
|
||||
ts, val := iter.At()
|
||||
samples = append(samples, prompb.Sample{
|
||||
samples = append(samples, &prompb.Sample{
|
||||
Timestamp: ts,
|
||||
Value: val,
|
||||
})
|
||||
|
@ -183,7 +183,17 @@ func FromQueryResult(sortSeries bool, res *prompb.QueryResult) storage.SeriesSet
|
|||
return errSeriesSet{err: err}
|
||||
}
|
||||
lbls := labelProtosToLabels(ts.Labels)
|
||||
series = append(series, &concreteSeries{labels: lbls, floats: ts.Samples, histograms: ts.Histograms})
|
||||
var samples []prompb.Sample
|
||||
var histograms []*prompb.Histogram
|
||||
|
||||
for _, s := range ts.Samples {
|
||||
samples = append(samples, *s)
|
||||
}
|
||||
for _, h := range ts.Histograms {
|
||||
histograms = append(histograms, h)
|
||||
}
|
||||
series = append(series, &concreteSeries{labels: lbls, floats: samples, histograms: histograms})
|
||||
|
||||
}
|
||||
|
||||
if sortSeries {
|
||||
|
@ -222,13 +232,13 @@ func StreamChunkedReadResponses(
|
|||
stream io.Writer,
|
||||
queryIndex int64,
|
||||
ss storage.ChunkSeriesSet,
|
||||
sortedExternalLabels []prompb.Label,
|
||||
sortedExternalLabels []*prompb.Label,
|
||||
maxBytesInFrame int,
|
||||
marshalPool *sync.Pool,
|
||||
) (annotations.Annotations, error) {
|
||||
var (
|
||||
chks []prompb.Chunk
|
||||
lbls []prompb.Label
|
||||
chks []*prompb.Chunk
|
||||
lbls []*prompb.Label
|
||||
iter chunks.Iterator
|
||||
)
|
||||
|
||||
|
@ -239,7 +249,7 @@ func StreamChunkedReadResponses(
|
|||
|
||||
maxDataLength := maxBytesInFrame
|
||||
for _, lbl := range lbls {
|
||||
maxDataLength -= lbl.Size()
|
||||
maxDataLength -= lbl.SizeVT()
|
||||
}
|
||||
frameBytesLeft := maxDataLength
|
||||
|
||||
|
@ -254,13 +264,13 @@ func StreamChunkedReadResponses(
|
|||
}
|
||||
|
||||
// Cut the chunk.
|
||||
chks = append(chks, prompb.Chunk{
|
||||
chks = append(chks, &prompb.Chunk{
|
||||
MinTimeMs: chk.MinTime,
|
||||
MaxTimeMs: chk.MaxTime,
|
||||
Type: prompb.Chunk_Encoding(chk.Chunk.Encoding()),
|
||||
Data: chk.Chunk.Bytes(),
|
||||
})
|
||||
frameBytesLeft -= chks[len(chks)-1].Size()
|
||||
frameBytesLeft -= chks[len(chks)-1].SizeVT()
|
||||
|
||||
// We are fine with minor inaccuracy of max bytes per frame. The inaccuracy will be max of full chunk size.
|
||||
isNext = iter.Next()
|
||||
|
@ -298,8 +308,8 @@ func StreamChunkedReadResponses(
|
|||
|
||||
// MergeLabels merges two sets of sorted proto labels, preferring those in
|
||||
// primary to those in secondary when there is an overlap.
|
||||
func MergeLabels(primary, secondary []prompb.Label) []prompb.Label {
|
||||
result := make([]prompb.Label, 0, len(primary)+len(secondary))
|
||||
func MergeLabels(primary, secondary []*prompb.Label) []*prompb.Label {
|
||||
result := make([]*prompb.Label, 0, len(primary)+len(secondary))
|
||||
i, j := 0, 0
|
||||
for i < len(primary) && j < len(secondary) {
|
||||
switch {
|
||||
|
@ -368,7 +378,7 @@ func (c *concreteSeriesSet) Warnings() annotations.Annotations { return nil }
|
|||
type concreteSeries struct {
|
||||
labels labels.Labels
|
||||
floats []prompb.Sample
|
||||
histograms []prompb.Histogram
|
||||
histograms []*prompb.Histogram
|
||||
}
|
||||
|
||||
func (c *concreteSeries) Labels() labels.Labels {
|
||||
|
@ -440,7 +450,7 @@ func (c *concreteSeriesIterator) Seek(t int64) chunkenc.ValueType {
|
|||
if c.series.floats[c.floatsCur].Timestamp <= c.series.histograms[c.histogramsCur].Timestamp {
|
||||
c.curValType = chunkenc.ValFloat
|
||||
} else {
|
||||
c.curValType = getHistogramValType(&c.series.histograms[c.histogramsCur])
|
||||
c.curValType = getHistogramValType(c.series.histograms[c.histogramsCur])
|
||||
}
|
||||
// When the timestamps do not overlap the cursor for the non-selected sample type has advanced too
|
||||
// far; we decrement it back down here.
|
||||
|
@ -454,7 +464,7 @@ func (c *concreteSeriesIterator) Seek(t int64) chunkenc.ValueType {
|
|||
case c.floatsCur < len(c.series.floats):
|
||||
c.curValType = chunkenc.ValFloat
|
||||
case c.histogramsCur < len(c.series.histograms):
|
||||
c.curValType = getHistogramValType(&c.series.histograms[c.histogramsCur])
|
||||
c.curValType = getHistogramValType(c.series.histograms[c.histogramsCur])
|
||||
}
|
||||
return c.curValType
|
||||
}
|
||||
|
@ -548,7 +558,7 @@ func (c *concreteSeriesIterator) Err() error {
|
|||
|
||||
// validateLabelsAndMetricName validates the label names/values and metric names returned from remote read,
|
||||
// also making sure that there are no labels with duplicate names.
|
||||
func validateLabelsAndMetricName(ls []prompb.Label) error {
|
||||
func validateLabelsAndMetricName(ls []*prompb.Label) error {
|
||||
for i, l := range ls {
|
||||
if l.Name == labels.MetricName && !model.IsValidMetricName(model.LabelValue(l.Value)) {
|
||||
return fmt.Errorf("invalid metric name: %v", l.Value)
|
||||
|
@ -617,7 +627,7 @@ func FromLabelMatchers(matchers []*prompb.LabelMatcher) ([]*labels.Matcher, erro
|
|||
return result, nil
|
||||
}
|
||||
|
||||
func exemplarProtoToExemplar(ep prompb.Exemplar) exemplar.Exemplar {
|
||||
func exemplarProtoToExemplar(ep *prompb.Exemplar) exemplar.Exemplar {
|
||||
timestamp := ep.Timestamp
|
||||
|
||||
return exemplar.Exemplar{
|
||||
|
@ -631,7 +641,7 @@ func exemplarProtoToExemplar(ep prompb.Exemplar) exemplar.Exemplar {
|
|||
// HistogramProtoToHistogram extracts a (normal integer) Histogram from the
|
||||
// provided proto message. The caller has to make sure that the proto message
|
||||
// represents an integer histogram and not a float histogram, or it panics.
|
||||
func HistogramProtoToHistogram(hp prompb.Histogram) *histogram.Histogram {
|
||||
func HistogramProtoToHistogram(hp *prompb.Histogram) *histogram.Histogram {
|
||||
if hp.IsFloatHistogram() {
|
||||
panic("HistogramProtoToHistogram called with a float histogram")
|
||||
}
|
||||
|
@ -653,7 +663,7 @@ func HistogramProtoToHistogram(hp prompb.Histogram) *histogram.Histogram {
|
|||
// provided proto message to a Float Histogram. The caller has to make sure that
|
||||
// the proto message represents a float histogram and not an integer histogram,
|
||||
// or it panics.
|
||||
func FloatHistogramProtoToFloatHistogram(hp prompb.Histogram) *histogram.FloatHistogram {
|
||||
func FloatHistogramProtoToFloatHistogram(hp *prompb.Histogram) *histogram.FloatHistogram {
|
||||
if !hp.IsFloatHistogram() {
|
||||
panic("FloatHistogramProtoToFloatHistogram called with an integer histogram")
|
||||
}
|
||||
|
@ -674,7 +684,7 @@ func FloatHistogramProtoToFloatHistogram(hp prompb.Histogram) *histogram.FloatHi
|
|||
// HistogramProtoToFloatHistogram extracts and converts a (normal integer) histogram from the provided proto message
|
||||
// to a float histogram. The caller has to make sure that the proto message represents an integer histogram and not a
|
||||
// float histogram, or it panics.
|
||||
func HistogramProtoToFloatHistogram(hp prompb.Histogram) *histogram.FloatHistogram {
|
||||
func HistogramProtoToFloatHistogram(hp *prompb.Histogram) *histogram.FloatHistogram {
|
||||
if hp.IsFloatHistogram() {
|
||||
panic("HistogramProtoToFloatHistogram called with a float histogram")
|
||||
}
|
||||
|
@ -692,10 +702,10 @@ func HistogramProtoToFloatHistogram(hp prompb.Histogram) *histogram.FloatHistogr
|
|||
}
|
||||
}
|
||||
|
||||
func spansProtoToSpans(s []prompb.BucketSpan) []histogram.Span {
|
||||
spans := make([]histogram.Span, len(s))
|
||||
func spansProtoToSpans(s []*prompb.BucketSpan) []*histogram.Span {
|
||||
spans := make([]*histogram.Span, len(s))
|
||||
for i := 0; i < len(s); i++ {
|
||||
spans[i] = histogram.Span{Offset: s[i].Offset, Length: s[i].Length}
|
||||
spans[i] = &histogram.Span{Offset: s[i].Offset, Length: s[i].Length}
|
||||
}
|
||||
|
||||
return spans
|
||||
|
@ -711,8 +721,8 @@ func deltasToCounts(deltas []int64) []float64 {
|
|||
return counts
|
||||
}
|
||||
|
||||
func HistogramToHistogramProto(timestamp int64, h *histogram.Histogram) prompb.Histogram {
|
||||
return prompb.Histogram{
|
||||
func HistogramToHistogramProto(timestamp int64, h *histogram.Histogram) *prompb.Histogram {
|
||||
return &prompb.Histogram{
|
||||
Count: &prompb.Histogram_CountInt{CountInt: h.Count},
|
||||
Sum: h.Sum,
|
||||
Schema: h.Schema,
|
||||
|
@ -727,8 +737,8 @@ func HistogramToHistogramProto(timestamp int64, h *histogram.Histogram) prompb.H
|
|||
}
|
||||
}
|
||||
|
||||
func FloatHistogramToHistogramProto(timestamp int64, fh *histogram.FloatHistogram) prompb.Histogram {
|
||||
return prompb.Histogram{
|
||||
func FloatHistogramToHistogramProto(timestamp int64, fh *histogram.FloatHistogram) *prompb.Histogram {
|
||||
return &prompb.Histogram{
|
||||
Count: &prompb.Histogram_CountFloat{CountFloat: fh.Count},
|
||||
Sum: fh.Sum,
|
||||
Schema: fh.Schema,
|
||||
|
@ -743,10 +753,10 @@ func FloatHistogramToHistogramProto(timestamp int64, fh *histogram.FloatHistogra
|
|||
}
|
||||
}
|
||||
|
||||
func spansToSpansProto(s []histogram.Span) []prompb.BucketSpan {
|
||||
spans := make([]prompb.BucketSpan, len(s))
|
||||
func spansToSpansProto(s []*histogram.Span) []*prompb.BucketSpan {
|
||||
spans := make([]*prompb.BucketSpan, len(s))
|
||||
for i := 0; i < len(s); i++ {
|
||||
spans[i] = prompb.BucketSpan{Offset: s[i].Offset, Length: s[i].Length}
|
||||
spans[i] = &prompb.BucketSpan{Offset: s[i].Offset, Length: s[i].Length}
|
||||
}
|
||||
|
||||
return spans
|
||||
|
@ -761,7 +771,7 @@ func LabelProtosToMetric(labelPairs []*prompb.Label) model.Metric {
|
|||
return metric
|
||||
}
|
||||
|
||||
func labelProtosToLabels(labelPairs []prompb.Label) labels.Labels {
|
||||
func labelProtosToLabels(labelPairs []*prompb.Label) labels.Labels {
|
||||
b := labels.ScratchBuilder{}
|
||||
for _, l := range labelPairs {
|
||||
b.Add(l.Name, l.Value)
|
||||
|
@ -772,10 +782,10 @@ func labelProtosToLabels(labelPairs []prompb.Label) labels.Labels {
|
|||
|
||||
// labelsToLabelsProto transforms labels into prompb labels. The buffer slice
|
||||
// will be used to avoid allocations if it is big enough to store the labels.
|
||||
func labelsToLabelsProto(lbls labels.Labels, buf []prompb.Label) []prompb.Label {
|
||||
func labelsToLabelsProto(lbls labels.Labels, buf []*prompb.Label) []*prompb.Label {
|
||||
result := buf[:0]
|
||||
lbls.Range(func(l labels.Label) {
|
||||
result = append(result, prompb.Label{
|
||||
result = append(result, &prompb.Label{
|
||||
Name: l.Name,
|
||||
Value: l.Value,
|
||||
})
|
||||
|
|
|
@ -39,49 +39,49 @@ var testHistogram = histogram.Histogram{
|
|||
ZeroCount: 0,
|
||||
Count: 0,
|
||||
Sum: 20,
|
||||
PositiveSpans: []histogram.Span{{Offset: 0, Length: 1}},
|
||||
PositiveSpans: []*histogram.Span{{Offset: 0, Length: 1}},
|
||||
PositiveBuckets: []int64{1},
|
||||
NegativeSpans: []histogram.Span{{Offset: 0, Length: 1}},
|
||||
NegativeSpans: []*histogram.Span{{Offset: 0, Length: 1}},
|
||||
NegativeBuckets: []int64{-1},
|
||||
}
|
||||
|
||||
var writeRequestFixture = &prompb.WriteRequest{
|
||||
Timeseries: []prompb.TimeSeries{
|
||||
Timeseries: []*prompb.TimeSeries{
|
||||
{
|
||||
Labels: []prompb.Label{
|
||||
Labels: []*prompb.Label{
|
||||
{Name: "__name__", Value: "test_metric1"},
|
||||
{Name: "b", Value: "c"},
|
||||
{Name: "baz", Value: "qux"},
|
||||
{Name: "d", Value: "e"},
|
||||
{Name: "foo", Value: "bar"},
|
||||
},
|
||||
Samples: []prompb.Sample{{Value: 1, Timestamp: 0}},
|
||||
Exemplars: []prompb.Exemplar{{Labels: []prompb.Label{{Name: "f", Value: "g"}}, Value: 1, Timestamp: 0}},
|
||||
Histograms: []prompb.Histogram{HistogramToHistogramProto(0, &testHistogram), FloatHistogramToHistogramProto(1, testHistogram.ToFloat())},
|
||||
Samples: []*prompb.Sample{{Value: 1, Timestamp: 0}},
|
||||
Exemplars: []*prompb.Exemplar{{Labels: []*prompb.Label{{Name: "f", Value: "g"}}, Value: 1, Timestamp: 0}},
|
||||
Histograms: []*prompb.Histogram{HistogramToHistogramProto(0, &testHistogram), FloatHistogramToHistogramProto(1, testHistogram.ToFloat())},
|
||||
},
|
||||
{
|
||||
Labels: []prompb.Label{
|
||||
Labels: []*prompb.Label{
|
||||
{Name: "__name__", Value: "test_metric1"},
|
||||
{Name: "b", Value: "c"},
|
||||
{Name: "baz", Value: "qux"},
|
||||
{Name: "d", Value: "e"},
|
||||
{Name: "foo", Value: "bar"},
|
||||
},
|
||||
Samples: []prompb.Sample{{Value: 2, Timestamp: 1}},
|
||||
Exemplars: []prompb.Exemplar{{Labels: []prompb.Label{{Name: "h", Value: "i"}}, Value: 2, Timestamp: 1}},
|
||||
Histograms: []prompb.Histogram{HistogramToHistogramProto(2, &testHistogram), FloatHistogramToHistogramProto(3, testHistogram.ToFloat())},
|
||||
Samples: []*prompb.Sample{{Value: 2, Timestamp: 1}},
|
||||
Exemplars: []*prompb.Exemplar{{Labels: []*prompb.Label{{Name: "h", Value: "i"}}, Value: 2, Timestamp: 1}},
|
||||
Histograms: []*prompb.Histogram{HistogramToHistogramProto(2, &testHistogram), FloatHistogramToHistogramProto(3, testHistogram.ToFloat())},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
func TestValidateLabelsAndMetricName(t *testing.T) {
|
||||
tests := []struct {
|
||||
input []prompb.Label
|
||||
input []*prompb.Label
|
||||
expectedErr string
|
||||
description string
|
||||
}{
|
||||
{
|
||||
input: []prompb.Label{
|
||||
input: []*prompb.Label{
|
||||
{Name: "__name__", Value: "name"},
|
||||
{Name: "labelName", Value: "labelValue"},
|
||||
},
|
||||
|
@ -89,7 +89,7 @@ func TestValidateLabelsAndMetricName(t *testing.T) {
|
|||
description: "regular labels",
|
||||
},
|
||||
{
|
||||
input: []prompb.Label{
|
||||
input: []*prompb.Label{
|
||||
{Name: "__name__", Value: "name"},
|
||||
{Name: "_labelName", Value: "labelValue"},
|
||||
},
|
||||
|
@ -97,7 +97,7 @@ func TestValidateLabelsAndMetricName(t *testing.T) {
|
|||
description: "label name with _",
|
||||
},
|
||||
{
|
||||
input: []prompb.Label{
|
||||
input: []*prompb.Label{
|
||||
{Name: "__name__", Value: "name"},
|
||||
{Name: "@labelName", Value: "labelValue"},
|
||||
},
|
||||
|
@ -105,7 +105,7 @@ func TestValidateLabelsAndMetricName(t *testing.T) {
|
|||
description: "label name with @",
|
||||
},
|
||||
{
|
||||
input: []prompb.Label{
|
||||
input: []*prompb.Label{
|
||||
{Name: "__name__", Value: "name"},
|
||||
{Name: "123labelName", Value: "labelValue"},
|
||||
},
|
||||
|
@ -113,7 +113,7 @@ func TestValidateLabelsAndMetricName(t *testing.T) {
|
|||
description: "label name starts with numbers",
|
||||
},
|
||||
{
|
||||
input: []prompb.Label{
|
||||
input: []*prompb.Label{
|
||||
{Name: "__name__", Value: "name"},
|
||||
{Name: "", Value: "labelValue"},
|
||||
},
|
||||
|
@ -121,7 +121,7 @@ func TestValidateLabelsAndMetricName(t *testing.T) {
|
|||
description: "label name is empty string",
|
||||
},
|
||||
{
|
||||
input: []prompb.Label{
|
||||
input: []*prompb.Label{
|
||||
{Name: "__name__", Value: "name"},
|
||||
{Name: "labelName", Value: string([]byte{0xff})},
|
||||
},
|
||||
|
@ -129,14 +129,14 @@ func TestValidateLabelsAndMetricName(t *testing.T) {
|
|||
description: "label value is an invalid UTF-8 value",
|
||||
},
|
||||
{
|
||||
input: []prompb.Label{
|
||||
input: []*prompb.Label{
|
||||
{Name: "__name__", Value: "@invalid_name"},
|
||||
},
|
||||
expectedErr: "invalid metric name: @invalid_name",
|
||||
description: "metric name starts with @",
|
||||
},
|
||||
{
|
||||
input: []prompb.Label{
|
||||
input: []*prompb.Label{
|
||||
{Name: "__name__", Value: "name1"},
|
||||
{Name: "__name__", Value: "name2"},
|
||||
},
|
||||
|
@ -144,7 +144,7 @@ func TestValidateLabelsAndMetricName(t *testing.T) {
|
|||
description: "duplicate label names",
|
||||
},
|
||||
{
|
||||
input: []prompb.Label{
|
||||
input: []*prompb.Label{
|
||||
{Name: "label1", Value: "name"},
|
||||
{Name: "label2", Value: "name"},
|
||||
},
|
||||
|
@ -152,7 +152,7 @@ func TestValidateLabelsAndMetricName(t *testing.T) {
|
|||
description: "duplicate label values",
|
||||
},
|
||||
{
|
||||
input: []prompb.Label{
|
||||
input: []*prompb.Label{
|
||||
{Name: "", Value: "name"},
|
||||
{Name: "label2", Value: "name"},
|
||||
},
|
||||
|
@ -259,7 +259,7 @@ func TestConcreteSeriesIterator_FloatSamples(t *testing.T) {
|
|||
|
||||
func TestConcreteSeriesIterator_HistogramSamples(t *testing.T) {
|
||||
histograms := tsdbutil.GenerateTestHistograms(5)
|
||||
histProtos := make([]prompb.Histogram, len(histograms))
|
||||
histProtos := make([]*prompb.Histogram, len(histograms))
|
||||
for i, h := range histograms {
|
||||
// Results in ts sequence of 1, 1, 2, 3, 4.
|
||||
var ts int64
|
||||
|
@ -316,7 +316,7 @@ func TestConcreteSeriesIterator_FloatAndHistogramSamples(t *testing.T) {
|
|||
// Series starts as histograms, then transitions to floats at ts=8 (with an overlap from ts=8 to ts=10), then
|
||||
// transitions back to histograms at ts=16.
|
||||
histograms := tsdbutil.GenerateTestHistograms(15)
|
||||
histProtos := make([]prompb.Histogram, len(histograms))
|
||||
histProtos := make([]*prompb.Histogram, len(histograms))
|
||||
for i, h := range histograms {
|
||||
if i < 10 {
|
||||
histProtos[i] = HistogramToHistogramProto(int64(i+1), h)
|
||||
|
@ -418,11 +418,11 @@ func TestConcreteSeriesIterator_FloatAndHistogramSamples(t *testing.T) {
|
|||
|
||||
func TestFromQueryResultWithDuplicates(t *testing.T) {
|
||||
ts1 := prompb.TimeSeries{
|
||||
Labels: []prompb.Label{
|
||||
Labels: []*prompb.Label{
|
||||
{Name: "foo", Value: "bar"},
|
||||
{Name: "foo", Value: "def"},
|
||||
},
|
||||
Samples: []prompb.Sample{
|
||||
Samples: []*prompb.Sample{
|
||||
{Value: 0.0, Timestamp: 0},
|
||||
},
|
||||
}
|
||||
|
@ -468,17 +468,17 @@ func TestNegotiateResponseType(t *testing.T) {
|
|||
|
||||
func TestMergeLabels(t *testing.T) {
|
||||
for _, tc := range []struct {
|
||||
primary, secondary, expected []prompb.Label
|
||||
primary, secondary, expected []*prompb.Label
|
||||
}{
|
||||
{
|
||||
primary: []prompb.Label{{Name: "aaa", Value: "foo"}, {Name: "bbb", Value: "foo"}, {Name: "ddd", Value: "foo"}},
|
||||
secondary: []prompb.Label{{Name: "bbb", Value: "bar"}, {Name: "ccc", Value: "bar"}},
|
||||
expected: []prompb.Label{{Name: "aaa", Value: "foo"}, {Name: "bbb", Value: "foo"}, {Name: "ccc", Value: "bar"}, {Name: "ddd", Value: "foo"}},
|
||||
primary: []*prompb.Label{{Name: "aaa", Value: "foo"}, {Name: "bbb", Value: "foo"}, {Name: "ddd", Value: "foo"}},
|
||||
secondary: []*prompb.Label{{Name: "bbb", Value: "bar"}, {Name: "ccc", Value: "bar"}},
|
||||
expected: []*prompb.Label{{Name: "aaa", Value: "foo"}, {Name: "bbb", Value: "foo"}, {Name: "ccc", Value: "bar"}, {Name: "ddd", Value: "foo"}},
|
||||
},
|
||||
{
|
||||
primary: []prompb.Label{{Name: "bbb", Value: "bar"}, {Name: "ccc", Value: "bar"}},
|
||||
secondary: []prompb.Label{{Name: "aaa", Value: "foo"}, {Name: "bbb", Value: "foo"}, {Name: "ddd", Value: "foo"}},
|
||||
expected: []prompb.Label{{Name: "aaa", Value: "foo"}, {Name: "bbb", Value: "bar"}, {Name: "ccc", Value: "bar"}, {Name: "ddd", Value: "foo"}},
|
||||
primary: []*prompb.Label{{Name: "bbb", Value: "bar"}, {Name: "ccc", Value: "bar"}},
|
||||
secondary: []*prompb.Label{{Name: "aaa", Value: "foo"}, {Name: "bbb", Value: "foo"}, {Name: "ddd", Value: "foo"}},
|
||||
expected: []*prompb.Label{{Name: "aaa", Value: "foo"}, {Name: "bbb", Value: "bar"}, {Name: "ccc", Value: "bar"}, {Name: "ddd", Value: "foo"}},
|
||||
},
|
||||
} {
|
||||
require.Equal(t, tc.expected, MergeLabels(tc.primary, tc.secondary))
|
||||
|
@ -528,8 +528,8 @@ func TestDecodeWriteRequest(t *testing.T) {
|
|||
func TestNilHistogramProto(*testing.T) {
|
||||
// This function will panic if it impromperly handles nil
|
||||
// values, causing the test to fail.
|
||||
HistogramProtoToHistogram(prompb.Histogram{})
|
||||
HistogramProtoToFloatHistogram(prompb.Histogram{})
|
||||
HistogramProtoToHistogram(&prompb.Histogram{})
|
||||
HistogramProtoToFloatHistogram(&prompb.Histogram{})
|
||||
}
|
||||
|
||||
func exampleHistogram() histogram.Histogram {
|
||||
|
@ -538,13 +538,13 @@ func exampleHistogram() histogram.Histogram {
|
|||
Schema: 0,
|
||||
Count: 19,
|
||||
Sum: 2.7,
|
||||
PositiveSpans: []histogram.Span{
|
||||
PositiveSpans: []*histogram.Span{
|
||||
{Offset: 0, Length: 4},
|
||||
{Offset: 0, Length: 0},
|
||||
{Offset: 0, Length: 3},
|
||||
},
|
||||
PositiveBuckets: []int64{1, 2, -2, 1, -1, 0, 0},
|
||||
NegativeSpans: []histogram.Span{
|
||||
NegativeSpans: []*histogram.Span{
|
||||
{Offset: 0, Length: 5},
|
||||
{Offset: 1, Length: 0},
|
||||
{Offset: 0, Length: 1},
|
||||
|
@ -560,7 +560,7 @@ func exampleHistogramProto() prompb.Histogram {
|
|||
Schema: 0,
|
||||
ZeroThreshold: 0,
|
||||
ZeroCount: &prompb.Histogram_ZeroCountInt{ZeroCountInt: 0},
|
||||
NegativeSpans: []prompb.BucketSpan{
|
||||
NegativeSpans: []*prompb.BucketSpan{
|
||||
{
|
||||
Offset: 0,
|
||||
Length: 5,
|
||||
|
@ -575,7 +575,7 @@ func exampleHistogramProto() prompb.Histogram {
|
|||
},
|
||||
},
|
||||
NegativeDeltas: []int64{1, 2, -2, 1, -1, 0},
|
||||
PositiveSpans: []prompb.BucketSpan{
|
||||
PositiveSpans: []*prompb.BucketSpan{
|
||||
{
|
||||
Offset: 0,
|
||||
Length: 4,
|
||||
|
@ -626,7 +626,7 @@ func TestHistogramToProtoConvert(t *testing.T) {
|
|||
|
||||
require.Equal(t, p, HistogramToHistogramProto(1337, &h))
|
||||
|
||||
require.Equal(t, h, *HistogramProtoToHistogram(p))
|
||||
require.Equal(t, h, *HistogramProtoToHistogram(&p))
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -636,13 +636,13 @@ func exampleFloatHistogram() histogram.FloatHistogram {
|
|||
Schema: 0,
|
||||
Count: 19,
|
||||
Sum: 2.7,
|
||||
PositiveSpans: []histogram.Span{
|
||||
PositiveSpans: []*histogram.Span{
|
||||
{Offset: 0, Length: 4},
|
||||
{Offset: 0, Length: 0},
|
||||
{Offset: 0, Length: 3},
|
||||
},
|
||||
PositiveBuckets: []float64{1, 2, -2, 1, -1, 0, 0},
|
||||
NegativeSpans: []histogram.Span{
|
||||
NegativeSpans: []*histogram.Span{
|
||||
{Offset: 0, Length: 5},
|
||||
{Offset: 1, Length: 0},
|
||||
{Offset: 0, Length: 1},
|
||||
|
@ -658,7 +658,7 @@ func exampleFloatHistogramProto() prompb.Histogram {
|
|||
Schema: 0,
|
||||
ZeroThreshold: 0,
|
||||
ZeroCount: &prompb.Histogram_ZeroCountFloat{ZeroCountFloat: 0},
|
||||
NegativeSpans: []prompb.BucketSpan{
|
||||
NegativeSpans: []*prompb.BucketSpan{
|
||||
{
|
||||
Offset: 0,
|
||||
Length: 5,
|
||||
|
@ -673,7 +673,7 @@ func exampleFloatHistogramProto() prompb.Histogram {
|
|||
},
|
||||
},
|
||||
NegativeCounts: []float64{1, 2, -2, 1, -1, 0},
|
||||
PositiveSpans: []prompb.BucketSpan{
|
||||
PositiveSpans: []*prompb.BucketSpan{
|
||||
{
|
||||
Offset: 0,
|
||||
Length: 4,
|
||||
|
@ -724,28 +724,28 @@ func TestFloatHistogramToProtoConvert(t *testing.T) {
|
|||
|
||||
require.Equal(t, p, FloatHistogramToHistogramProto(1337, &h))
|
||||
|
||||
require.Equal(t, h, *FloatHistogramProtoToFloatHistogram(p))
|
||||
require.Equal(t, h, *FloatHistogramProtoToFloatHistogram(&p))
|
||||
}
|
||||
}
|
||||
|
||||
func TestStreamResponse(t *testing.T) {
|
||||
lbs1 := labelsToLabelsProto(labels.FromStrings("instance", "localhost1", "job", "demo1"), nil)
|
||||
lbs2 := labelsToLabelsProto(labels.FromStrings("instance", "localhost2", "job", "demo2"), nil)
|
||||
chunk := prompb.Chunk{
|
||||
chunk := &prompb.Chunk{
|
||||
Type: prompb.Chunk_XOR,
|
||||
Data: make([]byte, 100),
|
||||
}
|
||||
lbSize, chunkSize := 0, chunk.Size()
|
||||
lbSize, chunkSize := 0, chunk.SizeVT()
|
||||
for _, lb := range lbs1 {
|
||||
lbSize += lb.Size()
|
||||
lbSize += lb.SizeVT()
|
||||
}
|
||||
maxBytesInFrame := lbSize + chunkSize*2
|
||||
testData := []*prompb.ChunkedSeries{{
|
||||
Labels: lbs1,
|
||||
Chunks: []prompb.Chunk{chunk, chunk, chunk, chunk},
|
||||
Chunks: []*prompb.Chunk{chunk, chunk, chunk, chunk},
|
||||
}, {
|
||||
Labels: lbs2,
|
||||
Chunks: []prompb.Chunk{chunk, chunk, chunk, chunk},
|
||||
Chunks: []*prompb.Chunk{chunk, chunk, chunk, chunk},
|
||||
}}
|
||||
css := newMockChunkSeriesSet(testData)
|
||||
writer := mockWriter{}
|
||||
|
@ -758,16 +758,16 @@ func TestStreamResponse(t *testing.T) {
|
|||
require.Nil(t, err)
|
||||
expectData := []*prompb.ChunkedSeries{{
|
||||
Labels: lbs1,
|
||||
Chunks: []prompb.Chunk{chunk, chunk},
|
||||
Chunks: []*prompb.Chunk{chunk, chunk},
|
||||
}, {
|
||||
Labels: lbs1,
|
||||
Chunks: []prompb.Chunk{chunk, chunk},
|
||||
Chunks: []*prompb.Chunk{chunk, chunk},
|
||||
}, {
|
||||
Labels: lbs2,
|
||||
Chunks: []prompb.Chunk{chunk, chunk},
|
||||
Chunks: []*prompb.Chunk{chunk, chunk},
|
||||
}, {
|
||||
Labels: lbs2,
|
||||
Chunks: []prompb.Chunk{chunk, chunk},
|
||||
Chunks: []*prompb.Chunk{chunk, chunk},
|
||||
}}
|
||||
require.Equal(t, expectData, writer.actual)
|
||||
}
|
||||
|
@ -818,7 +818,7 @@ func (c *mockChunkSeriesSet) Err() error {
|
|||
}
|
||||
|
||||
type mockChunkIterator struct {
|
||||
chunks []prompb.Chunk
|
||||
chunks []*prompb.Chunk
|
||||
index int
|
||||
}
|
||||
|
||||
|
|
|
@ -61,7 +61,7 @@ func (m byBucketBoundsData) Less(i, j int) bool { return m[i].bound < m[j].bound
|
|||
func (m byBucketBoundsData) Swap(i, j int) { m[i], m[j] = m[j], m[i] }
|
||||
|
||||
// ByLabelName enables the usage of sort.Sort() with a slice of labels
|
||||
type ByLabelName []prompb.Label
|
||||
type ByLabelName []*prompb.Label
|
||||
|
||||
func (a ByLabelName) Len() int { return len(a) }
|
||||
func (a ByLabelName) Less(i, j int) bool { return a[i].Name < a[j].Name }
|
||||
|
@ -70,22 +70,22 @@ func (a ByLabelName) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
|
|||
// addSample finds a TimeSeries in tsMap that corresponds to the label set labels, and add sample to the TimeSeries; it
|
||||
// creates a new TimeSeries in the map if not found and returns the time series signature.
|
||||
// tsMap will be unmodified if either labels or sample is nil, but can still be modified if the exemplar is nil.
|
||||
func addSample(tsMap map[string]*prompb.TimeSeries, sample *prompb.Sample, labels []prompb.Label,
|
||||
func addSample(tsMap map[string]*prompb.TimeSeries, sample *prompb.Sample, labels []*prompb.Label,
|
||||
datatype string) string {
|
||||
|
||||
if sample == nil || labels == nil || tsMap == nil {
|
||||
return ""
|
||||
}
|
||||
|
||||
sig := timeSeriesSignature(datatype, &labels)
|
||||
sig := timeSeriesSignature(datatype, labels)
|
||||
ts, ok := tsMap[sig]
|
||||
|
||||
if ok {
|
||||
ts.Samples = append(ts.Samples, *sample)
|
||||
ts.Samples = append(ts.Samples, sample)
|
||||
} else {
|
||||
newTs := &prompb.TimeSeries{
|
||||
Labels: labels,
|
||||
Samples: []prompb.Sample{*sample},
|
||||
Samples: []*prompb.Sample{sample},
|
||||
}
|
||||
tsMap[sig] = newTs
|
||||
}
|
||||
|
@ -96,7 +96,7 @@ func addSample(tsMap map[string]*prompb.TimeSeries, sample *prompb.Sample, label
|
|||
// addExemplars finds a bucket bound that corresponds to the exemplars value and add the exemplar to the specific sig;
|
||||
// we only add exemplars if samples are presents
|
||||
// tsMap is unmodified if either of its parameters is nil and samples are nil.
|
||||
func addExemplars(tsMap map[string]*prompb.TimeSeries, exemplars []prompb.Exemplar, bucketBoundsData []bucketBoundsData) {
|
||||
func addExemplars(tsMap map[string]*prompb.TimeSeries, exemplars []*prompb.Exemplar, bucketBoundsData []bucketBoundsData) {
|
||||
if tsMap == nil || bucketBoundsData == nil || exemplars == nil {
|
||||
return
|
||||
}
|
||||
|
@ -108,7 +108,7 @@ func addExemplars(tsMap map[string]*prompb.TimeSeries, exemplars []prompb.Exempl
|
|||
}
|
||||
}
|
||||
|
||||
func addExemplar(tsMap map[string]*prompb.TimeSeries, bucketBounds []bucketBoundsData, exemplar prompb.Exemplar) {
|
||||
func addExemplar(tsMap map[string]*prompb.TimeSeries, bucketBounds []bucketBoundsData, exemplar *prompb.Exemplar) {
|
||||
for _, bucketBound := range bucketBounds {
|
||||
sig := bucketBound.sig
|
||||
bound := bucketBound.bound
|
||||
|
@ -131,10 +131,10 @@ func addExemplar(tsMap map[string]*prompb.TimeSeries, bucketBounds []bucketBound
|
|||
//
|
||||
// the label slice should not contain duplicate label names; this method sorts the slice by label name before creating
|
||||
// the signature.
|
||||
func timeSeriesSignature(datatype string, labels *[]prompb.Label) string {
|
||||
func timeSeriesSignature(datatype string, labels []*prompb.Label) string {
|
||||
length := len(datatype)
|
||||
|
||||
for _, lb := range *labels {
|
||||
for _, lb := range labels {
|
||||
length += 2 + len(lb.GetName()) + len(lb.GetValue())
|
||||
}
|
||||
|
||||
|
@ -142,9 +142,9 @@ func timeSeriesSignature(datatype string, labels *[]prompb.Label) string {
|
|||
b.Grow(length)
|
||||
b.WriteString(datatype)
|
||||
|
||||
sort.Sort(ByLabelName(*labels))
|
||||
sort.Sort(ByLabelName(labels))
|
||||
|
||||
for _, lb := range *labels {
|
||||
for _, lb := range labels {
|
||||
b.WriteString("-")
|
||||
b.WriteString(lb.GetName())
|
||||
b.WriteString("-")
|
||||
|
@ -157,7 +157,7 @@ func timeSeriesSignature(datatype string, labels *[]prompb.Label) string {
|
|||
// createAttributes creates a slice of Cortex Label with OTLP attributes and pairs of string values.
|
||||
// Unpaired string value is ignored. String pairs overwrites OTLP labels if collision happens, and the overwrite is
|
||||
// logged. Resultant label names are sanitized.
|
||||
func createAttributes(resource pcommon.Resource, attributes pcommon.Map, externalLabels map[string]string, extras ...string) []prompb.Label {
|
||||
func createAttributes(resource pcommon.Resource, attributes pcommon.Map, externalLabels map[string]string, extras ...string) []*prompb.Label {
|
||||
serviceName, haveServiceName := resource.Attributes().Get(conventions.AttributeServiceName)
|
||||
instance, haveInstanceID := resource.Attributes().Get(conventions.AttributeServiceInstanceID)
|
||||
|
||||
|
@ -177,9 +177,9 @@ func createAttributes(resource pcommon.Resource, attributes pcommon.Map, externa
|
|||
|
||||
// Ensure attributes are sorted by key for consistent merging of keys which
|
||||
// collide when sanitized.
|
||||
labels := make([]prompb.Label, 0, attributes.Len())
|
||||
labels := make([]*prompb.Label, 0, attributes.Len())
|
||||
attributes.Range(func(key string, value pcommon.Value) bool {
|
||||
labels = append(labels, prompb.Label{Name: key, Value: value.AsString()})
|
||||
labels = append(labels, &prompb.Label{Name: key, Value: value.AsString()})
|
||||
return true
|
||||
})
|
||||
sort.Stable(ByLabelName(labels))
|
||||
|
@ -230,9 +230,9 @@ func createAttributes(resource pcommon.Resource, attributes pcommon.Map, externa
|
|||
l[name] = extras[i+1]
|
||||
}
|
||||
|
||||
s := make([]prompb.Label, 0, len(l))
|
||||
s := make([]*prompb.Label, 0, len(l))
|
||||
for k, v := range l {
|
||||
s = append(s, prompb.Label{Name: k, Value: v})
|
||||
s = append(s, &prompb.Label{Name: k, Value: v})
|
||||
}
|
||||
|
||||
return s
|
||||
|
@ -263,16 +263,16 @@ func addSingleHistogramDataPoint(pt pmetric.HistogramDataPoint, resource pcommon
|
|||
baseName := prometheustranslator.BuildCompliantName(metric, settings.Namespace, settings.AddMetricSuffixes)
|
||||
baseLabels := createAttributes(resource, pt.Attributes(), settings.ExternalLabels)
|
||||
|
||||
createLabels := func(nameSuffix string, extras ...string) []prompb.Label {
|
||||
createLabels := func(nameSuffix string, extras ...string) []*prompb.Label {
|
||||
extraLabelCount := len(extras) / 2
|
||||
labels := make([]prompb.Label, len(baseLabels), len(baseLabels)+extraLabelCount+1) // +1 for name
|
||||
labels := make([]*prompb.Label, len(baseLabels), len(baseLabels)+extraLabelCount+1) // +1 for name
|
||||
copy(labels, baseLabels)
|
||||
|
||||
for extrasIdx := 0; extrasIdx < extraLabelCount; extrasIdx++ {
|
||||
labels = append(labels, prompb.Label{Name: extras[extrasIdx], Value: extras[extrasIdx+1]})
|
||||
labels = append(labels, &prompb.Label{Name: extras[extrasIdx], Value: extras[extrasIdx+1]})
|
||||
}
|
||||
|
||||
labels = append(labels, prompb.Label{Name: nameStr, Value: baseName + nameSuffix})
|
||||
labels = append(labels, &prompb.Label{Name: nameStr, Value: baseName + nameSuffix})
|
||||
|
||||
return labels
|
||||
}
|
||||
|
@ -358,8 +358,8 @@ type exemplarType interface {
|
|||
Exemplars() pmetric.ExemplarSlice
|
||||
}
|
||||
|
||||
func getPromExemplars[T exemplarType](pt T) []prompb.Exemplar {
|
||||
var promExemplars []prompb.Exemplar
|
||||
func getPromExemplars[T exemplarType](pt T) []*prompb.Exemplar {
|
||||
var promExemplars []*prompb.Exemplar
|
||||
|
||||
for i := 0; i < pt.Exemplars().Len(); i++ {
|
||||
exemplar := pt.Exemplars().At(i)
|
||||
|
@ -372,7 +372,7 @@ func getPromExemplars[T exemplarType](pt T) []prompb.Exemplar {
|
|||
if traceID := exemplar.TraceID(); !traceID.IsEmpty() {
|
||||
val := hex.EncodeToString(traceID[:])
|
||||
exemplarRunes += utf8.RuneCountInString(traceIDKey) + utf8.RuneCountInString(val)
|
||||
promLabel := prompb.Label{
|
||||
promLabel := &prompb.Label{
|
||||
Name: traceIDKey,
|
||||
Value: val,
|
||||
}
|
||||
|
@ -381,18 +381,18 @@ func getPromExemplars[T exemplarType](pt T) []prompb.Exemplar {
|
|||
if spanID := exemplar.SpanID(); !spanID.IsEmpty() {
|
||||
val := hex.EncodeToString(spanID[:])
|
||||
exemplarRunes += utf8.RuneCountInString(spanIDKey) + utf8.RuneCountInString(val)
|
||||
promLabel := prompb.Label{
|
||||
promLabel := &prompb.Label{
|
||||
Name: spanIDKey,
|
||||
Value: val,
|
||||
}
|
||||
promExemplar.Labels = append(promExemplar.Labels, promLabel)
|
||||
}
|
||||
var labelsFromAttributes []prompb.Label
|
||||
var labelsFromAttributes []*prompb.Label
|
||||
|
||||
exemplar.FilteredAttributes().Range(func(key string, value pcommon.Value) bool {
|
||||
val := value.AsString()
|
||||
exemplarRunes += utf8.RuneCountInString(key) + utf8.RuneCountInString(val)
|
||||
promLabel := prompb.Label{
|
||||
promLabel := &prompb.Label{
|
||||
Name: key,
|
||||
Value: val,
|
||||
}
|
||||
|
@ -407,7 +407,7 @@ func getPromExemplars[T exemplarType](pt T) []prompb.Exemplar {
|
|||
promExemplar.Labels = append(promExemplar.Labels, labelsFromAttributes...)
|
||||
}
|
||||
|
||||
promExemplars = append(promExemplars, *promExemplar)
|
||||
promExemplars = append(promExemplars, promExemplar)
|
||||
}
|
||||
|
||||
return promExemplars
|
||||
|
@ -463,16 +463,16 @@ func addSingleSummaryDataPoint(pt pmetric.SummaryDataPoint, resource pcommon.Res
|
|||
baseName := prometheustranslator.BuildCompliantName(metric, settings.Namespace, settings.AddMetricSuffixes)
|
||||
baseLabels := createAttributes(resource, pt.Attributes(), settings.ExternalLabels)
|
||||
|
||||
createLabels := func(name string, extras ...string) []prompb.Label {
|
||||
createLabels := func(name string, extras ...string) []*prompb.Label {
|
||||
extraLabelCount := len(extras) / 2
|
||||
labels := make([]prompb.Label, len(baseLabels), len(baseLabels)+extraLabelCount+1) // +1 for name
|
||||
labels := make([]*prompb.Label, len(baseLabels), len(baseLabels)+extraLabelCount+1) // +1 for name
|
||||
copy(labels, baseLabels)
|
||||
|
||||
for extrasIdx := 0; extrasIdx < extraLabelCount; extrasIdx++ {
|
||||
labels = append(labels, prompb.Label{Name: extras[extrasIdx], Value: extras[extrasIdx+1]})
|
||||
labels = append(labels, &prompb.Label{Name: extras[extrasIdx], Value: extras[extrasIdx+1]})
|
||||
}
|
||||
|
||||
labels = append(labels, prompb.Label{Name: nameStr, Value: name})
|
||||
labels = append(labels, &prompb.Label{Name: nameStr, Value: name})
|
||||
|
||||
return labels
|
||||
}
|
||||
|
@ -526,15 +526,15 @@ func addSingleSummaryDataPoint(pt pmetric.SummaryDataPoint, resource pcommon.Res
|
|||
// sample. If the series exists, then new samples won't be added.
|
||||
func addCreatedTimeSeriesIfNeeded(
|
||||
series map[string]*prompb.TimeSeries,
|
||||
labels []prompb.Label,
|
||||
labels []*prompb.Label,
|
||||
startTimestamp pcommon.Timestamp,
|
||||
metricType string,
|
||||
) {
|
||||
sig := timeSeriesSignature(metricType, &labels)
|
||||
sig := timeSeriesSignature(metricType, labels)
|
||||
if _, ok := series[sig]; !ok {
|
||||
series[sig] = &prompb.TimeSeries{
|
||||
Labels: labels,
|
||||
Samples: []prompb.Sample{
|
||||
Samples: []*prompb.Sample{
|
||||
{ // convert ns to ms
|
||||
Value: float64(convertTimeStamp(startTimestamp)),
|
||||
},
|
||||
|
|
|
@ -34,7 +34,7 @@ func addSingleExponentialHistogramDataPoint(
|
|||
|
||||
sig := timeSeriesSignature(
|
||||
pmetric.MetricTypeExponentialHistogram.String(),
|
||||
&labels,
|
||||
labels,
|
||||
)
|
||||
ts, ok := series[sig]
|
||||
if !ok {
|
||||
|
@ -58,10 +58,10 @@ func addSingleExponentialHistogramDataPoint(
|
|||
|
||||
// exponentialToNativeHistogram translates OTel Exponential Histogram data point
|
||||
// to Prometheus Native Histogram.
|
||||
func exponentialToNativeHistogram(p pmetric.ExponentialHistogramDataPoint) (prompb.Histogram, error) {
|
||||
func exponentialToNativeHistogram(p pmetric.ExponentialHistogramDataPoint) (*prompb.Histogram, error) {
|
||||
scale := p.Scale()
|
||||
if scale < -4 {
|
||||
return prompb.Histogram{},
|
||||
return &prompb.Histogram{},
|
||||
fmt.Errorf("cannot convert exponential to native histogram."+
|
||||
" Scale must be >= -4, was %d", scale)
|
||||
}
|
||||
|
@ -75,7 +75,7 @@ func exponentialToNativeHistogram(p pmetric.ExponentialHistogramDataPoint) (prom
|
|||
pSpans, pDeltas := convertBucketsLayout(p.Positive(), scaleDown)
|
||||
nSpans, nDeltas := convertBucketsLayout(p.Negative(), scaleDown)
|
||||
|
||||
h := prompb.Histogram{
|
||||
h := &prompb.Histogram{
|
||||
Schema: scale,
|
||||
|
||||
ZeroCount: &prompb.Histogram_ZeroCountInt{ZeroCountInt: p.ZeroCount()},
|
||||
|
@ -113,14 +113,14 @@ func exponentialToNativeHistogram(p pmetric.ExponentialHistogramDataPoint) (prom
|
|||
// to the range (base 1].
|
||||
//
|
||||
// scaleDown is the factor by which the buckets are scaled down. In other words 2^scaleDown buckets will be merged into one.
|
||||
func convertBucketsLayout(buckets pmetric.ExponentialHistogramDataPointBuckets, scaleDown int32) ([]prompb.BucketSpan, []int64) {
|
||||
func convertBucketsLayout(buckets pmetric.ExponentialHistogramDataPointBuckets, scaleDown int32) ([]*prompb.BucketSpan, []int64) {
|
||||
bucketCounts := buckets.BucketCounts()
|
||||
if bucketCounts.Len() == 0 {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
var (
|
||||
spans []prompb.BucketSpan
|
||||
spans []*prompb.BucketSpan
|
||||
deltas []int64
|
||||
count int64
|
||||
prevCount int64
|
||||
|
@ -138,7 +138,7 @@ func convertBucketsLayout(buckets pmetric.ExponentialHistogramDataPointBuckets,
|
|||
|
||||
// The offset is scaled and adjusted by 1 as described above.
|
||||
bucketIdx := buckets.Offset()>>scaleDown + 1
|
||||
spans = append(spans, prompb.BucketSpan{
|
||||
spans = append(spans, &prompb.BucketSpan{
|
||||
Offset: bucketIdx,
|
||||
Length: 0,
|
||||
})
|
||||
|
@ -160,7 +160,7 @@ func convertBucketsLayout(buckets pmetric.ExponentialHistogramDataPointBuckets,
|
|||
// We have to create a new span, because we have found a gap
|
||||
// of more than two buckets. The constant 2 is copied from the logic in
|
||||
// https://github.com/prometheus/client_golang/blob/27f0506d6ebbb117b6b697d0552ee5be2502c5f2/prometheus/histogram.go#L1296
|
||||
spans = append(spans, prompb.BucketSpan{
|
||||
spans = append(spans, &prompb.BucketSpan{
|
||||
Offset: gap,
|
||||
Length: 0,
|
||||
})
|
||||
|
@ -181,7 +181,7 @@ func convertBucketsLayout(buckets pmetric.ExponentialHistogramDataPointBuckets,
|
|||
// We have to create a new span, because we have found a gap
|
||||
// of more than two buckets. The constant 2 is copied from the logic in
|
||||
// https://github.com/prometheus/client_golang/blob/27f0506d6ebbb117b6b697d0552ee5be2502c5f2/prometheus/histogram.go#L1296
|
||||
spans = append(spans, prompb.BucketSpan{
|
||||
spans = append(spans, &prompb.BucketSpan{
|
||||
Offset: gap,
|
||||
Length: 0,
|
||||
})
|
||||
|
|
|
@ -502,9 +502,9 @@ func NewQueueManager(
|
|||
|
||||
// AppendMetadata sends metadata to the remote storage. Metadata is sent in batches, but is not parallelized.
|
||||
func (t *QueueManager) AppendMetadata(ctx context.Context, metadata []scrape.MetricMetadata) {
|
||||
mm := make([]prompb.MetricMetadata, 0, len(metadata))
|
||||
mm := make([]*prompb.MetricMetadata, 0, len(metadata))
|
||||
for _, entry := range metadata {
|
||||
mm = append(mm, prompb.MetricMetadata{
|
||||
mm = append(mm, &prompb.MetricMetadata{
|
||||
MetricFamilyName: entry.Metric,
|
||||
Help: entry.Help,
|
||||
Type: metricTypeToMetricTypeProto(entry.Type),
|
||||
|
@ -527,7 +527,7 @@ func (t *QueueManager) AppendMetadata(ctx context.Context, metadata []scrape.Met
|
|||
}
|
||||
}
|
||||
|
||||
func (t *QueueManager) sendMetadataWithBackoff(ctx context.Context, metadata []prompb.MetricMetadata, pBuf *proto.Buffer) error {
|
||||
func (t *QueueManager) sendMetadataWithBackoff(ctx context.Context, metadata []*prompb.MetricMetadata, pBuf *proto.Buffer) error {
|
||||
// Build the WriteRequest with no samples.
|
||||
req, _, err := buildWriteRequest(nil, metadata, pBuf, nil)
|
||||
if err != nil {
|
||||
|
@ -1357,11 +1357,11 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue *queue) {
|
|||
}
|
||||
|
||||
batchQueue := queue.Chan()
|
||||
pendingData := make([]prompb.TimeSeries, max)
|
||||
pendingData := make([]*prompb.TimeSeries, max)
|
||||
for i := range pendingData {
|
||||
pendingData[i].Samples = []prompb.Sample{{}}
|
||||
pendingData[i].Samples = []*prompb.Sample{{}}
|
||||
if s.qm.sendExemplars {
|
||||
pendingData[i].Exemplars = []prompb.Exemplar{{}}
|
||||
pendingData[i].Exemplars = []*prompb.Exemplar{{}}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1422,14 +1422,15 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue *queue) {
|
|||
}
|
||||
}
|
||||
|
||||
func (s *shards) populateTimeSeries(batch []timeSeries, pendingData []prompb.TimeSeries) (int, int, int) {
|
||||
func populateTimeSeries(batch []timeSeries, pendingData []*prompb.TimeSeries, sendExemplars, sendNativeHistograms bool) (int, int, int) {
|
||||
var nPendingSamples, nPendingExemplars, nPendingHistograms int
|
||||
for nPending, d := range batch {
|
||||
//fmt.Println("pending:", pendingData[nPending])
|
||||
pendingData[nPending].Samples = pendingData[nPending].Samples[:0]
|
||||
if s.qm.sendExemplars {
|
||||
if sendExemplars {
|
||||
pendingData[nPending].Exemplars = pendingData[nPending].Exemplars[:0]
|
||||
}
|
||||
if s.qm.sendNativeHistograms {
|
||||
if sendNativeHistograms {
|
||||
pendingData[nPending].Histograms = pendingData[nPending].Histograms[:0]
|
||||
}
|
||||
|
||||
|
@ -1439,13 +1440,13 @@ func (s *shards) populateTimeSeries(batch []timeSeries, pendingData []prompb.Tim
|
|||
pendingData[nPending].Labels = labelsToLabelsProto(d.seriesLabels, pendingData[nPending].Labels)
|
||||
switch d.sType {
|
||||
case tSample:
|
||||
pendingData[nPending].Samples = append(pendingData[nPending].Samples, prompb.Sample{
|
||||
pendingData[nPending].Samples = append(pendingData[nPending].Samples, &prompb.Sample{
|
||||
Value: d.value,
|
||||
Timestamp: d.timestamp,
|
||||
})
|
||||
nPendingSamples++
|
||||
case tExemplar:
|
||||
pendingData[nPending].Exemplars = append(pendingData[nPending].Exemplars, prompb.Exemplar{
|
||||
pendingData[nPending].Exemplars = append(pendingData[nPending].Exemplars, &prompb.Exemplar{
|
||||
Labels: labelsToLabelsProto(d.exemplarLabels, nil),
|
||||
Value: d.value,
|
||||
Timestamp: d.timestamp,
|
||||
|
@ -1462,7 +1463,47 @@ func (s *shards) populateTimeSeries(batch []timeSeries, pendingData []prompb.Tim
|
|||
return nPendingSamples, nPendingExemplars, nPendingHistograms
|
||||
}
|
||||
|
||||
func (s *shards) sendSamples(ctx context.Context, samples []prompb.TimeSeries, sampleCount, exemplarCount, histogramCount int, pBuf *proto.Buffer, buf *[]byte) {
|
||||
func (s *shards) populateTimeSeries(batch []timeSeries, pendingData []*prompb.TimeSeries) (int, int, int) {
|
||||
var nPendingSamples, nPendingExemplars, nPendingHistograms int
|
||||
for nPending, d := range batch {
|
||||
pendingData[nPending].Samples = pendingData[nPending].Samples[:0]
|
||||
if s.qm.sendExemplars {
|
||||
pendingData[nPending].Exemplars = pendingData[nPending].Exemplars[:0]
|
||||
}
|
||||
if s.qm.sendNativeHistograms {
|
||||
pendingData[nPending].Histograms = pendingData[nPending].Histograms[:0]
|
||||
}
|
||||
|
||||
// Number of pending samples is limited by the fact that sendSamples (via sendSamplesWithBackoff)
|
||||
// retries endlessly, so once we reach max samples, if we can never send to the endpoint we'll
|
||||
// stop reading from the queue. This makes it safe to reference pendingSamples by index.
|
||||
pendingData[nPending].Labels = labelsToLabelsProto(d.seriesLabels, pendingData[nPending].Labels)
|
||||
switch d.sType {
|
||||
case tSample:
|
||||
pendingData[nPending].Samples = append(pendingData[nPending].Samples, &prompb.Sample{
|
||||
Value: d.value,
|
||||
Timestamp: d.timestamp,
|
||||
})
|
||||
nPendingSamples++
|
||||
case tExemplar:
|
||||
pendingData[nPending].Exemplars = append(pendingData[nPending].Exemplars, &prompb.Exemplar{
|
||||
Labels: labelsToLabelsProto(d.exemplarLabels, nil),
|
||||
Value: d.value,
|
||||
Timestamp: d.timestamp,
|
||||
})
|
||||
nPendingExemplars++
|
||||
case tHistogram:
|
||||
pendingData[nPending].Histograms = append(pendingData[nPending].Histograms, HistogramToHistogramProto(d.timestamp, d.histogram))
|
||||
nPendingHistograms++
|
||||
case tFloatHistogram:
|
||||
pendingData[nPending].Histograms = append(pendingData[nPending].Histograms, FloatHistogramToHistogramProto(d.timestamp, d.floatHistogram))
|
||||
nPendingHistograms++
|
||||
}
|
||||
}
|
||||
return nPendingSamples, nPendingExemplars, nPendingHistograms
|
||||
}
|
||||
|
||||
func (s *shards) sendSamples(ctx context.Context, samples []*prompb.TimeSeries, sampleCount, exemplarCount, histogramCount int, pBuf *proto.Buffer, buf *[]byte) {
|
||||
begin := time.Now()
|
||||
err := s.sendSamplesWithBackoff(ctx, samples, sampleCount, exemplarCount, histogramCount, pBuf, buf)
|
||||
if err != nil {
|
||||
|
@ -1488,7 +1529,7 @@ func (s *shards) sendSamples(ctx context.Context, samples []prompb.TimeSeries, s
|
|||
}
|
||||
|
||||
// sendSamples to the remote storage with backoff for recoverable errors.
|
||||
func (s *shards) sendSamplesWithBackoff(ctx context.Context, samples []prompb.TimeSeries, sampleCount, exemplarCount, histogramCount int, pBuf *proto.Buffer, buf *[]byte) error {
|
||||
func (s *shards) sendSamplesWithBackoff(ctx context.Context, samples []*prompb.TimeSeries, sampleCount, exemplarCount, histogramCount int, pBuf *proto.Buffer, buf *[]byte) error {
|
||||
// Build the WriteRequest with no metadata.
|
||||
req, highest, err := buildWriteRequest(samples, nil, pBuf, *buf)
|
||||
if err != nil {
|
||||
|
@ -1608,7 +1649,7 @@ func sendWriteRequestWithBackoff(ctx context.Context, cfg config.QueueConfig, l
|
|||
}
|
||||
}
|
||||
|
||||
func buildWriteRequest(samples []prompb.TimeSeries, metadata []prompb.MetricMetadata, pBuf *proto.Buffer, buf []byte) ([]byte, int64, error) {
|
||||
func buildWriteRequest(samples []*prompb.TimeSeries, metadata []*prompb.MetricMetadata, pBuf *proto.Buffer, buf []byte) ([]byte, int64, error) {
|
||||
var highest int64
|
||||
for _, ts := range samples {
|
||||
// At the moment we only ever append a TimeSeries with a single sample or exemplar in it.
|
||||
|
|
|
@ -17,6 +17,7 @@ import (
|
|||
"context"
|
||||
"fmt"
|
||||
"math"
|
||||
"math/rand"
|
||||
"os"
|
||||
"runtime/pprof"
|
||||
"sort"
|
||||
|
@ -609,9 +610,9 @@ func createHistograms(numSamples, numSeries int, floatHistogram bool) ([]record.
|
|||
ZeroCount: 0,
|
||||
Count: 2,
|
||||
Sum: 0,
|
||||
PositiveSpans: []histogram.Span{{Offset: 0, Length: 1}},
|
||||
PositiveSpans: []*histogram.Span{{Offset: 0, Length: 1}},
|
||||
PositiveBuckets: []int64{int64(i) + 1},
|
||||
NegativeSpans: []histogram.Span{{Offset: 0, Length: 1}},
|
||||
NegativeSpans: []*histogram.Span{{Offset: 0, Length: 1}},
|
||||
NegativeBuckets: []int64{int64(-i) - 1},
|
||||
}
|
||||
|
||||
|
@ -647,15 +648,15 @@ func getSeriesNameFromRef(r record.RefSeries) string {
|
|||
}
|
||||
|
||||
type TestWriteClient struct {
|
||||
receivedSamples map[string][]prompb.Sample
|
||||
expectedSamples map[string][]prompb.Sample
|
||||
receivedExemplars map[string][]prompb.Exemplar
|
||||
expectedExemplars map[string][]prompb.Exemplar
|
||||
receivedHistograms map[string][]prompb.Histogram
|
||||
receivedFloatHistograms map[string][]prompb.Histogram
|
||||
expectedHistograms map[string][]prompb.Histogram
|
||||
expectedFloatHistograms map[string][]prompb.Histogram
|
||||
receivedMetadata map[string][]prompb.MetricMetadata
|
||||
receivedSamples map[string][]*prompb.Sample
|
||||
expectedSamples map[string][]*prompb.Sample
|
||||
receivedExemplars map[string][]*prompb.Exemplar
|
||||
expectedExemplars map[string][]*prompb.Exemplar
|
||||
receivedHistograms map[string][]*prompb.Histogram
|
||||
receivedFloatHistograms map[string][]*prompb.Histogram
|
||||
expectedHistograms map[string][]*prompb.Histogram
|
||||
expectedFloatHistograms map[string][]*prompb.Histogram
|
||||
receivedMetadata map[string][]*prompb.MetricMetadata
|
||||
writesReceived int
|
||||
withWaitGroup bool
|
||||
wg sync.WaitGroup
|
||||
|
@ -666,9 +667,9 @@ type TestWriteClient struct {
|
|||
func NewTestWriteClient() *TestWriteClient {
|
||||
return &TestWriteClient{
|
||||
withWaitGroup: true,
|
||||
receivedSamples: map[string][]prompb.Sample{},
|
||||
expectedSamples: map[string][]prompb.Sample{},
|
||||
receivedMetadata: map[string][]prompb.MetricMetadata{},
|
||||
receivedSamples: map[string][]*prompb.Sample{},
|
||||
expectedSamples: map[string][]*prompb.Sample{},
|
||||
receivedMetadata: map[string][]*prompb.MetricMetadata{},
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -679,12 +680,12 @@ func (c *TestWriteClient) expectSamples(ss []record.RefSample, series []record.R
|
|||
c.mtx.Lock()
|
||||
defer c.mtx.Unlock()
|
||||
|
||||
c.expectedSamples = map[string][]prompb.Sample{}
|
||||
c.receivedSamples = map[string][]prompb.Sample{}
|
||||
c.expectedSamples = map[string][]*prompb.Sample{}
|
||||
c.receivedSamples = map[string][]*prompb.Sample{}
|
||||
|
||||
for _, s := range ss {
|
||||
seriesName := getSeriesNameFromRef(series[s.Ref])
|
||||
c.expectedSamples[seriesName] = append(c.expectedSamples[seriesName], prompb.Sample{
|
||||
c.expectedSamples[seriesName] = append(c.expectedSamples[seriesName], &prompb.Sample{
|
||||
Timestamp: s.T,
|
||||
Value: s.V,
|
||||
})
|
||||
|
@ -699,8 +700,8 @@ func (c *TestWriteClient) expectExemplars(ss []record.RefExemplar, series []reco
|
|||
c.mtx.Lock()
|
||||
defer c.mtx.Unlock()
|
||||
|
||||
c.expectedExemplars = map[string][]prompb.Exemplar{}
|
||||
c.receivedExemplars = map[string][]prompb.Exemplar{}
|
||||
c.expectedExemplars = map[string][]*prompb.Exemplar{}
|
||||
c.receivedExemplars = map[string][]*prompb.Exemplar{}
|
||||
|
||||
for _, s := range ss {
|
||||
seriesName := getSeriesNameFromRef(series[s.Ref])
|
||||
|
@ -709,7 +710,7 @@ func (c *TestWriteClient) expectExemplars(ss []record.RefExemplar, series []reco
|
|||
Timestamp: s.T,
|
||||
Value: s.V,
|
||||
}
|
||||
c.expectedExemplars[seriesName] = append(c.expectedExemplars[seriesName], e)
|
||||
c.expectedExemplars[seriesName] = append(c.expectedExemplars[seriesName], &e)
|
||||
}
|
||||
c.wg.Add(len(ss))
|
||||
}
|
||||
|
@ -721,8 +722,8 @@ func (c *TestWriteClient) expectHistograms(hh []record.RefHistogramSample, serie
|
|||
c.mtx.Lock()
|
||||
defer c.mtx.Unlock()
|
||||
|
||||
c.expectedHistograms = map[string][]prompb.Histogram{}
|
||||
c.receivedHistograms = map[string][]prompb.Histogram{}
|
||||
c.expectedHistograms = map[string][]*prompb.Histogram{}
|
||||
c.receivedHistograms = map[string][]*prompb.Histogram{}
|
||||
|
||||
for _, h := range hh {
|
||||
seriesName := getSeriesNameFromRef(series[h.Ref])
|
||||
|
@ -738,8 +739,8 @@ func (c *TestWriteClient) expectFloatHistograms(fhs []record.RefFloatHistogramSa
|
|||
c.mtx.Lock()
|
||||
defer c.mtx.Unlock()
|
||||
|
||||
c.expectedFloatHistograms = map[string][]prompb.Histogram{}
|
||||
c.receivedFloatHistograms = map[string][]prompb.Histogram{}
|
||||
c.expectedFloatHistograms = map[string][]*prompb.Histogram{}
|
||||
c.receivedFloatHistograms = map[string][]*prompb.Histogram{}
|
||||
|
||||
for _, fh := range fhs {
|
||||
seriesName := getSeriesNameFromRef(series[fh.Ref])
|
||||
|
@ -1322,3 +1323,115 @@ func TestQueue_FlushAndShutdownDoesNotDeadlock(t *testing.T) {
|
|||
t.FailNow()
|
||||
}
|
||||
}
|
||||
|
||||
func createDummyTimeSeries(instances int) []timeSeries {
|
||||
metrics := []labels.Labels{
|
||||
labels.FromStrings("__name__", "go_gc_duration_seconds", "quantile", "0"),
|
||||
labels.FromStrings("__name__", "go_gc_duration_seconds", "quantile", "0.25"),
|
||||
labels.FromStrings("__name__", "go_gc_duration_seconds", "quantile", "0.5"),
|
||||
labels.FromStrings("__name__", "go_gc_duration_seconds", "quantile", "0.75"),
|
||||
labels.FromStrings("__name__", "go_gc_duration_seconds", "quantile", "1"),
|
||||
labels.FromStrings("__name__", "go_gc_duration_seconds_sum"),
|
||||
labels.FromStrings("__name__", "go_gc_duration_seconds_count"),
|
||||
labels.FromStrings("__name__", "go_memstats_alloc_bytes_total"),
|
||||
labels.FromStrings("__name__", "go_memstats_frees_total"),
|
||||
labels.FromStrings("__name__", "go_memstats_lookups_total"),
|
||||
labels.FromStrings("__name__", "go_memstats_mallocs_total"),
|
||||
labels.FromStrings("__name__", "go_goroutines"),
|
||||
labels.FromStrings("__name__", "go_info", "version", "go1.19.3"),
|
||||
labels.FromStrings("__name__", "go_memstats_alloc_bytes"),
|
||||
labels.FromStrings("__name__", "go_memstats_buck_hash_sys_bytes"),
|
||||
labels.FromStrings("__name__", "go_memstats_gc_sys_bytes"),
|
||||
labels.FromStrings("__name__", "go_memstats_heap_alloc_bytes"),
|
||||
labels.FromStrings("__name__", "go_memstats_heap_idle_bytes"),
|
||||
labels.FromStrings("__name__", "go_memstats_heap_inuse_bytes"),
|
||||
labels.FromStrings("__name__", "go_memstats_heap_objects"),
|
||||
labels.FromStrings("__name__", "go_memstats_heap_released_bytes"),
|
||||
labels.FromStrings("__name__", "go_memstats_heap_sys_bytes"),
|
||||
labels.FromStrings("__name__", "go_memstats_last_gc_time_seconds"),
|
||||
labels.FromStrings("__name__", "go_memstats_mcache_inuse_bytes"),
|
||||
labels.FromStrings("__name__", "go_memstats_mcache_sys_bytes"),
|
||||
labels.FromStrings("__name__", "go_memstats_mspan_inuse_bytes"),
|
||||
labels.FromStrings("__name__", "go_memstats_mspan_sys_bytes"),
|
||||
labels.FromStrings("__name__", "go_memstats_next_gc_bytes"),
|
||||
labels.FromStrings("__name__", "go_memstats_other_sys_bytes"),
|
||||
labels.FromStrings("__name__", "go_memstats_stack_inuse_bytes"),
|
||||
labels.FromStrings("__name__", "go_memstats_stack_sys_bytes"),
|
||||
labels.FromStrings("__name__", "go_memstats_sys_bytes"),
|
||||
labels.FromStrings("__name__", "go_threads"),
|
||||
}
|
||||
|
||||
commonLabels := labels.FromStrings(
|
||||
"cluster", "some-cluster-0",
|
||||
"container", "prometheus",
|
||||
"job", "some-namespace/prometheus",
|
||||
"namespace", "some-namespace")
|
||||
|
||||
var result []timeSeries
|
||||
r := rand.New(rand.NewSource(0))
|
||||
for i := 0; i < instances; i++ {
|
||||
b := labels.NewBuilder(commonLabels)
|
||||
b.Set("pod", "prometheus-"+strconv.Itoa(i))
|
||||
for _, lbls := range metrics {
|
||||
for _, l := range lbls {
|
||||
b.Set(l.Name, l.Value)
|
||||
}
|
||||
result = append(result, timeSeries{
|
||||
seriesLabels: b.Labels(),
|
||||
value: r.Float64(),
|
||||
})
|
||||
}
|
||||
}
|
||||
return result
|
||||
}
|
||||
|
||||
func BenchmarkBuildWriteRequest(b *testing.B) {
|
||||
bench := func(b *testing.B, batch []timeSeries) {
|
||||
buff := make([]byte, 0)
|
||||
seriesBuff := make([]*prompb.TimeSeries, len(batch))
|
||||
for i := range seriesBuff {
|
||||
seriesBuff[i] = &prompb.TimeSeries{
|
||||
Samples: []*prompb.Sample{{}},
|
||||
Exemplars: []*prompb.Exemplar{{}},
|
||||
}
|
||||
//seriesBuff[i].Samples = []*prompb.Sample{{}}
|
||||
//seriesBuff[i].Exemplars = []*prompb.Exemplar{{}}
|
||||
}
|
||||
pBuf := proto.NewBuffer(nil)
|
||||
|
||||
//fmt.Printf("series buff: %+v\n", seriesBuff)
|
||||
//Warmup buffers
|
||||
for i := 0; i < 10; i++ {
|
||||
populateTimeSeries(batch, seriesBuff, true, true)
|
||||
buildWriteRequest(seriesBuff, nil, pBuf, buff)
|
||||
}
|
||||
|
||||
b.ResetTimer()
|
||||
totalSize := 0
|
||||
for i := 0; i < b.N; i++ {
|
||||
populateTimeSeries(batch, seriesBuff, true, true)
|
||||
req, _, err := buildWriteRequest(seriesBuff, nil, pBuf, buff)
|
||||
if err != nil {
|
||||
b.Fatal(err)
|
||||
}
|
||||
totalSize += len(req)
|
||||
b.ReportMetric(float64(totalSize)/float64(b.N), "compressedSize/op")
|
||||
}
|
||||
}
|
||||
|
||||
two_batch := createDummyTimeSeries(2)
|
||||
ten_batch := createDummyTimeSeries(10)
|
||||
hundred_batch := createDummyTimeSeries(100)
|
||||
|
||||
b.Run("2 instances", func(b *testing.B) {
|
||||
bench(b, two_batch)
|
||||
})
|
||||
|
||||
b.Run("10 instances", func(b *testing.B) {
|
||||
bench(b, ten_batch)
|
||||
})
|
||||
|
||||
b.Run("1k instances", func(b *testing.B) {
|
||||
bench(b, hundred_batch)
|
||||
})
|
||||
}
|
||||
|
|
|
@ -99,6 +99,10 @@ func (h *readHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
|||
return strings.Compare(a.Name, b.Name)
|
||||
})
|
||||
|
||||
sortedPB := make([]*prompb.Label, 0, len(externalLabels))
|
||||
for _, l := range sortedExternalLabels {
|
||||
sortedPB = append(sortedPB, &l)
|
||||
}
|
||||
responseType, err := NegotiateResponseType(req.AcceptedResponseTypes)
|
||||
if err != nil {
|
||||
http.Error(w, err.Error(), http.StatusBadRequest)
|
||||
|
@ -107,10 +111,10 @@ func (h *readHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
|||
|
||||
switch responseType {
|
||||
case prompb.ReadRequest_STREAMED_XOR_CHUNKS:
|
||||
h.remoteReadStreamedXORChunks(ctx, w, req, externalLabels, sortedExternalLabels)
|
||||
h.remoteReadStreamedXORChunks(ctx, w, req, externalLabels, sortedPB)
|
||||
default:
|
||||
// On empty or unknown types in req.AcceptedResponseTypes we default to non streamed, raw samples response.
|
||||
h.remoteReadSamples(ctx, w, req, externalLabels, sortedExternalLabels)
|
||||
h.remoteReadSamples(ctx, w, req, externalLabels, sortedPB)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -119,7 +123,7 @@ func (h *readHandler) remoteReadSamples(
|
|||
w http.ResponseWriter,
|
||||
req *prompb.ReadRequest,
|
||||
externalLabels map[string]string,
|
||||
sortedExternalLabels []prompb.Label,
|
||||
sortedExternalLabels []*prompb.Label,
|
||||
) {
|
||||
w.Header().Set("Content-Type", "application/x-protobuf")
|
||||
w.Header().Set("Content-Encoding", "snappy")
|
||||
|
@ -186,7 +190,7 @@ func (h *readHandler) remoteReadSamples(
|
|||
}
|
||||
}
|
||||
|
||||
func (h *readHandler) remoteReadStreamedXORChunks(ctx context.Context, w http.ResponseWriter, req *prompb.ReadRequest, externalLabels map[string]string, sortedExternalLabels []prompb.Label) {
|
||||
func (h *readHandler) remoteReadStreamedXORChunks(ctx context.Context, w http.ResponseWriter, req *prompb.ReadRequest, externalLabels map[string]string, sortedExternalLabels []*prompb.Label) {
|
||||
w.Header().Set("Content-Type", "application/x-streamed-protobuf; proto=prometheus.ChunkedReadResponse")
|
||||
|
||||
f, ok := w.(http.Flusher)
|
||||
|
|
|
@ -102,14 +102,14 @@ func TestSampledReadEndpoint(t *testing.T) {
|
|||
require.Equal(t, &prompb.QueryResult{
|
||||
Timeseries: []*prompb.TimeSeries{
|
||||
{
|
||||
Labels: []prompb.Label{
|
||||
Labels: []*prompb.Label{
|
||||
{Name: "__name__", Value: "test_metric1"},
|
||||
{Name: "b", Value: "c"},
|
||||
{Name: "baz", Value: "qux"},
|
||||
{Name: "d", Value: "e"},
|
||||
{Name: "foo", Value: "bar"},
|
||||
},
|
||||
Samples: []prompb.Sample{{Value: 1, Timestamp: 0}},
|
||||
Samples: []*prompb.Sample{{Value: 1, Timestamp: 0}},
|
||||
},
|
||||
},
|
||||
}, resp.Results[0])
|
||||
|
@ -117,13 +117,13 @@ func TestSampledReadEndpoint(t *testing.T) {
|
|||
require.Equal(t, &prompb.QueryResult{
|
||||
Timeseries: []*prompb.TimeSeries{
|
||||
{
|
||||
Labels: []prompb.Label{
|
||||
Labels: []*prompb.Label{
|
||||
{Name: "__name__", Value: "test_histogram_metric1"},
|
||||
{Name: "b", Value: "c"},
|
||||
{Name: "baz", Value: "qux"},
|
||||
{Name: "d", Value: "e"},
|
||||
},
|
||||
Histograms: []prompb.Histogram{
|
||||
Histograms: []*prompb.Histogram{
|
||||
FloatHistogramToHistogramProto(0, tsdbutil.GenerateTestFloatHistogram(0)),
|
||||
},
|
||||
},
|
||||
|
@ -297,14 +297,14 @@ func TestStreamReadEndpoint(t *testing.T) {
|
|||
{
|
||||
ChunkedSeries: []*prompb.ChunkedSeries{
|
||||
{
|
||||
Labels: []prompb.Label{
|
||||
Labels: []*prompb.Label{
|
||||
{Name: "__name__", Value: "test_metric1"},
|
||||
{Name: "b", Value: "c"},
|
||||
{Name: "baz", Value: "qux"},
|
||||
{Name: "d", Value: "e"},
|
||||
{Name: "foo", Value: "bar1"},
|
||||
},
|
||||
Chunks: []prompb.Chunk{
|
||||
Chunks: []*prompb.Chunk{
|
||||
{
|
||||
Type: prompb.Chunk_XOR,
|
||||
MaxTimeMs: 7140000,
|
||||
|
@ -317,14 +317,14 @@ func TestStreamReadEndpoint(t *testing.T) {
|
|||
{
|
||||
ChunkedSeries: []*prompb.ChunkedSeries{
|
||||
{
|
||||
Labels: []prompb.Label{
|
||||
Labels: []*prompb.Label{
|
||||
{Name: "__name__", Value: "test_metric1"},
|
||||
{Name: "b", Value: "c"},
|
||||
{Name: "baz", Value: "qux"},
|
||||
{Name: "d", Value: "e"},
|
||||
{Name: "foo", Value: "bar2"},
|
||||
},
|
||||
Chunks: []prompb.Chunk{
|
||||
Chunks: []*prompb.Chunk{
|
||||
{
|
||||
Type: prompb.Chunk_XOR,
|
||||
MaxTimeMs: 7140000,
|
||||
|
@ -343,14 +343,14 @@ func TestStreamReadEndpoint(t *testing.T) {
|
|||
{
|
||||
ChunkedSeries: []*prompb.ChunkedSeries{
|
||||
{
|
||||
Labels: []prompb.Label{
|
||||
Labels: []*prompb.Label{
|
||||
{Name: "__name__", Value: "test_metric1"},
|
||||
{Name: "b", Value: "c"},
|
||||
{Name: "baz", Value: "qux"},
|
||||
{Name: "d", Value: "e"},
|
||||
{Name: "foo", Value: "bar3"},
|
||||
},
|
||||
Chunks: []prompb.Chunk{
|
||||
Chunks: []*prompb.Chunk{
|
||||
{
|
||||
Type: prompb.Chunk_XOR,
|
||||
MaxTimeMs: 7140000,
|
||||
|
@ -369,14 +369,14 @@ func TestStreamReadEndpoint(t *testing.T) {
|
|||
{
|
||||
ChunkedSeries: []*prompb.ChunkedSeries{
|
||||
{
|
||||
Labels: []prompb.Label{
|
||||
Labels: []*prompb.Label{
|
||||
{Name: "__name__", Value: "test_metric1"},
|
||||
{Name: "b", Value: "c"},
|
||||
{Name: "baz", Value: "qux"},
|
||||
{Name: "d", Value: "e"},
|
||||
{Name: "foo", Value: "bar3"},
|
||||
},
|
||||
Chunks: []prompb.Chunk{
|
||||
Chunks: []*prompb.Chunk{
|
||||
{
|
||||
Type: prompb.Chunk_XOR,
|
||||
MinTimeMs: 14400000,
|
||||
|
@ -390,14 +390,14 @@ func TestStreamReadEndpoint(t *testing.T) {
|
|||
{
|
||||
ChunkedSeries: []*prompb.ChunkedSeries{
|
||||
{
|
||||
Labels: []prompb.Label{
|
||||
Labels: []*prompb.Label{
|
||||
{Name: "__name__", Value: "test_metric1"},
|
||||
{Name: "b", Value: "c"},
|
||||
{Name: "baz", Value: "qux"},
|
||||
{Name: "d", Value: "e"},
|
||||
{Name: "foo", Value: "bar1"},
|
||||
},
|
||||
Chunks: []prompb.Chunk{
|
||||
Chunks: []*prompb.Chunk{
|
||||
{
|
||||
Type: prompb.Chunk_XOR,
|
||||
MaxTimeMs: 7140000,
|
||||
|
@ -411,13 +411,13 @@ func TestStreamReadEndpoint(t *testing.T) {
|
|||
{
|
||||
ChunkedSeries: []*prompb.ChunkedSeries{
|
||||
{
|
||||
Labels: []prompb.Label{
|
||||
Labels: []*prompb.Label{
|
||||
{Name: "__name__", Value: "test_histogram_metric1"},
|
||||
{Name: "b", Value: "c"},
|
||||
{Name: "baz", Value: "qux"},
|
||||
{Name: "d", Value: "e"},
|
||||
},
|
||||
Chunks: []prompb.Chunk{
|
||||
Chunks: []*prompb.Chunk{
|
||||
{
|
||||
Type: prompb.Chunk_FLOAT_HISTOGRAM,
|
||||
MaxTimeMs: 1440000,
|
||||
|
|
|
@ -237,9 +237,9 @@ func TestSampleAndChunkQueryableClient(t *testing.T) {
|
|||
m := &mockedRemoteClient{
|
||||
// Samples does not matter for below tests.
|
||||
store: []*prompb.TimeSeries{
|
||||
{Labels: []prompb.Label{{Name: "a", Value: "b"}}},
|
||||
{Labels: []prompb.Label{{Name: "a", Value: "b3"}, {Name: "region", Value: "us"}}},
|
||||
{Labels: []prompb.Label{{Name: "a", Value: "b2"}, {Name: "region", Value: "europe"}}},
|
||||
{Labels: []*prompb.Label{{Name: "a", Value: "b"}}},
|
||||
{Labels: []*prompb.Label{{Name: "a", Value: "b3"}, {Name: "region", Value: "us"}}},
|
||||
{Labels: []*prompb.Label{{Name: "a", Value: "b2"}, {Name: "region", Value: "europe"}}},
|
||||
},
|
||||
}
|
||||
|
||||
|
|
|
@ -214,10 +214,10 @@ func (h *otlpWriteHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
|||
level.Warn(h.logger).Log("msg", "Error translating OTLP metrics to Prometheus write request", "err", errs)
|
||||
}
|
||||
|
||||
prwMetrics := make([]prompb.TimeSeries, 0, len(prwMetricsMap))
|
||||
prwMetrics := make([]*prompb.TimeSeries, 0, len(prwMetricsMap))
|
||||
|
||||
for _, ts := range prwMetricsMap {
|
||||
prwMetrics = append(prwMetrics, *ts)
|
||||
prwMetrics = append(prwMetrics, ts)
|
||||
}
|
||||
|
||||
err = h.rwHandler.write(r.Context(), &prompb.WriteRequest{
|
||||
|
|
|
@ -84,9 +84,9 @@ func TestRemoteWriteHandler(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestOutOfOrderSample(t *testing.T) {
|
||||
buf, _, err := buildWriteRequest([]prompb.TimeSeries{{
|
||||
Labels: []prompb.Label{{Name: "__name__", Value: "test_metric"}},
|
||||
Samples: []prompb.Sample{{Value: 1, Timestamp: 0}},
|
||||
buf, _, err := buildWriteRequest([]*prompb.TimeSeries{{
|
||||
Labels: []*prompb.Label{{Name: "__name__", Value: "test_metric"}},
|
||||
Samples: []*prompb.Sample{{Value: 1, Timestamp: 0}},
|
||||
}}, nil, nil, nil)
|
||||
require.NoError(t, err)
|
||||
|
||||
|
@ -109,9 +109,9 @@ func TestOutOfOrderSample(t *testing.T) {
|
|||
// don't fail on ingestion errors since the exemplar storage is
|
||||
// still experimental.
|
||||
func TestOutOfOrderExemplar(t *testing.T) {
|
||||
buf, _, err := buildWriteRequest([]prompb.TimeSeries{{
|
||||
Labels: []prompb.Label{{Name: "__name__", Value: "test_metric"}},
|
||||
Exemplars: []prompb.Exemplar{{Labels: []prompb.Label{{Name: "foo", Value: "bar"}}, Value: 1, Timestamp: 0}},
|
||||
buf, _, err := buildWriteRequest([]*prompb.TimeSeries{{
|
||||
Labels: []*prompb.Label{{Name: "__name__", Value: "test_metric"}},
|
||||
Exemplars: []*prompb.Exemplar{{Labels: []*prompb.Label{{Name: "foo", Value: "bar"}}, Value: 1, Timestamp: 0}},
|
||||
}}, nil, nil, nil)
|
||||
require.NoError(t, err)
|
||||
|
||||
|
@ -132,9 +132,9 @@ func TestOutOfOrderExemplar(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestOutOfOrderHistogram(t *testing.T) {
|
||||
buf, _, err := buildWriteRequest([]prompb.TimeSeries{{
|
||||
Labels: []prompb.Label{{Name: "__name__", Value: "test_metric"}},
|
||||
Histograms: []prompb.Histogram{HistogramToHistogramProto(0, &testHistogram), FloatHistogramToHistogramProto(1, testHistogram.ToFloat())},
|
||||
buf, _, err := buildWriteRequest([]*prompb.TimeSeries{{
|
||||
Labels: []*prompb.Label{{Name: "__name__", Value: "test_metric"}},
|
||||
Histograms: []*prompb.Histogram{HistogramToHistogramProto(0, &testHistogram), FloatHistogramToHistogramProto(1, testHistogram.ToFloat())},
|
||||
}}, nil, nil, nil)
|
||||
require.NoError(t, err)
|
||||
|
||||
|
@ -158,12 +158,12 @@ func BenchmarkRemoteWritehandler(b *testing.B) {
|
|||
reqs := []*http.Request{}
|
||||
for i := 0; i < b.N; i++ {
|
||||
num := strings.Repeat(strconv.Itoa(i), 16)
|
||||
buf, _, err := buildWriteRequest([]prompb.TimeSeries{{
|
||||
Labels: []prompb.Label{
|
||||
buf, _, err := buildWriteRequest([]*prompb.TimeSeries{{
|
||||
Labels: []*prompb.Label{
|
||||
{Name: "__name__", Value: "test_metric"},
|
||||
{Name: "test_label_name_" + num, Value: labelValue + num},
|
||||
},
|
||||
Histograms: []prompb.Histogram{HistogramToHistogramProto(0, &testHistogram)},
|
||||
Histograms: []*prompb.Histogram{HistogramToHistogramProto(0, &testHistogram)},
|
||||
}}, nil, nil, nil)
|
||||
require.NoError(b, err)
|
||||
req, err := http.NewRequest("", "", bytes.NewReader(buf))
|
||||
|
@ -249,12 +249,12 @@ func BenchmarkRemoteWriteOOOSamples(b *testing.B) {
|
|||
}
|
||||
}
|
||||
|
||||
func genSeriesWithSample(numSeries int, ts int64) []prompb.TimeSeries {
|
||||
var series []prompb.TimeSeries
|
||||
func genSeriesWithSample(numSeries int, ts int64) []*prompb.TimeSeries {
|
||||
var series []*prompb.TimeSeries
|
||||
for i := 0; i < numSeries; i++ {
|
||||
s := prompb.TimeSeries{
|
||||
Labels: []prompb.Label{{Name: "__name__", Value: fmt.Sprintf("test_metric_%d", i)}},
|
||||
Samples: []prompb.Sample{{Value: float64(i), Timestamp: ts}},
|
||||
s := &prompb.TimeSeries{
|
||||
Labels: []*prompb.Label{{Name: "__name__", Value: fmt.Sprintf("test_metric_%d", i)}},
|
||||
Samples: []*prompb.Sample{{Value: float64(i), Timestamp: ts}},
|
||||
}
|
||||
series = append(series, s)
|
||||
}
|
||||
|
|
|
@ -71,7 +71,7 @@ func (c *FloatHistogramChunk) NumSamples() int {
|
|||
// least one sample.
|
||||
func (c *FloatHistogramChunk) Layout() (
|
||||
schema int32, zeroThreshold float64,
|
||||
negativeSpans, positiveSpans []histogram.Span,
|
||||
negativeSpans, positiveSpans []*histogram.Span,
|
||||
err error,
|
||||
) {
|
||||
if c.NumSamples() == 0 {
|
||||
|
@ -186,7 +186,7 @@ type FloatHistogramAppender struct {
|
|||
// Layout:
|
||||
schema int32
|
||||
zThreshold float64
|
||||
pSpans, nSpans []histogram.Span
|
||||
pSpans, nSpans []*histogram.Span
|
||||
|
||||
t, tDelta int64
|
||||
sum, cnt, zCnt xorValue
|
||||
|
@ -304,7 +304,7 @@ func (a *FloatHistogramAppender) appendable(h *histogram.FloatHistogram) (
|
|||
func (a *FloatHistogramAppender) appendableGauge(h *histogram.FloatHistogram) (
|
||||
positiveInserts, negativeInserts []Insert,
|
||||
backwardPositiveInserts, backwardNegativeInserts []Insert,
|
||||
positiveSpans, negativeSpans []histogram.Span,
|
||||
positiveSpans, negativeSpans []*histogram.Span,
|
||||
okToAppend bool,
|
||||
) {
|
||||
if a.NumSamples() > 0 && a.GetCounterResetHeader() != GaugeType {
|
||||
|
@ -334,7 +334,7 @@ func (a *FloatHistogramAppender) appendableGauge(h *histogram.FloatHistogram) (
|
|||
// counterResetInAnyFloatBucket returns true if there was a counter reset for any
|
||||
// bucket. This should be called only when the bucket layout is the same or new
|
||||
// buckets were added. It does not handle the case of buckets missing.
|
||||
func counterResetInAnyFloatBucket(oldBuckets []xorValue, newBuckets []float64, oldSpans, newSpans []histogram.Span) bool {
|
||||
func counterResetInAnyFloatBucket(oldBuckets []xorValue, newBuckets []float64, oldSpans, newSpans []*histogram.Span) bool {
|
||||
if len(oldSpans) == 0 || len(oldBuckets) == 0 {
|
||||
return false
|
||||
}
|
||||
|
@ -423,13 +423,13 @@ func (a *FloatHistogramAppender) appendFloatHistogram(t int64, h *histogram.Floa
|
|||
a.zThreshold = h.ZeroThreshold
|
||||
|
||||
if len(h.PositiveSpans) > 0 {
|
||||
a.pSpans = make([]histogram.Span, len(h.PositiveSpans))
|
||||
a.pSpans = make([]*histogram.Span, len(h.PositiveSpans))
|
||||
copy(a.pSpans, h.PositiveSpans)
|
||||
} else {
|
||||
a.pSpans = nil
|
||||
}
|
||||
if len(h.NegativeSpans) > 0 {
|
||||
a.nSpans = make([]histogram.Span, len(h.NegativeSpans))
|
||||
a.nSpans = make([]*histogram.Span, len(h.NegativeSpans))
|
||||
copy(a.nSpans, h.NegativeSpans)
|
||||
} else {
|
||||
a.nSpans = nil
|
||||
|
@ -510,7 +510,7 @@ func (a *FloatHistogramAppender) writeXorValue(old *xorValue, v float64) {
|
|||
// this method.
|
||||
func (a *FloatHistogramAppender) recode(
|
||||
positiveInserts, negativeInserts []Insert,
|
||||
positiveSpans, negativeSpans []histogram.Span,
|
||||
positiveSpans, negativeSpans []*histogram.Span,
|
||||
) (Chunk, Appender) {
|
||||
// TODO(beorn7): This currently just decodes everything and then encodes
|
||||
// it again with the new span layout. This can probably be done in-place
|
||||
|
@ -688,7 +688,7 @@ type floatHistogramIterator struct {
|
|||
// Layout:
|
||||
schema int32
|
||||
zThreshold float64
|
||||
pSpans, nSpans []histogram.Span
|
||||
pSpans, nSpans []*histogram.Span
|
||||
|
||||
// For the fields that are tracked as deltas and ultimately dod's.
|
||||
t int64
|
||||
|
|
|
@ -64,7 +64,7 @@ func (c *HistogramChunk) NumSamples() int {
|
|||
// least one sample.
|
||||
func (c *HistogramChunk) Layout() (
|
||||
schema int32, zeroThreshold float64,
|
||||
negativeSpans, positiveSpans []histogram.Span,
|
||||
negativeSpans, positiveSpans []*histogram.Span,
|
||||
err error,
|
||||
) {
|
||||
if c.NumSamples() == 0 {
|
||||
|
@ -148,7 +148,7 @@ func (c *HistogramChunk) Appender() (Appender, error) {
|
|||
return a, nil
|
||||
}
|
||||
|
||||
func countSpans(spans []histogram.Span) int {
|
||||
func countSpans(spans []*histogram.Span) int {
|
||||
var cnt int
|
||||
for _, s := range spans {
|
||||
cnt += int(s.Length)
|
||||
|
@ -193,7 +193,7 @@ type HistogramAppender struct {
|
|||
// Layout:
|
||||
schema int32
|
||||
zThreshold float64
|
||||
pSpans, nSpans []histogram.Span
|
||||
pSpans, nSpans []*histogram.Span
|
||||
|
||||
// Although we intend to start new chunks on counter resets, we still
|
||||
// have to handle negative deltas for gauge histograms. Therefore, even
|
||||
|
@ -324,7 +324,7 @@ func (a *HistogramAppender) appendable(h *histogram.Histogram) (
|
|||
func (a *HistogramAppender) appendableGauge(h *histogram.Histogram) (
|
||||
positiveInserts, negativeInserts []Insert,
|
||||
backwardPositiveInserts, backwardNegativeInserts []Insert,
|
||||
positiveSpans, negativeSpans []histogram.Span,
|
||||
positiveSpans, negativeSpans []*histogram.Span,
|
||||
okToAppend bool,
|
||||
) {
|
||||
if a.NumSamples() > 0 && a.GetCounterResetHeader() != GaugeType {
|
||||
|
@ -354,7 +354,7 @@ func (a *HistogramAppender) appendableGauge(h *histogram.Histogram) (
|
|||
// counterResetInAnyBucket returns true if there was a counter reset for any
|
||||
// bucket. This should be called only when the bucket layout is the same or new
|
||||
// buckets were added. It does not handle the case of buckets missing.
|
||||
func counterResetInAnyBucket(oldBuckets, newBuckets []int64, oldSpans, newSpans []histogram.Span) bool {
|
||||
func counterResetInAnyBucket(oldBuckets, newBuckets []int64, oldSpans, newSpans []*histogram.Span) bool {
|
||||
if len(oldSpans) == 0 || len(oldBuckets) == 0 {
|
||||
return false
|
||||
}
|
||||
|
@ -443,13 +443,13 @@ func (a *HistogramAppender) appendHistogram(t int64, h *histogram.Histogram) {
|
|||
a.zThreshold = h.ZeroThreshold
|
||||
|
||||
if len(h.PositiveSpans) > 0 {
|
||||
a.pSpans = make([]histogram.Span, len(h.PositiveSpans))
|
||||
a.pSpans = make([]*histogram.Span, len(h.PositiveSpans))
|
||||
copy(a.pSpans, h.PositiveSpans)
|
||||
} else {
|
||||
a.pSpans = nil
|
||||
}
|
||||
if len(h.NegativeSpans) > 0 {
|
||||
a.nSpans = make([]histogram.Span, len(h.NegativeSpans))
|
||||
a.nSpans = make([]*histogram.Span, len(h.NegativeSpans))
|
||||
copy(a.nSpans, h.NegativeSpans)
|
||||
} else {
|
||||
a.nSpans = nil
|
||||
|
@ -541,7 +541,7 @@ func (a *HistogramAppender) appendHistogram(t int64, h *histogram.Histogram) {
|
|||
// this method.
|
||||
func (a *HistogramAppender) recode(
|
||||
positiveInserts, negativeInserts []Insert,
|
||||
positiveSpans, negativeSpans []histogram.Span,
|
||||
positiveSpans, negativeSpans []*histogram.Span,
|
||||
) (Chunk, Appender) {
|
||||
// TODO(beorn7): This currently just decodes everything and then encodes
|
||||
// it again with the new span layout. This can probably be done in-place
|
||||
|
@ -736,7 +736,7 @@ type histogramIterator struct {
|
|||
// Layout:
|
||||
schema int32
|
||||
zThreshold float64
|
||||
pSpans, nSpans []histogram.Span
|
||||
pSpans, nSpans []*histogram.Span
|
||||
|
||||
// For the fields that are tracked as deltas and ultimately dod's.
|
||||
t int64
|
||||
|
|
|
@ -21,7 +21,7 @@ import (
|
|||
|
||||
func writeHistogramChunkLayout(
|
||||
b *bstream, schema int32, zeroThreshold float64,
|
||||
positiveSpans, negativeSpans []histogram.Span,
|
||||
positiveSpans, negativeSpans []*histogram.Span,
|
||||
) {
|
||||
putZeroThreshold(b, zeroThreshold)
|
||||
putVarbitInt(b, int64(schema))
|
||||
|
@ -31,7 +31,7 @@ func writeHistogramChunkLayout(
|
|||
|
||||
func readHistogramChunkLayout(b *bstreamReader) (
|
||||
schema int32, zeroThreshold float64,
|
||||
positiveSpans, negativeSpans []histogram.Span,
|
||||
positiveSpans, negativeSpans []*histogram.Span,
|
||||
err error,
|
||||
) {
|
||||
zeroThreshold, err = readZeroThreshold(b)
|
||||
|
@ -58,7 +58,7 @@ func readHistogramChunkLayout(b *bstreamReader) (
|
|||
return
|
||||
}
|
||||
|
||||
func putHistogramChunkLayoutSpans(b *bstream, spans []histogram.Span) {
|
||||
func putHistogramChunkLayoutSpans(b *bstream, spans []*histogram.Span) {
|
||||
putVarbitUint(b, uint64(len(spans)))
|
||||
for _, s := range spans {
|
||||
putVarbitUint(b, uint64(s.Length))
|
||||
|
@ -66,8 +66,8 @@ func putHistogramChunkLayoutSpans(b *bstream, spans []histogram.Span) {
|
|||
}
|
||||
}
|
||||
|
||||
func readHistogramChunkLayoutSpans(b *bstreamReader) ([]histogram.Span, error) {
|
||||
var spans []histogram.Span
|
||||
func readHistogramChunkLayoutSpans(b *bstreamReader) ([]*histogram.Span, error) {
|
||||
var spans []*histogram.Span
|
||||
num, err := readVarbitUint(b)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
@ -84,7 +84,7 @@ func readHistogramChunkLayoutSpans(b *bstreamReader) ([]histogram.Span, error) {
|
|||
return nil, err
|
||||
}
|
||||
|
||||
spans = append(spans, histogram.Span{
|
||||
spans = append(spans, &histogram.Span{
|
||||
Length: uint32(length),
|
||||
Offset: int32(offset),
|
||||
})
|
||||
|
@ -141,13 +141,13 @@ func readZeroThreshold(br *bstreamReader) (float64, error) {
|
|||
}
|
||||
|
||||
type bucketIterator struct {
|
||||
spans []histogram.Span
|
||||
spans []*histogram.Span
|
||||
span int // Span position of last yielded bucket.
|
||||
bucket int // Bucket position within span of last yielded bucket.
|
||||
idx int // Bucket index (globally across all spans) of last yielded bucket.
|
||||
}
|
||||
|
||||
func newBucketIterator(spans []histogram.Span) *bucketIterator {
|
||||
func newBucketIterator(spans []*histogram.Span) *bucketIterator {
|
||||
b := bucketIterator{
|
||||
spans: spans,
|
||||
span: 0,
|
||||
|
@ -232,7 +232,7 @@ type Insert struct {
|
|||
// spans themselves, thanks to the iterators we get to work with the more useful
|
||||
// bucket indices (which of course directly correspond to the buckets we have to
|
||||
// adjust).
|
||||
func expandSpansForward(a, b []histogram.Span) (forward []Insert, ok bool) {
|
||||
func expandSpansForward(a, b []*histogram.Span) (forward []Insert, ok bool) {
|
||||
ai := newBucketIterator(a)
|
||||
bi := newBucketIterator(b)
|
||||
|
||||
|
@ -285,7 +285,7 @@ loop:
|
|||
// “forward” inserts to expand 'a' to also cover all the buckets exclusively
|
||||
// covered by 'b', and it returns the “backward” inserts to expand 'b' to also
|
||||
// cover all the buckets exclusively covered by 'a'.
|
||||
func expandSpansBothWays(a, b []histogram.Span) (forward, backward []Insert, mergedSpans []histogram.Span) {
|
||||
func expandSpansBothWays(a, b []*histogram.Span) (forward, backward []Insert, mergedSpans []*histogram.Span) {
|
||||
ai := newBucketIterator(a)
|
||||
bi := newBucketIterator(b)
|
||||
|
||||
|
@ -299,7 +299,7 @@ func expandSpansBothWays(a, b []histogram.Span) (forward, backward []Insert, mer
|
|||
if len(mergedSpans) == 0 {
|
||||
offset++
|
||||
}
|
||||
mergedSpans = append(mergedSpans, histogram.Span{
|
||||
mergedSpans = append(mergedSpans, &histogram.Span{
|
||||
Offset: int32(offset),
|
||||
Length: 1,
|
||||
})
|
||||
|
@ -490,7 +490,7 @@ func counterResetHint(crh CounterResetHeader, numRead uint16) histogram.CounterR
|
|||
|
||||
// Handle pathological case of empty span when advancing span idx.
|
||||
// Call it with idx==-1 to find the first non empty span.
|
||||
func nextNonEmptySpanSliceIdx(idx int, bucketIdx int32, spans []histogram.Span) (newIdx int, newBucketIdx int32) {
|
||||
func nextNonEmptySpanSliceIdx(idx int, bucketIdx int32, spans []*histogram.Span) (newIdx int, newBucketIdx int32) {
|
||||
for idx++; idx < len(spans); idx++ {
|
||||
if spans[idx].Length > 0 {
|
||||
return idx, bucketIdx + spans[idx].Offset + 1
|
||||
|
|
|
@ -470,7 +470,7 @@ func DecodeHistogram(buf *encoding.Decbuf, h *histogram.Histogram) {
|
|||
|
||||
l := buf.Uvarint()
|
||||
if l > 0 {
|
||||
h.PositiveSpans = make([]histogram.Span, l)
|
||||
h.PositiveSpans = make([]*histogram.Span, l)
|
||||
}
|
||||
for i := range h.PositiveSpans {
|
||||
h.PositiveSpans[i].Offset = int32(buf.Varint64())
|
||||
|
@ -479,7 +479,7 @@ func DecodeHistogram(buf *encoding.Decbuf, h *histogram.Histogram) {
|
|||
|
||||
l = buf.Uvarint()
|
||||
if l > 0 {
|
||||
h.NegativeSpans = make([]histogram.Span, l)
|
||||
h.NegativeSpans = make([]*histogram.Span, l)
|
||||
}
|
||||
for i := range h.NegativeSpans {
|
||||
h.NegativeSpans[i].Offset = int32(buf.Varint64())
|
||||
|
@ -552,7 +552,7 @@ func DecodeFloatHistogram(buf *encoding.Decbuf, fh *histogram.FloatHistogram) {
|
|||
|
||||
l := buf.Uvarint()
|
||||
if l > 0 {
|
||||
fh.PositiveSpans = make([]histogram.Span, l)
|
||||
fh.PositiveSpans = make([]*histogram.Span, l)
|
||||
}
|
||||
for i := range fh.PositiveSpans {
|
||||
fh.PositiveSpans[i].Offset = int32(buf.Varint64())
|
||||
|
@ -561,7 +561,7 @@ func DecodeFloatHistogram(buf *encoding.Decbuf, fh *histogram.FloatHistogram) {
|
|||
|
||||
l = buf.Uvarint()
|
||||
if l > 0 {
|
||||
fh.NegativeSpans = make([]histogram.Span, l)
|
||||
fh.NegativeSpans = make([]*histogram.Span, l)
|
||||
}
|
||||
for i := range fh.NegativeSpans {
|
||||
fh.NegativeSpans[i].Offset = int32(buf.Varint64())
|
||||
|
|
|
@ -38,12 +38,12 @@ func GenerateTestHistogram(i int) *histogram.Histogram {
|
|||
ZeroThreshold: 0.001,
|
||||
Sum: 18.4 * float64(i+1),
|
||||
Schema: 1,
|
||||
PositiveSpans: []histogram.Span{
|
||||
PositiveSpans: []*histogram.Span{
|
||||
{Offset: 0, Length: 2},
|
||||
{Offset: 1, Length: 2},
|
||||
},
|
||||
PositiveBuckets: []int64{int64(i + 1), 1, -1, 0},
|
||||
NegativeSpans: []histogram.Span{
|
||||
NegativeSpans: []*histogram.Span{
|
||||
{Offset: 0, Length: 2},
|
||||
{Offset: 1, Length: 2},
|
||||
},
|
||||
|
@ -84,12 +84,12 @@ func GenerateTestFloatHistogram(i int) *histogram.FloatHistogram {
|
|||
ZeroThreshold: 0.001,
|
||||
Sum: 18.4 * float64(i+1),
|
||||
Schema: 1,
|
||||
PositiveSpans: []histogram.Span{
|
||||
PositiveSpans: []*histogram.Span{
|
||||
{Offset: 0, Length: 2},
|
||||
{Offset: 1, Length: 2},
|
||||
},
|
||||
PositiveBuckets: []float64{float64(i + 1), float64(i + 2), float64(i + 1), float64(i + 1)},
|
||||
NegativeSpans: []histogram.Span{
|
||||
NegativeSpans: []*histogram.Span{
|
||||
{Offset: 0, Length: 2},
|
||||
{Offset: 1, Length: 2},
|
||||
},
|
||||
|
|
|
@ -69,7 +69,7 @@ func MetricFamiliesToWriteRequest(mf map[string]*dto.MetricFamily, extraLabels m
|
|||
for _, metricName := range sortedMetricNames {
|
||||
// Set metadata writerequest
|
||||
mtype := MetricMetadataTypeValue[mf[metricName].Type.String()]
|
||||
metadata := prompb.MetricMetadata{
|
||||
metadata := &prompb.MetricMetadata{
|
||||
MetricFamilyName: mf[metricName].GetName(),
|
||||
Type: prompb.MetricMetadata_MetricType(mtype),
|
||||
Help: mf[metricName].GetHelp(),
|
||||
|
@ -87,9 +87,9 @@ func MetricFamiliesToWriteRequest(mf map[string]*dto.MetricFamily, extraLabels m
|
|||
}
|
||||
|
||||
func toTimeseries(wr *prompb.WriteRequest, labels map[string]string, timestamp int64, value float64) {
|
||||
var ts prompb.TimeSeries
|
||||
var ts *prompb.TimeSeries
|
||||
ts.Labels = makeLabels(labels)
|
||||
ts.Samples = []prompb.Sample{
|
||||
ts.Samples = []*prompb.Sample{
|
||||
{
|
||||
Timestamp: timestamp,
|
||||
Value: value,
|
||||
|
@ -161,7 +161,7 @@ func makeTimeseries(wr *prompb.WriteRequest, labels map[string]string, m *dto.Me
|
|||
return err
|
||||
}
|
||||
|
||||
func makeLabels(labelsMap map[string]string) []prompb.Label {
|
||||
func makeLabels(labelsMap map[string]string) []*prompb.Label {
|
||||
// build labels name list
|
||||
sortedLabelNames := make([]string, 0, len(labelsMap))
|
||||
for label := range labelsMap {
|
||||
|
@ -170,9 +170,9 @@ func makeLabels(labelsMap map[string]string) []prompb.Label {
|
|||
// sort labels name in lexicographical order
|
||||
sort.Strings(sortedLabelNames)
|
||||
|
||||
var labels []prompb.Label
|
||||
var labels []*prompb.Label
|
||||
for _, label := range sortedLabelNames {
|
||||
labels = append(labels, prompb.Label{
|
||||
labels = append(labels, &prompb.Label{
|
||||
Name: label,
|
||||
Value: labelsMap[label],
|
||||
})
|
||||
|
|
Loading…
Reference in a new issue