mirror of
https://github.com/prometheus/prometheus.git
synced 2024-12-25 13:44:05 -08:00
Replace per-file locking with single PID lock file
File locks have a multitude of problems that make them hard to use correctly. As they are just advisory, they are only meaningful to prevent accidents like running the same process twice. A simple PID file lock works reliably in those cases and is simpler.
This commit is contained in:
parent
c808928b90
commit
f734773214
9
block.go
9
block.go
|
@ -7,7 +7,6 @@ import (
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
"sort"
|
"sort"
|
||||||
|
|
||||||
"github.com/coreos/etcd/pkg/fileutil"
|
|
||||||
"github.com/pkg/errors"
|
"github.com/pkg/errors"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -175,14 +174,12 @@ func indexFileName(path string) string {
|
||||||
}
|
}
|
||||||
|
|
||||||
type mmapFile struct {
|
type mmapFile struct {
|
||||||
f *fileutil.LockedFile
|
f *os.File
|
||||||
b []byte
|
b []byte
|
||||||
}
|
}
|
||||||
|
|
||||||
func openMmapFile(path string) (*mmapFile, error) {
|
func openMmapFile(path string) (*mmapFile, error) {
|
||||||
// We have to open the file in RDWR for the lock to work with fileutil.
|
f, err := os.Open(path)
|
||||||
// TODO(fabxc): use own flock call that supports multi-reader.
|
|
||||||
f, err := fileutil.TryLockFile(path, os.O_RDWR, 0666)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, errors.Wrap(err, "try lock file")
|
return nil, errors.Wrap(err, "try lock file")
|
||||||
}
|
}
|
||||||
|
@ -191,7 +188,7 @@ func openMmapFile(path string) (*mmapFile, error) {
|
||||||
return nil, errors.Wrap(err, "stat")
|
return nil, errors.Wrap(err, "stat")
|
||||||
}
|
}
|
||||||
|
|
||||||
b, err := mmap(f.File, int(info.Size()))
|
b, err := mmap(f, int(info.Size()))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, errors.Wrap(err, "mmap")
|
return nil, errors.Wrap(err, "mmap")
|
||||||
}
|
}
|
||||||
|
|
|
@ -167,11 +167,11 @@ func (c *compactor) compact(dir string, blocks ...Block) (err error) {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
chunkf, err := fileutil.LockFile(chunksFileName(dir), os.O_WRONLY|os.O_CREATE, 0666)
|
chunkf, err := os.OpenFile(chunksFileName(dir), os.O_WRONLY|os.O_CREATE, 0666)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return errors.Wrap(err, "create chunk file")
|
return errors.Wrap(err, "create chunk file")
|
||||||
}
|
}
|
||||||
indexf, err := fileutil.LockFile(indexFileName(dir), os.O_WRONLY|os.O_CREATE, 0666)
|
indexf, err := os.OpenFile(indexFileName(dir), os.O_WRONLY|os.O_CREATE, 0666)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return errors.Wrap(err, "create index file")
|
return errors.Wrap(err, "create index file")
|
||||||
}
|
}
|
||||||
|
@ -189,10 +189,10 @@ func (c *compactor) compact(dir string, blocks ...Block) (err error) {
|
||||||
if err = indexw.Close(); err != nil {
|
if err = indexw.Close(); err != nil {
|
||||||
return errors.Wrap(err, "close index writer")
|
return errors.Wrap(err, "close index writer")
|
||||||
}
|
}
|
||||||
if err = fileutil.Fsync(chunkf.File); err != nil {
|
if err = fileutil.Fsync(chunkf); err != nil {
|
||||||
return errors.Wrap(err, "fsync chunk file")
|
return errors.Wrap(err, "fsync chunk file")
|
||||||
}
|
}
|
||||||
if err = fileutil.Fsync(indexf.File); err != nil {
|
if err = fileutil.Fsync(indexf); err != nil {
|
||||||
return errors.Wrap(err, "fsync index file")
|
return errors.Wrap(err, "fsync index file")
|
||||||
}
|
}
|
||||||
if err = chunkf.Close(); err != nil {
|
if err = chunkf.Close(); err != nil {
|
||||||
|
|
24
db.go
24
db.go
|
@ -21,6 +21,7 @@ import (
|
||||||
"github.com/coreos/etcd/pkg/fileutil"
|
"github.com/coreos/etcd/pkg/fileutil"
|
||||||
"github.com/fabxc/tsdb/labels"
|
"github.com/fabxc/tsdb/labels"
|
||||||
"github.com/go-kit/kit/log"
|
"github.com/go-kit/kit/log"
|
||||||
|
"github.com/nightlyone/lockfile"
|
||||||
"github.com/pkg/errors"
|
"github.com/pkg/errors"
|
||||||
"github.com/prometheus/client_golang/prometheus"
|
"github.com/prometheus/client_golang/prometheus"
|
||||||
)
|
)
|
||||||
|
@ -86,6 +87,7 @@ const sep = '\xff'
|
||||||
// a hashed partition of a seriedb.
|
// a hashed partition of a seriedb.
|
||||||
type DB struct {
|
type DB struct {
|
||||||
dir string
|
dir string
|
||||||
|
lockf lockfile.Lockfile
|
||||||
logger log.Logger
|
logger log.Logger
|
||||||
metrics *dbMetrics
|
metrics *dbMetrics
|
||||||
opts *Options
|
opts *Options
|
||||||
|
@ -130,11 +132,22 @@ func newDBMetrics(r prometheus.Registerer) *dbMetrics {
|
||||||
|
|
||||||
// Open returns a new DB in the given directory.
|
// Open returns a new DB in the given directory.
|
||||||
func Open(dir string, logger log.Logger, opts *Options) (db *DB, err error) {
|
func Open(dir string, logger log.Logger, opts *Options) (db *DB, err error) {
|
||||||
if !fileutil.Exist(dir) {
|
if err := os.MkdirAll(dir, 0777); err != nil {
|
||||||
if err := os.MkdirAll(dir, 0777); err != nil {
|
return nil, err
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
absdir, err := filepath.Abs(dir)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
lockf, err := lockfile.New(filepath.Join(absdir, "lock"))
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if err := lockf.TryLock(); err != nil {
|
||||||
|
return nil, errors.Wrapf(err, "open DB in %s", dir)
|
||||||
|
}
|
||||||
|
|
||||||
// var r prometheus.Registerer
|
// var r prometheus.Registerer
|
||||||
r := prometheus.DefaultRegisterer
|
r := prometheus.DefaultRegisterer
|
||||||
|
|
||||||
|
@ -147,6 +160,7 @@ func Open(dir string, logger log.Logger, opts *Options) (db *DB, err error) {
|
||||||
|
|
||||||
db = &DB{
|
db = &DB{
|
||||||
dir: dir,
|
dir: dir,
|
||||||
|
lockf: lockf,
|
||||||
logger: logger,
|
logger: logger,
|
||||||
metrics: newDBMetrics(r),
|
metrics: newDBMetrics(r),
|
||||||
opts: opts,
|
opts: opts,
|
||||||
|
@ -374,6 +388,8 @@ func (db *DB) Close() error {
|
||||||
merr.Add(hb.Close())
|
merr.Add(hb.Close())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
merr.Add(db.lockf.Unlock())
|
||||||
|
|
||||||
return merr.Err()
|
return merr.Err()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
18
wal.go
18
wal.go
|
@ -42,7 +42,7 @@ type WAL struct {
|
||||||
mtx sync.Mutex
|
mtx sync.Mutex
|
||||||
|
|
||||||
dirFile *os.File
|
dirFile *os.File
|
||||||
files []*fileutil.LockedFile
|
files []*os.File
|
||||||
|
|
||||||
logger log.Logger
|
logger log.Logger
|
||||||
flushInterval time.Duration
|
flushInterval time.Duration
|
||||||
|
@ -128,19 +128,19 @@ func (w *WAL) initSegments() error {
|
||||||
}
|
}
|
||||||
if len(fns) > 1 {
|
if len(fns) > 1 {
|
||||||
for _, fn := range fns[:len(fns)-1] {
|
for _, fn := range fns[:len(fns)-1] {
|
||||||
lf, err := fileutil.TryLockFile(fn, os.O_RDONLY, 0666)
|
f, err := os.Open(fn)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
w.files = append(w.files, lf)
|
w.files = append(w.files, f)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// The most recent WAL file is the one we have to keep appending to.
|
// The most recent WAL file is the one we have to keep appending to.
|
||||||
lf, err := fileutil.TryLockFile(fns[len(fns)-1], os.O_RDWR, 0666)
|
f, err := os.OpenFile(fns[len(fns)-1], os.O_RDWR, 0666)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
w.files = append(w.files, lf)
|
w.files = append(w.files, f)
|
||||||
|
|
||||||
// Consume and validate meta headers.
|
// Consume and validate meta headers.
|
||||||
for _, f := range w.files {
|
for _, f := range w.files {
|
||||||
|
@ -187,11 +187,11 @@ func (w *WAL) cut() error {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
f, err := fileutil.LockFile(p, os.O_RDWR|os.O_CREATE, 0666)
|
f, err := os.Create(p)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if err = fileutil.Preallocate(f.File, w.segmentSize, true); err != nil {
|
if err = fileutil.Preallocate(f, w.segmentSize, true); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if err = w.dirFile.Sync(); err != nil {
|
if err = w.dirFile.Sync(); err != nil {
|
||||||
|
@ -214,7 +214,7 @@ func (w *WAL) cut() error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (w *WAL) tail() *fileutil.LockedFile {
|
func (w *WAL) tail() *os.File {
|
||||||
if len(w.files) == 0 {
|
if len(w.files) == 0 {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -235,7 +235,7 @@ func (w *WAL) sync() error {
|
||||||
if err := w.cur.Flush(); err != nil {
|
if err := w.cur.Flush(); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
return fileutil.Fdatasync(w.tail().File)
|
return fileutil.Fdatasync(w.tail())
|
||||||
}
|
}
|
||||||
|
|
||||||
func (w *WAL) run(interval time.Duration) {
|
func (w *WAL) run(interval time.Duration) {
|
||||||
|
|
Loading…
Reference in a new issue