mirror of
https://github.com/prometheus/prometheus.git
synced 2025-01-11 05:47:27 -08:00
Provide option to compress WAL records (#609)
In running Prometheus instances, compressing the records was shown to reduce disk usage by half while incurring a negligible CPU cost. Signed-off-by: Chris Marchbanks <csmarchbanks@gmail.com>
This commit is contained in:
parent
c2c921af75
commit
b40cc43958
|
@ -1,5 +1,5 @@
|
||||||
## master / unreleased
|
## master / unreleased
|
||||||
|
- [FEATURE] Provide option to compress WAL records using Snappy. [#609](https://github.com/prometheus/tsdb/pull/609)
|
||||||
|
|
||||||
## 0.8.0
|
## 0.8.0
|
||||||
- [BUGFIX] Calling `Close` more than once on a querier returns an error instead of a panic.
|
- [BUGFIX] Calling `Close` more than once on a querier returns an error instead of a panic.
|
||||||
|
|
|
@ -135,7 +135,7 @@ func Checkpoint(w *wal.WAL, from, to int, keep func(id uint64) bool, mint int64)
|
||||||
if err := os.MkdirAll(cpdirtmp, 0777); err != nil {
|
if err := os.MkdirAll(cpdirtmp, 0777); err != nil {
|
||||||
return nil, errors.Wrap(err, "create checkpoint dir")
|
return nil, errors.Wrap(err, "create checkpoint dir")
|
||||||
}
|
}
|
||||||
cp, err := wal.New(nil, nil, cpdirtmp)
|
cp, err := wal.New(nil, nil, cpdirtmp, w.CompressionEnabled())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, errors.Wrap(err, "open checkpoint")
|
return nil, errors.Wrap(err, "open checkpoint")
|
||||||
}
|
}
|
||||||
|
|
|
@ -86,108 +86,112 @@ func TestDeleteCheckpoints(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestCheckpoint(t *testing.T) {
|
func TestCheckpoint(t *testing.T) {
|
||||||
dir, err := ioutil.TempDir("", "test_checkpoint")
|
for _, compress := range []bool{false, true} {
|
||||||
testutil.Ok(t, err)
|
t.Run(fmt.Sprintf("compress=%t", compress), func(t *testing.T) {
|
||||||
defer func() {
|
dir, err := ioutil.TempDir("", "test_checkpoint")
|
||||||
testutil.Ok(t, os.RemoveAll(dir))
|
|
||||||
}()
|
|
||||||
|
|
||||||
var enc RecordEncoder
|
|
||||||
// Create a dummy segment to bump the initial number.
|
|
||||||
seg, err := wal.CreateSegment(dir, 100)
|
|
||||||
testutil.Ok(t, err)
|
|
||||||
testutil.Ok(t, seg.Close())
|
|
||||||
|
|
||||||
// Manually create checkpoint for 99 and earlier.
|
|
||||||
w, err := wal.New(nil, nil, filepath.Join(dir, "checkpoint.0099"))
|
|
||||||
testutil.Ok(t, err)
|
|
||||||
|
|
||||||
// Add some data we expect to be around later.
|
|
||||||
err = w.Log(enc.Series([]RefSeries{
|
|
||||||
{Ref: 0, Labels: labels.FromStrings("a", "b", "c", "0")},
|
|
||||||
{Ref: 1, Labels: labels.FromStrings("a", "b", "c", "1")},
|
|
||||||
}, nil))
|
|
||||||
testutil.Ok(t, err)
|
|
||||||
testutil.Ok(t, w.Close())
|
|
||||||
|
|
||||||
// Start a WAL and write records to it as usual.
|
|
||||||
w, err = wal.NewSize(nil, nil, dir, 64*1024)
|
|
||||||
testutil.Ok(t, err)
|
|
||||||
|
|
||||||
var last int64
|
|
||||||
for i := 0; ; i++ {
|
|
||||||
_, n, err := w.Segments()
|
|
||||||
testutil.Ok(t, err)
|
|
||||||
if n >= 106 {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
// Write some series initially.
|
|
||||||
if i == 0 {
|
|
||||||
b := enc.Series([]RefSeries{
|
|
||||||
{Ref: 2, Labels: labels.FromStrings("a", "b", "c", "2")},
|
|
||||||
{Ref: 3, Labels: labels.FromStrings("a", "b", "c", "3")},
|
|
||||||
{Ref: 4, Labels: labels.FromStrings("a", "b", "c", "4")},
|
|
||||||
{Ref: 5, Labels: labels.FromStrings("a", "b", "c", "5")},
|
|
||||||
}, nil)
|
|
||||||
testutil.Ok(t, w.Log(b))
|
|
||||||
}
|
|
||||||
// Write samples until the WAL has enough segments.
|
|
||||||
// Make them have drifting timestamps within a record to see that they
|
|
||||||
// get filtered properly.
|
|
||||||
b := enc.Samples([]RefSample{
|
|
||||||
{Ref: 0, T: last, V: float64(i)},
|
|
||||||
{Ref: 1, T: last + 10000, V: float64(i)},
|
|
||||||
{Ref: 2, T: last + 20000, V: float64(i)},
|
|
||||||
{Ref: 3, T: last + 30000, V: float64(i)},
|
|
||||||
}, nil)
|
|
||||||
testutil.Ok(t, w.Log(b))
|
|
||||||
|
|
||||||
last += 100
|
|
||||||
}
|
|
||||||
testutil.Ok(t, w.Close())
|
|
||||||
|
|
||||||
_, err = Checkpoint(w, 100, 106, func(x uint64) bool {
|
|
||||||
return x%2 == 0
|
|
||||||
}, last/2)
|
|
||||||
testutil.Ok(t, err)
|
|
||||||
testutil.Ok(t, w.Truncate(107))
|
|
||||||
testutil.Ok(t, DeleteCheckpoints(w.Dir(), 106))
|
|
||||||
|
|
||||||
// Only the new checkpoint should be left.
|
|
||||||
files, err := fileutil.ReadDir(dir)
|
|
||||||
testutil.Ok(t, err)
|
|
||||||
testutil.Equals(t, 1, len(files))
|
|
||||||
testutil.Equals(t, "checkpoint.000106", files[0])
|
|
||||||
|
|
||||||
sr, err := wal.NewSegmentsReader(filepath.Join(dir, "checkpoint.000106"))
|
|
||||||
testutil.Ok(t, err)
|
|
||||||
defer sr.Close()
|
|
||||||
|
|
||||||
var dec RecordDecoder
|
|
||||||
var series []RefSeries
|
|
||||||
r := wal.NewReader(sr)
|
|
||||||
|
|
||||||
for r.Next() {
|
|
||||||
rec := r.Record()
|
|
||||||
|
|
||||||
switch dec.Type(rec) {
|
|
||||||
case RecordSeries:
|
|
||||||
series, err = dec.Series(rec, series)
|
|
||||||
testutil.Ok(t, err)
|
testutil.Ok(t, err)
|
||||||
case RecordSamples:
|
defer func() {
|
||||||
samples, err := dec.Samples(rec, nil)
|
testutil.Ok(t, os.RemoveAll(dir))
|
||||||
|
}()
|
||||||
|
|
||||||
|
var enc RecordEncoder
|
||||||
|
// Create a dummy segment to bump the initial number.
|
||||||
|
seg, err := wal.CreateSegment(dir, 100)
|
||||||
testutil.Ok(t, err)
|
testutil.Ok(t, err)
|
||||||
for _, s := range samples {
|
testutil.Ok(t, seg.Close())
|
||||||
testutil.Assert(t, s.T >= last/2, "sample with wrong timestamp")
|
|
||||||
|
// Manually create checkpoint for 99 and earlier.
|
||||||
|
w, err := wal.New(nil, nil, filepath.Join(dir, "checkpoint.0099"), compress)
|
||||||
|
testutil.Ok(t, err)
|
||||||
|
|
||||||
|
// Add some data we expect to be around later.
|
||||||
|
err = w.Log(enc.Series([]RefSeries{
|
||||||
|
{Ref: 0, Labels: labels.FromStrings("a", "b", "c", "0")},
|
||||||
|
{Ref: 1, Labels: labels.FromStrings("a", "b", "c", "1")},
|
||||||
|
}, nil))
|
||||||
|
testutil.Ok(t, err)
|
||||||
|
testutil.Ok(t, w.Close())
|
||||||
|
|
||||||
|
// Start a WAL and write records to it as usual.
|
||||||
|
w, err = wal.NewSize(nil, nil, dir, 64*1024, compress)
|
||||||
|
testutil.Ok(t, err)
|
||||||
|
|
||||||
|
var last int64
|
||||||
|
for i := 0; ; i++ {
|
||||||
|
_, n, err := w.Segments()
|
||||||
|
testutil.Ok(t, err)
|
||||||
|
if n >= 106 {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
// Write some series initially.
|
||||||
|
if i == 0 {
|
||||||
|
b := enc.Series([]RefSeries{
|
||||||
|
{Ref: 2, Labels: labels.FromStrings("a", "b", "c", "2")},
|
||||||
|
{Ref: 3, Labels: labels.FromStrings("a", "b", "c", "3")},
|
||||||
|
{Ref: 4, Labels: labels.FromStrings("a", "b", "c", "4")},
|
||||||
|
{Ref: 5, Labels: labels.FromStrings("a", "b", "c", "5")},
|
||||||
|
}, nil)
|
||||||
|
testutil.Ok(t, w.Log(b))
|
||||||
|
}
|
||||||
|
// Write samples until the WAL has enough segments.
|
||||||
|
// Make them have drifting timestamps within a record to see that they
|
||||||
|
// get filtered properly.
|
||||||
|
b := enc.Samples([]RefSample{
|
||||||
|
{Ref: 0, T: last, V: float64(i)},
|
||||||
|
{Ref: 1, T: last + 10000, V: float64(i)},
|
||||||
|
{Ref: 2, T: last + 20000, V: float64(i)},
|
||||||
|
{Ref: 3, T: last + 30000, V: float64(i)},
|
||||||
|
}, nil)
|
||||||
|
testutil.Ok(t, w.Log(b))
|
||||||
|
|
||||||
|
last += 100
|
||||||
}
|
}
|
||||||
}
|
testutil.Ok(t, w.Close())
|
||||||
|
|
||||||
|
_, err = Checkpoint(w, 100, 106, func(x uint64) bool {
|
||||||
|
return x%2 == 0
|
||||||
|
}, last/2)
|
||||||
|
testutil.Ok(t, err)
|
||||||
|
testutil.Ok(t, w.Truncate(107))
|
||||||
|
testutil.Ok(t, DeleteCheckpoints(w.Dir(), 106))
|
||||||
|
|
||||||
|
// Only the new checkpoint should be left.
|
||||||
|
files, err := fileutil.ReadDir(dir)
|
||||||
|
testutil.Ok(t, err)
|
||||||
|
testutil.Equals(t, 1, len(files))
|
||||||
|
testutil.Equals(t, "checkpoint.000106", files[0])
|
||||||
|
|
||||||
|
sr, err := wal.NewSegmentsReader(filepath.Join(dir, "checkpoint.000106"))
|
||||||
|
testutil.Ok(t, err)
|
||||||
|
defer sr.Close()
|
||||||
|
|
||||||
|
var dec RecordDecoder
|
||||||
|
var series []RefSeries
|
||||||
|
r := wal.NewReader(sr)
|
||||||
|
|
||||||
|
for r.Next() {
|
||||||
|
rec := r.Record()
|
||||||
|
|
||||||
|
switch dec.Type(rec) {
|
||||||
|
case RecordSeries:
|
||||||
|
series, err = dec.Series(rec, series)
|
||||||
|
testutil.Ok(t, err)
|
||||||
|
case RecordSamples:
|
||||||
|
samples, err := dec.Samples(rec, nil)
|
||||||
|
testutil.Ok(t, err)
|
||||||
|
for _, s := range samples {
|
||||||
|
testutil.Assert(t, s.T >= last/2, "sample with wrong timestamp")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
testutil.Ok(t, r.Err())
|
||||||
|
testutil.Equals(t, []RefSeries{
|
||||||
|
{Ref: 0, Labels: labels.FromStrings("a", "b", "c", "0")},
|
||||||
|
{Ref: 2, Labels: labels.FromStrings("a", "b", "c", "2")},
|
||||||
|
{Ref: 4, Labels: labels.FromStrings("a", "b", "c", "4")},
|
||||||
|
}, series)
|
||||||
|
})
|
||||||
}
|
}
|
||||||
testutil.Ok(t, r.Err())
|
|
||||||
testutil.Equals(t, []RefSeries{
|
|
||||||
{Ref: 0, Labels: labels.FromStrings("a", "b", "c", "0")},
|
|
||||||
{Ref: 2, Labels: labels.FromStrings("a", "b", "c", "2")},
|
|
||||||
{Ref: 4, Labels: labels.FromStrings("a", "b", "c", "4")},
|
|
||||||
}, series)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestCheckpointNoTmpFolderAfterError(t *testing.T) {
|
func TestCheckpointNoTmpFolderAfterError(t *testing.T) {
|
||||||
|
@ -197,7 +201,7 @@ func TestCheckpointNoTmpFolderAfterError(t *testing.T) {
|
||||||
defer func() {
|
defer func() {
|
||||||
testutil.Ok(t, os.RemoveAll(dir))
|
testutil.Ok(t, os.RemoveAll(dir))
|
||||||
}()
|
}()
|
||||||
w, err := wal.NewSize(nil, nil, dir, 64*1024)
|
w, err := wal.NewSize(nil, nil, dir, 64*1024, false)
|
||||||
testutil.Ok(t, err)
|
testutil.Ok(t, err)
|
||||||
testutil.Ok(t, w.Log([]byte{99}))
|
testutil.Ok(t, w.Log([]byte{99}))
|
||||||
w.Close()
|
w.Close()
|
||||||
|
|
6
db.go
6
db.go
|
@ -51,6 +51,7 @@ var DefaultOptions = &Options{
|
||||||
BlockRanges: ExponentialBlockRanges(int64(2*time.Hour)/1e6, 3, 5),
|
BlockRanges: ExponentialBlockRanges(int64(2*time.Hour)/1e6, 3, 5),
|
||||||
NoLockfile: false,
|
NoLockfile: false,
|
||||||
AllowOverlappingBlocks: false,
|
AllowOverlappingBlocks: false,
|
||||||
|
WALCompression: false,
|
||||||
}
|
}
|
||||||
|
|
||||||
// Options of the DB storage.
|
// Options of the DB storage.
|
||||||
|
@ -80,6 +81,9 @@ type Options struct {
|
||||||
// Overlapping blocks are allowed if AllowOverlappingBlocks is true.
|
// Overlapping blocks are allowed if AllowOverlappingBlocks is true.
|
||||||
// This in-turn enables vertical compaction and vertical query merge.
|
// This in-turn enables vertical compaction and vertical query merge.
|
||||||
AllowOverlappingBlocks bool
|
AllowOverlappingBlocks bool
|
||||||
|
|
||||||
|
// WALCompression will turn on Snappy compression for records on the WAL.
|
||||||
|
WALCompression bool
|
||||||
}
|
}
|
||||||
|
|
||||||
// Appender allows appending a batch of data. It must be completed with a
|
// Appender allows appending a batch of data. It must be completed with a
|
||||||
|
@ -306,7 +310,7 @@ func Open(dir string, l log.Logger, r prometheus.Registerer, opts *Options) (db
|
||||||
if opts.WALSegmentSize > 0 {
|
if opts.WALSegmentSize > 0 {
|
||||||
segmentSize = opts.WALSegmentSize
|
segmentSize = opts.WALSegmentSize
|
||||||
}
|
}
|
||||||
wlog, err = wal.NewSize(l, r, filepath.Join(dir, "wal"), segmentSize)
|
wlog, err = wal.NewSize(l, r, filepath.Join(dir, "wal"), segmentSize, opts.WALCompression)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
|
@ -1404,7 +1404,7 @@ func TestInitializeHeadTimestamp(t *testing.T) {
|
||||||
}()
|
}()
|
||||||
|
|
||||||
testutil.Ok(t, os.MkdirAll(path.Join(dir, "wal"), 0777))
|
testutil.Ok(t, os.MkdirAll(path.Join(dir, "wal"), 0777))
|
||||||
w, err := wal.New(nil, nil, path.Join(dir, "wal"))
|
w, err := wal.New(nil, nil, path.Join(dir, "wal"), false)
|
||||||
testutil.Ok(t, err)
|
testutil.Ok(t, err)
|
||||||
|
|
||||||
var enc RecordEncoder
|
var enc RecordEncoder
|
||||||
|
@ -1454,7 +1454,7 @@ func TestInitializeHeadTimestamp(t *testing.T) {
|
||||||
createBlock(t, dir, genSeries(1, 1, 1000, 6000))
|
createBlock(t, dir, genSeries(1, 1, 1000, 6000))
|
||||||
|
|
||||||
testutil.Ok(t, os.MkdirAll(path.Join(dir, "wal"), 0777))
|
testutil.Ok(t, os.MkdirAll(path.Join(dir, "wal"), 0777))
|
||||||
w, err := wal.New(nil, nil, path.Join(dir, "wal"))
|
w, err := wal.New(nil, nil, path.Join(dir, "wal"), false)
|
||||||
testutil.Ok(t, err)
|
testutil.Ok(t, err)
|
||||||
|
|
||||||
var enc RecordEncoder
|
var enc RecordEncoder
|
||||||
|
|
1
go.mod
1
go.mod
|
@ -4,6 +4,7 @@ require (
|
||||||
github.com/cespare/xxhash v1.1.0
|
github.com/cespare/xxhash v1.1.0
|
||||||
github.com/dgryski/go-sip13 v0.0.0-20181026042036-e10d5fee7954
|
github.com/dgryski/go-sip13 v0.0.0-20181026042036-e10d5fee7954
|
||||||
github.com/go-kit/kit v0.8.0
|
github.com/go-kit/kit v0.8.0
|
||||||
|
github.com/golang/snappy v0.0.1
|
||||||
github.com/oklog/ulid v1.3.1
|
github.com/oklog/ulid v1.3.1
|
||||||
github.com/pkg/errors v0.8.0
|
github.com/pkg/errors v0.8.0
|
||||||
github.com/prometheus/client_golang v1.0.0
|
github.com/prometheus/client_golang v1.0.0
|
||||||
|
|
2
go.sum
2
go.sum
|
@ -30,6 +30,8 @@ github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5y
|
||||||
github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU=
|
github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU=
|
||||||
github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w=
|
github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w=
|
||||||
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
|
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
|
||||||
|
github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4=
|
||||||
|
github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
|
||||||
github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515 h1:T+h1c/A9Gawja4Y9mFVWj2vyii2bbUNDw3kt9VxK2EY=
|
github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515 h1:T+h1c/A9Gawja4Y9mFVWj2vyii2bbUNDw3kt9VxK2EY=
|
||||||
github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc=
|
github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc=
|
||||||
github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU=
|
github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU=
|
||||||
|
|
592
head_test.go
592
head_test.go
|
@ -14,6 +14,7 @@
|
||||||
package tsdb
|
package tsdb
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"fmt"
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"math"
|
"math"
|
||||||
"math/rand"
|
"math/rand"
|
||||||
|
@ -96,70 +97,73 @@ func readTestWAL(t testing.TB, dir string) (recs []interface{}) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestHead_ReadWAL(t *testing.T) {
|
func TestHead_ReadWAL(t *testing.T) {
|
||||||
entries := []interface{}{
|
for _, compress := range []bool{false, true} {
|
||||||
[]RefSeries{
|
t.Run(fmt.Sprintf("compress=%t", compress), func(t *testing.T) {
|
||||||
{Ref: 10, Labels: labels.FromStrings("a", "1")},
|
entries := []interface{}{
|
||||||
{Ref: 11, Labels: labels.FromStrings("a", "2")},
|
[]RefSeries{
|
||||||
{Ref: 100, Labels: labels.FromStrings("a", "3")},
|
{Ref: 10, Labels: labels.FromStrings("a", "1")},
|
||||||
},
|
{Ref: 11, Labels: labels.FromStrings("a", "2")},
|
||||||
[]RefSample{
|
{Ref: 100, Labels: labels.FromStrings("a", "3")},
|
||||||
{Ref: 0, T: 99, V: 1},
|
},
|
||||||
{Ref: 10, T: 100, V: 2},
|
[]RefSample{
|
||||||
{Ref: 100, T: 100, V: 3},
|
{Ref: 0, T: 99, V: 1},
|
||||||
},
|
{Ref: 10, T: 100, V: 2},
|
||||||
[]RefSeries{
|
{Ref: 100, T: 100, V: 3},
|
||||||
{Ref: 50, Labels: labels.FromStrings("a", "4")},
|
},
|
||||||
// This series has two refs pointing to it.
|
[]RefSeries{
|
||||||
{Ref: 101, Labels: labels.FromStrings("a", "3")},
|
{Ref: 50, Labels: labels.FromStrings("a", "4")},
|
||||||
},
|
// This series has two refs pointing to it.
|
||||||
[]RefSample{
|
{Ref: 101, Labels: labels.FromStrings("a", "3")},
|
||||||
{Ref: 10, T: 101, V: 5},
|
},
|
||||||
{Ref: 50, T: 101, V: 6},
|
[]RefSample{
|
||||||
{Ref: 101, T: 101, V: 7},
|
{Ref: 10, T: 101, V: 5},
|
||||||
},
|
{Ref: 50, T: 101, V: 6},
|
||||||
[]Stone{
|
{Ref: 101, T: 101, V: 7},
|
||||||
{ref: 0, intervals: []Interval{{Mint: 99, Maxt: 101}}},
|
},
|
||||||
},
|
[]Stone{
|
||||||
|
{ref: 0, intervals: []Interval{{Mint: 99, Maxt: 101}}},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
dir, err := ioutil.TempDir("", "test_read_wal")
|
||||||
|
testutil.Ok(t, err)
|
||||||
|
defer func() {
|
||||||
|
testutil.Ok(t, os.RemoveAll(dir))
|
||||||
|
}()
|
||||||
|
|
||||||
|
w, err := wal.New(nil, nil, dir, compress)
|
||||||
|
testutil.Ok(t, err)
|
||||||
|
defer w.Close()
|
||||||
|
populateTestWAL(t, w, entries)
|
||||||
|
|
||||||
|
head, err := NewHead(nil, nil, w, 1000)
|
||||||
|
testutil.Ok(t, err)
|
||||||
|
|
||||||
|
testutil.Ok(t, head.Init(math.MinInt64))
|
||||||
|
testutil.Equals(t, uint64(101), head.lastSeriesID)
|
||||||
|
|
||||||
|
s10 := head.series.getByID(10)
|
||||||
|
s11 := head.series.getByID(11)
|
||||||
|
s50 := head.series.getByID(50)
|
||||||
|
s100 := head.series.getByID(100)
|
||||||
|
|
||||||
|
testutil.Equals(t, labels.FromStrings("a", "1"), s10.lset)
|
||||||
|
testutil.Equals(t, (*memSeries)(nil), s11) // Series without samples should be garbage colected at head.Init().
|
||||||
|
testutil.Equals(t, labels.FromStrings("a", "4"), s50.lset)
|
||||||
|
testutil.Equals(t, labels.FromStrings("a", "3"), s100.lset)
|
||||||
|
|
||||||
|
expandChunk := func(c chunkenc.Iterator) (x []sample) {
|
||||||
|
for c.Next() {
|
||||||
|
t, v := c.At()
|
||||||
|
x = append(x, sample{t: t, v: v})
|
||||||
|
}
|
||||||
|
testutil.Ok(t, c.Err())
|
||||||
|
return x
|
||||||
|
}
|
||||||
|
testutil.Equals(t, []sample{{100, 2}, {101, 5}}, expandChunk(s10.iterator(0)))
|
||||||
|
testutil.Equals(t, []sample{{101, 6}}, expandChunk(s50.iterator(0)))
|
||||||
|
testutil.Equals(t, []sample{{100, 3}, {101, 7}}, expandChunk(s100.iterator(0)))
|
||||||
|
})
|
||||||
}
|
}
|
||||||
dir, err := ioutil.TempDir("", "test_read_wal")
|
|
||||||
testutil.Ok(t, err)
|
|
||||||
defer func() {
|
|
||||||
testutil.Ok(t, os.RemoveAll(dir))
|
|
||||||
}()
|
|
||||||
|
|
||||||
w, err := wal.New(nil, nil, dir)
|
|
||||||
testutil.Ok(t, err)
|
|
||||||
defer w.Close()
|
|
||||||
populateTestWAL(t, w, entries)
|
|
||||||
|
|
||||||
head, err := NewHead(nil, nil, w, 1000)
|
|
||||||
testutil.Ok(t, err)
|
|
||||||
|
|
||||||
testutil.Ok(t, head.Init(math.MinInt64))
|
|
||||||
testutil.Equals(t, uint64(101), head.lastSeriesID)
|
|
||||||
|
|
||||||
s10 := head.series.getByID(10)
|
|
||||||
s11 := head.series.getByID(11)
|
|
||||||
s50 := head.series.getByID(50)
|
|
||||||
s100 := head.series.getByID(100)
|
|
||||||
|
|
||||||
testutil.Equals(t, labels.FromStrings("a", "1"), s10.lset)
|
|
||||||
testutil.Equals(t, (*memSeries)(nil), s11) // Series without samples should be garbage colected at head.Init().
|
|
||||||
testutil.Equals(t, labels.FromStrings("a", "4"), s50.lset)
|
|
||||||
testutil.Equals(t, labels.FromStrings("a", "3"), s100.lset)
|
|
||||||
|
|
||||||
expandChunk := func(c chunkenc.Iterator) (x []sample) {
|
|
||||||
for c.Next() {
|
|
||||||
t, v := c.At()
|
|
||||||
x = append(x, sample{t: t, v: v})
|
|
||||||
}
|
|
||||||
testutil.Ok(t, c.Err())
|
|
||||||
return x
|
|
||||||
}
|
|
||||||
|
|
||||||
testutil.Equals(t, []sample{{100, 2}, {101, 5}}, expandChunk(s10.iterator(0)))
|
|
||||||
testutil.Equals(t, []sample{{101, 6}}, expandChunk(s50.iterator(0)))
|
|
||||||
testutil.Equals(t, []sample{{100, 3}, {101, 7}}, expandChunk(s100.iterator(0)))
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestHead_WALMultiRef(t *testing.T) {
|
func TestHead_WALMultiRef(t *testing.T) {
|
||||||
|
@ -169,7 +173,7 @@ func TestHead_WALMultiRef(t *testing.T) {
|
||||||
testutil.Ok(t, os.RemoveAll(dir))
|
testutil.Ok(t, os.RemoveAll(dir))
|
||||||
}()
|
}()
|
||||||
|
|
||||||
w, err := wal.New(nil, nil, dir)
|
w, err := wal.New(nil, nil, dir, false)
|
||||||
testutil.Ok(t, err)
|
testutil.Ok(t, err)
|
||||||
|
|
||||||
head, err := NewHead(nil, nil, w, 1000)
|
head, err := NewHead(nil, nil, w, 1000)
|
||||||
|
@ -193,7 +197,7 @@ func TestHead_WALMultiRef(t *testing.T) {
|
||||||
}
|
}
|
||||||
testutil.Ok(t, head.Close())
|
testutil.Ok(t, head.Close())
|
||||||
|
|
||||||
w, err = wal.New(nil, nil, dir)
|
w, err = wal.New(nil, nil, dir, false)
|
||||||
testutil.Ok(t, err)
|
testutil.Ok(t, err)
|
||||||
|
|
||||||
head, err = NewHead(nil, nil, w, 1000)
|
head, err = NewHead(nil, nil, w, 1000)
|
||||||
|
@ -319,36 +323,40 @@ func TestMemSeries_truncateChunks(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestHeadDeleteSeriesWithoutSamples(t *testing.T) {
|
func TestHeadDeleteSeriesWithoutSamples(t *testing.T) {
|
||||||
entries := []interface{}{
|
for _, compress := range []bool{false, true} {
|
||||||
[]RefSeries{
|
t.Run(fmt.Sprintf("compress=%t", compress), func(t *testing.T) {
|
||||||
{Ref: 10, Labels: labels.FromStrings("a", "1")},
|
entries := []interface{}{
|
||||||
},
|
[]RefSeries{
|
||||||
[]RefSample{},
|
{Ref: 10, Labels: labels.FromStrings("a", "1")},
|
||||||
[]RefSeries{
|
},
|
||||||
{Ref: 50, Labels: labels.FromStrings("a", "2")},
|
[]RefSample{},
|
||||||
},
|
[]RefSeries{
|
||||||
[]RefSample{
|
{Ref: 50, Labels: labels.FromStrings("a", "2")},
|
||||||
{Ref: 50, T: 80, V: 1},
|
},
|
||||||
{Ref: 50, T: 90, V: 1},
|
[]RefSample{
|
||||||
},
|
{Ref: 50, T: 80, V: 1},
|
||||||
|
{Ref: 50, T: 90, V: 1},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
dir, err := ioutil.TempDir("", "test_delete_series")
|
||||||
|
testutil.Ok(t, err)
|
||||||
|
defer func() {
|
||||||
|
testutil.Ok(t, os.RemoveAll(dir))
|
||||||
|
}()
|
||||||
|
|
||||||
|
w, err := wal.New(nil, nil, dir, compress)
|
||||||
|
testutil.Ok(t, err)
|
||||||
|
defer w.Close()
|
||||||
|
populateTestWAL(t, w, entries)
|
||||||
|
|
||||||
|
head, err := NewHead(nil, nil, w, 1000)
|
||||||
|
testutil.Ok(t, err)
|
||||||
|
|
||||||
|
testutil.Ok(t, head.Init(math.MinInt64))
|
||||||
|
|
||||||
|
testutil.Ok(t, head.Delete(0, 100, labels.NewEqualMatcher("a", "1")))
|
||||||
|
})
|
||||||
}
|
}
|
||||||
dir, err := ioutil.TempDir("", "test_delete_series")
|
|
||||||
testutil.Ok(t, err)
|
|
||||||
defer func() {
|
|
||||||
testutil.Ok(t, os.RemoveAll(dir))
|
|
||||||
}()
|
|
||||||
|
|
||||||
w, err := wal.New(nil, nil, dir)
|
|
||||||
testutil.Ok(t, err)
|
|
||||||
defer w.Close()
|
|
||||||
populateTestWAL(t, w, entries)
|
|
||||||
|
|
||||||
head, err := NewHead(nil, nil, w, 1000)
|
|
||||||
testutil.Ok(t, err)
|
|
||||||
|
|
||||||
testutil.Ok(t, head.Init(math.MinInt64))
|
|
||||||
|
|
||||||
testutil.Ok(t, head.Delete(0, 100, labels.NewEqualMatcher("a", "1")))
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestHeadDeleteSimple(t *testing.T) {
|
func TestHeadDeleteSimple(t *testing.T) {
|
||||||
|
@ -388,129 +396,133 @@ func TestHeadDeleteSimple(t *testing.T) {
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
Outer:
|
for _, compress := range []bool{false, true} {
|
||||||
for _, c := range cases {
|
t.Run(fmt.Sprintf("compress=%t", compress), func(t *testing.T) {
|
||||||
dir, err := ioutil.TempDir("", "test_wal_reload")
|
Outer:
|
||||||
testutil.Ok(t, err)
|
for _, c := range cases {
|
||||||
defer func() {
|
dir, err := ioutil.TempDir("", "test_wal_reload")
|
||||||
testutil.Ok(t, os.RemoveAll(dir))
|
|
||||||
}()
|
|
||||||
|
|
||||||
w, err := wal.New(nil, nil, path.Join(dir, "wal"))
|
|
||||||
testutil.Ok(t, err)
|
|
||||||
defer w.Close()
|
|
||||||
|
|
||||||
head, err := NewHead(nil, nil, w, 1000)
|
|
||||||
testutil.Ok(t, err)
|
|
||||||
defer head.Close()
|
|
||||||
|
|
||||||
app := head.Appender()
|
|
||||||
for _, smpl := range smplsAll {
|
|
||||||
_, err = app.Add(labels.Labels{lblDefault}, smpl.t, smpl.v)
|
|
||||||
testutil.Ok(t, err)
|
|
||||||
|
|
||||||
}
|
|
||||||
testutil.Ok(t, app.Commit())
|
|
||||||
|
|
||||||
// Delete the ranges.
|
|
||||||
for _, r := range c.dranges {
|
|
||||||
testutil.Ok(t, head.Delete(r.Mint, r.Maxt, labels.NewEqualMatcher(lblDefault.Name, lblDefault.Value)))
|
|
||||||
}
|
|
||||||
|
|
||||||
// Compare the samples for both heads - before and after the reload.
|
|
||||||
reloadedW, err := wal.New(nil, nil, w.Dir()) // Use a new wal to ensure deleted samples are gone even after a reload.
|
|
||||||
testutil.Ok(t, err)
|
|
||||||
defer reloadedW.Close()
|
|
||||||
reloadedHead, err := NewHead(nil, nil, reloadedW, 1000)
|
|
||||||
testutil.Ok(t, err)
|
|
||||||
defer reloadedHead.Close()
|
|
||||||
testutil.Ok(t, reloadedHead.Init(0))
|
|
||||||
for _, h := range []*Head{head, reloadedHead} {
|
|
||||||
indexr, err := h.Index()
|
|
||||||
testutil.Ok(t, err)
|
|
||||||
// Use an emptyTombstoneReader explicitly to get all the samples.
|
|
||||||
css, err := LookupChunkSeries(indexr, emptyTombstoneReader, labels.NewEqualMatcher(lblDefault.Name, lblDefault.Value))
|
|
||||||
testutil.Ok(t, err)
|
|
||||||
|
|
||||||
// Getting the actual samples.
|
|
||||||
actSamples := make([]sample, 0)
|
|
||||||
for css.Next() {
|
|
||||||
lblsAct, chkMetas, intv := css.At()
|
|
||||||
testutil.Equals(t, labels.Labels{lblDefault}, lblsAct)
|
|
||||||
testutil.Equals(t, 0, len(intv))
|
|
||||||
|
|
||||||
chunkr, err := h.Chunks()
|
|
||||||
testutil.Ok(t, err)
|
testutil.Ok(t, err)
|
||||||
for _, meta := range chkMetas {
|
defer func() {
|
||||||
chk, err := chunkr.Chunk(meta.Ref)
|
testutil.Ok(t, os.RemoveAll(dir))
|
||||||
|
}()
|
||||||
|
|
||||||
|
w, err := wal.New(nil, nil, path.Join(dir, "wal"), compress)
|
||||||
|
testutil.Ok(t, err)
|
||||||
|
defer w.Close()
|
||||||
|
|
||||||
|
head, err := NewHead(nil, nil, w, 1000)
|
||||||
|
testutil.Ok(t, err)
|
||||||
|
defer head.Close()
|
||||||
|
|
||||||
|
app := head.Appender()
|
||||||
|
for _, smpl := range smplsAll {
|
||||||
|
_, err = app.Add(labels.Labels{lblDefault}, smpl.t, smpl.v)
|
||||||
testutil.Ok(t, err)
|
testutil.Ok(t, err)
|
||||||
ii := chk.Iterator()
|
|
||||||
for ii.Next() {
|
}
|
||||||
t, v := ii.At()
|
testutil.Ok(t, app.Commit())
|
||||||
actSamples = append(actSamples, sample{t: t, v: v})
|
|
||||||
|
// Delete the ranges.
|
||||||
|
for _, r := range c.dranges {
|
||||||
|
testutil.Ok(t, head.Delete(r.Mint, r.Maxt, labels.NewEqualMatcher(lblDefault.Name, lblDefault.Value)))
|
||||||
|
}
|
||||||
|
|
||||||
|
// Compare the samples for both heads - before and after the reload.
|
||||||
|
reloadedW, err := wal.New(nil, nil, w.Dir(), compress) // Use a new wal to ensure deleted samples are gone even after a reload.
|
||||||
|
testutil.Ok(t, err)
|
||||||
|
defer reloadedW.Close()
|
||||||
|
reloadedHead, err := NewHead(nil, nil, reloadedW, 1000)
|
||||||
|
testutil.Ok(t, err)
|
||||||
|
defer reloadedHead.Close()
|
||||||
|
testutil.Ok(t, reloadedHead.Init(0))
|
||||||
|
for _, h := range []*Head{head, reloadedHead} {
|
||||||
|
indexr, err := h.Index()
|
||||||
|
testutil.Ok(t, err)
|
||||||
|
// Use an emptyTombstoneReader explicitly to get all the samples.
|
||||||
|
css, err := LookupChunkSeries(indexr, emptyTombstoneReader, labels.NewEqualMatcher(lblDefault.Name, lblDefault.Value))
|
||||||
|
testutil.Ok(t, err)
|
||||||
|
|
||||||
|
// Getting the actual samples.
|
||||||
|
actSamples := make([]sample, 0)
|
||||||
|
for css.Next() {
|
||||||
|
lblsAct, chkMetas, intv := css.At()
|
||||||
|
testutil.Equals(t, labels.Labels{lblDefault}, lblsAct)
|
||||||
|
testutil.Equals(t, 0, len(intv))
|
||||||
|
|
||||||
|
chunkr, err := h.Chunks()
|
||||||
|
testutil.Ok(t, err)
|
||||||
|
for _, meta := range chkMetas {
|
||||||
|
chk, err := chunkr.Chunk(meta.Ref)
|
||||||
|
testutil.Ok(t, err)
|
||||||
|
ii := chk.Iterator()
|
||||||
|
for ii.Next() {
|
||||||
|
t, v := ii.At()
|
||||||
|
actSamples = append(actSamples, sample{t: t, v: v})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
testutil.Ok(t, css.Err())
|
||||||
|
testutil.Equals(t, c.smplsExp, actSamples)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Compare the query results for both heads - before and after the reload.
|
||||||
|
expSeriesSet := newMockSeriesSet([]Series{
|
||||||
|
newSeries(map[string]string{lblDefault.Name: lblDefault.Value}, func() []tsdbutil.Sample {
|
||||||
|
ss := make([]tsdbutil.Sample, 0, len(c.smplsExp))
|
||||||
|
for _, s := range c.smplsExp {
|
||||||
|
ss = append(ss, s)
|
||||||
|
}
|
||||||
|
return ss
|
||||||
|
}(),
|
||||||
|
),
|
||||||
|
})
|
||||||
|
for _, h := range []*Head{head, reloadedHead} {
|
||||||
|
q, err := NewBlockQuerier(h, h.MinTime(), h.MaxTime())
|
||||||
|
testutil.Ok(t, err)
|
||||||
|
actSeriesSet, err := q.Select(labels.NewEqualMatcher(lblDefault.Name, lblDefault.Value))
|
||||||
|
testutil.Ok(t, err)
|
||||||
|
|
||||||
|
lns, err := q.LabelNames()
|
||||||
|
testutil.Ok(t, err)
|
||||||
|
lvs, err := q.LabelValues(lblDefault.Name)
|
||||||
|
testutil.Ok(t, err)
|
||||||
|
// When all samples are deleted we expect that no labels should exist either.
|
||||||
|
if len(c.smplsExp) == 0 {
|
||||||
|
testutil.Equals(t, 0, len(lns))
|
||||||
|
testutil.Equals(t, 0, len(lvs))
|
||||||
|
testutil.Assert(t, actSeriesSet.Next() == false, "")
|
||||||
|
testutil.Ok(t, h.Close())
|
||||||
|
continue
|
||||||
|
} else {
|
||||||
|
testutil.Equals(t, 1, len(lns))
|
||||||
|
testutil.Equals(t, 1, len(lvs))
|
||||||
|
testutil.Equals(t, lblDefault.Name, lns[0])
|
||||||
|
testutil.Equals(t, lblDefault.Value, lvs[0])
|
||||||
|
}
|
||||||
|
|
||||||
|
for {
|
||||||
|
eok, rok := expSeriesSet.Next(), actSeriesSet.Next()
|
||||||
|
testutil.Equals(t, eok, rok)
|
||||||
|
|
||||||
|
if !eok {
|
||||||
|
testutil.Ok(t, h.Close())
|
||||||
|
continue Outer
|
||||||
|
}
|
||||||
|
expSeries := expSeriesSet.At()
|
||||||
|
actSeries := actSeriesSet.At()
|
||||||
|
|
||||||
|
testutil.Equals(t, expSeries.Labels(), actSeries.Labels())
|
||||||
|
|
||||||
|
smplExp, errExp := expandSeriesIterator(expSeries.Iterator())
|
||||||
|
smplRes, errRes := expandSeriesIterator(actSeries.Iterator())
|
||||||
|
|
||||||
|
testutil.Equals(t, errExp, errRes)
|
||||||
|
testutil.Equals(t, smplExp, smplRes)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
testutil.Ok(t, css.Err())
|
|
||||||
testutil.Equals(t, c.smplsExp, actSamples)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Compare the query results for both heads - before and after the reload.
|
|
||||||
expSeriesSet := newMockSeriesSet([]Series{
|
|
||||||
newSeries(map[string]string{lblDefault.Name: lblDefault.Value}, func() []tsdbutil.Sample {
|
|
||||||
ss := make([]tsdbutil.Sample, 0, len(c.smplsExp))
|
|
||||||
for _, s := range c.smplsExp {
|
|
||||||
ss = append(ss, s)
|
|
||||||
}
|
|
||||||
return ss
|
|
||||||
}(),
|
|
||||||
),
|
|
||||||
})
|
})
|
||||||
for _, h := range []*Head{head, reloadedHead} {
|
|
||||||
q, err := NewBlockQuerier(h, h.MinTime(), h.MaxTime())
|
|
||||||
testutil.Ok(t, err)
|
|
||||||
actSeriesSet, err := q.Select(labels.NewEqualMatcher(lblDefault.Name, lblDefault.Value))
|
|
||||||
testutil.Ok(t, err)
|
|
||||||
|
|
||||||
lns, err := q.LabelNames()
|
|
||||||
testutil.Ok(t, err)
|
|
||||||
lvs, err := q.LabelValues(lblDefault.Name)
|
|
||||||
testutil.Ok(t, err)
|
|
||||||
// When all samples are deleted we expect that no labels should exist either.
|
|
||||||
if len(c.smplsExp) == 0 {
|
|
||||||
testutil.Equals(t, 0, len(lns))
|
|
||||||
testutil.Equals(t, 0, len(lvs))
|
|
||||||
testutil.Assert(t, actSeriesSet.Next() == false, "")
|
|
||||||
testutil.Ok(t, h.Close())
|
|
||||||
continue
|
|
||||||
} else {
|
|
||||||
testutil.Equals(t, 1, len(lns))
|
|
||||||
testutil.Equals(t, 1, len(lvs))
|
|
||||||
testutil.Equals(t, lblDefault.Name, lns[0])
|
|
||||||
testutil.Equals(t, lblDefault.Value, lvs[0])
|
|
||||||
}
|
|
||||||
|
|
||||||
for {
|
|
||||||
eok, rok := expSeriesSet.Next(), actSeriesSet.Next()
|
|
||||||
testutil.Equals(t, eok, rok)
|
|
||||||
|
|
||||||
if !eok {
|
|
||||||
testutil.Ok(t, h.Close())
|
|
||||||
continue Outer
|
|
||||||
}
|
|
||||||
expSeries := expSeriesSet.At()
|
|
||||||
actSeries := actSeriesSet.At()
|
|
||||||
|
|
||||||
testutil.Equals(t, expSeries.Labels(), actSeries.Labels())
|
|
||||||
|
|
||||||
smplExp, errExp := expandSeriesIterator(expSeries.Iterator())
|
|
||||||
smplRes, errRes := expandSeriesIterator(actSeries.Iterator())
|
|
||||||
|
|
||||||
testutil.Equals(t, errExp, errRes)
|
|
||||||
testutil.Equals(t, smplExp, smplRes)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -559,7 +571,7 @@ func TestDeletedSamplesAndSeriesStillInWALAfterCheckpoint(t *testing.T) {
|
||||||
defer func() {
|
defer func() {
|
||||||
testutil.Ok(t, os.RemoveAll(dir))
|
testutil.Ok(t, os.RemoveAll(dir))
|
||||||
}()
|
}()
|
||||||
wlog, err := wal.NewSize(nil, nil, dir, 32768)
|
wlog, err := wal.NewSize(nil, nil, dir, 32768, false)
|
||||||
testutil.Ok(t, err)
|
testutil.Ok(t, err)
|
||||||
|
|
||||||
// Enough samples to cause a checkpoint.
|
// Enough samples to cause a checkpoint.
|
||||||
|
@ -1019,30 +1031,34 @@ func TestRemoveSeriesAfterRollbackAndTruncate(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestHead_LogRollback(t *testing.T) {
|
func TestHead_LogRollback(t *testing.T) {
|
||||||
dir, err := ioutil.TempDir("", "wal_rollback")
|
for _, compress := range []bool{false, true} {
|
||||||
testutil.Ok(t, err)
|
t.Run(fmt.Sprintf("compress=%t", compress), func(t *testing.T) {
|
||||||
defer func() {
|
dir, err := ioutil.TempDir("", "wal_rollback")
|
||||||
testutil.Ok(t, os.RemoveAll(dir))
|
testutil.Ok(t, err)
|
||||||
}()
|
defer func() {
|
||||||
|
testutil.Ok(t, os.RemoveAll(dir))
|
||||||
|
}()
|
||||||
|
|
||||||
w, err := wal.New(nil, nil, dir)
|
w, err := wal.New(nil, nil, dir, compress)
|
||||||
testutil.Ok(t, err)
|
testutil.Ok(t, err)
|
||||||
defer w.Close()
|
defer w.Close()
|
||||||
h, err := NewHead(nil, nil, w, 1000)
|
h, err := NewHead(nil, nil, w, 1000)
|
||||||
testutil.Ok(t, err)
|
testutil.Ok(t, err)
|
||||||
|
|
||||||
app := h.Appender()
|
app := h.Appender()
|
||||||
_, err = app.Add(labels.FromStrings("a", "b"), 1, 2)
|
_, err = app.Add(labels.FromStrings("a", "b"), 1, 2)
|
||||||
testutil.Ok(t, err)
|
testutil.Ok(t, err)
|
||||||
|
|
||||||
testutil.Ok(t, app.Rollback())
|
testutil.Ok(t, app.Rollback())
|
||||||
recs := readTestWAL(t, w.Dir())
|
recs := readTestWAL(t, w.Dir())
|
||||||
|
|
||||||
testutil.Equals(t, 1, len(recs))
|
testutil.Equals(t, 1, len(recs))
|
||||||
|
|
||||||
series, ok := recs[0].([]RefSeries)
|
series, ok := recs[0].([]RefSeries)
|
||||||
testutil.Assert(t, ok, "expected series record but got %+v", recs[0])
|
testutil.Assert(t, ok, "expected series record but got %+v", recs[0])
|
||||||
testutil.Equals(t, []RefSeries{{Ref: 1, Labels: labels.FromStrings("a", "b")}}, series)
|
testutil.Equals(t, []RefSeries{{Ref: 1, Labels: labels.FromStrings("a", "b")}}, series)
|
||||||
|
})
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// TestWalRepair_DecodingError ensures that a repair is run for an error
|
// TestWalRepair_DecodingError ensures that a repair is run for an error
|
||||||
|
@ -1057,8 +1073,11 @@ func TestWalRepair_DecodingError(t *testing.T) {
|
||||||
}{
|
}{
|
||||||
"invalid_record": {
|
"invalid_record": {
|
||||||
func(rec []byte) []byte {
|
func(rec []byte) []byte {
|
||||||
rec[0] = byte(RecordInvalid)
|
// Do not modify the base record because it is Logged multiple times.
|
||||||
return rec
|
res := make([]byte, len(rec))
|
||||||
|
copy(res, rec)
|
||||||
|
res[0] = byte(RecordInvalid)
|
||||||
|
return res
|
||||||
},
|
},
|
||||||
enc.Series([]RefSeries{{Ref: 1, Labels: labels.FromStrings("a", "b")}}, []byte{}),
|
enc.Series([]RefSeries{{Ref: 1, Labels: labels.FromStrings("a", "b")}}, []byte{}),
|
||||||
9,
|
9,
|
||||||
|
@ -1089,65 +1108,66 @@ func TestWalRepair_DecodingError(t *testing.T) {
|
||||||
5,
|
5,
|
||||||
},
|
},
|
||||||
} {
|
} {
|
||||||
t.Run(name, func(t *testing.T) {
|
for _, compress := range []bool{false, true} {
|
||||||
dir, err := ioutil.TempDir("", "wal_repair")
|
t.Run(fmt.Sprintf("%s,compress=%t", name, compress), func(t *testing.T) {
|
||||||
testutil.Ok(t, err)
|
dir, err := ioutil.TempDir("", "wal_repair")
|
||||||
defer func() {
|
|
||||||
testutil.Ok(t, os.RemoveAll(dir))
|
|
||||||
}()
|
|
||||||
|
|
||||||
// Fill the wal and corrupt it.
|
|
||||||
{
|
|
||||||
w, err := wal.New(nil, nil, filepath.Join(dir, "wal"))
|
|
||||||
testutil.Ok(t, err)
|
|
||||||
|
|
||||||
for i := 1; i <= test.totalRecs; i++ {
|
|
||||||
// At this point insert a corrupted record.
|
|
||||||
if i-1 == test.expRecs {
|
|
||||||
testutil.Ok(t, w.Log(test.corrFunc(test.rec)))
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
testutil.Ok(t, w.Log(test.rec))
|
|
||||||
}
|
|
||||||
|
|
||||||
h, err := NewHead(nil, nil, w, 1)
|
|
||||||
testutil.Ok(t, err)
|
|
||||||
testutil.Equals(t, 0.0, prom_testutil.ToFloat64(h.metrics.walCorruptionsTotal))
|
|
||||||
initErr := h.Init(math.MinInt64)
|
|
||||||
|
|
||||||
err = errors.Cause(initErr) // So that we can pick up errors even if wrapped.
|
|
||||||
_, corrErr := err.(*wal.CorruptionErr)
|
|
||||||
testutil.Assert(t, corrErr, "reading the wal didn't return corruption error")
|
|
||||||
testutil.Ok(t, w.Close())
|
|
||||||
}
|
|
||||||
|
|
||||||
// Open the db to trigger a repair.
|
|
||||||
{
|
|
||||||
db, err := Open(dir, nil, nil, DefaultOptions)
|
|
||||||
testutil.Ok(t, err)
|
testutil.Ok(t, err)
|
||||||
defer func() {
|
defer func() {
|
||||||
testutil.Ok(t, db.Close())
|
testutil.Ok(t, os.RemoveAll(dir))
|
||||||
}()
|
}()
|
||||||
testutil.Equals(t, 1.0, prom_testutil.ToFloat64(db.head.metrics.walCorruptionsTotal))
|
|
||||||
}
|
|
||||||
|
|
||||||
// Read the wal content after the repair.
|
// Fill the wal and corrupt it.
|
||||||
{
|
{
|
||||||
sr, err := wal.NewSegmentsReader(filepath.Join(dir, "wal"))
|
w, err := wal.New(nil, nil, filepath.Join(dir, "wal"), compress)
|
||||||
testutil.Ok(t, err)
|
testutil.Ok(t, err)
|
||||||
defer sr.Close()
|
|
||||||
r := wal.NewReader(sr)
|
|
||||||
|
|
||||||
var actRec int
|
for i := 1; i <= test.totalRecs; i++ {
|
||||||
for r.Next() {
|
// At this point insert a corrupted record.
|
||||||
actRec++
|
if i-1 == test.expRecs {
|
||||||
|
testutil.Ok(t, w.Log(test.corrFunc(test.rec)))
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
testutil.Ok(t, w.Log(test.rec))
|
||||||
|
}
|
||||||
|
|
||||||
|
h, err := NewHead(nil, nil, w, 1)
|
||||||
|
testutil.Ok(t, err)
|
||||||
|
testutil.Equals(t, 0.0, prom_testutil.ToFloat64(h.metrics.walCorruptionsTotal))
|
||||||
|
initErr := h.Init(math.MinInt64)
|
||||||
|
|
||||||
|
err = errors.Cause(initErr) // So that we can pick up errors even if wrapped.
|
||||||
|
_, corrErr := err.(*wal.CorruptionErr)
|
||||||
|
testutil.Assert(t, corrErr, "reading the wal didn't return corruption error")
|
||||||
|
testutil.Ok(t, w.Close())
|
||||||
}
|
}
|
||||||
testutil.Ok(t, r.Err())
|
|
||||||
testutil.Equals(t, test.expRecs, actRec, "Wrong number of intact records")
|
|
||||||
}
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
|
// Open the db to trigger a repair.
|
||||||
|
{
|
||||||
|
db, err := Open(dir, nil, nil, DefaultOptions)
|
||||||
|
testutil.Ok(t, err)
|
||||||
|
defer func() {
|
||||||
|
testutil.Ok(t, db.Close())
|
||||||
|
}()
|
||||||
|
testutil.Equals(t, 1.0, prom_testutil.ToFloat64(db.head.metrics.walCorruptionsTotal))
|
||||||
|
}
|
||||||
|
|
||||||
|
// Read the wal content after the repair.
|
||||||
|
{
|
||||||
|
sr, err := wal.NewSegmentsReader(filepath.Join(dir, "wal"))
|
||||||
|
testutil.Ok(t, err)
|
||||||
|
defer sr.Close()
|
||||||
|
r := wal.NewReader(sr)
|
||||||
|
|
||||||
|
var actRec int
|
||||||
|
for r.Next() {
|
||||||
|
actRec++
|
||||||
|
}
|
||||||
|
testutil.Ok(t, r.Err())
|
||||||
|
testutil.Equals(t, test.expRecs, actRec, "Wrong number of intact records")
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestNewWalSegmentOnTruncate(t *testing.T) {
|
func TestNewWalSegmentOnTruncate(t *testing.T) {
|
||||||
|
@ -1156,7 +1176,7 @@ func TestNewWalSegmentOnTruncate(t *testing.T) {
|
||||||
defer func() {
|
defer func() {
|
||||||
testutil.Ok(t, os.RemoveAll(dir))
|
testutil.Ok(t, os.RemoveAll(dir))
|
||||||
}()
|
}()
|
||||||
wlog, err := wal.NewSize(nil, nil, dir, 32768)
|
wlog, err := wal.NewSize(nil, nil, dir, 32768, false)
|
||||||
testutil.Ok(t, err)
|
testutil.Ok(t, err)
|
||||||
|
|
||||||
h, err := NewHead(nil, nil, wlog, 1000)
|
h, err := NewHead(nil, nil, wlog, 1000)
|
||||||
|
|
|
@ -16,6 +16,7 @@ package testutil
|
||||||
import (
|
import (
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"os"
|
"os"
|
||||||
|
"path/filepath"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
|
@ -127,3 +128,18 @@ func NewTemporaryDirectory(name string, t T) (handler TemporaryDirectory) {
|
||||||
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// DirSize returns the size in bytes of all files in a directory.
|
||||||
|
func DirSize(path string) (int64, error) {
|
||||||
|
var size int64
|
||||||
|
err := filepath.Walk(path, func(_ string, info os.FileInfo, err error) error {
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if !info.IsDir() {
|
||||||
|
size += info.Size()
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
return size, err
|
||||||
|
}
|
||||||
|
|
2
wal.go
2
wal.go
|
@ -1246,7 +1246,7 @@ func MigrateWAL(logger log.Logger, dir string) (err error) {
|
||||||
if err := os.RemoveAll(tmpdir); err != nil {
|
if err := os.RemoveAll(tmpdir); err != nil {
|
||||||
return errors.Wrap(err, "cleanup replacement dir")
|
return errors.Wrap(err, "cleanup replacement dir")
|
||||||
}
|
}
|
||||||
repl, err := wal.New(logger, nil, tmpdir)
|
repl, err := wal.New(logger, nil, tmpdir, false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return errors.Wrap(err, "open new WAL")
|
return errors.Wrap(err, "open new WAL")
|
||||||
}
|
}
|
||||||
|
|
|
@ -22,6 +22,7 @@ import (
|
||||||
|
|
||||||
"github.com/go-kit/kit/log"
|
"github.com/go-kit/kit/log"
|
||||||
"github.com/go-kit/kit/log/level"
|
"github.com/go-kit/kit/log/level"
|
||||||
|
"github.com/golang/snappy"
|
||||||
"github.com/pkg/errors"
|
"github.com/pkg/errors"
|
||||||
"github.com/prometheus/client_golang/prometheus"
|
"github.com/prometheus/client_golang/prometheus"
|
||||||
)
|
)
|
||||||
|
@ -57,6 +58,7 @@ type LiveReader struct {
|
||||||
rdr io.Reader
|
rdr io.Reader
|
||||||
err error
|
err error
|
||||||
rec []byte
|
rec []byte
|
||||||
|
snappyBuf []byte
|
||||||
hdr [recordHeaderSize]byte
|
hdr [recordHeaderSize]byte
|
||||||
buf [pageSize]byte
|
buf [pageSize]byte
|
||||||
readIndex int // Index in buf to start at for next read.
|
readIndex int // Index in buf to start at for next read.
|
||||||
|
@ -171,11 +173,18 @@ func (r *LiveReader) buildRecord() (bool, error) {
|
||||||
return false, nil
|
return false, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
rt := recType(r.hdr[0])
|
rt := recTypeFromHeader(r.hdr[0])
|
||||||
if rt == recFirst || rt == recFull {
|
if rt == recFirst || rt == recFull {
|
||||||
r.rec = r.rec[:0]
|
r.rec = r.rec[:0]
|
||||||
|
r.snappyBuf = r.snappyBuf[:0]
|
||||||
|
}
|
||||||
|
|
||||||
|
compressed := r.hdr[0]&snappyMask != 0
|
||||||
|
if compressed {
|
||||||
|
r.snappyBuf = append(r.snappyBuf, temp...)
|
||||||
|
} else {
|
||||||
|
r.rec = append(r.rec, temp...)
|
||||||
}
|
}
|
||||||
r.rec = append(r.rec, temp...)
|
|
||||||
|
|
||||||
if err := validateRecord(rt, r.index); err != nil {
|
if err := validateRecord(rt, r.index); err != nil {
|
||||||
r.index = 0
|
r.index = 0
|
||||||
|
@ -183,6 +192,16 @@ func (r *LiveReader) buildRecord() (bool, error) {
|
||||||
}
|
}
|
||||||
if rt == recLast || rt == recFull {
|
if rt == recLast || rt == recFull {
|
||||||
r.index = 0
|
r.index = 0
|
||||||
|
if compressed && len(r.snappyBuf) > 0 {
|
||||||
|
// The snappy library uses `len` to calculate if we need a new buffer.
|
||||||
|
// In order to allocate as few buffers as possible make the length
|
||||||
|
// equal to the capacity.
|
||||||
|
r.rec = r.rec[:cap(r.rec)]
|
||||||
|
r.rec, err = snappy.Decode(r.rec, r.snappyBuf)
|
||||||
|
if err != nil {
|
||||||
|
return false, err
|
||||||
|
}
|
||||||
|
}
|
||||||
return true, nil
|
return true, nil
|
||||||
}
|
}
|
||||||
// Only increment i for non-zero records since we use it
|
// Only increment i for non-zero records since we use it
|
||||||
|
|
|
@ -19,6 +19,7 @@ import (
|
||||||
"hash/crc32"
|
"hash/crc32"
|
||||||
"io"
|
"io"
|
||||||
|
|
||||||
|
"github.com/golang/snappy"
|
||||||
"github.com/pkg/errors"
|
"github.com/pkg/errors"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -27,6 +28,7 @@ type Reader struct {
|
||||||
rdr io.Reader
|
rdr io.Reader
|
||||||
err error
|
err error
|
||||||
rec []byte
|
rec []byte
|
||||||
|
snappyBuf []byte
|
||||||
buf [pageSize]byte
|
buf [pageSize]byte
|
||||||
total int64 // Total bytes processed.
|
total int64 // Total bytes processed.
|
||||||
curRecTyp recType // Used for checking that the last record is not torn.
|
curRecTyp recType // Used for checking that the last record is not torn.
|
||||||
|
@ -45,7 +47,7 @@ func (r *Reader) Next() bool {
|
||||||
// The last WAL segment record shouldn't be torn(should be full or last).
|
// The last WAL segment record shouldn't be torn(should be full or last).
|
||||||
// The last record would be torn after a crash just before
|
// The last record would be torn after a crash just before
|
||||||
// the last record part could be persisted to disk.
|
// the last record part could be persisted to disk.
|
||||||
if recType(r.curRecTyp) == recFirst || recType(r.curRecTyp) == recMiddle {
|
if r.curRecTyp == recFirst || r.curRecTyp == recMiddle {
|
||||||
r.err = errors.New("last record is torn")
|
r.err = errors.New("last record is torn")
|
||||||
}
|
}
|
||||||
return false
|
return false
|
||||||
|
@ -61,6 +63,7 @@ func (r *Reader) next() (err error) {
|
||||||
buf := r.buf[recordHeaderSize:]
|
buf := r.buf[recordHeaderSize:]
|
||||||
|
|
||||||
r.rec = r.rec[:0]
|
r.rec = r.rec[:0]
|
||||||
|
r.snappyBuf = r.snappyBuf[:0]
|
||||||
|
|
||||||
i := 0
|
i := 0
|
||||||
for {
|
for {
|
||||||
|
@ -68,7 +71,8 @@ func (r *Reader) next() (err error) {
|
||||||
return errors.Wrap(err, "read first header byte")
|
return errors.Wrap(err, "read first header byte")
|
||||||
}
|
}
|
||||||
r.total++
|
r.total++
|
||||||
r.curRecTyp = recType(hdr[0])
|
r.curRecTyp = recTypeFromHeader(hdr[0])
|
||||||
|
compressed := hdr[0]&snappyMask != 0
|
||||||
|
|
||||||
// Gobble up zero bytes.
|
// Gobble up zero bytes.
|
||||||
if r.curRecTyp == recPageTerm {
|
if r.curRecTyp == recPageTerm {
|
||||||
|
@ -123,12 +127,25 @@ func (r *Reader) next() (err error) {
|
||||||
if c := crc32.Checksum(buf[:length], castagnoliTable); c != crc {
|
if c := crc32.Checksum(buf[:length], castagnoliTable); c != crc {
|
||||||
return errors.Errorf("unexpected checksum %x, expected %x", c, crc)
|
return errors.Errorf("unexpected checksum %x, expected %x", c, crc)
|
||||||
}
|
}
|
||||||
r.rec = append(r.rec, buf[:length]...)
|
|
||||||
|
if compressed {
|
||||||
|
r.snappyBuf = append(r.snappyBuf, buf[:length]...)
|
||||||
|
} else {
|
||||||
|
r.rec = append(r.rec, buf[:length]...)
|
||||||
|
}
|
||||||
|
|
||||||
if err := validateRecord(r.curRecTyp, i); err != nil {
|
if err := validateRecord(r.curRecTyp, i); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if r.curRecTyp == recLast || r.curRecTyp == recFull {
|
if r.curRecTyp == recLast || r.curRecTyp == recFull {
|
||||||
|
if compressed && len(r.snappyBuf) > 0 {
|
||||||
|
// The snappy library uses `len` to calculate if we need a new buffer.
|
||||||
|
// In order to allocate as few buffers as possible make the length
|
||||||
|
// equal to the capacity.
|
||||||
|
r.rec = r.rec[:cap(r.rec)]
|
||||||
|
r.rec, err = snappy.Decode(r.rec, r.snappyBuf)
|
||||||
|
return err
|
||||||
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -310,118 +310,124 @@ func allSegments(dir string) (io.ReadCloser, error) {
|
||||||
|
|
||||||
func TestReaderFuzz(t *testing.T) {
|
func TestReaderFuzz(t *testing.T) {
|
||||||
for name, fn := range readerConstructors {
|
for name, fn := range readerConstructors {
|
||||||
t.Run(name, func(t *testing.T) {
|
for _, compress := range []bool{false, true} {
|
||||||
|
t.Run(fmt.Sprintf("%s,compress=%t", name, compress), func(t *testing.T) {
|
||||||
|
dir, err := ioutil.TempDir("", "wal_fuzz_live")
|
||||||
|
testutil.Ok(t, err)
|
||||||
|
defer func() {
|
||||||
|
testutil.Ok(t, os.RemoveAll(dir))
|
||||||
|
}()
|
||||||
|
|
||||||
|
w, err := NewSize(nil, nil, dir, 128*pageSize, compress)
|
||||||
|
testutil.Ok(t, err)
|
||||||
|
|
||||||
|
// Buffering required as we're not reading concurrently.
|
||||||
|
input := make(chan []byte, fuzzLen)
|
||||||
|
err = generateRandomEntries(w, input)
|
||||||
|
testutil.Ok(t, err)
|
||||||
|
close(input)
|
||||||
|
|
||||||
|
err = w.Close()
|
||||||
|
testutil.Ok(t, err)
|
||||||
|
|
||||||
|
sr, err := allSegments(w.Dir())
|
||||||
|
testutil.Ok(t, err)
|
||||||
|
defer sr.Close()
|
||||||
|
|
||||||
|
reader := fn(sr)
|
||||||
|
for expected := range input {
|
||||||
|
testutil.Assert(t, reader.Next(), "expected record: %v", reader.Err())
|
||||||
|
testutil.Equals(t, expected, reader.Record(), "read wrong record")
|
||||||
|
}
|
||||||
|
testutil.Assert(t, !reader.Next(), "unexpected record")
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestReaderFuzz_Live(t *testing.T) {
|
||||||
|
logger := testutil.NewLogger(t)
|
||||||
|
for _, compress := range []bool{false, true} {
|
||||||
|
t.Run(fmt.Sprintf("compress=%t", compress), func(t *testing.T) {
|
||||||
dir, err := ioutil.TempDir("", "wal_fuzz_live")
|
dir, err := ioutil.TempDir("", "wal_fuzz_live")
|
||||||
testutil.Ok(t, err)
|
testutil.Ok(t, err)
|
||||||
defer func() {
|
defer func() {
|
||||||
testutil.Ok(t, os.RemoveAll(dir))
|
testutil.Ok(t, os.RemoveAll(dir))
|
||||||
}()
|
}()
|
||||||
|
|
||||||
w, err := NewSize(nil, nil, dir, 128*pageSize)
|
w, err := NewSize(nil, nil, dir, 128*pageSize, compress)
|
||||||
|
testutil.Ok(t, err)
|
||||||
|
defer w.Close()
|
||||||
|
|
||||||
|
// In the background, generate a stream of random records and write them
|
||||||
|
// to the WAL.
|
||||||
|
input := make(chan []byte, fuzzLen/10) // buffering required as we sometimes batch WAL writes.
|
||||||
|
done := make(chan struct{})
|
||||||
|
go func() {
|
||||||
|
err := generateRandomEntries(w, input)
|
||||||
|
testutil.Ok(t, err)
|
||||||
|
time.Sleep(100 * time.Millisecond)
|
||||||
|
close(done)
|
||||||
|
}()
|
||||||
|
|
||||||
|
// Tail the WAL and compare the results.
|
||||||
|
m, _, err := w.Segments()
|
||||||
testutil.Ok(t, err)
|
testutil.Ok(t, err)
|
||||||
|
|
||||||
// Buffering required as we're not reading concurrently.
|
seg, err := OpenReadSegment(SegmentName(dir, m))
|
||||||
input := make(chan []byte, fuzzLen)
|
|
||||||
err = generateRandomEntries(w, input)
|
|
||||||
testutil.Ok(t, err)
|
|
||||||
close(input)
|
|
||||||
|
|
||||||
err = w.Close()
|
|
||||||
testutil.Ok(t, err)
|
|
||||||
|
|
||||||
sr, err := allSegments(w.Dir())
|
|
||||||
testutil.Ok(t, err)
|
|
||||||
defer sr.Close()
|
|
||||||
|
|
||||||
reader := fn(sr)
|
|
||||||
for expected := range input {
|
|
||||||
testutil.Assert(t, reader.Next(), "expected record: %v", reader.Err())
|
|
||||||
testutil.Equals(t, expected, reader.Record(), "read wrong record")
|
|
||||||
}
|
|
||||||
testutil.Assert(t, !reader.Next(), "unexpected record")
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestReaderFuzz_Live(t *testing.T) {
|
|
||||||
logger := testutil.NewLogger(t)
|
|
||||||
dir, err := ioutil.TempDir("", "wal_fuzz_live")
|
|
||||||
testutil.Ok(t, err)
|
|
||||||
defer func() {
|
|
||||||
testutil.Ok(t, os.RemoveAll(dir))
|
|
||||||
}()
|
|
||||||
|
|
||||||
w, err := NewSize(nil, nil, dir, 128*pageSize)
|
|
||||||
testutil.Ok(t, err)
|
|
||||||
defer w.Close()
|
|
||||||
|
|
||||||
// In the background, generate a stream of random records and write them
|
|
||||||
// to the WAL.
|
|
||||||
input := make(chan []byte, fuzzLen/10) // buffering required as we sometimes batch WAL writes.
|
|
||||||
done := make(chan struct{})
|
|
||||||
go func() {
|
|
||||||
err := generateRandomEntries(w, input)
|
|
||||||
testutil.Ok(t, err)
|
|
||||||
time.Sleep(100 * time.Millisecond)
|
|
||||||
close(done)
|
|
||||||
}()
|
|
||||||
|
|
||||||
// Tail the WAL and compare the results.
|
|
||||||
m, _, err := w.Segments()
|
|
||||||
testutil.Ok(t, err)
|
|
||||||
|
|
||||||
seg, err := OpenReadSegment(SegmentName(dir, m))
|
|
||||||
testutil.Ok(t, err)
|
|
||||||
defer seg.Close()
|
|
||||||
|
|
||||||
r := NewLiveReader(logger, nil, seg)
|
|
||||||
segmentTicker := time.NewTicker(100 * time.Millisecond)
|
|
||||||
readTicker := time.NewTicker(10 * time.Millisecond)
|
|
||||||
|
|
||||||
readSegment := func(r *LiveReader) bool {
|
|
||||||
for r.Next() {
|
|
||||||
rec := r.Record()
|
|
||||||
expected, ok := <-input
|
|
||||||
testutil.Assert(t, ok, "unexpected record")
|
|
||||||
testutil.Equals(t, expected, rec, "record does not match expected")
|
|
||||||
}
|
|
||||||
testutil.Assert(t, r.Err() == io.EOF, "expected EOF, got: %v", r.Err())
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
|
|
||||||
outer:
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case <-segmentTicker.C:
|
|
||||||
// check if new segments exist
|
|
||||||
_, last, err := w.Segments()
|
|
||||||
testutil.Ok(t, err)
|
|
||||||
if last <= seg.i {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
// read to end of segment.
|
|
||||||
readSegment(r)
|
|
||||||
|
|
||||||
fi, err := os.Stat(SegmentName(dir, seg.i))
|
|
||||||
testutil.Ok(t, err)
|
|
||||||
testutil.Assert(t, r.Offset() == fi.Size(), "expected to have read whole segment, but read %d of %d", r.Offset(), fi.Size())
|
|
||||||
|
|
||||||
seg, err = OpenReadSegment(SegmentName(dir, seg.i+1))
|
|
||||||
testutil.Ok(t, err)
|
testutil.Ok(t, err)
|
||||||
defer seg.Close()
|
defer seg.Close()
|
||||||
r = NewLiveReader(logger, nil, seg)
|
|
||||||
|
|
||||||
case <-readTicker.C:
|
r := NewLiveReader(logger, nil, seg)
|
||||||
readSegment(r)
|
segmentTicker := time.NewTicker(100 * time.Millisecond)
|
||||||
|
readTicker := time.NewTicker(10 * time.Millisecond)
|
||||||
|
|
||||||
case <-done:
|
readSegment := func(r *LiveReader) bool {
|
||||||
readSegment(r)
|
for r.Next() {
|
||||||
break outer
|
rec := r.Record()
|
||||||
}
|
expected, ok := <-input
|
||||||
|
testutil.Assert(t, ok, "unexpected record")
|
||||||
|
testutil.Equals(t, expected, rec, "record does not match expected")
|
||||||
|
}
|
||||||
|
testutil.Assert(t, r.Err() == io.EOF, "expected EOF, got: %v", r.Err())
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
outer:
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-segmentTicker.C:
|
||||||
|
// check if new segments exist
|
||||||
|
_, last, err := w.Segments()
|
||||||
|
testutil.Ok(t, err)
|
||||||
|
if last <= seg.i {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// read to end of segment.
|
||||||
|
readSegment(r)
|
||||||
|
|
||||||
|
fi, err := os.Stat(SegmentName(dir, seg.i))
|
||||||
|
testutil.Ok(t, err)
|
||||||
|
testutil.Assert(t, r.Offset() == fi.Size(), "expected to have read whole segment, but read %d of %d", r.Offset(), fi.Size())
|
||||||
|
|
||||||
|
seg, err = OpenReadSegment(SegmentName(dir, seg.i+1))
|
||||||
|
testutil.Ok(t, err)
|
||||||
|
defer seg.Close()
|
||||||
|
r = NewLiveReader(logger, nil, seg)
|
||||||
|
|
||||||
|
case <-readTicker.C:
|
||||||
|
readSegment(r)
|
||||||
|
|
||||||
|
case <-done:
|
||||||
|
readSegment(r)
|
||||||
|
break outer
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
testutil.Assert(t, r.Err() == io.EOF, "expected EOF")
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
testutil.Assert(t, r.Err() == io.EOF, "expected EOF")
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestLiveReaderCorrupt_ShortFile(t *testing.T) {
|
func TestLiveReaderCorrupt_ShortFile(t *testing.T) {
|
||||||
|
@ -434,7 +440,7 @@ func TestLiveReaderCorrupt_ShortFile(t *testing.T) {
|
||||||
testutil.Ok(t, os.RemoveAll(dir))
|
testutil.Ok(t, os.RemoveAll(dir))
|
||||||
}()
|
}()
|
||||||
|
|
||||||
w, err := NewSize(nil, nil, dir, pageSize)
|
w, err := NewSize(nil, nil, dir, pageSize, false)
|
||||||
testutil.Ok(t, err)
|
testutil.Ok(t, err)
|
||||||
|
|
||||||
rec := make([]byte, pageSize-recordHeaderSize)
|
rec := make([]byte, pageSize-recordHeaderSize)
|
||||||
|
@ -478,7 +484,7 @@ func TestLiveReaderCorrupt_RecordTooLongAndShort(t *testing.T) {
|
||||||
testutil.Ok(t, os.RemoveAll(dir))
|
testutil.Ok(t, os.RemoveAll(dir))
|
||||||
}()
|
}()
|
||||||
|
|
||||||
w, err := NewSize(nil, nil, dir, pageSize*2)
|
w, err := NewSize(nil, nil, dir, pageSize*2, false)
|
||||||
testutil.Ok(t, err)
|
testutil.Ok(t, err)
|
||||||
|
|
||||||
rec := make([]byte, pageSize-recordHeaderSize)
|
rec := make([]byte, pageSize-recordHeaderSize)
|
||||||
|
@ -525,7 +531,7 @@ func TestReaderData(t *testing.T) {
|
||||||
|
|
||||||
for name, fn := range readerConstructors {
|
for name, fn := range readerConstructors {
|
||||||
t.Run(name, func(t *testing.T) {
|
t.Run(name, func(t *testing.T) {
|
||||||
w, err := New(nil, nil, dir)
|
w, err := New(nil, nil, dir, true)
|
||||||
testutil.Ok(t, err)
|
testutil.Ok(t, err)
|
||||||
|
|
||||||
sr, err := allSegments(dir)
|
sr, err := allSegments(dir)
|
||||||
|
|
43
wal/wal.go
43
wal/wal.go
|
@ -29,6 +29,7 @@ import (
|
||||||
|
|
||||||
"github.com/go-kit/kit/log"
|
"github.com/go-kit/kit/log"
|
||||||
"github.com/go-kit/kit/log/level"
|
"github.com/go-kit/kit/log/level"
|
||||||
|
"github.com/golang/snappy"
|
||||||
"github.com/pkg/errors"
|
"github.com/pkg/errors"
|
||||||
"github.com/prometheus/client_golang/prometheus"
|
"github.com/prometheus/client_golang/prometheus"
|
||||||
"github.com/prometheus/tsdb/fileutil"
|
"github.com/prometheus/tsdb/fileutil"
|
||||||
|
@ -165,6 +166,8 @@ type WAL struct {
|
||||||
stopc chan chan struct{}
|
stopc chan chan struct{}
|
||||||
actorc chan func()
|
actorc chan func()
|
||||||
closed bool // To allow calling Close() more than once without blocking.
|
closed bool // To allow calling Close() more than once without blocking.
|
||||||
|
compress bool
|
||||||
|
snappyBuf []byte
|
||||||
|
|
||||||
fsyncDuration prometheus.Summary
|
fsyncDuration prometheus.Summary
|
||||||
pageFlushes prometheus.Counter
|
pageFlushes prometheus.Counter
|
||||||
|
@ -175,13 +178,13 @@ type WAL struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
// New returns a new WAL over the given directory.
|
// New returns a new WAL over the given directory.
|
||||||
func New(logger log.Logger, reg prometheus.Registerer, dir string) (*WAL, error) {
|
func New(logger log.Logger, reg prometheus.Registerer, dir string, compress bool) (*WAL, error) {
|
||||||
return NewSize(logger, reg, dir, DefaultSegmentSize)
|
return NewSize(logger, reg, dir, DefaultSegmentSize, compress)
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewSize returns a new WAL over the given directory.
|
// NewSize returns a new WAL over the given directory.
|
||||||
// New segments are created with the specified size.
|
// New segments are created with the specified size.
|
||||||
func NewSize(logger log.Logger, reg prometheus.Registerer, dir string, segmentSize int) (*WAL, error) {
|
func NewSize(logger log.Logger, reg prometheus.Registerer, dir string, segmentSize int, compress bool) (*WAL, error) {
|
||||||
if segmentSize%pageSize != 0 {
|
if segmentSize%pageSize != 0 {
|
||||||
return nil, errors.New("invalid segment size")
|
return nil, errors.New("invalid segment size")
|
||||||
}
|
}
|
||||||
|
@ -198,6 +201,7 @@ func NewSize(logger log.Logger, reg prometheus.Registerer, dir string, segmentSi
|
||||||
page: &page{},
|
page: &page{},
|
||||||
actorc: make(chan func(), 100),
|
actorc: make(chan func(), 100),
|
||||||
stopc: make(chan chan struct{}),
|
stopc: make(chan chan struct{}),
|
||||||
|
compress: compress,
|
||||||
}
|
}
|
||||||
w.fsyncDuration = prometheus.NewSummary(prometheus.SummaryOpts{
|
w.fsyncDuration = prometheus.NewSummary(prometheus.SummaryOpts{
|
||||||
Name: "prometheus_tsdb_wal_fsync_duration_seconds",
|
Name: "prometheus_tsdb_wal_fsync_duration_seconds",
|
||||||
|
@ -253,6 +257,11 @@ func NewSize(logger log.Logger, reg prometheus.Registerer, dir string, segmentSi
|
||||||
return w, nil
|
return w, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// CompressionEnabled returns if compression is enabled on this WAL.
|
||||||
|
func (w *WAL) CompressionEnabled() bool {
|
||||||
|
return w.compress
|
||||||
|
}
|
||||||
|
|
||||||
// Dir returns the directory of the WAL.
|
// Dir returns the directory of the WAL.
|
||||||
func (w *WAL) Dir() string {
|
func (w *WAL) Dir() string {
|
||||||
return w.dir
|
return w.dir
|
||||||
|
@ -476,6 +485,14 @@ func (w *WAL) flushPage(clear bool) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// First Byte of header format:
|
||||||
|
// [ 4 bits unallocated] [1 bit snappy compression flag] [ 3 bit record type ]
|
||||||
|
|
||||||
|
const (
|
||||||
|
snappyMask = 1 << 3
|
||||||
|
recTypeMask = snappyMask - 1
|
||||||
|
)
|
||||||
|
|
||||||
type recType uint8
|
type recType uint8
|
||||||
|
|
||||||
const (
|
const (
|
||||||
|
@ -486,6 +503,10 @@ const (
|
||||||
recLast recType = 4 // Final fragment of a record.
|
recLast recType = 4 // Final fragment of a record.
|
||||||
)
|
)
|
||||||
|
|
||||||
|
func recTypeFromHeader(header byte) recType {
|
||||||
|
return recType(header & recTypeMask)
|
||||||
|
}
|
||||||
|
|
||||||
func (t recType) String() string {
|
func (t recType) String() string {
|
||||||
switch t {
|
switch t {
|
||||||
case recPageTerm:
|
case recPageTerm:
|
||||||
|
@ -546,6 +567,19 @@ func (w *WAL) log(rec []byte, final bool) error {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
compressed := false
|
||||||
|
if w.compress && len(rec) > 0 {
|
||||||
|
// The snappy library uses `len` to calculate if we need a new buffer.
|
||||||
|
// In order to allocate as few buffers as possible make the length
|
||||||
|
// equal to the capacity.
|
||||||
|
w.snappyBuf = w.snappyBuf[:cap(w.snappyBuf)]
|
||||||
|
w.snappyBuf = snappy.Encode(w.snappyBuf, rec)
|
||||||
|
if len(w.snappyBuf) < len(rec) {
|
||||||
|
rec = w.snappyBuf
|
||||||
|
compressed = true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Populate as many pages as necessary to fit the record.
|
// Populate as many pages as necessary to fit the record.
|
||||||
// Be careful to always do one pass to ensure we write zero-length records.
|
// Be careful to always do one pass to ensure we write zero-length records.
|
||||||
for i := 0; i == 0 || len(rec) > 0; i++ {
|
for i := 0; i == 0 || len(rec) > 0; i++ {
|
||||||
|
@ -569,6 +603,9 @@ func (w *WAL) log(rec []byte, final bool) error {
|
||||||
default:
|
default:
|
||||||
typ = recMiddle
|
typ = recMiddle
|
||||||
}
|
}
|
||||||
|
if compressed {
|
||||||
|
typ |= snappyMask
|
||||||
|
}
|
||||||
|
|
||||||
buf[0] = byte(typ)
|
buf[0] = byte(typ)
|
||||||
crc := crc32.Checksum(part, castagnoliTable)
|
crc := crc32.Checksum(part, castagnoliTable)
|
||||||
|
|
138
wal/wal_test.go
138
wal/wal_test.go
|
@ -120,7 +120,7 @@ func TestWALRepair_ReadingError(t *testing.T) {
|
||||||
// then corrupt a given record in a given segment.
|
// then corrupt a given record in a given segment.
|
||||||
// As a result we want a repaired WAL with given intact records.
|
// As a result we want a repaired WAL with given intact records.
|
||||||
segSize := 3 * pageSize
|
segSize := 3 * pageSize
|
||||||
w, err := NewSize(nil, nil, dir, segSize)
|
w, err := NewSize(nil, nil, dir, segSize, false)
|
||||||
testutil.Ok(t, err)
|
testutil.Ok(t, err)
|
||||||
|
|
||||||
var records [][]byte
|
var records [][]byte
|
||||||
|
@ -145,7 +145,7 @@ func TestWALRepair_ReadingError(t *testing.T) {
|
||||||
|
|
||||||
testutil.Ok(t, f.Close())
|
testutil.Ok(t, f.Close())
|
||||||
|
|
||||||
w, err = NewSize(nil, nil, dir, segSize)
|
w, err = NewSize(nil, nil, dir, segSize, false)
|
||||||
testutil.Ok(t, err)
|
testutil.Ok(t, err)
|
||||||
defer w.Close()
|
defer w.Close()
|
||||||
|
|
||||||
|
@ -223,7 +223,7 @@ func TestCorruptAndCarryOn(t *testing.T) {
|
||||||
// Produce a WAL with a two segments of 3 pages with 3 records each,
|
// Produce a WAL with a two segments of 3 pages with 3 records each,
|
||||||
// so when we truncate the file we're guaranteed to split a record.
|
// so when we truncate the file we're guaranteed to split a record.
|
||||||
{
|
{
|
||||||
w, err := NewSize(logger, nil, dir, segmentSize)
|
w, err := NewSize(logger, nil, dir, segmentSize, false)
|
||||||
testutil.Ok(t, err)
|
testutil.Ok(t, err)
|
||||||
|
|
||||||
for i := 0; i < 18; i++ {
|
for i := 0; i < 18; i++ {
|
||||||
|
@ -294,7 +294,7 @@ func TestCorruptAndCarryOn(t *testing.T) {
|
||||||
err = sr.Close()
|
err = sr.Close()
|
||||||
testutil.Ok(t, err)
|
testutil.Ok(t, err)
|
||||||
|
|
||||||
w, err := NewSize(logger, nil, dir, segmentSize)
|
w, err := NewSize(logger, nil, dir, segmentSize, false)
|
||||||
testutil.Ok(t, err)
|
testutil.Ok(t, err)
|
||||||
|
|
||||||
err = w.Repair(corruptionErr)
|
err = w.Repair(corruptionErr)
|
||||||
|
@ -341,7 +341,7 @@ func TestClose(t *testing.T) {
|
||||||
defer func() {
|
defer func() {
|
||||||
testutil.Ok(t, os.RemoveAll(dir))
|
testutil.Ok(t, os.RemoveAll(dir))
|
||||||
}()
|
}()
|
||||||
w, err := NewSize(nil, nil, dir, pageSize)
|
w, err := NewSize(nil, nil, dir, pageSize, false)
|
||||||
testutil.Ok(t, err)
|
testutil.Ok(t, err)
|
||||||
testutil.Ok(t, w.Close())
|
testutil.Ok(t, w.Close())
|
||||||
testutil.NotOk(t, w.Close())
|
testutil.NotOk(t, w.Close())
|
||||||
|
@ -358,7 +358,7 @@ func TestSegmentMetric(t *testing.T) {
|
||||||
defer func() {
|
defer func() {
|
||||||
testutil.Ok(t, os.RemoveAll(dir))
|
testutil.Ok(t, os.RemoveAll(dir))
|
||||||
}()
|
}()
|
||||||
w, err := NewSize(nil, nil, dir, segmentSize)
|
w, err := NewSize(nil, nil, dir, segmentSize, false)
|
||||||
testutil.Ok(t, err)
|
testutil.Ok(t, err)
|
||||||
|
|
||||||
initialSegment := client_testutil.ToFloat64(w.currentSegment)
|
initialSegment := client_testutil.ToFloat64(w.currentSegment)
|
||||||
|
@ -376,56 +376,104 @@ func TestSegmentMetric(t *testing.T) {
|
||||||
testutil.Ok(t, w.Close())
|
testutil.Ok(t, w.Close())
|
||||||
}
|
}
|
||||||
|
|
||||||
func BenchmarkWAL_LogBatched(b *testing.B) {
|
func TestCompression(t *testing.T) {
|
||||||
dir, err := ioutil.TempDir("", "bench_logbatch")
|
boostrap := func(compressed bool) string {
|
||||||
testutil.Ok(b, err)
|
const (
|
||||||
|
segmentSize = pageSize
|
||||||
|
recordSize = (pageSize / 2) - recordHeaderSize
|
||||||
|
records = 100
|
||||||
|
)
|
||||||
|
|
||||||
|
dirPath, err := ioutil.TempDir("", fmt.Sprintf("TestCompression_%t", compressed))
|
||||||
|
testutil.Ok(t, err)
|
||||||
|
|
||||||
|
w, err := NewSize(nil, nil, dirPath, segmentSize, compressed)
|
||||||
|
testutil.Ok(t, err)
|
||||||
|
|
||||||
|
buf := make([]byte, recordSize)
|
||||||
|
for i := 0; i < records; i++ {
|
||||||
|
testutil.Ok(t, w.Log(buf))
|
||||||
|
}
|
||||||
|
testutil.Ok(t, w.Close())
|
||||||
|
|
||||||
|
return dirPath
|
||||||
|
}
|
||||||
|
|
||||||
|
dirCompressed := boostrap(true)
|
||||||
defer func() {
|
defer func() {
|
||||||
testutil.Ok(b, os.RemoveAll(dir))
|
testutil.Ok(t, os.RemoveAll(dirCompressed))
|
||||||
|
}()
|
||||||
|
dirUnCompressed := boostrap(false)
|
||||||
|
defer func() {
|
||||||
|
testutil.Ok(t, os.RemoveAll(dirUnCompressed))
|
||||||
}()
|
}()
|
||||||
|
|
||||||
w, err := New(nil, nil, "testdir")
|
uncompressedSize, err := testutil.DirSize(dirUnCompressed)
|
||||||
testutil.Ok(b, err)
|
testutil.Ok(t, err)
|
||||||
defer w.Close()
|
compressedSize, err := testutil.DirSize(dirCompressed)
|
||||||
|
testutil.Ok(t, err)
|
||||||
|
|
||||||
var buf [2048]byte
|
testutil.Assert(t, float64(uncompressedSize)*0.75 > float64(compressedSize), "Compressing zeroes should save at least 25%% space - uncompressedSize: %d, compressedSize: %d", uncompressedSize, compressedSize)
|
||||||
var recs [][]byte
|
}
|
||||||
b.SetBytes(2048)
|
|
||||||
|
|
||||||
for i := 0; i < b.N; i++ {
|
func BenchmarkWAL_LogBatched(b *testing.B) {
|
||||||
recs = append(recs, buf[:])
|
for _, compress := range []bool{true, false} {
|
||||||
if len(recs) < 1000 {
|
b.Run(fmt.Sprintf("compress=%t", compress), func(b *testing.B) {
|
||||||
continue
|
dir, err := ioutil.TempDir("", "bench_logbatch")
|
||||||
}
|
testutil.Ok(b, err)
|
||||||
err := w.Log(recs...)
|
defer func() {
|
||||||
testutil.Ok(b, err)
|
testutil.Ok(b, os.RemoveAll(dir))
|
||||||
recs = recs[:0]
|
}()
|
||||||
|
|
||||||
|
w, err := New(nil, nil, dir, compress)
|
||||||
|
testutil.Ok(b, err)
|
||||||
|
defer w.Close()
|
||||||
|
|
||||||
|
var buf [2048]byte
|
||||||
|
var recs [][]byte
|
||||||
|
b.SetBytes(2048)
|
||||||
|
|
||||||
|
for i := 0; i < b.N; i++ {
|
||||||
|
recs = append(recs, buf[:])
|
||||||
|
if len(recs) < 1000 {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
err := w.Log(recs...)
|
||||||
|
testutil.Ok(b, err)
|
||||||
|
recs = recs[:0]
|
||||||
|
}
|
||||||
|
// Stop timer to not count fsync time on close.
|
||||||
|
// If it's counted batched vs. single benchmarks are very similar but
|
||||||
|
// do not show burst throughput well.
|
||||||
|
b.StopTimer()
|
||||||
|
})
|
||||||
}
|
}
|
||||||
// Stop timer to not count fsync time on close.
|
|
||||||
// If it's counted batched vs. single benchmarks are very similar but
|
|
||||||
// do not show burst throughput well.
|
|
||||||
b.StopTimer()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func BenchmarkWAL_Log(b *testing.B) {
|
func BenchmarkWAL_Log(b *testing.B) {
|
||||||
dir, err := ioutil.TempDir("", "bench_logsingle")
|
for _, compress := range []bool{true, false} {
|
||||||
testutil.Ok(b, err)
|
b.Run(fmt.Sprintf("compress=%t", compress), func(b *testing.B) {
|
||||||
defer func() {
|
dir, err := ioutil.TempDir("", "bench_logsingle")
|
||||||
testutil.Ok(b, os.RemoveAll(dir))
|
testutil.Ok(b, err)
|
||||||
}()
|
defer func() {
|
||||||
|
testutil.Ok(b, os.RemoveAll(dir))
|
||||||
|
}()
|
||||||
|
|
||||||
w, err := New(nil, nil, "testdir")
|
w, err := New(nil, nil, dir, compress)
|
||||||
testutil.Ok(b, err)
|
testutil.Ok(b, err)
|
||||||
defer w.Close()
|
defer w.Close()
|
||||||
|
|
||||||
var buf [2048]byte
|
var buf [2048]byte
|
||||||
b.SetBytes(2048)
|
b.SetBytes(2048)
|
||||||
|
|
||||||
for i := 0; i < b.N; i++ {
|
for i := 0; i < b.N; i++ {
|
||||||
err := w.Log(buf[:])
|
err := w.Log(buf[:])
|
||||||
testutil.Ok(b, err)
|
testutil.Ok(b, err)
|
||||||
|
}
|
||||||
|
// Stop timer to not count fsync time on close.
|
||||||
|
// If it's counted batched vs. single benchmarks are very similar but
|
||||||
|
// do not show burst throughput well.
|
||||||
|
b.StopTimer()
|
||||||
|
})
|
||||||
}
|
}
|
||||||
// Stop timer to not count fsync time on close.
|
|
||||||
// If it's counted batched vs. single benchmarks are very similar but
|
|
||||||
// do not show burst throughput well.
|
|
||||||
b.StopTimer()
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -459,7 +459,7 @@ func TestMigrateWAL_Empty(t *testing.T) {
|
||||||
wdir := path.Join(dir, "wal")
|
wdir := path.Join(dir, "wal")
|
||||||
|
|
||||||
// Initialize empty WAL.
|
// Initialize empty WAL.
|
||||||
w, err := wal.New(nil, nil, wdir)
|
w, err := wal.New(nil, nil, wdir, false)
|
||||||
testutil.Ok(t, err)
|
testutil.Ok(t, err)
|
||||||
testutil.Ok(t, w.Close())
|
testutil.Ok(t, w.Close())
|
||||||
|
|
||||||
|
@ -506,7 +506,7 @@ func TestMigrateWAL_Fuzz(t *testing.T) {
|
||||||
// Perform migration.
|
// Perform migration.
|
||||||
testutil.Ok(t, MigrateWAL(nil, wdir))
|
testutil.Ok(t, MigrateWAL(nil, wdir))
|
||||||
|
|
||||||
w, err := wal.New(nil, nil, wdir)
|
w, err := wal.New(nil, nil, wdir, false)
|
||||||
testutil.Ok(t, err)
|
testutil.Ok(t, err)
|
||||||
|
|
||||||
// We can properly write some new data after migration.
|
// We can properly write some new data after migration.
|
||||||
|
|
Loading…
Reference in a new issue