mirror of
https://github.com/prometheus/prometheus.git
synced 2024-11-14 17:44:06 -08:00
Merge pull request #12257 from alexqyle/block-populator-rename
Rename PopulateBlockFunc to BlockPopulator
This commit is contained in:
commit
7309ac2721
|
@ -392,10 +392,10 @@ func CompactBlockMetas(uid ulid.ULID, blocks ...*BlockMeta) *BlockMeta {
|
||||||
// Compact creates a new block in the compactor's directory from the blocks in the
|
// Compact creates a new block in the compactor's directory from the blocks in the
|
||||||
// provided directories.
|
// provided directories.
|
||||||
func (c *LeveledCompactor) Compact(dest string, dirs []string, open []*Block) (uid ulid.ULID, err error) {
|
func (c *LeveledCompactor) Compact(dest string, dirs []string, open []*Block) (uid ulid.ULID, err error) {
|
||||||
return c.CompactWithPopulateBlockFunc(dest, dirs, open, DefaultPopulateBlockFunc{})
|
return c.CompactWithBlockPopulator(dest, dirs, open, DefaultBlockPopulator{})
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *LeveledCompactor) CompactWithPopulateBlockFunc(dest string, dirs []string, open []*Block, populateBlockFunc PopulateBlockFunc) (uid ulid.ULID, err error) {
|
func (c *LeveledCompactor) CompactWithBlockPopulator(dest string, dirs []string, open []*Block, blockPopulator BlockPopulator) (uid ulid.ULID, err error) {
|
||||||
var (
|
var (
|
||||||
blocks []BlockReader
|
blocks []BlockReader
|
||||||
bs []*Block
|
bs []*Block
|
||||||
|
@ -439,7 +439,7 @@ func (c *LeveledCompactor) CompactWithPopulateBlockFunc(dest string, dirs []stri
|
||||||
uid = ulid.MustNew(ulid.Now(), rand.Reader)
|
uid = ulid.MustNew(ulid.Now(), rand.Reader)
|
||||||
|
|
||||||
meta := CompactBlockMetas(uid, metas...)
|
meta := CompactBlockMetas(uid, metas...)
|
||||||
err = c.write(dest, meta, populateBlockFunc, blocks...)
|
err = c.write(dest, meta, blockPopulator, blocks...)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
if meta.Stats.NumSamples == 0 {
|
if meta.Stats.NumSamples == 0 {
|
||||||
for _, b := range bs {
|
for _, b := range bs {
|
||||||
|
@ -505,7 +505,7 @@ func (c *LeveledCompactor) Write(dest string, b BlockReader, mint, maxt int64, p
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
err := c.write(dest, meta, DefaultPopulateBlockFunc{}, b)
|
err := c.write(dest, meta, DefaultBlockPopulator{}, b)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return uid, err
|
return uid, err
|
||||||
}
|
}
|
||||||
|
@ -550,7 +550,7 @@ func (w *instrumentedChunkWriter) WriteChunks(chunks ...chunks.Meta) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
// write creates a new block that is the union of the provided blocks into dir.
|
// write creates a new block that is the union of the provided blocks into dir.
|
||||||
func (c *LeveledCompactor) write(dest string, meta *BlockMeta, populateBlockFunc PopulateBlockFunc, blocks ...BlockReader) (err error) {
|
func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blockPopulator BlockPopulator, blocks ...BlockReader) (err error) {
|
||||||
dir := filepath.Join(dest, meta.ULID.String())
|
dir := filepath.Join(dest, meta.ULID.String())
|
||||||
tmp := dir + tmpForCreationBlockDirSuffix
|
tmp := dir + tmpForCreationBlockDirSuffix
|
||||||
var closers []io.Closer
|
var closers []io.Closer
|
||||||
|
@ -598,7 +598,7 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, populateBlockFunc
|
||||||
}
|
}
|
||||||
closers = append(closers, indexw)
|
closers = append(closers, indexw)
|
||||||
|
|
||||||
if err := populateBlockFunc.PopulateBlock(c.ctx, c.metrics, c.logger, c.chunkPool, c.mergeFunc, blocks, meta, indexw, chunkw); err != nil {
|
if err := blockPopulator.PopulateBlock(c.ctx, c.metrics, c.logger, c.chunkPool, c.mergeFunc, blocks, meta, indexw, chunkw); err != nil {
|
||||||
return errors.Wrap(err, "populate block")
|
return errors.Wrap(err, "populate block")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -663,16 +663,16 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, populateBlockFunc
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
type PopulateBlockFunc interface {
|
type BlockPopulator interface {
|
||||||
PopulateBlock(ctx context.Context, metrics *CompactorMetrics, logger log.Logger, chunkPool chunkenc.Pool, mergeFunc storage.VerticalChunkSeriesMergeFunc, blocks []BlockReader, meta *BlockMeta, indexw IndexWriter, chunkw ChunkWriter) error
|
PopulateBlock(ctx context.Context, metrics *CompactorMetrics, logger log.Logger, chunkPool chunkenc.Pool, mergeFunc storage.VerticalChunkSeriesMergeFunc, blocks []BlockReader, meta *BlockMeta, indexw IndexWriter, chunkw ChunkWriter) error
|
||||||
}
|
}
|
||||||
|
|
||||||
type DefaultPopulateBlockFunc struct{}
|
type DefaultBlockPopulator struct{}
|
||||||
|
|
||||||
// PopulateBlock fills the index and chunk writers with new data gathered as the union
|
// PopulateBlock fills the index and chunk writers with new data gathered as the union
|
||||||
// of the provided blocks. It returns meta information for the new block.
|
// of the provided blocks. It returns meta information for the new block.
|
||||||
// It expects sorted blocks input by mint.
|
// It expects sorted blocks input by mint.
|
||||||
func (c DefaultPopulateBlockFunc) PopulateBlock(ctx context.Context, metrics *CompactorMetrics, logger log.Logger, chunkPool chunkenc.Pool, mergeFunc storage.VerticalChunkSeriesMergeFunc, blocks []BlockReader, meta *BlockMeta, indexw IndexWriter, chunkw ChunkWriter) (err error) {
|
func (c DefaultBlockPopulator) PopulateBlock(ctx context.Context, metrics *CompactorMetrics, logger log.Logger, chunkPool chunkenc.Pool, mergeFunc storage.VerticalChunkSeriesMergeFunc, blocks []BlockReader, meta *BlockMeta, indexw IndexWriter, chunkw ChunkWriter) (err error) {
|
||||||
if len(blocks) == 0 {
|
if len(blocks) == 0 {
|
||||||
return errors.New("cannot populate block from no readers")
|
return errors.New("cannot populate block from no readers")
|
||||||
}
|
}
|
||||||
|
|
|
@ -441,7 +441,7 @@ func TestCompactionFailWillCleanUpTempDir(t *testing.T) {
|
||||||
|
|
||||||
tmpdir := t.TempDir()
|
tmpdir := t.TempDir()
|
||||||
|
|
||||||
require.Error(t, compactor.write(tmpdir, &BlockMeta{}, DefaultPopulateBlockFunc{}, erringBReader{}))
|
require.Error(t, compactor.write(tmpdir, &BlockMeta{}, DefaultBlockPopulator{}, erringBReader{}))
|
||||||
_, err = os.Stat(filepath.Join(tmpdir, BlockMeta{}.ULID.String()) + tmpForCreationBlockDirSuffix)
|
_, err = os.Stat(filepath.Join(tmpdir, BlockMeta{}.ULID.String()) + tmpForCreationBlockDirSuffix)
|
||||||
require.True(t, os.IsNotExist(err), "directory is not cleaned up")
|
require.True(t, os.IsNotExist(err), "directory is not cleaned up")
|
||||||
}
|
}
|
||||||
|
@ -953,8 +953,8 @@ func TestCompaction_populateBlock(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
iw := &mockIndexWriter{}
|
iw := &mockIndexWriter{}
|
||||||
populateBlockFunc := DefaultPopulateBlockFunc{}
|
blockPopulator := DefaultBlockPopulator{}
|
||||||
err = populateBlockFunc.PopulateBlock(c.ctx, c.metrics, c.logger, c.chunkPool, c.mergeFunc, blocks, meta, iw, nopChunkWriter{})
|
err = blockPopulator.PopulateBlock(c.ctx, c.metrics, c.logger, c.chunkPool, c.mergeFunc, blocks, meta, iw, nopChunkWriter{})
|
||||||
if tc.expErr != nil {
|
if tc.expErr != nil {
|
||||||
require.Error(t, err)
|
require.Error(t, err)
|
||||||
require.Equal(t, tc.expErr.Error(), err.Error())
|
require.Equal(t, tc.expErr.Error(), err.Error())
|
||||||
|
|
Loading…
Reference in a new issue