TSDB: Change block populator to accept postings index function (#14213)

Signed-off-by: Ben Ye <benye@amazon.com>
This commit is contained in:
Ben Ye 2024-06-25 01:21:48 -07:00 committed by GitHub
parent 5585a3c7e5
commit 246b7c6a5c
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 79 additions and 11 deletions

View file

@ -656,7 +656,7 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blockPopulator Bl
}
closers = append(closers, indexw)
if err := blockPopulator.PopulateBlock(c.ctx, c.metrics, c.logger, c.chunkPool, c.mergeFunc, blocks, meta, indexw, chunkw); err != nil {
if err := blockPopulator.PopulateBlock(c.ctx, c.metrics, c.logger, c.chunkPool, c.mergeFunc, blocks, meta, indexw, chunkw, AllSortedPostings); err != nil {
return fmt.Errorf("populate block: %w", err)
}
@ -722,7 +722,20 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blockPopulator Bl
}
type BlockPopulator interface {
PopulateBlock(ctx context.Context, metrics *CompactorMetrics, logger log.Logger, chunkPool chunkenc.Pool, mergeFunc storage.VerticalChunkSeriesMergeFunc, blocks []BlockReader, meta *BlockMeta, indexw IndexWriter, chunkw ChunkWriter) error
PopulateBlock(ctx context.Context, metrics *CompactorMetrics, logger log.Logger, chunkPool chunkenc.Pool, mergeFunc storage.VerticalChunkSeriesMergeFunc, blocks []BlockReader, meta *BlockMeta, indexw IndexWriter, chunkw ChunkWriter, postingsFunc IndexReaderPostingsFunc) error
}
// IndexReaderPostingsFunc is a function to get a sorted posting iterator from a given index reader.
type IndexReaderPostingsFunc func(ctx context.Context, reader IndexReader) index.Postings
// AllSortedPostings returns a sorted all posting iterator from the input index reader.
func AllSortedPostings(ctx context.Context, reader IndexReader) index.Postings {
k, v := index.AllPostingsKey()
all, err := reader.Postings(ctx, k, v)
if err != nil {
return index.ErrPostings(err)
}
return reader.SortedPostings(all)
}
type DefaultBlockPopulator struct{}
@ -730,7 +743,7 @@ type DefaultBlockPopulator struct{}
// PopulateBlock fills the index and chunk writers with new data gathered as the union
// of the provided blocks. It returns meta information for the new block.
// It expects sorted blocks input by mint.
func (c DefaultBlockPopulator) PopulateBlock(ctx context.Context, metrics *CompactorMetrics, logger log.Logger, chunkPool chunkenc.Pool, mergeFunc storage.VerticalChunkSeriesMergeFunc, blocks []BlockReader, meta *BlockMeta, indexw IndexWriter, chunkw ChunkWriter) (err error) {
func (c DefaultBlockPopulator) PopulateBlock(ctx context.Context, metrics *CompactorMetrics, logger log.Logger, chunkPool chunkenc.Pool, mergeFunc storage.VerticalChunkSeriesMergeFunc, blocks []BlockReader, meta *BlockMeta, indexw IndexWriter, chunkw ChunkWriter, postingsFunc IndexReaderPostingsFunc) (err error) {
if len(blocks) == 0 {
return errors.New("cannot populate block from no readers")
}
@ -788,14 +801,9 @@ func (c DefaultBlockPopulator) PopulateBlock(ctx context.Context, metrics *Compa
}
closers = append(closers, tombsr)
k, v := index.AllPostingsKey()
all, err := indexr.Postings(ctx, k, v)
if err != nil {
return err
}
all = indexr.SortedPostings(all)
postings := postingsFunc(ctx, indexr)
// Blocks meta is half open: [min, max), so subtract 1 to ensure we don't hold samples with exact meta.MaxTime timestamp.
sets = append(sets, NewBlockChunkSeriesSet(b.Meta().ULID, indexr, chunkr, tombsr, all, meta.MinTime, meta.MaxTime-1, false))
sets = append(sets, NewBlockChunkSeriesSet(b.Meta().ULID, indexr, chunkr, tombsr, postings, meta.MinTime, meta.MaxTime-1, false))
syms := indexr.Symbols()
if i == 0 {
symbols = syms

View file

@ -38,6 +38,7 @@ import (
"github.com/prometheus/prometheus/tsdb/chunkenc"
"github.com/prometheus/prometheus/tsdb/chunks"
"github.com/prometheus/prometheus/tsdb/fileutil"
"github.com/prometheus/prometheus/tsdb/index"
"github.com/prometheus/prometheus/tsdb/tombstones"
"github.com/prometheus/prometheus/tsdb/tsdbutil"
"github.com/prometheus/prometheus/tsdb/wlog"
@ -493,6 +494,7 @@ func TestCompaction_populateBlock(t *testing.T) {
inputSeriesSamples [][]seriesSamples
compactMinTime int64
compactMaxTime int64 // When not defined the test runner sets a default of math.MaxInt64.
irPostingsFunc IndexReaderPostingsFunc
expSeriesSamples []seriesSamples
expErr error
}{
@ -961,6 +963,60 @@ func TestCompaction_populateBlock(t *testing.T) {
},
},
},
{
title: "Populate from single block with index reader postings function selecting different series. Expect empty block.",
inputSeriesSamples: [][]seriesSamples{
{
{
lset: map[string]string{"a": "b"},
chunks: [][]sample{{{t: 0}, {t: 10}}, {{t: 11}, {t: 20}}},
},
},
},
irPostingsFunc: func(ctx context.Context, reader IndexReader) index.Postings {
p, err := reader.Postings(ctx, "a", "c")
if err != nil {
return index.EmptyPostings()
}
return reader.SortedPostings(p)
},
},
{
title: "Populate from single block with index reader postings function selecting one series. Expect partial block.",
inputSeriesSamples: [][]seriesSamples{
{
{
lset: map[string]string{"a": "b"},
chunks: [][]sample{{{t: 0}, {t: 10}}, {{t: 11}, {t: 20}}},
},
{
lset: map[string]string{"a": "c"},
chunks: [][]sample{{{t: 0}, {t: 10}}, {{t: 11}, {t: 20}}},
},
{
lset: map[string]string{"a": "d"},
chunks: [][]sample{{{t: 0}, {t: 10}}, {{t: 11}, {t: 20}}},
},
},
},
irPostingsFunc: func(ctx context.Context, reader IndexReader) index.Postings {
p, err := reader.Postings(ctx, "a", "c", "d")
if err != nil {
return index.EmptyPostings()
}
return reader.SortedPostings(p)
},
expSeriesSamples: []seriesSamples{
{
lset: map[string]string{"a": "c"},
chunks: [][]sample{{{t: 0}, {t: 10}}, {{t: 11}, {t: 20}}},
},
{
lset: map[string]string{"a": "d"},
chunks: [][]sample{{{t: 0}, {t: 10}}, {{t: 11}, {t: 20}}},
},
},
},
} {
t.Run(tc.title, func(t *testing.T) {
blocks := make([]BlockReader, 0, len(tc.inputSeriesSamples))
@ -982,7 +1038,11 @@ func TestCompaction_populateBlock(t *testing.T) {
iw := &mockIndexWriter{}
blockPopulator := DefaultBlockPopulator{}
err = blockPopulator.PopulateBlock(c.ctx, c.metrics, c.logger, c.chunkPool, c.mergeFunc, blocks, meta, iw, nopChunkWriter{})
irPostingsFunc := AllSortedPostings
if tc.irPostingsFunc != nil {
irPostingsFunc = tc.irPostingsFunc
}
err = blockPopulator.PopulateBlock(c.ctx, c.metrics, c.logger, c.chunkPool, c.mergeFunc, blocks, meta, iw, nopChunkWriter{}, irPostingsFunc)
if tc.expErr != nil {
require.Error(t, err)
require.Equal(t, tc.expErr.Error(), err.Error())