Optimize Decode/Encode and WAL watching.
Some checks failed
CI / Go tests (push) Has been cancelled
CI / More Go tests (push) Has been cancelled
CI / Go tests with previous Go version (push) Has been cancelled
CI / UI tests (push) Has been cancelled
CI / Go tests on Windows (push) Has been cancelled
CI / Mixins tests (push) Has been cancelled
CI / Build Prometheus for common architectures (0) (push) Has been cancelled
CI / Build Prometheus for common architectures (1) (push) Has been cancelled
CI / Build Prometheus for common architectures (2) (push) Has been cancelled
CI / Build Prometheus for all architectures (0) (push) Has been cancelled
CI / Build Prometheus for all architectures (1) (push) Has been cancelled
CI / Build Prometheus for all architectures (10) (push) Has been cancelled
CI / Build Prometheus for all architectures (11) (push) Has been cancelled
CI / Build Prometheus for all architectures (2) (push) Has been cancelled
CI / Build Prometheus for all architectures (3) (push) Has been cancelled
CI / Build Prometheus for all architectures (4) (push) Has been cancelled
CI / Build Prometheus for all architectures (5) (push) Has been cancelled
CI / Build Prometheus for all architectures (6) (push) Has been cancelled
CI / Build Prometheus for all architectures (7) (push) Has been cancelled
CI / Build Prometheus for all architectures (8) (push) Has been cancelled
CI / Build Prometheus for all architectures (9) (push) Has been cancelled
CI / Check generated parser (push) Has been cancelled
CI / golangci-lint (push) Has been cancelled
CI / fuzzing (push) Has been cancelled
CI / codeql (push) Has been cancelled
CI / Report status of build Prometheus for all architectures (push) Has been cancelled
CI / Publish main branch artifacts (push) Has been cancelled
CI / Publish release artefacts (push) Has been cancelled
CI / Publish UI on npm Registry (push) Has been cancelled

Signed-off-by: bwplotka <bwplotka@gmail.com>
This commit is contained in:
bwplotka 2025-02-28 12:08:29 +00:00
parent aee78bdb31
commit d4dd997a07
21 changed files with 791 additions and 326 deletions

View file

@ -73,6 +73,7 @@ import (
"github.com/prometheus/prometheus/tracing"
"github.com/prometheus/prometheus/tsdb"
"github.com/prometheus/prometheus/tsdb/agent"
"github.com/prometheus/prometheus/tsdb/compression"
"github.com/prometheus/prometheus/tsdb/wlog"
"github.com/prometheus/prometheus/util/documentcli"
"github.com/prometheus/prometheus/util/logging"
@ -442,7 +443,7 @@ func main() {
Hidden().Default("true").BoolVar(&cfg.tsdb.WALCompression)
serverOnlyFlag(a, "storage.tsdb.wal-compression-type", "Compression algorithm for the tsdb WAL.").
Hidden().Default(string(wlog.CompressionSnappy)).EnumVar(&cfg.tsdb.WALCompressionType, string(wlog.CompressionSnappy), string(wlog.CompressionZstd))
Hidden().Default(string(compression.Snappy)).EnumVar(&cfg.tsdb.WALCompressionType, string(compression.Snappy), string(compression.Zstd))
serverOnlyFlag(a, "storage.tsdb.head-chunks-write-queue-size", "Size of the queue through which head chunks are written to the disk to be m-mapped, 0 disables the queue completely. Experimental.").
Default("0").IntVar(&cfg.tsdb.HeadChunksWriteQueueSize)
@ -464,7 +465,7 @@ func main() {
Default("true").BoolVar(&cfg.agent.WALCompression)
agentOnlyFlag(a, "storage.agent.wal-compression-type", "Compression algorithm for the agent WAL.").
Hidden().Default(string(wlog.CompressionSnappy)).EnumVar(&cfg.agent.WALCompressionType, string(wlog.CompressionSnappy), string(wlog.CompressionZstd))
Hidden().Default(string(compression.Snappy)).EnumVar(&cfg.agent.WALCompressionType, string(compression.Snappy), string(compression.Zstd))
agentOnlyFlag(a, "storage.agent.wal-truncate-frequency",
"The frequency at which to truncate the WAL and remove old data.").

View file

@ -37,6 +37,7 @@ import (
"github.com/prometheus/prometheus/storage/remote"
"github.com/prometheus/prometheus/tsdb"
"github.com/prometheus/prometheus/tsdb/chunks"
"github.com/prometheus/prometheus/tsdb/compression"
tsdb_errors "github.com/prometheus/prometheus/tsdb/errors"
"github.com/prometheus/prometheus/tsdb/record"
"github.com/prometheus/prometheus/tsdb/tsdbutil"
@ -66,7 +67,7 @@ type Options struct {
WALSegmentSize int
// WALCompression configures the compression type to use on records in the WAL.
WALCompression wlog.CompressionType
WALCompression compression.Type
// StripeSize is the size (power of 2) in entries of the series hash map. Reducing the size will save memory but impact performance.
StripeSize int
@ -90,7 +91,7 @@ type Options struct {
func DefaultOptions() *Options {
return &Options{
WALSegmentSize: wlog.DefaultSegmentSize,
WALCompression: wlog.CompressionNone,
WALCompression: compression.None,
StripeSize: tsdb.DefaultStripeSize,
TruncateFrequency: DefaultTruncateFrequency,
MinWALTime: DefaultMinWALTime,
@ -339,7 +340,7 @@ func validateOptions(opts *Options) *Options {
}
if opts.WALCompression == "" {
opts.WALCompression = wlog.CompressionNone
opts.WALCompression = compression.None
}
// Revert StripeSize to DefaultStripeSize if StripeSize is either 0 or not a power of 2.

View file

@ -38,11 +38,11 @@ import (
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb/chunkenc"
"github.com/prometheus/prometheus/tsdb/chunks"
"github.com/prometheus/prometheus/tsdb/compression"
"github.com/prometheus/prometheus/tsdb/fileutil"
"github.com/prometheus/prometheus/tsdb/index"
"github.com/prometheus/prometheus/tsdb/tombstones"
"github.com/prometheus/prometheus/tsdb/tsdbutil"
"github.com/prometheus/prometheus/tsdb/wlog"
)
func TestSplitByRange(t *testing.T) {
@ -1447,7 +1447,7 @@ func TestDeleteCompactionBlockAfterFailedReload(t *testing.T) {
func TestHeadCompactionWithHistograms(t *testing.T) {
for _, floatTest := range []bool{true, false} {
t.Run(fmt.Sprintf("float=%t", floatTest), func(t *testing.T) {
head, _ := newTestHead(t, DefaultBlockDuration, wlog.CompressionNone, false)
head, _ := newTestHead(t, DefaultBlockDuration, compression.None, false)
require.NoError(t, head.Init(0))
t.Cleanup(func() {
require.NoError(t, head.Close())
@ -1627,11 +1627,11 @@ func TestSparseHistogramSpaceSavings(t *testing.T) {
c.numBuckets,
),
func(t *testing.T) {
oldHead, _ := newTestHead(t, DefaultBlockDuration, wlog.CompressionNone, false)
oldHead, _ := newTestHead(t, DefaultBlockDuration, compression.None, false)
t.Cleanup(func() {
require.NoError(t, oldHead.Close())
})
sparseHead, _ := newTestHead(t, DefaultBlockDuration, wlog.CompressionNone, false)
sparseHead, _ := newTestHead(t, DefaultBlockDuration, compression.None, false)
t.Cleanup(func() {
require.NoError(t, sparseHead.Close())
})

View file

@ -0,0 +1,120 @@
// Copyright 2025 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package compression
import (
"errors"
"fmt"
"github.com/golang/snappy"
"github.com/klauspost/compress/zstd"
)
type Type string
const (
// None represents no compression case.
// None it's a default when Type is empty.
None Type = "none"
// Snappy represents snappy block format.
Snappy Type = "snappy"
// Zstd represents zstd compression.
Zstd Type = "zstd"
)
type Encoder struct {
w *zstd.Encoder
}
func NewEncoder() (*Encoder, error) {
e := &Encoder{}
w, err := zstd.NewWriter(nil)
if err != nil {
return nil, err
}
e.w = w
return e, nil
}
// Encode returns the encoded form of src for the given compression type. It also
// returns the indicator if the compression was performed. Encode may skip
// compressing for None type, but also when src is too large e.g. for Snappy block format.
//
// The buf is used as a buffer for returned encoding, and it must not overlap with
// src. It is valid to pass a nil buf.
//
// Encoder may be nil compression types other than Zstd.
func (e *Encoder) Encode(t Type, src, buf []byte) (_ []byte, compressed bool, err error) {
switch {
case len(src) == 0, t == "", t == None:
return src, false, nil
case t == Snappy:
// If MaxEncodedLen is less than 0 the record is too large to be compressed.
if snappy.MaxEncodedLen(len(src)) < 0 {
return src, false, nil
}
// The snappy library uses `len` to calculate if we need a new buffer.
// In order to allocate as few buffers as possible make the length
// equal to the capacity.
buf = buf[:cap(buf)]
return snappy.Encode(buf, src), true, nil
case t == Zstd:
if e == nil {
return nil, false, errors.New("zstd requested but encoder was not initialized with NewEncoder()")
}
return e.w.EncodeAll(src, buf[:0]), true, nil
default:
return nil, false, fmt.Errorf("unsupported compression type: %s", t)
}
}
type Decoder struct {
r *zstd.Decoder
}
func NewDecoder() *Decoder {
d := &Decoder{}
// Calling zstd.NewReader with a nil io.Reader and no options cannot return an error.
r, _ := zstd.NewReader(nil)
d.r = r
return d
}
// Decode returns the decoded form of src or error, given expected compression type.
//
// The buf is used as a buffer for the returned decoded entry, and it must not
// overlap with src. It is valid to pass a nil buf.
//
// Decoder may be nil compression types other than Zstd.
func (d *Decoder) Decode(t Type, src, buf []byte) (_ []byte, err error) {
switch {
case len(src) == 0, t == "", t == None:
return src, nil
case t == Snappy:
// The snappy library uses `len` to calculate if we need a new buffer.
// In order to allocate as few buffers as possible make the length
// equal to the capacity.
buf = buf[:cap(buf)]
return snappy.Decode(buf, src)
case t == Zstd:
if d == nil {
return nil, errors.New("zstd requested but Decoder was not initialized with NewDecoder()")
}
return d.r.DecodeAll(src, buf[:0])
default:
return nil, fmt.Errorf("unsupported compression type: %s", t)
}
}

View file

@ -41,6 +41,7 @@ import (
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb/chunkenc"
"github.com/prometheus/prometheus/tsdb/chunks"
"github.com/prometheus/prometheus/tsdb/compression"
tsdb_errors "github.com/prometheus/prometheus/tsdb/errors"
"github.com/prometheus/prometheus/tsdb/fileutil"
_ "github.com/prometheus/prometheus/tsdb/goversion" // Load the package into main to make sure minimum Go version is met.
@ -80,7 +81,7 @@ func DefaultOptions() *Options {
MaxBlockDuration: DefaultBlockDuration,
NoLockfile: false,
SamplesPerChunk: DefaultSamplesPerChunk,
WALCompression: wlog.CompressionNone,
WALCompression: compression.None,
StripeSize: DefaultStripeSize,
HeadChunksWriteBufferSize: chunks.DefaultWriteBufferSize,
IsolationDisabled: defaultIsolationDisabled,
@ -124,7 +125,7 @@ type Options struct {
NoLockfile bool
// WALCompression configures the compression type to use on records in the WAL.
WALCompression wlog.CompressionType
WALCompression compression.Type
// Maximum number of CPUs that can simultaneously processes WAL replay.
// If it is <=0, then GOMAXPROCS is used.

View file

@ -58,6 +58,7 @@ import (
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb/chunkenc"
"github.com/prometheus/prometheus/tsdb/chunks"
"github.com/prometheus/prometheus/tsdb/compression"
"github.com/prometheus/prometheus/tsdb/fileutil"
"github.com/prometheus/prometheus/tsdb/index"
"github.com/prometheus/prometheus/tsdb/record"
@ -1962,7 +1963,7 @@ func TestInitializeHeadTimestamp(t *testing.T) {
dir := t.TempDir()
require.NoError(t, os.MkdirAll(path.Join(dir, "wal"), 0o777))
w, err := wlog.New(nil, nil, path.Join(dir, "wal"), wlog.CompressionNone)
w, err := wlog.New(nil, nil, path.Join(dir, "wal"), compression.None)
require.NoError(t, err)
var enc record.Encoder
@ -2006,7 +2007,7 @@ func TestInitializeHeadTimestamp(t *testing.T) {
createBlock(t, dir, genSeries(1, 1, 1000, 6000))
require.NoError(t, os.MkdirAll(path.Join(dir, "wal"), 0o777))
w, err := wlog.New(nil, nil, path.Join(dir, "wal"), wlog.CompressionNone)
w, err := wlog.New(nil, nil, path.Join(dir, "wal"), compression.None)
require.NoError(t, err)
var enc record.Encoder
@ -2407,7 +2408,7 @@ func TestDBReadOnly(t *testing.T) {
}
// Add head to test DBReadOnly WAL reading capabilities.
w, err := wlog.New(logger, nil, filepath.Join(dbDir, "wal"), wlog.CompressionSnappy)
w, err := wlog.New(logger, nil, filepath.Join(dbDir, "wal"), compression.Snappy)
require.NoError(t, err)
h := createHead(t, w, genSeries(1, 1, 16, 18), dbDir)
require.NoError(t, h.Close())
@ -3058,7 +3059,7 @@ func TestCompactHead(t *testing.T) {
NoLockfile: true,
MinBlockDuration: int64(time.Hour * 2 / time.Millisecond),
MaxBlockDuration: int64(time.Hour * 2 / time.Millisecond),
WALCompression: wlog.CompressionSnappy,
WALCompression: compression.Snappy,
}
db, err := Open(dbDir, promslog.NewNopLogger(), prometheus.NewRegistry(), tsdbCfg, nil)
@ -4662,7 +4663,7 @@ func TestMetadataCheckpointingOnlyKeepsLatestEntry(t *testing.T) {
ctx := context.Background()
numSamples := 10000
hb, w := newTestHead(t, int64(numSamples)*10, wlog.CompressionNone, false)
hb, w := newTestHead(t, int64(numSamples)*10, compression.None, false)
// Add some series so we can append metadata to them.
app := hb.Appender(ctx)
@ -7019,7 +7020,7 @@ func testWBLAndMmapReplay(t *testing.T, scenario sampleTypeScenario) {
resetMmapToOriginal() // We neet to reset because new duplicate chunks can be written above.
// Removing m-map markers in WBL by rewriting it.
newWbl, err := wlog.New(promslog.NewNopLogger(), nil, filepath.Join(t.TempDir(), "new_wbl"), wlog.CompressionNone)
newWbl, err := wlog.New(promslog.NewNopLogger(), nil, filepath.Join(t.TempDir(), "new_wbl"), compression.None)
require.NoError(t, err)
sr, err := wlog.NewSegmentsReader(originalWblDir)
require.NoError(t, err)

View file

@ -676,7 +676,7 @@ func (a *headAppender) AppendExemplar(ref storage.SeriesRef, lset labels.Labels,
return storage.SeriesRef(s.ref), nil
}
func (a *headAppender) AppendHistogramWithCT(ref storage.SeriesRef, lset labels.Labels, t, ct int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) {
func (a *headAppender) AppendHistogramWithCT(ref storage.SeriesRef, lset labels.Labels, t, _ int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) {
// TODO(bwplotka): Add support for native histograms with CTs in WAL; add/consolidate records.
// We ignore CT for now.
return a.AppendHistogram(ref, lset, t, h, fh)

View file

@ -29,7 +29,7 @@ import (
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb/chunks"
"github.com/prometheus/prometheus/tsdb/wlog"
"github.com/prometheus/prometheus/tsdb/compression"
)
func BenchmarkHeadStripeSeriesCreate(b *testing.B) {
@ -132,7 +132,7 @@ func BenchmarkHead_WalCommit(b *testing.B) {
for i := 0; i < b.N; i++ {
b.StopTimer()
h, w := newTestHead(b, 10000, wlog.CompressionNone, false)
h, w := newTestHead(b, 10000, compression.None, false)
b.Cleanup(func() {
if h != nil {
h.Close()

View file

@ -46,6 +46,7 @@ import (
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb/chunkenc"
"github.com/prometheus/prometheus/tsdb/chunks"
"github.com/prometheus/prometheus/tsdb/compression"
"github.com/prometheus/prometheus/tsdb/fileutil"
"github.com/prometheus/prometheus/tsdb/index"
"github.com/prometheus/prometheus/tsdb/record"
@ -68,11 +69,11 @@ func newTestHeadDefaultOptions(chunkRange int64, oooEnabled bool) *HeadOptions {
return opts
}
func newTestHead(t testing.TB, chunkRange int64, compressWAL wlog.CompressionType, oooEnabled bool) (*Head, *wlog.WL) {
func newTestHead(t testing.TB, chunkRange int64, compressWAL compression.Type, oooEnabled bool) (*Head, *wlog.WL) {
return newTestHeadWithOptions(t, compressWAL, newTestHeadDefaultOptions(chunkRange, oooEnabled))
}
func newTestHeadWithOptions(t testing.TB, compressWAL wlog.CompressionType, opts *HeadOptions) (*Head, *wlog.WL) {
func newTestHeadWithOptions(t testing.TB, compressWAL compression.Type, opts *HeadOptions) (*Head, *wlog.WL) {
dir := t.TempDir()
wal, err := wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, compressWAL)
require.NoError(t, err)
@ -92,7 +93,7 @@ func newTestHeadWithOptions(t testing.TB, compressWAL wlog.CompressionType, opts
func BenchmarkCreateSeries(b *testing.B) {
series := genSeries(b.N, 10, 0, 0)
h, _ := newTestHead(b, 10000, wlog.CompressionNone, false)
h, _ := newTestHead(b, 10000, compression.None, false)
b.Cleanup(func() {
require.NoError(b, h.Close())
})
@ -113,7 +114,7 @@ func BenchmarkHeadAppender_Append_Commit_ExistingSeries(b *testing.B) {
b.Run(fmt.Sprintf("%d series", seriesCount), func(b *testing.B) {
for _, samplesPerAppend := range []int64{1, 2, 5, 100} {
b.Run(fmt.Sprintf("%d samples per append", samplesPerAppend), func(b *testing.B) {
h, _ := newTestHead(b, 10000, wlog.CompressionNone, false)
h, _ := newTestHead(b, 10000, compression.None, false)
b.Cleanup(func() { require.NoError(b, h.Close()) })
ts := int64(1000)
@ -294,11 +295,11 @@ func BenchmarkLoadWLs(b *testing.B) {
func(b *testing.B) {
dir := b.TempDir()
wal, err := wlog.New(nil, nil, dir, wlog.CompressionNone)
wal, err := wlog.New(nil, nil, dir, compression.None)
require.NoError(b, err)
var wbl *wlog.WL
if c.oooSeriesPct != 0 {
wbl, err = wlog.New(nil, nil, dir, wlog.CompressionNone)
wbl, err = wlog.New(nil, nil, dir, compression.None)
require.NoError(b, err)
}
@ -459,11 +460,11 @@ func BenchmarkLoadRealWLs(b *testing.B) {
dir := b.TempDir()
require.NoError(b, fileutil.CopyDirs(srcDir, dir))
wal, err := wlog.New(nil, nil, filepath.Join(dir, "wal"), wlog.CompressionNone)
wal, err := wlog.New(nil, nil, filepath.Join(dir, "wal"), compression.None)
require.NoError(b, err)
b.Cleanup(func() { wal.Close() })
wbl, err := wlog.New(nil, nil, filepath.Join(dir, "wbl"), wlog.CompressionNone)
wbl, err := wlog.New(nil, nil, filepath.Join(dir, "wbl"), compression.None)
require.NoError(b, err)
b.Cleanup(func() { wbl.Close() })
b.StartTimer()
@ -484,7 +485,7 @@ func BenchmarkLoadRealWLs(b *testing.B) {
// While appending the samples to the head it concurrently queries them from multiple go routines and verifies that the
// returned results are correct.
func TestHead_HighConcurrencyReadAndWrite(t *testing.T) {
head, _ := newTestHead(t, DefaultBlockDuration, wlog.CompressionNone, false)
head, _ := newTestHead(t, DefaultBlockDuration, compression.None, false)
defer func() {
require.NoError(t, head.Close())
}()
@ -674,7 +675,7 @@ func TestHead_HighConcurrencyReadAndWrite(t *testing.T) {
}
func TestHead_ReadWAL(t *testing.T) {
for _, compress := range []wlog.CompressionType{wlog.CompressionNone, wlog.CompressionSnappy, wlog.CompressionZstd} {
for _, compress := range []compression.Type{compression.None, compression.Snappy, compression.Zstd} {
t.Run(fmt.Sprintf("compress=%s", compress), func(t *testing.T) {
entries := []interface{}{
[]record.RefSeries{
@ -756,7 +757,7 @@ func TestHead_ReadWAL(t *testing.T) {
}
func TestHead_WALMultiRef(t *testing.T) {
head, w := newTestHead(t, 1000, wlog.CompressionNone, false)
head, w := newTestHead(t, 1000, compression.None, false)
require.NoError(t, head.Init(0))
@ -791,7 +792,7 @@ func TestHead_WALMultiRef(t *testing.T) {
require.NotEqual(t, ref1, ref2, "Refs are the same")
require.NoError(t, head.Close())
w, err = wlog.New(nil, nil, w.Dir(), wlog.CompressionNone)
w, err = wlog.New(nil, nil, w.Dir(), compression.None)
require.NoError(t, err)
opts := DefaultHeadOptions()
@ -816,7 +817,7 @@ func TestHead_WALMultiRef(t *testing.T) {
}
func TestHead_ActiveAppenders(t *testing.T) {
head, _ := newTestHead(t, 1000, wlog.CompressionNone, false)
head, _ := newTestHead(t, 1000, compression.None, false)
defer head.Close()
require.NoError(t, head.Init(0))
@ -849,7 +850,7 @@ func TestHead_ActiveAppenders(t *testing.T) {
}
func TestHead_UnknownWALRecord(t *testing.T) {
head, w := newTestHead(t, 1000, wlog.CompressionNone, false)
head, w := newTestHead(t, 1000, compression.None, false)
w.Log([]byte{255, 42})
require.NoError(t, head.Init(0))
require.NoError(t, head.Close())
@ -861,7 +862,7 @@ func BenchmarkHead_Truncate(b *testing.B) {
const total = 1e6
prepare := func(b *testing.B, churn int) *Head {
h, _ := newTestHead(b, 1000, wlog.CompressionNone, false)
h, _ := newTestHead(b, 1000, compression.None, false)
b.Cleanup(func() {
require.NoError(b, h.Close())
})
@ -930,7 +931,7 @@ func BenchmarkHead_Truncate(b *testing.B) {
}
func TestHead_Truncate(t *testing.T) {
h, _ := newTestHead(t, 1000, wlog.CompressionNone, false)
h, _ := newTestHead(t, 1000, compression.None, false)
defer func() {
require.NoError(t, h.Close())
}()
@ -1240,7 +1241,7 @@ func TestMemSeries_truncateChunks_scenarios(t *testing.T) {
}
func TestHeadDeleteSeriesWithoutSamples(t *testing.T) {
for _, compress := range []wlog.CompressionType{wlog.CompressionNone, wlog.CompressionSnappy, wlog.CompressionZstd} {
for _, compress := range []compression.Type{compression.None, compression.Snappy, compression.Zstd} {
t.Run(fmt.Sprintf("compress=%s", compress), func(t *testing.T) {
entries := []interface{}{
[]record.RefSeries{
@ -1320,7 +1321,7 @@ func TestHeadDeleteSimple(t *testing.T) {
},
}
for _, compress := range []wlog.CompressionType{wlog.CompressionNone, wlog.CompressionSnappy, wlog.CompressionZstd} {
for _, compress := range []compression.Type{compression.None, compression.Snappy, compression.Zstd} {
t.Run(fmt.Sprintf("compress=%s", compress), func(t *testing.T) {
for _, c := range cases {
head, w := newTestHead(t, 1000, compress, false)
@ -1402,7 +1403,7 @@ func TestHeadDeleteSimple(t *testing.T) {
}
func TestDeleteUntilCurMax(t *testing.T) {
hb, _ := newTestHead(t, 1000000, wlog.CompressionNone, false)
hb, _ := newTestHead(t, 1000000, compression.None, false)
defer func() {
require.NoError(t, hb.Close())
}()
@ -1455,7 +1456,7 @@ func TestDeletedSamplesAndSeriesStillInWALAfterCheckpoint(t *testing.T) {
numSamples := 10000
// Enough samples to cause a checkpoint.
hb, w := newTestHead(t, int64(numSamples)*10, wlog.CompressionNone, false)
hb, w := newTestHead(t, int64(numSamples)*10, compression.None, false)
for i := 0; i < numSamples; i++ {
app := hb.Appender(context.Background())
@ -1547,7 +1548,7 @@ func TestDelete_e2e(t *testing.T) {
seriesMap[labels.New(l...).String()] = []chunks.Sample{}
}
hb, _ := newTestHead(t, 100000, wlog.CompressionNone, false)
hb, _ := newTestHead(t, 100000, compression.None, false)
defer func() {
require.NoError(t, hb.Close())
}()
@ -1915,7 +1916,7 @@ func TestMemSeries_append_atVariableRate(t *testing.T) {
func TestGCChunkAccess(t *testing.T) {
// Put a chunk, select it. GC it and then access it.
const chunkRange = 1000
h, _ := newTestHead(t, chunkRange, wlog.CompressionNone, false)
h, _ := newTestHead(t, chunkRange, compression.None, false)
defer func() {
require.NoError(t, h.Close())
}()
@ -1974,7 +1975,7 @@ func TestGCChunkAccess(t *testing.T) {
func TestGCSeriesAccess(t *testing.T) {
// Put a series, select it. GC it and then access it.
const chunkRange = 1000
h, _ := newTestHead(t, chunkRange, wlog.CompressionNone, false)
h, _ := newTestHead(t, chunkRange, compression.None, false)
defer func() {
require.NoError(t, h.Close())
}()
@ -2033,7 +2034,7 @@ func TestGCSeriesAccess(t *testing.T) {
}
func TestUncommittedSamplesNotLostOnTruncate(t *testing.T) {
h, _ := newTestHead(t, 1000, wlog.CompressionNone, false)
h, _ := newTestHead(t, 1000, compression.None, false)
defer func() {
require.NoError(t, h.Close())
}()
@ -2063,7 +2064,7 @@ func TestUncommittedSamplesNotLostOnTruncate(t *testing.T) {
}
func TestRemoveSeriesAfterRollbackAndTruncate(t *testing.T) {
h, _ := newTestHead(t, 1000, wlog.CompressionNone, false)
h, _ := newTestHead(t, 1000, compression.None, false)
defer func() {
require.NoError(t, h.Close())
}()
@ -2094,7 +2095,7 @@ func TestRemoveSeriesAfterRollbackAndTruncate(t *testing.T) {
}
func TestHead_LogRollback(t *testing.T) {
for _, compress := range []wlog.CompressionType{wlog.CompressionNone, wlog.CompressionSnappy, wlog.CompressionZstd} {
for _, compress := range []compression.Type{compression.None, compression.Snappy, compression.Zstd} {
t.Run(fmt.Sprintf("compress=%s", compress), func(t *testing.T) {
h, w := newTestHead(t, 1000, compress, false)
defer func() {
@ -2118,7 +2119,7 @@ func TestHead_LogRollback(t *testing.T) {
}
func TestHead_ReturnsSortedLabelValues(t *testing.T) {
h, _ := newTestHead(t, 1000, wlog.CompressionNone, false)
h, _ := newTestHead(t, 1000, compression.None, false)
defer func() {
require.NoError(t, h.Close())
}()
@ -2182,7 +2183,7 @@ func TestWalRepair_DecodingError(t *testing.T) {
5,
},
} {
for _, compress := range []wlog.CompressionType{wlog.CompressionNone, wlog.CompressionSnappy, wlog.CompressionZstd} {
for _, compress := range []compression.Type{compression.None, compression.Snappy, compression.Zstd} {
t.Run(fmt.Sprintf("%s,compress=%s", name, compress), func(t *testing.T) {
dir := t.TempDir()
@ -2256,9 +2257,9 @@ func TestWblRepair_DecodingError(t *testing.T) {
// Fill the wbl and corrupt it.
{
wal, err := wlog.New(nil, nil, filepath.Join(dir, "wal"), wlog.CompressionNone)
wal, err := wlog.New(nil, nil, filepath.Join(dir, "wal"), compression.None)
require.NoError(t, err)
wbl, err := wlog.New(nil, nil, filepath.Join(dir, "wbl"), wlog.CompressionNone)
wbl, err := wlog.New(nil, nil, filepath.Join(dir, "wbl"), compression.None)
require.NoError(t, err)
for i := 1; i <= totalRecs; i++ {
@ -2322,7 +2323,7 @@ func TestHeadReadWriterRepair(t *testing.T) {
walDir := filepath.Join(dir, "wal")
// Fill the chunk segments and corrupt it.
{
w, err := wlog.New(nil, nil, walDir, wlog.CompressionNone)
w, err := wlog.New(nil, nil, walDir, compression.None)
require.NoError(t, err)
opts := DefaultHeadOptions()
@ -2391,7 +2392,7 @@ func TestHeadReadWriterRepair(t *testing.T) {
}
func TestNewWalSegmentOnTruncate(t *testing.T) {
h, wal := newTestHead(t, 1000, wlog.CompressionNone, false)
h, wal := newTestHead(t, 1000, compression.None, false)
defer func() {
require.NoError(t, h.Close())
}()
@ -2421,7 +2422,7 @@ func TestNewWalSegmentOnTruncate(t *testing.T) {
}
func TestAddDuplicateLabelName(t *testing.T) {
h, _ := newTestHead(t, 1000, wlog.CompressionNone, false)
h, _ := newTestHead(t, 1000, compression.None, false)
defer func() {
require.NoError(t, h.Close())
}()
@ -2504,7 +2505,7 @@ func TestMemSeriesIsolation(t *testing.T) {
}
// Test isolation without restart of Head.
hb, _ := newTestHead(t, 1000, wlog.CompressionNone, false)
hb, _ := newTestHead(t, 1000, compression.None, false)
i := addSamples(hb)
testIsolation(hb, i)
@ -2566,11 +2567,11 @@ func TestMemSeriesIsolation(t *testing.T) {
require.NoError(t, hb.Close())
// Test isolation with restart of Head. This is to verify the num samples of chunks after m-map chunk replay.
hb, w := newTestHead(t, 1000, wlog.CompressionNone, false)
hb, w := newTestHead(t, 1000, compression.None, false)
i = addSamples(hb)
require.NoError(t, hb.Close())
wal, err := wlog.NewSize(nil, nil, w.Dir(), 32768, wlog.CompressionNone)
wal, err := wlog.NewSize(nil, nil, w.Dir(), 32768, compression.None)
require.NoError(t, err)
opts := DefaultHeadOptions()
opts.ChunkRange = 1000
@ -2619,7 +2620,7 @@ func TestIsolationRollback(t *testing.T) {
}
// Rollback after a failed append and test if the low watermark has progressed anyway.
hb, _ := newTestHead(t, 1000, wlog.CompressionNone, false)
hb, _ := newTestHead(t, 1000, compression.None, false)
defer func() {
require.NoError(t, hb.Close())
}()
@ -2650,7 +2651,7 @@ func TestIsolationLowWatermarkMonotonous(t *testing.T) {
t.Skip("skipping test since tsdb isolation is disabled")
}
hb, _ := newTestHead(t, 1000, wlog.CompressionNone, false)
hb, _ := newTestHead(t, 1000, compression.None, false)
defer func() {
require.NoError(t, hb.Close())
}()
@ -2687,7 +2688,7 @@ func TestIsolationAppendIDZeroIsNoop(t *testing.T) {
t.Skip("skipping test since tsdb isolation is disabled")
}
h, _ := newTestHead(t, 1000, wlog.CompressionNone, false)
h, _ := newTestHead(t, 1000, compression.None, false)
defer func() {
require.NoError(t, h.Close())
}()
@ -2718,7 +2719,7 @@ func TestIsolationWithoutAdd(t *testing.T) {
t.Skip("skipping test since tsdb isolation is disabled")
}
hb, _ := newTestHead(t, 1000, wlog.CompressionNone, false)
hb, _ := newTestHead(t, 1000, compression.None, false)
defer func() {
require.NoError(t, hb.Close())
}()
@ -2843,7 +2844,7 @@ func testOutOfOrderSamplesMetric(t *testing.T, scenario sampleTypeScenario, opti
}
func testHeadSeriesChunkRace(t *testing.T) {
h, _ := newTestHead(t, 1000, wlog.CompressionNone, false)
h, _ := newTestHead(t, 1000, compression.None, false)
defer func() {
require.NoError(t, h.Close())
}()
@ -2878,7 +2879,7 @@ func testHeadSeriesChunkRace(t *testing.T) {
}
func TestHeadLabelNamesValuesWithMinMaxRange(t *testing.T) {
head, _ := newTestHead(t, 1000, wlog.CompressionNone, false)
head, _ := newTestHead(t, 1000, compression.None, false)
defer func() {
require.NoError(t, head.Close())
}()
@ -2939,7 +2940,7 @@ func TestHeadLabelNamesValuesWithMinMaxRange(t *testing.T) {
}
func TestHeadLabelValuesWithMatchers(t *testing.T) {
head, _ := newTestHead(t, 1000, wlog.CompressionNone, false)
head, _ := newTestHead(t, 1000, compression.None, false)
t.Cleanup(func() { require.NoError(t, head.Close()) })
ctx := context.Background()
@ -3015,7 +3016,7 @@ func TestHeadLabelValuesWithMatchers(t *testing.T) {
}
func TestHeadLabelNamesWithMatchers(t *testing.T) {
head, _ := newTestHead(t, 1000, wlog.CompressionNone, false)
head, _ := newTestHead(t, 1000, compression.None, false)
defer func() {
require.NoError(t, head.Close())
}()
@ -3085,7 +3086,7 @@ func TestHeadLabelNamesWithMatchers(t *testing.T) {
func TestHeadShardedPostings(t *testing.T) {
headOpts := newTestHeadDefaultOptions(1000, false)
headOpts.EnableSharding = true
head, _ := newTestHeadWithOptions(t, wlog.CompressionNone, headOpts)
head, _ := newTestHeadWithOptions(t, compression.None, headOpts)
defer func() {
require.NoError(t, head.Close())
}()
@ -3148,7 +3149,7 @@ func TestHeadShardedPostings(t *testing.T) {
}
func TestErrReuseAppender(t *testing.T) {
head, _ := newTestHead(t, 1000, wlog.CompressionNone, false)
head, _ := newTestHead(t, 1000, compression.None, false)
defer func() {
require.NoError(t, head.Close())
}()
@ -3184,7 +3185,7 @@ func TestErrReuseAppender(t *testing.T) {
func TestHeadMintAfterTruncation(t *testing.T) {
chunkRange := int64(2000)
head, _ := newTestHead(t, chunkRange, wlog.CompressionNone, false)
head, _ := newTestHead(t, chunkRange, compression.None, false)
app := head.Appender(context.Background())
_, err := app.Append(0, labels.FromStrings("a", "b"), 100, 100)
@ -3218,7 +3219,7 @@ func TestHeadMintAfterTruncation(t *testing.T) {
func TestHeadExemplars(t *testing.T) {
chunkRange := int64(2000)
head, _ := newTestHead(t, chunkRange, wlog.CompressionNone, false)
head, _ := newTestHead(t, chunkRange, compression.None, false)
app := head.Appender(context.Background())
l := labels.FromStrings("trace_id", "123")
@ -3240,7 +3241,7 @@ func TestHeadExemplars(t *testing.T) {
func BenchmarkHeadLabelValuesWithMatchers(b *testing.B) {
chunkRange := int64(2000)
head, _ := newTestHead(b, chunkRange, wlog.CompressionNone, false)
head, _ := newTestHead(b, chunkRange, compression.None, false)
b.Cleanup(func() { require.NoError(b, head.Close()) })
ctx := context.Background()
@ -3685,7 +3686,7 @@ func TestAppendHistogram(t *testing.T) {
l := labels.FromStrings("a", "b")
for _, numHistograms := range []int{1, 10, 150, 200, 250, 300} {
t.Run(strconv.Itoa(numHistograms), func(t *testing.T) {
head, _ := newTestHead(t, 1000, wlog.CompressionNone, false)
head, _ := newTestHead(t, 1000, compression.None, false)
t.Cleanup(func() {
require.NoError(t, head.Close())
})
@ -3789,7 +3790,7 @@ func TestAppendHistogram(t *testing.T) {
}
func TestHistogramInWALAndMmapChunk(t *testing.T) {
head, _ := newTestHead(t, 3000, wlog.CompressionNone, false)
head, _ := newTestHead(t, 3000, compression.None, false)
t.Cleanup(func() {
require.NoError(t, head.Close())
})
@ -3940,7 +3941,7 @@ func TestHistogramInWALAndMmapChunk(t *testing.T) {
// Restart head.
require.NoError(t, head.Close())
startHead := func() {
w, err := wlog.NewSize(nil, nil, head.wal.Dir(), 32768, wlog.CompressionNone)
w, err := wlog.NewSize(nil, nil, head.wal.Dir(), 32768, compression.None)
require.NoError(t, err)
head, err = NewHead(nil, nil, w, nil, head.opts, nil)
require.NoError(t, err)
@ -3969,7 +3970,7 @@ func TestHistogramInWALAndMmapChunk(t *testing.T) {
}
func TestChunkSnapshot(t *testing.T) {
head, _ := newTestHead(t, 120*4, wlog.CompressionNone, false)
head, _ := newTestHead(t, 120*4, compression.None, false)
defer func() {
head.opts.EnableMemorySnapshotOnShutdown = false
require.NoError(t, head.Close())
@ -4062,7 +4063,7 @@ func TestChunkSnapshot(t *testing.T) {
}
openHeadAndCheckReplay := func() {
w, err := wlog.NewSize(nil, nil, head.wal.Dir(), 32768, wlog.CompressionNone)
w, err := wlog.NewSize(nil, nil, head.wal.Dir(), 32768, compression.None)
require.NoError(t, err)
head, err = NewHead(nil, nil, w, nil, head.opts, nil)
require.NoError(t, err)
@ -4256,7 +4257,7 @@ func TestChunkSnapshot(t *testing.T) {
}
func TestSnapshotError(t *testing.T) {
head, _ := newTestHead(t, 120*4, wlog.CompressionNone, false)
head, _ := newTestHead(t, 120*4, compression.None, false)
defer func() {
head.opts.EnableMemorySnapshotOnShutdown = false
require.NoError(t, head.Close())
@ -4316,7 +4317,7 @@ func TestSnapshotError(t *testing.T) {
require.NoError(t, f.Close())
// Create new Head which should replay this snapshot.
w, err := wlog.NewSize(nil, nil, head.wal.Dir(), 32768, wlog.CompressionNone)
w, err := wlog.NewSize(nil, nil, head.wal.Dir(), 32768, compression.None)
require.NoError(t, err)
// Testing https://github.com/prometheus/prometheus/issues/9437 with the registry.
head, err = NewHead(prometheus.NewRegistry(), nil, w, nil, head.opts, nil)
@ -4345,7 +4346,7 @@ func TestSnapshotError(t *testing.T) {
opts := head.opts
opts.SeriesCallback = c
w, err = wlog.NewSize(nil, nil, head.wal.Dir(), 32768, wlog.CompressionNone)
w, err = wlog.NewSize(nil, nil, head.wal.Dir(), 32768, compression.None)
require.NoError(t, err)
head, err = NewHead(prometheus.NewRegistry(), nil, w, nil, head.opts, nil)
require.NoError(t, err)
@ -4367,7 +4368,7 @@ func TestSnapshotError(t *testing.T) {
func TestHistogramMetrics(t *testing.T) {
numHistograms := 10
head, _ := newTestHead(t, 1000, wlog.CompressionNone, false)
head, _ := newTestHead(t, 1000, compression.None, false)
t.Cleanup(func() {
require.NoError(t, head.Close())
})
@ -4397,7 +4398,7 @@ func TestHistogramMetrics(t *testing.T) {
require.Equal(t, float64(expHSamples), prom_testutil.ToFloat64(head.metrics.samplesAppended.WithLabelValues(sampleMetricTypeHistogram)))
require.NoError(t, head.Close())
w, err := wlog.NewSize(nil, nil, head.wal.Dir(), 32768, wlog.CompressionNone)
w, err := wlog.NewSize(nil, nil, head.wal.Dir(), 32768, compression.None)
require.NoError(t, err)
head, err = NewHead(nil, nil, w, nil, head.opts, nil)
require.NoError(t, err)
@ -4419,7 +4420,7 @@ func testHistogramStaleSampleHelper(t *testing.T, floatHistogram bool) {
t.Helper()
l := labels.FromStrings("a", "b")
numHistograms := 20
head, _ := newTestHead(t, 100000, wlog.CompressionNone, false)
head, _ := newTestHead(t, 100000, compression.None, false)
t.Cleanup(func() {
require.NoError(t, head.Close())
})
@ -4571,7 +4572,7 @@ func TestHistogramCounterResetHeader(t *testing.T) {
for _, floatHisto := range []bool{true} { // FIXME
t.Run(fmt.Sprintf("floatHistogram=%t", floatHisto), func(t *testing.T) {
l := labels.FromStrings("a", "b")
head, _ := newTestHead(t, 1000, wlog.CompressionNone, false)
head, _ := newTestHead(t, 1000, compression.None, false)
t.Cleanup(func() {
require.NoError(t, head.Close())
})
@ -4692,7 +4693,7 @@ func TestOOOHistogramCounterResetHeaders(t *testing.T) {
for _, floatHisto := range []bool{true, false} {
t.Run(fmt.Sprintf("floatHistogram=%t", floatHisto), func(t *testing.T) {
l := labels.FromStrings("a", "b")
head, _ := newTestHead(t, 1000, wlog.CompressionNone, true)
head, _ := newTestHead(t, 1000, compression.None, true)
head.opts.OutOfOrderCapMax.Store(5)
head.opts.EnableOOONativeHistograms.Store(true)
@ -5003,7 +5004,7 @@ func TestAppendingDifferentEncodingToSameSeries(t *testing.T) {
// Tests https://github.com/prometheus/prometheus/issues/9725.
func TestChunkSnapshotReplayBug(t *testing.T) {
dir := t.TempDir()
wal, err := wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, wlog.CompressionSnappy)
wal, err := wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, compression.Snappy)
require.NoError(t, err)
// Write few series records and samples such that the series references are not in order in the WAL
@ -5070,7 +5071,7 @@ func TestChunkSnapshotReplayBug(t *testing.T) {
func TestChunkSnapshotTakenAfterIncompleteSnapshot(t *testing.T) {
dir := t.TempDir()
wlTemp, err := wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, wlog.CompressionSnappy)
wlTemp, err := wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, compression.Snappy)
require.NoError(t, err)
// Write a snapshot with .tmp suffix. This used to fail taking any further snapshots or replay of snapshots.
@ -5115,9 +5116,9 @@ func TestWBLReplay(t *testing.T) {
func testWBLReplay(t *testing.T, scenario sampleTypeScenario) {
dir := t.TempDir()
wal, err := wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, wlog.CompressionSnappy)
wal, err := wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, compression.Snappy)
require.NoError(t, err)
oooWlog, err := wlog.NewSize(nil, nil, filepath.Join(dir, wlog.WblDirName), 32768, wlog.CompressionSnappy)
oooWlog, err := wlog.NewSize(nil, nil, filepath.Join(dir, wlog.WblDirName), 32768, compression.Snappy)
require.NoError(t, err)
opts := DefaultHeadOptions()
@ -5163,9 +5164,9 @@ func testWBLReplay(t *testing.T, scenario sampleTypeScenario) {
// Restart head.
require.NoError(t, h.Close())
wal, err = wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, wlog.CompressionSnappy)
wal, err = wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, compression.Snappy)
require.NoError(t, err)
oooWlog, err = wlog.NewSize(nil, nil, filepath.Join(dir, wlog.WblDirName), 32768, wlog.CompressionSnappy)
oooWlog, err = wlog.NewSize(nil, nil, filepath.Join(dir, wlog.WblDirName), 32768, compression.Snappy)
require.NoError(t, err)
h, err = NewHead(nil, nil, wal, oooWlog, opts, nil)
require.NoError(t, err)
@ -5209,9 +5210,9 @@ func TestOOOMmapReplay(t *testing.T) {
func testOOOMmapReplay(t *testing.T, scenario sampleTypeScenario) {
dir := t.TempDir()
wal, err := wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, wlog.CompressionSnappy)
wal, err := wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, compression.Snappy)
require.NoError(t, err)
oooWlog, err := wlog.NewSize(nil, nil, filepath.Join(dir, wlog.WblDirName), 32768, wlog.CompressionSnappy)
oooWlog, err := wlog.NewSize(nil, nil, filepath.Join(dir, wlog.WblDirName), 32768, compression.Snappy)
require.NoError(t, err)
opts := DefaultHeadOptions()
@ -5261,9 +5262,9 @@ func testOOOMmapReplay(t *testing.T, scenario sampleTypeScenario) {
// Restart head.
require.NoError(t, h.Close())
wal, err = wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, wlog.CompressionSnappy)
wal, err = wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, compression.Snappy)
require.NoError(t, err)
oooWlog, err = wlog.NewSize(nil, nil, filepath.Join(dir, wlog.WblDirName), 32768, wlog.CompressionSnappy)
oooWlog, err = wlog.NewSize(nil, nil, filepath.Join(dir, wlog.WblDirName), 32768, compression.Snappy)
require.NoError(t, err)
h, err = NewHead(nil, nil, wal, oooWlog, opts, nil)
require.NoError(t, err)
@ -5292,7 +5293,7 @@ func testOOOMmapReplay(t *testing.T, scenario sampleTypeScenario) {
}
func TestHeadInit_DiscardChunksWithUnsupportedEncoding(t *testing.T) {
h, _ := newTestHead(t, 1000, wlog.CompressionNone, false)
h, _ := newTestHead(t, 1000, compression.None, false)
defer func() {
require.NoError(t, h.Close())
}()
@ -5336,7 +5337,7 @@ func TestHeadInit_DiscardChunksWithUnsupportedEncoding(t *testing.T) {
require.NoError(t, h.Close())
wal, err := wlog.NewSize(nil, nil, filepath.Join(h.opts.ChunkDirRoot, "wal"), 32768, wlog.CompressionNone)
wal, err := wlog.NewSize(nil, nil, filepath.Join(h.opts.ChunkDirRoot, "wal"), 32768, compression.None)
require.NoError(t, err)
h, err = NewHead(nil, nil, wal, nil, h.opts, nil)
require.NoError(t, err)
@ -5371,7 +5372,7 @@ func (c *unsupportedChunk) Encoding() chunkenc.Encoding {
// Tests https://github.com/prometheus/prometheus/issues/10277.
func TestMmapPanicAfterMmapReplayCorruption(t *testing.T) {
dir := t.TempDir()
wal, err := wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, wlog.CompressionNone)
wal, err := wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, compression.None)
require.NoError(t, err)
opts := DefaultHeadOptions()
@ -5404,7 +5405,7 @@ func TestMmapPanicAfterMmapReplayCorruption(t *testing.T) {
addChunks()
require.NoError(t, h.Close())
wal, err = wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, wlog.CompressionNone)
wal, err = wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, compression.None)
require.NoError(t, err)
mmapFilePath := filepath.Join(dir, "chunks_head", "000001")
@ -5430,7 +5431,7 @@ func TestReplayAfterMmapReplayError(t *testing.T) {
var err error
openHead := func() {
wal, err := wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, wlog.CompressionNone)
wal, err := wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, compression.None)
require.NoError(t, err)
opts := DefaultHeadOptions()
@ -5513,9 +5514,9 @@ func TestOOOAppendWithNoSeries(t *testing.T) {
func testOOOAppendWithNoSeries(t *testing.T, appendFunc func(appender storage.Appender, lbls labels.Labels, ts, value int64) (storage.SeriesRef, sample, error)) {
dir := t.TempDir()
wal, err := wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, wlog.CompressionSnappy)
wal, err := wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, compression.Snappy)
require.NoError(t, err)
oooWlog, err := wlog.NewSize(nil, nil, filepath.Join(dir, wlog.WblDirName), 32768, wlog.CompressionSnappy)
oooWlog, err := wlog.NewSize(nil, nil, filepath.Join(dir, wlog.WblDirName), 32768, compression.Snappy)
require.NoError(t, err)
opts := DefaultHeadOptions()
@ -5606,9 +5607,9 @@ func TestHeadMinOOOTimeUpdate(t *testing.T) {
func testHeadMinOOOTimeUpdate(t *testing.T, scenario sampleTypeScenario) {
dir := t.TempDir()
wal, err := wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, wlog.CompressionSnappy)
wal, err := wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, compression.Snappy)
require.NoError(t, err)
oooWlog, err := wlog.NewSize(nil, nil, filepath.Join(dir, wlog.WblDirName), 32768, wlog.CompressionSnappy)
oooWlog, err := wlog.NewSize(nil, nil, filepath.Join(dir, wlog.WblDirName), 32768, compression.Snappy)
require.NoError(t, err)
opts := DefaultHeadOptions()
@ -5653,7 +5654,7 @@ func testHeadMinOOOTimeUpdate(t *testing.T, scenario sampleTypeScenario) {
func TestGaugeHistogramWALAndChunkHeader(t *testing.T) {
l := labels.FromStrings("a", "b")
head, _ := newTestHead(t, 1000, wlog.CompressionNone, false)
head, _ := newTestHead(t, 1000, compression.None, false)
t.Cleanup(func() {
require.NoError(t, head.Close())
})
@ -5718,7 +5719,7 @@ func TestGaugeHistogramWALAndChunkHeader(t *testing.T) {
require.NoError(t, head.Close())
require.NoError(t, os.RemoveAll(mmappedChunksDir(head.opts.ChunkDirRoot)))
w, err := wlog.NewSize(nil, nil, head.wal.Dir(), 32768, wlog.CompressionNone)
w, err := wlog.NewSize(nil, nil, head.wal.Dir(), 32768, compression.None)
require.NoError(t, err)
head, err = NewHead(nil, nil, w, nil, head.opts, nil)
require.NoError(t, err)
@ -5729,7 +5730,7 @@ func TestGaugeHistogramWALAndChunkHeader(t *testing.T) {
func TestGaugeFloatHistogramWALAndChunkHeader(t *testing.T) {
l := labels.FromStrings("a", "b")
head, _ := newTestHead(t, 1000, wlog.CompressionNone, false)
head, _ := newTestHead(t, 1000, compression.None, false)
t.Cleanup(func() {
require.NoError(t, head.Close())
})
@ -5794,7 +5795,7 @@ func TestGaugeFloatHistogramWALAndChunkHeader(t *testing.T) {
require.NoError(t, head.Close())
require.NoError(t, os.RemoveAll(mmappedChunksDir(head.opts.ChunkDirRoot)))
w, err := wlog.NewSize(nil, nil, head.wal.Dir(), 32768, wlog.CompressionNone)
w, err := wlog.NewSize(nil, nil, head.wal.Dir(), 32768, compression.None)
require.NoError(t, err)
head, err = NewHead(nil, nil, w, nil, head.opts, nil)
require.NoError(t, err)
@ -5804,7 +5805,7 @@ func TestGaugeFloatHistogramWALAndChunkHeader(t *testing.T) {
}
func TestSnapshotAheadOfWALError(t *testing.T) {
head, _ := newTestHead(t, 120*4, wlog.CompressionNone, false)
head, _ := newTestHead(t, 120*4, compression.None, false)
head.opts.EnableMemorySnapshotOnShutdown = true
// Add a sample to fill WAL.
app := head.Appender(context.Background())
@ -5827,7 +5828,7 @@ func TestSnapshotAheadOfWALError(t *testing.T) {
// to keep using the same snapshot directory instead of a random one.
require.NoError(t, os.RemoveAll(head.wal.Dir()))
head.opts.EnableMemorySnapshotOnShutdown = false
w, _ := wlog.NewSize(nil, nil, head.wal.Dir(), 32768, wlog.CompressionNone)
w, _ := wlog.NewSize(nil, nil, head.wal.Dir(), 32768, compression.None)
head, err = NewHead(nil, nil, w, nil, head.opts, nil)
require.NoError(t, err)
// Add a sample to fill WAL.
@ -5846,7 +5847,7 @@ func TestSnapshotAheadOfWALError(t *testing.T) {
// Create new Head which should detect the incorrect index and delete the snapshot.
head.opts.EnableMemorySnapshotOnShutdown = true
w, _ = wlog.NewSize(nil, nil, head.wal.Dir(), 32768, wlog.CompressionNone)
w, _ = wlog.NewSize(nil, nil, head.wal.Dir(), 32768, compression.None)
head, err = NewHead(nil, nil, w, nil, head.opts, nil)
require.NoError(t, err)
require.NoError(t, head.Init(math.MinInt64))
@ -5865,7 +5866,7 @@ func BenchmarkCuttingHeadHistogramChunks(b *testing.B) {
)
samples := histogram.GenerateBigTestHistograms(numSamples, numBuckets)
h, _ := newTestHead(b, DefaultBlockDuration, wlog.CompressionNone, false)
h, _ := newTestHead(b, DefaultBlockDuration, compression.None, false)
defer func() {
require.NoError(b, h.Close())
}()
@ -5982,7 +5983,7 @@ func TestCuttingNewHeadChunks(t *testing.T) {
}
for testName, tc := range testCases {
t.Run(testName, func(t *testing.T) {
h, _ := newTestHead(t, DefaultBlockDuration, wlog.CompressionNone, false)
h, _ := newTestHead(t, DefaultBlockDuration, compression.None, false)
defer func() {
require.NoError(t, h.Close())
}()
@ -6050,7 +6051,7 @@ func TestHeadDetectsDuplicateSampleAtSizeLimit(t *testing.T) {
numSamples := 1000
baseTS := int64(1695209650)
h, _ := newTestHead(t, DefaultBlockDuration, wlog.CompressionNone, false)
h, _ := newTestHead(t, DefaultBlockDuration, compression.None, false)
defer func() {
require.NoError(t, h.Close())
}()
@ -6117,7 +6118,7 @@ func TestWALSampleAndExemplarOrder(t *testing.T) {
for testName, tc := range testcases {
t.Run(testName, func(t *testing.T) {
h, w := newTestHead(t, 1000, wlog.CompressionNone, false)
h, w := newTestHead(t, 1000, compression.None, false)
defer func() {
require.NoError(t, h.Close())
}()
@ -6154,7 +6155,7 @@ func TestWALSampleAndExemplarOrder(t *testing.T) {
// `signal SIGSEGV: segmentation violation code=0x1 addr=0x20 pc=0xbb03d1`
// panic, that we have seen in the wild once.
func TestHeadCompactionWhileAppendAndCommitExemplar(t *testing.T) {
h, _ := newTestHead(t, DefaultBlockDuration, wlog.CompressionNone, false)
h, _ := newTestHead(t, DefaultBlockDuration, compression.None, false)
app := h.Appender(context.Background())
lbls := labels.FromStrings("foo", "bar")
ref, err := app.Append(0, lbls, 1, 1)
@ -6267,7 +6268,7 @@ func TestPostingsCardinalityStats(t *testing.T) {
}
func TestHeadAppender_AppendFloatWithSameTimestampAsPreviousHistogram(t *testing.T) {
head, _ := newTestHead(t, DefaultBlockDuration, wlog.CompressionNone, false)
head, _ := newTestHead(t, DefaultBlockDuration, compression.None, false)
t.Cleanup(func() { head.Close() })
ls := labels.FromStrings(labels.MetricName, "test")
@ -6482,7 +6483,7 @@ func TestHeadAppender_AppendCTZeroSample(t *testing.T) {
},
} {
t.Run(tc.name, func(t *testing.T) {
h, _ := newTestHead(t, DefaultBlockDuration, wlog.CompressionNone, false)
h, _ := newTestHead(t, DefaultBlockDuration, compression.None, false)
defer func() {
require.NoError(t, h.Close())
}()
@ -6567,7 +6568,7 @@ func TestHeadAppender_AppendWithCT(t *testing.T) {
},
} {
t.Run(tc.name, func(t *testing.T) {
h, w := newTestHead(t, DefaultBlockDuration, wlog.CompressionNone, false)
h, w := newTestHead(t, DefaultBlockDuration, compression.None, false)
defer func() {
require.NoError(t, h.Close())
}()
@ -6596,7 +6597,7 @@ func TestHeadCompactableDoesNotCompactEmptyHead(t *testing.T) {
// was compactable using default values for min and max times, `Head.compactable()`
// would return true which is incorrect. This test verifies that we short-circuit
// the check when the head has not yet had any samples added.
head, _ := newTestHead(t, 1, wlog.CompressionNone, false)
head, _ := newTestHead(t, 1, compression.None, false)
defer func() {
require.NoError(t, head.Close())
}()
@ -6638,7 +6639,7 @@ func TestHeadAppendHistogramAndCommitConcurrency(t *testing.T) {
}
func testHeadAppendHistogramAndCommitConcurrency(t *testing.T, appendFn func(storage.Appender, int) error) {
head, _ := newTestHead(t, 1000, wlog.CompressionNone, false)
head, _ := newTestHead(t, 1000, compression.None, false)
defer func() {
require.NoError(t, head.Close())
}()

View file

@ -28,7 +28,7 @@ import (
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb/chunkenc"
"github.com/prometheus/prometheus/tsdb/chunks"
"github.com/prometheus/prometheus/tsdb/wlog"
"github.com/prometheus/prometheus/tsdb/compression"
)
type chunkInterval struct {
@ -300,7 +300,7 @@ func TestOOOHeadIndexReader_Series(t *testing.T) {
for perm, intervals := range permutations {
for _, headChunk := range []bool{false, true} {
t.Run(fmt.Sprintf("name=%s, permutation=%d, headChunk=%t", tc.name, perm, headChunk), func(t *testing.T) {
h, _ := newTestHead(t, 1000, wlog.CompressionNone, true)
h, _ := newTestHead(t, 1000, compression.None, true)
defer func() {
require.NoError(t, h.Close())
}()
@ -388,7 +388,7 @@ func TestOOOHeadChunkReader_LabelValues(t *testing.T) {
//nolint:revive // unexported-return.
func testOOOHeadChunkReader_LabelValues(t *testing.T, scenario sampleTypeScenario) {
chunkRange := int64(2000)
head, _ := newTestHead(t, chunkRange, wlog.CompressionNone, true)
head, _ := newTestHead(t, chunkRange, compression.None, true)
head.opts.EnableOOONativeHistograms.Store(true)
t.Cleanup(func() { require.NoError(t, head.Close()) })

172
tsdb/record/bench_test.go Normal file
View file

@ -0,0 +1,172 @@
// Copyright 2025 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package record_test
import (
"fmt"
"testing"
"github.com/stretchr/testify/require"
"github.com/prometheus/prometheus/tsdb/compression"
"github.com/prometheus/prometheus/tsdb/record"
"github.com/prometheus/prometheus/util/testrecord"
)
func TestEncodeDecode(t *testing.T) {
for _, tcase := range []testrecord.RefSamplesCase{
testrecord.Realistic1000Samples,
testrecord.Realistic1000WithCTSamples,
testrecord.WorstCase1000Samples,
} {
var (
enc record.Encoder
dec record.Decoder
buf []byte
)
s := testrecord.GenTestRefSamplesCase(t, tcase)
{
got, err := dec.Samples(enc.Samples(s, nil), nil)
require.NoError(t, err)
require.Equal(t, s, got)
}
// With byte buffer (append!)
{
buf = make([]byte, 10, 1e5)
got, err := dec.Samples(enc.Samples(s, buf)[10:], nil)
require.NoError(t, err)
require.Equal(t, s, got)
}
// With sample slice
{
samples := make([]record.RefSample, 0, len(s)+1)
got, err := dec.Samples(enc.Samples(s, nil), samples)
require.NoError(t, err)
require.Equal(t, s, got)
}
// With compression.
{
buf := enc.Samples(s, nil)
cEnc, err := compression.NewEncoder()
require.NoError(t, err)
buf, _, err = cEnc.Encode(compression.Zstd, buf, nil)
require.NoError(t, err)
buf, err = compression.NewDecoder().Decode(compression.Zstd, buf, nil)
require.NoError(t, err)
got, err := dec.Samples(buf, nil)
require.NoError(t, err)
require.Equal(t, s, got)
}
}
}
var (
compressions = []compression.Type{compression.None, compression.Snappy, compression.Zstd}
dataCases = []testrecord.RefSamplesCase{
testrecord.Realistic1000Samples,
testrecord.Realistic1000WithCTSamples,
testrecord.WorstCase1000Samples,
}
)
/*
export bench=encode-v1 && go test ./tsdb/record/... \
-run '^$' -bench '^BenchmarkEncode_Samples' \
-benchtime 5s -count 6 -cpu 2 -timeout 999m \
| tee ${bench}.txt
*/
func BenchmarkEncode_Samples(b *testing.B) {
for _, compr := range compressions {
for _, data := range dataCases {
b.Run(fmt.Sprintf("compr=%v/data=%v", compr, data), func(b *testing.B) {
var (
samples = testrecord.GenTestRefSamplesCase(b, data)
enc record.Encoder
buf []byte
cBuf []byte
)
cEnc, err := compression.NewEncoder()
require.NoError(b, err)
// Warm up.
buf = enc.Samples(samples, buf[:0])
cBuf, _, err = cEnc.Encode(compr, buf, cBuf[:0])
require.NoError(b, err)
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
buf = enc.Samples(samples, buf[:0])
b.ReportMetric(float64(len(buf)), "B/rec")
cBuf, _, _ = cEnc.Encode(compr, buf, cBuf[:0])
b.ReportMetric(float64(len(cBuf)), "B/compressed-rec")
}
})
}
}
}
/*
export bench=decode-v1 && go test ./tsdb/record/... \
-run '^$' -bench '^BenchmarkDecode_Samples' \
-benchtime 5s -count 6 -cpu 2 -timeout 999m \
| tee ${bench}.txt
*/
func BenchmarkDecode_Samples(b *testing.B) {
for _, compr := range compressions {
for _, data := range dataCases {
b.Run(fmt.Sprintf("compr=%v/data=%v", compr, data), func(b *testing.B) {
var (
samples = testrecord.GenTestRefSamplesCase(b, data)
enc record.Encoder
dec record.Decoder
cDec = compression.NewDecoder()
cBuf []byte
samplesBuf []record.RefSample
)
buf := enc.Samples(samples, nil)
cEnc, err := compression.NewEncoder()
require.NoError(b, err)
buf, _, err = cEnc.Encode(compr, buf, nil)
require.NoError(b, err)
// Warm up.
cBuf, err = cDec.Decode(compr, buf, cBuf[:0])
require.NoError(b, err)
samplesBuf, err = dec.Samples(cBuf, samplesBuf[:0])
require.NoError(b, err)
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
cBuf, _ = cDec.Decode(compr, buf, cBuf[:0])
samplesBuf, _ = dec.Samples(cBuf, samplesBuf[:0])
}
})
}
}
}

View file

@ -163,7 +163,7 @@ type RefSeries struct {
// TODO(beorn7): Perhaps make this "polymorphic", including histogram and float-histogram pointers? Then get rid of RefHistogramSample.
type RefSample struct {
Ref chunks.HeadSeriesRef
T, CT int64
CT, T int64
V float64
}
@ -361,23 +361,23 @@ func (d *Decoder) samplesWithCT(dec *encoding.Decbuf, samples []RefSample) ([]Re
}
var (
baseRef = dec.Be64()
baseTime = dec.Be64int64()
baseCT = dec.Be64int64()
baseTime = dec.Be64int64()
)
// Allow 1 byte for each varint and 8 for the value; the output slice must be at least that big.
if minSize := dec.Len() / (1 + 1 + 8); cap(samples) < minSize {
if minSize := dec.Len() / (1 + 1 + 1 + 8); cap(samples) < minSize {
samples = make([]RefSample, 0, minSize)
}
for len(dec.B) > 0 && dec.Err() == nil {
dref := dec.Varint64()
dtime := dec.Varint64()
dCT := dec.Varint64()
dtime := dec.Varint64()
val := dec.Be64()
samples = append(samples, RefSample{
Ref: chunks.HeadSeriesRef(int64(baseRef) + dref),
T: baseTime + dtime,
CT: baseCT + dCT,
T: baseTime + dtime,
V: math.Float64frombits(val),
})
}
@ -763,13 +763,13 @@ func (e *Encoder) samplesWithCT(samples []RefSample, b []byte) []byte {
first := samples[0]
buf.PutBE64(uint64(first.Ref))
buf.PutBE64int64(first.T)
buf.PutBE64int64(first.CT)
buf.PutBE64int64(first.T)
for _, s := range samples {
buf.PutVarint64(int64(s.Ref) - int64(first.Ref))
buf.PutVarint64(s.T - first.T)
buf.PutVarint64(s.CT - first.CT)
buf.PutVarint64(s.T - first.T)
buf.PutBE64(math.Float64bits(s.V))
}
return buf.Get()

View file

@ -75,6 +75,7 @@ func TestRecord_EncodeDecode(t *testing.T) {
require.NoError(t, err)
require.Equal(t, metadata, decMetadata)
// Without CT.
samples := []RefSample{
{Ref: 0, T: 12423423, V: 1.2345},
{Ref: 123, T: -1231, V: -123},
@ -84,6 +85,7 @@ func TestRecord_EncodeDecode(t *testing.T) {
require.NoError(t, err)
require.Equal(t, samples, decSamples)
// With CT.
samplesWithCT := []RefSample{
{Ref: 0, T: 12423423, CT: 14, V: 1.2345},
{Ref: 123, T: -1231, CT: 14, V: -123},
@ -486,7 +488,7 @@ type recordsMaker struct {
}
// BenchmarkWAL_HistogramEncoding measures efficiency of encoding classic
// histograms and native historgrams with custom buckets (NHCB).
// histograms and native histograms with custom buckets (NHCB).
func BenchmarkWAL_HistogramEncoding(b *testing.B) {
initClassicRefs := func(labelCount, histograms, buckets int) (series []RefSeries, floatSamples []RefSample, histSamples []RefHistogramSample) {
ref := chunks.HeadSeriesRef(0)

View file

@ -25,6 +25,8 @@ import (
"github.com/stretchr/testify/require"
"github.com/prometheus/prometheus/tsdb/compression"
"github.com/prometheus/common/promslog"
"github.com/prometheus/prometheus/model/histogram"
@ -170,7 +172,7 @@ func TestCheckpoint(t *testing.T) {
}
}
for _, compress := range []CompressionType{CompressionNone, CompressionSnappy, CompressionZstd} {
for _, compress := range []compression.Type{compression.None, compression.Snappy, compression.Zstd} {
t.Run(fmt.Sprintf("compress=%s", compress), func(t *testing.T) {
dir := t.TempDir()
@ -385,7 +387,7 @@ func TestCheckpoint(t *testing.T) {
func TestCheckpointNoTmpFolderAfterError(t *testing.T) {
// Create a new wlog with invalid data.
dir := t.TempDir()
w, err := NewSize(nil, nil, dir, 64*1024, CompressionNone)
w, err := NewSize(nil, nil, dir, 64*1024, compression.None)
require.NoError(t, err)
var enc record.Encoder
require.NoError(t, w.Log(enc.Series([]record.RefSeries{

View file

@ -21,8 +21,7 @@ import (
"hash/crc32"
"io"
"github.com/golang/snappy"
"github.com/klauspost/compress/zstd"
"github.com/prometheus/prometheus/tsdb/compression"
)
// Reader reads WAL records from an io.Reader.
@ -31,7 +30,7 @@ type Reader struct {
err error
rec []byte
compressBuf []byte
zstdReader *zstd.Decoder
cDec *compression.Decoder
buf [pageSize]byte
total int64 // Total bytes processed.
curRecTyp recType // Used for checking that the last record is not torn.
@ -39,9 +38,7 @@ type Reader struct {
// NewReader returns a new reader.
func NewReader(r io.Reader) *Reader {
// Calling zstd.NewReader with a nil io.Reader and no options cannot return an error.
zstdReader, _ := zstd.NewReader(nil)
return &Reader{rdr: r, zstdReader: zstdReader}
return &Reader{rdr: r, cDec: compression.NewDecoder()}
}
// Next advances the reader to the next records and returns true if it exists.
@ -67,7 +64,6 @@ func (r *Reader) next() (err error) {
hdr := r.buf[:recordHeaderSize]
buf := r.buf[recordHeaderSize:]
r.rec = r.rec[:0]
r.compressBuf = r.compressBuf[:0]
i := 0
@ -77,8 +73,13 @@ func (r *Reader) next() (err error) {
}
r.total++
r.curRecTyp = recTypeFromHeader(hdr[0])
isSnappyCompressed := hdr[0]&snappyMask == snappyMask
isZstdCompressed := hdr[0]&zstdMask == zstdMask
// TODO(bwplotka): Handle unknown compressions.
compr := compression.None
if hdr[0]&snappyMask == snappyMask {
compr = compression.Snappy
} else if hdr[0]&zstdMask == zstdMask {
compr = compression.Zstd
}
// Gobble up zero bytes.
if r.curRecTyp == recPageTerm {
@ -133,26 +134,14 @@ func (r *Reader) next() (err error) {
if c := crc32.Checksum(buf[:length], castagnoliTable); c != crc {
return fmt.Errorf("unexpected checksum %x, expected %x", c, crc)
}
if isSnappyCompressed || isZstdCompressed {
r.compressBuf = append(r.compressBuf, buf[:length]...)
} else {
r.rec = append(r.rec, buf[:length]...)
}
if err := validateRecord(r.curRecTyp, i); err != nil {
return err
}
r.compressBuf = append(r.compressBuf, buf[:length]...)
if r.curRecTyp == recLast || r.curRecTyp == recFull {
if isSnappyCompressed && len(r.compressBuf) > 0 {
// The snappy library uses `len` to calculate if we need a new buffer.
// In order to allocate as few buffers as possible make the length
// equal to the capacity.
r.rec = r.rec[:cap(r.rec)]
r.rec, err = snappy.Decode(r.rec, r.compressBuf)
return err
} else if isZstdCompressed && len(r.compressBuf) > 0 {
r.rec, err = r.zstdReader.DecodeAll(r.compressBuf, r.rec[:0])
r.rec, err = r.cDec.Decode(compr, r.compressBuf, r.rec[:0])
if err != nil {
return err
}
return nil

View file

@ -31,6 +31,8 @@ import (
"github.com/stretchr/testify/require"
"github.com/prometheus/prometheus/tsdb/compression"
"github.com/prometheus/common/promslog"
tsdb_errors "github.com/prometheus/prometheus/tsdb/errors"
@ -315,7 +317,7 @@ func allSegments(dir string) (io.ReadCloser, error) {
func TestReaderFuzz(t *testing.T) {
for name, fn := range readerConstructors {
for _, compress := range []CompressionType{CompressionNone, CompressionSnappy, CompressionZstd} {
for _, compress := range []compression.Type{compression.None, compression.Snappy, compression.Zstd} {
t.Run(fmt.Sprintf("%s,compress=%s", name, compress), func(t *testing.T) {
dir := t.TempDir()
@ -354,7 +356,7 @@ func TestReaderFuzz(t *testing.T) {
func TestReaderFuzz_Live(t *testing.T) {
logger := promslog.NewNopLogger()
for _, compress := range []CompressionType{CompressionNone, CompressionSnappy, CompressionZstd} {
for _, compress := range []compression.Type{compression.None, compression.Snappy, compression.Zstd} {
t.Run(fmt.Sprintf("compress=%s", compress), func(t *testing.T) {
dir := t.TempDir()
@ -444,7 +446,7 @@ func TestLiveReaderCorrupt_ShortFile(t *testing.T) {
logger := promslog.NewNopLogger()
dir := t.TempDir()
w, err := NewSize(nil, nil, dir, pageSize, CompressionNone)
w, err := NewSize(nil, nil, dir, pageSize, compression.None)
require.NoError(t, err)
rec := make([]byte, pageSize-recordHeaderSize)
@ -484,7 +486,7 @@ func TestLiveReaderCorrupt_RecordTooLongAndShort(t *testing.T) {
logger := promslog.NewNopLogger()
dir := t.TempDir()
w, err := NewSize(nil, nil, dir, pageSize*2, CompressionNone)
w, err := NewSize(nil, nil, dir, pageSize*2, compression.None)
require.NoError(t, err)
rec := make([]byte, pageSize-recordHeaderSize)
@ -531,7 +533,7 @@ func TestReaderData(t *testing.T) {
for name, fn := range readerConstructors {
t.Run(name, func(t *testing.T) {
w, err := New(nil, nil, dir, CompressionSnappy)
w, err := New(nil, nil, dir, compression.Snappy)
require.NoError(t, err)
sr, err := allSegments(dir)

View file

@ -18,7 +18,6 @@ import (
"fmt"
"io"
"log/slog"
"math"
"os"
"path/filepath"
"strconv"
@ -28,6 +27,8 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/promslog"
"github.com/prometheus/prometheus/util/zeropool"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/timestamp"
"github.com/prometheus/prometheus/tsdb/record"
@ -109,6 +110,9 @@ type Watcher struct {
// For testing, stop when we hit this segment.
MaxSegment int
readSeriesPool zeropool.Pool[[]record.RefSeries]
readSamplesPool zeropool.Pool[[]record.RefSample]
}
func NewWatcherMetrics(reg prometheus.Registerer) *WatcherMetrics {
@ -332,26 +336,6 @@ func (w *Watcher) findSegmentForIndex(index int) (int, error) {
return -1, errors.New("failed to find segment for index")
}
func (w *Watcher) readAndHandleError(r *LiveReader, segmentNum int, tail bool, size int64) error {
err := w.readSegment(r, segmentNum, tail)
// Ignore all errors reading to end of segment whilst replaying the WAL.
if !tail {
if err != nil && !errors.Is(err, io.EOF) {
w.logger.Warn("Ignoring error reading to end of segment, may have dropped data", "segment", segmentNum, "err", err)
} else if r.Offset() != size {
w.logger.Warn("Expected to have read whole segment, may have dropped data", "segment", segmentNum, "read", r.Offset(), "size", size)
}
return ErrIgnorable
}
// Otherwise, when we are tailing, non-EOFs are fatal.
if err != nil && !errors.Is(err, io.EOF) {
return err
}
return nil
}
// Use tail true to indicate that the reader is currently on a segment that is
// actively being written to. If false, assume it's a full segment and we're
// replaying it on start to cache the series records.
@ -364,15 +348,18 @@ func (w *Watcher) watch(segmentNum int, tail bool) error {
reader := NewLiveReader(w.logger, w.readerMetrics, segment)
size := int64(math.MaxInt64)
if !tail {
var err error
size, err = getSegmentSize(w.walDir, segmentNum)
size, err := getSegmentSize(w.walDir, segmentNum)
if err != nil {
return fmt.Errorf("getSegmentSize: %w", err)
}
return w.readAndHandleError(reader, segmentNum, tail, size)
// Ignore all errors reading to end of segment whilst replaying the WAL.
if err := w.readSegmentSeries(reader, segmentNum); err != nil && !errors.Is(err, io.EOF) {
w.logger.Warn("Ignoring error reading to end of segment, may have dropped data", "segment", segmentNum, "err", err)
} else if reader.Offset() != size {
w.logger.Warn("Expected to have read whole segment, may have dropped data", "segment", segmentNum, "read", reader.Offset(), "size", size)
}
return ErrIgnorable
}
checkpointTicker := time.NewTicker(checkpointPeriod)
@ -418,23 +405,21 @@ func (w *Watcher) watch(segmentNum int, tail bool) error {
}
if last > segmentNum {
return w.readAndHandleError(reader, segmentNum, tail, size)
return w.readSegment(reader, segmentNum)
}
continue
// we haven't read due to a notification in quite some time, try reading anyways
case <-readTicker.C:
w.logger.Debug("Watcher is reading the WAL due to timeout, haven't received any write notifications recently", "timeout", readTimeout)
err := w.readAndHandleError(reader, segmentNum, tail, size)
if err != nil {
if err := w.readSegment(reader, segmentNum); err != nil {
return err
}
// reset the ticker so we don't read too often
readTicker.Reset(readTimeout)
case <-w.readNotify:
err := w.readAndHandleError(reader, segmentNum, tail, size)
if err != nil {
if err := w.readSegment(reader, segmentNum); err != nil {
return err
}
// reset the ticker so we don't read too often
@ -475,20 +460,11 @@ func (w *Watcher) garbageCollectSeries(segmentNum int) error {
return nil
}
// Read from a segment and pass the details to w.writer.
// Also used with readCheckpoint - implements segmentReadFn.
func (w *Watcher) readSegment(r *LiveReader, segmentNum int, tail bool) error {
// readSegmentSeries reads the series records into w.writer from a segment.
func (w *Watcher) readSegmentSeries(r *LiveReader, segmentNum int) error {
var (
dec = record.NewDecoder(labels.NewSymbolTable()) // One table per WAL segment means it won't grow indefinitely.
series []record.RefSeries
samples []record.RefSample
samplesToSend []record.RefSample
exemplars []record.RefExemplar
histograms []record.RefHistogramSample
histogramsToSend []record.RefHistogramSample
floatHistograms []record.RefFloatHistogramSample
floatHistogramsToSend []record.RefFloatHistogramSample
metadata []record.RefMetadata
dec = record.NewDecoder(labels.NewSymbolTable()) // One table per WAL segment means it won't grow indefinitely.
series []record.RefSeries
)
for r.Next() && !isClosed(w.quit) {
rec := r.Record()
@ -502,14 +478,57 @@ func (w *Watcher) readSegment(r *LiveReader, segmentNum int, tail bool) error {
return err
}
w.writer.StoreSeries(series, segmentNum)
case record.Unknown:
// Could be corruption, or reading from a WAL from a newer Prometheus.
w.recordDecodeFailsMetric.Inc()
default:
// We're not interested in other types of records.
}
}
if err := r.Err(); err != nil && !errors.Is(err, io.EOF) {
return fmt.Errorf("segment %d: %w", segmentNum, err)
}
return nil
}
// readSegment reads all known records into w.writer from a segment.
func (w *Watcher) readSegment(r *LiveReader, segmentNum int) (err error) {
var (
dec = record.NewDecoder(labels.NewSymbolTable()) // One table per WAL segment means it won't grow indefinitely.
series = w.readSeriesPool.Get()[:0]
samples = w.readSamplesPool.Get()[:0]
samplesToSend = w.readSamplesPool.Get()[:0]
// TODO(bwplotka): Add similar pools to exemplars, histograms and meta.
exemplars []record.RefExemplar
histograms []record.RefHistogramSample
histogramsToSend []record.RefHistogramSample
floatHistograms []record.RefFloatHistogramSample
floatHistogramsToSend []record.RefFloatHistogramSample
metadata []record.RefMetadata
)
defer func() {
// NOTE(bwplotka): This assumes no-one uses those arrays after calling WriteTo methods.
w.readSeriesPool.Put(series)
w.readSamplesPool.Put(samples)
w.readSamplesPool.Put(samplesToSend)
}()
for r.Next() && !isClosed(w.quit) {
rec := r.Record()
w.recordsReadMetric.WithLabelValues(dec.Type(rec).String()).Inc()
switch dec.Type(rec) {
case record.Series:
series, err = dec.Series(rec, series[:0])
if err != nil {
w.recordDecodeFailsMetric.Inc()
return err
}
w.writer.StoreSeries(series, segmentNum)
case record.Samples, record.SamplesWithCT:
// If we're not tailing a segment we can ignore any samples records we see.
// This speeds up replay of the WAL by > 10x.
if !tail {
break
}
samples, err := dec.Samples(rec, samples[:0])
samples, err = dec.Samples(rec, samples[:0])
if err != nil {
w.recordDecodeFailsMetric.Inc()
return err
@ -534,12 +553,7 @@ func (w *Watcher) readSegment(r *LiveReader, segmentNum int, tail bool) error {
if !w.sendExemplars {
break
}
// If we're not tailing a segment we can ignore any exemplars records we see.
// This speeds up replay of the WAL significantly.
if !tail {
break
}
exemplars, err := dec.Exemplars(rec, exemplars[:0])
exemplars, err = dec.Exemplars(rec, exemplars[:0])
if err != nil {
w.recordDecodeFailsMetric.Inc()
return err
@ -551,10 +565,7 @@ func (w *Watcher) readSegment(r *LiveReader, segmentNum int, tail bool) error {
if !w.sendHistograms {
break
}
if !tail {
break
}
histograms, err := dec.HistogramSamples(rec, histograms[:0])
histograms, err = dec.HistogramSamples(rec, histograms[:0])
if err != nil {
w.recordDecodeFailsMetric.Inc()
return err
@ -579,10 +590,7 @@ func (w *Watcher) readSegment(r *LiveReader, segmentNum int, tail bool) error {
if !w.sendHistograms {
break
}
if !tail {
break
}
floatHistograms, err := dec.FloatHistogramSamples(rec, floatHistograms[:0])
floatHistograms, err = dec.FloatHistogramSamples(rec, floatHistograms[:0])
if err != nil {
w.recordDecodeFailsMetric.Inc()
return err
@ -621,15 +629,14 @@ func (w *Watcher) readSegment(r *LiveReader, segmentNum int, tail bool) error {
// We're not interested in other types of records.
}
}
if err := r.Err(); err != nil {
if err := r.Err(); err != nil && !errors.Is(err, io.EOF) {
return fmt.Errorf("segment %d: %w", segmentNum, err)
}
return nil
}
// Go through all series in a segment updating the segmentNum, so we can delete older series.
// Used with readCheckpoint - implements segmentReadFn.
func (w *Watcher) readSegmentForGC(r *LiveReader, segmentNum int, _ bool) error {
// readSegmentForGC goes through all series in a segment updating the segmentNum, so we can delete older series.
func (w *Watcher) readSegmentForGC(r *LiveReader, segmentNum int) error {
var (
dec = record.NewDecoder(labels.NewSymbolTable()) // Needed for decoding; labels do not outlive this function.
series []record.RefSeries
@ -655,7 +662,7 @@ func (w *Watcher) readSegmentForGC(r *LiveReader, segmentNum int, _ bool) error
// We're only interested in series.
}
}
if err := r.Err(); err != nil {
if err := r.Err(); err != nil && !errors.Is(err, io.EOF) {
return fmt.Errorf("segment %d: %w", segmentNum, err)
}
return nil
@ -666,7 +673,7 @@ func (w *Watcher) SetStartTime(t time.Time) {
w.startTimestamp = timestamp.FromTime(t)
}
type segmentReadFn func(w *Watcher, r *LiveReader, segmentNum int, tail bool) error
type segmentReadFn func(w *Watcher, r *LiveReader, segmentNum int) error
// Read all the series records from a Checkpoint directory.
func (w *Watcher) readCheckpoint(checkpointDir string, readFn segmentReadFn) error {
@ -693,7 +700,7 @@ func (w *Watcher) readCheckpoint(checkpointDir string, readFn segmentReadFn) err
}
r := NewLiveReader(w.logger, w.readerMetrics, sr)
err = readFn(w, r, index, false)
err = readFn(w, r, index)
sr.Close()
if err != nil && !errors.Is(err, io.EOF) {
return fmt.Errorf("readSegment: %w", err)

View file

@ -14,6 +14,8 @@ package wlog
import (
"fmt"
"log/slog"
"math"
"math/rand"
"os"
"path"
@ -27,9 +29,12 @@ import (
"github.com/stretchr/testify/require"
"golang.org/x/sync/errgroup"
"github.com/prometheus/prometheus/util/testrecord"
"github.com/prometheus/prometheus/model/histogram"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/tsdb/chunks"
"github.com/prometheus/prometheus/tsdb/compression"
"github.com/prometheus/prometheus/tsdb/record"
)
@ -123,7 +128,7 @@ func (wtm *writeToMock) SeriesReset(index int) {
}
}
func (wtm *writeToMock) checkNumSeries() int {
func (wtm *writeToMock) seriesStored() int {
wtm.seriesLock.Lock()
defer wtm.seriesLock.Unlock()
return len(wtm.seriesSegmentIndexes)
@ -142,7 +147,7 @@ func TestTailSamples(t *testing.T) {
const samplesCount = 250
const exemplarsCount = 25
const histogramsCount = 50
for _, compress := range []CompressionType{CompressionNone, CompressionSnappy, CompressionZstd} {
for _, compress := range []compression.Type{compression.None, compression.Snappy, compression.Zstd} {
t.Run(fmt.Sprintf("compress=%s", compress), func(t *testing.T) {
now := time.Now()
@ -264,8 +269,7 @@ func TestTailSamples(t *testing.T) {
require.NoError(t, err)
reader := NewLiveReader(nil, NewLiveReaderMetrics(nil), segment)
// Use tail true so we can ensure we got the right number of samples.
watcher.readSegment(reader, i, true)
require.NoError(t, watcher.readSegment(reader, i))
require.NoError(t, segment.Close())
}
@ -274,9 +278,9 @@ func TestTailSamples(t *testing.T) {
expectedExemplars := seriesCount * exemplarsCount
expectedHistograms := seriesCount * histogramsCount * 2
retry(t, defaultRetryInterval, defaultRetries, func() bool {
return wt.checkNumSeries() >= expectedSeries
return wt.seriesStored() >= expectedSeries
})
require.Equal(t, expectedSeries, wt.checkNumSeries(), "did not receive the expected number of series")
require.Equal(t, expectedSeries, wt.seriesStored(), "did not receive the expected number of series")
require.Equal(t, expectedSamples, wt.samplesAppended, "did not receive the expected number of samples")
require.Equal(t, expectedExemplars, wt.exemplarsAppended, "did not receive the expected number of exemplars")
require.Equal(t, expectedHistograms, wt.histogramsAppended, "did not receive the expected number of histograms")
@ -290,7 +294,7 @@ func TestReadToEndNoCheckpoint(t *testing.T) {
const seriesCount = 10
const samplesCount = 250
for _, compress := range []CompressionType{CompressionNone, CompressionSnappy, CompressionZstd} {
for _, compress := range []compression.Type{compression.None, compression.Snappy, compression.Zstd} {
t.Run(fmt.Sprintf("compress=%s", compress), func(t *testing.T) {
dir := t.TempDir()
wdir := path.Join(dir, "wal")
@ -344,7 +348,7 @@ func TestReadToEndNoCheckpoint(t *testing.T) {
expected := seriesCount
require.Eventually(t, func() bool {
return wt.checkNumSeries() == expected
return wt.seriesStored() == expected
}, 20*time.Second, 1*time.Second)
watcher.Stop()
})
@ -358,7 +362,7 @@ func TestReadToEndWithCheckpoint(t *testing.T) {
const seriesCount = 10
const samplesCount = 250
for _, compress := range []CompressionType{CompressionNone, CompressionSnappy, CompressionZstd} {
for _, compress := range []compression.Type{compression.None, compression.Snappy, compression.Zstd} {
t.Run(fmt.Sprintf("compress=%s", compress), func(t *testing.T) {
dir := t.TempDir()
@ -434,7 +438,7 @@ func TestReadToEndWithCheckpoint(t *testing.T) {
expected := seriesCount * 2
require.Eventually(t, func() bool {
return wt.checkNumSeries() == expected
return wt.seriesStored() == expected
}, 10*time.Second, 1*time.Second)
watcher.Stop()
})
@ -446,7 +450,7 @@ func TestReadCheckpoint(t *testing.T) {
const seriesCount = 10
const samplesCount = 250
for _, compress := range []CompressionType{CompressionNone, CompressionSnappy, CompressionZstd} {
for _, compress := range []compression.Type{compression.None, compression.Snappy, compression.Zstd} {
t.Run(fmt.Sprintf("compress=%s", compress), func(t *testing.T) {
dir := t.TempDir()
@ -504,10 +508,10 @@ func TestReadCheckpoint(t *testing.T) {
expectedSeries := seriesCount
retry(t, defaultRetryInterval, defaultRetries, func() bool {
return wt.checkNumSeries() >= expectedSeries
return wt.seriesStored() >= expectedSeries
})
watcher.Stop()
require.Equal(t, expectedSeries, wt.checkNumSeries())
require.Equal(t, expectedSeries, wt.seriesStored())
})
}
}
@ -519,7 +523,7 @@ func TestReadCheckpointMultipleSegments(t *testing.T) {
const seriesCount = 20
const samplesCount = 300
for _, compress := range []CompressionType{CompressionNone, CompressionSnappy, CompressionZstd} {
for _, compress := range []compression.Type{compression.None, compression.Snappy, compression.Zstd} {
t.Run(fmt.Sprintf("compress=%s", compress), func(t *testing.T) {
dir := t.TempDir()
@ -590,11 +594,11 @@ func TestCheckpointSeriesReset(t *testing.T) {
const seriesCount = 20
const samplesCount = 350
testCases := []struct {
compress CompressionType
compress compression.Type
segments int
}{
{compress: CompressionNone, segments: 14},
{compress: CompressionSnappy, segments: 13},
{compress: compression.None, segments: 14},
{compress: compression.Snappy, segments: 13},
}
for _, tc := range testCases {
@ -647,10 +651,10 @@ func TestCheckpointSeriesReset(t *testing.T) {
expected := seriesCount
retry(t, defaultRetryInterval, defaultRetries, func() bool {
return wt.checkNumSeries() >= expected
return wt.seriesStored() >= expected
})
require.Eventually(t, func() bool {
return wt.checkNumSeries() == seriesCount
return wt.seriesStored() == seriesCount
}, 10*time.Second, 1*time.Second)
_, err = Checkpoint(promslog.NewNopLogger(), w, 2, 4, func(_ chunks.HeadSeriesRef) bool { return true }, 0)
@ -669,7 +673,7 @@ func TestCheckpointSeriesReset(t *testing.T) {
// many series records you end up with and change the last Equals check accordingly
// or modify the Equals to Assert(len(wt.seriesLabels) < seriesCount*10)
require.Eventually(t, func() bool {
return wt.checkNumSeries() == tc.segments
return wt.seriesStored() == tc.segments
}, 20*time.Second, 1*time.Second)
})
}
@ -681,7 +685,7 @@ func TestRun_StartupTime(t *testing.T) {
const seriesCount = 20
const samplesCount = 300
for _, compress := range []CompressionType{CompressionNone, CompressionSnappy, CompressionZstd} {
for _, compress := range []compression.Type{compression.None, compression.Snappy, compression.Zstd} {
t.Run(string(compress), func(t *testing.T) {
dir := t.TempDir()
@ -774,7 +778,7 @@ func TestRun_AvoidNotifyWhenBehind(t *testing.T) {
const seriesCount = 10
const samplesCount = 50
for _, compress := range []CompressionType{CompressionNone, CompressionSnappy, CompressionZstd} {
for _, compress := range []compression.Type{compression.None, compression.Snappy, compression.Zstd} {
t.Run(string(compress), func(t *testing.T) {
dir := t.TempDir()
@ -812,7 +816,7 @@ func TestRun_AvoidNotifyWhenBehind(t *testing.T) {
// The watcher went through 00000000 and is tailing the next one.
retry(t, defaultRetryInterval, defaultRetries, func() bool {
return wt.checkNumSeries() == seriesCount
return wt.seriesStored() == seriesCount
})
// In the meantime, add some new segments in bulk.
@ -826,9 +830,120 @@ func TestRun_AvoidNotifyWhenBehind(t *testing.T) {
require.NoError(t, g.Wait())
// All series and samples were read.
require.Equal(t, (segmentsToRead+1)*seriesCount, wt.checkNumSeries()) // Series from 00000000 are also read.
require.Equal(t, (segmentsToRead+1)*seriesCount, wt.seriesStored()) // Series from 00000000 are also read.
require.Equal(t, segmentsToRead*seriesCount*samplesCount, wt.samplesAppended)
require.NoError(t, w.Close())
})
}
}
const (
seriesRecords = 100 // Targets * Scrapes
seriesPerRecord = 10 // New series per scrape.
sampleRecords = seriesRecords // Targets * Scrapes
)
var (
compressions = []compression.Type{compression.None, compression.Snappy, compression.Zstd}
dataCases = []testrecord.RefSamplesCase{
testrecord.Realistic1000Samples,
testrecord.Realistic1000WithCTSamples,
testrecord.WorstCase1000Samples,
}
)
/*
export bench=watcher-read-v2 && go test ./tsdb/wlog/... \
-run '^$' -bench '^BenchmarkWatcherReadSegment' \
-benchtime 5s -count 6 -cpu 2 -timeout 999m \
| tee ${bench}.txt
*/
func BenchmarkWatcherReadSegment(b *testing.B) {
for _, compress := range compressions {
for _, data := range dataCases {
b.Run(fmt.Sprintf("compr=%v/data=%v", compress, data), func(b *testing.B) {
dir := b.TempDir()
wdir := path.Join(dir, "wal")
require.NoError(b, os.Mkdir(wdir, 0o777))
generateRealisticSegment(b, wdir, compress, data)
logger := promslog.NewNopLogger()
b.Run("func=readSegmentSeries", func(b *testing.B) {
wt := newWriteToMock(0)
watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false, false, false)
// Required as WorstCase1000Samples have math.MinInt32 timestamps.
watcher.startTimestamp = math.MinInt32 - 1
watcher.SetMetrics()
// Validate our test data first.
testReadFn(b, wdir, 0, logger, watcher, (*Watcher).readSegmentSeries)
require.Equal(b, seriesRecords*seriesPerRecord, wt.seriesStored())
require.Equal(b, 0, wt.samplesAppended)
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
testReadFn(b, wdir, 0, logger, watcher, (*Watcher).readSegmentSeries)
}
})
b.Run("func=readSegment", func(b *testing.B) {
wt := newWriteToMock(0)
watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false, false, false)
// Required as WorstCase1000Samples have math.MinInt32 timestamps.
watcher.startTimestamp = math.MinInt32 - 1
watcher.SetMetrics()
// Validate our test data first.
testReadFn(b, wdir, 0, logger, watcher, (*Watcher).readSegment)
require.Equal(b, seriesRecords*seriesPerRecord, wt.seriesStored())
require.Equal(b, sampleRecords*1000, wt.samplesAppended)
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
testReadFn(b, wdir, 0, logger, watcher, (*Watcher).readSegment)
}
})
})
}
}
}
func generateRealisticSegment(tb testing.TB, wdir string, compress compression.Type, samplesCase testrecord.RefSamplesCase) {
tb.Helper()
enc := record.Encoder{}
w, err := NewSize(nil, nil, wdir, 128*pageSize, compress)
require.NoError(tb, err)
defer w.Close()
// Generate fake data for WAL.
for i := range seriesRecords {
series := make([]record.RefSeries, seriesPerRecord)
for j := range seriesPerRecord {
series[j] = record.RefSeries{
Ref: chunks.HeadSeriesRef(i*seriesPerRecord + j),
Labels: labels.FromStrings("__name__", fmt.Sprintf("metric_%d", 0), "foo", "bar", "foo1", "bar2", "sdfsasdgfadsfgaegga", "dgsfzdsf§sfawf2"),
}
}
rec := enc.Series(series, nil)
require.NoError(tb, w.Log(rec))
}
for i := 0; i < sampleRecords; i++ {
rec := enc.Samples(testrecord.GenTestRefSamplesCase(tb, samplesCase), nil)
require.NoError(tb, w.Log(rec))
}
// Build segment.
require.NoError(tb, w.flushPage(true))
}
func testReadFn(tb testing.TB, wdir string, segNum int, logger *slog.Logger, watcher *Watcher, fn segmentReadFn) {
tb.Helper()
segment, err := OpenReadSegment(SegmentName(wdir, segNum))
require.NoError(tb, err)
r := NewLiveReader(logger, watcher.readerMetrics, segment)
require.NoError(tb, fn(watcher, r, segNum))
}

View file

@ -29,11 +29,11 @@ import (
"sync"
"time"
"github.com/golang/snappy"
"github.com/klauspost/compress/zstd"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/promslog"
"github.com/prometheus/prometheus/tsdb/compression"
"github.com/prometheus/prometheus/tsdb/fileutil"
)
@ -169,24 +169,16 @@ func OpenReadSegment(fn string) (*Segment, error) {
return &Segment{SegmentFile: f, i: k, dir: filepath.Dir(fn)}, nil
}
type CompressionType string
const (
CompressionNone CompressionType = "none"
CompressionSnappy CompressionType = "snappy"
CompressionZstd CompressionType = "zstd"
)
// ParseCompressionType parses the two compression-related configuration values and returns the CompressionType. If
// compression is enabled but the compressType is unrecognized, we default to Snappy compression.
func ParseCompressionType(compress bool, compressType string) CompressionType {
func ParseCompressionType(compress bool, compressType string) compression.Type {
if compress {
if compressType == "zstd" {
return CompressionZstd
return compression.Zstd
}
return CompressionSnappy
return compression.Snappy
}
return CompressionNone
return compression.None
}
// WL is a write log that stores records in segment files.
@ -210,9 +202,9 @@ type WL struct {
stopc chan chan struct{}
actorc chan func()
closed bool // To allow calling Close() more than once without blocking.
compress CompressionType
compress compression.Type
cEnc *compression.Encoder
compressBuf []byte
zstdWriter *zstd.Encoder
WriteNotified WriteNotified
@ -309,13 +301,13 @@ func newWLMetrics(w *WL, r prometheus.Registerer) *wlMetrics {
}
// New returns a new WAL over the given directory.
func New(logger *slog.Logger, reg prometheus.Registerer, dir string, compress CompressionType) (*WL, error) {
func New(logger *slog.Logger, reg prometheus.Registerer, dir string, compress compression.Type) (*WL, error) {
return NewSize(logger, reg, dir, DefaultSegmentSize, compress)
}
// NewSize returns a new write log over the given directory.
// New segments are created with the specified size.
func NewSize(logger *slog.Logger, reg prometheus.Registerer, dir string, segmentSize int, compress CompressionType) (*WL, error) {
func NewSize(logger *slog.Logger, reg prometheus.Registerer, dir string, segmentSize int, compress compression.Type) (*WL, error) {
if segmentSize%pageSize != 0 {
return nil, errors.New("invalid segment size")
}
@ -326,13 +318,9 @@ func NewSize(logger *slog.Logger, reg prometheus.Registerer, dir string, segment
logger = promslog.NewNopLogger()
}
var zstdWriter *zstd.Encoder
if compress == CompressionZstd {
var err error
zstdWriter, err = zstd.NewWriter(nil)
if err != nil {
return nil, err
}
cEnc, err := compression.NewEncoder()
if err != nil {
return nil, err
}
w := &WL{
@ -343,7 +331,7 @@ func NewSize(logger *slog.Logger, reg prometheus.Registerer, dir string, segment
actorc: make(chan func(), 100),
stopc: make(chan chan struct{}),
compress: compress,
zstdWriter: zstdWriter,
cEnc: cEnc,
}
prefix := "prometheus_tsdb_wal_"
if filepath.Base(dir) == WblDirName {
@ -382,22 +370,16 @@ func Open(logger *slog.Logger, dir string) (*WL, error) {
if logger == nil {
logger = promslog.NewNopLogger()
}
zstdWriter, err := zstd.NewWriter(nil)
if err != nil {
return nil, err
}
w := &WL{
dir: dir,
logger: logger,
zstdWriter: zstdWriter,
dir: dir,
logger: logger,
}
return w, nil
}
// CompressionType returns if compression is enabled on this WAL.
func (w *WL) CompressionType() CompressionType {
func (w *WL) CompressionType() compression.Type {
return w.compress
}
@ -705,7 +687,7 @@ func (w *WL) Log(recs ...[]byte) error {
// - the final record of a batch
// - the record is bigger than the page size
// - the current page is full.
func (w *WL) log(rec []byte, final bool) error {
func (w *WL) log(rec []byte, final bool) (err error) {
// When the last page flush failed the page will remain full.
// When the page is full, need to flush it before trying to add more records to it.
if w.page.full() {
@ -716,25 +698,12 @@ func (w *WL) log(rec []byte, final bool) error {
// Compress the record before calculating if a new segment is needed.
compressed := false
if w.compress == CompressionSnappy && len(rec) > 0 {
// If MaxEncodedLen is less than 0 the record is too large to be compressed.
if len(rec) > 0 && snappy.MaxEncodedLen(len(rec)) >= 0 {
// The snappy library uses `len` to calculate if we need a new buffer.
// In order to allocate as few buffers as possible make the length
// equal to the capacity.
w.compressBuf = w.compressBuf[:cap(w.compressBuf)]
w.compressBuf = snappy.Encode(w.compressBuf, rec)
if len(w.compressBuf) < len(rec) {
rec = w.compressBuf
compressed = true
}
}
} else if w.compress == CompressionZstd && len(rec) > 0 {
w.compressBuf = w.zstdWriter.EncodeAll(rec, w.compressBuf[:0])
if len(w.compressBuf) < len(rec) {
rec = w.compressBuf
compressed = true
}
w.compressBuf, compressed, err = w.cEnc.Encode(w.compress, rec, w.compressBuf)
if err != nil {
return err
}
if compressed {
rec = w.compressBuf
}
// If the record is too big to fit within the active page in the current
@ -773,9 +742,9 @@ func (w *WL) log(rec []byte, final bool) error {
typ = recMiddle
}
if compressed {
if w.compress == CompressionSnappy {
if w.compress == compression.Snappy {
typ |= snappyMask
} else if w.compress == CompressionZstd {
} else if w.compress == compression.Zstd {
typ |= zstdMask
}
}

View file

@ -29,6 +29,8 @@ import (
"github.com/stretchr/testify/require"
"go.uber.org/goleak"
"github.com/prometheus/prometheus/tsdb/compression"
"github.com/prometheus/prometheus/tsdb/fileutil"
)
@ -125,7 +127,7 @@ func TestWALRepair_ReadingError(t *testing.T) {
// then corrupt a given record in a given segment.
// As a result we want a repaired WAL with given intact records.
segSize := 3 * pageSize
w, err := NewSize(nil, nil, dir, segSize, CompressionNone)
w, err := NewSize(nil, nil, dir, segSize, compression.None)
require.NoError(t, err)
var records [][]byte
@ -150,7 +152,7 @@ func TestWALRepair_ReadingError(t *testing.T) {
require.NoError(t, f.Close())
w, err = NewSize(nil, nil, dir, segSize, CompressionNone)
w, err = NewSize(nil, nil, dir, segSize, compression.None)
require.NoError(t, err)
defer w.Close()
@ -222,7 +224,7 @@ func TestCorruptAndCarryOn(t *testing.T) {
// Produce a WAL with a two segments of 3 pages with 3 records each,
// so when we truncate the file we're guaranteed to split a record.
{
w, err := NewSize(logger, nil, dir, segmentSize, CompressionNone)
w, err := NewSize(logger, nil, dir, segmentSize, compression.None)
require.NoError(t, err)
for i := 0; i < 18; i++ {
@ -293,7 +295,7 @@ func TestCorruptAndCarryOn(t *testing.T) {
err = sr.Close()
require.NoError(t, err)
w, err := NewSize(logger, nil, dir, segmentSize, CompressionNone)
w, err := NewSize(logger, nil, dir, segmentSize, compression.None)
require.NoError(t, err)
err = w.Repair(corruptionErr)
@ -336,7 +338,7 @@ func TestCorruptAndCarryOn(t *testing.T) {
// TestClose ensures that calling Close more than once doesn't panic and doesn't block.
func TestClose(t *testing.T) {
dir := t.TempDir()
w, err := NewSize(nil, nil, dir, pageSize, CompressionNone)
w, err := NewSize(nil, nil, dir, pageSize, compression.None)
require.NoError(t, err)
require.NoError(t, w.Close())
require.Error(t, w.Close())
@ -349,7 +351,7 @@ func TestSegmentMetric(t *testing.T) {
)
dir := t.TempDir()
w, err := NewSize(nil, nil, dir, segmentSize, CompressionNone)
w, err := NewSize(nil, nil, dir, segmentSize, compression.None)
require.NoError(t, err)
initialSegment := client_testutil.ToFloat64(w.metrics.currentSegment)
@ -368,7 +370,7 @@ func TestSegmentMetric(t *testing.T) {
}
func TestCompression(t *testing.T) {
bootstrap := func(compressed CompressionType) string {
bootstrap := func(compressed compression.Type) string {
const (
segmentSize = pageSize
recordSize = (pageSize / 2) - recordHeaderSize
@ -396,10 +398,10 @@ func TestCompression(t *testing.T) {
}
}()
dirUnCompressed := bootstrap(CompressionNone)
dirUnCompressed := bootstrap(compression.None)
tmpDirs = append(tmpDirs, dirUnCompressed)
for _, compressionType := range []CompressionType{CompressionSnappy, CompressionZstd} {
for _, compressionType := range []compression.Type{compression.Snappy, compression.Zstd} {
dirCompressed := bootstrap(compressionType)
tmpDirs = append(tmpDirs, dirCompressed)
@ -443,7 +445,7 @@ func TestLogPartialWrite(t *testing.T) {
t.Run(testName, func(t *testing.T) {
dirPath := t.TempDir()
w, err := NewSize(nil, nil, dirPath, segmentSize, CompressionNone)
w, err := NewSize(nil, nil, dirPath, segmentSize, compression.None)
require.NoError(t, err)
// Replace the underlying segment file with a mocked one that injects a failure.
@ -510,7 +512,7 @@ func (f *faultySegmentFile) Write(p []byte) (int, error) {
}
func BenchmarkWAL_LogBatched(b *testing.B) {
for _, compress := range []CompressionType{CompressionNone, CompressionSnappy, CompressionZstd} {
for _, compress := range []compression.Type{compression.None, compression.Snappy, compression.Zstd} {
b.Run(fmt.Sprintf("compress=%s", compress), func(b *testing.B) {
dir := b.TempDir()
@ -540,7 +542,7 @@ func BenchmarkWAL_LogBatched(b *testing.B) {
}
func BenchmarkWAL_Log(b *testing.B) {
for _, compress := range []CompressionType{CompressionNone, CompressionSnappy, CompressionZstd} {
for _, compress := range []compression.Type{compression.None, compression.Snappy, compression.Zstd} {
b.Run(fmt.Sprintf("compress=%s", compress), func(b *testing.B) {
dir := b.TempDir()
@ -567,7 +569,7 @@ func TestUnregisterMetrics(t *testing.T) {
reg := prometheus.NewRegistry()
for i := 0; i < 2; i++ {
wl, err := New(promslog.NewNopLogger(), reg, t.TempDir(), CompressionNone)
wl, err := New(promslog.NewNopLogger(), reg, t.TempDir(), compression.None)
require.NoError(t, err)
require.NoError(t, wl.Close())
}

80
util/testrecord/record.go Normal file
View file

@ -0,0 +1,80 @@
// Copyright 2025 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package testrecord
import (
"math"
"testing"
"github.com/prometheus/prometheus/tsdb/chunks"
"github.com/prometheus/prometheus/tsdb/record"
)
type RefSamplesCase string
const (
Realistic1000Samples RefSamplesCase = "real1000"
Realistic1000WithCTSamples RefSamplesCase = "real1000-ct"
WorstCase1000Samples RefSamplesCase = "worst1000"
)
func GenTestRefSamplesCase(t testing.TB, c RefSamplesCase) []record.RefSample {
t.Helper()
ret := make([]record.RefSample, 1e3)
switch c {
case Realistic1000Samples:
for i := range ret {
ret[i].Ref = chunks.HeadSeriesRef(i)
ret[i].T = 12423423
ret[i].V = highVarianceFloat(i)
}
case Realistic1000WithCTSamples:
for i := range ret {
ret[i].Ref = chunks.HeadSeriesRef(i)
// For cumulative or gauges, typically in one record from
// scrape we would have exactly same CT and T values.
ret[i].CT = 11234567
ret[i].T = 12423423
ret[i].V = highVarianceFloat(i)
}
case WorstCase1000Samples:
for i := range ret {
ret[i].Ref = chunks.HeadSeriesRef(i)
// Worst case is when the values are significantly different
// to each other which breaks delta encoding.
ret[i].CT = highVarianceInt(i)
ret[i].T = highVarianceInt(i)
ret[i].V = highVarianceFloat(i)
}
default:
t.Fatal("unknown case", c)
}
return ret
}
func highVarianceInt(i int) int64 {
if i%2 == 0 {
return math.MinInt32
}
return math.MaxInt32
}
func highVarianceFloat(i int) float64 {
if i%2 == 0 {
return math.SmallestNonzeroFloat32
}
return math.MaxFloat32
}