This commit is contained in:
Junang Li 2024-09-10 22:08:31 -04:00
parent 88acab7801
commit a74bec7b25
8 changed files with 348 additions and 65 deletions

View file

@ -15,4 +15,23 @@ plugins:
- plugin: buf.build/community/planetscale-vtprotobuf:v0.6.0 - plugin: buf.build/community/planetscale-vtprotobuf:v0.6.0
out: . out: .
opt: opt:
- paths=source_relative - paths=source_relative
- pool=github.com/prometheus/prometheus/prompb/io/prometheus/write/v2.Request
- pool=github.com/prometheus/prometheus/prompb/io/prometheus/write/v2.TimeSeries
- pool=github.com/prometheus/prometheus/prompb/io/prometheus/write/v2.Sample
- pool=github.com/prometheus/prometheus/prompb/io/prometheus/write/v2.Histogram
- pool=github.com/prometheus/prometheus/prompb/io/prometheus/write/v2.Exemplar
- pool=github.com/prometheus/prometheus/prompb/io/prometheus/write/v2.Label
- pool=github.com/prometheus/prometheus/prompb/io/prometheus/write/v2.Metadata
- pool=github.com/prometheus/prometheus/prompb/io/prometheus/write/v2.LabelPair
- pool=github.com/prometheus/prometheus/prompb/io/prometheus/write/v2.LabelValue
- pool=github.com/prometheus/prometheus/prompb/io/prometheus/write/v2.LabelName
- pool=github.com/prometheus/prometheus/prompb/io/prometheus/write/v2.Chunk
- pool=github.com/prometheus/prometheus/prompb/io/prometheus/write/v2.Chunk_Type
- pool=github.com/prometheus/prometheus/prompb/io/prometheus/write/v2.Chunk_XOR
- pool=github.com/prometheus/prometheus/prompb/io/prometheus/write/v2.Chunk_Delta
- pool=github.com/prometheus/prometheus/prompb/io/prometheus/write/v2.Chunk_Raw
- pool=github.com/prometheus/prometheus/prompb/io/prometheus/write/v2.Chunk_Zstd
- pool=github.com/prometheus/prometheus/prompb/io/prometheus/write/v2.Chunk_Snappy
- pool=github.com/prometheus/prometheus/prompb/io/prometheus/write/v2.Chunk_Lz4

View file

@ -12,6 +12,7 @@ import (
protoimpl "google.golang.org/protobuf/runtime/protoimpl" protoimpl "google.golang.org/protobuf/runtime/protoimpl"
io "io" io "io"
math "math" math "math"
sync "sync"
unsafe "unsafe" unsafe "unsafe"
) )
@ -26,7 +27,7 @@ func (m *Request) CloneVT() *Request {
if m == nil { if m == nil {
return (*Request)(nil) return (*Request)(nil)
} }
r := new(Request) r := RequestFromVTPool()
if rhs := m.Symbols; rhs != nil { if rhs := m.Symbols; rhs != nil {
tmpContainer := make([]string, len(rhs)) tmpContainer := make([]string, len(rhs))
copy(tmpContainer, rhs) copy(tmpContainer, rhs)
@ -54,7 +55,7 @@ func (m *TimeSeries) CloneVT() *TimeSeries {
if m == nil { if m == nil {
return (*TimeSeries)(nil) return (*TimeSeries)(nil)
} }
r := new(TimeSeries) r := TimeSeriesFromVTPool()
r.Metadata = m.Metadata.CloneVT() r.Metadata = m.Metadata.CloneVT()
r.CreatedTimestamp = m.CreatedTimestamp r.CreatedTimestamp = m.CreatedTimestamp
if rhs := m.LabelsRefs; rhs != nil { if rhs := m.LabelsRefs; rhs != nil {
@ -98,7 +99,7 @@ func (m *Exemplar) CloneVT() *Exemplar {
if m == nil { if m == nil {
return (*Exemplar)(nil) return (*Exemplar)(nil)
} }
r := new(Exemplar) r := ExemplarFromVTPool()
r.Value = m.Value r.Value = m.Value
r.Timestamp = m.Timestamp r.Timestamp = m.Timestamp
if rhs := m.LabelsRefs; rhs != nil { if rhs := m.LabelsRefs; rhs != nil {
@ -121,7 +122,7 @@ func (m *Sample) CloneVT() *Sample {
if m == nil { if m == nil {
return (*Sample)(nil) return (*Sample)(nil)
} }
r := new(Sample) r := SampleFromVTPool()
r.Value = m.Value r.Value = m.Value
r.Timestamp = m.Timestamp r.Timestamp = m.Timestamp
if len(m.unknownFields) > 0 { if len(m.unknownFields) > 0 {
@ -139,7 +140,7 @@ func (m *Metadata) CloneVT() *Metadata {
if m == nil { if m == nil {
return (*Metadata)(nil) return (*Metadata)(nil)
} }
r := new(Metadata) r := MetadataFromVTPool()
r.Type = m.Type r.Type = m.Type
r.HelpRef = m.HelpRef r.HelpRef = m.HelpRef
r.UnitRef = m.UnitRef r.UnitRef = m.UnitRef
@ -158,7 +159,7 @@ func (m *Histogram) CloneVT() *Histogram {
if m == nil { if m == nil {
return (*Histogram)(nil) return (*Histogram)(nil)
} }
r := new(Histogram) r := HistogramFromVTPool()
r.Sum = m.Sum r.Sum = m.Sum
r.Schema = m.Schema r.Schema = m.Schema
r.ZeroThreshold = m.ZeroThreshold r.ZeroThreshold = m.ZeroThreshold
@ -1874,6 +1875,178 @@ func (m *BucketSpan) MarshalToSizedBufferVTStrict(dAtA []byte) (int, error) {
return len(dAtA) - i, nil return len(dAtA) - i, nil
} }
// vtprotoPool_Request recycles Request messages to cut allocations on hot
// unmarshal paths. Obtain instances via RequestFromVTPool and release them
// with ReturnToVTPool.
var vtprotoPool_Request = sync.Pool{
	New: func() interface{} {
		return &Request{}
	},
}

// ResetVT zeroes m for reuse while preserving the backing arrays of its
// slice fields, so a pooled Request keeps its previously grown capacity.
func (m *Request) ResetVT() {
	if m == nil {
		return
	}
	// Detach the slices (length 0, capacity kept) before Reset wipes the fields.
	symbols := m.Symbols[:0]
	timeseries := m.Timeseries[:0]
	// Recursively reset retained elements so stale data cannot leak out later.
	for _, ts := range m.Timeseries {
		ts.ResetVT()
	}
	m.Reset()
	m.Symbols = symbols
	m.Timeseries = timeseries
}

// ReturnToVTPool resets m and hands it back to the pool. The caller must not
// touch m afterwards.
func (m *Request) ReturnToVTPool() {
	if m == nil {
		return
	}
	m.ResetVT()
	vtprotoPool_Request.Put(m)
}

// RequestFromVTPool fetches a reusable Request from the pool, allocating a
// fresh one when the pool is empty.
func RequestFromVTPool() *Request {
	return vtprotoPool_Request.Get().(*Request)
}
// vtprotoPool_TimeSeries recycles TimeSeries messages; use
// TimeSeriesFromVTPool / ReturnToVTPool to borrow and return them.
var vtprotoPool_TimeSeries = sync.Pool{
	New: func() interface{} {
		return &TimeSeries{}
	},
}

// ResetVT zeroes m for reuse while keeping the capacity of every slice field.
func (m *TimeSeries) ResetVT() {
	if m == nil {
		return
	}
	// Snapshot each slice at length 0 so the backing arrays survive Reset.
	labelsRefs := m.LabelsRefs[:0]
	samples := m.Samples[:0]
	histograms := m.Histograms[:0]
	exemplars := m.Exemplars[:0]
	// Scrub the retained elements recursively.
	for _, s := range m.Samples {
		s.ResetVT()
	}
	for _, h := range m.Histograms {
		h.ResetVT()
	}
	for _, e := range m.Exemplars {
		e.ResetVT()
	}
	// Metadata is itself pooled; give it back instead of dropping it.
	m.Metadata.ReturnToVTPool()
	m.Reset()
	m.LabelsRefs = labelsRefs
	m.Samples = samples
	m.Histograms = histograms
	m.Exemplars = exemplars
}

// ReturnToVTPool resets m and puts it back into the pool. The caller must not
// touch m afterwards.
func (m *TimeSeries) ReturnToVTPool() {
	if m == nil {
		return
	}
	m.ResetVT()
	vtprotoPool_TimeSeries.Put(m)
}

// TimeSeriesFromVTPool fetches a reusable TimeSeries from the pool,
// allocating a fresh one when the pool is empty.
func TimeSeriesFromVTPool() *TimeSeries {
	return vtprotoPool_TimeSeries.Get().(*TimeSeries)
}
// vtprotoPool_Exemplar recycles Exemplar messages; use ExemplarFromVTPool /
// ReturnToVTPool to borrow and return them.
var vtprotoPool_Exemplar = sync.Pool{
	New: func() interface{} {
		return &Exemplar{}
	},
}

// ResetVT zeroes m for reuse, keeping the LabelsRefs backing array.
func (m *Exemplar) ResetVT() {
	if m == nil {
		return
	}
	labelsRefs := m.LabelsRefs[:0]
	m.Reset()
	m.LabelsRefs = labelsRefs
}

// ReturnToVTPool resets m and puts it back into the pool. The caller must not
// touch m afterwards.
func (m *Exemplar) ReturnToVTPool() {
	if m == nil {
		return
	}
	m.ResetVT()
	vtprotoPool_Exemplar.Put(m)
}

// ExemplarFromVTPool fetches a reusable Exemplar from the pool, allocating a
// fresh one when the pool is empty.
func ExemplarFromVTPool() *Exemplar {
	return vtprotoPool_Exemplar.Get().(*Exemplar)
}
// vtprotoPool_Sample recycles Sample messages; use SampleFromVTPool /
// ReturnToVTPool to borrow and return them.
var vtprotoPool_Sample = sync.Pool{
	New: func() interface{} {
		return &Sample{}
	},
}

// ResetVT zeroes m for reuse. Sample has only scalar fields, so a plain
// Reset is sufficient.
func (m *Sample) ResetVT() {
	if m == nil {
		return
	}
	m.Reset()
}

// ReturnToVTPool resets m and puts it back into the pool. The caller must not
// touch m afterwards.
func (m *Sample) ReturnToVTPool() {
	if m == nil {
		return
	}
	m.ResetVT()
	vtprotoPool_Sample.Put(m)
}

// SampleFromVTPool fetches a reusable Sample from the pool, allocating a
// fresh one when the pool is empty.
func SampleFromVTPool() *Sample {
	return vtprotoPool_Sample.Get().(*Sample)
}
// vtprotoPool_Metadata recycles Metadata messages; use MetadataFromVTPool /
// ReturnToVTPool to borrow and return them.
var vtprotoPool_Metadata = sync.Pool{
	New: func() interface{} {
		return &Metadata{}
	},
}

// ResetVT zeroes m for reuse. Metadata has only scalar fields, so a plain
// Reset is sufficient.
func (m *Metadata) ResetVT() {
	if m == nil {
		return
	}
	m.Reset()
}

// ReturnToVTPool resets m and puts it back into the pool. The caller must not
// touch m afterwards.
func (m *Metadata) ReturnToVTPool() {
	if m == nil {
		return
	}
	m.ResetVT()
	vtprotoPool_Metadata.Put(m)
}

// MetadataFromVTPool fetches a reusable Metadata from the pool, allocating a
// fresh one when the pool is empty.
func MetadataFromVTPool() *Metadata {
	return vtprotoPool_Metadata.Get().(*Metadata)
}
// vtprotoPool_Histogram recycles Histogram messages; use HistogramFromVTPool /
// ReturnToVTPool to borrow and return them.
var vtprotoPool_Histogram = sync.Pool{
	New: func() interface{} {
		return &Histogram{}
	},
}

// ResetVT zeroes m for reuse while keeping the capacity of every slice field.
func (m *Histogram) ResetVT() {
	if m == nil {
		return
	}
	// Snapshot each slice at length 0 so the backing arrays survive Reset.
	negSpans := m.NegativeSpans[:0]
	negDeltas := m.NegativeDeltas[:0]
	negCounts := m.NegativeCounts[:0]
	posSpans := m.PositiveSpans[:0]
	posDeltas := m.PositiveDeltas[:0]
	posCounts := m.PositiveCounts[:0]
	customValues := m.CustomValues[:0]
	// BucketSpan is not pooled; a plain Reset scrubs the retained elements.
	for _, sp := range m.NegativeSpans {
		sp.Reset()
	}
	for _, sp := range m.PositiveSpans {
		sp.Reset()
	}
	m.Reset()
	m.NegativeSpans = negSpans
	m.NegativeDeltas = negDeltas
	m.NegativeCounts = negCounts
	m.PositiveSpans = posSpans
	m.PositiveDeltas = posDeltas
	m.PositiveCounts = posCounts
	m.CustomValues = customValues
}

// ReturnToVTPool resets m and puts it back into the pool. The caller must not
// touch m afterwards.
func (m *Histogram) ReturnToVTPool() {
	if m == nil {
		return
	}
	m.ResetVT()
	vtprotoPool_Histogram.Put(m)
}

// HistogramFromVTPool fetches a reusable Histogram from the pool, allocating
// a fresh one when the pool is empty.
func HistogramFromVTPool() *Histogram {
	return vtprotoPool_Histogram.Get().(*Histogram)
}
func (m *Request) SizeVT() (n int) { func (m *Request) SizeVT() (n int) {
if m == nil { if m == nil {
return 0 return 0
@ -2204,7 +2377,14 @@ func (m *Request) UnmarshalVT(dAtA []byte) error {
if postIndex > l { if postIndex > l {
return io.ErrUnexpectedEOF return io.ErrUnexpectedEOF
} }
m.Timeseries = append(m.Timeseries, &TimeSeries{}) if len(m.Timeseries) == cap(m.Timeseries) {
m.Timeseries = append(m.Timeseries, &TimeSeries{})
} else {
m.Timeseries = m.Timeseries[:len(m.Timeseries)+1]
if m.Timeseries[len(m.Timeseries)-1] == nil {
m.Timeseries[len(m.Timeseries)-1] = &TimeSeries{}
}
}
if err := m.Timeseries[len(m.Timeseries)-1].UnmarshalVT(dAtA[iNdEx:postIndex]); err != nil { if err := m.Timeseries[len(m.Timeseries)-1].UnmarshalVT(dAtA[iNdEx:postIndex]); err != nil {
return err return err
} }
@ -2312,7 +2492,7 @@ func (m *TimeSeries) UnmarshalVT(dAtA []byte) error {
} }
} }
elementCount = count elementCount = count
if elementCount != 0 && len(m.LabelsRefs) == 0 { if elementCount != 0 && len(m.LabelsRefs) == 0 && cap(m.LabelsRefs) < elementCount {
m.LabelsRefs = make([]uint32, 0, elementCount) m.LabelsRefs = make([]uint32, 0, elementCount)
} }
for iNdEx < postIndex { for iNdEx < postIndex {
@ -2365,7 +2545,14 @@ func (m *TimeSeries) UnmarshalVT(dAtA []byte) error {
if postIndex > l { if postIndex > l {
return io.ErrUnexpectedEOF return io.ErrUnexpectedEOF
} }
m.Samples = append(m.Samples, &Sample{}) if len(m.Samples) == cap(m.Samples) {
m.Samples = append(m.Samples, &Sample{})
} else {
m.Samples = m.Samples[:len(m.Samples)+1]
if m.Samples[len(m.Samples)-1] == nil {
m.Samples[len(m.Samples)-1] = &Sample{}
}
}
if err := m.Samples[len(m.Samples)-1].UnmarshalVT(dAtA[iNdEx:postIndex]); err != nil { if err := m.Samples[len(m.Samples)-1].UnmarshalVT(dAtA[iNdEx:postIndex]); err != nil {
return err return err
} }
@ -2399,7 +2586,14 @@ func (m *TimeSeries) UnmarshalVT(dAtA []byte) error {
if postIndex > l { if postIndex > l {
return io.ErrUnexpectedEOF return io.ErrUnexpectedEOF
} }
m.Histograms = append(m.Histograms, &Histogram{}) if len(m.Histograms) == cap(m.Histograms) {
m.Histograms = append(m.Histograms, &Histogram{})
} else {
m.Histograms = m.Histograms[:len(m.Histograms)+1]
if m.Histograms[len(m.Histograms)-1] == nil {
m.Histograms[len(m.Histograms)-1] = &Histogram{}
}
}
if err := m.Histograms[len(m.Histograms)-1].UnmarshalVT(dAtA[iNdEx:postIndex]); err != nil { if err := m.Histograms[len(m.Histograms)-1].UnmarshalVT(dAtA[iNdEx:postIndex]); err != nil {
return err return err
} }
@ -2433,7 +2627,14 @@ func (m *TimeSeries) UnmarshalVT(dAtA []byte) error {
if postIndex > l { if postIndex > l {
return io.ErrUnexpectedEOF return io.ErrUnexpectedEOF
} }
m.Exemplars = append(m.Exemplars, &Exemplar{}) if len(m.Exemplars) == cap(m.Exemplars) {
m.Exemplars = append(m.Exemplars, &Exemplar{})
} else {
m.Exemplars = m.Exemplars[:len(m.Exemplars)+1]
if m.Exemplars[len(m.Exemplars)-1] == nil {
m.Exemplars[len(m.Exemplars)-1] = &Exemplar{}
}
}
if err := m.Exemplars[len(m.Exemplars)-1].UnmarshalVT(dAtA[iNdEx:postIndex]); err != nil { if err := m.Exemplars[len(m.Exemplars)-1].UnmarshalVT(dAtA[iNdEx:postIndex]); err != nil {
return err return err
} }
@ -2468,7 +2669,7 @@ func (m *TimeSeries) UnmarshalVT(dAtA []byte) error {
return io.ErrUnexpectedEOF return io.ErrUnexpectedEOF
} }
if m.Metadata == nil { if m.Metadata == nil {
m.Metadata = &Metadata{} m.Metadata = MetadataFromVTPool()
} }
if err := m.Metadata.UnmarshalVT(dAtA[iNdEx:postIndex]); err != nil { if err := m.Metadata.UnmarshalVT(dAtA[iNdEx:postIndex]); err != nil {
return err return err
@ -2596,7 +2797,7 @@ func (m *Exemplar) UnmarshalVT(dAtA []byte) error {
} }
} }
elementCount = count elementCount = count
if elementCount != 0 && len(m.LabelsRefs) == 0 { if elementCount != 0 && len(m.LabelsRefs) == 0 && cap(m.LabelsRefs) < elementCount {
m.LabelsRefs = make([]uint32, 0, elementCount) m.LabelsRefs = make([]uint32, 0, elementCount)
} }
for iNdEx < postIndex { for iNdEx < postIndex {
@ -3024,7 +3225,14 @@ func (m *Histogram) UnmarshalVT(dAtA []byte) error {
if postIndex > l { if postIndex > l {
return io.ErrUnexpectedEOF return io.ErrUnexpectedEOF
} }
m.NegativeSpans = append(m.NegativeSpans, &BucketSpan{}) if len(m.NegativeSpans) == cap(m.NegativeSpans) {
m.NegativeSpans = append(m.NegativeSpans, &BucketSpan{})
} else {
m.NegativeSpans = m.NegativeSpans[:len(m.NegativeSpans)+1]
if m.NegativeSpans[len(m.NegativeSpans)-1] == nil {
m.NegativeSpans[len(m.NegativeSpans)-1] = &BucketSpan{}
}
}
if err := m.NegativeSpans[len(m.NegativeSpans)-1].UnmarshalVT(dAtA[iNdEx:postIndex]); err != nil { if err := m.NegativeSpans[len(m.NegativeSpans)-1].UnmarshalVT(dAtA[iNdEx:postIndex]); err != nil {
return err return err
} }
@ -3082,7 +3290,7 @@ func (m *Histogram) UnmarshalVT(dAtA []byte) error {
} }
} }
elementCount = count elementCount = count
if elementCount != 0 && len(m.NegativeDeltas) == 0 { if elementCount != 0 && len(m.NegativeDeltas) == 0 && cap(m.NegativeDeltas) < elementCount {
m.NegativeDeltas = make([]int64, 0, elementCount) m.NegativeDeltas = make([]int64, 0, elementCount)
} }
for iNdEx < postIndex { for iNdEx < postIndex {
@ -3145,7 +3353,7 @@ func (m *Histogram) UnmarshalVT(dAtA []byte) error {
} }
var elementCount int var elementCount int
elementCount = packedLen / 8 elementCount = packedLen / 8
if elementCount != 0 && len(m.NegativeCounts) == 0 { if elementCount != 0 && len(m.NegativeCounts) == 0 && cap(m.NegativeCounts) < elementCount {
m.NegativeCounts = make([]float64, 0, elementCount) m.NegativeCounts = make([]float64, 0, elementCount)
} }
for iNdEx < postIndex { for iNdEx < postIndex {
@ -3190,7 +3398,14 @@ func (m *Histogram) UnmarshalVT(dAtA []byte) error {
if postIndex > l { if postIndex > l {
return io.ErrUnexpectedEOF return io.ErrUnexpectedEOF
} }
m.PositiveSpans = append(m.PositiveSpans, &BucketSpan{}) if len(m.PositiveSpans) == cap(m.PositiveSpans) {
m.PositiveSpans = append(m.PositiveSpans, &BucketSpan{})
} else {
m.PositiveSpans = m.PositiveSpans[:len(m.PositiveSpans)+1]
if m.PositiveSpans[len(m.PositiveSpans)-1] == nil {
m.PositiveSpans[len(m.PositiveSpans)-1] = &BucketSpan{}
}
}
if err := m.PositiveSpans[len(m.PositiveSpans)-1].UnmarshalVT(dAtA[iNdEx:postIndex]); err != nil { if err := m.PositiveSpans[len(m.PositiveSpans)-1].UnmarshalVT(dAtA[iNdEx:postIndex]); err != nil {
return err return err
} }
@ -3248,7 +3463,7 @@ func (m *Histogram) UnmarshalVT(dAtA []byte) error {
} }
} }
elementCount = count elementCount = count
if elementCount != 0 && len(m.PositiveDeltas) == 0 { if elementCount != 0 && len(m.PositiveDeltas) == 0 && cap(m.PositiveDeltas) < elementCount {
m.PositiveDeltas = make([]int64, 0, elementCount) m.PositiveDeltas = make([]int64, 0, elementCount)
} }
for iNdEx < postIndex { for iNdEx < postIndex {
@ -3311,7 +3526,7 @@ func (m *Histogram) UnmarshalVT(dAtA []byte) error {
} }
var elementCount int var elementCount int
elementCount = packedLen / 8 elementCount = packedLen / 8
if elementCount != 0 && len(m.PositiveCounts) == 0 { if elementCount != 0 && len(m.PositiveCounts) == 0 && cap(m.PositiveCounts) < elementCount {
m.PositiveCounts = make([]float64, 0, elementCount) m.PositiveCounts = make([]float64, 0, elementCount)
} }
for iNdEx < postIndex { for iNdEx < postIndex {
@ -3403,7 +3618,7 @@ func (m *Histogram) UnmarshalVT(dAtA []byte) error {
} }
var elementCount int var elementCount int
elementCount = packedLen / 8 elementCount = packedLen / 8
if elementCount != 0 && len(m.CustomValues) == 0 { if elementCount != 0 && len(m.CustomValues) == 0 && cap(m.CustomValues) < elementCount {
m.CustomValues = make([]float64, 0, elementCount) m.CustomValues = make([]float64, 0, elementCount)
} }
for iNdEx < postIndex { for iNdEx < postIndex {
@ -3626,7 +3841,14 @@ func (m *Request) UnmarshalVTUnsafe(dAtA []byte) error {
if postIndex > l { if postIndex > l {
return io.ErrUnexpectedEOF return io.ErrUnexpectedEOF
} }
m.Timeseries = append(m.Timeseries, &TimeSeries{}) if len(m.Timeseries) == cap(m.Timeseries) {
m.Timeseries = append(m.Timeseries, &TimeSeries{})
} else {
m.Timeseries = m.Timeseries[:len(m.Timeseries)+1]
if m.Timeseries[len(m.Timeseries)-1] == nil {
m.Timeseries[len(m.Timeseries)-1] = &TimeSeries{}
}
}
if err := m.Timeseries[len(m.Timeseries)-1].UnmarshalVTUnsafe(dAtA[iNdEx:postIndex]); err != nil { if err := m.Timeseries[len(m.Timeseries)-1].UnmarshalVTUnsafe(dAtA[iNdEx:postIndex]); err != nil {
return err return err
} }
@ -3734,7 +3956,7 @@ func (m *TimeSeries) UnmarshalVTUnsafe(dAtA []byte) error {
} }
} }
elementCount = count elementCount = count
if elementCount != 0 && len(m.LabelsRefs) == 0 { if elementCount != 0 && len(m.LabelsRefs) == 0 && cap(m.LabelsRefs) < elementCount {
m.LabelsRefs = make([]uint32, 0, elementCount) m.LabelsRefs = make([]uint32, 0, elementCount)
} }
for iNdEx < postIndex { for iNdEx < postIndex {
@ -3787,7 +4009,14 @@ func (m *TimeSeries) UnmarshalVTUnsafe(dAtA []byte) error {
if postIndex > l { if postIndex > l {
return io.ErrUnexpectedEOF return io.ErrUnexpectedEOF
} }
m.Samples = append(m.Samples, &Sample{}) if len(m.Samples) == cap(m.Samples) {
m.Samples = append(m.Samples, &Sample{})
} else {
m.Samples = m.Samples[:len(m.Samples)+1]
if m.Samples[len(m.Samples)-1] == nil {
m.Samples[len(m.Samples)-1] = &Sample{}
}
}
if err := m.Samples[len(m.Samples)-1].UnmarshalVTUnsafe(dAtA[iNdEx:postIndex]); err != nil { if err := m.Samples[len(m.Samples)-1].UnmarshalVTUnsafe(dAtA[iNdEx:postIndex]); err != nil {
return err return err
} }
@ -3821,7 +4050,14 @@ func (m *TimeSeries) UnmarshalVTUnsafe(dAtA []byte) error {
if postIndex > l { if postIndex > l {
return io.ErrUnexpectedEOF return io.ErrUnexpectedEOF
} }
m.Histograms = append(m.Histograms, &Histogram{}) if len(m.Histograms) == cap(m.Histograms) {
m.Histograms = append(m.Histograms, &Histogram{})
} else {
m.Histograms = m.Histograms[:len(m.Histograms)+1]
if m.Histograms[len(m.Histograms)-1] == nil {
m.Histograms[len(m.Histograms)-1] = &Histogram{}
}
}
if err := m.Histograms[len(m.Histograms)-1].UnmarshalVTUnsafe(dAtA[iNdEx:postIndex]); err != nil { if err := m.Histograms[len(m.Histograms)-1].UnmarshalVTUnsafe(dAtA[iNdEx:postIndex]); err != nil {
return err return err
} }
@ -3855,7 +4091,14 @@ func (m *TimeSeries) UnmarshalVTUnsafe(dAtA []byte) error {
if postIndex > l { if postIndex > l {
return io.ErrUnexpectedEOF return io.ErrUnexpectedEOF
} }
m.Exemplars = append(m.Exemplars, &Exemplar{}) if len(m.Exemplars) == cap(m.Exemplars) {
m.Exemplars = append(m.Exemplars, &Exemplar{})
} else {
m.Exemplars = m.Exemplars[:len(m.Exemplars)+1]
if m.Exemplars[len(m.Exemplars)-1] == nil {
m.Exemplars[len(m.Exemplars)-1] = &Exemplar{}
}
}
if err := m.Exemplars[len(m.Exemplars)-1].UnmarshalVTUnsafe(dAtA[iNdEx:postIndex]); err != nil { if err := m.Exemplars[len(m.Exemplars)-1].UnmarshalVTUnsafe(dAtA[iNdEx:postIndex]); err != nil {
return err return err
} }
@ -3890,7 +4133,7 @@ func (m *TimeSeries) UnmarshalVTUnsafe(dAtA []byte) error {
return io.ErrUnexpectedEOF return io.ErrUnexpectedEOF
} }
if m.Metadata == nil { if m.Metadata == nil {
m.Metadata = &Metadata{} m.Metadata = MetadataFromVTPool()
} }
if err := m.Metadata.UnmarshalVTUnsafe(dAtA[iNdEx:postIndex]); err != nil { if err := m.Metadata.UnmarshalVTUnsafe(dAtA[iNdEx:postIndex]); err != nil {
return err return err
@ -4018,7 +4261,7 @@ func (m *Exemplar) UnmarshalVTUnsafe(dAtA []byte) error {
} }
} }
elementCount = count elementCount = count
if elementCount != 0 && len(m.LabelsRefs) == 0 { if elementCount != 0 && len(m.LabelsRefs) == 0 && cap(m.LabelsRefs) < elementCount {
m.LabelsRefs = make([]uint32, 0, elementCount) m.LabelsRefs = make([]uint32, 0, elementCount)
} }
for iNdEx < postIndex { for iNdEx < postIndex {
@ -4446,7 +4689,14 @@ func (m *Histogram) UnmarshalVTUnsafe(dAtA []byte) error {
if postIndex > l { if postIndex > l {
return io.ErrUnexpectedEOF return io.ErrUnexpectedEOF
} }
m.NegativeSpans = append(m.NegativeSpans, &BucketSpan{}) if len(m.NegativeSpans) == cap(m.NegativeSpans) {
m.NegativeSpans = append(m.NegativeSpans, &BucketSpan{})
} else {
m.NegativeSpans = m.NegativeSpans[:len(m.NegativeSpans)+1]
if m.NegativeSpans[len(m.NegativeSpans)-1] == nil {
m.NegativeSpans[len(m.NegativeSpans)-1] = &BucketSpan{}
}
}
if err := m.NegativeSpans[len(m.NegativeSpans)-1].UnmarshalVTUnsafe(dAtA[iNdEx:postIndex]); err != nil { if err := m.NegativeSpans[len(m.NegativeSpans)-1].UnmarshalVTUnsafe(dAtA[iNdEx:postIndex]); err != nil {
return err return err
} }
@ -4504,7 +4754,7 @@ func (m *Histogram) UnmarshalVTUnsafe(dAtA []byte) error {
} }
} }
elementCount = count elementCount = count
if elementCount != 0 && len(m.NegativeDeltas) == 0 { if elementCount != 0 && len(m.NegativeDeltas) == 0 && cap(m.NegativeDeltas) < elementCount {
m.NegativeDeltas = make([]int64, 0, elementCount) m.NegativeDeltas = make([]int64, 0, elementCount)
} }
for iNdEx < postIndex { for iNdEx < postIndex {
@ -4567,7 +4817,7 @@ func (m *Histogram) UnmarshalVTUnsafe(dAtA []byte) error {
} }
var elementCount int var elementCount int
elementCount = packedLen / 8 elementCount = packedLen / 8
if elementCount != 0 && len(m.NegativeCounts) == 0 { if elementCount != 0 && len(m.NegativeCounts) == 0 && cap(m.NegativeCounts) < elementCount {
m.NegativeCounts = make([]float64, 0, elementCount) m.NegativeCounts = make([]float64, 0, elementCount)
} }
for iNdEx < postIndex { for iNdEx < postIndex {
@ -4612,7 +4862,14 @@ func (m *Histogram) UnmarshalVTUnsafe(dAtA []byte) error {
if postIndex > l { if postIndex > l {
return io.ErrUnexpectedEOF return io.ErrUnexpectedEOF
} }
m.PositiveSpans = append(m.PositiveSpans, &BucketSpan{}) if len(m.PositiveSpans) == cap(m.PositiveSpans) {
m.PositiveSpans = append(m.PositiveSpans, &BucketSpan{})
} else {
m.PositiveSpans = m.PositiveSpans[:len(m.PositiveSpans)+1]
if m.PositiveSpans[len(m.PositiveSpans)-1] == nil {
m.PositiveSpans[len(m.PositiveSpans)-1] = &BucketSpan{}
}
}
if err := m.PositiveSpans[len(m.PositiveSpans)-1].UnmarshalVTUnsafe(dAtA[iNdEx:postIndex]); err != nil { if err := m.PositiveSpans[len(m.PositiveSpans)-1].UnmarshalVTUnsafe(dAtA[iNdEx:postIndex]); err != nil {
return err return err
} }
@ -4670,7 +4927,7 @@ func (m *Histogram) UnmarshalVTUnsafe(dAtA []byte) error {
} }
} }
elementCount = count elementCount = count
if elementCount != 0 && len(m.PositiveDeltas) == 0 { if elementCount != 0 && len(m.PositiveDeltas) == 0 && cap(m.PositiveDeltas) < elementCount {
m.PositiveDeltas = make([]int64, 0, elementCount) m.PositiveDeltas = make([]int64, 0, elementCount)
} }
for iNdEx < postIndex { for iNdEx < postIndex {
@ -4733,7 +4990,7 @@ func (m *Histogram) UnmarshalVTUnsafe(dAtA []byte) error {
} }
var elementCount int var elementCount int
elementCount = packedLen / 8 elementCount = packedLen / 8
if elementCount != 0 && len(m.PositiveCounts) == 0 { if elementCount != 0 && len(m.PositiveCounts) == 0 && cap(m.PositiveCounts) < elementCount {
m.PositiveCounts = make([]float64, 0, elementCount) m.PositiveCounts = make([]float64, 0, elementCount)
} }
for iNdEx < postIndex { for iNdEx < postIndex {
@ -4825,7 +5082,7 @@ func (m *Histogram) UnmarshalVTUnsafe(dAtA []byte) error {
} }
var elementCount int var elementCount int
elementCount = packedLen / 8 elementCount = packedLen / 8
if elementCount != 0 && len(m.CustomValues) == 0 { if elementCount != 0 && len(m.CustomValues) == 0 && cap(m.CustomValues) < elementCount {
m.CustomValues = make([]float64, 0, elementCount) m.CustomValues = make([]float64, 0, elementCount)
} }
for iNdEx < postIndex { for iNdEx < postIndex {

View file

@ -278,13 +278,13 @@ func TestReadClient(t *testing.T) {
require.True(t, ok) require.True(t, ok)
cw := NewChunkedWriter(w, flusher) cw := NewChunkedWriter(w, flusher)
l := []prompb.Label{ l := []*prompb.Label{
{Name: "foo", Value: "bar"}, {Name: "foo", Value: "bar"},
} }
chunks := buildTestChunks(t) chunks := buildTestChunks(t)
for i, c := range chunks { for i, c := range chunks {
cSeries := prompb.ChunkedSeries{Labels: l, Chunks: []prompb.Chunk{c}} cSeries := prompb.ChunkedSeries{Labels: l, Chunks: []*prompb.Chunk{c}}
readResp := prompb.ChunkedReadResponse{ readResp := prompb.ChunkedReadResponse{
ChunkedSeries: []*prompb.ChunkedSeries{&cSeries}, ChunkedSeries: []*prompb.ChunkedSeries{&cSeries},
QueryIndex: int64(i), QueryIndex: int64(i),
@ -407,24 +407,24 @@ func sampledResponseHTTPHandler(t *testing.T) http.HandlerFunc {
{ {
Timeseries: []*prompb.TimeSeries{ Timeseries: []*prompb.TimeSeries{
{ {
Labels: []prompb.Label{ Labels: []*prompb.Label{
{Name: "foo2", Value: "bar"}, {Name: "foo2", Value: "bar"},
}, },
Samples: []prompb.Sample{ Samples: []*prompb.Sample{
{Value: float64(1), Timestamp: int64(0)}, {Value: float64(1), Timestamp: int64(0)},
{Value: float64(2), Timestamp: int64(5)}, {Value: float64(2), Timestamp: int64(5)},
}, },
Exemplars: []prompb.Exemplar{}, Exemplars: []*prompb.Exemplar{},
}, },
{ {
Labels: []prompb.Label{ Labels: []*prompb.Label{
{Name: "foo1", Value: "bar"}, {Name: "foo1", Value: "bar"},
}, },
Samples: []prompb.Sample{ Samples: []*prompb.Sample{
{Value: float64(3), Timestamp: int64(0)}, {Value: float64(3), Timestamp: int64(0)},
{Value: float64(4), Timestamp: int64(5)}, {Value: float64(4), Timestamp: int64(5)},
}, },
Exemplars: []prompb.Exemplar{}, Exemplars: []*prompb.Exemplar{},
}, },
}, },
}, },

View file

@ -234,7 +234,7 @@ func StreamChunkedReadResponses(
for ss.Next() { for ss.Next() {
series := ss.At() series := ss.At()
iter = series.Iterator(iter) iter = series.Iterator(iter)
lbls = MergeLabels(prompb.FromLabels(series.Labels(), lbls), sortedExternalLabels) lbls = MergeLabels(prompb.FromLabels(series.Labels(), lbls), sortedExternalLabels)
maxDataLength := maxBytesInFrame maxDataLength := maxBytesInFrame
for _, lbl := range lbls { for _, lbl := range lbls {
@ -863,7 +863,7 @@ func DecodeWriteV2Request(r io.Reader) (*writev2.Request, error) {
} }
var req writev2.Request var req writev2.Request
if err := proto.Unmarshal(reqBuf, &req); err != nil { if err := req.UnmarshalVT(reqBuf); err != nil {
return nil, err return nil, err
} }

View file

@ -579,7 +579,7 @@ func TestDecodeWriteRequest(t *testing.T) {
actual, err := DecodeWriteRequest(bytes.NewReader(buf)) actual, err := DecodeWriteRequest(bytes.NewReader(buf))
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, writeRequestFixture, actual) require.True(t, proto.Equal(writeRequestFixture, actual))
} }
func TestDecodeWriteV2Request(t *testing.T) { func TestDecodeWriteV2Request(t *testing.T) {
@ -799,14 +799,14 @@ func TestChunkedSeriesIterator(t *testing.T) {
}) })
t.Run("empty chunks", func(t *testing.T) { t.Run("empty chunks", func(t *testing.T) {
emptyChunks := make([]prompb.Chunk, 0) emptyChunks := make([]*prompb.Chunk, 0)
it1 := newChunkedSeriesIterator(emptyChunks, 0, 1000) it1 := newChunkedSeriesIterator(emptyChunks, 0, 1000)
require.Equal(t, chunkenc.ValNone, it1.Next()) require.Equal(t, chunkenc.ValNone, it1.Next())
require.Equal(t, chunkenc.ValNone, it1.Seek(1000)) require.Equal(t, chunkenc.ValNone, it1.Seek(1000))
require.NoError(t, it1.Err()) require.NoError(t, it1.Err())
var nilChunks []prompb.Chunk var nilChunks []*prompb.Chunk
it2 := newChunkedSeriesIterator(nilChunks, 0, 1000) it2 := newChunkedSeriesIterator(nilChunks, 0, 1000)
require.Equal(t, chunkenc.ValNone, it2.Next()) require.Equal(t, chunkenc.ValNone, it2.Next())
@ -821,7 +821,7 @@ func TestChunkedSeries(t *testing.T) {
s := chunkedSeries{ s := chunkedSeries{
ChunkedSeries: prompb.ChunkedSeries{ ChunkedSeries: prompb.ChunkedSeries{
Labels: []prompb.Label{ Labels: []*prompb.Label{
{Name: "foo", Value: "bar"}, {Name: "foo", Value: "bar"},
{Name: "asdf", Value: "zxcv"}, {Name: "asdf", Value: "zxcv"},
}, },
@ -852,12 +852,12 @@ func TestChunkedSeriesSet(t *testing.T) {
r := NewChunkedReader(buf, config.DefaultChunkedReadLimit, nil) r := NewChunkedReader(buf, config.DefaultChunkedReadLimit, nil)
chks := buildTestChunks(t) chks := buildTestChunks(t)
l := []prompb.Label{ l := []*prompb.Label{
{Name: "foo", Value: "bar"}, {Name: "foo", Value: "bar"},
} }
for i, c := range chks { for i, c := range chks {
cSeries := prompb.ChunkedSeries{Labels: l, Chunks: []prompb.Chunk{c}} cSeries := prompb.ChunkedSeries{Labels: l, Chunks: []*prompb.Chunk{c}}
readResp := prompb.ChunkedReadResponse{ readResp := prompb.ChunkedReadResponse{
ChunkedSeries: []*prompb.ChunkedSeries{&cSeries}, ChunkedSeries: []*prompb.ChunkedSeries{&cSeries},
QueryIndex: int64(i), QueryIndex: int64(i),
@ -905,12 +905,12 @@ func TestChunkedSeriesSet(t *testing.T) {
r := NewChunkedReader(buf, config.DefaultChunkedReadLimit, nil) r := NewChunkedReader(buf, config.DefaultChunkedReadLimit, nil)
chks := buildTestChunks(t) chks := buildTestChunks(t)
l := []prompb.Label{ l := []*prompb.Label{
{Name: "foo", Value: "bar"}, {Name: "foo", Value: "bar"},
} }
for i, c := range chks { for i, c := range chks {
cSeries := prompb.ChunkedSeries{Labels: l, Chunks: []prompb.Chunk{c}} cSeries := prompb.ChunkedSeries{Labels: l, Chunks: []*prompb.Chunk{c}}
readResp := prompb.ChunkedReadResponse{ readResp := prompb.ChunkedReadResponse{
ChunkedSeries: []*prompb.ChunkedSeries{&cSeries}, ChunkedSeries: []*prompb.ChunkedSeries{&cSeries},
QueryIndex: int64(i), QueryIndex: int64(i),
@ -945,9 +945,9 @@ const (
numSamplesPerTestChunk = 5 numSamplesPerTestChunk = 5
) )
func buildTestChunks(t *testing.T) []prompb.Chunk { func buildTestChunks(t *testing.T) []*prompb.Chunk {
startTime := int64(0) startTime := int64(0)
chks := make([]prompb.Chunk, 0, numTestChunks) chks := make([]*prompb.Chunk, 0, numTestChunks)
time := startTime time := startTime
@ -964,7 +964,7 @@ func buildTestChunks(t *testing.T) []prompb.Chunk {
time += int64(1000) time += int64(1000)
} }
chks = append(chks, prompb.Chunk{ chks = append(chks, &prompb.Chunk{
MinTimeMs: minTimeMs, MinTimeMs: minTimeMs,
MaxTimeMs: time, MaxTimeMs: time,
Type: prompb.Chunk_XOR, Type: prompb.Chunk_XOR,

View file

@ -2203,16 +2203,21 @@ func buildV2WriteRequest(logger log.Logger, samples []*writev2.TimeSeries, label
level.Debug(logger).Log("msg", "dropped data due to their age", "droppedSamples", droppedSamples, "droppedExemplars", droppedExemplars, "droppedHistograms", droppedHistograms) level.Debug(logger).Log("msg", "dropped data due to their age", "droppedSamples", droppedSamples, "droppedExemplars", droppedExemplars, "droppedHistograms", droppedHistograms)
} }
req := &writev2.Request{ //req := &writev2.Request{
Symbols: labels, // Symbols: labels,
Timeseries: timeSeries, // Timeseries: timeSeries,
} //}
req := writev2.RequestFromVTPool()
req.Symbols = labels
req.Timeseries = timeSeries
if pBuf == nil { if pBuf == nil {
pBuf = &[]byte{} // For convenience in tests. Not efficient. pBuf = &[]byte{} // For convenience in tests. Not efficient.
} }
data, err := req.MarshalVT() data, err := req.MarshalVT()
req.Symbols = []string{}
req.Timeseries = []*writev2.TimeSeries{}
req.ReturnToVTPool()
if err != nil { if err != nil {
return nil, highest, lowest, err return nil, highest, lowest, err
} }

View file

@ -190,15 +190,17 @@ func (h *writeHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
} }
// Remote Write 2.x proto message handling. // Remote Write 2.x proto message handling.
var req writev2.Request req := writev2.RequestFromVTPool()
if err := proto.Unmarshal(decompressed, &req); err != nil { // Timeseries as well
if err := proto.Unmarshal(decompressed, req); err != nil {
// TODO(bwplotka): Add more context to responded error? // TODO(bwplotka): Add more context to responded error?
level.Error(h.logger).Log("msg", "Error decoding v2 remote write request", "protobuf_message", msgType, "err", err.Error()) level.Error(h.logger).Log("msg", "Error decoding v2 remote write request", "protobuf_message", msgType, "err", err.Error())
http.Error(w, err.Error(), http.StatusBadRequest) http.Error(w, err.Error(), http.StatusBadRequest)
return return
} }
respStats, errHTTPCode, err := h.writeV2(r.Context(), &req) respStats, errHTTPCode, err := h.writeV2(r.Context(), req)
req.ReturnToVTPool()
// Set required X-Prometheus-Remote-Write-Written-* response headers, in all cases. // Set required X-Prometheus-Remote-Write-Written-* response headers, in all cases.
respStats.SetHeaders(w) respStats.SetHeaders(w)

View file

@ -342,7 +342,7 @@ func TestRemoteWriteHandler_V2Message(t *testing.T) {
desc: "Partial write; first series with duplicate labels", desc: "Partial write; first series with duplicate labels",
input: append( input: append(
// Series with __name__="test_metric1",test_metric1="test_metric1",test_metric1="test_metric1" labels. // Series with __name__="test_metric1",test_metric1="test_metric1",test_metric1="test_metric1" labels.
[]writev2.TimeSeries{{LabelsRefs: []uint32{1, 2, 2, 2, 2, 2}, Samples: []writev2.Sample{{Value: 1, Timestamp: 1}}}}, []*writev2.TimeSeries{{LabelsRefs: []uint32{1, 2, 2, 2, 2, 2}, Samples: []*writev2.Sample{{Value: 1, Timestamp: 1}}}},
writeV2RequestFixture.Timeseries...), writeV2RequestFixture.Timeseries...),
expectedCode: http.StatusBadRequest, expectedCode: http.StatusBadRequest,
expectedRespBody: "invalid labels for series, labels {__name__=\"test_metric1\", test_metric1=\"test_metric1\", test_metric1=\"test_metric1\"}, duplicated label test_metric1\n", expectedRespBody: "invalid labels for series, labels {__name__=\"test_metric1\", test_metric1=\"test_metric1\", test_metric1=\"test_metric1\"}, duplicated label test_metric1\n",