This commit is contained in:
Junang Li 2024-09-03 12:18:53 -04:00
parent ba8397df61
commit 3e4fa98175
5 changed files with 289 additions and 9 deletions

View file

@ -26,12 +26,12 @@ import (
// NOTE(bwplotka): This file's code is tested in /prompb/rwcommon.
// ToLabels return model labels.Labels from timeseries' remote labels.
func (m TimeSeries) ToLabels(b *labels.ScratchBuilder, _ []string) labels.Labels {
func (m *TimeSeries) ToLabels(b *labels.ScratchBuilder, _ []string) labels.Labels {
return labelProtosToLabels(b, m.GetLabels())
}
// ToLabels return model labels.Labels from timeseries' remote labels.
func (m ChunkedSeries) ToLabels(b *labels.ScratchBuilder, _ []string) labels.Labels {
func (m *ChunkedSeries) ToLabels(b *labels.ScratchBuilder, _ []string) labels.Labels {
return labelProtosToLabels(b, m.GetLabels())
}

View file

@ -25,12 +25,19 @@ import (
// NOTE(bwplotka): This file's code is tested in /prompb/rwcommon.
// ToLabels return model labels.Labels from timeseries' remote labels.
func (m TimeSeries) ToLabels(b *labels.ScratchBuilder, symbols []string) labels.Labels {
func (m *TimeSeries) ToLabels(b *labels.ScratchBuilder, symbols []string) labels.Labels {
return desymbolizeLabels(b, m.GetLabelsRefs(), symbols)
}
// ToMetadata return model metadata from timeseries' remote metadata.
func (m TimeSeries) ToMetadata(symbols []string) metadata.Metadata {
func (m *TimeSeries) ToMetadata(symbols []string) metadata.Metadata {
if m.Metadata == nil {
return metadata.Metadata{
Type: model.MetricTypeUnknown,
Unit: "",
Help: "",
}
}
typ := model.MetricTypeUnknown
switch m.Metadata.Type {
case Metadata_METRIC_TYPE_COUNTER:
@ -79,14 +86,14 @@ func FromMetadataType(t model.MetricType) Metadata_MetricType {
}
// IsFloatHistogram returns true if the histogram is float.
func (h Histogram) IsFloatHistogram() bool {
func (h *Histogram) IsFloatHistogram() bool {
_, ok := h.GetCount().(*Histogram_CountFloat)
return ok
}
// ToIntHistogram returns integer Prometheus histogram from the remote implementation
// of integer histogram. If it's a float histogram, the method returns nil.
func (h Histogram) ToIntHistogram() *histogram.Histogram {
func (h *Histogram) ToIntHistogram() *histogram.Histogram {
if h.IsFloatHistogram() {
return nil
}
@ -108,7 +115,7 @@ func (h Histogram) ToIntHistogram() *histogram.Histogram {
// ToFloatHistogram returns float Prometheus histogram from the remote implementation
// of float histogram. If the underlying implementation is an integer histogram, a
// conversion is performed.
func (h Histogram) ToFloatHistogram() *histogram.FloatHistogram {
func (h *Histogram) ToFloatHistogram() *histogram.FloatHistogram {
if h.IsFloatHistogram() {
return &histogram.FloatHistogram{
CounterResetHint: histogram.CounterResetHint(h.ResetHint),
@ -204,7 +211,7 @@ func spansToSpansProto(s []histogram.Span) []*BucketSpan {
return spans
}
func (m Exemplar) ToExemplar(b *labels.ScratchBuilder, symbols []string) exemplar.Exemplar {
func (m *Exemplar) ToExemplar(b *labels.ScratchBuilder, symbols []string) exemplar.Exemplar {
timestamp := m.Timestamp
return exemplar.Exemplar{

View file

@ -0,0 +1,168 @@
// Copyright 2024 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package writev2
import (
math_bits "math/bits"
"slices"
)
func (m Sample) T() int64 { return m.Timestamp }
func (m Sample) V() float64 { return m.Value }
func (m *Request) OptimizedMarshal(dst []byte) ([]byte, error) {
siz := m.SizeVT()
if cap(dst) < siz {
dst = make([]byte, siz)
}
n, err := m.OptimizedMarshalToSizedBuffer(dst[:siz])
if err != nil {
return nil, err
}
return dst[:n], nil
}
// OptimizedMarshalToSizedBuffer is mostly a copy of the generated MarshalToSizedBuffer,
// but calls OptimizedMarshalToSizedBuffer on the timeseries.
func (m *Request) OptimizedMarshalToSizedBuffer(dAtA []byte) (int, error) {
i := len(dAtA)
_ = i
var l int
_ = l
// Removed XXX_unrecognized handling
if len(m.Timeseries) > 0 {
for iNdEx := len(m.Timeseries) - 1; iNdEx >= 0; iNdEx-- {
{
size, err := m.Timeseries[iNdEx].OptimizedMarshalToSizedBuffer(dAtA[:i])
if err != nil {
return 0, err
}
i -= size
i = encodeVarintTypes(dAtA, i, uint64(size))
}
i--
dAtA[i] = 0x2a
}
}
if len(m.Symbols) > 0 {
for iNdEx := len(m.Symbols) - 1; iNdEx >= 0; iNdEx-- {
i -= len(m.Symbols[iNdEx])
copy(dAtA[i:], m.Symbols[iNdEx])
i = encodeVarintTypes(dAtA, i, uint64(len(m.Symbols[iNdEx])))
i--
dAtA[i] = 0x22
}
}
return len(dAtA) - i, nil
}
// OptimizedMarshalToSizedBuffer is mostly a copy of the generated MarshalToSizedBuffer,
// but marshals m.LabelsRefs in place without extra allocations.
func (m *TimeSeries) OptimizedMarshalToSizedBuffer(dAtA []byte) (int, error) {
i := len(dAtA)
if m.CreatedTimestamp != 0 {
i = encodeVarintTypes(dAtA, i, uint64(m.CreatedTimestamp))
i--
dAtA[i] = 0x30
}
{
size, err := m.Metadata.MarshalToVT(dAtA[:i]) // Adjusted for vtproto
if err != nil {
return 0, err
}
i -= size
i = encodeVarintTypes(dAtA, i, uint64(size))
}
i--
dAtA[i] = 0x2a
if len(m.Histograms) > 0 {
for iNdEx := len(m.Histograms) - 1; iNdEx >= 0; iNdEx-- {
{
size, err := m.Histograms[iNdEx].MarshalToVT(dAtA[:i]) // Adjusted for vtproto
if err != nil {
return 0, err
}
i -= size
i = encodeVarintTypes(dAtA, i, uint64(size))
}
i--
dAtA[i] = 0x1a
}
}
if len(m.Exemplars) > 0 {
for iNdEx := len(m.Exemplars) - 1; iNdEx >= 0; iNdEx-- {
{
size, err := m.Exemplars[iNdEx].MarshalToVT(dAtA[:i]) // Adjusted for vtproto
if err != nil {
return 0, err
}
i -= size
i = encodeVarintTypes(dAtA, i, uint64(size))
}
i--
dAtA[i] = 0x22
}
}
if len(m.Samples) > 0 {
for iNdEx := len(m.Samples) - 1; iNdEx >= 0; iNdEx-- {
{
size, err := m.Samples[iNdEx].MarshalToVT(dAtA[:i]) // Adjusted for vtproto
if err != nil {
return 0, err
}
i -= size
i = encodeVarintTypes(dAtA, i, uint64(size))
}
i--
dAtA[i] = 0x12
}
}
if len(m.LabelsRefs) > 0 {
var j10 int
start := i
for _, num := range m.LabelsRefs {
for num >= 1<<7 {
dAtA[i-1] = uint8(uint64(num)&0x7f | 0x80)
num >>= 7
i--
j10++
}
dAtA[i-1] = uint8(num)
i--
j10++
}
slices.Reverse(dAtA[i:start])
i = encodeVarintTypes(dAtA, i, uint64(j10))
i--
dAtA[i] = 0xa
}
return len(dAtA) - i, nil
}
func encodeVarintTypes(dAtA []byte, offset int, v uint64) int {
offset -= sovTypes(v)
base := offset
for v >= 1<<7 {
dAtA[offset] = uint8(v&0x7f | 0x80)
v >>= 7
offset++
}
dAtA[offset] = uint8(v)
return base
}
func sovTypes(x uint64) (n int) {
return (math_bits.Len64(x|1) + 6) / 7
}

View file

@ -0,0 +1,97 @@
// Copyright 2023 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package writev2
import (
"testing"
"github.com/stretchr/testify/require"
)
func TestOptimizedMarshal(t *testing.T) {
for _, tt := range []struct {
name string
m *Request
}{
{
name: "empty",
m: &Request{},
},
{
name: "simple",
m: &Request{
Timeseries: []TimeSeries{
{
LabelsRefs: []uint32{
0, 1,
2, 3,
4, 5,
6, 7,
8, 9,
10, 11,
12, 13,
14, 15,
},
Samples: []Sample{{Value: 1, Timestamp: 0}},
Exemplars: []Exemplar{{LabelsRefs: []uint32{0, 1}, Value: 1, Timestamp: 0}},
Histograms: nil,
},
{
LabelsRefs: []uint32{
0, 1,
2, 3,
4, 5,
6, 7,
8, 9,
10, 11,
12, 13,
14, 15,
},
Samples: []Sample{{Value: 2, Timestamp: 1}},
Exemplars: []Exemplar{{LabelsRefs: []uint32{0, 1}, Value: 2, Timestamp: 1}},
Histograms: nil,
},
},
Symbols: []string{
"a", "b",
"c", "d",
"e", "f",
"g", "h",
"i", "j",
"k", "l",
"m", "n",
"o", "p",
},
},
},
} {
t.Run(tt.name, func(t *testing.T) {
// Keep the slice allocated to mimic what std Marshal
// would give to sized Marshal.
got := make([]byte, 0)
// Should be the same as the standard marshal.
expected, err := tt.m.Marshal()
require.NoError(t, err)
got, err = tt.m.OptimizedMarshal(got)
require.NoError(t, err)
require.Equal(t, expected, got)
// Unmarshal should work too.
m := &Request{}
require.NoError(t, m.Unmarshal(got))
require.Equal(t, tt.m, m)
})
}
}

View file

@ -714,9 +714,17 @@ outer:
t.seriesMtx.Unlock()
continue
}
// TODO: LOG HERE,
// 1. if metadata is missing
// 2. see if serielization is broken, not inserted to serialization
// 3. deserialize right after serializa in the sender
//
// TODO(cstyan): Handle or at least log an error if no metadata is found.
// See https://github.com/prometheus/prometheus/issues/14405
meta := t.seriesMetadata[s.Ref]
meta, ok := t.seriesMetadata[s.Ref]
if !ok || meta == nil {
t.logger.Log("meta", meta, "lbls", lbls, "ref", s.Ref, "ok", ok)
}
t.seriesMtx.Unlock()
// Start with a very small backoff. This should not be t.cfg.MinBackoff
// as it can happen without errors, and we want to pickup work after