index: simplify checksum validation

This commit is contained in:
Fabian Reinartz 2017-11-09 15:56:40 +00:00
parent 798f2bdb0a
commit c354d6bd59
3 changed files with 88 additions and 53 deletions

View file

@ -3,6 +3,7 @@ package tsdb
import (
"encoding/binary"
"hash"
"hash/crc32"
"unsafe"
)
@ -77,6 +78,11 @@ func (d *decbuf) uvarint32() uint32 { return uint32(d.uvarint64()) }
func (d *decbuf) be32int() int { return int(d.be32()) }
func (d *decbuf) be64int64() int64 { return int64(d.be64()) }
// crc32 returns a CRC32 checksum over the remaining bytes.
func (d *decbuf) crc32() uint32 {
return crc32.Checksum(d.b, castagnoliTable)
}
func (d *decbuf) uvarintStr() string {
l := d.uvarint64()
if d.e != nil {

View file

@ -625,20 +625,24 @@ func newIndexReader(dir string) (*indexReader, error) {
}
func (r *indexReader) readTOC() error {
d := r.decbufAt(len(r.b) - indexTOCLen)
d1 := r.decbufAt(len(r.b) - indexTOCLen)
d2 := d1.decbuf(indexTOCLen - 4)
crc := d2.crc32()
r.toc.symbols = d.be64()
r.toc.series = d.be64()
r.toc.labelIndices = d.be64()
r.toc.labelIndicesTable = d.be64()
r.toc.postings = d.be64()
r.toc.postingsTable = d.be64()
r.toc.symbols = d2.be64()
r.toc.series = d2.be64()
r.toc.labelIndices = d2.be64()
r.toc.labelIndicesTable = d2.be64()
r.toc.postings = d2.be64()
r.toc.postingsTable = d2.be64()
if valid, err := r.checkCRC(d.be32(), len(r.b)-indexTOCLen, indexTOCLen-4); !valid {
return errors.Wrap(err, "TOC checksum")
if d2.err() != nil {
return d2.err()
}
return d.err()
if read := d1.be32(); crc != read {
return errors.Wrap(errInvalidChecksum, "read TOC")
}
return d1.err()
}
func (r *indexReader) decbufAt(off int) decbuf {
@ -648,20 +652,6 @@ func (r *indexReader) decbufAt(off int) decbuf {
return decbuf{b: r.b[off:]}
}
func (r *indexReader) checkCRC(crc uint32, off, cnt int) (bool, error) {
r.crc32.Reset()
if len(r.b) < off+cnt {
return false, errInvalidSize
}
if _, err := r.crc32.Write(r.b[off : off+cnt]); err != nil {
return false, errors.Wrap(err, "write to hash")
}
if r.crc32.Sum32() != crc {
return false, errInvalidChecksum
}
return true, nil
}
// readSymbols reads the symbol table fully into memory and allocates proper strings for them.
// Strings backed by the mmap'd memory would cause memory faults if applications keep using them
// after the reader is closed.
@ -671,8 +661,8 @@ func (r *indexReader) readSymbols(off int) error {
}
var (
d1 = r.decbufAt(int(off))
l = d1.be32int()
d2 = d1.decbuf(l)
d2 = d1.decbuf(d1.be32int())
crc = d2.crc32()
origLen = d2.len()
cnt = d2.be32int()
basePos = uint32(off) + 4
@ -685,8 +675,8 @@ func (r *indexReader) readSymbols(off int) error {
nextPos = basePos + uint32(origLen-d2.len())
cnt--
}
if valid, err := r.checkCRC(d1.be32(), int(off)+4, l); !valid {
return errors.Wrap(err, "symbol table checksum")
if read := d1.be32(); crc != read {
return errors.Wrap(errInvalidChecksum, "read symbols")
}
return d2.err()
}
@ -698,8 +688,8 @@ func (r *indexReader) readOffsetTable(off uint64) (map[string]uint32, error) {
var (
d1 = r.decbufAt(int(off))
l = d1.be32int()
d2 = d1.decbuf(l)
d2 = d1.decbuf(d1.be32int())
crc = d2.crc32()
cnt = d2.be32()
)
@ -716,11 +706,9 @@ func (r *indexReader) readOffsetTable(off uint64) (map[string]uint32, error) {
cnt--
}
if valid, err := r.checkCRC(d1.be32(), int(off)+4, l); !valid {
return res, errors.Wrap(err, "offset table checksum")
if read := d1.be32(); crc != read {
return nil, errors.Wrap(errInvalidChecksum, "read offset table")
}
return res, d2.err()
}
@ -777,8 +765,8 @@ func (r *indexReader) LabelValues(names ...string) (StringTuples, error) {
}
d1 := r.decbufAt(int(off))
l := d1.be32int()
d2 := d1.decbuf(l)
d2 := d1.decbuf(d1.be32int())
crc := d2.crc32()
nc := d2.be32int()
d2.be32() // consume unused value entry count.
@ -787,10 +775,9 @@ func (r *indexReader) LabelValues(names ...string) (StringTuples, error) {
return nil, errors.Wrap(d2.err(), "read label value index")
}
if valid, err := r.checkCRC(d1.be32(), int(off)+4, l); !valid {
return nil, errors.Wrap(err, "read label values checksum")
if read := d1.be32(); crc != read {
return nil, errors.Wrap(errInvalidChecksum, "read label values")
}
st := &serializedStringTuples{
l: nc,
b: d2.get(),
@ -817,9 +804,8 @@ func (r *indexReader) LabelIndices() ([][]string, error) {
func (r *indexReader) Series(ref uint64, lbls *labels.Labels, chks *[]ChunkMeta) error {
d1 := r.decbufAt(int(ref))
l := d1.uvarint()
sl := len(r.b[ref:]) - d1.len() // # bytes in l
d2 := d1.decbuf(l)
d2 := d1.decbuf(d1.uvarint())
crc := d2.crc32()
*lbls = (*lbls)[:0]
*chks = (*chks)[:0]
@ -881,11 +867,9 @@ func (r *indexReader) Series(ref uint64, lbls *labels.Labels, chks *[]ChunkMeta)
MaxTime: maxt,
})
}
if valid, err := r.checkCRC(d1.be32(), int(ref)+sl, l); !valid {
return errors.Wrap(err, "series checksum")
if read := d1.be32(); crc != read {
return errors.Wrap(errInvalidChecksum, "read series")
}
return nil
}
@ -899,19 +883,18 @@ func (r *indexReader) Postings(name, value string) (Postings, error) {
}
d1 := r.decbufAt(int(off))
l := d1.be32int()
d2 := d1.decbuf(l)
d2 := d1.decbuf(d1.be32int())
crc := d2.crc32()
d2.be32() // consume unused postings list length.
if d2.err() != nil {
return nil, errors.Wrap(d2.err(), "get postings bytes")
}
if valid, err := r.checkCRC(d1.be32(), int(off)+4, l); !valid {
return nil, errors.Wrap(err, "postings checksum")
if read := d1.be32(); crc != read {
return nil, errors.Wrap(errInvalidChecksum, "read postings")
}
return newBigEndianPostings(d2.get()), nil
}

View file

@ -14,6 +14,9 @@
package test
import (
"crypto/rand"
"fmt"
"hash/crc32"
"testing"
"github.com/cespare/xxhash"
@ -76,3 +79,46 @@ func fnv64a(b []byte) uint64 {
}
return h
}
func BenchmarkCRC32_diff(b *testing.B) {
data := [][]byte{}
for i := 0; i < 1000; i++ {
b := make([]byte, 512)
rand.Read(b)
data = append(data, b)
}
ctab := crc32.MakeTable(crc32.Castagnoli)
total := uint32(0)
b.Run("direct", func(b *testing.B) {
b.ReportAllocs()
for i := 0; i < b.N; i++ {
total += crc32.Checksum(data[i%1000], ctab)
}
})
b.Run("hash-reuse", func(b *testing.B) {
b.ReportAllocs()
h := crc32.New(ctab)
for i := 0; i < b.N; i++ {
h.Reset()
h.Write(data[i%1000])
total += h.Sum32()
}
})
b.Run("hash-new", func(b *testing.B) {
b.ReportAllocs()
for i := 0; i < b.N; i++ {
h := crc32.New(ctab)
h.Write(data[i%1000])
total += h.Sum32()
}
})
fmt.Println(total)
}