From b7c3cfecbf6355b10b1171945e37f6bce6697724 Mon Sep 17 00:00:00 2001 From: Fabian Reinartz Date: Thu, 9 Nov 2017 17:27:09 +0000 Subject: [PATCH] index: abstract ByteSlice and adjust indexReader This replaces the builtin byte slice with an interface for the index reader. This allows the complex decoding of the index file format to be used against more generalized implementations. --- block.go | 6 +- index.go | 269 +++++++++++++++++++++++++++----------------------- index_test.go | 8 +- 3 files changed, 153 insertions(+), 130 deletions(-) diff --git a/block.go b/block.go index 2bd1fd364..2d7bab2d8 100644 --- a/block.go +++ b/block.go @@ -142,8 +142,8 @@ type Block struct { dir string meta BlockMeta - chunkr *chunkReader - indexr *indexReader + chunkr ChunkReader + indexr IndexReader tombstones tombstoneReader } @@ -160,7 +160,7 @@ func OpenBlock(dir string, pool chunks.Pool) (*Block, error) { if err != nil { return nil, err } - ir, err := newIndexReader(dir) + ir, err := NewFileIndexReader(filepath.Join(dir, "index")) if err != nil { return nil, err } diff --git a/index.go b/index.go index 36783ced2..277807e21 100644 --- a/index.go +++ b/index.go @@ -560,7 +560,7 @@ type StringTuples interface { type indexReader struct { // The underlying byte slice holding the encoded series data. - b []byte + b ByteSlice toc indexTOC // Close that releases the underlying resources of the byte slice. @@ -585,27 +585,52 @@ var ( errInvalidChecksum = fmt.Errorf("invalid checksum") ) -// NewIndexReader returns a new IndexReader on the given directory. -func NewIndexReader(dir string) (IndexReader, error) { return newIndexReader(dir) } +// ByteSlice abstracts a byte slice. +type ByteSlice interface { + Len() int + Range(start, end int) []byte +} -// newIndexReader returns a new indexReader on the given directory. 
-func newIndexReader(dir string) (*indexReader, error) { - f, err := openMmapFile(filepath.Join(dir, "index")) +type realByteSlice []byte + +func (b realByteSlice) Len() int { + return len(b) +} + +func (b realByteSlice) Range(start, end int) []byte { + return b[start:end] +} + +func (b realByteSlice) Sub(start, end int) ByteSlice { + return b[start:end] +} + +// NewIndexReader returns a new IndexReader on the given byte slice. +func NewIndexReader(b ByteSlice) (IndexReader, error) { + return newIndexReader(b, nil) +} + +func NewFileIndexReader(path string) (IndexReader, error) { + f, err := openMmapFile(path) if err != nil { return nil, err } + return newIndexReader(realByteSlice(f.b), f) +} + +// newIndexReader returns a new indexReader on the given byte slice. +func newIndexReader(b ByteSlice, c io.Closer) (*indexReader, error) { r := &indexReader{ - b: f.b, - c: f, + b: b, + c: c, symbols: map[uint32]string{}, crc32: newCRC32(), } - // Verify magic number. - if len(f.b) < 4 { + if b.Len() < 4 { return nil, errors.Wrap(errInvalidSize, "index header") } - if m := binary.BigEndian.Uint32(r.b[:4]); m != MagicIndex { + if m := binary.BigEndian.Uint32(r.b.Range(0, 4)); m != MagicIndex { return nil, errors.Errorf("invalid magic number %x", m) } @@ -615,6 +640,7 @@ func newIndexReader(dir string) (*indexReader, error) { if err := r.readSymbols(int(r.toc.symbols)); err != nil { return nil, errors.Wrap(err, "read symbols") } + var err error r.labels, err = r.readOffsetTable(r.toc.labelIndicesTable) if err != nil { @@ -625,31 +651,80 @@ func newIndexReader(dir string) (*indexReader, error) { } func (r *indexReader) readTOC() error { - d1 := r.decbufAt(len(r.b) - indexTOCLen) - d2 := d1.decbuf(indexTOCLen - 4) - crc := d2.crc32() - - r.toc.symbols = d2.be64() - r.toc.series = d2.be64() - r.toc.labelIndices = d2.be64() - r.toc.labelIndicesTable = d2.be64() - r.toc.postings = d2.be64() - r.toc.postingsTable = d2.be64() - - if d2.err() != nil { - return d2.err() + if r.b.Len() 
< indexTOCLen { + return errInvalidSize } - if read := d1.be32(); crc != read { - return errors.Wrap(errInvalidChecksum, "read TOC") + b := r.b.Range(r.b.Len()-indexTOCLen, r.b.Len()) + + expCRC := binary.BigEndian.Uint32(b[len(b)-4:]) + d := decbuf{b: b[:len(b)-4]} + + if d.crc32() != expCRC { + return (errInvalidChecksum) } - return d1.err() + + r.toc.symbols = d.be64() + r.toc.series = d.be64() + r.toc.labelIndices = d.be64() + r.toc.labelIndicesTable = d.be64() + r.toc.postings = d.be64() + r.toc.postingsTable = d.be64() + + return d.err() } +// decbufAt returns a new decoding buffer. It expects the first 4 bytes +// after offset to hold the big endian encoded content length, followed by the contents and the expected +// checksum. func (r *indexReader) decbufAt(off int) decbuf { - if len(r.b) < off { + if r.b.Len() < off+4 { return decbuf{e: errInvalidSize} } - return decbuf{b: r.b[off:]} + b := r.b.Range(off, off+4) + l := int(binary.BigEndian.Uint32(b)) + + if r.b.Len() < off+4+l+4 { + return decbuf{e: errInvalidSize} + } + + // Load bytes holding the contents plus a CRC32 checksum. + b = r.b.Range(off+4, off+4+l+4) + dec := decbuf{b: b[:len(b)-4]} + + if exp := binary.BigEndian.Uint32(b[len(b)-4:]); dec.crc32() != exp { + return decbuf{e: errInvalidChecksum} + } + return dec +} + +// decbufUvarintAt returns a new decoding buffer. It expects the first bytes +// after offset to hold the uvarint-encoded buffer's length, followed by the contents and the expected +// checksum. +func (r *indexReader) decbufUvarintAt(off int) decbuf { + // We never have to access this method at the far end of the byte slice. Thus just checking + // against the MaxVarintLen32 is sufficient. 
+ if r.b.Len() < off+binary.MaxVarintLen32 { + return decbuf{e: errInvalidSize} + } + b := r.b.Range(off, off+binary.MaxVarintLen32) + + l, n := binary.Uvarint(b) + if n > binary.MaxVarintLen32 { + return decbuf{e: errors.New("invalid uvarint")} + } + + if r.b.Len() < off+n+int(l)+4 { + return decbuf{e: errInvalidSize} + } + + // Load bytes holding the contents plus a CRC32 checksum. + b = r.b.Range(off+n, off+n+int(l)+4) + dec := decbuf{b: b[:len(b)-4]} + + if dec.crc32() != binary.BigEndian.Uint32(b[len(b)-4:]) { + return decbuf{e: errInvalidChecksum} + } + return dec } // readSymbols reads the symbol table fully into memory and allocates proper strings for them. @@ -659,26 +734,22 @@ func (r *indexReader) readSymbols(off int) error { if off == 0 { return nil } + d := r.decbufAt(off) + var ( - d1 = r.decbufAt(int(off)) - d2 = d1.decbuf(d1.be32int()) - crc = d2.crc32() - origLen = d2.len() - cnt = d2.be32int() + origLen = d.len() + cnt = d.be32int() basePos = uint32(off) + 4 - nextPos = basePos + uint32(origLen-d2.len()) + nextPos = basePos + uint32(origLen-d.len()) ) - for d2.err() == nil && d2.len() > 0 && cnt > 0 { - s := d2.uvarintStr() + for d.err() == nil && d.len() > 0 && cnt > 0 { + s := d.uvarintStr() r.symbols[uint32(nextPos)] = s - nextPos = basePos + uint32(origLen-d2.len()) + nextPos = basePos + uint32(origLen-d.len()) cnt-- } - if read := d1.be32(); crc != read { - return errors.Wrap(errInvalidChecksum, "read symbols") - } - return d2.err() + return d.err() } // readOffsetTable reads an offset table at the given position and returns a map @@ -686,55 +757,29 @@ func (r *indexReader) readSymbols(off int) error { func (r *indexReader) readOffsetTable(off uint64) (map[string]uint32, error) { const sep = "\xff" - var ( - d1 = r.decbufAt(int(off)) - d2 = d1.decbuf(d1.be32int()) - crc = d2.crc32() - cnt = d2.be32() - ) + d := r.decbufAt(int(off)) + cnt := d.be32() - res := make(map[string]uint32, 512) + res := make(map[string]uint32, cnt) - for d2.err() == 
nil && d2.len() > 0 && cnt > 0 { - keyCount := int(d2.uvarint()) + for d.err() == nil && d.len() > 0 && cnt > 0 { + keyCount := int(d.uvarint()) keys := make([]string, 0, keyCount) for i := 0; i < keyCount; i++ { - keys = append(keys, d2.uvarintStr()) + keys = append(keys, d.uvarintStr()) } - res[strings.Join(keys, sep)] = uint32(d2.uvarint()) + res[strings.Join(keys, sep)] = uint32(d.uvarint()) cnt-- } - if read := d1.be32(); crc != read { - return nil, errors.Wrap(errInvalidChecksum, "read offset table") - } - return res, d2.err() + return res, d.err() } func (r *indexReader) Close() error { return r.c.Close() } -func (r *indexReader) section(o uint32) (byte, []byte, error) { - b := r.b[o:] - - if len(b) < 5 { - return 0, nil, errors.Wrap(errInvalidSize, "read header") - } - - flag := b[0] - l := binary.BigEndian.Uint32(b[1:5]) - - b = b[5:] - - // b must have the given length plus 4 bytes for the CRC32 checksum. - if len(b) < int(l)+4 { - return 0, nil, errors.Wrap(errInvalidSize, "section content") - } - return flag, b[:l], nil -} - func (r *indexReader) lookupSymbol(o uint32) (string, error) { s, ok := r.symbols[o] if !ok { @@ -764,23 +809,17 @@ func (r *indexReader) LabelValues(names ...string) (StringTuples, error) { //return nil, fmt.Errorf("label index doesn't exist") } - d1 := r.decbufAt(int(off)) - d2 := d1.decbuf(d1.be32int()) - crc := d2.crc32() + d := r.decbufAt(int(off)) - nc := d2.be32int() - d2.be32() // consume unused value entry count. + nc := d.be32int() + d.be32() // consume unused value entry count. 
- if d2.err() != nil { - return nil, errors.Wrap(d2.err(), "read label value index") - } - - if read := d1.be32(); crc != read { - return nil, errors.Wrap(errInvalidChecksum, "read label values") + if d.err() != nil { + return nil, errors.Wrap(d.err(), "read label value index") } st := &serializedStringTuples{ l: nc, - b: d2.get(), + b: d.get(), lookup: r.lookupSymbol, } return st, nil @@ -803,21 +842,19 @@ func (r *indexReader) LabelIndices() ([][]string, error) { } func (r *indexReader) Series(ref uint64, lbls *labels.Labels, chks *[]ChunkMeta) error { - d1 := r.decbufAt(int(ref)) - d2 := d1.decbuf(d1.uvarint()) - crc := d2.crc32() + d := r.decbufUvarintAt(int(ref)) *lbls = (*lbls)[:0] *chks = (*chks)[:0] - k := int(d2.uvarint()) + k := int(d.uvarint()) for i := 0; i < k; i++ { - lno := uint32(d2.uvarint()) - lvo := uint32(d2.uvarint()) + lno := uint32(d.uvarint()) + lvo := uint32(d.uvarint()) - if d2.err() != nil { - return errors.Wrap(d2.err(), "read series label offsets") + if d.err() != nil { + return errors.Wrap(d.err(), "read series label offsets") } ln, err := r.lookupSymbol(lno) @@ -833,15 +870,15 @@ func (r *indexReader) Series(ref uint64, lbls *labels.Labels, chks *[]ChunkMeta) } // Read the chunks meta data. 
- k = int(d2.uvarint()) + k = int(d.uvarint()) if k == 0 { return nil } - t0 := d2.varint64() - maxt := int64(d2.uvarint64()) + t0 - ref0 := int64(d2.uvarint64()) + t0 := d.varint64() + maxt := int64(d.uvarint64()) + t0 + ref0 := int64(d.uvarint64()) *chks = append(*chks, ChunkMeta{ Ref: uint64(ref0), @@ -851,14 +888,14 @@ func (r *indexReader) Series(ref uint64, lbls *labels.Labels, chks *[]ChunkMeta) t0 = maxt for i := 1; i < k; i++ { - mint := int64(d2.uvarint64()) + t0 - maxt := int64(d2.uvarint64()) + mint + mint := int64(d.uvarint64()) + t0 + maxt := int64(d.uvarint64()) + mint - ref0 += d2.varint64() + ref0 += d.varint64() t0 = maxt - if d2.err() != nil { - return errors.Wrapf(d2.err(), "read meta for chunk %d", i) + if d.err() != nil { + return errors.Wrapf(d.err(), "read meta for chunk %d", i) } *chks = append(*chks, ChunkMeta{ @@ -867,10 +904,7 @@ func (r *indexReader) Series(ref uint64, lbls *labels.Labels, chks *[]ChunkMeta) MaxTime: maxt, }) } - if read := d1.be32(); crc != read { - return errors.Wrap(errInvalidChecksum, "read series") - } - return nil + return d.err() } func (r *indexReader) Postings(name, value string) (Postings, error) { @@ -881,21 +915,10 @@ func (r *indexReader) Postings(name, value string) (Postings, error) { if !ok { return emptyPostings, nil } + d := r.decbufAt(int(off)) + d.be32() // consume unused postings list length. - d1 := r.decbufAt(int(off)) - d2 := d1.decbuf(d1.be32int()) - - crc := d2.crc32() - - d2.be32() // consume unused postings list length. 
- - if d2.err() != nil { - return nil, errors.Wrap(d2.err(), "get postings bytes") - } - if read := d1.be32(); crc != read { - return nil, errors.Wrap(errInvalidChecksum, "read postings") - } - return newBigEndianPostings(d2.get()), nil + return newBigEndianPostings(d.get()), errors.Wrap(d.err(), "get postings bytes") } func (r *indexReader) SortedPostings(p Postings) Postings { diff --git a/index_test.go b/index_test.go index 7b908e39a..f18aba0eb 100644 --- a/index_test.go +++ b/index_test.go @@ -159,7 +159,7 @@ func TestIndexRW_Create_Open(t *testing.T) { require.NoError(t, err, "create index writer") require.NoError(t, iw.Close(), "close index writer") - ir, err := newIndexReader(dir) + ir, err := NewFileIndexReader(filepath.Join(dir, "index")) require.NoError(t, err, "open index reader") require.NoError(t, ir.Close(), "close index reader") @@ -169,7 +169,7 @@ func TestIndexRW_Create_Open(t *testing.T) { _, err = f.WriteAt([]byte{0, 0}, 0) require.NoError(t, err) - _, err = newIndexReader(dir) + _, err = NewFileIndexReader(dir) require.Error(t, err) } @@ -210,7 +210,7 @@ func TestIndexRW_Postings(t *testing.T) { require.NoError(t, iw.Close()) - ir, err := newIndexReader(dir) + ir, err := NewFileIndexReader(filepath.Join(dir, "index")) require.NoError(t, err, "open index reader") p, err := ir.Postings("a", "1") @@ -325,7 +325,7 @@ func TestPersistence_index_e2e(t *testing.T) { err = iw.Close() require.NoError(t, err) - ir, err := newIndexReader(dir) + ir, err := NewFileIndexReader(filepath.Join(dir, "index")) require.NoError(t, err) for p := range mi.postings.m {