From 8a4e5769fc4f94132dd4a1098c16e5762420268b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Nicol=C3=A1s=20Pazos?= Date: Wed, 1 Nov 2023 19:36:43 -0300 Subject: [PATCH] Remove gzip option and various alloc optimizations --- storage/remote/codec.go | 3 +- storage/remote/compression.go | 386 ++++----------------------- storage/remote/compression_test.go | 34 +-- storage/remote/queue_manager_test.go | 10 +- 4 files changed, 69 insertions(+), 364 deletions(-) diff --git a/storage/remote/codec.go b/storage/remote/codec.go index d9770ad1cd..61beda193f 100644 --- a/storage/remote/codec.go +++ b/storage/remote/codec.go @@ -867,7 +867,8 @@ func DecodeWriteRequest(r io.Reader) (*prompb.WriteRequest, error) { return nil, err } - comp := createComp() + comp := GetPooledComp() + defer PutPooledComp(comp) reqBuf, err := comp.Decompress(compressed) if err != nil { diff --git a/storage/remote/compression.go b/storage/remote/compression.go index ac897cc90d..4895208e20 100644 --- a/storage/remote/compression.go +++ b/storage/remote/compression.go @@ -3,7 +3,6 @@ package remote import ( "bytes" "compress/flate" - "compress/gzip" "compress/lzw" "io" "sync" @@ -34,8 +33,6 @@ const ( ZstdFast ZstdDefault ZstdBestComp - GzipFast - GzipComp Lzw FlateFast FlateComp @@ -73,10 +70,6 @@ var createComp func() Compression = func() Compression { return &zstdCompression{level: zstd.SpeedFastest} case ZstdBestComp: return &zstdCompression{level: zstd.SpeedBestCompression} - case GzipFast: - return &gzipCompression{level: gzip.BestSpeed} - case GzipComp: - return &gzipCompression{level: gzip.BestCompression} case Lzw: return &lzwCompression{} case FlateFast: @@ -170,32 +163,15 @@ func (s *s2Compression) Decompress(data []byte) ([]byte, error) { type zstdCompression struct { level zstd.EncoderLevel buf []byte - w *reZstd.Encoder } func (z *zstdCompression) Compress(data []byte) ([]byte, error) { - var err error - if z.w == nil { - // TODO: should be initialized on creation - z.w, err = 
reZstd.NewWriter(nil, reZstd.WithEncoderLevel(reZstd.EncoderLevel(z.level))) - } + w, err := reZstd.NewWriter(nil, reZstd.WithEncoderLevel(reZstd.EncoderLevel(z.level))) if err != nil { return nil, err } - - z.buf = z.buf[:0] - writer := bytes.NewBuffer(z.buf) - if err != nil { - return nil, err - } - z.w.Reset(writer) - - z.w.Write(data) - err = z.w.Close() - if err != nil { - return nil, err - } - res := writer.Bytes() + defer w.Close() + res := w.EncodeAll(data, z.buf[:0]) if len(res) > cap(z.buf) { z.buf = res } @@ -218,75 +194,18 @@ func (z *zstdCompression) Decompress(data []byte) ([]byte, error) { return buf, nil } -type gzipCompression struct { - level int - buf []byte - w *gzip.Writer -} - -func (g *gzipCompression) Compress(data []byte) ([]byte, error) { - var err error - if g.w == nil { - g.w, err = gzip.NewWriterLevel(nil, g.level) - } - if err != nil { - return nil, err - } - g.buf = g.buf[:0] - buf := bytes.NewBuffer(g.buf) - g.w.Reset(buf) - _, err = g.w.Write(data) - if err != nil { - return nil, err - } - err = g.w.Close() - if err != nil { - return nil, err - } - if len(buf.Bytes()) > cap(g.buf) { - g.buf = buf.Bytes() - } - return buf.Bytes(), nil -} - -func (g *gzipCompression) Decompress(data []byte) ([]byte, error) { - r := bytes.NewReader(data) - gzReader, err := gzip.NewReader(r) - if err != nil { - return nil, err - } - defer gzReader.Close() - decompressedData, err := io.ReadAll(gzReader) - if err != nil { - return nil, err - } - return decompressedData, nil - - // TODO: debug this - // r := bytes.NewReader(data) - // var err error - // if g.r == nil { - // g.r, err = gzip.NewReader(r) - // if err != nil { - // return nil, err - // } - // } - // g.r.Reset(r) - // defer g.r.Close() - // return io.ReadAll(g.r) -} - type lzwCompression struct { - buf []byte w *lzw.Writer + r *lzw.Reader + buf bytes.Buffer } func (l *lzwCompression) Compress(data []byte) ([]byte, error) { if l.w == nil { l.w = lzw.NewWriter(nil, lzw.LSB, 8).(*lzw.Writer) } - compressed := 
bytes.NewBuffer(l.buf) - l.w.Reset(compressed, lzw.LSB, 8) + l.buf.Reset() + l.w.Reset(&l.buf, lzw.LSB, 8) _, err := l.w.Write(data) if err != nil { return nil, err @@ -295,40 +214,39 @@ func (l *lzwCompression) Compress(data []byte) ([]byte, error) { if err != nil { return nil, err } - if len(compressed.Bytes()) > cap(l.buf) { - l.buf = compressed.Bytes() - } - return compressed.Bytes(), nil + return l.buf.Bytes(), nil } func (l *lzwCompression) Decompress(data []byte) ([]byte, error) { - reader := bytes.NewReader(data) - r := lzw.NewReader(reader, lzw.LSB, 8) - defer r.Close() - return io.ReadAll(r) + if l.r == nil { + l.r = lzw.NewReader(nil, lzw.LSB, 8).(*lzw.Reader) + } + l.r.Reset(bytes.NewReader(data), lzw.LSB, 8) + l.buf.Reset() + _, err := io.Copy(&l.buf, l.r) + if err != nil { + return nil, err + } + return l.buf.Bytes(), nil } type flateCompression struct { level int - buf []byte + buf bytes.Buffer w *flate.Writer + r io.ReadCloser } func (f *flateCompression) Compress(data []byte) ([]byte, error) { var err error if f.w == nil { f.w, err = flate.NewWriter(nil, f.level) + if err != nil { + return nil, err + } } - if err != nil { - return nil, err - } - - if err != nil { - return nil, err - } - f.buf = f.buf[:0] - compressed := bytes.NewBuffer(f.buf) - f.w.Reset(compressed) + f.buf.Reset() + f.w.Reset(&f.buf) _, err = f.w.Write(data) if err != nil { return nil, err @@ -337,22 +255,26 @@ func (f *flateCompression) Compress(data []byte) ([]byte, error) { if err != nil { return nil, err } - if len(compressed.Bytes()) > cap(f.buf) { - f.buf = compressed.Bytes() - } - return compressed.Bytes(), nil + return f.buf.Bytes(), nil } func (f *flateCompression) Decompress(data []byte) ([]byte, error) { - reader := bytes.NewReader(data) - r := flate.NewReader(reader) - defer r.Close() - return io.ReadAll(r) + if f.r == nil { + f.r = flate.NewReader(nil) + } + f.r.(flate.Resetter).Reset(bytes.NewReader(data), nil) + defer f.r.Close() + f.buf.Reset() + _, err := 
io.Copy(&f.buf, f.r) + if err != nil { + return nil, err + } + return f.buf.Bytes(), nil } type brotliCompression struct { quality int - buf []byte + buf bytes.Buffer w *brotli.Writer r *brotli.Reader } @@ -361,10 +283,8 @@ func (b *brotliCompression) Compress(data []byte) ([]byte, error) { if b.w == nil { b.w = brotli.NewWriterLevel(nil, b.quality) } - - b.buf = (b.buf)[:0] - compressed := bytes.NewBuffer(b.buf) - b.w.Reset(compressed) + b.buf.Reset() + b.w.Reset(&b.buf) _, err := b.w.Write(data) if err != nil { return nil, err @@ -373,230 +293,18 @@ func (b *brotliCompression) Compress(data []byte) ([]byte, error) { if err != nil { return nil, err } - if len(compressed.Bytes()) > cap(b.buf) { - b.buf = compressed.Bytes() - } - return compressed.Bytes(), nil + return b.buf.Bytes(), nil } func (b *brotliCompression) Decompress(data []byte) ([]byte, error) { if b.r == nil { b.r = brotli.NewReader(nil) } + b.buf.Reset() b.r.Reset(bytes.NewReader(data)) - return io.ReadAll(b.r) + _, err := io.Copy(&b.buf, b.r) + if err != nil { + return nil, err + } + return b.buf.Bytes(), nil } - -// func compressSnappy(bytes []byte, buf *[]byte) ([]byte, error) { -// // snappy uses len() to see if it needs to allocate a new slice. Make the -// // buffer as long as possible. 
-// *buf = (*buf)[0:cap(*buf)] -// compressed := snappy.Encode(*buf, bytes) -// if n := snappy.MaxEncodedLen(len(bytes)); buf != nil && n > len(*buf) { -// // grow the buffer for the next time -// *buf = make([]byte, n) -// } - -// return compressed, nil -// } - -// func compressSnappyAlt(bytes []byte, buf *[]byte) ([]byte, error) { -// res := reSnappy.Encode(*buf, bytes) -// if n := reSnappy.MaxEncodedLen(len(bytes)); buf != nil && n > len(*buf) { -// // grow the buffer for the next time -// *buf = make([]byte, n) -// } -// return res, nil -// } - -// func compressS2(bytes []byte, buf *[]byte) ([]byte, error) { -// res := reS2.Encode(*buf, bytes) -// if n := reS2.MaxEncodedLen(len(bytes)); buf != nil && n > len(*buf) { -// // grow the buffer for the next time -// *buf = make([]byte, n) -// } -// return res, nil -// } - -// func compressZstdWithLevel(level reZstd.EncoderLevel) func(data []byte, buf *[]byte) ([]byte, error) { -// // TODO: use a pool or something. just testing for now -// encoder, err := reZstd.NewWriter(nil, reZstd.WithEncoderLevel(level)) -// return func(data []byte, buf *[]byte) ([]byte, error) { -// if err != nil { -// return nil, err -// } -// *buf = (*buf)[:0] -// writer := bytes.NewBuffer(*buf) -// if err != nil { -// return nil, err -// } -// encoder.Reset(writer) -// encoder.Write(data) -// err = encoder.Close() -// if err != nil { -// return nil, err -// } -// res := writer.Bytes() -// if len(res) > cap(*buf) { -// *buf = res -// } -// return res, nil -// } -// } - -// func compressGzipWithLevel(level int) func([]byte, *[]byte) ([]byte, error) { -// // TODO: use a pool or something. 
just testing for now -// gzWriter, err := gzip.NewWriterLevel(nil, level) - -// return func(data []byte, buf2 *[]byte) ([]byte, error) { -// if err != nil { -// return nil, err -// } -// *buf2 = (*buf2)[:0] -// buf := bytes.NewBuffer(*buf2) -// gzWriter.Reset(buf) -// _, err = gzWriter.Write(data) -// if err != nil { -// return nil, err -// } -// err = gzWriter.Close() -// if err != nil { -// return nil, err -// } - -// if len(buf.Bytes()) > cap(*buf2) { -// *buf2 = buf.Bytes() -// } - -// return buf.Bytes(), nil -// } -// } - -// func compressLzw() func(data []byte, buf *[]byte) ([]byte, error) { -// writer := lzw.NewWriter(nil, lzw.LSB, 8).(*lzw.Writer) - -// return func(data []byte, buf *[]byte) ([]byte, error) { -// compressed := bytes.NewBuffer(*buf) -// writer.Reset(compressed, lzw.LSB, 8) -// _, err := writer.Write(data) -// if err != nil { -// return nil, err -// } -// err = writer.Close() -// if err != nil { -// return nil, err -// } -// if len(compressed.Bytes()) > cap(*buf) { -// *buf = compressed.Bytes() -// } -// return compressed.Bytes(), nil -// } -// } - -// func compressFlateWithLevel(level int) func(data []byte, buf *[]byte) ([]byte, error) { -// writer, err := flate.NewWriter(nil, level) - -// return func(data []byte, buf *[]byte) ([]byte, error) { -// if err != nil { -// return nil, err -// } -// *buf = (*buf)[:0] -// compressed := bytes.NewBuffer(*buf) -// writer.Reset(compressed) -// _, err = writer.Write(data) -// if err != nil { -// return nil, err -// } -// err = writer.Close() -// if err != nil { -// return nil, err -// } -// if len(compressed.Bytes()) > cap(*buf) { -// *buf = compressed.Bytes() -// } -// return compressed.Bytes(), nil -// } -// } - -// func compressBrotliWithQuality(q int) func(data []byte, _ *[]byte) ([]byte, error) { -// writer := brotli.NewWriterLevel(nil, q) -// return func(data []byte, buf *[]byte) ([]byte, error) { -// // return brotli.Encode(data, brotli.WriterOptions{Quality: q}) -// *buf = (*buf)[:0] -// 
compressed := bytes.NewBuffer(*buf) -// writer.Reset(compressed) -// _, err := writer.Write(data) -// if err != nil { -// return nil, err -// } -// err = writer.Flush() -// if err != nil { -// return nil, err -// } -// if len(compressed.Bytes()) > cap(*buf) { -// *buf = compressed.Bytes() -// } -// return compressed.Bytes(), nil -// } - -// } - -// func decompressBrotli(compressed []byte) ([]byte, error) { -// reader := bytes.NewReader(compressed) -// r := brotli.NewReader(reader) -// return io.ReadAll(r) -// } - -// func decompressFlate(compressed []byte) ([]byte, error) { -// reader := bytes.NewReader(compressed) -// r := flate.NewReader(reader) -// defer r.Close() -// return io.ReadAll(r) -// } - -// func decompressLzw(compressed []byte) ([]byte, error) { -// reader := bytes.NewReader(compressed) -// r := lzw.NewReader(reader, lzw.LSB, 8) -// defer r.Close() -// return io.ReadAll(r) -// } - -// func decompressSnappy(compressed []byte) ([]byte, error) { -// uncompressed, err := snappy.Decode(nil, compressed) -// return uncompressed, err -// } - -// func decompressSnappyAlt(compressed []byte) ([]byte, error) { -// uncompressed, err := reSnappy.Decode(nil, compressed) -// return uncompressed, err -// } - -// func decompressS2(compressed []byte) ([]byte, error) { -// uncompressed, err := reS2.Decode(nil, compressed) -// return uncompressed, err -// } - -// func decompressZstd(compressed []byte) ([]byte, error) { -// reader := bytes.NewReader(compressed) -// decoder, err := reZstd.NewReader(reader) -// if err != nil { -// return nil, err -// } -// defer decoder.Close() -// return io.ReadAll(decoder) -// } - -// func decompressGzip(compressed []byte) ([]byte, error) { -// r := bytes.NewReader(compressed) -// gzReader, err := gzip.NewReader(r) -// if err != nil { -// return nil, err -// } -// defer gzReader.Close() -// decompressedData, err := io.ReadAll(gzReader) -// if err != nil { -// return nil, err -// } - -// return decompressedData, nil -// } diff --git 
a/storage/remote/compression_test.go b/storage/remote/compression_test.go index d5d76fa29d..8aacf76074 100644 --- a/storage/remote/compression_test.go +++ b/storage/remote/compression_test.go @@ -3,7 +3,7 @@ package remote import "testing" func TestCompressions(t *testing.T) { - data := []byte("Hello World") + data := makeUncompressedReducedWriteRequestBenchData(t) tc := []struct { name string algo CompAlgorithm @@ -14,8 +14,6 @@ func TestCompressions(t *testing.T) { {"ZstdFast", ZstdFast}, {"ZstdDefault", ZstdDefault}, {"ZstdBestComp", ZstdBestComp}, - {"GzipFast", GzipFast}, - {"GzipComp", GzipComp}, {"Lzw", Lzw}, {"FlateFast", FlateFast}, {"FlateComp", FlateComp}, @@ -32,7 +30,9 @@ func TestCompressions(t *testing.T) { if err != nil { t.Fatal(err) } - decompressed, err := comp.Decompress(compressed) + compressedCopy := make([]byte, len(compressed)) + copy(compressedCopy, compressed) + decompressed, err := comp.Decompress(compressedCopy) if err != nil { t.Fatal(err) } @@ -44,7 +44,7 @@ func TestCompressions(t *testing.T) { } func BenchmarkCompressions(b *testing.B) { - data := makeUncompressedWriteRequestBenchData(b) + data := makeUncompressedReducedWriteRequestBenchData(b) bc := []struct { name string algo CompAlgorithm @@ -55,8 +55,6 @@ func BenchmarkCompressions(b *testing.B) { {"ZstdFast", ZstdFast}, {"ZstdDefault", ZstdDefault}, {"ZstdBestComp", ZstdBestComp}, - {"GzipFast", GzipFast}, - {"GzipComp", GzipComp}, {"Lzw", Lzw}, {"FlateFast", FlateFast}, {"FlateComp", FlateComp}, @@ -65,20 +63,23 @@ func BenchmarkCompressions(b *testing.B) { {"BrotliDefault", BrotliDefault}, } comps := make(map[CompAlgorithm]Compression) + decomps := make(map[CompAlgorithm]Compression) for _, c := range bc { UseAlgorithm = c.algo comp := createComp() + decomp := createComp() comps[c.algo] = comp + decomps[c.algo] = decomp // warmup for i := 0; i < 10; i++ { - compressed, _ := comp.Compress(data) - // if err != nil { - // b.Fatal(err) - // } - _, _ = comp.Decompress(compressed) - 
// if err != nil { - // b.Fatal(err) - // } + compressed, err := comp.Compress(data) + if err != nil { + b.Fatal(err) + } + _, err = decomp.Decompress(compressed) + if err != nil { + b.Fatal(err) + } } } @@ -95,13 +96,14 @@ func BenchmarkCompressions(b *testing.B) { }) b.Run("decompress-"+c.name, func(b *testing.B) { comp := comps[c.algo] + decomp := decomps[c.algo] compressed, err := comp.Compress(data) if err != nil { b.Fatal(err) } b.ResetTimer() for i := 0; i < b.N; i++ { - _, err = comp.Decompress(compressed) + _, err = decomp.Decompress(compressed) if err != nil { b.Fatal(err) } diff --git a/storage/remote/queue_manager_test.go b/storage/remote/queue_manager_test.go index 80e6304ae2..b9c8acd13a 100644 --- a/storage/remote/queue_manager_test.go +++ b/storage/remote/queue_manager_test.go @@ -16,7 +16,6 @@ package remote import ( "bytes" "compress/flate" - "compress/gzip" "context" "fmt" "math" @@ -1537,7 +1536,8 @@ func BenchmarkBuildMinimizedWriteRequest(b *testing.B) { }) } } -func makeUncompressedReducedWriteRequestBenchData(b *testing.B) []byte { + +func makeUncompressedReducedWriteRequestBenchData(b testing.TB) []byte { data := createDummyTimeSeries(1000) pool := newLookupPool() pBuf := proto.NewBuffer(nil) @@ -1612,9 +1612,6 @@ func BenchmarkCompressWriteRequest(b *testing.B) { {"v1-ZstdFastest", uncompV1, &zstdCompression{level: zstd.SpeedFastest}}, {"v1-ZstdSpeedDef", uncompV1, &zstdCompression{level: zstd.SpeedDefault}}, {"v1-ZstdBestComp", uncompV1, &zstdCompression{level: zstd.SpeedBestCompression}}, - {"v1-GzipBestComp", uncompV1, &gzipCompression{level: gzip.BestCompression}}, - {"v1-GzipBestSpeed", uncompV1, &gzipCompression{level: gzip.BestSpeed}}, - {"v1-GzipDefault", uncompV1, &gzipCompression{level: gzip.DefaultCompression}}, {"v1-Lzw", uncompV1, &lzwCompression{}}, {"v1-FlateBestComp", uncompV1, &flateCompression{level: flate.BestCompression}}, {"v1-FlateBestSpeed", uncompV1, &flateCompression{level: flate.BestSpeed}}, @@ -1628,9 +1625,6 @@ 
func BenchmarkCompressWriteRequest(b *testing.B) { {"v1.1-ZstdFastest", uncompV11, &zstdCompression{level: zstd.SpeedFastest}}, {"v1.1-ZstdSpeedDef", uncompV11, &zstdCompression{level: zstd.SpeedDefault}}, {"v1.1-ZstdBestComp", uncompV11, &zstdCompression{level: zstd.SpeedBestCompression}}, - {"v1.1-GzipBestComp", uncompV11, &gzipCompression{level: gzip.BestCompression}}, - {"v1.1-GzipBestSpeed", uncompV11, &gzipCompression{level: gzip.BestSpeed}}, - {"v1.1-GzipDefault", uncompV11, &gzipCompression{level: gzip.DefaultCompression}}, {"v1.1-Lzw", uncompV11, &lzwCompression{}}, {"v1.1-FlateBestComp", uncompV11, &flateCompression{level: flate.BestCompression}}, {"v1.1-FlateBestSpeed", uncompV11, &flateCompression{level: flate.BestSpeed}},