diff --git a/cmd/compact/main.go b/cmd/compact/main.go
new file mode 100644
index 0000000000..d3795ec3c2
--- /dev/null
+++ b/cmd/compact/main.go
@@ -0,0 +1,125 @@
+package main
+
+import (
+	"context"
+	"flag"
+	"fmt"
+	"log"
+	"os"
+	"os/signal"
+	"runtime/pprof"
+	"syscall"
+
+	golog "github.com/go-kit/log"
+
+	"github.com/prometheus/prometheus/tsdb"
+)
+
+// main runs the compaction and exits with a non-zero status if it fails.
+func main() {
+	if err := run(); err != nil {
+		log.Fatalln(err)
+	}
+}
+
+// run parses the command-line flags, opens the block directories passed as
+// positional arguments and compacts them into the output directory, splitting
+// the result into the requested number of shards.
+//
+// Failures are returned instead of being reported with log.Fatalln: that
+// function exits the process without running deferred calls, so the blocks
+// would not be closed and the CPU profile would not be stopped and flushed.
+func run() error {
+	var (
+		outputDir        string
+		shardCount       int
+		cpuProf          string
+		segmentSizeMB    int64
+		maxClosingBlocks int
+		symbolFlushers   int
+	)
+
+	flag.StringVar(&outputDir, "output-dir", ".", "Output directory for new block(s)")
+	flag.StringVar(&cpuProf, "cpuprofile", "", "Where to store CPU profile (if not empty)")
+	flag.IntVar(&shardCount, "shard-count", 1, "Number of shards for splitting")
+	flag.Int64Var(&segmentSizeMB, "segment-file-size", 512, "Size of segment file in MiB")
+	flag.IntVar(&maxClosingBlocks, "max-closing-blocks", 2, "Number of blocks that can close at once during split compaction")
+	flag.IntVar(&symbolFlushers, "symbol-flushers", 4, "Number of symbol flushers used during split compaction")
+
+	flag.Parse()
+
+	// A negative shard count would wrap around to a huge value in the uint64
+	// conversion below.
+	if shardCount < 0 {
+		return fmt.Errorf("invalid shard count: %d", shardCount)
+	}
+	// A semaphore of size zero can never be acquired and zero flushers never
+	// pick up a job, so either setting would make the compaction hang.
+	if maxClosingBlocks < 1 || symbolFlushers < 1 {
+		return fmt.Errorf("max-closing-blocks (%d) and symbol-flushers (%d) must be at least 1", maxClosingBlocks, symbolFlushers)
+	}
+
+	logger := golog.NewLogfmtLogger(os.Stderr)
+
+	var blockDirs []string
+	var blocks []*tsdb.Block
+	for _, d := range flag.Args() {
+		s, err := os.Stat(d)
+		if err != nil {
+			return err
+		}
+		if !s.IsDir() {
+			return fmt.Errorf("not a directory: %s", d)
+		}
+
+		blockDirs = append(blockDirs, d)
+
+		b, err := tsdb.OpenBlock(logger, d, nil)
+		if err != nil {
+			return fmt.Errorf("failed to open block: %s: %w", d, err)
+		}
+
+		blocks = append(blocks, b)
+		// Deferred to the end of run: the blocks are passed to the compactor below.
+		defer b.Close()
+	}
+
+	if len(blockDirs) == 0 {
+		return fmt.Errorf("no blocks to compact")
+	}
+
+	if cpuProf != "" {
+		f, err := os.Create(cpuProf)
+		if err != nil {
+			return err
+		}
+		defer f.Close()
+
+		log.Println("writing to", cpuProf)
+		if err := pprof.StartCPUProfile(f); err != nil {
+			return err
+		}
+
+		// Registered after f.Close, so it runs first and the profile is
+		// completely written before the file is closed.
+		defer pprof.StopCPUProfile()
+	}
+
+	ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
+	defer cancel()
+
+	c, err := tsdb.NewLeveledCompactorWithChunkSize(ctx, nil, logger, []int64{0}, nil, segmentSizeMB*1024*1024, nil)
+	if err != nil {
+		return fmt.Errorf("creating compactor: %w", err)
+	}
+
+	opts := tsdb.DefaultConcurrencyOptions()
+	opts.MaxClosingBlocks = maxClosingBlocks
+	opts.SymbolsFlushersCount = symbolFlushers
+	c.SetConcurrencyOptions(opts)
+
+	if _, err := c.CompactWithSplitting(outputDir, blockDirs, blocks, uint64(shardCount)); err != nil {
+		return fmt.Errorf("compacting: %w", err)
+	}
+	return nil
+}
diff --git a/tsdb/async_block_writer.go b/tsdb/async_block_writer.go
new file mode 100644
index 0000000000..47833def72
--- /dev/null
+++ b/tsdb/async_block_writer.go
@@ -0,0 +1,195 @@
+package tsdb
+
+import (
+	"context"
+	"fmt"
+
+	"github.com/pkg/errors"
+	"go.uber.org/atomic"
+	"golang.org/x/sync/semaphore"
+
+	"github.com/prometheus/prometheus/model/labels"
+	"github.com/prometheus/prometheus/storage"
+	"github.com/prometheus/prometheus/tsdb/chunkenc"
+	"github.com/prometheus/prometheus/tsdb/chunks"
+)
+
+// asyncBlockWriter runs a background goroutine that writes series and chunks to the block asynchronously.
+//
+// Series are queued with addSeries. closeAsync signals that no more series will come, and waitFinished
+// waits for the goroutine to finish and returns the block stats or the first error. The closed and
+// result fields are not synchronized, so these methods are presumably meant to be called from a
+// single goroutine.
+type asyncBlockWriter struct {
+	chunkPool chunkenc.Pool // Where to return chunks after writing.
+
+	chunkw ChunkWriter
+	indexw IndexWriter
+
+	closeSemaphore *semaphore.Weighted
+
+	seriesChan chan seriesToWrite
+	finishedCh chan asyncBlockWriterResult
+
+	closed bool
+	result asyncBlockWriterResult
+}
+
+// asyncBlockWriterResult is the outcome of loop: block stats on success, err on failure.
+type asyncBlockWriterResult struct {
+	stats BlockStats
+	err   error
+}
+
+// seriesToWrite is a single series (its labels and chunks) queued for writing.
+type seriesToWrite struct {
+	lbls labels.Labels
+	chks []chunks.Meta
+}
+
+// newAsyncBlockWriter creates the writer and starts its background goroutine, which runs until
+// closeAsync is called or a write fails. closeSema is acquired by that goroutine while it closes
+// chunkw and indexw, which limits how many writers sharing the semaphore do so at the same time.
+func newAsyncBlockWriter(chunkPool chunkenc.Pool, chunkw ChunkWriter, indexw IndexWriter, closeSema *semaphore.Weighted) *asyncBlockWriter {
+	bw := &asyncBlockWriter{
+		chunkPool:      chunkPool,
+		chunkw:         chunkw,
+		indexw:         indexw,
+		seriesChan:     make(chan seriesToWrite, 64),
+		finishedCh:     make(chan asyncBlockWriterResult, 1),
+		closeSemaphore: closeSema,
+	}
+
+	go bw.loop()
+	return bw
+}
+
+// loop doing the writes. Return value is only used by defer statement, and is sent to the channel,
+// before closing it.
+func (bw *asyncBlockWriter) loop() (res asyncBlockWriterResult) {
+	defer func() {
+		bw.finishedCh <- res
+		close(bw.finishedCh)
+	}()
+
+	stats := BlockStats{}
+	ref := storage.SeriesRef(0)
+	for sw := range bw.seriesChan {
+		if err := bw.chunkw.WriteChunks(sw.chks...); err != nil {
+			return asyncBlockWriterResult{err: errors.Wrap(err, "write chunks")}
+		}
+		if err := bw.indexw.AddSeries(ref, sw.lbls, sw.chks...); err != nil {
+			return asyncBlockWriterResult{err: errors.Wrap(err, "add series")}
+		}
+
+		stats.NumChunks += uint64(len(sw.chks))
+		stats.NumSeries++
+		for _, chk := range sw.chks {
+			stats.NumSamples += uint64(chk.Chunk.NumSamples())
+		}
+
+		for _, chk := range sw.chks {
+			if err := bw.chunkPool.Put(chk.Chunk); err != nil {
+				return asyncBlockWriterResult{err: errors.Wrap(err, "put chunk")}
+			}
+		}
+		ref++
+	}
+
+	err := bw.closeSemaphore.Acquire(context.Background(), 1)
+	if err != nil {
+		return asyncBlockWriterResult{err: errors.Wrap(err, "failed to acquire semaphore before closing writers")}
+	}
+	defer bw.closeSemaphore.Release(1)
+
+	// If everything went fine with writing so far, close writers.
+	if err := bw.chunkw.Close(); err != nil {
+		return asyncBlockWriterResult{err: errors.Wrap(err, "closing chunk writer")}
+	}
+	if err := bw.indexw.Close(); err != nil {
+		return asyncBlockWriterResult{err: errors.Wrap(err, "closing index writer")}
+	}
+
+	return asyncBlockWriterResult{stats: stats}
+}
+
+// addSeries queues the series for writing. It blocks while the queue is full.
+//
+// If the background goroutine has already stopped, addSeries returns an error. When the goroutine
+// stopped because of a failure, that failure is wrapped in the returned error, so that callers
+// which give up right away (without calling waitFinished) still report the root cause.
+//
+// addSeries must not be called after closeAsync, as sending on the closed channel would panic.
+func (bw *asyncBlockWriter) addSeries(lbls labels.Labels, chks []chunks.Meta) error {
+	select {
+	case bw.seriesChan <- seriesToWrite{lbls: lbls, chks: chks}:
+		return nil
+	case result, ok := <-bw.finishedCh:
+		if ok {
+			bw.result = result
+		}
+		if bw.result.err != nil {
+			return errors.Wrap(bw.result.err, "asyncBlockWriter doesn't run anymore")
+		}
+		return fmt.Errorf("asyncBlockWriter doesn't run anymore")
+	}
+}
+
+// closeAsync closes the series queue, so that loop finishes after writing the series queued so far.
+// It doesn't wait for loop to finish (see waitFinished) and is safe to call more than once.
+func (bw *asyncBlockWriter) closeAsync() {
+	if !bw.closed {
+		bw.closed = true
+
+		close(bw.seriesChan)
+	}
+}
+
+// waitFinished blocks until loop has finished and returns the stats of the written block, or the
+// error that stopped loop. Unless a write fails, loop only finishes after closeAsync was called.
+// The result is remembered, so waitFinished can be called repeatedly.
+func (bw *asyncBlockWriter) waitFinished() (BlockStats, error) {
+	// Wait for flusher to finish.
+	result, ok := <-bw.finishedCh
+	if ok {
+		bw.result = result
+	}
+
+	return bw.result.stats, bw.result.err
+}
+
+// preventDoubleCloseIndexWriter wraps IndexWriter and passes only the first Close call to it.
+// Following Close calls do nothing and return nil.
+type preventDoubleCloseIndexWriter struct {
+	IndexWriter
+	closed atomic.Bool
+}
+
+func newPreventDoubleCloseIndexWriter(iw IndexWriter) *preventDoubleCloseIndexWriter {
+	return &preventDoubleCloseIndexWriter{IndexWriter: iw}
+}
+
+func (p *preventDoubleCloseIndexWriter) Close() error {
+	if p.closed.CAS(false, true) {
+		return p.IndexWriter.Close()
+	}
+	return nil
+}
+
+// preventDoubleCloseChunkWriter wraps ChunkWriter and passes only the first Close call to it.
+// Following Close calls do nothing and return nil.
+type preventDoubleCloseChunkWriter struct {
+	ChunkWriter
+	closed atomic.Bool
+}
+
+func newPreventDoubleCloseChunkWriter(cw ChunkWriter) *preventDoubleCloseChunkWriter {
+	return &preventDoubleCloseChunkWriter{ChunkWriter: cw}
+}
+
+func (p *preventDoubleCloseChunkWriter) Close() error {
+	if p.closed.CAS(false, true) {
+		return p.ChunkWriter.Close()
+	}
+	return nil
+}
diff --git a/tsdb/compact.go b/tsdb/compact.go
index 07ae18e975..46877a2061 100644
--- a/tsdb/compact.go
+++ b/tsdb/compact.go
@@ -29,6 +29,7 @@ import (
 	"github.com/oklog/ulid"
 	"github.com/pkg/errors"
 	"github.com/prometheus/client_golang/prometheus"
+	"golang.org/x/sync/semaphore"
"github.com/prometheus/prometheus/model/labels"
 	"github.com/prometheus/prometheus/storage"
@@ -84,6 +85,8 @@ type LeveledCompactor struct {
 	ctx                      context.Context
 	maxBlockChunkSegmentSize int64
 	mergeFunc                storage.VerticalChunkSeriesMergeFunc
+
+	concurrencyOpts ConcurrencyOptions
 }
 
 type compactorMetrics struct {
@@ -172,9 +175,40 @@ func NewLeveledCompactorWithChunkSize(ctx context.Context, r prometheus.Register
 		ctx:                      ctx,
 		maxBlockChunkSegmentSize: maxBlockChunkSegmentSize,
 		mergeFunc:                mergeFunc,
+		concurrencyOpts:          DefaultConcurrencyOptions(),
 	}, nil
 }
 
+// ConcurrencyOptions used by LeveledCompactor.
+type ConcurrencyOptions struct {
+	MaxClosingBlocks     int // Max number of blocks that can be closed concurrently during split compaction.
+	SymbolsFlushersCount int // Number of symbols flushers used when doing split compaction.
+}
+
+// DefaultConcurrencyOptions returns options that use no concurrency: blocks are closed one at a
+// time and a single symbols flusher is used.
+func DefaultConcurrencyOptions() ConcurrencyOptions {
+	return ConcurrencyOptions{
+		MaxClosingBlocks:     1,
+		SymbolsFlushersCount: 1,
+	}
+}
+
+// SetConcurrencyOptions replaces the concurrency options used by split compaction.
+//
+// Values lower than 1 are replaced with 1. Split compaction waits on a semaphore of size
+// MaxClosingBlocks and hands symbols to SymbolsFlushersCount goroutines, so with zero (or a
+// negative value) in either field it would never finish.
+func (c *LeveledCompactor) SetConcurrencyOptions(opts ConcurrencyOptions) {
+	if opts.MaxClosingBlocks < 1 {
+		opts.MaxClosingBlocks = 1
+	}
+	if opts.SymbolsFlushersCount < 1 {
+		opts.SymbolsFlushersCount = 1
+	}
+	c.concurrencyOpts = opts
+}
+
 type dirMeta struct {
 	dir  string
 	meta *BlockMeta
@@ -646,6 +667,7 @@ func (c *LeveledCompactor) write(dest string, outBlocks []shardedBlock, blocks .
 		if err != nil {
 			return errors.Wrap(err, "open chunk writer")
 		}
+		chunkw = newPreventDoubleCloseChunkWriter(chunkw) // We now close chunkWriter in populateBlock, but keep it in the closers here as well.
 
 		closers = append(closers, chunkw)
 
@@ -661,10 +683,12 @@ func (c *LeveledCompactor) write(dest string, outBlocks []shardedBlock, blocks .
 
 		outBlocks[ix].chunkw = chunkw
 
-		indexw, err := index.NewWriter(c.ctx, filepath.Join(tmp, indexFilename))
+		var indexw IndexWriter
+		indexw, err = index.NewWriter(c.ctx, filepath.Join(tmp, indexFilename))
 		if err != nil {
 			return errors.Wrap(err, "open index writer")
 		}
+		indexw = newPreventDoubleCloseIndexWriter(indexw) // We now close indexWriter in populateBlock, but keep it in the closers here as well.
closers = append(closers, indexw) outBlocks[ix].indexw = indexw @@ -904,10 +928,14 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, minT, maxT int64, } } - var ( - refs = make([]storage.SeriesRef, len(outBlocks)) - chks []chunks.Meta - ) + // Semaphore for number of blocks that can be closed at once. + sema := semaphore.NewWeighted(int64(c.concurrencyOpts.MaxClosingBlocks)) + + blockWriters := make([]*asyncBlockWriter, len(outBlocks)) + for ix := range outBlocks { + blockWriters[ix] = newAsyncBlockWriter(c.chunkPool, outBlocks[ix].chunkw, outBlocks[ix].indexw, sema) + defer blockWriters[ix].closeAsync() // Make sure to close writer to stop goroutine. + } set := sets[0] if len(sets) > 1 { @@ -926,7 +954,7 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, minT, maxT int64, s := set.At() chksIter := s.Iterator() - chks = chks[:0] + var chks []chunks.Meta for chksIter.Next() { // We are not iterating in streaming way over chunk as it's more efficient to do bulk write for index and // chunk file purposes. 
@@ -948,30 +976,28 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, minT, maxT int64, obIx = s.Labels().Hash() % uint64(len(outBlocks)) } - if err := outBlocks[obIx].chunkw.WriteChunks(chks...); err != nil { - return errors.Wrap(err, "write chunks") + err := blockWriters[obIx].addSeries(s.Labels(), chks) + if err != nil { + return errors.Wrap(err, "adding series") } - if err := outBlocks[obIx].indexw.AddSeries(refs[obIx], s.Labels(), chks...); err != nil { - return errors.Wrap(err, "add series") - } - - outBlocks[obIx].meta.Stats.NumChunks += uint64(len(chks)) - outBlocks[obIx].meta.Stats.NumSeries++ - for _, chk := range chks { - outBlocks[obIx].meta.Stats.NumSamples += uint64(chk.Chunk.NumSamples()) - } - - for _, chk := range chks { - if err := c.chunkPool.Put(chk.Chunk); err != nil { - return errors.Wrap(err, "put chunk") - } - } - refs[obIx]++ } if set.Err() != nil { return errors.Wrap(set.Err(), "iterate compaction set") } + for ix := range blockWriters { + blockWriters[ix].closeAsync() + } + + for ix := range blockWriters { + stats, err := blockWriters[ix].waitFinished() + if err != nil { + return errors.Wrap(err, "writing block") + } + + outBlocks[ix].meta.Stats = stats + } + return nil } @@ -986,9 +1012,12 @@ func (c *LeveledCompactor) populateSymbols(sets []storage.ChunkSeriesSet, outBlo return errors.New("no output block") } + flushers := newSymbolFlushers(c.concurrencyOpts.SymbolsFlushersCount) + defer flushers.close() // Make sure to stop flushers before exiting to avoid leaking goroutines. + batchers := make([]*symbolsBatcher, len(outBlocks)) for ix := range outBlocks { - batchers[ix] = newSymbolsBatcher(inMemorySymbolsLimit, outBlocks[ix].tmpDir) + batchers[ix] = newSymbolsBatcher(inMemorySymbolsLimit, outBlocks[ix].tmpDir, flushers) // Always include empty symbol. Blocks created from Head always have it in the symbols table, // and if we only include symbols from series, we would skip it. 
@@ -1023,16 +1052,25 @@ func (c *LeveledCompactor) populateSymbols(sets []storage.ChunkSeriesSet, outBlo
 	}
 
 	for ix := range outBlocks {
-		if err := c.ctx.Err(); err != nil {
-			return err
-		}
-
 		// Flush the batcher to write remaining symbols.
 		if err := batchers[ix].flushSymbols(true); err != nil {
 			return errors.Wrap(err, "flushing batcher")
 		}
+	}
 
-		it, err := newSymbolsIterator(batchers[ix].symbolFiles())
+	err := flushers.close()
+	if err != nil {
+		return errors.Wrap(err, "closing flushers")
+	}
+
+	for ix := range outBlocks {
+		if err := c.ctx.Err(); err != nil {
+			return err
+		}
+
+		symbolFiles := batchers[ix].getSymbolFiles()
+
+		it, err := newSymbolsIterator(symbolFiles)
 		if err != nil {
 			return errors.Wrap(err, "opening symbols iterator")
 		}
@@ -1064,7 +1102,7 @@ func (c *LeveledCompactor) populateSymbols(sets []storage.ChunkSeriesSet, outBlo
 
 		// Delete symbol files from symbolsBatcher. We don't need to perform the cleanup if populateSymbols
 		// or compaction fails, because in that case compactor already removes entire (temp) output block directory.
-		for _, fn := range batchers[ix].symbolFiles() {
+		for _, fn := range symbolFiles {
 			if err := os.Remove(fn); err != nil {
 				return errors.Wrap(err, "deleting symbols file")
 			}
diff --git a/tsdb/symbols_batch.go b/tsdb/symbols_batch.go
index 8ef4e9ac28..18cf798579 100644
--- a/tsdb/symbols_batch.go
+++ b/tsdb/symbols_batch.go
@@ -8,12 +8,142 @@ import (
 	"os"
 	"path/filepath"
 	"sort"
+	"sync"
 
 	"github.com/golang/snappy"
 
 	"github.com/prometheus/prometheus/tsdb/errors"
 )
 
+// symbolFlushers writes symbols to provided files in background goroutines.
+// Only the first write error is kept; it is returned by later flushSymbols calls and by close.
+type symbolFlushers struct {
+	jobs chan flusherJob
+	wg   sync.WaitGroup
+
+	closed bool
+
+	errMu sync.Mutex
+	err   error
+
+	pool *sync.Pool
+}
+
+// newSymbolFlushers starts the given number of flusher goroutines. They run until close is called.
+//
+// At least one goroutine is always started, even if concurrency is zero or negative: jobs are
+// handed over through an unbuffered channel, so without any goroutine receiving from it
+// flushSymbols would block forever.
+func newSymbolFlushers(concurrency int) *symbolFlushers {
+	if concurrency < 1 {
+		concurrency = 1
+	}
+
+	f := &symbolFlushers{
+		jobs: make(chan flusherJob),
+		pool: &sync.Pool{},
+	}
+
+	for i := 0; i < concurrency; i++ {
+		f.wg.Add(1)
+		go f.loop()
+	}
+
+	return f
+}
+
+// flushSymbols hands the symbols over to one of the flusher goroutines, which sorts them and
+// writes them to outputFile. It blocks until a goroutine takes the job, but doesn't wait for the
+// file to be written. The symbols map must not be modified by the caller afterwards.
+//
+// An error is returned if there are no symbols, or if an earlier flush has failed. A failure of
+// this flush is reported by a later flushSymbols call or by close. flushSymbols must not be
+// called after close, because it sends to the channel that close closes.
+func (f *symbolFlushers) flushSymbols(outputFile string, symbols map[string]struct{}) error {
+	if len(symbols) == 0 {
+		return fmt.Errorf("no symbols")
+	}
+
+	f.errMu.Lock()
+	err := f.err
+	f.errMu.Unlock()
+
+	// If there was any error previously, return it.
+	if err != nil {
+		return err
+	}
+
+	f.jobs <- flusherJob{
+		outputFile: outputFile,
+		symbols:    symbols,
+	}
+	return nil
+}
+
+// loop sorts and writes the received jobs until the jobs channel is closed. After a failed write
+// it stores the error (unless another one is stored already) and only drains the channel without
+// writing anything, which keeps flushSymbols from blocking.
+func (f *symbolFlushers) loop() {
+	defer f.wg.Done()
+
+	for j := range f.jobs {
+		var sortedSymbols []string
+
+		pooled := f.pool.Get()
+		if pooled == nil {
+			sortedSymbols = make([]string, 0, len(j.symbols))
+		} else {
+			sortedSymbols = pooled.([]string)
+			sortedSymbols = sortedSymbols[:0]
+		}
+
+		for s := range j.symbols {
+			sortedSymbols = append(sortedSymbols, s)
+		}
+		sort.Strings(sortedSymbols)
+
+		err := writeSymbolsToFile(j.outputFile, sortedSymbols)
+		sortedSymbols = sortedSymbols[:0]
+
+		//nolint:staticcheck // Ignore SA6002 safe to ignore and actually fixing it has some performance penalty.
+		f.pool.Put(sortedSymbols)
+
+		if err != nil {
+			f.errMu.Lock()
+			if f.err == nil {
+				f.err = err
+			}
+			f.errMu.Unlock()
+
+			break
+		}
+	}
+
+	for range f.jobs {
+		// drain the channel, don't do more flushing. only used when error occurs.
+	}
+}
+
+// close stops the flushers, waits until all flusher goroutines are finished, and returns the
+// first stored error, if any. Calling close again returns the same error.
+func (f *symbolFlushers) close() error {
+	if f.closed {
+		return f.err
+	}
+
+	f.closed = true
+	close(f.jobs)
+	f.wg.Wait()
+
+	return f.err
+}
+
+// flusherJob is a single request to sort symbols and write them to outputFile.
+type flusherJob struct {
+	outputFile string
+	symbols    map[string]struct{}
+}
+
 // symbolsBatcher keeps buffer of symbols in memory.
Once the buffer reaches the size limit (number of symbols), // batcher writes currently buffered symbols to file. At the end remaining symbols must be flushed. After writing // all batches, symbolsBatcher has list of files that can be used together with newSymbolsIterator to iterate @@ -22,15 +130,18 @@ type symbolsBatcher struct { dir string limit int - buffer map[string]struct{} // using map to deduplicate - symbolsFiles []string // paths of symbol files that have been successfully written. + symbolsFiles []string // paths of symbol files, which were sent to flushers for flushing + + buffer map[string]struct{} // using map to deduplicate + flushers *symbolFlushers } -func newSymbolsBatcher(limit int, dir string) *symbolsBatcher { +func newSymbolsBatcher(limit int, dir string, flushers *symbolFlushers) *symbolsBatcher { return &symbolsBatcher{ - limit: limit, - dir: dir, - buffer: make(map[string]struct{}, limit), + limit: limit, + dir: dir, + buffer: make(map[string]struct{}, limit), + flushers: flushers, } } @@ -44,23 +155,21 @@ func (sw *symbolsBatcher) flushSymbols(force bool) error { return nil } - sortedSymbols := make([]string, 0, len(sw.buffer)) - for s := range sw.buffer { - sortedSymbols = append(sortedSymbols, s) + if len(sw.buffer) == 0 { + return nil } - sort.Strings(sortedSymbols) symbolsFile := filepath.Join(sw.dir, fmt.Sprintf("symbols_%d", len(sw.symbolsFiles))) - err := writeSymbolsToFile(symbolsFile, sortedSymbols) - if err == nil { - sw.buffer = make(map[string]struct{}, sw.limit) - sw.symbolsFiles = append(sw.symbolsFiles, symbolsFile) - } + sw.symbolsFiles = append(sw.symbolsFiles, symbolsFile) - return err + buf := sw.buffer + sw.buffer = make(map[string]struct{}, sw.limit) + return sw.flushers.flushSymbols(symbolsFile, buf) } -func (sw *symbolsBatcher) symbolFiles() []string { +// getSymbolFiles returns list of symbol files used to flush symbols to. These files are only valid if flushers +// finish successfully. 
+func (sw *symbolsBatcher) getSymbolFiles() []string { return sw.symbolsFiles } diff --git a/tsdb/symbols_batch_test.go b/tsdb/symbols_batch_test.go index 8b41d89630..1053f796f4 100644 --- a/tsdb/symbols_batch_test.go +++ b/tsdb/symbols_batch_test.go @@ -8,14 +8,25 @@ import ( "github.com/stretchr/testify/require" ) -func TestSymbolsBatchAndIteration(t *testing.T) { +func TestSymbolsBatchAndIteration1(t *testing.T) { + testSymbolsBatchAndIterationWithFlushersConcurrency(t, 1) +} + +func TestSymbolsBatchAndIteration5(t *testing.T) { + testSymbolsBatchAndIterationWithFlushersConcurrency(t, 5) +} + +func testSymbolsBatchAndIterationWithFlushersConcurrency(t *testing.T, flushersConcurrency int) { + flushers := newSymbolFlushers(flushersConcurrency) + defer func() { _ = flushers.close() }() + dir := t.TempDir() - b := newSymbolsBatcher(100, dir) + b := newSymbolsBatcher(100, dir, flushers) allWords := map[string]struct{}{} - for i := 0; i < 10; i++ { + for i := 0; i < 10*flushersConcurrency; i++ { require.NoError(t, b.addSymbol("")) allWords[""] = struct{}{} @@ -29,8 +40,12 @@ func TestSymbolsBatchAndIteration(t *testing.T) { } require.NoError(t, b.flushSymbols(true)) + require.NoError(t, b.flushSymbols(true)) // call again, this should do nothing, and not create new empty file. + require.NoError(t, flushers.close()) - it, err := newSymbolsIterator(b.symbolFiles()) + symbols := b.getSymbolFiles() + + it, err := newSymbolsIterator(symbols) require.NoError(t, err) t.Cleanup(func() { require.NoError(t, it.Close())