mirror of
https://github.com/prometheus/prometheus.git
synced 2024-11-15 01:54:06 -08:00
Run OOO compaction after restart if there is OOO data from WBL (#320)
Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com> Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com>
This commit is contained in:
parent
54196bb7c4
commit
4aa6d561a1
|
@ -859,6 +859,11 @@ func open(dir string, l log.Logger, r prometheus.Registerer, opts *Options, rngs
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if db.head.MinOOOTime() != int64(math.MaxInt64) {
|
||||||
|
// Some OOO data was replayed from the disk that needs compaction and cleanup.
|
||||||
|
db.oooWasEnabled.Store(true)
|
||||||
|
}
|
||||||
|
|
||||||
go db.run()
|
go db.run()
|
||||||
|
|
||||||
return db, nil
|
return db, nil
|
||||||
|
|
|
@ -5435,3 +5435,87 @@ func TestPanicOnApplyConfig(t *testing.T) {
|
||||||
})
|
})
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestDiskFillingUpAfterDisablingOOO(t *testing.T) {
|
||||||
|
dir := t.TempDir()
|
||||||
|
|
||||||
|
opts := DefaultOptions()
|
||||||
|
opts.OutOfOrderTimeWindow = 60 * time.Minute.Milliseconds()
|
||||||
|
opts.AllowOverlappingQueries = true
|
||||||
|
|
||||||
|
db, err := Open(dir, nil, nil, opts, nil)
|
||||||
|
require.NoError(t, err)
|
||||||
|
db.DisableCompactions()
|
||||||
|
t.Cleanup(func() {
|
||||||
|
require.NoError(t, db.Close())
|
||||||
|
})
|
||||||
|
|
||||||
|
series1 := labels.FromStrings("foo", "bar1")
|
||||||
|
var allSamples []tsdbutil.Sample
|
||||||
|
addSamples := func(fromMins, toMins int64) {
|
||||||
|
app := db.Appender(context.Background())
|
||||||
|
for min := fromMins; min <= toMins; min++ {
|
||||||
|
ts := min * time.Minute.Milliseconds()
|
||||||
|
_, err := app.Append(0, series1, ts, float64(ts))
|
||||||
|
require.NoError(t, err)
|
||||||
|
allSamples = append(allSamples, sample{t: ts, v: float64(ts)})
|
||||||
|
}
|
||||||
|
require.NoError(t, app.Commit())
|
||||||
|
}
|
||||||
|
|
||||||
|
// In-order samples.
|
||||||
|
addSamples(290, 300)
|
||||||
|
// OOO samples.
|
||||||
|
addSamples(250, 299)
|
||||||
|
|
||||||
|
// Restart DB with OOO disabled.
|
||||||
|
require.NoError(t, db.Close())
|
||||||
|
opts.OutOfOrderTimeWindow = 0
|
||||||
|
db, err = Open(db.dir, nil, prometheus.NewRegistry(), opts, nil)
|
||||||
|
require.NoError(t, err)
|
||||||
|
db.DisableCompactions()
|
||||||
|
|
||||||
|
ms := db.head.series.getByHash(series1.Hash(), series1)
|
||||||
|
require.Greater(t, len(ms.oooMmappedChunks), 0, "OOO mmap chunk was not replayed")
|
||||||
|
|
||||||
|
checkMmapFileContents := func(contains, notContains []string) {
|
||||||
|
mmapDir := mmappedChunksDir(db.head.opts.ChunkDirRoot)
|
||||||
|
files, err := os.ReadDir(mmapDir)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
fnames := make([]string, 0, len(files))
|
||||||
|
for _, f := range files {
|
||||||
|
fnames = append(fnames, f.Name())
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, f := range contains {
|
||||||
|
require.Contains(t, fnames, f)
|
||||||
|
}
|
||||||
|
for _, f := range notContains {
|
||||||
|
require.NotContains(t, fnames, f)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Add in-order samples until ready for compaction..
|
||||||
|
addSamples(301, 500)
|
||||||
|
|
||||||
|
// Check that m-map files gets deleted properly after compactions.
|
||||||
|
|
||||||
|
checkMmapFileContents([]string{"000001", "000002"}, nil)
|
||||||
|
require.NoError(t, db.Compact())
|
||||||
|
checkMmapFileContents([]string{"000002"}, []string{"000001"})
|
||||||
|
require.Equal(t, 0, len(ms.oooMmappedChunks), "OOO mmap chunk was not compacted")
|
||||||
|
|
||||||
|
addSamples(501, 650)
|
||||||
|
checkMmapFileContents([]string{"000002", "000003"}, []string{"000001"})
|
||||||
|
require.NoError(t, db.Compact())
|
||||||
|
checkMmapFileContents(nil, []string{"000001", "000002", "000003"})
|
||||||
|
|
||||||
|
// Verify that WBL is empty.
|
||||||
|
files, err := os.ReadDir(db.head.wbl.Dir())
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Len(t, files, 1) // Last empty file after compaction.
|
||||||
|
finfo, err := files[0].Info()
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, int64(0), finfo.Size())
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in a new issue