diff --git a/tsdb/compact.go b/tsdb/compact.go
index b2ae7e4ea5..c8fa5b4ce6 100644
--- a/tsdb/compact.go
+++ b/tsdb/compact.go
@@ -388,9 +388,40 @@ func CompactBlockMetas(uid ulid.ULID, blocks ...*BlockMeta) *BlockMeta {
 	return res
 }
 
+// CompactWithSplitting merges and splits the input blocks into shardCount number of output blocks,
+// and returns a slice of block IDs. The position of a returned block ID in the result slice corresponds to its shard index.
+// If a given output block has no series, the corresponding block ID is the zero ULID value.
+func (c *LeveledCompactor) CompactWithSplitting(dest string, dirs []string, open []*Block, shardCount uint64) (result []ulid.ULID, _ error) {
+	return c.compact(dest, dirs, open, shardCount)
+}
+
 // Compact creates a new block in the compactor's directory from the blocks in the
 // provided directories.
 func (c *LeveledCompactor) Compact(dest string, dirs []string, open []*Block) (uid ulid.ULID, err error) {
+	ulids, err := c.compact(dest, dirs, open, 1)
+	if err != nil {
+		return ulid.ULID{}, err
+	}
+	return ulids[0], nil
+}
+
+// shardedBlock describes a single *output* block during compaction. This struct is passed between
+// compaction methods to wrap the output block's details, index writer and chunk writer together.
+// The shard index is determined by the position of this structure in the slice of output blocks.
+type shardedBlock struct {
+	meta *BlockMeta
+
+	blockDir string
+	tmpDir   string // Temp directory used while the block is being built (= blockDir + temp suffix).
+	chunkw   ChunkWriter
+	indexw   IndexWriter
+}
+
+func (c *LeveledCompactor) compact(dest string, dirs []string, open []*Block, shardCount uint64) (_ []ulid.ULID, err error) {
+	if shardCount == 0 {
+		shardCount = 1
+	}
+
 	var (
 		blocks []BlockReader
 		bs     []*Block
@@ -402,7 +433,7 @@ func (c *LeveledCompactor) Compact(dest string, dirs []string, open []*Block) (u
 	for _, d := range dirs {
 		meta, _, err := readMetaFile(d)
 		if err != nil {
-			return uid, err
+			return nil, err
 		}
 
 		var b *Block
@@ -420,7 +451,7 @@ func (c *LeveledCompactor) Compact(dest string, dirs []string, open []*Block) (u
 			var err error
 			b, err = OpenBlock(c.logger, d, c.chunkPool)
 			if err != nil {
-				return uid, err
+				return nil, err
 			}
 			defer b.Close()
 		}
@@ -431,42 +462,58 @@ func (c *LeveledCompactor) Compact(dest string, dirs []string, open []*Block) (u
 		uids = append(uids, meta.ULID.String())
 	}
 
-	uid = ulid.MustNew(ulid.Now(), rand.Reader)
-
-	meta := CompactBlockMetas(uid, metas...)
-	err = c.write(dest, meta, blocks...)
-	if err == nil {
-		if meta.Stats.NumSamples == 0 {
-			for _, b := range bs {
-				b.meta.Compaction.Deletable = true
-				n, err := writeMetaFile(c.logger, b.dir, &b.meta)
-				if err != nil {
-					level.Error(c.logger).Log(
-						"msg", "Failed to write 'Deletable' to meta file after compaction",
-						"ulid", b.meta.ULID,
-					)
-				}
-				b.numBytesMeta = n
-			}
-			uid = ulid.ULID{}
-			level.Info(c.logger).Log(
-				"msg", "compact blocks resulted in empty block",
-				"count", len(blocks),
-				"sources", fmt.Sprintf("%v", uids),
-				"duration", time.Since(start),
-			)
-		} else {
-			level.Info(c.logger).Log(
-				"msg", "compact blocks",
-				"count", len(blocks),
-				"mint", meta.MinTime,
-				"maxt", meta.MaxTime,
-				"ulid", meta.ULID,
-				"sources", fmt.Sprintf("%v", uids),
-				"duration", time.Since(start),
-			)
+	outBlocks := make([]shardedBlock, shardCount)
+	for ix := range outBlocks {
+		uid := ulid.MustNew(ulid.Now(), rand.Reader)
+		outBlocks[ix] = shardedBlock{
+			meta: CompactBlockMetas(uid, metas...),
 		}
-		return uid, nil
+	}
+
+	err = c.write(dest, outBlocks, blocks...)
+	if err == nil {
+		ulids := make([]ulid.ULID, len(outBlocks))
+
+		for ix := range outBlocks {
+			meta := outBlocks[ix].meta
+
+			if meta.Stats.NumSamples == 0 {
+				for _, b := range bs {
+					b.meta.Compaction.Deletable = true
+					n, err := writeMetaFile(c.logger, b.dir, &b.meta)
+					if err != nil {
+						level.Error(c.logger).Log(
+							"msg", "Failed to write 'Deletable' to meta file after compaction",
+							"ulid", b.meta.ULID,
+							"shard", fmt.Sprintf("%d_of_%d", ix, shardCount),
+						)
+					}
+					b.numBytesMeta = n
+				}
+				outBlocks[ix].meta.ULID = ulid.ULID{}
+				level.Info(c.logger).Log(
+					"msg", "compact blocks resulted in empty block",
+					"count", len(blocks),
+					"sources", fmt.Sprintf("%v", uids),
+					"duration", time.Since(start),
+					"shard", fmt.Sprintf("%d_of_%d", ix, shardCount),
+				)
+			} else {
+				level.Info(c.logger).Log(
+					"msg", "compact blocks",
+					"count", len(blocks),
+					"mint", meta.MinTime,
+					"maxt", meta.MaxTime,
+					"ulid", meta.ULID,
+					"sources", fmt.Sprintf("%v", uids),
+					"duration", time.Since(start),
+					"shard", fmt.Sprintf("%d_of_%d", ix, shardCount),
+				)
+			}
+
+			ulids[ix] = outBlocks[ix].meta.ULID
+		}
+		return ulids, nil
 	}
 
 	errs := tsdb_errors.NewMulti(err)
@@ -478,7 +525,7 @@ func (c *LeveledCompactor) Compact(dest string, dirs []string, open []*Block) (u
 		}
 	}
 
-	return uid, errs.Err()
+	return nil, errs.Err()
 }
 
 func (c *LeveledCompactor) Write(dest string, b BlockReader, mint, maxt int64, parent *BlockMeta) (ulid.ULID, error) {
@@ -500,7 +547,7 @@ func (c *LeveledCompactor) Write(dest string, b BlockReader, mint, maxt int64, p
 		}
 	}
 
-	err := c.write(dest, meta, b)
+	err := c.write(dest, []shardedBlock{{meta: meta}}, b)
 	if err != nil {
 		return uid, err
 	}
@@ -544,63 +591,79 @@ func (w *instrumentedChunkWriter) WriteChunks(chunks ...chunks.Meta) error {
 	return w.ChunkWriter.WriteChunks(chunks...)
 }
 
-// write creates a new block that is the union of the provided blocks into dir.
-func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockReader) (err error) {
-	dir := filepath.Join(dest, meta.ULID.String())
-	tmp := dir + tmpForCreationBlockDirSuffix
+// write creates new output blocks that are the union of the provided blocks, written into dest.
+func (c *LeveledCompactor) write(dest string, outBlocks []shardedBlock, blocks ...BlockReader) (err error) {
 	var closers []io.Closer
+
 	defer func(t time.Time) {
 		err = tsdb_errors.NewMulti(err, tsdb_errors.CloseAll(closers)).Err()
 
 		// RemoveAll returns no error when tmp doesn't exist so it is safe to always run it.
-		if err := os.RemoveAll(tmp); err != nil {
-			level.Error(c.logger).Log("msg", "removed tmp folder after failed compaction", "err", err.Error())
+		for ix := 0; ix < len(outBlocks); ix++ {
+			if outBlocks[ix].tmpDir == "" {
+				continue
+			}
+
+			if err := os.RemoveAll(outBlocks[ix].tmpDir); err != nil {
+				level.Error(c.logger).Log("msg", "removed tmp folder after failed compaction", "err", err.Error())
+			}
 		}
 		c.metrics.ran.Inc()
 		c.metrics.duration.Observe(time.Since(t).Seconds())
 	}(time.Now())
 
-	if err = os.RemoveAll(tmp); err != nil {
-		return err
-	}
+	for ix := range outBlocks {
+		dir := filepath.Join(dest, outBlocks[ix].meta.ULID.String())
+		tmp := dir + tmpForCreationBlockDirSuffix
 
-	if err = os.MkdirAll(tmp, 0777); err != nil {
-		return err
-	}
+		outBlocks[ix].blockDir = dir
+		outBlocks[ix].tmpDir = tmp
 
-	// Populate chunk and index files into temporary directory with
-	// data of all blocks.
-	var chunkw ChunkWriter
-
-	chunkw, err = chunks.NewWriterWithSegSize(chunkDir(tmp), c.maxBlockChunkSegmentSize)
-	if err != nil {
-		return errors.Wrap(err, "open chunk writer")
-	}
-	closers = append(closers, chunkw)
-	// Record written chunk sizes on level 1 compactions.
-	if meta.Compaction.Level == 1 {
-		chunkw = &instrumentedChunkWriter{
-			ChunkWriter: chunkw,
-			size:        c.metrics.chunkSize,
-			samples:     c.metrics.chunkSamples,
-			trange:      c.metrics.chunkRange,
+		if err = os.RemoveAll(tmp); err != nil {
+			return err
 		}
+
+		if err = os.MkdirAll(tmp, 0777); err != nil {
+			return err
+		}
+
+		// Populate chunk and index files into temporary directory with
+		// data of all blocks.
+		var chunkw ChunkWriter
+		chunkw, err = chunks.NewWriterWithSegSize(chunkDir(tmp), c.maxBlockChunkSegmentSize)
+		if err != nil {
+			return errors.Wrap(err, "open chunk writer")
+		}
+
+		closers = append(closers, chunkw)
+
+		// Record written chunk sizes on level 1 compactions.
+		if outBlocks[ix].meta.Compaction.Level == 1 {
+			chunkw = &instrumentedChunkWriter{
+				ChunkWriter: chunkw,
+				size:        c.metrics.chunkSize,
+				samples:     c.metrics.chunkSamples,
+				trange:      c.metrics.chunkRange,
+			}
+		}
+
+		outBlocks[ix].chunkw = chunkw
+
+		indexw, err := index.NewWriter(c.ctx, filepath.Join(tmp, indexFilename))
+		if err != nil {
+			return errors.Wrap(err, "open index writer")
+		}
+		closers = append(closers, indexw)
+
+		outBlocks[ix].indexw = indexw
 	}
 
-	indexw, err := index.NewWriter(c.ctx, filepath.Join(tmp, indexFilename))
-	if err != nil {
-		return errors.Wrap(err, "open index writer")
-	}
-	closers = append(closers, indexw)
-
-	if err := c.populateBlock(blocks, meta, indexw, chunkw); err != nil {
+	if err := c.populateBlock(blocks, outBlocks[0].meta.MinTime, outBlocks[0].meta.MaxTime, outBlocks); err != nil {
 		return errors.Wrap(err, "populate block")
 	}
 
-	select {
-	case <-c.ctx.Done():
-		return c.ctx.Err()
-	default:
+	if err := c.ctx.Err(); err != nil {
+		return err
 	}
 
 	// We are explicitly closing them here to check for error even
@@ -616,54 +679,59 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockRe
 		return errs.Err()
 	}
 
-	// Populated block is empty, so exit early.
-	if meta.Stats.NumSamples == 0 {
-		return nil
-	}
-
-	if _, err = writeMetaFile(c.logger, tmp, meta); err != nil {
-		return errors.Wrap(err, "write merged meta")
-	}
-
-	// Create an empty tombstones file.
-	if _, err := tombstones.WriteFile(c.logger, tmp, tombstones.NewMemTombstones()); err != nil {
-		return errors.Wrap(err, "write new tombstones file")
-	}
-
-	df, err := fileutil.OpenDir(tmp)
-	if err != nil {
-		return errors.Wrap(err, "open temporary block dir")
-	}
-	defer func() {
-		if df != nil {
-			df.Close()
+	for ix := range outBlocks {
+		ob := outBlocks[ix]
+		// The populated block is empty, so don't write a meta file for it.
+		if ob.meta.Stats.NumSamples == 0 {
+			continue
 		}
-	}()
-	if err := df.Sync(); err != nil {
-		return errors.Wrap(err, "sync temporary dir file")
-	}
+		if _, err = writeMetaFile(c.logger, ob.tmpDir, ob.meta); err != nil {
+			return errors.Wrap(err, "write merged meta")
+		}
 
-	// Close temp dir before rename block dir (for windows platform).
-	if err = df.Close(); err != nil {
-		return errors.Wrap(err, "close temporary dir")
-	}
-	df = nil
+		// Create an empty tombstones file.
+		if _, err := tombstones.WriteFile(c.logger, ob.tmpDir, tombstones.NewMemTombstones()); err != nil {
+			return errors.Wrap(err, "write new tombstones file")
+		}
 
-	// Block successfully written, make it visible in destination dir by moving it from tmp one.
-	if err := fileutil.Replace(tmp, dir); err != nil {
-		return errors.Wrap(err, "rename block dir")
+		df, err := fileutil.OpenDir(ob.tmpDir)
+		if err != nil {
+			return errors.Wrap(err, "open temporary block dir")
+		}
+		defer func() {
+			if df != nil {
+				_ = df.Close()
+			}
+		}()
+
+		if err := df.Sync(); err != nil {
+			return errors.Wrap(err, "sync temporary dir file")
+		}
+
+		// Close temp dir before rename block dir (for windows platform).
+		if err = df.Close(); err != nil {
+			return errors.Wrap(err, "close temporary dir")
+		}
+		df = nil
+
+		// Block successfully written, make it visible in destination dir by moving it from tmp one.
+		if err := fileutil.Replace(ob.tmpDir, ob.blockDir); err != nil {
+			return errors.Wrap(err, "rename block dir")
+		}
 	}
 
 	return nil
 }
 
-// populateBlock fills the index and chunk writers with new data gathered as the union
-// of the provided blocks. It returns meta information for the new block.
+// populateBlock fills the index and chunk writers of the output blocks with new data gathered as the union
+// of the provided blocks.
 // It expects sorted blocks input by mint.
-func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta, indexw IndexWriter, chunkw ChunkWriter) (err error) {
+// If there is more than 1 output block, each output block will only contain series that hash into its shard
+// (based on the total number of output blocks).
+func (c *LeveledCompactor) populateBlock(blocks []BlockReader, minT, maxT int64, outBlocks []shardedBlock) (err error) {
 	if len(blocks) == 0 {
-		return errors.New("cannot populate block from no readers")
+		return errors.New("cannot populate block(s) from no readers")
 	}
 
 	var (
@@ -694,7 +762,7 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta,
 		if i > 0 && b.Meta().MinTime < globalMaxt {
 			c.metrics.overlappingBlocks.Inc()
 			overlapping = true
-			level.Info(c.logger).Log("msg", "Found overlapping blocks during compaction", "ulid", meta.ULID)
+			level.Info(c.logger).Log("msg", "Found overlapping blocks during compaction")
 		}
 		if b.Meta().MaxTime > globalMaxt {
 			globalMaxt = b.Meta().MaxTime
@@ -726,7 +794,7 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta,
 		}
 		all = indexr.SortedPostings(all)
 		// Blocks meta is half open: [min, max), so subtract 1 to ensure we don't hold samples with exact meta.MaxTime timestamp.
-		sets = append(sets, newBlockChunkSeriesSet(indexr, chunkr, tombsr, all, meta.MinTime, meta.MaxTime-1))
+		sets = append(sets, newBlockChunkSeriesSet(indexr, chunkr, tombsr, all, minT, maxT-1))
 		syms := indexr.Symbols()
 		if i == 0 {
 			symbols = syms
@@ -736,8 +804,10 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta,
 	}
 
 	for symbols.Next() {
-		if err := indexw.AddSymbol(symbols.At()); err != nil {
-			return errors.Wrap(err, "add symbol")
+		for ix := range outBlocks {
+			if err := outBlocks[ix].indexw.AddSymbol(symbols.At()); err != nil {
+				return errors.Wrap(err, "add symbol")
+			}
 		}
 	}
 	if symbols.Err() != nil {
@@ -764,6 +834,7 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta,
 		default:
 		}
 		s := set.At()
+
 		chksIter := s.Iterator()
 		chks = chks[:0]
 		for chksIter.Next() {
@@ -780,17 +851,22 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta,
 			continue
 		}
 
-		if err := chunkw.WriteChunks(chks...); err != nil {
+		ob := outBlocks[0]
+		if len(outBlocks) > 1 {
+			ob = outBlocks[s.Labels().Hash()%uint64(len(outBlocks))]
+		}
+
+		if err := ob.chunkw.WriteChunks(chks...); err != nil {
 			return errors.Wrap(err, "write chunks")
 		}
-		if err := indexw.AddSeries(ref, s.Labels(), chks...); err != nil {
+		if err := ob.indexw.AddSeries(ref, s.Labels(), chks...); err != nil {
 			return errors.Wrap(err, "add series")
 		}
 
-		meta.Stats.NumChunks += uint64(len(chks))
-		meta.Stats.NumSeries++
+		ob.meta.Stats.NumChunks += uint64(len(chks))
+		ob.meta.Stats.NumSeries++
 		for _, chk := range chks {
-			meta.Stats.NumSamples += uint64(chk.Chunk.NumSamples())
+			ob.meta.Stats.NumSamples += uint64(chk.Chunk.NumSamples())
 		}
 
 		for _, chk := range chks {
diff --git a/tsdb/compact_test.go b/tsdb/compact_test.go
index e30f2b190f..3a6a1c8f11 100644
--- a/tsdb/compact_test.go
+++ b/tsdb/compact_test.go
@@ -25,6 +25,7 @@ import (
 	"time"
 
 	"github.com/go-kit/log"
+	"github.com/oklog/ulid"
 	"github.com/pkg/errors"
 	prom_testutil "github.com/prometheus/client_golang/prometheus/testutil"
 	"github.com/stretchr/testify/require"
@@ -33,6 +34,7 @@ import (
 	"github.com/prometheus/prometheus/tsdb/chunkenc"
 	"github.com/prometheus/prometheus/tsdb/chunks"
 	"github.com/prometheus/prometheus/tsdb/fileutil"
+	"github.com/prometheus/prometheus/tsdb/index"
 	"github.com/prometheus/prometheus/tsdb/tombstones"
 )
 
@@ -440,7 +442,7 @@ func TestCompactionFailWillCleanUpTempDir(t *testing.T) {
 		require.NoError(t, os.RemoveAll(tmpdir))
 	}()
 
-	require.Error(t, compactor.write(tmpdir, &BlockMeta{}, erringBReader{}))
+	require.Error(t, compactor.write(tmpdir, []shardedBlock{{meta: &BlockMeta{}}}, erringBReader{}))
 	_, err = os.Stat(filepath.Join(tmpdir, BlockMeta{}.ULID.String()) + tmpForCreationBlockDirSuffix)
 	require.True(t, os.IsNotExist(err), "directory is not cleaned up")
 }
@@ -484,6 +486,91 @@ func samplesForRange(minTime, maxTime int64, maxSamplesPerChunk int) (ret [][]sa
 	return ret
 }
 
+func TestCompaction_CompactWithSplitting(t *testing.T) {
+	seriesCounts := []int{10, 1234}
+	shardCounts := []uint64{1, 13}
+
+	for _, series := range seriesCounts {
+		dir, err := ioutil.TempDir("", "compact")
+		require.NoError(t, err)
+		defer func() {
+			require.NoError(t, os.RemoveAll(dir))
+		}()
+
+		ranges := [][2]int64{{0, 5000}, {3000, 8000}, {6000, 11000}, {9000, 14000}}
+
+		// Generate blocks.
+		var blockDirs []string
+		var openBlocks []*Block
+
+		for _, r := range ranges {
+			block, err := OpenBlock(nil, createBlock(t, dir, genSeries(series, 10, r[0], r[1])), nil)
+			require.NoError(t, err)
+			defer func() {
+				require.NoError(t, block.Close())
+			}()
+
+			openBlocks = append(openBlocks, block)
+			blockDirs = append(blockDirs, block.Dir())
+		}
+
+		for _, shardCount := range shardCounts {
+			t.Run(fmt.Sprintf("series=%d, shards=%d", series, shardCount), func(t *testing.T) {
+				c, err := NewLeveledCompactor(context.Background(), nil, log.NewNopLogger(), []int64{0}, nil, nil)
+				require.NoError(t, err)
+
+				blockIDs, err := c.CompactWithSplitting(dir, blockDirs, openBlocks, shardCount)
+
+				require.NoError(t, err)
+				require.Equal(t, shardCount, uint64(len(blockIDs)))
+
+				// Verify resulting blocks. We will iterate over all series in all blocks, and check two things:
+				// 1) Make sure that each series in the block belongs to the block (based on sharding).
+				// 2) Verify that the total number of series over all blocks is correct.
+				totalSeries := uint64(0)
+
+				for shardIndex, blockID := range blockIDs {
+					// Some blocks may be empty; those have a zero block ID.
+					if blockID == (ulid.ULID{}) {
+						continue
+					}
+
+					block, err := OpenBlock(log.NewNopLogger(), filepath.Join(dir, blockID.String()), nil)
+					require.NoError(t, err)
+
+					defer func() {
+						require.NoError(t, block.Close())
+					}()
+
+					totalSeries += block.Meta().Stats.NumSeries
+
+					idxr, err := block.Index()
+					require.NoError(t, err)
+
+					defer func() {
+						require.NoError(t, idxr.Close())
+					}()
+
+					k, v := index.AllPostingsKey()
+					p, err := idxr.Postings(k, v)
+					require.NoError(t, err)
+
+					var lbls labels.Labels
+					var chks []chunks.Meta
+					for p.Next() {
+						require.NoError(t, idxr.Series(p.At(), &lbls, &chks))
+
+						require.Equal(t, uint64(shardIndex), lbls.Hash()%shardCount)
+					}
+					require.NoError(t, p.Err())
+				}
+
+				require.Equal(t, uint64(series), totalSeries)
+			})
+		}
+	}
+}
+
 func TestCompaction_populateBlock(t *testing.T) {
 	for _, tc := range []struct {
 		title              string
@@ -496,7 +583,7 @@ func TestCompaction_populateBlock(t *testing.T) {
 		{
 			title:              "Populate block from empty input should return error.",
 			inputSeriesSamples: [][]seriesSamples{},
-			expErr:             errors.New("cannot populate block from no readers"),
+			expErr:             errors.New("cannot populate block(s) from no readers"),
 		},
 		{
 			// Populate from single block without chunks. We expect these kind of series being ignored.
@@ -952,7 +1039,8 @@ func TestCompaction_populateBlock(t *testing.T) {
 			}
 
 			iw := &mockIndexWriter{}
-			err = c.populateBlock(blocks, meta, iw, nopChunkWriter{})
+			ob := shardedBlock{meta: meta, indexw: iw, chunkw: nopChunkWriter{}}
+			err = c.populateBlock(blocks, meta.MinTime, meta.MaxTime, []shardedBlock{ob})
 			if tc.expErr != nil {
 				require.Error(t, err)
 				require.Equal(t, tc.expErr.Error(), err.Error())
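Reviewer note (not part of the patch): a minimal sketch of how the new CompactWithSplitting entry point could be driven from outside the tsdb package, assuming the caller has already constructed a LeveledCompactor and opened the source blocks. The package name, helper name and the shard count of 4 are illustrative only; the zero-ULID check mirrors the contract documented on CompactWithSplitting above.

	package tsdbexample

	import (
		"fmt"

		"github.com/oklog/ulid"

		"github.com/prometheus/prometheus/tsdb"
	)

	// compactIntoShards merges the given source blocks and splits the result into
	// four output blocks, one per shard, reporting which shards produced a block.
	func compactIntoShards(c *tsdb.LeveledCompactor, dest string, dirs []string, open []*tsdb.Block) error {
		ids, err := c.CompactWithSplitting(dest, dirs, open, 4)
		if err != nil {
			return err
		}
		for shard, id := range ids {
			// Shards that received no series are reported as the zero ULID and have no block on disk.
			if id == (ulid.ULID{}) {
				continue
			}
			fmt.Printf("shard %d wrote block %s\n", shard, id)
		}
		return nil
	}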