wip, rework compression testing after format PR was merged

Signed-off-by: Callum Styan <callumstyan@gmail.com>
This commit is contained in:
Callum Styan 2023-11-23 19:25:00 -08:00
parent b15d1c1d81
commit 934bb2dbc6
16 changed files with 474 additions and 459 deletions

View file

@ -155,7 +155,8 @@ type flagConfig struct {
enablePerStepStats bool
enableAutoGOMAXPROCS bool
// todo: how to use the enable feature flag properly + use the remote format enum type
rwFormat int
rwFormat int
rwCompression int
prometheusURL string
corsRegexString string
@ -428,6 +429,9 @@ func main() {
a.Flag("remote-write-format", "remote write proto format to use, valid options: 0 (1.0), 1 (reduced format), 3 (min64 format)").
Default("0").IntVar(&cfg.rwFormat)
a.Flag("remote-write-compression", "remote write compression to use, valid options: 0 (snappy), 1 (zstd), 3 (flate)").
Default("0").IntVar(&cfg.rwCompression)
promlogflag.AddFlags(a, &cfg.promlogConfig)
a.Flag("write-documentation", "Generate command line documentation. Internal use.").Hidden().Action(func(ctx *kingpin.ParseContext) error {
@ -600,7 +604,7 @@ func main() {
var (
localStorage = &readyStorage{stats: tsdb.NewDBStats()}
scraper = &readyScrapeManager{}
remoteStorage = remote.NewStorage(log.With(logger, "component", "remote"), prometheus.DefaultRegisterer, localStorage.StartTime, localStoragePath, time.Duration(cfg.RemoteFlushDeadline), scraper, remote.RemoteWriteFormat(cfg.rwFormat))
remoteStorage = remote.NewStorage(log.With(logger, "component", "remote"), prometheus.DefaultRegisterer, localStorage.StartTime, localStoragePath, time.Duration(cfg.RemoteFlushDeadline), scraper, remote.RemoteWriteFormat(cfg.rwFormat), remote.RemoteWriteCompression(cfg.rwCompression))
fanoutStorage = storage.NewFanout(logger, localStorage, remoteStorage)
)

View file

@ -861,14 +861,14 @@ func metricTypeToMetricTypeProto(t textparse.MetricType) prompb.MetricMetadata_M
// DecodeWriteRequest from an io.Reader into a prompb.WriteRequest, handling
// snappy decompression.
func DecodeWriteRequest(r io.Reader) (*prompb.WriteRequest, error) {
func DecodeWriteRequest(r io.Reader, c *Compressor) (*prompb.WriteRequest, error) {
compressed, err := io.ReadAll(r)
if err != nil {
return nil, err
}
comp := GetPooledComp()
defer PutPooledComp(comp)
comp := c.GetPooledCompressor()
defer c.PutPooledCompressor(comp)
reqBuf, err := comp.Decompress(compressed)
if err != nil {
@ -938,45 +938,45 @@ func DecodeOTLPWriteRequest(r *http.Request) (pmetricotlp.ExportRequest, error)
// DecodeMinimizedWriteRequest from an io.Reader into a prompb.WriteRequest, handling
// snappy decompression.
func DecodeMinimizedWriteRequest(r io.Reader) (*prompb.MinimizedWriteRequest, error) {
compressed, err := io.ReadAll(r)
if err != nil {
return nil, err
}
comp := GetPooledComp()
defer PutPooledComp(comp)
reqBuf, err := comp.Decompress(compressed)
if err != nil {
return nil, err
}
var req prompb.MinimizedWriteRequest
if err := proto.Unmarshal(reqBuf, &req); err != nil {
return nil, err
}
return &req, nil
}
func DecodeMinimizedWriteRequestLen(r io.Reader) (*prompb.MinimizedWriteRequestLen, error) {
compressed, err := io.ReadAll(r)
if err != nil {
return nil, err
}
reqBuf, err := snappy.Decode(nil, compressed)
if err != nil {
return nil, err
}
var req prompb.MinimizedWriteRequestLen
if err := proto.Unmarshal(reqBuf, &req); err != nil {
return nil, err
}
return &req, nil
}
//func DecodeMinimizedWriteRequest(r io.Reader) (*prompb.MinimizedWriteRequest, error) {
// compressed, err := io.ReadAll(r)
// if err != nil {
// return nil, err
// }
//
// comp := c.GetPooledCompressor()
// defer c.PutPooledCompressor(comp)
// reqBuf, err := comp.Decompress(compressed)
// if err != nil {
// return nil, err
// }
//
// var req prompb.MinimizedWriteRequest
// if err := proto.Unmarshal(reqBuf, &req); err != nil {
// return nil, err
// }
//
// return &req, nil
//}
//
//func DecodeMinimizedWriteRequestLen(r io.Reader) (*prompb.MinimizedWriteRequestLen, error) {
// compressed, err := io.ReadAll(r)
// if err != nil {
// return nil, err
// }
//
// reqBuf, err := snappy.Decode(nil, compressed)
// if err != nil {
// return nil, err
// }
//
// var req prompb.MinimizedWriteRequestLen
// if err := proto.Unmarshal(reqBuf, &req); err != nil {
// return nil, err
// }
//
// return &req, nil
//}
func MinimizedWriteRequestToWriteRequest(redReq *prompb.MinimizedWriteRequest) (*prompb.WriteRequest, error) {
req := &prompb.WriteRequest{

View file

@ -550,23 +550,25 @@ func TestMetricTypeToMetricTypeProto(t *testing.T) {
}
func TestDecodeWriteRequest(t *testing.T) {
buf, _, err := buildWriteRequest(writeRequestFixture.Timeseries, nil, nil, nil)
c := NewCompressor(Snappy)
buf, _, err := buildWriteRequest(writeRequestFixture.Timeseries, nil, nil, &c)
require.NoError(t, err)
actual, err := DecodeWriteRequest(bytes.NewReader(buf))
actual, err := DecodeWriteRequest(bytes.NewReader(buf), &c)
require.NoError(t, err)
require.Equal(t, writeRequestFixture, actual)
}
func TestDecodeMinWriteRequest(t *testing.T) {
buf, _, err := buildMinimizedWriteRequest(writeRequestMinimizedFixture.Timeseries, writeRequestMinimizedFixture.Symbols, nil, nil)
require.NoError(t, err)
actual, err := DecodeMinimizedWriteRequest(bytes.NewReader(buf))
require.NoError(t, err)
require.Equal(t, writeRequestMinimizedFixture, actual)
}
//func TestDecodeMinWriteRequest(t *testing.T) {
// c := NewCompressor(Snappy)
// buf, _, err := buildMinimizedWriteRequest(writeRequestMinimizedFixture.Timeseries, writeRequestMinimizedFixture.Symbols, nil, nil)
//
// require.NoError(t, err)
//
// actual, err := DecodeMinimizedWriteRequest(bytes.NewReader(buf))
// require.NoError(t, err)
// require.Equal(t, writeRequestMinimizedFixture, actual)
//}
func TestNilHistogramProto(t *testing.T) {
// This function will panic if it improperly handles nil

View file

@ -3,6 +3,7 @@ package remote
import (
"bytes"
"compress/lzw"
"fmt"
"io"
"sync"
@ -24,67 +25,87 @@ type Compression interface {
// hacky globals to easily tweak the compression algorithm and run some benchmarks
type CompAlgorithm int
var UseAlgorithm = Snappy
//var UseAlgorithm = Snappy
const (
Snappy CompAlgorithm = iota
SnappyAlt
S2
ZstdFast
ZstdDefault
ZstdBestComp
Lzw
FlateFast
FlateDefault
FlateComp
BrotliFast
BrotliComp
BrotliDefault
)
//const (
// Snappy CompAlgorithm = iota
// SnappyAlt
// S2
// ZstdFast
// ZstdDefault
// ZstdBestComp
// Lzw
// FlateFast
// FlateDefault
// FlateComp
// BrotliFast
// BrotliComp
// BrotliDefault
//)
// sync.Pool-ed createComp
var compPool = sync.Pool{
// New optionally specifies a function to generate
// a value when Get would otherwise return nil.
New: func() interface{} { return createComp() },
//var compPool = sync.Pool{
// // New optionally specifies a function to generate
// // a value when Get would otherwise return nil.
// New: func() interface{} { return createComp() },
//}
type Compressor struct {
Compression
pool sync.Pool
}
func GetPooledComp() Compression {
return compPool.Get().(Compression)
func NewCompressor(compType RemoteWriteCompression) Compressor {
var c Compressor
c.Compression = createComp(compType)
c.pool = sync.Pool{
// New optionally specifies a function to generate
// a value when Get would otherwise return nil.
New: func() interface{} { return createComp(compType) },
}
return c
}
func PutPooledComp(c Compression) {
compPool.Put(c)
func (comp Compressor) GetPooledCompressor() Compression {
return comp.pool.Get().(Compression)
}
var createComp func() Compression = func() Compression {
switch UseAlgorithm {
func (comp Compressor) PutPooledCompressor(c Compression) {
comp.pool.Put(c)
}
func createComp(comp RemoteWriteCompression) Compression {
switch comp {
case Snappy:
return &snappyCompression{}
case SnappyAlt:
return &snappyAltCompression{}
case S2:
return &s2Compression{}
case ZstdDefault:
case Zstd:
return &zstdCompression{level: zstd.SpeedDefault}
case ZstdFast:
return &zstdCompression{level: zstd.SpeedFastest}
case ZstdBestComp:
return &zstdCompression{level: zstd.SpeedBestCompression}
case Lzw:
return &lzwCompression{}
case FlateFast:
return &flateCompression{level: flate.BestSpeed}
case FlateComp:
return &flateCompression{level: flate.BestCompression}
case FlateDefault:
case Flate:
return &flateCompression{level: flate.DefaultCompression}
case BrotliFast:
return &brotliCompression{quality: brotli.BestSpeed}
case BrotliDefault:
return &brotliCompression{quality: brotli.DefaultCompression}
case BrotliComp:
return &brotliCompression{quality: brotli.BestCompression}
//case SnappyAlt:
// return &snappyAltCompression{}
//case S2:
// return &s2Compression{}
//case ZstdDefault:
// return &zstdCompression{level: zstd.SpeedDefault}
//case ZstdFast:
// return &zstdCompression{level: zstd.SpeedFastest}
//case ZstdBestComp:
// return &zstdCompression{level: zstd.SpeedBestCompression}
//case Lzw:
// return &lzwCompression{}
//case FlateFast:
// return &flateCompression{level: flate.BestSpeed}
//case FlateComp:
// return &flateCompression{level: flate.BestCompression}
//case FlateDefault:
// return &flateCompression{level: flate.DefaultCompression}
//case BrotliFast:
// return &brotliCompression{quality: brotli.BestSpeed}
//case BrotliDefault:
// return &brotliCompression{quality: brotli.DefaultCompression}
//case BrotliComp:
// return &brotliCompression{quality: brotli.BestCompression}
default:
panic("unknown compression algorithm")
}
@ -110,10 +131,17 @@ func (s *snappyCompression) Compress(data []byte) ([]byte, error) {
if n := snappy.MaxEncodedLen(len(data)); n > cap(s.buf) {
s.buf = make([]byte, n)
}
uncompressed, err := snappy.Decode(nil, compressed)
if err != nil {
fmt.Println("error uncompressing immediately after compressing")
}
fmt.Println("len uncompressed: ", len(uncompressed))
fmt.Println("returning snappy compressed data")
return compressed, nil
}
func (s *snappyCompression) Decompress(data []byte) ([]byte, error) {
s.buf = s.buf[0:cap(s.buf)]
fmt.Println("len data: ", len(data))
uncompressed, err := snappy.Decode(s.buf, data)
if len(uncompressed) > cap(s.buf) {
s.buf = uncompressed

View file

@ -1,35 +1,40 @@
package remote
import (
"github.com/stretchr/testify/require"
"os"
"testing"
"time"
)
func TestCompressions(t *testing.T) {
data := makeUncompressedReducedWriteRequestBenchData(t)
data, err := makeUncompressedWriteRequestBenchData()
require.NoError(t, err)
tc := []struct {
name string
algo CompAlgorithm
algo RemoteWriteCompression
}{
{"Snappy", Snappy},
{"SnappyAlt", SnappyAlt},
{"S2", S2},
{"ZstdFast", ZstdFast},
{"ZstdDefault", ZstdDefault},
{"ZstdBestComp", ZstdBestComp},
{"Lzw", Lzw},
{"FlateFast", FlateFast},
{"FlateComp", FlateComp},
{"BrotliFast", BrotliFast},
{"BrotliComp", BrotliComp},
{"BrotliDefault", BrotliDefault},
{"Zstd", Zstd},
{"Flate", Flate},
//{"SnappyAlt", SnappyAlt},
//{"S2", S2},
//{"ZstdFast", ZstdFast},
//{"ZstdDefault", ZstdDefault},
//{"ZstdBestComp", ZstdBestComp},
//{"Lzw", Lzw},
//{"FlateFast", FlateFast},
//{"FlateComp", FlateComp},
//{"BrotliFast", BrotliFast},
//{"BrotliComp", BrotliComp},
//{"BrotliDefault", BrotliDefault},
}
for _, c := range tc {
t.Run(c.name, func(t *testing.T) {
UseAlgorithm = c.algo
comp := createComp()
//UseAlgorithm = c.algo
//comp := createComp()
comp := NewCompressor(c.algo)
compressed, err := comp.Compress(data)
if err != nil {
t.Fatal(err)
@ -49,61 +54,64 @@ func TestCompressions(t *testing.T) {
func BenchmarkCompressions_V1(b *testing.B) {
// Synthetic data, attempts to be representative
data := makeUncompressedWriteRequestBenchData(b)
data, err := makeUncompressedWriteRequestBenchData()
require.NoError(b, err)
benchmarkCompressionsForData(b, [][]byte{data})
}
func BenchmarkCompressions_V11(b *testing.B) {
// Synthetic data, attempts to be representative
data := makeUncompressedWriteRequestBenchData(b)
benchmarkCompressionsForData(b, [][]byte{data})
}
//func BenchmarkCompressions_V11(b *testing.B) {
// // Synthetic data, attempts to be representative
// data, err := makeUncompressedWriteRequestBenchData()
// require.NoError(b, err)
// benchmarkCompressionsForData(b, [][]byte{data})
//}
// Needs the dataset to be present in /home/nicolas/rw11data/v11_raw/
func BenchmarkCompressions_V11_FileDataSet(b *testing.B) {
datas := readAllFiles("/home/nicolas/rw11data/v11_raw/")
if len(datas) != 10 {
b.Fatal("unexpected number of files")
}
benchmarkCompressionsForData(b, datas)
}
//func BenchmarkCompressions_V11_FileDataSet(b *testing.B) {
// datas := readAllFiles("/home/nicolas/rw11data/v11_raw/")
// if len(datas) != 10 {
// b.Fatal("unexpected number of files")
// }
// benchmarkCompressionsForData(b, datas)
//}
// Needs the dataset to be present in /home/nicolas/rw11data/v1_raw/
func BenchmarkCompressions_V1_FileDataSet(b *testing.B) {
datas := readAllFiles("/home/nicolas/rw11data/v1_raw/")
if len(datas) != 10 {
b.Fatal("unexpected number of files")
}
benchmarkCompressionsForData(b, datas)
}
//func BenchmarkCompressions_V1_FileDataSet(b *testing.B) {
// datas := readAllFiles("/home/nicolas/rw11data/v1_raw/")
// if len(datas) != 10 {
// b.Fatal("unexpected number of files")
// }
// benchmarkCompressionsForData(b, datas)
//}
func benchmarkCompressionsForData(b *testing.B, datas [][]byte) {
bc := []struct {
name string
algo CompAlgorithm
algo RemoteWriteCompression
}{
{"Snappy", Snappy},
{"SnappyAlt", SnappyAlt},
{"S2", S2},
{"ZstdFast", ZstdFast},
{"ZstdDefault", ZstdDefault},
{"ZstdBestComp", ZstdBestComp},
{"Lzw", Lzw},
{"FlateFast", FlateFast},
{"FlateDefault", FlateDefault},
{"FlateComp", FlateComp},
{"BrotliFast", BrotliFast},
{"BrotliDefault", BrotliDefault},
// {"BrotliComp", BrotliComp},
{"Zstd", Zstd},
{"Flate", Flate},
//{"SnappyAlt", SnappyAlt},
//{"S2", S2},
//{"ZstdFast", ZstdFast},
//{"ZstdDefault", ZstdDefault},
//{"ZstdBestComp", ZstdBestComp},
//{"Lzw", Lzw},
//{"FlateFast", FlateFast},
//{"FlateComp", FlateComp},
//{"BrotliFast", BrotliFast},
//{"BrotliComp", BrotliComp},
//{"BrotliDefault", BrotliDefault},
}
comps := make(map[CompAlgorithm]Compression)
decomps := make(map[CompAlgorithm]Compression)
comps := make(map[RemoteWriteCompression]*Compressor)
decomps := make(map[RemoteWriteCompression]*Compressor)
for _, c := range bc {
UseAlgorithm = c.algo
comp := createComp()
decomp := createComp()
comps[c.algo] = comp
decomps[c.algo] = decomp
comp := NewCompressor(c.algo)
decomp := NewCompressor(c.algo)
comps[c.algo] = &comp
decomps[c.algo] = &decomp
// warmup
for i := 0; i < 2; i++ {
for _, data := range datas {

View file

@ -397,6 +397,14 @@ const (
MinLen // symbols are now just offsets, and we encode lengths as varints in the large symbols string (which is also now a byte slice)
)
type RemoteWriteCompression int64
const (
Snappy RemoteWriteCompression = iota
Zstd
Flate
)
// QueueManager manages a queue of samples to be sent to the Storage
// indicated by the provided WriteClient. Implements writeTo interface
// used by WAL Watcher.
@ -426,6 +434,7 @@ type QueueManager struct {
seriesSegmentMtx sync.Mutex // Covers seriesSegmentIndexes - if you also lock seriesMtx, take seriesMtx first.
seriesSegmentIndexes map[chunks.HeadSeriesRef]int
compressor Compressor
shards *shards
numShards int
reshardChan chan int
@ -463,6 +472,7 @@ func NewQueueManager(
enableExemplarRemoteWrite bool,
enableNativeHistogramRemoteWrite bool,
rwFormat RemoteWriteFormat,
rwComp RemoteWriteCompression,
) *QueueManager {
if logger == nil {
logger = log.NewNopLogger()
@ -487,7 +497,8 @@ func NewQueueManager(
sendNativeHistograms: enableNativeHistogramRemoteWrite,
// TODO: we should eventually set the format via content negotiation,
// so this field would be the desired format, maybe with a fallback?
rwFormat: rwFormat,
rwFormat: rwFormat,
compressor: NewCompressor(rwComp),
seriesLabels: make(map[chunks.HeadSeriesRef]labels.Labels),
seriesSegmentIndexes: make(map[chunks.HeadSeriesRef]int),
@ -545,8 +556,7 @@ func (t *QueueManager) AppendMetadata(ctx context.Context, metadata []scrape.Met
func (t *QueueManager) sendMetadataWithBackoff(ctx context.Context, metadata []prompb.MetricMetadata, pBuf *proto.Buffer) error {
// Build the WriteRequest with no samples.
comp := createComp()
req, _, err := buildWriteRequest(nil, metadata, pBuf, comp)
req, _, err := buildWriteRequest(nil, metadata, pBuf, t.compressor.GetPooledCompressor())
if err != nil {
return err
}
@ -1368,7 +1378,7 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue *queue) {
pBuf = proto.NewBuffer(nil)
pBufRaw []byte
buf []byte
comp = createComp()
comp = s.qm.compressor.GetPooledCompressor()
)
if s.qm.sendExemplars {
max += int(float64(max) * 0.1)
@ -1432,16 +1442,16 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue *queue) {
case Base1:
nPendingSamples, nPendingExemplars, nPendingHistograms := populateTimeSeries(batch, pendingData, s.qm.sendExemplars, s.qm.sendNativeHistograms)
n := nPendingSamples + nPendingExemplars + nPendingHistograms
s.sendSamples(ctx, pendingData[:n], nPendingSamples, nPendingExemplars, nPendingHistograms, pBuf, &buf)
s.sendSamples(ctx, pendingData[:n], nPendingSamples, nPendingExemplars, nPendingHistograms, pBuf, comp)
case Min32Optimized:
nPendingSamples, nPendingExemplars, nPendingHistograms := populateMinimizedTimeSeries(&symbolTable, batch, pendingMinimizedData, s.qm.sendExemplars, s.qm.sendNativeHistograms)
n := nPendingSamples + nPendingExemplars + nPendingHistograms
s.sendMinSamples(ctx, pendingMinimizedData[:n], symbolTable.LabelsString(), nPendingSamples, nPendingExemplars, nPendingHistograms, &pBufRaw, &buf)
s.sendMinSamples(ctx, pendingMinimizedData[:n], symbolTable.LabelsString(), nPendingSamples, nPendingExemplars, nPendingHistograms, &pBufRaw, &buf, comp)
symbolTable.clear()
case MinLen:
nPendingSamples, nPendingExemplars, nPendingHistograms := populateMinimizedTimeSeriesLen(&symbolTable, batch, pendingMinLenData, s.qm.sendExemplars, s.qm.sendNativeHistograms)
n := nPendingSamples + nPendingExemplars + nPendingHistograms
s.sendMinLenSamples(ctx, pendingMinLenData[:n], symbolTable.LabelsData(), nPendingSamples, nPendingExemplars, nPendingHistograms, pBuf, &buf)
s.sendMinLenSamples(ctx, pendingMinLenData[:n], symbolTable.LabelsData(), nPendingSamples, nPendingExemplars, nPendingHistograms, pBuf, comp)
symbolTable.clear()
}
queue.ReturnForReuse(batch)
@ -1464,14 +1474,14 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue *queue) {
n := nPendingSamples + nPendingExemplars + nPendingHistograms
level.Debug(s.qm.logger).Log("msg", "runShard timer ticked, sending buffered data", "samples", nPendingSamples,
"exemplars", nPendingExemplars, "shard", shardNum, "histograms", nPendingHistograms)
s.sendMinSamples(ctx, pendingMinimizedData[:n], symbolTable.LabelsString(), nPendingSamples, nPendingExemplars, nPendingHistograms, &pBufRaw, &buf)
s.sendMinSamples(ctx, pendingMinimizedData[:n], symbolTable.LabelsString(), nPendingSamples, nPendingExemplars, nPendingHistograms, &pBufRaw, &buf, comp)
symbolTable.clear()
case MinLen:
nPendingSamples, nPendingExemplars, nPendingHistograms := populateMinimizedTimeSeriesLen(&symbolTable, batch, pendingMinLenData, s.qm.sendExemplars, s.qm.sendNativeHistograms)
n := nPendingSamples + nPendingExemplars + nPendingHistograms
level.Debug(s.qm.logger).Log("msg", "runShard timer ticked, sending buffered data", "samples", nPendingSamples,
"exemplars", nPendingExemplars, "shard", shardNum, "histograms", nPendingHistograms)
s.sendMinLenSamples(ctx, pendingMinLenData[:n], symbolTable.LabelsData(), nPendingSamples, nPendingExemplars, nPendingHistograms, pBuf, &buf)
s.sendMinLenSamples(ctx, pendingMinLenData[:n], symbolTable.LabelsData(), nPendingSamples, nPendingExemplars, nPendingHistograms, pBuf, comp)
symbolTable.clear()
}
}
@ -1532,24 +1542,24 @@ func (s *shards) sendSamples(ctx context.Context, samples []prompb.TimeSeries, s
s.updateMetrics(ctx, err, sampleCount, exemplarCount, histogramCount, time.Since(begin))
}
func (s *shards) sendMinSamples(ctx context.Context, samples []prompb.MinimizedTimeSeries, labels string, sampleCount, exemplarCount, histogramCount int, pBuf, buf *[]byte) {
func (s *shards) sendMinSamples(ctx context.Context, samples []prompb.MinimizedTimeSeries, labels string, sampleCount, exemplarCount, histogramCount int, pBuf, buf *[]byte, comp Compression) {
begin := time.Now()
// Build the ReducedWriteRequest with no metadata.
// Failing to build the write request is non-recoverable, since it will
// only error if marshaling the proto to bytes fails.
req, highest, err := buildMinimizedWriteRequest(samples, labels, pBuf, buf)
req, highest, err := buildMinimizedWriteRequest(samples, labels, pBuf, buf, comp)
if err == nil {
err = s.sendSamplesWithBackoff(ctx, req, sampleCount, exemplarCount, histogramCount, highest)
}
s.updateMetrics(ctx, err, sampleCount, exemplarCount, histogramCount, time.Since(begin))
}
func (s *shards) sendMinLenSamples(ctx context.Context, samples []prompb.MinimizedTimeSeriesLen, labels []byte, sampleCount, exemplarCount, histogramCount int, pBuf *proto.Buffer, buf *[]byte) {
func (s *shards) sendMinLenSamples(ctx context.Context, samples []prompb.MinimizedTimeSeriesLen, labels []byte, sampleCount, exemplarCount, histogramCount int, pBuf *proto.Buffer, comp Compression) {
begin := time.Now()
// Build the ReducedWriteRequest with no metadata.
// Failing to build the write request is non-recoverable, since it will
// only error if marshaling the proto to bytes fails.
req, highest, err := buildMinimizedWriteRequestLen(samples, labels, pBuf, buf)
req, highest, err := buildMinimizedWriteRequestLen(samples, labels, pBuf, comp)
if err == nil {
err = s.sendSamplesWithBackoff(ctx, req, sampleCount, exemplarCount, histogramCount, highest)
}
@ -1772,11 +1782,7 @@ func buildWriteRequest(samples []prompb.TimeSeries, metadata []prompb.MetricMeta
return buildWriteRequestWithCompression(samples, metadata, pBuf, comp)
}
func buildWriteRequestWithCompression(samples []prompb.TimeSeries,
metadata []prompb.MetricMetadata,
pBuf *proto.Buffer,
compressor Compression,
) ([]byte, int64, error) {
func buildWriteRequestWithCompression(samples []prompb.TimeSeries, metadata []prompb.MetricMetadata, pBuf *proto.Buffer, compressor Compression) ([]byte, int64, error) {
var highest int64
for _, ts := range samples {
// At the moment we only ever append a TimeSeries with a single sample or exemplar in it.
@ -1806,18 +1812,7 @@ func buildWriteRequestWithCompression(samples []prompb.TimeSeries,
return nil, highest, err
}
// snappy uses len() to see if it needs to allocate a new slice. Make the
// buffer as long as possible.
if buf != nil {
*buf = (*buf)[0:cap(*buf)]
} else {
buf = &[]byte{}
}
compressed := snappy.Encode(*buf, pBuf.Bytes())
if n := snappy.MaxEncodedLen(len(pBuf.Bytes())); n > len(*buf) {
// grow the buffer for the next time
*buf = make([]byte, n)
}
compressed, err := compressor.Compress(pBuf.Bytes())
return compressed, highest, err
}
@ -1826,48 +1821,6 @@ type offLenPair struct {
Len uint32
}
func buildReducedWriteRequest(samples []prompb.ReducedTimeSeries, labels map[uint64]string, pBuf *proto.Buffer, comp Compression) ([]byte, int64, error) {
return buildReducedWriteRequestWithCompression(samples, labels, pBuf, comp)
}
func buildReducedWriteRequestWithCompression(samples []prompb.ReducedTimeSeries,
labels map[uint64]string,
pBuf *proto.Buffer,
compress Compression,
) ([]byte, int64, error) {
var highest int64
for _, ts := range samples {
// At the moment we only ever append a TimeSeries with a single sample or exemplar in it.
if len(ts.Samples) > 0 && ts.Samples[0].Timestamp > highest {
highest = ts.Samples[0].Timestamp
}
if len(ts.Exemplars) > 0 && ts.Exemplars[0].Timestamp > highest {
highest = ts.Exemplars[0].Timestamp
}
if len(ts.Histograms) > 0 && ts.Histograms[0].Timestamp > highest {
highest = ts.Histograms[0].Timestamp
}
}
req := &prompb.WriteRequestWithRefs{
StringSymbolTable: labels,
Timeseries: samples,
}
if pBuf == nil {
pBuf = proto.NewBuffer(nil) // For convenience in tests. Not efficient.
} else {
pBuf.Reset()
}
err := pBuf.Marshal(req)
if err != nil {
return nil, 0, err
}
compressed, err := compress.Compress(pBuf.Bytes())
return compressed, highest, err
}
type rwSymbolTable struct {
symbols []byte
symbolsMap map[string]offLenPair
@ -1926,7 +1879,7 @@ func (r *rwSymbolTable) clear() {
r.symbols = r.symbols[:0]
}
func buildMinimizedWriteRequest(samples []prompb.MinimizedTimeSeries, labels string, pBuf, buf *[]byte) ([]byte, int64, error) {
func buildMinimizedWriteRequest(samples []prompb.MinimizedTimeSeries, labels string, pBuf, buf *[]byte, comp Compression) ([]byte, int64, error) {
var highest int64
for _, ts := range samples {
// At the moment we only ever append a TimeSeries with a single sample or exemplar in it.
@ -1955,23 +1908,11 @@ func buildMinimizedWriteRequest(samples []prompb.MinimizedTimeSeries, labels str
}
*pBuf = data
// snappy uses len() to see if it needs to allocate a new slice. Make the
// buffer as long as possible.
if buf != nil {
*buf = (*buf)[0:cap(*buf)]
} else {
buf = &[]byte{}
}
compressed := snappy.Encode(*buf, data)
if n := snappy.MaxEncodedLen(len(data)); n > len(*buf) {
// grow the buffer for the next time
*buf = make([]byte, n)
}
compressed, err := comp.Compress(*pBuf)
return compressed, highest, nil
}
func buildMinimizedWriteRequestLen(samples []prompb.MinimizedTimeSeriesLen, labels []byte, pBuf *proto.Buffer, buf *[]byte) ([]byte, int64, error) {
func buildMinimizedWriteRequestLen(samples []prompb.MinimizedTimeSeriesLen, labels []byte, pBuf *proto.Buffer, comp Compression) ([]byte, int64, error) {
var highest int64
for _, ts := range samples {
// At the moment we only ever append a TimeSeries with a single sample or exemplar in it.
@ -2001,18 +1942,6 @@ func buildMinimizedWriteRequestLen(samples []prompb.MinimizedTimeSeriesLen, labe
return nil, 0, err
}
// snappy uses len() to see if it needs to allocate a new slice. Make the
// buffer as long as possible.
if buf != nil {
*buf = (*buf)[0:cap(*buf)]
} else {
buf = &[]byte{}
}
compressed := snappy.Encode(*buf, pBuf.Bytes())
if n := snappy.MaxEncodedLen(len(pBuf.Bytes())); n > len(*buf) {
// grow the buffer for the next time
*buf = make([]byte, n)
}
compressed, err := comp.Compress(pBuf.Bytes())
return compressed, highest, nil
}

View file

@ -15,7 +15,6 @@ package remote
import (
"bytes"
"compress/flate"
"context"
"fmt"
"math"
@ -32,7 +31,6 @@ import (
"github.com/go-kit/log"
"github.com/gogo/protobuf/proto"
"github.com/golang/snappy"
"github.com/klauspost/compress/zstd"
"github.com/prometheus/client_golang/prometheus"
client_testutil "github.com/prometheus/client_golang/prometheus/testutil"
"github.com/prometheus/common/model"
@ -71,13 +69,14 @@ func TestSampleDelivery(t *testing.T) {
histograms bool
floatHistograms bool
rwFormat RemoteWriteFormat
rwComp RemoteWriteCompression
}{
{samples: true, exemplars: false, histograms: false, floatHistograms: false, name: "samples only"},
{samples: true, exemplars: true, histograms: true, floatHistograms: true, name: "samples, exemplars, and histograms"},
{samples: false, exemplars: true, histograms: false, floatHistograms: false, name: "exemplars only"},
{samples: false, exemplars: false, histograms: true, floatHistograms: false, name: "histograms only"},
{samples: false, exemplars: false, histograms: false, floatHistograms: true, name: "float histograms only"},
//{samples: true, exemplars: false, histograms: false, floatHistograms: false, name: "samples only"},
//{samples: true, exemplars: true, histograms: true, floatHistograms: true, name: "samples, exemplars, and histograms"},
//{samples: false, exemplars: true, histograms: false, floatHistograms: false, name: "exemplars only"},
//{samples: false, exemplars: false, histograms: true, floatHistograms: false, name: "histograms only"},
//{samples: false, exemplars: false, histograms: false, floatHistograms: true, name: "float histograms only"},
//
{rwFormat: Min32Optimized, samples: true, exemplars: false, histograms: false, name: "interned samples only"},
{rwFormat: Min32Optimized, samples: true, exemplars: true, histograms: true, floatHistograms: true, name: "interned samples, exemplars, and histograms"},
{rwFormat: Min32Optimized, samples: false, exemplars: true, histograms: false, name: "interned exemplars only"},
@ -109,7 +108,7 @@ func TestSampleDelivery(t *testing.T) {
for _, tc := range testcases {
t.Run(tc.name, func(t *testing.T) {
dir := t.TempDir()
s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil, tc.rwFormat)
s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil, tc.rwFormat, tc.rwComp)
defer s.Close()
var (
@ -148,6 +147,7 @@ func TestSampleDelivery(t *testing.T) {
qm.StoreSeries(series, 0)
// Send first half of data.
fmt.Println("Waiting for first half of data")
c.expectSamples(samples[:len(samples)/2], series)
c.expectExemplars(exemplars[:len(exemplars)/2], series)
c.expectHistograms(histograms[:len(histograms)/2], series)
@ -157,6 +157,8 @@ func TestSampleDelivery(t *testing.T) {
qm.AppendHistograms(histograms[:len(histograms)/2])
qm.AppendFloatHistograms(floatHistograms[:len(floatHistograms)/2])
c.waitForExpectedData(t)
fmt.Println("got all of first half of data")
fmt.Println("Waiting for second half of data")
// Send second half of data.
c.expectSamples(samples[len(samples)/2:], series)
@ -168,6 +170,7 @@ func TestSampleDelivery(t *testing.T) {
qm.AppendHistograms(histograms[len(histograms)/2:])
qm.AppendFloatHistograms(floatHistograms[len(floatHistograms)/2:])
c.waitForExpectedData(t)
fmt.Println("got all of second half of data")
})
}
}
@ -181,7 +184,25 @@ func TestMetadataDelivery(t *testing.T) {
mcfg := config.DefaultMetadataConfig
metrics := newQueueManagerMetrics(nil, "", "")
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, 0)
m := NewQueueManager(metrics,
nil,
nil,
nil,
dir,
newEWMARate(ewmaWeight, shardUpdateDuration),
cfg,
mcfg,
labels.EmptyLabels(),
nil,
c,
defaultFlushDeadline,
newPool(),
newHighestTimestampMetric(),
nil,
false,
false,
0,
0)
m.Start()
defer m.Stop()
@ -222,7 +243,7 @@ func TestSampleDeliveryTimeout(t *testing.T) {
dir := t.TempDir()
metrics := newQueueManagerMetrics(nil, "", "")
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, rwFormat)
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, rwFormat, 0)
m.StoreSeries(series, 0)
m.Start()
defer m.Stop()
@ -268,7 +289,7 @@ func TestSampleDeliveryOrder(t *testing.T) {
mcfg := config.DefaultMetadataConfig
metrics := newQueueManagerMetrics(nil, "", "")
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, rwFormat)
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, rwFormat, 0)
m.StoreSeries(series, 0)
m.Start()
@ -290,7 +311,7 @@ func TestShutdown(t *testing.T) {
mcfg := config.DefaultMetadataConfig
metrics := newQueueManagerMetrics(nil, "", "")
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, deadline, newPool(), newHighestTimestampMetric(), nil, false, false, Base1)
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, deadline, newPool(), newHighestTimestampMetric(), nil, false, false, Base1, Snappy)
n := 2 * config.DefaultQueueConfig.MaxSamplesPerSend
samples, series := createTimeseries(n, n)
m.StoreSeries(series, 0)
@ -328,7 +349,7 @@ func TestSeriesReset(t *testing.T) {
cfg := config.DefaultQueueConfig
mcfg := config.DefaultMetadataConfig
metrics := newQueueManagerMetrics(nil, "", "")
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, deadline, newPool(), newHighestTimestampMetric(), nil, false, false, Base1)
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, deadline, newPool(), newHighestTimestampMetric(), nil, false, false, Base1, Snappy)
for i := 0; i < numSegments; i++ {
series := []record.RefSeries{}
for j := 0; j < numSeries; j++ {
@ -341,6 +362,7 @@ func TestSeriesReset(t *testing.T) {
require.Equal(t, numSegments*numSeries/2, len(m.seriesLabels))
}
// doesn't end
func TestReshard(t *testing.T) {
for _, rwFormat := range []RemoteWriteFormat{Base1, Min32Optimized} {
t.Run(fmt.Sprint(rwFormat), func(t *testing.T) {
@ -359,7 +381,7 @@ func TestReshard(t *testing.T) {
dir := t.TempDir()
metrics := newQueueManagerMetrics(nil, "", "")
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, rwFormat)
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, rwFormat, Snappy)
m.StoreSeries(series, 0)
m.Start()
@ -399,7 +421,7 @@ func TestReshardRaceWithStop(t *testing.T) {
go func() {
for {
metrics := newQueueManagerMetrics(nil, "", "")
m = NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, rwFormat)
m = NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, rwFormat, Snappy)
m.Start()
h.Unlock()
h.Lock()
@ -438,7 +460,7 @@ func TestReshardPartialBatch(t *testing.T) {
cfg.BatchSendDeadline = model.Duration(batchSendDeadline)
metrics := newQueueManagerMetrics(nil, "", "")
m := NewQueueManager(metrics, nil, nil, nil, t.TempDir(), newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, flushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, rwFormat)
m := NewQueueManager(metrics, nil, nil, nil, t.TempDir(), newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, flushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, rwFormat, Snappy)
m.StoreSeries(series, 0)
m.Start()
@ -487,7 +509,7 @@ func TestQueueFilledDeadlock(t *testing.T) {
metrics := newQueueManagerMetrics(nil, "", "")
m := NewQueueManager(metrics, nil, nil, nil, t.TempDir(), newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, flushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, rwFormat)
m := NewQueueManager(metrics, nil, nil, nil, t.TempDir(), newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, flushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, rwFormat, Snappy)
m.StoreSeries(series, 0)
m.Start()
defer m.Stop()
@ -518,7 +540,7 @@ func TestReleaseNoninternedString(t *testing.T) {
mcfg := config.DefaultMetadataConfig
metrics := newQueueManagerMetrics(nil, "", "")
c := NewTestWriteClient(rwFormat)
m := NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, rwFormat)
m := NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, rwFormat, Snappy)
m.Start()
defer m.Stop()
@ -568,7 +590,7 @@ func TestShouldReshard(t *testing.T) {
metrics := newQueueManagerMetrics(nil, "", "")
// todo: test with new proto type(s)
client := NewTestWriteClient(Base1)
m := NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, client, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, Base1)
m := NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, client, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, Base1, Snappy)
m.numShards = c.startingShards
m.dataIn.incr(c.samplesIn)
m.dataOut.incr(c.samplesOut)
@ -966,7 +988,7 @@ func BenchmarkSampleSend(b *testing.B) {
metrics := newQueueManagerMetrics(nil, "", "")
// todo: test with new proto type(s)
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, Base1)
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, Base1, Snappy)
m.StoreSeries(series, 0)
// These should be received by the client.
@ -1013,7 +1035,7 @@ func BenchmarkStartup(b *testing.B) {
// todo: test with new proto type(s)
m := NewQueueManager(metrics, nil, nil, logger, dir,
newEWMARate(ewmaWeight, shardUpdateDuration),
cfg, mcfg, labels.EmptyLabels(), nil, c, 1*time.Minute, newPool(), newHighestTimestampMetric(), nil, false, false, Base1)
cfg, mcfg, labels.EmptyLabels(), nil, c, 1*time.Minute, newPool(), newHighestTimestampMetric(), nil, false, false, Base1, Snappy)
m.watcher.SetStartTime(timestamp.Time(math.MaxInt64))
m.watcher.MaxSegment = segments[len(segments)-2]
err := m.watcher.Run()
@ -1097,7 +1119,7 @@ func TestCalculateDesiredShards(t *testing.T) {
metrics := newQueueManagerMetrics(nil, "", "")
samplesIn := newEWMARate(ewmaWeight, shardUpdateDuration)
// todo: test with new proto type(s)
m := NewQueueManager(metrics, nil, nil, nil, dir, samplesIn, cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, Base1)
m := NewQueueManager(metrics, nil, nil, nil, dir, samplesIn, cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, Base1, Snappy)
// Need to start the queue manager so the proper metrics are initialized.
// However we can stop it right away since we don't need to do any actual
@ -1175,7 +1197,7 @@ func TestCalculateDesiredShardsDetail(t *testing.T) {
metrics := newQueueManagerMetrics(nil, "", "")
samplesIn := newEWMARate(ewmaWeight, shardUpdateDuration)
// todo: test with new proto type(s)
m := NewQueueManager(metrics, nil, nil, nil, dir, samplesIn, cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, Base1)
m := NewQueueManager(metrics, nil, nil, nil, dir, samplesIn, cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, Base1, Snappy)
for _, tc := range []struct {
name string
@ -1495,67 +1517,67 @@ func BenchmarkBuildWriteRequest(b *testing.B) {
})
}
func BenchmarkBuildMinimizedWriteRequest(b *testing.B) {
//func BenchmarkBuildMinimizedWriteRequest(b *testing.B) {
//
// type testcase struct {
// batch []timeSeries
// }
// testCases := []testcase{
// {createDummyTimeSeries(2)},
// {createDummyTimeSeries(10)},
// {createDummyTimeSeries(100)},
// }
// for _, tc := range testCases {
// symbolTable := newRwSymbolTable()
// seriesBuff := make([]prompb.MinimizedTimeSeries, len(tc.batch))
// for i := range seriesBuff {
// seriesBuff[i].Samples = []prompb.Sample{{}}
// seriesBuff[i].Exemplars = []prompb.Exemplar{{}}
// }
// pBuf := []byte{}
//
// // Warmup buffers
// for i := 0; i < 10; i++ {
// populateMinimizedTimeSeries(&symbolTable, tc.batch, seriesBuff, true, true)
// buildMinimizedWriteRequest(seriesBuff, symbolTable.LabelsString(), &pBuf, &buff)
// }
//
// b.Run(fmt.Sprintf("%d-instances", len(tc.batch)), func(b *testing.B) {
// totalSize := 0
// for j := 0; j < b.N; j++ {
// populateMinimizedTimeSeries(&symbolTable, tc.batch, seriesBuff, true, true)
// b.ResetTimer()
// req, _, err := buildMinimizedWriteRequest(seriesBuff, symbolTable.LabelsString(), &pBuf, &buff)
// if err != nil {
// b.Fatal(err)
// }
// symbolTable.clear()
// totalSize += len(req)
// b.ReportMetric(float64(totalSize)/float64(b.N), "compressedSize/op")
// }
// })
// }
//}
type testcase struct {
batch []timeSeries
}
testCases := []testcase{
{createDummyTimeSeries(2)},
{createDummyTimeSeries(10)},
{createDummyTimeSeries(100)},
}
for _, tc := range testCases {
symbolTable := newRwSymbolTable()
seriesBuff := make([]prompb.MinimizedTimeSeries, len(tc.batch))
for i := range seriesBuff {
seriesBuff[i].Samples = []prompb.Sample{{}}
seriesBuff[i].Exemplars = []prompb.Exemplar{{}}
}
pBuf := []byte{}
//func makeUncompressedReducedWriteRequestBenchData(b testing.TB) []byte {
// data := createDummyTimeSeries(1000)
// pool := newLookupPool()
// pBuf := proto.NewBuffer(nil)
// seriesBuff := make([]prompb.ReducedTimeSeries, len(data))
// for i := range seriesBuff {
// seriesBuff[i].Samples = []prompb.Sample{{}}
// seriesBuff[i].Exemplars = []prompb.ExemplarRef{{}}
// }
//
// populateReducedTimeSeries(pool, data, seriesBuff, true, true)
// res, _, err := buildReducedWriteRequestWithCompression(seriesBuff, pool.getTable(), pBuf, &noopCompression{})
// if err != nil {
// b.Fatal(err)
// }
// return res
//}
// Warmup buffers
for i := 0; i < 10; i++ {
populateMinimizedTimeSeries(&symbolTable, tc.batch, seriesBuff, true, true)
buildMinimizedWriteRequest(seriesBuff, symbolTable.LabelsString(), &pBuf, &buff)
}
b.Run(fmt.Sprintf("%d-instances", len(tc.batch)), func(b *testing.B) {
totalSize := 0
for j := 0; j < b.N; j++ {
populateMinimizedTimeSeries(&symbolTable, tc.batch, seriesBuff, true, true)
b.ResetTimer()
req, _, err := buildMinimizedWriteRequest(seriesBuff, symbolTable.LabelsString(), &pBuf, &buff)
if err != nil {
b.Fatal(err)
}
symbolTable.clear()
totalSize += len(req)
b.ReportMetric(float64(totalSize)/float64(b.N), "compressedSize/op")
}
})
}
}
func makeUncompressedReducedWriteRequestBenchData(b testing.TB) []byte {
data := createDummyTimeSeries(1000)
pool := newLookupPool()
pBuf := proto.NewBuffer(nil)
seriesBuff := make([]prompb.ReducedTimeSeries, len(data))
for i := range seriesBuff {
seriesBuff[i].Samples = []prompb.Sample{{}}
seriesBuff[i].Exemplars = []prompb.ExemplarRef{{}}
}
populateReducedTimeSeries(pool, data, seriesBuff, true, true)
res, _, err := buildReducedWriteRequestWithCompression(seriesBuff, pool.getTable(), pBuf, &noopCompression{})
if err != nil {
b.Fatal(err)
}
return res
}
func makeUncompressedWriteRequestBenchData(b *testing.B) []byte {
func makeUncompressedWriteRequestBenchData() ([]byte, error) {
data := createDummyTimeSeries(1000)
seriesBuff := make([]prompb.TimeSeries, len(data))
for i := range seriesBuff {
@ -1567,34 +1589,37 @@ func makeUncompressedWriteRequestBenchData(b *testing.B) []byte {
populateTimeSeries(data, seriesBuff, true, true)
res, _, err := buildWriteRequestWithCompression(seriesBuff, nil, pBuf, &noopCompression{})
if err != nil {
b.Fatal(err)
return nil, err
}
return res
return res, nil
}
func BenchmarkCompressWriteRequest(b *testing.B) {
uncompV1 := makeUncompressedWriteRequestBenchData(b)
uncompV11 := makeUncompressedReducedWriteRequestBenchData(b)
uncompV1, err := makeUncompressedWriteRequestBenchData()
require.NoError(b, err)
//uncompV11 := makeUncompressedReducedWriteRequestBenchData(b)
// buf := make([]byte, 0)
bench := func(b *testing.B, data []byte, comp Compression) {
bench := func(b *testing.B, name string, data []byte, comp Compression) {
b.ResetTimer()
totalSize := 0
var res []byte
var err error
for i := 0; i < b.N; i++ {
fmt.Println("data len: ", len(data))
res, err = comp.Compress(data)
if err != nil {
b.Fatal(err)
}
totalSize += len(res)
fmt.Println("compressed len:", len(res))
b.ReportMetric(float64(totalSize)/float64(b.N), "compressedSize/op")
}
b.StopTimer()
// sanity check
res, err = comp.Decompress(res)
if err != nil {
b.Fatal(err)
b.Fatal(err, fmt.Sprint("compression: ", name))
}
if !bytes.Equal(res, data) {
b.Fatalf("decompressed data doesn't match original")
@ -1604,43 +1629,44 @@ func BenchmarkCompressWriteRequest(b *testing.B) {
cases := []struct {
name string
data []byte
comp Compression
comp Compressor
}{
{"v1-go-snappy", uncompV1, &snappyCompression{}},
{"v1-snappy", uncompV1, &snappyAltCompression{}},
{"v1-s2", uncompV1, &s2Compression{}},
{"v1-ZstdFastest", uncompV1, &zstdCompression{level: zstd.SpeedFastest}},
{"v1-ZstdSpeedDef", uncompV1, &zstdCompression{level: zstd.SpeedDefault}},
{"v1-ZstdBestComp", uncompV1, &zstdCompression{level: zstd.SpeedBestCompression}},
{"v1-Lzw", uncompV1, &lzwCompression{}},
{"v1-FlateBestComp", uncompV1, &flateCompression{level: flate.BestCompression}},
{"v1-FlateBestSpeed", uncompV1, &flateCompression{level: flate.BestSpeed}},
{"v1-Brotli-1", uncompV1, &brotliCompression{quality: 1}},
{"v1-Brotli-11", uncompV1, &brotliCompression{quality: 1}},
{"v1-Brotli-5", uncompV1, &brotliCompression{quality: 5}},
//{"v1-go-snappy", uncompV1, &snappyCompression{}},
//{"v1-snappy", uncompV1, &snappyAltCompression{}},
{"v1-s2", uncompV1, NewCompressor(Snappy)},
//{"v1-ZstdFastest", uncompV1, &zstdCompression{level: zstd.SpeedFastest}},
//{"v1-ZstdSpeedDef", uncompV1, &zstdCompression{level: zstd.SpeedDefault}},
//{"v1-ZstdBestComp", uncompV1, &zstdCompression{level: zstd.SpeedBestCompression}},
//{"v1-Lzw", uncompV1, &lzwCompression{}},
//{"v1-FlateBestComp", uncompV1, &flateCompression{level: flate.BestCompression}},
//{"v1-FlateBestSpeed", uncompV1, &flateCompression{level: flate.BestSpeed}},
//{"v1-Brotli-1", uncompV1, &brotliCompression{quality: 1}},
//{"v1-Brotli-11", uncompV1, &brotliCompression{quality: 1}},
//{"v1-Brotli-5", uncompV1, &brotliCompression{quality: 5}},
{"v1.1-go-snappy", uncompV11, &snappyCompression{}},
{"v1.1-snappy", uncompV11, &snappyAltCompression{}},
{"v1.1-s2", uncompV11, &s2Compression{}},
{"v1.1-ZstdFastest", uncompV11, &zstdCompression{level: zstd.SpeedFastest}},
{"v1.1-ZstdSpeedDef", uncompV11, &zstdCompression{level: zstd.SpeedDefault}},
{"v1.1-ZstdBestComp", uncompV11, &zstdCompression{level: zstd.SpeedBestCompression}},
{"v1.1-Lzw", uncompV11, &lzwCompression{}},
{"v1.1-FlateBestComp", uncompV11, &flateCompression{level: flate.BestCompression}},
{"v1.1-FlateBestSpeed", uncompV11, &flateCompression{level: flate.BestSpeed}},
{"v1.1-Brotli-1", uncompV11, &brotliCompression{quality: 1}},
{"v1.1-Brotli-11", uncompV11, &brotliCompression{quality: 1}},
{"v1.1-Brotli-5", uncompV11, &brotliCompression{quality: 5}},
//{"v1.1-go-snappy", uncompV11, &snappyCompression{}},
//{"v1.1-snappy", uncompV11, &snappyAltCompression{}},
//{"v1.1-s2", uncompV11, &s2Compression{}},
//{"v1.1-ZstdFastest", uncompV11, &zstdCompression{level: zstd.SpeedFastest}},
//{"v1.1-ZstdSpeedDef", uncompV11, &zstdCompression{level: zstd.SpeedDefault}},
//{"v1.1-ZstdBestComp", uncompV11, &zstdCompression{level: zstd.SpeedBestCompression}},
//{"v1.1-Lzw", uncompV11, &lzwCompression{}},
//{"v1.1-FlateBestComp", uncompV11, &flateCompression{level: flate.BestCompression}},
//{"v1.1-FlateBestSpeed", uncompV11, &flateCompression{level: flate.BestSpeed}},
//{"v1.1-Brotli-1", uncompV11, &brotliCompression{quality: 1}},
//{"v1.1-Brotli-11", uncompV11, &brotliCompression{quality: 1}},
//{"v1.1-Brotli-5", uncompV11, &brotliCompression{quality: 5}},
}
// Warmup buffers
for _, c := range cases {
bench(b, c.data, c.comp)
bench(b, c.name, c.data, c.comp)
}
fmt.Println("done warm up")
for _, c := range cases {
b.Run(c.name, func(b *testing.B) {
bench(b, c.data, c.comp)
bench(b, c.name, c.data, c.comp)
})
}
}

View file

@ -92,7 +92,7 @@ func TestNoDuplicateReadConfigs(t *testing.T) {
for _, tc := range cases {
t.Run("", func(t *testing.T) {
// todo: test with new format type(s)?
s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil, Base1)
s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil, Base1, Snappy)
conf := &config.Config{
GlobalConfig: config.DefaultGlobalConfig,
RemoteReadConfigs: tc.cfgs,

View file

@ -62,7 +62,7 @@ type Storage struct {
}
// NewStorage returns a remote.Storage.
func NewStorage(l log.Logger, reg prometheus.Registerer, stCallback startTimeCallback, walDir string, flushDeadline time.Duration, sm ReadyScrapeManager, rwFormat RemoteWriteFormat) *Storage {
func NewStorage(l log.Logger, reg prometheus.Registerer, stCallback startTimeCallback, walDir string, flushDeadline time.Duration, sm ReadyScrapeManager, rwFormat RemoteWriteFormat, rwCompression RemoteWriteCompression) *Storage {
if l == nil {
l = log.NewNopLogger()
}
@ -72,7 +72,7 @@ func NewStorage(l log.Logger, reg prometheus.Registerer, stCallback startTimeCal
logger: logger,
localStartTimeCallback: stCallback,
}
s.rws = NewWriteStorage(s.logger, reg, walDir, flushDeadline, sm, rwFormat)
s.rws = NewWriteStorage(s.logger, reg, walDir, flushDeadline, sm, rwFormat, rwCompression)
return s
}

View file

@ -28,7 +28,7 @@ func TestStorageLifecycle(t *testing.T) {
dir := t.TempDir()
// todo: test with new format type(s)
s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil, Base1)
s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil, Base1, Snappy)
conf := &config.Config{
GlobalConfig: config.DefaultGlobalConfig,
RemoteWriteConfigs: []*config.RemoteWriteConfig{
@ -56,7 +56,7 @@ func TestUpdateRemoteReadConfigs(t *testing.T) {
dir := t.TempDir()
// todo: test with new format type(s)
s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil, Base1)
s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil, Base1, Snappy)
conf := &config.Config{
GlobalConfig: config.GlobalConfig{},
@ -78,7 +78,7 @@ func TestFilterExternalLabels(t *testing.T) {
dir := t.TempDir()
// todo: test with new format type(s)
s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil, Base1)
s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil, Base1, Snappy)
conf := &config.Config{
GlobalConfig: config.GlobalConfig{
@ -104,7 +104,7 @@ func TestIgnoreExternalLabels(t *testing.T) {
dir := t.TempDir()
// todo: test with new format type(s)
s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil, Base1)
s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil, Base1, Snappy)
conf := &config.Config{
GlobalConfig: config.GlobalConfig{

View file

@ -66,6 +66,7 @@ type WriteStorage struct {
dir string
queues map[string]*QueueManager
rwFormat RemoteWriteFormat
rwComp RemoteWriteCompression
samplesIn *ewmaRate
flushDeadline time.Duration
interner *pool
@ -77,13 +78,14 @@ type WriteStorage struct {
}
// NewWriteStorage creates and runs a WriteStorage.
func NewWriteStorage(logger log.Logger, reg prometheus.Registerer, dir string, flushDeadline time.Duration, sm ReadyScrapeManager, rwFormat RemoteWriteFormat) *WriteStorage {
func NewWriteStorage(logger log.Logger, reg prometheus.Registerer, dir string, flushDeadline time.Duration, sm ReadyScrapeManager, rwFormat RemoteWriteFormat, rwCompression RemoteWriteCompression) *WriteStorage {
if logger == nil {
logger = log.NewNopLogger()
}
rws := &WriteStorage{
queues: make(map[string]*QueueManager),
rwFormat: rwFormat,
rwComp: rwCompression,
watcherMetrics: wlog.NewWatcherMetrics(reg),
liveReaderMetrics: wlog.NewLiveReaderMetrics(reg),
logger: logger,
@ -201,6 +203,7 @@ func (rws *WriteStorage) ApplyConfig(conf *config.Config) error {
rwConf.SendExemplars,
rwConf.SendNativeHistograms,
rws.rwFormat,
rws.rwComp,
)
// Keep track of which queues are new so we know which to start.
newHashes = append(newHashes, hash)

View file

@ -48,15 +48,18 @@ type writeHandler struct {
// The handler will accept the new format, but it can still accept the old one
// TODO: this should eventually be via content negotiation
rwFormat RemoteWriteFormat
//rwComp RemoteWriteCompression
comp Compressor
}
// NewWriteHandler creates a http.Handler that accepts remote write requests and
// writes them to the provided appendable.
func NewWriteHandler(logger log.Logger, reg prometheus.Registerer, appendable storage.Appendable, rwFormat RemoteWriteFormat) http.Handler {
func NewWriteHandler(logger log.Logger, reg prometheus.Registerer, appendable storage.Appendable, rwFormat RemoteWriteFormat, rwComp RemoteWriteCompression) http.Handler {
h := &writeHandler{
logger: logger,
appendable: appendable,
rwFormat: rwFormat,
comp: NewCompressor(rwComp),
samplesWithInvalidLabelsTotal: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: "prometheus",
Subsystem: "api",
@ -79,11 +82,11 @@ func (h *writeHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// TODO: this should eventually be done via content negotiation/looking at the header
switch h.rwFormat {
case Base1:
req, err = DecodeWriteRequest(r.Body)
case Min32Optimized:
reqMin, err = DecodeMinimizedWriteRequest(r.Body)
case MinLen:
reqMinLen, err = DecodeMinimizedWriteRequestLen(r.Body)
req, err = DecodeWriteRequest(r.Body, &h.comp)
//case Min32Optimized:
// reqMin, err = DecodeMinimizedWriteRequest(r.Body, &h.comp)
//case MinLen:
// reqMinLen, err = DecodeMinimizedWriteRequestLen(r.Body)
}
if err != nil {

View file

@ -38,7 +38,8 @@ import (
)
func TestRemoteWriteHandler(t *testing.T) {
buf, _, err := buildWriteRequest(writeRequestFixture.Timeseries, nil, nil, nil)
c := NewCompressor(Snappy)
buf, _, err := buildWriteRequest(writeRequestFixture.Timeseries, nil, nil, &c)
require.NoError(t, err)
req, err := http.NewRequest("", "", bytes.NewReader(buf))
@ -46,7 +47,7 @@ func TestRemoteWriteHandler(t *testing.T) {
appendable := &mockAppendable{}
// TODO: test with other proto format(s)
handler := NewWriteHandler(log.NewNopLogger(), nil, appendable, Base1)
handler := NewWriteHandler(log.NewNopLogger(), nil, appendable, Base1, Snappy)
recorder := httptest.NewRecorder()
handler.ServeHTTP(recorder, req)
@ -84,61 +85,62 @@ func TestRemoteWriteHandler(t *testing.T) {
}
}
func TestRemoteWriteHandlerMinimizedFormat(t *testing.T) {
buf, _, err := buildMinimizedWriteRequest(writeRequestMinimizedFixture.Timeseries, writeRequestMinimizedFixture.Symbols, nil, nil)
require.NoError(t, err)
req, err := http.NewRequest("", "", bytes.NewReader(buf))
req.Header.Set(RemoteWriteVersionHeader, RemoteWriteVersion11HeaderValue)
require.NoError(t, err)
appendable := &mockAppendable{}
// TODO: test with other proto format(s)
handler := NewWriteHandler(nil, nil, appendable, Min32Optimized)
recorder := httptest.NewRecorder()
handler.ServeHTTP(recorder, req)
resp := recorder.Result()
require.Equal(t, http.StatusNoContent, resp.StatusCode)
i := 0
j := 0
k := 0
// the reduced write request is equivalent to the write request fixture.
// we can use it for
for _, ts := range writeRequestFixture.Timeseries {
ls := labelProtosToLabels(ts.Labels)
for _, s := range ts.Samples {
require.Equal(t, mockSample{ls, s.Timestamp, s.Value}, appendable.samples[i])
i++
}
for _, e := range ts.Exemplars {
exemplarLabels := labelProtosToLabels(e.Labels)
require.Equal(t, mockExemplar{ls, exemplarLabels, e.Timestamp, e.Value}, appendable.exemplars[j])
j++
}
for _, hp := range ts.Histograms {
if hp.IsFloatHistogram() {
fh := FloatHistogramProtoToFloatHistogram(hp)
require.Equal(t, mockHistogram{ls, hp.Timestamp, nil, fh}, appendable.histograms[k])
} else {
h := HistogramProtoToHistogram(hp)
require.Equal(t, mockHistogram{ls, hp.Timestamp, h, nil}, appendable.histograms[k])
}
k++
}
}
}
//func TestRemoteWriteHandlerMinimizedFormat(t *testing.T) {
// buf, _, err := buildMinimizedWriteRequest(writeRequestMinimizedFixture.Timeseries, writeRequestMinimizedFixture.Symbols, nil, nil)
// require.NoError(t, err)
//
// req, err := http.NewRequest("", "", bytes.NewReader(buf))
// req.Header.Set(RemoteWriteVersionHeader, RemoteWriteVersion11HeaderValue)
// require.NoError(t, err)
//
// appendable := &mockAppendable{}
// // TODO: test with other proto format(s)
// handler := NewWriteHandler(nil, nil, appendable, Min32Optimized)
//
// recorder := httptest.NewRecorder()
// handler.ServeHTTP(recorder, req)
//
// resp := recorder.Result()
// require.Equal(t, http.StatusNoContent, resp.StatusCode)
//
// i := 0
// j := 0
// k := 0
// // the reduced write request is equivalent to the write request fixture.
// // we can use it for
// for _, ts := range writeRequestFixture.Timeseries {
// ls := labelProtosToLabels(ts.Labels)
// for _, s := range ts.Samples {
// require.Equal(t, mockSample{ls, s.Timestamp, s.Value}, appendable.samples[i])
// i++
// }
//
// for _, e := range ts.Exemplars {
// exemplarLabels := labelProtosToLabels(e.Labels)
// require.Equal(t, mockExemplar{ls, exemplarLabels, e.Timestamp, e.Value}, appendable.exemplars[j])
// j++
// }
//
// for _, hp := range ts.Histograms {
// if hp.IsFloatHistogram() {
// fh := FloatHistogramProtoToFloatHistogram(hp)
// require.Equal(t, mockHistogram{ls, hp.Timestamp, nil, fh}, appendable.histograms[k])
// } else {
// h := HistogramProtoToHistogram(hp)
// require.Equal(t, mockHistogram{ls, hp.Timestamp, h, nil}, appendable.histograms[k])
// }
//
// k++
// }
// }
//}
func TestOutOfOrderSample(t *testing.T) {
c := NewCompressor(Snappy)
buf, _, err := buildWriteRequest([]prompb.TimeSeries{{
Labels: []prompb.Label{{Name: "__name__", Value: "test_metric"}},
Samples: []prompb.Sample{{Value: 1, Timestamp: 0}},
}}, nil, nil, nil)
}}, nil, nil, &c)
require.NoError(t, err)
req, err := http.NewRequest("", "", bytes.NewReader(buf))
@ -148,7 +150,7 @@ func TestOutOfOrderSample(t *testing.T) {
latestSample: 100,
}
// TODO: test with other proto format(s)
handler := NewWriteHandler(log.NewNopLogger(), nil, appendable, Base1)
handler := NewWriteHandler(log.NewNopLogger(), nil, appendable, Base1, Snappy)
recorder := httptest.NewRecorder()
handler.ServeHTTP(recorder, req)
@ -161,10 +163,11 @@ func TestOutOfOrderSample(t *testing.T) {
// don't fail on ingestion errors since the exemplar storage is
// still experimental.
func TestOutOfOrderExemplar(t *testing.T) {
c := NewCompressor(Snappy)
buf, _, err := buildWriteRequest([]prompb.TimeSeries{{
Labels: []prompb.Label{{Name: "__name__", Value: "test_metric"}},
Exemplars: []prompb.Exemplar{{Labels: []prompb.Label{{Name: "foo", Value: "bar"}}, Value: 1, Timestamp: 0}},
}}, nil, nil, nil)
}}, nil, nil, &c)
require.NoError(t, err)
req, err := http.NewRequest("", "", bytes.NewReader(buf))
@ -174,7 +177,7 @@ func TestOutOfOrderExemplar(t *testing.T) {
latestExemplar: 100,
}
// TODO: test with other proto format(s)
handler := NewWriteHandler(log.NewNopLogger(), nil, appendable, Base1)
handler := NewWriteHandler(log.NewNopLogger(), nil, appendable, Base1, Snappy)
recorder := httptest.NewRecorder()
handler.ServeHTTP(recorder, req)
@ -185,10 +188,11 @@ func TestOutOfOrderExemplar(t *testing.T) {
}
func TestOutOfOrderHistogram(t *testing.T) {
c := NewCompressor(Snappy)
buf, _, err := buildWriteRequest([]prompb.TimeSeries{{
Labels: []prompb.Label{{Name: "__name__", Value: "test_metric"}},
Histograms: []prompb.Histogram{HistogramToHistogramProto(0, &testHistogram), FloatHistogramToHistogramProto(1, testHistogram.ToFloat())},
}}, nil, nil, nil)
}}, nil, nil, &c)
require.NoError(t, err)
req, err := http.NewRequest("", "", bytes.NewReader(buf))
@ -198,7 +202,7 @@ func TestOutOfOrderHistogram(t *testing.T) {
latestHistogram: 100,
}
// TODO: test with other proto format(s)
handler := NewWriteHandler(log.NewNopLogger(), nil, appendable, Base1)
handler := NewWriteHandler(log.NewNopLogger(), nil, appendable, Base1, Snappy)
recorder := httptest.NewRecorder()
handler.ServeHTTP(recorder, req)
@ -210,6 +214,8 @@ func TestOutOfOrderHistogram(t *testing.T) {
func BenchmarkRemoteWritehandler(b *testing.B) {
const labelValue = "abcdefg'hijlmn234!@#$%^&*()_+~`\"{}[],./<>?hello0123hiOlá你好Dzieńdobry9Zd8ra765v4stvuyte"
var reqs []*http.Request
c := NewCompressor(Snappy)
for i := 0; i < b.N; i++ {
num := strings.Repeat(strconv.Itoa(i), 16)
buf, _, err := buildWriteRequest([]prompb.TimeSeries{{
@ -218,7 +224,7 @@ func BenchmarkRemoteWritehandler(b *testing.B) {
{Name: "test_label_name_" + num, Value: labelValue + num},
},
Histograms: []prompb.Histogram{HistogramToHistogramProto(0, &testHistogram)},
}}, nil, nil, nil)
}}, nil, nil, &c)
require.NoError(b, err)
req, err := http.NewRequest("", "", bytes.NewReader(buf))
require.NoError(b, err)
@ -227,7 +233,7 @@ func BenchmarkRemoteWritehandler(b *testing.B) {
appendable := &mockAppendable{}
// TODO: test with other proto format(s)
handler := NewWriteHandler(log.NewNopLogger(), nil, appendable, Base1)
handler := NewWriteHandler(log.NewNopLogger(), nil, appendable, Base1, Snappy)
recorder := httptest.NewRecorder()
b.ResetTimer()
@ -237,7 +243,9 @@ func BenchmarkRemoteWritehandler(b *testing.B) {
}
func TestCommitErr(t *testing.T) {
buf, _, err := buildWriteRequest(writeRequestFixture.Timeseries, nil, nil, nil)
c := NewCompressor(Snappy)
buf, _, err := buildWriteRequest(writeRequestFixture.Timeseries, nil, nil, &c)
require.NoError(t, err)
req, err := http.NewRequest("", "", bytes.NewReader(buf))
@ -247,7 +255,7 @@ func TestCommitErr(t *testing.T) {
commitErr: fmt.Errorf("commit error"),
}
// TODO: test with other proto format(s)
handler := NewWriteHandler(log.NewNopLogger(), nil, appendable, Base1)
handler := NewWriteHandler(log.NewNopLogger(), nil, appendable, Base1, Snappy)
recorder := httptest.NewRecorder()
handler.ServeHTTP(recorder, req)
@ -273,9 +281,10 @@ func BenchmarkRemoteWriteOOOSamples(b *testing.B) {
require.NoError(b, db.Close())
})
// TODO: test with other proto format(s)
handler := NewWriteHandler(log.NewNopLogger(), nil, db.Head(), Base1)
handler := NewWriteHandler(log.NewNopLogger(), nil, db.Head(), Base1, Snappy)
buf, _, err := buildWriteRequest(genSeriesWithSample(1000, 200*time.Minute.Milliseconds()), nil, nil, nil)
c := NewCompressor(Snappy)
buf, _, err := buildWriteRequest(genSeriesWithSample(1000, 200*time.Minute.Milliseconds()), nil, nil, &c)
require.NoError(b, err)
req, err := http.NewRequest("", "", bytes.NewReader(buf))

View file

@ -118,7 +118,7 @@ func TestNoDuplicateWriteConfigs(t *testing.T) {
for _, tc := range cases {
// todo: test with new format type(s)
s := NewWriteStorage(nil, nil, dir, time.Millisecond, nil, Base1)
s := NewWriteStorage(nil, nil, dir, time.Millisecond, nil, Base1, Snappy)
conf := &config.Config{
GlobalConfig: config.DefaultGlobalConfig,
RemoteWriteConfigs: tc.cfgs,
@ -141,7 +141,7 @@ func TestRestartOnNameChange(t *testing.T) {
require.NoError(t, err)
// todo: test with new format type(s)
s := NewWriteStorage(nil, nil, dir, time.Millisecond, nil, Base1)
s := NewWriteStorage(nil, nil, dir, time.Millisecond, nil, Base1, Snappy)
conf := &config.Config{
GlobalConfig: config.DefaultGlobalConfig,
@ -167,7 +167,7 @@ func TestUpdateWithRegisterer(t *testing.T) {
dir := t.TempDir()
// todo: test with new format type(s)
s := NewWriteStorage(nil, prometheus.NewRegistry(), dir, time.Millisecond, nil, Base1)
s := NewWriteStorage(nil, prometheus.NewRegistry(), dir, time.Millisecond, nil, Base1, Snappy)
c1 := &config.RemoteWriteConfig{
Name: "named",
URL: &common_config.URL{
@ -208,7 +208,7 @@ func TestWriteStorageLifecycle(t *testing.T) {
dir := t.TempDir()
// todo: test with new format type(s)
s := NewWriteStorage(nil, nil, dir, defaultFlushDeadline, nil, Base1)
s := NewWriteStorage(nil, nil, dir, defaultFlushDeadline, nil, Base1, Snappy)
conf := &config.Config{
GlobalConfig: config.DefaultGlobalConfig,
RemoteWriteConfigs: []*config.RemoteWriteConfig{
@@ -226,7 +226,7 @@ func TestUpdateExternalLabels(t *testing.T) {
dir := t.TempDir()
// todo: test with new format type(s)
s := NewWriteStorage(nil, prometheus.NewRegistry(), dir, time.Second, nil, Base1)
s := NewWriteStorage(nil, prometheus.NewRegistry(), dir, time.Second, nil, Base1, Snappy)
externalLabels := labels.FromStrings("external", "true")
conf := &config.Config{
@@ -256,7 +256,7 @@ func TestWriteStorageApplyConfigsIdempotent(t *testing.T) {
dir := t.TempDir()
// todo: test with new format type(s)
s := NewWriteStorage(nil, nil, dir, defaultFlushDeadline, nil, Base1)
s := NewWriteStorage(nil, nil, dir, defaultFlushDeadline, nil, Base1, Snappy)
conf := &config.Config{
GlobalConfig: config.GlobalConfig{},
RemoteWriteConfigs: []*config.RemoteWriteConfig{
@@ -282,7 +282,7 @@ func TestWriteStorageApplyConfigsPartialUpdate(t *testing.T) {
dir := t.TempDir()
// todo: test with new format type(s)
s := NewWriteStorage(nil, nil, dir, defaultFlushDeadline, nil, Base1)
s := NewWriteStorage(nil, nil, dir, defaultFlushDeadline, nil, Base1, Snappy)
c0 := &config.RemoteWriteConfig{
RemoteTimeout: model.Duration(10 * time.Second),

View file

@@ -254,6 +254,7 @@ func NewAPI(
statsRenderer StatsRenderer,
rwEnabled bool,
rwFormat remote.RemoteWriteFormat,
rwComp remote.RemoteWriteCompression,
otlpEnabled bool,
) *API {
a := &API{
@@ -296,7 +297,7 @@
}
if rwEnabled {
a.remoteWriteHandler = remote.NewWriteHandler(logger, registerer, ap, rwFormat)
a.remoteWriteHandler = remote.NewWriteHandler(logger, registerer, ap, rwFormat, rwComp)
}
if otlpEnabled {
a.otlpWriteHandler = remote.NewOTLPWriteHandler(logger, ap)

View file

@@ -264,6 +264,7 @@ type Options struct {
IsAgent bool
AppName string
RemoteWriteFormat remote.RemoteWriteFormat
RemoteWriteCompression remote.RemoteWriteCompression
Gatherer prometheus.Gatherer
Registerer prometheus.Registerer
@@ -354,6 +355,7 @@ func New(logger log.Logger, o *Options) *Handler {
nil,
o.EnableRemoteWriteReceiver,
o.RemoteWriteFormat,
o.RemoteWriteCompression,
o.EnableOTLPWriteReceiver,
)