From b945098e3ad2408df919f4818e6446c1961acb84 Mon Sep 17 00:00:00 2001 From: Fabian Reinartz Date: Fri, 1 Dec 2017 12:06:37 +0100 Subject: [PATCH] index: extract decoder type --- index/index.go | 219 ++++++++++++++++++++++++++++++++++--------------- 1 file changed, 152 insertions(+), 67 deletions(-) diff --git a/index/index.go b/index/index.go index a6d673bf0..ad6dbb218 100644 --- a/index/index.go +++ b/index/index.go @@ -531,6 +531,8 @@ type Reader struct { // the block has been unmapped. symbols map[uint32]string + dec *DecoderV1 + crc32 hash.Hash32 } @@ -619,9 +621,35 @@ func newReader(b ByteSlice, c io.Closer) (*Reader, error) { if err != nil { return nil, errors.Wrap(err, "read postings table") } + + r.dec = &DecoderV1{symbols: r.symbols} + return r, nil } +// Range marks a byte range. +type Range struct { + Start, End int64 +} + +// PostingsRanges returns a new map of byte range in the underlying index file +// for all postings lists. +func (r *Reader) PostingsRanges() (map[labels.Label]Range, error) { + m := map[labels.Label]Range{} + + for l, start := range r.postings { + d := r.decbufAt(int(start)) + if d.err() != nil { + return nil, d.err() + } + m[l] = Range{ + Start: int64(start) + 4, + End: int64(start) + 4 + int64(d.len()), + } + } + return m, nil +} + func (r *Reader) readTOC() error { if r.b.Len() < indexTOCLen { return errInvalidSize @@ -750,6 +778,7 @@ func (r *Reader) readOffsetTable(off uint64, f func([]string, uint32) error) err return d.err() } +// Close the reader and its underlying resources. func (r *Reader) Close() error { return r.c.Close() } @@ -762,6 +791,7 @@ func (r *Reader) lookupSymbol(o uint32) (string, error) { return s, nil } +// Symbols returns a set of symbols that exist within the index. func (r *Reader) Symbols() (map[string]struct{}, error) { res := make(map[string]struct{}, len(r.symbols)) @@ -771,6 +801,12 @@ func (r *Reader) Symbols() (map[string]struct{}, error) { return res, nil } +// SymbolTable returns the symbol table that is used to resolve symbol references. +func (r *Reader) SymbolTable() map[uint32]string { + return r.symbols +} + +// LabelValues returns value tuples that exist for the given label name tuples. func (r *Reader) LabelValues(names ...string) (StringTuples, error) { const sep = "\xff" @@ -804,6 +840,7 @@ type emptyStringTuples struct{} func (emptyStringTuples) At(i int) ([]string, error) { return nil, nil } func (emptyStringTuples) Len() int { return 0 } +// LabelIndices returns a for which labels or label tuples value indices exist. func (r *Reader) LabelIndices() ([][]string, error) { const sep = "\xff" @@ -815,86 +852,37 @@ func (r *Reader) LabelIndices() ([][]string, error) { return res, nil } -func (r *Reader) Series(ref uint64, lbls *labels.Labels, chks *[]chunks.Meta) error { - d := r.decbufUvarintAt(int(ref)) - - *lbls = (*lbls)[:0] - *chks = (*chks)[:0] - - k := int(d.uvarint()) - - for i := 0; i < k; i++ { - lno := uint32(d.uvarint()) - lvo := uint32(d.uvarint()) - - if d.err() != nil { - return errors.Wrap(d.err(), "read series label offsets") - } - - ln, err := r.lookupSymbol(lno) - if err != nil { - return errors.Wrap(err, "lookup label name") - } - lv, err := r.lookupSymbol(lvo) - if err != nil { - return errors.Wrap(err, "lookup label value") - } - - *lbls = append(*lbls, labels.Label{Name: ln, Value: lv}) +// Series the series with the given ID and writes its labels and chunks into lbls and chks. +func (r *Reader) Series(id uint64, lbls *labels.Labels, chks *[]chunks.Meta) error { + d := r.decbufUvarintAt(int(id)) + if d.err() != nil { + return d.err() } - - // Read the chunks meta data. - k = int(d.uvarint()) - - if k == 0 { - return nil - } - - t0 := d.varint64() - maxt := int64(d.uvarint64()) + t0 - ref0 := int64(d.uvarint64()) - - *chks = append(*chks, chunks.Meta{ - Ref: uint64(ref0), - MinTime: t0, - MaxTime: maxt, - }) - t0 = maxt - - for i := 1; i < k; i++ { - mint := int64(d.uvarint64()) + t0 - maxt := int64(d.uvarint64()) + mint - - ref0 += d.varint64() - t0 = maxt - - if d.err() != nil { - return errors.Wrapf(d.err(), "read meta for chunk %d", i) - } - - *chks = append(*chks, chunks.Meta{ - Ref: uint64(ref0), - MinTime: mint, - MaxTime: maxt, - }) - } - return d.err() + return r.dec.Series(d.get(), lbls, chks) } +// Postings returns a postings list for the given label pair. func (r *Reader) Postings(name, value string) (Postings, error) { off, ok := r.postings[labels.Label{ Name: name, Value: value, }] if !ok { - return emptyPostings, nil + return EmptyPostings(), nil } d := r.decbufAt(int(off)) - d.be32() // consume unused postings list length. - - return newBigEndianPostings(d.get()), errors.Wrap(d.err(), "get postings bytes") + if d.err() != nil { + return nil, errors.Wrap(d.err(), "get postings entry") + } + _, p, err := r.dec.Postings(d.get()) + if err != nil { + return nil, errors.Wrap(err, "decode postings") + } + return p, nil } +// SortedPostings returns the given postings list reordered so that the backing series +// are sorted. func (r *Reader) SortedPostings(p Postings) Postings { return p } @@ -966,3 +954,100 @@ func (t *serializedStringTuples) At(i int) ([]string, error) { return res, nil } + +// DecoderV1 provides decoding methods for the v1 index file format. +// +// It currently does not contain decoding methods for all entry types but can be extended +// by them if there's demand. +type DecoderV1 struct { + symbols map[uint32]string +} + +func (dec *DecoderV1) lookupSymbol(o uint32) (string, error) { + s, ok := dec.symbols[o] + if !ok { + return "", errors.Errorf("unknown symbol offset %d", o) + } + return s, nil +} + +// SetSymbolTable set the symbol table to be used for lookups when decoding series +// and label indices +func (dec *DecoderV1) SetSymbolTable(t map[uint32]string) { + dec.symbols = t +} + +// Postings returns a postings list for b and its number of elements. +func (dec *DecoderV1) Postings(b []byte) (int, Postings, error) { + d := decbuf{b: b} + n := d.be32int() + l := d.get() + return n, newBigEndianPostings(l), d.err() +} + +// Series decodes a series entry from the given byte slice into lset and chks. +func (dec *DecoderV1) Series(b []byte, lbls *labels.Labels, chks *[]chunks.Meta) error { + *lbls = (*lbls)[:0] + *chks = (*chks)[:0] + + d := decbuf{b: b} + + k := int(d.uvarint()) + + for i := 0; i < k; i++ { + lno := uint32(d.uvarint()) + lvo := uint32(d.uvarint()) + + if d.err() != nil { + return errors.Wrap(d.err(), "read series label offsets") + } + + ln, err := dec.lookupSymbol(lno) + if err != nil { + return errors.Wrap(err, "lookup label name") + } + lv, err := dec.lookupSymbol(lvo) + if err != nil { + return errors.Wrap(err, "lookup label value") + } + + *lbls = append(*lbls, labels.Label{Name: ln, Value: lv}) + } + + // Read the chunks meta data. + k = int(d.uvarint()) + + if k == 0 { + return nil + } + + t0 := d.varint64() + maxt := int64(d.uvarint64()) + t0 + ref0 := int64(d.uvarint64()) + + *chks = append(*chks, chunks.Meta{ + Ref: uint64(ref0), + MinTime: t0, + MaxTime: maxt, + }) + t0 = maxt + + for i := 1; i < k; i++ { + mint := int64(d.uvarint64()) + t0 + maxt := int64(d.uvarint64()) + mint + + ref0 += d.varint64() + t0 = maxt + + if d.err() != nil { + return errors.Wrapf(d.err(), "read meta for chunk %d", i) + } + + *chks = append(*chks, chunks.Meta{ + Ref: uint64(ref0), + MinTime: mint, + MaxTime: maxt, + }) + } + return d.err() +}