Merge pull request #190 from prometheus/walsync

wal: synchronize background operations
This commit is contained in:
Fabian Reinartz 2017-11-01 18:11:22 +01:00 committed by GitHub
commit b1df857819
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

47
wal.go
View file

@ -190,6 +190,7 @@ type SegmentWAL struct {
stopc chan struct{}
donec chan struct{}
actorc chan func() error // sequentialized background operations
buffers sync.Pool
}
@ -213,6 +214,7 @@ func OpenSegmentWAL(dir string, logger log.Logger, flushInterval time.Duration,
flushInterval: flushInterval,
donec: make(chan struct{}),
stopc: make(chan struct{}),
actorc: make(chan func() error, 1),
segmentSize: walSegmentSizeBytes,
crc32: newCRC32(),
}
@ -580,18 +582,21 @@ func (w *SegmentWAL) cut() error {
// Finish last segment asynchronously to not block the WAL moving along
// in the new segment.
go func() {
off, err := hf.Seek(0, os.SEEK_CUR)
if err != nil {
level.Error(w.logger).Log("msg", "finish old segment", "segment", hf.Name(), "err", err)
}
if err := hf.Truncate(off); err != nil {
level.Error(w.logger).Log("msg", "finish old segment", "segment", hf.Name(), "err", err)
}
if err := hf.Sync(); err != nil {
level.Error(w.logger).Log("msg", "finish old segment", "segment", hf.Name(), "err", err)
}
if err := hf.Close(); err != nil {
level.Error(w.logger).Log("msg", "finish old segment", "segment", hf.Name(), "err", err)
w.actorc <- func() error {
off, err := hf.Seek(0, os.SEEK_CUR)
if err != nil {
return errors.Wrapf(err, "finish old segment %s", hf.Name())
}
if err := hf.Truncate(off); err != nil {
return errors.Wrapf(err, "finish old segment %s", hf.Name())
}
if err := hf.Sync(); err != nil {
return errors.Wrapf(err, "finish old segment %s", hf.Name())
}
if err := hf.Close(); err != nil {
return errors.Wrapf(err, "finish old segment %s", hf.Name())
}
return nil
}
}()
}
@ -606,8 +611,8 @@ func (w *SegmentWAL) cut() error {
}
go func() {
if err = w.dirFile.Sync(); err != nil {
level.Error(w.logger).Log("msg", "sync WAL directory", "err", err)
w.actorc <- func() error {
return errors.Wrap(w.dirFile.Sync(), "sync WAL directory")
}
}()
@ -686,9 +691,23 @@ func (w *SegmentWAL) run(interval time.Duration) {
defer close(w.donec)
for {
// Processing all enqueued operations has precedence over shutdown and
// background syncs.
select {
case f := <-w.actorc:
if err := f(); err != nil {
level.Error(w.logger).Log("msg", "operation failed", "err", err)
}
continue
default:
}
select {
case <-w.stopc:
return
case f := <-w.actorc:
if err := f(); err != nil {
level.Error(w.logger).Log("msg", "operation failed", "err", err)
}
case <-tick:
if err := w.Sync(); err != nil {
level.Error(w.logger).Log("msg", "sync failed", "err", err)