Compactor: Open blocks concurrently (#67)

* Open blocks concurrently.
This commit is contained in:
Peter Štibraný 2021-12-02 12:42:29 +01:00 committed by GitHub
parent 2fe806f93f
commit c31dd6c8b5
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 233 additions and 52 deletions

View file

@ -22,6 +22,7 @@ func main() {
segmentSizeMB int64
maxClosingBlocks int
symbolFlushers int
openConcurrency int
)
flag.StringVar(&outputDir, "output-dir", ".", "Output directory for new block(s)")
@ -30,13 +31,13 @@ func main() {
flag.Int64Var(&segmentSizeMB, "segment-file-size", 512, "Size of segment file")
flag.IntVar(&maxClosingBlocks, "max-closing-blocks", 2, "Number of blocks that can close at once during split compaction")
flag.IntVar(&symbolFlushers, "symbol-flushers", 4, "Number of symbol flushers used during split compaction")
flag.IntVar(&openConcurrency, "open-concurrency", 4, "Number of goroutines used when opening blocks")
flag.Parse()
logger := golog.NewLogfmtLogger(os.Stderr)
var blockDirs []string
var blocks []*tsdb.Block
for _, d := range flag.Args() {
s, err := os.Stat(d)
if err != nil {
@ -45,16 +46,7 @@ func main() {
if !s.IsDir() {
log.Fatalln("not a directory: ", d)
}
blockDirs = append(blockDirs, d)
b, err := tsdb.OpenBlock(logger, d, nil)
if err != nil {
log.Fatalln("failed to open block:", d, err)
}
blocks = append(blocks, b)
defer b.Close()
}
if len(blockDirs) == 0 {
@ -84,12 +76,13 @@ func main() {
log.Fatalln("creating compator", err)
}
opts := tsdb.DefaultConcurrencyOptions()
opts := tsdb.DefaultLeveledCompactorConcurrencyOptions()
opts.MaxClosingBlocks = maxClosingBlocks
opts.SymbolsFlushersCount = symbolFlushers
opts.MaxOpeningBlocks = openConcurrency
c.SetConcurrencyOptions(opts)
_, err = c.CompactWithSplitting(outputDir, blockDirs, blocks, uint64(shardCount))
_, err = c.CompactWithSplitting(outputDir, blockDirs, nil, uint64(shardCount))
if err != nil {
log.Fatalln("compacting", err)
}

View file

@ -22,6 +22,7 @@ import (
"os"
"path/filepath"
"sort"
"sync"
"time"
"github.com/go-kit/log"
@ -29,6 +30,7 @@ import (
"github.com/oklog/ulid"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/atomic"
"golang.org/x/sync/semaphore"
"github.com/prometheus/prometheus/model/labels"
@ -86,7 +88,7 @@ type LeveledCompactor struct {
maxBlockChunkSegmentSize int64
mergeFunc storage.VerticalChunkSeriesMergeFunc
concurrencyOpts ConcurrencyOptions
concurrencyOpts LeveledCompactorConcurrencyOptions
}
type compactorMetrics struct {
@ -175,24 +177,26 @@ func NewLeveledCompactorWithChunkSize(ctx context.Context, r prometheus.Register
ctx: ctx,
maxBlockChunkSegmentSize: maxBlockChunkSegmentSize,
mergeFunc: mergeFunc,
concurrencyOpts: DefaultConcurrencyOptions(),
concurrencyOpts: DefaultLeveledCompactorConcurrencyOptions(),
}, nil
}
// ConcurrencyOptions used by LeveledCompactor.
type ConcurrencyOptions struct {
MaxClosingBlocks int // Max number of blocks that can be closed concurrently during split compaction.
// LeveledCompactorConcurrencyOptions is a collection of concurrency options used by LeveledCompactor.
type LeveledCompactorConcurrencyOptions struct {
MaxOpeningBlocks int // Number of goroutines opening blocks before compaction.
MaxClosingBlocks int // Max number of blocks that can be closed concurrently during split compaction. Note that closing of newly compacted block uses a lot of memory for writing index.
SymbolsFlushersCount int // Number of symbols flushers used when doing split compaction.
}
func DefaultConcurrencyOptions() ConcurrencyOptions {
return ConcurrencyOptions{
func DefaultLeveledCompactorConcurrencyOptions() LeveledCompactorConcurrencyOptions {
return LeveledCompactorConcurrencyOptions{
MaxClosingBlocks: 1,
SymbolsFlushersCount: 1,
MaxOpeningBlocks: 1,
}
}
func (c *LeveledCompactor) SetConcurrencyOptions(opts ConcurrencyOptions) {
func (c *LeveledCompactor) SetConcurrencyOptions(opts LeveledCompactorConcurrencyOptions) {
c.concurrencyOpts = opts
}
@ -444,44 +448,27 @@ func (c *LeveledCompactor) compact(dest string, dirs []string, open []*Block, sh
shardCount = 1
}
start := time.Now()
bs, blocksToClose, err := openBlocksForCompaction(dirs, open, c.logger, c.chunkPool, c.concurrencyOpts.MaxOpeningBlocks)
for _, b := range blocksToClose {
defer b.Close()
}
if err != nil {
return nil, err
}
var (
blocks []BlockReader
bs []*Block
metas []*BlockMeta
uids []string
)
start := time.Now()
for _, d := range dirs {
meta, _, err := readMetaFile(d)
if err != nil {
return nil, err
}
var b *Block
// Use already open blocks if we can, to avoid
// having the index data in memory twice.
for _, o := range open {
if meta.ULID == o.Meta().ULID {
b = o
break
}
}
if b == nil {
var err error
b, err = OpenBlock(c.logger, d, c.chunkPool)
if err != nil {
return nil, err
}
defer b.Close()
}
metas = append(metas, meta)
for _, b := range bs {
blocks = append(blocks, b)
bs = append(bs, b)
uids = append(uids, meta.ULID.String())
m := b.Meta()
metas = append(metas, &m)
uids = append(uids, b.meta.ULID.String())
}
outBlocks := make([]shardedBlock, shardCount)
@ -1111,3 +1098,89 @@ func (c *LeveledCompactor) populateSymbols(sets []storage.ChunkSeriesSet, outBlo
return nil
}
// Returns opened blocks, and blocks that should be closed (also returned in case of error).
func openBlocksForCompaction(dirs []string, open []*Block, logger log.Logger, pool chunkenc.Pool, concurrency int) (blocks, blocksToClose []*Block, _ error) {
blocks = make([]*Block, 0, len(dirs))
blocksToClose = make([]*Block, 0, len(dirs))
toOpenCh := make(chan string, len(dirs))
for _, d := range dirs {
meta, _, err := readMetaFile(d)
if err != nil {
return nil, blocksToClose, err
}
var b *Block
// Use already open blocks if we can, to avoid
// having the index data in memory twice.
for _, o := range open {
if meta.ULID == o.Meta().ULID {
b = o
break
}
}
if b != nil {
blocks = append(blocks, b)
} else {
toOpenCh <- d
}
}
close(toOpenCh)
type openResult struct {
b *Block
err error
}
openResultCh := make(chan openResult, len(toOpenCh))
// Signals to all opening goroutines that there was an error opening some block, and they can stop early.
// If openingError is true, at least one error is sent to openResultCh.
openingError := atomic.NewBool(false)
wg := sync.WaitGroup{}
if len(dirs) < concurrency {
concurrency = len(dirs)
}
for i := 0; i < concurrency; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for d := range toOpenCh {
if openingError.Load() {
return
}
b, err := OpenBlock(logger, d, pool)
openResultCh <- openResult{b: b, err: err}
if err != nil {
openingError.Store(true)
return
}
}
}()
}
wg.Wait()
// All writers to openResultCh have stopped, we can close the output channel, so we can range over it.
close(openResultCh)
var firstErr error
for or := range openResultCh {
if or.err != nil {
// Don't stop on error, but iterate over all opened blocks to collect blocksToClose.
if firstErr == nil {
firstErr = or.err
}
} else {
blocks = append(blocks, or.b)
blocksToClose = append(blocksToClose, or.b)
}
}
return blocks, blocksToClose, firstErr
}

View file

@ -1513,3 +1513,118 @@ func TestDeleteCompactionBlockAfterFailedReload(t *testing.T) {
})
}
}
func TestOpenBlocksForCompaction(t *testing.T) {
dir := t.TempDir()
const blocks = 5
var blockDirs []string
for ix := 0; ix < blocks; ix++ {
d := createBlock(t, dir, genSeries(100, 10, 0, 5000))
blockDirs = append(blockDirs, d)
}
// Open subset of blocks first.
const blocksToOpen = 2
opened, toClose, err := openBlocksForCompaction(blockDirs[:blocksToOpen], nil, log.NewNopLogger(), nil, 10)
for _, b := range toClose {
defer func(b *Block) { require.NoError(t, b.Close()) }(b)
}
require.NoError(t, err)
checkBlocks(t, opened, blockDirs[:blocksToOpen]...)
checkBlocks(t, toClose, blockDirs[:blocksToOpen]...)
// Open all blocks, but provide previously opened blocks.
opened2, toClose2, err := openBlocksForCompaction(blockDirs, opened, log.NewNopLogger(), nil, 10)
for _, b := range toClose2 {
defer func(b *Block) { require.NoError(t, b.Close()) }(b)
}
require.NoError(t, err)
checkBlocks(t, opened2, blockDirs...)
checkBlocks(t, toClose2, blockDirs[blocksToOpen:]...)
}
func TestOpenBlocksForCompactionErrorsNoMeta(t *testing.T) {
dir := t.TempDir()
const blocks = 5
var blockDirs []string
for ix := 0; ix < blocks; ix++ {
d := createBlock(t, dir, genSeries(100, 10, 0, 5000))
blockDirs = append(blockDirs, d)
if ix == 3 {
blockDirs = append(blockDirs, path.Join(dir, "invalid-block"))
}
}
// open block[0]
b0, err := OpenBlock(log.NewNopLogger(), blockDirs[0], nil)
require.NoError(t, err)
defer func() { require.NoError(t, b0.Close()) }()
_, toClose, err := openBlocksForCompaction(blockDirs, []*Block{b0}, log.NewNopLogger(), nil, 10)
require.Error(t, err)
// We didn't get to opening more blocks, because we found invalid dir, so there is nothing to close.
require.Empty(t, toClose)
}
func TestOpenBlocksForCompactionErrorsMissingIndex(t *testing.T) {
dir := t.TempDir()
const blocks = 5
var blockDirs []string
for ix := 0; ix < blocks; ix++ {
d := createBlock(t, dir, genSeries(100, 10, 0, 5000))
blockDirs = append(blockDirs, d)
if ix == 3 {
require.NoError(t, os.Remove(path.Join(d, indexFilename)))
}
}
// open block[1]
b1, err := OpenBlock(log.NewNopLogger(), blockDirs[1], nil)
require.NoError(t, err)
defer func() { require.NoError(t, b1.Close()) }()
// We use concurrency = 1 to simplify the test.
// Block[0] will be opened correctly.
// Block[1] is already opened.
// Block[2] will be opened correctly.
// Block[3] is invalid and will cause error.
// Block[4] will not be opened at all.
opened, toClose, err := openBlocksForCompaction(blockDirs, []*Block{b1}, log.NewNopLogger(), nil, 1)
for _, b := range toClose {
defer func(b *Block) { require.NoError(t, b.Close()) }(b)
}
require.Error(t, err)
checkBlocks(t, opened, blockDirs[0:3]...)
checkBlocks(t, toClose, blockDirs[0], blockDirs[2])
}
// Check that blocks match IDs from directories.
func checkBlocks(t *testing.T, blocks []*Block, dirs ...string) {
t.Helper()
blockIDs := map[string]struct{}{}
for _, b := range blocks {
blockIDs[b.Meta().ULID.String()] = struct{}{}
}
dirBlockIDs := map[string]struct{}{}
for _, d := range dirs {
m, _, err := readMetaFile(d)
require.NoError(t, err)
dirBlockIDs[m.ULID.String()] = struct{}{}
}
require.Equal(t, blockIDs, dirBlockIDs)
}