manually optimize varint marshaling

Signed-off-by: Nicolás Pazos <npazosmendez@gmail.com>
This commit is contained in:
Nicolás Pazos 2023-11-09 13:00:16 -03:00
parent 761efc860e
commit c9b6dddef9
3 changed files with 217 additions and 11 deletions

View file

@ -14,6 +14,7 @@
package prompb
import (
"slices"
"sync"
)
@ -37,3 +38,131 @@ func (r *ChunkedReadResponse) PooledMarshal(p *sync.Pool) ([]byte, error) {
}
return r.Marshal()
}
func (m *MinimizedWriteRequest) OptimizedMarshal(dst []byte) ([]byte, error) {
siz := m.Size()
if cap(dst) < siz {
dst = make([]byte, siz)
}
dst = dst[:siz]
n, err := m.OptimizedMarshalToSizedBuffer(dst)
if err != nil {
return nil, err
}
return (dst)[:n], nil
}
// OptimizedMarshalToSizedBuffer is mostly a copy of the generated MarshalToSizedBuffer,
// but calls OptimizedMarshalToSizedBuffer on the timeseries.
func (m *MinimizedWriteRequest) OptimizedMarshalToSizedBuffer(dAtA []byte) (int, error) {
i := len(dAtA)
_ = i
var l int
_ = l
if m.XXX_unrecognized != nil {
i -= len(m.XXX_unrecognized)
copy(dAtA[i:], m.XXX_unrecognized)
}
if len(m.Symbols) > 0 {
i -= len(m.Symbols)
copy(dAtA[i:], m.Symbols)
i = encodeVarintRemote(dAtA, i, uint64(len(m.Symbols)))
i--
dAtA[i] = 0x22
}
if len(m.Timeseries) > 0 {
for iNdEx := len(m.Timeseries) - 1; iNdEx >= 0; iNdEx-- {
{
size, err := m.Timeseries[iNdEx].OptimizedMarshalToSizedBuffer(dAtA[:i])
if err != nil {
return 0, err
}
i -= size
i = encodeVarintRemote(dAtA, i, uint64(size))
}
i--
dAtA[i] = 0xa
}
}
return len(dAtA) - i, nil
}
// OptimizedMarshalToSizedBuffer is mostly a copy of the generated MarshalToSizedBuffer,
// but marshals m.LabelSymbols in place without extra allocations.
func (m *MinimizedTimeSeries) OptimizedMarshalToSizedBuffer(dAtA []byte) (int, error) {
i := len(dAtA)
_ = i
var l int
_ = l
if m.XXX_unrecognized != nil {
i -= len(m.XXX_unrecognized)
copy(dAtA[i:], m.XXX_unrecognized)
}
if len(m.Histograms) > 0 {
for iNdEx := len(m.Histograms) - 1; iNdEx >= 0; iNdEx-- {
{
size, err := m.Histograms[iNdEx].MarshalToSizedBuffer(dAtA[:i])
if err != nil {
return 0, err
}
i -= size
i = encodeVarintTypes(dAtA, i, uint64(size))
}
i--
dAtA[i] = 0x22
}
}
if len(m.Exemplars) > 0 {
for iNdEx := len(m.Exemplars) - 1; iNdEx >= 0; iNdEx-- {
{
size, err := m.Exemplars[iNdEx].MarshalToSizedBuffer(dAtA[:i])
if err != nil {
return 0, err
}
i -= size
i = encodeVarintTypes(dAtA, i, uint64(size))
}
i--
dAtA[i] = 0x1a
}
}
if len(m.Samples) > 0 {
for iNdEx := len(m.Samples) - 1; iNdEx >= 0; iNdEx-- {
{
size, err := m.Samples[iNdEx].MarshalToSizedBuffer(dAtA[:i])
if err != nil {
return 0, err
}
i -= size
i = encodeVarintTypes(dAtA, i, uint64(size))
}
i--
dAtA[i] = 0x12
}
}
if len(m.LabelSymbols) > 0 {
// This is the trick: encode the varints in reverse order to make it easier
// to do it in place. Then reverse the whole thing.
var j10 int
start := i
for _, num := range m.LabelSymbols {
for num >= 1<<7 {
dAtA[i-1] = uint8(uint64(num)&0x7f | 0x80)
num >>= 7
i--
j10++
}
dAtA[i-1] = uint8(num)
i--
j10++
}
slices.Reverse(dAtA[i:start])
// --- end of trick
i = encodeVarintTypes(dAtA, i, uint64(j10))
i--
dAtA[i] = 0xa
}
return len(dAtA) - i, nil
}

77
prompb/custom_test.go Normal file
View file

@ -0,0 +1,77 @@
package prompb
import (
"testing"
"github.com/stretchr/testify/assert"
)
func TestOptimizedMarshal(t *testing.T) {
var got []byte
tests := []struct {
name string
m *MinimizedWriteRequest
}{
// {
// name: "empty",
// m: &MinimizedWriteRequest{},
// },
{
name: "simple",
m: &MinimizedWriteRequest{
Timeseries: []MinimizedTimeSeries{
{
LabelSymbols: []uint32{
0, 10,
10, 3,
13, 3,
16, 6,
22, 3,
25, 5,
30, 3,
33, 7,
},
Samples: []Sample{{Value: 1, Timestamp: 0}},
Exemplars: []Exemplar{{Labels: []Label{{Name: "f", Value: "g"}}, Value: 1, Timestamp: 0}},
Histograms: nil,
},
{
LabelSymbols: []uint32{
0, 10,
10, 3,
13, 3,
16, 6,
22, 3,
25, 5,
30, 3,
33, 7,
},
Samples: []Sample{{Value: 2, Timestamp: 1}},
Exemplars: []Exemplar{{Labels: []Label{{Name: "h", Value: "i"}}, Value: 2, Timestamp: 1}},
Histograms: nil,
},
},
// 40 chars
Symbols: "abcdefghijabcdefghijabcdefghijabcdefghij",
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got = got[:0]
// should be the same as the standard marshal
expected, err := tt.m.Marshal()
assert.NoError(t, err)
got, err = tt.m.OptimizedMarshal(got)
assert.NoError(t, err)
assert.Equal(t, expected, got)
// round trip
m := &MinimizedWriteRequest{}
assert.NoError(t, m.Unmarshal(got))
assert.Equal(t, tt.m, m)
})
}
}

View file

@ -1359,8 +1359,9 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue *queue) {
var (
max = s.qm.cfg.MaxSamplesPerSend
pBuf = proto.NewBuffer(nil)
buf []byte
pBuf = proto.NewBuffer(nil)
pBufRaw []byte
buf []byte
)
if s.qm.sendExemplars {
max += int(float64(max) * 0.1)
@ -1426,7 +1427,7 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue *queue) {
nPendingSamples, nPendingExemplars, nPendingHistograms := populateMinimizedTimeSeries(&symbolTable, batch, pendingMinimizedData, s.qm.sendExemplars, s.qm.sendNativeHistograms)
n := nPendingSamples + nPendingExemplars + nPendingHistograms
s.sendMinSamples(ctx, pendingMinimizedData[:n], symbolTable.LabelsString(), nPendingSamples, nPendingExemplars, nPendingHistograms, pBuf, &buf)
s.sendMinSamples(ctx, pendingMinimizedData[:n], symbolTable.LabelsString(), nPendingSamples, nPendingExemplars, nPendingHistograms, &pBufRaw, &buf)
symbolTable.clear()
} else if s.qm.internFormat && !s.qm.secondInternFormat {
// the new internFormat feature flag is be set
@ -1534,7 +1535,7 @@ func (s *shards) sendReducedSamples(ctx context.Context, samples []prompb.Reduce
s.updateMetrics(ctx, err, sampleCount, exemplarCount, histogramCount, time.Since(begin))
}
func (s *shards) sendMinSamples(ctx context.Context, samples []prompb.MinimizedTimeSeries, labels string, sampleCount, exemplarCount, histogramCount int, pBuf *proto.Buffer, buf *[]byte) {
func (s *shards) sendMinSamples(ctx context.Context, samples []prompb.MinimizedTimeSeries, labels string, sampleCount, exemplarCount, histogramCount int, pBuf *[]byte, buf *[]byte) {
begin := time.Now()
// Build the ReducedWriteRequest with no metadata.
// Failing to build the write request is non-recoverable, since it will
@ -1907,7 +1908,7 @@ func (r *rwSymbolTable) clear() {
r.symbols.Reset()
}
func buildMinimizedWriteRequest(samples []prompb.MinimizedTimeSeries, labels string, pBuf *proto.Buffer, buf *[]byte) ([]byte, int64, error) {
func buildMinimizedWriteRequest(samples []prompb.MinimizedTimeSeries, labels string, pBuf *[]byte, buf *[]byte) ([]byte, int64, error) {
var highest int64
for _, ts := range samples {
// At the moment we only ever append a TimeSeries with a single sample or exemplar in it.
@ -1928,14 +1929,13 @@ func buildMinimizedWriteRequest(samples []prompb.MinimizedTimeSeries, labels str
}
if pBuf == nil {
pBuf = proto.NewBuffer(nil) // For convenience in tests. Not efficient.
} else {
pBuf.Reset()
pBuf = &[]byte{} // For convenience in tests. Not efficient.
}
err := pBuf.Marshal(req)
data, err := req.OptimizedMarshal(*pBuf)
if err != nil {
return nil, 0, err
}
*pBuf = data
// snappy uses len() to see if it needs to allocate a new slice. Make the
// buffer as long as possible.
@ -1945,8 +1945,8 @@ func buildMinimizedWriteRequest(samples []prompb.MinimizedTimeSeries, labels str
buf = &[]byte{}
}
compressed := snappy.Encode(*buf, pBuf.Bytes())
if n := snappy.MaxEncodedLen(len(pBuf.Bytes())); buf != nil && n > len(*buf) {
compressed := snappy.Encode(*buf, data)
if n := snappy.MaxEncodedLen(len(data)); buf != nil && n > len(*buf) {
// grow the buffer for the next time
*buf = make([]byte, n)
}