Exposed helper methods for reading index bytes. (#492)

Changes:
* Make `NewReader` method useful. It was impossible to use it, because closer was always nil.
* ReadSymbols, TOC and ReadOffsetTable are not public functions (used by Thanos).
* decbufXXX are now functions.
* More verbose errors.
* Removed unused crc32 field.
* Some var name changes to make it more verbose:
  * symbols -> allocatedSymbols
  * symbolsSlice -> symbolsV1
  * symbols -> symbolsV2
  *
* Pre-calculate symbolsTableSize.
* Initialized symbols for Symbols() method with valid length.
* Added test for Symbol method.
* Made Decoder LookupSymbol method public. Kept Decode public as it is useful as helper from index package.

Signed-off-by: Bartek Plotka <bwplotka@gmail.com>
This commit is contained in:
Bartek Płotka 2019-01-11 17:31:26 +00:00 committed by Brian Brazil
parent 8d991bdc1e
commit c065fa6957
3 changed files with 203 additions and 183 deletions

View file

@ -18,6 +18,8 @@ import (
"hash"
"hash/crc32"
"unsafe"
"github.com/pkg/errors"
)
// enbuf is a helper type to populate a byte slice with various types.
@ -86,6 +88,60 @@ type decbuf struct {
e error
}
// newDecbufAt returns a new decoding buffer. It expects the first 4 bytes
// after offset to hold the big endian encoded content length, followed by the contents and the expected
// checksum.
func newDecbufAt(bs ByteSlice, off int) decbuf {
if bs.Len() < off+4 {
return decbuf{e: errInvalidSize}
}
b := bs.Range(off, off+4)
l := int(binary.BigEndian.Uint32(b))
if bs.Len() < off+4+l+4 {
return decbuf{e: errInvalidSize}
}
// Load bytes holding the contents plus a CRC32 checksum.
b = bs.Range(off+4, off+4+l+4)
dec := decbuf{b: b[:len(b)-4]}
if exp := binary.BigEndian.Uint32(b[len(b)-4:]); dec.crc32() != exp {
return decbuf{e: errInvalidChecksum}
}
return dec
}
// decbufUvarintAt returns a new decoding buffer. It expects the first bytes
// after offset to hold the uvarint-encoded buffers length, followed by the contents and the expected
// checksum.
func newDecbufUvarintAt(bs ByteSlice, off int) decbuf {
// We never have to access this method at the far end of the byte slice. Thus just checking
// against the MaxVarintLen32 is sufficient.
if bs.Len() < off+binary.MaxVarintLen32 {
return decbuf{e: errInvalidSize}
}
b := bs.Range(off, off+binary.MaxVarintLen32)
l, n := binary.Uvarint(b)
if n <= 0 || n > binary.MaxVarintLen32 {
return decbuf{e: errors.Errorf("invalid uvarint %d", n)}
}
if bs.Len() < off+n+int(l)+4 {
return decbuf{e: errInvalidSize}
}
// Load bytes holding the contents plus a CRC32 checksum.
b = bs.Range(off+n, off+n+int(l)+4)
dec := decbuf{b: b[:len(b)-4]}
if dec.crc32() != binary.BigEndian.Uint32(b[len(b)-4:]) {
return decbuf{e: errInvalidChecksum}
}
return dec
}
func (d *decbuf) uvarint() int { return int(d.uvarint64()) }
func (d *decbuf) uvarint32() uint32 { return uint32(d.uvarint64()) }
func (d *decbuf) be32int() int { return int(d.be32()) }

View file

@ -20,6 +20,7 @@ import (
"hash"
"hash/crc32"
"io"
"io/ioutil"
"math"
"os"
"path/filepath"
@ -35,9 +36,13 @@ import (
const (
// MagicIndex 4 bytes at the head of an index file.
MagicIndex = 0xBAAAD700
// HeaderLen represents number of bytes reserved of index for header.
HeaderLen = 5
indexFormatV1 = 1
indexFormatV2 = 2
// FormatV1 represents 1 version of index.
FormatV1 = 1
// FormatV2 represents 2 version of index.
FormatV2 = 2
labelNameSeperator = "\xff"
)
@ -108,7 +113,7 @@ type Writer struct {
fbuf *bufio.Writer
pos uint64
toc indexTOC
toc TOC
stage indexWriterStage
// Reusable memory.
@ -129,13 +134,42 @@ type Writer struct {
Version int
}
type indexTOC struct {
symbols uint64
series uint64
labelIndices uint64
labelIndicesTable uint64
postings uint64
postingsTable uint64
// TOC represents index Table Of Content that states where each section of index starts.
type TOC struct {
Symbols uint64
Series uint64
LabelIndices uint64
LabelIndicesTable uint64
Postings uint64
PostingsTable uint64
}
// NewTOCFromByteSlice return parsed TOC from given index byte slice.
func NewTOCFromByteSlice(bs ByteSlice) (*TOC, error) {
if bs.Len() < indexTOCLen {
return nil, errInvalidSize
}
b := bs.Range(bs.Len()-indexTOCLen, bs.Len())
expCRC := binary.BigEndian.Uint32(b[len(b)-4:])
d := decbuf{b: b[:len(b)-4]}
if d.crc32() != expCRC {
return nil, errors.Wrap(errInvalidChecksum, "read TOC")
}
if err := d.err(); err != nil {
return nil, err
}
return &TOC{
Symbols: d.be64(),
Series: d.be64(),
LabelIndices: d.be64(),
LabelIndicesTable: d.be64(),
Postings: d.be64(),
PostingsTable: d.be64(),
}, nil
}
// NewWriter returns a new Writer to the given filename. It serializes data in format version 2.
@ -223,22 +257,22 @@ func (w *Writer) ensureStage(s indexWriterStage) error {
// Mark start of sections in table of contents.
switch s {
case idxStageSymbols:
w.toc.symbols = w.pos
w.toc.Symbols = w.pos
case idxStageSeries:
w.toc.series = w.pos
w.toc.Series = w.pos
case idxStageLabelIndex:
w.toc.labelIndices = w.pos
w.toc.LabelIndices = w.pos
case idxStagePostings:
w.toc.postings = w.pos
w.toc.Postings = w.pos
case idxStageDone:
w.toc.labelIndicesTable = w.pos
w.toc.LabelIndicesTable = w.pos
if err := w.writeOffsetTable(w.labelIndexes); err != nil {
return err
}
w.toc.postingsTable = w.pos
w.toc.PostingsTable = w.pos
if err := w.writeOffsetTable(w.postings); err != nil {
return err
}
@ -254,7 +288,7 @@ func (w *Writer) ensureStage(s indexWriterStage) error {
func (w *Writer) writeMeta() error {
w.buf1.reset()
w.buf1.putBE32(MagicIndex)
w.buf1.putByte(indexFormatV2)
w.buf1.putByte(FormatV2)
return w.write(w.buf1.get())
}
@ -346,8 +380,6 @@ func (w *Writer) AddSymbols(sym map[string]struct{}) error {
}
sort.Strings(symbols)
const headerSize = 4
w.buf1.reset()
w.buf2.reset()
@ -438,12 +470,12 @@ const indexTOCLen = 6*8 + 4
func (w *Writer) writeTOC() error {
w.buf1.reset()
w.buf1.putBE64(w.toc.symbols)
w.buf1.putBE64(w.toc.series)
w.buf1.putBE64(w.toc.labelIndices)
w.buf1.putBE64(w.toc.labelIndicesTable)
w.buf1.putBE64(w.toc.postings)
w.buf1.putBE64(w.toc.postingsTable)
w.buf1.putBE64(w.toc.Symbols)
w.buf1.putBE64(w.toc.Series)
w.buf1.putBE64(w.toc.LabelIndices)
w.buf1.putBE64(w.toc.LabelIndicesTable)
w.buf1.putBE64(w.toc.Postings)
w.buf1.putBE64(w.toc.PostingsTable)
w.buf1.putHash(w.crc32)
@ -535,15 +567,14 @@ type StringTuples interface {
}
type Reader struct {
// The underlying byte slice holding the encoded series data.
b ByteSlice
toc indexTOC
b ByteSlice
// Close that releases the underlying resources of the byte slice.
c io.Closer
// Cached hashmaps of section offsets.
labels map[string]uint64
labels map[string]uint64
// LabelName to LabelValue to offset map.
postings map[string]map[string]uint64
// Cache of read symbols. Strings that are returned when reading from the
// block are always backed by true strings held in here rather than
@ -551,19 +582,17 @@ type Reader struct {
// prevents memory faults when applications work with read symbols after
// the block has been unmapped. The older format has sparse indexes so a map
// must be used, but the new format is not so we can use a slice.
symbols map[uint32]string
symbolSlice []string
symbolsV1 map[uint32]string
symbolsV2 []string
symbolsTableSize uint64
dec *Decoder
crc32 hash.Hash32
version int
}
var (
errInvalidSize = fmt.Errorf("invalid size")
errInvalidFlag = fmt.Errorf("invalid flag")
errInvalidChecksum = fmt.Errorf("invalid checksum")
)
@ -587,10 +616,10 @@ func (b realByteSlice) Sub(start, end int) ByteSlice {
return b[start:end]
}
// NewReader returns a new IndexReader on the given byte slice. It automatically
// NewReader returns a new index reader on the given byte slice. It automatically
// handles different format versions.
func NewReader(b ByteSlice) (*Reader, error) {
return newReader(b, nil)
return newReader(b, ioutil.NopCloser(nil))
}
// NewFileReader returns a new index reader against the given index file.
@ -606,14 +635,12 @@ func newReader(b ByteSlice, c io.Closer) (*Reader, error) {
r := &Reader{
b: b,
c: c,
symbols: map[uint32]string{},
labels: map[string]uint64{},
postings: map[string]map[string]uint64{},
crc32: newCRC32(),
}
// Verify header.
if b.Len() < 5 {
if r.b.Len() < HeaderLen {
return nil, errors.Wrap(errInvalidSize, "index header")
}
if m := binary.BigEndian.Uint32(r.b.Range(0, 4)); m != MagicIndex {
@ -621,54 +648,59 @@ func newReader(b ByteSlice, c io.Closer) (*Reader, error) {
}
r.version = int(r.b.Range(4, 5)[0])
if r.version != 1 && r.version != 2 {
if r.version != FormatV1 && r.version != FormatV2 {
return nil, errors.Errorf("unknown index file version %d", r.version)
}
if err := r.readTOC(); err != nil {
toc, err := NewTOCFromByteSlice(b)
if err != nil {
return nil, errors.Wrap(err, "read TOC")
}
if err := r.readSymbols(int(r.toc.symbols)); err != nil {
r.symbolsV2, r.symbolsV1, err = ReadSymbols(r.b, r.version, int(toc.Symbols))
if err != nil {
return nil, errors.Wrap(err, "read symbols")
}
var err error
// Use the strings already allocated by symbols, rather than
// re-allocating them again below.
symbols := make(map[string]string, len(r.symbols)+len(r.symbolSlice))
for _, s := range r.symbols {
symbols[s] = s
// Additionally, calculate symbolsTableSize.
allocatedSymbols := make(map[string]string, len(r.symbolsV1)+len(r.symbolsV2))
for _, s := range r.symbolsV1 {
r.symbolsTableSize += uint64(len(s) + 8)
allocatedSymbols[s] = s
}
for _, s := range r.symbolSlice {
symbols[s] = s
for _, s := range r.symbolsV2 {
r.symbolsTableSize += uint64(len(s) + 8)
allocatedSymbols[s] = s
}
err = r.readOffsetTable(r.toc.labelIndicesTable, func(key []string, off uint64) error {
if err := ReadOffsetTable(r.b, toc.LabelIndicesTable, func(key []string, off uint64) error {
if len(key) != 1 {
return errors.Errorf("unexpected key length %d", len(key))
return errors.Errorf("unexpected key length for label indices table %d", len(key))
}
r.labels[symbols[key[0]]] = off
r.labels[allocatedSymbols[key[0]]] = off
return nil
})
if err != nil {
}); err != nil {
return nil, errors.Wrap(err, "read label index table")
}
r.postings[""] = map[string]uint64{}
err = r.readOffsetTable(r.toc.postingsTable, func(key []string, off uint64) error {
if err := ReadOffsetTable(r.b, toc.PostingsTable, func(key []string, off uint64) error {
if len(key) != 2 {
return errors.Errorf("unexpected key length %d", len(key))
return errors.Errorf("unexpected key length for posting table %d", len(key))
}
if _, ok := r.postings[key[0]]; !ok {
r.postings[symbols[key[0]]] = map[string]uint64{}
r.postings[allocatedSymbols[key[0]]] = map[string]uint64{}
}
r.postings[key[0]][symbols[key[1]]] = off
r.postings[key[0]][allocatedSymbols[key[1]]] = off
return nil
})
if err != nil {
}); err != nil {
return nil, errors.Wrap(err, "read postings table")
}
r.dec = &Decoder{lookupSymbol: r.lookupSymbol}
r.dec = &Decoder{LookupSymbol: r.lookupSymbol}
return r, nil
}
@ -690,7 +722,7 @@ func (r *Reader) PostingsRanges() (map[labels.Label]Range, error) {
for k, e := range r.postings {
for v, start := range e {
d := r.decbufAt(int(start))
d := newDecbufAt(r.b, int(start))
if d.err() != nil {
return nil, d.err()
}
@ -703,121 +735,45 @@ func (r *Reader) PostingsRanges() (map[labels.Label]Range, error) {
return m, nil
}
func (r *Reader) readTOC() error {
if r.b.Len() < indexTOCLen {
return errInvalidSize
}
b := r.b.Range(r.b.Len()-indexTOCLen, r.b.Len())
expCRC := binary.BigEndian.Uint32(b[len(b)-4:])
d := decbuf{b: b[:len(b)-4]}
if d.crc32() != expCRC {
return errors.Wrap(errInvalidChecksum, "read TOC")
}
r.toc.symbols = d.be64()
r.toc.series = d.be64()
r.toc.labelIndices = d.be64()
r.toc.labelIndicesTable = d.be64()
r.toc.postings = d.be64()
r.toc.postingsTable = d.be64()
return d.err()
}
// decbufAt returns a new decoding buffer. It expects the first 4 bytes
// after offset to hold the big endian encoded content length, followed by the contents and the expected
// checksum.
func (r *Reader) decbufAt(off int) decbuf {
if r.b.Len() < off+4 {
return decbuf{e: errInvalidSize}
}
b := r.b.Range(off, off+4)
l := int(binary.BigEndian.Uint32(b))
if r.b.Len() < off+4+l+4 {
return decbuf{e: errInvalidSize}
}
// Load bytes holding the contents plus a CRC32 checksum.
b = r.b.Range(off+4, off+4+l+4)
dec := decbuf{b: b[:len(b)-4]}
if exp := binary.BigEndian.Uint32(b[len(b)-4:]); dec.crc32() != exp {
return decbuf{e: errInvalidChecksum}
}
return dec
}
// decbufUvarintAt returns a new decoding buffer. It expects the first bytes
// after offset to hold the uvarint-encoded buffers length, followed by the contents and the expected
// checksum.
func (r *Reader) decbufUvarintAt(off int) decbuf {
// We never have to access this method at the far end of the byte slice. Thus just checking
// against the MaxVarintLen32 is sufficient.
if r.b.Len() < off+binary.MaxVarintLen32 {
return decbuf{e: errInvalidSize}
}
b := r.b.Range(off, off+binary.MaxVarintLen32)
l, n := binary.Uvarint(b)
if n <= 0 || n > binary.MaxVarintLen32 {
return decbuf{e: errors.Errorf("invalid uvarint %d", n)}
}
if r.b.Len() < off+n+int(l)+4 {
return decbuf{e: errInvalidSize}
}
// Load bytes holding the contents plus a CRC32 checksum.
b = r.b.Range(off+n, off+n+int(l)+4)
dec := decbuf{b: b[:len(b)-4]}
if dec.crc32() != binary.BigEndian.Uint32(b[len(b)-4:]) {
return decbuf{e: errInvalidChecksum}
}
return dec
}
// readSymbols reads the symbol table fully into memory and allocates proper strings for them.
// ReadSymbols reads the symbol table fully into memory and allocates proper strings for them.
// Strings backed by the mmap'd memory would cause memory faults if applications keep using them
// after the reader is closed.
func (r *Reader) readSymbols(off int) error {
func ReadSymbols(bs ByteSlice, version int, off int) ([]string, map[uint32]string, error) {
if off == 0 {
return nil
return nil, nil, nil
}
d := r.decbufAt(off)
d := newDecbufAt(bs, off)
var (
origLen = d.len()
cnt = d.be32int()
basePos = uint32(off) + 4
nextPos = basePos + uint32(origLen-d.len())
origLen = d.len()
cnt = d.be32int()
basePos = uint32(off) + 4
nextPos = basePos + uint32(origLen-d.len())
symbolSlice []string
symbols = map[uint32]string{}
)
if r.version == 2 {
r.symbolSlice = make([]string, 0, cnt)
if version == 2 {
symbolSlice = make([]string, 0, cnt)
}
for d.err() == nil && d.len() > 0 && cnt > 0 {
s := d.uvarintStr()
if r.version == 2 {
r.symbolSlice = append(r.symbolSlice, s)
if version == FormatV2 {
symbolSlice = append(symbolSlice, s)
} else {
r.symbols[nextPos] = s
symbols[nextPos] = s
nextPos = basePos + uint32(origLen-d.len())
}
cnt--
}
return errors.Wrap(d.err(), "read symbols")
return symbolSlice, symbols, errors.Wrap(d.err(), "read symbols")
}
// readOffsetTable reads an offset table at the given position calls f for each
// found entry.f
// If f returns an error it stops decoding and returns the received error,
func (r *Reader) readOffsetTable(off uint64, f func([]string, uint64) error) error {
d := r.decbufAt(int(off))
// ReadOffsetTable reads an offset table and at the given position calls f for each
// found entry. If f returns an error it stops decoding and returns the received error.
func ReadOffsetTable(bs ByteSlice, off uint64, f func([]string, uint64) error) error {
d := newDecbufAt(bs, int(off))
cnt := d.be32()
for d.err() == nil && d.len() > 0 && cnt > 0 {
@ -845,10 +801,10 @@ func (r *Reader) Close() error {
}
func (r *Reader) lookupSymbol(o uint32) (string, error) {
if int(o) < len(r.symbolSlice) {
return r.symbolSlice[o], nil
if int(o) < len(r.symbolsV2) {
return r.symbolsV2[o], nil
}
s, ok := r.symbols[o]
s, ok := r.symbolsV1[o]
if !ok {
return "", errors.Errorf("unknown symbol offset %d", o)
}
@ -857,27 +813,20 @@ func (r *Reader) lookupSymbol(o uint32) (string, error) {
// Symbols returns a set of symbols that exist within the index.
func (r *Reader) Symbols() (map[string]struct{}, error) {
res := make(map[string]struct{}, len(r.symbols))
res := make(map[string]struct{}, len(r.symbolsV1)+len(r.symbolsV2))
for _, s := range r.symbols {
for _, s := range r.symbolsV1 {
res[s] = struct{}{}
}
for _, s := range r.symbolSlice {
for _, s := range r.symbolsV2 {
res[s] = struct{}{}
}
return res, nil
}
// SymbolTableSize returns the symbol table that is used to resolve symbol references.
// SymbolTableSize returns the symbol table size in bytes.
func (r *Reader) SymbolTableSize() uint64 {
var size int
for _, s := range r.symbols {
size += len(s) + 8
}
for _, s := range r.symbolSlice {
size += len(s) + 8
}
return uint64(size)
return r.symbolsTableSize
}
// LabelValues returns value tuples that exist for the given label name tuples.
@ -892,7 +841,7 @@ func (r *Reader) LabelValues(names ...string) (StringTuples, error) {
//return nil, fmt.Errorf("label index doesn't exist")
}
d := r.decbufAt(int(off))
d := newDecbufAt(r.b, int(off))
nc := d.be32int()
d.be32() // consume unused value entry count.
@ -916,7 +865,7 @@ func (emptyStringTuples) Len() int { return 0 }
// LabelIndices returns a slice of label names for which labels or label tuples value indices exist.
// NOTE: This is deprecated. Use `LabelNames()` instead.
func (r *Reader) LabelIndices() ([][]string, error) {
res := [][]string{}
var res [][]string
for s := range r.labels {
res = append(res, strings.Split(s, labelNameSeperator))
}
@ -928,10 +877,10 @@ func (r *Reader) Series(id uint64, lbls *labels.Labels, chks *[]chunks.Meta) err
offset := id
// In version 2 series IDs are no longer exact references but series are 16-byte padded
// and the ID is the multiple of 16 of the actual position.
if r.version == 2 {
if r.version == FormatV2 {
offset = id * 16
}
d := r.decbufUvarintAt(int(offset))
d := newDecbufUvarintAt(r.b, int(offset))
if d.err() != nil {
return d.err()
}
@ -948,7 +897,7 @@ func (r *Reader) Postings(name, value string) (Postings, error) {
if !ok {
return EmptyPostings(), nil
}
d := r.decbufAt(int(off))
d := newDecbufAt(r.b, int(off))
if d.err() != nil {
return nil, errors.Wrap(d.err(), "get postings entry")
}
@ -1062,7 +1011,7 @@ func (t *serializedStringTuples) At(i int) ([]string, error) {
// It currently does not contain decoding methods for all entry types but can be extended
// by them if there's demand.
type Decoder struct {
lookupSymbol func(uint32) (string, error)
LookupSymbol func(uint32) (string, error)
}
// Postings returns a postings list for b and its number of elements.
@ -1090,11 +1039,11 @@ func (dec *Decoder) Series(b []byte, lbls *labels.Labels, chks *[]chunks.Meta) e
return errors.Wrap(d.err(), "read series label offsets")
}
ln, err := dec.lookupSymbol(lno)
ln, err := dec.LookupSymbol(lno)
if err != nil {
return errors.Wrap(err, "lookup label name")
}
lv, err := dec.lookupSymbol(lvo)
lv, err := dec.LookupSymbol(lvo)
if err != nil {
return errors.Wrap(err, "lookup label value")
}

View file

@ -380,13 +380,28 @@ func TestPersistence_index_e2e(t *testing.T) {
}
}
gotSymbols, err := ir.Symbols()
testutil.Ok(t, err)
testutil.Equals(t, len(mi.symbols), len(gotSymbols))
for s := range mi.symbols {
_, ok := gotSymbols[s]
testutil.Assert(t, ok, "")
}
testutil.Ok(t, ir.Close())
}
func TestDecbufUvariantWithInvalidBuffer(t *testing.T) {
b := realByteSlice([]byte{0x81, 0x81, 0x81, 0x81, 0x81, 0x81})
db := newDecbufUvarintAt(b, 0)
testutil.NotOk(t, db.err())
}
func TestReaderWithInvalidBuffer(t *testing.T) {
b := realByteSlice([]byte{0x81, 0x81, 0x81, 0x81, 0x81, 0x81})
r := &Reader{b: b}
db := r.decbufUvarintAt(0)
testutil.NotOk(t, db.err())
_, err := NewReader(b)
testutil.NotOk(t, err)
}