Break out WAL into segment files

This commit is contained in:
Fabian Reinartz 2017-02-13 23:53:19 -08:00
parent 987a90d149
commit 79944a5912
3 changed files with 206 additions and 96 deletions

View file

@ -91,10 +91,10 @@ func (b *writeBenchmark) run(cmd *cobra.Command, args []string) {
dir := filepath.Join(b.outPath, "storage")
st, err := tsdb.OpenPartitioned(dir, 1, nil, &tsdb.Options{
WALFlushInterval: 5 * time.Second,
WALFlushInterval: 200 * time.Millisecond,
RetentionDuration: 1 * 24 * 60 * 60 * 1000, // 1 days in milliseconds
MinBlockDuration: 2 * 60 * 60 * 1000, // 2 hours in milliseconds
MaxBlockDuration: 16 * 60 * 60 * 1000, // 1 days in milliseconds
MinBlockDuration: 3 * 60 * 60 * 1000, // 2 hours in milliseconds
MaxBlockDuration: 24 * 60 * 60 * 1000, // 1 days in milliseconds
AppendableBlocks: 2,
})
if err != nil {
@ -128,7 +128,7 @@ func (b *writeBenchmark) run(cmd *cobra.Command, args []string) {
dur := measureTime("ingestScrapes", func() {
b.startProfiling()
total, err = b.ingestScrapes(metrics, 4000)
total, err = b.ingestScrapes(metrics, 3000)
if err != nil {
exitWithError(err)
}

31
db.go
View file

@ -334,7 +334,7 @@ func (db *DB) initBlocks() error {
}
for _, dir := range dirs {
if fileutil.Exist(filepath.Join(dir, walFileName)) {
if fileutil.Exist(filepath.Join(dir, walDirName)) {
h, err := openHeadBlock(dir, db.logger)
if err != nil {
return err
@ -601,7 +601,7 @@ func (db *DB) blocksForInterval(mint, maxt int64) []Block {
func (db *DB) cut(mint int64) (*headBlock, error) {
maxt := mint + int64(db.opts.MinBlockDuration)
dir, seq, err := nextSequenceDir(db.dir, "b-")
dir, seq, err := nextSequenceFile(db.dir, "b-")
if err != nil {
return nil, err
}
@ -651,7 +651,32 @@ func blockDirs(dir string) ([]string, error) {
return dirs, nil
}
func nextSequenceDir(dir, prefix string) (string, int, error) {
// sequenceFiles lists the entries directly inside dir that isSequenceFile
// accepts for the given prefix. Each result is the entry name joined onto
// dir, in the order ioutil.ReadDir reports the entries.
// An error from reading the directory is returned unchanged.
func sequenceFiles(dir, prefix string) ([]string, error) {
	entries, err := ioutil.ReadDir(dir)
	if err != nil {
		return nil, err
	}

	var paths []string
	for i := range entries {
		if !isSequenceFile(entries[i], prefix) {
			continue
		}
		paths = append(paths, filepath.Join(dir, entries[i].Name()))
	}
	return paths, nil
}
// isSequenceFile reports whether the name of fi consists of prefix followed
// by nothing but an unsigned base-10 number that fits into 32 bits.
func isSequenceFile(fi os.FileInfo, prefix string) bool {
	name := fi.Name()
	if !strings.HasPrefix(name, prefix) {
		return false
	}
	// Everything after the prefix has to parse as the sequence number.
	_, err := strconv.ParseUint(strings.TrimPrefix(name, prefix), 10, 32)
	return err == nil
}
func nextSequenceFile(dir, prefix string) (string, int, error) {
names, err := fileutil.ReadDir(dir)
if err != nil {
return "", 0, err

229
wal.go
View file

@ -3,6 +3,7 @@ package tsdb
import (
"bufio"
"encoding/binary"
"fmt"
"hash/crc32"
"io"
"math"
@ -32,56 +33,54 @@ const (
type WAL struct {
mtx sync.Mutex
f *fileutil.LockedFile
enc *walEncoder
dirFile *os.File
files []*fileutil.LockedFile
logger log.Logger
flushInterval time.Duration
cur *bufio.Writer
curN int
stopc chan struct{}
donec chan struct{}
symbols map[string]uint32
}
const walFileName = "wal-000"
const (
walDirName = "wal"
walSegmentSizeBytes = 64 * 1000 * 1000 // 64 MB
)
// OpenWAL opens or creates a write ahead log in the given directory.
// The WAL must be read completely before new data is written.
func OpenWAL(dir string, l log.Logger, flushInterval time.Duration) (*WAL, error) {
dir = filepath.Join(dir, walDirName)
if err := os.MkdirAll(dir, 0777); err != nil {
return nil, err
}
p := filepath.Join(dir, walFileName)
f, err := fileutil.TryLockFile(p, os.O_RDWR, 0666)
if err != nil {
if !os.IsNotExist(err) {
return nil, err
}
f, err = fileutil.LockFile(p, os.O_RDWR|os.O_CREATE, 0666)
if err != nil {
return nil, err
}
if _, err = f.Seek(0, os.SEEK_END); err != nil {
return nil, err
}
}
enc, err := newWALEncoder(f.File)
df, err := fileutil.OpenDir(dir)
if err != nil {
return nil, err
}
w := &WAL{
f: f,
dirFile: df,
logger: l,
enc: enc,
flushInterval: flushInterval,
symbols: map[string]uint32{},
donec: make(chan struct{}),
stopc: make(chan struct{}),
}
if err := w.initSegments(); err != nil {
return nil, err
}
// If there are no existing segments yet, create the initial one.
if len(w.files) == 0 {
if err := w.cut(); err != nil {
return nil, err
}
}
go w.run(flushInterval)
return w, nil
@ -94,40 +93,129 @@ type walHandler struct {
// ReadAll consumes all entries in the WAL and triggers the registered handlers.
func (w *WAL) ReadAll(h *walHandler) error {
dec := &walDecoder{
r: w.f,
handler: h,
}
fmt.Println("readall", w.files)
for _, f := range w.files {
fmt.Println(" ", f.Name())
dec := newWALDecoder(f, h)
for {
if err := dec.entry(); err != nil {
if err == io.EOF {
return nil
// If file end was reached, move on to the next segment.
break
}
return err
}
}
}
return nil
}
// Log writes a batch of new series labels and samples to the log.
func (w *WAL) Log(series []labels.Labels, samples []refdSample) error {
if err := w.enc.encodeSeries(series); err != nil {
if err := w.encodeSeries(series); err != nil {
return err
}
if err := w.enc.encodeSamples(samples); err != nil {
if err := w.encodeSamples(samples); err != nil {
return err
}
if w.flushInterval <= 0 {
return w.sync()
return w.Sync()
}
return nil
}
func (w *WAL) sync() error {
if err := w.enc.flush(); err != nil {
// initSegments finds all existing segment files and opens them in the
// appropriate file modes.
func (w *WAL) initSegments() error {
fns, err := sequenceFiles(w.dirFile.Name(), "")
if err != nil {
return err
}
return fileutil.Fdatasync(w.f.File)
if len(fns) == 0 {
return nil
}
// Every segment except the most recent one is only read back, so those are
// opened read-only. The upper bound has to be len(fns)-1: the final element
// is opened separately in read-write mode, and any smaller bound would leave
// the second-newest segment out of w.files entirely.
if len(fns) > 1 {
	for _, fn := range fns[:len(fns)-1] {
		lf, err := fileutil.TryLockFile(fn, os.O_RDONLY, 0666)
		if err != nil {
			return err
		}
		w.files = append(w.files, lf)
	}
}
// The most recent WAL file is the one we have to keep appending to.
lf, err := fileutil.TryLockFile(fns[len(fns)-1], os.O_RDWR, 0666)
if err != nil {
return err
}
w.files = append(w.files, lf)
return nil
}
// cut finishes the currently active segment and opens the next one.
// The buffered writer is reset to point to the new segment and the byte
// counter for the active segment (curN) starts over at zero.
//
// cut does not acquire w.mtx itself.
// It returns the first error encountered; if setting up the new segment
// fails, the freshly opened file is closed again and the WAL state is left
// untouched.
func (w *WAL) cut() error {
	// If there's a previous segment, flush what is still buffered, truncate
	// the file to its final size and sync everything to disk.
	if tf := w.tail(); tf != nil {
		// Flush before reading the offset so that it covers every entry
		// written so far, not just the ones already handed to the file.
		if err := w.cur.Flush(); err != nil {
			return err
		}
		off, err := tf.Seek(0, os.SEEK_CUR)
		if err != nil {
			return err
		}
		// Cut off the unused remainder of the preallocated space.
		if err := tf.Truncate(off); err != nil {
			return err
		}
		if err := fileutil.Fdatasync(tf.File); err != nil {
			return err
		}
	}

	p, _, err := nextSequenceFile(w.dirFile.Name(), "")
	if err != nil {
		return err
	}
	f, err := fileutil.LockFile(p, os.O_RDWR|os.O_CREATE, 0666)
	if err != nil {
		return err
	}
	// Do not leak the open file if any of the remaining setup steps fails.
	installed := false
	defer func() {
		if !installed {
			// Best effort: the setup error is the one worth reporting.
			f.Close()
		}
	}()

	if _, err = f.Seek(0, os.SEEK_SET); err != nil {
		return err
	}
	if err = fileutil.Preallocate(f.File, walSegmentSizeBytes, true); err != nil {
		return err
	}
	// Sync the directory so the new segment's entry is persisted.
	if err = w.dirFile.Sync(); err != nil {
		return err
	}

	w.files = append(w.files, f)
	w.cur = bufio.NewWriterSize(f, 4*1024*1024)
	w.curN = 0
	installed = true
	return nil
}
// tail returns the last file in w.files, or nil if w.files is empty.
func (w *WAL) tail() *fileutil.LockedFile {
	if n := len(w.files); n > 0 {
		return w.files[n-1]
	}
	return nil
}
// Sync acquires w.mtx and delegates to w.sync, which flushes the buffered
// writer and syncs the most recent segment file to disk. The mutex is
// released when the call returns. Any error from w.sync is passed through.
func (w *WAL) Sync() error {
w.mtx.Lock()
defer w.mtx.Unlock()
return w.sync()
}
// sync flushes the buffered writer of the active segment and syncs that
// segment's file data to disk. It does not take w.mtx itself.
//
// w.cur is only assigned by cut, so it is still nil when the WAL was opened
// over existing segments and nothing has been cut since. In that state there
// is no buffered data and the flush is skipped instead of dereferencing nil.
// Likewise, without any segment file there is nothing to sync.
func (w *WAL) sync() error {
	if w.cur != nil {
		if err := w.cur.Flush(); err != nil {
			return err
		}
	}
	tf := w.tail()
	if tf == nil {
		return nil
	}
	return fileutil.Fdatasync(tf.File)
}
func (w *WAL) run(interval time.Duration) {
@ -145,7 +233,7 @@ func (w *WAL) run(interval time.Duration) {
case <-w.stopc:
return
case <-tick:
if err := w.sync(); err != nil {
if err := w.Sync(); err != nil {
w.logger.Log("msg", "sync failed", "err", err)
}
}
@ -157,16 +245,18 @@ func (w *WAL) Close() error {
close(w.stopc)
<-w.donec
w.mtx.Lock()
defer w.mtx.Unlock()
var merr MultiError
if err := w.sync(); err != nil {
return err
}
return w.f.Close()
for _, f := range w.files {
merr.Add(f.Close())
}
type walEncoder struct {
mtx sync.Mutex
// w *ioutil.PageWriter
w *bufio.Writer
return merr.Err()
}
const (
@ -178,31 +268,20 @@ const (
walPageBytes = 16 * minSectorSize
)
func newWALEncoder(f *os.File) (*walEncoder, error) {
// offset, err := f.Seek(0, os.SEEK_CUR)
// if err != nil {
// return nil, err
// }
enc := &walEncoder{
// w: ioutil.NewPageWriter(f, walPageBytes, int(offset)),
w: bufio.NewWriterSize(f, 4*1024*1024),
}
return enc, nil
}
func (w *WAL) entry(et WALEntryType, flag byte, buf []byte) error {
w.mtx.Lock()
defer w.mtx.Unlock()
func (e *walEncoder) flush() error {
e.mtx.Lock()
defer e.mtx.Unlock()
sz := 6 + 4 + len(buf)
return e.w.Flush()
if w.curN+sz > walSegmentSizeBytes {
if err := w.cut(); err != nil {
return err
}
}
func (e *walEncoder) entry(et WALEntryType, flag byte, buf []byte) error {
e.mtx.Lock()
defer e.mtx.Unlock()
h := crc32.NewIEEE()
w := io.MultiWriter(h, e.w)
wr := io.MultiWriter(h, w.cur)
b := make([]byte, 6)
b[0] = byte(et)
@ -210,16 +289,18 @@ func (e *walEncoder) entry(et WALEntryType, flag byte, buf []byte) error {
binary.BigEndian.PutUint32(b[2:], uint32(len(buf)))
if _, err := w.Write(b); err != nil {
if _, err := wr.Write(b); err != nil {
return err
}
if _, err := w.Write(buf); err != nil {
if _, err := wr.Write(buf); err != nil {
return err
}
if _, err := e.w.Write(h.Sum(nil)); err != nil {
if _, err := w.cur.Write(h.Sum(nil)); err != nil {
return err
}
w.curN += sz
putWALBuffer(buf)
return nil
}
@ -244,7 +325,7 @@ func putWALBuffer(b []byte) {
walBuffers.Put(b)
}
func (e *walEncoder) encodeSeries(series []labels.Labels) error {
func (w *WAL) encodeSeries(series []labels.Labels) error {
if len(series) == 0 {
return nil
}
@ -267,10 +348,10 @@ func (e *walEncoder) encodeSeries(series []labels.Labels) error {
}
}
return e.entry(WALEntrySeries, walSeriesSimple, buf)
return w.entry(WALEntrySeries, walSeriesSimple, buf)
}
func (e *walEncoder) encodeSamples(samples []refdSample) error {
func (w *WAL) encodeSamples(samples []refdSample) error {
if len(samples) == 0 {
return nil
}
@ -300,7 +381,7 @@ func (e *walEncoder) encodeSamples(samples []refdSample) error {
buf = append(buf, b[:8]...)
}
return e.entry(WALEntrySamples, walSamplesSimple, buf)
return w.entry(WALEntrySamples, walSamplesSimple, buf)
}
type walDecoder struct {
@ -310,7 +391,7 @@ type walDecoder struct {
buf []byte
}
func newWALDecoer(r io.Reader, h *walHandler) *walDecoder {
func newWALDecoder(r io.Reader, h *walHandler) *walDecoder {
return &walDecoder{
r: r,
handler: h,
@ -402,6 +483,10 @@ func (d *walDecoder) entry() error {
flag = b[1]
length = int(binary.BigEndian.Uint32(b[2:]))
)
// Exit if we reached pre-allocated space.
if etype == 0 {
return io.EOF
}
if length > len(d.buf) {
d.buf = make([]byte, length)