prometheus/db.go

1005 lines
24 KiB
Go
Raw Normal View History

2017-04-10 11:59:45 -07:00
// Copyright 2017 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
2016-11-15 01:34:25 -08:00
// Package tsdb implements a time series storage for float64 sample data.
package tsdb
import (
"bytes"
2016-12-04 04:16:11 -08:00
"fmt"
2017-02-27 01:46:15 -08:00
"io"
"io/ioutil"
"math"
2016-12-04 04:16:11 -08:00
"os"
"path/filepath"
"runtime"
"sort"
2016-12-14 23:31:26 -08:00
"strconv"
"strings"
"sync"
"time"
2016-12-14 23:31:26 -08:00
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/oklog/ulid"
2017-01-03 06:43:26 -08:00
"github.com/pkg/errors"
2016-12-31 00:48:49 -08:00
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/tsdb/chunkenc"
"github.com/prometheus/tsdb/fileutil"
"github.com/prometheus/tsdb/labels"
"golang.org/x/sync/errgroup"
2016-11-15 01:34:25 -08:00
)
2016-12-09 01:00:14 -08:00
// DefaultOptions used for the DB. They are sane for setups using
2018-03-02 03:12:32 -08:00
// millisecond precision timestamps.
2016-11-15 01:34:25 -08:00
var DefaultOptions = &Options{
2017-02-09 17:54:26 -08:00
WALFlushInterval: 5 * time.Second,
RetentionDuration: 15 * 24 * 60 * 60 * 1000, // 15 days in milliseconds
2017-07-18 02:17:17 -07:00
BlockRanges: ExponentialBlockRanges(int64(2*time.Hour)/1e6, 3, 5),
2017-05-09 03:52:47 -07:00
NoLockfile: false,
2016-11-15 01:34:25 -08:00
}
// Options of the DB storage.
type Options struct {
2017-11-12 21:10:23 -08:00
// The interval at which the write ahead log is flushed to disk.
WALFlushInterval time.Duration
2017-02-09 17:54:26 -08:00
// Duration of persisted data to keep.
RetentionDuration uint64
// The sizes of the Blocks.
BlockRanges []int64
2017-05-09 03:52:47 -07:00
// NoLockfile disables creation and consideration of a lock file.
NoLockfile bool
2016-11-15 01:34:25 -08:00
}
// Appender allows appending a batch of data. It must be completed with a
// call to Commit or Rollback and must not be reused afterwards.
//
// Operations on the Appender interface are not goroutine-safe.
type Appender interface {
// Add adds a sample pair for the given series. A reference number is
// returned which can be used to add further samples in the same or later
// transactions.
// Returned reference numbers are ephemeral and may be rejected in calls
// to AddFast() at any point. Adding the sample via Add() returns a new
// reference number.
// If the reference is the empty string it must not be used for caching.
Add(l labels.Labels, t int64, v float64) (uint64, error)
// Add adds a sample pair for the referenced series. It is generally faster
// than adding a sample by providing its full label set.
AddFast(ref uint64, t int64, v float64) error
// Commit submits the collected samples and purges the batch.
Commit() error
// Rollback rolls back all modifications made in the appender so far.
Rollback() error
}
// DB handles reads and writes of time series falling into
// a hashed partition of a seriedb.
type DB struct {
dir string
lockf fileutil.Releaser
logger log.Logger
metrics *dbMetrics
opts *Options
chunkPool chunkenc.Pool
compactor Compactor
2016-12-09 01:00:14 -08:00
2017-07-14 00:00:22 -07:00
// Mutex for that must be held when modifying the general block layout.
mtx sync.RWMutex
blocks []*Block
head *Head
compactc chan struct{}
donec chan struct{}
stopc chan struct{}
// cmtx is used to control compactions and deletions.
cmtx sync.Mutex
compactionsEnabled bool
2016-12-09 01:00:14 -08:00
}
type dbMetrics struct {
loadedBlocks prometheus.GaugeFunc
reloads prometheus.Counter
reloadsFailed prometheus.Counter
compactionsTriggered prometheus.Counter
cutoffs prometheus.Counter
cutoffsFailed prometheus.Counter
tombCleanTimer prometheus.Histogram
2016-12-31 00:48:49 -08:00
}
2017-05-26 06:13:03 -07:00
func newDBMetrics(db *DB, r prometheus.Registerer) *dbMetrics {
m := &dbMetrics{}
2017-01-03 06:43:26 -08:00
2017-05-26 06:13:03 -07:00
m.loadedBlocks = prometheus.NewGaugeFunc(prometheus.GaugeOpts{
2017-10-20 03:32:32 -07:00
Name: "prometheus_tsdb_blocks_loaded",
2017-05-26 06:13:03 -07:00
Help: "Number of currently loaded data blocks",
}, func() float64 {
db.mtx.RLock()
defer db.mtx.RUnlock()
return float64(len(db.blocks))
})
m.reloads = prometheus.NewCounter(prometheus.CounterOpts{
2017-10-20 03:32:32 -07:00
Name: "prometheus_tsdb_reloads_total",
2017-05-26 06:13:03 -07:00
Help: "Number of times the database reloaded block data from disk.",
})
m.reloadsFailed = prometheus.NewCounter(prometheus.CounterOpts{
2017-10-20 03:32:32 -07:00
Name: "prometheus_tsdb_reloads_failures_total",
Help: "Number of times the database failed to reload block data from disk.",
2017-05-26 06:13:03 -07:00
})
m.compactionsTriggered = prometheus.NewCounter(prometheus.CounterOpts{
2017-10-20 03:32:32 -07:00
Name: "prometheus_tsdb_compactions_triggered_total",
Help: "Total number of triggered compactions for the partition.",
})
m.cutoffs = prometheus.NewCounter(prometheus.CounterOpts{
Name: "prometheus_tsdb_retention_cutoffs_total",
Help: "Number of times the database cut off block data from disk.",
})
m.cutoffsFailed = prometheus.NewCounter(prometheus.CounterOpts{
Name: "prometheus_tsdb_retention_cutoffs_failures_total",
Help: "Number of times the database failed to cut off block data from disk.",
})
m.tombCleanTimer = prometheus.NewHistogram(prometheus.HistogramOpts{
Name: "prometheus_tsdb_tombstone_cleanup_seconds",
Help: "The time taken to recompact blocks to remove tombstones.",
})
2016-12-31 00:48:49 -08:00
if r != nil {
r.MustRegister(
2017-05-26 06:13:03 -07:00
m.loadedBlocks,
m.reloads,
m.reloadsFailed,
m.cutoffs,
m.cutoffsFailed,
2017-01-09 10:14:21 -08:00
m.compactionsTriggered,
m.tombCleanTimer,
2016-12-31 00:48:49 -08:00
)
}
return m
}
// Open returns a new DB in the given directory.
func Open(dir string, l log.Logger, r prometheus.Registerer, opts *Options) (db *DB, err error) {
if err := os.MkdirAll(dir, 0777); err != nil {
return nil, err
}
2017-02-19 07:04:37 -08:00
if l == nil {
l = log.NewNopLogger()
2017-02-19 07:04:37 -08:00
}
2017-01-17 21:18:32 -08:00
if opts == nil {
opts = DefaultOptions
}
// Fixup bad format written by Prometheus 2.1.
2018-02-09 04:37:10 -08:00
if err := repairBadIndexVersion(l, dir); err != nil {
2018-02-09 04:11:03 -08:00
return nil, err
}
2017-01-17 21:18:32 -08:00
db = &DB{
dir: dir,
logger: l,
opts: opts,
compactc: make(chan struct{}, 1),
donec: make(chan struct{}),
stopc: make(chan struct{}),
compactionsEnabled: true,
chunkPool: chunkenc.NewPool(),
}
2017-05-26 06:13:03 -07:00
db.metrics = newDBMetrics(db, r)
2017-05-09 03:52:47 -07:00
if !opts.NoLockfile {
absdir, err := filepath.Abs(dir)
if err != nil {
return nil, err
}
lockf, _, err := fileutil.Flock(filepath.Join(absdir, "lock"))
2017-05-09 03:52:47 -07:00
if err != nil {
return nil, errors.Wrap(err, "lock DB directory")
2017-05-09 03:52:47 -07:00
}
db.lockf = lockf
2017-05-09 03:52:47 -07:00
}
2017-09-01 02:46:46 -07:00
db.compactor, err = NewLeveledCompactor(r, l, opts.BlockRanges, db.chunkPool)
if err != nil {
return nil, errors.Wrap(err, "create leveled compactor")
}
2017-10-04 12:51:34 -07:00
wal, err := OpenSegmentWAL(filepath.Join(dir, "wal"), l, opts.WALFlushInterval, r)
if err != nil {
return nil, err
}
2017-09-01 02:46:46 -07:00
db.head, err = NewHead(r, l, wal, opts.BlockRanges[0])
if err != nil {
2017-08-30 08:38:25 -07:00
return nil, err
}
if err := db.reload(); err != nil {
2016-12-14 23:31:26 -08:00
return nil, err
}
2017-09-06 07:20:37 -07:00
if err := db.head.ReadWAL(); err != nil {
return nil, errors.Wrap(err, "read WAL")
}
go db.run()
return db, nil
}
2017-06-08 03:14:13 -07:00
// Dir returns the directory of the database.
func (db *DB) Dir() string {
return db.dir
}
func (db *DB) run() {
defer close(db.donec)
backoff := time.Duration(0)
for {
select {
case <-db.stopc:
return
case <-time.After(backoff):
}
select {
case <-time.After(1 * time.Minute):
select {
case db.compactc <- struct{}{}:
default:
}
case <-db.compactc:
db.metrics.compactionsTriggered.Inc()
_, err1 := db.retentionCutoff()
if err1 != nil {
level.Error(db.logger).Log("msg", "retention cutoff failed", "err", err1)
2017-03-19 05:50:35 -07:00
}
_, err2 := db.compact()
if err2 != nil {
level.Error(db.logger).Log("msg", "compaction failed", "err", err2)
2017-03-19 05:50:35 -07:00
}
2017-03-17 07:30:05 -07:00
if err1 != nil || err2 != nil {
backoff = exponential(backoff, 1*time.Second, 1*time.Minute)
} else {
backoff = 0
}
case <-db.stopc:
return
}
}
}
func (db *DB) retentionCutoff() (b bool, err error) {
defer func() {
if !b && err == nil {
// no data had to be cut off.
return
}
db.metrics.cutoffs.Inc()
if err != nil {
db.metrics.cutoffsFailed.Inc()
}
}()
2017-03-17 07:30:05 -07:00
if db.opts.RetentionDuration == 0 {
return false, nil
}
2017-03-17 07:56:19 -07:00
db.mtx.RLock()
blocks := db.blocks[:]
db.mtx.RUnlock()
2017-03-17 07:56:19 -07:00
if len(blocks) == 0 {
2017-03-17 07:30:05 -07:00
return false, nil
}
last := blocks[len(db.blocks)-1]
2017-03-17 07:30:05 -07:00
mint := last.Meta().MaxTime - int64(db.opts.RetentionDuration)
dirs, err := retentionCutoffDirs(db.dir, mint)
if err != nil {
return false, err
}
2017-03-17 07:30:05 -07:00
// This will close the dirs and then delete the dirs.
if len(dirs) > 0 {
return true, db.reload(dirs...)
}
return false, nil
2017-03-17 07:30:05 -07:00
}
// Appender opens a new appender against the database.
func (db *DB) Appender() Appender {
return dbAppender{db: db, Appender: db.head.Appender()}
}
// dbAppender wraps the DB's head appender and triggers compactions on commit
// if necessary.
type dbAppender struct {
Appender
db *DB
}
func (a dbAppender) Commit() error {
err := a.Appender.Commit()
2017-09-04 06:07:30 -07:00
// We could just run this check every few minutes practically. But for benchmarks
// and high frequency use cases this is the safer way.
if a.db.head.MaxTime()-a.db.head.MinTime() > a.db.head.chunkRange/2*3 {
select {
case a.db.compactc <- struct{}{}:
default:
}
}
return err
}
func (db *DB) compact() (changes bool, err error) {
db.cmtx.Lock()
defer db.cmtx.Unlock()
2017-01-17 21:18:32 -08:00
if !db.compactionsEnabled {
return false, nil
}
// Check whether we have pending head blocks that are ready to be persisted.
// They have the highest priority.
for {
select {
case <-db.stopc:
return changes, nil
default:
}
// The head has a compactable range if 1.5 level 0 ranges are between the oldest
// and newest timestamp. The 0.5 acts as a buffer of the appendable window.
if db.head.MaxTime()-db.head.MinTime() <= db.opts.BlockRanges[0]/2*3 {
break
}
mint, maxt := rangeForTimestamp(db.head.MinTime(), db.opts.BlockRanges[0])
// Wrap head into a range that bounds all reads to it.
head := &rangeHead{
head: db.head,
mint: mint,
maxt: maxt,
}
2017-11-14 06:25:30 -08:00
if _, err = db.compactor.Write(db.dir, head, mint, maxt); err != nil {
return changes, errors.Wrap(err, "persist head block")
}
changes = true
2017-08-09 02:10:29 -07:00
runtime.GC()
if err := db.reload(); err != nil {
return changes, errors.Wrap(err, "reload blocks")
2017-08-09 02:10:29 -07:00
}
runtime.GC()
}
2017-03-02 00:13:29 -08:00
// Check for compactions of multiple blocks.
for {
2017-08-09 02:10:29 -07:00
plan, err := db.compactor.Plan(db.dir)
2017-03-02 00:13:29 -08:00
if err != nil {
return changes, errors.Wrap(err, "plan compaction")
2017-03-02 00:13:29 -08:00
}
2017-08-09 02:10:29 -07:00
if len(plan) == 0 {
break
}
2017-03-02 00:13:29 -08:00
select {
case <-db.stopc:
return changes, nil
2017-03-02 00:13:29 -08:00
default:
}
2017-12-05 02:49:22 -08:00
if _, err := db.compactor.Compact(db.dir, plan...); err != nil {
2017-08-09 02:10:29 -07:00
return changes, errors.Wrapf(err, "compact %s", plan)
}
changes = true
runtime.GC()
if err := db.reload(plan...); err != nil {
return changes, errors.Wrap(err, "reload blocks")
}
runtime.GC()
}
return changes, nil
2017-02-09 17:54:26 -08:00
}
// retentionCutoffDirs returns all directories of blocks in dir that are strictly
2017-03-17 07:30:05 -07:00
// before mint.
func retentionCutoffDirs(dir string, mint int64) ([]string, error) {
2017-03-19 05:50:35 -07:00
df, err := fileutil.OpenDir(dir)
if err != nil {
return nil, errors.Wrapf(err, "open directory")
2017-03-19 05:50:35 -07:00
}
2017-06-11 15:05:04 -07:00
defer df.Close()
2017-03-17 07:30:05 -07:00
dirs, err := blockDirs(dir)
if err != nil {
return nil, errors.Wrapf(err, "list block dirs %s", dir)
2017-03-17 07:30:05 -07:00
}
delDirs := []string{}
2017-03-17 07:30:05 -07:00
for _, dir := range dirs {
meta, err := readMetaFile(dir)
if err != nil {
return nil, errors.Wrapf(err, "read block meta %s", dir)
2017-03-17 07:30:05 -07:00
}
// The first block we encounter marks that we crossed the boundary
// of deletable blocks.
if meta.MaxTime >= mint {
break
}
delDirs = append(delDirs, dir)
2017-03-17 07:30:05 -07:00
}
return delDirs, nil
2017-03-17 07:30:05 -07:00
}
2017-03-02 00:13:29 -08:00
func (db *DB) getBlock(id ulid.ULID) (*Block, bool) {
for _, b := range db.blocks {
if b.Meta().ULID == id {
return b, true
}
}
return nil, false
}
func stringsContain(set []string, elem string) bool {
for _, e := range set {
if elem == e {
return true
}
}
return false
}
// reload on-disk blocks and trigger head truncation if new blocks appeared. It takes
// a list of block directories which should be deleted during reload.
func (db *DB) reload(deleteable ...string) (err error) {
2017-08-30 08:38:25 -07:00
defer func() {
2017-05-26 06:13:03 -07:00
if err != nil {
db.metrics.reloadsFailed.Inc()
}
db.metrics.reloads.Inc()
2017-08-30 08:38:25 -07:00
}()
2017-05-26 06:13:03 -07:00
2017-03-02 00:13:29 -08:00
dirs, err := blockDirs(db.dir)
if err != nil {
return errors.Wrap(err, "find blocks")
}
2017-02-09 17:54:26 -08:00
var (
blocks []*Block
exist = map[ulid.ULID]struct{}{}
2017-02-09 17:54:26 -08:00
)
2017-03-02 00:13:29 -08:00
for _, dir := range dirs {
meta, err := readMetaFile(dir)
if err != nil {
return errors.Wrapf(err, "read meta information %s", dir)
2017-02-09 17:54:26 -08:00
}
// If the block is pending for deletion, don't add it to the new block set.
if stringsContain(deleteable, dir) {
continue
}
2016-12-14 23:31:26 -08:00
b, ok := db.getBlock(meta.ULID)
if !ok {
b, err = OpenBlock(dir, db.chunkPool)
if err != nil {
return errors.Wrapf(err, "open block %s", dir)
2017-03-02 00:13:29 -08:00
}
}
2016-12-22 03:05:24 -08:00
blocks = append(blocks, b)
exist[meta.ULID] = struct{}{}
2016-12-09 01:00:14 -08:00
}
sort.Slice(blocks, func(i, j int) bool {
return blocks[i].Meta().MinTime < blocks[j].Meta().MinTime
})
if err := validateBlockSequence(blocks); err != nil {
return errors.Wrap(err, "invalid block sequence")
}
// Swap in new blocks first for subsequently created readers to be seen.
// Then close previous blocks, which may block for pending readers to complete.
db.mtx.Lock()
oldBlocks := db.blocks
db.blocks = blocks
db.mtx.Unlock()
for _, b := range oldBlocks {
if _, ok := exist[b.Meta().ULID]; ok {
continue
}
if err := b.Close(); err != nil {
level.Warn(db.logger).Log("msg", "closing block failed", "err", err)
}
if err := os.RemoveAll(b.Dir()); err != nil {
level.Warn(db.logger).Log("msg", "deleting block failed", "err", err)
}
}
// Garbage collect data in the head if the most recent persisted block
// covers data of its current time range.
if len(blocks) == 0 {
return nil
}
maxt := blocks[len(blocks)-1].Meta().MaxTime
2017-09-01 05:38:49 -07:00
return errors.Wrap(db.head.Truncate(maxt), "head truncate failed")
}
2017-01-03 06:43:26 -08:00
// validateBlockSequence returns error if given block meta files indicate that some blocks overlaps within sequence.
func validateBlockSequence(bs []*Block) error {
if len(bs) <= 1 {
return nil
}
var metas []BlockMeta
for _, b := range bs {
metas = append(metas, b.meta)
}
overlaps := OverlappingBlocks(metas)
if len(overlaps) > 0 {
return errors.Errorf("block time ranges overlap: %s", overlaps)
}
return nil
}
// TimeRange specifies minTime and maxTime range.
type TimeRange struct {
Min, Max int64
}
// Overlaps contains overlapping blocks aggregated by overlapping range.
type Overlaps map[TimeRange][]BlockMeta
// String returns human readable string form of overlapped blocks.
func (o Overlaps) String() string {
var res []string
for r, overlaps := range o {
var groups []string
for _, m := range overlaps {
groups = append(groups, fmt.Sprintf(
"<ulid: %s, mint: %d, maxt: %d, range: %s>",
m.ULID.String(),
m.MinTime,
m.MaxTime,
(time.Duration((m.MaxTime-m.MinTime)/1000)*time.Second).String(),
))
}
res = append(res, fmt.Sprintf(
"[mint: %d, maxt: %d, range: %s, blocks: %d]: %s",
r.Min, r.Max,
(time.Duration((r.Max-r.Min)/1000)*time.Second).String(),
len(overlaps),
strings.Join(groups, ", ")),
)
}
return strings.Join(res, "\n")
}
// OverlappingBlocks returns all overlapping blocks from given meta files.
func OverlappingBlocks(bm []BlockMeta) Overlaps {
if len(bm) <= 1 {
return nil
}
sort.Slice(bm, func(i, j int) bool {
return bm[i].MinTime < bm[j].MinTime
})
var (
overlaps [][]BlockMeta
// pending contains not ended blocks in regards to "current" timestamp.
pending = []BlockMeta{bm[0]}
// continuousPending helps to aggregate same overlaps to single group.
continuousPending = true
)
// We have here blocks sorted by minTime. We iterate over each block and treat its minTime as our "current" timestamp.
// We check if any of the pending block finished (blocks that we have seen before, but their maxTime was still ahead current
// timestamp). If not, it means they overlap with our current block. In the same time current block is assumed pending.
for _, b := range bm[1:] {
var newPending []BlockMeta
for _, p := range pending {
// "b.MinTime" is our current time.
if b.MinTime >= p.MaxTime {
continuousPending = false
continue
}
// "p" overlaps with "b" and "p" is still pending.
newPending = append(newPending, p)
}
// Our block "b" is now pending.
pending = append(newPending, b)
if len(newPending) == 0 {
// No overlaps.
continue
}
if continuousPending && len(overlaps) > 0 {
overlaps[len(overlaps)-1] = append(overlaps[len(overlaps)-1], b)
continue
}
overlaps = append(overlaps, append(newPending, b))
// Start new pendings.
continuousPending = true
}
// Fetch the critical overlapped time range foreach overlap groups.
overlapGroups := Overlaps{}
for _, overlap := range overlaps {
minRange := TimeRange{Min: 0, Max: math.MaxInt64}
for _, b := range overlap {
if minRange.Max > b.MaxTime {
minRange.Max = b.MaxTime
}
if minRange.Min < b.MinTime {
minRange.Min = b.MinTime
}
}
overlapGroups[minRange] = overlap
}
return overlapGroups
2017-01-02 13:24:35 -08:00
}
func (db *DB) String() string {
return "HEAD"
}
// Blocks returns the databases persisted blocks.
func (db *DB) Blocks() []*Block {
db.mtx.RLock()
defer db.mtx.RUnlock()
return db.blocks
}
// Head returns the databases's head.
func (db *DB) Head() *Head {
return db.head
}
2017-01-05 23:08:02 -08:00
// Close the partition.
func (db *DB) Close() error {
close(db.stopc)
<-db.donec
db.mtx.Lock()
defer db.mtx.Unlock()
var g errgroup.Group
// blocks also contains all head blocks.
for _, pb := range db.blocks {
g.Go(pb.Close)
2016-12-14 23:31:26 -08:00
}
var merr MultiError
merr.Add(g.Wait())
2017-05-09 03:52:47 -07:00
if db.lockf != nil {
merr.Add(db.lockf.Release())
2017-05-09 03:52:47 -07:00
}
merr.Add(db.head.Close())
2017-01-02 13:24:35 -08:00
return merr.Err()
2016-12-09 01:00:14 -08:00
}
// DisableCompactions disables compactions.
func (db *DB) DisableCompactions() {
db.cmtx.Lock()
defer db.cmtx.Unlock()
db.compactionsEnabled = false
level.Info(db.logger).Log("msg", "compactions disabled")
}
// EnableCompactions enables compactions.
func (db *DB) EnableCompactions() {
db.cmtx.Lock()
defer db.cmtx.Unlock()
db.compactionsEnabled = true
level.Info(db.logger).Log("msg", "compactions enabled")
}
2018-02-28 03:04:55 -08:00
// Snapshot writes the current data to the directory. If withHead is set to true it
// will create a new block containing all data that's currently in the memory buffer/WAL.
func (db *DB) Snapshot(dir string, withHead bool) error {
if dir == db.dir {
return errors.Errorf("cannot snapshot into base directory")
}
if _, err := ulid.Parse(dir); err == nil {
return errors.Errorf("dir must not be a valid ULID")
}
db.cmtx.Lock()
defer db.cmtx.Unlock()
db.mtx.RLock()
defer db.mtx.RUnlock()
for _, b := range db.blocks {
level.Info(db.logger).Log("msg", "snapshotting block", "block", b)
if err := b.Snapshot(dir); err != nil {
return errors.Wrapf(err, "error snapshotting block: %s", b.Dir())
}
}
2018-02-28 03:04:55 -08:00
if !withHead {
return nil
}
2017-11-14 06:25:30 -08:00
_, err := db.compactor.Write(dir, db.head, db.head.MinTime(), db.head.MaxTime())
return errors.Wrap(err, "snapshot head block")
}
2017-07-14 00:00:22 -07:00
// Querier returns a new querier over the data partition for the given time range.
// A goroutine must not handle more than one open Querier.
func (db *DB) Querier(mint, maxt int64) (Querier, error) {
var blocks []BlockReader
db.mtx.RLock()
defer db.mtx.RUnlock()
for _, b := range db.blocks {
m := b.Meta()
if intervalOverlap(mint, maxt, m.MinTime, m.MaxTime) {
blocks = append(blocks, b)
}
}
if maxt >= db.head.MinTime() {
blocks = append(blocks, db.head)
}
sq := &querier{
blocks: make([]Querier, 0, len(blocks)),
}
for _, b := range blocks {
q, err := NewBlockQuerier(b, mint, maxt)
if err == nil {
sq.blocks = append(sq.blocks, q)
continue
}
// If we fail, all previously opened queriers must be closed.
for _, q := range sq.blocks {
q.Close()
}
return nil, errors.Wrapf(err, "open querier for block %s", b)
}
return sq, nil
}
func rangeForTimestamp(t int64, width int64) (mint, maxt int64) {
mint = (t / width) * width
return mint, mint + width
}
// Delete implements deletion of metrics. It only has atomicity guarantees on a per-block basis.
func (db *DB) Delete(mint, maxt int64, ms ...labels.Matcher) error {
db.cmtx.Lock()
defer db.cmtx.Unlock()
2017-07-14 00:00:22 -07:00
var g errgroup.Group
db.mtx.RLock()
defer db.mtx.RUnlock()
for _, b := range db.blocks {
m := b.Meta()
if intervalOverlap(mint, maxt, m.MinTime, m.MaxTime) {
g.Go(func(b *Block) func() error {
return func() error { return b.Delete(mint, maxt, ms...) }
}(b))
}
}
g.Go(func() error {
return db.head.Delete(mint, maxt, ms...)
})
return g.Wait()
}
// CleanTombstones re-writes any blocks with tombstones.
func (db *DB) CleanTombstones() (err error) {
db.cmtx.Lock()
defer db.cmtx.Unlock()
start := time.Now()
defer db.metrics.tombCleanTimer.Observe(time.Since(start).Seconds())
newUIDs := []ulid.ULID{}
defer func() {
// If any error is caused, we need to delete all the new directory created.
if err != nil {
for _, uid := range newUIDs {
dir := filepath.Join(db.Dir(), uid.String())
if err := os.RemoveAll(dir); err != nil {
level.Error(db.logger).Log("msg", "failed to delete block after failed `CleanTombstones`", "dir", dir, "err", err)
}
}
}
}()
db.mtx.RLock()
blocks := db.blocks[:]
db.mtx.RUnlock()
deletable := []string{}
for _, b := range blocks {
uid, er := b.CleanTombstones(db.Dir(), db.compactor)
if er != nil {
err = errors.Wrapf(er, "clean tombstones: %s", b.Dir())
return err
}
if uid != nil { // New block was created.
deletable = append(deletable, b.Dir())
newUIDs = append(newUIDs, *uid)
}
}
if len(deletable) == 0 {
return nil
}
return errors.Wrap(db.reload(deletable...), "reload blocks")
}
func intervalOverlap(amin, amax, bmin, bmax int64) bool {
// Checks Overlap: http://stackoverflow.com/questions/3269434/
return amin <= bmax && bmin <= amax
}
func isBlockDir(fi os.FileInfo) bool {
if !fi.IsDir() {
return false
}
_, err := ulid.Parse(fi.Name())
return err == nil
}
func blockDirs(dir string) ([]string, error) {
files, err := ioutil.ReadDir(dir)
if err != nil {
return nil, err
}
var dirs []string
for _, fi := range files {
if isBlockDir(fi) {
dirs = append(dirs, filepath.Join(dir, fi.Name()))
}
}
return dirs, nil
2017-01-03 06:43:26 -08:00
}
2016-12-09 04:41:38 -08:00
func sequenceFiles(dir string) ([]string, error) {
2017-02-13 23:53:19 -08:00
files, err := ioutil.ReadDir(dir)
if err != nil {
return nil, err
}
var res []string
for _, fi := range files {
if _, err := strconv.ParseUint(fi.Name(), 10, 64); err != nil {
continue
2017-02-13 23:53:19 -08:00
}
res = append(res, filepath.Join(dir, fi.Name()))
2017-02-13 23:53:19 -08:00
}
return res, nil
}
func nextSequenceFile(dir string) (string, int, error) {
names, err := fileutil.ReadDir(dir)
if err != nil {
return "", 0, err
}
i := uint64(0)
for _, n := range names {
j, err := strconv.ParseUint(n, 10, 64)
if err != nil {
continue
}
i = j
}
return filepath.Join(dir, fmt.Sprintf("%0.6d", i+1)), int(i + 1), nil
}
2016-12-09 04:41:38 -08:00
// The MultiError type implements the error interface, and contains the
// Errors used to construct it.
type MultiError []error
// Returns a concatenated string of the contained errors
func (es MultiError) Error() string {
var buf bytes.Buffer
if len(es) > 1 {
fmt.Fprintf(&buf, "%d errors: ", len(es))
2016-12-08 01:04:24 -08:00
}
for i, err := range es {
if i != 0 {
buf.WriteString("; ")
}
buf.WriteString(err.Error())
}
return buf.String()
2016-11-15 01:34:25 -08:00
}
2016-12-14 23:31:26 -08:00
// Add adds the error to the error list if it is not nil.
func (es *MultiError) Add(err error) {
if err == nil {
return
}
if merr, ok := err.(MultiError); ok {
*es = append(*es, merr...)
} else {
*es = append(*es, err)
2016-12-14 23:31:26 -08:00
}
}
// Err returns the error list as an error or nil if it is empty.
2016-12-14 23:31:26 -08:00
func (es MultiError) Err() error {
if len(es) == 0 {
return nil
}
return es
}
2017-02-27 01:46:15 -08:00
func closeAll(cs ...io.Closer) error {
var merr MultiError
for _, c := range cs {
merr.Add(c.Close())
}
return merr.Err()
}
func exponential(d, min, max time.Duration) time.Duration {
d *= 2
if d < min {
d = min
}
if d > max {
d = max
}
return d
}