mirror of
https://github.com/prometheus/prometheus.git
synced 2024-12-26 06:04:05 -08:00
Use contextualized and traced errors in reader
This commit is contained in:
parent
282d9ae6e2
commit
00a503129b
10
block.go
10
block.go
|
@ -15,6 +15,14 @@ type block interface {
|
||||||
interval() (int64, int64)
|
interval() (int64, int64)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type BlockStats struct {
|
||||||
|
MinTime int64
|
||||||
|
MaxTime int64
|
||||||
|
SampleCount uint64
|
||||||
|
SeriesCount uint32
|
||||||
|
ChunkCount uint32
|
||||||
|
}
|
||||||
|
|
||||||
const (
|
const (
|
||||||
flagNone = 0
|
flagNone = 0
|
||||||
flagStd = 1
|
flagStd = 1
|
||||||
|
@ -33,6 +41,7 @@ func newPersistedBlock(path string) (*persistedBlock, error) {
|
||||||
// The directory must be named after the base timestamp for the block.
|
// The directory must be named after the base timestamp for the block.
|
||||||
// TODO(fabxc): validate match of name and stats time, validate magic.
|
// TODO(fabxc): validate match of name and stats time, validate magic.
|
||||||
|
|
||||||
|
fmt.Println("new persisted block", path)
|
||||||
// mmap files belonging to the block.
|
// mmap files belonging to the block.
|
||||||
chunksf, err := openMmapFile(chunksFileName(path))
|
chunksf, err := openMmapFile(chunksFileName(path))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -56,6 +65,7 @@ func newPersistedBlock(path string) (*persistedBlock, error) {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
fmt.Println("initialized new persisted block with", stats)
|
||||||
|
|
||||||
pb := &persistedBlock{
|
pb := &persistedBlock{
|
||||||
chunksf: chunksf,
|
chunksf: chunksf,
|
||||||
|
|
6
db.go
6
db.go
|
@ -275,7 +275,11 @@ func (s *Shard) appendBatch(ts int64, samples []Sample) error {
|
||||||
if s.head.stats.SampleCount/uint64(s.head.stats.ChunkCount) > 400 {
|
if s.head.stats.SampleCount/uint64(s.head.stats.ChunkCount) > 400 {
|
||||||
select {
|
select {
|
||||||
case s.persistCh <- struct{}{}:
|
case s.persistCh <- struct{}{}:
|
||||||
go s.persist()
|
go func() {
|
||||||
|
if err := s.persist(); err != nil {
|
||||||
|
s.logger.Log("msg", "persistance error", "err", err)
|
||||||
|
}
|
||||||
|
}()
|
||||||
default:
|
default:
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
64
reader.go
64
reader.go
|
@ -6,6 +6,7 @@ import (
|
||||||
"strings"
|
"strings"
|
||||||
|
|
||||||
"github.com/fabxc/tsdb/chunks"
|
"github.com/fabxc/tsdb/chunks"
|
||||||
|
"github.com/pkg/errors"
|
||||||
)
|
)
|
||||||
|
|
||||||
// SeriesReader provides reading access of serialized time series data.
|
// SeriesReader provides reading access of serialized time series data.
|
||||||
|
@ -88,7 +89,7 @@ var (
|
||||||
|
|
||||||
func newIndexReader(s SeriesReader, b []byte) (*indexReader, error) {
|
func newIndexReader(s SeriesReader, b []byte) (*indexReader, error) {
|
||||||
if len(b) < 16 {
|
if len(b) < 16 {
|
||||||
return nil, errInvalidSize
|
return nil, errors.Wrap(errInvalidSize, "index header")
|
||||||
}
|
}
|
||||||
r := &indexReader{
|
r := &indexReader{
|
||||||
series: s,
|
series: s,
|
||||||
|
@ -105,20 +106,25 @@ func newIndexReader(s SeriesReader, b []byte) (*indexReader, error) {
|
||||||
loff := binary.BigEndian.Uint32(b[len(b)-8 : len(b)-4])
|
loff := binary.BigEndian.Uint32(b[len(b)-8 : len(b)-4])
|
||||||
poff := binary.BigEndian.Uint32(b[len(b)-4:])
|
poff := binary.BigEndian.Uint32(b[len(b)-4:])
|
||||||
|
|
||||||
if r.labels, err = readHashmap(r.section(loff)); err != nil {
|
f, b, err := r.section(loff)
|
||||||
return nil, err
|
if err != nil {
|
||||||
|
return nil, errors.Wrapf(err, "label index hashmap section at %d", loff)
|
||||||
}
|
}
|
||||||
if r.postings, err = readHashmap(r.section(poff)); err != nil {
|
if r.labels, err = readHashmap(f, b); err != nil {
|
||||||
return nil, err
|
return nil, errors.Wrap(err, "read label index hashmap")
|
||||||
|
}
|
||||||
|
f, b, err = r.section(poff)
|
||||||
|
if err != nil {
|
||||||
|
return nil, errors.Wrapf(err, "postings hashmap section at %d", loff)
|
||||||
|
}
|
||||||
|
if r.postings, err = readHashmap(f, b); err != nil {
|
||||||
|
return nil, errors.Wrap(err, "read postings hashmap")
|
||||||
}
|
}
|
||||||
|
|
||||||
return r, nil
|
return r, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func readHashmap(flag byte, b []byte, err error) (map[string]uint32, error) {
|
func readHashmap(flag byte, b []byte) (map[string]uint32, error) {
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
if flag != flagStd {
|
if flag != flagStd {
|
||||||
return nil, errInvalidFlag
|
return nil, errInvalidFlag
|
||||||
}
|
}
|
||||||
|
@ -127,19 +133,19 @@ func readHashmap(flag byte, b []byte, err error) (map[string]uint32, error) {
|
||||||
for len(b) > 0 {
|
for len(b) > 0 {
|
||||||
l, n := binary.Uvarint(b)
|
l, n := binary.Uvarint(b)
|
||||||
if n < 1 {
|
if n < 1 {
|
||||||
return nil, errInvalidSize
|
return nil, errors.Wrap(errInvalidSize, "read key length")
|
||||||
}
|
}
|
||||||
b = b[n:]
|
b = b[n:]
|
||||||
|
|
||||||
if len(b) < int(l) {
|
if len(b) < int(l) {
|
||||||
return nil, errInvalidSize
|
return nil, errors.Wrap(errInvalidSize, "read key")
|
||||||
}
|
}
|
||||||
s := string(b[:l])
|
s := string(b[:l])
|
||||||
b = b[l:]
|
b = b[l:]
|
||||||
|
|
||||||
o, n := binary.Uvarint(b)
|
o, n := binary.Uvarint(b)
|
||||||
if n < 1 {
|
if n < 1 {
|
||||||
return nil, errInvalidSize
|
return nil, errors.Wrap(errInvalidSize, "read offset value")
|
||||||
}
|
}
|
||||||
b = b[n:]
|
b = b[n:]
|
||||||
|
|
||||||
|
@ -153,7 +159,7 @@ func (r *indexReader) section(o uint32) (byte, []byte, error) {
|
||||||
b := r.b[o:]
|
b := r.b[o:]
|
||||||
|
|
||||||
if len(b) < 5 {
|
if len(b) < 5 {
|
||||||
return 0, nil, errInvalidSize
|
return 0, nil, errors.Wrap(errInvalidSize, "read header")
|
||||||
}
|
}
|
||||||
|
|
||||||
flag := b[0]
|
flag := b[0]
|
||||||
|
@ -163,7 +169,7 @@ func (r *indexReader) section(o uint32) (byte, []byte, error) {
|
||||||
|
|
||||||
// b must have the given length plus 4 bytes for the CRC32 checksum.
|
// b must have the given length plus 4 bytes for the CRC32 checksum.
|
||||||
if len(b) < int(l)+4 {
|
if len(b) < int(l)+4 {
|
||||||
return 0, nil, errInvalidSize
|
return 0, nil, errors.Wrap(errInvalidSize, "section content")
|
||||||
}
|
}
|
||||||
return flag, b[:l], nil
|
return flag, b[:l], nil
|
||||||
}
|
}
|
||||||
|
@ -213,14 +219,14 @@ func (r *indexReader) LabelValues(names ...string) (StringTuples, error) {
|
||||||
|
|
||||||
flag, b, err := r.section(off)
|
flag, b, err := r.section(off)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("section: %s", err)
|
return nil, errors.Wrapf(err, "section at %d", off)
|
||||||
}
|
}
|
||||||
if flag != flagStd {
|
if flag != flagStd {
|
||||||
return nil, errInvalidFlag
|
return nil, errInvalidFlag
|
||||||
}
|
}
|
||||||
l, n := binary.Uvarint(b)
|
l, n := binary.Uvarint(b)
|
||||||
if n < 1 {
|
if n < 1 {
|
||||||
return nil, errInvalidSize
|
return nil, errors.Wrap(errInvalidSize, "read label index size")
|
||||||
}
|
}
|
||||||
|
|
||||||
st := &serializedStringTuples{
|
st := &serializedStringTuples{
|
||||||
|
@ -234,7 +240,7 @@ func (r *indexReader) LabelValues(names ...string) (StringTuples, error) {
|
||||||
func (r *indexReader) Series(ref uint32, mint, maxt int64) (Series, error) {
|
func (r *indexReader) Series(ref uint32, mint, maxt int64) (Series, error) {
|
||||||
k, n := binary.Uvarint(r.b[ref:])
|
k, n := binary.Uvarint(r.b[ref:])
|
||||||
if n < 1 {
|
if n < 1 {
|
||||||
return nil, errInvalidSize
|
return nil, errors.Wrap(errInvalidSize, "number of labels")
|
||||||
}
|
}
|
||||||
|
|
||||||
b := r.b[int(ref)+n:]
|
b := r.b[int(ref)+n:]
|
||||||
|
@ -243,7 +249,7 @@ func (r *indexReader) Series(ref uint32, mint, maxt int64) (Series, error) {
|
||||||
for i := 0; i < int(k); i++ {
|
for i := 0; i < int(k); i++ {
|
||||||
o, n := binary.Uvarint(b)
|
o, n := binary.Uvarint(b)
|
||||||
if n < 1 {
|
if n < 1 {
|
||||||
return nil, errInvalidSize
|
return nil, errors.Wrap(errInvalidSize, "symbol offset")
|
||||||
}
|
}
|
||||||
offsets = append(offsets, uint32(o))
|
offsets = append(offsets, uint32(o))
|
||||||
|
|
||||||
|
@ -251,7 +257,7 @@ func (r *indexReader) Series(ref uint32, mint, maxt int64) (Series, error) {
|
||||||
}
|
}
|
||||||
// Symbol offests must occur in pairs representing name and value.
|
// Symbol offests must occur in pairs representing name and value.
|
||||||
if len(offsets)&1 != 0 {
|
if len(offsets)&1 != 0 {
|
||||||
return nil, errInvalidSize
|
return nil, errors.New("odd number of symbol references")
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO(fabxc): Fully materialize series symbols for now. Figure out later if it
|
// TODO(fabxc): Fully materialize series symbols for now. Figure out later if it
|
||||||
|
@ -265,11 +271,11 @@ func (r *indexReader) Series(ref uint32, mint, maxt int64) (Series, error) {
|
||||||
for i := 0; i < int(k); i += 2 {
|
for i := 0; i < int(k); i += 2 {
|
||||||
n, err := r.lookupSymbol(offsets[i])
|
n, err := r.lookupSymbol(offsets[i])
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, errors.Wrap(err, "symbol lookup")
|
||||||
}
|
}
|
||||||
v, err := r.lookupSymbol(offsets[i+1])
|
v, err := r.lookupSymbol(offsets[i+1])
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, errors.Wrap(err, "symbol lookup")
|
||||||
}
|
}
|
||||||
labels = append(labels, Label{
|
labels = append(labels, Label{
|
||||||
Name: string(n),
|
Name: string(n),
|
||||||
|
@ -280,7 +286,7 @@ func (r *indexReader) Series(ref uint32, mint, maxt int64) (Series, error) {
|
||||||
// Read the chunks meta data.
|
// Read the chunks meta data.
|
||||||
k, n = binary.Uvarint(b)
|
k, n = binary.Uvarint(b)
|
||||||
if n < 1 {
|
if n < 1 {
|
||||||
return nil, errInvalidSize
|
return nil, errors.Wrap(errInvalidSize, "number of chunks")
|
||||||
}
|
}
|
||||||
|
|
||||||
b = b[n:]
|
b = b[n:]
|
||||||
|
@ -289,7 +295,7 @@ func (r *indexReader) Series(ref uint32, mint, maxt int64) (Series, error) {
|
||||||
for i := 0; i < int(k); i++ {
|
for i := 0; i < int(k); i++ {
|
||||||
firstTime, n := binary.Varint(b)
|
firstTime, n := binary.Varint(b)
|
||||||
if n < 1 {
|
if n < 1 {
|
||||||
return nil, errInvalidSize
|
return nil, errors.Wrap(errInvalidSize, "first time")
|
||||||
}
|
}
|
||||||
b = b[n:]
|
b = b[n:]
|
||||||
|
|
||||||
|
@ -300,13 +306,13 @@ func (r *indexReader) Series(ref uint32, mint, maxt int64) (Series, error) {
|
||||||
|
|
||||||
lastTime, n := binary.Varint(b)
|
lastTime, n := binary.Varint(b)
|
||||||
if n < 1 {
|
if n < 1 {
|
||||||
return nil, errInvalidSize
|
return nil, errors.Wrap(errInvalidSize, "last time")
|
||||||
}
|
}
|
||||||
b = b[n:]
|
b = b[n:]
|
||||||
|
|
||||||
o, n := binary.Uvarint(b)
|
o, n := binary.Uvarint(b)
|
||||||
if n < 1 {
|
if n < 1 {
|
||||||
return nil, errInvalidSize
|
return nil, errors.Wrap(errInvalidSize, "chunk offset")
|
||||||
}
|
}
|
||||||
b = b[n:]
|
b = b[n:]
|
||||||
|
|
||||||
|
@ -344,11 +350,11 @@ func (r *indexReader) Postings(name, value string) (Postings, error) {
|
||||||
|
|
||||||
flag, b, err := r.section(off)
|
flag, b, err := r.section(off)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, errors.Wrapf(err, "section at %d", off)
|
||||||
}
|
}
|
||||||
|
|
||||||
if flag != flagStd {
|
if flag != flagStd {
|
||||||
return nil, errInvalidFlag
|
return nil, errors.Wrapf(errInvalidFlag, "section at %d", off)
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO(fabxc): just read into memory as an intermediate solution.
|
// TODO(fabxc): just read into memory as an intermediate solution.
|
||||||
|
@ -357,7 +363,7 @@ func (r *indexReader) Postings(name, value string) (Postings, error) {
|
||||||
|
|
||||||
for len(b) > 0 {
|
for len(b) > 0 {
|
||||||
if len(b) < 4 {
|
if len(b) < 4 {
|
||||||
return nil, errInvalidSize
|
return nil, errors.Wrap(errInvalidSize, "plain postings entry")
|
||||||
}
|
}
|
||||||
l = append(l, binary.BigEndian.Uint32(b[:4]))
|
l = append(l, binary.BigEndian.Uint32(b[:4]))
|
||||||
|
|
||||||
|
@ -374,7 +380,7 @@ type stringTuples struct {
|
||||||
|
|
||||||
func newStringTuples(s []string, l int) (*stringTuples, error) {
|
func newStringTuples(s []string, l int) (*stringTuples, error) {
|
||||||
if len(s)%l != 0 {
|
if len(s)%l != 0 {
|
||||||
return nil, errInvalidSize
|
return nil, errors.Wrap(errInvalidSize, "string tuple list")
|
||||||
}
|
}
|
||||||
return &stringTuples{s: s, l: l}, nil
|
return &stringTuples{s: s, l: l}, nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -146,14 +146,6 @@ type ChunkMeta struct {
|
||||||
MaxTime int64
|
MaxTime int64
|
||||||
}
|
}
|
||||||
|
|
||||||
type BlockStats struct {
|
|
||||||
MinTime int64
|
|
||||||
MaxTime int64
|
|
||||||
SampleCount uint64
|
|
||||||
SeriesCount uint32
|
|
||||||
ChunkCount uint32
|
|
||||||
}
|
|
||||||
|
|
||||||
// IndexWriter serialized the index for a block of series data.
|
// IndexWriter serialized the index for a block of series data.
|
||||||
// The methods must generally be called in order they are specified.
|
// The methods must generally be called in order they are specified.
|
||||||
type IndexWriter interface {
|
type IndexWriter interface {
|
||||||
|
|
Loading…
Reference in a new issue