Mirror of https://github.com/prometheus/prometheus.git
Merge pull request #11962 from jesusvazquez/jvp/protect-new-compaction-head-from-uninitialized-wbl
TSDB: Protect NewOOOCompactionHead from an uninitialized wbl
Commit 6c008ec56a
tsdb/db_test.go (+100)
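The failure mode this commit fixes needs a TSDB whose write logs are disabled. Below is a minimal sketch of that configuration, mirroring the options used by the regression test in this diff; the "data" directory name and the main wrapper are illustrative, not part of the patch:

    package main

    import (
        "log"
        "time"

        "github.com/prometheus/prometheus/tsdb"
    )

    func main() {
        opts := tsdb.DefaultOptions()
        // Accept out-of-order samples up to 300 minutes older than the newest sample.
        opts.OutOfOrderTimeWindow = 300 * time.Minute.Milliseconds()
        // A negative WAL segment size disables the write logs: the DB comes up
        // with neither a WAL nor a WBL, so head.wbl is nil, and the previously
        // unguarded head.wbl.NextSegmentSync() in NewOOOCompactionHead panicked
        // on the first compaction of the OOO head.
        opts.WALSegmentSize = -1

        db, err := tsdb.Open("data", nil, nil, opts, nil) // "data" is a placeholder dir
        if err != nil {
            log.Fatal(err)
        }
        defer db.Close()
    }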
@@ -4333,6 +4333,106 @@ func TestOOOCompactionWithNormalCompaction(t *testing.T) {
 	verifySamples(db.Blocks()[1], 250, 350)
 }
 
+// TestOOOCompactionWithDisabledWriteLog tests the scenario where the TSDB is
+// configured to have neither a WAL nor a WBL, but it is still able to compact
+// both the in-order and the out-of-order head.
+func TestOOOCompactionWithDisabledWriteLog(t *testing.T) {
+	dir := t.TempDir()
+
+	opts := DefaultOptions()
+	opts.OutOfOrderCapMax = 30
+	opts.OutOfOrderTimeWindow = 300 * time.Minute.Milliseconds()
+	opts.WALSegmentSize = -1 // Disables WAL and WBL.
+
+	db, err := Open(dir, nil, nil, opts, nil)
+	require.NoError(t, err)
+	db.DisableCompactions() // We want to call it manually.
+	t.Cleanup(func() {
+		require.NoError(t, db.Close())
+	})
+
+	series1 := labels.FromStrings("foo", "bar1")
+	series2 := labels.FromStrings("foo", "bar2")
+
+	addSamples := func(fromMins, toMins int64) {
+		app := db.Appender(context.Background())
+		for min := fromMins; min <= toMins; min++ {
+			ts := min * time.Minute.Milliseconds()
+			_, err := app.Append(0, series1, ts, float64(ts))
+			require.NoError(t, err)
+			_, err = app.Append(0, series2, ts, float64(2*ts))
+			require.NoError(t, err)
+		}
+		require.NoError(t, app.Commit())
+	}
+
+	// Add in-order samples.
+	addSamples(250, 350)
+
+	// Add OOO samples that will result in a single block.
+	addSamples(90, 110)
+
+	// Check that the OOO chunk is not empty.
+	for _, lbls := range []labels.Labels{series1, series2} {
+		ms, created, err := db.head.getOrCreate(lbls.Hash(), lbls)
+		require.NoError(t, err)
+		require.False(t, created)
+		require.Greater(t, ms.ooo.oooHeadChunk.chunk.NumSamples(), 0)
+	}
+
+	// If the normal Head is not compacted, the OOO head compaction does not take place.
+	require.NoError(t, db.Compact())
+	require.Equal(t, len(db.Blocks()), 0)
+
+	// Add more in-order samples in the future to trigger the compaction.
+	addSamples(400, 450)
+
+	// No blocks before compaction.
+	require.Equal(t, len(db.Blocks()), 0)
+
+	// Compacts the normal and the OOO head.
+	require.NoError(t, db.Compact())
+
+	// 2 blocks exist now: [0, 120) and [250, 360).
+	require.Equal(t, len(db.Blocks()), 2)
+	require.Equal(t, int64(0), db.Blocks()[0].MinTime())
+	require.Equal(t, 120*time.Minute.Milliseconds(), db.Blocks()[0].MaxTime())
+	require.Equal(t, 250*time.Minute.Milliseconds(), db.Blocks()[1].MinTime())
+	require.Equal(t, 360*time.Minute.Milliseconds(), db.Blocks()[1].MaxTime())
+
+	// Check that the OOO chunk is now empty.
+	for _, lbls := range []labels.Labels{series1, series2} {
+		ms, created, err := db.head.getOrCreate(lbls.Hash(), lbls)
+		require.NoError(t, err)
+		require.False(t, created)
+		require.Nil(t, ms.ooo)
+	}
+
+	verifySamples := func(block *Block, fromMins, toMins int64) {
+		series1Samples := make([]tsdbutil.Sample, 0, toMins-fromMins+1)
+		series2Samples := make([]tsdbutil.Sample, 0, toMins-fromMins+1)
+		for min := fromMins; min <= toMins; min++ {
+			ts := min * time.Minute.Milliseconds()
+			series1Samples = append(series1Samples, sample{ts, float64(ts), nil, nil})
+			series2Samples = append(series2Samples, sample{ts, float64(2 * ts), nil, nil})
+		}
+		expRes := map[string][]tsdbutil.Sample{
+			series1.String(): series1Samples,
+			series2.String(): series2Samples,
+		}
+
+		q, err := NewBlockQuerier(block, math.MinInt64, math.MaxInt64)
+		require.NoError(t, err)
+
+		actRes := query(t, q, labels.MustNewMatcher(labels.MatchRegexp, "foo", "bar.*"))
+		require.Equal(t, expRes, actRes)
+	}
+
+	// Check for the expected data in the blocks.
+	verifySamples(db.Blocks()[0], 90, 110)
+	verifySamples(db.Blocks()[1], 250, 350)
+}
+
 func Test_Querier_OOOQuery(t *testing.T) {
 	opts := DefaultOptions()
 	opts.OutOfOrderCapMax = 30
@@ -1257,6 +1257,10 @@ func (h *Head) truncateOOO(lastWBLFile int, minOOOMmapRef chunks.ChunkDiskMapperRef) error {
 		}
 	}
 
+	if h.wbl == nil {
+		return nil
+	}
+
 	return h.wbl.Truncate(lastWBLFile)
 }
 
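For readability, here is how the tail of truncateOOO reads once the hunk above is applied. The chunk-GC body of the function is elided, and the comments are editorial, not part of the patch:

    // Tail of (*Head).truncateOOO after the patch; surrounding code elided.
    func (h *Head) truncateOOO(lastWBLFile int, minOOOMmapRef chunks.ChunkDiskMapperRef) error {
        // ... garbage-collect the m-mapped OOO chunks (elided) ...

        // With the write-behind log disabled there is no segment to truncate,
        // so return early instead of dereferencing a nil h.wbl.
        if h.wbl == nil {
            return nil
        }

        return h.wbl.Truncate(lastWBLFile)
    }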
@@ -276,16 +276,18 @@ type OOOCompactionHead struct {
 // All the above together have a bit of CPU and memory overhead, and can have a bit of impact
 // on the sample append latency. So call NewOOOCompactionHead only right before compaction.
 func NewOOOCompactionHead(head *Head) (*OOOCompactionHead, error) {
-	newWBLFile, err := head.wbl.NextSegmentSync()
-	if err != nil {
-		return nil, err
-	}
-
 	ch := &OOOCompactionHead{
-		chunkRange:  head.chunkRange.Load(),
-		mint:        math.MaxInt64,
-		maxt:        math.MinInt64,
-		lastWBLFile: newWBLFile,
+		chunkRange: head.chunkRange.Load(),
+		mint:       math.MaxInt64,
+		maxt:       math.MinInt64,
+	}
+
+	if head.wbl != nil {
+		lastWBLFile, err := head.wbl.NextSegmentSync()
+		if err != nil {
+			return nil, err
+		}
+		ch.lastWBLFile = lastWBLFile
 	}
 
 	ch.oooIR = NewOOOHeadIndexReader(head, math.MinInt64, math.MaxInt64)
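And the resulting prologue of NewOOOCompactionHead, assembled from the hunk above. Everything after the index-reader line is elided, and the comments are editorial, not part of the patch:

    // Prologue of NewOOOCompactionHead after the patch; the rest of the
    // function is elided.
    func NewOOOCompactionHead(head *Head) (*OOOCompactionHead, error) {
        ch := &OOOCompactionHead{
            chunkRange: head.chunkRange.Load(),
            mint:       math.MaxInt64,
            maxt:       math.MinInt64,
        }

        // Only consult the WBL when it exists. Before this change,
        // NextSegmentSync was called unconditionally, which panicked with a
        // nil pointer dereference when the write logs were disabled.
        if head.wbl != nil {
            lastWBLFile, err := head.wbl.NextSegmentSync()
            if err != nil {
                return nil, err
            }
            ch.lastWBLFile = lastWBLFile
        }

        ch.oooIR = NewOOOHeadIndexReader(head, math.MinInt64, math.MaxInt64)
        // ... (elided) ...
    }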