diff --git a/prompb/codec.go b/prompb/codec.go new file mode 100644 index 0000000000..ad30cd5e7b --- /dev/null +++ b/prompb/codec.go @@ -0,0 +1,201 @@ +// Copyright 2024 Prometheus Team +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package prompb + +import ( + "strings" + + "github.com/prometheus/common/model" + + "github.com/prometheus/prometheus/model/exemplar" + "github.com/prometheus/prometheus/model/histogram" + "github.com/prometheus/prometheus/model/labels" +) + +// NOTE(bwplotka): This file's code is tested in /prompb/rwcommon. + +// ToLabels return model labels.Labels from timeseries' remote labels. +func (m TimeSeries) ToLabels(b *labels.ScratchBuilder, _ []string) labels.Labels { + return labelProtosToLabels(b, m.GetLabels()) +} + +// ToLabels return model labels.Labels from timeseries' remote labels. +func (m ChunkedSeries) ToLabels(b *labels.ScratchBuilder, _ []string) labels.Labels { + return labelProtosToLabels(b, m.GetLabels()) +} + +func labelProtosToLabels(b *labels.ScratchBuilder, labelPairs []Label) labels.Labels { + b.Reset() + for _, l := range labelPairs { + b.Add(l.Name, l.Value) + } + b.Sort() + return b.Labels() +} + +// FromLabels transforms labels into prompb labels. The buffer slice +// will be used to avoid allocations if it is big enough to store the labels. +func FromLabels(lbls labels.Labels, buf []Label) []Label { + result := buf[:0] + lbls.Range(func(l labels.Label) { + result = append(result, Label{ + Name: l.Name, + Value: l.Value, + }) + }) + return result +} + +// FromMetadataType transforms a Prometheus metricType into prompb metricType. Since the former is a string we need to transform it to an enum. +func FromMetadataType(t model.MetricType) MetricMetadata_MetricType { + mt := strings.ToUpper(string(t)) + v, ok := MetricMetadata_MetricType_value[mt] + if !ok { + return MetricMetadata_UNKNOWN + } + return MetricMetadata_MetricType(v) +} + +// IsFloatHistogram returns true if the histogram is float. +func (h Histogram) IsFloatHistogram() bool { + _, ok := h.GetCount().(*Histogram_CountFloat) + return ok +} + +// ToIntHistogram returns integer Prometheus histogram from the remote implementation +// of integer histogram. If it's a float histogram, the method returns nil. +func (h Histogram) ToIntHistogram() *histogram.Histogram { + if h.IsFloatHistogram() { + return nil + } + return &histogram.Histogram{ + CounterResetHint: histogram.CounterResetHint(h.ResetHint), + Schema: h.Schema, + ZeroThreshold: h.ZeroThreshold, + ZeroCount: h.GetZeroCountInt(), + Count: h.GetCountInt(), + Sum: h.Sum, + PositiveSpans: spansProtoToSpans(h.GetPositiveSpans()), + PositiveBuckets: h.GetPositiveDeltas(), + NegativeSpans: spansProtoToSpans(h.GetNegativeSpans()), + NegativeBuckets: h.GetNegativeDeltas(), + } +} + +// ToFloatHistogram returns float Prometheus histogram from the remote implementation +// of float histogram. If the underlying implementation is an integer histogram, a +// conversion is performed. +func (h Histogram) ToFloatHistogram() *histogram.FloatHistogram { + if h.IsFloatHistogram() { + return &histogram.FloatHistogram{ + CounterResetHint: histogram.CounterResetHint(h.ResetHint), + Schema: h.Schema, + ZeroThreshold: h.ZeroThreshold, + ZeroCount: h.GetZeroCountFloat(), + Count: h.GetCountFloat(), + Sum: h.Sum, + PositiveSpans: spansProtoToSpans(h.GetPositiveSpans()), + PositiveBuckets: h.GetPositiveCounts(), + NegativeSpans: spansProtoToSpans(h.GetNegativeSpans()), + NegativeBuckets: h.GetNegativeCounts(), + } + } + // Conversion from integer histogram. + return &histogram.FloatHistogram{ + CounterResetHint: histogram.CounterResetHint(h.ResetHint), + Schema: h.Schema, + ZeroThreshold: h.ZeroThreshold, + ZeroCount: float64(h.GetZeroCountInt()), + Count: float64(h.GetCountInt()), + Sum: h.Sum, + PositiveSpans: spansProtoToSpans(h.GetPositiveSpans()), + PositiveBuckets: deltasToCounts(h.GetPositiveDeltas()), + NegativeSpans: spansProtoToSpans(h.GetNegativeSpans()), + NegativeBuckets: deltasToCounts(h.GetNegativeDeltas()), + } +} + +func spansProtoToSpans(s []BucketSpan) []histogram.Span { + spans := make([]histogram.Span, len(s)) + for i := 0; i < len(s); i++ { + spans[i] = histogram.Span{Offset: s[i].Offset, Length: s[i].Length} + } + + return spans +} + +func deltasToCounts(deltas []int64) []float64 { + counts := make([]float64, len(deltas)) + var cur float64 + for i, d := range deltas { + cur += float64(d) + counts[i] = cur + } + return counts +} + +// FromIntHistogram returns remote Histogram from the integer Histogram. +func FromIntHistogram(timestamp int64, h *histogram.Histogram) Histogram { + return Histogram{ + Count: &Histogram_CountInt{CountInt: h.Count}, + Sum: h.Sum, + Schema: h.Schema, + ZeroThreshold: h.ZeroThreshold, + ZeroCount: &Histogram_ZeroCountInt{ZeroCountInt: h.ZeroCount}, + NegativeSpans: spansToSpansProto(h.NegativeSpans), + NegativeDeltas: h.NegativeBuckets, + PositiveSpans: spansToSpansProto(h.PositiveSpans), + PositiveDeltas: h.PositiveBuckets, + ResetHint: Histogram_ResetHint(h.CounterResetHint), + Timestamp: timestamp, + } +} + +// FromFloatHistogram returns remote Histogram from the float Histogram. +func FromFloatHistogram(timestamp int64, fh *histogram.FloatHistogram) Histogram { + return Histogram{ + Count: &Histogram_CountFloat{CountFloat: fh.Count}, + Sum: fh.Sum, + Schema: fh.Schema, + ZeroThreshold: fh.ZeroThreshold, + ZeroCount: &Histogram_ZeroCountFloat{ZeroCountFloat: fh.ZeroCount}, + NegativeSpans: spansToSpansProto(fh.NegativeSpans), + NegativeCounts: fh.NegativeBuckets, + PositiveSpans: spansToSpansProto(fh.PositiveSpans), + PositiveCounts: fh.PositiveBuckets, + ResetHint: Histogram_ResetHint(fh.CounterResetHint), + Timestamp: timestamp, + } +} + +func spansToSpansProto(s []histogram.Span) []BucketSpan { + spans := make([]BucketSpan, len(s)) + for i := 0; i < len(s); i++ { + spans[i] = BucketSpan{Offset: s[i].Offset, Length: s[i].Length} + } + + return spans +} + +// ToExemplar converts remote exemplar to model exemplar. +func (m Exemplar) ToExemplar(b *labels.ScratchBuilder, _ []string) exemplar.Exemplar { + timestamp := m.Timestamp + + return exemplar.Exemplar{ + Labels: labelProtosToLabels(b, m.GetLabels()), + Value: m.Value, + Ts: timestamp, + HasTs: timestamp != 0, + } +} diff --git a/prompb/custom.go b/prompb/custom.go index 13d6e0f0cd..f73ddd446b 100644 --- a/prompb/custom.go +++ b/prompb/custom.go @@ -17,14 +17,6 @@ import ( "sync" ) -func (m Sample) T() int64 { return m.Timestamp } -func (m Sample) V() float64 { return m.Value } - -func (h Histogram) IsFloatHistogram() bool { - _, ok := h.GetCount().(*Histogram_CountFloat) - return ok -} - func (r *ChunkedReadResponse) PooledMarshal(p *sync.Pool) ([]byte, error) { size := r.Size() data, ok := p.Get().(*[]byte) diff --git a/prompb/io/prometheus/write/v2/codec.go b/prompb/io/prometheus/write/v2/codec.go new file mode 100644 index 0000000000..ba7ac0fc12 --- /dev/null +++ b/prompb/io/prometheus/write/v2/codec.go @@ -0,0 +1,188 @@ +// Copyright 2024 Prometheus Team +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package writev2 + +import ( + "strings" + + "github.com/prometheus/common/model" + + "github.com/prometheus/prometheus/prompb" + + "github.com/prometheus/prometheus/model/exemplar" + "github.com/prometheus/prometheus/model/histogram" + "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/model/metadata" +) + +// NOTE(bwplotka): This file's code is tested in /prompb/rwcommon. + +// ToLabels return model labels.Labels from timeseries' remote labels. +func (m TimeSeries) ToLabels(b *labels.ScratchBuilder, symbols []string) labels.Labels { + return desymbolizeLabels(b, m.GetLabelsRefs(), symbols) +} + +// ToMetadata return model metadata from timeseries' remote metadata. +func (m TimeSeries) ToMetadata(symbols []string) metadata.Metadata { + mt := strings.ToLower(m.Metadata.Type.String()) + return metadata.Metadata{ + Type: model.MetricType(mt), // TODO(@tpaschalis) a better way for this? + Unit: symbols[m.Metadata.UnitRef], + Help: symbols[m.Metadata.HelpRef], + } +} + +// FromMetadataType transforms a Prometheus metricType into writev2 metricType. Since the former is a string we need to transform it to an enum. +func FromMetadataType(t model.MetricType) Metadata_MetricType { + mt := strings.ToUpper(string(t)) + v, ok := prompb.MetricMetadata_MetricType_value[mt] + if !ok { + return Metadata_METRIC_TYPE_UNSPECIFIED + } + return Metadata_MetricType(v) +} + +// IsFloatHistogram returns true if the histogram is float. +func (h Histogram) IsFloatHistogram() bool { + _, ok := h.GetCount().(*Histogram_CountFloat) + return ok +} + +// ToIntHistogram returns integer Prometheus histogram from the remote implementation +// of integer histogram. If it's a float histogram, the method returns nil. +// TODO(bwplotka): Add support for incoming NHCB. +func (h Histogram) ToIntHistogram() *histogram.Histogram { + if h.IsFloatHistogram() { + return nil + } + return &histogram.Histogram{ + CounterResetHint: histogram.CounterResetHint(h.ResetHint), + Schema: h.Schema, + ZeroThreshold: h.ZeroThreshold, + ZeroCount: h.GetZeroCountInt(), + Count: h.GetCountInt(), + Sum: h.Sum, + PositiveSpans: spansProtoToSpans(h.GetPositiveSpans()), + PositiveBuckets: h.GetPositiveDeltas(), + NegativeSpans: spansProtoToSpans(h.GetNegativeSpans()), + NegativeBuckets: h.GetNegativeDeltas(), + } +} + +// ToFloatHistogram returns float Prometheus histogram from the remote implementation +// of float histogram. If the underlying implementation is an integer histogram, a +// conversion is performed. +// TODO(bwplotka): Add support for incoming NHCB. +func (h Histogram) ToFloatHistogram() *histogram.FloatHistogram { + if h.IsFloatHistogram() { + return &histogram.FloatHistogram{ + CounterResetHint: histogram.CounterResetHint(h.ResetHint), + Schema: h.Schema, + ZeroThreshold: h.ZeroThreshold, + ZeroCount: h.GetZeroCountFloat(), + Count: h.GetCountFloat(), + Sum: h.Sum, + PositiveSpans: spansProtoToSpans(h.GetPositiveSpans()), + PositiveBuckets: h.GetPositiveCounts(), + NegativeSpans: spansProtoToSpans(h.GetNegativeSpans()), + NegativeBuckets: h.GetNegativeCounts(), + } + } + // Conversion from integer histogram. + return &histogram.FloatHistogram{ + CounterResetHint: histogram.CounterResetHint(h.ResetHint), + Schema: h.Schema, + ZeroThreshold: h.ZeroThreshold, + ZeroCount: float64(h.GetZeroCountInt()), + Count: float64(h.GetCountInt()), + Sum: h.Sum, + PositiveSpans: spansProtoToSpans(h.GetPositiveSpans()), + PositiveBuckets: deltasToCounts(h.GetPositiveDeltas()), + NegativeSpans: spansProtoToSpans(h.GetNegativeSpans()), + NegativeBuckets: deltasToCounts(h.GetNegativeDeltas()), + } +} + +func spansProtoToSpans(s []BucketSpan) []histogram.Span { + spans := make([]histogram.Span, len(s)) + for i := 0; i < len(s); i++ { + spans[i] = histogram.Span{Offset: s[i].Offset, Length: s[i].Length} + } + + return spans +} + +func deltasToCounts(deltas []int64) []float64 { + counts := make([]float64, len(deltas)) + var cur float64 + for i, d := range deltas { + cur += float64(d) + counts[i] = cur + } + return counts +} + +// FromIntHistogram returns remote Histogram from the integer Histogram. +func FromIntHistogram(timestamp int64, h *histogram.Histogram) Histogram { + return Histogram{ + Count: &Histogram_CountInt{CountInt: h.Count}, + Sum: h.Sum, + Schema: h.Schema, + ZeroThreshold: h.ZeroThreshold, + ZeroCount: &Histogram_ZeroCountInt{ZeroCountInt: h.ZeroCount}, + NegativeSpans: spansToSpansProto(h.NegativeSpans), + NegativeDeltas: h.NegativeBuckets, + PositiveSpans: spansToSpansProto(h.PositiveSpans), + PositiveDeltas: h.PositiveBuckets, + ResetHint: Histogram_ResetHint(h.CounterResetHint), + Timestamp: timestamp, + } +} + +// FromFloatHistogram returns remote Histogram from the float Histogram. +func FromFloatHistogram(timestamp int64, fh *histogram.FloatHistogram) Histogram { + return Histogram{ + Count: &Histogram_CountFloat{CountFloat: fh.Count}, + Sum: fh.Sum, + Schema: fh.Schema, + ZeroThreshold: fh.ZeroThreshold, + ZeroCount: &Histogram_ZeroCountFloat{ZeroCountFloat: fh.ZeroCount}, + NegativeSpans: spansToSpansProto(fh.NegativeSpans), + NegativeCounts: fh.NegativeBuckets, + PositiveSpans: spansToSpansProto(fh.PositiveSpans), + PositiveCounts: fh.PositiveBuckets, + ResetHint: Histogram_ResetHint(fh.CounterResetHint), + Timestamp: timestamp, + } +} + +func spansToSpansProto(s []histogram.Span) []BucketSpan { + spans := make([]BucketSpan, len(s)) + for i := 0; i < len(s); i++ { + spans[i] = BucketSpan{Offset: s[i].Offset, Length: s[i].Length} + } + + return spans +} + +func (m Exemplar) ToExemplar(b *labels.ScratchBuilder, symbols []string) exemplar.Exemplar { + timestamp := m.Timestamp + + return exemplar.Exemplar{ + Labels: desymbolizeLabels(b, m.LabelsRefs, symbols), + Value: m.Value, + Ts: timestamp, + HasTs: timestamp != 0, + } +} diff --git a/prompb/io/prometheus/write/v2/custom.go b/prompb/io/prometheus/write/v2/custom.go index 1c92bd3058..651889318b 100644 --- a/prompb/io/prometheus/write/v2/custom.go +++ b/prompb/io/prometheus/write/v2/custom.go @@ -20,11 +20,6 @@ import ( func (m Sample) T() int64 { return m.Timestamp } func (m Sample) V() float64 { return m.Value } -func (h Histogram) IsFloatHistogram() bool { - _, ok := h.GetCount().(*Histogram_CountFloat) - return ok -} - func (m *Request) OptimizedMarshal(dst []byte) ([]byte, error) { siz := m.Size() if cap(dst) < siz { diff --git a/prompb/io/prometheus/write/v2/symbols.go b/prompb/io/prometheus/write/v2/symbols.go index 71f5006886..f316a976f2 100644 --- a/prompb/io/prometheus/write/v2/symbols.go +++ b/prompb/io/prometheus/write/v2/symbols.go @@ -72,9 +72,9 @@ func (t *SymbolsTable) Reset() { } } -// DesymbolizeLabels decodes label references, with given symbols to labels. -func DesymbolizeLabels(labelRefs []uint32, symbols []string) labels.Labels { - b := labels.NewScratchBuilder(len(labelRefs)) +// desymbolizeLabels decodes label references, with given symbols to labels. +func desymbolizeLabels(b *labels.ScratchBuilder, labelRefs []uint32, symbols []string) labels.Labels { + b.Reset() for i := 0; i < len(labelRefs); i += 2 { b.Add(symbols[labelRefs[i]], symbols[labelRefs[i+1]]) } diff --git a/prompb/io/prometheus/write/v2/symbols_test.go b/prompb/io/prometheus/write/v2/symbols_test.go index 50eb743cec..3d852e88f1 100644 --- a/prompb/io/prometheus/write/v2/symbols_test.go +++ b/prompb/io/prometheus/write/v2/symbols_test.go @@ -49,7 +49,8 @@ func TestSymbolsTable(t *testing.T) { ls := labels.FromStrings("__name__", "qwer", "zxcv", "1234") encoded := s.SymbolizeLabels(ls, nil) require.Equal(t, []uint32{1, 3, 4, 5}, encoded) - decoded := DesymbolizeLabels(encoded, s.Symbols()) + b := labels.NewScratchBuilder(len(encoded)) + decoded := desymbolizeLabels(&b, encoded, s.Symbols()) require.Equal(t, ls, decoded) // Different buf. diff --git a/prompb/rwcommon/codec_test.go b/prompb/rwcommon/codec_test.go new file mode 100644 index 0000000000..fa103dbf50 --- /dev/null +++ b/prompb/rwcommon/codec_test.go @@ -0,0 +1,232 @@ +// Copyright 2024 Prometheus Team +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package rwcommon + +import ( + "testing" + + "github.com/prometheus/common/model" + "github.com/stretchr/testify/require" + + "github.com/prometheus/prometheus/model/labels" + + "github.com/prometheus/prometheus/model/histogram" + + "github.com/prometheus/prometheus/prompb" + writev2 "github.com/prometheus/prometheus/prompb/io/prometheus/write/v2" +) + +func TestToLabels(t *testing.T) { + expected := labels.FromStrings("__name__", "metric1", "foo", "bar") + + t.Run("v1", func(t *testing.T) { + ts := prompb.TimeSeries{Labels: []prompb.Label{{Name: "__name__", Value: "metric1"}, {Name: "foo", Value: "bar"}}} + b := labels.NewScratchBuilder(2) + require.Equal(t, expected, ts.ToLabels(&b, nil)) + require.Equal(t, ts.Labels, prompb.FromLabels(expected, nil)) + require.Equal(t, ts.Labels, prompb.FromLabels(expected, ts.Labels)) + }) + t.Run("v2", func(t *testing.T) { + v2Symbols := []string{"", "__name__", "metric1", "foo", "bar"} + ts := writev2.TimeSeries{LabelsRefs: []uint32{1, 2, 3, 4}} + b := labels.NewScratchBuilder(2) + require.Equal(t, expected, ts.ToLabels(&b, v2Symbols)) + // No need for FromLabels in our prod code as we use symbol table to do so. + }) +} + +func TestFromMetadataType(t *testing.T) { + for _, tc := range []struct { + desc string + input model.MetricType + expectedV1 prompb.MetricMetadata_MetricType + expectedV2 writev2.Metadata_MetricType + }{ + { + desc: "with a single-word metric", + input: model.MetricTypeCounter, + expectedV1: prompb.MetricMetadata_COUNTER, + expectedV2: writev2.Metadata_METRIC_TYPE_COUNTER, + }, + { + desc: "with a two-word metric", + input: model.MetricTypeStateset, + expectedV1: prompb.MetricMetadata_STATESET, + expectedV2: writev2.Metadata_METRIC_TYPE_STATESET, + }, + { + desc: "with an unknown metric", + input: "not-known", + expectedV1: prompb.MetricMetadata_UNKNOWN, + expectedV2: writev2.Metadata_METRIC_TYPE_UNSPECIFIED, + }, + } { + t.Run(tc.desc, func(t *testing.T) { + t.Run("v1", func(t *testing.T) { + require.Equal(t, tc.expectedV1, prompb.FromMetadataType(tc.input)) + }) + t.Run("v2", func(t *testing.T) { + require.Equal(t, tc.expectedV2, writev2.FromMetadataType(tc.input)) + }) + }) + } +} + +func TestToHistogram_Empty(t *testing.T) { + t.Run("v1", func(t *testing.T) { + require.NotNilf(t, prompb.Histogram{}.ToIntHistogram(), "") + require.NotNilf(t, prompb.Histogram{}.ToFloatHistogram(), "") + }) + t.Run("v2", func(t *testing.T) { + require.NotNilf(t, writev2.Histogram{}.ToIntHistogram(), "") + require.NotNilf(t, writev2.Histogram{}.ToFloatHistogram(), "") + }) +} + +func testIntHistogram() histogram.Histogram { + return histogram.Histogram{ + CounterResetHint: histogram.GaugeType, + Schema: 0, + Count: 19, + Sum: 2.7, + ZeroThreshold: 1e-128, + PositiveSpans: []histogram.Span{ + {Offset: 0, Length: 4}, + {Offset: 0, Length: 0}, + {Offset: 0, Length: 3}, + }, + PositiveBuckets: []int64{1, 2, -2, 1, -1, 0, 0}, + NegativeSpans: []histogram.Span{ + {Offset: 0, Length: 5}, + {Offset: 1, Length: 0}, + {Offset: 0, Length: 1}, + }, + NegativeBuckets: []int64{1, 2, -2, 1, -1, 0}, + } +} + +func testFloatHistogram() histogram.FloatHistogram { + return histogram.FloatHistogram{ + CounterResetHint: histogram.GaugeType, + Schema: 0, + Count: 19, + Sum: 2.7, + ZeroThreshold: 1e-128, + PositiveSpans: []histogram.Span{ + {Offset: 0, Length: 4}, + {Offset: 0, Length: 0}, + {Offset: 0, Length: 3}, + }, + PositiveBuckets: []float64{1, 3, 1, 2, 1, 1, 1}, + NegativeSpans: []histogram.Span{ + {Offset: 0, Length: 5}, + {Offset: 1, Length: 0}, + {Offset: 0, Length: 1}, + }, + NegativeBuckets: []float64{1, 3, 1, 2, 1, 1}, + } +} + +func TestFromIntToFloatOrIntHistogram(t *testing.T) { + testIntHist := testIntHistogram() + testFloatHist := testFloatHistogram() + + t.Run("v1", func(t *testing.T) { + h := prompb.FromIntHistogram(123, testIntHist.Copy()) + require.False(t, h.IsFloatHistogram()) + require.Equal(t, int64(123), h.Timestamp) + require.Equal(t, testIntHist, *h.ToIntHistogram()) + require.Equal(t, testFloatHist, *h.ToFloatHistogram()) + }) + t.Run("v2", func(t *testing.T) { + h := writev2.FromIntHistogram(123, testIntHist.Copy()) + require.False(t, h.IsFloatHistogram()) + require.Equal(t, int64(123), h.Timestamp) + require.Equal(t, testIntHist, *h.ToIntHistogram()) + require.Equal(t, testFloatHist, *h.ToFloatHistogram()) + }) +} + +func TestFromFloatToFloatHistogram(t *testing.T) { + testFloatHist := testFloatHistogram() + + t.Run("v1", func(t *testing.T) { + h := prompb.FromFloatHistogram(123, testFloatHist.Copy()) + require.True(t, h.IsFloatHistogram()) + require.Equal(t, int64(123), h.Timestamp) + require.Nil(t, h.ToIntHistogram()) + require.Equal(t, testFloatHist, *h.ToFloatHistogram()) + }) + t.Run("v2", func(t *testing.T) { + h := writev2.FromFloatHistogram(123, testFloatHist.Copy()) + require.True(t, h.IsFloatHistogram()) + require.Equal(t, int64(123), h.Timestamp) + require.Nil(t, h.ToIntHistogram()) + require.Equal(t, testFloatHist, *h.ToFloatHistogram()) + }) +} + +func TestFromIntOrFloatHistogram_ResetHint(t *testing.T) { + for _, tc := range []struct { + input histogram.CounterResetHint + expectedV1 prompb.Histogram_ResetHint + expectedV2 writev2.Histogram_ResetHint + }{ + { + input: histogram.UnknownCounterReset, + expectedV1: prompb.Histogram_UNKNOWN, + expectedV2: writev2.Histogram_RESET_HINT_UNSPECIFIED, + }, + { + input: histogram.CounterReset, + expectedV1: prompb.Histogram_YES, + expectedV2: writev2.Histogram_RESET_HINT_YES, + }, + { + input: histogram.NotCounterReset, + expectedV1: prompb.Histogram_NO, + expectedV2: writev2.Histogram_RESET_HINT_NO, + }, + { + input: histogram.GaugeType, + expectedV1: prompb.Histogram_GAUGE, + expectedV2: writev2.Histogram_RESET_HINT_GAUGE, + }, + } { + t.Run("", func(t *testing.T) { + t.Run("v1", func(t *testing.T) { + h := testIntHistogram() + h.CounterResetHint = tc.input + got := prompb.FromIntHistogram(1337, &h) + require.Equal(t, tc.expectedV1, got.GetResetHint()) + + fh := testFloatHistogram() + fh.CounterResetHint = tc.input + got2 := prompb.FromFloatHistogram(1337, &fh) + require.Equal(t, tc.expectedV1, got2.GetResetHint()) + }) + t.Run("v2", func(t *testing.T) { + h := testIntHistogram() + h.CounterResetHint = tc.input + got := writev2.FromIntHistogram(1337, &h) + require.Equal(t, tc.expectedV2, got.GetResetHint()) + + fh := testFloatHistogram() + fh.CounterResetHint = tc.input + got2 := writev2.FromFloatHistogram(1337, &fh) + require.Equal(t, tc.expectedV2, got2.GetResetHint()) + }) + }) + } +} diff --git a/storage/remote/codec.go b/storage/remote/codec.go index 7209ca177e..f730864979 100644 --- a/storage/remote/codec.go +++ b/storage/remote/codec.go @@ -22,7 +22,6 @@ import ( "net/http" "slices" "sort" - "strings" "sync" "github.com/gogo/protobuf/proto" @@ -30,10 +29,8 @@ import ( "github.com/prometheus/common/model" "go.opentelemetry.io/collector/pdata/pmetric/pmetricotlp" - "github.com/prometheus/prometheus/model/exemplar" "github.com/prometheus/prometheus/model/histogram" "github.com/prometheus/prometheus/model/labels" - "github.com/prometheus/prometheus/model/metadata" "github.com/prometheus/prometheus/prompb" writev2 "github.com/prometheus/prometheus/prompb/io/prometheus/write/v2" "github.com/prometheus/prometheus/storage" @@ -155,10 +152,10 @@ func ToQueryResult(ss storage.SeriesSet, sampleLimit int) (*prompb.QueryResult, }) case chunkenc.ValHistogram: ts, h := iter.AtHistogram(nil) - histograms = append(histograms, HistogramToHistogramProto(ts, h)) + histograms = append(histograms, prompb.FromIntHistogram(ts, h)) case chunkenc.ValFloatHistogram: ts, fh := iter.AtFloatHistogram(nil) - histograms = append(histograms, FloatHistogramToHistogramProto(ts, fh)) + histograms = append(histograms, prompb.FromFloatHistogram(ts, fh)) default: return nil, ss.Warnings(), fmt.Errorf("unrecognized value type: %s", valType) } @@ -168,7 +165,7 @@ func ToQueryResult(ss storage.SeriesSet, sampleLimit int) (*prompb.QueryResult, } resp.Timeseries = append(resp.Timeseries, &prompb.TimeSeries{ - Labels: LabelsToLabelsProto(series.Labels(), nil), + Labels: prompb.FromLabels(series.Labels(), nil), Samples: samples, Histograms: histograms, }) @@ -184,7 +181,7 @@ func FromQueryResult(sortSeries bool, res *prompb.QueryResult) storage.SeriesSet if err := validateLabelsAndMetricName(ts.Labels); err != nil { return errSeriesSet{err: err} } - lbls := LabelProtosToLabels(&b, ts.Labels) + lbls := ts.ToLabels(&b, nil) series = append(series, &concreteSeries{labels: lbls, floats: ts.Samples, histograms: ts.Histograms}) } @@ -237,7 +234,7 @@ func StreamChunkedReadResponses( for ss.Next() { series := ss.At() iter = series.Iterator(iter) - lbls = MergeLabels(LabelsToLabelsProto(series.Labels(), lbls), sortedExternalLabels) + lbls = MergeLabels(prompb.FromLabels(series.Labels(), lbls), sortedExternalLabels) maxDataLength := maxBytesInFrame for _, lbl := range lbls { @@ -483,21 +480,16 @@ func (c *concreteSeriesIterator) AtHistogram(*histogram.Histogram) (int64, *hist panic("iterator is not on an integer histogram sample") } h := c.series.histograms[c.histogramsCur] - return h.Timestamp, HistogramProtoToHistogram(h) + return h.Timestamp, h.ToIntHistogram() } // AtFloatHistogram implements chunkenc.Iterator. func (c *concreteSeriesIterator) AtFloatHistogram(*histogram.FloatHistogram) (int64, *histogram.FloatHistogram) { - switch c.curValType { - case chunkenc.ValHistogram: + if c.curValType == chunkenc.ValHistogram || c.curValType == chunkenc.ValFloatHistogram { fh := c.series.histograms[c.histogramsCur] - return fh.Timestamp, HistogramProtoToFloatHistogram(fh) - case chunkenc.ValFloatHistogram: - fh := c.series.histograms[c.histogramsCur] - return fh.Timestamp, FloatHistogramProtoToFloatHistogram(fh) - default: - panic("iterator is not on a histogram sample") + return fh.Timestamp, fh.ToFloatHistogram() // integer will be auto-converted. } + panic("iterator is not on a histogram sample") } // AtT implements chunkenc.Iterator. @@ -620,292 +612,6 @@ func FromLabelMatchers(matchers []*prompb.LabelMatcher) ([]*labels.Matcher, erro return result, nil } -func exemplarProtoToExemplar(b *labels.ScratchBuilder, ep prompb.Exemplar) exemplar.Exemplar { - timestamp := ep.Timestamp - - return exemplar.Exemplar{ - Labels: LabelProtosToLabels(b, ep.Labels), - Value: ep.Value, - Ts: timestamp, - HasTs: timestamp != 0, - } -} - -func exemplarProtoV2ToExemplar(ep writev2.Exemplar, symbols []string) exemplar.Exemplar { - timestamp := ep.Timestamp - - return exemplar.Exemplar{ - Labels: writev2.DesymbolizeLabels(ep.LabelsRefs, symbols), - Value: ep.Value, - Ts: timestamp, - HasTs: timestamp != 0, - } -} - -func metadataProtoV2ToMetadata(mp writev2.Metadata, symbols []string) metadata.Metadata { - return metadata.Metadata{ - Type: metricTypeFromProtoV2Equivalent(mp.Type), - Unit: symbols[mp.UnitRef], - Help: symbols[mp.HelpRef], - } -} - -// HistogramProtoToHistogram extracts a (normal integer) Histogram from the -// provided proto message. The caller has to make sure that the proto message -// represents an integer histogram and not a float histogram, or it panics. -func HistogramProtoToHistogram(hp prompb.Histogram) *histogram.Histogram { - if hp.IsFloatHistogram() { - panic("HistogramProtoToHistogram called with a float histogram") - } - return &histogram.Histogram{ - CounterResetHint: histogram.CounterResetHint(hp.ResetHint), - Schema: hp.Schema, - ZeroThreshold: hp.ZeroThreshold, - ZeroCount: hp.GetZeroCountInt(), - Count: hp.GetCountInt(), - Sum: hp.Sum, - PositiveSpans: spansProtoToSpans(hp.GetPositiveSpans()), - PositiveBuckets: hp.GetPositiveDeltas(), - NegativeSpans: spansProtoToSpans(hp.GetNegativeSpans()), - NegativeBuckets: hp.GetNegativeDeltas(), - } -} - -// HistogramProtoV2ToHistogram extracts a (normal integer) Histogram from the -// provided proto message. The caller has to make sure that the proto message -// represents an integer histogram and not a float histogram, or it panics. -func HistogramProtoV2ToHistogram(hp writev2.Histogram) *histogram.Histogram { - if hp.IsFloatHistogram() { - panic("HistogramProtoToHistogram called with a float histogram") - } - return &histogram.Histogram{ - CounterResetHint: histogram.CounterResetHint(hp.ResetHint), - Schema: hp.Schema, - ZeroThreshold: hp.ZeroThreshold, - ZeroCount: hp.GetZeroCountInt(), - Count: hp.GetCountInt(), - Sum: hp.Sum, - PositiveSpans: spansProtoV2ToSpans(hp.GetPositiveSpans()), - PositiveBuckets: hp.GetPositiveDeltas(), - NegativeSpans: spansProtoV2ToSpans(hp.GetNegativeSpans()), - NegativeBuckets: hp.GetNegativeDeltas(), - } -} - -// FloatHistogramProtoToFloatHistogram extracts a float Histogram from the -// provided proto message to a Float Histogram. The caller has to make sure that -// the proto message represents a float histogram and not an integer histogram, -// or it panics. -func FloatHistogramProtoToFloatHistogram(hp prompb.Histogram) *histogram.FloatHistogram { - if !hp.IsFloatHistogram() { - panic("FloatHistogramProtoToFloatHistogram called with an integer histogram") - } - return &histogram.FloatHistogram{ - CounterResetHint: histogram.CounterResetHint(hp.ResetHint), - Schema: hp.Schema, - ZeroThreshold: hp.ZeroThreshold, - ZeroCount: hp.GetZeroCountFloat(), - Count: hp.GetCountFloat(), - Sum: hp.Sum, - PositiveSpans: spansProtoToSpans(hp.GetPositiveSpans()), - PositiveBuckets: hp.GetPositiveCounts(), - NegativeSpans: spansProtoToSpans(hp.GetNegativeSpans()), - NegativeBuckets: hp.GetNegativeCounts(), - } -} - -// FloatHistogramProtoV2ToFloatHistogram extracts a float Histogram from the -// provided proto message to a Float Histogram. The caller has to make sure that -// the proto message represents a float histogram and not an integer histogram, -// or it panics. -func FloatHistogramProtoV2ToFloatHistogram(hp writev2.Histogram) *histogram.FloatHistogram { - if !hp.IsFloatHistogram() { - panic("FloatHistogramProtoToFloatHistogram called with an integer histogram") - } - return &histogram.FloatHistogram{ - CounterResetHint: histogram.CounterResetHint(hp.ResetHint), - Schema: hp.Schema, - ZeroThreshold: hp.ZeroThreshold, - ZeroCount: hp.GetZeroCountFloat(), - Count: hp.GetCountFloat(), - Sum: hp.Sum, - PositiveSpans: spansProtoV2ToSpans(hp.GetPositiveSpans()), - PositiveBuckets: hp.GetPositiveCounts(), - NegativeSpans: spansProtoV2ToSpans(hp.GetNegativeSpans()), - NegativeBuckets: hp.GetNegativeCounts(), - } -} - -// HistogramProtoToFloatHistogram extracts and converts a (normal integer) histogram from the provided proto message -// to a float histogram. The caller has to make sure that the proto message represents an integer histogram and not a -// float histogram, or it panics. -func HistogramProtoToFloatHistogram(hp prompb.Histogram) *histogram.FloatHistogram { - if hp.IsFloatHistogram() { - panic("HistogramProtoToFloatHistogram called with a float histogram") - } - return &histogram.FloatHistogram{ - CounterResetHint: histogram.CounterResetHint(hp.ResetHint), - Schema: hp.Schema, - ZeroThreshold: hp.ZeroThreshold, - ZeroCount: float64(hp.GetZeroCountInt()), - Count: float64(hp.GetCountInt()), - Sum: hp.Sum, - PositiveSpans: spansProtoToSpans(hp.GetPositiveSpans()), - PositiveBuckets: deltasToCounts(hp.GetPositiveDeltas()), - NegativeSpans: spansProtoToSpans(hp.GetNegativeSpans()), - NegativeBuckets: deltasToCounts(hp.GetNegativeDeltas()), - } -} - -func FloatV2HistogramProtoToFloatHistogram(hp writev2.Histogram) *histogram.FloatHistogram { - if !hp.IsFloatHistogram() { - panic("FloatHistogramProtoToFloatHistogram called with an integer histogram") - } - return &histogram.FloatHistogram{ - CounterResetHint: histogram.CounterResetHint(hp.ResetHint), - Schema: hp.Schema, - ZeroThreshold: hp.ZeroThreshold, - ZeroCount: hp.GetZeroCountFloat(), - Count: hp.GetCountFloat(), - Sum: hp.Sum, - PositiveSpans: spansProtoV2ToSpans(hp.GetPositiveSpans()), - PositiveBuckets: hp.GetPositiveCounts(), - NegativeSpans: spansProtoV2ToSpans(hp.GetNegativeSpans()), - NegativeBuckets: hp.GetNegativeCounts(), - } -} - -// V2HistogramProtoToHistogram extracts a (normal integer) Histogram from the -// provided proto message. The caller has to make sure that the proto message -// represents an integer histogram and not a float histogram, or it panics. -func V2HistogramProtoToHistogram(hp writev2.Histogram) *histogram.Histogram { - if hp.IsFloatHistogram() { - panic("HistogramProtoToHistogram called with a float histogram") - } - return &histogram.Histogram{ - CounterResetHint: histogram.CounterResetHint(hp.ResetHint), - Schema: hp.Schema, - ZeroThreshold: hp.ZeroThreshold, - ZeroCount: hp.GetZeroCountInt(), - Count: hp.GetCountInt(), - Sum: hp.Sum, - PositiveSpans: spansProtoV2ToSpans(hp.GetPositiveSpans()), - PositiveBuckets: hp.GetPositiveDeltas(), - NegativeSpans: spansProtoV2ToSpans(hp.GetNegativeSpans()), - NegativeBuckets: hp.GetNegativeDeltas(), - } -} - -func spansProtoToSpans(s []prompb.BucketSpan) []histogram.Span { - spans := make([]histogram.Span, len(s)) - for i := 0; i < len(s); i++ { - spans[i] = histogram.Span{Offset: s[i].Offset, Length: s[i].Length} - } - - return spans -} - -func spansProtoV2ToSpans(s []writev2.BucketSpan) []histogram.Span { - spans := make([]histogram.Span, len(s)) - for i := 0; i < len(s); i++ { - spans[i] = histogram.Span{Offset: s[i].Offset, Length: s[i].Length} - } - - return spans -} - -func deltasToCounts(deltas []int64) []float64 { - counts := make([]float64, len(deltas)) - var cur float64 - for i, d := range deltas { - cur += float64(d) - counts[i] = cur - } - return counts -} - -func HistogramToHistogramProto(timestamp int64, h *histogram.Histogram) prompb.Histogram { - return prompb.Histogram{ - Count: &prompb.Histogram_CountInt{CountInt: h.Count}, - Sum: h.Sum, - Schema: h.Schema, - ZeroThreshold: h.ZeroThreshold, - ZeroCount: &prompb.Histogram_ZeroCountInt{ZeroCountInt: h.ZeroCount}, - NegativeSpans: spansToSpansProto(h.NegativeSpans), - NegativeDeltas: h.NegativeBuckets, - PositiveSpans: spansToSpansProto(h.PositiveSpans), - PositiveDeltas: h.PositiveBuckets, - ResetHint: prompb.Histogram_ResetHint(h.CounterResetHint), - Timestamp: timestamp, - } -} - -func HistogramToV2HistogramProto(timestamp int64, h *histogram.Histogram) writev2.Histogram { - return writev2.Histogram{ - Count: &writev2.Histogram_CountInt{CountInt: h.Count}, - Sum: h.Sum, - Schema: h.Schema, - ZeroThreshold: h.ZeroThreshold, - ZeroCount: &writev2.Histogram_ZeroCountInt{ZeroCountInt: h.ZeroCount}, - NegativeSpans: spansToV2SpansProto(h.NegativeSpans), - NegativeDeltas: h.NegativeBuckets, - PositiveSpans: spansToV2SpansProto(h.PositiveSpans), - PositiveDeltas: h.PositiveBuckets, - ResetHint: writev2.Histogram_ResetHint(h.CounterResetHint), - Timestamp: timestamp, - } -} - -func FloatHistogramToHistogramProto(timestamp int64, fh *histogram.FloatHistogram) prompb.Histogram { - return prompb.Histogram{ - Count: &prompb.Histogram_CountFloat{CountFloat: fh.Count}, - Sum: fh.Sum, - Schema: fh.Schema, - ZeroThreshold: fh.ZeroThreshold, - ZeroCount: &prompb.Histogram_ZeroCountFloat{ZeroCountFloat: fh.ZeroCount}, - NegativeSpans: spansToSpansProto(fh.NegativeSpans), - NegativeCounts: fh.NegativeBuckets, - PositiveSpans: spansToSpansProto(fh.PositiveSpans), - PositiveCounts: fh.PositiveBuckets, - ResetHint: prompb.Histogram_ResetHint(fh.CounterResetHint), - Timestamp: timestamp, - } -} - -func FloatHistogramToV2HistogramProto(timestamp int64, fh *histogram.FloatHistogram) writev2.Histogram { - return writev2.Histogram{ - Count: &writev2.Histogram_CountFloat{CountFloat: fh.Count}, - Sum: fh.Sum, - Schema: fh.Schema, - ZeroThreshold: fh.ZeroThreshold, - ZeroCount: &writev2.Histogram_ZeroCountFloat{ZeroCountFloat: fh.ZeroCount}, - NegativeSpans: spansToV2SpansProto(fh.NegativeSpans), - NegativeCounts: fh.NegativeBuckets, - PositiveSpans: spansToV2SpansProto(fh.PositiveSpans), - PositiveCounts: fh.PositiveBuckets, - ResetHint: writev2.Histogram_ResetHint(fh.CounterResetHint), - Timestamp: timestamp, - } -} - -func spansToSpansProto(s []histogram.Span) []prompb.BucketSpan { - spans := make([]prompb.BucketSpan, len(s)) - for i := 0; i < len(s); i++ { - spans[i] = prompb.BucketSpan{Offset: s[i].Offset, Length: s[i].Length} - } - - return spans -} - -func spansToV2SpansProto(s []histogram.Span) []writev2.BucketSpan { - spans := make([]writev2.BucketSpan, len(s)) - for i := 0; i < len(s); i++ { - spans[i] = writev2.BucketSpan{Offset: s[i].Offset, Length: s[i].Length} - } - - return spans -} - // LabelProtosToMetric unpack a []*prompb.Label to a model.Metric. func LabelProtosToMetric(labelPairs []*prompb.Label) model.Metric { metric := make(model.Metric, len(labelPairs)) @@ -915,57 +621,6 @@ func LabelProtosToMetric(labelPairs []*prompb.Label) model.Metric { return metric } -// LabelProtosToLabels transforms prompb labels into labels. The labels builder -// will be used to build the returned labels. -func LabelProtosToLabels(b *labels.ScratchBuilder, labelPairs []prompb.Label) labels.Labels { - b.Reset() - for _, l := range labelPairs { - b.Add(l.Name, l.Value) - } - b.Sort() - return b.Labels() -} - -// LabelsToLabelsProto transforms labels into prompb labels. The buffer slice -// will be used to avoid allocations if it is big enough to store the labels. -func LabelsToLabelsProto(lbls labels.Labels, buf []prompb.Label) []prompb.Label { - result := buf[:0] - lbls.Range(func(l labels.Label) { - result = append(result, prompb.Label{ - Name: l.Name, - Value: l.Value, - }) - }) - return result -} - -// metricTypeToMetricTypeProto transforms a Prometheus metricType into prompb metricType. Since the former is a string we need to transform it to an enum. -func metricTypeToMetricTypeProto(t model.MetricType) prompb.MetricMetadata_MetricType { - mt := strings.ToUpper(string(t)) - v, ok := prompb.MetricMetadata_MetricType_value[mt] - if !ok { - return prompb.MetricMetadata_UNKNOWN - } - - return prompb.MetricMetadata_MetricType(v) -} - -// metricTypeToMetricTypeProtoV2 transforms a Prometheus metricType into writev2 metricType. Since the former is a string we need to transform it to an enum. -func metricTypeToMetricTypeProtoV2(t model.MetricType) writev2.Metadata_MetricType { - mt := strings.ToUpper(string(t)) - v, ok := prompb.MetricMetadata_MetricType_value[mt] - if !ok { - return writev2.Metadata_METRIC_TYPE_UNSPECIFIED - } - - return writev2.Metadata_MetricType(v) -} - -func metricTypeFromProtoV2Equivalent(t writev2.Metadata_MetricType) model.MetricType { - mt := strings.ToLower(t.String()) - return model.MetricType(mt) // TODO(@tpaschalis) a better way for this? -} - // DecodeWriteRequest from an io.Reader into a prompb.WriteRequest, handling // snappy decompression. func DecodeWriteRequest(r io.Reader) (*prompb.WriteRequest, error) { @@ -1058,74 +713,3 @@ func DecodeV2WriteRequestStr(r io.Reader) (*writev2.Request, error) { return &req, nil } - -func V2WriteRequestToWriteRequest(redReq *writev2.Request) (*prompb.WriteRequest, error) { - req := &prompb.WriteRequest{ - Timeseries: make([]prompb.TimeSeries, len(redReq.Timeseries)), - // TODO handle metadata? - } - - for i, rts := range redReq.Timeseries { - writev2.DesymbolizeLabels(rts.LabelsRefs, redReq.Symbols).Range(func(l labels.Label) { - req.Timeseries[i].Labels = append(req.Timeseries[i].Labels, prompb.Label{ - Name: l.Name, - Value: l.Value, - }) - }) - - exemplars := make([]prompb.Exemplar, len(rts.Exemplars)) - for j, e := range rts.Exemplars { - exemplars[j].Value = e.Value - exemplars[j].Timestamp = e.Timestamp - writev2.DesymbolizeLabels(e.LabelsRefs, redReq.Symbols).Range(func(l labels.Label) { - exemplars[j].Labels = append(exemplars[j].Labels, prompb.Label{ - Name: l.Name, - Value: l.Value, - }) - }) - } - req.Timeseries[i].Exemplars = exemplars - - req.Timeseries[i].Samples = make([]prompb.Sample, len(rts.Samples)) - for j, s := range rts.Samples { - req.Timeseries[i].Samples[j].Timestamp = s.Timestamp - req.Timeseries[i].Samples[j].Value = s.Value - } - - req.Timeseries[i].Histograms = make([]prompb.Histogram, len(rts.Histograms)) - for j, h := range rts.Histograms { - // TODO: double check - if h.IsFloatHistogram() { - req.Timeseries[i].Histograms[j].Count = &prompb.Histogram_CountFloat{CountFloat: h.GetCountFloat()} - req.Timeseries[i].Histograms[j].ZeroCount = &prompb.Histogram_ZeroCountFloat{ZeroCountFloat: h.GetZeroCountFloat()} - } else { - req.Timeseries[i].Histograms[j].Count = &prompb.Histogram_CountInt{CountInt: h.GetCountInt()} - req.Timeseries[i].Histograms[j].ZeroCount = &prompb.Histogram_ZeroCountInt{ZeroCountInt: h.GetZeroCountInt()} - } - - for _, span := range h.NegativeSpans { - req.Timeseries[i].Histograms[j].NegativeSpans = append(req.Timeseries[i].Histograms[j].NegativeSpans, prompb.BucketSpan{ - Offset: span.Offset, - Length: span.Length, - }) - } - for _, span := range h.PositiveSpans { - req.Timeseries[i].Histograms[j].PositiveSpans = append(req.Timeseries[i].Histograms[j].PositiveSpans, prompb.BucketSpan{ - Offset: span.Offset, - Length: span.Length, - }) - } - - req.Timeseries[i].Histograms[j].Sum = h.Sum - req.Timeseries[i].Histograms[j].Schema = h.Schema - req.Timeseries[i].Histograms[j].ZeroThreshold = h.ZeroThreshold - req.Timeseries[i].Histograms[j].NegativeDeltas = h.NegativeDeltas - req.Timeseries[i].Histograms[j].NegativeCounts = h.NegativeCounts - req.Timeseries[i].Histograms[j].PositiveDeltas = h.PositiveDeltas - req.Timeseries[i].Histograms[j].PositiveCounts = h.PositiveCounts - req.Timeseries[i].Histograms[j].ResetHint = prompb.Histogram_ResetHint(h.ResetHint) - req.Timeseries[i].Histograms[j].Timestamp = h.Timestamp - } - } - return req, nil -} diff --git a/storage/remote/codec_test.go b/storage/remote/codec_test.go index 3eed28320e..96c6454ecc 100644 --- a/storage/remote/codec_test.go +++ b/storage/remote/codec_test.go @@ -21,7 +21,6 @@ import ( "github.com/go-kit/log" "github.com/gogo/protobuf/proto" - "github.com/prometheus/common/model" "github.com/stretchr/testify/require" "github.com/prometheus/prometheus/model/histogram" @@ -59,7 +58,7 @@ var writeRequestFixture = &prompb.WriteRequest{ }, Samples: []prompb.Sample{{Value: 1, Timestamp: 0}}, Exemplars: []prompb.Exemplar{{Labels: []prompb.Label{{Name: "f", Value: "g"}}, Value: 1, Timestamp: 0}}, - Histograms: []prompb.Histogram{HistogramToHistogramProto(0, &testHistogram), FloatHistogramToHistogramProto(1, testHistogram.ToFloat(nil))}, + Histograms: []prompb.Histogram{prompb.FromIntHistogram(0, &testHistogram), prompb.FromFloatHistogram(1, testHistogram.ToFloat(nil))}, }, { Labels: []prompb.Label{ @@ -71,7 +70,7 @@ var writeRequestFixture = &prompb.WriteRequest{ }, Samples: []prompb.Sample{{Value: 2, Timestamp: 1}}, Exemplars: []prompb.Exemplar{{Labels: []prompb.Label{{Name: "h", Value: "i"}}, Value: 2, Timestamp: 1}}, - Histograms: []prompb.Histogram{HistogramToHistogramProto(2, &testHistogram), FloatHistogramToHistogramProto(3, testHistogram.ToFloat(nil))}, + Histograms: []prompb.Histogram{prompb.FromIntHistogram(2, &testHistogram), prompb.FromFloatHistogram(3, testHistogram.ToFloat(nil))}, }, }, } @@ -79,37 +78,23 @@ var writeRequestFixture = &prompb.WriteRequest{ // writeV2RequestFixture represents the same request as writeRequestFixture, but using the v2 representation. var writeV2RequestFixture = func() *writev2.Request { st := writev2.NewSymbolTable() - var labels []uint32 - for _, s := range []string{ - "__name__", "test_metric1", - "b", "c", - "baz", "qux", - "d", "e", - "foo", "bar", - } { - ref := st.Symbolize(s) - labels = append(labels, ref) - } - for _, s := range []string{ - "f", "g", // 10, 11 - "h", "i", // 12, 13 - } { - _ = st.Symbolize(s) - } - + b := labels.NewScratchBuilder(0) + labelRefs := st.SymbolizeLabels(writeRequestFixture.Timeseries[0].ToLabels(&b, nil), nil) + exemplar1LabelRefs := st.SymbolizeLabels(writeRequestFixture.Timeseries[0].Exemplars[0].ToExemplar(&b, nil).Labels, nil) + exemplar2LabelRefs := st.SymbolizeLabels(writeRequestFixture.Timeseries[0].Exemplars[0].ToExemplar(&b, nil).Labels, nil) return &writev2.Request{ Timeseries: []writev2.TimeSeries{ { - LabelsRefs: labels, + LabelsRefs: labelRefs, Samples: []writev2.Sample{{Value: 1, Timestamp: 0}}, - Exemplars: []writev2.Exemplar{{LabelsRefs: []uint32{10, 11}, Value: 1, Timestamp: 0}}, - Histograms: []writev2.Histogram{HistogramToV2HistogramProto(0, &testHistogram), FloatHistogramToV2HistogramProto(1, testHistogram.ToFloat(nil))}, + Exemplars: []writev2.Exemplar{{LabelsRefs: exemplar1LabelRefs, Value: 1, Timestamp: 0}}, + Histograms: []writev2.Histogram{writev2.FromIntHistogram(0, &testHistogram), writev2.FromFloatHistogram(1, testHistogram.ToFloat(nil))}, }, { - LabelsRefs: labels, + LabelsRefs: labelRefs, Samples: []writev2.Sample{{Value: 2, Timestamp: 1}}, - Exemplars: []writev2.Exemplar{{LabelsRefs: []uint32{12, 13}, Value: 2, Timestamp: 1}}, - Histograms: []writev2.Histogram{HistogramToV2HistogramProto(2, &testHistogram), FloatHistogramToV2HistogramProto(3, testHistogram.ToFloat(nil))}, + Exemplars: []writev2.Exemplar{{LabelsRefs: exemplar2LabelRefs, Value: 2, Timestamp: 1}}, + Histograms: []writev2.Histogram{writev2.FromIntHistogram(2, &testHistogram), writev2.FromFloatHistogram(3, testHistogram.ToFloat(nil))}, }, }, Symbols: st.Symbols(), @@ -310,7 +295,7 @@ func TestConcreteSeriesIterator_HistogramSamples(t *testing.T) { } else { ts = int64(i) } - histProtos[i] = HistogramToHistogramProto(ts, h) + histProtos[i] = prompb.FromIntHistogram(ts, h) } series := &concreteSeries{ labels: labels.FromStrings("foo", "bar"), @@ -361,9 +346,9 @@ func TestConcreteSeriesIterator_FloatAndHistogramSamples(t *testing.T) { histProtos := make([]prompb.Histogram, len(histograms)) for i, h := range histograms { if i < 10 { - histProtos[i] = HistogramToHistogramProto(int64(i+1), h) + histProtos[i] = prompb.FromIntHistogram(int64(i+1), h) } else { - histProtos[i] = HistogramToHistogramProto(int64(i+6), h) + histProtos[i] = prompb.FromIntHistogram(int64(i+6), h) } } series := &concreteSeries{ @@ -443,7 +428,7 @@ func TestConcreteSeriesIterator_FloatAndHistogramSamples(t *testing.T) { require.Equal(t, chunkenc.ValHistogram, it.Next()) ts, fh = it.AtFloatHistogram(nil) require.Equal(t, int64(17), ts) - expected := HistogramProtoToFloatHistogram(HistogramToHistogramProto(int64(17), histograms[11])) + expected := prompb.FromIntHistogram(int64(17), histograms[11]).ToFloatHistogram() require.Equal(t, expected, fh) // Keep calling Next() until the end. @@ -527,37 +512,6 @@ func TestMergeLabels(t *testing.T) { } } -func TestMetricTypeToMetricTypeProto(t *testing.T) { - tc := []struct { - desc string - input model.MetricType - expected prompb.MetricMetadata_MetricType - }{ - { - desc: "with a single-word metric", - input: model.MetricTypeCounter, - expected: prompb.MetricMetadata_COUNTER, - }, - { - desc: "with a two-word metric", - input: model.MetricTypeStateset, - expected: prompb.MetricMetadata_STATESET, - }, - { - desc: "with an unknown metric", - input: "not-known", - expected: prompb.MetricMetadata_UNKNOWN, - }, - } - - for _, tt := range tc { - t.Run(tt.desc, func(t *testing.T) { - m := metricTypeToMetricTypeProto(tt.input) - require.Equal(t, tt.expected, m) - }) - } -} - func TestDecodeWriteRequest(t *testing.T) { buf, _, _, err := buildWriteRequest(nil, writeRequestFixture.Timeseries, nil, nil, nil, nil, "snappy") require.NoError(t, err) @@ -576,212 +530,9 @@ func TestDecodeV2WriteRequest(t *testing.T) { require.Equal(t, writeV2RequestFixture, actual) } -func TestNilHistogramProto(t *testing.T) { - // This function will panic if it impromperly handles nil - // values, causing the test to fail. - HistogramProtoToHistogram(prompb.Histogram{}) - HistogramProtoToFloatHistogram(prompb.Histogram{}) -} - -func exampleHistogram() histogram.Histogram { - return histogram.Histogram{ - CounterResetHint: histogram.GaugeType, - Schema: 0, - Count: 19, - Sum: 2.7, - PositiveSpans: []histogram.Span{ - {Offset: 0, Length: 4}, - {Offset: 0, Length: 0}, - {Offset: 0, Length: 3}, - }, - PositiveBuckets: []int64{1, 2, -2, 1, -1, 0, 0}, - NegativeSpans: []histogram.Span{ - {Offset: 0, Length: 5}, - {Offset: 1, Length: 0}, - {Offset: 0, Length: 1}, - }, - NegativeBuckets: []int64{1, 2, -2, 1, -1, 0}, - } -} - -func exampleHistogramProto() prompb.Histogram { - return prompb.Histogram{ - Count: &prompb.Histogram_CountInt{CountInt: 19}, - Sum: 2.7, - Schema: 0, - ZeroThreshold: 0, - ZeroCount: &prompb.Histogram_ZeroCountInt{ZeroCountInt: 0}, - NegativeSpans: []prompb.BucketSpan{ - { - Offset: 0, - Length: 5, - }, - { - Offset: 1, - Length: 0, - }, - { - Offset: 0, - Length: 1, - }, - }, - NegativeDeltas: []int64{1, 2, -2, 1, -1, 0}, - PositiveSpans: []prompb.BucketSpan{ - { - Offset: 0, - Length: 4, - }, - { - Offset: 0, - Length: 0, - }, - { - Offset: 0, - Length: 3, - }, - }, - PositiveDeltas: []int64{1, 2, -2, 1, -1, 0, 0}, - ResetHint: prompb.Histogram_GAUGE, - Timestamp: 1337, - } -} - -func TestHistogramToProtoConvert(t *testing.T) { - tests := []struct { - input histogram.CounterResetHint - expected prompb.Histogram_ResetHint - }{ - { - input: histogram.UnknownCounterReset, - expected: prompb.Histogram_UNKNOWN, - }, - { - input: histogram.CounterReset, - expected: prompb.Histogram_YES, - }, - { - input: histogram.NotCounterReset, - expected: prompb.Histogram_NO, - }, - { - input: histogram.GaugeType, - expected: prompb.Histogram_GAUGE, - }, - } - - for _, test := range tests { - h := exampleHistogram() - h.CounterResetHint = test.input - p := exampleHistogramProto() - p.ResetHint = test.expected - - require.Equal(t, p, HistogramToHistogramProto(1337, &h)) - - require.Equal(t, h, *HistogramProtoToHistogram(p)) - } -} - -func exampleFloatHistogram() histogram.FloatHistogram { - return histogram.FloatHistogram{ - CounterResetHint: histogram.GaugeType, - Schema: 0, - Count: 19, - Sum: 2.7, - PositiveSpans: []histogram.Span{ - {Offset: 0, Length: 4}, - {Offset: 0, Length: 0}, - {Offset: 0, Length: 3}, - }, - PositiveBuckets: []float64{1, 2, -2, 1, -1, 0, 0}, - NegativeSpans: []histogram.Span{ - {Offset: 0, Length: 5}, - {Offset: 1, Length: 0}, - {Offset: 0, Length: 1}, - }, - NegativeBuckets: []float64{1, 2, -2, 1, -1, 0}, - } -} - -func exampleFloatHistogramProto() prompb.Histogram { - return prompb.Histogram{ - Count: &prompb.Histogram_CountFloat{CountFloat: 19}, - Sum: 2.7, - Schema: 0, - ZeroThreshold: 0, - ZeroCount: &prompb.Histogram_ZeroCountFloat{ZeroCountFloat: 0}, - NegativeSpans: []prompb.BucketSpan{ - { - Offset: 0, - Length: 5, - }, - { - Offset: 1, - Length: 0, - }, - { - Offset: 0, - Length: 1, - }, - }, - NegativeCounts: []float64{1, 2, -2, 1, -1, 0}, - PositiveSpans: []prompb.BucketSpan{ - { - Offset: 0, - Length: 4, - }, - { - Offset: 0, - Length: 0, - }, - { - Offset: 0, - Length: 3, - }, - }, - PositiveCounts: []float64{1, 2, -2, 1, -1, 0, 0}, - ResetHint: prompb.Histogram_GAUGE, - Timestamp: 1337, - } -} - -func TestFloatHistogramToProtoConvert(t *testing.T) { - tests := []struct { - input histogram.CounterResetHint - expected prompb.Histogram_ResetHint - }{ - { - input: histogram.UnknownCounterReset, - expected: prompb.Histogram_UNKNOWN, - }, - { - input: histogram.CounterReset, - expected: prompb.Histogram_YES, - }, - { - input: histogram.NotCounterReset, - expected: prompb.Histogram_NO, - }, - { - input: histogram.GaugeType, - expected: prompb.Histogram_GAUGE, - }, - } - - for _, test := range tests { - h := exampleFloatHistogram() - h.CounterResetHint = test.input - p := exampleFloatHistogramProto() - p.ResetHint = test.expected - - require.Equal(t, p, FloatHistogramToHistogramProto(1337, &h)) - - require.Equal(t, h, *FloatHistogramProtoToFloatHistogram(p)) - } -} - func TestStreamResponse(t *testing.T) { - lbs1 := LabelsToLabelsProto(labels.FromStrings("instance", "localhost1", "job", "demo1"), nil) - lbs2 := LabelsToLabelsProto(labels.FromStrings("instance", "localhost2", "job", "demo2"), nil) + lbs1 := prompb.FromLabels(labels.FromStrings("instance", "localhost1", "job", "demo1"), nil) + lbs2 := prompb.FromLabels(labels.FromStrings("instance", "localhost2", "job", "demo2"), nil) chunk := prompb.Chunk{ Type: prompb.Chunk_XOR, Data: make([]byte, 100), @@ -853,7 +604,7 @@ func (c *mockChunkSeriesSet) Next() bool { func (c *mockChunkSeriesSet) At() storage.ChunkSeries { return &storage.ChunkSeriesEntry{ - Lset: LabelProtosToLabels(&c.builder, c.chunkedSeries[c.index].Labels), + Lset: c.chunkedSeries[c.index].ToLabels(&c.builder, nil), ChunkIteratorFn: func(chunks.Iterator) chunks.Iterator { return &mockChunkIterator{ chunks: c.chunkedSeries[c.index].Chunks, diff --git a/storage/remote/queue_manager.go b/storage/remote/queue_manager.go index 475b6fcd77..72006d1526 100644 --- a/storage/remote/queue_manager.go +++ b/storage/remote/queue_manager.go @@ -553,7 +553,7 @@ func (t *QueueManager) AppendWatcherMetadata(ctx context.Context, metadata []scr mm = append(mm, prompb.MetricMetadata{ MetricFamilyName: entry.Metric, Help: entry.Help, - Type: metricTypeToMetricTypeProto(entry.Type), + Type: prompb.FromMetadataType(entry.Type), Unit: entry.Unit, }) } @@ -1628,7 +1628,8 @@ func populateTimeSeries(batch []timeSeries, pendingData []prompb.TimeSeries, sen // Number of pending samples is limited by the fact that sendSamples (via sendSamplesWithBackoff) // retries endlessly, so once we reach max samples, if we can never send to the endpoint we'll // stop reading from the queue. This makes it safe to reference pendingSamples by index. - pendingData[nPending].Labels = LabelsToLabelsProto(d.seriesLabels, pendingData[nPending].Labels) + pendingData[nPending].Labels = prompb.FromLabels(d.seriesLabels, pendingData[nPending].Labels) + switch d.sType { case tSample: pendingData[nPending].Samples = append(pendingData[nPending].Samples, prompb.Sample{ @@ -1638,16 +1639,16 @@ func populateTimeSeries(batch []timeSeries, pendingData []prompb.TimeSeries, sen nPendingSamples++ case tExemplar: pendingData[nPending].Exemplars = append(pendingData[nPending].Exemplars, prompb.Exemplar{ - Labels: LabelsToLabelsProto(d.exemplarLabels, nil), + Labels: prompb.FromLabels(d.exemplarLabels, nil), Value: d.value, Timestamp: d.timestamp, }) nPendingExemplars++ case tHistogram: - pendingData[nPending].Histograms = append(pendingData[nPending].Histograms, HistogramToHistogramProto(d.timestamp, d.histogram)) + pendingData[nPending].Histograms = append(pendingData[nPending].Histograms, prompb.FromIntHistogram(d.timestamp, d.histogram)) nPendingHistograms++ case tFloatHistogram: - pendingData[nPending].Histograms = append(pendingData[nPending].Histograms, FloatHistogramToHistogramProto(d.timestamp, d.floatHistogram)) + pendingData[nPending].Histograms = append(pendingData[nPending].Histograms, prompb.FromFloatHistogram(d.timestamp, d.floatHistogram)) nPendingHistograms++ } } @@ -1879,7 +1880,7 @@ func populateV2TimeSeries(symbolTable *writev2.SymbolsTable, batch []timeSeries, pendingData[nPending].Samples = pendingData[nPending].Samples[:0] // todo: should we also safeguard against empty metadata here? if d.metadata != nil { - pendingData[nPending].Metadata.Type = metricTypeToMetricTypeProtoV2(d.metadata.Type) + pendingData[nPending].Metadata.Type = writev2.FromMetadataType(d.metadata.Type) pendingData[nPending].Metadata.HelpRef = symbolTable.Symbolize(d.metadata.Help) pendingData[nPending].Metadata.HelpRef = symbolTable.Symbolize(d.metadata.Unit) nPendingMetadata++ @@ -1911,10 +1912,10 @@ func populateV2TimeSeries(symbolTable *writev2.SymbolsTable, batch []timeSeries, }) nPendingExemplars++ case tHistogram: - pendingData[nPending].Histograms = append(pendingData[nPending].Histograms, HistogramToV2HistogramProto(d.timestamp, d.histogram)) + pendingData[nPending].Histograms = append(pendingData[nPending].Histograms, writev2.FromIntHistogram(d.timestamp, d.histogram)) nPendingHistograms++ case tFloatHistogram: - pendingData[nPending].Histograms = append(pendingData[nPending].Histograms, FloatHistogramToV2HistogramProto(d.timestamp, d.floatHistogram)) + pendingData[nPending].Histograms = append(pendingData[nPending].Histograms, writev2.FromFloatHistogram(d.timestamp, d.floatHistogram)) nPendingHistograms++ case tMetadata: // TODO: log or return an error? diff --git a/storage/remote/queue_manager_test.go b/storage/remote/queue_manager_test.go index 4685a2405e..1ccf8a3aed 100644 --- a/storage/remote/queue_manager_test.go +++ b/storage/remote/queue_manager_test.go @@ -1023,7 +1023,7 @@ func (c *TestWriteClient) expectExemplars(ss []record.RefExemplar, series []reco for _, s := range ss { tsID := getSeriesIDFromRef(series[s.Ref]) e := prompb.Exemplar{ - Labels: LabelsToLabelsProto(s.Labels, nil), + Labels: prompb.FromLabels(s.Labels, nil), Timestamp: s.T, Value: s.V, } @@ -1040,7 +1040,7 @@ func (c *TestWriteClient) expectHistograms(hh []record.RefHistogramSample, serie for _, h := range hh { tsID := getSeriesIDFromRef(series[h.Ref]) - c.expectedHistograms[tsID] = append(c.expectedHistograms[tsID], HistogramToHistogramProto(h.T, h.H)) + c.expectedHistograms[tsID] = append(c.expectedHistograms[tsID], prompb.FromIntHistogram(h.T, h.H)) } } @@ -1053,7 +1053,7 @@ func (c *TestWriteClient) expectFloatHistograms(fhs []record.RefFloatHistogramSa for _, fh := range fhs { tsID := getSeriesIDFromRef(series[fh.Ref]) - c.expectedFloatHistograms[tsID] = append(c.expectedFloatHistograms[tsID], FloatHistogramToHistogramProto(fh.T, fh.FH)) + c.expectedFloatHistograms[tsID] = append(c.expectedFloatHistograms[tsID], prompb.FromFloatHistogram(fh.T, fh.FH)) } } @@ -1155,7 +1155,7 @@ func (c *TestWriteClient) Store(_ context.Context, req []byte, _ int) error { var reqProtoV2 writev2.Request err = proto.Unmarshal(reqBuf, &reqProtoV2) if err == nil { - reqProto, err = V2WriteRequestToWriteRequest(&reqProtoV2) + reqProto, err = v2RequestToWriteRequest(&reqProtoV2) } } if err != nil { @@ -1166,9 +1166,9 @@ func (c *TestWriteClient) Store(_ context.Context, req []byte, _ int) error { return errors.New("invalid request, no timeseries") } - builder := labels.NewScratchBuilder(0) + b := labels.NewScratchBuilder(0) for _, ts := range reqProto.Timeseries { - labels := LabelProtosToLabels(&builder, ts.Labels) + labels := ts.ToLabels(&b, nil) tsID := labels.String() if len(ts.Samples) > 0 { c.receivedSamples[tsID] = append(c.receivedSamples[tsID], ts.Samples...) @@ -1202,6 +1202,51 @@ func (c *TestWriteClient) Endpoint() string { return "http://test-remote.com/1234" } +func v2RequestToWriteRequest(v2Req *writev2.Request) (*prompb.WriteRequest, error) { + req := &prompb.WriteRequest{ + Timeseries: make([]prompb.TimeSeries, len(v2Req.Timeseries)), + // TODO handle metadata? + } + b := labels.NewScratchBuilder(0) + for i, rts := range v2Req.Timeseries { + rts.ToLabels(&b, v2Req.Symbols).Range(func(l labels.Label) { + req.Timeseries[i].Labels = append(req.Timeseries[i].Labels, prompb.Label{ + Name: l.Name, + Value: l.Value, + }) + }) + + exemplars := make([]prompb.Exemplar, len(rts.Exemplars)) + for j, e := range rts.Exemplars { + exemplars[j].Value = e.Value + exemplars[j].Timestamp = e.Timestamp + e.ToExemplar(&b, v2Req.Symbols).Labels.Range(func(l labels.Label) { + exemplars[j].Labels = append(exemplars[j].Labels, prompb.Label{ + Name: l.Name, + Value: l.Value, + }) + }) + } + req.Timeseries[i].Exemplars = exemplars + + req.Timeseries[i].Samples = make([]prompb.Sample, len(rts.Samples)) + for j, s := range rts.Samples { + req.Timeseries[i].Samples[j].Timestamp = s.Timestamp + req.Timeseries[i].Samples[j].Value = s.Value + } + + req.Timeseries[i].Histograms = make([]prompb.Histogram, len(rts.Histograms)) + for j, h := range rts.Histograms { + if h.IsFloatHistogram() { + req.Timeseries[i].Histograms[j] = prompb.FromFloatHistogram(h.Timestamp, h.ToFloatHistogram()) + continue + } + req.Timeseries[i].Histograms[j] = prompb.FromIntHistogram(h.Timestamp, h.ToIntHistogram()) + } + } + return req, nil +} + // TestBlockingWriteClient is a queue_manager WriteClient which will block // on any calls to Store(), until the request's Context is cancelled, at which // point the `numCalls` property will contain a count of how many times Store() diff --git a/storage/remote/read_handler_test.go b/storage/remote/read_handler_test.go index 452b292210..a681872687 100644 --- a/storage/remote/read_handler_test.go +++ b/storage/remote/read_handler_test.go @@ -124,7 +124,7 @@ func TestSampledReadEndpoint(t *testing.T) { {Name: "d", Value: "e"}, }, Histograms: []prompb.Histogram{ - FloatHistogramToHistogramProto(0, tsdbutil.GenerateTestFloatHistogram(0)), + prompb.FromFloatHistogram(0, tsdbutil.GenerateTestFloatHistogram(0)), }, }, }, diff --git a/storage/remote/read_test.go b/storage/remote/read_test.go index 9415a80fd5..e57a932263 100644 --- a/storage/remote/read_test.go +++ b/storage/remote/read_test.go @@ -173,12 +173,12 @@ func TestSeriesSetFilter(t *testing.T) { toRemove: []string{"foo"}, in: &prompb.QueryResult{ Timeseries: []*prompb.TimeSeries{ - {Labels: LabelsToLabelsProto(labels.FromStrings("foo", "bar", "a", "b"), nil)}, + {Labels: prompb.FromLabels(labels.FromStrings("foo", "bar", "a", "b"), nil)}, }, }, expected: &prompb.QueryResult{ Timeseries: []*prompb.TimeSeries{ - {Labels: LabelsToLabelsProto(labels.FromStrings("a", "b"), nil)}, + {Labels: prompb.FromLabels(labels.FromStrings("a", "b"), nil)}, }, }, }, @@ -212,7 +212,7 @@ func (c *mockedRemoteClient) Read(_ context.Context, query *prompb.Query) (*prom q := &prompb.QueryResult{} for _, s := range c.store { - l := LabelProtosToLabels(&c.b, s.Labels) + l := s.ToLabels(&c.b, nil) var notMatch bool for _, m := range matchers { diff --git a/storage/remote/write_handler.go b/storage/remote/write_handler.go index a1447d26ea..9997811ab0 100644 --- a/storage/remote/write_handler.go +++ b/storage/remote/write_handler.go @@ -227,7 +227,7 @@ func (h *writeHandler) write(ctx context.Context, req *prompb.WriteRequest) (err b := labels.NewScratchBuilder(0) for _, ts := range req.Timeseries { - ls := LabelProtosToLabels(&b, ts.Labels) + ls := ts.ToLabels(&b, nil) if !ls.IsValid() { level.Warn(h.logger).Log("msg", "Invalid metric names or labels", "got", ls.String()) samplesWithInvalidLabels++ @@ -240,7 +240,7 @@ func (h *writeHandler) write(ctx context.Context, req *prompb.WriteRequest) (err } for _, ep := range ts.Exemplars { - e := exemplarProtoToExemplar(&b, ep) + e := ep.ToExemplar(&b, nil) h.appendExemplar(timeLimitApp, e, ls, &outOfOrderExemplarErrs) } @@ -276,8 +276,9 @@ func (h *writeHandler) writeV2(ctx context.Context, req *writev2.Request) (err e err = timeLimitApp.Commit() }() + b := labels.NewScratchBuilder(0) for _, ts := range req.Timeseries { - ls := writev2.DesymbolizeLabels(ts.LabelsRefs, req.Symbols) + ls := ts.ToLabels(&b, req.Symbols) err := h.appendSamplesV2(timeLimitApp, ts.Samples, ls) if err != nil { @@ -285,7 +286,7 @@ func (h *writeHandler) writeV2(ctx context.Context, req *writev2.Request) (err e } for _, ep := range ts.Exemplars { - e := exemplarProtoV2ToExemplar(ep, req.Symbols) + e := ep.ToExemplar(&b, req.Symbols) h.appendExemplar(timeLimitApp, e, ls, &outOfOrderExemplarErrs) } @@ -294,7 +295,7 @@ func (h *writeHandler) writeV2(ctx context.Context, req *writev2.Request) (err e return err } - m := metadataProtoV2ToMetadata(ts.Metadata, req.Symbols) + m := ts.ToMetadata(req.Symbols) if _, err = timeLimitApp.UpdateMetadata(0, ls, m); err != nil { level.Debug(h.logger).Log("msg", "error while updating metadata from remote write", "err", err) } @@ -358,11 +359,9 @@ func (h *writeHandler) appendHistograms(app storage.Appender, hh []prompb.Histog var err error for _, hp := range hh { if hp.IsFloatHistogram() { - fhs := FloatHistogramProtoToFloatHistogram(hp) - _, err = app.AppendHistogram(0, labels, hp.Timestamp, nil, fhs) + _, err = app.AppendHistogram(0, labels, hp.Timestamp, nil, hp.ToFloatHistogram()) } else { - hs := HistogramProtoToHistogram(hp) - _, err = app.AppendHistogram(0, labels, hp.Timestamp, hs, nil) + _, err = app.AppendHistogram(0, labels, hp.Timestamp, hp.ToIntHistogram(), nil) } if err != nil { unwrappedErr := errors.Unwrap(err) @@ -384,11 +383,9 @@ func (h *writeHandler) appendHistogramsV2(app storage.Appender, hh []writev2.His var err error for _, hp := range hh { if hp.IsFloatHistogram() { - fhs := FloatV2HistogramProtoToFloatHistogram(hp) - _, err = app.AppendHistogram(0, labels, hp.Timestamp, nil, fhs) + _, err = app.AppendHistogram(0, labels, hp.Timestamp, nil, hp.ToFloatHistogram()) } else { - hs := V2HistogramProtoToHistogram(hp) - _, err = app.AppendHistogram(0, labels, hp.Timestamp, hs, nil) + _, err = app.AppendHistogram(0, labels, hp.Timestamp, hp.ToIntHistogram(), nil) } if err != nil { unwrappedErr := errors.Unwrap(err) diff --git a/storage/remote/write_handler_test.go b/storage/remote/write_handler_test.go index f01b758455..7d89fd009a 100644 --- a/storage/remote/write_handler_test.go +++ b/storage/remote/write_handler_test.go @@ -267,24 +267,22 @@ func TestRemoteWriteHandler_V1Message(t *testing.T) { j := 0 k := 0 for _, ts := range writeRequestFixture.Timeseries { - labels := LabelProtosToLabels(&b, ts.Labels) + labels := ts.ToLabels(&b, nil) for _, s := range ts.Samples { requireEqual(t, mockSample{labels, s.Timestamp, s.Value}, appendable.samples[i]) i++ } - for _, e := range ts.Exemplars { - exemplarLabels := LabelProtosToLabels(&b, e.Labels) + exemplarLabels := e.ToExemplar(&b, nil).Labels requireEqual(t, mockExemplar{labels, exemplarLabels, e.Timestamp, e.Value}, appendable.exemplars[j]) j++ } - for _, hp := range ts.Histograms { if hp.IsFloatHistogram() { - fh := FloatHistogramProtoToFloatHistogram(hp) + fh := hp.ToFloatHistogram() requireEqual(t, mockHistogram{labels, hp.Timestamp, nil, fh}, appendable.histograms[k]) } else { - h := HistogramProtoToHistogram(hp) + h := hp.ToIntHistogram() requireEqual(t, mockHistogram{labels, hp.Timestamp, h, nil}, appendable.histograms[k]) } @@ -313,33 +311,29 @@ func TestRemoteWriteHandler_V2Message(t *testing.T) { resp := recorder.Result() require.Equal(t, http.StatusNoContent, resp.StatusCode) + b := labels.NewScratchBuilder(0) i := 0 j := 0 k := 0 - // the reduced write request is equivalent to the write request fixture. - // we can use it for for _, ts := range writeV2RequestFixture.Timeseries { - ls := writev2.DesymbolizeLabels(ts.LabelsRefs, writeV2RequestFixture.Symbols) + ls := ts.ToLabels(&b, writeV2RequestFixture.Symbols) for _, s := range ts.Samples { - require.Equal(t, mockSample{ls, s.Timestamp, s.Value}, appendable.samples[i]) + requireEqual(t, mockSample{ls, s.Timestamp, s.Value}, appendable.samples[i]) i++ } - for _, e := range ts.Exemplars { - exemplarLabels := writev2.DesymbolizeLabels(e.LabelsRefs, writeV2RequestFixture.Symbols) - require.Equal(t, mockExemplar{ls, exemplarLabels, e.Timestamp, e.Value}, appendable.exemplars[j]) + exemplarLabels := e.ToExemplar(&b, writeV2RequestFixture.Symbols).Labels + requireEqual(t, mockExemplar{ls, exemplarLabels, e.Timestamp, e.Value}, appendable.exemplars[j]) j++ } - for _, hp := range ts.Histograms { if hp.IsFloatHistogram() { - fh := FloatHistogramProtoV2ToFloatHistogram(hp) - require.Equal(t, mockHistogram{ls, hp.Timestamp, nil, fh}, appendable.histograms[k]) + fh := hp.ToFloatHistogram() + requireEqual(t, mockHistogram{ls, hp.Timestamp, nil, fh}, appendable.histograms[k]) } else { - h := HistogramProtoV2ToHistogram(hp) - require.Equal(t, mockHistogram{ls, hp.Timestamp, h, nil}, appendable.histograms[k]) + h := hp.ToIntHistogram() + requireEqual(t, mockHistogram{ls, hp.Timestamp, h, nil}, appendable.histograms[k]) } - k++ } @@ -348,7 +342,7 @@ func TestRemoteWriteHandler_V2Message(t *testing.T) { } func TestOutOfOrderSample_V1Message(t *testing.T) { - tests := []struct { + for _, tc := range []struct { Name string Timestamp int64 }{ @@ -360,9 +354,7 @@ func TestOutOfOrderSample_V1Message(t *testing.T) { Name: "future", Timestamp: math.MaxInt64, }, - } - - for _, tc := range tests { + } { t.Run(tc.Name, func(t *testing.T) { payload, _, _, err := buildWriteRequest(nil, []prompb.TimeSeries{{ Labels: []prompb.Label{{Name: "__name__", Value: "test_metric"}}, @@ -386,7 +378,7 @@ func TestOutOfOrderSample_V1Message(t *testing.T) { } func TestOutOfOrderSample_V2Message(t *testing.T) { - tests := []struct { + for _, tc := range []struct { Name string Timestamp int64 }{ @@ -398,9 +390,7 @@ func TestOutOfOrderSample_V2Message(t *testing.T) { Name: "future", Timestamp: math.MaxInt64, }, - } - - for _, tc := range tests { + } { t.Run(tc.Name, func(t *testing.T) { payload, _, _, err := buildV2WriteRequest(nil, []writev2.TimeSeries{{ LabelsRefs: []uint32{0, 1}, @@ -513,7 +503,7 @@ func TestOutOfOrderExemplar_V2Message(t *testing.T) { } func TestOutOfOrderHistogram_V1Message(t *testing.T) { - tests := []struct { + for _, tc := range []struct { Name string Timestamp int64 }{ @@ -525,13 +515,11 @@ func TestOutOfOrderHistogram_V1Message(t *testing.T) { Name: "future", Timestamp: math.MaxInt64, }, - } - - for _, tc := range tests { + } { t.Run(tc.Name, func(t *testing.T) { payload, _, _, err := buildWriteRequest(nil, []prompb.TimeSeries{{ Labels: []prompb.Label{{Name: "__name__", Value: "test_metric"}}, - Histograms: []prompb.Histogram{HistogramToHistogramProto(tc.Timestamp, &testHistogram), FloatHistogramToHistogramProto(1, testHistogram.ToFloat(nil))}, + Histograms: []prompb.Histogram{prompb.FromIntHistogram(tc.Timestamp, &testHistogram), prompb.FromFloatHistogram(1, testHistogram.ToFloat(nil))}, }}, nil, nil, nil, nil, "snappy") require.NoError(t, err) @@ -551,27 +539,43 @@ func TestOutOfOrderHistogram_V1Message(t *testing.T) { } func TestOutOfOrderHistogram_V2Message(t *testing.T) { - payload, _, _, err := buildV2WriteRequest(nil, []writev2.TimeSeries{{ - LabelsRefs: []uint32{0, 1}, - Histograms: []writev2.Histogram{HistogramToV2HistogramProto(0, &testHistogram), FloatHistogramToV2HistogramProto(1, testHistogram.ToFloat(nil))}, - }}, []string{"__name__", "metric1"}, nil, nil, nil, "snappy") // TODO(bwplotka): No empty string! - require.NoError(t, err) + for _, tc := range []struct { + Name string + Timestamp int64 + }{ + { + Name: "historic", + Timestamp: 0, + }, + { + Name: "future", + Timestamp: math.MaxInt64, + }, + } { + t.Run(tc.Name, func(t *testing.T) { + payload, _, _, err := buildV2WriteRequest(nil, []writev2.TimeSeries{{ + LabelsRefs: []uint32{0, 1}, + Histograms: []writev2.Histogram{writev2.FromIntHistogram(0, &testHistogram), writev2.FromFloatHistogram(1, testHistogram.ToFloat(nil))}, + }}, []string{"__name__", "metric1"}, nil, nil, nil, "snappy") + require.NoError(t, err) - req, err := http.NewRequest("", "", bytes.NewReader(payload)) - require.NoError(t, err) + req, err := http.NewRequest("", "", bytes.NewReader(payload)) + require.NoError(t, err) - req.Header.Set("Content-Type", remoteWriteContentTypeHeaders[config.RemoteWriteProtoMsgV2]) - req.Header.Set("Content-Encoding", string(SnappyBlockCompression)) - req.Header.Set(RemoteWriteVersionHeader, RemoteWriteVersion20HeaderValue) + req.Header.Set("Content-Type", remoteWriteContentTypeHeaders[config.RemoteWriteProtoMsgV2]) + req.Header.Set("Content-Encoding", string(SnappyBlockCompression)) + req.Header.Set(RemoteWriteVersionHeader, RemoteWriteVersion20HeaderValue) - appendable := &mockAppendable{latestHistogram: 100} - handler := NewWriteHandler(log.NewNopLogger(), nil, appendable, []config.RemoteWriteProtoMsg{config.RemoteWriteProtoMsgV2}) + appendable := &mockAppendable{latestHistogram: 100} + handler := NewWriteHandler(log.NewNopLogger(), nil, appendable, []config.RemoteWriteProtoMsg{config.RemoteWriteProtoMsgV2}) - recorder := httptest.NewRecorder() - handler.ServeHTTP(recorder, req) + recorder := httptest.NewRecorder() + handler.ServeHTTP(recorder, req) - resp := recorder.Result() - require.Equal(t, http.StatusBadRequest, resp.StatusCode) + resp := recorder.Result() + require.Equal(t, http.StatusBadRequest, resp.StatusCode) + }) + } } func BenchmarkRemoteWriteHandler(b *testing.B) { @@ -584,7 +588,7 @@ func BenchmarkRemoteWriteHandler(b *testing.B) { {Name: "__name__", Value: "test_metric"}, {Name: "test_label_name_" + num, Value: labelValue + num}, }, - Histograms: []prompb.Histogram{HistogramToHistogramProto(0, &testHistogram)}, + Histograms: []prompb.Histogram{prompb.FromIntHistogram(0, &testHistogram)}, }}, nil, nil, nil, nil, "snappy") require.NoError(b, err) req, err := http.NewRequest("", "", bytes.NewReader(buf))