mirror of
https://github.com/prometheus/prometheus.git
synced 2024-11-13 17:14:05 -08:00
vendor: update prometheus/tsdb
This commit is contained in:
parent
7a3261aa99
commit
f52248269e
7
vendor/github.com/prometheus/tsdb/block.go
generated
vendored
7
vendor/github.com/prometheus/tsdb/block.go
generated
vendored
|
@ -26,6 +26,7 @@ import (
|
||||||
"github.com/prometheus/tsdb/labels"
|
"github.com/prometheus/tsdb/labels"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// DiskBlock represents a data block backed by on-disk data.
|
||||||
type DiskBlock interface {
|
type DiskBlock interface {
|
||||||
BlockReader
|
BlockReader
|
||||||
|
|
||||||
|
@ -42,6 +43,7 @@ type DiskBlock interface {
|
||||||
Close() error
|
Close() error
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// BlockReader provides reading access to a data block.
|
||||||
type BlockReader interface {
|
type BlockReader interface {
|
||||||
// Index returns an IndexReader over the block's data.
|
// Index returns an IndexReader over the block's data.
|
||||||
Index() IndexReader
|
Index() IndexReader
|
||||||
|
@ -53,11 +55,6 @@ type BlockReader interface {
|
||||||
Tombstones() TombstoneReader
|
Tombstones() TombstoneReader
|
||||||
}
|
}
|
||||||
|
|
||||||
// Snapshottable defines an entity that can be backedup online.
|
|
||||||
type Snapshottable interface {
|
|
||||||
Snapshot(dir string) error
|
|
||||||
}
|
|
||||||
|
|
||||||
// Appendable defines an entity to which data can be appended.
|
// Appendable defines an entity to which data can be appended.
|
||||||
type Appendable interface {
|
type Appendable interface {
|
||||||
// Appender returns a new Appender against an underlying store.
|
// Appender returns a new Appender against an underlying store.
|
||||||
|
|
2
vendor/github.com/prometheus/tsdb/chunks.go
generated
vendored
2
vendor/github.com/prometheus/tsdb/chunks.go
generated
vendored
|
@ -21,7 +21,7 @@ import (
|
||||||
"io"
|
"io"
|
||||||
"os"
|
"os"
|
||||||
|
|
||||||
"github.com/coreos/etcd/pkg/fileutil"
|
"github.com/prometheus/tsdb/fileutil"
|
||||||
"github.com/pkg/errors"
|
"github.com/pkg/errors"
|
||||||
"github.com/prometheus/tsdb/chunks"
|
"github.com/prometheus/tsdb/chunks"
|
||||||
)
|
)
|
||||||
|
|
27
vendor/github.com/prometheus/tsdb/compact.go
generated
vendored
27
vendor/github.com/prometheus/tsdb/compact.go
generated
vendored
|
@ -20,12 +20,13 @@ import (
|
||||||
"sort"
|
"sort"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/coreos/etcd/pkg/fileutil"
|
|
||||||
"github.com/go-kit/kit/log"
|
"github.com/go-kit/kit/log"
|
||||||
|
"github.com/go-kit/kit/log/level"
|
||||||
"github.com/oklog/ulid"
|
"github.com/oklog/ulid"
|
||||||
"github.com/pkg/errors"
|
"github.com/pkg/errors"
|
||||||
"github.com/prometheus/client_golang/prometheus"
|
"github.com/prometheus/client_golang/prometheus"
|
||||||
"github.com/prometheus/tsdb/chunks"
|
"github.com/prometheus/tsdb/chunks"
|
||||||
|
"github.com/prometheus/tsdb/fileutil"
|
||||||
"github.com/prometheus/tsdb/labels"
|
"github.com/prometheus/tsdb/labels"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -356,7 +357,7 @@ func (w *instrumentedChunkWriter) WriteChunks(chunks ...ChunkMeta) error {
|
||||||
// write creates a new block that is the union of the provided blocks into dir.
|
// write creates a new block that is the union of the provided blocks into dir.
|
||||||
// It cleans up all files of the old blocks after completing successfully.
|
// It cleans up all files of the old blocks after completing successfully.
|
||||||
func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockReader) (err error) {
|
func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockReader) (err error) {
|
||||||
c.logger.Log("msg", "compact blocks", "count", len(blocks), "mint", meta.MinTime, "maxt", meta.MaxTime)
|
level.Info(c.logger).Log("msg", "compact blocks", "count", len(blocks), "mint", meta.MinTime, "maxt", meta.MaxTime)
|
||||||
|
|
||||||
defer func(t time.Time) {
|
defer func(t time.Time) {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -420,21 +421,20 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockRe
|
||||||
return errors.Wrap(err, "write new tombstones file")
|
return errors.Wrap(err, "write new tombstones file")
|
||||||
}
|
}
|
||||||
|
|
||||||
// Block successfully written, make visible and remove old ones.
|
df, err := fileutil.OpenDir(tmp)
|
||||||
if err := renameFile(tmp, dir); err != nil {
|
|
||||||
return errors.Wrap(err, "rename block dir")
|
|
||||||
}
|
|
||||||
// Properly sync parent dir to ensure changes are visible.
|
|
||||||
df, err := fileutil.OpenDir(dir)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return errors.Wrap(err, "sync block dir")
|
return errors.Wrap(err, "open temporary block dir")
|
||||||
}
|
}
|
||||||
defer df.Close()
|
defer df.Close()
|
||||||
|
|
||||||
if err := fileutil.Fsync(df); err != nil {
|
if err := fileutil.Fsync(df); err != nil {
|
||||||
return errors.Wrap(err, "sync block dir")
|
return errors.Wrap(err, "sync temporary dir file")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Block successfully written, make visible and remove old ones.
|
||||||
|
if err := renameFile(tmp, dir); err != nil {
|
||||||
|
return errors.Wrap(err, "rename block dir")
|
||||||
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -750,13 +750,10 @@ func renameFile(from, to string) error {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
defer pdir.Close()
|
|
||||||
|
|
||||||
if err = fileutil.Fsync(pdir); err != nil {
|
if err = fileutil.Fsync(pdir); err != nil {
|
||||||
|
pdir.Close()
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if err = pdir.Close(); err != nil {
|
return pdir.Close()
|
||||||
return err
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
|
26
vendor/github.com/prometheus/tsdb/db.go
generated
vendored
26
vendor/github.com/prometheus/tsdb/db.go
generated
vendored
|
@ -30,8 +30,9 @@ import (
|
||||||
|
|
||||||
"golang.org/x/sync/errgroup"
|
"golang.org/x/sync/errgroup"
|
||||||
|
|
||||||
"github.com/coreos/etcd/pkg/fileutil"
|
"github.com/prometheus/tsdb/fileutil"
|
||||||
"github.com/go-kit/kit/log"
|
"github.com/go-kit/kit/log"
|
||||||
|
"github.com/go-kit/kit/log/level"
|
||||||
"github.com/nightlyone/lockfile"
|
"github.com/nightlyone/lockfile"
|
||||||
"github.com/oklog/ulid"
|
"github.com/oklog/ulid"
|
||||||
"github.com/pkg/errors"
|
"github.com/pkg/errors"
|
||||||
|
@ -203,7 +204,7 @@ func Open(dir string, l log.Logger, r prometheus.Registerer, opts *Options) (db
|
||||||
return nil, errors.Wrap(err, "create leveled compactor")
|
return nil, errors.Wrap(err, "create leveled compactor")
|
||||||
}
|
}
|
||||||
|
|
||||||
wal, err := OpenSegmentWAL(filepath.Join(dir, "wal"), l, 10*time.Second)
|
wal, err := OpenSegmentWAL(filepath.Join(dir, "wal"), l, opts.WALFlushInterval, r)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -251,12 +252,12 @@ func (db *DB) run() {
|
||||||
|
|
||||||
_, err1 := db.retentionCutoff()
|
_, err1 := db.retentionCutoff()
|
||||||
if err1 != nil {
|
if err1 != nil {
|
||||||
db.logger.Log("msg", "retention cutoff failed", "err", err1)
|
level.Error(db.logger).Log("msg", "retention cutoff failed", "err", err1)
|
||||||
}
|
}
|
||||||
|
|
||||||
_, err2 := db.compact()
|
_, err2 := db.compact()
|
||||||
if err2 != nil {
|
if err2 != nil {
|
||||||
db.logger.Log("msg", "compaction failed", "err", err2)
|
level.Error(db.logger).Log("msg", "compaction failed", "err", err2)
|
||||||
}
|
}
|
||||||
|
|
||||||
if err1 != nil || err2 != nil {
|
if err1 != nil || err2 != nil {
|
||||||
|
@ -520,6 +521,17 @@ func validateBlockSequence(bs []DiskBlock) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (db *DB) Blocks() []DiskBlock {
|
||||||
|
db.mtx.RLock()
|
||||||
|
defer db.mtx.RUnlock()
|
||||||
|
|
||||||
|
return db.blocks
|
||||||
|
}
|
||||||
|
|
||||||
|
func (db *DB) Head() *Head {
|
||||||
|
return db.head
|
||||||
|
}
|
||||||
|
|
||||||
// Close the partition.
|
// Close the partition.
|
||||||
func (db *DB) Close() error {
|
func (db *DB) Close() error {
|
||||||
close(db.stopc)
|
close(db.stopc)
|
||||||
|
@ -551,7 +563,7 @@ func (db *DB) DisableCompactions() {
|
||||||
defer db.cmtx.Unlock()
|
defer db.cmtx.Unlock()
|
||||||
|
|
||||||
db.compactionsEnabled = false
|
db.compactionsEnabled = false
|
||||||
db.logger.Log("msg", "compactions disabled")
|
level.Info(db.logger).Log("msg", "compactions disabled")
|
||||||
}
|
}
|
||||||
|
|
||||||
// EnableCompactions enables compactions.
|
// EnableCompactions enables compactions.
|
||||||
|
@ -560,7 +572,7 @@ func (db *DB) EnableCompactions() {
|
||||||
defer db.cmtx.Unlock()
|
defer db.cmtx.Unlock()
|
||||||
|
|
||||||
db.compactionsEnabled = true
|
db.compactionsEnabled = true
|
||||||
db.logger.Log("msg", "compactions enabled")
|
level.Info(db.logger).Log("msg", "compactions enabled")
|
||||||
}
|
}
|
||||||
|
|
||||||
// Snapshot writes the current data to the directory.
|
// Snapshot writes the current data to the directory.
|
||||||
|
@ -579,7 +591,7 @@ func (db *DB) Snapshot(dir string) error {
|
||||||
defer db.mtx.RUnlock()
|
defer db.mtx.RUnlock()
|
||||||
|
|
||||||
for _, b := range db.blocks {
|
for _, b := range db.blocks {
|
||||||
db.logger.Log("msg", "snapshotting block", "block", b)
|
level.Info(db.logger).Log("msg", "snapshotting block", "block", b)
|
||||||
|
|
||||||
if err := b.Snapshot(dir); err != nil {
|
if err := b.Snapshot(dir); err != nil {
|
||||||
return errors.Wrap(err, "error snapshotting headblock")
|
return errors.Wrap(err, "error snapshotting headblock")
|
||||||
|
|
16
vendor/github.com/prometheus/tsdb/encoding_helpers.go
generated
vendored
16
vendor/github.com/prometheus/tsdb/encoding_helpers.go
generated
vendored
|
@ -77,22 +77,6 @@ func (d *decbuf) uvarint32() uint32 { return uint32(d.uvarint64()) }
|
||||||
func (d *decbuf) be32int() int { return int(d.be32()) }
|
func (d *decbuf) be32int() int { return int(d.be32()) }
|
||||||
func (d *decbuf) be64int64() int64 { return int64(d.be64()) }
|
func (d *decbuf) be64int64() int64 { return int64(d.be64()) }
|
||||||
|
|
||||||
// uvarintTempStr decodes like uvarintStr but the returned string is
|
|
||||||
// not safe to use if the underyling buffer changes.
|
|
||||||
func (d *decbuf) uvarintTempStr() string {
|
|
||||||
l := d.uvarint64()
|
|
||||||
if d.e != nil {
|
|
||||||
return ""
|
|
||||||
}
|
|
||||||
if len(d.b) < int(l) {
|
|
||||||
d.e = errInvalidSize
|
|
||||||
return ""
|
|
||||||
}
|
|
||||||
s := yoloString(d.b[:l])
|
|
||||||
d.b = d.b[l:]
|
|
||||||
return s
|
|
||||||
}
|
|
||||||
|
|
||||||
func (d *decbuf) uvarintStr() string {
|
func (d *decbuf) uvarintStr() string {
|
||||||
l := d.uvarint64()
|
l := d.uvarint64()
|
||||||
if d.e != nil {
|
if d.e != nil {
|
||||||
|
|
22
vendor/github.com/prometheus/tsdb/fileutil/dir_unix.go
generated
vendored
Normal file
22
vendor/github.com/prometheus/tsdb/fileutil/dir_unix.go
generated
vendored
Normal file
|
@ -0,0 +1,22 @@
|
||||||
|
// Copyright 2016 The etcd Authors
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
// +build !windows
|
||||||
|
|
||||||
|
package fileutil
|
||||||
|
|
||||||
|
import "os"
|
||||||
|
|
||||||
|
// OpenDir opens a directory for syncing.
|
||||||
|
func OpenDir(path string) (*os.File, error) { return os.Open(path) }
|
46
vendor/github.com/prometheus/tsdb/fileutil/dir_windows.go
generated
vendored
Normal file
46
vendor/github.com/prometheus/tsdb/fileutil/dir_windows.go
generated
vendored
Normal file
|
@ -0,0 +1,46 @@
|
||||||
|
// Copyright 2016 The etcd Authors
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
// +build windows
|
||||||
|
|
||||||
|
package fileutil
|
||||||
|
|
||||||
|
import (
|
||||||
|
"os"
|
||||||
|
"syscall"
|
||||||
|
)
|
||||||
|
|
||||||
|
// OpenDir opens a directory in windows with write access for syncing.
|
||||||
|
func OpenDir(path string) (*os.File, error) {
|
||||||
|
fd, err := openDir(path)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return os.NewFile(uintptr(fd), path), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func openDir(path string) (fd syscall.Handle, err error) {
|
||||||
|
if len(path) == 0 {
|
||||||
|
return syscall.InvalidHandle, syscall.ERROR_FILE_NOT_FOUND
|
||||||
|
}
|
||||||
|
pathp, err := syscall.UTF16PtrFromString(path)
|
||||||
|
if err != nil {
|
||||||
|
return syscall.InvalidHandle, err
|
||||||
|
}
|
||||||
|
access := uint32(syscall.GENERIC_READ | syscall.GENERIC_WRITE)
|
||||||
|
sharemode := uint32(syscall.FILE_SHARE_READ | syscall.FILE_SHARE_WRITE)
|
||||||
|
createmode := uint32(syscall.OPEN_EXISTING)
|
||||||
|
fl := uint32(syscall.FILE_FLAG_BACKUP_SEMANTICS)
|
||||||
|
return syscall.CreateFile(pathp, access, sharemode, nil, createmode, fl, 0)
|
||||||
|
}
|
25
vendor/github.com/prometheus/tsdb/fileutil/fileutil.go
generated
vendored
Normal file
25
vendor/github.com/prometheus/tsdb/fileutil/fileutil.go
generated
vendored
Normal file
|
@ -0,0 +1,25 @@
|
||||||
|
// Package fileutil provides utility methods used when dealing with the filesystem in tsdb.
|
||||||
|
// It is largely copied from github.com/coreos/etcd/pkg/fileutil to avoid the
|
||||||
|
// dependency chain it brings with it.
|
||||||
|
// Please check github.com/coreos/etcd for licensing information.
|
||||||
|
package fileutil
|
||||||
|
|
||||||
|
import (
|
||||||
|
"os"
|
||||||
|
"sort"
|
||||||
|
)
|
||||||
|
|
||||||
|
// ReadDir returns the filenames in the given directory in sorted order.
|
||||||
|
func ReadDir(dirpath string) ([]string, error) {
|
||||||
|
dir, err := os.Open(dirpath)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
defer dir.Close()
|
||||||
|
names, err := dir.Readdirnames(-1)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
sort.Strings(names)
|
||||||
|
return names, nil
|
||||||
|
}
|
54
vendor/github.com/prometheus/tsdb/fileutil/preallocate.go
generated
vendored
Normal file
54
vendor/github.com/prometheus/tsdb/fileutil/preallocate.go
generated
vendored
Normal file
|
@ -0,0 +1,54 @@
|
||||||
|
// Copyright 2015 The etcd Authors
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
package fileutil
|
||||||
|
|
||||||
|
import (
|
||||||
|
"io"
|
||||||
|
"os"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Preallocate tries to allocate the space for given
|
||||||
|
// file. This operation is only supported on linux by a
|
||||||
|
// few filesystems (btrfs, ext4, etc.).
|
||||||
|
// If the operation is unsupported, no error will be returned.
|
||||||
|
// Otherwise, the error encountered will be returned.
|
||||||
|
func Preallocate(f *os.File, sizeInBytes int64, extendFile bool) error {
|
||||||
|
if sizeInBytes == 0 {
|
||||||
|
// fallocate will return EINVAL if length is 0; skip
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
if extendFile {
|
||||||
|
return preallocExtend(f, sizeInBytes)
|
||||||
|
}
|
||||||
|
return preallocFixed(f, sizeInBytes)
|
||||||
|
}
|
||||||
|
|
||||||
|
func preallocExtendTrunc(f *os.File, sizeInBytes int64) error {
|
||||||
|
curOff, err := f.Seek(0, io.SeekCurrent)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
size, err := f.Seek(sizeInBytes, io.SeekEnd)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if _, err = f.Seek(curOff, io.SeekStart); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if sizeInBytes > size {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return f.Truncate(sizeInBytes)
|
||||||
|
}
|
41
vendor/github.com/prometheus/tsdb/fileutil/preallocate_darwin.go
generated
vendored
Normal file
41
vendor/github.com/prometheus/tsdb/fileutil/preallocate_darwin.go
generated
vendored
Normal file
|
@ -0,0 +1,41 @@
|
||||||
|
// Copyright 2015 The etcd Authors
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
package fileutil
|
||||||
|
|
||||||
|
import (
|
||||||
|
"os"
|
||||||
|
"syscall"
|
||||||
|
"unsafe"
|
||||||
|
)
|
||||||
|
|
||||||
|
func preallocExtend(f *os.File, sizeInBytes int64) error {
|
||||||
|
if err := preallocFixed(f, sizeInBytes); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return preallocExtendTrunc(f, sizeInBytes)
|
||||||
|
}
|
||||||
|
|
||||||
|
func preallocFixed(f *os.File, sizeInBytes int64) error {
|
||||||
|
fstore := &syscall.Fstore_t{
|
||||||
|
Flags: syscall.F_ALLOCATEALL,
|
||||||
|
Posmode: syscall.F_PEOFPOSMODE,
|
||||||
|
Length: sizeInBytes}
|
||||||
|
p := unsafe.Pointer(fstore)
|
||||||
|
_, _, errno := syscall.Syscall(syscall.SYS_FCNTL, f.Fd(), uintptr(syscall.F_PREALLOCATE), uintptr(p))
|
||||||
|
if errno == 0 || errno == syscall.ENOTSUP {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return errno
|
||||||
|
}
|
47
vendor/github.com/prometheus/tsdb/fileutil/preallocate_linux.go
generated
vendored
Normal file
47
vendor/github.com/prometheus/tsdb/fileutil/preallocate_linux.go
generated
vendored
Normal file
|
@ -0,0 +1,47 @@
|
||||||
|
// Copyright 2015 The etcd Authors
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
package fileutil
|
||||||
|
|
||||||
|
import (
|
||||||
|
"os"
|
||||||
|
"syscall"
|
||||||
|
)
|
||||||
|
|
||||||
|
func preallocExtend(f *os.File, sizeInBytes int64) error {
|
||||||
|
// use mode = 0 to change size
|
||||||
|
err := syscall.Fallocate(int(f.Fd()), 0, 0, sizeInBytes)
|
||||||
|
if err != nil {
|
||||||
|
errno, ok := err.(syscall.Errno)
|
||||||
|
// not supported; fallback
|
||||||
|
// fallocate EINTRs frequently in some environments; fallback
|
||||||
|
if ok && (errno == syscall.ENOTSUP || errno == syscall.EINTR) {
|
||||||
|
return preallocExtendTrunc(f, sizeInBytes)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func preallocFixed(f *os.File, sizeInBytes int64) error {
|
||||||
|
// use mode = 1 to keep size; see FALLOC_FL_KEEP_SIZE
|
||||||
|
err := syscall.Fallocate(int(f.Fd()), 1, 0, sizeInBytes)
|
||||||
|
if err != nil {
|
||||||
|
errno, ok := err.(syscall.Errno)
|
||||||
|
// treat not supported as nil error
|
||||||
|
if ok && errno == syscall.ENOTSUP {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return err
|
||||||
|
}
|
25
vendor/github.com/prometheus/tsdb/fileutil/preallocate_other.go
generated
vendored
Normal file
25
vendor/github.com/prometheus/tsdb/fileutil/preallocate_other.go
generated
vendored
Normal file
|
@ -0,0 +1,25 @@
|
||||||
|
// Copyright 2015 The etcd Authors
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
// +build !linux,!darwin
|
||||||
|
|
||||||
|
package fileutil
|
||||||
|
|
||||||
|
import "os"
|
||||||
|
|
||||||
|
func preallocExtend(f *os.File, sizeInBytes int64) error {
|
||||||
|
return preallocExtendTrunc(f, sizeInBytes)
|
||||||
|
}
|
||||||
|
|
||||||
|
func preallocFixed(f *os.File, sizeInBytes int64) error { return nil }
|
29
vendor/github.com/prometheus/tsdb/fileutil/sync.go
generated
vendored
Normal file
29
vendor/github.com/prometheus/tsdb/fileutil/sync.go
generated
vendored
Normal file
|
@ -0,0 +1,29 @@
|
||||||
|
// Copyright 2016 The etcd Authors
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
// +build !linux,!darwin
|
||||||
|
|
||||||
|
package fileutil
|
||||||
|
|
||||||
|
import "os"
|
||||||
|
|
||||||
|
// Fsync is a wrapper around file.Sync(). Special handling is needed on darwin platform.
|
||||||
|
func Fsync(f *os.File) error {
|
||||||
|
return f.Sync()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Fdatasync is a wrapper around file.Sync(). Special handling is needed on linux platform.
|
||||||
|
func Fdatasync(f *os.File) error {
|
||||||
|
return f.Sync()
|
||||||
|
}
|
40
vendor/github.com/prometheus/tsdb/fileutil/sync_darwin.go
generated
vendored
Normal file
40
vendor/github.com/prometheus/tsdb/fileutil/sync_darwin.go
generated
vendored
Normal file
|
@ -0,0 +1,40 @@
|
||||||
|
// Copyright 2016 The etcd Authors
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
// +build darwin
|
||||||
|
|
||||||
|
package fileutil
|
||||||
|
|
||||||
|
import (
|
||||||
|
"os"
|
||||||
|
"syscall"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Fsync on HFS/OSX flushes the data on to the physical drive but the drive
|
||||||
|
// may not write it to the persistent media for quite sometime and it may be
|
||||||
|
// written in out-of-order sequence. Using F_FULLFSYNC ensures that the
|
||||||
|
// physical drive's buffer will also get flushed to the media.
|
||||||
|
func Fsync(f *os.File) error {
|
||||||
|
_, _, errno := syscall.Syscall(syscall.SYS_FCNTL, f.Fd(), uintptr(syscall.F_FULLFSYNC), uintptr(0))
|
||||||
|
if errno == 0 {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return errno
|
||||||
|
}
|
||||||
|
|
||||||
|
// Fdatasync on darwin platform invokes fcntl(F_FULLFSYNC) for actual persistence
|
||||||
|
// on physical drive media.
|
||||||
|
func Fdatasync(f *os.File) error {
|
||||||
|
return Fsync(f)
|
||||||
|
}
|
34
vendor/github.com/prometheus/tsdb/fileutil/sync_linux.go
generated
vendored
Normal file
34
vendor/github.com/prometheus/tsdb/fileutil/sync_linux.go
generated
vendored
Normal file
|
@ -0,0 +1,34 @@
|
||||||
|
// Copyright 2016 The etcd Authors
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
// +build linux
|
||||||
|
|
||||||
|
package fileutil
|
||||||
|
|
||||||
|
import (
|
||||||
|
"os"
|
||||||
|
"syscall"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Fsync is a wrapper around file.Sync(). Special handling is needed on darwin platform.
|
||||||
|
func Fsync(f *os.File) error {
|
||||||
|
return f.Sync()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Fdatasync is similar to fsync(), but does not flush modified metadata
|
||||||
|
// unless that metadata is needed in order to allow a subsequent data retrieval
|
||||||
|
// to be correctly handled.
|
||||||
|
func Fdatasync(f *os.File) error {
|
||||||
|
return syscall.Fdatasync(int(f.Fd()))
|
||||||
|
}
|
12
vendor/github.com/prometheus/tsdb/head.go
generated
vendored
12
vendor/github.com/prometheus/tsdb/head.go
generated
vendored
|
@ -21,6 +21,7 @@ import (
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/go-kit/kit/log"
|
"github.com/go-kit/kit/log"
|
||||||
|
"github.com/go-kit/kit/log/level"
|
||||||
"github.com/pkg/errors"
|
"github.com/pkg/errors"
|
||||||
"github.com/prometheus/client_golang/prometheus"
|
"github.com/prometheus/client_golang/prometheus"
|
||||||
"github.com/prometheus/tsdb/chunks"
|
"github.com/prometheus/tsdb/chunks"
|
||||||
|
@ -235,7 +236,7 @@ func (h *Head) ReadWAL() error {
|
||||||
}
|
}
|
||||||
|
|
||||||
if unknownRefs > 0 {
|
if unknownRefs > 0 {
|
||||||
h.logger.Log("msg", "unknown series references in WAL samples", "count", unknownRefs)
|
level.Warn(h.logger).Log("msg", "unknown series references in WAL samples", "count", unknownRefs)
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := r.Read(seriesFunc, samplesFunc, deletesFunc); err != nil {
|
if err := r.Read(seriesFunc, samplesFunc, deletesFunc); err != nil {
|
||||||
|
@ -248,9 +249,6 @@ func (h *Head) ReadWAL() error {
|
||||||
func (h *Head) Truncate(mint int64) error {
|
func (h *Head) Truncate(mint int64) error {
|
||||||
initialize := h.MinTime() == math.MinInt64
|
initialize := h.MinTime() == math.MinInt64
|
||||||
|
|
||||||
if mint%h.chunkRange != 0 {
|
|
||||||
return errors.Errorf("truncating at %d not aligned", mint)
|
|
||||||
}
|
|
||||||
if h.MinTime() >= mint {
|
if h.MinTime() >= mint {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -270,7 +268,7 @@ func (h *Head) Truncate(mint int64) error {
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
|
|
||||||
h.gc()
|
h.gc()
|
||||||
h.logger.Log("msg", "head GC completed", "duration", time.Since(start))
|
level.Info(h.logger).Log("msg", "head GC completed", "duration", time.Since(start))
|
||||||
h.metrics.gcDuration.Observe(time.Since(start).Seconds())
|
h.metrics.gcDuration.Observe(time.Since(start).Seconds())
|
||||||
|
|
||||||
start = time.Now()
|
start = time.Now()
|
||||||
|
@ -279,9 +277,9 @@ func (h *Head) Truncate(mint int64) error {
|
||||||
return h.series.getByID(id) != nil
|
return h.series.getByID(id) != nil
|
||||||
}
|
}
|
||||||
if err := h.wal.Truncate(mint, keep); err == nil {
|
if err := h.wal.Truncate(mint, keep); err == nil {
|
||||||
h.logger.Log("msg", "WAL truncation completed", "duration", time.Since(start))
|
level.Info(h.logger).Log("msg", "WAL truncation completed", "duration", time.Since(start))
|
||||||
} else {
|
} else {
|
||||||
h.logger.Log("msg", "WAL truncation failed", "err", err, "duration", time.Since(start))
|
level.Error(h.logger).Log("msg", "WAL truncation failed", "err", err, "duration", time.Since(start))
|
||||||
}
|
}
|
||||||
h.metrics.walTruncateDuration.Observe(time.Since(start).Seconds())
|
h.metrics.walTruncateDuration.Observe(time.Since(start).Seconds())
|
||||||
|
|
||||||
|
|
86
vendor/github.com/prometheus/tsdb/index.go
generated
vendored
86
vendor/github.com/prometheus/tsdb/index.go
generated
vendored
|
@ -23,10 +23,9 @@ import (
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
"sort"
|
"sort"
|
||||||
"strings"
|
"strings"
|
||||||
|
|
||||||
"math"
|
"math"
|
||||||
|
|
||||||
"github.com/coreos/etcd/pkg/fileutil"
|
"github.com/prometheus/tsdb/fileutil"
|
||||||
"github.com/pkg/errors"
|
"github.com/pkg/errors"
|
||||||
"github.com/prometheus/tsdb/labels"
|
"github.com/prometheus/tsdb/labels"
|
||||||
)
|
)
|
||||||
|
@ -232,13 +231,13 @@ func (w *indexWriter) ensureStage(s indexWriterStage) error {
|
||||||
w.toc.labelIndices = w.pos
|
w.toc.labelIndices = w.pos
|
||||||
|
|
||||||
case idxStagePostings:
|
case idxStagePostings:
|
||||||
|
w.toc.postings = w.pos
|
||||||
|
|
||||||
|
case idxStageDone:
|
||||||
w.toc.labelIndicesTable = w.pos
|
w.toc.labelIndicesTable = w.pos
|
||||||
if err := w.writeOffsetTable(w.labelIndexes); err != nil {
|
if err := w.writeOffsetTable(w.labelIndexes); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
w.toc.postings = w.pos
|
|
||||||
|
|
||||||
case idxStageDone:
|
|
||||||
w.toc.postingsTable = w.pos
|
w.toc.postingsTable = w.pos
|
||||||
if err := w.writeOffsetTable(w.postings); err != nil {
|
if err := w.writeOffsetTable(w.postings); err != nil {
|
||||||
return err
|
return err
|
||||||
|
@ -404,10 +403,8 @@ func (w *indexWriter) WriteLabelIndex(names []string, values []string) error {
|
||||||
|
|
||||||
// writeOffsetTable writes a sequence of readable hash entries.
|
// writeOffsetTable writes a sequence of readable hash entries.
|
||||||
func (w *indexWriter) writeOffsetTable(entries []hashEntry) error {
|
func (w *indexWriter) writeOffsetTable(entries []hashEntry) error {
|
||||||
w.buf1.reset()
|
|
||||||
w.buf1.putBE32int(len(entries))
|
|
||||||
|
|
||||||
w.buf2.reset()
|
w.buf2.reset()
|
||||||
|
w.buf2.putBE32int(len(entries))
|
||||||
|
|
||||||
for _, e := range entries {
|
for _, e := range entries {
|
||||||
w.buf2.putUvarint(len(e.keys))
|
w.buf2.putUvarint(len(e.keys))
|
||||||
|
@ -417,6 +414,7 @@ func (w *indexWriter) writeOffsetTable(entries []hashEntry) error {
|
||||||
w.buf2.putUvarint64(e.offset)
|
w.buf2.putUvarint64(e.offset)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
w.buf1.reset()
|
||||||
w.buf1.putBE32int(w.buf2.len())
|
w.buf1.putBE32int(w.buf2.len())
|
||||||
w.buf2.putHash(w.crc32)
|
w.buf2.putHash(w.crc32)
|
||||||
|
|
||||||
|
@ -563,6 +561,12 @@ type indexReader struct {
|
||||||
// Cached hashmaps of section offsets.
|
// Cached hashmaps of section offsets.
|
||||||
labels map[string]uint32
|
labels map[string]uint32
|
||||||
postings map[string]uint32
|
postings map[string]uint32
|
||||||
|
// Cache of read symbols. Strings that are returned when reading from the
|
||||||
|
// block are always backed by true strings held in here rather than
|
||||||
|
// strings that are backed by byte slices from the mmap'd index file. This
|
||||||
|
// prevents memory faults when applications work with read symbols after
|
||||||
|
// the block has been unmapped.
|
||||||
|
symbols map[uint32]string
|
||||||
}
|
}
|
||||||
|
|
||||||
var (
|
var (
|
||||||
|
@ -579,7 +583,11 @@ func newIndexReader(dir string) (*indexReader, error) {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
r := &indexReader{b: f.b, c: f}
|
r := &indexReader{
|
||||||
|
b: f.b,
|
||||||
|
c: f,
|
||||||
|
symbols: map[uint32]string{},
|
||||||
|
}
|
||||||
|
|
||||||
// Verify magic number.
|
// Verify magic number.
|
||||||
if len(f.b) < 4 {
|
if len(f.b) < 4 {
|
||||||
|
@ -592,6 +600,9 @@ func newIndexReader(dir string) (*indexReader, error) {
|
||||||
if err := r.readTOC(); err != nil {
|
if err := r.readTOC(); err != nil {
|
||||||
return nil, errors.Wrap(err, "read TOC")
|
return nil, errors.Wrap(err, "read TOC")
|
||||||
}
|
}
|
||||||
|
if err := r.readSymbols(int(r.toc.symbols)); err != nil {
|
||||||
|
return nil, errors.Wrap(err, "read symbols")
|
||||||
|
}
|
||||||
|
|
||||||
r.labels, err = r.readOffsetTable(r.toc.labelIndicesTable)
|
r.labels, err = r.readOffsetTable(r.toc.labelIndicesTable)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -623,21 +634,40 @@ func (r *indexReader) decbufAt(off int) decbuf {
|
||||||
return decbuf{b: r.b[off:]}
|
return decbuf{b: r.b[off:]}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// readSymbols reads the symbol table fully into memory and allocates proper strings for them.
|
||||||
|
// Strings backed by the mmap'd memory would cause memory faults if applications keep using them
|
||||||
|
// after the reader is closed.
|
||||||
|
func (r *indexReader) readSymbols(off int) error {
|
||||||
|
if off == 0 {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
var (
|
||||||
|
d1 = r.decbufAt(int(off))
|
||||||
|
d2 = d1.decbuf(d1.be32int())
|
||||||
|
origLen = d2.len()
|
||||||
|
cnt = d2.be32int()
|
||||||
|
basePos = uint32(off) + 4
|
||||||
|
nextPos = basePos + uint32(origLen-d2.len())
|
||||||
|
)
|
||||||
|
for d2.err() == nil && d2.len() > 0 && cnt > 0 {
|
||||||
|
s := d2.uvarintStr()
|
||||||
|
r.symbols[uint32(nextPos)] = s
|
||||||
|
|
||||||
|
nextPos = basePos + uint32(origLen-d2.len())
|
||||||
|
cnt--
|
||||||
|
}
|
||||||
|
return d2.err()
|
||||||
|
}
|
||||||
|
|
||||||
// readOffsetTable reads an offset table at the given position and returns a map
|
// readOffsetTable reads an offset table at the given position and returns a map
|
||||||
// with the key strings concatenated by the 0xff unicode non-character.
|
// with the key strings concatenated by the 0xff unicode non-character.
|
||||||
func (r *indexReader) readOffsetTable(off uint64) (map[string]uint32, error) {
|
func (r *indexReader) readOffsetTable(off uint64) (map[string]uint32, error) {
|
||||||
// A table might not have been written at all, in which case the position
|
|
||||||
// is zeroed out.
|
|
||||||
if off == 0 {
|
|
||||||
return nil, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
const sep = "\xff"
|
const sep = "\xff"
|
||||||
|
|
||||||
var (
|
var (
|
||||||
d1 = r.decbufAt(int(off))
|
d1 = r.decbufAt(int(off))
|
||||||
cnt = d1.be32()
|
|
||||||
d2 = d1.decbuf(d1.be32int())
|
d2 = d1.decbuf(d1.be32int())
|
||||||
|
cnt = d2.be32()
|
||||||
)
|
)
|
||||||
|
|
||||||
res := make(map[string]uint32, 512)
|
res := make(map[string]uint32, 512)
|
||||||
|
@ -647,7 +677,7 @@ func (r *indexReader) readOffsetTable(off uint64) (map[string]uint32, error) {
|
||||||
keys := make([]string, 0, keyCount)
|
keys := make([]string, 0, keyCount)
|
||||||
|
|
||||||
for i := 0; i < keyCount; i++ {
|
for i := 0; i < keyCount; i++ {
|
||||||
keys = append(keys, d2.uvarintTempStr())
|
keys = append(keys, d2.uvarintStr())
|
||||||
}
|
}
|
||||||
res[strings.Join(keys, sep)] = uint32(d2.uvarint())
|
res[strings.Join(keys, sep)] = uint32(d2.uvarint())
|
||||||
|
|
||||||
|
@ -682,28 +712,20 @@ func (r *indexReader) section(o uint32) (byte, []byte, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *indexReader) lookupSymbol(o uint32) (string, error) {
|
func (r *indexReader) lookupSymbol(o uint32) (string, error) {
|
||||||
d := r.decbufAt(int(o))
|
s, ok := r.symbols[o]
|
||||||
|
if !ok {
|
||||||
s := d.uvarintTempStr()
|
return "", errors.Errorf("unknown symbol offset %d", o)
|
||||||
if d.err() != nil {
|
|
||||||
return "", errors.Wrapf(d.err(), "read symbol at %d", o)
|
|
||||||
}
|
}
|
||||||
return s, nil
|
return s, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *indexReader) Symbols() (map[string]struct{}, error) {
|
func (r *indexReader) Symbols() (map[string]struct{}, error) {
|
||||||
d1 := r.decbufAt(int(r.toc.symbols))
|
res := make(map[string]struct{}, len(r.symbols))
|
||||||
d2 := d1.decbuf(d1.be32int())
|
|
||||||
|
|
||||||
count := d2.be32int()
|
for _, s := range r.symbols {
|
||||||
sym := make(map[string]struct{}, count)
|
res[s] = struct{}{}
|
||||||
|
|
||||||
for ; count > 0; count-- {
|
|
||||||
s := d2.uvarintTempStr()
|
|
||||||
sym[s] = struct{}{}
|
|
||||||
}
|
}
|
||||||
|
return res, nil
|
||||||
return sym, d2.err()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *indexReader) LabelValues(names ...string) (StringTuples, error) {
|
func (r *indexReader) LabelValues(names ...string) (StringTuples, error) {
|
||||||
|
|
79
vendor/github.com/prometheus/tsdb/pool.go
generated
vendored
79
vendor/github.com/prometheus/tsdb/pool.go
generated
vendored
|
@ -1,79 +0,0 @@
|
||||||
package tsdb
|
|
||||||
|
|
||||||
import "sync"
|
|
||||||
|
|
||||||
type bucketPool struct {
|
|
||||||
buckets []sync.Pool
|
|
||||||
sizes []int
|
|
||||||
new func(sz int) interface{}
|
|
||||||
}
|
|
||||||
|
|
||||||
func newBucketPool(minSize, maxSize int, factor float64, f func(sz int) interface{}) *bucketPool {
|
|
||||||
if minSize < 1 {
|
|
||||||
panic("invalid minimum pool size")
|
|
||||||
}
|
|
||||||
if maxSize < 1 {
|
|
||||||
panic("invalid maximum pool size")
|
|
||||||
}
|
|
||||||
if factor < 1 {
|
|
||||||
panic("invalid factor")
|
|
||||||
}
|
|
||||||
|
|
||||||
var sizes []int
|
|
||||||
|
|
||||||
for s := minSize; s <= maxSize; s = int(float64(s) * factor) {
|
|
||||||
sizes = append(sizes, s)
|
|
||||||
}
|
|
||||||
|
|
||||||
p := &bucketPool{
|
|
||||||
buckets: make([]sync.Pool, len(sizes)),
|
|
||||||
sizes: sizes,
|
|
||||||
new: f,
|
|
||||||
}
|
|
||||||
|
|
||||||
return p
|
|
||||||
}
|
|
||||||
|
|
||||||
func (p *bucketPool) get(sz int) interface{} {
|
|
||||||
for i, bktSize := range p.sizes {
|
|
||||||
if sz > bktSize {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
x := p.buckets[i].Get()
|
|
||||||
if x == nil {
|
|
||||||
x = p.new(sz)
|
|
||||||
}
|
|
||||||
return x
|
|
||||||
}
|
|
||||||
return p.new(sz)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (p *bucketPool) put(x interface{}, sz int) {
|
|
||||||
for i, bktSize := range p.sizes {
|
|
||||||
if sz > bktSize {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
p.buckets[i].Put(x)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
type poolUint64 struct {
|
|
||||||
p *bucketPool
|
|
||||||
}
|
|
||||||
|
|
||||||
func newPoolUint64(minSize, maxSize int, factor float64) poolUint64 {
|
|
||||||
return poolUint64{
|
|
||||||
p: newBucketPool(minSize, maxSize, factor, func(sz int) interface{} {
|
|
||||||
return make([]uint64, 0, sz)
|
|
||||||
}),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (p poolUint64) get(sz int) []uint64 {
|
|
||||||
return p.p.get(sz).([]uint64)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (p poolUint64) put(x []uint64) {
|
|
||||||
p.p.put(x[:0], cap(x))
|
|
||||||
}
|
|
53
vendor/github.com/prometheus/tsdb/wal.go
generated
vendored
53
vendor/github.com/prometheus/tsdb/wal.go
generated
vendored
|
@ -27,10 +27,12 @@ import (
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/coreos/etcd/pkg/fileutil"
|
"github.com/prometheus/tsdb/fileutil"
|
||||||
"github.com/go-kit/kit/log"
|
"github.com/go-kit/kit/log"
|
||||||
|
"github.com/go-kit/kit/log/level"
|
||||||
"github.com/pkg/errors"
|
"github.com/pkg/errors"
|
||||||
"github.com/prometheus/tsdb/labels"
|
"github.com/prometheus/tsdb/labels"
|
||||||
|
"github.com/prometheus/client_golang/prometheus"
|
||||||
)
|
)
|
||||||
|
|
||||||
// WALEntryType indicates what data a WAL entry contains.
|
// WALEntryType indicates what data a WAL entry contains.
|
||||||
|
@ -64,6 +66,26 @@ type SeriesCB func([]RefSeries) error
|
||||||
// is only valid until the call returns.
|
// is only valid until the call returns.
|
||||||
type DeletesCB func([]Stone) error
|
type DeletesCB func([]Stone) error
|
||||||
|
|
||||||
|
type walMetrics struct {
|
||||||
|
fsyncDuration prometheus.Summary
|
||||||
|
}
|
||||||
|
|
||||||
|
func newWalMetrics(wal *SegmentWAL, r prometheus.Registerer) *walMetrics {
|
||||||
|
m := &walMetrics{}
|
||||||
|
|
||||||
|
m.fsyncDuration = prometheus.NewSummary(prometheus.SummaryOpts{
|
||||||
|
Name: "tsdb_wal_fsync_duration_seconds",
|
||||||
|
Help: "Duration of WAL fsync.",
|
||||||
|
})
|
||||||
|
|
||||||
|
if r != nil {
|
||||||
|
r.MustRegister(
|
||||||
|
m.fsyncDuration,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
return m
|
||||||
|
}
|
||||||
|
|
||||||
// WAL is a write ahead log that can log new series labels and samples.
|
// WAL is a write ahead log that can log new series labels and samples.
|
||||||
// It must be completely read before new entries are logged.
|
// It must be completely read before new entries are logged.
|
||||||
type WAL interface {
|
type WAL interface {
|
||||||
|
@ -149,6 +171,7 @@ func newCRC32() hash.Hash32 {
|
||||||
// SegmentWAL is a write ahead log for series data.
|
// SegmentWAL is a write ahead log for series data.
|
||||||
type SegmentWAL struct {
|
type SegmentWAL struct {
|
||||||
mtx sync.Mutex
|
mtx sync.Mutex
|
||||||
|
metrics *walMetrics
|
||||||
|
|
||||||
dirFile *os.File
|
dirFile *os.File
|
||||||
files []*segmentFile
|
files []*segmentFile
|
||||||
|
@ -168,7 +191,7 @@ type SegmentWAL struct {
|
||||||
|
|
||||||
// OpenSegmentWAL opens or creates a write ahead log in the given directory.
|
// OpenSegmentWAL opens or creates a write ahead log in the given directory.
|
||||||
// The WAL must be read completely before new data is written.
|
// The WAL must be read completely before new data is written.
|
||||||
func OpenSegmentWAL(dir string, logger log.Logger, flushInterval time.Duration) (*SegmentWAL, error) {
|
func OpenSegmentWAL(dir string, logger log.Logger, flushInterval time.Duration, r prometheus.Registerer) (*SegmentWAL, error) {
|
||||||
if err := os.MkdirAll(dir, 0777); err != nil {
|
if err := os.MkdirAll(dir, 0777); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -189,6 +212,7 @@ func OpenSegmentWAL(dir string, logger log.Logger, flushInterval time.Duration)
|
||||||
segmentSize: walSegmentSizeBytes,
|
segmentSize: walSegmentSizeBytes,
|
||||||
crc32: newCRC32(),
|
crc32: newCRC32(),
|
||||||
}
|
}
|
||||||
|
w.metrics = newWalMetrics(w, r)
|
||||||
|
|
||||||
fns, err := sequenceFiles(w.dirFile.Name())
|
fns, err := sequenceFiles(w.dirFile.Name())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -228,7 +252,7 @@ func (r *repairingWALReader) Read(series SeriesCB, samples SamplesCB, deletes De
|
||||||
|
|
||||||
// truncate the WAL after the last valid entry.
|
// truncate the WAL after the last valid entry.
|
||||||
func (w *SegmentWAL) truncate(err error, file int, lastOffset int64) error {
|
func (w *SegmentWAL) truncate(err error, file int, lastOffset int64) error {
|
||||||
w.logger.Log("msg", "WAL corruption detected; truncating",
|
level.Error(w.logger).Log("msg", "WAL corruption detected; truncating",
|
||||||
"err", err, "file", w.files[file].Name(), "pos", lastOffset)
|
"err", err, "file", w.files[file].Name(), "pos", lastOffset)
|
||||||
|
|
||||||
// Close and delete all files after the current one.
|
// Close and delete all files after the current one.
|
||||||
|
@ -527,16 +551,16 @@ func (w *SegmentWAL) cut() error {
|
||||||
go func() {
|
go func() {
|
||||||
off, err := hf.Seek(0, os.SEEK_CUR)
|
off, err := hf.Seek(0, os.SEEK_CUR)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
w.logger.Log("msg", "finish old segment", "segment", hf.Name(), "err", err)
|
level.Error(w.logger).Log("msg", "finish old segment", "segment", hf.Name(), "err", err)
|
||||||
}
|
}
|
||||||
if err := hf.Truncate(off); err != nil {
|
if err := hf.Truncate(off); err != nil {
|
||||||
w.logger.Log("msg", "finish old segment", "segment", hf.Name(), "err", err)
|
level.Error(w.logger).Log("msg", "finish old segment", "segment", hf.Name(), "err", err)
|
||||||
}
|
}
|
||||||
if err := hf.Sync(); err != nil {
|
if err := hf.Sync(); err != nil {
|
||||||
w.logger.Log("msg", "finish old segment", "segment", hf.Name(), "err", err)
|
level.Error(w.logger).Log("msg", "finish old segment", "segment", hf.Name(), "err", err)
|
||||||
}
|
}
|
||||||
if err := hf.Close(); err != nil {
|
if err := hf.Close(); err != nil {
|
||||||
w.logger.Log("msg", "finish old segment", "segment", hf.Name(), "err", err)
|
level.Error(w.logger).Log("msg", "finish old segment", "segment", hf.Name(), "err", err)
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
|
@ -552,7 +576,7 @@ func (w *SegmentWAL) cut() error {
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
if err = w.dirFile.Sync(); err != nil {
|
if err = w.dirFile.Sync(); err != nil {
|
||||||
w.logger.Log("msg", "sync WAL directory", "err", err)
|
level.Error(w.logger).Log("msg", "sync WAL directory", "err", err)
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
|
@ -591,7 +615,10 @@ func (w *SegmentWAL) Sync() error {
|
||||||
}
|
}
|
||||||
if head != nil {
|
if head != nil {
|
||||||
// But only fsync the head segment after releasing the mutex as it will block on disk I/O.
|
// But only fsync the head segment after releasing the mutex as it will block on disk I/O.
|
||||||
return fileutil.Fdatasync(head.File)
|
start := time.Now()
|
||||||
|
err := fileutil.Fdatasync(head.File)
|
||||||
|
w.metrics.fsyncDuration.Observe(time.Since(start).Seconds())
|
||||||
|
return err
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -603,7 +630,11 @@ func (w *SegmentWAL) sync() error {
|
||||||
if w.head() == nil {
|
if w.head() == nil {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
return fileutil.Fdatasync(w.head().File)
|
|
||||||
|
start := time.Now()
|
||||||
|
err := fileutil.Fdatasync(w.head().File)
|
||||||
|
w.metrics.fsyncDuration.Observe(time.Since(start).Seconds())
|
||||||
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (w *SegmentWAL) flush() error {
|
func (w *SegmentWAL) flush() error {
|
||||||
|
@ -629,7 +660,7 @@ func (w *SegmentWAL) run(interval time.Duration) {
|
||||||
return
|
return
|
||||||
case <-tick:
|
case <-tick:
|
||||||
if err := w.Sync(); err != nil {
|
if err := w.Sync(); err != nil {
|
||||||
w.logger.Log("msg", "sync failed", "err", err)
|
level.Error(w.logger).Log("msg", "sync failed", "err", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
20
vendor/vendor.json
vendored
20
vendor/vendor.json
vendored
|
@ -867,22 +867,28 @@
|
||||||
"revisionTime": "2016-04-11T19:08:41Z"
|
"revisionTime": "2016-04-11T19:08:41Z"
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
"checksumSHA1": "evkeOdR0mTFS7yyREas6oa1QvHY=",
|
"checksumSHA1": "u1ERYVx8oD5cH3UkQunUTi9n1WI=",
|
||||||
"path": "github.com/prometheus/tsdb",
|
"path": "github.com/prometheus/tsdb",
|
||||||
"revision": "69f105f4f9478e929ef2a7d7553a7558b1de5c84",
|
"revision": "4a7c39d9d8e53151f2137df3b16b7173ce435397",
|
||||||
"revisionTime": "2017-09-21T12:57:51Z"
|
"revisionTime": "2017-10-05T07:27:10Z"
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
"checksumSHA1": "Gua979gmISm4cJP/fR2hL8m5To8=",
|
"checksumSHA1": "Gua979gmISm4cJP/fR2hL8m5To8=",
|
||||||
"path": "github.com/prometheus/tsdb/chunks",
|
"path": "github.com/prometheus/tsdb/chunks",
|
||||||
"revision": "69f105f4f9478e929ef2a7d7553a7558b1de5c84",
|
"revision": "4a7c39d9d8e53151f2137df3b16b7173ce435397",
|
||||||
"revisionTime": "2017-09-21T12:57:51Z"
|
"revisionTime": "2017-10-05T07:27:10Z"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"checksumSHA1": "7RhNAVcmDmLFqn9nWiudT0B76f8=",
|
||||||
|
"path": "github.com/prometheus/tsdb/fileutil",
|
||||||
|
"revision": "4a7c39d9d8e53151f2137df3b16b7173ce435397",
|
||||||
|
"revisionTime": "2017-10-05T07:27:10Z"
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
"checksumSHA1": "zhmlvc322RH1L3l9DaA9d/HVVWs=",
|
"checksumSHA1": "zhmlvc322RH1L3l9DaA9d/HVVWs=",
|
||||||
"path": "github.com/prometheus/tsdb/labels",
|
"path": "github.com/prometheus/tsdb/labels",
|
||||||
"revision": "69f105f4f9478e929ef2a7d7553a7558b1de5c84",
|
"revision": "4a7c39d9d8e53151f2137df3b16b7173ce435397",
|
||||||
"revisionTime": "2017-09-21T12:57:51Z"
|
"revisionTime": "2017-10-05T07:27:10Z"
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
"checksumSHA1": "5SYLEhADhdBVZAGPVHWggQl7H8k=",
|
"checksumSHA1": "5SYLEhADhdBVZAGPVHWggQl7H8k=",
|
||||||
|
|
Loading…
Reference in a new issue