Merge branch 'master' into rename-bug

Signed-off-by: Krasi Georgiev <8903888+krasi-georgiev@users.noreply.github.com>
Krasi Georgiev 2019-06-24 11:50:41 +03:00
commit fe3d374586
17 changed files with 746 additions and 548 deletions


CHANGELOG.md

@@ -1,5 +1,5 @@
 ## master / unreleased
+- [FEATURE] Provide option to compress WAL records using Snappy. [#609](https://github.com/prometheus/tsdb/pull/609)

 ## 0.8.0
 - [BUGFIX] Calling `Close` more than once on a querier returns an error instead of a panic.


checkpoint.go

@@ -135,7 +135,7 @@ func Checkpoint(w *wal.WAL, from, to int, keep func(id uint64) bool, mint int64)
 	if err := os.MkdirAll(cpdirtmp, 0777); err != nil {
 		return nil, errors.Wrap(err, "create checkpoint dir")
 	}
-	cp, err := wal.New(nil, nil, cpdirtmp)
+	cp, err := wal.New(nil, nil, cpdirtmp, w.CompressionEnabled())
 	if err != nil {
 		return nil, errors.Wrap(err, "open checkpoint")
 	}
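
Because a checkpoint re-logs surviving records into a fresh WAL directory, it now inherits the compression setting of the source WAL via the new CompressionEnabled accessor, so a compressed WAL no longer silently produces an uncompressed checkpoint. A minimal sketch of the resulting call pattern, assuming the Checkpoint signature above and a (stats, error) return; the directory, segment range, keep function, and mint are placeholders:

	w, err := wal.New(nil, nil, "data/wal", true) // Snappy compression on.
	if err != nil {
		// handle error
	}
	// ... log records as usual; the temporary checkpoint WAL below is created
	// with w.CompressionEnabled(), so it is compressed as well.
	if _, err := Checkpoint(w, first, last, keep, mint); err != nil {
		// handle error
	}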


checkpoint_test.go

@@ -86,6 +86,8 @@ func TestDeleteCheckpoints(t *testing.T) {
 }

 func TestCheckpoint(t *testing.T) {
+	for _, compress := range []bool{false, true} {
+		t.Run(fmt.Sprintf("compress=%t", compress), func(t *testing.T) {
 			dir, err := ioutil.TempDir("", "test_checkpoint")
 			testutil.Ok(t, err)
 			defer func() {

@@ -99,7 +101,7 @@ func TestCheckpoint(t *testing.T) {
 			testutil.Ok(t, seg.Close())

 			// Manually create checkpoint for 99 and earlier.
-			w, err := wal.New(nil, nil, filepath.Join(dir, "checkpoint.0099"))
+			w, err := wal.New(nil, nil, filepath.Join(dir, "checkpoint.0099"), compress)
 			testutil.Ok(t, err)

 			// Add some data we expect to be around later.

@@ -111,7 +113,7 @@ func TestCheckpoint(t *testing.T) {
 			testutil.Ok(t, w.Close())

 			// Start a WAL and write records to it as usual.
-			w, err = wal.NewSize(nil, nil, dir, 64*1024)
+			w, err = wal.NewSize(nil, nil, dir, 64*1024, compress)
 			testutil.Ok(t, err)

 			var last int64

@@ -188,6 +190,8 @@ func TestCheckpoint(t *testing.T) {
 				{Ref: 2, Labels: labels.FromStrings("a", "b", "c", "2")},
 				{Ref: 4, Labels: labels.FromStrings("a", "b", "c", "4")},
 			}, series)
+		})
+	}
 }

 func TestCheckpointNoTmpFolderAfterError(t *testing.T) {

@@ -197,7 +201,7 @@ func TestCheckpointNoTmpFolderAfterError(t *testing.T) {
 	defer func() {
 		testutil.Ok(t, os.RemoveAll(dir))
 	}()
-	w, err := wal.NewSize(nil, nil, dir, 64*1024)
+	w, err := wal.NewSize(nil, nil, dir, 64*1024, false)
 	testutil.Ok(t, err)
 	testutil.Ok(t, w.Log([]byte{99}))
 	w.Close()


cmd/tsdb/main.go

@@ -506,6 +506,30 @@ func analyzeBlock(b *tsdb.Block, limit int) {
 	fmt.Printf("\nMost common label pairs:\n")
 	printInfo(postingInfos)

+	postingInfos = postingInfos[:0]
+	for _, n := range allLabelNames {
+		values, err := ir.LabelValues(n)
+		if err != nil {
+			exitWithError(err)
+		}
+		var cumulativeLength uint64
+		for i := 0; i < values.Len(); i++ {
+			value, err := values.At(i)
+			if err != nil {
+				exitWithError(err)
+			}
+			for _, str := range value {
+				cumulativeLength += uint64(len(str))
+			}
+		}
+		postingInfos = append(postingInfos, postingInfo{n, cumulativeLength})
+	}
+
+	fmt.Printf("\nLabel names with highest cumulative label value length:\n")
+	printInfo(postingInfos)
+
 	postingInfos = postingInfos[:0]
 	for _, n := range allLabelNames {
 		lv, err := ir.LabelValues(n)

db.go

@@ -51,6 +51,7 @@ var DefaultOptions = &Options{
 	BlockRanges:            ExponentialBlockRanges(int64(2*time.Hour)/1e6, 3, 5),
 	NoLockfile:             false,
 	AllowOverlappingBlocks: false,
+	WALCompression:         false,
 }

 // Options of the DB storage.

@@ -80,6 +81,9 @@ type Options struct {
 	// Overlapping blocks are allowed if AllowOverlappingBlocks is true.
 	// This in-turn enables vertical compaction and vertical query merge.
 	AllowOverlappingBlocks bool
+
+	// WALCompression will turn on Snappy compression for records on the WAL.
+	WALCompression bool
 }

 // Appender allows appending a batch of data. It must be completed with a

@@ -306,7 +310,7 @@ func Open(dir string, l log.Logger, r prometheus.Registerer, opts *Options) (db
 	if opts.WALSegmentSize > 0 {
 		segmentSize = opts.WALSegmentSize
 	}
-	wlog, err = wal.NewSize(l, r, filepath.Join(dir, "wal"), segmentSize)
+	wlog, err = wal.NewSize(l, r, filepath.Join(dir, "wal"), segmentSize, opts.WALCompression)
 	if err != nil {
 		return nil, err
 	}
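
WALCompression defaults to off and is threaded through Open into the WAL constructor. A minimal usage sketch, assuming only the Options field and the Open signature shown above (the data directory path is a placeholder):

	opts := *tsdb.DefaultOptions
	opts.WALCompression = true // compress WAL records with Snappy
	db, err := tsdb.Open("data", nil, nil, &opts)
	if err != nil {
		// handle error
	}
	defer db.Close()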


db_test.go

@@ -1404,7 +1404,7 @@ func TestInitializeHeadTimestamp(t *testing.T) {
 		}()

 		testutil.Ok(t, os.MkdirAll(path.Join(dir, "wal"), 0777))
-		w, err := wal.New(nil, nil, path.Join(dir, "wal"))
+		w, err := wal.New(nil, nil, path.Join(dir, "wal"), false)
 		testutil.Ok(t, err)

 		var enc RecordEncoder

@@ -1454,7 +1454,7 @@ func TestInitializeHeadTimestamp(t *testing.T) {
 		createBlock(t, dir, genSeries(1, 1, 1000, 6000))

 		testutil.Ok(t, os.MkdirAll(path.Join(dir, "wal"), 0777))
-		w, err := wal.New(nil, nil, path.Join(dir, "wal"))
+		w, err := wal.New(nil, nil, path.Join(dir, "wal"), false)
 		testutil.Ok(t, err)

 		var enc RecordEncoder

go.mod

@@ -4,6 +4,7 @@ require (
 	github.com/cespare/xxhash v1.1.0
 	github.com/dgryski/go-sip13 v0.0.0-20181026042036-e10d5fee7954
 	github.com/go-kit/kit v0.8.0
+	github.com/golang/snappy v0.0.1
 	github.com/oklog/ulid v1.3.1
 	github.com/pkg/errors v0.8.0
 	github.com/prometheus/client_golang v1.0.0

go.sum

@@ -30,6 +30,8 @@ github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5y
 github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU=
 github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w=
 github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
+github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4=
+github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
 github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515 h1:T+h1c/A9Gawja4Y9mFVWj2vyii2bbUNDw3kt9VxK2EY=
 github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc=
 github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU=


head_test.go

@@ -14,6 +14,7 @@
 package tsdb

 import (
+	"fmt"
 	"io/ioutil"
 	"math"
 	"math/rand"

@@ -96,6 +97,8 @@ func readTestWAL(t testing.TB, dir string) (recs []interface{}) {
 }

 func TestHead_ReadWAL(t *testing.T) {
+	for _, compress := range []bool{false, true} {
+		t.Run(fmt.Sprintf("compress=%t", compress), func(t *testing.T) {
 			entries := []interface{}{
 				[]RefSeries{
 					{Ref: 10, Labels: labels.FromStrings("a", "1")},

@@ -127,7 +130,7 @@ func TestHead_ReadWAL(t *testing.T) {
 				testutil.Ok(t, os.RemoveAll(dir))
 			}()
-			w, err := wal.New(nil, nil, dir)
+			w, err := wal.New(nil, nil, dir, compress)
 			testutil.Ok(t, err)
 			defer w.Close()
 			populateTestWAL(t, w, entries)

@@ -156,10 +159,11 @@ func TestHead_ReadWAL(t *testing.T) {
 				testutil.Ok(t, c.Err())
 				return x
 			}

 			testutil.Equals(t, []sample{{100, 2}, {101, 5}}, expandChunk(s10.iterator(0)))
 			testutil.Equals(t, []sample{{101, 6}}, expandChunk(s50.iterator(0)))
 			testutil.Equals(t, []sample{{100, 3}, {101, 7}}, expandChunk(s100.iterator(0)))
+		})
+	}
 }

 func TestHead_WALMultiRef(t *testing.T) {

@@ -169,7 +173,7 @@ func TestHead_WALMultiRef(t *testing.T) {
 		testutil.Ok(t, os.RemoveAll(dir))
 	}()
-	w, err := wal.New(nil, nil, dir)
+	w, err := wal.New(nil, nil, dir, false)
 	testutil.Ok(t, err)

 	head, err := NewHead(nil, nil, w, 1000)

@@ -193,7 +197,7 @@ func TestHead_WALMultiRef(t *testing.T) {
 	}
 	testutil.Ok(t, head.Close())

-	w, err = wal.New(nil, nil, dir)
+	w, err = wal.New(nil, nil, dir, false)
 	testutil.Ok(t, err)

 	head, err = NewHead(nil, nil, w, 1000)

@@ -319,6 +323,8 @@ func TestMemSeries_truncateChunks(t *testing.T) {
 }

 func TestHeadDeleteSeriesWithoutSamples(t *testing.T) {
+	for _, compress := range []bool{false, true} {
+		t.Run(fmt.Sprintf("compress=%t", compress), func(t *testing.T) {
 			entries := []interface{}{
 				[]RefSeries{
 					{Ref: 10, Labels: labels.FromStrings("a", "1")},

@@ -338,7 +344,7 @@ func TestHeadDeleteSeriesWithoutSamples(t *testing.T) {
 				testutil.Ok(t, os.RemoveAll(dir))
 			}()
-			w, err := wal.New(nil, nil, dir)
+			w, err := wal.New(nil, nil, dir, compress)
 			testutil.Ok(t, err)
 			defer w.Close()
 			populateTestWAL(t, w, entries)

@@ -349,6 +355,8 @@ func TestHeadDeleteSeriesWithoutSamples(t *testing.T) {
 			testutil.Ok(t, head.Init(math.MinInt64))

 			testutil.Ok(t, head.Delete(0, 100, labels.NewEqualMatcher("a", "1")))
+		})
+	}
 }

 func TestHeadDeleteSimple(t *testing.T) {

@@ -388,7 +396,9 @@ func TestHeadDeleteSimple(t *testing.T) {
 		},
 	}

-Outer:
+	for _, compress := range []bool{false, true} {
+		t.Run(fmt.Sprintf("compress=%t", compress), func(t *testing.T) {
+		Outer:
 			for _, c := range cases {
 				dir, err := ioutil.TempDir("", "test_wal_reload")
 				testutil.Ok(t, err)

@@ -396,7 +406,7 @@ Outer:
 					testutil.Ok(t, os.RemoveAll(dir))
 				}()
-				w, err := wal.New(nil, nil, path.Join(dir, "wal"))
+				w, err := wal.New(nil, nil, path.Join(dir, "wal"), compress)
 				testutil.Ok(t, err)
 				defer w.Close()

@@ -418,7 +428,7 @@ Outer:
 				}

 				// Compare the samples for both heads - before and after the reload.
-				reloadedW, err := wal.New(nil, nil, w.Dir()) // Use a new wal to ensure deleted samples are gone even after a reload.
+				reloadedW, err := wal.New(nil, nil, w.Dir(), compress) // Use a new wal to ensure deleted samples are gone even after a reload.
 				testutil.Ok(t, err)
 				defer reloadedW.Close()
 				reloadedHead, err := NewHead(nil, nil, reloadedW, 1000)

@@ -512,6 +522,8 @@ Outer:
 					}
 				}
 			}
+		})
+	}
 }

 func TestDeleteUntilCurMax(t *testing.T) {

@@ -559,7 +571,7 @@ func TestDeletedSamplesAndSeriesStillInWALAfterCheckpoint(t *testing.T) {
 	defer func() {
 		testutil.Ok(t, os.RemoveAll(dir))
 	}()
-	wlog, err := wal.NewSize(nil, nil, dir, 32768)
+	wlog, err := wal.NewSize(nil, nil, dir, 32768, false)
 	testutil.Ok(t, err)

 	// Enough samples to cause a checkpoint.

@@ -1019,13 +1031,15 @@ func TestRemoveSeriesAfterRollbackAndTruncate(t *testing.T) {
 }

 func TestHead_LogRollback(t *testing.T) {
+	for _, compress := range []bool{false, true} {
+		t.Run(fmt.Sprintf("compress=%t", compress), func(t *testing.T) {
 			dir, err := ioutil.TempDir("", "wal_rollback")
 			testutil.Ok(t, err)
 			defer func() {
 				testutil.Ok(t, os.RemoveAll(dir))
 			}()
-			w, err := wal.New(nil, nil, dir)
+			w, err := wal.New(nil, nil, dir, compress)
 			testutil.Ok(t, err)
 			defer w.Close()
 			h, err := NewHead(nil, nil, w, 1000)

@@ -1043,6 +1057,8 @@ func TestHead_LogRollback(t *testing.T) {
 			series, ok := recs[0].([]RefSeries)
 			testutil.Assert(t, ok, "expected series record but got %+v", recs[0])
 			testutil.Equals(t, []RefSeries{{Ref: 1, Labels: labels.FromStrings("a", "b")}}, series)
+		})
+	}
 }

 // TestWalRepair_DecodingError ensures that a repair is run for an error

@@ -1057,8 +1073,11 @@ func TestWalRepair_DecodingError(t *testing.T) {
 	}{
 		"invalid_record": {
 			func(rec []byte) []byte {
-				rec[0] = byte(RecordInvalid)
-				return rec
+				// Do not modify the base record because it is Logged multiple times.
+				res := make([]byte, len(rec))
+				copy(res, rec)
+				res[0] = byte(RecordInvalid)
+				return res
 			},
 			enc.Series([]RefSeries{{Ref: 1, Labels: labels.FromStrings("a", "b")}}, []byte{}),
 			9,

@@ -1089,7 +1108,8 @@ func TestWalRepair_DecodingError(t *testing.T) {
 			5,
 		},
 	} {
-		t.Run(name, func(t *testing.T) {
+		for _, compress := range []bool{false, true} {
+			t.Run(fmt.Sprintf("%s,compress=%t", name, compress), func(t *testing.T) {
 				dir, err := ioutil.TempDir("", "wal_repair")
 				testutil.Ok(t, err)
 				defer func() {

@@ -1098,7 +1118,7 @@ func TestWalRepair_DecodingError(t *testing.T) {
 				// Fill the wal and corrupt it.
 				{
-					w, err := wal.New(nil, nil, filepath.Join(dir, "wal"))
+					w, err := wal.New(nil, nil, filepath.Join(dir, "wal"), compress)
 					testutil.Ok(t, err)

 					for i := 1; i <= test.totalRecs; i++ {

@@ -1147,7 +1167,7 @@ func TestWalRepair_DecodingError(t *testing.T) {
 				}
 			})
 		}
+	}
 }

 func TestNewWalSegmentOnTruncate(t *testing.T) {

@@ -1156,7 +1176,7 @@ func TestNewWalSegmentOnTruncate(t *testing.T) {
 	defer func() {
 		testutil.Ok(t, os.RemoveAll(dir))
 	}()
-	wlog, err := wal.NewSize(nil, nil, dir, 32768)
+	wlog, err := wal.NewSize(nil, nil, dir, 32768, false)
 	testutil.Ok(t, err)

 	h, err := NewHead(nil, nil, wlog, 1000)


testutil/directory.go

@@ -16,6 +16,7 @@ package testutil

 import (
 	"io/ioutil"
 	"os"
+	"path/filepath"
 )

 const (

@@ -127,3 +128,18 @@ func NewTemporaryDirectory(name string, t T) (handler TemporaryDirectory) {
 	return
 }
+
+// DirSize returns the size in bytes of all files in a directory.
+func DirSize(path string) (int64, error) {
+	var size int64
+	err := filepath.Walk(path, func(_ string, info os.FileInfo, err error) error {
+		if err != nil {
+			return err
+		}
+		if !info.IsDir() {
+			size += info.Size()
+		}
+		return nil
+	})
+	return size, err
+}
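
DirSize is used by the new TestCompression below to compare the on-disk footprint of compressed and uncompressed WALs. A minimal usage sketch (the path is a placeholder):

	size, err := testutil.DirSize(filepath.Join(dir, "wal"))
	if err != nil {
		// handle error
	}
	fmt.Printf("WAL occupies %d bytes on disk\n", size)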

wal.go

@@ -1246,7 +1246,7 @@ func MigrateWAL(logger log.Logger, dir string) (err error) {
 	if err := os.RemoveAll(tmpdir); err != nil {
 		return errors.Wrap(err, "cleanup replacement dir")
 	}
-	repl, err := wal.New(logger, nil, tmpdir)
+	repl, err := wal.New(logger, nil, tmpdir, false)
 	if err != nil {
 		return errors.Wrap(err, "open new WAL")
 	}


wal/live_reader.go

@@ -22,6 +22,7 @@ import (

 	"github.com/go-kit/kit/log"
 	"github.com/go-kit/kit/log/level"
+	"github.com/golang/snappy"
 	"github.com/pkg/errors"
 	"github.com/prometheus/client_golang/prometheus"
 )

@@ -57,6 +58,7 @@ type LiveReader struct {
 	rdr       io.Reader
 	err       error
 	rec       []byte
+	snappyBuf []byte
 	hdr       [recordHeaderSize]byte
 	buf       [pageSize]byte
 	readIndex int // Index in buf to start at for next read.

@@ -171,11 +173,18 @@ func (r *LiveReader) buildRecord() (bool, error) {
 			return false, nil
 		}

-		rt := recType(r.hdr[0])
+		rt := recTypeFromHeader(r.hdr[0])
 		if rt == recFirst || rt == recFull {
 			r.rec = r.rec[:0]
+			r.snappyBuf = r.snappyBuf[:0]
 		}
+
+		compressed := r.hdr[0]&snappyMask != 0
+		if compressed {
+			r.snappyBuf = append(r.snappyBuf, temp...)
+		} else {
 			r.rec = append(r.rec, temp...)
+		}

 		if err := validateRecord(rt, r.index); err != nil {
 			r.index = 0

@@ -183,6 +192,16 @@ func (r *LiveReader) buildRecord() (bool, error) {
 		}
 		if rt == recLast || rt == recFull {
 			r.index = 0
+			if compressed && len(r.snappyBuf) > 0 {
+				// The snappy library uses `len` to calculate if we need a new buffer.
+				// In order to allocate as few buffers as possible make the length
+				// equal to the capacity.
+				r.rec = r.rec[:cap(r.rec)]
+				r.rec, err = snappy.Decode(r.rec, r.snappyBuf)
+				if err != nil {
+					return false, err
+				}
+			}
 			return true, nil
 		}

 		// Only increment i for non-zero records since we use it
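
Both readers rely on a detail of the snappy API: snappy.Decode reuses the destination slice only when len(dst) is large enough, ignoring spare capacity, so the record buffer is grown to its full capacity before decoding. A self-contained sketch of the pattern, using only the github.com/golang/snappy API:

	package main

	import (
		"fmt"

		"github.com/golang/snappy"
	)

	func main() {
		payload := []byte("some WAL record payload")
		compressed := snappy.Encode(nil, payload)

		// Reusable destination buffer, playing the role of r.rec above.
		var rec []byte

		// snappy.Decode checks len(dst), not cap(dst), to decide whether it
		// must allocate, so extend the slice to full capacity first.
		rec = rec[:cap(rec)]
		rec, err := snappy.Decode(rec, compressed)
		if err != nil {
			panic(err)
		}
		fmt.Printf("%s\n", rec) // prints the original payload
	}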
// Only increment i for non-zero records since we use it // Only increment i for non-zero records since we use it


wal/reader.go

@@ -19,6 +19,7 @@ import (
 	"hash/crc32"
 	"io"

+	"github.com/golang/snappy"
 	"github.com/pkg/errors"
 )

@@ -27,6 +28,7 @@ type Reader struct {
 	rdr       io.Reader
 	err       error
 	rec       []byte
+	snappyBuf []byte
 	buf       [pageSize]byte
 	total     int64   // Total bytes processed.
 	curRecTyp recType // Used for checking that the last record is not torn.

@@ -45,7 +47,7 @@ func (r *Reader) Next() bool {
 		// The last WAL segment record shouldn't be torn(should be full or last).
 		// The last record would be torn after a crash just before
 		// the last record part could be persisted to disk.
-		if recType(r.curRecTyp) == recFirst || recType(r.curRecTyp) == recMiddle {
+		if r.curRecTyp == recFirst || r.curRecTyp == recMiddle {
 			r.err = errors.New("last record is torn")
 		}
 		return false

@@ -61,6 +63,7 @@ func (r *Reader) next() (err error) {
 	buf := r.buf[recordHeaderSize:]

 	r.rec = r.rec[:0]
+	r.snappyBuf = r.snappyBuf[:0]

 	i := 0
 	for {

@@ -68,7 +71,8 @@ func (r *Reader) next() (err error) {
 			return errors.Wrap(err, "read first header byte")
 		}
 		r.total++
-		r.curRecTyp = recType(hdr[0])
+		r.curRecTyp = recTypeFromHeader(hdr[0])
+		compressed := hdr[0]&snappyMask != 0

 		// Gobble up zero bytes.
 		if r.curRecTyp == recPageTerm {

@@ -123,12 +127,25 @@ func (r *Reader) next() (err error) {
 		if c := crc32.Checksum(buf[:length], castagnoliTable); c != crc {
 			return errors.Errorf("unexpected checksum %x, expected %x", c, crc)
 		}
+
+		if compressed {
+			r.snappyBuf = append(r.snappyBuf, buf[:length]...)
+		} else {
 			r.rec = append(r.rec, buf[:length]...)
+		}

 		if err := validateRecord(r.curRecTyp, i); err != nil {
 			return err
 		}
 		if r.curRecTyp == recLast || r.curRecTyp == recFull {
+			if compressed && len(r.snappyBuf) > 0 {
+				// The snappy library uses `len` to calculate if we need a new buffer.
+				// In order to allocate as few buffers as possible make the length
+				// equal to the capacity.
+				r.rec = r.rec[:cap(r.rec)]
+				r.rec, err = snappy.Decode(r.rec, r.snappyBuf)
+				return err
+			}
 			return nil
 		}
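
Decompression stays transparent to callers: Next and Record return the original record bytes whether or not the snappy bit was set on disk. A minimal read loop, assuming the package's existing helpers (NewSegmentsReader, NewReader, Record, Err):

	sr, err := wal.NewSegmentsReader(dir)
	if err != nil {
		// handle error
	}
	defer sr.Close()

	r := wal.NewReader(sr)
	for r.Next() {
		rec := r.Record() // already decompressed if the snappy flag was set
		_ = rec           // decode the record as usual
	}
	if r.Err() != nil {
		// handle read error
	}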


wal/reader_test.go

@@ -310,14 +310,15 @@ func allSegments(dir string) (io.ReadCloser, error) {

 func TestReaderFuzz(t *testing.T) {
 	for name, fn := range readerConstructors {
-		t.Run(name, func(t *testing.T) {
+		for _, compress := range []bool{false, true} {
+			t.Run(fmt.Sprintf("%s,compress=%t", name, compress), func(t *testing.T) {
 				dir, err := ioutil.TempDir("", "wal_fuzz_live")
 				testutil.Ok(t, err)
 				defer func() {
 					testutil.Ok(t, os.RemoveAll(dir))
 				}()

-				w, err := NewSize(nil, nil, dir, 128*pageSize)
+				w, err := NewSize(nil, nil, dir, 128*pageSize, compress)
 				testutil.Ok(t, err)

 				// Buffering required as we're not reading concurrently.

@@ -341,17 +342,20 @@ func TestReaderFuzz(t *testing.T) {
 				testutil.Assert(t, !reader.Next(), "unexpected record")
 			})
 		}
+	}
 }

 func TestReaderFuzz_Live(t *testing.T) {
 	logger := testutil.NewLogger(t)
+	for _, compress := range []bool{false, true} {
+		t.Run(fmt.Sprintf("compress=%t", compress), func(t *testing.T) {
 			dir, err := ioutil.TempDir("", "wal_fuzz_live")
 			testutil.Ok(t, err)
 			defer func() {
 				testutil.Ok(t, os.RemoveAll(dir))
 			}()

-			w, err := NewSize(nil, nil, dir, 128*pageSize)
+			w, err := NewSize(nil, nil, dir, 128*pageSize, compress)
 			testutil.Ok(t, err)
 			defer w.Close()

@@ -389,7 +393,7 @@ func TestReaderFuzz_Live(t *testing.T) {
 				return true
 			}

-outer:
+		outer:
 			for {
 				select {
 				case <-segmentTicker.C:

@@ -422,6 +426,8 @@ outer:
 			}

 			testutil.Assert(t, r.Err() == io.EOF, "expected EOF")
+		})
+	}
 }

 func TestLiveReaderCorrupt_ShortFile(t *testing.T) {

@@ -434,7 +440,7 @@ func TestLiveReaderCorrupt_ShortFile(t *testing.T) {
 		testutil.Ok(t, os.RemoveAll(dir))
 	}()

-	w, err := NewSize(nil, nil, dir, pageSize)
+	w, err := NewSize(nil, nil, dir, pageSize, false)
 	testutil.Ok(t, err)

 	rec := make([]byte, pageSize-recordHeaderSize)

@@ -478,7 +484,7 @@ func TestLiveReaderCorrupt_RecordTooLongAndShort(t *testing.T) {
 		testutil.Ok(t, os.RemoveAll(dir))
 	}()

-	w, err := NewSize(nil, nil, dir, pageSize*2)
+	w, err := NewSize(nil, nil, dir, pageSize*2, false)
 	testutil.Ok(t, err)

 	rec := make([]byte, pageSize-recordHeaderSize)

@@ -525,7 +531,7 @@ func TestReaderData(t *testing.T) {
 	for name, fn := range readerConstructors {
 		t.Run(name, func(t *testing.T) {
-			w, err := New(nil, nil, dir)
+			w, err := New(nil, nil, dir, true)
 			testutil.Ok(t, err)

 			sr, err := allSegments(dir)


wal/wal.go

@@ -29,6 +29,7 @@ import (

 	"github.com/go-kit/kit/log"
 	"github.com/go-kit/kit/log/level"
+	"github.com/golang/snappy"
 	"github.com/pkg/errors"
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/prometheus/tsdb/fileutil"

@@ -165,6 +166,8 @@ type WAL struct {
 	stopc     chan chan struct{}
 	actorc    chan func()
 	closed    bool // To allow calling Close() more than once without blocking.
+	compress  bool
+	snappyBuf []byte

 	fsyncDuration prometheus.Summary
 	pageFlushes   prometheus.Counter

@@ -175,13 +178,13 @@ type WAL struct {
 }

 // New returns a new WAL over the given directory.
-func New(logger log.Logger, reg prometheus.Registerer, dir string) (*WAL, error) {
-	return NewSize(logger, reg, dir, DefaultSegmentSize)
+func New(logger log.Logger, reg prometheus.Registerer, dir string, compress bool) (*WAL, error) {
+	return NewSize(logger, reg, dir, DefaultSegmentSize, compress)
 }

 // NewSize returns a new WAL over the given directory.
 // New segments are created with the specified size.
-func NewSize(logger log.Logger, reg prometheus.Registerer, dir string, segmentSize int) (*WAL, error) {
+func NewSize(logger log.Logger, reg prometheus.Registerer, dir string, segmentSize int, compress bool) (*WAL, error) {
 	if segmentSize%pageSize != 0 {
 		return nil, errors.New("invalid segment size")
 	}

@@ -198,6 +201,7 @@ func NewSize(logger log.Logger, reg prometheus.Registerer, dir string, segmentSi
 		page:     &page{},
 		actorc:   make(chan func(), 100),
 		stopc:    make(chan chan struct{}),
+		compress: compress,
 	}
 	w.fsyncDuration = prometheus.NewSummary(prometheus.SummaryOpts{
 		Name: "prometheus_tsdb_wal_fsync_duration_seconds",

@@ -253,6 +257,11 @@ func NewSize(logger log.Logger, reg prometheus.Registerer, dir string, segmentSi
 	return w, nil
 }

+// CompressionEnabled returns if compression is enabled on this WAL.
+func (w *WAL) CompressionEnabled() bool {
+	return w.compress
+}
+
 // Dir returns the directory of the WAL.
 func (w *WAL) Dir() string {
 	return w.dir

@@ -476,6 +485,14 @@ func (w *WAL) flushPage(clear bool) error {
 	return nil
 }

+// First Byte of header format:
+// [ 4 bits unallocated] [1 bit snappy compression flag] [ 3 bit record type ]
+const (
+	snappyMask  = 1 << 3
+	recTypeMask = snappyMask - 1
+)
+
 type recType uint8

 const (

@@ -486,6 +503,10 @@ const (
 	recLast   recType = 4 // Final fragment of a record.
 )

+func recTypeFromHeader(header byte) recType {
+	return recType(header & recTypeMask)
+}
+
 func (t recType) String() string {
 	switch t {
 	case recPageTerm:

@@ -546,6 +567,19 @@ func (w *WAL) log(rec []byte, final bool) error {
 		}
 	}

+	compressed := false
+	if w.compress && len(rec) > 0 {
+		// The snappy library uses `len` to calculate if we need a new buffer.
+		// In order to allocate as few buffers as possible make the length
+		// equal to the capacity.
+		w.snappyBuf = w.snappyBuf[:cap(w.snappyBuf)]
+		w.snappyBuf = snappy.Encode(w.snappyBuf, rec)
+		if len(w.snappyBuf) < len(rec) {
+			rec = w.snappyBuf
+			compressed = true
+		}
+	}
+
 	// Populate as many pages as necessary to fit the record.
 	// Be careful to always do one pass to ensure we write zero-length records.
 	for i := 0; i == 0 || len(rec) > 0; i++ {

@@ -569,6 +603,9 @@ func (w *WAL) log(rec []byte, final bool) error {
 		default:
 			typ = recMiddle
 		}
+		if compressed {
+			typ |= snappyMask
+		}

 		buf[0] = byte(typ)
 		crc := crc32.Checksum(part, castagnoliTable)
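
The compression flag lives in the previously unallocated bit 3 of each fragment's first header byte, so compressed and uncompressed records can coexist in one segment and existing WALs stay readable. Note also that a record is only flagged when Snappy actually shrinks it; incompressible records are written unchanged. A small illustration of the bit layout, using only the constants and helper defined above:

	hdr := byte(recFull) | snappyMask // header byte of a compressed, single-fragment record

	compressed := hdr&snappyMask != 0 // true: bit 3 carries the compression flag
	typ := recTypeFromHeader(hdr)     // recFull: the low 3 type bits are untouched

Because the flag is per fragment, a reader must inspect it on every header byte rather than relying on a per-segment setting.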


wal/wal_test.go

@@ -120,7 +120,7 @@ func TestWALRepair_ReadingError(t *testing.T) {
 			// then corrupt a given record in a given segment.
 			// As a result we want a repaired WAL with given intact records.
 			segSize := 3 * pageSize
-			w, err := NewSize(nil, nil, dir, segSize)
+			w, err := NewSize(nil, nil, dir, segSize, false)
 			testutil.Ok(t, err)

 			var records [][]byte

@@ -145,7 +145,7 @@ func TestWALRepair_ReadingError(t *testing.T) {
 			testutil.Ok(t, f.Close())

-			w, err = NewSize(nil, nil, dir, segSize)
+			w, err = NewSize(nil, nil, dir, segSize, false)
 			testutil.Ok(t, err)
 			defer w.Close()

@@ -223,7 +223,7 @@ func TestCorruptAndCarryOn(t *testing.T) {
 	// Produce a WAL with two segments of 3 pages with 3 records each,
 	// so when we truncate the file we're guaranteed to split a record.
 	{
-		w, err := NewSize(logger, nil, dir, segmentSize)
+		w, err := NewSize(logger, nil, dir, segmentSize, false)
 		testutil.Ok(t, err)

 		for i := 0; i < 18; i++ {

@@ -294,7 +294,7 @@ func TestCorruptAndCarryOn(t *testing.T) {
 	err = sr.Close()
 	testutil.Ok(t, err)

-	w, err := NewSize(logger, nil, dir, segmentSize)
+	w, err := NewSize(logger, nil, dir, segmentSize, false)
 	testutil.Ok(t, err)

 	err = w.Repair(corruptionErr)

@@ -341,7 +341,7 @@ func TestClose(t *testing.T) {
 	defer func() {
 		testutil.Ok(t, os.RemoveAll(dir))
 	}()
-	w, err := NewSize(nil, nil, dir, pageSize)
+	w, err := NewSize(nil, nil, dir, pageSize, false)
 	testutil.Ok(t, err)
 	testutil.Ok(t, w.Close())
 	testutil.NotOk(t, w.Close())

@@ -358,7 +358,7 @@ func TestSegmentMetric(t *testing.T) {
 	defer func() {
 		testutil.Ok(t, os.RemoveAll(dir))
 	}()
-	w, err := NewSize(nil, nil, dir, segmentSize)
+	w, err := NewSize(nil, nil, dir, segmentSize, false)
 	testutil.Ok(t, err)

 	initialSegment := client_testutil.ToFloat64(w.currentSegment)

@@ -376,14 +376,56 @@ func TestSegmentMetric(t *testing.T) {
 	testutil.Ok(t, w.Close())
 }

+func TestCompression(t *testing.T) {
+	bootstrap := func(compressed bool) string {
+		const (
+			segmentSize = pageSize
+			recordSize  = (pageSize / 2) - recordHeaderSize
+			records     = 100
+		)
+
+		dirPath, err := ioutil.TempDir("", fmt.Sprintf("TestCompression_%t", compressed))
+		testutil.Ok(t, err)
+
+		w, err := NewSize(nil, nil, dirPath, segmentSize, compressed)
+		testutil.Ok(t, err)
+
+		buf := make([]byte, recordSize)
+		for i := 0; i < records; i++ {
+			testutil.Ok(t, w.Log(buf))
+		}
+		testutil.Ok(t, w.Close())
+
+		return dirPath
+	}
+
+	dirCompressed := bootstrap(true)
+	defer func() {
+		testutil.Ok(t, os.RemoveAll(dirCompressed))
+	}()
+	dirUnCompressed := bootstrap(false)
+	defer func() {
+		testutil.Ok(t, os.RemoveAll(dirUnCompressed))
+	}()
+
+	uncompressedSize, err := testutil.DirSize(dirUnCompressed)
+	testutil.Ok(t, err)
+	compressedSize, err := testutil.DirSize(dirCompressed)
+	testutil.Ok(t, err)
+
+	testutil.Assert(t, float64(uncompressedSize)*0.75 > float64(compressedSize), "Compressing zeroes should save at least 25%% space - uncompressedSize: %d, compressedSize: %d", uncompressedSize, compressedSize)
+}
+
 func BenchmarkWAL_LogBatched(b *testing.B) {
+	for _, compress := range []bool{true, false} {
+		b.Run(fmt.Sprintf("compress=%t", compress), func(b *testing.B) {
 			dir, err := ioutil.TempDir("", "bench_logbatch")
 			testutil.Ok(b, err)
 			defer func() {
 				testutil.Ok(b, os.RemoveAll(dir))
 			}()

-			w, err := New(nil, nil, "testdir")
+			w, err := New(nil, nil, dir, compress)
 			testutil.Ok(b, err)
 			defer w.Close()

@@ -404,16 +446,20 @@ func BenchmarkWAL_LogBatched(b *testing.B) {
 			// If it's counted batched vs. single benchmarks are very similar but
 			// do not show burst throughput well.
 			b.StopTimer()
+		})
+	}
 }

 func BenchmarkWAL_Log(b *testing.B) {
+	for _, compress := range []bool{true, false} {
+		b.Run(fmt.Sprintf("compress=%t", compress), func(b *testing.B) {
 			dir, err := ioutil.TempDir("", "bench_logsingle")
 			testutil.Ok(b, err)
 			defer func() {
 				testutil.Ok(b, os.RemoveAll(dir))
 			}()

-			w, err := New(nil, nil, "testdir")
+			w, err := New(nil, nil, dir, compress)
 			testutil.Ok(b, err)
 			defer w.Close()

@@ -428,4 +474,6 @@ func BenchmarkWAL_Log(b *testing.B) {
 			// If it's counted batched vs. single benchmarks are very similar but
 			// do not show burst throughput well.
 			b.StopTimer()
+		})
+	}
 }
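
Both benchmarks now report batched and single-record throughput for each mode as sub-benchmarks. For example, they can be compared from the repository root with standard go test flags (nothing project-specific assumed):

	go test ./wal -run '^$' -bench 'BenchmarkWAL_(Log|LogBatched)'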


wal_test.go

@@ -459,7 +459,7 @@ func TestMigrateWAL_Empty(t *testing.T) {
 	wdir := path.Join(dir, "wal")

 	// Initialize empty WAL.
-	w, err := wal.New(nil, nil, wdir)
+	w, err := wal.New(nil, nil, wdir, false)
 	testutil.Ok(t, err)
 	testutil.Ok(t, w.Close())

@@ -506,7 +506,7 @@ func TestMigrateWAL_Fuzz(t *testing.T) {
 	// Perform migration.
 	testutil.Ok(t, MigrateWAL(nil, wdir))

-	w, err := wal.New(nil, nil, wdir)
+	w, err := wal.New(nil, nil, wdir, false)
 	testutil.Ok(t, err)

 	// We can properly write some new data after migration.