Don't use returned DB to close resources on TSDB startup error (#8113)

* Don't use returned DB to close resources on TSDB startup error

Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in>

* Add unit test and fix another panic

Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in>

* Fix review comment

Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in>
This commit is contained in:
Ganesh Vernekar 2020-10-28 15:39:03 +05:30 committed by GitHub
parent daf382e4a9
commit 3245b3267b
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 36 additions and 12 deletions

View file

@ -552,7 +552,7 @@ func validateOpts(opts *Options, rngs []int64) (*Options, []int64) {
return opts, rngs return opts, rngs
} }
func open(dir string, l log.Logger, r prometheus.Registerer, opts *Options, rngs []int64) (db *DB, returnedErr error) { func open(dir string, l log.Logger, r prometheus.Registerer, opts *Options, rngs []int64) (_ *DB, returnedErr error) {
if err := os.MkdirAll(dir, 0777); err != nil { if err := os.MkdirAll(dir, 0777); err != nil {
return nil, err return nil, err
} }
@ -583,7 +583,7 @@ func open(dir string, l log.Logger, r prometheus.Registerer, opts *Options, rngs
return nil, errors.Wrap(err, "remove tmp dirs") return nil, errors.Wrap(err, "remove tmp dirs")
} }
db = &DB{ db := &DB{
dir: dir, dir: dir,
logger: l, logger: l,
opts: opts, opts: opts,
@ -600,6 +600,8 @@ func open(dir string, l log.Logger, r prometheus.Registerer, opts *Options, rngs
return return
} }
close(db.donec) // DB is never run if it was an error, so close this channel here.
var merr tsdb_errors.MultiError var merr tsdb_errors.MultiError
merr.Add(returnedErr) merr.Add(returnedErr)
merr.Add(errors.Wrap(db.Close(), "close DB after failed startup")) merr.Add(errors.Wrap(db.Close(), "close DB after failed startup"))
@ -623,11 +625,12 @@ func open(dir string, l log.Logger, r prometheus.Registerer, opts *Options, rngs
db.lockf = lockf db.lockf = lockf
} }
var err error
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
db.compactor, returnedErr = NewLeveledCompactor(ctx, r, l, rngs, db.chunkPool) db.compactor, err = NewLeveledCompactor(ctx, r, l, rngs, db.chunkPool)
if returnedErr != nil { if err != nil {
cancel() cancel()
return nil, errors.Wrap(returnedErr, "create leveled compactor") return nil, errors.Wrap(err, "create leveled compactor")
} }
db.compactCancel = cancel db.compactCancel = cancel
@ -639,15 +642,15 @@ func open(dir string, l log.Logger, r prometheus.Registerer, opts *Options, rngs
if opts.WALSegmentSize > 0 { if opts.WALSegmentSize > 0 {
segmentSize = opts.WALSegmentSize segmentSize = opts.WALSegmentSize
} }
wlog, returnedErr = wal.NewSize(l, r, walDir, segmentSize, opts.WALCompression) wlog, err = wal.NewSize(l, r, walDir, segmentSize, opts.WALCompression)
if returnedErr != nil { if err != nil {
return nil, returnedErr return nil, err
} }
} }
db.head, returnedErr = NewHead(r, l, wlog, rngs[0], dir, db.chunkPool, opts.StripeSize, opts.SeriesLifecycleCallback) db.head, err = NewHead(r, l, wlog, rngs[0], dir, db.chunkPool, opts.StripeSize, opts.SeriesLifecycleCallback)
if returnedErr != nil { if err != nil {
return nil, returnedErr return nil, err
} }
// Register metrics after assigning the head block. // Register metrics after assigning the head block.
@ -1325,7 +1328,9 @@ func (db *DB) Head() *Head {
// Close the partition. // Close the partition.
func (db *DB) Close() error { func (db *DB) Close() error {
close(db.stopc) close(db.stopc)
if db.compactCancel != nil {
db.compactCancel() db.compactCancel()
}
<-db.donec <-db.donec
db.mtx.Lock() db.mtx.Lock()

View file

@ -2961,3 +2961,22 @@ func TestOneCheckpointPerCompactCall(t *testing.T) {
assert.NoError(t, err) assert.NoError(t, err)
assert.Equal(t, 54, cno) assert.Equal(t, 54, cno)
} }
func TestNoPanicOnTSDBOpenError(t *testing.T) {
tmpdir, err := ioutil.TempDir("", "test")
assert.NoError(t, err)
t.Cleanup(func() {
assert.NoError(t, os.RemoveAll(tmpdir))
})
absdir, err := filepath.Abs(tmpdir)
assert.NoError(t, err)
// Taking the file lock will cause TSDB startup error.
lockf, _, err := fileutil.Flock(filepath.Join(absdir, "lock"))
assert.NoError(t, err)
_, err = Open(tmpdir, nil, nil, DefaultOptions())
assert.Error(t, err)
assert.NoError(t, lockf.Release())
}