Merge pull request #12257 from alexqyle/block-populator-rename

Rename PopulateBlockFunc to BlockPopulator
This commit is contained in:
Ganesh Vernekar 2023-04-14 13:35:01 +08:00 committed by GitHub
commit 7309ac2721
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 12 additions and 12 deletions

View file

@ -392,10 +392,10 @@ func CompactBlockMetas(uid ulid.ULID, blocks ...*BlockMeta) *BlockMeta {
// Compact creates a new block in the compactor's directory from the blocks in the // Compact creates a new block in the compactor's directory from the blocks in the
// provided directories. // provided directories.
func (c *LeveledCompactor) Compact(dest string, dirs []string, open []*Block) (uid ulid.ULID, err error) { func (c *LeveledCompactor) Compact(dest string, dirs []string, open []*Block) (uid ulid.ULID, err error) {
return c.CompactWithPopulateBlockFunc(dest, dirs, open, DefaultPopulateBlockFunc{}) return c.CompactWithBlockPopulator(dest, dirs, open, DefaultBlockPopulator{})
} }
func (c *LeveledCompactor) CompactWithPopulateBlockFunc(dest string, dirs []string, open []*Block, populateBlockFunc PopulateBlockFunc) (uid ulid.ULID, err error) { func (c *LeveledCompactor) CompactWithBlockPopulator(dest string, dirs []string, open []*Block, blockPopulator BlockPopulator) (uid ulid.ULID, err error) {
var ( var (
blocks []BlockReader blocks []BlockReader
bs []*Block bs []*Block
@ -439,7 +439,7 @@ func (c *LeveledCompactor) CompactWithPopulateBlockFunc(dest string, dirs []stri
uid = ulid.MustNew(ulid.Now(), rand.Reader) uid = ulid.MustNew(ulid.Now(), rand.Reader)
meta := CompactBlockMetas(uid, metas...) meta := CompactBlockMetas(uid, metas...)
err = c.write(dest, meta, populateBlockFunc, blocks...) err = c.write(dest, meta, blockPopulator, blocks...)
if err == nil { if err == nil {
if meta.Stats.NumSamples == 0 { if meta.Stats.NumSamples == 0 {
for _, b := range bs { for _, b := range bs {
@ -505,7 +505,7 @@ func (c *LeveledCompactor) Write(dest string, b BlockReader, mint, maxt int64, p
} }
} }
err := c.write(dest, meta, DefaultPopulateBlockFunc{}, b) err := c.write(dest, meta, DefaultBlockPopulator{}, b)
if err != nil { if err != nil {
return uid, err return uid, err
} }
@ -550,7 +550,7 @@ func (w *instrumentedChunkWriter) WriteChunks(chunks ...chunks.Meta) error {
} }
// write creates a new block that is the union of the provided blocks into dir. // write creates a new block that is the union of the provided blocks into dir.
func (c *LeveledCompactor) write(dest string, meta *BlockMeta, populateBlockFunc PopulateBlockFunc, blocks ...BlockReader) (err error) { func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blockPopulator BlockPopulator, blocks ...BlockReader) (err error) {
dir := filepath.Join(dest, meta.ULID.String()) dir := filepath.Join(dest, meta.ULID.String())
tmp := dir + tmpForCreationBlockDirSuffix tmp := dir + tmpForCreationBlockDirSuffix
var closers []io.Closer var closers []io.Closer
@ -598,7 +598,7 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, populateBlockFunc
} }
closers = append(closers, indexw) closers = append(closers, indexw)
if err := populateBlockFunc.PopulateBlock(c.ctx, c.metrics, c.logger, c.chunkPool, c.mergeFunc, blocks, meta, indexw, chunkw); err != nil { if err := blockPopulator.PopulateBlock(c.ctx, c.metrics, c.logger, c.chunkPool, c.mergeFunc, blocks, meta, indexw, chunkw); err != nil {
return errors.Wrap(err, "populate block") return errors.Wrap(err, "populate block")
} }
@ -663,16 +663,16 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, populateBlockFunc
return nil return nil
} }
type PopulateBlockFunc interface { type BlockPopulator interface {
PopulateBlock(ctx context.Context, metrics *CompactorMetrics, logger log.Logger, chunkPool chunkenc.Pool, mergeFunc storage.VerticalChunkSeriesMergeFunc, blocks []BlockReader, meta *BlockMeta, indexw IndexWriter, chunkw ChunkWriter) error PopulateBlock(ctx context.Context, metrics *CompactorMetrics, logger log.Logger, chunkPool chunkenc.Pool, mergeFunc storage.VerticalChunkSeriesMergeFunc, blocks []BlockReader, meta *BlockMeta, indexw IndexWriter, chunkw ChunkWriter) error
} }
type DefaultPopulateBlockFunc struct{} type DefaultBlockPopulator struct{}
// PopulateBlock fills the index and chunk writers with new data gathered as the union // PopulateBlock fills the index and chunk writers with new data gathered as the union
// of the provided blocks. It returns meta information for the new block. // of the provided blocks. It returns meta information for the new block.
// It expects sorted blocks input by mint. // It expects sorted blocks input by mint.
func (c DefaultPopulateBlockFunc) PopulateBlock(ctx context.Context, metrics *CompactorMetrics, logger log.Logger, chunkPool chunkenc.Pool, mergeFunc storage.VerticalChunkSeriesMergeFunc, blocks []BlockReader, meta *BlockMeta, indexw IndexWriter, chunkw ChunkWriter) (err error) { func (c DefaultBlockPopulator) PopulateBlock(ctx context.Context, metrics *CompactorMetrics, logger log.Logger, chunkPool chunkenc.Pool, mergeFunc storage.VerticalChunkSeriesMergeFunc, blocks []BlockReader, meta *BlockMeta, indexw IndexWriter, chunkw ChunkWriter) (err error) {
if len(blocks) == 0 { if len(blocks) == 0 {
return errors.New("cannot populate block from no readers") return errors.New("cannot populate block from no readers")
} }

View file

@ -441,7 +441,7 @@ func TestCompactionFailWillCleanUpTempDir(t *testing.T) {
tmpdir := t.TempDir() tmpdir := t.TempDir()
require.Error(t, compactor.write(tmpdir, &BlockMeta{}, DefaultPopulateBlockFunc{}, erringBReader{})) require.Error(t, compactor.write(tmpdir, &BlockMeta{}, DefaultBlockPopulator{}, erringBReader{}))
_, err = os.Stat(filepath.Join(tmpdir, BlockMeta{}.ULID.String()) + tmpForCreationBlockDirSuffix) _, err = os.Stat(filepath.Join(tmpdir, BlockMeta{}.ULID.String()) + tmpForCreationBlockDirSuffix)
require.True(t, os.IsNotExist(err), "directory is not cleaned up") require.True(t, os.IsNotExist(err), "directory is not cleaned up")
} }
@ -953,8 +953,8 @@ func TestCompaction_populateBlock(t *testing.T) {
} }
iw := &mockIndexWriter{} iw := &mockIndexWriter{}
populateBlockFunc := DefaultPopulateBlockFunc{} blockPopulator := DefaultBlockPopulator{}
err = populateBlockFunc.PopulateBlock(c.ctx, c.metrics, c.logger, c.chunkPool, c.mergeFunc, blocks, meta, iw, nopChunkWriter{}) err = blockPopulator.PopulateBlock(c.ctx, c.metrics, c.logger, c.chunkPool, c.mergeFunc, blocks, meta, iw, nopChunkWriter{})
if tc.expErr != nil { if tc.expErr != nil {
require.Error(t, err) require.Error(t, err)
require.Equal(t, tc.expErr.Error(), err.Error()) require.Equal(t, tc.expErr.Error(), err.Error())