mirror of
https://github.com/prometheus/prometheus.git
synced 2025-01-11 13:57:36 -08:00
Add Zstandard compression option for wlog (#11666)
Snappy remains as the default compression but there is now a flag to switch the compression algorithm. Signed-off-by: Justin Lei <justin.lei@grafana.com>
This commit is contained in:
parent
95606830fd
commit
32d87282ad
|
@ -71,6 +71,7 @@ import (
|
||||||
"github.com/prometheus/prometheus/tracing"
|
"github.com/prometheus/prometheus/tracing"
|
||||||
"github.com/prometheus/prometheus/tsdb"
|
"github.com/prometheus/prometheus/tsdb"
|
||||||
"github.com/prometheus/prometheus/tsdb/agent"
|
"github.com/prometheus/prometheus/tsdb/agent"
|
||||||
|
"github.com/prometheus/prometheus/tsdb/wlog"
|
||||||
"github.com/prometheus/prometheus/util/documentcli"
|
"github.com/prometheus/prometheus/util/documentcli"
|
||||||
"github.com/prometheus/prometheus/util/logging"
|
"github.com/prometheus/prometheus/util/logging"
|
||||||
prom_runtime "github.com/prometheus/prometheus/util/runtime"
|
prom_runtime "github.com/prometheus/prometheus/util/runtime"
|
||||||
|
@ -334,6 +335,9 @@ func main() {
|
||||||
serverOnlyFlag(a, "storage.tsdb.wal-compression", "Compress the tsdb WAL.").
|
serverOnlyFlag(a, "storage.tsdb.wal-compression", "Compress the tsdb WAL.").
|
||||||
Hidden().Default("true").BoolVar(&cfg.tsdb.WALCompression)
|
Hidden().Default("true").BoolVar(&cfg.tsdb.WALCompression)
|
||||||
|
|
||||||
|
serverOnlyFlag(a, "storage.tsdb.wal-compression-type", "Compression algorithm for the tsdb WAL.").
|
||||||
|
Hidden().Default(string(wlog.CompressionSnappy)).EnumVar(&cfg.tsdb.WALCompressionType, string(wlog.CompressionSnappy), string(wlog.CompressionZstd))
|
||||||
|
|
||||||
serverOnlyFlag(a, "storage.tsdb.head-chunks-write-queue-size", "Size of the queue through which head chunks are written to the disk to be m-mapped, 0 disables the queue completely. Experimental.").
|
serverOnlyFlag(a, "storage.tsdb.head-chunks-write-queue-size", "Size of the queue through which head chunks are written to the disk to be m-mapped, 0 disables the queue completely. Experimental.").
|
||||||
Default("0").IntVar(&cfg.tsdb.HeadChunksWriteQueueSize)
|
Default("0").IntVar(&cfg.tsdb.HeadChunksWriteQueueSize)
|
||||||
|
|
||||||
|
@ -350,6 +354,9 @@ func main() {
|
||||||
agentOnlyFlag(a, "storage.agent.wal-compression", "Compress the agent WAL.").
|
agentOnlyFlag(a, "storage.agent.wal-compression", "Compress the agent WAL.").
|
||||||
Default("true").BoolVar(&cfg.agent.WALCompression)
|
Default("true").BoolVar(&cfg.agent.WALCompression)
|
||||||
|
|
||||||
|
agentOnlyFlag(a, "storage.agent.wal-compression-type", "Compression algorithm for the agent WAL.").
|
||||||
|
Hidden().Default(string(wlog.CompressionSnappy)).EnumVar(&cfg.agent.WALCompressionType, string(wlog.CompressionSnappy), string(wlog.CompressionZstd))
|
||||||
|
|
||||||
agentOnlyFlag(a, "storage.agent.wal-truncate-frequency",
|
agentOnlyFlag(a, "storage.agent.wal-truncate-frequency",
|
||||||
"The frequency at which to truncate the WAL and remove old data.").
|
"The frequency at which to truncate the WAL and remove old data.").
|
||||||
Hidden().PlaceHolder("<duration>").SetValue(&cfg.agent.TruncateFrequency)
|
Hidden().PlaceHolder("<duration>").SetValue(&cfg.agent.TruncateFrequency)
|
||||||
|
@ -1546,6 +1553,7 @@ type tsdbOptions struct {
|
||||||
MaxBytes units.Base2Bytes
|
MaxBytes units.Base2Bytes
|
||||||
NoLockfile bool
|
NoLockfile bool
|
||||||
WALCompression bool
|
WALCompression bool
|
||||||
|
WALCompressionType string
|
||||||
HeadChunksWriteQueueSize int
|
HeadChunksWriteQueueSize int
|
||||||
SamplesPerChunk int
|
SamplesPerChunk int
|
||||||
StripeSize int
|
StripeSize int
|
||||||
|
@ -1566,7 +1574,7 @@ func (opts tsdbOptions) ToTSDBOptions() tsdb.Options {
|
||||||
MaxBytes: int64(opts.MaxBytes),
|
MaxBytes: int64(opts.MaxBytes),
|
||||||
NoLockfile: opts.NoLockfile,
|
NoLockfile: opts.NoLockfile,
|
||||||
AllowOverlappingCompaction: true,
|
AllowOverlappingCompaction: true,
|
||||||
WALCompression: opts.WALCompression,
|
WALCompression: wlog.ParseCompressionType(opts.WALCompression, opts.WALCompressionType),
|
||||||
HeadChunksWriteQueueSize: opts.HeadChunksWriteQueueSize,
|
HeadChunksWriteQueueSize: opts.HeadChunksWriteQueueSize,
|
||||||
SamplesPerChunk: opts.SamplesPerChunk,
|
SamplesPerChunk: opts.SamplesPerChunk,
|
||||||
StripeSize: opts.StripeSize,
|
StripeSize: opts.StripeSize,
|
||||||
|
@ -1585,6 +1593,7 @@ func (opts tsdbOptions) ToTSDBOptions() tsdb.Options {
|
||||||
type agentOptions struct {
|
type agentOptions struct {
|
||||||
WALSegmentSize units.Base2Bytes
|
WALSegmentSize units.Base2Bytes
|
||||||
WALCompression bool
|
WALCompression bool
|
||||||
|
WALCompressionType string
|
||||||
StripeSize int
|
StripeSize int
|
||||||
TruncateFrequency model.Duration
|
TruncateFrequency model.Duration
|
||||||
MinWALTime, MaxWALTime model.Duration
|
MinWALTime, MaxWALTime model.Duration
|
||||||
|
@ -1594,7 +1603,7 @@ type agentOptions struct {
|
||||||
func (opts agentOptions) ToAgentOptions() agent.Options {
|
func (opts agentOptions) ToAgentOptions() agent.Options {
|
||||||
return agent.Options{
|
return agent.Options{
|
||||||
WALSegmentSize: int(opts.WALSegmentSize),
|
WALSegmentSize: int(opts.WALSegmentSize),
|
||||||
WALCompression: opts.WALCompression,
|
WALCompression: wlog.ParseCompressionType(opts.WALCompression, opts.WALCompressionType),
|
||||||
StripeSize: opts.StripeSize,
|
StripeSize: opts.StripeSize,
|
||||||
TruncateFrequency: time.Duration(opts.TruncateFrequency),
|
TruncateFrequency: time.Duration(opts.TruncateFrequency),
|
||||||
MinWALTime: durationToInt64Millis(time.Duration(opts.MinWALTime)),
|
MinWALTime: durationToInt64Millis(time.Duration(opts.MinWALTime)),
|
||||||
|
|
1
go.mod
1
go.mod
|
@ -34,6 +34,7 @@ require (
|
||||||
github.com/hetznercloud/hcloud-go v1.47.0
|
github.com/hetznercloud/hcloud-go v1.47.0
|
||||||
github.com/ionos-cloud/sdk-go/v6 v6.1.7
|
github.com/ionos-cloud/sdk-go/v6 v6.1.7
|
||||||
github.com/json-iterator/go v1.1.12
|
github.com/json-iterator/go v1.1.12
|
||||||
|
github.com/klauspost/compress v1.15.12
|
||||||
github.com/kolo/xmlrpc v0.0.0-20220921171641-a4b6fa1dd06b
|
github.com/kolo/xmlrpc v0.0.0-20220921171641-a4b6fa1dd06b
|
||||||
github.com/linode/linodego v1.17.2
|
github.com/linode/linodego v1.17.2
|
||||||
github.com/miekg/dns v1.1.54
|
github.com/miekg/dns v1.1.54
|
||||||
|
|
2
go.sum
2
go.sum
|
@ -507,6 +507,8 @@ github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvW
|
||||||
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
|
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
|
||||||
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
|
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
|
||||||
github.com/klauspost/compress v1.13.6/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk=
|
github.com/klauspost/compress v1.13.6/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk=
|
||||||
|
github.com/klauspost/compress v1.15.12 h1:YClS/PImqYbn+UILDnqxQCZ3RehC9N318SU3kElDUEM=
|
||||||
|
github.com/klauspost/compress v1.15.12/go.mod h1:QPwzmACJjUTFsnSHH934V6woptycfrDDJnH7hvFVbGM=
|
||||||
github.com/kolo/xmlrpc v0.0.0-20220921171641-a4b6fa1dd06b h1:udzkj9S/zlT5X367kqJis0QP7YMxobob6zhzq6Yre00=
|
github.com/kolo/xmlrpc v0.0.0-20220921171641-a4b6fa1dd06b h1:udzkj9S/zlT5X367kqJis0QP7YMxobob6zhzq6Yre00=
|
||||||
github.com/kolo/xmlrpc v0.0.0-20220921171641-a4b6fa1dd06b/go.mod h1:pcaDhQK0/NJZEvtCO0qQPPropqV0sJOJ6YW7X+9kRwM=
|
github.com/kolo/xmlrpc v0.0.0-20220921171641-a4b6fa1dd06b/go.mod h1:pcaDhQK0/NJZEvtCO0qQPPropqV0sJOJ6YW7X+9kRwM=
|
||||||
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
|
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
|
||||||
|
|
|
@ -65,8 +65,8 @@ type Options struct {
|
||||||
// WALSegmentSize > 0, segment size is WALSegmentSize.
|
// WALSegmentSize > 0, segment size is WALSegmentSize.
|
||||||
WALSegmentSize int
|
WALSegmentSize int
|
||||||
|
|
||||||
// WALCompression will turn on Snappy compression for records on the WAL.
|
// WALCompression configures the compression type to use on records in the WAL.
|
||||||
WALCompression bool
|
WALCompression wlog.CompressionType
|
||||||
|
|
||||||
// StripeSize is the size (power of 2) in entries of the series hash map. Reducing the size will save memory but impact performance.
|
// StripeSize is the size (power of 2) in entries of the series hash map. Reducing the size will save memory but impact performance.
|
||||||
StripeSize int
|
StripeSize int
|
||||||
|
@ -87,7 +87,7 @@ type Options struct {
|
||||||
func DefaultOptions() *Options {
|
func DefaultOptions() *Options {
|
||||||
return &Options{
|
return &Options{
|
||||||
WALSegmentSize: wlog.DefaultSegmentSize,
|
WALSegmentSize: wlog.DefaultSegmentSize,
|
||||||
WALCompression: false,
|
WALCompression: wlog.CompressionNone,
|
||||||
StripeSize: tsdb.DefaultStripeSize,
|
StripeSize: tsdb.DefaultStripeSize,
|
||||||
TruncateFrequency: DefaultTruncateFrequency,
|
TruncateFrequency: DefaultTruncateFrequency,
|
||||||
MinWALTime: DefaultMinWALTime,
|
MinWALTime: DefaultMinWALTime,
|
||||||
|
@ -318,6 +318,10 @@ func validateOptions(opts *Options) *Options {
|
||||||
opts.WALSegmentSize = wlog.DefaultSegmentSize
|
opts.WALSegmentSize = wlog.DefaultSegmentSize
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if opts.WALCompression == "" {
|
||||||
|
opts.WALCompression = wlog.CompressionNone
|
||||||
|
}
|
||||||
|
|
||||||
// Revert Stripesize to DefaultStripsize if Stripsize is either 0 or not a power of 2.
|
// Revert Stripesize to DefaultStripsize if Stripsize is either 0 or not a power of 2.
|
||||||
if opts.StripeSize <= 0 || ((opts.StripeSize & (opts.StripeSize - 1)) != 0) {
|
if opts.StripeSize <= 0 || ((opts.StripeSize & (opts.StripeSize - 1)) != 0) {
|
||||||
opts.StripeSize = tsdb.DefaultStripeSize
|
opts.StripeSize = tsdb.DefaultStripeSize
|
||||||
|
|
|
@ -39,6 +39,7 @@ import (
|
||||||
"github.com/prometheus/prometheus/tsdb/fileutil"
|
"github.com/prometheus/prometheus/tsdb/fileutil"
|
||||||
"github.com/prometheus/prometheus/tsdb/tombstones"
|
"github.com/prometheus/prometheus/tsdb/tombstones"
|
||||||
"github.com/prometheus/prometheus/tsdb/tsdbutil"
|
"github.com/prometheus/prometheus/tsdb/tsdbutil"
|
||||||
|
"github.com/prometheus/prometheus/tsdb/wlog"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestSplitByRange(t *testing.T) {
|
func TestSplitByRange(t *testing.T) {
|
||||||
|
@ -1306,7 +1307,7 @@ func TestDeleteCompactionBlockAfterFailedReload(t *testing.T) {
|
||||||
func TestHeadCompactionWithHistograms(t *testing.T) {
|
func TestHeadCompactionWithHistograms(t *testing.T) {
|
||||||
for _, floatTest := range []bool{true, false} {
|
for _, floatTest := range []bool{true, false} {
|
||||||
t.Run(fmt.Sprintf("float=%t", floatTest), func(t *testing.T) {
|
t.Run(fmt.Sprintf("float=%t", floatTest), func(t *testing.T) {
|
||||||
head, _ := newTestHead(t, DefaultBlockDuration, false, false)
|
head, _ := newTestHead(t, DefaultBlockDuration, wlog.CompressionNone, false)
|
||||||
require.NoError(t, head.Init(0))
|
require.NoError(t, head.Init(0))
|
||||||
t.Cleanup(func() {
|
t.Cleanup(func() {
|
||||||
require.NoError(t, head.Close())
|
require.NoError(t, head.Close())
|
||||||
|
@ -1485,11 +1486,11 @@ func TestSparseHistogramSpaceSavings(t *testing.T) {
|
||||||
c.numBuckets,
|
c.numBuckets,
|
||||||
),
|
),
|
||||||
func(t *testing.T) {
|
func(t *testing.T) {
|
||||||
oldHead, _ := newTestHead(t, DefaultBlockDuration, false, false)
|
oldHead, _ := newTestHead(t, DefaultBlockDuration, wlog.CompressionNone, false)
|
||||||
t.Cleanup(func() {
|
t.Cleanup(func() {
|
||||||
require.NoError(t, oldHead.Close())
|
require.NoError(t, oldHead.Close())
|
||||||
})
|
})
|
||||||
sparseHead, _ := newTestHead(t, DefaultBlockDuration, false, false)
|
sparseHead, _ := newTestHead(t, DefaultBlockDuration, wlog.CompressionNone, false)
|
||||||
t.Cleanup(func() {
|
t.Cleanup(func() {
|
||||||
require.NoError(t, sparseHead.Close())
|
require.NoError(t, sparseHead.Close())
|
||||||
})
|
})
|
||||||
|
|
|
@ -77,8 +77,8 @@ func DefaultOptions() *Options {
|
||||||
MaxBlockDuration: DefaultBlockDuration,
|
MaxBlockDuration: DefaultBlockDuration,
|
||||||
NoLockfile: false,
|
NoLockfile: false,
|
||||||
AllowOverlappingCompaction: true,
|
AllowOverlappingCompaction: true,
|
||||||
WALCompression: false,
|
|
||||||
SamplesPerChunk: DefaultSamplesPerChunk,
|
SamplesPerChunk: DefaultSamplesPerChunk,
|
||||||
|
WALCompression: wlog.CompressionNone,
|
||||||
StripeSize: DefaultStripeSize,
|
StripeSize: DefaultStripeSize,
|
||||||
HeadChunksWriteBufferSize: chunks.DefaultWriteBufferSize,
|
HeadChunksWriteBufferSize: chunks.DefaultWriteBufferSize,
|
||||||
IsolationDisabled: defaultIsolationDisabled,
|
IsolationDisabled: defaultIsolationDisabled,
|
||||||
|
@ -123,8 +123,8 @@ type Options struct {
|
||||||
// For Prometheus, this will always be true.
|
// For Prometheus, this will always be true.
|
||||||
AllowOverlappingCompaction bool
|
AllowOverlappingCompaction bool
|
||||||
|
|
||||||
// WALCompression will turn on Snappy compression for records on the WAL.
|
// WALCompression configures the compression type to use on records in the WAL.
|
||||||
WALCompression bool
|
WALCompression wlog.CompressionType
|
||||||
|
|
||||||
// Maximum number of CPUs that can simultaneously processes WAL replay.
|
// Maximum number of CPUs that can simultaneously processes WAL replay.
|
||||||
// If it is <=0, then GOMAXPROCS is used.
|
// If it is <=0, then GOMAXPROCS is used.
|
||||||
|
|
|
@ -1965,7 +1965,7 @@ func TestInitializeHeadTimestamp(t *testing.T) {
|
||||||
dir := t.TempDir()
|
dir := t.TempDir()
|
||||||
|
|
||||||
require.NoError(t, os.MkdirAll(path.Join(dir, "wal"), 0o777))
|
require.NoError(t, os.MkdirAll(path.Join(dir, "wal"), 0o777))
|
||||||
w, err := wlog.New(nil, nil, path.Join(dir, "wal"), false)
|
w, err := wlog.New(nil, nil, path.Join(dir, "wal"), wlog.CompressionNone)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
var enc record.Encoder
|
var enc record.Encoder
|
||||||
|
@ -2007,7 +2007,7 @@ func TestInitializeHeadTimestamp(t *testing.T) {
|
||||||
createBlock(t, dir, genSeries(1, 1, 1000, 6000))
|
createBlock(t, dir, genSeries(1, 1, 1000, 6000))
|
||||||
|
|
||||||
require.NoError(t, os.MkdirAll(path.Join(dir, "wal"), 0o777))
|
require.NoError(t, os.MkdirAll(path.Join(dir, "wal"), 0o777))
|
||||||
w, err := wlog.New(nil, nil, path.Join(dir, "wal"), false)
|
w, err := wlog.New(nil, nil, path.Join(dir, "wal"), wlog.CompressionNone)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
var enc record.Encoder
|
var enc record.Encoder
|
||||||
|
@ -2408,7 +2408,7 @@ func TestDBReadOnly(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Add head to test DBReadOnly WAL reading capabilities.
|
// Add head to test DBReadOnly WAL reading capabilities.
|
||||||
w, err := wlog.New(logger, nil, filepath.Join(dbDir, "wal"), true)
|
w, err := wlog.New(logger, nil, filepath.Join(dbDir, "wal"), wlog.CompressionSnappy)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
h := createHead(t, w, genSeries(1, 1, 16, 18), dbDir)
|
h := createHead(t, w, genSeries(1, 1, 16, 18), dbDir)
|
||||||
require.NoError(t, h.Close())
|
require.NoError(t, h.Close())
|
||||||
|
@ -2972,7 +2972,7 @@ func TestCompactHead(t *testing.T) {
|
||||||
NoLockfile: true,
|
NoLockfile: true,
|
||||||
MinBlockDuration: int64(time.Hour * 2 / time.Millisecond),
|
MinBlockDuration: int64(time.Hour * 2 / time.Millisecond),
|
||||||
MaxBlockDuration: int64(time.Hour * 2 / time.Millisecond),
|
MaxBlockDuration: int64(time.Hour * 2 / time.Millisecond),
|
||||||
WALCompression: true,
|
WALCompression: wlog.CompressionSnappy,
|
||||||
}
|
}
|
||||||
|
|
||||||
db, err := Open(dbDir, log.NewNopLogger(), prometheus.NewRegistry(), tsdbCfg, nil)
|
db, err := Open(dbDir, log.NewNopLogger(), prometheus.NewRegistry(), tsdbCfg, nil)
|
||||||
|
@ -3912,7 +3912,7 @@ func TestMetadataCheckpointingOnlyKeepsLatestEntry(t *testing.T) {
|
||||||
|
|
||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
numSamples := 10000
|
numSamples := 10000
|
||||||
hb, w := newTestHead(t, int64(numSamples)*10, false, false)
|
hb, w := newTestHead(t, int64(numSamples)*10, wlog.CompressionNone, false)
|
||||||
|
|
||||||
// Add some series so we can append metadata to them.
|
// Add some series so we can append metadata to them.
|
||||||
app := hb.Appender(ctx)
|
app := hb.Appender(ctx)
|
||||||
|
@ -5099,7 +5099,7 @@ func TestWBLAndMmapReplay(t *testing.T) {
|
||||||
resetMmapToOriginal() // We neet to reset because new duplicate chunks can be written above.
|
resetMmapToOriginal() // We neet to reset because new duplicate chunks can be written above.
|
||||||
|
|
||||||
// Removing m-map markers in WBL by rewriting it.
|
// Removing m-map markers in WBL by rewriting it.
|
||||||
newWbl, err := wlog.New(log.NewNopLogger(), nil, filepath.Join(t.TempDir(), "new_wbl"), false)
|
newWbl, err := wlog.New(log.NewNopLogger(), nil, filepath.Join(t.TempDir(), "new_wbl"), wlog.CompressionNone)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
sr, err := wlog.NewSegmentsReader(originalWblDir)
|
sr, err := wlog.NewSegmentsReader(originalWblDir)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
|
@ -52,7 +52,7 @@ import (
|
||||||
"github.com/prometheus/prometheus/tsdb/wlog"
|
"github.com/prometheus/prometheus/tsdb/wlog"
|
||||||
)
|
)
|
||||||
|
|
||||||
func newTestHead(t testing.TB, chunkRange int64, compressWAL, oooEnabled bool) (*Head, *wlog.WL) {
|
func newTestHead(t testing.TB, chunkRange int64, compressWAL wlog.CompressionType, oooEnabled bool) (*Head, *wlog.WL) {
|
||||||
dir := t.TempDir()
|
dir := t.TempDir()
|
||||||
wal, err := wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, compressWAL)
|
wal, err := wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, compressWAL)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
@ -79,7 +79,7 @@ func newTestHead(t testing.TB, chunkRange int64, compressWAL, oooEnabled bool) (
|
||||||
|
|
||||||
func BenchmarkCreateSeries(b *testing.B) {
|
func BenchmarkCreateSeries(b *testing.B) {
|
||||||
series := genSeries(b.N, 10, 0, 0)
|
series := genSeries(b.N, 10, 0, 0)
|
||||||
h, _ := newTestHead(b, 10000, false, false)
|
h, _ := newTestHead(b, 10000, wlog.CompressionNone, false)
|
||||||
b.Cleanup(func() {
|
b.Cleanup(func() {
|
||||||
require.NoError(b, h.Close())
|
require.NoError(b, h.Close())
|
||||||
})
|
})
|
||||||
|
@ -100,7 +100,7 @@ func BenchmarkHeadAppender_Append_Commit_ExistingSeries(b *testing.B) {
|
||||||
b.Run(fmt.Sprintf("%d series", seriesCount), func(b *testing.B) {
|
b.Run(fmt.Sprintf("%d series", seriesCount), func(b *testing.B) {
|
||||||
for _, samplesPerAppend := range []int64{1, 2, 5, 100} {
|
for _, samplesPerAppend := range []int64{1, 2, 5, 100} {
|
||||||
b.Run(fmt.Sprintf("%d samples per append", samplesPerAppend), func(b *testing.B) {
|
b.Run(fmt.Sprintf("%d samples per append", samplesPerAppend), func(b *testing.B) {
|
||||||
h, _ := newTestHead(b, 10000, false, false)
|
h, _ := newTestHead(b, 10000, wlog.CompressionNone, false)
|
||||||
b.Cleanup(func() { require.NoError(b, h.Close()) })
|
b.Cleanup(func() { require.NoError(b, h.Close()) })
|
||||||
|
|
||||||
ts := int64(1000)
|
ts := int64(1000)
|
||||||
|
@ -245,7 +245,7 @@ func BenchmarkLoadWAL(b *testing.B) {
|
||||||
func(b *testing.B) {
|
func(b *testing.B) {
|
||||||
dir := b.TempDir()
|
dir := b.TempDir()
|
||||||
|
|
||||||
w, err := wlog.New(nil, nil, dir, false)
|
w, err := wlog.New(nil, nil, dir, wlog.CompressionNone)
|
||||||
require.NoError(b, err)
|
require.NoError(b, err)
|
||||||
|
|
||||||
// Write series.
|
// Write series.
|
||||||
|
@ -337,7 +337,7 @@ func BenchmarkLoadWAL(b *testing.B) {
|
||||||
// While appending the samples to the head it concurrently queries them from multiple go routines and verifies that the
|
// While appending the samples to the head it concurrently queries them from multiple go routines and verifies that the
|
||||||
// returned results are correct.
|
// returned results are correct.
|
||||||
func TestHead_HighConcurrencyReadAndWrite(t *testing.T) {
|
func TestHead_HighConcurrencyReadAndWrite(t *testing.T) {
|
||||||
head, _ := newTestHead(t, DefaultBlockDuration, false, false)
|
head, _ := newTestHead(t, DefaultBlockDuration, wlog.CompressionNone, false)
|
||||||
defer func() {
|
defer func() {
|
||||||
require.NoError(t, head.Close())
|
require.NoError(t, head.Close())
|
||||||
}()
|
}()
|
||||||
|
@ -527,8 +527,8 @@ func TestHead_HighConcurrencyReadAndWrite(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestHead_ReadWAL(t *testing.T) {
|
func TestHead_ReadWAL(t *testing.T) {
|
||||||
for _, compress := range []bool{false, true} {
|
for _, compress := range []wlog.CompressionType{wlog.CompressionNone, wlog.CompressionSnappy, wlog.CompressionZstd} {
|
||||||
t.Run(fmt.Sprintf("compress=%t", compress), func(t *testing.T) {
|
t.Run(fmt.Sprintf("compress=%s", compress), func(t *testing.T) {
|
||||||
entries := []interface{}{
|
entries := []interface{}{
|
||||||
[]record.RefSeries{
|
[]record.RefSeries{
|
||||||
{Ref: 10, Labels: labels.FromStrings("a", "1")},
|
{Ref: 10, Labels: labels.FromStrings("a", "1")},
|
||||||
|
@ -609,7 +609,7 @@ func TestHead_ReadWAL(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestHead_WALMultiRef(t *testing.T) {
|
func TestHead_WALMultiRef(t *testing.T) {
|
||||||
head, w := newTestHead(t, 1000, false, false)
|
head, w := newTestHead(t, 1000, wlog.CompressionNone, false)
|
||||||
|
|
||||||
require.NoError(t, head.Init(0))
|
require.NoError(t, head.Init(0))
|
||||||
|
|
||||||
|
@ -644,7 +644,7 @@ func TestHead_WALMultiRef(t *testing.T) {
|
||||||
require.NotEqual(t, ref1, ref2, "Refs are the same")
|
require.NotEqual(t, ref1, ref2, "Refs are the same")
|
||||||
require.NoError(t, head.Close())
|
require.NoError(t, head.Close())
|
||||||
|
|
||||||
w, err = wlog.New(nil, nil, w.Dir(), false)
|
w, err = wlog.New(nil, nil, w.Dir(), wlog.CompressionNone)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
opts := DefaultHeadOptions()
|
opts := DefaultHeadOptions()
|
||||||
|
@ -669,7 +669,7 @@ func TestHead_WALMultiRef(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestHead_ActiveAppenders(t *testing.T) {
|
func TestHead_ActiveAppenders(t *testing.T) {
|
||||||
head, _ := newTestHead(t, 1000, false, false)
|
head, _ := newTestHead(t, 1000, wlog.CompressionNone, false)
|
||||||
defer head.Close()
|
defer head.Close()
|
||||||
|
|
||||||
require.NoError(t, head.Init(0))
|
require.NoError(t, head.Init(0))
|
||||||
|
@ -702,14 +702,14 @@ func TestHead_ActiveAppenders(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestHead_UnknownWALRecord(t *testing.T) {
|
func TestHead_UnknownWALRecord(t *testing.T) {
|
||||||
head, w := newTestHead(t, 1000, false, false)
|
head, w := newTestHead(t, 1000, wlog.CompressionNone, false)
|
||||||
w.Log([]byte{255, 42})
|
w.Log([]byte{255, 42})
|
||||||
require.NoError(t, head.Init(0))
|
require.NoError(t, head.Init(0))
|
||||||
require.NoError(t, head.Close())
|
require.NoError(t, head.Close())
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestHead_Truncate(t *testing.T) {
|
func TestHead_Truncate(t *testing.T) {
|
||||||
h, _ := newTestHead(t, 1000, false, false)
|
h, _ := newTestHead(t, 1000, wlog.CompressionNone, false)
|
||||||
defer func() {
|
defer func() {
|
||||||
require.NoError(t, h.Close())
|
require.NoError(t, h.Close())
|
||||||
}()
|
}()
|
||||||
|
@ -847,8 +847,8 @@ func TestMemSeries_truncateChunks(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestHeadDeleteSeriesWithoutSamples(t *testing.T) {
|
func TestHeadDeleteSeriesWithoutSamples(t *testing.T) {
|
||||||
for _, compress := range []bool{false, true} {
|
for _, compress := range []wlog.CompressionType{wlog.CompressionNone, wlog.CompressionSnappy, wlog.CompressionZstd} {
|
||||||
t.Run(fmt.Sprintf("compress=%t", compress), func(t *testing.T) {
|
t.Run(fmt.Sprintf("compress=%s", compress), func(t *testing.T) {
|
||||||
entries := []interface{}{
|
entries := []interface{}{
|
||||||
[]record.RefSeries{
|
[]record.RefSeries{
|
||||||
{Ref: 10, Labels: labels.FromStrings("a", "1")},
|
{Ref: 10, Labels: labels.FromStrings("a", "1")},
|
||||||
|
@ -927,8 +927,8 @@ func TestHeadDeleteSimple(t *testing.T) {
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, compress := range []bool{false, true} {
|
for _, compress := range []wlog.CompressionType{wlog.CompressionNone, wlog.CompressionSnappy, wlog.CompressionZstd} {
|
||||||
t.Run(fmt.Sprintf("compress=%t", compress), func(t *testing.T) {
|
t.Run(fmt.Sprintf("compress=%s", compress), func(t *testing.T) {
|
||||||
for _, c := range cases {
|
for _, c := range cases {
|
||||||
head, w := newTestHead(t, 1000, compress, false)
|
head, w := newTestHead(t, 1000, compress, false)
|
||||||
require.NoError(t, head.Init(0))
|
require.NoError(t, head.Init(0))
|
||||||
|
@ -1011,7 +1011,7 @@ func TestHeadDeleteSimple(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestDeleteUntilCurMax(t *testing.T) {
|
func TestDeleteUntilCurMax(t *testing.T) {
|
||||||
hb, _ := newTestHead(t, 1000000, false, false)
|
hb, _ := newTestHead(t, 1000000, wlog.CompressionNone, false)
|
||||||
defer func() {
|
defer func() {
|
||||||
require.NoError(t, hb.Close())
|
require.NoError(t, hb.Close())
|
||||||
}()
|
}()
|
||||||
|
@ -1064,7 +1064,7 @@ func TestDeletedSamplesAndSeriesStillInWALAfterCheckpoint(t *testing.T) {
|
||||||
numSamples := 10000
|
numSamples := 10000
|
||||||
|
|
||||||
// Enough samples to cause a checkpoint.
|
// Enough samples to cause a checkpoint.
|
||||||
hb, w := newTestHead(t, int64(numSamples)*10, false, false)
|
hb, w := newTestHead(t, int64(numSamples)*10, wlog.CompressionNone, false)
|
||||||
|
|
||||||
for i := 0; i < numSamples; i++ {
|
for i := 0; i < numSamples; i++ {
|
||||||
app := hb.Appender(context.Background())
|
app := hb.Appender(context.Background())
|
||||||
|
@ -1156,7 +1156,7 @@ func TestDelete_e2e(t *testing.T) {
|
||||||
seriesMap[labels.New(l...).String()] = []tsdbutil.Sample{}
|
seriesMap[labels.New(l...).String()] = []tsdbutil.Sample{}
|
||||||
}
|
}
|
||||||
|
|
||||||
hb, _ := newTestHead(t, 100000, false, false)
|
hb, _ := newTestHead(t, 100000, wlog.CompressionNone, false)
|
||||||
defer func() {
|
defer func() {
|
||||||
require.NoError(t, hb.Close())
|
require.NoError(t, hb.Close())
|
||||||
}()
|
}()
|
||||||
|
@ -1506,7 +1506,7 @@ func TestMemSeries_append_atVariableRate(t *testing.T) {
|
||||||
func TestGCChunkAccess(t *testing.T) {
|
func TestGCChunkAccess(t *testing.T) {
|
||||||
// Put a chunk, select it. GC it and then access it.
|
// Put a chunk, select it. GC it and then access it.
|
||||||
const chunkRange = 1000
|
const chunkRange = 1000
|
||||||
h, _ := newTestHead(t, chunkRange, false, false)
|
h, _ := newTestHead(t, chunkRange, wlog.CompressionNone, false)
|
||||||
defer func() {
|
defer func() {
|
||||||
require.NoError(t, h.Close())
|
require.NoError(t, h.Close())
|
||||||
}()
|
}()
|
||||||
|
@ -1565,7 +1565,7 @@ func TestGCChunkAccess(t *testing.T) {
|
||||||
func TestGCSeriesAccess(t *testing.T) {
|
func TestGCSeriesAccess(t *testing.T) {
|
||||||
// Put a series, select it. GC it and then access it.
|
// Put a series, select it. GC it and then access it.
|
||||||
const chunkRange = 1000
|
const chunkRange = 1000
|
||||||
h, _ := newTestHead(t, chunkRange, false, false)
|
h, _ := newTestHead(t, chunkRange, wlog.CompressionNone, false)
|
||||||
defer func() {
|
defer func() {
|
||||||
require.NoError(t, h.Close())
|
require.NoError(t, h.Close())
|
||||||
}()
|
}()
|
||||||
|
@ -1624,7 +1624,7 @@ func TestGCSeriesAccess(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestUncommittedSamplesNotLostOnTruncate(t *testing.T) {
|
func TestUncommittedSamplesNotLostOnTruncate(t *testing.T) {
|
||||||
h, _ := newTestHead(t, 1000, false, false)
|
h, _ := newTestHead(t, 1000, wlog.CompressionNone, false)
|
||||||
defer func() {
|
defer func() {
|
||||||
require.NoError(t, h.Close())
|
require.NoError(t, h.Close())
|
||||||
}()
|
}()
|
||||||
|
@ -1654,7 +1654,7 @@ func TestUncommittedSamplesNotLostOnTruncate(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestRemoveSeriesAfterRollbackAndTruncate(t *testing.T) {
|
func TestRemoveSeriesAfterRollbackAndTruncate(t *testing.T) {
|
||||||
h, _ := newTestHead(t, 1000, false, false)
|
h, _ := newTestHead(t, 1000, wlog.CompressionNone, false)
|
||||||
defer func() {
|
defer func() {
|
||||||
require.NoError(t, h.Close())
|
require.NoError(t, h.Close())
|
||||||
}()
|
}()
|
||||||
|
@ -1685,8 +1685,8 @@ func TestRemoveSeriesAfterRollbackAndTruncate(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestHead_LogRollback(t *testing.T) {
|
func TestHead_LogRollback(t *testing.T) {
|
||||||
for _, compress := range []bool{false, true} {
|
for _, compress := range []wlog.CompressionType{wlog.CompressionNone, wlog.CompressionSnappy, wlog.CompressionZstd} {
|
||||||
t.Run(fmt.Sprintf("compress=%t", compress), func(t *testing.T) {
|
t.Run(fmt.Sprintf("compress=%s", compress), func(t *testing.T) {
|
||||||
h, w := newTestHead(t, 1000, compress, false)
|
h, w := newTestHead(t, 1000, compress, false)
|
||||||
defer func() {
|
defer func() {
|
||||||
require.NoError(t, h.Close())
|
require.NoError(t, h.Close())
|
||||||
|
@ -1743,8 +1743,8 @@ func TestWalRepair_DecodingError(t *testing.T) {
|
||||||
5,
|
5,
|
||||||
},
|
},
|
||||||
} {
|
} {
|
||||||
for _, compress := range []bool{false, true} {
|
for _, compress := range []wlog.CompressionType{wlog.CompressionNone, wlog.CompressionSnappy, wlog.CompressionZstd} {
|
||||||
t.Run(fmt.Sprintf("%s,compress=%t", name, compress), func(t *testing.T) {
|
t.Run(fmt.Sprintf("%s,compress=%s", name, compress), func(t *testing.T) {
|
||||||
dir := t.TempDir()
|
dir := t.TempDir()
|
||||||
|
|
||||||
// Fill the wal and corrupt it.
|
// Fill the wal and corrupt it.
|
||||||
|
@ -1812,7 +1812,7 @@ func TestHeadReadWriterRepair(t *testing.T) {
|
||||||
walDir := filepath.Join(dir, "wal")
|
walDir := filepath.Join(dir, "wal")
|
||||||
// Fill the chunk segments and corrupt it.
|
// Fill the chunk segments and corrupt it.
|
||||||
{
|
{
|
||||||
w, err := wlog.New(nil, nil, walDir, false)
|
w, err := wlog.New(nil, nil, walDir, wlog.CompressionNone)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
opts := DefaultHeadOptions()
|
opts := DefaultHeadOptions()
|
||||||
|
@ -1880,7 +1880,7 @@ func TestHeadReadWriterRepair(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestNewWalSegmentOnTruncate(t *testing.T) {
|
func TestNewWalSegmentOnTruncate(t *testing.T) {
|
||||||
h, wal := newTestHead(t, 1000, false, false)
|
h, wal := newTestHead(t, 1000, wlog.CompressionNone, false)
|
||||||
defer func() {
|
defer func() {
|
||||||
require.NoError(t, h.Close())
|
require.NoError(t, h.Close())
|
||||||
}()
|
}()
|
||||||
|
@ -1910,7 +1910,7 @@ func TestNewWalSegmentOnTruncate(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestAddDuplicateLabelName(t *testing.T) {
|
func TestAddDuplicateLabelName(t *testing.T) {
|
||||||
h, _ := newTestHead(t, 1000, false, false)
|
h, _ := newTestHead(t, 1000, wlog.CompressionNone, false)
|
||||||
defer func() {
|
defer func() {
|
||||||
require.NoError(t, h.Close())
|
require.NoError(t, h.Close())
|
||||||
}()
|
}()
|
||||||
|
@ -1993,7 +1993,7 @@ func TestMemSeriesIsolation(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Test isolation without restart of Head.
|
// Test isolation without restart of Head.
|
||||||
hb, _ := newTestHead(t, 1000, false, false)
|
hb, _ := newTestHead(t, 1000, wlog.CompressionNone, false)
|
||||||
i := addSamples(hb)
|
i := addSamples(hb)
|
||||||
testIsolation(hb, i)
|
testIsolation(hb, i)
|
||||||
|
|
||||||
|
@ -2055,11 +2055,11 @@ func TestMemSeriesIsolation(t *testing.T) {
|
||||||
require.NoError(t, hb.Close())
|
require.NoError(t, hb.Close())
|
||||||
|
|
||||||
// Test isolation with restart of Head. This is to verify the num samples of chunks after m-map chunk replay.
|
// Test isolation with restart of Head. This is to verify the num samples of chunks after m-map chunk replay.
|
||||||
hb, w := newTestHead(t, 1000, false, false)
|
hb, w := newTestHead(t, 1000, wlog.CompressionNone, false)
|
||||||
i = addSamples(hb)
|
i = addSamples(hb)
|
||||||
require.NoError(t, hb.Close())
|
require.NoError(t, hb.Close())
|
||||||
|
|
||||||
wal, err := wlog.NewSize(nil, nil, w.Dir(), 32768, false)
|
wal, err := wlog.NewSize(nil, nil, w.Dir(), 32768, wlog.CompressionNone)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
opts := DefaultHeadOptions()
|
opts := DefaultHeadOptions()
|
||||||
opts.ChunkRange = 1000
|
opts.ChunkRange = 1000
|
||||||
|
@ -2108,7 +2108,7 @@ func TestIsolationRollback(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Rollback after a failed append and test if the low watermark has progressed anyway.
|
// Rollback after a failed append and test if the low watermark has progressed anyway.
|
||||||
hb, _ := newTestHead(t, 1000, false, false)
|
hb, _ := newTestHead(t, 1000, wlog.CompressionNone, false)
|
||||||
defer func() {
|
defer func() {
|
||||||
require.NoError(t, hb.Close())
|
require.NoError(t, hb.Close())
|
||||||
}()
|
}()
|
||||||
|
@ -2139,7 +2139,7 @@ func TestIsolationLowWatermarkMonotonous(t *testing.T) {
|
||||||
t.Skip("skipping test since tsdb isolation is disabled")
|
t.Skip("skipping test since tsdb isolation is disabled")
|
||||||
}
|
}
|
||||||
|
|
||||||
hb, _ := newTestHead(t, 1000, false, false)
|
hb, _ := newTestHead(t, 1000, wlog.CompressionNone, false)
|
||||||
defer func() {
|
defer func() {
|
||||||
require.NoError(t, hb.Close())
|
require.NoError(t, hb.Close())
|
||||||
}()
|
}()
|
||||||
|
@ -2176,7 +2176,7 @@ func TestIsolationAppendIDZeroIsNoop(t *testing.T) {
|
||||||
t.Skip("skipping test since tsdb isolation is disabled")
|
t.Skip("skipping test since tsdb isolation is disabled")
|
||||||
}
|
}
|
||||||
|
|
||||||
h, _ := newTestHead(t, 1000, false, false)
|
h, _ := newTestHead(t, 1000, wlog.CompressionNone, false)
|
||||||
defer func() {
|
defer func() {
|
||||||
require.NoError(t, h.Close())
|
require.NoError(t, h.Close())
|
||||||
}()
|
}()
|
||||||
|
@ -2207,7 +2207,7 @@ func TestIsolationWithoutAdd(t *testing.T) {
|
||||||
t.Skip("skipping test since tsdb isolation is disabled")
|
t.Skip("skipping test since tsdb isolation is disabled")
|
||||||
}
|
}
|
||||||
|
|
||||||
hb, _ := newTestHead(t, 1000, false, false)
|
hb, _ := newTestHead(t, 1000, wlog.CompressionNone, false)
|
||||||
defer func() {
|
defer func() {
|
||||||
require.NoError(t, hb.Close())
|
require.NoError(t, hb.Close())
|
||||||
}()
|
}()
|
||||||
|
@ -2302,7 +2302,7 @@ func TestOutOfOrderSamplesMetric(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func testHeadSeriesChunkRace(t *testing.T) {
|
func testHeadSeriesChunkRace(t *testing.T) {
|
||||||
h, _ := newTestHead(t, 1000, false, false)
|
h, _ := newTestHead(t, 1000, wlog.CompressionNone, false)
|
||||||
defer func() {
|
defer func() {
|
||||||
require.NoError(t, h.Close())
|
require.NoError(t, h.Close())
|
||||||
}()
|
}()
|
||||||
|
@ -2337,7 +2337,7 @@ func testHeadSeriesChunkRace(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestHeadLabelNamesValuesWithMinMaxRange(t *testing.T) {
|
func TestHeadLabelNamesValuesWithMinMaxRange(t *testing.T) {
|
||||||
head, _ := newTestHead(t, 1000, false, false)
|
head, _ := newTestHead(t, 1000, wlog.CompressionNone, false)
|
||||||
defer func() {
|
defer func() {
|
||||||
require.NoError(t, head.Close())
|
require.NoError(t, head.Close())
|
||||||
}()
|
}()
|
||||||
|
@ -2397,7 +2397,7 @@ func TestHeadLabelNamesValuesWithMinMaxRange(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestHeadLabelValuesWithMatchers(t *testing.T) {
|
func TestHeadLabelValuesWithMatchers(t *testing.T) {
|
||||||
head, _ := newTestHead(t, 1000, false, false)
|
head, _ := newTestHead(t, 1000, wlog.CompressionNone, false)
|
||||||
t.Cleanup(func() { require.NoError(t, head.Close()) })
|
t.Cleanup(func() { require.NoError(t, head.Close()) })
|
||||||
|
|
||||||
app := head.Appender(context.Background())
|
app := head.Appender(context.Background())
|
||||||
|
@ -2456,7 +2456,7 @@ func TestHeadLabelValuesWithMatchers(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestHeadLabelNamesWithMatchers(t *testing.T) {
|
func TestHeadLabelNamesWithMatchers(t *testing.T) {
|
||||||
head, _ := newTestHead(t, 1000, false, false)
|
head, _ := newTestHead(t, 1000, wlog.CompressionNone, false)
|
||||||
defer func() {
|
defer func() {
|
||||||
require.NoError(t, head.Close())
|
require.NoError(t, head.Close())
|
||||||
}()
|
}()
|
||||||
|
@ -2524,7 +2524,7 @@ func TestHeadLabelNamesWithMatchers(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestErrReuseAppender(t *testing.T) {
|
func TestErrReuseAppender(t *testing.T) {
|
||||||
head, _ := newTestHead(t, 1000, false, false)
|
head, _ := newTestHead(t, 1000, wlog.CompressionNone, false)
|
||||||
defer func() {
|
defer func() {
|
||||||
require.NoError(t, head.Close())
|
require.NoError(t, head.Close())
|
||||||
}()
|
}()
|
||||||
|
@ -2560,7 +2560,7 @@ func TestErrReuseAppender(t *testing.T) {
|
||||||
|
|
||||||
func TestHeadMintAfterTruncation(t *testing.T) {
|
func TestHeadMintAfterTruncation(t *testing.T) {
|
||||||
chunkRange := int64(2000)
|
chunkRange := int64(2000)
|
||||||
head, _ := newTestHead(t, chunkRange, false, false)
|
head, _ := newTestHead(t, chunkRange, wlog.CompressionNone, false)
|
||||||
|
|
||||||
app := head.Appender(context.Background())
|
app := head.Appender(context.Background())
|
||||||
_, err := app.Append(0, labels.FromStrings("a", "b"), 100, 100)
|
_, err := app.Append(0, labels.FromStrings("a", "b"), 100, 100)
|
||||||
|
@ -2594,7 +2594,7 @@ func TestHeadMintAfterTruncation(t *testing.T) {
|
||||||
|
|
||||||
func TestHeadExemplars(t *testing.T) {
|
func TestHeadExemplars(t *testing.T) {
|
||||||
chunkRange := int64(2000)
|
chunkRange := int64(2000)
|
||||||
head, _ := newTestHead(t, chunkRange, false, false)
|
head, _ := newTestHead(t, chunkRange, wlog.CompressionNone, false)
|
||||||
app := head.Appender(context.Background())
|
app := head.Appender(context.Background())
|
||||||
|
|
||||||
l := labels.FromStrings("traceId", "123")
|
l := labels.FromStrings("traceId", "123")
|
||||||
|
@ -2616,7 +2616,7 @@ func TestHeadExemplars(t *testing.T) {
|
||||||
|
|
||||||
func BenchmarkHeadLabelValuesWithMatchers(b *testing.B) {
|
func BenchmarkHeadLabelValuesWithMatchers(b *testing.B) {
|
||||||
chunkRange := int64(2000)
|
chunkRange := int64(2000)
|
||||||
head, _ := newTestHead(b, chunkRange, false, false)
|
head, _ := newTestHead(b, chunkRange, wlog.CompressionNone, false)
|
||||||
b.Cleanup(func() { require.NoError(b, head.Close()) })
|
b.Cleanup(func() { require.NoError(b, head.Close()) })
|
||||||
|
|
||||||
app := head.Appender(context.Background())
|
app := head.Appender(context.Background())
|
||||||
|
@ -2930,7 +2930,7 @@ func TestAppendHistogram(t *testing.T) {
|
||||||
l := labels.FromStrings("a", "b")
|
l := labels.FromStrings("a", "b")
|
||||||
for _, numHistograms := range []int{1, 10, 150, 200, 250, 300} {
|
for _, numHistograms := range []int{1, 10, 150, 200, 250, 300} {
|
||||||
t.Run(fmt.Sprintf("%d", numHistograms), func(t *testing.T) {
|
t.Run(fmt.Sprintf("%d", numHistograms), func(t *testing.T) {
|
||||||
head, _ := newTestHead(t, 1000, false, false)
|
head, _ := newTestHead(t, 1000, wlog.CompressionNone, false)
|
||||||
t.Cleanup(func() {
|
t.Cleanup(func() {
|
||||||
require.NoError(t, head.Close())
|
require.NoError(t, head.Close())
|
||||||
})
|
})
|
||||||
|
@ -3034,7 +3034,7 @@ func TestAppendHistogram(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestHistogramInWALAndMmapChunk(t *testing.T) {
|
func TestHistogramInWALAndMmapChunk(t *testing.T) {
|
||||||
head, _ := newTestHead(t, 3000, false, false)
|
head, _ := newTestHead(t, 3000, wlog.CompressionNone, false)
|
||||||
t.Cleanup(func() {
|
t.Cleanup(func() {
|
||||||
require.NoError(t, head.Close())
|
require.NoError(t, head.Close())
|
||||||
})
|
})
|
||||||
|
@ -3188,7 +3188,7 @@ func TestHistogramInWALAndMmapChunk(t *testing.T) {
|
||||||
// Restart head.
|
// Restart head.
|
||||||
require.NoError(t, head.Close())
|
require.NoError(t, head.Close())
|
||||||
startHead := func() {
|
startHead := func() {
|
||||||
w, err := wlog.NewSize(nil, nil, head.wal.Dir(), 32768, false)
|
w, err := wlog.NewSize(nil, nil, head.wal.Dir(), 32768, wlog.CompressionNone)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
head, err = NewHead(nil, nil, w, nil, head.opts, nil)
|
head, err = NewHead(nil, nil, w, nil, head.opts, nil)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
@ -3217,7 +3217,7 @@ func TestHistogramInWALAndMmapChunk(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestChunkSnapshot(t *testing.T) {
|
func TestChunkSnapshot(t *testing.T) {
|
||||||
head, _ := newTestHead(t, 120*4, false, false)
|
head, _ := newTestHead(t, 120*4, wlog.CompressionNone, false)
|
||||||
defer func() {
|
defer func() {
|
||||||
head.opts.EnableMemorySnapshotOnShutdown = false
|
head.opts.EnableMemorySnapshotOnShutdown = false
|
||||||
require.NoError(t, head.Close())
|
require.NoError(t, head.Close())
|
||||||
|
@ -3310,7 +3310,7 @@ func TestChunkSnapshot(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
openHeadAndCheckReplay := func() {
|
openHeadAndCheckReplay := func() {
|
||||||
w, err := wlog.NewSize(nil, nil, head.wal.Dir(), 32768, false)
|
w, err := wlog.NewSize(nil, nil, head.wal.Dir(), 32768, wlog.CompressionNone)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
head, err = NewHead(nil, nil, w, nil, head.opts, nil)
|
head, err = NewHead(nil, nil, w, nil, head.opts, nil)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
@ -3505,7 +3505,7 @@ func TestChunkSnapshot(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestSnapshotError(t *testing.T) {
|
func TestSnapshotError(t *testing.T) {
|
||||||
head, _ := newTestHead(t, 120*4, false, false)
|
head, _ := newTestHead(t, 120*4, wlog.CompressionNone, false)
|
||||||
defer func() {
|
defer func() {
|
||||||
head.opts.EnableMemorySnapshotOnShutdown = false
|
head.opts.EnableMemorySnapshotOnShutdown = false
|
||||||
require.NoError(t, head.Close())
|
require.NoError(t, head.Close())
|
||||||
|
@ -3562,7 +3562,7 @@ func TestSnapshotError(t *testing.T) {
|
||||||
require.NoError(t, f.Close())
|
require.NoError(t, f.Close())
|
||||||
|
|
||||||
// Create new Head which should replay this snapshot.
|
// Create new Head which should replay this snapshot.
|
||||||
w, err := wlog.NewSize(nil, nil, head.wal.Dir(), 32768, false)
|
w, err := wlog.NewSize(nil, nil, head.wal.Dir(), 32768, wlog.CompressionNone)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
// Testing https://github.com/prometheus/prometheus/issues/9437 with the registry.
|
// Testing https://github.com/prometheus/prometheus/issues/9437 with the registry.
|
||||||
head, err = NewHead(prometheus.NewRegistry(), nil, w, nil, head.opts, nil)
|
head, err = NewHead(prometheus.NewRegistry(), nil, w, nil, head.opts, nil)
|
||||||
|
@ -3579,7 +3579,7 @@ func TestSnapshotError(t *testing.T) {
|
||||||
|
|
||||||
func TestHistogramMetrics(t *testing.T) {
|
func TestHistogramMetrics(t *testing.T) {
|
||||||
numHistograms := 10
|
numHistograms := 10
|
||||||
head, _ := newTestHead(t, 1000, false, false)
|
head, _ := newTestHead(t, 1000, wlog.CompressionNone, false)
|
||||||
t.Cleanup(func() {
|
t.Cleanup(func() {
|
||||||
require.NoError(t, head.Close())
|
require.NoError(t, head.Close())
|
||||||
})
|
})
|
||||||
|
@ -3609,7 +3609,7 @@ func TestHistogramMetrics(t *testing.T) {
|
||||||
require.Equal(t, float64(expHSamples), prom_testutil.ToFloat64(head.metrics.samplesAppended.WithLabelValues(sampleMetricTypeHistogram)))
|
require.Equal(t, float64(expHSamples), prom_testutil.ToFloat64(head.metrics.samplesAppended.WithLabelValues(sampleMetricTypeHistogram)))
|
||||||
|
|
||||||
require.NoError(t, head.Close())
|
require.NoError(t, head.Close())
|
||||||
w, err := wlog.NewSize(nil, nil, head.wal.Dir(), 32768, false)
|
w, err := wlog.NewSize(nil, nil, head.wal.Dir(), 32768, wlog.CompressionNone)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
head, err = NewHead(nil, nil, w, nil, head.opts, nil)
|
head, err = NewHead(nil, nil, w, nil, head.opts, nil)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
@ -3631,7 +3631,7 @@ func testHistogramStaleSampleHelper(t *testing.T, floatHistogram bool) {
|
||||||
t.Helper()
|
t.Helper()
|
||||||
l := labels.FromStrings("a", "b")
|
l := labels.FromStrings("a", "b")
|
||||||
numHistograms := 20
|
numHistograms := 20
|
||||||
head, _ := newTestHead(t, 100000, false, false)
|
head, _ := newTestHead(t, 100000, wlog.CompressionNone, false)
|
||||||
t.Cleanup(func() {
|
t.Cleanup(func() {
|
||||||
require.NoError(t, head.Close())
|
require.NoError(t, head.Close())
|
||||||
})
|
})
|
||||||
|
@ -3778,7 +3778,7 @@ func TestHistogramCounterResetHeader(t *testing.T) {
|
||||||
for _, floatHisto := range []bool{true, false} {
|
for _, floatHisto := range []bool{true, false} {
|
||||||
t.Run(fmt.Sprintf("floatHistogram=%t", floatHisto), func(t *testing.T) {
|
t.Run(fmt.Sprintf("floatHistogram=%t", floatHisto), func(t *testing.T) {
|
||||||
l := labels.FromStrings("a", "b")
|
l := labels.FromStrings("a", "b")
|
||||||
head, _ := newTestHead(t, 1000, false, false)
|
head, _ := newTestHead(t, 1000, wlog.CompressionNone, false)
|
||||||
t.Cleanup(func() {
|
t.Cleanup(func() {
|
||||||
require.NoError(t, head.Close())
|
require.NoError(t, head.Close())
|
||||||
})
|
})
|
||||||
|
@ -4041,7 +4041,7 @@ func TestAppendingDifferentEncodingToSameSeries(t *testing.T) {
|
||||||
// Tests https://github.com/prometheus/prometheus/issues/9725.
|
// Tests https://github.com/prometheus/prometheus/issues/9725.
|
||||||
func TestChunkSnapshotReplayBug(t *testing.T) {
|
func TestChunkSnapshotReplayBug(t *testing.T) {
|
||||||
dir := t.TempDir()
|
dir := t.TempDir()
|
||||||
wal, err := wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, true)
|
wal, err := wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, wlog.CompressionSnappy)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
// Write few series records and samples such that the series references are not in order in the WAL
|
// Write few series records and samples such that the series references are not in order in the WAL
|
||||||
|
@ -4108,7 +4108,7 @@ func TestChunkSnapshotReplayBug(t *testing.T) {
|
||||||
|
|
||||||
func TestChunkSnapshotTakenAfterIncompleteSnapshot(t *testing.T) {
|
func TestChunkSnapshotTakenAfterIncompleteSnapshot(t *testing.T) {
|
||||||
dir := t.TempDir()
|
dir := t.TempDir()
|
||||||
wlTemp, err := wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, true)
|
wlTemp, err := wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, wlog.CompressionSnappy)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
// Write a snapshot with .tmp suffix. This used to fail taking any further snapshots or replay of snapshots.
|
// Write a snapshot with .tmp suffix. This used to fail taking any further snapshots or replay of snapshots.
|
||||||
|
@ -4146,9 +4146,9 @@ func TestChunkSnapshotTakenAfterIncompleteSnapshot(t *testing.T) {
|
||||||
// TODO(codesome): Needs test for ooo WAL repair.
|
// TODO(codesome): Needs test for ooo WAL repair.
|
||||||
func TestOOOWalReplay(t *testing.T) {
|
func TestOOOWalReplay(t *testing.T) {
|
||||||
dir := t.TempDir()
|
dir := t.TempDir()
|
||||||
wal, err := wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, true)
|
wal, err := wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, wlog.CompressionSnappy)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
oooWlog, err := wlog.NewSize(nil, nil, filepath.Join(dir, wlog.WblDirName), 32768, true)
|
oooWlog, err := wlog.NewSize(nil, nil, filepath.Join(dir, wlog.WblDirName), 32768, wlog.CompressionSnappy)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
opts := DefaultHeadOptions()
|
opts := DefaultHeadOptions()
|
||||||
|
@ -4193,9 +4193,9 @@ func TestOOOWalReplay(t *testing.T) {
|
||||||
|
|
||||||
// Restart head.
|
// Restart head.
|
||||||
require.NoError(t, h.Close())
|
require.NoError(t, h.Close())
|
||||||
wal, err = wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, true)
|
wal, err = wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, wlog.CompressionSnappy)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
oooWlog, err = wlog.NewSize(nil, nil, filepath.Join(dir, wlog.WblDirName), 32768, true)
|
oooWlog, err = wlog.NewSize(nil, nil, filepath.Join(dir, wlog.WblDirName), 32768, wlog.CompressionSnappy)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
h, err = NewHead(nil, nil, wal, oooWlog, opts, nil)
|
h, err = NewHead(nil, nil, wal, oooWlog, opts, nil)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
@ -4230,9 +4230,9 @@ func TestOOOWalReplay(t *testing.T) {
|
||||||
// TestOOOMmapReplay checks the replay at a low level.
|
// TestOOOMmapReplay checks the replay at a low level.
|
||||||
func TestOOOMmapReplay(t *testing.T) {
|
func TestOOOMmapReplay(t *testing.T) {
|
||||||
dir := t.TempDir()
|
dir := t.TempDir()
|
||||||
wal, err := wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, true)
|
wal, err := wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, wlog.CompressionSnappy)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
oooWlog, err := wlog.NewSize(nil, nil, filepath.Join(dir, wlog.WblDirName), 32768, true)
|
oooWlog, err := wlog.NewSize(nil, nil, filepath.Join(dir, wlog.WblDirName), 32768, wlog.CompressionSnappy)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
opts := DefaultHeadOptions()
|
opts := DefaultHeadOptions()
|
||||||
|
@ -4281,9 +4281,9 @@ func TestOOOMmapReplay(t *testing.T) {
|
||||||
// Restart head.
|
// Restart head.
|
||||||
require.NoError(t, h.Close())
|
require.NoError(t, h.Close())
|
||||||
|
|
||||||
wal, err = wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, true)
|
wal, err = wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, wlog.CompressionSnappy)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
oooWlog, err = wlog.NewSize(nil, nil, filepath.Join(dir, wlog.WblDirName), 32768, true)
|
oooWlog, err = wlog.NewSize(nil, nil, filepath.Join(dir, wlog.WblDirName), 32768, wlog.CompressionSnappy)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
h, err = NewHead(nil, nil, wal, oooWlog, opts, nil)
|
h, err = NewHead(nil, nil, wal, oooWlog, opts, nil)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
@ -4312,7 +4312,7 @@ func TestOOOMmapReplay(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestHeadInit_DiscardChunksWithUnsupportedEncoding(t *testing.T) {
|
func TestHeadInit_DiscardChunksWithUnsupportedEncoding(t *testing.T) {
|
||||||
h, _ := newTestHead(t, 1000, false, false)
|
h, _ := newTestHead(t, 1000, wlog.CompressionNone, false)
|
||||||
defer func() {
|
defer func() {
|
||||||
require.NoError(t, h.Close())
|
require.NoError(t, h.Close())
|
||||||
}()
|
}()
|
||||||
|
@ -4355,7 +4355,7 @@ func TestHeadInit_DiscardChunksWithUnsupportedEncoding(t *testing.T) {
|
||||||
|
|
||||||
require.NoError(t, h.Close())
|
require.NoError(t, h.Close())
|
||||||
|
|
||||||
wal, err := wlog.NewSize(nil, nil, filepath.Join(h.opts.ChunkDirRoot, "wal"), 32768, false)
|
wal, err := wlog.NewSize(nil, nil, filepath.Join(h.opts.ChunkDirRoot, "wal"), 32768, wlog.CompressionNone)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
h, err = NewHead(nil, nil, wal, nil, h.opts, nil)
|
h, err = NewHead(nil, nil, wal, nil, h.opts, nil)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
@ -4390,7 +4390,7 @@ func (c *unsupportedChunk) Encoding() chunkenc.Encoding {
|
||||||
// Tests https://github.com/prometheus/prometheus/issues/10277.
|
// Tests https://github.com/prometheus/prometheus/issues/10277.
|
||||||
func TestMmapPanicAfterMmapReplayCorruption(t *testing.T) {
|
func TestMmapPanicAfterMmapReplayCorruption(t *testing.T) {
|
||||||
dir := t.TempDir()
|
dir := t.TempDir()
|
||||||
wal, err := wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, false)
|
wal, err := wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, wlog.CompressionNone)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
opts := DefaultHeadOptions()
|
opts := DefaultHeadOptions()
|
||||||
|
@ -4423,7 +4423,7 @@ func TestMmapPanicAfterMmapReplayCorruption(t *testing.T) {
|
||||||
addChunks()
|
addChunks()
|
||||||
|
|
||||||
require.NoError(t, h.Close())
|
require.NoError(t, h.Close())
|
||||||
wal, err = wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, false)
|
wal, err = wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, wlog.CompressionNone)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
mmapFilePath := filepath.Join(dir, "chunks_head", "000001")
|
mmapFilePath := filepath.Join(dir, "chunks_head", "000001")
|
||||||
|
@ -4449,7 +4449,7 @@ func TestReplayAfterMmapReplayError(t *testing.T) {
|
||||||
var err error
|
var err error
|
||||||
|
|
||||||
openHead := func() {
|
openHead := func() {
|
||||||
wal, err := wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, false)
|
wal, err := wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, wlog.CompressionNone)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
opts := DefaultHeadOptions()
|
opts := DefaultHeadOptions()
|
||||||
|
@ -4667,9 +4667,9 @@ func generateBigTestHistograms(n int) []*histogram.Histogram {
|
||||||
|
|
||||||
func TestOOOAppendWithNoSeries(t *testing.T) {
|
func TestOOOAppendWithNoSeries(t *testing.T) {
|
||||||
dir := t.TempDir()
|
dir := t.TempDir()
|
||||||
wal, err := wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, true)
|
wal, err := wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, wlog.CompressionSnappy)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
oooWlog, err := wlog.NewSize(nil, nil, filepath.Join(dir, wlog.WblDirName), 32768, true)
|
oooWlog, err := wlog.NewSize(nil, nil, filepath.Join(dir, wlog.WblDirName), 32768, wlog.CompressionSnappy)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
opts := DefaultHeadOptions()
|
opts := DefaultHeadOptions()
|
||||||
|
@ -4748,9 +4748,9 @@ func TestOOOAppendWithNoSeries(t *testing.T) {
|
||||||
|
|
||||||
func TestHeadMinOOOTimeUpdate(t *testing.T) {
|
func TestHeadMinOOOTimeUpdate(t *testing.T) {
|
||||||
dir := t.TempDir()
|
dir := t.TempDir()
|
||||||
wal, err := wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, true)
|
wal, err := wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, wlog.CompressionSnappy)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
oooWlog, err := wlog.NewSize(nil, nil, filepath.Join(dir, wlog.WblDirName), 32768, true)
|
oooWlog, err := wlog.NewSize(nil, nil, filepath.Join(dir, wlog.WblDirName), 32768, wlog.CompressionSnappy)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
opts := DefaultHeadOptions()
|
opts := DefaultHeadOptions()
|
||||||
|
@ -4795,7 +4795,7 @@ func TestHeadMinOOOTimeUpdate(t *testing.T) {
|
||||||
|
|
||||||
func TestGaugeHistogramWALAndChunkHeader(t *testing.T) {
|
func TestGaugeHistogramWALAndChunkHeader(t *testing.T) {
|
||||||
l := labels.FromStrings("a", "b")
|
l := labels.FromStrings("a", "b")
|
||||||
head, _ := newTestHead(t, 1000, false, false)
|
head, _ := newTestHead(t, 1000, wlog.CompressionNone, false)
|
||||||
t.Cleanup(func() {
|
t.Cleanup(func() {
|
||||||
require.NoError(t, head.Close())
|
require.NoError(t, head.Close())
|
||||||
})
|
})
|
||||||
|
@ -4859,7 +4859,7 @@ func TestGaugeHistogramWALAndChunkHeader(t *testing.T) {
|
||||||
require.NoError(t, head.Close())
|
require.NoError(t, head.Close())
|
||||||
require.NoError(t, os.RemoveAll(mmappedChunksDir(head.opts.ChunkDirRoot)))
|
require.NoError(t, os.RemoveAll(mmappedChunksDir(head.opts.ChunkDirRoot)))
|
||||||
|
|
||||||
w, err := wlog.NewSize(nil, nil, head.wal.Dir(), 32768, false)
|
w, err := wlog.NewSize(nil, nil, head.wal.Dir(), 32768, wlog.CompressionNone)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
head, err = NewHead(nil, nil, w, nil, head.opts, nil)
|
head, err = NewHead(nil, nil, w, nil, head.opts, nil)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
@ -4870,7 +4870,7 @@ func TestGaugeHistogramWALAndChunkHeader(t *testing.T) {
|
||||||
|
|
||||||
func TestGaugeFloatHistogramWALAndChunkHeader(t *testing.T) {
|
func TestGaugeFloatHistogramWALAndChunkHeader(t *testing.T) {
|
||||||
l := labels.FromStrings("a", "b")
|
l := labels.FromStrings("a", "b")
|
||||||
head, _ := newTestHead(t, 1000, false, false)
|
head, _ := newTestHead(t, 1000, wlog.CompressionNone, false)
|
||||||
t.Cleanup(func() {
|
t.Cleanup(func() {
|
||||||
require.NoError(t, head.Close())
|
require.NoError(t, head.Close())
|
||||||
})
|
})
|
||||||
|
@ -4934,7 +4934,7 @@ func TestGaugeFloatHistogramWALAndChunkHeader(t *testing.T) {
|
||||||
require.NoError(t, head.Close())
|
require.NoError(t, head.Close())
|
||||||
require.NoError(t, os.RemoveAll(mmappedChunksDir(head.opts.ChunkDirRoot)))
|
require.NoError(t, os.RemoveAll(mmappedChunksDir(head.opts.ChunkDirRoot)))
|
||||||
|
|
||||||
w, err := wlog.NewSize(nil, nil, head.wal.Dir(), 32768, false)
|
w, err := wlog.NewSize(nil, nil, head.wal.Dir(), 32768, wlog.CompressionNone)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
head, err = NewHead(nil, nil, w, nil, head.opts, nil)
|
head, err = NewHead(nil, nil, w, nil, head.opts, nil)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
@ -4944,7 +4944,7 @@ func TestGaugeFloatHistogramWALAndChunkHeader(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestSnapshotAheadOfWALError(t *testing.T) {
|
func TestSnapshotAheadOfWALError(t *testing.T) {
|
||||||
head, _ := newTestHead(t, 120*4, false, false)
|
head, _ := newTestHead(t, 120*4, wlog.CompressionNone, false)
|
||||||
head.opts.EnableMemorySnapshotOnShutdown = true
|
head.opts.EnableMemorySnapshotOnShutdown = true
|
||||||
// Add a sample to fill WAL.
|
// Add a sample to fill WAL.
|
||||||
app := head.Appender(context.Background())
|
app := head.Appender(context.Background())
|
||||||
|
@ -4967,7 +4967,7 @@ func TestSnapshotAheadOfWALError(t *testing.T) {
|
||||||
// to keep using the same snapshot directory instead of a random one.
|
// to keep using the same snapshot directory instead of a random one.
|
||||||
require.NoError(t, os.RemoveAll(head.wal.Dir()))
|
require.NoError(t, os.RemoveAll(head.wal.Dir()))
|
||||||
head.opts.EnableMemorySnapshotOnShutdown = false
|
head.opts.EnableMemorySnapshotOnShutdown = false
|
||||||
w, _ := wlog.NewSize(nil, nil, head.wal.Dir(), 32768, false)
|
w, _ := wlog.NewSize(nil, nil, head.wal.Dir(), 32768, wlog.CompressionNone)
|
||||||
head, err = NewHead(nil, nil, w, nil, head.opts, nil)
|
head, err = NewHead(nil, nil, w, nil, head.opts, nil)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
// Add a sample to fill WAL.
|
// Add a sample to fill WAL.
|
||||||
|
@ -4986,7 +4986,7 @@ func TestSnapshotAheadOfWALError(t *testing.T) {
|
||||||
|
|
||||||
// Create new Head which should detect the incorrect index and delete the snapshot.
|
// Create new Head which should detect the incorrect index and delete the snapshot.
|
||||||
head.opts.EnableMemorySnapshotOnShutdown = true
|
head.opts.EnableMemorySnapshotOnShutdown = true
|
||||||
w, _ = wlog.NewSize(nil, nil, head.wal.Dir(), 32768, false)
|
w, _ = wlog.NewSize(nil, nil, head.wal.Dir(), 32768, wlog.CompressionNone)
|
||||||
head, err = NewHead(nil, nil, w, nil, head.opts, nil)
|
head, err = NewHead(nil, nil, w, nil, head.opts, nil)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.NoError(t, head.Init(math.MinInt64))
|
require.NoError(t, head.Init(math.MinInt64))
|
||||||
|
|
|
@ -1119,7 +1119,7 @@ func (h *Head) ChunkSnapshot() (*ChunkSnapshotStats, error) {
|
||||||
if err := os.MkdirAll(cpdirtmp, 0o777); err != nil {
|
if err := os.MkdirAll(cpdirtmp, 0o777); err != nil {
|
||||||
return stats, errors.Wrap(err, "create chunk snapshot dir")
|
return stats, errors.Wrap(err, "create chunk snapshot dir")
|
||||||
}
|
}
|
||||||
cp, err := wlog.New(nil, nil, cpdirtmp, h.wal.CompressionEnabled())
|
cp, err := wlog.New(nil, nil, cpdirtmp, h.wal.CompressionType())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return stats, errors.Wrap(err, "open chunk snapshot")
|
return stats, errors.Wrap(err, "open chunk snapshot")
|
||||||
}
|
}
|
||||||
|
|
|
@ -29,6 +29,7 @@ import (
|
||||||
"github.com/prometheus/prometheus/tsdb/chunkenc"
|
"github.com/prometheus/prometheus/tsdb/chunkenc"
|
||||||
"github.com/prometheus/prometheus/tsdb/chunks"
|
"github.com/prometheus/prometheus/tsdb/chunks"
|
||||||
"github.com/prometheus/prometheus/tsdb/tsdbutil"
|
"github.com/prometheus/prometheus/tsdb/tsdbutil"
|
||||||
|
"github.com/prometheus/prometheus/tsdb/wlog"
|
||||||
)
|
)
|
||||||
|
|
||||||
type chunkInterval struct {
|
type chunkInterval struct {
|
||||||
|
@ -295,7 +296,7 @@ func TestOOOHeadIndexReader_Series(t *testing.T) {
|
||||||
for perm, intervals := range permutations {
|
for perm, intervals := range permutations {
|
||||||
for _, headChunk := range []bool{false, true} {
|
for _, headChunk := range []bool{false, true} {
|
||||||
t.Run(fmt.Sprintf("name=%s, permutation=%d, headChunk=%t", tc.name, perm, headChunk), func(t *testing.T) {
|
t.Run(fmt.Sprintf("name=%s, permutation=%d, headChunk=%t", tc.name, perm, headChunk), func(t *testing.T) {
|
||||||
h, _ := newTestHead(t, 1000, false, true)
|
h, _ := newTestHead(t, 1000, wlog.CompressionNone, true)
|
||||||
defer func() {
|
defer func() {
|
||||||
require.NoError(t, h.Close())
|
require.NoError(t, h.Close())
|
||||||
}()
|
}()
|
||||||
|
@ -375,7 +376,7 @@ func TestOOOHeadIndexReader_Series(t *testing.T) {
|
||||||
|
|
||||||
func TestOOOHeadChunkReader_LabelValues(t *testing.T) {
|
func TestOOOHeadChunkReader_LabelValues(t *testing.T) {
|
||||||
chunkRange := int64(2000)
|
chunkRange := int64(2000)
|
||||||
head, _ := newTestHead(t, chunkRange, false, true)
|
head, _ := newTestHead(t, chunkRange, wlog.CompressionNone, true)
|
||||||
t.Cleanup(func() { require.NoError(t, head.Close()) })
|
t.Cleanup(func() { require.NoError(t, head.Close()) })
|
||||||
|
|
||||||
app := head.Appender(context.Background())
|
app := head.Appender(context.Background())
|
||||||
|
|
|
@ -1226,7 +1226,7 @@ func MigrateWAL(logger log.Logger, dir string) (err error) {
|
||||||
if err := os.RemoveAll(tmpdir); err != nil {
|
if err := os.RemoveAll(tmpdir); err != nil {
|
||||||
return errors.Wrap(err, "cleanup replacement dir")
|
return errors.Wrap(err, "cleanup replacement dir")
|
||||||
}
|
}
|
||||||
repl, err := wlog.New(logger, nil, tmpdir, false)
|
repl, err := wlog.New(logger, nil, tmpdir, wlog.CompressionNone)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return errors.Wrap(err, "open new WAL")
|
return errors.Wrap(err, "open new WAL")
|
||||||
}
|
}
|
||||||
|
|
|
@ -450,7 +450,7 @@ func TestMigrateWAL_Empty(t *testing.T) {
|
||||||
wdir := path.Join(dir, "wal")
|
wdir := path.Join(dir, "wal")
|
||||||
|
|
||||||
// Initialize empty WAL.
|
// Initialize empty WAL.
|
||||||
w, err := wlog.New(nil, nil, wdir, false)
|
w, err := wlog.New(nil, nil, wdir, wlog.CompressionNone)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.NoError(t, w.Close())
|
require.NoError(t, w.Close())
|
||||||
|
|
||||||
|
@ -493,7 +493,7 @@ func TestMigrateWAL_Fuzz(t *testing.T) {
|
||||||
// Perform migration.
|
// Perform migration.
|
||||||
require.NoError(t, MigrateWAL(nil, wdir))
|
require.NoError(t, MigrateWAL(nil, wdir))
|
||||||
|
|
||||||
w, err := wlog.New(nil, nil, wdir, false)
|
w, err := wlog.New(nil, nil, wdir, wlog.CompressionNone)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
// We can properly write some new data after migration.
|
// We can properly write some new data after migration.
|
||||||
|
|
|
@ -134,7 +134,7 @@ func Checkpoint(logger log.Logger, w *WL, from, to int, keep func(id chunks.Head
|
||||||
if err := os.MkdirAll(cpdirtmp, 0o777); err != nil {
|
if err := os.MkdirAll(cpdirtmp, 0o777); err != nil {
|
||||||
return nil, errors.Wrap(err, "create checkpoint dir")
|
return nil, errors.Wrap(err, "create checkpoint dir")
|
||||||
}
|
}
|
||||||
cp, err := New(nil, nil, cpdirtmp, w.CompressionEnabled())
|
cp, err := New(nil, nil, cpdirtmp, w.CompressionType())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, errors.Wrap(err, "open checkpoint")
|
return nil, errors.Wrap(err, "open checkpoint")
|
||||||
}
|
}
|
||||||
|
|
|
@ -126,8 +126,8 @@ func TestCheckpoint(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, compress := range []bool{false, true} {
|
for _, compress := range []CompressionType{CompressionNone, CompressionSnappy, CompressionZstd} {
|
||||||
t.Run(fmt.Sprintf("compress=%t", compress), func(t *testing.T) {
|
t.Run(fmt.Sprintf("compress=%s", compress), func(t *testing.T) {
|
||||||
dir := t.TempDir()
|
dir := t.TempDir()
|
||||||
|
|
||||||
var enc record.Encoder
|
var enc record.Encoder
|
||||||
|
@ -303,7 +303,7 @@ func TestCheckpoint(t *testing.T) {
|
||||||
func TestCheckpointNoTmpFolderAfterError(t *testing.T) {
|
func TestCheckpointNoTmpFolderAfterError(t *testing.T) {
|
||||||
// Create a new wlog with invalid data.
|
// Create a new wlog with invalid data.
|
||||||
dir := t.TempDir()
|
dir := t.TempDir()
|
||||||
w, err := NewSize(nil, nil, dir, 64*1024, false)
|
w, err := NewSize(nil, nil, dir, 64*1024, CompressionNone)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
var enc record.Encoder
|
var enc record.Encoder
|
||||||
require.NoError(t, w.Log(enc.Series([]record.RefSeries{
|
require.NoError(t, w.Log(enc.Series([]record.RefSeries{
|
||||||
|
|
|
@ -23,6 +23,7 @@ import (
|
||||||
"github.com/go-kit/log"
|
"github.com/go-kit/log"
|
||||||
"github.com/go-kit/log/level"
|
"github.com/go-kit/log/level"
|
||||||
"github.com/golang/snappy"
|
"github.com/golang/snappy"
|
||||||
|
"github.com/klauspost/compress/zstd"
|
||||||
"github.com/pkg/errors"
|
"github.com/pkg/errors"
|
||||||
"github.com/prometheus/client_golang/prometheus"
|
"github.com/prometheus/client_golang/prometheus"
|
||||||
)
|
)
|
||||||
|
@ -51,10 +52,14 @@ func NewLiveReaderMetrics(reg prometheus.Registerer) *LiveReaderMetrics {
|
||||||
|
|
||||||
// NewLiveReader returns a new live reader.
|
// NewLiveReader returns a new live reader.
|
||||||
func NewLiveReader(logger log.Logger, metrics *LiveReaderMetrics, r io.Reader) *LiveReader {
|
func NewLiveReader(logger log.Logger, metrics *LiveReaderMetrics, r io.Reader) *LiveReader {
|
||||||
|
// Calling zstd.NewReader with a nil io.Reader and no options cannot return an error.
|
||||||
|
zstdReader, _ := zstd.NewReader(nil)
|
||||||
|
|
||||||
lr := &LiveReader{
|
lr := &LiveReader{
|
||||||
logger: logger,
|
logger: logger,
|
||||||
rdr: r,
|
rdr: r,
|
||||||
metrics: metrics,
|
zstdReader: zstdReader,
|
||||||
|
metrics: metrics,
|
||||||
|
|
||||||
// Until we understand how they come about, make readers permissive
|
// Until we understand how they come about, make readers permissive
|
||||||
// to records spanning pages.
|
// to records spanning pages.
|
||||||
|
@ -68,17 +73,18 @@ func NewLiveReader(logger log.Logger, metrics *LiveReaderMetrics, r io.Reader) *
|
||||||
// that are still in the process of being written, and returns records as soon
|
// that are still in the process of being written, and returns records as soon
|
||||||
// as they can be read.
|
// as they can be read.
|
||||||
type LiveReader struct {
|
type LiveReader struct {
|
||||||
logger log.Logger
|
logger log.Logger
|
||||||
rdr io.Reader
|
rdr io.Reader
|
||||||
err error
|
err error
|
||||||
rec []byte
|
rec []byte
|
||||||
snappyBuf []byte
|
compressBuf []byte
|
||||||
hdr [recordHeaderSize]byte
|
zstdReader *zstd.Decoder
|
||||||
buf [pageSize]byte
|
hdr [recordHeaderSize]byte
|
||||||
readIndex int // Index in buf to start at for next read.
|
buf [pageSize]byte
|
||||||
writeIndex int // Index in buf to start at for next write.
|
readIndex int // Index in buf to start at for next read.
|
||||||
total int64 // Total bytes processed during reading in calls to Next().
|
writeIndex int // Index in buf to start at for next write.
|
||||||
index int // Used to track partial records, should be 0 at the start of every new record.
|
total int64 // Total bytes processed during reading in calls to Next().
|
||||||
|
index int // Used to track partial records, should be 0 at the start of every new record.
|
||||||
|
|
||||||
// For testing, we can treat EOF as a non-error.
|
// For testing, we can treat EOF as a non-error.
|
||||||
eofNonErr bool
|
eofNonErr bool
|
||||||
|
@ -191,12 +197,14 @@ func (r *LiveReader) buildRecord() (bool, error) {
|
||||||
rt := recTypeFromHeader(r.hdr[0])
|
rt := recTypeFromHeader(r.hdr[0])
|
||||||
if rt == recFirst || rt == recFull {
|
if rt == recFirst || rt == recFull {
|
||||||
r.rec = r.rec[:0]
|
r.rec = r.rec[:0]
|
||||||
r.snappyBuf = r.snappyBuf[:0]
|
r.compressBuf = r.compressBuf[:0]
|
||||||
}
|
}
|
||||||
|
|
||||||
compressed := r.hdr[0]&snappyMask != 0
|
isSnappyCompressed := r.hdr[0]&snappyMask == snappyMask
|
||||||
if compressed {
|
isZstdCompressed := r.hdr[0]&zstdMask == zstdMask
|
||||||
r.snappyBuf = append(r.snappyBuf, temp...)
|
|
||||||
|
if isSnappyCompressed || isZstdCompressed {
|
||||||
|
r.compressBuf = append(r.compressBuf, temp...)
|
||||||
} else {
|
} else {
|
||||||
r.rec = append(r.rec, temp...)
|
r.rec = append(r.rec, temp...)
|
||||||
}
|
}
|
||||||
|
@ -207,12 +215,17 @@ func (r *LiveReader) buildRecord() (bool, error) {
|
||||||
}
|
}
|
||||||
if rt == recLast || rt == recFull {
|
if rt == recLast || rt == recFull {
|
||||||
r.index = 0
|
r.index = 0
|
||||||
if compressed && len(r.snappyBuf) > 0 {
|
if isSnappyCompressed && len(r.compressBuf) > 0 {
|
||||||
// The snappy library uses `len` to calculate if we need a new buffer.
|
// The snappy library uses `len` to calculate if we need a new buffer.
|
||||||
// In order to allocate as few buffers as possible make the length
|
// In order to allocate as few buffers as possible make the length
|
||||||
// equal to the capacity.
|
// equal to the capacity.
|
||||||
r.rec = r.rec[:cap(r.rec)]
|
r.rec = r.rec[:cap(r.rec)]
|
||||||
r.rec, err = snappy.Decode(r.rec, r.snappyBuf)
|
r.rec, err = snappy.Decode(r.rec, r.compressBuf)
|
||||||
|
if err != nil {
|
||||||
|
return false, err
|
||||||
|
}
|
||||||
|
} else if isZstdCompressed && len(r.compressBuf) > 0 {
|
||||||
|
r.rec, err = r.zstdReader.DecodeAll(r.compressBuf, r.rec[:0])
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return false, err
|
return false, err
|
||||||
}
|
}
|
||||||
|
|
|
@ -20,23 +20,27 @@ import (
|
||||||
"io"
|
"io"
|
||||||
|
|
||||||
"github.com/golang/snappy"
|
"github.com/golang/snappy"
|
||||||
|
"github.com/klauspost/compress/zstd"
|
||||||
"github.com/pkg/errors"
|
"github.com/pkg/errors"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Reader reads WAL records from an io.Reader.
|
// Reader reads WAL records from an io.Reader.
|
||||||
type Reader struct {
|
type Reader struct {
|
||||||
rdr io.Reader
|
rdr io.Reader
|
||||||
err error
|
err error
|
||||||
rec []byte
|
rec []byte
|
||||||
snappyBuf []byte
|
compressBuf []byte
|
||||||
buf [pageSize]byte
|
zstdReader *zstd.Decoder
|
||||||
total int64 // Total bytes processed.
|
buf [pageSize]byte
|
||||||
curRecTyp recType // Used for checking that the last record is not torn.
|
total int64 // Total bytes processed.
|
||||||
|
curRecTyp recType // Used for checking that the last record is not torn.
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewReader returns a new reader.
|
// NewReader returns a new reader.
|
||||||
func NewReader(r io.Reader) *Reader {
|
func NewReader(r io.Reader) *Reader {
|
||||||
return &Reader{rdr: r}
|
// Calling zstd.NewReader with a nil io.Reader and no options cannot return an error.
|
||||||
|
zstdReader, _ := zstd.NewReader(nil)
|
||||||
|
return &Reader{rdr: r, zstdReader: zstdReader}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Next advances the reader to the next records and returns true if it exists.
|
// Next advances the reader to the next records and returns true if it exists.
|
||||||
|
@ -63,7 +67,7 @@ func (r *Reader) next() (err error) {
|
||||||
buf := r.buf[recordHeaderSize:]
|
buf := r.buf[recordHeaderSize:]
|
||||||
|
|
||||||
r.rec = r.rec[:0]
|
r.rec = r.rec[:0]
|
||||||
r.snappyBuf = r.snappyBuf[:0]
|
r.compressBuf = r.compressBuf[:0]
|
||||||
|
|
||||||
i := 0
|
i := 0
|
||||||
for {
|
for {
|
||||||
|
@ -72,7 +76,8 @@ func (r *Reader) next() (err error) {
|
||||||
}
|
}
|
||||||
r.total++
|
r.total++
|
||||||
r.curRecTyp = recTypeFromHeader(hdr[0])
|
r.curRecTyp = recTypeFromHeader(hdr[0])
|
||||||
compressed := hdr[0]&snappyMask != 0
|
isSnappyCompressed := hdr[0]&snappyMask == snappyMask
|
||||||
|
isZstdCompressed := hdr[0]&zstdMask == zstdMask
|
||||||
|
|
||||||
// Gobble up zero bytes.
|
// Gobble up zero bytes.
|
||||||
if r.curRecTyp == recPageTerm {
|
if r.curRecTyp == recPageTerm {
|
||||||
|
@ -128,8 +133,8 @@ func (r *Reader) next() (err error) {
|
||||||
return errors.Errorf("unexpected checksum %x, expected %x", c, crc)
|
return errors.Errorf("unexpected checksum %x, expected %x", c, crc)
|
||||||
}
|
}
|
||||||
|
|
||||||
if compressed {
|
if isSnappyCompressed || isZstdCompressed {
|
||||||
r.snappyBuf = append(r.snappyBuf, buf[:length]...)
|
r.compressBuf = append(r.compressBuf, buf[:length]...)
|
||||||
} else {
|
} else {
|
||||||
r.rec = append(r.rec, buf[:length]...)
|
r.rec = append(r.rec, buf[:length]...)
|
||||||
}
|
}
|
||||||
|
@ -138,12 +143,15 @@ func (r *Reader) next() (err error) {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if r.curRecTyp == recLast || r.curRecTyp == recFull {
|
if r.curRecTyp == recLast || r.curRecTyp == recFull {
|
||||||
if compressed && len(r.snappyBuf) > 0 {
|
if isSnappyCompressed && len(r.compressBuf) > 0 {
|
||||||
// The snappy library uses `len` to calculate if we need a new buffer.
|
// The snappy library uses `len` to calculate if we need a new buffer.
|
||||||
// In order to allocate as few buffers as possible make the length
|
// In order to allocate as few buffers as possible make the length
|
||||||
// equal to the capacity.
|
// equal to the capacity.
|
||||||
r.rec = r.rec[:cap(r.rec)]
|
r.rec = r.rec[:cap(r.rec)]
|
||||||
r.rec, err = snappy.Decode(r.rec, r.snappyBuf)
|
r.rec, err = snappy.Decode(r.rec, r.compressBuf)
|
||||||
|
return err
|
||||||
|
} else if isZstdCompressed && len(r.compressBuf) > 0 {
|
||||||
|
r.rec, err = r.zstdReader.DecodeAll(r.compressBuf, r.rec[:0])
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
|
|
|
@ -310,8 +310,8 @@ func allSegments(dir string) (io.ReadCloser, error) {
|
||||||
|
|
||||||
func TestReaderFuzz(t *testing.T) {
|
func TestReaderFuzz(t *testing.T) {
|
||||||
for name, fn := range readerConstructors {
|
for name, fn := range readerConstructors {
|
||||||
for _, compress := range []bool{false, true} {
|
for _, compress := range []CompressionType{CompressionNone, CompressionSnappy, CompressionZstd} {
|
||||||
t.Run(fmt.Sprintf("%s,compress=%t", name, compress), func(t *testing.T) {
|
t.Run(fmt.Sprintf("%s,compress=%s", name, compress), func(t *testing.T) {
|
||||||
dir := t.TempDir()
|
dir := t.TempDir()
|
||||||
|
|
||||||
w, err := NewSize(nil, nil, dir, 128*pageSize, compress)
|
w, err := NewSize(nil, nil, dir, 128*pageSize, compress)
|
||||||
|
@ -349,8 +349,8 @@ func TestReaderFuzz(t *testing.T) {
|
||||||
|
|
||||||
func TestReaderFuzz_Live(t *testing.T) {
|
func TestReaderFuzz_Live(t *testing.T) {
|
||||||
logger := testutil.NewLogger(t)
|
logger := testutil.NewLogger(t)
|
||||||
for _, compress := range []bool{false, true} {
|
for _, compress := range []CompressionType{CompressionNone, CompressionSnappy, CompressionZstd} {
|
||||||
t.Run(fmt.Sprintf("compress=%t", compress), func(t *testing.T) {
|
t.Run(fmt.Sprintf("compress=%s", compress), func(t *testing.T) {
|
||||||
dir := t.TempDir()
|
dir := t.TempDir()
|
||||||
|
|
||||||
w, err := NewSize(nil, nil, dir, 128*pageSize, compress)
|
w, err := NewSize(nil, nil, dir, 128*pageSize, compress)
|
||||||
|
@ -439,7 +439,7 @@ func TestLiveReaderCorrupt_ShortFile(t *testing.T) {
|
||||||
logger := testutil.NewLogger(t)
|
logger := testutil.NewLogger(t)
|
||||||
dir := t.TempDir()
|
dir := t.TempDir()
|
||||||
|
|
||||||
w, err := NewSize(nil, nil, dir, pageSize, false)
|
w, err := NewSize(nil, nil, dir, pageSize, CompressionNone)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
rec := make([]byte, pageSize-recordHeaderSize)
|
rec := make([]byte, pageSize-recordHeaderSize)
|
||||||
|
@ -479,7 +479,7 @@ func TestLiveReaderCorrupt_RecordTooLongAndShort(t *testing.T) {
|
||||||
logger := testutil.NewLogger(t)
|
logger := testutil.NewLogger(t)
|
||||||
dir := t.TempDir()
|
dir := t.TempDir()
|
||||||
|
|
||||||
w, err := NewSize(nil, nil, dir, pageSize*2, false)
|
w, err := NewSize(nil, nil, dir, pageSize*2, CompressionNone)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
rec := make([]byte, pageSize-recordHeaderSize)
|
rec := make([]byte, pageSize-recordHeaderSize)
|
||||||
|
@ -526,7 +526,7 @@ func TestReaderData(t *testing.T) {
|
||||||
|
|
||||||
for name, fn := range readerConstructors {
|
for name, fn := range readerConstructors {
|
||||||
t.Run(name, func(t *testing.T) {
|
t.Run(name, func(t *testing.T) {
|
||||||
w, err := New(nil, nil, dir, true)
|
w, err := New(nil, nil, dir, CompressionSnappy)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
sr, err := allSegments(dir)
|
sr, err := allSegments(dir)
|
||||||
|
|
|
@ -122,8 +122,8 @@ func TestTailSamples(t *testing.T) {
|
||||||
const samplesCount = 250
|
const samplesCount = 250
|
||||||
const exemplarsCount = 25
|
const exemplarsCount = 25
|
||||||
const histogramsCount = 50
|
const histogramsCount = 50
|
||||||
for _, compress := range []bool{false, true} {
|
for _, compress := range []CompressionType{CompressionNone, CompressionSnappy, CompressionZstd} {
|
||||||
t.Run(fmt.Sprintf("compress=%t", compress), func(t *testing.T) {
|
t.Run(fmt.Sprintf("compress=%s", compress), func(t *testing.T) {
|
||||||
now := time.Now()
|
now := time.Now()
|
||||||
|
|
||||||
dir := t.TempDir()
|
dir := t.TempDir()
|
||||||
|
@ -246,8 +246,8 @@ func TestReadToEndNoCheckpoint(t *testing.T) {
|
||||||
const seriesCount = 10
|
const seriesCount = 10
|
||||||
const samplesCount = 250
|
const samplesCount = 250
|
||||||
|
|
||||||
for _, compress := range []bool{false, true} {
|
for _, compress := range []CompressionType{CompressionNone, CompressionSnappy, CompressionZstd} {
|
||||||
t.Run(fmt.Sprintf("compress=%t", compress), func(t *testing.T) {
|
t.Run(fmt.Sprintf("compress=%s", compress), func(t *testing.T) {
|
||||||
dir := t.TempDir()
|
dir := t.TempDir()
|
||||||
wdir := path.Join(dir, "wal")
|
wdir := path.Join(dir, "wal")
|
||||||
err := os.Mkdir(wdir, 0o777)
|
err := os.Mkdir(wdir, 0o777)
|
||||||
|
@ -314,8 +314,8 @@ func TestReadToEndWithCheckpoint(t *testing.T) {
|
||||||
const seriesCount = 10
|
const seriesCount = 10
|
||||||
const samplesCount = 250
|
const samplesCount = 250
|
||||||
|
|
||||||
for _, compress := range []bool{false, true} {
|
for _, compress := range []CompressionType{CompressionNone, CompressionSnappy, CompressionZstd} {
|
||||||
t.Run(fmt.Sprintf("compress=%t", compress), func(t *testing.T) {
|
t.Run(fmt.Sprintf("compress=%s", compress), func(t *testing.T) {
|
||||||
dir := t.TempDir()
|
dir := t.TempDir()
|
||||||
|
|
||||||
wdir := path.Join(dir, "wal")
|
wdir := path.Join(dir, "wal")
|
||||||
|
@ -402,8 +402,8 @@ func TestReadCheckpoint(t *testing.T) {
|
||||||
const seriesCount = 10
|
const seriesCount = 10
|
||||||
const samplesCount = 250
|
const samplesCount = 250
|
||||||
|
|
||||||
for _, compress := range []bool{false, true} {
|
for _, compress := range []CompressionType{CompressionNone, CompressionSnappy, CompressionZstd} {
|
||||||
t.Run(fmt.Sprintf("compress=%t", compress), func(t *testing.T) {
|
t.Run(fmt.Sprintf("compress=%s", compress), func(t *testing.T) {
|
||||||
dir := t.TempDir()
|
dir := t.TempDir()
|
||||||
|
|
||||||
wdir := path.Join(dir, "wal")
|
wdir := path.Join(dir, "wal")
|
||||||
|
@ -475,8 +475,8 @@ func TestReadCheckpointMultipleSegments(t *testing.T) {
|
||||||
const seriesCount = 20
|
const seriesCount = 20
|
||||||
const samplesCount = 300
|
const samplesCount = 300
|
||||||
|
|
||||||
for _, compress := range []bool{false, true} {
|
for _, compress := range []CompressionType{CompressionNone, CompressionSnappy, CompressionZstd} {
|
||||||
t.Run(fmt.Sprintf("compress=%t", compress), func(t *testing.T) {
|
t.Run(fmt.Sprintf("compress=%s", compress), func(t *testing.T) {
|
||||||
dir := t.TempDir()
|
dir := t.TempDir()
|
||||||
|
|
||||||
wdir := path.Join(dir, "wal")
|
wdir := path.Join(dir, "wal")
|
||||||
|
@ -546,15 +546,15 @@ func TestCheckpointSeriesReset(t *testing.T) {
|
||||||
const seriesCount = 20
|
const seriesCount = 20
|
||||||
const samplesCount = 350
|
const samplesCount = 350
|
||||||
testCases := []struct {
|
testCases := []struct {
|
||||||
compress bool
|
compress CompressionType
|
||||||
segments int
|
segments int
|
||||||
}{
|
}{
|
||||||
{compress: false, segments: 14},
|
{compress: CompressionNone, segments: 14},
|
||||||
{compress: true, segments: 13},
|
{compress: CompressionSnappy, segments: 13},
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, tc := range testCases {
|
for _, tc := range testCases {
|
||||||
t.Run(fmt.Sprintf("compress=%t", tc.compress), func(t *testing.T) {
|
t.Run(fmt.Sprintf("compress=%s", tc.compress), func(t *testing.T) {
|
||||||
dir := t.TempDir()
|
dir := t.TempDir()
|
||||||
|
|
||||||
wdir := path.Join(dir, "wal")
|
wdir := path.Join(dir, "wal")
|
||||||
|
|
|
@ -29,6 +29,7 @@ import (
|
||||||
"github.com/go-kit/log"
|
"github.com/go-kit/log"
|
||||||
"github.com/go-kit/log/level"
|
"github.com/go-kit/log/level"
|
||||||
"github.com/golang/snappy"
|
"github.com/golang/snappy"
|
||||||
|
"github.com/klauspost/compress/zstd"
|
||||||
"github.com/pkg/errors"
|
"github.com/pkg/errors"
|
||||||
"github.com/prometheus/client_golang/prometheus"
|
"github.com/prometheus/client_golang/prometheus"
|
||||||
"golang.org/x/exp/slices"
|
"golang.org/x/exp/slices"
|
||||||
|
@ -164,6 +165,26 @@ func OpenReadSegment(fn string) (*Segment, error) {
|
||||||
return &Segment{SegmentFile: f, i: k, dir: filepath.Dir(fn)}, nil
|
return &Segment{SegmentFile: f, i: k, dir: filepath.Dir(fn)}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type CompressionType string
|
||||||
|
|
||||||
|
const (
|
||||||
|
CompressionNone CompressionType = "none"
|
||||||
|
CompressionSnappy CompressionType = "snappy"
|
||||||
|
CompressionZstd CompressionType = "zstd"
|
||||||
|
)
|
||||||
|
|
||||||
|
// ParseCompressionType parses the two compression-related configuration values and returns the CompressionType. If
|
||||||
|
// compression is enabled but the compressType is unrecognized, we default to Snappy compression.
|
||||||
|
func ParseCompressionType(compress bool, compressType string) CompressionType {
|
||||||
|
if compress {
|
||||||
|
if compressType == "zstd" {
|
||||||
|
return CompressionZstd
|
||||||
|
}
|
||||||
|
return CompressionSnappy
|
||||||
|
}
|
||||||
|
return CompressionNone
|
||||||
|
}
|
||||||
|
|
||||||
// WL is a write log that stores records in segment files.
|
// WL is a write log that stores records in segment files.
|
||||||
// It must be read from start to end once before logging new data.
|
// It must be read from start to end once before logging new data.
|
||||||
// If an error occurs during read, the repair procedure must be called
|
// If an error occurs during read, the repair procedure must be called
|
||||||
|
@ -185,8 +206,9 @@ type WL struct {
|
||||||
stopc chan chan struct{}
|
stopc chan chan struct{}
|
||||||
actorc chan func()
|
actorc chan func()
|
||||||
closed bool // To allow calling Close() more than once without blocking.
|
closed bool // To allow calling Close() more than once without blocking.
|
||||||
compress bool
|
compress CompressionType
|
||||||
snappyBuf []byte
|
compressBuf []byte
|
||||||
|
zstdWriter *zstd.Encoder
|
||||||
|
|
||||||
WriteNotified WriteNotified
|
WriteNotified WriteNotified
|
||||||
|
|
||||||
|
@ -265,13 +287,13 @@ func newWLMetrics(w *WL, r prometheus.Registerer) *wlMetrics {
|
||||||
}
|
}
|
||||||
|
|
||||||
// New returns a new WAL over the given directory.
|
// New returns a new WAL over the given directory.
|
||||||
func New(logger log.Logger, reg prometheus.Registerer, dir string, compress bool) (*WL, error) {
|
func New(logger log.Logger, reg prometheus.Registerer, dir string, compress CompressionType) (*WL, error) {
|
||||||
return NewSize(logger, reg, dir, DefaultSegmentSize, compress)
|
return NewSize(logger, reg, dir, DefaultSegmentSize, compress)
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewSize returns a new write log over the given directory.
|
// NewSize returns a new write log over the given directory.
|
||||||
// New segments are created with the specified size.
|
// New segments are created with the specified size.
|
||||||
func NewSize(logger log.Logger, reg prometheus.Registerer, dir string, segmentSize int, compress bool) (*WL, error) {
|
func NewSize(logger log.Logger, reg prometheus.Registerer, dir string, segmentSize int, compress CompressionType) (*WL, error) {
|
||||||
if segmentSize%pageSize != 0 {
|
if segmentSize%pageSize != 0 {
|
||||||
return nil, errors.New("invalid segment size")
|
return nil, errors.New("invalid segment size")
|
||||||
}
|
}
|
||||||
|
@ -281,6 +303,16 @@ func NewSize(logger log.Logger, reg prometheus.Registerer, dir string, segmentSi
|
||||||
if logger == nil {
|
if logger == nil {
|
||||||
logger = log.NewNopLogger()
|
logger = log.NewNopLogger()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var zstdWriter *zstd.Encoder
|
||||||
|
if compress == CompressionZstd {
|
||||||
|
var err error
|
||||||
|
zstdWriter, err = zstd.NewWriter(nil)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
w := &WL{
|
w := &WL{
|
||||||
dir: dir,
|
dir: dir,
|
||||||
logger: logger,
|
logger: logger,
|
||||||
|
@ -289,6 +321,7 @@ func NewSize(logger log.Logger, reg prometheus.Registerer, dir string, segmentSi
|
||||||
actorc: make(chan func(), 100),
|
actorc: make(chan func(), 100),
|
||||||
stopc: make(chan chan struct{}),
|
stopc: make(chan chan struct{}),
|
||||||
compress: compress,
|
compress: compress,
|
||||||
|
zstdWriter: zstdWriter,
|
||||||
}
|
}
|
||||||
prefix := "prometheus_tsdb_wal_"
|
prefix := "prometheus_tsdb_wal_"
|
||||||
if filepath.Base(dir) == WblDirName {
|
if filepath.Base(dir) == WblDirName {
|
||||||
|
@ -327,16 +360,22 @@ func Open(logger log.Logger, dir string) (*WL, error) {
|
||||||
if logger == nil {
|
if logger == nil {
|
||||||
logger = log.NewNopLogger()
|
logger = log.NewNopLogger()
|
||||||
}
|
}
|
||||||
|
zstdWriter, err := zstd.NewWriter(nil)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
w := &WL{
|
w := &WL{
|
||||||
dir: dir,
|
dir: dir,
|
||||||
logger: logger,
|
logger: logger,
|
||||||
|
zstdWriter: zstdWriter,
|
||||||
}
|
}
|
||||||
|
|
||||||
return w, nil
|
return w, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// CompressionEnabled returns if compression is enabled on this WAL.
|
// CompressionType returns if compression is enabled on this WAL.
|
||||||
func (w *WL) CompressionEnabled() bool {
|
func (w *WL) CompressionType() CompressionType {
|
||||||
return w.compress
|
return w.compress
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -583,9 +622,10 @@ func (w *WL) flushPage(clear bool) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
// First Byte of header format:
|
// First Byte of header format:
|
||||||
// [ 4 bits unallocated] [1 bit snappy compression flag] [ 3 bit record type ]
|
// [3 bits unallocated] [1 bit zstd compression flag] [1 bit snappy compression flag] [3 bit record type ]
|
||||||
const (
|
const (
|
||||||
snappyMask = 1 << 3
|
snappyMask = 1 << 3
|
||||||
|
zstdMask = 1 << 4
|
||||||
recTypeMask = snappyMask - 1
|
recTypeMask = snappyMask - 1
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -655,17 +695,23 @@ func (w *WL) log(rec []byte, final bool) error {
|
||||||
|
|
||||||
// Compress the record before calculating if a new segment is needed.
|
// Compress the record before calculating if a new segment is needed.
|
||||||
compressed := false
|
compressed := false
|
||||||
if w.compress &&
|
if w.compress == CompressionSnappy && len(rec) > 0 {
|
||||||
len(rec) > 0 &&
|
|
||||||
// If MaxEncodedLen is less than 0 the record is too large to be compressed.
|
// If MaxEncodedLen is less than 0 the record is too large to be compressed.
|
||||||
snappy.MaxEncodedLen(len(rec)) >= 0 {
|
if len(rec) > 0 && snappy.MaxEncodedLen(len(rec)) >= 0 {
|
||||||
// The snappy library uses `len` to calculate if we need a new buffer.
|
// The snappy library uses `len` to calculate if we need a new buffer.
|
||||||
// In order to allocate as few buffers as possible make the length
|
// In order to allocate as few buffers as possible make the length
|
||||||
// equal to the capacity.
|
// equal to the capacity.
|
||||||
w.snappyBuf = w.snappyBuf[:cap(w.snappyBuf)]
|
w.compressBuf = w.compressBuf[:cap(w.compressBuf)]
|
||||||
w.snappyBuf = snappy.Encode(w.snappyBuf, rec)
|
w.compressBuf = snappy.Encode(w.compressBuf, rec)
|
||||||
if len(w.snappyBuf) < len(rec) {
|
if len(w.compressBuf) < len(rec) {
|
||||||
rec = w.snappyBuf
|
rec = w.compressBuf
|
||||||
|
compressed = true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else if w.compress == CompressionZstd && len(rec) > 0 {
|
||||||
|
w.compressBuf = w.zstdWriter.EncodeAll(rec, w.compressBuf[:0])
|
||||||
|
if len(w.compressBuf) < len(rec) {
|
||||||
|
rec = w.compressBuf
|
||||||
compressed = true
|
compressed = true
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -706,7 +752,11 @@ func (w *WL) log(rec []byte, final bool) error {
|
||||||
typ = recMiddle
|
typ = recMiddle
|
||||||
}
|
}
|
||||||
if compressed {
|
if compressed {
|
||||||
typ |= snappyMask
|
if w.compress == CompressionSnappy {
|
||||||
|
typ |= snappyMask
|
||||||
|
} else if w.compress == CompressionZstd {
|
||||||
|
typ |= zstdMask
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
buf[0] = byte(typ)
|
buf[0] = byte(typ)
|
||||||
|
|
|
@ -124,7 +124,7 @@ func TestWALRepair_ReadingError(t *testing.T) {
|
||||||
// then corrupt a given record in a given segment.
|
// then corrupt a given record in a given segment.
|
||||||
// As a result we want a repaired WAL with given intact records.
|
// As a result we want a repaired WAL with given intact records.
|
||||||
segSize := 3 * pageSize
|
segSize := 3 * pageSize
|
||||||
w, err := NewSize(nil, nil, dir, segSize, false)
|
w, err := NewSize(nil, nil, dir, segSize, CompressionNone)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
var records [][]byte
|
var records [][]byte
|
||||||
|
@ -149,7 +149,7 @@ func TestWALRepair_ReadingError(t *testing.T) {
|
||||||
|
|
||||||
require.NoError(t, f.Close())
|
require.NoError(t, f.Close())
|
||||||
|
|
||||||
w, err = NewSize(nil, nil, dir, segSize, false)
|
w, err = NewSize(nil, nil, dir, segSize, CompressionNone)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
defer w.Close()
|
defer w.Close()
|
||||||
|
|
||||||
|
@ -223,7 +223,7 @@ func TestCorruptAndCarryOn(t *testing.T) {
|
||||||
// Produce a WAL with a two segments of 3 pages with 3 records each,
|
// Produce a WAL with a two segments of 3 pages with 3 records each,
|
||||||
// so when we truncate the file we're guaranteed to split a record.
|
// so when we truncate the file we're guaranteed to split a record.
|
||||||
{
|
{
|
||||||
w, err := NewSize(logger, nil, dir, segmentSize, false)
|
w, err := NewSize(logger, nil, dir, segmentSize, CompressionNone)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
for i := 0; i < 18; i++ {
|
for i := 0; i < 18; i++ {
|
||||||
|
@ -294,7 +294,7 @@ func TestCorruptAndCarryOn(t *testing.T) {
|
||||||
err = sr.Close()
|
err = sr.Close()
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
w, err := NewSize(logger, nil, dir, segmentSize, false)
|
w, err := NewSize(logger, nil, dir, segmentSize, CompressionNone)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
err = w.Repair(corruptionErr)
|
err = w.Repair(corruptionErr)
|
||||||
|
@ -337,7 +337,7 @@ func TestCorruptAndCarryOn(t *testing.T) {
|
||||||
// TestClose ensures that calling Close more than once doesn't panic and doesn't block.
|
// TestClose ensures that calling Close more than once doesn't panic and doesn't block.
|
||||||
func TestClose(t *testing.T) {
|
func TestClose(t *testing.T) {
|
||||||
dir := t.TempDir()
|
dir := t.TempDir()
|
||||||
w, err := NewSize(nil, nil, dir, pageSize, false)
|
w, err := NewSize(nil, nil, dir, pageSize, CompressionNone)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.NoError(t, w.Close())
|
require.NoError(t, w.Close())
|
||||||
require.Error(t, w.Close())
|
require.Error(t, w.Close())
|
||||||
|
@ -350,7 +350,7 @@ func TestSegmentMetric(t *testing.T) {
|
||||||
)
|
)
|
||||||
|
|
||||||
dir := t.TempDir()
|
dir := t.TempDir()
|
||||||
w, err := NewSize(nil, nil, dir, segmentSize, false)
|
w, err := NewSize(nil, nil, dir, segmentSize, CompressionNone)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
initialSegment := client_testutil.ToFloat64(w.metrics.currentSegment)
|
initialSegment := client_testutil.ToFloat64(w.metrics.currentSegment)
|
||||||
|
@ -369,7 +369,7 @@ func TestSegmentMetric(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestCompression(t *testing.T) {
|
func TestCompression(t *testing.T) {
|
||||||
bootstrap := func(compressed bool) string {
|
bootstrap := func(compressed CompressionType) string {
|
||||||
const (
|
const (
|
||||||
segmentSize = pageSize
|
segmentSize = pageSize
|
||||||
recordSize = (pageSize / 2) - recordHeaderSize
|
recordSize = (pageSize / 2) - recordHeaderSize
|
||||||
|
@ -390,21 +390,27 @@ func TestCompression(t *testing.T) {
|
||||||
return dirPath
|
return dirPath
|
||||||
}
|
}
|
||||||
|
|
||||||
dirCompressed := bootstrap(true)
|
tmpDirs := make([]string, 0, 3)
|
||||||
defer func() {
|
defer func() {
|
||||||
require.NoError(t, os.RemoveAll(dirCompressed))
|
for _, dir := range tmpDirs {
|
||||||
}()
|
require.NoError(t, os.RemoveAll(dir))
|
||||||
dirUnCompressed := bootstrap(false)
|
}
|
||||||
defer func() {
|
|
||||||
require.NoError(t, os.RemoveAll(dirUnCompressed))
|
|
||||||
}()
|
}()
|
||||||
|
|
||||||
uncompressedSize, err := fileutil.DirSize(dirUnCompressed)
|
dirUnCompressed := bootstrap(CompressionNone)
|
||||||
require.NoError(t, err)
|
tmpDirs = append(tmpDirs, dirUnCompressed)
|
||||||
compressedSize, err := fileutil.DirSize(dirCompressed)
|
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
require.Greater(t, float64(uncompressedSize)*0.75, float64(compressedSize), "Compressing zeroes should save at least 25%% space - uncompressedSize: %d, compressedSize: %d", uncompressedSize, compressedSize)
|
for _, compressionType := range []CompressionType{CompressionSnappy, CompressionZstd} {
|
||||||
|
dirCompressed := bootstrap(compressionType)
|
||||||
|
tmpDirs = append(tmpDirs, dirCompressed)
|
||||||
|
|
||||||
|
uncompressedSize, err := fileutil.DirSize(dirUnCompressed)
|
||||||
|
require.NoError(t, err)
|
||||||
|
compressedSize, err := fileutil.DirSize(dirCompressed)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
require.Greater(t, float64(uncompressedSize)*0.75, float64(compressedSize), "Compressing zeroes should save at least 25%% space - uncompressedSize: %d, compressedSize: %d", uncompressedSize, compressedSize)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestLogPartialWrite(t *testing.T) {
|
func TestLogPartialWrite(t *testing.T) {
|
||||||
|
@ -438,7 +444,7 @@ func TestLogPartialWrite(t *testing.T) {
|
||||||
t.Run(testName, func(t *testing.T) {
|
t.Run(testName, func(t *testing.T) {
|
||||||
dirPath := t.TempDir()
|
dirPath := t.TempDir()
|
||||||
|
|
||||||
w, err := NewSize(nil, nil, dirPath, segmentSize, false)
|
w, err := NewSize(nil, nil, dirPath, segmentSize, CompressionNone)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
// Replace the underlying segment file with a mocked one that injects a failure.
|
// Replace the underlying segment file with a mocked one that injects a failure.
|
||||||
|
@ -505,8 +511,8 @@ func (f *faultySegmentFile) Write(p []byte) (int, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func BenchmarkWAL_LogBatched(b *testing.B) {
|
func BenchmarkWAL_LogBatched(b *testing.B) {
|
||||||
for _, compress := range []bool{true, false} {
|
for _, compress := range []CompressionType{CompressionNone, CompressionSnappy, CompressionZstd} {
|
||||||
b.Run(fmt.Sprintf("compress=%t", compress), func(b *testing.B) {
|
b.Run(fmt.Sprintf("compress=%s", compress), func(b *testing.B) {
|
||||||
dir := b.TempDir()
|
dir := b.TempDir()
|
||||||
|
|
||||||
w, err := New(nil, nil, dir, compress)
|
w, err := New(nil, nil, dir, compress)
|
||||||
|
@ -535,8 +541,8 @@ func BenchmarkWAL_LogBatched(b *testing.B) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func BenchmarkWAL_Log(b *testing.B) {
|
func BenchmarkWAL_Log(b *testing.B) {
|
||||||
for _, compress := range []bool{true, false} {
|
for _, compress := range []CompressionType{CompressionNone, CompressionSnappy, CompressionZstd} {
|
||||||
b.Run(fmt.Sprintf("compress=%t", compress), func(b *testing.B) {
|
b.Run(fmt.Sprintf("compress=%s", compress), func(b *testing.B) {
|
||||||
dir := b.TempDir()
|
dir := b.TempDir()
|
||||||
|
|
||||||
w, err := New(nil, nil, dir, compress)
|
w, err := New(nil, nil, dir, compress)
|
||||||
|
|
Loading…
Reference in a new issue