mirror of
https://github.com/prometheus/prometheus.git
synced 2025-01-30 15:12:27 -08:00
manually optimize varint marshaling
Signed-off-by: Nicolás Pazos <npazosmendez@gmail.com>
This commit is contained in:
parent
0584610080
commit
5dbe3c1919
129
prompb/custom.go
129
prompb/custom.go
|
@ -14,6 +14,7 @@
|
||||||
package prompb
|
package prompb
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"slices"
|
||||||
"sync"
|
"sync"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -37,3 +38,131 @@ func (r *ChunkedReadResponse) PooledMarshal(p *sync.Pool) ([]byte, error) {
|
||||||
}
|
}
|
||||||
return r.Marshal()
|
return r.Marshal()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (m *MinimizedWriteRequest) OptimizedMarshal(dst []byte) ([]byte, error) {
|
||||||
|
siz := m.Size()
|
||||||
|
if cap(dst) < siz {
|
||||||
|
dst = make([]byte, siz)
|
||||||
|
}
|
||||||
|
dst = dst[:siz]
|
||||||
|
|
||||||
|
n, err := m.OptimizedMarshalToSizedBuffer(dst)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return (dst)[:n], nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// OptimizedMarshalToSizedBuffer is mostly a copy of the generated MarshalToSizedBuffer,
|
||||||
|
// but calls OptimizedMarshalToSizedBuffer on the timeseries.
|
||||||
|
func (m *MinimizedWriteRequest) OptimizedMarshalToSizedBuffer(dAtA []byte) (int, error) {
|
||||||
|
i := len(dAtA)
|
||||||
|
_ = i
|
||||||
|
var l int
|
||||||
|
_ = l
|
||||||
|
if m.XXX_unrecognized != nil {
|
||||||
|
i -= len(m.XXX_unrecognized)
|
||||||
|
copy(dAtA[i:], m.XXX_unrecognized)
|
||||||
|
}
|
||||||
|
if len(m.Symbols) > 0 {
|
||||||
|
i -= len(m.Symbols)
|
||||||
|
copy(dAtA[i:], m.Symbols)
|
||||||
|
i = encodeVarintRemote(dAtA, i, uint64(len(m.Symbols)))
|
||||||
|
i--
|
||||||
|
dAtA[i] = 0x22
|
||||||
|
}
|
||||||
|
if len(m.Timeseries) > 0 {
|
||||||
|
for iNdEx := len(m.Timeseries) - 1; iNdEx >= 0; iNdEx-- {
|
||||||
|
{
|
||||||
|
size, err := m.Timeseries[iNdEx].OptimizedMarshalToSizedBuffer(dAtA[:i])
|
||||||
|
if err != nil {
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
i -= size
|
||||||
|
i = encodeVarintRemote(dAtA, i, uint64(size))
|
||||||
|
}
|
||||||
|
i--
|
||||||
|
dAtA[i] = 0xa
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return len(dAtA) - i, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// OptimizedMarshalToSizedBuffer is mostly a copy of the generated MarshalToSizedBuffer,
|
||||||
|
// but marshals m.LabelSymbols in place without extra allocations.
|
||||||
|
func (m *MinimizedTimeSeries) OptimizedMarshalToSizedBuffer(dAtA []byte) (int, error) {
|
||||||
|
i := len(dAtA)
|
||||||
|
_ = i
|
||||||
|
var l int
|
||||||
|
_ = l
|
||||||
|
if m.XXX_unrecognized != nil {
|
||||||
|
i -= len(m.XXX_unrecognized)
|
||||||
|
copy(dAtA[i:], m.XXX_unrecognized)
|
||||||
|
}
|
||||||
|
if len(m.Histograms) > 0 {
|
||||||
|
for iNdEx := len(m.Histograms) - 1; iNdEx >= 0; iNdEx-- {
|
||||||
|
{
|
||||||
|
size, err := m.Histograms[iNdEx].MarshalToSizedBuffer(dAtA[:i])
|
||||||
|
if err != nil {
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
i -= size
|
||||||
|
i = encodeVarintTypes(dAtA, i, uint64(size))
|
||||||
|
}
|
||||||
|
i--
|
||||||
|
dAtA[i] = 0x22
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if len(m.Exemplars) > 0 {
|
||||||
|
for iNdEx := len(m.Exemplars) - 1; iNdEx >= 0; iNdEx-- {
|
||||||
|
{
|
||||||
|
size, err := m.Exemplars[iNdEx].MarshalToSizedBuffer(dAtA[:i])
|
||||||
|
if err != nil {
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
i -= size
|
||||||
|
i = encodeVarintTypes(dAtA, i, uint64(size))
|
||||||
|
}
|
||||||
|
i--
|
||||||
|
dAtA[i] = 0x1a
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if len(m.Samples) > 0 {
|
||||||
|
for iNdEx := len(m.Samples) - 1; iNdEx >= 0; iNdEx-- {
|
||||||
|
{
|
||||||
|
size, err := m.Samples[iNdEx].MarshalToSizedBuffer(dAtA[:i])
|
||||||
|
if err != nil {
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
i -= size
|
||||||
|
i = encodeVarintTypes(dAtA, i, uint64(size))
|
||||||
|
}
|
||||||
|
i--
|
||||||
|
dAtA[i] = 0x12
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if len(m.LabelSymbols) > 0 {
|
||||||
|
// This is the trick: encode the varints in reverse order to make it easier
|
||||||
|
// to do it in place. Then reverse the whole thing.
|
||||||
|
var j10 int
|
||||||
|
start := i
|
||||||
|
for _, num := range m.LabelSymbols {
|
||||||
|
for num >= 1<<7 {
|
||||||
|
dAtA[i-1] = uint8(uint64(num)&0x7f | 0x80)
|
||||||
|
num >>= 7
|
||||||
|
i--
|
||||||
|
j10++
|
||||||
|
}
|
||||||
|
dAtA[i-1] = uint8(num)
|
||||||
|
i--
|
||||||
|
j10++
|
||||||
|
}
|
||||||
|
slices.Reverse(dAtA[i:start])
|
||||||
|
// --- end of trick
|
||||||
|
|
||||||
|
i = encodeVarintTypes(dAtA, i, uint64(j10))
|
||||||
|
i--
|
||||||
|
dAtA[i] = 0xa
|
||||||
|
}
|
||||||
|
return len(dAtA) - i, nil
|
||||||
|
}
|
||||||
|
|
77
prompb/custom_test.go
Normal file
77
prompb/custom_test.go
Normal file
|
@ -0,0 +1,77 @@
|
||||||
|
package prompb
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestOptimizedMarshal(t *testing.T) {
|
||||||
|
var got []byte
|
||||||
|
|
||||||
|
tests := []struct {
|
||||||
|
name string
|
||||||
|
m *MinimizedWriteRequest
|
||||||
|
}{
|
||||||
|
// {
|
||||||
|
// name: "empty",
|
||||||
|
// m: &MinimizedWriteRequest{},
|
||||||
|
// },
|
||||||
|
{
|
||||||
|
name: "simple",
|
||||||
|
m: &MinimizedWriteRequest{
|
||||||
|
Timeseries: []MinimizedTimeSeries{
|
||||||
|
{
|
||||||
|
LabelSymbols: []uint32{
|
||||||
|
0, 10,
|
||||||
|
10, 3,
|
||||||
|
13, 3,
|
||||||
|
16, 6,
|
||||||
|
22, 3,
|
||||||
|
25, 5,
|
||||||
|
30, 3,
|
||||||
|
33, 7,
|
||||||
|
},
|
||||||
|
Samples: []Sample{{Value: 1, Timestamp: 0}},
|
||||||
|
Exemplars: []Exemplar{{Labels: []Label{{Name: "f", Value: "g"}}, Value: 1, Timestamp: 0}},
|
||||||
|
Histograms: nil,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
LabelSymbols: []uint32{
|
||||||
|
0, 10,
|
||||||
|
10, 3,
|
||||||
|
13, 3,
|
||||||
|
16, 6,
|
||||||
|
22, 3,
|
||||||
|
25, 5,
|
||||||
|
30, 3,
|
||||||
|
33, 7,
|
||||||
|
},
|
||||||
|
Samples: []Sample{{Value: 2, Timestamp: 1}},
|
||||||
|
Exemplars: []Exemplar{{Labels: []Label{{Name: "h", Value: "i"}}, Value: 2, Timestamp: 1}},
|
||||||
|
Histograms: nil,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
// 40 chars
|
||||||
|
Symbols: "abcdefghijabcdefghijabcdefghijabcdefghij",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, tt := range tests {
|
||||||
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
|
got = got[:0]
|
||||||
|
// should be the same as the standard marshal
|
||||||
|
expected, err := tt.m.Marshal()
|
||||||
|
assert.NoError(t, err)
|
||||||
|
got, err = tt.m.OptimizedMarshal(got)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
assert.Equal(t, expected, got)
|
||||||
|
|
||||||
|
// round trip
|
||||||
|
m := &MinimizedWriteRequest{}
|
||||||
|
assert.NoError(t, m.Unmarshal(got))
|
||||||
|
assert.Equal(t, tt.m, m)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
|
@ -1359,8 +1359,9 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue *queue) {
|
||||||
var (
|
var (
|
||||||
max = s.qm.cfg.MaxSamplesPerSend
|
max = s.qm.cfg.MaxSamplesPerSend
|
||||||
|
|
||||||
pBuf = proto.NewBuffer(nil)
|
pBuf = proto.NewBuffer(nil)
|
||||||
buf []byte
|
pBufRaw []byte
|
||||||
|
buf []byte
|
||||||
)
|
)
|
||||||
if s.qm.sendExemplars {
|
if s.qm.sendExemplars {
|
||||||
max += int(float64(max) * 0.1)
|
max += int(float64(max) * 0.1)
|
||||||
|
@ -1426,7 +1427,7 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue *queue) {
|
||||||
nPendingSamples, nPendingExemplars, nPendingHistograms := populateMinimizedTimeSeries(&symbolTable, batch, pendingMinimizedData, s.qm.sendExemplars, s.qm.sendNativeHistograms)
|
nPendingSamples, nPendingExemplars, nPendingHistograms := populateMinimizedTimeSeries(&symbolTable, batch, pendingMinimizedData, s.qm.sendExemplars, s.qm.sendNativeHistograms)
|
||||||
|
|
||||||
n := nPendingSamples + nPendingExemplars + nPendingHistograms
|
n := nPendingSamples + nPendingExemplars + nPendingHistograms
|
||||||
s.sendMinSamples(ctx, pendingMinimizedData[:n], symbolTable.LabelsString(), nPendingSamples, nPendingExemplars, nPendingHistograms, pBuf, &buf)
|
s.sendMinSamples(ctx, pendingMinimizedData[:n], symbolTable.LabelsString(), nPendingSamples, nPendingExemplars, nPendingHistograms, &pBufRaw, &buf)
|
||||||
symbolTable.clear()
|
symbolTable.clear()
|
||||||
} else if s.qm.internFormat && !s.qm.secondInternFormat {
|
} else if s.qm.internFormat && !s.qm.secondInternFormat {
|
||||||
// the new internFormat feature flag is set
|
// the new internFormat feature flag is set
|
||||||
|
@ -1534,7 +1535,7 @@ func (s *shards) sendReducedSamples(ctx context.Context, samples []prompb.Reduce
|
||||||
s.updateMetrics(ctx, err, sampleCount, exemplarCount, histogramCount, time.Since(begin))
|
s.updateMetrics(ctx, err, sampleCount, exemplarCount, histogramCount, time.Since(begin))
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *shards) sendMinSamples(ctx context.Context, samples []prompb.MinimizedTimeSeries, labels string, sampleCount, exemplarCount, histogramCount int, pBuf *proto.Buffer, buf *[]byte) {
|
func (s *shards) sendMinSamples(ctx context.Context, samples []prompb.MinimizedTimeSeries, labels string, sampleCount, exemplarCount, histogramCount int, pBuf *[]byte, buf *[]byte) {
|
||||||
begin := time.Now()
|
begin := time.Now()
|
||||||
// Build the ReducedWriteRequest with no metadata.
|
// Build the ReducedWriteRequest with no metadata.
|
||||||
// Failing to build the write request is non-recoverable, since it will
|
// Failing to build the write request is non-recoverable, since it will
|
||||||
|
@ -1907,7 +1908,7 @@ func (r *rwSymbolTable) clear() {
|
||||||
r.symbols.Reset()
|
r.symbols.Reset()
|
||||||
}
|
}
|
||||||
|
|
||||||
func buildMinimizedWriteRequest(samples []prompb.MinimizedTimeSeries, labels string, pBuf *proto.Buffer, buf *[]byte) ([]byte, int64, error) {
|
func buildMinimizedWriteRequest(samples []prompb.MinimizedTimeSeries, labels string, pBuf *[]byte, buf *[]byte) ([]byte, int64, error) {
|
||||||
var highest int64
|
var highest int64
|
||||||
for _, ts := range samples {
|
for _, ts := range samples {
|
||||||
// At the moment we only ever append a TimeSeries with a single sample or exemplar in it.
|
// At the moment we only ever append a TimeSeries with a single sample or exemplar in it.
|
||||||
|
@ -1928,14 +1929,13 @@ func buildMinimizedWriteRequest(samples []prompb.MinimizedTimeSeries, labels str
|
||||||
}
|
}
|
||||||
|
|
||||||
if pBuf == nil {
|
if pBuf == nil {
|
||||||
pBuf = proto.NewBuffer(nil) // For convenience in tests. Not efficient.
|
pBuf = &[]byte{} // For convenience in tests. Not efficient.
|
||||||
} else {
|
|
||||||
pBuf.Reset()
|
|
||||||
}
|
}
|
||||||
err := pBuf.Marshal(req)
|
data, err := req.OptimizedMarshal(*pBuf)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, 0, err
|
return nil, 0, err
|
||||||
}
|
}
|
||||||
|
*pBuf = data
|
||||||
|
|
||||||
// snappy uses len() to see if it needs to allocate a new slice. Make the
|
// snappy uses len() to see if it needs to allocate a new slice. Make the
|
||||||
// buffer as long as possible.
|
// buffer as long as possible.
|
||||||
|
@ -1945,8 +1945,8 @@ func buildMinimizedWriteRequest(samples []prompb.MinimizedTimeSeries, labels str
|
||||||
buf = &[]byte{}
|
buf = &[]byte{}
|
||||||
}
|
}
|
||||||
|
|
||||||
compressed := snappy.Encode(*buf, pBuf.Bytes())
|
compressed := snappy.Encode(*buf, data)
|
||||||
if n := snappy.MaxEncodedLen(len(pBuf.Bytes())); buf != nil && n > len(*buf) {
|
if n := snappy.MaxEncodedLen(len(data)); buf != nil && n > len(*buf) {
|
||||||
// grow the buffer for the next time
|
// grow the buffer for the next time
|
||||||
*buf = make([]byte, n)
|
*buf = make([]byte, n)
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue