Merge pull request #335 from prometheus/blockorder

Ensure correct block order on reload
This commit is contained in:
Fabian Reinartz 2018-05-28 16:41:47 -04:00 committed by GitHub
commit 3cf059159e
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 34 additions and 3 deletions

View file

@ -43,7 +43,7 @@ func TestSetCompactionFailed(t *testing.T) {
testutil.Ok(t, err)
defer os.RemoveAll(tmpdir)
b := createEmptyBlock(t, tmpdir)
b := createEmptyBlock(t, tmpdir, &BlockMeta{Version: 2})
testutil.Equals(t, false, b.meta.Compaction.Failed)
testutil.Ok(t, b.setCompactionFailed())
@ -55,10 +55,11 @@ func TestSetCompactionFailed(t *testing.T) {
testutil.Equals(t, true, b.meta.Compaction.Failed)
}
func createEmptyBlock(t *testing.T, dir string) *Block {
// createEmpty block creates a block with the given meta but without any data.
func createEmptyBlock(t *testing.T, dir string, meta *BlockMeta) *Block {
testutil.Ok(t, os.MkdirAll(dir, 0777))
testutil.Ok(t, writeMetaFile(dir, &BlockMeta{Version: 2}))
testutil.Ok(t, writeMetaFile(dir, meta))
ir, err := index.NewWriter(filepath.Join(dir, indexFilename))
testutil.Ok(t, err)

3
db.go
View file

@ -522,6 +522,9 @@ func (db *DB) reload(deleteable ...string) (err error) {
blocks = append(blocks, b)
exist[meta.ULID] = struct{}{}
}
sort.Slice(blocks, func(i, j int) bool {
return blocks[i].Meta().MinTime < blocks[j].Meta().MinTime
})
if err := validateBlockSequence(blocks); err != nil {
return errors.Wrap(err, "invalid block sequence")

View file

@ -18,9 +18,11 @@ import (
"math"
"math/rand"
"os"
"path/filepath"
"sort"
"testing"
"github.com/oklog/ulid"
"github.com/pkg/errors"
"github.com/prometheus/tsdb/labels"
"github.com/prometheus/tsdb/testutil"
@ -63,6 +65,31 @@ func query(t testing.TB, q Querier, matchers ...labels.Matcher) map[string][]sam
return result
}
// Ensure that blocks are held in memory in their time order
// and not in ULID order as they are read from the directory.
func TestDB_reloadOrder(t *testing.T) {
db, close := openTestDB(t, nil)
defer close()
defer db.Close()
metas := []*BlockMeta{
{ULID: ulid.MustNew(100, nil), MinTime: 90, MaxTime: 100},
{ULID: ulid.MustNew(200, nil), MinTime: 70, MaxTime: 80},
{ULID: ulid.MustNew(300, nil), MinTime: 100, MaxTime: 110},
}
for _, m := range metas {
bdir := filepath.Join(db.Dir(), m.ULID.String())
createEmptyBlock(t, bdir, m)
}
testutil.Ok(t, db.reload())
blocks := db.Blocks()
testutil.Equals(t, 3, len(blocks))
testutil.Equals(t, *metas[1], blocks[0].Meta())
testutil.Equals(t, *metas[0], blocks[1].Meta())
testutil.Equals(t, *metas[2], blocks[2].Meta())
}
func TestDataAvailableOnlyAfterCommit(t *testing.T) {
db, close := openTestDB(t, nil)
defer close()