Add alternative compression algos to experiment

alexgreenbank 2023-09-27 19:32:18 +00:00 committed by Callum Styan
parent f2bc16177d
commit 0ab4808153
6 changed files with 818 additions and 15 deletions

go.mod (1 addition)

@@ -9,6 +9,7 @@ require (
 	github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/network/armnetwork/v2 v2.2.1
 	github.com/alecthomas/kingpin/v2 v2.3.2
 	github.com/alecthomas/units v0.0.0-20211218093645-b94a6e3cc137
+	github.com/andybalholm/brotli v1.0.6
 	github.com/aws/aws-sdk-go v1.45.25
 	github.com/cespare/xxhash/v2 v2.2.0
 	github.com/dennwc/varint v1.0.0

go.sum (2 additions)

@@ -73,6 +73,8 @@ github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRF
 github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho=
 github.com/alecthomas/units v0.0.0-20211218093645-b94a6e3cc137 h1:s6gZFSlWYmbqAuRjVTiNNhvNRfY2Wxp9nhfyel4rklc=
 github.com/alecthomas/units v0.0.0-20211218093645-b94a6e3cc137/go.mod h1:OMCwj8VM1Kc9e19TLln2VL61YJF0x1XFtfdL4JdbSyE=
+github.com/andybalholm/brotli v1.0.6 h1:Yf9fFpf49Zrxb9NlQaluyE92/+X7UVHlhMNJN2sxfOI=
+github.com/andybalholm/brotli v1.0.6/go.mod h1:fO7iG3H7G2nSZ7m0zPUDn85XEX2GTukHGRSepvi9Eig=
 github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY=
 github.com/apache/thrift v0.12.0/go.mod h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb+bacwQ=
 github.com/apache/thrift v0.13.0/go.mod h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb+bacwQ=

storage/remote/codec.go
@@ -867,7 +867,10 @@ func DecodeWriteRequest(r io.Reader) (*prompb.WriteRequest, error) {
 		return nil, err
 	}
-	reqBuf, err := snappy.Decode(nil, compressed)
+	comp := createComp()
+	reqBuf, err := comp.Decompress(compressed)
+	comp.Close()
 	if err != nil {
 		return nil, err
 	}

storage/remote/compression.go (new file)
@@ -0,0 +1,590 @@
package remote
import (
	"bytes"
	"compress/flate"
	"compress/gzip"
	"compress/lzw"
	"io"

	"github.com/andybalholm/brotli"
	"github.com/golang/snappy"
	reS2 "github.com/klauspost/compress/s2"
	reSnappy "github.com/klauspost/compress/snappy"
	"github.com/klauspost/compress/zstd"
	reZstd "github.com/klauspost/compress/zstd"
)
type Compression interface {
Compress(data []byte) ([]byte, error)
Decompress(data []byte) ([]byte, error)
Close() error
}
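Every algorithm below implements this interface. Note that Compress may hand back the implementation's internal buffer, so the returned slice should be treated as valid only until the next Compress call on the same instance. A minimal usage sketch (error handling elided, data is a hypothetical payload):

	comp := createComp() // returns the implementation selected by UseAlgorithm
	defer comp.Close()
	compressed, _ := comp.Compress(data)
	original, _ := comp.Decompress(compressed)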
// hacky globals to easily tweak the compression algorithm and run some benchmarks
type CompAlgorithm int
var UseAlgorithm = SnappyAlt
const (
Snappy CompAlgorithm = iota
SnappyAlt
S2
ZstdFast
ZstdDefault
ZstdBestComp
GzipFast
GzipComp
Lzw
FlateFast
FlateComp
BrotliFast
BrotliComp
BrotliDefault
)
var createComp func() Compression = func() Compression {
switch UseAlgorithm {
case Snappy:
return &snappyCompression{}
case SnappyAlt:
return &snappyAltCompression{}
case S2:
return &s2Compression{}
case ZstdDefault:
return &zstdCompression{level: zstd.SpeedDefault}
case ZstdFast:
return &zstdCompression{level: zstd.SpeedFastest}
case ZstdBestComp:
return &zstdCompression{level: zstd.SpeedBestCompression}
case GzipFast:
return &gzipCompression{level: gzip.BestSpeed}
case GzipComp:
return &gzipCompression{level: gzip.BestCompression}
case Lzw:
return &lzwCompression{}
case FlateFast:
return &flateCompression{level: flate.BestSpeed}
case FlateComp:
return &flateCompression{level: flate.BestCompression}
case BrotliFast:
return &brotliCompression{quality: brotli.BestSpeed}
case BrotliDefault:
return &brotliCompression{quality: brotli.DefaultCompression}
case BrotliComp:
return &brotliCompression{quality: brotli.BestCompression}
default:
panic("unknown compression algorithm")
}
}
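Because UseAlgorithm is an ordinary package-level variable, switching algorithms for an experiment is a single assignment made before the first createComp() call; a sketch:

	UseAlgorithm = ZstdFast // every subsequent createComp() returns a zstd encoder at SpeedFastest
	comp := createComp()
	defer comp.Close()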
// noopCompression is not reachable from createComp; the benchmarks use it
// directly to build uncompressed payloads.
type noopCompression struct{}
func (n *noopCompression) Compress(data []byte) ([]byte, error) {
return data, nil
}
func (n *noopCompression) Decompress(data []byte) ([]byte, error) {
return data, nil
}
func (n *noopCompression) Close() error {
return nil
}
type snappyCompression struct {
buf []byte
}
func (s *snappyCompression) Compress(data []byte) ([]byte, error) {
	// snappy.Encode only writes into dst if len(dst) is large enough, so
	// expose the buffer's full capacity before encoding.
	s.buf = s.buf[0:cap(s.buf)]
	compressed := snappy.Encode(s.buf, data)
	if n := snappy.MaxEncodedLen(len(data)); n > len(s.buf) {
		// grow the buffer for the next call
		s.buf = make([]byte, n)
	}
	return compressed, nil
}
func (s *snappyCompression) Decompress(data []byte) ([]byte, error) {
uncompressed, err := snappy.Decode(nil, data)
return uncompressed, err
}
func (s *snappyCompression) Close() error {
return nil
}
type snappyAltCompression struct {
buf []byte
}
func (s *snappyAltCompression) Compress(data []byte) ([]byte, error) {
s.buf = s.buf[0:cap(s.buf)]
res := reSnappy.Encode(s.buf, data)
if n := reSnappy.MaxEncodedLen(len(data)); n > len(s.buf) {
s.buf = make([]byte, n)
}
return res, nil
}
func (s *snappyAltCompression) Decompress(data []byte) ([]byte, error) {
uncompressed, err := reSnappy.Decode(nil, data)
return uncompressed, err
}
func (s *snappyAltCompression) Close() error {
return nil
}
type s2Compression struct {
buf []byte
}
func (s *s2Compression) Compress(data []byte) ([]byte, error) {
res := reS2.Encode(s.buf, data)
if n := reS2.MaxEncodedLen(len(data)); n > len(s.buf) {
s.buf = make([]byte, n)
}
return res, nil
}
func (s *s2Compression) Decompress(data []byte) ([]byte, error) {
uncompressed, err := reS2.Decode(nil, data)
return uncompressed, err
}
func (s *s2Compression) Close() error {
return nil
}
type zstdCompression struct {
level zstd.EncoderLevel
buf []byte
w *reZstd.Encoder
}
func (z *zstdCompression) Compress(data []byte) ([]byte, error) {
	if z.w == nil {
		// TODO: should be initialized on creation
		var err error
		z.w, err = reZstd.NewWriter(nil, reZstd.WithEncoderLevel(z.level))
		if err != nil {
			return nil, err
		}
	}
	z.buf = z.buf[:0]
	writer := bytes.NewBuffer(z.buf)
	z.w.Reset(writer)
	if _, err := z.w.Write(data); err != nil {
		return nil, err
	}
	if err := z.w.Close(); err != nil {
		return nil, err
	}
	res := writer.Bytes()
	if len(res) > cap(z.buf) {
		// keep the larger buffer for the next call
		z.buf = res
	}
	return res, nil
}
func (z *zstdCompression) Decompress(data []byte) ([]byte, error) {
reader := bytes.NewReader(data)
decoder, err := reZstd.NewReader(reader)
if err != nil {
return nil, err
}
defer decoder.Close()
return io.ReadAll(decoder)
}
func (z *zstdCompression) Close() error {
if z.w != nil {
return z.w.Close()
}
return nil
}
type gzipCompression struct {
level int
buf []byte
w *gzip.Writer
}
func (g *gzipCompression) Compress(data []byte) ([]byte, error) {
var err error
if g.w == nil {
g.w, err = gzip.NewWriterLevel(nil, g.level)
}
if err != nil {
return nil, err
}
g.buf = g.buf[:0]
buf := bytes.NewBuffer(g.buf)
g.w.Reset(buf)
_, err = g.w.Write(data)
if err != nil {
return nil, err
}
err = g.w.Close()
if err != nil {
return nil, err
}
if len(buf.Bytes()) > cap(g.buf) {
g.buf = buf.Bytes()
}
return buf.Bytes(), nil
}
func (g *gzipCompression) Decompress(data []byte) ([]byte, error) {
r := bytes.NewReader(data)
gzReader, err := gzip.NewReader(r)
if err != nil {
return nil, err
}
defer gzReader.Close()
decompressedData, err := io.ReadAll(gzReader)
if err != nil {
return nil, err
}
return decompressedData, nil
}
func (g *gzipCompression) Close() error {
return nil
}
type lzwCompression struct {
buf []byte
w *lzw.Writer
}
func (l *lzwCompression) Compress(data []byte) ([]byte, error) {
	if l.w == nil {
		l.w = lzw.NewWriter(nil, lzw.LSB, 8).(*lzw.Writer)
	}
	// truncate the reused buffer so bytes from the previous call don't leak in
	l.buf = l.buf[:0]
	compressed := bytes.NewBuffer(l.buf)
	l.w.Reset(compressed, lzw.LSB, 8)
_, err := l.w.Write(data)
if err != nil {
return nil, err
}
err = l.w.Close()
if err != nil {
return nil, err
}
if len(compressed.Bytes()) > cap(l.buf) {
l.buf = compressed.Bytes()
}
return compressed.Bytes(), nil
}
func (l *lzwCompression) Decompress(data []byte) ([]byte, error) {
reader := bytes.NewReader(data)
r := lzw.NewReader(reader, lzw.LSB, 8)
defer r.Close()
return io.ReadAll(r)
}
func (l *lzwCompression) Close() error {
return nil
}
type flateCompression struct {
level int
buf []byte
w *flate.Writer
}
func (f *flateCompression) Compress(data []byte) ([]byte, error) {
	if f.w == nil {
		var err error
		f.w, err = flate.NewWriter(nil, f.level)
		if err != nil {
			return nil, err
		}
	}
f.buf = f.buf[:0]
compressed := bytes.NewBuffer(f.buf)
f.w.Reset(compressed)
_, err = f.w.Write(data)
if err != nil {
return nil, err
}
err = f.w.Close()
if err != nil {
return nil, err
}
if len(compressed.Bytes()) > cap(f.buf) {
f.buf = compressed.Bytes()
}
return compressed.Bytes(), nil
}
func (f *flateCompression) Decompress(data []byte) ([]byte, error) {
reader := bytes.NewReader(data)
r := flate.NewReader(reader)
defer r.Close()
return io.ReadAll(r)
}
func (f *flateCompression) Close() error {
	if f.w == nil {
		return nil
	}
	return f.w.Close()
}
type brotliCompression struct {
quality int
buf []byte
w *brotli.Writer
}
func (b *brotliCompression) Compress(data []byte) ([]byte, error) {
if b.w == nil {
b.w = brotli.NewWriterLevel(nil, b.quality)
}
b.buf = (b.buf)[:0]
compressed := bytes.NewBuffer(b.buf)
b.w.Reset(compressed)
_, err := b.w.Write(data)
if err != nil {
return nil, err
}
	// Flush (rather than Close) is enough for brotli: it makes everything
	// written so far decodable by the reader.
	err = b.w.Flush()
if err != nil {
return nil, err
}
if len(compressed.Bytes()) > cap(b.buf) {
b.buf = compressed.Bytes()
}
return compressed.Bytes(), nil
}
func (b *brotliCompression) Decompress(data []byte) ([]byte, error) {
reader := bytes.NewReader(data)
r := brotli.NewReader(reader)
return io.ReadAll(r)
}
func (b *brotliCompression) Close() error {
return nil
}
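Before benchmarking, it is worth confirming that every implementation round-trips through the shared interface. A hypothetical table-driven check along these lines (not part of this commit; assumes the standard testing and bytes packages are imported in a test file):

	func TestCompressionRoundtrip(t *testing.T) {
		payload := []byte("some remote write request bytes, repeated enough to be compressible")
		for _, alg := range []CompAlgorithm{Snappy, SnappyAlt, S2, ZstdFast, GzipFast, Lzw, FlateFast, BrotliFast} {
			UseAlgorithm = alg
			comp := createComp()
			compressed, err := comp.Compress(payload)
			if err != nil {
				t.Fatal(err)
			}
			got, err := comp.Decompress(compressed)
			if err != nil {
				t.Fatal(err)
			}
			if !bytes.Equal(got, payload) {
				t.Fatalf("round-trip mismatch for algorithm %d", alg)
			}
			comp.Close()
		}
	}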
// func compressSnappy(bytes []byte, buf *[]byte) ([]byte, error) {
// // snappy uses len() to see if it needs to allocate a new slice. Make the
// // buffer as long as possible.
// *buf = (*buf)[0:cap(*buf)]
// compressed := snappy.Encode(*buf, bytes)
// if n := snappy.MaxEncodedLen(len(bytes)); buf != nil && n > len(*buf) {
// // grow the buffer for the next time
// *buf = make([]byte, n)
// }
// return compressed, nil
// }
// func compressSnappyAlt(bytes []byte, buf *[]byte) ([]byte, error) {
// res := reSnappy.Encode(*buf, bytes)
// if n := reSnappy.MaxEncodedLen(len(bytes)); buf != nil && n > len(*buf) {
// // grow the buffer for the next time
// *buf = make([]byte, n)
// }
// return res, nil
// }
// func compressS2(bytes []byte, buf *[]byte) ([]byte, error) {
// res := reS2.Encode(*buf, bytes)
// if n := reS2.MaxEncodedLen(len(bytes)); buf != nil && n > len(*buf) {
// // grow the buffer for the next time
// *buf = make([]byte, n)
// }
// return res, nil
// }
// func compressZstdWithLevel(level reZstd.EncoderLevel) func(data []byte, buf *[]byte) ([]byte, error) {
// // TODO: use a pool or something. just testing for now
// encoder, err := reZstd.NewWriter(nil, reZstd.WithEncoderLevel(level))
// return func(data []byte, buf *[]byte) ([]byte, error) {
// if err != nil {
// return nil, err
// }
// *buf = (*buf)[:0]
// writer := bytes.NewBuffer(*buf)
// if err != nil {
// return nil, err
// }
// encoder.Reset(writer)
// encoder.Write(data)
// err = encoder.Close()
// if err != nil {
// return nil, err
// }
// res := writer.Bytes()
// if len(res) > cap(*buf) {
// *buf = res
// }
// return res, nil
// }
// }
// func compressGzipWithLevel(level int) func([]byte, *[]byte) ([]byte, error) {
// // TODO: use a pool or something. just testing for now
// gzWriter, err := gzip.NewWriterLevel(nil, level)
// return func(data []byte, buf2 *[]byte) ([]byte, error) {
// if err != nil {
// return nil, err
// }
// *buf2 = (*buf2)[:0]
// buf := bytes.NewBuffer(*buf2)
// gzWriter.Reset(buf)
// _, err = gzWriter.Write(data)
// if err != nil {
// return nil, err
// }
// err = gzWriter.Close()
// if err != nil {
// return nil, err
// }
// if len(buf.Bytes()) > cap(*buf2) {
// *buf2 = buf.Bytes()
// }
// return buf.Bytes(), nil
// }
// }
// func compressLzw() func(data []byte, buf *[]byte) ([]byte, error) {
// writer := lzw.NewWriter(nil, lzw.LSB, 8).(*lzw.Writer)
// return func(data []byte, buf *[]byte) ([]byte, error) {
// compressed := bytes.NewBuffer(*buf)
// writer.Reset(compressed, lzw.LSB, 8)
// _, err := writer.Write(data)
// if err != nil {
// return nil, err
// }
// err = writer.Close()
// if err != nil {
// return nil, err
// }
// if len(compressed.Bytes()) > cap(*buf) {
// *buf = compressed.Bytes()
// }
// return compressed.Bytes(), nil
// }
// }
// func compressFlateWithLevel(level int) func(data []byte, buf *[]byte) ([]byte, error) {
// writer, err := flate.NewWriter(nil, level)
// return func(data []byte, buf *[]byte) ([]byte, error) {
// if err != nil {
// return nil, err
// }
// *buf = (*buf)[:0]
// compressed := bytes.NewBuffer(*buf)
// writer.Reset(compressed)
// _, err = writer.Write(data)
// if err != nil {
// return nil, err
// }
// err = writer.Close()
// if err != nil {
// return nil, err
// }
// if len(compressed.Bytes()) > cap(*buf) {
// *buf = compressed.Bytes()
// }
// return compressed.Bytes(), nil
// }
// }
// func compressBrotliWithQuality(q int) func(data []byte, _ *[]byte) ([]byte, error) {
// writer := brotli.NewWriterLevel(nil, q)
// return func(data []byte, buf *[]byte) ([]byte, error) {
// // return brotli.Encode(data, brotli.WriterOptions{Quality: q})
// *buf = (*buf)[:0]
// compressed := bytes.NewBuffer(*buf)
// writer.Reset(compressed)
// _, err := writer.Write(data)
// if err != nil {
// return nil, err
// }
// err = writer.Flush()
// if err != nil {
// return nil, err
// }
// if len(compressed.Bytes()) > cap(*buf) {
// *buf = compressed.Bytes()
// }
// return compressed.Bytes(), nil
// }
// }
// func decompressBrotli(compressed []byte) ([]byte, error) {
// reader := bytes.NewReader(compressed)
// r := brotli.NewReader(reader)
// return io.ReadAll(r)
// }
// func decompressFlate(compressed []byte) ([]byte, error) {
// reader := bytes.NewReader(compressed)
// r := flate.NewReader(reader)
// defer r.Close()
// return io.ReadAll(r)
// }
// func decompressLzw(compressed []byte) ([]byte, error) {
// reader := bytes.NewReader(compressed)
// r := lzw.NewReader(reader, lzw.LSB, 8)
// defer r.Close()
// return io.ReadAll(r)
// }
// func decompressSnappy(compressed []byte) ([]byte, error) {
// uncompressed, err := snappy.Decode(nil, compressed)
// return uncompressed, err
// }
// func decompressSnappyAlt(compressed []byte) ([]byte, error) {
// uncompressed, err := reSnappy.Decode(nil, compressed)
// return uncompressed, err
// }
// func decompressS2(compressed []byte) ([]byte, error) {
// uncompressed, err := reS2.Decode(nil, compressed)
// return uncompressed, err
// }
// func decompressZstd(compressed []byte) ([]byte, error) {
// reader := bytes.NewReader(compressed)
// decoder, err := reZstd.NewReader(reader)
// if err != nil {
// return nil, err
// }
// defer decoder.Close()
// return io.ReadAll(decoder)
// }
// func decompressGzip(compressed []byte) ([]byte, error) {
// r := bytes.NewReader(compressed)
// gzReader, err := gzip.NewReader(r)
// if err != nil {
// return nil, err
// }
// defer gzReader.Close()
// decompressedData, err := io.ReadAll(gzReader)
// if err != nil {
// return nil, err
// }
// return decompressedData, nil
// }

storage/remote/queue_manager.go
@@ -26,7 +26,7 @@ import (
 	"github.com/go-kit/log"
 	"github.com/go-kit/log/level"
 	"github.com/gogo/protobuf/proto"
-	"github.com/golang/snappy"
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/prometheus/common/model"
 	"go.opentelemetry.io/otel"
@@ -545,7 +545,9 @@ func (t *QueueManager) AppendMetadata(ctx context.Context, metadata []scrape.Met
 func (t *QueueManager) sendMetadataWithBackoff(ctx context.Context, metadata []prompb.MetricMetadata, pBuf *proto.Buffer) error {
 	// Build the WriteRequest with no samples.
-	req, _, err := buildWriteRequest(nil, metadata, pBuf, nil)
+	comp := createComp()
+	req, _, err := buildWriteRequest(nil, metadata, pBuf, comp)
+	comp.Close()
 	if err != nil {
 		return err
 	}
@@ -1367,7 +1369,9 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue *queue) {
 		pBuf    = proto.NewBuffer(nil)
 		pBufRaw []byte
 		buf     []byte
+		comp    = createComp()
 	)
+	defer comp.Close()
 	if s.qm.sendExemplars {
 		max += int(float64(max) * 0.1)
 	}
@@ -1442,7 +1446,6 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue *queue) {
 				s.sendMinLenSamples(ctx, pendingMinLenData[:n], symbolTable.LabelsData(), nPendingSamples, nPendingExemplars, nPendingHistograms, pBuf, &buf)
 				symbolTable.clear()
 			}
-
 			queue.ReturnForReuse(batch)
 			stop()
@@ -1455,7 +1458,7 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue *queue) {
 			case Base1:
 				nPendingSamples, nPendingExemplars, nPendingHistograms := populateTimeSeries(batch, pendingData, s.qm.sendExemplars, s.qm.sendNativeHistograms)
 				n := nPendingSamples + nPendingExemplars + nPendingHistograms
-				s.sendSamples(ctx, pendingData[:n], nPendingSamples, nPendingExemplars, nPendingHistograms, pBuf, &buf)
+				s.sendSamples(ctx, pendingData[:n], nPendingSamples, nPendingExemplars, nPendingHistograms, pBuf, comp)
 				level.Debug(s.qm.logger).Log("msg", "runShard timer ticked, sending buffered data", "samples", nPendingSamples,
 					"exemplars", nPendingExemplars, "shard", shardNum, "histograms", nPendingHistograms)
 			case Min32Optimized:
@@ -1519,13 +1522,12 @@ func populateTimeSeries(batch []timeSeries, pendingData []prompb.TimeSeries, sen
 	}
 	return nPendingSamples, nPendingExemplars, nPendingHistograms
 }
-func (s *shards) sendSamples(ctx context.Context, samples []prompb.TimeSeries, sampleCount, exemplarCount, histogramCount int, pBuf *proto.Buffer, buf *[]byte) {
+func (s *shards) sendSamples(ctx context.Context, samples []prompb.TimeSeries, sampleCount, exemplarCount, histogramCount int, pBuf *proto.Buffer, comp Compression) {
 	begin := time.Now()
 	// Build the WriteRequest with no metadata.
 	// Failing to build the write request is non-recoverable, since it will
 	// only error if marshaling the proto to bytes fails.
-	req, highest, err := buildWriteRequest(samples, nil, pBuf, buf)
+	req, highest, err := buildWriteRequest(samples, nil, pBuf, comp)
 	if err == nil {
 		err = s.sendSamplesWithBackoff(ctx, req, sampleCount, exemplarCount, histogramCount, highest)
 	}
@@ -1768,7 +1770,15 @@ func sendWriteRequestWithBackoff(ctx context.Context, cfg config.QueueConfig, l
 		}
 	}

-func buildWriteRequest(samples []prompb.TimeSeries, metadata []prompb.MetricMetadata, pBuf *proto.Buffer, buf *[]byte) ([]byte, int64, error) {
+func buildWriteRequest(samples []prompb.TimeSeries, metadata []prompb.MetricMetadata, pBuf *proto.Buffer, comp Compression) ([]byte, int64, error) {
+	return buildWriteRequestWithCompression(samples, metadata, pBuf, comp)
+}
+
+func buildWriteRequestWithCompression(samples []prompb.TimeSeries,
+	metadata []prompb.MetricMetadata,
+	pBuf *proto.Buffer,
+	compressor Compression,
+) ([]byte, int64, error) {
 	var highest int64
 	for _, ts := range samples {
 		// At the moment we only ever append a TimeSeries with a single sample or exemplar in it.
@@ -1810,8 +1820,7 @@ func buildWriteRequest(samples []prompb.TimeSeries, metadata []prompb.MetricMeta
 		// grow the buffer for the next time
 		*buf = make([]byte, n)
 	}
-	return compressed, highest, nil
+	return compressed, highest, err
 }

 type offLenPair struct {
@@ -1819,6 +1828,48 @@ type offLenPair struct {
 	Len uint32
 }
+
+func buildReducedWriteRequest(samples []prompb.ReducedTimeSeries, labels map[uint64]string, pBuf *proto.Buffer, comp Compression) ([]byte, int64, error) {
+	return buildReducedWriteRequestWithCompression(samples, labels, pBuf, comp)
+}
+
+func buildReducedWriteRequestWithCompression(samples []prompb.ReducedTimeSeries,
+	labels map[uint64]string,
+	pBuf *proto.Buffer,
+	compress Compression,
+) ([]byte, int64, error) {
+	var highest int64
+	for _, ts := range samples {
+		// At the moment we only ever append a TimeSeries with a single sample or exemplar in it.
+		if len(ts.Samples) > 0 && ts.Samples[0].Timestamp > highest {
+			highest = ts.Samples[0].Timestamp
+		}
+		if len(ts.Exemplars) > 0 && ts.Exemplars[0].Timestamp > highest {
+			highest = ts.Exemplars[0].Timestamp
+		}
+		if len(ts.Histograms) > 0 && ts.Histograms[0].Timestamp > highest {
+			highest = ts.Histograms[0].Timestamp
+		}
+	}
+
+	req := &prompb.WriteRequestWithRefs{
+		StringSymbolTable: labels,
+		Timeseries:        samples,
+	}
+
+	if pBuf == nil {
+		pBuf = proto.NewBuffer(nil) // For convenience in tests. Not efficient.
+	} else {
+		pBuf.Reset()
+	}
+	err := pBuf.Marshal(req)
+	if err != nil {
+		return nil, 0, err
+	}
+
+	compressed, err := compress.Compress(pBuf.Bytes())
+	return compressed, highest, err
+}

 type rwSymbolTable struct {
 	symbols    []byte
 	symbolsMap map[string]offLenPair

storage/remote/queue_manager_test.go
@@ -14,6 +14,9 @@
 package remote

 import (
+	"bytes"
+	"compress/flate"
+	"compress/gzip"
 	"context"
 	"fmt"
 	"math"
@@ -30,6 +33,7 @@ import (
 	"github.com/go-kit/log"
 	"github.com/gogo/protobuf/proto"
 	"github.com/golang/snappy"
+	"github.com/klauspost/compress/zstd"
 	"github.com/prometheus/client_golang/prometheus"
 	client_testutil "github.com/prometheus/client_golang/prometheus/testutil"
 	"github.com/prometheus/common/model"
@@ -1447,7 +1451,8 @@ func createDummyTimeSeries(instances int) []timeSeries {
 func BenchmarkBuildWriteRequest(b *testing.B) {
 	bench := func(b *testing.B, batch []timeSeries) {
-		buff := make([]byte, 0)
+		comp := &snappyCompression{}
 		seriesBuff := make([]prompb.TimeSeries, len(batch))
 		for i := range seriesBuff {
 			seriesBuff[i].Samples = []prompb.Sample{{}}
@@ -1458,14 +1463,14 @@ func BenchmarkBuildWriteRequest(b *testing.B) {
 		// Warmup buffers
 		for i := 0; i < 10; i++ {
 			populateTimeSeries(batch, seriesBuff, true, true)
-			buildWriteRequest(seriesBuff, nil, pBuf, &buff)
+			buildWriteRequest(seriesBuff, nil, pBuf, comp)
 		}
 		b.ResetTimer()
 		totalSize := 0
 		for i := 0; i < b.N; i++ {
 			populateTimeSeries(batch, seriesBuff, true, true)
-			req, _, err := buildWriteRequest(seriesBuff, nil, pBuf, &buff)
+			req, _, err := buildWriteRequest(seriesBuff, nil, pBuf, comp)
 			if err != nil {
 				b.Fatal(err)
 			}
@@ -1492,6 +1497,7 @@ func BenchmarkBuildWriteRequest(b *testing.B) {
 }
+
 func BenchmarkBuildMinimizedWriteRequest(b *testing.B) {
 	type testcase struct {
 		batch []timeSeries
 	}
@@ -1502,7 +1508,6 @@ func BenchmarkBuildMinimizedWriteRequest(b *testing.B) {
 	}
 	for _, tc := range testCases {
 		symbolTable := newRwSymbolTable()
-		buff := make([]byte, 0)
 		seriesBuff := make([]prompb.MinimizedTimeSeries, len(tc.batch))
 		for i := range seriesBuff {
 			seriesBuff[i].Samples = []prompb.Sample{{}}
@@ -1532,3 +1537,154 @@ func BenchmarkBuildMinimizedWriteRequest(b *testing.B) {
 	})
 }
 }
func makeUncompressedReducedWriteRequestBenchData(b *testing.B) []byte {
data := createDummyTimeSeries(1000)
pool := newLookupPool()
pBuf := proto.NewBuffer(nil)
seriesBuff := make([]prompb.ReducedTimeSeries, len(data))
for i := range seriesBuff {
seriesBuff[i].Samples = []prompb.Sample{{}}
seriesBuff[i].Exemplars = []prompb.ExemplarRef{{}}
}
populateReducedTimeSeries(pool, data, seriesBuff, true, true)
res, _, err := buildReducedWriteRequestWithCompression(seriesBuff, pool.getTable(), pBuf, &noopCompression{})
if err != nil {
b.Fatal(err)
}
return res
}
func makeUncompressedWriteRequestBenchData(b *testing.B) []byte {
data := createDummyTimeSeries(1000)
seriesBuff := make([]prompb.TimeSeries, len(data))
for i := range seriesBuff {
seriesBuff[i].Samples = []prompb.Sample{{}}
seriesBuff[i].Exemplars = []prompb.Exemplar{{}}
}
pBuf := proto.NewBuffer(nil)
populateTimeSeries(data, seriesBuff, true, true)
res, _, err := buildWriteRequestWithCompression(seriesBuff, nil, pBuf, &noopCompression{})
if err != nil {
b.Fatal(err)
}
return res
}
func BenchmarkCompressWriteRequest(b *testing.B) {
uncompV1 := makeUncompressedWriteRequestBenchData(b)
uncompV11 := makeUncompressedReducedWriteRequestBenchData(b)
// buf := make([]byte, 0)
bench := func(b *testing.B, data []byte, comp Compression) {
b.ResetTimer()
totalSize := 0
var res []byte
var err error
for i := 0; i < b.N; i++ {
res, err = comp.Compress(data)
if err != nil {
b.Fatal(err)
}
totalSize += len(res)
b.ReportMetric(float64(totalSize)/float64(b.N), "compressedSize/op")
}
b.StopTimer()
// sanity check
res, err = comp.Decompress(res)
if err != nil {
b.Fatal(err)
}
if !bytes.Equal(res, data) {
b.Fatalf("decompressed data doesn't match original")
}
}
cases := []struct {
name string
data []byte
comp Compression
}{
{"v1-go-snappy", uncompV1, &snappyCompression{}},
{"v1-snappy", uncompV1, &snappyAltCompression{}},
{"v1-s2", uncompV1, &s2Compression{}},
{"v1-ZstdFastest", uncompV1, &zstdCompression{level: zstd.SpeedFastest}},
{"v1-ZstdSpeedDef", uncompV1, &zstdCompression{level: zstd.SpeedDefault}},
{"v1-ZstdBestComp", uncompV1, &zstdCompression{level: zstd.SpeedBestCompression}},
{"v1-GzipBestComp", uncompV1, &gzipCompression{level: gzip.BestCompression}},
{"v1-GzipBestSpeed", uncompV1, &gzipCompression{level: gzip.BestSpeed}},
{"v1-GzipDefault", uncompV1, &gzipCompression{level: gzip.DefaultCompression}},
{"v1-Lzw", uncompV1, &lzwCompression{}},
{"v1-FlateBestComp", uncompV1, &flateCompression{level: flate.BestCompression}},
{"v1-FlateBestSpeed", uncompV1, &flateCompression{level: flate.BestSpeed}},
{"v1-Brotli-1", uncompV1, &brotliCompression{quality: 1}},
{"v1-Brotli-11", uncompV1, &brotliCompression{quality: 1}},
{"v1-Brotli-5", uncompV1, &brotliCompression{quality: 5}},
{"v1.1-go-snappy", uncompV11, &snappyCompression{}},
{"v1.1-snappy", uncompV11, &snappyAltCompression{}},
{"v1.1-s2", uncompV11, &s2Compression{}},
{"v1.1-ZstdFastest", uncompV11, &zstdCompression{level: zstd.SpeedFastest}},
{"v1.1-ZstdSpeedDef", uncompV11, &zstdCompression{level: zstd.SpeedDefault}},
{"v1.1-ZstdBestComp", uncompV11, &zstdCompression{level: zstd.SpeedBestCompression}},
{"v1.1-GzipBestComp", uncompV11, &gzipCompression{level: gzip.BestCompression}},
{"v1.1-GzipBestSpeed", uncompV11, &gzipCompression{level: gzip.BestSpeed}},
{"v1.1-GzipDefault", uncompV11, &gzipCompression{level: gzip.DefaultCompression}},
{"v1.1-Lzw", uncompV11, &lzwCompression{}},
{"v1.1-FlateBestComp", uncompV11, &flateCompression{level: flate.BestCompression}},
{"v1.1-FlateBestSpeed", uncompV11, &flateCompression{level: flate.BestSpeed}},
{"v1.1-Brotli-1", uncompV11, &brotliCompression{quality: 1}},
{"v1.1-Brotli-11", uncompV11, &brotliCompression{quality: 1}},
{"v1.1-Brotli-5", uncompV11, &brotliCompression{quality: 5}},
}
// Warmup buffers
for _, c := range cases {
bench(b, c.data, c.comp)
}
for _, c := range cases {
b.Run(c.name, func(b *testing.B) {
bench(b, c.data, c.comp)
})
}
}
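The benchmark reports compressed size alongside the usual timings for each algorithm on both request formats. It can be run with the standard Go tooling, something like (the package path is assumed from the diff):

	go test ./storage/remote/ -bench BenchmarkCompressWriteRequest -benchmem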
// func BenchmarkDecompressWriteRequestGoSnappy(b *testing.B) {
// uncomp := makeUncompressedWriteRequestBenchData(b)
// buf, _ := compressSnappy(uncomp, &[]byte{})
// b.ResetTimer()
// totalSize := 0
// for i := 0; i < b.N; i++ {
// dbuf, _ := decompressSnappy(buf)
// totalSize += len(dbuf)
// b.ReportMetric(float64(totalSize)/float64(b.N), "decompressedSize/op")
// }
// }
// func BenchmarkDecompressWriteRequestSnappy(b *testing.B) {
// uncomp := makeUncompressedWriteRequestBenchData(b)
// buf, _ := compressSnappyAlt(uncomp, &[]byte{})
// b.ResetTimer()
// totalSize := 0
// for i := 0; i < b.N; i++ {
// dbuf, _ := decompressSnappyAlt(buf)
// totalSize += len(dbuf)
// b.ReportMetric(float64(totalSize)/float64(b.N), "decompressedSize/op")
// }
// }
// func BenchmarkDecompressWriteRequestS2(b *testing.B) {
// uncomp := makeUncompressedWriteRequestBenchData(b)
// buf, _ := compressSnappyAlt(uncomp, &[]byte{})
// b.ResetTimer()
// totalSize := 0
// for i := 0; i < b.N; i++ {
// dbuf, _ := decompressS2(buf)
// totalSize += len(dbuf)
// b.ReportMetric(float64(totalSize)/float64(b.N), "decompressedSize/op")
// }
// }