diff --git a/tsdb/db.go b/tsdb/db.go
index 090d6fcf0c..87870a8472 100644
--- a/tsdb/db.go
+++ b/tsdb/db.go
@@ -1295,6 +1295,9 @@ func (db *DB) CompactOOOHead(ctx context.Context) error {
 	return db.compactOOOHead(ctx)
 }
 
+// Callback for testing.
+var compactOOOHeadTestingCallback func()
+
 func (db *DB) compactOOOHead(ctx context.Context) error {
 	if !db.oooWasEnabled.Load() {
 		return nil
@@ -1304,6 +1307,11 @@ func (db *DB) compactOOOHead(ctx context.Context) error {
 		return fmt.Errorf("get ooo compaction head: %w", err)
 	}
 
+	if compactOOOHeadTestingCallback != nil {
+		compactOOOHeadTestingCallback()
+		compactOOOHeadTestingCallback = nil
+	}
+
 	ulids, err := db.compactOOO(db.dir, oooHead)
 	if err != nil {
 		return fmt.Errorf("compact ooo head: %w", err)
diff --git a/tsdb/db_test.go b/tsdb/db_test.go
index c0edafe087..3dae9a5d18 100644
--- a/tsdb/db_test.go
+++ b/tsdb/db_test.go
@@ -4497,12 +4497,15 @@ func TestMetadataAssertInMemoryData(t *testing.T) {
 func TestOOOCompaction(t *testing.T) {
 	for name, scenario := range sampleTypeScenarios {
 		t.Run(name, func(t *testing.T) {
-			testOOOCompaction(t, scenario)
+			testOOOCompaction(t, scenario, false)
+		})
+		t.Run(name+"+extra", func(t *testing.T) {
+			testOOOCompaction(t, scenario, true)
 		})
 	}
 }
 
-func testOOOCompaction(t *testing.T, scenario sampleTypeScenario) {
+func testOOOCompaction(t *testing.T, scenario sampleTypeScenario, addExtraSamples bool) {
 	dir := t.TempDir()
 	ctx := context.Background()
 
@@ -4533,7 +4536,7 @@ func testOOOCompaction(t *testing.T, scenario sampleTypeScenario) {
 	}
 
 	// Add an in-order samples.
-	addSample(250, 350)
+	addSample(250, 300)
 
 	// Verify that the in-memory ooo chunk is empty.
 	checkEmptyOOOChunk := func(lbls labels.Labels) {
@@ -4547,15 +4550,17 @@ func testOOOCompaction(t *testing.T, scenario sampleTypeScenario) {
 
 	// Add ooo samples that creates multiple chunks.
 	// 90 to 300 spans across 3 block ranges: [0, 120), [120, 240), [240, 360)
-	addSample(90, 310)
+	addSample(90, 300)
 	// Adding same samples to create overlapping chunks.
 	// Since the active chunk won't start at 90 again, all the new
 	// chunks will have different time ranges than the previous chunks.
-	addSample(90, 310)
+	addSample(90, 300)
+
+	var highest int64 = 300
 
 	verifyDBSamples := func() {
 		var series1Samples, series2Samples []chunks.Sample
-		for _, r := range [][2]int64{{90, 119}, {120, 239}, {240, 350}} {
+		for _, r := range [][2]int64{{90, 119}, {120, 239}, {240, highest}} {
 			fromMins, toMins := r[0], r[1]
 			for min := fromMins; min <= toMins; min++ {
 				ts := min * time.Minute.Milliseconds()
@@ -4583,7 +4588,7 @@ func testOOOCompaction(t *testing.T, scenario sampleTypeScenario) {
 		require.NoError(t, err)
 		require.False(t, created)
 		require.Positive(t, ms.ooo.oooHeadChunk.chunk.NumSamples())
-		require.Len(t, ms.ooo.oooMmappedChunks, 14) // 7 original, 7 duplicate.
+		require.Len(t, ms.ooo.oooMmappedChunks, 13) // 7 original, 6 duplicate.
 	}
 	checkNonEmptyOOOChunk(series1)
 	checkNonEmptyOOOChunk(series2)
@@ -4601,6 +4606,15 @@ func testOOOCompaction(t *testing.T, scenario sampleTypeScenario) {
 	require.NoError(t, err)
 	require.Greater(t, f.Size(), int64(100))
 
+	if addExtraSamples {
+		compactOOOHeadTestingCallback = func() {
+			addSample(90, 120)  // Back in time, to generate a new OOO chunk.
+			addSample(300, 330) // Now some samples after the previous highest timestamp.
+			addSample(300, 330) // Repeat to generate an OOO chunk at these timestamps.
+		}
+		highest = 330
+	}
+
 	// OOO compaction happens here.
 	require.NoError(t, db.CompactOOOHead(ctx))
 
@@ -4616,11 +4630,13 @@ func testOOOCompaction(t *testing.T, scenario sampleTypeScenario) {
 	require.Equal(t, "00000001", files[0].Name())
 	f, err = files[0].Info()
 	require.NoError(t, err)
-	require.Equal(t, int64(0), f.Size())
-
-	// OOO stuff should not be present in the Head now.
-	checkEmptyOOOChunk(series1)
-	checkEmptyOOOChunk(series2)
+	if !addExtraSamples {
+		require.Equal(t, int64(0), f.Size())
+
+		// OOO stuff should not be present in the Head now.
+		checkEmptyOOOChunk(series1)
+		checkEmptyOOOChunk(series2)
+	}
 
 	verifySamples := func(block *Block, fromMins, toMins int64) {
 		series1Samples := make([]chunks.Sample, 0, toMins-fromMins+1)
@@ -4645,7 +4661,7 @@ func testOOOCompaction(t *testing.T, scenario sampleTypeScenario) {
 	// Checking for expected data in the blocks.
 	verifySamples(db.Blocks()[0], 90, 119)
 	verifySamples(db.Blocks()[1], 120, 239)
-	verifySamples(db.Blocks()[2], 240, 310)
+	verifySamples(db.Blocks()[2], 240, 299)
 
 	// There should be a single m-map file.
 	mmapDir := mmappedChunksDir(db.head.opts.ChunkDirRoot)
@@ -4658,7 +4674,7 @@ func testOOOCompaction(t *testing.T, scenario sampleTypeScenario) {
 	err = db.CompactHead(NewRangeHead(db.head, 250*time.Minute.Milliseconds(), 350*time.Minute.Milliseconds()))
 	require.NoError(t, err)
 	require.Len(t, db.Blocks(), 4) // [0, 120), [120, 240), [240, 360), [250, 351)
-	verifySamples(db.Blocks()[3], 250, 350)
+	verifySamples(db.Blocks()[3], 250, highest)
 
 	verifyDBSamples() // Blocks created out of normal and OOO head now. But not merged.
 
@@ -4675,7 +4691,7 @@ func testOOOCompaction(t *testing.T, scenario sampleTypeScenario) {
 	require.Len(t, db.Blocks(), 3) // [0, 120), [120, 240), [240, 360)
 	verifySamples(db.Blocks()[0], 90, 119)
 	verifySamples(db.Blocks()[1], 120, 239)
-	verifySamples(db.Blocks()[2], 240, 350) // Merged block.
+	verifySamples(db.Blocks()[2], 240, highest) // Merged block.
 
 	verifyDBSamples() // Final state. Blocks from normal and OOO head are merged.
 }
diff --git a/tsdb/head_read.go b/tsdb/head_read.go
index 9ba8785ad2..87564ae3c9 100644
--- a/tsdb/head_read.go
+++ b/tsdb/head_read.go
@@ -467,7 +467,7 @@ func (s *memSeries) chunk(id chunks.HeadChunkID, chunkDiskMapper *chunks.ChunkDi
 // amongst all the chunks in the OOOHead.
 // This function is not thread safe unless the caller holds a lock.
 // The caller must ensure that s.ooo is not nil.
-func (s *memSeries) oooMergedChunks(meta chunks.Meta, cdm *chunks.ChunkDiskMapper, mint, maxt int64) (*mergedOOOChunks, error) {
+func (s *memSeries) oooMergedChunks(meta chunks.Meta, cdm *chunks.ChunkDiskMapper, mint, maxt int64, maxMmapRef chunks.ChunkDiskMapperRef) (*mergedOOOChunks, error) {
 	_, cid := chunks.HeadChunkRef(meta.Ref).Unpack()
 
 	// ix represents the index of chunk in the s.mmappedChunks slice. The chunk meta's are
@@ -490,6 +490,9 @@ func (s *memSeries) oooMergedChunks(meta chunks.Meta, cdm *chunks.ChunkDiskMappe
 
 	tmpChks := make([]chunkMetaAndChunkDiskMapperRef, 0, len(s.ooo.oooMmappedChunks)+1)
 	for i, c := range s.ooo.oooMmappedChunks {
+		if maxMmapRef != 0 && c.ref > maxMmapRef {
+			break
+		}
 		if c.OverlapsClosedInterval(mint, maxt) {
 			tmpChks = append(tmpChks, chunkMetaAndChunkDiskMapperRef{
 				meta: chunks.Meta{
diff --git a/tsdb/ooo_head.go b/tsdb/ooo_head.go
index b2556d62e9..01b5bff636 100644
--- a/tsdb/ooo_head.go
+++ b/tsdb/ooo_head.go
@@ -201,7 +201,7 @@ func (oh *OOORangeHead) Index() (IndexReader, error) {
 }
 
 func (oh *OOORangeHead) Chunks() (ChunkReader, error) {
-	return NewOOOHeadChunkReader(oh.head, oh.mint, oh.maxt, oh.isoState), nil
+	return NewOOOHeadChunkReader(oh.head, oh.mint, oh.maxt, oh.isoState, 0), nil
 }
 
 func (oh *OOORangeHead) Tombstones() (tombstones.Reader, error) {
diff --git a/tsdb/ooo_head_read.go b/tsdb/ooo_head_read.go
index a35276af50..9d5b9d6443 100644
--- a/tsdb/ooo_head_read.go
+++ b/tsdb/ooo_head_read.go
@@ -243,14 +243,16 @@ type OOOHeadChunkReader struct {
 	head       *Head
 	mint, maxt int64
 	isoState   *oooIsolationState
+	maxMmapRef chunks.ChunkDiskMapperRef
 }
 
-func NewOOOHeadChunkReader(head *Head, mint, maxt int64, isoState *oooIsolationState) *OOOHeadChunkReader {
+func NewOOOHeadChunkReader(head *Head, mint, maxt int64, isoState *oooIsolationState, maxMmapRef chunks.ChunkDiskMapperRef) *OOOHeadChunkReader {
 	return &OOOHeadChunkReader{
-		head:     head,
-		mint:     mint,
-		maxt:     maxt,
-		isoState: isoState,
+		head:       head,
+		mint:       mint,
+		maxt:       maxt,
+		isoState:   isoState,
+		maxMmapRef: maxMmapRef,
 	}
 }
 
@@ -269,7 +271,7 @@ func (cr OOOHeadChunkReader) ChunkOrIterable(meta chunks.Meta) (chunkenc.Chunk,
 		s.Unlock()
 		return nil, nil, storage.ErrNotFound
 	}
-	mc, err := s.oooMergedChunks(meta, cr.head.chunkDiskMapper, cr.mint, cr.maxt)
+	mc, err := s.oooMergedChunks(meta, cr.head.chunkDiskMapper, cr.mint, cr.maxt, cr.maxMmapRef)
 	s.Unlock()
 	if err != nil {
 		return nil, nil, err
@@ -386,7 +388,7 @@ func (ch *OOOCompactionHead) Index() (IndexReader, error) {
 }
 
 func (ch *OOOCompactionHead) Chunks() (ChunkReader, error) {
-	return NewOOOHeadChunkReader(ch.oooIR.head, ch.oooIR.mint, ch.oooIR.maxt, nil), nil
+	return NewOOOHeadChunkReader(ch.oooIR.head, ch.oooIR.mint, ch.oooIR.maxt, nil, ch.lastMmapRef), nil
 }
 
 func (ch *OOOCompactionHead) Tombstones() (tombstones.Reader, error) {
diff --git a/tsdb/ooo_head_read_test.go b/tsdb/ooo_head_read_test.go
index 7ecd355b55..8cc3f1dde6 100644
--- a/tsdb/ooo_head_read_test.go
+++ b/tsdb/ooo_head_read_test.go
@@ -481,7 +481,7 @@ func testOOOHeadChunkReader_Chunk(t *testing.T, scenario sampleTypeScenario) {
 	t.Run("Getting a non existing chunk fails with not found error", func(t *testing.T) {
 		db := newTestDBWithOpts(t, opts)
 
-		cr := NewOOOHeadChunkReader(db.head, 0, 1000, nil)
+		cr := NewOOOHeadChunkReader(db.head, 0, 1000, nil, 0)
 		defer cr.Close()
 		c, iterable, err := cr.ChunkOrIterable(chunks.Meta{
 			Ref: 0x1000000, Chunk: chunkenc.Chunk(nil), MinTime: 100, MaxTime: 300,
@@ -839,7 +839,7 @@ func testOOOHeadChunkReader_Chunk(t *testing.T, scenario sampleTypeScenario) {
 			require.NoError(t, err)
 			require.Equal(t, len(tc.expChunksSamples), len(chks))
 
-			cr := NewOOOHeadChunkReader(db.head, tc.queryMinT, tc.queryMaxT, nil)
+			cr := NewOOOHeadChunkReader(db.head, tc.queryMinT, tc.queryMaxT, nil, 0)
 			defer cr.Close()
 			for i := 0; i < len(chks); i++ {
 				c, iterable, err := cr.ChunkOrIterable(chks[i])
@@ -1013,7 +1013,7 @@ func testOOOHeadChunkReader_Chunk_ConsistentQueryResponseDespiteOfHeadExpanding(
 			}
 			require.NoError(t, app.Commit())
 
-			cr := NewOOOHeadChunkReader(db.head, tc.queryMinT, tc.queryMaxT, nil)
+			cr := NewOOOHeadChunkReader(db.head, tc.queryMinT, tc.queryMaxT, nil, 0)
 			defer cr.Close()
 			for i := 0; i < len(chks); i++ {
 				c, iterable, err := cr.ChunkOrIterable(chks[i])
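
Reviewer note on the core of this change: OOOCompactionHead already captures lastMmapRef when it is built, and with this patch the chunk reader refuses to merge in any OOO chunk that was m-mapped after that point. OOO samples ingested while the compaction is running (exercised in the test via compactOOOHeadTestingCallback) therefore stay in the head for a later compaction instead of leaking into the blocks being written, while queries pass maxMmapRef = 0 and remain unbounded. Below is a minimal, self-contained sketch of the cutoff logic; chunkRef, mmappedChunk, and visibleChunks are hypothetical stand-ins for chunks.ChunkDiskMapperRef, the entries of s.ooo.oooMmappedChunks, and the loop inside oooMergedChunks, not the real TSDB types.

package main

import "fmt"

// chunkRef stands in for chunks.ChunkDiskMapperRef: a reference that
// grows monotonically as OOO chunks get m-mapped to disk.
type chunkRef uint64

// mmappedChunk is a simplified stand-in for an m-mapped OOO chunk.
type mmappedChunk struct {
	ref        chunkRef
	mint, maxt int64
}

// visibleChunks mirrors the bounded loop added to oooMergedChunks: when
// maxMmapRef is non-zero, any chunk m-mapped after the compaction head was
// built (c.ref > maxMmapRef) is ignored. Because refs increase with m-map
// order, the first chunk past the cutoff ends the scan.
func visibleChunks(chks []mmappedChunk, mint, maxt int64, maxMmapRef chunkRef) []mmappedChunk {
	var out []mmappedChunk
	for _, c := range chks {
		if maxMmapRef != 0 && c.ref > maxMmapRef {
			break
		}
		// Same role as c.OverlapsClosedInterval(mint, maxt).
		if c.mint <= maxt && c.maxt >= mint {
			out = append(out, c)
		}
	}
	return out
}

func main() {
	chks := []mmappedChunk{
		{ref: 1, mint: 90, maxt: 170},  // m-mapped before compaction started
		{ref: 2, mint: 171, maxt: 250}, // m-mapped before compaction started
		{ref: 3, mint: 300, maxt: 330}, // m-mapped while compaction was running
	}
	// Compaction path: bounded at the captured lastMmapRef (here 2).
	fmt.Println(visibleChunks(chks, 0, 400, 2)) // [{1 90 170} {2 171 250}]
	// Query path (OOORangeHead passes 0): no bound, all chunks considered.
	fmt.Println(visibleChunks(chks, 0, 400, 0)) // [{1 90 170} {2 171 250} {3 300 330}]
}

Stopping at the first ref past the bound, rather than filtering the whole slice, assumes refs are assigned in strictly increasing m-map order; that ordering is what makes the inclusive maxMmapRef bound act as a consistent snapshot of the chunks that existed when the compaction head was created.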