diff --git a/db.go b/db.go index 9877205fa..01efc1e52 100644 --- a/db.go +++ b/db.go @@ -12,6 +12,7 @@ import ( "strconv" "strings" "sync" + "time" "unsafe" "golang.org/x/sync/errgroup" @@ -33,8 +34,9 @@ var DefaultOptions = &Options{ // Options of the DB storage. type Options struct { - Retention int64 - DisableWAL bool + Retention int64 + DisableWAL bool + WALFlushInterval time.Duration } // Appender allows committing batches of samples to a database. @@ -76,6 +78,7 @@ type DB struct { compactor *compactor compactc chan struct{} + cutc chan struct{} donec chan struct{} stopc chan struct{} } @@ -132,6 +135,7 @@ func Open(dir string, logger log.Logger) (db *DB, err error) { logger: logger, metrics: newDBMetrics(nil), compactc: make(chan struct{}, 1), + cutc: make(chan struct{}, 1), donec: make(chan struct{}), stopc: make(chan struct{}), } @@ -153,6 +157,25 @@ func (db *DB) run() { for { select { + case <-db.cutc: + db.mtx.Lock() + err := db.cut() + db.mtx.Unlock() + + if err != nil { + db.logger.Log("msg", "cut failed", "err", err) + } else { + select { + case db.compactc <- struct{}{}: + default: + } + } + // Drain cut channel so we don't trigger immediately again. + select { + case <-db.cutc: + default: + } + case <-db.compactc: db.metrics.compactionsTriggered.Inc() @@ -177,6 +200,7 @@ func (db *DB) run() { db.logger.Log("msg", "compaction failed", "err", err) } } + case <-db.stopc: return } @@ -244,7 +268,7 @@ func (db *DB) initBlocks() error { dir := filepath.Join(db.dir, fi.Name()) if fileutil.Exist(filepath.Join(dir, walFileName)) { - h, err := OpenHeadBlock(dir) + h, err := OpenHeadBlock(dir, db.logger) if err != nil { return err } @@ -323,13 +347,9 @@ func (db *DB) appendBatch(samples []hashedSample) error { // TODO(fabxc): randomize over time and use better scoring function. if head.bstats.SampleCount/(uint64(head.bstats.ChunkCount)+1) > 250 { - if err := db.cut(); err != nil { - db.logger.Log("msg", "cut failed", "err", err) - } else { - select { - case db.compactc <- struct{}{}: - default: - } + select { + case db.cutc <- struct{}{}: + default: } } @@ -460,22 +480,22 @@ const headGracePeriod = 60 * 1000 // 60 seconds for millisecond scale // cut starts a new head block to append to. The completed head block // will still be appendable for the configured grace period. -func (p *DB) cut() error { - dir, err := p.nextBlockDir() +func (db *DB) cut() error { + dir, err := db.nextBlockDir() if err != nil { return err } - newHead, err := OpenHeadBlock(dir) + newHead, err := OpenHeadBlock(dir, db.logger) if err != nil { return err } - p.heads = append(p.heads, newHead) + db.heads = append(db.heads, newHead) return nil } -func (p *DB) nextBlockDir() (string, error) { - names, err := fileutil.ReadDir(p.dir) +func (db *DB) nextBlockDir() (string, error) { + names, err := fileutil.ReadDir(db.dir) if err != nil { return "", err } @@ -491,7 +511,7 @@ func (p *DB) nextBlockDir() (string, error) { } i = j } - return filepath.Join(p.dir, fmt.Sprintf("b-%0.6d", i+1)), nil + return filepath.Join(db.dir, fmt.Sprintf("b-%0.6d", i+1)), nil } // chunkDesc wraps a plain data chunk and provides cached meta data about it. diff --git a/head.go b/head.go index b09c55872..ffe8541eb 100644 --- a/head.go +++ b/head.go @@ -5,10 +5,12 @@ import ( "math" "sort" "sync" + "time" "github.com/bradfitz/slice" "github.com/fabxc/tsdb/chunks" "github.com/fabxc/tsdb/labels" + "github.com/go-kit/kit/log" ) // HeadBlock handles reads and writes of time series data within a time window. @@ -35,8 +37,8 @@ type HeadBlock struct { } // OpenHeadBlock creates a new empty head block. -func OpenHeadBlock(dir string) (*HeadBlock, error) { - wal, err := OpenWAL(dir) +func OpenHeadBlock(dir string, l log.Logger) (*HeadBlock, error) { + wal, err := OpenWAL(dir, log.NewContext(l).With("component", "wal"), 15*time.Second) if err != nil { return nil, err } diff --git a/wal.go b/wal.go index 796fbad1c..0132f05e5 100644 --- a/wal.go +++ b/wal.go @@ -7,10 +7,12 @@ import ( "math" "os" "path/filepath" + "time" "github.com/coreos/etcd/pkg/fileutil" "github.com/coreos/etcd/pkg/ioutil" "github.com/fabxc/tsdb/labels" + "github.com/go-kit/kit/log" "github.com/pkg/errors" ) @@ -27,8 +29,13 @@ const ( // WAL is a write ahead log for series data. It can only be written to. // Use WALReader to read back from a write ahead log. type WAL struct { - f *fileutil.LockedFile - enc *walEncoder + f *fileutil.LockedFile + enc *walEncoder + logger log.Logger + flushInterval time.Duration + + stopc chan struct{} + donec chan struct{} symbols map[string]uint32 } @@ -37,7 +44,7 @@ const walFileName = "wal-000" // OpenWAL opens or creates a write ahead log in the given directory. // The WAL must be read completely before new data is written. -func OpenWAL(dir string) (*WAL, error) { +func OpenWAL(dir string, l log.Logger, flushInterval time.Duration) (*WAL, error) { if err := os.MkdirAll(dir, 0777); err != nil { return nil, err } @@ -64,10 +71,16 @@ func OpenWAL(dir string) (*WAL, error) { } w := &WAL{ - f: f, - enc: enc, - symbols: map[string]uint32{}, + f: f, + logger: l, + enc: enc, + flushInterval: flushInterval, + symbols: map[string]uint32{}, + donec: make(chan struct{}), + stopc: make(chan struct{}), } + go w.run(flushInterval) + return w, nil } @@ -101,6 +114,9 @@ func (w *WAL) Log(series []labels.Labels, samples []hashedSample) error { if err := w.enc.encodeSamples(samples); err != nil { return err } + if w.flushInterval <= 0 { + return w.sync() + } return nil } @@ -111,8 +127,33 @@ func (w *WAL) sync() error { return fileutil.Fdatasync(w.f.File) } +func (w *WAL) run(interval time.Duration) { + var tick <-chan time.Time + + if interval > 0 { + ticker := time.NewTicker(interval) + defer ticker.Stop() + tick = ticker.C + } + defer close(w.donec) + + for { + select { + case <-w.stopc: + return + case <-tick: + if err := w.sync(); err != nil { + w.logger.Log("msg", "sync failed", "err", err) + } + } + } +} + // Close sync all data and closes the underlying resources. func (w *WAL) Close() error { + close(w.stopc) + <-w.donec + if err := w.sync(); err != nil { return err }