Allow populate block logic in compact to be overridden outside Prometheus (#11711)

Signed-off-by: Alex Le <leqiyue@amazon.com>
Signed-off-by: Alex Le <emoc1989@gmail.com>
Alex Le authored 2023-04-03 23:31:49 -07:00, committed by GitHub
parent 3923e83413
commit 1936868e9d
4 changed files with 75 additions and 64 deletions
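
The diff below extracts the block-population step behind an exported PopulateBlockFunc interface so that projects embedding the TSDB (Cortex-style long-term storage, for instance) can substitute their own logic. As a rough illustration of the extension point (the wrapper type, its name, and the log line are hypothetical; only the PopulateBlockFunc interface and DefaultPopulateBlockFunc come from this commit), a custom implementation might look like:

package compactext

import (
	"context"

	"github.com/go-kit/log"
	"github.com/go-kit/log/level"
	"github.com/prometheus/prometheus/storage"
	"github.com/prometheus/prometheus/tsdb"
	"github.com/prometheus/prometheus/tsdb/chunkenc"
)

// loggingPopulateBlockFunc is a hypothetical PopulateBlockFunc that logs the
// number of source blocks, then delegates to the default implementation.
type loggingPopulateBlockFunc struct {
	base tsdb.DefaultPopulateBlockFunc
}

func (f loggingPopulateBlockFunc) PopulateBlock(ctx context.Context, metrics *tsdb.CompactorMetrics, logger log.Logger,
	chunkPool chunkenc.Pool, mergeFunc storage.VerticalChunkSeriesMergeFunc, blocks []tsdb.BlockReader,
	meta *tsdb.BlockMeta, indexw tsdb.IndexWriter, chunkw tsdb.ChunkWriter,
) error {
	// Custom pre-processing would go here; the heavy lifting is reused.
	level.Info(logger).Log("msg", "populating block", "ulid", meta.ULID, "sources", len(blocks))
	return f.base.PopulateBlock(ctx, metrics, logger, chunkPool, mergeFunc, blocks, meta, indexw, chunkw)
}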

tsdb/compact.go

@@ -75,7 +75,7 @@ type Compactor interface {
 // LeveledCompactor implements the Compactor interface.
 type LeveledCompactor struct {
-	metrics *compactorMetrics
+	metrics *CompactorMetrics
 	logger log.Logger
 	ranges []int64
 	chunkPool chunkenc.Pool
@@ -84,47 +84,47 @@ type LeveledCompactor struct {
 	mergeFunc storage.VerticalChunkSeriesMergeFunc
 }
 
-type compactorMetrics struct {
-	ran prometheus.Counter
-	populatingBlocks prometheus.Gauge
-	overlappingBlocks prometheus.Counter
-	duration prometheus.Histogram
-	chunkSize prometheus.Histogram
-	chunkSamples prometheus.Histogram
-	chunkRange prometheus.Histogram
+type CompactorMetrics struct {
+	Ran prometheus.Counter
+	PopulatingBlocks prometheus.Gauge
+	OverlappingBlocks prometheus.Counter
+	Duration prometheus.Histogram
+	ChunkSize prometheus.Histogram
+	ChunkSamples prometheus.Histogram
+	ChunkRange prometheus.Histogram
 }
 
-func newCompactorMetrics(r prometheus.Registerer) *compactorMetrics {
-	m := &compactorMetrics{}
+func newCompactorMetrics(r prometheus.Registerer) *CompactorMetrics {
+	m := &CompactorMetrics{}
 
-	m.ran = prometheus.NewCounter(prometheus.CounterOpts{
+	m.Ran = prometheus.NewCounter(prometheus.CounterOpts{
 		Name: "prometheus_tsdb_compactions_total",
 		Help: "Total number of compactions that were executed for the partition.",
 	})
-	m.populatingBlocks = prometheus.NewGauge(prometheus.GaugeOpts{
+	m.PopulatingBlocks = prometheus.NewGauge(prometheus.GaugeOpts{
 		Name: "prometheus_tsdb_compaction_populating_block",
 		Help: "Set to 1 when a block is currently being written to the disk.",
 	})
-	m.overlappingBlocks = prometheus.NewCounter(prometheus.CounterOpts{
+	m.OverlappingBlocks = prometheus.NewCounter(prometheus.CounterOpts{
 		Name: "prometheus_tsdb_vertical_compactions_total",
 		Help: "Total number of compactions done on overlapping blocks.",
 	})
-	m.duration = prometheus.NewHistogram(prometheus.HistogramOpts{
+	m.Duration = prometheus.NewHistogram(prometheus.HistogramOpts{
 		Name: "prometheus_tsdb_compaction_duration_seconds",
 		Help: "Duration of compaction runs",
 		Buckets: prometheus.ExponentialBuckets(1, 2, 14),
 	})
-	m.chunkSize = prometheus.NewHistogram(prometheus.HistogramOpts{
+	m.ChunkSize = prometheus.NewHistogram(prometheus.HistogramOpts{
 		Name: "prometheus_tsdb_compaction_chunk_size_bytes",
 		Help: "Final size of chunks on their first compaction",
 		Buckets: prometheus.ExponentialBuckets(32, 1.5, 12),
 	})
-	m.chunkSamples = prometheus.NewHistogram(prometheus.HistogramOpts{
+	m.ChunkSamples = prometheus.NewHistogram(prometheus.HistogramOpts{
 		Name: "prometheus_tsdb_compaction_chunk_samples",
 		Help: "Final number of samples on their first compaction",
 		Buckets: prometheus.ExponentialBuckets(4, 1.5, 12),
 	})
-	m.chunkRange = prometheus.NewHistogram(prometheus.HistogramOpts{
+	m.ChunkRange = prometheus.NewHistogram(prometheus.HistogramOpts{
 		Name: "prometheus_tsdb_compaction_chunk_range_seconds",
 		Help: "Final time range of chunks on their first compaction",
 		Buckets: prometheus.ExponentialBuckets(100, 4, 10),
@@ -132,13 +132,13 @@ func newCompactorMetrics(r prometheus.Registerer) *compactorMetrics {
 	if r != nil {
 		r.MustRegister(
-			m.ran,
-			m.populatingBlocks,
-			m.overlappingBlocks,
-			m.duration,
-			m.chunkRange,
-			m.chunkSamples,
-			m.chunkSize,
+			m.Ran,
+			m.PopulatingBlocks,
+			m.OverlappingBlocks,
+			m.Duration,
+			m.ChunkRange,
+			m.ChunkSamples,
+			m.ChunkSize,
 		)
 	}
 	return m
 }
@@ -392,6 +392,10 @@ func CompactBlockMetas(uid ulid.ULID, blocks ...*BlockMeta) *BlockMeta {
 // Compact creates a new block in the compactor's directory from the blocks in the
 // provided directories.
 func (c *LeveledCompactor) Compact(dest string, dirs []string, open []*Block) (uid ulid.ULID, err error) {
+	return c.CompactWithPopulateBlockFunc(dest, dirs, open, DefaultPopulateBlockFunc{})
+}
+
+func (c *LeveledCompactor) CompactWithPopulateBlockFunc(dest string, dirs []string, open []*Block, populateBlockFunc PopulateBlockFunc) (uid ulid.ULID, err error) {
 	var (
 		blocks []BlockReader
 		bs []*Block
@@ -435,7 +439,7 @@ func (c *LeveledCompactor) Compact(dest string, dirs []string, open []*Block) (u
 	uid = ulid.MustNew(ulid.Now(), rand.Reader)
 
 	meta := CompactBlockMetas(uid, metas...)
-	err = c.write(dest, meta, blocks...)
+	err = c.write(dest, meta, populateBlockFunc, blocks...)
 	if err == nil {
 		if meta.Stats.NumSamples == 0 {
 			for _, b := range bs {
@@ -501,7 +505,7 @@ func (c *LeveledCompactor) Write(dest string, b BlockReader, mint, maxt int64, p
 		}
 	}
 
-	err := c.write(dest, meta, b)
+	err := c.write(dest, meta, DefaultPopulateBlockFunc{}, b)
 	if err != nil {
 		return uid, err
 	}
@@ -546,7 +550,7 @@ func (w *instrumentedChunkWriter) WriteChunks(chunks ...chunks.Meta) error {
 }
 
 // write creates a new block that is the union of the provided blocks into dir.
-func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockReader) (err error) {
+func (c *LeveledCompactor) write(dest string, meta *BlockMeta, populateBlockFunc PopulateBlockFunc, blocks ...BlockReader) (err error) {
 	dir := filepath.Join(dest, meta.ULID.String())
 	tmp := dir + tmpForCreationBlockDirSuffix
 	var closers []io.Closer
@@ -557,8 +561,8 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockRe
 		if err := os.RemoveAll(tmp); err != nil {
 			level.Error(c.logger).Log("msg", "removed tmp folder after failed compaction", "err", err.Error())
 		}
-		c.metrics.ran.Inc()
-		c.metrics.duration.Observe(time.Since(t).Seconds())
+		c.metrics.Ran.Inc()
+		c.metrics.Duration.Observe(time.Since(t).Seconds())
 	}(time.Now())
 
 	if err = os.RemoveAll(tmp); err != nil {
@@ -582,9 +586,9 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockRe
 	if meta.Compaction.Level == 1 {
 		chunkw = &instrumentedChunkWriter{
 			ChunkWriter: chunkw,
-			size: c.metrics.chunkSize,
-			samples: c.metrics.chunkSamples,
-			trange: c.metrics.chunkRange,
+			size: c.metrics.ChunkSize,
+			samples: c.metrics.ChunkSamples,
+			trange: c.metrics.ChunkRange,
 		}
 	}
 
@@ -594,7 +598,7 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockRe
 	}
 	closers = append(closers, indexw)
 
-	if err := c.populateBlock(blocks, meta, indexw, chunkw); err != nil {
+	if err := populateBlockFunc.PopulateBlock(c.ctx, c.metrics, c.logger, c.chunkPool, c.mergeFunc, blocks, meta, indexw, chunkw); err != nil {
 		return errors.Wrap(err, "populate block")
 	}
@@ -659,10 +663,16 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockRe
 	return nil
 }
 
-// populateBlock fills the index and chunk writers with new data gathered as the union
+type PopulateBlockFunc interface {
+	PopulateBlock(ctx context.Context, metrics *CompactorMetrics, logger log.Logger, chunkPool chunkenc.Pool, mergeFunc storage.VerticalChunkSeriesMergeFunc, blocks []BlockReader, meta *BlockMeta, indexw IndexWriter, chunkw ChunkWriter) error
+}
+
+type DefaultPopulateBlockFunc struct{}
+
+// PopulateBlock fills the index and chunk writers with new data gathered as the union
 // of the provided blocks. It returns meta information for the new block.
 // It expects sorted blocks input by mint.
-func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta, indexw IndexWriter, chunkw ChunkWriter) (err error) {
+func (c DefaultPopulateBlockFunc) PopulateBlock(ctx context.Context, metrics *CompactorMetrics, logger log.Logger, chunkPool chunkenc.Pool, mergeFunc storage.VerticalChunkSeriesMergeFunc, blocks []BlockReader, meta *BlockMeta, indexw IndexWriter, chunkw ChunkWriter) (err error) {
 	if len(blocks) == 0 {
 		return errors.New("cannot populate block from no readers")
 	}
@@ -679,23 +689,23 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta,
 			errs.Add(errors.Wrap(cerr, "close"))
 		}
 		err = errs.Err()
-		c.metrics.populatingBlocks.Set(0)
+		metrics.PopulatingBlocks.Set(0)
 	}()
-	c.metrics.populatingBlocks.Set(1)
+	metrics.PopulatingBlocks.Set(1)
 
 	globalMaxt := blocks[0].Meta().MaxTime
 	for i, b := range blocks {
 		select {
-		case <-c.ctx.Done():
-			return c.ctx.Err()
+		case <-ctx.Done():
+			return ctx.Err()
 		default:
 		}
 
 		if !overlapping {
 			if i > 0 && b.Meta().MinTime < globalMaxt {
-				c.metrics.overlappingBlocks.Inc()
+				metrics.OverlappingBlocks.Inc()
 				overlapping = true
-				level.Info(c.logger).Log("msg", "Found overlapping blocks during compaction", "ulid", meta.ULID)
+				level.Info(logger).Log("msg", "Found overlapping blocks during compaction", "ulid", meta.ULID)
 			}
 			if b.Meta().MaxTime > globalMaxt {
 				globalMaxt = b.Meta().MaxTime
@@ -727,7 +737,7 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta,
 		}
 		all = indexr.SortedPostings(all)
 		// Blocks meta is half open: [min, max), so subtract 1 to ensure we don't hold samples with exact meta.MaxTime timestamp.
-		sets = append(sets, newBlockChunkSeriesSet(b.Meta().ULID, indexr, chunkr, tombsr, all, meta.MinTime, meta.MaxTime-1, false))
+		sets = append(sets, NewBlockChunkSeriesSet(b.Meta().ULID, indexr, chunkr, tombsr, all, meta.MinTime, meta.MaxTime-1, false))
 		syms := indexr.Symbols()
 		if i == 0 {
 			symbols = syms
@@ -755,14 +765,14 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta,
 	if len(sets) > 1 {
 		// Merge series using specified chunk series merger.
 		// The default one is the compacting series merger.
-		set = storage.NewMergeChunkSeriesSet(sets, c.mergeFunc)
+		set = storage.NewMergeChunkSeriesSet(sets, mergeFunc)
 	}
 
 	// Iterate over all sorted chunk series.
 	for set.Next() {
 		select {
-		case <-c.ctx.Done():
-			return c.ctx.Err()
+		case <-ctx.Done():
+			return ctx.Err()
 		default:
 		}
 		s := set.At()
@@ -797,7 +807,7 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta,
 		}
 
 		for _, chk := range chks {
-			if err := c.chunkPool.Put(chk.Chunk); err != nil {
+			if err := chunkPool.Put(chk.Chunk); err != nil {
 				return errors.Wrap(err, "put chunk")
 			}
 		}
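
With write() now threading the populate function through, the new CompactWithPopulateBlockFunc entry point is the caller-facing hook; plain Compact keeps its old behavior by passing DefaultPopulateBlockFunc{}. A hedged driver sketch, reusing the hypothetical loggingPopulateBlockFunc from above (the two-hour range is a placeholder, and imports of context, time, github.com/oklog/ulid, go-kit log, tsdb, and chunkenc are assumed):

// compactWithCustomPopulate compacts the given source block directories into
// dest using the custom populate hook instead of the default one.
func compactWithCustomPopulate(dest string, dirs []string) (ulid.ULID, error) {
	logger := log.NewNopLogger()
	compactor, err := tsdb.NewLeveledCompactor(context.Background(), nil, logger,
		[]int64{2 * time.Hour.Milliseconds()}, chunkenc.NewPool(), nil)
	if err != nil {
		return ulid.ULID{}, err
	}
	// The last argument is the new hook; DefaultPopulateBlockFunc{} here
	// would be equivalent to calling Compact.
	return compactor.CompactWithPopulateBlockFunc(dest, dirs, nil, loggingPopulateBlockFunc{})
}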

tsdb/compact_test.go

@@ -441,7 +441,7 @@ func TestCompactionFailWillCleanUpTempDir(t *testing.T) {
 	tmpdir := t.TempDir()
 
-	require.Error(t, compactor.write(tmpdir, &BlockMeta{}, erringBReader{}))
+	require.Error(t, compactor.write(tmpdir, &BlockMeta{}, DefaultPopulateBlockFunc{}, erringBReader{}))
 	_, err = os.Stat(filepath.Join(tmpdir, BlockMeta{}.ULID.String()) + tmpForCreationBlockDirSuffix)
 	require.True(t, os.IsNotExist(err), "directory is not cleaned up")
 }
 
@@ -953,7 +953,8 @@ func TestCompaction_populateBlock(t *testing.T) {
 			}
 
 			iw := &mockIndexWriter{}
-			err = c.populateBlock(blocks, meta, iw, nopChunkWriter{})
+			populateBlockFunc := DefaultPopulateBlockFunc{}
+			err = populateBlockFunc.PopulateBlock(c.ctx, c.metrics, c.logger, c.chunkPool, c.mergeFunc, blocks, meta, iw, nopChunkWriter{})
 			if tc.expErr != nil {
 				require.Error(t, err)
 				require.Equal(t, tc.expErr.Error(), err.Error())
@@ -1181,14 +1182,14 @@ func TestCancelCompactions(t *testing.T) {
 		db, err := open(tmpdir, log.NewNopLogger(), nil, DefaultOptions(), []int64{1, 2000}, nil)
 		require.NoError(t, err)
 		require.Equal(t, 3, len(db.Blocks()), "initial block count mismatch")
-		require.Equal(t, 0.0, prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.ran), "initial compaction counter mismatch")
+		require.Equal(t, 0.0, prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.Ran), "initial compaction counter mismatch")
 		db.compactc <- struct{}{} // Trigger a compaction.
-		for prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.populatingBlocks) <= 0 {
+		for prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.PopulatingBlocks) <= 0 {
 			time.Sleep(3 * time.Millisecond)
 		}
 
 		start := time.Now()
-		for prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.ran) != 1 {
+		for prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.Ran) != 1 {
 			time.Sleep(3 * time.Millisecond)
 		}
 		timeCompactionUninterrupted = time.Since(start)
@@ -1200,10 +1201,10 @@ func TestCancelCompactions(t *testing.T) {
 		db, err := open(tmpdirCopy, log.NewNopLogger(), nil, DefaultOptions(), []int64{1, 2000}, nil)
 		require.NoError(t, err)
 		require.Equal(t, 3, len(db.Blocks()), "initial block count mismatch")
-		require.Equal(t, 0.0, prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.ran), "initial compaction counter mismatch")
+		require.Equal(t, 0.0, prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.Ran), "initial compaction counter mismatch")
 		db.compactc <- struct{}{} // Trigger a compaction.
 
-		for prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.populatingBlocks) <= 0 {
+		for prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.PopulatingBlocks) <= 0 {
 			time.Sleep(3 * time.Millisecond)
 		}
@@ -1284,7 +1285,7 @@ func TestDeleteCompactionBlockAfterFailedReload(t *testing.T) {
 			require.NoError(t, os.RemoveAll(lastBlockIndex)) // Corrupt the block by removing the index file.
 
 			require.Equal(t, 0.0, prom_testutil.ToFloat64(db.metrics.reloadsFailed), "initial 'failed db reloadBlocks' count metrics mismatch")
-			require.Equal(t, 0.0, prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.ran), "initial `compactions` count metric mismatch")
+			require.Equal(t, 0.0, prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.Ran), "initial `compactions` count metric mismatch")
 			require.Equal(t, 0.0, prom_testutil.ToFloat64(db.metrics.compactionsFailed), "initial `compactions failed` count metric mismatch")
 
 			// Do the compaction and check the metrics.
@@ -1292,7 +1293,7 @@ func TestDeleteCompactionBlockAfterFailedReload(t *testing.T) {
 			// the new block created from the compaction should be deleted.
 			require.Error(t, db.Compact())
 			require.Equal(t, 1.0, prom_testutil.ToFloat64(db.metrics.reloadsFailed), "'failed db reloadBlocks' count metrics mismatch")
-			require.Equal(t, 1.0, prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.ran), "`compaction` count metric mismatch")
+			require.Equal(t, 1.0, prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.Ran), "`compaction` count metric mismatch")
 			require.Equal(t, 1.0, prom_testutil.ToFloat64(db.metrics.compactionsFailed), "`compactions failed` count metric mismatch")
 
 			actBlocks, err = blockDirs(db.Dir())

tsdb/db_test.go

@@ -2055,7 +2055,7 @@ func TestNoEmptyBlocks(t *testing.T) {
 		require.NoError(t, err)
 		require.Equal(t, len(db.Blocks()), len(actBlocks))
 		require.Equal(t, 0, len(actBlocks))
-		require.Equal(t, 0, int(prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.ran)), "no compaction should be triggered here")
+		require.Equal(t, 0, int(prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.Ran)), "no compaction should be triggered here")
 	})
 
 	t.Run("Test no blocks after deleting all samples from head.", func(t *testing.T) {
@@ -2069,7 +2069,7 @@ func TestNoEmptyBlocks(t *testing.T) {
 		require.NoError(t, app.Commit())
 		require.NoError(t, db.Delete(math.MinInt64, math.MaxInt64, defaultMatcher))
 		require.NoError(t, db.Compact())
-		require.Equal(t, 1, int(prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.ran)), "compaction should have been triggered here")
+		require.Equal(t, 1, int(prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.Ran)), "compaction should have been triggered here")
 
 		actBlocks, err := blockDirs(db.Dir())
 		require.NoError(t, err)
@@ -2091,7 +2091,7 @@ func TestNoEmptyBlocks(t *testing.T) {
 		require.NoError(t, app.Commit())
 
 		require.NoError(t, db.Compact())
-		require.Equal(t, 2, int(prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.ran)), "compaction should have been triggered here")
+		require.Equal(t, 2, int(prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.Ran)), "compaction should have been triggered here")
 		actBlocks, err = blockDirs(db.Dir())
 		require.NoError(t, err)
 		require.Equal(t, len(db.Blocks()), len(actBlocks))
@@ -2112,7 +2112,7 @@ func TestNoEmptyBlocks(t *testing.T) {
 		require.NoError(t, app.Commit())
 		require.NoError(t, db.head.Delete(math.MinInt64, math.MaxInt64, defaultMatcher))
 		require.NoError(t, db.Compact())
-		require.Equal(t, 3, int(prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.ran)), "compaction should have been triggered here")
+		require.Equal(t, 3, int(prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.Ran)), "compaction should have been triggered here")
 		require.Equal(t, oldBlocks, db.Blocks())
 	})
 
@@ -2131,7 +2131,7 @@ func TestNoEmptyBlocks(t *testing.T) {
 		require.Equal(t, len(blocks)+len(oldBlocks), len(db.Blocks())) // Ensure all blocks are registered.
 		require.NoError(t, db.Delete(math.MinInt64, math.MaxInt64, defaultMatcher))
 		require.NoError(t, db.Compact())
-		require.Equal(t, 5, int(prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.ran)), "compaction should have been triggered here once for each block that have tombstones")
+		require.Equal(t, 5, int(prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.Ran)), "compaction should have been triggered here once for each block that have tombstones")
 
 		actBlocks, err := blockDirs(db.Dir())
 		require.NoError(t, err)

tsdb/querier.go

@@ -180,7 +180,7 @@ func (q *blockChunkQuerier) Select(sortSeries bool, hints *storage.SelectHints,
 	if sortSeries {
 		p = q.index.SortedPostings(p)
 	}
-	return newBlockChunkSeriesSet(q.blockID, q.index, q.chunks, q.tombstones, p, mint, maxt, disableTrimming)
+	return NewBlockChunkSeriesSet(q.blockID, q.index, q.chunks, q.tombstones, p, mint, maxt, disableTrimming)
 }
 
 func findSetMatches(pattern string) []string {
@@ -438,7 +438,7 @@ func (s *seriesData) Labels() labels.Labels { return s.labels }
 
 // blockBaseSeriesSet allows to iterate over all series in the single block.
 // Iterated series are trimmed with given min and max time as well as tombstones.
-// See newBlockSeriesSet and newBlockChunkSeriesSet to use it for either sample or chunk iterating.
+// See newBlockSeriesSet and NewBlockChunkSeriesSet to use it for either sample or chunk iterating.
 type blockBaseSeriesSet struct {
 	blockID ulid.ULID
 	p index.Postings
@@ -924,7 +924,7 @@ type blockChunkSeriesSet struct {
 	blockBaseSeriesSet
 }
 
-func newBlockChunkSeriesSet(id ulid.ULID, i IndexReader, c ChunkReader, t tombstones.Reader, p index.Postings, mint, maxt int64, disableTrimming bool) storage.ChunkSeriesSet {
+func NewBlockChunkSeriesSet(id ulid.ULID, i IndexReader, c ChunkReader, t tombstones.Reader, p index.Postings, mint, maxt int64, disableTrimming bool) storage.ChunkSeriesSet {
 	return &blockChunkSeriesSet{
 		blockBaseSeriesSet{
 			blockID: id,
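
Exporting NewBlockChunkSeriesSet is the counterpart change on the read side: an out-of-tree PopulateBlock implementation needs it to iterate source blocks the same way DefaultPopulateBlockFunc does. A rough sketch of walking every chunk series in one block, assuming imports of fmt, go-kit log, tsdb, chunkenc, and tsdb/index (not an official recipe, just one way to exercise the now-public constructor):

// printBlockSeries lists the label sets of all chunk series in a block dir.
func printBlockSeries(dir string) error {
	b, err := tsdb.OpenBlock(log.NewNopLogger(), dir, chunkenc.NewPool())
	if err != nil {
		return err
	}
	defer b.Close()

	indexr, err := b.Index()
	if err != nil {
		return err
	}
	defer indexr.Close()
	chunkr, err := b.Chunks()
	if err != nil {
		return err
	}
	defer chunkr.Close()
	tombsr, err := b.Tombstones()
	if err != nil {
		return err
	}
	defer tombsr.Close()

	k, v := index.AllPostingsKey()
	all, err := indexr.Postings(k, v)
	if err != nil {
		return err
	}
	// Block metas are half open [min, max), hence MaxTime-1, matching the
	// convention in PopulateBlock above.
	set := tsdb.NewBlockChunkSeriesSet(b.Meta().ULID, indexr, chunkr, tombsr,
		indexr.SortedPostings(all), b.Meta().MinTime, b.Meta().MaxTime-1, false)
	for set.Next() {
		fmt.Println(set.At().Labels().String())
	}
	return set.Err()
}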