index: abstract ByteSlice and adjust indexReader

This replaces the builtin byte slice with an interface for the index
reader. This allows the complex decoding of the index file format
to be used against more generalized implementations.
This commit is contained in:
Fabian Reinartz 2017-11-09 17:27:09 +00:00
parent c354d6bd59
commit b7c3cfecbf
3 changed files with 153 additions and 130 deletions

View file

@ -142,8 +142,8 @@ type Block struct {
dir string
meta BlockMeta
chunkr *chunkReader
indexr *indexReader
chunkr ChunkReader
indexr IndexReader
tombstones tombstoneReader
}
@ -160,7 +160,7 @@ func OpenBlock(dir string, pool chunks.Pool) (*Block, error) {
if err != nil {
return nil, err
}
ir, err := newIndexReader(dir)
ir, err := NewFileIndexReader(filepath.Join(dir, "index"))
if err != nil {
return nil, err
}

271
index.go
View file

@ -560,7 +560,7 @@ type StringTuples interface {
type indexReader struct {
// The underlying byte slice holding the encoded series data.
b []byte
b ByteSlice
toc indexTOC
// Close that releases the underlying resources of the byte slice.
@ -585,27 +585,52 @@ var (
errInvalidChecksum = fmt.Errorf("invalid checksum")
)
// NewIndexReader returns a new IndexReader on the given directory.
func NewIndexReader(dir string) (IndexReader, error) { return newIndexReader(dir) }
// ByteSlice abstracts a byte slice.
type ByteSlice interface {
Len() int
Range(start, end int) []byte
}
// newIndexReader returns a new indexReader on the given directory.
func newIndexReader(dir string) (*indexReader, error) {
f, err := openMmapFile(filepath.Join(dir, "index"))
type realByteSlice []byte
func (b realByteSlice) Len() int {
return len(b)
}
func (b realByteSlice) Range(start, end int) []byte {
return b[start:end]
}
func (b realByteSlice) Sub(start, end int) ByteSlice {
return b[start:end]
}
// NewIndexReader returns a new IndexReader on the given directory.
func NewIndexReader(b ByteSlice) (IndexReader, error) {
return newIndexReader(b, nil)
}
func NewFileIndexReader(path string) (IndexReader, error) {
f, err := openMmapFile(path)
if err != nil {
return nil, err
}
return newIndexReader(realByteSlice(f.b), f)
}
// newIndexReader returns a new indexReader on the given directory.
func newIndexReader(b ByteSlice, c io.Closer) (*indexReader, error) {
r := &indexReader{
b: f.b,
c: f,
b: b,
c: c,
symbols: map[uint32]string{},
crc32: newCRC32(),
}
// Verify magic number.
if len(f.b) < 4 {
if b.Len() < 4 {
return nil, errors.Wrap(errInvalidSize, "index header")
}
if m := binary.BigEndian.Uint32(r.b[:4]); m != MagicIndex {
if m := binary.BigEndian.Uint32(r.b.Range(0, 4)); m != MagicIndex {
return nil, errors.Errorf("invalid magic number %x", m)
}
@ -615,6 +640,7 @@ func newIndexReader(dir string) (*indexReader, error) {
if err := r.readSymbols(int(r.toc.symbols)); err != nil {
return nil, errors.Wrap(err, "read symbols")
}
var err error
r.labels, err = r.readOffsetTable(r.toc.labelIndicesTable)
if err != nil {
@ -625,31 +651,80 @@ func newIndexReader(dir string) (*indexReader, error) {
}
func (r *indexReader) readTOC() error {
d1 := r.decbufAt(len(r.b) - indexTOCLen)
d2 := d1.decbuf(indexTOCLen - 4)
crc := d2.crc32()
r.toc.symbols = d2.be64()
r.toc.series = d2.be64()
r.toc.labelIndices = d2.be64()
r.toc.labelIndicesTable = d2.be64()
r.toc.postings = d2.be64()
r.toc.postingsTable = d2.be64()
if d2.err() != nil {
return d2.err()
if r.b.Len() < indexTOCLen {
return errInvalidSize
}
if read := d1.be32(); crc != read {
return errors.Wrap(errInvalidChecksum, "read TOC")
}
return d1.err()
b := r.b.Range(r.b.Len()-indexTOCLen, r.b.Len())
expCRC := binary.BigEndian.Uint32(b[len(b)-4:])
d := decbuf{b: b[:len(b)-4]}
if d.crc32() != expCRC {
return (errInvalidChecksum)
}
r.toc.symbols = d.be64()
r.toc.series = d.be64()
r.toc.labelIndices = d.be64()
r.toc.labelIndicesTable = d.be64()
r.toc.postings = d.be64()
r.toc.postingsTable = d.be64()
return d.err()
}
// decbufAt returns a new decoding buffer. It expects the first 4 bytes
// after offset to hold the big endian encoded content length, followed by the contents and the expected
// checksum.
func (r *indexReader) decbufAt(off int) decbuf {
if len(r.b) < off {
if r.b.Len() < off+4 {
return decbuf{e: errInvalidSize}
}
return decbuf{b: r.b[off:]}
b := r.b.Range(off, off+4)
l := int(binary.BigEndian.Uint32(b))
if r.b.Len() < off+4+l+4 {
return decbuf{e: errInvalidSize}
}
// Load bytes holding the contents plus a CRC32 checksum.
b = r.b.Range(off+4, off+4+l+4)
dec := decbuf{b: b[:len(b)-4]}
if exp := binary.BigEndian.Uint32(b[len(b)-4:]); dec.crc32() != exp {
return decbuf{e: errInvalidChecksum}
}
return dec
}
// decbufAt returns a new decoding buffer. It expects the first bytes
// after offset to hold the uvarint-encoded buffers length, followed by the contents and the expected
// checksum.
func (r *indexReader) decbufUvarintAt(off int) decbuf {
// We never have to access this method at the far end of the byte slice. Thus just checking
// against the MaxVarintLen32 is sufficient.
if r.b.Len() < off+binary.MaxVarintLen32 {
return decbuf{e: errInvalidSize}
}
b := r.b.Range(off, off+binary.MaxVarintLen32)
l, n := binary.Uvarint(b)
if n > binary.MaxVarintLen32 {
return decbuf{e: errors.New("invalid uvarint")}
}
if r.b.Len() < off+n+int(l)+4 {
return decbuf{e: errInvalidSize}
}
// Load bytes holding the contents plus a CRC32 checksum.
b = r.b.Range(off+n, off+n+int(l)+4)
dec := decbuf{b: b[:len(b)-4]}
if dec.crc32() != binary.BigEndian.Uint32(b[len(b)-4:]) {
return decbuf{e: errInvalidChecksum}
}
return dec
}
// readSymbols reads the symbol table fully into memory and allocates proper strings for them.
@ -659,26 +734,22 @@ func (r *indexReader) readSymbols(off int) error {
if off == 0 {
return nil
}
d := r.decbufAt(off)
var (
d1 = r.decbufAt(int(off))
d2 = d1.decbuf(d1.be32int())
crc = d2.crc32()
origLen = d2.len()
cnt = d2.be32int()
origLen = d.len()
cnt = d.be32int()
basePos = uint32(off) + 4
nextPos = basePos + uint32(origLen-d2.len())
nextPos = basePos + uint32(origLen-d.len())
)
for d2.err() == nil && d2.len() > 0 && cnt > 0 {
s := d2.uvarintStr()
for d.err() == nil && d.len() > 0 && cnt > 0 {
s := d.uvarintStr()
r.symbols[uint32(nextPos)] = s
nextPos = basePos + uint32(origLen-d2.len())
nextPos = basePos + uint32(origLen-d.len())
cnt--
}
if read := d1.be32(); crc != read {
return errors.Wrap(errInvalidChecksum, "read symbols")
}
return d2.err()
return d.err()
}
// readOffsetTable reads an offset table at the given position and returns a map
@ -686,55 +757,29 @@ func (r *indexReader) readSymbols(off int) error {
func (r *indexReader) readOffsetTable(off uint64) (map[string]uint32, error) {
const sep = "\xff"
var (
d1 = r.decbufAt(int(off))
d2 = d1.decbuf(d1.be32int())
crc = d2.crc32()
cnt = d2.be32()
)
d := r.decbufAt(int(off))
cnt := d.be32()
res := make(map[string]uint32, 512)
res := make(map[string]uint32, cnt)
for d2.err() == nil && d2.len() > 0 && cnt > 0 {
keyCount := int(d2.uvarint())
for d.err() == nil && d.len() > 0 && cnt > 0 {
keyCount := int(d.uvarint())
keys := make([]string, 0, keyCount)
for i := 0; i < keyCount; i++ {
keys = append(keys, d2.uvarintStr())
keys = append(keys, d.uvarintStr())
}
res[strings.Join(keys, sep)] = uint32(d2.uvarint())
res[strings.Join(keys, sep)] = uint32(d.uvarint())
cnt--
}
if read := d1.be32(); crc != read {
return nil, errors.Wrap(errInvalidChecksum, "read offset table")
}
return res, d2.err()
return res, d.err()
}
func (r *indexReader) Close() error {
return r.c.Close()
}
func (r *indexReader) section(o uint32) (byte, []byte, error) {
b := r.b[o:]
if len(b) < 5 {
return 0, nil, errors.Wrap(errInvalidSize, "read header")
}
flag := b[0]
l := binary.BigEndian.Uint32(b[1:5])
b = b[5:]
// b must have the given length plus 4 bytes for the CRC32 checksum.
if len(b) < int(l)+4 {
return 0, nil, errors.Wrap(errInvalidSize, "section content")
}
return flag, b[:l], nil
}
func (r *indexReader) lookupSymbol(o uint32) (string, error) {
s, ok := r.symbols[o]
if !ok {
@ -764,23 +809,17 @@ func (r *indexReader) LabelValues(names ...string) (StringTuples, error) {
//return nil, fmt.Errorf("label index doesn't exist")
}
d1 := r.decbufAt(int(off))
d2 := d1.decbuf(d1.be32int())
crc := d2.crc32()
d := r.decbufAt(int(off))
nc := d2.be32int()
d2.be32() // consume unused value entry count.
nc := d.be32int()
d.be32() // consume unused value entry count.
if d2.err() != nil {
return nil, errors.Wrap(d2.err(), "read label value index")
}
if read := d1.be32(); crc != read {
return nil, errors.Wrap(errInvalidChecksum, "read label values")
if d.err() != nil {
return nil, errors.Wrap(d.err(), "read label value index")
}
st := &serializedStringTuples{
l: nc,
b: d2.get(),
b: d.get(),
lookup: r.lookupSymbol,
}
return st, nil
@ -803,21 +842,19 @@ func (r *indexReader) LabelIndices() ([][]string, error) {
}
func (r *indexReader) Series(ref uint64, lbls *labels.Labels, chks *[]ChunkMeta) error {
d1 := r.decbufAt(int(ref))
d2 := d1.decbuf(d1.uvarint())
crc := d2.crc32()
d := r.decbufUvarintAt(int(ref))
*lbls = (*lbls)[:0]
*chks = (*chks)[:0]
k := int(d2.uvarint())
k := int(d.uvarint())
for i := 0; i < k; i++ {
lno := uint32(d2.uvarint())
lvo := uint32(d2.uvarint())
lno := uint32(d.uvarint())
lvo := uint32(d.uvarint())
if d2.err() != nil {
return errors.Wrap(d2.err(), "read series label offsets")
if d.err() != nil {
return errors.Wrap(d.err(), "read series label offsets")
}
ln, err := r.lookupSymbol(lno)
@ -833,15 +870,15 @@ func (r *indexReader) Series(ref uint64, lbls *labels.Labels, chks *[]ChunkMeta)
}
// Read the chunks meta data.
k = int(d2.uvarint())
k = int(d.uvarint())
if k == 0 {
return nil
}
t0 := d2.varint64()
maxt := int64(d2.uvarint64()) + t0
ref0 := int64(d2.uvarint64())
t0 := d.varint64()
maxt := int64(d.uvarint64()) + t0
ref0 := int64(d.uvarint64())
*chks = append(*chks, ChunkMeta{
Ref: uint64(ref0),
@ -851,14 +888,14 @@ func (r *indexReader) Series(ref uint64, lbls *labels.Labels, chks *[]ChunkMeta)
t0 = maxt
for i := 1; i < k; i++ {
mint := int64(d2.uvarint64()) + t0
maxt := int64(d2.uvarint64()) + mint
mint := int64(d.uvarint64()) + t0
maxt := int64(d.uvarint64()) + mint
ref0 += d2.varint64()
ref0 += d.varint64()
t0 = maxt
if d2.err() != nil {
return errors.Wrapf(d2.err(), "read meta for chunk %d", i)
if d.err() != nil {
return errors.Wrapf(d.err(), "read meta for chunk %d", i)
}
*chks = append(*chks, ChunkMeta{
@ -867,10 +904,7 @@ func (r *indexReader) Series(ref uint64, lbls *labels.Labels, chks *[]ChunkMeta)
MaxTime: maxt,
})
}
if read := d1.be32(); crc != read {
return errors.Wrap(errInvalidChecksum, "read series")
}
return nil
return d.err()
}
func (r *indexReader) Postings(name, value string) (Postings, error) {
@ -881,21 +915,10 @@ func (r *indexReader) Postings(name, value string) (Postings, error) {
if !ok {
return emptyPostings, nil
}
d := r.decbufAt(int(off))
d.be32() // consume unused postings list length.
d1 := r.decbufAt(int(off))
d2 := d1.decbuf(d1.be32int())
crc := d2.crc32()
d2.be32() // consume unused postings list length.
if d2.err() != nil {
return nil, errors.Wrap(d2.err(), "get postings bytes")
}
if read := d1.be32(); crc != read {
return nil, errors.Wrap(errInvalidChecksum, "read postings")
}
return newBigEndianPostings(d2.get()), nil
return newBigEndianPostings(d.get()), errors.Wrap(d.err(), "get postings bytes")
}
func (r *indexReader) SortedPostings(p Postings) Postings {

View file

@ -159,7 +159,7 @@ func TestIndexRW_Create_Open(t *testing.T) {
require.NoError(t, err, "create index writer")
require.NoError(t, iw.Close(), "close index writer")
ir, err := newIndexReader(dir)
ir, err := NewFileIndexReader(filepath.Join(dir, "index"))
require.NoError(t, err, "open index reader")
require.NoError(t, ir.Close(), "close index reader")
@ -169,7 +169,7 @@ func TestIndexRW_Create_Open(t *testing.T) {
_, err = f.WriteAt([]byte{0, 0}, 0)
require.NoError(t, err)
_, err = newIndexReader(dir)
_, err = NewFileIndexReader(dir)
require.Error(t, err)
}
@ -210,7 +210,7 @@ func TestIndexRW_Postings(t *testing.T) {
require.NoError(t, iw.Close())
ir, err := newIndexReader(dir)
ir, err := NewFileIndexReader(filepath.Join(dir, "index"))
require.NoError(t, err, "open index reader")
p, err := ir.Postings("a", "1")
@ -325,7 +325,7 @@ func TestPersistence_index_e2e(t *testing.T) {
err = iw.Close()
require.NoError(t, err)
ir, err := newIndexReader(dir)
ir, err := NewFileIndexReader(filepath.Join(dir, "index"))
require.NoError(t, err)
for p := range mi.postings.m {