From 140f4aa9aed6674c582dc603431a4aaa0c702cb7 Mon Sep 17 00:00:00 2001 From: Ben Ye Date: Sun, 10 Nov 2024 22:59:24 -0800 Subject: [PATCH] feat: Allow customizing TSDB postings decoder (#13567) * allow customizing TSDB postings decoder --------- Signed-off-by: Ben Ye --- cmd/promtool/tsdb.go | 2 +- tsdb/block.go | 8 ++++++-- tsdb/block_test.go | 26 +++++++++++++------------- tsdb/blockwriter_test.go | 2 +- tsdb/compact.go | 13 ++++++++++++- tsdb/compact_test.go | 6 +++--- tsdb/db.go | 18 ++++++++++++------ tsdb/db_test.go | 12 ++++++------ tsdb/index/index.go | 35 ++++++++++++++++------------------- tsdb/index/index_test.go | 15 ++++++++------- tsdb/querier_bench_test.go | 4 ++-- tsdb/querier_test.go | 10 +++++----- tsdb/repair_test.go | 4 ++-- 13 files changed, 87 insertions(+), 68 deletions(-) diff --git a/cmd/promtool/tsdb.go b/cmd/promtool/tsdb.go index 847ea6be0c..8a5a728875 100644 --- a/cmd/promtool/tsdb.go +++ b/cmd/promtool/tsdb.go @@ -405,7 +405,7 @@ func openBlock(path, blockID string) (*tsdb.DBReadOnly, tsdb.BlockReader, error) } } - b, err := db.Block(blockID) + b, err := db.Block(blockID, tsdb.DefaultPostingsDecoderFactory) if err != nil { return nil, nil, err } diff --git a/tsdb/block.go b/tsdb/block.go index 48ba4588aa..aec99788c9 100644 --- a/tsdb/block.go +++ b/tsdb/block.go @@ -330,7 +330,7 @@ type Block struct { // OpenBlock opens the block in the directory. It can be passed a chunk pool, which is used // to instantiate chunk structs. -func OpenBlock(logger *slog.Logger, dir string, pool chunkenc.Pool) (pb *Block, err error) { +func OpenBlock(logger *slog.Logger, dir string, pool chunkenc.Pool, postingsDecoderFactory PostingsDecoderFactory) (pb *Block, err error) { if logger == nil { logger = promslog.NewNopLogger() } @@ -351,7 +351,11 @@ func OpenBlock(logger *slog.Logger, dir string, pool chunkenc.Pool) (pb *Block, } closers = append(closers, cr) - ir, err := index.NewFileReader(filepath.Join(dir, indexFilename)) + decoder := index.DecodePostingsRaw + if postingsDecoderFactory != nil { + decoder = postingsDecoderFactory(meta) + } + ir, err := index.NewFileReader(filepath.Join(dir, indexFilename), decoder) if err != nil { return nil, err } diff --git a/tsdb/block_test.go b/tsdb/block_test.go index 3589b42c17..3e25cff3da 100644 --- a/tsdb/block_test.go +++ b/tsdb/block_test.go @@ -59,14 +59,14 @@ func TestSetCompactionFailed(t *testing.T) { tmpdir := t.TempDir() blockDir := createBlock(t, tmpdir, genSeries(1, 1, 0, 1)) - b, err := OpenBlock(nil, blockDir, nil) + b, err := OpenBlock(nil, blockDir, nil, nil) require.NoError(t, err) require.False(t, b.meta.Compaction.Failed) require.NoError(t, b.setCompactionFailed()) require.True(t, b.meta.Compaction.Failed) require.NoError(t, b.Close()) - b, err = OpenBlock(nil, blockDir, nil) + b, err = OpenBlock(nil, blockDir, nil, nil) require.NoError(t, err) require.True(t, b.meta.Compaction.Failed) require.NoError(t, b.Close()) @@ -74,7 +74,7 @@ func TestSetCompactionFailed(t *testing.T) { func TestCreateBlock(t *testing.T) { tmpdir := t.TempDir() - b, err := OpenBlock(nil, createBlock(t, tmpdir, genSeries(1, 1, 0, 10)), nil) + b, err := OpenBlock(nil, createBlock(t, tmpdir, genSeries(1, 1, 0, 10)), nil, nil) require.NoError(t, err) require.NoError(t, b.Close()) } @@ -84,7 +84,7 @@ func BenchmarkOpenBlock(b *testing.B) { blockDir := createBlock(b, tmpdir, genSeries(1e6, 20, 0, 10)) b.Run("benchmark", func(b *testing.B) { for i := 0; i < b.N; i++ { - block, err := OpenBlock(nil, blockDir, nil) + block, err := OpenBlock(nil, blockDir, nil, nil) require.NoError(b, err) require.NoError(b, block.Close()) } @@ -190,7 +190,7 @@ func TestCorruptedChunk(t *testing.T) { require.NoError(t, f.Close()) // Check open err. - b, err := OpenBlock(nil, blockDir, nil) + b, err := OpenBlock(nil, blockDir, nil, nil) if tc.openErr != nil { require.EqualError(t, err, tc.openErr.Error()) return @@ -245,7 +245,7 @@ func TestLabelValuesWithMatchers(t *testing.T) { require.NotEmpty(t, files, "No chunk created.") // Check open err. - block, err := OpenBlock(nil, blockDir, nil) + block, err := OpenBlock(nil, blockDir, nil, nil) require.NoError(t, err) defer func() { require.NoError(t, block.Close()) }() @@ -325,7 +325,7 @@ func TestBlockQuerierReturnsSortedLabelValues(t *testing.T) { blockDir := createBlock(t, tmpdir, seriesEntries) // Check open err. - block, err := OpenBlock(nil, blockDir, nil) + block, err := OpenBlock(nil, blockDir, nil, nil) require.NoError(t, err) t.Cleanup(func() { require.NoError(t, block.Close()) }) @@ -352,7 +352,7 @@ func TestBlockSize(t *testing.T) { // Create a block and compare the reported size vs actual disk size. { blockDirInit = createBlock(t, tmpdir, genSeries(10, 1, 1, 100)) - blockInit, err = OpenBlock(nil, blockDirInit, nil) + blockInit, err = OpenBlock(nil, blockDirInit, nil, nil) require.NoError(t, err) defer func() { require.NoError(t, blockInit.Close()) @@ -377,7 +377,7 @@ func TestBlockSize(t *testing.T) { blockDirsAfterCompact, err := c.Compact(tmpdir, []string{blockInit.Dir()}, nil) require.NoError(t, err) require.Len(t, blockDirsAfterCompact, 1) - blockAfterCompact, err := OpenBlock(nil, filepath.Join(tmpdir, blockDirsAfterCompact[0].String()), nil) + blockAfterCompact, err := OpenBlock(nil, filepath.Join(tmpdir, blockDirsAfterCompact[0].String()), nil, nil) require.NoError(t, err) defer func() { require.NoError(t, blockAfterCompact.Close()) @@ -408,7 +408,7 @@ func TestReadIndexFormatV1(t *testing.T) { */ blockDir := filepath.Join("testdata", "index_format_v1") - block, err := OpenBlock(nil, blockDir, nil) + block, err := OpenBlock(nil, blockDir, nil, nil) require.NoError(t, err) q, err := NewBlockQuerier(block, 0, 1000) @@ -445,7 +445,7 @@ func BenchmarkLabelValuesWithMatchers(b *testing.B) { require.NotEmpty(b, files, "No chunk created.") // Check open err. - block, err := OpenBlock(nil, blockDir, nil) + block, err := OpenBlock(nil, blockDir, nil, nil) require.NoError(b, err) defer func() { require.NoError(b, block.Close()) }() @@ -497,7 +497,7 @@ func TestLabelNamesWithMatchers(t *testing.T) { require.NotEmpty(t, files, "No chunk created.") // Check open err. - block, err := OpenBlock(nil, blockDir, nil) + block, err := OpenBlock(nil, blockDir, nil, nil) require.NoError(t, err) t.Cleanup(func() { require.NoError(t, block.Close()) }) @@ -551,7 +551,7 @@ func TestBlockIndexReader_PostingsForLabelMatching(t *testing.T) { require.NoError(t, err) require.NotEmpty(t, files, "No chunk created.") - block, err := OpenBlock(nil, blockDir, nil) + block, err := OpenBlock(nil, blockDir, nil, nil) require.NoError(t, err) t.Cleanup(func() { require.NoError(t, block.Close()) }) diff --git a/tsdb/blockwriter_test.go b/tsdb/blockwriter_test.go index 4ec25df70a..2704b53566 100644 --- a/tsdb/blockwriter_test.go +++ b/tsdb/blockwriter_test.go @@ -47,7 +47,7 @@ func TestBlockWriter(t *testing.T) { // Confirm the block has the correct data. blockpath := filepath.Join(outputDir, id.String()) - b, err := OpenBlock(nil, blockpath, nil) + b, err := OpenBlock(nil, blockpath, nil, nil) require.NoError(t, err) defer func() { require.NoError(t, b.Close()) }() q, err := NewBlockQuerier(b, math.MinInt64, math.MaxInt64) diff --git a/tsdb/compact.go b/tsdb/compact.go index 17374531d4..74fde4c783 100644 --- a/tsdb/compact.go +++ b/tsdb/compact.go @@ -87,6 +87,7 @@ type LeveledCompactor struct { maxBlockChunkSegmentSize int64 mergeFunc storage.VerticalChunkSeriesMergeFunc postingsEncoder index.PostingsEncoder + postingsDecoderFactory PostingsDecoderFactory enableOverlappingCompaction bool } @@ -158,6 +159,9 @@ type LeveledCompactorOptions struct { // PE specifies the postings encoder. It is called when compactor is writing out the postings for a label name/value pair during compaction. // If it is nil then the default encoder is used. At the moment that is the "raw" encoder. See index.EncodePostingsRaw for more. PE index.PostingsEncoder + // PD specifies the postings decoder factory to return different postings decoder based on BlockMeta. It is called when opening a block or opening the index file. + // If it is nil then a default decoder is used, compatible with Prometheus v2. + PD PostingsDecoderFactory // MaxBlockChunkSegmentSize is the max block chunk segment size. If it is 0 then the default chunks.DefaultChunkSegmentSize is used. MaxBlockChunkSegmentSize int64 // MergeFunc is used for merging series together in vertical compaction. By default storage.NewCompactingChunkSeriesMerger(storage.ChainedSeriesMerge) is used. @@ -167,6 +171,12 @@ type LeveledCompactorOptions struct { EnableOverlappingCompaction bool } +type PostingsDecoderFactory func(meta *BlockMeta) index.PostingsDecoder + +func DefaultPostingsDecoderFactory(_ *BlockMeta) index.PostingsDecoder { + return index.DecodePostingsRaw +} + func NewLeveledCompactorWithChunkSize(ctx context.Context, r prometheus.Registerer, l *slog.Logger, ranges []int64, pool chunkenc.Pool, maxBlockChunkSegmentSize int64, mergeFunc storage.VerticalChunkSeriesMergeFunc) (*LeveledCompactor, error) { return NewLeveledCompactorWithOptions(ctx, r, l, ranges, pool, LeveledCompactorOptions{ MaxBlockChunkSegmentSize: maxBlockChunkSegmentSize, @@ -213,6 +223,7 @@ func NewLeveledCompactorWithOptions(ctx context.Context, r prometheus.Registerer maxBlockChunkSegmentSize: maxBlockChunkSegmentSize, mergeFunc: mergeFunc, postingsEncoder: pe, + postingsDecoderFactory: opts.PD, enableOverlappingCompaction: opts.EnableOverlappingCompaction, }, nil } @@ -477,7 +488,7 @@ func (c *LeveledCompactor) CompactWithBlockPopulator(dest string, dirs []string, if b == nil { var err error - b, err = OpenBlock(c.logger, d, c.chunkPool) + b, err = OpenBlock(c.logger, d, c.chunkPool, c.postingsDecoderFactory) if err != nil { return nil, err } diff --git a/tsdb/compact_test.go b/tsdb/compact_test.go index 5123d6e624..310fe8f25a 100644 --- a/tsdb/compact_test.go +++ b/tsdb/compact_test.go @@ -1153,7 +1153,7 @@ func BenchmarkCompaction(b *testing.B) { blockDirs := make([]string, 0, len(c.ranges)) var blocks []*Block for _, r := range c.ranges { - block, err := OpenBlock(nil, createBlock(b, dir, genSeries(nSeries, 10, r[0], r[1])), nil) + block, err := OpenBlock(nil, createBlock(b, dir, genSeries(nSeries, 10, r[0], r[1])), nil, nil) require.NoError(b, err) blocks = append(blocks, block) defer func() { @@ -1549,7 +1549,7 @@ func TestHeadCompactionWithHistograms(t *testing.T) { require.Len(t, ids, 1) // Open the block and query it and check the histograms. - block, err := OpenBlock(nil, path.Join(head.opts.ChunkDirRoot, ids[0].String()), nil) + block, err := OpenBlock(nil, path.Join(head.opts.ChunkDirRoot, ids[0].String()), nil, nil) require.NoError(t, err) t.Cleanup(func() { require.NoError(t, block.Close()) @@ -1911,7 +1911,7 @@ func TestCompactEmptyResultBlockWithTombstone(t *testing.T) { ctx := context.Background() tmpdir := t.TempDir() blockDir := createBlock(t, tmpdir, genSeries(1, 1, 0, 10)) - block, err := OpenBlock(nil, blockDir, nil) + block, err := OpenBlock(nil, blockDir, nil, nil) require.NoError(t, err) // Write tombstone covering the whole block. err = block.Delete(ctx, 0, 10, labels.MustNewMatcher(labels.MatchEqual, defaultLabelName, "0")) diff --git a/tsdb/db.go b/tsdb/db.go index ab919c3100..2cfa4f3638 100644 --- a/tsdb/db.go +++ b/tsdb/db.go @@ -91,6 +91,7 @@ func DefaultOptions() *Options { EnableDelayedCompaction: false, CompactionDelayMaxPercent: DefaultCompactionDelayMaxPercent, CompactionDelay: time.Duration(0), + PostingsDecoderFactory: DefaultPostingsDecoderFactory, } } @@ -219,6 +220,10 @@ type Options struct { // BlockChunkQuerierFunc is a function to return storage.ChunkQuerier from a BlockReader. BlockChunkQuerierFunc BlockChunkQuerierFunc + + // PostingsDecoderFactory allows users to customize postings decoders based on BlockMeta. + // By default, DefaultPostingsDecoderFactory will be used to create raw posting decoder. + PostingsDecoderFactory PostingsDecoderFactory } type NewCompactorFunc func(ctx context.Context, r prometheus.Registerer, l *slog.Logger, ranges []int64, pool chunkenc.Pool, opts *Options) (Compactor, error) @@ -633,7 +638,7 @@ func (db *DBReadOnly) Blocks() ([]BlockReader, error) { return nil, ErrClosed default: } - loadable, corrupted, err := openBlocks(db.logger, db.dir, nil, nil) + loadable, corrupted, err := openBlocks(db.logger, db.dir, nil, nil, DefaultPostingsDecoderFactory) if err != nil { return nil, err } @@ -731,7 +736,7 @@ func (db *DBReadOnly) LastBlockID() (string, error) { } // Block returns a block reader by given block id. -func (db *DBReadOnly) Block(blockID string) (BlockReader, error) { +func (db *DBReadOnly) Block(blockID string, postingsDecoderFactory PostingsDecoderFactory) (BlockReader, error) { select { case <-db.closed: return nil, ErrClosed @@ -743,7 +748,7 @@ func (db *DBReadOnly) Block(blockID string) (BlockReader, error) { return nil, fmt.Errorf("invalid block ID %s", blockID) } - block, err := OpenBlock(db.logger, filepath.Join(db.dir, blockID), nil) + block, err := OpenBlock(db.logger, filepath.Join(db.dir, blockID), nil, postingsDecoderFactory) if err != nil { return nil, err } @@ -902,6 +907,7 @@ func open(dir string, l *slog.Logger, r prometheus.Registerer, opts *Options, rn db.compactor, err = NewLeveledCompactorWithOptions(ctx, r, l, rngs, db.chunkPool, LeveledCompactorOptions{ MaxBlockChunkSegmentSize: opts.MaxBlockChunkSegmentSize, EnableOverlappingCompaction: opts.EnableOverlappingCompaction, + PD: opts.PostingsDecoderFactory, }) } if err != nil { @@ -1568,7 +1574,7 @@ func (db *DB) reloadBlocks() (err error) { db.mtx.Lock() defer db.mtx.Unlock() - loadable, corrupted, err := openBlocks(db.logger, db.dir, db.blocks, db.chunkPool) + loadable, corrupted, err := openBlocks(db.logger, db.dir, db.blocks, db.chunkPool, db.opts.PostingsDecoderFactory) if err != nil { return err } @@ -1663,7 +1669,7 @@ func (db *DB) reloadBlocks() (err error) { return nil } -func openBlocks(l *slog.Logger, dir string, loaded []*Block, chunkPool chunkenc.Pool) (blocks []*Block, corrupted map[ulid.ULID]error, err error) { +func openBlocks(l *slog.Logger, dir string, loaded []*Block, chunkPool chunkenc.Pool, postingsDecoderFactory PostingsDecoderFactory) (blocks []*Block, corrupted map[ulid.ULID]error, err error) { bDirs, err := blockDirs(dir) if err != nil { return nil, nil, fmt.Errorf("find blocks: %w", err) @@ -1680,7 +1686,7 @@ func openBlocks(l *slog.Logger, dir string, loaded []*Block, chunkPool chunkenc. // See if we already have the block in memory or open it otherwise. block, open := getBlock(loaded, meta.ULID) if !open { - block, err = OpenBlock(l, bDir, chunkPool) + block, err = OpenBlock(l, bDir, chunkPool, postingsDecoderFactory) if err != nil { corrupted[meta.ULID] = err continue diff --git a/tsdb/db_test.go b/tsdb/db_test.go index bfdf7aa4ad..f851f2b911 100644 --- a/tsdb/db_test.go +++ b/tsdb/db_test.go @@ -1320,7 +1320,7 @@ func TestTombstoneCleanFail(t *testing.T) { totalBlocks := 2 for i := 0; i < totalBlocks; i++ { blockDir := createBlock(t, db.Dir(), genSeries(1, 1, int64(i), int64(i)+1)) - block, err := OpenBlock(nil, blockDir, nil) + block, err := OpenBlock(nil, blockDir, nil, nil) require.NoError(t, err) // Add some fake tombstones to trigger the compaction. tomb := tombstones.NewMemTombstones() @@ -1375,7 +1375,7 @@ func TestTombstoneCleanRetentionLimitsRace(t *testing.T) { // Generate some blocks with old mint (near epoch). for j := 0; j < totalBlocks; j++ { blockDir := createBlock(t, dbDir, genSeries(10, 1, int64(j), int64(j)+1)) - block, err := OpenBlock(nil, blockDir, nil) + block, err := OpenBlock(nil, blockDir, nil, nil) require.NoError(t, err) // Cover block with tombstones so it can be deleted with CleanTombstones() as well. tomb := tombstones.NewMemTombstones() @@ -1436,7 +1436,7 @@ func (c *mockCompactorFailing) Write(dest string, _ BlockReader, _, _ int64, _ * return []ulid.ULID{}, errors.New("the compactor already did the maximum allowed blocks so it is time to fail") } - block, err := OpenBlock(nil, createBlock(c.t, dest, genSeries(1, 1, 0, 1)), nil) + block, err := OpenBlock(nil, createBlock(c.t, dest, genSeries(1, 1, 0, 1)), nil, nil) require.NoError(c.t, err) require.NoError(c.t, block.Close()) // Close block as we won't be using anywhere. c.blocks = append(c.blocks, block) @@ -2509,13 +2509,13 @@ func TestDBReadOnly(t *testing.T) { }) t.Run("block", func(t *testing.T) { blockID := expBlock.meta.ULID.String() - block, err := dbReadOnly.Block(blockID) + block, err := dbReadOnly.Block(blockID, nil) require.NoError(t, err) require.Equal(t, expBlock.Meta(), block.Meta(), "block meta mismatch") }) t.Run("invalid block ID", func(t *testing.T) { blockID := "01GTDVZZF52NSWB5SXQF0P2PGF" - _, err := dbReadOnly.Block(blockID) + _, err := dbReadOnly.Block(blockID, nil) require.Error(t, err) }) t.Run("last block ID", func(t *testing.T) { @@ -8851,7 +8851,7 @@ func TestBlockQuerierAndBlockChunkQuerier(t *testing.T) { // Include blockID into series to identify which block got touched. serieses := []storage.Series{storage.NewListSeries(labels.FromMap(map[string]string{"block": fmt.Sprintf("block-%d", i), labels.MetricName: "test_metric"}), []chunks.Sample{sample{t: 0, f: 1}})} blockDir := createBlock(t, db.Dir(), serieses) - b, err := OpenBlock(db.logger, blockDir, db.chunkPool) + b, err := OpenBlock(db.logger, blockDir, db.chunkPool, nil) require.NoError(t, err) // Overwrite meta.json with compaction section for testing purpose. diff --git a/tsdb/index/index.go b/tsdb/index/index.go index 8c0f698eae..4e199c5bd2 100644 --- a/tsdb/index/index.go +++ b/tsdb/index/index.go @@ -118,6 +118,8 @@ type symbolCacheEntry struct { type PostingsEncoder func(*encoding.Encbuf, []uint32) error +type PostingsDecoder func(encoding.Decbuf) (int, Postings, error) + // Writer implements the IndexWriter interface for the standard // serialization format. type Writer struct { @@ -1157,17 +1159,17 @@ func (b realByteSlice) Sub(start, end int) ByteSlice { // NewReader returns a new index reader on the given byte slice. It automatically // handles different format versions. -func NewReader(b ByteSlice) (*Reader, error) { - return newReader(b, io.NopCloser(nil)) +func NewReader(b ByteSlice, decoder PostingsDecoder) (*Reader, error) { + return newReader(b, io.NopCloser(nil), decoder) } // NewFileReader returns a new index reader against the given index file. -func NewFileReader(path string) (*Reader, error) { +func NewFileReader(path string, decoder PostingsDecoder) (*Reader, error) { f, err := fileutil.OpenMmapFile(path) if err != nil { return nil, err } - r, err := newReader(realByteSlice(f.Bytes()), f) + r, err := newReader(realByteSlice(f.Bytes()), f, decoder) if err != nil { return nil, tsdb_errors.NewMulti( err, @@ -1178,7 +1180,7 @@ func NewFileReader(path string) (*Reader, error) { return r, nil } -func newReader(b ByteSlice, c io.Closer) (*Reader, error) { +func newReader(b ByteSlice, c io.Closer, postingsDecoder PostingsDecoder) (*Reader, error) { r := &Reader{ b: b, c: c, @@ -1277,7 +1279,7 @@ func newReader(b ByteSlice, c io.Closer) (*Reader, error) { r.nameSymbols[off] = k } - r.dec = &Decoder{LookupSymbol: r.lookupSymbol} + r.dec = &Decoder{LookupSymbol: r.lookupSymbol, DecodePostings: postingsDecoder} return r, nil } @@ -1706,7 +1708,7 @@ func (r *Reader) Postings(ctx context.Context, name string, values ...string) (P } // Read from the postings table. d := encoding.NewDecbufAt(r.b, int(postingsOff), castagnoliTable) - _, p, err := r.dec.Postings(d.Get()) + _, p, err := r.dec.DecodePostings(d) if err != nil { return nil, fmt.Errorf("decode postings: %w", err) } @@ -1749,7 +1751,7 @@ func (r *Reader) Postings(ctx context.Context, name string, values ...string) (P if val == value { // Read from the postings table. d2 := encoding.NewDecbufAt(r.b, int(postingsOff), castagnoliTable) - _, p, err := r.dec.Postings(d2.Get()) + _, p, err := r.dec.DecodePostings(d2) if err != nil { return false, fmt.Errorf("decode postings: %w", err) } @@ -1790,7 +1792,7 @@ func (r *Reader) PostingsForLabelMatching(ctx context.Context, name string, matc if match(val) { // We want this postings iterator since the value is a match postingsDec := encoding.NewDecbufAt(r.b, int(postingsOff), castagnoliTable) - _, p, err := r.dec.PostingsFromDecbuf(postingsDec) + _, p, err := r.dec.DecodePostings(postingsDec) if err != nil { return false, fmt.Errorf("decode postings: %w", err) } @@ -1823,7 +1825,7 @@ func (r *Reader) postingsForLabelMatchingV1(ctx context.Context, name string, ma // Read from the postings table. d := encoding.NewDecbufAt(r.b, int(offset), castagnoliTable) - _, p, err := r.dec.PostingsFromDecbuf(d) + _, p, err := r.dec.DecodePostings(d) if err != nil { return ErrPostings(fmt.Errorf("decode postings: %w", err)) } @@ -1918,17 +1920,12 @@ func (s stringListIter) Err() error { return nil } // It currently does not contain decoding methods for all entry types but can be extended // by them if there's demand. type Decoder struct { - LookupSymbol func(context.Context, uint32) (string, error) + LookupSymbol func(context.Context, uint32) (string, error) + DecodePostings PostingsDecoder } -// Postings returns a postings list for b and its number of elements. -func (dec *Decoder) Postings(b []byte) (int, Postings, error) { - d := encoding.Decbuf{B: b} - return dec.PostingsFromDecbuf(d) -} - -// PostingsFromDecbuf returns a postings list for d and its number of elements. -func (dec *Decoder) PostingsFromDecbuf(d encoding.Decbuf) (int, Postings, error) { +// DecodePostingsRaw returns a postings list for d and its number of elements. +func DecodePostingsRaw(d encoding.Decbuf) (int, Postings, error) { n := d.Be32int() l := d.Get() if d.Err() != nil { diff --git a/tsdb/index/index_test.go b/tsdb/index/index_test.go index d81dd8696c..0d330cfcb9 100644 --- a/tsdb/index/index_test.go +++ b/tsdb/index/index_test.go @@ -146,7 +146,7 @@ func TestIndexRW_Create_Open(t *testing.T) { require.NoError(t, err) require.NoError(t, iw.Close()) - ir, err := NewFileReader(fn) + ir, err := NewFileReader(fn, DecodePostingsRaw) require.NoError(t, err) require.NoError(t, ir.Close()) @@ -157,7 +157,7 @@ func TestIndexRW_Create_Open(t *testing.T) { require.NoError(t, err) f.Close() - _, err = NewFileReader(dir) + _, err = NewFileReader(dir, DecodePostingsRaw) require.Error(t, err) } @@ -218,7 +218,7 @@ func TestIndexRW_Postings(t *testing.T) { }, labelIndices) t.Run("ShardedPostings()", func(t *testing.T) { - ir, err := NewFileReader(fn) + ir, err := NewFileReader(fn, DecodePostingsRaw) require.NoError(t, err) t.Cleanup(func() { require.NoError(t, ir.Close()) @@ -469,7 +469,7 @@ func TestDecbufUvarintWithInvalidBuffer(t *testing.T) { func TestReaderWithInvalidBuffer(t *testing.T) { b := realByteSlice([]byte{0x81, 0x81, 0x81, 0x81, 0x81, 0x81}) - _, err := NewReader(b) + _, err := NewReader(b, DecodePostingsRaw) require.Error(t, err) } @@ -481,7 +481,7 @@ func TestNewFileReaderErrorNoOpenFiles(t *testing.T) { err := os.WriteFile(idxName, []byte("corrupted contents"), 0o666) require.NoError(t, err) - _, err = NewFileReader(idxName) + _, err = NewFileReader(idxName, DecodePostingsRaw) require.Error(t, err) // dir.Close will fail on Win if idxName fd is not closed on error path. @@ -560,7 +560,8 @@ func BenchmarkReader_ShardedPostings(b *testing.B) { } func TestDecoder_Postings_WrongInput(t *testing.T) { - _, _, err := (&Decoder{}).Postings([]byte("the cake is a lie")) + d := encoding.Decbuf{B: []byte("the cake is a lie")} + _, _, err := (&Decoder{DecodePostings: DecodePostingsRaw}).DecodePostings(d) require.Error(t, err) } @@ -690,7 +691,7 @@ func createFileReader(ctx context.Context, tb testing.TB, input indexWriterSerie } require.NoError(tb, iw.Close()) - ir, err := NewFileReader(fn) + ir, err := NewFileReader(fn, DecodePostingsRaw) require.NoError(tb, err) tb.Cleanup(func() { require.NoError(tb, ir.Close()) diff --git a/tsdb/querier_bench_test.go b/tsdb/querier_bench_test.go index 33dca1284d..8b8f092a81 100644 --- a/tsdb/querier_bench_test.go +++ b/tsdb/querier_bench_test.go @@ -75,7 +75,7 @@ func BenchmarkQuerier(b *testing.B) { b.Run("Block", func(b *testing.B) { blockdir := createBlockFromHead(b, b.TempDir(), h) - block, err := OpenBlock(nil, blockdir, nil) + block, err := OpenBlock(nil, blockdir, nil, nil) require.NoError(b, err) defer func() { require.NoError(b, block.Close()) @@ -315,7 +315,7 @@ func BenchmarkQuerierSelect(b *testing.B) { tmpdir := b.TempDir() blockdir := createBlockFromHead(b, tmpdir, h) - block, err := OpenBlock(nil, blockdir, nil) + block, err := OpenBlock(nil, blockdir, nil, nil) require.NoError(b, err) defer func() { require.NoError(b, block.Close()) diff --git a/tsdb/querier_test.go b/tsdb/querier_test.go index 2d66102bfe..e97aea2fde 100644 --- a/tsdb/querier_test.go +++ b/tsdb/querier_test.go @@ -2443,7 +2443,7 @@ func BenchmarkQueryIterator(b *testing.B) { } else { generatedSeries = populateSeries(prefilledLabels, mint, maxt) } - block, err := OpenBlock(nil, createBlock(b, dir, generatedSeries), nil) + block, err := OpenBlock(nil, createBlock(b, dir, generatedSeries), nil, nil) require.NoError(b, err) blocks = append(blocks, block) defer block.Close() @@ -2506,7 +2506,7 @@ func BenchmarkQuerySeek(b *testing.B) { } else { generatedSeries = populateSeries(prefilledLabels, mint, maxt) } - block, err := OpenBlock(nil, createBlock(b, dir, generatedSeries), nil) + block, err := OpenBlock(nil, createBlock(b, dir, generatedSeries), nil, nil) require.NoError(b, err) blocks = append(blocks, block) defer block.Close() @@ -2641,7 +2641,7 @@ func BenchmarkSetMatcher(b *testing.B) { } else { generatedSeries = populateSeries(prefilledLabels, mint, maxt) } - block, err := OpenBlock(nil, createBlock(b, dir, generatedSeries), nil) + block, err := OpenBlock(nil, createBlock(b, dir, generatedSeries), nil, nil) require.NoError(b, err) blocks = append(blocks, block) defer block.Close() @@ -3209,7 +3209,7 @@ func BenchmarkQueries(b *testing.B) { qs := make([]storage.Querier, 0, 10) for x := 0; x <= 10; x++ { - block, err := OpenBlock(nil, createBlock(b, dir, series), nil) + block, err := OpenBlock(nil, createBlock(b, dir, series), nil, nil) require.NoError(b, err) q, err := NewBlockQuerier(block, 1, nSamples) require.NoError(b, err) @@ -3792,7 +3792,7 @@ func (m mockReaderOfLabels) Symbols() index.StringIter { // https://github.com/prometheus/prometheus/issues/14723, when one of the queriers (blockQuerier in this case) // alters the passed matchers. func TestMergeQuerierConcurrentSelectMatchers(t *testing.T) { - block, err := OpenBlock(nil, createBlock(t, t.TempDir(), genSeries(1, 1, 0, 1)), nil) + block, err := OpenBlock(nil, createBlock(t, t.TempDir(), genSeries(1, 1, 0, 1)), nil, nil) require.NoError(t, err) defer func() { require.NoError(t, block.Close()) diff --git a/tsdb/repair_test.go b/tsdb/repair_test.go index 8a70e05f31..8a192c4f78 100644 --- a/tsdb/repair_test.go +++ b/tsdb/repair_test.go @@ -79,7 +79,7 @@ func TestRepairBadIndexVersion(t *testing.T) { require.NoError(t, os.MkdirAll(filepath.Join(tmpDbDir, "chunks"), 0o777)) // Read current index to check integrity. - r, err := index.NewFileReader(filepath.Join(tmpDbDir, indexFilename)) + r, err := index.NewFileReader(filepath.Join(tmpDbDir, indexFilename), index.DecodePostingsRaw) require.NoError(t, err) p, err := r.Postings(ctx, "b", "1") require.NoError(t, err) @@ -97,7 +97,7 @@ func TestRepairBadIndexVersion(t *testing.T) { require.NoError(t, err) db.Close() - r, err = index.NewFileReader(filepath.Join(tmpDbDir, indexFilename)) + r, err = index.NewFileReader(filepath.Join(tmpDbDir, indexFilename), index.DecodePostingsRaw) require.NoError(t, err) defer r.Close() p, err = r.Postings(ctx, "b", "1")