mirror of
https://github.com/prometheus/prometheus.git
synced 2025-03-05 20:59:13 -08:00
Source blocks are deletable only if they are ALL empty.
This commit is contained in:
parent
336f4260db
commit
58dab3de8a
|
@ -471,26 +471,12 @@ func (c *LeveledCompactor) compact(dest string, dirs []string, open []*Block, sh
|
||||||
err = c.write(dest, outBlocks, blocks...)
|
err = c.write(dest, outBlocks, blocks...)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
ulids := make([]ulid.ULID, len(outBlocks))
|
ulids := make([]ulid.ULID, len(outBlocks))
|
||||||
|
allOutputBlocksAreEmpty := true
|
||||||
|
|
||||||
for ix := range outBlocks {
|
for ix := range outBlocks {
|
||||||
meta := outBlocks[ix].meta
|
meta := outBlocks[ix].meta
|
||||||
|
|
||||||
ulids[ix] = outBlocks[ix].meta.ULID
|
|
||||||
|
|
||||||
if meta.Stats.NumSamples == 0 {
|
if meta.Stats.NumSamples == 0 {
|
||||||
for _, b := range bs {
|
|
||||||
b.meta.Compaction.Deletable = true
|
|
||||||
n, err := writeMetaFile(c.logger, b.dir, &b.meta)
|
|
||||||
if err != nil {
|
|
||||||
level.Error(c.logger).Log(
|
|
||||||
"msg", "Failed to write 'Deletable' to meta file after compaction",
|
|
||||||
"ulid", b.meta.ULID,
|
|
||||||
"shard", fmt.Sprintf("%d_of_%d", ix, shardCount),
|
|
||||||
)
|
|
||||||
}
|
|
||||||
b.numBytesMeta = n
|
|
||||||
}
|
|
||||||
ulids[ix] = ulid.ULID{}
|
|
||||||
level.Info(c.logger).Log(
|
level.Info(c.logger).Log(
|
||||||
"msg", "compact blocks resulted in empty block",
|
"msg", "compact blocks resulted in empty block",
|
||||||
"count", len(blocks),
|
"count", len(blocks),
|
||||||
|
@ -499,6 +485,9 @@ func (c *LeveledCompactor) compact(dest string, dirs []string, open []*Block, sh
|
||||||
"shard", fmt.Sprintf("%d_of_%d", ix, shardCount),
|
"shard", fmt.Sprintf("%d_of_%d", ix, shardCount),
|
||||||
)
|
)
|
||||||
} else {
|
} else {
|
||||||
|
allOutputBlocksAreEmpty = false
|
||||||
|
ulids[ix] = outBlocks[ix].meta.ULID
|
||||||
|
|
||||||
level.Info(c.logger).Log(
|
level.Info(c.logger).Log(
|
||||||
"msg", "compact blocks",
|
"msg", "compact blocks",
|
||||||
"count", len(blocks),
|
"count", len(blocks),
|
||||||
|
@ -511,6 +500,22 @@ func (c *LeveledCompactor) compact(dest string, dirs []string, open []*Block, sh
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if allOutputBlocksAreEmpty {
|
||||||
|
// Mark source blocks as deletable.
|
||||||
|
for _, b := range bs {
|
||||||
|
b.meta.Compaction.Deletable = true
|
||||||
|
n, err := writeMetaFile(c.logger, b.dir, &b.meta)
|
||||||
|
if err != nil {
|
||||||
|
level.Error(c.logger).Log(
|
||||||
|
"msg", "Failed to write 'Deletable' to meta file after compaction",
|
||||||
|
"ulid", b.meta.ULID,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
b.numBytesMeta = n
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
return ulids, nil
|
return ulids, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -15,6 +15,7 @@ package tsdb
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"crypto/rand"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"math"
|
"math"
|
||||||
|
@ -574,11 +575,75 @@ func TestCompaction_CompactWithSplitting(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
require.Equal(t, uint64(series), totalSeries)
|
require.Equal(t, uint64(series), totalSeries)
|
||||||
|
|
||||||
|
// Source blocks are *not* deletable.
|
||||||
|
for _, b := range openBlocks {
|
||||||
|
require.False(t, b.meta.Compaction.Deletable)
|
||||||
|
}
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestCompaction_CompactEmptyBlocks(t *testing.T) {
|
||||||
|
dir, err := ioutil.TempDir("", "compact")
|
||||||
|
require.NoError(t, err)
|
||||||
|
defer func() {
|
||||||
|
require.NoError(t, os.RemoveAll(dir))
|
||||||
|
}()
|
||||||
|
|
||||||
|
ranges := [][2]int64{{0, 5000}, {3000, 8000}, {6000, 11000}, {9000, 14000}}
|
||||||
|
|
||||||
|
// Generate blocks.
|
||||||
|
var blockDirs []string
|
||||||
|
|
||||||
|
for _, r := range ranges {
|
||||||
|
// Generate blocks using index and chunk writer. CreateBlock would not return valid block for 0 series.
|
||||||
|
id := ulid.MustNew(ulid.Now(), rand.Reader)
|
||||||
|
m := &BlockMeta{
|
||||||
|
ULID: id,
|
||||||
|
MinTime: r[0],
|
||||||
|
MaxTime: r[1],
|
||||||
|
Compaction: BlockMetaCompaction{Level: 1, Sources: []ulid.ULID{id}},
|
||||||
|
Version: metaVersion1,
|
||||||
|
}
|
||||||
|
|
||||||
|
bdir := filepath.Join(dir, id.String())
|
||||||
|
require.NoError(t, os.Mkdir(bdir, 0777))
|
||||||
|
require.NoError(t, os.Mkdir(chunkDir(bdir), 0777))
|
||||||
|
|
||||||
|
_, err := writeMetaFile(log.NewNopLogger(), bdir, m)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
iw, err := index.NewWriter(context.Background(), filepath.Join(bdir, indexFilename))
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
require.NoError(t, iw.AddSymbol("hello"))
|
||||||
|
require.NoError(t, iw.AddSymbol("world"))
|
||||||
|
require.NoError(t, iw.Close())
|
||||||
|
|
||||||
|
blockDirs = append(blockDirs, bdir)
|
||||||
|
}
|
||||||
|
|
||||||
|
c, err := NewLeveledCompactor(context.Background(), nil, log.NewNopLogger(), []int64{0}, nil, nil)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
blockIDs, err := c.CompactWithSplitting(dir, blockDirs, nil, 5)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
// There are no output blocks.
|
||||||
|
for _, b := range blockIDs {
|
||||||
|
require.Equal(t, ulid.ULID{}, b)
|
||||||
|
}
|
||||||
|
|
||||||
|
// All source blocks are now marked for deletion.
|
||||||
|
for _, b := range blockDirs {
|
||||||
|
meta, _, err := readMetaFile(b)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.True(t, meta.Compaction.Deletable)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestCompaction_populateBlock(t *testing.T) {
|
func TestCompaction_populateBlock(t *testing.T) {
|
||||||
for _, tc := range []struct {
|
for _, tc := range []struct {
|
||||||
title string
|
title string
|
||||||
|
|
Loading…
Reference in a new issue