mirror of
https://github.com/prometheus/prometheus.git
synced 2025-01-11 05:47:27 -08:00
Merge pull request #13218 from machine424/ro-promtool
Make DBReadOnly more RO
This commit is contained in:
commit
3119b8a055
|
@ -88,7 +88,7 @@ func createBlocks(input []byte, mint, maxt, maxBlockDuration int64, maxSamplesIn
|
||||||
blockDuration := getCompatibleBlockDuration(maxBlockDuration)
|
blockDuration := getCompatibleBlockDuration(maxBlockDuration)
|
||||||
mint = blockDuration * (mint / blockDuration)
|
mint = blockDuration * (mint / blockDuration)
|
||||||
|
|
||||||
db, err := tsdb.OpenDBReadOnly(outputDir, nil)
|
db, err := tsdb.OpenDBReadOnly(outputDir, "", nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
|
@ -235,12 +235,14 @@ func main() {
|
||||||
|
|
||||||
tsdbDumpCmd := tsdbCmd.Command("dump", "Dump samples from a TSDB.")
|
tsdbDumpCmd := tsdbCmd.Command("dump", "Dump samples from a TSDB.")
|
||||||
dumpPath := tsdbDumpCmd.Arg("db path", "Database path (default is "+defaultDBPath+").").Default(defaultDBPath).String()
|
dumpPath := tsdbDumpCmd.Arg("db path", "Database path (default is "+defaultDBPath+").").Default(defaultDBPath).String()
|
||||||
|
dumpSandboxDirRoot := tsdbDumpCmd.Flag("sandbox-dir-root", "Root directory where a sandbox directory would be created in case WAL replay generates chunks. The sandbox directory is cleaned up at the end.").Default(defaultDBPath).String()
|
||||||
dumpMinTime := tsdbDumpCmd.Flag("min-time", "Minimum timestamp to dump.").Default(strconv.FormatInt(math.MinInt64, 10)).Int64()
|
dumpMinTime := tsdbDumpCmd.Flag("min-time", "Minimum timestamp to dump.").Default(strconv.FormatInt(math.MinInt64, 10)).Int64()
|
||||||
dumpMaxTime := tsdbDumpCmd.Flag("max-time", "Maximum timestamp to dump.").Default(strconv.FormatInt(math.MaxInt64, 10)).Int64()
|
dumpMaxTime := tsdbDumpCmd.Flag("max-time", "Maximum timestamp to dump.").Default(strconv.FormatInt(math.MaxInt64, 10)).Int64()
|
||||||
dumpMatch := tsdbDumpCmd.Flag("match", "Series selector. Can be specified multiple times.").Default("{__name__=~'(?s:.*)'}").Strings()
|
dumpMatch := tsdbDumpCmd.Flag("match", "Series selector. Can be specified multiple times.").Default("{__name__=~'(?s:.*)'}").Strings()
|
||||||
|
|
||||||
tsdbDumpOpenMetricsCmd := tsdbCmd.Command("dump-openmetrics", "[Experimental] Dump samples from a TSDB into OpenMetrics format. Native histograms are not dumped.")
|
tsdbDumpOpenMetricsCmd := tsdbCmd.Command("dump-openmetrics", "[Experimental] Dump samples from a TSDB into OpenMetrics format. Native histograms are not dumped.")
|
||||||
dumpOpenMetricsPath := tsdbDumpOpenMetricsCmd.Arg("db path", "Database path (default is "+defaultDBPath+").").Default(defaultDBPath).String()
|
dumpOpenMetricsPath := tsdbDumpOpenMetricsCmd.Arg("db path", "Database path (default is "+defaultDBPath+").").Default(defaultDBPath).String()
|
||||||
|
dumpOpenMetricsSandboxDirRoot := tsdbDumpOpenMetricsCmd.Flag("sandbox-dir-root", "Root directory where a sandbox directory would be created in case WAL replay generates chunks. The sandbox directory is cleaned up at the end.").Default(defaultDBPath).String()
|
||||||
dumpOpenMetricsMinTime := tsdbDumpOpenMetricsCmd.Flag("min-time", "Minimum timestamp to dump.").Default(strconv.FormatInt(math.MinInt64, 10)).Int64()
|
dumpOpenMetricsMinTime := tsdbDumpOpenMetricsCmd.Flag("min-time", "Minimum timestamp to dump.").Default(strconv.FormatInt(math.MinInt64, 10)).Int64()
|
||||||
dumpOpenMetricsMaxTime := tsdbDumpOpenMetricsCmd.Flag("max-time", "Maximum timestamp to dump.").Default(strconv.FormatInt(math.MaxInt64, 10)).Int64()
|
dumpOpenMetricsMaxTime := tsdbDumpOpenMetricsCmd.Flag("max-time", "Maximum timestamp to dump.").Default(strconv.FormatInt(math.MaxInt64, 10)).Int64()
|
||||||
dumpOpenMetricsMatch := tsdbDumpOpenMetricsCmd.Flag("match", "Series selector. Can be specified multiple times.").Default("{__name__=~'(?s:.*)'}").Strings()
|
dumpOpenMetricsMatch := tsdbDumpOpenMetricsCmd.Flag("match", "Series selector. Can be specified multiple times.").Default("{__name__=~'(?s:.*)'}").Strings()
|
||||||
|
@ -396,9 +398,9 @@ func main() {
|
||||||
os.Exit(checkErr(listBlocks(*listPath, *listHumanReadable)))
|
os.Exit(checkErr(listBlocks(*listPath, *listHumanReadable)))
|
||||||
|
|
||||||
case tsdbDumpCmd.FullCommand():
|
case tsdbDumpCmd.FullCommand():
|
||||||
os.Exit(checkErr(dumpSamples(ctx, *dumpPath, *dumpMinTime, *dumpMaxTime, *dumpMatch, formatSeriesSet)))
|
os.Exit(checkErr(dumpSamples(ctx, *dumpPath, *dumpSandboxDirRoot, *dumpMinTime, *dumpMaxTime, *dumpMatch, formatSeriesSet)))
|
||||||
case tsdbDumpOpenMetricsCmd.FullCommand():
|
case tsdbDumpOpenMetricsCmd.FullCommand():
|
||||||
os.Exit(checkErr(dumpSamples(ctx, *dumpOpenMetricsPath, *dumpOpenMetricsMinTime, *dumpOpenMetricsMaxTime, *dumpOpenMetricsMatch, formatSeriesSetOpenMetrics)))
|
os.Exit(checkErr(dumpSamples(ctx, *dumpOpenMetricsPath, *dumpOpenMetricsSandboxDirRoot, *dumpOpenMetricsMinTime, *dumpOpenMetricsMaxTime, *dumpOpenMetricsMatch, formatSeriesSetOpenMetrics)))
|
||||||
// TODO(aSquare14): Work on adding support for custom block size.
|
// TODO(aSquare14): Work on adding support for custom block size.
|
||||||
case openMetricsImportCmd.FullCommand():
|
case openMetricsImportCmd.FullCommand():
|
||||||
os.Exit(backfillOpenMetrics(*importFilePath, *importDBPath, *importHumanReadable, *importQuiet, *maxBlockDuration))
|
os.Exit(backfillOpenMetrics(*importFilePath, *importDBPath, *importHumanReadable, *importQuiet, *maxBlockDuration))
|
||||||
|
|
|
@ -338,7 +338,7 @@ func readPrometheusLabels(r io.Reader, n int) ([]labels.Labels, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func listBlocks(path string, humanReadable bool) error {
|
func listBlocks(path string, humanReadable bool) error {
|
||||||
db, err := tsdb.OpenDBReadOnly(path, nil)
|
db, err := tsdb.OpenDBReadOnly(path, "", nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -393,7 +393,7 @@ func getFormatedBytes(bytes int64, humanReadable bool) string {
|
||||||
}
|
}
|
||||||
|
|
||||||
func openBlock(path, blockID string) (*tsdb.DBReadOnly, tsdb.BlockReader, error) {
|
func openBlock(path, blockID string) (*tsdb.DBReadOnly, tsdb.BlockReader, error) {
|
||||||
db, err := tsdb.OpenDBReadOnly(path, nil)
|
db, err := tsdb.OpenDBReadOnly(path, "", nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, nil, err
|
return nil, nil, err
|
||||||
}
|
}
|
||||||
|
@ -708,8 +708,8 @@ func analyzeCompaction(ctx context.Context, block tsdb.BlockReader, indexr tsdb.
|
||||||
|
|
||||||
type SeriesSetFormatter func(series storage.SeriesSet) error
|
type SeriesSetFormatter func(series storage.SeriesSet) error
|
||||||
|
|
||||||
func dumpSamples(ctx context.Context, path string, mint, maxt int64, match []string, formatter SeriesSetFormatter) (err error) {
|
func dumpSamples(ctx context.Context, dbDir, sandboxDirRoot string, mint, maxt int64, match []string, formatter SeriesSetFormatter) (err error) {
|
||||||
db, err := tsdb.OpenDBReadOnly(path, nil)
|
db, err := tsdb.OpenDBReadOnly(dbDir, sandboxDirRoot, nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
|
@ -64,6 +64,7 @@ func getDumpedSamples(t *testing.T, path string, mint, maxt int64, match []strin
|
||||||
err := dumpSamples(
|
err := dumpSamples(
|
||||||
context.Background(),
|
context.Background(),
|
||||||
path,
|
path,
|
||||||
|
t.TempDir(),
|
||||||
mint,
|
mint,
|
||||||
maxt,
|
maxt,
|
||||||
match,
|
match,
|
||||||
|
|
|
@ -566,6 +566,7 @@ Dump samples from a TSDB.
|
||||||
|
|
||||||
| Flag | Description | Default |
|
| Flag | Description | Default |
|
||||||
| --- | --- | --- |
|
| --- | --- | --- |
|
||||||
|
| <code class="text-nowrap">--sandbox-dir-root</code> | Root directory where a sandbox directory would be created in case WAL replay generates chunks. The sandbox directory is cleaned up at the end. | `data/` |
|
||||||
| <code class="text-nowrap">--min-time</code> | Minimum timestamp to dump. | `-9223372036854775808` |
|
| <code class="text-nowrap">--min-time</code> | Minimum timestamp to dump. | `-9223372036854775808` |
|
||||||
| <code class="text-nowrap">--max-time</code> | Maximum timestamp to dump. | `9223372036854775807` |
|
| <code class="text-nowrap">--max-time</code> | Maximum timestamp to dump. | `9223372036854775807` |
|
||||||
| <code class="text-nowrap">--match</code> | Series selector. Can be specified multiple times. | `{__name__=~'(?s:.*)'}` |
|
| <code class="text-nowrap">--match</code> | Series selector. Can be specified multiple times. | `{__name__=~'(?s:.*)'}` |
|
||||||
|
@ -592,6 +593,7 @@ Dump samples from a TSDB.
|
||||||
|
|
||||||
| Flag | Description | Default |
|
| Flag | Description | Default |
|
||||||
| --- | --- | --- |
|
| --- | --- | --- |
|
||||||
|
| <code class="text-nowrap">--sandbox-dir-root</code> | Root directory where a sandbox directory would be created in case WAL replay generates chunks. The sandbox directory is cleaned up at the end. | `data/` |
|
||||||
| <code class="text-nowrap">--min-time</code> | Minimum timestamp to dump. | `-9223372036854775808` |
|
| <code class="text-nowrap">--min-time</code> | Minimum timestamp to dump. | `-9223372036854775808` |
|
||||||
| <code class="text-nowrap">--max-time</code> | Maximum timestamp to dump. | `9223372036854775807` |
|
| <code class="text-nowrap">--max-time</code> | Maximum timestamp to dump. | `9223372036854775807` |
|
||||||
| <code class="text-nowrap">--match</code> | Series selector. Can be specified multiple times. | `{__name__=~'(?s:.*)'}` |
|
| <code class="text-nowrap">--match</code> | Series selector. Can be specified multiple times. | `{__name__=~'(?s:.*)'}` |
|
||||||
|
|
|
@ -381,6 +381,33 @@ func listChunkFiles(dir string) (map[int]string, error) {
|
||||||
return res, nil
|
return res, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// HardLinkChunkFiles creates hardlinks for chunk files from src to dst.
|
||||||
|
// It does nothing if src doesn't exist and ensures dst is created if not.
|
||||||
|
func HardLinkChunkFiles(src, dst string) error {
|
||||||
|
_, err := os.Stat(src)
|
||||||
|
if os.IsNotExist(err) {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("check source chunks dir: %w", err)
|
||||||
|
}
|
||||||
|
if err := os.MkdirAll(dst, 0o777); err != nil {
|
||||||
|
return fmt.Errorf("set up destination chunks dir: %w", err)
|
||||||
|
}
|
||||||
|
files, err := listChunkFiles(src)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("list chunks: %w", err)
|
||||||
|
}
|
||||||
|
for _, filePath := range files {
|
||||||
|
_, fileName := filepath.Split(filePath)
|
||||||
|
err := os.Link(filepath.Join(src, fileName), filepath.Join(dst, fileName))
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("hardlink a chunk: %w", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
// repairLastChunkFile deletes the last file if it's empty.
|
// repairLastChunkFile deletes the last file if it's empty.
|
||||||
// Because we don't fsync when creating these files, we could end
|
// Because we don't fsync when creating these files, we could end
|
||||||
// up with an empty file at the end during an abrupt shutdown.
|
// up with an empty file at the end during an abrupt shutdown.
|
||||||
|
|
|
@ -1298,7 +1298,7 @@ func TestCancelCompactions(t *testing.T) {
|
||||||
// This checks that the `context.Canceled` error is properly checked at all levels:
|
// This checks that the `context.Canceled` error is properly checked at all levels:
|
||||||
// - tsdb_errors.NewMulti() should have the Is() method implemented for correct checks.
|
// - tsdb_errors.NewMulti() should have the Is() method implemented for correct checks.
|
||||||
// - callers should check with errors.Is() instead of ==.
|
// - callers should check with errors.Is() instead of ==.
|
||||||
readOnlyDB, err := OpenDBReadOnly(tmpdirCopy, log.NewNopLogger())
|
readOnlyDB, err := OpenDBReadOnly(tmpdirCopy, "", log.NewNopLogger())
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
blocks, err := readOnlyDB.Blocks()
|
blocks, err := readOnlyDB.Blocks()
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
45
tsdb/db.go
45
tsdb/db.go
|
@ -383,26 +383,36 @@ var ErrClosed = errors.New("db already closed")
|
||||||
// Current implementation doesn't support concurrency so
|
// Current implementation doesn't support concurrency so
|
||||||
// all API calls should happen in the same go routine.
|
// all API calls should happen in the same go routine.
|
||||||
type DBReadOnly struct {
|
type DBReadOnly struct {
|
||||||
logger log.Logger
|
logger log.Logger
|
||||||
dir string
|
dir string
|
||||||
closers []io.Closer
|
sandboxDir string
|
||||||
closed chan struct{}
|
closers []io.Closer
|
||||||
|
closed chan struct{}
|
||||||
}
|
}
|
||||||
|
|
||||||
// OpenDBReadOnly opens DB in the given directory for read only operations.
|
// OpenDBReadOnly opens DB in the given directory for read only operations.
|
||||||
func OpenDBReadOnly(dir string, l log.Logger) (*DBReadOnly, error) {
|
func OpenDBReadOnly(dir, sandboxDirRoot string, l log.Logger) (*DBReadOnly, error) {
|
||||||
if _, err := os.Stat(dir); err != nil {
|
if _, err := os.Stat(dir); err != nil {
|
||||||
return nil, fmt.Errorf("opening the db dir: %w", err)
|
return nil, fmt.Errorf("opening the db dir: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if sandboxDirRoot == "" {
|
||||||
|
sandboxDirRoot = dir
|
||||||
|
}
|
||||||
|
sandboxDir, err := os.MkdirTemp(sandboxDirRoot, "tmp_dbro_sandbox")
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("setting up sandbox dir: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
if l == nil {
|
if l == nil {
|
||||||
l = log.NewNopLogger()
|
l = log.NewNopLogger()
|
||||||
}
|
}
|
||||||
|
|
||||||
return &DBReadOnly{
|
return &DBReadOnly{
|
||||||
logger: l,
|
logger: l,
|
||||||
dir: dir,
|
dir: dir,
|
||||||
closed: make(chan struct{}),
|
sandboxDir: sandboxDir,
|
||||||
|
closed: make(chan struct{}),
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -491,7 +501,14 @@ func (db *DBReadOnly) loadDataAsQueryable(maxt int64) (storage.SampleAndChunkQue
|
||||||
}
|
}
|
||||||
|
|
||||||
opts := DefaultHeadOptions()
|
opts := DefaultHeadOptions()
|
||||||
opts.ChunkDirRoot = db.dir
|
// Hard link the chunk files to a dir in db.sandboxDir in case the Head needs to truncate some of them
|
||||||
|
// or cut new ones while replaying the WAL.
|
||||||
|
// See https://github.com/prometheus/prometheus/issues/11618.
|
||||||
|
err = chunks.HardLinkChunkFiles(mmappedChunksDir(db.dir), mmappedChunksDir(db.sandboxDir))
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
opts.ChunkDirRoot = db.sandboxDir
|
||||||
head, err := NewHead(nil, db.logger, nil, nil, opts, NewHeadStats())
|
head, err := NewHead(nil, db.logger, nil, nil, opts, NewHeadStats())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
@ -519,7 +536,7 @@ func (db *DBReadOnly) loadDataAsQueryable(maxt int64) (storage.SampleAndChunkQue
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
opts := DefaultHeadOptions()
|
opts := DefaultHeadOptions()
|
||||||
opts.ChunkDirRoot = db.dir
|
opts.ChunkDirRoot = db.sandboxDir
|
||||||
head, err = NewHead(nil, db.logger, w, wbl, opts, NewHeadStats())
|
head, err = NewHead(nil, db.logger, w, wbl, opts, NewHeadStats())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
@ -690,8 +707,14 @@ func (db *DBReadOnly) Block(blockID string) (BlockReader, error) {
|
||||||
return block, nil
|
return block, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Close all block readers.
|
// Close all block readers and delete the sandbox dir.
|
||||||
func (db *DBReadOnly) Close() error {
|
func (db *DBReadOnly) Close() error {
|
||||||
|
defer func() {
|
||||||
|
// Delete the temporary sandbox directory that was created when opening the DB.
|
||||||
|
if err := os.RemoveAll(db.sandboxDir); err != nil {
|
||||||
|
level.Error(db.logger).Log("msg", "delete sandbox dir", "err", err)
|
||||||
|
}
|
||||||
|
}()
|
||||||
select {
|
select {
|
||||||
case <-db.closed:
|
case <-db.closed:
|
||||||
return ErrClosed
|
return ErrClosed
|
||||||
|
|
|
@ -25,6 +25,7 @@ import (
|
||||||
"os"
|
"os"
|
||||||
"path"
|
"path"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
|
"runtime"
|
||||||
"sort"
|
"sort"
|
||||||
"strconv"
|
"strconv"
|
||||||
"sync"
|
"sync"
|
||||||
|
@ -2494,7 +2495,7 @@ func TestDBReadOnly(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Open a read only db and ensure that the API returns the same result as the normal DB.
|
// Open a read only db and ensure that the API returns the same result as the normal DB.
|
||||||
dbReadOnly, err := OpenDBReadOnly(dbDir, logger)
|
dbReadOnly, err := OpenDBReadOnly(dbDir, "", logger)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
defer func() { require.NoError(t, dbReadOnly.Close()) }()
|
defer func() { require.NoError(t, dbReadOnly.Close()) }()
|
||||||
|
|
||||||
|
@ -2548,10 +2549,14 @@ func TestDBReadOnly(t *testing.T) {
|
||||||
// TestDBReadOnlyClosing ensures that after closing the db
|
// TestDBReadOnlyClosing ensures that after closing the db
|
||||||
// all api methods return an ErrClosed.
|
// all api methods return an ErrClosed.
|
||||||
func TestDBReadOnlyClosing(t *testing.T) {
|
func TestDBReadOnlyClosing(t *testing.T) {
|
||||||
dbDir := t.TempDir()
|
sandboxDir := t.TempDir()
|
||||||
db, err := OpenDBReadOnly(dbDir, log.NewLogfmtLogger(log.NewSyncWriter(os.Stderr)))
|
db, err := OpenDBReadOnly(t.TempDir(), sandboxDir, log.NewLogfmtLogger(log.NewSyncWriter(os.Stderr)))
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
// The sandboxDir was there.
|
||||||
|
require.DirExists(t, db.sandboxDir)
|
||||||
require.NoError(t, db.Close())
|
require.NoError(t, db.Close())
|
||||||
|
// The sandboxDir was deleted when closing.
|
||||||
|
require.NoDirExists(t, db.sandboxDir)
|
||||||
require.Equal(t, db.Close(), ErrClosed)
|
require.Equal(t, db.Close(), ErrClosed)
|
||||||
_, err = db.Blocks()
|
_, err = db.Blocks()
|
||||||
require.Equal(t, err, ErrClosed)
|
require.Equal(t, err, ErrClosed)
|
||||||
|
@ -2587,7 +2592,7 @@ func TestDBReadOnly_FlushWAL(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Flush WAL.
|
// Flush WAL.
|
||||||
db, err := OpenDBReadOnly(dbDir, logger)
|
db, err := OpenDBReadOnly(dbDir, "", logger)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
flush := t.TempDir()
|
flush := t.TempDir()
|
||||||
|
@ -2595,7 +2600,7 @@ func TestDBReadOnly_FlushWAL(t *testing.T) {
|
||||||
require.NoError(t, db.Close())
|
require.NoError(t, db.Close())
|
||||||
|
|
||||||
// Reopen the DB from the flushed WAL block.
|
// Reopen the DB from the flushed WAL block.
|
||||||
db, err = OpenDBReadOnly(flush, logger)
|
db, err = OpenDBReadOnly(flush, "", logger)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
defer func() { require.NoError(t, db.Close()) }()
|
defer func() { require.NoError(t, db.Close()) }()
|
||||||
blocks, err := db.Blocks()
|
blocks, err := db.Blocks()
|
||||||
|
@ -2624,6 +2629,80 @@ func TestDBReadOnly_FlushWAL(t *testing.T) {
|
||||||
require.Equal(t, 1000.0, sum)
|
require.Equal(t, 1000.0, sum)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestDBReadOnly_Querier_NoAlteration(t *testing.T) {
|
||||||
|
countChunks := func(dir string) int {
|
||||||
|
files, err := os.ReadDir(mmappedChunksDir(dir))
|
||||||
|
require.NoError(t, err)
|
||||||
|
return len(files)
|
||||||
|
}
|
||||||
|
|
||||||
|
dirHash := func(dir string) (hash []byte) {
|
||||||
|
// Windows requires the DB to be closed: "xxx\lock: The process cannot access the file because it is being used by another process."
|
||||||
|
// But closing the DB alters the directory in this case (it'll cut a new chunk).
|
||||||
|
if runtime.GOOS != "windows" {
|
||||||
|
hash = testutil.DirHash(t, dir)
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
spinUpQuerierAndCheck := func(dir, sandboxDir string, chunksCount int) {
|
||||||
|
dBDirHash := dirHash(dir)
|
||||||
|
// Bootsrap a RO db from the same dir and set up a querier.
|
||||||
|
dbReadOnly, err := OpenDBReadOnly(dir, sandboxDir, nil)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, chunksCount, countChunks(dir))
|
||||||
|
q, err := dbReadOnly.Querier(math.MinInt, math.MaxInt)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.NoError(t, q.Close())
|
||||||
|
require.NoError(t, dbReadOnly.Close())
|
||||||
|
// The RO Head doesn't alter RW db chunks_head/.
|
||||||
|
require.Equal(t, chunksCount, countChunks(dir))
|
||||||
|
require.Equal(t, dirHash(dir), dBDirHash)
|
||||||
|
}
|
||||||
|
|
||||||
|
t.Run("doesn't cut chunks while replaying WAL", func(t *testing.T) {
|
||||||
|
db := openTestDB(t, nil, nil)
|
||||||
|
defer func() {
|
||||||
|
require.NoError(t, db.Close())
|
||||||
|
}()
|
||||||
|
|
||||||
|
// Append until the first mmaped head chunk.
|
||||||
|
for i := 0; i < 121; i++ {
|
||||||
|
app := db.Appender(context.Background())
|
||||||
|
_, err := app.Append(0, labels.FromStrings("foo", "bar"), int64(i), 0)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.NoError(t, app.Commit())
|
||||||
|
}
|
||||||
|
|
||||||
|
spinUpQuerierAndCheck(db.dir, t.TempDir(), 0)
|
||||||
|
|
||||||
|
// The RW Head should have no problem cutting its own chunk,
|
||||||
|
// this also proves that a chunk needed to be cut.
|
||||||
|
require.NotPanics(t, func() { db.ForceHeadMMap() })
|
||||||
|
require.Equal(t, 1, countChunks(db.dir))
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("doesn't truncate corrupted chunks", func(t *testing.T) {
|
||||||
|
db := openTestDB(t, nil, nil)
|
||||||
|
require.NoError(t, db.Close())
|
||||||
|
|
||||||
|
// Simulate a corrupted chunk: without a header.
|
||||||
|
_, err := os.Create(path.Join(mmappedChunksDir(db.dir), "000001"))
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
spinUpQuerierAndCheck(db.dir, t.TempDir(), 1)
|
||||||
|
|
||||||
|
// The RW Head should have no problem truncating its corrupted file:
|
||||||
|
// this proves that the chunk needed to be truncated.
|
||||||
|
db, err = Open(db.dir, nil, nil, nil, nil)
|
||||||
|
defer func() {
|
||||||
|
require.NoError(t, db.Close())
|
||||||
|
}()
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, 0, countChunks(db.dir))
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
func TestDBCannotSeePartialCommits(t *testing.T) {
|
func TestDBCannotSeePartialCommits(t *testing.T) {
|
||||||
if defaultIsolationDisabled {
|
if defaultIsolationDisabled {
|
||||||
t.Skip("skipping test since tsdb isolation is disabled")
|
t.Skip("skipping test since tsdb isolation is disabled")
|
||||||
|
|
Loading…
Reference in a new issue