mirror of
https://github.com/prometheus/prometheus.git
synced 2025-03-05 20:59:13 -08:00
[PRW 2.0] (part3) moved type specific conversions to prompb and writev2 codecs.
Signed-off-by: bwplotka <bwplotka@gmail.com>
This commit is contained in:
parent
4dbcdd19da
commit
c5faeb9511
198
prompb/codec.go
Normal file
198
prompb/codec.go
Normal file
|
@ -0,0 +1,198 @@
|
||||||
|
// Copyright 2024 Prometheus Team
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
package prompb
|
||||||
|
|
||||||
|
import (
|
||||||
|
"strings"
|
||||||
|
|
||||||
|
"github.com/prometheus/common/model"
|
||||||
|
|
||||||
|
"github.com/prometheus/prometheus/model/exemplar"
|
||||||
|
"github.com/prometheus/prometheus/model/histogram"
|
||||||
|
"github.com/prometheus/prometheus/model/labels"
|
||||||
|
)
|
||||||
|
|
||||||
|
// NOTE(bwplotka): This file's code is tested in /prompb/rwcommon.
|
||||||
|
|
||||||
|
// ToLabels return model labels.Labels from timeseries' remote labels.
|
||||||
|
func (m TimeSeries) ToLabels(b *labels.ScratchBuilder, _ []string) labels.Labels {
|
||||||
|
return labelProtosToLabels(b, m.GetLabels())
|
||||||
|
}
|
||||||
|
|
||||||
|
// ToLabels return model labels.Labels from timeseries' remote labels.
|
||||||
|
func (m ChunkedSeries) ToLabels(b *labels.ScratchBuilder, _ []string) labels.Labels {
|
||||||
|
return labelProtosToLabels(b, m.GetLabels())
|
||||||
|
}
|
||||||
|
|
||||||
|
func labelProtosToLabels(b *labels.ScratchBuilder, labelPairs []Label) labels.Labels {
|
||||||
|
b.Reset()
|
||||||
|
for _, l := range labelPairs {
|
||||||
|
b.Add(l.Name, l.Value)
|
||||||
|
}
|
||||||
|
b.Sort()
|
||||||
|
return b.Labels()
|
||||||
|
}
|
||||||
|
|
||||||
|
// FromLabels transforms labels into prompb labels. The buffer slice
|
||||||
|
// will be used to avoid allocations if it is big enough to store the labels.
|
||||||
|
func FromLabels(lbls labels.Labels, buf []Label) []Label {
|
||||||
|
result := buf[:0]
|
||||||
|
lbls.Range(func(l labels.Label) {
|
||||||
|
result = append(result, Label{
|
||||||
|
Name: l.Name,
|
||||||
|
Value: l.Value,
|
||||||
|
})
|
||||||
|
})
|
||||||
|
return result
|
||||||
|
}
|
||||||
|
|
||||||
|
// FromMetadataType transforms a Prometheus metricType into prompb metricType. Since the former is a string we need to transform it to an enum.
|
||||||
|
func FromMetadataType(t model.MetricType) MetricMetadata_MetricType {
|
||||||
|
mt := strings.ToUpper(string(t))
|
||||||
|
v, ok := MetricMetadata_MetricType_value[mt]
|
||||||
|
if !ok {
|
||||||
|
return MetricMetadata_UNKNOWN
|
||||||
|
}
|
||||||
|
return MetricMetadata_MetricType(v)
|
||||||
|
}
|
||||||
|
|
||||||
|
// IsFloatHistogram returns true if the histogram is float.
|
||||||
|
func (h Histogram) IsFloatHistogram() bool {
|
||||||
|
_, ok := h.GetCount().(*Histogram_CountFloat)
|
||||||
|
return ok
|
||||||
|
}
|
||||||
|
|
||||||
|
// ToIntHistogram returns integer Prometheus histogram from the remote implementation
|
||||||
|
// of integer histogram. It's a caller responsibility to check if it's not a float histogram.
|
||||||
|
func (h Histogram) ToIntHistogram() *histogram.Histogram {
|
||||||
|
return &histogram.Histogram{
|
||||||
|
CounterResetHint: histogram.CounterResetHint(h.ResetHint),
|
||||||
|
Schema: h.Schema,
|
||||||
|
ZeroThreshold: h.ZeroThreshold,
|
||||||
|
ZeroCount: h.GetZeroCountInt(),
|
||||||
|
Count: h.GetCountInt(),
|
||||||
|
Sum: h.Sum,
|
||||||
|
PositiveSpans: spansProtoToSpans(h.GetPositiveSpans()),
|
||||||
|
PositiveBuckets: h.GetPositiveDeltas(),
|
||||||
|
NegativeSpans: spansProtoToSpans(h.GetNegativeSpans()),
|
||||||
|
NegativeBuckets: h.GetNegativeDeltas(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// ToFloatHistogram returns float Prometheus histogram from the remote implementation
|
||||||
|
// of float histogram. If the underlying implementation is an integer histogram, a
|
||||||
|
// conversion is performed.
|
||||||
|
func (h Histogram) ToFloatHistogram() *histogram.FloatHistogram {
|
||||||
|
if h.IsFloatHistogram() {
|
||||||
|
return &histogram.FloatHistogram{
|
||||||
|
CounterResetHint: histogram.CounterResetHint(h.ResetHint),
|
||||||
|
Schema: h.Schema,
|
||||||
|
ZeroThreshold: h.ZeroThreshold,
|
||||||
|
ZeroCount: h.GetZeroCountFloat(),
|
||||||
|
Count: h.GetCountFloat(),
|
||||||
|
Sum: h.Sum,
|
||||||
|
PositiveSpans: spansProtoToSpans(h.GetPositiveSpans()),
|
||||||
|
PositiveBuckets: h.GetPositiveCounts(),
|
||||||
|
NegativeSpans: spansProtoToSpans(h.GetNegativeSpans()),
|
||||||
|
NegativeBuckets: h.GetNegativeCounts(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// Conversion from integer histogram.
|
||||||
|
return &histogram.FloatHistogram{
|
||||||
|
CounterResetHint: histogram.CounterResetHint(h.ResetHint),
|
||||||
|
Schema: h.Schema,
|
||||||
|
ZeroThreshold: h.ZeroThreshold,
|
||||||
|
ZeroCount: float64(h.GetZeroCountInt()),
|
||||||
|
Count: float64(h.GetCountInt()),
|
||||||
|
Sum: h.Sum,
|
||||||
|
PositiveSpans: spansProtoToSpans(h.GetPositiveSpans()),
|
||||||
|
PositiveBuckets: deltasToCounts(h.GetPositiveDeltas()),
|
||||||
|
NegativeSpans: spansProtoToSpans(h.GetNegativeSpans()),
|
||||||
|
NegativeBuckets: deltasToCounts(h.GetNegativeDeltas()),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func spansProtoToSpans(s []BucketSpan) []histogram.Span {
|
||||||
|
spans := make([]histogram.Span, len(s))
|
||||||
|
for i := 0; i < len(s); i++ {
|
||||||
|
spans[i] = histogram.Span{Offset: s[i].Offset, Length: s[i].Length}
|
||||||
|
}
|
||||||
|
|
||||||
|
return spans
|
||||||
|
}
|
||||||
|
|
||||||
|
func deltasToCounts(deltas []int64) []float64 {
|
||||||
|
counts := make([]float64, len(deltas))
|
||||||
|
var cur float64
|
||||||
|
for i, d := range deltas {
|
||||||
|
cur += float64(d)
|
||||||
|
counts[i] = cur
|
||||||
|
}
|
||||||
|
return counts
|
||||||
|
}
|
||||||
|
|
||||||
|
// FromIntHistogram returns remote Histogram from the integer Histogram.
|
||||||
|
func FromIntHistogram(timestamp int64, h *histogram.Histogram) Histogram {
|
||||||
|
return Histogram{
|
||||||
|
Count: &Histogram_CountInt{CountInt: h.Count},
|
||||||
|
Sum: h.Sum,
|
||||||
|
Schema: h.Schema,
|
||||||
|
ZeroThreshold: h.ZeroThreshold,
|
||||||
|
ZeroCount: &Histogram_ZeroCountInt{ZeroCountInt: h.ZeroCount},
|
||||||
|
NegativeSpans: spansToSpansProto(h.NegativeSpans),
|
||||||
|
NegativeDeltas: h.NegativeBuckets,
|
||||||
|
PositiveSpans: spansToSpansProto(h.PositiveSpans),
|
||||||
|
PositiveDeltas: h.PositiveBuckets,
|
||||||
|
ResetHint: Histogram_ResetHint(h.CounterResetHint),
|
||||||
|
Timestamp: timestamp,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// FromFloatHistogram returns remote Histogram from the float Histogram.
|
||||||
|
func FromFloatHistogram(timestamp int64, fh *histogram.FloatHistogram) Histogram {
|
||||||
|
return Histogram{
|
||||||
|
Count: &Histogram_CountFloat{CountFloat: fh.Count},
|
||||||
|
Sum: fh.Sum,
|
||||||
|
Schema: fh.Schema,
|
||||||
|
ZeroThreshold: fh.ZeroThreshold,
|
||||||
|
ZeroCount: &Histogram_ZeroCountFloat{ZeroCountFloat: fh.ZeroCount},
|
||||||
|
NegativeSpans: spansToSpansProto(fh.NegativeSpans),
|
||||||
|
NegativeCounts: fh.NegativeBuckets,
|
||||||
|
PositiveSpans: spansToSpansProto(fh.PositiveSpans),
|
||||||
|
PositiveCounts: fh.PositiveBuckets,
|
||||||
|
ResetHint: Histogram_ResetHint(fh.CounterResetHint),
|
||||||
|
Timestamp: timestamp,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func spansToSpansProto(s []histogram.Span) []BucketSpan {
|
||||||
|
spans := make([]BucketSpan, len(s))
|
||||||
|
for i := 0; i < len(s); i++ {
|
||||||
|
spans[i] = BucketSpan{Offset: s[i].Offset, Length: s[i].Length}
|
||||||
|
}
|
||||||
|
|
||||||
|
return spans
|
||||||
|
}
|
||||||
|
|
||||||
|
// ToExemplar converts remote exemplar to model exemplar.
|
||||||
|
func (m Exemplar) ToExemplar(b *labels.ScratchBuilder, _ []string) exemplar.Exemplar {
|
||||||
|
timestamp := m.Timestamp
|
||||||
|
|
||||||
|
return exemplar.Exemplar{
|
||||||
|
Labels: labelProtosToLabels(b, m.GetLabels()),
|
||||||
|
Value: m.Value,
|
||||||
|
Ts: timestamp,
|
||||||
|
HasTs: timestamp != 0,
|
||||||
|
}
|
||||||
|
}
|
|
@ -17,14 +17,6 @@ import (
|
||||||
"sync"
|
"sync"
|
||||||
)
|
)
|
||||||
|
|
||||||
func (m Sample) T() int64 { return m.Timestamp }
|
|
||||||
func (m Sample) V() float64 { return m.Value }
|
|
||||||
|
|
||||||
func (h Histogram) IsFloatHistogram() bool {
|
|
||||||
_, ok := h.GetCount().(*Histogram_CountFloat)
|
|
||||||
return ok
|
|
||||||
}
|
|
||||||
|
|
||||||
func (r *ChunkedReadResponse) PooledMarshal(p *sync.Pool) ([]byte, error) {
|
func (r *ChunkedReadResponse) PooledMarshal(p *sync.Pool) ([]byte, error) {
|
||||||
size := r.Size()
|
size := r.Size()
|
||||||
data, ok := p.Get().(*[]byte)
|
data, ok := p.Get().(*[]byte)
|
||||||
|
|
183
prompb/io/prometheus/write/v2/codec.go
Normal file
183
prompb/io/prometheus/write/v2/codec.go
Normal file
|
@ -0,0 +1,183 @@
|
||||||
|
// Copyright 2024 Prometheus Team
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
package writev2
|
||||||
|
|
||||||
|
import (
|
||||||
|
"strings"
|
||||||
|
|
||||||
|
"github.com/prometheus/common/model"
|
||||||
|
|
||||||
|
"github.com/prometheus/prometheus/prompb"
|
||||||
|
|
||||||
|
"github.com/prometheus/prometheus/model/exemplar"
|
||||||
|
"github.com/prometheus/prometheus/model/histogram"
|
||||||
|
"github.com/prometheus/prometheus/model/labels"
|
||||||
|
"github.com/prometheus/prometheus/model/metadata"
|
||||||
|
)
|
||||||
|
|
||||||
|
// NOTE(bwplotka): This file's code is tested in /prompb/rwcommon.
|
||||||
|
|
||||||
|
// ToLabels return model labels.Labels from timeseries' remote labels.
|
||||||
|
func (m TimeSeries) ToLabels(b *labels.ScratchBuilder, symbols []string) labels.Labels {
|
||||||
|
return desymbolizeLabels(b, m.GetLabelsRefs(), symbols)
|
||||||
|
}
|
||||||
|
|
||||||
|
// ToMetadata return model metadata from timeseries' remote metadata.
|
||||||
|
func (m TimeSeries) ToMetadata(symbols []string) metadata.Metadata {
|
||||||
|
mt := strings.ToLower(m.Metadata.Type.String())
|
||||||
|
return metadata.Metadata{
|
||||||
|
Type: model.MetricType(mt), // TODO(@tpaschalis) a better way for this?
|
||||||
|
Unit: symbols[m.Metadata.UnitRef],
|
||||||
|
Help: symbols[m.Metadata.HelpRef],
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// FromMetadataType transforms a Prometheus metricType into writev2 metricType. Since the former is a string we need to transform it to an enum.
|
||||||
|
func FromMetadataType(t model.MetricType) Metadata_MetricType {
|
||||||
|
mt := strings.ToUpper(string(t))
|
||||||
|
v, ok := prompb.MetricMetadata_MetricType_value[mt]
|
||||||
|
if !ok {
|
||||||
|
return Metadata_METRIC_TYPE_UNSPECIFIED
|
||||||
|
}
|
||||||
|
return Metadata_MetricType(v)
|
||||||
|
}
|
||||||
|
|
||||||
|
// IsFloatHistogram returns true if the histogram is float.
|
||||||
|
func (h Histogram) IsFloatHistogram() bool {
|
||||||
|
_, ok := h.GetCount().(*Histogram_CountFloat)
|
||||||
|
return ok
|
||||||
|
}
|
||||||
|
|
||||||
|
// ToIntHistogram returns integer Prometheus histogram from the remote implementation
|
||||||
|
// of integer histogram. It's a caller responsibility to check if it's not a float histogram.
|
||||||
|
func (h Histogram) ToIntHistogram() *histogram.Histogram {
|
||||||
|
return &histogram.Histogram{
|
||||||
|
CounterResetHint: histogram.CounterResetHint(h.ResetHint),
|
||||||
|
Schema: h.Schema,
|
||||||
|
ZeroThreshold: h.ZeroThreshold,
|
||||||
|
ZeroCount: h.GetZeroCountInt(),
|
||||||
|
Count: h.GetCountInt(),
|
||||||
|
Sum: h.Sum,
|
||||||
|
PositiveSpans: spansProtoToSpans(h.GetPositiveSpans()),
|
||||||
|
PositiveBuckets: h.GetPositiveDeltas(),
|
||||||
|
NegativeSpans: spansProtoToSpans(h.GetNegativeSpans()),
|
||||||
|
NegativeBuckets: h.GetNegativeDeltas(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// ToFloatHistogram returns float Prometheus histogram from the remote implementation
|
||||||
|
// of float histogram. If the underlying implementation is an integer histogram, a
|
||||||
|
// conversion is performed.
|
||||||
|
func (h Histogram) ToFloatHistogram() *histogram.FloatHistogram {
|
||||||
|
if h.IsFloatHistogram() {
|
||||||
|
return &histogram.FloatHistogram{
|
||||||
|
CounterResetHint: histogram.CounterResetHint(h.ResetHint),
|
||||||
|
Schema: h.Schema,
|
||||||
|
ZeroThreshold: h.ZeroThreshold,
|
||||||
|
ZeroCount: h.GetZeroCountFloat(),
|
||||||
|
Count: h.GetCountFloat(),
|
||||||
|
Sum: h.Sum,
|
||||||
|
PositiveSpans: spansProtoToSpans(h.GetPositiveSpans()),
|
||||||
|
PositiveBuckets: h.GetPositiveCounts(),
|
||||||
|
NegativeSpans: spansProtoToSpans(h.GetNegativeSpans()),
|
||||||
|
NegativeBuckets: h.GetNegativeCounts(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// Conversion from integer histogram.
|
||||||
|
return &histogram.FloatHistogram{
|
||||||
|
CounterResetHint: histogram.CounterResetHint(h.ResetHint),
|
||||||
|
Schema: h.Schema,
|
||||||
|
ZeroThreshold: h.ZeroThreshold,
|
||||||
|
ZeroCount: float64(h.GetZeroCountInt()),
|
||||||
|
Count: float64(h.GetCountInt()),
|
||||||
|
Sum: h.Sum,
|
||||||
|
PositiveSpans: spansProtoToSpans(h.GetPositiveSpans()),
|
||||||
|
PositiveBuckets: deltasToCounts(h.GetPositiveDeltas()),
|
||||||
|
NegativeSpans: spansProtoToSpans(h.GetNegativeSpans()),
|
||||||
|
NegativeBuckets: deltasToCounts(h.GetNegativeDeltas()),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func spansProtoToSpans(s []BucketSpan) []histogram.Span {
|
||||||
|
spans := make([]histogram.Span, len(s))
|
||||||
|
for i := 0; i < len(s); i++ {
|
||||||
|
spans[i] = histogram.Span{Offset: s[i].Offset, Length: s[i].Length}
|
||||||
|
}
|
||||||
|
|
||||||
|
return spans
|
||||||
|
}
|
||||||
|
|
||||||
|
func deltasToCounts(deltas []int64) []float64 {
|
||||||
|
counts := make([]float64, len(deltas))
|
||||||
|
var cur float64
|
||||||
|
for i, d := range deltas {
|
||||||
|
cur += float64(d)
|
||||||
|
counts[i] = cur
|
||||||
|
}
|
||||||
|
return counts
|
||||||
|
}
|
||||||
|
|
||||||
|
// FromIntHistogram returns remote Histogram from the integer Histogram.
|
||||||
|
func FromIntHistogram(timestamp int64, h *histogram.Histogram) Histogram {
|
||||||
|
return Histogram{
|
||||||
|
Count: &Histogram_CountInt{CountInt: h.Count},
|
||||||
|
Sum: h.Sum,
|
||||||
|
Schema: h.Schema,
|
||||||
|
ZeroThreshold: h.ZeroThreshold,
|
||||||
|
ZeroCount: &Histogram_ZeroCountInt{ZeroCountInt: h.ZeroCount},
|
||||||
|
NegativeSpans: spansToSpansProto(h.NegativeSpans),
|
||||||
|
NegativeDeltas: h.NegativeBuckets,
|
||||||
|
PositiveSpans: spansToSpansProto(h.PositiveSpans),
|
||||||
|
PositiveDeltas: h.PositiveBuckets,
|
||||||
|
ResetHint: Histogram_ResetHint(h.CounterResetHint),
|
||||||
|
Timestamp: timestamp,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// FromFloatHistogram returns remote Histogram from the float Histogram.
|
||||||
|
func FromFloatHistogram(timestamp int64, fh *histogram.FloatHistogram) Histogram {
|
||||||
|
return Histogram{
|
||||||
|
Count: &Histogram_CountFloat{CountFloat: fh.Count},
|
||||||
|
Sum: fh.Sum,
|
||||||
|
Schema: fh.Schema,
|
||||||
|
ZeroThreshold: fh.ZeroThreshold,
|
||||||
|
ZeroCount: &Histogram_ZeroCountFloat{ZeroCountFloat: fh.ZeroCount},
|
||||||
|
NegativeSpans: spansToSpansProto(fh.NegativeSpans),
|
||||||
|
NegativeCounts: fh.NegativeBuckets,
|
||||||
|
PositiveSpans: spansToSpansProto(fh.PositiveSpans),
|
||||||
|
PositiveCounts: fh.PositiveBuckets,
|
||||||
|
ResetHint: Histogram_ResetHint(fh.CounterResetHint),
|
||||||
|
Timestamp: timestamp,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func spansToSpansProto(s []histogram.Span) []BucketSpan {
|
||||||
|
spans := make([]BucketSpan, len(s))
|
||||||
|
for i := 0; i < len(s); i++ {
|
||||||
|
spans[i] = BucketSpan{Offset: s[i].Offset, Length: s[i].Length}
|
||||||
|
}
|
||||||
|
|
||||||
|
return spans
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m Exemplar) ToExemplar(b *labels.ScratchBuilder, symbols []string) exemplar.Exemplar {
|
||||||
|
timestamp := m.Timestamp
|
||||||
|
|
||||||
|
return exemplar.Exemplar{
|
||||||
|
Labels: desymbolizeLabels(b, m.LabelsRefs, symbols),
|
||||||
|
Value: m.Value,
|
||||||
|
Ts: timestamp,
|
||||||
|
HasTs: timestamp != 0,
|
||||||
|
}
|
||||||
|
}
|
|
@ -20,11 +20,6 @@ import (
|
||||||
func (m Sample) T() int64 { return m.Timestamp }
|
func (m Sample) T() int64 { return m.Timestamp }
|
||||||
func (m Sample) V() float64 { return m.Value }
|
func (m Sample) V() float64 { return m.Value }
|
||||||
|
|
||||||
func (h Histogram) IsFloatHistogram() bool {
|
|
||||||
_, ok := h.GetCount().(*Histogram_CountFloat)
|
|
||||||
return ok
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *Request) OptimizedMarshal(dst []byte) ([]byte, error) {
|
func (m *Request) OptimizedMarshal(dst []byte) ([]byte, error) {
|
||||||
siz := m.Size()
|
siz := m.Size()
|
||||||
if cap(dst) < siz {
|
if cap(dst) < siz {
|
||||||
|
|
|
@ -72,9 +72,9 @@ func (t *SymbolsTable) Reset() {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// DesymbolizeLabels decodes label references, with given symbols to labels.
|
// desymbolizeLabels decodes label references, with given symbols to labels.
|
||||||
func DesymbolizeLabels(labelRefs []uint32, symbols []string) labels.Labels {
|
func desymbolizeLabels(b *labels.ScratchBuilder, labelRefs []uint32, symbols []string) labels.Labels {
|
||||||
b := labels.NewScratchBuilder(len(labelRefs))
|
b.Reset()
|
||||||
for i := 0; i < len(labelRefs); i += 2 {
|
for i := 0; i < len(labelRefs); i += 2 {
|
||||||
b.Add(symbols[labelRefs[i]], symbols[labelRefs[i+1]])
|
b.Add(symbols[labelRefs[i]], symbols[labelRefs[i+1]])
|
||||||
}
|
}
|
||||||
|
|
|
@ -49,7 +49,8 @@ func TestSymbolsTable(t *testing.T) {
|
||||||
ls := labels.FromStrings("__name__", "qwer", "zxcv", "1234")
|
ls := labels.FromStrings("__name__", "qwer", "zxcv", "1234")
|
||||||
encoded := s.SymbolizeLabels(ls, nil)
|
encoded := s.SymbolizeLabels(ls, nil)
|
||||||
require.Equal(t, []uint32{1, 3, 4, 5}, encoded)
|
require.Equal(t, []uint32{1, 3, 4, 5}, encoded)
|
||||||
decoded := DesymbolizeLabels(encoded, s.Symbols())
|
b := labels.NewScratchBuilder(len(encoded))
|
||||||
|
decoded := desymbolizeLabels(&b, encoded, s.Symbols())
|
||||||
require.Equal(t, ls, decoded)
|
require.Equal(t, ls, decoded)
|
||||||
|
|
||||||
// Different buf.
|
// Different buf.
|
||||||
|
|
230
prompb/rwcommon/codec_test.go
Normal file
230
prompb/rwcommon/codec_test.go
Normal file
|
@ -0,0 +1,230 @@
|
||||||
|
// Copyright 2024 Prometheus Team
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
package rwcommon
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/prometheus/common/model"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
|
||||||
|
"github.com/prometheus/prometheus/model/labels"
|
||||||
|
|
||||||
|
"github.com/prometheus/prometheus/model/histogram"
|
||||||
|
|
||||||
|
"github.com/prometheus/prometheus/prompb"
|
||||||
|
writev2 "github.com/prometheus/prometheus/prompb/io/prometheus/write/v2"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestToLabels(t *testing.T) {
|
||||||
|
expected := labels.FromStrings("__name__", "metric1", "foo", "bar")
|
||||||
|
|
||||||
|
t.Run("v1", func(t *testing.T) {
|
||||||
|
ts := prompb.TimeSeries{Labels: []prompb.Label{{Name: "__name__", Value: "metric1"}, {Name: "foo", Value: "bar"}}}
|
||||||
|
b := labels.NewScratchBuilder(2)
|
||||||
|
require.Equal(t, expected, ts.ToLabels(&b, nil))
|
||||||
|
require.Equal(t, ts.Labels, prompb.FromLabels(expected, nil))
|
||||||
|
require.Equal(t, ts.Labels, prompb.FromLabels(expected, ts.Labels))
|
||||||
|
})
|
||||||
|
t.Run("v2", func(t *testing.T) {
|
||||||
|
v2Symbols := []string{"", "__name__", "metric1", "foo", "bar"}
|
||||||
|
ts := writev2.TimeSeries{LabelsRefs: []uint32{1, 2, 3, 4}}
|
||||||
|
b := labels.NewScratchBuilder(2)
|
||||||
|
require.Equal(t, expected, ts.ToLabels(&b, v2Symbols))
|
||||||
|
// No need for FromLabels in our prod code as we use symbol table to do so.
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestFromMetadataType(t *testing.T) {
|
||||||
|
for _, tc := range []struct {
|
||||||
|
desc string
|
||||||
|
input model.MetricType
|
||||||
|
expectedV1 prompb.MetricMetadata_MetricType
|
||||||
|
expectedV2 writev2.Metadata_MetricType
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
desc: "with a single-word metric",
|
||||||
|
input: model.MetricTypeCounter,
|
||||||
|
expectedV1: prompb.MetricMetadata_COUNTER,
|
||||||
|
expectedV2: writev2.Metadata_METRIC_TYPE_COUNTER,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
desc: "with a two-word metric",
|
||||||
|
input: model.MetricTypeStateset,
|
||||||
|
expectedV1: prompb.MetricMetadata_STATESET,
|
||||||
|
expectedV2: writev2.Metadata_METRIC_TYPE_STATESET,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
desc: "with an unknown metric",
|
||||||
|
input: "not-known",
|
||||||
|
expectedV1: prompb.MetricMetadata_UNKNOWN,
|
||||||
|
expectedV2: writev2.Metadata_METRIC_TYPE_UNSPECIFIED,
|
||||||
|
},
|
||||||
|
} {
|
||||||
|
t.Run(tc.desc, func(t *testing.T) {
|
||||||
|
t.Run("v1", func(t *testing.T) {
|
||||||
|
require.Equal(t, tc.expectedV1, prompb.FromMetadataType(tc.input))
|
||||||
|
})
|
||||||
|
t.Run("v2", func(t *testing.T) {
|
||||||
|
require.Equal(t, tc.expectedV2, writev2.FromMetadataType(tc.input))
|
||||||
|
})
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestToHistogram_Empty(t *testing.T) {
|
||||||
|
t.Run("v1", func(t *testing.T) {
|
||||||
|
require.NotNilf(t, prompb.Histogram{}.ToIntHistogram(), "")
|
||||||
|
require.NotNilf(t, prompb.Histogram{}.ToFloatHistogram(), "")
|
||||||
|
})
|
||||||
|
t.Run("v2", func(t *testing.T) {
|
||||||
|
require.NotNilf(t, writev2.Histogram{}.ToIntHistogram(), "")
|
||||||
|
require.NotNilf(t, writev2.Histogram{}.ToFloatHistogram(), "")
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func testIntHistogram() histogram.Histogram {
|
||||||
|
return histogram.Histogram{
|
||||||
|
CounterResetHint: histogram.GaugeType,
|
||||||
|
Schema: 0,
|
||||||
|
Count: 19,
|
||||||
|
Sum: 2.7,
|
||||||
|
ZeroThreshold: 1e-128,
|
||||||
|
PositiveSpans: []histogram.Span{
|
||||||
|
{Offset: 0, Length: 4},
|
||||||
|
{Offset: 0, Length: 0},
|
||||||
|
{Offset: 0, Length: 3},
|
||||||
|
},
|
||||||
|
PositiveBuckets: []int64{1, 2, -2, 1, -1, 0, 0},
|
||||||
|
NegativeSpans: []histogram.Span{
|
||||||
|
{Offset: 0, Length: 5},
|
||||||
|
{Offset: 1, Length: 0},
|
||||||
|
{Offset: 0, Length: 1},
|
||||||
|
},
|
||||||
|
NegativeBuckets: []int64{1, 2, -2, 1, -1, 0},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func testFloatHistogram() histogram.FloatHistogram {
|
||||||
|
return histogram.FloatHistogram{
|
||||||
|
CounterResetHint: histogram.GaugeType,
|
||||||
|
Schema: 0,
|
||||||
|
Count: 19,
|
||||||
|
Sum: 2.7,
|
||||||
|
ZeroThreshold: 1e-128,
|
||||||
|
PositiveSpans: []histogram.Span{
|
||||||
|
{Offset: 0, Length: 4},
|
||||||
|
{Offset: 0, Length: 0},
|
||||||
|
{Offset: 0, Length: 3},
|
||||||
|
},
|
||||||
|
PositiveBuckets: []float64{1, 3, 1, 2, 1, 1, 1},
|
||||||
|
NegativeSpans: []histogram.Span{
|
||||||
|
{Offset: 0, Length: 5},
|
||||||
|
{Offset: 1, Length: 0},
|
||||||
|
{Offset: 0, Length: 1},
|
||||||
|
},
|
||||||
|
NegativeBuckets: []float64{1, 3, 1, 2, 1, 1},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestFromIntToFloatOrIntHistogram(t *testing.T) {
|
||||||
|
testIntHist := testIntHistogram()
|
||||||
|
testFloatHist := testFloatHistogram()
|
||||||
|
|
||||||
|
t.Run("v1", func(t *testing.T) {
|
||||||
|
h := prompb.FromIntHistogram(123, &testIntHist)
|
||||||
|
require.False(t, h.IsFloatHistogram())
|
||||||
|
require.Equal(t, int64(123), h.Timestamp)
|
||||||
|
require.Equal(t, &testIntHist, h.ToIntHistogram())
|
||||||
|
require.Equal(t, &testFloatHist, h.ToFloatHistogram())
|
||||||
|
})
|
||||||
|
t.Run("v2", func(t *testing.T) {
|
||||||
|
h := writev2.FromIntHistogram(123, &testIntHist)
|
||||||
|
require.False(t, h.IsFloatHistogram())
|
||||||
|
require.Equal(t, int64(123), h.Timestamp)
|
||||||
|
require.Equal(t, &testIntHist, h.ToIntHistogram())
|
||||||
|
require.Equal(t, &testFloatHist, h.ToFloatHistogram())
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestFromFloatToFloatHistogram(t *testing.T) {
|
||||||
|
testFloatHist := testFloatHistogram()
|
||||||
|
|
||||||
|
t.Run("v1", func(t *testing.T) {
|
||||||
|
h := prompb.FromFloatHistogram(123, &testFloatHist)
|
||||||
|
require.True(t, h.IsFloatHistogram())
|
||||||
|
require.Equal(t, int64(123), h.Timestamp)
|
||||||
|
require.Equal(t, &testFloatHist, h.ToFloatHistogram())
|
||||||
|
})
|
||||||
|
t.Run("v2", func(t *testing.T) {
|
||||||
|
h := writev2.FromFloatHistogram(123, &testFloatHist)
|
||||||
|
require.True(t, h.IsFloatHistogram())
|
||||||
|
require.Equal(t, int64(123), h.Timestamp)
|
||||||
|
require.Equal(t, &testFloatHist, h.ToFloatHistogram())
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestFromIntOrFloatHistogram_ResetHint(t *testing.T) {
|
||||||
|
for _, tc := range []struct {
|
||||||
|
input histogram.CounterResetHint
|
||||||
|
expectedV1 prompb.Histogram_ResetHint
|
||||||
|
expectedV2 writev2.Histogram_ResetHint
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
input: histogram.UnknownCounterReset,
|
||||||
|
expectedV1: prompb.Histogram_UNKNOWN,
|
||||||
|
expectedV2: writev2.Histogram_RESET_HINT_UNSPECIFIED,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
input: histogram.CounterReset,
|
||||||
|
expectedV1: prompb.Histogram_YES,
|
||||||
|
expectedV2: writev2.Histogram_RESET_HINT_YES,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
input: histogram.NotCounterReset,
|
||||||
|
expectedV1: prompb.Histogram_NO,
|
||||||
|
expectedV2: writev2.Histogram_RESET_HINT_NO,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
input: histogram.GaugeType,
|
||||||
|
expectedV1: prompb.Histogram_GAUGE,
|
||||||
|
expectedV2: writev2.Histogram_RESET_HINT_GAUGE,
|
||||||
|
},
|
||||||
|
} {
|
||||||
|
t.Run("", func(t *testing.T) {
|
||||||
|
t.Run("v1", func(t *testing.T) {
|
||||||
|
h := testIntHistogram()
|
||||||
|
h.CounterResetHint = tc.input
|
||||||
|
got := prompb.FromIntHistogram(1337, &h)
|
||||||
|
require.Equal(t, tc.expectedV1, got.GetResetHint())
|
||||||
|
|
||||||
|
fh := testFloatHistogram()
|
||||||
|
fh.CounterResetHint = tc.input
|
||||||
|
got2 := prompb.FromFloatHistogram(1337, &fh)
|
||||||
|
require.Equal(t, tc.expectedV1, got2.GetResetHint())
|
||||||
|
})
|
||||||
|
t.Run("v2", func(t *testing.T) {
|
||||||
|
h := testIntHistogram()
|
||||||
|
h.CounterResetHint = tc.input
|
||||||
|
got := writev2.FromIntHistogram(1337, &h)
|
||||||
|
require.Equal(t, tc.expectedV2, got.GetResetHint())
|
||||||
|
|
||||||
|
fh := testFloatHistogram()
|
||||||
|
fh.CounterResetHint = tc.input
|
||||||
|
got2 := writev2.FromFloatHistogram(1337, &fh)
|
||||||
|
require.Equal(t, tc.expectedV2, got2.GetResetHint())
|
||||||
|
})
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
|
@ -22,7 +22,6 @@ import (
|
||||||
"net/http"
|
"net/http"
|
||||||
"slices"
|
"slices"
|
||||||
"sort"
|
"sort"
|
||||||
"strings"
|
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
"github.com/gogo/protobuf/proto"
|
"github.com/gogo/protobuf/proto"
|
||||||
|
@ -30,10 +29,8 @@ import (
|
||||||
"github.com/prometheus/common/model"
|
"github.com/prometheus/common/model"
|
||||||
"go.opentelemetry.io/collector/pdata/pmetric/pmetricotlp"
|
"go.opentelemetry.io/collector/pdata/pmetric/pmetricotlp"
|
||||||
|
|
||||||
"github.com/prometheus/prometheus/model/exemplar"
|
|
||||||
"github.com/prometheus/prometheus/model/histogram"
|
"github.com/prometheus/prometheus/model/histogram"
|
||||||
"github.com/prometheus/prometheus/model/labels"
|
"github.com/prometheus/prometheus/model/labels"
|
||||||
"github.com/prometheus/prometheus/model/metadata"
|
|
||||||
"github.com/prometheus/prometheus/prompb"
|
"github.com/prometheus/prometheus/prompb"
|
||||||
writev2 "github.com/prometheus/prometheus/prompb/io/prometheus/write/v2"
|
writev2 "github.com/prometheus/prometheus/prompb/io/prometheus/write/v2"
|
||||||
"github.com/prometheus/prometheus/storage"
|
"github.com/prometheus/prometheus/storage"
|
||||||
|
@ -155,10 +152,10 @@ func ToQueryResult(ss storage.SeriesSet, sampleLimit int) (*prompb.QueryResult,
|
||||||
})
|
})
|
||||||
case chunkenc.ValHistogram:
|
case chunkenc.ValHistogram:
|
||||||
ts, h := iter.AtHistogram(nil)
|
ts, h := iter.AtHistogram(nil)
|
||||||
histograms = append(histograms, HistogramToHistogramProto(ts, h))
|
histograms = append(histograms, prompb.FromIntHistogram(ts, h))
|
||||||
case chunkenc.ValFloatHistogram:
|
case chunkenc.ValFloatHistogram:
|
||||||
ts, fh := iter.AtFloatHistogram(nil)
|
ts, fh := iter.AtFloatHistogram(nil)
|
||||||
histograms = append(histograms, FloatHistogramToHistogramProto(ts, fh))
|
histograms = append(histograms, prompb.FromFloatHistogram(ts, fh))
|
||||||
default:
|
default:
|
||||||
return nil, ss.Warnings(), fmt.Errorf("unrecognized value type: %s", valType)
|
return nil, ss.Warnings(), fmt.Errorf("unrecognized value type: %s", valType)
|
||||||
}
|
}
|
||||||
|
@ -168,7 +165,7 @@ func ToQueryResult(ss storage.SeriesSet, sampleLimit int) (*prompb.QueryResult,
|
||||||
}
|
}
|
||||||
|
|
||||||
resp.Timeseries = append(resp.Timeseries, &prompb.TimeSeries{
|
resp.Timeseries = append(resp.Timeseries, &prompb.TimeSeries{
|
||||||
Labels: labelsToLabelsProto(series.Labels(), nil),
|
Labels: prompb.FromLabels(series.Labels(), nil),
|
||||||
Samples: samples,
|
Samples: samples,
|
||||||
Histograms: histograms,
|
Histograms: histograms,
|
||||||
})
|
})
|
||||||
|
@ -184,7 +181,7 @@ func FromQueryResult(sortSeries bool, res *prompb.QueryResult) storage.SeriesSet
|
||||||
if err := validateLabelsAndMetricName(ts.Labels); err != nil {
|
if err := validateLabelsAndMetricName(ts.Labels); err != nil {
|
||||||
return errSeriesSet{err: err}
|
return errSeriesSet{err: err}
|
||||||
}
|
}
|
||||||
lbls := labelProtosToLabels(&b, ts.Labels)
|
lbls := ts.ToLabels(&b, nil)
|
||||||
series = append(series, &concreteSeries{labels: lbls, floats: ts.Samples, histograms: ts.Histograms})
|
series = append(series, &concreteSeries{labels: lbls, floats: ts.Samples, histograms: ts.Histograms})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -237,7 +234,7 @@ func StreamChunkedReadResponses(
|
||||||
for ss.Next() {
|
for ss.Next() {
|
||||||
series := ss.At()
|
series := ss.At()
|
||||||
iter = series.Iterator(iter)
|
iter = series.Iterator(iter)
|
||||||
lbls = MergeLabels(labelsToLabelsProto(series.Labels(), lbls), sortedExternalLabels)
|
lbls = MergeLabels(prompb.FromLabels(series.Labels(), lbls), sortedExternalLabels)
|
||||||
|
|
||||||
maxDataLength := maxBytesInFrame
|
maxDataLength := maxBytesInFrame
|
||||||
for _, lbl := range lbls {
|
for _, lbl := range lbls {
|
||||||
|
@ -483,21 +480,16 @@ func (c *concreteSeriesIterator) AtHistogram(*histogram.Histogram) (int64, *hist
|
||||||
panic("iterator is not on an integer histogram sample")
|
panic("iterator is not on an integer histogram sample")
|
||||||
}
|
}
|
||||||
h := c.series.histograms[c.histogramsCur]
|
h := c.series.histograms[c.histogramsCur]
|
||||||
return h.Timestamp, HistogramProtoToHistogram(h)
|
return h.Timestamp, h.ToIntHistogram()
|
||||||
}
|
}
|
||||||
|
|
||||||
// AtFloatHistogram implements chunkenc.Iterator.
|
// AtFloatHistogram implements chunkenc.Iterator.
|
||||||
func (c *concreteSeriesIterator) AtFloatHistogram(*histogram.FloatHistogram) (int64, *histogram.FloatHistogram) {
|
func (c *concreteSeriesIterator) AtFloatHistogram(*histogram.FloatHistogram) (int64, *histogram.FloatHistogram) {
|
||||||
switch c.curValType {
|
if c.curValType == chunkenc.ValHistogram || c.curValType == chunkenc.ValFloatHistogram {
|
||||||
case chunkenc.ValHistogram:
|
|
||||||
fh := c.series.histograms[c.histogramsCur]
|
fh := c.series.histograms[c.histogramsCur]
|
||||||
return fh.Timestamp, HistogramProtoToFloatHistogram(fh)
|
return fh.Timestamp, fh.ToFloatHistogram() // integer will be auto-converted.
|
||||||
case chunkenc.ValFloatHistogram:
|
|
||||||
fh := c.series.histograms[c.histogramsCur]
|
|
||||||
return fh.Timestamp, FloatHistogramProtoToFloatHistogram(fh)
|
|
||||||
default:
|
|
||||||
panic("iterator is not on a histogram sample")
|
|
||||||
}
|
}
|
||||||
|
panic("iterator is not on a histogram sample")
|
||||||
}
|
}
|
||||||
|
|
||||||
// AtT implements chunkenc.Iterator.
|
// AtT implements chunkenc.Iterator.
|
||||||
|
@ -619,292 +611,6 @@ func FromLabelMatchers(matchers []*prompb.LabelMatcher) ([]*labels.Matcher, erro
|
||||||
return result, nil
|
return result, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func exemplarProtoToExemplar(b *labels.ScratchBuilder, ep prompb.Exemplar) exemplar.Exemplar {
|
|
||||||
timestamp := ep.Timestamp
|
|
||||||
|
|
||||||
return exemplar.Exemplar{
|
|
||||||
Labels: labelProtosToLabels(b, ep.Labels),
|
|
||||||
Value: ep.Value,
|
|
||||||
Ts: timestamp,
|
|
||||||
HasTs: timestamp != 0,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func exemplarProtoV2ToExemplar(ep writev2.Exemplar, symbols []string) exemplar.Exemplar {
|
|
||||||
timestamp := ep.Timestamp
|
|
||||||
|
|
||||||
return exemplar.Exemplar{
|
|
||||||
Labels: writev2.DesymbolizeLabels(ep.LabelsRefs, symbols),
|
|
||||||
Value: ep.Value,
|
|
||||||
Ts: timestamp,
|
|
||||||
HasTs: timestamp != 0,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func metadataProtoV2ToMetadata(mp writev2.Metadata, symbols []string) metadata.Metadata {
|
|
||||||
return metadata.Metadata{
|
|
||||||
Type: metricTypeFromProtoV2Equivalent(mp.Type),
|
|
||||||
Unit: symbols[mp.UnitRef],
|
|
||||||
Help: symbols[mp.HelpRef],
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// HistogramProtoToHistogram extracts a (normal integer) Histogram from the
|
|
||||||
// provided proto message. The caller has to make sure that the proto message
|
|
||||||
// represents an integer histogram and not a float histogram, or it panics.
|
|
||||||
func HistogramProtoToHistogram(hp prompb.Histogram) *histogram.Histogram {
|
|
||||||
if hp.IsFloatHistogram() {
|
|
||||||
panic("HistogramProtoToHistogram called with a float histogram")
|
|
||||||
}
|
|
||||||
return &histogram.Histogram{
|
|
||||||
CounterResetHint: histogram.CounterResetHint(hp.ResetHint),
|
|
||||||
Schema: hp.Schema,
|
|
||||||
ZeroThreshold: hp.ZeroThreshold,
|
|
||||||
ZeroCount: hp.GetZeroCountInt(),
|
|
||||||
Count: hp.GetCountInt(),
|
|
||||||
Sum: hp.Sum,
|
|
||||||
PositiveSpans: spansProtoToSpans(hp.GetPositiveSpans()),
|
|
||||||
PositiveBuckets: hp.GetPositiveDeltas(),
|
|
||||||
NegativeSpans: spansProtoToSpans(hp.GetNegativeSpans()),
|
|
||||||
NegativeBuckets: hp.GetNegativeDeltas(),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// HistogramProtoV2ToHistogram extracts a (normal integer) Histogram from the
|
|
||||||
// provided proto message. The caller has to make sure that the proto message
|
|
||||||
// represents an integer histogram and not a float histogram, or it panics.
|
|
||||||
func HistogramProtoV2ToHistogram(hp writev2.Histogram) *histogram.Histogram {
|
|
||||||
if hp.IsFloatHistogram() {
|
|
||||||
panic("HistogramProtoToHistogram called with a float histogram")
|
|
||||||
}
|
|
||||||
return &histogram.Histogram{
|
|
||||||
CounterResetHint: histogram.CounterResetHint(hp.ResetHint),
|
|
||||||
Schema: hp.Schema,
|
|
||||||
ZeroThreshold: hp.ZeroThreshold,
|
|
||||||
ZeroCount: hp.GetZeroCountInt(),
|
|
||||||
Count: hp.GetCountInt(),
|
|
||||||
Sum: hp.Sum,
|
|
||||||
PositiveSpans: spansProtoV2ToSpans(hp.GetPositiveSpans()),
|
|
||||||
PositiveBuckets: hp.GetPositiveDeltas(),
|
|
||||||
NegativeSpans: spansProtoV2ToSpans(hp.GetNegativeSpans()),
|
|
||||||
NegativeBuckets: hp.GetNegativeDeltas(),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// FloatHistogramProtoToFloatHistogram extracts a float Histogram from the
|
|
||||||
// provided proto message to a Float Histogram. The caller has to make sure that
|
|
||||||
// the proto message represents a float histogram and not an integer histogram,
|
|
||||||
// or it panics.
|
|
||||||
func FloatHistogramProtoToFloatHistogram(hp prompb.Histogram) *histogram.FloatHistogram {
|
|
||||||
if !hp.IsFloatHistogram() {
|
|
||||||
panic("FloatHistogramProtoToFloatHistogram called with an integer histogram")
|
|
||||||
}
|
|
||||||
return &histogram.FloatHistogram{
|
|
||||||
CounterResetHint: histogram.CounterResetHint(hp.ResetHint),
|
|
||||||
Schema: hp.Schema,
|
|
||||||
ZeroThreshold: hp.ZeroThreshold,
|
|
||||||
ZeroCount: hp.GetZeroCountFloat(),
|
|
||||||
Count: hp.GetCountFloat(),
|
|
||||||
Sum: hp.Sum,
|
|
||||||
PositiveSpans: spansProtoToSpans(hp.GetPositiveSpans()),
|
|
||||||
PositiveBuckets: hp.GetPositiveCounts(),
|
|
||||||
NegativeSpans: spansProtoToSpans(hp.GetNegativeSpans()),
|
|
||||||
NegativeBuckets: hp.GetNegativeCounts(),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// FloatHistogramProtoV2ToFloatHistogram extracts a float Histogram from the
|
|
||||||
// provided proto message to a Float Histogram. The caller has to make sure that
|
|
||||||
// the proto message represents a float histogram and not an integer histogram,
|
|
||||||
// or it panics.
|
|
||||||
func FloatHistogramProtoV2ToFloatHistogram(hp writev2.Histogram) *histogram.FloatHistogram {
|
|
||||||
if !hp.IsFloatHistogram() {
|
|
||||||
panic("FloatHistogramProtoToFloatHistogram called with an integer histogram")
|
|
||||||
}
|
|
||||||
return &histogram.FloatHistogram{
|
|
||||||
CounterResetHint: histogram.CounterResetHint(hp.ResetHint),
|
|
||||||
Schema: hp.Schema,
|
|
||||||
ZeroThreshold: hp.ZeroThreshold,
|
|
||||||
ZeroCount: hp.GetZeroCountFloat(),
|
|
||||||
Count: hp.GetCountFloat(),
|
|
||||||
Sum: hp.Sum,
|
|
||||||
PositiveSpans: spansProtoV2ToSpans(hp.GetPositiveSpans()),
|
|
||||||
PositiveBuckets: hp.GetPositiveCounts(),
|
|
||||||
NegativeSpans: spansProtoV2ToSpans(hp.GetNegativeSpans()),
|
|
||||||
NegativeBuckets: hp.GetNegativeCounts(),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// HistogramProtoToFloatHistogram extracts and converts a (normal integer) histogram from the provided proto message
|
|
||||||
// to a float histogram. The caller has to make sure that the proto message represents an integer histogram and not a
|
|
||||||
// float histogram, or it panics.
|
|
||||||
func HistogramProtoToFloatHistogram(hp prompb.Histogram) *histogram.FloatHistogram {
|
|
||||||
if hp.IsFloatHistogram() {
|
|
||||||
panic("HistogramProtoToFloatHistogram called with a float histogram")
|
|
||||||
}
|
|
||||||
return &histogram.FloatHistogram{
|
|
||||||
CounterResetHint: histogram.CounterResetHint(hp.ResetHint),
|
|
||||||
Schema: hp.Schema,
|
|
||||||
ZeroThreshold: hp.ZeroThreshold,
|
|
||||||
ZeroCount: float64(hp.GetZeroCountInt()),
|
|
||||||
Count: float64(hp.GetCountInt()),
|
|
||||||
Sum: hp.Sum,
|
|
||||||
PositiveSpans: spansProtoToSpans(hp.GetPositiveSpans()),
|
|
||||||
PositiveBuckets: deltasToCounts(hp.GetPositiveDeltas()),
|
|
||||||
NegativeSpans: spansProtoToSpans(hp.GetNegativeSpans()),
|
|
||||||
NegativeBuckets: deltasToCounts(hp.GetNegativeDeltas()),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func FloatV2HistogramProtoToFloatHistogram(hp writev2.Histogram) *histogram.FloatHistogram {
|
|
||||||
if !hp.IsFloatHistogram() {
|
|
||||||
panic("FloatHistogramProtoToFloatHistogram called with an integer histogram")
|
|
||||||
}
|
|
||||||
return &histogram.FloatHistogram{
|
|
||||||
CounterResetHint: histogram.CounterResetHint(hp.ResetHint),
|
|
||||||
Schema: hp.Schema,
|
|
||||||
ZeroThreshold: hp.ZeroThreshold,
|
|
||||||
ZeroCount: hp.GetZeroCountFloat(),
|
|
||||||
Count: hp.GetCountFloat(),
|
|
||||||
Sum: hp.Sum,
|
|
||||||
PositiveSpans: spansProtoV2ToSpans(hp.GetPositiveSpans()),
|
|
||||||
PositiveBuckets: hp.GetPositiveCounts(),
|
|
||||||
NegativeSpans: spansProtoV2ToSpans(hp.GetNegativeSpans()),
|
|
||||||
NegativeBuckets: hp.GetNegativeCounts(),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// V2HistogramProtoToHistogram extracts a (normal integer) Histogram from the
|
|
||||||
// provided proto message. The caller has to make sure that the proto message
|
|
||||||
// represents an integer histogram and not a float histogram, or it panics.
|
|
||||||
func V2HistogramProtoToHistogram(hp writev2.Histogram) *histogram.Histogram {
|
|
||||||
if hp.IsFloatHistogram() {
|
|
||||||
panic("HistogramProtoToHistogram called with a float histogram")
|
|
||||||
}
|
|
||||||
return &histogram.Histogram{
|
|
||||||
CounterResetHint: histogram.CounterResetHint(hp.ResetHint),
|
|
||||||
Schema: hp.Schema,
|
|
||||||
ZeroThreshold: hp.ZeroThreshold,
|
|
||||||
ZeroCount: hp.GetZeroCountInt(),
|
|
||||||
Count: hp.GetCountInt(),
|
|
||||||
Sum: hp.Sum,
|
|
||||||
PositiveSpans: spansProtoV2ToSpans(hp.GetPositiveSpans()),
|
|
||||||
PositiveBuckets: hp.GetPositiveDeltas(),
|
|
||||||
NegativeSpans: spansProtoV2ToSpans(hp.GetNegativeSpans()),
|
|
||||||
NegativeBuckets: hp.GetNegativeDeltas(),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func spansProtoToSpans(s []prompb.BucketSpan) []histogram.Span {
|
|
||||||
spans := make([]histogram.Span, len(s))
|
|
||||||
for i := 0; i < len(s); i++ {
|
|
||||||
spans[i] = histogram.Span{Offset: s[i].Offset, Length: s[i].Length}
|
|
||||||
}
|
|
||||||
|
|
||||||
return spans
|
|
||||||
}
|
|
||||||
|
|
||||||
func spansProtoV2ToSpans(s []writev2.BucketSpan) []histogram.Span {
|
|
||||||
spans := make([]histogram.Span, len(s))
|
|
||||||
for i := 0; i < len(s); i++ {
|
|
||||||
spans[i] = histogram.Span{Offset: s[i].Offset, Length: s[i].Length}
|
|
||||||
}
|
|
||||||
|
|
||||||
return spans
|
|
||||||
}
|
|
||||||
|
|
||||||
func deltasToCounts(deltas []int64) []float64 {
|
|
||||||
counts := make([]float64, len(deltas))
|
|
||||||
var cur float64
|
|
||||||
for i, d := range deltas {
|
|
||||||
cur += float64(d)
|
|
||||||
counts[i] = cur
|
|
||||||
}
|
|
||||||
return counts
|
|
||||||
}
|
|
||||||
|
|
||||||
func HistogramToHistogramProto(timestamp int64, h *histogram.Histogram) prompb.Histogram {
|
|
||||||
return prompb.Histogram{
|
|
||||||
Count: &prompb.Histogram_CountInt{CountInt: h.Count},
|
|
||||||
Sum: h.Sum,
|
|
||||||
Schema: h.Schema,
|
|
||||||
ZeroThreshold: h.ZeroThreshold,
|
|
||||||
ZeroCount: &prompb.Histogram_ZeroCountInt{ZeroCountInt: h.ZeroCount},
|
|
||||||
NegativeSpans: spansToSpansProto(h.NegativeSpans),
|
|
||||||
NegativeDeltas: h.NegativeBuckets,
|
|
||||||
PositiveSpans: spansToSpansProto(h.PositiveSpans),
|
|
||||||
PositiveDeltas: h.PositiveBuckets,
|
|
||||||
ResetHint: prompb.Histogram_ResetHint(h.CounterResetHint),
|
|
||||||
Timestamp: timestamp,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func HistogramToV2HistogramProto(timestamp int64, h *histogram.Histogram) writev2.Histogram {
|
|
||||||
return writev2.Histogram{
|
|
||||||
Count: &writev2.Histogram_CountInt{CountInt: h.Count},
|
|
||||||
Sum: h.Sum,
|
|
||||||
Schema: h.Schema,
|
|
||||||
ZeroThreshold: h.ZeroThreshold,
|
|
||||||
ZeroCount: &writev2.Histogram_ZeroCountInt{ZeroCountInt: h.ZeroCount},
|
|
||||||
NegativeSpans: spansToV2SpansProto(h.NegativeSpans),
|
|
||||||
NegativeDeltas: h.NegativeBuckets,
|
|
||||||
PositiveSpans: spansToV2SpansProto(h.PositiveSpans),
|
|
||||||
PositiveDeltas: h.PositiveBuckets,
|
|
||||||
ResetHint: writev2.Histogram_ResetHint(h.CounterResetHint),
|
|
||||||
Timestamp: timestamp,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func FloatHistogramToHistogramProto(timestamp int64, fh *histogram.FloatHistogram) prompb.Histogram {
|
|
||||||
return prompb.Histogram{
|
|
||||||
Count: &prompb.Histogram_CountFloat{CountFloat: fh.Count},
|
|
||||||
Sum: fh.Sum,
|
|
||||||
Schema: fh.Schema,
|
|
||||||
ZeroThreshold: fh.ZeroThreshold,
|
|
||||||
ZeroCount: &prompb.Histogram_ZeroCountFloat{ZeroCountFloat: fh.ZeroCount},
|
|
||||||
NegativeSpans: spansToSpansProto(fh.NegativeSpans),
|
|
||||||
NegativeCounts: fh.NegativeBuckets,
|
|
||||||
PositiveSpans: spansToSpansProto(fh.PositiveSpans),
|
|
||||||
PositiveCounts: fh.PositiveBuckets,
|
|
||||||
ResetHint: prompb.Histogram_ResetHint(fh.CounterResetHint),
|
|
||||||
Timestamp: timestamp,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func FloatHistogramToV2HistogramProto(timestamp int64, fh *histogram.FloatHistogram) writev2.Histogram {
|
|
||||||
return writev2.Histogram{
|
|
||||||
Count: &writev2.Histogram_CountFloat{CountFloat: fh.Count},
|
|
||||||
Sum: fh.Sum,
|
|
||||||
Schema: fh.Schema,
|
|
||||||
ZeroThreshold: fh.ZeroThreshold,
|
|
||||||
ZeroCount: &writev2.Histogram_ZeroCountFloat{ZeroCountFloat: fh.ZeroCount},
|
|
||||||
NegativeSpans: spansToV2SpansProto(fh.NegativeSpans),
|
|
||||||
NegativeCounts: fh.NegativeBuckets,
|
|
||||||
PositiveSpans: spansToV2SpansProto(fh.PositiveSpans),
|
|
||||||
PositiveCounts: fh.PositiveBuckets,
|
|
||||||
ResetHint: writev2.Histogram_ResetHint(fh.CounterResetHint),
|
|
||||||
Timestamp: timestamp,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func spansToSpansProto(s []histogram.Span) []prompb.BucketSpan {
|
|
||||||
spans := make([]prompb.BucketSpan, len(s))
|
|
||||||
for i := 0; i < len(s); i++ {
|
|
||||||
spans[i] = prompb.BucketSpan{Offset: s[i].Offset, Length: s[i].Length}
|
|
||||||
}
|
|
||||||
|
|
||||||
return spans
|
|
||||||
}
|
|
||||||
|
|
||||||
func spansToV2SpansProto(s []histogram.Span) []writev2.BucketSpan {
|
|
||||||
spans := make([]writev2.BucketSpan, len(s))
|
|
||||||
for i := 0; i < len(s); i++ {
|
|
||||||
spans[i] = writev2.BucketSpan{Offset: s[i].Offset, Length: s[i].Length}
|
|
||||||
}
|
|
||||||
|
|
||||||
return spans
|
|
||||||
}
|
|
||||||
|
|
||||||
// LabelProtosToMetric unpack a []*prompb.Label to a model.Metric.
|
// LabelProtosToMetric unpack a []*prompb.Label to a model.Metric.
|
||||||
func LabelProtosToMetric(labelPairs []*prompb.Label) model.Metric {
|
func LabelProtosToMetric(labelPairs []*prompb.Label) model.Metric {
|
||||||
metric := make(model.Metric, len(labelPairs))
|
metric := make(model.Metric, len(labelPairs))
|
||||||
|
@ -914,55 +620,6 @@ func LabelProtosToMetric(labelPairs []*prompb.Label) model.Metric {
|
||||||
return metric
|
return metric
|
||||||
}
|
}
|
||||||
|
|
||||||
func labelProtosToLabels(b *labels.ScratchBuilder, labelPairs []prompb.Label) labels.Labels {
|
|
||||||
b.Reset()
|
|
||||||
for _, l := range labelPairs {
|
|
||||||
b.Add(l.Name, l.Value)
|
|
||||||
}
|
|
||||||
b.Sort()
|
|
||||||
return b.Labels()
|
|
||||||
}
|
|
||||||
|
|
||||||
// labelsToLabelsProto transforms labels into prompb labels. The buffer slice
|
|
||||||
// will be used to avoid allocations if it is big enough to store the labels.
|
|
||||||
func labelsToLabelsProto(lbls labels.Labels, buf []prompb.Label) []prompb.Label {
|
|
||||||
result := buf[:0]
|
|
||||||
lbls.Range(func(l labels.Label) {
|
|
||||||
result = append(result, prompb.Label{
|
|
||||||
Name: l.Name,
|
|
||||||
Value: l.Value,
|
|
||||||
})
|
|
||||||
})
|
|
||||||
return result
|
|
||||||
}
|
|
||||||
|
|
||||||
// metricTypeToMetricTypeProto transforms a Prometheus metricType into prompb metricType. Since the former is a string we need to transform it to an enum.
|
|
||||||
func metricTypeToMetricTypeProto(t model.MetricType) prompb.MetricMetadata_MetricType {
|
|
||||||
mt := strings.ToUpper(string(t))
|
|
||||||
v, ok := prompb.MetricMetadata_MetricType_value[mt]
|
|
||||||
if !ok {
|
|
||||||
return prompb.MetricMetadata_UNKNOWN
|
|
||||||
}
|
|
||||||
|
|
||||||
return prompb.MetricMetadata_MetricType(v)
|
|
||||||
}
|
|
||||||
|
|
||||||
// metricTypeToMetricTypeProtoV2 transforms a Prometheus metricType into writev2 metricType. Since the former is a string we need to transform it to an enum.
|
|
||||||
func metricTypeToMetricTypeProtoV2(t model.MetricType) writev2.Metadata_MetricType {
|
|
||||||
mt := strings.ToUpper(string(t))
|
|
||||||
v, ok := prompb.MetricMetadata_MetricType_value[mt]
|
|
||||||
if !ok {
|
|
||||||
return writev2.Metadata_METRIC_TYPE_UNSPECIFIED
|
|
||||||
}
|
|
||||||
|
|
||||||
return writev2.Metadata_MetricType(v)
|
|
||||||
}
|
|
||||||
|
|
||||||
func metricTypeFromProtoV2Equivalent(t writev2.Metadata_MetricType) model.MetricType {
|
|
||||||
mt := strings.ToLower(t.String())
|
|
||||||
return model.MetricType(mt) // TODO(@tpaschalis) a better way for this?
|
|
||||||
}
|
|
||||||
|
|
||||||
// DecodeWriteRequest from an io.Reader into a prompb.WriteRequest, handling
|
// DecodeWriteRequest from an io.Reader into a prompb.WriteRequest, handling
|
||||||
// snappy decompression.
|
// snappy decompression.
|
||||||
func DecodeWriteRequest(r io.Reader) (*prompb.WriteRequest, error) {
|
func DecodeWriteRequest(r io.Reader) (*prompb.WriteRequest, error) {
|
||||||
|
@ -1055,74 +712,3 @@ func DecodeV2WriteRequestStr(r io.Reader) (*writev2.Request, error) {
|
||||||
|
|
||||||
return &req, nil
|
return &req, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func V2WriteRequestToWriteRequest(redReq *writev2.Request) (*prompb.WriteRequest, error) {
|
|
||||||
req := &prompb.WriteRequest{
|
|
||||||
Timeseries: make([]prompb.TimeSeries, len(redReq.Timeseries)),
|
|
||||||
// TODO handle metadata?
|
|
||||||
}
|
|
||||||
|
|
||||||
for i, rts := range redReq.Timeseries {
|
|
||||||
writev2.DesymbolizeLabels(rts.LabelsRefs, redReq.Symbols).Range(func(l labels.Label) {
|
|
||||||
req.Timeseries[i].Labels = append(req.Timeseries[i].Labels, prompb.Label{
|
|
||||||
Name: l.Name,
|
|
||||||
Value: l.Value,
|
|
||||||
})
|
|
||||||
})
|
|
||||||
|
|
||||||
exemplars := make([]prompb.Exemplar, len(rts.Exemplars))
|
|
||||||
for j, e := range rts.Exemplars {
|
|
||||||
exemplars[j].Value = e.Value
|
|
||||||
exemplars[j].Timestamp = e.Timestamp
|
|
||||||
writev2.DesymbolizeLabels(e.LabelsRefs, redReq.Symbols).Range(func(l labels.Label) {
|
|
||||||
exemplars[j].Labels = append(exemplars[j].Labels, prompb.Label{
|
|
||||||
Name: l.Name,
|
|
||||||
Value: l.Value,
|
|
||||||
})
|
|
||||||
})
|
|
||||||
}
|
|
||||||
req.Timeseries[i].Exemplars = exemplars
|
|
||||||
|
|
||||||
req.Timeseries[i].Samples = make([]prompb.Sample, len(rts.Samples))
|
|
||||||
for j, s := range rts.Samples {
|
|
||||||
req.Timeseries[i].Samples[j].Timestamp = s.Timestamp
|
|
||||||
req.Timeseries[i].Samples[j].Value = s.Value
|
|
||||||
}
|
|
||||||
|
|
||||||
req.Timeseries[i].Histograms = make([]prompb.Histogram, len(rts.Histograms))
|
|
||||||
for j, h := range rts.Histograms {
|
|
||||||
// TODO: double check
|
|
||||||
if h.IsFloatHistogram() {
|
|
||||||
req.Timeseries[i].Histograms[j].Count = &prompb.Histogram_CountFloat{CountFloat: h.GetCountFloat()}
|
|
||||||
req.Timeseries[i].Histograms[j].ZeroCount = &prompb.Histogram_ZeroCountFloat{ZeroCountFloat: h.GetZeroCountFloat()}
|
|
||||||
} else {
|
|
||||||
req.Timeseries[i].Histograms[j].Count = &prompb.Histogram_CountInt{CountInt: h.GetCountInt()}
|
|
||||||
req.Timeseries[i].Histograms[j].ZeroCount = &prompb.Histogram_ZeroCountInt{ZeroCountInt: h.GetZeroCountInt()}
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, span := range h.NegativeSpans {
|
|
||||||
req.Timeseries[i].Histograms[j].NegativeSpans = append(req.Timeseries[i].Histograms[j].NegativeSpans, prompb.BucketSpan{
|
|
||||||
Offset: span.Offset,
|
|
||||||
Length: span.Length,
|
|
||||||
})
|
|
||||||
}
|
|
||||||
for _, span := range h.PositiveSpans {
|
|
||||||
req.Timeseries[i].Histograms[j].PositiveSpans = append(req.Timeseries[i].Histograms[j].PositiveSpans, prompb.BucketSpan{
|
|
||||||
Offset: span.Offset,
|
|
||||||
Length: span.Length,
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
req.Timeseries[i].Histograms[j].Sum = h.Sum
|
|
||||||
req.Timeseries[i].Histograms[j].Schema = h.Schema
|
|
||||||
req.Timeseries[i].Histograms[j].ZeroThreshold = h.ZeroThreshold
|
|
||||||
req.Timeseries[i].Histograms[j].NegativeDeltas = h.NegativeDeltas
|
|
||||||
req.Timeseries[i].Histograms[j].NegativeCounts = h.NegativeCounts
|
|
||||||
req.Timeseries[i].Histograms[j].PositiveDeltas = h.PositiveDeltas
|
|
||||||
req.Timeseries[i].Histograms[j].PositiveCounts = h.PositiveCounts
|
|
||||||
req.Timeseries[i].Histograms[j].ResetHint = prompb.Histogram_ResetHint(h.ResetHint)
|
|
||||||
req.Timeseries[i].Histograms[j].Timestamp = h.Timestamp
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return req, nil
|
|
||||||
}
|
|
||||||
|
|
|
@ -21,7 +21,6 @@ import (
|
||||||
|
|
||||||
"github.com/go-kit/log"
|
"github.com/go-kit/log"
|
||||||
"github.com/gogo/protobuf/proto"
|
"github.com/gogo/protobuf/proto"
|
||||||
"github.com/prometheus/common/model"
|
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
|
|
||||||
"github.com/prometheus/prometheus/model/histogram"
|
"github.com/prometheus/prometheus/model/histogram"
|
||||||
|
@ -59,7 +58,7 @@ var writeRequestFixture = &prompb.WriteRequest{
|
||||||
},
|
},
|
||||||
Samples: []prompb.Sample{{Value: 1, Timestamp: 0}},
|
Samples: []prompb.Sample{{Value: 1, Timestamp: 0}},
|
||||||
Exemplars: []prompb.Exemplar{{Labels: []prompb.Label{{Name: "f", Value: "g"}}, Value: 1, Timestamp: 0}},
|
Exemplars: []prompb.Exemplar{{Labels: []prompb.Label{{Name: "f", Value: "g"}}, Value: 1, Timestamp: 0}},
|
||||||
Histograms: []prompb.Histogram{HistogramToHistogramProto(0, &testHistogram), FloatHistogramToHistogramProto(1, testHistogram.ToFloat(nil))},
|
Histograms: []prompb.Histogram{prompb.FromIntHistogram(0, &testHistogram), prompb.FromFloatHistogram(1, testHistogram.ToFloat(nil))},
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
Labels: []prompb.Label{
|
Labels: []prompb.Label{
|
||||||
|
@ -71,7 +70,7 @@ var writeRequestFixture = &prompb.WriteRequest{
|
||||||
},
|
},
|
||||||
Samples: []prompb.Sample{{Value: 2, Timestamp: 1}},
|
Samples: []prompb.Sample{{Value: 2, Timestamp: 1}},
|
||||||
Exemplars: []prompb.Exemplar{{Labels: []prompb.Label{{Name: "h", Value: "i"}}, Value: 2, Timestamp: 1}},
|
Exemplars: []prompb.Exemplar{{Labels: []prompb.Label{{Name: "h", Value: "i"}}, Value: 2, Timestamp: 1}},
|
||||||
Histograms: []prompb.Histogram{HistogramToHistogramProto(2, &testHistogram), FloatHistogramToHistogramProto(3, testHistogram.ToFloat(nil))},
|
Histograms: []prompb.Histogram{prompb.FromIntHistogram(2, &testHistogram), prompb.FromFloatHistogram(3, testHistogram.ToFloat(nil))},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
@ -79,37 +78,23 @@ var writeRequestFixture = &prompb.WriteRequest{
|
||||||
// writeV2RequestFixture represents the same request as writeRequestFixture, but using the v2 representation.
|
// writeV2RequestFixture represents the same request as writeRequestFixture, but using the v2 representation.
|
||||||
var writeV2RequestFixture = func() *writev2.Request {
|
var writeV2RequestFixture = func() *writev2.Request {
|
||||||
st := writev2.NewSymbolTable()
|
st := writev2.NewSymbolTable()
|
||||||
var labels []uint32
|
b := labels.NewScratchBuilder(0)
|
||||||
for _, s := range []string{
|
labelRefs := st.SymbolizeLabels(writeRequestFixture.Timeseries[0].ToLabels(&b, nil), nil)
|
||||||
"__name__", "test_metric1",
|
exemplar1LabelRefs := st.SymbolizeLabels(writeRequestFixture.Timeseries[0].Exemplars[0].ToExemplar(&b, nil).Labels, nil)
|
||||||
"b", "c",
|
exemplar2LabelRefs := st.SymbolizeLabels(writeRequestFixture.Timeseries[0].Exemplars[0].ToExemplar(&b, nil).Labels, nil)
|
||||||
"baz", "qux",
|
|
||||||
"d", "e",
|
|
||||||
"foo", "bar",
|
|
||||||
} {
|
|
||||||
ref := st.Symbolize(s)
|
|
||||||
labels = append(labels, ref)
|
|
||||||
}
|
|
||||||
for _, s := range []string{
|
|
||||||
"f", "g", // 10, 11
|
|
||||||
"h", "i", // 12, 13
|
|
||||||
} {
|
|
||||||
_ = st.Symbolize(s)
|
|
||||||
}
|
|
||||||
|
|
||||||
return &writev2.Request{
|
return &writev2.Request{
|
||||||
Timeseries: []writev2.TimeSeries{
|
Timeseries: []writev2.TimeSeries{
|
||||||
{
|
{
|
||||||
LabelsRefs: labels,
|
LabelsRefs: labelRefs,
|
||||||
Samples: []writev2.Sample{{Value: 1, Timestamp: 0}},
|
Samples: []writev2.Sample{{Value: 1, Timestamp: 0}},
|
||||||
Exemplars: []writev2.Exemplar{{LabelsRefs: []uint32{10, 11}, Value: 1, Timestamp: 0}},
|
Exemplars: []writev2.Exemplar{{LabelsRefs: exemplar1LabelRefs, Value: 1, Timestamp: 0}},
|
||||||
Histograms: []writev2.Histogram{HistogramToV2HistogramProto(0, &testHistogram), FloatHistogramToV2HistogramProto(1, testHistogram.ToFloat(nil))},
|
Histograms: []writev2.Histogram{writev2.FromIntHistogram(0, &testHistogram), writev2.FromFloatHistogram(1, testHistogram.ToFloat(nil))},
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
LabelsRefs: labels,
|
LabelsRefs: labelRefs,
|
||||||
Samples: []writev2.Sample{{Value: 2, Timestamp: 1}},
|
Samples: []writev2.Sample{{Value: 2, Timestamp: 1}},
|
||||||
Exemplars: []writev2.Exemplar{{LabelsRefs: []uint32{12, 13}, Value: 2, Timestamp: 1}},
|
Exemplars: []writev2.Exemplar{{LabelsRefs: exemplar2LabelRefs, Value: 2, Timestamp: 1}},
|
||||||
Histograms: []writev2.Histogram{HistogramToV2HistogramProto(2, &testHistogram), FloatHistogramToV2HistogramProto(3, testHistogram.ToFloat(nil))},
|
Histograms: []writev2.Histogram{writev2.FromIntHistogram(2, &testHistogram), writev2.FromFloatHistogram(3, testHistogram.ToFloat(nil))},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
Symbols: st.Symbols(),
|
Symbols: st.Symbols(),
|
||||||
|
@ -310,7 +295,7 @@ func TestConcreteSeriesIterator_HistogramSamples(t *testing.T) {
|
||||||
} else {
|
} else {
|
||||||
ts = int64(i)
|
ts = int64(i)
|
||||||
}
|
}
|
||||||
histProtos[i] = HistogramToHistogramProto(ts, h)
|
histProtos[i] = prompb.FromIntHistogram(ts, h)
|
||||||
}
|
}
|
||||||
series := &concreteSeries{
|
series := &concreteSeries{
|
||||||
labels: labels.FromStrings("foo", "bar"),
|
labels: labels.FromStrings("foo", "bar"),
|
||||||
|
@ -361,9 +346,9 @@ func TestConcreteSeriesIterator_FloatAndHistogramSamples(t *testing.T) {
|
||||||
histProtos := make([]prompb.Histogram, len(histograms))
|
histProtos := make([]prompb.Histogram, len(histograms))
|
||||||
for i, h := range histograms {
|
for i, h := range histograms {
|
||||||
if i < 10 {
|
if i < 10 {
|
||||||
histProtos[i] = HistogramToHistogramProto(int64(i+1), h)
|
histProtos[i] = prompb.FromIntHistogram(int64(i+1), h)
|
||||||
} else {
|
} else {
|
||||||
histProtos[i] = HistogramToHistogramProto(int64(i+6), h)
|
histProtos[i] = prompb.FromIntHistogram(int64(i+6), h)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
series := &concreteSeries{
|
series := &concreteSeries{
|
||||||
|
@ -443,7 +428,7 @@ func TestConcreteSeriesIterator_FloatAndHistogramSamples(t *testing.T) {
|
||||||
require.Equal(t, chunkenc.ValHistogram, it.Next())
|
require.Equal(t, chunkenc.ValHistogram, it.Next())
|
||||||
ts, fh = it.AtFloatHistogram(nil)
|
ts, fh = it.AtFloatHistogram(nil)
|
||||||
require.Equal(t, int64(17), ts)
|
require.Equal(t, int64(17), ts)
|
||||||
expected := HistogramProtoToFloatHistogram(HistogramToHistogramProto(int64(17), histograms[11]))
|
expected := prompb.FromIntHistogram(int64(17), histograms[11]).ToFloatHistogram()
|
||||||
require.Equal(t, expected, fh)
|
require.Equal(t, expected, fh)
|
||||||
|
|
||||||
// Keep calling Next() until the end.
|
// Keep calling Next() until the end.
|
||||||
|
@ -527,37 +512,6 @@ func TestMergeLabels(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestMetricTypeToMetricTypeProto(t *testing.T) {
|
|
||||||
tc := []struct {
|
|
||||||
desc string
|
|
||||||
input model.MetricType
|
|
||||||
expected prompb.MetricMetadata_MetricType
|
|
||||||
}{
|
|
||||||
{
|
|
||||||
desc: "with a single-word metric",
|
|
||||||
input: model.MetricTypeCounter,
|
|
||||||
expected: prompb.MetricMetadata_COUNTER,
|
|
||||||
},
|
|
||||||
{
|
|
||||||
desc: "with a two-word metric",
|
|
||||||
input: model.MetricTypeStateset,
|
|
||||||
expected: prompb.MetricMetadata_STATESET,
|
|
||||||
},
|
|
||||||
{
|
|
||||||
desc: "with an unknown metric",
|
|
||||||
input: "not-known",
|
|
||||||
expected: prompb.MetricMetadata_UNKNOWN,
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, tt := range tc {
|
|
||||||
t.Run(tt.desc, func(t *testing.T) {
|
|
||||||
m := metricTypeToMetricTypeProto(tt.input)
|
|
||||||
require.Equal(t, tt.expected, m)
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestDecodeWriteRequest(t *testing.T) {
|
func TestDecodeWriteRequest(t *testing.T) {
|
||||||
buf, _, _, err := buildWriteRequest(nil, writeRequestFixture.Timeseries, nil, nil, nil, nil, "snappy")
|
buf, _, _, err := buildWriteRequest(nil, writeRequestFixture.Timeseries, nil, nil, nil, nil, "snappy")
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
@ -576,212 +530,9 @@ func TestDecodeV2WriteRequest(t *testing.T) {
|
||||||
require.Equal(t, writeV2RequestFixture, actual)
|
require.Equal(t, writeV2RequestFixture, actual)
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestNilHistogramProto(t *testing.T) {
|
|
||||||
// This function will panic if it impromperly handles nil
|
|
||||||
// values, causing the test to fail.
|
|
||||||
HistogramProtoToHistogram(prompb.Histogram{})
|
|
||||||
HistogramProtoToFloatHistogram(prompb.Histogram{})
|
|
||||||
}
|
|
||||||
|
|
||||||
func exampleHistogram() histogram.Histogram {
|
|
||||||
return histogram.Histogram{
|
|
||||||
CounterResetHint: histogram.GaugeType,
|
|
||||||
Schema: 0,
|
|
||||||
Count: 19,
|
|
||||||
Sum: 2.7,
|
|
||||||
PositiveSpans: []histogram.Span{
|
|
||||||
{Offset: 0, Length: 4},
|
|
||||||
{Offset: 0, Length: 0},
|
|
||||||
{Offset: 0, Length: 3},
|
|
||||||
},
|
|
||||||
PositiveBuckets: []int64{1, 2, -2, 1, -1, 0, 0},
|
|
||||||
NegativeSpans: []histogram.Span{
|
|
||||||
{Offset: 0, Length: 5},
|
|
||||||
{Offset: 1, Length: 0},
|
|
||||||
{Offset: 0, Length: 1},
|
|
||||||
},
|
|
||||||
NegativeBuckets: []int64{1, 2, -2, 1, -1, 0},
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func exampleHistogramProto() prompb.Histogram {
|
|
||||||
return prompb.Histogram{
|
|
||||||
Count: &prompb.Histogram_CountInt{CountInt: 19},
|
|
||||||
Sum: 2.7,
|
|
||||||
Schema: 0,
|
|
||||||
ZeroThreshold: 0,
|
|
||||||
ZeroCount: &prompb.Histogram_ZeroCountInt{ZeroCountInt: 0},
|
|
||||||
NegativeSpans: []prompb.BucketSpan{
|
|
||||||
{
|
|
||||||
Offset: 0,
|
|
||||||
Length: 5,
|
|
||||||
},
|
|
||||||
{
|
|
||||||
Offset: 1,
|
|
||||||
Length: 0,
|
|
||||||
},
|
|
||||||
{
|
|
||||||
Offset: 0,
|
|
||||||
Length: 1,
|
|
||||||
},
|
|
||||||
},
|
|
||||||
NegativeDeltas: []int64{1, 2, -2, 1, -1, 0},
|
|
||||||
PositiveSpans: []prompb.BucketSpan{
|
|
||||||
{
|
|
||||||
Offset: 0,
|
|
||||||
Length: 4,
|
|
||||||
},
|
|
||||||
{
|
|
||||||
Offset: 0,
|
|
||||||
Length: 0,
|
|
||||||
},
|
|
||||||
{
|
|
||||||
Offset: 0,
|
|
||||||
Length: 3,
|
|
||||||
},
|
|
||||||
},
|
|
||||||
PositiveDeltas: []int64{1, 2, -2, 1, -1, 0, 0},
|
|
||||||
ResetHint: prompb.Histogram_GAUGE,
|
|
||||||
Timestamp: 1337,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestHistogramToProtoConvert(t *testing.T) {
|
|
||||||
tests := []struct {
|
|
||||||
input histogram.CounterResetHint
|
|
||||||
expected prompb.Histogram_ResetHint
|
|
||||||
}{
|
|
||||||
{
|
|
||||||
input: histogram.UnknownCounterReset,
|
|
||||||
expected: prompb.Histogram_UNKNOWN,
|
|
||||||
},
|
|
||||||
{
|
|
||||||
input: histogram.CounterReset,
|
|
||||||
expected: prompb.Histogram_YES,
|
|
||||||
},
|
|
||||||
{
|
|
||||||
input: histogram.NotCounterReset,
|
|
||||||
expected: prompb.Histogram_NO,
|
|
||||||
},
|
|
||||||
{
|
|
||||||
input: histogram.GaugeType,
|
|
||||||
expected: prompb.Histogram_GAUGE,
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, test := range tests {
|
|
||||||
h := exampleHistogram()
|
|
||||||
h.CounterResetHint = test.input
|
|
||||||
p := exampleHistogramProto()
|
|
||||||
p.ResetHint = test.expected
|
|
||||||
|
|
||||||
require.Equal(t, p, HistogramToHistogramProto(1337, &h))
|
|
||||||
|
|
||||||
require.Equal(t, h, *HistogramProtoToHistogram(p))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func exampleFloatHistogram() histogram.FloatHistogram {
|
|
||||||
return histogram.FloatHistogram{
|
|
||||||
CounterResetHint: histogram.GaugeType,
|
|
||||||
Schema: 0,
|
|
||||||
Count: 19,
|
|
||||||
Sum: 2.7,
|
|
||||||
PositiveSpans: []histogram.Span{
|
|
||||||
{Offset: 0, Length: 4},
|
|
||||||
{Offset: 0, Length: 0},
|
|
||||||
{Offset: 0, Length: 3},
|
|
||||||
},
|
|
||||||
PositiveBuckets: []float64{1, 2, -2, 1, -1, 0, 0},
|
|
||||||
NegativeSpans: []histogram.Span{
|
|
||||||
{Offset: 0, Length: 5},
|
|
||||||
{Offset: 1, Length: 0},
|
|
||||||
{Offset: 0, Length: 1},
|
|
||||||
},
|
|
||||||
NegativeBuckets: []float64{1, 2, -2, 1, -1, 0},
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func exampleFloatHistogramProto() prompb.Histogram {
|
|
||||||
return prompb.Histogram{
|
|
||||||
Count: &prompb.Histogram_CountFloat{CountFloat: 19},
|
|
||||||
Sum: 2.7,
|
|
||||||
Schema: 0,
|
|
||||||
ZeroThreshold: 0,
|
|
||||||
ZeroCount: &prompb.Histogram_ZeroCountFloat{ZeroCountFloat: 0},
|
|
||||||
NegativeSpans: []prompb.BucketSpan{
|
|
||||||
{
|
|
||||||
Offset: 0,
|
|
||||||
Length: 5,
|
|
||||||
},
|
|
||||||
{
|
|
||||||
Offset: 1,
|
|
||||||
Length: 0,
|
|
||||||
},
|
|
||||||
{
|
|
||||||
Offset: 0,
|
|
||||||
Length: 1,
|
|
||||||
},
|
|
||||||
},
|
|
||||||
NegativeCounts: []float64{1, 2, -2, 1, -1, 0},
|
|
||||||
PositiveSpans: []prompb.BucketSpan{
|
|
||||||
{
|
|
||||||
Offset: 0,
|
|
||||||
Length: 4,
|
|
||||||
},
|
|
||||||
{
|
|
||||||
Offset: 0,
|
|
||||||
Length: 0,
|
|
||||||
},
|
|
||||||
{
|
|
||||||
Offset: 0,
|
|
||||||
Length: 3,
|
|
||||||
},
|
|
||||||
},
|
|
||||||
PositiveCounts: []float64{1, 2, -2, 1, -1, 0, 0},
|
|
||||||
ResetHint: prompb.Histogram_GAUGE,
|
|
||||||
Timestamp: 1337,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestFloatHistogramToProtoConvert(t *testing.T) {
|
|
||||||
tests := []struct {
|
|
||||||
input histogram.CounterResetHint
|
|
||||||
expected prompb.Histogram_ResetHint
|
|
||||||
}{
|
|
||||||
{
|
|
||||||
input: histogram.UnknownCounterReset,
|
|
||||||
expected: prompb.Histogram_UNKNOWN,
|
|
||||||
},
|
|
||||||
{
|
|
||||||
input: histogram.CounterReset,
|
|
||||||
expected: prompb.Histogram_YES,
|
|
||||||
},
|
|
||||||
{
|
|
||||||
input: histogram.NotCounterReset,
|
|
||||||
expected: prompb.Histogram_NO,
|
|
||||||
},
|
|
||||||
{
|
|
||||||
input: histogram.GaugeType,
|
|
||||||
expected: prompb.Histogram_GAUGE,
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, test := range tests {
|
|
||||||
h := exampleFloatHistogram()
|
|
||||||
h.CounterResetHint = test.input
|
|
||||||
p := exampleFloatHistogramProto()
|
|
||||||
p.ResetHint = test.expected
|
|
||||||
|
|
||||||
require.Equal(t, p, FloatHistogramToHistogramProto(1337, &h))
|
|
||||||
|
|
||||||
require.Equal(t, h, *FloatHistogramProtoToFloatHistogram(p))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestStreamResponse(t *testing.T) {
|
func TestStreamResponse(t *testing.T) {
|
||||||
lbs1 := labelsToLabelsProto(labels.FromStrings("instance", "localhost1", "job", "demo1"), nil)
|
lbs1 := prompb.FromLabels(labels.FromStrings("instance", "localhost1", "job", "demo1"), nil)
|
||||||
lbs2 := labelsToLabelsProto(labels.FromStrings("instance", "localhost2", "job", "demo2"), nil)
|
lbs2 := prompb.FromLabels(labels.FromStrings("instance", "localhost2", "job", "demo2"), nil)
|
||||||
chunk := prompb.Chunk{
|
chunk := prompb.Chunk{
|
||||||
Type: prompb.Chunk_XOR,
|
Type: prompb.Chunk_XOR,
|
||||||
Data: make([]byte, 100),
|
Data: make([]byte, 100),
|
||||||
|
@ -853,7 +604,7 @@ func (c *mockChunkSeriesSet) Next() bool {
|
||||||
|
|
||||||
func (c *mockChunkSeriesSet) At() storage.ChunkSeries {
|
func (c *mockChunkSeriesSet) At() storage.ChunkSeries {
|
||||||
return &storage.ChunkSeriesEntry{
|
return &storage.ChunkSeriesEntry{
|
||||||
Lset: labelProtosToLabels(&c.builder, c.chunkedSeries[c.index].Labels),
|
Lset: c.chunkedSeries[c.index].ToLabels(&c.builder, nil),
|
||||||
ChunkIteratorFn: func(chunks.Iterator) chunks.Iterator {
|
ChunkIteratorFn: func(chunks.Iterator) chunks.Iterator {
|
||||||
return &mockChunkIterator{
|
return &mockChunkIterator{
|
||||||
chunks: c.chunkedSeries[c.index].Chunks,
|
chunks: c.chunkedSeries[c.index].Chunks,
|
||||||
|
@ -895,3 +646,74 @@ func (c *mockChunkIterator) Next() bool {
|
||||||
func (c *mockChunkIterator) Err() error {
|
func (c *mockChunkIterator) Err() error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func v2RequesToWriteRequest(v2Req *writev2.Request) (*prompb.WriteRequest, error) {
|
||||||
|
req := &prompb.WriteRequest{
|
||||||
|
Timeseries: make([]prompb.TimeSeries, len(v2Req.Timeseries)),
|
||||||
|
// TODO handle metadata?
|
||||||
|
}
|
||||||
|
b := labels.NewScratchBuilder(0)
|
||||||
|
for i, rts := range v2Req.Timeseries {
|
||||||
|
rts.ToLabels(&b, v2Req.Symbols).Range(func(l labels.Label) {
|
||||||
|
req.Timeseries[i].Labels = append(req.Timeseries[i].Labels, prompb.Label{
|
||||||
|
Name: l.Name,
|
||||||
|
Value: l.Value,
|
||||||
|
})
|
||||||
|
})
|
||||||
|
|
||||||
|
exemplars := make([]prompb.Exemplar, len(rts.Exemplars))
|
||||||
|
for j, e := range rts.Exemplars {
|
||||||
|
exemplars[j].Value = e.Value
|
||||||
|
exemplars[j].Timestamp = e.Timestamp
|
||||||
|
e.ToExemplar(&b, v2Req.Symbols).Labels.Range(func(l labels.Label) {
|
||||||
|
exemplars[j].Labels = append(exemplars[j].Labels, prompb.Label{
|
||||||
|
Name: l.Name,
|
||||||
|
Value: l.Value,
|
||||||
|
})
|
||||||
|
})
|
||||||
|
}
|
||||||
|
req.Timeseries[i].Exemplars = exemplars
|
||||||
|
|
||||||
|
req.Timeseries[i].Samples = make([]prompb.Sample, len(rts.Samples))
|
||||||
|
for j, s := range rts.Samples {
|
||||||
|
req.Timeseries[i].Samples[j].Timestamp = s.Timestamp
|
||||||
|
req.Timeseries[i].Samples[j].Value = s.Value
|
||||||
|
}
|
||||||
|
|
||||||
|
req.Timeseries[i].Histograms = make([]prompb.Histogram, len(rts.Histograms))
|
||||||
|
for j, h := range rts.Histograms {
|
||||||
|
// TODO: double check
|
||||||
|
if h.IsFloatHistogram() {
|
||||||
|
req.Timeseries[i].Histograms[j].Count = &prompb.Histogram_CountFloat{CountFloat: h.GetCountFloat()}
|
||||||
|
req.Timeseries[i].Histograms[j].ZeroCount = &prompb.Histogram_ZeroCountFloat{ZeroCountFloat: h.GetZeroCountFloat()}
|
||||||
|
} else {
|
||||||
|
req.Timeseries[i].Histograms[j].Count = &prompb.Histogram_CountInt{CountInt: h.GetCountInt()}
|
||||||
|
req.Timeseries[i].Histograms[j].ZeroCount = &prompb.Histogram_ZeroCountInt{ZeroCountInt: h.GetZeroCountInt()}
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, span := range h.NegativeSpans {
|
||||||
|
req.Timeseries[i].Histograms[j].NegativeSpans = append(req.Timeseries[i].Histograms[j].NegativeSpans, prompb.BucketSpan{
|
||||||
|
Offset: span.Offset,
|
||||||
|
Length: span.Length,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
for _, span := range h.PositiveSpans {
|
||||||
|
req.Timeseries[i].Histograms[j].PositiveSpans = append(req.Timeseries[i].Histograms[j].PositiveSpans, prompb.BucketSpan{
|
||||||
|
Offset: span.Offset,
|
||||||
|
Length: span.Length,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
req.Timeseries[i].Histograms[j].Sum = h.Sum
|
||||||
|
req.Timeseries[i].Histograms[j].Schema = h.Schema
|
||||||
|
req.Timeseries[i].Histograms[j].ZeroThreshold = h.ZeroThreshold
|
||||||
|
req.Timeseries[i].Histograms[j].NegativeDeltas = h.NegativeDeltas
|
||||||
|
req.Timeseries[i].Histograms[j].NegativeCounts = h.NegativeCounts
|
||||||
|
req.Timeseries[i].Histograms[j].PositiveDeltas = h.PositiveDeltas
|
||||||
|
req.Timeseries[i].Histograms[j].PositiveCounts = h.PositiveCounts
|
||||||
|
req.Timeseries[i].Histograms[j].ResetHint = prompb.Histogram_ResetHint(h.ResetHint)
|
||||||
|
req.Timeseries[i].Histograms[j].Timestamp = h.Timestamp
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return req, nil
|
||||||
|
}
|
||||||
|
|
|
@ -553,7 +553,7 @@ func (t *QueueManager) AppendWatcherMetadata(ctx context.Context, metadata []scr
|
||||||
mm = append(mm, prompb.MetricMetadata{
|
mm = append(mm, prompb.MetricMetadata{
|
||||||
MetricFamilyName: entry.Metric,
|
MetricFamilyName: entry.Metric,
|
||||||
Help: entry.Help,
|
Help: entry.Help,
|
||||||
Type: metricTypeToMetricTypeProto(entry.Type),
|
Type: prompb.FromMetadataType(entry.Type),
|
||||||
Unit: entry.Unit,
|
Unit: entry.Unit,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
@ -1624,7 +1624,7 @@ func populateTimeSeries(batch []timeSeries, pendingData []prompb.TimeSeries, sen
|
||||||
// Number of pending samples is limited by the fact that sendSamples (via sendSamplesWithBackoff)
|
// Number of pending samples is limited by the fact that sendSamples (via sendSamplesWithBackoff)
|
||||||
// retries endlessly, so once we reach max samples, if we can never send to the endpoint we'll
|
// retries endlessly, so once we reach max samples, if we can never send to the endpoint we'll
|
||||||
// stop reading from the queue. This makes it safe to reference pendingSamples by index.
|
// stop reading from the queue. This makes it safe to reference pendingSamples by index.
|
||||||
pendingData[nPending].Labels = labelsToLabelsProto(d.seriesLabels, pendingData[nPending].Labels)
|
pendingData[nPending].Labels = prompb.FromLabels(d.seriesLabels, pendingData[nPending].Labels)
|
||||||
|
|
||||||
switch d.sType {
|
switch d.sType {
|
||||||
case tSample:
|
case tSample:
|
||||||
|
@ -1635,16 +1635,16 @@ func populateTimeSeries(batch []timeSeries, pendingData []prompb.TimeSeries, sen
|
||||||
nPendingSamples++
|
nPendingSamples++
|
||||||
case tExemplar:
|
case tExemplar:
|
||||||
pendingData[nPending].Exemplars = append(pendingData[nPending].Exemplars, prompb.Exemplar{
|
pendingData[nPending].Exemplars = append(pendingData[nPending].Exemplars, prompb.Exemplar{
|
||||||
Labels: labelsToLabelsProto(d.exemplarLabels, nil),
|
Labels: prompb.FromLabels(d.exemplarLabels, nil),
|
||||||
Value: d.value,
|
Value: d.value,
|
||||||
Timestamp: d.timestamp,
|
Timestamp: d.timestamp,
|
||||||
})
|
})
|
||||||
nPendingExemplars++
|
nPendingExemplars++
|
||||||
case tHistogram:
|
case tHistogram:
|
||||||
pendingData[nPending].Histograms = append(pendingData[nPending].Histograms, HistogramToHistogramProto(d.timestamp, d.histogram))
|
pendingData[nPending].Histograms = append(pendingData[nPending].Histograms, prompb.FromIntHistogram(d.timestamp, d.histogram))
|
||||||
nPendingHistograms++
|
nPendingHistograms++
|
||||||
case tFloatHistogram:
|
case tFloatHistogram:
|
||||||
pendingData[nPending].Histograms = append(pendingData[nPending].Histograms, FloatHistogramToHistogramProto(d.timestamp, d.floatHistogram))
|
pendingData[nPending].Histograms = append(pendingData[nPending].Histograms, prompb.FromFloatHistogram(d.timestamp, d.floatHistogram))
|
||||||
nPendingHistograms++
|
nPendingHistograms++
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1876,7 +1876,7 @@ func populateV2TimeSeries(symbolTable *writev2.SymbolsTable, batch []timeSeries,
|
||||||
pendingData[nPending].Samples = pendingData[nPending].Samples[:0]
|
pendingData[nPending].Samples = pendingData[nPending].Samples[:0]
|
||||||
// todo: should we also safeguard against empty metadata here?
|
// todo: should we also safeguard against empty metadata here?
|
||||||
if d.metadata != nil {
|
if d.metadata != nil {
|
||||||
pendingData[nPending].Metadata.Type = metricTypeToMetricTypeProtoV2(d.metadata.Type)
|
pendingData[nPending].Metadata.Type = writev2.FromMetadataType(d.metadata.Type)
|
||||||
pendingData[nPending].Metadata.HelpRef = symbolTable.Symbolize(d.metadata.Help)
|
pendingData[nPending].Metadata.HelpRef = symbolTable.Symbolize(d.metadata.Help)
|
||||||
pendingData[nPending].Metadata.HelpRef = symbolTable.Symbolize(d.metadata.Unit)
|
pendingData[nPending].Metadata.HelpRef = symbolTable.Symbolize(d.metadata.Unit)
|
||||||
nPendingMetadata++
|
nPendingMetadata++
|
||||||
|
@ -1908,10 +1908,10 @@ func populateV2TimeSeries(symbolTable *writev2.SymbolsTable, batch []timeSeries,
|
||||||
})
|
})
|
||||||
nPendingExemplars++
|
nPendingExemplars++
|
||||||
case tHistogram:
|
case tHistogram:
|
||||||
pendingData[nPending].Histograms = append(pendingData[nPending].Histograms, HistogramToV2HistogramProto(d.timestamp, d.histogram))
|
pendingData[nPending].Histograms = append(pendingData[nPending].Histograms, writev2.FromIntHistogram(d.timestamp, d.histogram))
|
||||||
nPendingHistograms++
|
nPendingHistograms++
|
||||||
case tFloatHistogram:
|
case tFloatHistogram:
|
||||||
pendingData[nPending].Histograms = append(pendingData[nPending].Histograms, FloatHistogramToV2HistogramProto(d.timestamp, d.floatHistogram))
|
pendingData[nPending].Histograms = append(pendingData[nPending].Histograms, writev2.FromFloatHistogram(d.timestamp, d.floatHistogram))
|
||||||
nPendingHistograms++
|
nPendingHistograms++
|
||||||
case tMetadata:
|
case tMetadata:
|
||||||
// TODO: log or return an error?
|
// TODO: log or return an error?
|
||||||
|
|
|
@ -1019,7 +1019,7 @@ func (c *TestWriteClient) expectExemplars(ss []record.RefExemplar, series []reco
|
||||||
for _, s := range ss {
|
for _, s := range ss {
|
||||||
seriesName := getSeriesNameFromRef(series[s.Ref])
|
seriesName := getSeriesNameFromRef(series[s.Ref])
|
||||||
e := prompb.Exemplar{
|
e := prompb.Exemplar{
|
||||||
Labels: labelsToLabelsProto(s.Labels, nil),
|
Labels: prompb.FromLabels(s.Labels, nil),
|
||||||
Timestamp: s.T,
|
Timestamp: s.T,
|
||||||
Value: s.V,
|
Value: s.V,
|
||||||
}
|
}
|
||||||
|
@ -1036,7 +1036,7 @@ func (c *TestWriteClient) expectHistograms(hh []record.RefHistogramSample, serie
|
||||||
|
|
||||||
for _, h := range hh {
|
for _, h := range hh {
|
||||||
seriesName := getSeriesNameFromRef(series[h.Ref])
|
seriesName := getSeriesNameFromRef(series[h.Ref])
|
||||||
c.expectedHistograms[seriesName] = append(c.expectedHistograms[seriesName], HistogramToHistogramProto(h.T, h.H))
|
c.expectedHistograms[seriesName] = append(c.expectedHistograms[seriesName], prompb.FromIntHistogram(h.T, h.H))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1049,7 +1049,7 @@ func (c *TestWriteClient) expectFloatHistograms(fhs []record.RefFloatHistogramSa
|
||||||
|
|
||||||
for _, fh := range fhs {
|
for _, fh := range fhs {
|
||||||
seriesName := getSeriesNameFromRef(series[fh.Ref])
|
seriesName := getSeriesNameFromRef(series[fh.Ref])
|
||||||
c.expectedFloatHistograms[seriesName] = append(c.expectedFloatHistograms[seriesName], FloatHistogramToHistogramProto(fh.T, fh.FH))
|
c.expectedFloatHistograms[seriesName] = append(c.expectedFloatHistograms[seriesName], prompb.FromFloatHistogram(fh.T, fh.FH))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1133,7 +1133,7 @@ func (c *TestWriteClient) Store(_ context.Context, req []byte, _ int) error {
|
||||||
var reqProtoV2 writev2.Request
|
var reqProtoV2 writev2.Request
|
||||||
err = proto.Unmarshal(reqBuf, &reqProtoV2)
|
err = proto.Unmarshal(reqBuf, &reqProtoV2)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
reqProto, err = V2WriteRequestToWriteRequest(&reqProtoV2)
|
reqProto, err = v2RequesToWriteRequest(&reqProtoV2)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -1144,9 +1144,9 @@ func (c *TestWriteClient) Store(_ context.Context, req []byte, _ int) error {
|
||||||
return errors.New("invalid request, no timeseries")
|
return errors.New("invalid request, no timeseries")
|
||||||
}
|
}
|
||||||
|
|
||||||
builder := labels.NewScratchBuilder(0)
|
b := labels.NewScratchBuilder(0)
|
||||||
for _, ts := range reqProto.Timeseries {
|
for _, ts := range reqProto.Timeseries {
|
||||||
labels := labelProtosToLabels(&builder, ts.Labels)
|
labels := ts.ToLabels(&b, nil)
|
||||||
seriesName := labels.Get("__name__")
|
seriesName := labels.Get("__name__")
|
||||||
c.receivedSamples[seriesName] = append(c.receivedSamples[seriesName], ts.Samples...)
|
c.receivedSamples[seriesName] = append(c.receivedSamples[seriesName], ts.Samples...)
|
||||||
if len(ts.Exemplars) > 0 {
|
if len(ts.Exemplars) > 0 {
|
||||||
|
|
|
@ -124,7 +124,7 @@ func TestSampledReadEndpoint(t *testing.T) {
|
||||||
{Name: "d", Value: "e"},
|
{Name: "d", Value: "e"},
|
||||||
},
|
},
|
||||||
Histograms: []prompb.Histogram{
|
Histograms: []prompb.Histogram{
|
||||||
FloatHistogramToHistogramProto(0, tsdbutil.GenerateTestFloatHistogram(0)),
|
prompb.FromFloatHistogram(0, tsdbutil.GenerateTestFloatHistogram(0)),
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
|
|
|
@ -173,12 +173,12 @@ func TestSeriesSetFilter(t *testing.T) {
|
||||||
toRemove: []string{"foo"},
|
toRemove: []string{"foo"},
|
||||||
in: &prompb.QueryResult{
|
in: &prompb.QueryResult{
|
||||||
Timeseries: []*prompb.TimeSeries{
|
Timeseries: []*prompb.TimeSeries{
|
||||||
{Labels: labelsToLabelsProto(labels.FromStrings("foo", "bar", "a", "b"), nil)},
|
{Labels: prompb.FromLabels(labels.FromStrings("foo", "bar", "a", "b"), nil)},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
expected: &prompb.QueryResult{
|
expected: &prompb.QueryResult{
|
||||||
Timeseries: []*prompb.TimeSeries{
|
Timeseries: []*prompb.TimeSeries{
|
||||||
{Labels: labelsToLabelsProto(labels.FromStrings("a", "b"), nil)},
|
{Labels: prompb.FromLabels(labels.FromStrings("a", "b"), nil)},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
|
@ -212,7 +212,7 @@ func (c *mockedRemoteClient) Read(_ context.Context, query *prompb.Query) (*prom
|
||||||
|
|
||||||
q := &prompb.QueryResult{}
|
q := &prompb.QueryResult{}
|
||||||
for _, s := range c.store {
|
for _, s := range c.store {
|
||||||
l := labelProtosToLabels(&c.b, s.Labels)
|
l := s.ToLabels(&c.b, nil)
|
||||||
var notMatch bool
|
var notMatch bool
|
||||||
|
|
||||||
for _, m := range matchers {
|
for _, m := range matchers {
|
||||||
|
|
|
@ -218,7 +218,7 @@ func (h *writeHandler) write(ctx context.Context, req *prompb.WriteRequest) (err
|
||||||
|
|
||||||
b := labels.NewScratchBuilder(0)
|
b := labels.NewScratchBuilder(0)
|
||||||
for _, ts := range req.Timeseries {
|
for _, ts := range req.Timeseries {
|
||||||
ls := labelProtosToLabels(&b, ts.Labels)
|
ls := ts.ToLabels(&b, nil)
|
||||||
if !ls.IsValid() {
|
if !ls.IsValid() {
|
||||||
level.Warn(h.logger).Log("msg", "Invalid metric names or labels", "got", ls.String())
|
level.Warn(h.logger).Log("msg", "Invalid metric names or labels", "got", ls.String())
|
||||||
samplesWithInvalidLabels++
|
samplesWithInvalidLabels++
|
||||||
|
@ -231,7 +231,7 @@ func (h *writeHandler) write(ctx context.Context, req *prompb.WriteRequest) (err
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, ep := range ts.Exemplars {
|
for _, ep := range ts.Exemplars {
|
||||||
e := exemplarProtoToExemplar(&b, ep)
|
e := ep.ToExemplar(&b, nil)
|
||||||
h.appendExemplar(app, e, ls, &outOfOrderExemplarErrs)
|
h.appendExemplar(app, e, ls, &outOfOrderExemplarErrs)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -263,8 +263,9 @@ func (h *writeHandler) writeV2(ctx context.Context, req *writev2.Request) (err e
|
||||||
err = app.Commit()
|
err = app.Commit()
|
||||||
}()
|
}()
|
||||||
|
|
||||||
|
b := labels.NewScratchBuilder(0)
|
||||||
for _, ts := range req.Timeseries {
|
for _, ts := range req.Timeseries {
|
||||||
ls := writev2.DesymbolizeLabels(ts.LabelsRefs, req.Symbols)
|
ls := ts.ToLabels(&b, req.Symbols)
|
||||||
|
|
||||||
err := h.appendSamplesV2(app, ts.Samples, ls)
|
err := h.appendSamplesV2(app, ts.Samples, ls)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -272,7 +273,7 @@ func (h *writeHandler) writeV2(ctx context.Context, req *writev2.Request) (err e
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, ep := range ts.Exemplars {
|
for _, ep := range ts.Exemplars {
|
||||||
e := exemplarProtoV2ToExemplar(ep, req.Symbols)
|
e := ep.ToExemplar(&b, req.Symbols)
|
||||||
h.appendExemplar(app, e, ls, &outOfOrderExemplarErrs)
|
h.appendExemplar(app, e, ls, &outOfOrderExemplarErrs)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -281,7 +282,7 @@ func (h *writeHandler) writeV2(ctx context.Context, req *writev2.Request) (err e
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
m := metadataProtoV2ToMetadata(ts.Metadata, req.Symbols)
|
m := ts.ToMetadata(req.Symbols)
|
||||||
if _, err = app.UpdateMetadata(0, ls, m); err != nil {
|
if _, err = app.UpdateMetadata(0, ls, m); err != nil {
|
||||||
level.Debug(h.logger).Log("msg", "error while updating metadata from remote write", "err", err)
|
level.Debug(h.logger).Log("msg", "error while updating metadata from remote write", "err", err)
|
||||||
}
|
}
|
||||||
|
@ -345,11 +346,9 @@ func (h *writeHandler) appendHistograms(app storage.Appender, hh []prompb.Histog
|
||||||
var err error
|
var err error
|
||||||
for _, hp := range hh {
|
for _, hp := range hh {
|
||||||
if hp.IsFloatHistogram() {
|
if hp.IsFloatHistogram() {
|
||||||
fhs := FloatHistogramProtoToFloatHistogram(hp)
|
_, err = app.AppendHistogram(0, labels, hp.Timestamp, nil, hp.ToFloatHistogram())
|
||||||
_, err = app.AppendHistogram(0, labels, hp.Timestamp, nil, fhs)
|
|
||||||
} else {
|
} else {
|
||||||
hs := HistogramProtoToHistogram(hp)
|
_, err = app.AppendHistogram(0, labels, hp.Timestamp, hp.ToIntHistogram(), nil)
|
||||||
_, err = app.AppendHistogram(0, labels, hp.Timestamp, hs, nil)
|
|
||||||
}
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
unwrappedErr := errors.Unwrap(err)
|
unwrappedErr := errors.Unwrap(err)
|
||||||
|
@ -371,11 +370,9 @@ func (h *writeHandler) appendHistogramsV2(app storage.Appender, hh []writev2.His
|
||||||
var err error
|
var err error
|
||||||
for _, hp := range hh {
|
for _, hp := range hh {
|
||||||
if hp.IsFloatHistogram() {
|
if hp.IsFloatHistogram() {
|
||||||
fhs := FloatV2HistogramProtoToFloatHistogram(hp)
|
_, err = app.AppendHistogram(0, labels, hp.Timestamp, nil, hp.ToFloatHistogram())
|
||||||
_, err = app.AppendHistogram(0, labels, hp.Timestamp, nil, fhs)
|
|
||||||
} else {
|
} else {
|
||||||
hs := V2HistogramProtoToHistogram(hp)
|
_, err = app.AppendHistogram(0, labels, hp.Timestamp, hp.ToIntHistogram(), nil)
|
||||||
_, err = app.AppendHistogram(0, labels, hp.Timestamp, hs, nil)
|
|
||||||
}
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
unwrappedErr := errors.Unwrap(err)
|
unwrappedErr := errors.Unwrap(err)
|
||||||
|
|
|
@ -266,24 +266,22 @@ func TestRemoteWriteHandler_V1Message(t *testing.T) {
|
||||||
j := 0
|
j := 0
|
||||||
k := 0
|
k := 0
|
||||||
for _, ts := range writeRequestFixture.Timeseries {
|
for _, ts := range writeRequestFixture.Timeseries {
|
||||||
labels := labelProtosToLabels(&b, ts.Labels)
|
labels := ts.ToLabels(&b, nil)
|
||||||
for _, s := range ts.Samples {
|
for _, s := range ts.Samples {
|
||||||
requireEqual(t, mockSample{labels, s.Timestamp, s.Value}, appendable.samples[i])
|
requireEqual(t, mockSample{labels, s.Timestamp, s.Value}, appendable.samples[i])
|
||||||
i++
|
i++
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, e := range ts.Exemplars {
|
for _, e := range ts.Exemplars {
|
||||||
exemplarLabels := labelProtosToLabels(&b, e.Labels)
|
exemplarLabels := e.ToExemplar(&b, nil).Labels
|
||||||
requireEqual(t, mockExemplar{labels, exemplarLabels, e.Timestamp, e.Value}, appendable.exemplars[j])
|
requireEqual(t, mockExemplar{labels, exemplarLabels, e.Timestamp, e.Value}, appendable.exemplars[j])
|
||||||
j++
|
j++
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, hp := range ts.Histograms {
|
for _, hp := range ts.Histograms {
|
||||||
if hp.IsFloatHistogram() {
|
if hp.IsFloatHistogram() {
|
||||||
fh := FloatHistogramProtoToFloatHistogram(hp)
|
fh := hp.ToFloatHistogram()
|
||||||
requireEqual(t, mockHistogram{labels, hp.Timestamp, nil, fh}, appendable.histograms[k])
|
requireEqual(t, mockHistogram{labels, hp.Timestamp, nil, fh}, appendable.histograms[k])
|
||||||
} else {
|
} else {
|
||||||
h := HistogramProtoToHistogram(hp)
|
h := hp.ToIntHistogram()
|
||||||
requireEqual(t, mockHistogram{labels, hp.Timestamp, h, nil}, appendable.histograms[k])
|
requireEqual(t, mockHistogram{labels, hp.Timestamp, h, nil}, appendable.histograms[k])
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -312,30 +310,62 @@ func TestRemoteWriteHandler_V2Message(t *testing.T) {
|
||||||
resp := recorder.Result()
|
resp := recorder.Result()
|
||||||
require.Equal(t, http.StatusNoContent, resp.StatusCode)
|
require.Equal(t, http.StatusNoContent, resp.StatusCode)
|
||||||
|
|
||||||
|
b := labels.NewScratchBuilder(0)
|
||||||
i := 0
|
i := 0
|
||||||
j := 0
|
j := 0
|
||||||
k := 0
|
k := 0
|
||||||
// the reduced write request is equivalent to the write request fixture.
|
|
||||||
// we can use it for
|
|
||||||
for _, ts := range writeV2RequestFixture.Timeseries {
|
for _, ts := range writeV2RequestFixture.Timeseries {
|
||||||
ls := writev2.DesymbolizeLabels(ts.LabelsRefs, writeV2RequestFixture.Symbols)
|
ls := ts.ToLabels(&b, writeV2RequestFixture.Symbols)
|
||||||
for _, s := range ts.Samples {
|
for _, s := range ts.Samples {
|
||||||
require.Equal(t, mockSample{ls, s.Timestamp, s.Value}, appendable.samples[i])
|
// NOTE(bwplotka): In this case I got errors due to correct labels.Labels.String(), but with dedupelabels tag
|
||||||
|
// require.Equal(t, mockSample{ls, s.Timestamp, s.Value}, appendable.samples[i]) fails.
|
||||||
|
/*
|
||||||
|
Error Trace: /Users/bwplotka/Repos/prometheus/storage/remote/write_handler_test.go:323
|
||||||
|
Error: Not equal:
|
||||||
|
expected: labels.Labels{syms:(*labels.nameTable)(0x1400095afc0), data:"\x00\x01\x02\x03\x04\x05\x06\a\b\t"}
|
||||||
|
actual : labels.Labels{syms:(*labels.nameTable)(0x1400095b080), data:"\x00\x01\x02\x03\x04\x05\x06\a\b\t"}
|
||||||
|
|
||||||
|
Diff:
|
||||||
|
--- Expected
|
||||||
|
+++ Actual
|
||||||
|
@@ -13,4 +13,4 @@
|
||||||
|
(string) (len=3) "bar",
|
||||||
|
- (string) (len=1) "f",
|
||||||
|
- (string) (len=1) "g",
|
||||||
|
+ (string) "",
|
||||||
|
+ (string) "",
|
||||||
|
(string) "",
|
||||||
|
@@ -1034,4 +1034,4 @@
|
||||||
|
nameTable: (*labels.nameTable)(<already shown>),
|
||||||
|
- nextNum: (int) 12,
|
||||||
|
- byName: (map[string]int) (len=12) {
|
||||||
|
+ nextNum: (int) 10,
|
||||||
|
+ byName: (map[string]int) (len=10) {
|
||||||
|
(string) (len=8) "__name__": (int) 0,
|
||||||
|
@@ -1043,5 +1043,3 @@
|
||||||
|
(string) (len=1) "e": (int) 7,
|
||||||
|
- (string) (len=1) "f": (int) 10,
|
||||||
|
(string) (len=3) "foo": (int) 8,
|
||||||
|
- (string) (len=1) "g": (int) 11,
|
||||||
|
(string) (len=3) "qux": (int) 5,
|
||||||
|
*/
|
||||||
|
// TODO(bwplotka): Investigate why, is it the way we do fixtures?
|
||||||
|
require.Equal(t, appendable.samples[i].l.String(), ls.String())
|
||||||
|
require.Equal(t, s.Value, appendable.samples[i].v)
|
||||||
|
require.Equal(t, s.Timestamp, appendable.samples[i].t)
|
||||||
i++
|
i++
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, e := range ts.Exemplars {
|
for _, e := range ts.Exemplars {
|
||||||
exemplarLabels := writev2.DesymbolizeLabels(e.LabelsRefs, writeV2RequestFixture.Symbols)
|
exemplarLabels := e.ToExemplar(&b, writeV2RequestFixture.Symbols).Labels
|
||||||
require.Equal(t, mockExemplar{ls, exemplarLabels, e.Timestamp, e.Value}, appendable.exemplars[j])
|
require.Equal(t, mockExemplar{ls, exemplarLabels, e.Timestamp, e.Value}, appendable.exemplars[j])
|
||||||
j++
|
j++
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, hp := range ts.Histograms {
|
for _, hp := range ts.Histograms {
|
||||||
if hp.IsFloatHistogram() {
|
if hp.IsFloatHistogram() {
|
||||||
fh := FloatHistogramProtoV2ToFloatHistogram(hp)
|
fh := hp.ToFloatHistogram()
|
||||||
require.Equal(t, mockHistogram{ls, hp.Timestamp, nil, fh}, appendable.histograms[k])
|
require.Equal(t, mockHistogram{ls, hp.Timestamp, nil, fh}, appendable.histograms[k])
|
||||||
} else {
|
} else {
|
||||||
h := HistogramProtoV2ToHistogram(hp)
|
h := hp.ToIntHistogram()
|
||||||
require.Equal(t, mockHistogram{ls, hp.Timestamp, h, nil}, appendable.histograms[k])
|
require.Equal(t, mockHistogram{ls, hp.Timestamp, h, nil}, appendable.histograms[k])
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -442,7 +472,7 @@ func TestOutOfOrderExemplar_V2Message(t *testing.T) {
|
||||||
func TestOutOfOrderHistogram_V1Message(t *testing.T) {
|
func TestOutOfOrderHistogram_V1Message(t *testing.T) {
|
||||||
payload, _, _, err := buildWriteRequest(nil, []prompb.TimeSeries{{
|
payload, _, _, err := buildWriteRequest(nil, []prompb.TimeSeries{{
|
||||||
Labels: []prompb.Label{{Name: "__name__", Value: "test_metric"}},
|
Labels: []prompb.Label{{Name: "__name__", Value: "test_metric"}},
|
||||||
Histograms: []prompb.Histogram{HistogramToHistogramProto(0, &testHistogram), FloatHistogramToHistogramProto(1, testHistogram.ToFloat(nil))},
|
Histograms: []prompb.Histogram{prompb.FromIntHistogram(0, &testHistogram), prompb.FromFloatHistogram(1, testHistogram.ToFloat(nil))},
|
||||||
}}, nil, nil, nil, nil, "snappy")
|
}}, nil, nil, nil, nil, "snappy")
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
@ -462,7 +492,7 @@ func TestOutOfOrderHistogram_V1Message(t *testing.T) {
|
||||||
func TestOutOfOrderHistogram_V2Message(t *testing.T) {
|
func TestOutOfOrderHistogram_V2Message(t *testing.T) {
|
||||||
payload, _, _, err := buildV2WriteRequest(nil, []writev2.TimeSeries{{
|
payload, _, _, err := buildV2WriteRequest(nil, []writev2.TimeSeries{{
|
||||||
LabelsRefs: []uint32{0, 1},
|
LabelsRefs: []uint32{0, 1},
|
||||||
Histograms: []writev2.Histogram{HistogramToV2HistogramProto(0, &testHistogram), FloatHistogramToV2HistogramProto(1, testHistogram.ToFloat(nil))},
|
Histograms: []writev2.Histogram{writev2.FromIntHistogram(0, &testHistogram), writev2.FromFloatHistogram(1, testHistogram.ToFloat(nil))},
|
||||||
}}, []string{"__name__", "metric1"}, nil, nil, nil, "snappy") // TODO(bwplotka): No empty string!
|
}}, []string{"__name__", "metric1"}, nil, nil, nil, "snappy") // TODO(bwplotka): No empty string!
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
@ -493,7 +523,7 @@ func BenchmarkRemoteWriteHandler(b *testing.B) {
|
||||||
{Name: "__name__", Value: "test_metric"},
|
{Name: "__name__", Value: "test_metric"},
|
||||||
{Name: "test_label_name_" + num, Value: labelValue + num},
|
{Name: "test_label_name_" + num, Value: labelValue + num},
|
||||||
},
|
},
|
||||||
Histograms: []prompb.Histogram{HistogramToHistogramProto(0, &testHistogram)},
|
Histograms: []prompb.Histogram{prompb.FromIntHistogram(0, &testHistogram)},
|
||||||
}}, nil, nil, nil, nil, "snappy")
|
}}, nil, nil, nil, nil, "snappy")
|
||||||
require.NoError(b, err)
|
require.NoError(b, err)
|
||||||
req, err := http.NewRequest("", "", bytes.NewReader(buf))
|
req, err := http.NewRequest("", "", bytes.NewReader(buf))
|
||||||
|
|
Loading…
Reference in a new issue