mirror of
https://github.com/prometheus/prometheus.git
synced 2024-11-10 07:34:04 -08:00
448 lines
10 KiB
Go
448 lines
10 KiB
Go
package tsdb
|
|
|
|
import (
|
|
"encoding/binary"
|
|
"fmt"
|
|
"strings"
|
|
|
|
"github.com/fabxc/tsdb/chunks"
|
|
"github.com/fabxc/tsdb/labels"
|
|
"github.com/pkg/errors"
|
|
)
|
|
|
|
// SeriesReader provides reading access of serialized time series data.
|
|
type SeriesReader interface {
|
|
// Chunk returns the series data chunk with the given reference.
|
|
Chunk(ref uint32) (chunks.Chunk, error)
|
|
}
|
|
|
|
// seriesReader is the SeriesReader implementation that operates on a
// serialized byte stream of series data.
type seriesReader struct {
	// b is the byte slice containing the encoded series data.
	b []byte
}
|
|
|
|
func newSeriesReader(b []byte) (*seriesReader, error) {
|
|
// Verify magic number.
|
|
if m := binary.BigEndian.Uint32(b[:4]); m != MagicSeries {
|
|
return nil, fmt.Errorf("invalid magic number %x", m)
|
|
}
|
|
return &seriesReader{b: b}, nil
|
|
}
|
|
|
|
func (s *seriesReader) Chunk(offset uint32) (chunks.Chunk, error) {
|
|
if int(offset) > len(s.b) {
|
|
return nil, errors.Errorf("offset %d beyond data size %d", offset, len(s.b))
|
|
}
|
|
b := s.b[offset:]
|
|
|
|
l, n := binary.Uvarint(b)
|
|
if n < 0 {
|
|
return nil, fmt.Errorf("reading chunk length failed")
|
|
}
|
|
b = b[n:]
|
|
enc := chunks.Encoding(b[0])
|
|
|
|
c, err := chunks.FromData(enc, b[1:1+l])
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return c, nil
|
|
}
|
|
|
|
// IndexReader provides reading access of serialized index data.
|
|
type IndexReader interface {
|
|
// Stats returns statisitics about the indexed data.
|
|
Stats() (BlockStats, error)
|
|
|
|
// LabelValues returns the possible label values
|
|
LabelValues(names ...string) (StringTuples, error)
|
|
|
|
// Postings returns the postings list iterator for the label pair.
|
|
Postings(name, value string) (Postings, error)
|
|
|
|
// Series returns the series for the given reference.
|
|
Series(ref uint32, mint, maxt int64) (Series, error)
|
|
}
|
|
|
|
// StringTuples gives access to a sorted list of string tuples.
type StringTuples interface {
	// Len returns the total number of tuples in the list.
	Len() int

	// At returns the tuple at position i.
	At(i int) ([]string, error)
}
|
|
|
|
type indexReader struct {
|
|
series SeriesReader
|
|
|
|
// The underlying byte slice holding the encoded series data.
|
|
b []byte
|
|
|
|
// Cached hashmaps of section offsets.
|
|
labels map[string]uint32
|
|
postings map[string]uint32
|
|
}
|
|
|
|
// Sentinel errors returned by the readers in this file.
var (
	// errInvalidSize signals data that is shorter or longer than required.
	errInvalidSize = fmt.Errorf("invalid size")
	// errInvalidFlag signals a section carrying an unexpected flag byte.
	errInvalidFlag = fmt.Errorf("invalid flag")
	// errNotFound signals a lookup for an entry that does not exist.
	errNotFound = fmt.Errorf("not found")
)
|
|
|
|
func newIndexReader(s SeriesReader, b []byte) (*indexReader, error) {
|
|
if len(b) < 16 {
|
|
return nil, errors.Wrap(errInvalidSize, "index header")
|
|
}
|
|
r := &indexReader{
|
|
series: s,
|
|
b: b,
|
|
}
|
|
|
|
// Verify magic number.
|
|
if m := binary.BigEndian.Uint32(b[:4]); m != MagicIndex {
|
|
return nil, fmt.Errorf("invalid magic number %x", m)
|
|
}
|
|
|
|
var err error
|
|
// The last two 4 bytes hold the pointers to the hashmaps.
|
|
loff := binary.BigEndian.Uint32(b[len(b)-8 : len(b)-4])
|
|
poff := binary.BigEndian.Uint32(b[len(b)-4:])
|
|
|
|
f, b, err := r.section(loff)
|
|
if err != nil {
|
|
return nil, errors.Wrapf(err, "label index hashmap section at %d", loff)
|
|
}
|
|
if r.labels, err = readHashmap(f, b); err != nil {
|
|
return nil, errors.Wrap(err, "read label index hashmap")
|
|
}
|
|
f, b, err = r.section(poff)
|
|
if err != nil {
|
|
return nil, errors.Wrapf(err, "postings hashmap section at %d", loff)
|
|
}
|
|
if r.postings, err = readHashmap(f, b); err != nil {
|
|
return nil, errors.Wrap(err, "read postings hashmap")
|
|
}
|
|
|
|
return r, nil
|
|
}
|
|
|
|
func readHashmap(flag byte, b []byte) (map[string]uint32, error) {
|
|
if flag != flagStd {
|
|
return nil, errInvalidFlag
|
|
}
|
|
h := make(map[string]uint32, 512)
|
|
|
|
for len(b) > 0 {
|
|
l, n := binary.Uvarint(b)
|
|
if n < 1 {
|
|
return nil, errors.Wrap(errInvalidSize, "read key length")
|
|
}
|
|
b = b[n:]
|
|
|
|
if len(b) < int(l) {
|
|
return nil, errors.Wrap(errInvalidSize, "read key")
|
|
}
|
|
s := string(b[:l])
|
|
b = b[l:]
|
|
|
|
o, n := binary.Uvarint(b)
|
|
if n < 1 {
|
|
return nil, errors.Wrap(errInvalidSize, "read offset value")
|
|
}
|
|
b = b[n:]
|
|
|
|
h[s] = uint32(o)
|
|
}
|
|
|
|
return h, nil
|
|
}
|
|
|
|
func (r *indexReader) section(o uint32) (byte, []byte, error) {
|
|
b := r.b[o:]
|
|
|
|
if len(b) < 5 {
|
|
return 0, nil, errors.Wrap(errInvalidSize, "read header")
|
|
}
|
|
|
|
flag := b[0]
|
|
l := binary.BigEndian.Uint32(b[1:5])
|
|
|
|
b = b[5:]
|
|
|
|
// b must have the given length plus 4 bytes for the CRC32 checksum.
|
|
if len(b) < int(l)+4 {
|
|
return 0, nil, errors.Wrap(errInvalidSize, "section content")
|
|
}
|
|
return flag, b[:l], nil
|
|
}
|
|
|
|
func (r *indexReader) lookupSymbol(o uint32) ([]byte, error) {
|
|
l, n := binary.Uvarint(r.b[o:])
|
|
if n < 0 {
|
|
return nil, fmt.Errorf("reading symbol length failed")
|
|
}
|
|
|
|
end := int(o) + n + int(l)
|
|
if end > len(r.b) {
|
|
return nil, fmt.Errorf("invalid length")
|
|
}
|
|
|
|
return r.b[int(o)+n : end], nil
|
|
}
|
|
|
|
func (r *indexReader) Stats() (BlockStats, error) {
|
|
flag, b, err := r.section(8)
|
|
if err != nil {
|
|
return BlockStats{}, err
|
|
}
|
|
if flag != flagStd {
|
|
return BlockStats{}, errInvalidFlag
|
|
}
|
|
|
|
if len(b) != 64 {
|
|
return BlockStats{}, errInvalidSize
|
|
}
|
|
|
|
return BlockStats{
|
|
MinTime: int64(binary.BigEndian.Uint64(b)),
|
|
MaxTime: int64(binary.BigEndian.Uint64(b[8:])),
|
|
SeriesCount: binary.BigEndian.Uint32(b[16:]),
|
|
ChunkCount: binary.BigEndian.Uint32(b[20:]),
|
|
SampleCount: binary.BigEndian.Uint64(b[24:]),
|
|
}, nil
|
|
}
|
|
|
|
func (r *indexReader) LabelValues(names ...string) (StringTuples, error) {
|
|
key := strings.Join(names, string(sep))
|
|
off, ok := r.labels[key]
|
|
if !ok {
|
|
return nil, fmt.Errorf("label index doesn't exist")
|
|
}
|
|
|
|
flag, b, err := r.section(off)
|
|
if err != nil {
|
|
return nil, errors.Wrapf(err, "section at %d", off)
|
|
}
|
|
if flag != flagStd {
|
|
return nil, errInvalidFlag
|
|
}
|
|
l, n := binary.Uvarint(b)
|
|
if n < 1 {
|
|
return nil, errors.Wrap(errInvalidSize, "read label index size")
|
|
}
|
|
|
|
st := &serializedStringTuples{
|
|
l: int(l),
|
|
b: b[n:],
|
|
lookup: r.lookupSymbol,
|
|
}
|
|
return st, nil
|
|
}
|
|
|
|
func (r *indexReader) Series(ref uint32, mint, maxt int64) (Series, error) {
|
|
k, n := binary.Uvarint(r.b[ref:])
|
|
if n < 1 {
|
|
return nil, errors.Wrap(errInvalidSize, "number of labels")
|
|
}
|
|
|
|
b := r.b[int(ref)+n:]
|
|
offsets := make([]uint32, 0, 2*k)
|
|
|
|
for i := 0; i < 2*int(k); i++ {
|
|
o, n := binary.Uvarint(b)
|
|
if n < 1 {
|
|
return nil, errors.Wrap(errInvalidSize, "symbol offset")
|
|
}
|
|
offsets = append(offsets, uint32(o))
|
|
|
|
b = b[n:]
|
|
}
|
|
// Symbol offests must occur in pairs representing name and value.
|
|
if len(offsets)&1 != 0 {
|
|
return nil, errors.New("odd number of symbol references")
|
|
}
|
|
|
|
// TODO(fabxc): Fully materialize series symbols for now. Figure out later if it
|
|
// makes sense to decode those lazily.
|
|
// If we use unsafe strings the there'll be no copy overhead.
|
|
//
|
|
// The references are expected to be sorted and match the order of
|
|
// the underlying strings.
|
|
lbls := make(labels.Labels, 0, k)
|
|
|
|
for i := 0; i < len(offsets); i += 2 {
|
|
n, err := r.lookupSymbol(offsets[i])
|
|
if err != nil {
|
|
return nil, errors.Wrap(err, "symbol lookup")
|
|
}
|
|
v, err := r.lookupSymbol(offsets[i+1])
|
|
if err != nil {
|
|
return nil, errors.Wrap(err, "symbol lookup")
|
|
}
|
|
lbls = append(lbls, labels.Label{
|
|
Name: string(n),
|
|
Value: string(v),
|
|
})
|
|
}
|
|
|
|
// Read the chunks meta data.
|
|
k, n = binary.Uvarint(b)
|
|
if n < 1 {
|
|
return nil, errors.Wrap(errInvalidSize, "number of chunks")
|
|
}
|
|
|
|
b = b[n:]
|
|
chunks := make([]ChunkMeta, 0, k)
|
|
|
|
for i := 0; i < int(k); i++ {
|
|
firstTime, n := binary.Varint(b)
|
|
if n < 1 {
|
|
return nil, errors.Wrap(errInvalidSize, "first time")
|
|
}
|
|
b = b[n:]
|
|
|
|
// Terminate early if we exceeded the queried time range.
|
|
if firstTime > maxt {
|
|
break
|
|
}
|
|
|
|
lastTime, n := binary.Varint(b)
|
|
if n < 1 {
|
|
return nil, errors.Wrap(errInvalidSize, "last time")
|
|
}
|
|
b = b[n:]
|
|
|
|
o, n := binary.Uvarint(b)
|
|
if n < 1 {
|
|
return nil, errors.Wrap(errInvalidSize, "chunk offset")
|
|
}
|
|
b = b[n:]
|
|
|
|
// Skip the chunk if it is before the queried time range.
|
|
if lastTime < mint {
|
|
continue
|
|
}
|
|
|
|
chunks = append(chunks, ChunkMeta{
|
|
Ref: uint32(o),
|
|
MinTime: firstTime,
|
|
MaxTime: lastTime,
|
|
})
|
|
}
|
|
// If no chunks applicable to the time range were found, the series
|
|
// can be skipped.
|
|
if len(chunks) == 0 {
|
|
return nil, nil
|
|
}
|
|
|
|
return &chunkSeries{
|
|
labels: lbls,
|
|
chunks: chunks,
|
|
chunk: r.series.Chunk,
|
|
}, nil
|
|
}
|
|
|
|
func (r *indexReader) Postings(name, value string) (Postings, error) {
|
|
key := name + string(sep) + value
|
|
|
|
off, ok := r.postings[key]
|
|
if !ok {
|
|
return nil, errNotFound
|
|
}
|
|
|
|
flag, b, err := r.section(off)
|
|
if err != nil {
|
|
return nil, errors.Wrapf(err, "section at %d", off)
|
|
}
|
|
|
|
if flag != flagStd {
|
|
return nil, errors.Wrapf(errInvalidFlag, "section at %d", off)
|
|
}
|
|
|
|
// TODO(fabxc): just read into memory as an intermediate solution.
|
|
// Add iterator over serialized data.
|
|
var l []uint32
|
|
|
|
for len(b) > 0 {
|
|
if len(b) < 4 {
|
|
return nil, errors.Wrap(errInvalidSize, "plain postings entry")
|
|
}
|
|
l = append(l, binary.BigEndian.Uint32(b[:4]))
|
|
|
|
b = b[4:]
|
|
}
|
|
|
|
return &listPostings{list: l, idx: -1}, nil
|
|
}
|
|
|
|
// stringTuples is an in-memory list of string tuples of equal length that
// are stored back to back in a single flat slice.
type stringTuples struct {
	// l is the tuple length.
	l int

	// s holds the flattened tuple entries.
	s []string
}
|
|
|
|
func newStringTuples(s []string, l int) (*stringTuples, error) {
|
|
if len(s)%l != 0 {
|
|
return nil, errors.Wrap(errInvalidSize, "string tuple list")
|
|
}
|
|
return &stringTuples{s: s, l: l}, nil
|
|
}
|
|
|
|
func (t *stringTuples) Len() int { return len(t.s) / t.l }
|
|
func (t *stringTuples) At(i int) ([]string, error) { return t.s[i : i+t.l], nil }
|
|
|
|
func (t *stringTuples) Swap(i, j int) {
|
|
c := make([]string, t.l)
|
|
copy(c, t.s[i:i+t.l])
|
|
|
|
for k := 0; k < t.l; k++ {
|
|
t.s[i+k] = t.s[j+k]
|
|
t.s[j+k] = c[k]
|
|
}
|
|
}
|
|
|
|
func (t *stringTuples) Less(i, j int) bool {
|
|
for k := 0; k < t.l; k++ {
|
|
d := strings.Compare(t.s[i+k], t.s[j+k])
|
|
|
|
if d < 0 {
|
|
return true
|
|
}
|
|
if d > 0 {
|
|
return false
|
|
}
|
|
}
|
|
return false
|
|
}
|
|
|
|
// serializedStringTuples is a list of string tuples that stays in its
// serialized form; the strings are resolved through lookup on access.
type serializedStringTuples struct {
	// l is the number of entries per tuple.
	l int

	// b holds the serialized tuples; every entry is a 4-byte big-endian
	// offset that is passed to lookup.
	b []byte

	// lookup returns the bytes of the symbol stored at the given offset.
	lookup func(uint32) ([]byte, error)
}
|
|
|
|
func (t *serializedStringTuples) Len() int {
|
|
// TODO(fabxc): Cache this?
|
|
return len(t.b) / (4 * t.l)
|
|
}
|
|
|
|
func (t *serializedStringTuples) At(i int) ([]string, error) {
|
|
if len(t.b) < (i+t.l)*4 {
|
|
return nil, errInvalidSize
|
|
}
|
|
res := make([]string, 0, t.l)
|
|
|
|
for k := 0; k < t.l; k++ {
|
|
offset := binary.BigEndian.Uint32(t.b[(i+k)*4:])
|
|
|
|
b, err := t.lookup(offset)
|
|
if err != nil {
|
|
return nil, errors.Wrap(err, "symbol lookup")
|
|
}
|
|
res = append(res, string(b))
|
|
}
|
|
|
|
return res, nil
|
|
}
|