Merge remote-tracking branch 'upstream/main'

This commit is contained in:
Jeanette Tan 2023-04-14 17:46:42 +08:00
commit a537d6c5c6
2 changed files with 14 additions and 14 deletions

View file

@ -428,13 +428,13 @@ func CompactBlockMetas(uid ulid.ULID, blocks ...*BlockMeta) *BlockMeta {
// and returns slice of block IDs. Position of returned block ID in the result slice corresponds to the shard index. // and returns slice of block IDs. Position of returned block ID in the result slice corresponds to the shard index.
// If given output block has no series, corresponding block ID will be zero ULID value. // If given output block has no series, corresponding block ID will be zero ULID value.
func (c *LeveledCompactor) CompactWithSplitting(dest string, dirs []string, open []*Block, shardCount uint64) (result []ulid.ULID, _ error) { func (c *LeveledCompactor) CompactWithSplitting(dest string, dirs []string, open []*Block, shardCount uint64) (result []ulid.ULID, _ error) {
return c.CompactWithPopulateBlockFunc(dest, dirs, open, DefaultPopulateBlockFunc{}, shardCount) return c.CompactWithBlockPopulator(dest, dirs, open, DefaultBlockPopulator{}, shardCount)
} }
// Compact creates a new block in the compactor's directory from the blocks in the // Compact creates a new block in the compactor's directory from the blocks in the
// provided directories. // provided directories.
func (c *LeveledCompactor) Compact(dest string, dirs []string, open []*Block) (uid ulid.ULID, err error) { func (c *LeveledCompactor) Compact(dest string, dirs []string, open []*Block) (uid ulid.ULID, err error) {
ulids, err := c.CompactWithPopulateBlockFunc(dest, dirs, open, DefaultPopulateBlockFunc{}, 1) ulids, err := c.CompactWithBlockPopulator(dest, dirs, open, DefaultBlockPopulator{}, 1)
if err != nil { if err != nil {
return ulid.ULID{}, err return ulid.ULID{}, err
} }
@ -453,7 +453,7 @@ type shardedBlock struct {
indexw IndexWriter indexw IndexWriter
} }
func (c *LeveledCompactor) CompactWithPopulateBlockFunc(dest string, dirs []string, open []*Block, populateBlockFunc PopulateBlockFunc, shardCount uint64) (_ []ulid.ULID, err error) { func (c *LeveledCompactor) CompactWithBlockPopulator(dest string, dirs []string, open []*Block, blockPopulator BlockPopulator, shardCount uint64) (_ []ulid.ULID, err error) {
if shardCount == 0 { if shardCount == 0 {
shardCount = 1 shardCount = 1
} }
@ -487,7 +487,7 @@ func (c *LeveledCompactor) CompactWithPopulateBlockFunc(dest string, dirs []stri
outBlocks[ix] = shardedBlock{meta: CompactBlockMetas(ulid.MustNew(outBlocksTime, rand.Reader), metas...)} outBlocks[ix] = shardedBlock{meta: CompactBlockMetas(ulid.MustNew(outBlocksTime, rand.Reader), metas...)}
} }
err = c.write(dest, outBlocks, populateBlockFunc, blocks...) err = c.write(dest, outBlocks, blockPopulator, blocks...)
if err == nil { if err == nil {
ulids := make([]ulid.ULID, len(outBlocks)) ulids := make([]ulid.ULID, len(outBlocks))
allOutputBlocksAreEmpty := true allOutputBlocksAreEmpty := true
@ -619,7 +619,7 @@ func (c *LeveledCompactor) compactOOO(dest string, oooHead *OOOCompactionHead, s
} }
// Block intervals are half-open: [b.MinTime, b.MaxTime). Block intervals are always +1 than the total samples it includes. // Block intervals are half-open: [b.MinTime, b.MaxTime). Block intervals are always +1 than the total samples it includes.
err := c.write(dest, outBlocks[ix], DefaultPopulateBlockFunc{}, oooHead.CloneForTimeRange(mint, maxt-1)) err := c.write(dest, outBlocks[ix], DefaultBlockPopulator{}, oooHead.CloneForTimeRange(mint, maxt-1))
if err != nil { if err != nil {
// We need to delete all blocks in case there was an error. // We need to delete all blocks in case there was an error.
for _, obs := range outBlocks { for _, obs := range outBlocks {
@ -691,7 +691,7 @@ func (c *LeveledCompactor) Write(dest string, b BlockReader, mint, maxt int64, p
} }
} }
err := c.write(dest, []shardedBlock{{meta: meta}}, DefaultPopulateBlockFunc{}, b) err := c.write(dest, []shardedBlock{{meta: meta}}, DefaultBlockPopulator{}, b)
if err != nil { if err != nil {
return uid, err return uid, err
} }
@ -736,7 +736,7 @@ func (w *instrumentedChunkWriter) WriteChunks(chunks ...chunks.Meta) error {
} }
// write creates new output blocks that are the union of the provided blocks into dir. // write creates new output blocks that are the union of the provided blocks into dir.
func (c *LeveledCompactor) write(dest string, outBlocks []shardedBlock, populateBlockFunc PopulateBlockFunc, blocks ...BlockReader) (err error) { func (c *LeveledCompactor) write(dest string, outBlocks []shardedBlock, blockPopulator BlockPopulator, blocks ...BlockReader) (err error) {
var closers []io.Closer var closers []io.Closer
defer func(t time.Time) { defer func(t time.Time) {
@ -814,7 +814,7 @@ func (c *LeveledCompactor) write(dest string, outBlocks []shardedBlock, populate
} }
// We use MinTime and MaxTime from first output block, because ALL output blocks have the same min/max times set. // We use MinTime and MaxTime from first output block, because ALL output blocks have the same min/max times set.
if err := populateBlockFunc.PopulateBlock(c.ctx, c.metrics, c.logger, c.chunkPool, c.mergeFunc, c.concurrencyOpts, blocks, outBlocks[0].meta.MinTime, outBlocks[0].meta.MaxTime, outBlocks); err != nil { if err := blockPopulator.PopulateBlock(c.ctx, c.metrics, c.logger, c.chunkPool, c.mergeFunc, c.concurrencyOpts, blocks, outBlocks[0].meta.MinTime, outBlocks[0].meta.MaxTime, outBlocks); err != nil {
return errors.Wrap(err, "populate block") return errors.Wrap(err, "populate block")
} }
@ -938,16 +938,16 @@ func timeFromMillis(ms int64) time.Time {
return time.Unix(0, ms*int64(time.Millisecond)) return time.Unix(0, ms*int64(time.Millisecond))
} }
type PopulateBlockFunc interface { type BlockPopulator interface {
PopulateBlock(ctx context.Context, metrics *CompactorMetrics, logger log.Logger, chunkPool chunkenc.Pool, mergeFunc storage.VerticalChunkSeriesMergeFunc, concurrencyOpts LeveledCompactorConcurrencyOptions, blocks []BlockReader, minT, maxT int64, outBlocks []shardedBlock) error PopulateBlock(ctx context.Context, metrics *CompactorMetrics, logger log.Logger, chunkPool chunkenc.Pool, mergeFunc storage.VerticalChunkSeriesMergeFunc, concurrencyOpts LeveledCompactorConcurrencyOptions, blocks []BlockReader, minT, maxT int64, outBlocks []shardedBlock) error
} }
type DefaultPopulateBlockFunc struct{} type DefaultBlockPopulator struct{}
// PopulateBlock fills the index and chunk writers with new data gathered as the union // PopulateBlock fills the index and chunk writers with new data gathered as the union
// of the provided blocks. It returns meta information for the new block. // of the provided blocks. It returns meta information for the new block.
// It expects sorted blocks input by mint. // It expects sorted blocks input by mint.
func (c DefaultPopulateBlockFunc) PopulateBlock(ctx context.Context, metrics *CompactorMetrics, logger log.Logger, chunkPool chunkenc.Pool, mergeFunc storage.VerticalChunkSeriesMergeFunc, concurrencyOpts LeveledCompactorConcurrencyOptions, blocks []BlockReader, minT, maxT int64, outBlocks []shardedBlock) (err error) { func (c DefaultBlockPopulator) PopulateBlock(ctx context.Context, metrics *CompactorMetrics, logger log.Logger, chunkPool chunkenc.Pool, mergeFunc storage.VerticalChunkSeriesMergeFunc, concurrencyOpts LeveledCompactorConcurrencyOptions, blocks []BlockReader, minT, maxT int64, outBlocks []shardedBlock) (err error) {
if len(blocks) == 0 { if len(blocks) == 0 {
return errors.New("cannot populate block(s) from no readers") return errors.New("cannot populate block(s) from no readers")
} }

View file

@ -450,7 +450,7 @@ func TestCompactionFailWillCleanUpTempDir(t *testing.T) {
{meta: &BlockMeta{ULID: ulid.MustNew(ulid.Now(), crand.Reader)}}, {meta: &BlockMeta{ULID: ulid.MustNew(ulid.Now(), crand.Reader)}},
} }
require.Error(t, compactor.write(tmpdir, shardedBlocks, DefaultPopulateBlockFunc{}, erringBReader{})) require.Error(t, compactor.write(tmpdir, shardedBlocks, DefaultBlockPopulator{}, erringBReader{}))
// We rely on the fact that blockDir and tmpDir will be updated by compactor.write. // We rely on the fact that blockDir and tmpDir will be updated by compactor.write.
for _, b := range shardedBlocks { for _, b := range shardedBlocks {
@ -1155,8 +1155,8 @@ func TestCompaction_populateBlock(t *testing.T) {
iw := &mockIndexWriter{} iw := &mockIndexWriter{}
ob := shardedBlock{meta: meta, indexw: iw, chunkw: nopChunkWriter{}} ob := shardedBlock{meta: meta, indexw: iw, chunkw: nopChunkWriter{}}
populateBlockFunc := DefaultPopulateBlockFunc{} blockPopulator := DefaultBlockPopulator{}
err = populateBlockFunc.PopulateBlock(c.ctx, c.metrics, c.logger, c.chunkPool, c.mergeFunc, c.concurrencyOpts, blocks, meta.MinTime, meta.MaxTime, []shardedBlock{ob}) err = blockPopulator.PopulateBlock(c.ctx, c.metrics, c.logger, c.chunkPool, c.mergeFunc, c.concurrencyOpts, blocks, meta.MinTime, meta.MaxTime, []shardedBlock{ob})
if tc.expErr != nil { if tc.expErr != nil {
require.Error(t, err) require.Error(t, err)