diff --git a/block.go b/block.go index 35548f62e..db63e14f2 100644 --- a/block.go +++ b/block.go @@ -7,7 +7,6 @@ import ( "path/filepath" "sort" - "github.com/coreos/etcd/pkg/fileutil" "github.com/pkg/errors" ) @@ -175,14 +174,12 @@ func indexFileName(path string) string { } type mmapFile struct { - f *fileutil.LockedFile + f *os.File b []byte } func openMmapFile(path string) (*mmapFile, error) { - // We have to open the file in RDWR for the lock to work with fileutil. - // TODO(fabxc): use own flock call that supports multi-reader. - f, err := fileutil.TryLockFile(path, os.O_RDWR, 0666) + f, err := os.Open(path) if err != nil { return nil, errors.Wrap(err, "try lock file") } @@ -191,7 +188,7 @@ func openMmapFile(path string) (*mmapFile, error) { return nil, errors.Wrap(err, "stat") } - b, err := mmap(f.File, int(info.Size())) + b, err := mmap(f, int(info.Size())) if err != nil { return nil, errors.Wrap(err, "mmap") } diff --git a/compact.go b/compact.go index 73018ca20..768bc460d 100644 --- a/compact.go +++ b/compact.go @@ -167,11 +167,11 @@ func (c *compactor) compact(dir string, blocks ...Block) (err error) { return err } - chunkf, err := fileutil.LockFile(chunksFileName(dir), os.O_WRONLY|os.O_CREATE, 0666) + chunkf, err := os.OpenFile(chunksFileName(dir), os.O_WRONLY|os.O_CREATE, 0666) if err != nil { return errors.Wrap(err, "create chunk file") } - indexf, err := fileutil.LockFile(indexFileName(dir), os.O_WRONLY|os.O_CREATE, 0666) + indexf, err := os.OpenFile(indexFileName(dir), os.O_WRONLY|os.O_CREATE, 0666) if err != nil { return errors.Wrap(err, "create index file") } @@ -189,10 +189,10 @@ func (c *compactor) compact(dir string, blocks ...Block) (err error) { if err = indexw.Close(); err != nil { return errors.Wrap(err, "close index writer") } - if err = fileutil.Fsync(chunkf.File); err != nil { + if err = fileutil.Fsync(chunkf); err != nil { return errors.Wrap(err, "fsync chunk file") } - if err = fileutil.Fsync(indexf.File); err != nil { + if err = fileutil.Fsync(indexf); err != nil { return errors.Wrap(err, "fsync index file") } if err = chunkf.Close(); err != nil { diff --git a/db.go b/db.go index 43b8b80d2..27c19b65a 100644 --- a/db.go +++ b/db.go @@ -21,6 +21,7 @@ import ( "github.com/coreos/etcd/pkg/fileutil" "github.com/fabxc/tsdb/labels" "github.com/go-kit/kit/log" + "github.com/nightlyone/lockfile" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" ) @@ -86,6 +87,7 @@ const sep = '\xff' // a hashed partition of a seriedb. type DB struct { dir string + lockf lockfile.Lockfile logger log.Logger metrics *dbMetrics opts *Options @@ -130,11 +132,22 @@ func newDBMetrics(r prometheus.Registerer) *dbMetrics { // Open returns a new DB in the given directory. func Open(dir string, logger log.Logger, opts *Options) (db *DB, err error) { - if !fileutil.Exist(dir) { - if err := os.MkdirAll(dir, 0777); err != nil { - return nil, err - } + if err := os.MkdirAll(dir, 0777); err != nil { + return nil, err } + + absdir, err := filepath.Abs(dir) + if err != nil { + return nil, err + } + lockf, err := lockfile.New(filepath.Join(absdir, "lock")) + if err != nil { + return nil, err + } + if err := lockf.TryLock(); err != nil { + return nil, errors.Wrapf(err, "open DB in %s", dir) + } + // var r prometheus.Registerer r := prometheus.DefaultRegisterer @@ -147,6 +160,7 @@ func Open(dir string, logger log.Logger, opts *Options) (db *DB, err error) { db = &DB{ dir: dir, + lockf: lockf, logger: logger, metrics: newDBMetrics(r), opts: opts, @@ -374,6 +388,8 @@ func (db *DB) Close() error { merr.Add(hb.Close()) } + merr.Add(db.lockf.Unlock()) + return merr.Err() } diff --git a/wal.go b/wal.go index 33793d782..a923f5b3d 100644 --- a/wal.go +++ b/wal.go @@ -42,7 +42,7 @@ type WAL struct { mtx sync.Mutex dirFile *os.File - files []*fileutil.LockedFile + files []*os.File logger log.Logger flushInterval time.Duration @@ -128,19 +128,19 @@ func (w *WAL) initSegments() error { } if len(fns) > 1 { for _, fn := range fns[:len(fns)-1] { - lf, err := fileutil.TryLockFile(fn, os.O_RDONLY, 0666) + f, err := os.Open(fn) if err != nil { return err } - w.files = append(w.files, lf) + w.files = append(w.files, f) } } // The most recent WAL file is the one we have to keep appending to. - lf, err := fileutil.TryLockFile(fns[len(fns)-1], os.O_RDWR, 0666) + f, err := os.OpenFile(fns[len(fns)-1], os.O_RDWR, 0666) if err != nil { return err } - w.files = append(w.files, lf) + w.files = append(w.files, f) // Consume and validate meta headers. for _, f := range w.files { @@ -187,11 +187,11 @@ func (w *WAL) cut() error { if err != nil { return err } - f, err := fileutil.LockFile(p, os.O_RDWR|os.O_CREATE, 0666) + f, err := os.Create(p) if err != nil { return err } - if err = fileutil.Preallocate(f.File, w.segmentSize, true); err != nil { + if err = fileutil.Preallocate(f, w.segmentSize, true); err != nil { return err } if err = w.dirFile.Sync(); err != nil { @@ -214,7 +214,7 @@ func (w *WAL) cut() error { return nil } -func (w *WAL) tail() *fileutil.LockedFile { +func (w *WAL) tail() *os.File { if len(w.files) == 0 { return nil } @@ -235,7 +235,7 @@ func (w *WAL) sync() error { if err := w.cur.Flush(); err != nil { return err } - return fileutil.Fdatasync(w.tail().File) + return fileutil.Fdatasync(w.tail()) } func (w *WAL) run(interval time.Duration) {