replace snappy encoding library

Signed-off-by: Callum Styan <callumstyan@gmail.com>
Signed-off-by: Nicolás Pazos <npazosmendez@gmail.com>
This commit is contained in:
Callum Styan 2023-02-20 12:20:45 -08:00 committed by Nicolás Pazos
parent b36ea85700
commit 4c4b9aa471
2 changed files with 63 additions and 2 deletions

View file

@ -24,7 +24,7 @@ import (
"github.com/go-kit/log" "github.com/go-kit/log"
"github.com/go-kit/log/level" "github.com/go-kit/log/level"
"github.com/gogo/protobuf/proto" "github.com/gogo/protobuf/proto"
"github.com/golang/snappy" reSnappy "github.com/klauspost/compress/snappy"
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model" "github.com/prometheus/common/model"
"go.opentelemetry.io/otel" "go.opentelemetry.io/otel"
@ -1643,6 +1643,6 @@ func buildWriteRequest(samples []prompb.TimeSeries, metadata []prompb.MetricMeta
if buf != nil { if buf != nil {
buf = buf[0:cap(buf)] buf = buf[0:cap(buf)]
} }
compressed := snappy.Encode(buf, pBuf.Bytes()) compressed := reSnappy.Encode(buf, pBuf.Bytes())
return compressed, highest, nil return compressed, highest, nil
} }

View file

@ -17,6 +17,7 @@ import (
"context" "context"
"fmt" "fmt"
"math" "math"
"math/rand"
"os" "os"
"runtime/pprof" "runtime/pprof"
"sort" "sort"
@ -574,6 +575,31 @@ func createTimeseries(numSamples, numSeries int, extraLabels ...labels.Label) ([
return samples, series return samples, series
} }
func createTimeseriesProto(numSamples, numSeries int, extraLabels ...labels.Label) []prompb.TimeSeries {
// samples := make([]record.RefSample, 0, numSamples)
series := make([]prompb.TimeSeries, 0, numSeries)
for i := 0; i < numSeries; i++ {
name := fmt.Sprintf("test_metric_%d", i)
// for j := 0; j < numSamples; j++ {
sample := prompb.Sample{
Value: float64(i),
Timestamp: int64(i),
}
// }
rand.Shuffle(len(extraLabels), func(i, j int) {
extraLabels[i], extraLabels[j] = extraLabels[j], extraLabels[i]
})
series = append(series, prompb.TimeSeries{
Labels: labelsToLabelsProto(labels.Labels{{Name: "__name__", Value: name}, extraLabels[0], extraLabels[1], extraLabels[2]}, nil),
Samples: []prompb.Sample{sample},
// Ref: chunks.HeadSeriesRef(i),
// Labels: append(labels.Labels{{Name: "__name__", Value: name}}, extraLabels...),
})
}
return series
}
func createExemplars(numExemplars, numSeries int) ([]record.RefExemplar, []record.RefSeries) { func createExemplars(numExemplars, numSeries int) ([]record.RefExemplar, []record.RefSeries) {
exemplars := make([]record.RefExemplar, 0, numExemplars) exemplars := make([]record.RefExemplar, 0, numExemplars)
series := make([]record.RefSeries, 0, numSeries) series := make([]record.RefSeries, 0, numSeries)
@ -1322,3 +1348,38 @@ func TestQueue_FlushAndShutdownDoesNotDeadlock(t *testing.T) {
t.FailNow() t.FailNow()
} }
} }
func BenchmarkBuildWriteRequest(b *testing.B) {
// Extra labels to make a more realistic workload - taken from Kubernetes' embedded cAdvisor metrics.
extraLabels := labels.Labels{
{Name: "kubernetes_io_arch", Value: "amd64"},
{Name: "kubernetes_io_instance_type", Value: "c3.somesize"},
{Name: "kubernetes_io_os", Value: "linux"},
{Name: "container_name", Value: "some-name"},
{Name: "failure_domain_kubernetes_io_region", Value: "somewhere-1"},
{Name: "failure_domain_kubernetes_io_zone", Value: "somewhere-1b"},
{Name: "id", Value: "/kubepods/burstable/pod6e91c467-e4c5-11e7-ace3-0a97ed59c75e/a3c8498918bd6866349fed5a6f8c643b77c91836427fb6327913276ebc6bde28"},
{Name: "image", Value: "registry/organisation/name@sha256:dca3d877a80008b45d71d7edc4fd2e44c0c8c8e7102ba5cbabec63a374d1d506"},
{Name: "instance", Value: "ip-111-11-1-11.ec2.internal"},
{Name: "job", Value: "kubernetes-cadvisor"},
{Name: "kubernetes_io_hostname", Value: "ip-111-11-1-11"},
{Name: "monitor", Value: "prod"},
{Name: "name", Value: "k8s_some-name_some-other-name-5j8s8_kube-system_6e91c467-e4c5-11e7-ace3-0a97ed59c75e_0"},
{Name: "namespace", Value: "kube-system"},
{Name: "pod_name", Value: "some-other-name-5j8s8"},
}
series := createTimeseriesProto(1, 10000, extraLabels...)
b.ResetTimer()
totalSize := 0
for i := 0; i < b.N; i++ {
buf, _, _ := buildWriteRequest(series, nil, nil, nil)
totalSize += len(buf)
b.ReportMetric(float64(totalSize)/float64(b.N), "compressedSize/op")
}
// Do not include shutdown
b.StopTimer()
}