diff --git a/Documentation/format/chunks.md b/Documentation/format/chunks.md index 51ac1242c2..3faae9b7df 100644 --- a/Documentation/format/chunks.md +++ b/Documentation/format/chunks.md @@ -2,7 +2,7 @@ The following describes the format of a single chunks file, which is created in the `chunks/` directory of a block. The maximum size per segment file is 512MiB. -Chunks in the files are referenced from the index by the in-file offset in the 4 LSB and the segment sequence number in the bigher 4 MSBs. +Chunks in the files are referenced from the index by the in-file offset in the 4 LSB and the segment sequence number in the higher 4 MSBs. ``` ┌────────────────────────────────────────┬──────────────────────┐ diff --git a/block.go b/block.go index 63b468f272..2bd1fd3648 100644 --- a/block.go +++ b/block.go @@ -133,7 +133,7 @@ func writeMetaFile(dir string, meta *BlockMeta) error { return renameFile(tmp, path) } -// Block represents a directory of time series data covering a continous time range. +// Block represents a directory of time series data covering a continuous time range. type Block struct { mtx sync.RWMutex closing bool diff --git a/index.go b/index.go index 258db74d25..36507f645f 100644 --- a/index.go +++ b/index.go @@ -575,11 +575,14 @@ type indexReader struct { // prevents memory faults when applications work with read symbols after // the block has been unmapped. symbols map[uint32]string + + crc32 hash.Hash32 } var ( - errInvalidSize = fmt.Errorf("invalid size") - errInvalidFlag = fmt.Errorf("invalid flag") + errInvalidSize = fmt.Errorf("invalid size") + errInvalidFlag = fmt.Errorf("invalid flag") + errInvalidChecksum = fmt.Errorf("invalid checksum") ) // NewIndexReader returns a new IndexReader on the given directory. @@ -595,6 +598,7 @@ func newIndexReader(dir string) (*indexReader, error) { b: f.b, c: f, symbols: map[uint32]string{}, + crc32: newCRC32(), } // Verify magic number. @@ -630,9 +634,11 @@ func (r *indexReader) readTOC() error { r.toc.postings = d.be64() r.toc.postingsTable = d.be64() - // TODO(fabxc): validate checksum. + if valid, err := r.checkCRC(d.be32(), len(r.b)-indexTOCLen, indexTOCLen-4); !valid { + return errors.Wrap(err, "TOC checksum") + } - return nil + return d.err() } func (r *indexReader) decbufAt(off int) decbuf { @@ -642,6 +648,20 @@ func (r *indexReader) decbufAt(off int) decbuf { return decbuf{b: r.b[off:]} } +func (r *indexReader) checkCRC(crc uint32, off, cnt int) (bool, error) { + r.crc32.Reset() + if len(r.b) < off+cnt { + return false, errInvalidSize + } + if _, err := r.crc32.Write(r.b[off : off+cnt]); err != nil { + return false, errors.Wrap(err, "write to hash") + } + if r.crc32.Sum32() != crc { + return false, errInvalidChecksum + } + return true, nil +} + // readSymbols reads the symbol table fully into memory and allocates proper strings for them. // Strings backed by the mmap'd memory would cause memory faults if applications keep using them // after the reader is closed. @@ -651,7 +671,8 @@ func (r *indexReader) readSymbols(off int) error { } var ( d1 = r.decbufAt(int(off)) - d2 = d1.decbuf(d1.be32int()) + l = d1.be32int() + d2 = d1.decbuf(l) origLen = d2.len() cnt = d2.be32int() basePos = uint32(off) + 4 @@ -664,6 +685,9 @@ func (r *indexReader) readSymbols(off int) error { nextPos = basePos + uint32(origLen-d2.len()) cnt-- } + if valid, err := r.checkCRC(d1.be32(), int(off)+4, l); !valid { + return errors.Wrap(err, "symbol table checksum") + } return d2.err() } @@ -674,7 +698,8 @@ func (r *indexReader) readOffsetTable(off uint64) (map[string]uint32, error) { var ( d1 = r.decbufAt(int(off)) - d2 = d1.decbuf(d1.be32int()) + l = d1.be32int() + d2 = d1.decbuf(l) cnt = d2.be32() ) @@ -692,7 +717,10 @@ func (r *indexReader) readOffsetTable(off uint64) (map[string]uint32, error) { cnt-- } - // TODO(fabxc): verify checksum from remainer of d1. + if valid, err := r.checkCRC(d1.be32(), int(off)+4, l); !valid { + return res, errors.Wrap(err, "offset table checksum") + } + return res, d2.err() } @@ -749,7 +777,8 @@ func (r *indexReader) LabelValues(names ...string) (StringTuples, error) { } d1 := r.decbufAt(int(off)) - d2 := d1.decbuf(d1.be32int()) + l := d1.be32int() + d2 := d1.decbuf(l) nc := d2.be32int() d2.be32() // consume unused value entry count. @@ -758,7 +787,9 @@ func (r *indexReader) LabelValues(names ...string) (StringTuples, error) { return nil, errors.Wrap(d2.err(), "read label value index") } - // TODO(fabxc): verify checksum in 4 remaining bytes of d1. + if valid, err := r.checkCRC(d1.be32(), int(off)+4, l); !valid { + return nil, errors.Wrap(err, "read label values checksum") + } st := &serializedStringTuples{ l: nc, @@ -786,7 +817,9 @@ func (r *indexReader) LabelIndices() ([][]string, error) { func (r *indexReader) Series(ref uint64, lbls *labels.Labels, chks *[]ChunkMeta) error { d1 := r.decbufAt(int(ref)) - d2 := d1.decbuf(int(d1.uvarint())) + l := d1.uvarint() + sl := len(r.b[ref:]) - d1.len() // # bytes in l + d2 := d1.decbuf(l) *lbls = (*lbls)[:0] *chks = (*chks)[:0] @@ -849,7 +882,9 @@ func (r *indexReader) Series(ref uint64, lbls *labels.Labels, chks *[]ChunkMeta) }) } - // TODO(fabxc): verify CRC32. + if valid, err := r.checkCRC(d1.be32(), int(ref)+sl, l); !valid { + return errors.Wrap(err, "series checksum") + } return nil } @@ -864,7 +899,8 @@ func (r *indexReader) Postings(name, value string) (Postings, error) { } d1 := r.decbufAt(int(off)) - d2 := d1.decbuf(d1.be32int()) + l := d1.be32int() + d2 := d1.decbuf(l) d2.be32() // consume unused postings list length. @@ -872,7 +908,9 @@ func (r *indexReader) Postings(name, value string) (Postings, error) { return nil, errors.Wrap(d2.err(), "get postings bytes") } - // TODO(fabxc): read checksum from 4 remainer bytes of d1 and verify. + if valid, err := r.checkCRC(d1.be32(), int(off)+4, l); !valid { + return nil, errors.Wrap(err, "postings checksum") + } return newBigEndianPostings(d2.get()), nil }