diff --git a/storage/remote/codec.go b/storage/remote/codec.go
index 104444332..7ea8a9227 100644
--- a/storage/remote/codec.go
+++ b/storage/remote/codec.go
@@ -771,15 +771,12 @@ func labelProtosToLabels(labelPairs []prompb.Label) labels.Labels {
 }
 
 func labelRefProtosToLabels(st map[uint64]string, lbls []prompb.LabelRef) labels.Labels {
-	result := make(labels.Labels, 0, len(lbls))
+	b := labels.NewScratchBuilder(len(lbls))
 	for _, l := range lbls {
-		result = append(result, labels.Label{
-			Name:  st[l.NameRef],
-			Value: st[l.ValueRef],
-		})
+		b.Add(st[l.NameRef], st[l.ValueRef])
 	}
-	sort.Sort(result)
-	return result
+	b.Sort()
+	return b.Labels()
 }
 
 func exemplarRefProtoToExemplar(st map[uint64]string, ep prompb.ExemplarRef) exemplar.Exemplar {
@@ -806,6 +803,20 @@ func labelsToLabelsProto(lbls labels.Labels, buf []prompb.Label) []prompb.Label
 	return result
 }
 
+// labelsToLabelRefsProto transforms labels into prompb LabelRefs. The buffer slice
+// will be used to avoid allocations if it is big enough to store the labels.
+func labelsToLabelRefsProto(lbls labels.Labels, pool *lookupPool, buf []prompb.LabelRef) []prompb.LabelRef {
+	result := buf[:0]
+	lbls.Range(func(l labels.Label) {
+		result = append(result, prompb.LabelRef{
+			NameRef:  pool.intern(l.Name),
+			ValueRef: pool.intern(l.Value),
+		})
+	})
+
+	return result
+}
+
 // metricTypeToMetricTypeProto transforms a Prometheus metricType into prompb metricType. Since the former is a string we need to transform it to an enum.
 func metricTypeToMetricTypeProto(t textparse.MetricType) prompb.MetricMetadata_MetricType {
 	mt := strings.ToUpper(string(t))
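The `lookupPool` used above is defined elsewhere in this change and is not shown in the diff. As a rough, hypothetical sketch of the contract `labelsToLabelRefsProto` relies on — an interning pool mapping each distinct string to a stable `uint64` reference, so repeated label names and values are transmitted once — it might look like this (field and method names beyond `intern`, `getTable`, and `clear` are assumptions):

```go
package main

import "fmt"

// lookupPool interns strings into a ref table that is shipped with the
// write request; series then carry uint64 refs instead of repeated strings.
type lookupPool struct {
	table map[uint64]string // ref -> string, sent alongside the series
	refs  map[string]uint64 // string -> ref, for O(1) interning
	next  uint64
}

func newLookupPool() *lookupPool {
	return &lookupPool{table: map[uint64]string{}, refs: map[string]uint64{}}
}

// intern returns the existing ref for s, or assigns the next free one.
func (p *lookupPool) intern(s string) uint64 {
	if ref, ok := p.refs[s]; ok {
		return ref
	}
	ref := p.next
	p.next++
	p.refs[s] = ref
	p.table[ref] = s
	return ref
}

func (p *lookupPool) getTable() map[uint64]string { return p.table }

// clear resets the pool between batches.
func (p *lookupPool) clear() {
	p.table = map[uint64]string{}
	p.refs = map[string]uint64{}
	p.next = 0
}

func main() {
	p := newLookupPool()
	// "job" is interned once even though it is requested twice.
	fmt.Println(p.intern("job"), p.intern("prometheus"), p.intern("job")) // 0 1 0
}
```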
diff --git a/storage/remote/queue_manager.go b/storage/remote/queue_manager.go
index 2b9a9a786..71a833943 100644
--- a/storage/remote/queue_manager.go
+++ b/storage/remote/queue_manager.go
@@ -1412,12 +1412,12 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue *queue) {
 				return
 			}
 			if s.qm.internFormat {
-				nPendingSamples, nPendingExemplars, nPendingHistograms := s.populateReducedTimeSeries(pool, batch, pendingReducedData)
+				nPendingSamples, nPendingExemplars, nPendingHistograms := populateReducedTimeSeries(pool, batch, pendingReducedData, s.qm.sendExemplars, s.qm.sendNativeHistograms)
 				n := nPendingSamples + nPendingExemplars + nPendingHistograms
 				s.sendReducedSamples(ctx, pendingReducedData[:n], pool.table, nPendingSamples, nPendingExemplars, nPendingHistograms, pBuf, &buf)
 				pool.clear()
 			} else {
-				nPendingSamples, nPendingExemplars, nPendingHistograms := s.populateTimeSeries(batch, pendingData)
+				nPendingSamples, nPendingExemplars, nPendingHistograms := populateTimeSeries(batch, pendingData, s.qm.sendExemplars, s.qm.sendNativeHistograms)
 				n := nPendingSamples + nPendingExemplars + nPendingHistograms
 				s.sendSamples(ctx, pendingData[:n], nPendingSamples, nPendingExemplars, nPendingHistograms, pBuf, &buf)
 			}
@@ -1430,7 +1430,7 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue *queue) {
 			batch := queue.Batch()
 			if len(batch) > 0 {
 				if s.qm.internFormat {
-					nPendingSamples, nPendingExemplars, nPendingHistograms := s.populateReducedTimeSeries(pool, batch, pendingReducedData)
+					nPendingSamples, nPendingExemplars, nPendingHistograms := populateReducedTimeSeries(pool, batch, pendingReducedData, s.qm.sendExemplars, s.qm.sendNativeHistograms)
 					n := nPendingSamples + nPendingExemplars + nPendingHistograms
 					level.Debug(s.qm.logger).Log("msg", "runShard timer ticked, sending buffered data", "samples", nPendingSamples,
 						"exemplars", nPendingExemplars, "shard", shardNum, "histograms", nPendingHistograms)
@@ -1438,7 +1438,7 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue *queue) {
 					pool.clear()
 				} else {
-					nPendingSamples, nPendingExemplars, nPendingHistograms := s.populateTimeSeries(batch, pendingData)
+					nPendingSamples, nPendingExemplars, nPendingHistograms := populateTimeSeries(batch, pendingData, s.qm.sendExemplars, s.qm.sendNativeHistograms)
 					n := nPendingSamples + nPendingExemplars + nPendingHistograms
 					s.sendSamples(ctx, pendingData[:n], nPendingSamples, nPendingExemplars, nPendingHistograms, pBuf, &buf)
 					level.Debug(s.qm.logger).Log("msg", "runShard timer ticked, sending buffered data", "samples", nPendingSamples,
@@ -1451,14 +1451,14 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue *queue) {
 	}
 }
 
-func (s *shards) populateTimeSeries(batch []timeSeries, pendingData []prompb.TimeSeries) (int, int, int) {
+func populateTimeSeries(batch []timeSeries, pendingData []prompb.TimeSeries, sendExemplars, sendNativeHistograms bool) (int, int, int) {
 	var nPendingSamples, nPendingExemplars, nPendingHistograms int
 	for nPending, d := range batch {
 		pendingData[nPending].Samples = pendingData[nPending].Samples[:0]
-		if s.qm.sendExemplars {
+		if sendExemplars {
 			pendingData[nPending].Exemplars = pendingData[nPending].Exemplars[:0]
 		}
-		if s.qm.sendNativeHistograms {
+		if sendNativeHistograms {
 			pendingData[nPending].Histograms = pendingData[nPending].Histograms[:0]
 		}
 
@@ -1495,7 +1495,7 @@ func (s *shards) sendSamples(ctx context.Context, samples []prompb.TimeSeries, s
 	// Build the WriteRequest with no metadata.
 	// Failing to build the write request is non-recoverable, since it will
 	// only error if marshaling the proto to bytes fails.
-	req, highest, err := buildWriteRequest(samples, nil, pBuf, *buf)
+	req, highest, err := buildWriteRequest(samples, nil, pBuf, buf)
 	if err == nil {
 		err = s.sendSamplesWithBackoff(ctx, req, sampleCount, exemplarCount, histogramCount, highest)
 	}
@@ -1507,7 +1507,7 @@ func (s *shards) sendReducedSamples(ctx context.Context, samples []prompb.Reduce
 	// Build the ReducedWriteRequest with no metadata.
 	// Failing to build the write request is non-recoverable, since it will
 	// only error if marshaling the proto to bytes fails.
-	req, highest, err := buildReducedWriteRequest(samples, labels, pBuf, *buf)
+	req, highest, err := buildReducedWriteRequest(samples, labels, pBuf, buf)
 	if err == nil {
 		err = s.sendSamplesWithBackoff(ctx, req, sampleCount, exemplarCount, histogramCount, highest)
 	}
@@ -1596,14 +1596,14 @@ func (s *shards) sendSamplesWithBackoff(ctx context.Context, rawReq []byte, samp
 	return err
 }
 
-func (s *shards) populateReducedTimeSeries(pool *lookupPool, batch []timeSeries, pendingData []prompb.ReducedTimeSeries) (int, int, int) {
+func populateReducedTimeSeries(pool *lookupPool, batch []timeSeries, pendingData []prompb.ReducedTimeSeries, sendExemplars, sendNativeHistograms bool) (int, int, int) {
 	var nPendingSamples, nPendingExemplars, nPendingHistograms int
 	for nPending, d := range batch {
 		pendingData[nPending].Samples = pendingData[nPending].Samples[:0]
-		if s.qm.sendExemplars {
+		if sendExemplars {
 			pendingData[nPending].Exemplars = pendingData[nPending].Exemplars[:0]
 		}
-		if s.qm.sendNativeHistograms {
+		if sendNativeHistograms {
 			pendingData[nPending].Histograms = pendingData[nPending].Histograms[:0]
 		}
 
@@ -1611,12 +1611,7 @@ func (s *shards) populateReducedTimeSeries(pool *lookupPool, batch []timeSeries,
 		// retries endlessly, so once we reach max samples, if we can never send to the endpoint we'll
 		// stop reading from the queue. This makes it safe to reference pendingSamples by index.
 		// pendingData[nPending].Labels = labelsToLabelsProto(d.seriesLabels, pendingData[nPending].Labels)
-		pendingData[nPending].Labels = make([]prompb.LabelRef, len(d.seriesLabels))
-		for i, sl := range d.seriesLabels {
-			nRef := pool.intern(sl.Name)
-			vRef := pool.intern(sl.Value)
-			pendingData[nPending].Labels[i] = prompb.LabelRef{NameRef: nRef, ValueRef: vRef}
-		}
+		pendingData[nPending].Labels = labelsToLabelRefsProto(d.seriesLabels, pool, pendingData[nPending].Labels)
 		switch d.sType {
 		case tSample:
 			pendingData[nPending].Samples = append(pendingData[nPending].Samples, prompb.Sample{
@@ -1701,7 +1696,7 @@ func sendWriteRequestWithBackoff(ctx context.Context, cfg config.QueueConfig, l
 		}
 	}
 }
-func buildWriteRequest(samples []prompb.TimeSeries, metadata []prompb.MetricMetadata, pBuf *proto.Buffer, buf []byte) ([]byte, int64, error) {
+func buildWriteRequest(samples []prompb.TimeSeries, metadata []prompb.MetricMetadata, pBuf *proto.Buffer, buf *[]byte) ([]byte, int64, error) {
 	var highest int64
 	for _, ts := range samples {
 		// At the moment we only ever append a TimeSeries with a single sample or exemplar in it.
@@ -1734,13 +1729,20 @@ func buildWriteRequest(samples []prompb.TimeSeries, metadata []prompb.MetricMeta
 	// snappy uses len() to see if it needs to allocate a new slice. Make the
 	// buffer as long as possible.
 	if buf != nil {
-		buf = buf[0:cap(buf)]
+		*buf = (*buf)[0:cap(*buf)]
+	} else {
+		buf = &[]byte{}
 	}
-	compressed := reSnappy.Encode(buf, pBuf.Bytes())
+	compressed := reSnappy.Encode(*buf, pBuf.Bytes())
+	if n := reSnappy.MaxEncodedLen(len(pBuf.Bytes())); buf != nil && n > len(*buf) {
+		// grow the buffer for the next time
+		*buf = make([]byte, n)
+	}
+
 	return compressed, highest, nil
 }
 
-func buildReducedWriteRequest(samples []prompb.ReducedTimeSeries, labels map[uint64]string, pBuf *proto.Buffer, buf []byte) ([]byte, int64, error) {
+func buildReducedWriteRequest(samples []prompb.ReducedTimeSeries, labels map[uint64]string, pBuf *proto.Buffer, buf *[]byte) ([]byte, int64, error) {
 	var highest int64
 	for _, ts := range samples {
 		// At the moment we only ever append a TimeSeries with a single sample or exemplar in it.
@@ -1773,10 +1775,15 @@ func buildReducedWriteRequest(samples []prompb.ReducedTimeSeries, labels map[uin
 	// snappy uses len() to see if it needs to allocate a new slice. Make the
 	// buffer as long as possible.
 	if buf != nil {
-		buf = buf[0:cap(buf)]
+		*buf = (*buf)[0:cap(*buf)]
+	} else {
+		buf = &[]byte{}
 	}
-	compressed := reSnappy.Encode(buf, pBuf.Bytes())
-
+	compressed := reSnappy.Encode(*buf, pBuf.Bytes())
+	if n := reSnappy.MaxEncodedLen(len(pBuf.Bytes())); buf != nil && n > len(*buf) {
+		// grow the buffer for the next time
+		*buf = make([]byte, n)
+	}
 	return compressed, highest, nil
 }
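Two things drive the `buildWriteRequest`/`buildReducedWriteRequest` signature change above: a Go slice header is passed by value, so the old `buf []byte` parameter could never hand a grown buffer back to the caller, and snappy only reuses `dst` when it is already long enough. A minimal, self-contained sketch of the same pattern, assuming `reSnappy` aliases the standard `github.com/golang/snappy` package (the `compress` helper name is hypothetical):

```go
package main

import (
	"fmt"

	"github.com/golang/snappy"
)

// compress mirrors the buffer handling above: expose the buffer's full
// capacity so snappy.Encode can reuse it, then grow the caller's slice to
// MaxEncodedLen so the next call can encode without an internal allocation.
func compress(src []byte, buf *[]byte) []byte {
	if buf != nil {
		// snappy uses len(dst), not cap(dst), to decide whether to allocate.
		*buf = (*buf)[0:cap(*buf)]
	} else {
		buf = &[]byte{}
	}
	compressed := snappy.Encode(*buf, src)
	if n := snappy.MaxEncodedLen(len(src)); n > len(*buf) {
		// Grow the buffer for the next call.
		*buf = make([]byte, n)
	}
	return compressed
}

func main() {
	var buf []byte
	payload := []byte("some remote-write payload")
	out := compress(payload, &buf)
	// After one call the reusable buffer is sized for this payload.
	fmt.Println(len(out) > 0, cap(buf) >= snappy.MaxEncodedLen(len(payload))) // true true
}
```

Note that the returned slice may alias the reusable buffer, so each request must be sent before the next populate/build cycle overwrites it — which matches the send-then-reuse order in `runShard`.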
diff --git a/storage/remote/queue_manager_test.go b/storage/remote/queue_manager_test.go
index 10cfbf0ad..0187db2b8 100644
--- a/storage/remote/queue_manager_test.go
+++ b/storage/remote/queue_manager_test.go
@@ -619,60 +619,36 @@ func createTimeseries(numSamples, numSeries int, extraLabels ...labels.Label) ([
 	return samples, series
 }
 
-func createTimeseriesProto(numSamples, numSeries int, extraLabels ...labels.Label) []prompb.TimeSeries {
-	// samples := make([]record.RefSample, 0, numSamples)
-	series := make([]prompb.TimeSeries, 0, numSeries)
-	for i := 0; i < numSeries; i++ {
+func createDummyTimeseriesBatch(numSeries int, extraLabels ...labels.Label) []timeSeries {
+	result := make([]timeSeries, numSeries)
+	for i := range result {
 		name := fmt.Sprintf("test_metric_%d", i)
-
-		// for j := 0; j < numSamples; j++ {
-		sample := prompb.Sample{
-			Value:     float64(i),
-			Timestamp: int64(i),
-		}
-		// }
 		rand.Shuffle(len(extraLabels), func(i, j int) {
 			extraLabels[i], extraLabels[j] = extraLabels[j], extraLabels[i]
 		})
-		series = append(series, prompb.TimeSeries{
-			Labels:  labelsToLabelsProto(labels.Labels{{Name: "__name__", Value: name}, extraLabels[0], extraLabels[1], extraLabels[2]}, nil),
-			Samples: []prompb.Sample{sample},
-			// Ref: chunks.HeadSeriesRef(i),
-			// Labels: append(labels.Labels{{Name: "__name__", Value: name}}, extraLabels...),
-		})
-	}
-	return series
-}
-
-func createReducedTimeseriesProto(numSamples, numSeries int, extraLabels ...labels.Label) ([]prompb.ReducedTimeSeries, *lookupPool) {
-	pool := newLookupPool()
-	series := make([]prompb.ReducedTimeSeries, 0, numSeries)
-	for i := 0; i < numSeries; i++ {
-		name := fmt.Sprintf("test_metric_%d", i)
-		sample := prompb.Sample{
-			Value:     float64(i),
-			Timestamp: int64(i),
+		result[i] = timeSeries{
+			seriesLabels: labels.NewBuilder(extraLabels[0:3]).Set(labels.MetricName, name).Labels(),
+			timestamp:    int64(i),
 		}
-		nRef := pool.intern("__name__")
-		vRef := pool.intern(name)
-		l := []prompb.LabelRef{{NameRef: nRef, ValueRef: vRef}}
-		rand.Shuffle(len(extraLabels), func(i, j int) {
-			extraLabels[i], extraLabels[j] = extraLabels[j], extraLabels[i]
-		})
-		for i, v := range extraLabels {
-			if i > 2 {
-				break
+		switch i % 10 {
+		case 0, 1, 2, 3, 4, 5:
+			result[i].value = float64(i)
+		case 6:
+			result[i].exemplarLabels = extraLabels
+			result[i].value = float64(i)
+		case 7:
+			result[i].histogram = &histogram.Histogram{
+				Schema:        2,
+				ZeroThreshold: 1e-128,
+			}
+		case 8, 9:
+			result[i].floatHistogram = &histogram.FloatHistogram{
+				Schema:        2,
+				ZeroThreshold: 1e-128,
 			}
-			nRef := pool.intern(v.Name)
-			vRef := pool.intern(v.Value)
-			l = append(l, prompb.LabelRef{NameRef: nRef, ValueRef: vRef})
 		}
-		series = append(series, prompb.ReducedTimeSeries{
-			Labels:  l,
-			Samples: []prompb.Sample{sample},
-		})
 	}
-	return series, pool
+	return result
 }
 
 func createExemplars(numExemplars, numSeries int) ([]record.RefExemplar, []record.RefSeries) {
@@ -1439,70 +1415,156 @@ func TestQueue_FlushAndShutdownDoesNotDeadlock(t *testing.T) {
 	}
 }
 
+func createDummyTimeSeries(instances int) []timeSeries {
+	metrics := []labels.Labels{
+		labels.FromStrings("__name__", "go_gc_duration_seconds", "quantile", "0"),
+		labels.FromStrings("__name__", "go_gc_duration_seconds", "quantile", "0.25"),
+		labels.FromStrings("__name__", "go_gc_duration_seconds", "quantile", "0.5"),
+		labels.FromStrings("__name__", "go_gc_duration_seconds", "quantile", "0.75"),
+		labels.FromStrings("__name__", "go_gc_duration_seconds", "quantile", "1"),
+		labels.FromStrings("__name__", "go_gc_duration_seconds_sum"),
+		labels.FromStrings("__name__", "go_gc_duration_seconds_count"),
+		labels.FromStrings("__name__", "go_memstats_alloc_bytes_total"),
+		labels.FromStrings("__name__", "go_memstats_frees_total"),
+		labels.FromStrings("__name__", "go_memstats_lookups_total"),
+		labels.FromStrings("__name__", "go_memstats_mallocs_total"),
+		labels.FromStrings("__name__", "go_goroutines"),
+		labels.FromStrings("__name__", "go_info", "version", "go1.19.3"),
+		labels.FromStrings("__name__", "go_memstats_alloc_bytes"),
+		labels.FromStrings("__name__", "go_memstats_buck_hash_sys_bytes"),
+		labels.FromStrings("__name__", "go_memstats_gc_sys_bytes"),
+		labels.FromStrings("__name__", "go_memstats_heap_alloc_bytes"),
+		labels.FromStrings("__name__", "go_memstats_heap_idle_bytes"),
+		labels.FromStrings("__name__", "go_memstats_heap_inuse_bytes"),
+		labels.FromStrings("__name__", "go_memstats_heap_objects"),
+		labels.FromStrings("__name__", "go_memstats_heap_released_bytes"),
+		labels.FromStrings("__name__", "go_memstats_heap_sys_bytes"),
+		labels.FromStrings("__name__", "go_memstats_last_gc_time_seconds"),
+		labels.FromStrings("__name__", "go_memstats_mcache_inuse_bytes"),
+		labels.FromStrings("__name__", "go_memstats_mcache_sys_bytes"),
+		labels.FromStrings("__name__", "go_memstats_mspan_inuse_bytes"),
+		labels.FromStrings("__name__", "go_memstats_mspan_sys_bytes"),
+		labels.FromStrings("__name__", "go_memstats_next_gc_bytes"),
+		labels.FromStrings("__name__", "go_memstats_other_sys_bytes"),
+		labels.FromStrings("__name__", "go_memstats_stack_inuse_bytes"),
+		labels.FromStrings("__name__", "go_memstats_stack_sys_bytes"),
+		labels.FromStrings("__name__", "go_memstats_sys_bytes"),
+		labels.FromStrings("__name__", "go_threads"),
+	}
+
+	commonLabels := labels.FromStrings(
+		"cluster", "some-cluster-0",
+		"container", "prometheus",
+		"job", "some-namespace/prometheus",
+		"namespace", "some-namespace")
+
+	var result []timeSeries
+	r := rand.New(rand.NewSource(0))
+	for i := 0; i < instances; i++ {
+		b := labels.NewBuilder(commonLabels)
+		b.Set("pod", "prometheus-"+strconv.Itoa(i))
+		for _, lbls := range metrics {
+			for _, l := range lbls {
+				b.Set(l.Name, l.Value)
+			}
+			result = append(result, timeSeries{
+				seriesLabels: b.Labels(),
+				value:        r.Float64(),
+			})
+		}
+	}
+	return result
+}
+
 func BenchmarkBuildWriteRequest(b *testing.B) {
-	// Extra labels to make a more realistic workload - taken from Kubernetes' embedded cAdvisor metrics.
-	extraLabels := labels.Labels{
-		{Name: "kubernetes_io_arch", Value: "amd64"},
-		{Name: "kubernetes_io_instance_type", Value: "c3.somesize"},
-		{Name: "kubernetes_io_os", Value: "linux"},
-		{Name: "container_name", Value: "some-name"},
-		{Name: "failure_domain_kubernetes_io_region", Value: "somewhere-1"},
-		{Name: "failure_domain_kubernetes_io_zone", Value: "somewhere-1b"},
-		{Name: "id", Value: "/kubepods/burstable/pod6e91c467-e4c5-11e7-ace3-0a97ed59c75e/a3c8498918bd6866349fed5a6f8c643b77c91836427fb6327913276ebc6bde28"},
-		{Name: "image", Value: "registry/organisation/name@sha256:dca3d877a80008b45d71d7edc4fd2e44c0c8c8e7102ba5cbabec63a374d1d506"},
-		{Name: "instance", Value: "ip-111-11-1-11.ec2.internal"},
-		{Name: "job", Value: "kubernetes-cadvisor"},
-		{Name: "kubernetes_io_hostname", Value: "ip-111-11-1-11"},
-		{Name: "monitor", Value: "prod"},
-		{Name: "name", Value: "k8s_some-name_some-other-name-5j8s8_kube-system_6e91c467-e4c5-11e7-ace3-0a97ed59c75e_0"},
-		{Name: "namespace", Value: "kube-system"},
-		{Name: "pod_name", Value: "some-other-name-5j8s8"},
-	}
-	series := createTimeseriesProto(1, 10000, extraLabels...)
+	bench := func(b *testing.B, batch []timeSeries) {
 
-	b.ResetTimer()
-	totalSize := 0
-	for i := 0; i < b.N; i++ {
-		buf, _, _ := buildWriteRequest(series, nil, nil, nil)
-		totalSize += len(buf)
-		b.ReportMetric(float64(totalSize)/float64(b.N), "compressedSize/op")
+		buff := make([]byte, 0)
+		seriesBuff := make([]prompb.TimeSeries, len(batch))
+		for i := range seriesBuff {
+			seriesBuff[i].Samples = []prompb.Sample{{}}
+			seriesBuff[i].Exemplars = []prompb.Exemplar{{}}
+		}
+		pBuf := proto.NewBuffer(nil)
+		// Warmup buffers
+		for i := 0; i < 10; i++ {
+			populateTimeSeries(batch, seriesBuff, true, true)
+			buildWriteRequest(seriesBuff, nil, pBuf, &buff)
+		}
+
+		b.ResetTimer()
+		totalSize := 0
+		for i := 0; i < b.N; i++ {
+			populateTimeSeries(batch, seriesBuff, true, true)
+			req, _, err := buildWriteRequest(seriesBuff, nil, pBuf, &buff)
+			if err != nil {
+				b.Fatal(err)
+			}
+			totalSize += len(req)
+			b.ReportMetric(float64(totalSize)/float64(b.N), "compressedSize/op")
+		}
 	}
 
-	// Do not include shutdown
-	b.StopTimer()
+	b.Run("2 instances", func(b *testing.B) {
+		batch := createDummyTimeSeries(2)
+		bench(b, batch)
+	})
+
+	b.Run("10 instances", func(b *testing.B) {
+		batch := createDummyTimeSeries(10)
+		bench(b, batch)
+	})
+
+	b.Run("1k instances", func(b *testing.B) {
+		batch := createDummyTimeSeries(1000)
+		bench(b, batch)
+	})
 }
 
 func BenchmarkBuildReducedWriteRequest(b *testing.B) {
-	// Extra labels to make a more realistic workload - taken from Kubernetes' embedded cAdvisor metrics.
-	extraLabels := labels.Labels{
-		{Name: "kubernetes_io_arch", Value: "amd64"},
-		{Name: "kubernetes_io_instance_type", Value: "c3.somesize"},
-		{Name: "kubernetes_io_os", Value: "linux"},
-		{Name: "container_name", Value: "some-name"},
-		{Name: "failure_domain_kubernetes_io_region", Value: "somewhere-1"},
-		{Name: "failure_domain_kubernetes_io_zone", Value: "somewhere-1b"},
-		{Name: "id", Value: "/kubepods/burstable/pod6e91c467-e4c5-11e7-ace3-0a97ed59c75e/a3c8498918bd6866349fed5a6f8c643b77c91836427fb6327913276ebc6bde28"},
-		{Name: "image", Value: "registry/organisation/name@sha256:dca3d877a80008b45d71d7edc4fd2e44c0c8c8e7102ba5cbabec63a374d1d506"},
-		{Name: "instance", Value: "ip-111-11-1-11.ec2.internal"},
-		{Name: "job", Value: "kubernetes-cadvisor"},
-		{Name: "kubernetes_io_hostname", Value: "ip-111-11-1-11"},
-		{Name: "monitor", Value: "prod"},
-		{Name: "name", Value: "k8s_some-name_some-other-name-5j8s8_kube-system_6e91c467-e4c5-11e7-ace3-0a97ed59c75e_0"},
-		{Name: "namespace", Value: "kube-system"},
-		{Name: "pod_name", Value: "some-other-name-5j8s8"},
-	}
-	series, pool := createReducedTimeseriesProto(1, 10000, extraLabels...)
+	bench := func(b *testing.B, batch []timeSeries) {
+		pool := newLookupPool()
+		pBuf := proto.NewBuffer(nil)
+		buff := make([]byte, 0)
+		seriesBuff := make([]prompb.ReducedTimeSeries, len(batch))
+		for i := range seriesBuff {
+			seriesBuff[i].Samples = []prompb.Sample{{}}
+			seriesBuff[i].Exemplars = []prompb.ExemplarRef{{}}
+		}
 
-	b.ResetTimer()
-	totalSize := 0
-	for i := 0; i < b.N; i++ {
-		buf, _, _ := buildReducedWriteRequest(series, pool.getTable(), nil, nil)
-		totalSize += len(buf)
-		b.ReportMetric(float64(totalSize)/float64(b.N), "compressedSize/op")
+		// Warmup buffers
+		for i := 0; i < 10; i++ {
+			populateReducedTimeSeries(pool, batch, seriesBuff, true, true)
+			buildReducedWriteRequest(seriesBuff, pool.getTable(), pBuf, &buff)
+		}
+		b.ResetTimer()
+		totalSize := 0
+		for i := 0; i < b.N; i++ {
+			populateReducedTimeSeries(pool, batch, seriesBuff, true, true)
+			req, _, err := buildReducedWriteRequest(seriesBuff, pool.getTable(), pBuf, &buff)
+			if err != nil {
+				b.Fatal(err)
+			}
+			pool.clear()
+			totalSize += len(req)
+			b.ReportMetric(float64(totalSize)/float64(b.N), "compressedSize/op")
+		}
 	}
 
-	// Do not include shutdown
-	b.StopTimer()
+	b.Run("2 instances", func(b *testing.B) {
+		batch := createDummyTimeSeries(2)
+		bench(b, batch)
+	})
+
+	b.Run("10 instances", func(b *testing.B) {
+		batch := createDummyTimeSeries(10)
+		bench(b, batch)
+	})
+
+	b.Run("1k instances", func(b *testing.B) {
+		batch := createDummyTimeSeries(1000)
+		bench(b, batch)
+	})
 }
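The new benchmarks share a warmup-then-measure shape: ten untimed iterations let the reused buffers reach steady-state size before `b.ResetTimer()`, so one-time buffer growth is excluded from the measurement, and `b.Run` subtests plus `b.ReportMetric` compare payload sizes across batch sizes. A condensed, stand-alone sketch of that pattern with stand-in work instead of the real populate/build pipeline (all names hypothetical; place in a `_test.go` file):

```go
package bench

import (
	"bytes"
	"strings"
	"testing"
)

// buildPayload stands in for the populateTimeSeries + buildWriteRequest pair,
// reusing the caller's buffer the way the real code reuses buff/seriesBuff.
func buildPayload(buf *bytes.Buffer, series int) []byte {
	buf.Reset()
	buf.WriteString(strings.Repeat("sample;", series))
	return buf.Bytes()
}

func BenchmarkBuildPayload(b *testing.B) {
	bench := func(b *testing.B, series int) {
		var buf bytes.Buffer
		// Warmup: let the reused buffer grow before the timer starts,
		// mirroring the ten untimed iterations in the benchmarks above.
		for i := 0; i < 10; i++ {
			buildPayload(&buf, series)
		}
		b.ResetTimer()
		totalSize := 0
		for i := 0; i < b.N; i++ {
			totalSize += len(buildPayload(&buf, series))
		}
		// Custom per-op metric, like compressedSize/op above.
		b.ReportMetric(float64(totalSize)/float64(b.N), "payloadSize/op")
	}

	b.Run("2 series", func(b *testing.B) { bench(b, 2) })
	b.Run("1k series", func(b *testing.B) { bench(b, 1000) })
}
```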