From 00a503129bd5a25ad1895be0469fc52833024cb8 Mon Sep 17 00:00:00 2001 From: Fabian Reinartz Date: Mon, 19 Dec 2016 22:29:49 +0100 Subject: [PATCH] Use contextualized and traced errors in reader --- block.go | 10 +++++++++ db.go | 6 +++++- reader.go | 64 ++++++++++++++++++++++++++++++------------------------- writer.go | 8 ------- 4 files changed, 50 insertions(+), 38 deletions(-) diff --git a/block.go b/block.go index acfd4c733..ee3eafcb1 100644 --- a/block.go +++ b/block.go @@ -15,6 +15,14 @@ type block interface { interval() (int64, int64) } +type BlockStats struct { + MinTime int64 + MaxTime int64 + SampleCount uint64 + SeriesCount uint32 + ChunkCount uint32 +} + const ( flagNone = 0 flagStd = 1 @@ -33,6 +41,7 @@ func newPersistedBlock(path string) (*persistedBlock, error) { // The directory must be named after the base timestamp for the block. // TODO(fabxc): validate match of name and stats time, validate magic. + fmt.Println("new persisted block", path) // mmap files belonging to the block. chunksf, err := openMmapFile(chunksFileName(path)) if err != nil { @@ -56,6 +65,7 @@ func newPersistedBlock(path string) (*persistedBlock, error) { if err != nil { return nil, err } + fmt.Println("initialized new persisted block with", stats) pb := &persistedBlock{ chunksf: chunksf, diff --git a/db.go b/db.go index a1ac86269..d9dd1cd1f 100644 --- a/db.go +++ b/db.go @@ -275,7 +275,11 @@ func (s *Shard) appendBatch(ts int64, samples []Sample) error { if s.head.stats.SampleCount/uint64(s.head.stats.ChunkCount) > 400 { select { case s.persistCh <- struct{}{}: - go s.persist() + go func() { + if err := s.persist(); err != nil { + s.logger.Log("msg", "persistance error", "err", err) + } + }() default: } } diff --git a/reader.go b/reader.go index f35ea6f74..e940c851c 100644 --- a/reader.go +++ b/reader.go @@ -6,6 +6,7 @@ import ( "strings" "github.com/fabxc/tsdb/chunks" + "github.com/pkg/errors" ) // SeriesReader provides reading access of serialized time series data. @@ -88,7 +89,7 @@ var ( func newIndexReader(s SeriesReader, b []byte) (*indexReader, error) { if len(b) < 16 { - return nil, errInvalidSize + return nil, errors.Wrap(errInvalidSize, "index header") } r := &indexReader{ series: s, @@ -105,20 +106,25 @@ func newIndexReader(s SeriesReader, b []byte) (*indexReader, error) { loff := binary.BigEndian.Uint32(b[len(b)-8 : len(b)-4]) poff := binary.BigEndian.Uint32(b[len(b)-4:]) - if r.labels, err = readHashmap(r.section(loff)); err != nil { - return nil, err + f, b, err := r.section(loff) + if err != nil { + return nil, errors.Wrapf(err, "label index hashmap section at %d", loff) } - if r.postings, err = readHashmap(r.section(poff)); err != nil { - return nil, err + if r.labels, err = readHashmap(f, b); err != nil { + return nil, errors.Wrap(err, "read label index hashmap") + } + f, b, err = r.section(poff) + if err != nil { + return nil, errors.Wrapf(err, "postings hashmap section at %d", loff) + } + if r.postings, err = readHashmap(f, b); err != nil { + return nil, errors.Wrap(err, "read postings hashmap") } return r, nil } -func readHashmap(flag byte, b []byte, err error) (map[string]uint32, error) { - if err != nil { - return nil, err - } +func readHashmap(flag byte, b []byte) (map[string]uint32, error) { if flag != flagStd { return nil, errInvalidFlag } @@ -127,19 +133,19 @@ func readHashmap(flag byte, b []byte, err error) (map[string]uint32, error) { for len(b) > 0 { l, n := binary.Uvarint(b) if n < 1 { - return nil, errInvalidSize + return nil, errors.Wrap(errInvalidSize, "read key length") } b = b[n:] if len(b) < int(l) { - return nil, errInvalidSize + return nil, errors.Wrap(errInvalidSize, "read key") } s := string(b[:l]) b = b[l:] o, n := binary.Uvarint(b) if n < 1 { - return nil, errInvalidSize + return nil, errors.Wrap(errInvalidSize, "read offset value") } b = b[n:] @@ -153,7 +159,7 @@ func (r *indexReader) section(o uint32) (byte, []byte, error) { b := r.b[o:] if len(b) < 5 { - return 0, nil, errInvalidSize + return 0, nil, errors.Wrap(errInvalidSize, "read header") } flag := b[0] @@ -163,7 +169,7 @@ func (r *indexReader) section(o uint32) (byte, []byte, error) { // b must have the given length plus 4 bytes for the CRC32 checksum. if len(b) < int(l)+4 { - return 0, nil, errInvalidSize + return 0, nil, errors.Wrap(errInvalidSize, "section content") } return flag, b[:l], nil } @@ -213,14 +219,14 @@ func (r *indexReader) LabelValues(names ...string) (StringTuples, error) { flag, b, err := r.section(off) if err != nil { - return nil, fmt.Errorf("section: %s", err) + return nil, errors.Wrapf(err, "section at %d", off) } if flag != flagStd { return nil, errInvalidFlag } l, n := binary.Uvarint(b) if n < 1 { - return nil, errInvalidSize + return nil, errors.Wrap(errInvalidSize, "read label index size") } st := &serializedStringTuples{ @@ -234,7 +240,7 @@ func (r *indexReader) LabelValues(names ...string) (StringTuples, error) { func (r *indexReader) Series(ref uint32, mint, maxt int64) (Series, error) { k, n := binary.Uvarint(r.b[ref:]) if n < 1 { - return nil, errInvalidSize + return nil, errors.Wrap(errInvalidSize, "number of labels") } b := r.b[int(ref)+n:] @@ -243,7 +249,7 @@ func (r *indexReader) Series(ref uint32, mint, maxt int64) (Series, error) { for i := 0; i < int(k); i++ { o, n := binary.Uvarint(b) if n < 1 { - return nil, errInvalidSize + return nil, errors.Wrap(errInvalidSize, "symbol offset") } offsets = append(offsets, uint32(o)) @@ -251,7 +257,7 @@ func (r *indexReader) Series(ref uint32, mint, maxt int64) (Series, error) { } // Symbol offests must occur in pairs representing name and value. if len(offsets)&1 != 0 { - return nil, errInvalidSize + return nil, errors.New("odd number of symbol references") } // TODO(fabxc): Fully materialize series symbols for now. Figure out later if it @@ -265,11 +271,11 @@ func (r *indexReader) Series(ref uint32, mint, maxt int64) (Series, error) { for i := 0; i < int(k); i += 2 { n, err := r.lookupSymbol(offsets[i]) if err != nil { - return nil, err + return nil, errors.Wrap(err, "symbol lookup") } v, err := r.lookupSymbol(offsets[i+1]) if err != nil { - return nil, err + return nil, errors.Wrap(err, "symbol lookup") } labels = append(labels, Label{ Name: string(n), @@ -280,7 +286,7 @@ func (r *indexReader) Series(ref uint32, mint, maxt int64) (Series, error) { // Read the chunks meta data. k, n = binary.Uvarint(b) if n < 1 { - return nil, errInvalidSize + return nil, errors.Wrap(errInvalidSize, "number of chunks") } b = b[n:] @@ -289,7 +295,7 @@ func (r *indexReader) Series(ref uint32, mint, maxt int64) (Series, error) { for i := 0; i < int(k); i++ { firstTime, n := binary.Varint(b) if n < 1 { - return nil, errInvalidSize + return nil, errors.Wrap(errInvalidSize, "first time") } b = b[n:] @@ -300,13 +306,13 @@ func (r *indexReader) Series(ref uint32, mint, maxt int64) (Series, error) { lastTime, n := binary.Varint(b) if n < 1 { - return nil, errInvalidSize + return nil, errors.Wrap(errInvalidSize, "last time") } b = b[n:] o, n := binary.Uvarint(b) if n < 1 { - return nil, errInvalidSize + return nil, errors.Wrap(errInvalidSize, "chunk offset") } b = b[n:] @@ -344,11 +350,11 @@ func (r *indexReader) Postings(name, value string) (Postings, error) { flag, b, err := r.section(off) if err != nil { - return nil, err + return nil, errors.Wrapf(err, "section at %d", off) } if flag != flagStd { - return nil, errInvalidFlag + return nil, errors.Wrapf(errInvalidFlag, "section at %d", off) } // TODO(fabxc): just read into memory as an intermediate solution. @@ -357,7 +363,7 @@ func (r *indexReader) Postings(name, value string) (Postings, error) { for len(b) > 0 { if len(b) < 4 { - return nil, errInvalidSize + return nil, errors.Wrap(errInvalidSize, "plain postings entry") } l = append(l, binary.BigEndian.Uint32(b[:4])) @@ -374,7 +380,7 @@ type stringTuples struct { func newStringTuples(s []string, l int) (*stringTuples, error) { if len(s)%l != 0 { - return nil, errInvalidSize + return nil, errors.Wrap(errInvalidSize, "string tuple list") } return &stringTuples{s: s, l: l}, nil } diff --git a/writer.go b/writer.go index 72dbdf8a3..fdb506885 100644 --- a/writer.go +++ b/writer.go @@ -146,14 +146,6 @@ type ChunkMeta struct { MaxTime int64 } -type BlockStats struct { - MinTime int64 - MaxTime int64 - SampleCount uint64 - SeriesCount uint32 - ChunkCount uint32 -} - // IndexWriter serialized the index for a block of series data. // The methods must generally be called in order they are specified. type IndexWriter interface {