mirror of
https://github.com/prometheus/prometheus.git
synced 2025-03-05 20:59:13 -08:00
Use page buffered writer for WAL
This commit is contained in:
parent
9c6a72aadd
commit
e7edae39b2
34
wal.go
34
wal.go
|
@ -9,6 +9,7 @@ import (
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
|
|
||||||
"github.com/coreos/etcd/pkg/fileutil"
|
"github.com/coreos/etcd/pkg/fileutil"
|
||||||
|
"github.com/coreos/etcd/pkg/ioutil"
|
||||||
"github.com/fabxc/tsdb/labels"
|
"github.com/fabxc/tsdb/labels"
|
||||||
"github.com/pkg/errors"
|
"github.com/pkg/errors"
|
||||||
)
|
)
|
||||||
|
@ -55,10 +56,14 @@ func OpenWAL(dir string) (*WAL, error) {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
enc, err := newWALEncoder(f.File)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
w := &WAL{
|
w := &WAL{
|
||||||
f: f,
|
f: f,
|
||||||
enc: newWALEncoder(f),
|
enc: enc,
|
||||||
symbols: map[string]uint32{},
|
symbols: map[string]uint32{},
|
||||||
}
|
}
|
||||||
return w, nil
|
return w, nil
|
||||||
|
@ -97,6 +102,9 @@ func (w *WAL) Log(series []labels.Labels, samples []hashedSample) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (w *WAL) sync() error {
|
func (w *WAL) sync() error {
|
||||||
|
if err := w.enc.flush(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
return fileutil.Fdatasync(w.f.File)
|
return fileutil.Fdatasync(w.f.File)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -109,16 +117,30 @@ func (w *WAL) Close() error {
|
||||||
}
|
}
|
||||||
|
|
||||||
type walEncoder struct {
|
type walEncoder struct {
|
||||||
w io.Writer
|
w *ioutil.PageWriter
|
||||||
|
|
||||||
buf []byte
|
buf []byte
|
||||||
}
|
}
|
||||||
|
|
||||||
func newWALEncoder(w io.Writer) *walEncoder {
|
// walPageBytes is the alignment for flushing records to the backing Writer.
|
||||||
return &walEncoder{
|
// It should be a multiple of the minimum sector size so that WAL can safely
|
||||||
w: w,
|
// distinguish between torn writes and ordinary data corruption.
|
||||||
buf: make([]byte, 0, 1024*1024),
|
const minSectorSize = 512
|
||||||
|
const walPageBytes = 8 * minSectorSize
|
||||||
|
|
||||||
|
func newWALEncoder(f *os.File) (*walEncoder, error) {
|
||||||
|
offset, err := f.Seek(0, os.SEEK_CUR)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
}
|
}
|
||||||
|
return &walEncoder{
|
||||||
|
w: ioutil.NewPageWriter(f, walPageBytes, int(offset)),
|
||||||
|
buf: make([]byte, 0, 1024*1024),
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *walEncoder) flush() error {
|
||||||
|
return e.w.Flush()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *walEncoder) entry(et WALEntryType, flag byte, n int) error {
|
func (e *walEncoder) entry(et WALEntryType, flag byte, n int) error {
|
||||||
|
|
Loading…
Reference in a new issue