diff --git a/block_test.go b/block_test.go index 1f091693d..c6c3951df 100644 --- a/block_test.go +++ b/block_test.go @@ -15,6 +15,8 @@ package tsdb import ( "context" + "encoding/binary" + "errors" "io/ioutil" "math/rand" "os" @@ -22,6 +24,7 @@ import ( "testing" "github.com/go-kit/kit/log" + "github.com/prometheus/tsdb/chunks" "github.com/prometheus/tsdb/testutil" "github.com/prometheus/tsdb/tsdbutil" ) @@ -77,6 +80,74 @@ func TestCreateBlock(t *testing.T) { testutil.Ok(t, err) } +func TestCorruptedChunk(t *testing.T) { + for name, test := range map[string]struct { + corrFunc func(f *os.File) // Func that applies the corruption. + expErr error + }{ + "invalid header size": { + func(f *os.File) { + err := f.Truncate(1) + testutil.Ok(t, err) + }, + errors.New("invalid chunk header in segment 0: invalid size"), + }, + "invalid magic number": { + func(f *os.File) { + magicChunksOffset := int64(0) + _, err := f.Seek(magicChunksOffset, 0) + testutil.Ok(t, err) + + // Set invalid magic number. + b := make([]byte, chunks.MagicChunksSize) + binary.BigEndian.PutUint32(b[:chunks.MagicChunksSize], 0x00000000) + n, err := f.Write(b) + testutil.Ok(t, err) + testutil.Equals(t, chunks.MagicChunksSize, n) + }, + errors.New("invalid magic number 0"), + }, + "invalid chunk format version": { + func(f *os.File) { + chunksFormatVersionOffset := int64(4) + _, err := f.Seek(chunksFormatVersionOffset, 0) + testutil.Ok(t, err) + + // Set invalid chunk format version. + b := make([]byte, chunks.ChunksFormatVersionSize) + b[0] = 0 + n, err := f.Write(b) + testutil.Ok(t, err) + testutil.Equals(t, chunks.ChunksFormatVersionSize, n) + }, + errors.New("invalid chunk format version 0"), + }, + } { + t.Run(name, func(t *testing.T) { + tmpdir, err := ioutil.TempDir("", "test_open_block_chunk_corrupted") + testutil.Ok(t, err) + defer func() { + testutil.Ok(t, os.RemoveAll(tmpdir)) + }() + + blockDir := createBlock(t, tmpdir, genSeries(1, 1, 0, 0)) + files, err := sequenceFiles(chunkDir(blockDir)) + testutil.Ok(t, err) + testutil.Assert(t, len(files) > 0, "No chunk created.") + + f, err := os.OpenFile(files[0], os.O_RDWR, 0666) + testutil.Ok(t, err) + + // Apply corruption function. + test.corrFunc(f) + testutil.Ok(t, f.Close()) + + _, err = OpenBlock(nil, blockDir, nil) + testutil.Equals(t, test.expErr.Error(), err.Error()) + }) + } +} + // createBlock creates a block with given set of series and returns its dir. func createBlock(tb testing.TB, dir string, series []Series) string { head, err := NewHead(nil, nil, nil, 2*60*60*1000) diff --git a/chunks/chunks.go b/chunks/chunks.go index 07e0be803..4db40f0ba 100644 --- a/chunks/chunks.go +++ b/chunks/chunks.go @@ -27,12 +27,20 @@ import ( "github.com/pkg/errors" "github.com/prometheus/tsdb/chunkenc" + tsdb_errors "github.com/prometheus/tsdb/errors" "github.com/prometheus/tsdb/fileutil" ) const ( // MagicChunks is 4 bytes at the head of a series file. MagicChunks = 0x85BD40DD + // MagicChunksSize is the size in bytes of MagicChunks. + MagicChunksSize = 4 + + chunksFormatV1 = 1 + ChunksFormatVersionSize = 1 + + chunkHeaderSize = MagicChunksSize + ChunksFormatVersionSize ) // Meta holds information about a chunk of data. @@ -93,8 +101,6 @@ type Writer struct { const ( defaultChunkSegmentSize = 512 * 1024 * 1024 - - chunksFormatV1 = 1 ) // NewWriter returns a new writer against the given directory. @@ -170,9 +176,8 @@ func (w *Writer) cut() error { } // Write header metadata for new file. - metab := make([]byte, 8) - binary.BigEndian.PutUint32(metab[:4], MagicChunks) + binary.BigEndian.PutUint32(metab[:MagicChunksSize], MagicChunks) metab[4] = chunksFormatV1 if _, err := f.Write(metab); err != nil { @@ -373,13 +378,18 @@ func newReader(bs []ByteSlice, cs []io.Closer, pool chunkenc.Pool) (*Reader, err var totalSize int64 for i, b := range cr.bs { - if b.Len() < 4 { - return nil, errors.Wrapf(errInvalidSize, "validate magic in segment %d", i) + if b.Len() < chunkHeaderSize { + return nil, errors.Wrapf(errInvalidSize, "invalid chunk header in segment %d", i) } // Verify magic number. - if m := binary.BigEndian.Uint32(b.Range(0, 4)); m != MagicChunks { + if m := binary.BigEndian.Uint32(b.Range(0, MagicChunksSize)); m != MagicChunks { return nil, errors.Errorf("invalid magic number %x", m) } + + // Verify chunk format version. + if v := int(b.Range(MagicChunksSize, MagicChunksSize+ChunksFormatVersionSize)[0]); v != chunksFormatV1 { + return nil, errors.Errorf("invalid chunk format version %d", v) + } totalSize += int64(b.Len()) } cr.size = totalSize @@ -409,7 +419,15 @@ func NewDirReader(dir string, pool chunkenc.Pool) (*Reader, error) { cs = append(cs, f) bs = append(bs, realByteSlice(f.Bytes())) } - return newReader(bs, cs, pool) + + reader, err := newReader(bs, cs, pool) + if err != nil { + var merr tsdb_errors.MultiError + merr.Add(err) + merr.Add(closeAll(cs)) + return nil, merr + } + return reader, nil } func (s *Reader) Close() error {