Share TSDB locker code with agent (#9623)

* share tsdb db locker code with agent

Closes #9616

Signed-off-by: Robert Fratto <robertfratto@gmail.com>

* add flag to disable lockfile for agent

Signed-off-by: Robert Fratto <robertfratto@gmail.com>

* use agentOnlySetting instead of PreAction

Signed-off-by: Robert Fratto <robertfratto@gmail.com>

* tsdb: address review feedback

1. Rename Locker to DirLocker
2. Move DirLocker to tsdb/tsdbutil
3. Name metric using fmt.Sprintf
4. Refine error checking in DirLocker test

Signed-off-by: Robert Fratto <robertfratto@gmail.com>

* tsdb: create test utilities to assert expected DirLocker behavior

Signed-off-by: Robert Fratto <robertfratto@gmail.com>

* tsdb/tsdbutil: fix lint errors

Signed-off-by: Robert Fratto <robertfratto@gmail.com>

* tsdb/agent: fix windows test failure

Use new DB variable instead of overriding the old one.

Signed-off-by: Robert Fratto <robertfratto@gmail.com>
Robert Fratto 2021-11-11 11:45:25 -05:00 committed by GitHub
parent 5a9be19062
commit 72a9f7fee9
8 changed files with 329 additions and 129 deletions
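For orientation, here is a minimal sketch of the locking pattern this commit extracts into tsdb/tsdbutil and reuses from both tsdb.Open and agent.Open. The DirLocker names and the metric come from the diff below; the surrounding openWithLock helper and the example package are illustrative only, not part of the change.

package example

import (
	"github.com/go-kit/log"
	"github.com/prometheus/client_golang/prometheus"

	"github.com/prometheus/prometheus/tsdb/tsdbutil"
)

// openWithLock mirrors the shape of the locking code that tsdb.Open and
// agent.Open share after this change (illustrative helper only).
func openWithLock(dir, subsystem string, logger log.Logger, reg prometheus.Registerer, noLockfile bool) (*tsdbutil.DirLocker, error) {
	locker, err := tsdbutil.NewDirLocker(dir, subsystem, logger, reg)
	if err != nil {
		return nil, err
	}
	if !noLockfile {
		// Takes a flock on <dir>/lock and sets prometheus_<subsystem>_clean_start.
		if err := locker.Lock(); err != nil {
			return nil, err
		}
	}
	// The owning DB's Close() later calls locker.Release(), which drops the
	// flock and removes the lock file.
	return locker, nil
}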

cmd/prometheus/main.go

@ -333,6 +333,9 @@ func main() {
"Maximum age samples may be before being forcibly deleted when the WAL is truncated").
SetValue(&cfg.agent.MaxWALTime)
agentOnlyFlag(a, "storage.agent.no-lockfile", "Do not create lockfile in data directory.").
Default("false").BoolVar(&cfg.agent.NoLockfile)
a.Flag("storage.remote.flush-deadline", "How long to wait flushing sample on shutdown or config reload.").
Default("1m").PlaceHolder("<duration>").SetValue(&cfg.RemoteFlushDeadline)
@ -1505,6 +1508,7 @@ type agentOptions struct {
StripeSize int
TruncateFrequency model.Duration
MinWALTime, MaxWALTime model.Duration
NoLockfile bool
}
func (opts agentOptions) ToAgentOptions() agent.Options {
@ -1515,6 +1519,7 @@ func (opts agentOptions) ToAgentOptions() agent.Options {
TruncateFrequency: time.Duration(opts.TruncateFrequency),
MinWALTime: durationToInt64Millis(time.Duration(opts.MinWALTime)),
MaxWALTime: durationToInt64Millis(time.Duration(opts.MaxWALTime)),
NoLockfile: opts.NoLockfile,
}
}
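The flag only flips a boolean on the agent options; a rough sketch of how that value would travel into agent.Open is below. agent.DefaultOptions, Options.NoLockfile, and the Open signature appear in the tsdb/agent/db.go diff that follows; the openAgentStorage helper itself is hypothetical.

package example

import (
	"github.com/go-kit/log"
	"github.com/prometheus/client_golang/prometheus"

	"github.com/prometheus/prometheus/storage/remote"
	"github.com/prometheus/prometheus/tsdb/agent"
)

// openAgentStorage is a hypothetical helper showing how the value of
// --storage.agent.no-lockfile reaches the agent storage.
func openAgentStorage(logger log.Logger, rs *remote.Storage, dir string, noLockfile bool) (*agent.DB, error) {
	opts := agent.DefaultOptions()
	// When true, agent.Open skips creating <dir>/lock entirely.
	opts.NoLockfile = noLockfile
	return agent.Open(logger, prometheus.DefaultRegisterer, rs, dir, opts)
}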

tsdb/agent/db.go

@ -34,7 +34,9 @@ import (
"github.com/prometheus/prometheus/storage/remote"
"github.com/prometheus/prometheus/tsdb"
"github.com/prometheus/prometheus/tsdb/chunks"
tsdb_errors "github.com/prometheus/prometheus/tsdb/errors"
"github.com/prometheus/prometheus/tsdb/record"
"github.com/prometheus/prometheus/tsdb/tsdbutil"
"github.com/prometheus/prometheus/tsdb/wal"
)
@ -66,6 +68,9 @@ type Options struct {
// Shortest and longest amount of time data can exist in the WAL before being
// deleted.
MinWALTime, MaxWALTime int64
// NoLockfile disables creation and consideration of a lock file.
NoLockfile bool
}
// DefaultOptions used for the WAL storage. They are sane for setups using
@ -78,6 +83,7 @@ func DefaultOptions() *Options {
TruncateFrequency: DefaultTruncateFrequency,
MinWALTime: DefaultMinWALTime,
MaxWALTime: DefaultMaxWALTime,
NoLockfile: false,
}
}
@ -187,7 +193,8 @@ type DB struct {
opts *Options
rs *remote.Storage
wal *wal.WAL
wal *wal.WAL
locker *tsdbutil.DirLocker
appenderPool sync.Pool
bufPool sync.Pool
@ -208,6 +215,16 @@ type DB struct {
func Open(l log.Logger, reg prometheus.Registerer, rs *remote.Storage, dir string, opts *Options) (*DB, error) {
opts = validateOptions(opts)
locker, err := tsdbutil.NewDirLocker(dir, "agent", l, reg)
if err != nil {
return nil, err
}
if !opts.NoLockfile {
if err := locker.Lock(); err != nil {
return nil, err
}
}
// remote_write expects WAL to be stored in a "wal" subdirectory of the main storage.
dir = filepath.Join(dir, "wal")
@ -221,7 +238,8 @@ func Open(l log.Logger, reg prometheus.Registerer, rs *remote.Storage, dir strin
opts: opts,
rs: rs,
wal: w,
wal: w,
locker: locker,
nextRef: atomic.NewUint64(0),
series: newStripeSeries(opts.StripeSize),
@ -641,7 +659,7 @@ func (db *DB) Close() error {
db.metrics.Unregister()
return db.wal.Close()
return tsdb_errors.NewMulti(db.locker.Release(), db.wal.Close()).Err()
}
type appender struct {

tsdb/agent/db_test.go

@ -31,6 +31,7 @@ import (
"github.com/prometheus/prometheus/tsdb/record"
"github.com/prometheus/prometheus/tsdb/tsdbutil"
"github.com/prometheus/prometheus/tsdb/wal"
"github.com/prometheus/prometheus/util/testutil"
)
func TestUnsupported(t *testing.T) {
@ -80,9 +81,6 @@ func TestCommit(t *testing.T) {
s, err := Open(logger, reg, remoteStorage, promAgentDir, opts)
require.NoError(t, err)
defer func() {
require.NoError(t, s.Close())
}()
a := s.Appender(context.TODO())
@ -94,9 +92,11 @@ func TestCommit(t *testing.T) {
_, err := a.Append(0, lset, sample[0].T(), sample[0].V())
require.NoError(t, err)
}
require.NoError(t, a.Commit())
}
require.NoError(t, a.Commit())
require.NoError(t, s.Close())
// Read records from WAL and check for expected count of series and samples.
walSeriesCount := 0
walSamplesCount := 0
@ -176,9 +176,6 @@ func TestRollback(t *testing.T) {
s, err := Open(logger, reg, remoteStorage, promAgentDir, opts)
require.NoError(t, err)
defer func() {
require.NoError(t, s.Close())
}()
a := s.Appender(context.TODO())
@ -193,6 +190,7 @@ func TestRollback(t *testing.T) {
}
require.NoError(t, a.Rollback())
require.NoError(t, s.Close())
// Read records from WAL and check for expected count of series and samples.
walSeriesCount := 0
@ -380,9 +378,6 @@ func TestWALReplay(t *testing.T) {
s, err := Open(logger, reg, remoteStorage, promAgentDir, opts)
require.NoError(t, err)
defer func() {
require.NoError(t, s.Close())
}()
a := s.Appender(context.TODO())
@ -396,22 +391,26 @@ func TestWALReplay(t *testing.T) {
}
require.NoError(t, a.Commit())
require.NoError(t, s.Close())
restartOpts := DefaultOptions()
restartLogger := log.NewNopLogger()
restartReg := prometheus.NewRegistry()
s, err = Open(restartLogger, restartReg, nil, promAgentDir, restartOpts)
// Open a new DB with the same WAL to check that series from the previous DB
// get replayed.
replayDB, err := Open(restartLogger, restartReg, nil, promAgentDir, restartOpts)
require.NoError(t, err)
defer func() {
require.NoError(t, replayDB.Close())
}()
// Check if all the series are retrieved back from the WAL.
m := gatherFamily(t, restartReg, "prometheus_agent_active_series")
require.Equal(t, float64(numSeries), m.Metric[0].Gauge.GetValue(), "agent wal replay mismatch of active series count")
// Check if lastTs of the samples retrieved from the WAL is retained.
metrics := s.series.series
metrics := replayDB.series.series
for i := 0; i < len(metrics); i++ {
mp := metrics[i]
for _, v := range mp {
@ -420,6 +419,28 @@ func TestWALReplay(t *testing.T) {
}
}
func TestLockfile(t *testing.T) {
tsdbutil.TestDirLockerUsage(t, func(t *testing.T, data string, createLock bool) (*tsdbutil.DirLocker, testutil.Closer) {
logger := log.NewNopLogger()
reg := prometheus.NewRegistry()
rs := remote.NewStorage(logger, reg, startTime, data, time.Second*30, nil)
t.Cleanup(func() {
require.NoError(t, rs.Close())
})
opts := DefaultOptions()
opts.NoLockfile = !createLock
// Create the DB. This should create lockfile and its metrics.
db, err := Open(logger, nil, rs, data, opts)
require.NoError(t, err)
return db.locker, testutil.NewCallbackCloser(func() {
require.NoError(t, db.Close())
})
})
}
func startTime() (int64, error) {
return time.Now().Unix() * 1000, nil
}

tsdb/db.go

@ -43,6 +43,7 @@ import (
tsdb_errors "github.com/prometheus/prometheus/tsdb/errors"
"github.com/prometheus/prometheus/tsdb/fileutil"
_ "github.com/prometheus/prometheus/tsdb/goversion" // Load the package into main to make sure minium Go version is met.
"github.com/prometheus/prometheus/tsdb/tsdbutil"
"github.com/prometheus/prometheus/tsdb/wal"
)
@ -59,10 +60,6 @@ const (
tmpForCreationBlockDirSuffix = ".tmp-for-creation"
// Pre-2.21 tmp dir suffix, used in clean-up functions.
tmpLegacy = ".tmp"
lockfileDisabled = -1
lockfileReplaced = 0
lockfileCreatedCleanly = 1
)
// ErrNotReady is returned if the underlying storage is not ready yet.
@ -162,9 +159,8 @@ type BlocksToDeleteFunc func(blocks []*Block) map[ulid.ULID]struct{}
// DB handles reads and writes of time series falling into
// a hashed partition of a seriedb.
type DB struct {
dir string
lockf fileutil.Releaser
lockfPath string
dir string
locker *tsdbutil.DirLocker
logger log.Logger
metrics *dbMetrics
@ -196,20 +192,19 @@ type DB struct {
}
type dbMetrics struct {
loadedBlocks prometheus.GaugeFunc
symbolTableSize prometheus.GaugeFunc
reloads prometheus.Counter
reloadsFailed prometheus.Counter
compactionsFailed prometheus.Counter
compactionsTriggered prometheus.Counter
compactionsSkipped prometheus.Counter
sizeRetentionCount prometheus.Counter
timeRetentionCount prometheus.Counter
startTime prometheus.GaugeFunc
tombCleanTimer prometheus.Histogram
blocksBytes prometheus.Gauge
maxBytes prometheus.Gauge
lockfileCreatedCleanly prometheus.Gauge
loadedBlocks prometheus.GaugeFunc
symbolTableSize prometheus.GaugeFunc
reloads prometheus.Counter
reloadsFailed prometheus.Counter
compactionsFailed prometheus.Counter
compactionsTriggered prometheus.Counter
compactionsSkipped prometheus.Counter
sizeRetentionCount prometheus.Counter
timeRetentionCount prometheus.Counter
startTime prometheus.GaugeFunc
tombCleanTimer prometheus.Histogram
blocksBytes prometheus.Gauge
maxBytes prometheus.Gauge
}
func newDBMetrics(db *DB, r prometheus.Registerer) *dbMetrics {
@ -287,10 +282,6 @@ func newDBMetrics(db *DB, r prometheus.Registerer) *dbMetrics {
Name: "prometheus_tsdb_size_retentions_total",
Help: "The number of times that blocks were deleted because the maximum number of bytes was exceeded.",
})
m.lockfileCreatedCleanly = prometheus.NewGauge(prometheus.GaugeOpts{
Name: "prometheus_tsdb_clean_start",
Help: "-1: lockfile is disabled. 0: a lockfile from a previous execution was replaced. 1: lockfile creation was clean",
})
if r != nil {
r.MustRegister(
@ -307,7 +298,6 @@ func newDBMetrics(db *DB, r prometheus.Registerer) *dbMetrics {
m.tombCleanTimer,
m.blocksBytes,
m.maxBytes,
m.lockfileCreatedCleanly,
)
}
return m
@ -669,29 +659,17 @@ func open(dir string, l log.Logger, r prometheus.Registerer, opts *Options, rngs
db.blocksToDelete = DefaultBlocksToDelete(db)
}
lockfileCreationStatus := lockfileDisabled
var err error
db.locker, err = tsdbutil.NewDirLocker(dir, "tsdb", db.logger, r)
if err != nil {
return nil, err
}
if !opts.NoLockfile {
absdir, err := filepath.Abs(dir)
if err != nil {
if err := db.locker.Lock(); err != nil {
return nil, err
}
db.lockfPath = filepath.Join(absdir, "lock")
if _, err := os.Stat(db.lockfPath); err == nil {
level.Warn(db.logger).Log("msg", "A TSDB lockfile from a previous execution already existed. It was replaced", "file", db.lockfPath)
lockfileCreationStatus = lockfileReplaced
} else {
lockfileCreationStatus = lockfileCreatedCleanly
}
lockf, _, err := fileutil.Flock(db.lockfPath)
if err != nil {
return nil, errors.Wrap(err, "lock DB directory")
}
db.lockf = lockf
}
var err error
ctx, cancel := context.WithCancel(context.Background())
db.compactor, err = NewLeveledCompactorWithChunkSize(ctx, r, l, rngs, db.chunkPool, opts.MaxBlockChunkSegmentSize, nil)
if err != nil {
@ -735,7 +713,6 @@ func open(dir string, l log.Logger, r prometheus.Registerer, opts *Options, rngs
if maxBytes < 0 {
maxBytes = 0
}
db.metrics.lockfileCreatedCleanly.Set(float64(lockfileCreationStatus))
db.metrics.maxBytes.Set(float64(maxBytes))
if err := db.reload(); err != nil {
@ -1448,11 +1425,7 @@ func (db *DB) Close() error {
g.Go(pb.Close)
}
errs := tsdb_errors.NewMulti(g.Wait())
if db.lockf != nil {
errs.Add(db.lockf.Release())
errs.Add(os.Remove(db.lockfPath))
}
errs := tsdb_errors.NewMulti(g.Wait(), db.locker.Release())
if db.head != nil {
errs.Add(db.head.Close())
}

tsdb/db_test.go

@ -3116,80 +3116,30 @@ func TestNoPanicOnTSDBOpenError(t *testing.T) {
require.NoError(t, os.RemoveAll(tmpdir))
})
absdir, err := filepath.Abs(tmpdir)
require.NoError(t, err)
// Taking the file lock will cause TSDB startup error.
lockf, _, err := fileutil.Flock(filepath.Join(absdir, "lock"))
// Taking the lock will cause a TSDB startup error.
l, err := tsdbutil.NewDirLocker(tmpdir, "tsdb", log.NewNopLogger(), nil)
require.NoError(t, err)
require.NoError(t, l.Lock())
_, err = Open(tmpdir, nil, nil, DefaultOptions(), nil)
require.Error(t, err)
require.NoError(t, lockf.Release())
require.NoError(t, l.Release())
}
func TestLockfileMetric(t *testing.T) {
cases := []struct {
fileAlreadyExists bool
lockFileDisabled bool
expectedValue int
}{
{
fileAlreadyExists: false,
lockFileDisabled: false,
expectedValue: lockfileCreatedCleanly,
},
{
fileAlreadyExists: true,
lockFileDisabled: false,
expectedValue: lockfileReplaced,
},
{
fileAlreadyExists: true,
lockFileDisabled: true,
expectedValue: lockfileDisabled,
},
{
fileAlreadyExists: false,
lockFileDisabled: true,
expectedValue: lockfileDisabled,
},
}
func TestLockfile(t *testing.T) {
tsdbutil.TestDirLockerUsage(t, func(t *testing.T, data string, createLock bool) (*tsdbutil.DirLocker, testutil.Closer) {
opts := DefaultOptions()
opts.NoLockfile = !createLock
for _, c := range cases {
t.Run(fmt.Sprintf("%+v", c), func(t *testing.T) {
tmpdir, err := ioutil.TempDir("", "test")
require.NoError(t, err)
t.Cleanup(func() {
require.NoError(t, os.RemoveAll(tmpdir))
})
absdir, err := filepath.Abs(tmpdir)
require.NoError(t, err)
// Create the DB. This should create lockfile and its metrics.
db, err := Open(data, nil, nil, opts, nil)
require.NoError(t, err)
// Test preconditions (file already exists + lockfile option)
lockfilePath := filepath.Join(absdir, "lock")
if c.fileAlreadyExists {
err = ioutil.WriteFile(lockfilePath, []byte{}, 0o644)
require.NoError(t, err)
}
opts := DefaultOptions()
opts.NoLockfile = c.lockFileDisabled
// Create the DB, this should create a lockfile and the metrics
db, err := Open(tmpdir, nil, nil, opts, nil)
require.NoError(t, err)
require.Equal(t, float64(c.expectedValue), prom_testutil.ToFloat64(db.metrics.lockfileCreatedCleanly))
// Close the DB, this should delete the lockfile
return db.locker, testutil.NewCallbackCloser(func() {
require.NoError(t, db.Close())
// Check that the lockfile is always deleted
if !c.lockFileDisabled {
_, err = os.Stat(lockfilePath)
require.Error(t, err, "lockfile was not deleted")
}
})
}
})
}
func TestQuerier_ShouldNotPanicIfHeadChunkIsTruncatedWhileReadingQueriedChunks(t *testing.T) {

tsdb/tsdbutil/dir_locker.go (new file, 104 lines)

@ -0,0 +1,104 @@
// Copyright 2021 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package tsdbutil
import (
"fmt"
"os"
"path/filepath"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
tsdb_errors "github.com/prometheus/prometheus/tsdb/errors"
"github.com/prometheus/prometheus/tsdb/fileutil"
)
const (
lockfileDisabled = -1
lockfileReplaced = 0
lockfileCreatedCleanly = 1
)
type DirLocker struct {
logger log.Logger
createdCleanly prometheus.Gauge
releaser fileutil.Releaser
path string
}
// NewDirLocker creates a DirLocker that can obtain an exclusive lock on dir.
func NewDirLocker(dir, subsystem string, l log.Logger, r prometheus.Registerer) (*DirLocker, error) {
lock := &DirLocker{
logger: l,
createdCleanly: prometheus.NewGauge(prometheus.GaugeOpts{
Name: fmt.Sprintf("prometheus_%s_clean_start", subsystem),
Help: "-1: lockfile is disabled. 0: a lockfile from a previous execution was replaced. 1: lockfile creation was clean",
}),
}
if r != nil {
r.MustRegister(lock.createdCleanly)
}
lock.createdCleanly.Set(lockfileDisabled)
absdir, err := filepath.Abs(dir)
if err != nil {
return nil, err
}
lock.path = filepath.Join(absdir, "lock")
return lock, nil
}
// Lock obtains the lock on the locker directory.
func (l *DirLocker) Lock() error {
if l.releaser != nil {
return errors.New("DB lock already obtained")
}
if _, err := os.Stat(l.path); err == nil {
level.Warn(l.logger).Log("msg", "A lockfile from a previous execution already existed. It was replaced", "file", l.path)
l.createdCleanly.Set(lockfileReplaced)
} else {
l.createdCleanly.Set(lockfileCreatedCleanly)
}
lockf, _, err := fileutil.Flock(l.path)
if err != nil {
return errors.Wrap(err, "lock DB directory")
}
l.releaser = lockf
return nil
}
// Release releases the lock. No-op if the lock is not held.
func (l *DirLocker) Release() error {
if l.releaser == nil {
return nil
}
errs := tsdb_errors.NewMulti()
errs.Add(l.releaser.Release())
errs.Add(os.Remove(l.path))
l.releaser = nil
return errs.Err()
}
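A small usage sketch of the new type, assuming the "agent" subsystem and a data/ directory the process can create a lock file in: it demonstrates the three values the clean-start gauge can take and that Release removes the lock file again. The registry Gather call is standard client_golang API; everything else follows the file above.

package main

import (
	"fmt"
	"log"

	gokitlog "github.com/go-kit/log"
	"github.com/prometheus/client_golang/prometheus"

	"github.com/prometheus/prometheus/tsdb/tsdbutil"
)

func main() {
	reg := prometheus.NewRegistry()
	locker, err := tsdbutil.NewDirLocker("data", "agent", gokitlog.NewNopLogger(), reg)
	if err != nil {
		log.Fatal(err)
	}
	// prometheus_agent_clean_start starts at -1 (no lock taken). After a clean
	// Lock() it is 1; it is 0 if a stale lock file from a previous run was
	// found and replaced.
	if err := locker.Lock(); err != nil {
		log.Fatal(err)
	}
	mfs, err := reg.Gather()
	if err != nil {
		log.Fatal(err)
	}
	for _, mf := range mfs {
		fmt.Println(mf.GetName(), mf.GetMetric()[0].GetGauge().GetValue())
	}
	// Release drops the flock and deletes data/lock.
	if err := locker.Release(); err != nil {
		log.Fatal(err)
	}
}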

tsdb/tsdbutil/dir_locker_test.go (new file, 38 lines)

@ -0,0 +1,38 @@
// Copyright 2021 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package tsdbutil
import (
"testing"
"github.com/go-kit/log"
"github.com/stretchr/testify/require"
"github.com/prometheus/prometheus/util/testutil"
)
func TestLockfile(t *testing.T) {
TestDirLockerUsage(t, func(t *testing.T, data string, createLock bool) (*DirLocker, testutil.Closer) {
locker, err := NewDirLocker(data, "tsdbutil", log.NewNopLogger(), nil)
require.NoError(t, err)
if createLock {
require.NoError(t, locker.Lock())
}
return locker, testutil.NewCallbackCloser(func() {
require.NoError(t, locker.Release())
})
})
}

tsdb/tsdbutil/dir_locker_testutil.go (new file, 91 lines)

@ -0,0 +1,91 @@
// Copyright 2021 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package tsdbutil
import (
"fmt"
"io/ioutil"
"os"
"testing"
"github.com/go-kit/log"
prom_testutil "github.com/prometheus/client_golang/prometheus/testutil"
"github.com/stretchr/testify/require"
"github.com/prometheus/prometheus/util/testutil"
)
// TestDirLockerUsage performs a set of tests which guarantee correct usage of
// DirLocker. open should use data as the storage directory, and createLock
// to determine if a lock file should be used.
func TestDirLockerUsage(t *testing.T, open func(t *testing.T, data string, createLock bool) (*DirLocker, testutil.Closer)) {
t.Helper()
cases := []struct {
fileAlreadyExists bool
lockFileDisabled bool
expectedValue int
}{
{
fileAlreadyExists: false,
lockFileDisabled: false,
expectedValue: lockfileCreatedCleanly,
},
{
fileAlreadyExists: true,
lockFileDisabled: false,
expectedValue: lockfileReplaced,
},
{
fileAlreadyExists: true,
lockFileDisabled: true,
expectedValue: lockfileDisabled,
},
{
fileAlreadyExists: false,
lockFileDisabled: true,
expectedValue: lockfileDisabled,
},
}
for _, c := range cases {
t.Run(fmt.Sprintf("%+v", c), func(t *testing.T) {
tmpdir, err := ioutil.TempDir("", "test")
require.NoError(t, err)
t.Cleanup(func() {
require.NoError(t, os.RemoveAll(tmpdir))
})
// Test preconditions (file already exists + lockfile option)
if c.fileAlreadyExists {
tmpLocker, err := NewDirLocker(tmpdir, "tsdb", log.NewNopLogger(), nil)
require.NoError(t, err)
err = ioutil.WriteFile(tmpLocker.path, []byte{}, 0o644)
require.NoError(t, err)
}
locker, closer := open(t, tmpdir, !c.lockFileDisabled)
require.Equal(t, float64(c.expectedValue), prom_testutil.ToFloat64(locker.createdCleanly))
// Close the client. This should delete the lockfile.
closer.Close()
// Check that the lockfile is always deleted
if !c.lockFileDisabled {
_, err = os.Stat(locker.path)
require.True(t, os.IsNotExist(err), "lockfile was not deleted")
}
})
}
}