Include source block in error message when loading chunk fails.

Signed-off-by: Peter Štibraný <pstibrany@gmail.com>
This commit is contained in:
Peter Štibraný 2022-11-28 09:12:54 +01:00
parent dfa5cd55db
commit af838ccf83
4 changed files with 24 additions and 15 deletions

View file

@ -142,7 +142,7 @@ func TestCorruptedChunk(t *testing.T) {
// Truncate one byte after the segment header. // Truncate one byte after the segment header.
require.NoError(t, f.Truncate(chunks.SegmentHeaderSize+1)) require.NoError(t, f.Truncate(chunks.SegmentHeaderSize+1))
}, },
iterErr: errors.New("cannot populate chunk 8: segment doesn't include enough bytes to read the chunk size data field - required:13, available:9"), iterErr: errors.New("cannot populate chunk 8 from block 00000000000000000000000000: segment doesn't include enough bytes to read the chunk size data field - required:13, available:9"),
}, },
{ {
name: "chunk not enough bytes to read the data", name: "chunk not enough bytes to read the data",
@ -151,7 +151,7 @@ func TestCorruptedChunk(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
require.NoError(t, f.Truncate(fi.Size()-1)) require.NoError(t, f.Truncate(fi.Size()-1))
}, },
iterErr: errors.New("cannot populate chunk 8: segment doesn't include enough bytes to read the chunk - required:26, available:25"), iterErr: errors.New("cannot populate chunk 8 from block 00000000000000000000000000: segment doesn't include enough bytes to read the chunk - required:26, available:25"),
}, },
{ {
name: "checksum mismatch", name: "checksum mismatch",
@ -169,7 +169,7 @@ func TestCorruptedChunk(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, n, 1) require.Equal(t, n, 1)
}, },
iterErr: errors.New("cannot populate chunk 8: checksum mismatch expected:cfc0526c, actual:34815eae"), iterErr: errors.New("cannot populate chunk 8 from block 00000000000000000000000000: checksum mismatch expected:cfc0526c, actual:34815eae"),
}, },
} { } {
t.Run(tc.name, func(t *testing.T) { t.Run(tc.name, func(t *testing.T) {

View file

@ -727,7 +727,7 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta,
} }
all = indexr.SortedPostings(all) all = indexr.SortedPostings(all)
// Blocks meta is half open: [min, max), so subtract 1 to ensure we don't hold samples with exact meta.MaxTime timestamp. // Blocks meta is half open: [min, max), so subtract 1 to ensure we don't hold samples with exact meta.MaxTime timestamp.
sets = append(sets, newBlockChunkSeriesSet(indexr, chunkr, tombsr, all, meta.MinTime, meta.MaxTime-1, false)) sets = append(sets, newBlockChunkSeriesSet(b.Meta().ULID, indexr, chunkr, tombsr, all, meta.MinTime, meta.MaxTime-1, false))
syms := indexr.Symbols() syms := indexr.Symbols()
if i == 0 { if i == 0 {
symbols = syms symbols = syms

View file

@ -19,6 +19,7 @@ import (
"strings" "strings"
"unicode/utf8" "unicode/utf8"
"github.com/oklog/ulid"
"github.com/pkg/errors" "github.com/pkg/errors"
"golang.org/x/exp/slices" "golang.org/x/exp/slices"
@ -47,6 +48,7 @@ func init() {
} }
type blockBaseQuerier struct { type blockBaseQuerier struct {
blockID ulid.ULID
index IndexReader index IndexReader
chunks ChunkReader chunks ChunkReader
tombstones tombstones.Reader tombstones tombstones.Reader
@ -77,6 +79,7 @@ func newBlockBaseQuerier(b BlockReader, mint, maxt int64) (*blockBaseQuerier, er
tombsr = tombstones.NewMemTombstones() tombsr = tombstones.NewMemTombstones()
} }
return &blockBaseQuerier{ return &blockBaseQuerier{
blockID: b.Meta().ULID,
mint: mint, mint: mint,
maxt: maxt, maxt: maxt,
index: indexr, index: indexr,
@ -178,7 +181,7 @@ func (q *blockChunkQuerier) Select(sortSeries bool, hints *storage.SelectHints,
if sortSeries { if sortSeries {
p = q.index.SortedPostings(p) p = q.index.SortedPostings(p)
} }
return newBlockChunkSeriesSet(q.index, q.chunks, q.tombstones, p, mint, maxt, disableTrimming) return newBlockChunkSeriesSet(q.blockID, q.index, q.chunks, q.tombstones, p, mint, maxt, disableTrimming)
} }
func findSetMatches(pattern string) []string { func findSetMatches(pattern string) []string {
@ -427,6 +430,7 @@ func labelNamesWithMatchers(r IndexReader, matchers ...*labels.Matcher) ([]strin
// Iterated series are trimmed with given min and max time as well as tombstones. // Iterated series are trimmed with given min and max time as well as tombstones.
// See newBlockSeriesSet and newBlockChunkSeriesSet to use it for either sample or chunk iterating. // See newBlockSeriesSet and newBlockChunkSeriesSet to use it for either sample or chunk iterating.
type blockBaseSeriesSet struct { type blockBaseSeriesSet struct {
blockID ulid.ULID
p index.Postings p index.Postings
index IndexReader index IndexReader
chunks ChunkReader chunks ChunkReader
@ -512,7 +516,7 @@ func (b *blockBaseSeriesSet) Next() bool {
copy(b.currLabels, b.bufLbls) copy(b.currLabels, b.bufLbls)
b.currIterFn = func() *populateWithDelGenericSeriesIterator { b.currIterFn = func() *populateWithDelGenericSeriesIterator {
return newPopulateWithDelGenericSeriesIterator(b.chunks, chks, intervals) return newPopulateWithDelGenericSeriesIterator(b.blockID, b.chunks, chks, intervals)
} }
return true return true
} }
@ -539,6 +543,7 @@ func (b *blockBaseSeriesSet) Warnings() storage.Warnings { return nil }
// means that the chunk iterator in currChkMeta is invalid and a chunk rewrite // means that the chunk iterator in currChkMeta is invalid and a chunk rewrite
// is needed, for which currDelIter should be used. // is needed, for which currDelIter should be used.
type populateWithDelGenericSeriesIterator struct { type populateWithDelGenericSeriesIterator struct {
blockID ulid.ULID
chunks ChunkReader chunks ChunkReader
// chks are expected to be sorted by minTime and should be related to // chks are expected to be sorted by minTime and should be related to
// the same, single series. // the same, single series.
@ -554,11 +559,13 @@ type populateWithDelGenericSeriesIterator struct {
} }
func newPopulateWithDelGenericSeriesIterator( func newPopulateWithDelGenericSeriesIterator(
blockID ulid.ULID,
chunks ChunkReader, chunks ChunkReader,
chks []chunks.Meta, chks []chunks.Meta,
intervals tombstones.Intervals, intervals tombstones.Intervals,
) *populateWithDelGenericSeriesIterator { ) *populateWithDelGenericSeriesIterator {
return &populateWithDelGenericSeriesIterator{ return &populateWithDelGenericSeriesIterator{
blockID: blockID,
chunks: chunks, chunks: chunks,
chks: chks, chks: chks,
i: -1, i: -1,
@ -577,7 +584,7 @@ func (p *populateWithDelGenericSeriesIterator) next() bool {
p.currChkMeta.Chunk, p.err = p.chunks.Chunk(p.currChkMeta) p.currChkMeta.Chunk, p.err = p.chunks.Chunk(p.currChkMeta)
if p.err != nil { if p.err != nil {
p.err = errors.Wrapf(p.err, "cannot populate chunk %d", p.currChkMeta.Ref) p.err = errors.Wrapf(p.err, "cannot populate chunk %d from block %s", p.currChkMeta.Ref, p.blockID.String())
return false return false
} }
@ -842,9 +849,10 @@ type blockChunkSeriesSet struct {
blockBaseSeriesSet blockBaseSeriesSet
} }
func newBlockChunkSeriesSet(i IndexReader, c ChunkReader, t tombstones.Reader, p index.Postings, mint, maxt int64, disableTrimming bool) storage.ChunkSeriesSet { func newBlockChunkSeriesSet(id ulid.ULID, i IndexReader, c ChunkReader, t tombstones.Reader, p index.Postings, mint, maxt int64, disableTrimming bool) storage.ChunkSeriesSet {
return &blockChunkSeriesSet{ return &blockChunkSeriesSet{
blockBaseSeriesSet{ blockBaseSeriesSet{
blockID: id,
index: i, index: i,
chunks: c, chunks: c,
tombstones: t, tombstones: t,

View file

@ -24,6 +24,7 @@ import (
"testing" "testing"
"time" "time"
"github.com/oklog/ulid"
"github.com/pkg/errors" "github.com/pkg/errors"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
@ -858,7 +859,7 @@ func TestPopulateWithTombSeriesIterators(t *testing.T) {
t.Run(tc.name, func(t *testing.T) { t.Run(tc.name, func(t *testing.T) {
t.Run("sample", func(t *testing.T) { t.Run("sample", func(t *testing.T) {
f, chkMetas := createFakeReaderAndNotPopulatedChunks(tc.chks...) f, chkMetas := createFakeReaderAndNotPopulatedChunks(tc.chks...)
it := newPopulateWithDelGenericSeriesIterator(f, chkMetas, tc.intervals).toSeriesIterator() it := newPopulateWithDelGenericSeriesIterator(ulid.ULID{}, f, chkMetas, tc.intervals).toSeriesIterator()
var r []tsdbutil.Sample var r []tsdbutil.Sample
if tc.seek != 0 { if tc.seek != 0 {
@ -878,7 +879,7 @@ func TestPopulateWithTombSeriesIterators(t *testing.T) {
}) })
t.Run("chunk", func(t *testing.T) { t.Run("chunk", func(t *testing.T) {
f, chkMetas := createFakeReaderAndNotPopulatedChunks(tc.chks...) f, chkMetas := createFakeReaderAndNotPopulatedChunks(tc.chks...)
it := newPopulateWithDelGenericSeriesIterator(f, chkMetas, tc.intervals).toChunkSeriesIterator() it := newPopulateWithDelGenericSeriesIterator(ulid.ULID{}, f, chkMetas, tc.intervals).toChunkSeriesIterator()
if tc.seek != 0 { if tc.seek != 0 {
// Chunk iterator does not have Seek method. // Chunk iterator does not have Seek method.
@ -910,7 +911,7 @@ func TestPopulateWithDelSeriesIterator_DoubleSeek(t *testing.T) {
[]tsdbutil.Sample{sample{4, 4, nil, nil}, sample{5, 5, nil, nil}}, []tsdbutil.Sample{sample{4, 4, nil, nil}, sample{5, 5, nil, nil}},
) )
it := newPopulateWithDelGenericSeriesIterator(f, chkMetas, nil).toSeriesIterator() it := newPopulateWithDelGenericSeriesIterator(ulid.ULID{}, f, chkMetas, nil).toSeriesIterator()
require.Equal(t, chunkenc.ValFloat, it.Seek(1)) require.Equal(t, chunkenc.ValFloat, it.Seek(1))
require.Equal(t, chunkenc.ValFloat, it.Seek(2)) require.Equal(t, chunkenc.ValFloat, it.Seek(2))
require.Equal(t, chunkenc.ValFloat, it.Seek(2)) require.Equal(t, chunkenc.ValFloat, it.Seek(2))
@ -928,7 +929,7 @@ func TestPopulateWithDelSeriesIterator_SeekInCurrentChunk(t *testing.T) {
[]tsdbutil.Sample{}, []tsdbutil.Sample{},
) )
it := newPopulateWithDelGenericSeriesIterator(f, chkMetas, nil).toSeriesIterator() it := newPopulateWithDelGenericSeriesIterator(ulid.ULID{}, f, chkMetas, nil).toSeriesIterator()
require.Equal(t, chunkenc.ValFloat, it.Next()) require.Equal(t, chunkenc.ValFloat, it.Next())
ts, v := it.At() ts, v := it.At()
require.Equal(t, int64(1), ts) require.Equal(t, int64(1), ts)
@ -945,7 +946,7 @@ func TestPopulateWithDelSeriesIterator_SeekWithMinTime(t *testing.T) {
[]tsdbutil.Sample{sample{1, 6, nil, nil}, sample{5, 6, nil, nil}, sample{6, 8, nil, nil}}, []tsdbutil.Sample{sample{1, 6, nil, nil}, sample{5, 6, nil, nil}, sample{6, 8, nil, nil}},
) )
it := newPopulateWithDelGenericSeriesIterator(f, chkMetas, nil).toSeriesIterator() it := newPopulateWithDelGenericSeriesIterator(ulid.ULID{}, f, chkMetas, nil).toSeriesIterator()
require.Equal(t, chunkenc.ValNone, it.Seek(7)) require.Equal(t, chunkenc.ValNone, it.Seek(7))
require.Equal(t, chunkenc.ValFloat, it.Seek(3)) require.Equal(t, chunkenc.ValFloat, it.Seek(3))
} }
@ -958,7 +959,7 @@ func TestPopulateWithDelSeriesIterator_NextWithMinTime(t *testing.T) {
) )
it := newPopulateWithDelGenericSeriesIterator( it := newPopulateWithDelGenericSeriesIterator(
f, chkMetas, tombstones.Intervals{{Mint: math.MinInt64, Maxt: 2}}.Add(tombstones.Interval{Mint: 4, Maxt: math.MaxInt64}), ulid.ULID{}, f, chkMetas, tombstones.Intervals{{Mint: math.MinInt64, Maxt: 2}}.Add(tombstones.Interval{Mint: 4, Maxt: math.MaxInt64}),
).toSeriesIterator() ).toSeriesIterator()
require.Equal(t, chunkenc.ValNone, it.Next()) require.Equal(t, chunkenc.ValNone, it.Next())
} }