diff --git a/cmd/prometheus/main.go b/cmd/prometheus/main.go index b325b7efe5..a1636fc9c7 100644 --- a/cmd/prometheus/main.go +++ b/cmd/prometheus/main.go @@ -333,6 +333,9 @@ func main() { "Maximum age samples may be before being forcibly deleted when the WAL is truncated"). SetValue(&cfg.agent.MaxWALTime) + agentOnlyFlag(a, "storage.agent.no-lockfile", "Do not create lockfile in data directory."). + Default("false").BoolVar(&cfg.agent.NoLockfile) + a.Flag("storage.remote.flush-deadline", "How long to wait flushing sample on shutdown or config reload."). Default("1m").PlaceHolder("").SetValue(&cfg.RemoteFlushDeadline) @@ -1505,6 +1508,7 @@ type agentOptions struct { StripeSize int TruncateFrequency model.Duration MinWALTime, MaxWALTime model.Duration + NoLockfile bool } func (opts agentOptions) ToAgentOptions() agent.Options { @@ -1515,6 +1519,7 @@ func (opts agentOptions) ToAgentOptions() agent.Options { TruncateFrequency: time.Duration(opts.TruncateFrequency), MinWALTime: durationToInt64Millis(time.Duration(opts.MinWALTime)), MaxWALTime: durationToInt64Millis(time.Duration(opts.MaxWALTime)), + NoLockfile: opts.NoLockfile, } } diff --git a/tsdb/agent/db.go b/tsdb/agent/db.go index b862494276..f81f55248b 100644 --- a/tsdb/agent/db.go +++ b/tsdb/agent/db.go @@ -34,7 +34,9 @@ import ( "github.com/prometheus/prometheus/storage/remote" "github.com/prometheus/prometheus/tsdb" "github.com/prometheus/prometheus/tsdb/chunks" + tsdb_errors "github.com/prometheus/prometheus/tsdb/errors" "github.com/prometheus/prometheus/tsdb/record" + "github.com/prometheus/prometheus/tsdb/tsdbutil" "github.com/prometheus/prometheus/tsdb/wal" ) @@ -66,6 +68,9 @@ type Options struct { // Shortest and longest amount of time data can exist in the WAL before being // deleted. MinWALTime, MaxWALTime int64 + + // NoLockfile disables creation and consideration of a lock file. + NoLockfile bool } // DefaultOptions used for the WAL storage. They are sane for setups using @@ -78,6 +83,7 @@ func DefaultOptions() *Options { TruncateFrequency: DefaultTruncateFrequency, MinWALTime: DefaultMinWALTime, MaxWALTime: DefaultMaxWALTime, + NoLockfile: false, } } @@ -187,7 +193,8 @@ type DB struct { opts *Options rs *remote.Storage - wal *wal.WAL + wal *wal.WAL + locker *tsdbutil.DirLocker appenderPool sync.Pool bufPool sync.Pool @@ -208,6 +215,16 @@ type DB struct { func Open(l log.Logger, reg prometheus.Registerer, rs *remote.Storage, dir string, opts *Options) (*DB, error) { opts = validateOptions(opts) + locker, err := tsdbutil.NewDirLocker(dir, "agent", l, reg) + if err != nil { + return nil, err + } + if !opts.NoLockfile { + if err := locker.Lock(); err != nil { + return nil, err + } + } + // remote_write expects WAL to be stored in a "wal" subdirectory of the main storage. dir = filepath.Join(dir, "wal") @@ -221,7 +238,8 @@ func Open(l log.Logger, reg prometheus.Registerer, rs *remote.Storage, dir strin opts: opts, rs: rs, - wal: w, + wal: w, + locker: locker, nextRef: atomic.NewUint64(0), series: newStripeSeries(opts.StripeSize), @@ -641,7 +659,7 @@ func (db *DB) Close() error { db.metrics.Unregister() - return db.wal.Close() + return tsdb_errors.NewMulti(db.locker.Release(), db.wal.Close()).Err() } type appender struct { diff --git a/tsdb/agent/db_test.go b/tsdb/agent/db_test.go index 3ebf4785fa..4a196180db 100644 --- a/tsdb/agent/db_test.go +++ b/tsdb/agent/db_test.go @@ -31,6 +31,7 @@ import ( "github.com/prometheus/prometheus/tsdb/record" "github.com/prometheus/prometheus/tsdb/tsdbutil" "github.com/prometheus/prometheus/tsdb/wal" + "github.com/prometheus/prometheus/util/testutil" ) func TestUnsupported(t *testing.T) { @@ -80,9 +81,6 @@ func TestCommit(t *testing.T) { s, err := Open(logger, reg, remoteStorage, promAgentDir, opts) require.NoError(t, err) - defer func() { - require.NoError(t, s.Close()) - }() a := s.Appender(context.TODO()) @@ -94,9 +92,11 @@ func TestCommit(t *testing.T) { _, err := a.Append(0, lset, sample[0].T(), sample[0].V()) require.NoError(t, err) } - require.NoError(t, a.Commit()) } + require.NoError(t, a.Commit()) + require.NoError(t, s.Close()) + // Read records from WAL and check for expected count of series and samples. walSeriesCount := 0 walSamplesCount := 0 @@ -176,9 +176,6 @@ func TestRollback(t *testing.T) { s, err := Open(logger, reg, remoteStorage, promAgentDir, opts) require.NoError(t, err) - defer func() { - require.NoError(t, s.Close()) - }() a := s.Appender(context.TODO()) @@ -193,6 +190,7 @@ func TestRollback(t *testing.T) { } require.NoError(t, a.Rollback()) + require.NoError(t, s.Close()) // Read records from WAL and check for expected count of series and samples. walSeriesCount := 0 @@ -380,9 +378,6 @@ func TestWALReplay(t *testing.T) { s, err := Open(logger, reg, remoteStorage, promAgentDir, opts) require.NoError(t, err) - defer func() { - require.NoError(t, s.Close()) - }() a := s.Appender(context.TODO()) @@ -396,22 +391,26 @@ func TestWALReplay(t *testing.T) { } require.NoError(t, a.Commit()) - require.NoError(t, s.Close()) restartOpts := DefaultOptions() restartLogger := log.NewNopLogger() restartReg := prometheus.NewRegistry() - s, err = Open(restartLogger, restartReg, nil, promAgentDir, restartOpts) + // Open a new DB with the same WAL to check that series from the previous DB + // get replayed. + replayDB, err := Open(restartLogger, restartReg, nil, promAgentDir, restartOpts) require.NoError(t, err) + defer func() { + require.NoError(t, replayDB.Close()) + }() // Check if all the series are retrieved back from the WAL. m := gatherFamily(t, restartReg, "prometheus_agent_active_series") require.Equal(t, float64(numSeries), m.Metric[0].Gauge.GetValue(), "agent wal replay mismatch of active series count") // Check if lastTs of the samples retrieved from the WAL is retained. - metrics := s.series.series + metrics := replayDB.series.series for i := 0; i < len(metrics); i++ { mp := metrics[i] for _, v := range mp { @@ -420,6 +419,28 @@ func TestWALReplay(t *testing.T) { } } +func TestLockfile(t *testing.T) { + tsdbutil.TestDirLockerUsage(t, func(t *testing.T, data string, createLock bool) (*tsdbutil.DirLocker, testutil.Closer) { + logger := log.NewNopLogger() + reg := prometheus.NewRegistry() + rs := remote.NewStorage(logger, reg, startTime, data, time.Second*30, nil) + t.Cleanup(func() { + require.NoError(t, rs.Close()) + }) + + opts := DefaultOptions() + opts.NoLockfile = !createLock + + // Create the DB. This should create lockfile and its metrics. + db, err := Open(logger, nil, rs, data, opts) + require.NoError(t, err) + + return db.locker, testutil.NewCallbackCloser(func() { + require.NoError(t, db.Close()) + }) + }) +} + func startTime() (int64, error) { return time.Now().Unix() * 1000, nil } diff --git a/tsdb/db.go b/tsdb/db.go index 64e2a2bfb9..df032f006e 100644 --- a/tsdb/db.go +++ b/tsdb/db.go @@ -43,6 +43,7 @@ import ( tsdb_errors "github.com/prometheus/prometheus/tsdb/errors" "github.com/prometheus/prometheus/tsdb/fileutil" _ "github.com/prometheus/prometheus/tsdb/goversion" // Load the package into main to make sure minium Go version is met. + "github.com/prometheus/prometheus/tsdb/tsdbutil" "github.com/prometheus/prometheus/tsdb/wal" ) @@ -59,10 +60,6 @@ const ( tmpForCreationBlockDirSuffix = ".tmp-for-creation" // Pre-2.21 tmp dir suffix, used in clean-up functions. tmpLegacy = ".tmp" - - lockfileDisabled = -1 - lockfileReplaced = 0 - lockfileCreatedCleanly = 1 ) // ErrNotReady is returned if the underlying storage is not ready yet. @@ -162,9 +159,8 @@ type BlocksToDeleteFunc func(blocks []*Block) map[ulid.ULID]struct{} // DB handles reads and writes of time series falling into // a hashed partition of a seriedb. type DB struct { - dir string - lockf fileutil.Releaser - lockfPath string + dir string + locker *tsdbutil.DirLocker logger log.Logger metrics *dbMetrics @@ -196,20 +192,19 @@ type DB struct { } type dbMetrics struct { - loadedBlocks prometheus.GaugeFunc - symbolTableSize prometheus.GaugeFunc - reloads prometheus.Counter - reloadsFailed prometheus.Counter - compactionsFailed prometheus.Counter - compactionsTriggered prometheus.Counter - compactionsSkipped prometheus.Counter - sizeRetentionCount prometheus.Counter - timeRetentionCount prometheus.Counter - startTime prometheus.GaugeFunc - tombCleanTimer prometheus.Histogram - blocksBytes prometheus.Gauge - maxBytes prometheus.Gauge - lockfileCreatedCleanly prometheus.Gauge + loadedBlocks prometheus.GaugeFunc + symbolTableSize prometheus.GaugeFunc + reloads prometheus.Counter + reloadsFailed prometheus.Counter + compactionsFailed prometheus.Counter + compactionsTriggered prometheus.Counter + compactionsSkipped prometheus.Counter + sizeRetentionCount prometheus.Counter + timeRetentionCount prometheus.Counter + startTime prometheus.GaugeFunc + tombCleanTimer prometheus.Histogram + blocksBytes prometheus.Gauge + maxBytes prometheus.Gauge } func newDBMetrics(db *DB, r prometheus.Registerer) *dbMetrics { @@ -287,10 +282,6 @@ func newDBMetrics(db *DB, r prometheus.Registerer) *dbMetrics { Name: "prometheus_tsdb_size_retentions_total", Help: "The number of times that blocks were deleted because the maximum number of bytes was exceeded.", }) - m.lockfileCreatedCleanly = prometheus.NewGauge(prometheus.GaugeOpts{ - Name: "prometheus_tsdb_clean_start", - Help: "-1: lockfile is disabled. 0: a lockfile from a previous execution was replaced. 1: lockfile creation was clean", - }) if r != nil { r.MustRegister( @@ -307,7 +298,6 @@ func newDBMetrics(db *DB, r prometheus.Registerer) *dbMetrics { m.tombCleanTimer, m.blocksBytes, m.maxBytes, - m.lockfileCreatedCleanly, ) } return m @@ -669,29 +659,17 @@ func open(dir string, l log.Logger, r prometheus.Registerer, opts *Options, rngs db.blocksToDelete = DefaultBlocksToDelete(db) } - lockfileCreationStatus := lockfileDisabled + var err error + db.locker, err = tsdbutil.NewDirLocker(dir, "tsdb", db.logger, r) + if err != nil { + return nil, err + } if !opts.NoLockfile { - absdir, err := filepath.Abs(dir) - if err != nil { + if err := db.locker.Lock(); err != nil { return nil, err } - db.lockfPath = filepath.Join(absdir, "lock") - - if _, err := os.Stat(db.lockfPath); err == nil { - level.Warn(db.logger).Log("msg", "A TSDB lockfile from a previous execution already existed. It was replaced", "file", db.lockfPath) - lockfileCreationStatus = lockfileReplaced - } else { - lockfileCreationStatus = lockfileCreatedCleanly - } - - lockf, _, err := fileutil.Flock(db.lockfPath) - if err != nil { - return nil, errors.Wrap(err, "lock DB directory") - } - db.lockf = lockf } - var err error ctx, cancel := context.WithCancel(context.Background()) db.compactor, err = NewLeveledCompactorWithChunkSize(ctx, r, l, rngs, db.chunkPool, opts.MaxBlockChunkSegmentSize, nil) if err != nil { @@ -735,7 +713,6 @@ func open(dir string, l log.Logger, r prometheus.Registerer, opts *Options, rngs if maxBytes < 0 { maxBytes = 0 } - db.metrics.lockfileCreatedCleanly.Set(float64(lockfileCreationStatus)) db.metrics.maxBytes.Set(float64(maxBytes)) if err := db.reload(); err != nil { @@ -1448,11 +1425,7 @@ func (db *DB) Close() error { g.Go(pb.Close) } - errs := tsdb_errors.NewMulti(g.Wait()) - if db.lockf != nil { - errs.Add(db.lockf.Release()) - errs.Add(os.Remove(db.lockfPath)) - } + errs := tsdb_errors.NewMulti(g.Wait(), db.locker.Release()) if db.head != nil { errs.Add(db.head.Close()) } diff --git a/tsdb/db_test.go b/tsdb/db_test.go index d4f134b77e..4448c65230 100644 --- a/tsdb/db_test.go +++ b/tsdb/db_test.go @@ -3116,80 +3116,30 @@ func TestNoPanicOnTSDBOpenError(t *testing.T) { require.NoError(t, os.RemoveAll(tmpdir)) }) - absdir, err := filepath.Abs(tmpdir) - require.NoError(t, err) - // Taking the file lock will cause TSDB startup error. - lockf, _, err := fileutil.Flock(filepath.Join(absdir, "lock")) + // Taking the lock will cause a TSDB startup error. + l, err := tsdbutil.NewDirLocker(tmpdir, "tsdb", log.NewNopLogger(), nil) require.NoError(t, err) + require.NoError(t, l.Lock()) _, err = Open(tmpdir, nil, nil, DefaultOptions(), nil) require.Error(t, err) - require.NoError(t, lockf.Release()) + require.NoError(t, l.Release()) } -func TestLockfileMetric(t *testing.T) { - cases := []struct { - fileAlreadyExists bool - lockFileDisabled bool - expectedValue int - }{ - { - fileAlreadyExists: false, - lockFileDisabled: false, - expectedValue: lockfileCreatedCleanly, - }, - { - fileAlreadyExists: true, - lockFileDisabled: false, - expectedValue: lockfileReplaced, - }, - { - fileAlreadyExists: true, - lockFileDisabled: true, - expectedValue: lockfileDisabled, - }, - { - fileAlreadyExists: false, - lockFileDisabled: true, - expectedValue: lockfileDisabled, - }, - } +func TestLockfile(t *testing.T) { + tsdbutil.TestDirLockerUsage(t, func(t *testing.T, data string, createLock bool) (*tsdbutil.DirLocker, testutil.Closer) { + opts := DefaultOptions() + opts.NoLockfile = !createLock - for _, c := range cases { - t.Run(fmt.Sprintf("%+v", c), func(t *testing.T) { - tmpdir, err := ioutil.TempDir("", "test") - require.NoError(t, err) - t.Cleanup(func() { - require.NoError(t, os.RemoveAll(tmpdir)) - }) - absdir, err := filepath.Abs(tmpdir) - require.NoError(t, err) + // Create the DB. This should create lockfile and its metrics. + db, err := Open(data, nil, nil, opts, nil) + require.NoError(t, err) - // Test preconditions (file already exists + lockfile option) - lockfilePath := filepath.Join(absdir, "lock") - if c.fileAlreadyExists { - err = ioutil.WriteFile(lockfilePath, []byte{}, 0o644) - require.NoError(t, err) - } - opts := DefaultOptions() - opts.NoLockfile = c.lockFileDisabled - - // Create the DB, this should create a lockfile and the metrics - db, err := Open(tmpdir, nil, nil, opts, nil) - require.NoError(t, err) - require.Equal(t, float64(c.expectedValue), prom_testutil.ToFloat64(db.metrics.lockfileCreatedCleanly)) - - // Close the DB, this should delete the lockfile + return db.locker, testutil.NewCallbackCloser(func() { require.NoError(t, db.Close()) - - // Check that the lockfile is always deleted - if !c.lockFileDisabled { - _, err = os.Stat(lockfilePath) - require.Error(t, err, "lockfile was not deleted") - } }) - } + }) } func TestQuerier_ShouldNotPanicIfHeadChunkIsTruncatedWhileReadingQueriedChunks(t *testing.T) { diff --git a/tsdb/tsdbutil/dir_locker.go b/tsdb/tsdbutil/dir_locker.go new file mode 100644 index 0000000000..155f586415 --- /dev/null +++ b/tsdb/tsdbutil/dir_locker.go @@ -0,0 +1,104 @@ +// Copyright 2021 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package tsdbutil + +import ( + "fmt" + "os" + "path/filepath" + + "github.com/go-kit/log" + "github.com/go-kit/log/level" + "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" + + tsdb_errors "github.com/prometheus/prometheus/tsdb/errors" + "github.com/prometheus/prometheus/tsdb/fileutil" +) + +const ( + lockfileDisabled = -1 + lockfileReplaced = 0 + lockfileCreatedCleanly = 1 +) + +type DirLocker struct { + logger log.Logger + + createdCleanly prometheus.Gauge + + releaser fileutil.Releaser + path string +} + +// NewDirLocker creates a DirLocker that can obtain an exclusive lock on dir. +func NewDirLocker(dir, subsystem string, l log.Logger, r prometheus.Registerer) (*DirLocker, error) { + lock := &DirLocker{ + logger: l, + createdCleanly: prometheus.NewGauge(prometheus.GaugeOpts{ + Name: fmt.Sprintf("prometheus_%s_clean_start", subsystem), + Help: "-1: lockfile is disabled. 0: a lockfile from a previous execution was replaced. 1: lockfile creation was clean", + }), + } + + if r != nil { + r.MustRegister(lock.createdCleanly) + } + + lock.createdCleanly.Set(lockfileDisabled) + + absdir, err := filepath.Abs(dir) + if err != nil { + return nil, err + } + lock.path = filepath.Join(absdir, "lock") + + return lock, nil +} + +// Lock obtains the lock on the locker directory. +func (l *DirLocker) Lock() error { + if l.releaser != nil { + return errors.New("DB lock already obtained") + } + + if _, err := os.Stat(l.path); err == nil { + level.Warn(l.logger).Log("msg", "A lockfile from a previous execution already existed. It was replaced", "file", l.path) + + l.createdCleanly.Set(lockfileReplaced) + } else { + l.createdCleanly.Set(lockfileCreatedCleanly) + } + + lockf, _, err := fileutil.Flock(l.path) + if err != nil { + return errors.Wrap(err, "lock DB directory") + } + l.releaser = lockf + return nil +} + +// Release releases the lock. No-op if the lock is not held. +func (l *DirLocker) Release() error { + if l.releaser == nil { + return nil + } + + errs := tsdb_errors.NewMulti() + errs.Add(l.releaser.Release()) + errs.Add(os.Remove(l.path)) + + l.releaser = nil + return errs.Err() +} diff --git a/tsdb/tsdbutil/dir_locker_test.go b/tsdb/tsdbutil/dir_locker_test.go new file mode 100644 index 0000000000..fc7d905b2d --- /dev/null +++ b/tsdb/tsdbutil/dir_locker_test.go @@ -0,0 +1,38 @@ +// Copyright 2021 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package tsdbutil + +import ( + "testing" + + "github.com/go-kit/log" + "github.com/stretchr/testify/require" + + "github.com/prometheus/prometheus/util/testutil" +) + +func TestLockfile(t *testing.T) { + TestDirLockerUsage(t, func(t *testing.T, data string, createLock bool) (*DirLocker, testutil.Closer) { + locker, err := NewDirLocker(data, "tsdbutil", log.NewNopLogger(), nil) + require.NoError(t, err) + + if createLock { + require.NoError(t, locker.Lock()) + } + + return locker, testutil.NewCallbackCloser(func() { + require.NoError(t, locker.Release()) + }) + }) +} diff --git a/tsdb/tsdbutil/dir_locker_testutil.go b/tsdb/tsdbutil/dir_locker_testutil.go new file mode 100644 index 0000000000..cbb21e254f --- /dev/null +++ b/tsdb/tsdbutil/dir_locker_testutil.go @@ -0,0 +1,91 @@ +// Copyright 2021 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package tsdbutil + +import ( + "fmt" + "io/ioutil" + "os" + "testing" + + "github.com/go-kit/log" + prom_testutil "github.com/prometheus/client_golang/prometheus/testutil" + "github.com/stretchr/testify/require" + + "github.com/prometheus/prometheus/util/testutil" +) + +// TestDirLockerUsage performs a set of tests which guarantee correct usage of +// DirLocker. open should use data as the storage directory, and createLock +// to determine if a lock file should be used. +func TestDirLockerUsage(t *testing.T, open func(t *testing.T, data string, createLock bool) (*DirLocker, testutil.Closer)) { + t.Helper() + + cases := []struct { + fileAlreadyExists bool + lockFileDisabled bool + expectedValue int + }{ + { + fileAlreadyExists: false, + lockFileDisabled: false, + expectedValue: lockfileCreatedCleanly, + }, + { + fileAlreadyExists: true, + lockFileDisabled: false, + expectedValue: lockfileReplaced, + }, + { + fileAlreadyExists: true, + lockFileDisabled: true, + expectedValue: lockfileDisabled, + }, + { + fileAlreadyExists: false, + lockFileDisabled: true, + expectedValue: lockfileDisabled, + }, + } + + for _, c := range cases { + t.Run(fmt.Sprintf("%+v", c), func(t *testing.T) { + tmpdir, err := ioutil.TempDir("", "test") + require.NoError(t, err) + t.Cleanup(func() { + require.NoError(t, os.RemoveAll(tmpdir)) + }) + + // Test preconditions (file already exists + lockfile option) + if c.fileAlreadyExists { + tmpLocker, err := NewDirLocker(tmpdir, "tsdb", log.NewNopLogger(), nil) + require.NoError(t, err) + err = ioutil.WriteFile(tmpLocker.path, []byte{}, 0o644) + require.NoError(t, err) + } + + locker, closer := open(t, tmpdir, !c.lockFileDisabled) + require.Equal(t, float64(c.expectedValue), prom_testutil.ToFloat64(locker.createdCleanly)) + + // Close the client. This should delete the lockfile. + closer.Close() + + // Check that the lockfile is always deleted + if !c.lockFileDisabled { + _, err = os.Stat(locker.path) + require.True(t, os.IsNotExist(err), "lockfile was not deleted") + } + }) + } +}