diff --git a/tsdb/db.go b/tsdb/db.go index 6d2c4e18a..ee7b5d980 100644 --- a/tsdb/db.go +++ b/tsdb/db.go @@ -859,6 +859,11 @@ func open(dir string, l log.Logger, r prometheus.Registerer, opts *Options, rngs } } + if db.head.MinOOOTime() != int64(math.MaxInt64) { + // Some OOO data was replayed from the disk that needs compaction and cleanup. + db.oooWasEnabled.Store(true) + } + go db.run() return db, nil diff --git a/tsdb/db_test.go b/tsdb/db_test.go index 80875c944..908153e06 100644 --- a/tsdb/db_test.go +++ b/tsdb/db_test.go @@ -5435,3 +5435,87 @@ func TestPanicOnApplyConfig(t *testing.T) { }) require.NoError(t, err) } + +func TestDiskFillingUpAfterDisablingOOO(t *testing.T) { + dir := t.TempDir() + + opts := DefaultOptions() + opts.OutOfOrderTimeWindow = 60 * time.Minute.Milliseconds() + opts.AllowOverlappingQueries = true + + db, err := Open(dir, nil, nil, opts, nil) + require.NoError(t, err) + db.DisableCompactions() + t.Cleanup(func() { + require.NoError(t, db.Close()) + }) + + series1 := labels.FromStrings("foo", "bar1") + var allSamples []tsdbutil.Sample + addSamples := func(fromMins, toMins int64) { + app := db.Appender(context.Background()) + for min := fromMins; min <= toMins; min++ { + ts := min * time.Minute.Milliseconds() + _, err := app.Append(0, series1, ts, float64(ts)) + require.NoError(t, err) + allSamples = append(allSamples, sample{t: ts, v: float64(ts)}) + } + require.NoError(t, app.Commit()) + } + + // In-order samples. + addSamples(290, 300) + // OOO samples. + addSamples(250, 299) + + // Restart DB with OOO disabled. + require.NoError(t, db.Close()) + opts.OutOfOrderTimeWindow = 0 + db, err = Open(db.dir, nil, prometheus.NewRegistry(), opts, nil) + require.NoError(t, err) + db.DisableCompactions() + + ms := db.head.series.getByHash(series1.Hash(), series1) + require.Greater(t, len(ms.oooMmappedChunks), 0, "OOO mmap chunk was not replayed") + + checkMmapFileContents := func(contains, notContains []string) { + mmapDir := mmappedChunksDir(db.head.opts.ChunkDirRoot) + files, err := os.ReadDir(mmapDir) + require.NoError(t, err) + + fnames := make([]string, 0, len(files)) + for _, f := range files { + fnames = append(fnames, f.Name()) + } + + for _, f := range contains { + require.Contains(t, fnames, f) + } + for _, f := range notContains { + require.NotContains(t, fnames, f) + } + } + + // Add in-order samples until ready for compaction.. + addSamples(301, 500) + + // Check that m-map files gets deleted properly after compactions. + + checkMmapFileContents([]string{"000001", "000002"}, nil) + require.NoError(t, db.Compact()) + checkMmapFileContents([]string{"000002"}, []string{"000001"}) + require.Equal(t, 0, len(ms.oooMmappedChunks), "OOO mmap chunk was not compacted") + + addSamples(501, 650) + checkMmapFileContents([]string{"000002", "000003"}, []string{"000001"}) + require.NoError(t, db.Compact()) + checkMmapFileContents(nil, []string{"000001", "000002", "000003"}) + + // Verify that WBL is empty. + files, err := os.ReadDir(db.head.wbl.Dir()) + require.NoError(t, err) + require.Len(t, files, 1) // Last empty file after compaction. + finfo, err := files[0].Info() + require.NoError(t, err) + require.Equal(t, int64(0), finfo.Size()) +}