From 7a862d265f035a0f9d527c36963a5610a0e165c5 Mon Sep 17 00:00:00 2001 From: Callum Styan Date: Mon, 20 Feb 2023 12:20:45 -0800 Subject: [PATCH] replace snappy encoding library Signed-off-by: Callum Styan --- storage/remote/queue_manager.go | 4 +- storage/remote/queue_manager_test.go | 61 ++++++++++++++++++++++++++++ 2 files changed, 63 insertions(+), 2 deletions(-) diff --git a/storage/remote/queue_manager.go b/storage/remote/queue_manager.go index a25c7d90c..a2541ae2f 100644 --- a/storage/remote/queue_manager.go +++ b/storage/remote/queue_manager.go @@ -24,7 +24,7 @@ import ( "github.com/go-kit/log" "github.com/go-kit/log/level" "github.com/gogo/protobuf/proto" - "github.com/golang/snappy" + reSnappy "github.com/klauspost/compress/snappy" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/model" "go.opentelemetry.io/otel" @@ -1643,6 +1643,6 @@ func buildWriteRequest(samples []prompb.TimeSeries, metadata []prompb.MetricMeta if buf != nil { buf = buf[0:cap(buf)] } - compressed := snappy.Encode(buf, pBuf.Bytes()) + compressed := reSnappy.Encode(buf, pBuf.Bytes()) return compressed, highest, nil } diff --git a/storage/remote/queue_manager_test.go b/storage/remote/queue_manager_test.go index a141df348..fa5e5beb4 100644 --- a/storage/remote/queue_manager_test.go +++ b/storage/remote/queue_manager_test.go @@ -17,6 +17,7 @@ import ( "context" "fmt" "math" + "math/rand" "os" "runtime/pprof" "sort" @@ -574,6 +575,31 @@ func createTimeseries(numSamples, numSeries int, extraLabels ...labels.Label) ([ return samples, series } +func createTimeseriesProto(numSamples, numSeries int, extraLabels ...labels.Label) []prompb.TimeSeries { + // samples := make([]record.RefSample, 0, numSamples) + series := make([]prompb.TimeSeries, 0, numSeries) + for i := 0; i < numSeries; i++ { + name := fmt.Sprintf("test_metric_%d", i) + + // for j := 0; j < numSamples; j++ { + sample := prompb.Sample{ + Value: float64(i), + Timestamp: int64(i), + } + // } + rand.Shuffle(len(extraLabels), func(i, j int) { + extraLabels[i], extraLabels[j] = extraLabels[j], extraLabels[i] + }) + series = append(series, prompb.TimeSeries{ + Labels: labelsToLabelsProto(labels.Labels{{Name: "__name__", Value: name}, extraLabels[0], extraLabels[1], extraLabels[2]}, nil), + Samples: []prompb.Sample{sample}, + // Ref: chunks.HeadSeriesRef(i), + // Labels: append(labels.Labels{{Name: "__name__", Value: name}}, extraLabels...), + }) + } + return series +} + func createExemplars(numExemplars, numSeries int) ([]record.RefExemplar, []record.RefSeries) { exemplars := make([]record.RefExemplar, 0, numExemplars) series := make([]record.RefSeries, 0, numSeries) @@ -1322,3 +1348,38 @@ func TestQueue_FlushAndShutdownDoesNotDeadlock(t *testing.T) { t.FailNow() } } + +func BenchmarkBuildWriteRequest(b *testing.B) { + // Extra labels to make a more realistic workload - taken from Kubernetes' embedded cAdvisor metrics. + extraLabels := labels.Labels{ + {Name: "kubernetes_io_arch", Value: "amd64"}, + {Name: "kubernetes_io_instance_type", Value: "c3.somesize"}, + {Name: "kubernetes_io_os", Value: "linux"}, + {Name: "container_name", Value: "some-name"}, + {Name: "failure_domain_kubernetes_io_region", Value: "somewhere-1"}, + {Name: "failure_domain_kubernetes_io_zone", Value: "somewhere-1b"}, + {Name: "id", Value: "/kubepods/burstable/pod6e91c467-e4c5-11e7-ace3-0a97ed59c75e/a3c8498918bd6866349fed5a6f8c643b77c91836427fb6327913276ebc6bde28"}, + {Name: "image", Value: "registry/organisation/name@sha256:dca3d877a80008b45d71d7edc4fd2e44c0c8c8e7102ba5cbabec63a374d1d506"}, + {Name: "instance", Value: "ip-111-11-1-11.ec2.internal"}, + {Name: "job", Value: "kubernetes-cadvisor"}, + {Name: "kubernetes_io_hostname", Value: "ip-111-11-1-11"}, + {Name: "monitor", Value: "prod"}, + {Name: "name", Value: "k8s_some-name_some-other-name-5j8s8_kube-system_6e91c467-e4c5-11e7-ace3-0a97ed59c75e_0"}, + {Name: "namespace", Value: "kube-system"}, + {Name: "pod_name", Value: "some-other-name-5j8s8"}, + } + series := createTimeseriesProto(1, 10000, extraLabels...) + + b.ResetTimer() + totalSize := 0 + for i := 0; i < b.N; i++ { + buf, _, _ := buildWriteRequest(series, nil, nil, nil) + totalSize += len(buf) + b.ReportMetric(float64(totalSize)/float64(b.N), "compressedSize/op") + + } + + // Do not include shutdown + b.StopTimer() +} +