From e4167a5ca885d06cdb6d06bbbd54ba46f3946441 Mon Sep 17 00:00:00 2001 From: beorn7 Date: Thu, 2 Nov 2017 17:12:26 +0100 Subject: [PATCH] Update vendoring of prometheus/tsdb --- vendor/github.com/prometheus/tsdb/chunks.go | 8 ++- vendor/github.com/prometheus/tsdb/compact.go | 12 +++- .../github.com/prometheus/tsdb/db_windows.go | 3 +- vendor/github.com/prometheus/tsdb/head.go | 2 +- vendor/github.com/prometheus/tsdb/index.go | 2 + .../github.com/prometheus/tsdb/tombstones.go | 10 ++- vendor/github.com/prometheus/tsdb/wal.go | 67 ++++++++++++++----- vendor/vendor.json | 6 +- 8 files changed, 83 insertions(+), 27 deletions(-) diff --git a/vendor/github.com/prometheus/tsdb/chunks.go b/vendor/github.com/prometheus/tsdb/chunks.go index 8152677f7..f6e329b79 100644 --- a/vendor/github.com/prometheus/tsdb/chunks.go +++ b/vendor/github.com/prometheus/tsdb/chunks.go @@ -170,6 +170,7 @@ func (w *chunkWriter) finalizeTail() error { if err := tf.Truncate(off); err != nil { return err } + return tf.Close() } @@ -276,7 +277,12 @@ func (w *chunkWriter) seq() int { } func (w *chunkWriter) Close() error { - return w.finalizeTail() + if err := w.finalizeTail(); err != nil { + return err + } + + // close dir file (if not windows platform will fail on rename) + return w.dirFile.Close() } // ChunkReader provides reading access of serialized time series data. diff --git a/vendor/github.com/prometheus/tsdb/compact.go b/vendor/github.com/prometheus/tsdb/compact.go index a70918a28..955ba3caf 100644 --- a/vendor/github.com/prometheus/tsdb/compact.go +++ b/vendor/github.com/prometheus/tsdb/compact.go @@ -426,12 +426,22 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockRe if err != nil { return errors.Wrap(err, "open temporary block dir") } - defer df.Close() + defer func() { + if df != nil { + df.Close() + } + }() if err := fileutil.Fsync(df); err != nil { return errors.Wrap(err, "sync temporary dir file") } + // close temp dir before rename block dir(for windows platform) + if err = df.Close(); err != nil { + return errors.Wrap(err, "close temporary dir") + } + df = nil + // Block successfully written, make visible and remove old ones. if err := renameFile(tmp, dir); err != nil { return errors.Wrap(err, "rename block dir") diff --git a/vendor/github.com/prometheus/tsdb/db_windows.go b/vendor/github.com/prometheus/tsdb/db_windows.go index 700518e7a..444bf4103 100644 --- a/vendor/github.com/prometheus/tsdb/db_windows.go +++ b/vendor/github.com/prometheus/tsdb/db_windows.go @@ -21,8 +21,7 @@ import ( func mmap(f *os.File, sz int) ([]byte, error) { low, high := uint32(sz), uint32(sz>>32) - - h, errno := syscall.CreateFileMapping(syscall.Handle(f.Fd()), nil, syscall.PAGE_READONLY, low, high, nil) + h, errno := syscall.CreateFileMapping(syscall.Handle(f.Fd()), nil, syscall.PAGE_READONLY, high, low, nil) if h == 0 { return nil, os.NewSyscallError("CreateFileMapping", errno) } diff --git a/vendor/github.com/prometheus/tsdb/head.go b/vendor/github.com/prometheus/tsdb/head.go index 92363659f..3d2bdb9c6 100644 --- a/vendor/github.com/prometheus/tsdb/head.go +++ b/vendor/github.com/prometheus/tsdb/head.go @@ -142,7 +142,7 @@ func newHeadMetrics(h *Head, r prometheus.Registerer) *headMetrics { }) m.samplesAppended = prometheus.NewCounter(prometheus.CounterOpts{ Name: "prometheus_tsdb_head_samples_appended_total", - Help: "Total number of appended sampledb.", + Help: "Total number of appended samples.", }) if r != nil { diff --git a/vendor/github.com/prometheus/tsdb/index.go b/vendor/github.com/prometheus/tsdb/index.go index c0680aa33..f976de695 100644 --- a/vendor/github.com/prometheus/tsdb/index.go +++ b/vendor/github.com/prometheus/tsdb/index.go @@ -153,6 +153,8 @@ func newIndexWriter(dir string) (*indexWriter, error) { if err != nil { return nil, err } + defer df.Close() // close for flatform windows + f, err := os.OpenFile(filepath.Join(dir, indexFilename), os.O_CREATE|os.O_WRONLY, 0666) if err != nil { return nil, err diff --git a/vendor/github.com/prometheus/tsdb/tombstones.go b/vendor/github.com/prometheus/tsdb/tombstones.go index 19b224634..d43cd0bd0 100644 --- a/vendor/github.com/prometheus/tsdb/tombstones.go +++ b/vendor/github.com/prometheus/tsdb/tombstones.go @@ -49,7 +49,11 @@ func writeTombstoneFile(dir string, tr tombstoneReader) error { if err != nil { return err } - defer f.Close() + defer func() { + if f != nil { + f.Close() + } + }() buf := encbuf{b: make([]byte, 3*binary.MaxVarintLen64)} buf.reset() @@ -82,6 +86,10 @@ func writeTombstoneFile(dir string, tr tombstoneReader) error { return err } + if err = f.Close(); err != nil { + return err + } + f = nil return renameFile(tmp, path) } diff --git a/vendor/github.com/prometheus/tsdb/wal.go b/vendor/github.com/prometheus/tsdb/wal.go index 225851de1..b0cea8f09 100644 --- a/vendor/github.com/prometheus/tsdb/wal.go +++ b/vendor/github.com/prometheus/tsdb/wal.go @@ -190,6 +190,7 @@ type SegmentWAL struct { stopc chan struct{} donec chan struct{} + actorc chan func() error // sequentialized background operations buffers sync.Pool } @@ -213,6 +214,7 @@ func OpenSegmentWAL(dir string, logger log.Logger, flushInterval time.Duration, flushInterval: flushInterval, donec: make(chan struct{}), stopc: make(chan struct{}), + actorc: make(chan func() error, 1), segmentSize: walSegmentSizeBytes, crc32: newCRC32(), } @@ -384,7 +386,7 @@ func (w *SegmentWAL) Truncate(mint int64, keep func(uint64) bool) error { w.putBuffer(buf) if err != nil { - return err + return errors.Wrap(err, "write to compaction segment") } } if r.Err() != nil { @@ -401,14 +403,15 @@ func (w *SegmentWAL) Truncate(mint int64, keep func(uint64) bool) error { csf.Sync() csf.Close() + candidates[0].Close() // need close before remove on platform windows if err := renameFile(csf.Name(), candidates[0].Name()); err != nil { - return err + return errors.Wrap(err, "rename compaction segment") } for _, f := range candidates[1:] { + f.Close() // need close before remove on platform windows if err := os.RemoveAll(f.Name()); err != nil { return errors.Wrap(err, "delete WAL segment file") } - f.Close() } if err := w.dirFile.Sync(); err != nil { return err @@ -522,6 +525,15 @@ func (w *SegmentWAL) openSegmentFile(name string) (*os.File, error) { } metab := make([]byte, 8) + // If there is an error, we need close f for platform windows before gc. + // Otherwise, file op may fail. + hasError := true + defer func() { + if hasError { + f.Close() + } + }() + if n, err := f.Read(metab); err != nil { return nil, errors.Wrapf(err, "validate meta %q", f.Name()) } else if n != 8 { @@ -534,6 +546,7 @@ func (w *SegmentWAL) openSegmentFile(name string) (*os.File, error) { if metab[4] != WALFormatDefault { return nil, errors.Errorf("unknown WAL segment format %d in %q", metab[4], f.Name()) } + hasError = false return f, nil } @@ -569,18 +582,21 @@ func (w *SegmentWAL) cut() error { // Finish last segment asynchronously to not block the WAL moving along // in the new segment. go func() { - off, err := hf.Seek(0, os.SEEK_CUR) - if err != nil { - level.Error(w.logger).Log("msg", "finish old segment", "segment", hf.Name(), "err", err) - } - if err := hf.Truncate(off); err != nil { - level.Error(w.logger).Log("msg", "finish old segment", "segment", hf.Name(), "err", err) - } - if err := hf.Sync(); err != nil { - level.Error(w.logger).Log("msg", "finish old segment", "segment", hf.Name(), "err", err) - } - if err := hf.Close(); err != nil { - level.Error(w.logger).Log("msg", "finish old segment", "segment", hf.Name(), "err", err) + w.actorc <- func() error { + off, err := hf.Seek(0, os.SEEK_CUR) + if err != nil { + return errors.Wrapf(err, "finish old segment %s", hf.Name()) + } + if err := hf.Truncate(off); err != nil { + return errors.Wrapf(err, "finish old segment %s", hf.Name()) + } + if err := hf.Sync(); err != nil { + return errors.Wrapf(err, "finish old segment %s", hf.Name()) + } + if err := hf.Close(); err != nil { + return errors.Wrapf(err, "finish old segment %s", hf.Name()) + } + return nil } }() } @@ -595,8 +611,8 @@ func (w *SegmentWAL) cut() error { } go func() { - if err = w.dirFile.Sync(); err != nil { - level.Error(w.logger).Log("msg", "sync WAL directory", "err", err) + w.actorc <- func() error { + return errors.Wrap(w.dirFile.Sync(), "sync WAL directory") } }() @@ -675,9 +691,23 @@ func (w *SegmentWAL) run(interval time.Duration) { defer close(w.donec) for { + // Processing all enqueued operations has precedence over shutdown and + // background syncs. + select { + case f := <-w.actorc: + if err := f(); err != nil { + level.Error(w.logger).Log("msg", "operation failed", "err", err) + } + continue + default: + } select { case <-w.stopc: return + case f := <-w.actorc: + if err := f(); err != nil { + level.Error(w.logger).Log("msg", "operation failed", "err", err) + } case <-tick: if err := w.Sync(); err != nil { level.Error(w.logger).Log("msg", "sync failed", "err", err) @@ -702,7 +732,8 @@ func (w *SegmentWAL) Close() error { if hf := w.head(); hf != nil { return errors.Wrapf(hf.Close(), "closing WAL head %s", hf.Name()) } - return nil + + return w.dirFile.Close() } const ( diff --git a/vendor/vendor.json b/vendor/vendor.json index 0f5b66b0b..11c18553b 100644 --- a/vendor/vendor.json +++ b/vendor/vendor.json @@ -846,10 +846,10 @@ "revisionTime": "2016-04-11T19:08:41Z" }, { - "checksumSHA1": "qbhdcw451oyIWXj+0zlkR+rDi9Y=", + "checksumSHA1": "MZoz9kpR5PSUM9mJLh3c7nSrk9c=", "path": "github.com/prometheus/tsdb", - "revision": "5d28c849c7ff3b43e2829a44a9aac16468e076ce", - "revisionTime": "2017-10-25T14:52:11Z" + "revision": "b1df85781931b0ff48d09a364174016d16a4dc3e", + "revisionTime": "2017-11-01T17:11:22Z" }, { "checksumSHA1": "uy6ySJ6EZqof+yMD2wTkYob8BeU=",