Move encoding helpers into separate file

This commit is contained in:
Fabian Reinartz 2017-04-26 18:03:44 +02:00
parent 35b62f001e
commit 94f3fd9812
2 changed files with 209 additions and 207 deletions

157
encoding_helpers.go Normal file
View file

@ -0,0 +1,157 @@
package tsdb
import (
"encoding/binary"
"hash"
"unsafe"
)
// enbuf is a helper type to populate a byte slice with various types.
type encbuf struct {
b []byte
c [binary.MaxVarintLen64]byte
}
func (e *encbuf) reset() { e.b = e.b[:0] }
func (e *encbuf) get() []byte { return e.b }
func (e *encbuf) len() int { return len(e.b) }
func (e *encbuf) putString(s string) { e.b = append(e.b, s...) }
func (e *encbuf) putBytes(b []byte) { e.b = append(e.b, b...) }
func (e *encbuf) putByte(c byte) { e.b = append(e.b, c) }
func (e *encbuf) putBE32int(x int) { e.putBE32(uint32(x)) }
func (e *encbuf) putBE64int(x int) { e.putBE64(uint64(x)) }
func (e *encbuf) putUvarint32(x uint32) { e.putUvarint64(uint64(x)) }
func (e *encbuf) putUvarint(x int) { e.putUvarint64(uint64(x)) }
func (e *encbuf) putBE32(x uint32) {
binary.BigEndian.PutUint32(e.c[:], x)
e.b = append(e.b, e.c[:4]...)
}
func (e *encbuf) putBE64(x uint64) {
binary.BigEndian.PutUint64(e.c[:], x)
e.b = append(e.b, e.c[:8]...)
}
func (e *encbuf) putUvarint64(x uint64) {
n := binary.PutUvarint(e.c[:], x)
e.b = append(e.b, e.c[:n]...)
}
func (e *encbuf) putVarint64(x int64) {
n := binary.PutVarint(e.c[:], x)
e.b = append(e.b, e.c[:n]...)
}
// putVarintStr writes a string to the buffer prefixed by its varint length (in bytes!).
func (e *encbuf) putUvarintStr(s string) {
b := *(*[]byte)(unsafe.Pointer(&s))
e.putUvarint(len(b))
e.putString(s)
}
// putHash appends a hash over the buffers current contents to the buffer.
func (e *encbuf) putHash(h hash.Hash) {
h.Reset()
_, err := h.Write(e.b)
if err != nil {
panic(err) // The CRC32 implementation does not error
}
e.b = h.Sum(e.b)
}
// decbuf provides safe methods to extract data from a byte slice. It does all
// necessary bounds checking and advancing of the byte slice.
// Several datums can be extracted without checking for errors. However, before using
// any datum, the err() method must be checked.
type decbuf struct {
b []byte
e error
}
func (d *decbuf) uvarint() int { return int(d.uvarint64()) }
func (d *decbuf) be32int() int { return int(d.be32()) }
func (d *decbuf) uvarintStr() string {
l := d.uvarint64()
if d.e != nil {
return ""
}
if len(d.b) < int(l) {
d.e = errInvalidSize
return ""
}
s := string(d.b[:l])
d.b = d.b[l:]
return s
}
func (d *decbuf) varint64() int64 {
if d.e != nil {
return 0
}
x, n := binary.Varint(d.b)
if n < 1 {
d.e = errInvalidSize
return 0
}
d.b = d.b[n:]
return x
}
func (d *decbuf) uvarint64() uint64 {
if d.e != nil {
return 0
}
x, n := binary.Uvarint(d.b)
if n < 1 {
d.e = errInvalidSize
return 0
}
d.b = d.b[n:]
return x
}
func (d *decbuf) be64() uint64 {
if d.e != nil {
return 0
}
if len(d.b) < 4 {
d.e = errInvalidSize
return 0
}
x := binary.BigEndian.Uint64(d.b)
d.b = d.b[8:]
return x
}
func (d *decbuf) be32() uint32 {
if d.e != nil {
return 0
}
if len(d.b) < 4 {
d.e = errInvalidSize
return 0
}
x := binary.BigEndian.Uint32(d.b)
d.b = d.b[4:]
return x
}
func (d *decbuf) decbuf(l int) decbuf {
if d.e != nil {
return decbuf{e: d.e}
}
if l > len(d.b) {
return decbuf{e: errInvalidSize}
}
r := decbuf{b: d.b[:l]}
d.b = d.b[l:]
return r
}
func (d *decbuf) err() error { return d.e }
func (d *decbuf) len() int { return len(d.b) }
func (d *decbuf) get() []byte { return d.b }

259
index.go
View file

@ -24,7 +24,6 @@ import (
"path/filepath" "path/filepath"
"sort" "sort"
"strings" "strings"
"unsafe"
"math" "math"
@ -557,17 +556,14 @@ func newIndexReader(dir string) (*indexReader, error) {
} }
func (r *indexReader) readTOC() error { func (r *indexReader) readTOC() error {
if len(r.b) < indexTOCLen { d := r.decbufAt(len(r.b) - indexTOCLen)
return errInvalidSize
}
b := r.b[len(r.b)-indexTOCLen:]
r.toc.symbols = binary.BigEndian.Uint64(b[0:8]) r.toc.symbols = d.be64()
r.toc.series = binary.BigEndian.Uint64(b[8:16]) r.toc.series = d.be64()
r.toc.labelIndices = binary.BigEndian.Uint64(b[16:24]) r.toc.labelIndices = d.be64()
r.toc.labelIndicesTable = binary.BigEndian.Uint64(b[24:32]) r.toc.labelIndicesTable = d.be64()
r.toc.postings = binary.BigEndian.Uint64(b[32:40]) r.toc.postings = d.be64()
r.toc.postingsTable = binary.BigEndian.Uint64(b[40:48]) r.toc.postingsTable = d.be64()
// TODO(fabxc): validate checksum. // TODO(fabxc): validate checksum.
@ -594,21 +590,20 @@ func (r *indexReader) readOffsetTable(off uint64) (map[string]uint32, error) {
var ( var (
d1 = r.decbufAt(int(off)) d1 = r.decbufAt(int(off))
cnt = d1.readBE32() cnt = d1.be32()
el = d1.readBE32() d2 = d1.decbuf(d1.be32int())
d2 = d1.get(int(el))
) )
res := make(map[string]uint32, 512) res := make(map[string]uint32, 512)
for d2.err() == nil && d2.len() > 0 && cnt > 0 { for d2.err() == nil && d2.len() > 0 && cnt > 0 {
keyCount := int(d2.readUvarint()) keyCount := int(d2.uvarint())
keys := make([]string, 0, keyCount) keys := make([]string, 0, keyCount)
for i := 0; i < keyCount; i++ { for i := 0; i < keyCount; i++ {
keys = append(keys, d2.readUvarintStr()) keys = append(keys, d2.uvarintStr())
} }
res[strings.Join(keys, sep)] = uint32(d2.readUvarint()) res[strings.Join(keys, sep)] = uint32(d2.uvarint())
cnt-- cnt--
} }
@ -685,19 +680,20 @@ func (r *indexReader) LabelValues(names ...string) (StringTuples, error) {
//return nil, fmt.Errorf("label index doesn't exist") //return nil, fmt.Errorf("label index doesn't exist")
} }
b, err := r.getSized(off) d1 := r.decbufAt(int(off))
if err != nil { d2 := d1.decbuf(int(d1.uvarint()))
return nil, errors.Wrapf(err, "get sized region at %d", off)
c := d2.uvarint()
if d2.err() != nil {
return nil, errors.Wrap(d2.err(), "read label value index")
} }
c, n := binary.Uvarint(b) // TODO(fabxc): verify checksum in 4 remaining bytes of d1.
if n < 1 {
return nil, errors.Wrap(errInvalidSize, "read label index size")
}
st := &serializedStringTuples{ st := &serializedStringTuples{
l: int(c), l: int(c),
b: b[n:], b: d2.get(),
lookup: r.lookupSymbol, lookup: r.lookupSymbol,
} }
return st, nil return st, nil
@ -714,91 +710,61 @@ func (r *indexReader) LabelIndices() ([][]string, error) {
res := [][]string{} res := [][]string{}
for s := range r.labels { for s := range r.labels {
res = append(res, strings.Split(s, string(sep))) res = append(res, strings.Split(s, sep))
} }
return res, nil return res, nil
} }
func (r *indexReader) Series(ref uint32) (labels.Labels, []*ChunkMeta, error) { func (r *indexReader) Series(ref uint32) (labels.Labels, []*ChunkMeta, error) {
// Read away length of series data. d1 := r.decbufAt(int(ref))
_, n := binary.Uvarint(r.b[ref:]) d2 := d1.decbuf(int(d1.uvarint()))
b := r.b[int(ref)+n:]
k, n := binary.Uvarint(b) k := int(d2.uvarint())
if n < 1 {
return nil, nil, errors.Wrap(errInvalidSize, "number of labels")
}
b = b[n:]
lbls := make(labels.Labels, 0, k) lbls := make(labels.Labels, 0, k)
for i := 0; i < 2*int(k); i += 2 { for i := 0; i < k; i++ {
o, m := binary.Uvarint(b) lno := uint32(d2.uvarint())
if m < 1 { lvo := uint32(d2.uvarint())
return nil, nil, errors.Wrap(errInvalidSize, "symbol offset")
}
n, err := r.lookupSymbol(uint32(o))
if err != nil {
return nil, nil, errors.Wrap(err, "symbol lookup")
}
b = b[m:]
o, m = binary.Uvarint(b) if d2.err() != nil {
if m < 1 { return nil, nil, errors.Wrap(d2.err(), "read series label offsets")
return nil, nil, errors.Wrap(errInvalidSize, "symbol offset")
} }
v, err := r.lookupSymbol(uint32(o))
if err != nil {
return nil, nil, errors.Wrap(err, "symbol lookup")
}
b = b[m:]
lbls = append(lbls, labels.Label{ ln, err := r.lookupSymbol(lno)
Name: n, if err != nil {
Value: v, return nil, nil, errors.Wrap(err, "lookup label name")
}) }
lv, err := r.lookupSymbol(lvo)
if err != nil {
return nil, nil, errors.Wrap(err, "lookup label value")
}
lbls = append(lbls, labels.Label{Name: ln, Value: lv})
} }
// Read the chunks meta data. // Read the chunks meta data.
k, n = binary.Uvarint(b) k = int(d2.uvarint())
if n < 1 {
return nil, nil, errors.Wrap(errInvalidSize, "number of chunks")
}
b = b[n:]
chunks := make([]*ChunkMeta, 0, k) chunks := make([]*ChunkMeta, 0, k)
for i := 0; i < int(k); i++ { for i := 0; i < k; i++ {
firstTime, n := binary.Varint(b) mint := d2.varint64()
if n < 1 { maxt := d2.varint64()
return nil, nil, errors.Wrap(errInvalidSize, "first time") off := d2.uvarint64()
} _ = d2.be32()
b = b[n:]
lastTime, n := binary.Varint(b) // TODO(fabxc): verify CRC32
if n < 1 {
return nil, nil, errors.Wrap(errInvalidSize, "last time")
}
b = b[n:]
o, n := binary.Uvarint(b) if d2.err() != nil {
if n < 1 { return nil, nil, errors.Wrapf(d2.err(), "read meta for chunk %d", i)
return nil, nil, errors.Wrap(errInvalidSize, "chunk offset")
} }
b = b[n:]
// TODO(fabxc): read and potentially verify checksum.
b = b[4:]
chunks = append(chunks, &ChunkMeta{ chunks = append(chunks, &ChunkMeta{
Ref: o, Ref: off,
MinTime: firstTime, MinTime: mint,
MaxTime: lastTime, MaxTime: maxt,
}) })
} }
// TODO(fabxc): read and potentially verify checksum.
return lbls, chunks, nil return lbls, chunks, nil
} }
@ -892,124 +858,3 @@ func (t *serializedStringTuples) At(i int) ([]string, error) {
return res, nil return res, nil
} }
// enbuf is a helper type to populate a byte slice with various types.
type encbuf struct {
b []byte
c [binary.MaxVarintLen64]byte
}
func (e *encbuf) reset() { e.b = e.b[:0] }
func (e *encbuf) get() []byte { return e.b }
func (e *encbuf) len() int { return len(e.b) }
func (e *encbuf) putString(s string) { e.b = append(e.b, s...) }
func (e *encbuf) putBytes(b []byte) { e.b = append(e.b, b...) }
func (e *encbuf) putByte(c byte) { e.b = append(e.b, c) }
func (e *encbuf) putBE32int(x int) { e.putBE32(uint32(x)) }
func (e *encbuf) putBE64int(x int) { e.putBE64(uint64(x)) }
func (e *encbuf) putUvarint32(x uint32) { e.putUvarint64(uint64(x)) }
func (e *encbuf) putUvarint(x int) { e.putUvarint64(uint64(x)) }
func (e *encbuf) putBE32(x uint32) {
binary.BigEndian.PutUint32(e.c[:], x)
e.b = append(e.b, e.c[:4]...)
}
func (e *encbuf) putBE64(x uint64) {
binary.BigEndian.PutUint64(e.c[:], x)
e.b = append(e.b, e.c[:8]...)
}
func (e *encbuf) putUvarint64(x uint64) {
n := binary.PutUvarint(e.c[:], x)
e.b = append(e.b, e.c[:n]...)
}
func (e *encbuf) putVarint64(x int64) {
n := binary.PutVarint(e.c[:], x)
e.b = append(e.b, e.c[:n]...)
}
// putVarintStr writes a string to the buffer prefixed by its varint length (in bytes!).
func (e *encbuf) putUvarintStr(s string) {
b := *(*[]byte)(unsafe.Pointer(&s))
e.putUvarint(len(b))
e.putString(s)
}
// putHash appends a hash over the buffers current contents to the buffer.
func (e *encbuf) putHash(h hash.Hash) {
h.Reset()
_, err := h.Write(e.b)
if err != nil {
panic(err) // The CRC32 implementation does not error
}
e.b = h.Sum(e.b)
}
type decbuf struct {
b []byte
e error
}
func (d *decbuf) readUvarintStr() string {
l := d.readUvarint()
if d.e != nil {
return ""
}
if len(d.b) < int(l) {
d.e = errInvalidSize
return ""
}
s := string(d.b[:l])
d.b = d.b[l:]
return s
}
func (d *decbuf) readUvarint() uint64 {
if d.e != nil {
return 0
}
x, n := binary.Uvarint(d.b)
if n < 1 {
d.e = errInvalidSize
return 0
}
d.b = d.b[n:]
return x
}
func (d *decbuf) readBE32() uint32 {
if d.e != nil {
return 0
}
if len(d.b) < 4 {
d.e = errInvalidSize
return 0
}
x := binary.BigEndian.Uint32(d.b)
d.b = d.b[4:]
return x
}
func (d *decbuf) get(l int) decbuf {
if d.e != nil {
return decbuf{e: d.e}
}
if l > len(d.b) {
return decbuf{e: errInvalidSize}
}
r := decbuf{b: d.b[:l]}
d.b = d.b[l:]
return r
}
func (d *decbuf) err() error {
return d.e
}
func (d *decbuf) len() int {
return len(d.b)
}