diff --git a/tsdb/compact.go b/tsdb/compact.go index 7db7f5284..d5952bec2 100644 --- a/tsdb/compact.go +++ b/tsdb/compact.go @@ -471,26 +471,12 @@ func (c *LeveledCompactor) compact(dest string, dirs []string, open []*Block, sh err = c.write(dest, outBlocks, blocks...) if err == nil { ulids := make([]ulid.ULID, len(outBlocks)) + allOutputBlocksAreEmpty := true for ix := range outBlocks { meta := outBlocks[ix].meta - ulids[ix] = outBlocks[ix].meta.ULID - if meta.Stats.NumSamples == 0 { - for _, b := range bs { - b.meta.Compaction.Deletable = true - n, err := writeMetaFile(c.logger, b.dir, &b.meta) - if err != nil { - level.Error(c.logger).Log( - "msg", "Failed to write 'Deletable' to meta file after compaction", - "ulid", b.meta.ULID, - "shard", fmt.Sprintf("%d_of_%d", ix, shardCount), - ) - } - b.numBytesMeta = n - } - ulids[ix] = ulid.ULID{} level.Info(c.logger).Log( "msg", "compact blocks resulted in empty block", "count", len(blocks), @@ -499,6 +485,9 @@ func (c *LeveledCompactor) compact(dest string, dirs []string, open []*Block, sh "shard", fmt.Sprintf("%d_of_%d", ix, shardCount), ) } else { + allOutputBlocksAreEmpty = false + ulids[ix] = outBlocks[ix].meta.ULID + level.Info(c.logger).Log( "msg", "compact blocks", "count", len(blocks), @@ -511,6 +500,22 @@ func (c *LeveledCompactor) compact(dest string, dirs []string, open []*Block, sh ) } } + + if allOutputBlocksAreEmpty { + // Mark source blocks as deletable. + for _, b := range bs { + b.meta.Compaction.Deletable = true + n, err := writeMetaFile(c.logger, b.dir, &b.meta) + if err != nil { + level.Error(c.logger).Log( + "msg", "Failed to write 'Deletable' to meta file after compaction", + "ulid", b.meta.ULID, + ) + } + b.numBytesMeta = n + } + } + return ulids, nil } diff --git a/tsdb/compact_test.go b/tsdb/compact_test.go index 87270ffa1..11171a69e 100644 --- a/tsdb/compact_test.go +++ b/tsdb/compact_test.go @@ -15,6 +15,7 @@ package tsdb import ( "context" + "crypto/rand" "fmt" "io/ioutil" "math" @@ -574,11 +575,75 @@ func TestCompaction_CompactWithSplitting(t *testing.T) { } require.Equal(t, uint64(series), totalSeries) + + // Source blocks are *not* deletable. + for _, b := range openBlocks { + require.False(t, b.meta.Compaction.Deletable) + } }) } } } +func TestCompaction_CompactEmptyBlocks(t *testing.T) { + dir, err := ioutil.TempDir("", "compact") + require.NoError(t, err) + defer func() { + require.NoError(t, os.RemoveAll(dir)) + }() + + ranges := [][2]int64{{0, 5000}, {3000, 8000}, {6000, 11000}, {9000, 14000}} + + // Generate blocks. + var blockDirs []string + + for _, r := range ranges { + // Generate blocks using index and chunk writer. CreateBlock would not return valid block for 0 series. + id := ulid.MustNew(ulid.Now(), rand.Reader) + m := &BlockMeta{ + ULID: id, + MinTime: r[0], + MaxTime: r[1], + Compaction: BlockMetaCompaction{Level: 1, Sources: []ulid.ULID{id}}, + Version: metaVersion1, + } + + bdir := filepath.Join(dir, id.String()) + require.NoError(t, os.Mkdir(bdir, 0777)) + require.NoError(t, os.Mkdir(chunkDir(bdir), 0777)) + + _, err := writeMetaFile(log.NewNopLogger(), bdir, m) + require.NoError(t, err) + + iw, err := index.NewWriter(context.Background(), filepath.Join(bdir, indexFilename)) + require.NoError(t, err) + + require.NoError(t, iw.AddSymbol("hello")) + require.NoError(t, iw.AddSymbol("world")) + require.NoError(t, iw.Close()) + + blockDirs = append(blockDirs, bdir) + } + + c, err := NewLeveledCompactor(context.Background(), nil, log.NewNopLogger(), []int64{0}, nil, nil) + require.NoError(t, err) + + blockIDs, err := c.CompactWithSplitting(dir, blockDirs, nil, 5) + require.NoError(t, err) + + // There are no output blocks. + for _, b := range blockIDs { + require.Equal(t, ulid.ULID{}, b) + } + + // All source blocks are now marked for deletion. + for _, b := range blockDirs { + meta, _, err := readMetaFile(b) + require.NoError(t, err) + require.True(t, meta.Compaction.Deletable) + } +} + func TestCompaction_populateBlock(t *testing.T) { for _, tc := range []struct { title string