Use Castagnoli polynomial for CRC32s; cache them

This commit is contained in:
Fabian Reinartz 2017-02-15 15:24:53 -08:00
parent 9c7a88223e
commit afa084920c
2 changed files with 35 additions and 25 deletions

33
wal.go
View file

@ -3,6 +3,7 @@ package tsdb
import ( import (
"bufio" "bufio"
"encoding/binary" "encoding/binary"
"hash"
"hash/crc32" "hash/crc32"
"io" "io"
"math" "math"
@ -47,8 +48,9 @@ type WAL struct {
flushInterval time.Duration flushInterval time.Duration
segmentSize int64 segmentSize int64
cur *bufio.Writer crc32 hash.Hash32
curN int64 cur *bufio.Writer
curN int64
stopc chan struct{} stopc chan struct{}
donec chan struct{} donec chan struct{}
@ -79,6 +81,7 @@ func OpenWAL(dir string, l log.Logger, flushInterval time.Duration) (*WAL, error
donec: make(chan struct{}), donec: make(chan struct{}),
stopc: make(chan struct{}), stopc: make(chan struct{}),
segmentSize: walSegmentSizeBytes, segmentSize: walSegmentSizeBytes,
crc32: crc32.New(crc32.MakeTable(crc32.Castagnoli)),
} }
if err := w.initSegments(); err != nil { if err := w.initSegments(); err != nil {
return nil, err return nil, err
@ -96,7 +99,7 @@ func (w *WAL) Reader() *WALReader {
for _, f := range w.files { for _, f := range w.files {
rs = append(rs, f) rs = append(rs, f)
} }
return &WALReader{rs: rs} return NewWALReader(rs...)
} }
// Log writes a batch of new series labels and samples to the log. // Log writes a batch of new series labels and samples to the log.
@ -301,8 +304,8 @@ func (w *WAL) entry(et WALEntryType, flag byte, buf []byte) error {
} }
} }
h := crc32.NewIEEE() w.crc32.Reset()
wr := io.MultiWriter(h, w.cur) wr := io.MultiWriter(w.crc32, w.cur)
b := make([]byte, 6) b := make([]byte, 6)
b[0] = byte(et) b[0] = byte(et)
@ -316,7 +319,7 @@ func (w *WAL) entry(et WALEntryType, flag byte, buf []byte) error {
if _, err := wr.Write(buf); err != nil { if _, err := wr.Write(buf); err != nil {
return err return err
} }
if _, err := w.cur.Write(h.Sum(nil)); err != nil { if _, err := w.cur.Write(w.crc32.Sum(nil)); err != nil {
return err return err
} }
@ -407,9 +410,10 @@ func (w *WAL) encodeSamples(samples []refdSample) error {
// WALReader decodes and emits write ahead log entries. // WALReader decodes and emits write ahead log entries.
type WALReader struct { type WALReader struct {
rs []io.ReadCloser rs []io.ReadCloser
cur int cur int
buf []byte buf []byte
crc32 hash.Hash32
err error err error
labels []labels.Labels labels []labels.Labels
@ -419,8 +423,9 @@ type WALReader struct {
// NewWALReader returns a new WALReader over the sequence of the given ReadClosers. // NewWALReader returns a new WALReader over the sequence of the given ReadClosers.
func NewWALReader(rs ...io.ReadCloser) *WALReader { func NewWALReader(rs ...io.ReadCloser) *WALReader {
return &WALReader{ return &WALReader{
rs: rs, rs: rs,
buf: make([]byte, 0, 1024*1024), buf: make([]byte, 0, 128*4096),
crc32: crc32.New(crc32.MakeTable(crc32.Castagnoli)),
} }
} }
@ -483,8 +488,8 @@ func (r *WALReader) Next() bool {
} }
func (r *WALReader) entry(cr io.Reader) (WALEntryType, byte, []byte, error) { func (r *WALReader) entry(cr io.Reader) (WALEntryType, byte, []byte, error) {
cw := crc32.NewIEEE() r.crc32.Reset()
tr := io.TeeReader(cr, cw) tr := io.TeeReader(cr, r.crc32)
b := make([]byte, 6) b := make([]byte, 6)
if _, err := tr.Read(b); err != nil { if _, err := tr.Read(b); err != nil {
@ -513,7 +518,7 @@ func (r *WALReader) entry(cr io.Reader) (WALEntryType, byte, []byte, error) {
if err != nil { if err != nil {
return 0, 0, nil, err return 0, 0, nil, err
} }
if exp, has := binary.BigEndian.Uint32(b[:4]), cw.Sum32(); has != exp { if exp, has := binary.BigEndian.Uint32(b[:4]), r.crc32.Sum32(); has != exp {
return 0, 0, nil, errors.Errorf("unexpected CRC32 checksum %x, want %x", has, exp) return 0, 0, nil, errors.Errorf("unexpected CRC32 checksum %x, want %x", has, exp)
} }

View file

@ -3,6 +3,7 @@ package tsdb
import ( import (
"bufio" "bufio"
"encoding/binary" "encoding/binary"
"hash"
"hash/crc32" "hash/crc32"
"io" "io"
"sort" "sort"
@ -42,10 +43,11 @@ type SeriesWriter interface {
// seriesWriter implements the SeriesWriter interface for the standard // seriesWriter implements the SeriesWriter interface for the standard
// serialization format. // serialization format.
type seriesWriter struct { type seriesWriter struct {
ow io.Writer ow io.Writer
w *bufio.Writer w *bufio.Writer
n int64 n int64
c int c int
crc32 hash.Hash
index IndexWriter index IndexWriter
} }
@ -55,6 +57,7 @@ func newSeriesWriter(w io.Writer, index IndexWriter) *seriesWriter {
ow: w, ow: w,
w: bufio.NewWriterSize(w, 1*1024*1024), w: bufio.NewWriterSize(w, 1*1024*1024),
n: 0, n: 0,
crc32: crc32.New(crc32.MakeTable(crc32.Castagnoli)),
index: index, index: index,
} }
} }
@ -82,9 +85,8 @@ func (w *seriesWriter) WriteSeries(ref uint32, lset labels.Labels, chks []ChunkM
} }
} }
// TODO(fabxc): is crc32 enough for chunks of one series? w.crc32.Reset()
h := crc32.NewIEEE() wr := io.MultiWriter(w.crc32, w.w)
wr := io.MultiWriter(h, w.w)
// For normal reads we don't need the number of the chunk section but // For normal reads we don't need the number of the chunk section but
// it allows us to verify checksums without reading the index file. // it allows us to verify checksums without reading the index file.
@ -117,7 +119,7 @@ func (w *seriesWriter) WriteSeries(ref uint32, lset labels.Labels, chks []ChunkM
chk.Chunk = nil chk.Chunk = nil
} }
if err := w.write(w.w, h.Sum(nil)); err != nil { if err := w.write(w.w, w.crc32.Sum(nil)); err != nil {
return err return err
} }
@ -195,6 +197,8 @@ type indexWriter struct {
symbols map[string]uint32 // symbol offsets symbols map[string]uint32 // symbol offsets
labelIndexes []hashEntry // label index offsets labelIndexes []hashEntry // label index offsets
postings []hashEntry // postings lists offsets postings []hashEntry // postings lists offsets
crc32 hash.Hash
} }
func newIndexWriter(w io.Writer) *indexWriter { func newIndexWriter(w io.Writer) *indexWriter {
@ -204,6 +208,7 @@ func newIndexWriter(w io.Writer) *indexWriter {
n: 0, n: 0,
symbols: make(map[string]uint32, 4096), symbols: make(map[string]uint32, 4096),
series: make(map[uint32]*indexWriterSeries, 4096), series: make(map[uint32]*indexWriterSeries, 4096),
crc32: crc32.New(crc32.MakeTable(crc32.Castagnoli)),
} }
} }
@ -215,8 +220,8 @@ func (w *indexWriter) write(wr io.Writer, b []byte) error {
// section writes a CRC32 checksummed section of length l and guarded by flag. // section writes a CRC32 checksummed section of length l and guarded by flag.
func (w *indexWriter) section(l uint32, flag byte, f func(w io.Writer) error) error { func (w *indexWriter) section(l uint32, flag byte, f func(w io.Writer) error) error {
h := crc32.NewIEEE() w.crc32.Reset()
wr := io.MultiWriter(h, w.w) wr := io.MultiWriter(w.crc32, w.w)
b := [5]byte{flag, 0, 0, 0, 0} b := [5]byte{flag, 0, 0, 0, 0}
binary.BigEndian.PutUint32(b[1:], l) binary.BigEndian.PutUint32(b[1:], l)
@ -228,7 +233,7 @@ func (w *indexWriter) section(l uint32, flag byte, f func(w io.Writer) error) er
if err := f(wr); err != nil { if err := f(wr); err != nil {
return errors.Wrap(err, "contents write func") return errors.Wrap(err, "contents write func")
} }
if err := w.write(w.w, h.Sum(nil)); err != nil { if err := w.write(w.w, w.crc32.Sum(nil)); err != nil {
return errors.Wrap(err, "writing checksum") return errors.Wrap(err, "writing checksum")
} }
return nil return nil