From 30727b1e71691ebc356e127651b7a31213f2c5a5 Mon Sep 17 00:00:00 2001 From: Fabian Reinartz Date: Thu, 2 Mar 2017 10:12:08 +0100 Subject: [PATCH 1/7] Vendor compaction changes --- vendor/github.com/fabxc/tsdb/compact.go | 179 ++++++++---- vendor/github.com/fabxc/tsdb/db.go | 357 ++++++++++++------------ vendor/github.com/fabxc/tsdb/head.go | 19 +- vendor/github.com/fabxc/tsdb/querier.go | 2 + vendor/github.com/fabxc/tsdb/wal.go | 5 +- vendor/github.com/fabxc/tsdb/writer.go | 118 ++++---- vendor/vendor.json | 6 +- 7 files changed, 383 insertions(+), 303 deletions(-) diff --git a/vendor/github.com/fabxc/tsdb/compact.go b/vendor/github.com/fabxc/tsdb/compact.go index b58acba43..51d4ce0d1 100644 --- a/vendor/github.com/fabxc/tsdb/compact.go +++ b/vendor/github.com/fabxc/tsdb/compact.go @@ -13,6 +13,23 @@ import ( "github.com/prometheus/client_golang/prometheus" ) +// Compactor provides compaction against an underlying storage +// of time series data. +type Compactor interface { + // Plan returns a set of non-overlapping directories that can + // be compacted concurrently. + // Results returned when compactions are in progress are undefined. + Plan(dir string) ([][]string, error) + + // Write persists a Block into a directory. + Write(dir string, b Block) error + + // Compact runs compaction against the provided directories. Must + // only be called concurrently with results of Plan(). + Compact(dirs ...string) error +} + +// compactor implements the Compactor interface. type compactor struct { metrics *compactorMetrics opts *compactorOptions @@ -69,61 +86,55 @@ type compactionInfo struct { const compactionBlocksLen = 3 -// pick returns a range [i, j) in the blocks that are suitable to be compacted -// into a single block at position i. -func (c *compactor) pick(bs []compactionInfo) (i, j int, ok bool) { - if len(bs) == 0 { - return 0, 0, false +func (c *compactor) Plan(dir string) ([][]string, error) { + dirs, err := blockDirs(dir) + if err != nil { + return nil, err } - // First, we always compact pending in-memory blocks – oldest first. - for i, b := range bs { - if b.generation > 0 { - continue - } - // Directly compact into 2nd generation with previous generation 1 blocks. - if i+1 >= compactionBlocksLen { - match := true - for _, pb := range bs[i-compactionBlocksLen+1 : i] { - match = match && pb.generation == 1 - } - if match { - return i - compactionBlocksLen + 1, i + 1, true - } - } - // If we have enough generation 0 blocks to directly move to the - // 2nd generation, skip generation 1. - if len(bs)-i >= compactionBlocksLen { - // Guard against the newly compacted block becoming larger than - // the previous one. - if i == 0 || bs[i-1].generation >= 2 { - return i, i + compactionBlocksLen, true - } - } + var bs []*BlockMeta - // No optimizations possible, naiively compact the new block. - return i, i + 1, true + for _, dir := range dirs { + meta, err := readMetaFile(dir) + if err != nil { + return nil, err + } + if meta.Compaction.Generation > 0 { + bs = append(bs, meta) + } + } + + if len(bs) == 0 { + return nil, nil + } + + sliceDirs := func(i, j int) [][]string { + var res []string + for k := i; k < j; k++ { + res = append(res, dirs[k]) + } + return [][]string{res} } // Then we care about compacting multiple blocks, starting with the oldest. 
- for i := 0; i < len(bs)-compactionBlocksLen+1; i += compactionBlocksLen { + for i := 0; i < len(bs)-compactionBlocksLen+1; i++ { if c.match(bs[i : i+3]) { - return i, i + compactionBlocksLen, true + return sliceDirs(i, i+compactionBlocksLen), nil } } - return 0, 0, false + return nil, nil } -func (c *compactor) match(bs []compactionInfo) bool { - g := bs[0].generation +func (c *compactor) match(bs []*BlockMeta) bool { + g := bs[0].Compaction.Generation for _, b := range bs { - if b.generation != g { + if b.Compaction.Generation != g { return false } } - return uint64(bs[len(bs)-1].maxt-bs[0].mint) <= c.opts.maxBlockRange + return uint64(bs[len(bs)-1].MaxTime-bs[0].MinTime) <= c.opts.maxBlockRange } var entropy = rand.New(rand.NewSource(time.Now().UnixNano())) @@ -136,11 +147,7 @@ func mergeBlockMetas(blocks ...Block) (res BlockMeta) { res.MaxTime = blocks[len(blocks)-1].Meta().MaxTime res.ULID = ulid.MustNew(ulid.Now(), entropy) - g := m0.Compaction.Generation - if g == 0 && len(blocks) > 1 { - g++ - } - res.Compaction.Generation = g + 1 + res.Compaction.Generation = m0.Compaction.Generation + 1 for _, b := range blocks { res.Stats.NumSamples += b.Meta().Stats.NumSamples @@ -148,35 +155,62 @@ func mergeBlockMetas(blocks ...Block) (res BlockMeta) { return res } -func (c *compactor) compact(dir string, blocks ...Block) (err error) { - start := time.Now() - defer func() { +func (c *compactor) Compact(dirs ...string) (err error) { + var blocks []Block + + for _, d := range dirs { + b, err := newPersistedBlock(d) + if err != nil { + return err + } + blocks = append(blocks, b) + } + + return c.write(dirs[0], blocks...) +} + +func (c *compactor) Write(dir string, b Block) error { + return c.write(dir, b) +} + +// write creates a new block that is the union of the provided blocks into dir. +// It cleans up all files of the old blocks after completing successfully. +func (c *compactor) write(dir string, blocks ...Block) (err error) { + defer func(t time.Time) { if err != nil { c.metrics.failed.Inc() } - c.metrics.duration.Observe(time.Since(start).Seconds()) - }() + c.metrics.duration.Observe(time.Since(t).Seconds()) + }(time.Now()) - if err = os.RemoveAll(dir); err != nil { + tmp := dir + ".tmp" + + if err = os.RemoveAll(tmp); err != nil { return err } - if err = os.MkdirAll(dir, 0777); err != nil { + if err = os.MkdirAll(tmp, 0777); err != nil { return err } - chunkw, err := newChunkWriter(chunkDir(dir)) + // Populate chunk and index files into temporary directory with + // data of all blocks. + chunkw, err := newChunkWriter(chunkDir(tmp)) if err != nil { return errors.Wrap(err, "open chunk writer") } - indexw, err := newIndexWriter(dir) + indexw, err := newIndexWriter(tmp) if err != nil { return errors.Wrap(err, "open index writer") } - if err = c.write(dir, blocks, indexw, chunkw); err != nil { + meta, err := c.populate(blocks, indexw, chunkw) + if err != nil { return errors.Wrap(err, "write compaction") } + if err = writeMetaFile(tmp, meta); err != nil { + return errors.Wrap(err, "write merged meta") + } if err = chunkw.Close(); err != nil { return errors.Wrap(err, "close chunk writer") @@ -184,16 +218,37 @@ func (c *compactor) compact(dir string, blocks ...Block) (err error) { if err = indexw.Close(); err != nil { return errors.Wrap(err, "close index writer") } + + // Block successfully written, make visible and remove old ones. 
+ if err := renameFile(tmp, dir); err != nil { + return errors.Wrap(err, "rename block dir") + } + for _, b := range blocks[1:] { + if err := os.RemoveAll(b.Dir()); err != nil { + return err + } + } + // Properly sync parent dir to ensure changes are visible. + df, err := fileutil.OpenDir(dir) + if err != nil { + return errors.Wrap(err, "sync block dir") + } + if err := fileutil.Fsync(df); err != nil { + return errors.Wrap(err, "sync block dir") + } + return nil } -func (c *compactor) write(dir string, blocks []Block, indexw IndexWriter, chunkw ChunkWriter) error { +// populate fills the index and chunk writers with new data gathered as the union +// of the provided blocks. It returns meta information for the new block. +func (c *compactor) populate(blocks []Block, indexw IndexWriter, chunkw ChunkWriter) (*BlockMeta, error) { var set compactionSet for i, b := range blocks { all, err := b.Index().Postings("", "") if err != nil { - return err + return nil, err } // TODO(fabxc): find more transparent way of handling this. if hb, ok := b.(*headBlock); ok { @@ -207,7 +262,7 @@ func (c *compactor) write(dir string, blocks []Block, indexw IndexWriter, chunkw } set, err = newCompactionMerger(set, s) if err != nil { - return err + return nil, err } } @@ -222,7 +277,7 @@ func (c *compactor) write(dir string, blocks []Block, indexw IndexWriter, chunkw for set.Next() { lset, chunks := set.At() if err := chunkw.WriteChunks(chunks...); err != nil { - return err + return nil, err } indexw.AddSeries(i, lset, chunks...) @@ -243,7 +298,7 @@ func (c *compactor) write(dir string, blocks []Block, indexw IndexWriter, chunkw i++ } if set.Err() != nil { - return set.Err() + return nil, set.Err() } s := make([]string, 0, 256) @@ -254,13 +309,13 @@ func (c *compactor) write(dir string, blocks []Block, indexw IndexWriter, chunkw s = append(s, x) } if err := indexw.WriteLabelIndex([]string{n}, s); err != nil { - return err + return nil, err } } for t := range postings.m { if err := indexw.WritePostings(t.name, t.value, postings.get(t)); err != nil { - return err + return nil, err } } // Write a postings list containing all series. @@ -269,10 +324,10 @@ func (c *compactor) write(dir string, blocks []Block, indexw IndexWriter, chunkw all[i] = uint32(i) } if err := indexw.WritePostings("", "", newListPostings(all)); err != nil { - return err + return nil, err } - return writeMetaFile(dir, &meta) + return &meta, nil } type compactionSet interface { diff --git a/vendor/github.com/fabxc/tsdb/db.go b/vendor/github.com/fabxc/tsdb/db.go index f967b044c..b1a3a2421 100644 --- a/vendor/github.com/fabxc/tsdb/db.go +++ b/vendor/github.com/fabxc/tsdb/db.go @@ -87,18 +87,26 @@ const sep = '\xff' // DB handles reads and writes of time series falling into // a hashed partition of a seriedb. type DB struct { - dir string - lockf lockfile.Lockfile + dir string + lockf lockfile.Lockfile + logger log.Logger metrics *dbMetrics opts *Options + // Mutex for that must be held when modifying the general + // block layout. mtx sync.RWMutex persisted []*persistedBlock - heads []*headBlock - headGen uint8 + seqBlocks map[int]Block - compactor *compactor + // Mutex that must be held when modifying just the head blocks + // or the general layout. 
+ headmtx sync.RWMutex + heads []*headBlock + headGen uint8 + + compactor Compactor compactc chan struct{} donec chan struct{} @@ -175,10 +183,9 @@ func Open(dir string, l log.Logger, r prometheus.Registerer, opts *Options) (db maxBlockRange: opts.MaxBlockDuration, }) - if err := db.initBlocks(); err != nil { + if err := db.reloadBlocks(); err != nil { return nil, err } - go db.run() return db, nil @@ -200,35 +207,16 @@ func (db *DB) run() { case <-db.compactc: db.metrics.compactionsTriggered.Inc() - var seqs []int - var infos []compactionInfo - for _, b := range db.compactable() { - m := b.Meta() + var merr MultiError - infos = append(infos, compactionInfo{ - generation: m.Compaction.Generation, - mint: m.MinTime, - maxt: m.MaxTime, - seq: m.Sequence, - }) - seqs = append(seqs, m.Sequence) + changes, err := db.compact() + merr.Add(err) + + if changes { + merr.Add(db.reloadBlocks()) } - - i, j, ok := db.compactor.pick(infos) - if !ok { - continue - } - db.logger.Log("msg", "compact", "seqs", fmt.Sprintf("%v", seqs[i:j])) - - if err := db.compact(i, j); err != nil { + if err := merr.Err(); err != nil { db.logger.Log("msg", "compaction failed", "err", err) - continue - } - db.logger.Log("msg", "compaction completed") - // Trigger another compaction in case there's more work to do. - select { - case db.compactc <- struct{}{}: - default: } case <-db.stopc: @@ -237,150 +225,170 @@ func (db *DB) run() { } } -func (db *DB) getBlock(i int) Block { - if i < len(db.persisted) { - return db.persisted[i] - } - return db.heads[i-len(db.persisted)] -} +func (db *DB) compact() (changes bool, err error) { + // Check whether we have pending head blocks that are ready to be persisted. + // They have the highest priority. + db.headmtx.RLock() -// removeBlocks removes the blocks in range [i, j) from the list of persisted -// and head blocks. The blocks are not closed and their files not deleted. -func (db *DB) removeBlocks(i, j int) { - for k := i; k < j; k++ { - if i < len(db.persisted) { - db.persisted = append(db.persisted[:i], db.persisted[i+1:]...) - } else { - l := i - len(db.persisted) - db.heads = append(db.heads[:l], db.heads[l+1:]...) - } - } -} + var singles []*headBlock -func (db *DB) blocks() (bs []Block) { - for _, b := range db.persisted { - bs = append(bs, b) - } - for _, b := range db.heads { - bs = append(bs, b) - } - return bs -} - -// compact block in range [i, j) into a temporary directory and atomically -// swap the blocks out on successful completion. -func (db *DB) compact(i, j int) error { - if j <= i { - return errors.New("invalid compaction block range") - } - var blocks []Block - for k := i; k < j; k++ { - blocks = append(blocks, db.getBlock(k)) - } - var ( - dir = blocks[0].Dir() - tmpdir = dir + ".tmp" - ) - - if err := db.compactor.compact(tmpdir, blocks...); err != nil { - return err - } - - pb, err := newPersistedBlock(tmpdir) - if err != nil { - return err - } - - db.mtx.Lock() - defer db.mtx.Unlock() - - for _, b := range blocks { - if err := b.Close(); err != nil { - return errors.Wrapf(err, "close old block %s", b.Dir()) + // Collect head blocks that are ready for compaction. Write them after + // returning the lock to not block Appenders. + // Selected blocks are semantically ensured to not be written to afterwards + // by appendable(). + if len(db.heads) > db.opts.AppendableBlocks { + for _, h := range db.heads[:len(db.heads)-db.opts.AppendableBlocks] { + // Blocks that won't be appendable when instantiating a new appender + // might still have active appenders on them. 
+ // Abort at the first one we encounter. + if atomic.LoadUint64(&h.activeWriters) > 0 { + break + } + singles = append(singles, h) } } - if err := renameFile(tmpdir, dir); err != nil { - return errors.Wrap(err, "rename dir") - } - pb.dir = dir + db.headmtx.RUnlock() - db.removeBlocks(i, j) - db.persisted = append(db.persisted, pb) +Loop: + for _, h := range singles { + db.logger.Log("msg", "write head", "seq", h.Meta().Sequence) - for _, b := range blocks[1:] { - db.logger.Log("msg", "remove old dir", "dir", b.Dir()) - if err := os.RemoveAll(b.Dir()); err != nil { - return errors.Wrap(err, "removing old block") + select { + case <-db.stopc: + break Loop + default: } - } - if err := db.retentionCutoff(); err != nil { - return err + + if err = db.compactor.Write(h.Dir(), h); err != nil { + return changes, errors.Wrap(err, "persist head block") + } + changes = true } - return nil -} + // Check for compactions of multiple blocks. + for { + plans, err := db.compactor.Plan(db.dir) + if err != nil { + return changes, errors.Wrap(err, "plan compaction") + } -func (db *DB) retentionCutoff() error { - if db.opts.RetentionDuration == 0 { - return nil - } - h := db.heads[len(db.heads)-1] - t := h.meta.MinTime - int64(db.opts.RetentionDuration) + select { + case <-db.stopc: + return false, nil + default: + } + // We just execute compactions sequentially to not cause too extreme + // CPU and memory spikes. + // TODO(fabxc): return more descriptive plans in the future that allow + // estimation of resource usage and conditional parallelization? + for _, p := range plans { + db.logger.Log("msg", "compact blocks", "seq", fmt.Sprintf("%v", p)) - var ( - blocks = db.blocks() - i int - b Block - ) - for i, b = range blocks { - if b.Meta().MinTime >= t { + if err := db.compactor.Compact(p...); err != nil { + return changes, errors.Wrapf(err, "compact %s", p) + } + changes = true + } + // If we didn't compact anything, there's nothing left to do. 
+ if len(plans) == 0 { break } } - if i <= 1 { - return nil - } - db.logger.Log("msg", "retention cutoff", "idx", i-1) - db.removeBlocks(0, i) - for _, b := range blocks[:i] { - if err := os.RemoveAll(b.Dir()); err != nil { - return errors.Wrap(err, "removing old block") - } - } - return nil + return changes, nil } -func (db *DB) initBlocks() error { - var ( - persisted []*persistedBlock - heads []*headBlock - ) +// func (db *DB) retentionCutoff() error { +// if db.opts.RetentionDuration == 0 { +// return nil +// } +// h := db.heads[len(db.heads)-1] +// t := h.meta.MinTime - int64(db.opts.RetentionDuration) + +// var ( +// blocks = db.blocks() +// i int +// b Block +// ) +// for i, b = range blocks { +// if b.Meta().MinTime >= t { +// break +// } +// } +// if i <= 1 { +// return nil +// } +// db.logger.Log("msg", "retention cutoff", "idx", i-1) +// db.removeBlocks(0, i) + +// for _, b := range blocks[:i] { +// if err := os.RemoveAll(b.Dir()); err != nil { +// return errors.Wrap(err, "removing old block") +// } +// } +// return nil +// } + +func (db *DB) reloadBlocks() error { + db.mtx.Lock() + defer db.mtx.Unlock() dirs, err := blockDirs(db.dir) if err != nil { - return err + return errors.Wrap(err, "find blocks") } + var ( + metas []*BlockMeta + persisted []*persistedBlock + heads []*headBlock + seqBlocks = make(map[int]Block, len(dirs)) + ) for _, dir := range dirs { - if fileutil.Exist(filepath.Join(dir, walDirName)) { - h, err := openHeadBlock(dir, db.logger) - if err != nil { - return err - } - h.generation = db.headGen - db.headGen++ - heads = append(heads, h) - continue - } - b, err := newPersistedBlock(dir) + meta, err := readMetaFile(dir) if err != nil { - return err + return errors.Wrapf(err, "read meta information %s", dir) } - persisted = append(persisted, b) + metas = append(metas, meta) } + for i, meta := range metas { + b, ok := db.seqBlocks[meta.Sequence] + if !ok { + return errors.Errorf("missing block for sequence %d", meta.Sequence) + } + + if meta.Compaction.Generation == 0 { + if meta.ULID != b.Meta().ULID { + return errors.Errorf("head block ULID changed unexpectedly") + } + heads = append(heads, b.(*headBlock)) + } else { + if meta.ULID != b.Meta().ULID { + if err := b.Close(); err != nil { + return err + } + b, err = newPersistedBlock(dirs[i]) + if err != nil { + return errors.Wrapf(err, "open persisted block %s", dirs[i]) + } + } + persisted = append(persisted, b.(*persistedBlock)) + } + + seqBlocks[meta.Sequence] = b + } + + for seq, b := range db.seqBlocks { + if _, ok := seqBlocks[seq]; !ok { + if err := b.Close(); err != nil { + return errors.Wrapf(err, "closing removed block %d", b.Meta().Sequence) + } + } + } + + db.seqBlocks = seqBlocks db.persisted = persisted db.heads = heads @@ -392,10 +400,11 @@ func (db *DB) Close() error { close(db.stopc) <-db.donec - var merr MultiError - + // Lock mutex and leave it locked so we panic if there's a bug causing + // the block to be used afterwards. 
db.mtx.Lock() - defer db.mtx.Unlock() + + var merr MultiError for _, pb := range db.persisted { merr.Add(pb.Close()) @@ -414,9 +423,14 @@ func (db *DB) Appender() Appender { db.mtx.RLock() a := &dbAppender{db: db} + db.headmtx.RLock() + for _, b := range db.appendable() { a.heads = append(a.heads, b.Appender().(*headAppender)) } + + db.headmtx.RUnlock() + return a } @@ -479,15 +493,12 @@ func (a *dbAppender) AddFast(ref uint64, t int64, v float64) error { func (a *dbAppender) appenderFor(t int64) (*headAppender, error) { // If there's no fitting head block for t, ensure it gets created. if len(a.heads) == 0 || t >= a.heads[len(a.heads)-1].meta.MaxTime { - a.db.mtx.RUnlock() + a.db.headmtx.Lock() if err := a.db.ensureHead(t); err != nil { - a.db.mtx.RLock() + a.db.headmtx.Unlock() return nil, err } - - a.db.mtx.RLock() - if len(a.heads) == 0 { for _, b := range a.db.appendable() { a.heads = append(a.heads, b.Appender().(*headAppender)) @@ -500,6 +511,8 @@ func (a *dbAppender) appenderFor(t int64) (*headAppender, error) { } } } + + a.db.headmtx.Unlock() } for i := len(a.heads) - 1; i >= 0; i-- { if h := a.heads[i]; t >= h.meta.MinTime { @@ -511,8 +524,8 @@ func (a *dbAppender) appenderFor(t int64) (*headAppender, error) { } func (db *DB) ensureHead(t int64) error { - db.mtx.Lock() - defer db.mtx.Unlock() + // db.mtx.Lock() + // defer db.mtx.Unlock() // Initial case for a new database: we must create the first // AppendableBlocks-1 front padding heads. @@ -568,31 +581,6 @@ func (db *DB) appendable() []*headBlock { return db.heads[len(db.heads)-db.opts.AppendableBlocks:] } -func (db *DB) compactable() []Block { - db.mtx.RLock() - defer db.mtx.RUnlock() - - var blocks []Block - for _, pb := range db.persisted { - blocks = append(blocks, pb) - } - - if len(db.heads) <= db.opts.AppendableBlocks { - return blocks - } - - for _, h := range db.heads[:len(db.heads)-db.opts.AppendableBlocks] { - // Blocks that won't be appendable when instantiating a new appender - // might still have active appenders on them. - // Abort at the first one we encounter. - if atomic.LoadUint64(&h.activeWriters) > 0 { - break - } - blocks = append(blocks, h) - } - return blocks -} - func intervalOverlap(amin, amax, bmin, bmax int64) bool { if bmin >= amin && bmin <= amax { return true @@ -643,6 +631,7 @@ func (db *DB) cut(mint int64) (*headBlock, error) { } db.heads = append(db.heads, newHead) + db.seqBlocks[seq] = newHead db.headGen++ newHead.generation = db.headGen diff --git a/vendor/github.com/fabxc/tsdb/head.go b/vendor/github.com/fabxc/tsdb/head.go index 260fb962e..058c8fe79 100644 --- a/vendor/github.com/fabxc/tsdb/head.go +++ b/vendor/github.com/fabxc/tsdb/head.go @@ -63,7 +63,10 @@ type headBlock struct { } func createHeadBlock(dir string, seq int, l log.Logger, mint, maxt int64) (*headBlock, error) { - if err := os.MkdirAll(dir, 0777); err != nil { + // Make head block creation appear atomic. 
+ tmp := dir + ".tmp" + + if err := os.MkdirAll(tmp, 0777); err != nil { return nil, err } ulid, err := ulid.New(ulid.Now(), entropy) @@ -71,7 +74,7 @@ func createHeadBlock(dir string, seq int, l log.Logger, mint, maxt int64) (*head return nil, err } - if err := writeMetaFile(dir, &BlockMeta{ + if err := writeMetaFile(tmp, &BlockMeta{ ULID: ulid, Sequence: seq, MinTime: mint, @@ -79,6 +82,9 @@ func createHeadBlock(dir string, seq int, l log.Logger, mint, maxt int64) (*head }); err != nil { return nil, err } + if err := renameFile(tmp, dir); err != nil { + return nil, err + } return openHeadBlock(dir, l) } @@ -139,12 +145,19 @@ func (h *headBlock) inBounds(t int64) bool { // Close syncs all data and closes underlying resources of the head block. func (h *headBlock) Close() error { + // Lock mutex and leave it locked so we panic if there's a bug causing + // the block to be used afterwards. + h.mtx.Lock() + if err := h.wal.Close(); err != nil { return err } // Check whether the head block still exists in the underlying dir - // or has already been replaced with a compacted version + // or has already been replaced with a compacted version or removed. meta, err := readMetaFile(h.dir) + if os.IsNotExist(err) { + return nil + } if err != nil { return err } diff --git a/vendor/github.com/fabxc/tsdb/querier.go b/vendor/github.com/fabxc/tsdb/querier.go index 5f02c77f7..7783ef312 100644 --- a/vendor/github.com/fabxc/tsdb/querier.go +++ b/vendor/github.com/fabxc/tsdb/querier.go @@ -47,7 +47,9 @@ type querier struct { func (s *DB) Querier(mint, maxt int64) Querier { s.mtx.RLock() + s.headmtx.RLock() blocks := s.blocksForInterval(mint, maxt) + s.headmtx.RUnlock() sq := &querier{ blocks: make([]Querier, 0, len(blocks)), diff --git a/vendor/github.com/fabxc/tsdb/wal.go b/vendor/github.com/fabxc/tsdb/wal.go index a923f5b3d..8b88d110d 100644 --- a/vendor/github.com/fabxc/tsdb/wal.go +++ b/vendor/github.com/fabxc/tsdb/wal.go @@ -58,7 +58,7 @@ type WAL struct { const ( walDirName = "wal" - walSegmentSizeBytes = 64 * 1000 * 1000 // 64 MB + walSegmentSizeBytes = 256 * 1024 * 1024 // 256 MB ) // OpenWAL opens or creates a write ahead log in the given directory. @@ -265,8 +265,9 @@ func (w *WAL) Close() error { close(w.stopc) <-w.donec + // Lock mutex and leave it locked so we panic if there's a bug causing + // the block to be used afterwards. w.mtx.Lock() - defer w.mtx.Unlock() if err := w.sync(); err != nil { return err diff --git a/vendor/github.com/fabxc/tsdb/writer.go b/vendor/github.com/fabxc/tsdb/writer.go index bafb0e8cd..264052d32 100644 --- a/vendor/github.com/fabxc/tsdb/writer.go +++ b/vendor/github.com/fabxc/tsdb/writer.go @@ -11,7 +11,6 @@ import ( "sort" "strings" - "github.com/bradfitz/slice" "github.com/coreos/etcd/pkg/fileutil" "github.com/fabxc/tsdb/chunks" "github.com/fabxc/tsdb/labels" @@ -262,6 +261,10 @@ type indexWriter struct { n int64 started bool + // Reusable memory. + b []byte + uint32s []uint32 + series map[uint32]*indexWriterSeries symbols map[string]uint32 // symbol offsets labelIndexes []hashEntry // label index offsets @@ -284,11 +287,17 @@ func newIndexWriter(dir string) (*indexWriter, error) { } iw := &indexWriter{ - f: f, - bufw: bufio.NewWriterSize(f, 1*1024*1024), - n: 0, - symbols: make(map[string]uint32, 4096), - series: make(map[uint32]*indexWriterSeries, 4096), + f: f, + bufw: bufio.NewWriterSize(f, 1<<22), + n: 0, + + // Reusable memory. + b: make([]byte, 0, 1<<23), + uint32s: make([]uint32, 0, 1<<15), + + // Caches. 
+ symbols: make(map[string]uint32, 1<<13), + series: make(map[uint32]*indexWriterSeries, 1<<16), crc32: crc32.New(crc32.MakeTable(crc32.Castagnoli)), } if err := iw.writeMeta(); err != nil { @@ -304,12 +313,12 @@ func (w *indexWriter) write(wr io.Writer, b []byte) error { } // section writes a CRC32 checksummed section of length l and guarded by flag. -func (w *indexWriter) section(l uint32, flag byte, f func(w io.Writer) error) error { +func (w *indexWriter) section(l int, flag byte, f func(w io.Writer) error) error { w.crc32.Reset() wr := io.MultiWriter(w.crc32, w.bufw) b := [5]byte{flag, 0, 0, 0, 0} - binary.BigEndian.PutUint32(b[1:], l) + binary.BigEndian.PutUint32(b[1:], uint32(l)) if err := w.write(wr, b[:]); err != nil { return errors.Wrap(err, "writing header") @@ -363,74 +372,77 @@ func (w *indexWriter) writeSymbols() error { base := uint32(w.n) + 5 buf := [binary.MaxVarintLen32]byte{} - b := append(make([]byte, 0, 4096), flagStd) + w.b = append(w.b[:0], flagStd) for _, s := range symbols { - w.symbols[s] = base + uint32(len(b)) + w.symbols[s] = base + uint32(len(w.b)) n := binary.PutUvarint(buf[:], uint64(len(s))) - b = append(b, buf[:n]...) - b = append(b, s...) + w.b = append(w.b, buf[:n]...) + w.b = append(w.b, s...) } - l := uint32(len(b)) - - return w.section(l, flagStd, func(wr io.Writer) error { - return w.write(wr, b) + return w.section(len(w.b), flagStd, func(wr io.Writer) error { + return w.write(wr, w.b) }) } +type indexWriterSeriesSlice []*indexWriterSeries + +func (s indexWriterSeriesSlice) Len() int { return len(s) } +func (s indexWriterSeriesSlice) Swap(i, j int) { s[i], s[j] = s[j], s[i] } + +func (s indexWriterSeriesSlice) Less(i, j int) bool { + return labels.Compare(s[i].labels, s[j].labels) < 0 +} + func (w *indexWriter) writeSeries() error { // Series must be stored sorted along their labels. - series := make([]*indexWriterSeries, 0, len(w.series)) + series := make(indexWriterSeriesSlice, 0, len(w.series)) for _, s := range w.series { series = append(series, s) } - slice.Sort(series, func(i, j int) bool { - return labels.Compare(series[i].labels, series[j].labels) < 0 - }) + sort.Sort(series) // Current end of file plus 5 bytes for section header. // TODO(fabxc): switch to relative offsets. base := uint32(w.n) + 5 - b := make([]byte, 0, 1<<20) // 1MiB + w.b = w.b[:0] buf := make([]byte, binary.MaxVarintLen64) for _, s := range series { // Write label set symbol references. - s.offset = base + uint32(len(b)) + s.offset = base + uint32(len(w.b)) n := binary.PutUvarint(buf, uint64(len(s.labels))) - b = append(b, buf[:n]...) + w.b = append(w.b, buf[:n]...) for _, l := range s.labels { n = binary.PutUvarint(buf, uint64(w.symbols[l.Name])) - b = append(b, buf[:n]...) + w.b = append(w.b, buf[:n]...) n = binary.PutUvarint(buf, uint64(w.symbols[l.Value])) - b = append(b, buf[:n]...) + w.b = append(w.b, buf[:n]...) } // Write chunks meta data including reference into chunk file. n = binary.PutUvarint(buf, uint64(len(s.chunks))) - b = append(b, buf[:n]...) + w.b = append(w.b, buf[:n]...) for _, c := range s.chunks { n = binary.PutVarint(buf, c.MinTime) - b = append(b, buf[:n]...) + w.b = append(w.b, buf[:n]...) n = binary.PutVarint(buf, c.MaxTime) - b = append(b, buf[:n]...) + w.b = append(w.b, buf[:n]...) n = binary.PutUvarint(buf, uint64(c.Ref)) - b = append(b, buf[:n]...) + w.b = append(w.b, buf[:n]...) 
} } - l := uint32(len(b)) - - return w.section(l, flagStd, func(wr io.Writer) error { - return w.write(wr, b) + return w.section(len(w.b), flagStd, func(wr io.Writer) error { + return w.write(wr, w.b) }) } @@ -467,7 +479,7 @@ func (w *indexWriter) WriteLabelIndex(names []string, values []string) error { buf := make([]byte, binary.MaxVarintLen32) n := binary.PutUvarint(buf, uint64(len(names))) - l := uint32(n) + uint32(len(values)*4) + l := n + len(values)*4 return w.section(l, flagStd, func(wr io.Writer) error { // First byte indicates tuple size for index. @@ -500,13 +512,10 @@ func (w *indexWriter) WritePostings(name, value string, it Postings) error { offset: uint32(w.n), }) - b := make([]byte, 0, 4096) - buf := [4]byte{} - // Order of the references in the postings list does not imply order // of the series references within the persisted block they are mapped to. // We have to sort the new references again. - var refs []uint32 + refs := w.uint32s[:0] for it.Next() { s, ok := w.series[it.At()] @@ -519,38 +528,49 @@ func (w *indexWriter) WritePostings(name, value string, it Postings) error { return err } - slice.Sort(refs, func(i, j int) bool { return refs[i] < refs[j] }) + sort.Sort(uint32slice(refs)) + + w.b = w.b[:0] + buf := make([]byte, 4) for _, r := range refs { - binary.BigEndian.PutUint32(buf[:], r) - b = append(b, buf[:]...) + binary.BigEndian.PutUint32(buf, r) + w.b = append(w.b, buf...) } - return w.section(uint32(len(b)), flagStd, func(wr io.Writer) error { - return w.write(wr, b) + w.uint32s = refs[:0] + + return w.section(len(w.b), flagStd, func(wr io.Writer) error { + return w.write(wr, w.b) }) } +type uint32slice []uint32 + +func (s uint32slice) Len() int { return len(s) } +func (s uint32slice) Swap(i, j int) { s[i], s[j] = s[j], s[i] } +func (s uint32slice) Less(i, j int) bool { return s[i] < s[j] } + type hashEntry struct { name string offset uint32 } func (w *indexWriter) writeHashmap(h []hashEntry) error { - b := make([]byte, 0, 4096) + w.b = w.b[:0] buf := [binary.MaxVarintLen32]byte{} for _, e := range h { n := binary.PutUvarint(buf[:], uint64(len(e.name))) - b = append(b, buf[:n]...) - b = append(b, e.name...) + w.b = append(w.b, buf[:n]...) + w.b = append(w.b, e.name...) n = binary.PutUvarint(buf[:], uint64(e.offset)) - b = append(b, buf[:n]...) + w.b = append(w.b, buf[:n]...) 
} - return w.section(uint32(len(b)), flagStd, func(wr io.Writer) error { - return w.write(wr, b) + return w.section(len(w.b), flagStd, func(wr io.Writer) error { + return w.write(wr, w.b) }) } diff --git a/vendor/vendor.json b/vendor/vendor.json index 54bfb885f..e183acfcf 100644 --- a/vendor/vendor.json +++ b/vendor/vendor.json @@ -368,10 +368,10 @@ "revisionTime": "2016-09-30T00:14:02Z" }, { - "checksumSHA1": "E5C5z6CV6JeIA2cpT3KVWeFgZdM=", + "checksumSHA1": "IOnF9CNVjOBoVwdfzfUEv/+JotI=", "path": "github.com/fabxc/tsdb", - "revision": "2c3b56350a6d75a15484494c5a87145828cb34ef", - "revisionTime": "2017-03-01T16:19:57Z" + "revision": "55a9b5428aceb644b3b297d7a9fd63d0354ce953", + "revisionTime": "2017-03-04T15:50:48Z" }, { "checksumSHA1": "uVzWuLvF646YjiKomsc2CR1ua58=", From d9fb57cde44ea189be754eced4d224a413dd3d42 Mon Sep 17 00:00:00 2001 From: Fabian Reinartz Date: Tue, 7 Mar 2017 11:41:11 +0100 Subject: [PATCH 2/7] *: Simplify []byte to string unsafe conversion --- pkg/textparse/parse.go | 9 +-------- retrieval/scrape.go | 9 +-------- 2 files changed, 2 insertions(+), 16 deletions(-) diff --git a/pkg/textparse/parse.go b/pkg/textparse/parse.go index 51d08b932..d04ec2942 100644 --- a/pkg/textparse/parse.go +++ b/pkg/textparse/parse.go @@ -7,7 +7,6 @@ package textparse import ( "errors" "io" - "reflect" "sort" "unsafe" @@ -106,11 +105,5 @@ func (p *Parser) Metric(l *labels.Labels) { } func yoloString(b []byte) string { - sh := (*reflect.SliceHeader)(unsafe.Pointer(&b)) - - h := reflect.StringHeader{ - Data: sh.Data, - Len: sh.Len, - } - return *((*string)(unsafe.Pointer(&h))) + return *((*string)(unsafe.Pointer(&b))) } diff --git a/retrieval/scrape.go b/retrieval/scrape.go index 5391eb3fe..7a2993f43 100644 --- a/retrieval/scrape.go +++ b/retrieval/scrape.go @@ -20,7 +20,6 @@ import ( "fmt" "io" "net/http" - "reflect" "sync" "time" "unsafe" @@ -586,13 +585,7 @@ loop: } func yoloString(b []byte) string { - sh := (*reflect.SliceHeader)(unsafe.Pointer(&b)) - - h := reflect.StringHeader{ - Data: sh.Data, - Len: sh.Len, - } - return *((*string)(unsafe.Pointer(&h))) + return *((*string)(unsafe.Pointer(&b))) } func (sl *scrapeLoop) report(start time.Time, duration time.Duration, scraped, appended int, err error) error { From 9eb1d6c9270f1ab7ecdd2a7e73b07d1f74df9d1f Mon Sep 17 00:00:00 2001 From: Fabian Reinartz Date: Tue, 7 Mar 2017 11:43:32 +0100 Subject: [PATCH 3/7] remote: take code from master --- storage/remote/remote.go | 71 ++-------------------------------------- 1 file changed, 2 insertions(+), 69 deletions(-) diff --git a/storage/remote/remote.go b/storage/remote/remote.go index 1c5fbd9d3..a53f866b3 100644 --- a/storage/remote/remote.go +++ b/storage/remote/remote.go @@ -1,8 +1,4 @@ -<<<<<<< HEAD -// Copyright 2016 The Prometheus Authors -======= // Copyright 2017 The Prometheus Authors ->>>>>>> master // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -23,29 +19,12 @@ import ( "github.com/prometheus/common/model" "github.com/prometheus/prometheus/config" -<<<<<<< HEAD - "github.com/prometheus/prometheus/relabel" -======= ->>>>>>> master ) // Storage allows queueing samples for remote writes. type Storage struct { -<<<<<<< HEAD - mtx sync.RWMutex - externalLabels model.LabelSet - conf config.RemoteWriteConfig - - queue *StorageQueueManager -} - -// New returns a new remote Storage. 
-func New() *Storage { - return &Storage{} -======= mtx sync.RWMutex queues []*QueueManager ->>>>>>> master } // ApplyConfig updates the state as the new config requires. @@ -53,28 +32,6 @@ func (s *Storage) ApplyConfig(conf *config.Config) error { s.mtx.Lock() defer s.mtx.Unlock() -<<<<<<< HEAD - // TODO: we should only stop & recreate queues which have changes, - // as this can be quite disruptive. - var newQueue *StorageQueueManager - - if conf.RemoteWriteConfig.URL != nil { - c, err := NewClient(conf.RemoteWriteConfig) - if err != nil { - return err - } - newQueue = NewStorageQueueManager(c, nil) - } - - if s.queue != nil { - s.queue.Stop() - } - s.queue = newQueue - s.conf = conf.RemoteWriteConfig - s.externalLabels = conf.GlobalConfig.ExternalLabels - if s.queue != nil { - s.queue.Start() -======= newQueues := []*QueueManager{} // TODO: we should only stop & recreate queues which have changes, // as this can be quite disruptive. @@ -97,15 +54,14 @@ func (s *Storage) ApplyConfig(conf *config.Config) error { s.queues = newQueues for _, q := range s.queues { q.Start() ->>>>>>> master } return nil } // Stop the background processing of the storage queues. func (s *Storage) Stop() { - if s.queue != nil { - s.queue.Stop() + for _, q := range s.queues { + q.Stop() } } @@ -114,32 +70,9 @@ func (s *Storage) Append(smpl *model.Sample) error { s.mtx.RLock() defer s.mtx.RUnlock() -<<<<<<< HEAD - if s.queue == nil { - return nil - } - - var snew model.Sample - snew = *smpl - snew.Metric = smpl.Metric.Clone() - - for ln, lv := range s.externalLabels { - if _, ok := smpl.Metric[ln]; !ok { - snew.Metric[ln] = lv - } - } - snew.Metric = model.Metric( - relabel.Process(model.LabelSet(snew.Metric), s.conf.WriteRelabelConfigs...)) - - if snew.Metric == nil { - return nil - } - s.queue.Append(&snew) -======= for _, q := range s.queues { q.Append(smpl) } ->>>>>>> master return nil } From 8a8eb12985f6ae5cb43f023a07c438cad5b1e08e Mon Sep 17 00:00:00 2001 From: Fabian Reinartz Date: Tue, 7 Mar 2017 11:51:30 +0100 Subject: [PATCH 4/7] storage/tsdb: don't use partitioned DB. --- storage/tsdb/tsdb.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/storage/tsdb/tsdb.go b/storage/tsdb/tsdb.go index 519b63946..835125207 100644 --- a/storage/tsdb/tsdb.go +++ b/storage/tsdb/tsdb.go @@ -13,7 +13,7 @@ import ( // adapter implements a storage.Storage around TSDB. type adapter struct { - db *tsdb.PartitionedDB + db *tsdb.DB } // Options of the DB storage. @@ -38,7 +38,7 @@ type Options struct { // Open returns a new storage backed by a tsdb database. 
func Open(path string, r prometheus.Registerer, opts *Options) (storage.Storage, error) { - db, err := tsdb.OpenPartitioned(path, 1, nil, r, &tsdb.Options{ + db, err := tsdb.Open(path, nil, r, &tsdb.Options{ WALFlushInterval: 10 * time.Second, MinBlockDuration: uint64(opts.MinBlockDuration.Seconds() * 1000), MaxBlockDuration: uint64(opts.MaxBlockDuration.Seconds() * 1000), From b416ccb6503d21b48fa69c4e068cf162095628c1 Mon Sep 17 00:00:00 2001 From: Fabian Reinartz Date: Tue, 7 Mar 2017 12:04:15 +0100 Subject: [PATCH 5/7] vendor: add old inluxdb client --- vendor/github.com/influxdb/influxdb/LICENSE | 20 + .../influxdb/influxdb/client/influxdb.go | 180 +++ .../influxdb/influxdb/tsdb/points.go | 1392 +++++++++++++++++ 3 files changed, 1592 insertions(+) create mode 100644 vendor/github.com/influxdb/influxdb/LICENSE create mode 100644 vendor/github.com/influxdb/influxdb/client/influxdb.go create mode 100644 vendor/github.com/influxdb/influxdb/tsdb/points.go diff --git a/vendor/github.com/influxdb/influxdb/LICENSE b/vendor/github.com/influxdb/influxdb/LICENSE new file mode 100644 index 000000000..d50222706 --- /dev/null +++ b/vendor/github.com/influxdb/influxdb/LICENSE @@ -0,0 +1,20 @@ +The MIT License (MIT) + +Copyright (c) 2013-2015 Errplane Inc. + +Permission is hereby granted, free of charge, to any person obtaining a copy of +this software and associated documentation files (the "Software"), to deal in +the Software without restriction, including without limitation the rights to +use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of +the Software, and to permit persons to whom the Software is furnished to do so, +subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS +FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR +COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER +IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN +CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. diff --git a/vendor/github.com/influxdb/influxdb/client/influxdb.go b/vendor/github.com/influxdb/influxdb/client/influxdb.go new file mode 100644 index 000000000..235beb964 --- /dev/null +++ b/vendor/github.com/influxdb/influxdb/client/influxdb.go @@ -0,0 +1,180 @@ +package client + +import ( + "bytes" + "fmt" + "io/ioutil" + "net/http" + "net/url" + "time" + + "github.com/influxdb/influxdb/tsdb" +) + +const ( + // DefaultTimeout is the default connection timeout used to connect to an InfluxDB instance + DefaultTimeout = 0 +) + +// Config is used to specify what server to connect to. +// URL: The URL of the server connecting to. +// Username/Password are optional. They will be passed via basic auth if provided. +// UserAgent: If not provided, will default "InfluxDBClient", +// Timeout: If not provided, will default to 0 (no timeout) +type Config struct { + URL url.URL + Username string + Password string + UserAgent string + Timeout time.Duration + Precision string +} + +// NewConfig will create a config to be used in connecting to the client +func NewConfig() Config { + return Config{ + Timeout: DefaultTimeout, + } +} + +// Client is used to make calls to the server. 
+type Client struct { + url url.URL + username string + password string + httpClient *http.Client + userAgent string + precision string +} + +const ( + ConsistencyOne = "one" + ConsistencyAll = "all" + ConsistencyQuorum = "quorum" + ConsistencyAny = "any" +) + +// NewClient will instantiate and return a connected client to issue commands to the server. +func NewClient(c Config) (*Client, error) { + client := Client{ + url: c.URL, + username: c.Username, + password: c.Password, + httpClient: &http.Client{Timeout: c.Timeout}, + userAgent: c.UserAgent, + precision: c.Precision, + } + if client.userAgent == "" { + client.userAgent = "InfluxDBClient" + } + return &client, nil +} + +// Write takes BatchPoints and allows for writing of multiple points with defaults +// If successful, error is nil and Response is nil +// If an error occurs, Response may contain additional information if populated. +func (c *Client) Write(bp BatchPoints) (*Response, error) { + u := c.url + u.Path = "write" + + var b bytes.Buffer + for _, p := range bp.Points { + if p.Raw != "" { + if _, err := b.WriteString(p.Raw); err != nil { + return nil, err + } + } else { + for k, v := range bp.Tags { + if p.Tags == nil { + p.Tags = make(map[string]string, len(bp.Tags)) + } + p.Tags[k] = v + } + + if _, err := b.WriteString(p.MarshalString()); err != nil { + return nil, err + } + } + + if err := b.WriteByte('\n'); err != nil { + return nil, err + } + } + + req, err := http.NewRequest("POST", u.String(), &b) + if err != nil { + return nil, err + } + req.Header.Set("Content-Type", "") + req.Header.Set("User-Agent", c.userAgent) + if c.username != "" { + req.SetBasicAuth(c.username, c.password) + } + params := req.URL.Query() + params.Set("db", bp.Database) + params.Set("rp", bp.RetentionPolicy) + params.Set("precision", bp.Precision) + params.Set("consistency", bp.WriteConsistency) + req.URL.RawQuery = params.Encode() + + resp, err := c.httpClient.Do(req) + if err != nil { + return nil, err + } + defer resp.Body.Close() + + var response Response + body, err := ioutil.ReadAll(resp.Body) + if err != nil { + return nil, err + } + + if resp.StatusCode != http.StatusNoContent && resp.StatusCode != http.StatusOK { + var err = fmt.Errorf(string(body)) + response.Err = err + return &response, err + } + + return nil, nil +} + +// Structs + +// Response represents a list of statement results. +type Response struct { + Err error +} + +// Point defines the fields that will be written to the database +// Measurement, Time, and Fields are required +// Precision can be specified if the time is in epoch format (integer). +// Valid values for Precision are n, u, ms, s, m, and h +type Point struct { + Measurement string + Tags map[string]string + Time time.Time + Fields map[string]interface{} + Precision string + Raw string +} + +func (p *Point) MarshalString() string { + return tsdb.NewPoint(p.Measurement, p.Tags, p.Fields, p.Time).String() +} + +// BatchPoints is used to send batched data in a single write. +// Database and Points are required +// If no retention policy is specified, it will use the databases default retention policy. +// If tags are specified, they will be "merged" with all points. If a point already has that tag, it is ignored. +// If time is specified, it will be applied to any point with an empty time. +// Precision can be specified if the time is in epoch format (integer). 
+// Valid values for Precision are n, u, ms, s, m, and h +type BatchPoints struct { + Points []Point `json:"points,omitempty"` + Database string `json:"database,omitempty"` + RetentionPolicy string `json:"retentionPolicy,omitempty"` + Tags map[string]string `json:"tags,omitempty"` + Time time.Time `json:"time,omitempty"` + Precision string `json:"precision,omitempty"` + WriteConsistency string `json:"-"` +} diff --git a/vendor/github.com/influxdb/influxdb/tsdb/points.go b/vendor/github.com/influxdb/influxdb/tsdb/points.go new file mode 100644 index 000000000..dd8dbb644 --- /dev/null +++ b/vendor/github.com/influxdb/influxdb/tsdb/points.go @@ -0,0 +1,1392 @@ +package tsdb + +import ( + "bytes" + "fmt" + "hash/fnv" + "regexp" + "sort" + "strconv" + "strings" + "time" +) + +// Point defines the values that will be written to the database +type Point interface { + Name() string + SetName(string) + + Tags() Tags + AddTag(key, value string) + SetTags(tags Tags) + + Fields() Fields + AddField(name string, value interface{}) + + Time() time.Time + SetTime(t time.Time) + UnixNano() int64 + + HashID() uint64 + Key() []byte + + Data() []byte + SetData(buf []byte) + + String() string +} + +// Points represents a sortable list of points by timestamp. +type Points []Point + +func (a Points) Len() int { return len(a) } +func (a Points) Less(i, j int) bool { return a[i].Time().Before(a[j].Time()) } +func (a Points) Swap(i, j int) { a[i], a[j] = a[j], a[i] } + +// point is the default implementation of Point. +type point struct { + time time.Time + + // text encoding of measurement and tags + // key must always be stored sorted by tags, if the original line was not sorted, + // we need to resort it + key []byte + + // text encoding of field data + fields []byte + + // text encoding of timestamp + ts []byte + + // binary encoded field data + data []byte + + // cached version of parsed fields from data + cachedFields map[string]interface{} + + // cached version of parsed name from key + cachedName string +} + +const ( + // the number of characters for the largest possible int64 (9223372036854775807) + maxInt64Digits = 19 + + // the number of characters for the smallest possible int64 (-9223372036854775808) + minInt64Digits = 20 + + // the number of characters required for the largest float64 before a range check + // would occur during parsing + maxFloat64Digits = 25 + + // the number of characters required for smallest float64 before a range check occur + // would occur during parsing + minFloat64Digits = 27 +) + +var ( + // Compile the regex that detects unquoted double quote sequences + quoteReplacer = regexp.MustCompile(`([^\\])"`) + + escapeCodes = map[byte][]byte{ + ',': []byte(`\,`), + '"': []byte(`\"`), + ' ': []byte(`\ `), + '=': []byte(`\=`), + } + + escapeCodesStr = map[string]string{} + + measurementEscapeCodes = map[byte][]byte{ + ',': []byte(`\,`), + ' ': []byte(`\ `), + } + + tagEscapeCodes = map[byte][]byte{ + ',': []byte(`\,`), + ' ': []byte(`\ `), + '=': []byte(`\=`), + } +) + +func init() { + for k, v := range escapeCodes { + escapeCodesStr[string(k)] = string(v) + } +} + +func ParsePointsString(buf string) ([]Point, error) { + return ParsePoints([]byte(buf)) +} + +// ParsePoints returns a slice of Points from a text representation of a point +// with each point separated by newlines. 
+func ParsePoints(buf []byte) ([]Point, error) { + return ParsePointsWithPrecision(buf, time.Now().UTC(), "n") +} + +func ParsePointsWithPrecision(buf []byte, defaultTime time.Time, precision string) ([]Point, error) { + points := []Point{} + var ( + pos int + block []byte + ) + for { + pos, block = scanLine(buf, pos) + pos += 1 + + if len(block) == 0 { + break + } + + // lines which start with '#' are comments + start := skipWhitespace(block, 0) + + // If line is all whitespace, just skip it + if start >= len(block) { + continue + } + + if block[start] == '#' { + continue + } + + // strip the newline if one is present + if block[len(block)-1] == '\n' { + block = block[:len(block)-1] + } + + pt, err := parsePoint(block[start:len(block)], defaultTime, precision) + if err != nil { + return nil, fmt.Errorf("unable to parse '%s': %v", string(block[start:len(block)]), err) + } + points = append(points, pt) + + if pos >= len(buf) { + break + } + + } + return points, nil + +} + +func parsePoint(buf []byte, defaultTime time.Time, precision string) (Point, error) { + // scan the first block which is measurement[,tag1=value1,tag2=value=2...] + pos, key, err := scanKey(buf, 0) + if err != nil { + return nil, err + } + + // measurement name is required + if len(key) == 0 { + return nil, fmt.Errorf("missing measurement") + } + + // scan the second block is which is field1=value1[,field2=value2,...] + pos, fields, err := scanFields(buf, pos) + if err != nil { + return nil, err + } + + // at least one field is required + if len(fields) == 0 { + return nil, fmt.Errorf("missing fields") + } + + // scan the last block which is an optional integer timestamp + pos, ts, err := scanTime(buf, pos) + + if err != nil { + return nil, err + } + + pt := &point{ + key: key, + fields: fields, + ts: ts, + } + + if len(ts) == 0 { + pt.time = defaultTime + pt.SetPrecision(precision) + } else { + ts, err := strconv.ParseInt(string(ts), 10, 64) + if err != nil { + return nil, err + } + pt.time = time.Unix(0, ts*pt.GetPrecisionMultiplier(precision)) + } + return pt, nil +} + +// scanKey scans buf starting at i for the measurement and tag portion of the point. +// It returns the ending position and the byte slice of key within buf. If there +// are tags, they will be sorted if they are not already. +func scanKey(buf []byte, i int) (int, []byte, error) { + start := skipWhitespace(buf, i) + + i = start + + // Determines whether the tags are sort, assume they are + sorted := true + + // indices holds the indexes within buf of the start of each tag. For example, + // a buf of 'cpu,host=a,region=b,zone=c' would have indices slice of [4,11,20] + // which indicates that the first tag starts at buf[4], seconds at buf[11], and + // last at buf[20] + indices := make([]int, 100) + + // tracks how many commas we've seen so we know how many values are indices. + // Since indices is an arbitrarily large slice, + // we need to know how many values in the buffer are in use. + commas := 0 + + // tracks whether we've see an '=' + equals := 0 + + // loop over each byte in buf + for { + // reached the end of buf? + if i >= len(buf) { + if equals == 0 && commas > 0 { + return i, buf[start:i], fmt.Errorf("missing tag value") + } + + break + } + + // equals is special in the tags section. It must be escaped if part of a tag name or value. + // It does not need to be escaped if part of the measurement. 
+ if buf[i] == '=' && commas > 0 { + if i-1 < 0 || i-2 < 0 { + return i, buf[start:i], fmt.Errorf("missing tag name") + } + + // Check for "cpu,=value" but allow "cpu,a\,=value" + if buf[i-1] == ',' && buf[i-2] != '\\' { + return i, buf[start:i], fmt.Errorf("missing tag name") + } + + // Check for "cpu,\ =value" + if buf[i-1] == ' ' && buf[i-2] != '\\' { + return i, buf[start:i], fmt.Errorf("missing tag name") + } + + i += 1 + equals += 1 + + // Check for "cpu,a=1,b= value=1" + if i < len(buf) && buf[i] == ' ' { + return i, buf[start:i], fmt.Errorf("missing tag value") + } + continue + } + + // escaped character + if buf[i] == '\\' { + i += 2 + continue + } + + // At a tag separator (comma), track it's location + if buf[i] == ',' { + if equals == 0 && commas > 0 { + return i, buf[start:i], fmt.Errorf("missing tag value") + } + i += 1 + + // grow our indices slice if we have too many tags + if commas >= len(indices) { + newIndics := make([]int, cap(indices)*2) + copy(newIndics, indices) + indices = newIndics + } + indices[commas] = i + commas += 1 + + // Check for "cpu, value=1" + if i < len(buf) && buf[i] == ' ' { + return i, buf[start:i], fmt.Errorf("missing tag key") + } + continue + } + + // reached end of the block? (next block would be fields) + if buf[i] == ' ' { + // check for "cpu,tag value=1" + if equals == 0 && commas > 0 { + return i, buf[start:i], fmt.Errorf("missing tag value") + } + if equals > 0 && commas-1 != equals-1 { + return i, buf[start:i], fmt.Errorf("missing tag value") + } + + // grow our indices slice if we have too many tags + if commas >= len(indices) { + newIndics := make([]int, cap(indices)*2) + copy(newIndics, indices) + indices = newIndics + } + + indices[commas] = i + 1 + break + } + + i += 1 + } + + // check that all field sections had key and values (e.g. prevent "a=1,b" + // We're using commas -1 because there should always be a comma after measurement + if equals > 0 && commas-1 != equals-1 { + return i, buf[start:i], fmt.Errorf("invalid tag format") + } + + // This check makes sure we actually received fields from the user. #3379 + // This will catch invalid syntax such as: `cpu,host=serverA,region=us-west` + if i >= len(buf) { + return i, buf[start:i], fmt.Errorf("missing fields") + } + + // Now we know where the key region is within buf, and the locations of tags, we + // need to deterimine if duplicate tags exist and if the tags are sorted. This iterates + // 1/2 of the list comparing each end with each other, walking towards the center from + // both sides. + for j := 0; j < commas/2; j++ { + // get the left and right tags + _, left := scanTo(buf[indices[j]:indices[j+1]-1], 0, '=') + _, right := scanTo(buf[indices[commas-j-1]:indices[commas-j]-1], 0, '=') + + // If the tags are equal, then there are duplicate tags, and we should abort + if bytes.Equal(left, right) { + return i, buf[start:i], fmt.Errorf("duplicate tags") + } + + // If left is greater than right, the tags are not sorted. We must continue + // since their could be duplicate tags still. + if bytes.Compare(left, right) > 0 { + sorted = false + } + } + + // If the tags are not sorted, then sort them. This sort is inline and + // uses the tag indices we created earlier. The actual buffer is not sorted, the + // indices are using the buffer for value comparison. After the indices are sorted, + // the buffer is reconstructed from the sorted indices. 
+ if !sorted && commas > 0 { + // Get the measurement name for later + measurement := buf[start : indices[0]-1] + + // Sort the indices + indices := indices[:commas] + insertionSort(0, commas, buf, indices) + + // Create a new key using the measurement and sorted indices + b := make([]byte, len(buf[start:i])) + pos := copy(b, measurement) + for _, i := range indices { + b[pos] = ',' + pos += 1 + _, v := scanToSpaceOr(buf, i, ',') + pos += copy(b[pos:], v) + } + + return i, b, nil + } + + return i, buf[start:i], nil +} + +func insertionSort(l, r int, buf []byte, indices []int) { + for i := l + 1; i < r; i++ { + for j := i; j > l && less(buf, indices, j, j-1); j-- { + indices[j], indices[j-1] = indices[j-1], indices[j] + } + } +} + +func less(buf []byte, indices []int, i, j int) bool { + // This grabs the tag names for i & j, it ignores the values + _, a := scanTo(buf, indices[i], '=') + _, b := scanTo(buf, indices[j], '=') + return bytes.Compare(a, b) < 0 +} + +func isFieldEscapeChar(b byte) bool { + for c := range escapeCodes { + if c == b { + return true + } + } + return false +} + +// scanFields scans buf, starting at i for the fields section of a point. It returns +// the ending position and the byte slice of the fields within buf +func scanFields(buf []byte, i int) (int, []byte, error) { + start := skipWhitespace(buf, i) + i = start + quoted := false + + // tracks how many '=' we've seen + equals := 0 + + // tracks how many commas we've seen + commas := 0 + + for { + // reached the end of buf? + if i >= len(buf) { + break + } + + // escaped characters? + if buf[i] == '\\' && i+1 < len(buf) { + + // Is this an escape char within a string field? Only " and \ are allowed. + if quoted && (buf[i+1] == '"' || buf[i+1] == '\\') { + i += 2 + continue + // Non-string field escaped chars + } else if !quoted && isFieldEscapeChar(buf[i+1]) { + i += 2 + continue + } + } + + // If the value is quoted, scan until we get to the end quote + if buf[i] == '"' { + quoted = !quoted + i += 1 + continue + } + + // If we see an =, ensure that there is at least on char before and after it + if buf[i] == '=' && !quoted { + equals += 1 + + // check for "... =123" but allow "a\ =123" + if buf[i-1] == ' ' && buf[i-2] != '\\' { + return i, buf[start:i], fmt.Errorf("missing field name") + } + + // check for "...a=123,=456" but allow "a=123,a\,=456" + if buf[i-1] == ',' && buf[i-2] != '\\' { + return i, buf[start:i], fmt.Errorf("missing field name") + } + + // check for "... value=" + if i+1 >= len(buf) { + return i, buf[start:i], fmt.Errorf("missing field value") + } + + // check for "... value=,value2=..." + if buf[i+1] == ',' || buf[i+1] == ' ' { + return i, buf[start:i], fmt.Errorf("missing field value") + } + + if isNumeric(buf[i+1]) || buf[i+1] == '-' || buf[i+1] == 'N' || buf[i+1] == 'n' { + var err error + i, err = scanNumber(buf, i+1) + if err != nil { + return i, buf[start:i], err + } + continue + } + // If next byte is not a double-quote, the value must be a boolean + if buf[i+1] != '"' { + var err error + i, _, err = scanBoolean(buf, i+1) + if err != nil { + return i, buf[start:i], err + } + continue + } + } + + if buf[i] == ',' && !quoted { + commas += 1 + } + + // reached end of block? + if buf[i] == ' ' && !quoted { + break + } + i += 1 + } + + if quoted { + return i, buf[start:i], fmt.Errorf("unbalanced quotes") + } + + // check that all field sections had key and values (e.g. 
prevent "a=1,b" + if equals == 0 || commas != equals-1 { + return i, buf[start:i], fmt.Errorf("invalid field format") + } + + return i, buf[start:i], nil +} + +// scanTime scans buf, starting at i for the time section of a point. It returns +// the ending position and the byte slice of the fields within buf and error if the +// timestamp is not in the correct numeric format +func scanTime(buf []byte, i int) (int, []byte, error) { + start := skipWhitespace(buf, i) + i = start + for { + // reached the end of buf? + if i >= len(buf) { + break + } + + // Timestamps should integers, make sure they are so we don't need to actually + // parse the timestamp until needed + if buf[i] < '0' || buf[i] > '9' { + return i, buf[start:i], fmt.Errorf("bad timestamp") + } + + // reached end of block? + if buf[i] == '\n' { + break + } + i += 1 + } + return i, buf[start:i], nil +} + +func isNumeric(b byte) bool { + return (b >= '0' && b <= '9') || b == '.' +} + +// scanNumber returns the end position within buf, start at i after +// scanning over buf for an integer, or float. It returns an +// error if a invalid number is scanned. +func scanNumber(buf []byte, i int) (int, error) { + start := i + var isInt bool + + // Is negative number? + if i < len(buf) && buf[i] == '-' { + i += 1 + } + + // how many decimal points we've see + decimals := 0 + + // indicates the number is float in scientific notation + scientific := false + + for { + if i >= len(buf) { + break + } + + if buf[i] == ',' || buf[i] == ' ' { + break + } + + if buf[i] == 'i' && i > start && !isInt { + isInt = true + i += 1 + continue + } + + if buf[i] == '.' { + decimals += 1 + } + + // Can't have more than 1 decimal (e.g. 1.1.1 should fail) + if decimals > 1 { + return i, fmt.Errorf("invalid number") + } + + // `e` is valid for floats but not as the first char + if i > start && (buf[i] == 'e') { + scientific = true + i += 1 + continue + } + + // + and - are only valid at this point if they follow an e (scientific notation) + if (buf[i] == '+' || buf[i] == '-') && buf[i-1] == 'e' { + i += 1 + continue + } + + // NaN is a valid float + if i+2 < len(buf) && (buf[i] == 'N' || buf[i] == 'n') { + if (buf[i+1] == 'a' || buf[i+1] == 'A') && (buf[i+2] == 'N' || buf[i+2] == 'n') { + i += 3 + continue + } + return i, fmt.Errorf("invalid number") + } + if !isNumeric(buf[i]) { + return i, fmt.Errorf("invalid number") + } + i += 1 + } + if isInt && (decimals > 0 || scientific) { + return i, fmt.Errorf("invalid number") + } + + // It's more common that numbers will be within min/max range for their type but we need to prevent + // out or range numbers from being parsed successfully. This uses some simple heuristics to decide + // if we should parse the number to the actual type. It does not do it all the time because it incurs + // extra allocations and we end up converting the type again when writing points to disk. + if isInt { + // Make sure the last char is an 'i' for integers (e.g. 
9i10 is not valid) + if buf[i-1] != 'i' { + return i, fmt.Errorf("invalid number") + } + // Parse the int to check bounds the number of digits could be larger than the max range + // We subtract 1 from the index to remove the `i` from our tests + if len(buf[start:i-1]) >= maxInt64Digits || len(buf[start:i-1]) >= minInt64Digits { + if _, err := strconv.ParseInt(string(buf[start:i-1]), 10, 64); err != nil { + return i, fmt.Errorf("unable to parse integer %s: %s", buf[start:i-1], err) + } + } + } else { + // Parse the float to check bounds if it's scientific or the number of digits could be larger than the max range + if scientific || len(buf[start:i]) >= maxFloat64Digits || len(buf[start:i]) >= minFloat64Digits { + if _, err := strconv.ParseFloat(string(buf[start:i]), 10); err != nil { + return i, fmt.Errorf("invalid float") + } + } + } + + return i, nil +} + +// scanBoolean returns the end position within buf, start at i after +// scanning over buf for boolean. Valid values for a boolean are +// t, T, true, TRUE, f, F, false, FALSE. It returns an error if a invalid boolean +// is scanned. +func scanBoolean(buf []byte, i int) (int, []byte, error) { + start := i + + if i < len(buf) && (buf[i] != 't' && buf[i] != 'f' && buf[i] != 'T' && buf[i] != 'F') { + return i, buf[start:i], fmt.Errorf("invalid boolean") + } + + i += 1 + for { + if i >= len(buf) { + break + } + + if buf[i] == ',' || buf[i] == ' ' { + break + } + i += 1 + } + + // Single char bool (t, T, f, F) is ok + if i-start == 1 { + return i, buf[start:i], nil + } + + // length must be 4 for true or TRUE + if (buf[start] == 't' || buf[start] == 'T') && i-start != 4 { + return i, buf[start:i], fmt.Errorf("invalid boolean") + } + + // length must be 5 for false or FALSE + if (buf[start] == 'f' || buf[start] == 'F') && i-start != 5 { + return i, buf[start:i], fmt.Errorf("invalid boolean") + } + + // Otherwise + valid := false + switch buf[start] { + case 't': + valid = bytes.Equal(buf[start:i], []byte("true")) + case 'f': + valid = bytes.Equal(buf[start:i], []byte("false")) + case 'T': + valid = bytes.Equal(buf[start:i], []byte("TRUE")) || bytes.Equal(buf[start:i], []byte("True")) + case 'F': + valid = bytes.Equal(buf[start:i], []byte("FALSE")) || bytes.Equal(buf[start:i], []byte("False")) + } + + if !valid { + return i, buf[start:i], fmt.Errorf("invalid boolean") + } + + return i, buf[start:i], nil + +} + +// skipWhitespace returns the end position within buf, starting at i after +// scanning over spaces in tags +func skipWhitespace(buf []byte, i int) int { + for { + if i >= len(buf) { + return i + } + + if buf[i] == ' ' || buf[i] == '\t' { + i += 1 + continue + } + break + } + return i +} + +// scanLine returns the end position in buf and the next line found within +// buf. +func scanLine(buf []byte, i int) (int, []byte) { + start := i + quoted := false + fields := false + for { + // reached the end of buf? + if i >= len(buf) { + break + } + + if buf[i] == ' ' { + fields = true + } + + // If we see a double quote, makes sure it is not escaped + if fields && buf[i] == '"' && (i-1 > 0 && buf[i-1] != '\\') { + i += 1 + quoted = !quoted + continue + } + + if buf[i] == '\n' && !quoted { + break + } + + i += 1 + } + + return i, buf[start:i] +} + +// scanTo returns the end position in buf and the next consecutive block +// of bytes, starting from i and ending with stop byte. If there are leading +// spaces or escaped chars, they are skipped. 
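scanNumber above enforces the value rules byte by byte to avoid allocations: a trailing 'i' marks an integer, at most one decimal point, optional scientific notation, NaN allowed, and strconv is only consulted when the digit count suggests a possible overflow. A loose sketch of the same rules expressed entirely with the standard library (illustrative, not how the scanner itself is implemented):

package main

import (
	"fmt"
	"strconv"
	"strings"
)

// classifyNumber restates the high-level rules: a trailing 'i' marks an
// integer (no decimal point or exponent allowed), anything else must parse
// as a float, and out-of-range values are rejected by strconv.
func classifyNumber(s string) (string, error) {
	if strings.HasSuffix(s, "i") {
		body := strings.TrimSuffix(s, "i")
		if strings.ContainsAny(body, ".eE") {
			return "", fmt.Errorf("invalid number %q: integers cannot have decimals or exponents", s)
		}
		if _, err := strconv.ParseInt(body, 10, 64); err != nil {
			return "", fmt.Errorf("unable to parse integer %q: %v", body, err)
		}
		return "integer", nil
	}
	if _, err := strconv.ParseFloat(s, 64); err != nil {
		return "", fmt.Errorf("invalid float %q", s)
	}
	return "float", nil
}

func main() {
	for _, s := range []string{"42i", "1.5", "6.02e23", "NaN", "1.1.1", "9223372036854775808i"} {
		kind, err := classifyNumber(s)
		fmt.Println(s, kind, err)
	}
}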
+func scanTo(buf []byte, i int, stop byte) (int, []byte) { + start := i + for { + // reached the end of buf? + if i >= len(buf) { + break + } + + if buf[i] == '\\' { + i += 2 + continue + } + + // reached end of block? + if buf[i] == stop { + break + } + i += 1 + } + + return i, buf[start:i] +} + +// scanTo returns the end position in buf and the next consecutive block +// of bytes, starting from i and ending with stop byte. If there are leading +// spaces, they are skipped. +func scanToSpaceOr(buf []byte, i int, stop byte) (int, []byte) { + start := i + for { + // reached the end of buf? + if i >= len(buf) { + break + } + + if buf[i] == '\\' { + i += 2 + continue + } + // reached end of block? + if buf[i] == stop || buf[i] == ' ' { + break + } + i += 1 + } + + return i, buf[start:i] +} + +func scanTagValue(buf []byte, i int) (int, []byte) { + start := i + for { + if i >= len(buf) { + break + } + + if buf[i] == '\\' { + i += 2 + continue + } + + if buf[i] == ',' { + break + } + i += 1 + } + return i, buf[start:i] +} + +func scanFieldValue(buf []byte, i int) (int, []byte) { + start := i + quoted := false + for { + if i >= len(buf) { + break + } + + // Only escape char for a field value is a double-quote + if buf[i] == '\\' && i+1 < len(buf) && buf[i+1] == '"' { + i += 2 + continue + } + + // Quoted value? (e.g. string) + if buf[i] == '"' { + i += 1 + quoted = !quoted + continue + } + + if buf[i] == ',' && !quoted { + break + } + i += 1 + } + return i, buf[start:i] +} + +func escapeMeasurement(in []byte) []byte { + for b, esc := range measurementEscapeCodes { + in = bytes.Replace(in, []byte{b}, esc, -1) + } + return in +} + +func unescapeMeasurement(in []byte) []byte { + for b, esc := range measurementEscapeCodes { + in = bytes.Replace(in, esc, []byte{b}, -1) + } + return in +} + +func escapeTag(in []byte) []byte { + for b, esc := range tagEscapeCodes { + in = bytes.Replace(in, []byte{b}, esc, -1) + } + return in +} + +func unescapeTag(in []byte) []byte { + for b, esc := range tagEscapeCodes { + in = bytes.Replace(in, esc, []byte{b}, -1) + } + return in +} + +func escape(in []byte) []byte { + for b, esc := range escapeCodes { + in = bytes.Replace(in, []byte{b}, esc, -1) + } + return in +} + +func escapeString(in string) string { + for b, esc := range escapeCodesStr { + in = strings.Replace(in, b, esc, -1) + } + return in +} + +func unescape(in []byte) []byte { + i := 0 + inLen := len(in) + var out []byte + + for { + if i >= inLen { + break + } + if in[i] == '\\' && i+1 < inLen { + switch in[i+1] { + case ',': + out = append(out, ',') + i += 2 + continue + case '"': + out = append(out, '"') + i += 2 + continue + case ' ': + out = append(out, ' ') + i += 2 + continue + case '=': + out = append(out, '=') + i += 2 + continue + } + } + out = append(out, in[i]) + i += 1 + } + return out +} + +func unescapeString(in string) string { + for b, esc := range escapeCodesStr { + in = strings.Replace(in, esc, b, -1) + } + return in +} + +// escapeStringField returns a copy of in with any double quotes or +// backslashes with escaped values +func escapeStringField(in string) string { + var out []byte + i := 0 + for { + if i >= len(in) { + break + } + // escape double-quotes + if in[i] == '\\' { + out = append(out, '\\') + out = append(out, '\\') + i += 1 + continue + } + // escape double-quotes + if in[i] == '"' { + out = append(out, '\\') + out = append(out, '"') + i += 1 + continue + } + out = append(out, in[i]) + i += 1 + + } + return string(out) +} + +// unescapeStringField returns a copy of in with 
any escaped double-quotes +// or backslashes unescaped +func unescapeStringField(in string) string { + var out []byte + i := 0 + for { + if i >= len(in) { + break + } + // unescape backslashes + if in[i] == '\\' && i+1 < len(in) && in[i+1] == '\\' { + out = append(out, '\\') + i += 2 + continue + } + // unescape double-quotes + if in[i] == '\\' && i+1 < len(in) && in[i+1] == '"' { + out = append(out, '"') + i += 2 + continue + } + out = append(out, in[i]) + i += 1 + + } + return string(out) +} + +// NewPoint returns a new point with the given measurement name, tags, fields and timestamp +func NewPoint(name string, tags Tags, fields Fields, time time.Time) Point { + return &point{ + key: MakeKey([]byte(name), tags), + time: time, + fields: fields.MarshalBinary(), + } +} + +func (p *point) Data() []byte { + return p.data +} + +func (p *point) SetData(b []byte) { + p.data = b +} + +func (p *point) Key() []byte { + return p.key +} + +func (p *point) name() []byte { + _, name := scanTo(p.key, 0, ',') + return name +} + +// Name return the measurement name for the point +func (p *point) Name() string { + if p.cachedName != "" { + return p.cachedName + } + p.cachedName = string(unescape(p.name())) + return p.cachedName +} + +// SetName updates the measurement name for the point +func (p *point) SetName(name string) { + p.cachedName = "" + p.key = MakeKey([]byte(name), p.Tags()) +} + +// Time return the timestamp for the point +func (p *point) Time() time.Time { + return p.time +} + +// SetTime updates the timestamp for the point +func (p *point) SetTime(t time.Time) { + p.time = t +} + +// Tags returns the tag set for the point +func (p *point) Tags() Tags { + tags := map[string]string{} + + if len(p.key) != 0 { + pos, name := scanTo(p.key, 0, ',') + + // it's an empyt key, so there are no tags + if len(name) == 0 { + return tags + } + + i := pos + 1 + var key, value []byte + for { + if i >= len(p.key) { + break + } + i, key = scanTo(p.key, i, '=') + i, value = scanTagValue(p.key, i+1) + + tags[string(unescapeTag(key))] = string(unescapeTag(value)) + + i += 1 + } + } + return tags +} + +func MakeKey(name []byte, tags Tags) []byte { + // unescape the name and then re-escape it to avoid double escaping. + // The key should always be stored in escaped form. + return append(escapeMeasurement(unescapeMeasurement(name)), tags.HashKey()...) 
+} + +// SetTags replaces the tags for the point +func (p *point) SetTags(tags Tags) { + p.key = MakeKey([]byte(p.Name()), tags) +} + +// AddTag adds or replaces a tag value for a point +func (p *point) AddTag(key, value string) { + tags := p.Tags() + tags[key] = value + p.key = MakeKey([]byte(p.Name()), tags) +} + +// Fields returns the fields for the point +func (p *point) Fields() Fields { + if p.cachedFields != nil { + return p.cachedFields + } + p.cachedFields = p.unmarshalBinary() + return p.cachedFields +} + +// AddField adds or replaces a field value for a point +func (p *point) AddField(name string, value interface{}) { + fields := p.Fields() + fields[name] = value + p.fields = fields.MarshalBinary() + p.cachedFields = nil +} + +// SetPrecision will round a time to the specified precision +func (p *point) SetPrecision(precision string) { + switch precision { + case "n": + case "u": + p.SetTime(p.Time().Truncate(time.Microsecond)) + case "ms": + p.SetTime(p.Time().Truncate(time.Millisecond)) + case "s": + p.SetTime(p.Time().Truncate(time.Second)) + case "m": + p.SetTime(p.Time().Truncate(time.Minute)) + case "h": + p.SetTime(p.Time().Truncate(time.Hour)) + } +} + +// GetPrecisionMultiplier will return a multiplier for the precision specified +func (p *point) GetPrecisionMultiplier(precision string) int64 { + d := time.Nanosecond + switch precision { + case "u": + d = time.Microsecond + case "ms": + d = time.Millisecond + case "s": + d = time.Second + case "m": + d = time.Minute + case "h": + d = time.Hour + } + return int64(d) +} + +func (p *point) String() string { + if p.Time().IsZero() { + return fmt.Sprintf("%s %s", p.Key(), string(p.fields)) + } + return fmt.Sprintf("%s %s %d", p.Key(), string(p.fields), p.UnixNano()) +} + +func (p *point) unmarshalBinary() Fields { + return newFieldsFromBinary(p.fields) +} + +func (p *point) HashID() uint64 { + h := fnv.New64a() + h.Write(p.key) + sum := h.Sum64() + return sum +} + +func (p *point) UnixNano() int64 { + return p.Time().UnixNano() +} + +type Tags map[string]string + +func (t Tags) HashKey() []byte { + // Empty maps marshal to empty bytes. + if len(t) == 0 { + return nil + } + + escaped := Tags{} + for k, v := range t { + ek := escapeTag([]byte(k)) + ev := escapeTag([]byte(v)) + escaped[string(ek)] = string(ev) + } + + // Extract keys and determine final size. + sz := len(escaped) + (len(escaped) * 2) // separators + keys := make([]string, len(escaped)+1) + i := 0 + for k, v := range escaped { + keys[i] = k + i += 1 + sz += len(k) + len(v) + } + keys = keys[:i] + sort.Strings(keys) + // Generate marshaled bytes. + b := make([]byte, sz) + buf := b + idx := 0 + for _, k := range keys { + buf[idx] = ',' + idx += 1 + copy(buf[idx:idx+len(k)], k) + idx += len(k) + buf[idx] = '=' + idx += 1 + v := escaped[k] + copy(buf[idx:idx+len(v)], v) + idx += len(v) + } + return b[:idx] +} + +type Fields map[string]interface{} + +func parseNumber(val []byte) (interface{}, error) { + if val[len(val)-1] == 'i' { + val = val[:len(val)-1] + return strconv.ParseInt(string(val), 10, 64) + } + for i := 0; i < len(val); i++ { + // If there is a decimal or an N (NaN), I (Inf), parse as float + if val[i] == '.' 
|| val[i] == 'N' || val[i] == 'n' || val[i] == 'I' || val[i] == 'i' || val[i] == 'e' { + return strconv.ParseFloat(string(val), 64) + } + if val[i] < '0' && val[i] > '9' { + return string(val), nil + } + } + return strconv.ParseFloat(string(val), 64) +} + +func newFieldsFromBinary(buf []byte) Fields { + fields := Fields{} + var ( + i int + name, valueBuf []byte + value interface{} + err error + ) + for { + if i >= len(buf) { + break + } + + i, name = scanTo(buf, i, '=') + if len(name) == 0 { + continue + } + name = unescape(name) + + i, valueBuf = scanFieldValue(buf, i+1) + if len(valueBuf) == 0 { + fields[string(name)] = nil + continue + } + + // If the first char is a double-quote, then unmarshal as string + if valueBuf[0] == '"' { + value = unescapeStringField(string(valueBuf[1 : len(valueBuf)-1])) + // Check for numeric characters and special NaN or Inf + } else if (valueBuf[0] >= '0' && valueBuf[0] <= '9') || valueBuf[0] == '-' || valueBuf[0] == '+' || valueBuf[0] == '.' || + valueBuf[0] == 'N' || valueBuf[0] == 'n' || // NaN + valueBuf[0] == 'I' || valueBuf[0] == 'i' { // Inf + + value, err = parseNumber(valueBuf) + if err != nil { + panic(fmt.Sprintf("unable to parse number value '%v': %v", string(valueBuf), err)) + } + + // Otherwise parse it as bool + } else { + value, err = strconv.ParseBool(string(valueBuf)) + if err != nil { + panic(fmt.Sprintf("unable to parse bool value '%v': %v\n", string(valueBuf), err)) + } + } + fields[string(name)] = value + i += 1 + } + return fields +} + +// MarshalBinary encodes all the fields to their proper type and returns the binary +// represenation +// NOTE: uint64 is specifically not supported due to potential overflow when we decode +// again later to an int64 +func (p Fields) MarshalBinary() []byte { + b := []byte{} + keys := make([]string, len(p)) + i := 0 + for k, _ := range p { + keys[i] = k + i += 1 + } + sort.Strings(keys) + + for _, k := range keys { + v := p[k] + b = append(b, []byte(escapeString(k))...) + b = append(b, '=') + switch t := v.(type) { + case int: + b = append(b, []byte(strconv.FormatInt(int64(t), 10))...) + b = append(b, 'i') + case int8: + b = append(b, []byte(strconv.FormatInt(int64(t), 10))...) + b = append(b, 'i') + case int16: + b = append(b, []byte(strconv.FormatInt(int64(t), 10))...) + b = append(b, 'i') + case int32: + b = append(b, []byte(strconv.FormatInt(int64(t), 10))...) + b = append(b, 'i') + case int64: + b = append(b, []byte(strconv.FormatInt(t, 10))...) + b = append(b, 'i') + case uint: + b = append(b, []byte(strconv.FormatInt(int64(t), 10))...) + b = append(b, 'i') + case uint8: + b = append(b, []byte(strconv.FormatInt(int64(t), 10))...) + b = append(b, 'i') + case uint16: + b = append(b, []byte(strconv.FormatInt(int64(t), 10))...) + b = append(b, 'i') + case uint32: + b = append(b, []byte(strconv.FormatInt(int64(t), 10))...) + b = append(b, 'i') + case float32: + val := []byte(strconv.FormatFloat(float64(t), 'f', -1, 32)) + b = append(b, val...) + case float64: + val := []byte(strconv.FormatFloat(t, 'f', -1, 64)) + b = append(b, val...) + case bool: + b = append(b, []byte(strconv.FormatBool(t))...) + case []byte: + b = append(b, t...) + case string: + b = append(b, '"') + b = append(b, []byte(escapeStringField(t))...) + b = append(b, '"') + case nil: + // skip + default: + // Can't determine the type, so convert to string + b = append(b, '"') + b = append(b, []byte(escapeStringField(fmt.Sprintf("%v", v)))...) 
+ b = append(b, '"') + + } + b = append(b, ',') + } + if len(b) > 0 { + return b[0 : len(b)-1] + } + return b +} + +type indexedSlice struct { + indices []int + b []byte +} + +func (s *indexedSlice) Less(i, j int) bool { + _, a := scanTo(s.b, s.indices[i], '=') + _, b := scanTo(s.b, s.indices[j], '=') + return bytes.Compare(a, b) < 0 +} + +func (s *indexedSlice) Swap(i, j int) { + s.indices[i], s.indices[j] = s.indices[j], s.indices[i] +} + +func (s *indexedSlice) Len() int { + return len(s.indices) +} From 5ec1efe622d138887811caeb830e5c786ae95510 Mon Sep 17 00:00:00 2001 From: Fabian Reinartz Date: Wed, 8 Mar 2017 15:37:12 +0100 Subject: [PATCH 6/7] retrieval: fix test --- retrieval/targetmanager_test.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/retrieval/targetmanager_test.go b/retrieval/targetmanager_test.go index 7330d4649..614005948 100644 --- a/retrieval/targetmanager_test.go +++ b/retrieval/targetmanager_test.go @@ -141,7 +141,8 @@ func TestPopulateLabels(t *testing.T) { }, } for i, c := range cases { - in := c.in.Clone() + in := c.in.Copy() + res, orig, err := populateLabels(c.in, c.cfg) if err != nil { t.Fatalf("case %d: %s", i, err) From f160c4eb20313f6f226f96a18e1a1d5254077c8f Mon Sep 17 00:00:00 2001 From: Fabian Reinartz Date: Wed, 8 Mar 2017 16:54:48 +0100 Subject: [PATCH 7/7] vendor: integrate various tsdb fixes --- vendor/github.com/fabxc/tsdb/compact.go | 2 + vendor/github.com/fabxc/tsdb/db.go | 168 +------ vendor/github.com/fabxc/tsdb/db_amd64.go | 10 - vendor/github.com/fabxc/tsdb/head.go | 16 +- vendor/github.com/fabxc/tsdb/querier.go | 241 ++++----- vendor/github.com/fabxc/tsdb/reader.go | 459 ----------------- vendor/github.com/fabxc/tsdb/wal.go | 6 +- vendor/github.com/fabxc/tsdb/writer.go | 611 ----------------------- vendor/vendor.json | 6 +- 9 files changed, 143 insertions(+), 1376 deletions(-) delete mode 100644 vendor/github.com/fabxc/tsdb/db_amd64.go delete mode 100644 vendor/github.com/fabxc/tsdb/reader.go delete mode 100644 vendor/github.com/fabxc/tsdb/writer.go diff --git a/vendor/github.com/fabxc/tsdb/compact.go b/vendor/github.com/fabxc/tsdb/compact.go index 51d4ce0d1..573e9b440 100644 --- a/vendor/github.com/fabxc/tsdb/compact.go +++ b/vendor/github.com/fabxc/tsdb/compact.go @@ -163,6 +163,8 @@ func (c *compactor) Compact(dirs ...string) (err error) { if err != nil { return err } + defer b.Close() + blocks = append(blocks, b) } diff --git a/vendor/github.com/fabxc/tsdb/db.go b/vendor/github.com/fabxc/tsdb/db.go index b1a3a2421..bf8600fc4 100644 --- a/vendor/github.com/fabxc/tsdb/db.go +++ b/vendor/github.com/fabxc/tsdb/db.go @@ -6,10 +6,8 @@ import ( "fmt" "io" "io/ioutil" - "math" "os" "path/filepath" - "reflect" "strconv" "strings" "sync" @@ -334,6 +332,9 @@ func (db *DB) reloadBlocks() error { db.mtx.Lock() defer db.mtx.Unlock() + db.headmtx.Lock() + defer db.headmtx.Unlock() + dirs, err := blockDirs(db.dir) if err != nil { return errors.Wrap(err, "find blocks") @@ -355,17 +356,20 @@ func (db *DB) reloadBlocks() error { for i, meta := range metas { b, ok := db.seqBlocks[meta.Sequence] - if !ok { - return errors.Errorf("missing block for sequence %d", meta.Sequence) - } if meta.Compaction.Generation == 0 { + if !ok { + b, err = openHeadBlock(dirs[i], db.logger) + if err != nil { + return errors.Wrapf(err, "load head at %s", dirs[i]) + } + } if meta.ULID != b.Meta().ULID { return errors.Errorf("head block ULID changed unexpectedly") } heads = append(heads, b.(*headBlock)) } else { - if meta.ULID != b.Meta().ULID { + if ok && 
meta.ULID != b.Meta().ULID { if err := b.Close(); err != nil { return err } @@ -404,15 +408,18 @@ func (db *DB) Close() error { // the block to be used afterwards. db.mtx.Lock() - var merr MultiError + var g errgroup.Group for _, pb := range db.persisted { - merr.Add(pb.Close()) + g.Go(pb.Close) } for _, hb := range db.heads { - merr.Add(hb.Close()) + g.Go(hb.Close) } + var merr MultiError + + merr.Add(g.Wait()) merr.Add(db.lockf.Unlock()) return merr.Err() @@ -453,19 +460,6 @@ func (a *dbAppender) Add(lset labels.Labels, t int64, v float64) (uint64, error) return ref | (uint64(h.generation) << 40), nil } -func (a *dbAppender) hashedAdd(hash uint64, lset labels.Labels, t int64, v float64) (uint64, error) { - h, err := a.appenderFor(t) - if err != nil { - return 0, err - } - ref, err := h.hashedAdd(hash, lset, t, v) - if err != nil { - return 0, err - } - a.samples++ - return ref | (uint64(h.generation) << 40), nil -} - func (a *dbAppender) AddFast(ref uint64, t int64, v float64) error { // We store the head generation in the 4th byte and use it to reject // stale references. @@ -523,10 +517,9 @@ func (a *dbAppender) appenderFor(t int64) (*headAppender, error) { return nil, ErrNotFound } +// ensureHead makes sure that there is a head block for the timestamp t if +// it is within or after the currently appendable window. func (db *DB) ensureHead(t int64) error { - // db.mtx.Lock() - // defer db.mtx.Unlock() - // Initial case for a new database: we must create the first // AppendableBlocks-1 front padding heads. if len(db.heads) == 0 { @@ -717,123 +710,6 @@ func nextSequenceFile(dir, prefix string) (string, int, error) { return filepath.Join(dir, fmt.Sprintf("%s%0.6d", prefix, i+1)), int(i + 1), nil } -// PartitionedDB is a time series storage. -type PartitionedDB struct { - logger log.Logger - dir string - - partitionPow uint - Partitions []*DB -} - -func isPowTwo(x int) bool { - return x > 0 && (x&(x-1)) == 0 -} - -// OpenPartitioned or create a new DB. -func OpenPartitioned(dir string, n int, l log.Logger, r prometheus.Registerer, opts *Options) (*PartitionedDB, error) { - if !isPowTwo(n) { - return nil, errors.Errorf("%d is not a power of two", n) - } - if opts == nil { - opts = DefaultOptions - } - if l == nil { - l = log.NewLogfmtLogger(os.Stdout) - l = log.NewContext(l).With("ts", log.DefaultTimestampUTC, "caller", log.DefaultCaller) - } - - if err := os.MkdirAll(dir, 0777); err != nil { - return nil, err - } - c := &PartitionedDB{ - logger: l, - dir: dir, - partitionPow: uint(math.Log2(float64(n))), - } - - // Initialize vertical partitiondb. - // TODO(fabxc): validate partition number to be power of 2, which is required - // for the bitshift-modulo when finding the right partition. - for i := 0; i < n; i++ { - l := log.NewContext(l).With("partition", i) - d := partitionDir(dir, i) - - s, err := Open(d, l, r, opts) - if err != nil { - return nil, fmt.Errorf("initializing partition %q failed: %s", d, err) - } - - c.Partitions = append(c.Partitions, s) - } - - return c, nil -} - -func partitionDir(base string, i int) string { - return filepath.Join(base, fmt.Sprintf("p-%0.4d", i)) -} - -// Close the database. -func (db *PartitionedDB) Close() error { - var g errgroup.Group - - for _, partition := range db.Partitions { - g.Go(partition.Close) - } - - return g.Wait() -} - -// Appender returns a new appender against the database. 
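The appender above packs the owning head block's generation into the high bits of every reference it returns, so AddFast can reject references that point at an already-compacted head. A small sketch of that kind of packing; the exact bit layout here is illustrative:

package main

import "fmt"

// packRef stores a head generation alongside a series reference by shifting
// it into the upper bits of a uint64, similar to how dbAppender tags the
// refs it hands back to callers.
func packRef(seriesRef uint64, generation uint8) uint64 {
	return seriesRef | uint64(generation)<<40
}

// unpackRef recovers both parts again so an AddFast-style call can check the
// generation before trusting the low bits.
func unpackRef(ref uint64) (seriesRef uint64, generation uint8) {
	return ref & ((1 << 40) - 1), uint8(ref >> 40)
}

func main() {
	ref := packRef(123456, 3)
	sr, gen := unpackRef(ref)
	fmt.Println(sr, gen) // 123456 3
}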
-func (db *PartitionedDB) Appender() Appender { - app := &partitionedAppender{db: db} - - for _, p := range db.Partitions { - app.partitions = append(app.partitions, p.Appender().(*dbAppender)) - } - return app -} - -type partitionedAppender struct { - db *PartitionedDB - partitions []*dbAppender -} - -func (a *partitionedAppender) Add(lset labels.Labels, t int64, v float64) (uint64, error) { - h := lset.Hash() - p := h >> (64 - a.db.partitionPow) - - ref, err := a.partitions[p].hashedAdd(h, lset, t, v) - if err != nil { - return 0, err - } - return ref | (p << 48), nil -} - -func (a *partitionedAppender) AddFast(ref uint64, t int64, v float64) error { - p := uint8((ref << 8) >> 56) - return a.partitions[p].AddFast(ref, t, v) -} - -func (a *partitionedAppender) Commit() error { - var merr MultiError - - for _, p := range a.partitions { - merr.Add(p.Commit()) - } - return merr.Err() -} - -func (a *partitionedAppender) Rollback() error { - var merr MultiError - - for _, p := range a.partitions { - merr.Add(p.Rollback()) - } - return merr.Err() -} - // The MultiError type implements the error interface, and contains the // Errors used to construct it. type MultiError []error @@ -877,13 +753,7 @@ func (es MultiError) Err() error { } func yoloString(b []byte) string { - sh := (*reflect.SliceHeader)(unsafe.Pointer(&b)) - - h := reflect.StringHeader{ - Data: sh.Data, - Len: sh.Len, - } - return *((*string)(unsafe.Pointer(&h))) + return *((*string)(unsafe.Pointer(&b))) } func closeAll(cs ...io.Closer) error { diff --git a/vendor/github.com/fabxc/tsdb/db_amd64.go b/vendor/github.com/fabxc/tsdb/db_amd64.go deleted file mode 100644 index cfd85c975..000000000 --- a/vendor/github.com/fabxc/tsdb/db_amd64.go +++ /dev/null @@ -1,10 +0,0 @@ -package tsdb - -// maxMapSize represents the largest mmap size supported by Bolt. -const maxMapSize = 0xFFFFFFFFFFFF // 256TB - -// maxAllocSize is the size used when creating array pointers. -const maxAllocSize = 0x7FFFFFFF - -// Are unaligned load/stores broken on this arch? -var brokenUnaligned = false diff --git a/vendor/github.com/fabxc/tsdb/head.go b/vendor/github.com/fabxc/tsdb/head.go index 058c8fe79..211cadf77 100644 --- a/vendor/github.com/fabxc/tsdb/head.go +++ b/vendor/github.com/fabxc/tsdb/head.go @@ -44,7 +44,6 @@ type headBlock struct { activeWriters uint64 - symbols map[string]struct{} // descs holds all chunk descs for the head block. Each chunk implicitly // is assigned the index as its ID. series []*memSeries @@ -150,7 +149,7 @@ func (h *headBlock) Close() error { h.mtx.Lock() if err := h.wal.Close(); err != nil { - return err + return errors.Wrapf(err, "close WAL for head %s", h.dir) } // Check whether the head block still exists in the underlying dir // or has already been replaced with a compacted version or removed. 
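The slimmed-down yoloString above converts a byte slice to a string without copying, relying on a string header being a prefix of a slice header. A sketch of the trick with its main caveat spelled out:

package main

import (
	"fmt"
	"unsafe"
)

// yoloString converts a byte slice to a string without copying. It works
// because a string header (pointer, length) is a prefix of a slice header
// (pointer, length, capacity). The caller must not mutate b afterwards,
// since strings are assumed to be immutable.
func yoloString(b []byte) string {
	return *(*string)(unsafe.Pointer(&b))
}

func main() {
	b := []byte("series_name")
	s := yoloString(b)
	fmt.Println(s) // series_name
}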
@@ -223,10 +222,8 @@ type refdSample struct { } func (a *headAppender) Add(lset labels.Labels, t int64, v float64) (uint64, error) { - return a.hashedAdd(lset.Hash(), lset, t, v) -} + hash := lset.Hash() -func (a *headAppender) hashedAdd(hash uint64, lset labels.Labels, t int64, v float64) (uint64, error) { if ms := a.get(hash, lset); ms != nil { return uint64(ms.ref), a.AddFast(uint64(ms.ref), t, v) } @@ -530,13 +527,6 @@ func (h *headBlock) create(hash uint64, lset labels.Labels) *memSeries { return s } -func (h *headBlock) fullness() float64 { - h.metamtx.RLock() - defer h.metamtx.RUnlock() - - return float64(h.meta.Stats.NumSamples) / float64(h.meta.Stats.NumSeries+1) / 250 -} - func (h *headBlock) updateMapping() { h.mtx.RLock() @@ -586,7 +576,7 @@ type memSeries struct { lastValue float64 sampleBuf [4]sample - app chunks.Appender // Current appender for the chunkdb. + app chunks.Appender // Current appender for the chunk. } func (s *memSeries) cut() *memChunk { diff --git a/vendor/github.com/fabxc/tsdb/querier.go b/vendor/github.com/fabxc/tsdb/querier.go index 7783ef312..a397b9a63 100644 --- a/vendor/github.com/fabxc/tsdb/querier.go +++ b/vendor/github.com/fabxc/tsdb/querier.go @@ -76,6 +76,9 @@ func (s *DB) Querier(mint, maxt int64) Querier { } func (q *querier) LabelValues(n string) ([]string, error) { + if len(q.blocks) == 0 { + return nil, nil + } res, err := q.blocks[0].LabelValues(n) if err != nil { return nil, err @@ -163,12 +166,16 @@ func (q *blockQuerier) Select(ms ...labels.Matcher) SeriesSet { } return &blockSeriesSet{ - index: q.index, - chunks: q.chunks, - it: p, - absent: absent, - mint: q.mint, - maxt: q.maxt, + set: &populatedChunkSeries{ + set: &baseChunkSeries{ + p: p, + index: q.index, + absent: absent, + }, + chunks: q.chunks, + mint: q.mint, + maxt: q.maxt, + }, } } @@ -233,69 +240,6 @@ func (q *blockQuerier) Close() error { return nil } -// partitionedQuerier merges query results from a set of partition querieres. -type partitionedQuerier struct { - mint, maxt int64 - partitions []Querier -} - -// Querier returns a new querier over the database for the given -// time range. -func (db *PartitionedDB) Querier(mint, maxt int64) Querier { - q := &partitionedQuerier{ - mint: mint, - maxt: maxt, - } - for _, s := range db.Partitions { - q.partitions = append(q.partitions, s.Querier(mint, maxt)) - } - - return q -} - -func (q *partitionedQuerier) Select(ms ...labels.Matcher) SeriesSet { - // We gather the non-overlapping series from every partition and simply - // return their union. - r := &mergedSeriesSet{} - - for _, s := range q.partitions { - r.sets = append(r.sets, s.Select(ms...)) - } - if len(r.sets) == 0 { - return nopSeriesSet{} - } - return r -} - -func (q *partitionedQuerier) LabelValues(n string) ([]string, error) { - res, err := q.partitions[0].LabelValues(n) - if err != nil { - return nil, err - } - for _, sq := range q.partitions[1:] { - pr, err := sq.LabelValues(n) - if err != nil { - return nil, err - } - // Merge new values into deduplicated result. 
- res = mergeStrings(res, pr) - } - return res, nil -} - -func (q *partitionedQuerier) LabelValuesFor(string, labels.Label) ([]string, error) { - return nil, fmt.Errorf("not implemented") -} - -func (q *partitionedQuerier) Close() error { - var merr MultiError - - for _, sq := range q.partitions { - merr.Add(sq.Close()) - } - return merr.Err() -} - func mergeStrings(a, b []string) []string { maxl := len(a) if len(b) > len(a) { @@ -424,23 +368,31 @@ func (s *partitionSeriesSet) Next() bool { return true } -// blockSeriesSet is a set of series from an inverted index query. -type blockSeriesSet struct { - index IndexReader - chunks ChunkReader - it Postings // postings list referencing series - absent []string // labels that must not be set for result series - mint, maxt int64 // considered time range - - err error - cur Series +type chunkSeriesSet interface { + Next() bool + At() (labels.Labels, []ChunkMeta) + Err() error } -func (s *blockSeriesSet) Next() bool { - // Step through the postings iterator to find potential series. -outer: - for s.it.Next() { - lset, chunks, err := s.index.Series(s.it.At()) +// baseChunkSeries loads the label set and chunk references for a postings +// list from an index. It filters out series that have labels set that should be unset. +type baseChunkSeries struct { + p Postings + index IndexReader + absent []string // labels that must be unset in results. + + lset labels.Labels + chks []ChunkMeta + err error +} + +func (s *baseChunkSeries) At() (labels.Labels, []ChunkMeta) { return s.lset, s.chks } +func (s *baseChunkSeries) Err() error { return s.err } + +func (s *baseChunkSeries) Next() bool { +Outer: + for s.p.Next() { + lset, chunks, err := s.index.Series(s.p.At()) if err != nil { s.err = err return false @@ -449,35 +401,87 @@ outer: // If a series contains a label that must be absent, it is skipped as well. for _, abs := range s.absent { if lset.Get(abs) != "" { - continue outer + continue Outer } } - ser := &chunkSeries{ - labels: lset, - chunks: make([]ChunkMeta, 0, len(chunks)), - chunk: s.chunks.Chunk, - } - // Only use chunks that fit the time range. - for _, c := range chunks { + s.lset = lset + s.chks = chunks + + return true + } + if err := s.p.Err(); err != nil { + s.err = err + } + return false +} + +// populatedChunkSeries loads chunk data from a store for a set of series +// with known chunk references. It filters out chunks that do not fit the +// given time range. +type populatedChunkSeries struct { + set chunkSeriesSet + chunks ChunkReader + mint, maxt int64 + + err error + chks []ChunkMeta + lset labels.Labels +} + +func (s *populatedChunkSeries) At() (labels.Labels, []ChunkMeta) { return s.lset, s.chks } +func (s *populatedChunkSeries) Err() error { return s.err } + +func (s *populatedChunkSeries) Next() bool { + for s.set.Next() { + lset, chks := s.set.At() + + for i := range chks { + c := &chks[i] + if c.MaxTime < s.mint { + chks = chks[1:] continue } if c.MinTime > s.maxt { + chks = chks[:i] break } - ser.chunks = append(ser.chunks, c) + c.Chunk, s.err = s.chunks.Chunk(c.Ref) + if s.err != nil { + return false + } } - // If no chunks of the series apply to the time range, skip it. - if len(ser.chunks) == 0 { + if len(chks) == 0 { continue } - s.cur = ser + s.lset = lset + s.chks = chks + return true } - if s.it.Err() != nil { - s.err = s.it.Err() + if err := s.set.Err(); err != nil { + s.err = err + } + return false +} + +// blockSeriesSet is a set of series from an inverted index query. 
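LabelValues gathers per-block results and combines them with mergeStrings, whose inputs are already sorted and deduplicated. A plain sketch of such a merge; the vendored implementation may differ in detail:

package main

import "fmt"

// mergeSortedUnique merges two sorted string slices into one sorted slice
// without duplicates, which is what combining per-block label values needs.
func mergeSortedUnique(a, b []string) []string {
	res := make([]string, 0, len(a)+len(b))
	i, j := 0, 0
	for i < len(a) && j < len(b) {
		switch {
		case a[i] < b[j]:
			res = append(res, a[i])
			i++
		case a[i] > b[j]:
			res = append(res, b[j])
			j++
		default: // equal: keep one copy
			res = append(res, a[i])
			i++
			j++
		}
	}
	res = append(res, a[i:]...)
	res = append(res, b[j:]...)
	return res
}

func main() {
	fmt.Println(mergeSortedUnique(
		[]string{"alpha", "beta", "gamma"},
		[]string{"beta", "delta"},
	)) // [alpha beta delta gamma]
}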
+type blockSeriesSet struct { + set chunkSeriesSet + err error + cur Series +} + +func (s *blockSeriesSet) Next() bool { + for s.set.Next() { + lset, chunks := s.set.At() + s.cur = &chunkSeries{labels: lset, chunks: chunks} + return true + } + if s.set.Err() != nil { + s.err = s.set.Err() } return false } @@ -490,10 +494,6 @@ func (s *blockSeriesSet) Err() error { return s.err } type chunkSeries struct { labels labels.Labels chunks []ChunkMeta // in-order chunk refs - - // chunk is a function that retrieves chunks based on a reference - // number contained in the chunk meta information. - chunk func(ref uint64) (chunks.Chunk, error) } func (s *chunkSeries) Labels() labels.Labels { @@ -501,21 +501,7 @@ func (s *chunkSeries) Labels() labels.Labels { } func (s *chunkSeries) Iterator() SeriesIterator { - var cs []chunks.Chunk - var mints []int64 - - for _, co := range s.chunks { - c, err := s.chunk(co.Ref) - if err != nil { - panic(err) // TODO(fabxc): add error series iterator. - } - cs = append(cs, c) - mints = append(mints, co.MinTime) - } - - // TODO(fabxc): consider pushing chunk retrieval further down. In practice, we - // probably have to touch all chunks anyway and it doesn't matter. - return newChunkSeriesIterator(mints, cs) + return newChunkSeriesIterator(s.chunks) } // SeriesIterator iterates over the data of a time series. @@ -601,43 +587,38 @@ func (it *chainedSeriesIterator) Err() error { // chunkSeriesIterator implements a series iterator on top // of a list of time-sorted, non-overlapping chunks. type chunkSeriesIterator struct { - mints []int64 // minimum timestamps for each iterator - chunks []chunks.Chunk + chunks []ChunkMeta i int cur chunks.Iterator } -func newChunkSeriesIterator(mints []int64, cs []chunks.Chunk) *chunkSeriesIterator { - if len(mints) != len(cs) { - panic("chunk references and chunks length don't match") - } +func newChunkSeriesIterator(cs []ChunkMeta) *chunkSeriesIterator { return &chunkSeriesIterator{ - mints: mints, chunks: cs, i: 0, - cur: cs[0].Iterator(), + cur: cs[0].Chunk.Iterator(), } } func (it *chunkSeriesIterator) Seek(t int64) (ok bool) { // Only do binary search forward to stay in line with other iterators // that can only move forward. - x := sort.Search(len(it.mints[it.i:]), func(i int) bool { return it.mints[i] >= t }) + x := sort.Search(len(it.chunks[it.i:]), func(i int) bool { return it.chunks[i].MinTime >= t }) x += it.i // If the timestamp was not found, it might be in the last chunk. - if x == len(it.mints) { + if x == len(it.chunks) { x-- } // Go to previous chunk if the chunk doesn't exactly start with t. // If we are already at the first chunk, we use it as it's the best we have. - if x > 0 && it.mints[x] > t { + if x > 0 && it.chunks[x].MinTime > t { x-- } it.i = x - it.cur = it.chunks[x].Iterator() + it.cur = it.chunks[x].Chunk.Iterator() for it.cur.Next() { t0, _ := it.cur.At() @@ -664,7 +645,7 @@ func (it *chunkSeriesIterator) Next() bool { } it.i++ - it.cur = it.chunks[it.i].Iterator() + it.cur = it.chunks[it.i].Chunk.Iterator() return it.Next() } diff --git a/vendor/github.com/fabxc/tsdb/reader.go b/vendor/github.com/fabxc/tsdb/reader.go deleted file mode 100644 index d4d816c1f..000000000 --- a/vendor/github.com/fabxc/tsdb/reader.go +++ /dev/null @@ -1,459 +0,0 @@ -package tsdb - -import ( - "encoding/binary" - "fmt" - "io" - "path/filepath" - "strings" - - "github.com/fabxc/tsdb/chunks" - "github.com/fabxc/tsdb/labels" - "github.com/pkg/errors" -) - -// ChunkReader provides reading access of serialized time series data. 
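chunkSeriesIterator.Seek above binary-searches the chunk start times and then steps back one chunk when the target timestamp falls inside the previous chunk's range. A simplified sketch of that lookup over a slice of chunk MinTimes (it ignores the iterator's forward-only offset):

package main

import (
	"fmt"
	"sort"
)

// seekChunk picks the chunk whose time range should contain t: binary-search
// the first chunk starting at or after t, then step back one chunk if t lies
// before that start.
func seekChunk(minTimes []int64, t int64) int {
	x := sort.Search(len(minTimes), func(i int) bool { return minTimes[i] >= t })
	if x == len(minTimes) {
		x-- // past the last chunk start: the last chunk is the best candidate
	}
	if x > 0 && minTimes[x] > t {
		x-- // t lies inside the previous chunk's range
	}
	return x
}

func main() {
	mins := []int64{0, 1000, 2000, 3000}
	fmt.Println(seekChunk(mins, 1500)) // 1
	fmt.Println(seekChunk(mins, 3500)) // 3
	fmt.Println(seekChunk(mins, 0))    // 0
}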
-type ChunkReader interface { - // Chunk returns the series data chunk with the given reference. - Chunk(ref uint64) (chunks.Chunk, error) - - // Close releases all underlying resources of the reader. - Close() error -} - -// chunkReader implements a SeriesReader for a serialized byte stream -// of series data. -type chunkReader struct { - // The underlying bytes holding the encoded series data. - bs [][]byte - - // Closers for resources behind the byte slices. - cs []io.Closer -} - -// newChunkReader returns a new chunkReader based on mmaped files found in dir. -func newChunkReader(dir string) (*chunkReader, error) { - files, err := sequenceFiles(dir, "") - if err != nil { - return nil, err - } - var cr chunkReader - - for _, fn := range files { - f, err := openMmapFile(fn) - if err != nil { - return nil, errors.Wrapf(err, "mmap files") - } - cr.cs = append(cr.cs, f) - cr.bs = append(cr.bs, f.b) - } - - for i, b := range cr.bs { - if len(b) < 4 { - return nil, errors.Wrapf(errInvalidSize, "validate magic in segment %d", i) - } - // Verify magic number. - if m := binary.BigEndian.Uint32(b[:4]); m != MagicSeries { - return nil, fmt.Errorf("invalid magic number %x", m) - } - } - return &cr, nil -} - -func (s *chunkReader) Close() error { - return closeAll(s.cs...) -} - -func (s *chunkReader) Chunk(ref uint64) (chunks.Chunk, error) { - var ( - seq = int(ref >> 32) - off = int((ref << 32) >> 32) - ) - if seq >= len(s.bs) { - return nil, errors.Errorf("reference sequence %d out of range", seq) - } - b := s.bs[seq] - - if int(off) >= len(b) { - return nil, errors.Errorf("offset %d beyond data size %d", off, len(b)) - } - b = b[off:] - - l, n := binary.Uvarint(b) - if n < 0 { - return nil, fmt.Errorf("reading chunk length failed") - } - b = b[n:] - enc := chunks.Encoding(b[0]) - - c, err := chunks.FromData(enc, b[1:1+l]) - if err != nil { - return nil, err - } - return c, nil -} - -// IndexReader provides reading access of serialized index data. -type IndexReader interface { - // LabelValues returns the possible label values - LabelValues(names ...string) (StringTuples, error) - - // Postings returns the postings list iterator for the label pair. - Postings(name, value string) (Postings, error) - - // Series returns the series for the given reference. - Series(ref uint32) (labels.Labels, []ChunkMeta, error) - - // LabelIndices returns the label pairs for which indices exist. - LabelIndices() ([][]string, error) - - // Close released the underlying resources of the reader. - Close() error -} - -// StringTuples provides access to a sorted list of string tuples. -type StringTuples interface { - // Total number of tuples in the list. - Len() int - // At returns the tuple at position i. - At(i int) ([]string, error) -} - -type indexReader struct { - // The underlying byte slice holding the encoded series data. - b []byte - - // Close that releases the underlying resources of the byte slice. - c io.Closer - - // Cached hashmaps of section offsets. - labels map[string]uint32 - postings map[string]uint32 -} - -var ( - errInvalidSize = fmt.Errorf("invalid size") - errInvalidFlag = fmt.Errorf("invalid flag") -) - -// newIndexReader returns a new indexReader on the given directory. -func newIndexReader(dir string) (*indexReader, error) { - f, err := openMmapFile(filepath.Join(dir, "index")) - if err != nil { - return nil, err - } - r := &indexReader{b: f.b, c: f} - - // Verify magic number. 
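chunkReader.Chunk above splits a reference into a segment sequence (upper 32 bits) and a byte offset within that segment (lower 32 bits). A minimal sketch of that packing and unpacking:

package main

import "fmt"

// packChunkRef combines a segment sequence number and a byte offset into a
// single uint64 reference: sequence in the upper 32 bits, offset in the
// lower 32 bits.
func packChunkRef(seq, offset uint32) uint64 {
	return uint64(seq)<<32 | uint64(offset)
}

// unpackChunkRef splits the reference back into its two parts.
func unpackChunkRef(ref uint64) (seq, offset uint32) {
	return uint32(ref >> 32), uint32(ref)
}

func main() {
	ref := packChunkRef(2, 4096)
	seq, off := unpackChunkRef(ref)
	fmt.Println(seq, off) // 2 4096
}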
- if len(f.b) < 4 { - return nil, errors.Wrap(errInvalidSize, "index header") - } - if m := binary.BigEndian.Uint32(r.b[:4]); m != MagicIndex { - return nil, errors.Errorf("invalid magic number %x", m) - } - - // The last two 4 bytes hold the pointers to the hashmaps. - loff := binary.BigEndian.Uint32(r.b[len(r.b)-8 : len(r.b)-4]) - poff := binary.BigEndian.Uint32(r.b[len(r.b)-4:]) - - flag, b, err := r.section(loff) - if err != nil { - return nil, errors.Wrapf(err, "label index hashmap section at %d", loff) - } - if r.labels, err = readHashmap(flag, b); err != nil { - return nil, errors.Wrap(err, "read label index hashmap") - } - flag, b, err = r.section(poff) - if err != nil { - return nil, errors.Wrapf(err, "postings hashmap section at %d", loff) - } - if r.postings, err = readHashmap(flag, b); err != nil { - return nil, errors.Wrap(err, "read postings hashmap") - } - - return r, nil -} - -func readHashmap(flag byte, b []byte) (map[string]uint32, error) { - if flag != flagStd { - return nil, errInvalidFlag - } - h := make(map[string]uint32, 512) - - for len(b) > 0 { - l, n := binary.Uvarint(b) - if n < 1 { - return nil, errors.Wrap(errInvalidSize, "read key length") - } - b = b[n:] - - if len(b) < int(l) { - return nil, errors.Wrap(errInvalidSize, "read key") - } - s := string(b[:l]) - b = b[l:] - - o, n := binary.Uvarint(b) - if n < 1 { - return nil, errors.Wrap(errInvalidSize, "read offset value") - } - b = b[n:] - - h[s] = uint32(o) - } - - return h, nil -} - -func (r *indexReader) Close() error { - return r.c.Close() -} - -func (r *indexReader) section(o uint32) (byte, []byte, error) { - b := r.b[o:] - - if len(b) < 5 { - return 0, nil, errors.Wrap(errInvalidSize, "read header") - } - - flag := b[0] - l := binary.BigEndian.Uint32(b[1:5]) - - b = b[5:] - - // b must have the given length plus 4 bytes for the CRC32 checksum. 
- if len(b) < int(l)+4 { - return 0, nil, errors.Wrap(errInvalidSize, "section content") - } - return flag, b[:l], nil -} - -func (r *indexReader) lookupSymbol(o uint32) (string, error) { - if int(o) > len(r.b) { - return "", errors.Errorf("invalid symbol offset %d", o) - } - l, n := binary.Uvarint(r.b[o:]) - if n < 0 { - return "", errors.New("reading symbol length failed") - } - - end := int(o) + n + int(l) - if end > len(r.b) { - return "", errors.New("invalid length") - } - b := r.b[int(o)+n : end] - - return yoloString(b), nil -} - -func (r *indexReader) LabelValues(names ...string) (StringTuples, error) { - key := strings.Join(names, string(sep)) - off, ok := r.labels[key] - if !ok { - return nil, fmt.Errorf("label index doesn't exist") - } - - flag, b, err := r.section(off) - if err != nil { - return nil, errors.Wrapf(err, "section at %d", off) - } - if flag != flagStd { - return nil, errInvalidFlag - } - l, n := binary.Uvarint(b) - if n < 1 { - return nil, errors.Wrap(errInvalidSize, "read label index size") - } - - st := &serializedStringTuples{ - l: int(l), - b: b[n:], - lookup: r.lookupSymbol, - } - return st, nil -} - -func (r *indexReader) LabelIndices() ([][]string, error) { - res := [][]string{} - - for s := range r.labels { - res = append(res, strings.Split(s, string(sep))) - } - return res, nil -} - -func (r *indexReader) Series(ref uint32) (labels.Labels, []ChunkMeta, error) { - k, n := binary.Uvarint(r.b[ref:]) - if n < 1 { - return nil, nil, errors.Wrap(errInvalidSize, "number of labels") - } - - b := r.b[int(ref)+n:] - lbls := make(labels.Labels, 0, k) - - for i := 0; i < 2*int(k); i += 2 { - o, m := binary.Uvarint(b) - if m < 1 { - return nil, nil, errors.Wrap(errInvalidSize, "symbol offset") - } - n, err := r.lookupSymbol(uint32(o)) - if err != nil { - return nil, nil, errors.Wrap(err, "symbol lookup") - } - b = b[m:] - - o, m = binary.Uvarint(b) - if m < 1 { - return nil, nil, errors.Wrap(errInvalidSize, "symbol offset") - } - v, err := r.lookupSymbol(uint32(o)) - if err != nil { - return nil, nil, errors.Wrap(err, "symbol lookup") - } - b = b[m:] - - lbls = append(lbls, labels.Label{ - Name: n, - Value: v, - }) - } - - // Read the chunks meta data. - k, n = binary.Uvarint(b) - if n < 1 { - return nil, nil, errors.Wrap(errInvalidSize, "number of chunks") - } - - b = b[n:] - chunks := make([]ChunkMeta, 0, k) - - for i := 0; i < int(k); i++ { - firstTime, n := binary.Varint(b) - if n < 1 { - return nil, nil, errors.Wrap(errInvalidSize, "first time") - } - b = b[n:] - - lastTime, n := binary.Varint(b) - if n < 1 { - return nil, nil, errors.Wrap(errInvalidSize, "last time") - } - b = b[n:] - - o, n := binary.Uvarint(b) - if n < 1 { - return nil, nil, errors.Wrap(errInvalidSize, "chunk offset") - } - b = b[n:] - - chunks = append(chunks, ChunkMeta{ - Ref: o, - MinTime: firstTime, - MaxTime: lastTime, - }) - } - - return lbls, chunks, nil -} - -func (r *indexReader) Postings(name, value string) (Postings, error) { - key := name + string(sep) + value - - off, ok := r.postings[key] - if !ok { - return nil, ErrNotFound - } - - flag, b, err := r.section(off) - if err != nil { - return nil, errors.Wrapf(err, "section at %d", off) - } - - if flag != flagStd { - return nil, errors.Wrapf(errInvalidFlag, "section at %d", off) - } - - // TODO(fabxc): just read into memory as an intermediate solution. - // Add iterator over serialized data. 
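lookupSymbol resolves a byte offset into a uvarint-length-prefixed string, the same layout the index writer's symbol table produces. A small sketch of building such a table and reading an entry back by offset (the helper names are invented for illustration):

package main

import (
	"encoding/binary"
	"fmt"
)

// appendSymbol appends a uvarint length prefix followed by the string bytes
// and returns the offset at which the entry starts.
func appendSymbol(buf []byte, s string) ([]byte, int) {
	off := len(buf)
	var tmp [binary.MaxVarintLen32]byte
	n := binary.PutUvarint(tmp[:], uint64(len(s)))
	buf = append(buf, tmp[:n]...)
	buf = append(buf, s...)
	return buf, off
}

// symbolAt reads the entry starting at the given offset back out again.
func symbolAt(buf []byte, off int) (string, error) {
	l, n := binary.Uvarint(buf[off:])
	if n <= 0 {
		return "", fmt.Errorf("reading symbol length failed")
	}
	start := off + n
	if start+int(l) > len(buf) {
		return "", fmt.Errorf("invalid length")
	}
	return string(buf[start : start+int(l)]), nil
}

func main() {
	var buf []byte
	var offsets []int
	for _, s := range []string{"__name__", "http_requests_total", "job"} {
		var off int
		buf, off = appendSymbol(buf, s)
		offsets = append(offsets, off)
	}
	s, _ := symbolAt(buf, offsets[1])
	fmt.Println(s) // http_requests_total
}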
- var l []uint32 - - for len(b) > 0 { - if len(b) < 4 { - return nil, errors.Wrap(errInvalidSize, "plain postings entry") - } - l = append(l, binary.BigEndian.Uint32(b[:4])) - - b = b[4:] - } - - return &listPostings{list: l, idx: -1}, nil -} - -type stringTuples struct { - l int // tuple length - s []string // flattened tuple entries -} - -func newStringTuples(s []string, l int) (*stringTuples, error) { - if len(s)%l != 0 { - return nil, errors.Wrap(errInvalidSize, "string tuple list") - } - return &stringTuples{s: s, l: l}, nil -} - -func (t *stringTuples) Len() int { return len(t.s) / t.l } -func (t *stringTuples) At(i int) ([]string, error) { return t.s[i : i+t.l], nil } - -func (t *stringTuples) Swap(i, j int) { - c := make([]string, t.l) - copy(c, t.s[i:i+t.l]) - - for k := 0; k < t.l; k++ { - t.s[i+k] = t.s[j+k] - t.s[j+k] = c[k] - } -} - -func (t *stringTuples) Less(i, j int) bool { - for k := 0; k < t.l; k++ { - d := strings.Compare(t.s[i+k], t.s[j+k]) - - if d < 0 { - return true - } - if d > 0 { - return false - } - } - return false -} - -type serializedStringTuples struct { - l int - b []byte - lookup func(uint32) (string, error) -} - -func (t *serializedStringTuples) Len() int { - // TODO(fabxc): Cache this? - return len(t.b) / (4 * t.l) -} - -func (t *serializedStringTuples) At(i int) ([]string, error) { - if len(t.b) < (i+t.l)*4 { - return nil, errInvalidSize - } - res := make([]string, 0, t.l) - - for k := 0; k < t.l; k++ { - offset := binary.BigEndian.Uint32(t.b[(i+k)*4:]) - - s, err := t.lookup(offset) - if err != nil { - return nil, errors.Wrap(err, "symbol lookup") - } - res = append(res, s) - } - - return res, nil -} diff --git a/vendor/github.com/fabxc/tsdb/wal.go b/vendor/github.com/fabxc/tsdb/wal.go index 8b88d110d..5b85ca3c9 100644 --- a/vendor/github.com/fabxc/tsdb/wal.go +++ b/vendor/github.com/fabxc/tsdb/wal.go @@ -448,7 +448,11 @@ func (r *WALReader) nextEntry() (WALEntryType, byte, []byte, error) { cr := r.rs[r.cur] et, flag, b, err := r.entry(cr) - if err == io.EOF { + // If we reached the end of the reader, advance to the next one + // and close. + // Do not close on the last one as it will still be appended to. + // XXX(fabxc): leaky abstraction. + if err == io.EOF && r.cur < len(r.rs)-1 { // Current reader completed, close and move to the next one. if err := cr.Close(); err != nil { return 0, 0, nil, err diff --git a/vendor/github.com/fabxc/tsdb/writer.go b/vendor/github.com/fabxc/tsdb/writer.go deleted file mode 100644 index 264052d32..000000000 --- a/vendor/github.com/fabxc/tsdb/writer.go +++ /dev/null @@ -1,611 +0,0 @@ -package tsdb - -import ( - "bufio" - "encoding/binary" - "hash" - "hash/crc32" - "io" - "os" - "path/filepath" - "sort" - "strings" - - "github.com/coreos/etcd/pkg/fileutil" - "github.com/fabxc/tsdb/chunks" - "github.com/fabxc/tsdb/labels" - "github.com/pkg/errors" -) - -const ( - // MagicSeries 4 bytes at the head of series file. - MagicSeries = 0x85BD40DD - - // MagicIndex 4 bytes at the head of an index file. - MagicIndex = 0xBAAAD700 -) - -const compactionPageBytes = minSectorSize * 64 - -// ChunkWriter serializes a time block of chunked series data. -type ChunkWriter interface { - // WriteChunks writes several chunks. The data field of the ChunkMetas - // must be populated. - // After returning successfully, the Ref fields in the ChunkMetas - // is set and can be used to retrieve the chunks from the written data. 
- WriteChunks(chunks ...ChunkMeta) error - - // Close writes any required finalization and closes the resources - // associated with the underlying writer. - Close() error -} - -// chunkWriter implements the ChunkWriter interface for the standard -// serialization format. -type chunkWriter struct { - dirFile *os.File - files []*os.File - wbuf *bufio.Writer - n int64 - crc32 hash.Hash - - segmentSize int64 -} - -const ( - defaultChunkSegmentSize = 512 * 1024 * 1024 - - chunksFormatV1 = 1 - indexFormatV1 = 1 -) - -func newChunkWriter(dir string) (*chunkWriter, error) { - if err := os.MkdirAll(dir, 0777); err != nil { - return nil, err - } - dirFile, err := fileutil.OpenDir(dir) - if err != nil { - return nil, err - } - cw := &chunkWriter{ - dirFile: dirFile, - n: 0, - crc32: crc32.New(crc32.MakeTable(crc32.Castagnoli)), - segmentSize: defaultChunkSegmentSize, - } - return cw, nil -} - -func (w *chunkWriter) tail() *os.File { - if len(w.files) == 0 { - return nil - } - return w.files[len(w.files)-1] -} - -// finalizeTail writes all pending data to the current tail file, -// truncates its size, and closes it. -func (w *chunkWriter) finalizeTail() error { - tf := w.tail() - if tf == nil { - return nil - } - - if err := w.wbuf.Flush(); err != nil { - return err - } - if err := fileutil.Fsync(tf); err != nil { - return err - } - // As the file was pre-allocated, we truncate any superfluous zero bytes. - off, err := tf.Seek(0, os.SEEK_CUR) - if err != nil { - return err - } - if err := tf.Truncate(off); err != nil { - return err - } - return tf.Close() -} - -func (w *chunkWriter) cut() error { - // Sync current tail to disk and close. - w.finalizeTail() - - p, _, err := nextSequenceFile(w.dirFile.Name(), "") - if err != nil { - return err - } - f, err := os.OpenFile(p, os.O_WRONLY|os.O_CREATE, 0666) - if err != nil { - return err - } - if err = fileutil.Preallocate(f, w.segmentSize, true); err != nil { - return err - } - if err = w.dirFile.Sync(); err != nil { - return err - } - - // Write header metadata for new file. - - metab := make([]byte, 8) - binary.BigEndian.PutUint32(metab[:4], MagicSeries) - metab[4] = chunksFormatV1 - - if _, err := f.Write(metab); err != nil { - return err - } - - w.files = append(w.files, f) - if w.wbuf != nil { - w.wbuf.Reset(f) - } else { - w.wbuf = bufio.NewWriterSize(f, 8*1024*1024) - } - w.n = 8 - - return nil -} - -func (w *chunkWriter) write(wr io.Writer, b []byte) error { - n, err := wr.Write(b) - w.n += int64(n) - return err -} - -func (w *chunkWriter) WriteChunks(chks ...ChunkMeta) error { - // Calculate maximum space we need and cut a new segment in case - // we don't fit into the current one. - maxLen := int64(binary.MaxVarintLen32) - for _, c := range chks { - maxLen += binary.MaxVarintLen32 + 1 - maxLen += int64(len(c.Chunk.Bytes())) - } - newsz := w.n + maxLen - - if w.wbuf == nil || w.n > w.segmentSize || newsz > w.segmentSize && maxLen <= w.segmentSize { - if err := w.cut(); err != nil { - return err - } - } - - // Write chunks sequentially and set the reference field in the ChunkMeta. 
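Both the chunk writer and the index writer stream section payloads through an io.MultiWriter so a CRC32 (Castagnoli table) accumulates alongside the buffered output, then append the 4-byte sum. A minimal sketch of that pattern together with a matching verification step (framing simplified; the verification side is not shown in the vendored code):

package main

import (
	"bytes"
	"fmt"
	"hash/crc32"
	"io"
)

var castagnoli = crc32.MakeTable(crc32.Castagnoli)

// writeChecksummed writes payload to w while feeding the same bytes through
// a CRC32 hash, then appends the 4-byte checksum.
func writeChecksummed(w io.Writer, payload []byte) error {
	h := crc32.New(castagnoli)
	mw := io.MultiWriter(h, w)
	if _, err := mw.Write(payload); err != nil {
		return err
	}
	_, err := w.Write(h.Sum(nil))
	return err
}

// verifyChecksummed checks that the trailing 4 bytes match the CRC32 of the
// rest and returns the payload.
func verifyChecksummed(b []byte) ([]byte, error) {
	if len(b) < 4 {
		return nil, fmt.Errorf("section too short")
	}
	payload, sum := b[:len(b)-4], b[len(b)-4:]
	h := crc32.New(castagnoli)
	h.Write(payload)
	if !bytes.Equal(h.Sum(nil), sum) {
		return nil, fmt.Errorf("checksum mismatch")
	}
	return payload, nil
}

func main() {
	var buf bytes.Buffer
	if err := writeChecksummed(&buf, []byte("section payload")); err != nil {
		panic(err)
	}
	payload, err := verifyChecksummed(buf.Bytes())
	fmt.Println(string(payload), err) // section payload <nil>
}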
- w.crc32.Reset() - wr := io.MultiWriter(w.crc32, w.wbuf) - - b := make([]byte, binary.MaxVarintLen32) - n := binary.PutUvarint(b, uint64(len(chks))) - - if err := w.write(wr, b[:n]); err != nil { - return err - } - seq := uint64(w.seq()) << 32 - - for i := range chks { - chk := &chks[i] - - chk.Ref = seq | uint64(w.n) - - n = binary.PutUvarint(b, uint64(len(chk.Chunk.Bytes()))) - - if err := w.write(wr, b[:n]); err != nil { - return err - } - if err := w.write(wr, []byte{byte(chk.Chunk.Encoding())}); err != nil { - return err - } - if err := w.write(wr, chk.Chunk.Bytes()); err != nil { - return err - } - chk.Chunk = nil - } - - if err := w.write(w.wbuf, w.crc32.Sum(nil)); err != nil { - return err - } - return nil -} - -func (w *chunkWriter) seq() int { - return len(w.files) - 1 -} - -func (w *chunkWriter) Close() error { - return w.finalizeTail() -} - -// ChunkMeta holds information about a chunk of data. -type ChunkMeta struct { - // Ref and Chunk hold either a reference that can be used to retrieve - // chunk data or the data itself. - // Generally, only one of them is set. - Ref uint64 - Chunk chunks.Chunk - - MinTime, MaxTime int64 // time range the data covers -} - -// IndexWriter serialized the index for a block of series data. -// The methods must generally be called in order they are specified. -type IndexWriter interface { - // AddSeries populates the index writer witha series and its offsets - // of chunks that the index can reference. - // The reference number is used to resolve a series against the postings - // list iterator. It only has to be available during the write processing. - AddSeries(ref uint32, l labels.Labels, chunks ...ChunkMeta) error - - // WriteLabelIndex serializes an index from label names to values. - // The passed in values chained tuples of strings of the length of names. - WriteLabelIndex(names []string, values []string) error - - // WritePostings writes a postings list for a single label pair. - WritePostings(name, value string, it Postings) error - - // Close writes any finalization and closes theresources associated with - // the underlying writer. - Close() error -} - -type indexWriterSeries struct { - labels labels.Labels - chunks []ChunkMeta // series file offset of chunks - offset uint32 // index file offset of series reference -} - -// indexWriter implements the IndexWriter interface for the standard -// serialization format. -type indexWriter struct { - f *os.File - bufw *bufio.Writer - n int64 - started bool - - // Reusable memory. - b []byte - uint32s []uint32 - - series map[uint32]*indexWriterSeries - symbols map[string]uint32 // symbol offsets - labelIndexes []hashEntry // label index offsets - postings []hashEntry // postings lists offsets - - crc32 hash.Hash -} - -func newIndexWriter(dir string) (*indexWriter, error) { - df, err := fileutil.OpenDir(dir) - if err != nil { - return nil, err - } - f, err := os.OpenFile(filepath.Join(dir, "index"), os.O_CREATE|os.O_WRONLY, 0666) - if err != nil { - return nil, err - } - if err := fileutil.Fsync(df); err != nil { - return nil, errors.Wrap(err, "sync dir") - } - - iw := &indexWriter{ - f: f, - bufw: bufio.NewWriterSize(f, 1<<22), - n: 0, - - // Reusable memory. - b: make([]byte, 0, 1<<23), - uint32s: make([]uint32, 0, 1<<15), - - // Caches. 
-		symbols: make(map[string]uint32, 1<<13),
-		series:  make(map[uint32]*indexWriterSeries, 1<<16),
-		crc32:   crc32.New(crc32.MakeTable(crc32.Castagnoli)),
-	}
-	if err := iw.writeMeta(); err != nil {
-		return nil, err
-	}
-	return iw, nil
-}
-
-func (w *indexWriter) write(wr io.Writer, b []byte) error {
-	n, err := wr.Write(b)
-	w.n += int64(n)
-	return err
-}
-
-// section writes a CRC32 checksummed section of length l and guarded by flag.
-func (w *indexWriter) section(l int, flag byte, f func(w io.Writer) error) error {
-	w.crc32.Reset()
-	wr := io.MultiWriter(w.crc32, w.bufw)
-
-	b := [5]byte{flag, 0, 0, 0, 0}
-	binary.BigEndian.PutUint32(b[1:], uint32(l))
-
-	if err := w.write(wr, b[:]); err != nil {
-		return errors.Wrap(err, "writing header")
-	}
-
-	if err := f(wr); err != nil {
-		return errors.Wrap(err, "write contents")
-	}
-	if err := w.write(w.bufw, w.crc32.Sum(nil)); err != nil {
-		return errors.Wrap(err, "writing checksum")
-	}
-	return nil
-}
-
-func (w *indexWriter) writeMeta() error {
-	b := [8]byte{}
-
-	binary.BigEndian.PutUint32(b[:4], MagicIndex)
-	b[4] = flagStd
-
-	return w.write(w.bufw, b[:])
-}
-
-func (w *indexWriter) AddSeries(ref uint32, lset labels.Labels, chunks ...ChunkMeta) error {
-	if _, ok := w.series[ref]; ok {
-		return errors.Errorf("series with reference %d already added", ref)
-	}
-	// Populate the symbol table from all label sets we have to reference.
-	for _, l := range lset {
-		w.symbols[l.Name] = 0
-		w.symbols[l.Value] = 0
-	}
-
-	w.series[ref] = &indexWriterSeries{
-		labels: lset,
-		chunks: chunks,
-	}
-	return nil
-}
-
-func (w *indexWriter) writeSymbols() error {
-	// Generate sorted list of strings we will store as reference table.
-	symbols := make([]string, 0, len(w.symbols))
-	for s := range w.symbols {
-		symbols = append(symbols, s)
-	}
-	sort.Strings(symbols)
-
-	// The start of the section plus a 5 byte section header are our base.
-	// TODO(fabxc): switch to relative offsets and hold sections in a TOC.
-	base := uint32(w.n) + 5
-
-	buf := [binary.MaxVarintLen32]byte{}
-	w.b = append(w.b[:0], flagStd)
-
-	for _, s := range symbols {
-		w.symbols[s] = base + uint32(len(w.b))
-
-		n := binary.PutUvarint(buf[:], uint64(len(s)))
-		w.b = append(w.b, buf[:n]...)
-		w.b = append(w.b, s...)
-	}
-
-	return w.section(len(w.b), flagStd, func(wr io.Writer) error {
-		return w.write(wr, w.b)
-	})
-}
-
-type indexWriterSeriesSlice []*indexWriterSeries
-
-func (s indexWriterSeriesSlice) Len() int      { return len(s) }
-func (s indexWriterSeriesSlice) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
-
-func (s indexWriterSeriesSlice) Less(i, j int) bool {
-	return labels.Compare(s[i].labels, s[j].labels) < 0
-}
-
-func (w *indexWriter) writeSeries() error {
-	// Series must be stored sorted along their labels.
-	series := make(indexWriterSeriesSlice, 0, len(w.series))
-
-	for _, s := range w.series {
-		series = append(series, s)
-	}
-	sort.Sort(series)
-
-	// Current end of file plus 5 bytes for section header.
-	// TODO(fabxc): switch to relative offsets.
-	base := uint32(w.n) + 5
-
-	w.b = w.b[:0]
-	buf := make([]byte, binary.MaxVarintLen64)
-
-	for _, s := range series {
-		// Write label set symbol references.
-		s.offset = base + uint32(len(w.b))
-
-		n := binary.PutUvarint(buf, uint64(len(s.labels)))
-		w.b = append(w.b, buf[:n]...)
-
-		for _, l := range s.labels {
-			n = binary.PutUvarint(buf, uint64(w.symbols[l.Name]))
-			w.b = append(w.b, buf[:n]...)
-			n = binary.PutUvarint(buf, uint64(w.symbols[l.Value]))
-			w.b = append(w.b, buf[:n]...)
-		}
-
-		// Write chunks meta data including reference into chunk file.
-		n = binary.PutUvarint(buf, uint64(len(s.chunks)))
-		w.b = append(w.b, buf[:n]...)
-
-		for _, c := range s.chunks {
-			n = binary.PutVarint(buf, c.MinTime)
-			w.b = append(w.b, buf[:n]...)
-			n = binary.PutVarint(buf, c.MaxTime)
-			w.b = append(w.b, buf[:n]...)
-
-			n = binary.PutUvarint(buf, uint64(c.Ref))
-			w.b = append(w.b, buf[:n]...)
-		}
-	}
-
-	return w.section(len(w.b), flagStd, func(wr io.Writer) error {
-		return w.write(wr, w.b)
-	})
-}
-
-func (w *indexWriter) init() error {
-	if err := w.writeSymbols(); err != nil {
-		return err
-	}
-	if err := w.writeSeries(); err != nil {
-		return err
-	}
-	w.started = true
-
-	return nil
-}
-
-func (w *indexWriter) WriteLabelIndex(names []string, values []string) error {
-	if !w.started {
-		if err := w.init(); err != nil {
-			return err
-		}
-	}
-
-	valt, err := newStringTuples(values, len(names))
-	if err != nil {
-		return err
-	}
-	sort.Sort(valt)
-
-	w.labelIndexes = append(w.labelIndexes, hashEntry{
-		name:   strings.Join(names, string(sep)),
-		offset: uint32(w.n),
-	})
-
-	buf := make([]byte, binary.MaxVarintLen32)
-	n := binary.PutUvarint(buf, uint64(len(names)))
-
-	l := n + len(values)*4
-
-	return w.section(l, flagStd, func(wr io.Writer) error {
-		// First byte indicates tuple size for index.
-		if err := w.write(wr, buf[:n]); err != nil {
-			return err
-		}
-
-		for _, v := range valt.s {
-			binary.BigEndian.PutUint32(buf, w.symbols[v])
-
-			if err := w.write(wr, buf[:4]); err != nil {
-				return err
-			}
-		}
-		return nil
-	})
-}
-
-func (w *indexWriter) WritePostings(name, value string, it Postings) error {
-	if !w.started {
-		if err := w.init(); err != nil {
-			return err
-		}
-	}
-
-	key := name + string(sep) + value
-
-	w.postings = append(w.postings, hashEntry{
-		name:   key,
-		offset: uint32(w.n),
-	})
-
-	// Order of the references in the postings list does not imply order
-	// of the series references within the persisted block they are mapped to.
-	// We have to sort the new references again.
-	refs := w.uint32s[:0]
-
-	for it.Next() {
-		s, ok := w.series[it.At()]
-		if !ok {
-			return errors.Errorf("series for reference %d not found", it.At())
-		}
-		refs = append(refs, s.offset)
-	}
-	if err := it.Err(); err != nil {
-		return err
-	}
-
-	sort.Sort(uint32slice(refs))
-
-	w.b = w.b[:0]
-	buf := make([]byte, 4)
-
-	for _, r := range refs {
-		binary.BigEndian.PutUint32(buf, r)
-		w.b = append(w.b, buf...)
-	}
-
-	w.uint32s = refs[:0]
-
-	return w.section(len(w.b), flagStd, func(wr io.Writer) error {
-		return w.write(wr, w.b)
-	})
-}
-
-type uint32slice []uint32
-
-func (s uint32slice) Len() int           { return len(s) }
-func (s uint32slice) Swap(i, j int)      { s[i], s[j] = s[j], s[i] }
-func (s uint32slice) Less(i, j int) bool { return s[i] < s[j] }
-
-type hashEntry struct {
-	name   string
-	offset uint32
-}
-
-func (w *indexWriter) writeHashmap(h []hashEntry) error {
-	w.b = w.b[:0]
-	buf := [binary.MaxVarintLen32]byte{}
-
-	for _, e := range h {
-		n := binary.PutUvarint(buf[:], uint64(len(e.name)))
-		w.b = append(w.b, buf[:n]...)
-		w.b = append(w.b, e.name...)
-
-		n = binary.PutUvarint(buf[:], uint64(e.offset))
-		w.b = append(w.b, buf[:n]...)
-	}
-
-	return w.section(len(w.b), flagStd, func(wr io.Writer) error {
-		return w.write(wr, w.b)
-	})
-}
-
-func (w *indexWriter) finalize() error {
-	// Write out hash maps to jump to correct label index and postings sections.
-	lo := uint32(w.n)
-	if err := w.writeHashmap(w.labelIndexes); err != nil {
-		return err
-	}
-
-	po := uint32(w.n)
-	if err := w.writeHashmap(w.postings); err != nil {
-		return err
-	}
-
-	// Terminate index file with offsets to hashmaps. This is the entry point
-	// for any index query.
-	// TODO(fabxc): also store offset to series section to allow plain
-	// iteration over all existing series?
-	b := [8]byte{}
-	binary.BigEndian.PutUint32(b[:4], lo)
-	binary.BigEndian.PutUint32(b[4:], po)
-
-	return w.write(w.bufw, b[:])
-}
-
-func (w *indexWriter) Close() error {
-	if err := w.finalize(); err != nil {
-		return err
-	}
-	if err := w.bufw.Flush(); err != nil {
-		return err
-	}
-	if err := fileutil.Fsync(w.f); err != nil {
-		return err
-	}
-	return w.f.Close()
-}
diff --git a/vendor/vendor.json b/vendor/vendor.json
index e183acfcf..49d0513f9 100644
--- a/vendor/vendor.json
+++ b/vendor/vendor.json
@@ -368,10 +368,10 @@
 			"revisionTime": "2016-09-30T00:14:02Z"
 		},
 		{
-			"checksumSHA1": "IOnF9CNVjOBoVwdfzfUEv/+JotI=",
+			"checksumSHA1": "Aj4Cn1RClamxluIri/LQMnK/yB4=",
 			"path": "github.com/fabxc/tsdb",
-			"revision": "55a9b5428aceb644b3b297d7a9fd63d0354ce953",
-			"revisionTime": "2017-03-04T15:50:48Z"
+			"revision": "ca1bc920b795cfc670002e7643471b0277e79a9b",
+			"revisionTime": "2017-03-08T15:54:13Z"
 		},
 		{
 			"checksumSHA1": "uVzWuLvF646YjiKomsc2CR1ua58=",
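
Reviewer note (not part of the patch): the removed finalize/Close path above ends the index file with an 8-byte trailer holding two big-endian uint32 offsets, pointing at the label-index and postings hashmaps. A minimal sketch of how a reader could locate those sections, assuming the trailer really is the last 8 bytes of the file as finalize writes it; readIndexTrailer is a hypothetical helper for illustration only, not an API of the tsdb package:

	package main

	import (
		"encoding/binary"
		"fmt"
		"os"
	)

	// readIndexTrailer returns the offsets of the label-index and postings
	// hashmaps from the trailer written by indexWriter.finalize.
	// Assumption: the trailer occupies the final 8 bytes of the index file.
	func readIndexTrailer(path string) (labelIndexOff, postingsOff uint32, err error) {
		f, err := os.Open(path)
		if err != nil {
			return 0, 0, err
		}
		defer f.Close()

		fi, err := f.Stat()
		if err != nil {
			return 0, 0, err
		}
		if fi.Size() < 8 {
			return 0, 0, fmt.Errorf("index file too small: %d bytes", fi.Size())
		}
		b := make([]byte, 8)
		if _, err := f.ReadAt(b, fi.Size()-8); err != nil {
			return 0, 0, err
		}
		// Layout mirrors finalize(): label-index hashmap offset, then postings hashmap offset.
		return binary.BigEndian.Uint32(b[:4]), binary.BigEndian.Uint32(b[4:]), nil
	}

	func main() {
		lo, po, err := readIndexTrailer("index")
		if err != nil {
			fmt.Fprintln(os.Stderr, err)
			os.Exit(1)
		}
		fmt.Printf("label index hashmap at %d, postings hashmap at %d\n", lo, po)
	}

Because Close flushes the buffered writer and fsyncs only after finalize has run, reading the trailer from the end of the file is only meaningful once the writer has been closed.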