diff --git a/storage/local/flock/flock.go b/storage/local/flock/flock.go new file mode 100644 index 000000000..51e3469a9 --- /dev/null +++ b/storage/local/flock/flock.go @@ -0,0 +1,33 @@ +// Package flock provides portable file locking. It is essentially ripped out +// from the code of github.com/syndtr/goleveldb. Strange enough that the +// standard library does not provide this functionality. Once this package has +// proven to work as expected, we should probably turn it into a separate +// general purpose package for humanity. +package flock + +import ( + "os" + "path/filepath" +) + +// Releaser provides the Release method to release a file lock. +type Releaser interface { + Release() error +} + +// New locks the file with the provided name. If the file does not exist, it is +// created. The returned Releaser is used to release the lock. existed is true +// if the file to lock already existed. A non-nil error is returned if the +// locking has failed. Neither this function nor the returned Releaser is +// goroutine-safe. +func New(fileName string) (r Releaser, existed bool, err error) { + if err = os.MkdirAll(filepath.Dir(fileName), 0755); err != nil { + return + } + + _, err = os.Stat(fileName) + existed = err == nil + + r, err = newLock(fileName) + return +} diff --git a/storage/local/flock/flock_plan9.go b/storage/local/flock/flock_plan9.go new file mode 100644 index 000000000..362911bec --- /dev/null +++ b/storage/local/flock/flock_plan9.go @@ -0,0 +1,19 @@ +package flock + +import "os" + +type plan9Lock struct { + f *os.File +} + +func (l *plan9Lock) Release() error { + return l.f.Close() +} + +func newLock(fileName string) (Releaser, error) { + f, err := os.OpenFile(path, os.O_RDWR|os.O_CREATE, os.ModeExclusive|0644) + if err != nil { + return nil, err + } + return &plan9Lock{f}, nil +} diff --git a/storage/local/flock/flock_solaris.go b/storage/local/flock/flock_solaris.go new file mode 100644 index 000000000..462daea06 --- /dev/null +++ b/storage/local/flock/flock_solaris.go @@ -0,0 +1,46 @@ +// +build solaris + +package flock + +import ( + "os" + "syscall" +) + +type unixLock struct { + f *os.File +} + +func (l *unixLock) Release() error { + if err := l.set(false); err != nil { + return err + } + return l.f.Close() +} + +func (l *unixLock) set(lock bool) error { + flock := syscall.Flock_t{ + Type: syscall.F_UNLCK, + Start: 0, + Len: 0, + Whence: 1, + } + if lock { + flock.Type = syscall.F_WRLCK + } + return syscall.FcntlFlock(l.f.Fd(), syscall.F_SETLK, &flock) +} + +func newLock(fileName string) (Releaser, error) { + f, err := os.OpenFile(fileName, os.O_RDWR|os.O_CREATE, 0644) + if err != nil { + return nil, err + } + l := &unixLock{f} + err = l.set(true) + if err != nil { + f.Close() + return nil, err + } + return l, nil +} diff --git a/storage/local/flock/flock_test.go b/storage/local/flock/flock_test.go new file mode 100644 index 000000000..d50437a66 --- /dev/null +++ b/storage/local/flock/flock_test.go @@ -0,0 +1,67 @@ +package flock + +import ( + "os" + "path/filepath" + "testing" + + "github.com/prometheus/prometheus/utility/test" +) + +func TestLocking(t *testing.T) { + dir := test.NewTemporaryDirectory("test_flock", t) + defer dir.Close() + + fileName := filepath.Join(dir.Path(), "LOCK") + + if _, err := os.Stat(fileName); err == nil { + t.Fatalf("File %q unexpectedly exists.", fileName) + } + + lock, existed, err := New(fileName) + if err != nil { + t.Fatalf("Error locking file %q: %s", fileName, err) + } + if existed { + t.Errorf("File %q reported as existing during locking.", fileName) + } + + // File must now exist. + if _, err := os.Stat(fileName); err != nil { + t.Errorf("Could not stat file %q expected to exist: %s", fileName, err) + } + + // Try to lock again. + lockedAgain, existed, err := New(fileName) + if err == nil { + t.Fatalf("File %q locked twice.", fileName) + } + if lockedAgain != nil { + t.Error("Unsuccessful locking did not return nil.") + } + if !existed { + t.Errorf("Existing file %q not recognized.", fileName) + } + + if err := lock.Release(); err != nil { + t.Errorf("Error releasing lock for file %q: %s", fileName, err) + } + + // File must still exist. + if _, err := os.Stat(fileName); err != nil { + t.Errorf("Could not stat file %q expected to exist: %s", fileName, err) + } + + // Lock existing file. + lock, existed, err = New(fileName) + if err != nil { + t.Fatalf("Error locking file %q: %s", fileName, err) + } + if !existed { + t.Errorf("Existing file %q not recognized.", fileName) + } + + if err := lock.Release(); err != nil { + t.Errorf("Error releasing lock for file %q: %s", fileName, err) + } +} diff --git a/storage/local/flock/flock_unix.go b/storage/local/flock/flock_unix.go new file mode 100644 index 000000000..2bfb48fbc --- /dev/null +++ b/storage/local/flock/flock_unix.go @@ -0,0 +1,41 @@ +// +build darwin dragonfly freebsd linux netbsd openbsd + +package flock + +import ( + "os" + "syscall" +) + +type unixLock struct { + f *os.File +} + +func (l *unixLock) Release() error { + if err := l.set(false); err != nil { + return err + } + return l.f.Close() +} + +func (l *unixLock) set(lock bool) error { + how := syscall.LOCK_UN + if lock { + how = syscall.LOCK_EX + } + return syscall.Flock(int(l.f.Fd()), how|syscall.LOCK_NB) +} + +func newLock(fileName string) (Releaser, error) { + f, err := os.OpenFile(fileName, os.O_RDWR|os.O_CREATE, 0644) + if err != nil { + return nil, err + } + l := &unixLock{f} + err = l.set(true) + if err != nil { + f.Close() + return nil, err + } + return l, nil +} diff --git a/storage/local/flock/flock_windows.go b/storage/local/flock/flock_windows.go new file mode 100644 index 000000000..66aa4de2c --- /dev/null +++ b/storage/local/flock/flock_windows.go @@ -0,0 +1,23 @@ +package flock + +import "syscall" + +type windowsLock struct { + fd syscall.Handle +} + +func (fl *windowsLock) Release() error { + return syscall.Close(fl.fd) +} + +func newLock(fileName string) (Releaser, error) { + pathp, err := syscall.UTF16PtrFromString(fileName) + if err != nil { + return nil, err + } + fd, err := syscall.CreateFile(pathp, syscall.GENERIC_READ|syscall.GENERIC_WRITE, 0, nil, syscall.CREATE_ALWAYS, syscall.FILE_ATTRIBUTE_NORMAL, 0) + if err != nil { + return nil, err + } + return &windowsFileLock{fd}, nil +} diff --git a/storage/local/persistence.go b/storage/local/persistence.go index f4557d89b..6a2d2532e 100644 --- a/storage/local/persistence.go +++ b/storage/local/persistence.go @@ -21,6 +21,7 @@ import ( "math" "os" "path" + "path/filepath" "strings" "sync" "sync/atomic" @@ -32,6 +33,7 @@ import ( clientmodel "github.com/prometheus/client_golang/model" "github.com/prometheus/prometheus/storage/local/codable" + "github.com/prometheus/prometheus/storage/local/flock" "github.com/prometheus/prometheus/storage/local/index" "github.com/prometheus/prometheus/storage/metric" ) @@ -106,9 +108,11 @@ type persistence struct { indexingBatchLatency prometheus.Summary checkpointDuration prometheus.Gauge - dirtyMtx sync.Mutex // Protects dirty and becameDirty. - dirty bool // true if persistence was started in dirty state. - becameDirty bool // true if an inconsistency came up during runtime. + dirtyMtx sync.Mutex // Protects dirty and becameDirty. + dirty bool // true if persistence was started in dirty state. + becameDirty bool // true if an inconsistency came up during runtime. + dirtyFileName string // The file used for locking and to mark dirty state. + fLock flock.Releaser // The file lock to protect against concurrent usage. } // newPersistence returns a newly allocated persistence backed by local disk storage, ready to use. @@ -116,6 +120,17 @@ func newPersistence(basePath string, chunkLen int, dirty bool) (*persistence, er if err := os.MkdirAll(basePath, 0700); err != nil { return nil, err } + dirtyPath := filepath.Join(basePath, dirtyFileName) + + fLock, dirtyfileExisted, err := flock.New(dirtyPath) + if err != nil { + glog.Errorf("Could not lock %s, Prometheus already running?", dirtyPath) + return nil, err + } + if dirtyfileExisted { + dirty = true + } + archivedFingerprintToMetrics, err := index.NewFingerprintMetricIndex(basePath) if err != nil { return nil, err @@ -173,14 +188,9 @@ func newPersistence(basePath string, chunkLen int, dirty bool) (*persistence, er Name: "checkpoint_duration_milliseconds", Help: "The duration (in milliseconds) it took to checkpoint in-memory metrics and head chunks.", }), - dirty: dirty, - } - if dirtyFile, err := os.OpenFile(p.dirtyFileName(), os.O_CREATE|os.O_EXCL, 0666); err == nil { - dirtyFile.Close() - } else if os.IsExist(err) { - p.dirty = true - } else { - return nil, err + dirty: dirty, + dirtyFileName: dirtyPath, + fLock: fLock, } if p.dirty { @@ -227,12 +237,6 @@ func (p *persistence) Collect(ch chan<- prometheus.Metric) { ch <- p.checkpointDuration } -// dirtyFileName returns the name of the (empty) file used to mark the -// persistency layer as dirty. -func (p *persistence) dirtyFileName() string { - return path.Join(p.basePath, dirtyFileName) -} - // isDirty returns the dirty flag in a goroutine-safe way. func (p *persistence) isDirty() bool { p.dirtyMtx.Lock() @@ -1344,7 +1348,7 @@ func (p *persistence) close() error { close(p.indexingQueue) <-p.indexingStopped - var lastError error + var lastError, dirtyFileRemoveError error if err := p.archivedFingerprintToMetrics.Close(); err != nil { lastError = err glog.Error("Error closing archivedFingerprintToMetric index DB: ", err) @@ -1362,7 +1366,16 @@ func (p *persistence) close() error { glog.Error("Error closing labelNameToLabelValues index DB: ", err) } if lastError == nil && !p.isDirty() { - lastError = os.Remove(p.dirtyFileName()) + dirtyFileRemoveError = os.Remove(p.dirtyFileName) + } + if err := p.fLock.Release(); err != nil { + lastError = err + glog.Error("Error releasing file lock: ", err) + } + if dirtyFileRemoveError != nil { + // On Windows, removing the dirty file before unlocking is not + // possible. So remove it here if it failed above. + lastError = os.Remove(p.dirtyFileName) } return lastError }