mirror of
https://github.com/prometheus/prometheus.git
synced 2025-03-05 20:59:13 -08:00
[Test] TSDB: TestOOOCompaction with samples added after compaction starts
Test fails due to bug. Signed-off-by: Bryan Boreham <bjboreham@gmail.com>
This commit is contained in:
parent
2898d5d715
commit
bded853035
|
@ -1295,6 +1295,9 @@ func (db *DB) CompactOOOHead(ctx context.Context) error {
|
||||||
return db.compactOOOHead(ctx)
|
return db.compactOOOHead(ctx)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Callback for testing.
|
||||||
|
var compactOOOHeadTestingCallback func()
|
||||||
|
|
||||||
func (db *DB) compactOOOHead(ctx context.Context) error {
|
func (db *DB) compactOOOHead(ctx context.Context) error {
|
||||||
if !db.oooWasEnabled.Load() {
|
if !db.oooWasEnabled.Load() {
|
||||||
return nil
|
return nil
|
||||||
|
@ -1304,6 +1307,11 @@ func (db *DB) compactOOOHead(ctx context.Context) error {
|
||||||
return fmt.Errorf("get ooo compaction head: %w", err)
|
return fmt.Errorf("get ooo compaction head: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if compactOOOHeadTestingCallback != nil {
|
||||||
|
compactOOOHeadTestingCallback()
|
||||||
|
compactOOOHeadTestingCallback = nil
|
||||||
|
}
|
||||||
|
|
||||||
ulids, err := db.compactOOO(db.dir, oooHead)
|
ulids, err := db.compactOOO(db.dir, oooHead)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("compact ooo head: %w", err)
|
return fmt.Errorf("compact ooo head: %w", err)
|
||||||
|
|
|
@ -4497,12 +4497,15 @@ func TestMetadataAssertInMemoryData(t *testing.T) {
|
||||||
func TestOOOCompaction(t *testing.T) {
|
func TestOOOCompaction(t *testing.T) {
|
||||||
for name, scenario := range sampleTypeScenarios {
|
for name, scenario := range sampleTypeScenarios {
|
||||||
t.Run(name, func(t *testing.T) {
|
t.Run(name, func(t *testing.T) {
|
||||||
testOOOCompaction(t, scenario)
|
testOOOCompaction(t, scenario, false)
|
||||||
|
})
|
||||||
|
t.Run(name+"+extra", func(t *testing.T) {
|
||||||
|
testOOOCompaction(t, scenario, true)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func testOOOCompaction(t *testing.T, scenario sampleTypeScenario) {
|
func testOOOCompaction(t *testing.T, scenario sampleTypeScenario, addExtraSamples bool) {
|
||||||
dir := t.TempDir()
|
dir := t.TempDir()
|
||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
|
|
||||||
|
@ -4533,7 +4536,7 @@ func testOOOCompaction(t *testing.T, scenario sampleTypeScenario) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Add an in-order samples.
|
// Add an in-order samples.
|
||||||
addSample(250, 350)
|
addSample(250, 300)
|
||||||
|
|
||||||
// Verify that the in-memory ooo chunk is empty.
|
// Verify that the in-memory ooo chunk is empty.
|
||||||
checkEmptyOOOChunk := func(lbls labels.Labels) {
|
checkEmptyOOOChunk := func(lbls labels.Labels) {
|
||||||
|
@ -4547,15 +4550,17 @@ func testOOOCompaction(t *testing.T, scenario sampleTypeScenario) {
|
||||||
|
|
||||||
// Add ooo samples that creates multiple chunks.
|
// Add ooo samples that creates multiple chunks.
|
||||||
// 90 to 300 spans across 3 block ranges: [0, 120), [120, 240), [240, 360)
|
// 90 to 300 spans across 3 block ranges: [0, 120), [120, 240), [240, 360)
|
||||||
addSample(90, 310)
|
addSample(90, 300)
|
||||||
// Adding same samples to create overlapping chunks.
|
// Adding same samples to create overlapping chunks.
|
||||||
// Since the active chunk won't start at 90 again, all the new
|
// Since the active chunk won't start at 90 again, all the new
|
||||||
// chunks will have different time ranges than the previous chunks.
|
// chunks will have different time ranges than the previous chunks.
|
||||||
addSample(90, 310)
|
addSample(90, 300)
|
||||||
|
|
||||||
|
var highest int64 = 300
|
||||||
|
|
||||||
verifyDBSamples := func() {
|
verifyDBSamples := func() {
|
||||||
var series1Samples, series2Samples []chunks.Sample
|
var series1Samples, series2Samples []chunks.Sample
|
||||||
for _, r := range [][2]int64{{90, 119}, {120, 239}, {240, 350}} {
|
for _, r := range [][2]int64{{90, 119}, {120, 239}, {240, highest}} {
|
||||||
fromMins, toMins := r[0], r[1]
|
fromMins, toMins := r[0], r[1]
|
||||||
for min := fromMins; min <= toMins; min++ {
|
for min := fromMins; min <= toMins; min++ {
|
||||||
ts := min * time.Minute.Milliseconds()
|
ts := min * time.Minute.Milliseconds()
|
||||||
|
@ -4583,7 +4588,7 @@ func testOOOCompaction(t *testing.T, scenario sampleTypeScenario) {
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.False(t, created)
|
require.False(t, created)
|
||||||
require.Positive(t, ms.ooo.oooHeadChunk.chunk.NumSamples())
|
require.Positive(t, ms.ooo.oooHeadChunk.chunk.NumSamples())
|
||||||
require.Len(t, ms.ooo.oooMmappedChunks, 14) // 7 original, 7 duplicate.
|
require.Len(t, ms.ooo.oooMmappedChunks, 13) // 7 original, 6 duplicate.
|
||||||
}
|
}
|
||||||
checkNonEmptyOOOChunk(series1)
|
checkNonEmptyOOOChunk(series1)
|
||||||
checkNonEmptyOOOChunk(series2)
|
checkNonEmptyOOOChunk(series2)
|
||||||
|
@ -4601,6 +4606,15 @@ func testOOOCompaction(t *testing.T, scenario sampleTypeScenario) {
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.Greater(t, f.Size(), int64(100))
|
require.Greater(t, f.Size(), int64(100))
|
||||||
|
|
||||||
|
if addExtraSamples {
|
||||||
|
compactOOOHeadTestingCallback = func() {
|
||||||
|
addSample(90, 120) // Back in time, to generate a new OOO chunk.
|
||||||
|
addSample(300, 330) // Now some samples after the previous highest timestamp.
|
||||||
|
addSample(300, 330) // Repeat to generate an OOO chunk at these timestamps.
|
||||||
|
}
|
||||||
|
highest = 330
|
||||||
|
}
|
||||||
|
|
||||||
// OOO compaction happens here.
|
// OOO compaction happens here.
|
||||||
require.NoError(t, db.CompactOOOHead(ctx))
|
require.NoError(t, db.CompactOOOHead(ctx))
|
||||||
|
|
||||||
|
@ -4616,11 +4630,13 @@ func testOOOCompaction(t *testing.T, scenario sampleTypeScenario) {
|
||||||
require.Equal(t, "00000001", files[0].Name())
|
require.Equal(t, "00000001", files[0].Name())
|
||||||
f, err = files[0].Info()
|
f, err = files[0].Info()
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.Equal(t, int64(0), f.Size())
|
|
||||||
|
|
||||||
// OOO stuff should not be present in the Head now.
|
if !addExtraSamples {
|
||||||
checkEmptyOOOChunk(series1)
|
require.Equal(t, int64(0), f.Size())
|
||||||
checkEmptyOOOChunk(series2)
|
// OOO stuff should not be present in the Head now.
|
||||||
|
checkEmptyOOOChunk(series1)
|
||||||
|
checkEmptyOOOChunk(series2)
|
||||||
|
}
|
||||||
|
|
||||||
verifySamples := func(block *Block, fromMins, toMins int64) {
|
verifySamples := func(block *Block, fromMins, toMins int64) {
|
||||||
series1Samples := make([]chunks.Sample, 0, toMins-fromMins+1)
|
series1Samples := make([]chunks.Sample, 0, toMins-fromMins+1)
|
||||||
|
@ -4645,7 +4661,7 @@ func testOOOCompaction(t *testing.T, scenario sampleTypeScenario) {
|
||||||
// Checking for expected data in the blocks.
|
// Checking for expected data in the blocks.
|
||||||
verifySamples(db.Blocks()[0], 90, 119)
|
verifySamples(db.Blocks()[0], 90, 119)
|
||||||
verifySamples(db.Blocks()[1], 120, 239)
|
verifySamples(db.Blocks()[1], 120, 239)
|
||||||
verifySamples(db.Blocks()[2], 240, 310)
|
verifySamples(db.Blocks()[2], 240, 299)
|
||||||
|
|
||||||
// There should be a single m-map file.
|
// There should be a single m-map file.
|
||||||
mmapDir := mmappedChunksDir(db.head.opts.ChunkDirRoot)
|
mmapDir := mmappedChunksDir(db.head.opts.ChunkDirRoot)
|
||||||
|
@ -4658,7 +4674,7 @@ func testOOOCompaction(t *testing.T, scenario sampleTypeScenario) {
|
||||||
err = db.CompactHead(NewRangeHead(db.head, 250*time.Minute.Milliseconds(), 350*time.Minute.Milliseconds()))
|
err = db.CompactHead(NewRangeHead(db.head, 250*time.Minute.Milliseconds(), 350*time.Minute.Milliseconds()))
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.Len(t, db.Blocks(), 4) // [0, 120), [120, 240), [240, 360), [250, 351)
|
require.Len(t, db.Blocks(), 4) // [0, 120), [120, 240), [240, 360), [250, 351)
|
||||||
verifySamples(db.Blocks()[3], 250, 350)
|
verifySamples(db.Blocks()[3], 250, highest)
|
||||||
|
|
||||||
verifyDBSamples() // Blocks created out of normal and OOO head now. But not merged.
|
verifyDBSamples() // Blocks created out of normal and OOO head now. But not merged.
|
||||||
|
|
||||||
|
@ -4675,7 +4691,7 @@ func testOOOCompaction(t *testing.T, scenario sampleTypeScenario) {
|
||||||
require.Len(t, db.Blocks(), 3) // [0, 120), [120, 240), [240, 360)
|
require.Len(t, db.Blocks(), 3) // [0, 120), [120, 240), [240, 360)
|
||||||
verifySamples(db.Blocks()[0], 90, 119)
|
verifySamples(db.Blocks()[0], 90, 119)
|
||||||
verifySamples(db.Blocks()[1], 120, 239)
|
verifySamples(db.Blocks()[1], 120, 239)
|
||||||
verifySamples(db.Blocks()[2], 240, 350) // Merged block.
|
verifySamples(db.Blocks()[2], 240, highest) // Merged block.
|
||||||
|
|
||||||
verifyDBSamples() // Final state. Blocks from normal and OOO head are merged.
|
verifyDBSamples() // Final state. Blocks from normal and OOO head are merged.
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue