prometheus/tsdb/compact.go

1345 lines
41 KiB
Go
Raw Normal View History

2017-04-10 11:59:45 -07:00
// Copyright 2017 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package tsdb
import (
"context"
"crypto/rand"
"fmt"
"io"
"os"
2017-01-02 05:41:13 -08:00
"path/filepath"
2017-05-18 08:30:52 -07:00
"sort"
"sync"
2017-01-03 06:43:26 -08:00
"time"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
2017-02-27 01:46:15 -08:00
"github.com/oklog/ulid"
2017-01-03 06:43:26 -08:00
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/atomic"
"golang.org/x/sync/semaphore"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb/chunkenc"
"github.com/prometheus/prometheus/tsdb/chunks"
tsdb_errors "github.com/prometheus/prometheus/tsdb/errors"
"github.com/prometheus/prometheus/tsdb/fileutil"
"github.com/prometheus/prometheus/tsdb/index"
"github.com/prometheus/prometheus/tsdb/tombstones"
)
// ExponentialBlockRanges returns the time ranges based on the stepSize.
func ExponentialBlockRanges(minSize int64, steps, stepSize int) []int64 {
ranges := make([]int64, 0, steps)
curRange := minSize
for i := 0; i < steps; i++ {
ranges = append(ranges, curRange)
curRange *= int64(stepSize)
}
return ranges
}
// Compactor provides compaction against an underlying storage
// of time series data.
2017-03-02 00:13:29 -08:00
type Compactor interface {
// Plan returns a set of directories that can be compacted concurrently.
// The directories can be overlapping.
// Results returned when compactions are in progress are undefined.
2017-08-09 02:10:29 -07:00
Plan(dir string) ([]string, error)
2017-03-02 00:13:29 -08:00
// Write persists a Block into a directory.
// No Block is written when resulting Block has 0 samples, and returns empty ulid.ULID{}.
Write(dest string, b BlockReader, mint, maxt int64, parent *BlockMeta) (ulid.ULID, error)
2017-03-02 00:13:29 -08:00
// Compact runs compaction against the provided directories. Must
// only be called concurrently with results of Plan().
// Can optionally pass a list of already open blocks,
// to avoid having to reopen them.
// When resulting Block has 0 samples
// * No block is written.
// * The source dirs are marked Deletable.
// * Returns empty ulid.ULID{}.
Compact(dest string, dirs []string, open []*Block) (ulid.ULID, error)
// CompactOOO creates a new block per possible block range in the compactor's directory from the OOO Head given.
// Each ULID in the result corresponds to a block in a unique time range.
CompactOOO(dest string, oooHead *OOOCompactionHead) (result []ulid.ULID, err error)
2017-03-02 00:13:29 -08:00
}
2017-08-09 02:10:29 -07:00
// LeveledCompactor implements the Compactor interface.
type LeveledCompactor struct {
metrics *CompactorMetrics
logger log.Logger
ranges []int64
chunkPool chunkenc.Pool
ctx context.Context
maxBlockChunkSegmentSize int64
mergeFunc storage.VerticalChunkSeriesMergeFunc
enableOverlappingCompaction bool
concurrencyOpts LeveledCompactorConcurrencyOptions
}
type CompactorMetrics struct {
Ran prometheus.Counter
PopulatingBlocks prometheus.Gauge
OverlappingBlocks prometheus.Counter
Duration prometheus.Histogram
ChunkSize prometheus.Histogram
ChunkSamples prometheus.Histogram
ChunkRange prometheus.Histogram
2017-01-03 06:43:26 -08:00
}
func newCompactorMetrics(r prometheus.Registerer) *CompactorMetrics {
m := &CompactorMetrics{}
2017-01-03 06:43:26 -08:00
m.Ran = prometheus.NewCounter(prometheus.CounterOpts{
Name: "prometheus_tsdb_compactions_total",
Help: "Total number of compactions that were executed for the partition.",
2017-01-03 06:43:26 -08:00
})
m.PopulatingBlocks = prometheus.NewGauge(prometheus.GaugeOpts{
Name: "prometheus_tsdb_compaction_populating_block",
Help: "Set to 1 when a block is currently being written to the disk.",
})
m.OverlappingBlocks = prometheus.NewCounter(prometheus.CounterOpts{
Vertical query merging and compaction (#370) * Vertical series iterator Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Select overlapped blocks first in compactor Plan() Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Added vertical compaction Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Code cleanup and comments Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Fix review comments Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Fix tests Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Add benchmark for compaction Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Perform vertical compaction only when blocks are overlapping. Actions for vertical compaction: * Sorting chunk metas * Calling chunks.MergeOverlappingChunks on the chunks Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Benchmark for vertical compaction * BenchmarkNormalCompaction => BenchmarkCompaction * Moved the benchmark from db_test.go to compact_test.go Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Benchmark for query iterator and seek for non overlapping blocks Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Vertical query merge only for overlapping blocks Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Simplify logging in Compact(...) Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Updated CHANGELOG.md Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Calculate overlapping inside populateBlock Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * MinTime and MaxTime for BlockReader. Using this to find overlapping blocks in populateBlock() Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Sort blocks w.r.t. MinTime in reload() Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Log about overlapping in LeveledCompactor.write() instead of returning bool Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Log about overlapping inside LeveledCompactor.populateBlock() Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Fix review comments Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Refactor createBlock to take optional []Series Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * review1 Signed-off-by: Krasi Georgiev <kgeorgie@redhat.com> * Updated CHANGELOG and minor nits Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * nits Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Updated CHANGELOG Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Refactor iterator and seek benchmarks for Querier. Also has as overlapping blocks. Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Additional test case Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * genSeries takes optional labels. Updated BenchmarkQueryIterator and BenchmarkQuerySeek. Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Split genSeries into genSeries and populateSeries Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Check error in benchmark Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Fix review comments Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Warn about overlapping blocks in reload() Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in>
2019-02-14 05:29:41 -08:00
Name: "prometheus_tsdb_vertical_compactions_total",
Help: "Total number of compactions done on overlapping blocks.",
})
m.Duration = prometheus.NewHistogram(prometheus.HistogramOpts{
Name: "prometheus_tsdb_compaction_duration_seconds",
Help: "Duration of compaction runs",
Buckets: prometheus.ExponentialBuckets(1, 2, 14),
})
m.ChunkSize = prometheus.NewHistogram(prometheus.HistogramOpts{
Name: "prometheus_tsdb_compaction_chunk_size_bytes",
Help: "Final size of chunks on their first compaction",
Buckets: prometheus.ExponentialBuckets(32, 1.5, 12),
})
m.ChunkSamples = prometheus.NewHistogram(prometheus.HistogramOpts{
Name: "prometheus_tsdb_compaction_chunk_samples",
Help: "Final number of samples on their first compaction",
Buckets: prometheus.ExponentialBuckets(4, 1.5, 12),
})
m.ChunkRange = prometheus.NewHistogram(prometheus.HistogramOpts{
Name: "prometheus_tsdb_compaction_chunk_range_seconds",
Help: "Final time range of chunks on their first compaction",
Buckets: prometheus.ExponentialBuckets(100, 4, 10),
2017-01-03 06:43:26 -08:00
})
if r != nil {
r.MustRegister(
m.Ran,
m.PopulatingBlocks,
m.OverlappingBlocks,
m.Duration,
m.ChunkRange,
m.ChunkSamples,
m.ChunkSize,
)
}
2017-01-03 06:43:26 -08:00
return m
}
// NewLeveledCompactor returns a LeveledCompactor.
func NewLeveledCompactor(ctx context.Context, r prometheus.Registerer, l log.Logger, ranges []int64, pool chunkenc.Pool, mergeFunc storage.VerticalChunkSeriesMergeFunc, enableOverlappingCompaction bool) (*LeveledCompactor, error) {
return NewLeveledCompactorWithChunkSize(ctx, r, l, ranges, pool, chunks.DefaultChunkSegmentSize, mergeFunc, enableOverlappingCompaction)
}
func NewLeveledCompactorWithChunkSize(ctx context.Context, r prometheus.Registerer, l log.Logger, ranges []int64, pool chunkenc.Pool, maxBlockChunkSegmentSize int64, mergeFunc storage.VerticalChunkSeriesMergeFunc, enableOverlappingCompaction bool) (*LeveledCompactor, error) {
2017-09-01 02:46:46 -07:00
if len(ranges) == 0 {
return nil, errors.Errorf("at least one range must be provided")
2017-08-08 08:35:34 -07:00
}
2017-09-01 02:46:46 -07:00
if pool == nil {
pool = chunkenc.NewPool()
}
Vertical query merging and compaction (#370) * Vertical series iterator Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Select overlapped blocks first in compactor Plan() Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Added vertical compaction Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Code cleanup and comments Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Fix review comments Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Fix tests Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Add benchmark for compaction Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Perform vertical compaction only when blocks are overlapping. Actions for vertical compaction: * Sorting chunk metas * Calling chunks.MergeOverlappingChunks on the chunks Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Benchmark for vertical compaction * BenchmarkNormalCompaction => BenchmarkCompaction * Moved the benchmark from db_test.go to compact_test.go Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Benchmark for query iterator and seek for non overlapping blocks Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Vertical query merge only for overlapping blocks Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Simplify logging in Compact(...) Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Updated CHANGELOG.md Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Calculate overlapping inside populateBlock Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * MinTime and MaxTime for BlockReader. Using this to find overlapping blocks in populateBlock() Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Sort blocks w.r.t. MinTime in reload() Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Log about overlapping in LeveledCompactor.write() instead of returning bool Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Log about overlapping inside LeveledCompactor.populateBlock() Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Fix review comments Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Refactor createBlock to take optional []Series Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * review1 Signed-off-by: Krasi Georgiev <kgeorgie@redhat.com> * Updated CHANGELOG and minor nits Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * nits Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Updated CHANGELOG Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Refactor iterator and seek benchmarks for Querier. Also has as overlapping blocks. Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Additional test case Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * genSeries takes optional labels. Updated BenchmarkQueryIterator and BenchmarkQuerySeek. Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Split genSeries into genSeries and populateSeries Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Check error in benchmark Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Fix review comments Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Warn about overlapping blocks in reload() Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in>
2019-02-14 05:29:41 -08:00
if l == nil {
l = log.NewNopLogger()
}
if mergeFunc == nil {
mergeFunc = storage.NewCompactingChunkSeriesMerger(storage.ChainedSeriesMerge)
}
2017-09-01 02:46:46 -07:00
return &LeveledCompactor{
ranges: ranges,
chunkPool: pool,
logger: l,
metrics: newCompactorMetrics(r),
ctx: ctx,
maxBlockChunkSegmentSize: maxBlockChunkSegmentSize,
mergeFunc: mergeFunc,
concurrencyOpts: DefaultLeveledCompactorConcurrencyOptions(),
enableOverlappingCompaction: enableOverlappingCompaction,
2017-09-01 02:46:46 -07:00
}, nil
}
// LeveledCompactorConcurrencyOptions is a collection of concurrency options used by LeveledCompactor.
type LeveledCompactorConcurrencyOptions struct {
MaxOpeningBlocks int // Number of goroutines opening blocks before compaction.
MaxClosingBlocks int // Max number of blocks that can be closed concurrently during split compaction. Note that closing of newly compacted block uses a lot of memory for writing index.
SymbolsFlushersCount int // Number of symbols flushers used when doing split compaction.
}
func DefaultLeveledCompactorConcurrencyOptions() LeveledCompactorConcurrencyOptions {
return LeveledCompactorConcurrencyOptions{
MaxClosingBlocks: 1,
SymbolsFlushersCount: 1,
MaxOpeningBlocks: 1,
}
}
func (c *LeveledCompactor) SetConcurrencyOptions(opts LeveledCompactorConcurrencyOptions) {
c.concurrencyOpts = opts
}
type dirMeta struct {
dir string
meta *BlockMeta
}
2017-08-09 02:10:29 -07:00
// Plan returns a list of compactable blocks in the provided directory.
func (c *LeveledCompactor) Plan(dir string) ([]string, error) {
dirs, err := blockDirs(dir)
2017-03-02 00:13:29 -08:00
if err != nil {
return nil, err
2017-01-03 06:43:26 -08:00
}
2018-02-28 03:04:55 -08:00
if len(dirs) < 1 {
return nil, nil
}
2017-01-17 21:18:32 -08:00
2017-05-18 08:30:52 -07:00
var dms []dirMeta
2017-03-02 00:13:29 -08:00
for _, dir := range dirs {
meta, _, err := readMetaFile(dir)
2017-03-02 00:13:29 -08:00
if err != nil {
return nil, err
}
2017-09-01 02:46:46 -07:00
dms = append(dms, dirMeta{dir, meta})
2017-03-02 00:13:29 -08:00
}
return c.plan(dms)
}
func (c *LeveledCompactor) plan(dms []dirMeta) ([]string, error) {
2017-09-01 02:46:46 -07:00
sort.Slice(dms, func(i, j int) bool {
return dms[i].meta.MinTime < dms[j].meta.MinTime
})
Vertical query merging and compaction (#370) * Vertical series iterator Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Select overlapped blocks first in compactor Plan() Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Added vertical compaction Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Code cleanup and comments Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Fix review comments Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Fix tests Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Add benchmark for compaction Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Perform vertical compaction only when blocks are overlapping. Actions for vertical compaction: * Sorting chunk metas * Calling chunks.MergeOverlappingChunks on the chunks Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Benchmark for vertical compaction * BenchmarkNormalCompaction => BenchmarkCompaction * Moved the benchmark from db_test.go to compact_test.go Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Benchmark for query iterator and seek for non overlapping blocks Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Vertical query merge only for overlapping blocks Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Simplify logging in Compact(...) Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Updated CHANGELOG.md Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Calculate overlapping inside populateBlock Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * MinTime and MaxTime for BlockReader. Using this to find overlapping blocks in populateBlock() Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Sort blocks w.r.t. MinTime in reload() Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Log about overlapping in LeveledCompactor.write() instead of returning bool Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Log about overlapping inside LeveledCompactor.populateBlock() Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Fix review comments Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Refactor createBlock to take optional []Series Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * review1 Signed-off-by: Krasi Georgiev <kgeorgie@redhat.com> * Updated CHANGELOG and minor nits Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * nits Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Updated CHANGELOG Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Refactor iterator and seek benchmarks for Querier. Also has as overlapping blocks. Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Additional test case Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * genSeries takes optional labels. Updated BenchmarkQueryIterator and BenchmarkQuerySeek. Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Split genSeries into genSeries and populateSeries Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Check error in benchmark Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Fix review comments Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Warn about overlapping blocks in reload() Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in>
2019-02-14 05:29:41 -08:00
res := c.selectOverlappingDirs(dms)
if len(res) > 0 {
return res, nil
}
// No overlapping blocks or overlapping block compaction not allowed, do compaction the usual way.
// We do not include a recently created block with max(minTime), so the block which was just created from WAL.
// This gives users a window of a full block size to piece-wise backup new data without having to care about data overlap.
dms = dms[:len(dms)-1]
2017-08-09 02:10:29 -07:00
for _, dm := range c.selectDirs(dms) {
res = append(res, dm.dir)
}
2017-08-09 02:10:29 -07:00
if len(res) > 0 {
return res, nil
}
// Compact any blocks with big enough time range that have >5% tombstones.
for i := len(dms) - 1; i >= 0; i-- {
meta := dms[i].meta
2017-09-01 02:46:46 -07:00
if meta.MaxTime-meta.MinTime < c.ranges[len(c.ranges)/2] {
// If the block is entirely deleted, then we don't care about the block being big enough.
// TODO: This is assuming single tombstone is for distinct series, which might be no true.
if meta.Stats.NumTombstones > 0 && meta.Stats.NumTombstones >= meta.Stats.NumSeries {
return []string{dms[i].dir}, nil
}
break
}
2017-09-01 02:46:46 -07:00
if float64(meta.Stats.NumTombstones)/float64(meta.Stats.NumSeries+1) > 0.05 {
2017-08-09 02:10:29 -07:00
return []string{dms[i].dir}, nil
}
}
return nil, nil
}
2017-08-03 09:33:13 -07:00
// selectDirs returns the dir metas that should be compacted into a single new block.
// If only a single block range is configured, the result is always nil.
2017-08-09 02:10:29 -07:00
func (c *LeveledCompactor) selectDirs(ds []dirMeta) []dirMeta {
2017-09-01 02:46:46 -07:00
if len(c.ranges) < 2 || len(ds) < 1 {
return nil
}
2017-08-03 09:33:13 -07:00
highTime := ds[len(ds)-1].meta.MinTime
2017-09-01 02:46:46 -07:00
for _, iv := range c.ranges[1:] {
2017-08-03 09:33:13 -07:00
parts := splitByRange(ds, iv)
if len(parts) == 0 {
continue
2017-01-03 06:43:26 -08:00
}
Outer:
2017-08-03 09:33:13 -07:00
for _, p := range parts {
// Do not select the range if it has a block whose compaction failed.
for _, dm := range p {
if dm.meta.Compaction.Failed {
continue Outer
}
}
2017-08-03 09:33:13 -07:00
mint := p[0].meta.MinTime
maxt := p[len(p)-1].meta.MaxTime
// Pick the range of blocks if it spans the full range (potentially with gaps)
// or is before the most recent block.
// This ensures we don't compact blocks prematurely when another one of the same
// size still fits in the range.
if (maxt-mint == iv || maxt <= highTime) && len(p) > 1 {
return p
}
}
}
2017-08-03 09:33:13 -07:00
return nil
}
// selectOverlappingDirs returns all dirs with overlapping time ranges.
// It expects sorted input by mint and returns the overlapping dirs in the same order as received.
Vertical query merging and compaction (#370) * Vertical series iterator Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Select overlapped blocks first in compactor Plan() Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Added vertical compaction Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Code cleanup and comments Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Fix review comments Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Fix tests Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Add benchmark for compaction Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Perform vertical compaction only when blocks are overlapping. Actions for vertical compaction: * Sorting chunk metas * Calling chunks.MergeOverlappingChunks on the chunks Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Benchmark for vertical compaction * BenchmarkNormalCompaction => BenchmarkCompaction * Moved the benchmark from db_test.go to compact_test.go Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Benchmark for query iterator and seek for non overlapping blocks Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Vertical query merge only for overlapping blocks Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Simplify logging in Compact(...) Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Updated CHANGELOG.md Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Calculate overlapping inside populateBlock Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * MinTime and MaxTime for BlockReader. Using this to find overlapping blocks in populateBlock() Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Sort blocks w.r.t. MinTime in reload() Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Log about overlapping in LeveledCompactor.write() instead of returning bool Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Log about overlapping inside LeveledCompactor.populateBlock() Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Fix review comments Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Refactor createBlock to take optional []Series Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * review1 Signed-off-by: Krasi Georgiev <kgeorgie@redhat.com> * Updated CHANGELOG and minor nits Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * nits Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Updated CHANGELOG Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Refactor iterator and seek benchmarks for Querier. Also has as overlapping blocks. Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Additional test case Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * genSeries takes optional labels. Updated BenchmarkQueryIterator and BenchmarkQuerySeek. Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Split genSeries into genSeries and populateSeries Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Check error in benchmark Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Fix review comments Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Warn about overlapping blocks in reload() Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in>
2019-02-14 05:29:41 -08:00
func (c *LeveledCompactor) selectOverlappingDirs(ds []dirMeta) []string {
if !c.enableOverlappingCompaction {
return nil
}
Vertical query merging and compaction (#370) * Vertical series iterator Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Select overlapped blocks first in compactor Plan() Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Added vertical compaction Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Code cleanup and comments Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Fix review comments Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Fix tests Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Add benchmark for compaction Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Perform vertical compaction only when blocks are overlapping. Actions for vertical compaction: * Sorting chunk metas * Calling chunks.MergeOverlappingChunks on the chunks Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Benchmark for vertical compaction * BenchmarkNormalCompaction => BenchmarkCompaction * Moved the benchmark from db_test.go to compact_test.go Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Benchmark for query iterator and seek for non overlapping blocks Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Vertical query merge only for overlapping blocks Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Simplify logging in Compact(...) Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Updated CHANGELOG.md Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Calculate overlapping inside populateBlock Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * MinTime and MaxTime for BlockReader. Using this to find overlapping blocks in populateBlock() Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Sort blocks w.r.t. MinTime in reload() Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Log about overlapping in LeveledCompactor.write() instead of returning bool Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Log about overlapping inside LeveledCompactor.populateBlock() Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Fix review comments Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Refactor createBlock to take optional []Series Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * review1 Signed-off-by: Krasi Georgiev <kgeorgie@redhat.com> * Updated CHANGELOG and minor nits Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * nits Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Updated CHANGELOG Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Refactor iterator and seek benchmarks for Querier. Also has as overlapping blocks. Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Additional test case Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * genSeries takes optional labels. Updated BenchmarkQueryIterator and BenchmarkQuerySeek. Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Split genSeries into genSeries and populateSeries Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Check error in benchmark Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Fix review comments Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Warn about overlapping blocks in reload() Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in>
2019-02-14 05:29:41 -08:00
if len(ds) < 2 {
return nil
}
var overlappingDirs []string
globalMaxt := ds[0].meta.MaxTime
for i, d := range ds[1:] {
if d.meta.MinTime < globalMaxt {
if len(overlappingDirs) == 0 { // When it is the first overlap, need to add the last one as well.
overlappingDirs = append(overlappingDirs, ds[i].dir)
}
overlappingDirs = append(overlappingDirs, d.dir)
} else if len(overlappingDirs) > 0 {
break
Vertical query merging and compaction (#370) * Vertical series iterator Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Select overlapped blocks first in compactor Plan() Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Added vertical compaction Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Code cleanup and comments Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Fix review comments Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Fix tests Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Add benchmark for compaction Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Perform vertical compaction only when blocks are overlapping. Actions for vertical compaction: * Sorting chunk metas * Calling chunks.MergeOverlappingChunks on the chunks Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Benchmark for vertical compaction * BenchmarkNormalCompaction => BenchmarkCompaction * Moved the benchmark from db_test.go to compact_test.go Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Benchmark for query iterator and seek for non overlapping blocks Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Vertical query merge only for overlapping blocks Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Simplify logging in Compact(...) Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Updated CHANGELOG.md Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Calculate overlapping inside populateBlock Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * MinTime and MaxTime for BlockReader. Using this to find overlapping blocks in populateBlock() Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Sort blocks w.r.t. MinTime in reload() Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Log about overlapping in LeveledCompactor.write() instead of returning bool Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Log about overlapping inside LeveledCompactor.populateBlock() Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Fix review comments Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Refactor createBlock to take optional []Series Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * review1 Signed-off-by: Krasi Georgiev <kgeorgie@redhat.com> * Updated CHANGELOG and minor nits Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * nits Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Updated CHANGELOG Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Refactor iterator and seek benchmarks for Querier. Also has as overlapping blocks. Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Additional test case Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * genSeries takes optional labels. Updated BenchmarkQueryIterator and BenchmarkQuerySeek. Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Split genSeries into genSeries and populateSeries Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Check error in benchmark Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Fix review comments Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Warn about overlapping blocks in reload() Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in>
2019-02-14 05:29:41 -08:00
}
if d.meta.MaxTime > globalMaxt {
globalMaxt = d.meta.MaxTime
}
}
return overlappingDirs
}
2017-08-03 09:33:13 -07:00
// splitByRange splits the directories by the time range. The range sequence starts at 0.
//
// For example, if we have blocks [0-10, 10-20, 50-60, 90-100] and the split range tr is 30
// it returns [0-10, 10-20], [50-60], [90-100].
func splitByRange(ds []dirMeta, tr int64) [][]dirMeta {
var splitDirs [][]dirMeta
for i := 0; i < len(ds); {
2017-08-03 09:33:13 -07:00
var (
group []dirMeta
t0 int64
m = ds[i].meta
)
// Compute start of aligned time range of size tr closest to the current block's start.
2017-07-13 07:13:59 -07:00
if m.MinTime >= 0 {
t0 = tr * (m.MinTime / tr)
} else {
t0 = tr * ((m.MinTime - tr + 1) / tr)
}
// Skip blocks that don't fall into the range. This can happen via mis-alignment or
// by being the multiple of the intended range.
if m.MaxTime > t0+tr {
2017-07-13 07:13:59 -07:00
i++
continue
}
2017-01-19 10:45:52 -08:00
// Add all dirs to the current group that are within [t0, t0+tr].
for ; i < len(ds); i++ {
2017-07-13 07:13:59 -07:00
// Either the block falls into the next range or doesn't fit at all (checked above).
if ds[i].meta.MaxTime > t0+tr {
break
}
group = append(group, ds[i])
2017-01-19 10:45:52 -08:00
}
if len(group) > 0 {
splitDirs = append(splitDirs, group)
}
}
return splitDirs
2017-01-03 01:09:20 -08:00
}
// CompactBlockMetas merges many block metas into one, combining it's source blocks together
// and adjusting compaction level. Min/Max time of result block meta covers all input blocks.
func CompactBlockMetas(uid ulid.ULID, blocks ...*BlockMeta) *BlockMeta {
res := &BlockMeta{
ULID: uid,
}
sources := map[ulid.ULID]struct{}{}
mint := blocks[0].MinTime
maxt := blocks[0].MaxTime
2017-01-03 06:43:26 -08:00
for _, b := range blocks {
if b.MinTime < mint {
mint = b.MinTime
}
Vertical query merging and compaction (#370) * Vertical series iterator Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Select overlapped blocks first in compactor Plan() Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Added vertical compaction Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Code cleanup and comments Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Fix review comments Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Fix tests Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Add benchmark for compaction Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Perform vertical compaction only when blocks are overlapping. Actions for vertical compaction: * Sorting chunk metas * Calling chunks.MergeOverlappingChunks on the chunks Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Benchmark for vertical compaction * BenchmarkNormalCompaction => BenchmarkCompaction * Moved the benchmark from db_test.go to compact_test.go Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Benchmark for query iterator and seek for non overlapping blocks Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Vertical query merge only for overlapping blocks Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Simplify logging in Compact(...) Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Updated CHANGELOG.md Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Calculate overlapping inside populateBlock Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * MinTime and MaxTime for BlockReader. Using this to find overlapping blocks in populateBlock() Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Sort blocks w.r.t. MinTime in reload() Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Log about overlapping in LeveledCompactor.write() instead of returning bool Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Log about overlapping inside LeveledCompactor.populateBlock() Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Fix review comments Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Refactor createBlock to take optional []Series Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * review1 Signed-off-by: Krasi Georgiev <kgeorgie@redhat.com> * Updated CHANGELOG and minor nits Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * nits Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Updated CHANGELOG Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Refactor iterator and seek benchmarks for Querier. Also has as overlapping blocks. Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Additional test case Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * genSeries takes optional labels. Updated BenchmarkQueryIterator and BenchmarkQuerySeek. Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Split genSeries into genSeries and populateSeries Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Check error in benchmark Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Fix review comments Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Warn about overlapping blocks in reload() Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in>
2019-02-14 05:29:41 -08:00
if b.MaxTime > maxt {
maxt = b.MaxTime
}
2017-08-09 02:10:29 -07:00
if b.Compaction.Level > res.Compaction.Level {
res.Compaction.Level = b.Compaction.Level
}
for _, s := range b.Compaction.Sources {
sources[s] = struct{}{}
}
res.Compaction.Parents = append(res.Compaction.Parents, BlockDesc{
ULID: b.ULID,
MinTime: b.MinTime,
MaxTime: b.MaxTime,
})
}
2017-08-09 02:10:29 -07:00
res.Compaction.Level++
for s := range sources {
res.Compaction.Sources = append(res.Compaction.Sources, s)
}
sort.Slice(res.Compaction.Sources, func(i, j int) bool {
return res.Compaction.Sources[i].Compare(res.Compaction.Sources[j]) < 0
})
res.MinTime = mint
Vertical query merging and compaction (#370) * Vertical series iterator Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Select overlapped blocks first in compactor Plan() Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Added vertical compaction Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Code cleanup and comments Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Fix review comments Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Fix tests Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Add benchmark for compaction Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Perform vertical compaction only when blocks are overlapping. Actions for vertical compaction: * Sorting chunk metas * Calling chunks.MergeOverlappingChunks on the chunks Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Benchmark for vertical compaction * BenchmarkNormalCompaction => BenchmarkCompaction * Moved the benchmark from db_test.go to compact_test.go Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Benchmark for query iterator and seek for non overlapping blocks Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Vertical query merge only for overlapping blocks Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Simplify logging in Compact(...) Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Updated CHANGELOG.md Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Calculate overlapping inside populateBlock Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * MinTime and MaxTime for BlockReader. Using this to find overlapping blocks in populateBlock() Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Sort blocks w.r.t. MinTime in reload() Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Log about overlapping in LeveledCompactor.write() instead of returning bool Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Log about overlapping inside LeveledCompactor.populateBlock() Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Fix review comments Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Refactor createBlock to take optional []Series Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * review1 Signed-off-by: Krasi Georgiev <kgeorgie@redhat.com> * Updated CHANGELOG and minor nits Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * nits Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Updated CHANGELOG Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Refactor iterator and seek benchmarks for Querier. Also has as overlapping blocks. Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Additional test case Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * genSeries takes optional labels. Updated BenchmarkQueryIterator and BenchmarkQuerySeek. Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Split genSeries into genSeries and populateSeries Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Check error in benchmark Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Fix review comments Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Warn about overlapping blocks in reload() Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in>
2019-02-14 05:29:41 -08:00
res.MaxTime = maxt
2017-01-03 06:43:26 -08:00
return res
}
// CompactWithSplitting merges and splits the input blocks into shardCount number of output blocks,
// and returns slice of block IDs. Position of returned block ID in the result slice corresponds to the shard index.
// If given output block has no series, corresponding block ID will be zero ULID value.
func (c *LeveledCompactor) CompactWithSplitting(dest string, dirs []string, open []*Block, shardCount uint64) (result []ulid.ULID, _ error) {
return c.CompactWithBlockPopulator(dest, dirs, open, DefaultBlockPopulator{}, shardCount)
}
2017-08-09 02:10:29 -07:00
// Compact creates a new block in the compactor's directory from the blocks in the
// provided directories.
func (c *LeveledCompactor) Compact(dest string, dirs []string, open []*Block) (uid ulid.ULID, err error) {
ulids, err := c.CompactWithBlockPopulator(dest, dirs, open, DefaultBlockPopulator{}, 1)
if err != nil {
return ulid.ULID{}, err
}
return ulids[0], nil
}
// shardedBlock describes single *output* block during compaction. This struct is passed between
// compaction methods to wrap output block details, index and chunk writer together.
// Shard index is determined by the position of this structure in the slice of output blocks.
type shardedBlock struct {
meta *BlockMeta
blockDir string
tmpDir string // Temp directory used when block is being built (= blockDir + temp suffix)
chunkw ChunkWriter
indexw IndexWriter
}
func (c *LeveledCompactor) CompactWithBlockPopulator(dest string, dirs []string, open []*Block, blockPopulator BlockPopulator, shardCount uint64) (_ []ulid.ULID, err error) {
if shardCount == 0 {
shardCount = 1
}
start := time.Now()
bs, blocksToClose, err := openBlocksForCompaction(dirs, open, c.logger, c.chunkPool, c.concurrencyOpts.MaxOpeningBlocks)
for _, b := range blocksToClose {
defer b.Close()
}
if err != nil {
return nil, err
}
var (
blocks []BlockReader
metas []*BlockMeta
uids []string
)
for _, b := range bs {
2017-03-02 00:13:29 -08:00
blocks = append(blocks, b)
m := b.Meta()
metas = append(metas, &m)
uids = append(uids, b.meta.ULID.String())
2017-03-02 00:13:29 -08:00
}
2021-09-27 03:49:08 -07:00
outBlocks := make([]shardedBlock, shardCount)
outBlocksTime := ulid.Now() // Make all out blocks share the same timestamp in the ULID.
for ix := range outBlocks {
outBlocks[ix] = shardedBlock{meta: CompactBlockMetas(ulid.MustNew(outBlocksTime, rand.Reader), metas...)}
}
err = c.write(dest, outBlocks, blockPopulator, blocks...)
if err == nil {
2021-09-27 03:49:08 -07:00
ulids := make([]ulid.ULID, len(outBlocks))
allOutputBlocksAreEmpty := true
for ix := range outBlocks {
meta := outBlocks[ix].meta
if meta.Stats.NumSamples == 0 {
level.Info(c.logger).Log(
"msg", "compact blocks resulted in empty block",
"count", len(blocks),
"sources", fmt.Sprintf("%v", uids),
"duration", time.Since(start),
"shard", fmt.Sprintf("%d_of_%d", ix+1, shardCount),
)
} else {
allOutputBlocksAreEmpty = false
ulids[ix] = outBlocks[ix].meta.ULID
level.Info(c.logger).Log(
"msg", "compact blocks",
"count", len(blocks),
"mint", meta.MinTime,
"maxt", meta.MaxTime,
"ulid", meta.ULID,
"sources", fmt.Sprintf("%v", uids),
"duration", time.Since(start),
"shard", fmt.Sprintf("%d_of_%d", ix+1, shardCount),
)
}
}
if allOutputBlocksAreEmpty {
// Mark source blocks as deletable.
for _, b := range bs {
b.meta.Compaction.Deletable = true
n, err := writeMetaFile(c.logger, b.dir, &b.meta)
if err != nil {
level.Error(c.logger).Log(
"msg", "Failed to write 'Deletable' to meta file after compaction",
"ulid", b.meta.ULID,
)
}
b.numBytesMeta = n
}
}
return ulids, nil
}
errs := tsdb_errors.NewMulti(err)
if !errors.Is(err, context.Canceled) {
for _, b := range bs {
if err := b.setCompactionFailed(); err != nil {
errs.Add(errors.Wrapf(err, "setting compaction failed for block: %s", b.Dir()))
}
}
}
return nil, errs.Err()
}
// CompactOOOWithSplitting splits the input OOO Head into shardCount number of output blocks
// per possible block range, and returns slice of block IDs. In result[i][j],
// 'i' corresponds to a single time range of blocks while 'j' corresponds to the shard index.
// If given output block has no series, corresponding block ID will be zero ULID value.
// TODO: write tests for this.
func (c *LeveledCompactor) CompactOOOWithSplitting(dest string, oooHead *OOOCompactionHead, shardCount uint64) (result [][]ulid.ULID, _ error) {
return c.compactOOO(dest, oooHead, shardCount)
}
// CompactOOO creates a new block per possible block range in the compactor's directory from the OOO Head given.
// Each ULID in the result corresponds to a block in a unique time range.
func (c *LeveledCompactor) CompactOOO(dest string, oooHead *OOOCompactionHead) (result []ulid.ULID, err error) {
ulids, err := c.compactOOO(dest, oooHead, 1)
if err != nil {
return nil, err
}
for _, s := range ulids {
if s[0].Compare(ulid.ULID{}) != 0 {
result = append(result, s[0])
}
}
return result, err
}
func (c *LeveledCompactor) compactOOO(dest string, oooHead *OOOCompactionHead, shardCount uint64) (_ [][]ulid.ULID, err error) {
if shardCount == 0 {
shardCount = 1
}
start := time.Now()
// The first dimension of outBlocks determines the time based splitting (i.e. outBlocks[i] has blocks all for the same time range).
// The second dimension of outBlocks determines the label based shard (i.e. outBlocks[i][j] is the (j+1)th shard.
// During ingestion of samples we can identify which ooo blocks will exists so that
// we dont have to prefill symbols and etc for the blocks that will be empty.
// With this, len(outBlocks[x]) will still be the same for all x so that we can pick blocks easily.
// Just that, only some of the outBlocks[x][y] will be valid and populated based on preexisting knowledge of
// which blocks to expect.
// In case we see a sample that is not present in the estimated block ranges, we will create them on flight.
outBlocks := make([][]shardedBlock, 0)
outBlocksTime := ulid.Now() // Make all out blocks share the same timestamp in the ULID.
blockSize := oooHead.ChunkRange()
oooHeadMint, oooHeadMaxt := oooHead.MinTime(), oooHead.MaxTime()
ulids := make([][]ulid.ULID, 0)
2023-04-26 07:22:31 -07:00
for t := blockSize * (oooHeadMint / blockSize); t <= oooHeadMaxt; t += blockSize {
mint, maxt := t, t+blockSize
outBlocks = append(outBlocks, make([]shardedBlock, shardCount))
ulids = append(ulids, make([]ulid.ULID, shardCount))
ix := len(outBlocks) - 1
for jx := range outBlocks[ix] {
uid := ulid.MustNew(outBlocksTime, rand.Reader)
meta := &BlockMeta{
ULID: uid,
MinTime: mint,
MaxTime: maxt,
OutOfOrder: true,
}
meta.Compaction.Level = 1
meta.Compaction.Sources = []ulid.ULID{uid}
outBlocks[ix][jx] = shardedBlock{
meta: meta,
}
ulids[ix][jx] = meta.ULID
}
// Block intervals are half-open: [b.MinTime, b.MaxTime). Block intervals are always +1 than the total samples it includes.
err := c.write(dest, outBlocks[ix], DefaultBlockPopulator{}, oooHead.CloneForTimeRange(mint, maxt-1))
if err != nil {
// We need to delete all blocks in case there was an error.
for _, obs := range outBlocks {
for _, ob := range obs {
if ob.tmpDir != "" {
if removeErr := os.RemoveAll(ob.tmpDir); removeErr != nil {
level.Error(c.logger).Log("msg", "Failed to remove temp folder after failed compaction", "dir", ob.tmpDir, "err", removeErr.Error())
}
}
if ob.blockDir != "" {
if removeErr := os.RemoveAll(ob.blockDir); removeErr != nil {
level.Error(c.logger).Log("msg", "Failed to remove block folder after failed compaction", "dir", ob.blockDir, "err", removeErr.Error())
}
}
}
}
return nil, err
}
}
noOOOBlock := true
for ix, obs := range outBlocks {
for jx := range obs {
meta := outBlocks[ix][jx].meta
if meta.Stats.NumSamples != 0 {
noOOOBlock = false
level.Info(c.logger).Log(
"msg", "compact ooo head",
"mint", meta.MinTime,
"maxt", meta.MaxTime,
"ulid", meta.ULID,
"duration", time.Since(start),
"shard", fmt.Sprintf("%d_of_%d", jx+1, shardCount),
)
} else {
// This block did not get any data. So clear out the ulid to signal this.
ulids[ix][jx] = ulid.ULID{}
}
}
}
if noOOOBlock {
level.Info(c.logger).Log(
"msg", "compact ooo head resulted in no blocks",
"duration", time.Since(start),
)
return nil, nil
}
return ulids, nil
}
func (c *LeveledCompactor) Write(dest string, b BlockReader, mint, maxt int64, parent *BlockMeta) (ulid.ULID, error) {
start := time.Now()
uid := ulid.MustNew(ulid.Now(), rand.Reader)
meta := &BlockMeta{
ULID: uid,
MinTime: mint,
MaxTime: maxt,
}
meta.Compaction.Level = 1
meta.Compaction.Sources = []ulid.ULID{uid}
if parent != nil {
meta.Compaction.Parents = []BlockDesc{
{ULID: parent.ULID, MinTime: parent.MinTime, MaxTime: parent.MaxTime},
}
}
err := c.write(dest, []shardedBlock{{meta: meta}}, DefaultBlockPopulator{}, b)
if err != nil {
return uid, err
}
if meta.Stats.NumSamples == 0 {
level.Info(c.logger).Log(
"msg", "write block resulted in empty block",
"mint", meta.MinTime,
"maxt", meta.MaxTime,
"duration", time.Since(start),
)
return ulid.ULID{}, nil
}
level.Info(c.logger).Log(
"msg", "write block",
"mint", meta.MinTime,
"maxt", meta.MaxTime,
"ulid", meta.ULID,
"duration", time.Since(start),
)
return uid, nil
2017-03-02 00:13:29 -08:00
}
// instrumentedChunkWriter is used for level 1 compactions to record statistics
// about compacted chunks.
type instrumentedChunkWriter struct {
ChunkWriter
size prometheus.Histogram
samples prometheus.Histogram
trange prometheus.Histogram
}
func (w *instrumentedChunkWriter) WriteChunks(chunks ...chunks.Meta) error {
for _, c := range chunks {
w.size.Observe(float64(len(c.Chunk.Bytes())))
w.samples.Observe(float64(c.Chunk.NumSamples()))
w.trange.Observe(float64(c.MaxTime - c.MinTime))
}
return w.ChunkWriter.WriteChunks(chunks...)
}
// write creates new output blocks that are the union of the provided blocks into dir.
func (c *LeveledCompactor) write(dest string, outBlocks []shardedBlock, blockPopulator BlockPopulator, blocks ...BlockReader) (err error) {
var closers []io.Closer
2017-03-02 00:13:29 -08:00
defer func(t time.Time) {
err = tsdb_errors.NewMulti(err, tsdb_errors.CloseAll(closers)).Err()
for _, ob := range outBlocks {
if ob.tmpDir != "" {
// RemoveAll returns no error when tmp doesn't exist so it is safe to always run it.
if removeErr := os.RemoveAll(ob.tmpDir); removeErr != nil {
2021-09-28 01:29:54 -07:00
level.Error(c.logger).Log("msg", "Failed to remove temp folder after failed compaction", "dir", ob.tmpDir, "err", removeErr.Error())
}
}
// If there was any error, and we have multiple output blocks, some blocks may have been generated, or at
// least have existing blockDir. In such case, we want to remove them.
// BlockDir may also not be set yet, if preparation for some previous blocks have failed.
if err != nil && ob.blockDir != "" {
// RemoveAll returns no error when tmp doesn't exist so it is safe to always run it.
if removeErr := os.RemoveAll(ob.blockDir); removeErr != nil {
2021-09-28 01:29:54 -07:00
level.Error(c.logger).Log("msg", "Failed to remove block folder after failed compaction", "dir", ob.blockDir, "err", removeErr.Error())
}
}
}
c.metrics.Ran.Inc()
c.metrics.Duration.Observe(time.Since(t).Seconds())
2017-03-02 00:13:29 -08:00
}(time.Now())
2017-01-03 06:43:26 -08:00
for ix := range outBlocks {
dir := filepath.Join(dest, outBlocks[ix].meta.ULID.String())
tmp := dir + tmpForCreationBlockDirSuffix
2017-02-19 07:04:37 -08:00
outBlocks[ix].blockDir = dir
outBlocks[ix].tmpDir = tmp
if err = os.RemoveAll(tmp); err != nil {
return err
}
if err = os.MkdirAll(tmp, 0o777); err != nil {
return err
}
// Populate chunk and index files into temporary directory with
// data of all blocks.
var chunkw ChunkWriter
chunkw, err = chunks.NewWriterWithSegSize(chunkDir(tmp), c.maxBlockChunkSegmentSize)
if err != nil {
return errors.Wrap(err, "open chunk writer")
}
chunkw = newPreventDoubleCloseChunkWriter(chunkw) // We now close chunkWriter in populateBlock, but keep it in the closers here as well.
closers = append(closers, chunkw)
// Record written chunk sizes on level 1 compactions.
if outBlocks[ix].meta.Compaction.Level == 1 {
chunkw = &instrumentedChunkWriter{
ChunkWriter: chunkw,
size: c.metrics.ChunkSize,
samples: c.metrics.ChunkSamples,
trange: c.metrics.ChunkRange,
}
}
outBlocks[ix].chunkw = chunkw
var indexw IndexWriter
indexw, err = index.NewWriter(c.ctx, filepath.Join(tmp, indexFilename))
if err != nil {
return errors.Wrap(err, "open index writer")
}
indexw = newPreventDoubleCloseIndexWriter(indexw) // We now close indexWriter in populateBlock, but keep it in the closers here as well.
closers = append(closers, indexw)
outBlocks[ix].indexw = indexw
}
2017-01-03 06:43:26 -08:00
2021-09-27 07:33:43 -07:00
// We use MinTime and MaxTime from first output block, because ALL output blocks have the same min/max times set.
if err := blockPopulator.PopulateBlock(c.ctx, c.metrics, c.logger, c.chunkPool, c.mergeFunc, c.concurrencyOpts, blocks, outBlocks[0].meta.MinTime, outBlocks[0].meta.MaxTime, outBlocks); err != nil {
tsdb: Added ChunkQueryable implementations to db; unified MergeSeriesSets and vertical to single struct. (#7069) * tsdb: Added ChunkQueryable implementations to db; unified compactor, querier and fanout block iterating. Chained to https://github.com/prometheus/prometheus/pull/7059 * NewMerge(Chunk)Querier now takies multiple primaries allowing tsdb DB code to use it. * Added single SeriesEntry / ChunkEntry for all series implementations. * Unified all vertical, and non vertical for compact and querying to single merge series / chunk sets by reusing VerticalSeriesMergeFunc for overlapping algorithm (same logic as before) * Added block (Base/Chunk/)Querier for block querying. We then use populateAndTomb(Base/Chunk/) to iterate over chunks or samples. * Refactored endpoint tests and querier tests to include subtests. Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com> * Addressed comments from Brian and Beorn. Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com> * Fixed snapshot test and added chunk iterator support for DBReadOnly. Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com> * Fixed race when iterating over Ats first. Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com> * Fixed tests. Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com> * Fixed populate block tests. Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com> * Fixed endpoints test. Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com> * Fixed test. Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com> * Added test & fixed case of head open chunk. Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com> * Fixed DBReadOnly tests and bug producing 1 sample chunks. Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com> * Added cases for partial block overlap for multiple full chunks. Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com> * Added extra tests for chunk meta after compaction. Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com> * Fixed small vertical merge bug and added more tests for that. Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com>
2020-07-31 08:03:02 -07:00
return errors.Wrap(err, "populate block")
2017-01-03 06:43:26 -08:00
}
2021-09-27 07:33:43 -07:00
select {
case <-c.ctx.Done():
return c.ctx.Err()
default:
}
// We are explicitly closing them here to check for error even
// though these are covered under defer. This is because in Windows,
// you cannot delete these unless they are closed and the defer is to
// make sure they are closed if the function exits due to an error above.
errs := tsdb_errors.NewMulti()
for _, w := range closers {
errs.Add(w.Close())
}
closers = closers[:0] // Avoid closing the writers twice in the defer.
if errs.Err() != nil {
return errs.Err()
2017-01-03 06:43:26 -08:00
}
for _, ob := range outBlocks {
// Populated block is empty, don't write meta file for it.
if ob.meta.Stats.NumSamples == 0 {
continue
}
if _, err = writeMetaFile(c.logger, ob.tmpDir, ob.meta); err != nil {
return errors.Wrap(err, "write merged meta")
}
// Create an empty tombstones file.
if _, err := tombstones.WriteFile(c.logger, ob.tmpDir, tombstones.NewMemTombstones()); err != nil {
return errors.Wrap(err, "write new tombstones file")
}
df, err := fileutil.OpenDir(ob.tmpDir)
if err != nil {
return errors.Wrap(err, "open temporary block dir")
}
defer func() {
if df != nil {
2021-09-27 07:33:43 -07:00
df.Close()
}
}()
2017-06-11 15:05:04 -07:00
if err := df.Sync(); err != nil {
return errors.Wrap(err, "sync temporary dir file")
}
// Close temp dir before rename block dir (for windows platform).
if err = df.Close(); err != nil {
return errors.Wrap(err, "close temporary dir")
}
df = nil
// Block successfully written, make it visible in destination dir by moving it from tmp one.
if err := fileutil.Replace(ob.tmpDir, ob.blockDir); err != nil {
return errors.Wrap(err, "rename block dir")
}
2017-10-04 01:42:25 -07:00
}
return nil
2017-01-03 06:43:26 -08:00
}
func debugOutOfOrderChunks(chks []chunks.Meta, logger log.Logger) {
if len(chks) <= 1 {
return
}
prevChk := chks[0]
for i := 1; i < len(chks); i++ {
currChk := chks[i]
if currChk.MinTime > prevChk.MaxTime {
// Not out of order.
continue
}
// Looks like the chunk is out of order.
prevSafeChk, prevIsSafeChk := prevChk.Chunk.(*safeChunk)
currSafeChk, currIsSafeChk := currChk.Chunk.(*safeChunk)
// Get info out of safeChunk (if possible).
prevHeadChunkID := chunks.HeadChunkID(0)
currHeadChunkID := chunks.HeadChunkID(0)
prevLabels := labels.Labels{}
currLabels := labels.Labels{}
if prevSafeChk != nil {
prevHeadChunkID = prevSafeChk.cid
prevLabels = prevSafeChk.s.lset
}
if currSafeChk != nil {
currHeadChunkID = currSafeChk.cid
currLabels = currSafeChk.s.lset
}
level.Warn(logger).Log(
"msg", "found out-of-order chunk when compacting",
"prev_ref", prevChk.Ref,
"curr_ref", currChk.Ref,
"prev_min_time", timeFromMillis(prevChk.MinTime).UTC().String(),
"prev_max_time", timeFromMillis(prevChk.MaxTime).UTC().String(),
"curr_min_time", timeFromMillis(currChk.MinTime).UTC().String(),
"curr_max_time", timeFromMillis(currChk.MaxTime).UTC().String(),
"prev_samples", prevChk.Chunk.NumSamples(),
"curr_samples", currChk.Chunk.NumSamples(),
"prev_is_safe_chunk", prevIsSafeChk,
"curr_is_safe_chunk", currIsSafeChk,
"prev_head_chunk_id", prevHeadChunkID,
"curr_head_chunk_id", currHeadChunkID,
"prev_labelset", prevLabels.String(),
"curr_labelset", currLabels.String(),
"num_chunks_for_series", len(chks),
)
}
}
func timeFromMillis(ms int64) time.Time {
return time.Unix(0, ms*int64(time.Millisecond))
}
type BlockPopulator interface {
PopulateBlock(ctx context.Context, metrics *CompactorMetrics, logger log.Logger, chunkPool chunkenc.Pool, mergeFunc storage.VerticalChunkSeriesMergeFunc, concurrencyOpts LeveledCompactorConcurrencyOptions, blocks []BlockReader, minT, maxT int64, outBlocks []shardedBlock) error
}
type DefaultBlockPopulator struct{}
// PopulateBlock fills the index and chunk writers with new data gathered as the union
// of the provided blocks. It returns meta information for the new block.
Vertical query merging and compaction (#370) * Vertical series iterator Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Select overlapped blocks first in compactor Plan() Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Added vertical compaction Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Code cleanup and comments Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Fix review comments Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Fix tests Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Add benchmark for compaction Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Perform vertical compaction only when blocks are overlapping. Actions for vertical compaction: * Sorting chunk metas * Calling chunks.MergeOverlappingChunks on the chunks Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Benchmark for vertical compaction * BenchmarkNormalCompaction => BenchmarkCompaction * Moved the benchmark from db_test.go to compact_test.go Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Benchmark for query iterator and seek for non overlapping blocks Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Vertical query merge only for overlapping blocks Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Simplify logging in Compact(...) Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Updated CHANGELOG.md Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Calculate overlapping inside populateBlock Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * MinTime and MaxTime for BlockReader. Using this to find overlapping blocks in populateBlock() Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Sort blocks w.r.t. MinTime in reload() Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Log about overlapping in LeveledCompactor.write() instead of returning bool Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Log about overlapping inside LeveledCompactor.populateBlock() Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Fix review comments Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Refactor createBlock to take optional []Series Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * review1 Signed-off-by: Krasi Georgiev <kgeorgie@redhat.com> * Updated CHANGELOG and minor nits Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * nits Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Updated CHANGELOG Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Refactor iterator and seek benchmarks for Querier. Also has as overlapping blocks. Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Additional test case Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * genSeries takes optional labels. Updated BenchmarkQueryIterator and BenchmarkQuerySeek. Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Split genSeries into genSeries and populateSeries Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Check error in benchmark Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Fix review comments Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Warn about overlapping blocks in reload() Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in>
2019-02-14 05:29:41 -08:00
// It expects sorted blocks input by mint.
2023-04-17 19:08:43 -07:00
// If there is more than 1 output block, each output block will only contain series that hash into its shard
// (based on total number of output blocks).
func (c DefaultBlockPopulator) PopulateBlock(ctx context.Context, metrics *CompactorMetrics, logger log.Logger, chunkPool chunkenc.Pool, mergeFunc storage.VerticalChunkSeriesMergeFunc, concurrencyOpts LeveledCompactorConcurrencyOptions, blocks []BlockReader, minT, maxT int64, outBlocks []shardedBlock) (err error) {
if len(blocks) == 0 {
return errors.New("cannot populate block(s) from no readers")
}
var (
tsdb: Added ChunkQueryable implementations to db; unified MergeSeriesSets and vertical to single struct. (#7069) * tsdb: Added ChunkQueryable implementations to db; unified compactor, querier and fanout block iterating. Chained to https://github.com/prometheus/prometheus/pull/7059 * NewMerge(Chunk)Querier now takies multiple primaries allowing tsdb DB code to use it. * Added single SeriesEntry / ChunkEntry for all series implementations. * Unified all vertical, and non vertical for compact and querying to single merge series / chunk sets by reusing VerticalSeriesMergeFunc for overlapping algorithm (same logic as before) * Added block (Base/Chunk/)Querier for block querying. We then use populateAndTomb(Base/Chunk/) to iterate over chunks or samples. * Refactored endpoint tests and querier tests to include subtests. Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com> * Addressed comments from Brian and Beorn. Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com> * Fixed snapshot test and added chunk iterator support for DBReadOnly. Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com> * Fixed race when iterating over Ats first. Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com> * Fixed tests. Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com> * Fixed populate block tests. Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com> * Fixed endpoints test. Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com> * Fixed test. Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com> * Added test & fixed case of head open chunk. Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com> * Fixed DBReadOnly tests and bug producing 1 sample chunks. Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com> * Added cases for partial block overlap for multiple full chunks. Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com> * Added extra tests for chunk meta after compaction. Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com> * Fixed small vertical merge bug and added more tests for that. Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com>
2020-07-31 08:03:02 -07:00
sets []storage.ChunkSeriesSet
symbolsSets []storage.ChunkSeriesSet // series sets used for finding symbols. Only used when doing sharding.
Stream symbols during compaction. (#6468) Rather than buffer up symbols in RAM, do it one by one during compaction. Then use the reader's symbol handling for symbol lookups during the rest of the index write. There is some slowdown in compaction, due to having to look through a file rather than a hash lookup. This is noise to the overall cost of compacting series with thousands of samples though. benchmark old ns/op new ns/op delta BenchmarkCompaction/type=normal,blocks=4,series=10000,samplesPerSeriesPerBlock=101-4 539917175 675341565 +25.08% BenchmarkCompaction/type=normal,blocks=4,series=10000,samplesPerSeriesPerBlock=1001-4 2441815993 2477453524 +1.46% BenchmarkCompaction/type=normal,blocks=4,series=10000,samplesPerSeriesPerBlock=2001-4 3978543559 3922909687 -1.40% BenchmarkCompaction/type=normal,blocks=4,series=10000,samplesPerSeriesPerBlock=5001-4 8430219716 8586610007 +1.86% BenchmarkCompaction/type=vertical,blocks=4,series=10000,samplesPerSeriesPerBlock=101-4 1786424591 1909552782 +6.89% BenchmarkCompaction/type=vertical,blocks=4,series=10000,samplesPerSeriesPerBlock=1001-4 5328998202 6020839950 +12.98% BenchmarkCompaction/type=vertical,blocks=4,series=10000,samplesPerSeriesPerBlock=2001-4 10085059958 11085278690 +9.92% BenchmarkCompaction/type=vertical,blocks=4,series=10000,samplesPerSeriesPerBlock=5001-4 25497010155 27018079806 +5.97% BenchmarkCompactionFromHead/labelnames=1,labelvalues=100000-4 2427391406 2817217987 +16.06% BenchmarkCompactionFromHead/labelnames=10,labelvalues=10000-4 2592965497 2538805050 -2.09% BenchmarkCompactionFromHead/labelnames=100,labelvalues=1000-4 2437388343 2668012858 +9.46% BenchmarkCompactionFromHead/labelnames=1000,labelvalues=100-4 2317095324 2787423966 +20.30% BenchmarkCompactionFromHead/labelnames=10000,labelvalues=10-4 2600239857 2096973860 -19.35% benchmark old allocs new allocs delta BenchmarkCompaction/type=normal,blocks=4,series=10000,samplesPerSeriesPerBlock=101-4 500851 470794 -6.00% BenchmarkCompaction/type=normal,blocks=4,series=10000,samplesPerSeriesPerBlock=1001-4 821527 791451 -3.66% BenchmarkCompaction/type=normal,blocks=4,series=10000,samplesPerSeriesPerBlock=2001-4 1141562 1111508 -2.63% BenchmarkCompaction/type=normal,blocks=4,series=10000,samplesPerSeriesPerBlock=5001-4 2141576 2111504 -1.40% BenchmarkCompaction/type=vertical,blocks=4,series=10000,samplesPerSeriesPerBlock=101-4 871466 841424 -3.45% BenchmarkCompaction/type=vertical,blocks=4,series=10000,samplesPerSeriesPerBlock=1001-4 1941428 1911415 -1.55% BenchmarkCompaction/type=vertical,blocks=4,series=10000,samplesPerSeriesPerBlock=2001-4 3071573 3041510 -0.98% BenchmarkCompaction/type=vertical,blocks=4,series=10000,samplesPerSeriesPerBlock=5001-4 6771648 6741509 -0.45% BenchmarkCompactionFromHead/labelnames=1,labelvalues=100000-4 731493 824888 +12.77% BenchmarkCompactionFromHead/labelnames=10,labelvalues=10000-4 793918 887311 +11.76% BenchmarkCompactionFromHead/labelnames=100,labelvalues=1000-4 811842 905204 +11.50% BenchmarkCompactionFromHead/labelnames=1000,labelvalues=100-4 832244 925081 +11.16% BenchmarkCompactionFromHead/labelnames=10000,labelvalues=10-4 921553 1019162 +10.59% benchmark old bytes new bytes delta BenchmarkCompaction/type=normal,blocks=4,series=10000,samplesPerSeriesPerBlock=101-4 40532648 35698276 -11.93% BenchmarkCompaction/type=normal,blocks=4,series=10000,samplesPerSeriesPerBlock=1001-4 60340216 53409568 -11.49% BenchmarkCompaction/type=normal,blocks=4,series=10000,samplesPerSeriesPerBlock=2001-4 81087336 72065552 -11.13% BenchmarkCompaction/type=normal,blocks=4,series=10000,samplesPerSeriesPerBlock=5001-4 142485576 120878544 -15.16% BenchmarkCompaction/type=vertical,blocks=4,series=10000,samplesPerSeriesPerBlock=101-4 208661368 203831136 -2.31% BenchmarkCompaction/type=vertical,blocks=4,series=10000,samplesPerSeriesPerBlock=1001-4 347345904 340484696 -1.98% BenchmarkCompaction/type=vertical,blocks=4,series=10000,samplesPerSeriesPerBlock=2001-4 585185856 576244648 -1.53% BenchmarkCompaction/type=vertical,blocks=4,series=10000,samplesPerSeriesPerBlock=5001-4 1357641792 1358966528 +0.10% BenchmarkCompactionFromHead/labelnames=1,labelvalues=100000-4 126486664 119666744 -5.39% BenchmarkCompactionFromHead/labelnames=10,labelvalues=10000-4 122323192 115117224 -5.89% BenchmarkCompactionFromHead/labelnames=100,labelvalues=1000-4 126404504 119469864 -5.49% BenchmarkCompactionFromHead/labelnames=1000,labelvalues=100-4 119047832 112230408 -5.73% BenchmarkCompactionFromHead/labelnames=10000,labelvalues=10-4 136576016 116634800 -14.60% Signed-off-by: Brian Brazil <brian.brazil@robustperception.io>
2019-12-17 11:49:54 -08:00
symbols index.StringIter
tsdb: Added ChunkQueryable implementations to db; unified MergeSeriesSets and vertical to single struct. (#7069) * tsdb: Added ChunkQueryable implementations to db; unified compactor, querier and fanout block iterating. Chained to https://github.com/prometheus/prometheus/pull/7059 * NewMerge(Chunk)Querier now takies multiple primaries allowing tsdb DB code to use it. * Added single SeriesEntry / ChunkEntry for all series implementations. * Unified all vertical, and non vertical for compact and querying to single merge series / chunk sets by reusing VerticalSeriesMergeFunc for overlapping algorithm (same logic as before) * Added block (Base/Chunk/)Querier for block querying. We then use populateAndTomb(Base/Chunk/) to iterate over chunks or samples. * Refactored endpoint tests and querier tests to include subtests. Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com> * Addressed comments from Brian and Beorn. Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com> * Fixed snapshot test and added chunk iterator support for DBReadOnly. Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com> * Fixed race when iterating over Ats first. Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com> * Fixed tests. Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com> * Fixed populate block tests. Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com> * Fixed endpoints test. Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com> * Fixed test. Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com> * Added test & fixed case of head open chunk. Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com> * Fixed DBReadOnly tests and bug producing 1 sample chunks. Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com> * Added cases for partial block overlap for multiple full chunks. Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com> * Added extra tests for chunk meta after compaction. Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com> * Fixed small vertical merge bug and added more tests for that. Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com>
2020-07-31 08:03:02 -07:00
closers []io.Closer
Vertical query merging and compaction (#370) * Vertical series iterator Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Select overlapped blocks first in compactor Plan() Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Added vertical compaction Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Code cleanup and comments Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Fix review comments Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Fix tests Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Add benchmark for compaction Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Perform vertical compaction only when blocks are overlapping. Actions for vertical compaction: * Sorting chunk metas * Calling chunks.MergeOverlappingChunks on the chunks Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Benchmark for vertical compaction * BenchmarkNormalCompaction => BenchmarkCompaction * Moved the benchmark from db_test.go to compact_test.go Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Benchmark for query iterator and seek for non overlapping blocks Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Vertical query merge only for overlapping blocks Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Simplify logging in Compact(...) Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Updated CHANGELOG.md Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Calculate overlapping inside populateBlock Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * MinTime and MaxTime for BlockReader. Using this to find overlapping blocks in populateBlock() Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Sort blocks w.r.t. MinTime in reload() Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Log about overlapping in LeveledCompactor.write() instead of returning bool Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Log about overlapping inside LeveledCompactor.populateBlock() Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Fix review comments Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Refactor createBlock to take optional []Series Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * review1 Signed-off-by: Krasi Georgiev <kgeorgie@redhat.com> * Updated CHANGELOG and minor nits Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * nits Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Updated CHANGELOG Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Refactor iterator and seek benchmarks for Querier. Also has as overlapping blocks. Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Additional test case Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * genSeries takes optional labels. Updated BenchmarkQueryIterator and BenchmarkQuerySeek. Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Split genSeries into genSeries and populateSeries Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Check error in benchmark Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Fix review comments Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Warn about overlapping blocks in reload() Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in>
2019-02-14 05:29:41 -08:00
overlapping bool
)
defer func() {
errs := tsdb_errors.NewMulti(err)
if cerr := tsdb_errors.CloseAll(closers); cerr != nil {
errs.Add(errors.Wrap(cerr, "close"))
}
err = errs.Err()
metrics.PopulatingBlocks.Set(0)
}()
metrics.PopulatingBlocks.Set(1)
globalMaxt := blocks[0].Meta().MaxTime
2017-01-03 06:43:26 -08:00
for i, b := range blocks {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
Vertical query merging and compaction (#370) * Vertical series iterator Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Select overlapped blocks first in compactor Plan() Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Added vertical compaction Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Code cleanup and comments Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Fix review comments Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Fix tests Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Add benchmark for compaction Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Perform vertical compaction only when blocks are overlapping. Actions for vertical compaction: * Sorting chunk metas * Calling chunks.MergeOverlappingChunks on the chunks Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Benchmark for vertical compaction * BenchmarkNormalCompaction => BenchmarkCompaction * Moved the benchmark from db_test.go to compact_test.go Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Benchmark for query iterator and seek for non overlapping blocks Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Vertical query merge only for overlapping blocks Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Simplify logging in Compact(...) Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Updated CHANGELOG.md Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Calculate overlapping inside populateBlock Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * MinTime and MaxTime for BlockReader. Using this to find overlapping blocks in populateBlock() Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Sort blocks w.r.t. MinTime in reload() Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Log about overlapping in LeveledCompactor.write() instead of returning bool Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Log about overlapping inside LeveledCompactor.populateBlock() Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Fix review comments Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Refactor createBlock to take optional []Series Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * review1 Signed-off-by: Krasi Georgiev <kgeorgie@redhat.com> * Updated CHANGELOG and minor nits Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * nits Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Updated CHANGELOG Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Refactor iterator and seek benchmarks for Querier. Also has as overlapping blocks. Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Additional test case Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * genSeries takes optional labels. Updated BenchmarkQueryIterator and BenchmarkQuerySeek. Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Split genSeries into genSeries and populateSeries Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Check error in benchmark Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Fix review comments Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Warn about overlapping blocks in reload() Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in>
2019-02-14 05:29:41 -08:00
if !overlapping {
if i > 0 && b.Meta().MinTime < globalMaxt {
metrics.OverlappingBlocks.Inc()
Vertical query merging and compaction (#370) * Vertical series iterator Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Select overlapped blocks first in compactor Plan() Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Added vertical compaction Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Code cleanup and comments Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Fix review comments Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Fix tests Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Add benchmark for compaction Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Perform vertical compaction only when blocks are overlapping. Actions for vertical compaction: * Sorting chunk metas * Calling chunks.MergeOverlappingChunks on the chunks Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Benchmark for vertical compaction * BenchmarkNormalCompaction => BenchmarkCompaction * Moved the benchmark from db_test.go to compact_test.go Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Benchmark for query iterator and seek for non overlapping blocks Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Vertical query merge only for overlapping blocks Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Simplify logging in Compact(...) Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Updated CHANGELOG.md Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Calculate overlapping inside populateBlock Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * MinTime and MaxTime for BlockReader. Using this to find overlapping blocks in populateBlock() Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Sort blocks w.r.t. MinTime in reload() Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Log about overlapping in LeveledCompactor.write() instead of returning bool Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Log about overlapping inside LeveledCompactor.populateBlock() Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Fix review comments Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Refactor createBlock to take optional []Series Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * review1 Signed-off-by: Krasi Georgiev <kgeorgie@redhat.com> * Updated CHANGELOG and minor nits Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * nits Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Updated CHANGELOG Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Refactor iterator and seek benchmarks for Querier. Also has as overlapping blocks. Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Additional test case Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * genSeries takes optional labels. Updated BenchmarkQueryIterator and BenchmarkQuerySeek. Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Split genSeries into genSeries and populateSeries Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Check error in benchmark Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Fix review comments Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Warn about overlapping blocks in reload() Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in>
2019-02-14 05:29:41 -08:00
overlapping = true
level.Info(logger).Log("msg", "Found overlapping blocks during compaction")
Vertical query merging and compaction (#370) * Vertical series iterator Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Select overlapped blocks first in compactor Plan() Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Added vertical compaction Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Code cleanup and comments Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Fix review comments Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Fix tests Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Add benchmark for compaction Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Perform vertical compaction only when blocks are overlapping. Actions for vertical compaction: * Sorting chunk metas * Calling chunks.MergeOverlappingChunks on the chunks Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Benchmark for vertical compaction * BenchmarkNormalCompaction => BenchmarkCompaction * Moved the benchmark from db_test.go to compact_test.go Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Benchmark for query iterator and seek for non overlapping blocks Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Vertical query merge only for overlapping blocks Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Simplify logging in Compact(...) Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Updated CHANGELOG.md Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Calculate overlapping inside populateBlock Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * MinTime and MaxTime for BlockReader. Using this to find overlapping blocks in populateBlock() Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Sort blocks w.r.t. MinTime in reload() Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Log about overlapping in LeveledCompactor.write() instead of returning bool Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Log about overlapping inside LeveledCompactor.populateBlock() Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Fix review comments Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Refactor createBlock to take optional []Series Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * review1 Signed-off-by: Krasi Georgiev <kgeorgie@redhat.com> * Updated CHANGELOG and minor nits Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * nits Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Updated CHANGELOG Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Refactor iterator and seek benchmarks for Querier. Also has as overlapping blocks. Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Additional test case Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * genSeries takes optional labels. Updated BenchmarkQueryIterator and BenchmarkQuerySeek. Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Split genSeries into genSeries and populateSeries Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Check error in benchmark Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Fix review comments Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Warn about overlapping blocks in reload() Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in>
2019-02-14 05:29:41 -08:00
}
if b.Meta().MaxTime > globalMaxt {
globalMaxt = b.Meta().MaxTime
Vertical query merging and compaction (#370) * Vertical series iterator Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Select overlapped blocks first in compactor Plan() Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Added vertical compaction Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Code cleanup and comments Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Fix review comments Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Fix tests Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Add benchmark for compaction Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Perform vertical compaction only when blocks are overlapping. Actions for vertical compaction: * Sorting chunk metas * Calling chunks.MergeOverlappingChunks on the chunks Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Benchmark for vertical compaction * BenchmarkNormalCompaction => BenchmarkCompaction * Moved the benchmark from db_test.go to compact_test.go Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Benchmark for query iterator and seek for non overlapping blocks Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Vertical query merge only for overlapping blocks Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Simplify logging in Compact(...) Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Updated CHANGELOG.md Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Calculate overlapping inside populateBlock Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * MinTime and MaxTime for BlockReader. Using this to find overlapping blocks in populateBlock() Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Sort blocks w.r.t. MinTime in reload() Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Log about overlapping in LeveledCompactor.write() instead of returning bool Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Log about overlapping inside LeveledCompactor.populateBlock() Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Fix review comments Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Refactor createBlock to take optional []Series Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * review1 Signed-off-by: Krasi Georgiev <kgeorgie@redhat.com> * Updated CHANGELOG and minor nits Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * nits Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Updated CHANGELOG Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Refactor iterator and seek benchmarks for Querier. Also has as overlapping blocks. Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Additional test case Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * genSeries takes optional labels. Updated BenchmarkQueryIterator and BenchmarkQuerySeek. Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Split genSeries into genSeries and populateSeries Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Check error in benchmark Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Fix review comments Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Warn about overlapping blocks in reload() Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in>
2019-02-14 05:29:41 -08:00
}
}
indexr, err := b.Index()
if err != nil {
return errors.Wrapf(err, "open index reader for block %+v", b.Meta())
}
closers = append(closers, indexr)
chunkr, err := b.Chunks()
if err != nil {
return errors.Wrapf(err, "open chunk reader for block %+v", b.Meta())
}
closers = append(closers, chunkr)
tombsr, err := b.Tombstones()
if err != nil {
return errors.Wrapf(err, "open tombstone reader for block %+v", b.Meta())
}
closers = append(closers, tombsr)
Reduce memory used by postings offset table. Rather than keeping the offset of each postings list, instead keep the nth offset of the offset of the posting list. As postings list offsets have always been sorted, we can then get to the closest entry before the one we want an iterate forwards. I haven't done much tuning on the 32 number, it was chosen to try not to read through more than a 4k page of data. Switch to a bulk interface for fetching postings. Use it to avoid having to re-read parts of the posting offset table when querying lots of it. For a index with what BenchmarkHeadPostingForMatchers uses RAM for r.postings drops from 3.79MB to 80.19kB or about 48x. Bytes allocated go down by 30%, and suprisingly CPU usage drops by 4-6% for typical queries too. benchmark old ns/op new ns/op delta BenchmarkPostingsForMatchers/Block/n="1"-4 35231 36673 +4.09% BenchmarkPostingsForMatchers/Block/n="1",j="foo"-4 563380 540627 -4.04% BenchmarkPostingsForMatchers/Block/j="foo",n="1"-4 536782 534186 -0.48% BenchmarkPostingsForMatchers/Block/n="1",j!="foo"-4 533990 541550 +1.42% BenchmarkPostingsForMatchers/Block/i=~".*"-4 113374598 117969608 +4.05% BenchmarkPostingsForMatchers/Block/i=~".+"-4 146329884 139651442 -4.56% BenchmarkPostingsForMatchers/Block/i=~""-4 50346510 44961127 -10.70% BenchmarkPostingsForMatchers/Block/i!=""-4 41261550 35356165 -14.31% BenchmarkPostingsForMatchers/Block/n="1",i=~".*",j="foo"-4 112544418 116904010 +3.87% BenchmarkPostingsForMatchers/Block/n="1",i=~".*",i!="2",j="foo"-4 112487086 116864918 +3.89% BenchmarkPostingsForMatchers/Block/n="1",i!=""-4 41094758 35457904 -13.72% BenchmarkPostingsForMatchers/Block/n="1",i!="",j="foo"-4 41906372 36151473 -13.73% BenchmarkPostingsForMatchers/Block/n="1",i=~".+",j="foo"-4 147262414 140424800 -4.64% BenchmarkPostingsForMatchers/Block/n="1",i=~"1.+",j="foo"-4 28615629 27872072 -2.60% BenchmarkPostingsForMatchers/Block/n="1",i=~".+",i!="2",j="foo"-4 147117177 140462403 -4.52% BenchmarkPostingsForMatchers/Block/n="1",i=~".+",i!~"2.*",j="foo"-4 175096826 167902298 -4.11% benchmark old allocs new allocs delta BenchmarkPostingsForMatchers/Block/n="1"-4 4 6 +50.00% BenchmarkPostingsForMatchers/Block/n="1",j="foo"-4 7 11 +57.14% BenchmarkPostingsForMatchers/Block/j="foo",n="1"-4 7 11 +57.14% BenchmarkPostingsForMatchers/Block/n="1",j!="foo"-4 15 17 +13.33% BenchmarkPostingsForMatchers/Block/i=~".*"-4 100010 100012 +0.00% BenchmarkPostingsForMatchers/Block/i=~".+"-4 200069 200040 -0.01% BenchmarkPostingsForMatchers/Block/i=~""-4 200072 200045 -0.01% BenchmarkPostingsForMatchers/Block/i!=""-4 200070 200041 -0.01% BenchmarkPostingsForMatchers/Block/n="1",i=~".*",j="foo"-4 100013 100017 +0.00% BenchmarkPostingsForMatchers/Block/n="1",i=~".*",i!="2",j="foo"-4 100017 100023 +0.01% BenchmarkPostingsForMatchers/Block/n="1",i!=""-4 200073 200046 -0.01% BenchmarkPostingsForMatchers/Block/n="1",i!="",j="foo"-4 200075 200050 -0.01% BenchmarkPostingsForMatchers/Block/n="1",i=~".+",j="foo"-4 200074 200049 -0.01% BenchmarkPostingsForMatchers/Block/n="1",i=~"1.+",j="foo"-4 111165 111150 -0.01% BenchmarkPostingsForMatchers/Block/n="1",i=~".+",i!="2",j="foo"-4 200078 200055 -0.01% BenchmarkPostingsForMatchers/Block/n="1",i=~".+",i!~"2.*",j="foo"-4 311282 311238 -0.01% benchmark old bytes new bytes delta BenchmarkPostingsForMatchers/Block/n="1"-4 264 296 +12.12% BenchmarkPostingsForMatchers/Block/n="1",j="foo"-4 360 424 +17.78% BenchmarkPostingsForMatchers/Block/j="foo",n="1"-4 360 424 +17.78% BenchmarkPostingsForMatchers/Block/n="1",j!="foo"-4 520 552 +6.15% BenchmarkPostingsForMatchers/Block/i=~".*"-4 1600461 1600482 +0.00% BenchmarkPostingsForMatchers/Block/i=~".+"-4 24900801 17259077 -30.69% BenchmarkPostingsForMatchers/Block/i=~""-4 24900836 17259151 -30.69% BenchmarkPostingsForMatchers/Block/i!=""-4 24900760 17259048 -30.69% BenchmarkPostingsForMatchers/Block/n="1",i=~".*",j="foo"-4 1600557 1600621 +0.00% BenchmarkPostingsForMatchers/Block/n="1",i=~".*",i!="2",j="foo"-4 1600717 1600813 +0.01% BenchmarkPostingsForMatchers/Block/n="1",i!=""-4 24900856 17259176 -30.69% BenchmarkPostingsForMatchers/Block/n="1",i!="",j="foo"-4 24900952 17259304 -30.69% BenchmarkPostingsForMatchers/Block/n="1",i=~".+",j="foo"-4 24900993 17259333 -30.69% BenchmarkPostingsForMatchers/Block/n="1",i=~"1.+",j="foo"-4 3788311 3142630 -17.04% BenchmarkPostingsForMatchers/Block/n="1",i=~".+",i!="2",j="foo"-4 24901137 17259509 -30.69% BenchmarkPostingsForMatchers/Block/n="1",i=~".+",i!~"2.*",j="foo"-4 28693086 20405680 -28.88% Signed-off-by: Brian Brazil <brian.brazil@robustperception.io>
2019-12-05 10:27:40 -08:00
k, v := index.AllPostingsKey()
all, err := indexr.Postings(k, v)
2017-01-03 06:43:26 -08:00
if err != nil {
return err
2017-01-03 06:43:26 -08:00
}
all = indexr.SortedPostings(all)
tsdb: Added ChunkQueryable implementations to db; unified MergeSeriesSets and vertical to single struct. (#7069) * tsdb: Added ChunkQueryable implementations to db; unified compactor, querier and fanout block iterating. Chained to https://github.com/prometheus/prometheus/pull/7059 * NewMerge(Chunk)Querier now takies multiple primaries allowing tsdb DB code to use it. * Added single SeriesEntry / ChunkEntry for all series implementations. * Unified all vertical, and non vertical for compact and querying to single merge series / chunk sets by reusing VerticalSeriesMergeFunc for overlapping algorithm (same logic as before) * Added block (Base/Chunk/)Querier for block querying. We then use populateAndTomb(Base/Chunk/) to iterate over chunks or samples. * Refactored endpoint tests and querier tests to include subtests. Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com> * Addressed comments from Brian and Beorn. Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com> * Fixed snapshot test and added chunk iterator support for DBReadOnly. Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com> * Fixed race when iterating over Ats first. Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com> * Fixed tests. Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com> * Fixed populate block tests. Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com> * Fixed endpoints test. Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com> * Fixed test. Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com> * Added test & fixed case of head open chunk. Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com> * Fixed DBReadOnly tests and bug producing 1 sample chunks. Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com> * Added cases for partial block overlap for multiple full chunks. Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com> * Added extra tests for chunk meta after compaction. Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com> * Fixed small vertical merge bug and added more tests for that. Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com>
2020-07-31 08:03:02 -07:00
// Blocks meta is half open: [min, max), so subtract 1 to ensure we don't hold samples with exact meta.MaxTime timestamp.
sets = append(sets, NewBlockChunkSeriesSet(b.Meta().ULID, indexr, chunkr, tombsr, all, minT, maxT-1, false))
2021-10-06 02:04:37 -07:00
if len(outBlocks) > 1 {
// To iterate series when populating symbols, we cannot reuse postings we just got, but need to get a new copy.
// Postings can only be iterated once.
k, v = index.AllPostingsKey()
all, err = indexr.Postings(k, v)
if err != nil {
return err
}
all = indexr.SortedPostings(all)
// Blocks meta is half open: [min, max), so subtract 1 to ensure we don't hold samples with exact meta.MaxTime timestamp.
symbolsSets = append(symbolsSets, NewBlockChunkSeriesSet(b.Meta().ULID, indexr, chunkr, tombsr, all, minT, maxT-1, false))
2021-10-06 02:04:37 -07:00
} else {
syms := indexr.Symbols()
if i == 0 {
symbols = syms
continue
}
symbols = NewMergedStringIter(symbols, syms)
2017-01-03 06:43:26 -08:00
}
}
if len(outBlocks) == 1 {
for symbols.Next() {
if err := outBlocks[0].indexw.AddSymbol(symbols.At()); err != nil {
return errors.Wrap(err, "add symbol")
}
Stream symbols during compaction. (#6468) Rather than buffer up symbols in RAM, do it one by one during compaction. Then use the reader's symbol handling for symbol lookups during the rest of the index write. There is some slowdown in compaction, due to having to look through a file rather than a hash lookup. This is noise to the overall cost of compacting series with thousands of samples though. benchmark old ns/op new ns/op delta BenchmarkCompaction/type=normal,blocks=4,series=10000,samplesPerSeriesPerBlock=101-4 539917175 675341565 +25.08% BenchmarkCompaction/type=normal,blocks=4,series=10000,samplesPerSeriesPerBlock=1001-4 2441815993 2477453524 +1.46% BenchmarkCompaction/type=normal,blocks=4,series=10000,samplesPerSeriesPerBlock=2001-4 3978543559 3922909687 -1.40% BenchmarkCompaction/type=normal,blocks=4,series=10000,samplesPerSeriesPerBlock=5001-4 8430219716 8586610007 +1.86% BenchmarkCompaction/type=vertical,blocks=4,series=10000,samplesPerSeriesPerBlock=101-4 1786424591 1909552782 +6.89% BenchmarkCompaction/type=vertical,blocks=4,series=10000,samplesPerSeriesPerBlock=1001-4 5328998202 6020839950 +12.98% BenchmarkCompaction/type=vertical,blocks=4,series=10000,samplesPerSeriesPerBlock=2001-4 10085059958 11085278690 +9.92% BenchmarkCompaction/type=vertical,blocks=4,series=10000,samplesPerSeriesPerBlock=5001-4 25497010155 27018079806 +5.97% BenchmarkCompactionFromHead/labelnames=1,labelvalues=100000-4 2427391406 2817217987 +16.06% BenchmarkCompactionFromHead/labelnames=10,labelvalues=10000-4 2592965497 2538805050 -2.09% BenchmarkCompactionFromHead/labelnames=100,labelvalues=1000-4 2437388343 2668012858 +9.46% BenchmarkCompactionFromHead/labelnames=1000,labelvalues=100-4 2317095324 2787423966 +20.30% BenchmarkCompactionFromHead/labelnames=10000,labelvalues=10-4 2600239857 2096973860 -19.35% benchmark old allocs new allocs delta BenchmarkCompaction/type=normal,blocks=4,series=10000,samplesPerSeriesPerBlock=101-4 500851 470794 -6.00% BenchmarkCompaction/type=normal,blocks=4,series=10000,samplesPerSeriesPerBlock=1001-4 821527 791451 -3.66% BenchmarkCompaction/type=normal,blocks=4,series=10000,samplesPerSeriesPerBlock=2001-4 1141562 1111508 -2.63% BenchmarkCompaction/type=normal,blocks=4,series=10000,samplesPerSeriesPerBlock=5001-4 2141576 2111504 -1.40% BenchmarkCompaction/type=vertical,blocks=4,series=10000,samplesPerSeriesPerBlock=101-4 871466 841424 -3.45% BenchmarkCompaction/type=vertical,blocks=4,series=10000,samplesPerSeriesPerBlock=1001-4 1941428 1911415 -1.55% BenchmarkCompaction/type=vertical,blocks=4,series=10000,samplesPerSeriesPerBlock=2001-4 3071573 3041510 -0.98% BenchmarkCompaction/type=vertical,blocks=4,series=10000,samplesPerSeriesPerBlock=5001-4 6771648 6741509 -0.45% BenchmarkCompactionFromHead/labelnames=1,labelvalues=100000-4 731493 824888 +12.77% BenchmarkCompactionFromHead/labelnames=10,labelvalues=10000-4 793918 887311 +11.76% BenchmarkCompactionFromHead/labelnames=100,labelvalues=1000-4 811842 905204 +11.50% BenchmarkCompactionFromHead/labelnames=1000,labelvalues=100-4 832244 925081 +11.16% BenchmarkCompactionFromHead/labelnames=10000,labelvalues=10-4 921553 1019162 +10.59% benchmark old bytes new bytes delta BenchmarkCompaction/type=normal,blocks=4,series=10000,samplesPerSeriesPerBlock=101-4 40532648 35698276 -11.93% BenchmarkCompaction/type=normal,blocks=4,series=10000,samplesPerSeriesPerBlock=1001-4 60340216 53409568 -11.49% BenchmarkCompaction/type=normal,blocks=4,series=10000,samplesPerSeriesPerBlock=2001-4 81087336 72065552 -11.13% BenchmarkCompaction/type=normal,blocks=4,series=10000,samplesPerSeriesPerBlock=5001-4 142485576 120878544 -15.16% BenchmarkCompaction/type=vertical,blocks=4,series=10000,samplesPerSeriesPerBlock=101-4 208661368 203831136 -2.31% BenchmarkCompaction/type=vertical,blocks=4,series=10000,samplesPerSeriesPerBlock=1001-4 347345904 340484696 -1.98% BenchmarkCompaction/type=vertical,blocks=4,series=10000,samplesPerSeriesPerBlock=2001-4 585185856 576244648 -1.53% BenchmarkCompaction/type=vertical,blocks=4,series=10000,samplesPerSeriesPerBlock=5001-4 1357641792 1358966528 +0.10% BenchmarkCompactionFromHead/labelnames=1,labelvalues=100000-4 126486664 119666744 -5.39% BenchmarkCompactionFromHead/labelnames=10,labelvalues=10000-4 122323192 115117224 -5.89% BenchmarkCompactionFromHead/labelnames=100,labelvalues=1000-4 126404504 119469864 -5.49% BenchmarkCompactionFromHead/labelnames=1000,labelvalues=100-4 119047832 112230408 -5.73% BenchmarkCompactionFromHead/labelnames=10000,labelvalues=10-4 136576016 116634800 -14.60% Signed-off-by: Brian Brazil <brian.brazil@robustperception.io>
2019-12-17 11:49:54 -08:00
}
if symbols.Err() != nil {
return errors.Wrap(symbols.Err(), "next symbol")
}
} else {
if err := populateSymbols(ctx, mergeFunc, concurrencyOpts, symbolsSets, outBlocks); err != nil {
return err
}
}
// Semaphore for number of blocks that can be closed at once.
sema := semaphore.NewWeighted(int64(concurrencyOpts.MaxClosingBlocks))
blockWriters := make([]*asyncBlockWriter, len(outBlocks))
for ix := range outBlocks {
blockWriters[ix] = newAsyncBlockWriter(chunkPool, outBlocks[ix].chunkw, outBlocks[ix].indexw, sema)
}
defer func() {
// Stop all async writers.
for ix := range outBlocks {
blockWriters[ix].closeAsync()
}
// And wait until they have finished, to make sure that they no longer update chunk or index writers.
for ix := range outBlocks {
_, _ = blockWriters[ix].waitFinished()
}
}()
tsdb: Added ChunkQueryable implementations to db; unified MergeSeriesSets and vertical to single struct. (#7069) * tsdb: Added ChunkQueryable implementations to db; unified compactor, querier and fanout block iterating. Chained to https://github.com/prometheus/prometheus/pull/7059 * NewMerge(Chunk)Querier now takies multiple primaries allowing tsdb DB code to use it. * Added single SeriesEntry / ChunkEntry for all series implementations. * Unified all vertical, and non vertical for compact and querying to single merge series / chunk sets by reusing VerticalSeriesMergeFunc for overlapping algorithm (same logic as before) * Added block (Base/Chunk/)Querier for block querying. We then use populateAndTomb(Base/Chunk/) to iterate over chunks or samples. * Refactored endpoint tests and querier tests to include subtests. Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com> * Addressed comments from Brian and Beorn. Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com> * Fixed snapshot test and added chunk iterator support for DBReadOnly. Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com> * Fixed race when iterating over Ats first. Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com> * Fixed tests. Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com> * Fixed populate block tests. Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com> * Fixed endpoints test. Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com> * Fixed test. Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com> * Added test & fixed case of head open chunk. Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com> * Fixed DBReadOnly tests and bug producing 1 sample chunks. Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com> * Added cases for partial block overlap for multiple full chunks. Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com> * Added extra tests for chunk meta after compaction. Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com> * Fixed small vertical merge bug and added more tests for that. Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com>
2020-07-31 08:03:02 -07:00
var chksIter chunks.Iterator
tsdb: Added ChunkQueryable implementations to db; unified MergeSeriesSets and vertical to single struct. (#7069) * tsdb: Added ChunkQueryable implementations to db; unified compactor, querier and fanout block iterating. Chained to https://github.com/prometheus/prometheus/pull/7059 * NewMerge(Chunk)Querier now takies multiple primaries allowing tsdb DB code to use it. * Added single SeriesEntry / ChunkEntry for all series implementations. * Unified all vertical, and non vertical for compact and querying to single merge series / chunk sets by reusing VerticalSeriesMergeFunc for overlapping algorithm (same logic as before) * Added block (Base/Chunk/)Querier for block querying. We then use populateAndTomb(Base/Chunk/) to iterate over chunks or samples. * Refactored endpoint tests and querier tests to include subtests. Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com> * Addressed comments from Brian and Beorn. Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com> * Fixed snapshot test and added chunk iterator support for DBReadOnly. Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com> * Fixed race when iterating over Ats first. Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com> * Fixed tests. Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com> * Fixed populate block tests. Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com> * Fixed endpoints test. Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com> * Fixed test. Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com> * Added test & fixed case of head open chunk. Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com> * Fixed DBReadOnly tests and bug producing 1 sample chunks. Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com> * Added cases for partial block overlap for multiple full chunks. Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com> * Added extra tests for chunk meta after compaction. Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com> * Fixed small vertical merge bug and added more tests for that. Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com>
2020-07-31 08:03:02 -07:00
set := sets[0]
if len(sets) > 1 {
// Merge series using specified chunk series merger.
// The default one is the compacting series merger.
set = storage.NewMergeChunkSeriesSet(sets, mergeFunc)
tsdb: Added ChunkQueryable implementations to db; unified MergeSeriesSets and vertical to single struct. (#7069) * tsdb: Added ChunkQueryable implementations to db; unified compactor, querier and fanout block iterating. Chained to https://github.com/prometheus/prometheus/pull/7059 * NewMerge(Chunk)Querier now takies multiple primaries allowing tsdb DB code to use it. * Added single SeriesEntry / ChunkEntry for all series implementations. * Unified all vertical, and non vertical for compact and querying to single merge series / chunk sets by reusing VerticalSeriesMergeFunc for overlapping algorithm (same logic as before) * Added block (Base/Chunk/)Querier for block querying. We then use populateAndTomb(Base/Chunk/) to iterate over chunks or samples. * Refactored endpoint tests and querier tests to include subtests. Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com> * Addressed comments from Brian and Beorn. Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com> * Fixed snapshot test and added chunk iterator support for DBReadOnly. Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com> * Fixed race when iterating over Ats first. Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com> * Fixed tests. Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com> * Fixed populate block tests. Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com> * Fixed endpoints test. Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com> * Fixed test. Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com> * Added test & fixed case of head open chunk. Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com> * Fixed DBReadOnly tests and bug producing 1 sample chunks. Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com> * Added cases for partial block overlap for multiple full chunks. Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com> * Added extra tests for chunk meta after compaction. Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com> * Fixed small vertical merge bug and added more tests for that. Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com>
2020-07-31 08:03:02 -07:00
}
// Iterate over all sorted chunk series.
for set.Next() {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
tsdb: Added ChunkQueryable implementations to db; unified MergeSeriesSets and vertical to single struct. (#7069) * tsdb: Added ChunkQueryable implementations to db; unified compactor, querier and fanout block iterating. Chained to https://github.com/prometheus/prometheus/pull/7059 * NewMerge(Chunk)Querier now takies multiple primaries allowing tsdb DB code to use it. * Added single SeriesEntry / ChunkEntry for all series implementations. * Unified all vertical, and non vertical for compact and querying to single merge series / chunk sets by reusing VerticalSeriesMergeFunc for overlapping algorithm (same logic as before) * Added block (Base/Chunk/)Querier for block querying. We then use populateAndTomb(Base/Chunk/) to iterate over chunks or samples. * Refactored endpoint tests and querier tests to include subtests. Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com> * Addressed comments from Brian and Beorn. Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com> * Fixed snapshot test and added chunk iterator support for DBReadOnly. Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com> * Fixed race when iterating over Ats first. Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com> * Fixed tests. Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com> * Fixed populate block tests. Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com> * Fixed endpoints test. Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com> * Fixed test. Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com> * Added test & fixed case of head open chunk. Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com> * Fixed DBReadOnly tests and bug producing 1 sample chunks. Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com> * Added cases for partial block overlap for multiple full chunks. Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com> * Added extra tests for chunk meta after compaction. Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com> * Fixed small vertical merge bug and added more tests for that. Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com>
2020-07-31 08:03:02 -07:00
s := set.At()
chksIter := s.Iterator(chksIter)
var chks []chunks.Meta
tsdb: Added ChunkQueryable implementations to db; unified MergeSeriesSets and vertical to single struct. (#7069) * tsdb: Added ChunkQueryable implementations to db; unified compactor, querier and fanout block iterating. Chained to https://github.com/prometheus/prometheus/pull/7059 * NewMerge(Chunk)Querier now takies multiple primaries allowing tsdb DB code to use it. * Added single SeriesEntry / ChunkEntry for all series implementations. * Unified all vertical, and non vertical for compact and querying to single merge series / chunk sets by reusing VerticalSeriesMergeFunc for overlapping algorithm (same logic as before) * Added block (Base/Chunk/)Querier for block querying. We then use populateAndTomb(Base/Chunk/) to iterate over chunks or samples. * Refactored endpoint tests and querier tests to include subtests. Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com> * Addressed comments from Brian and Beorn. Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com> * Fixed snapshot test and added chunk iterator support for DBReadOnly. Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com> * Fixed race when iterating over Ats first. Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com> * Fixed tests. Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com> * Fixed populate block tests. Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com> * Fixed endpoints test. Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com> * Fixed test. Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com> * Added test & fixed case of head open chunk. Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com> * Fixed DBReadOnly tests and bug producing 1 sample chunks. Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com> * Added cases for partial block overlap for multiple full chunks. Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com> * Added extra tests for chunk meta after compaction. Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com> * Fixed small vertical merge bug and added more tests for that. Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com>
2020-07-31 08:03:02 -07:00
for chksIter.Next() {
// We are not iterating in streaming way over chunk as
// it's more efficient to do bulk write for index and
tsdb: Added ChunkQueryable implementations to db; unified MergeSeriesSets and vertical to single struct. (#7069) * tsdb: Added ChunkQueryable implementations to db; unified compactor, querier and fanout block iterating. Chained to https://github.com/prometheus/prometheus/pull/7059 * NewMerge(Chunk)Querier now takies multiple primaries allowing tsdb DB code to use it. * Added single SeriesEntry / ChunkEntry for all series implementations. * Unified all vertical, and non vertical for compact and querying to single merge series / chunk sets by reusing VerticalSeriesMergeFunc for overlapping algorithm (same logic as before) * Added block (Base/Chunk/)Querier for block querying. We then use populateAndTomb(Base/Chunk/) to iterate over chunks or samples. * Refactored endpoint tests and querier tests to include subtests. Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com> * Addressed comments from Brian and Beorn. Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com> * Fixed snapshot test and added chunk iterator support for DBReadOnly. Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com> * Fixed race when iterating over Ats first. Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com> * Fixed tests. Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com> * Fixed populate block tests. Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com> * Fixed endpoints test. Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com> * Fixed test. Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com> * Added test & fixed case of head open chunk. Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com> * Fixed DBReadOnly tests and bug producing 1 sample chunks. Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com> * Added cases for partial block overlap for multiple full chunks. Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com> * Added extra tests for chunk meta after compaction. Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com> * Fixed small vertical merge bug and added more tests for that. Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com>
2020-07-31 08:03:02 -07:00
// chunk file purposes.
chks = append(chks, chksIter.At())
}
if chksIter.Err() != nil {
return errors.Wrap(chksIter.Err(), "chunk iter")
Vertical query merging and compaction (#370) * Vertical series iterator Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Select overlapped blocks first in compactor Plan() Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Added vertical compaction Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Code cleanup and comments Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Fix review comments Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Fix tests Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Add benchmark for compaction Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Perform vertical compaction only when blocks are overlapping. Actions for vertical compaction: * Sorting chunk metas * Calling chunks.MergeOverlappingChunks on the chunks Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Benchmark for vertical compaction * BenchmarkNormalCompaction => BenchmarkCompaction * Moved the benchmark from db_test.go to compact_test.go Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Benchmark for query iterator and seek for non overlapping blocks Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Vertical query merge only for overlapping blocks Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Simplify logging in Compact(...) Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Updated CHANGELOG.md Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Calculate overlapping inside populateBlock Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * MinTime and MaxTime for BlockReader. Using this to find overlapping blocks in populateBlock() Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Sort blocks w.r.t. MinTime in reload() Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Log about overlapping in LeveledCompactor.write() instead of returning bool Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Log about overlapping inside LeveledCompactor.populateBlock() Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Fix review comments Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Refactor createBlock to take optional []Series Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * review1 Signed-off-by: Krasi Georgiev <kgeorgie@redhat.com> * Updated CHANGELOG and minor nits Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * nits Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Updated CHANGELOG Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Refactor iterator and seek benchmarks for Querier. Also has as overlapping blocks. Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Additional test case Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * genSeries takes optional labels. Updated BenchmarkQueryIterator and BenchmarkQuerySeek. Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Split genSeries into genSeries and populateSeries Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Check error in benchmark Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Fix review comments Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Warn about overlapping blocks in reload() Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in>
2019-02-14 05:29:41 -08:00
}
// Skip the series with all deleted chunks.
if len(chks) == 0 {
continue
}
debugOutOfOrderChunks(chks, logger)
2021-09-28 01:29:54 -07:00
obIx := uint64(0)
if len(outBlocks) > 1 {
obIx = labels.StableHash(s.Labels()) % uint64(len(outBlocks))
}
err := blockWriters[obIx].addSeries(s.Labels(), chks)
if err != nil {
return errors.Wrap(err, "adding series")
2017-08-08 08:35:34 -07:00
}
}
if set.Err() != nil {
return errors.Wrap(set.Err(), "iterate compaction set")
}
for ix := range blockWriters {
blockWriters[ix].closeAsync()
}
for ix := range blockWriters {
stats, err := blockWriters[ix].waitFinished()
if err != nil {
return errors.Wrap(err, "writing block")
}
outBlocks[ix].meta.Stats = stats
}
return nil
}
2021-10-06 02:04:37 -07:00
// How many symbols we buffer in memory per output block.
const inMemorySymbolsLimit = 1_000_000
// populateSymbols writes symbols to output blocks. We need to iterate through all series to find
// which series belongs to what block. We collect symbols per sharded block, and then add sorted symbols to
// block's index.
func populateSymbols(ctx context.Context, mergeFunc storage.VerticalChunkSeriesMergeFunc, concurrencyOpts LeveledCompactorConcurrencyOptions, sets []storage.ChunkSeriesSet, outBlocks []shardedBlock) error {
if len(outBlocks) == 0 {
return errors.New("no output block")
}
flushers := newSymbolFlushers(concurrencyOpts.SymbolsFlushersCount)
defer flushers.close() // Make sure to stop flushers before exiting to avoid leaking goroutines.
batchers := make([]*symbolsBatcher, len(outBlocks))
for ix := range outBlocks {
batchers[ix] = newSymbolsBatcher(inMemorySymbolsLimit, outBlocks[ix].tmpDir, flushers)
// Always include empty symbol. Blocks created from Head always have it in the symbols table,
// and if we only include symbols from series, we would skip it.
// It may not be required, but it's small and better be safe than sorry.
if err := batchers[ix].addSymbol(""); err != nil {
return errors.Wrap(err, "addSymbol to batcher")
}
}
seriesSet := sets[0]
if len(sets) > 1 {
seriesSet = storage.NewMergeChunkSeriesSet(sets, mergeFunc)
}
for seriesSet.Next() {
if err := ctx.Err(); err != nil {
return err
}
s := seriesSet.At()
var obIx uint64
if len(outBlocks) > 1 {
obIx = labels.StableHash(s.Labels()) % uint64(len(outBlocks))
}
err := s.Labels().Validate(func(l labels.Label) error {
if err := batchers[obIx].addSymbol(l.Name); err != nil {
return errors.Wrap(err, "addSymbol to batcher")
}
if err := batchers[obIx].addSymbol(l.Value); err != nil {
return errors.Wrap(err, "addSymbol to batcher")
}
return nil
})
if err != nil {
return err
}
}
for ix := range outBlocks {
// Flush the batcher to write remaining symbols.
if err := batchers[ix].flushSymbols(true); err != nil {
return errors.Wrap(err, "flushing batcher")
}
}
err := flushers.close()
if err != nil {
return errors.Wrap(err, "closing flushers")
}
for ix := range outBlocks {
if err := ctx.Err(); err != nil {
return err
}
symbolFiles := batchers[ix].getSymbolFiles()
it, err := newSymbolsIterator(symbolFiles)
if err != nil {
return errors.Wrap(err, "opening symbols iterator")
}
// Each symbols iterator must be closed to close underlying files.
closeIt := it
defer func() {
if closeIt != nil {
_ = closeIt.Close()
}
}()
var sym string
for sym, err = it.NextSymbol(); err == nil; sym, err = it.NextSymbol() {
err = outBlocks[ix].indexw.AddSymbol(sym)
if err != nil {
return errors.Wrap(err, "AddSymbol")
}
}
if err != io.EOF {
return errors.Wrap(err, "iterating symbols")
}
// if err == io.EOF, we have iterated through all symbols. We can close underlying
// files now.
closeIt = nil
_ = it.Close()
2021-10-05 08:56:20 -07:00
2021-10-06 02:04:37 -07:00
// Delete symbol files from symbolsBatcher. We don't need to perform the cleanup if populateSymbols
// or compaction fails, because in that case compactor already removes entire (temp) output block directory.
for _, fn := range symbolFiles {
2021-10-05 08:56:20 -07:00
if err := os.Remove(fn); err != nil {
return errors.Wrap(err, "deleting symbols file")
}
}
}
return nil
}
// Returns opened blocks, and blocks that should be closed (also returned in case of error).
func openBlocksForCompaction(dirs []string, open []*Block, logger log.Logger, pool chunkenc.Pool, concurrency int) (blocks, blocksToClose []*Block, _ error) {
blocks = make([]*Block, 0, len(dirs))
blocksToClose = make([]*Block, 0, len(dirs))
toOpenCh := make(chan string, len(dirs))
for _, d := range dirs {
meta, _, err := readMetaFile(d)
if err != nil {
return nil, blocksToClose, err
}
var b *Block
// Use already open blocks if we can, to avoid
// having the index data in memory twice.
for _, o := range open {
if meta.ULID == o.Meta().ULID {
b = o
break
}
}
if b != nil {
blocks = append(blocks, b)
} else {
toOpenCh <- d
}
}
close(toOpenCh)
type openResult struct {
b *Block
err error
}
openResultCh := make(chan openResult, len(toOpenCh))
// Signals to all opening goroutines that there was an error opening some block, and they can stop early.
// If openingError is true, at least one error is sent to openResultCh.
openingError := atomic.NewBool(false)
wg := sync.WaitGroup{}
if len(dirs) < concurrency {
concurrency = len(dirs)
}
for i := 0; i < concurrency; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for d := range toOpenCh {
if openingError.Load() {
return
}
b, err := OpenBlock(logger, d, pool)
openResultCh <- openResult{b: b, err: err}
if err != nil {
openingError.Store(true)
return
}
}
}()
}
wg.Wait()
// All writers to openResultCh have stopped, we can close the output channel, so we can range over it.
close(openResultCh)
var firstErr error
for or := range openResultCh {
if or.err != nil {
// Don't stop on error, but iterate over all opened blocks to collect blocksToClose.
if firstErr == nil {
firstErr = or.err
}
} else {
blocks = append(blocks, or.b)
blocksToClose = append(blocksToClose, or.b)
}
}
return blocks, blocksToClose, firstErr
}