From 44052bc9374a3198d1f56e4750b0e3dcccb5c45c Mon Sep 17 00:00:00 2001 From: Shubheksha Jalan Date: Thu, 11 Jan 2018 00:49:16 +0530 Subject: [PATCH] Make version for index format configurable --- block.go | 4 +- compact.go | 1 + index/index.go | 154 ++++++++++--------------------------------------- 3 files changed, 32 insertions(+), 127 deletions(-) diff --git a/block.go b/block.go index 3dd49b511..35466c29e 100644 --- a/block.go +++ b/block.go @@ -216,7 +216,7 @@ func writeMetaFile(dir string, meta *BlockMeta) error { enc.SetIndent("", "\t") var merr MultiError - meta.Version = 2 + if merr.Add(enc.Encode(meta)); merr.Err() != nil { merr.Add(f.Close()) return merr.Err() @@ -253,7 +253,7 @@ func OpenBlock(dir string, pool chunkenc.Pool) (*Block, error) { if err != nil { return nil, err } - ir, err := index.NewFileReader(filepath.Join(dir, "index")) + ir, err := index.NewFileReader(filepath.Join(dir, "index"), meta.Version) if err != nil { return nil, err } diff --git a/compact.go b/compact.go index 5c532759f..f96e64394 100644 --- a/compact.go +++ b/compact.go @@ -428,6 +428,7 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockRe } indexw, err := index.NewWriter(filepath.Join(tmp, indexFilename)) + meta.Version = indexw.Version if err != nil { return errors.Wrap(err, "open index writer") } diff --git a/index/index.go b/index/index.go index 2d76970d1..3b7a09c39 100644 --- a/index/index.go +++ b/index/index.go @@ -98,7 +98,7 @@ func newCRC32() hash.Hash32 { return crc32.New(castagnoliTable) } -// indexWriter implements the IndexWriter interface for the standard +// Writer implements the IndexWriter interface for the standard // serialization format. type Writer struct { f *os.File @@ -122,6 +122,8 @@ type Writer struct { lastSeries labels.Labels crc32 hash.Hash + + Version int } type indexTOC struct { @@ -185,7 +187,7 @@ func (w *Writer) write(bufs ...[]byte) error { // Once we move to compressed/varint representations in those areas, this limitation // can be lifted. if w.pos > 16*math.MaxUint32 { - return errors.Errorf("exceeding max size of 4GiB") + return errors.Errorf("exceeding max size of 64GiB") } } return nil @@ -250,7 +252,7 @@ func (w *Writer) writeMeta() error { return w.write(w.buf1.get()) } -//AddSeries adds the series one at a time along with its chunks. +// AddSeries adds the series one at a time along with its chunks. func (w *Writer) AddSeries(ref uint64, lset labels.Labels, chunks ...chunks.Meta) error { if err := w.ensureStage(idxStageSeries); err != nil { return err @@ -264,6 +266,7 @@ func (w *Writer) AddSeries(ref uint64, lset labels.Labels, chunks ...chunks.Meta } w.addPadding(16) w.seriesOffsets[ref] = w.pos / 16 + w.Version = 2 w.buf2.reset() w.buf2.putUvarint(len(lset)) @@ -533,9 +536,11 @@ type Reader struct { // the block has been unmapped. symbols map[uint32]string - dec Decoder + dec *Decoder crc32 hash.Hash32 + + version int } var ( @@ -565,28 +570,24 @@ func (b realByteSlice) Sub(start, end int) ByteSlice { } // NewReader returns a new IndexReader on the given byte slice. -func NewReader(b ByteSlice) (*Reader, error) { - return newReader(b, nil, nil) +func NewReader(b ByteSlice, v int) (*Reader, error) { + return newReader(b, nil, v) } -func NewReaderV1(b ByteSlice, c io.Closer, d DecoderV1) (*Reader, error) { - return newReader(b, c, d) -} - -func NewReaderV2(b ByteSlice, c io.Closer, d DecoderV1) (*Reader, error) { - return newReader(b, c, d) +func NewReaderV1(b ByteSlice, c io.Closer, v int) (*Reader, error) { + return newReader(b, c, v) } // NewFileReader returns a new index reader against the given index file. -func NewFileReader(path string) (*Reader, error) { +func NewFileReader(path string, v int) (*Reader, error) { f, err := fileutil.OpenMmapFile(path) if err != nil { return nil, err } - return newReader(realByteSlice(f.Bytes()), f, nil) + return newReader(realByteSlice(f.Bytes()), f, v) } -func newReader(b ByteSlice, c io.Closer, d Decoder) (*Reader, error) { +func newReader(b ByteSlice, c io.Closer, v int) (*Reader, error) { r := &Reader{ b: b, c: c, @@ -594,6 +595,7 @@ func newReader(b ByteSlice, c io.Closer, d Decoder) (*Reader, error) { labels: map[string]uint32{}, postings: map[labels.Label]uint32{}, crc32: newCRC32(), + version: v, } // Verify magic number. if b.Len() < 4 { @@ -632,7 +634,7 @@ func newReader(b ByteSlice, c io.Closer, d Decoder) (*Reader, error) { return nil, errors.Wrap(err, "read postings table") } - r.dec = &DecoderV1{symbols: r.symbols} + r.dec = &Decoder{symbols: r.symbols} return r, nil } @@ -862,9 +864,13 @@ func (r *Reader) LabelIndices() ([][]string, error) { return res, nil } -// Series the series with the given ID and writes its labels and chunks into lbls and chks. +// Reads the series with the given ID and writes its labels and chunks into lbls and chks. func (r *Reader) Series(id uint64, lbls *labels.Labels, chks *[]chunks.Meta) error { - d := r.decbufUvarintAt(int(16 * id)) + offset := id + if r.version == 2 { + offset = 16 * id + } + d := r.decbufUvarintAt(int(offset)) if d.err() != nil { return d.err() } @@ -969,19 +975,11 @@ func (t *serializedStringTuples) At(i int) ([]string, error) { // // It currently does not contain decoding methods for all entry types but can be extended // by them if there's demand. -type Decoder interface { - lookupSymbol(o uint32) (string, error) - SetSymbolTable(t map[uint32]string) - Postings(b []byte) (int, Postings, error) - Series(b []byte, lbls *labels.Labels, chks *[]chunks.Meta) error -} - -// DecoderV1 is used for dedcoding V1 of the index file. -type DecoderV1 struct { +type Decoder struct { symbols map[uint32]string } -func (dec DecoderV1) lookupSymbol(o uint32) (string, error) { +func (dec *Decoder) lookupSymbol(o uint32) (string, error) { s, ok := dec.symbols[o] if !ok { return "", errors.Errorf("unknown symbol offset %d", o) @@ -991,12 +989,12 @@ func (dec DecoderV1) lookupSymbol(o uint32) (string, error) { // SetSymbolTable set the symbol table to be used for lookups when decoding series // and label indices -func (dec DecoderV1) SetSymbolTable(t map[uint32]string) { +func (dec *Decoder) SetSymbolTable(t map[uint32]string) { dec.symbols = t } // Postings returns a postings list for b and its number of elements. -func (dec DecoderV1) Postings(b []byte) (int, Postings, error) { +func (dec *Decoder) Postings(b []byte) (int, Postings, error) { d := decbuf{b: b} n := d.be32int() l := d.get() @@ -1004,101 +1002,7 @@ func (dec DecoderV1) Postings(b []byte) (int, Postings, error) { } // Series decodes a series entry from the given byte slice into lset and chks. -func (dec DecoderV1) Series(b []byte, lbls *labels.Labels, chks *[]chunks.Meta) error { - *lbls = (*lbls)[:0] - *chks = (*chks)[:0] - - d := decbuf{b: b} - - k := int(d.uvarint()) - - for i := 0; i < k; i++ { - lno := uint32(d.uvarint()) - lvo := uint32(d.uvarint()) - - if d.err() != nil { - return errors.Wrap(d.err(), "read series label offsets") - } - - ln, err := dec.lookupSymbol(lno) - if err != nil { - return errors.Wrap(err, "lookup label name") - } - lv, err := dec.lookupSymbol(lvo) - if err != nil { - return errors.Wrap(err, "lookup label value") - } - - *lbls = append(*lbls, labels.Label{Name: ln, Value: lv}) - } - - // Read the chunks meta data. - k = int(d.uvarint()) - - if k == 0 { - return nil - } - - t0 := d.varint64() - maxt := int64(d.uvarint64()) + t0 - ref0 := int64(d.uvarint64()) - - *chks = append(*chks, chunks.Meta{ - Ref: uint64(ref0), - MinTime: t0, - MaxTime: maxt, - }) - t0 = maxt - - for i := 1; i < k; i++ { - mint := int64(d.uvarint64()) + t0 - maxt := int64(d.uvarint64()) + mint - - ref0 += d.varint64() - t0 = maxt - - if d.err() != nil { - return errors.Wrapf(d.err(), "read meta for chunk %d", i) - } - - *chks = append(*chks, chunks.Meta{ - Ref: uint64(ref0), - MinTime: mint, - MaxTime: maxt, - }) - } - return d.err() -} - -// DecoderV2 is used for dedcoding V2 of the index file. -type DecoderV2 struct { - symbols map[uint32]string -} - -func (dec *DecoderV2) lookupSymbol(o uint32) (string, error) { - s, ok := dec.symbols[o] - if !ok { - return "", errors.Errorf("unknown symbol offset %d", o) - } - return s, nil -} - -// SetSymbolTable set the symbol table to be used for lookups when decoding series -// and label indices -func (dec *DecoderV2) SetSymbolTable(t map[uint32]string) { - dec.symbols = t -} - -// Postings returns a postings list for b and its number of elements. -func (dec *DecoderV2) Postings(b []byte) (int, Postings, error) { - d := decbuf{b: b} - n := d.be32int() - l := d.get() - return n, newBigEndianPostings(l), d.err() -} - -// Series decodes a series entry from the given byte slice into lset and chks. -func (dec DecoderV2) Series(b []byte, lbls *labels.Labels, chks *[]chunks.Meta) error { +func (dec *Decoder) Series(b []byte, lbls *labels.Labels, chks *[]chunks.Meta) error { *lbls = (*lbls)[:0] *chks = (*chks)[:0]