More explicit chunks and head error handling. (#7277)

This commit is contained in:
Krasimir Georgiev 2020-05-22 12:03:23 +03:00 committed by GitHub
parent 1c99adb9fd
commit 09df8d94e0
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 106 additions and 49 deletions

View file

@ -432,7 +432,6 @@ func (cdm *ChunkDiskMapper) flushBuffer() error {
}
// Chunk returns a chunk from a given reference.
// Note: The returned chunk will turn invalid after closing ChunkDiskMapper.
func (cdm *ChunkDiskMapper) Chunk(ref uint64) (chunkenc.Chunk, error) {
cdm.readPathMtx.RLock()
// We hold this read lock for the entire duration because if the Close()
@ -466,13 +465,25 @@ func (cdm *ChunkDiskMapper) Chunk(ref uint64) (chunkenc.Chunk, error) {
mmapFile, ok := cdm.mmappedChunkFiles[sgmIndex]
if !ok {
if sgmIndex > cdm.curFileSequence {
return nil, errors.Errorf("head chunk file index %d more than current open file", sgmIndex)
return nil, &CorruptionErr{
Dir: cdm.dir.Name(),
FileIndex: -1,
Err: errors.Errorf("head chunk file index %d more than current open file", sgmIndex),
}
}
return nil, &CorruptionErr{
Dir: cdm.dir.Name(),
FileIndex: sgmIndex,
Err: errors.New("head chunk file index %d does not exist on disk"),
}
return nil, errors.Errorf("head chunk file index %d does not exist on disk", sgmIndex)
}
if chkStart+MaxChunkLengthFieldSize > mmapFile.byteSlice.Len() {
return nil, errors.Errorf("head chunk file doesn't include enough bytes to read the chunk size data field - required:%v, available:%v, file:%d", chkStart+MaxChunkLengthFieldSize, mmapFile.byteSlice.Len(), sgmIndex)
return nil, &CorruptionErr{
Dir: cdm.dir.Name(),
FileIndex: sgmIndex,
Err: errors.Errorf("head chunk file doesn't include enough bytes to read the chunk size data field - required:%v, available:%v", chkStart+MaxChunkLengthFieldSize, mmapFile.byteSlice.Len()),
}
}
// Encoding.
@ -485,27 +496,51 @@ func (cdm *ChunkDiskMapper) Chunk(ref uint64) (chunkenc.Chunk, error) {
c := mmapFile.byteSlice.Range(chkDataLenStart, chkDataLenStart+MaxChunkLengthFieldSize)
chkDataLen, n := binary.Uvarint(c)
if n <= 0 {
return nil, errors.Errorf("reading chunk length failed with %d", n)
return nil, &CorruptionErr{
Dir: cdm.dir.Name(),
FileIndex: sgmIndex,
Err: errors.Errorf("reading chunk length failed with %d", n),
}
}
// Verify the chunk data end.
chkDataEnd := chkDataLenStart + n + int(chkDataLen)
if chkDataEnd > mmapFile.byteSlice.Len() {
return nil, errors.Errorf("head chunk file doesn't include enough bytes to read the chunk - required:%v, available:%v", chkDataEnd, mmapFile.byteSlice.Len())
return nil, &CorruptionErr{
Dir: cdm.dir.Name(),
FileIndex: sgmIndex,
Err: errors.Errorf("head chunk file doesn't include enough bytes to read the chunk - required:%v, available:%v", chkDataEnd, mmapFile.byteSlice.Len()),
}
}
// Check the CRC.
sum := mmapFile.byteSlice.Range(chkDataEnd, chkDataEnd+CRCSize)
if _, err := chkCRC32.Write(mmapFile.byteSlice.Range(chkStart-(SeriesRefSize+2*MintMaxtSize), chkDataEnd)); err != nil {
return nil, err
return nil, &CorruptionErr{
Dir: cdm.dir.Name(),
FileIndex: sgmIndex,
Err: err,
}
}
if act := chkCRC32.Sum(nil); !bytes.Equal(act, sum) {
return nil, errors.Errorf("checksum mismatch expected:%x, actual:%x", sum, act)
return nil, &CorruptionErr{
Dir: cdm.dir.Name(),
FileIndex: sgmIndex,
Err: errors.Errorf("checksum mismatch expected:%x, actual:%x", sum, act),
}
}
// The chunk data itself.
chkData := mmapFile.byteSlice.Range(chkDataEnd-int(chkDataLen), chkDataEnd)
return cdm.pool.Get(chunkenc.Encoding(chkEnc), chkData)
chk, err := cdm.pool.Get(chunkenc.Encoding(chkEnc), chkData)
if err != nil {
return nil, &CorruptionErr{
Dir: cdm.dir.Name(),
FileIndex: sgmIndex,
Err: err,
}
}
return chk, nil
}
// IterateAllChunks iterates on all the chunks in its byte slices in the order of the head chunk file sequence

View file

@ -683,19 +683,19 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta,
indexr, err := b.Index()
if err != nil {
return errors.Wrapf(err, "open index reader for block %s", b)
return errors.Wrapf(err, "open index reader for block %+v", b.Meta())
}
closers = append(closers, indexr)
chunkr, err := b.Chunks()
if err != nil {
return errors.Wrapf(err, "open chunk reader for block %s", b)
return errors.Wrapf(err, "open chunk reader for block %+v", b.Meta())
}
closers = append(closers, chunkr)
tombsr, err := b.Tombstones()
if err != nil {
return errors.Wrapf(err, "open tombstone reader for block %s", b)
return errors.Wrapf(err, "open tombstone reader for block %+v", b.Meta())
}
closers = append(closers, tombsr)

View file

@ -89,6 +89,9 @@ type Head struct {
chunkDiskMapper *chunks.ChunkDiskMapper
// chunkDirRoot is the parent directory of the chunks directory.
chunkDirRoot string
closedMtx sync.Mutex
closed bool
}
type headMetrics struct {
@ -921,7 +924,7 @@ func (h *RangeHead) Index() (IndexReader, error) {
}
func (h *RangeHead) Chunks() (ChunkReader, error) {
return h.head.chunksRange(h.mint, h.maxt, h.head.iso.State()), nil
return h.head.chunksRange(h.mint, h.maxt, h.head.iso.State())
}
func (h *RangeHead) Tombstones() (tombstones.Reader, error) {
@ -1360,10 +1363,15 @@ func (h *Head) indexRange(mint, maxt int64) *headIndexReader {
// Chunks returns a ChunkReader against the block.
func (h *Head) Chunks() (ChunkReader, error) {
return h.chunksRange(math.MinInt64, math.MaxInt64, h.iso.State()), nil
return h.chunksRange(math.MinInt64, math.MaxInt64, h.iso.State())
}
func (h *Head) chunksRange(mint, maxt int64, is *isolationState) *headChunkReader {
func (h *Head) chunksRange(mint, maxt int64, is *isolationState) (*headChunkReader, error) {
h.closedMtx.Lock()
defer h.closedMtx.Unlock()
if h.closed {
return nil, errors.New("can't read from a closed head")
}
if hmin := h.MinTime(); hmin > mint {
mint = hmin
}
@ -1373,7 +1381,7 @@ func (h *Head) chunksRange(mint, maxt int64, is *isolationState) *headChunkReade
maxt: maxt,
isoState: is,
memChunkPool: &h.memChunkPool,
}
}, nil
}
// NumSeries returns the number of active series in the head.
@ -1415,6 +1423,9 @@ func (h *Head) compactable() bool {
// Close flushes the WAL and closes the head.
func (h *Head) Close() error {
h.closedMtx.Lock()
defer h.closedMtx.Unlock()
h.closed = true
var merr tsdb_errors.MultiError
merr.Add(h.chunkDiskMapper.Close())
if h.wal != nil {
@ -1462,7 +1473,11 @@ func (h *headChunkReader) Chunk(ref uint64) (chunkenc.Chunk, error) {
}
s.Lock()
c, garbageCollect := s.chunk(int(cid), h.head.chunkDiskMapper)
c, garbageCollect, err := s.chunk(int(cid), h.head.chunkDiskMapper)
if err != nil {
s.Unlock()
return nil, err
}
defer func() {
if garbageCollect {
// Set this to nil so that Go GC can collect it after it has been used.
@ -1471,9 +1486,8 @@ func (h *headChunkReader) Chunk(ref uint64) (chunkenc.Chunk, error) {
}
}()
// This means that the chunk has been garbage collected (or) is outside
// the specified range (or) Head is closing.
if c == nil || !c.OverlapsClosedInterval(h.mint, h.maxt) {
// This means that the chunk is outside the specified range.
if !c.OverlapsClosedInterval(h.mint, h.maxt) {
s.Unlock()
return nil, storage.ErrNotFound
}
@ -1998,31 +2012,33 @@ func (s *memSeries) appendable(t int64, v float64) error {
// chunk returns the chunk for the chunk id from memory or by m-mapping it from the disk.
// If garbageCollect is true, it means that the returned *memChunk
// (and not the chunkenc.Chunk inside it) can be garbage collected after it's usage.
func (s *memSeries) chunk(id int, chunkDiskMapper *chunks.ChunkDiskMapper) (chunk *memChunk, garbageCollect bool) {
func (s *memSeries) chunk(id int, chunkDiskMapper *chunks.ChunkDiskMapper) (chunk *memChunk, garbageCollect bool, err error) {
// ix represents the index of chunk in the s.mmappedChunks slice. The chunk id's are
// incremented by 1 when new chunk is created, hence (id - firstChunkID) gives the slice index.
// The max index for the s.mmappedChunks slice can be len(s.mmappedChunks)-1, hence if the ix
// is len(s.mmappedChunks), it represents the next chunk, which is the head chunk.
ix := id - s.firstChunkID
if ix < 0 || ix > len(s.mmappedChunks) {
return nil, false
return nil, false, storage.ErrNotFound
}
if ix == len(s.mmappedChunks) {
return s.headChunk, false
if s.headChunk == nil {
return nil, false, errors.New("invalid head chunk")
}
return s.headChunk, false, nil
}
chk, err := chunkDiskMapper.Chunk(s.mmappedChunks[ix].ref)
if err != nil {
if err == chunks.ErrChunkDiskMapperClosed {
return nil, false
if _, ok := err.(*chunks.CorruptionErr); ok {
panic(err)
}
// TODO(codesome): Find a better way to handle this error instead of a panic.
panic(err)
return nil, false, err
}
mc := s.memChunkPool.Get().(*memChunk)
mc.chunk = chk
mc.minTime = s.mmappedChunks[ix].minTime
mc.maxTime = s.mmappedChunks[ix].maxTime
return mc, true
return mc, true, nil
}
func (s *memSeries) chunkID(pos int) int {
@ -2127,7 +2143,14 @@ func computeChunkEndTime(start, cur, max int64) int64 {
// iterator returns a chunk iterator.
// It is unsafe to call this concurrently with s.append(...) without holding the series lock.
func (s *memSeries) iterator(id int, isoState *isolationState, chunkDiskMapper *chunks.ChunkDiskMapper, it chunkenc.Iterator) chunkenc.Iterator {
c, garbageCollect := s.chunk(id, chunkDiskMapper)
c, garbageCollect, err := s.chunk(id, chunkDiskMapper)
// TODO(fabxc): Work around! An error will be returns when a querier have retrieved a pointer to a
// series's chunk, which got then garbage collected before it got
// accessed. We must ensure to not garbage collect as long as any
// readers still hold a reference.
if err != nil {
return chunkenc.NewNopIterator()
}
defer func() {
if garbageCollect {
// Set this to nil so that Go GC can collect it after it has been used.
@ -2137,14 +2160,6 @@ func (s *memSeries) iterator(id int, isoState *isolationState, chunkDiskMapper *
}
}()
// TODO(fabxc): Work around! A querier may have retrieved a pointer to a
// series's chunk, which got then garbage collected before it got
// accessed. We must ensure to not garbage collect as long as any
// readers still hold a reference.
if c == nil {
return chunkenc.NewNopIterator()
}
ix := id - s.firstChunkID
numSamples := c.chunk.NumSamples()

View file

@ -412,19 +412,22 @@ func TestMemSeries_truncateChunks(t *testing.T) {
// that the ID of the last chunk still gives us the same chunk afterwards.
countBefore := len(s.mmappedChunks) + 1 // +1 for the head chunk.
lastID := s.chunkID(countBefore - 1)
lastChunk, _ := s.chunk(lastID, chunkDiskMapper)
chk, _ := s.chunk(0, chunkDiskMapper)
testutil.Assert(t, chk != nil, "")
lastChunk, _, err := s.chunk(lastID, chunkDiskMapper)
testutil.Ok(t, err)
testutil.Assert(t, lastChunk != nil, "")
chk, _, err := s.chunk(0, chunkDiskMapper)
testutil.Assert(t, chk != nil, "")
testutil.Ok(t, err)
s.truncateChunksBefore(2000)
testutil.Equals(t, int64(2000), s.mmappedChunks[0].minTime)
chk, _ = s.chunk(0, chunkDiskMapper)
testutil.Assert(t, chk == nil, "first chunks not gone")
_, _, err = s.chunk(0, chunkDiskMapper)
testutil.Assert(t, err == storage.ErrNotFound, "first chunks not gone")
testutil.Equals(t, countBefore/2, len(s.mmappedChunks)+1) // +1 for the head chunk.
chk, _ = s.chunk(lastID, chunkDiskMapper)
chk, _, err = s.chunk(lastID, chunkDiskMapper)
testutil.Ok(t, err)
testutil.Equals(t, lastChunk, chk)
// Validate that the series' sample buffer is applied correctly to the last chunk
@ -1020,8 +1023,9 @@ func TestGCChunkAccess(t *testing.T) {
}}, lset)
testutil.Equals(t, 2, len(chunks))
cr := h.chunksRange(0, 1500, nil)
_, err := cr.Chunk(chunks[0].Ref)
cr, err := h.chunksRange(0, 1500, nil)
testutil.Ok(t, err)
_, err = cr.Chunk(chunks[0].Ref)
testutil.Ok(t, err)
_, err = cr.Chunk(chunks[1].Ref)
testutil.Ok(t, err)
@ -1074,8 +1078,9 @@ func TestGCSeriesAccess(t *testing.T) {
}}, lset)
testutil.Equals(t, 2, len(chunks))
cr := h.chunksRange(0, 2000, nil)
_, err := cr.Chunk(chunks[0].Ref)
cr, err := h.chunksRange(0, 2000, nil)
testutil.Ok(t, err)
_, err = cr.Chunk(chunks[0].Ref)
testutil.Ok(t, err)
_, err = cr.Chunk(chunks[1].Ref)
testutil.Ok(t, err)
@ -1415,11 +1420,13 @@ func TestMemSeriesIsolation(t *testing.T) {
iso := h.iso.State()
iso.maxAppendID = maxAppendID
chunks, err := h.chunksRange(math.MinInt64, math.MaxInt64, iso)
testutil.Ok(t, err)
querier := &blockQuerier{
mint: 0,
maxt: 10000,
index: idx,
chunks: h.chunksRange(math.MinInt64, math.MaxInt64, iso),
chunks: chunks,
tombstones: tombstones.NewMemTombstones(),
}