diff --git a/tsdb/db_test.go b/tsdb/db_test.go index 4a8c359307..625e36ad6a 100644 --- a/tsdb/db_test.go +++ b/tsdb/db_test.go @@ -4236,6 +4236,11 @@ func TestOOOAppendAndQuery(t *testing.T) { require.Equal(t, float64(totalSamples-2), prom_testutil.ToFloat64(db.head.metrics.outOfOrderSamplesAppended), "number of ooo appended samples mismatch") } + verifyOOOMinMaxTimes := func(expMin, expMax int64) { + require.Equal(t, minutes(expMin), db.head.MinOOOTime()) + require.Equal(t, minutes(expMax), db.head.MaxOOOTime()) + } + // In-order samples. addSample(s1, 300, 300, false) addSample(s2, 290, 290, false) @@ -4245,16 +4250,19 @@ func TestOOOAppendAndQuery(t *testing.T) { // Some ooo samples. addSample(s1, 250, 260, false) addSample(s2, 255, 265, false) + verifyOOOMinMaxTimes(250, 265) testQuery() // Out of allowance. addSample(s1, 59, 59, true) addSample(s2, 49, 49, true) + verifyOOOMinMaxTimes(250, 265) testQuery() // At the edge of allowance, also it would be "out of bound" without the ooo support. addSample(s1, 60, 65, false) addSample(s2, 50, 55, false) + verifyOOOMinMaxTimes(50, 265) testQuery() // Out of allowance again. @@ -4268,6 +4276,7 @@ func TestOOOAppendAndQuery(t *testing.T) { require.Equal(t, float64(4), prom_testutil.ToFloat64(db.head.metrics.chunksCreated)) addSample(s1, 180, 249, false) require.Equal(t, float64(6), prom_testutil.ToFloat64(db.head.metrics.chunksCreated)) + verifyOOOMinMaxTimes(50, 265) testQuery() } @@ -4393,6 +4402,7 @@ func TestWBLAndMmapReplay(t *testing.T) { addSample(s1, 195, 249) // This creates some m-map chunks. require.Equal(t, float64(4), prom_testutil.ToFloat64(db.head.metrics.chunksCreated)) testQuery(expSamples) + oooMint, oooMaxt := minutes(195), minutes(260) // Collect the samples only present in the ooo m-map chunks. ms, created, err := db.head.getOrCreate(s1.Hash(), s1) @@ -4436,6 +4446,8 @@ func TestWBLAndMmapReplay(t *testing.T) { t.Run("Restart DB with both WBL and M-map files for ooo data", func(t *testing.T) { db, err = Open(db.dir, nil, nil, opts, nil) require.NoError(t, err) + require.Equal(t, oooMint, db.head.MinOOOTime()) + require.Equal(t, oooMaxt, db.head.MaxOOOTime()) testQuery(expSamples) require.NoError(t, db.Close()) }) @@ -4445,6 +4457,8 @@ func TestWBLAndMmapReplay(t *testing.T) { db, err = Open(db.dir, nil, nil, opts, nil) require.NoError(t, err) + require.Equal(t, oooMint, db.head.MinOOOTime()) + require.Equal(t, oooMaxt, db.head.MaxOOOTime()) testQuery(expSamples) require.NoError(t, db.Close()) }) @@ -4455,6 +4469,8 @@ func TestWBLAndMmapReplay(t *testing.T) { db, err = Open(db.dir, nil, nil, opts, nil) require.NoError(t, err) + require.Equal(t, oooMint, db.head.MinOOOTime()) + require.Equal(t, oooMaxt, db.head.MaxOOOTime()) inOrderSample := expSamples[s1.String()][len(expSamples[s1.String()])-1] testQuery(map[string][]tsdbutil.Sample{ s1.String(): append(s1MmapSamples, inOrderSample), @@ -4469,6 +4485,8 @@ func TestWBLAndMmapReplay(t *testing.T) { opts.OutOfOrderCapMax = 60 db, err = Open(db.dir, nil, nil, opts, nil) require.NoError(t, err) + require.Equal(t, oooMint, db.head.MinOOOTime()) + require.Equal(t, oooMaxt, db.head.MaxOOOTime()) testQuery(expSamples) require.NoError(t, db.Close()) }) @@ -4479,6 +4497,8 @@ func TestWBLAndMmapReplay(t *testing.T) { opts.OutOfOrderCapMax = 10 db, err = Open(db.dir, nil, nil, opts, nil) require.NoError(t, err) + require.Equal(t, oooMint, db.head.MinOOOTime()) + require.Equal(t, oooMaxt, db.head.MaxOOOTime()) testQuery(expSamples) require.NoError(t, db.Close()) }) @@ -4511,6 +4531,8 @@ func TestWBLAndMmapReplay(t *testing.T) { opts.OutOfOrderCapMax = 30 db, err = Open(db.dir, nil, nil, opts, nil) require.NoError(t, err) + require.Equal(t, oooMint, db.head.MinOOOTime()) + require.Equal(t, oooMaxt, db.head.MaxOOOTime()) testQuery(expSamples) require.NoError(t, db.Close()) }) @@ -4521,6 +4543,8 @@ func TestWBLAndMmapReplay(t *testing.T) { db, err = Open(db.dir, nil, nil, opts, nil) require.NoError(t, err) + require.Equal(t, oooMint, db.head.MinOOOTime()) + require.Equal(t, oooMaxt, db.head.MaxOOOTime()) testQuery(expSamples) }) } diff --git a/tsdb/head_wal.go b/tsdb/head_wal.go index 1a78f01791..e92bcc8b88 100644 --- a/tsdb/head_wal.go +++ b/tsdb/head_wal.go @@ -357,6 +357,19 @@ func (h *Head) resetSeriesWithMMappedChunks(mSeries *memSeries, mmc, oooMmc []*m mSeries.mmMaxTime = mmc[len(mmc)-1].maxTime h.updateMinMaxTime(mmc[0].minTime, mSeries.mmMaxTime) } + if len(oooMmc) != 0 { + // mint and maxt can be in any chunk, they are not sorted. + mint, maxt := int64(math.MaxInt64), int64(math.MinInt64) + for _, ch := range oooMmc { + if ch.minTime < mint { + mint = ch.minTime + } + if ch.maxTime > maxt { + maxt = ch.maxTime + } + } + h.updateMinOOOMaxOOOTime(mint, maxt) + } // Any samples replayed till now would already be compacted. Resetting the head chunk. // We do not reset oooHeadChunk because that is being replayed from a different WAL @@ -497,7 +510,7 @@ func (h *Head) loadWbl(r *wal.Reader, multiRef map[chunks.HeadSeriesRef]chunks.H processors[i].setup() go func(wp *wblSubsetProcessor) { - unknown := wp.processWALSamples(h) + unknown := wp.processWBLSamples(h) unknownRefs.Add(unknown) wg.Done() }(&processors[i]) @@ -679,14 +692,14 @@ func (wp *wblSubsetProcessor) reuseBuf() []record.RefSample { return nil } -// processWALSamples adds the samples it receives to the head and passes +// processWBLSamples adds the samples it receives to the head and passes // the buffer received to an output channel for reuse. // Samples before the minValidTime timestamp are discarded. -func (wp *wblSubsetProcessor) processWALSamples(h *Head) (unknownRefs uint64) { +func (wp *wblSubsetProcessor) processWBLSamples(h *Head) (unknownRefs uint64) { defer close(wp.output) // We don't check for minValidTime for ooo samples. - + mint, maxt := int64(math.MaxInt64), int64(math.MinInt64) for samples := range wp.input { wp.mx.Lock() for _, s := range samples { @@ -695,15 +708,26 @@ func (wp *wblSubsetProcessor) processWALSamples(h *Head) (unknownRefs uint64) { unknownRefs++ continue } - if _, chunkCreated, _ := ms.insert(s.T, s.V, h.chunkDiskMapper); chunkCreated { + ok, chunkCreated, _ := ms.insert(s.T, s.V, h.chunkDiskMapper) + if chunkCreated { h.metrics.chunksCreated.Inc() h.metrics.chunks.Inc() } + if ok { + if s.T < mint { + mint = s.T + } + if s.T > maxt { + maxt = s.T + } + } } wp.mx.Unlock() wp.output <- samples } + h.updateMinOOOMaxOOOTime(mint, maxt) + return unknownRefs }