// Copyright 2017 The Prometheus Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package tsdb import ( "context" "crypto/rand" "fmt" "io" "os" "path/filepath" "sort" "sync" "time" "github.com/go-kit/log" "github.com/go-kit/log/level" "github.com/oklog/ulid" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "go.uber.org/atomic" "golang.org/x/sync/semaphore" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/tsdb/chunkenc" "github.com/prometheus/prometheus/tsdb/chunks" tsdb_errors "github.com/prometheus/prometheus/tsdb/errors" "github.com/prometheus/prometheus/tsdb/fileutil" "github.com/prometheus/prometheus/tsdb/index" "github.com/prometheus/prometheus/tsdb/tombstones" ) // ExponentialBlockRanges returns the time ranges based on the stepSize. func ExponentialBlockRanges(minSize int64, steps, stepSize int) []int64 { ranges := make([]int64, 0, steps) curRange := minSize for i := 0; i < steps; i++ { ranges = append(ranges, curRange) curRange = curRange * int64(stepSize) } return ranges } // Compactor provides compaction against an underlying storage // of time series data. type Compactor interface { // Plan returns a set of directories that can be compacted concurrently. // The directories can be overlapping. // Results returned when compactions are in progress are undefined. Plan(dir string) ([]string, error) // Write persists a Block into a directory. // No Block is written when resulting Block has 0 samples, and returns empty ulid.ULID{}. Write(dest string, b BlockReader, mint, maxt int64, parent *BlockMeta) (ulid.ULID, error) // Compact runs compaction against the provided directories. Must // only be called concurrently with results of Plan(). // Can optionally pass a list of already open blocks, // to avoid having to reopen them. // When resulting Block has 0 samples // * No block is written. // * The source dirs are marked Deletable. // * Returns empty ulid.ULID{}. Compact(dest string, dirs []string, open []*Block) (ulid.ULID, error) } // LeveledCompactor implements the Compactor interface. type LeveledCompactor struct { metrics *compactorMetrics logger log.Logger ranges []int64 chunkPool chunkenc.Pool ctx context.Context maxBlockChunkSegmentSize int64 mergeFunc storage.VerticalChunkSeriesMergeFunc concurrencyOpts LeveledCompactorConcurrencyOptions } type compactorMetrics struct { ran prometheus.Counter populatingBlocks prometheus.Gauge overlappingBlocks prometheus.Counter duration prometheus.Histogram chunkSize prometheus.Histogram chunkSamples prometheus.Histogram chunkRange prometheus.Histogram } func newCompactorMetrics(r prometheus.Registerer) *compactorMetrics { m := &compactorMetrics{} m.ran = prometheus.NewCounter(prometheus.CounterOpts{ Name: "prometheus_tsdb_compactions_total", Help: "Total number of compactions that were executed for the partition.", }) m.populatingBlocks = prometheus.NewGauge(prometheus.GaugeOpts{ Name: "prometheus_tsdb_compaction_populating_block", Help: "Set to 1 when a block is currently being written to the disk.", }) m.overlappingBlocks = prometheus.NewCounter(prometheus.CounterOpts{ Name: "prometheus_tsdb_vertical_compactions_total", Help: "Total number of compactions done on overlapping blocks.", }) m.duration = prometheus.NewHistogram(prometheus.HistogramOpts{ Name: "prometheus_tsdb_compaction_duration_seconds", Help: "Duration of compaction runs", Buckets: prometheus.ExponentialBuckets(1, 2, 14), }) m.chunkSize = prometheus.NewHistogram(prometheus.HistogramOpts{ Name: "prometheus_tsdb_compaction_chunk_size_bytes", Help: "Final size of chunks on their first compaction", Buckets: prometheus.ExponentialBuckets(32, 1.5, 12), }) m.chunkSamples = prometheus.NewHistogram(prometheus.HistogramOpts{ Name: "prometheus_tsdb_compaction_chunk_samples", Help: "Final number of samples on their first compaction", Buckets: prometheus.ExponentialBuckets(4, 1.5, 12), }) m.chunkRange = prometheus.NewHistogram(prometheus.HistogramOpts{ Name: "prometheus_tsdb_compaction_chunk_range_seconds", Help: "Final time range of chunks on their first compaction", Buckets: prometheus.ExponentialBuckets(100, 4, 10), }) if r != nil { r.MustRegister( m.ran, m.populatingBlocks, m.overlappingBlocks, m.duration, m.chunkRange, m.chunkSamples, m.chunkSize, ) } return m } // NewLeveledCompactor returns a LeveledCompactor. func NewLeveledCompactor(ctx context.Context, r prometheus.Registerer, l log.Logger, ranges []int64, pool chunkenc.Pool, mergeFunc storage.VerticalChunkSeriesMergeFunc) (*LeveledCompactor, error) { return NewLeveledCompactorWithChunkSize(ctx, r, l, ranges, pool, chunks.DefaultChunkSegmentSize, mergeFunc) } func NewLeveledCompactorWithChunkSize(ctx context.Context, r prometheus.Registerer, l log.Logger, ranges []int64, pool chunkenc.Pool, maxBlockChunkSegmentSize int64, mergeFunc storage.VerticalChunkSeriesMergeFunc) (*LeveledCompactor, error) { if len(ranges) == 0 { return nil, errors.Errorf("at least one range must be provided") } if pool == nil { pool = chunkenc.NewPool() } if l == nil { l = log.NewNopLogger() } if mergeFunc == nil { mergeFunc = storage.NewCompactingChunkSeriesMerger(storage.ChainedSeriesMerge) } return &LeveledCompactor{ ranges: ranges, chunkPool: pool, logger: l, metrics: newCompactorMetrics(r), ctx: ctx, maxBlockChunkSegmentSize: maxBlockChunkSegmentSize, mergeFunc: mergeFunc, concurrencyOpts: DefaultLeveledCompactorConcurrencyOptions(), }, nil } // LeveledCompactorConcurrencyOptions is a collection of concurrency options used by LeveledCompactor. type LeveledCompactorConcurrencyOptions struct { MaxOpeningBlocks int // Number of goroutines opening blocks before compaction. MaxClosingBlocks int // Max number of blocks that can be closed concurrently during split compaction. Note that closing of newly compacted block uses a lot of memory for writing index. SymbolsFlushersCount int // Number of symbols flushers used when doing split compaction. } func DefaultLeveledCompactorConcurrencyOptions() LeveledCompactorConcurrencyOptions { return LeveledCompactorConcurrencyOptions{ MaxClosingBlocks: 1, SymbolsFlushersCount: 1, MaxOpeningBlocks: 1, } } func (c *LeveledCompactor) SetConcurrencyOptions(opts LeveledCompactorConcurrencyOptions) { c.concurrencyOpts = opts } type dirMeta struct { dir string meta *BlockMeta } // Plan returns a list of compactable blocks in the provided directory. func (c *LeveledCompactor) Plan(dir string) ([]string, error) { dirs, err := blockDirs(dir) if err != nil { return nil, err } if len(dirs) < 1 { return nil, nil } var dms []dirMeta for _, dir := range dirs { meta, _, err := readMetaFile(dir) if err != nil { return nil, err } dms = append(dms, dirMeta{dir, meta}) } return c.plan(dms) } func (c *LeveledCompactor) plan(dms []dirMeta) ([]string, error) { sort.Slice(dms, func(i, j int) bool { return dms[i].meta.MinTime < dms[j].meta.MinTime }) res := c.selectOverlappingDirs(dms) if len(res) > 0 { return res, nil } // No overlapping blocks, do compaction the usual way. // We do not include a recently created block with max(minTime), so the block which was just created from WAL. // This gives users a window of a full block size to piece-wise backup new data without having to care about data overlap. dms = dms[:len(dms)-1] for _, dm := range c.selectDirs(dms) { res = append(res, dm.dir) } if len(res) > 0 { return res, nil } // Compact any blocks with big enough time range that have >5% tombstones. for i := len(dms) - 1; i >= 0; i-- { meta := dms[i].meta if meta.MaxTime-meta.MinTime < c.ranges[len(c.ranges)/2] { // If the block is entirely deleted, then we don't care about the block being big enough. // TODO: This is assuming single tombstone is for distinct series, which might be no true. if meta.Stats.NumTombstones > 0 && meta.Stats.NumTombstones >= meta.Stats.NumSeries { return []string{dms[i].dir}, nil } break } if float64(meta.Stats.NumTombstones)/float64(meta.Stats.NumSeries+1) > 0.05 { return []string{dms[i].dir}, nil } } return nil, nil } // selectDirs returns the dir metas that should be compacted into a single new block. // If only a single block range is configured, the result is always nil. func (c *LeveledCompactor) selectDirs(ds []dirMeta) []dirMeta { if len(c.ranges) < 2 || len(ds) < 1 { return nil } highTime := ds[len(ds)-1].meta.MinTime for _, iv := range c.ranges[1:] { parts := splitByRange(ds, iv) if len(parts) == 0 { continue } Outer: for _, p := range parts { // Do not select the range if it has a block whose compaction failed. for _, dm := range p { if dm.meta.Compaction.Failed { continue Outer } } mint := p[0].meta.MinTime maxt := p[len(p)-1].meta.MaxTime // Pick the range of blocks if it spans the full range (potentially with gaps) // or is before the most recent block. // This ensures we don't compact blocks prematurely when another one of the same // size still fits in the range. if (maxt-mint == iv || maxt <= highTime) && len(p) > 1 { return p } } } return nil } // selectOverlappingDirs returns all dirs with overlapping time ranges. // It expects sorted input by mint and returns the overlapping dirs in the same order as received. func (c *LeveledCompactor) selectOverlappingDirs(ds []dirMeta) []string { if len(ds) < 2 { return nil } var overlappingDirs []string globalMaxt := ds[0].meta.MaxTime for i, d := range ds[1:] { if d.meta.MinTime < globalMaxt { if len(overlappingDirs) == 0 { // When it is the first overlap, need to add the last one as well. overlappingDirs = append(overlappingDirs, ds[i].dir) } overlappingDirs = append(overlappingDirs, d.dir) } else if len(overlappingDirs) > 0 { break } if d.meta.MaxTime > globalMaxt { globalMaxt = d.meta.MaxTime } } return overlappingDirs } // splitByRange splits the directories by the time range. The range sequence starts at 0. // // For example, if we have blocks [0-10, 10-20, 50-60, 90-100] and the split range tr is 30 // it returns [0-10, 10-20], [50-60], [90-100]. func splitByRange(ds []dirMeta, tr int64) [][]dirMeta { var splitDirs [][]dirMeta for i := 0; i < len(ds); { var ( group []dirMeta t0 int64 m = ds[i].meta ) // Compute start of aligned time range of size tr closest to the current block's start. if m.MinTime >= 0 { t0 = tr * (m.MinTime / tr) } else { t0 = tr * ((m.MinTime - tr + 1) / tr) } // Skip blocks that don't fall into the range. This can happen via mis-alignment or // by being the multiple of the intended range. if m.MaxTime > t0+tr { i++ continue } // Add all dirs to the current group that are within [t0, t0+tr]. for ; i < len(ds); i++ { // Either the block falls into the next range or doesn't fit at all (checked above). if ds[i].meta.MaxTime > t0+tr { break } group = append(group, ds[i]) } if len(group) > 0 { splitDirs = append(splitDirs, group) } } return splitDirs } // CompactBlockMetas merges many block metas into one, combining it's source blocks together // and adjusting compaction level. Min/Max time of result block meta covers all input blocks. func CompactBlockMetas(uid ulid.ULID, blocks ...*BlockMeta) *BlockMeta { res := &BlockMeta{ ULID: uid, } sources := map[ulid.ULID]struct{}{} mint := blocks[0].MinTime maxt := blocks[0].MaxTime for _, b := range blocks { if b.MinTime < mint { mint = b.MinTime } if b.MaxTime > maxt { maxt = b.MaxTime } if b.Compaction.Level > res.Compaction.Level { res.Compaction.Level = b.Compaction.Level } for _, s := range b.Compaction.Sources { sources[s] = struct{}{} } res.Compaction.Parents = append(res.Compaction.Parents, BlockDesc{ ULID: b.ULID, MinTime: b.MinTime, MaxTime: b.MaxTime, }) } res.Compaction.Level++ for s := range sources { res.Compaction.Sources = append(res.Compaction.Sources, s) } sort.Slice(res.Compaction.Sources, func(i, j int) bool { return res.Compaction.Sources[i].Compare(res.Compaction.Sources[j]) < 0 }) res.MinTime = mint res.MaxTime = maxt return res } // CompactWithSplitting merges and splits the input blocks into shardCount number of output blocks, // and returns slice of block IDs. Position of returned block ID in the result slice corresponds to the shard index. // If given output block has no series, corresponding block ID will be zero ULID value. func (c *LeveledCompactor) CompactWithSplitting(dest string, dirs []string, open []*Block, shardCount uint64) (result []ulid.ULID, _ error) { return c.compact(dest, dirs, open, shardCount) } // Compact creates a new block in the compactor's directory from the blocks in the // provided directories. func (c *LeveledCompactor) Compact(dest string, dirs []string, open []*Block) (uid ulid.ULID, err error) { ulids, err := c.compact(dest, dirs, open, 1) if err != nil { return ulid.ULID{}, err } return ulids[0], nil } // shardedBlock describes single *output* block during compaction. This struct is passed between // compaction methods to wrap output block details, index and chunk writer together. // Shard index is determined by the position of this structure in the slice of output blocks. type shardedBlock struct { meta *BlockMeta blockDir string tmpDir string // Temp directory used when block is being built (= blockDir + temp suffix) chunkw ChunkWriter indexw IndexWriter } func (c *LeveledCompactor) compact(dest string, dirs []string, open []*Block, shardCount uint64) (_ []ulid.ULID, err error) { if shardCount == 0 { shardCount = 1 } start := time.Now() bs, blocksToClose, err := openBlocksForCompaction(dirs, open, c.logger, c.chunkPool, c.concurrencyOpts.MaxOpeningBlocks) for _, b := range blocksToClose { defer b.Close() } if err != nil { return nil, err } var ( blocks []BlockReader metas []*BlockMeta uids []string ) for _, b := range bs { blocks = append(blocks, b) m := b.Meta() metas = append(metas, &m) uids = append(uids, b.meta.ULID.String()) } outBlocks := make([]shardedBlock, shardCount) outBlocksTime := ulid.Now() // Make all out blocks share the same timestamp in the ULID. for ix := range outBlocks { outBlocks[ix] = shardedBlock{meta: CompactBlockMetas(ulid.MustNew(outBlocksTime, rand.Reader), metas...)} } err = c.write(dest, outBlocks, blocks...) if err == nil { ulids := make([]ulid.ULID, len(outBlocks)) allOutputBlocksAreEmpty := true for ix := range outBlocks { meta := outBlocks[ix].meta if meta.Stats.NumSamples == 0 { level.Info(c.logger).Log( "msg", "compact blocks resulted in empty block", "count", len(blocks), "sources", fmt.Sprintf("%v", uids), "duration", time.Since(start), "shard", fmt.Sprintf("%d_of_%d", ix+1, shardCount), ) } else { allOutputBlocksAreEmpty = false ulids[ix] = outBlocks[ix].meta.ULID level.Info(c.logger).Log( "msg", "compact blocks", "count", len(blocks), "mint", meta.MinTime, "maxt", meta.MaxTime, "ulid", meta.ULID, "sources", fmt.Sprintf("%v", uids), "duration", time.Since(start), "shard", fmt.Sprintf("%d_of_%d", ix+1, shardCount), ) } } if allOutputBlocksAreEmpty { // Mark source blocks as deletable. for _, b := range bs { b.meta.Compaction.Deletable = true n, err := writeMetaFile(c.logger, b.dir, &b.meta) if err != nil { level.Error(c.logger).Log( "msg", "Failed to write 'Deletable' to meta file after compaction", "ulid", b.meta.ULID, ) } b.numBytesMeta = n } } return ulids, nil } errs := tsdb_errors.NewMulti(err) if err != context.Canceled { for _, b := range bs { if err := b.setCompactionFailed(); err != nil { errs.Add(errors.Wrapf(err, "setting compaction failed for block: %s", b.Dir())) } } } return nil, errs.Err() } func (c *LeveledCompactor) Write(dest string, b BlockReader, mint, maxt int64, parent *BlockMeta) (ulid.ULID, error) { start := time.Now() uid := ulid.MustNew(ulid.Now(), rand.Reader) meta := &BlockMeta{ ULID: uid, MinTime: mint, MaxTime: maxt, } meta.Compaction.Level = 1 meta.Compaction.Sources = []ulid.ULID{uid} if parent != nil { meta.Compaction.Parents = []BlockDesc{ {ULID: parent.ULID, MinTime: parent.MinTime, MaxTime: parent.MaxTime}, } } err := c.write(dest, []shardedBlock{{meta: meta}}, b) if err != nil { return uid, err } if meta.Stats.NumSamples == 0 { level.Info(c.logger).Log( "msg", "write block resulted in empty block", "mint", meta.MinTime, "maxt", meta.MaxTime, "duration", time.Since(start), ) return ulid.ULID{}, nil } level.Info(c.logger).Log( "msg", "write block", "mint", meta.MinTime, "maxt", meta.MaxTime, "ulid", meta.ULID, "duration", time.Since(start), ) return uid, nil } // instrumentedChunkWriter is used for level 1 compactions to record statistics // about compacted chunks. type instrumentedChunkWriter struct { ChunkWriter size prometheus.Histogram samples prometheus.Histogram trange prometheus.Histogram } func (w *instrumentedChunkWriter) WriteChunks(chunks ...chunks.Meta) error { for _, c := range chunks { w.size.Observe(float64(len(c.Chunk.Bytes()))) w.samples.Observe(float64(c.Chunk.NumSamples())) w.trange.Observe(float64(c.MaxTime - c.MinTime)) } return w.ChunkWriter.WriteChunks(chunks...) } // write creates new output blocks that are the union of the provided blocks into dir. func (c *LeveledCompactor) write(dest string, outBlocks []shardedBlock, blocks ...BlockReader) (err error) { var closers []io.Closer defer func(t time.Time) { err = tsdb_errors.NewMulti(err, tsdb_errors.CloseAll(closers)).Err() for _, ob := range outBlocks { if ob.tmpDir != "" { // RemoveAll returns no error when tmp doesn't exist so it is safe to always run it. if removeErr := os.RemoveAll(ob.tmpDir); removeErr != nil { level.Error(c.logger).Log("msg", "Failed to remove temp folder after failed compaction", "dir", ob.tmpDir, "err", removeErr.Error()) } } // If there was any error, and we have multiple output blocks, some blocks may have been generated, or at // least have existing blockDir. In such case, we want to remove them. // BlockDir may also not be set yet, if preparation for some previous blocks have failed. if err != nil && ob.blockDir != "" { // RemoveAll returns no error when tmp doesn't exist so it is safe to always run it. if removeErr := os.RemoveAll(ob.blockDir); removeErr != nil { level.Error(c.logger).Log("msg", "Failed to remove block folder after failed compaction", "dir", ob.blockDir, "err", removeErr.Error()) } } } c.metrics.ran.Inc() c.metrics.duration.Observe(time.Since(t).Seconds()) }(time.Now()) for ix := range outBlocks { dir := filepath.Join(dest, outBlocks[ix].meta.ULID.String()) tmp := dir + tmpForCreationBlockDirSuffix outBlocks[ix].blockDir = dir outBlocks[ix].tmpDir = tmp if err = os.RemoveAll(tmp); err != nil { return err } if err = os.MkdirAll(tmp, 0o777); err != nil { return err } // Populate chunk and index files into temporary directory with // data of all blocks. var chunkw ChunkWriter chunkw, err = chunks.NewWriterWithSegSize(chunkDir(tmp), c.maxBlockChunkSegmentSize) if err != nil { return errors.Wrap(err, "open chunk writer") } chunkw = newPreventDoubleCloseChunkWriter(chunkw) // We now close chunkWriter in populateBlock, but keep it in the closers here as well. closers = append(closers, chunkw) // Record written chunk sizes on level 1 compactions. if outBlocks[ix].meta.Compaction.Level == 1 { chunkw = &instrumentedChunkWriter{ ChunkWriter: chunkw, size: c.metrics.chunkSize, samples: c.metrics.chunkSamples, trange: c.metrics.chunkRange, } } outBlocks[ix].chunkw = chunkw var indexw IndexWriter indexw, err = index.NewWriter(c.ctx, filepath.Join(tmp, indexFilename)) if err != nil { return errors.Wrap(err, "open index writer") } indexw = newPreventDoubleCloseIndexWriter(indexw) // We now close indexWriter in populateBlock, but keep it in the closers here as well. closers = append(closers, indexw) outBlocks[ix].indexw = indexw } // We use MinTime and MaxTime from first output block, because ALL output blocks have the same min/max times set. if err := c.populateBlock(blocks, outBlocks[0].meta.MinTime, outBlocks[0].meta.MaxTime, outBlocks); err != nil { return errors.Wrap(err, "populate block") } select { case <-c.ctx.Done(): return c.ctx.Err() default: } // We are explicitly closing them here to check for error even // though these are covered under defer. This is because in Windows, // you cannot delete these unless they are closed and the defer is to // make sure they are closed if the function exits due to an error above. errs := tsdb_errors.NewMulti() for _, w := range closers { errs.Add(w.Close()) } closers = closers[:0] // Avoid closing the writers twice in the defer. if errs.Err() != nil { return errs.Err() } for _, ob := range outBlocks { // Populated block is empty, don't write meta file for it. if ob.meta.Stats.NumSamples == 0 { continue } if _, err = writeMetaFile(c.logger, ob.tmpDir, ob.meta); err != nil { return errors.Wrap(err, "write merged meta") } // Create an empty tombstones file. if _, err := tombstones.WriteFile(c.logger, ob.tmpDir, tombstones.NewMemTombstones()); err != nil { return errors.Wrap(err, "write new tombstones file") } df, err := fileutil.OpenDir(ob.tmpDir) if err != nil { return errors.Wrap(err, "open temporary block dir") } defer func() { if df != nil { df.Close() } }() if err := df.Sync(); err != nil { return errors.Wrap(err, "sync temporary dir file") } // Close temp dir before rename block dir (for windows platform). if err = df.Close(); err != nil { return errors.Wrap(err, "close temporary dir") } df = nil // Block successfully written, make it visible in destination dir by moving it from tmp one. if err := fileutil.Replace(ob.tmpDir, ob.blockDir); err != nil { return errors.Wrap(err, "rename block dir") } } return nil } func debugOutOfOrderChunks(chks []chunks.Meta, logger log.Logger) { if len(chks) <= 1 { return } prevChk := chks[0] for i := 1; i < len(chks); i++ { currChk := chks[i] if currChk.MinTime > prevChk.MaxTime { // Not out of order. continue } // Looks like the chunk is out of order. prevSafeChk, prevIsSafeChk := prevChk.Chunk.(*safeChunk) currSafeChk, currIsSafeChk := currChk.Chunk.(*safeChunk) // Get info out of safeChunk (if possible). prevHeadChunkID := chunks.HeadChunkID(0) currHeadChunkID := chunks.HeadChunkID(0) prevLabels := labels.Labels{} currLabels := labels.Labels{} if prevSafeChk != nil { prevHeadChunkID = prevSafeChk.cid prevLabels = prevSafeChk.s.lset } if currSafeChk != nil { currHeadChunkID = currSafeChk.cid currLabels = currSafeChk.s.lset } level.Warn(logger).Log( "msg", "found out-of-order chunk when compacting", "prev_ref", prevChk.Ref, "curr_ref", currChk.Ref, "prev_min_time", timeFromMillis(prevChk.MinTime).UTC().String(), "prev_max_time", timeFromMillis(prevChk.MaxTime).UTC().String(), "curr_min_time", timeFromMillis(currChk.MinTime).UTC().String(), "curr_max_time", timeFromMillis(currChk.MaxTime).UTC().String(), "prev_samples", prevChk.Chunk.NumSamples(), "curr_samples", currChk.Chunk.NumSamples(), "prev_is_safe_chunk", prevIsSafeChk, "curr_is_safe_chunk", currIsSafeChk, "prev_head_chunk_id", prevHeadChunkID, "curr_head_chunk_id", currHeadChunkID, "prev_labelset", prevLabels.String(), "curr_labelset", currLabels.String(), "num_chunks_for_series", len(chks), ) } } func timeFromMillis(ms int64) time.Time { return time.Unix(0, ms*int64(time.Millisecond)) } // populateBlock fills the index and chunk writers of output blocks with new data gathered as the union // of the provided blocks. // It expects sorted blocks input by mint. // If there is more than 1 output block, each output block will only contain series that hash into its shard // (based on total number of output blocks). func (c *LeveledCompactor) populateBlock(blocks []BlockReader, minT, maxT int64, outBlocks []shardedBlock) (err error) { if len(blocks) == 0 { return errors.New("cannot populate block(s) from no readers") } var ( sets []storage.ChunkSeriesSet symbolsSets []storage.ChunkSeriesSet // series sets used for finding symbols. Only used when doing sharding. symbols index.StringIter closers []io.Closer overlapping bool ) defer func() { errs := tsdb_errors.NewMulti(err) if cerr := tsdb_errors.CloseAll(closers); cerr != nil { errs.Add(errors.Wrap(cerr, "close")) } err = errs.Err() c.metrics.populatingBlocks.Set(0) }() c.metrics.populatingBlocks.Set(1) globalMaxt := blocks[0].Meta().MaxTime for i, b := range blocks { select { case <-c.ctx.Done(): return c.ctx.Err() default: } if !overlapping { if i > 0 && b.Meta().MinTime < globalMaxt { c.metrics.overlappingBlocks.Inc() overlapping = true level.Info(c.logger).Log("msg", "Found overlapping blocks during compaction") } if b.Meta().MaxTime > globalMaxt { globalMaxt = b.Meta().MaxTime } } indexr, err := b.Index() if err != nil { return errors.Wrapf(err, "open index reader for block %+v", b.Meta()) } closers = append(closers, indexr) chunkr, err := b.Chunks() if err != nil { return errors.Wrapf(err, "open chunk reader for block %+v", b.Meta()) } closers = append(closers, chunkr) tombsr, err := b.Tombstones() if err != nil { return errors.Wrapf(err, "open tombstone reader for block %+v", b.Meta()) } closers = append(closers, tombsr) k, v := index.AllPostingsKey() all, err := indexr.Postings(k, v) if err != nil { return err } all = indexr.SortedPostings(all) // Blocks meta is half open: [min, max), so subtract 1 to ensure we don't hold samples with exact meta.MaxTime timestamp. sets = append(sets, newBlockChunkSeriesSet(indexr, chunkr, tombsr, all, minT, maxT-1, false)) if len(outBlocks) > 1 { // To iterate series when populating symbols, we cannot reuse postings we just got, but need to get a new copy. // Postings can only be iterated once. k, v = index.AllPostingsKey() all, err = indexr.Postings(k, v) if err != nil { return err } all = indexr.SortedPostings(all) // Blocks meta is half open: [min, max), so subtract 1 to ensure we don't hold samples with exact meta.MaxTime timestamp. symbolsSets = append(symbolsSets, newBlockChunkSeriesSet(indexr, chunkr, tombsr, all, minT, maxT-1, false)) } else { syms := indexr.Symbols() if i == 0 { symbols = syms continue } symbols = NewMergedStringIter(symbols, syms) } } if len(outBlocks) == 1 { for symbols.Next() { if err := outBlocks[0].indexw.AddSymbol(symbols.At()); err != nil { return errors.Wrap(err, "add symbol") } } if symbols.Err() != nil { return errors.Wrap(symbols.Err(), "next symbol") } } else { if err := c.populateSymbols(symbolsSets, outBlocks); err != nil { return err } } // Semaphore for number of blocks that can be closed at once. sema := semaphore.NewWeighted(int64(c.concurrencyOpts.MaxClosingBlocks)) blockWriters := make([]*asyncBlockWriter, len(outBlocks)) for ix := range outBlocks { blockWriters[ix] = newAsyncBlockWriter(c.chunkPool, outBlocks[ix].chunkw, outBlocks[ix].indexw, sema) defer blockWriters[ix].closeAsync() // Make sure to close writer to stop goroutine. } set := sets[0] if len(sets) > 1 { // Merge series using specified chunk series merger. // The default one is the compacting series merger. set = storage.NewMergeChunkSeriesSet(sets, c.mergeFunc) } // Iterate over all sorted chunk series. for set.Next() { select { case <-c.ctx.Done(): return c.ctx.Err() default: } s := set.At() chksIter := s.Iterator() var chks []chunks.Meta for chksIter.Next() { // We are not iterating in streaming way over chunk as it's more efficient to do bulk write for index and // chunk file purposes. chks = append(chks, chksIter.At()) } if chksIter.Err() != nil { return errors.Wrap(chksIter.Err(), "chunk iter") } // Skip the series with all deleted chunks. if len(chks) == 0 { continue } debugOutOfOrderChunks(chks, c.logger) obIx := uint64(0) if len(outBlocks) > 1 { obIx = s.Labels().Hash() % uint64(len(outBlocks)) } err := blockWriters[obIx].addSeries(s.Labels(), chks) if err != nil { return errors.Wrap(err, "adding series") } } if set.Err() != nil { return errors.Wrap(set.Err(), "iterate compaction set") } for ix := range blockWriters { blockWriters[ix].closeAsync() } for ix := range blockWriters { stats, err := blockWriters[ix].waitFinished() if err != nil { return errors.Wrap(err, "writing block") } outBlocks[ix].meta.Stats = stats } return nil } // How many symbols we buffer in memory per output block. const inMemorySymbolsLimit = 1_000_000 // populateSymbols writes symbols to output blocks. We need to iterate through all series to find // which series belongs to what block. We collect symbols per sharded block, and then add sorted symbols to // block's index. func (c *LeveledCompactor) populateSymbols(sets []storage.ChunkSeriesSet, outBlocks []shardedBlock) error { if len(outBlocks) == 0 { return errors.New("no output block") } flushers := newSymbolFlushers(c.concurrencyOpts.SymbolsFlushersCount) defer flushers.close() // Make sure to stop flushers before exiting to avoid leaking goroutines. batchers := make([]*symbolsBatcher, len(outBlocks)) for ix := range outBlocks { batchers[ix] = newSymbolsBatcher(inMemorySymbolsLimit, outBlocks[ix].tmpDir, flushers) // Always include empty symbol. Blocks created from Head always have it in the symbols table, // and if we only include symbols from series, we would skip it. // It may not be required, but it's small and better be safe than sorry. if err := batchers[ix].addSymbol(""); err != nil { return errors.Wrap(err, "addSymbol to batcher") } } seriesSet := sets[0] if len(sets) > 1 { seriesSet = storage.NewMergeChunkSeriesSet(sets, c.mergeFunc) } for seriesSet.Next() { if err := c.ctx.Err(); err != nil { return err } s := seriesSet.At() obIx := s.Labels().Hash() % uint64(len(outBlocks)) for _, l := range s.Labels() { if err := batchers[obIx].addSymbol(l.Name); err != nil { return errors.Wrap(err, "addSymbol to batcher") } if err := batchers[obIx].addSymbol(l.Value); err != nil { return errors.Wrap(err, "addSymbol to batcher") } } } for ix := range outBlocks { // Flush the batcher to write remaining symbols. if err := batchers[ix].flushSymbols(true); err != nil { return errors.Wrap(err, "flushing batcher") } } err := flushers.close() if err != nil { return errors.Wrap(err, "closing flushers") } for ix := range outBlocks { if err := c.ctx.Err(); err != nil { return err } symbolFiles := batchers[ix].getSymbolFiles() it, err := newSymbolsIterator(symbolFiles) if err != nil { return errors.Wrap(err, "opening symbols iterator") } // Each symbols iterator must be closed to close underlying files. closeIt := it defer func() { if closeIt != nil { _ = closeIt.Close() } }() var sym string for sym, err = it.NextSymbol(); err == nil; sym, err = it.NextSymbol() { err = outBlocks[ix].indexw.AddSymbol(sym) if err != nil { return errors.Wrap(err, "AddSymbol") } } if err != io.EOF { return errors.Wrap(err, "iterating symbols") } // if err == io.EOF, we have iterated through all symbols. We can close underlying // files now. closeIt = nil _ = it.Close() // Delete symbol files from symbolsBatcher. We don't need to perform the cleanup if populateSymbols // or compaction fails, because in that case compactor already removes entire (temp) output block directory. for _, fn := range symbolFiles { if err := os.Remove(fn); err != nil { return errors.Wrap(err, "deleting symbols file") } } } return nil } // Returns opened blocks, and blocks that should be closed (also returned in case of error). func openBlocksForCompaction(dirs []string, open []*Block, logger log.Logger, pool chunkenc.Pool, concurrency int) (blocks, blocksToClose []*Block, _ error) { blocks = make([]*Block, 0, len(dirs)) blocksToClose = make([]*Block, 0, len(dirs)) toOpenCh := make(chan string, len(dirs)) for _, d := range dirs { meta, _, err := readMetaFile(d) if err != nil { return nil, blocksToClose, err } var b *Block // Use already open blocks if we can, to avoid // having the index data in memory twice. for _, o := range open { if meta.ULID == o.Meta().ULID { b = o break } } if b != nil { blocks = append(blocks, b) } else { toOpenCh <- d } } close(toOpenCh) type openResult struct { b *Block err error } openResultCh := make(chan openResult, len(toOpenCh)) // Signals to all opening goroutines that there was an error opening some block, and they can stop early. // If openingError is true, at least one error is sent to openResultCh. openingError := atomic.NewBool(false) wg := sync.WaitGroup{} if len(dirs) < concurrency { concurrency = len(dirs) } for i := 0; i < concurrency; i++ { wg.Add(1) go func() { defer wg.Done() for d := range toOpenCh { if openingError.Load() { return } b, err := OpenBlock(logger, d, pool) openResultCh <- openResult{b: b, err: err} if err != nil { openingError.Store(true) return } } }() } wg.Wait() // All writers to openResultCh have stopped, we can close the output channel, so we can range over it. close(openResultCh) var firstErr error for or := range openResultCh { if or.err != nil { // Don't stop on error, but iterate over all opened blocks to collect blocksToClose. if firstErr == nil { firstErr = or.err } } else { blocks = append(blocks, or.b) blocksToClose = append(blocksToClose, or.b) } } return blocks, blocksToClose, firstErr }