Compactor with support for splitting input blocks into multiple output blocks.
This commit is contained in: parent a9bc9c27ba, commit 78396b67dd

tsdb/compact.go | 324
@@ -388,9 +388,40 @@ func CompactBlockMetas(uid ulid.ULID, blocks ...*BlockMeta) *BlockMeta {
    return res
}

+// CompactWithSplitting merges and splits the input blocks into shardCount number of output blocks,
+// and returns slice of block IDs. Position of returned block ID in the result slice corresponds to the shard index.
+// If given output block has no series, corresponding block ID will be zero ULID value.
+func (c *LeveledCompactor) CompactWithSplitting(dest string, dirs []string, open []*Block, shardCount uint64) (result []ulid.ULID, _ error) {
+   return c.compact(dest, dirs, open, shardCount)
+}
+
// Compact creates a new block in the compactor's directory from the blocks in the
// provided directories.
func (c *LeveledCompactor) Compact(dest string, dirs []string, open []*Block) (uid ulid.ULID, err error) {
+   ulids, err := c.compact(dest, dirs, open, 1)
+   if err != nil {
+       return ulid.ULID{}, err
+   }
+   return ulids[0], nil
+}
+
+// shardedBlock describes single *output* block during compaction. This struct is passed between
+// compaction methods to wrap output block details, index and chunk writer together.
+// Shard index is determined by the position of this structure in the slice of output blocks.
+type shardedBlock struct {
+   meta *BlockMeta
+
+   blockDir string
+   tmpDir   string // Temp directory used when block is being built (= blockDir + temp suffix)
+   chunkw   ChunkWriter
+   indexw   IndexWriter
+}
+
+func (c *LeveledCompactor) compact(dest string, dirs []string, open []*Block, shardCount uint64) (_ []ulid.ULID, err error) {
+   if shardCount == 0 {
+       shardCount = 1
+   }
+
    var (
        blocks []BlockReader
        bs     []*Block
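
The comments above define the contract of the new entry point: CompactWithSplitting returns one block ID per shard, positioned by shard index, and reports a shard that received no series as the zero ULID. Below is a minimal caller sketch against this patched tsdb package; the wrapper name, destination path, block directories and shard count are illustrative assumptions, not part of the commit.

package main

import (
    "context"
    "fmt"

    "github.com/go-kit/log"
    "github.com/oklog/ulid"
    "github.com/prometheus/prometheus/tsdb"
)

// compactSharded is a hypothetical wrapper: it merges the given block
// directories into shardCount output blocks under dest and prints which
// shards actually produced a block.
func compactSharded(c *tsdb.LeveledCompactor, dest string, dirs []string, shardCount uint64) error {
    ids, err := c.CompactWithSplitting(dest, dirs, nil, shardCount)
    if err != nil {
        return err
    }
    for shard, id := range ids {
        // Per the doc comment: an empty shard is reported as the zero ULID and
        // no block directory is created for it.
        if id == (ulid.ULID{}) {
            fmt.Printf("shard %d of %d: no series, no block written\n", shard, shardCount)
            continue
        }
        fmt.Printf("shard %d of %d: wrote block %s\n", shard, shardCount, id)
    }
    return nil
}

func main() {
    // Same constructor call the new test uses; nil registerer, chunk pool and merge func.
    c, err := tsdb.NewLeveledCompactor(context.Background(), nil, log.NewNopLogger(), []int64{0}, nil, nil)
    if err != nil {
        panic(err)
    }
    // "./data" and the block directories are placeholders and must already contain blocks.
    if err := compactSharded(c, "./data", []string{"./data/01BLOCKA", "./data/01BLOCKB"}, 3); err != nil {
        panic(err)
    }
}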

@@ -402,7 +433,7 @@ func (c *LeveledCompactor) Compact(dest string, dirs []string, open []*Block) (u
    for _, d := range dirs {
        meta, _, err := readMetaFile(d)
        if err != nil {
-           return uid, err
+           return nil, err
        }

        var b *Block

@@ -420,7 +451,7 @@ func (c *LeveledCompactor) Compact(dest string, dirs []string, open []*Block) (u
            var err error
            b, err = OpenBlock(c.logger, d, c.chunkPool)
            if err != nil {
-               return uid, err
+               return nil, err
            }
            defer b.Close()
        }

@@ -431,42 +462,58 @@ func (c *LeveledCompactor) Compact(dest string, dirs []string, open []*Block) (u
        uids = append(uids, meta.ULID.String())
    }

-   uid = ulid.MustNew(ulid.Now(), rand.Reader)
-
-   meta := CompactBlockMetas(uid, metas...)
-   err = c.write(dest, meta, blocks...)
-   if err == nil {
-       if meta.Stats.NumSamples == 0 {
-           for _, b := range bs {
-               b.meta.Compaction.Deletable = true
-               n, err := writeMetaFile(c.logger, b.dir, &b.meta)
-               if err != nil {
-                   level.Error(c.logger).Log(
-                       "msg", "Failed to write 'Deletable' to meta file after compaction",
-                       "ulid", b.meta.ULID,
-                   )
-               }
-               b.numBytesMeta = n
-           }
-           uid = ulid.ULID{}
-           level.Info(c.logger).Log(
-               "msg", "compact blocks resulted in empty block",
-               "count", len(blocks),
-               "sources", fmt.Sprintf("%v", uids),
-               "duration", time.Since(start),
-           )
-       } else {
-           level.Info(c.logger).Log(
-               "msg", "compact blocks",
-               "count", len(blocks),
-               "mint", meta.MinTime,
-               "maxt", meta.MaxTime,
-               "ulid", meta.ULID,
-               "sources", fmt.Sprintf("%v", uids),
-               "duration", time.Since(start),
-           )
+   outBlocks := make([]shardedBlock, shardCount, shardCount)
+   for ix := range outBlocks {
+       uid := ulid.MustNew(ulid.Now(), rand.Reader)
+       outBlocks[ix] = shardedBlock{
+           meta: CompactBlockMetas(uid, metas...),
        }
-       return uid, nil
    }
+
+   err = c.write(dest, outBlocks, blocks...)
+   if err == nil {
+       ulids := make([]ulid.ULID, len(outBlocks), len(outBlocks))
+
+       for ix := range outBlocks {
+           meta := outBlocks[ix].meta
+
+           if meta.Stats.NumSamples == 0 {
+               for _, b := range bs {
+                   b.meta.Compaction.Deletable = true
+                   n, err := writeMetaFile(c.logger, b.dir, &b.meta)
+                   if err != nil {
+                       level.Error(c.logger).Log(
+                           "msg", "Failed to write 'Deletable' to meta file after compaction",
+                           "ulid", b.meta.ULID,
+                           "shard", fmt.Sprintf("%d_of_%d", ix, shardCount),
+                       )
+                   }
+                   b.numBytesMeta = n
+               }
+               outBlocks[ix].meta.ULID = ulid.ULID{}
+               level.Info(c.logger).Log(
+                   "msg", "compact blocks resulted in empty block",
+                   "count", len(blocks),
+                   "sources", fmt.Sprintf("%v", uids),
+                   "duration", time.Since(start),
+                   "shard", fmt.Sprintf("%d_of_%d", ix, shardCount),
+               )
+           } else {
+               level.Info(c.logger).Log(
+                   "msg", "compact blocks",
+                   "count", len(blocks),
+                   "mint", meta.MinTime,
+                   "maxt", meta.MaxTime,
+                   "ulid", meta.ULID,
+                   "sources", fmt.Sprintf("%v", uids),
+                   "duration", time.Since(start),
+                   "shard", fmt.Sprintf("%d_of_%d", ix, shardCount),
+               )
+           }
+
+           ulids[ix] = outBlocks[ix].meta.ULID
+       }
+       return ulids, nil
+   }

    errs := tsdb_errors.NewMulti(err)

@@ -478,7 +525,7 @@ func (c *LeveledCompactor) Compact(dest string, dirs []string, open []*Block) (u
        }
    }

-   return uid, errs.Err()
+   return nil, errs.Err()
}

func (c *LeveledCompactor) Write(dest string, b BlockReader, mint, maxt int64, parent *BlockMeta) (ulid.ULID, error) {

@@ -500,7 +547,7 @@ func (c *LeveledCompactor) Write(dest string, b BlockReader, mint, maxt int64, p
        }
    }

-   err := c.write(dest, meta, b)
+   err := c.write(dest, []shardedBlock{{meta: meta}}, b)
    if err != nil {
        return uid, err
    }

@@ -544,63 +591,79 @@ func (w *instrumentedChunkWriter) WriteChunks(chunks ...chunks.Meta) error {
    return w.ChunkWriter.WriteChunks(chunks...)
}

-// write creates a new block that is the union of the provided blocks into dir.
-func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockReader) (err error) {
-   dir := filepath.Join(dest, meta.ULID.String())
-   tmp := dir + tmpForCreationBlockDirSuffix
+// write creates new output blocks that are the union of the provided blocks into dir.
+func (c *LeveledCompactor) write(dest string, outBlocks []shardedBlock, blocks ...BlockReader) (err error) {
    var closers []io.Closer
+
    defer func(t time.Time) {
        err = tsdb_errors.NewMulti(err, tsdb_errors.CloseAll(closers)).Err()

-       // RemoveAll returns no error when tmp doesn't exist so it is safe to always run it.
-       if err := os.RemoveAll(tmp); err != nil {
-           level.Error(c.logger).Log("msg", "removed tmp folder after failed compaction", "err", err.Error())
+       for ix := 0; ix < len(outBlocks); ix++ {
+           if outBlocks[ix].tmpDir == "" {
+               continue
+           }
+
+           if err := os.RemoveAll(outBlocks[ix].tmpDir); err != nil {
+               level.Error(c.logger).Log("msg", "removed tmp folder after failed compaction", "err", err.Error())
+           }
        }
        c.metrics.ran.Inc()
        c.metrics.duration.Observe(time.Since(t).Seconds())
    }(time.Now())

-   if err = os.RemoveAll(tmp); err != nil {
-       return err
-   }
+   for ix := range outBlocks {
+       dir := filepath.Join(dest, outBlocks[ix].meta.ULID.String())
+       tmp := dir + tmpForCreationBlockDirSuffix

-   if err = os.MkdirAll(tmp, 0777); err != nil {
-       return err
-   }
+       outBlocks[ix].blockDir = dir
+       outBlocks[ix].tmpDir = tmp

-   // Populate chunk and index files into temporary directory with
-   // data of all blocks.
-   var chunkw ChunkWriter
-
-   chunkw, err = chunks.NewWriterWithSegSize(chunkDir(tmp), c.maxBlockChunkSegmentSize)
-   if err != nil {
-       return errors.Wrap(err, "open chunk writer")
-   }
-   closers = append(closers, chunkw)
-   // Record written chunk sizes on level 1 compactions.
-   if meta.Compaction.Level == 1 {
-       chunkw = &instrumentedChunkWriter{
-           ChunkWriter: chunkw,
-           size:    c.metrics.chunkSize,
-           samples: c.metrics.chunkSamples,
-           trange:  c.metrics.chunkRange,
+       if err = os.RemoveAll(tmp); err != nil {
+           return err
+       }
+
+       if err = os.MkdirAll(tmp, 0777); err != nil {
+           return err
+       }
+
+       // Populate chunk and index files into temporary directory with
+       // data of all blocks.
+       var chunkw ChunkWriter
+       chunkw, err = chunks.NewWriterWithSegSize(chunkDir(tmp), c.maxBlockChunkSegmentSize)
+       if err != nil {
+           return errors.Wrap(err, "open chunk writer")
+       }
+
+       closers = append(closers, chunkw)
+
+       // Record written chunk sizes on level 1 compactions.
+       if outBlocks[ix].meta.Compaction.Level == 1 {
+           chunkw = &instrumentedChunkWriter{
+               ChunkWriter: chunkw,
+               size:    c.metrics.chunkSize,
+               samples: c.metrics.chunkSamples,
+               trange:  c.metrics.chunkRange,
+           }
        }

+       outBlocks[ix].chunkw = chunkw
+
+       indexw, err := index.NewWriter(c.ctx, filepath.Join(tmp, indexFilename))
+       if err != nil {
+           return errors.Wrap(err, "open index writer")
+       }
+       closers = append(closers, indexw)
+
+       outBlocks[ix].indexw = indexw
+   }

-   indexw, err := index.NewWriter(c.ctx, filepath.Join(tmp, indexFilename))
-   if err != nil {
-       return errors.Wrap(err, "open index writer")
-   }
-   closers = append(closers, indexw)

-   if err := c.populateBlock(blocks, meta, indexw, chunkw); err != nil {
+   if err := c.populateBlock(blocks, outBlocks[0].meta.MinTime, outBlocks[0].meta.MaxTime, outBlocks); err != nil {
        return errors.Wrap(err, "populate block")
    }

-   select {
-   case <-c.ctx.Done():
-       return c.ctx.Err()
-   default:
+   if err := c.ctx.Err(); err != nil {
+       return err
    }

    // We are explicitly closing them here to check for error even

@@ -616,54 +679,59 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockRe
        return errs.Err()
    }

-   // Populated block is empty, so exit early.
-   if meta.Stats.NumSamples == 0 {
-       return nil
-   }
-
-   if _, err = writeMetaFile(c.logger, tmp, meta); err != nil {
-       return errors.Wrap(err, "write merged meta")
-   }
-
-   // Create an empty tombstones file.
-   if _, err := tombstones.WriteFile(c.logger, tmp, tombstones.NewMemTombstones()); err != nil {
-       return errors.Wrap(err, "write new tombstones file")
-   }
-
-   df, err := fileutil.OpenDir(tmp)
-   if err != nil {
-       return errors.Wrap(err, "open temporary block dir")
-   }
-   defer func() {
-       if df != nil {
-           df.Close()
+   for ix := range outBlocks {
+       ob := outBlocks[ix]
+       // Populated block is empty, don't write meta file for it.
+       if ob.meta.Stats.NumSamples == 0 {
+           continue
        }
-   }()

-   if err := df.Sync(); err != nil {
-       return errors.Wrap(err, "sync temporary dir file")
-   }
+       if _, err = writeMetaFile(c.logger, ob.tmpDir, ob.meta); err != nil {
+           return errors.Wrap(err, "write merged meta")
+       }
+
-   // Close temp dir before rename block dir (for windows platform).
-   if err = df.Close(); err != nil {
-       return errors.Wrap(err, "close temporary dir")
-   }
-   df = nil
+       // Create an empty tombstones file.
+       if _, err := tombstones.WriteFile(c.logger, ob.tmpDir, tombstones.NewMemTombstones()); err != nil {
+           return errors.Wrap(err, "write new tombstones file")
+       }
+
-   // Block successfully written, make it visible in destination dir by moving it from tmp one.
-   if err := fileutil.Replace(tmp, dir); err != nil {
-       return errors.Wrap(err, "rename block dir")
+       df, err := fileutil.OpenDir(ob.tmpDir)
+       if err != nil {
+           return errors.Wrap(err, "open temporary block dir")
+       }
+       defer func() {
+           if df != nil {
+               _ = df.Close()
+           }
+       }()
+
+       if err := df.Sync(); err != nil {
+           return errors.Wrap(err, "sync temporary dir file")
+       }
+
+       // Close temp dir before rename block dir (for windows platform).
+       if err = df.Close(); err != nil {
+           return errors.Wrap(err, "close temporary dir")
+       }
+       df = nil
+
+       // Block successfully written, make it visible in destination dir by moving it from tmp one.
+       if err := fileutil.Replace(ob.tmpDir, ob.blockDir); err != nil {
+           return errors.Wrap(err, "rename block dir")
+       }
    }
+   }

    return nil
}

-// populateBlock fills the index and chunk writers with new data gathered as the union
-// of the provided blocks. It returns meta information for the new block.
+// populateBlock fills the index and chunk writers of output blocks with new data gathered as the union
+// of the provided blocks.
// It expects sorted blocks input by mint.
-func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta, indexw IndexWriter, chunkw ChunkWriter) (err error) {
+// If there is more than 1 output block, each output block will only contain series that hash into its shard
+// (based on total number of output blocks).
+func (c *LeveledCompactor) populateBlock(blocks []BlockReader, minT, maxT int64, outBlocks []shardedBlock) (err error) {
    if len(blocks) == 0 {
-       return errors.New("cannot populate block from no readers")
+       return errors.New("cannot populate block(s) from no readers")
    }

    var (
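
Each non-empty output block above is finalised the way a single-block compaction always was: meta.json and an empty tombstones file are written into the temporary directory, the directory is fsynced and closed, and only then is it renamed to its final ULID name, so readers never observe a partially written block. Below is a condensed sketch of that publish step built from the exported helpers; the function name and paths are ours, and it omits meta.json, which the real code writes through the package-internal writeMetaFile.

package main

import (
    "os"
    "path/filepath"

    "github.com/go-kit/log"
    "github.com/prometheus/prometheus/tsdb/fileutil"
    "github.com/prometheus/prometheus/tsdb/tombstones"
)

// publishBlock sketches the finalisation of one output block: write an empty
// tombstones file into tmpDir, fsync and close the directory, then atomically
// rename it to its final name.
func publishBlock(logger log.Logger, tmpDir, blockDir string) error {
    if _, err := tombstones.WriteFile(logger, tmpDir, tombstones.NewMemTombstones()); err != nil {
        return err
    }

    df, err := fileutil.OpenDir(tmpDir)
    if err != nil {
        return err
    }
    if err := df.Sync(); err != nil {
        df.Close()
        return err
    }
    // Close the directory handle before the rename so this also works on Windows.
    if err := df.Close(); err != nil {
        return err
    }

    return fileutil.Replace(tmpDir, blockDir)
}

func main() {
    // Hypothetical paths: the ULID-style directory name is a placeholder.
    tmp := filepath.Join("data", "01EXAMPLEULID") + ".tmp-for-creation"
    if err := os.MkdirAll(tmp, 0777); err != nil {
        panic(err)
    }
    if err := publishBlock(log.NewNopLogger(), tmp, filepath.Join("data", "01EXAMPLEULID")); err != nil {
        panic(err)
    }
}

The rename is the publish point: until fileutil.Replace runs, the block exists only under the temporary suffix, which is also what the deferred cleanup at the top of write removes after a failed compaction.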

@@ -694,7 +762,7 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta,
        if i > 0 && b.Meta().MinTime < globalMaxt {
            c.metrics.overlappingBlocks.Inc()
            overlapping = true
-           level.Info(c.logger).Log("msg", "Found overlapping blocks during compaction", "ulid", meta.ULID)
+           level.Info(c.logger).Log("msg", "Found overlapping blocks during compaction" /* "ulid", meta.ULID */)
        }
        if b.Meta().MaxTime > globalMaxt {
            globalMaxt = b.Meta().MaxTime

@@ -726,7 +794,7 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta,
        }
        all = indexr.SortedPostings(all)
        // Blocks meta is half open: [min, max), so subtract 1 to ensure we don't hold samples with exact meta.MaxTime timestamp.
-       sets = append(sets, newBlockChunkSeriesSet(indexr, chunkr, tombsr, all, meta.MinTime, meta.MaxTime-1))
+       sets = append(sets, newBlockChunkSeriesSet(indexr, chunkr, tombsr, all, minT, maxT-1))
        syms := indexr.Symbols()
        if i == 0 {
            symbols = syms

@@ -736,8 +804,10 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta,
    }

    for symbols.Next() {
-       if err := indexw.AddSymbol(symbols.At()); err != nil {
-           return errors.Wrap(err, "add symbol")
+       for ix := range outBlocks {
+           if err := outBlocks[ix].indexw.AddSymbol(symbols.At()); err != nil {
+               return errors.Wrap(err, "add symbol")
+           }
        }
    }
    if symbols.Err() != nil {

@@ -764,6 +834,7 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta,
        default:
        }
        s := set.At()

        chksIter := s.Iterator()
        chks = chks[:0]
        for chksIter.Next() {

@@ -780,17 +851,22 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta,
            continue
        }

-       if err := chunkw.WriteChunks(chks...); err != nil {
+       ob := outBlocks[0]
+       if len(outBlocks) > 1 {
+           ob = outBlocks[s.Labels().Hash()%uint64(len(outBlocks))]
+       }
+
+       if err := ob.chunkw.WriteChunks(chks...); err != nil {
            return errors.Wrap(err, "write chunks")
        }
-       if err := indexw.AddSeries(ref, s.Labels(), chks...); err != nil {
+       if err := ob.indexw.AddSeries(ref, s.Labels(), chks...); err != nil {
            return errors.Wrap(err, "add series")
        }

-       meta.Stats.NumChunks += uint64(len(chks))
-       meta.Stats.NumSeries++
+       ob.meta.Stats.NumChunks += uint64(len(chks))
+       ob.meta.Stats.NumSeries++
        for _, chk := range chks {
-           meta.Stats.NumSamples += uint64(chk.Chunk.NumSamples())
+           ob.meta.Stats.NumSamples += uint64(chk.Chunk.NumSamples())
        }

        for _, chk := range chks {
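
The three lines choosing ob above are the entire sharding rule: a series is routed to the output block at index Labels().Hash() modulo len(outBlocks), which is exactly what the new test below re-checks after reopening each block. A standalone sketch of the rule follows; the helper name is ours, and the labels import path is assumed to be the pkg/labels location used around this time (newer trees moved it to model/labels).

package main

import (
    "fmt"

    "github.com/prometheus/prometheus/pkg/labels"
)

// shardIndex mirrors the expression used by populateBlock: the owning output
// block is the label-set hash modulo the number of output blocks.
func shardIndex(lset labels.Labels, shardCount uint64) uint64 {
    return lset.Hash() % shardCount
}

func main() {
    lset := labels.FromStrings("__name__", "up", "job", "node", "instance", "localhost:9100")
    fmt.Println(shardIndex(lset, 13))
}

Because the hash depends only on the label set, a given series always lands in the same shard for a fixed shard count, no matter which input blocks it came from.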

tsdb/compact_test.go

@@ -25,6 +25,7 @@ import (
    "time"

    "github.com/go-kit/log"
    "github.com/oklog/ulid"
    "github.com/pkg/errors"
    prom_testutil "github.com/prometheus/client_golang/prometheus/testutil"
    "github.com/stretchr/testify/require"

@@ -33,6 +34,7 @@ import (
    "github.com/prometheus/prometheus/tsdb/chunkenc"
    "github.com/prometheus/prometheus/tsdb/chunks"
    "github.com/prometheus/prometheus/tsdb/fileutil"
    "github.com/prometheus/prometheus/tsdb/index"
    "github.com/prometheus/prometheus/tsdb/tombstones"
)

@@ -440,7 +442,7 @@ func TestCompactionFailWillCleanUpTempDir(t *testing.T) {
        require.NoError(t, os.RemoveAll(tmpdir))
    }()

-   require.Error(t, compactor.write(tmpdir, &BlockMeta{}, erringBReader{}))
+   require.Error(t, compactor.write(tmpdir, []shardedBlock{{meta: &BlockMeta{}}}, erringBReader{}))
    _, err = os.Stat(filepath.Join(tmpdir, BlockMeta{}.ULID.String()) + tmpForCreationBlockDirSuffix)
    require.True(t, os.IsNotExist(err), "directory is not cleaned up")
}

@@ -484,6 +486,91 @@ func samplesForRange(minTime, maxTime int64, maxSamplesPerChunk int) (ret [][]sa
    return ret
}

+func TestCompaction_CompactWithSplitting(t *testing.T) {
+   seriesCounts := []int{10, 1234}
+   shardCounts := []uint64{1, 13}
+
+   for _, series := range seriesCounts {
+       dir, err := ioutil.TempDir("", "compact")
+       require.NoError(t, err)
+       defer func() {
+           require.NoError(t, os.RemoveAll(dir))
+       }()
+
+       ranges := [][2]int64{{0, 5000}, {3000, 8000}, {6000, 11000}, {9000, 14000}}
+
+       // Generate blocks.
+       var blockDirs []string
+       var openBlocks []*Block
+
+       for _, r := range ranges {
+           block, err := OpenBlock(nil, createBlock(t, dir, genSeries(series, 10, r[0], r[1])), nil)
+           require.NoError(t, err)
+           defer func() {
+               require.NoError(t, block.Close())
+           }()
+
+           openBlocks = append(openBlocks, block)
+           blockDirs = append(blockDirs, block.Dir())
+       }
+
+       for _, shardCount := range shardCounts {
+           t.Run(fmt.Sprintf("series=%d, shards=%d", series, shardCount), func(t *testing.T) {
+               c, err := NewLeveledCompactor(context.Background(), nil, log.NewNopLogger(), []int64{0}, nil, nil)
+               require.NoError(t, err)
+
+               blockIDs, err := c.CompactWithSplitting(dir, blockDirs, openBlocks, shardCount)
+
+               require.NoError(t, err)
+               require.Equal(t, shardCount, uint64(len(blockIDs)))
+
+               // Verify resulting blocks. We will iterate over all series in all blocks, and check two things:
+               // 1) Make sure that each series in the block belongs to the block (based on sharding).
+               // 2) Verify that total number of series over all blocks is correct.
+               totalSeries := uint64(0)
+
+               for shardIndex, blockID := range blockIDs {
+                   // Some blocks may be empty, they will have zero block ID.
+                   if blockID == (ulid.ULID{}) {
+                       continue
+                   }
+
+                   block, err := OpenBlock(log.NewNopLogger(), filepath.Join(dir, blockID.String()), nil)
+                   require.NoError(t, err)
+
+                   defer func() {
+                       require.NoError(t, block.Close())
+                   }()
+
+                   totalSeries += block.Meta().Stats.NumSeries
+
+                   idxr, err := block.Index()
+                   require.NoError(t, err)
+
+                   defer func() {
+                       require.NoError(t, idxr.Close())
+                   }()
+
+                   k, v := index.AllPostingsKey()
+                   p, err := idxr.Postings(k, v)
+                   require.NoError(t, err)
+
+                   var lbls labels.Labels
+                   for p.Next() {
+                       ref := p.At()
+                       require.NoError(t, idxr.Series(ref, &lbls, nil))
+
+                       require.Equal(t, uint64(shardIndex), lbls.Hash()%shardCount)
+                   }
+                   require.NoError(t, p.Err())
+               }
+
+               require.Equal(t, uint64(series), totalSeries)
+           })
+       }
+   }
+}
+
func TestCompaction_populateBlock(t *testing.T) {
    for _, tc := range []struct {
        title string

@@ -496,7 +583,7 @@ func TestCompaction_populateBlock(t *testing.T) {
        {
            title:              "Populate block from empty input should return error.",
            inputSeriesSamples: [][]seriesSamples{},
-           expErr:             errors.New("cannot populate block from no readers"),
+           expErr:             errors.New("cannot populate block(s) from no readers"),
        },
        {
            // Populate from single block without chunks. We expect these kind of series being ignored.

@@ -952,7 +1039,8 @@ func TestCompaction_populateBlock(t *testing.T) {
            }

            iw := &mockIndexWriter{}
-           err = c.populateBlock(blocks, meta, iw, nopChunkWriter{})
+           ob := shardedBlock{meta: meta, indexw: iw, chunkw: nopChunkWriter{}}
+           err = c.populateBlock(blocks, meta.MinTime, meta.MaxTime, []shardedBlock{ob})
            if tc.expErr != nil {
                require.Error(t, err)
                require.Equal(t, tc.expErr.Error(), err.Error())