Ensure correct block order on reload

Due to a regression blocks were no longer ordered by time before
being stored in memory. This made data intermittently become unavailable
for queries.

Signed-off-by: Fabian Reinartz <freinartz@google.com>
This commit is contained in:
Fabian Reinartz 2018-05-28 16:00:36 -04:00
parent 2aae939d6c
commit 76c1b2cdb6
3 changed files with 34 additions and 3 deletions

View file

@ -43,7 +43,7 @@ func TestSetCompactionFailed(t *testing.T) {
testutil.Ok(t, err) testutil.Ok(t, err)
defer os.RemoveAll(tmpdir) defer os.RemoveAll(tmpdir)
b := createEmptyBlock(t, tmpdir) b := createEmptyBlock(t, tmpdir, &BlockMeta{Version: 2})
testutil.Equals(t, false, b.meta.Compaction.Failed) testutil.Equals(t, false, b.meta.Compaction.Failed)
testutil.Ok(t, b.setCompactionFailed()) testutil.Ok(t, b.setCompactionFailed())
@ -55,10 +55,11 @@ func TestSetCompactionFailed(t *testing.T) {
testutil.Equals(t, true, b.meta.Compaction.Failed) testutil.Equals(t, true, b.meta.Compaction.Failed)
} }
func createEmptyBlock(t *testing.T, dir string) *Block { // createEmpty block creates a block with the given meta but without any data.
func createEmptyBlock(t *testing.T, dir string, meta *BlockMeta) *Block {
testutil.Ok(t, os.MkdirAll(dir, 0777)) testutil.Ok(t, os.MkdirAll(dir, 0777))
testutil.Ok(t, writeMetaFile(dir, &BlockMeta{Version: 2})) testutil.Ok(t, writeMetaFile(dir, meta))
ir, err := index.NewWriter(filepath.Join(dir, indexFilename)) ir, err := index.NewWriter(filepath.Join(dir, indexFilename))
testutil.Ok(t, err) testutil.Ok(t, err)

3
db.go
View file

@ -522,6 +522,9 @@ func (db *DB) reload(deleteable ...string) (err error) {
blocks = append(blocks, b) blocks = append(blocks, b)
exist[meta.ULID] = struct{}{} exist[meta.ULID] = struct{}{}
} }
sort.Slice(blocks, func(i, j int) bool {
return blocks[i].Meta().MinTime < blocks[j].Meta().MinTime
})
if err := validateBlockSequence(blocks); err != nil { if err := validateBlockSequence(blocks); err != nil {
return errors.Wrap(err, "invalid block sequence") return errors.Wrap(err, "invalid block sequence")

View file

@ -18,9 +18,11 @@ import (
"math" "math"
"math/rand" "math/rand"
"os" "os"
"path/filepath"
"sort" "sort"
"testing" "testing"
"github.com/oklog/ulid"
"github.com/pkg/errors" "github.com/pkg/errors"
"github.com/prometheus/tsdb/labels" "github.com/prometheus/tsdb/labels"
"github.com/prometheus/tsdb/testutil" "github.com/prometheus/tsdb/testutil"
@ -63,6 +65,31 @@ func query(t testing.TB, q Querier, matchers ...labels.Matcher) map[string][]sam
return result return result
} }
// Ensure that blocks are held in memory in their time order
// and not in ULID order as they are read from the directory.
func TestDB_reloadOrder(t *testing.T) {
db, close := openTestDB(t, nil)
defer close()
defer db.Close()
metas := []*BlockMeta{
{ULID: ulid.MustNew(100, nil), MinTime: 90, MaxTime: 100},
{ULID: ulid.MustNew(200, nil), MinTime: 70, MaxTime: 80},
{ULID: ulid.MustNew(300, nil), MinTime: 100, MaxTime: 110},
}
for _, m := range metas {
bdir := filepath.Join(db.Dir(), m.ULID.String())
createEmptyBlock(t, bdir, m)
}
testutil.Ok(t, db.reload())
blocks := db.Blocks()
testutil.Equals(t, 3, len(blocks))
testutil.Equals(t, *metas[1], blocks[0].Meta())
testutil.Equals(t, *metas[0], blocks[1].Meta())
testutil.Equals(t, *metas[2], blocks[2].Meta())
}
func TestDataAvailableOnlyAfterCommit(t *testing.T) { func TestDataAvailableOnlyAfterCommit(t *testing.T) {
db, close := openTestDB(t, nil) db, close := openTestDB(t, nil)
defer close() defer close()