Remove gzip option and various alloc optimizations

This commit is contained in:
Nicolás Pazos 2023-11-01 19:36:43 -03:00 committed by Callum Styan
parent 44844cb655
commit 8a4e5769fc
4 changed files with 69 additions and 364 deletions

View file

@ -867,7 +867,8 @@ func DecodeWriteRequest(r io.Reader) (*prompb.WriteRequest, error) {
return nil, err return nil, err
} }
comp := createComp() comp := GetPooledComp()
defer PutPooledComp(comp)
reqBuf, err := comp.Decompress(compressed) reqBuf, err := comp.Decompress(compressed)
if err != nil { if err != nil {

View file

@ -3,7 +3,6 @@ package remote
import ( import (
"bytes" "bytes"
"compress/flate" "compress/flate"
"compress/gzip"
"compress/lzw" "compress/lzw"
"io" "io"
"sync" "sync"
@ -34,8 +33,6 @@ const (
ZstdFast ZstdFast
ZstdDefault ZstdDefault
ZstdBestComp ZstdBestComp
GzipFast
GzipComp
Lzw Lzw
FlateFast FlateFast
FlateComp FlateComp
@ -73,10 +70,6 @@ var createComp func() Compression = func() Compression {
return &zstdCompression{level: zstd.SpeedFastest} return &zstdCompression{level: zstd.SpeedFastest}
case ZstdBestComp: case ZstdBestComp:
return &zstdCompression{level: zstd.SpeedBestCompression} return &zstdCompression{level: zstd.SpeedBestCompression}
case GzipFast:
return &gzipCompression{level: gzip.BestSpeed}
case GzipComp:
return &gzipCompression{level: gzip.BestCompression}
case Lzw: case Lzw:
return &lzwCompression{} return &lzwCompression{}
case FlateFast: case FlateFast:
@ -170,32 +163,15 @@ func (s *s2Compression) Decompress(data []byte) ([]byte, error) {
type zstdCompression struct { type zstdCompression struct {
level zstd.EncoderLevel level zstd.EncoderLevel
buf []byte buf []byte
w *reZstd.Encoder
} }
func (z *zstdCompression) Compress(data []byte) ([]byte, error) { func (z *zstdCompression) Compress(data []byte) ([]byte, error) {
var err error w, err := reZstd.NewWriter(nil, reZstd.WithEncoderLevel(z.level))
if z.w == nil {
// TODO: should be initialized on creation
z.w, err = reZstd.NewWriter(nil, reZstd.WithEncoderLevel(reZstd.EncoderLevel(z.level)))
}
if err != nil { if err != nil {
return nil, err return nil, err
} }
z.buf = z.buf[0:cap(z.buf)]
z.buf = z.buf[:0] res := w.EncodeAll(data, z.buf)
writer := bytes.NewBuffer(z.buf)
if err != nil {
return nil, err
}
z.w.Reset(writer)
z.w.Write(data)
err = z.w.Close()
if err != nil {
return nil, err
}
res := writer.Bytes()
if len(res) > cap(z.buf) { if len(res) > cap(z.buf) {
z.buf = res z.buf = res
} }
@ -218,75 +194,18 @@ func (z *zstdCompression) Decompress(data []byte) ([]byte, error) {
return buf, nil return buf, nil
} }
type gzipCompression struct {
level int
buf []byte
w *gzip.Writer
}
func (g *gzipCompression) Compress(data []byte) ([]byte, error) {
var err error
if g.w == nil {
g.w, err = gzip.NewWriterLevel(nil, g.level)
}
if err != nil {
return nil, err
}
g.buf = g.buf[:0]
buf := bytes.NewBuffer(g.buf)
g.w.Reset(buf)
_, err = g.w.Write(data)
if err != nil {
return nil, err
}
err = g.w.Close()
if err != nil {
return nil, err
}
if len(buf.Bytes()) > cap(g.buf) {
g.buf = buf.Bytes()
}
return buf.Bytes(), nil
}
func (g *gzipCompression) Decompress(data []byte) ([]byte, error) {
r := bytes.NewReader(data)
gzReader, err := gzip.NewReader(r)
if err != nil {
return nil, err
}
defer gzReader.Close()
decompressedData, err := io.ReadAll(gzReader)
if err != nil {
return nil, err
}
return decompressedData, nil
// TODO: debug this
// r := bytes.NewReader(data)
// var err error
// if g.r == nil {
// g.r, err = gzip.NewReader(r)
// if err != nil {
// return nil, err
// }
// }
// g.r.Reset(r)
// defer g.r.Close()
// return io.ReadAll(g.r)
}
type lzwCompression struct { type lzwCompression struct {
buf []byte
w *lzw.Writer w *lzw.Writer
r *lzw.Reader
buf bytes.Buffer
} }
func (l *lzwCompression) Compress(data []byte) ([]byte, error) { func (l *lzwCompression) Compress(data []byte) ([]byte, error) {
if l.w == nil { if l.w == nil {
l.w = lzw.NewWriter(nil, lzw.LSB, 8).(*lzw.Writer) l.w = lzw.NewWriter(nil, lzw.LSB, 8).(*lzw.Writer)
} }
compressed := bytes.NewBuffer(l.buf) l.buf.Reset()
l.w.Reset(compressed, lzw.LSB, 8) l.w.Reset(&l.buf, lzw.LSB, 8)
_, err := l.w.Write(data) _, err := l.w.Write(data)
if err != nil { if err != nil {
return nil, err return nil, err
@ -295,40 +214,39 @@ func (l *lzwCompression) Compress(data []byte) ([]byte, error) {
if err != nil { if err != nil {
return nil, err return nil, err
} }
if len(compressed.Bytes()) > cap(l.buf) { return l.buf.Bytes(), nil
l.buf = compressed.Bytes()
}
return compressed.Bytes(), nil
} }
func (l *lzwCompression) Decompress(data []byte) ([]byte, error) { func (l *lzwCompression) Decompress(data []byte) ([]byte, error) {
reader := bytes.NewReader(data) if l.r == nil {
r := lzw.NewReader(reader, lzw.LSB, 8) l.r = lzw.NewReader(nil, lzw.LSB, 8).(*lzw.Reader)
defer r.Close() }
return io.ReadAll(r) l.r.Reset(bytes.NewReader(data), lzw.LSB, 8)
l.buf.Reset()
_, err := io.Copy(&l.buf, l.r)
if err != nil {
return nil, err
}
return l.buf.Bytes(), nil
} }
type flateCompression struct { type flateCompression struct {
level int level int
buf []byte buf bytes.Buffer
w *flate.Writer w *flate.Writer
r io.ReadCloser
} }
func (f *flateCompression) Compress(data []byte) ([]byte, error) { func (f *flateCompression) Compress(data []byte) ([]byte, error) {
var err error var err error
if f.w == nil { if f.w == nil {
f.w, err = flate.NewWriter(nil, f.level) f.w, err = flate.NewWriter(nil, f.level)
if err != nil {
return nil, err
}
} }
if err != nil { f.buf.Reset()
return nil, err f.w.Reset(&f.buf)
}
if err != nil {
return nil, err
}
f.buf = f.buf[:0]
compressed := bytes.NewBuffer(f.buf)
f.w.Reset(compressed)
_, err = f.w.Write(data) _, err = f.w.Write(data)
if err != nil { if err != nil {
return nil, err return nil, err
@ -337,22 +255,26 @@ func (f *flateCompression) Compress(data []byte) ([]byte, error) {
if err != nil { if err != nil {
return nil, err return nil, err
} }
if len(compressed.Bytes()) > cap(f.buf) { return f.buf.Bytes(), nil
f.buf = compressed.Bytes()
}
return compressed.Bytes(), nil
} }
func (f *flateCompression) Decompress(data []byte) ([]byte, error) { func (f *flateCompression) Decompress(data []byte) ([]byte, error) {
reader := bytes.NewReader(data) if f.r == nil {
r := flate.NewReader(reader) f.r = flate.NewReader(nil)
defer r.Close() }
return io.ReadAll(r) f.r.(flate.Resetter).Reset(bytes.NewReader(data), nil)
defer f.r.Close()
f.buf.Reset()
_, err := io.Copy(&f.buf, f.r)
if err != nil {
return nil, err
}
return f.buf.Bytes(), nil
} }
type brotliCompression struct { type brotliCompression struct {
quality int quality int
buf []byte buf bytes.Buffer
w *brotli.Writer w *brotli.Writer
r *brotli.Reader r *brotli.Reader
} }
@ -361,10 +283,8 @@ func (b *brotliCompression) Compress(data []byte) ([]byte, error) {
if b.w == nil { if b.w == nil {
b.w = brotli.NewWriterLevel(nil, b.quality) b.w = brotli.NewWriterLevel(nil, b.quality)
} }
b.buf.Reset()
b.buf = (b.buf)[:0] b.w.Reset(&b.buf)
compressed := bytes.NewBuffer(b.buf)
b.w.Reset(compressed)
_, err := b.w.Write(data) _, err := b.w.Write(data)
if err != nil { if err != nil {
return nil, err return nil, err
@ -373,230 +293,18 @@ func (b *brotliCompression) Compress(data []byte) ([]byte, error) {
if err != nil { if err != nil {
return nil, err return nil, err
} }
if len(compressed.Bytes()) > cap(b.buf) { return b.buf.Bytes(), nil
b.buf = compressed.Bytes()
}
return compressed.Bytes(), nil
} }
func (b *brotliCompression) Decompress(data []byte) ([]byte, error) { func (b *brotliCompression) Decompress(data []byte) ([]byte, error) {
if b.r == nil { if b.r == nil {
b.r = brotli.NewReader(nil) b.r = brotli.NewReader(nil)
} }
b.buf.Reset()
b.r.Reset(bytes.NewReader(data)) b.r.Reset(bytes.NewReader(data))
return io.ReadAll(b.r) _, err := io.Copy(&b.buf, b.r)
if err != nil {
return nil, err
}
return b.buf.Bytes(), nil
} }
// func compressSnappy(bytes []byte, buf *[]byte) ([]byte, error) {
// // snappy uses len() to see if it needs to allocate a new slice. Make the
// // buffer as long as possible.
// *buf = (*buf)[0:cap(*buf)]
// compressed := snappy.Encode(*buf, bytes)
// if n := snappy.MaxEncodedLen(len(bytes)); buf != nil && n > len(*buf) {
// // grow the buffer for the next time
// *buf = make([]byte, n)
// }
// return compressed, nil
// }
// func compressSnappyAlt(bytes []byte, buf *[]byte) ([]byte, error) {
// res := reSnappy.Encode(*buf, bytes)
// if n := reSnappy.MaxEncodedLen(len(bytes)); buf != nil && n > len(*buf) {
// // grow the buffer for the next time
// *buf = make([]byte, n)
// }
// return res, nil
// }
// func compressS2(bytes []byte, buf *[]byte) ([]byte, error) {
// res := reS2.Encode(*buf, bytes)
// if n := reS2.MaxEncodedLen(len(bytes)); buf != nil && n > len(*buf) {
// // grow the buffer for the next time
// *buf = make([]byte, n)
// }
// return res, nil
// }
// func compressZstdWithLevel(level reZstd.EncoderLevel) func(data []byte, buf *[]byte) ([]byte, error) {
// // TODO: use a pool or something. just testing for now
// encoder, err := reZstd.NewWriter(nil, reZstd.WithEncoderLevel(level))
// return func(data []byte, buf *[]byte) ([]byte, error) {
// if err != nil {
// return nil, err
// }
// *buf = (*buf)[:0]
// writer := bytes.NewBuffer(*buf)
// if err != nil {
// return nil, err
// }
// encoder.Reset(writer)
// encoder.Write(data)
// err = encoder.Close()
// if err != nil {
// return nil, err
// }
// res := writer.Bytes()
// if len(res) > cap(*buf) {
// *buf = res
// }
// return res, nil
// }
// }
// func compressGzipWithLevel(level int) func([]byte, *[]byte) ([]byte, error) {
// // TODO: use a pool or something. just testing for now
// gzWriter, err := gzip.NewWriterLevel(nil, level)
// return func(data []byte, buf2 *[]byte) ([]byte, error) {
// if err != nil {
// return nil, err
// }
// *buf2 = (*buf2)[:0]
// buf := bytes.NewBuffer(*buf2)
// gzWriter.Reset(buf)
// _, err = gzWriter.Write(data)
// if err != nil {
// return nil, err
// }
// err = gzWriter.Close()
// if err != nil {
// return nil, err
// }
// if len(buf.Bytes()) > cap(*buf2) {
// *buf2 = buf.Bytes()
// }
// return buf.Bytes(), nil
// }
// }
// func compressLzw() func(data []byte, buf *[]byte) ([]byte, error) {
// writer := lzw.NewWriter(nil, lzw.LSB, 8).(*lzw.Writer)
// return func(data []byte, buf *[]byte) ([]byte, error) {
// compressed := bytes.NewBuffer(*buf)
// writer.Reset(compressed, lzw.LSB, 8)
// _, err := writer.Write(data)
// if err != nil {
// return nil, err
// }
// err = writer.Close()
// if err != nil {
// return nil, err
// }
// if len(compressed.Bytes()) > cap(*buf) {
// *buf = compressed.Bytes()
// }
// return compressed.Bytes(), nil
// }
// }
// func compressFlateWithLevel(level int) func(data []byte, buf *[]byte) ([]byte, error) {
// writer, err := flate.NewWriter(nil, level)
// return func(data []byte, buf *[]byte) ([]byte, error) {
// if err != nil {
// return nil, err
// }
// *buf = (*buf)[:0]
// compressed := bytes.NewBuffer(*buf)
// writer.Reset(compressed)
// _, err = writer.Write(data)
// if err != nil {
// return nil, err
// }
// err = writer.Close()
// if err != nil {
// return nil, err
// }
// if len(compressed.Bytes()) > cap(*buf) {
// *buf = compressed.Bytes()
// }
// return compressed.Bytes(), nil
// }
// }
// func compressBrotliWithQuality(q int) func(data []byte, _ *[]byte) ([]byte, error) {
// writer := brotli.NewWriterLevel(nil, q)
// return func(data []byte, buf *[]byte) ([]byte, error) {
// // return brotli.Encode(data, brotli.WriterOptions{Quality: q})
// *buf = (*buf)[:0]
// compressed := bytes.NewBuffer(*buf)
// writer.Reset(compressed)
// _, err := writer.Write(data)
// if err != nil {
// return nil, err
// }
// err = writer.Flush()
// if err != nil {
// return nil, err
// }
// if len(compressed.Bytes()) > cap(*buf) {
// *buf = compressed.Bytes()
// }
// return compressed.Bytes(), nil
// }
// }
// func decompressBrotli(compressed []byte) ([]byte, error) {
// reader := bytes.NewReader(compressed)
// r := brotli.NewReader(reader)
// return io.ReadAll(r)
// }
// func decompressFlate(compressed []byte) ([]byte, error) {
// reader := bytes.NewReader(compressed)
// r := flate.NewReader(reader)
// defer r.Close()
// return io.ReadAll(r)
// }
// func decompressLzw(compressed []byte) ([]byte, error) {
// reader := bytes.NewReader(compressed)
// r := lzw.NewReader(reader, lzw.LSB, 8)
// defer r.Close()
// return io.ReadAll(r)
// }
// func decompressSnappy(compressed []byte) ([]byte, error) {
// uncompressed, err := snappy.Decode(nil, compressed)
// return uncompressed, err
// }
// func decompressSnappyAlt(compressed []byte) ([]byte, error) {
// uncompressed, err := reSnappy.Decode(nil, compressed)
// return uncompressed, err
// }
// func decompressS2(compressed []byte) ([]byte, error) {
// uncompressed, err := reS2.Decode(nil, compressed)
// return uncompressed, err
// }
// func decompressZstd(compressed []byte) ([]byte, error) {
// reader := bytes.NewReader(compressed)
// decoder, err := reZstd.NewReader(reader)
// if err != nil {
// return nil, err
// }
// defer decoder.Close()
// return io.ReadAll(decoder)
// }
// func decompressGzip(compressed []byte) ([]byte, error) {
// r := bytes.NewReader(compressed)
// gzReader, err := gzip.NewReader(r)
// if err != nil {
// return nil, err
// }
// defer gzReader.Close()
// decompressedData, err := io.ReadAll(gzReader)
// if err != nil {
// return nil, err
// }
// return decompressedData, nil
// }

View file

@ -3,7 +3,7 @@ package remote
import "testing" import "testing"
func TestCompressions(t *testing.T) { func TestCompressions(t *testing.T) {
data := []byte("Hello World") data := makeUncompressedReducedWriteRequestBenchData(t)
tc := []struct { tc := []struct {
name string name string
algo CompAlgorithm algo CompAlgorithm
@ -14,8 +14,6 @@ func TestCompressions(t *testing.T) {
{"ZstdFast", ZstdFast}, {"ZstdFast", ZstdFast},
{"ZstdDefault", ZstdDefault}, {"ZstdDefault", ZstdDefault},
{"ZstdBestComp", ZstdBestComp}, {"ZstdBestComp", ZstdBestComp},
{"GzipFast", GzipFast},
{"GzipComp", GzipComp},
{"Lzw", Lzw}, {"Lzw", Lzw},
{"FlateFast", FlateFast}, {"FlateFast", FlateFast},
{"FlateComp", FlateComp}, {"FlateComp", FlateComp},
@ -32,7 +30,9 @@ func TestCompressions(t *testing.T) {
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
decompressed, err := comp.Decompress(compressed) compressedCopy := make([]byte, len(compressed))
copy(compressedCopy, compressed)
decompressed, err := comp.Decompress(compressedCopy)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -44,7 +44,7 @@ func TestCompressions(t *testing.T) {
} }
func BenchmarkCompressions(b *testing.B) { func BenchmarkCompressions(b *testing.B) {
data := makeUncompressedWriteRequestBenchData(b) data := makeUncompressedReducedWriteRequestBenchData(b)
bc := []struct { bc := []struct {
name string name string
algo CompAlgorithm algo CompAlgorithm
@ -55,8 +55,6 @@ func BenchmarkCompressions(b *testing.B) {
{"ZstdFast", ZstdFast}, {"ZstdFast", ZstdFast},
{"ZstdDefault", ZstdDefault}, {"ZstdDefault", ZstdDefault},
{"ZstdBestComp", ZstdBestComp}, {"ZstdBestComp", ZstdBestComp},
{"GzipFast", GzipFast},
{"GzipComp", GzipComp},
{"Lzw", Lzw}, {"Lzw", Lzw},
{"FlateFast", FlateFast}, {"FlateFast", FlateFast},
{"FlateComp", FlateComp}, {"FlateComp", FlateComp},
@ -65,20 +63,23 @@ func BenchmarkCompressions(b *testing.B) {
{"BrotliDefault", BrotliDefault}, {"BrotliDefault", BrotliDefault},
} }
comps := make(map[CompAlgorithm]Compression) comps := make(map[CompAlgorithm]Compression)
decomps := make(map[CompAlgorithm]Compression)
for _, c := range bc { for _, c := range bc {
UseAlgorithm = c.algo UseAlgorithm = c.algo
comp := createComp() comp := createComp()
decomp := createComp()
comps[c.algo] = comp comps[c.algo] = comp
decomps[c.algo] = decomp
// warmup // warmup
for i := 0; i < 10; i++ { for i := 0; i < 10; i++ {
compressed, _ := comp.Compress(data) compressed, err := comp.Compress(data)
// if err != nil { if err != nil {
// b.Fatal(err) b.Fatal(err)
// } }
_, _ = comp.Decompress(compressed) _, err = decomp.Decompress(compressed)
// if err != nil { if err != nil {
// b.Fatal(err) b.Fatal(err)
// } }
} }
} }
@ -95,13 +96,14 @@ func BenchmarkCompressions(b *testing.B) {
}) })
b.Run("decompress-"+c.name, func(b *testing.B) { b.Run("decompress-"+c.name, func(b *testing.B) {
comp := comps[c.algo] comp := comps[c.algo]
decomp := decomps[c.algo]
compressed, err := comp.Compress(data) compressed, err := comp.Compress(data)
if err != nil { if err != nil {
b.Fatal(err) b.Fatal(err)
} }
b.ResetTimer() b.ResetTimer()
for i := 0; i < b.N; i++ { for i := 0; i < b.N; i++ {
_, err = comp.Decompress(compressed) _, err = decomp.Decompress(compressed)
if err != nil { if err != nil {
b.Fatal(err) b.Fatal(err)
} }

View file

@ -16,7 +16,6 @@ package remote
import ( import (
"bytes" "bytes"
"compress/flate" "compress/flate"
"compress/gzip"
"context" "context"
"fmt" "fmt"
"math" "math"
@ -1537,7 +1536,8 @@ func BenchmarkBuildMinimizedWriteRequest(b *testing.B) {
}) })
} }
} }
func makeUncompressedReducedWriteRequestBenchData(b *testing.B) []byte {
func makeUncompressedReducedWriteRequestBenchData(b testing.TB) []byte {
data := createDummyTimeSeries(1000) data := createDummyTimeSeries(1000)
pool := newLookupPool() pool := newLookupPool()
pBuf := proto.NewBuffer(nil) pBuf := proto.NewBuffer(nil)
@ -1612,9 +1612,6 @@ func BenchmarkCompressWriteRequest(b *testing.B) {
{"v1-ZstdFastest", uncompV1, &zstdCompression{level: zstd.SpeedFastest}}, {"v1-ZstdFastest", uncompV1, &zstdCompression{level: zstd.SpeedFastest}},
{"v1-ZstdSpeedDef", uncompV1, &zstdCompression{level: zstd.SpeedDefault}}, {"v1-ZstdSpeedDef", uncompV1, &zstdCompression{level: zstd.SpeedDefault}},
{"v1-ZstdBestComp", uncompV1, &zstdCompression{level: zstd.SpeedBestCompression}}, {"v1-ZstdBestComp", uncompV1, &zstdCompression{level: zstd.SpeedBestCompression}},
{"v1-GzipBestComp", uncompV1, &gzipCompression{level: gzip.BestCompression}},
{"v1-GzipBestSpeed", uncompV1, &gzipCompression{level: gzip.BestSpeed}},
{"v1-GzipDefault", uncompV1, &gzipCompression{level: gzip.DefaultCompression}},
{"v1-Lzw", uncompV1, &lzwCompression{}}, {"v1-Lzw", uncompV1, &lzwCompression{}},
{"v1-FlateBestComp", uncompV1, &flateCompression{level: flate.BestCompression}}, {"v1-FlateBestComp", uncompV1, &flateCompression{level: flate.BestCompression}},
{"v1-FlateBestSpeed", uncompV1, &flateCompression{level: flate.BestSpeed}}, {"v1-FlateBestSpeed", uncompV1, &flateCompression{level: flate.BestSpeed}},
@ -1628,9 +1625,6 @@ func BenchmarkCompressWriteRequest(b *testing.B) {
{"v1.1-ZstdFastest", uncompV11, &zstdCompression{level: zstd.SpeedFastest}}, {"v1.1-ZstdFastest", uncompV11, &zstdCompression{level: zstd.SpeedFastest}},
{"v1.1-ZstdSpeedDef", uncompV11, &zstdCompression{level: zstd.SpeedDefault}}, {"v1.1-ZstdSpeedDef", uncompV11, &zstdCompression{level: zstd.SpeedDefault}},
{"v1.1-ZstdBestComp", uncompV11, &zstdCompression{level: zstd.SpeedBestCompression}}, {"v1.1-ZstdBestComp", uncompV11, &zstdCompression{level: zstd.SpeedBestCompression}},
{"v1.1-GzipBestComp", uncompV11, &gzipCompression{level: gzip.BestCompression}},
{"v1.1-GzipBestSpeed", uncompV11, &gzipCompression{level: gzip.BestSpeed}},
{"v1.1-GzipDefault", uncompV11, &gzipCompression{level: gzip.DefaultCompression}},
{"v1.1-Lzw", uncompV11, &lzwCompression{}}, {"v1.1-Lzw", uncompV11, &lzwCompression{}},
{"v1.1-FlateBestComp", uncompV11, &flateCompression{level: flate.BestCompression}}, {"v1.1-FlateBestComp", uncompV11, &flateCompression{level: flate.BestCompression}},
{"v1.1-FlateBestSpeed", uncompV11, &flateCompression{level: flate.BestSpeed}}, {"v1.1-FlateBestSpeed", uncompV11, &flateCompression{level: flate.BestSpeed}},