diff --git a/storage/remote/codec.go b/storage/remote/codec.go index b62482482c..d9770ad1cd 100644 --- a/storage/remote/codec.go +++ b/storage/remote/codec.go @@ -869,7 +869,6 @@ func DecodeWriteRequest(r io.Reader) (*prompb.WriteRequest, error) { comp := createComp() reqBuf, err := comp.Decompress(compressed) - comp.Close() if err != nil { return nil, err diff --git a/storage/remote/compression.go b/storage/remote/compression.go index 369d992eba..ac897cc90d 100644 --- a/storage/remote/compression.go +++ b/storage/remote/compression.go @@ -6,6 +6,7 @@ import ( "compress/gzip" "compress/lzw" "io" + "sync" reS2 "github.com/klauspost/compress/s2" reSnappy "github.com/klauspost/compress/snappy" @@ -19,13 +20,12 @@ import ( type Compression interface { Compress(data []byte) ([]byte, error) Decompress(data []byte) ([]byte, error) - Close() error } // hacky globals to easily tweak the compression algorithm and run some benchmarks type CompAlgorithm int -var UseAlgorithm = SnappyAlt +var UseAlgorithm = Snappy const ( Snappy CompAlgorithm = iota @@ -44,6 +44,21 @@ const ( BrotliDefault ) +// sync.Pool-ed createComp +var compPool = sync.Pool{ + // New optionally specifies a function to generate + // a value when Get would otherwise return nil. + New: func() interface{} { return createComp() }, +} + +func GetPooledComp() Compression { + return compPool.Get().(Compression) +} + +func PutPooledComp(c Compression) { + compPool.Put(c) +} + var createComp func() Compression = func() Compression { switch UseAlgorithm { case Snappy: @@ -89,10 +104,6 @@ func (n *noopCompression) Decompress(data []byte) ([]byte, error) { return data, nil } -func (n *noopCompression) Close() error { - return nil -} - type snappyCompression struct { buf []byte } @@ -106,12 +117,13 @@ func (s *snappyCompression) Compress(data []byte) ([]byte, error) { return compressed, nil } func (s *snappyCompression) Decompress(data []byte) ([]byte, error) { - uncompressed, err := snappy.Decode(nil, data) + s.buf = s.buf[0:cap(s.buf)] + uncompressed, err := snappy.Decode(s.buf, data) + if len(uncompressed) > cap(s.buf) { + s.buf = uncompressed + } return uncompressed, err } -func (s *snappyCompression) Close() error { - return nil -} type snappyAltCompression struct { buf []byte @@ -126,12 +138,13 @@ func (s *snappyAltCompression) Compress(data []byte) ([]byte, error) { return res, nil } func (s *snappyAltCompression) Decompress(data []byte) ([]byte, error) { - uncompressed, err := reSnappy.Decode(nil, data) + s.buf = s.buf[0:cap(s.buf)] + uncompressed, err := reSnappy.Decode(s.buf, data) + if len(uncompressed) > cap(s.buf) { + s.buf = uncompressed + } return uncompressed, err } -func (s *snappyAltCompression) Close() error { - return nil -} type s2Compression struct { buf []byte @@ -146,14 +159,14 @@ func (s *s2Compression) Compress(data []byte) ([]byte, error) { } func (s *s2Compression) Decompress(data []byte) ([]byte, error) { - uncompressed, err := reS2.Decode(nil, data) + s.buf = s.buf[0:cap(s.buf)] + uncompressed, err := reS2.Decode(s.buf, data) + if len(uncompressed) > cap(s.buf) { + s.buf = uncompressed + } return uncompressed, err } -func (s *s2Compression) Close() error { - return nil -} - type zstdCompression struct { level zstd.EncoderLevel buf []byte @@ -190,20 +203,19 @@ func (z *zstdCompression) Compress(data []byte) ([]byte, error) { } func (z *zstdCompression) Decompress(data []byte) ([]byte, error) { - reader := bytes.NewReader(data) - decoder, err := reZstd.NewReader(reader) + decoder, err := reZstd.NewReader(nil) if err != nil { return nil, err } - defer decoder.Close() - return io.ReadAll(decoder) -} - -func (z *zstdCompression) Close() error { - if z.w != nil { - return z.w.Close() + z.buf = z.buf[:0] + buf, err := decoder.DecodeAll(data, z.buf) + if err != nil { + return nil, err } - return nil + if len(buf) > cap(z.buf) { + z.buf = buf + } + return buf, nil } type gzipCompression struct { @@ -249,10 +261,19 @@ func (g *gzipCompression) Decompress(data []byte) ([]byte, error) { return nil, err } return decompressedData, nil -} -func (g *gzipCompression) Close() error { - return nil + // TODO: debug this + // r := bytes.NewReader(data) + // var err error + // if g.r == nil { + // g.r, err = gzip.NewReader(r) + // if err != nil { + // return nil, err + // } + // } + // g.r.Reset(r) + // defer g.r.Close() + // return io.ReadAll(g.r) } type lzwCompression struct { @@ -287,10 +308,6 @@ func (l *lzwCompression) Decompress(data []byte) ([]byte, error) { return io.ReadAll(r) } -func (l *lzwCompression) Close() error { - return nil -} - type flateCompression struct { level int buf []byte @@ -333,14 +350,11 @@ func (f *flateCompression) Decompress(data []byte) ([]byte, error) { return io.ReadAll(r) } -func (f *flateCompression) Close() error { - return f.w.Close() -} - type brotliCompression struct { quality int buf []byte w *brotli.Writer + r *brotli.Reader } func (b *brotliCompression) Compress(data []byte) ([]byte, error) { @@ -366,13 +380,11 @@ func (b *brotliCompression) Compress(data []byte) ([]byte, error) { } func (b *brotliCompression) Decompress(data []byte) ([]byte, error) { - reader := bytes.NewReader(data) - r := brotli.NewReader(reader) - return io.ReadAll(r) -} - -func (b *brotliCompression) Close() error { - return nil + if b.r == nil { + b.r = brotli.NewReader(nil) + } + b.r.Reset(bytes.NewReader(data)) + return io.ReadAll(b.r) } // func compressSnappy(bytes []byte, buf *[]byte) ([]byte, error) { diff --git a/storage/remote/compression_test.go b/storage/remote/compression_test.go new file mode 100644 index 0000000000..d5d76fa29d --- /dev/null +++ b/storage/remote/compression_test.go @@ -0,0 +1,111 @@ +package remote + +import "testing" + +func TestCompressions(t *testing.T) { + data := []byte("Hello World") + tc := []struct { + name string + algo CompAlgorithm + }{ + {"Snappy", Snappy}, + {"SnappyAlt", SnappyAlt}, + {"S2", S2}, + {"ZstdFast", ZstdFast}, + {"ZstdDefault", ZstdDefault}, + {"ZstdBestComp", ZstdBestComp}, + {"GzipFast", GzipFast}, + {"GzipComp", GzipComp}, + {"Lzw", Lzw}, + {"FlateFast", FlateFast}, + {"FlateComp", FlateComp}, + {"BrotliFast", BrotliFast}, + {"BrotliComp", BrotliComp}, + {"BrotliDefault", BrotliDefault}, + } + + for _, c := range tc { + t.Run(c.name, func(t *testing.T) { + UseAlgorithm = c.algo + comp := createComp() + compressed, err := comp.Compress(data) + if err != nil { + t.Fatal(err) + } + decompressed, err := comp.Decompress(compressed) + if err != nil { + t.Fatal(err) + } + if string(decompressed) != string(data) { + t.Fatalf("decompressed data is not equal to original data") + } + }) + } +} + +func BenchmarkCompressions(b *testing.B) { + data := makeUncompressedWriteRequestBenchData(b) + bc := []struct { + name string + algo CompAlgorithm + }{ + {"Snappy", Snappy}, + {"SnappyAlt", SnappyAlt}, + {"S2", S2}, + {"ZstdFast", ZstdFast}, + {"ZstdDefault", ZstdDefault}, + {"ZstdBestComp", ZstdBestComp}, + {"GzipFast", GzipFast}, + {"GzipComp", GzipComp}, + {"Lzw", Lzw}, + {"FlateFast", FlateFast}, + {"FlateComp", FlateComp}, + {"BrotliFast", BrotliFast}, + {"BrotliComp", BrotliComp}, + {"BrotliDefault", BrotliDefault}, + } + comps := make(map[CompAlgorithm]Compression) + for _, c := range bc { + UseAlgorithm = c.algo + comp := createComp() + comps[c.algo] = comp + // warmup + for i := 0; i < 10; i++ { + compressed, _ := comp.Compress(data) + // if err != nil { + // b.Fatal(err) + // } + _, _ = comp.Decompress(compressed) + // if err != nil { + // b.Fatal(err) + // } + } + } + + for _, c := range bc { + b.Run("compress-"+c.name, func(b *testing.B) { + comp := comps[c.algo] + b.ResetTimer() + for i := 0; i < b.N; i++ { + _, err := comp.Compress(data) + if err != nil { + b.Fatal(err) + } + } + }) + b.Run("decompress-"+c.name, func(b *testing.B) { + comp := comps[c.algo] + compressed, err := comp.Compress(data) + if err != nil { + b.Fatal(err) + } + b.ResetTimer() + for i := 0; i < b.N; i++ { + _, err = comp.Decompress(compressed) + if err != nil { + b.Fatal(err) + } + } + }) + } +} diff --git a/storage/remote/queue_manager.go b/storage/remote/queue_manager.go index 3a4c5647f1..c331fa1707 100644 --- a/storage/remote/queue_manager.go +++ b/storage/remote/queue_manager.go @@ -547,7 +547,6 @@ func (t *QueueManager) sendMetadataWithBackoff(ctx context.Context, metadata []p // Build the WriteRequest with no samples. comp := createComp() req, _, err := buildWriteRequest(nil, metadata, pBuf, comp) - comp.Close() if err != nil { return err } @@ -1371,7 +1370,6 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue *queue) { buf []byte comp = createComp() ) - defer comp.Close() if s.qm.sendExemplars { max += int(float64(max) * 0.1) }