Improve sender benchmarks and some allocations

Signed-off-by: Nicolás Pazos <npazosmendez@gmail.com>
Nicolás Pazos 2023-10-11 12:10:00 -03:00
parent 9b4fbd9552
commit c710e19f9c
3 changed files with 212 additions and 132 deletions


@@ -771,15 +771,12 @@ func labelProtosToLabels(labelPairs []prompb.Label) labels.Labels {
}
func labelRefProtosToLabels(st map[uint64]string, lbls []prompb.LabelRef) labels.Labels {
result := make(labels.Labels, 0, len(lbls))
b := labels.NewScratchBuilder(len(lbls))
for _, l := range lbls {
result = append(result, labels.Label{
Name: st[l.NameRef],
Value: st[l.ValueRef],
})
b.Add(st[l.NameRef], st[l.ValueRef])
}
sort.Sort(result)
return result
b.Sort()
return b.Labels()
}
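For reference, the rewrite above swaps the append-then-sort.Sort approach for labels.ScratchBuilder, which keeps one internal slice alive across Add calls. A minimal, self-contained sketch of the same pattern, assuming an illustrative labelRef type in place of prompb.LabelRef and its symbol table:

package main

import "github.com/prometheus/prometheus/model/labels"

// labelRef is an illustrative stand-in for prompb.LabelRef.
type labelRef struct{ nameRef, valueRef uint64 }

// buildLabels mirrors labelRefProtosToLabels above: resolve each ref through
// the symbol table st and assemble the result with a pre-sized ScratchBuilder.
func buildLabels(st map[uint64]string, refs []labelRef) labels.Labels {
	b := labels.NewScratchBuilder(len(refs)) // pre-sized, no per-label growth
	for _, r := range refs {
		b.Add(st[r.nameRef], st[r.valueRef]) // Add does not sort
	}
	b.Sort()          // Labels() expects sorted input
	return b.Labels() // replaces the append + sort.Sort pattern
}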
func exemplarRefProtoToExemplar(st map[uint64]string, ep prompb.ExemplarRef) exemplar.Exemplar {
@@ -806,6 +803,20 @@ func labelsToLabelsProto(lbls labels.Labels, buf []prompb.Label) []prompb.Label
return result
}
// labelsToLabelRefsProto transforms labels into prompb LabelRefs. The buffer slice
// will be used to avoid allocations if it is big enough to store the labels.
func labelsToLabelRefsProto(lbls labels.Labels, pool *lookupPool, buf []prompb.LabelRef) []prompb.LabelRef {
result := buf[:0]
lbls.Range(func(l labels.Label) {
result = append(result, prompb.LabelRef{
NameRef: pool.intern(l.Name),
ValueRef: pool.intern(l.Value),
})
})
return result
}
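A rough usage sketch for the new helper, inside the same package. The call site itself is hypothetical; newLookupPool, intern, getTable and clear are used only as this diff uses them. Reusing buf across series is what avoids the per-series LabelRef allocation:

// Hypothetical round trip; would live next to the helpers above in the remote package.
func labelRefRoundTrip() labels.Labels {
	pool := newLookupPool()
	var buf []prompb.LabelRef // reused across calls to avoid reallocations

	lbls := labels.FromStrings("__name__", "test_metric", "job", "bench")
	buf = labelsToLabelRefsProto(lbls, pool, buf) // appends into buf[:0]

	// The receiving side resolves the refs back through the symbol table.
	out := labelRefProtosToLabels(pool.getTable(), buf)
	pool.clear() // reset the interned strings between batches
	return out
}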
// metricTypeToMetricTypeProto transforms a Prometheus metricType into prompb metricType. Since the former is a string we need to transform it to an enum.
func metricTypeToMetricTypeProto(t textparse.MetricType) prompb.MetricMetadata_MetricType {
mt := strings.ToUpper(string(t))


@@ -1412,12 +1412,12 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue *queue) {
return
}
if s.qm.internFormat {
nPendingSamples, nPendingExemplars, nPendingHistograms := s.populateReducedTimeSeries(pool, batch, pendingReducedData)
nPendingSamples, nPendingExemplars, nPendingHistograms := populateReducedTimeSeries(pool, batch, pendingReducedData, s.qm.sendExemplars, s.qm.sendNativeHistograms)
n := nPendingSamples + nPendingExemplars + nPendingHistograms
s.sendReducedSamples(ctx, pendingReducedData[:n], pool.table, nPendingSamples, nPendingExemplars, nPendingHistograms, pBuf, &buf)
pool.clear()
} else {
nPendingSamples, nPendingExemplars, nPendingHistograms := s.populateTimeSeries(batch, pendingData)
nPendingSamples, nPendingExemplars, nPendingHistograms := populateTimeSeries(batch, pendingData, s.qm.sendExemplars, s.qm.sendNativeHistograms)
n := nPendingSamples + nPendingExemplars + nPendingHistograms
s.sendSamples(ctx, pendingData[:n], nPendingSamples, nPendingExemplars, nPendingHistograms, pBuf, &buf)
}
@@ -1430,7 +1430,7 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue *queue) {
batch := queue.Batch()
if len(batch) > 0 {
if s.qm.internFormat {
nPendingSamples, nPendingExemplars, nPendingHistograms := s.populateReducedTimeSeries(pool, batch, pendingReducedData)
nPendingSamples, nPendingExemplars, nPendingHistograms := populateReducedTimeSeries(pool, batch, pendingReducedData, s.qm.sendExemplars, s.qm.sendNativeHistograms)
n := nPendingSamples + nPendingExemplars + nPendingHistograms
level.Debug(s.qm.logger).Log("msg", "runShard timer ticked, sending buffered data", "samples", nPendingSamples,
"exemplars", nPendingExemplars, "shard", shardNum, "histograms", nPendingHistograms)
@@ -1438,7 +1438,7 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue *queue) {
pool.clear()
} else {
nPendingSamples, nPendingExemplars, nPendingHistograms := s.populateTimeSeries(batch, pendingData)
nPendingSamples, nPendingExemplars, nPendingHistograms := populateTimeSeries(batch, pendingData, s.qm.sendExemplars, s.qm.sendNativeHistograms)
n := nPendingSamples + nPendingExemplars + nPendingHistograms
s.sendSamples(ctx, pendingData[:n], nPendingSamples, nPendingExemplars, nPendingHistograms, pBuf, &buf)
level.Debug(s.qm.logger).Log("msg", "runShard timer ticked, sending buffered data", "samples", nPendingSamples,
@@ -1451,14 +1451,14 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue *queue) {
}
}
func (s *shards) populateTimeSeries(batch []timeSeries, pendingData []prompb.TimeSeries) (int, int, int) {
func populateTimeSeries(batch []timeSeries, pendingData []prompb.TimeSeries, sendExemplars, sendNativeHistograms bool) (int, int, int) {
var nPendingSamples, nPendingExemplars, nPendingHistograms int
for nPending, d := range batch {
pendingData[nPending].Samples = pendingData[nPending].Samples[:0]
if s.qm.sendExemplars {
if sendExemplars {
pendingData[nPending].Exemplars = pendingData[nPending].Exemplars[:0]
}
if s.qm.sendNativeHistograms {
if sendNativeHistograms {
pendingData[nPending].Histograms = pendingData[nPending].Histograms[:0]
}
@@ -1495,7 +1495,7 @@ func (s *shards) sendSamples(ctx context.Context, samples []prompb.TimeSeries, s
// Build the WriteRequest with no metadata.
// Failing to build the write request is non-recoverable, since it will
// only error if marshaling the proto to bytes fails.
req, highest, err := buildWriteRequest(samples, nil, pBuf, *buf)
req, highest, err := buildWriteRequest(samples, nil, pBuf, buf)
if err == nil {
err = s.sendSamplesWithBackoff(ctx, req, sampleCount, exemplarCount, histogramCount, highest)
}
@@ -1507,7 +1507,7 @@ func (s *shards) sendReducedSamples(ctx context.Context, samples []prompb.Reduce
// Build the ReducedWriteRequest with no metadata.
// Failing to build the write request is non-recoverable, since it will
// only error if marshaling the proto to bytes fails.
req, highest, err := buildReducedWriteRequest(samples, labels, pBuf, *buf)
req, highest, err := buildReducedWriteRequest(samples, labels, pBuf, buf)
if err == nil {
err = s.sendSamplesWithBackoff(ctx, req, sampleCount, exemplarCount, histogramCount, highest)
}
@@ -1596,14 +1596,14 @@ func (s *shards) sendSamplesWithBackoff(ctx context.Context, rawReq []byte, samp
return err
}
func (s *shards) populateReducedTimeSeries(pool *lookupPool, batch []timeSeries, pendingData []prompb.ReducedTimeSeries) (int, int, int) {
func populateReducedTimeSeries(pool *lookupPool, batch []timeSeries, pendingData []prompb.ReducedTimeSeries, sendExemplars, sendNativeHistograms bool) (int, int, int) {
var nPendingSamples, nPendingExemplars, nPendingHistograms int
for nPending, d := range batch {
pendingData[nPending].Samples = pendingData[nPending].Samples[:0]
if s.qm.sendExemplars {
if sendExemplars {
pendingData[nPending].Exemplars = pendingData[nPending].Exemplars[:0]
}
if s.qm.sendNativeHistograms {
if sendNativeHistograms {
pendingData[nPending].Histograms = pendingData[nPending].Histograms[:0]
}
@@ -1611,12 +1611,7 @@ func (s *shards) populateReducedTimeSeries(pool *lookupPool, batch []timeSeries,
// retries endlessly, so once we reach max samples, if we can never send to the endpoint we'll
// stop reading from the queue. This makes it safe to reference pendingSamples by index.
// pendingData[nPending].Labels = labelsToLabelsProto(d.seriesLabels, pendingData[nPending].Labels)
pendingData[nPending].Labels = make([]prompb.LabelRef, len(d.seriesLabels))
for i, sl := range d.seriesLabels {
nRef := pool.intern(sl.Name)
vRef := pool.intern(sl.Value)
pendingData[nPending].Labels[i] = prompb.LabelRef{NameRef: nRef, ValueRef: vRef}
}
pendingData[nPending].Labels = labelsToLabelRefsProto(d.seriesLabels, pool, pendingData[nPending].Labels)
switch d.sType {
case tSample:
pendingData[nPending].Samples = append(pendingData[nPending].Samples, prompb.Sample{
@@ -1701,7 +1696,7 @@ func sendWriteRequestWithBackoff(ctx context.Context, cfg config.QueueConfig, l
}
}
func buildWriteRequest(samples []prompb.TimeSeries, metadata []prompb.MetricMetadata, pBuf *proto.Buffer, buf []byte) ([]byte, int64, error) {
func buildWriteRequest(samples []prompb.TimeSeries, metadata []prompb.MetricMetadata, pBuf *proto.Buffer, buf *[]byte) ([]byte, int64, error) {
var highest int64
for _, ts := range samples {
// At the moment we only ever append a TimeSeries with a single sample or exemplar in it.
@@ -1734,13 +1729,20 @@ func buildWriteRequest(samples []prompb.TimeSeries, metadata []prompb.MetricMeta
// snappy uses len() to see if it needs to allocate a new slice. Make the
// buffer as long as possible.
if buf != nil {
buf = buf[0:cap(buf)]
*buf = (*buf)[0:cap(*buf)]
} else {
buf = &[]byte{}
}
compressed := reSnappy.Encode(buf, pBuf.Bytes())
compressed := reSnappy.Encode(*buf, pBuf.Bytes())
if n := reSnappy.MaxEncodedLen(len(pBuf.Bytes())); buf != nil && n > len(*buf) {
// Grow the buffer for next time.
*buf = make([]byte, n)
}
return compressed, highest, nil
}
func buildReducedWriteRequest(samples []prompb.ReducedTimeSeries, labels map[uint64]string, pBuf *proto.Buffer, buf []byte) ([]byte, int64, error) {
func buildReducedWriteRequest(samples []prompb.ReducedTimeSeries, labels map[uint64]string, pBuf *proto.Buffer, buf *[]byte) ([]byte, int64, error) {
var highest int64
for _, ts := range samples {
// At the moment we only ever append a TimeSeries with a single sample or exemplar in it.
@@ -1773,10 +1775,15 @@ func buildReducedWriteRequest(samples []prompb.ReducedTimeSeries, labels map[uin
// snappy uses len() to see if it needs to allocate a new slice. Make the
// buffer as long as possible.
if buf != nil {
buf = buf[0:cap(buf)]
*buf = (*buf)[0:cap(*buf)]
} else {
buf = &[]byte{}
}
compressed := reSnappy.Encode(buf, pBuf.Bytes())
compressed := reSnappy.Encode(*buf, pBuf.Bytes())
if n := reSnappy.MaxEncodedLen(len(pBuf.Bytes())); buf != nil && n > len(*buf) {
// Grow the buffer for next time.
*buf = make([]byte, n)
}
return compressed, highest, nil
}
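Both build functions now take *[]byte so the caller's compression buffer survives across sends: snappy encodes into whatever capacity the buffer already has, and if the worst-case encoded size did not fit, the buffer is grown once so the next call can encode without allocating. A standalone sketch of that pattern, assuming github.com/golang/snappy stands in for the reSnappy alias used in this file:

package main

import "github.com/golang/snappy"

// compressReusing mirrors the buffer handling in buildWriteRequest above.
func compressReusing(src []byte, buf *[]byte) []byte {
	if buf != nil {
		*buf = (*buf)[0:cap(*buf)] // expose the full capacity to snappy
	} else {
		buf = &[]byte{}
	}
	compressed := snappy.Encode(*buf, src)
	if n := snappy.MaxEncodedLen(len(src)); n > len(*buf) {
		*buf = make([]byte, n) // grow once; later calls reuse this slice
	}
	return compressed
}

func main() {
	var buf []byte
	payload := []byte("some write request bytes")
	_ = compressReusing(payload, &buf) // first call may allocate inside snappy
	_ = compressReusing(payload, &buf) // later calls encode into the grown buf
}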


@@ -619,60 +619,36 @@ func createTimeseries(numSamples, numSeries int, extraLabels ...labels.Label) ([
return samples, series
}
func createTimeseriesProto(numSamples, numSeries int, extraLabels ...labels.Label) []prompb.TimeSeries {
// samples := make([]record.RefSample, 0, numSamples)
series := make([]prompb.TimeSeries, 0, numSeries)
for i := 0; i < numSeries; i++ {
func createDummyTimeseriesBatch(numSeries int, extraLabels ...labels.Label) []timeSeries {
result := make([]timeSeries, numSeries)
for i := range result {
name := fmt.Sprintf("test_metric_%d", i)
// for j := 0; j < numSamples; j++ {
sample := prompb.Sample{
Value: float64(i),
Timestamp: int64(i),
}
// }
rand.Shuffle(len(extraLabels), func(i, j int) {
extraLabels[i], extraLabels[j] = extraLabels[j], extraLabels[i]
})
series = append(series, prompb.TimeSeries{
Labels: labelsToLabelsProto(labels.Labels{{Name: "__name__", Value: name}, extraLabels[0], extraLabels[1], extraLabels[2]}, nil),
Samples: []prompb.Sample{sample},
// Ref: chunks.HeadSeriesRef(i),
// Labels: append(labels.Labels{{Name: "__name__", Value: name}}, extraLabels...),
})
}
return series
}
func createReducedTimeseriesProto(numSamples, numSeries int, extraLabels ...labels.Label) ([]prompb.ReducedTimeSeries, *lookupPool) {
pool := newLookupPool()
series := make([]prompb.ReducedTimeSeries, 0, numSeries)
for i := 0; i < numSeries; i++ {
name := fmt.Sprintf("test_metric_%d", i)
sample := prompb.Sample{
Value: float64(i),
Timestamp: int64(i),
result[i] = timeSeries{
seriesLabels: labels.NewBuilder(extraLabels[0:3]).Set(labels.MetricName, name).Labels(),
timestamp: int64(i),
}
nRef := pool.intern("__name__")
vRef := pool.intern(name)
l := []prompb.LabelRef{{NameRef: nRef, ValueRef: vRef}}
rand.Shuffle(len(extraLabels), func(i, j int) {
extraLabels[i], extraLabels[j] = extraLabels[j], extraLabels[i]
})
for i, v := range extraLabels {
if i > 2 {
break
switch i % 10 {
case 0, 1, 2, 3, 4, 5:
result[i].value = float64(i)
case 6:
result[i].exemplarLabels = extraLabels
result[i].value = float64(i)
case 7:
result[i].histogram = &histogram.Histogram{
Schema: 2,
ZeroThreshold: 1e-128,
}
case 8, 9:
result[i].floatHistogram = &histogram.FloatHistogram{
Schema: 2,
ZeroThreshold: 1e-128,
}
nRef := pool.intern(v.Name)
vRef := pool.intern(v.Value)
l = append(l, prompb.LabelRef{NameRef: nRef, ValueRef: vRef})
}
series = append(series, prompb.ReducedTimeSeries{
Labels: l,
Samples: []prompb.Sample{sample},
})
}
return series, pool
return result
}
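createDummyTimeseriesBatch mixes plain samples, samples with exemplars, and integer/float histograms at a fixed 6:1:1:2 ratio via the i % 10 switch. A hedged sketch of wiring such a batch into the reduced-format helpers above; the call site is purely illustrative, the function indexes extraLabels[0:3] so at least three extra labels must be supplied, and how each entry's sType is assigned is not shown in this diff:

// Illustrative only; would live in the remote package next to the helpers above.
func consumeDummyBatch() (samples, exemplars, histograms int) {
	extra := []labels.Label{
		{Name: "zone", Value: "a"},
		{Name: "env", Value: "prod"},
		{Name: "team", Value: "obs"},
	}
	batch := createDummyTimeseriesBatch(100, extra...)

	pool := newLookupPool()
	pending := make([]prompb.ReducedTimeSeries, len(batch))
	// The three return values count pending samples, exemplars and histograms.
	return populateReducedTimeSeries(pool, batch, pending, true, true)
}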
func createExemplars(numExemplars, numSeries int) ([]record.RefExemplar, []record.RefSeries) {
@@ -1439,70 +1415,156 @@ func TestQueue_FlushAndShutdownDoesNotDeadlock(t *testing.T) {
}
}
func createDummyTimeSeries(instances int) []timeSeries {
metrics := []labels.Labels{
labels.FromStrings("__name__", "go_gc_duration_seconds", "quantile", "0"),
labels.FromStrings("__name__", "go_gc_duration_seconds", "quantile", "0.25"),
labels.FromStrings("__name__", "go_gc_duration_seconds", "quantile", "0.5"),
labels.FromStrings("__name__", "go_gc_duration_seconds", "quantile", "0.75"),
labels.FromStrings("__name__", "go_gc_duration_seconds", "quantile", "1"),
labels.FromStrings("__name__", "go_gc_duration_seconds_sum"),
labels.FromStrings("__name__", "go_gc_duration_seconds_count"),
labels.FromStrings("__name__", "go_memstats_alloc_bytes_total"),
labels.FromStrings("__name__", "go_memstats_frees_total"),
labels.FromStrings("__name__", "go_memstats_lookups_total"),
labels.FromStrings("__name__", "go_memstats_mallocs_total"),
labels.FromStrings("__name__", "go_goroutines"),
labels.FromStrings("__name__", "go_info", "version", "go1.19.3"),
labels.FromStrings("__name__", "go_memstats_alloc_bytes"),
labels.FromStrings("__name__", "go_memstats_buck_hash_sys_bytes"),
labels.FromStrings("__name__", "go_memstats_gc_sys_bytes"),
labels.FromStrings("__name__", "go_memstats_heap_alloc_bytes"),
labels.FromStrings("__name__", "go_memstats_heap_idle_bytes"),
labels.FromStrings("__name__", "go_memstats_heap_inuse_bytes"),
labels.FromStrings("__name__", "go_memstats_heap_objects"),
labels.FromStrings("__name__", "go_memstats_heap_released_bytes"),
labels.FromStrings("__name__", "go_memstats_heap_sys_bytes"),
labels.FromStrings("__name__", "go_memstats_last_gc_time_seconds"),
labels.FromStrings("__name__", "go_memstats_mcache_inuse_bytes"),
labels.FromStrings("__name__", "go_memstats_mcache_sys_bytes"),
labels.FromStrings("__name__", "go_memstats_mspan_inuse_bytes"),
labels.FromStrings("__name__", "go_memstats_mspan_sys_bytes"),
labels.FromStrings("__name__", "go_memstats_next_gc_bytes"),
labels.FromStrings("__name__", "go_memstats_other_sys_bytes"),
labels.FromStrings("__name__", "go_memstats_stack_inuse_bytes"),
labels.FromStrings("__name__", "go_memstats_stack_sys_bytes"),
labels.FromStrings("__name__", "go_memstats_sys_bytes"),
labels.FromStrings("__name__", "go_threads"),
}
commonLabels := labels.FromStrings(
"cluster", "some-cluster-0",
"container", "prometheus",
"job", "some-namespace/prometheus",
"namespace", "some-namespace")
var result []timeSeries
r := rand.New(rand.NewSource(0))
for i := 0; i < instances; i++ {
b := labels.NewBuilder(commonLabels)
b.Set("pod", "prometheus-"+strconv.Itoa(i))
for _, lbls := range metrics {
for _, l := range lbls {
b.Set(l.Name, l.Value)
}
result = append(result, timeSeries{
seriesLabels: b.Labels(),
value: r.Float64(),
})
}
}
return result
}
func BenchmarkBuildWriteRequest(b *testing.B) {
// Extra labels to make a more realistic workload - taken from Kubernetes' embedded cAdvisor metrics.
extraLabels := labels.Labels{
{Name: "kubernetes_io_arch", Value: "amd64"},
{Name: "kubernetes_io_instance_type", Value: "c3.somesize"},
{Name: "kubernetes_io_os", Value: "linux"},
{Name: "container_name", Value: "some-name"},
{Name: "failure_domain_kubernetes_io_region", Value: "somewhere-1"},
{Name: "failure_domain_kubernetes_io_zone", Value: "somewhere-1b"},
{Name: "id", Value: "/kubepods/burstable/pod6e91c467-e4c5-11e7-ace3-0a97ed59c75e/a3c8498918bd6866349fed5a6f8c643b77c91836427fb6327913276ebc6bde28"},
{Name: "image", Value: "registry/organisation/name@sha256:dca3d877a80008b45d71d7edc4fd2e44c0c8c8e7102ba5cbabec63a374d1d506"},
{Name: "instance", Value: "ip-111-11-1-11.ec2.internal"},
{Name: "job", Value: "kubernetes-cadvisor"},
{Name: "kubernetes_io_hostname", Value: "ip-111-11-1-11"},
{Name: "monitor", Value: "prod"},
{Name: "name", Value: "k8s_some-name_some-other-name-5j8s8_kube-system_6e91c467-e4c5-11e7-ace3-0a97ed59c75e_0"},
{Name: "namespace", Value: "kube-system"},
{Name: "pod_name", Value: "some-other-name-5j8s8"},
}
series := createTimeseriesProto(1, 10000, extraLabels...)
bench := func(b *testing.B, batch []timeSeries) {
b.ResetTimer()
totalSize := 0
for i := 0; i < b.N; i++ {
buf, _, _ := buildWriteRequest(series, nil, nil, nil)
totalSize += len(buf)
b.ReportMetric(float64(totalSize)/float64(b.N), "compressedSize/op")
buff := make([]byte, 0)
seriesBuff := make([]prompb.TimeSeries, len(batch))
for i := range seriesBuff {
seriesBuff[i].Samples = []prompb.Sample{{}}
seriesBuff[i].Exemplars = []prompb.Exemplar{{}}
}
pBuf := proto.NewBuffer(nil)
// Warmup buffers
for i := 0; i < 10; i++ {
populateTimeSeries(batch, seriesBuff, true, true)
buildWriteRequest(seriesBuff, nil, pBuf, &buff)
}
b.ResetTimer()
totalSize := 0
for i := 0; i < b.N; i++ {
populateTimeSeries(batch, seriesBuff, true, true)
req, _, err := buildWriteRequest(seriesBuff, nil, pBuf, &buff)
if err != nil {
b.Fatal(err)
}
totalSize += len(req)
b.ReportMetric(float64(totalSize)/float64(b.N), "compressedSize/op")
}
}
// Do not include shutdown
b.StopTimer()
b.Run("2 instances", func(b *testing.B) {
batch := createDummyTimeSeries(2)
bench(b, batch)
})
b.Run("10 instances", func(b *testing.B) {
batch := createDummyTimeSeries(10)
bench(b, batch)
})
b.Run("1k instances", func(b *testing.B) {
batch := createDummyTimeSeries(1000)
bench(b, batch)
})
}
func BenchmarkBuildReducedWriteRequest(b *testing.B) {
// Extra labels to make a more realistic workload - taken from Kubernetes' embedded cAdvisor metrics.
extraLabels := labels.Labels{
{Name: "kubernetes_io_arch", Value: "amd64"},
{Name: "kubernetes_io_instance_type", Value: "c3.somesize"},
{Name: "kubernetes_io_os", Value: "linux"},
{Name: "container_name", Value: "some-name"},
{Name: "failure_domain_kubernetes_io_region", Value: "somewhere-1"},
{Name: "failure_domain_kubernetes_io_zone", Value: "somewhere-1b"},
{Name: "id", Value: "/kubepods/burstable/pod6e91c467-e4c5-11e7-ace3-0a97ed59c75e/a3c8498918bd6866349fed5a6f8c643b77c91836427fb6327913276ebc6bde28"},
{Name: "image", Value: "registry/organisation/name@sha256:dca3d877a80008b45d71d7edc4fd2e44c0c8c8e7102ba5cbabec63a374d1d506"},
{Name: "instance", Value: "ip-111-11-1-11.ec2.internal"},
{Name: "job", Value: "kubernetes-cadvisor"},
{Name: "kubernetes_io_hostname", Value: "ip-111-11-1-11"},
{Name: "monitor", Value: "prod"},
{Name: "name", Value: "k8s_some-name_some-other-name-5j8s8_kube-system_6e91c467-e4c5-11e7-ace3-0a97ed59c75e_0"},
{Name: "namespace", Value: "kube-system"},
{Name: "pod_name", Value: "some-other-name-5j8s8"},
}
series, pool := createReducedTimeseriesProto(1, 10000, extraLabels...)
bench := func(b *testing.B, batch []timeSeries) {
pool := newLookupPool()
pBuf := proto.NewBuffer(nil)
buff := make([]byte, 0)
seriesBuff := make([]prompb.ReducedTimeSeries, len(batch))
for i := range seriesBuff {
seriesBuff[i].Samples = []prompb.Sample{{}}
seriesBuff[i].Exemplars = []prompb.ExemplarRef{{}}
}
b.ResetTimer()
totalSize := 0
for i := 0; i < b.N; i++ {
buf, _, _ := buildReducedWriteRequest(series, pool.getTable(), nil, nil)
totalSize += len(buf)
b.ReportMetric(float64(totalSize)/float64(b.N), "compressedSize/op")
// Warmup buffers
for i := 0; i < 10; i++ {
populateReducedTimeSeries(pool, batch, seriesBuff, true, true)
buildReducedWriteRequest(seriesBuff, pool.getTable(), pBuf, &buff)
}
b.ResetTimer()
totalSize := 0
for i := 0; i < b.N; i++ {
populateReducedTimeSeries(pool, batch, seriesBuff, true, true)
req, _, err := buildReducedWriteRequest(seriesBuff, pool.getTable(), pBuf, &buff)
if err != nil {
b.Fatal(err)
}
pool.clear()
totalSize += len(req)
b.ReportMetric(float64(totalSize)/float64(b.N), "compressedSize/op")
}
}
// Do not include shutdown
b.StopTimer()
b.Run("2 instances", func(b *testing.B) {
batch := createDummyTimeSeries(2)
bench(b, batch)
})
b.Run("10 instances", func(b *testing.B) {
batch := createDummyTimeSeries(10)
bench(b, batch)
})
b.Run("1k instances", func(b *testing.B) {
batch := createDummyTimeSeries(1000)
bench(b, batch)
})
}
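To compare the plain and reduced sender paths, these benchmarks would typically be run with allocation reporting enabled, e.g. go test -run '^$' -bench 'BenchmarkBuild(Reduced)?WriteRequest' -benchmem from the package directory (presumably storage/remote in this tree); the compressedSize/op metric reported above then appears alongside the usual ns/op, B/op and allocs/op columns.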