More review feedback.

This commit is contained in:
Peter Štibraný 2021-09-28 10:29:54 +02:00
parent db7fa7621c
commit 57daf79192

View file

@ -605,7 +605,7 @@ func (c *LeveledCompactor) write(dest string, outBlocks []shardedBlock, blocks .
if ob.tmpDir != "" { if ob.tmpDir != "" {
// RemoveAll returns no error when tmp doesn't exist so it is safe to always run it. // RemoveAll returns no error when tmp doesn't exist so it is safe to always run it.
if removeErr := os.RemoveAll(ob.tmpDir); removeErr != nil { if removeErr := os.RemoveAll(ob.tmpDir); removeErr != nil {
level.Error(c.logger).Log("msg", "removed tmp folder after failed compaction", "dir", ob.tmpDir, "err", removeErr.Error()) level.Error(c.logger).Log("msg", "Failed to remove temp folder after failed compaction", "dir", ob.tmpDir, "err", removeErr.Error())
} }
} }
@ -615,7 +615,7 @@ func (c *LeveledCompactor) write(dest string, outBlocks []shardedBlock, blocks .
if err != nil && ob.blockDir != "" { if err != nil && ob.blockDir != "" {
// RemoveAll returns no error when tmp doesn't exist so it is safe to always run it. // RemoveAll returns no error when tmp doesn't exist so it is safe to always run it.
if removeErr := os.RemoveAll(ob.blockDir); removeErr != nil { if removeErr := os.RemoveAll(ob.blockDir); removeErr != nil {
level.Error(c.logger).Log("msg", "removed block folder after failed compaction", "dir", ob.blockDir, "err", removeErr.Error()) level.Error(c.logger).Log("msg", "Failed to remove block folder after failed compaction", "dir", ob.blockDir, "err", removeErr.Error())
} }
} }
} }
@ -828,7 +828,7 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, minT, maxT int64,
} }
var ( var (
ref = uint64(0) refs = make([]uint64, len(outBlocks))
chks []chunks.Meta chks []chunks.Meta
) )
@ -864,22 +864,22 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, minT, maxT int64,
continue continue
} }
ob := outBlocks[0] obIx := uint64(0)
if len(outBlocks) > 1 { if len(outBlocks) > 1 {
ob = outBlocks[s.Labels().Hash()%uint64(len(outBlocks))] obIx = s.Labels().Hash() % uint64(len(outBlocks))
} }
if err := ob.chunkw.WriteChunks(chks...); err != nil { if err := outBlocks[obIx].chunkw.WriteChunks(chks...); err != nil {
return errors.Wrap(err, "write chunks") return errors.Wrap(err, "write chunks")
} }
if err := ob.indexw.AddSeries(ref, s.Labels(), chks...); err != nil { if err := outBlocks[obIx].indexw.AddSeries(refs[obIx], s.Labels(), chks...); err != nil {
return errors.Wrap(err, "add series") return errors.Wrap(err, "add series")
} }
ob.meta.Stats.NumChunks += uint64(len(chks)) outBlocks[obIx].meta.Stats.NumChunks += uint64(len(chks))
ob.meta.Stats.NumSeries++ outBlocks[obIx].meta.Stats.NumSeries++
for _, chk := range chks { for _, chk := range chks {
ob.meta.Stats.NumSamples += uint64(chk.Chunk.NumSamples()) outBlocks[obIx].meta.Stats.NumSamples += uint64(chk.Chunk.NumSamples())
} }
for _, chk := range chks { for _, chk := range chks {
@ -887,7 +887,7 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, minT, maxT int64,
return errors.Wrap(err, "put chunk") return errors.Wrap(err, "put chunk")
} }
} }
ref++ refs[obIx]++
} }
if set.Err() != nil { if set.Err() != nil {
return errors.Wrap(set.Err(), "iterate compaction set") return errors.Wrap(set.Err(), "iterate compaction set")