prometheus/db.go

1021 lines
23 KiB
Go
Raw Normal View History

2017-04-10 11:59:45 -07:00
// Copyright 2017 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
2016-11-15 01:34:25 -08:00
// Package tsdb implements a time series storage for float64 sample data.
package tsdb
import (
"bytes"
2016-12-04 04:16:11 -08:00
"fmt"
2017-02-27 01:46:15 -08:00
"io"
"io/ioutil"
2016-12-04 04:16:11 -08:00
"os"
"path/filepath"
"runtime"
"sort"
2016-12-14 23:31:26 -08:00
"strconv"
"strings"
"sync"
"time"
"unsafe"
2016-11-15 01:34:25 -08:00
2016-12-14 23:31:26 -08:00
"golang.org/x/sync/errgroup"
2017-01-03 06:43:26 -08:00
"github.com/coreos/etcd/pkg/fileutil"
"github.com/go-kit/kit/log"
"github.com/nightlyone/lockfile"
"github.com/oklog/ulid"
2017-01-03 06:43:26 -08:00
"github.com/pkg/errors"
2016-12-31 00:48:49 -08:00
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/tsdb/labels"
2016-11-15 01:34:25 -08:00
)
2016-12-09 01:00:14 -08:00
// DefaultOptions used for the DB. They are sane for setups using
// millisecond precision timestampdb.
2016-11-15 01:34:25 -08:00
var DefaultOptions = &Options{
2017-02-09 17:54:26 -08:00
WALFlushInterval: 5 * time.Second,
RetentionDuration: 15 * 24 * 60 * 60 * 1000, // 15 days in milliseconds
2017-07-18 02:17:17 -07:00
BlockRanges: ExponentialBlockRanges(int64(2*time.Hour)/1e6, 3, 5),
2017-05-09 03:52:47 -07:00
NoLockfile: false,
2016-11-15 01:34:25 -08:00
}
// Options of the DB storage.
type Options struct {
// The interval at which the write ahead log is flushed to disc.
WALFlushInterval time.Duration
2017-02-09 17:54:26 -08:00
// Duration of persisted data to keep.
RetentionDuration uint64
// The sizes of the Blocks.
BlockRanges []int64
2017-05-09 03:52:47 -07:00
// NoLockfile disables creation and consideration of a lock file.
NoLockfile bool
2016-11-15 01:34:25 -08:00
}
// Appender allows appending a batch of data. It must be completed with a
// call to Commit or Rollback and must not be reused afterwards.
//
// Operations on the Appender interface are not goroutine-safe.
type Appender interface {
// Add adds a sample pair for the given series. A reference number is
// returned which can be used to add further samples in the same or later
// transactions.
// Returned reference numbers are ephemeral and may be rejected in calls
// to AddFast() at any point. Adding the sample via Add() returns a new
// reference number.
// If the reference is the empty string it must not be used for caching.
2017-05-17 07:43:01 -07:00
Add(l labels.Labels, t int64, v float64) (string, error)
// Add adds a sample pair for the referenced series. It is generally faster
// than adding a sample by providing its full label set.
2017-05-17 07:43:01 -07:00
AddFast(ref string, t int64, v float64) error
// Commit submits the collected samples and purges the batch.
Commit() error
// Rollback rolls back all modifications made in the appender so far.
Rollback() error
}
// DB handles reads and writes of time series falling into
// a hashed partition of a seriedb.
type DB struct {
dir string
2017-05-09 03:52:47 -07:00
lockf *lockfile.Lockfile
2017-01-02 13:24:35 -08:00
logger log.Logger
metrics *dbMetrics
2017-01-17 21:18:32 -08:00
opts *Options
2016-12-09 01:00:14 -08:00
2017-07-14 00:00:22 -07:00
// Mutex for that must be held when modifying the general block layout.
mtx sync.RWMutex
blocks []Block
// Mutex that must be held when modifying just the head blocks
// or the general layout.
2017-07-14 00:00:22 -07:00
// mtx must be held before acquiring.
headmtx sync.RWMutex
heads []headBlock
2017-03-02 00:13:29 -08:00
compactor Compactor
compactc chan struct{}
donec chan struct{}
stopc chan struct{}
// cmtx is used to control compactions and deletions.
cmtx sync.Mutex
compactionsEnabled bool
2016-12-09 01:00:14 -08:00
}
type dbMetrics struct {
2017-05-26 06:13:03 -07:00
activeAppenders prometheus.Gauge
loadedBlocks prometheus.GaugeFunc
reloads prometheus.Counter
reloadsFailed prometheus.Counter
reloadDuration prometheus.Summary
samplesAppended prometheus.Counter
compactionsTriggered prometheus.Counter
2016-12-31 00:48:49 -08:00
}
2017-05-26 06:13:03 -07:00
func newDBMetrics(db *DB, r prometheus.Registerer) *dbMetrics {
m := &dbMetrics{}
2017-01-03 06:43:26 -08:00
2017-05-26 06:13:03 -07:00
m.activeAppenders = prometheus.NewGauge(prometheus.GaugeOpts{
Name: "tsdb_active_appenders",
Help: "Number of currently active appender transactions",
})
m.loadedBlocks = prometheus.NewGaugeFunc(prometheus.GaugeOpts{
Name: "tsdb_blocks_loaded",
Help: "Number of currently loaded data blocks",
}, func() float64 {
db.mtx.RLock()
defer db.mtx.RUnlock()
return float64(len(db.blocks))
})
m.reloads = prometheus.NewCounter(prometheus.CounterOpts{
Name: "tsdb_reloads_total",
Help: "Number of times the database reloaded block data from disk.",
})
m.reloadsFailed = prometheus.NewCounter(prometheus.CounterOpts{
Name: "tsdb_reloads_failures_total",
Help: "Number of times the database failed to reload black data from disk.",
})
m.reloadDuration = prometheus.NewSummary(prometheus.SummaryOpts{
Name: "tsdb_reload_duration_seconds",
Help: "Duration of block reloads.",
})
2017-01-03 06:43:26 -08:00
m.samplesAppended = prometheus.NewCounter(prometheus.CounterOpts{
Name: "tsdb_samples_appended_total",
Help: "Total number of appended sampledb.",
2017-01-03 06:43:26 -08:00
})
m.compactionsTriggered = prometheus.NewCounter(prometheus.CounterOpts{
Name: "tsdb_compactions_triggered_total",
Help: "Total number of triggered compactions for the partition.",
})
2016-12-31 00:48:49 -08:00
if r != nil {
r.MustRegister(
2017-05-26 06:13:03 -07:00
m.activeAppenders,
m.loadedBlocks,
m.reloads,
m.reloadsFailed,
m.reloadDuration,
2016-12-31 00:48:49 -08:00
m.samplesAppended,
2017-01-09 10:14:21 -08:00
m.compactionsTriggered,
2016-12-31 00:48:49 -08:00
)
}
return m
}
// Open returns a new DB in the given directory.
func Open(dir string, l log.Logger, r prometheus.Registerer, opts *Options) (db *DB, err error) {
if err := os.MkdirAll(dir, 0777); err != nil {
return nil, err
}
2017-02-19 07:04:37 -08:00
if l == nil {
l = log.NewLogfmtLogger(os.Stdout)
l = log.With(l, "ts", log.DefaultTimestampUTC, "caller", log.DefaultCaller)
2017-02-19 07:04:37 -08:00
}
2017-01-17 21:18:32 -08:00
if opts == nil {
opts = DefaultOptions
}
db = &DB{
dir: dir,
logger: l,
opts: opts,
compactc: make(chan struct{}, 1),
donec: make(chan struct{}),
stopc: make(chan struct{}),
compactionsEnabled: true,
}
2017-05-26 06:13:03 -07:00
db.metrics = newDBMetrics(db, r)
2017-05-09 03:52:47 -07:00
if !opts.NoLockfile {
absdir, err := filepath.Abs(dir)
if err != nil {
return nil, err
}
2017-05-09 03:52:47 -07:00
lockf, err := lockfile.New(filepath.Join(absdir, "lock"))
if err != nil {
return nil, err
}
if err := lockf.TryLock(); err != nil {
return nil, errors.Wrapf(err, "open DB in %s", dir)
}
db.lockf = &lockf
}
copts := &compactorOptions{
blockRanges: opts.BlockRanges,
}
if len(copts.blockRanges) == 0 {
return nil, errors.New("at least one block-range must exist")
}
for float64(copts.blockRanges[len(copts.blockRanges)-1])/float64(opts.RetentionDuration) > 0.2 {
if len(copts.blockRanges) == 1 {
break
}
// Max overflow is restricted to 20%.
copts.blockRanges = copts.blockRanges[:len(copts.blockRanges)-1]
}
db.compactor = newCompactor(dir, r, l, copts)
2017-03-02 00:13:29 -08:00
if err := db.reloadBlocks(); err != nil {
2016-12-14 23:31:26 -08:00
return nil, err
}
go db.run()
return db, nil
}
2017-06-08 03:14:13 -07:00
// Dir returns the directory of the database.
func (db *DB) Dir() string {
return db.dir
}
func (db *DB) run() {
defer close(db.donec)
tick := time.NewTicker(30 * time.Second)
defer tick.Stop()
for {
select {
case <-tick.C:
select {
case db.compactc <- struct{}{}:
default:
}
case <-db.compactc:
db.metrics.compactionsTriggered.Inc()
2017-03-17 07:30:05 -07:00
changes1, err := db.retentionCutoff()
2017-03-19 05:50:35 -07:00
if err != nil {
db.logger.Log("msg", "retention cutoff failed", "err", err)
}
2017-03-17 07:30:05 -07:00
changes2, err := db.compact()
2017-03-19 05:50:35 -07:00
if err != nil {
db.logger.Log("msg", "compaction failed", "err", err)
}
2017-03-17 07:30:05 -07:00
if changes1 || changes2 {
2017-03-19 05:50:35 -07:00
if err := db.reloadBlocks(); err != nil {
db.logger.Log("msg", "reloading blocks failed", "err", err)
}
}
case <-db.stopc:
return
}
}
}
2017-03-17 07:30:05 -07:00
func (db *DB) retentionCutoff() (bool, error) {
if db.opts.RetentionDuration == 0 {
return false, nil
}
2017-03-17 07:56:19 -07:00
db.mtx.RLock()
defer db.mtx.RUnlock()
// We only consider the already persisted blocks. Head blocks generally
// only account for a fraction of the total data.
db.headmtx.RLock()
lenp := len(db.blocks) - len(db.heads)
db.headmtx.RUnlock()
if lenp == 0 {
2017-03-17 07:30:05 -07:00
return false, nil
}
last := db.blocks[lenp-1]
2017-03-17 07:30:05 -07:00
mint := last.Meta().MaxTime - int64(db.opts.RetentionDuration)
return retentionCutoff(db.dir, mint)
}
// headFullness returns up to which fraction of a blocks time range samples
// were already inserted.
func headFullness(h headBlock) float64 {
m := h.Meta()
a := float64(h.HighTimestamp() - m.MinTime)
b := float64(m.MaxTime - m.MinTime)
return a / b
}
// appendableHeads returns a copy of a slice of HeadBlocks that can still be appended to.
func (db *DB) appendableHeads() (r []headBlock) {
switch l := len(db.heads); l {
case 0:
case 1:
r = append(r, db.heads[0])
default:
if headFullness(db.heads[l-1]) < 0.5 {
r = append(r, db.heads[l-2])
}
r = append(r, db.heads[l-1])
}
return r
}
func (db *DB) completedHeads() (r []headBlock) {
2017-07-14 00:00:22 -07:00
db.mtx.RLock()
defer db.mtx.RUnlock()
db.headmtx.RLock()
defer db.headmtx.RUnlock()
if len(db.heads) < 2 {
return nil
}
// Select all old heads unless they still have pending appenders.
for _, h := range db.heads[:len(db.heads)-2] {
if h.ActiveWriters() > 0 {
return r
}
r = append(r, h)
}
// Add the 2nd last head if the last head is more than 50% filled.
// Compacting it early allows us to free its memory before allocating
// more for the next block and thus reduces spikes.
if h2 := db.heads[len(db.heads)-2]; headFullness(h2) >= 0.5 && h2.ActiveWriters() == 0 {
r = append(r, h2)
}
return r
}
func (db *DB) compact() (changes bool, err error) {
db.cmtx.Lock()
defer db.cmtx.Unlock()
2017-01-17 21:18:32 -08:00
if !db.compactionsEnabled {
return false, nil
}
// Check whether we have pending head blocks that are ready to be persisted.
// They have the highest priority.
for _, h := range db.completedHeads() {
select {
case <-db.stopc:
return changes, nil
default:
}
if err = db.compactor.Write(h); err != nil {
return changes, errors.Wrap(err, "persist head block")
}
changes = true
runtime.GC()
}
2017-03-02 00:13:29 -08:00
// Check for compactions of multiple blocks.
for {
plans, err := db.compactor.Plan()
2017-03-02 00:13:29 -08:00
if err != nil {
return changes, errors.Wrap(err, "plan compaction")
2017-03-02 00:13:29 -08:00
}
if len(plans) == 0 {
break
}
2017-03-02 00:13:29 -08:00
select {
case <-db.stopc:
return changes, nil
2017-03-02 00:13:29 -08:00
default:
}
2017-03-02 00:13:29 -08:00
// We just execute compactions sequentially to not cause too extreme
// CPU and memory spikes.
// TODO(fabxc): return more descriptive plans in the future that allow
// estimation of resource usage and conditional parallelization?
for _, p := range plans {
if err := db.compactor.Compact(p...); err != nil {
return changes, errors.Wrapf(err, "compact %s", p)
2017-03-02 00:13:29 -08:00
}
changes = true
runtime.GC()
2017-01-17 21:18:32 -08:00
}
}
return changes, nil
2017-02-09 17:54:26 -08:00
}
2017-03-17 07:30:05 -07:00
// retentionCutoff deletes all directories of blocks in dir that are strictly
// before mint.
func retentionCutoff(dir string, mint int64) (bool, error) {
2017-03-19 05:50:35 -07:00
df, err := fileutil.OpenDir(dir)
if err != nil {
return false, errors.Wrapf(err, "open directory")
}
2017-06-11 15:05:04 -07:00
defer df.Close()
2017-03-17 07:30:05 -07:00
dirs, err := blockDirs(dir)
if err != nil {
return false, errors.Wrapf(err, "list block dirs %s", dir)
}
changes := false
for _, dir := range dirs {
meta, err := readMetaFile(dir)
if err != nil {
return changes, errors.Wrapf(err, "read block meta %s", dir)
}
// The first block we encounter marks that we crossed the boundary
// of deletable blocks.
if meta.MaxTime >= mint {
break
}
changes = true
if err := os.RemoveAll(dir); err != nil {
return changes, err
}
}
2017-03-19 05:50:35 -07:00
return changes, fileutil.Fsync(df)
2017-03-17 07:30:05 -07:00
}
2017-03-02 00:13:29 -08:00
func (db *DB) getBlock(id ulid.ULID) (Block, bool) {
for _, b := range db.blocks {
if b.Meta().ULID == id {
return b, true
}
}
return nil, false
}
2017-05-26 06:13:03 -07:00
func (db *DB) reloadBlocks() (err error) {
defer func(t time.Time) {
if err != nil {
db.metrics.reloadsFailed.Inc()
}
db.metrics.reloads.Inc()
db.metrics.reloadDuration.Observe(time.Since(t).Seconds())
}(time.Now())
var cs []io.Closer
defer func() { closeAll(cs...) }()
2017-03-02 00:13:29 -08:00
db.mtx.Lock()
defer db.mtx.Unlock()
2017-02-09 17:54:26 -08:00
2017-03-06 00:33:55 -08:00
db.headmtx.Lock()
defer db.headmtx.Unlock()
2017-03-02 00:13:29 -08:00
dirs, err := blockDirs(db.dir)
if err != nil {
return errors.Wrap(err, "find blocks")
}
2017-02-09 17:54:26 -08:00
var (
blocks []Block
exist = map[ulid.ULID]struct{}{}
2017-02-09 17:54:26 -08:00
)
2017-03-02 00:13:29 -08:00
for _, dir := range dirs {
meta, err := readMetaFile(dir)
if err != nil {
return errors.Wrapf(err, "read meta information %s", dir)
2017-02-09 17:54:26 -08:00
}
2016-12-14 23:31:26 -08:00
b, ok := db.getBlock(meta.ULID)
if !ok {
if meta.Compaction.Generation == 0 {
b, err = db.openHeadBlock(dir)
} else {
b, err = newPersistedBlock(dir)
2017-03-02 00:13:29 -08:00
}
if err != nil {
return errors.Wrapf(err, "open block %s", dir)
2017-03-02 00:13:29 -08:00
}
}
2016-12-22 03:05:24 -08:00
blocks = append(blocks, b)
exist[meta.ULID] = struct{}{}
2016-12-09 01:00:14 -08:00
}
if err := validateBlockSequence(blocks); err != nil {
return errors.Wrap(err, "invalid block sequence")
}
// Close all opened blocks that no longer exist after we returned all locks.
for _, b := range db.blocks {
if _, ok := exist[b.Meta().ULID]; !ok {
cs = append(cs, b)
}
}
db.blocks = blocks
db.heads = nil
for _, b := range blocks {
if b.Meta().Compaction.Generation == 0 {
db.heads = append(db.heads, b.(*HeadBlock))
}
}
return nil
}
2017-01-03 06:43:26 -08:00
func validateBlockSequence(bs []Block) error {
if len(bs) == 0 {
return nil
}
sort.Slice(bs, func(i, j int) bool {
return bs[i].Meta().MinTime < bs[j].Meta().MinTime
})
prev := bs[0]
for _, b := range bs[1:] {
if b.Meta().MinTime < prev.Meta().MaxTime {
2017-05-26 06:13:03 -07:00
return errors.Errorf("block time ranges overlap (%d, %d)", b.Meta().MinTime, prev.Meta().MaxTime)
}
}
return nil
2017-01-02 13:24:35 -08:00
}
2017-01-05 23:08:02 -08:00
// Close the partition.
func (db *DB) Close() error {
close(db.stopc)
<-db.donec
db.mtx.Lock()
defer db.mtx.Unlock()
var g errgroup.Group
// blocks also contains all head blocks.
for _, pb := range db.blocks {
g.Go(pb.Close)
2016-12-14 23:31:26 -08:00
}
var merr MultiError
merr.Add(g.Wait())
2017-05-09 03:52:47 -07:00
if db.lockf != nil {
merr.Add(db.lockf.Unlock())
}
2017-01-02 13:24:35 -08:00
return merr.Err()
2016-12-09 01:00:14 -08:00
}
// DisableCompactions disables compactions.
func (db *DB) DisableCompactions() {
db.cmtx.Lock()
defer db.cmtx.Unlock()
db.compactionsEnabled = false
db.logger.Log("msg", "compactions disabled")
}
// EnableCompactions enables compactions.
func (db *DB) EnableCompactions() {
db.cmtx.Lock()
defer db.cmtx.Unlock()
db.compactionsEnabled = true
db.logger.Log("msg", "compactions enabled")
}
// Snapshot writes the current data to the directory.
func (db *DB) Snapshot(dir string) error {
db.cmtx.Lock()
defer db.cmtx.Unlock()
2017-07-14 00:00:22 -07:00
db.mtx.Lock() // To block any appenders.
defer db.mtx.Unlock()
blocks := db.blocks[:]
for _, b := range blocks {
db.logger.Log("msg", "snapshotting block", "block", b)
if err := b.Snapshot(dir); err != nil {
return errors.Wrap(err, "error snapshotting headblock")
}
}
return nil
}
2017-01-17 21:18:32 -08:00
// Appender returns a new Appender on the database.
2017-01-09 11:04:16 -08:00
func (db *DB) Appender() Appender {
2017-05-26 06:13:03 -07:00
db.metrics.activeAppenders.Inc()
db.mtx.RLock()
return &dbAppender{db: db}
2017-01-09 11:04:16 -08:00
}
type dbAppender struct {
db *DB
heads []*metaAppender
samples int
2017-01-09 11:04:16 -08:00
}
type metaAppender struct {
meta BlockMeta
app Appender
}
2017-05-17 07:43:01 -07:00
func (a *dbAppender) Add(lset labels.Labels, t int64, v float64) (string, error) {
h, err := a.appenderAt(t)
if err != nil {
2017-05-17 07:43:01 -07:00
return "", err
}
ref, err := h.app.Add(lset, t, v)
if err != nil {
2017-05-17 07:43:01 -07:00
return "", err
}
a.samples++
2017-05-17 07:43:01 -07:00
if ref == "" {
return "", nil
}
2017-05-17 07:43:01 -07:00
return string(append(h.meta.ULID[:], ref...)), nil
2017-01-09 11:04:16 -08:00
}
2017-05-17 07:43:01 -07:00
func (a *dbAppender) AddFast(ref string, t int64, v float64) error {
if len(ref) < 16 {
return errors.Wrap(ErrNotFound, "invalid ref length")
}
// The first 16 bytes a ref hold the ULID of the head block.
h, err := a.appenderAt(t)
if err != nil {
return err
}
// Validate the ref points to the same block we got for t.
if string(h.meta.ULID[:]) != ref[:16] {
return ErrNotFound
}
2017-05-17 07:43:01 -07:00
if err := h.app.AddFast(ref[16:], t, v); err != nil {
// The block the ref points to might fit the given timestamp.
// We mask the error to stick with our contract.
if errors.Cause(err) == ErrOutOfBounds {
err = ErrNotFound
}
return err
}
a.samples++
return nil
}
2016-12-09 04:41:38 -08:00
// appenderFor gets the appender for the head containing timestamp t.
// If the head block doesn't exist yet, it gets created.
func (a *dbAppender) appenderAt(t int64) (*metaAppender, error) {
for _, h := range a.heads {
if intervalContains(h.meta.MinTime, h.meta.MaxTime-1, t) {
return h, nil
}
}
// Currently opened appenders do not cover t. Ensure the head block is
// created and add missing appenders.
a.db.headmtx.Lock()
if err := a.db.ensureHead(t); err != nil {
a.db.headmtx.Unlock()
return nil, err
}
var hb headBlock
for _, h := range a.db.appendableHeads() {
m := h.Meta()
if intervalContains(m.MinTime, m.MaxTime-1, t) {
hb = h
break
}
}
a.db.headmtx.Unlock()
if hb == nil {
return nil, ErrOutOfBounds
}
// Instantiate appender after returning headmtx!
app := &metaAppender{
meta: hb.Meta(),
app: hb.Appender(),
}
a.heads = append(a.heads, app)
2017-01-03 06:43:26 -08:00
return app, nil
}
func rangeForTimestamp(t int64, width int64) (mint, maxt int64) {
mint = (t / width) * width
return mint, mint + width
}
2016-12-09 04:41:38 -08:00
// ensureHead makes sure that there is a head block for the timestamp t if
// it is within or after the currently appendable window.
func (db *DB) ensureHead(t int64) error {
var (
mint, maxt = rangeForTimestamp(t, int64(db.opts.BlockRanges[0]))
addBuffer = len(db.blocks) == 0
last BlockMeta
)
if !addBuffer {
last = db.blocks[len(db.blocks)-1].Meta()
addBuffer = last.MaxTime <= mint-int64(db.opts.BlockRanges[0])
}
2017-05-18 07:31:02 -07:00
// Create another block of buffer in front if the DB is initialized or retrieving
// new data after a long gap.
// This ensures we always have a full block width of append window.
if addBuffer {
if _, err := db.createHeadBlock(mint-int64(db.opts.BlockRanges[0]), mint); err != nil {
return err
}
// If the previous block reaches into our new window, make it smaller.
2017-05-18 07:31:02 -07:00
} else if mt := last.MaxTime; mt > mint {
mint = mt
}
if mint >= maxt {
return nil
}
// Error if the requested time for a head is before the appendable window.
if len(db.heads) > 0 && t < db.heads[0].Meta().MinTime {
return ErrOutOfBounds
}
_, err := db.createHeadBlock(mint, maxt)
return err
2016-12-09 04:41:38 -08:00
}
func (a *dbAppender) Commit() error {
2017-05-26 06:13:03 -07:00
defer a.db.metrics.activeAppenders.Dec()
defer a.db.mtx.RUnlock()
// Commits to partial appenders must be concurrent as concurrent appenders
// may have conflicting locks on head appenders.
// For high-throughput use cases the errgroup causes significant blocking. Typically,
// we just deal with a single appender and special case it.
var err error
switch len(a.heads) {
case 1:
err = a.heads[0].app.Commit()
default:
var g errgroup.Group
for _, h := range a.heads {
g.Go(h.app.Commit)
}
err = g.Wait()
}
if err != nil {
return err
}
// XXX(fabxc): Push the metric down into head block to account properly
// for partial appends?
a.db.metrics.samplesAppended.Add(float64(a.samples))
return nil
}
func (a *dbAppender) Rollback() error {
2017-05-26 06:13:03 -07:00
defer a.db.metrics.activeAppenders.Dec()
defer a.db.mtx.RUnlock()
var g errgroup.Group
for _, h := range a.heads {
g.Go(h.app.Rollback)
}
return g.Wait()
}
// Delete implements deletion of metrics.
func (db *DB) Delete(mint, maxt int64, ms ...labels.Matcher) error {
db.cmtx.Lock()
defer db.cmtx.Unlock()
2017-07-14 00:00:22 -07:00
db.mtx.Lock()
defer db.mtx.Unlock()
blocks := db.blocksForInterval(mint, maxt)
var g errgroup.Group
for _, b := range blocks {
g.Go(func(b Block) func() error {
return func() error { return b.Delete(mint, maxt, ms...) }
}(b))
}
if err := g.Wait(); err != nil {
return err
}
return nil
}
func intervalOverlap(amin, amax, bmin, bmax int64) bool {
// Checks Overlap: http://stackoverflow.com/questions/3269434/
return amin <= bmax && bmin <= amax
}
func intervalContains(min, max, t int64) bool {
return t >= min && t <= max
}
// blocksForInterval returns all blocks within the partition that may contain
2016-12-13 06:26:58 -08:00
// data for the given time range.
func (db *DB) blocksForInterval(mint, maxt int64) []Block {
var bs []Block
for _, b := range db.blocks {
m := b.Meta()
if intervalOverlap(mint, maxt, m.MinTime, m.MaxTime) {
2017-01-03 06:43:26 -08:00
bs = append(bs, b)
}
}
return bs
2016-12-13 06:26:58 -08:00
}
2017-05-13 09:14:18 -07:00
// openHeadBlock opens the head block at dir.
func (db *DB) openHeadBlock(dir string) (*HeadBlock, error) {
var (
wdir = walDir(dir)
2017-05-13 09:14:18 -07:00
l = log.With(db.logger, "wal", wdir)
)
wal, err := OpenSegmentWAL(wdir, l, 5*time.Second)
if err != nil {
return nil, errors.Wrap(err, "open WAL %s")
}
h, err := OpenHeadBlock(dir, log.With(db.logger, "block", dir), wal)
if err != nil {
return nil, errors.Wrapf(err, "open head block %s", dir)
}
return h, nil
}
// createHeadBlock starts a new head block to append to.
func (db *DB) createHeadBlock(mint, maxt int64) (headBlock, error) {
dir, err := TouchHeadBlock(db.dir, mint, maxt)
2017-01-19 05:01:38 -08:00
if err != nil {
2017-05-13 09:14:18 -07:00
return nil, errors.Wrapf(err, "touch head block %s", dir)
}
newHead, err := db.openHeadBlock(dir)
if err != nil {
return nil, err
}
db.logger.Log("msg", "created head block", "ulid", newHead.meta.ULID, "mint", mint, "maxt", maxt)
db.blocks = append(db.blocks, newHead) // TODO(fabxc): this is a race!
db.heads = append(db.heads, newHead)
select {
case db.compactc <- struct{}{}:
default:
}
return newHead, nil
}
func isBlockDir(fi os.FileInfo) bool {
if !fi.IsDir() {
return false
}
_, err := ulid.Parse(fi.Name())
return err == nil
}
func blockDirs(dir string) ([]string, error) {
files, err := ioutil.ReadDir(dir)
if err != nil {
return nil, err
}
var dirs []string
for _, fi := range files {
if isBlockDir(fi) {
dirs = append(dirs, filepath.Join(dir, fi.Name()))
}
}
return dirs, nil
2017-01-03 06:43:26 -08:00
}
2016-12-09 04:41:38 -08:00
2017-02-13 23:53:19 -08:00
func sequenceFiles(dir, prefix string) ([]string, error) {
files, err := ioutil.ReadDir(dir)
if err != nil {
return nil, err
}
var res []string
for _, fi := range files {
if isSequenceFile(fi, prefix) {
res = append(res, filepath.Join(dir, fi.Name()))
}
}
return res, nil
}
func isSequenceFile(fi os.FileInfo, prefix string) bool {
if !strings.HasPrefix(fi.Name(), prefix) {
return false
}
if _, err := strconv.ParseUint(fi.Name()[len(prefix):], 10, 32); err != nil {
return false
}
return true
}
func nextSequenceFile(dir, prefix string) (string, int, error) {
names, err := fileutil.ReadDir(dir)
if err != nil {
return "", 0, err
}
i := uint64(0)
for _, n := range names {
2017-02-09 17:54:26 -08:00
if !strings.HasPrefix(n, prefix) {
continue
}
2017-02-09 17:54:26 -08:00
j, err := strconv.ParseUint(n[len(prefix):], 10, 32)
if err != nil {
continue
}
i = j
}
2017-02-09 17:54:26 -08:00
return filepath.Join(dir, fmt.Sprintf("%s%0.6d", prefix, i+1)), int(i + 1), nil
}
2016-12-09 04:41:38 -08:00
// The MultiError type implements the error interface, and contains the
// Errors used to construct it.
type MultiError []error
// Returns a concatenated string of the contained errors
func (es MultiError) Error() string {
var buf bytes.Buffer
if len(es) > 1 {
fmt.Fprintf(&buf, "%d errors: ", len(es))
2016-12-08 01:04:24 -08:00
}
for i, err := range es {
if i != 0 {
buf.WriteString("; ")
}
buf.WriteString(err.Error())
}
return buf.String()
2016-11-15 01:34:25 -08:00
}
2016-12-14 23:31:26 -08:00
// Add adds the error to the error list if it is not nil.
func (es *MultiError) Add(err error) {
if err == nil {
return
}
if merr, ok := err.(MultiError); ok {
*es = append(*es, merr...)
} else {
*es = append(*es, err)
2016-12-14 23:31:26 -08:00
}
}
// Err returns the error list as an error or nil if it is empty.
2016-12-14 23:31:26 -08:00
func (es MultiError) Err() error {
if len(es) == 0 {
return nil
}
return es
}
2017-05-17 07:43:01 -07:00
func yoloString(b []byte) string { return *((*string)(unsafe.Pointer(&b))) }
func yoloBytes(s string) []byte { return *((*[]byte)(unsafe.Pointer(&s))) }
2017-02-27 01:46:15 -08:00
func closeAll(cs ...io.Closer) error {
var merr MultiError
for _, c := range cs {
merr.Add(c.Close())
}
return merr.Err()
}