Add unit test and also protect truncateOOO

Signed-off-by: Jesus Vazquez <jesus.vazquez@grafana.com>
This commit is contained in:
Jesus Vazquez 2023-02-10 15:18:15 +01:00
parent f269077855
commit 5c3f058755
3 changed files with 108 additions and 4 deletions

View file

@ -4333,6 +4333,106 @@ func TestOOOCompactionWithNormalCompaction(t *testing.T) {
verifySamples(db.Blocks()[1], 250, 350) verifySamples(db.Blocks()[1], 250, 350)
} }
// TestOOOCompactionWithDisabledWriteLog tests the scenario where the TSDB is
// configured to not have wal and wbl but its able to compact both the in-order
// and out-of-order head
func TestOOOCompactionWithDisabledWriteLog(t *testing.T) {
dir := t.TempDir()
opts := DefaultOptions()
opts.OutOfOrderCapMax = 30
opts.OutOfOrderTimeWindow = 300 * time.Minute.Milliseconds()
opts.WALSegmentSize = -1 // disabled WAL and WBL
db, err := Open(dir, nil, nil, opts, nil)
require.NoError(t, err)
db.DisableCompactions() // We want to manually call it.
t.Cleanup(func() {
require.NoError(t, db.Close())
})
series1 := labels.FromStrings("foo", "bar1")
series2 := labels.FromStrings("foo", "bar2")
addSamples := func(fromMins, toMins int64) {
app := db.Appender(context.Background())
for min := fromMins; min <= toMins; min++ {
ts := min * time.Minute.Milliseconds()
_, err := app.Append(0, series1, ts, float64(ts))
require.NoError(t, err)
_, err = app.Append(0, series2, ts, float64(2*ts))
require.NoError(t, err)
}
require.NoError(t, app.Commit())
}
// Add an in-order samples.
addSamples(250, 350)
// Add ooo samples that will result into a single block.
addSamples(90, 110)
// Checking that ooo chunk is not empty.
for _, lbls := range []labels.Labels{series1, series2} {
ms, created, err := db.head.getOrCreate(lbls.Hash(), lbls)
require.NoError(t, err)
require.False(t, created)
require.Greater(t, ms.ooo.oooHeadChunk.chunk.NumSamples(), 0)
}
// If the normal Head is not compacted, the OOO head compaction does not take place.
require.NoError(t, db.Compact())
require.Equal(t, len(db.Blocks()), 0)
// Add more in-order samples in future that would trigger the compaction.
addSamples(400, 450)
// No blocks before compaction.
require.Equal(t, len(db.Blocks()), 0)
// Compacts normal and OOO head.
require.NoError(t, db.Compact())
// 2 blocks exist now. [0, 120), [250, 360)
require.Equal(t, len(db.Blocks()), 2)
require.Equal(t, int64(0), db.Blocks()[0].MinTime())
require.Equal(t, 120*time.Minute.Milliseconds(), db.Blocks()[0].MaxTime())
require.Equal(t, 250*time.Minute.Milliseconds(), db.Blocks()[1].MinTime())
require.Equal(t, 360*time.Minute.Milliseconds(), db.Blocks()[1].MaxTime())
// Checking that ooo chunk is empty.
for _, lbls := range []labels.Labels{series1, series2} {
ms, created, err := db.head.getOrCreate(lbls.Hash(), lbls)
require.NoError(t, err)
require.False(t, created)
require.Nil(t, ms.ooo)
}
verifySamples := func(block *Block, fromMins, toMins int64) {
series1Samples := make([]tsdbutil.Sample, 0, toMins-fromMins+1)
series2Samples := make([]tsdbutil.Sample, 0, toMins-fromMins+1)
for min := fromMins; min <= toMins; min++ {
ts := min * time.Minute.Milliseconds()
series1Samples = append(series1Samples, sample{ts, float64(ts), nil, nil})
series2Samples = append(series2Samples, sample{ts, float64(2 * ts), nil, nil})
}
expRes := map[string][]tsdbutil.Sample{
series1.String(): series1Samples,
series2.String(): series2Samples,
}
q, err := NewBlockQuerier(block, math.MinInt64, math.MaxInt64)
require.NoError(t, err)
actRes := query(t, q, labels.MustNewMatcher(labels.MatchRegexp, "foo", "bar.*"))
require.Equal(t, expRes, actRes)
}
// Checking for expected data in the blocks.
verifySamples(db.Blocks()[0], 90, 110)
verifySamples(db.Blocks()[1], 250, 350)
}
func Test_Querier_OOOQuery(t *testing.T) { func Test_Querier_OOOQuery(t *testing.T) {
opts := DefaultOptions() opts := DefaultOptions()
opts.OutOfOrderCapMax = 30 opts.OutOfOrderCapMax = 30

View file

@ -1207,6 +1207,10 @@ func (h *Head) truncateOOO(lastWBLFile int, minOOOMmapRef chunks.ChunkDiskMapper
} }
} }
if h.wbl == nil {
return nil
}
return h.wbl.Truncate(lastWBLFile) return h.wbl.Truncate(lastWBLFile)
} }

View file

@ -280,8 +280,8 @@ func NewOOOCompactionHead(head *Head) (*OOOCompactionHead, error) {
chunkRange: head.chunkRange.Load(), chunkRange: head.chunkRange.Load(),
mint: math.MaxInt64, mint: math.MaxInt64,
maxt: math.MinInt64, maxt: math.MinInt64,
lastWBLFile: 0,
} }
if head.wbl != nil { if head.wbl != nil {
lastWBLFile, err := head.wbl.NextSegmentSync() lastWBLFile, err := head.wbl.NextSegmentSync()
if err != nil { if err != nil {