mirror of
https://github.com/prometheus/prometheus.git
synced 2024-12-26 14:09:41 -08:00
Merge pull request #512 from krasi-georgiev/delete-compact-block-on-reload-error
Delete compact block on reload error
This commit is contained in:
commit
6fce018def
20
block.go
20
block.go
|
@ -16,6 +16,7 @@ package tsdb
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
|
"io"
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"os"
|
"os"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
|
@ -269,10 +270,19 @@ type Block struct {
|
||||||
|
|
||||||
// OpenBlock opens the block in the directory. It can be passed a chunk pool, which is used
|
// OpenBlock opens the block in the directory. It can be passed a chunk pool, which is used
|
||||||
// to instantiate chunk structs.
|
// to instantiate chunk structs.
|
||||||
func OpenBlock(logger log.Logger, dir string, pool chunkenc.Pool) (*Block, error) {
|
func OpenBlock(logger log.Logger, dir string, pool chunkenc.Pool) (pb *Block, err error) {
|
||||||
if logger == nil {
|
if logger == nil {
|
||||||
logger = log.NewNopLogger()
|
logger = log.NewNopLogger()
|
||||||
}
|
}
|
||||||
|
var closers []io.Closer
|
||||||
|
defer func() {
|
||||||
|
if err != nil {
|
||||||
|
var merr MultiError
|
||||||
|
merr.Add(err)
|
||||||
|
merr.Add(closeAll(closers))
|
||||||
|
err = merr.Err()
|
||||||
|
}
|
||||||
|
}()
|
||||||
meta, err := readMetaFile(dir)
|
meta, err := readMetaFile(dir)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
@ -282,15 +292,19 @@ func OpenBlock(logger log.Logger, dir string, pool chunkenc.Pool) (*Block, error
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
ir, err := index.NewFileReader(filepath.Join(dir, "index"))
|
closers = append(closers, cr)
|
||||||
|
|
||||||
|
ir, err := index.NewFileReader(filepath.Join(dir, indexFilename))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
closers = append(closers, ir)
|
||||||
|
|
||||||
tr, tsr, err := readTombstones(dir)
|
tr, tsr, err := readTombstones(dir)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
closers = append(closers, tr)
|
||||||
|
|
||||||
// TODO refactor to set this at block creation time as
|
// TODO refactor to set this at block creation time as
|
||||||
// that would be the logical place for a block size to be calculated.
|
// that would be the logical place for a block size to be calculated.
|
||||||
|
@ -301,7 +315,7 @@ func OpenBlock(logger log.Logger, dir string, pool chunkenc.Pool) (*Block, error
|
||||||
level.Warn(logger).Log("msg", "couldn't write the meta file for the block size", "block", dir, "err", err)
|
level.Warn(logger).Log("msg", "couldn't write the meta file for the block size", "block", dir, "err", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
pb := &Block{
|
pb = &Block{
|
||||||
dir: dir,
|
dir: dir,
|
||||||
meta: *meta,
|
meta: *meta,
|
||||||
chunkr: cr,
|
chunkr: cr,
|
||||||
|
|
|
@ -342,7 +342,7 @@ func NewDirReader(dir string, pool chunkenc.Pool) (*Reader, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Reader) Close() error {
|
func (s *Reader) Close() error {
|
||||||
return closeAll(s.cs...)
|
return closeAll(s.cs)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Size returns the size of the chunks.
|
// Size returns the size of the chunks.
|
||||||
|
@ -410,7 +410,7 @@ func sequenceFiles(dir string) ([]string, error) {
|
||||||
return res, nil
|
return res, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func closeAll(cs ...io.Closer) (err error) {
|
func closeAll(cs []io.Closer) (err error) {
|
||||||
for _, c := range cs {
|
for _, c := range cs {
|
||||||
if e := c.Close(); e != nil {
|
if e := c.Close(); e != nil {
|
||||||
err = e
|
err = e
|
||||||
|
|
|
@ -582,7 +582,7 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockRe
|
||||||
|
|
||||||
// populateBlock fills the index and chunk writers with new data gathered as the union
|
// populateBlock fills the index and chunk writers with new data gathered as the union
|
||||||
// of the provided blocks. It returns meta information for the new block.
|
// of the provided blocks. It returns meta information for the new block.
|
||||||
func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta, indexw IndexWriter, chunkw ChunkWriter) error {
|
func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta, indexw IndexWriter, chunkw ChunkWriter) (err error) {
|
||||||
if len(blocks) == 0 {
|
if len(blocks) == 0 {
|
||||||
return errors.New("cannot populate block from no readers")
|
return errors.New("cannot populate block from no readers")
|
||||||
}
|
}
|
||||||
|
@ -592,7 +592,12 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta,
|
||||||
allSymbols = make(map[string]struct{}, 1<<16)
|
allSymbols = make(map[string]struct{}, 1<<16)
|
||||||
closers = []io.Closer{}
|
closers = []io.Closer{}
|
||||||
)
|
)
|
||||||
defer func() { closeAll(closers...) }()
|
defer func() {
|
||||||
|
var merr MultiError
|
||||||
|
merr.Add(err)
|
||||||
|
merr.Add(closeAll(closers))
|
||||||
|
err = merr.Err()
|
||||||
|
}()
|
||||||
|
|
||||||
for i, b := range blocks {
|
for i, b := range blocks {
|
||||||
indexr, err := b.Index()
|
indexr, err := b.Index()
|
||||||
|
|
|
@ -17,12 +17,14 @@ import (
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"math"
|
"math"
|
||||||
"os"
|
"os"
|
||||||
|
"path"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/go-kit/kit/log"
|
"github.com/go-kit/kit/log"
|
||||||
"github.com/pkg/errors"
|
"github.com/pkg/errors"
|
||||||
|
prom_testutil "github.com/prometheus/client_golang/prometheus/testutil"
|
||||||
dto "github.com/prometheus/client_model/go"
|
dto "github.com/prometheus/client_model/go"
|
||||||
"github.com/prometheus/tsdb/chunks"
|
"github.com/prometheus/tsdb/chunks"
|
||||||
"github.com/prometheus/tsdb/labels"
|
"github.com/prometheus/tsdb/labels"
|
||||||
|
@ -691,9 +693,11 @@ func TestCompaction_populateBlock(t *testing.T) {
|
||||||
// This is needed for unit tests that rely on
|
// This is needed for unit tests that rely on
|
||||||
// checking state before and after a compaction.
|
// checking state before and after a compaction.
|
||||||
func TestDisableAutoCompactions(t *testing.T) {
|
func TestDisableAutoCompactions(t *testing.T) {
|
||||||
db, close := openTestDB(t, nil)
|
db, delete := openTestDB(t, nil)
|
||||||
defer close()
|
defer func() {
|
||||||
defer db.Close()
|
testutil.Ok(t, db.Close())
|
||||||
|
delete()
|
||||||
|
}()
|
||||||
|
|
||||||
blockRange := DefaultOptions.BlockRanges[0]
|
blockRange := DefaultOptions.BlockRanges[0]
|
||||||
label := labels.FromStrings("foo", "bar")
|
label := labels.FromStrings("foo", "bar")
|
||||||
|
@ -741,3 +745,77 @@ func TestDisableAutoCompactions(t *testing.T) {
|
||||||
}
|
}
|
||||||
testutil.Assert(t, len(db.Blocks()) > 0, "No block was persisted after the set timeout.")
|
testutil.Assert(t, len(db.Blocks()) > 0, "No block was persisted after the set timeout.")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TestDeleteCompactionBlockAfterFailedReload ensures that a failed reload immediately after a compaction
|
||||||
|
// deletes the resulting block to avoid creatings blocks with the same time range.
|
||||||
|
func TestDeleteCompactionBlockAfterFailedReload(t *testing.T) {
|
||||||
|
|
||||||
|
tests := map[string]func(*DB) int{
|
||||||
|
"Test Head Compaction": func(db *DB) int {
|
||||||
|
rangeToTriggerCompaction := db.opts.BlockRanges[0]/2*3 - 1
|
||||||
|
defaultLabel := labels.FromStrings("foo", "bar")
|
||||||
|
|
||||||
|
// Add some data to the head that is enough to trigger a compaction.
|
||||||
|
app := db.Appender()
|
||||||
|
_, err := app.Add(defaultLabel, 1, 0)
|
||||||
|
testutil.Ok(t, err)
|
||||||
|
_, err = app.Add(defaultLabel, 2, 0)
|
||||||
|
testutil.Ok(t, err)
|
||||||
|
_, err = app.Add(defaultLabel, 3+rangeToTriggerCompaction, 0)
|
||||||
|
testutil.Ok(t, err)
|
||||||
|
testutil.Ok(t, app.Commit())
|
||||||
|
|
||||||
|
return 0
|
||||||
|
},
|
||||||
|
"Test Block Compaction": func(db *DB) int {
|
||||||
|
blocks := []*BlockMeta{
|
||||||
|
{MinTime: 0, MaxTime: 100},
|
||||||
|
{MinTime: 100, MaxTime: 150},
|
||||||
|
{MinTime: 150, MaxTime: 200},
|
||||||
|
}
|
||||||
|
for _, m := range blocks {
|
||||||
|
createBlock(t, db.Dir(), genSeries(1, 1, m.MinTime, m.MaxTime))
|
||||||
|
}
|
||||||
|
testutil.Ok(t, db.reload())
|
||||||
|
testutil.Equals(t, len(blocks), len(db.Blocks()), "unexpected block count after a reload")
|
||||||
|
|
||||||
|
return len(blocks)
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for title, bootStrap := range tests {
|
||||||
|
t.Run(title, func(t *testing.T) {
|
||||||
|
db, delete := openTestDB(t, &Options{
|
||||||
|
BlockRanges: []int64{1, 100},
|
||||||
|
})
|
||||||
|
defer func() {
|
||||||
|
testutil.Ok(t, db.Close())
|
||||||
|
delete()
|
||||||
|
}()
|
||||||
|
db.DisableCompactions()
|
||||||
|
|
||||||
|
expBlocks := bootStrap(db)
|
||||||
|
|
||||||
|
// Create a block that will trigger the reload to fail.
|
||||||
|
blockPath := createBlock(t, db.Dir(), genSeries(1, 1, 200, 300))
|
||||||
|
lastBlockIndex := path.Join(blockPath, indexFilename)
|
||||||
|
actBlocks, err := blockDirs(db.Dir())
|
||||||
|
testutil.Ok(t, err)
|
||||||
|
testutil.Equals(t, expBlocks, len(actBlocks)-1) // -1 to exclude the corrupted block.
|
||||||
|
testutil.Ok(t, os.RemoveAll(lastBlockIndex)) // Corrupt the block by removing the index file.
|
||||||
|
|
||||||
|
testutil.Equals(t, 0.0, prom_testutil.ToFloat64(db.metrics.reloadsFailed), "initial 'failed db reload' count metrics mismatch")
|
||||||
|
testutil.Equals(t, 0.0, prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.ran), "initial `compactions` count metric mismatch")
|
||||||
|
|
||||||
|
// Do the compaction and check the metrics.
|
||||||
|
// Compaction should succeed, but the reload should fail and
|
||||||
|
// the new block created from the compaction should be deleted.
|
||||||
|
testutil.NotOk(t, db.compact())
|
||||||
|
testutil.Equals(t, 1.0, prom_testutil.ToFloat64(db.metrics.reloadsFailed), "'failed db reload' count metrics mismatch")
|
||||||
|
testutil.Equals(t, 1.0, prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.ran), "`compaction` count metric mismatch")
|
||||||
|
actBlocks, err = blockDirs(db.Dir())
|
||||||
|
testutil.Ok(t, err)
|
||||||
|
testutil.Equals(t, expBlocks, len(actBlocks)-1, "block count should be the same as before the compaction") // -1 to exclude the corrupted block.
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
19
db.go
19
db.go
|
@ -425,6 +425,9 @@ func (db *DB) compact() (err error) {
|
||||||
runtime.GC()
|
runtime.GC()
|
||||||
|
|
||||||
if err := db.reload(); err != nil {
|
if err := db.reload(); err != nil {
|
||||||
|
if err := os.RemoveAll(filepath.Join(db.dir, uid.String())); err != nil {
|
||||||
|
return errors.Wrapf(err, "delete persisted head block after failed db reload:%s", uid)
|
||||||
|
}
|
||||||
return errors.Wrap(err, "reload blocks")
|
return errors.Wrap(err, "reload blocks")
|
||||||
}
|
}
|
||||||
if (uid == ulid.ULID{}) {
|
if (uid == ulid.ULID{}) {
|
||||||
|
@ -454,12 +457,16 @@ func (db *DB) compact() (err error) {
|
||||||
default:
|
default:
|
||||||
}
|
}
|
||||||
|
|
||||||
if _, err := db.compactor.Compact(db.dir, plan, db.blocks); err != nil {
|
uid, err := db.compactor.Compact(db.dir, plan, db.blocks)
|
||||||
|
if err != nil {
|
||||||
return errors.Wrapf(err, "compact %s", plan)
|
return errors.Wrapf(err, "compact %s", plan)
|
||||||
}
|
}
|
||||||
runtime.GC()
|
runtime.GC()
|
||||||
|
|
||||||
if err := db.reload(); err != nil {
|
if err := db.reload(); err != nil {
|
||||||
|
if err := os.RemoveAll(filepath.Join(db.dir, uid.String())); err != nil {
|
||||||
|
return errors.Wrapf(err, "delete compacted block after failed db reload:%s", uid)
|
||||||
|
}
|
||||||
return errors.Wrap(err, "reload blocks")
|
return errors.Wrap(err, "reload blocks")
|
||||||
}
|
}
|
||||||
runtime.GC()
|
runtime.GC()
|
||||||
|
@ -505,7 +512,13 @@ func (db *DB) reload() (err error) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if len(corrupted) > 0 {
|
if len(corrupted) > 0 {
|
||||||
return errors.Wrap(err, "unexpected corrupted block")
|
// Close all new blocks to release the lock for windows.
|
||||||
|
for _, block := range loadable {
|
||||||
|
if _, loaded := db.getBlock(block.Meta().ULID); !loaded {
|
||||||
|
block.Close()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return fmt.Errorf("unexpected corrupted block:%v", corrupted)
|
||||||
}
|
}
|
||||||
|
|
||||||
// All deletable blocks should not be loaded.
|
// All deletable blocks should not be loaded.
|
||||||
|
@ -1084,7 +1097,7 @@ func (es MultiError) Err() error {
|
||||||
return es
|
return es
|
||||||
}
|
}
|
||||||
|
|
||||||
func closeAll(cs ...io.Closer) error {
|
func closeAll(cs []io.Closer) error {
|
||||||
var merr MultiError
|
var merr MultiError
|
||||||
|
|
||||||
for _, c := range cs {
|
for _, c := range cs {
|
||||||
|
|
196
db_test.go
196
db_test.go
|
@ -46,7 +46,9 @@ func openTestDB(t testing.TB, opts *Options) (db *DB, close func()) {
|
||||||
testutil.Ok(t, err)
|
testutil.Ok(t, err)
|
||||||
|
|
||||||
// Do not close the test database by default as it will deadlock on test failures.
|
// Do not close the test database by default as it will deadlock on test failures.
|
||||||
return db, func() { os.RemoveAll(tmpdir) }
|
return db, func() {
|
||||||
|
testutil.Ok(t, os.RemoveAll(tmpdir))
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// query runs a matcher query against the querier and fully expands its data.
|
// query runs a matcher query against the querier and fully expands its data.
|
||||||
|
@ -78,9 +80,11 @@ func query(t testing.TB, q Querier, matchers ...labels.Matcher) map[string][]sam
|
||||||
// Ensure that blocks are held in memory in their time order
|
// Ensure that blocks are held in memory in their time order
|
||||||
// and not in ULID order as they are read from the directory.
|
// and not in ULID order as they are read from the directory.
|
||||||
func TestDB_reloadOrder(t *testing.T) {
|
func TestDB_reloadOrder(t *testing.T) {
|
||||||
db, close := openTestDB(t, nil)
|
db, delete := openTestDB(t, nil)
|
||||||
defer close()
|
defer func() {
|
||||||
defer db.Close()
|
testutil.Ok(t, db.Close())
|
||||||
|
delete()
|
||||||
|
}()
|
||||||
|
|
||||||
metas := []BlockMeta{
|
metas := []BlockMeta{
|
||||||
{MinTime: 90, MaxTime: 100},
|
{MinTime: 90, MaxTime: 100},
|
||||||
|
@ -106,9 +110,11 @@ func TestDB_reloadOrder(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestDataAvailableOnlyAfterCommit(t *testing.T) {
|
func TestDataAvailableOnlyAfterCommit(t *testing.T) {
|
||||||
db, close := openTestDB(t, nil)
|
db, delete := openTestDB(t, nil)
|
||||||
defer close()
|
defer func() {
|
||||||
defer db.Close()
|
testutil.Ok(t, db.Close())
|
||||||
|
delete()
|
||||||
|
}()
|
||||||
|
|
||||||
app := db.Appender()
|
app := db.Appender()
|
||||||
|
|
||||||
|
@ -135,9 +141,11 @@ func TestDataAvailableOnlyAfterCommit(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestDataNotAvailableAfterRollback(t *testing.T) {
|
func TestDataNotAvailableAfterRollback(t *testing.T) {
|
||||||
db, close := openTestDB(t, nil)
|
db, delete := openTestDB(t, nil)
|
||||||
defer close()
|
defer func() {
|
||||||
defer db.Close()
|
testutil.Ok(t, db.Close())
|
||||||
|
delete()
|
||||||
|
}()
|
||||||
|
|
||||||
app := db.Appender()
|
app := db.Appender()
|
||||||
_, err := app.Add(labels.FromStrings("foo", "bar"), 0, 0)
|
_, err := app.Add(labels.FromStrings("foo", "bar"), 0, 0)
|
||||||
|
@ -156,9 +164,11 @@ func TestDataNotAvailableAfterRollback(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestDBAppenderAddRef(t *testing.T) {
|
func TestDBAppenderAddRef(t *testing.T) {
|
||||||
db, close := openTestDB(t, nil)
|
db, delete := openTestDB(t, nil)
|
||||||
defer close()
|
defer func() {
|
||||||
defer db.Close()
|
testutil.Ok(t, db.Close())
|
||||||
|
delete()
|
||||||
|
}()
|
||||||
|
|
||||||
app1 := db.Appender()
|
app1 := db.Appender()
|
||||||
|
|
||||||
|
@ -241,9 +251,11 @@ func TestDeleteSimple(t *testing.T) {
|
||||||
|
|
||||||
Outer:
|
Outer:
|
||||||
for _, c := range cases {
|
for _, c := range cases {
|
||||||
db, close := openTestDB(t, nil)
|
db, delete := openTestDB(t, nil)
|
||||||
defer close()
|
defer func() {
|
||||||
defer db.Close()
|
testutil.Ok(t, db.Close())
|
||||||
|
delete()
|
||||||
|
}()
|
||||||
|
|
||||||
app := db.Appender()
|
app := db.Appender()
|
||||||
|
|
||||||
|
@ -315,9 +327,11 @@ Outer:
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestAmendDatapointCausesError(t *testing.T) {
|
func TestAmendDatapointCausesError(t *testing.T) {
|
||||||
db, close := openTestDB(t, nil)
|
db, delete := openTestDB(t, nil)
|
||||||
defer close()
|
defer func() {
|
||||||
defer db.Close()
|
testutil.Ok(t, db.Close())
|
||||||
|
delete()
|
||||||
|
}()
|
||||||
|
|
||||||
app := db.Appender()
|
app := db.Appender()
|
||||||
_, err := app.Add(labels.Labels{}, 0, 0)
|
_, err := app.Add(labels.Labels{}, 0, 0)
|
||||||
|
@ -331,9 +345,11 @@ func TestAmendDatapointCausesError(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestDuplicateNaNDatapointNoAmendError(t *testing.T) {
|
func TestDuplicateNaNDatapointNoAmendError(t *testing.T) {
|
||||||
db, close := openTestDB(t, nil)
|
db, delete := openTestDB(t, nil)
|
||||||
defer close()
|
defer func() {
|
||||||
defer db.Close()
|
testutil.Ok(t, db.Close())
|
||||||
|
delete()
|
||||||
|
}()
|
||||||
|
|
||||||
app := db.Appender()
|
app := db.Appender()
|
||||||
_, err := app.Add(labels.Labels{}, 0, math.NaN())
|
_, err := app.Add(labels.Labels{}, 0, math.NaN())
|
||||||
|
@ -346,10 +362,11 @@ func TestDuplicateNaNDatapointNoAmendError(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestNonDuplicateNaNDatapointsCausesAmendError(t *testing.T) {
|
func TestNonDuplicateNaNDatapointsCausesAmendError(t *testing.T) {
|
||||||
db, close := openTestDB(t, nil)
|
db, delete := openTestDB(t, nil)
|
||||||
defer close()
|
defer func() {
|
||||||
defer db.Close()
|
testutil.Ok(t, db.Close())
|
||||||
|
delete()
|
||||||
|
}()
|
||||||
app := db.Appender()
|
app := db.Appender()
|
||||||
_, err := app.Add(labels.Labels{}, 0, math.Float64frombits(0x7ff0000000000001))
|
_, err := app.Add(labels.Labels{}, 0, math.Float64frombits(0x7ff0000000000001))
|
||||||
testutil.Ok(t, err)
|
testutil.Ok(t, err)
|
||||||
|
@ -361,9 +378,11 @@ func TestNonDuplicateNaNDatapointsCausesAmendError(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestSkippingInvalidValuesInSameTxn(t *testing.T) {
|
func TestSkippingInvalidValuesInSameTxn(t *testing.T) {
|
||||||
db, close := openTestDB(t, nil)
|
db, delete := openTestDB(t, nil)
|
||||||
defer close()
|
defer func() {
|
||||||
defer db.Close()
|
testutil.Ok(t, db.Close())
|
||||||
|
delete()
|
||||||
|
}()
|
||||||
|
|
||||||
// Append AmendedValue.
|
// Append AmendedValue.
|
||||||
app := db.Appender()
|
app := db.Appender()
|
||||||
|
@ -405,8 +424,8 @@ func TestSkippingInvalidValuesInSameTxn(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestDB_Snapshot(t *testing.T) {
|
func TestDB_Snapshot(t *testing.T) {
|
||||||
db, close := openTestDB(t, nil)
|
db, delete := openTestDB(t, nil)
|
||||||
defer close()
|
defer delete()
|
||||||
|
|
||||||
// append data
|
// append data
|
||||||
app := db.Appender()
|
app := db.Appender()
|
||||||
|
@ -429,11 +448,11 @@ func TestDB_Snapshot(t *testing.T) {
|
||||||
// reopen DB from snapshot
|
// reopen DB from snapshot
|
||||||
db, err = Open(snap, nil, nil, nil)
|
db, err = Open(snap, nil, nil, nil)
|
||||||
testutil.Ok(t, err)
|
testutil.Ok(t, err)
|
||||||
defer db.Close()
|
defer func() { testutil.Ok(t, db.Close()) }()
|
||||||
|
|
||||||
querier, err := db.Querier(mint, mint+1000)
|
querier, err := db.Querier(mint, mint+1000)
|
||||||
testutil.Ok(t, err)
|
testutil.Ok(t, err)
|
||||||
defer querier.Close()
|
defer func() { testutil.Ok(t, querier.Close()) }()
|
||||||
|
|
||||||
// sum values
|
// sum values
|
||||||
seriesSet, err := querier.Select(labels.NewEqualMatcher("foo", "bar"))
|
seriesSet, err := querier.Select(labels.NewEqualMatcher("foo", "bar"))
|
||||||
|
@ -455,8 +474,8 @@ func TestDB_Snapshot(t *testing.T) {
|
||||||
func TestDB_SnapshotWithDelete(t *testing.T) {
|
func TestDB_SnapshotWithDelete(t *testing.T) {
|
||||||
numSamples := int64(10)
|
numSamples := int64(10)
|
||||||
|
|
||||||
db, close := openTestDB(t, nil)
|
db, delete := openTestDB(t, nil)
|
||||||
defer close()
|
defer delete()
|
||||||
|
|
||||||
app := db.Appender()
|
app := db.Appender()
|
||||||
|
|
||||||
|
@ -496,12 +515,12 @@ Outer:
|
||||||
// reopen DB from snapshot
|
// reopen DB from snapshot
|
||||||
db, err = Open(snap, nil, nil, nil)
|
db, err = Open(snap, nil, nil, nil)
|
||||||
testutil.Ok(t, err)
|
testutil.Ok(t, err)
|
||||||
defer db.Close()
|
defer func() { testutil.Ok(t, db.Close()) }()
|
||||||
|
|
||||||
// Compare the result.
|
// Compare the result.
|
||||||
q, err := db.Querier(0, numSamples)
|
q, err := db.Querier(0, numSamples)
|
||||||
testutil.Ok(t, err)
|
testutil.Ok(t, err)
|
||||||
defer q.Close()
|
defer func() { testutil.Ok(t, q.Close()) }()
|
||||||
|
|
||||||
res, err := q.Select(labels.NewEqualMatcher("a", "b"))
|
res, err := q.Select(labels.NewEqualMatcher("a", "b"))
|
||||||
testutil.Ok(t, err)
|
testutil.Ok(t, err)
|
||||||
|
@ -596,9 +615,11 @@ func TestDB_e2e(t *testing.T) {
|
||||||
seriesMap[labels.New(l...).String()] = []sample{}
|
seriesMap[labels.New(l...).String()] = []sample{}
|
||||||
}
|
}
|
||||||
|
|
||||||
db, close := openTestDB(t, nil)
|
db, delete := openTestDB(t, nil)
|
||||||
defer close()
|
defer func() {
|
||||||
defer db.Close()
|
testutil.Ok(t, db.Close())
|
||||||
|
delete()
|
||||||
|
}()
|
||||||
|
|
||||||
app := db.Appender()
|
app := db.Appender()
|
||||||
|
|
||||||
|
@ -699,8 +720,8 @@ func TestDB_e2e(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestWALFlushedOnDBClose(t *testing.T) {
|
func TestWALFlushedOnDBClose(t *testing.T) {
|
||||||
db, close := openTestDB(t, nil)
|
db, delete := openTestDB(t, nil)
|
||||||
defer close()
|
defer delete()
|
||||||
|
|
||||||
dirDb := db.Dir()
|
dirDb := db.Dir()
|
||||||
|
|
||||||
|
@ -715,7 +736,7 @@ func TestWALFlushedOnDBClose(t *testing.T) {
|
||||||
|
|
||||||
db, err = Open(dirDb, nil, nil, nil)
|
db, err = Open(dirDb, nil, nil, nil)
|
||||||
testutil.Ok(t, err)
|
testutil.Ok(t, err)
|
||||||
defer db.Close()
|
defer func() { testutil.Ok(t, db.Close()) }()
|
||||||
|
|
||||||
q, err := db.Querier(0, 1)
|
q, err := db.Querier(0, 1)
|
||||||
testutil.Ok(t, err)
|
testutil.Ok(t, err)
|
||||||
|
@ -728,8 +749,8 @@ func TestWALFlushedOnDBClose(t *testing.T) {
|
||||||
func TestWALSegmentSizeOption(t *testing.T) {
|
func TestWALSegmentSizeOption(t *testing.T) {
|
||||||
options := *DefaultOptions
|
options := *DefaultOptions
|
||||||
options.WALSegmentSize = 2 * 32 * 1024
|
options.WALSegmentSize = 2 * 32 * 1024
|
||||||
db, close := openTestDB(t, &options)
|
db, delete := openTestDB(t, &options)
|
||||||
defer close()
|
defer delete()
|
||||||
app := db.Appender()
|
app := db.Appender()
|
||||||
for i := int64(0); i < 155; i++ {
|
for i := int64(0); i < 155; i++ {
|
||||||
_, err := app.Add(labels.Labels{labels.Label{Name: "wal", Value: "size"}}, i, rand.Float64())
|
_, err := app.Add(labels.Labels{labels.Label{Name: "wal", Value: "size"}}, i, rand.Float64())
|
||||||
|
@ -754,8 +775,8 @@ func TestWALSegmentSizeOption(t *testing.T) {
|
||||||
func TestTombstoneClean(t *testing.T) {
|
func TestTombstoneClean(t *testing.T) {
|
||||||
numSamples := int64(10)
|
numSamples := int64(10)
|
||||||
|
|
||||||
db, close := openTestDB(t, nil)
|
db, delete := openTestDB(t, nil)
|
||||||
defer close()
|
defer delete()
|
||||||
|
|
||||||
app := db.Appender()
|
app := db.Appender()
|
||||||
|
|
||||||
|
@ -851,9 +872,11 @@ func TestTombstoneClean(t *testing.T) {
|
||||||
// if TombstoneClean leaves any blocks behind these will overlap.
|
// if TombstoneClean leaves any blocks behind these will overlap.
|
||||||
func TestTombstoneCleanFail(t *testing.T) {
|
func TestTombstoneCleanFail(t *testing.T) {
|
||||||
|
|
||||||
db, close := openTestDB(t, nil)
|
db, delete := openTestDB(t, nil)
|
||||||
defer db.Close()
|
defer func() {
|
||||||
defer close()
|
testutil.Ok(t, db.Close())
|
||||||
|
delete()
|
||||||
|
}()
|
||||||
|
|
||||||
var expectedBlockDirs []string
|
var expectedBlockDirs []string
|
||||||
|
|
||||||
|
@ -930,11 +953,13 @@ func (*mockCompactorFailing) Compact(dest string, dirs []string, open []*Block)
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestTimeRetention(t *testing.T) {
|
func TestTimeRetention(t *testing.T) {
|
||||||
db, close := openTestDB(t, &Options{
|
db, delete := openTestDB(t, &Options{
|
||||||
BlockRanges: []int64{1000},
|
BlockRanges: []int64{1000},
|
||||||
})
|
})
|
||||||
defer close()
|
defer func() {
|
||||||
defer db.Close()
|
testutil.Ok(t, db.Close())
|
||||||
|
delete()
|
||||||
|
}()
|
||||||
|
|
||||||
blocks := []*BlockMeta{
|
blocks := []*BlockMeta{
|
||||||
{MinTime: 500, MaxTime: 900}, // Oldest block
|
{MinTime: 500, MaxTime: 900}, // Oldest block
|
||||||
|
@ -962,11 +987,13 @@ func TestTimeRetention(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestSizeRetention(t *testing.T) {
|
func TestSizeRetention(t *testing.T) {
|
||||||
db, close := openTestDB(t, &Options{
|
db, delete := openTestDB(t, &Options{
|
||||||
BlockRanges: []int64{100},
|
BlockRanges: []int64{100},
|
||||||
})
|
})
|
||||||
defer close()
|
defer func() {
|
||||||
defer db.Close()
|
testutil.Ok(t, db.Close())
|
||||||
|
delete()
|
||||||
|
}()
|
||||||
|
|
||||||
blocks := []*BlockMeta{
|
blocks := []*BlockMeta{
|
||||||
{MinTime: 100, MaxTime: 200}, // Oldest block
|
{MinTime: 100, MaxTime: 200}, // Oldest block
|
||||||
|
@ -1024,8 +1051,11 @@ func dbDiskSize(dir string) int64 {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestNotMatcherSelectsLabelsUnsetSeries(t *testing.T) {
|
func TestNotMatcherSelectsLabelsUnsetSeries(t *testing.T) {
|
||||||
db, close := openTestDB(t, nil)
|
db, delete := openTestDB(t, nil)
|
||||||
defer close()
|
defer func() {
|
||||||
|
testutil.Ok(t, db.Close())
|
||||||
|
delete()
|
||||||
|
}()
|
||||||
|
|
||||||
labelpairs := []labels.Labels{
|
labelpairs := []labels.Labels{
|
||||||
labels.FromStrings("a", "abcd", "b", "abcde"),
|
labels.FromStrings("a", "abcd", "b", "abcde"),
|
||||||
|
@ -1083,7 +1113,7 @@ func TestNotMatcherSelectsLabelsUnsetSeries(t *testing.T) {
|
||||||
|
|
||||||
q, err := db.Querier(0, 10)
|
q, err := db.Querier(0, 10)
|
||||||
testutil.Ok(t, err)
|
testutil.Ok(t, err)
|
||||||
defer q.Close()
|
defer func() { testutil.Ok(t, q.Close()) }()
|
||||||
|
|
||||||
for _, c := range cases {
|
for _, c := range cases {
|
||||||
ss, err := q.Select(c.selector...)
|
ss, err := q.Select(c.selector...)
|
||||||
|
@ -1199,9 +1229,11 @@ func TestOverlappingBlocksDetectsAllOverlaps(t *testing.T) {
|
||||||
|
|
||||||
// Regression test for https://github.com/prometheus/tsdb/issues/347
|
// Regression test for https://github.com/prometheus/tsdb/issues/347
|
||||||
func TestChunkAtBlockBoundary(t *testing.T) {
|
func TestChunkAtBlockBoundary(t *testing.T) {
|
||||||
db, close := openTestDB(t, nil)
|
db, delete := openTestDB(t, nil)
|
||||||
defer close()
|
defer func() {
|
||||||
defer db.Close()
|
testutil.Ok(t, db.Close())
|
||||||
|
delete()
|
||||||
|
}()
|
||||||
|
|
||||||
app := db.Appender()
|
app := db.Appender()
|
||||||
|
|
||||||
|
@ -1253,9 +1285,11 @@ func TestChunkAtBlockBoundary(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestQuerierWithBoundaryChunks(t *testing.T) {
|
func TestQuerierWithBoundaryChunks(t *testing.T) {
|
||||||
db, close := openTestDB(t, nil)
|
db, delete := openTestDB(t, nil)
|
||||||
defer close()
|
defer func() {
|
||||||
defer db.Close()
|
testutil.Ok(t, db.Close())
|
||||||
|
delete()
|
||||||
|
}()
|
||||||
|
|
||||||
app := db.Appender()
|
app := db.Appender()
|
||||||
|
|
||||||
|
@ -1390,14 +1424,16 @@ func TestInitializeHeadTimestamp(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestNoEmptyBlocks(t *testing.T) {
|
func TestNoEmptyBlocks(t *testing.T) {
|
||||||
db, close := openTestDB(t, &Options{
|
db, delete := openTestDB(t, &Options{
|
||||||
BlockRanges: []int64{100},
|
BlockRanges: []int64{100},
|
||||||
})
|
})
|
||||||
defer close()
|
defer func() {
|
||||||
defer db.Close()
|
testutil.Ok(t, db.Close())
|
||||||
|
delete()
|
||||||
|
}()
|
||||||
db.DisableCompactions()
|
db.DisableCompactions()
|
||||||
|
|
||||||
rangeToTriggercompaction := db.opts.BlockRanges[0]/2*3 - 1
|
rangeToTriggerCompaction := db.opts.BlockRanges[0]/2*3 - 1
|
||||||
defaultLabel := labels.FromStrings("foo", "bar")
|
defaultLabel := labels.FromStrings("foo", "bar")
|
||||||
defaultMatcher := labels.NewMustRegexpMatcher("", ".*")
|
defaultMatcher := labels.NewMustRegexpMatcher("", ".*")
|
||||||
|
|
||||||
|
@ -1416,7 +1452,7 @@ func TestNoEmptyBlocks(t *testing.T) {
|
||||||
testutil.Ok(t, err)
|
testutil.Ok(t, err)
|
||||||
_, err = app.Add(defaultLabel, 2, 0)
|
_, err = app.Add(defaultLabel, 2, 0)
|
||||||
testutil.Ok(t, err)
|
testutil.Ok(t, err)
|
||||||
_, err = app.Add(defaultLabel, 3+rangeToTriggercompaction, 0)
|
_, err = app.Add(defaultLabel, 3+rangeToTriggerCompaction, 0)
|
||||||
testutil.Ok(t, err)
|
testutil.Ok(t, err)
|
||||||
testutil.Ok(t, app.Commit())
|
testutil.Ok(t, app.Commit())
|
||||||
testutil.Ok(t, db.Delete(math.MinInt64, math.MaxInt64, defaultMatcher))
|
testutil.Ok(t, db.Delete(math.MinInt64, math.MaxInt64, defaultMatcher))
|
||||||
|
@ -1438,7 +1474,7 @@ func TestNoEmptyBlocks(t *testing.T) {
|
||||||
testutil.Ok(t, err)
|
testutil.Ok(t, err)
|
||||||
_, err = app.Add(defaultLabel, currentTime+1, 0)
|
_, err = app.Add(defaultLabel, currentTime+1, 0)
|
||||||
testutil.Ok(t, err)
|
testutil.Ok(t, err)
|
||||||
_, err = app.Add(defaultLabel, currentTime+rangeToTriggercompaction, 0)
|
_, err = app.Add(defaultLabel, currentTime+rangeToTriggerCompaction, 0)
|
||||||
testutil.Ok(t, err)
|
testutil.Ok(t, err)
|
||||||
testutil.Ok(t, app.Commit())
|
testutil.Ok(t, app.Commit())
|
||||||
|
|
||||||
|
@ -1459,7 +1495,7 @@ func TestNoEmptyBlocks(t *testing.T) {
|
||||||
testutil.Ok(t, err)
|
testutil.Ok(t, err)
|
||||||
_, err = app.Add(defaultLabel, currentTime+1, 0)
|
_, err = app.Add(defaultLabel, currentTime+1, 0)
|
||||||
testutil.Ok(t, err)
|
testutil.Ok(t, err)
|
||||||
_, err = app.Add(defaultLabel, currentTime+rangeToTriggercompaction, 0)
|
_, err = app.Add(defaultLabel, currentTime+rangeToTriggerCompaction, 0)
|
||||||
testutil.Ok(t, err)
|
testutil.Ok(t, err)
|
||||||
testutil.Ok(t, app.Commit())
|
testutil.Ok(t, app.Commit())
|
||||||
testutil.Ok(t, db.head.Delete(math.MinInt64, math.MaxInt64, defaultMatcher))
|
testutil.Ok(t, db.head.Delete(math.MinInt64, math.MaxInt64, defaultMatcher))
|
||||||
|
@ -1548,9 +1584,11 @@ func TestDB_LabelNames(t *testing.T) {
|
||||||
testutil.Ok(t, err)
|
testutil.Ok(t, err)
|
||||||
}
|
}
|
||||||
for _, tst := range tests {
|
for _, tst := range tests {
|
||||||
db, close := openTestDB(t, nil)
|
db, delete := openTestDB(t, nil)
|
||||||
defer close()
|
defer func() {
|
||||||
defer db.Close()
|
testutil.Ok(t, db.Close())
|
||||||
|
delete()
|
||||||
|
}()
|
||||||
|
|
||||||
appendSamples(db, 0, 4, tst.sampleLabels1)
|
appendSamples(db, 0, 4, tst.sampleLabels1)
|
||||||
|
|
||||||
|
@ -1591,9 +1629,11 @@ func TestDB_LabelNames(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestCorrectNumTombstones(t *testing.T) {
|
func TestCorrectNumTombstones(t *testing.T) {
|
||||||
db, close := openTestDB(t, nil)
|
db, delete := openTestDB(t, nil)
|
||||||
defer close()
|
defer func() {
|
||||||
defer db.Close()
|
testutil.Ok(t, db.Close())
|
||||||
|
delete()
|
||||||
|
}()
|
||||||
|
|
||||||
blockRange := DefaultOptions.BlockRanges[0]
|
blockRange := DefaultOptions.BlockRanges[0]
|
||||||
defaultLabel := labels.FromStrings("foo", "bar")
|
defaultLabel := labels.FromStrings("foo", "bar")
|
||||||
|
|
|
@ -45,6 +45,8 @@ const (
|
||||||
FormatV2 = 2
|
FormatV2 = 2
|
||||||
|
|
||||||
labelNameSeperator = "\xff"
|
labelNameSeperator = "\xff"
|
||||||
|
|
||||||
|
indexFilename = "index"
|
||||||
)
|
)
|
||||||
|
|
||||||
type indexWriterSeries struct {
|
type indexWriterSeries struct {
|
||||||
|
|
|
@ -151,7 +151,7 @@ func TestIndexRW_Create_Open(t *testing.T) {
|
||||||
testutil.Ok(t, err)
|
testutil.Ok(t, err)
|
||||||
defer os.RemoveAll(dir)
|
defer os.RemoveAll(dir)
|
||||||
|
|
||||||
fn := filepath.Join(dir, "index")
|
fn := filepath.Join(dir, indexFilename)
|
||||||
|
|
||||||
// An empty index must still result in a readable file.
|
// An empty index must still result in a readable file.
|
||||||
iw, err := NewWriter(fn)
|
iw, err := NewWriter(fn)
|
||||||
|
@ -177,7 +177,7 @@ func TestIndexRW_Postings(t *testing.T) {
|
||||||
testutil.Ok(t, err)
|
testutil.Ok(t, err)
|
||||||
defer os.RemoveAll(dir)
|
defer os.RemoveAll(dir)
|
||||||
|
|
||||||
fn := filepath.Join(dir, "index")
|
fn := filepath.Join(dir, indexFilename)
|
||||||
|
|
||||||
iw, err := NewWriter(fn)
|
iw, err := NewWriter(fn)
|
||||||
testutil.Ok(t, err)
|
testutil.Ok(t, err)
|
||||||
|
@ -271,7 +271,7 @@ func TestPersistence_index_e2e(t *testing.T) {
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
iw, err := NewWriter(filepath.Join(dir, "index"))
|
iw, err := NewWriter(filepath.Join(dir, indexFilename))
|
||||||
testutil.Ok(t, err)
|
testutil.Ok(t, err)
|
||||||
|
|
||||||
testutil.Ok(t, iw.AddSymbols(symbols))
|
testutil.Ok(t, iw.AddSymbols(symbols))
|
||||||
|
@ -331,7 +331,7 @@ func TestPersistence_index_e2e(t *testing.T) {
|
||||||
err = iw.Close()
|
err = iw.Close()
|
||||||
testutil.Ok(t, err)
|
testutil.Ok(t, err)
|
||||||
|
|
||||||
ir, err := NewFileReader(filepath.Join(dir, "index"))
|
ir, err := NewFileReader(filepath.Join(dir, indexFilename))
|
||||||
testutil.Ok(t, err)
|
testutil.Ok(t, err)
|
||||||
|
|
||||||
for p := range mi.postings {
|
for p := range mi.postings {
|
||||||
|
|
|
@ -64,7 +64,7 @@ func repairBadIndexVersion(logger log.Logger, dir string) error {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return wrapErr(err, d)
|
return wrapErr(err, d)
|
||||||
}
|
}
|
||||||
broken, err := os.Open(filepath.Join(d, "index"))
|
broken, err := os.Open(filepath.Join(d, indexFilename))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return wrapErr(err, d)
|
return wrapErr(err, d)
|
||||||
}
|
}
|
||||||
|
|
|
@ -30,7 +30,7 @@ func TestRepairBadIndexVersion(t *testing.T) {
|
||||||
// at a broken revision.
|
// at a broken revision.
|
||||||
//
|
//
|
||||||
// func main() {
|
// func main() {
|
||||||
// w, err := index.NewWriter("index")
|
// w, err := index.NewWriter(indexFilename)
|
||||||
// if err != nil {
|
// if err != nil {
|
||||||
// panic(err)
|
// panic(err)
|
||||||
// }
|
// }
|
||||||
|
@ -72,7 +72,7 @@ func TestRepairBadIndexVersion(t *testing.T) {
|
||||||
os.MkdirAll(filepath.Join(dbDir, "chunks"), 0777)
|
os.MkdirAll(filepath.Join(dbDir, "chunks"), 0777)
|
||||||
defer os.RemoveAll(filepath.Join(dbDir, "chunks"))
|
defer os.RemoveAll(filepath.Join(dbDir, "chunks"))
|
||||||
|
|
||||||
r, err := index.NewFileReader(filepath.Join(dbDir, "index"))
|
r, err := index.NewFileReader(filepath.Join(dbDir, indexFilename))
|
||||||
testutil.Ok(t, err)
|
testutil.Ok(t, err)
|
||||||
p, err := r.Postings("b", "1")
|
p, err := r.Postings("b", "1")
|
||||||
testutil.Ok(t, err)
|
testutil.Ok(t, err)
|
||||||
|
@ -95,7 +95,7 @@ func TestRepairBadIndexVersion(t *testing.T) {
|
||||||
testutil.Ok(t, err)
|
testutil.Ok(t, err)
|
||||||
db.Close()
|
db.Close()
|
||||||
|
|
||||||
r, err = index.NewFileReader(filepath.Join(tmpDbDir, "index"))
|
r, err = index.NewFileReader(filepath.Join(tmpDbDir, indexFilename))
|
||||||
testutil.Ok(t, err)
|
testutil.Ok(t, err)
|
||||||
p, err = r.Postings("b", "1")
|
p, err = r.Postings("b", "1")
|
||||||
testutil.Ok(t, err)
|
testutil.Ok(t, err)
|
||||||
|
|
Loading…
Reference in a new issue