Periodically fsync WAL, make head cut async

This commit is contained in:
Fabian Reinartz 2017-01-06 15:18:06 +01:00
parent c61b310210
commit 71efd2e08d
3 changed files with 88 additions and 25 deletions

54
db.go
View file

@ -12,6 +12,7 @@ import (
"strconv" "strconv"
"strings" "strings"
"sync" "sync"
"time"
"unsafe" "unsafe"
"golang.org/x/sync/errgroup" "golang.org/x/sync/errgroup"
@ -33,8 +34,9 @@ var DefaultOptions = &Options{
// Options of the DB storage. // Options of the DB storage.
type Options struct { type Options struct {
Retention int64 Retention int64
DisableWAL bool DisableWAL bool
WALFlushInterval time.Duration
} }
// Appender allows committing batches of samples to a database. // Appender allows committing batches of samples to a database.
@ -76,6 +78,7 @@ type DB struct {
compactor *compactor compactor *compactor
compactc chan struct{} compactc chan struct{}
cutc chan struct{}
donec chan struct{} donec chan struct{}
stopc chan struct{} stopc chan struct{}
} }
@ -132,6 +135,7 @@ func Open(dir string, logger log.Logger) (db *DB, err error) {
logger: logger, logger: logger,
metrics: newDBMetrics(nil), metrics: newDBMetrics(nil),
compactc: make(chan struct{}, 1), compactc: make(chan struct{}, 1),
cutc: make(chan struct{}, 1),
donec: make(chan struct{}), donec: make(chan struct{}),
stopc: make(chan struct{}), stopc: make(chan struct{}),
} }
@ -153,6 +157,25 @@ func (db *DB) run() {
for { for {
select { select {
case <-db.cutc:
db.mtx.Lock()
err := db.cut()
db.mtx.Unlock()
if err != nil {
db.logger.Log("msg", "cut failed", "err", err)
} else {
select {
case db.compactc <- struct{}{}:
default:
}
}
// Drain cut channel so we don't trigger immediately again.
select {
case <-db.cutc:
default:
}
case <-db.compactc: case <-db.compactc:
db.metrics.compactionsTriggered.Inc() db.metrics.compactionsTriggered.Inc()
@ -177,6 +200,7 @@ func (db *DB) run() {
db.logger.Log("msg", "compaction failed", "err", err) db.logger.Log("msg", "compaction failed", "err", err)
} }
} }
case <-db.stopc: case <-db.stopc:
return return
} }
@ -244,7 +268,7 @@ func (db *DB) initBlocks() error {
dir := filepath.Join(db.dir, fi.Name()) dir := filepath.Join(db.dir, fi.Name())
if fileutil.Exist(filepath.Join(dir, walFileName)) { if fileutil.Exist(filepath.Join(dir, walFileName)) {
h, err := OpenHeadBlock(dir) h, err := OpenHeadBlock(dir, db.logger)
if err != nil { if err != nil {
return err return err
} }
@ -323,13 +347,9 @@ func (db *DB) appendBatch(samples []hashedSample) error {
// TODO(fabxc): randomize over time and use better scoring function. // TODO(fabxc): randomize over time and use better scoring function.
if head.bstats.SampleCount/(uint64(head.bstats.ChunkCount)+1) > 250 { if head.bstats.SampleCount/(uint64(head.bstats.ChunkCount)+1) > 250 {
if err := db.cut(); err != nil { select {
db.logger.Log("msg", "cut failed", "err", err) case db.cutc <- struct{}{}:
} else { default:
select {
case db.compactc <- struct{}{}:
default:
}
} }
} }
@ -460,22 +480,22 @@ const headGracePeriod = 60 * 1000 // 60 seconds for millisecond scale
// cut starts a new head block to append to. The completed head block // cut starts a new head block to append to. The completed head block
// will still be appendable for the configured grace period. // will still be appendable for the configured grace period.
func (p *DB) cut() error { func (db *DB) cut() error {
dir, err := p.nextBlockDir() dir, err := db.nextBlockDir()
if err != nil { if err != nil {
return err return err
} }
newHead, err := OpenHeadBlock(dir) newHead, err := OpenHeadBlock(dir, db.logger)
if err != nil { if err != nil {
return err return err
} }
p.heads = append(p.heads, newHead) db.heads = append(db.heads, newHead)
return nil return nil
} }
func (p *DB) nextBlockDir() (string, error) { func (db *DB) nextBlockDir() (string, error) {
names, err := fileutil.ReadDir(p.dir) names, err := fileutil.ReadDir(db.dir)
if err != nil { if err != nil {
return "", err return "", err
} }
@ -491,7 +511,7 @@ func (p *DB) nextBlockDir() (string, error) {
} }
i = j i = j
} }
return filepath.Join(p.dir, fmt.Sprintf("b-%0.6d", i+1)), nil return filepath.Join(db.dir, fmt.Sprintf("b-%0.6d", i+1)), nil
} }
// chunkDesc wraps a plain data chunk and provides cached meta data about it. // chunkDesc wraps a plain data chunk and provides cached meta data about it.

View file

@ -5,10 +5,12 @@ import (
"math" "math"
"sort" "sort"
"sync" "sync"
"time"
"github.com/bradfitz/slice" "github.com/bradfitz/slice"
"github.com/fabxc/tsdb/chunks" "github.com/fabxc/tsdb/chunks"
"github.com/fabxc/tsdb/labels" "github.com/fabxc/tsdb/labels"
"github.com/go-kit/kit/log"
) )
// HeadBlock handles reads and writes of time series data within a time window. // HeadBlock handles reads and writes of time series data within a time window.
@ -35,8 +37,8 @@ type HeadBlock struct {
} }
// OpenHeadBlock creates a new empty head block. // OpenHeadBlock creates a new empty head block.
func OpenHeadBlock(dir string) (*HeadBlock, error) { func OpenHeadBlock(dir string, l log.Logger) (*HeadBlock, error) {
wal, err := OpenWAL(dir) wal, err := OpenWAL(dir, log.NewContext(l).With("component", "wal"), 15*time.Second)
if err != nil { if err != nil {
return nil, err return nil, err
} }

53
wal.go
View file

@ -7,10 +7,12 @@ import (
"math" "math"
"os" "os"
"path/filepath" "path/filepath"
"time"
"github.com/coreos/etcd/pkg/fileutil" "github.com/coreos/etcd/pkg/fileutil"
"github.com/coreos/etcd/pkg/ioutil" "github.com/coreos/etcd/pkg/ioutil"
"github.com/fabxc/tsdb/labels" "github.com/fabxc/tsdb/labels"
"github.com/go-kit/kit/log"
"github.com/pkg/errors" "github.com/pkg/errors"
) )
@ -27,8 +29,13 @@ const (
// WAL is a write ahead log for series data. It can only be written to. // WAL is a write ahead log for series data. It can only be written to.
// Use WALReader to read back from a write ahead log. // Use WALReader to read back from a write ahead log.
type WAL struct { type WAL struct {
f *fileutil.LockedFile f *fileutil.LockedFile
enc *walEncoder enc *walEncoder
logger log.Logger
flushInterval time.Duration
stopc chan struct{}
donec chan struct{}
symbols map[string]uint32 symbols map[string]uint32
} }
@ -37,7 +44,7 @@ const walFileName = "wal-000"
// OpenWAL opens or creates a write ahead log in the given directory. // OpenWAL opens or creates a write ahead log in the given directory.
// The WAL must be read completely before new data is written. // The WAL must be read completely before new data is written.
func OpenWAL(dir string) (*WAL, error) { func OpenWAL(dir string, l log.Logger, flushInterval time.Duration) (*WAL, error) {
if err := os.MkdirAll(dir, 0777); err != nil { if err := os.MkdirAll(dir, 0777); err != nil {
return nil, err return nil, err
} }
@ -64,10 +71,16 @@ func OpenWAL(dir string) (*WAL, error) {
} }
w := &WAL{ w := &WAL{
f: f, f: f,
enc: enc, logger: l,
symbols: map[string]uint32{}, enc: enc,
flushInterval: flushInterval,
symbols: map[string]uint32{},
donec: make(chan struct{}),
stopc: make(chan struct{}),
} }
go w.run(flushInterval)
return w, nil return w, nil
} }
@ -101,6 +114,9 @@ func (w *WAL) Log(series []labels.Labels, samples []hashedSample) error {
if err := w.enc.encodeSamples(samples); err != nil { if err := w.enc.encodeSamples(samples); err != nil {
return err return err
} }
if w.flushInterval <= 0 {
return w.sync()
}
return nil return nil
} }
@ -111,8 +127,33 @@ func (w *WAL) sync() error {
return fileutil.Fdatasync(w.f.File) return fileutil.Fdatasync(w.f.File)
} }
func (w *WAL) run(interval time.Duration) {
var tick <-chan time.Time
if interval > 0 {
ticker := time.NewTicker(interval)
defer ticker.Stop()
tick = ticker.C
}
defer close(w.donec)
for {
select {
case <-w.stopc:
return
case <-tick:
if err := w.sync(); err != nil {
w.logger.Log("msg", "sync failed", "err", err)
}
}
}
}
// Close sync all data and closes the underlying resources. // Close sync all data and closes the underlying resources.
func (w *WAL) Close() error { func (w *WAL) Close() error {
close(w.stopc)
<-w.donec
if err := w.sync(); err != nil { if err := w.sync(); err != nil {
return err return err
} }