prometheus/db.go

715 lines
14 KiB
Go
Raw Normal View History

2016-11-15 01:34:25 -08:00
// Package tsdb implements a time series storage for float64 sample data.
package tsdb
import (
"bytes"
2016-12-04 04:16:11 -08:00
"fmt"
"io/ioutil"
"math"
2016-12-04 04:16:11 -08:00
"os"
"path/filepath"
"reflect"
2016-12-14 23:31:26 -08:00
"strconv"
"strings"
"sync"
"time"
"unsafe"
2016-11-15 01:34:25 -08:00
2016-12-14 23:31:26 -08:00
"golang.org/x/sync/errgroup"
2017-01-03 06:43:26 -08:00
"github.com/coreos/etcd/pkg/fileutil"
2016-12-21 00:39:01 -08:00
"github.com/fabxc/tsdb/labels"
"github.com/go-kit/kit/log"
2017-01-03 06:43:26 -08:00
"github.com/pkg/errors"
2016-12-31 00:48:49 -08:00
"github.com/prometheus/client_golang/prometheus"
2016-11-15 01:34:25 -08:00
)
2016-12-09 01:00:14 -08:00
// DefaultOptions used for the DB. They are sane for setups using
// millisecond precision timestamps.
2016-11-15 01:34:25 -08:00
var DefaultOptions = &Options{
2016-12-26 07:55:44 -08:00
Retention: 15 * 24 * 3600 * 1000, // 15 days
DisableWAL: false,
2016-11-15 01:34:25 -08:00
}
// Options bundles the tunable settings of the DB storage.
type Options struct {
	// Retention is given in the unit of the sample timestamps;
	// DefaultOptions sets it to 15 days in milliseconds.
	Retention int64

	// DisableWAL presumably switches off the write-ahead log; no code in
	// the visible part of this file reads it — confirm against its users.
	DisableWAL bool

	// WALFlushInterval is not read in the visible part of this file;
	// presumably the period between write-ahead log flushes — confirm.
	WALFlushInterval time.Duration
}
// Appender allows committing batches of samples to a database.
// The data held by the appender is reset after Commit returndb.
type Appender interface {
// AddSeries registers a new known series label set with the appender
// and returns a reference number used to add samples to it over the
// life time of the Appender.
// AddSeries(Labels) uint64
// Add adds a sample pair for the referenced seriedb.
2016-12-29 02:03:39 -08:00
Add(lset labels.Labels, t int64, v float64) error
// Commit submits the collected samples and purges the batch.
Commit() error
}
type hashedSample struct {
hash uint64
2016-12-21 00:39:01 -08:00
labels labels.Labels
2016-12-22 03:05:24 -08:00
ref uint32
t int64
v float64
2016-12-09 07:54:38 -08:00
}
2016-12-09 01:00:14 -08:00
// sep is the byte value 0xff. It is not referenced in the visible part of
// this file; presumably a separator for label data — confirm against its users.
const sep = '\xff'
// DB handles reads and writes of time series falling into
// a hashed partition of a seriedb.
type DB struct {
dir string
2017-01-02 13:24:35 -08:00
logger log.Logger
metrics *dbMetrics
2016-12-09 01:00:14 -08:00
2016-12-14 23:31:26 -08:00
mtx sync.RWMutex
2017-01-03 06:43:26 -08:00
persisted []*persistedBlock
heads []*headBlock
compactor *compactor
compactc chan struct{}
cutc chan struct{}
donec chan struct{}
stopc chan struct{}
2016-12-09 01:00:14 -08:00
}
type dbMetrics struct {
samplesAppended prometheus.Counter
compactionsTriggered prometheus.Counter
2016-12-31 00:48:49 -08:00
}
func newDBMetrics(r prometheus.Registerer) *dbMetrics {
m := &dbMetrics{}
2017-01-03 06:43:26 -08:00
m.samplesAppended = prometheus.NewCounter(prometheus.CounterOpts{
Name: "tsdb_samples_appended_total",
Help: "Total number of appended sampledb.",
2017-01-03 06:43:26 -08:00
})
m.compactionsTriggered = prometheus.NewCounter(prometheus.CounterOpts{
Name: "tsdb_compactions_triggered_total",
Help: "Total number of triggered compactions for the partition.",
})
2016-12-31 00:48:49 -08:00
if r != nil {
r.MustRegister(
m.samplesAppended,
2017-01-09 10:14:21 -08:00
m.compactionsTriggered,
2016-12-31 00:48:49 -08:00
)
}
return m
}
// Open returns a new DB in the given directory.
func Open(dir string, logger log.Logger) (db *DB, err error) {
2017-01-05 23:08:02 -08:00
// Create directory if partition is new.
if !fileutil.Exist(dir) {
if err := os.MkdirAll(dir, 0777); err != nil {
2016-12-14 23:31:26 -08:00
return nil, err
}
}
2017-01-09 10:14:21 -08:00
r := prometheus.DefaultRegisterer
2016-12-14 23:31:26 -08:00
db = &DB{
dir: dir,
logger: logger,
2017-01-09 10:14:21 -08:00
metrics: newDBMetrics(r),
compactc: make(chan struct{}, 1),
cutc: make(chan struct{}, 1),
donec: make(chan struct{}),
stopc: make(chan struct{}),
}
if err := db.initBlocks(); err != nil {
2016-12-14 23:31:26 -08:00
return nil, err
}
2017-01-09 10:14:21 -08:00
if db.compactor, err = newCompactor(db, r); err != nil {
return nil, err
}
go db.run()
return db, nil
}
// run is the background loop of the DB. It serves cut and compaction
// requests until stopc is closed and closes donec when it returns.
func (db *DB) run() {
	defer close(db.donec)

	for {
		select {
		case <-db.stopc:
			return

		case <-db.cutc:
			db.mtx.Lock()
			cutErr := db.cut()
			db.mtx.Unlock()

			if cutErr == nil {
				// Request a compaction without blocking; one may
				// already be pending.
				select {
				case db.compactc <- struct{}{}:
				default:
				}
			} else {
				db.logger.Log("msg", "cut failed", "err", cutErr)
			}
			// Drain cut channel so we don't trigger immediately again.
			select {
			case <-db.cutc:
			default:
			}

		case <-db.compactc:
			db.metrics.compactionsTriggered.Inc()

			// Keep compacting for as long as the compactor picks blocks.
			for picked := db.compactor.pick(); len(picked) > 0; picked = db.compactor.pick() {
				// TODO(fabxc): pick emits blocks in order. compact acts on
				// inverted order. Put inversion into compactor?
				inverted := make([]Block, len(picked))
				for i, b := range picked {
					inverted[len(picked)-1-i] = b
				}

				// Do not start another compaction once a stop was requested.
				select {
				case <-db.stopc:
					return
				default:
				}
				if err := db.compact(inverted); err != nil {
					db.logger.Log("msg", "compaction failed", "err", err)
				}
			}
		}
	}
}
// compact merges the given blocks into a single block that replaces the
// first one. The result is written to a ".tmp" sibling of the first block's
// directory, moved into place while holding db.mtx, and the directories of
// the remaining blocks are removed. Afterwards every affected directory is
// reinitialized; the reinit errors are returned together.
func (db *DB) compact(blocks []Block) error {
	if len(blocks) == 0 {
		return nil
	}
	target := blocks[0].Dir()
	tmpdir := target + ".tmp"

	// TODO(fabxc): find a better place to do this transparently.
	for _, b := range blocks {
		head, isHead := b.(*headBlock)
		if !isHead {
			continue
		}
		head.updateMapping()
	}

	if err := db.compactor.compact(tmpdir, blocks...); err != nil {
		return err
	}

	db.mtx.Lock()
	defer db.mtx.Unlock()

	if err := renameDir(tmpdir, target); err != nil {
		return errors.Wrap(err, "rename dir")
	}
	for i := 1; i < len(blocks); i++ {
		if err := os.RemoveAll(blocks[i].Dir()); err != nil {
			return errors.Wrap(err, "delete dir")
		}
	}

	var merr MultiError
	for _, b := range blocks {
		reinitErr := db.reinit(b.Dir())
		merr.Add(errors.Wrapf(reinitErr, "reinit block at %q", b.Dir()))
	}
	return merr.Err()
}
2016-12-14 23:31:26 -08:00
// isBlockDir reports whether fi describes a block directory: a directory
// named "b-" followed by a decimal number that fits into 32 bits.
func isBlockDir(fi os.FileInfo) bool {
	if !fi.IsDir() {
		return false
	}
	name := fi.Name()
	if !strings.HasPrefix(name, "b-") {
		return false
	}
	_, err := strconv.ParseUint(name[2:], 10, 32)
	return err == nil
}
func (db *DB) initBlocks() error {
var (
pbs []*persistedBlock
heads []*headBlock
)
2016-12-22 03:05:24 -08:00
files, err := ioutil.ReadDir(db.dir)
if err != nil {
return err
2016-12-09 01:00:14 -08:00
}
for _, fi := range files {
if !isBlockDir(fi) {
continue
}
dir := filepath.Join(db.dir, fi.Name())
if fileutil.Exist(filepath.Join(dir, walFileName)) {
h, err := openHeadBlock(dir, db.logger)
if err != nil {
return err
}
heads = append(heads, h)
continue
}
b, err := newPersistedBlock(dir)
if err != nil {
return err
}
pbs = append(pbs, b)
}
// Validate that blocks are sequential in time.
lastTime := int64(math.MinInt64)
2016-12-09 04:41:38 -08:00
for _, b := range pbs {
if b.Stats().MinTime < lastTime {
return errors.Errorf("illegal order for block at %q", b.Dir())
}
lastTime = b.Stats().MaxTime
}
for _, b := range heads {
if b.Stats().MinTime < lastTime {
return errors.Errorf("illegal order for block at %q", b.Dir())
}
lastTime = b.Stats().MaxTime
}
2017-01-03 06:43:26 -08:00
db.persisted = pbs
db.heads = heads
2017-01-03 06:43:26 -08:00
if len(heads) == 0 {
return db.cut()
2017-01-02 13:24:35 -08:00
}
return nil
2017-01-02 13:24:35 -08:00
}
2017-01-05 23:08:02 -08:00
// Close the partition.
func (db *DB) Close() error {
close(db.stopc)
<-db.donec
2017-01-02 13:24:35 -08:00
var merr MultiError
2016-12-14 23:31:26 -08:00
db.mtx.Lock()
defer db.mtx.Unlock()
for _, pb := range db.persisted {
2017-01-02 13:24:35 -08:00
merr.Add(pb.Close())
2016-12-14 23:31:26 -08:00
}
for _, hb := range db.heads {
2017-01-03 06:43:26 -08:00
merr.Add(hb.Close())
}
2016-12-14 23:31:26 -08:00
2017-01-02 13:24:35 -08:00
return merr.Err()
2016-12-09 01:00:14 -08:00
}
2017-01-09 11:04:16 -08:00
// Appender returns a new appender that buffers samples for this partition
// until they are committed.
func (db *DB) Appender() Appender {
	app := &dbAppender{}
	app.db = db
	return app
}
// dbAppender buffers samples for a single DB until Commit is called.
type dbAppender struct {
	db  *DB
	buf []hashedSample
}

// Add buffers the sample (t, v) for the series with label set lset.
func (a *dbAppender) Add(lset labels.Labels, t int64, v float64) error {
	sample := hashedSample{labels: lset, t: t, v: v}
	sample.hash = lset.Hash()
	return a.add(sample)
}

// add buffers an already hashed sample. It always returns nil.
func (a *dbAppender) add(s hashedSample) error {
	a.buf = append(a.buf, s)
	return nil
}

// Commit hands the buffered samples to the DB. The buffer is emptied, with
// its capacity kept, whether or not the append succeeded.
func (a *dbAppender) Commit() error {
	commitErr := a.db.appendBatch(a.buf)
	a.buf = a.buf[:0]
	return commitErr
}
func (db *DB) appendBatch(samples []hashedSample) error {
if len(samples) == 0 {
return nil
}
db.mtx.RLock()
defer db.mtx.RUnlock()
2016-12-09 04:41:38 -08:00
head := db.heads[len(db.heads)-1]
2017-01-03 06:43:26 -08:00
2016-12-21 16:12:28 -08:00
// TODO(fabxc): distinguish samples between concurrent heads for
// different time blocks. Those may occurr during transition to still
// allow late samples to arrive for a previous block.
2017-01-09 11:04:16 -08:00
n, err := head.appendBatch(samples)
if err == nil {
2017-01-09 11:04:16 -08:00
db.metrics.samplesAppended.Add(float64(n))
}
2016-12-09 04:41:38 -08:00
2017-01-07 09:02:17 -08:00
if head.fullness() > 1.0 {
select {
case db.cutc <- struct{}{}:
default:
2016-12-09 04:41:38 -08:00
}
}
2016-12-21 16:12:28 -08:00
return err
2016-12-09 04:41:38 -08:00
}
func (db *DB) headForDir(dir string) (int, bool) {
for i, b := range db.heads {
if b.Dir() == dir {
2017-01-03 06:43:26 -08:00
return i, true
}
}
return -1, false
}
func (db *DB) persistedForDir(dir string) (int, bool) {
for i, b := range db.persisted {
if b.Dir() == dir {
2017-01-03 06:43:26 -08:00
return i, true
}
}
return -1, false
}
func (db *DB) reinit(dir string) error {
2017-01-03 06:43:26 -08:00
if !fileutil.Exist(dir) {
if i, ok := db.headForDir(dir); ok {
if err := db.heads[i].Close(); err != nil {
2017-01-03 06:43:26 -08:00
return err
}
db.heads = append(db.heads[:i], db.heads[i+1:]...)
2017-01-03 06:43:26 -08:00
}
if i, ok := db.persistedForDir(dir); ok {
if err := db.persisted[i].Close(); err != nil {
2017-01-03 06:43:26 -08:00
return err
}
db.persisted = append(db.persisted[:i], db.persisted[i+1:]...)
2017-01-03 06:43:26 -08:00
}
return nil
}
// Remove a previous head block.
if i, ok := db.headForDir(dir); ok {
if err := db.heads[i].Close(); err != nil {
2017-01-03 06:43:26 -08:00
return err
}
db.heads = append(db.heads[:i], db.heads[i+1:]...)
2017-01-03 06:43:26 -08:00
}
// Close an old persisted block.
i, ok := db.persistedForDir(dir)
2017-01-03 06:43:26 -08:00
if ok {
if err := db.persisted[i].Close(); err != nil {
2017-01-03 06:43:26 -08:00
return err
}
}
pb, err := newPersistedBlock(dir)
if err != nil {
return errors.Wrap(err, "open persisted block")
}
if i >= 0 {
db.persisted[i] = pb
2017-01-03 06:43:26 -08:00
} else {
db.persisted = append(db.persisted, pb)
2017-01-03 06:43:26 -08:00
}
return nil
}
func (db *DB) compactable() []Block {
db.mtx.RLock()
defer db.mtx.RUnlock()
var blocks []Block
for _, pb := range db.persisted {
blocks = append([]Block{pb}, blocks...)
2017-01-03 06:43:26 -08:00
}
// threshold := db.heads[len(db.heads)-1].bstatdb.MaxTime - headGracePeriod
2017-01-03 06:43:26 -08:00
// for _, hb := range db.heads {
// if hb.bstatdb.MaxTime < threshold {
2017-01-03 06:43:26 -08:00
// blocks = append(blocks, hb)
// }
// }
for _, hb := range db.heads[:len(db.heads)-1] {
blocks = append([]Block{hb}, blocks...)
2017-01-03 06:43:26 -08:00
}
return blocks
}
// intervalOverlap reports whether the closed intervals [amin, amax] and
// [bmin, bmax] share at least one point, i.e. whether one interval's lower
// bound lies within the other interval.
func intervalOverlap(amin, amax, bmin, bmax int64) bool {
	bStartsInA := amin <= bmin && bmin <= amax
	aStartsInB := bmin <= amin && amin <= bmax
	return bStartsInA || aStartsInB
}
// intervalContains reports whether t lies within the closed interval
// [min, max].
func intervalContains(min, max, t int64) bool {
	if t < min {
		return false
	}
	return t <= max
}
// blocksForInterval returns all blocks within the partition that may contain
2016-12-13 06:26:58 -08:00
// data for the given time range.
func (db *DB) blocksForInterval(mint, maxt int64) []Block {
var bs []Block
for _, b := range db.persisted {
s := b.Stats()
if intervalOverlap(mint, maxt, s.MinTime, s.MaxTime) {
bs = append(bs, b)
}
}
for _, b := range db.heads {
s := b.Stats()
if intervalOverlap(mint, maxt, s.MinTime, s.MaxTime) {
2017-01-03 06:43:26 -08:00
bs = append(bs, b)
}
}
return bs
2016-12-13 06:26:58 -08:00
}
2016-12-09 04:41:38 -08:00
// TODO(fabxc): make configurable.
2017-01-03 06:43:26 -08:00
// headGracePeriod is only mentioned by a commented-out snippet in
// compactable; no live code in the visible part of this file uses it.
const headGracePeriod = 60 * 1000 // 60 seconds for millisecond scale
2016-12-09 04:41:38 -08:00
2017-01-03 06:43:26 -08:00
// cut starts a new head block to append to. The completed head block
// will still be appendable for the configured grace period.
func (db *DB) cut() error {
dir, err := db.nextBlockDir()
2016-12-22 03:05:24 -08:00
if err != nil {
return err
}
newHead, err := openHeadBlock(dir, db.logger)
if err != nil {
return err
}
db.heads = append(db.heads, newHead)
2016-12-09 04:41:38 -08:00
2017-01-03 06:43:26 -08:00
return nil
}
2016-12-09 04:41:38 -08:00
// nextBlockDir returns the path of the directory for the next block: "b-"
// followed by one more than the highest sequence number among the existing
// "b-<number>" entries of db.dir, padded to at least six digits. Entries
// that do not match the pattern are ignored. With no matching entry the
// sequence starts at 1.
func (db *DB) nextBlockDir() (string, error) {
	names, err := fileutil.ReadDir(db.dir)
	if err != nil {
		return "", err
	}

	// Track the maximum rather than the last entry seen, so the result does
	// not depend on the order of names. Names sort lexicographically, e.g.
	// "b-1000000" before "b-999999", and taking the last one could return a
	// directory that already exists.
	var highest uint64
	for _, n := range names {
		if !strings.HasPrefix(n, "b-") {
			continue
		}
		seq, err := strconv.ParseUint(n[2:], 10, 32)
		if err != nil {
			continue
		}
		if seq > highest {
			highest = seq
		}
	}
	return filepath.Join(db.dir, fmt.Sprintf("b-%0.6d", highest+1)), nil
}
2016-12-09 04:41:38 -08:00
// PartitionedDB is a time series storage made up of several DB partitions
// that live in subdirectories of dir.
type PartitionedDB struct {
	logger log.Logger
	opts   *Options
	dir    string

	// partitionPow is log2 of the number of partitions. The appender uses
	// the top partitionPow bits of a series hash as partition index.
	partitionPow uint
	Partitions   []*DB
}
// isPowTwo reports whether x is a positive power of two.
func isPowTwo(x int) bool {
	if x <= 0 {
		return false
	}
	// A power of two has a single bit set; clearing the lowest set bit
	// must leave nothing.
	return x&(x-1) == 0
}
// OpenPartitioned opens, or creates, a database of n partitions below dir.
// n must be a power of two because the partition of a series is selected by
// a bit shift of its hash. A nil opts selects DefaultOptions; a nil l
// selects a logfmt logger writing to stdout.
//
// If a partition fails to open, the partitions opened so far are closed
// again and an error is returned.
func OpenPartitioned(dir string, n int, l log.Logger, opts *Options) (*PartitionedDB, error) {
	if !isPowTwo(n) {
		return nil, errors.Errorf("%d is not a power of two", n)
	}
	if opts == nil {
		opts = DefaultOptions
	}
	if l == nil {
		l = log.NewLogfmtLogger(os.Stdout)
		l = log.NewContext(l).With("ts", log.DefaultTimestampUTC, "caller", log.DefaultCaller)
	}

	if err := os.MkdirAll(dir, 0777); err != nil {
		return nil, err
	}
	c := &PartitionedDB{
		logger:       l,
		opts:         opts,
		dir:          dir,
		partitionPow: uint(math.Log2(float64(n))),
		Partitions:   make([]*DB, 0, n),
	}

	// Initialize vertical partitions.
	for i := 0; i < n; i++ {
		pl := log.NewContext(l).With("partition", i)
		d := partitionDir(dir, i)

		s, err := Open(d, pl)
		if err != nil {
			// Do not leak the background loops and open blocks of the
			// partitions opened so far. Their close errors are dropped in
			// favor of the error that made opening fail.
			for _, p := range c.Partitions {
				_ = p.Close()
			}
			return nil, fmt.Errorf("initializing partition %q failed: %s", d, err)
		}

		c.Partitions = append(c.Partitions, s)
	}

	return c, nil
}
// partitionDir returns the directory of partition i below base: "p-"
// followed by i padded to at least four digits.
func partitionDir(base string, i int) string {
	name := fmt.Sprintf("p-%0.4d", i)
	return filepath.Join(base, name)
}
// Close the database. All partitions are closed concurrently; the first
// error reported by the group, if any, is returned after every partition
// has finished.
func (db *PartitionedDB) Close() error {
	var g errgroup.Group

	for i := range db.Partitions {
		g.Go(db.Partitions[i].Close)
	}
	return g.Wait()
}
// Appender returns a new appender against the database.
func (db *PartitionedDB) Appender() Appender {
2017-01-09 11:04:16 -08:00
app := &partitionedAppender{db: db}
for _, p := range db.Partitions {
app.buckets = append(app.buckets, p.Appender().(*dbAppender))
}
2017-01-09 11:04:16 -08:00
return app
}
type partitionedAppender struct {
db *PartitionedDB
2017-01-09 11:04:16 -08:00
buckets []*dbAppender
}
func (ba *partitionedAppender) SetSeries(lset labels.Labels) (uint32, error) {
return 0, nil
}
2017-01-09 11:04:16 -08:00
func (a *partitionedAppender) Add(lset labels.Labels, t int64, v float64) error {
h := lset.Hash()
2017-01-09 11:04:16 -08:00
s := h >> (64 - a.db.partitionPow)
2017-01-09 11:04:16 -08:00
return a.buckets[s].add(hashedSample{
hash: h,
labels: lset,
t: t,
v: v,
})
}
func (ba *partitionedAppender) Commit() error {
var merr MultiError
// Spill buckets into partitiondb.
2017-01-09 11:04:16 -08:00
for _, b := range ba.buckets {
merr.Add(b.Commit())
}
return merr.Err()
}
// MultiError is a list of errors that itself implements the error
// interface.
type MultiError []error

// Error returns the messages of the contained errors joined by "; ". If
// there is more than one error, the result is prefixed with their count.
func (es MultiError) Error() string {
	var buf bytes.Buffer

	if n := len(es); n > 1 {
		fmt.Fprintf(&buf, "%d errors: ", n)
	}
	for i := range es {
		if i > 0 {
			buf.WriteString("; ")
		}
		buf.WriteString(es[i].Error())
	}
	return buf.String()
}

// Add adds the error to the error list if it is not nil. The elements of a
// MultiError are added individually instead of nesting the list.
func (es *MultiError) Add(err error) {
	switch e := err.(type) {
	case nil:
		return
	case MultiError:
		*es = append(*es, e...)
	default:
		*es = append(*es, err)
	}
}

// Err returns the error list as an error or nil if it is empty.
func (es MultiError) Err() error {
	if len(es) > 0 {
		return es
	}
	return nil
}
// yoloString returns a string that shares the memory of b instead of copying
// it. The caller must not modify b for as long as the string is in use.
//
// The slice variable itself is reinterpreted as a string: the first two
// words of a slice header (data pointer and length) have the layout of a
// string header. Unlike building a reflect.StringHeader value, this keeps
// the data pointer visible to the garbage collector.
func yoloString(b []byte) string {
	return *(*string)(unsafe.Pointer(&b))
}
// yoloBytes returns a byte slice that shares the memory of s instead of
// copying it. Length and capacity equal len(s). The caller must not write to
// the returned slice, since strings are immutable.
//
// The headers are only accessed through pointers to a real string and a real
// slice. A reflect.SliceHeader value built on its own is not tracked by the
// garbage collector, so the data it points to could be freed.
func yoloBytes(s string) (b []byte) {
	src := (*reflect.StringHeader)(unsafe.Pointer(&s))
	dst := (*reflect.SliceHeader)(unsafe.Pointer(&b))

	dst.Data = src.Data
	dst.Len = src.Len
	dst.Cap = src.Len
	return b
}