No Close method and basic pooling

Nicolás Pazos 2023-11-01 17:49:01 -03:00 committed by Callum Styan
parent 0ab4808153
commit 32eb50917e
4 changed files with 170 additions and 50 deletions


@@ -869,7 +869,6 @@ func DecodeWriteRequest(r io.Reader) (*prompb.WriteRequest, error) {
comp := createComp()
reqBuf, err := comp.Decompress(compressed)
comp.Close()
if err != nil {
return nil, err


@@ -6,6 +6,7 @@ import (
"compress/gzip"
"compress/lzw"
"io"
"sync"
reS2 "github.com/klauspost/compress/s2"
reSnappy "github.com/klauspost/compress/snappy"
@@ -19,13 +20,12 @@ import (
type Compression interface {
Compress(data []byte) ([]byte, error)
Decompress(data []byte) ([]byte, error)
Close() error
}
// hacky globals to easily tweak the compression algorithm and run some benchmarks
type CompAlgorithm int
var UseAlgorithm = SnappyAlt
var UseAlgorithm = Snappy
const (
Snappy CompAlgorithm = iota
@@ -44,6 +44,21 @@ const (
BrotliDefault
)
// sync.Pool-ed createComp
var compPool = sync.Pool{
// New optionally specifies a function to generate
// a value when Get would otherwise return nil.
New: func() interface{} { return createComp() },
}
func GetPooledComp() Compression {
return compPool.Get().(Compression)
}
func PutPooledComp(c Compression) {
compPool.Put(c)
}
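
A hedged caller-side sketch of how the new pool could be wired in (not part of this commit, which still calls createComp directly in DecodeWriteRequest above and in the queue manager below). Because each pooled Compression retains its scratch buffer, the decompressed bytes alias pooled memory, so they should be fully consumed before the instance is handed back with PutPooledComp.

// decodeWriteRequestPooled is a hypothetical helper mirroring DecodeWriteRequest:
// it borrows a Compression from the pool and unmarshals the result before the
// deferred PutPooledComp gives the instance (and its buffer) to the next caller.
func decodeWriteRequestPooled(compressed []byte) (*prompb.WriteRequest, error) {
    comp := GetPooledComp()
    defer PutPooledComp(comp)

    reqBuf, err := comp.Decompress(compressed)
    if err != nil {
        return nil, err
    }

    var req prompb.WriteRequest
    if err := proto.Unmarshal(reqBuf, &req); err != nil { // same gogo proto.Unmarshal used by DecodeWriteRequest
        return nil, err
    }
    return &req, nil
}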
var createComp func() Compression = func() Compression {
switch UseAlgorithm {
case Snappy:
@@ -89,10 +104,6 @@ func (n *noopCompression) Decompress(data []byte) ([]byte, error) {
return data, nil
}
func (n *noopCompression) Close() error {
return nil
}
type snappyCompression struct {
buf []byte
}
@@ -106,12 +117,13 @@ func (s *snappyCompression) Compress(data []byte) ([]byte, error) {
return compressed, nil
}
func (s *snappyCompression) Decompress(data []byte) ([]byte, error) {
uncompressed, err := snappy.Decode(nil, data)
s.buf = s.buf[0:cap(s.buf)]
uncompressed, err := snappy.Decode(s.buf, data)
if len(uncompressed) > cap(s.buf) {
s.buf = uncompressed
}
return uncompressed, err
}
func (s *snappyCompression) Close() error {
return nil
}
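
The new Decompress body for snappy here, and for snappyAlt and s2 below, shares one buffer-reuse idiom: reslice the retained buffer to its full capacity, offer it to the decoder as the destination, and keep any larger slice the decoder had to allocate so later calls avoid that allocation. A minimal standalone sketch of the idiom (hypothetical helper, using the same snappy package this file already imports):

// decodeReusingBuf isolates the buffer-reuse idiom used by the snappy-family
// Decompress methods above and below.
func decodeReusingBuf(buf, src []byte) (out []byte, kept []byte, err error) {
    buf = buf[:cap(buf)] // expose the whole backing array as the destination
    out, err = snappy.Decode(buf, src)
    if len(out) > cap(buf) {
        buf = out // the decoder allocated a bigger slice; retain it for next time
    }
    return out, buf, err
}

snappyCompression.Decompress above is this idiom inlined with s.buf as the retained slice; snappyAltCompression and s2Compression below differ only in the Decode function they call.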
type snappyAltCompression struct {
buf []byte
@@ -126,12 +138,13 @@ func (s *snappyAltCompression) Compress(data []byte) ([]byte, error) {
return res, nil
}
func (s *snappyAltCompression) Decompress(data []byte) ([]byte, error) {
uncompressed, err := reSnappy.Decode(nil, data)
s.buf = s.buf[0:cap(s.buf)]
uncompressed, err := reSnappy.Decode(s.buf, data)
if len(uncompressed) > cap(s.buf) {
s.buf = uncompressed
}
return uncompressed, err
}
func (s *snappyAltCompression) Close() error {
return nil
}
type s2Compression struct {
buf []byte
@@ -146,14 +159,14 @@ func (s *s2Compression) Compress(data []byte) ([]byte, error) {
}
func (s *s2Compression) Decompress(data []byte) ([]byte, error) {
uncompressed, err := reS2.Decode(nil, data)
s.buf = s.buf[0:cap(s.buf)]
uncompressed, err := reS2.Decode(s.buf, data)
if len(uncompressed) > cap(s.buf) {
s.buf = uncompressed
}
return uncompressed, err
}
func (s *s2Compression) Close() error {
return nil
}
type zstdCompression struct {
level zstd.EncoderLevel
buf []byte
@@ -190,20 +203,19 @@ func (z *zstdCompression) Compress(data []byte) ([]byte, error) {
}
func (z *zstdCompression) Decompress(data []byte) ([]byte, error) {
reader := bytes.NewReader(data)
decoder, err := reZstd.NewReader(reader)
decoder, err := reZstd.NewReader(nil)
if err != nil {
return nil, err
}
defer decoder.Close()
return io.ReadAll(decoder)
}
func (z *zstdCompression) Close() error {
if z.w != nil {
return z.w.Close()
z.buf = z.buf[:0]
buf, err := decoder.DecodeAll(data, z.buf)
if err != nil {
return nil, err
}
return nil
if len(buf) > cap(z.buf) {
z.buf = buf
}
return buf, nil
}
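
The zstd path now builds a decoder per Decompress call with reZstd.NewReader(nil), which the klauspost decoder accepts for DecodeAll-only use, and appends into the retained buffer. A hedged variation, mirroring how the brotli reader below is cached, would keep the decoder on the struct instead of constructing one per call (the dec field is an assumption, not part of this commit):

// decompressCachedDecoder is a hypothetical variant that reuses one decoder.
func (z *zstdCompression) decompressCachedDecoder(data []byte) ([]byte, error) {
    if z.dec == nil { // hypothetical field: dec *reZstd.Decoder
        var err error
        if z.dec, err = reZstd.NewReader(nil); err != nil {
            return nil, err
        }
    }
    buf, err := z.dec.DecodeAll(data, z.buf[:0]) // DecodeAll appends into the retained buffer
    if err != nil {
        return nil, err
    }
    if len(buf) > cap(z.buf) { // keep a grown buffer for later calls
        z.buf = buf
    }
    return buf, nil
}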
type gzipCompression struct {
@@ -249,10 +261,19 @@ func (g *gzipCompression) Decompress(data []byte) ([]byte, error) {
return nil, err
}
return decompressedData, nil
}
func (g *gzipCompression) Close() error {
return nil
// TODO: debug this
// r := bytes.NewReader(data)
// var err error
// if g.r == nil {
// g.r, err = gzip.NewReader(r)
// if err != nil {
// return nil, err
// }
// }
// g.r.Reset(r)
// defer g.r.Close()
// return io.ReadAll(g.r)
}
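
The commented-out TODO path likely fails on its first call because gzip.NewReader(r) already consumes the gzip header from r, and the immediate g.r.Reset(r) then tries to parse a second header from the middle of the same reader. A hedged sketch of the reuse it appears to aim for (assuming the g.r *gzip.Reader field the TODO refers to):

// decompressReusingReader is a hypothetical variant: a fresh bytes.Reader is
// used in both branches so the header is parsed exactly once per call.
func (g *gzipCompression) decompressReusingReader(data []byte) ([]byte, error) {
    br := bytes.NewReader(data)
    var err error
    if g.r == nil {
        g.r, err = gzip.NewReader(br) // first call: allocate the reader and parse the header
    } else {
        err = g.r.Reset(br) // later calls: reuse the reader, parsing the header from br
    }
    if err != nil {
        return nil, err
    }
    return io.ReadAll(g.r)
}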
type lzwCompression struct {
@@ -287,10 +308,6 @@ func (l *lzwCompression) Decompress(data []byte) ([]byte, error) {
return io.ReadAll(r)
}
func (l *lzwCompression) Close() error {
return nil
}
type flateCompression struct {
level int
buf []byte
@@ -333,14 +350,11 @@ func (f *flateCompression) Decompress(data []byte) ([]byte, error) {
return io.ReadAll(r)
}
func (f *flateCompression) Close() error {
return f.w.Close()
}
type brotliCompression struct {
quality int
buf []byte
w *brotli.Writer
r *brotli.Reader
}
func (b *brotliCompression) Compress(data []byte) ([]byte, error) {
@@ -366,13 +380,11 @@ func (b *brotliCompression) Compress(data []byte) ([]byte, error) {
}
func (b *brotliCompression) Decompress(data []byte) ([]byte, error) {
reader := bytes.NewReader(data)
r := brotli.NewReader(reader)
return io.ReadAll(r)
}
func (b *brotliCompression) Close() error {
return nil
if b.r == nil {
b.r = brotli.NewReader(nil)
}
b.r.Reset(bytes.NewReader(data))
return io.ReadAll(b.r)
}
// func compressSnappy(bytes []byte, buf *[]byte) ([]byte, error) {


@@ -0,0 +1,111 @@
package remote
import "testing"
func TestCompressions(t *testing.T) {
data := []byte("Hello World")
tc := []struct {
name string
algo CompAlgorithm
}{
{"Snappy", Snappy},
{"SnappyAlt", SnappyAlt},
{"S2", S2},
{"ZstdFast", ZstdFast},
{"ZstdDefault", ZstdDefault},
{"ZstdBestComp", ZstdBestComp},
{"GzipFast", GzipFast},
{"GzipComp", GzipComp},
{"Lzw", Lzw},
{"FlateFast", FlateFast},
{"FlateComp", FlateComp},
{"BrotliFast", BrotliFast},
{"BrotliComp", BrotliComp},
{"BrotliDefault", BrotliDefault},
}
for _, c := range tc {
t.Run(c.name, func(t *testing.T) {
UseAlgorithm = c.algo
comp := createComp()
compressed, err := comp.Compress(data)
if err != nil {
t.Fatal(err)
}
decompressed, err := comp.Decompress(compressed)
if err != nil {
t.Fatal(err)
}
if string(decompressed) != string(data) {
t.Fatalf("decompressed data is not equal to original data")
}
})
}
}
func BenchmarkCompressions(b *testing.B) {
data := makeUncompressedWriteRequestBenchData(b)
bc := []struct {
name string
algo CompAlgorithm
}{
{"Snappy", Snappy},
{"SnappyAlt", SnappyAlt},
{"S2", S2},
{"ZstdFast", ZstdFast},
{"ZstdDefault", ZstdDefault},
{"ZstdBestComp", ZstdBestComp},
{"GzipFast", GzipFast},
{"GzipComp", GzipComp},
{"Lzw", Lzw},
{"FlateFast", FlateFast},
{"FlateComp", FlateComp},
{"BrotliFast", BrotliFast},
{"BrotliComp", BrotliComp},
{"BrotliDefault", BrotliDefault},
}
comps := make(map[CompAlgorithm]Compression)
for _, c := range bc {
UseAlgorithm = c.algo
comp := createComp()
comps[c.algo] = comp
// warmup
for i := 0; i < 10; i++ {
compressed, _ := comp.Compress(data)
// if err != nil {
// b.Fatal(err)
// }
_, _ = comp.Decompress(compressed)
// if err != nil {
// b.Fatal(err)
// }
}
}
for _, c := range bc {
b.Run("compress-"+c.name, func(b *testing.B) {
comp := comps[c.algo]
b.ResetTimer()
for i := 0; i < b.N; i++ {
_, err := comp.Compress(data)
if err != nil {
b.Fatal(err)
}
}
})
b.Run("decompress-"+c.name, func(b *testing.B) {
comp := comps[c.algo]
compressed, err := comp.Compress(data)
if err != nil {
b.Fatal(err)
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
_, err = comp.Decompress(compressed)
if err != nil {
b.Fatal(err)
}
}
})
}
}
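
Assuming this test file lives in Prometheus' storage/remote package (path assumed from the prompb and QueueManager references elsewhere in the commit), the benchmarks can be run with go test -run '^$' -bench BenchmarkCompressions -benchmem in that directory; the -run '^$' filter skips the unit tests so only the compress/decompress sub-benchmarks execute.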


@@ -547,7 +547,6 @@ func (t *QueueManager) sendMetadataWithBackoff(ctx context.Context, metadata []p
// Build the WriteRequest with no samples.
comp := createComp()
req, _, err := buildWriteRequest(nil, metadata, pBuf, comp)
comp.Close()
if err != nil {
return err
}
@@ -1371,7 +1370,6 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue *queue) {
buf []byte
comp = createComp()
)
defer comp.Close()
if s.qm.sendExemplars {
max += int(float64(max) * 0.1)
}