diff --git a/block.go b/block.go index f18cadd609..7d7cbe86dd 100644 --- a/block.go +++ b/block.go @@ -17,6 +17,7 @@ type Block interface { Index() IndexReader Series() SeriesReader Persisted() bool + Close() error } // BlockStats provides stats on a data block. diff --git a/compact.go b/compact.go index bacfca2aae..a407622454 100644 --- a/compact.go +++ b/compact.go @@ -13,7 +13,7 @@ import ( type compactor struct { metrics *compactorMetrics - blocks compactableBlocks + opts *compactorOptions } type compactorMetrics struct { @@ -48,49 +48,42 @@ func newCompactorMetrics(r prometheus.Registerer) *compactorMetrics { return m } -type blockStore interface { - blocks() []Block +type compactorOptions struct { + maxBlocks uint8 + maxBlockRange uint64 + maxSize uint64 } -type compactableBlocks interface { - compactable() []Block -} - -func newCompactor(blocks compactableBlocks, r prometheus.Registerer) (*compactor, error) { - c := &compactor{ - blocks: blocks, +func newCompactor(r prometheus.Registerer, opts *compactorOptions) *compactor { + return &compactor{ + opts: opts, metrics: newCompactorMetrics(r), } - - return c, nil } -const ( - compactionMaxSize = 1 << 30 // 1GB - compactionBlocks = 2 -) - -func (c *compactor) pick() []Block { - bs := c.blocks.compactable() +// pick returns a range [i, j] in the blocks that are suitable to be compacted +// into a single block at position i. +func (c *compactor) pick(bs []Block) (i, j int, ok bool) { + last := len(bs) - 1 if len(bs) == 0 { - return nil + return 0, 0, false } - if len(bs) == 1 && !bs[0].Persisted() { - return bs - } - if !bs[0].Persisted() { - if len(bs) == 2 || !compactionMatch(bs[:3]) { - return bs[:1] + + // Make sure we always compact the last block if unpersisted. + if !bs[last].Persisted() { + if len(bs) >= 3 && compactionMatch(bs[last-2:last+1]) { + return last - 2, last, true } + return last, last, true } for i := 0; i+2 < len(bs); i += 3 { tpl := bs[i : i+3] if compactionMatch(tpl) { - return tpl + return i, i + 2, true } } - return nil + return 0, 0, false } func compactionMatch(blocks []Block) bool { @@ -106,7 +99,7 @@ func compactionMatch(blocks []Block) bool { for _, b := range blocks[1:] { m := float64(b.Stats().SampleCount) - if m < 0.8*n || m > 1.2*n { + if m < 0.7*n || m > 1.3*n { return false } t += m @@ -184,6 +177,7 @@ func (c *compactor) compact(dir string, blocks ...Block) (err error) { func (c *compactor) write(blocks []Block, indexw IndexWriter, chunkw SeriesWriter) error { var set compactionSet + for i, b := range blocks { all, err := b.Index().Postings("", "") if err != nil { diff --git a/db.go b/db.go index 8290dcb478..977dc59065 100644 --- a/db.go +++ b/db.go @@ -27,15 +27,14 @@ import ( // DefaultOptions used for the DB. They are sane for setups using // millisecond precision timestampdb. var DefaultOptions = &Options{ - Retention: 15 * 24 * 3600 * 1000, // 15 days - DisableWAL: false, + WALFlushInterval: 5 * time.Second, + MaxBlockRange: 24 * 60 * 60 * 1000, // 1 day in milliseconds } // Options of the DB storage. type Options struct { - Retention int64 - DisableWAL bool WALFlushInterval time.Duration + MaxBlockRange uint64 } // Appender allows appending a batch of data. It must be completed with a @@ -66,6 +65,7 @@ type DB struct { dir string logger log.Logger metrics *dbMetrics + opts *Options mtx sync.RWMutex persisted []*persistedBlock @@ -107,8 +107,7 @@ func newDBMetrics(r prometheus.Registerer) *dbMetrics { } // Open returns a new DB in the given directory. -func Open(dir string, logger log.Logger) (db *DB, err error) { - // Create directory if partition is new. +func Open(dir string, logger log.Logger, opts *Options) (db *DB, err error) { if !fileutil.Exist(dir) { if err := os.MkdirAll(dir, 0777); err != nil { return nil, err @@ -117,22 +116,29 @@ func Open(dir string, logger log.Logger) (db *DB, err error) { var r prometheus.Registerer // r := prometheus.DefaultRegisterer + if opts == nil { + opts = DefaultOptions + } + db = &DB{ dir: dir, logger: logger, metrics: newDBMetrics(r), + opts: opts, compactc: make(chan struct{}, 1), cutc: make(chan struct{}, 1), donec: make(chan struct{}), stopc: make(chan struct{}), } + db.compactor = newCompactor(r, &compactorOptions{ + maxBlockRange: opts.MaxBlockRange, + maxBlocks: 3, + maxSize: 1 << 29, // 512MB + }) if err := db.initBlocks(); err != nil { return nil, err } - if db.compactor, err = newCompactor(db, r); err != nil { - return nil, err - } go db.run() @@ -166,26 +172,18 @@ func (db *DB) run() { case <-db.compactc: db.metrics.compactionsTriggered.Inc() - for { - blocks := db.compactor.pick() - if len(blocks) == 0 { - break - } - // TODO(fabxc): pick emits blocks in order. compact acts on - // inverted order. Put inversion into compactor? - var bs []Block - for _, b := range blocks { - bs = append([]Block{b}, bs...) - } - - select { - case <-db.stopc: - return - default: - } - if err := db.compact(bs); err != nil { - db.logger.Log("msg", "compaction failed", "err", err) - } + i, j, ok := db.compactor.pick(db.compactable()) + if !ok { + continue + } + if err := db.compact(i, j); err != nil { + db.logger.Log("msg", "compaction failed", "err", err) + continue + } + // Trigger another compaction in case there's more work to do. + select { + case db.compactc <- struct{}{}: + default: } case <-db.stopc: @@ -194,41 +192,80 @@ func (db *DB) run() { } } -func (db *DB) compact(blocks []Block) error { - if len(blocks) == 0 { - return nil +func (db *DB) getBlock(i int) Block { + if i < len(db.persisted) { + return db.persisted[i] } - tmpdir := blocks[0].Dir() + ".tmp" + return db.heads[i-len(db.persisted)] +} - // TODO(fabxc): find a better place to do this transparently. - for _, b := range blocks { - if h, ok := b.(*headBlock); ok { - h.updateMapping() +// removeBlocks removes the blocks in range [i, j] from the list of persisted +// and head blocks. The blocks are not closed and their files not deleted. +func (db *DB) removeBlocks(i, j int) { + for k := i; k <= j; k++ { + if i < len(db.persisted) { + db.persisted = append(db.persisted[:i], db.persisted[i+1:]...) + } else { + l := i - len(db.persisted) + db.heads = append(db.heads[:l], db.heads[l+1:]...) } } +} + +func (db *DB) blocks() (bs []Block) { + for _, b := range db.persisted { + bs = append(bs, b) + } + for _, b := range db.heads { + bs = append(bs, b) + } + return bs +} + +func (db *DB) compact(i, j int) error { + if j < i { + return errors.New("invalid compaction block range") + } + var blocks []Block + for k := i; k <= j; k++ { + blocks = append(blocks, db.getBlock(k)) + } + var ( + dir = blocks[0].Dir() + tmpdir = dir + ".tmp" + ) if err := db.compactor.compact(tmpdir, blocks...); err != nil { return err } + pb, err := newPersistedBlock(tmpdir) + if err != nil { + return err + } + db.mtx.Lock() defer db.mtx.Unlock() - if err := renameDir(tmpdir, blocks[0].Dir()); err != nil { + if err := renameDir(tmpdir, dir); err != nil { return errors.Wrap(err, "rename dir") } - for _, b := range blocks[1:] { - if err := os.RemoveAll(b.Dir()); err != nil { - return errors.Wrap(err, "delete dir") + pb.dir = dir + + db.removeBlocks(i, j) + db.persisted = append(db.persisted, pb) + + for i, b := range blocks { + if err := b.Close(); err != nil { + return errors.Wrap(err, "close old block") + } + if i > 0 { + if err := os.RemoveAll(b.Dir()); err != nil { + return errors.Wrap(err, "removing old block") + } } } - - var merr MultiError - - for _, b := range blocks { - merr.Add(errors.Wrapf(db.reinit(b.Dir()), "reinit block at %q", b.Dir())) - } - return merr.Err() + return nil } func isBlockDir(fi os.FileInfo) bool { @@ -244,23 +281,33 @@ func isBlockDir(fi os.FileInfo) bool { return true } +func blockDirs(dir string) ([]string, error) { + files, err := ioutil.ReadDir(dir) + if err != nil { + return nil, err + } + var dirs []string + + for _, fi := range files { + if isBlockDir(fi) { + dirs = append(dirs, filepath.Join(dir, fi.Name())) + } + } + return dirs, nil +} + func (db *DB) initBlocks() error { var ( - pbs []*persistedBlock - heads []*headBlock + persisted []*persistedBlock + heads []*headBlock ) - files, err := ioutil.ReadDir(db.dir) + dirs, err := blockDirs(db.dir) if err != nil { return err } - for _, fi := range files { - if !isBlockDir(fi) { - continue - } - dir := filepath.Join(db.dir, fi.Name()) - + for _, dir := range dirs { if fileutil.Exist(filepath.Join(dir, walFileName)) { h, err := openHeadBlock(dir, db.logger) if err != nil { @@ -269,31 +316,14 @@ func (db *DB) initBlocks() error { heads = append(heads, h) continue } - b, err := newPersistedBlock(dir) if err != nil { return err } - pbs = append(pbs, b) + persisted = append(persisted, b) } - // Validate that blocks are sequential in time. - lastTime := int64(math.MinInt64) - - for _, b := range pbs { - if b.Stats().MinTime < lastTime { - return errors.Errorf("illegal order for block at %q", b.Dir()) - } - lastTime = b.Stats().MaxTime - } - for _, b := range heads { - if b.Stats().MinTime < lastTime { - return errors.Errorf("illegal order for block at %q", b.Dir()) - } - lastTime = b.Stats().MaxTime - } - - db.persisted = pbs + db.persisted = persisted db.heads = heads if len(heads) == 0 { @@ -322,6 +352,7 @@ func (db *DB) Close() error { return merr.Err() } +// Appender returns a new Appender on the database. func (db *DB) Appender() Appender { db.mtx.RLock() @@ -403,68 +434,16 @@ func (db *DB) persistedForDir(dir string) (int, bool) { return -1, false } -func (db *DB) reinit(dir string) error { - if !fileutil.Exist(dir) { - if i, ok := db.headForDir(dir); ok { - if err := db.heads[i].Close(); err != nil { - return err - } - db.heads = append(db.heads[:i], db.heads[i+1:]...) - } - if i, ok := db.persistedForDir(dir); ok { - if err := db.persisted[i].Close(); err != nil { - return err - } - db.persisted = append(db.persisted[:i], db.persisted[i+1:]...) - } - return nil - } - - // Remove a previous head block. - if i, ok := db.headForDir(dir); ok { - if err := db.heads[i].Close(); err != nil { - return err - } - db.heads = append(db.heads[:i], db.heads[i+1:]...) - } - // Close an old persisted block. - i, ok := db.persistedForDir(dir) - if ok { - if err := db.persisted[i].Close(); err != nil { - return err - } - } - pb, err := newPersistedBlock(dir) - if err != nil { - return errors.Wrap(err, "open persisted block") - } - if i >= 0 { - db.persisted[i] = pb - } else { - db.persisted = append(db.persisted, pb) - } - - return nil -} - func (db *DB) compactable() []Block { db.mtx.RLock() defer db.mtx.RUnlock() var blocks []Block for _, pb := range db.persisted { - blocks = append([]Block{pb}, blocks...) + blocks = append(blocks, pb) } - - // threshold := db.heads[len(db.heads)-1].bstatdb.MaxTime - headGracePeriod - - // for _, hb := range db.heads { - // if hb.bstatdb.MaxTime < threshold { - // blocks = append(blocks, hb) - // } - // } for _, hb := range db.heads[:len(db.heads)-1] { - blocks = append([]Block{hb}, blocks...) + blocks = append(blocks, hb) } return blocks @@ -505,9 +484,6 @@ func (db *DB) blocksForInterval(mint, maxt int64) []Block { return bs } -// TODO(fabxc): make configurable. -const headGracePeriod = 60 * 1000 // 60 seconds for millisecond scale - // cut starts a new head block to append to. The completed head block // will still be appendable for the configured grace period. func (db *DB) cut() error { @@ -548,7 +524,6 @@ func (db *DB) nextBlockDir() (string, error) { // PartitionedDB is a time series storage. type PartitionedDB struct { logger log.Logger - opts *Options dir string partitionPow uint @@ -577,7 +552,6 @@ func OpenPartitioned(dir string, n int, l log.Logger, opts *Options) (*Partition } c := &PartitionedDB{ logger: l, - opts: opts, dir: dir, partitionPow: uint(math.Log2(float64(n))), } @@ -589,7 +563,7 @@ func OpenPartitioned(dir string, n int, l log.Logger, opts *Options) (*Partition l := log.NewContext(l).With("partition", i) d := partitionDir(dir, i) - s, err := Open(d, l) + s, err := Open(d, l, opts) if err != nil { return nil, fmt.Errorf("initializing partition %q failed: %s", d, err) }