[PRW 2.0] (part3) moved type specific conversions to prompb and writev2 codecs.

Signed-off-by: bwplotka <bwplotka@gmail.com>
This commit is contained in:
bwplotka 2024-06-24 09:14:02 +01:00
parent aa6f38cccd
commit 0dd176b202
15 changed files with 781 additions and 790 deletions

201
prompb/codec.go Normal file
View file

@ -0,0 +1,201 @@
// Copyright 2024 Prometheus Team
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package prompb
import (
"strings"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/exemplar"
"github.com/prometheus/prometheus/model/histogram"
"github.com/prometheus/prometheus/model/labels"
)
// NOTE(bwplotka): This file's code is tested in /prompb/rwcommon.
// ToLabels return model labels.Labels from timeseries' remote labels.
func (m TimeSeries) ToLabels(b *labels.ScratchBuilder, _ []string) labels.Labels {
return labelProtosToLabels(b, m.GetLabels())
}
// ToLabels return model labels.Labels from timeseries' remote labels.
func (m ChunkedSeries) ToLabels(b *labels.ScratchBuilder, _ []string) labels.Labels {
return labelProtosToLabels(b, m.GetLabels())
}
func labelProtosToLabels(b *labels.ScratchBuilder, labelPairs []Label) labels.Labels {
b.Reset()
for _, l := range labelPairs {
b.Add(l.Name, l.Value)
}
b.Sort()
return b.Labels()
}
// FromLabels transforms labels into prompb labels. The buffer slice
// will be used to avoid allocations if it is big enough to store the labels.
func FromLabels(lbls labels.Labels, buf []Label) []Label {
result := buf[:0]
lbls.Range(func(l labels.Label) {
result = append(result, Label{
Name: l.Name,
Value: l.Value,
})
})
return result
}
// FromMetadataType transforms a Prometheus metricType into prompb metricType. Since the former is a string we need to transform it to an enum.
func FromMetadataType(t model.MetricType) MetricMetadata_MetricType {
mt := strings.ToUpper(string(t))
v, ok := MetricMetadata_MetricType_value[mt]
if !ok {
return MetricMetadata_UNKNOWN
}
return MetricMetadata_MetricType(v)
}
// IsFloatHistogram returns true if the histogram is float.
func (h Histogram) IsFloatHistogram() bool {
_, ok := h.GetCount().(*Histogram_CountFloat)
return ok
}
// ToIntHistogram returns integer Prometheus histogram from the remote implementation
// of integer histogram. If it's a float histogram, the method returns nil.
func (h Histogram) ToIntHistogram() *histogram.Histogram {
if h.IsFloatHistogram() {
return nil
}
return &histogram.Histogram{
CounterResetHint: histogram.CounterResetHint(h.ResetHint),
Schema: h.Schema,
ZeroThreshold: h.ZeroThreshold,
ZeroCount: h.GetZeroCountInt(),
Count: h.GetCountInt(),
Sum: h.Sum,
PositiveSpans: spansProtoToSpans(h.GetPositiveSpans()),
PositiveBuckets: h.GetPositiveDeltas(),
NegativeSpans: spansProtoToSpans(h.GetNegativeSpans()),
NegativeBuckets: h.GetNegativeDeltas(),
}
}
// ToFloatHistogram returns float Prometheus histogram from the remote implementation
// of float histogram. If the underlying implementation is an integer histogram, a
// conversion is performed.
func (h Histogram) ToFloatHistogram() *histogram.FloatHistogram {
if h.IsFloatHistogram() {
return &histogram.FloatHistogram{
CounterResetHint: histogram.CounterResetHint(h.ResetHint),
Schema: h.Schema,
ZeroThreshold: h.ZeroThreshold,
ZeroCount: h.GetZeroCountFloat(),
Count: h.GetCountFloat(),
Sum: h.Sum,
PositiveSpans: spansProtoToSpans(h.GetPositiveSpans()),
PositiveBuckets: h.GetPositiveCounts(),
NegativeSpans: spansProtoToSpans(h.GetNegativeSpans()),
NegativeBuckets: h.GetNegativeCounts(),
}
}
// Conversion from integer histogram.
return &histogram.FloatHistogram{
CounterResetHint: histogram.CounterResetHint(h.ResetHint),
Schema: h.Schema,
ZeroThreshold: h.ZeroThreshold,
ZeroCount: float64(h.GetZeroCountInt()),
Count: float64(h.GetCountInt()),
Sum: h.Sum,
PositiveSpans: spansProtoToSpans(h.GetPositiveSpans()),
PositiveBuckets: deltasToCounts(h.GetPositiveDeltas()),
NegativeSpans: spansProtoToSpans(h.GetNegativeSpans()),
NegativeBuckets: deltasToCounts(h.GetNegativeDeltas()),
}
}
func spansProtoToSpans(s []BucketSpan) []histogram.Span {
spans := make([]histogram.Span, len(s))
for i := 0; i < len(s); i++ {
spans[i] = histogram.Span{Offset: s[i].Offset, Length: s[i].Length}
}
return spans
}
func deltasToCounts(deltas []int64) []float64 {
counts := make([]float64, len(deltas))
var cur float64
for i, d := range deltas {
cur += float64(d)
counts[i] = cur
}
return counts
}
// FromIntHistogram returns remote Histogram from the integer Histogram.
func FromIntHistogram(timestamp int64, h *histogram.Histogram) Histogram {
return Histogram{
Count: &Histogram_CountInt{CountInt: h.Count},
Sum: h.Sum,
Schema: h.Schema,
ZeroThreshold: h.ZeroThreshold,
ZeroCount: &Histogram_ZeroCountInt{ZeroCountInt: h.ZeroCount},
NegativeSpans: spansToSpansProto(h.NegativeSpans),
NegativeDeltas: h.NegativeBuckets,
PositiveSpans: spansToSpansProto(h.PositiveSpans),
PositiveDeltas: h.PositiveBuckets,
ResetHint: Histogram_ResetHint(h.CounterResetHint),
Timestamp: timestamp,
}
}
// FromFloatHistogram returns remote Histogram from the float Histogram.
func FromFloatHistogram(timestamp int64, fh *histogram.FloatHistogram) Histogram {
return Histogram{
Count: &Histogram_CountFloat{CountFloat: fh.Count},
Sum: fh.Sum,
Schema: fh.Schema,
ZeroThreshold: fh.ZeroThreshold,
ZeroCount: &Histogram_ZeroCountFloat{ZeroCountFloat: fh.ZeroCount},
NegativeSpans: spansToSpansProto(fh.NegativeSpans),
NegativeCounts: fh.NegativeBuckets,
PositiveSpans: spansToSpansProto(fh.PositiveSpans),
PositiveCounts: fh.PositiveBuckets,
ResetHint: Histogram_ResetHint(fh.CounterResetHint),
Timestamp: timestamp,
}
}
func spansToSpansProto(s []histogram.Span) []BucketSpan {
spans := make([]BucketSpan, len(s))
for i := 0; i < len(s); i++ {
spans[i] = BucketSpan{Offset: s[i].Offset, Length: s[i].Length}
}
return spans
}
// ToExemplar converts remote exemplar to model exemplar.
func (m Exemplar) ToExemplar(b *labels.ScratchBuilder, _ []string) exemplar.Exemplar {
timestamp := m.Timestamp
return exemplar.Exemplar{
Labels: labelProtosToLabels(b, m.GetLabels()),
Value: m.Value,
Ts: timestamp,
HasTs: timestamp != 0,
}
}

View file

@ -17,14 +17,6 @@ import (
"sync"
)
func (m Sample) T() int64 { return m.Timestamp }
func (m Sample) V() float64 { return m.Value }
func (h Histogram) IsFloatHistogram() bool {
_, ok := h.GetCount().(*Histogram_CountFloat)
return ok
}
func (r *ChunkedReadResponse) PooledMarshal(p *sync.Pool) ([]byte, error) {
size := r.Size()
data, ok := p.Get().(*[]byte)

View file

@ -0,0 +1,188 @@
// Copyright 2024 Prometheus Team
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package writev2
import (
"strings"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/prompb"
"github.com/prometheus/prometheus/model/exemplar"
"github.com/prometheus/prometheus/model/histogram"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/metadata"
)
// NOTE(bwplotka): This file's code is tested in /prompb/rwcommon.
// ToLabels return model labels.Labels from timeseries' remote labels.
func (m TimeSeries) ToLabels(b *labels.ScratchBuilder, symbols []string) labels.Labels {
return desymbolizeLabels(b, m.GetLabelsRefs(), symbols)
}
// ToMetadata return model metadata from timeseries' remote metadata.
func (m TimeSeries) ToMetadata(symbols []string) metadata.Metadata {
mt := strings.ToLower(m.Metadata.Type.String())
return metadata.Metadata{
Type: model.MetricType(mt), // TODO(@tpaschalis) a better way for this?
Unit: symbols[m.Metadata.UnitRef],
Help: symbols[m.Metadata.HelpRef],
}
}
// FromMetadataType transforms a Prometheus metricType into writev2 metricType. Since the former is a string we need to transform it to an enum.
func FromMetadataType(t model.MetricType) Metadata_MetricType {
mt := strings.ToUpper(string(t))
v, ok := prompb.MetricMetadata_MetricType_value[mt]
if !ok {
return Metadata_METRIC_TYPE_UNSPECIFIED
}
return Metadata_MetricType(v)
}
// IsFloatHistogram returns true if the histogram is float.
func (h Histogram) IsFloatHistogram() bool {
_, ok := h.GetCount().(*Histogram_CountFloat)
return ok
}
// ToIntHistogram returns integer Prometheus histogram from the remote implementation
// of integer histogram. If it's a float histogram, the method returns nil.
// TODO(bwplotka): Add support for incoming NHCB.
func (h Histogram) ToIntHistogram() *histogram.Histogram {
if h.IsFloatHistogram() {
return nil
}
return &histogram.Histogram{
CounterResetHint: histogram.CounterResetHint(h.ResetHint),
Schema: h.Schema,
ZeroThreshold: h.ZeroThreshold,
ZeroCount: h.GetZeroCountInt(),
Count: h.GetCountInt(),
Sum: h.Sum,
PositiveSpans: spansProtoToSpans(h.GetPositiveSpans()),
PositiveBuckets: h.GetPositiveDeltas(),
NegativeSpans: spansProtoToSpans(h.GetNegativeSpans()),
NegativeBuckets: h.GetNegativeDeltas(),
}
}
// ToFloatHistogram returns float Prometheus histogram from the remote implementation
// of float histogram. If the underlying implementation is an integer histogram, a
// conversion is performed.
// TODO(bwplotka): Add support for incoming NHCB.
func (h Histogram) ToFloatHistogram() *histogram.FloatHistogram {
if h.IsFloatHistogram() {
return &histogram.FloatHistogram{
CounterResetHint: histogram.CounterResetHint(h.ResetHint),
Schema: h.Schema,
ZeroThreshold: h.ZeroThreshold,
ZeroCount: h.GetZeroCountFloat(),
Count: h.GetCountFloat(),
Sum: h.Sum,
PositiveSpans: spansProtoToSpans(h.GetPositiveSpans()),
PositiveBuckets: h.GetPositiveCounts(),
NegativeSpans: spansProtoToSpans(h.GetNegativeSpans()),
NegativeBuckets: h.GetNegativeCounts(),
}
}
// Conversion from integer histogram.
return &histogram.FloatHistogram{
CounterResetHint: histogram.CounterResetHint(h.ResetHint),
Schema: h.Schema,
ZeroThreshold: h.ZeroThreshold,
ZeroCount: float64(h.GetZeroCountInt()),
Count: float64(h.GetCountInt()),
Sum: h.Sum,
PositiveSpans: spansProtoToSpans(h.GetPositiveSpans()),
PositiveBuckets: deltasToCounts(h.GetPositiveDeltas()),
NegativeSpans: spansProtoToSpans(h.GetNegativeSpans()),
NegativeBuckets: deltasToCounts(h.GetNegativeDeltas()),
}
}
func spansProtoToSpans(s []BucketSpan) []histogram.Span {
spans := make([]histogram.Span, len(s))
for i := 0; i < len(s); i++ {
spans[i] = histogram.Span{Offset: s[i].Offset, Length: s[i].Length}
}
return spans
}
func deltasToCounts(deltas []int64) []float64 {
counts := make([]float64, len(deltas))
var cur float64
for i, d := range deltas {
cur += float64(d)
counts[i] = cur
}
return counts
}
// FromIntHistogram returns remote Histogram from the integer Histogram.
func FromIntHistogram(timestamp int64, h *histogram.Histogram) Histogram {
return Histogram{
Count: &Histogram_CountInt{CountInt: h.Count},
Sum: h.Sum,
Schema: h.Schema,
ZeroThreshold: h.ZeroThreshold,
ZeroCount: &Histogram_ZeroCountInt{ZeroCountInt: h.ZeroCount},
NegativeSpans: spansToSpansProto(h.NegativeSpans),
NegativeDeltas: h.NegativeBuckets,
PositiveSpans: spansToSpansProto(h.PositiveSpans),
PositiveDeltas: h.PositiveBuckets,
ResetHint: Histogram_ResetHint(h.CounterResetHint),
Timestamp: timestamp,
}
}
// FromFloatHistogram returns remote Histogram from the float Histogram.
func FromFloatHistogram(timestamp int64, fh *histogram.FloatHistogram) Histogram {
return Histogram{
Count: &Histogram_CountFloat{CountFloat: fh.Count},
Sum: fh.Sum,
Schema: fh.Schema,
ZeroThreshold: fh.ZeroThreshold,
ZeroCount: &Histogram_ZeroCountFloat{ZeroCountFloat: fh.ZeroCount},
NegativeSpans: spansToSpansProto(fh.NegativeSpans),
NegativeCounts: fh.NegativeBuckets,
PositiveSpans: spansToSpansProto(fh.PositiveSpans),
PositiveCounts: fh.PositiveBuckets,
ResetHint: Histogram_ResetHint(fh.CounterResetHint),
Timestamp: timestamp,
}
}
func spansToSpansProto(s []histogram.Span) []BucketSpan {
spans := make([]BucketSpan, len(s))
for i := 0; i < len(s); i++ {
spans[i] = BucketSpan{Offset: s[i].Offset, Length: s[i].Length}
}
return spans
}
func (m Exemplar) ToExemplar(b *labels.ScratchBuilder, symbols []string) exemplar.Exemplar {
timestamp := m.Timestamp
return exemplar.Exemplar{
Labels: desymbolizeLabels(b, m.LabelsRefs, symbols),
Value: m.Value,
Ts: timestamp,
HasTs: timestamp != 0,
}
}

View file

@ -20,11 +20,6 @@ import (
func (m Sample) T() int64 { return m.Timestamp }
func (m Sample) V() float64 { return m.Value }
func (h Histogram) IsFloatHistogram() bool {
_, ok := h.GetCount().(*Histogram_CountFloat)
return ok
}
func (m *Request) OptimizedMarshal(dst []byte) ([]byte, error) {
siz := m.Size()
if cap(dst) < siz {

View file

@ -72,9 +72,9 @@ func (t *SymbolsTable) Reset() {
}
}
// DesymbolizeLabels decodes label references, with given symbols to labels.
func DesymbolizeLabels(labelRefs []uint32, symbols []string) labels.Labels {
b := labels.NewScratchBuilder(len(labelRefs))
// desymbolizeLabels decodes label references, with given symbols to labels.
func desymbolizeLabels(b *labels.ScratchBuilder, labelRefs []uint32, symbols []string) labels.Labels {
b.Reset()
for i := 0; i < len(labelRefs); i += 2 {
b.Add(symbols[labelRefs[i]], symbols[labelRefs[i+1]])
}

View file

@ -49,7 +49,8 @@ func TestSymbolsTable(t *testing.T) {
ls := labels.FromStrings("__name__", "qwer", "zxcv", "1234")
encoded := s.SymbolizeLabels(ls, nil)
require.Equal(t, []uint32{1, 3, 4, 5}, encoded)
decoded := DesymbolizeLabels(encoded, s.Symbols())
b := labels.NewScratchBuilder(len(encoded))
decoded := desymbolizeLabels(&b, encoded, s.Symbols())
require.Equal(t, ls, decoded)
// Different buf.

View file

@ -0,0 +1,232 @@
// Copyright 2024 Prometheus Team
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package rwcommon
import (
"testing"
"github.com/prometheus/common/model"
"github.com/stretchr/testify/require"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/histogram"
"github.com/prometheus/prometheus/prompb"
writev2 "github.com/prometheus/prometheus/prompb/io/prometheus/write/v2"
)
func TestToLabels(t *testing.T) {
expected := labels.FromStrings("__name__", "metric1", "foo", "bar")
t.Run("v1", func(t *testing.T) {
ts := prompb.TimeSeries{Labels: []prompb.Label{{Name: "__name__", Value: "metric1"}, {Name: "foo", Value: "bar"}}}
b := labels.NewScratchBuilder(2)
require.Equal(t, expected, ts.ToLabels(&b, nil))
require.Equal(t, ts.Labels, prompb.FromLabels(expected, nil))
require.Equal(t, ts.Labels, prompb.FromLabels(expected, ts.Labels))
})
t.Run("v2", func(t *testing.T) {
v2Symbols := []string{"", "__name__", "metric1", "foo", "bar"}
ts := writev2.TimeSeries{LabelsRefs: []uint32{1, 2, 3, 4}}
b := labels.NewScratchBuilder(2)
require.Equal(t, expected, ts.ToLabels(&b, v2Symbols))
// No need for FromLabels in our prod code as we use symbol table to do so.
})
}
func TestFromMetadataType(t *testing.T) {
for _, tc := range []struct {
desc string
input model.MetricType
expectedV1 prompb.MetricMetadata_MetricType
expectedV2 writev2.Metadata_MetricType
}{
{
desc: "with a single-word metric",
input: model.MetricTypeCounter,
expectedV1: prompb.MetricMetadata_COUNTER,
expectedV2: writev2.Metadata_METRIC_TYPE_COUNTER,
},
{
desc: "with a two-word metric",
input: model.MetricTypeStateset,
expectedV1: prompb.MetricMetadata_STATESET,
expectedV2: writev2.Metadata_METRIC_TYPE_STATESET,
},
{
desc: "with an unknown metric",
input: "not-known",
expectedV1: prompb.MetricMetadata_UNKNOWN,
expectedV2: writev2.Metadata_METRIC_TYPE_UNSPECIFIED,
},
} {
t.Run(tc.desc, func(t *testing.T) {
t.Run("v1", func(t *testing.T) {
require.Equal(t, tc.expectedV1, prompb.FromMetadataType(tc.input))
})
t.Run("v2", func(t *testing.T) {
require.Equal(t, tc.expectedV2, writev2.FromMetadataType(tc.input))
})
})
}
}
func TestToHistogram_Empty(t *testing.T) {
t.Run("v1", func(t *testing.T) {
require.NotNilf(t, prompb.Histogram{}.ToIntHistogram(), "")
require.NotNilf(t, prompb.Histogram{}.ToFloatHistogram(), "")
})
t.Run("v2", func(t *testing.T) {
require.NotNilf(t, writev2.Histogram{}.ToIntHistogram(), "")
require.NotNilf(t, writev2.Histogram{}.ToFloatHistogram(), "")
})
}
func testIntHistogram() histogram.Histogram {
return histogram.Histogram{
CounterResetHint: histogram.GaugeType,
Schema: 0,
Count: 19,
Sum: 2.7,
ZeroThreshold: 1e-128,
PositiveSpans: []histogram.Span{
{Offset: 0, Length: 4},
{Offset: 0, Length: 0},
{Offset: 0, Length: 3},
},
PositiveBuckets: []int64{1, 2, -2, 1, -1, 0, 0},
NegativeSpans: []histogram.Span{
{Offset: 0, Length: 5},
{Offset: 1, Length: 0},
{Offset: 0, Length: 1},
},
NegativeBuckets: []int64{1, 2, -2, 1, -1, 0},
}
}
func testFloatHistogram() histogram.FloatHistogram {
return histogram.FloatHistogram{
CounterResetHint: histogram.GaugeType,
Schema: 0,
Count: 19,
Sum: 2.7,
ZeroThreshold: 1e-128,
PositiveSpans: []histogram.Span{
{Offset: 0, Length: 4},
{Offset: 0, Length: 0},
{Offset: 0, Length: 3},
},
PositiveBuckets: []float64{1, 3, 1, 2, 1, 1, 1},
NegativeSpans: []histogram.Span{
{Offset: 0, Length: 5},
{Offset: 1, Length: 0},
{Offset: 0, Length: 1},
},
NegativeBuckets: []float64{1, 3, 1, 2, 1, 1},
}
}
func TestFromIntToFloatOrIntHistogram(t *testing.T) {
testIntHist := testIntHistogram()
testFloatHist := testFloatHistogram()
t.Run("v1", func(t *testing.T) {
h := prompb.FromIntHistogram(123, testIntHist.Copy())
require.False(t, h.IsFloatHistogram())
require.Equal(t, int64(123), h.Timestamp)
require.Equal(t, testIntHist, *h.ToIntHistogram())
require.Equal(t, testFloatHist, *h.ToFloatHistogram())
})
t.Run("v2", func(t *testing.T) {
h := writev2.FromIntHistogram(123, testIntHist.Copy())
require.False(t, h.IsFloatHistogram())
require.Equal(t, int64(123), h.Timestamp)
require.Equal(t, testIntHist, *h.ToIntHistogram())
require.Equal(t, testFloatHist, *h.ToFloatHistogram())
})
}
func TestFromFloatToFloatHistogram(t *testing.T) {
testFloatHist := testFloatHistogram()
t.Run("v1", func(t *testing.T) {
h := prompb.FromFloatHistogram(123, testFloatHist.Copy())
require.True(t, h.IsFloatHistogram())
require.Equal(t, int64(123), h.Timestamp)
require.Nil(t, h.ToIntHistogram())
require.Equal(t, testFloatHist, *h.ToFloatHistogram())
})
t.Run("v2", func(t *testing.T) {
h := writev2.FromFloatHistogram(123, testFloatHist.Copy())
require.True(t, h.IsFloatHistogram())
require.Equal(t, int64(123), h.Timestamp)
require.Nil(t, h.ToIntHistogram())
require.Equal(t, testFloatHist, *h.ToFloatHistogram())
})
}
func TestFromIntOrFloatHistogram_ResetHint(t *testing.T) {
for _, tc := range []struct {
input histogram.CounterResetHint
expectedV1 prompb.Histogram_ResetHint
expectedV2 writev2.Histogram_ResetHint
}{
{
input: histogram.UnknownCounterReset,
expectedV1: prompb.Histogram_UNKNOWN,
expectedV2: writev2.Histogram_RESET_HINT_UNSPECIFIED,
},
{
input: histogram.CounterReset,
expectedV1: prompb.Histogram_YES,
expectedV2: writev2.Histogram_RESET_HINT_YES,
},
{
input: histogram.NotCounterReset,
expectedV1: prompb.Histogram_NO,
expectedV2: writev2.Histogram_RESET_HINT_NO,
},
{
input: histogram.GaugeType,
expectedV1: prompb.Histogram_GAUGE,
expectedV2: writev2.Histogram_RESET_HINT_GAUGE,
},
} {
t.Run("", func(t *testing.T) {
t.Run("v1", func(t *testing.T) {
h := testIntHistogram()
h.CounterResetHint = tc.input
got := prompb.FromIntHistogram(1337, &h)
require.Equal(t, tc.expectedV1, got.GetResetHint())
fh := testFloatHistogram()
fh.CounterResetHint = tc.input
got2 := prompb.FromFloatHistogram(1337, &fh)
require.Equal(t, tc.expectedV1, got2.GetResetHint())
})
t.Run("v2", func(t *testing.T) {
h := testIntHistogram()
h.CounterResetHint = tc.input
got := writev2.FromIntHistogram(1337, &h)
require.Equal(t, tc.expectedV2, got.GetResetHint())
fh := testFloatHistogram()
fh.CounterResetHint = tc.input
got2 := writev2.FromFloatHistogram(1337, &fh)
require.Equal(t, tc.expectedV2, got2.GetResetHint())
})
})
}
}

View file

@ -22,7 +22,6 @@ import (
"net/http"
"slices"
"sort"
"strings"
"sync"
"github.com/gogo/protobuf/proto"
@ -30,10 +29,8 @@ import (
"github.com/prometheus/common/model"
"go.opentelemetry.io/collector/pdata/pmetric/pmetricotlp"
"github.com/prometheus/prometheus/model/exemplar"
"github.com/prometheus/prometheus/model/histogram"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/metadata"
"github.com/prometheus/prometheus/prompb"
writev2 "github.com/prometheus/prometheus/prompb/io/prometheus/write/v2"
"github.com/prometheus/prometheus/storage"
@ -155,10 +152,10 @@ func ToQueryResult(ss storage.SeriesSet, sampleLimit int) (*prompb.QueryResult,
})
case chunkenc.ValHistogram:
ts, h := iter.AtHistogram(nil)
histograms = append(histograms, HistogramToHistogramProto(ts, h))
histograms = append(histograms, prompb.FromIntHistogram(ts, h))
case chunkenc.ValFloatHistogram:
ts, fh := iter.AtFloatHistogram(nil)
histograms = append(histograms, FloatHistogramToHistogramProto(ts, fh))
histograms = append(histograms, prompb.FromFloatHistogram(ts, fh))
default:
return nil, ss.Warnings(), fmt.Errorf("unrecognized value type: %s", valType)
}
@ -168,7 +165,7 @@ func ToQueryResult(ss storage.SeriesSet, sampleLimit int) (*prompb.QueryResult,
}
resp.Timeseries = append(resp.Timeseries, &prompb.TimeSeries{
Labels: LabelsToLabelsProto(series.Labels(), nil),
Labels: prompb.FromLabels(series.Labels(), nil),
Samples: samples,
Histograms: histograms,
})
@ -184,7 +181,7 @@ func FromQueryResult(sortSeries bool, res *prompb.QueryResult) storage.SeriesSet
if err := validateLabelsAndMetricName(ts.Labels); err != nil {
return errSeriesSet{err: err}
}
lbls := LabelProtosToLabels(&b, ts.Labels)
lbls := ts.ToLabels(&b, nil)
series = append(series, &concreteSeries{labels: lbls, floats: ts.Samples, histograms: ts.Histograms})
}
@ -237,7 +234,7 @@ func StreamChunkedReadResponses(
for ss.Next() {
series := ss.At()
iter = series.Iterator(iter)
lbls = MergeLabels(LabelsToLabelsProto(series.Labels(), lbls), sortedExternalLabels)
lbls = MergeLabels(prompb.FromLabels(series.Labels(), lbls), sortedExternalLabels)
maxDataLength := maxBytesInFrame
for _, lbl := range lbls {
@ -483,21 +480,16 @@ func (c *concreteSeriesIterator) AtHistogram(*histogram.Histogram) (int64, *hist
panic("iterator is not on an integer histogram sample")
}
h := c.series.histograms[c.histogramsCur]
return h.Timestamp, HistogramProtoToHistogram(h)
return h.Timestamp, h.ToIntHistogram()
}
// AtFloatHistogram implements chunkenc.Iterator.
func (c *concreteSeriesIterator) AtFloatHistogram(*histogram.FloatHistogram) (int64, *histogram.FloatHistogram) {
switch c.curValType {
case chunkenc.ValHistogram:
if c.curValType == chunkenc.ValHistogram || c.curValType == chunkenc.ValFloatHistogram {
fh := c.series.histograms[c.histogramsCur]
return fh.Timestamp, HistogramProtoToFloatHistogram(fh)
case chunkenc.ValFloatHistogram:
fh := c.series.histograms[c.histogramsCur]
return fh.Timestamp, FloatHistogramProtoToFloatHistogram(fh)
default:
panic("iterator is not on a histogram sample")
return fh.Timestamp, fh.ToFloatHistogram() // integer will be auto-converted.
}
panic("iterator is not on a histogram sample")
}
// AtT implements chunkenc.Iterator.
@ -620,292 +612,6 @@ func FromLabelMatchers(matchers []*prompb.LabelMatcher) ([]*labels.Matcher, erro
return result, nil
}
func exemplarProtoToExemplar(b *labels.ScratchBuilder, ep prompb.Exemplar) exemplar.Exemplar {
timestamp := ep.Timestamp
return exemplar.Exemplar{
Labels: LabelProtosToLabels(b, ep.Labels),
Value: ep.Value,
Ts: timestamp,
HasTs: timestamp != 0,
}
}
func exemplarProtoV2ToExemplar(ep writev2.Exemplar, symbols []string) exemplar.Exemplar {
timestamp := ep.Timestamp
return exemplar.Exemplar{
Labels: writev2.DesymbolizeLabels(ep.LabelsRefs, symbols),
Value: ep.Value,
Ts: timestamp,
HasTs: timestamp != 0,
}
}
func metadataProtoV2ToMetadata(mp writev2.Metadata, symbols []string) metadata.Metadata {
return metadata.Metadata{
Type: metricTypeFromProtoV2Equivalent(mp.Type),
Unit: symbols[mp.UnitRef],
Help: symbols[mp.HelpRef],
}
}
// HistogramProtoToHistogram extracts a (normal integer) Histogram from the
// provided proto message. The caller has to make sure that the proto message
// represents an integer histogram and not a float histogram, or it panics.
func HistogramProtoToHistogram(hp prompb.Histogram) *histogram.Histogram {
if hp.IsFloatHistogram() {
panic("HistogramProtoToHistogram called with a float histogram")
}
return &histogram.Histogram{
CounterResetHint: histogram.CounterResetHint(hp.ResetHint),
Schema: hp.Schema,
ZeroThreshold: hp.ZeroThreshold,
ZeroCount: hp.GetZeroCountInt(),
Count: hp.GetCountInt(),
Sum: hp.Sum,
PositiveSpans: spansProtoToSpans(hp.GetPositiveSpans()),
PositiveBuckets: hp.GetPositiveDeltas(),
NegativeSpans: spansProtoToSpans(hp.GetNegativeSpans()),
NegativeBuckets: hp.GetNegativeDeltas(),
}
}
// HistogramProtoV2ToHistogram extracts a (normal integer) Histogram from the
// provided proto message. The caller has to make sure that the proto message
// represents an integer histogram and not a float histogram, or it panics.
func HistogramProtoV2ToHistogram(hp writev2.Histogram) *histogram.Histogram {
if hp.IsFloatHistogram() {
panic("HistogramProtoToHistogram called with a float histogram")
}
return &histogram.Histogram{
CounterResetHint: histogram.CounterResetHint(hp.ResetHint),
Schema: hp.Schema,
ZeroThreshold: hp.ZeroThreshold,
ZeroCount: hp.GetZeroCountInt(),
Count: hp.GetCountInt(),
Sum: hp.Sum,
PositiveSpans: spansProtoV2ToSpans(hp.GetPositiveSpans()),
PositiveBuckets: hp.GetPositiveDeltas(),
NegativeSpans: spansProtoV2ToSpans(hp.GetNegativeSpans()),
NegativeBuckets: hp.GetNegativeDeltas(),
}
}
// FloatHistogramProtoToFloatHistogram extracts a float Histogram from the
// provided proto message to a Float Histogram. The caller has to make sure that
// the proto message represents a float histogram and not an integer histogram,
// or it panics.
func FloatHistogramProtoToFloatHistogram(hp prompb.Histogram) *histogram.FloatHistogram {
if !hp.IsFloatHistogram() {
panic("FloatHistogramProtoToFloatHistogram called with an integer histogram")
}
return &histogram.FloatHistogram{
CounterResetHint: histogram.CounterResetHint(hp.ResetHint),
Schema: hp.Schema,
ZeroThreshold: hp.ZeroThreshold,
ZeroCount: hp.GetZeroCountFloat(),
Count: hp.GetCountFloat(),
Sum: hp.Sum,
PositiveSpans: spansProtoToSpans(hp.GetPositiveSpans()),
PositiveBuckets: hp.GetPositiveCounts(),
NegativeSpans: spansProtoToSpans(hp.GetNegativeSpans()),
NegativeBuckets: hp.GetNegativeCounts(),
}
}
// FloatHistogramProtoV2ToFloatHistogram extracts a float Histogram from the
// provided proto message to a Float Histogram. The caller has to make sure that
// the proto message represents a float histogram and not an integer histogram,
// or it panics.
func FloatHistogramProtoV2ToFloatHistogram(hp writev2.Histogram) *histogram.FloatHistogram {
if !hp.IsFloatHistogram() {
panic("FloatHistogramProtoToFloatHistogram called with an integer histogram")
}
return &histogram.FloatHistogram{
CounterResetHint: histogram.CounterResetHint(hp.ResetHint),
Schema: hp.Schema,
ZeroThreshold: hp.ZeroThreshold,
ZeroCount: hp.GetZeroCountFloat(),
Count: hp.GetCountFloat(),
Sum: hp.Sum,
PositiveSpans: spansProtoV2ToSpans(hp.GetPositiveSpans()),
PositiveBuckets: hp.GetPositiveCounts(),
NegativeSpans: spansProtoV2ToSpans(hp.GetNegativeSpans()),
NegativeBuckets: hp.GetNegativeCounts(),
}
}
// HistogramProtoToFloatHistogram extracts and converts a (normal integer) histogram from the provided proto message
// to a float histogram. The caller has to make sure that the proto message represents an integer histogram and not a
// float histogram, or it panics.
func HistogramProtoToFloatHistogram(hp prompb.Histogram) *histogram.FloatHistogram {
if hp.IsFloatHistogram() {
panic("HistogramProtoToFloatHistogram called with a float histogram")
}
return &histogram.FloatHistogram{
CounterResetHint: histogram.CounterResetHint(hp.ResetHint),
Schema: hp.Schema,
ZeroThreshold: hp.ZeroThreshold,
ZeroCount: float64(hp.GetZeroCountInt()),
Count: float64(hp.GetCountInt()),
Sum: hp.Sum,
PositiveSpans: spansProtoToSpans(hp.GetPositiveSpans()),
PositiveBuckets: deltasToCounts(hp.GetPositiveDeltas()),
NegativeSpans: spansProtoToSpans(hp.GetNegativeSpans()),
NegativeBuckets: deltasToCounts(hp.GetNegativeDeltas()),
}
}
func FloatV2HistogramProtoToFloatHistogram(hp writev2.Histogram) *histogram.FloatHistogram {
if !hp.IsFloatHistogram() {
panic("FloatHistogramProtoToFloatHistogram called with an integer histogram")
}
return &histogram.FloatHistogram{
CounterResetHint: histogram.CounterResetHint(hp.ResetHint),
Schema: hp.Schema,
ZeroThreshold: hp.ZeroThreshold,
ZeroCount: hp.GetZeroCountFloat(),
Count: hp.GetCountFloat(),
Sum: hp.Sum,
PositiveSpans: spansProtoV2ToSpans(hp.GetPositiveSpans()),
PositiveBuckets: hp.GetPositiveCounts(),
NegativeSpans: spansProtoV2ToSpans(hp.GetNegativeSpans()),
NegativeBuckets: hp.GetNegativeCounts(),
}
}
// V2HistogramProtoToHistogram extracts a (normal integer) Histogram from the
// provided proto message. The caller has to make sure that the proto message
// represents an integer histogram and not a float histogram, or it panics.
func V2HistogramProtoToHistogram(hp writev2.Histogram) *histogram.Histogram {
if hp.IsFloatHistogram() {
panic("HistogramProtoToHistogram called with a float histogram")
}
return &histogram.Histogram{
CounterResetHint: histogram.CounterResetHint(hp.ResetHint),
Schema: hp.Schema,
ZeroThreshold: hp.ZeroThreshold,
ZeroCount: hp.GetZeroCountInt(),
Count: hp.GetCountInt(),
Sum: hp.Sum,
PositiveSpans: spansProtoV2ToSpans(hp.GetPositiveSpans()),
PositiveBuckets: hp.GetPositiveDeltas(),
NegativeSpans: spansProtoV2ToSpans(hp.GetNegativeSpans()),
NegativeBuckets: hp.GetNegativeDeltas(),
}
}
func spansProtoToSpans(s []prompb.BucketSpan) []histogram.Span {
spans := make([]histogram.Span, len(s))
for i := 0; i < len(s); i++ {
spans[i] = histogram.Span{Offset: s[i].Offset, Length: s[i].Length}
}
return spans
}
func spansProtoV2ToSpans(s []writev2.BucketSpan) []histogram.Span {
spans := make([]histogram.Span, len(s))
for i := 0; i < len(s); i++ {
spans[i] = histogram.Span{Offset: s[i].Offset, Length: s[i].Length}
}
return spans
}
func deltasToCounts(deltas []int64) []float64 {
counts := make([]float64, len(deltas))
var cur float64
for i, d := range deltas {
cur += float64(d)
counts[i] = cur
}
return counts
}
func HistogramToHistogramProto(timestamp int64, h *histogram.Histogram) prompb.Histogram {
return prompb.Histogram{
Count: &prompb.Histogram_CountInt{CountInt: h.Count},
Sum: h.Sum,
Schema: h.Schema,
ZeroThreshold: h.ZeroThreshold,
ZeroCount: &prompb.Histogram_ZeroCountInt{ZeroCountInt: h.ZeroCount},
NegativeSpans: spansToSpansProto(h.NegativeSpans),
NegativeDeltas: h.NegativeBuckets,
PositiveSpans: spansToSpansProto(h.PositiveSpans),
PositiveDeltas: h.PositiveBuckets,
ResetHint: prompb.Histogram_ResetHint(h.CounterResetHint),
Timestamp: timestamp,
}
}
func HistogramToV2HistogramProto(timestamp int64, h *histogram.Histogram) writev2.Histogram {
return writev2.Histogram{
Count: &writev2.Histogram_CountInt{CountInt: h.Count},
Sum: h.Sum,
Schema: h.Schema,
ZeroThreshold: h.ZeroThreshold,
ZeroCount: &writev2.Histogram_ZeroCountInt{ZeroCountInt: h.ZeroCount},
NegativeSpans: spansToV2SpansProto(h.NegativeSpans),
NegativeDeltas: h.NegativeBuckets,
PositiveSpans: spansToV2SpansProto(h.PositiveSpans),
PositiveDeltas: h.PositiveBuckets,
ResetHint: writev2.Histogram_ResetHint(h.CounterResetHint),
Timestamp: timestamp,
}
}
func FloatHistogramToHistogramProto(timestamp int64, fh *histogram.FloatHistogram) prompb.Histogram {
return prompb.Histogram{
Count: &prompb.Histogram_CountFloat{CountFloat: fh.Count},
Sum: fh.Sum,
Schema: fh.Schema,
ZeroThreshold: fh.ZeroThreshold,
ZeroCount: &prompb.Histogram_ZeroCountFloat{ZeroCountFloat: fh.ZeroCount},
NegativeSpans: spansToSpansProto(fh.NegativeSpans),
NegativeCounts: fh.NegativeBuckets,
PositiveSpans: spansToSpansProto(fh.PositiveSpans),
PositiveCounts: fh.PositiveBuckets,
ResetHint: prompb.Histogram_ResetHint(fh.CounterResetHint),
Timestamp: timestamp,
}
}
func FloatHistogramToV2HistogramProto(timestamp int64, fh *histogram.FloatHistogram) writev2.Histogram {
return writev2.Histogram{
Count: &writev2.Histogram_CountFloat{CountFloat: fh.Count},
Sum: fh.Sum,
Schema: fh.Schema,
ZeroThreshold: fh.ZeroThreshold,
ZeroCount: &writev2.Histogram_ZeroCountFloat{ZeroCountFloat: fh.ZeroCount},
NegativeSpans: spansToV2SpansProto(fh.NegativeSpans),
NegativeCounts: fh.NegativeBuckets,
PositiveSpans: spansToV2SpansProto(fh.PositiveSpans),
PositiveCounts: fh.PositiveBuckets,
ResetHint: writev2.Histogram_ResetHint(fh.CounterResetHint),
Timestamp: timestamp,
}
}
func spansToSpansProto(s []histogram.Span) []prompb.BucketSpan {
spans := make([]prompb.BucketSpan, len(s))
for i := 0; i < len(s); i++ {
spans[i] = prompb.BucketSpan{Offset: s[i].Offset, Length: s[i].Length}
}
return spans
}
func spansToV2SpansProto(s []histogram.Span) []writev2.BucketSpan {
spans := make([]writev2.BucketSpan, len(s))
for i := 0; i < len(s); i++ {
spans[i] = writev2.BucketSpan{Offset: s[i].Offset, Length: s[i].Length}
}
return spans
}
// LabelProtosToMetric unpack a []*prompb.Label to a model.Metric.
func LabelProtosToMetric(labelPairs []*prompb.Label) model.Metric {
metric := make(model.Metric, len(labelPairs))
@ -915,57 +621,6 @@ func LabelProtosToMetric(labelPairs []*prompb.Label) model.Metric {
return metric
}
// LabelProtosToLabels transforms prompb labels into labels. The labels builder
// will be used to build the returned labels.
func LabelProtosToLabels(b *labels.ScratchBuilder, labelPairs []prompb.Label) labels.Labels {
b.Reset()
for _, l := range labelPairs {
b.Add(l.Name, l.Value)
}
b.Sort()
return b.Labels()
}
// LabelsToLabelsProto transforms labels into prompb labels. The buffer slice
// will be used to avoid allocations if it is big enough to store the labels.
func LabelsToLabelsProto(lbls labels.Labels, buf []prompb.Label) []prompb.Label {
result := buf[:0]
lbls.Range(func(l labels.Label) {
result = append(result, prompb.Label{
Name: l.Name,
Value: l.Value,
})
})
return result
}
// metricTypeToMetricTypeProto transforms a Prometheus metricType into prompb metricType. Since the former is a string we need to transform it to an enum.
func metricTypeToMetricTypeProto(t model.MetricType) prompb.MetricMetadata_MetricType {
mt := strings.ToUpper(string(t))
v, ok := prompb.MetricMetadata_MetricType_value[mt]
if !ok {
return prompb.MetricMetadata_UNKNOWN
}
return prompb.MetricMetadata_MetricType(v)
}
// metricTypeToMetricTypeProtoV2 transforms a Prometheus metricType into writev2 metricType. Since the former is a string we need to transform it to an enum.
func metricTypeToMetricTypeProtoV2(t model.MetricType) writev2.Metadata_MetricType {
mt := strings.ToUpper(string(t))
v, ok := prompb.MetricMetadata_MetricType_value[mt]
if !ok {
return writev2.Metadata_METRIC_TYPE_UNSPECIFIED
}
return writev2.Metadata_MetricType(v)
}
func metricTypeFromProtoV2Equivalent(t writev2.Metadata_MetricType) model.MetricType {
mt := strings.ToLower(t.String())
return model.MetricType(mt) // TODO(@tpaschalis) a better way for this?
}
// DecodeWriteRequest from an io.Reader into a prompb.WriteRequest, handling
// snappy decompression.
func DecodeWriteRequest(r io.Reader) (*prompb.WriteRequest, error) {
@ -1058,74 +713,3 @@ func DecodeV2WriteRequestStr(r io.Reader) (*writev2.Request, error) {
return &req, nil
}
func V2WriteRequestToWriteRequest(redReq *writev2.Request) (*prompb.WriteRequest, error) {
req := &prompb.WriteRequest{
Timeseries: make([]prompb.TimeSeries, len(redReq.Timeseries)),
// TODO handle metadata?
}
for i, rts := range redReq.Timeseries {
writev2.DesymbolizeLabels(rts.LabelsRefs, redReq.Symbols).Range(func(l labels.Label) {
req.Timeseries[i].Labels = append(req.Timeseries[i].Labels, prompb.Label{
Name: l.Name,
Value: l.Value,
})
})
exemplars := make([]prompb.Exemplar, len(rts.Exemplars))
for j, e := range rts.Exemplars {
exemplars[j].Value = e.Value
exemplars[j].Timestamp = e.Timestamp
writev2.DesymbolizeLabels(e.LabelsRefs, redReq.Symbols).Range(func(l labels.Label) {
exemplars[j].Labels = append(exemplars[j].Labels, prompb.Label{
Name: l.Name,
Value: l.Value,
})
})
}
req.Timeseries[i].Exemplars = exemplars
req.Timeseries[i].Samples = make([]prompb.Sample, len(rts.Samples))
for j, s := range rts.Samples {
req.Timeseries[i].Samples[j].Timestamp = s.Timestamp
req.Timeseries[i].Samples[j].Value = s.Value
}
req.Timeseries[i].Histograms = make([]prompb.Histogram, len(rts.Histograms))
for j, h := range rts.Histograms {
// TODO: double check
if h.IsFloatHistogram() {
req.Timeseries[i].Histograms[j].Count = &prompb.Histogram_CountFloat{CountFloat: h.GetCountFloat()}
req.Timeseries[i].Histograms[j].ZeroCount = &prompb.Histogram_ZeroCountFloat{ZeroCountFloat: h.GetZeroCountFloat()}
} else {
req.Timeseries[i].Histograms[j].Count = &prompb.Histogram_CountInt{CountInt: h.GetCountInt()}
req.Timeseries[i].Histograms[j].ZeroCount = &prompb.Histogram_ZeroCountInt{ZeroCountInt: h.GetZeroCountInt()}
}
for _, span := range h.NegativeSpans {
req.Timeseries[i].Histograms[j].NegativeSpans = append(req.Timeseries[i].Histograms[j].NegativeSpans, prompb.BucketSpan{
Offset: span.Offset,
Length: span.Length,
})
}
for _, span := range h.PositiveSpans {
req.Timeseries[i].Histograms[j].PositiveSpans = append(req.Timeseries[i].Histograms[j].PositiveSpans, prompb.BucketSpan{
Offset: span.Offset,
Length: span.Length,
})
}
req.Timeseries[i].Histograms[j].Sum = h.Sum
req.Timeseries[i].Histograms[j].Schema = h.Schema
req.Timeseries[i].Histograms[j].ZeroThreshold = h.ZeroThreshold
req.Timeseries[i].Histograms[j].NegativeDeltas = h.NegativeDeltas
req.Timeseries[i].Histograms[j].NegativeCounts = h.NegativeCounts
req.Timeseries[i].Histograms[j].PositiveDeltas = h.PositiveDeltas
req.Timeseries[i].Histograms[j].PositiveCounts = h.PositiveCounts
req.Timeseries[i].Histograms[j].ResetHint = prompb.Histogram_ResetHint(h.ResetHint)
req.Timeseries[i].Histograms[j].Timestamp = h.Timestamp
}
}
return req, nil
}

View file

@ -21,7 +21,6 @@ import (
"github.com/go-kit/log"
"github.com/gogo/protobuf/proto"
"github.com/prometheus/common/model"
"github.com/stretchr/testify/require"
"github.com/prometheus/prometheus/model/histogram"
@ -59,7 +58,7 @@ var writeRequestFixture = &prompb.WriteRequest{
},
Samples: []prompb.Sample{{Value: 1, Timestamp: 0}},
Exemplars: []prompb.Exemplar{{Labels: []prompb.Label{{Name: "f", Value: "g"}}, Value: 1, Timestamp: 0}},
Histograms: []prompb.Histogram{HistogramToHistogramProto(0, &testHistogram), FloatHistogramToHistogramProto(1, testHistogram.ToFloat(nil))},
Histograms: []prompb.Histogram{prompb.FromIntHistogram(0, &testHistogram), prompb.FromFloatHistogram(1, testHistogram.ToFloat(nil))},
},
{
Labels: []prompb.Label{
@ -71,7 +70,7 @@ var writeRequestFixture = &prompb.WriteRequest{
},
Samples: []prompb.Sample{{Value: 2, Timestamp: 1}},
Exemplars: []prompb.Exemplar{{Labels: []prompb.Label{{Name: "h", Value: "i"}}, Value: 2, Timestamp: 1}},
Histograms: []prompb.Histogram{HistogramToHistogramProto(2, &testHistogram), FloatHistogramToHistogramProto(3, testHistogram.ToFloat(nil))},
Histograms: []prompb.Histogram{prompb.FromIntHistogram(2, &testHistogram), prompb.FromFloatHistogram(3, testHistogram.ToFloat(nil))},
},
},
}
@ -79,37 +78,23 @@ var writeRequestFixture = &prompb.WriteRequest{
// writeV2RequestFixture represents the same request as writeRequestFixture, but using the v2 representation.
var writeV2RequestFixture = func() *writev2.Request {
st := writev2.NewSymbolTable()
var labels []uint32
for _, s := range []string{
"__name__", "test_metric1",
"b", "c",
"baz", "qux",
"d", "e",
"foo", "bar",
} {
ref := st.Symbolize(s)
labels = append(labels, ref)
}
for _, s := range []string{
"f", "g", // 10, 11
"h", "i", // 12, 13
} {
_ = st.Symbolize(s)
}
b := labels.NewScratchBuilder(0)
labelRefs := st.SymbolizeLabels(writeRequestFixture.Timeseries[0].ToLabels(&b, nil), nil)
exemplar1LabelRefs := st.SymbolizeLabels(writeRequestFixture.Timeseries[0].Exemplars[0].ToExemplar(&b, nil).Labels, nil)
exemplar2LabelRefs := st.SymbolizeLabels(writeRequestFixture.Timeseries[0].Exemplars[0].ToExemplar(&b, nil).Labels, nil)
return &writev2.Request{
Timeseries: []writev2.TimeSeries{
{
LabelsRefs: labels,
LabelsRefs: labelRefs,
Samples: []writev2.Sample{{Value: 1, Timestamp: 0}},
Exemplars: []writev2.Exemplar{{LabelsRefs: []uint32{10, 11}, Value: 1, Timestamp: 0}},
Histograms: []writev2.Histogram{HistogramToV2HistogramProto(0, &testHistogram), FloatHistogramToV2HistogramProto(1, testHistogram.ToFloat(nil))},
Exemplars: []writev2.Exemplar{{LabelsRefs: exemplar1LabelRefs, Value: 1, Timestamp: 0}},
Histograms: []writev2.Histogram{writev2.FromIntHistogram(0, &testHistogram), writev2.FromFloatHistogram(1, testHistogram.ToFloat(nil))},
},
{
LabelsRefs: labels,
LabelsRefs: labelRefs,
Samples: []writev2.Sample{{Value: 2, Timestamp: 1}},
Exemplars: []writev2.Exemplar{{LabelsRefs: []uint32{12, 13}, Value: 2, Timestamp: 1}},
Histograms: []writev2.Histogram{HistogramToV2HistogramProto(2, &testHistogram), FloatHistogramToV2HistogramProto(3, testHistogram.ToFloat(nil))},
Exemplars: []writev2.Exemplar{{LabelsRefs: exemplar2LabelRefs, Value: 2, Timestamp: 1}},
Histograms: []writev2.Histogram{writev2.FromIntHistogram(2, &testHistogram), writev2.FromFloatHistogram(3, testHistogram.ToFloat(nil))},
},
},
Symbols: st.Symbols(),
@ -310,7 +295,7 @@ func TestConcreteSeriesIterator_HistogramSamples(t *testing.T) {
} else {
ts = int64(i)
}
histProtos[i] = HistogramToHistogramProto(ts, h)
histProtos[i] = prompb.FromIntHistogram(ts, h)
}
series := &concreteSeries{
labels: labels.FromStrings("foo", "bar"),
@ -361,9 +346,9 @@ func TestConcreteSeriesIterator_FloatAndHistogramSamples(t *testing.T) {
histProtos := make([]prompb.Histogram, len(histograms))
for i, h := range histograms {
if i < 10 {
histProtos[i] = HistogramToHistogramProto(int64(i+1), h)
histProtos[i] = prompb.FromIntHistogram(int64(i+1), h)
} else {
histProtos[i] = HistogramToHistogramProto(int64(i+6), h)
histProtos[i] = prompb.FromIntHistogram(int64(i+6), h)
}
}
series := &concreteSeries{
@ -443,7 +428,7 @@ func TestConcreteSeriesIterator_FloatAndHistogramSamples(t *testing.T) {
require.Equal(t, chunkenc.ValHistogram, it.Next())
ts, fh = it.AtFloatHistogram(nil)
require.Equal(t, int64(17), ts)
expected := HistogramProtoToFloatHistogram(HistogramToHistogramProto(int64(17), histograms[11]))
expected := prompb.FromIntHistogram(int64(17), histograms[11]).ToFloatHistogram()
require.Equal(t, expected, fh)
// Keep calling Next() until the end.
@ -527,37 +512,6 @@ func TestMergeLabels(t *testing.T) {
}
}
func TestMetricTypeToMetricTypeProto(t *testing.T) {
tc := []struct {
desc string
input model.MetricType
expected prompb.MetricMetadata_MetricType
}{
{
desc: "with a single-word metric",
input: model.MetricTypeCounter,
expected: prompb.MetricMetadata_COUNTER,
},
{
desc: "with a two-word metric",
input: model.MetricTypeStateset,
expected: prompb.MetricMetadata_STATESET,
},
{
desc: "with an unknown metric",
input: "not-known",
expected: prompb.MetricMetadata_UNKNOWN,
},
}
for _, tt := range tc {
t.Run(tt.desc, func(t *testing.T) {
m := metricTypeToMetricTypeProto(tt.input)
require.Equal(t, tt.expected, m)
})
}
}
func TestDecodeWriteRequest(t *testing.T) {
buf, _, _, err := buildWriteRequest(nil, writeRequestFixture.Timeseries, nil, nil, nil, nil, "snappy")
require.NoError(t, err)
@ -576,212 +530,9 @@ func TestDecodeV2WriteRequest(t *testing.T) {
require.Equal(t, writeV2RequestFixture, actual)
}
func TestNilHistogramProto(t *testing.T) {
// This function will panic if it impromperly handles nil
// values, causing the test to fail.
HistogramProtoToHistogram(prompb.Histogram{})
HistogramProtoToFloatHistogram(prompb.Histogram{})
}
func exampleHistogram() histogram.Histogram {
return histogram.Histogram{
CounterResetHint: histogram.GaugeType,
Schema: 0,
Count: 19,
Sum: 2.7,
PositiveSpans: []histogram.Span{
{Offset: 0, Length: 4},
{Offset: 0, Length: 0},
{Offset: 0, Length: 3},
},
PositiveBuckets: []int64{1, 2, -2, 1, -1, 0, 0},
NegativeSpans: []histogram.Span{
{Offset: 0, Length: 5},
{Offset: 1, Length: 0},
{Offset: 0, Length: 1},
},
NegativeBuckets: []int64{1, 2, -2, 1, -1, 0},
}
}
func exampleHistogramProto() prompb.Histogram {
return prompb.Histogram{
Count: &prompb.Histogram_CountInt{CountInt: 19},
Sum: 2.7,
Schema: 0,
ZeroThreshold: 0,
ZeroCount: &prompb.Histogram_ZeroCountInt{ZeroCountInt: 0},
NegativeSpans: []prompb.BucketSpan{
{
Offset: 0,
Length: 5,
},
{
Offset: 1,
Length: 0,
},
{
Offset: 0,
Length: 1,
},
},
NegativeDeltas: []int64{1, 2, -2, 1, -1, 0},
PositiveSpans: []prompb.BucketSpan{
{
Offset: 0,
Length: 4,
},
{
Offset: 0,
Length: 0,
},
{
Offset: 0,
Length: 3,
},
},
PositiveDeltas: []int64{1, 2, -2, 1, -1, 0, 0},
ResetHint: prompb.Histogram_GAUGE,
Timestamp: 1337,
}
}
func TestHistogramToProtoConvert(t *testing.T) {
tests := []struct {
input histogram.CounterResetHint
expected prompb.Histogram_ResetHint
}{
{
input: histogram.UnknownCounterReset,
expected: prompb.Histogram_UNKNOWN,
},
{
input: histogram.CounterReset,
expected: prompb.Histogram_YES,
},
{
input: histogram.NotCounterReset,
expected: prompb.Histogram_NO,
},
{
input: histogram.GaugeType,
expected: prompb.Histogram_GAUGE,
},
}
for _, test := range tests {
h := exampleHistogram()
h.CounterResetHint = test.input
p := exampleHistogramProto()
p.ResetHint = test.expected
require.Equal(t, p, HistogramToHistogramProto(1337, &h))
require.Equal(t, h, *HistogramProtoToHistogram(p))
}
}
func exampleFloatHistogram() histogram.FloatHistogram {
return histogram.FloatHistogram{
CounterResetHint: histogram.GaugeType,
Schema: 0,
Count: 19,
Sum: 2.7,
PositiveSpans: []histogram.Span{
{Offset: 0, Length: 4},
{Offset: 0, Length: 0},
{Offset: 0, Length: 3},
},
PositiveBuckets: []float64{1, 2, -2, 1, -1, 0, 0},
NegativeSpans: []histogram.Span{
{Offset: 0, Length: 5},
{Offset: 1, Length: 0},
{Offset: 0, Length: 1},
},
NegativeBuckets: []float64{1, 2, -2, 1, -1, 0},
}
}
func exampleFloatHistogramProto() prompb.Histogram {
return prompb.Histogram{
Count: &prompb.Histogram_CountFloat{CountFloat: 19},
Sum: 2.7,
Schema: 0,
ZeroThreshold: 0,
ZeroCount: &prompb.Histogram_ZeroCountFloat{ZeroCountFloat: 0},
NegativeSpans: []prompb.BucketSpan{
{
Offset: 0,
Length: 5,
},
{
Offset: 1,
Length: 0,
},
{
Offset: 0,
Length: 1,
},
},
NegativeCounts: []float64{1, 2, -2, 1, -1, 0},
PositiveSpans: []prompb.BucketSpan{
{
Offset: 0,
Length: 4,
},
{
Offset: 0,
Length: 0,
},
{
Offset: 0,
Length: 3,
},
},
PositiveCounts: []float64{1, 2, -2, 1, -1, 0, 0},
ResetHint: prompb.Histogram_GAUGE,
Timestamp: 1337,
}
}
func TestFloatHistogramToProtoConvert(t *testing.T) {
tests := []struct {
input histogram.CounterResetHint
expected prompb.Histogram_ResetHint
}{
{
input: histogram.UnknownCounterReset,
expected: prompb.Histogram_UNKNOWN,
},
{
input: histogram.CounterReset,
expected: prompb.Histogram_YES,
},
{
input: histogram.NotCounterReset,
expected: prompb.Histogram_NO,
},
{
input: histogram.GaugeType,
expected: prompb.Histogram_GAUGE,
},
}
for _, test := range tests {
h := exampleFloatHistogram()
h.CounterResetHint = test.input
p := exampleFloatHistogramProto()
p.ResetHint = test.expected
require.Equal(t, p, FloatHistogramToHistogramProto(1337, &h))
require.Equal(t, h, *FloatHistogramProtoToFloatHistogram(p))
}
}
func TestStreamResponse(t *testing.T) {
lbs1 := LabelsToLabelsProto(labels.FromStrings("instance", "localhost1", "job", "demo1"), nil)
lbs2 := LabelsToLabelsProto(labels.FromStrings("instance", "localhost2", "job", "demo2"), nil)
lbs1 := prompb.FromLabels(labels.FromStrings("instance", "localhost1", "job", "demo1"), nil)
lbs2 := prompb.FromLabels(labels.FromStrings("instance", "localhost2", "job", "demo2"), nil)
chunk := prompb.Chunk{
Type: prompb.Chunk_XOR,
Data: make([]byte, 100),
@ -853,7 +604,7 @@ func (c *mockChunkSeriesSet) Next() bool {
func (c *mockChunkSeriesSet) At() storage.ChunkSeries {
return &storage.ChunkSeriesEntry{
Lset: LabelProtosToLabels(&c.builder, c.chunkedSeries[c.index].Labels),
Lset: c.chunkedSeries[c.index].ToLabels(&c.builder, nil),
ChunkIteratorFn: func(chunks.Iterator) chunks.Iterator {
return &mockChunkIterator{
chunks: c.chunkedSeries[c.index].Chunks,

View file

@ -553,7 +553,7 @@ func (t *QueueManager) AppendWatcherMetadata(ctx context.Context, metadata []scr
mm = append(mm, prompb.MetricMetadata{
MetricFamilyName: entry.Metric,
Help: entry.Help,
Type: metricTypeToMetricTypeProto(entry.Type),
Type: prompb.FromMetadataType(entry.Type),
Unit: entry.Unit,
})
}
@ -1628,7 +1628,8 @@ func populateTimeSeries(batch []timeSeries, pendingData []prompb.TimeSeries, sen
// Number of pending samples is limited by the fact that sendSamples (via sendSamplesWithBackoff)
// retries endlessly, so once we reach max samples, if we can never send to the endpoint we'll
// stop reading from the queue. This makes it safe to reference pendingSamples by index.
pendingData[nPending].Labels = LabelsToLabelsProto(d.seriesLabels, pendingData[nPending].Labels)
pendingData[nPending].Labels = prompb.FromLabels(d.seriesLabels, pendingData[nPending].Labels)
switch d.sType {
case tSample:
pendingData[nPending].Samples = append(pendingData[nPending].Samples, prompb.Sample{
@ -1638,16 +1639,16 @@ func populateTimeSeries(batch []timeSeries, pendingData []prompb.TimeSeries, sen
nPendingSamples++
case tExemplar:
pendingData[nPending].Exemplars = append(pendingData[nPending].Exemplars, prompb.Exemplar{
Labels: LabelsToLabelsProto(d.exemplarLabels, nil),
Labels: prompb.FromLabels(d.exemplarLabels, nil),
Value: d.value,
Timestamp: d.timestamp,
})
nPendingExemplars++
case tHistogram:
pendingData[nPending].Histograms = append(pendingData[nPending].Histograms, HistogramToHistogramProto(d.timestamp, d.histogram))
pendingData[nPending].Histograms = append(pendingData[nPending].Histograms, prompb.FromIntHistogram(d.timestamp, d.histogram))
nPendingHistograms++
case tFloatHistogram:
pendingData[nPending].Histograms = append(pendingData[nPending].Histograms, FloatHistogramToHistogramProto(d.timestamp, d.floatHistogram))
pendingData[nPending].Histograms = append(pendingData[nPending].Histograms, prompb.FromFloatHistogram(d.timestamp, d.floatHistogram))
nPendingHistograms++
}
}
@ -1879,7 +1880,7 @@ func populateV2TimeSeries(symbolTable *writev2.SymbolsTable, batch []timeSeries,
pendingData[nPending].Samples = pendingData[nPending].Samples[:0]
// todo: should we also safeguard against empty metadata here?
if d.metadata != nil {
pendingData[nPending].Metadata.Type = metricTypeToMetricTypeProtoV2(d.metadata.Type)
pendingData[nPending].Metadata.Type = writev2.FromMetadataType(d.metadata.Type)
pendingData[nPending].Metadata.HelpRef = symbolTable.Symbolize(d.metadata.Help)
pendingData[nPending].Metadata.HelpRef = symbolTable.Symbolize(d.metadata.Unit)
nPendingMetadata++
@ -1911,10 +1912,10 @@ func populateV2TimeSeries(symbolTable *writev2.SymbolsTable, batch []timeSeries,
})
nPendingExemplars++
case tHistogram:
pendingData[nPending].Histograms = append(pendingData[nPending].Histograms, HistogramToV2HistogramProto(d.timestamp, d.histogram))
pendingData[nPending].Histograms = append(pendingData[nPending].Histograms, writev2.FromIntHistogram(d.timestamp, d.histogram))
nPendingHistograms++
case tFloatHistogram:
pendingData[nPending].Histograms = append(pendingData[nPending].Histograms, FloatHistogramToV2HistogramProto(d.timestamp, d.floatHistogram))
pendingData[nPending].Histograms = append(pendingData[nPending].Histograms, writev2.FromFloatHistogram(d.timestamp, d.floatHistogram))
nPendingHistograms++
case tMetadata:
// TODO: log or return an error?

View file

@ -1023,7 +1023,7 @@ func (c *TestWriteClient) expectExemplars(ss []record.RefExemplar, series []reco
for _, s := range ss {
tsID := getSeriesIDFromRef(series[s.Ref])
e := prompb.Exemplar{
Labels: LabelsToLabelsProto(s.Labels, nil),
Labels: prompb.FromLabels(s.Labels, nil),
Timestamp: s.T,
Value: s.V,
}
@ -1040,7 +1040,7 @@ func (c *TestWriteClient) expectHistograms(hh []record.RefHistogramSample, serie
for _, h := range hh {
tsID := getSeriesIDFromRef(series[h.Ref])
c.expectedHistograms[tsID] = append(c.expectedHistograms[tsID], HistogramToHistogramProto(h.T, h.H))
c.expectedHistograms[tsID] = append(c.expectedHistograms[tsID], prompb.FromIntHistogram(h.T, h.H))
}
}
@ -1053,7 +1053,7 @@ func (c *TestWriteClient) expectFloatHistograms(fhs []record.RefFloatHistogramSa
for _, fh := range fhs {
tsID := getSeriesIDFromRef(series[fh.Ref])
c.expectedFloatHistograms[tsID] = append(c.expectedFloatHistograms[tsID], FloatHistogramToHistogramProto(fh.T, fh.FH))
c.expectedFloatHistograms[tsID] = append(c.expectedFloatHistograms[tsID], prompb.FromFloatHistogram(fh.T, fh.FH))
}
}
@ -1155,7 +1155,7 @@ func (c *TestWriteClient) Store(_ context.Context, req []byte, _ int) error {
var reqProtoV2 writev2.Request
err = proto.Unmarshal(reqBuf, &reqProtoV2)
if err == nil {
reqProto, err = V2WriteRequestToWriteRequest(&reqProtoV2)
reqProto, err = v2RequestToWriteRequest(&reqProtoV2)
}
}
if err != nil {
@ -1166,9 +1166,9 @@ func (c *TestWriteClient) Store(_ context.Context, req []byte, _ int) error {
return errors.New("invalid request, no timeseries")
}
builder := labels.NewScratchBuilder(0)
b := labels.NewScratchBuilder(0)
for _, ts := range reqProto.Timeseries {
labels := LabelProtosToLabels(&builder, ts.Labels)
labels := ts.ToLabels(&b, nil)
tsID := labels.String()
if len(ts.Samples) > 0 {
c.receivedSamples[tsID] = append(c.receivedSamples[tsID], ts.Samples...)
@ -1202,6 +1202,51 @@ func (c *TestWriteClient) Endpoint() string {
return "http://test-remote.com/1234"
}
func v2RequestToWriteRequest(v2Req *writev2.Request) (*prompb.WriteRequest, error) {
req := &prompb.WriteRequest{
Timeseries: make([]prompb.TimeSeries, len(v2Req.Timeseries)),
// TODO handle metadata?
}
b := labels.NewScratchBuilder(0)
for i, rts := range v2Req.Timeseries {
rts.ToLabels(&b, v2Req.Symbols).Range(func(l labels.Label) {
req.Timeseries[i].Labels = append(req.Timeseries[i].Labels, prompb.Label{
Name: l.Name,
Value: l.Value,
})
})
exemplars := make([]prompb.Exemplar, len(rts.Exemplars))
for j, e := range rts.Exemplars {
exemplars[j].Value = e.Value
exemplars[j].Timestamp = e.Timestamp
e.ToExemplar(&b, v2Req.Symbols).Labels.Range(func(l labels.Label) {
exemplars[j].Labels = append(exemplars[j].Labels, prompb.Label{
Name: l.Name,
Value: l.Value,
})
})
}
req.Timeseries[i].Exemplars = exemplars
req.Timeseries[i].Samples = make([]prompb.Sample, len(rts.Samples))
for j, s := range rts.Samples {
req.Timeseries[i].Samples[j].Timestamp = s.Timestamp
req.Timeseries[i].Samples[j].Value = s.Value
}
req.Timeseries[i].Histograms = make([]prompb.Histogram, len(rts.Histograms))
for j, h := range rts.Histograms {
if h.IsFloatHistogram() {
req.Timeseries[i].Histograms[j] = prompb.FromFloatHistogram(h.Timestamp, h.ToFloatHistogram())
continue
}
req.Timeseries[i].Histograms[j] = prompb.FromIntHistogram(h.Timestamp, h.ToIntHistogram())
}
}
return req, nil
}
// TestBlockingWriteClient is a queue_manager WriteClient which will block
// on any calls to Store(), until the request's Context is cancelled, at which
// point the `numCalls` property will contain a count of how many times Store()

View file

@ -124,7 +124,7 @@ func TestSampledReadEndpoint(t *testing.T) {
{Name: "d", Value: "e"},
},
Histograms: []prompb.Histogram{
FloatHistogramToHistogramProto(0, tsdbutil.GenerateTestFloatHistogram(0)),
prompb.FromFloatHistogram(0, tsdbutil.GenerateTestFloatHistogram(0)),
},
},
},

View file

@ -173,12 +173,12 @@ func TestSeriesSetFilter(t *testing.T) {
toRemove: []string{"foo"},
in: &prompb.QueryResult{
Timeseries: []*prompb.TimeSeries{
{Labels: LabelsToLabelsProto(labels.FromStrings("foo", "bar", "a", "b"), nil)},
{Labels: prompb.FromLabels(labels.FromStrings("foo", "bar", "a", "b"), nil)},
},
},
expected: &prompb.QueryResult{
Timeseries: []*prompb.TimeSeries{
{Labels: LabelsToLabelsProto(labels.FromStrings("a", "b"), nil)},
{Labels: prompb.FromLabels(labels.FromStrings("a", "b"), nil)},
},
},
},
@ -212,7 +212,7 @@ func (c *mockedRemoteClient) Read(_ context.Context, query *prompb.Query) (*prom
q := &prompb.QueryResult{}
for _, s := range c.store {
l := LabelProtosToLabels(&c.b, s.Labels)
l := s.ToLabels(&c.b, nil)
var notMatch bool
for _, m := range matchers {

View file

@ -227,7 +227,7 @@ func (h *writeHandler) write(ctx context.Context, req *prompb.WriteRequest) (err
b := labels.NewScratchBuilder(0)
for _, ts := range req.Timeseries {
ls := LabelProtosToLabels(&b, ts.Labels)
ls := ts.ToLabels(&b, nil)
if !ls.IsValid() {
level.Warn(h.logger).Log("msg", "Invalid metric names or labels", "got", ls.String())
samplesWithInvalidLabels++
@ -240,7 +240,7 @@ func (h *writeHandler) write(ctx context.Context, req *prompb.WriteRequest) (err
}
for _, ep := range ts.Exemplars {
e := exemplarProtoToExemplar(&b, ep)
e := ep.ToExemplar(&b, nil)
h.appendExemplar(timeLimitApp, e, ls, &outOfOrderExemplarErrs)
}
@ -276,8 +276,9 @@ func (h *writeHandler) writeV2(ctx context.Context, req *writev2.Request) (err e
err = timeLimitApp.Commit()
}()
b := labels.NewScratchBuilder(0)
for _, ts := range req.Timeseries {
ls := writev2.DesymbolizeLabels(ts.LabelsRefs, req.Symbols)
ls := ts.ToLabels(&b, req.Symbols)
err := h.appendSamplesV2(timeLimitApp, ts.Samples, ls)
if err != nil {
@ -285,7 +286,7 @@ func (h *writeHandler) writeV2(ctx context.Context, req *writev2.Request) (err e
}
for _, ep := range ts.Exemplars {
e := exemplarProtoV2ToExemplar(ep, req.Symbols)
e := ep.ToExemplar(&b, req.Symbols)
h.appendExemplar(timeLimitApp, e, ls, &outOfOrderExemplarErrs)
}
@ -294,7 +295,7 @@ func (h *writeHandler) writeV2(ctx context.Context, req *writev2.Request) (err e
return err
}
m := metadataProtoV2ToMetadata(ts.Metadata, req.Symbols)
m := ts.ToMetadata(req.Symbols)
if _, err = timeLimitApp.UpdateMetadata(0, ls, m); err != nil {
level.Debug(h.logger).Log("msg", "error while updating metadata from remote write", "err", err)
}
@ -358,11 +359,9 @@ func (h *writeHandler) appendHistograms(app storage.Appender, hh []prompb.Histog
var err error
for _, hp := range hh {
if hp.IsFloatHistogram() {
fhs := FloatHistogramProtoToFloatHistogram(hp)
_, err = app.AppendHistogram(0, labels, hp.Timestamp, nil, fhs)
_, err = app.AppendHistogram(0, labels, hp.Timestamp, nil, hp.ToFloatHistogram())
} else {
hs := HistogramProtoToHistogram(hp)
_, err = app.AppendHistogram(0, labels, hp.Timestamp, hs, nil)
_, err = app.AppendHistogram(0, labels, hp.Timestamp, hp.ToIntHistogram(), nil)
}
if err != nil {
unwrappedErr := errors.Unwrap(err)
@ -384,11 +383,9 @@ func (h *writeHandler) appendHistogramsV2(app storage.Appender, hh []writev2.His
var err error
for _, hp := range hh {
if hp.IsFloatHistogram() {
fhs := FloatV2HistogramProtoToFloatHistogram(hp)
_, err = app.AppendHistogram(0, labels, hp.Timestamp, nil, fhs)
_, err = app.AppendHistogram(0, labels, hp.Timestamp, nil, hp.ToFloatHistogram())
} else {
hs := V2HistogramProtoToHistogram(hp)
_, err = app.AppendHistogram(0, labels, hp.Timestamp, hs, nil)
_, err = app.AppendHistogram(0, labels, hp.Timestamp, hp.ToIntHistogram(), nil)
}
if err != nil {
unwrappedErr := errors.Unwrap(err)

View file

@ -267,24 +267,22 @@ func TestRemoteWriteHandler_V1Message(t *testing.T) {
j := 0
k := 0
for _, ts := range writeRequestFixture.Timeseries {
labels := LabelProtosToLabels(&b, ts.Labels)
labels := ts.ToLabels(&b, nil)
for _, s := range ts.Samples {
requireEqual(t, mockSample{labels, s.Timestamp, s.Value}, appendable.samples[i])
i++
}
for _, e := range ts.Exemplars {
exemplarLabels := LabelProtosToLabels(&b, e.Labels)
exemplarLabels := e.ToExemplar(&b, nil).Labels
requireEqual(t, mockExemplar{labels, exemplarLabels, e.Timestamp, e.Value}, appendable.exemplars[j])
j++
}
for _, hp := range ts.Histograms {
if hp.IsFloatHistogram() {
fh := FloatHistogramProtoToFloatHistogram(hp)
fh := hp.ToFloatHistogram()
requireEqual(t, mockHistogram{labels, hp.Timestamp, nil, fh}, appendable.histograms[k])
} else {
h := HistogramProtoToHistogram(hp)
h := hp.ToIntHistogram()
requireEqual(t, mockHistogram{labels, hp.Timestamp, h, nil}, appendable.histograms[k])
}
@ -313,33 +311,29 @@ func TestRemoteWriteHandler_V2Message(t *testing.T) {
resp := recorder.Result()
require.Equal(t, http.StatusNoContent, resp.StatusCode)
b := labels.NewScratchBuilder(0)
i := 0
j := 0
k := 0
// the reduced write request is equivalent to the write request fixture.
// we can use it for
for _, ts := range writeV2RequestFixture.Timeseries {
ls := writev2.DesymbolizeLabels(ts.LabelsRefs, writeV2RequestFixture.Symbols)
ls := ts.ToLabels(&b, writeV2RequestFixture.Symbols)
for _, s := range ts.Samples {
require.Equal(t, mockSample{ls, s.Timestamp, s.Value}, appendable.samples[i])
requireEqual(t, mockSample{ls, s.Timestamp, s.Value}, appendable.samples[i])
i++
}
for _, e := range ts.Exemplars {
exemplarLabels := writev2.DesymbolizeLabels(e.LabelsRefs, writeV2RequestFixture.Symbols)
require.Equal(t, mockExemplar{ls, exemplarLabels, e.Timestamp, e.Value}, appendable.exemplars[j])
exemplarLabels := e.ToExemplar(&b, writeV2RequestFixture.Symbols).Labels
requireEqual(t, mockExemplar{ls, exemplarLabels, e.Timestamp, e.Value}, appendable.exemplars[j])
j++
}
for _, hp := range ts.Histograms {
if hp.IsFloatHistogram() {
fh := FloatHistogramProtoV2ToFloatHistogram(hp)
require.Equal(t, mockHistogram{ls, hp.Timestamp, nil, fh}, appendable.histograms[k])
fh := hp.ToFloatHistogram()
requireEqual(t, mockHistogram{ls, hp.Timestamp, nil, fh}, appendable.histograms[k])
} else {
h := HistogramProtoV2ToHistogram(hp)
require.Equal(t, mockHistogram{ls, hp.Timestamp, h, nil}, appendable.histograms[k])
h := hp.ToIntHistogram()
requireEqual(t, mockHistogram{ls, hp.Timestamp, h, nil}, appendable.histograms[k])
}
k++
}
@ -348,7 +342,7 @@ func TestRemoteWriteHandler_V2Message(t *testing.T) {
}
func TestOutOfOrderSample_V1Message(t *testing.T) {
tests := []struct {
for _, tc := range []struct {
Name string
Timestamp int64
}{
@ -360,9 +354,7 @@ func TestOutOfOrderSample_V1Message(t *testing.T) {
Name: "future",
Timestamp: math.MaxInt64,
},
}
for _, tc := range tests {
} {
t.Run(tc.Name, func(t *testing.T) {
payload, _, _, err := buildWriteRequest(nil, []prompb.TimeSeries{{
Labels: []prompb.Label{{Name: "__name__", Value: "test_metric"}},
@ -386,7 +378,7 @@ func TestOutOfOrderSample_V1Message(t *testing.T) {
}
func TestOutOfOrderSample_V2Message(t *testing.T) {
tests := []struct {
for _, tc := range []struct {
Name string
Timestamp int64
}{
@ -398,9 +390,7 @@ func TestOutOfOrderSample_V2Message(t *testing.T) {
Name: "future",
Timestamp: math.MaxInt64,
},
}
for _, tc := range tests {
} {
t.Run(tc.Name, func(t *testing.T) {
payload, _, _, err := buildV2WriteRequest(nil, []writev2.TimeSeries{{
LabelsRefs: []uint32{0, 1},
@ -513,7 +503,7 @@ func TestOutOfOrderExemplar_V2Message(t *testing.T) {
}
func TestOutOfOrderHistogram_V1Message(t *testing.T) {
tests := []struct {
for _, tc := range []struct {
Name string
Timestamp int64
}{
@ -525,13 +515,11 @@ func TestOutOfOrderHistogram_V1Message(t *testing.T) {
Name: "future",
Timestamp: math.MaxInt64,
},
}
for _, tc := range tests {
} {
t.Run(tc.Name, func(t *testing.T) {
payload, _, _, err := buildWriteRequest(nil, []prompb.TimeSeries{{
Labels: []prompb.Label{{Name: "__name__", Value: "test_metric"}},
Histograms: []prompb.Histogram{HistogramToHistogramProto(tc.Timestamp, &testHistogram), FloatHistogramToHistogramProto(1, testHistogram.ToFloat(nil))},
Histograms: []prompb.Histogram{prompb.FromIntHistogram(tc.Timestamp, &testHistogram), prompb.FromFloatHistogram(1, testHistogram.ToFloat(nil))},
}}, nil, nil, nil, nil, "snappy")
require.NoError(t, err)
@ -551,27 +539,43 @@ func TestOutOfOrderHistogram_V1Message(t *testing.T) {
}
func TestOutOfOrderHistogram_V2Message(t *testing.T) {
payload, _, _, err := buildV2WriteRequest(nil, []writev2.TimeSeries{{
LabelsRefs: []uint32{0, 1},
Histograms: []writev2.Histogram{HistogramToV2HistogramProto(0, &testHistogram), FloatHistogramToV2HistogramProto(1, testHistogram.ToFloat(nil))},
}}, []string{"__name__", "metric1"}, nil, nil, nil, "snappy") // TODO(bwplotka): No empty string!
require.NoError(t, err)
for _, tc := range []struct {
Name string
Timestamp int64
}{
{
Name: "historic",
Timestamp: 0,
},
{
Name: "future",
Timestamp: math.MaxInt64,
},
} {
t.Run(tc.Name, func(t *testing.T) {
payload, _, _, err := buildV2WriteRequest(nil, []writev2.TimeSeries{{
LabelsRefs: []uint32{0, 1},
Histograms: []writev2.Histogram{writev2.FromIntHistogram(0, &testHistogram), writev2.FromFloatHistogram(1, testHistogram.ToFloat(nil))},
}}, []string{"__name__", "metric1"}, nil, nil, nil, "snappy")
require.NoError(t, err)
req, err := http.NewRequest("", "", bytes.NewReader(payload))
require.NoError(t, err)
req, err := http.NewRequest("", "", bytes.NewReader(payload))
require.NoError(t, err)
req.Header.Set("Content-Type", remoteWriteContentTypeHeaders[config.RemoteWriteProtoMsgV2])
req.Header.Set("Content-Encoding", string(SnappyBlockCompression))
req.Header.Set(RemoteWriteVersionHeader, RemoteWriteVersion20HeaderValue)
req.Header.Set("Content-Type", remoteWriteContentTypeHeaders[config.RemoteWriteProtoMsgV2])
req.Header.Set("Content-Encoding", string(SnappyBlockCompression))
req.Header.Set(RemoteWriteVersionHeader, RemoteWriteVersion20HeaderValue)
appendable := &mockAppendable{latestHistogram: 100}
handler := NewWriteHandler(log.NewNopLogger(), nil, appendable, []config.RemoteWriteProtoMsg{config.RemoteWriteProtoMsgV2})
appendable := &mockAppendable{latestHistogram: 100}
handler := NewWriteHandler(log.NewNopLogger(), nil, appendable, []config.RemoteWriteProtoMsg{config.RemoteWriteProtoMsgV2})
recorder := httptest.NewRecorder()
handler.ServeHTTP(recorder, req)
recorder := httptest.NewRecorder()
handler.ServeHTTP(recorder, req)
resp := recorder.Result()
require.Equal(t, http.StatusBadRequest, resp.StatusCode)
resp := recorder.Result()
require.Equal(t, http.StatusBadRequest, resp.StatusCode)
})
}
}
func BenchmarkRemoteWriteHandler(b *testing.B) {
@ -584,7 +588,7 @@ func BenchmarkRemoteWriteHandler(b *testing.B) {
{Name: "__name__", Value: "test_metric"},
{Name: "test_label_name_" + num, Value: labelValue + num},
},
Histograms: []prompb.Histogram{HistogramToHistogramProto(0, &testHistogram)},
Histograms: []prompb.Histogram{prompb.FromIntHistogram(0, &testHistogram)},
}}, nil, nil, nil, nil, "snappy")
require.NoError(b, err)
req, err := http.NewRequest("", "", bytes.NewReader(buf))