diff --git a/cmd/promtool/backfill.go b/cmd/promtool/backfill.go
index 601c3ced9f..79db428c71 100644
--- a/cmd/promtool/backfill.go
+++ b/cmd/promtool/backfill.go
@@ -88,7 +88,7 @@ func createBlocks(input []byte, mint, maxt, maxBlockDuration int64, maxSamplesIn
blockDuration := getCompatibleBlockDuration(maxBlockDuration)
mint = blockDuration * (mint / blockDuration)
- db, err := tsdb.OpenDBReadOnly(outputDir, nil)
+ db, err := tsdb.OpenDBReadOnly(outputDir, "", nil)
if err != nil {
return err
}
diff --git a/cmd/promtool/main.go b/cmd/promtool/main.go
index c0484adcc0..40d933376c 100644
--- a/cmd/promtool/main.go
+++ b/cmd/promtool/main.go
@@ -235,12 +235,14 @@ func main() {
tsdbDumpCmd := tsdbCmd.Command("dump", "Dump samples from a TSDB.")
dumpPath := tsdbDumpCmd.Arg("db path", "Database path (default is "+defaultDBPath+").").Default(defaultDBPath).String()
+ dumpSandboxDirRoot := tsdbDumpCmd.Flag("sandbox-dir-root", "Root directory where a sandbox directory would be created in case WAL replay generates chunks. The sandbox directory is cleaned up at the end.").Default(defaultDBPath).String()
dumpMinTime := tsdbDumpCmd.Flag("min-time", "Minimum timestamp to dump.").Default(strconv.FormatInt(math.MinInt64, 10)).Int64()
dumpMaxTime := tsdbDumpCmd.Flag("max-time", "Maximum timestamp to dump.").Default(strconv.FormatInt(math.MaxInt64, 10)).Int64()
dumpMatch := tsdbDumpCmd.Flag("match", "Series selector. Can be specified multiple times.").Default("{__name__=~'(?s:.*)'}").Strings()
tsdbDumpOpenMetricsCmd := tsdbCmd.Command("dump-openmetrics", "[Experimental] Dump samples from a TSDB into OpenMetrics format. Native histograms are not dumped.")
dumpOpenMetricsPath := tsdbDumpOpenMetricsCmd.Arg("db path", "Database path (default is "+defaultDBPath+").").Default(defaultDBPath).String()
+ dumpOpenMetricsSandboxDirRoot := tsdbDumpOpenMetricsCmd.Flag("sandbox-dir-root", "Root directory where a sandbox directory would be created in case WAL replay generates chunks. The sandbox directory is cleaned up at the end.").Default(defaultDBPath).String()
dumpOpenMetricsMinTime := tsdbDumpOpenMetricsCmd.Flag("min-time", "Minimum timestamp to dump.").Default(strconv.FormatInt(math.MinInt64, 10)).Int64()
dumpOpenMetricsMaxTime := tsdbDumpOpenMetricsCmd.Flag("max-time", "Maximum timestamp to dump.").Default(strconv.FormatInt(math.MaxInt64, 10)).Int64()
dumpOpenMetricsMatch := tsdbDumpOpenMetricsCmd.Flag("match", "Series selector. Can be specified multiple times.").Default("{__name__=~'(?s:.*)'}").Strings()
@@ -396,9 +398,9 @@ func main() {
os.Exit(checkErr(listBlocks(*listPath, *listHumanReadable)))
case tsdbDumpCmd.FullCommand():
- os.Exit(checkErr(dumpSamples(ctx, *dumpPath, *dumpMinTime, *dumpMaxTime, *dumpMatch, formatSeriesSet)))
+ os.Exit(checkErr(dumpSamples(ctx, *dumpPath, *dumpSandboxDirRoot, *dumpMinTime, *dumpMaxTime, *dumpMatch, formatSeriesSet)))
case tsdbDumpOpenMetricsCmd.FullCommand():
- os.Exit(checkErr(dumpSamples(ctx, *dumpOpenMetricsPath, *dumpOpenMetricsMinTime, *dumpOpenMetricsMaxTime, *dumpOpenMetricsMatch, formatSeriesSetOpenMetrics)))
+ os.Exit(checkErr(dumpSamples(ctx, *dumpOpenMetricsPath, *dumpOpenMetricsSandboxDirRoot, *dumpOpenMetricsMinTime, *dumpOpenMetricsMaxTime, *dumpOpenMetricsMatch, formatSeriesSetOpenMetrics)))
// TODO(aSquare14): Work on adding support for custom block size.
case openMetricsImportCmd.FullCommand():
os.Exit(backfillOpenMetrics(*importFilePath, *importDBPath, *importHumanReadable, *importQuiet, *maxBlockDuration))
diff --git a/cmd/promtool/tsdb.go b/cmd/promtool/tsdb.go
index 6868102fa3..2ed7244b1c 100644
--- a/cmd/promtool/tsdb.go
+++ b/cmd/promtool/tsdb.go
@@ -338,7 +338,7 @@ func readPrometheusLabels(r io.Reader, n int) ([]labels.Labels, error) {
}
func listBlocks(path string, humanReadable bool) error {
- db, err := tsdb.OpenDBReadOnly(path, nil)
+ db, err := tsdb.OpenDBReadOnly(path, "", nil)
if err != nil {
return err
}
@@ -393,7 +393,7 @@ func getFormatedBytes(bytes int64, humanReadable bool) string {
}
func openBlock(path, blockID string) (*tsdb.DBReadOnly, tsdb.BlockReader, error) {
- db, err := tsdb.OpenDBReadOnly(path, nil)
+ db, err := tsdb.OpenDBReadOnly(path, "", nil)
if err != nil {
return nil, nil, err
}
@@ -708,8 +708,8 @@ func analyzeCompaction(ctx context.Context, block tsdb.BlockReader, indexr tsdb.
type SeriesSetFormatter func(series storage.SeriesSet) error
-func dumpSamples(ctx context.Context, path string, mint, maxt int64, match []string, formatter SeriesSetFormatter) (err error) {
- db, err := tsdb.OpenDBReadOnly(path, nil)
+func dumpSamples(ctx context.Context, dbDir, sandboxDirRoot string, mint, maxt int64, match []string, formatter SeriesSetFormatter) (err error) {
+ db, err := tsdb.OpenDBReadOnly(dbDir, sandboxDirRoot, nil)
if err != nil {
return err
}
diff --git a/cmd/promtool/tsdb_test.go b/cmd/promtool/tsdb_test.go
index 70e8877659..75089b168b 100644
--- a/cmd/promtool/tsdb_test.go
+++ b/cmd/promtool/tsdb_test.go
@@ -64,6 +64,7 @@ func getDumpedSamples(t *testing.T, path string, mint, maxt int64, match []strin
err := dumpSamples(
context.Background(),
path,
+ t.TempDir(),
mint,
maxt,
match,
diff --git a/docs/command-line/promtool.md b/docs/command-line/promtool.md
index 3eceed48f2..9ed51fb7c9 100644
--- a/docs/command-line/promtool.md
+++ b/docs/command-line/promtool.md
@@ -566,6 +566,7 @@ Dump samples from a TSDB.
| Flag | Description | Default |
| --- | --- | --- |
+| --sandbox-dir-root | Root directory where a sandbox directory would be created in case WAL replay generates chunks. The sandbox directory is cleaned up at the end. | `data/` |
| --min-time | Minimum timestamp to dump. | `-9223372036854775808` |
| --max-time | Maximum timestamp to dump. | `9223372036854775807` |
| --match | Series selector. Can be specified multiple times. | `{__name__=~'(?s:.*)'}` |
@@ -592,6 +593,7 @@ Dump samples from a TSDB.
| Flag | Description | Default |
| --- | --- | --- |
+| --sandbox-dir-root | Root directory where a sandbox directory would be created in case WAL replay generates chunks. The sandbox directory is cleaned up at the end. | `data/` |
| --min-time | Minimum timestamp to dump. | `-9223372036854775808` |
| --max-time | Maximum timestamp to dump. | `9223372036854775807` |
| --match | Series selector. Can be specified multiple times. | `{__name__=~'(?s:.*)'}` |
diff --git a/tsdb/chunks/head_chunks.go b/tsdb/chunks/head_chunks.go
index 66dbb07b71..6c8707c57b 100644
--- a/tsdb/chunks/head_chunks.go
+++ b/tsdb/chunks/head_chunks.go
@@ -381,6 +381,33 @@ func listChunkFiles(dir string) (map[int]string, error) {
return res, nil
}
+// HardLinkChunkFiles populates dst with hardlinks to every chunk file listed in src.
+// A missing src is not an error: nil is returned before dst is created.
+// Otherwise dst (and any missing parents) is created before linking starts.
+func HardLinkChunkFiles(src, dst string) error {
+	if _, statErr := os.Stat(src); statErr != nil {
+		if os.IsNotExist(statErr) {
+			// No source directory, hence nothing to link.
+			return nil
+		}
+		return fmt.Errorf("check source chunks dir: %w", statErr)
+	}
+
+	if mkErr := os.MkdirAll(dst, 0o777); mkErr != nil {
+		return fmt.Errorf("set up destination chunks dir: %w", mkErr)
+	}
+
+	chunkFiles, listErr := listChunkFiles(src)
+	if listErr != nil {
+		return fmt.Errorf("list chunks: %w", listErr)
+	}
+
+	for _, chunkPath := range chunkFiles {
+		// Only the file name is kept; both ends of the link are rebuilt from it.
+		_, name := filepath.Split(chunkPath)
+		from, to := filepath.Join(src, name), filepath.Join(dst, name)
+		if linkErr := os.Link(from, to); linkErr != nil {
+			return fmt.Errorf("hardlink a chunk: %w", linkErr)
+		}
+	}
+	return nil
+}
+
// repairLastChunkFile deletes the last file if it's empty.
// Because we don't fsync when creating these files, we could end
// up with an empty file at the end during an abrupt shutdown.
diff --git a/tsdb/compact_test.go b/tsdb/compact_test.go
index 10c90e30dc..7a353a556a 100644
--- a/tsdb/compact_test.go
+++ b/tsdb/compact_test.go
@@ -1298,7 +1298,7 @@ func TestCancelCompactions(t *testing.T) {
// This checks that the `context.Canceled` error is properly checked at all levels:
// - tsdb_errors.NewMulti() should have the Is() method implemented for correct checks.
// - callers should check with errors.Is() instead of ==.
- readOnlyDB, err := OpenDBReadOnly(tmpdirCopy, log.NewNopLogger())
+ readOnlyDB, err := OpenDBReadOnly(tmpdirCopy, "", log.NewNopLogger())
require.NoError(t, err)
blocks, err := readOnlyDB.Blocks()
require.NoError(t, err)
diff --git a/tsdb/db.go b/tsdb/db.go
index c2e8904a25..bca3c99480 100644
--- a/tsdb/db.go
+++ b/tsdb/db.go
@@ -383,26 +383,36 @@ var ErrClosed = errors.New("db already closed")
// Current implementation doesn't support concurrency so
// all API calls should happen in the same go routine.
type DBReadOnly struct {
- logger log.Logger
- dir string
- closers []io.Closer
- closed chan struct{}
+ logger log.Logger
+ dir string // Directory of the DB that is opened read-only.
+ sandboxDir string // Temp dir created by OpenDBReadOnly; used as the Head's ChunkDirRoot and removed by Close.
+ closers []io.Closer
+ closed chan struct{}
}
// OpenDBReadOnly opens DB in the given directory for read only operations.
-func OpenDBReadOnly(dir string, l log.Logger) (*DBReadOnly, error) {
+func OpenDBReadOnly(dir, sandboxDirRoot string, l log.Logger) (*DBReadOnly, error) {
if _, err := os.Stat(dir); err != nil {
return nil, fmt.Errorf("opening the db dir: %w", err)
}
+ if sandboxDirRoot == "" { // No root given: the sandbox is created directly inside dir.
+ sandboxDirRoot = dir
+ }
+ sandboxDir, err := os.MkdirTemp(sandboxDirRoot, "tmp_dbro_sandbox") // Fresh temp dir per open; Close deletes it again.
+ if err != nil {
+ return nil, fmt.Errorf("setting up sandbox dir: %w", err)
+ }
+
if l == nil {
l = log.NewNopLogger()
}
return &DBReadOnly{
- logger: l,
- dir: dir,
- closed: make(chan struct{}),
+ logger: l,
+ dir: dir,
+ sandboxDir: sandboxDir,
+ closed: make(chan struct{}),
}, nil
}
@@ -491,7 +501,14 @@ func (db *DBReadOnly) loadDataAsQueryable(maxt int64) (storage.SampleAndChunkQue
}
opts := DefaultHeadOptions()
- opts.ChunkDirRoot = db.dir
+ // Hard link the chunk files to a dir in db.sandboxDir in case the Head needs to truncate some of them
+ // or cut new ones while replaying the WAL.
+ // See https://github.com/prometheus/prometheus/issues/11618.
+ err = chunks.HardLinkChunkFiles(mmappedChunksDir(db.dir), mmappedChunksDir(db.sandboxDir))
+ if err != nil {
+ return nil, err
+ }
+ opts.ChunkDirRoot = db.sandboxDir
head, err := NewHead(nil, db.logger, nil, nil, opts, NewHeadStats())
if err != nil {
return nil, err
@@ -519,7 +536,7 @@ func (db *DBReadOnly) loadDataAsQueryable(maxt int64) (storage.SampleAndChunkQue
}
}
opts := DefaultHeadOptions()
- opts.ChunkDirRoot = db.dir
+ opts.ChunkDirRoot = db.sandboxDir
head, err = NewHead(nil, db.logger, w, wbl, opts, NewHeadStats())
if err != nil {
return nil, err
@@ -690,8 +707,14 @@ func (db *DBReadOnly) Block(blockID string) (BlockReader, error) {
return block, nil
}
-// Close all block readers.
+// Close all block readers and delete the sandbox dir.
func (db *DBReadOnly) Close() error {
+ defer func() {
+ // Delete the temporary sandbox directory that was created when opening the DB.
+ if err := os.RemoveAll(db.sandboxDir); err != nil {
+ level.Error(db.logger).Log("msg", "delete sandbox dir", "err", err)
+ }
+ }()
select {
case <-db.closed:
return ErrClosed
diff --git a/tsdb/db_test.go b/tsdb/db_test.go
index a682f46554..f0b27dcc2a 100644
--- a/tsdb/db_test.go
+++ b/tsdb/db_test.go
@@ -25,6 +25,7 @@ import (
"os"
"path"
"path/filepath"
+ "runtime"
"sort"
"strconv"
"sync"
@@ -2494,7 +2495,7 @@ func TestDBReadOnly(t *testing.T) {
}
// Open a read only db and ensure that the API returns the same result as the normal DB.
- dbReadOnly, err := OpenDBReadOnly(dbDir, logger)
+ dbReadOnly, err := OpenDBReadOnly(dbDir, "", logger)
require.NoError(t, err)
defer func() { require.NoError(t, dbReadOnly.Close()) }()
@@ -2548,10 +2549,14 @@ func TestDBReadOnly(t *testing.T) {
// TestDBReadOnlyClosing ensures that after closing the db
// all api methods return an ErrClosed.
func TestDBReadOnlyClosing(t *testing.T) {
- dbDir := t.TempDir()
- db, err := OpenDBReadOnly(dbDir, log.NewLogfmtLogger(log.NewSyncWriter(os.Stderr)))
+ sandboxDir := t.TempDir()
+ db, err := OpenDBReadOnly(t.TempDir(), sandboxDir, log.NewLogfmtLogger(log.NewSyncWriter(os.Stderr)))
require.NoError(t, err)
+ // The sandboxDir was there.
+ require.DirExists(t, db.sandboxDir)
require.NoError(t, db.Close())
+ // The sandboxDir was deleted when closing.
+ require.NoDirExists(t, db.sandboxDir)
require.Equal(t, db.Close(), ErrClosed)
_, err = db.Blocks()
require.Equal(t, err, ErrClosed)
@@ -2587,7 +2592,7 @@ func TestDBReadOnly_FlushWAL(t *testing.T) {
}
// Flush WAL.
- db, err := OpenDBReadOnly(dbDir, logger)
+ db, err := OpenDBReadOnly(dbDir, "", logger)
require.NoError(t, err)
flush := t.TempDir()
@@ -2595,7 +2600,7 @@ func TestDBReadOnly_FlushWAL(t *testing.T) {
require.NoError(t, db.Close())
// Reopen the DB from the flushed WAL block.
- db, err = OpenDBReadOnly(flush, logger)
+ db, err = OpenDBReadOnly(flush, "", logger)
require.NoError(t, err)
defer func() { require.NoError(t, db.Close()) }()
blocks, err := db.Blocks()
@@ -2624,6 +2629,80 @@ func TestDBReadOnly_FlushWAL(t *testing.T) {
require.Equal(t, 1000.0, sum)
}
+// TestDBReadOnly_Querier_NoAlteration checks that opening a read-only DB on a
+// directory and spinning up a querier leaves that directory unchanged: same
+// number of head chunk files and, outside Windows, the same directory hash.
+func TestDBReadOnly_Querier_NoAlteration(t *testing.T) {
+	// countChunks returns the number of entries in dir's mmapped chunks directory.
+	countChunks := func(dir string) int {
+		files, err := os.ReadDir(mmappedChunksDir(dir))
+		require.NoError(t, err)
+		return len(files)
+	}
+
+	// dirHash returns a hash of dir; on Windows it returns nil (hashing is skipped).
+	dirHash := func(dir string) (hash []byte) {
+		// Windows requires the DB to be closed: "xxx\lock: The process cannot access the file because it is being used by another process."
+		// But closing the DB alters the directory in this case (it'll cut a new chunk).
+		if runtime.GOOS != "windows" {
+			hash = testutil.DirHash(t, dir)
+		}
+		return
+	}
+
+	// spinUpQuerierAndCheck opens a RO db on dir (using sandboxDir as the sandbox root),
+	// creates and closes a querier, and asserts that dir still holds chunksCount chunk
+	// files and hashes to the same value as before.
+	spinUpQuerierAndCheck := func(dir, sandboxDir string, chunksCount int) {
+		dBDirHash := dirHash(dir)
+		// Bootstrap a RO db from the same dir and set up a querier.
+		dbReadOnly, err := OpenDBReadOnly(dir, sandboxDir, nil)
+		require.NoError(t, err)
+		require.Equal(t, chunksCount, countChunks(dir))
+		q, err := dbReadOnly.Querier(math.MinInt, math.MaxInt)
+		require.NoError(t, err)
+		require.NoError(t, q.Close())
+		require.NoError(t, dbReadOnly.Close())
+		// The RO Head doesn't alter RW db chunks_head/.
+		require.Equal(t, chunksCount, countChunks(dir))
+		// Expected value (hash taken before opening the RO db) goes first.
+		require.Equal(t, dBDirHash, dirHash(dir))
+	}
+
+	t.Run("doesn't cut chunks while replaying WAL", func(t *testing.T) {
+		db := openTestDB(t, nil, nil)
+		defer func() {
+			require.NoError(t, db.Close())
+		}()
+
+		// Append until the first mmapped head chunk.
+		for i := 0; i < 121; i++ {
+			app := db.Appender(context.Background())
+			_, err := app.Append(0, labels.FromStrings("foo", "bar"), int64(i), 0)
+			require.NoError(t, err)
+			require.NoError(t, app.Commit())
+		}
+
+		spinUpQuerierAndCheck(db.dir, t.TempDir(), 0)
+
+		// The RW Head should have no problem cutting its own chunk,
+		// this also proves that a chunk needed to be cut.
+		require.NotPanics(t, func() { db.ForceHeadMMap() })
+		require.Equal(t, 1, countChunks(db.dir))
+	})
+
+	t.Run("doesn't truncate corrupted chunks", func(t *testing.T) {
+		db := openTestDB(t, nil, nil)
+		require.NoError(t, db.Close())
+
+		// Simulate a corrupted chunk: without a header.
+		// filepath.Join (not path.Join) is used because this is an OS file path.
+		f, err := os.Create(filepath.Join(mmappedChunksDir(db.dir), "000001"))
+		require.NoError(t, err)
+		// Release the handle right away so the empty file is not kept open
+		// while the RO and RW DBs work on the directory.
+		require.NoError(t, f.Close())
+
+		spinUpQuerierAndCheck(db.dir, t.TempDir(), 1)
+
+		// The RW Head should have no problem truncating its corrupted file:
+		// this proves that the chunk needed to be truncated.
+		db, err = Open(db.dir, nil, nil, nil, nil)
+		// Check the error before registering the cleanup, so that a failed Open
+		// is reported as such instead of db being dereferenced in the deferred Close.
+		require.NoError(t, err)
+		defer func() {
+			require.NoError(t, db.Close())
+		}()
+		require.Equal(t, 0, countChunks(db.dir))
+	})
+}
+
func TestDBCannotSeePartialCommits(t *testing.T) {
if defaultIsolationDisabled {
t.Skip("skipping test since tsdb isolation is disabled")