From a74bec7b259728baf9c23f5c07501daadb2bfb3c Mon Sep 17 00:00:00 2001 From: Junang Li Date: Tue, 10 Sep 2024 22:08:31 -0400 Subject: [PATCH] WIP --- prompb/buf.gen.yaml | 21 +- .../prometheus/write/v2/types_vtproto.pb.go | 325 ++++++++++++++++-- storage/remote/client_test.go | 16 +- storage/remote/codec.go | 4 +- storage/remote/codec_test.go | 22 +- storage/remote/queue_manager.go | 15 +- storage/remote/write_handler.go | 8 +- storage/remote/write_handler_test.go | 2 +- 8 files changed, 348 insertions(+), 65 deletions(-) diff --git a/prompb/buf.gen.yaml b/prompb/buf.gen.yaml index 6c1a17f2c6..078607a648 100644 --- a/prompb/buf.gen.yaml +++ b/prompb/buf.gen.yaml @@ -15,4 +15,23 @@ plugins: - plugin: buf.build/community/planetscale-vtprotobuf:v0.6.0 out: . opt: - - paths=source_relative \ No newline at end of file + - paths=source_relative + - pool=github.com/prometheus/prometheus/prompb/io/prometheus/write/v2.Request + - pool=github.com/prometheus/prometheus/prompb/io/prometheus/write/v2.TimeSeries + - pool=github.com/prometheus/prometheus/prompb/io/prometheus/write/v2.Sample + - pool=github.com/prometheus/prometheus/prompb/io/prometheus/write/v2.Histogram + - pool=github.com/prometheus/prometheus/prompb/io/prometheus/write/v2.Exemplar + - pool=github.com/prometheus/prometheus/prompb/io/prometheus/write/v2.Label + - pool=github.com/prometheus/prometheus/prompb/io/prometheus/write/v2.Metadata + - pool=github.com/prometheus/prometheus/prompb/io/prometheus/write/v2.LabelPair + - pool=github.com/prometheus/prometheus/prompb/io/prometheus/write/v2.LabelValue + - pool=github.com/prometheus/prometheus/prompb/io/prometheus/write/v2.LabelName + - pool=github.com/prometheus/prometheus/prompb/io/prometheus/write/v2.Chunk + - pool=github.com/prometheus/prometheus/prompb/io/prometheus/write/v2.Chunk_Type + - pool=github.com/prometheus/prometheus/prompb/io/prometheus/write/v2.Chunk_XOR + - pool=github.com/prometheus/prometheus/prompb/io/prometheus/write/v2.Chunk_Delta + - pool=github.com/prometheus/prometheus/prompb/io/prometheus/write/v2.Chunk_Raw + - pool=github.com/prometheus/prometheus/prompb/io/prometheus/write/v2.Chunk_Zstd + - pool=github.com/prometheus/prometheus/prompb/io/prometheus/write/v2.Chunk_Snappy + - pool=github.com/prometheus/prometheus/prompb/io/prometheus/write/v2.Chunk_Lz4 + diff --git a/prompb/io/prometheus/write/v2/types_vtproto.pb.go b/prompb/io/prometheus/write/v2/types_vtproto.pb.go index 7be1e9588a..2be75c2d70 100644 --- a/prompb/io/prometheus/write/v2/types_vtproto.pb.go +++ b/prompb/io/prometheus/write/v2/types_vtproto.pb.go @@ -12,6 +12,7 @@ import ( protoimpl "google.golang.org/protobuf/runtime/protoimpl" io "io" math "math" + sync "sync" unsafe "unsafe" ) @@ -26,7 +27,7 @@ func (m *Request) CloneVT() *Request { if m == nil { return (*Request)(nil) } - r := new(Request) + r := RequestFromVTPool() if rhs := m.Symbols; rhs != nil { tmpContainer := make([]string, len(rhs)) copy(tmpContainer, rhs) @@ -54,7 +55,7 @@ func (m *TimeSeries) CloneVT() *TimeSeries { if m == nil { return (*TimeSeries)(nil) } - r := new(TimeSeries) + r := TimeSeriesFromVTPool() r.Metadata = m.Metadata.CloneVT() r.CreatedTimestamp = m.CreatedTimestamp if rhs := m.LabelsRefs; rhs != nil { @@ -98,7 +99,7 @@ func (m *Exemplar) CloneVT() *Exemplar { if m == nil { return (*Exemplar)(nil) } - r := new(Exemplar) + r := ExemplarFromVTPool() r.Value = m.Value r.Timestamp = m.Timestamp if rhs := m.LabelsRefs; rhs != nil { @@ -121,7 +122,7 @@ func (m *Sample) CloneVT() *Sample { if m == nil { return (*Sample)(nil) } - r := new(Sample) + r := SampleFromVTPool() r.Value = m.Value r.Timestamp = m.Timestamp if len(m.unknownFields) > 0 { @@ -139,7 +140,7 @@ func (m *Metadata) CloneVT() *Metadata { if m == nil { return (*Metadata)(nil) } - r := new(Metadata) + r := MetadataFromVTPool() r.Type = m.Type r.HelpRef = m.HelpRef r.UnitRef = m.UnitRef @@ -158,7 +159,7 @@ func (m *Histogram) CloneVT() *Histogram { if m == nil { return (*Histogram)(nil) } - r := new(Histogram) + r := HistogramFromVTPool() r.Sum = m.Sum r.Schema = m.Schema r.ZeroThreshold = m.ZeroThreshold @@ -1874,6 +1875,178 @@ func (m *BucketSpan) MarshalToSizedBufferVTStrict(dAtA []byte) (int, error) { return len(dAtA) - i, nil } +var vtprotoPool_Request = sync.Pool{ + New: func() interface{} { + return &Request{} + }, +} + +func (m *Request) ResetVT() { + if m != nil { + f0 := m.Symbols[:0] + for _, mm := range m.Timeseries { + mm.ResetVT() + } + f1 := m.Timeseries[:0] + m.Reset() + m.Symbols = f0 + m.Timeseries = f1 + } +} +func (m *Request) ReturnToVTPool() { + if m != nil { + m.ResetVT() + vtprotoPool_Request.Put(m) + } +} +func RequestFromVTPool() *Request { + return vtprotoPool_Request.Get().(*Request) +} + +var vtprotoPool_TimeSeries = sync.Pool{ + New: func() interface{} { + return &TimeSeries{} + }, +} + +func (m *TimeSeries) ResetVT() { + if m != nil { + f0 := m.LabelsRefs[:0] + for _, mm := range m.Samples { + mm.ResetVT() + } + f1 := m.Samples[:0] + for _, mm := range m.Histograms { + mm.ResetVT() + } + f2 := m.Histograms[:0] + for _, mm := range m.Exemplars { + mm.ResetVT() + } + f3 := m.Exemplars[:0] + m.Metadata.ReturnToVTPool() + m.Reset() + m.LabelsRefs = f0 + m.Samples = f1 + m.Histograms = f2 + m.Exemplars = f3 + } +} +func (m *TimeSeries) ReturnToVTPool() { + if m != nil { + m.ResetVT() + vtprotoPool_TimeSeries.Put(m) + } +} +func TimeSeriesFromVTPool() *TimeSeries { + return vtprotoPool_TimeSeries.Get().(*TimeSeries) +} + +var vtprotoPool_Exemplar = sync.Pool{ + New: func() interface{} { + return &Exemplar{} + }, +} + +func (m *Exemplar) ResetVT() { + if m != nil { + f0 := m.LabelsRefs[:0] + m.Reset() + m.LabelsRefs = f0 + } +} +func (m *Exemplar) ReturnToVTPool() { + if m != nil { + m.ResetVT() + vtprotoPool_Exemplar.Put(m) + } +} +func ExemplarFromVTPool() *Exemplar { + return vtprotoPool_Exemplar.Get().(*Exemplar) +} + +var vtprotoPool_Sample = sync.Pool{ + New: func() interface{} { + return &Sample{} + }, +} + +func (m *Sample) ResetVT() { + if m != nil { + m.Reset() + } +} +func (m *Sample) ReturnToVTPool() { + if m != nil { + m.ResetVT() + vtprotoPool_Sample.Put(m) + } +} +func SampleFromVTPool() *Sample { + return vtprotoPool_Sample.Get().(*Sample) +} + +var vtprotoPool_Metadata = sync.Pool{ + New: func() interface{} { + return &Metadata{} + }, +} + +func (m *Metadata) ResetVT() { + if m != nil { + m.Reset() + } +} +func (m *Metadata) ReturnToVTPool() { + if m != nil { + m.ResetVT() + vtprotoPool_Metadata.Put(m) + } +} +func MetadataFromVTPool() *Metadata { + return vtprotoPool_Metadata.Get().(*Metadata) +} + +var vtprotoPool_Histogram = sync.Pool{ + New: func() interface{} { + return &Histogram{} + }, +} + +func (m *Histogram) ResetVT() { + if m != nil { + for _, mm := range m.NegativeSpans { + mm.Reset() + } + f0 := m.NegativeSpans[:0] + f1 := m.NegativeDeltas[:0] + f2 := m.NegativeCounts[:0] + for _, mm := range m.PositiveSpans { + mm.Reset() + } + f3 := m.PositiveSpans[:0] + f4 := m.PositiveDeltas[:0] + f5 := m.PositiveCounts[:0] + f6 := m.CustomValues[:0] + m.Reset() + m.NegativeSpans = f0 + m.NegativeDeltas = f1 + m.NegativeCounts = f2 + m.PositiveSpans = f3 + m.PositiveDeltas = f4 + m.PositiveCounts = f5 + m.CustomValues = f6 + } +} +func (m *Histogram) ReturnToVTPool() { + if m != nil { + m.ResetVT() + vtprotoPool_Histogram.Put(m) + } +} +func HistogramFromVTPool() *Histogram { + return vtprotoPool_Histogram.Get().(*Histogram) +} func (m *Request) SizeVT() (n int) { if m == nil { return 0 @@ -2204,7 +2377,14 @@ func (m *Request) UnmarshalVT(dAtA []byte) error { if postIndex > l { return io.ErrUnexpectedEOF } - m.Timeseries = append(m.Timeseries, &TimeSeries{}) + if len(m.Timeseries) == cap(m.Timeseries) { + m.Timeseries = append(m.Timeseries, &TimeSeries{}) + } else { + m.Timeseries = m.Timeseries[:len(m.Timeseries)+1] + if m.Timeseries[len(m.Timeseries)-1] == nil { + m.Timeseries[len(m.Timeseries)-1] = &TimeSeries{} + } + } if err := m.Timeseries[len(m.Timeseries)-1].UnmarshalVT(dAtA[iNdEx:postIndex]); err != nil { return err } @@ -2312,7 +2492,7 @@ func (m *TimeSeries) UnmarshalVT(dAtA []byte) error { } } elementCount = count - if elementCount != 0 && len(m.LabelsRefs) == 0 { + if elementCount != 0 && len(m.LabelsRefs) == 0 && cap(m.LabelsRefs) < elementCount { m.LabelsRefs = make([]uint32, 0, elementCount) } for iNdEx < postIndex { @@ -2365,7 +2545,14 @@ func (m *TimeSeries) UnmarshalVT(dAtA []byte) error { if postIndex > l { return io.ErrUnexpectedEOF } - m.Samples = append(m.Samples, &Sample{}) + if len(m.Samples) == cap(m.Samples) { + m.Samples = append(m.Samples, &Sample{}) + } else { + m.Samples = m.Samples[:len(m.Samples)+1] + if m.Samples[len(m.Samples)-1] == nil { + m.Samples[len(m.Samples)-1] = &Sample{} + } + } if err := m.Samples[len(m.Samples)-1].UnmarshalVT(dAtA[iNdEx:postIndex]); err != nil { return err } @@ -2399,7 +2586,14 @@ func (m *TimeSeries) UnmarshalVT(dAtA []byte) error { if postIndex > l { return io.ErrUnexpectedEOF } - m.Histograms = append(m.Histograms, &Histogram{}) + if len(m.Histograms) == cap(m.Histograms) { + m.Histograms = append(m.Histograms, &Histogram{}) + } else { + m.Histograms = m.Histograms[:len(m.Histograms)+1] + if m.Histograms[len(m.Histograms)-1] == nil { + m.Histograms[len(m.Histograms)-1] = &Histogram{} + } + } if err := m.Histograms[len(m.Histograms)-1].UnmarshalVT(dAtA[iNdEx:postIndex]); err != nil { return err } @@ -2433,7 +2627,14 @@ func (m *TimeSeries) UnmarshalVT(dAtA []byte) error { if postIndex > l { return io.ErrUnexpectedEOF } - m.Exemplars = append(m.Exemplars, &Exemplar{}) + if len(m.Exemplars) == cap(m.Exemplars) { + m.Exemplars = append(m.Exemplars, &Exemplar{}) + } else { + m.Exemplars = m.Exemplars[:len(m.Exemplars)+1] + if m.Exemplars[len(m.Exemplars)-1] == nil { + m.Exemplars[len(m.Exemplars)-1] = &Exemplar{} + } + } if err := m.Exemplars[len(m.Exemplars)-1].UnmarshalVT(dAtA[iNdEx:postIndex]); err != nil { return err } @@ -2468,7 +2669,7 @@ func (m *TimeSeries) UnmarshalVT(dAtA []byte) error { return io.ErrUnexpectedEOF } if m.Metadata == nil { - m.Metadata = &Metadata{} + m.Metadata = MetadataFromVTPool() } if err := m.Metadata.UnmarshalVT(dAtA[iNdEx:postIndex]); err != nil { return err @@ -2596,7 +2797,7 @@ func (m *Exemplar) UnmarshalVT(dAtA []byte) error { } } elementCount = count - if elementCount != 0 && len(m.LabelsRefs) == 0 { + if elementCount != 0 && len(m.LabelsRefs) == 0 && cap(m.LabelsRefs) < elementCount { m.LabelsRefs = make([]uint32, 0, elementCount) } for iNdEx < postIndex { @@ -3024,7 +3225,14 @@ func (m *Histogram) UnmarshalVT(dAtA []byte) error { if postIndex > l { return io.ErrUnexpectedEOF } - m.NegativeSpans = append(m.NegativeSpans, &BucketSpan{}) + if len(m.NegativeSpans) == cap(m.NegativeSpans) { + m.NegativeSpans = append(m.NegativeSpans, &BucketSpan{}) + } else { + m.NegativeSpans = m.NegativeSpans[:len(m.NegativeSpans)+1] + if m.NegativeSpans[len(m.NegativeSpans)-1] == nil { + m.NegativeSpans[len(m.NegativeSpans)-1] = &BucketSpan{} + } + } if err := m.NegativeSpans[len(m.NegativeSpans)-1].UnmarshalVT(dAtA[iNdEx:postIndex]); err != nil { return err } @@ -3082,7 +3290,7 @@ func (m *Histogram) UnmarshalVT(dAtA []byte) error { } } elementCount = count - if elementCount != 0 && len(m.NegativeDeltas) == 0 { + if elementCount != 0 && len(m.NegativeDeltas) == 0 && cap(m.NegativeDeltas) < elementCount { m.NegativeDeltas = make([]int64, 0, elementCount) } for iNdEx < postIndex { @@ -3145,7 +3353,7 @@ func (m *Histogram) UnmarshalVT(dAtA []byte) error { } var elementCount int elementCount = packedLen / 8 - if elementCount != 0 && len(m.NegativeCounts) == 0 { + if elementCount != 0 && len(m.NegativeCounts) == 0 && cap(m.NegativeCounts) < elementCount { m.NegativeCounts = make([]float64, 0, elementCount) } for iNdEx < postIndex { @@ -3190,7 +3398,14 @@ func (m *Histogram) UnmarshalVT(dAtA []byte) error { if postIndex > l { return io.ErrUnexpectedEOF } - m.PositiveSpans = append(m.PositiveSpans, &BucketSpan{}) + if len(m.PositiveSpans) == cap(m.PositiveSpans) { + m.PositiveSpans = append(m.PositiveSpans, &BucketSpan{}) + } else { + m.PositiveSpans = m.PositiveSpans[:len(m.PositiveSpans)+1] + if m.PositiveSpans[len(m.PositiveSpans)-1] == nil { + m.PositiveSpans[len(m.PositiveSpans)-1] = &BucketSpan{} + } + } if err := m.PositiveSpans[len(m.PositiveSpans)-1].UnmarshalVT(dAtA[iNdEx:postIndex]); err != nil { return err } @@ -3248,7 +3463,7 @@ func (m *Histogram) UnmarshalVT(dAtA []byte) error { } } elementCount = count - if elementCount != 0 && len(m.PositiveDeltas) == 0 { + if elementCount != 0 && len(m.PositiveDeltas) == 0 && cap(m.PositiveDeltas) < elementCount { m.PositiveDeltas = make([]int64, 0, elementCount) } for iNdEx < postIndex { @@ -3311,7 +3526,7 @@ func (m *Histogram) UnmarshalVT(dAtA []byte) error { } var elementCount int elementCount = packedLen / 8 - if elementCount != 0 && len(m.PositiveCounts) == 0 { + if elementCount != 0 && len(m.PositiveCounts) == 0 && cap(m.PositiveCounts) < elementCount { m.PositiveCounts = make([]float64, 0, elementCount) } for iNdEx < postIndex { @@ -3403,7 +3618,7 @@ func (m *Histogram) UnmarshalVT(dAtA []byte) error { } var elementCount int elementCount = packedLen / 8 - if elementCount != 0 && len(m.CustomValues) == 0 { + if elementCount != 0 && len(m.CustomValues) == 0 && cap(m.CustomValues) < elementCount { m.CustomValues = make([]float64, 0, elementCount) } for iNdEx < postIndex { @@ -3626,7 +3841,14 @@ func (m *Request) UnmarshalVTUnsafe(dAtA []byte) error { if postIndex > l { return io.ErrUnexpectedEOF } - m.Timeseries = append(m.Timeseries, &TimeSeries{}) + if len(m.Timeseries) == cap(m.Timeseries) { + m.Timeseries = append(m.Timeseries, &TimeSeries{}) + } else { + m.Timeseries = m.Timeseries[:len(m.Timeseries)+1] + if m.Timeseries[len(m.Timeseries)-1] == nil { + m.Timeseries[len(m.Timeseries)-1] = &TimeSeries{} + } + } if err := m.Timeseries[len(m.Timeseries)-1].UnmarshalVTUnsafe(dAtA[iNdEx:postIndex]); err != nil { return err } @@ -3734,7 +3956,7 @@ func (m *TimeSeries) UnmarshalVTUnsafe(dAtA []byte) error { } } elementCount = count - if elementCount != 0 && len(m.LabelsRefs) == 0 { + if elementCount != 0 && len(m.LabelsRefs) == 0 && cap(m.LabelsRefs) < elementCount { m.LabelsRefs = make([]uint32, 0, elementCount) } for iNdEx < postIndex { @@ -3787,7 +4009,14 @@ func (m *TimeSeries) UnmarshalVTUnsafe(dAtA []byte) error { if postIndex > l { return io.ErrUnexpectedEOF } - m.Samples = append(m.Samples, &Sample{}) + if len(m.Samples) == cap(m.Samples) { + m.Samples = append(m.Samples, &Sample{}) + } else { + m.Samples = m.Samples[:len(m.Samples)+1] + if m.Samples[len(m.Samples)-1] == nil { + m.Samples[len(m.Samples)-1] = &Sample{} + } + } if err := m.Samples[len(m.Samples)-1].UnmarshalVTUnsafe(dAtA[iNdEx:postIndex]); err != nil { return err } @@ -3821,7 +4050,14 @@ func (m *TimeSeries) UnmarshalVTUnsafe(dAtA []byte) error { if postIndex > l { return io.ErrUnexpectedEOF } - m.Histograms = append(m.Histograms, &Histogram{}) + if len(m.Histograms) == cap(m.Histograms) { + m.Histograms = append(m.Histograms, &Histogram{}) + } else { + m.Histograms = m.Histograms[:len(m.Histograms)+1] + if m.Histograms[len(m.Histograms)-1] == nil { + m.Histograms[len(m.Histograms)-1] = &Histogram{} + } + } if err := m.Histograms[len(m.Histograms)-1].UnmarshalVTUnsafe(dAtA[iNdEx:postIndex]); err != nil { return err } @@ -3855,7 +4091,14 @@ func (m *TimeSeries) UnmarshalVTUnsafe(dAtA []byte) error { if postIndex > l { return io.ErrUnexpectedEOF } - m.Exemplars = append(m.Exemplars, &Exemplar{}) + if len(m.Exemplars) == cap(m.Exemplars) { + m.Exemplars = append(m.Exemplars, &Exemplar{}) + } else { + m.Exemplars = m.Exemplars[:len(m.Exemplars)+1] + if m.Exemplars[len(m.Exemplars)-1] == nil { + m.Exemplars[len(m.Exemplars)-1] = &Exemplar{} + } + } if err := m.Exemplars[len(m.Exemplars)-1].UnmarshalVTUnsafe(dAtA[iNdEx:postIndex]); err != nil { return err } @@ -3890,7 +4133,7 @@ func (m *TimeSeries) UnmarshalVTUnsafe(dAtA []byte) error { return io.ErrUnexpectedEOF } if m.Metadata == nil { - m.Metadata = &Metadata{} + m.Metadata = MetadataFromVTPool() } if err := m.Metadata.UnmarshalVTUnsafe(dAtA[iNdEx:postIndex]); err != nil { return err @@ -4018,7 +4261,7 @@ func (m *Exemplar) UnmarshalVTUnsafe(dAtA []byte) error { } } elementCount = count - if elementCount != 0 && len(m.LabelsRefs) == 0 { + if elementCount != 0 && len(m.LabelsRefs) == 0 && cap(m.LabelsRefs) < elementCount { m.LabelsRefs = make([]uint32, 0, elementCount) } for iNdEx < postIndex { @@ -4446,7 +4689,14 @@ func (m *Histogram) UnmarshalVTUnsafe(dAtA []byte) error { if postIndex > l { return io.ErrUnexpectedEOF } - m.NegativeSpans = append(m.NegativeSpans, &BucketSpan{}) + if len(m.NegativeSpans) == cap(m.NegativeSpans) { + m.NegativeSpans = append(m.NegativeSpans, &BucketSpan{}) + } else { + m.NegativeSpans = m.NegativeSpans[:len(m.NegativeSpans)+1] + if m.NegativeSpans[len(m.NegativeSpans)-1] == nil { + m.NegativeSpans[len(m.NegativeSpans)-1] = &BucketSpan{} + } + } if err := m.NegativeSpans[len(m.NegativeSpans)-1].UnmarshalVTUnsafe(dAtA[iNdEx:postIndex]); err != nil { return err } @@ -4504,7 +4754,7 @@ func (m *Histogram) UnmarshalVTUnsafe(dAtA []byte) error { } } elementCount = count - if elementCount != 0 && len(m.NegativeDeltas) == 0 { + if elementCount != 0 && len(m.NegativeDeltas) == 0 && cap(m.NegativeDeltas) < elementCount { m.NegativeDeltas = make([]int64, 0, elementCount) } for iNdEx < postIndex { @@ -4567,7 +4817,7 @@ func (m *Histogram) UnmarshalVTUnsafe(dAtA []byte) error { } var elementCount int elementCount = packedLen / 8 - if elementCount != 0 && len(m.NegativeCounts) == 0 { + if elementCount != 0 && len(m.NegativeCounts) == 0 && cap(m.NegativeCounts) < elementCount { m.NegativeCounts = make([]float64, 0, elementCount) } for iNdEx < postIndex { @@ -4612,7 +4862,14 @@ func (m *Histogram) UnmarshalVTUnsafe(dAtA []byte) error { if postIndex > l { return io.ErrUnexpectedEOF } - m.PositiveSpans = append(m.PositiveSpans, &BucketSpan{}) + if len(m.PositiveSpans) == cap(m.PositiveSpans) { + m.PositiveSpans = append(m.PositiveSpans, &BucketSpan{}) + } else { + m.PositiveSpans = m.PositiveSpans[:len(m.PositiveSpans)+1] + if m.PositiveSpans[len(m.PositiveSpans)-1] == nil { + m.PositiveSpans[len(m.PositiveSpans)-1] = &BucketSpan{} + } + } if err := m.PositiveSpans[len(m.PositiveSpans)-1].UnmarshalVTUnsafe(dAtA[iNdEx:postIndex]); err != nil { return err } @@ -4670,7 +4927,7 @@ func (m *Histogram) UnmarshalVTUnsafe(dAtA []byte) error { } } elementCount = count - if elementCount != 0 && len(m.PositiveDeltas) == 0 { + if elementCount != 0 && len(m.PositiveDeltas) == 0 && cap(m.PositiveDeltas) < elementCount { m.PositiveDeltas = make([]int64, 0, elementCount) } for iNdEx < postIndex { @@ -4733,7 +4990,7 @@ func (m *Histogram) UnmarshalVTUnsafe(dAtA []byte) error { } var elementCount int elementCount = packedLen / 8 - if elementCount != 0 && len(m.PositiveCounts) == 0 { + if elementCount != 0 && len(m.PositiveCounts) == 0 && cap(m.PositiveCounts) < elementCount { m.PositiveCounts = make([]float64, 0, elementCount) } for iNdEx < postIndex { @@ -4825,7 +5082,7 @@ func (m *Histogram) UnmarshalVTUnsafe(dAtA []byte) error { } var elementCount int elementCount = packedLen / 8 - if elementCount != 0 && len(m.CustomValues) == 0 { + if elementCount != 0 && len(m.CustomValues) == 0 && cap(m.CustomValues) < elementCount { m.CustomValues = make([]float64, 0, elementCount) } for iNdEx < postIndex { diff --git a/storage/remote/client_test.go b/storage/remote/client_test.go index c8b3d487e7..09d8369423 100644 --- a/storage/remote/client_test.go +++ b/storage/remote/client_test.go @@ -278,13 +278,13 @@ func TestReadClient(t *testing.T) { require.True(t, ok) cw := NewChunkedWriter(w, flusher) - l := []prompb.Label{ + l := []*prompb.Label{ {Name: "foo", Value: "bar"}, } chunks := buildTestChunks(t) for i, c := range chunks { - cSeries := prompb.ChunkedSeries{Labels: l, Chunks: []prompb.Chunk{c}} + cSeries := prompb.ChunkedSeries{Labels: l, Chunks: []*prompb.Chunk{c}} readResp := prompb.ChunkedReadResponse{ ChunkedSeries: []*prompb.ChunkedSeries{&cSeries}, QueryIndex: int64(i), @@ -407,24 +407,24 @@ func sampledResponseHTTPHandler(t *testing.T) http.HandlerFunc { { Timeseries: []*prompb.TimeSeries{ { - Labels: []prompb.Label{ + Labels: []*prompb.Label{ {Name: "foo2", Value: "bar"}, }, - Samples: []prompb.Sample{ + Samples: []*prompb.Sample{ {Value: float64(1), Timestamp: int64(0)}, {Value: float64(2), Timestamp: int64(5)}, }, - Exemplars: []prompb.Exemplar{}, + Exemplars: []*prompb.Exemplar{}, }, { - Labels: []prompb.Label{ + Labels: []*prompb.Label{ {Name: "foo1", Value: "bar"}, }, - Samples: []prompb.Sample{ + Samples: []*prompb.Sample{ {Value: float64(3), Timestamp: int64(0)}, {Value: float64(4), Timestamp: int64(5)}, }, - Exemplars: []prompb.Exemplar{}, + Exemplars: []*prompb.Exemplar{}, }, }, }, diff --git a/storage/remote/codec.go b/storage/remote/codec.go index 40bc5eb424..48ef05bea0 100644 --- a/storage/remote/codec.go +++ b/storage/remote/codec.go @@ -234,7 +234,7 @@ func StreamChunkedReadResponses( for ss.Next() { series := ss.At() iter = series.Iterator(iter) - lbls = MergeLabels(prompb.FromLabels(series.Labels(), lbls), sortedExternalLabels) + lbls = MergeLabels(prompb.FromLabels(series.Labels(), lbls), sortedExternalLabels) maxDataLength := maxBytesInFrame for _, lbl := range lbls { @@ -863,7 +863,7 @@ func DecodeWriteV2Request(r io.Reader) (*writev2.Request, error) { } var req writev2.Request - if err := proto.Unmarshal(reqBuf, &req); err != nil { + if err := req.UnmarshalVT(reqBuf); err != nil { return nil, err } diff --git a/storage/remote/codec_test.go b/storage/remote/codec_test.go index bec422f0ae..c5da12c6d8 100644 --- a/storage/remote/codec_test.go +++ b/storage/remote/codec_test.go @@ -579,7 +579,7 @@ func TestDecodeWriteRequest(t *testing.T) { actual, err := DecodeWriteRequest(bytes.NewReader(buf)) require.NoError(t, err) - require.Equal(t, writeRequestFixture, actual) + require.True(t, proto.Equal(writeRequestFixture, actual)) } func TestDecodeWriteV2Request(t *testing.T) { @@ -799,14 +799,14 @@ func TestChunkedSeriesIterator(t *testing.T) { }) t.Run("empty chunks", func(t *testing.T) { - emptyChunks := make([]prompb.Chunk, 0) + emptyChunks := make([]*prompb.Chunk, 0) it1 := newChunkedSeriesIterator(emptyChunks, 0, 1000) require.Equal(t, chunkenc.ValNone, it1.Next()) require.Equal(t, chunkenc.ValNone, it1.Seek(1000)) require.NoError(t, it1.Err()) - var nilChunks []prompb.Chunk + var nilChunks []*prompb.Chunk it2 := newChunkedSeriesIterator(nilChunks, 0, 1000) require.Equal(t, chunkenc.ValNone, it2.Next()) @@ -821,7 +821,7 @@ func TestChunkedSeries(t *testing.T) { s := chunkedSeries{ ChunkedSeries: prompb.ChunkedSeries{ - Labels: []prompb.Label{ + Labels: []*prompb.Label{ {Name: "foo", Value: "bar"}, {Name: "asdf", Value: "zxcv"}, }, @@ -852,12 +852,12 @@ func TestChunkedSeriesSet(t *testing.T) { r := NewChunkedReader(buf, config.DefaultChunkedReadLimit, nil) chks := buildTestChunks(t) - l := []prompb.Label{ + l := []*prompb.Label{ {Name: "foo", Value: "bar"}, } for i, c := range chks { - cSeries := prompb.ChunkedSeries{Labels: l, Chunks: []prompb.Chunk{c}} + cSeries := prompb.ChunkedSeries{Labels: l, Chunks: []*prompb.Chunk{c}} readResp := prompb.ChunkedReadResponse{ ChunkedSeries: []*prompb.ChunkedSeries{&cSeries}, QueryIndex: int64(i), @@ -905,12 +905,12 @@ func TestChunkedSeriesSet(t *testing.T) { r := NewChunkedReader(buf, config.DefaultChunkedReadLimit, nil) chks := buildTestChunks(t) - l := []prompb.Label{ + l := []*prompb.Label{ {Name: "foo", Value: "bar"}, } for i, c := range chks { - cSeries := prompb.ChunkedSeries{Labels: l, Chunks: []prompb.Chunk{c}} + cSeries := prompb.ChunkedSeries{Labels: l, Chunks: []*prompb.Chunk{c}} readResp := prompb.ChunkedReadResponse{ ChunkedSeries: []*prompb.ChunkedSeries{&cSeries}, QueryIndex: int64(i), @@ -945,9 +945,9 @@ const ( numSamplesPerTestChunk = 5 ) -func buildTestChunks(t *testing.T) []prompb.Chunk { +func buildTestChunks(t *testing.T) []*prompb.Chunk { startTime := int64(0) - chks := make([]prompb.Chunk, 0, numTestChunks) + chks := make([]*prompb.Chunk, 0, numTestChunks) time := startTime @@ -964,7 +964,7 @@ func buildTestChunks(t *testing.T) []prompb.Chunk { time += int64(1000) } - chks = append(chks, prompb.Chunk{ + chks = append(chks, &prompb.Chunk{ MinTimeMs: minTimeMs, MaxTimeMs: time, Type: prompb.Chunk_XOR, diff --git a/storage/remote/queue_manager.go b/storage/remote/queue_manager.go index ce19582acf..84f0756d56 100644 --- a/storage/remote/queue_manager.go +++ b/storage/remote/queue_manager.go @@ -2203,16 +2203,21 @@ func buildV2WriteRequest(logger log.Logger, samples []*writev2.TimeSeries, label level.Debug(logger).Log("msg", "dropped data due to their age", "droppedSamples", droppedSamples, "droppedExemplars", droppedExemplars, "droppedHistograms", droppedHistograms) } - req := &writev2.Request{ - Symbols: labels, - Timeseries: timeSeries, - } - + //req := &writev2.Request{ + // Symbols: labels, + // Timeseries: timeSeries, + //} + req := writev2.RequestFromVTPool() + req.Symbols = labels + req.Timeseries = timeSeries if pBuf == nil { pBuf = &[]byte{} // For convenience in tests. Not efficient. } data, err := req.MarshalVT() + req.Symbols = []string{} + req.Timeseries = []*writev2.TimeSeries{} + req.ReturnToVTPool() if err != nil { return nil, highest, lowest, err } diff --git a/storage/remote/write_handler.go b/storage/remote/write_handler.go index 81595b0abe..e4bbdf8594 100644 --- a/storage/remote/write_handler.go +++ b/storage/remote/write_handler.go @@ -190,15 +190,17 @@ func (h *writeHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { } // Remote Write 2.x proto message handling. - var req writev2.Request - if err := proto.Unmarshal(decompressed, &req); err != nil { + req := writev2.RequestFromVTPool() + // Timeseries as well + if err := proto.Unmarshal(decompressed, req); err != nil { // TODO(bwplotka): Add more context to responded error? level.Error(h.logger).Log("msg", "Error decoding v2 remote write request", "protobuf_message", msgType, "err", err.Error()) http.Error(w, err.Error(), http.StatusBadRequest) return } - respStats, errHTTPCode, err := h.writeV2(r.Context(), &req) + respStats, errHTTPCode, err := h.writeV2(r.Context(), req) + req.ReturnToVTPool() // Set required X-Prometheus-Remote-Write-Written-* response headers, in all cases. respStats.SetHeaders(w) diff --git a/storage/remote/write_handler_test.go b/storage/remote/write_handler_test.go index 3a9b066984..6a69cb5f7b 100644 --- a/storage/remote/write_handler_test.go +++ b/storage/remote/write_handler_test.go @@ -342,7 +342,7 @@ func TestRemoteWriteHandler_V2Message(t *testing.T) { desc: "Partial write; first series with duplicate labels", input: append( // Series with __name__="test_metric1",test_metric1="test_metric1",test_metric1="test_metric1" labels. - []writev2.TimeSeries{{LabelsRefs: []uint32{1, 2, 2, 2, 2, 2}, Samples: []writev2.Sample{{Value: 1, Timestamp: 1}}}}, + []*writev2.TimeSeries{{LabelsRefs: []uint32{1, 2, 2, 2, 2, 2}, Samples: []*writev2.Sample{{Value: 1, Timestamp: 1}}}}, writeV2RequestFixture.Timeseries...), expectedCode: http.StatusBadRequest, expectedRespBody: "invalid labels for series, labels {__name__=\"test_metric1\", test_metric1=\"test_metric1\", test_metric1=\"test_metric1\"}, duplicated label test_metric1\n",