mirror of
https://github.com/prometheus/prometheus.git
synced 2025-02-21 03:16:00 -08:00
merge BlockMeta and blockMeta struct, implement v1 and v2 index readers
This commit is contained in:
parent
77eb9fb56e
commit
129773b41a
18
block.go
18
block.go
|
@ -151,6 +151,9 @@ type BlockMeta struct {
|
|||
|
||||
// Information on compactions the block was created from.
|
||||
Compaction BlockMetaCompaction `json:"compaction"`
|
||||
|
||||
// Version of the index format
|
||||
Version int `json:"version"`
|
||||
}
|
||||
|
||||
// BlockStats contains stats about contents of a block.
|
||||
|
@ -176,12 +179,6 @@ const (
|
|||
flagStd = 1
|
||||
)
|
||||
|
||||
type blockMeta struct {
|
||||
Version int `json:"version"`
|
||||
|
||||
*BlockMeta
|
||||
}
|
||||
|
||||
const indexFilename = "index"
|
||||
const metaFilename = "meta.json"
|
||||
|
||||
|
@ -193,16 +190,16 @@ func readMetaFile(dir string) (*BlockMeta, error) {
|
|||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
var m blockMeta
|
||||
var m BlockMeta
|
||||
|
||||
if err := json.Unmarshal(b, &m); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if m.Version != 1 {
|
||||
if m.Version != 1 && m.Version != 2 {
|
||||
return nil, errors.Errorf("unexpected meta file version %d", m.Version)
|
||||
}
|
||||
|
||||
return m.BlockMeta, nil
|
||||
return &m, nil
|
||||
}
|
||||
|
||||
func writeMetaFile(dir string, meta *BlockMeta) error {
|
||||
|
@ -219,7 +216,8 @@ func writeMetaFile(dir string, meta *BlockMeta) error {
|
|||
enc.SetIndent("", "\t")
|
||||
|
||||
var merr MultiError
|
||||
if merr.Add(enc.Encode(&blockMeta{Version: 1, BlockMeta: meta})); merr.Err() != nil {
|
||||
meta.Version = 2
|
||||
if merr.Add(enc.Encode(meta)); merr.Err() != nil {
|
||||
merr.Add(f.Close())
|
||||
return merr.Err()
|
||||
}
|
||||
|
|
128
index/index.go
128
index/index.go
|
@ -533,7 +533,7 @@ type Reader struct {
|
|||
// the block has been unmapped.
|
||||
symbols map[uint32]string
|
||||
|
||||
dec *DecoderV1
|
||||
dec Decoder
|
||||
|
||||
crc32 hash.Hash32
|
||||
}
|
||||
|
@ -566,7 +566,15 @@ func (b realByteSlice) Sub(start, end int) ByteSlice {
|
|||
|
||||
// NewReader returns a new IndexReader on the given byte slice.
|
||||
func NewReader(b ByteSlice) (*Reader, error) {
|
||||
return newReader(b, nil)
|
||||
return newReader(b, nil, nil)
|
||||
}
|
||||
|
||||
func NewReaderV1(b ByteSlice, c io.Closer, d DecoderV1) (*Reader, error) {
|
||||
return newReader(b, c, d)
|
||||
}
|
||||
|
||||
func NewReaderV2(b ByteSlice, c io.Closer, d DecoderV1) (*Reader, error) {
|
||||
return newReader(b, c, d)
|
||||
}
|
||||
|
||||
// NewFileReader returns a new index reader against the given index file.
|
||||
|
@ -575,10 +583,10 @@ func NewFileReader(path string) (*Reader, error) {
|
|||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return newReader(realByteSlice(f.Bytes()), f)
|
||||
return newReader(realByteSlice(f.Bytes()), f, nil)
|
||||
}
|
||||
|
||||
func newReader(b ByteSlice, c io.Closer) (*Reader, error) {
|
||||
func newReader(b ByteSlice, c io.Closer, d Decoder) (*Reader, error) {
|
||||
r := &Reader{
|
||||
b: b,
|
||||
c: c,
|
||||
|
@ -957,15 +965,23 @@ func (t *serializedStringTuples) At(i int) ([]string, error) {
|
|||
return res, nil
|
||||
}
|
||||
|
||||
// DecoderV1 provides decoding methods for the v1 index file format.
|
||||
// Decoder provides decoding methods for the v1 and v2 index file format.
|
||||
//
|
||||
// It currently does not contain decoding methods for all entry types but can be extended
|
||||
// by them if there's demand.
|
||||
type Decoder interface {
|
||||
lookupSymbol(o uint32) (string, error)
|
||||
SetSymbolTable(t map[uint32]string)
|
||||
Postings(b []byte) (int, Postings, error)
|
||||
Series(b []byte, lbls *labels.Labels, chks *[]chunks.Meta) error
|
||||
}
|
||||
|
||||
// DecoderV1 is used for dedcoding V1 of the index file.
|
||||
type DecoderV1 struct {
|
||||
symbols map[uint32]string
|
||||
}
|
||||
|
||||
func (dec *DecoderV1) lookupSymbol(o uint32) (string, error) {
|
||||
func (dec DecoderV1) lookupSymbol(o uint32) (string, error) {
|
||||
s, ok := dec.symbols[o]
|
||||
if !ok {
|
||||
return "", errors.Errorf("unknown symbol offset %d", o)
|
||||
|
@ -975,12 +991,12 @@ func (dec *DecoderV1) lookupSymbol(o uint32) (string, error) {
|
|||
|
||||
// SetSymbolTable set the symbol table to be used for lookups when decoding series
|
||||
// and label indices
|
||||
func (dec *DecoderV1) SetSymbolTable(t map[uint32]string) {
|
||||
func (dec DecoderV1) SetSymbolTable(t map[uint32]string) {
|
||||
dec.symbols = t
|
||||
}
|
||||
|
||||
// Postings returns a postings list for b and its number of elements.
|
||||
func (dec *DecoderV1) Postings(b []byte) (int, Postings, error) {
|
||||
func (dec DecoderV1) Postings(b []byte) (int, Postings, error) {
|
||||
d := decbuf{b: b}
|
||||
n := d.be32int()
|
||||
l := d.get()
|
||||
|
@ -988,7 +1004,101 @@ func (dec *DecoderV1) Postings(b []byte) (int, Postings, error) {
|
|||
}
|
||||
|
||||
// Series decodes a series entry from the given byte slice into lset and chks.
|
||||
func (dec *DecoderV1) Series(b []byte, lbls *labels.Labels, chks *[]chunks.Meta) error {
|
||||
func (dec DecoderV1) Series(b []byte, lbls *labels.Labels, chks *[]chunks.Meta) error {
|
||||
*lbls = (*lbls)[:0]
|
||||
*chks = (*chks)[:0]
|
||||
|
||||
d := decbuf{b: b}
|
||||
|
||||
k := int(d.uvarint())
|
||||
|
||||
for i := 0; i < k; i++ {
|
||||
lno := uint32(d.uvarint())
|
||||
lvo := uint32(d.uvarint())
|
||||
|
||||
if d.err() != nil {
|
||||
return errors.Wrap(d.err(), "read series label offsets")
|
||||
}
|
||||
|
||||
ln, err := dec.lookupSymbol(lno)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "lookup label name")
|
||||
}
|
||||
lv, err := dec.lookupSymbol(lvo)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "lookup label value")
|
||||
}
|
||||
|
||||
*lbls = append(*lbls, labels.Label{Name: ln, Value: lv})
|
||||
}
|
||||
|
||||
// Read the chunks meta data.
|
||||
k = int(d.uvarint())
|
||||
|
||||
if k == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
t0 := d.varint64()
|
||||
maxt := int64(d.uvarint64()) + t0
|
||||
ref0 := int64(d.uvarint64())
|
||||
|
||||
*chks = append(*chks, chunks.Meta{
|
||||
Ref: uint64(ref0),
|
||||
MinTime: t0,
|
||||
MaxTime: maxt,
|
||||
})
|
||||
t0 = maxt
|
||||
|
||||
for i := 1; i < k; i++ {
|
||||
mint := int64(d.uvarint64()) + t0
|
||||
maxt := int64(d.uvarint64()) + mint
|
||||
|
||||
ref0 += d.varint64()
|
||||
t0 = maxt
|
||||
|
||||
if d.err() != nil {
|
||||
return errors.Wrapf(d.err(), "read meta for chunk %d", i)
|
||||
}
|
||||
|
||||
*chks = append(*chks, chunks.Meta{
|
||||
Ref: uint64(ref0),
|
||||
MinTime: mint,
|
||||
MaxTime: maxt,
|
||||
})
|
||||
}
|
||||
return d.err()
|
||||
}
|
||||
|
||||
// DecoderV2 is used for dedcoding V2 of the index file.
|
||||
type DecoderV2 struct {
|
||||
symbols map[uint32]string
|
||||
}
|
||||
|
||||
func (dec *DecoderV2) lookupSymbol(o uint32) (string, error) {
|
||||
s, ok := dec.symbols[o]
|
||||
if !ok {
|
||||
return "", errors.Errorf("unknown symbol offset %d", o)
|
||||
}
|
||||
return s, nil
|
||||
}
|
||||
|
||||
// SetSymbolTable set the symbol table to be used for lookups when decoding series
|
||||
// and label indices
|
||||
func (dec *DecoderV2) SetSymbolTable(t map[uint32]string) {
|
||||
dec.symbols = t
|
||||
}
|
||||
|
||||
// Postings returns a postings list for b and its number of elements.
|
||||
func (dec *DecoderV2) Postings(b []byte) (int, Postings, error) {
|
||||
d := decbuf{b: b}
|
||||
n := d.be32int()
|
||||
l := d.get()
|
||||
return n, newBigEndianPostings(l), d.err()
|
||||
}
|
||||
|
||||
// Series decodes a series entry from the given byte slice into lset and chks.
|
||||
func (dec DecoderV2) Series(b []byte, lbls *labels.Labels, chks *[]chunks.Meta) error {
|
||||
*lbls = (*lbls)[:0]
|
||||
*chks = (*chks)[:0]
|
||||
|
||||
|
|
Loading…
Reference in a new issue