Mirror of https://github.com/prometheus/prometheus.git (synced 2025-01-12 06:17:27 -08:00)

Merge pull request #13 from grafana/splitting-compactor

Compactor with support for splitting input blocks into multiple output blocks

Commit 04e7926b03

Changed files: tsdb/compact.go (199 changed lines), tsdb/compact_test.go
tsdb/compact.go

@@ -388,9 +388,40 @@ func CompactBlockMetas(uid ulid.ULID, blocks ...*BlockMeta) *BlockMeta {
 	return res
 }
 
+// CompactWithSplitting merges and splits the input blocks into shardCount number of output blocks,
+// and returns slice of block IDs. Position of returned block ID in the result slice corresponds to the shard index.
+// If given output block has no series, corresponding block ID will be zero ULID value.
+func (c *LeveledCompactor) CompactWithSplitting(dest string, dirs []string, open []*Block, shardCount uint64) (result []ulid.ULID, _ error) {
+	return c.compact(dest, dirs, open, shardCount)
+}
+
 // Compact creates a new block in the compactor's directory from the blocks in the
 // provided directories.
 func (c *LeveledCompactor) Compact(dest string, dirs []string, open []*Block) (uid ulid.ULID, err error) {
+	ulids, err := c.compact(dest, dirs, open, 1)
+	if err != nil {
+		return ulid.ULID{}, err
+	}
+	return ulids[0], nil
+}
+
+// shardedBlock describes single *output* block during compaction. This struct is passed between
+// compaction methods to wrap output block details, index and chunk writer together.
+// Shard index is determined by the position of this structure in the slice of output blocks.
+type shardedBlock struct {
+	meta *BlockMeta
+
+	blockDir string
+	tmpDir   string // Temp directory used when block is being built (= blockDir + temp suffix)
+
+	chunkw ChunkWriter
+	indexw IndexWriter
+}
+
+func (c *LeveledCompactor) compact(dest string, dirs []string, open []*Block, shardCount uint64) (_ []ulid.ULID, err error) {
+	if shardCount == 0 {
+		shardCount = 1
+	}
+
 	var (
 		blocks []BlockReader
 		bs     []*Block
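For context, here is a caller-side sketch of the new entry point. This is not part of the patch: the paths are hypothetical, and the compactor is constructed the same way as in the tests further down.

package main

import (
	"context"
	"fmt"
	"os"

	"github.com/go-kit/log"
	"github.com/oklog/ulid"
	"github.com/prometheus/prometheus/tsdb"
)

func main() {
	// Hypothetical destination and source block directories.
	dest := "/data"
	srcDirs := []string{"/data/block-a", "/data/block-b", "/data/block-c"}

	c, err := tsdb.NewLeveledCompactor(context.Background(), nil, log.NewNopLogger(), []int64{0}, nil, nil)
	if err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}

	// Merge the sources and split the result across 4 output blocks (shards).
	ids, err := c.CompactWithSplitting(dest, srcDirs, nil, 4)
	if err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}

	for shard, id := range ids {
		if id == (ulid.ULID{}) {
			fmt.Printf("shard %d: no series, no block written\n", shard)
			continue
		}
		fmt.Printf("shard %d: wrote block %s\n", shard, id)
	}
}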
@@ -402,7 +433,7 @@ func (c *LeveledCompactor) Compact(dest string, dirs []string, open []*Block) (uid ulid.ULID, err error) {
 	for _, d := range dirs {
 		meta, _, err := readMetaFile(d)
 		if err != nil {
-			return uid, err
+			return nil, err
 		}
 
 		var b *Block
@@ -420,7 +451,7 @@ func (c *LeveledCompactor) Compact(dest string, dirs []string, open []*Block) (uid ulid.ULID, err error) {
 			var err error
 			b, err = OpenBlock(c.logger, d, c.chunkPool)
 			if err != nil {
-				return uid, err
+				return nil, err
 			}
 			defer b.Close()
 		}
@@ -431,12 +462,47 @@ func (c *LeveledCompactor) Compact(dest string, dirs []string, open []*Block) (uid ulid.ULID, err error) {
 		uids = append(uids, meta.ULID.String())
 	}
 
-	uid = ulid.MustNew(ulid.Now(), rand.Reader)
-
-	meta := CompactBlockMetas(uid, metas...)
-	err = c.write(dest, meta, blocks...)
+	outBlocks := make([]shardedBlock, shardCount)
+	outBlocksTime := ulid.Now() // Make all out blocks share the same timestamp in the ULID.
+	for ix := range outBlocks {
+		outBlocks[ix] = shardedBlock{meta: CompactBlockMetas(ulid.MustNew(outBlocksTime, rand.Reader), metas...)}
+	}
+
+	err = c.write(dest, outBlocks, blocks...)
 	if err == nil {
-		if meta.Stats.NumSamples == 0 {
+		ulids := make([]ulid.ULID, len(outBlocks))
+		allOutputBlocksAreEmpty := true
+
+		for ix := range outBlocks {
+			meta := outBlocks[ix].meta
+
+			if meta.Stats.NumSamples == 0 {
+				level.Info(c.logger).Log(
+					"msg", "compact blocks resulted in empty block",
+					"count", len(blocks),
+					"sources", fmt.Sprintf("%v", uids),
+					"duration", time.Since(start),
+					"shard", fmt.Sprintf("%d_of_%d", ix+1, shardCount),
+				)
+			} else {
+				allOutputBlocksAreEmpty = false
+				ulids[ix] = outBlocks[ix].meta.ULID
+
+				level.Info(c.logger).Log(
+					"msg", "compact blocks",
+					"count", len(blocks),
+					"mint", meta.MinTime,
+					"maxt", meta.MaxTime,
+					"ulid", meta.ULID,
+					"sources", fmt.Sprintf("%v", uids),
+					"duration", time.Since(start),
+					"shard", fmt.Sprintf("%d_of_%d", ix+1, shardCount),
+				)
+			}
+		}
+
+		if allOutputBlocksAreEmpty {
+			// Mark source blocks as deletable.
 			for _, b := range bs {
 				b.meta.Compaction.Deletable = true
 				n, err := writeMetaFile(c.logger, b.dir, &b.meta)
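One detail worth noting in this hunk: every output block's ULID is minted with the single outBlocksTime timestamp, so the shard IDs sort together while the random component keeps them unique. A tiny standalone illustration (not part of the patch):

package main

import (
	"crypto/rand"
	"fmt"

	"github.com/oklog/ulid"
)

func main() {
	now := ulid.Now() // one timestamp shared by all output blocks
	a := ulid.MustNew(now, rand.Reader)
	b := ulid.MustNew(now, rand.Reader)
	// Same time component, different random component.
	fmt.Println(a, b, a.Time() == b.Time())
}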
@@ -448,25 +514,9 @@ func (c *LeveledCompactor) Compact(dest string, dirs []string, open []*Block) (uid ulid.ULID, err error) {
 				}
 				b.numBytesMeta = n
 			}
-		uid = ulid.ULID{}
-		level.Info(c.logger).Log(
-			"msg", "compact blocks resulted in empty block",
-			"count", len(blocks),
-			"sources", fmt.Sprintf("%v", uids),
-			"duration", time.Since(start),
-		)
-	} else {
-		level.Info(c.logger).Log(
-			"msg", "compact blocks",
-			"count", len(blocks),
-			"mint", meta.MinTime,
-			"maxt", meta.MaxTime,
-			"ulid", meta.ULID,
-			"sources", fmt.Sprintf("%v", uids),
-			"duration", time.Since(start),
-		)
-	}
-	return uid, nil
+		}
+
+		return ulids, nil
 	}
 
 	errs := tsdb_errors.NewMulti(err)
@@ -478,7 +528,7 @@ func (c *LeveledCompactor) Compact(dest string, dirs []string, open []*Block) (uid ulid.ULID, err error) {
 		}
 	}
 
-	return uid, errs.Err()
+	return nil, errs.Err()
 }
 
 func (c *LeveledCompactor) Write(dest string, b BlockReader, mint, maxt int64, parent *BlockMeta) (ulid.ULID, error) {
@@ -500,7 +550,7 @@ func (c *LeveledCompactor) Write(dest string, b BlockReader, mint, maxt int64, parent *BlockMeta) (ulid.ULID, error) {
 		}
 	}
 
-	err := c.write(dest, meta, b)
+	err := c.write(dest, []shardedBlock{{meta: meta}}, b)
 	if err != nil {
 		return uid, err
 	}
@@ -544,22 +594,42 @@ func (w *instrumentedChunkWriter) WriteChunks(chunks ...chunks.Meta) error {
 	return w.ChunkWriter.WriteChunks(chunks...)
 }
 
-// write creates a new block that is the union of the provided blocks into dir.
-func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockReader) (err error) {
-	dir := filepath.Join(dest, meta.ULID.String())
-	tmp := dir + tmpForCreationBlockDirSuffix
+// write creates new output blocks that are the union of the provided blocks into dir.
+func (c *LeveledCompactor) write(dest string, outBlocks []shardedBlock, blocks ...BlockReader) (err error) {
 	var closers []io.Closer
+
 	defer func(t time.Time) {
 		err = tsdb_errors.NewMulti(err, tsdb_errors.CloseAll(closers)).Err()
-		// RemoveAll returns no error when tmp doesn't exist so it is safe to always run it.
-		if err := os.RemoveAll(tmp); err != nil {
-			level.Error(c.logger).Log("msg", "removed tmp folder after failed compaction", "err", err.Error())
+
+		for _, ob := range outBlocks {
+			if ob.tmpDir != "" {
+				// RemoveAll returns no error when tmp doesn't exist so it is safe to always run it.
+				if removeErr := os.RemoveAll(ob.tmpDir); removeErr != nil {
+					level.Error(c.logger).Log("msg", "Failed to remove temp folder after failed compaction", "dir", ob.tmpDir, "err", removeErr.Error())
+				}
+			}
+
+			// If there was any error, and we have multiple output blocks, some blocks may have been generated, or at
+			// least have existing blockDir. In such case, we want to remove them.
+			// BlockDir may also not be set yet, if preparation for some previous blocks have failed.
+			if err != nil && ob.blockDir != "" {
+				// RemoveAll returns no error when tmp doesn't exist so it is safe to always run it.
+				if removeErr := os.RemoveAll(ob.blockDir); removeErr != nil {
+					level.Error(c.logger).Log("msg", "Failed to remove block folder after failed compaction", "dir", ob.blockDir, "err", removeErr.Error())
+				}
+			}
 		}
 		c.metrics.ran.Inc()
 		c.metrics.duration.Observe(time.Since(t).Seconds())
 	}(time.Now())
 
+	for ix := range outBlocks {
+		dir := filepath.Join(dest, outBlocks[ix].meta.ULID.String())
+		tmp := dir + tmpForCreationBlockDirSuffix
+
+		outBlocks[ix].blockDir = dir
+		outBlocks[ix].tmpDir = tmp
+
 		if err = os.RemoveAll(tmp); err != nil {
 			return err
 		}
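The deferred cleanup above hinges on the named return value err: the closure runs after write returns, so it can inspect the final error and decide whether already-finished block directories must be removed as well. A minimal standalone sketch of the same idiom (function and paths are illustrative, not from the patch):

package main

import (
	"errors"
	"fmt"
	"os"
)

// buildBlocks mimics the cleanup contract of write(): temp dirs are always
// removed; final dirs are removed only if the function returns an error.
func buildBlocks(tmpDirs, finalDirs []string) (err error) {
	defer func() {
		for _, d := range tmpDirs {
			// RemoveAll returns nil for missing paths, so this is always safe.
			os.RemoveAll(d)
		}
		if err != nil { // the named return value is visible inside the deferred closure
			for _, d := range finalDirs {
				os.RemoveAll(d)
			}
		}
	}()

	// Any failure on this path also triggers removal of finalDirs.
	return errors.New("simulated compaction failure")
}

func main() {
	fmt.Println(buildBlocks([]string{"/tmp/b1.tmp-for-creation"}, []string{"/tmp/b1"}))
}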
@@ -571,14 +641,15 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockReader) (err error) {
 		// Populate chunk and index files into temporary directory with
 		// data of all blocks.
 		var chunkw ChunkWriter
+
 		chunkw, err = chunks.NewWriterWithSegSize(chunkDir(tmp), c.maxBlockChunkSegmentSize)
 		if err != nil {
 			return errors.Wrap(err, "open chunk writer")
 		}
 		closers = append(closers, chunkw)
 		// Record written chunk sizes on level 1 compactions.
-		if meta.Compaction.Level == 1 {
+		if outBlocks[ix].meta.Compaction.Level == 1 {
 			chunkw = &instrumentedChunkWriter{
 				ChunkWriter: chunkw,
 				size:        c.metrics.chunkSize,
@@ -587,13 +658,19 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockReader) (err error) {
 			}
 		}
 
+		outBlocks[ix].chunkw = chunkw
+
 		indexw, err := index.NewWriter(c.ctx, filepath.Join(tmp, indexFilename))
 		if err != nil {
 			return errors.Wrap(err, "open index writer")
 		}
 		closers = append(closers, indexw)
 
-	if err := c.populateBlock(blocks, meta, indexw, chunkw); err != nil {
+		outBlocks[ix].indexw = indexw
+	}
+
+	// We use MinTime and MaxTime from first output block, because ALL output blocks have the same min/max times set.
+	if err := c.populateBlock(blocks, outBlocks[0].meta.MinTime, outBlocks[0].meta.MaxTime, outBlocks); err != nil {
 		return errors.Wrap(err, "populate block")
 	}
@@ -616,21 +693,22 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockReader) (err error) {
 		return errs.Err()
 	}
 
-	// Populated block is empty, so exit early.
-	if meta.Stats.NumSamples == 0 {
-		return nil
-	}
+	for _, ob := range outBlocks {
+		// Populated block is empty, don't write meta file for it.
+		if ob.meta.Stats.NumSamples == 0 {
+			continue
+		}
 
-	if _, err = writeMetaFile(c.logger, tmp, meta); err != nil {
+		if _, err = writeMetaFile(c.logger, ob.tmpDir, ob.meta); err != nil {
 			return errors.Wrap(err, "write merged meta")
 		}
 
 		// Create an empty tombstones file.
-	if _, err := tombstones.WriteFile(c.logger, tmp, tombstones.NewMemTombstones()); err != nil {
+		if _, err := tombstones.WriteFile(c.logger, ob.tmpDir, tombstones.NewMemTombstones()); err != nil {
 			return errors.Wrap(err, "write new tombstones file")
 		}
 
-	df, err := fileutil.OpenDir(tmp)
+		df, err := fileutil.OpenDir(ob.tmpDir)
 		if err != nil {
 			return errors.Wrap(err, "open temporary block dir")
 		}
@@ -651,19 +729,22 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockReader) (err error) {
 		df = nil
 
 		// Block successfully written, make it visible in destination dir by moving it from tmp one.
-	if err := fileutil.Replace(tmp, dir); err != nil {
+		if err := fileutil.Replace(ob.tmpDir, ob.blockDir); err != nil {
 			return errors.Wrap(err, "rename block dir")
 		}
+	}
 
 	return nil
 }
 
-// populateBlock fills the index and chunk writers with new data gathered as the union
-// of the provided blocks. It returns meta information for the new block.
+// populateBlock fills the index and chunk writers of output blocks with new data gathered as the union
+// of the provided blocks.
 // It expects sorted blocks input by mint.
-func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta, indexw IndexWriter, chunkw ChunkWriter) (err error) {
+// If there is more than 1 output block, each output block will only contain series that hash into its shard
+// (based on total number of output blocks).
+func (c *LeveledCompactor) populateBlock(blocks []BlockReader, minT, maxT int64, outBlocks []shardedBlock) (err error) {
 	if len(blocks) == 0 {
-		return errors.New("cannot populate block from no readers")
+		return errors.New("cannot populate block(s) from no readers")
 	}
 
 	var (
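The sharding rule named in that comment is a plain modulo over the stable series hash, so a given series always lands in the same shard for a given shard count. A standalone sketch of the rule (label sets are made up; the labels import path matches this vintage of the code base):

package main

import (
	"fmt"

	"github.com/prometheus/prometheus/pkg/labels"
)

func main() {
	const shardCount = 4

	series := []labels.Labels{
		labels.FromStrings("__name__", "http_requests_total", "job", "api"),
		labels.FromStrings("__name__", "http_requests_total", "job", "web"),
		labels.FromStrings("__name__", "node_cpu_seconds_total", "cpu", "0"),
	}

	for _, lbls := range series {
		// Same rule as populateBlock: shard index = label-set hash mod shard count.
		shard := lbls.Hash() % shardCount
		fmt.Printf("%v -> shard %d of %d\n", lbls, shard, shardCount)
	}
}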
@@ -694,7 +775,7 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta, indexw IndexWriter, chunkw ChunkWriter) (err error) {
 		if i > 0 && b.Meta().MinTime < globalMaxt {
 			c.metrics.overlappingBlocks.Inc()
 			overlapping = true
-			level.Info(c.logger).Log("msg", "Found overlapping blocks during compaction", "ulid", meta.ULID)
+			level.Info(c.logger).Log("msg", "Found overlapping blocks during compaction")
 		}
 		if b.Meta().MaxTime > globalMaxt {
 			globalMaxt = b.Meta().MaxTime
@@ -726,7 +807,7 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta, indexw IndexWriter, chunkw ChunkWriter) (err error) {
 		}
 		all = indexr.SortedPostings(all)
 		// Blocks meta is half open: [min, max), so subtract 1 to ensure we don't hold samples with exact meta.MaxTime timestamp.
-		sets = append(sets, newBlockChunkSeriesSet(indexr, chunkr, tombsr, all, meta.MinTime, meta.MaxTime-1))
+		sets = append(sets, newBlockChunkSeriesSet(indexr, chunkr, tombsr, all, minT, maxT-1))
 		syms := indexr.Symbols()
 		if i == 0 {
 			symbols = syms
@@ -736,16 +817,18 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta, indexw IndexWriter, chunkw ChunkWriter) (err error) {
 	}
 
 	for symbols.Next() {
-		if err := indexw.AddSymbol(symbols.At()); err != nil {
-			return errors.Wrap(err, "add symbol")
+		for _, ob := range outBlocks {
+			if err := ob.indexw.AddSymbol(symbols.At()); err != nil {
+				return errors.Wrap(err, "add symbol")
+			}
 		}
 	}
 	if symbols.Err() != nil {
 		return errors.Wrap(symbols.Err(), "next symbol")
 	}
 
 	var (
-		ref  = uint64(0)
+		refs = make([]uint64, len(outBlocks))
 		chks []chunks.Meta
 	)
 
@@ -764,6 +847,7 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta, indexw IndexWriter, chunkw ChunkWriter) (err error) {
 		default:
 		}
 		s := set.At()
+
 		chksIter := s.Iterator()
 		chks = chks[:0]
 		for chksIter.Next() {
@@ -780,17 +864,22 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta, indexw IndexWriter, chunkw ChunkWriter) (err error) {
 			continue
 		}
 
-		if err := chunkw.WriteChunks(chks...); err != nil {
+		obIx := uint64(0)
+		if len(outBlocks) > 1 {
+			obIx = s.Labels().Hash() % uint64(len(outBlocks))
+		}
+
+		if err := outBlocks[obIx].chunkw.WriteChunks(chks...); err != nil {
 			return errors.Wrap(err, "write chunks")
 		}
-		if err := indexw.AddSeries(ref, s.Labels(), chks...); err != nil {
+		if err := outBlocks[obIx].indexw.AddSeries(refs[obIx], s.Labels(), chks...); err != nil {
 			return errors.Wrap(err, "add series")
 		}
 
-		meta.Stats.NumChunks += uint64(len(chks))
-		meta.Stats.NumSeries++
+		outBlocks[obIx].meta.Stats.NumChunks += uint64(len(chks))
+		outBlocks[obIx].meta.Stats.NumSeries++
 		for _, chk := range chks {
-			meta.Stats.NumSamples += uint64(chk.Chunk.NumSamples())
+			outBlocks[obIx].meta.Stats.NumSamples += uint64(chk.Chunk.NumSamples())
 		}
 
 		for _, chk := range chks {
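Note why the single ref counter becomes a refs slice in this hunk: series references are local to one index, and each output block now has its own IndexWriter, so every shard needs an independent counter that populateBlock advances only when it writes a series into that shard. A toy model of the bookkeeping (types are made up, not the real IndexWriter):

package main

import "fmt"

// toyIndex stands in for one output block's index writer: references are
// assigned sequentially per index, starting at zero in every shard.
type toyIndex struct{ entries []string }

func (ix *toyIndex) addSeries(ref uint64, name string) {
	ix.entries = append(ix.entries, fmt.Sprintf("ref=%d series=%s", ref, name))
}

func main() {
	const shards = 2
	out := make([]toyIndex, shards)
	refs := make([]uint64, shards) // mirrors the refs slice in populateBlock

	for i, name := range []string{"a", "b", "c", "d", "e"} {
		obIx := uint64(i) % shards // stand-in for labels-hash sharding
		out[obIx].addSeries(refs[obIx], name)
		refs[obIx]++
	}

	for i, ix := range out {
		fmt.Printf("shard %d: %v\n", i, ix.entries)
	}
}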
@@ -798,7 +887,7 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta, indexw IndexWriter, chunkw ChunkWriter) (err error) {
 				return errors.Wrap(err, "put chunk")
 			}
 		}
-		ref++
+		refs[obIx]++
 	}
 	if set.Err() != nil {
 		return errors.Wrap(set.Err(), "iterate compaction set")
tsdb/compact_test.go

@@ -15,6 +15,7 @@ package tsdb
 
 import (
 	"context"
+	"crypto/rand"
 	"fmt"
 	"io/ioutil"
 	"math"
@@ -25,6 +26,7 @@ import (
 	"time"
 
 	"github.com/go-kit/log"
+	"github.com/oklog/ulid"
 	"github.com/pkg/errors"
 	prom_testutil "github.com/prometheus/client_golang/prometheus/testutil"
 	"github.com/stretchr/testify/require"
@@ -33,6 +35,7 @@ import (
 	"github.com/prometheus/prometheus/tsdb/chunkenc"
 	"github.com/prometheus/prometheus/tsdb/chunks"
 	"github.com/prometheus/prometheus/tsdb/fileutil"
+	"github.com/prometheus/prometheus/tsdb/index"
 	"github.com/prometheus/prometheus/tsdb/tombstones"
 )
@@ -440,9 +443,24 @@ func TestCompactionFailWillCleanUpTempDir(t *testing.T) {
 		require.NoError(t, os.RemoveAll(tmpdir))
 	}()
 
-	require.Error(t, compactor.write(tmpdir, &BlockMeta{}, erringBReader{}))
-	_, err = os.Stat(filepath.Join(tmpdir, BlockMeta{}.ULID.String()) + tmpForCreationBlockDirSuffix)
-	require.True(t, os.IsNotExist(err), "directory is not cleaned up")
+	shardedBlocks := []shardedBlock{
+		{meta: &BlockMeta{ULID: ulid.MustNew(ulid.Now(), rand.Reader)}},
+		{meta: &BlockMeta{ULID: ulid.MustNew(ulid.Now(), rand.Reader)}},
+		{meta: &BlockMeta{ULID: ulid.MustNew(ulid.Now(), rand.Reader)}},
+	}
+
+	require.Error(t, compactor.write(tmpdir, shardedBlocks, erringBReader{}))
+
+	// We rely on the fact that blockDir and tmpDir will be updated by compactor.write.
+	for _, b := range shardedBlocks {
+		require.NotEmpty(t, b.tmpDir)
+		_, err = os.Stat(b.tmpDir)
+		require.True(t, os.IsNotExist(err), "tmp directory is not cleaned up")
+
+		require.NotEmpty(t, b.blockDir)
+		_, err = os.Stat(b.blockDir)
+		require.True(t, os.IsNotExist(err), "block directory is not cleaned up")
+	}
 }
 
 func metaRange(name string, mint, maxt int64, stats *BlockStats) dirMeta {
@@ -484,6 +502,163 @@ func samplesForRange(minTime, maxTime int64, maxSamplesPerChunk int) (ret [][]sample) {
 	return ret
 }
 
+func TestCompaction_CompactWithSplitting(t *testing.T) {
+	seriesCounts := []int{10, 1234}
+	shardCounts := []uint64{1, 13}
+
+	for _, series := range seriesCounts {
+		dir, err := ioutil.TempDir("", "compact")
+		require.NoError(t, err)
+		defer func() {
+			require.NoError(t, os.RemoveAll(dir))
+		}()
+
+		ranges := [][2]int64{{0, 5000}, {3000, 8000}, {6000, 11000}, {9000, 14000}}
+
+		// Generate blocks.
+		var blockDirs []string
+		var openBlocks []*Block
+
+		for _, r := range ranges {
+			block, err := OpenBlock(nil, createBlock(t, dir, genSeries(series, 10, r[0], r[1])), nil)
+			require.NoError(t, err)
+			defer func() {
+				require.NoError(t, block.Close())
+			}()
+
+			openBlocks = append(openBlocks, block)
+			blockDirs = append(blockDirs, block.Dir())
+		}
+
+		for _, shardCount := range shardCounts {
+			t.Run(fmt.Sprintf("series=%d, shards=%d", series, shardCount), func(t *testing.T) {
+				c, err := NewLeveledCompactor(context.Background(), nil, log.NewNopLogger(), []int64{0}, nil, nil)
+				require.NoError(t, err)
+
+				blockIDs, err := c.CompactWithSplitting(dir, blockDirs, openBlocks, shardCount)
+
+				require.NoError(t, err)
+				require.Equal(t, shardCount, uint64(len(blockIDs)))
+
+				// Verify resulting blocks. We will iterate over all series in all blocks, and check two things:
+				// 1) Make sure that each series in the block belongs to the block (based on sharding).
+				// 2) Verify that total number of series over all blocks is correct.
+				totalSeries := uint64(0)
+
+				ts := uint64(0)
+				for shardIndex, blockID := range blockIDs {
+					// Some blocks may be empty, they will have zero block ID.
+					if blockID == (ulid.ULID{}) {
+						continue
+					}
+
+					// All blocks have the same timestamp.
+					if ts == 0 {
+						ts = blockID.Time()
+					} else {
+						require.Equal(t, ts, blockID.Time())
+					}
+
+					block, err := OpenBlock(log.NewNopLogger(), filepath.Join(dir, blockID.String()), nil)
+					require.NoError(t, err)
+
+					defer func() {
+						require.NoError(t, block.Close())
+					}()
+
+					totalSeries += block.Meta().Stats.NumSeries
+
+					idxr, err := block.Index()
+					require.NoError(t, err)
+
+					defer func() {
+						require.NoError(t, idxr.Close())
+					}()
+
+					k, v := index.AllPostingsKey()
+					p, err := idxr.Postings(k, v)
+					require.NoError(t, err)
+
+					var lbls labels.Labels
+					for p.Next() {
+						ref := p.At()
+						require.NoError(t, idxr.Series(ref, &lbls, nil))
+
+						require.Equal(t, uint64(shardIndex), lbls.Hash()%shardCount)
+					}
+					require.NoError(t, p.Err())
+				}
+
+				require.Equal(t, uint64(series), totalSeries)
+
+				// Source blocks are *not* deletable.
+				for _, b := range openBlocks {
+					require.False(t, b.meta.Compaction.Deletable)
+				}
+			})
+		}
+	}
+}
+
+func TestCompaction_CompactEmptyBlocks(t *testing.T) {
+	dir, err := ioutil.TempDir("", "compact")
+	require.NoError(t, err)
+	defer func() {
+		require.NoError(t, os.RemoveAll(dir))
+	}()
+
+	ranges := [][2]int64{{0, 5000}, {3000, 8000}, {6000, 11000}, {9000, 14000}}
+
+	// Generate blocks.
+	var blockDirs []string
+
+	for _, r := range ranges {
+		// Generate blocks using index and chunk writer. CreateBlock would not return valid block for 0 series.
+		id := ulid.MustNew(ulid.Now(), rand.Reader)
+		m := &BlockMeta{
+			ULID:       id,
+			MinTime:    r[0],
+			MaxTime:    r[1],
+			Compaction: BlockMetaCompaction{Level: 1, Sources: []ulid.ULID{id}},
+			Version:    metaVersion1,
+		}
+
+		bdir := filepath.Join(dir, id.String())
+		require.NoError(t, os.Mkdir(bdir, 0777))
+		require.NoError(t, os.Mkdir(chunkDir(bdir), 0777))
+
+		_, err := writeMetaFile(log.NewNopLogger(), bdir, m)
+		require.NoError(t, err)
+
+		iw, err := index.NewWriter(context.Background(), filepath.Join(bdir, indexFilename))
+		require.NoError(t, err)
+
+		require.NoError(t, iw.AddSymbol("hello"))
+		require.NoError(t, iw.AddSymbol("world"))
+		require.NoError(t, iw.Close())
+
+		blockDirs = append(blockDirs, bdir)
+	}
+
+	c, err := NewLeveledCompactor(context.Background(), nil, log.NewNopLogger(), []int64{0}, nil, nil)
+	require.NoError(t, err)
+
+	blockIDs, err := c.CompactWithSplitting(dir, blockDirs, nil, 5)
+	require.NoError(t, err)
+
+	// There are no output blocks.
+	for _, b := range blockIDs {
+		require.Equal(t, ulid.ULID{}, b)
+	}
+
+	// All source blocks are now marked for deletion.
+	for _, b := range blockDirs {
+		meta, _, err := readMetaFile(b)
+		require.NoError(t, err)
+		require.True(t, meta.Compaction.Deletable)
+	}
+}
+
 func TestCompaction_populateBlock(t *testing.T) {
 	for _, tc := range []struct {
 		title              string
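The two new tests can be run on their own with the standard tooling, e.g. go test -run 'TestCompaction_Compact(WithSplitting|EmptyBlocks)' ./tsdb/.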
@@ -496,7 +671,7 @@ func TestCompaction_populateBlock(t *testing.T) {
 		{
 			title:              "Populate block from empty input should return error.",
 			inputSeriesSamples: [][]seriesSamples{},
-			expErr:             errors.New("cannot populate block from no readers"),
+			expErr:             errors.New("cannot populate block(s) from no readers"),
 		},
 		{
 			// Populate from single block without chunks. We expect these kind of series being ignored.
@@ -952,7 +1127,8 @@ func TestCompaction_populateBlock(t *testing.T) {
 			}
 
 			iw := &mockIndexWriter{}
-			err = c.populateBlock(blocks, meta, iw, nopChunkWriter{})
+			ob := shardedBlock{meta: meta, indexw: iw, chunkw: nopChunkWriter{}}
+			err = c.populateBlock(blocks, meta.MinTime, meta.MaxTime, []shardedBlock{ob})
 			if tc.expErr != nil {
 				require.Error(t, err)
 				require.Equal(t, tc.expErr.Error(), err.Error())