From 5dbe3c191961b228c3eae08a5d827271d590c745 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Nicol=C3=A1s=20Pazos?= Date: Thu, 9 Nov 2023 13:00:16 -0300 Subject: [PATCH] manually optimize varint marshaling MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Nicolás Pazos --- prompb/custom.go | 129 ++++++++++++++++++++++++++++++++ prompb/custom_test.go | 77 +++++++++++++++++++ storage/remote/queue_manager.go | 22 +++--- 3 files changed, 217 insertions(+), 11 deletions(-) create mode 100644 prompb/custom_test.go diff --git a/prompb/custom.go b/prompb/custom.go index 13d6e0f0c..7f97b2734 100644 --- a/prompb/custom.go +++ b/prompb/custom.go @@ -14,6 +14,7 @@ package prompb import ( + "slices" "sync" ) @@ -37,3 +38,131 @@ func (r *ChunkedReadResponse) PooledMarshal(p *sync.Pool) ([]byte, error) { } return r.Marshal() } + +func (m *MinimizedWriteRequest) OptimizedMarshal(dst []byte) ([]byte, error) { + siz := m.Size() + if cap(dst) < siz { + dst = make([]byte, siz) + } + dst = dst[:siz] + + n, err := m.OptimizedMarshalToSizedBuffer(dst) + if err != nil { + return nil, err + } + return (dst)[:n], nil +} + +// OptimizedMarshalToSizedBuffer is mostly a copy of the generated MarshalToSizedBuffer: it fills dAtA from the end towards the front and returns the number of bytes written, +// but it calls OptimizedMarshalToSizedBuffer on every entry of m.Timeseries. 
+func (m *MinimizedWriteRequest) OptimizedMarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if m.XXX_unrecognized != nil { + i -= len(m.XXX_unrecognized) + copy(dAtA[i:], m.XXX_unrecognized) + } + if len(m.Symbols) > 0 { + i -= len(m.Symbols) + copy(dAtA[i:], m.Symbols) + i = encodeVarintRemote(dAtA, i, uint64(len(m.Symbols))) + i-- + dAtA[i] = 0x22 + } + if len(m.Timeseries) > 0 { + for iNdEx := len(m.Timeseries) - 1; iNdEx >= 0; iNdEx-- { + { + size, err := m.Timeseries[iNdEx].OptimizedMarshalToSizedBuffer(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = encodeVarintRemote(dAtA, i, uint64(size)) + } + i-- + dAtA[i] = 0xa + } + } + return len(dAtA) - i, nil +} + +// OptimizedMarshalToSizedBuffer is mostly a copy of the generated MarshalToSizedBuffer: it fills dAtA from the end towards the front and returns the number of bytes written, +// but it encodes the m.LabelSymbols varints directly into dAtA, without allocating a scratch buffer. +func (m *MinimizedTimeSeries) OptimizedMarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if m.XXX_unrecognized != nil { + i -= len(m.XXX_unrecognized) + copy(dAtA[i:], m.XXX_unrecognized) + } + if len(m.Histograms) > 0 { + for iNdEx := len(m.Histograms) - 1; iNdEx >= 0; iNdEx-- { + { + size, err := m.Histograms[iNdEx].MarshalToSizedBuffer(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = encodeVarintTypes(dAtA, i, uint64(size)) + } + i-- + dAtA[i] = 0x22 + } + } + if len(m.Exemplars) > 0 { + for iNdEx := len(m.Exemplars) - 1; iNdEx >= 0; iNdEx-- { + { + size, err := m.Exemplars[iNdEx].MarshalToSizedBuffer(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = encodeVarintTypes(dAtA, i, uint64(size)) + } + i-- + dAtA[i] = 0x1a + } + } + if len(m.Samples) > 0 { + for iNdEx := len(m.Samples) - 1; iNdEx >= 0; iNdEx-- { + { + size, err := m.Samples[iNdEx].MarshalToSizedBuffer(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = encodeVarintTypes(dAtA, i, uint64(size)) + } + i-- + dAtA[i] = 0x12 + } + } + if 
len(m.LabelSymbols) > 0 { + // This is the trick: dAtA is filled back to front, so write the bytes of every varint in reverse order + // straight into dAtA[i:start], then reverse that whole span once to restore the forward encoding. + var j10 int + start := i + for _, num := range m.LabelSymbols { + for num >= 1<<7 { + dAtA[i-1] = uint8(uint64(num)&0x7f | 0x80) + num >>= 7 + i-- + j10++ + } + dAtA[i-1] = uint8(num) + i-- + j10++ + } + slices.Reverse(dAtA[i:start]) + // --- end of trick + + i = encodeVarintTypes(dAtA, i, uint64(j10)) + i-- + dAtA[i] = 0xa + } + return len(dAtA) - i, nil +} diff --git a/prompb/custom_test.go b/prompb/custom_test.go new file mode 100644 index 000000000..b12f49a33 --- /dev/null +++ b/prompb/custom_test.go @@ -0,0 +1,77 @@ +package prompb + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestOptimizedMarshal(t *testing.T) { + var got []byte + + tests := []struct { + name string + m *MinimizedWriteRequest + }{ + // { + // name: "empty", + // m: &MinimizedWriteRequest{}, + // }, + { + name: "simple", + m: &MinimizedWriteRequest{ + Timeseries: []MinimizedTimeSeries{ + { + LabelSymbols: []uint32{ + 0, 10, + 10, 3, + 13, 3, + 16, 6, + 22, 3, + 25, 5, + 30, 3, + 33, 7, + }, + Samples: []Sample{{Value: 1, Timestamp: 0}}, + Exemplars: []Exemplar{{Labels: []Label{{Name: "f", Value: "g"}}, Value: 1, Timestamp: 0}}, + Histograms: nil, + }, + { + LabelSymbols: []uint32{ + 0, 10, + 10, 3, + 13, 3, + 16, 6, + 22, 3, + 25, 5, + 30, 3, + 33, 7, + }, + Samples: []Sample{{Value: 2, Timestamp: 1}}, + Exemplars: []Exemplar{{Labels: []Label{{Name: "h", Value: "i"}}, Value: 2, Timestamp: 1}}, + Histograms: nil, + }, + }, + // 40 chars + Symbols: "abcdefghijabcdefghijabcdefghijabcdefghij", + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got = got[:0] + // should be the same as the standard marshal + expected, err := tt.m.Marshal() + assert.NoError(t, err) + got, err = tt.m.OptimizedMarshal(got) + assert.NoError(t, err) + assert.Equal(t, expected, got) + + // round trip + 
m := &MinimizedWriteRequest{} + assert.NoError(t, m.Unmarshal(got)) + assert.Equal(t, tt.m, m) + }) + } +} diff --git a/storage/remote/queue_manager.go b/storage/remote/queue_manager.go index 35f311c1d..fd732184f 100644 --- a/storage/remote/queue_manager.go +++ b/storage/remote/queue_manager.go @@ -1359,8 +1359,9 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue *queue) { var ( max = s.qm.cfg.MaxSamplesPerSend - pBuf = proto.NewBuffer(nil) - buf []byte + pBuf = proto.NewBuffer(nil) + pBufRaw []byte + buf []byte ) if s.qm.sendExemplars { max += int(float64(max) * 0.1) @@ -1426,7 +1427,7 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue *queue) { nPendingSamples, nPendingExemplars, nPendingHistograms := populateMinimizedTimeSeries(&symbolTable, batch, pendingMinimizedData, s.qm.sendExemplars, s.qm.sendNativeHistograms) n := nPendingSamples + nPendingExemplars + nPendingHistograms - s.sendMinSamples(ctx, pendingMinimizedData[:n], symbolTable.LabelsString(), nPendingSamples, nPendingExemplars, nPendingHistograms, pBuf, &buf) + s.sendMinSamples(ctx, pendingMinimizedData[:n], symbolTable.LabelsString(), nPendingSamples, nPendingExemplars, nPendingHistograms, &pBufRaw, &buf) symbolTable.clear() } else if s.qm.internFormat && !s.qm.secondInternFormat { // the new internFormat feature flag is be set @@ -1534,7 +1535,7 @@ func (s *shards) sendReducedSamples(ctx context.Context, samples []prompb.Reduce s.updateMetrics(ctx, err, sampleCount, exemplarCount, histogramCount, time.Since(begin)) } -func (s *shards) sendMinSamples(ctx context.Context, samples []prompb.MinimizedTimeSeries, labels string, sampleCount, exemplarCount, histogramCount int, pBuf *proto.Buffer, buf *[]byte) { +func (s *shards) sendMinSamples(ctx context.Context, samples []prompb.MinimizedTimeSeries, labels string, sampleCount, exemplarCount, histogramCount int, pBuf *[]byte, buf *[]byte) { begin := time.Now() // Build the ReducedWriteRequest with no metadata. 
// Failing to build the write request is non-recoverable, since it will @@ -1907,7 +1908,7 @@ func (r *rwSymbolTable) clear() { r.symbols.Reset() } -func buildMinimizedWriteRequest(samples []prompb.MinimizedTimeSeries, labels string, pBuf *proto.Buffer, buf *[]byte) ([]byte, int64, error) { +func buildMinimizedWriteRequest(samples []prompb.MinimizedTimeSeries, labels string, pBuf *[]byte, buf *[]byte) ([]byte, int64, error) { var highest int64 for _, ts := range samples { // At the moment we only ever append a TimeSeries with a single sample or exemplar in it. @@ -1928,14 +1929,13 @@ func buildMinimizedWriteRequest(samples []prompb.MinimizedTimeSeries, labels str } if pBuf == nil { - pBuf = proto.NewBuffer(nil) // For convenience in tests. Not efficient. - } else { - pBuf.Reset() + pBuf = &[]byte{} // For convenience in tests. Not efficient. } - err := pBuf.Marshal(req) + data, err := req.OptimizedMarshal(*pBuf) if err != nil { return nil, 0, err } + *pBuf = data // snappy uses len() to see if it needs to allocate a new slice. Make the // buffer as long as possible. @@ -1945,8 +1945,8 @@ func buildMinimizedWriteRequest(samples []prompb.MinimizedTimeSeries, labels str buf = &[]byte{} } - compressed := snappy.Encode(*buf, pBuf.Bytes()) - if n := snappy.MaxEncodedLen(len(pBuf.Bytes())); buf != nil && n > len(*buf) { + compressed := snappy.Encode(*buf, data) + if n := snappy.MaxEncodedLen(len(data)); buf != nil && n > len(*buf) { // grow the buffer for the next time *buf = make([]byte, n) }