From d4779b374c7e6f0a686b27e7c436017bd5d4fce1 Mon Sep 17 00:00:00 2001 From: Fabian Reinartz Date: Thu, 19 Jan 2017 14:01:38 +0100 Subject: [PATCH] Properly track and write meta file --- block.go | 61 +++++++++++++++++++++++++++++------- compact.go | 12 +++---- db.go | 43 ++++++++++++++++---------- head.go | 91 +++++++++++++++++++++++++++++++----------------------- reader.go | 7 +++-- writer.go | 18 +++++++++-- 6 files changed, 152 insertions(+), 80 deletions(-) diff --git a/block.go b/block.go index 0f02f44225..b15aede0f5 100644 --- a/block.go +++ b/block.go @@ -35,8 +35,11 @@ type Block interface { // BlockMeta provides meta information about a block. type BlockMeta struct { - MinTime int64 `json:"minTime,omitempty"` - MaxTime int64 `json:"maxTime,omitempty"` + // MinTime and MaxTime specify the time range all samples + // in the block must be in. If unset, samples can be appended + // freely until they are set. + MinTime *int64 `json:"minTime,omitempty"` + MaxTime *int64 `json:"maxTime,omitempty"` Stats struct { NumSamples uint64 `json:"numSamples,omitempty"` @@ -61,12 +64,48 @@ type persistedBlock struct { } type blockMeta struct { - Version int `json:"version"` - Meta BlockMeta `json:",inline"` + *BlockMeta + Version int `json:"version"` } const metaFilename = "meta.json" +func readMetaFile(dir string) (*BlockMeta, error) { + b, err := ioutil.ReadFile(filepath.Join(dir, metaFilename)) + if err != nil { + return nil, err + } + var m blockMeta + + if err := json.Unmarshal(b, &m); err != nil { + return nil, err + } + if m.Version != 1 { + return nil, errors.Errorf("unexpected meta file version %d", m.Version) + } + + return m.BlockMeta, nil +} + +func writeMetaFile(dir string, meta *BlockMeta) error { + f, err := os.Create(filepath.Join(dir, metaFilename)) + if err != nil { + return err + } + + enc := json.NewEncoder(f) + enc.SetIndent("", "\t") + + if err := enc.Encode(&blockMeta{Version: 1, BlockMeta: meta}); err != nil { + return err + } + if err := f.Close(); err != nil { + return err + } + + return nil +} + func newPersistedBlock(dir string) (*persistedBlock, error) { // TODO(fabxc): validate match of name and stats time, validate magic. @@ -89,22 +128,20 @@ func newPersistedBlock(dir string) (*persistedBlock, error) { return nil, errors.Wrap(err, "create index reader") } + meta, err := readMetaFile(dir) + if err != nil { + return nil, err + } + pb := &persistedBlock{ dir: dir, + meta: *meta, chunksf: chunksf, indexf: indexf, chunkr: sr, indexr: ir, } - b, err := ioutil.ReadFile(filepath.Join(dir, metaFilename)) - if err != nil { - return nil, err - } - if err := json.Unmarshal(b, &pb.meta); err != nil { - return nil, err - } - return pb, nil } diff --git a/compact.go b/compact.go index c0ad7c8e66..948cfb12d9 100644 --- a/compact.go +++ b/compact.go @@ -147,13 +147,10 @@ func (c *compactor) compact(dir string, blocks ...Block) (err error) { return errors.Wrap(err, "create index file") } - indexw, err := newIndexWriter(indexf) - if err != nil { - return errors.Wrap(err, "open index writer") - } + indexw := newIndexWriter(indexf) chunkw := newSeriesWriter(chunkf, indexw) - if err = c.write(blocks, indexw, chunkw); err != nil { + if err = c.write(dir, blocks, indexw, chunkw); err != nil { return errors.Wrap(err, "write compaction") } @@ -178,7 +175,7 @@ func (c *compactor) compact(dir string, blocks ...Block) (err error) { return nil } -func (c *compactor) write(blocks []Block, indexw IndexWriter, chunkw SeriesWriter) error { +func (c *compactor) write(dir string, blocks []Block, indexw IndexWriter, chunkw SeriesWriter) error { var set compactionSet for i, b := range blocks { @@ -260,7 +257,8 @@ func (c *compactor) write(blocks []Block, indexw IndexWriter, chunkw SeriesWrite if err := indexw.WritePostings("", "", newListPostings(all)); err != nil { return err } - return nil + + return writeMetaFile(dir, &meta) } type compactionSet interface { diff --git a/db.go b/db.go index f581a7e2df..8cd1a51c44 100644 --- a/db.go +++ b/db.go @@ -281,7 +281,7 @@ func (db *DB) initBlocks() error { for _, dir := range dirs { if fileutil.Exist(filepath.Join(dir, walFileName)) { - h, err := openHeadBlock(dir, db.logger, nil) + h, err := openHeadBlock(dir, db.logger) if err != nil { return err } @@ -365,7 +365,6 @@ func (a *dbAppender) Add(ref uint64, t int64, v float64) error { if gen != a.gen { return ErrNotFound } - a.db.metrics.samplesAppended.Inc() return a.head.Add(ref, t, v) } @@ -426,13 +425,12 @@ func (db *DB) blocksForInterval(mint, maxt int64) []Block { for _, b := range db.persisted { m := b.Meta() - if intervalOverlap(mint, maxt, m.MinTime, m.MaxTime) { + if intervalOverlap(mint, maxt, *m.MinTime, *m.MaxTime) { bs = append(bs, b) } } for _, b := range db.heads { - m := b.Meta() - if intervalOverlap(mint, maxt, m.MinTime, m.MaxTime) { + if intervalOverlap(mint, maxt, b.mint, b.maxt) { bs = append(bs, b) } } @@ -443,22 +441,33 @@ func (db *DB) blocksForInterval(mint, maxt int64) []Block { // cut starts a new head block to append to. The completed head block // will still be appendable for the configured grace period. func (db *DB) cut() (*headBlock, error) { + var mint *int64 + + // If a previous block exists, fix its max time and and take the + // timestamp after as the minimum for the new head. + if len(db.heads) > 0 { + cur := db.heads[len(db.heads)-1] + + cur.metamtx.Lock() + + if cur.meta.MinTime == nil { + mt := cur.mint + cur.meta.MinTime = &mt + } + cur.meta.MaxTime = new(int64) + + mt := cur.maxt + 1 + cur.meta.MaxTime = &mt + mint = &mt + + cur.metamtx.Unlock() + } + dir, err := nextBlockDir(db.dir) if err != nil { return nil, err } - - // If its not the very first head block, all its samples must not be - // larger than - var minTime *int64 - - if len(db.heads) > 0 { - cb := db.heads[len(db.heads)-1] - minTime = new(int64) - *minTime = cb.Meta().MaxTime + 1 - } - - newHead, err := openHeadBlock(dir, db.logger, minTime) + newHead, err := createHeadBlock(dir, db.logger, mint) if err != nil { return nil, err } diff --git a/head.go b/head.go index d0a9c73597..503020e3fc 100644 --- a/head.go +++ b/head.go @@ -1,13 +1,10 @@ package tsdb import ( - "encoding/json" "fmt" - "io/ioutil" "math" "math/rand" "os" - "path/filepath" "sort" "sync" "time" @@ -40,10 +37,7 @@ var ( type headBlock struct { mtx sync.RWMutex dir string - - // Head blocks are initialized with a minimum timestamp if they were preceded - // by another block. Appended samples must not have a smaller timestamp than this. - minTime *int64 + wal *WAL // descs holds all chunk descs for the head block. Each chunk implicitly // is assigned the index as its ID. @@ -58,14 +52,24 @@ type headBlock struct { values map[string]stringset // label names to possible values postings *memPostings // postings lists for terms - wal *WAL + metamtx sync.RWMutex + meta BlockMeta + mint, maxt int64 // timestamp range of current samples +} - metamtx sync.RWMutex - meta BlockMeta +func createHeadBlock(dir string, l log.Logger, minTime *int64) (*headBlock, error) { + if err := os.MkdirAll(dir, 0755); err != nil { + return nil, err + } + + if err := writeMetaFile(dir, &BlockMeta{MinTime: minTime}); err != nil { + return nil, err + } + return openHeadBlock(dir, l) } // openHeadBlock creates a new empty head block. -func openHeadBlock(dir string, l log.Logger, minTime *int64) (*headBlock, error) { +func openHeadBlock(dir string, l log.Logger) (*headBlock, error) { if err := os.MkdirAll(dir, 0755); err != nil { return nil, err } @@ -74,24 +78,22 @@ func openHeadBlock(dir string, l log.Logger, minTime *int64) (*headBlock, error) if err != nil { return nil, err } + meta, err := readMetaFile(dir) + if err != nil { + return nil, err + } h := &headBlock{ dir: dir, - minTime: minTime, + wal: wal, series: []*memSeries{}, hashes: map[uint64][]*memSeries{}, values: map[string]stringset{}, postings: &memPostings{m: make(map[term][]uint32)}, - wal: wal, mapper: newPositionMapper(nil), - } - - b, err := ioutil.ReadFile(filepath.Join(dir, metaFilename)) - if err != nil { - return nil, err - } - if err := json.Unmarshal(b, &h.meta); err != nil { - return nil, err + meta: *meta, + mint: math.MaxInt64, + maxt: math.MinInt64, } // Replay contents of the write ahead log. @@ -99,23 +101,22 @@ func openHeadBlock(dir string, l log.Logger, minTime *int64) (*headBlock, error) series: func(lset labels.Labels) error { h.create(lset.Hash(), lset) h.meta.Stats.NumSeries++ - return nil }, sample: func(s refdSample) error { h.series[s.ref].append(s.t, s.v) - if h.minTime != nil && s.t < *h.minTime { - return errors.Errorf("sample earlier than minimum timestamp %d", *h.minTime) + if !h.inBounds(s.t) { + return ErrOutOfBounds } - if s.t < h.meta.MinTime { - h.meta.MinTime = s.t + + if s.t < h.mint { + h.mint = s.t } - if s.t > h.meta.MaxTime { - h.meta.MaxTime = s.t + if s.t > h.maxt { + h.maxt = s.t } h.meta.Stats.NumSamples++ - return nil }, }); err != nil { @@ -127,6 +128,12 @@ func openHeadBlock(dir string, l log.Logger, minTime *int64) (*headBlock, error) return h, nil } +// inBounds returns true if the given timestamp is within the valid +// time bounds of the block. +func (h *headBlock) inBounds(t int64) bool { + return h.meta.MinTime != nil && t < *h.meta.MinTime +} + // Close syncs all data and closes underlying resources of the head block. func (h *headBlock) Close() error { return h.wal.Close() @@ -294,12 +301,6 @@ func (a *headAppender) Commit() error { a.createSeries() - var ( - total = uint64(len(a.samples)) - mint = int64(math.MaxInt64) - maxt = int64(math.MinInt64) - ) - for i := range a.samples { s := &a.samples[i] @@ -315,10 +316,22 @@ func (a *headAppender) Commit() error { return err } + var ( + total = uint64(len(a.samples)) + mint = int64(math.MaxInt64) + maxt = int64(math.MinInt64) + ) + for _, s := range a.samples { if !a.series[s.ref].append(s.t, s.v) { total-- } + if s.t < mint { + mint = s.t + } + if s.t > maxt { + maxt = s.t + } } a.mtx.RUnlock() @@ -329,11 +342,11 @@ func (a *headAppender) Commit() error { a.meta.Stats.NumSamples += total a.meta.Stats.NumSeries += uint64(len(a.newSeries)) - if mint < a.meta.MinTime { - a.meta.MinTime = mint + if mint < a.mint { + a.mint = mint } - if maxt > a.meta.MaxTime { - a.meta.MaxTime = maxt + if maxt > a.maxt { + a.maxt = maxt } return nil diff --git a/reader.go b/reader.go index 5171ee56ef..e79d643374 100644 --- a/reader.go +++ b/reader.go @@ -178,14 +178,17 @@ func (r *indexReader) section(o uint32) (byte, []byte, error) { } func (r *indexReader) lookupSymbol(o uint32) (string, error) { + if int(o) > len(r.b) { + return "", errors.Errorf("invalid symbol offset %d", o) + } l, n := binary.Uvarint(r.b[o:]) if n < 0 { - return "", fmt.Errorf("reading symbol length failed") + return "", errors.New("reading symbol length failed") } end := int(o) + n + int(l) if end > len(r.b) { - return "", fmt.Errorf("invalid length") + return "", errors.New("invalid length") } b := r.b[int(o)+n : end] diff --git a/writer.go b/writer.go index dd794ba180..5f250b02ea 100644 --- a/writer.go +++ b/writer.go @@ -191,15 +191,14 @@ type indexWriter struct { postings []hashEntry // postings lists offsets } -func newIndexWriter(w io.Writer) (*indexWriter, error) { - ix := &indexWriter{ +func newIndexWriter(w io.Writer) *indexWriter { + return &indexWriter{ w: ioutil.NewPageWriter(w, compactionPageBytes, 0), ow: w, n: 0, symbols: make(map[string]uint32, 4096), series: make(map[uint32]*indexWriterSeries, 4096), } - return ix, ix.writeMeta() } func (w *indexWriter) write(wr io.Writer, b []byte) error { @@ -336,6 +335,19 @@ func (w *indexWriter) writeSeries() error { } func (w *indexWriter) WriteLabelIndex(names []string, values []string) error { + if !w.started { + if err := w.writeMeta(); err != nil { + return err + } + if err := w.writeSymbols(); err != nil { + return err + } + if err := w.writeSeries(); err != nil { + return err + } + w.started = true + } + valt, err := newStringTuples(values, len(names)) if err != nil { return err