From 78396b67dd5abc55ad2276f132c710f60bc14674 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Peter=20S=CC=8Ctibrany=CC=81?= Date: Tue, 21 Sep 2021 11:01:26 +0200 Subject: [PATCH 1/9] Compactor with support for splitting input blocks into multiple output blocks. --- tsdb/compact.go | 324 ++++++++++++++++++++++++++----------------- tsdb/compact_test.go | 94 ++++++++++++- 2 files changed, 291 insertions(+), 127 deletions(-) diff --git a/tsdb/compact.go b/tsdb/compact.go index b2ae7e4ea5..c8fa5b4ce6 100644 --- a/tsdb/compact.go +++ b/tsdb/compact.go @@ -388,9 +388,40 @@ func CompactBlockMetas(uid ulid.ULID, blocks ...*BlockMeta) *BlockMeta { return res } +// CompactWithSplitting merges and splits the input blocks into shardCount number of output blocks, +// and returns slice of block IDs. Position of returned block ID in the result slice corresponds to the shard index. +// If given output block has no series, corresponding block ID will be zero ULID value. +func (c *LeveledCompactor) CompactWithSplitting(dest string, dirs []string, open []*Block, shardCount uint64) (result []ulid.ULID, _ error) { + return c.compact(dest, dirs, open, shardCount) +} + // Compact creates a new block in the compactor's directory from the blocks in the // provided directories. func (c *LeveledCompactor) Compact(dest string, dirs []string, open []*Block) (uid ulid.ULID, err error) { + ulids, err := c.compact(dest, dirs, open, 1) + if err != nil { + return ulid.ULID{}, err + } + return ulids[0], nil +} + +// shardedBlock describes single *output* block during compaction. This struct is passed between +// compaction methods to wrap output block details, index and chunk writer together. +// Shard index is determined by the position of this structure in the slice of output blocks. +type shardedBlock struct { + meta *BlockMeta + + blockDir string + tmpDir string // Temp directory used when block is being built (= blockDir + temp suffix) + chunkw ChunkWriter + indexw IndexWriter +} + +func (c *LeveledCompactor) compact(dest string, dirs []string, open []*Block, shardCount uint64) (_ []ulid.ULID, err error) { + if shardCount == 0 { + shardCount = 1 + } + var ( blocks []BlockReader bs []*Block @@ -402,7 +433,7 @@ func (c *LeveledCompactor) Compact(dest string, dirs []string, open []*Block) (u for _, d := range dirs { meta, _, err := readMetaFile(d) if err != nil { - return uid, err + return nil, err } var b *Block @@ -420,7 +451,7 @@ func (c *LeveledCompactor) Compact(dest string, dirs []string, open []*Block) (u var err error b, err = OpenBlock(c.logger, d, c.chunkPool) if err != nil { - return uid, err + return nil, err } defer b.Close() } @@ -431,42 +462,58 @@ func (c *LeveledCompactor) Compact(dest string, dirs []string, open []*Block) (u uids = append(uids, meta.ULID.String()) } - uid = ulid.MustNew(ulid.Now(), rand.Reader) - - meta := CompactBlockMetas(uid, metas...) - err = c.write(dest, meta, blocks...) - if err == nil { - if meta.Stats.NumSamples == 0 { - for _, b := range bs { - b.meta.Compaction.Deletable = true - n, err := writeMetaFile(c.logger, b.dir, &b.meta) - if err != nil { - level.Error(c.logger).Log( - "msg", "Failed to write 'Deletable' to meta file after compaction", - "ulid", b.meta.ULID, - ) - } - b.numBytesMeta = n - } - uid = ulid.ULID{} - level.Info(c.logger).Log( - "msg", "compact blocks resulted in empty block", - "count", len(blocks), - "sources", fmt.Sprintf("%v", uids), - "duration", time.Since(start), - ) - } else { - level.Info(c.logger).Log( - "msg", "compact blocks", - "count", len(blocks), - "mint", meta.MinTime, - "maxt", meta.MaxTime, - "ulid", meta.ULID, - "sources", fmt.Sprintf("%v", uids), - "duration", time.Since(start), - ) + outBlocks := make([]shardedBlock, shardCount, shardCount) + for ix := range outBlocks { + uid := ulid.MustNew(ulid.Now(), rand.Reader) + outBlocks[ix] = shardedBlock{ + meta: CompactBlockMetas(uid, metas...), } - return uid, nil + } + + err = c.write(dest, outBlocks, blocks...) + if err == nil { + ulids := make([]ulid.ULID, len(outBlocks), len(outBlocks)) + + for ix := range outBlocks { + meta := outBlocks[ix].meta + + if meta.Stats.NumSamples == 0 { + for _, b := range bs { + b.meta.Compaction.Deletable = true + n, err := writeMetaFile(c.logger, b.dir, &b.meta) + if err != nil { + level.Error(c.logger).Log( + "msg", "Failed to write 'Deletable' to meta file after compaction", + "ulid", b.meta.ULID, + "shard", fmt.Sprintf("%d_of_%d", ix, shardCount), + ) + } + b.numBytesMeta = n + } + outBlocks[ix].meta.ULID = ulid.ULID{} + level.Info(c.logger).Log( + "msg", "compact blocks resulted in empty block", + "count", len(blocks), + "sources", fmt.Sprintf("%v", uids), + "duration", time.Since(start), + "shard", fmt.Sprintf("%d_of_%d", ix, shardCount), + ) + } else { + level.Info(c.logger).Log( + "msg", "compact blocks", + "count", len(blocks), + "mint", meta.MinTime, + "maxt", meta.MaxTime, + "ulid", meta.ULID, + "sources", fmt.Sprintf("%v", uids), + "duration", time.Since(start), + "shard", fmt.Sprintf("%d_of_%d", ix, shardCount), + ) + } + + ulids[ix] = outBlocks[ix].meta.ULID + } + return ulids, nil } errs := tsdb_errors.NewMulti(err) @@ -478,7 +525,7 @@ func (c *LeveledCompactor) Compact(dest string, dirs []string, open []*Block) (u } } - return uid, errs.Err() + return nil, errs.Err() } func (c *LeveledCompactor) Write(dest string, b BlockReader, mint, maxt int64, parent *BlockMeta) (ulid.ULID, error) { @@ -500,7 +547,7 @@ func (c *LeveledCompactor) Write(dest string, b BlockReader, mint, maxt int64, p } } - err := c.write(dest, meta, b) + err := c.write(dest, []shardedBlock{{meta: meta}}, b) if err != nil { return uid, err } @@ -544,63 +591,79 @@ func (w *instrumentedChunkWriter) WriteChunks(chunks ...chunks.Meta) error { return w.ChunkWriter.WriteChunks(chunks...) } -// write creates a new block that is the union of the provided blocks into dir. -func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockReader) (err error) { - dir := filepath.Join(dest, meta.ULID.String()) - tmp := dir + tmpForCreationBlockDirSuffix +// write creates new output blocks that are the union of the provided blocks into dir. +func (c *LeveledCompactor) write(dest string, outBlocks []shardedBlock, blocks ...BlockReader) (err error) { var closers []io.Closer + defer func(t time.Time) { err = tsdb_errors.NewMulti(err, tsdb_errors.CloseAll(closers)).Err() // RemoveAll returns no error when tmp doesn't exist so it is safe to always run it. - if err := os.RemoveAll(tmp); err != nil { - level.Error(c.logger).Log("msg", "removed tmp folder after failed compaction", "err", err.Error()) + for ix := 0; ix < len(outBlocks); ix++ { + if outBlocks[ix].tmpDir == "" { + continue + } + + if err := os.RemoveAll(outBlocks[ix].tmpDir); err != nil { + level.Error(c.logger).Log("msg", "removed tmp folder after failed compaction", "err", err.Error()) + } } c.metrics.ran.Inc() c.metrics.duration.Observe(time.Since(t).Seconds()) }(time.Now()) - if err = os.RemoveAll(tmp); err != nil { - return err - } + for ix := range outBlocks { + dir := filepath.Join(dest, outBlocks[ix].meta.ULID.String()) + tmp := dir + tmpForCreationBlockDirSuffix - if err = os.MkdirAll(tmp, 0777); err != nil { - return err - } + outBlocks[ix].blockDir = dir + outBlocks[ix].tmpDir = tmp - // Populate chunk and index files into temporary directory with - // data of all blocks. - var chunkw ChunkWriter - - chunkw, err = chunks.NewWriterWithSegSize(chunkDir(tmp), c.maxBlockChunkSegmentSize) - if err != nil { - return errors.Wrap(err, "open chunk writer") - } - closers = append(closers, chunkw) - // Record written chunk sizes on level 1 compactions. - if meta.Compaction.Level == 1 { - chunkw = &instrumentedChunkWriter{ - ChunkWriter: chunkw, - size: c.metrics.chunkSize, - samples: c.metrics.chunkSamples, - trange: c.metrics.chunkRange, + if err = os.RemoveAll(tmp); err != nil { + return err } + + if err = os.MkdirAll(tmp, 0777); err != nil { + return err + } + + // Populate chunk and index files into temporary directory with + // data of all blocks. + var chunkw ChunkWriter + chunkw, err = chunks.NewWriterWithSegSize(chunkDir(tmp), c.maxBlockChunkSegmentSize) + if err != nil { + return errors.Wrap(err, "open chunk writer") + } + + closers = append(closers, chunkw) + + // Record written chunk sizes on level 1 compactions. + if outBlocks[ix].meta.Compaction.Level == 1 { + chunkw = &instrumentedChunkWriter{ + ChunkWriter: chunkw, + size: c.metrics.chunkSize, + samples: c.metrics.chunkSamples, + trange: c.metrics.chunkRange, + } + } + + outBlocks[ix].chunkw = chunkw + + indexw, err := index.NewWriter(c.ctx, filepath.Join(tmp, indexFilename)) + if err != nil { + return errors.Wrap(err, "open index writer") + } + closers = append(closers, indexw) + + outBlocks[ix].indexw = indexw } - indexw, err := index.NewWriter(c.ctx, filepath.Join(tmp, indexFilename)) - if err != nil { - return errors.Wrap(err, "open index writer") - } - closers = append(closers, indexw) - - if err := c.populateBlock(blocks, meta, indexw, chunkw); err != nil { + if err := c.populateBlock(blocks, outBlocks[0].meta.MinTime, outBlocks[0].meta.MaxTime, outBlocks); err != nil { return errors.Wrap(err, "populate block") } - select { - case <-c.ctx.Done(): - return c.ctx.Err() - default: + if err := c.ctx.Err(); err != nil { + return err } // We are explicitly closing them here to check for error even @@ -616,54 +679,59 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockRe return errs.Err() } - // Populated block is empty, so exit early. - if meta.Stats.NumSamples == 0 { - return nil - } - - if _, err = writeMetaFile(c.logger, tmp, meta); err != nil { - return errors.Wrap(err, "write merged meta") - } - - // Create an empty tombstones file. - if _, err := tombstones.WriteFile(c.logger, tmp, tombstones.NewMemTombstones()); err != nil { - return errors.Wrap(err, "write new tombstones file") - } - - df, err := fileutil.OpenDir(tmp) - if err != nil { - return errors.Wrap(err, "open temporary block dir") - } - defer func() { - if df != nil { - df.Close() + for ix := range outBlocks { + ob := outBlocks[ix] + // Populated block is empty, don't write meta file for it. + if ob.meta.Stats.NumSamples == 0 { + continue } - }() - if err := df.Sync(); err != nil { - return errors.Wrap(err, "sync temporary dir file") - } + if _, err = writeMetaFile(c.logger, ob.tmpDir, ob.meta); err != nil { + return errors.Wrap(err, "write merged meta") + } - // Close temp dir before rename block dir (for windows platform). - if err = df.Close(); err != nil { - return errors.Wrap(err, "close temporary dir") - } - df = nil + // Create an empty tombstones file. + if _, err := tombstones.WriteFile(c.logger, ob.tmpDir, tombstones.NewMemTombstones()); err != nil { + return errors.Wrap(err, "write new tombstones file") + } - // Block successfully written, make it visible in destination dir by moving it from tmp one. - if err := fileutil.Replace(tmp, dir); err != nil { - return errors.Wrap(err, "rename block dir") + df, err := fileutil.OpenDir(ob.tmpDir) + if err != nil { + return errors.Wrap(err, "open temporary block dir") + } + defer func() { + if df != nil { + _ = df.Close() + } + }() + + if err := df.Sync(); err != nil { + return errors.Wrap(err, "sync temporary dir file") + } + + // Close temp dir before rename block dir (for windows platform). + if err = df.Close(); err != nil { + return errors.Wrap(err, "close temporary dir") + } + df = nil + + // Block successfully written, make it visible in destination dir by moving it from tmp one. + if err := fileutil.Replace(ob.tmpDir, ob.blockDir); err != nil { + return errors.Wrap(err, "rename block dir") + } } return nil } -// populateBlock fills the index and chunk writers with new data gathered as the union -// of the provided blocks. It returns meta information for the new block. +// populateBlock fills the index and chunk writers of output blocks with new data gathered as the union +// of the provided blocks. // It expects sorted blocks input by mint. -func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta, indexw IndexWriter, chunkw ChunkWriter) (err error) { +// If there is more than 1 output block, each output block will only contain series that hash into its shard +// (based on total number of output blocks). +func (c *LeveledCompactor) populateBlock(blocks []BlockReader, minT, maxT int64, outBlocks []shardedBlock) (err error) { if len(blocks) == 0 { - return errors.New("cannot populate block from no readers") + return errors.New("cannot populate block(s) from no readers") } var ( @@ -694,7 +762,7 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta, if i > 0 && b.Meta().MinTime < globalMaxt { c.metrics.overlappingBlocks.Inc() overlapping = true - level.Info(c.logger).Log("msg", "Found overlapping blocks during compaction", "ulid", meta.ULID) + level.Info(c.logger).Log("msg", "Found overlapping blocks during compaction" /* "ulid", meta.ULID */) } if b.Meta().MaxTime > globalMaxt { globalMaxt = b.Meta().MaxTime @@ -726,7 +794,7 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta, } all = indexr.SortedPostings(all) // Blocks meta is half open: [min, max), so subtract 1 to ensure we don't hold samples with exact meta.MaxTime timestamp. - sets = append(sets, newBlockChunkSeriesSet(indexr, chunkr, tombsr, all, meta.MinTime, meta.MaxTime-1)) + sets = append(sets, newBlockChunkSeriesSet(indexr, chunkr, tombsr, all, minT, maxT-1)) syms := indexr.Symbols() if i == 0 { symbols = syms @@ -736,8 +804,10 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta, } for symbols.Next() { - if err := indexw.AddSymbol(symbols.At()); err != nil { - return errors.Wrap(err, "add symbol") + for ix := range outBlocks { + if err := outBlocks[ix].indexw.AddSymbol(symbols.At()); err != nil { + return errors.Wrap(err, "add symbol") + } } } if symbols.Err() != nil { @@ -764,6 +834,7 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta, default: } s := set.At() + chksIter := s.Iterator() chks = chks[:0] for chksIter.Next() { @@ -780,17 +851,22 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta, continue } - if err := chunkw.WriteChunks(chks...); err != nil { + ob := outBlocks[0] + if len(outBlocks) > 1 { + ob = outBlocks[s.Labels().Hash()%uint64(len(outBlocks))] + } + + if err := ob.chunkw.WriteChunks(chks...); err != nil { return errors.Wrap(err, "write chunks") } - if err := indexw.AddSeries(ref, s.Labels(), chks...); err != nil { + if err := ob.indexw.AddSeries(ref, s.Labels(), chks...); err != nil { return errors.Wrap(err, "add series") } - meta.Stats.NumChunks += uint64(len(chks)) - meta.Stats.NumSeries++ + ob.meta.Stats.NumChunks += uint64(len(chks)) + ob.meta.Stats.NumSeries++ for _, chk := range chks { - meta.Stats.NumSamples += uint64(chk.Chunk.NumSamples()) + ob.meta.Stats.NumSamples += uint64(chk.Chunk.NumSamples()) } for _, chk := range chks { diff --git a/tsdb/compact_test.go b/tsdb/compact_test.go index e30f2b190f..3a6a1c8f11 100644 --- a/tsdb/compact_test.go +++ b/tsdb/compact_test.go @@ -25,6 +25,7 @@ import ( "time" "github.com/go-kit/log" + "github.com/oklog/ulid" "github.com/pkg/errors" prom_testutil "github.com/prometheus/client_golang/prometheus/testutil" "github.com/stretchr/testify/require" @@ -33,6 +34,7 @@ import ( "github.com/prometheus/prometheus/tsdb/chunkenc" "github.com/prometheus/prometheus/tsdb/chunks" "github.com/prometheus/prometheus/tsdb/fileutil" + "github.com/prometheus/prometheus/tsdb/index" "github.com/prometheus/prometheus/tsdb/tombstones" ) @@ -440,7 +442,7 @@ func TestCompactionFailWillCleanUpTempDir(t *testing.T) { require.NoError(t, os.RemoveAll(tmpdir)) }() - require.Error(t, compactor.write(tmpdir, &BlockMeta{}, erringBReader{})) + require.Error(t, compactor.write(tmpdir, []shardedBlock{{meta: &BlockMeta{}}}, erringBReader{})) _, err = os.Stat(filepath.Join(tmpdir, BlockMeta{}.ULID.String()) + tmpForCreationBlockDirSuffix) require.True(t, os.IsNotExist(err), "directory is not cleaned up") } @@ -484,6 +486,91 @@ func samplesForRange(minTime, maxTime int64, maxSamplesPerChunk int) (ret [][]sa return ret } +func TestCompaction_CompactWithSplitting(t *testing.T) { + seriesCounts := []int{10, 1234} + shardCounts := []uint64{1, 13} + + for _, series := range seriesCounts { + dir, err := ioutil.TempDir("", "compact") + require.NoError(t, err) + defer func() { + require.NoError(t, os.RemoveAll(dir)) + }() + + ranges := [][2]int64{{0, 5000}, {3000, 8000}, {6000, 11000}, {9000, 14000}} + + // Generate blocks. + var blockDirs []string + var openBlocks []*Block + + for _, r := range ranges { + block, err := OpenBlock(nil, createBlock(t, dir, genSeries(series, 10, r[0], r[1])), nil) + require.NoError(t, err) + defer func() { + require.NoError(t, block.Close()) + }() + + openBlocks = append(openBlocks, block) + blockDirs = append(blockDirs, block.Dir()) + } + + for _, shardCount := range shardCounts { + t.Run(fmt.Sprintf("series=%d, shards=%d", series, shardCount), func(t *testing.T) { + c, err := NewLeveledCompactor(context.Background(), nil, log.NewNopLogger(), []int64{0}, nil, nil) + require.NoError(t, err) + + blockIDs, err := c.CompactWithSplitting(dir, blockDirs, openBlocks, shardCount) + + require.NoError(t, err) + require.Equal(t, shardCount, uint64(len(blockIDs))) + + // Verify resulting blocks. We will iterate over all series in all blocks, and check two things: + // 1) Make sure that each series in the block belongs to the block (based on sharding). + // 2) Verify that total number of series over all blocks is correct. + totalSeries := uint64(0) + + for shardIndex, blockID := range blockIDs { + // Some blocks may be empty, they will have zero block ID. + if blockID == (ulid.ULID{}) { + continue + } + + block, err := OpenBlock(log.NewNopLogger(), filepath.Join(dir, blockID.String()), nil) + require.NoError(t, err) + + defer func() { + require.NoError(t, block.Close()) + }() + + totalSeries += block.Meta().Stats.NumSeries + + idxr, err := block.Index() + require.NoError(t, err) + + defer func() { + require.NoError(t, idxr.Close()) + }() + + k, v := index.AllPostingsKey() + p, err := idxr.Postings(k, v) + require.NoError(t, err) + + var lbls labels.Labels + for p.Next() { + ref := p.At() + require.NoError(t, idxr.Series(ref, &lbls, nil)) + + require.Equal(t, uint64(shardIndex), lbls.Hash()%shardCount) + } + require.NoError(t, p.Err()) + } + + require.Equal(t, uint64(series), totalSeries) + }) + } + } +} + func TestCompaction_populateBlock(t *testing.T) { for _, tc := range []struct { title string @@ -496,7 +583,7 @@ func TestCompaction_populateBlock(t *testing.T) { { title: "Populate block from empty input should return error.", inputSeriesSamples: [][]seriesSamples{}, - expErr: errors.New("cannot populate block from no readers"), + expErr: errors.New("cannot populate block(s) from no readers"), }, { // Populate from single block without chunks. We expect these kind of series being ignored. @@ -952,7 +1039,8 @@ func TestCompaction_populateBlock(t *testing.T) { } iw := &mockIndexWriter{} - err = c.populateBlock(blocks, meta, iw, nopChunkWriter{}) + ob := shardedBlock{meta: meta, indexw: iw, chunkw: nopChunkWriter{}} + err = c.populateBlock(blocks, meta.MinTime, meta.MaxTime, []shardedBlock{ob}) if tc.expErr != nil { require.Error(t, err) require.Equal(t, tc.expErr.Error(), err.Error()) From 63dbb1c69a3ca7ceba59d721e849cfc7abbb93b6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Peter=20S=CC=8Ctibrany=CC=81?= Date: Mon, 27 Sep 2021 12:49:08 +0200 Subject: [PATCH 2/9] Make lint happy. --- tsdb/compact.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tsdb/compact.go b/tsdb/compact.go index c8fa5b4ce6..95e3126ed4 100644 --- a/tsdb/compact.go +++ b/tsdb/compact.go @@ -462,7 +462,7 @@ func (c *LeveledCompactor) compact(dest string, dirs []string, open []*Block, sh uids = append(uids, meta.ULID.String()) } - outBlocks := make([]shardedBlock, shardCount, shardCount) + outBlocks := make([]shardedBlock, shardCount) for ix := range outBlocks { uid := ulid.MustNew(ulid.Now(), rand.Reader) outBlocks[ix] = shardedBlock{ @@ -472,7 +472,7 @@ func (c *LeveledCompactor) compact(dest string, dirs []string, open []*Block, sh err = c.write(dest, outBlocks, blocks...) if err == nil { - ulids := make([]ulid.ULID, len(outBlocks), len(outBlocks)) + ulids := make([]ulid.ULID, len(outBlocks)) for ix := range outBlocks { meta := outBlocks[ix].meta From 006c2d7d555db1ef97aa4c4c57888c0889a95278 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Peter=20S=CC=8Ctibrany=CC=81?= Date: Mon, 27 Sep 2021 14:22:51 +0200 Subject: [PATCH 3/9] All output blocks will have the same timestamp. Minor updates. --- tsdb/compact.go | 25 +++++++++++-------------- tsdb/compact_test.go | 8 ++++++++ 2 files changed, 19 insertions(+), 14 deletions(-) diff --git a/tsdb/compact.go b/tsdb/compact.go index 95e3126ed4..1144d017e9 100644 --- a/tsdb/compact.go +++ b/tsdb/compact.go @@ -463,11 +463,9 @@ func (c *LeveledCompactor) compact(dest string, dirs []string, open []*Block, sh } outBlocks := make([]shardedBlock, shardCount) + outBlocksTime := ulid.Now() // Make all out blocks share the same timestamp in the ULID. for ix := range outBlocks { - uid := ulid.MustNew(ulid.Now(), rand.Reader) - outBlocks[ix] = shardedBlock{ - meta: CompactBlockMetas(uid, metas...), - } + outBlocks[ix] = shardedBlock{meta: CompactBlockMetas(ulid.MustNew(outBlocksTime, rand.Reader), metas...)} } err = c.write(dest, outBlocks, blocks...) @@ -477,6 +475,8 @@ func (c *LeveledCompactor) compact(dest string, dirs []string, open []*Block, sh for ix := range outBlocks { meta := outBlocks[ix].meta + ulids[ix] = outBlocks[ix].meta.ULID + if meta.Stats.NumSamples == 0 { for _, b := range bs { b.meta.Compaction.Deletable = true @@ -490,7 +490,7 @@ func (c *LeveledCompactor) compact(dest string, dirs []string, open []*Block, sh } b.numBytesMeta = n } - outBlocks[ix].meta.ULID = ulid.ULID{} + ulids[ix] = ulid.ULID{} level.Info(c.logger).Log( "msg", "compact blocks resulted in empty block", "count", len(blocks), @@ -510,8 +510,6 @@ func (c *LeveledCompactor) compact(dest string, dirs []string, open []*Block, sh "shard", fmt.Sprintf("%d_of_%d", ix, shardCount), ) } - - ulids[ix] = outBlocks[ix].meta.ULID } return ulids, nil } @@ -599,12 +597,12 @@ func (c *LeveledCompactor) write(dest string, outBlocks []shardedBlock, blocks . err = tsdb_errors.NewMulti(err, tsdb_errors.CloseAll(closers)).Err() // RemoveAll returns no error when tmp doesn't exist so it is safe to always run it. - for ix := 0; ix < len(outBlocks); ix++ { - if outBlocks[ix].tmpDir == "" { + for _, ob := range outBlocks { + if ob.tmpDir == "" { continue } - if err := os.RemoveAll(outBlocks[ix].tmpDir); err != nil { + if err := os.RemoveAll(ob.tmpDir); err != nil { level.Error(c.logger).Log("msg", "removed tmp folder after failed compaction", "err", err.Error()) } } @@ -679,8 +677,7 @@ func (c *LeveledCompactor) write(dest string, outBlocks []shardedBlock, blocks . return errs.Err() } - for ix := range outBlocks { - ob := outBlocks[ix] + for _, ob := range outBlocks { // Populated block is empty, don't write meta file for it. if ob.meta.Stats.NumSamples == 0 { continue @@ -804,8 +801,8 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, minT, maxT int64, } for symbols.Next() { - for ix := range outBlocks { - if err := outBlocks[ix].indexw.AddSymbol(symbols.At()); err != nil { + for _, ob := range outBlocks { + if err := ob.indexw.AddSymbol(symbols.At()); err != nil { return errors.Wrap(err, "add symbol") } } diff --git a/tsdb/compact_test.go b/tsdb/compact_test.go index 3a6a1c8f11..87270ffa1a 100644 --- a/tsdb/compact_test.go +++ b/tsdb/compact_test.go @@ -529,12 +529,20 @@ func TestCompaction_CompactWithSplitting(t *testing.T) { // 2) Verify that total number of series over all blocks is correct. totalSeries := uint64(0) + ts := uint64(0) for shardIndex, blockID := range blockIDs { // Some blocks may be empty, they will have zero block ID. if blockID == (ulid.ULID{}) { continue } + // All blocks have the same timestamp. + if ts == 0 { + ts = blockID.Time() + } else { + require.Equal(t, ts, blockID.Time()) + } + block, err := OpenBlock(log.NewNopLogger(), filepath.Join(dir, blockID.String()), nil) require.NoError(t, err) From 336f4260db34f9c7a307c9502cc16997b24a1704 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Peter=20S=CC=8Ctibrany=CC=81?= Date: Mon, 27 Sep 2021 14:34:04 +0200 Subject: [PATCH 4/9] Removed commented code. --- tsdb/compact.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tsdb/compact.go b/tsdb/compact.go index 1144d017e9..7db7f52846 100644 --- a/tsdb/compact.go +++ b/tsdb/compact.go @@ -759,7 +759,7 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, minT, maxT int64, if i > 0 && b.Meta().MinTime < globalMaxt { c.metrics.overlappingBlocks.Inc() overlapping = true - level.Info(c.logger).Log("msg", "Found overlapping blocks during compaction" /* "ulid", meta.ULID */) + level.Info(c.logger).Log("msg", "Found overlapping blocks during compaction") } if b.Meta().MaxTime > globalMaxt { globalMaxt = b.Meta().MaxTime From 58dab3de8a62c85748e7a8751e3017e47dd1bd05 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Peter=20S=CC=8Ctibrany=CC=81?= Date: Mon, 27 Sep 2021 16:24:46 +0200 Subject: [PATCH 5/9] Source blocks are deletable only if they are ALL empty. --- tsdb/compact.go | 35 ++++++++++++++---------- tsdb/compact_test.go | 65 ++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 85 insertions(+), 15 deletions(-) diff --git a/tsdb/compact.go b/tsdb/compact.go index 7db7f52846..d5952bec25 100644 --- a/tsdb/compact.go +++ b/tsdb/compact.go @@ -471,26 +471,12 @@ func (c *LeveledCompactor) compact(dest string, dirs []string, open []*Block, sh err = c.write(dest, outBlocks, blocks...) if err == nil { ulids := make([]ulid.ULID, len(outBlocks)) + allOutputBlocksAreEmpty := true for ix := range outBlocks { meta := outBlocks[ix].meta - ulids[ix] = outBlocks[ix].meta.ULID - if meta.Stats.NumSamples == 0 { - for _, b := range bs { - b.meta.Compaction.Deletable = true - n, err := writeMetaFile(c.logger, b.dir, &b.meta) - if err != nil { - level.Error(c.logger).Log( - "msg", "Failed to write 'Deletable' to meta file after compaction", - "ulid", b.meta.ULID, - "shard", fmt.Sprintf("%d_of_%d", ix, shardCount), - ) - } - b.numBytesMeta = n - } - ulids[ix] = ulid.ULID{} level.Info(c.logger).Log( "msg", "compact blocks resulted in empty block", "count", len(blocks), @@ -499,6 +485,9 @@ func (c *LeveledCompactor) compact(dest string, dirs []string, open []*Block, sh "shard", fmt.Sprintf("%d_of_%d", ix, shardCount), ) } else { + allOutputBlocksAreEmpty = false + ulids[ix] = outBlocks[ix].meta.ULID + level.Info(c.logger).Log( "msg", "compact blocks", "count", len(blocks), @@ -511,6 +500,22 @@ func (c *LeveledCompactor) compact(dest string, dirs []string, open []*Block, sh ) } } + + if allOutputBlocksAreEmpty { + // Mark source blocks as deletable. + for _, b := range bs { + b.meta.Compaction.Deletable = true + n, err := writeMetaFile(c.logger, b.dir, &b.meta) + if err != nil { + level.Error(c.logger).Log( + "msg", "Failed to write 'Deletable' to meta file after compaction", + "ulid", b.meta.ULID, + ) + } + b.numBytesMeta = n + } + } + return ulids, nil } diff --git a/tsdb/compact_test.go b/tsdb/compact_test.go index 87270ffa1a..11171a69e3 100644 --- a/tsdb/compact_test.go +++ b/tsdb/compact_test.go @@ -15,6 +15,7 @@ package tsdb import ( "context" + "crypto/rand" "fmt" "io/ioutil" "math" @@ -574,11 +575,75 @@ func TestCompaction_CompactWithSplitting(t *testing.T) { } require.Equal(t, uint64(series), totalSeries) + + // Source blocks are *not* deletable. + for _, b := range openBlocks { + require.False(t, b.meta.Compaction.Deletable) + } }) } } } +func TestCompaction_CompactEmptyBlocks(t *testing.T) { + dir, err := ioutil.TempDir("", "compact") + require.NoError(t, err) + defer func() { + require.NoError(t, os.RemoveAll(dir)) + }() + + ranges := [][2]int64{{0, 5000}, {3000, 8000}, {6000, 11000}, {9000, 14000}} + + // Generate blocks. + var blockDirs []string + + for _, r := range ranges { + // Generate blocks using index and chunk writer. CreateBlock would not return valid block for 0 series. + id := ulid.MustNew(ulid.Now(), rand.Reader) + m := &BlockMeta{ + ULID: id, + MinTime: r[0], + MaxTime: r[1], + Compaction: BlockMetaCompaction{Level: 1, Sources: []ulid.ULID{id}}, + Version: metaVersion1, + } + + bdir := filepath.Join(dir, id.String()) + require.NoError(t, os.Mkdir(bdir, 0777)) + require.NoError(t, os.Mkdir(chunkDir(bdir), 0777)) + + _, err := writeMetaFile(log.NewNopLogger(), bdir, m) + require.NoError(t, err) + + iw, err := index.NewWriter(context.Background(), filepath.Join(bdir, indexFilename)) + require.NoError(t, err) + + require.NoError(t, iw.AddSymbol("hello")) + require.NoError(t, iw.AddSymbol("world")) + require.NoError(t, iw.Close()) + + blockDirs = append(blockDirs, bdir) + } + + c, err := NewLeveledCompactor(context.Background(), nil, log.NewNopLogger(), []int64{0}, nil, nil) + require.NoError(t, err) + + blockIDs, err := c.CompactWithSplitting(dir, blockDirs, nil, 5) + require.NoError(t, err) + + // There are no output blocks. + for _, b := range blockIDs { + require.Equal(t, ulid.ULID{}, b) + } + + // All source blocks are now marked for deletion. + for _, b := range blockDirs { + meta, _, err := readMetaFile(b) + require.NoError(t, err) + require.True(t, meta.Compaction.Deletable) + } +} + func TestCompaction_populateBlock(t *testing.T) { for _, tc := range []struct { title string From ffd281ab9de234592634fafc5f762ccd59269862 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Peter=20S=CC=8Ctibrany=CC=81?= Date: Mon, 27 Sep 2021 16:33:43 +0200 Subject: [PATCH 6/9] Address feedback. --- tsdb/compact.go | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/tsdb/compact.go b/tsdb/compact.go index d5952bec25..aea37fe95b 100644 --- a/tsdb/compact.go +++ b/tsdb/compact.go @@ -601,12 +601,12 @@ func (c *LeveledCompactor) write(dest string, outBlocks []shardedBlock, blocks . defer func(t time.Time) { err = tsdb_errors.NewMulti(err, tsdb_errors.CloseAll(closers)).Err() - // RemoveAll returns no error when tmp doesn't exist so it is safe to always run it. for _, ob := range outBlocks { if ob.tmpDir == "" { continue } + // RemoveAll returns no error when tmp doesn't exist so it is safe to always run it. if err := os.RemoveAll(ob.tmpDir); err != nil { level.Error(c.logger).Log("msg", "removed tmp folder after failed compaction", "err", err.Error()) } @@ -661,12 +661,15 @@ func (c *LeveledCompactor) write(dest string, outBlocks []shardedBlock, blocks . outBlocks[ix].indexw = indexw } + // We use MinTime and MaxTime from first output block, because ALL output blocks have the same min/max times set. if err := c.populateBlock(blocks, outBlocks[0].meta.MinTime, outBlocks[0].meta.MaxTime, outBlocks); err != nil { return errors.Wrap(err, "populate block") } - if err := c.ctx.Err(); err != nil { - return err + select { + case <-c.ctx.Done(): + return c.ctx.Err() + default: } // We are explicitly closing them here to check for error even @@ -703,7 +706,7 @@ func (c *LeveledCompactor) write(dest string, outBlocks []shardedBlock, blocks . } defer func() { if df != nil { - _ = df.Close() + df.Close() } }() From 861f9083d864f29344d010c26c458e58eab5c8a9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Peter=20S=CC=8Ctibrany=CC=81?= Date: Mon, 27 Sep 2021 17:05:14 +0200 Subject: [PATCH 7/9] Fix directory cleanup in case of compaction failure. --- tsdb/compact.go | 18 +++++++++++++----- tsdb/compact_test.go | 21 ++++++++++++++++++--- 2 files changed, 31 insertions(+), 8 deletions(-) diff --git a/tsdb/compact.go b/tsdb/compact.go index aea37fe95b..042191875d 100644 --- a/tsdb/compact.go +++ b/tsdb/compact.go @@ -602,13 +602,21 @@ func (c *LeveledCompactor) write(dest string, outBlocks []shardedBlock, blocks . err = tsdb_errors.NewMulti(err, tsdb_errors.CloseAll(closers)).Err() for _, ob := range outBlocks { - if ob.tmpDir == "" { - continue + if ob.tmpDir != "" { + // RemoveAll returns no error when tmp doesn't exist so it is safe to always run it. + if removeErr := os.RemoveAll(ob.tmpDir); removeErr != nil { + level.Error(c.logger).Log("msg", "removed tmp folder after failed compaction", "dir", ob.tmpDir, "err", removeErr.Error()) + } } - // RemoveAll returns no error when tmp doesn't exist so it is safe to always run it. - if err := os.RemoveAll(ob.tmpDir); err != nil { - level.Error(c.logger).Log("msg", "removed tmp folder after failed compaction", "err", err.Error()) + // If there was any error, and we have multiple output blocks, some blocks may have been generated, or at + // least have existing blockDir. In such case, we want to remove them. + // BlockDir may also not be set yet, if preparation for some previous blocks have failed. + if err != nil && ob.blockDir != "" { + // RemoveAll returns no error when tmp doesn't exist so it is safe to always run it. + if removeErr := os.RemoveAll(ob.blockDir); removeErr != nil { + level.Error(c.logger).Log("msg", "removed block folder after failed compaction", "dir", ob.blockDir, "err", removeErr.Error()) + } } } c.metrics.ran.Inc() diff --git a/tsdb/compact_test.go b/tsdb/compact_test.go index 11171a69e3..b24f0b1bd4 100644 --- a/tsdb/compact_test.go +++ b/tsdb/compact_test.go @@ -443,9 +443,24 @@ func TestCompactionFailWillCleanUpTempDir(t *testing.T) { require.NoError(t, os.RemoveAll(tmpdir)) }() - require.Error(t, compactor.write(tmpdir, []shardedBlock{{meta: &BlockMeta{}}}, erringBReader{})) - _, err = os.Stat(filepath.Join(tmpdir, BlockMeta{}.ULID.String()) + tmpForCreationBlockDirSuffix) - require.True(t, os.IsNotExist(err), "directory is not cleaned up") + shardedBlocks := []shardedBlock{ + {meta: &BlockMeta{ULID: ulid.MustNew(ulid.Now(), rand.Reader)}}, + {meta: &BlockMeta{ULID: ulid.MustNew(ulid.Now(), rand.Reader)}}, + {meta: &BlockMeta{ULID: ulid.MustNew(ulid.Now(), rand.Reader)}}, + } + + require.Error(t, compactor.write(tmpdir, shardedBlocks, erringBReader{})) + + // We rely on the fact that blockDir and tmpDir will be updated by compactor.write. + for _, b := range shardedBlocks { + require.NotEmpty(t, b.tmpDir) + _, err = os.Stat(b.tmpDir) + require.True(t, os.IsNotExist(err), "tmp directory is not cleaned up") + + require.NotEmpty(t, b.blockDir) + _, err = os.Stat(b.blockDir) + require.True(t, os.IsNotExist(err), "block directory is not cleaned up") + } } func metaRange(name string, mint, maxt int64, stats *BlockStats) dirMeta { From db7fa7621ca058d0b3cdeedc912a8fbb3a39cd50 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Peter=20S=CC=8Ctibrany=CC=81?= Date: Tue, 28 Sep 2021 10:13:48 +0200 Subject: [PATCH 8/9] Use _of_ formatting for better readibility. --- tsdb/compact.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tsdb/compact.go b/tsdb/compact.go index 042191875d..79ec8f60f7 100644 --- a/tsdb/compact.go +++ b/tsdb/compact.go @@ -482,7 +482,7 @@ func (c *LeveledCompactor) compact(dest string, dirs []string, open []*Block, sh "count", len(blocks), "sources", fmt.Sprintf("%v", uids), "duration", time.Since(start), - "shard", fmt.Sprintf("%d_of_%d", ix, shardCount), + "shard", fmt.Sprintf("%d_of_%d", ix+1, shardCount), ) } else { allOutputBlocksAreEmpty = false @@ -496,7 +496,7 @@ func (c *LeveledCompactor) compact(dest string, dirs []string, open []*Block, sh "ulid", meta.ULID, "sources", fmt.Sprintf("%v", uids), "duration", time.Since(start), - "shard", fmt.Sprintf("%d_of_%d", ix, shardCount), + "shard", fmt.Sprintf("%d_of_%d", ix+1, shardCount), ) } } From 57daf79192485494cb49b04c5dab5265ece3ed4e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Peter=20S=CC=8Ctibrany=CC=81?= Date: Tue, 28 Sep 2021 10:29:54 +0200 Subject: [PATCH 9/9] More review feedback. --- tsdb/compact.go | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/tsdb/compact.go b/tsdb/compact.go index 79ec8f60f7..167378bd75 100644 --- a/tsdb/compact.go +++ b/tsdb/compact.go @@ -605,7 +605,7 @@ func (c *LeveledCompactor) write(dest string, outBlocks []shardedBlock, blocks . if ob.tmpDir != "" { // RemoveAll returns no error when tmp doesn't exist so it is safe to always run it. if removeErr := os.RemoveAll(ob.tmpDir); removeErr != nil { - level.Error(c.logger).Log("msg", "removed tmp folder after failed compaction", "dir", ob.tmpDir, "err", removeErr.Error()) + level.Error(c.logger).Log("msg", "Failed to remove temp folder after failed compaction", "dir", ob.tmpDir, "err", removeErr.Error()) } } @@ -615,7 +615,7 @@ func (c *LeveledCompactor) write(dest string, outBlocks []shardedBlock, blocks . if err != nil && ob.blockDir != "" { // RemoveAll returns no error when tmp doesn't exist so it is safe to always run it. if removeErr := os.RemoveAll(ob.blockDir); removeErr != nil { - level.Error(c.logger).Log("msg", "removed block folder after failed compaction", "dir", ob.blockDir, "err", removeErr.Error()) + level.Error(c.logger).Log("msg", "Failed to remove block folder after failed compaction", "dir", ob.blockDir, "err", removeErr.Error()) } } } @@ -828,7 +828,7 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, minT, maxT int64, } var ( - ref = uint64(0) + refs = make([]uint64, len(outBlocks)) chks []chunks.Meta ) @@ -864,22 +864,22 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, minT, maxT int64, continue } - ob := outBlocks[0] + obIx := uint64(0) if len(outBlocks) > 1 { - ob = outBlocks[s.Labels().Hash()%uint64(len(outBlocks))] + obIx = s.Labels().Hash() % uint64(len(outBlocks)) } - if err := ob.chunkw.WriteChunks(chks...); err != nil { + if err := outBlocks[obIx].chunkw.WriteChunks(chks...); err != nil { return errors.Wrap(err, "write chunks") } - if err := ob.indexw.AddSeries(ref, s.Labels(), chks...); err != nil { + if err := outBlocks[obIx].indexw.AddSeries(refs[obIx], s.Labels(), chks...); err != nil { return errors.Wrap(err, "add series") } - ob.meta.Stats.NumChunks += uint64(len(chks)) - ob.meta.Stats.NumSeries++ + outBlocks[obIx].meta.Stats.NumChunks += uint64(len(chks)) + outBlocks[obIx].meta.Stats.NumSeries++ for _, chk := range chks { - ob.meta.Stats.NumSamples += uint64(chk.Chunk.NumSamples()) + outBlocks[obIx].meta.Stats.NumSamples += uint64(chk.Chunk.NumSamples()) } for _, chk := range chks { @@ -887,7 +887,7 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, minT, maxT int64, return errors.Wrap(err, "put chunk") } } - ref++ + refs[obIx]++ } if set.Err() != nil { return errors.Wrap(set.Err(), "iterate compaction set")