All output blocks will have the same timestamp.

Minor updates.
This commit is contained in:
Peter Štibraný 2021-09-27 14:22:51 +02:00
parent 63dbb1c69a
commit 006c2d7d55
2 changed files with 19 additions and 14 deletions

View file

@ -463,11 +463,9 @@ func (c *LeveledCompactor) compact(dest string, dirs []string, open []*Block, sh
}
outBlocks := make([]shardedBlock, shardCount)
outBlocksTime := ulid.Now() // Make all out blocks share the same timestamp in the ULID.
for ix := range outBlocks {
uid := ulid.MustNew(ulid.Now(), rand.Reader)
outBlocks[ix] = shardedBlock{
meta: CompactBlockMetas(uid, metas...),
}
outBlocks[ix] = shardedBlock{meta: CompactBlockMetas(ulid.MustNew(outBlocksTime, rand.Reader), metas...)}
}
err = c.write(dest, outBlocks, blocks...)
@ -477,6 +475,8 @@ func (c *LeveledCompactor) compact(dest string, dirs []string, open []*Block, sh
for ix := range outBlocks {
meta := outBlocks[ix].meta
ulids[ix] = outBlocks[ix].meta.ULID
if meta.Stats.NumSamples == 0 {
for _, b := range bs {
b.meta.Compaction.Deletable = true
@ -490,7 +490,7 @@ func (c *LeveledCompactor) compact(dest string, dirs []string, open []*Block, sh
}
b.numBytesMeta = n
}
outBlocks[ix].meta.ULID = ulid.ULID{}
ulids[ix] = ulid.ULID{}
level.Info(c.logger).Log(
"msg", "compact blocks resulted in empty block",
"count", len(blocks),
@ -510,8 +510,6 @@ func (c *LeveledCompactor) compact(dest string, dirs []string, open []*Block, sh
"shard", fmt.Sprintf("%d_of_%d", ix, shardCount),
)
}
ulids[ix] = outBlocks[ix].meta.ULID
}
return ulids, nil
}
@ -599,12 +597,12 @@ func (c *LeveledCompactor) write(dest string, outBlocks []shardedBlock, blocks .
err = tsdb_errors.NewMulti(err, tsdb_errors.CloseAll(closers)).Err()
// RemoveAll returns no error when tmp doesn't exist so it is safe to always run it.
for ix := 0; ix < len(outBlocks); ix++ {
if outBlocks[ix].tmpDir == "" {
for _, ob := range outBlocks {
if ob.tmpDir == "" {
continue
}
if err := os.RemoveAll(outBlocks[ix].tmpDir); err != nil {
if err := os.RemoveAll(ob.tmpDir); err != nil {
level.Error(c.logger).Log("msg", "removed tmp folder after failed compaction", "err", err.Error())
}
}
@ -679,8 +677,7 @@ func (c *LeveledCompactor) write(dest string, outBlocks []shardedBlock, blocks .
return errs.Err()
}
for ix := range outBlocks {
ob := outBlocks[ix]
for _, ob := range outBlocks {
// Populated block is empty, don't write meta file for it.
if ob.meta.Stats.NumSamples == 0 {
continue
@ -804,8 +801,8 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, minT, maxT int64,
}
for symbols.Next() {
for ix := range outBlocks {
if err := outBlocks[ix].indexw.AddSymbol(symbols.At()); err != nil {
for _, ob := range outBlocks {
if err := ob.indexw.AddSymbol(symbols.At()); err != nil {
return errors.Wrap(err, "add symbol")
}
}

View file

@ -529,12 +529,20 @@ func TestCompaction_CompactWithSplitting(t *testing.T) {
// 2) Verify that total number of series over all blocks is correct.
totalSeries := uint64(0)
ts := uint64(0)
for shardIndex, blockID := range blockIDs {
// Some blocks may be empty, they will have zero block ID.
if blockID == (ulid.ULID{}) {
continue
}
// All blocks have the same timestamp.
if ts == 0 {
ts = blockID.Time()
} else {
require.Equal(t, ts, blockID.Time())
}
block, err := OpenBlock(log.NewNopLogger(), filepath.Join(dir, blockID.String()), nil)
require.NoError(t, err)