Added ability to create db with NopWal (#519)

Signed-off-by: Vishnunarayan K I <appukuttancr@gmail.com>
This commit is contained in:
Vishnunarayan K I 2019-03-26 05:08:12 +05:30 committed by Krasi Georgiev
parent db9177de0c
commit 7757fe6f21
3 changed files with 65 additions and 31 deletions

View file

@ -1,6 +1,7 @@
## master / unreleased ## master / unreleased
- [REMOVED] `chunks.NewReader` is removed as it wasn't used anywhere. - [REMOVED] `chunks.NewReader` is removed as it wasn't used anywhere.
- [REMOVED] `FromData` is considered unused so was removed. - [REMOVED] `FromData` is considered unused so was removed.
- [FEATURE] Added option WALSegmentSize -1 to disable the WAL.
## 0.6.1 ## 0.6.1
- [BUGFIX] Update `last` after appending a non-overlapping chunk in `chunks.MergeOverlappingChunks`. [#539](https://github.com/prometheus/tsdb/pull/539) - [BUGFIX] Update `last` after appending a non-overlapping chunk in `chunks.MergeOverlappingChunks`. [#539](https://github.com/prometheus/tsdb/pull/539)
@ -15,7 +16,7 @@
- Added `MergeOverlappingChunks` function in `chunks/chunks.go` to merge multiple time-overlapping Chunk Metas. - Added `MergeOverlappingChunks` function in `chunks/chunks.go` to merge multiple time-overlapping Chunk Metas.
- Added `MinTime` and `MaxTime` method for `BlockReader`. - Added `MinTime` and `MaxTime` method for `BlockReader`.
- [FEATURE] New `dump` command to tsdb tool to dump all samples. - [FEATURE] New `dump` command to tsdb tool to dump all samples.
- [FEATURE] New `encoding` package for common binary encoding/decoding helpers. - [FEATURE] New `encoding` package for common binary encoding/decoding helpers.
- Added to remove some code duplication. - Added to remove some code duplication.
- [ENHANCEMENT] When closing the db any running compaction will be cancelled so it doesn't block. - [ENHANCEMENT] When closing the db any running compaction will be cancelled so it doesn't block.
- `NewLeveledCompactor` takes a context. - `NewLeveledCompactor` takes a context.
@ -41,7 +42,7 @@
- [CHANGE] `LastCheckpoint()` used to return just the segment name and now it returns the full relative path. - [CHANGE] `LastCheckpoint()` used to return just the segment name and now it returns the full relative path.
- [CHANGE] `NewSegmentsRangeReader()` can now read over miltiple wal ranges by using the new `SegmentRange{}` struct. - [CHANGE] `NewSegmentsRangeReader()` can now read over miltiple wal ranges by using the new `SegmentRange{}` struct.
- [CHANGE] `CorruptionErr{}` now also exposes the Segment `Dir` which is added when displaying any errors. - [CHANGE] `CorruptionErr{}` now also exposes the Segment `Dir` which is added when displaying any errors.
- [CHANGE] `Head.Init()` is changed to `Head.Init(minValidTime int64)` - [CHANGE] `Head.Init()` is changed to `Head.Init(minValidTime int64)`
- [CHANGE] `SymbolTable()` renamed to `SymbolTableSize()` to make the name consistent with the `Block{ symbolTableSize uint64 }` field. - [CHANGE] `SymbolTable()` renamed to `SymbolTableSize()` to make the name consistent with the `Block{ symbolTableSize uint64 }` field.
- [CHANGE] `wal.Reader{}` now exposes `Segment()` for the current segment being read and `Offset()` for the current offset. - [CHANGE] `wal.Reader{}` now exposes `Segment()` for the current segment being read and `Offset()` for the current offset.
- [FEATURE] tsdbutil analyze subcomand to find churn, high cardinality, etc. - [FEATURE] tsdbutil analyze subcomand to find churn, high cardinality, etc.

23
db.go
View file

@ -54,7 +54,10 @@ var DefaultOptions = &Options{
// Options of the DB storage. // Options of the DB storage.
type Options struct { type Options struct {
// Segments (wal files) max size // Segments (wal files) max size.
// WALSegmentSize = 0, segment size is default size.
// WALSegmentSize > 0, segment size is WALSegmentSize.
// WALSegmentSize < 0, wal is disabled.
WALSegmentSize int WALSegmentSize int
// Duration of persisted data to keep. // Duration of persisted data to keep.
@ -288,14 +291,20 @@ func Open(dir string, l log.Logger, r prometheus.Registerer, opts *Options) (db
} }
db.compactCancel = cancel db.compactCancel = cancel
var wlog *wal.WAL
segmentSize := wal.DefaultSegmentSize segmentSize := wal.DefaultSegmentSize
if opts.WALSegmentSize > 0 { // Wal is enabled.
segmentSize = opts.WALSegmentSize if opts.WALSegmentSize >= 0 {
} // Wal is set to a custom size.
wlog, err := wal.NewSize(l, r, filepath.Join(dir, "wal"), segmentSize) if opts.WALSegmentSize > 0 {
if err != nil { segmentSize = opts.WALSegmentSize
return nil, err }
wlog, err = wal.NewSize(l, r, filepath.Join(dir, "wal"), segmentSize)
if err != nil {
return nil, err
}
} }
db.head, err = NewHead(r, l, wlog, opts.BlockRanges[0]) db.head, err = NewHead(r, l, wlog, opts.BlockRanges[0])
if err != nil { if err != nil {
return nil, err return nil, err

View file

@ -746,29 +746,53 @@ func TestWALFlushedOnDBClose(t *testing.T) {
testutil.Equals(t, []string{"labelvalue"}, values) testutil.Equals(t, []string{"labelvalue"}, values)
} }
func TestWALSegmentSizeOption(t *testing.T) { func TestWALSegmentSizeOptions(t *testing.T) {
options := *DefaultOptions tests := map[int]func(dbdir string, segmentSize int){
options.WALSegmentSize = 2 * 32 * 1024 // Default Wal Size.
db, delete := openTestDB(t, &options) 0: func(dbDir string, segmentSize int) {
defer delete() files, err := ioutil.ReadDir(filepath.Join(dbDir, "wal"))
app := db.Appender() testutil.Ok(t, err)
for i := int64(0); i < 155; i++ { for _, f := range files[:len(files)-1] {
_, err := app.Add(labels.Labels{labels.Label{Name: "wal", Value: "size"}}, i, rand.Float64()) testutil.Equals(t, int64(DefaultOptions.WALSegmentSize), f.Size(), "WAL file size doesn't match WALSegmentSize option, filename: %v", f.Name())
testutil.Ok(t, err) }
testutil.Ok(t, app.Commit()) lastFile := files[len(files)-1]
testutil.Assert(t, int64(DefaultOptions.WALSegmentSize) > lastFile.Size(), "last WAL file size is not smaller than the WALSegmentSize option, filename: %v", lastFile.Name())
},
// Custom Wal Size.
2 * 32 * 1024: func(dbDir string, segmentSize int) {
files, err := ioutil.ReadDir(filepath.Join(dbDir, "wal"))
testutil.Assert(t, len(files) > 1, "current WALSegmentSize should result in more than a single WAL file.")
testutil.Ok(t, err)
for _, f := range files[:len(files)-1] {
testutil.Equals(t, int64(segmentSize), f.Size(), "WAL file size doesn't match WALSegmentSize option, filename: %v", f.Name())
}
lastFile := files[len(files)-1]
testutil.Assert(t, int64(segmentSize) > lastFile.Size(), "last WAL file size is not smaller than the WALSegmentSize option, filename: %v", lastFile.Name())
},
// Wal disabled.
-1: func(dbDir string, segmentSize int) {
if _, err := os.Stat(filepath.Join(dbDir, "wal")); !os.IsNotExist(err) {
t.Fatal("wal directory is present when the wal is disabled")
}
},
} }
for segmentSize, testFunc := range tests {
t.Run(fmt.Sprintf("WALSegmentSize %d test", segmentSize), func(t *testing.T) {
options := *DefaultOptions
options.WALSegmentSize = segmentSize
db, delete := openTestDB(t, &options)
defer delete()
app := db.Appender()
for i := int64(0); i < 155; i++ {
_, err := app.Add(labels.Labels{labels.Label{Name: "wal", Value: "size"}}, i, rand.Float64())
testutil.Ok(t, err)
testutil.Ok(t, app.Commit())
}
dbDir := db.Dir() dbDir := db.Dir()
db.Close() db.Close()
files, err := ioutil.ReadDir(filepath.Join(dbDir, "wal")) testFunc(dbDir, options.WALSegmentSize)
testutil.Assert(t, len(files) > 1, "current WALSegmentSize should result in more than a single WAL file.") })
testutil.Ok(t, err)
for i, f := range files {
if len(files)-1 != i {
testutil.Equals(t, int64(options.WALSegmentSize), f.Size(), "WAL file size doesn't match WALSegmentSize option, filename: %v", f.Name())
continue
}
testutil.Assert(t, int64(options.WALSegmentSize) > f.Size(), "last WAL file size is not smaller than the WALSegmentSize option, filename: %v", f.Name())
} }
} }
@ -1500,7 +1524,7 @@ func TestNoEmptyBlocks(t *testing.T) {
testutil.Assert(t, len(actBlocks) == 1, "No blocks created when compacting with >0 samples") testutil.Assert(t, len(actBlocks) == 1, "No blocks created when compacting with >0 samples")
}) })
t.Run(`When no new block is created from head, and there are some blocks on disk t.Run(`When no new block is created from head, and there are some blocks on disk
compaction should not run into infinite loop (was seen during development).`, func(t *testing.T) { compaction should not run into infinite loop (was seen during development).`, func(t *testing.T) {
oldBlocks := db.Blocks() oldBlocks := db.Blocks()
app := db.Appender() app := db.Appender()