Merge pull request #534 from codesome/optional-vertical-blocks

Make vertical compaction and query merge optional
This commit is contained in:
Ganesh Vernekar 2019-02-26 13:50:05 -08:00 committed by GitHub
commit 158c3074cd
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 38 additions and 7 deletions

View file

@ -51,8 +51,8 @@ func ExponentialBlockRanges(minSize int64, steps, stepSize int) []int64 {
// Compactor provides compaction against an underlying storage
// of time series data.
type Compactor interface {
// Plan returns a set of non-overlapping directories that can
// be compacted concurrently.
// Plan returns a set of directories that can be compacted concurrently.
// The directories can be overlapping.
// Results returned when compactions are in progress are undefined.
Plan(dir string) ([]string, error)

29
db.go
View file

@ -49,6 +49,7 @@ var DefaultOptions = &Options{
RetentionDuration: 15 * 24 * 60 * 60 * 1000, // 15 days in milliseconds
BlockRanges: ExponentialBlockRanges(int64(2*time.Hour)/1e6, 3, 5),
NoLockfile: false,
AllowOverlappingBlock: false,
}
// Options of the DB storage.
@ -71,6 +72,10 @@ type Options struct {
// NoLockfile disables creation and consideration of a lock file.
NoLockfile bool
// Overlapping blocks are allowed iff AllowOverlappingBlock is true.
// This in-turn enables vertical compaction and vertical query merge.
AllowOverlappingBlock bool
}
// Appender allows appending a batch of data. It must be completed with a
@ -548,6 +553,11 @@ func (db *DB) reload() (err error) {
sort.Slice(loadable, func(i, j int) bool {
return loadable[i].Meta().MinTime < loadable[j].Meta().MinTime
})
if !db.opts.AllowOverlappingBlock {
if err := validateBlockSequence(loadable); err != nil {
return errors.Wrap(err, "invalid block sequence")
}
}
// Swap new blocks first for subsequently created readers to be seen.
db.mtx.Lock()
@ -699,6 +709,25 @@ func (db *DB) deleteBlocks(blocks map[ulid.ULID]*Block) error {
return nil
}
// validateBlockSequence returns error if given block meta files indicate that some blocks overlaps within sequence.
func validateBlockSequence(bs []*Block) error {
if len(bs) <= 1 {
return nil
}
var metas []BlockMeta
for _, b := range bs {
metas = append(metas, b.meta)
}
overlaps := OverlappingBlocks(metas)
if len(overlaps) > 0 {
return errors.Errorf("block time ranges overlap: %s", overlaps)
}
return nil
}
// TimeRange specifies minTime and maxTime range.
type TimeRange struct {
Min, Max int64

View file

@ -1932,7 +1932,9 @@ func TestVerticalCompaction(t *testing.T) {
for _, series := range c.blockSeries {
createBlock(t, tmpdir, series)
}
db, err := Open(tmpdir, nil, nil, nil)
opts := *DefaultOptions
opts.AllowOverlappingBlock = true
db, err := Open(tmpdir, nil, nil, &opts)
testutil.Ok(t, err)
defer func() {
testutil.Ok(t, db.Close())