From 94f3fd9812e71d5ff7882673ae668a858747fa05 Mon Sep 17 00:00:00 2001 From: Fabian Reinartz Date: Wed, 26 Apr 2017 18:03:44 +0200 Subject: [PATCH] Move encoding helpers into separate file --- encoding_helpers.go | 157 +++++++++++++++++++++++++++ index.go | 259 +++++++++----------------------------------- 2 files changed, 209 insertions(+), 207 deletions(-) create mode 100644 encoding_helpers.go diff --git a/encoding_helpers.go b/encoding_helpers.go new file mode 100644 index 000000000..c122c58a8 --- /dev/null +++ b/encoding_helpers.go @@ -0,0 +1,157 @@ +package tsdb + +import ( + "encoding/binary" + "hash" + "unsafe" +) + +// enbuf is a helper type to populate a byte slice with various types. +type encbuf struct { + b []byte + c [binary.MaxVarintLen64]byte +} + +func (e *encbuf) reset() { e.b = e.b[:0] } +func (e *encbuf) get() []byte { return e.b } +func (e *encbuf) len() int { return len(e.b) } + +func (e *encbuf) putString(s string) { e.b = append(e.b, s...) } +func (e *encbuf) putBytes(b []byte) { e.b = append(e.b, b...) } +func (e *encbuf) putByte(c byte) { e.b = append(e.b, c) } + +func (e *encbuf) putBE32int(x int) { e.putBE32(uint32(x)) } +func (e *encbuf) putBE64int(x int) { e.putBE64(uint64(x)) } +func (e *encbuf) putUvarint32(x uint32) { e.putUvarint64(uint64(x)) } +func (e *encbuf) putUvarint(x int) { e.putUvarint64(uint64(x)) } + +func (e *encbuf) putBE32(x uint32) { + binary.BigEndian.PutUint32(e.c[:], x) + e.b = append(e.b, e.c[:4]...) +} + +func (e *encbuf) putBE64(x uint64) { + binary.BigEndian.PutUint64(e.c[:], x) + e.b = append(e.b, e.c[:8]...) +} + +func (e *encbuf) putUvarint64(x uint64) { + n := binary.PutUvarint(e.c[:], x) + e.b = append(e.b, e.c[:n]...) +} + +func (e *encbuf) putVarint64(x int64) { + n := binary.PutVarint(e.c[:], x) + e.b = append(e.b, e.c[:n]...) +} + +// putVarintStr writes a string to the buffer prefixed by its varint length (in bytes!). +func (e *encbuf) putUvarintStr(s string) { + b := *(*[]byte)(unsafe.Pointer(&s)) + e.putUvarint(len(b)) + e.putString(s) +} + +// putHash appends a hash over the buffers current contents to the buffer. +func (e *encbuf) putHash(h hash.Hash) { + h.Reset() + _, err := h.Write(e.b) + if err != nil { + panic(err) // The CRC32 implementation does not error + } + e.b = h.Sum(e.b) +} + +// decbuf provides safe methods to extract data from a byte slice. It does all +// necessary bounds checking and advancing of the byte slice. +// Several datums can be extracted without checking for errors. However, before using +// any datum, the err() method must be checked. +type decbuf struct { + b []byte + e error +} + +func (d *decbuf) uvarint() int { return int(d.uvarint64()) } +func (d *decbuf) be32int() int { return int(d.be32()) } + +func (d *decbuf) uvarintStr() string { + l := d.uvarint64() + if d.e != nil { + return "" + } + if len(d.b) < int(l) { + d.e = errInvalidSize + return "" + } + s := string(d.b[:l]) + d.b = d.b[l:] + return s +} + +func (d *decbuf) varint64() int64 { + if d.e != nil { + return 0 + } + x, n := binary.Varint(d.b) + if n < 1 { + d.e = errInvalidSize + return 0 + } + d.b = d.b[n:] + return x +} + +func (d *decbuf) uvarint64() uint64 { + if d.e != nil { + return 0 + } + x, n := binary.Uvarint(d.b) + if n < 1 { + d.e = errInvalidSize + return 0 + } + d.b = d.b[n:] + return x +} + +func (d *decbuf) be64() uint64 { + if d.e != nil { + return 0 + } + if len(d.b) < 4 { + d.e = errInvalidSize + return 0 + } + x := binary.BigEndian.Uint64(d.b) + d.b = d.b[8:] + return x +} + +func (d *decbuf) be32() uint32 { + if d.e != nil { + return 0 + } + if len(d.b) < 4 { + d.e = errInvalidSize + return 0 + } + x := binary.BigEndian.Uint32(d.b) + d.b = d.b[4:] + return x +} + +func (d *decbuf) decbuf(l int) decbuf { + if d.e != nil { + return decbuf{e: d.e} + } + if l > len(d.b) { + return decbuf{e: errInvalidSize} + } + r := decbuf{b: d.b[:l]} + d.b = d.b[l:] + return r +} + +func (d *decbuf) err() error { return d.e } +func (d *decbuf) len() int { return len(d.b) } +func (d *decbuf) get() []byte { return d.b } diff --git a/index.go b/index.go index 8a2dbdb85..bf8b05ea8 100644 --- a/index.go +++ b/index.go @@ -24,7 +24,6 @@ import ( "path/filepath" "sort" "strings" - "unsafe" "math" @@ -557,17 +556,14 @@ func newIndexReader(dir string) (*indexReader, error) { } func (r *indexReader) readTOC() error { - if len(r.b) < indexTOCLen { - return errInvalidSize - } - b := r.b[len(r.b)-indexTOCLen:] + d := r.decbufAt(len(r.b) - indexTOCLen) - r.toc.symbols = binary.BigEndian.Uint64(b[0:8]) - r.toc.series = binary.BigEndian.Uint64(b[8:16]) - r.toc.labelIndices = binary.BigEndian.Uint64(b[16:24]) - r.toc.labelIndicesTable = binary.BigEndian.Uint64(b[24:32]) - r.toc.postings = binary.BigEndian.Uint64(b[32:40]) - r.toc.postingsTable = binary.BigEndian.Uint64(b[40:48]) + r.toc.symbols = d.be64() + r.toc.series = d.be64() + r.toc.labelIndices = d.be64() + r.toc.labelIndicesTable = d.be64() + r.toc.postings = d.be64() + r.toc.postingsTable = d.be64() // TODO(fabxc): validate checksum. @@ -594,21 +590,20 @@ func (r *indexReader) readOffsetTable(off uint64) (map[string]uint32, error) { var ( d1 = r.decbufAt(int(off)) - cnt = d1.readBE32() - el = d1.readBE32() - d2 = d1.get(int(el)) + cnt = d1.be32() + d2 = d1.decbuf(d1.be32int()) ) res := make(map[string]uint32, 512) for d2.err() == nil && d2.len() > 0 && cnt > 0 { - keyCount := int(d2.readUvarint()) + keyCount := int(d2.uvarint()) keys := make([]string, 0, keyCount) for i := 0; i < keyCount; i++ { - keys = append(keys, d2.readUvarintStr()) + keys = append(keys, d2.uvarintStr()) } - res[strings.Join(keys, sep)] = uint32(d2.readUvarint()) + res[strings.Join(keys, sep)] = uint32(d2.uvarint()) cnt-- } @@ -685,19 +680,20 @@ func (r *indexReader) LabelValues(names ...string) (StringTuples, error) { //return nil, fmt.Errorf("label index doesn't exist") } - b, err := r.getSized(off) - if err != nil { - return nil, errors.Wrapf(err, "get sized region at %d", off) + d1 := r.decbufAt(int(off)) + d2 := d1.decbuf(int(d1.uvarint())) + + c := d2.uvarint() + + if d2.err() != nil { + return nil, errors.Wrap(d2.err(), "read label value index") } - c, n := binary.Uvarint(b) - if n < 1 { - return nil, errors.Wrap(errInvalidSize, "read label index size") - } + // TODO(fabxc): verify checksum in 4 remaining bytes of d1. st := &serializedStringTuples{ l: int(c), - b: b[n:], + b: d2.get(), lookup: r.lookupSymbol, } return st, nil @@ -714,91 +710,61 @@ func (r *indexReader) LabelIndices() ([][]string, error) { res := [][]string{} for s := range r.labels { - res = append(res, strings.Split(s, string(sep))) + res = append(res, strings.Split(s, sep)) } return res, nil } func (r *indexReader) Series(ref uint32) (labels.Labels, []*ChunkMeta, error) { - // Read away length of series data. - _, n := binary.Uvarint(r.b[ref:]) - b := r.b[int(ref)+n:] + d1 := r.decbufAt(int(ref)) + d2 := d1.decbuf(int(d1.uvarint())) - k, n := binary.Uvarint(b) - if n < 1 { - return nil, nil, errors.Wrap(errInvalidSize, "number of labels") - } - - b = b[n:] + k := int(d2.uvarint()) lbls := make(labels.Labels, 0, k) - for i := 0; i < 2*int(k); i += 2 { - o, m := binary.Uvarint(b) - if m < 1 { - return nil, nil, errors.Wrap(errInvalidSize, "symbol offset") - } - n, err := r.lookupSymbol(uint32(o)) - if err != nil { - return nil, nil, errors.Wrap(err, "symbol lookup") - } - b = b[m:] + for i := 0; i < k; i++ { + lno := uint32(d2.uvarint()) + lvo := uint32(d2.uvarint()) - o, m = binary.Uvarint(b) - if m < 1 { - return nil, nil, errors.Wrap(errInvalidSize, "symbol offset") + if d2.err() != nil { + return nil, nil, errors.Wrap(d2.err(), "read series label offsets") } - v, err := r.lookupSymbol(uint32(o)) - if err != nil { - return nil, nil, errors.Wrap(err, "symbol lookup") - } - b = b[m:] - lbls = append(lbls, labels.Label{ - Name: n, - Value: v, - }) + ln, err := r.lookupSymbol(lno) + if err != nil { + return nil, nil, errors.Wrap(err, "lookup label name") + } + lv, err := r.lookupSymbol(lvo) + if err != nil { + return nil, nil, errors.Wrap(err, "lookup label value") + } + + lbls = append(lbls, labels.Label{Name: ln, Value: lv}) } // Read the chunks meta data. - k, n = binary.Uvarint(b) - if n < 1 { - return nil, nil, errors.Wrap(errInvalidSize, "number of chunks") - } - - b = b[n:] + k = int(d2.uvarint()) chunks := make([]*ChunkMeta, 0, k) - for i := 0; i < int(k); i++ { - firstTime, n := binary.Varint(b) - if n < 1 { - return nil, nil, errors.Wrap(errInvalidSize, "first time") - } - b = b[n:] + for i := 0; i < k; i++ { + mint := d2.varint64() + maxt := d2.varint64() + off := d2.uvarint64() + _ = d2.be32() - lastTime, n := binary.Varint(b) - if n < 1 { - return nil, nil, errors.Wrap(errInvalidSize, "last time") - } - b = b[n:] + // TODO(fabxc): verify CRC32 - o, n := binary.Uvarint(b) - if n < 1 { - return nil, nil, errors.Wrap(errInvalidSize, "chunk offset") + if d2.err() != nil { + return nil, nil, errors.Wrapf(d2.err(), "read meta for chunk %d", i) } - b = b[n:] - - // TODO(fabxc): read and potentially verify checksum. - b = b[4:] chunks = append(chunks, &ChunkMeta{ - Ref: o, - MinTime: firstTime, - MaxTime: lastTime, + Ref: off, + MinTime: mint, + MaxTime: maxt, }) } - // TODO(fabxc): read and potentially verify checksum. - return lbls, chunks, nil } @@ -892,124 +858,3 @@ func (t *serializedStringTuples) At(i int) ([]string, error) { return res, nil } - -// enbuf is a helper type to populate a byte slice with various types. -type encbuf struct { - b []byte - c [binary.MaxVarintLen64]byte -} - -func (e *encbuf) reset() { e.b = e.b[:0] } -func (e *encbuf) get() []byte { return e.b } -func (e *encbuf) len() int { return len(e.b) } - -func (e *encbuf) putString(s string) { e.b = append(e.b, s...) } -func (e *encbuf) putBytes(b []byte) { e.b = append(e.b, b...) } -func (e *encbuf) putByte(c byte) { e.b = append(e.b, c) } - -func (e *encbuf) putBE32int(x int) { e.putBE32(uint32(x)) } -func (e *encbuf) putBE64int(x int) { e.putBE64(uint64(x)) } -func (e *encbuf) putUvarint32(x uint32) { e.putUvarint64(uint64(x)) } -func (e *encbuf) putUvarint(x int) { e.putUvarint64(uint64(x)) } - -func (e *encbuf) putBE32(x uint32) { - binary.BigEndian.PutUint32(e.c[:], x) - e.b = append(e.b, e.c[:4]...) -} - -func (e *encbuf) putBE64(x uint64) { - binary.BigEndian.PutUint64(e.c[:], x) - e.b = append(e.b, e.c[:8]...) -} - -func (e *encbuf) putUvarint64(x uint64) { - n := binary.PutUvarint(e.c[:], x) - e.b = append(e.b, e.c[:n]...) -} - -func (e *encbuf) putVarint64(x int64) { - n := binary.PutVarint(e.c[:], x) - e.b = append(e.b, e.c[:n]...) -} - -// putVarintStr writes a string to the buffer prefixed by its varint length (in bytes!). -func (e *encbuf) putUvarintStr(s string) { - b := *(*[]byte)(unsafe.Pointer(&s)) - e.putUvarint(len(b)) - e.putString(s) -} - -// putHash appends a hash over the buffers current contents to the buffer. -func (e *encbuf) putHash(h hash.Hash) { - h.Reset() - _, err := h.Write(e.b) - if err != nil { - panic(err) // The CRC32 implementation does not error - } - e.b = h.Sum(e.b) -} - -type decbuf struct { - b []byte - e error -} - -func (d *decbuf) readUvarintStr() string { - l := d.readUvarint() - if d.e != nil { - return "" - } - if len(d.b) < int(l) { - d.e = errInvalidSize - return "" - } - s := string(d.b[:l]) - d.b = d.b[l:] - return s -} - -func (d *decbuf) readUvarint() uint64 { - if d.e != nil { - return 0 - } - x, n := binary.Uvarint(d.b) - if n < 1 { - d.e = errInvalidSize - return 0 - } - d.b = d.b[n:] - return x -} - -func (d *decbuf) readBE32() uint32 { - if d.e != nil { - return 0 - } - if len(d.b) < 4 { - d.e = errInvalidSize - return 0 - } - x := binary.BigEndian.Uint32(d.b) - d.b = d.b[4:] - return x -} - -func (d *decbuf) get(l int) decbuf { - if d.e != nil { - return decbuf{e: d.e} - } - if l > len(d.b) { - return decbuf{e: errInvalidSize} - } - r := decbuf{b: d.b[:l]} - d.b = d.b[l:] - return r -} - -func (d *decbuf) err() error { - return d.e -} - -func (d *decbuf) len() int { - return len(d.b) -}