diff --git a/cmd/prometheus/main.go b/cmd/prometheus/main.go index d9f7c51a50..f9116a8a22 100644 --- a/cmd/prometheus/main.go +++ b/cmd/prometheus/main.go @@ -73,6 +73,7 @@ import ( "github.com/prometheus/prometheus/tracing" "github.com/prometheus/prometheus/tsdb" "github.com/prometheus/prometheus/tsdb/agent" + "github.com/prometheus/prometheus/tsdb/compression" "github.com/prometheus/prometheus/tsdb/wlog" "github.com/prometheus/prometheus/util/documentcli" "github.com/prometheus/prometheus/util/logging" @@ -442,7 +443,7 @@ func main() { Hidden().Default("true").BoolVar(&cfg.tsdb.WALCompression) serverOnlyFlag(a, "storage.tsdb.wal-compression-type", "Compression algorithm for the tsdb WAL."). - Hidden().Default(string(wlog.CompressionSnappy)).EnumVar(&cfg.tsdb.WALCompressionType, string(wlog.CompressionSnappy), string(wlog.CompressionZstd)) + Hidden().Default(string(compression.Snappy)).EnumVar(&cfg.tsdb.WALCompressionType, string(compression.Snappy), string(compression.Zstd)) serverOnlyFlag(a, "storage.tsdb.head-chunks-write-queue-size", "Size of the queue through which head chunks are written to the disk to be m-mapped, 0 disables the queue completely. Experimental."). Default("0").IntVar(&cfg.tsdb.HeadChunksWriteQueueSize) @@ -464,7 +465,7 @@ func main() { Default("true").BoolVar(&cfg.agent.WALCompression) agentOnlyFlag(a, "storage.agent.wal-compression-type", "Compression algorithm for the agent WAL."). - Hidden().Default(string(wlog.CompressionSnappy)).EnumVar(&cfg.agent.WALCompressionType, string(wlog.CompressionSnappy), string(wlog.CompressionZstd)) + Hidden().Default(string(compression.Snappy)).EnumVar(&cfg.agent.WALCompressionType, string(compression.Snappy), string(compression.Zstd)) agentOnlyFlag(a, "storage.agent.wal-truncate-frequency", "The frequency at which to truncate the WAL and remove old data."). diff --git a/tsdb/agent/db.go b/tsdb/agent/db.go index 825c2ec454..80e1e29514 100644 --- a/tsdb/agent/db.go +++ b/tsdb/agent/db.go @@ -37,6 +37,7 @@ import ( "github.com/prometheus/prometheus/storage/remote" "github.com/prometheus/prometheus/tsdb" "github.com/prometheus/prometheus/tsdb/chunks" + "github.com/prometheus/prometheus/tsdb/compression" tsdb_errors "github.com/prometheus/prometheus/tsdb/errors" "github.com/prometheus/prometheus/tsdb/record" "github.com/prometheus/prometheus/tsdb/tsdbutil" @@ -66,7 +67,7 @@ type Options struct { WALSegmentSize int // WALCompression configures the compression type to use on records in the WAL. - WALCompression wlog.CompressionType + WALCompression compression.Type // StripeSize is the size (power of 2) in entries of the series hash map. Reducing the size will save memory but impact performance. StripeSize int @@ -90,7 +91,7 @@ type Options struct { func DefaultOptions() *Options { return &Options{ WALSegmentSize: wlog.DefaultSegmentSize, - WALCompression: wlog.CompressionNone, + WALCompression: compression.None, StripeSize: tsdb.DefaultStripeSize, TruncateFrequency: DefaultTruncateFrequency, MinWALTime: DefaultMinWALTime, @@ -339,7 +340,7 @@ func validateOptions(opts *Options) *Options { } if opts.WALCompression == "" { - opts.WALCompression = wlog.CompressionNone + opts.WALCompression = compression.None } // Revert StripeSize to DefaultStripeSize if StripeSize is either 0 or not a power of 2. 
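Aside (not part of the upstream diff): a minimal sketch of how a caller selects a WAL compression algorithm after this change, assuming only the API visible above (agent.DefaultOptions, Options.WALCompression, and the compression.None/Snappy/Zstd constants). In Prometheus itself the --storage.*.wal-compression-type flags shown above are what feed these values in.

	package main

	import (
		"fmt"

		"github.com/prometheus/prometheus/tsdb/agent"
		"github.com/prometheus/prometheus/tsdb/compression"
	)

	func main() {
		// DefaultOptions defaults WALCompression to compression.None ("none").
		opts := agent.DefaultOptions()
		fmt.Println(opts.WALCompression)

		// Roughly what --storage.agent.wal-compression-type=zstd ends up selecting.
		opts.WALCompression = compression.Zstd
		fmt.Println(opts.WALCompression)
	}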
diff --git a/tsdb/compact_test.go b/tsdb/compact_test.go
index 6e3db15eb4..16d678c55f 100644
--- a/tsdb/compact_test.go
+++ b/tsdb/compact_test.go
@@ -38,11 +38,11 @@ import (
 	"github.com/prometheus/prometheus/storage"
 	"github.com/prometheus/prometheus/tsdb/chunkenc"
 	"github.com/prometheus/prometheus/tsdb/chunks"
+	"github.com/prometheus/prometheus/tsdb/compression"
 	"github.com/prometheus/prometheus/tsdb/fileutil"
 	"github.com/prometheus/prometheus/tsdb/index"
 	"github.com/prometheus/prometheus/tsdb/tombstones"
 	"github.com/prometheus/prometheus/tsdb/tsdbutil"
-	"github.com/prometheus/prometheus/tsdb/wlog"
 )
 
 func TestSplitByRange(t *testing.T) {
@@ -1447,7 +1447,7 @@ func TestDeleteCompactionBlockAfterFailedReload(t *testing.T) {
 func TestHeadCompactionWithHistograms(t *testing.T) {
 	for _, floatTest := range []bool{true, false} {
 		t.Run(fmt.Sprintf("float=%t", floatTest), func(t *testing.T) {
-			head, _ := newTestHead(t, DefaultBlockDuration, wlog.CompressionNone, false)
+			head, _ := newTestHead(t, DefaultBlockDuration, compression.None, false)
 			require.NoError(t, head.Init(0))
 			t.Cleanup(func() {
 				require.NoError(t, head.Close())
@@ -1627,11 +1627,11 @@ func TestSparseHistogramSpaceSavings(t *testing.T) {
 				c.numBuckets,
 			),
 			func(t *testing.T) {
-				oldHead, _ := newTestHead(t, DefaultBlockDuration, wlog.CompressionNone, false)
+				oldHead, _ := newTestHead(t, DefaultBlockDuration, compression.None, false)
 				t.Cleanup(func() {
 					require.NoError(t, oldHead.Close())
 				})
-				sparseHead, _ := newTestHead(t, DefaultBlockDuration, wlog.CompressionNone, false)
+				sparseHead, _ := newTestHead(t, DefaultBlockDuration, compression.None, false)
 				t.Cleanup(func() {
 					require.NoError(t, sparseHead.Close())
 				})
diff --git a/tsdb/compression/compression.go b/tsdb/compression/compression.go
new file mode 100644
index 0000000000..047ba1f527
--- /dev/null
+++ b/tsdb/compression/compression.go
@@ -0,0 +1,120 @@
+// Copyright 2025 The Prometheus Authors
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package compression
+
+import (
+	"errors"
+	"fmt"
+
+	"github.com/golang/snappy"
+	"github.com/klauspost/compress/zstd"
+)
+
+type Type string
+
+const (
+	// None represents the no-compression case.
+	// It is the default when Type is empty.
+	None Type = "none"
+	// Snappy represents the snappy block format.
+	Snappy Type = "snappy"
+	// Zstd represents zstd compression.
+	Zstd Type = "zstd"
+)
+
+type Encoder struct {
+	w *zstd.Encoder
+}
+
+func NewEncoder() (*Encoder, error) {
+	e := &Encoder{}
+	w, err := zstd.NewWriter(nil)
+	if err != nil {
+		return nil, err
+	}
+	e.w = w
+	return e, nil
+}
+
+// Encode returns the encoded form of src for the given compression type. It also
+// reports whether compression was actually performed. Encode may skip compression
+// for the None type, but also when src is too large, e.g. for the Snappy block format.
+//
+// The buf is used as a buffer for the returned encoding, and it must not overlap with
+// src. It is valid to pass a nil buf.
+//
+// The Encoder may be nil for compression types other than Zstd.
+func (e *Encoder) Encode(t Type, src, buf []byte) (_ []byte, compressed bool, err error) {
+	switch {
+	case len(src) == 0, t == "", t == None:
+		return src, false, nil
+	case t == Snappy:
+		// If MaxEncodedLen is less than 0 the record is too large to be compressed.
+		if snappy.MaxEncodedLen(len(src)) < 0 {
+			return src, false, nil
+		}
+
+		// The snappy library uses `len` to calculate if we need a new buffer.
+		// In order to allocate as few buffers as possible make the length
+		// equal to the capacity.
+		buf = buf[:cap(buf)]
+		return snappy.Encode(buf, src), true, nil
+	case t == Zstd:
+		if e == nil {
+			return nil, false, errors.New("zstd requested but encoder was not initialized with NewEncoder()")
+		}
+		return e.w.EncodeAll(src, buf[:0]), true, nil
+	default:
+		return nil, false, fmt.Errorf("unsupported compression type: %s", t)
+	}
+}
+
+type Decoder struct {
+	r *zstd.Decoder
+}
+
+func NewDecoder() *Decoder {
+	d := &Decoder{}
+
+	// Calling zstd.NewReader with a nil io.Reader and no options cannot return an error.
+	r, _ := zstd.NewReader(nil)
+	d.r = r
+	return d
+}
+
+// Decode returns the decoded form of src, or an error, given the expected compression type.
+//
+// The buf is used as a buffer for the returned decoded entry, and it must not
+// overlap with src. It is valid to pass a nil buf.
+//
+// The Decoder may be nil for compression types other than Zstd.
+func (d *Decoder) Decode(t Type, src, buf []byte) (_ []byte, err error) {
+	switch {
+	case len(src) == 0, t == "", t == None:
+		return src, nil
+	case t == Snappy:
+		// The snappy library uses `len` to calculate if we need a new buffer.
+		// In order to allocate as few buffers as possible make the length
+		// equal to the capacity.
+		buf = buf[:cap(buf)]
+		return snappy.Decode(buf, src)
+	case t == Zstd:
+		if d == nil {
+			return nil, errors.New("zstd requested but Decoder was not initialized with NewDecoder()")
+		}
+		return d.r.DecodeAll(src, buf[:0])
+	default:
+		return nil, fmt.Errorf("unsupported compression type: %s", t)
+	}
+}
diff --git a/tsdb/db.go b/tsdb/db.go
index 9ab150c5b4..d4065eb7b2 100644
--- a/tsdb/db.go
+++ b/tsdb/db.go
@@ -41,6 +41,7 @@ import (
 	"github.com/prometheus/prometheus/storage"
 	"github.com/prometheus/prometheus/tsdb/chunkenc"
 	"github.com/prometheus/prometheus/tsdb/chunks"
+	"github.com/prometheus/prometheus/tsdb/compression"
 	tsdb_errors "github.com/prometheus/prometheus/tsdb/errors"
 	"github.com/prometheus/prometheus/tsdb/fileutil"
 	_ "github.com/prometheus/prometheus/tsdb/goversion" // Load the package into main to make sure minimum Go version is met.
@@ -80,7 +81,7 @@ func DefaultOptions() *Options {
 	return &Options{
 		MaxBlockDuration:          DefaultBlockDuration,
 		NoLockfile:                false,
 		SamplesPerChunk:           DefaultSamplesPerChunk,
-		WALCompression:            wlog.CompressionNone,
+		WALCompression:            compression.None,
 		StripeSize:                DefaultStripeSize,
 		HeadChunksWriteBufferSize: chunks.DefaultWriteBufferSize,
 		IsolationDisabled:         defaultIsolationDisabled,
@@ -124,7 +125,7 @@ type Options struct {
 	NoLockfile bool
 
 	// WALCompression configures the compression type to use on records in the WAL.
-	WALCompression wlog.CompressionType
+	WALCompression compression.Type
 
 	// Maximum number of CPUs that can simultaneously processes WAL replay.
 	// If it is <=0, then GOMAXPROCS is used.
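Aside (not part of the upstream diff): a small usage sketch of the new tsdb/compression package introduced above, using only the exported API shown in compression.go (NewEncoder, Encoder.Encode, NewDecoder, Decoder.Decode, and the Type constants). It round-trips a payload through Zstd; reusing the returned buffers is how the wlog code later in this diff avoids allocations.

	package main

	import (
		"bytes"
		"fmt"
		"log"

		"github.com/prometheus/prometheus/tsdb/compression"
	)

	func main() {
		enc, err := compression.NewEncoder() // Required for Zstd; may be nil for None/Snappy.
		if err != nil {
			log.Fatal(err)
		}
		dec := compression.NewDecoder()

		src := bytes.Repeat([]byte("wal record payload "), 64)

		// Encode reports whether compression actually happened (None always skips it).
		out, compressed, err := enc.Encode(compression.Zstd, src, nil)
		if err != nil {
			log.Fatal(err)
		}
		fmt.Println(compressed, len(src), len(out))

		// Decoding with the same Type restores the original bytes.
		back, err := dec.Decode(compression.Zstd, out, nil)
		if err != nil {
			log.Fatal(err)
		}
		fmt.Println(bytes.Equal(src, back)) // true
	}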
diff --git a/tsdb/db_test.go b/tsdb/db_test.go index 2b320b68c9..02c3acc7c4 100644 --- a/tsdb/db_test.go +++ b/tsdb/db_test.go @@ -58,6 +58,7 @@ import ( "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/tsdb/chunkenc" "github.com/prometheus/prometheus/tsdb/chunks" + "github.com/prometheus/prometheus/tsdb/compression" "github.com/prometheus/prometheus/tsdb/fileutil" "github.com/prometheus/prometheus/tsdb/index" "github.com/prometheus/prometheus/tsdb/record" @@ -1962,7 +1963,7 @@ func TestInitializeHeadTimestamp(t *testing.T) { dir := t.TempDir() require.NoError(t, os.MkdirAll(path.Join(dir, "wal"), 0o777)) - w, err := wlog.New(nil, nil, path.Join(dir, "wal"), wlog.CompressionNone) + w, err := wlog.New(nil, nil, path.Join(dir, "wal"), compression.None) require.NoError(t, err) var enc record.Encoder @@ -2006,7 +2007,7 @@ func TestInitializeHeadTimestamp(t *testing.T) { createBlock(t, dir, genSeries(1, 1, 1000, 6000)) require.NoError(t, os.MkdirAll(path.Join(dir, "wal"), 0o777)) - w, err := wlog.New(nil, nil, path.Join(dir, "wal"), wlog.CompressionNone) + w, err := wlog.New(nil, nil, path.Join(dir, "wal"), compression.None) require.NoError(t, err) var enc record.Encoder @@ -2407,7 +2408,7 @@ func TestDBReadOnly(t *testing.T) { } // Add head to test DBReadOnly WAL reading capabilities. - w, err := wlog.New(logger, nil, filepath.Join(dbDir, "wal"), wlog.CompressionSnappy) + w, err := wlog.New(logger, nil, filepath.Join(dbDir, "wal"), compression.Snappy) require.NoError(t, err) h := createHead(t, w, genSeries(1, 1, 16, 18), dbDir) require.NoError(t, h.Close()) @@ -3058,7 +3059,7 @@ func TestCompactHead(t *testing.T) { NoLockfile: true, MinBlockDuration: int64(time.Hour * 2 / time.Millisecond), MaxBlockDuration: int64(time.Hour * 2 / time.Millisecond), - WALCompression: wlog.CompressionSnappy, + WALCompression: compression.Snappy, } db, err := Open(dbDir, promslog.NewNopLogger(), prometheus.NewRegistry(), tsdbCfg, nil) @@ -4662,7 +4663,7 @@ func TestMetadataCheckpointingOnlyKeepsLatestEntry(t *testing.T) { ctx := context.Background() numSamples := 10000 - hb, w := newTestHead(t, int64(numSamples)*10, wlog.CompressionNone, false) + hb, w := newTestHead(t, int64(numSamples)*10, compression.None, false) // Add some series so we can append metadata to them. app := hb.Appender(ctx) @@ -7019,7 +7020,7 @@ func testWBLAndMmapReplay(t *testing.T, scenario sampleTypeScenario) { resetMmapToOriginal() // We neet to reset because new duplicate chunks can be written above. // Removing m-map markers in WBL by rewriting it. 
- newWbl, err := wlog.New(promslog.NewNopLogger(), nil, filepath.Join(t.TempDir(), "new_wbl"), wlog.CompressionNone) + newWbl, err := wlog.New(promslog.NewNopLogger(), nil, filepath.Join(t.TempDir(), "new_wbl"), compression.None) require.NoError(t, err) sr, err := wlog.NewSegmentsReader(originalWblDir) require.NoError(t, err) diff --git a/tsdb/head_append.go b/tsdb/head_append.go index 4f611bd7ad..b1cc4ca7b9 100644 --- a/tsdb/head_append.go +++ b/tsdb/head_append.go @@ -676,7 +676,7 @@ func (a *headAppender) AppendExemplar(ref storage.SeriesRef, lset labels.Labels, return storage.SeriesRef(s.ref), nil } -func (a *headAppender) AppendHistogramWithCT(ref storage.SeriesRef, lset labels.Labels, t, ct int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) { +func (a *headAppender) AppendHistogramWithCT(ref storage.SeriesRef, lset labels.Labels, t, _ int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) { // TODO(bwplotka): Add support for native histograms with CTs in WAL; add/consolidate records. // We ignore CT for now. return a.AppendHistogram(ref, lset, t, h, fh) diff --git a/tsdb/head_bench_test.go b/tsdb/head_bench_test.go index dc682602b1..7fc25bd1e0 100644 --- a/tsdb/head_bench_test.go +++ b/tsdb/head_bench_test.go @@ -29,7 +29,7 @@ import ( "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/tsdb/chunks" - "github.com/prometheus/prometheus/tsdb/wlog" + "github.com/prometheus/prometheus/tsdb/compression" ) func BenchmarkHeadStripeSeriesCreate(b *testing.B) { @@ -132,7 +132,7 @@ func BenchmarkHead_WalCommit(b *testing.B) { for i := 0; i < b.N; i++ { b.StopTimer() - h, w := newTestHead(b, 10000, wlog.CompressionNone, false) + h, w := newTestHead(b, 10000, compression.None, false) b.Cleanup(func() { if h != nil { h.Close() diff --git a/tsdb/head_test.go b/tsdb/head_test.go index eaf79c5d3d..c856dbe963 100644 --- a/tsdb/head_test.go +++ b/tsdb/head_test.go @@ -46,6 +46,7 @@ import ( "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/tsdb/chunkenc" "github.com/prometheus/prometheus/tsdb/chunks" + "github.com/prometheus/prometheus/tsdb/compression" "github.com/prometheus/prometheus/tsdb/fileutil" "github.com/prometheus/prometheus/tsdb/index" "github.com/prometheus/prometheus/tsdb/record" @@ -68,11 +69,11 @@ func newTestHeadDefaultOptions(chunkRange int64, oooEnabled bool) *HeadOptions { return opts } -func newTestHead(t testing.TB, chunkRange int64, compressWAL wlog.CompressionType, oooEnabled bool) (*Head, *wlog.WL) { +func newTestHead(t testing.TB, chunkRange int64, compressWAL compression.Type, oooEnabled bool) (*Head, *wlog.WL) { return newTestHeadWithOptions(t, compressWAL, newTestHeadDefaultOptions(chunkRange, oooEnabled)) } -func newTestHeadWithOptions(t testing.TB, compressWAL wlog.CompressionType, opts *HeadOptions) (*Head, *wlog.WL) { +func newTestHeadWithOptions(t testing.TB, compressWAL compression.Type, opts *HeadOptions) (*Head, *wlog.WL) { dir := t.TempDir() wal, err := wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, compressWAL) require.NoError(t, err) @@ -92,7 +93,7 @@ func newTestHeadWithOptions(t testing.TB, compressWAL wlog.CompressionType, opts func BenchmarkCreateSeries(b *testing.B) { series := genSeries(b.N, 10, 0, 0) - h, _ := newTestHead(b, 10000, wlog.CompressionNone, false) + h, _ := newTestHead(b, 10000, compression.None, false) b.Cleanup(func() { require.NoError(b, h.Close()) }) @@ -113,7 
+114,7 @@ func BenchmarkHeadAppender_Append_Commit_ExistingSeries(b *testing.B) { b.Run(fmt.Sprintf("%d series", seriesCount), func(b *testing.B) { for _, samplesPerAppend := range []int64{1, 2, 5, 100} { b.Run(fmt.Sprintf("%d samples per append", samplesPerAppend), func(b *testing.B) { - h, _ := newTestHead(b, 10000, wlog.CompressionNone, false) + h, _ := newTestHead(b, 10000, compression.None, false) b.Cleanup(func() { require.NoError(b, h.Close()) }) ts := int64(1000) @@ -294,11 +295,11 @@ func BenchmarkLoadWLs(b *testing.B) { func(b *testing.B) { dir := b.TempDir() - wal, err := wlog.New(nil, nil, dir, wlog.CompressionNone) + wal, err := wlog.New(nil, nil, dir, compression.None) require.NoError(b, err) var wbl *wlog.WL if c.oooSeriesPct != 0 { - wbl, err = wlog.New(nil, nil, dir, wlog.CompressionNone) + wbl, err = wlog.New(nil, nil, dir, compression.None) require.NoError(b, err) } @@ -459,11 +460,11 @@ func BenchmarkLoadRealWLs(b *testing.B) { dir := b.TempDir() require.NoError(b, fileutil.CopyDirs(srcDir, dir)) - wal, err := wlog.New(nil, nil, filepath.Join(dir, "wal"), wlog.CompressionNone) + wal, err := wlog.New(nil, nil, filepath.Join(dir, "wal"), compression.None) require.NoError(b, err) b.Cleanup(func() { wal.Close() }) - wbl, err := wlog.New(nil, nil, filepath.Join(dir, "wbl"), wlog.CompressionNone) + wbl, err := wlog.New(nil, nil, filepath.Join(dir, "wbl"), compression.None) require.NoError(b, err) b.Cleanup(func() { wbl.Close() }) b.StartTimer() @@ -484,7 +485,7 @@ func BenchmarkLoadRealWLs(b *testing.B) { // While appending the samples to the head it concurrently queries them from multiple go routines and verifies that the // returned results are correct. func TestHead_HighConcurrencyReadAndWrite(t *testing.T) { - head, _ := newTestHead(t, DefaultBlockDuration, wlog.CompressionNone, false) + head, _ := newTestHead(t, DefaultBlockDuration, compression.None, false) defer func() { require.NoError(t, head.Close()) }() @@ -674,7 +675,7 @@ func TestHead_HighConcurrencyReadAndWrite(t *testing.T) { } func TestHead_ReadWAL(t *testing.T) { - for _, compress := range []wlog.CompressionType{wlog.CompressionNone, wlog.CompressionSnappy, wlog.CompressionZstd} { + for _, compress := range []compression.Type{compression.None, compression.Snappy, compression.Zstd} { t.Run(fmt.Sprintf("compress=%s", compress), func(t *testing.T) { entries := []interface{}{ []record.RefSeries{ @@ -756,7 +757,7 @@ func TestHead_ReadWAL(t *testing.T) { } func TestHead_WALMultiRef(t *testing.T) { - head, w := newTestHead(t, 1000, wlog.CompressionNone, false) + head, w := newTestHead(t, 1000, compression.None, false) require.NoError(t, head.Init(0)) @@ -791,7 +792,7 @@ func TestHead_WALMultiRef(t *testing.T) { require.NotEqual(t, ref1, ref2, "Refs are the same") require.NoError(t, head.Close()) - w, err = wlog.New(nil, nil, w.Dir(), wlog.CompressionNone) + w, err = wlog.New(nil, nil, w.Dir(), compression.None) require.NoError(t, err) opts := DefaultHeadOptions() @@ -816,7 +817,7 @@ func TestHead_WALMultiRef(t *testing.T) { } func TestHead_ActiveAppenders(t *testing.T) { - head, _ := newTestHead(t, 1000, wlog.CompressionNone, false) + head, _ := newTestHead(t, 1000, compression.None, false) defer head.Close() require.NoError(t, head.Init(0)) @@ -849,7 +850,7 @@ func TestHead_ActiveAppenders(t *testing.T) { } func TestHead_UnknownWALRecord(t *testing.T) { - head, w := newTestHead(t, 1000, wlog.CompressionNone, false) + head, w := newTestHead(t, 1000, compression.None, false) w.Log([]byte{255, 42}) require.NoError(t, 
head.Init(0)) require.NoError(t, head.Close()) @@ -861,7 +862,7 @@ func BenchmarkHead_Truncate(b *testing.B) { const total = 1e6 prepare := func(b *testing.B, churn int) *Head { - h, _ := newTestHead(b, 1000, wlog.CompressionNone, false) + h, _ := newTestHead(b, 1000, compression.None, false) b.Cleanup(func() { require.NoError(b, h.Close()) }) @@ -930,7 +931,7 @@ func BenchmarkHead_Truncate(b *testing.B) { } func TestHead_Truncate(t *testing.T) { - h, _ := newTestHead(t, 1000, wlog.CompressionNone, false) + h, _ := newTestHead(t, 1000, compression.None, false) defer func() { require.NoError(t, h.Close()) }() @@ -1240,7 +1241,7 @@ func TestMemSeries_truncateChunks_scenarios(t *testing.T) { } func TestHeadDeleteSeriesWithoutSamples(t *testing.T) { - for _, compress := range []wlog.CompressionType{wlog.CompressionNone, wlog.CompressionSnappy, wlog.CompressionZstd} { + for _, compress := range []compression.Type{compression.None, compression.Snappy, compression.Zstd} { t.Run(fmt.Sprintf("compress=%s", compress), func(t *testing.T) { entries := []interface{}{ []record.RefSeries{ @@ -1320,7 +1321,7 @@ func TestHeadDeleteSimple(t *testing.T) { }, } - for _, compress := range []wlog.CompressionType{wlog.CompressionNone, wlog.CompressionSnappy, wlog.CompressionZstd} { + for _, compress := range []compression.Type{compression.None, compression.Snappy, compression.Zstd} { t.Run(fmt.Sprintf("compress=%s", compress), func(t *testing.T) { for _, c := range cases { head, w := newTestHead(t, 1000, compress, false) @@ -1402,7 +1403,7 @@ func TestHeadDeleteSimple(t *testing.T) { } func TestDeleteUntilCurMax(t *testing.T) { - hb, _ := newTestHead(t, 1000000, wlog.CompressionNone, false) + hb, _ := newTestHead(t, 1000000, compression.None, false) defer func() { require.NoError(t, hb.Close()) }() @@ -1455,7 +1456,7 @@ func TestDeletedSamplesAndSeriesStillInWALAfterCheckpoint(t *testing.T) { numSamples := 10000 // Enough samples to cause a checkpoint. - hb, w := newTestHead(t, int64(numSamples)*10, wlog.CompressionNone, false) + hb, w := newTestHead(t, int64(numSamples)*10, compression.None, false) for i := 0; i < numSamples; i++ { app := hb.Appender(context.Background()) @@ -1547,7 +1548,7 @@ func TestDelete_e2e(t *testing.T) { seriesMap[labels.New(l...).String()] = []chunks.Sample{} } - hb, _ := newTestHead(t, 100000, wlog.CompressionNone, false) + hb, _ := newTestHead(t, 100000, compression.None, false) defer func() { require.NoError(t, hb.Close()) }() @@ -1915,7 +1916,7 @@ func TestMemSeries_append_atVariableRate(t *testing.T) { func TestGCChunkAccess(t *testing.T) { // Put a chunk, select it. GC it and then access it. const chunkRange = 1000 - h, _ := newTestHead(t, chunkRange, wlog.CompressionNone, false) + h, _ := newTestHead(t, chunkRange, compression.None, false) defer func() { require.NoError(t, h.Close()) }() @@ -1974,7 +1975,7 @@ func TestGCChunkAccess(t *testing.T) { func TestGCSeriesAccess(t *testing.T) { // Put a series, select it. GC it and then access it. 
const chunkRange = 1000 - h, _ := newTestHead(t, chunkRange, wlog.CompressionNone, false) + h, _ := newTestHead(t, chunkRange, compression.None, false) defer func() { require.NoError(t, h.Close()) }() @@ -2033,7 +2034,7 @@ func TestGCSeriesAccess(t *testing.T) { } func TestUncommittedSamplesNotLostOnTruncate(t *testing.T) { - h, _ := newTestHead(t, 1000, wlog.CompressionNone, false) + h, _ := newTestHead(t, 1000, compression.None, false) defer func() { require.NoError(t, h.Close()) }() @@ -2063,7 +2064,7 @@ func TestUncommittedSamplesNotLostOnTruncate(t *testing.T) { } func TestRemoveSeriesAfterRollbackAndTruncate(t *testing.T) { - h, _ := newTestHead(t, 1000, wlog.CompressionNone, false) + h, _ := newTestHead(t, 1000, compression.None, false) defer func() { require.NoError(t, h.Close()) }() @@ -2094,7 +2095,7 @@ func TestRemoveSeriesAfterRollbackAndTruncate(t *testing.T) { } func TestHead_LogRollback(t *testing.T) { - for _, compress := range []wlog.CompressionType{wlog.CompressionNone, wlog.CompressionSnappy, wlog.CompressionZstd} { + for _, compress := range []compression.Type{compression.None, compression.Snappy, compression.Zstd} { t.Run(fmt.Sprintf("compress=%s", compress), func(t *testing.T) { h, w := newTestHead(t, 1000, compress, false) defer func() { @@ -2118,7 +2119,7 @@ func TestHead_LogRollback(t *testing.T) { } func TestHead_ReturnsSortedLabelValues(t *testing.T) { - h, _ := newTestHead(t, 1000, wlog.CompressionNone, false) + h, _ := newTestHead(t, 1000, compression.None, false) defer func() { require.NoError(t, h.Close()) }() @@ -2182,7 +2183,7 @@ func TestWalRepair_DecodingError(t *testing.T) { 5, }, } { - for _, compress := range []wlog.CompressionType{wlog.CompressionNone, wlog.CompressionSnappy, wlog.CompressionZstd} { + for _, compress := range []compression.Type{compression.None, compression.Snappy, compression.Zstd} { t.Run(fmt.Sprintf("%s,compress=%s", name, compress), func(t *testing.T) { dir := t.TempDir() @@ -2256,9 +2257,9 @@ func TestWblRepair_DecodingError(t *testing.T) { // Fill the wbl and corrupt it. { - wal, err := wlog.New(nil, nil, filepath.Join(dir, "wal"), wlog.CompressionNone) + wal, err := wlog.New(nil, nil, filepath.Join(dir, "wal"), compression.None) require.NoError(t, err) - wbl, err := wlog.New(nil, nil, filepath.Join(dir, "wbl"), wlog.CompressionNone) + wbl, err := wlog.New(nil, nil, filepath.Join(dir, "wbl"), compression.None) require.NoError(t, err) for i := 1; i <= totalRecs; i++ { @@ -2322,7 +2323,7 @@ func TestHeadReadWriterRepair(t *testing.T) { walDir := filepath.Join(dir, "wal") // Fill the chunk segments and corrupt it. { - w, err := wlog.New(nil, nil, walDir, wlog.CompressionNone) + w, err := wlog.New(nil, nil, walDir, compression.None) require.NoError(t, err) opts := DefaultHeadOptions() @@ -2391,7 +2392,7 @@ func TestHeadReadWriterRepair(t *testing.T) { } func TestNewWalSegmentOnTruncate(t *testing.T) { - h, wal := newTestHead(t, 1000, wlog.CompressionNone, false) + h, wal := newTestHead(t, 1000, compression.None, false) defer func() { require.NoError(t, h.Close()) }() @@ -2421,7 +2422,7 @@ func TestNewWalSegmentOnTruncate(t *testing.T) { } func TestAddDuplicateLabelName(t *testing.T) { - h, _ := newTestHead(t, 1000, wlog.CompressionNone, false) + h, _ := newTestHead(t, 1000, compression.None, false) defer func() { require.NoError(t, h.Close()) }() @@ -2504,7 +2505,7 @@ func TestMemSeriesIsolation(t *testing.T) { } // Test isolation without restart of Head. 
- hb, _ := newTestHead(t, 1000, wlog.CompressionNone, false) + hb, _ := newTestHead(t, 1000, compression.None, false) i := addSamples(hb) testIsolation(hb, i) @@ -2566,11 +2567,11 @@ func TestMemSeriesIsolation(t *testing.T) { require.NoError(t, hb.Close()) // Test isolation with restart of Head. This is to verify the num samples of chunks after m-map chunk replay. - hb, w := newTestHead(t, 1000, wlog.CompressionNone, false) + hb, w := newTestHead(t, 1000, compression.None, false) i = addSamples(hb) require.NoError(t, hb.Close()) - wal, err := wlog.NewSize(nil, nil, w.Dir(), 32768, wlog.CompressionNone) + wal, err := wlog.NewSize(nil, nil, w.Dir(), 32768, compression.None) require.NoError(t, err) opts := DefaultHeadOptions() opts.ChunkRange = 1000 @@ -2619,7 +2620,7 @@ func TestIsolationRollback(t *testing.T) { } // Rollback after a failed append and test if the low watermark has progressed anyway. - hb, _ := newTestHead(t, 1000, wlog.CompressionNone, false) + hb, _ := newTestHead(t, 1000, compression.None, false) defer func() { require.NoError(t, hb.Close()) }() @@ -2650,7 +2651,7 @@ func TestIsolationLowWatermarkMonotonous(t *testing.T) { t.Skip("skipping test since tsdb isolation is disabled") } - hb, _ := newTestHead(t, 1000, wlog.CompressionNone, false) + hb, _ := newTestHead(t, 1000, compression.None, false) defer func() { require.NoError(t, hb.Close()) }() @@ -2687,7 +2688,7 @@ func TestIsolationAppendIDZeroIsNoop(t *testing.T) { t.Skip("skipping test since tsdb isolation is disabled") } - h, _ := newTestHead(t, 1000, wlog.CompressionNone, false) + h, _ := newTestHead(t, 1000, compression.None, false) defer func() { require.NoError(t, h.Close()) }() @@ -2718,7 +2719,7 @@ func TestIsolationWithoutAdd(t *testing.T) { t.Skip("skipping test since tsdb isolation is disabled") } - hb, _ := newTestHead(t, 1000, wlog.CompressionNone, false) + hb, _ := newTestHead(t, 1000, compression.None, false) defer func() { require.NoError(t, hb.Close()) }() @@ -2843,7 +2844,7 @@ func testOutOfOrderSamplesMetric(t *testing.T, scenario sampleTypeScenario, opti } func testHeadSeriesChunkRace(t *testing.T) { - h, _ := newTestHead(t, 1000, wlog.CompressionNone, false) + h, _ := newTestHead(t, 1000, compression.None, false) defer func() { require.NoError(t, h.Close()) }() @@ -2878,7 +2879,7 @@ func testHeadSeriesChunkRace(t *testing.T) { } func TestHeadLabelNamesValuesWithMinMaxRange(t *testing.T) { - head, _ := newTestHead(t, 1000, wlog.CompressionNone, false) + head, _ := newTestHead(t, 1000, compression.None, false) defer func() { require.NoError(t, head.Close()) }() @@ -2939,7 +2940,7 @@ func TestHeadLabelNamesValuesWithMinMaxRange(t *testing.T) { } func TestHeadLabelValuesWithMatchers(t *testing.T) { - head, _ := newTestHead(t, 1000, wlog.CompressionNone, false) + head, _ := newTestHead(t, 1000, compression.None, false) t.Cleanup(func() { require.NoError(t, head.Close()) }) ctx := context.Background() @@ -3015,7 +3016,7 @@ func TestHeadLabelValuesWithMatchers(t *testing.T) { } func TestHeadLabelNamesWithMatchers(t *testing.T) { - head, _ := newTestHead(t, 1000, wlog.CompressionNone, false) + head, _ := newTestHead(t, 1000, compression.None, false) defer func() { require.NoError(t, head.Close()) }() @@ -3085,7 +3086,7 @@ func TestHeadLabelNamesWithMatchers(t *testing.T) { func TestHeadShardedPostings(t *testing.T) { headOpts := newTestHeadDefaultOptions(1000, false) headOpts.EnableSharding = true - head, _ := newTestHeadWithOptions(t, wlog.CompressionNone, headOpts) + head, _ := newTestHeadWithOptions(t, 
compression.None, headOpts) defer func() { require.NoError(t, head.Close()) }() @@ -3148,7 +3149,7 @@ func TestHeadShardedPostings(t *testing.T) { } func TestErrReuseAppender(t *testing.T) { - head, _ := newTestHead(t, 1000, wlog.CompressionNone, false) + head, _ := newTestHead(t, 1000, compression.None, false) defer func() { require.NoError(t, head.Close()) }() @@ -3184,7 +3185,7 @@ func TestErrReuseAppender(t *testing.T) { func TestHeadMintAfterTruncation(t *testing.T) { chunkRange := int64(2000) - head, _ := newTestHead(t, chunkRange, wlog.CompressionNone, false) + head, _ := newTestHead(t, chunkRange, compression.None, false) app := head.Appender(context.Background()) _, err := app.Append(0, labels.FromStrings("a", "b"), 100, 100) @@ -3218,7 +3219,7 @@ func TestHeadMintAfterTruncation(t *testing.T) { func TestHeadExemplars(t *testing.T) { chunkRange := int64(2000) - head, _ := newTestHead(t, chunkRange, wlog.CompressionNone, false) + head, _ := newTestHead(t, chunkRange, compression.None, false) app := head.Appender(context.Background()) l := labels.FromStrings("trace_id", "123") @@ -3240,7 +3241,7 @@ func TestHeadExemplars(t *testing.T) { func BenchmarkHeadLabelValuesWithMatchers(b *testing.B) { chunkRange := int64(2000) - head, _ := newTestHead(b, chunkRange, wlog.CompressionNone, false) + head, _ := newTestHead(b, chunkRange, compression.None, false) b.Cleanup(func() { require.NoError(b, head.Close()) }) ctx := context.Background() @@ -3685,7 +3686,7 @@ func TestAppendHistogram(t *testing.T) { l := labels.FromStrings("a", "b") for _, numHistograms := range []int{1, 10, 150, 200, 250, 300} { t.Run(strconv.Itoa(numHistograms), func(t *testing.T) { - head, _ := newTestHead(t, 1000, wlog.CompressionNone, false) + head, _ := newTestHead(t, 1000, compression.None, false) t.Cleanup(func() { require.NoError(t, head.Close()) }) @@ -3789,7 +3790,7 @@ func TestAppendHistogram(t *testing.T) { } func TestHistogramInWALAndMmapChunk(t *testing.T) { - head, _ := newTestHead(t, 3000, wlog.CompressionNone, false) + head, _ := newTestHead(t, 3000, compression.None, false) t.Cleanup(func() { require.NoError(t, head.Close()) }) @@ -3940,7 +3941,7 @@ func TestHistogramInWALAndMmapChunk(t *testing.T) { // Restart head. 
require.NoError(t, head.Close()) startHead := func() { - w, err := wlog.NewSize(nil, nil, head.wal.Dir(), 32768, wlog.CompressionNone) + w, err := wlog.NewSize(nil, nil, head.wal.Dir(), 32768, compression.None) require.NoError(t, err) head, err = NewHead(nil, nil, w, nil, head.opts, nil) require.NoError(t, err) @@ -3969,7 +3970,7 @@ func TestHistogramInWALAndMmapChunk(t *testing.T) { } func TestChunkSnapshot(t *testing.T) { - head, _ := newTestHead(t, 120*4, wlog.CompressionNone, false) + head, _ := newTestHead(t, 120*4, compression.None, false) defer func() { head.opts.EnableMemorySnapshotOnShutdown = false require.NoError(t, head.Close()) @@ -4062,7 +4063,7 @@ func TestChunkSnapshot(t *testing.T) { } openHeadAndCheckReplay := func() { - w, err := wlog.NewSize(nil, nil, head.wal.Dir(), 32768, wlog.CompressionNone) + w, err := wlog.NewSize(nil, nil, head.wal.Dir(), 32768, compression.None) require.NoError(t, err) head, err = NewHead(nil, nil, w, nil, head.opts, nil) require.NoError(t, err) @@ -4256,7 +4257,7 @@ func TestChunkSnapshot(t *testing.T) { } func TestSnapshotError(t *testing.T) { - head, _ := newTestHead(t, 120*4, wlog.CompressionNone, false) + head, _ := newTestHead(t, 120*4, compression.None, false) defer func() { head.opts.EnableMemorySnapshotOnShutdown = false require.NoError(t, head.Close()) @@ -4316,7 +4317,7 @@ func TestSnapshotError(t *testing.T) { require.NoError(t, f.Close()) // Create new Head which should replay this snapshot. - w, err := wlog.NewSize(nil, nil, head.wal.Dir(), 32768, wlog.CompressionNone) + w, err := wlog.NewSize(nil, nil, head.wal.Dir(), 32768, compression.None) require.NoError(t, err) // Testing https://github.com/prometheus/prometheus/issues/9437 with the registry. head, err = NewHead(prometheus.NewRegistry(), nil, w, nil, head.opts, nil) @@ -4345,7 +4346,7 @@ func TestSnapshotError(t *testing.T) { opts := head.opts opts.SeriesCallback = c - w, err = wlog.NewSize(nil, nil, head.wal.Dir(), 32768, wlog.CompressionNone) + w, err = wlog.NewSize(nil, nil, head.wal.Dir(), 32768, compression.None) require.NoError(t, err) head, err = NewHead(prometheus.NewRegistry(), nil, w, nil, head.opts, nil) require.NoError(t, err) @@ -4367,7 +4368,7 @@ func TestSnapshotError(t *testing.T) { func TestHistogramMetrics(t *testing.T) { numHistograms := 10 - head, _ := newTestHead(t, 1000, wlog.CompressionNone, false) + head, _ := newTestHead(t, 1000, compression.None, false) t.Cleanup(func() { require.NoError(t, head.Close()) }) @@ -4397,7 +4398,7 @@ func TestHistogramMetrics(t *testing.T) { require.Equal(t, float64(expHSamples), prom_testutil.ToFloat64(head.metrics.samplesAppended.WithLabelValues(sampleMetricTypeHistogram))) require.NoError(t, head.Close()) - w, err := wlog.NewSize(nil, nil, head.wal.Dir(), 32768, wlog.CompressionNone) + w, err := wlog.NewSize(nil, nil, head.wal.Dir(), 32768, compression.None) require.NoError(t, err) head, err = NewHead(nil, nil, w, nil, head.opts, nil) require.NoError(t, err) @@ -4419,7 +4420,7 @@ func testHistogramStaleSampleHelper(t *testing.T, floatHistogram bool) { t.Helper() l := labels.FromStrings("a", "b") numHistograms := 20 - head, _ := newTestHead(t, 100000, wlog.CompressionNone, false) + head, _ := newTestHead(t, 100000, compression.None, false) t.Cleanup(func() { require.NoError(t, head.Close()) }) @@ -4571,7 +4572,7 @@ func TestHistogramCounterResetHeader(t *testing.T) { for _, floatHisto := range []bool{true} { // FIXME t.Run(fmt.Sprintf("floatHistogram=%t", floatHisto), func(t *testing.T) { l := labels.FromStrings("a", 
"b") - head, _ := newTestHead(t, 1000, wlog.CompressionNone, false) + head, _ := newTestHead(t, 1000, compression.None, false) t.Cleanup(func() { require.NoError(t, head.Close()) }) @@ -4692,7 +4693,7 @@ func TestOOOHistogramCounterResetHeaders(t *testing.T) { for _, floatHisto := range []bool{true, false} { t.Run(fmt.Sprintf("floatHistogram=%t", floatHisto), func(t *testing.T) { l := labels.FromStrings("a", "b") - head, _ := newTestHead(t, 1000, wlog.CompressionNone, true) + head, _ := newTestHead(t, 1000, compression.None, true) head.opts.OutOfOrderCapMax.Store(5) head.opts.EnableOOONativeHistograms.Store(true) @@ -5003,7 +5004,7 @@ func TestAppendingDifferentEncodingToSameSeries(t *testing.T) { // Tests https://github.com/prometheus/prometheus/issues/9725. func TestChunkSnapshotReplayBug(t *testing.T) { dir := t.TempDir() - wal, err := wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, wlog.CompressionSnappy) + wal, err := wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, compression.Snappy) require.NoError(t, err) // Write few series records and samples such that the series references are not in order in the WAL @@ -5070,7 +5071,7 @@ func TestChunkSnapshotReplayBug(t *testing.T) { func TestChunkSnapshotTakenAfterIncompleteSnapshot(t *testing.T) { dir := t.TempDir() - wlTemp, err := wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, wlog.CompressionSnappy) + wlTemp, err := wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, compression.Snappy) require.NoError(t, err) // Write a snapshot with .tmp suffix. This used to fail taking any further snapshots or replay of snapshots. @@ -5115,9 +5116,9 @@ func TestWBLReplay(t *testing.T) { func testWBLReplay(t *testing.T, scenario sampleTypeScenario) { dir := t.TempDir() - wal, err := wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, wlog.CompressionSnappy) + wal, err := wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, compression.Snappy) require.NoError(t, err) - oooWlog, err := wlog.NewSize(nil, nil, filepath.Join(dir, wlog.WblDirName), 32768, wlog.CompressionSnappy) + oooWlog, err := wlog.NewSize(nil, nil, filepath.Join(dir, wlog.WblDirName), 32768, compression.Snappy) require.NoError(t, err) opts := DefaultHeadOptions() @@ -5163,9 +5164,9 @@ func testWBLReplay(t *testing.T, scenario sampleTypeScenario) { // Restart head. 
require.NoError(t, h.Close()) - wal, err = wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, wlog.CompressionSnappy) + wal, err = wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, compression.Snappy) require.NoError(t, err) - oooWlog, err = wlog.NewSize(nil, nil, filepath.Join(dir, wlog.WblDirName), 32768, wlog.CompressionSnappy) + oooWlog, err = wlog.NewSize(nil, nil, filepath.Join(dir, wlog.WblDirName), 32768, compression.Snappy) require.NoError(t, err) h, err = NewHead(nil, nil, wal, oooWlog, opts, nil) require.NoError(t, err) @@ -5209,9 +5210,9 @@ func TestOOOMmapReplay(t *testing.T) { func testOOOMmapReplay(t *testing.T, scenario sampleTypeScenario) { dir := t.TempDir() - wal, err := wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, wlog.CompressionSnappy) + wal, err := wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, compression.Snappy) require.NoError(t, err) - oooWlog, err := wlog.NewSize(nil, nil, filepath.Join(dir, wlog.WblDirName), 32768, wlog.CompressionSnappy) + oooWlog, err := wlog.NewSize(nil, nil, filepath.Join(dir, wlog.WblDirName), 32768, compression.Snappy) require.NoError(t, err) opts := DefaultHeadOptions() @@ -5261,9 +5262,9 @@ func testOOOMmapReplay(t *testing.T, scenario sampleTypeScenario) { // Restart head. require.NoError(t, h.Close()) - wal, err = wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, wlog.CompressionSnappy) + wal, err = wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, compression.Snappy) require.NoError(t, err) - oooWlog, err = wlog.NewSize(nil, nil, filepath.Join(dir, wlog.WblDirName), 32768, wlog.CompressionSnappy) + oooWlog, err = wlog.NewSize(nil, nil, filepath.Join(dir, wlog.WblDirName), 32768, compression.Snappy) require.NoError(t, err) h, err = NewHead(nil, nil, wal, oooWlog, opts, nil) require.NoError(t, err) @@ -5292,7 +5293,7 @@ func testOOOMmapReplay(t *testing.T, scenario sampleTypeScenario) { } func TestHeadInit_DiscardChunksWithUnsupportedEncoding(t *testing.T) { - h, _ := newTestHead(t, 1000, wlog.CompressionNone, false) + h, _ := newTestHead(t, 1000, compression.None, false) defer func() { require.NoError(t, h.Close()) }() @@ -5336,7 +5337,7 @@ func TestHeadInit_DiscardChunksWithUnsupportedEncoding(t *testing.T) { require.NoError(t, h.Close()) - wal, err := wlog.NewSize(nil, nil, filepath.Join(h.opts.ChunkDirRoot, "wal"), 32768, wlog.CompressionNone) + wal, err := wlog.NewSize(nil, nil, filepath.Join(h.opts.ChunkDirRoot, "wal"), 32768, compression.None) require.NoError(t, err) h, err = NewHead(nil, nil, wal, nil, h.opts, nil) require.NoError(t, err) @@ -5371,7 +5372,7 @@ func (c *unsupportedChunk) Encoding() chunkenc.Encoding { // Tests https://github.com/prometheus/prometheus/issues/10277. 
func TestMmapPanicAfterMmapReplayCorruption(t *testing.T) { dir := t.TempDir() - wal, err := wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, wlog.CompressionNone) + wal, err := wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, compression.None) require.NoError(t, err) opts := DefaultHeadOptions() @@ -5404,7 +5405,7 @@ func TestMmapPanicAfterMmapReplayCorruption(t *testing.T) { addChunks() require.NoError(t, h.Close()) - wal, err = wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, wlog.CompressionNone) + wal, err = wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, compression.None) require.NoError(t, err) mmapFilePath := filepath.Join(dir, "chunks_head", "000001") @@ -5430,7 +5431,7 @@ func TestReplayAfterMmapReplayError(t *testing.T) { var err error openHead := func() { - wal, err := wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, wlog.CompressionNone) + wal, err := wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, compression.None) require.NoError(t, err) opts := DefaultHeadOptions() @@ -5513,9 +5514,9 @@ func TestOOOAppendWithNoSeries(t *testing.T) { func testOOOAppendWithNoSeries(t *testing.T, appendFunc func(appender storage.Appender, lbls labels.Labels, ts, value int64) (storage.SeriesRef, sample, error)) { dir := t.TempDir() - wal, err := wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, wlog.CompressionSnappy) + wal, err := wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, compression.Snappy) require.NoError(t, err) - oooWlog, err := wlog.NewSize(nil, nil, filepath.Join(dir, wlog.WblDirName), 32768, wlog.CompressionSnappy) + oooWlog, err := wlog.NewSize(nil, nil, filepath.Join(dir, wlog.WblDirName), 32768, compression.Snappy) require.NoError(t, err) opts := DefaultHeadOptions() @@ -5606,9 +5607,9 @@ func TestHeadMinOOOTimeUpdate(t *testing.T) { func testHeadMinOOOTimeUpdate(t *testing.T, scenario sampleTypeScenario) { dir := t.TempDir() - wal, err := wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, wlog.CompressionSnappy) + wal, err := wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, compression.Snappy) require.NoError(t, err) - oooWlog, err := wlog.NewSize(nil, nil, filepath.Join(dir, wlog.WblDirName), 32768, wlog.CompressionSnappy) + oooWlog, err := wlog.NewSize(nil, nil, filepath.Join(dir, wlog.WblDirName), 32768, compression.Snappy) require.NoError(t, err) opts := DefaultHeadOptions() @@ -5653,7 +5654,7 @@ func testHeadMinOOOTimeUpdate(t *testing.T, scenario sampleTypeScenario) { func TestGaugeHistogramWALAndChunkHeader(t *testing.T) { l := labels.FromStrings("a", "b") - head, _ := newTestHead(t, 1000, wlog.CompressionNone, false) + head, _ := newTestHead(t, 1000, compression.None, false) t.Cleanup(func() { require.NoError(t, head.Close()) }) @@ -5718,7 +5719,7 @@ func TestGaugeHistogramWALAndChunkHeader(t *testing.T) { require.NoError(t, head.Close()) require.NoError(t, os.RemoveAll(mmappedChunksDir(head.opts.ChunkDirRoot))) - w, err := wlog.NewSize(nil, nil, head.wal.Dir(), 32768, wlog.CompressionNone) + w, err := wlog.NewSize(nil, nil, head.wal.Dir(), 32768, compression.None) require.NoError(t, err) head, err = NewHead(nil, nil, w, nil, head.opts, nil) require.NoError(t, err) @@ -5729,7 +5730,7 @@ func TestGaugeHistogramWALAndChunkHeader(t *testing.T) { func TestGaugeFloatHistogramWALAndChunkHeader(t *testing.T) { l := labels.FromStrings("a", "b") - head, _ := newTestHead(t, 1000, wlog.CompressionNone, false) + head, _ := newTestHead(t, 1000, compression.None, false) t.Cleanup(func() { 
require.NoError(t, head.Close()) }) @@ -5794,7 +5795,7 @@ func TestGaugeFloatHistogramWALAndChunkHeader(t *testing.T) { require.NoError(t, head.Close()) require.NoError(t, os.RemoveAll(mmappedChunksDir(head.opts.ChunkDirRoot))) - w, err := wlog.NewSize(nil, nil, head.wal.Dir(), 32768, wlog.CompressionNone) + w, err := wlog.NewSize(nil, nil, head.wal.Dir(), 32768, compression.None) require.NoError(t, err) head, err = NewHead(nil, nil, w, nil, head.opts, nil) require.NoError(t, err) @@ -5804,7 +5805,7 @@ func TestGaugeFloatHistogramWALAndChunkHeader(t *testing.T) { } func TestSnapshotAheadOfWALError(t *testing.T) { - head, _ := newTestHead(t, 120*4, wlog.CompressionNone, false) + head, _ := newTestHead(t, 120*4, compression.None, false) head.opts.EnableMemorySnapshotOnShutdown = true // Add a sample to fill WAL. app := head.Appender(context.Background()) @@ -5827,7 +5828,7 @@ func TestSnapshotAheadOfWALError(t *testing.T) { // to keep using the same snapshot directory instead of a random one. require.NoError(t, os.RemoveAll(head.wal.Dir())) head.opts.EnableMemorySnapshotOnShutdown = false - w, _ := wlog.NewSize(nil, nil, head.wal.Dir(), 32768, wlog.CompressionNone) + w, _ := wlog.NewSize(nil, nil, head.wal.Dir(), 32768, compression.None) head, err = NewHead(nil, nil, w, nil, head.opts, nil) require.NoError(t, err) // Add a sample to fill WAL. @@ -5846,7 +5847,7 @@ func TestSnapshotAheadOfWALError(t *testing.T) { // Create new Head which should detect the incorrect index and delete the snapshot. head.opts.EnableMemorySnapshotOnShutdown = true - w, _ = wlog.NewSize(nil, nil, head.wal.Dir(), 32768, wlog.CompressionNone) + w, _ = wlog.NewSize(nil, nil, head.wal.Dir(), 32768, compression.None) head, err = NewHead(nil, nil, w, nil, head.opts, nil) require.NoError(t, err) require.NoError(t, head.Init(math.MinInt64)) @@ -5865,7 +5866,7 @@ func BenchmarkCuttingHeadHistogramChunks(b *testing.B) { ) samples := histogram.GenerateBigTestHistograms(numSamples, numBuckets) - h, _ := newTestHead(b, DefaultBlockDuration, wlog.CompressionNone, false) + h, _ := newTestHead(b, DefaultBlockDuration, compression.None, false) defer func() { require.NoError(b, h.Close()) }() @@ -5982,7 +5983,7 @@ func TestCuttingNewHeadChunks(t *testing.T) { } for testName, tc := range testCases { t.Run(testName, func(t *testing.T) { - h, _ := newTestHead(t, DefaultBlockDuration, wlog.CompressionNone, false) + h, _ := newTestHead(t, DefaultBlockDuration, compression.None, false) defer func() { require.NoError(t, h.Close()) }() @@ -6050,7 +6051,7 @@ func TestHeadDetectsDuplicateSampleAtSizeLimit(t *testing.T) { numSamples := 1000 baseTS := int64(1695209650) - h, _ := newTestHead(t, DefaultBlockDuration, wlog.CompressionNone, false) + h, _ := newTestHead(t, DefaultBlockDuration, compression.None, false) defer func() { require.NoError(t, h.Close()) }() @@ -6117,7 +6118,7 @@ func TestWALSampleAndExemplarOrder(t *testing.T) { for testName, tc := range testcases { t.Run(testName, func(t *testing.T) { - h, w := newTestHead(t, 1000, wlog.CompressionNone, false) + h, w := newTestHead(t, 1000, compression.None, false) defer func() { require.NoError(t, h.Close()) }() @@ -6154,7 +6155,7 @@ func TestWALSampleAndExemplarOrder(t *testing.T) { // `signal SIGSEGV: segmentation violation code=0x1 addr=0x20 pc=0xbb03d1` // panic, that we have seen in the wild once. 
func TestHeadCompactionWhileAppendAndCommitExemplar(t *testing.T) { - h, _ := newTestHead(t, DefaultBlockDuration, wlog.CompressionNone, false) + h, _ := newTestHead(t, DefaultBlockDuration, compression.None, false) app := h.Appender(context.Background()) lbls := labels.FromStrings("foo", "bar") ref, err := app.Append(0, lbls, 1, 1) @@ -6267,7 +6268,7 @@ func TestPostingsCardinalityStats(t *testing.T) { } func TestHeadAppender_AppendFloatWithSameTimestampAsPreviousHistogram(t *testing.T) { - head, _ := newTestHead(t, DefaultBlockDuration, wlog.CompressionNone, false) + head, _ := newTestHead(t, DefaultBlockDuration, compression.None, false) t.Cleanup(func() { head.Close() }) ls := labels.FromStrings(labels.MetricName, "test") @@ -6482,7 +6483,7 @@ func TestHeadAppender_AppendCTZeroSample(t *testing.T) { }, } { t.Run(tc.name, func(t *testing.T) { - h, _ := newTestHead(t, DefaultBlockDuration, wlog.CompressionNone, false) + h, _ := newTestHead(t, DefaultBlockDuration, compression.None, false) defer func() { require.NoError(t, h.Close()) }() @@ -6567,7 +6568,7 @@ func TestHeadAppender_AppendWithCT(t *testing.T) { }, } { t.Run(tc.name, func(t *testing.T) { - h, w := newTestHead(t, DefaultBlockDuration, wlog.CompressionNone, false) + h, w := newTestHead(t, DefaultBlockDuration, compression.None, false) defer func() { require.NoError(t, h.Close()) }() @@ -6596,7 +6597,7 @@ func TestHeadCompactableDoesNotCompactEmptyHead(t *testing.T) { // was compactable using default values for min and max times, `Head.compactable()` // would return true which is incorrect. This test verifies that we short-circuit // the check when the head has not yet had any samples added. - head, _ := newTestHead(t, 1, wlog.CompressionNone, false) + head, _ := newTestHead(t, 1, compression.None, false) defer func() { require.NoError(t, head.Close()) }() @@ -6638,7 +6639,7 @@ func TestHeadAppendHistogramAndCommitConcurrency(t *testing.T) { } func testHeadAppendHistogramAndCommitConcurrency(t *testing.T, appendFn func(storage.Appender, int) error) { - head, _ := newTestHead(t, 1000, wlog.CompressionNone, false) + head, _ := newTestHead(t, 1000, compression.None, false) defer func() { require.NoError(t, head.Close()) }() diff --git a/tsdb/ooo_head_read_test.go b/tsdb/ooo_head_read_test.go index adbd3278ba..7f080842a7 100644 --- a/tsdb/ooo_head_read_test.go +++ b/tsdb/ooo_head_read_test.go @@ -28,7 +28,7 @@ import ( "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/tsdb/chunkenc" "github.com/prometheus/prometheus/tsdb/chunks" - "github.com/prometheus/prometheus/tsdb/wlog" + "github.com/prometheus/prometheus/tsdb/compression" ) type chunkInterval struct { @@ -300,7 +300,7 @@ func TestOOOHeadIndexReader_Series(t *testing.T) { for perm, intervals := range permutations { for _, headChunk := range []bool{false, true} { t.Run(fmt.Sprintf("name=%s, permutation=%d, headChunk=%t", tc.name, perm, headChunk), func(t *testing.T) { - h, _ := newTestHead(t, 1000, wlog.CompressionNone, true) + h, _ := newTestHead(t, 1000, compression.None, true) defer func() { require.NoError(t, h.Close()) }() @@ -388,7 +388,7 @@ func TestOOOHeadChunkReader_LabelValues(t *testing.T) { //nolint:revive // unexported-return. 
func testOOOHeadChunkReader_LabelValues(t *testing.T, scenario sampleTypeScenario) { chunkRange := int64(2000) - head, _ := newTestHead(t, chunkRange, wlog.CompressionNone, true) + head, _ := newTestHead(t, chunkRange, compression.None, true) head.opts.EnableOOONativeHistograms.Store(true) t.Cleanup(func() { require.NoError(t, head.Close()) }) diff --git a/tsdb/record/bench_test.go b/tsdb/record/bench_test.go new file mode 100644 index 0000000000..1476ba84ff --- /dev/null +++ b/tsdb/record/bench_test.go @@ -0,0 +1,172 @@ +// Copyright 2025 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package record_test + +import ( + "fmt" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/prometheus/prometheus/tsdb/compression" + "github.com/prometheus/prometheus/tsdb/record" + "github.com/prometheus/prometheus/util/testrecord" +) + +func TestEncodeDecode(t *testing.T) { + for _, tcase := range []testrecord.RefSamplesCase{ + testrecord.Realistic1000Samples, + testrecord.Realistic1000WithCTSamples, + testrecord.WorstCase1000Samples, + } { + var ( + enc record.Encoder + dec record.Decoder + buf []byte + ) + + s := testrecord.GenTestRefSamplesCase(t, tcase) + + { + got, err := dec.Samples(enc.Samples(s, nil), nil) + require.NoError(t, err) + require.Equal(t, s, got) + } + + // With byte buffer (append!) + { + buf = make([]byte, 10, 1e5) + got, err := dec.Samples(enc.Samples(s, buf)[10:], nil) + require.NoError(t, err) + require.Equal(t, s, got) + } + + // With sample slice + { + samples := make([]record.RefSample, 0, len(s)+1) + got, err := dec.Samples(enc.Samples(s, nil), samples) + require.NoError(t, err) + require.Equal(t, s, got) + } + + // With compression. + { + buf := enc.Samples(s, nil) + + cEnc, err := compression.NewEncoder() + require.NoError(t, err) + buf, _, err = cEnc.Encode(compression.Zstd, buf, nil) + require.NoError(t, err) + + buf, err = compression.NewDecoder().Decode(compression.Zstd, buf, nil) + require.NoError(t, err) + + got, err := dec.Samples(buf, nil) + require.NoError(t, err) + require.Equal(t, s, got) + } + } +} + +var ( + compressions = []compression.Type{compression.None, compression.Snappy, compression.Zstd} + dataCases = []testrecord.RefSamplesCase{ + testrecord.Realistic1000Samples, + testrecord.Realistic1000WithCTSamples, + testrecord.WorstCase1000Samples, + } +) + +/* + export bench=encode-v1 && go test ./tsdb/record/... \ + -run '^$' -bench '^BenchmarkEncode_Samples' \ + -benchtime 5s -count 6 -cpu 2 -timeout 999m \ + | tee ${bench}.txt +*/ +func BenchmarkEncode_Samples(b *testing.B) { + for _, compr := range compressions { + for _, data := range dataCases { + b.Run(fmt.Sprintf("compr=%v/data=%v", compr, data), func(b *testing.B) { + var ( + samples = testrecord.GenTestRefSamplesCase(b, data) + enc record.Encoder + buf []byte + cBuf []byte + ) + + cEnc, err := compression.NewEncoder() + require.NoError(b, err) + + // Warm up. 
+ buf = enc.Samples(samples, buf[:0]) + cBuf, _, err = cEnc.Encode(compr, buf, cBuf[:0]) + require.NoError(b, err) + + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + buf = enc.Samples(samples, buf[:0]) + b.ReportMetric(float64(len(buf)), "B/rec") + + cBuf, _, _ = cEnc.Encode(compr, buf, cBuf[:0]) + b.ReportMetric(float64(len(cBuf)), "B/compressed-rec") + } + }) + } + } +} + +/* + export bench=decode-v1 && go test ./tsdb/record/... \ + -run '^$' -bench '^BenchmarkDecode_Samples' \ + -benchtime 5s -count 6 -cpu 2 -timeout 999m \ + | tee ${bench}.txt +*/ +func BenchmarkDecode_Samples(b *testing.B) { + for _, compr := range compressions { + for _, data := range dataCases { + b.Run(fmt.Sprintf("compr=%v/data=%v", compr, data), func(b *testing.B) { + var ( + samples = testrecord.GenTestRefSamplesCase(b, data) + enc record.Encoder + dec record.Decoder + cDec = compression.NewDecoder() + cBuf []byte + samplesBuf []record.RefSample + ) + + buf := enc.Samples(samples, nil) + + cEnc, err := compression.NewEncoder() + require.NoError(b, err) + + buf, _, err = cEnc.Encode(compr, buf, nil) + require.NoError(b, err) + + // Warm up. + cBuf, err = cDec.Decode(compr, buf, cBuf[:0]) + require.NoError(b, err) + samplesBuf, err = dec.Samples(cBuf, samplesBuf[:0]) + require.NoError(b, err) + + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + cBuf, _ = cDec.Decode(compr, buf, cBuf[:0]) + samplesBuf, _ = dec.Samples(cBuf, samplesBuf[:0]) + } + }) + } + } +} diff --git a/tsdb/record/record.go b/tsdb/record/record.go index c6152e5e02..a1567dde1c 100644 --- a/tsdb/record/record.go +++ b/tsdb/record/record.go @@ -163,7 +163,7 @@ type RefSeries struct { // TODO(beorn7): Perhaps make this "polymorphic", including histogram and float-histogram pointers? Then get rid of RefHistogramSample. type RefSample struct { Ref chunks.HeadSeriesRef - T, CT int64 + CT, T int64 V float64 } @@ -361,23 +361,23 @@ func (d *Decoder) samplesWithCT(dec *encoding.Decbuf, samples []RefSample) ([]Re } var ( baseRef = dec.Be64() - baseTime = dec.Be64int64() baseCT = dec.Be64int64() + baseTime = dec.Be64int64() ) // Allow 1 byte for each varint and 8 for the value; the output slice must be at least that big. - if minSize := dec.Len() / (1 + 1 + 8); cap(samples) < minSize { + if minSize := dec.Len() / (1 + 1 + 1 + 8); cap(samples) < minSize { samples = make([]RefSample, 0, minSize) } for len(dec.B) > 0 && dec.Err() == nil { dref := dec.Varint64() - dtime := dec.Varint64() dCT := dec.Varint64() + dtime := dec.Varint64() val := dec.Be64() samples = append(samples, RefSample{ Ref: chunks.HeadSeriesRef(int64(baseRef) + dref), - T: baseTime + dtime, CT: baseCT + dCT, + T: baseTime + dtime, V: math.Float64frombits(val), }) } @@ -763,13 +763,13 @@ func (e *Encoder) samplesWithCT(samples []RefSample, b []byte) []byte { first := samples[0] buf.PutBE64(uint64(first.Ref)) - buf.PutBE64int64(first.T) buf.PutBE64int64(first.CT) + buf.PutBE64int64(first.T) for _, s := range samples { buf.PutVarint64(int64(s.Ref) - int64(first.Ref)) - buf.PutVarint64(s.T - first.T) buf.PutVarint64(s.CT - first.CT) + buf.PutVarint64(s.T - first.T) buf.PutBE64(math.Float64bits(s.V)) } return buf.Get() diff --git a/tsdb/record/record_test.go b/tsdb/record/record_test.go index 797c33ac97..158a2c45fd 100644 --- a/tsdb/record/record_test.go +++ b/tsdb/record/record_test.go @@ -75,6 +75,7 @@ func TestRecord_EncodeDecode(t *testing.T) { require.NoError(t, err) require.Equal(t, metadata, decMetadata) + // Without CT. 
samples := []RefSample{ {Ref: 0, T: 12423423, V: 1.2345}, {Ref: 123, T: -1231, V: -123}, @@ -84,6 +85,7 @@ func TestRecord_EncodeDecode(t *testing.T) { require.NoError(t, err) require.Equal(t, samples, decSamples) + // With CT. samplesWithCT := []RefSample{ {Ref: 0, T: 12423423, CT: 14, V: 1.2345}, {Ref: 123, T: -1231, CT: 14, V: -123}, @@ -486,7 +488,7 @@ type recordsMaker struct { } // BenchmarkWAL_HistogramEncoding measures efficiency of encoding classic -// histograms and native historgrams with custom buckets (NHCB). +// histograms and native histograms with custom buckets (NHCB). func BenchmarkWAL_HistogramEncoding(b *testing.B) { initClassicRefs := func(labelCount, histograms, buckets int) (series []RefSeries, floatSamples []RefSample, histSamples []RefHistogramSample) { ref := chunks.HeadSeriesRef(0) diff --git a/tsdb/wlog/checkpoint_test.go b/tsdb/wlog/checkpoint_test.go index b48ecaeec3..621b388c50 100644 --- a/tsdb/wlog/checkpoint_test.go +++ b/tsdb/wlog/checkpoint_test.go @@ -25,6 +25,8 @@ import ( "github.com/stretchr/testify/require" + "github.com/prometheus/prometheus/tsdb/compression" + "github.com/prometheus/common/promslog" "github.com/prometheus/prometheus/model/histogram" @@ -170,7 +172,7 @@ func TestCheckpoint(t *testing.T) { } } - for _, compress := range []CompressionType{CompressionNone, CompressionSnappy, CompressionZstd} { + for _, compress := range []compression.Type{compression.None, compression.Snappy, compression.Zstd} { t.Run(fmt.Sprintf("compress=%s", compress), func(t *testing.T) { dir := t.TempDir() @@ -385,7 +387,7 @@ func TestCheckpoint(t *testing.T) { func TestCheckpointNoTmpFolderAfterError(t *testing.T) { // Create a new wlog with invalid data. dir := t.TempDir() - w, err := NewSize(nil, nil, dir, 64*1024, CompressionNone) + w, err := NewSize(nil, nil, dir, 64*1024, compression.None) require.NoError(t, err) var enc record.Encoder require.NoError(t, w.Log(enc.Series([]record.RefSeries{ diff --git a/tsdb/wlog/reader.go b/tsdb/wlog/reader.go index a744b0cc4b..128dc287fe 100644 --- a/tsdb/wlog/reader.go +++ b/tsdb/wlog/reader.go @@ -21,8 +21,7 @@ import ( "hash/crc32" "io" - "github.com/golang/snappy" - "github.com/klauspost/compress/zstd" + "github.com/prometheus/prometheus/tsdb/compression" ) // Reader reads WAL records from an io.Reader. @@ -31,7 +30,7 @@ type Reader struct { err error rec []byte compressBuf []byte - zstdReader *zstd.Decoder + cDec *compression.Decoder buf [pageSize]byte total int64 // Total bytes processed. curRecTyp recType // Used for checking that the last record is not torn. @@ -39,9 +38,7 @@ type Reader struct { // NewReader returns a new reader. func NewReader(r io.Reader) *Reader { - // Calling zstd.NewReader with a nil io.Reader and no options cannot return an error. - zstdReader, _ := zstd.NewReader(nil) - return &Reader{rdr: r, zstdReader: zstdReader} + return &Reader{rdr: r, cDec: compression.NewDecoder()} } // Next advances the reader to the next records and returns true if it exists. @@ -67,7 +64,6 @@ func (r *Reader) next() (err error) { hdr := r.buf[:recordHeaderSize] buf := r.buf[recordHeaderSize:] - r.rec = r.rec[:0] r.compressBuf = r.compressBuf[:0] i := 0 @@ -77,8 +73,13 @@ func (r *Reader) next() (err error) { } r.total++ r.curRecTyp = recTypeFromHeader(hdr[0]) - isSnappyCompressed := hdr[0]&snappyMask == snappyMask - isZstdCompressed := hdr[0]&zstdMask == zstdMask + // TODO(bwplotka): Handle unknown compressions. 
+ compr := compression.None + if hdr[0]&snappyMask == snappyMask { + compr = compression.Snappy + } else if hdr[0]&zstdMask == zstdMask { + compr = compression.Zstd + } // Gobble up zero bytes. if r.curRecTyp == recPageTerm { @@ -133,26 +134,14 @@ func (r *Reader) next() (err error) { if c := crc32.Checksum(buf[:length], castagnoliTable); c != crc { return fmt.Errorf("unexpected checksum %x, expected %x", c, crc) } - - if isSnappyCompressed || isZstdCompressed { - r.compressBuf = append(r.compressBuf, buf[:length]...) - } else { - r.rec = append(r.rec, buf[:length]...) - } - if err := validateRecord(r.curRecTyp, i); err != nil { return err } + + r.compressBuf = append(r.compressBuf, buf[:length]...) if r.curRecTyp == recLast || r.curRecTyp == recFull { - if isSnappyCompressed && len(r.compressBuf) > 0 { - // The snappy library uses `len` to calculate if we need a new buffer. - // In order to allocate as few buffers as possible make the length - // equal to the capacity. - r.rec = r.rec[:cap(r.rec)] - r.rec, err = snappy.Decode(r.rec, r.compressBuf) - return err - } else if isZstdCompressed && len(r.compressBuf) > 0 { - r.rec, err = r.zstdReader.DecodeAll(r.compressBuf, r.rec[:0]) + r.rec, err = r.cDec.Decode(compr, r.compressBuf, r.rec[:0]) + if err != nil { return err } return nil diff --git a/tsdb/wlog/reader_test.go b/tsdb/wlog/reader_test.go index 2ac63cbf15..aafc1b2989 100644 --- a/tsdb/wlog/reader_test.go +++ b/tsdb/wlog/reader_test.go @@ -31,6 +31,8 @@ import ( "github.com/stretchr/testify/require" + "github.com/prometheus/prometheus/tsdb/compression" + "github.com/prometheus/common/promslog" tsdb_errors "github.com/prometheus/prometheus/tsdb/errors" @@ -315,7 +317,7 @@ func allSegments(dir string) (io.ReadCloser, error) { func TestReaderFuzz(t *testing.T) { for name, fn := range readerConstructors { - for _, compress := range []CompressionType{CompressionNone, CompressionSnappy, CompressionZstd} { + for _, compress := range []compression.Type{compression.None, compression.Snappy, compression.Zstd} { t.Run(fmt.Sprintf("%s,compress=%s", name, compress), func(t *testing.T) { dir := t.TempDir() @@ -354,7 +356,7 @@ func TestReaderFuzz(t *testing.T) { func TestReaderFuzz_Live(t *testing.T) { logger := promslog.NewNopLogger() - for _, compress := range []CompressionType{CompressionNone, CompressionSnappy, CompressionZstd} { + for _, compress := range []compression.Type{compression.None, compression.Snappy, compression.Zstd} { t.Run(fmt.Sprintf("compress=%s", compress), func(t *testing.T) { dir := t.TempDir() @@ -444,7 +446,7 @@ func TestLiveReaderCorrupt_ShortFile(t *testing.T) { logger := promslog.NewNopLogger() dir := t.TempDir() - w, err := NewSize(nil, nil, dir, pageSize, CompressionNone) + w, err := NewSize(nil, nil, dir, pageSize, compression.None) require.NoError(t, err) rec := make([]byte, pageSize-recordHeaderSize) @@ -484,7 +486,7 @@ func TestLiveReaderCorrupt_RecordTooLongAndShort(t *testing.T) { logger := promslog.NewNopLogger() dir := t.TempDir() - w, err := NewSize(nil, nil, dir, pageSize*2, CompressionNone) + w, err := NewSize(nil, nil, dir, pageSize*2, compression.None) require.NoError(t, err) rec := make([]byte, pageSize-recordHeaderSize) @@ -531,7 +533,7 @@ func TestReaderData(t *testing.T) { for name, fn := range readerConstructors { t.Run(name, func(t *testing.T) { - w, err := New(nil, nil, dir, CompressionSnappy) + w, err := New(nil, nil, dir, compression.Snappy) require.NoError(t, err) sr, err := allSegments(dir) diff --git a/tsdb/wlog/watcher.go 
b/tsdb/wlog/watcher.go index fbe5735734..9fb01893b7 100644 --- a/tsdb/wlog/watcher.go +++ b/tsdb/wlog/watcher.go @@ -18,7 +18,6 @@ import ( "fmt" "io" "log/slog" - "math" "os" "path/filepath" "strconv" @@ -28,6 +27,8 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/promslog" + "github.com/prometheus/prometheus/util/zeropool" + "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/model/timestamp" "github.com/prometheus/prometheus/tsdb/record" @@ -109,6 +110,9 @@ type Watcher struct { // For testing, stop when we hit this segment. MaxSegment int + + readSeriesPool zeropool.Pool[[]record.RefSeries] + readSamplesPool zeropool.Pool[[]record.RefSample] } func NewWatcherMetrics(reg prometheus.Registerer) *WatcherMetrics { @@ -332,26 +336,6 @@ func (w *Watcher) findSegmentForIndex(index int) (int, error) { return -1, errors.New("failed to find segment for index") } -func (w *Watcher) readAndHandleError(r *LiveReader, segmentNum int, tail bool, size int64) error { - err := w.readSegment(r, segmentNum, tail) - - // Ignore all errors reading to end of segment whilst replaying the WAL. - if !tail { - if err != nil && !errors.Is(err, io.EOF) { - w.logger.Warn("Ignoring error reading to end of segment, may have dropped data", "segment", segmentNum, "err", err) - } else if r.Offset() != size { - w.logger.Warn("Expected to have read whole segment, may have dropped data", "segment", segmentNum, "read", r.Offset(), "size", size) - } - return ErrIgnorable - } - - // Otherwise, when we are tailing, non-EOFs are fatal. - if err != nil && !errors.Is(err, io.EOF) { - return err - } - return nil -} - // Use tail true to indicate that the reader is currently on a segment that is // actively being written to. If false, assume it's a full segment and we're // replaying it on start to cache the series records. @@ -364,15 +348,18 @@ func (w *Watcher) watch(segmentNum int, tail bool) error { reader := NewLiveReader(w.logger, w.readerMetrics, segment) - size := int64(math.MaxInt64) if !tail { - var err error - size, err = getSegmentSize(w.walDir, segmentNum) + size, err := getSegmentSize(w.walDir, segmentNum) if err != nil { return fmt.Errorf("getSegmentSize: %w", err) } - - return w.readAndHandleError(reader, segmentNum, tail, size) + // Ignore all errors reading to end of segment whilst replaying the WAL. 
+ if err := w.readSegmentSeries(reader, segmentNum); err != nil && !errors.Is(err, io.EOF) { + w.logger.Warn("Ignoring error reading to end of segment, may have dropped data", "segment", segmentNum, "err", err) + } else if reader.Offset() != size { + w.logger.Warn("Expected to have read whole segment, may have dropped data", "segment", segmentNum, "read", reader.Offset(), "size", size) + } + return ErrIgnorable } checkpointTicker := time.NewTicker(checkpointPeriod) @@ -418,23 +405,21 @@ func (w *Watcher) watch(segmentNum int, tail bool) error { } if last > segmentNum { - return w.readAndHandleError(reader, segmentNum, tail, size) + return w.readSegment(reader, segmentNum) } continue // we haven't read due to a notification in quite some time, try reading anyways case <-readTicker.C: w.logger.Debug("Watcher is reading the WAL due to timeout, haven't received any write notifications recently", "timeout", readTimeout) - err := w.readAndHandleError(reader, segmentNum, tail, size) - if err != nil { + if err := w.readSegment(reader, segmentNum); err != nil { return err } // reset the ticker so we don't read too often readTicker.Reset(readTimeout) case <-w.readNotify: - err := w.readAndHandleError(reader, segmentNum, tail, size) - if err != nil { + if err := w.readSegment(reader, segmentNum); err != nil { return err } // reset the ticker so we don't read too often @@ -475,20 +460,11 @@ func (w *Watcher) garbageCollectSeries(segmentNum int) error { return nil } -// Read from a segment and pass the details to w.writer. -// Also used with readCheckpoint - implements segmentReadFn. -func (w *Watcher) readSegment(r *LiveReader, segmentNum int, tail bool) error { +// readSegmentSeries reads the series records into w.writer from a segment. +func (w *Watcher) readSegmentSeries(r *LiveReader, segmentNum int) error { var ( - dec = record.NewDecoder(labels.NewSymbolTable()) // One table per WAL segment means it won't grow indefinitely. - series []record.RefSeries - samples []record.RefSample - samplesToSend []record.RefSample - exemplars []record.RefExemplar - histograms []record.RefHistogramSample - histogramsToSend []record.RefHistogramSample - floatHistograms []record.RefFloatHistogramSample - floatHistogramsToSend []record.RefFloatHistogramSample - metadata []record.RefMetadata + dec = record.NewDecoder(labels.NewSymbolTable()) // One table per WAL segment means it won't grow indefinitely. + series []record.RefSeries ) for r.Next() && !isClosed(w.quit) { rec := r.Record() @@ -502,14 +478,57 @@ func (w *Watcher) readSegment(r *LiveReader, segmentNum int, tail bool) error { return err } w.writer.StoreSeries(series, segmentNum) + case record.Unknown: + // Could be corruption, or reading from a WAL from a newer Prometheus. + w.recordDecodeFailsMetric.Inc() + default: + // We're not interested in other types of records. + } + } + if err := r.Err(); err != nil && !errors.Is(err, io.EOF) { + return fmt.Errorf("segment %d: %w", segmentNum, err) + } + return nil +} + +// readSegment reads all known records into w.writer from a segment. +func (w *Watcher) readSegment(r *LiveReader, segmentNum int) (err error) { + var ( + dec = record.NewDecoder(labels.NewSymbolTable()) // One table per WAL segment means it won't grow indefinitely. + series = w.readSeriesPool.Get()[:0] + samples = w.readSamplesPool.Get()[:0] + samplesToSend = w.readSamplesPool.Get()[:0] + // TODO(bwplotka): Add similar pools to exemplars, histograms and meta. 
+ exemplars []record.RefExemplar + histograms []record.RefHistogramSample + histogramsToSend []record.RefHistogramSample + floatHistograms []record.RefFloatHistogramSample + floatHistogramsToSend []record.RefFloatHistogramSample + metadata []record.RefMetadata + ) + + defer func() { + // NOTE(bwplotka): This assumes no-one uses those arrays after calling WriteTo methods. + w.readSeriesPool.Put(series) + w.readSamplesPool.Put(samples) + w.readSamplesPool.Put(samplesToSend) + }() + + for r.Next() && !isClosed(w.quit) { + rec := r.Record() + w.recordsReadMetric.WithLabelValues(dec.Type(rec).String()).Inc() + + switch dec.Type(rec) { + case record.Series: + series, err = dec.Series(rec, series[:0]) + if err != nil { + w.recordDecodeFailsMetric.Inc() + return err + } + w.writer.StoreSeries(series, segmentNum) case record.Samples, record.SamplesWithCT: - // If we're not tailing a segment we can ignore any samples records we see. - // This speeds up replay of the WAL by > 10x. - if !tail { - break - } - samples, err := dec.Samples(rec, samples[:0]) + samples, err = dec.Samples(rec, samples[:0]) if err != nil { w.recordDecodeFailsMetric.Inc() return err @@ -534,12 +553,7 @@ func (w *Watcher) readSegment(r *LiveReader, segmentNum int, tail bool) error { if !w.sendExemplars { break } - // If we're not tailing a segment we can ignore any exemplars records we see. - // This speeds up replay of the WAL significantly. - if !tail { - break - } - exemplars, err := dec.Exemplars(rec, exemplars[:0]) + exemplars, err = dec.Exemplars(rec, exemplars[:0]) if err != nil { w.recordDecodeFailsMetric.Inc() return err @@ -551,10 +565,7 @@ func (w *Watcher) readSegment(r *LiveReader, segmentNum int, tail bool) error { if !w.sendHistograms { break } - if !tail { - break - } - histograms, err := dec.HistogramSamples(rec, histograms[:0]) + histograms, err = dec.HistogramSamples(rec, histograms[:0]) if err != nil { w.recordDecodeFailsMetric.Inc() return err @@ -579,10 +590,7 @@ func (w *Watcher) readSegment(r *LiveReader, segmentNum int, tail bool) error { if !w.sendHistograms { break } - if !tail { - break - } - floatHistograms, err := dec.FloatHistogramSamples(rec, floatHistograms[:0]) + floatHistograms, err = dec.FloatHistogramSamples(rec, floatHistograms[:0]) if err != nil { w.recordDecodeFailsMetric.Inc() return err @@ -621,15 +629,14 @@ func (w *Watcher) readSegment(r *LiveReader, segmentNum int, tail bool) error { // We're not interested in other types of records. } } - if err := r.Err(); err != nil { + if err := r.Err(); err != nil && !errors.Is(err, io.EOF) { return fmt.Errorf("segment %d: %w", segmentNum, err) } return nil } -// Go through all series in a segment updating the segmentNum, so we can delete older series. -// Used with readCheckpoint - implements segmentReadFn. -func (w *Watcher) readSegmentForGC(r *LiveReader, segmentNum int, _ bool) error { +// readSegmentForGC goes through all series in a segment updating the segmentNum, so we can delete older series. +func (w *Watcher) readSegmentForGC(r *LiveReader, segmentNum int) error { var ( dec = record.NewDecoder(labels.NewSymbolTable()) // Needed for decoding; labels do not outlive this function. series []record.RefSeries @@ -655,7 +662,7 @@ func (w *Watcher) readSegmentForGC(r *LiveReader, segmentNum int, _ bool) error // We're only interested in series. 
} } - if err := r.Err(); err != nil { + if err := r.Err(); err != nil && !errors.Is(err, io.EOF) { return fmt.Errorf("segment %d: %w", segmentNum, err) } return nil @@ -666,7 +673,7 @@ func (w *Watcher) SetStartTime(t time.Time) { w.startTimestamp = timestamp.FromTime(t) } -type segmentReadFn func(w *Watcher, r *LiveReader, segmentNum int, tail bool) error +type segmentReadFn func(w *Watcher, r *LiveReader, segmentNum int) error // Read all the series records from a Checkpoint directory. func (w *Watcher) readCheckpoint(checkpointDir string, readFn segmentReadFn) error { @@ -693,7 +700,7 @@ func (w *Watcher) readCheckpoint(checkpointDir string, readFn segmentReadFn) err } r := NewLiveReader(w.logger, w.readerMetrics, sr) - err = readFn(w, r, index, false) + err = readFn(w, r, index) sr.Close() if err != nil && !errors.Is(err, io.EOF) { return fmt.Errorf("readSegment: %w", err) diff --git a/tsdb/wlog/watcher_test.go b/tsdb/wlog/watcher_test.go index 786912704e..24aafe63a8 100644 --- a/tsdb/wlog/watcher_test.go +++ b/tsdb/wlog/watcher_test.go @@ -14,6 +14,8 @@ package wlog import ( "fmt" + "log/slog" + "math" "math/rand" "os" "path" @@ -27,9 +29,12 @@ import ( "github.com/stretchr/testify/require" "golang.org/x/sync/errgroup" + "github.com/prometheus/prometheus/util/testrecord" + "github.com/prometheus/prometheus/model/histogram" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/tsdb/chunks" + "github.com/prometheus/prometheus/tsdb/compression" "github.com/prometheus/prometheus/tsdb/record" ) @@ -123,7 +128,7 @@ func (wtm *writeToMock) SeriesReset(index int) { } } -func (wtm *writeToMock) checkNumSeries() int { +func (wtm *writeToMock) seriesStored() int { wtm.seriesLock.Lock() defer wtm.seriesLock.Unlock() return len(wtm.seriesSegmentIndexes) @@ -142,7 +147,7 @@ func TestTailSamples(t *testing.T) { const samplesCount = 250 const exemplarsCount = 25 const histogramsCount = 50 - for _, compress := range []CompressionType{CompressionNone, CompressionSnappy, CompressionZstd} { + for _, compress := range []compression.Type{compression.None, compression.Snappy, compression.Zstd} { t.Run(fmt.Sprintf("compress=%s", compress), func(t *testing.T) { now := time.Now() @@ -264,8 +269,7 @@ func TestTailSamples(t *testing.T) { require.NoError(t, err) reader := NewLiveReader(nil, NewLiveReaderMetrics(nil), segment) - // Use tail true so we can ensure we got the right number of samples. 
- watcher.readSegment(reader, i, true) + require.NoError(t, watcher.readSegment(reader, i)) require.NoError(t, segment.Close()) } @@ -274,9 +278,9 @@ func TestTailSamples(t *testing.T) { expectedExemplars := seriesCount * exemplarsCount expectedHistograms := seriesCount * histogramsCount * 2 retry(t, defaultRetryInterval, defaultRetries, func() bool { - return wt.checkNumSeries() >= expectedSeries + return wt.seriesStored() >= expectedSeries }) - require.Equal(t, expectedSeries, wt.checkNumSeries(), "did not receive the expected number of series") + require.Equal(t, expectedSeries, wt.seriesStored(), "did not receive the expected number of series") require.Equal(t, expectedSamples, wt.samplesAppended, "did not receive the expected number of samples") require.Equal(t, expectedExemplars, wt.exemplarsAppended, "did not receive the expected number of exemplars") require.Equal(t, expectedHistograms, wt.histogramsAppended, "did not receive the expected number of histograms") @@ -290,7 +294,7 @@ func TestReadToEndNoCheckpoint(t *testing.T) { const seriesCount = 10 const samplesCount = 250 - for _, compress := range []CompressionType{CompressionNone, CompressionSnappy, CompressionZstd} { + for _, compress := range []compression.Type{compression.None, compression.Snappy, compression.Zstd} { t.Run(fmt.Sprintf("compress=%s", compress), func(t *testing.T) { dir := t.TempDir() wdir := path.Join(dir, "wal") @@ -344,7 +348,7 @@ func TestReadToEndNoCheckpoint(t *testing.T) { expected := seriesCount require.Eventually(t, func() bool { - return wt.checkNumSeries() == expected + return wt.seriesStored() == expected }, 20*time.Second, 1*time.Second) watcher.Stop() }) @@ -358,7 +362,7 @@ func TestReadToEndWithCheckpoint(t *testing.T) { const seriesCount = 10 const samplesCount = 250 - for _, compress := range []CompressionType{CompressionNone, CompressionSnappy, CompressionZstd} { + for _, compress := range []compression.Type{compression.None, compression.Snappy, compression.Zstd} { t.Run(fmt.Sprintf("compress=%s", compress), func(t *testing.T) { dir := t.TempDir() @@ -434,7 +438,7 @@ func TestReadToEndWithCheckpoint(t *testing.T) { expected := seriesCount * 2 require.Eventually(t, func() bool { - return wt.checkNumSeries() == expected + return wt.seriesStored() == expected }, 10*time.Second, 1*time.Second) watcher.Stop() }) @@ -446,7 +450,7 @@ func TestReadCheckpoint(t *testing.T) { const seriesCount = 10 const samplesCount = 250 - for _, compress := range []CompressionType{CompressionNone, CompressionSnappy, CompressionZstd} { + for _, compress := range []compression.Type{compression.None, compression.Snappy, compression.Zstd} { t.Run(fmt.Sprintf("compress=%s", compress), func(t *testing.T) { dir := t.TempDir() @@ -504,10 +508,10 @@ func TestReadCheckpoint(t *testing.T) { expectedSeries := seriesCount retry(t, defaultRetryInterval, defaultRetries, func() bool { - return wt.checkNumSeries() >= expectedSeries + return wt.seriesStored() >= expectedSeries }) watcher.Stop() - require.Equal(t, expectedSeries, wt.checkNumSeries()) + require.Equal(t, expectedSeries, wt.seriesStored()) }) } } @@ -519,7 +523,7 @@ func TestReadCheckpointMultipleSegments(t *testing.T) { const seriesCount = 20 const samplesCount = 300 - for _, compress := range []CompressionType{CompressionNone, CompressionSnappy, CompressionZstd} { + for _, compress := range []compression.Type{compression.None, compression.Snappy, compression.Zstd} { t.Run(fmt.Sprintf("compress=%s", compress), func(t *testing.T) { dir := t.TempDir() @@ -590,11 +594,11 
@@ func TestCheckpointSeriesReset(t *testing.T) { const seriesCount = 20 const samplesCount = 350 testCases := []struct { - compress CompressionType + compress compression.Type segments int }{ - {compress: CompressionNone, segments: 14}, - {compress: CompressionSnappy, segments: 13}, + {compress: compression.None, segments: 14}, + {compress: compression.Snappy, segments: 13}, } for _, tc := range testCases { @@ -647,10 +651,10 @@ func TestCheckpointSeriesReset(t *testing.T) { expected := seriesCount retry(t, defaultRetryInterval, defaultRetries, func() bool { - return wt.checkNumSeries() >= expected + return wt.seriesStored() >= expected }) require.Eventually(t, func() bool { - return wt.checkNumSeries() == seriesCount + return wt.seriesStored() == seriesCount }, 10*time.Second, 1*time.Second) _, err = Checkpoint(promslog.NewNopLogger(), w, 2, 4, func(_ chunks.HeadSeriesRef) bool { return true }, 0) @@ -669,7 +673,7 @@ func TestCheckpointSeriesReset(t *testing.T) { // many series records you end up with and change the last Equals check accordingly // or modify the Equals to Assert(len(wt.seriesLabels) < seriesCount*10) require.Eventually(t, func() bool { - return wt.checkNumSeries() == tc.segments + return wt.seriesStored() == tc.segments }, 20*time.Second, 1*time.Second) }) } @@ -681,7 +685,7 @@ func TestRun_StartupTime(t *testing.T) { const seriesCount = 20 const samplesCount = 300 - for _, compress := range []CompressionType{CompressionNone, CompressionSnappy, CompressionZstd} { + for _, compress := range []compression.Type{compression.None, compression.Snappy, compression.Zstd} { t.Run(string(compress), func(t *testing.T) { dir := t.TempDir() @@ -774,7 +778,7 @@ func TestRun_AvoidNotifyWhenBehind(t *testing.T) { const seriesCount = 10 const samplesCount = 50 - for _, compress := range []CompressionType{CompressionNone, CompressionSnappy, CompressionZstd} { + for _, compress := range []compression.Type{compression.None, compression.Snappy, compression.Zstd} { t.Run(string(compress), func(t *testing.T) { dir := t.TempDir() @@ -812,7 +816,7 @@ func TestRun_AvoidNotifyWhenBehind(t *testing.T) { // The watcher went through 00000000 and is tailing the next one. retry(t, defaultRetryInterval, defaultRetries, func() bool { - return wt.checkNumSeries() == seriesCount + return wt.seriesStored() == seriesCount }) // In the meantime, add some new segments in bulk. @@ -826,9 +830,120 @@ func TestRun_AvoidNotifyWhenBehind(t *testing.T) { require.NoError(t, g.Wait()) // All series and samples were read. - require.Equal(t, (segmentsToRead+1)*seriesCount, wt.checkNumSeries()) // Series from 00000000 are also read. + require.Equal(t, (segmentsToRead+1)*seriesCount, wt.seriesStored()) // Series from 00000000 are also read. require.Equal(t, segmentsToRead*seriesCount*samplesCount, wt.samplesAppended) require.NoError(t, w.Close()) }) } } + +const ( + seriesRecords = 100 // Targets * Scrapes + seriesPerRecord = 10 // New series per scrape. + sampleRecords = seriesRecords // Targets * Scrapes +) + +var ( + compressions = []compression.Type{compression.None, compression.Snappy, compression.Zstd} + dataCases = []testrecord.RefSamplesCase{ + testrecord.Realistic1000Samples, + testrecord.Realistic1000WithCTSamples, + testrecord.WorstCase1000Samples, + } +) + +/* + export bench=watcher-read-v2 && go test ./tsdb/wlog/... 
\ + -run '^$' -bench '^BenchmarkWatcherReadSegment' \ + -benchtime 5s -count 6 -cpu 2 -timeout 999m \ + | tee ${bench}.txt +*/ +func BenchmarkWatcherReadSegment(b *testing.B) { + for _, compress := range compressions { + for _, data := range dataCases { + b.Run(fmt.Sprintf("compr=%v/data=%v", compress, data), func(b *testing.B) { + dir := b.TempDir() + wdir := path.Join(dir, "wal") + require.NoError(b, os.Mkdir(wdir, 0o777)) + + generateRealisticSegment(b, wdir, compress, data) + logger := promslog.NewNopLogger() + + b.Run("func=readSegmentSeries", func(b *testing.B) { + wt := newWriteToMock(0) + watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false, false, false) + // Required as WorstCase1000Samples have math.MinInt32 timestamps. + watcher.startTimestamp = math.MinInt32 - 1 + watcher.SetMetrics() + + // Validate our test data first. + testReadFn(b, wdir, 0, logger, watcher, (*Watcher).readSegmentSeries) + require.Equal(b, seriesRecords*seriesPerRecord, wt.seriesStored()) + require.Equal(b, 0, wt.samplesAppended) + + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + testReadFn(b, wdir, 0, logger, watcher, (*Watcher).readSegmentSeries) + } + }) + b.Run("func=readSegment", func(b *testing.B) { + wt := newWriteToMock(0) + watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false, false, false) + // Required as WorstCase1000Samples have math.MinInt32 timestamps. + watcher.startTimestamp = math.MinInt32 - 1 + watcher.SetMetrics() + + // Validate our test data first. + testReadFn(b, wdir, 0, logger, watcher, (*Watcher).readSegment) + require.Equal(b, seriesRecords*seriesPerRecord, wt.seriesStored()) + require.Equal(b, sampleRecords*1000, wt.samplesAppended) + + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + testReadFn(b, wdir, 0, logger, watcher, (*Watcher).readSegment) + } + }) + }) + } + } +} + +func generateRealisticSegment(tb testing.TB, wdir string, compress compression.Type, samplesCase testrecord.RefSamplesCase) { + tb.Helper() + + enc := record.Encoder{} + w, err := NewSize(nil, nil, wdir, 128*pageSize, compress) + require.NoError(tb, err) + defer w.Close() + + // Generate fake data for WAL. + for i := range seriesRecords { + series := make([]record.RefSeries, seriesPerRecord) + for j := range seriesPerRecord { + series[j] = record.RefSeries{ + Ref: chunks.HeadSeriesRef(i*seriesPerRecord + j), + Labels: labels.FromStrings("__name__", fmt.Sprintf("metric_%d", 0), "foo", "bar", "foo1", "bar2", "sdfsasdgfadsfgaegga", "dgsfzdsf§sfawf2"), + } + } + rec := enc.Series(series, nil) + require.NoError(tb, w.Log(rec)) + } + for i := 0; i < sampleRecords; i++ { + rec := enc.Samples(testrecord.GenTestRefSamplesCase(tb, samplesCase), nil) + require.NoError(tb, w.Log(rec)) + } + // Build segment. 
+ require.NoError(tb, w.flushPage(true)) +} + +func testReadFn(tb testing.TB, wdir string, segNum int, logger *slog.Logger, watcher *Watcher, fn segmentReadFn) { + tb.Helper() + + segment, err := OpenReadSegment(SegmentName(wdir, segNum)) + require.NoError(tb, err) + + r := NewLiveReader(logger, watcher.readerMetrics, segment) + require.NoError(tb, fn(watcher, r, segNum)) +} diff --git a/tsdb/wlog/wlog.go b/tsdb/wlog/wlog.go index 54c257d61a..7148611bdb 100644 --- a/tsdb/wlog/wlog.go +++ b/tsdb/wlog/wlog.go @@ -29,11 +29,11 @@ import ( "sync" "time" - "github.com/golang/snappy" - "github.com/klauspost/compress/zstd" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/promslog" + "github.com/prometheus/prometheus/tsdb/compression" + "github.com/prometheus/prometheus/tsdb/fileutil" ) @@ -169,24 +169,16 @@ func OpenReadSegment(fn string) (*Segment, error) { return &Segment{SegmentFile: f, i: k, dir: filepath.Dir(fn)}, nil } -type CompressionType string - -const ( - CompressionNone CompressionType = "none" - CompressionSnappy CompressionType = "snappy" - CompressionZstd CompressionType = "zstd" -) - // ParseCompressionType parses the two compression-related configuration values and returns the CompressionType. If // compression is enabled but the compressType is unrecognized, we default to Snappy compression. -func ParseCompressionType(compress bool, compressType string) CompressionType { +func ParseCompressionType(compress bool, compressType string) compression.Type { if compress { if compressType == "zstd" { - return CompressionZstd + return compression.Zstd } - return CompressionSnappy + return compression.Snappy } - return CompressionNone + return compression.None } // WL is a write log that stores records in segment files. @@ -210,9 +202,9 @@ type WL struct { stopc chan chan struct{} actorc chan func() closed bool // To allow calling Close() more than once without blocking. - compress CompressionType + compress compression.Type + cEnc *compression.Encoder compressBuf []byte - zstdWriter *zstd.Encoder WriteNotified WriteNotified @@ -309,13 +301,13 @@ func newWLMetrics(w *WL, r prometheus.Registerer) *wlMetrics { } // New returns a new WAL over the given directory. -func New(logger *slog.Logger, reg prometheus.Registerer, dir string, compress CompressionType) (*WL, error) { +func New(logger *slog.Logger, reg prometheus.Registerer, dir string, compress compression.Type) (*WL, error) { return NewSize(logger, reg, dir, DefaultSegmentSize, compress) } // NewSize returns a new write log over the given directory. // New segments are created with the specified size. 
-func NewSize(logger *slog.Logger, reg prometheus.Registerer, dir string, segmentSize int, compress CompressionType) (*WL, error) { +func NewSize(logger *slog.Logger, reg prometheus.Registerer, dir string, segmentSize int, compress compression.Type) (*WL, error) { if segmentSize%pageSize != 0 { return nil, errors.New("invalid segment size") } @@ -326,13 +318,9 @@ func NewSize(logger *slog.Logger, reg prometheus.Registerer, dir string, segment logger = promslog.NewNopLogger() } - var zstdWriter *zstd.Encoder - if compress == CompressionZstd { - var err error - zstdWriter, err = zstd.NewWriter(nil) - if err != nil { - return nil, err - } + cEnc, err := compression.NewEncoder() + if err != nil { + return nil, err } w := &WL{ @@ -343,7 +331,7 @@ func NewSize(logger *slog.Logger, reg prometheus.Registerer, dir string, segment actorc: make(chan func(), 100), stopc: make(chan chan struct{}), compress: compress, - zstdWriter: zstdWriter, + cEnc: cEnc, } prefix := "prometheus_tsdb_wal_" if filepath.Base(dir) == WblDirName { @@ -382,22 +370,16 @@ func Open(logger *slog.Logger, dir string) (*WL, error) { if logger == nil { logger = promslog.NewNopLogger() } - zstdWriter, err := zstd.NewWriter(nil) - if err != nil { - return nil, err - } w := &WL{ - dir: dir, - logger: logger, - zstdWriter: zstdWriter, + dir: dir, + logger: logger, } - return w, nil } // CompressionType returns if compression is enabled on this WAL. -func (w *WL) CompressionType() CompressionType { +func (w *WL) CompressionType() compression.Type { return w.compress } @@ -705,7 +687,7 @@ func (w *WL) Log(recs ...[]byte) error { // - the final record of a batch // - the record is bigger than the page size // - the current page is full. -func (w *WL) log(rec []byte, final bool) error { +func (w *WL) log(rec []byte, final bool) (err error) { // When the last page flush failed the page will remain full. // When the page is full, need to flush it before trying to add more records to it. if w.page.full() { @@ -716,25 +698,12 @@ func (w *WL) log(rec []byte, final bool) error { // Compress the record before calculating if a new segment is needed. compressed := false - if w.compress == CompressionSnappy && len(rec) > 0 { - // If MaxEncodedLen is less than 0 the record is too large to be compressed. - if len(rec) > 0 && snappy.MaxEncodedLen(len(rec)) >= 0 { - // The snappy library uses `len` to calculate if we need a new buffer. - // In order to allocate as few buffers as possible make the length - // equal to the capacity. 
- w.compressBuf = w.compressBuf[:cap(w.compressBuf)] - w.compressBuf = snappy.Encode(w.compressBuf, rec) - if len(w.compressBuf) < len(rec) { - rec = w.compressBuf - compressed = true - } - } - } else if w.compress == CompressionZstd && len(rec) > 0 { - w.compressBuf = w.zstdWriter.EncodeAll(rec, w.compressBuf[:0]) - if len(w.compressBuf) < len(rec) { - rec = w.compressBuf - compressed = true - } + w.compressBuf, compressed, err = w.cEnc.Encode(w.compress, rec, w.compressBuf) + if err != nil { + return err + } + if compressed { + rec = w.compressBuf } // If the record is too big to fit within the active page in the current @@ -773,9 +742,9 @@ func (w *WL) log(rec []byte, final bool) error { typ = recMiddle } if compressed { - if w.compress == CompressionSnappy { + if w.compress == compression.Snappy { typ |= snappyMask - } else if w.compress == CompressionZstd { + } else if w.compress == compression.Zstd { typ |= zstdMask } } diff --git a/tsdb/wlog/wlog_test.go b/tsdb/wlog/wlog_test.go index d195aaee2f..b57193ed64 100644 --- a/tsdb/wlog/wlog_test.go +++ b/tsdb/wlog/wlog_test.go @@ -29,6 +29,8 @@ import ( "github.com/stretchr/testify/require" "go.uber.org/goleak" + "github.com/prometheus/prometheus/tsdb/compression" + "github.com/prometheus/prometheus/tsdb/fileutil" ) @@ -125,7 +127,7 @@ func TestWALRepair_ReadingError(t *testing.T) { // then corrupt a given record in a given segment. // As a result we want a repaired WAL with given intact records. segSize := 3 * pageSize - w, err := NewSize(nil, nil, dir, segSize, CompressionNone) + w, err := NewSize(nil, nil, dir, segSize, compression.None) require.NoError(t, err) var records [][]byte @@ -150,7 +152,7 @@ func TestWALRepair_ReadingError(t *testing.T) { require.NoError(t, f.Close()) - w, err = NewSize(nil, nil, dir, segSize, CompressionNone) + w, err = NewSize(nil, nil, dir, segSize, compression.None) require.NoError(t, err) defer w.Close() @@ -222,7 +224,7 @@ func TestCorruptAndCarryOn(t *testing.T) { // Produce a WAL with a two segments of 3 pages with 3 records each, // so when we truncate the file we're guaranteed to split a record. { - w, err := NewSize(logger, nil, dir, segmentSize, CompressionNone) + w, err := NewSize(logger, nil, dir, segmentSize, compression.None) require.NoError(t, err) for i := 0; i < 18; i++ { @@ -293,7 +295,7 @@ func TestCorruptAndCarryOn(t *testing.T) { err = sr.Close() require.NoError(t, err) - w, err := NewSize(logger, nil, dir, segmentSize, CompressionNone) + w, err := NewSize(logger, nil, dir, segmentSize, compression.None) require.NoError(t, err) err = w.Repair(corruptionErr) @@ -336,7 +338,7 @@ func TestCorruptAndCarryOn(t *testing.T) { // TestClose ensures that calling Close more than once doesn't panic and doesn't block. 
func TestClose(t *testing.T) { dir := t.TempDir() - w, err := NewSize(nil, nil, dir, pageSize, CompressionNone) + w, err := NewSize(nil, nil, dir, pageSize, compression.None) require.NoError(t, err) require.NoError(t, w.Close()) require.Error(t, w.Close()) @@ -349,7 +351,7 @@ func TestSegmentMetric(t *testing.T) { ) dir := t.TempDir() - w, err := NewSize(nil, nil, dir, segmentSize, CompressionNone) + w, err := NewSize(nil, nil, dir, segmentSize, compression.None) require.NoError(t, err) initialSegment := client_testutil.ToFloat64(w.metrics.currentSegment) @@ -368,7 +370,7 @@ func TestSegmentMetric(t *testing.T) { } func TestCompression(t *testing.T) { - bootstrap := func(compressed CompressionType) string { + bootstrap := func(compressed compression.Type) string { const ( segmentSize = pageSize recordSize = (pageSize / 2) - recordHeaderSize @@ -396,10 +398,10 @@ func TestCompression(t *testing.T) { } }() - dirUnCompressed := bootstrap(CompressionNone) + dirUnCompressed := bootstrap(compression.None) tmpDirs = append(tmpDirs, dirUnCompressed) - for _, compressionType := range []CompressionType{CompressionSnappy, CompressionZstd} { + for _, compressionType := range []compression.Type{compression.Snappy, compression.Zstd} { dirCompressed := bootstrap(compressionType) tmpDirs = append(tmpDirs, dirCompressed) @@ -443,7 +445,7 @@ func TestLogPartialWrite(t *testing.T) { t.Run(testName, func(t *testing.T) { dirPath := t.TempDir() - w, err := NewSize(nil, nil, dirPath, segmentSize, CompressionNone) + w, err := NewSize(nil, nil, dirPath, segmentSize, compression.None) require.NoError(t, err) // Replace the underlying segment file with a mocked one that injects a failure. @@ -510,7 +512,7 @@ func (f *faultySegmentFile) Write(p []byte) (int, error) { } func BenchmarkWAL_LogBatched(b *testing.B) { - for _, compress := range []CompressionType{CompressionNone, CompressionSnappy, CompressionZstd} { + for _, compress := range []compression.Type{compression.None, compression.Snappy, compression.Zstd} { b.Run(fmt.Sprintf("compress=%s", compress), func(b *testing.B) { dir := b.TempDir() @@ -540,7 +542,7 @@ func BenchmarkWAL_LogBatched(b *testing.B) { } func BenchmarkWAL_Log(b *testing.B) { - for _, compress := range []CompressionType{CompressionNone, CompressionSnappy, CompressionZstd} { + for _, compress := range []compression.Type{compression.None, compression.Snappy, compression.Zstd} { b.Run(fmt.Sprintf("compress=%s", compress), func(b *testing.B) { dir := b.TempDir() @@ -567,7 +569,7 @@ func TestUnregisterMetrics(t *testing.T) { reg := prometheus.NewRegistry() for i := 0; i < 2; i++ { - wl, err := New(promslog.NewNopLogger(), reg, t.TempDir(), CompressionNone) + wl, err := New(promslog.NewNopLogger(), reg, t.TempDir(), compression.None) require.NoError(t, err) require.NoError(t, wl.Close()) } diff --git a/util/testrecord/record.go b/util/testrecord/record.go new file mode 100644 index 0000000000..c035cd3fd9 --- /dev/null +++ b/util/testrecord/record.go @@ -0,0 +1,80 @@ +// Copyright 2025 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and +// limitations under the License. + +package testrecord + +import ( + "math" + "testing" + + "github.com/prometheus/prometheus/tsdb/chunks" + "github.com/prometheus/prometheus/tsdb/record" +) + +type RefSamplesCase string + +const ( + Realistic1000Samples RefSamplesCase = "real1000" + Realistic1000WithCTSamples RefSamplesCase = "real1000-ct" + WorstCase1000Samples RefSamplesCase = "worst1000" +) + +func GenTestRefSamplesCase(t testing.TB, c RefSamplesCase) []record.RefSample { + t.Helper() + + ret := make([]record.RefSample, 1e3) + switch c { + case Realistic1000Samples: + for i := range ret { + ret[i].Ref = chunks.HeadSeriesRef(i) + ret[i].T = 12423423 + ret[i].V = highVarianceFloat(i) + } + case Realistic1000WithCTSamples: + for i := range ret { + ret[i].Ref = chunks.HeadSeriesRef(i) + // For cumulative metrics or gauges, samples in one record from a + // scrape typically have exactly the same CT and T values. + ret[i].CT = 11234567 + ret[i].T = 12423423 + ret[i].V = highVarianceFloat(i) + } + case WorstCase1000Samples: + for i := range ret { + ret[i].Ref = chunks.HeadSeriesRef(i) + + // The worst case is when the values differ significantly + // from each other, which breaks delta encoding. + ret[i].CT = highVarianceInt(i) + ret[i].T = highVarianceInt(i) + ret[i].V = highVarianceFloat(i) + } + default: + t.Fatal("unknown case", c) + } + return ret +} + +func highVarianceInt(i int) int64 { + if i%2 == 0 { + return math.MinInt32 + } + return math.MaxInt32 +} + +func highVarianceFloat(i int) float64 { + if i%2 == 0 { + return math.SmallestNonzeroFloat32 + } + return math.MaxFloat32 +}
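A minimal round-trip sketch of the new tsdb/compression API as it is exercised in this diff (TestEncodeDecode, the encode/decode benchmarks, and the wlog.go write path): Encode takes the compression type, the source record, and a reusable buffer, and indicates whether compression was actually applied; Decode reverses it for the type the bytes were stored as. The test name and payload below are made up for illustration, assuming only the package introduced in this change; the sketch is not part of the diff itself.

package compression_test

import (
	"bytes"
	"testing"

	"github.com/stretchr/testify/require"

	"github.com/prometheus/prometheus/tsdb/compression"
)

// TestCompressionRoundTripSketch is an illustrative sketch (hypothetical test,
// made-up payload), not a test added by this change.
func TestCompressionRoundTripSketch(t *testing.T) {
	// Highly compressible stand-in for an encoded WAL record.
	payload := bytes.Repeat([]byte("some WAL record bytes "), 64)

	enc, err := compression.NewEncoder()
	require.NoError(t, err)
	dec := compression.NewDecoder()

	for _, typ := range []compression.Type{compression.None, compression.Snappy, compression.Zstd} {
		// Encode reports whether the payload was actually compressed; mirror
		// the WAL write path and keep the raw payload when it was not.
		out, compressed, err := enc.Encode(typ, payload, nil)
		require.NoError(t, err)

		stored, storedAs := payload, compression.None
		if compressed {
			stored, storedAs = out, typ
		}

		// Decode with the type the bytes were stored as, like the WAL reader
		// does after inspecting the record header flags.
		got, err := dec.Decode(storedAs, stored, nil)
		require.NoError(t, err)
		require.Equal(t, payload, got)
	}
}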
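The record.go hunk moves CT ahead of T both in the RefSample struct and on the wire, keeping encoder and decoder symmetric, and GenTestRefSamplesCase above is built around the consequence: samples produced by one scrape usually carry identical T and CT values, so the per-sample varint deltas against the first sample collapse to single zero bytes, while the worst case defeats the delta encoding. Below is a standalone sketch of that body layout using stdlib varints; the helper name and package are hypothetical, and the real encoder uses the internal encoding package and writes additional framing (such as the record type) that is omitted here.

// Package recordsketch is a hypothetical home for this illustration only.
package recordsketch

import (
	"encoding/binary"
	"math"

	"github.com/prometheus/prometheus/tsdb/record"
)

// encodeSamplesWithCTSketch mirrors the layout of samplesWithCT shown above:
// the first sample's Ref, CT and T as big-endian base values, then, for every
// sample, the signed-varint deltas against that base (ref, CT, T, in that
// order) followed by the raw float bits of the value.
func encodeSamplesWithCTSketch(samples []record.RefSample) []byte {
	if len(samples) == 0 {
		return nil
	}
	first := samples[0]
	buf := binary.BigEndian.AppendUint64(nil, uint64(first.Ref))
	buf = binary.BigEndian.AppendUint64(buf, uint64(first.CT))
	buf = binary.BigEndian.AppendUint64(buf, uint64(first.T))
	for _, s := range samples {
		buf = binary.AppendVarint(buf, int64(s.Ref)-int64(first.Ref))
		buf = binary.AppendVarint(buf, s.CT-first.CT)
		buf = binary.AppendVarint(buf, s.T-first.T)
		buf = binary.BigEndian.AppendUint64(buf, math.Float64bits(s.V))
	}
	return buf
}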