Make version for index format configurable

This commit is contained in:
Shubheksha Jalan 2018-01-11 00:49:16 +05:30
parent 129773b41a
commit 44052bc937
3 changed files with 32 additions and 127 deletions

View file

@ -216,7 +216,7 @@ func writeMetaFile(dir string, meta *BlockMeta) error {
enc.SetIndent("", "\t") enc.SetIndent("", "\t")
var merr MultiError var merr MultiError
meta.Version = 2
if merr.Add(enc.Encode(meta)); merr.Err() != nil { if merr.Add(enc.Encode(meta)); merr.Err() != nil {
merr.Add(f.Close()) merr.Add(f.Close())
return merr.Err() return merr.Err()
@ -253,7 +253,7 @@ func OpenBlock(dir string, pool chunkenc.Pool) (*Block, error) {
if err != nil { if err != nil {
return nil, err return nil, err
} }
ir, err := index.NewFileReader(filepath.Join(dir, "index")) ir, err := index.NewFileReader(filepath.Join(dir, "index"), meta.Version)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View file

@ -428,6 +428,7 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockRe
} }
indexw, err := index.NewWriter(filepath.Join(tmp, indexFilename)) indexw, err := index.NewWriter(filepath.Join(tmp, indexFilename))
meta.Version = indexw.Version
if err != nil { if err != nil {
return errors.Wrap(err, "open index writer") return errors.Wrap(err, "open index writer")
} }

View file

@ -98,7 +98,7 @@ func newCRC32() hash.Hash32 {
return crc32.New(castagnoliTable) return crc32.New(castagnoliTable)
} }
// indexWriter implements the IndexWriter interface for the standard // Writer implements the IndexWriter interface for the standard
// serialization format. // serialization format.
type Writer struct { type Writer struct {
f *os.File f *os.File
@ -122,6 +122,8 @@ type Writer struct {
lastSeries labels.Labels lastSeries labels.Labels
crc32 hash.Hash crc32 hash.Hash
Version int
} }
type indexTOC struct { type indexTOC struct {
@ -185,7 +187,7 @@ func (w *Writer) write(bufs ...[]byte) error {
// Once we move to compressed/varint representations in those areas, this limitation // Once we move to compressed/varint representations in those areas, this limitation
// can be lifted. // can be lifted.
if w.pos > 16*math.MaxUint32 { if w.pos > 16*math.MaxUint32 {
return errors.Errorf("exceeding max size of 4GiB") return errors.Errorf("exceeding max size of 64GiB")
} }
} }
return nil return nil
@ -250,7 +252,7 @@ func (w *Writer) writeMeta() error {
return w.write(w.buf1.get()) return w.write(w.buf1.get())
} }
//AddSeries adds the series one at a time along with its chunks. // AddSeries adds the series one at a time along with its chunks.
func (w *Writer) AddSeries(ref uint64, lset labels.Labels, chunks ...chunks.Meta) error { func (w *Writer) AddSeries(ref uint64, lset labels.Labels, chunks ...chunks.Meta) error {
if err := w.ensureStage(idxStageSeries); err != nil { if err := w.ensureStage(idxStageSeries); err != nil {
return err return err
@ -264,6 +266,7 @@ func (w *Writer) AddSeries(ref uint64, lset labels.Labels, chunks ...chunks.Meta
} }
w.addPadding(16) w.addPadding(16)
w.seriesOffsets[ref] = w.pos / 16 w.seriesOffsets[ref] = w.pos / 16
w.Version = 2
w.buf2.reset() w.buf2.reset()
w.buf2.putUvarint(len(lset)) w.buf2.putUvarint(len(lset))
@ -533,9 +536,11 @@ type Reader struct {
// the block has been unmapped. // the block has been unmapped.
symbols map[uint32]string symbols map[uint32]string
dec Decoder dec *Decoder
crc32 hash.Hash32 crc32 hash.Hash32
version int
} }
var ( var (
@ -565,28 +570,24 @@ func (b realByteSlice) Sub(start, end int) ByteSlice {
} }
// NewReader returns a new IndexReader on the given byte slice. // NewReader returns a new IndexReader on the given byte slice.
func NewReader(b ByteSlice) (*Reader, error) { func NewReader(b ByteSlice, v int) (*Reader, error) {
return newReader(b, nil, nil) return newReader(b, nil, v)
} }
func NewReaderV1(b ByteSlice, c io.Closer, d DecoderV1) (*Reader, error) { func NewReaderV1(b ByteSlice, c io.Closer, v int) (*Reader, error) {
return newReader(b, c, d) return newReader(b, c, v)
}
func NewReaderV2(b ByteSlice, c io.Closer, d DecoderV1) (*Reader, error) {
return newReader(b, c, d)
} }
// NewFileReader returns a new index reader against the given index file. // NewFileReader returns a new index reader against the given index file.
func NewFileReader(path string) (*Reader, error) { func NewFileReader(path string, v int) (*Reader, error) {
f, err := fileutil.OpenMmapFile(path) f, err := fileutil.OpenMmapFile(path)
if err != nil { if err != nil {
return nil, err return nil, err
} }
return newReader(realByteSlice(f.Bytes()), f, nil) return newReader(realByteSlice(f.Bytes()), f, v)
} }
func newReader(b ByteSlice, c io.Closer, d Decoder) (*Reader, error) { func newReader(b ByteSlice, c io.Closer, v int) (*Reader, error) {
r := &Reader{ r := &Reader{
b: b, b: b,
c: c, c: c,
@ -594,6 +595,7 @@ func newReader(b ByteSlice, c io.Closer, d Decoder) (*Reader, error) {
labels: map[string]uint32{}, labels: map[string]uint32{},
postings: map[labels.Label]uint32{}, postings: map[labels.Label]uint32{},
crc32: newCRC32(), crc32: newCRC32(),
version: v,
} }
// Verify magic number. // Verify magic number.
if b.Len() < 4 { if b.Len() < 4 {
@ -632,7 +634,7 @@ func newReader(b ByteSlice, c io.Closer, d Decoder) (*Reader, error) {
return nil, errors.Wrap(err, "read postings table") return nil, errors.Wrap(err, "read postings table")
} }
r.dec = &DecoderV1{symbols: r.symbols} r.dec = &Decoder{symbols: r.symbols}
return r, nil return r, nil
} }
@ -862,9 +864,13 @@ func (r *Reader) LabelIndices() ([][]string, error) {
return res, nil return res, nil
} }
// Series the series with the given ID and writes its labels and chunks into lbls and chks. // Reads the series with the given ID and writes its labels and chunks into lbls and chks.
func (r *Reader) Series(id uint64, lbls *labels.Labels, chks *[]chunks.Meta) error { func (r *Reader) Series(id uint64, lbls *labels.Labels, chks *[]chunks.Meta) error {
d := r.decbufUvarintAt(int(16 * id)) offset := id
if r.version == 2 {
offset = 16 * id
}
d := r.decbufUvarintAt(int(offset))
if d.err() != nil { if d.err() != nil {
return d.err() return d.err()
} }
@ -969,19 +975,11 @@ func (t *serializedStringTuples) At(i int) ([]string, error) {
// //
// It currently does not contain decoding methods for all entry types but can be extended // It currently does not contain decoding methods for all entry types but can be extended
// by them if there's demand. // by them if there's demand.
type Decoder interface { type Decoder struct {
lookupSymbol(o uint32) (string, error)
SetSymbolTable(t map[uint32]string)
Postings(b []byte) (int, Postings, error)
Series(b []byte, lbls *labels.Labels, chks *[]chunks.Meta) error
}
// DecoderV1 is used for decoding V1 of the index file.
type DecoderV1 struct {
symbols map[uint32]string symbols map[uint32]string
} }
func (dec DecoderV1) lookupSymbol(o uint32) (string, error) { func (dec *Decoder) lookupSymbol(o uint32) (string, error) {
s, ok := dec.symbols[o] s, ok := dec.symbols[o]
if !ok { if !ok {
return "", errors.Errorf("unknown symbol offset %d", o) return "", errors.Errorf("unknown symbol offset %d", o)
@ -991,12 +989,12 @@ func (dec DecoderV1) lookupSymbol(o uint32) (string, error) {
// SetSymbolTable set the symbol table to be used for lookups when decoding series // SetSymbolTable set the symbol table to be used for lookups when decoding series
// and label indices // and label indices
func (dec DecoderV1) SetSymbolTable(t map[uint32]string) { func (dec *Decoder) SetSymbolTable(t map[uint32]string) {
dec.symbols = t dec.symbols = t
} }
// Postings returns a postings list for b and its number of elements. // Postings returns a postings list for b and its number of elements.
func (dec DecoderV1) Postings(b []byte) (int, Postings, error) { func (dec *Decoder) Postings(b []byte) (int, Postings, error) {
d := decbuf{b: b} d := decbuf{b: b}
n := d.be32int() n := d.be32int()
l := d.get() l := d.get()
@ -1004,101 +1002,7 @@ func (dec DecoderV1) Postings(b []byte) (int, Postings, error) {
} }
// Series decodes a series entry from the given byte slice into lset and chks. // Series decodes a series entry from the given byte slice into lset and chks.
func (dec DecoderV1) Series(b []byte, lbls *labels.Labels, chks *[]chunks.Meta) error { func (dec *Decoder) Series(b []byte, lbls *labels.Labels, chks *[]chunks.Meta) error {
*lbls = (*lbls)[:0]
*chks = (*chks)[:0]
d := decbuf{b: b}
k := int(d.uvarint())
for i := 0; i < k; i++ {
lno := uint32(d.uvarint())
lvo := uint32(d.uvarint())
if d.err() != nil {
return errors.Wrap(d.err(), "read series label offsets")
}
ln, err := dec.lookupSymbol(lno)
if err != nil {
return errors.Wrap(err, "lookup label name")
}
lv, err := dec.lookupSymbol(lvo)
if err != nil {
return errors.Wrap(err, "lookup label value")
}
*lbls = append(*lbls, labels.Label{Name: ln, Value: lv})
}
// Read the chunks meta data.
k = int(d.uvarint())
if k == 0 {
return nil
}
t0 := d.varint64()
maxt := int64(d.uvarint64()) + t0
ref0 := int64(d.uvarint64())
*chks = append(*chks, chunks.Meta{
Ref: uint64(ref0),
MinTime: t0,
MaxTime: maxt,
})
t0 = maxt
for i := 1; i < k; i++ {
mint := int64(d.uvarint64()) + t0
maxt := int64(d.uvarint64()) + mint
ref0 += d.varint64()
t0 = maxt
if d.err() != nil {
return errors.Wrapf(d.err(), "read meta for chunk %d", i)
}
*chks = append(*chks, chunks.Meta{
Ref: uint64(ref0),
MinTime: mint,
MaxTime: maxt,
})
}
return d.err()
}
// DecoderV2 is used for decoding V2 of the index file.
type DecoderV2 struct {
symbols map[uint32]string
}
func (dec *DecoderV2) lookupSymbol(o uint32) (string, error) {
s, ok := dec.symbols[o]
if !ok {
return "", errors.Errorf("unknown symbol offset %d", o)
}
return s, nil
}
// SetSymbolTable set the symbol table to be used for lookups when decoding series
// and label indices
func (dec *DecoderV2) SetSymbolTable(t map[uint32]string) {
dec.symbols = t
}
// Postings returns a postings list for b and its number of elements.
func (dec *DecoderV2) Postings(b []byte) (int, Postings, error) {
d := decbuf{b: b}
n := d.be32int()
l := d.get()
return n, newBigEndianPostings(l), d.err()
}
// Series decodes a series entry from the given byte slice into lset and chks.
func (dec DecoderV2) Series(b []byte, lbls *labels.Labels, chks *[]chunks.Meta) error {
*lbls = (*lbls)[:0] *lbls = (*lbls)[:0]
*chks = (*chks)[:0] *chks = (*chks)[:0]