From 9ddbd64d007858dbc6bc9ce6d1f1824796b1e636 Mon Sep 17 00:00:00 2001
From: Fabian Reinartz
Date: Thu, 19 Jan 2017 11:22:47 +0100
Subject: [PATCH] Move stats into meta.json file, cleanup, docs

---
 block.go    |  62 +++++++++++++++++--------
 compact.go  |  27 ++++++-----
 db.go       | 130 ++++++++++++++++++++++------------------------
 head.go     | 125 +++++++++++++++++++++++++++++---------------------
 reader.go   |  25 ----------
 wal.go      |  12 +++--
 wal_test.go |  10 +++-
 writer.go   |  49 +++-----------------
 8 files changed, 207 insertions(+), 233 deletions(-)

diff --git a/block.go b/block.go
index 410f490c13..0f02f44225 100644
--- a/block.go
+++ b/block.go
@@ -1,10 +1,11 @@
 package tsdb
 
 import (
+	"encoding/json"
+	"io/ioutil"
 	"os"
 	"path/filepath"
 	"sort"
-	"sync"
 
 	"github.com/coreos/etcd/pkg/fileutil"
 	"github.com/pkg/errors"
 )
@@ -12,23 +13,36 @@
 // Block handles reads against a Block of time series data.
 type Block interface {
+	// Directory where block data is stored.
 	Dir() string
-	Stats() BlockStats
+
+	// Meta returns meta information about the block.
+	Meta() BlockMeta
+
+	// Index returns an IndexReader over the block's data.
 	Index() IndexReader
+
+	// Series returns a SeriesReader over the block's data.
 	Series() SeriesReader
+
+	// Persisted returns whether the block is already persisted,
+	// and no longer being appended to.
 	Persisted() bool
+
+	// Close releases all underlying resources of the block.
 	Close() error
 }
 
-// BlockStats provides stats on a data block.
-type BlockStats struct {
-	MinTime, MaxTime int64 // time range of samples in the block
+// BlockMeta provides meta information about a block.
+type BlockMeta struct {
+	MinTime int64 `json:"minTime,omitempty"`
+	MaxTime int64 `json:"maxTime,omitempty"`
 
-	SampleCount uint64
-	SeriesCount uint64
-	ChunkCount  uint64
-
-	mtx sync.RWMutex
+	Stats struct {
+		NumSamples uint64 `json:"numSamples,omitempty"`
+		NumSeries  uint64 `json:"numSeries,omitempty"`
+		NumChunks  uint64 `json:"numChunks,omitempty"`
+	} `json:"stats,omitempty"`
 }
 
 const (
@@ -37,8 +51,8 @@ const (
 )
 
 type persistedBlock struct {
-	dir   string
-	stats *BlockStats
+	dir  string
+	meta BlockMeta
 
 	chunksf, indexf *mmapFile
 
@@ -46,6 +60,13 @@ type persistedBlock struct {
 	indexr *indexReader
 }
 
+type blockMeta struct {
+	Version int       `json:"version"`
+	Meta    BlockMeta `json:",inline"`
+}
+
+const metaFilename = "meta.json"
+
 func newPersistedBlock(dir string) (*persistedBlock, error) {
 	// TODO(fabxc): validate match of name and stats time, validate magic.
@@ -68,19 +89,22 @@ func newPersistedBlock(dir string) (*persistedBlock, error) {
 		return nil, errors.Wrap(err, "create index reader")
 	}
 
-	stats, err := ir.Stats()
-	if err != nil {
-		return nil, errors.Wrap(err, "read stats")
-	}
-
 	pb := &persistedBlock{
 		dir:     dir,
 		chunksf: chunksf,
 		indexf:  indexf,
 		chunkr:  sr,
 		indexr:  ir,
-		stats:   &stats,
 	}
+
+	b, err := ioutil.ReadFile(filepath.Join(dir, metaFilename))
+	if err != nil {
+		return nil, err
+	}
+	if err := json.Unmarshal(b, &pb.meta); err != nil {
+		return nil, err
+	}
+
 	return pb, nil
 }
 
@@ -98,7 +122,7 @@
 func (pb *persistedBlock) Dir() string          { return pb.dir }
 func (pb *persistedBlock) Persisted() bool      { return true }
 func (pb *persistedBlock) Index() IndexReader   { return pb.indexr }
 func (pb *persistedBlock) Series() SeriesReader { return pb.chunkr }
-func (pb *persistedBlock) Stats() BlockStats    { return *pb.stats }
+func (pb *persistedBlock) Meta() BlockMeta      { return pb.meta }
 
 func chunksFileName(path string) string {
 	return filepath.Join(path, "chunks-000")
diff --git a/compact.go b/compact.go
index ca63ceeeae..c0ad7c8e66 100644
--- a/compact.go
+++ b/compact.go
@@ -93,11 +93,11 @@ func compactionMatch(blocks []Block) bool {
 	// Naively check whether both blocks have roughly the same number of samples
 	// and whether the total sample count doesn't exceed 2GB chunk file size
 	// by rough approximation.
-	n := float64(blocks[0].Stats().SampleCount)
+	n := float64(blocks[0].Meta().Stats.NumSamples)
 	t := n
 
 	for _, b := range blocks[1:] {
-		m := float64(b.Stats().SampleCount)
+		m := float64(b.Meta().Stats.NumSamples)
 
 		if m < 0.7*n || m > 1.3*n {
 			return false
@@ -109,12 +109,12 @@ func compactionMatch(blocks []Block) bool {
 	return t < 10*200e6
 }
 
-func mergeStats(blocks ...Block) (res BlockStats) {
-	res.MinTime = blocks[0].Stats().MinTime
-	res.MaxTime = blocks[len(blocks)-1].Stats().MaxTime
+func mergeBlockMetas(blocks ...Block) (res BlockMeta) {
+	res.MinTime = blocks[0].Meta().MinTime
+	res.MaxTime = blocks[len(blocks)-1].Meta().MaxTime
 
 	for _, b := range blocks {
-		res.SampleCount += b.Stats().SampleCount
+		res.Stats.NumSamples += b.Meta().Stats.NumSamples
 	}
 	return res
 }
@@ -147,7 +147,10 @@ func (c *compactor) compact(dir string, blocks ...Block) (err error) {
 		return errors.Wrap(err, "create index file")
 	}
 
-	indexw := newIndexWriter(indexf)
+	indexw, err := newIndexWriter(indexf)
+	if err != nil {
+		return errors.Wrap(err, "open index writer")
+	}
 	chunkw := newSeriesWriter(chunkf, indexw)
 
 	if err = c.write(blocks, indexw, chunkw); err != nil {
@@ -204,7 +207,7 @@ func (c *compactor) write(blocks []Block, indexw IndexWriter, chunkw SeriesWrite
 		postings = &memPostings{m: make(map[term][]uint32, 512)}
 		values   = map[string]stringset{}
 		i        = uint32(0)
-		stats    = mergeStats(blocks...)
+		meta     = mergeBlockMetas(blocks...)
 	)
 
 	for set.Next() {
@@ -213,8 +216,8 @@ func (c *compactor) write(blocks []Block, indexw IndexWriter, chunkw SeriesWrite
 			return err
 		}
 
-		stats.ChunkCount += uint64(len(chunks))
-		stats.SeriesCount++
+		meta.Stats.NumChunks += uint64(len(chunks))
+		meta.Stats.NumSeries++
 
 		for _, l := range lset {
 			valset, ok := values[l.Name]
@@ -232,10 +235,6 @@ func (c *compactor) write(blocks []Block, indexw IndexWriter, chunkw SeriesWrite
 		return set.Err()
 	}
 
-	if err := indexw.WriteStats(stats); err != nil {
-		return err
-	}
-
 	s := make([]string, 0, 256)
 	for n, v := range values {
 		s = s[:0]
diff --git a/db.go b/db.go
index 84883237c1..f581a7e2df 100644
--- a/db.go
+++ b/db.go
@@ -152,7 +152,7 @@ func (db *DB) run() {
 		select {
 		case <-db.cutc:
 			db.mtx.Lock()
-			err := db.cut()
+			_, err := db.cut()
 			db.mtx.Unlock()
 
 			if err != nil {
@@ -268,34 +268,6 @@ func (db *DB) compact(i, j int) error {
 	return nil
 }
 
-func isBlockDir(fi os.FileInfo) bool {
-	if !fi.IsDir() {
-		return false
-	}
-	if !strings.HasPrefix(fi.Name(), "b-") {
-		return false
-	}
-	if _, err := strconv.ParseUint(fi.Name()[2:], 10, 32); err != nil {
-		return false
-	}
-	return true
-}
-
-func blockDirs(dir string) ([]string, error) {
-	files, err := ioutil.ReadDir(dir)
-	if err != nil {
-		return nil, err
-	}
-	var dirs []string
-
-	for _, fi := range files {
-		if isBlockDir(fi) {
-			dirs = append(dirs, filepath.Join(dir, fi.Name()))
-		}
-	}
-	return dirs, nil
-}
-
 func (db *DB) initBlocks() error {
 	var (
 		persisted []*persistedBlock
@@ -309,7 +281,7 @@ func (db *DB) initBlocks() error {
 
 	for _, dir := range dirs {
 		if fileutil.Exist(filepath.Join(dir, walFileName)) {
-			h, err := openHeadBlock(dir, db.logger)
+			h, err := openHeadBlock(dir, db.logger, nil)
 			if err != nil {
 				return err
 			}
@@ -327,9 +299,9 @@ func (db *DB) initBlocks() error {
 	db.heads = heads
 
 	if len(heads) == 0 {
-		return db.cut()
+		_, err = db.cut()
 	}
-	return nil
+	return err
 }
 
 // Close the partition.
@@ -418,24 +390,6 @@ func (a *dbAppender) Rollback() error {
 	return err
 }
 
-func (db *DB) headForDir(dir string) (int, bool) {
-	for i, b := range db.heads {
-		if b.Dir() == dir {
-			return i, true
-		}
-	}
-	return -1, false
-}
-
-func (db *DB) persistedForDir(dir string) (int, bool) {
-	for i, b := range db.persisted {
-		if b.Dir() == dir {
-			return i, true
-		}
-	}
-	return -1, false
-}
-
 func (db *DB) compactable() []Block {
 	db.mtx.RLock()
 	defer db.mtx.RUnlock()
@@ -471,14 +425,14 @@ func (db *DB) blocksForInterval(mint, maxt int64) []Block {
 	var bs []Block
 
 	for _, b := range db.persisted {
-		s := b.Stats()
-		if intervalOverlap(mint, maxt, s.MinTime, s.MaxTime) {
+		m := b.Meta()
+		if intervalOverlap(mint, maxt, m.MinTime, m.MaxTime) {
 			bs = append(bs, b)
 		}
 	}
 	for _, b := range db.heads {
-		s := b.Stats()
-		if intervalOverlap(mint, maxt, s.MinTime, s.MaxTime) {
+		m := b.Meta()
+		if intervalOverlap(mint, maxt, m.MinTime, m.MaxTime) {
 			bs = append(bs, b)
 		}
 	}
@@ -488,23 +442,62 @@
 
 // cut starts a new head block to append to. The completed head block
 // will still be appendable for the configured grace period.
-func (db *DB) cut() error {
-	dir, err := db.nextBlockDir()
+func (db *DB) cut() (*headBlock, error) {
+	dir, err := nextBlockDir(db.dir)
 	if err != nil {
-		return err
+		return nil, err
 	}
-	newHead, err := openHeadBlock(dir, db.logger)
+
+	// If it's not the very first head block, none of its samples may be older
+	// than the maximum timestamp of the preceding head block.
+	var minTime *int64
+
+	if len(db.heads) > 0 {
+		cb := db.heads[len(db.heads)-1]
+		minTime = new(int64)
+		*minTime = cb.Meta().MaxTime + 1
+	}
+
+	newHead, err := openHeadBlock(dir, db.logger, minTime)
 	if err != nil {
-		return err
+		return nil, err
 	}
 	db.heads = append(db.heads, newHead)
 	db.headGen++
 
-	return nil
+	return newHead, nil
 }
 
-func (db *DB) nextBlockDir() (string, error) {
-	names, err := fileutil.ReadDir(db.dir)
+func isBlockDir(fi os.FileInfo) bool {
+	if !fi.IsDir() {
+		return false
+	}
+	if !strings.HasPrefix(fi.Name(), "b-") {
+		return false
+	}
+	if _, err := strconv.ParseUint(fi.Name()[2:], 10, 32); err != nil {
+		return false
+	}
+	return true
+}
+
+func blockDirs(dir string) ([]string, error) {
+	files, err := ioutil.ReadDir(dir)
+	if err != nil {
+		return nil, err
+	}
+	var dirs []string
+
+	for _, fi := range files {
+		if isBlockDir(fi) {
+			dirs = append(dirs, filepath.Join(dir, fi.Name()))
+		}
+	}
+	return dirs, nil
+}
+
+func nextBlockDir(dir string) (string, error) {
+	names, err := fileutil.ReadDir(dir)
 	if err != nil {
 		return "", err
 	}
@@ -520,7 +513,7 @@ func nextBlockDir(dir string) (string, error) {
 		}
 		i = j
 	}
-	return filepath.Join(db.dir, fmt.Sprintf("b-%0.6d", i+1)), nil
+	return filepath.Join(dir, fmt.Sprintf("b-%0.6d", i+1)), nil
 }
 
 // PartitionedDB is a time series storage.
@@ -691,14 +684,3 @@ func yoloString(b []byte) string {
 	}
 	return *((*string)(unsafe.Pointer(&h)))
 }
-
-func yoloBytes(s string) []byte {
-	sh := (*reflect.StringHeader)(unsafe.Pointer(&s))
-
-	h := reflect.SliceHeader{
-		Cap:  sh.Len,
-		Len:  sh.Len,
-		Data: sh.Data,
-	}
-	return *((*[]byte)(unsafe.Pointer(&h)))
-}
diff --git a/head.go b/head.go
index 42fdc414d2..d0a9c73597 100644
--- a/head.go
+++ b/head.go
@@ -1,10 +1,13 @@
 package tsdb
 
 import (
-	"errors"
+	"encoding/json"
 	"fmt"
+	"io/ioutil"
 	"math"
 	"math/rand"
+	"os"
+	"path/filepath"
 	"sort"
 	"sync"
 	"time"
@@ -13,6 +16,7 @@ import (
 	"github.com/fabxc/tsdb/chunks"
 	"github.com/fabxc/tsdb/labels"
 	"github.com/go-kit/kit/log"
+	"github.com/pkg/errors"
 )
 
 var (
@@ -37,6 +41,10 @@ type headBlock struct {
 	mtx sync.RWMutex
 	dir string
 
+	// Head blocks are initialized with a minimum timestamp if they were preceded
+	// by another block. Appended samples must not have a smaller timestamp than this.
+	minTime *int64
+
 	// descs holds all chunk descs for the head block. Each chunk implicitly
 	// is assigned the index as its ID.
 	series []*memSeries
@@ -52,18 +60,24 @@ type headBlock struct {
 
 	wal *WAL
 
-	stats *BlockStats
+	metamtx sync.RWMutex
+	meta    BlockMeta
 }
 
 // openHeadBlock creates a new empty head block.
-func openHeadBlock(dir string, l log.Logger) (*headBlock, error) {
+func openHeadBlock(dir string, l log.Logger, minTime *int64) (*headBlock, error) {
+	if err := os.MkdirAll(dir, 0755); err != nil {
+		return nil, err
+	}
+
 	wal, err := OpenWAL(dir, log.NewContext(l).With("component", "wal"), 5*time.Second)
 	if err != nil {
 		return nil, err
 	}
 
-	b := &headBlock{
+	h := &headBlock{
 		dir:     dir,
+		minTime: minTime,
 		series:  []*memSeries{},
 		hashes:  map[uint64][]*memSeries{},
 		values:  map[string]stringset{},
@@ -71,35 +85,46 @@
 		wal:     wal,
 		mapper:  newPositionMapper(nil),
 	}
-	b.stats = &BlockStats{
-		MinTime: math.MinInt64,
-		MaxTime: math.MaxInt64,
-	}
-	err = wal.ReadAll(&walHandler{
-		series: func(lset labels.Labels) {
-			b.create(lset.Hash(), lset)
-			b.stats.SeriesCount++
-		},
-		sample: func(s refdSample) {
-			b.series[s.ref].append(s.t, s.v)
-
-			if s.t < b.stats.MinTime {
-				b.stats.MinTime = s.t
-			}
-			if s.t > b.stats.MaxTime {
-				b.stats.MaxTime = s.t
-			}
-			b.stats.SampleCount++
-		},
-	})
+	b, err := ioutil.ReadFile(filepath.Join(dir, metaFilename))
 	if err != nil {
 		return nil, err
 	}
+	if err := json.Unmarshal(b, &h.meta); err != nil {
+		return nil, err
+	}
 
-	b.updateMapping()
+	// Replay contents of the write ahead log.
+	if err = wal.ReadAll(&walHandler{
+		series: func(lset labels.Labels) error {
+			h.create(lset.Hash(), lset)
+			h.meta.Stats.NumSeries++
 
-	return b, nil
+			return nil
+		},
+		sample: func(s refdSample) error {
+			h.series[s.ref].append(s.t, s.v)
+
+			if h.minTime != nil && s.t < *h.minTime {
+				return errors.Errorf("sample earlier than minimum timestamp %d", *h.minTime)
+			}
+			if s.t < h.meta.MinTime {
+				h.meta.MinTime = s.t
+			}
+			if s.t > h.meta.MaxTime {
+				h.meta.MaxTime = s.t
+			}
+			h.meta.Stats.NumSamples++
+
+			return nil
+		},
+	}); err != nil {
+		return nil, err
+	}
+
+	h.updateMapping()
+
+	return h, nil
 }
 
 // Close syncs all data and closes underlying resources of the head block.
@@ -107,19 +132,18 @@ func (h *headBlock) Close() error {
 	return h.wal.Close()
 }
 
+func (h *headBlock) Meta() BlockMeta {
+	h.metamtx.RLock()
+	defer h.metamtx.RUnlock()
+
+	return h.meta
+}
+
 func (h *headBlock) Dir() string          { return h.dir }
 func (h *headBlock) Persisted() bool      { return false }
 func (h *headBlock) Index() IndexReader   { return &headIndexReader{h} }
 func (h *headBlock) Series() SeriesReader { return &headSeriesReader{h} }
 
-// Stats returns statisitics about the indexed data.
-func (h *headBlock) Stats() BlockStats {
-	h.stats.mtx.RLock()
-	defer h.stats.mtx.RUnlock()
-
-	return *h.stats
-}
-
 func (h *headBlock) Appender() Appender {
 	h.mtx.RLock()
 	return &headAppender{headBlock: h, samples: getHeadAppendBuffer()}
@@ -194,7 +218,7 @@ func (a *headAppender) setSeries(hash uint64, lset labels.Labels) (uint64, error
 }
 
 func (a *headAppender) Add(ref uint64, t int64, v float64) error {
-	// We only own the first 5 bytes of the reference. Anything before is
+	// We only own the last 5 bytes of the reference. Anything before is
 	// used by higher-order appenders. We erase it to avoid issues.
 	ref = (ref << 24) >> 24
 
@@ -287,6 +311,7 @@ func (a *headAppender) Commit() error {
 
 	// Write all new series and samples to the WAL and add it to the
 	// in-mem database on success.
 	if err := a.wal.Log(a.newLabels, a.samples); err != nil {
+		a.mtx.RUnlock()
 		return err
 	}
 
@@ -298,17 +323,17 @@ func (a *headAppender) Commit() error {
 
 	a.mtx.RUnlock()
 
-	a.stats.mtx.Lock()
-	defer a.stats.mtx.Unlock()
+	a.metamtx.Lock()
+	defer a.metamtx.Unlock()
 
-	a.stats.SampleCount += total
-	a.stats.SeriesCount += uint64(len(a.newSeries))
+	a.meta.Stats.NumSamples += total
+	a.meta.Stats.NumSeries += uint64(len(a.newSeries))
 
-	if mint < a.stats.MinTime {
-		a.stats.MinTime = mint
+	if mint < a.meta.MinTime {
+		a.meta.MinTime = mint
 	}
-	if maxt > a.stats.MaxTime {
-		a.stats.MaxTime = maxt
+	if maxt > a.meta.MaxTime {
+		a.meta.MaxTime = maxt
 	}
 
 	return nil
@@ -420,12 +445,6 @@ func (h *headIndexReader) LabelIndices() ([][]string, error) {
 	return res, nil
 }
 
-func (h *headIndexReader) Stats() (BlockStats, error) {
-	h.stats.mtx.RLock()
-	defer h.stats.mtx.RUnlock()
-	return *h.stats, nil
-}
-
 // get retrieves the chunk with the hash and label set and creates
 // a new one if it doesn't exist yet.
 func (h *headBlock) get(hash uint64, lset labels.Labels) *memSeries {
@@ -467,10 +486,10 @@ func (h *headBlock) create(hash uint64, lset labels.Labels) *memSeries {
 }
 
 func (h *headBlock) fullness() float64 {
-	h.stats.mtx.RLock()
-	defer h.stats.mtx.RUnlock()
+	h.metamtx.RLock()
+	defer h.metamtx.RUnlock()
 
-	return float64(h.stats.SampleCount) / float64(h.stats.SeriesCount+1) / 250
+	return float64(h.meta.Stats.NumSamples) / float64(h.meta.Stats.NumSeries+1) / 250
 }
 
 func (h *headBlock) updateMapping() {
diff --git a/reader.go b/reader.go
index 8ff0f9cb47..5171ee56ef 100644
--- a/reader.go
+++ b/reader.go
@@ -53,9 +53,6 @@ func (s *seriesReader) Chunk(offset uint32) (chunks.Chunk, error) {
 
 // IndexReader provides reading access of serialized index data.
 type IndexReader interface {
-	// Stats returns statisitics about the indexed data.
-	Stats() (BlockStats, error)
-
 	// LabelValues returns the possible label values
 	LabelValues(names ...string) (StringTuples, error)
 
@@ -195,28 +192,6 @@ func (r *indexReader) lookupSymbol(o uint32) (string, error) {
 	return yoloString(b), nil
 }
 
-func (r *indexReader) Stats() (BlockStats, error) {
-	flag, b, err := r.section(8)
-	if err != nil {
-		return BlockStats{}, err
-	}
-	if flag != flagStd {
-		return BlockStats{}, errInvalidFlag
-	}
-
-	if len(b) != 64 {
-		return BlockStats{}, errInvalidSize
-	}
-
-	return BlockStats{
-		MinTime:     int64(binary.BigEndian.Uint64(b)),
-		MaxTime:     int64(binary.BigEndian.Uint64(b[8:])),
-		SeriesCount: binary.BigEndian.Uint64(b[16:]),
-		ChunkCount:  binary.BigEndian.Uint64(b[24:]),
-		SampleCount: binary.BigEndian.Uint64(b[32:]),
-	}, nil
-}
-
 func (r *indexReader) LabelValues(names ...string) (StringTuples, error) {
 	key := strings.Join(names, string(sep))
 	off, ok := r.labels[key]
diff --git a/wal.go b/wal.go
index a24b439616..5731bba142 100644
--- a/wal.go
+++ b/wal.go
@@ -88,8 +88,8 @@ func OpenWAL(dir string, l log.Logger, flushInterval time.Duration) (*WAL, error
 }
 
 type walHandler struct {
-	sample func(refdSample)
-	series func(labels.Labels)
+	sample func(refdSample) error
+	series func(labels.Labels) error
 }
 
 // ReadAll consumes all entries in the WAL and triggers the registered handlers.
@@ -343,7 +343,9 @@ func (d *walDecoder) decodeSeries(flag byte, b []byte) error {
 			b = b[n+int(vl):]
 		}
 
-		d.handler.series(lset)
+		if err := d.handler.series(lset); err != nil {
+			return err
+		}
 	}
 	return nil
 }
@@ -382,7 +384,9 @@ func (d *walDecoder) decodeSamples(flag byte, b []byte) error {
 		smpl.v = float64(math.Float64frombits(binary.BigEndian.Uint64(b)))
 		b = b[8:]
 
-		d.handler.sample(smpl)
+		if err := d.handler.sample(smpl); err != nil {
+			return err
+		}
 	}
 	return nil
 }
diff --git a/wal_test.go b/wal_test.go
index 6e6f81ab5d..35ea053132 100644
--- a/wal_test.go
+++ b/wal_test.go
@@ -118,8 +118,14 @@ func BenchmarkWALRead(b *testing.B) {
 	var numSeries, numSamples int
 
 	err = wal.ReadAll(&walHandler{
-		series: func(lset labels.Labels) { numSeries++ },
-		sample: func(smpl refdSample) { numSamples++ },
+		series: func(lset labels.Labels) error {
+			numSeries++
+			return nil
+		},
+		sample: func(smpl refdSample) error {
+			numSamples++
+			return nil
+		},
 	})
 
 	require.NoError(b, err)
diff --git a/writer.go b/writer.go
index c246bad0d7..dd794ba180 100644
--- a/writer.go
+++ b/writer.go
@@ -2,7 +2,6 @@ package tsdb
 
 import (
 	"encoding/binary"
-	"fmt"
 	"hash/crc32"
 	"io"
 	"sort"
@@ -156,9 +155,6 @@ type IndexWriter interface {
 	// list iterator. It only has to be available during the write processing.
 	AddSeries(ref uint32, l labels.Labels, chunks ...ChunkMeta)
 
-	// WriteStats writes final stats for the indexed block.
-	WriteStats(BlockStats) error
-
 	// WriteLabelIndex serializes an index from label names to values.
 	// The passed in values chained tuples of strings of the length of names.
 	WriteLabelIndex(names []string, values []string) error
@@ -183,9 +179,10 @@ type indexWriterSeries struct {
 // indexWriter implements the IndexWriter interface for the standard
 // serialization format.
 type indexWriter struct {
-	ow io.Writer
-	w  *ioutil.PageWriter
-	n  int64
+	ow      io.Writer
+	w       *ioutil.PageWriter
+	n       int64
+	started bool
 
 	series map[uint32]*indexWriterSeries
 
 	symbols  map[string]uint32 // symbol offsets
 	postings []hashEntry       // postings lists offsets
 }
 
-func newIndexWriter(w io.Writer) *indexWriter {
-	return &indexWriter{
+func newIndexWriter(w io.Writer) (*indexWriter, error) {
+	ix := &indexWriter{
 		w:       ioutil.NewPageWriter(w, compactionPageBytes, 0),
 		ow:      w,
 		n:       0,
 		symbols: make(map[string]uint32, 4096),
 		series:  make(map[uint32]*indexWriterSeries, 4096),
 	}
+	return ix, ix.writeMeta()
 }
 
 func (w *indexWriter) write(wr io.Writer, b []byte) error {
@@ -253,39 +251,6 @@ func (w *indexWriter) AddSeries(ref uint32, lset labels.Labels, chunks ...ChunkM
 	}
 }
 
-func (w *indexWriter) WriteStats(stats BlockStats) error {
-	if w.n != 0 {
-		return fmt.Errorf("WriteStats must be called first")
-	}
-
-	if err := w.writeMeta(); err != nil {
-		return err
-	}
-
-	b := [64]byte{}
-
-	binary.BigEndian.PutUint64(b[0:], uint64(stats.MinTime))
-	binary.BigEndian.PutUint64(b[8:], uint64(stats.MaxTime))
-	binary.BigEndian.PutUint64(b[16:], stats.SeriesCount)
-	binary.BigEndian.PutUint64(b[24:], stats.ChunkCount)
-	binary.BigEndian.PutUint64(b[32:], stats.SampleCount)
-
-	err := w.section(64, flagStd, func(wr io.Writer) error {
-		return w.write(wr, b[:])
-	})
-	if err != nil {
-		return err
-	}
-
-	if err := w.writeSymbols(); err != nil {
-		return err
-	}
-	if err := w.writeSeries(); err != nil {
-		return err
-	}
-	return nil
-}
-
 func (w *indexWriter) writeSymbols() error {
 	// Generate sorted list of strings we will store as reference table.
 	symbols := make([]string, 0, len(w.symbols))
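
The hunks above replace the index-embedded block stats with a per-block meta.json file. As a quick illustration only (not part of the patch), the sketch below shows how such a file could be decoded using the BlockMeta struct introduced in block.go; the standalone readBlockMeta helper, the example directory name, and the printed fields are assumptions for illustration.

package main

import (
	"encoding/json"
	"fmt"
	"io/ioutil"
	"path/filepath"
)

// BlockMeta mirrors the struct added to block.go by this patch.
type BlockMeta struct {
	MinTime int64 `json:"minTime,omitempty"`
	MaxTime int64 `json:"maxTime,omitempty"`

	Stats struct {
		NumSamples uint64 `json:"numSamples,omitempty"`
		NumSeries  uint64 `json:"numSeries,omitempty"`
		NumChunks  uint64 `json:"numChunks,omitempty"`
	} `json:"stats,omitempty"`
}

// readBlockMeta is a hypothetical helper that loads <dir>/meta.json,
// the file name held by the metaFilename constant in the patch.
func readBlockMeta(dir string) (*BlockMeta, error) {
	b, err := ioutil.ReadFile(filepath.Join(dir, "meta.json"))
	if err != nil {
		return nil, err
	}
	var m BlockMeta
	if err := json.Unmarshal(b, &m); err != nil {
		return nil, err
	}
	return &m, nil
}

func main() {
	// "b-000001" follows the b-%0.6d block directory scheme used by nextBlockDir.
	m, err := readBlockMeta("b-000001")
	if err != nil {
		fmt.Println("read meta:", err)
		return
	}
	fmt.Printf("block covers [%d, %d] with %d samples in %d series\n",
		m.MinTime, m.MaxTime, m.Stats.NumSamples, m.Stats.NumSeries)
}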