diff --git a/vendor/github.com/fabxc/tsdb/compact.go b/vendor/github.com/fabxc/tsdb/compact.go index b58acba43..51d4ce0d1 100644 --- a/vendor/github.com/fabxc/tsdb/compact.go +++ b/vendor/github.com/fabxc/tsdb/compact.go @@ -13,6 +13,23 @@ import ( "github.com/prometheus/client_golang/prometheus" ) +// Compactor provides compaction against an underlying storage +// of time series data. +type Compactor interface { + // Plan returns a set of non-overlapping directories that can + // be compacted concurrently. + // Results returned when compactions are in progress are undefined. + Plan(dir string) ([][]string, error) + + // Write persists a Block into a directory. + Write(dir string, b Block) error + + // Compact runs compaction against the provided directories. Must + // only be called concurrently with results of Plan(). + Compact(dirs ...string) error +} + +// compactor implements the Compactor interface. type compactor struct { metrics *compactorMetrics opts *compactorOptions @@ -69,61 +86,55 @@ type compactionInfo struct { const compactionBlocksLen = 3 -// pick returns a range [i, j) in the blocks that are suitable to be compacted -// into a single block at position i. -func (c *compactor) pick(bs []compactionInfo) (i, j int, ok bool) { - if len(bs) == 0 { - return 0, 0, false +func (c *compactor) Plan(dir string) ([][]string, error) { + dirs, err := blockDirs(dir) + if err != nil { + return nil, err } - // First, we always compact pending in-memory blocks – oldest first. - for i, b := range bs { - if b.generation > 0 { - continue - } - // Directly compact into 2nd generation with previous generation 1 blocks. - if i+1 >= compactionBlocksLen { - match := true - for _, pb := range bs[i-compactionBlocksLen+1 : i] { - match = match && pb.generation == 1 - } - if match { - return i - compactionBlocksLen + 1, i + 1, true - } - } - // If we have enough generation 0 blocks to directly move to the - // 2nd generation, skip generation 1. - if len(bs)-i >= compactionBlocksLen { - // Guard against the newly compacted block becoming larger than - // the previous one. - if i == 0 || bs[i-1].generation >= 2 { - return i, i + compactionBlocksLen, true - } - } + var bs []*BlockMeta - // No optimizations possible, naiively compact the new block. - return i, i + 1, true + for _, dir := range dirs { + meta, err := readMetaFile(dir) + if err != nil { + return nil, err + } + if meta.Compaction.Generation > 0 { + bs = append(bs, meta) + } + } + + if len(bs) == 0 { + return nil, nil + } + + sliceDirs := func(i, j int) [][]string { + var res []string + for k := i; k < j; k++ { + res = append(res, dirs[k]) + } + return [][]string{res} } // Then we care about compacting multiple blocks, starting with the oldest. - for i := 0; i < len(bs)-compactionBlocksLen+1; i += compactionBlocksLen { + for i := 0; i < len(bs)-compactionBlocksLen+1; i++ { if c.match(bs[i : i+3]) { - return i, i + compactionBlocksLen, true + return sliceDirs(i, i+compactionBlocksLen), nil } } - return 0, 0, false + return nil, nil } -func (c *compactor) match(bs []compactionInfo) bool { - g := bs[0].generation +func (c *compactor) match(bs []*BlockMeta) bool { + g := bs[0].Compaction.Generation for _, b := range bs { - if b.generation != g { + if b.Compaction.Generation != g { return false } } - return uint64(bs[len(bs)-1].maxt-bs[0].mint) <= c.opts.maxBlockRange + return uint64(bs[len(bs)-1].MaxTime-bs[0].MinTime) <= c.opts.maxBlockRange } var entropy = rand.New(rand.NewSource(time.Now().UnixNano())) @@ -136,11 +147,7 @@ func mergeBlockMetas(blocks ...Block) (res BlockMeta) { res.MaxTime = blocks[len(blocks)-1].Meta().MaxTime res.ULID = ulid.MustNew(ulid.Now(), entropy) - g := m0.Compaction.Generation - if g == 0 && len(blocks) > 1 { - g++ - } - res.Compaction.Generation = g + 1 + res.Compaction.Generation = m0.Compaction.Generation + 1 for _, b := range blocks { res.Stats.NumSamples += b.Meta().Stats.NumSamples @@ -148,35 +155,62 @@ func mergeBlockMetas(blocks ...Block) (res BlockMeta) { return res } -func (c *compactor) compact(dir string, blocks ...Block) (err error) { - start := time.Now() - defer func() { +func (c *compactor) Compact(dirs ...string) (err error) { + var blocks []Block + + for _, d := range dirs { + b, err := newPersistedBlock(d) + if err != nil { + return err + } + blocks = append(blocks, b) + } + + return c.write(dirs[0], blocks...) +} + +func (c *compactor) Write(dir string, b Block) error { + return c.write(dir, b) +} + +// write creates a new block that is the union of the provided blocks into dir. +// It cleans up all files of the old blocks after completing successfully. +func (c *compactor) write(dir string, blocks ...Block) (err error) { + defer func(t time.Time) { if err != nil { c.metrics.failed.Inc() } - c.metrics.duration.Observe(time.Since(start).Seconds()) - }() + c.metrics.duration.Observe(time.Since(t).Seconds()) + }(time.Now()) - if err = os.RemoveAll(dir); err != nil { + tmp := dir + ".tmp" + + if err = os.RemoveAll(tmp); err != nil { return err } - if err = os.MkdirAll(dir, 0777); err != nil { + if err = os.MkdirAll(tmp, 0777); err != nil { return err } - chunkw, err := newChunkWriter(chunkDir(dir)) + // Populate chunk and index files into temporary directory with + // data of all blocks. + chunkw, err := newChunkWriter(chunkDir(tmp)) if err != nil { return errors.Wrap(err, "open chunk writer") } - indexw, err := newIndexWriter(dir) + indexw, err := newIndexWriter(tmp) if err != nil { return errors.Wrap(err, "open index writer") } - if err = c.write(dir, blocks, indexw, chunkw); err != nil { + meta, err := c.populate(blocks, indexw, chunkw) + if err != nil { return errors.Wrap(err, "write compaction") } + if err = writeMetaFile(tmp, meta); err != nil { + return errors.Wrap(err, "write merged meta") + } if err = chunkw.Close(); err != nil { return errors.Wrap(err, "close chunk writer") @@ -184,16 +218,37 @@ func (c *compactor) compact(dir string, blocks ...Block) (err error) { if err = indexw.Close(); err != nil { return errors.Wrap(err, "close index writer") } + + // Block successfully written, make visible and remove old ones. + if err := renameFile(tmp, dir); err != nil { + return errors.Wrap(err, "rename block dir") + } + for _, b := range blocks[1:] { + if err := os.RemoveAll(b.Dir()); err != nil { + return err + } + } + // Properly sync parent dir to ensure changes are visible. + df, err := fileutil.OpenDir(dir) + if err != nil { + return errors.Wrap(err, "sync block dir") + } + if err := fileutil.Fsync(df); err != nil { + return errors.Wrap(err, "sync block dir") + } + return nil } -func (c *compactor) write(dir string, blocks []Block, indexw IndexWriter, chunkw ChunkWriter) error { +// populate fills the index and chunk writers with new data gathered as the union +// of the provided blocks. It returns meta information for the new block. +func (c *compactor) populate(blocks []Block, indexw IndexWriter, chunkw ChunkWriter) (*BlockMeta, error) { var set compactionSet for i, b := range blocks { all, err := b.Index().Postings("", "") if err != nil { - return err + return nil, err } // TODO(fabxc): find more transparent way of handling this. if hb, ok := b.(*headBlock); ok { @@ -207,7 +262,7 @@ func (c *compactor) write(dir string, blocks []Block, indexw IndexWriter, chunkw } set, err = newCompactionMerger(set, s) if err != nil { - return err + return nil, err } } @@ -222,7 +277,7 @@ func (c *compactor) write(dir string, blocks []Block, indexw IndexWriter, chunkw for set.Next() { lset, chunks := set.At() if err := chunkw.WriteChunks(chunks...); err != nil { - return err + return nil, err } indexw.AddSeries(i, lset, chunks...) @@ -243,7 +298,7 @@ func (c *compactor) write(dir string, blocks []Block, indexw IndexWriter, chunkw i++ } if set.Err() != nil { - return set.Err() + return nil, set.Err() } s := make([]string, 0, 256) @@ -254,13 +309,13 @@ func (c *compactor) write(dir string, blocks []Block, indexw IndexWriter, chunkw s = append(s, x) } if err := indexw.WriteLabelIndex([]string{n}, s); err != nil { - return err + return nil, err } } for t := range postings.m { if err := indexw.WritePostings(t.name, t.value, postings.get(t)); err != nil { - return err + return nil, err } } // Write a postings list containing all series. @@ -269,10 +324,10 @@ func (c *compactor) write(dir string, blocks []Block, indexw IndexWriter, chunkw all[i] = uint32(i) } if err := indexw.WritePostings("", "", newListPostings(all)); err != nil { - return err + return nil, err } - return writeMetaFile(dir, &meta) + return &meta, nil } type compactionSet interface { diff --git a/vendor/github.com/fabxc/tsdb/db.go b/vendor/github.com/fabxc/tsdb/db.go index f967b044c..b1a3a2421 100644 --- a/vendor/github.com/fabxc/tsdb/db.go +++ b/vendor/github.com/fabxc/tsdb/db.go @@ -87,18 +87,26 @@ const sep = '\xff' // DB handles reads and writes of time series falling into // a hashed partition of a seriedb. type DB struct { - dir string - lockf lockfile.Lockfile + dir string + lockf lockfile.Lockfile + logger log.Logger metrics *dbMetrics opts *Options + // Mutex for that must be held when modifying the general + // block layout. mtx sync.RWMutex persisted []*persistedBlock - heads []*headBlock - headGen uint8 + seqBlocks map[int]Block - compactor *compactor + // Mutex that must be held when modifying just the head blocks + // or the general layout. + headmtx sync.RWMutex + heads []*headBlock + headGen uint8 + + compactor Compactor compactc chan struct{} donec chan struct{} @@ -175,10 +183,9 @@ func Open(dir string, l log.Logger, r prometheus.Registerer, opts *Options) (db maxBlockRange: opts.MaxBlockDuration, }) - if err := db.initBlocks(); err != nil { + if err := db.reloadBlocks(); err != nil { return nil, err } - go db.run() return db, nil @@ -200,35 +207,16 @@ func (db *DB) run() { case <-db.compactc: db.metrics.compactionsTriggered.Inc() - var seqs []int - var infos []compactionInfo - for _, b := range db.compactable() { - m := b.Meta() + var merr MultiError - infos = append(infos, compactionInfo{ - generation: m.Compaction.Generation, - mint: m.MinTime, - maxt: m.MaxTime, - seq: m.Sequence, - }) - seqs = append(seqs, m.Sequence) + changes, err := db.compact() + merr.Add(err) + + if changes { + merr.Add(db.reloadBlocks()) } - - i, j, ok := db.compactor.pick(infos) - if !ok { - continue - } - db.logger.Log("msg", "compact", "seqs", fmt.Sprintf("%v", seqs[i:j])) - - if err := db.compact(i, j); err != nil { + if err := merr.Err(); err != nil { db.logger.Log("msg", "compaction failed", "err", err) - continue - } - db.logger.Log("msg", "compaction completed") - // Trigger another compaction in case there's more work to do. - select { - case db.compactc <- struct{}{}: - default: } case <-db.stopc: @@ -237,150 +225,170 @@ func (db *DB) run() { } } -func (db *DB) getBlock(i int) Block { - if i < len(db.persisted) { - return db.persisted[i] - } - return db.heads[i-len(db.persisted)] -} +func (db *DB) compact() (changes bool, err error) { + // Check whether we have pending head blocks that are ready to be persisted. + // They have the highest priority. + db.headmtx.RLock() -// removeBlocks removes the blocks in range [i, j) from the list of persisted -// and head blocks. The blocks are not closed and their files not deleted. -func (db *DB) removeBlocks(i, j int) { - for k := i; k < j; k++ { - if i < len(db.persisted) { - db.persisted = append(db.persisted[:i], db.persisted[i+1:]...) - } else { - l := i - len(db.persisted) - db.heads = append(db.heads[:l], db.heads[l+1:]...) - } - } -} + var singles []*headBlock -func (db *DB) blocks() (bs []Block) { - for _, b := range db.persisted { - bs = append(bs, b) - } - for _, b := range db.heads { - bs = append(bs, b) - } - return bs -} - -// compact block in range [i, j) into a temporary directory and atomically -// swap the blocks out on successful completion. -func (db *DB) compact(i, j int) error { - if j <= i { - return errors.New("invalid compaction block range") - } - var blocks []Block - for k := i; k < j; k++ { - blocks = append(blocks, db.getBlock(k)) - } - var ( - dir = blocks[0].Dir() - tmpdir = dir + ".tmp" - ) - - if err := db.compactor.compact(tmpdir, blocks...); err != nil { - return err - } - - pb, err := newPersistedBlock(tmpdir) - if err != nil { - return err - } - - db.mtx.Lock() - defer db.mtx.Unlock() - - for _, b := range blocks { - if err := b.Close(); err != nil { - return errors.Wrapf(err, "close old block %s", b.Dir()) + // Collect head blocks that are ready for compaction. Write them after + // returning the lock to not block Appenders. + // Selected blocks are semantically ensured to not be written to afterwards + // by appendable(). + if len(db.heads) > db.opts.AppendableBlocks { + for _, h := range db.heads[:len(db.heads)-db.opts.AppendableBlocks] { + // Blocks that won't be appendable when instantiating a new appender + // might still have active appenders on them. + // Abort at the first one we encounter. + if atomic.LoadUint64(&h.activeWriters) > 0 { + break + } + singles = append(singles, h) } } - if err := renameFile(tmpdir, dir); err != nil { - return errors.Wrap(err, "rename dir") - } - pb.dir = dir + db.headmtx.RUnlock() - db.removeBlocks(i, j) - db.persisted = append(db.persisted, pb) +Loop: + for _, h := range singles { + db.logger.Log("msg", "write head", "seq", h.Meta().Sequence) - for _, b := range blocks[1:] { - db.logger.Log("msg", "remove old dir", "dir", b.Dir()) - if err := os.RemoveAll(b.Dir()); err != nil { - return errors.Wrap(err, "removing old block") + select { + case <-db.stopc: + break Loop + default: } - } - if err := db.retentionCutoff(); err != nil { - return err + + if err = db.compactor.Write(h.Dir(), h); err != nil { + return changes, errors.Wrap(err, "persist head block") + } + changes = true } - return nil -} + // Check for compactions of multiple blocks. + for { + plans, err := db.compactor.Plan(db.dir) + if err != nil { + return changes, errors.Wrap(err, "plan compaction") + } -func (db *DB) retentionCutoff() error { - if db.opts.RetentionDuration == 0 { - return nil - } - h := db.heads[len(db.heads)-1] - t := h.meta.MinTime - int64(db.opts.RetentionDuration) + select { + case <-db.stopc: + return false, nil + default: + } + // We just execute compactions sequentially to not cause too extreme + // CPU and memory spikes. + // TODO(fabxc): return more descriptive plans in the future that allow + // estimation of resource usage and conditional parallelization? + for _, p := range plans { + db.logger.Log("msg", "compact blocks", "seq", fmt.Sprintf("%v", p)) - var ( - blocks = db.blocks() - i int - b Block - ) - for i, b = range blocks { - if b.Meta().MinTime >= t { + if err := db.compactor.Compact(p...); err != nil { + return changes, errors.Wrapf(err, "compact %s", p) + } + changes = true + } + // If we didn't compact anything, there's nothing left to do. + if len(plans) == 0 { break } } - if i <= 1 { - return nil - } - db.logger.Log("msg", "retention cutoff", "idx", i-1) - db.removeBlocks(0, i) - for _, b := range blocks[:i] { - if err := os.RemoveAll(b.Dir()); err != nil { - return errors.Wrap(err, "removing old block") - } - } - return nil + return changes, nil } -func (db *DB) initBlocks() error { - var ( - persisted []*persistedBlock - heads []*headBlock - ) +// func (db *DB) retentionCutoff() error { +// if db.opts.RetentionDuration == 0 { +// return nil +// } +// h := db.heads[len(db.heads)-1] +// t := h.meta.MinTime - int64(db.opts.RetentionDuration) + +// var ( +// blocks = db.blocks() +// i int +// b Block +// ) +// for i, b = range blocks { +// if b.Meta().MinTime >= t { +// break +// } +// } +// if i <= 1 { +// return nil +// } +// db.logger.Log("msg", "retention cutoff", "idx", i-1) +// db.removeBlocks(0, i) + +// for _, b := range blocks[:i] { +// if err := os.RemoveAll(b.Dir()); err != nil { +// return errors.Wrap(err, "removing old block") +// } +// } +// return nil +// } + +func (db *DB) reloadBlocks() error { + db.mtx.Lock() + defer db.mtx.Unlock() dirs, err := blockDirs(db.dir) if err != nil { - return err + return errors.Wrap(err, "find blocks") } + var ( + metas []*BlockMeta + persisted []*persistedBlock + heads []*headBlock + seqBlocks = make(map[int]Block, len(dirs)) + ) for _, dir := range dirs { - if fileutil.Exist(filepath.Join(dir, walDirName)) { - h, err := openHeadBlock(dir, db.logger) - if err != nil { - return err - } - h.generation = db.headGen - db.headGen++ - heads = append(heads, h) - continue - } - b, err := newPersistedBlock(dir) + meta, err := readMetaFile(dir) if err != nil { - return err + return errors.Wrapf(err, "read meta information %s", dir) } - persisted = append(persisted, b) + metas = append(metas, meta) } + for i, meta := range metas { + b, ok := db.seqBlocks[meta.Sequence] + if !ok { + return errors.Errorf("missing block for sequence %d", meta.Sequence) + } + + if meta.Compaction.Generation == 0 { + if meta.ULID != b.Meta().ULID { + return errors.Errorf("head block ULID changed unexpectedly") + } + heads = append(heads, b.(*headBlock)) + } else { + if meta.ULID != b.Meta().ULID { + if err := b.Close(); err != nil { + return err + } + b, err = newPersistedBlock(dirs[i]) + if err != nil { + return errors.Wrapf(err, "open persisted block %s", dirs[i]) + } + } + persisted = append(persisted, b.(*persistedBlock)) + } + + seqBlocks[meta.Sequence] = b + } + + for seq, b := range db.seqBlocks { + if _, ok := seqBlocks[seq]; !ok { + if err := b.Close(); err != nil { + return errors.Wrapf(err, "closing removed block %d", b.Meta().Sequence) + } + } + } + + db.seqBlocks = seqBlocks db.persisted = persisted db.heads = heads @@ -392,10 +400,11 @@ func (db *DB) Close() error { close(db.stopc) <-db.donec - var merr MultiError - + // Lock mutex and leave it locked so we panic if there's a bug causing + // the block to be used afterwards. db.mtx.Lock() - defer db.mtx.Unlock() + + var merr MultiError for _, pb := range db.persisted { merr.Add(pb.Close()) @@ -414,9 +423,14 @@ func (db *DB) Appender() Appender { db.mtx.RLock() a := &dbAppender{db: db} + db.headmtx.RLock() + for _, b := range db.appendable() { a.heads = append(a.heads, b.Appender().(*headAppender)) } + + db.headmtx.RUnlock() + return a } @@ -479,15 +493,12 @@ func (a *dbAppender) AddFast(ref uint64, t int64, v float64) error { func (a *dbAppender) appenderFor(t int64) (*headAppender, error) { // If there's no fitting head block for t, ensure it gets created. if len(a.heads) == 0 || t >= a.heads[len(a.heads)-1].meta.MaxTime { - a.db.mtx.RUnlock() + a.db.headmtx.Lock() if err := a.db.ensureHead(t); err != nil { - a.db.mtx.RLock() + a.db.headmtx.Unlock() return nil, err } - - a.db.mtx.RLock() - if len(a.heads) == 0 { for _, b := range a.db.appendable() { a.heads = append(a.heads, b.Appender().(*headAppender)) @@ -500,6 +511,8 @@ func (a *dbAppender) appenderFor(t int64) (*headAppender, error) { } } } + + a.db.headmtx.Unlock() } for i := len(a.heads) - 1; i >= 0; i-- { if h := a.heads[i]; t >= h.meta.MinTime { @@ -511,8 +524,8 @@ func (a *dbAppender) appenderFor(t int64) (*headAppender, error) { } func (db *DB) ensureHead(t int64) error { - db.mtx.Lock() - defer db.mtx.Unlock() + // db.mtx.Lock() + // defer db.mtx.Unlock() // Initial case for a new database: we must create the first // AppendableBlocks-1 front padding heads. @@ -568,31 +581,6 @@ func (db *DB) appendable() []*headBlock { return db.heads[len(db.heads)-db.opts.AppendableBlocks:] } -func (db *DB) compactable() []Block { - db.mtx.RLock() - defer db.mtx.RUnlock() - - var blocks []Block - for _, pb := range db.persisted { - blocks = append(blocks, pb) - } - - if len(db.heads) <= db.opts.AppendableBlocks { - return blocks - } - - for _, h := range db.heads[:len(db.heads)-db.opts.AppendableBlocks] { - // Blocks that won't be appendable when instantiating a new appender - // might still have active appenders on them. - // Abort at the first one we encounter. - if atomic.LoadUint64(&h.activeWriters) > 0 { - break - } - blocks = append(blocks, h) - } - return blocks -} - func intervalOverlap(amin, amax, bmin, bmax int64) bool { if bmin >= amin && bmin <= amax { return true @@ -643,6 +631,7 @@ func (db *DB) cut(mint int64) (*headBlock, error) { } db.heads = append(db.heads, newHead) + db.seqBlocks[seq] = newHead db.headGen++ newHead.generation = db.headGen diff --git a/vendor/github.com/fabxc/tsdb/head.go b/vendor/github.com/fabxc/tsdb/head.go index 260fb962e..058c8fe79 100644 --- a/vendor/github.com/fabxc/tsdb/head.go +++ b/vendor/github.com/fabxc/tsdb/head.go @@ -63,7 +63,10 @@ type headBlock struct { } func createHeadBlock(dir string, seq int, l log.Logger, mint, maxt int64) (*headBlock, error) { - if err := os.MkdirAll(dir, 0777); err != nil { + // Make head block creation appear atomic. + tmp := dir + ".tmp" + + if err := os.MkdirAll(tmp, 0777); err != nil { return nil, err } ulid, err := ulid.New(ulid.Now(), entropy) @@ -71,7 +74,7 @@ func createHeadBlock(dir string, seq int, l log.Logger, mint, maxt int64) (*head return nil, err } - if err := writeMetaFile(dir, &BlockMeta{ + if err := writeMetaFile(tmp, &BlockMeta{ ULID: ulid, Sequence: seq, MinTime: mint, @@ -79,6 +82,9 @@ func createHeadBlock(dir string, seq int, l log.Logger, mint, maxt int64) (*head }); err != nil { return nil, err } + if err := renameFile(tmp, dir); err != nil { + return nil, err + } return openHeadBlock(dir, l) } @@ -139,12 +145,19 @@ func (h *headBlock) inBounds(t int64) bool { // Close syncs all data and closes underlying resources of the head block. func (h *headBlock) Close() error { + // Lock mutex and leave it locked so we panic if there's a bug causing + // the block to be used afterwards. + h.mtx.Lock() + if err := h.wal.Close(); err != nil { return err } // Check whether the head block still exists in the underlying dir - // or has already been replaced with a compacted version + // or has already been replaced with a compacted version or removed. meta, err := readMetaFile(h.dir) + if os.IsNotExist(err) { + return nil + } if err != nil { return err } diff --git a/vendor/github.com/fabxc/tsdb/querier.go b/vendor/github.com/fabxc/tsdb/querier.go index 5f02c77f7..7783ef312 100644 --- a/vendor/github.com/fabxc/tsdb/querier.go +++ b/vendor/github.com/fabxc/tsdb/querier.go @@ -47,7 +47,9 @@ type querier struct { func (s *DB) Querier(mint, maxt int64) Querier { s.mtx.RLock() + s.headmtx.RLock() blocks := s.blocksForInterval(mint, maxt) + s.headmtx.RUnlock() sq := &querier{ blocks: make([]Querier, 0, len(blocks)), diff --git a/vendor/github.com/fabxc/tsdb/wal.go b/vendor/github.com/fabxc/tsdb/wal.go index a923f5b3d..8b88d110d 100644 --- a/vendor/github.com/fabxc/tsdb/wal.go +++ b/vendor/github.com/fabxc/tsdb/wal.go @@ -58,7 +58,7 @@ type WAL struct { const ( walDirName = "wal" - walSegmentSizeBytes = 64 * 1000 * 1000 // 64 MB + walSegmentSizeBytes = 256 * 1024 * 1024 // 256 MB ) // OpenWAL opens or creates a write ahead log in the given directory. @@ -265,8 +265,9 @@ func (w *WAL) Close() error { close(w.stopc) <-w.donec + // Lock mutex and leave it locked so we panic if there's a bug causing + // the block to be used afterwards. w.mtx.Lock() - defer w.mtx.Unlock() if err := w.sync(); err != nil { return err diff --git a/vendor/github.com/fabxc/tsdb/writer.go b/vendor/github.com/fabxc/tsdb/writer.go index bafb0e8cd..264052d32 100644 --- a/vendor/github.com/fabxc/tsdb/writer.go +++ b/vendor/github.com/fabxc/tsdb/writer.go @@ -11,7 +11,6 @@ import ( "sort" "strings" - "github.com/bradfitz/slice" "github.com/coreos/etcd/pkg/fileutil" "github.com/fabxc/tsdb/chunks" "github.com/fabxc/tsdb/labels" @@ -262,6 +261,10 @@ type indexWriter struct { n int64 started bool + // Reusable memory. + b []byte + uint32s []uint32 + series map[uint32]*indexWriterSeries symbols map[string]uint32 // symbol offsets labelIndexes []hashEntry // label index offsets @@ -284,11 +287,17 @@ func newIndexWriter(dir string) (*indexWriter, error) { } iw := &indexWriter{ - f: f, - bufw: bufio.NewWriterSize(f, 1*1024*1024), - n: 0, - symbols: make(map[string]uint32, 4096), - series: make(map[uint32]*indexWriterSeries, 4096), + f: f, + bufw: bufio.NewWriterSize(f, 1<<22), + n: 0, + + // Reusable memory. + b: make([]byte, 0, 1<<23), + uint32s: make([]uint32, 0, 1<<15), + + // Caches. + symbols: make(map[string]uint32, 1<<13), + series: make(map[uint32]*indexWriterSeries, 1<<16), crc32: crc32.New(crc32.MakeTable(crc32.Castagnoli)), } if err := iw.writeMeta(); err != nil { @@ -304,12 +313,12 @@ func (w *indexWriter) write(wr io.Writer, b []byte) error { } // section writes a CRC32 checksummed section of length l and guarded by flag. -func (w *indexWriter) section(l uint32, flag byte, f func(w io.Writer) error) error { +func (w *indexWriter) section(l int, flag byte, f func(w io.Writer) error) error { w.crc32.Reset() wr := io.MultiWriter(w.crc32, w.bufw) b := [5]byte{flag, 0, 0, 0, 0} - binary.BigEndian.PutUint32(b[1:], l) + binary.BigEndian.PutUint32(b[1:], uint32(l)) if err := w.write(wr, b[:]); err != nil { return errors.Wrap(err, "writing header") @@ -363,74 +372,77 @@ func (w *indexWriter) writeSymbols() error { base := uint32(w.n) + 5 buf := [binary.MaxVarintLen32]byte{} - b := append(make([]byte, 0, 4096), flagStd) + w.b = append(w.b[:0], flagStd) for _, s := range symbols { - w.symbols[s] = base + uint32(len(b)) + w.symbols[s] = base + uint32(len(w.b)) n := binary.PutUvarint(buf[:], uint64(len(s))) - b = append(b, buf[:n]...) - b = append(b, s...) + w.b = append(w.b, buf[:n]...) + w.b = append(w.b, s...) } - l := uint32(len(b)) - - return w.section(l, flagStd, func(wr io.Writer) error { - return w.write(wr, b) + return w.section(len(w.b), flagStd, func(wr io.Writer) error { + return w.write(wr, w.b) }) } +type indexWriterSeriesSlice []*indexWriterSeries + +func (s indexWriterSeriesSlice) Len() int { return len(s) } +func (s indexWriterSeriesSlice) Swap(i, j int) { s[i], s[j] = s[j], s[i] } + +func (s indexWriterSeriesSlice) Less(i, j int) bool { + return labels.Compare(s[i].labels, s[j].labels) < 0 +} + func (w *indexWriter) writeSeries() error { // Series must be stored sorted along their labels. - series := make([]*indexWriterSeries, 0, len(w.series)) + series := make(indexWriterSeriesSlice, 0, len(w.series)) for _, s := range w.series { series = append(series, s) } - slice.Sort(series, func(i, j int) bool { - return labels.Compare(series[i].labels, series[j].labels) < 0 - }) + sort.Sort(series) // Current end of file plus 5 bytes for section header. // TODO(fabxc): switch to relative offsets. base := uint32(w.n) + 5 - b := make([]byte, 0, 1<<20) // 1MiB + w.b = w.b[:0] buf := make([]byte, binary.MaxVarintLen64) for _, s := range series { // Write label set symbol references. - s.offset = base + uint32(len(b)) + s.offset = base + uint32(len(w.b)) n := binary.PutUvarint(buf, uint64(len(s.labels))) - b = append(b, buf[:n]...) + w.b = append(w.b, buf[:n]...) for _, l := range s.labels { n = binary.PutUvarint(buf, uint64(w.symbols[l.Name])) - b = append(b, buf[:n]...) + w.b = append(w.b, buf[:n]...) n = binary.PutUvarint(buf, uint64(w.symbols[l.Value])) - b = append(b, buf[:n]...) + w.b = append(w.b, buf[:n]...) } // Write chunks meta data including reference into chunk file. n = binary.PutUvarint(buf, uint64(len(s.chunks))) - b = append(b, buf[:n]...) + w.b = append(w.b, buf[:n]...) for _, c := range s.chunks { n = binary.PutVarint(buf, c.MinTime) - b = append(b, buf[:n]...) + w.b = append(w.b, buf[:n]...) n = binary.PutVarint(buf, c.MaxTime) - b = append(b, buf[:n]...) + w.b = append(w.b, buf[:n]...) n = binary.PutUvarint(buf, uint64(c.Ref)) - b = append(b, buf[:n]...) + w.b = append(w.b, buf[:n]...) } } - l := uint32(len(b)) - - return w.section(l, flagStd, func(wr io.Writer) error { - return w.write(wr, b) + return w.section(len(w.b), flagStd, func(wr io.Writer) error { + return w.write(wr, w.b) }) } @@ -467,7 +479,7 @@ func (w *indexWriter) WriteLabelIndex(names []string, values []string) error { buf := make([]byte, binary.MaxVarintLen32) n := binary.PutUvarint(buf, uint64(len(names))) - l := uint32(n) + uint32(len(values)*4) + l := n + len(values)*4 return w.section(l, flagStd, func(wr io.Writer) error { // First byte indicates tuple size for index. @@ -500,13 +512,10 @@ func (w *indexWriter) WritePostings(name, value string, it Postings) error { offset: uint32(w.n), }) - b := make([]byte, 0, 4096) - buf := [4]byte{} - // Order of the references in the postings list does not imply order // of the series references within the persisted block they are mapped to. // We have to sort the new references again. - var refs []uint32 + refs := w.uint32s[:0] for it.Next() { s, ok := w.series[it.At()] @@ -519,38 +528,49 @@ func (w *indexWriter) WritePostings(name, value string, it Postings) error { return err } - slice.Sort(refs, func(i, j int) bool { return refs[i] < refs[j] }) + sort.Sort(uint32slice(refs)) + + w.b = w.b[:0] + buf := make([]byte, 4) for _, r := range refs { - binary.BigEndian.PutUint32(buf[:], r) - b = append(b, buf[:]...) + binary.BigEndian.PutUint32(buf, r) + w.b = append(w.b, buf...) } - return w.section(uint32(len(b)), flagStd, func(wr io.Writer) error { - return w.write(wr, b) + w.uint32s = refs[:0] + + return w.section(len(w.b), flagStd, func(wr io.Writer) error { + return w.write(wr, w.b) }) } +type uint32slice []uint32 + +func (s uint32slice) Len() int { return len(s) } +func (s uint32slice) Swap(i, j int) { s[i], s[j] = s[j], s[i] } +func (s uint32slice) Less(i, j int) bool { return s[i] < s[j] } + type hashEntry struct { name string offset uint32 } func (w *indexWriter) writeHashmap(h []hashEntry) error { - b := make([]byte, 0, 4096) + w.b = w.b[:0] buf := [binary.MaxVarintLen32]byte{} for _, e := range h { n := binary.PutUvarint(buf[:], uint64(len(e.name))) - b = append(b, buf[:n]...) - b = append(b, e.name...) + w.b = append(w.b, buf[:n]...) + w.b = append(w.b, e.name...) n = binary.PutUvarint(buf[:], uint64(e.offset)) - b = append(b, buf[:n]...) + w.b = append(w.b, buf[:n]...) } - return w.section(uint32(len(b)), flagStd, func(wr io.Writer) error { - return w.write(wr, b) + return w.section(len(w.b), flagStd, func(wr io.Writer) error { + return w.write(wr, w.b) }) } diff --git a/vendor/vendor.json b/vendor/vendor.json index 54bfb885f..e183acfcf 100644 --- a/vendor/vendor.json +++ b/vendor/vendor.json @@ -368,10 +368,10 @@ "revisionTime": "2016-09-30T00:14:02Z" }, { - "checksumSHA1": "E5C5z6CV6JeIA2cpT3KVWeFgZdM=", + "checksumSHA1": "IOnF9CNVjOBoVwdfzfUEv/+JotI=", "path": "github.com/fabxc/tsdb", - "revision": "2c3b56350a6d75a15484494c5a87145828cb34ef", - "revisionTime": "2017-03-01T16:19:57Z" + "revision": "55a9b5428aceb644b3b297d7a9fd63d0354ce953", + "revisionTime": "2017-03-04T15:50:48Z" }, { "checksumSHA1": "uVzWuLvF646YjiKomsc2CR1ua58=",