diff --git a/go.mod b/go.mod index 4c38be5f86..c98bc97bec 100644 --- a/go.mod +++ b/go.mod @@ -32,6 +32,7 @@ require ( github.com/hetznercloud/hcloud-go v1.35.3 github.com/ionos-cloud/sdk-go/v6 v6.1.3 github.com/json-iterator/go v1.1.12 + github.com/klauspost/compress v1.15.12 github.com/kolo/xmlrpc v0.0.0-20220921171641-a4b6fa1dd06b github.com/linode/linodego v1.9.3 github.com/miekg/dns v1.1.50 diff --git a/go.sum b/go.sum index 82b29d916f..a047d63863 100644 --- a/go.sum +++ b/go.sum @@ -522,6 +522,8 @@ github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvW github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/klauspost/compress v1.13.6/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk= +github.com/klauspost/compress v1.15.12 h1:YClS/PImqYbn+UILDnqxQCZ3RehC9N318SU3kElDUEM= +github.com/klauspost/compress v1.15.12/go.mod h1:QPwzmACJjUTFsnSHH934V6woptycfrDDJnH7hvFVbGM= github.com/kolo/xmlrpc v0.0.0-20220921171641-a4b6fa1dd06b h1:udzkj9S/zlT5X367kqJis0QP7YMxobob6zhzq6Yre00= github.com/kolo/xmlrpc v0.0.0-20220921171641-a4b6fa1dd06b/go.mod h1:pcaDhQK0/NJZEvtCO0qQPPropqV0sJOJ6YW7X+9kRwM= github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= diff --git a/storage/remote/queue_manager.go b/storage/remote/queue_manager.go index e701cb94ba..7073131851 100644 --- a/storage/remote/queue_manager.go +++ b/storage/remote/queue_manager.go @@ -24,7 +24,7 @@ import ( "github.com/go-kit/log" "github.com/go-kit/log/level" "github.com/gogo/protobuf/proto" - "github.com/golang/snappy" + reSnappy "github.com/klauspost/compress/snappy" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/model" "go.opentelemetry.io/otel" @@ -1584,6 +1584,6 @@ func buildWriteRequest(samples []prompb.TimeSeries, metadata []prompb.MetricMeta if buf != nil { buf = buf[0:cap(buf)] } - compressed := snappy.Encode(buf, pBuf.Bytes()) + compressed := reSnappy.Encode(buf, pBuf.Bytes()) return compressed, highest, nil } diff --git a/storage/remote/queue_manager_test.go b/storage/remote/queue_manager_test.go index 86b4e45861..1e28c033a6 100644 --- a/storage/remote/queue_manager_test.go +++ b/storage/remote/queue_manager_test.go @@ -17,6 +17,7 @@ import ( "context" "fmt" "math" + "math/rand" "os" "runtime/pprof" "sort" @@ -556,6 +557,31 @@ func createTimeseries(numSamples, numSeries int, extraLabels ...labels.Label) ([ return samples, series } +func createTimeseriesProto(numSamples, numSeries int, extraLabels ...labels.Label) []prompb.TimeSeries { + // samples := make([]record.RefSample, 0, numSamples) + series := make([]prompb.TimeSeries, 0, numSeries) + for i := 0; i < numSeries; i++ { + name := fmt.Sprintf("test_metric_%d", i) + + // for j := 0; j < numSamples; j++ { + sample := prompb.Sample{ + Value: float64(i), + Timestamp: int64(i), + } + // } + rand.Shuffle(len(extraLabels), func(i, j int) { + extraLabels[i], extraLabels[j] = extraLabels[j], extraLabels[i] + }) + series = append(series, prompb.TimeSeries{ + Labels: labelsToLabelsProto(labels.Labels{{Name: "__name__", Value: name}, extraLabels[0], extraLabels[1], extraLabels[2]}, nil), + Samples: []prompb.Sample{sample}, + // Ref: chunks.HeadSeriesRef(i), + // Labels: append(labels.Labels{{Name: "__name__", Value: name}}, extraLabels...), + }) + } + return series +} + func createExemplars(numExemplars, numSeries int) ([]record.RefExemplar, []record.RefSeries) { exemplars := make([]record.RefExemplar, 0, numExemplars) series := make([]record.RefSeries, 0, numSeries) @@ -1255,3 +1281,38 @@ func TestQueue_FlushAndShutdownDoesNotDeadlock(t *testing.T) { t.FailNow() } } + +func BenchmarkBuildWriteRequest(b *testing.B) { + // Extra labels to make a more realistic workload - taken from Kubernetes' embedded cAdvisor metrics. + extraLabels := labels.Labels{ + {Name: "kubernetes_io_arch", Value: "amd64"}, + {Name: "kubernetes_io_instance_type", Value: "c3.somesize"}, + {Name: "kubernetes_io_os", Value: "linux"}, + {Name: "container_name", Value: "some-name"}, + {Name: "failure_domain_kubernetes_io_region", Value: "somewhere-1"}, + {Name: "failure_domain_kubernetes_io_zone", Value: "somewhere-1b"}, + {Name: "id", Value: "/kubepods/burstable/pod6e91c467-e4c5-11e7-ace3-0a97ed59c75e/a3c8498918bd6866349fed5a6f8c643b77c91836427fb6327913276ebc6bde28"}, + {Name: "image", Value: "registry/organisation/name@sha256:dca3d877a80008b45d71d7edc4fd2e44c0c8c8e7102ba5cbabec63a374d1d506"}, + {Name: "instance", Value: "ip-111-11-1-11.ec2.internal"}, + {Name: "job", Value: "kubernetes-cadvisor"}, + {Name: "kubernetes_io_hostname", Value: "ip-111-11-1-11"}, + {Name: "monitor", Value: "prod"}, + {Name: "name", Value: "k8s_some-name_some-other-name-5j8s8_kube-system_6e91c467-e4c5-11e7-ace3-0a97ed59c75e_0"}, + {Name: "namespace", Value: "kube-system"}, + {Name: "pod_name", Value: "some-other-name-5j8s8"}, + } + series := createTimeseriesProto(1, 10000, extraLabels...) + + b.ResetTimer() + totalSize := 0 + for i := 0; i < b.N; i++ { + buf, _, _ := buildWriteRequest(series, nil, nil, nil) + totalSize += len(buf) + b.ReportMetric(float64(totalSize)/float64(b.N), "compressedSize/op") + + } + + // Do not include shutdown + b.StopTimer() +} +