From 32d87282addfaa2af9f7a6b953d17f3da6101d66 Mon Sep 17 00:00:00 2001 From: Justin Lei Date: Tue, 11 Jul 2023 05:57:57 -0700 Subject: [PATCH] Add Zstandard compression option for wlog (#11666) Snappy remains as the default compression but there is now a flag to switch the compression algorithm. Signed-off-by: Justin Lei --- cmd/prometheus/main.go | 13 ++- go.mod | 1 + go.sum | 2 + tsdb/agent/db.go | 10 ++- tsdb/compact_test.go | 7 +- tsdb/db.go | 6 +- tsdb/db_test.go | 12 +-- tsdb/head_test.go | 164 +++++++++++++++++------------------ tsdb/head_wal.go | 2 +- tsdb/ooo_head_read_test.go | 5 +- tsdb/wal.go | 2 +- tsdb/wal_test.go | 4 +- tsdb/wlog/checkpoint.go | 2 +- tsdb/wlog/checkpoint_test.go | 6 +- tsdb/wlog/live_reader.go | 53 ++++++----- tsdb/wlog/reader.go | 36 +++++--- tsdb/wlog/reader_test.go | 14 +-- tsdb/wlog/watcher_test.go | 28 +++--- tsdb/wlog/wlog.go | 90 ++++++++++++++----- tsdb/wlog/wlog_test.go | 52 ++++++----- 20 files changed, 302 insertions(+), 207 deletions(-) diff --git a/cmd/prometheus/main.go b/cmd/prometheus/main.go index 3d723f152..debc0d3f9 100644 --- a/cmd/prometheus/main.go +++ b/cmd/prometheus/main.go @@ -71,6 +71,7 @@ import ( "github.com/prometheus/prometheus/tracing" "github.com/prometheus/prometheus/tsdb" "github.com/prometheus/prometheus/tsdb/agent" + "github.com/prometheus/prometheus/tsdb/wlog" "github.com/prometheus/prometheus/util/documentcli" "github.com/prometheus/prometheus/util/logging" prom_runtime "github.com/prometheus/prometheus/util/runtime" @@ -334,6 +335,9 @@ func main() { serverOnlyFlag(a, "storage.tsdb.wal-compression", "Compress the tsdb WAL."). Hidden().Default("true").BoolVar(&cfg.tsdb.WALCompression) + serverOnlyFlag(a, "storage.tsdb.wal-compression-type", "Compression algorithm for the tsdb WAL."). 
+ Hidden().Default(string(wlog.CompressionSnappy)).EnumVar(&cfg.tsdb.WALCompressionType, string(wlog.CompressionSnappy), string(wlog.CompressionZstd)) + serverOnlyFlag(a, "storage.tsdb.head-chunks-write-queue-size", "Size of the queue through which head chunks are written to the disk to be m-mapped, 0 disables the queue completely. Experimental."). Default("0").IntVar(&cfg.tsdb.HeadChunksWriteQueueSize) @@ -350,6 +354,9 @@ func main() { agentOnlyFlag(a, "storage.agent.wal-compression", "Compress the agent WAL."). Default("true").BoolVar(&cfg.agent.WALCompression) + agentOnlyFlag(a, "storage.agent.wal-compression-type", "Compression algorithm for the agent WAL."). + Hidden().Default(string(wlog.CompressionSnappy)).EnumVar(&cfg.agent.WALCompressionType, string(wlog.CompressionSnappy), string(wlog.CompressionZstd)) + agentOnlyFlag(a, "storage.agent.wal-truncate-frequency", "The frequency at which to truncate the WAL and remove old data."). Hidden().PlaceHolder("").SetValue(&cfg.agent.TruncateFrequency) @@ -1546,6 +1553,7 @@ type tsdbOptions struct { MaxBytes units.Base2Bytes NoLockfile bool WALCompression bool + WALCompressionType string HeadChunksWriteQueueSize int SamplesPerChunk int StripeSize int @@ -1566,7 +1574,7 @@ func (opts tsdbOptions) ToTSDBOptions() tsdb.Options { MaxBytes: int64(opts.MaxBytes), NoLockfile: opts.NoLockfile, AllowOverlappingCompaction: true, - WALCompression: opts.WALCompression, + WALCompression: wlog.ParseCompressionType(opts.WALCompression, opts.WALCompressionType), HeadChunksWriteQueueSize: opts.HeadChunksWriteQueueSize, SamplesPerChunk: opts.SamplesPerChunk, StripeSize: opts.StripeSize, @@ -1585,6 +1593,7 @@ func (opts tsdbOptions) ToTSDBOptions() tsdb.Options { type agentOptions struct { WALSegmentSize units.Base2Bytes WALCompression bool + WALCompressionType string StripeSize int TruncateFrequency model.Duration MinWALTime, MaxWALTime model.Duration @@ -1594,7 +1603,7 @@ type agentOptions struct { func (opts agentOptions) 
ToAgentOptions() agent.Options { return agent.Options{ WALSegmentSize: int(opts.WALSegmentSize), - WALCompression: opts.WALCompression, + WALCompression: wlog.ParseCompressionType(opts.WALCompression, opts.WALCompressionType), StripeSize: opts.StripeSize, TruncateFrequency: time.Duration(opts.TruncateFrequency), MinWALTime: durationToInt64Millis(time.Duration(opts.MinWALTime)), diff --git a/go.mod b/go.mod index 48db76291..4597ee420 100644 --- a/go.mod +++ b/go.mod @@ -34,6 +34,7 @@ require ( github.com/hetznercloud/hcloud-go v1.47.0 github.com/ionos-cloud/sdk-go/v6 v6.1.7 github.com/json-iterator/go v1.1.12 + github.com/klauspost/compress v1.15.12 github.com/kolo/xmlrpc v0.0.0-20220921171641-a4b6fa1dd06b github.com/linode/linodego v1.17.2 github.com/miekg/dns v1.1.54 diff --git a/go.sum b/go.sum index af10deb8b..57f9f82c6 100644 --- a/go.sum +++ b/go.sum @@ -507,6 +507,8 @@ github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvW github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/klauspost/compress v1.13.6/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk= +github.com/klauspost/compress v1.15.12 h1:YClS/PImqYbn+UILDnqxQCZ3RehC9N318SU3kElDUEM= +github.com/klauspost/compress v1.15.12/go.mod h1:QPwzmACJjUTFsnSHH934V6woptycfrDDJnH7hvFVbGM= github.com/kolo/xmlrpc v0.0.0-20220921171641-a4b6fa1dd06b h1:udzkj9S/zlT5X367kqJis0QP7YMxobob6zhzq6Yre00= github.com/kolo/xmlrpc v0.0.0-20220921171641-a4b6fa1dd06b/go.mod h1:pcaDhQK0/NJZEvtCO0qQPPropqV0sJOJ6YW7X+9kRwM= github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= diff --git a/tsdb/agent/db.go b/tsdb/agent/db.go index 13cad6bfc..d47095a23 100644 --- a/tsdb/agent/db.go +++ b/tsdb/agent/db.go @@ -65,8 +65,8 @@ type Options struct { // WALSegmentSize > 0, segment size is WALSegmentSize. 
WALSegmentSize int - // WALCompression will turn on Snappy compression for records on the WAL. - WALCompression bool + // WALCompression configures the compression type to use on records in the WAL. + WALCompression wlog.CompressionType // StripeSize is the size (power of 2) in entries of the series hash map. Reducing the size will save memory but impact performance. StripeSize int @@ -87,7 +87,7 @@ type Options struct { func DefaultOptions() *Options { return &Options{ WALSegmentSize: wlog.DefaultSegmentSize, - WALCompression: false, + WALCompression: wlog.CompressionNone, StripeSize: tsdb.DefaultStripeSize, TruncateFrequency: DefaultTruncateFrequency, MinWALTime: DefaultMinWALTime, @@ -318,6 +318,10 @@ func validateOptions(opts *Options) *Options { opts.WALSegmentSize = wlog.DefaultSegmentSize } + if opts.WALCompression == "" { + opts.WALCompression = wlog.CompressionNone + } + // Revert Stripesize to DefaultStripsize if Stripsize is either 0 or not a power of 2. if opts.StripeSize <= 0 || ((opts.StripeSize & (opts.StripeSize - 1)) != 0) { opts.StripeSize = tsdb.DefaultStripeSize diff --git a/tsdb/compact_test.go b/tsdb/compact_test.go index 27a5cdfa8..d20918268 100644 --- a/tsdb/compact_test.go +++ b/tsdb/compact_test.go @@ -39,6 +39,7 @@ import ( "github.com/prometheus/prometheus/tsdb/fileutil" "github.com/prometheus/prometheus/tsdb/tombstones" "github.com/prometheus/prometheus/tsdb/tsdbutil" + "github.com/prometheus/prometheus/tsdb/wlog" ) func TestSplitByRange(t *testing.T) { @@ -1306,7 +1307,7 @@ func TestDeleteCompactionBlockAfterFailedReload(t *testing.T) { func TestHeadCompactionWithHistograms(t *testing.T) { for _, floatTest := range []bool{true, false} { t.Run(fmt.Sprintf("float=%t", floatTest), func(t *testing.T) { - head, _ := newTestHead(t, DefaultBlockDuration, false, false) + head, _ := newTestHead(t, DefaultBlockDuration, wlog.CompressionNone, false) require.NoError(t, head.Init(0)) t.Cleanup(func() { require.NoError(t, head.Close()) @@ -1485,11 
+1486,11 @@ func TestSparseHistogramSpaceSavings(t *testing.T) { c.numBuckets, ), func(t *testing.T) { - oldHead, _ := newTestHead(t, DefaultBlockDuration, false, false) + oldHead, _ := newTestHead(t, DefaultBlockDuration, wlog.CompressionNone, false) t.Cleanup(func() { require.NoError(t, oldHead.Close()) }) - sparseHead, _ := newTestHead(t, DefaultBlockDuration, false, false) + sparseHead, _ := newTestHead(t, DefaultBlockDuration, wlog.CompressionNone, false) t.Cleanup(func() { require.NoError(t, sparseHead.Close()) }) diff --git a/tsdb/db.go b/tsdb/db.go index 62359a737..2ca6034a0 100644 --- a/tsdb/db.go +++ b/tsdb/db.go @@ -77,8 +77,8 @@ func DefaultOptions() *Options { MaxBlockDuration: DefaultBlockDuration, NoLockfile: false, AllowOverlappingCompaction: true, - WALCompression: false, SamplesPerChunk: DefaultSamplesPerChunk, + WALCompression: wlog.CompressionNone, StripeSize: DefaultStripeSize, HeadChunksWriteBufferSize: chunks.DefaultWriteBufferSize, IsolationDisabled: defaultIsolationDisabled, @@ -123,8 +123,8 @@ type Options struct { // For Prometheus, this will always be true. AllowOverlappingCompaction bool - // WALCompression will turn on Snappy compression for records on the WAL. - WALCompression bool + // WALCompression configures the compression type to use on records in the WAL. + WALCompression wlog.CompressionType // Maximum number of CPUs that can simultaneously processes WAL replay. // If it is <=0, then GOMAXPROCS is used. 
diff --git a/tsdb/db_test.go b/tsdb/db_test.go index 427a3b7af..c746d5018 100644 --- a/tsdb/db_test.go +++ b/tsdb/db_test.go @@ -1965,7 +1965,7 @@ func TestInitializeHeadTimestamp(t *testing.T) { dir := t.TempDir() require.NoError(t, os.MkdirAll(path.Join(dir, "wal"), 0o777)) - w, err := wlog.New(nil, nil, path.Join(dir, "wal"), false) + w, err := wlog.New(nil, nil, path.Join(dir, "wal"), wlog.CompressionNone) require.NoError(t, err) var enc record.Encoder @@ -2007,7 +2007,7 @@ func TestInitializeHeadTimestamp(t *testing.T) { createBlock(t, dir, genSeries(1, 1, 1000, 6000)) require.NoError(t, os.MkdirAll(path.Join(dir, "wal"), 0o777)) - w, err := wlog.New(nil, nil, path.Join(dir, "wal"), false) + w, err := wlog.New(nil, nil, path.Join(dir, "wal"), wlog.CompressionNone) require.NoError(t, err) var enc record.Encoder @@ -2408,7 +2408,7 @@ func TestDBReadOnly(t *testing.T) { } // Add head to test DBReadOnly WAL reading capabilities. - w, err := wlog.New(logger, nil, filepath.Join(dbDir, "wal"), true) + w, err := wlog.New(logger, nil, filepath.Join(dbDir, "wal"), wlog.CompressionSnappy) require.NoError(t, err) h := createHead(t, w, genSeries(1, 1, 16, 18), dbDir) require.NoError(t, h.Close()) @@ -2972,7 +2972,7 @@ func TestCompactHead(t *testing.T) { NoLockfile: true, MinBlockDuration: int64(time.Hour * 2 / time.Millisecond), MaxBlockDuration: int64(time.Hour * 2 / time.Millisecond), - WALCompression: true, + WALCompression: wlog.CompressionSnappy, } db, err := Open(dbDir, log.NewNopLogger(), prometheus.NewRegistry(), tsdbCfg, nil) @@ -3912,7 +3912,7 @@ func TestMetadataCheckpointingOnlyKeepsLatestEntry(t *testing.T) { ctx := context.Background() numSamples := 10000 - hb, w := newTestHead(t, int64(numSamples)*10, false, false) + hb, w := newTestHead(t, int64(numSamples)*10, wlog.CompressionNone, false) // Add some series so we can append metadata to them. 
app := hb.Appender(ctx) @@ -5099,7 +5099,7 @@ func TestWBLAndMmapReplay(t *testing.T) { resetMmapToOriginal() // We neet to reset because new duplicate chunks can be written above. // Removing m-map markers in WBL by rewriting it. - newWbl, err := wlog.New(log.NewNopLogger(), nil, filepath.Join(t.TempDir(), "new_wbl"), false) + newWbl, err := wlog.New(log.NewNopLogger(), nil, filepath.Join(t.TempDir(), "new_wbl"), wlog.CompressionNone) require.NoError(t, err) sr, err := wlog.NewSegmentsReader(originalWblDir) require.NoError(t, err) diff --git a/tsdb/head_test.go b/tsdb/head_test.go index 282810620..f9eb88ba1 100644 --- a/tsdb/head_test.go +++ b/tsdb/head_test.go @@ -52,7 +52,7 @@ import ( "github.com/prometheus/prometheus/tsdb/wlog" ) -func newTestHead(t testing.TB, chunkRange int64, compressWAL, oooEnabled bool) (*Head, *wlog.WL) { +func newTestHead(t testing.TB, chunkRange int64, compressWAL wlog.CompressionType, oooEnabled bool) (*Head, *wlog.WL) { dir := t.TempDir() wal, err := wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, compressWAL) require.NoError(t, err) @@ -79,7 +79,7 @@ func newTestHead(t testing.TB, chunkRange int64, compressWAL, oooEnabled bool) ( func BenchmarkCreateSeries(b *testing.B) { series := genSeries(b.N, 10, 0, 0) - h, _ := newTestHead(b, 10000, false, false) + h, _ := newTestHead(b, 10000, wlog.CompressionNone, false) b.Cleanup(func() { require.NoError(b, h.Close()) }) @@ -100,7 +100,7 @@ func BenchmarkHeadAppender_Append_Commit_ExistingSeries(b *testing.B) { b.Run(fmt.Sprintf("%d series", seriesCount), func(b *testing.B) { for _, samplesPerAppend := range []int64{1, 2, 5, 100} { b.Run(fmt.Sprintf("%d samples per append", samplesPerAppend), func(b *testing.B) { - h, _ := newTestHead(b, 10000, false, false) + h, _ := newTestHead(b, 10000, wlog.CompressionNone, false) b.Cleanup(func() { require.NoError(b, h.Close()) }) ts := int64(1000) @@ -245,7 +245,7 @@ func BenchmarkLoadWAL(b *testing.B) { func(b *testing.B) { dir := b.TempDir() 
- w, err := wlog.New(nil, nil, dir, false) + w, err := wlog.New(nil, nil, dir, wlog.CompressionNone) require.NoError(b, err) // Write series. @@ -337,7 +337,7 @@ func BenchmarkLoadWAL(b *testing.B) { // While appending the samples to the head it concurrently queries them from multiple go routines and verifies that the // returned results are correct. func TestHead_HighConcurrencyReadAndWrite(t *testing.T) { - head, _ := newTestHead(t, DefaultBlockDuration, false, false) + head, _ := newTestHead(t, DefaultBlockDuration, wlog.CompressionNone, false) defer func() { require.NoError(t, head.Close()) }() @@ -527,8 +527,8 @@ func TestHead_HighConcurrencyReadAndWrite(t *testing.T) { } func TestHead_ReadWAL(t *testing.T) { - for _, compress := range []bool{false, true} { - t.Run(fmt.Sprintf("compress=%t", compress), func(t *testing.T) { + for _, compress := range []wlog.CompressionType{wlog.CompressionNone, wlog.CompressionSnappy, wlog.CompressionZstd} { + t.Run(fmt.Sprintf("compress=%s", compress), func(t *testing.T) { entries := []interface{}{ []record.RefSeries{ {Ref: 10, Labels: labels.FromStrings("a", "1")}, @@ -609,7 +609,7 @@ func TestHead_ReadWAL(t *testing.T) { } func TestHead_WALMultiRef(t *testing.T) { - head, w := newTestHead(t, 1000, false, false) + head, w := newTestHead(t, 1000, wlog.CompressionNone, false) require.NoError(t, head.Init(0)) @@ -644,7 +644,7 @@ func TestHead_WALMultiRef(t *testing.T) { require.NotEqual(t, ref1, ref2, "Refs are the same") require.NoError(t, head.Close()) - w, err = wlog.New(nil, nil, w.Dir(), false) + w, err = wlog.New(nil, nil, w.Dir(), wlog.CompressionNone) require.NoError(t, err) opts := DefaultHeadOptions() @@ -669,7 +669,7 @@ func TestHead_WALMultiRef(t *testing.T) { } func TestHead_ActiveAppenders(t *testing.T) { - head, _ := newTestHead(t, 1000, false, false) + head, _ := newTestHead(t, 1000, wlog.CompressionNone, false) defer head.Close() require.NoError(t, head.Init(0)) @@ -702,14 +702,14 @@ func 
TestHead_ActiveAppenders(t *testing.T) { } func TestHead_UnknownWALRecord(t *testing.T) { - head, w := newTestHead(t, 1000, false, false) + head, w := newTestHead(t, 1000, wlog.CompressionNone, false) w.Log([]byte{255, 42}) require.NoError(t, head.Init(0)) require.NoError(t, head.Close()) } func TestHead_Truncate(t *testing.T) { - h, _ := newTestHead(t, 1000, false, false) + h, _ := newTestHead(t, 1000, wlog.CompressionNone, false) defer func() { require.NoError(t, h.Close()) }() @@ -847,8 +847,8 @@ func TestMemSeries_truncateChunks(t *testing.T) { } func TestHeadDeleteSeriesWithoutSamples(t *testing.T) { - for _, compress := range []bool{false, true} { - t.Run(fmt.Sprintf("compress=%t", compress), func(t *testing.T) { + for _, compress := range []wlog.CompressionType{wlog.CompressionNone, wlog.CompressionSnappy, wlog.CompressionZstd} { + t.Run(fmt.Sprintf("compress=%s", compress), func(t *testing.T) { entries := []interface{}{ []record.RefSeries{ {Ref: 10, Labels: labels.FromStrings("a", "1")}, @@ -927,8 +927,8 @@ func TestHeadDeleteSimple(t *testing.T) { }, } - for _, compress := range []bool{false, true} { - t.Run(fmt.Sprintf("compress=%t", compress), func(t *testing.T) { + for _, compress := range []wlog.CompressionType{wlog.CompressionNone, wlog.CompressionSnappy, wlog.CompressionZstd} { + t.Run(fmt.Sprintf("compress=%s", compress), func(t *testing.T) { for _, c := range cases { head, w := newTestHead(t, 1000, compress, false) require.NoError(t, head.Init(0)) @@ -1011,7 +1011,7 @@ func TestHeadDeleteSimple(t *testing.T) { } func TestDeleteUntilCurMax(t *testing.T) { - hb, _ := newTestHead(t, 1000000, false, false) + hb, _ := newTestHead(t, 1000000, wlog.CompressionNone, false) defer func() { require.NoError(t, hb.Close()) }() @@ -1064,7 +1064,7 @@ func TestDeletedSamplesAndSeriesStillInWALAfterCheckpoint(t *testing.T) { numSamples := 10000 // Enough samples to cause a checkpoint. 
- hb, w := newTestHead(t, int64(numSamples)*10, false, false) + hb, w := newTestHead(t, int64(numSamples)*10, wlog.CompressionNone, false) for i := 0; i < numSamples; i++ { app := hb.Appender(context.Background()) @@ -1156,7 +1156,7 @@ func TestDelete_e2e(t *testing.T) { seriesMap[labels.New(l...).String()] = []tsdbutil.Sample{} } - hb, _ := newTestHead(t, 100000, false, false) + hb, _ := newTestHead(t, 100000, wlog.CompressionNone, false) defer func() { require.NoError(t, hb.Close()) }() @@ -1506,7 +1506,7 @@ func TestMemSeries_append_atVariableRate(t *testing.T) { func TestGCChunkAccess(t *testing.T) { // Put a chunk, select it. GC it and then access it. const chunkRange = 1000 - h, _ := newTestHead(t, chunkRange, false, false) + h, _ := newTestHead(t, chunkRange, wlog.CompressionNone, false) defer func() { require.NoError(t, h.Close()) }() @@ -1565,7 +1565,7 @@ func TestGCChunkAccess(t *testing.T) { func TestGCSeriesAccess(t *testing.T) { // Put a series, select it. GC it and then access it. 
const chunkRange = 1000 - h, _ := newTestHead(t, chunkRange, false, false) + h, _ := newTestHead(t, chunkRange, wlog.CompressionNone, false) defer func() { require.NoError(t, h.Close()) }() @@ -1624,7 +1624,7 @@ func TestGCSeriesAccess(t *testing.T) { } func TestUncommittedSamplesNotLostOnTruncate(t *testing.T) { - h, _ := newTestHead(t, 1000, false, false) + h, _ := newTestHead(t, 1000, wlog.CompressionNone, false) defer func() { require.NoError(t, h.Close()) }() @@ -1654,7 +1654,7 @@ func TestUncommittedSamplesNotLostOnTruncate(t *testing.T) { } func TestRemoveSeriesAfterRollbackAndTruncate(t *testing.T) { - h, _ := newTestHead(t, 1000, false, false) + h, _ := newTestHead(t, 1000, wlog.CompressionNone, false) defer func() { require.NoError(t, h.Close()) }() @@ -1685,8 +1685,8 @@ func TestRemoveSeriesAfterRollbackAndTruncate(t *testing.T) { } func TestHead_LogRollback(t *testing.T) { - for _, compress := range []bool{false, true} { - t.Run(fmt.Sprintf("compress=%t", compress), func(t *testing.T) { + for _, compress := range []wlog.CompressionType{wlog.CompressionNone, wlog.CompressionSnappy, wlog.CompressionZstd} { + t.Run(fmt.Sprintf("compress=%s", compress), func(t *testing.T) { h, w := newTestHead(t, 1000, compress, false) defer func() { require.NoError(t, h.Close()) @@ -1743,8 +1743,8 @@ func TestWalRepair_DecodingError(t *testing.T) { 5, }, } { - for _, compress := range []bool{false, true} { - t.Run(fmt.Sprintf("%s,compress=%t", name, compress), func(t *testing.T) { + for _, compress := range []wlog.CompressionType{wlog.CompressionNone, wlog.CompressionSnappy, wlog.CompressionZstd} { + t.Run(fmt.Sprintf("%s,compress=%s", name, compress), func(t *testing.T) { dir := t.TempDir() // Fill the wal and corrupt it. @@ -1812,7 +1812,7 @@ func TestHeadReadWriterRepair(t *testing.T) { walDir := filepath.Join(dir, "wal") // Fill the chunk segments and corrupt it. 
{ - w, err := wlog.New(nil, nil, walDir, false) + w, err := wlog.New(nil, nil, walDir, wlog.CompressionNone) require.NoError(t, err) opts := DefaultHeadOptions() @@ -1880,7 +1880,7 @@ func TestHeadReadWriterRepair(t *testing.T) { } func TestNewWalSegmentOnTruncate(t *testing.T) { - h, wal := newTestHead(t, 1000, false, false) + h, wal := newTestHead(t, 1000, wlog.CompressionNone, false) defer func() { require.NoError(t, h.Close()) }() @@ -1910,7 +1910,7 @@ func TestNewWalSegmentOnTruncate(t *testing.T) { } func TestAddDuplicateLabelName(t *testing.T) { - h, _ := newTestHead(t, 1000, false, false) + h, _ := newTestHead(t, 1000, wlog.CompressionNone, false) defer func() { require.NoError(t, h.Close()) }() @@ -1993,7 +1993,7 @@ func TestMemSeriesIsolation(t *testing.T) { } // Test isolation without restart of Head. - hb, _ := newTestHead(t, 1000, false, false) + hb, _ := newTestHead(t, 1000, wlog.CompressionNone, false) i := addSamples(hb) testIsolation(hb, i) @@ -2055,11 +2055,11 @@ func TestMemSeriesIsolation(t *testing.T) { require.NoError(t, hb.Close()) // Test isolation with restart of Head. This is to verify the num samples of chunks after m-map chunk replay. - hb, w := newTestHead(t, 1000, false, false) + hb, w := newTestHead(t, 1000, wlog.CompressionNone, false) i = addSamples(hb) require.NoError(t, hb.Close()) - wal, err := wlog.NewSize(nil, nil, w.Dir(), 32768, false) + wal, err := wlog.NewSize(nil, nil, w.Dir(), 32768, wlog.CompressionNone) require.NoError(t, err) opts := DefaultHeadOptions() opts.ChunkRange = 1000 @@ -2108,7 +2108,7 @@ func TestIsolationRollback(t *testing.T) { } // Rollback after a failed append and test if the low watermark has progressed anyway. 
- hb, _ := newTestHead(t, 1000, false, false) + hb, _ := newTestHead(t, 1000, wlog.CompressionNone, false) defer func() { require.NoError(t, hb.Close()) }() @@ -2139,7 +2139,7 @@ func TestIsolationLowWatermarkMonotonous(t *testing.T) { t.Skip("skipping test since tsdb isolation is disabled") } - hb, _ := newTestHead(t, 1000, false, false) + hb, _ := newTestHead(t, 1000, wlog.CompressionNone, false) defer func() { require.NoError(t, hb.Close()) }() @@ -2176,7 +2176,7 @@ func TestIsolationAppendIDZeroIsNoop(t *testing.T) { t.Skip("skipping test since tsdb isolation is disabled") } - h, _ := newTestHead(t, 1000, false, false) + h, _ := newTestHead(t, 1000, wlog.CompressionNone, false) defer func() { require.NoError(t, h.Close()) }() @@ -2207,7 +2207,7 @@ func TestIsolationWithoutAdd(t *testing.T) { t.Skip("skipping test since tsdb isolation is disabled") } - hb, _ := newTestHead(t, 1000, false, false) + hb, _ := newTestHead(t, 1000, wlog.CompressionNone, false) defer func() { require.NoError(t, hb.Close()) }() @@ -2302,7 +2302,7 @@ func TestOutOfOrderSamplesMetric(t *testing.T) { } func testHeadSeriesChunkRace(t *testing.T) { - h, _ := newTestHead(t, 1000, false, false) + h, _ := newTestHead(t, 1000, wlog.CompressionNone, false) defer func() { require.NoError(t, h.Close()) }() @@ -2337,7 +2337,7 @@ func testHeadSeriesChunkRace(t *testing.T) { } func TestHeadLabelNamesValuesWithMinMaxRange(t *testing.T) { - head, _ := newTestHead(t, 1000, false, false) + head, _ := newTestHead(t, 1000, wlog.CompressionNone, false) defer func() { require.NoError(t, head.Close()) }() @@ -2397,7 +2397,7 @@ func TestHeadLabelNamesValuesWithMinMaxRange(t *testing.T) { } func TestHeadLabelValuesWithMatchers(t *testing.T) { - head, _ := newTestHead(t, 1000, false, false) + head, _ := newTestHead(t, 1000, wlog.CompressionNone, false) t.Cleanup(func() { require.NoError(t, head.Close()) }) app := head.Appender(context.Background()) @@ -2456,7 +2456,7 @@ func TestHeadLabelValuesWithMatchers(t 
*testing.T) { } func TestHeadLabelNamesWithMatchers(t *testing.T) { - head, _ := newTestHead(t, 1000, false, false) + head, _ := newTestHead(t, 1000, wlog.CompressionNone, false) defer func() { require.NoError(t, head.Close()) }() @@ -2524,7 +2524,7 @@ func TestHeadLabelNamesWithMatchers(t *testing.T) { } func TestErrReuseAppender(t *testing.T) { - head, _ := newTestHead(t, 1000, false, false) + head, _ := newTestHead(t, 1000, wlog.CompressionNone, false) defer func() { require.NoError(t, head.Close()) }() @@ -2560,7 +2560,7 @@ func TestErrReuseAppender(t *testing.T) { func TestHeadMintAfterTruncation(t *testing.T) { chunkRange := int64(2000) - head, _ := newTestHead(t, chunkRange, false, false) + head, _ := newTestHead(t, chunkRange, wlog.CompressionNone, false) app := head.Appender(context.Background()) _, err := app.Append(0, labels.FromStrings("a", "b"), 100, 100) @@ -2594,7 +2594,7 @@ func TestHeadMintAfterTruncation(t *testing.T) { func TestHeadExemplars(t *testing.T) { chunkRange := int64(2000) - head, _ := newTestHead(t, chunkRange, false, false) + head, _ := newTestHead(t, chunkRange, wlog.CompressionNone, false) app := head.Appender(context.Background()) l := labels.FromStrings("traceId", "123") @@ -2616,7 +2616,7 @@ func TestHeadExemplars(t *testing.T) { func BenchmarkHeadLabelValuesWithMatchers(b *testing.B) { chunkRange := int64(2000) - head, _ := newTestHead(b, chunkRange, false, false) + head, _ := newTestHead(b, chunkRange, wlog.CompressionNone, false) b.Cleanup(func() { require.NoError(b, head.Close()) }) app := head.Appender(context.Background()) @@ -2930,7 +2930,7 @@ func TestAppendHistogram(t *testing.T) { l := labels.FromStrings("a", "b") for _, numHistograms := range []int{1, 10, 150, 200, 250, 300} { t.Run(fmt.Sprintf("%d", numHistograms), func(t *testing.T) { - head, _ := newTestHead(t, 1000, false, false) + head, _ := newTestHead(t, 1000, wlog.CompressionNone, false) t.Cleanup(func() { require.NoError(t, head.Close()) }) @@ -3034,7 +3034,7 
@@ func TestAppendHistogram(t *testing.T) { } func TestHistogramInWALAndMmapChunk(t *testing.T) { - head, _ := newTestHead(t, 3000, false, false) + head, _ := newTestHead(t, 3000, wlog.CompressionNone, false) t.Cleanup(func() { require.NoError(t, head.Close()) }) @@ -3188,7 +3188,7 @@ func TestHistogramInWALAndMmapChunk(t *testing.T) { // Restart head. require.NoError(t, head.Close()) startHead := func() { - w, err := wlog.NewSize(nil, nil, head.wal.Dir(), 32768, false) + w, err := wlog.NewSize(nil, nil, head.wal.Dir(), 32768, wlog.CompressionNone) require.NoError(t, err) head, err = NewHead(nil, nil, w, nil, head.opts, nil) require.NoError(t, err) @@ -3217,7 +3217,7 @@ func TestHistogramInWALAndMmapChunk(t *testing.T) { } func TestChunkSnapshot(t *testing.T) { - head, _ := newTestHead(t, 120*4, false, false) + head, _ := newTestHead(t, 120*4, wlog.CompressionNone, false) defer func() { head.opts.EnableMemorySnapshotOnShutdown = false require.NoError(t, head.Close()) @@ -3310,7 +3310,7 @@ func TestChunkSnapshot(t *testing.T) { } openHeadAndCheckReplay := func() { - w, err := wlog.NewSize(nil, nil, head.wal.Dir(), 32768, false) + w, err := wlog.NewSize(nil, nil, head.wal.Dir(), 32768, wlog.CompressionNone) require.NoError(t, err) head, err = NewHead(nil, nil, w, nil, head.opts, nil) require.NoError(t, err) @@ -3505,7 +3505,7 @@ func TestChunkSnapshot(t *testing.T) { } func TestSnapshotError(t *testing.T) { - head, _ := newTestHead(t, 120*4, false, false) + head, _ := newTestHead(t, 120*4, wlog.CompressionNone, false) defer func() { head.opts.EnableMemorySnapshotOnShutdown = false require.NoError(t, head.Close()) @@ -3562,7 +3562,7 @@ func TestSnapshotError(t *testing.T) { require.NoError(t, f.Close()) // Create new Head which should replay this snapshot. 
- w, err := wlog.NewSize(nil, nil, head.wal.Dir(), 32768, false) + w, err := wlog.NewSize(nil, nil, head.wal.Dir(), 32768, wlog.CompressionNone) require.NoError(t, err) // Testing https://github.com/prometheus/prometheus/issues/9437 with the registry. head, err = NewHead(prometheus.NewRegistry(), nil, w, nil, head.opts, nil) @@ -3579,7 +3579,7 @@ func TestSnapshotError(t *testing.T) { func TestHistogramMetrics(t *testing.T) { numHistograms := 10 - head, _ := newTestHead(t, 1000, false, false) + head, _ := newTestHead(t, 1000, wlog.CompressionNone, false) t.Cleanup(func() { require.NoError(t, head.Close()) }) @@ -3609,7 +3609,7 @@ func TestHistogramMetrics(t *testing.T) { require.Equal(t, float64(expHSamples), prom_testutil.ToFloat64(head.metrics.samplesAppended.WithLabelValues(sampleMetricTypeHistogram))) require.NoError(t, head.Close()) - w, err := wlog.NewSize(nil, nil, head.wal.Dir(), 32768, false) + w, err := wlog.NewSize(nil, nil, head.wal.Dir(), 32768, wlog.CompressionNone) require.NoError(t, err) head, err = NewHead(nil, nil, w, nil, head.opts, nil) require.NoError(t, err) @@ -3631,7 +3631,7 @@ func testHistogramStaleSampleHelper(t *testing.T, floatHistogram bool) { t.Helper() l := labels.FromStrings("a", "b") numHistograms := 20 - head, _ := newTestHead(t, 100000, false, false) + head, _ := newTestHead(t, 100000, wlog.CompressionNone, false) t.Cleanup(func() { require.NoError(t, head.Close()) }) @@ -3778,7 +3778,7 @@ func TestHistogramCounterResetHeader(t *testing.T) { for _, floatHisto := range []bool{true, false} { t.Run(fmt.Sprintf("floatHistogram=%t", floatHisto), func(t *testing.T) { l := labels.FromStrings("a", "b") - head, _ := newTestHead(t, 1000, false, false) + head, _ := newTestHead(t, 1000, wlog.CompressionNone, false) t.Cleanup(func() { require.NoError(t, head.Close()) }) @@ -4041,7 +4041,7 @@ func TestAppendingDifferentEncodingToSameSeries(t *testing.T) { // Tests https://github.com/prometheus/prometheus/issues/9725. 
func TestChunkSnapshotReplayBug(t *testing.T) { dir := t.TempDir() - wal, err := wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, true) + wal, err := wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, wlog.CompressionSnappy) require.NoError(t, err) // Write few series records and samples such that the series references are not in order in the WAL @@ -4108,7 +4108,7 @@ func TestChunkSnapshotReplayBug(t *testing.T) { func TestChunkSnapshotTakenAfterIncompleteSnapshot(t *testing.T) { dir := t.TempDir() - wlTemp, err := wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, true) + wlTemp, err := wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, wlog.CompressionSnappy) require.NoError(t, err) // Write a snapshot with .tmp suffix. This used to fail taking any further snapshots or replay of snapshots. @@ -4146,9 +4146,9 @@ func TestChunkSnapshotTakenAfterIncompleteSnapshot(t *testing.T) { // TODO(codesome): Needs test for ooo WAL repair. func TestOOOWalReplay(t *testing.T) { dir := t.TempDir() - wal, err := wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, true) + wal, err := wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, wlog.CompressionSnappy) require.NoError(t, err) - oooWlog, err := wlog.NewSize(nil, nil, filepath.Join(dir, wlog.WblDirName), 32768, true) + oooWlog, err := wlog.NewSize(nil, nil, filepath.Join(dir, wlog.WblDirName), 32768, wlog.CompressionSnappy) require.NoError(t, err) opts := DefaultHeadOptions() @@ -4193,9 +4193,9 @@ func TestOOOWalReplay(t *testing.T) { // Restart head. 
require.NoError(t, h.Close()) - wal, err = wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, true) + wal, err = wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, wlog.CompressionSnappy) require.NoError(t, err) - oooWlog, err = wlog.NewSize(nil, nil, filepath.Join(dir, wlog.WblDirName), 32768, true) + oooWlog, err = wlog.NewSize(nil, nil, filepath.Join(dir, wlog.WblDirName), 32768, wlog.CompressionSnappy) require.NoError(t, err) h, err = NewHead(nil, nil, wal, oooWlog, opts, nil) require.NoError(t, err) @@ -4230,9 +4230,9 @@ func TestOOOWalReplay(t *testing.T) { // TestOOOMmapReplay checks the replay at a low level. func TestOOOMmapReplay(t *testing.T) { dir := t.TempDir() - wal, err := wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, true) + wal, err := wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, wlog.CompressionSnappy) require.NoError(t, err) - oooWlog, err := wlog.NewSize(nil, nil, filepath.Join(dir, wlog.WblDirName), 32768, true) + oooWlog, err := wlog.NewSize(nil, nil, filepath.Join(dir, wlog.WblDirName), 32768, wlog.CompressionSnappy) require.NoError(t, err) opts := DefaultHeadOptions() @@ -4281,9 +4281,9 @@ func TestOOOMmapReplay(t *testing.T) { // Restart head. 
require.NoError(t, h.Close()) - wal, err = wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, true) + wal, err = wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, wlog.CompressionSnappy) require.NoError(t, err) - oooWlog, err = wlog.NewSize(nil, nil, filepath.Join(dir, wlog.WblDirName), 32768, true) + oooWlog, err = wlog.NewSize(nil, nil, filepath.Join(dir, wlog.WblDirName), 32768, wlog.CompressionSnappy) require.NoError(t, err) h, err = NewHead(nil, nil, wal, oooWlog, opts, nil) require.NoError(t, err) @@ -4312,7 +4312,7 @@ func TestOOOMmapReplay(t *testing.T) { } func TestHeadInit_DiscardChunksWithUnsupportedEncoding(t *testing.T) { - h, _ := newTestHead(t, 1000, false, false) + h, _ := newTestHead(t, 1000, wlog.CompressionNone, false) defer func() { require.NoError(t, h.Close()) }() @@ -4355,7 +4355,7 @@ func TestHeadInit_DiscardChunksWithUnsupportedEncoding(t *testing.T) { require.NoError(t, h.Close()) - wal, err := wlog.NewSize(nil, nil, filepath.Join(h.opts.ChunkDirRoot, "wal"), 32768, false) + wal, err := wlog.NewSize(nil, nil, filepath.Join(h.opts.ChunkDirRoot, "wal"), 32768, wlog.CompressionNone) require.NoError(t, err) h, err = NewHead(nil, nil, wal, nil, h.opts, nil) require.NoError(t, err) @@ -4390,7 +4390,7 @@ func (c *unsupportedChunk) Encoding() chunkenc.Encoding { // Tests https://github.com/prometheus/prometheus/issues/10277. 
func TestMmapPanicAfterMmapReplayCorruption(t *testing.T) { dir := t.TempDir() - wal, err := wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, false) + wal, err := wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, wlog.CompressionNone) require.NoError(t, err) opts := DefaultHeadOptions() @@ -4423,7 +4423,7 @@ func TestMmapPanicAfterMmapReplayCorruption(t *testing.T) { addChunks() require.NoError(t, h.Close()) - wal, err = wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, false) + wal, err = wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, wlog.CompressionNone) require.NoError(t, err) mmapFilePath := filepath.Join(dir, "chunks_head", "000001") @@ -4449,7 +4449,7 @@ func TestReplayAfterMmapReplayError(t *testing.T) { var err error openHead := func() { - wal, err := wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, false) + wal, err := wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, wlog.CompressionNone) require.NoError(t, err) opts := DefaultHeadOptions() @@ -4667,9 +4667,9 @@ func generateBigTestHistograms(n int) []*histogram.Histogram { func TestOOOAppendWithNoSeries(t *testing.T) { dir := t.TempDir() - wal, err := wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, true) + wal, err := wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, wlog.CompressionSnappy) require.NoError(t, err) - oooWlog, err := wlog.NewSize(nil, nil, filepath.Join(dir, wlog.WblDirName), 32768, true) + oooWlog, err := wlog.NewSize(nil, nil, filepath.Join(dir, wlog.WblDirName), 32768, wlog.CompressionSnappy) require.NoError(t, err) opts := DefaultHeadOptions() @@ -4748,9 +4748,9 @@ func TestOOOAppendWithNoSeries(t *testing.T) { func TestHeadMinOOOTimeUpdate(t *testing.T) { dir := t.TempDir() - wal, err := wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, true) + wal, err := wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, wlog.CompressionSnappy) require.NoError(t, err) - oooWlog, err := wlog.NewSize(nil, nil, 
filepath.Join(dir, wlog.WblDirName), 32768, true) + oooWlog, err := wlog.NewSize(nil, nil, filepath.Join(dir, wlog.WblDirName), 32768, wlog.CompressionSnappy) require.NoError(t, err) opts := DefaultHeadOptions() @@ -4795,7 +4795,7 @@ func TestHeadMinOOOTimeUpdate(t *testing.T) { func TestGaugeHistogramWALAndChunkHeader(t *testing.T) { l := labels.FromStrings("a", "b") - head, _ := newTestHead(t, 1000, false, false) + head, _ := newTestHead(t, 1000, wlog.CompressionNone, false) t.Cleanup(func() { require.NoError(t, head.Close()) }) @@ -4859,7 +4859,7 @@ func TestGaugeHistogramWALAndChunkHeader(t *testing.T) { require.NoError(t, head.Close()) require.NoError(t, os.RemoveAll(mmappedChunksDir(head.opts.ChunkDirRoot))) - w, err := wlog.NewSize(nil, nil, head.wal.Dir(), 32768, false) + w, err := wlog.NewSize(nil, nil, head.wal.Dir(), 32768, wlog.CompressionNone) require.NoError(t, err) head, err = NewHead(nil, nil, w, nil, head.opts, nil) require.NoError(t, err) @@ -4870,7 +4870,7 @@ func TestGaugeHistogramWALAndChunkHeader(t *testing.T) { func TestGaugeFloatHistogramWALAndChunkHeader(t *testing.T) { l := labels.FromStrings("a", "b") - head, _ := newTestHead(t, 1000, false, false) + head, _ := newTestHead(t, 1000, wlog.CompressionNone, false) t.Cleanup(func() { require.NoError(t, head.Close()) }) @@ -4934,7 +4934,7 @@ func TestGaugeFloatHistogramWALAndChunkHeader(t *testing.T) { require.NoError(t, head.Close()) require.NoError(t, os.RemoveAll(mmappedChunksDir(head.opts.ChunkDirRoot))) - w, err := wlog.NewSize(nil, nil, head.wal.Dir(), 32768, false) + w, err := wlog.NewSize(nil, nil, head.wal.Dir(), 32768, wlog.CompressionNone) require.NoError(t, err) head, err = NewHead(nil, nil, w, nil, head.opts, nil) require.NoError(t, err) @@ -4944,7 +4944,7 @@ func TestGaugeFloatHistogramWALAndChunkHeader(t *testing.T) { } func TestSnapshotAheadOfWALError(t *testing.T) { - head, _ := newTestHead(t, 120*4, false, false) + head, _ := newTestHead(t, 120*4, wlog.CompressionNone, false) 
head.opts.EnableMemorySnapshotOnShutdown = true // Add a sample to fill WAL. app := head.Appender(context.Background()) @@ -4967,7 +4967,7 @@ func TestSnapshotAheadOfWALError(t *testing.T) { // to keep using the same snapshot directory instead of a random one. require.NoError(t, os.RemoveAll(head.wal.Dir())) head.opts.EnableMemorySnapshotOnShutdown = false - w, _ := wlog.NewSize(nil, nil, head.wal.Dir(), 32768, false) + w, _ := wlog.NewSize(nil, nil, head.wal.Dir(), 32768, wlog.CompressionNone) head, err = NewHead(nil, nil, w, nil, head.opts, nil) require.NoError(t, err) // Add a sample to fill WAL. @@ -4986,7 +4986,7 @@ func TestSnapshotAheadOfWALError(t *testing.T) { // Create new Head which should detect the incorrect index and delete the snapshot. head.opts.EnableMemorySnapshotOnShutdown = true - w, _ = wlog.NewSize(nil, nil, head.wal.Dir(), 32768, false) + w, _ = wlog.NewSize(nil, nil, head.wal.Dir(), 32768, wlog.CompressionNone) head, err = NewHead(nil, nil, w, nil, head.opts, nil) require.NoError(t, err) require.NoError(t, head.Init(math.MinInt64)) diff --git a/tsdb/head_wal.go b/tsdb/head_wal.go index 2fe33befb..2397a9ec9 100644 --- a/tsdb/head_wal.go +++ b/tsdb/head_wal.go @@ -1119,7 +1119,7 @@ func (h *Head) ChunkSnapshot() (*ChunkSnapshotStats, error) { if err := os.MkdirAll(cpdirtmp, 0o777); err != nil { return stats, errors.Wrap(err, "create chunk snapshot dir") } - cp, err := wlog.New(nil, nil, cpdirtmp, h.wal.CompressionEnabled()) + cp, err := wlog.New(nil, nil, cpdirtmp, h.wal.CompressionType()) if err != nil { return stats, errors.Wrap(err, "open chunk snapshot") } diff --git a/tsdb/ooo_head_read_test.go b/tsdb/ooo_head_read_test.go index ed9ca2769..6c0038f89 100644 --- a/tsdb/ooo_head_read_test.go +++ b/tsdb/ooo_head_read_test.go @@ -29,6 +29,7 @@ import ( "github.com/prometheus/prometheus/tsdb/chunkenc" "github.com/prometheus/prometheus/tsdb/chunks" "github.com/prometheus/prometheus/tsdb/tsdbutil" + "github.com/prometheus/prometheus/tsdb/wlog" ) 
type chunkInterval struct { @@ -295,7 +296,7 @@ func TestOOOHeadIndexReader_Series(t *testing.T) { for perm, intervals := range permutations { for _, headChunk := range []bool{false, true} { t.Run(fmt.Sprintf("name=%s, permutation=%d, headChunk=%t", tc.name, perm, headChunk), func(t *testing.T) { - h, _ := newTestHead(t, 1000, false, true) + h, _ := newTestHead(t, 1000, wlog.CompressionNone, true) defer func() { require.NoError(t, h.Close()) }() @@ -375,7 +376,7 @@ func TestOOOHeadIndexReader_Series(t *testing.T) { func TestOOOHeadChunkReader_LabelValues(t *testing.T) { chunkRange := int64(2000) - head, _ := newTestHead(t, chunkRange, false, true) + head, _ := newTestHead(t, chunkRange, wlog.CompressionNone, true) t.Cleanup(func() { require.NoError(t, head.Close()) }) app := head.Appender(context.Background()) diff --git a/tsdb/wal.go b/tsdb/wal.go index 70378021a..3a410fb63 100644 --- a/tsdb/wal.go +++ b/tsdb/wal.go @@ -1226,7 +1226,7 @@ func MigrateWAL(logger log.Logger, dir string) (err error) { if err := os.RemoveAll(tmpdir); err != nil { return errors.Wrap(err, "cleanup replacement dir") } - repl, err := wlog.New(logger, nil, tmpdir, false) + repl, err := wlog.New(logger, nil, tmpdir, wlog.CompressionNone) if err != nil { return errors.Wrap(err, "open new WAL") } diff --git a/tsdb/wal_test.go b/tsdb/wal_test.go index da242b875..5b2911131 100644 --- a/tsdb/wal_test.go +++ b/tsdb/wal_test.go @@ -450,7 +450,7 @@ func TestMigrateWAL_Empty(t *testing.T) { wdir := path.Join(dir, "wal") // Initialize empty WAL. - w, err := wlog.New(nil, nil, wdir, false) + w, err := wlog.New(nil, nil, wdir, wlog.CompressionNone) require.NoError(t, err) require.NoError(t, w.Close()) @@ -493,7 +493,7 @@ func TestMigrateWAL_Fuzz(t *testing.T) { // Perform migration. require.NoError(t, MigrateWAL(nil, wdir)) - w, err := wlog.New(nil, nil, wdir, false) + w, err := wlog.New(nil, nil, wdir, wlog.CompressionNone) require.NoError(t, err) // We can properly write some new data after migration. 
diff --git a/tsdb/wlog/checkpoint.go b/tsdb/wlog/checkpoint.go index fe9952a30..dd52ea2e3 100644 --- a/tsdb/wlog/checkpoint.go +++ b/tsdb/wlog/checkpoint.go @@ -134,7 +134,7 @@ func Checkpoint(logger log.Logger, w *WL, from, to int, keep func(id chunks.Head if err := os.MkdirAll(cpdirtmp, 0o777); err != nil { return nil, errors.Wrap(err, "create checkpoint dir") } - cp, err := New(nil, nil, cpdirtmp, w.CompressionEnabled()) + cp, err := New(nil, nil, cpdirtmp, w.CompressionType()) if err != nil { return nil, errors.Wrap(err, "open checkpoint") } diff --git a/tsdb/wlog/checkpoint_test.go b/tsdb/wlog/checkpoint_test.go index 22f577efd..704a65cc1 100644 --- a/tsdb/wlog/checkpoint_test.go +++ b/tsdb/wlog/checkpoint_test.go @@ -126,8 +126,8 @@ func TestCheckpoint(t *testing.T) { } } - for _, compress := range []bool{false, true} { - t.Run(fmt.Sprintf("compress=%t", compress), func(t *testing.T) { + for _, compress := range []CompressionType{CompressionNone, CompressionSnappy, CompressionZstd} { + t.Run(fmt.Sprintf("compress=%s", compress), func(t *testing.T) { dir := t.TempDir() var enc record.Encoder @@ -303,7 +303,7 @@ func TestCheckpoint(t *testing.T) { func TestCheckpointNoTmpFolderAfterError(t *testing.T) { // Create a new wlog with invalid data. 
dir := t.TempDir() - w, err := NewSize(nil, nil, dir, 64*1024, false) + w, err := NewSize(nil, nil, dir, 64*1024, CompressionNone) require.NoError(t, err) var enc record.Encoder require.NoError(t, w.Log(enc.Series([]record.RefSeries{ diff --git a/tsdb/wlog/live_reader.go b/tsdb/wlog/live_reader.go index 0ca69093a..c69017051 100644 --- a/tsdb/wlog/live_reader.go +++ b/tsdb/wlog/live_reader.go @@ -23,6 +23,7 @@ import ( "github.com/go-kit/log" "github.com/go-kit/log/level" "github.com/golang/snappy" + "github.com/klauspost/compress/zstd" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" ) @@ -51,10 +52,14 @@ func NewLiveReaderMetrics(reg prometheus.Registerer) *LiveReaderMetrics { // NewLiveReader returns a new live reader. func NewLiveReader(logger log.Logger, metrics *LiveReaderMetrics, r io.Reader) *LiveReader { + // Calling zstd.NewReader with a nil io.Reader and no options cannot return an error. + zstdReader, _ := zstd.NewReader(nil) + lr := &LiveReader{ - logger: logger, - rdr: r, - metrics: metrics, + logger: logger, + rdr: r, + zstdReader: zstdReader, + metrics: metrics, // Until we understand how they come about, make readers permissive // to records spanning pages. @@ -68,17 +73,18 @@ func NewLiveReader(logger log.Logger, metrics *LiveReaderMetrics, r io.Reader) * // that are still in the process of being written, and returns records as soon // as they can be read. type LiveReader struct { - logger log.Logger - rdr io.Reader - err error - rec []byte - snappyBuf []byte - hdr [recordHeaderSize]byte - buf [pageSize]byte - readIndex int // Index in buf to start at for next read. - writeIndex int // Index in buf to start at for next write. - total int64 // Total bytes processed during reading in calls to Next(). - index int // Used to track partial records, should be 0 at the start of every new record. 
+ logger log.Logger + rdr io.Reader + err error + rec []byte + compressBuf []byte + zstdReader *zstd.Decoder + hdr [recordHeaderSize]byte + buf [pageSize]byte + readIndex int // Index in buf to start at for next read. + writeIndex int // Index in buf to start at for next write. + total int64 // Total bytes processed during reading in calls to Next(). + index int // Used to track partial records, should be 0 at the start of every new record. // For testing, we can treat EOF as a non-error. eofNonErr bool @@ -191,12 +197,14 @@ func (r *LiveReader) buildRecord() (bool, error) { rt := recTypeFromHeader(r.hdr[0]) if rt == recFirst || rt == recFull { r.rec = r.rec[:0] - r.snappyBuf = r.snappyBuf[:0] + r.compressBuf = r.compressBuf[:0] } - compressed := r.hdr[0]&snappyMask != 0 - if compressed { - r.snappyBuf = append(r.snappyBuf, temp...) + isSnappyCompressed := r.hdr[0]&snappyMask == snappyMask + isZstdCompressed := r.hdr[0]&zstdMask == zstdMask + + if isSnappyCompressed || isZstdCompressed { + r.compressBuf = append(r.compressBuf, temp...) } else { r.rec = append(r.rec, temp...) } @@ -207,12 +215,17 @@ func (r *LiveReader) buildRecord() (bool, error) { } if rt == recLast || rt == recFull { r.index = 0 - if compressed && len(r.snappyBuf) > 0 { + if isSnappyCompressed && len(r.compressBuf) > 0 { // The snappy library uses `len` to calculate if we need a new buffer. // In order to allocate as few buffers as possible make the length // equal to the capacity. 
r.rec = r.rec[:cap(r.rec)] - r.rec, err = snappy.Decode(r.rec, r.snappyBuf) + r.rec, err = snappy.Decode(r.rec, r.compressBuf) + if err != nil { + return false, err + } + } else if isZstdCompressed && len(r.compressBuf) > 0 { + r.rec, err = r.zstdReader.DecodeAll(r.compressBuf, r.rec[:0]) if err != nil { return false, err } diff --git a/tsdb/wlog/reader.go b/tsdb/wlog/reader.go index cba216764..f77b03b8e 100644 --- a/tsdb/wlog/reader.go +++ b/tsdb/wlog/reader.go @@ -20,23 +20,27 @@ import ( "io" "github.com/golang/snappy" + "github.com/klauspost/compress/zstd" "github.com/pkg/errors" ) // Reader reads WAL records from an io.Reader. type Reader struct { - rdr io.Reader - err error - rec []byte - snappyBuf []byte - buf [pageSize]byte - total int64 // Total bytes processed. - curRecTyp recType // Used for checking that the last record is not torn. + rdr io.Reader + err error + rec []byte + compressBuf []byte + zstdReader *zstd.Decoder + buf [pageSize]byte + total int64 // Total bytes processed. + curRecTyp recType // Used for checking that the last record is not torn. } // NewReader returns a new reader. func NewReader(r io.Reader) *Reader { - return &Reader{rdr: r} + // Calling zstd.NewReader with a nil io.Reader and no options cannot return an error. + zstdReader, _ := zstd.NewReader(nil) + return &Reader{rdr: r, zstdReader: zstdReader} } // Next advances the reader to the next records and returns true if it exists. @@ -63,7 +67,7 @@ func (r *Reader) next() (err error) { buf := r.buf[recordHeaderSize:] r.rec = r.rec[:0] - r.snappyBuf = r.snappyBuf[:0] + r.compressBuf = r.compressBuf[:0] i := 0 for { @@ -72,7 +76,8 @@ func (r *Reader) next() (err error) { } r.total++ r.curRecTyp = recTypeFromHeader(hdr[0]) - compressed := hdr[0]&snappyMask != 0 + isSnappyCompressed := hdr[0]&snappyMask == snappyMask + isZstdCompressed := hdr[0]&zstdMask == zstdMask // Gobble up zero bytes. 
if r.curRecTyp == recPageTerm { @@ -128,8 +133,8 @@ func (r *Reader) next() (err error) { return errors.Errorf("unexpected checksum %x, expected %x", c, crc) } - if compressed { - r.snappyBuf = append(r.snappyBuf, buf[:length]...) + if isSnappyCompressed || isZstdCompressed { + r.compressBuf = append(r.compressBuf, buf[:length]...) } else { r.rec = append(r.rec, buf[:length]...) } @@ -138,12 +143,15 @@ func (r *Reader) next() (err error) { return err } if r.curRecTyp == recLast || r.curRecTyp == recFull { - if compressed && len(r.snappyBuf) > 0 { + if isSnappyCompressed && len(r.compressBuf) > 0 { // The snappy library uses `len` to calculate if we need a new buffer. // In order to allocate as few buffers as possible make the length // equal to the capacity. r.rec = r.rec[:cap(r.rec)] - r.rec, err = snappy.Decode(r.rec, r.snappyBuf) + r.rec, err = snappy.Decode(r.rec, r.compressBuf) + return err + } else if isZstdCompressed && len(r.compressBuf) > 0 { + r.rec, err = r.zstdReader.DecodeAll(r.compressBuf, r.rec[:0]) return err } return nil diff --git a/tsdb/wlog/reader_test.go b/tsdb/wlog/reader_test.go index 737520e76..2c4dd622c 100644 --- a/tsdb/wlog/reader_test.go +++ b/tsdb/wlog/reader_test.go @@ -310,8 +310,8 @@ func allSegments(dir string) (io.ReadCloser, error) { func TestReaderFuzz(t *testing.T) { for name, fn := range readerConstructors { - for _, compress := range []bool{false, true} { - t.Run(fmt.Sprintf("%s,compress=%t", name, compress), func(t *testing.T) { + for _, compress := range []CompressionType{CompressionNone, CompressionSnappy, CompressionZstd} { + t.Run(fmt.Sprintf("%s,compress=%s", name, compress), func(t *testing.T) { dir := t.TempDir() w, err := NewSize(nil, nil, dir, 128*pageSize, compress) @@ -349,8 +349,8 @@ func TestReaderFuzz(t *testing.T) { func TestReaderFuzz_Live(t *testing.T) { logger := testutil.NewLogger(t) - for _, compress := range []bool{false, true} { - t.Run(fmt.Sprintf("compress=%t", compress), func(t *testing.T) { + for _, 
compress := range []CompressionType{CompressionNone, CompressionSnappy, CompressionZstd} { + t.Run(fmt.Sprintf("compress=%s", compress), func(t *testing.T) { dir := t.TempDir() w, err := NewSize(nil, nil, dir, 128*pageSize, compress) @@ -439,7 +439,7 @@ func TestLiveReaderCorrupt_ShortFile(t *testing.T) { logger := testutil.NewLogger(t) dir := t.TempDir() - w, err := NewSize(nil, nil, dir, pageSize, false) + w, err := NewSize(nil, nil, dir, pageSize, CompressionNone) require.NoError(t, err) rec := make([]byte, pageSize-recordHeaderSize) @@ -479,7 +479,7 @@ func TestLiveReaderCorrupt_RecordTooLongAndShort(t *testing.T) { logger := testutil.NewLogger(t) dir := t.TempDir() - w, err := NewSize(nil, nil, dir, pageSize*2, false) + w, err := NewSize(nil, nil, dir, pageSize*2, CompressionNone) require.NoError(t, err) rec := make([]byte, pageSize-recordHeaderSize) @@ -526,7 +526,7 @@ func TestReaderData(t *testing.T) { for name, fn := range readerConstructors { t.Run(name, func(t *testing.T) { - w, err := New(nil, nil, dir, true) + w, err := New(nil, nil, dir, CompressionSnappy) require.NoError(t, err) sr, err := allSegments(dir) diff --git a/tsdb/wlog/watcher_test.go b/tsdb/wlog/watcher_test.go index 94b6a92d1..bc6a10126 100644 --- a/tsdb/wlog/watcher_test.go +++ b/tsdb/wlog/watcher_test.go @@ -122,8 +122,8 @@ func TestTailSamples(t *testing.T) { const samplesCount = 250 const exemplarsCount = 25 const histogramsCount = 50 - for _, compress := range []bool{false, true} { - t.Run(fmt.Sprintf("compress=%t", compress), func(t *testing.T) { + for _, compress := range []CompressionType{CompressionNone, CompressionSnappy, CompressionZstd} { + t.Run(fmt.Sprintf("compress=%s", compress), func(t *testing.T) { now := time.Now() dir := t.TempDir() @@ -246,8 +246,8 @@ func TestReadToEndNoCheckpoint(t *testing.T) { const seriesCount = 10 const samplesCount = 250 - for _, compress := range []bool{false, true} { - t.Run(fmt.Sprintf("compress=%t", compress), func(t *testing.T) { + for _, 
compress := range []CompressionType{CompressionNone, CompressionSnappy, CompressionZstd} { + t.Run(fmt.Sprintf("compress=%s", compress), func(t *testing.T) { dir := t.TempDir() wdir := path.Join(dir, "wal") err := os.Mkdir(wdir, 0o777) @@ -314,8 +314,8 @@ func TestReadToEndWithCheckpoint(t *testing.T) { const seriesCount = 10 const samplesCount = 250 - for _, compress := range []bool{false, true} { - t.Run(fmt.Sprintf("compress=%t", compress), func(t *testing.T) { + for _, compress := range []CompressionType{CompressionNone, CompressionSnappy, CompressionZstd} { + t.Run(fmt.Sprintf("compress=%s", compress), func(t *testing.T) { dir := t.TempDir() wdir := path.Join(dir, "wal") @@ -402,8 +402,8 @@ func TestReadCheckpoint(t *testing.T) { const seriesCount = 10 const samplesCount = 250 - for _, compress := range []bool{false, true} { - t.Run(fmt.Sprintf("compress=%t", compress), func(t *testing.T) { + for _, compress := range []CompressionType{CompressionNone, CompressionSnappy, CompressionZstd} { + t.Run(fmt.Sprintf("compress=%s", compress), func(t *testing.T) { dir := t.TempDir() wdir := path.Join(dir, "wal") @@ -475,8 +475,8 @@ func TestReadCheckpointMultipleSegments(t *testing.T) { const seriesCount = 20 const samplesCount = 300 - for _, compress := range []bool{false, true} { - t.Run(fmt.Sprintf("compress=%t", compress), func(t *testing.T) { + for _, compress := range []CompressionType{CompressionNone, CompressionSnappy, CompressionZstd} { + t.Run(fmt.Sprintf("compress=%s", compress), func(t *testing.T) { dir := t.TempDir() wdir := path.Join(dir, "wal") @@ -546,15 +546,15 @@ func TestCheckpointSeriesReset(t *testing.T) { const seriesCount = 20 const samplesCount = 350 testCases := []struct { - compress bool + compress CompressionType segments int }{ - {compress: false, segments: 14}, - {compress: true, segments: 13}, + {compress: CompressionNone, segments: 14}, + {compress: CompressionSnappy, segments: 13}, } for _, tc := range testCases { - 
t.Run(fmt.Sprintf("compress=%t", tc.compress), func(t *testing.T) { + t.Run(fmt.Sprintf("compress=%s", tc.compress), func(t *testing.T) { dir := t.TempDir() wdir := path.Join(dir, "wal") diff --git a/tsdb/wlog/wlog.go b/tsdb/wlog/wlog.go index b7b1623f9..d898ebd7a 100644 --- a/tsdb/wlog/wlog.go +++ b/tsdb/wlog/wlog.go @@ -29,6 +29,7 @@ import ( "github.com/go-kit/log" "github.com/go-kit/log/level" "github.com/golang/snappy" + "github.com/klauspost/compress/zstd" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "golang.org/x/exp/slices" @@ -164,6 +165,26 @@ func OpenReadSegment(fn string) (*Segment, error) { return &Segment{SegmentFile: f, i: k, dir: filepath.Dir(fn)}, nil } +type CompressionType string + +const ( + CompressionNone CompressionType = "none" + CompressionSnappy CompressionType = "snappy" + CompressionZstd CompressionType = "zstd" +) + +// ParseCompressionType parses the two compression-related configuration values and returns the CompressionType. If +// compression is enabled but the compressType is unrecognized, we default to Snappy compression. +func ParseCompressionType(compress bool, compressType string) CompressionType { + if compress { + if compressType == "zstd" { + return CompressionZstd + } + return CompressionSnappy + } + return CompressionNone +} + // WL is a write log that stores records in segment files. // It must be read from start to end once before logging new data. // If an error occurs during read, the repair procedure must be called @@ -185,8 +206,9 @@ type WL struct { stopc chan chan struct{} actorc chan func() closed bool // To allow calling Close() more than once without blocking. - compress bool - snappyBuf []byte + compress CompressionType + compressBuf []byte + zstdWriter *zstd.Encoder WriteNotified WriteNotified @@ -265,13 +287,13 @@ func newWLMetrics(w *WL, r prometheus.Registerer) *wlMetrics { } // New returns a new WAL over the given directory. 
-func New(logger log.Logger, reg prometheus.Registerer, dir string, compress bool) (*WL, error) { +func New(logger log.Logger, reg prometheus.Registerer, dir string, compress CompressionType) (*WL, error) { return NewSize(logger, reg, dir, DefaultSegmentSize, compress) } // NewSize returns a new write log over the given directory. // New segments are created with the specified size. -func NewSize(logger log.Logger, reg prometheus.Registerer, dir string, segmentSize int, compress bool) (*WL, error) { +func NewSize(logger log.Logger, reg prometheus.Registerer, dir string, segmentSize int, compress CompressionType) (*WL, error) { if segmentSize%pageSize != 0 { return nil, errors.New("invalid segment size") } @@ -281,6 +303,16 @@ func NewSize(logger log.Logger, reg prometheus.Registerer, dir string, segmentSi if logger == nil { logger = log.NewNopLogger() } + + var zstdWriter *zstd.Encoder + if compress == CompressionZstd { + var err error + zstdWriter, err = zstd.NewWriter(nil) + if err != nil { + return nil, err + } + } + w := &WL{ dir: dir, logger: logger, @@ -289,6 +321,7 @@ func NewSize(logger log.Logger, reg prometheus.Registerer, dir string, segmentSi actorc: make(chan func(), 100), stopc: make(chan chan struct{}), compress: compress, + zstdWriter: zstdWriter, } prefix := "prometheus_tsdb_wal_" if filepath.Base(dir) == WblDirName { @@ -327,16 +360,22 @@ func Open(logger log.Logger, dir string) (*WL, error) { if logger == nil { logger = log.NewNopLogger() } + zstdWriter, err := zstd.NewWriter(nil) + if err != nil { + return nil, err + } + w := &WL{ - dir: dir, - logger: logger, + dir: dir, + logger: logger, + zstdWriter: zstdWriter, } return w, nil } -// CompressionEnabled returns if compression is enabled on this WAL. -func (w *WL) CompressionEnabled() bool { +// CompressionType returns the type of compression used on this WAL.
+func (w *WL) CompressionType() CompressionType { return w.compress } @@ -583,9 +622,10 @@ func (w *WL) flushPage(clear bool) error { } // First Byte of header format: -// [ 4 bits unallocated] [1 bit snappy compression flag] [ 3 bit record type ] +// [3 bits unallocated] [1 bit zstd compression flag] [1 bit snappy compression flag] [3 bit record type ] const ( snappyMask = 1 << 3 + zstdMask = 1 << 4 recTypeMask = snappyMask - 1 ) @@ -655,17 +695,23 @@ func (w *WL) log(rec []byte, final bool) error { // Compress the record before calculating if a new segment is needed. compressed := false - if w.compress && - len(rec) > 0 && + if w.compress == CompressionSnappy && len(rec) > 0 { // If MaxEncodedLen is less than 0 the record is too large to be compressed. - snappy.MaxEncodedLen(len(rec)) >= 0 { - // The snappy library uses `len` to calculate if we need a new buffer. - // In order to allocate as few buffers as possible make the length - // equal to the capacity. - w.snappyBuf = w.snappyBuf[:cap(w.snappyBuf)] - w.snappyBuf = snappy.Encode(w.snappyBuf, rec) - if len(w.snappyBuf) < len(rec) { - rec = w.snappyBuf + if len(rec) > 0 && snappy.MaxEncodedLen(len(rec)) >= 0 { + // The snappy library uses `len` to calculate if we need a new buffer. + // In order to allocate as few buffers as possible make the length + // equal to the capacity. 
+ w.compressBuf = w.compressBuf[:cap(w.compressBuf)] + w.compressBuf = snappy.Encode(w.compressBuf, rec) + if len(w.compressBuf) < len(rec) { + rec = w.compressBuf + compressed = true + } + } + } else if w.compress == CompressionZstd && len(rec) > 0 { + w.compressBuf = w.zstdWriter.EncodeAll(rec, w.compressBuf[:0]) + if len(w.compressBuf) < len(rec) { + rec = w.compressBuf compressed = true } } @@ -706,7 +752,11 @@ func (w *WL) log(rec []byte, final bool) error { typ = recMiddle } if compressed { - typ |= snappyMask + if w.compress == CompressionSnappy { + typ |= snappyMask + } else if w.compress == CompressionZstd { + typ |= zstdMask + } } buf[0] = byte(typ) diff --git a/tsdb/wlog/wlog_test.go b/tsdb/wlog/wlog_test.go index 3d208baa3..f9ce225b3 100644 --- a/tsdb/wlog/wlog_test.go +++ b/tsdb/wlog/wlog_test.go @@ -124,7 +124,7 @@ func TestWALRepair_ReadingError(t *testing.T) { // then corrupt a given record in a given segment. // As a result we want a repaired WAL with given intact records. segSize := 3 * pageSize - w, err := NewSize(nil, nil, dir, segSize, false) + w, err := NewSize(nil, nil, dir, segSize, CompressionNone) require.NoError(t, err) var records [][]byte @@ -149,7 +149,7 @@ func TestWALRepair_ReadingError(t *testing.T) { require.NoError(t, f.Close()) - w, err = NewSize(nil, nil, dir, segSize, false) + w, err = NewSize(nil, nil, dir, segSize, CompressionNone) require.NoError(t, err) defer w.Close() @@ -223,7 +223,7 @@ func TestCorruptAndCarryOn(t *testing.T) { // Produce a WAL with a two segments of 3 pages with 3 records each, // so when we truncate the file we're guaranteed to split a record. 
{ - w, err := NewSize(logger, nil, dir, segmentSize, false) + w, err := NewSize(logger, nil, dir, segmentSize, CompressionNone) require.NoError(t, err) for i := 0; i < 18; i++ { @@ -294,7 +294,7 @@ func TestCorruptAndCarryOn(t *testing.T) { err = sr.Close() require.NoError(t, err) - w, err := NewSize(logger, nil, dir, segmentSize, false) + w, err := NewSize(logger, nil, dir, segmentSize, CompressionNone) require.NoError(t, err) err = w.Repair(corruptionErr) @@ -337,7 +337,7 @@ func TestCorruptAndCarryOn(t *testing.T) { // TestClose ensures that calling Close more than once doesn't panic and doesn't block. func TestClose(t *testing.T) { dir := t.TempDir() - w, err := NewSize(nil, nil, dir, pageSize, false) + w, err := NewSize(nil, nil, dir, pageSize, CompressionNone) require.NoError(t, err) require.NoError(t, w.Close()) require.Error(t, w.Close()) @@ -350,7 +350,7 @@ func TestSegmentMetric(t *testing.T) { ) dir := t.TempDir() - w, err := NewSize(nil, nil, dir, segmentSize, false) + w, err := NewSize(nil, nil, dir, segmentSize, CompressionNone) require.NoError(t, err) initialSegment := client_testutil.ToFloat64(w.metrics.currentSegment) @@ -369,7 +369,7 @@ func TestSegmentMetric(t *testing.T) { } func TestCompression(t *testing.T) { - bootstrap := func(compressed bool) string { + bootstrap := func(compressed CompressionType) string { const ( segmentSize = pageSize recordSize = (pageSize / 2) - recordHeaderSize @@ -390,21 +390,27 @@ func TestCompression(t *testing.T) { return dirPath } - dirCompressed := bootstrap(true) + tmpDirs := make([]string, 0, 3) defer func() { - require.NoError(t, os.RemoveAll(dirCompressed)) - }() - dirUnCompressed := bootstrap(false) - defer func() { - require.NoError(t, os.RemoveAll(dirUnCompressed)) + for _, dir := range tmpDirs { + require.NoError(t, os.RemoveAll(dir)) + } }() - uncompressedSize, err := fileutil.DirSize(dirUnCompressed) - require.NoError(t, err) - compressedSize, err := fileutil.DirSize(dirCompressed) - require.NoError(t, 
err) + dirUnCompressed := bootstrap(CompressionNone) + tmpDirs = append(tmpDirs, dirUnCompressed) - require.Greater(t, float64(uncompressedSize)*0.75, float64(compressedSize), "Compressing zeroes should save at least 25%% space - uncompressedSize: %d, compressedSize: %d", uncompressedSize, compressedSize) + for _, compressionType := range []CompressionType{CompressionSnappy, CompressionZstd} { + dirCompressed := bootstrap(compressionType) + tmpDirs = append(tmpDirs, dirCompressed) + + uncompressedSize, err := fileutil.DirSize(dirUnCompressed) + require.NoError(t, err) + compressedSize, err := fileutil.DirSize(dirCompressed) + require.NoError(t, err) + + require.Greater(t, float64(uncompressedSize)*0.75, float64(compressedSize), "Compressing zeroes should save at least 25%% space - uncompressedSize: %d, compressedSize: %d", uncompressedSize, compressedSize) + } } func TestLogPartialWrite(t *testing.T) { @@ -438,7 +444,7 @@ func TestLogPartialWrite(t *testing.T) { t.Run(testName, func(t *testing.T) { dirPath := t.TempDir() - w, err := NewSize(nil, nil, dirPath, segmentSize, false) + w, err := NewSize(nil, nil, dirPath, segmentSize, CompressionNone) require.NoError(t, err) // Replace the underlying segment file with a mocked one that injects a failure. 
@@ -505,8 +511,8 @@ func (f *faultySegmentFile) Write(p []byte) (int, error) { } func BenchmarkWAL_LogBatched(b *testing.B) { - for _, compress := range []bool{true, false} { - b.Run(fmt.Sprintf("compress=%t", compress), func(b *testing.B) { + for _, compress := range []CompressionType{CompressionNone, CompressionSnappy, CompressionZstd} { + b.Run(fmt.Sprintf("compress=%s", compress), func(b *testing.B) { dir := b.TempDir() w, err := New(nil, nil, dir, compress) @@ -535,8 +541,8 @@ func BenchmarkWAL_LogBatched(b *testing.B) { } func BenchmarkWAL_Log(b *testing.B) { - for _, compress := range []bool{true, false} { - b.Run(fmt.Sprintf("compress=%t", compress), func(b *testing.B) { + for _, compress := range []CompressionType{CompressionNone, CompressionSnappy, CompressionZstd} { + b.Run(fmt.Sprintf("compress=%s", compress), func(b *testing.B) { dir := b.TempDir() w, err := New(nil, nil, dir, compress)