diff --git a/tsdb/chunks/head_chunks.go b/tsdb/chunks/head_chunks.go index 52e20f866..8363c67dc 100644 --- a/tsdb/chunks/head_chunks.go +++ b/tsdb/chunks/head_chunks.go @@ -432,7 +432,6 @@ func (cdm *ChunkDiskMapper) flushBuffer() error { } // Chunk returns a chunk from a given reference. -// Note: The returned chunk will turn invalid after closing ChunkDiskMapper. func (cdm *ChunkDiskMapper) Chunk(ref uint64) (chunkenc.Chunk, error) { cdm.readPathMtx.RLock() // We hold this read lock for the entire duration because if the Close() @@ -466,13 +465,25 @@ func (cdm *ChunkDiskMapper) Chunk(ref uint64) (chunkenc.Chunk, error) { mmapFile, ok := cdm.mmappedChunkFiles[sgmIndex] if !ok { if sgmIndex > cdm.curFileSequence { - return nil, errors.Errorf("head chunk file index %d more than current open file", sgmIndex) + return nil, &CorruptionErr{ + Dir: cdm.dir.Name(), + FileIndex: -1, + Err: errors.Errorf("head chunk file index %d more than current open file", sgmIndex), + } + } + return nil, &CorruptionErr{ + Dir: cdm.dir.Name(), + FileIndex: sgmIndex, + Err: errors.New("head chunk file index %d does not exist on disk"), } - return nil, errors.Errorf("head chunk file index %d does not exist on disk", sgmIndex) } if chkStart+MaxChunkLengthFieldSize > mmapFile.byteSlice.Len() { - return nil, errors.Errorf("head chunk file doesn't include enough bytes to read the chunk size data field - required:%v, available:%v, file:%d", chkStart+MaxChunkLengthFieldSize, mmapFile.byteSlice.Len(), sgmIndex) + return nil, &CorruptionErr{ + Dir: cdm.dir.Name(), + FileIndex: sgmIndex, + Err: errors.Errorf("head chunk file doesn't include enough bytes to read the chunk size data field - required:%v, available:%v", chkStart+MaxChunkLengthFieldSize, mmapFile.byteSlice.Len()), + } } // Encoding. @@ -485,27 +496,51 @@ func (cdm *ChunkDiskMapper) Chunk(ref uint64) (chunkenc.Chunk, error) { c := mmapFile.byteSlice.Range(chkDataLenStart, chkDataLenStart+MaxChunkLengthFieldSize) chkDataLen, n := binary.Uvarint(c) if n <= 0 { - return nil, errors.Errorf("reading chunk length failed with %d", n) + return nil, &CorruptionErr{ + Dir: cdm.dir.Name(), + FileIndex: sgmIndex, + Err: errors.Errorf("reading chunk length failed with %d", n), + } } // Verify the chunk data end. chkDataEnd := chkDataLenStart + n + int(chkDataLen) if chkDataEnd > mmapFile.byteSlice.Len() { - return nil, errors.Errorf("head chunk file doesn't include enough bytes to read the chunk - required:%v, available:%v", chkDataEnd, mmapFile.byteSlice.Len()) + return nil, &CorruptionErr{ + Dir: cdm.dir.Name(), + FileIndex: sgmIndex, + Err: errors.Errorf("head chunk file doesn't include enough bytes to read the chunk - required:%v, available:%v", chkDataEnd, mmapFile.byteSlice.Len()), + } } // Check the CRC. sum := mmapFile.byteSlice.Range(chkDataEnd, chkDataEnd+CRCSize) if _, err := chkCRC32.Write(mmapFile.byteSlice.Range(chkStart-(SeriesRefSize+2*MintMaxtSize), chkDataEnd)); err != nil { - return nil, err + return nil, &CorruptionErr{ + Dir: cdm.dir.Name(), + FileIndex: sgmIndex, + Err: err, + } } if act := chkCRC32.Sum(nil); !bytes.Equal(act, sum) { - return nil, errors.Errorf("checksum mismatch expected:%x, actual:%x", sum, act) + return nil, &CorruptionErr{ + Dir: cdm.dir.Name(), + FileIndex: sgmIndex, + Err: errors.Errorf("checksum mismatch expected:%x, actual:%x", sum, act), + } } // The chunk data itself. chkData := mmapFile.byteSlice.Range(chkDataEnd-int(chkDataLen), chkDataEnd) - return cdm.pool.Get(chunkenc.Encoding(chkEnc), chkData) + chk, err := cdm.pool.Get(chunkenc.Encoding(chkEnc), chkData) + if err != nil { + return nil, &CorruptionErr{ + Dir: cdm.dir.Name(), + FileIndex: sgmIndex, + Err: err, + } + } + return chk, nil } // IterateAllChunks iterates on all the chunks in its byte slices in the order of the head chunk file sequence diff --git a/tsdb/compact.go b/tsdb/compact.go index 9bb04920b..acbdbf644 100644 --- a/tsdb/compact.go +++ b/tsdb/compact.go @@ -683,19 +683,19 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta, indexr, err := b.Index() if err != nil { - return errors.Wrapf(err, "open index reader for block %s", b) + return errors.Wrapf(err, "open index reader for block %+v", b.Meta()) } closers = append(closers, indexr) chunkr, err := b.Chunks() if err != nil { - return errors.Wrapf(err, "open chunk reader for block %s", b) + return errors.Wrapf(err, "open chunk reader for block %+v", b.Meta()) } closers = append(closers, chunkr) tombsr, err := b.Tombstones() if err != nil { - return errors.Wrapf(err, "open tombstone reader for block %s", b) + return errors.Wrapf(err, "open tombstone reader for block %+v", b.Meta()) } closers = append(closers, tombsr) diff --git a/tsdb/head.go b/tsdb/head.go index 281d763a7..358c3ed40 100644 --- a/tsdb/head.go +++ b/tsdb/head.go @@ -89,6 +89,9 @@ type Head struct { chunkDiskMapper *chunks.ChunkDiskMapper // chunkDirRoot is the parent directory of the chunks directory. chunkDirRoot string + + closedMtx sync.Mutex + closed bool } type headMetrics struct { @@ -921,7 +924,7 @@ func (h *RangeHead) Index() (IndexReader, error) { } func (h *RangeHead) Chunks() (ChunkReader, error) { - return h.head.chunksRange(h.mint, h.maxt, h.head.iso.State()), nil + return h.head.chunksRange(h.mint, h.maxt, h.head.iso.State()) } func (h *RangeHead) Tombstones() (tombstones.Reader, error) { @@ -1360,10 +1363,15 @@ func (h *Head) indexRange(mint, maxt int64) *headIndexReader { // Chunks returns a ChunkReader against the block. func (h *Head) Chunks() (ChunkReader, error) { - return h.chunksRange(math.MinInt64, math.MaxInt64, h.iso.State()), nil + return h.chunksRange(math.MinInt64, math.MaxInt64, h.iso.State()) } -func (h *Head) chunksRange(mint, maxt int64, is *isolationState) *headChunkReader { +func (h *Head) chunksRange(mint, maxt int64, is *isolationState) (*headChunkReader, error) { + h.closedMtx.Lock() + defer h.closedMtx.Unlock() + if h.closed { + return nil, errors.New("can't read from a closed head") + } if hmin := h.MinTime(); hmin > mint { mint = hmin } @@ -1373,7 +1381,7 @@ func (h *Head) chunksRange(mint, maxt int64, is *isolationState) *headChunkReade maxt: maxt, isoState: is, memChunkPool: &h.memChunkPool, - } + }, nil } // NumSeries returns the number of active series in the head. @@ -1415,6 +1423,9 @@ func (h *Head) compactable() bool { // Close flushes the WAL and closes the head. func (h *Head) Close() error { + h.closedMtx.Lock() + defer h.closedMtx.Unlock() + h.closed = true var merr tsdb_errors.MultiError merr.Add(h.chunkDiskMapper.Close()) if h.wal != nil { @@ -1462,7 +1473,11 @@ func (h *headChunkReader) Chunk(ref uint64) (chunkenc.Chunk, error) { } s.Lock() - c, garbageCollect := s.chunk(int(cid), h.head.chunkDiskMapper) + c, garbageCollect, err := s.chunk(int(cid), h.head.chunkDiskMapper) + if err != nil { + s.Unlock() + return nil, err + } defer func() { if garbageCollect { // Set this to nil so that Go GC can collect it after it has been used. @@ -1471,9 +1486,8 @@ func (h *headChunkReader) Chunk(ref uint64) (chunkenc.Chunk, error) { } }() - // This means that the chunk has been garbage collected (or) is outside - // the specified range (or) Head is closing. - if c == nil || !c.OverlapsClosedInterval(h.mint, h.maxt) { + // This means that the chunk is outside the specified range. + if !c.OverlapsClosedInterval(h.mint, h.maxt) { s.Unlock() return nil, storage.ErrNotFound } @@ -1998,31 +2012,33 @@ func (s *memSeries) appendable(t int64, v float64) error { // chunk returns the chunk for the chunk id from memory or by m-mapping it from the disk. // If garbageCollect is true, it means that the returned *memChunk // (and not the chunkenc.Chunk inside it) can be garbage collected after it's usage. -func (s *memSeries) chunk(id int, chunkDiskMapper *chunks.ChunkDiskMapper) (chunk *memChunk, garbageCollect bool) { +func (s *memSeries) chunk(id int, chunkDiskMapper *chunks.ChunkDiskMapper) (chunk *memChunk, garbageCollect bool, err error) { // ix represents the index of chunk in the s.mmappedChunks slice. The chunk id's are // incremented by 1 when new chunk is created, hence (id - firstChunkID) gives the slice index. // The max index for the s.mmappedChunks slice can be len(s.mmappedChunks)-1, hence if the ix // is len(s.mmappedChunks), it represents the next chunk, which is the head chunk. ix := id - s.firstChunkID if ix < 0 || ix > len(s.mmappedChunks) { - return nil, false + return nil, false, storage.ErrNotFound } if ix == len(s.mmappedChunks) { - return s.headChunk, false + if s.headChunk == nil { + return nil, false, errors.New("invalid head chunk") + } + return s.headChunk, false, nil } chk, err := chunkDiskMapper.Chunk(s.mmappedChunks[ix].ref) if err != nil { - if err == chunks.ErrChunkDiskMapperClosed { - return nil, false + if _, ok := err.(*chunks.CorruptionErr); ok { + panic(err) } - // TODO(codesome): Find a better way to handle this error instead of a panic. - panic(err) + return nil, false, err } mc := s.memChunkPool.Get().(*memChunk) mc.chunk = chk mc.minTime = s.mmappedChunks[ix].minTime mc.maxTime = s.mmappedChunks[ix].maxTime - return mc, true + return mc, true, nil } func (s *memSeries) chunkID(pos int) int { @@ -2127,7 +2143,14 @@ func computeChunkEndTime(start, cur, max int64) int64 { // iterator returns a chunk iterator. // It is unsafe to call this concurrently with s.append(...) without holding the series lock. func (s *memSeries) iterator(id int, isoState *isolationState, chunkDiskMapper *chunks.ChunkDiskMapper, it chunkenc.Iterator) chunkenc.Iterator { - c, garbageCollect := s.chunk(id, chunkDiskMapper) + c, garbageCollect, err := s.chunk(id, chunkDiskMapper) + // TODO(fabxc): Work around! An error will be returns when a querier have retrieved a pointer to a + // series's chunk, which got then garbage collected before it got + // accessed. We must ensure to not garbage collect as long as any + // readers still hold a reference. + if err != nil { + return chunkenc.NewNopIterator() + } defer func() { if garbageCollect { // Set this to nil so that Go GC can collect it after it has been used. @@ -2137,14 +2160,6 @@ func (s *memSeries) iterator(id int, isoState *isolationState, chunkDiskMapper * } }() - // TODO(fabxc): Work around! A querier may have retrieved a pointer to a - // series's chunk, which got then garbage collected before it got - // accessed. We must ensure to not garbage collect as long as any - // readers still hold a reference. - if c == nil { - return chunkenc.NewNopIterator() - } - ix := id - s.firstChunkID numSamples := c.chunk.NumSamples() diff --git a/tsdb/head_test.go b/tsdb/head_test.go index fd5620bf2..dd51f3620 100644 --- a/tsdb/head_test.go +++ b/tsdb/head_test.go @@ -412,19 +412,22 @@ func TestMemSeries_truncateChunks(t *testing.T) { // that the ID of the last chunk still gives us the same chunk afterwards. countBefore := len(s.mmappedChunks) + 1 // +1 for the head chunk. lastID := s.chunkID(countBefore - 1) - lastChunk, _ := s.chunk(lastID, chunkDiskMapper) - - chk, _ := s.chunk(0, chunkDiskMapper) - testutil.Assert(t, chk != nil, "") + lastChunk, _, err := s.chunk(lastID, chunkDiskMapper) + testutil.Ok(t, err) testutil.Assert(t, lastChunk != nil, "") + chk, _, err := s.chunk(0, chunkDiskMapper) + testutil.Assert(t, chk != nil, "") + testutil.Ok(t, err) + s.truncateChunksBefore(2000) testutil.Equals(t, int64(2000), s.mmappedChunks[0].minTime) - chk, _ = s.chunk(0, chunkDiskMapper) - testutil.Assert(t, chk == nil, "first chunks not gone") + _, _, err = s.chunk(0, chunkDiskMapper) + testutil.Assert(t, err == storage.ErrNotFound, "first chunks not gone") testutil.Equals(t, countBefore/2, len(s.mmappedChunks)+1) // +1 for the head chunk. - chk, _ = s.chunk(lastID, chunkDiskMapper) + chk, _, err = s.chunk(lastID, chunkDiskMapper) + testutil.Ok(t, err) testutil.Equals(t, lastChunk, chk) // Validate that the series' sample buffer is applied correctly to the last chunk @@ -1020,8 +1023,9 @@ func TestGCChunkAccess(t *testing.T) { }}, lset) testutil.Equals(t, 2, len(chunks)) - cr := h.chunksRange(0, 1500, nil) - _, err := cr.Chunk(chunks[0].Ref) + cr, err := h.chunksRange(0, 1500, nil) + testutil.Ok(t, err) + _, err = cr.Chunk(chunks[0].Ref) testutil.Ok(t, err) _, err = cr.Chunk(chunks[1].Ref) testutil.Ok(t, err) @@ -1074,8 +1078,9 @@ func TestGCSeriesAccess(t *testing.T) { }}, lset) testutil.Equals(t, 2, len(chunks)) - cr := h.chunksRange(0, 2000, nil) - _, err := cr.Chunk(chunks[0].Ref) + cr, err := h.chunksRange(0, 2000, nil) + testutil.Ok(t, err) + _, err = cr.Chunk(chunks[0].Ref) testutil.Ok(t, err) _, err = cr.Chunk(chunks[1].Ref) testutil.Ok(t, err) @@ -1415,11 +1420,13 @@ func TestMemSeriesIsolation(t *testing.T) { iso := h.iso.State() iso.maxAppendID = maxAppendID + chunks, err := h.chunksRange(math.MinInt64, math.MaxInt64, iso) + testutil.Ok(t, err) querier := &blockQuerier{ mint: 0, maxt: 10000, index: idx, - chunks: h.chunksRange(math.MinInt64, math.MaxInt64, iso), + chunks: chunks, tombstones: tombstones.NewMemTombstones(), }