Remote-write: reuse memory for marshalling (#9412)

By holding a `proto.Buffer` per shard and passing it down to where
marshalling is done, we avoid creating a lot of garbage.

Signed-off-by: Bryan Boreham <bjboreham@gmail.com>
Bryan Boreham 2021-10-29 22:44:40 +01:00 committed by GitHub
parent bc72a718c4
commit 5afa606ecb
3 changed files with 27 additions and 20 deletions
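
The pattern in a minimal, standalone sketch (illustrative only, not code from this commit): a long-lived gogo/protobuf `proto.Buffer` is Reset() and reused for every marshal, instead of letting `proto.Marshal` allocate a fresh byte slice on each send. The helper name `marshalWithReuse` and the sample payload below are assumptions made for the example.

package main

import (
	"fmt"

	"github.com/gogo/protobuf/proto"
	"github.com/prometheus/prometheus/prompb"
)

// marshalWithReuse marshals req into pBuf, reusing pBuf's backing storage
// across calls. A nil pBuf falls back to a one-off buffer, which is
// convenient for tests but allocates on every call. (Hypothetical helper,
// not the queue manager's actual code.)
func marshalWithReuse(req *prompb.WriteRequest, pBuf *proto.Buffer) ([]byte, error) {
	if pBuf == nil {
		pBuf = proto.NewBuffer(nil)
	} else {
		pBuf.Reset() // keep the allocation, drop the previous contents
	}
	if err := pBuf.Marshal(req); err != nil {
		return nil, err
	}
	return pBuf.Bytes(), nil
}

func main() {
	// One buffer held for the lifetime of a shard; every send reuses it.
	pBuf := proto.NewBuffer(nil)
	for i := 0; i < 3; i++ {
		req := &prompb.WriteRequest{
			Timeseries: []prompb.TimeSeries{{
				Labels:  []prompb.Label{{Name: "__name__", Value: "test_metric"}},
				Samples: []prompb.Sample{{Value: float64(i), Timestamp: int64(i)}},
			}},
		}
		data, err := marshalWithReuse(req, pBuf)
		if err != nil {
			panic(err)
		}
		fmt.Println("encoded", len(data), "bytes")
	}
}

Because the returned slice aliases the buffer's internal storage, each call's output has to be consumed (in the queue manager it is immediately snappy-compressed into a separate reused slice) before the next Reset.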


@@ -292,7 +292,7 @@ func TestMetricTypeToMetricTypeProto(t *testing.T) {
 }

 func TestDecodeWriteRequest(t *testing.T) {
-	buf, _, err := buildWriteRequest(writeRequestFixture.Timeseries, nil, nil)
+	buf, _, err := buildWriteRequest(writeRequestFixture.Timeseries, nil, nil, nil)
 	require.NoError(t, err)

 	actual, err := DecodeWriteRequest(bytes.NewReader(buf))


@@ -433,7 +433,7 @@ func NewQueueManager(
 	return t
 }

-// AppendMetadata sends metadata the remote storage. Metadata is sent all at once and is not parallelized.
+// AppendMetadata sends metadata the remote storage. Metadata is sent in batches, but is not parallelized.
 func (t *QueueManager) AppendMetadata(ctx context.Context, metadata []scrape.MetricMetadata) {
 	mm := make([]prompb.MetricMetadata, 0, len(metadata))
 	for _, entry := range metadata {
@@ -445,13 +445,14 @@ func (t *QueueManager) AppendMetadata(ctx context.Context, metadata []scrape.Met
 		})
 	}

+	pBuf := proto.NewBuffer(nil)
 	numSends := int(math.Ceil(float64(len(metadata)) / float64(t.mcfg.MaxSamplesPerSend)))
 	for i := 0; i < numSends; i++ {
 		last := (i + 1) * t.mcfg.MaxSamplesPerSend
 		if last > len(metadata) {
 			last = len(metadata)
 		}
-		err := t.sendMetadataWithBackoff(ctx, mm[i*t.mcfg.MaxSamplesPerSend:last])
+		err := t.sendMetadataWithBackoff(ctx, mm[i*t.mcfg.MaxSamplesPerSend:last], pBuf)
 		if err != nil {
 			t.metrics.failedMetadataTotal.Add(float64(last - (i * t.mcfg.MaxSamplesPerSend)))
 			level.Error(t.logger).Log("msg", "non-recoverable error while sending metadata", "count", last-(i*t.mcfg.MaxSamplesPerSend), "err", err)
@@ -459,9 +460,9 @@ func (t *QueueManager) AppendMetadata(ctx context.Context, metadata []scrape.Met
 	}
 }

-func (t *QueueManager) sendMetadataWithBackoff(ctx context.Context, metadata []prompb.MetricMetadata) error {
+func (t *QueueManager) sendMetadataWithBackoff(ctx context.Context, metadata []prompb.MetricMetadata, pBuf *proto.Buffer) error {
 	// Build the WriteRequest with no samples.
-	req, _, err := buildWriteRequest(nil, metadata, nil)
+	req, _, err := buildWriteRequest(nil, metadata, pBuf, nil)
 	if err != nil {
 		return err
 	}
@@ -1040,7 +1041,8 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue chan interface
 		max = s.qm.cfg.MaxSamplesPerSend
 		nPending, nPendingSamples, nPendingExemplars = 0, 0, 0
-		buf []byte
+		pBuf = proto.NewBuffer(nil)
+		buf  []byte
 	)

 	if s.qm.sendExemplars {
 		max += int(float64(max) * 0.1)
@@ -1084,7 +1086,7 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue chan interface
 			if !ok {
 				if nPendingSamples > 0 || nPendingExemplars > 0 {
 					level.Debug(s.qm.logger).Log("msg", "Flushing data to remote storage...", "samples", nPendingSamples, "exemplars", nPendingExemplars)
-					s.sendSamples(ctx, pendingData[:nPending], nPendingSamples, nPendingExemplars, &buf)
+					s.sendSamples(ctx, pendingData[:nPending], nPendingSamples, nPendingExemplars, pBuf, &buf)
 					s.qm.metrics.pendingSamples.Sub(float64(nPendingSamples))
 					s.qm.metrics.pendingExemplars.Sub(float64(nPendingExemplars))
 					level.Debug(s.qm.logger).Log("msg", "Done flushing.")
@@ -1114,7 +1116,7 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue chan interface
 			}

 			if nPending >= max {
-				s.sendSamples(ctx, pendingData[:nPending], nPendingSamples, nPendingExemplars, &buf)
+				s.sendSamples(ctx, pendingData[:nPending], nPendingSamples, nPendingExemplars, pBuf, &buf)
 				s.qm.metrics.pendingSamples.Sub(float64(nPendingSamples))
 				s.qm.metrics.pendingExemplars.Sub(float64(nPendingExemplars))
 				nPendingSamples = 0
@@ -1128,7 +1130,7 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue chan interface
 		case <-timer.C:
 			if nPendingSamples > 0 || nPendingExemplars > 0 {
 				level.Debug(s.qm.logger).Log("msg", "runShard timer ticked, sending buffered data", "samples", nPendingSamples, "exemplars", nPendingExemplars, "shard", shardNum)
-				s.sendSamples(ctx, pendingData[:nPending], nPendingSamples, nPendingExemplars, &buf)
+				s.sendSamples(ctx, pendingData[:nPending], nPendingSamples, nPendingExemplars, pBuf, &buf)
 				s.qm.metrics.pendingSamples.Sub(float64(nPendingSamples))
 				s.qm.metrics.pendingExemplars.Sub(float64(nPendingExemplars))
 				nPendingSamples = 0
@@ -1140,9 +1142,9 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue chan interface
 	}
 }

-func (s *shards) sendSamples(ctx context.Context, samples []prompb.TimeSeries, sampleCount int, exemplarCount int, buf *[]byte) {
+func (s *shards) sendSamples(ctx context.Context, samples []prompb.TimeSeries, sampleCount int, exemplarCount int, pBuf *proto.Buffer, buf *[]byte) {
 	begin := time.Now()
-	err := s.sendSamplesWithBackoff(ctx, samples, sampleCount, exemplarCount, buf)
+	err := s.sendSamplesWithBackoff(ctx, samples, sampleCount, exemplarCount, pBuf, buf)
 	if err != nil {
 		level.Error(s.qm.logger).Log("msg", "non-recoverable error", "count", sampleCount, "exemplarCount", exemplarCount, "err", err)
 		s.qm.metrics.failedSamplesTotal.Add(float64(sampleCount))
@@ -1157,9 +1159,9 @@ func (s *shards) sendSamples(ctx context.Context, samples []prompb.TimeSeries, s
 }

 // sendSamples to the remote storage with backoff for recoverable errors.
-func (s *shards) sendSamplesWithBackoff(ctx context.Context, samples []prompb.TimeSeries, sampleCount int, exemplarCount int, buf *[]byte) error {
+func (s *shards) sendSamplesWithBackoff(ctx context.Context, samples []prompb.TimeSeries, sampleCount int, exemplarCount int, pBuf *proto.Buffer, buf *[]byte) error {
 	// Build the WriteRequest with no metadata.
-	req, highest, err := buildWriteRequest(samples, nil, *buf)
+	req, highest, err := buildWriteRequest(samples, nil, pBuf, *buf)
 	if err != nil {
 		// Failing to build the write request is non-recoverable, since it will
 		// only error if marshaling the proto to bytes fails.
@@ -1266,7 +1268,7 @@ func sendWriteRequestWithBackoff(ctx context.Context, cfg config.QueueConfig, l
 	}
 }

-func buildWriteRequest(samples []prompb.TimeSeries, metadata []prompb.MetricMetadata, buf []byte) ([]byte, int64, error) {
+func buildWriteRequest(samples []prompb.TimeSeries, metadata []prompb.MetricMetadata, pBuf *proto.Buffer, buf []byte) ([]byte, int64, error) {
 	var highest int64
 	for _, ts := range samples {
 		// At the moment we only ever append a TimeSeries with a single sample or exemplar in it.
@@ -1283,7 +1285,12 @@ func buildWriteRequest(samples []prompb.TimeSeries, metadata []prompb.MetricMeta
 		Metadata: metadata,
 	}

-	data, err := proto.Marshal(req)
+	if pBuf == nil {
+		pBuf = proto.NewBuffer(nil) // For convenience in tests. Not efficient.
+	} else {
+		pBuf.Reset()
+	}
+	err := pBuf.Marshal(req)
 	if err != nil {
 		return nil, highest, err
 	}
@@ -1293,6 +1300,6 @@ func buildWriteRequest(samples []prompb.TimeSeries, metadata []prompb.MetricMeta
 	if buf != nil {
 		buf = buf[0:cap(buf)]
 	}
-	compressed := snappy.Encode(buf, data)
+	compressed := snappy.Encode(buf, pBuf.Bytes())
 	return compressed, highest, nil
 }


@@ -32,7 +32,7 @@ import (
 )

 func TestRemoteWriteHandler(t *testing.T) {
-	buf, _, err := buildWriteRequest(writeRequestFixture.Timeseries, nil, nil)
+	buf, _, err := buildWriteRequest(writeRequestFixture.Timeseries, nil, nil, nil)
 	require.NoError(t, err)

 	req, err := http.NewRequest("", "", bytes.NewReader(buf))
@@ -68,7 +68,7 @@ func TestOutOfOrderSample(t *testing.T) {
 	buf, _, err := buildWriteRequest([]prompb.TimeSeries{{
 		Labels:  []prompb.Label{{Name: "__name__", Value: "test_metric"}},
 		Samples: []prompb.Sample{{Value: 1, Timestamp: 0}},
-	}}, nil, nil)
+	}}, nil, nil, nil)
 	require.NoError(t, err)

 	req, err := http.NewRequest("", "", bytes.NewReader(buf))
@@ -93,7 +93,7 @@ func TestOutOfOrderExemplar(t *testing.T) {
 	buf, _, err := buildWriteRequest([]prompb.TimeSeries{{
 		Labels:    []prompb.Label{{Name: "__name__", Value: "test_metric"}},
 		Exemplars: []prompb.Exemplar{{Labels: []prompb.Label{{Name: "foo", Value: "bar"}}, Value: 1, Timestamp: 0}},
-	}}, nil, nil)
+	}}, nil, nil, nil)
 	require.NoError(t, err)

 	req, err := http.NewRequest("", "", bytes.NewReader(buf))
@@ -113,7 +113,7 @@ func TestOutOfOrderExemplar(t *testing.T) {
 }

 func TestCommitErr(t *testing.T) {
-	buf, _, err := buildWriteRequest(writeRequestFixture.Timeseries, nil, nil)
+	buf, _, err := buildWriteRequest(writeRequestFixture.Timeseries, nil, nil, nil)
 	require.NoError(t, err)

 	req, err := http.NewRequest("", "", bytes.NewReader(buf))