index: extract decoder type

This commit is contained in:
Fabian Reinartz 2017-12-01 12:06:37 +01:00
parent 6dd2b83a7a
commit b945098e3a

View file

@ -531,6 +531,8 @@ type Reader struct {
// the block has been unmapped.
symbols map[uint32]string
dec *DecoderV1
crc32 hash.Hash32
}
@ -619,9 +621,35 @@ func newReader(b ByteSlice, c io.Closer) (*Reader, error) {
if err != nil {
return nil, errors.Wrap(err, "read postings table")
}
r.dec = &DecoderV1{symbols: r.symbols}
return r, nil
}
// Range marks a byte range.
type Range struct {
Start, End int64
}
// PostingsRanges returns a new map of byte range in the underlying index file
// for all postings lists.
func (r *Reader) PostingsRanges() (map[labels.Label]Range, error) {
m := map[labels.Label]Range{}
for l, start := range r.postings {
d := r.decbufAt(int(start))
if d.err() != nil {
return nil, d.err()
}
m[l] = Range{
Start: int64(start) + 4,
End: int64(start) + 4 + int64(d.len()),
}
}
return m, nil
}
func (r *Reader) readTOC() error {
if r.b.Len() < indexTOCLen {
return errInvalidSize
@ -750,6 +778,7 @@ func (r *Reader) readOffsetTable(off uint64, f func([]string, uint32) error) err
return d.err()
}
// Close the reader and its underlying resources.
func (r *Reader) Close() error {
return r.c.Close()
}
@ -762,6 +791,7 @@ func (r *Reader) lookupSymbol(o uint32) (string, error) {
return s, nil
}
// Symbols returns a set of symbols that exist within the index.
func (r *Reader) Symbols() (map[string]struct{}, error) {
res := make(map[string]struct{}, len(r.symbols))
@ -771,6 +801,12 @@ func (r *Reader) Symbols() (map[string]struct{}, error) {
return res, nil
}
// SymbolTable returns the symbol table that is used to resolve symbol references.
func (r *Reader) SymbolTable() map[uint32]string {
return r.symbols
}
// LabelValues returns value tuples that exist for the given label name tuples.
func (r *Reader) LabelValues(names ...string) (StringTuples, error) {
const sep = "\xff"
@ -804,6 +840,7 @@ type emptyStringTuples struct{}
func (emptyStringTuples) At(i int) ([]string, error) { return nil, nil }
func (emptyStringTuples) Len() int { return 0 }
// LabelIndices returns a for which labels or label tuples value indices exist.
func (r *Reader) LabelIndices() ([][]string, error) {
const sep = "\xff"
@ -815,86 +852,37 @@ func (r *Reader) LabelIndices() ([][]string, error) {
return res, nil
}
func (r *Reader) Series(ref uint64, lbls *labels.Labels, chks *[]chunks.Meta) error {
d := r.decbufUvarintAt(int(ref))
*lbls = (*lbls)[:0]
*chks = (*chks)[:0]
k := int(d.uvarint())
for i := 0; i < k; i++ {
lno := uint32(d.uvarint())
lvo := uint32(d.uvarint())
if d.err() != nil {
return errors.Wrap(d.err(), "read series label offsets")
}
ln, err := r.lookupSymbol(lno)
if err != nil {
return errors.Wrap(err, "lookup label name")
}
lv, err := r.lookupSymbol(lvo)
if err != nil {
return errors.Wrap(err, "lookup label value")
}
*lbls = append(*lbls, labels.Label{Name: ln, Value: lv})
// Series the series with the given ID and writes its labels and chunks into lbls and chks.
func (r *Reader) Series(id uint64, lbls *labels.Labels, chks *[]chunks.Meta) error {
d := r.decbufUvarintAt(int(id))
if d.err() != nil {
return d.err()
}
// Read the chunks meta data.
k = int(d.uvarint())
if k == 0 {
return nil
}
t0 := d.varint64()
maxt := int64(d.uvarint64()) + t0
ref0 := int64(d.uvarint64())
*chks = append(*chks, chunks.Meta{
Ref: uint64(ref0),
MinTime: t0,
MaxTime: maxt,
})
t0 = maxt
for i := 1; i < k; i++ {
mint := int64(d.uvarint64()) + t0
maxt := int64(d.uvarint64()) + mint
ref0 += d.varint64()
t0 = maxt
if d.err() != nil {
return errors.Wrapf(d.err(), "read meta for chunk %d", i)
}
*chks = append(*chks, chunks.Meta{
Ref: uint64(ref0),
MinTime: mint,
MaxTime: maxt,
})
}
return d.err()
return r.dec.Series(d.get(), lbls, chks)
}
// Postings returns a postings list for the given label pair.
func (r *Reader) Postings(name, value string) (Postings, error) {
off, ok := r.postings[labels.Label{
Name: name,
Value: value,
}]
if !ok {
return emptyPostings, nil
return EmptyPostings(), nil
}
d := r.decbufAt(int(off))
d.be32() // consume unused postings list length.
return newBigEndianPostings(d.get()), errors.Wrap(d.err(), "get postings bytes")
if d.err() != nil {
return nil, errors.Wrap(d.err(), "get postings entry")
}
_, p, err := r.dec.Postings(d.get())
if err != nil {
return nil, errors.Wrap(err, "decode postings")
}
return p, nil
}
// SortedPostings returns the given postings list reordered so that the backing series
// are sorted.
func (r *Reader) SortedPostings(p Postings) Postings {
return p
}
@ -966,3 +954,100 @@ func (t *serializedStringTuples) At(i int) ([]string, error) {
return res, nil
}
// DecoderV1 provides decoding methods for the v1 index file format.
//
// It currently does not contain decoding methods for all entry types but can be extended
// by them if there's demand.
type DecoderV1 struct {
symbols map[uint32]string
}
func (dec *DecoderV1) lookupSymbol(o uint32) (string, error) {
s, ok := dec.symbols[o]
if !ok {
return "", errors.Errorf("unknown symbol offset %d", o)
}
return s, nil
}
// SetSymbolTable set the symbol table to be used for lookups when decoding series
// and label indices
func (dec *DecoderV1) SetSymbolTable(t map[uint32]string) {
dec.symbols = t
}
// Postings returns a postings list for b and its number of elements.
func (dec *DecoderV1) Postings(b []byte) (int, Postings, error) {
d := decbuf{b: b}
n := d.be32int()
l := d.get()
return n, newBigEndianPostings(l), d.err()
}
// Series decodes a series entry from the given byte slice into lset and chks.
func (dec *DecoderV1) Series(b []byte, lbls *labels.Labels, chks *[]chunks.Meta) error {
*lbls = (*lbls)[:0]
*chks = (*chks)[:0]
d := decbuf{b: b}
k := int(d.uvarint())
for i := 0; i < k; i++ {
lno := uint32(d.uvarint())
lvo := uint32(d.uvarint())
if d.err() != nil {
return errors.Wrap(d.err(), "read series label offsets")
}
ln, err := dec.lookupSymbol(lno)
if err != nil {
return errors.Wrap(err, "lookup label name")
}
lv, err := dec.lookupSymbol(lvo)
if err != nil {
return errors.Wrap(err, "lookup label value")
}
*lbls = append(*lbls, labels.Label{Name: ln, Value: lv})
}
// Read the chunks meta data.
k = int(d.uvarint())
if k == 0 {
return nil
}
t0 := d.varint64()
maxt := int64(d.uvarint64()) + t0
ref0 := int64(d.uvarint64())
*chks = append(*chks, chunks.Meta{
Ref: uint64(ref0),
MinTime: t0,
MaxTime: maxt,
})
t0 = maxt
for i := 1; i < k; i++ {
mint := int64(d.uvarint64()) + t0
maxt := int64(d.uvarint64()) + mint
ref0 += d.varint64()
t0 = maxt
if d.err() != nil {
return errors.Wrapf(d.err(), "read meta for chunk %d", i)
}
*chks = append(*chks, chunks.Meta{
Ref: uint64(ref0),
MinTime: mint,
MaxTime: maxt,
})
}
return d.err()
}