Validate index TOC checksum on read

This commit is contained in:
Sunny Klair 2017-10-25 18:12:13 -04:00
parent 6ca5e52b69
commit 4fdf9b195c
3 changed files with 15 additions and 6 deletions

View file

@ -2,7 +2,7 @@
The following describes the format of a single chunks file, which is created in the `chunks/` directory of a block. The maximum size per segment file is 512MiB. The following describes the format of a single chunks file, which is created in the `chunks/` directory of a block. The maximum size per segment file is 512MiB.
Chunks in the files are referenced from the index by the in-file offset in the 4 LSB and the segment sequence number in the bigher 4 MSBs. Chunks in the files are referenced from the index by the in-file offset in the 4 LSB and the segment sequence number in the higher 4 MSBs.
``` ```
┌────────────────────────────────────────┬──────────────────────┐ ┌────────────────────────────────────────┬──────────────────────┐

View file

@ -133,7 +133,7 @@ func writeMetaFile(dir string, meta *BlockMeta) error {
return renameFile(tmp, path) return renameFile(tmp, path)
} }
// Block represents a directory of time series data covering a continous time range. // Block represents a directory of time series data covering a continuous time range.
type Block struct { type Block struct {
mtx sync.RWMutex mtx sync.RWMutex
closing bool closing bool

View file

@ -573,8 +573,9 @@ type indexReader struct {
} }
var ( var (
errInvalidSize = fmt.Errorf("invalid size") errInvalidSize = fmt.Errorf("invalid size")
errInvalidFlag = fmt.Errorf("invalid flag") errInvalidFlag = fmt.Errorf("invalid flag")
errInvalidChecksum = fmt.Errorf("invalid checksum")
) )
// NewIndexReader returns a new IndexReader on the given directory. // NewIndexReader returns a new IndexReader on the given directory.
@ -618,6 +619,11 @@ func newIndexReader(dir string) (*indexReader, error) {
func (r *indexReader) readTOC() error { func (r *indexReader) readTOC() error {
d := r.decbufAt(len(r.b) - indexTOCLen) d := r.decbufAt(len(r.b) - indexTOCLen)
crc := newCRC32()
if _, err := crc.Write(d.get()[:d.len()-4]); err != nil {
return errors.Wrap(err, "write to hash")
}
r.toc.symbols = d.be64() r.toc.symbols = d.be64()
r.toc.series = d.be64() r.toc.series = d.be64()
r.toc.labelIndices = d.be64() r.toc.labelIndices = d.be64()
@ -625,9 +631,12 @@ func (r *indexReader) readTOC() error {
r.toc.postings = d.be64() r.toc.postings = d.be64()
r.toc.postingsTable = d.be64() r.toc.postingsTable = d.be64()
// TODO(fabxc): validate checksum. // Validate checksum
if d.be32() != crc.Sum32() {
return errors.Wrap(errInvalidChecksum, "TOC checksum")
}
return nil return d.err()
} }
func (r *indexReader) decbufAt(off int) decbuf { func (r *indexReader) decbufAt(off int) decbuf {