diff --git a/tsdb/compact.go b/tsdb/compact.go
index acc1d991df..cb5e8d4c16 100644
--- a/tsdb/compact.go
+++ b/tsdb/compact.go
@@ -428,13 +428,13 @@ func CompactBlockMetas(uid ulid.ULID, blocks ...*BlockMeta) *BlockMeta {
 // and returns slice of block IDs. Position of returned block ID in the result slice corresponds to the shard index.
 // If given output block has no series, corresponding block ID will be zero ULID value.
 func (c *LeveledCompactor) CompactWithSplitting(dest string, dirs []string, open []*Block, shardCount uint64) (result []ulid.ULID, _ error) {
-	return c.CompactWithPopulateBlockFunc(dest, dirs, open, DefaultPopulateBlockFunc{}, shardCount)
+	return c.CompactWithBlockPopulator(dest, dirs, open, DefaultBlockPopulator{}, shardCount)
 }
 
 // Compact creates a new block in the compactor's directory from the blocks in the
 // provided directories.
 func (c *LeveledCompactor) Compact(dest string, dirs []string, open []*Block) (uid ulid.ULID, err error) {
-	ulids, err := c.CompactWithPopulateBlockFunc(dest, dirs, open, DefaultPopulateBlockFunc{}, 1)
+	ulids, err := c.CompactWithBlockPopulator(dest, dirs, open, DefaultBlockPopulator{}, 1)
 	if err != nil {
 		return ulid.ULID{}, err
 	}
@@ -453,7 +453,7 @@ type shardedBlock struct {
 	indexw IndexWriter
 }
 
-func (c *LeveledCompactor) CompactWithPopulateBlockFunc(dest string, dirs []string, open []*Block, populateBlockFunc PopulateBlockFunc, shardCount uint64) (_ []ulid.ULID, err error) {
+func (c *LeveledCompactor) CompactWithBlockPopulator(dest string, dirs []string, open []*Block, blockPopulator BlockPopulator, shardCount uint64) (_ []ulid.ULID, err error) {
 	if shardCount == 0 {
 		shardCount = 1
 	}
@@ -487,7 +487,7 @@ func (c *LeveledCompactor) CompactWithPopulateBlockFunc(dest string, dirs []stri
 		outBlocks[ix] = shardedBlock{meta: CompactBlockMetas(ulid.MustNew(outBlocksTime, rand.Reader), metas...)}
 	}
 
-	err = c.write(dest, outBlocks, populateBlockFunc, blocks...)
+	err = c.write(dest, outBlocks, blockPopulator, blocks...)
 	if err == nil {
 		ulids := make([]ulid.ULID, len(outBlocks))
 		allOutputBlocksAreEmpty := true
@@ -619,7 +619,7 @@ func (c *LeveledCompactor) compactOOO(dest string, oooHead *OOOCompactionHead, s
 			}
 
 			// Block intervals are half-open: [b.MinTime, b.MaxTime). Block intervals are always +1 than the total samples it includes.
-			err := c.write(dest, outBlocks[ix], DefaultPopulateBlockFunc{}, oooHead.CloneForTimeRange(mint, maxt-1))
+			err := c.write(dest, outBlocks[ix], DefaultBlockPopulator{}, oooHead.CloneForTimeRange(mint, maxt-1))
 			if err != nil {
 				// We need to delete all blocks in case there was an error.
 				for _, obs := range outBlocks {
@@ -691,7 +691,7 @@ func (c *LeveledCompactor) Write(dest string, b BlockReader, mint, maxt int64, p
 		}
 	}
 
-	err := c.write(dest, []shardedBlock{{meta: meta}}, DefaultPopulateBlockFunc{}, b)
+	err := c.write(dest, []shardedBlock{{meta: meta}}, DefaultBlockPopulator{}, b)
 	if err != nil {
 		return uid, err
 	}
@@ -736,7 +736,7 @@ func (w *instrumentedChunkWriter) WriteChunks(chunks ...chunks.Meta) error {
 }
 
 // write creates new output blocks that are the union of the provided blocks into dir.
-func (c *LeveledCompactor) write(dest string, outBlocks []shardedBlock, populateBlockFunc PopulateBlockFunc, blocks ...BlockReader) (err error) {
+func (c *LeveledCompactor) write(dest string, outBlocks []shardedBlock, blockPopulator BlockPopulator, blocks ...BlockReader) (err error) {
 	var closers []io.Closer
 
 	defer func(t time.Time) {
@@ -814,7 +814,7 @@ func (c *LeveledCompactor) write(dest string, outBlocks []shardedBlock, populate
 	}
 
 	// We use MinTime and MaxTime from first output block, because ALL output blocks have the same min/max times set.
-	if err := populateBlockFunc.PopulateBlock(c.ctx, c.metrics, c.logger, c.chunkPool, c.mergeFunc, c.concurrencyOpts, blocks, outBlocks[0].meta.MinTime, outBlocks[0].meta.MaxTime, outBlocks); err != nil {
+	if err := blockPopulator.PopulateBlock(c.ctx, c.metrics, c.logger, c.chunkPool, c.mergeFunc, c.concurrencyOpts, blocks, outBlocks[0].meta.MinTime, outBlocks[0].meta.MaxTime, outBlocks); err != nil {
 		return errors.Wrap(err, "populate block")
 	}
 
@@ -938,16 +938,16 @@ func timeFromMillis(ms int64) time.Time {
 	return time.Unix(0, ms*int64(time.Millisecond))
 }
 
-type PopulateBlockFunc interface {
+type BlockPopulator interface {
 	PopulateBlock(ctx context.Context, metrics *CompactorMetrics, logger log.Logger, chunkPool chunkenc.Pool, mergeFunc storage.VerticalChunkSeriesMergeFunc, concurrencyOpts LeveledCompactorConcurrencyOptions, blocks []BlockReader, minT, maxT int64, outBlocks []shardedBlock) error
 }
 
-type DefaultPopulateBlockFunc struct{}
+type DefaultBlockPopulator struct{}
 
 // PopulateBlock fills the index and chunk writers with new data gathered as the union
 // of the provided blocks. It returns meta information for the new block.
 // It expects sorted blocks input by mint.
-func (c DefaultPopulateBlockFunc) PopulateBlock(ctx context.Context, metrics *CompactorMetrics, logger log.Logger, chunkPool chunkenc.Pool, mergeFunc storage.VerticalChunkSeriesMergeFunc, concurrencyOpts LeveledCompactorConcurrencyOptions, blocks []BlockReader, minT, maxT int64, outBlocks []shardedBlock) (err error) {
+func (c DefaultBlockPopulator) PopulateBlock(ctx context.Context, metrics *CompactorMetrics, logger log.Logger, chunkPool chunkenc.Pool, mergeFunc storage.VerticalChunkSeriesMergeFunc, concurrencyOpts LeveledCompactorConcurrencyOptions, blocks []BlockReader, minT, maxT int64, outBlocks []shardedBlock) (err error) {
 	if len(blocks) == 0 {
 		return errors.New("cannot populate block(s) from no readers")
 	}
diff --git a/tsdb/compact_test.go b/tsdb/compact_test.go
index 5783cccaf0..ae8ba050d9 100644
--- a/tsdb/compact_test.go
+++ b/tsdb/compact_test.go
@@ -450,7 +450,7 @@ func TestCompactionFailWillCleanUpTempDir(t *testing.T) {
 		{meta: &BlockMeta{ULID: ulid.MustNew(ulid.Now(), crand.Reader)}},
 	}
 
-	require.Error(t, compactor.write(tmpdir, shardedBlocks, DefaultPopulateBlockFunc{}, erringBReader{}))
+	require.Error(t, compactor.write(tmpdir, shardedBlocks, DefaultBlockPopulator{}, erringBReader{}))
 
 	// We rely on the fact that blockDir and tmpDir will be updated by compactor.write.
 	for _, b := range shardedBlocks {
@@ -1155,8 +1155,8 @@ func TestCompaction_populateBlock(t *testing.T) {
 			iw := &mockIndexWriter{}
 			ob := shardedBlock{meta: meta, indexw: iw, chunkw: nopChunkWriter{}}
 
-			populateBlockFunc := DefaultPopulateBlockFunc{}
-			err = populateBlockFunc.PopulateBlock(c.ctx, c.metrics, c.logger, c.chunkPool, c.mergeFunc, c.concurrencyOpts, blocks, meta.MinTime, meta.MaxTime, []shardedBlock{ob})
+			blockPopulator := DefaultBlockPopulator{}
+			err = blockPopulator.PopulateBlock(c.ctx, c.metrics, c.logger, c.chunkPool, c.mergeFunc, c.concurrencyOpts, blocks, meta.MinTime, meta.MaxTime, []shardedBlock{ob})
 			if tc.expErr != nil {
 				require.Error(t, err)
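For reference, the renamed BlockPopulator interface is satisfied by any type with a matching PopulateBlock method. Below is a minimal sketch of a custom populator that wraps DefaultBlockPopulator with timing logs. It assumes the code lives inside package tsdb, since the interface signature includes the unexported shardedBlock type and therefore cannot be implemented from another package; instrumentedBlockPopulator is a hypothetical name used only for illustration, not part of this change.

package tsdb

import (
	"context"
	"time"

	"github.com/go-kit/log"
	"github.com/go-kit/log/level"

	"github.com/prometheus/prometheus/storage"
	"github.com/prometheus/prometheus/tsdb/chunkenc"
)

// instrumentedBlockPopulator is a hypothetical BlockPopulator that delegates
// to DefaultBlockPopulator and logs how long block population took.
type instrumentedBlockPopulator struct {
	delegate DefaultBlockPopulator
}

func (p instrumentedBlockPopulator) PopulateBlock(ctx context.Context, metrics *CompactorMetrics, logger log.Logger, chunkPool chunkenc.Pool, mergeFunc storage.VerticalChunkSeriesMergeFunc, concurrencyOpts LeveledCompactorConcurrencyOptions, blocks []BlockReader, minT, maxT int64, outBlocks []shardedBlock) error {
	start := time.Now()
	// Delegate the actual work to the default implementation.
	err := p.delegate.PopulateBlock(ctx, metrics, logger, chunkPool, mergeFunc, concurrencyOpts, blocks, minT, maxT, outBlocks)
	level.Info(logger).Log("msg", "populated output blocks", "out_blocks", len(outBlocks), "duration", time.Since(start), "err", err)
	return err
}

A caller would pass such a value wherever DefaultBlockPopulator appears above, e.g. c.CompactWithBlockPopulator(dest, dirs, open, instrumentedBlockPopulator{}, shardCount); CompactWithSplitting and Compact remain convenience wrappers that always use the default populator.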