return err instead of panic for corrupted chunk (#6040)

* Fix tsdb panic when querying corrupted chunks.
Check that the chunk segment has enough data to read all chunk pieces.
* refactor, simplify and add tests.
* simplify WriteChunks implementation

Signed-off-by: Krasi Georgiev <8903888+krasi-georgiev@users.noreply.github.com>
Krasimir Georgiev 2019-12-04 09:37:49 +02:00 committed by GitHub
parent 5000c05378
commit 549164f252
3 changed files with 408 additions and 50 deletions
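
Before the diffs, a rough summary of what changes: the chunk reader used to slice into a segment at computed offsets and could panic on a truncated or corrupted file; it now verifies that the segment actually contains the chunk's length field, the chunk data, and the trailing checksum, and returns an error otherwise. A minimal sketch of those checks, assuming the on-disk layout defined in chunks.go below (an 8-byte segment header, then per chunk: uvarint length | 1-byte encoding | data | 4-byte CRC32); the package and function names here are illustrative and not part of the commit:

package chunksketch

import (
	"encoding/binary"
	"fmt"
	"hash/crc32"
)

// readChunkBytes is a simplified, illustrative version of the bounds checks the
// new Reader.Chunk performs before slicing into a segment.
func readChunkBytes(segment []byte, chkStart int) ([]byte, error) {
	const maxLenFieldSize = binary.MaxVarintLen32 // the length field is a uvarint of at most 5 bytes
	// Enough bytes left to read the length field?
	if chkStart+maxLenFieldSize > len(segment) {
		return nil, fmt.Errorf("segment doesn't include enough bytes to read the chunk size data field - required:%d, available:%d", chkStart+maxLenFieldSize, len(segment))
	}
	dataLen, n := binary.Uvarint(segment[chkStart : chkStart+maxLenFieldSize])
	if n <= 0 {
		return nil, fmt.Errorf("reading chunk length failed with %d", n)
	}
	// Enough bytes left for the encoding byte, the data and the checksum?
	chkEnd := chkStart + n + 1 + int(dataLen) + crc32.Size
	if chkEnd > len(segment) {
		return nil, fmt.Errorf("segment doesn't include enough bytes to read the chunk - required:%d, available:%d", chkEnd, len(segment))
	}
	// The real reader also recomputes the CRC32 over encoding+data and compares it
	// against the stored checksum before handing the bytes to the chunk pool.
	return segment[chkStart+n : chkEnd-crc32.Size], nil
}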


@ -18,6 +18,7 @@ import (
"encoding/binary"
"errors"
"hash/crc32"
"io/ioutil"
"math/rand"
"os"
@ -88,14 +89,16 @@ func TestCreateBlock(t *testing.T) {
func TestCorruptedChunk(t *testing.T) {
for name, test := range map[string]struct {
corrFunc func(f *os.File) // Func that applies the corruption.
expErr error
openErr error
queryErr error
}{
"invalid header size": {
func(f *os.File) {
err := f.Truncate(1)
testutil.Ok(t, err)
},
errors.New("invalid chunk header in segment 0: invalid size"),
errors.New("invalid segment header in segment 0: invalid size"),
nil,
},
"invalid magic number": {
func(f *os.File) {
@ -111,6 +114,7 @@ func TestCorruptedChunk(t *testing.T) {
testutil.Equals(t, chunks.MagicChunksSize, n)
},
errors.New("invalid magic number 0"),
nil,
},
"invalid chunk format version": {
func(f *os.File) {
@ -126,6 +130,45 @@ func TestCorruptedChunk(t *testing.T) {
testutil.Equals(t, chunks.ChunksFormatVersionSize, n)
},
errors.New("invalid chunk format version 0"),
nil,
},
"chunk not enough bytes to read the chunk length": {
func(f *os.File) {
// Truncate one byte after the segment header.
err := f.Truncate(chunks.SegmentHeaderSize + 1)
testutil.Ok(t, err)
},
nil,
errors.New("segment doesn't include enough bytes to read the chunk size data field - required:13, available:9"),
},
"chunk not enough bytes to read the data": {
func(f *os.File) {
fi, err := f.Stat()
testutil.Ok(t, err)
err = f.Truncate(fi.Size() - 1)
testutil.Ok(t, err)
},
nil,
errors.New("segment doesn't include enough bytes to read the chunk - required:26, available:25"),
},
"checksum mismatch": {
func(f *os.File) {
fi, err := f.Stat()
testutil.Ok(t, err)
// Get the chunk data end offset.
chkEndOffset := int(fi.Size()) - crc32.Size
// Seek to the last byte of chunk data and modify it.
_, err = f.Seek(int64(chkEndOffset-1), 0)
testutil.Ok(t, err)
n, err := f.Write([]byte("x"))
testutil.Ok(t, err)
testutil.Equals(t, n, 1)
},
nil,
errors.New("checksum mismatch expected:cfc0526c, actual:34815eae"),
},
} {
t.Run(name, func(t *testing.T) {
@ -135,7 +178,8 @@ func TestCorruptedChunk(t *testing.T) {
testutil.Ok(t, os.RemoveAll(tmpdir))
}()
blockDir := createBlock(t, tmpdir, genSeries(1, 1, 0, 1))
series := newSeries(map[string]string{"a": "b"}, []tsdbutil.Sample{sample{1, 1}})
blockDir := createBlock(t, tmpdir, []Series{series})
files, err := sequenceFiles(chunkDir(blockDir))
testutil.Ok(t, err)
testutil.Assert(t, len(files) > 0, "No chunk created.")
@ -147,8 +191,21 @@ func TestCorruptedChunk(t *testing.T) {
test.corrFunc(f)
testutil.Ok(t, f.Close())
_, err = OpenBlock(nil, blockDir, nil)
testutil.Equals(t, test.expErr.Error(), err.Error())
// Check open err.
b, err := OpenBlock(nil, blockDir, nil)
if test.openErr != nil {
testutil.Equals(t, test.openErr.Error(), err.Error())
return
}
querier, err := NewBlockQuerier(b, 0, 1)
testutil.Ok(t, err)
set, err := querier.Select(labels.MustNewMatcher(labels.MatchEqual, "a", "b"))
testutil.Ok(t, err)
// Check query err.
testutil.Equals(t, false, set.Next())
testutil.Equals(t, test.queryErr.Error(), set.Err().Error())
})
}
}
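
As a sanity check on the expected error strings above: per the constants introduced in chunks.go below, the segment header is 4 (magic) + 1 (version) + 3 (padding) = 8 bytes, and the length field is read as a uvarint of at most binary.MaxVarintLen32 = 5 bytes, which is where "required:13, available:9" comes from after f.Truncate(chunks.SegmentHeaderSize + 1). The second truncation case works the same way: the single chunk written by the test ends exactly at byte 26 of the segment, so dropping the last byte leaves 25 bytes available while the reader still needs 26. A tiny, illustrative calculation:

package main

import (
	"encoding/binary"
	"fmt"
)

// segmentHeaderSize mirrors chunks.SegmentHeaderSize: magic + format version + padding.
const segmentHeaderSize = 4 + 1 + 3

func main() {
	required := segmentHeaderSize + binary.MaxVarintLen32 // the reader needs the full length field: 13
	available := segmentHeaderSize + 1                    // the test truncates the file to 9 bytes
	fmt.Printf("required:%d, available:%d\n", required, available)
}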


@ -15,6 +15,7 @@ package chunks
import (
"bufio"
"bytes"
"encoding/binary"
"fmt"
"hash"
@ -31,22 +32,32 @@ import (
"github.com/prometheus/prometheus/tsdb/fileutil"
)
// Segment header fields constants.
const (
// MagicChunks is 4 bytes at the head of a series file.
MagicChunks = 0x85BD40DD
// MagicChunksSize is the size in bytes of MagicChunks.
MagicChunksSize = 4
MagicChunksSize = 4
chunksFormatV1 = 1
ChunksFormatVersionSize = 1
segmentHeaderPaddingSize = 3
// SegmentHeaderSize defines the total size of the header part.
SegmentHeaderSize = MagicChunksSize + ChunksFormatVersionSize + segmentHeaderPaddingSize
)
chunksFormatV1 = 1
ChunksFormatVersionSize = 1
chunkHeaderSize = MagicChunksSize + ChunksFormatVersionSize
// Chunk fields constants.
const (
// MaxChunkLengthFieldSize defines the maximum size of the data length part.
MaxChunkLengthFieldSize = binary.MaxVarintLen32
// ChunkEncodingSize defines the size of the chunk encoding part.
ChunkEncodingSize = 1
)
// Meta holds information about a chunk of data.
type Meta struct {
// Ref and Chunk hold either a reference that can be used to retrieve
// chunk data or the data itself.
// When it is a reference it is the segment offset at which the chunk bytes start.
// Generally, only one of them is set.
Ref uint64
Chunk chunkenc.Chunk
@ -104,11 +115,27 @@ type Writer struct {
}
const (
defaultChunkSegmentSize = 512 * 1024 * 1024
// DefaultChunkSegmentSize is the default chunks segment size.
DefaultChunkSegmentSize = 512 * 1024 * 1024
)
// NewWriter returns a new writer against the given directory.
// NewWriterWithSegSize returns a new writer against the given directory
// and allows setting a custom size for the segments.
func NewWriterWithSegSize(dir string, segmentSize int64) (*Writer, error) {
return newWriter(dir, segmentSize)
}
// NewWriter returns a new writer against the given directory
// using the default segment size.
func NewWriter(dir string) (*Writer, error) {
return newWriter(dir, DefaultChunkSegmentSize)
}
func newWriter(dir string, segmentSize int64) (*Writer, error) {
if segmentSize <= 0 {
segmentSize = DefaultChunkSegmentSize
}
if err := os.MkdirAll(dir, 0777); err != nil {
return nil, err
}
@ -116,13 +143,12 @@ func NewWriter(dir string) (*Writer, error) {
if err != nil {
return nil, err
}
cw := &Writer{
return &Writer{
dirFile: dirFile,
n: 0,
crc32: newCRC32(),
segmentSize: defaultChunkSegmentSize,
}
return cw, nil
segmentSize: segmentSize,
}, nil
}
func (w *Writer) tail() *os.File {
@ -180,13 +206,15 @@ func (w *Writer) cut() error {
}
// Write header metadata for new file.
metab := make([]byte, 8)
metab := make([]byte, SegmentHeaderSize)
binary.BigEndian.PutUint32(metab[:MagicChunksSize], MagicChunks)
metab[4] = chunksFormatV1
if _, err := f.Write(metab); err != nil {
n, err := f.Write(metab)
if err != nil {
return err
}
w.n = int64(n)
w.files = append(w.files, f)
if w.wbuf != nil {
@ -194,7 +222,6 @@ func (w *Writer) cut() error {
} else {
w.wbuf = bufio.NewWriterSize(f, 8*1024*1024)
}
w.n = 8
return nil
}
@ -284,27 +311,87 @@ func MergeChunks(a, b chunkenc.Chunk) (*chunkenc.XORChunk, error) {
return newChunk, nil
}
// WriteChunks writes as many chunks as possible to the current segment,
// cuts a new segment when the current segment is full and
// writes the rest of the chunks in the new segment.
func (w *Writer) WriteChunks(chks ...Meta) error {
// Calculate maximum space we need and cut a new segment in case
// we don't fit into the current one.
maxLen := int64(binary.MaxVarintLen32) // The number of chunks.
for _, c := range chks {
maxLen += binary.MaxVarintLen32 + 1 // The number of bytes in the chunk and its encoding.
maxLen += int64(len(c.Chunk.Bytes()))
maxLen += 4 // The 4 bytes of crc32
}
newsz := w.n + maxLen
var (
batchSize = int64(0)
batchStart = 0
batches = make([][]Meta, 1)
batchID = 0
firstBatch = true
)
if w.wbuf == nil || newsz > w.segmentSize && maxLen <= w.segmentSize {
for i, chk := range chks {
// Each chunk contains: data length + encoding + the data itself + crc32
chkSize := int64(MaxChunkLengthFieldSize) // The data length is a variable length field so use the maximum possible value.
chkSize += ChunkEncodingSize // The chunk encoding.
chkSize += int64(len(chk.Chunk.Bytes())) // The data itself.
chkSize += crc32.Size // The 4 bytes of crc32.
batchSize += chkSize
// Cut a new batch when it is not the first chunk (to avoid empty segments) and
// the batch is too large to fit in the current segment.
cutNewBatch := (i != 0) && (batchSize+SegmentHeaderSize > w.segmentSize)
// When the segment already has some data,
// the first batch size calculation should account for that.
if firstBatch && w.n > SegmentHeaderSize {
cutNewBatch = batchSize+w.n > w.segmentSize
if cutNewBatch {
firstBatch = false
}
}
if cutNewBatch {
batchStart = i
batches = append(batches, []Meta{})
batchID++
batchSize = chkSize
}
batches[batchID] = chks[batchStart : i+1]
}
// Create a new segment when one doesn't already exist.
if w.n == 0 {
if err := w.cut(); err != nil {
return err
}
}
for i, chks := range batches {
if err := w.writeChunks(chks); err != nil {
return err
}
// Cut a new segment only when there are more chunks to write.
// Avoid creating a new empty segment at the end of the write.
if i < len(batches)-1 {
if err := w.cut(); err != nil {
return err
}
}
}
return nil
}
// writeChunks writes the chunks into the current segment irrespective
// of the configured segment size limit. A segment should have been already
// started before calling this.
func (w *Writer) writeChunks(chks []Meta) error {
if len(chks) == 0 {
return nil
}
var seq = uint64(w.seq()) << 32
for i := range chks {
chk := &chks[i]
// The reference is set to the segment index and the offset where
// the data starts for this chunk.
//
// The upper 4 bytes are for the segment index and
// the lower 4 bytes are for the segment offset where to start reading this chunk.
chk.Ref = seq | uint64(w.n)
n := binary.PutUvarint(w.buf[:], uint64(len(chk.Chunk.Bytes())))
@ -328,7 +415,6 @@ func (w *Writer) WriteChunks(chks ...Meta) error {
return err
}
}
return nil
}
@ -368,19 +454,23 @@ func (b realByteSlice) Sub(start, end int) ByteSlice {
// Reader implements a ChunkReader for a serialized byte stream
// of series data.
type Reader struct {
bs []ByteSlice // The underlying bytes holding the encoded series data.
cs []io.Closer // Closers for resources behind the byte slices.
size int64 // The total size of bytes in the reader.
pool chunkenc.Pool
// The underlying bytes holding the encoded series data.
// Each slice holds the data for a different segment.
bs []ByteSlice
cs []io.Closer // Closers for resources behind the byte slices.
size int64 // The total size of bytes in the reader.
pool chunkenc.Pool
crc32 hash.Hash
buf [binary.MaxVarintLen32]byte
}
func newReader(bs []ByteSlice, cs []io.Closer, pool chunkenc.Pool) (*Reader, error) {
cr := Reader{pool: pool, bs: bs, cs: cs}
cr := Reader{pool: pool, bs: bs, cs: cs, crc32: newCRC32()}
var totalSize int64
for i, b := range cr.bs {
if b.Len() < chunkHeaderSize {
return nil, errors.Wrapf(errInvalidSize, "invalid chunk header in segment %d", i)
if b.Len() < SegmentHeaderSize {
return nil, errors.Wrapf(errInvalidSize, "invalid segment header in segment %d", i)
}
// Verify magic number.
if m := binary.BigEndian.Uint32(b.Range(0, MagicChunksSize)); m != MagicChunks {
@ -445,28 +535,52 @@ func (s *Reader) Size() int64 {
// Chunk returns a chunk from a given reference.
func (s *Reader) Chunk(ref uint64) (chunkenc.Chunk, error) {
var (
sgmSeq = int(ref >> 32)
sgmOffset = int((ref << 32) >> 32)
// Get the upper 4 bytes.
// These contain the segment index.
sgmIndex = int(ref >> 32)
// Get the lower 4 bytes.
// These contain the segment offset where the data for this chunk starts.
chkStart = int((ref << 32) >> 32)
)
if sgmSeq >= len(s.bs) {
return nil, errors.Errorf("reference sequence %d out of range", sgmSeq)
}
chkS := s.bs[sgmSeq]
if sgmOffset >= chkS.Len() {
return nil, errors.Errorf("offset %d beyond data size %d", sgmOffset, chkS.Len())
if sgmIndex >= len(s.bs) {
return nil, errors.Errorf("segment index %d out of range", sgmIndex)
}
sgmBytes := s.bs[sgmIndex]
if chkStart+MaxChunkLengthFieldSize > sgmBytes.Len() {
return nil, errors.Errorf("segment doesn't include enough bytes to read the chunk size data field - required:%v, available:%v", chkStart+MaxChunkLengthFieldSize, sgmBytes.Len())
}
// With the minimum chunk length this should never cause us reading
// over the end of the slice.
chk := chkS.Range(sgmOffset, sgmOffset+binary.MaxVarintLen32)
chkLen, n := binary.Uvarint(chk)
c := sgmBytes.Range(chkStart, chkStart+MaxChunkLengthFieldSize)
chkDataLen, n := binary.Uvarint(c)
if n <= 0 {
return nil, errors.Errorf("reading chunk length failed with %d", n)
}
chk = chkS.Range(sgmOffset+n, sgmOffset+n+1+int(chkLen))
return s.pool.Get(chunkenc.Encoding(chk[0]), chk[1:1+chkLen])
chkEncStart := chkStart + n
chkEnd := chkEncStart + ChunkEncodingSize + int(chkDataLen) + crc32.Size
chkDataStart := chkEncStart + ChunkEncodingSize
chkDataEnd := chkEnd - crc32.Size
if chkEnd > sgmBytes.Len() {
return nil, errors.Errorf("segment doesn't include enough bytes to read the chunk - required:%v, available:%v", chkEnd, sgmBytes.Len())
}
sum := sgmBytes.Range(chkEnd-crc32.Size, chkEnd)
s.crc32.Reset()
if _, err := s.crc32.Write(sgmBytes.Range(chkEncStart, chkDataEnd)); err != nil {
return nil, err
}
if act := s.crc32.Sum(s.buf[:0]); !bytes.Equal(act, sum) {
return nil, errors.Errorf("checksum mismatch expected:%x, actual:%x", sum, act)
}
chkData := sgmBytes.Range(chkDataStart, chkDataEnd)
chkEnc := sgmBytes.Range(chkEncStart, chkEncStart+ChunkEncodingSize)[0]
return s.pool.Get(chunkenc.Encoding(chkEnc), chkData)
}
func nextSequenceFile(dir string) (string, int, error) {
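
One detail worth keeping in mind when reading Reader.Chunk above: a chunk reference packs the segment index into the upper 32 bits and the in-segment byte offset of the chunk's length field into the lower 32 bits. A minimal sketch of that packing (packRef/unpackRef are illustrative helpers, not part of the package):

package chunkref

// packRef mirrors how Writer.writeChunks builds Meta.Ref:
// upper 32 bits = segment index (w.seq()), lower 32 bits = offset within that segment.
func packRef(segmentIndex, offset uint64) uint64 {
	return segmentIndex<<32 | offset
}

// unpackRef mirrors the decoding at the top of Reader.Chunk.
func unpackRef(ref uint64) (segmentIndex, offset int) {
	return int(ref >> 32), int((ref << 32) >> 32) // the second expression is equivalent to ref & 0xffffffff
}

So a reference with segment index 2 and offset 16 is 2<<32 | 16, which is why the reader first range-checks the segment index against len(s.bs) and only then checks the chunk offsets within that segment.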


@ -14,7 +14,9 @@
package tsdb
import (
"encoding/binary"
"fmt"
"hash/crc32"
"io/ioutil"
"math"
"math/rand"
@ -22,6 +24,7 @@ import (
"path"
"path/filepath"
"sort"
"strconv"
"testing"
"time"
@ -2480,3 +2483,187 @@ func TestDBReadOnly_FlushWAL(t *testing.T) {
testutil.Ok(t, seriesSet.Err())
testutil.Equals(t, 1000.0, sum)
}
// TestChunkWriter ensures that chunk segments are cut at the set segment size and
// that the resulting segments include the expected chunk data.
func TestChunkWriter(t *testing.T) {
chk1 := tsdbutil.ChunkFromSamples([]tsdbutil.Sample{sample{1, 1}})
chk2 := tsdbutil.ChunkFromSamples([]tsdbutil.Sample{sample{1, 2}})
chk3 := tsdbutil.ChunkFromSamples([]tsdbutil.Sample{sample{1, 3}})
chk4 := tsdbutil.ChunkFromSamples([]tsdbutil.Sample{sample{1, 4}})
chk5 := tsdbutil.ChunkFromSamples([]tsdbutil.Sample{sample{1, 5}})
chunkSize := len(chk1.Chunk.Bytes()) + chunks.MaxChunkLengthFieldSize + chunks.ChunkEncodingSize + crc32.Size
tests := []struct {
chks [][]chunks.Meta
segmentSize,
expSegmentsCount int
expSegmentSizes []int
}{
// 0: The last chunk ends at the segment boundary, so
// all chunks should fit in a single segment.
{
chks: [][]chunks.Meta{
[]chunks.Meta{
chk1,
chk2,
chk3,
},
},
segmentSize: 3 * chunkSize,
expSegmentSizes: []int{3 * chunkSize},
expSegmentsCount: 1,
},
// 1: Two chunks can fit in a single segment, so the last one should result in a new segment.
{
chks: [][]chunks.Meta{
[]chunks.Meta{
chk1,
chk2,
chk3,
chk4,
chk5,
},
},
segmentSize: 2 * chunkSize,
expSegmentSizes: []int{2 * chunkSize, 2 * chunkSize, chunkSize},
expSegmentsCount: 3,
},
// 2: When the segment size is smaller than the size of 2 chunks,
// each chunk should end up in its own segment.
{
chks: [][]chunks.Meta{
[]chunks.Meta{
chk1,
chk2,
chk3,
},
},
segmentSize: 2*chunkSize - 1,
expSegmentSizes: []int{chunkSize, chunkSize, chunkSize},
expSegmentsCount: 3,
},
// 3: When the segment size is smaller than a single chunk,
// the chunk should still be written, ignoring the max segment size.
{
chks: [][]chunks.Meta{
[]chunks.Meta{
chk1,
},
},
segmentSize: chunkSize - 1,
expSegmentSizes: []int{chunkSize},
expSegmentsCount: 1,
},
// 4: All chunks are bigger than the max segment size, but
// they should still be written even though this results in segments bigger than the set size.
// Each segment will hold a single chunk.
{
chks: [][]chunks.Meta{
[]chunks.Meta{
chk1,
chk2,
chk3,
},
},
segmentSize: 1,
expSegmentSizes: []int{chunkSize, chunkSize, chunkSize},
expSegmentsCount: 3,
},
// 5: Adding multiple batches of chunks.
{
chks: [][]chunks.Meta{
[]chunks.Meta{
chk1,
chk2,
chk3,
},
[]chunks.Meta{
chk4,
chk5,
},
},
segmentSize: 3 * chunkSize,
expSegmentSizes: []int{3 * chunkSize, 2 * chunkSize},
expSegmentsCount: 2,
},
// 6: Adding multiple batches of chunks.
{
chks: [][]chunks.Meta{
[]chunks.Meta{
chk1,
},
[]chunks.Meta{
chk2,
chk3,
},
[]chunks.Meta{
chk4,
},
},
segmentSize: 2 * chunkSize,
expSegmentSizes: []int{2 * chunkSize, 2 * chunkSize},
expSegmentsCount: 2,
},
}
for i, test := range tests {
t.Run(strconv.Itoa(i), func(t *testing.T) {
tmpdir, err := ioutil.TempDir("", "test_chunk_writer")
testutil.Ok(t, err)
defer func() {
testutil.Ok(t, os.RemoveAll(tmpdir))
}()
chunkw, err := chunks.NewWriterWithSegSize(tmpdir, chunks.SegmentHeaderSize+int64(test.segmentSize))
testutil.Ok(t, err)
for _, chks := range test.chks {
testutil.Ok(t, chunkw.WriteChunks(chks...))
}
testutil.Ok(t, chunkw.Close())
files, err := ioutil.ReadDir(tmpdir)
testutil.Ok(t, err)
testutil.Equals(t, test.expSegmentsCount, len(files), "expected segments count mismatch")
// Verify that all data is written to the segments.
sizeExp := 0
sizeAct := 0
for _, chks := range test.chks {
for _, chk := range chks {
l := make([]byte, binary.MaxVarintLen32)
sizeExp += binary.PutUvarint(l, uint64(len(chk.Chunk.Bytes()))) // The length field.
sizeExp += chunks.ChunkEncodingSize
sizeExp += len(chk.Chunk.Bytes()) // The data itself.
sizeExp += crc32.Size // The 4 bytes of crc32
}
}
sizeExp += test.expSegmentsCount * chunks.SegmentHeaderSize // The segment header bytes.
for i, f := range files {
size := int(f.Size())
// Verify that the segment is the same or smaller than the expected size.
testutil.Assert(t, chunks.SegmentHeaderSize+test.expSegmentSizes[i] >= size, "Segment:%v should NOT be bigger than:%v actual:%v", i, chunks.SegmentHeaderSize+test.expSegmentSizes[i], size)
sizeAct += size
}
testutil.Equals(t, sizeExp, sizeAct)
// Check the content of the chunks.
r, err := chunks.NewDirReader(tmpdir, nil)
testutil.Ok(t, err)
for _, chks := range test.chks {
for _, chkExp := range chks {
chkAct, err := r.Chunk(chkExp.Ref)
testutil.Ok(t, err)
testutil.Equals(t, chkExp.Chunk.Bytes(), chkAct.Bytes())
}
}
})
}
}
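
To make the segment-count expectations above easier to follow, here is a stripped-down, size-only sketch of the batching rule the new WriteChunks applies within a single call (planSegments is an illustrative helper, not part of the package; the real writer additionally keeps filling a partially full segment across calls, which is what cases 5 and 6 exercise):

package main

import "fmt"

// planSegments mimics the size-based batching in Writer.WriteChunks: a chunk
// starts a new segment when adding it would overflow the current one, except
// that the first chunk of a segment is always accepted, even if it alone is
// bigger than the segment size.
func planSegments(chunkSizes []int, segmentSize, headerSize int) [][]int {
	var segments [][]int
	var cur []int
	used := headerSize
	for _, size := range chunkSizes {
		if len(cur) > 0 && used+size > segmentSize {
			segments = append(segments, cur)
			cur, used = nil, headerSize
		}
		cur = append(cur, size)
		used += size
	}
	if len(cur) > 0 {
		segments = append(segments, cur)
	}
	return segments
}

func main() {
	// Mirrors test case 1: five equal chunks with room for two per segment
	// (plus the 8-byte header) end up as segments holding 2, 2 and 1 chunks.
	fmt.Println(planSegments([]int{10, 10, 10, 10, 10}, 8+2*10, 8))
}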