diff --git a/go.mod b/go.mod
index 7adaacfd76..02ce127f14 100644
--- a/go.mod
+++ b/go.mod
@@ -9,6 +9,7 @@ require (
 	github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/network/armnetwork/v2 v2.2.1
 	github.com/alecthomas/kingpin/v2 v2.3.2
 	github.com/alecthomas/units v0.0.0-20211218093645-b94a6e3cc137
+	github.com/andybalholm/brotli v1.0.6
 	github.com/aws/aws-sdk-go v1.45.25
 	github.com/cespare/xxhash/v2 v2.2.0
 	github.com/dennwc/varint v1.0.0
diff --git a/go.sum b/go.sum
index 1a08b123cb..6dad01552b 100644
--- a/go.sum
+++ b/go.sum
@@ -73,6 +73,8 @@ github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRF
 github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho=
 github.com/alecthomas/units v0.0.0-20211218093645-b94a6e3cc137 h1:s6gZFSlWYmbqAuRjVTiNNhvNRfY2Wxp9nhfyel4rklc=
 github.com/alecthomas/units v0.0.0-20211218093645-b94a6e3cc137/go.mod h1:OMCwj8VM1Kc9e19TLln2VL61YJF0x1XFtfdL4JdbSyE=
+github.com/andybalholm/brotli v1.0.6 h1:Yf9fFpf49Zrxb9NlQaluyE92/+X7UVHlhMNJN2sxfOI=
+github.com/andybalholm/brotli v1.0.6/go.mod h1:fO7iG3H7G2nSZ7m0zPUDn85XEX2GTukHGRSepvi9Eig=
 github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY=
 github.com/apache/thrift v0.12.0/go.mod h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb+bacwQ=
 github.com/apache/thrift v0.13.0/go.mod h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb+bacwQ=
diff --git a/storage/remote/codec.go b/storage/remote/codec.go
index 80afcdd368..b62482482c 100644
--- a/storage/remote/codec.go
+++ b/storage/remote/codec.go
@@ -867,7 +867,10 @@ func DecodeWriteRequest(r io.Reader) (*prompb.WriteRequest, error) {
 		return nil, err
 	}
 
-	reqBuf, err := snappy.Decode(nil, compressed)
+	comp := createComp()
+	reqBuf, err := comp.Decompress(compressed)
+	comp.Close()
+
 	if err != nil {
 		return nil, err
 	}
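Review note: DecodeWriteRequest now decompresses through the same process-global createComp() factory that the send path uses, and nothing on the wire identifies the algorithm, so sender and receiver only interoperate when built with the same UseAlgorithm (defined in the new file below). A hypothetical sketch of that implied contract, using only names this diff introduces:

	// roundTrip illustrates the contract: with no algorithm tag on the wire,
	// Decompress only succeeds when the receiving process runs the same
	// UseAlgorithm as the sender.
	func roundTrip(payload []byte) ([]byte, error) {
		sender := createComp()   // e.g. the remote-write client
		receiver := createComp() // e.g. DecodeWriteRequest on the server
		defer sender.Close()
		defer receiver.Close()

		wire, err := sender.Compress(payload)
		if err != nil {
			return nil, err
		}
		return receiver.Decompress(wire)
	}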
diff --git a/storage/remote/compression.go b/storage/remote/compression.go
new file mode 100644
index 0000000000..369d992eba
--- /dev/null
+++ b/storage/remote/compression.go
@@ -0,0 +1,590 @@
+package remote
+
+import (
+	"bytes"
+	"compress/flate"
+	"compress/gzip"
+	"compress/lzw"
+	"io"
+
+	reS2 "github.com/klauspost/compress/s2"
+	reSnappy "github.com/klauspost/compress/snappy"
+	"github.com/klauspost/compress/zstd"
+	reZstd "github.com/klauspost/compress/zstd"
+
+	"github.com/andybalholm/brotli"
+	"github.com/golang/snappy"
+)
+
+// Compression is a swappable codec for remote-write payloads. Implementations
+// may keep reusable writer and scratch-buffer state between calls, so they are
+// not safe for concurrent use and must be released via Close.
+type Compression interface {
+	Compress(data []byte) ([]byte, error)
+	Decompress(data []byte) ([]byte, error)
+	Close() error
+}
+
+// hacky globals to easily tweak the compression algorithm and run some benchmarks
+type CompAlgorithm int
+
+var UseAlgorithm = SnappyAlt
+
+const (
+	Snappy CompAlgorithm = iota
+	SnappyAlt
+	S2
+	ZstdFast
+	ZstdDefault
+	ZstdBestComp
+	GzipFast
+	GzipComp
+	Lzw
+	FlateFast
+	FlateComp
+	BrotliFast
+	BrotliComp
+	BrotliDefault
+)
+
+var createComp func() Compression = func() Compression {
+	switch UseAlgorithm {
+	case Snappy:
+		return &snappyCompression{}
+	case SnappyAlt:
+		return &snappyAltCompression{}
+	case S2:
+		return &s2Compression{}
+	case ZstdDefault:
+		return &zstdCompression{level: zstd.SpeedDefault}
+	case ZstdFast:
+		return &zstdCompression{level: zstd.SpeedFastest}
+	case ZstdBestComp:
+		return &zstdCompression{level: zstd.SpeedBestCompression}
+	case GzipFast:
+		return &gzipCompression{level: gzip.BestSpeed}
+	case GzipComp:
+		return &gzipCompression{level: gzip.BestCompression}
+	case Lzw:
+		return &lzwCompression{}
+	case FlateFast:
+		return &flateCompression{level: flate.BestSpeed}
+	case FlateComp:
+		return &flateCompression{level: flate.BestCompression}
+	case BrotliFast:
+		return &brotliCompression{quality: brotli.BestSpeed}
+	case BrotliDefault:
+		return &brotliCompression{quality: brotli.DefaultCompression}
+	case BrotliComp:
+		return &brotliCompression{quality: brotli.BestCompression}
+	default:
+		panic("unknown compression algorithm")
+	}
+}
+
+type noopCompression struct{}
+
+func (n *noopCompression) Compress(data []byte) ([]byte, error) {
+	return data, nil
+}
+
+func (n *noopCompression) Decompress(data []byte) ([]byte, error) {
+	return data, nil
+}
+
+func (n *noopCompression) Close() error {
+	return nil
+}
+
+type snappyCompression struct {
+	buf []byte
+}
+
+func (s *snappyCompression) Compress(data []byte) ([]byte, error) {
+	// snappy uses len() to decide whether it needs to allocate a new slice,
+	// so hand it the scratch buffer at full capacity.
+	s.buf = s.buf[0:cap(s.buf)]
+	compressed := snappy.Encode(s.buf, data)
+	if n := snappy.MaxEncodedLen(len(data)); n > len(s.buf) {
+		// Grow the buffer for the next call.
+		s.buf = make([]byte, n)
+	}
+	return compressed, nil
+}
+
+func (s *snappyCompression) Decompress(data []byte) ([]byte, error) {
+	uncompressed, err := snappy.Decode(nil, data)
+	return uncompressed, err
+}
+
+func (s *snappyCompression) Close() error {
+	return nil
+}
+
+type snappyAltCompression struct {
+	buf []byte
+}
+
+func (s *snappyAltCompression) Compress(data []byte) ([]byte, error) {
+	s.buf = s.buf[0:cap(s.buf)]
+	res := reSnappy.Encode(s.buf, data)
+	if n := reSnappy.MaxEncodedLen(len(data)); n > len(s.buf) {
+		s.buf = make([]byte, n)
+	}
+	return res, nil
+}
+
+func (s *snappyAltCompression) Decompress(data []byte) ([]byte, error) {
+	uncompressed, err := reSnappy.Decode(nil, data)
+	return uncompressed, err
+}
+
+func (s *snappyAltCompression) Close() error {
+	return nil
+}
+
+type s2Compression struct {
+	buf []byte
+}
+
+func (s *s2Compression) Compress(data []byte) ([]byte, error) {
+	res := reS2.Encode(s.buf, data)
+	if n := reS2.MaxEncodedLen(len(data)); n > len(s.buf) {
+		s.buf = make([]byte, n)
+	}
+	return res, nil
+}
+
+func (s *s2Compression) Decompress(data []byte) ([]byte, error) {
+	uncompressed, err := reS2.Decode(nil, data)
+	return uncompressed, err
+}
+
+func (s *s2Compression) Close() error {
+	return nil
+}
+
+type zstdCompression struct {
+	level zstd.EncoderLevel
+	buf   []byte
+	w     *reZstd.Encoder
+}
+
+func (z *zstdCompression) Compress(data []byte) ([]byte, error) {
+	if z.w == nil {
+		// TODO: should be initialized on creation
+		var err error
+		z.w, err = reZstd.NewWriter(nil, reZstd.WithEncoderLevel(reZstd.EncoderLevel(z.level)))
+		if err != nil {
+			return nil, err
+		}
+	}
+
+	z.buf = z.buf[:0]
+	writer := bytes.NewBuffer(z.buf)
+	z.w.Reset(writer)
+	if _, err := z.w.Write(data); err != nil {
+		return nil, err
+	}
+	if err := z.w.Close(); err != nil {
+		return nil, err
+	}
+	res := writer.Bytes()
+	if len(res) > cap(z.buf) {
+		z.buf = res
+	}
+	return res, nil
+}
+
+func (z *zstdCompression) Decompress(data []byte) ([]byte, error) {
+	reader := bytes.NewReader(data)
+	decoder, err := reZstd.NewReader(reader)
+	if err != nil {
+		return nil, err
+	}
+	defer decoder.Close()
+	return io.ReadAll(decoder)
+}
+
+func (z *zstdCompression) Close() error {
+	if z.w != nil {
+		return z.w.Close()
+	}
+	return nil
+}
+
+type gzipCompression struct {
+	level int
+	buf   []byte
+	w     *gzip.Writer
+}
+
+func (g *gzipCompression) Compress(data []byte) ([]byte, error) {
+	if g.w == nil {
+		var err error
+		g.w, err = gzip.NewWriterLevel(nil, g.level)
+		if err != nil {
+			return nil, err
+		}
+	}
+	g.buf = g.buf[:0]
+	buf := bytes.NewBuffer(g.buf)
+	g.w.Reset(buf)
+	if _, err := g.w.Write(data); err != nil {
+		return nil, err
+	}
+	if err := g.w.Close(); err != nil {
+		return nil, err
+	}
+	if len(buf.Bytes()) > cap(g.buf) {
+		g.buf = buf.Bytes()
+	}
+	return buf.Bytes(), nil
+}
+
+func (g *gzipCompression) Decompress(data []byte) ([]byte, error) {
+	r := bytes.NewReader(data)
+	gzReader, err := gzip.NewReader(r)
+	if err != nil {
+		return nil, err
+	}
+	defer gzReader.Close()
+	return io.ReadAll(gzReader)
+}
+
+func (g *gzipCompression) Close() error {
+	return nil
+}
+
+type lzwCompression struct {
+	buf []byte
+	w   *lzw.Writer
+}
+
+func (l *lzwCompression) Compress(data []byte) ([]byte, error) {
+	if l.w == nil {
+		l.w = lzw.NewWriter(nil, lzw.LSB, 8).(*lzw.Writer)
+	}
+	// Truncate the scratch buffer so leftovers from the previous call
+	// cannot leak into this payload.
+	l.buf = l.buf[:0]
+	compressed := bytes.NewBuffer(l.buf)
+	l.w.Reset(compressed, lzw.LSB, 8)
+	if _, err := l.w.Write(data); err != nil {
+		return nil, err
+	}
+	if err := l.w.Close(); err != nil {
+		return nil, err
+	}
+	if len(compressed.Bytes()) > cap(l.buf) {
+		l.buf = compressed.Bytes()
+	}
+	return compressed.Bytes(), nil
+}
+
+func (l *lzwCompression) Decompress(data []byte) ([]byte, error) {
+	reader := bytes.NewReader(data)
+	r := lzw.NewReader(reader, lzw.LSB, 8)
+	defer r.Close()
+	return io.ReadAll(r)
+}
+
+func (l *lzwCompression) Close() error {
+	return nil
+}
+
+type flateCompression struct {
+	level int
+	buf   []byte
+	w     *flate.Writer
+}
+
+func (f *flateCompression) Compress(data []byte) ([]byte, error) {
+	if f.w == nil {
+		var err error
+		f.w, err = flate.NewWriter(nil, f.level)
+		if err != nil {
+			return nil, err
+		}
+	}
+	f.buf = f.buf[:0]
+	compressed := bytes.NewBuffer(f.buf)
+	f.w.Reset(compressed)
+	if _, err := f.w.Write(data); err != nil {
+		return nil, err
+	}
+	if err := f.w.Close(); err != nil {
+		return nil, err
+	}
+	if len(compressed.Bytes()) > cap(f.buf) {
+		f.buf = compressed.Bytes()
+	}
+	return compressed.Bytes(), nil
+}
+
+func (f *flateCompression) Decompress(data []byte) ([]byte, error) {
+	reader := bytes.NewReader(data)
+	r := flate.NewReader(reader)
+	defer r.Close()
+	return io.ReadAll(r)
+}
+
+func (f *flateCompression) Close() error {
+	if f.w != nil {
+		return f.w.Close()
+	}
+	return nil
+}
+
+type brotliCompression struct {
+	quality int
+	buf     []byte
+	w       *brotli.Writer
+}
+
+func (b *brotliCompression) Compress(data []byte) ([]byte, error) {
+	if b.w == nil {
+		b.w = brotli.NewWriterLevel(nil, b.quality)
+	}
+	b.buf = b.buf[:0]
+	compressed := bytes.NewBuffer(b.buf)
+	b.w.Reset(compressed)
+	_, err := b.w.Write(data)
+	if err != nil {
+		return nil, err
+	}
+	err = b.w.Flush()
+	if err != nil {
+		return nil, err
+	}
+	if len(compressed.Bytes()) > cap(b.buf) {
+		b.buf = compressed.Bytes()
+	}
+	return compressed.Bytes(), nil
+}
+
+func (b *brotliCompression) Decompress(data []byte) ([]byte, error) {
+	reader := bytes.NewReader(data)
+	r := brotli.NewReader(reader)
+	return io.ReadAll(r)
+}
+
+func (b *brotliCompression) Close() error {
+	return nil
+}
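Since UseAlgorithm and createComp are deliberately package-level ("hacky globals", as the comment above admits), rerouting the entire write path through a different codec is a one-line change. A hypothetical sweep for ad-hoc benchmarking, using only names introduced in this file:

	// sweepAlgorithms is illustrative only: createComp reads the package-level
	// UseAlgorithm on every call, so flipping the variable switches codecs.
	func sweepAlgorithms(in []byte) error {
		for _, alg := range []CompAlgorithm{Snappy, S2, ZstdFast, BrotliFast} {
			UseAlgorithm = alg
			comp := createComp()
			out, err := comp.Compress(in)
			if err != nil {
				comp.Close()
				return err
			}
			_ = out // e.g. record len(out) to compare ratios
			if err := comp.Close(); err != nil {
				return err
			}
		}
		return nil
	}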
diff --git a/storage/remote/queue_manager.go b/storage/remote/queue_manager.go
index 9ca4ebcbe7..3a4c5647f1 100644
--- a/storage/remote/queue_manager.go
+++ b/storage/remote/queue_manager.go
@@ -26,7 +26,7 @@ import (
 	"github.com/go-kit/log"
 	"github.com/go-kit/log/level"
 	"github.com/gogo/protobuf/proto"
-	"github.com/golang/snappy"
+
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/prometheus/common/model"
 	"go.opentelemetry.io/otel"
@@ -545,7 +545,9 @@ func (t *QueueManager) AppendMetadata(ctx context.Context, metadata []scrape.Met
 
 func (t *QueueManager) sendMetadataWithBackoff(ctx context.Context, metadata []prompb.MetricMetadata, pBuf *proto.Buffer) error {
 	// Build the WriteRequest with no samples.
-	req, _, err := buildWriteRequest(nil, metadata, pBuf, nil)
+	comp := createComp()
+	req, _, err := buildWriteRequest(nil, metadata, pBuf, comp)
+	comp.Close()
 	if err != nil {
 		return err
 	}
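The runShard hunks below give each shard exactly one Compression instance for its whole lifetime rather than one per request: the stateful implementations above reuse a writer and scratch buffer and are not safe for concurrent use, so per-goroutine ownership is the assumption being encoded. A distilled sketch of the pattern (hypothetical, not the literal code):

	// One codec per shard goroutine, reused across batches, never shared.
	func perShardLoop(batches <-chan []prompb.TimeSeries) {
		comp := createComp()
		defer comp.Close()
		for batch := range batches {
			// nil pBuf is assumed to be allocated internally, as the
			// reduced variant of buildWriteRequest below does.
			if req, _, err := buildWriteRequest(batch, nil, nil, comp); err == nil {
				_ = req // hand off to sendSamplesWithBackoff in the real code
			}
		}
	}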
@@ -1367,7 +1369,9 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue *queue) {
 		pBuf    = proto.NewBuffer(nil)
 		pBufRaw []byte
 		buf     []byte
+		comp    = createComp()
 	)
+	defer comp.Close()
 	if s.qm.sendExemplars {
 		max += int(float64(max) * 0.1)
 	}
@@ -1442,7 +1446,6 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue *queue) {
 					s.sendMinLenSamples(ctx, pendingMinLenData[:n], symbolTable.LabelsData(), nPendingSamples, nPendingExemplars, nPendingHistograms, pBuf, &buf)
 					symbolTable.clear()
 				}
-
 				queue.ReturnForReuse(batch)
 
 				stop()
@@ -1455,7 +1458,7 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue *queue) {
 				case Base1:
 					nPendingSamples, nPendingExemplars, nPendingHistograms := populateTimeSeries(batch, pendingData, s.qm.sendExemplars, s.qm.sendNativeHistograms)
 					n := nPendingSamples + nPendingExemplars + nPendingHistograms
-					s.sendSamples(ctx, pendingData[:n], nPendingSamples, nPendingExemplars, nPendingHistograms, pBuf, &buf)
+					s.sendSamples(ctx, pendingData[:n], nPendingSamples, nPendingExemplars, nPendingHistograms, pBuf, comp)
 					level.Debug(s.qm.logger).Log("msg", "runShard timer ticked, sending buffered data", "samples", nPendingSamples,
 						"exemplars", nPendingExemplars, "shard", shardNum, "histograms", nPendingHistograms)
 				case Min32Optimized:
@@ -1519,13 +1522,12 @@ func populateTimeSeries(batch []timeSeries, pendingData []prompb.TimeSeries, sen
 	}
 	return nPendingSamples, nPendingExemplars, nPendingHistograms
 }
-
-func (s *shards) sendSamples(ctx context.Context, samples []prompb.TimeSeries, sampleCount, exemplarCount, histogramCount int, pBuf *proto.Buffer, buf *[]byte) {
+func (s *shards) sendSamples(ctx context.Context, samples []prompb.TimeSeries, sampleCount, exemplarCount, histogramCount int, pBuf *proto.Buffer, comp Compression) {
 	begin := time.Now()
 	// Build the WriteRequest with no metadata.
 	// Failing to build the write request is non-recoverable, since it will
 	// only error if marshaling the proto to bytes fails.
-	req, highest, err := buildWriteRequest(samples, nil, pBuf, buf)
+	req, highest, err := buildWriteRequest(samples, nil, pBuf, comp)
 	if err == nil {
 		err = s.sendSamplesWithBackoff(ctx, req, sampleCount, exemplarCount, histogramCount, highest)
 	}
@@ -1768,7 +1770,15 @@
 	}
 }
 
-func buildWriteRequest(samples []prompb.TimeSeries, metadata []prompb.MetricMetadata, pBuf *proto.Buffer, buf *[]byte) ([]byte, int64, error) {
+func buildWriteRequest(samples []prompb.TimeSeries, metadata []prompb.MetricMetadata, pBuf *proto.Buffer, comp Compression) ([]byte, int64, error) {
+	return buildWriteRequestWithCompression(samples, metadata, pBuf, comp)
+}
+
+func buildWriteRequestWithCompression(samples []prompb.TimeSeries,
+	metadata []prompb.MetricMetadata,
+	pBuf *proto.Buffer,
+	compressor Compression,
+) ([]byte, int64, error) {
 	var highest int64
 	for _, ts := range samples {
 		// At the moment we only ever append a TimeSeries with a single sample or exemplar in it.
@@ -1810,8 +1820,7 @@ func buildWriteRequest(samples []prompb.TimeSeries, metadata []prompb.MetricMeta
 		// grow the buffer for the next time
 		*buf = make([]byte, n)
 	}
-
-	return compressed, highest, nil
+	return compressed, highest, err
 }
 
 type offLenPair struct {
@@ -1819,6 +1828,48 @@ type offLenPair struct {
 	Off uint32
 	Len uint32
 }
 
+func buildReducedWriteRequest(samples []prompb.ReducedTimeSeries, labels map[uint64]string, pBuf *proto.Buffer, comp Compression) ([]byte, int64, error) {
+	return buildReducedWriteRequestWithCompression(samples, labels, pBuf, comp)
+}
+
+func buildReducedWriteRequestWithCompression(samples []prompb.ReducedTimeSeries,
+	labels map[uint64]string,
+	pBuf *proto.Buffer,
+	compress Compression,
+) ([]byte, int64, error) {
+	var highest int64
+	for _, ts := range samples {
+		// At the moment we only ever append a TimeSeries with a single sample or exemplar in it.
+		if len(ts.Samples) > 0 && ts.Samples[0].Timestamp > highest {
+			highest = ts.Samples[0].Timestamp
+		}
+		if len(ts.Exemplars) > 0 && ts.Exemplars[0].Timestamp > highest {
+			highest = ts.Exemplars[0].Timestamp
+		}
+		if len(ts.Histograms) > 0 && ts.Histograms[0].Timestamp > highest {
+			highest = ts.Histograms[0].Timestamp
+		}
+	}
+
+	req := &prompb.WriteRequestWithRefs{
+		StringSymbolTable: labels,
+		Timeseries:        samples,
+	}
+
+	if pBuf == nil {
+		pBuf = proto.NewBuffer(nil) // For convenience in tests. Not efficient.
+	} else {
+		pBuf.Reset()
+	}
+	err := pBuf.Marshal(req)
+	if err != nil {
+		return nil, 0, err
+	}
+
+	compressed, err := compress.Compress(pBuf.Bytes())
+	return compressed, highest, err
+}
+
 type rwSymbolTable struct {
 	symbols    []byte
 	symbolsMap map[string]offLenPair
diff --git a/storage/remote/queue_manager_test.go b/storage/remote/queue_manager_test.go
index dbd53a69d2..80e6304ae2 100644
--- a/storage/remote/queue_manager_test.go
+++ b/storage/remote/queue_manager_test.go
@@ -14,6 +14,9 @@
 package remote
 
 import (
+	"bytes"
+	"compress/flate"
+	"compress/gzip"
 	"context"
 	"fmt"
 	"math"
@@ -30,6 +33,7 @@ import (
 	"github.com/go-kit/log"
 	"github.com/gogo/protobuf/proto"
 	"github.com/golang/snappy"
+	"github.com/klauspost/compress/zstd"
 	"github.com/prometheus/client_golang/prometheus"
 	client_testutil "github.com/prometheus/client_golang/prometheus/testutil"
 	"github.com/prometheus/common/model"
@@ -1447,7 +1451,8 @@ func createDummyTimeSeries(instances int) []timeSeries {
 
 func BenchmarkBuildWriteRequest(b *testing.B) {
 	bench := func(b *testing.B, batch []timeSeries) {
-		buff := make([]byte, 0)
+
+		comp := &snappyCompression{}
 		seriesBuff := make([]prompb.TimeSeries, len(batch))
 		for i := range seriesBuff {
 			seriesBuff[i].Samples = []prompb.Sample{{}}
@@ -1458,14 +1463,14 @@ func BenchmarkBuildWriteRequest(b *testing.B) {
 		// Warmup buffers
 		for i := 0; i < 10; i++ {
 			populateTimeSeries(batch, seriesBuff, true, true)
-			buildWriteRequest(seriesBuff, nil, pBuf, &buff)
+			buildWriteRequest(seriesBuff, nil, pBuf, comp)
 		}
 
 		b.ResetTimer()
 		totalSize := 0
 		for i := 0; i < b.N; i++ {
 			populateTimeSeries(batch, seriesBuff, true, true)
-			req, _, err := buildWriteRequest(seriesBuff, nil, pBuf, &buff)
+			req, _, err := buildWriteRequest(seriesBuff, nil, pBuf, comp)
 			if err != nil {
 				b.Fatal(err)
 			}
@@ -1492,6 +1497,7 @@
 }
 
 func BenchmarkBuildMinimizedWriteRequest(b *testing.B) {
+
 	type testcase struct {
 		batch []timeSeries
 	}
@@ -1502,7 +1508,6 @@ func BenchmarkBuildMinimizedWriteRequest(b *testing.B) {
 	}
 	for _, tc := range testCases {
 		symbolTable := newRwSymbolTable()
-		buff := make([]byte, 0)
 		seriesBuff := make([]prompb.MinimizedTimeSeries, len(tc.batch))
 		for i := range seriesBuff {
 			seriesBuff[i].Samples = []prompb.Sample{{}}
@@ -1532,3 +1537,154 @@ func BenchmarkBuildMinimizedWriteRequest(b *testing.B) {
 		})
 	}
 }
+
+func makeUncompressedReducedWriteRequestBenchData(b *testing.B) []byte {
+	data := createDummyTimeSeries(1000)
+	pool := newLookupPool()
+	pBuf := proto.NewBuffer(nil)
+	seriesBuff := make([]prompb.ReducedTimeSeries, len(data))
+	for i := range seriesBuff {
+		seriesBuff[i].Samples = []prompb.Sample{{}}
+		seriesBuff[i].Exemplars = []prompb.ExemplarRef{{}}
+	}
+
+	populateReducedTimeSeries(pool, data, seriesBuff, true, true)
+	res, _, err := buildReducedWriteRequestWithCompression(seriesBuff, pool.getTable(), pBuf, &noopCompression{})
+	if err != nil {
+		b.Fatal(err)
+	}
+	return res
+}
+
+func makeUncompressedWriteRequestBenchData(b *testing.B) []byte {
+	data := createDummyTimeSeries(1000)
+	seriesBuff := make([]prompb.TimeSeries, len(data))
+	for i := range seriesBuff {
+		seriesBuff[i].Samples = []prompb.Sample{{}}
+		seriesBuff[i].Exemplars = []prompb.Exemplar{{}}
+	}
+	pBuf := proto.NewBuffer(nil)
+
+	populateTimeSeries(data, seriesBuff, true, true)
+	res, _, err := buildWriteRequestWithCompression(seriesBuff, nil, pBuf, &noopCompression{})
+	if err != nil {
+		b.Fatal(err)
+	}
+	return res
+}
+
+func BenchmarkCompressWriteRequest(b *testing.B) {
+	uncompV1 := makeUncompressedWriteRequestBenchData(b)
+	uncompV11 := makeUncompressedReducedWriteRequestBenchData(b)
+
+	bench := func(b *testing.B, data []byte, comp Compression) {
+		b.ResetTimer()
+		totalSize := 0
+		var res []byte
+		var err error
+		for i := 0; i < b.N; i++ {
+			res, err = comp.Compress(data)
+			if err != nil {
+				b.Fatal(err)
+			}
+			totalSize += len(res)
+			b.ReportMetric(float64(totalSize)/float64(b.N), "compressedSize/op")
+		}
+		b.StopTimer()
+		// sanity check
+		res, err = comp.Decompress(res)
+		if err != nil {
+			b.Fatal(err)
+		}
+		if !bytes.Equal(res, data) {
+			b.Fatalf("decompressed data doesn't match original")
+		}
+	}
+
+	cases := []struct {
+		name string
+		data []byte
+		comp Compression
+	}{
+		{"v1-go-snappy", uncompV1, &snappyCompression{}},
+		{"v1-snappy", uncompV1, &snappyAltCompression{}},
+		{"v1-s2", uncompV1, &s2Compression{}},
+		{"v1-ZstdFastest", uncompV1, &zstdCompression{level: zstd.SpeedFastest}},
+		{"v1-ZstdSpeedDef", uncompV1, &zstdCompression{level: zstd.SpeedDefault}},
+		{"v1-ZstdBestComp", uncompV1, &zstdCompression{level: zstd.SpeedBestCompression}},
+		{"v1-GzipBestComp", uncompV1, &gzipCompression{level: gzip.BestCompression}},
+		{"v1-GzipBestSpeed", uncompV1, &gzipCompression{level: gzip.BestSpeed}},
+		{"v1-GzipDefault", uncompV1, &gzipCompression{level: gzip.DefaultCompression}},
+		{"v1-Lzw", uncompV1, &lzwCompression{}},
+		{"v1-FlateBestComp", uncompV1, &flateCompression{level: flate.BestCompression}},
+		{"v1-FlateBestSpeed", uncompV1, &flateCompression{level: flate.BestSpeed}},
+		{"v1-Brotli-1", uncompV1, &brotliCompression{quality: 1}},
+		{"v1-Brotli-11", uncompV1, &brotliCompression{quality: 11}},
+		{"v1-Brotli-5", uncompV1, &brotliCompression{quality: 5}},
+
+		{"v1.1-go-snappy", uncompV11, &snappyCompression{}},
+		{"v1.1-snappy", uncompV11, &snappyAltCompression{}},
+		{"v1.1-s2", uncompV11, &s2Compression{}},
+		{"v1.1-ZstdFastest", uncompV11, &zstdCompression{level: zstd.SpeedFastest}},
+		{"v1.1-ZstdSpeedDef", uncompV11, &zstdCompression{level: zstd.SpeedDefault}},
+		{"v1.1-ZstdBestComp", uncompV11, &zstdCompression{level: zstd.SpeedBestCompression}},
+		{"v1.1-GzipBestComp", uncompV11, &gzipCompression{level: gzip.BestCompression}},
+		{"v1.1-GzipBestSpeed", uncompV11, &gzipCompression{level: gzip.BestSpeed}},
+		{"v1.1-GzipDefault", uncompV11, &gzipCompression{level: gzip.DefaultCompression}},
+		{"v1.1-Lzw", uncompV11, &lzwCompression{}},
+		{"v1.1-FlateBestComp", uncompV11, &flateCompression{level: flate.BestCompression}},
+		{"v1.1-FlateBestSpeed", uncompV11, &flateCompression{level: flate.BestSpeed}},
+		{"v1.1-Brotli-1", uncompV11, &brotliCompression{quality: 1}},
+		{"v1.1-Brotli-11", uncompV11, &brotliCompression{quality: 11}},
+		{"v1.1-Brotli-5", uncompV11, &brotliCompression{quality: 5}},
+	}
+
+	// Warmup buffers
+	for _, c := range cases {
+		bench(b, c.data, c.comp)
+	}
+
+	for _, c := range cases {
+		b.Run(c.name, func(b *testing.B) {
+			bench(b, c.data, c.comp)
+		})
+	}
+}
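With the table-driven benchmark in place, sweeping every codec over both request encodings is a single command; a typical invocation (standard go test flags, exact selection a matter of taste) would be:

	go test ./storage/remote/ -run '^$' -bench BenchmarkCompressWriteRequest -benchmem

The custom compressedSize/op metric the benchmark reports is what allows comparing compression ratios alongside the usual ns/op and allocation counts.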