storage: update TSDB

This commit is contained in:
Fabian Reinartz 2017-05-22 11:53:08 +02:00
parent ea09299ca5
commit d289dc55c3
14 changed files with 353 additions and 342 deletions

View file

@ -137,10 +137,6 @@ func init() {
&cfg.tsdb.MaxBlockDuration, "storage.tsdb.max-block-duration", 0,
"Maximum duration compacted blocks may span. (Defaults to 10% of the retention period)",
)
cfg.fs.IntVar(
&cfg.tsdb.AppendableBlocks, "storage.tsdb.appendable-blocks", 2,
"Number of head blocks that can be appended to.",
)
cfg.fs.DurationVar(
&cfg.tsdb.Retention, "storage.tsdb.retention", 15*24*time.Hour,
"How long to retain samples in the storage.",

View file

@ -27,8 +27,8 @@ func (a nopAppendable) Appender() (storage.Appender, error) {
type nopAppender struct{}
func (a nopAppender) Add(labels.Labels, int64, float64) (uint64, error) { return 0, nil }
func (a nopAppender) AddFast(uint64, int64, float64) error { return nil }
func (a nopAppender) Add(labels.Labels, int64, float64) (string, error) { return "", nil }
func (a nopAppender) AddFast(string, int64, float64) error { return nil }
func (a nopAppender) Commit() error { return nil }
func (a nopAppender) Rollback() error { return nil }
@ -36,18 +36,18 @@ type collectResultAppender struct {
result []sample
}
func (a *collectResultAppender) AddFast(ref uint64, t int64, v float64) error {
func (a *collectResultAppender) AddFast(ref string, t int64, v float64) error {
// Not implemented.
return storage.ErrNotFound
}
func (a *collectResultAppender) Add(m labels.Labels, t int64, v float64) (uint64, error) {
func (a *collectResultAppender) Add(m labels.Labels, t int64, v float64) (string, error) {
a.result = append(a.result, sample{
metric: m,
t: t,
v: v,
})
return 0, nil
return "", nil
}
func (a *collectResultAppender) Commit() error { return nil }

View file

@ -427,8 +427,8 @@ type scrapeLoop struct {
reportAppender func() storage.Appender
// TODO: Keep only the values from the last scrape to avoid a memory leak.
refCache map[string]uint64 // Parsed string to ref.
lsetCache map[uint64]lsetCacheEntry // Ref to labelset and string
refCache map[string]string // Parsed string to ref.
lsetCache map[string]lsetCacheEntry // Ref to labelset and string
seriesInPreviousScrape map[string]labels.Labels
ctx context.Context
@ -445,8 +445,8 @@ func newScrapeLoop(ctx context.Context, sc scraper, app, reportApp func() storag
scraper: sc,
appender: app,
reportAppender: reportApp,
refCache: map[string]uint64{},
lsetCache: map[uint64]lsetCacheEntry{},
refCache: map[string]string{},
lsetCache: map[string]lsetCacheEntry{},
stopped: make(chan struct{}),
ctx: ctx,
l: l,

View file

@ -604,8 +604,8 @@ func TestScrapeLoopAppend(t *testing.T) {
sl := &scrapeLoop{
appender: func() storage.Appender { return app },
reportAppender: func() storage.Appender { return nopAppender{} },
refCache: map[string]uint64{},
lsetCache: map[uint64]lsetCacheEntry{},
refCache: map[string]string{},
lsetCache: map[string]lsetCacheEntry{},
}
now := time.Now()
@ -643,8 +643,8 @@ func TestScrapeLoopAppendStaleness(t *testing.T) {
sl := &scrapeLoop{
appender: func() storage.Appender { return app },
reportAppender: func() storage.Appender { return nopAppender{} },
refCache: map[string]uint64{},
lsetCache: map[uint64]lsetCacheEntry{},
refCache: map[string]string{},
lsetCache: map[string]lsetCacheEntry{},
}
now := time.Now()
@ -687,8 +687,8 @@ func TestScrapeLoopAppendNoStalenessIfTimestamp(t *testing.T) {
sl := &scrapeLoop{
appender: func() storage.Appender { return app },
reportAppender: func() storage.Appender { return nopAppender{} },
refCache: map[string]uint64{},
lsetCache: map[uint64]lsetCacheEntry{},
refCache: map[string]string{},
lsetCache: map[string]lsetCacheEntry{},
}
now := time.Now()
@ -718,16 +718,16 @@ type errorAppender struct {
collectResultAppender
}
func (app *errorAppender) Add(lset labels.Labels, t int64, v float64) (uint64, error) {
func (app *errorAppender) Add(lset labels.Labels, t int64, v float64) (string, error) {
if lset.Get(model.MetricNameLabel) == "out_of_order" {
return 0, storage.ErrOutOfOrderSample
return "", storage.ErrOutOfOrderSample
} else if lset.Get(model.MetricNameLabel) == "amend" {
return 0, storage.ErrDuplicateSampleForTimestamp
return "", storage.ErrDuplicateSampleForTimestamp
}
return app.collectResultAppender.Add(lset, t, v)
}
func (app *errorAppender) AddFast(ref uint64, t int64, v float64) error {
func (app *errorAppender) AddFast(ref string, t int64, v float64) error {
return app.collectResultAppender.AddFast(ref, t, v)
}
@ -736,8 +736,8 @@ func TestScrapeLoopAppendGracefullyIfAmendOrOutOfOrder(t *testing.T) {
sl := &scrapeLoop{
appender: func() storage.Appender { return app },
reportAppender: func() storage.Appender { return nopAppender{} },
refCache: map[string]uint64{},
lsetCache: map[uint64]lsetCacheEntry{},
refCache: map[string]string{},
lsetCache: map[string]lsetCacheEntry{},
l: log.Base(),
}

View file

@ -236,19 +236,19 @@ type limitAppender struct {
i int
}
func (app *limitAppender) Add(lset labels.Labels, t int64, v float64) (uint64, error) {
func (app *limitAppender) Add(lset labels.Labels, t int64, v float64) (string, error) {
if app.i+1 > app.limit {
return 0, errors.New("sample limit exceeded")
return "", errors.New("sample limit exceeded")
}
ref, err := app.Appender.Add(lset, t, v)
if err != nil {
return 0, fmt.Errorf("sample limit of %d exceeded", app.limit)
return "", fmt.Errorf("sample limit of %d exceeded", app.limit)
}
app.i++
return ref, nil
}
func (app *limitAppender) AddFast(ref uint64, t int64, v float64) error {
func (app *limitAppender) AddFast(ref string, t int64, v float64) error {
if app.i+1 > app.limit {
return errors.New("sample limit exceeded")
}
@ -267,7 +267,7 @@ type ruleLabelsAppender struct {
labels labels.Labels
}
func (app ruleLabelsAppender) Add(lset labels.Labels, t int64, v float64) (uint64, error) {
func (app ruleLabelsAppender) Add(lset labels.Labels, t int64, v float64) (string, error) {
lb := labels.NewBuilder(lset)
for _, l := range app.labels {
@ -289,7 +289,7 @@ type honorLabelsAppender struct {
// Merges the sample's metric with the given labels if the label is not
// already present in the metric.
// This also considers labels explicitly set to the empty string.
func (app honorLabelsAppender) Add(lset labels.Labels, t int64, v float64) (uint64, error) {
func (app honorLabelsAppender) Add(lset labels.Labels, t int64, v float64) (string, error) {
lb := labels.NewBuilder(lset)
for _, l := range app.labels {
@ -309,10 +309,10 @@ type relabelAppender struct {
var errSeriesDropped = errors.New("series dropped")
func (app relabelAppender) Add(lset labels.Labels, t int64, v float64) (uint64, error) {
func (app relabelAppender) Add(lset labels.Labels, t int64, v float64) (string, error) {
lset = relabel.Process(lset, app.relabelings...)
if lset == nil {
return 0, errSeriesDropped
return "", errSeriesDropped
}
return app.Appender.Add(lset, t, v)
}

View file

@ -52,9 +52,9 @@ type Querier interface {
// Appender provides batched appends against a storage.
type Appender interface {
Add(l labels.Labels, t int64, v float64) (uint64, error)
Add(l labels.Labels, t int64, v float64) (string, error)
AddFast(ref uint64, t int64, v float64) error
AddFast(ref string, t int64, v float64) error
// Commit submits the collected samples and purges the batch.
Commit() error

View file

@ -17,6 +17,7 @@ import (
"time"
"unsafe"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/storage"
@ -41,13 +42,6 @@ type Options struct {
// The maximum timestamp range of compacted blocks.
MaxBlockDuration time.Duration
// Number of head blocks that can be appended to.
// Should be two or higher to prevent write errors in general scenarios.
//
// After a new block is started for timestamp t0 or higher, appends with
// timestamps as early as t0 - (n-1) * MinBlockDuration are valid.
AppendableBlocks int
// Duration for how long to retain data.
Retention time.Duration
@ -61,7 +55,6 @@ func Open(path string, r prometheus.Registerer, opts *Options) (storage.Storage,
WALFlushInterval: 10 * time.Second,
MinBlockDuration: uint64(opts.MinBlockDuration.Seconds() * 1000),
MaxBlockDuration: uint64(opts.MaxBlockDuration.Seconds() * 1000),
AppendableBlocks: opts.AppendableBlocks,
RetentionDuration: uint64(opts.Retention.Seconds() * 1000),
NoLockfile: opts.NoLockfile,
})
@ -121,24 +114,24 @@ type appender struct {
a tsdb.Appender
}
func (a appender) Add(lset labels.Labels, t int64, v float64) (uint64, error) {
func (a appender) Add(lset labels.Labels, t int64, v float64) (string, error) {
ref, err := a.a.Add(toTSDBLabels(lset), t, v)
switch err {
switch errors.Cause(err) {
case tsdb.ErrNotFound:
return 0, storage.ErrNotFound
return "", storage.ErrNotFound
case tsdb.ErrOutOfOrderSample:
return 0, storage.ErrOutOfOrderSample
return "", storage.ErrOutOfOrderSample
case tsdb.ErrAmendSample:
return 0, storage.ErrDuplicateSampleForTimestamp
return "", storage.ErrDuplicateSampleForTimestamp
}
return ref, err
}
func (a appender) AddFast(ref uint64, t int64, v float64) error {
func (a appender) AddFast(ref string, t int64, v float64) error {
err := a.a.AddFast(ref, t, v)
switch err {
switch errors.Cause(err) {
case tsdb.ErrNotFound:
return storage.ErrNotFound
case tsdb.ErrOutOfOrderSample:

View file

@ -33,10 +33,11 @@ func NewStorage(t T) storage.Storage {
log.With("dir", dir).Debugln("opening test storage")
// Tests just load data for a series sequentially. Thus we
// need a long appendable window.
db, err := tsdb.Open(dir, nil, &tsdb.Options{
MinBlockDuration: 2 * time.Hour,
MinBlockDuration: 24 * time.Hour,
MaxBlockDuration: 24 * time.Hour,
AppendableBlocks: 10,
})
if err != nil {
t.Fatalf("Opening test storage failed: %s", err)

View file

@ -15,7 +15,6 @@ package tsdb
import (
"encoding/json"
"fmt"
"io/ioutil"
"os"
"path/filepath"
@ -73,9 +72,6 @@ type BlockMeta struct {
// Unique identifier for the block and its contents. Changes on compaction.
ULID ulid.ULID `json:"ulid"`
// Sequence number of the block.
Sequence int `json:"sequence"`
// MinTime and MaxTime specify the time range all samples
// in the block are in.
MinTime int64 `json:"minTime"`
@ -190,7 +186,7 @@ func (pb *persistedBlock) Close() error {
}
func (pb *persistedBlock) String() string {
return fmt.Sprintf("(%d, %s)", pb.meta.Sequence, pb.meta.ULID)
return pb.meta.ULID.String()
}
func (pb *persistedBlock) Querier(mint, maxt int64) Querier {

View file

@ -18,6 +18,7 @@ import (
"math/rand"
"os"
"path/filepath"
"sort"
"time"
"github.com/coreos/etcd/pkg/fileutil"
@ -34,10 +35,10 @@ type Compactor interface {
// Plan returns a set of non-overlapping directories that can
// be compacted concurrently.
// Results returned when compactions are in progress are undefined.
Plan(dir string) ([][]string, error)
Plan() ([][]string, error)
// Write persists a Block into a directory.
Write(dir string, b Block) error
Write(b Block) error
// Compact runs compaction against the provided directories. Must
// only be called concurrently with results of Plan().
@ -46,6 +47,7 @@ type Compactor interface {
// compactor implements the Compactor interface.
type compactor struct {
dir string
metrics *compactorMetrics
logger log.Logger
opts *compactorOptions
@ -87,8 +89,9 @@ type compactorOptions struct {
maxBlockRange uint64
}
func newCompactor(r prometheus.Registerer, l log.Logger, opts *compactorOptions) *compactor {
func newCompactor(dir string, r prometheus.Registerer, l log.Logger, opts *compactorOptions) *compactor {
return &compactor{
dir: dir,
opts: opts,
logger: l,
metrics: newCompactorMetrics(r),
@ -103,13 +106,18 @@ type compactionInfo struct {
const compactionBlocksLen = 3
func (c *compactor) Plan(dir string) ([][]string, error) {
dirs, err := blockDirs(dir)
type dirMeta struct {
dir string
meta *BlockMeta
}
func (c *compactor) Plan() ([][]string, error) {
dirs, err := blockDirs(c.dir)
if err != nil {
return nil, err
}
var bs []*BlockMeta
var dms []dirMeta
for _, dir := range dirs {
meta, err := readMetaFile(dir)
@ -117,25 +125,28 @@ func (c *compactor) Plan(dir string) ([][]string, error) {
return nil, err
}
if meta.Compaction.Generation > 0 {
bs = append(bs, meta)
dms = append(dms, dirMeta{dir, meta})
}
}
sort.Slice(dms, func(i, j int) bool {
return dms[i].meta.MinTime < dms[j].meta.MinTime
})
if len(bs) == 0 {
if len(dms) == 0 {
return nil, nil
}
sliceDirs := func(i, j int) [][]string {
var res []string
for k := i; k < j; k++ {
res = append(res, dirs[k])
res = append(res, dms[k].dir)
}
return [][]string{res}
}
// Then we care about compacting multiple blocks, starting with the oldest.
for i := 0; i < len(bs)-compactionBlocksLen+1; i++ {
if c.match(bs[i : i+3]) {
for i := 0; i < len(dms)-compactionBlocksLen+1; i++ {
if c.match(dms[i : i+3]) {
return sliceDirs(i, i+compactionBlocksLen), nil
}
}
@ -143,26 +154,22 @@ func (c *compactor) Plan(dir string) ([][]string, error) {
return nil, nil
}
func (c *compactor) match(bs []*BlockMeta) bool {
g := bs[0].Compaction.Generation
func (c *compactor) match(dirs []dirMeta) bool {
g := dirs[0].meta.Compaction.Generation
for _, b := range bs {
if b.Compaction.Generation != g {
for _, d := range dirs {
if d.meta.Compaction.Generation != g {
return false
}
}
return uint64(bs[len(bs)-1].MaxTime-bs[0].MinTime) <= c.opts.maxBlockRange
return uint64(dirs[len(dirs)-1].meta.MaxTime-dirs[0].meta.MinTime) <= c.opts.maxBlockRange
}
func mergeBlockMetas(blocks ...Block) (res BlockMeta) {
m0 := blocks[0].Meta()
entropy := rand.New(rand.NewSource(time.Now().UnixNano()))
res.Sequence = m0.Sequence
res.MinTime = m0.MinTime
res.MaxTime = blocks[len(blocks)-1].Meta().MaxTime
res.ULID = ulid.MustNew(ulid.Now(), entropy)
res.Compaction.Generation = m0.Compaction.Generation + 1
@ -185,16 +192,27 @@ func (c *compactor) Compact(dirs ...string) (err error) {
blocks = append(blocks, b)
}
return c.write(dirs[0], blocks...)
entropy := rand.New(rand.NewSource(time.Now().UnixNano()))
uid := ulid.MustNew(ulid.Now(), entropy)
return c.write(uid, blocks...)
}
func (c *compactor) Write(dir string, b Block) error {
return c.write(dir, b)
func (c *compactor) Write(b Block) error {
// Buffering blocks might have been created that often have no data.
if b.Meta().Stats.NumSeries == 0 {
return errors.Wrap(os.RemoveAll(b.Dir()), "remove empty block")
}
entropy := rand.New(rand.NewSource(time.Now().UnixNano()))
uid := ulid.MustNew(ulid.Now(), entropy)
return c.write(uid, b)
}
// write creates a new block that is the union of the provided blocks into dir.
// It cleans up all files of the old blocks after completing successfully.
func (c *compactor) write(dir string, blocks ...Block) (err error) {
func (c *compactor) write(uid ulid.ULID, blocks ...Block) (err error) {
c.logger.Log("msg", "compact blocks", "blocks", fmt.Sprintf("%v", blocks))
defer func(t time.Time) {
@ -204,6 +222,7 @@ func (c *compactor) write(dir string, blocks ...Block) (err error) {
c.metrics.duration.Observe(time.Since(t).Seconds())
}(time.Now())
dir := filepath.Join(c.dir, uid.String())
tmp := dir + ".tmp"
if err = os.RemoveAll(tmp); err != nil {
@ -229,6 +248,8 @@ func (c *compactor) write(dir string, blocks ...Block) (err error) {
if err != nil {
return errors.Wrap(err, "write compaction")
}
meta.ULID = uid
if err = writeMetaFile(tmp, meta); err != nil {
return errors.Wrap(err, "write merged meta")
}
@ -244,7 +265,7 @@ func (c *compactor) write(dir string, blocks ...Block) (err error) {
if err := renameFile(tmp, dir); err != nil {
return errors.Wrap(err, "rename block dir")
}
for _, b := range blocks[1:] {
for _, b := range blocks {
if err := os.RemoveAll(b.Dir()); err != nil {
return err
}

View file

@ -22,6 +22,7 @@ import (
"os"
"path/filepath"
"runtime"
"sort"
"strconv"
"strings"
"sync"
@ -33,6 +34,7 @@ import (
"github.com/coreos/etcd/pkg/fileutil"
"github.com/go-kit/kit/log"
"github.com/nightlyone/lockfile"
"github.com/oklog/ulid"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/tsdb/labels"
@ -45,7 +47,6 @@ var DefaultOptions = &Options{
RetentionDuration: 15 * 24 * 60 * 60 * 1000, // 15 days in milliseconds
MinBlockDuration: 3 * 60 * 60 * 1000, // 2 hours in milliseconds
MaxBlockDuration: 24 * 60 * 60 * 1000, // 1 days in milliseconds
AppendableBlocks: 2,
NoLockfile: false,
}
@ -64,13 +65,6 @@ type Options struct {
// The maximum timestamp range of compacted blocks.
MaxBlockDuration uint64
// Number of head blocks that can be appended to.
// Should be two or higher to prevent write errors in general scenarios.
//
// After a new block is started for timestamp t0 or higher, appends with
// timestamps as early as t0 - (n-1) * MinBlockDuration are valid.
AppendableBlocks int
// NoLockfile disables creation and consideration of a lock file.
NoLockfile bool
}
@ -86,11 +80,11 @@ type Appender interface {
// Returned reference numbers are ephemeral and may be rejected in calls
// to AddFast() at any point. Adding the sample via Add() returns a new
// reference number.
Add(l labels.Labels, t int64, v float64) (uint64, error)
Add(l labels.Labels, t int64, v float64) (string, error)
// Add adds a sample pair for the referenced series. It is generally faster
// than adding a sample by providing its full label set.
AddFast(ref uint64, t int64, v float64) error
AddFast(ref string, t int64, v float64) error
// Commit submits the collected samples and purges the batch.
Commit() error
@ -159,11 +153,6 @@ func Open(dir string, l log.Logger, r prometheus.Registerer, opts *Options) (db
return nil, err
}
absdir, err := filepath.Abs(dir)
if err != nil {
return nil, err
}
if l == nil {
l = log.NewLogfmtLogger(os.Stdout)
l = log.With(l, "ts", log.DefaultTimestampUTC, "caller", log.DefaultCaller)
@ -172,9 +161,6 @@ func Open(dir string, l log.Logger, r prometheus.Registerer, opts *Options) (db
if opts == nil {
opts = DefaultOptions
}
if opts.AppendableBlocks < 1 {
return nil, errors.Errorf("AppendableBlocks must be greater than 0")
}
db = &DB{
dir: dir,
@ -186,6 +172,10 @@ func Open(dir string, l log.Logger, r prometheus.Registerer, opts *Options) (db
stopc: make(chan struct{}),
}
if !opts.NoLockfile {
absdir, err := filepath.Abs(dir)
if err != nil {
return nil, err
}
lockf, err := lockfile.New(filepath.Join(absdir, "lock"))
if err != nil {
return nil, err
@ -196,7 +186,7 @@ func Open(dir string, l log.Logger, r prometheus.Registerer, opts *Options) (db
db.lockf = &lockf
}
db.compactor = newCompactor(r, l, &compactorOptions{
db.compactor = newCompactor(dir, r, l, &compactorOptions{
maxBlockRange: opts.MaxBlockDuration,
})
@ -281,8 +271,8 @@ func (db *DB) compact() (changes bool, err error) {
// returning the lock to not block Appenders.
// Selected blocks are semantically ensured to not be written to afterwards
// by appendable().
if len(db.heads) > db.opts.AppendableBlocks {
for _, h := range db.heads[:len(db.heads)-db.opts.AppendableBlocks] {
if len(db.heads) > 2 {
for _, h := range db.heads[:len(db.heads)-2] {
// Blocks that won't be appendable when instantiating a new appender
// might still have active appenders on them.
// Abort at the first one we encounter.
@ -302,7 +292,7 @@ func (db *DB) compact() (changes bool, err error) {
default:
}
if err = db.compactor.Write(h.Dir(), h); err != nil {
if err = db.compactor.Write(h); err != nil {
return changes, errors.Wrap(err, "persist head block")
}
changes = true
@ -311,7 +301,7 @@ func (db *DB) compact() (changes bool, err error) {
// Check for compactions of multiple blocks.
for {
plans, err := db.compactor.Plan(db.dir)
plans, err := db.compactor.Plan()
if err != nil {
return changes, errors.Wrap(err, "plan compaction")
}
@ -375,9 +365,9 @@ func retentionCutoff(dir string, mint int64) (bool, error) {
return changes, fileutil.Fsync(df)
}
func (db *DB) seqBlock(i int) (Block, bool) {
func (db *DB) getBlock(id ulid.ULID) (Block, bool) {
for _, b := range db.blocks {
if b.Meta().Sequence == i {
if b.Meta().ULID == id {
return b, true
}
}
@ -399,10 +389,8 @@ func (db *DB) reloadBlocks() error {
return errors.Wrap(err, "find blocks")
}
var (
metas []*BlockMeta
blocks []Block
heads []headBlock
seqBlocks = make(map[int]Block, len(dirs))
exist = map[ulid.ULID]struct{}{}
)
for _, dir := range dirs {
@ -410,47 +398,58 @@ func (db *DB) reloadBlocks() error {
if err != nil {
return errors.Wrapf(err, "read meta information %s", dir)
}
metas = append(metas, meta)
}
for i, meta := range metas {
b, ok := db.seqBlock(meta.Sequence)
if meta.Compaction.Generation == 0 {
b, ok := db.getBlock(meta.ULID)
if !ok {
b, err = db.openHeadBlock(dirs[i])
if err != nil {
return errors.Wrapf(err, "load head at %s", dirs[i])
}
}
if meta.ULID != b.Meta().ULID {
return errors.Errorf("head block ULID changed unexpectedly")
}
heads = append(heads, b.(headBlock))
if meta.Compaction.Generation == 0 {
b, err = db.openHeadBlock(dir)
} else {
if !ok || meta.ULID != b.Meta().ULID {
b, err = newPersistedBlock(dirs[i])
b, err = newPersistedBlock(dir)
}
if err != nil {
return errors.Wrapf(err, "open persisted block %s", dirs[i])
}
return errors.Wrapf(err, "open block %s", dir)
}
}
seqBlocks[meta.Sequence] = b
blocks = append(blocks, b)
exist[meta.ULID] = struct{}{}
}
// Close all blocks that we no longer need. They are closed after returning all
// locks to avoid questionable locking order.
if err := validateBlockSequence(blocks); err != nil {
return errors.Wrap(err, "invalid block sequence")
}
// Close all opened blocks that no longer exist after we returned all locks.
for _, b := range db.blocks {
if nb, ok := seqBlocks[b.Meta().Sequence]; !ok || nb != b {
if _, ok := exist[b.Meta().ULID]; !ok {
cs = append(cs, b)
}
}
db.blocks = blocks
db.heads = heads
db.heads = nil
for _, b := range blocks {
if b.Meta().Compaction.Generation == 0 {
db.heads = append(db.heads, b.(*HeadBlock))
}
}
return nil
}
func validateBlockSequence(bs []Block) error {
if len(bs) == 0 {
return nil
}
sort.Slice(bs, func(i, j int) bool {
return bs[i].Meta().MinTime < bs[j].Meta().MinTime
})
prev := bs[0]
for _, b := range bs[1:] {
if b.Meta().MinTime < prev.Meta().MaxTime {
return errors.Errorf("block time ranges overlap", b.Meta().MinTime, prev.Meta().MaxTime)
}
}
return nil
}
@ -482,27 +481,7 @@ func (db *DB) Close() error {
// Appender returns a new Appender on the database.
func (db *DB) Appender() Appender {
db.mtx.RLock()
a := &dbAppender{db: db}
// XXX(fabxc): turn off creating initial appender as it will happen on-demand
// anyway. For now this, with combination of only having a single timestamp per batch,
// prevents opening more than one appender and hitting an unresolved deadlock (#11).
//
// Only instantiate appender after returning the headmtx to avoid
// questionable locking order.
db.headmtx.RLock()
app := db.appendable()
db.headmtx.RUnlock()
for _, b := range app {
a.heads = append(a.heads, &metaAppender{
meta: b.Meta(),
app: b.Appender(),
})
}
return a
return &dbAppender{db: db}
}
type dbAppender struct {
@ -517,34 +496,39 @@ type metaAppender struct {
app Appender
}
func (a *dbAppender) Add(lset labels.Labels, t int64, v float64) (uint64, error) {
h, err := a.appenderFor(t)
func (a *dbAppender) Add(lset labels.Labels, t int64, v float64) (string, error) {
h, err := a.appenderAt(t)
if err != nil {
return 0, err
return "", err
}
ref, err := h.app.Add(lset, t, v)
if err != nil {
return 0, err
return "", err
}
a.samples++
// Store last byte of sequence number in 3rd byte of reference.
return ref | (uint64(h.meta.Sequence&0xff) << 40), nil
return string(append(h.meta.ULID[:], ref...)), nil
}
func (a *dbAppender) AddFast(ref uint64, t int64, v float64) error {
// Load the head last byte of the head sequence from the 3rd byte of the
// reference number.
gen := (ref << 16) >> 56
h, err := a.appenderFor(t)
func (a *dbAppender) AddFast(ref string, t int64, v float64) error {
if len(ref) < 16 {
return errors.Wrap(ErrNotFound, "invalid ref length")
}
// The first 16 bytes a ref hold the ULID of the head block.
h, err := a.appenderAt(t)
if err != nil {
return err
}
// If the last byte of the sequence does not add up, the reference is not valid.
if uint64(h.meta.Sequence&0xff) != gen {
// Validate the ref points to the same block we got for t.
if string(h.meta.ULID[:]) != ref[:16] {
return ErrNotFound
}
if err := h.app.AddFast(ref, t, v); err != nil {
if err := h.app.AddFast(ref[16:], t, v); err != nil {
// The block the ref points to might fit the given timestamp.
// We mask the error to stick with our contract.
if errors.Cause(err) == ErrOutOfBounds {
err = ErrNotFound
}
return err
}
@ -554,85 +538,84 @@ func (a *dbAppender) AddFast(ref uint64, t int64, v float64) error {
// appenderFor gets the appender for the head containing timestamp t.
// If the head block doesn't exist yet, it gets created.
func (a *dbAppender) appenderFor(t int64) (*metaAppender, error) {
// If there's no fitting head block for t, ensure it gets created.
if len(a.heads) == 0 || t >= a.heads[len(a.heads)-1].meta.MaxTime {
func (a *dbAppender) appenderAt(t int64) (*metaAppender, error) {
for _, h := range a.heads {
if intervalContains(h.meta.MinTime, h.meta.MaxTime-1, t) {
return h, nil
}
}
// Currently opened appenders do not cover t. Ensure the head block is
// created and add missing appenders.
a.db.headmtx.Lock()
var newHeads []headBlock
if err := a.db.ensureHead(t); err != nil {
a.db.headmtx.Unlock()
return nil, err
}
if len(a.heads) == 0 {
newHeads = append(newHeads, a.db.appendable()...)
} else {
maxSeq := a.heads[len(a.heads)-1].meta.Sequence
for _, b := range a.db.appendable() {
if b.Meta().Sequence > maxSeq {
newHeads = append(newHeads, b)
}
}
}
var hb headBlock
for _, h := range a.db.appendable() {
m := h.Meta()
if intervalContains(m.MinTime, m.MaxTime-1, t) {
hb = h
break
}
}
a.db.headmtx.Unlock()
// XXX(fabxc): temporary workaround. See comment on instantiating DB.Appender.
// for _, b := range newHeads {
// // Only get appender for the block with the specific timestamp.
// if t >= b.Meta().MaxTime {
// continue
// }
// a.heads = append(a.heads, &metaAppender{
// app: b.Appender(),
// meta: b.Meta(),
// })
// break
// }
if hb == nil {
return nil, ErrOutOfBounds
}
// Instantiate appender after returning headmtx!
app := &metaAppender{
meta: hb.Meta(),
app: hb.Appender(),
}
a.heads = append(a.heads, app)
// Instantiate appenders after returning headmtx to avoid questionable
// locking order.
for _, b := range newHeads {
a.heads = append(a.heads, &metaAppender{
app: b.Appender(),
meta: b.Meta(),
})
}
}
for i := len(a.heads) - 1; i >= 0; i-- {
if h := a.heads[i]; t >= h.meta.MinTime {
return h, nil
}
}
return app, nil
}
return nil, ErrNotFound
func rangeForTimestamp(t int64, width int64) (mint, maxt int64) {
mint = (t / width) * width
return mint, mint + width
}
// ensureHead makes sure that there is a head block for the timestamp t if
// it is within or after the currently appendable window.
func (db *DB) ensureHead(t int64) error {
// Initial case for a new database: we must create the first
// AppendableBlocks-1 front padding heads.
if len(db.heads) == 0 {
for i := int64(db.opts.AppendableBlocks - 1); i >= 0; i-- {
if _, err := db.cut(t - i*int64(db.opts.MinBlockDuration)); err != nil {
var (
mint, maxt = rangeForTimestamp(t, int64(db.opts.MinBlockDuration))
addBuffer = len(db.blocks) == 0
last BlockMeta
)
if !addBuffer {
last = db.blocks[len(db.blocks)-1].Meta()
addBuffer = last.MaxTime <= mint-int64(db.opts.MinBlockDuration)
}
// Create another block of buffer in front if the DB is initialized or retrieving
// new data after a long gap.
// This ensures we always have a full block width if append window.
if addBuffer {
if _, err := db.createHeadBlock(mint-int64(db.opts.MinBlockDuration), mint); err != nil {
return err
}
// If the previous block reaches into our new window, make it smaller.
} else if mt := last.MaxTime; mt > mint {
mint = mt
}
}
for {
h := db.heads[len(db.heads)-1]
m := h.Meta()
// If t doesn't exceed the range of heads blocks, there's nothing to do.
if t < m.MaxTime {
if mint >= maxt {
return nil
}
if _, err := db.cut(m.MaxTime); err != nil {
// Error if the requested time for a head is before the appendable window.
if len(db.heads) > 0 && t < db.heads[0].Meta().MinTime {
return ErrOutOfBounds
}
_, err := db.createHeadBlock(mint, maxt)
return err
}
}
}
func (a *dbAppender) Commit() error {
@ -640,14 +623,22 @@ func (a *dbAppender) Commit() error {
// Commits to partial appenders must be concurrent as concurrent appenders
// may have conflicting locks on head appenders.
// XXX(fabxc): is this a leaky abstraction? Should make an effort to catch a multi-error?
var g errgroup.Group
// For high-throughput use cases the errgroup causes significant blocking. Typically,
// we just deal with a single appender and special case it.
var err error
switch len(a.heads) {
case 1:
err = a.heads[0].app.Commit()
default:
var g errgroup.Group
for _, h := range a.heads {
g.Go(h.app.Commit)
}
err = g.Wait()
}
if err := g.Wait(); err != nil {
if err != nil {
return err
}
// XXX(fabxc): Push the metric down into head block to account properly
@ -670,14 +661,15 @@ func (a *dbAppender) Rollback() error {
}
// appendable returns a copy of a slice of HeadBlocks that can still be appended to.
func (db *DB) appendable() []headBlock {
var i int
app := make([]headBlock, 0, db.opts.AppendableBlocks)
if len(db.heads) > db.opts.AppendableBlocks {
i = len(db.heads) - db.opts.AppendableBlocks
func (db *DB) appendable() (r []headBlock) {
switch len(db.heads) {
case 0:
case 1:
r = append(r, db.heads[0])
default:
r = append(r, db.heads[len(db.heads)-2:]...)
}
return append(app, db.heads[i:]...)
return r
}
func intervalOverlap(amin, amax, bmin, bmax int64) bool {
@ -712,7 +704,7 @@ func (db *DB) blocksForInterval(mint, maxt int64) []Block {
// openHeadBlock opens the head block at dir.
func (db *DB) openHeadBlock(dir string) (*HeadBlock, error) {
var (
wdir = filepath.Join(dir, "wal")
wdir = walDir(dir)
l = log.With(db.logger, "wal", wdir)
)
wal, err := OpenSegmentWAL(wdir, l, 5*time.Second)
@ -727,16 +719,10 @@ func (db *DB) openHeadBlock(dir string) (*HeadBlock, error) {
return h, nil
}
// cut starts a new head block to append to. The completed head block
// will still be appendable for the configured grace period.
func (db *DB) cut(mint int64) (headBlock, error) {
maxt := mint + int64(db.opts.MinBlockDuration)
dir, seq, err := nextSequenceFile(db.dir, "b-")
// createHeadBlock starts a new head block to append to.
func (db *DB) createHeadBlock(mint, maxt int64) (headBlock, error) {
dir, err := TouchHeadBlock(db.dir, mint, maxt)
if err != nil {
return nil, err
}
if err := TouchHeadBlock(dir, seq, mint, maxt); err != nil {
return nil, errors.Wrapf(err, "touch head block %s", dir)
}
newHead, err := db.openHeadBlock(dir)
@ -759,13 +745,8 @@ func isBlockDir(fi os.FileInfo) bool {
if !fi.IsDir() {
return false
}
if !strings.HasPrefix(fi.Name(), "b-") {
return false
}
if _, err := strconv.ParseUint(fi.Name()[2:], 10, 32); err != nil {
return false
}
return true
_, err := ulid.Parse(fi.Name())
return err == nil
}
func blockDirs(dir string) ([]string, error) {
@ -870,9 +851,8 @@ func (es MultiError) Err() error {
return es
}
func yoloString(b []byte) string {
return *((*string)(unsafe.Pointer(&b)))
}
func yoloString(b []byte) string { return *((*string)(unsafe.Pointer(&b))) }
func yoloBytes(s string) []byte { return *((*[]byte)(unsafe.Pointer(&s))) }
func closeAll(cs ...io.Closer) error {
var merr MultiError

View file

@ -18,11 +18,14 @@ import (
"math"
"math/rand"
"os"
"path/filepath"
"sort"
"sync"
"sync/atomic"
"time"
"encoding/binary"
"github.com/go-kit/kit/log"
"github.com/oklog/ulid"
"github.com/pkg/errors"
@ -71,30 +74,30 @@ type HeadBlock struct {
// TouchHeadBlock atomically touches a new head block in dir for
// samples in the range [mint,maxt).
func TouchHeadBlock(dir string, seq int, mint, maxt int64) error {
// Make head block creation appear atomic.
tmp := dir + ".tmp"
if err := os.MkdirAll(tmp, 0777); err != nil {
return err
}
func TouchHeadBlock(dir string, mint, maxt int64) (string, error) {
entropy := rand.New(rand.NewSource(time.Now().UnixNano()))
ulid, err := ulid.New(ulid.Now(), entropy)
if err != nil {
return err
return "", err
}
// Make head block creation appear atomic.
dir = filepath.Join(dir, ulid.String())
tmp := dir + ".tmp"
if err := os.MkdirAll(tmp, 0777); err != nil {
return "", err
}
if err := writeMetaFile(tmp, &BlockMeta{
ULID: ulid,
Sequence: seq,
MinTime: mint,
MaxTime: maxt,
}); err != nil {
return err
return "", err
}
return renameFile(tmp, dir)
return dir, renameFile(tmp, dir)
}
// OpenHeadBlock opens the head block in dir.
@ -148,7 +151,7 @@ func (h *HeadBlock) inBounds(t int64) bool {
}
func (h *HeadBlock) String() string {
return fmt.Sprintf("(%d, %s)", h.meta.Sequence, h.meta.ULID)
return h.meta.ULID.String()
}
// Close syncs all data and closes underlying resources of the head block.
@ -176,10 +179,10 @@ func (h *HeadBlock) Close() error {
return nil
}
// Meta returns a BlockMeta for the head block.
func (h *HeadBlock) Meta() BlockMeta {
m := BlockMeta{
ULID: h.meta.ULID,
Sequence: h.meta.Sequence,
MinTime: h.meta.MinTime,
MaxTime: h.meta.MaxTime,
Compaction: h.meta.Compaction,
@ -192,11 +195,16 @@ func (h *HeadBlock) Meta() BlockMeta {
return m
}
// Dir returns the directory of the block.
func (h *HeadBlock) Dir() string { return h.dir }
func (h *HeadBlock) Persisted() bool { return false }
// Index returns an IndexReader against the block.
func (h *HeadBlock) Index() IndexReader { return &headIndexReader{h} }
// Chunks returns a ChunkReader against the block.
func (h *HeadBlock) Chunks() ChunkReader { return &headChunkReader{h} }
// Querier returns a new Querier against the block for the range [mint, maxt].
func (h *HeadBlock) Querier(mint, maxt int64) Querier {
h.mtx.RLock()
defer h.mtx.RUnlock()
@ -236,6 +244,7 @@ func (h *HeadBlock) Querier(mint, maxt int64) Querier {
}
}
// Appender returns a new Appender against the head block.
func (h *HeadBlock) Appender() Appender {
atomic.AddUint64(&h.activeWriters, 1)
@ -247,6 +256,7 @@ func (h *HeadBlock) Appender() Appender {
return &headAppender{HeadBlock: h, samples: getHeadAppendBuffer()}
}
// Busy returns true if the block has open write transactions.
func (h *HeadBlock) Busy() bool {
return atomic.LoadUint64(&h.activeWriters) > 0
}
@ -268,74 +278,89 @@ func putHeadAppendBuffer(b []RefSample) {
type headAppender struct {
*HeadBlock
newSeries map[uint64]hashedLabels
newHashes map[uint64]uint64
refmap map[uint64]uint64
newSeries []*hashedLabels
newLabels []labels.Labels
newHashes map[uint64]uint64
samples []RefSample
}
type hashedLabels struct {
ref uint64
hash uint64
labels labels.Labels
}
func (a *headAppender) Add(lset labels.Labels, t int64, v float64) (uint64, error) {
func (a *headAppender) Add(lset labels.Labels, t int64, v float64) (string, error) {
if !a.inBounds(t) {
return 0, ErrOutOfBounds
return "", ErrOutOfBounds
}
hash := lset.Hash()
refb := make([]byte, 8)
// Series exists already in the block.
if ms := a.get(hash, lset); ms != nil {
return uint64(ms.ref), a.AddFast(uint64(ms.ref), t, v)
binary.BigEndian.PutUint64(refb, uint64(ms.ref))
return string(refb), a.AddFast(string(refb), t, v)
}
// Series was added in this transaction previously.
if ref, ok := a.newHashes[hash]; ok {
return uint64(ref), a.AddFast(uint64(ref), t, v)
binary.BigEndian.PutUint64(refb, ref)
// XXX(fabxc): there's no fast path for multiple samples for the same new series
// in the same transaction. We always return the invalid empty ref. It's has not
// been a relevant use case so far and is not worth the trouble.
return nullRef, a.AddFast(string(refb), t, v)
}
// We only know the actual reference after committing. We generate an
// intermediate reference only valid for this batch.
// It is indicated by the the LSB of the 4th byte being set to 1.
// We use a random ID to avoid collisions when new series are created
// in two subsequent batches.
// TODO(fabxc): Provide method for client to determine whether a ref
// is valid beyond the current transaction.
ref := uint64(rand.Int31()) | (1 << 32)
// The series is completely new.
if a.newSeries == nil {
a.newSeries = map[uint64]hashedLabels{}
a.newHashes = map[uint64]uint64{}
a.refmap = map[uint64]uint64{}
}
a.newSeries[ref] = hashedLabels{hash: hash, labels: lset}
a.newHashes[hash] = ref
// First sample for new series.
ref := uint64(len(a.newSeries))
return ref, a.AddFast(ref, t, v)
a.newSeries = append(a.newSeries, &hashedLabels{
ref: ref,
hash: hash,
labels: lset,
})
// First bit indicates its a series created in this transaction.
ref |= (1 << 63)
a.newHashes[hash] = ref
binary.BigEndian.PutUint64(refb, ref)
return nullRef, a.AddFast(string(refb), t, v)
}
func (a *headAppender) AddFast(ref uint64, t int64, v float64) error {
// We only own the last 5 bytes of the reference. Anything before is
// used by higher-order appenders. We erase it to avoid issues.
ref = (ref << 24) >> 24
var nullRef = string([]byte{0, 0, 0, 0, 0, 0, 0, 0})
func (a *headAppender) AddFast(ref string, t int64, v float64) error {
if len(ref) != 8 {
return errors.Wrap(ErrNotFound, "invalid ref length")
}
var (
refn = binary.BigEndian.Uint64(yoloBytes(ref))
id = (refn << 1) >> 1
inTx = refn&(1<<63) != 0
)
// Distinguish between existing series and series created in
// this transaction.
if ref&(1<<32) != 0 {
if _, ok := a.newSeries[ref]; !ok {
return ErrNotFound
if inTx {
if id > uint64(len(a.newSeries)-1) {
return errors.Wrap(ErrNotFound, "transaction series ID too high")
}
// TODO(fabxc): we also have to validate here that the
// sample sequence is valid.
// We also have to revalidate it as we switch locks an create
// We also have to revalidate it as we switch locks and create
// the new series.
} else if ref > uint64(len(a.series)) {
return ErrNotFound
} else if id > uint64(len(a.series)) {
return errors.Wrap(ErrNotFound, "transaction series ID too high")
} else {
ms := a.series[int(ref)]
ms := a.series[id]
if ms == nil {
return ErrNotFound
return errors.Wrap(ErrNotFound, "nil series")
}
// TODO(fabxc): memory series should be locked here already.
// Only problem is release of locks in case of a rollback.
@ -356,7 +381,7 @@ func (a *headAppender) AddFast(ref uint64, t int64, v float64) error {
}
a.samples = append(a.samples, RefSample{
Ref: ref,
Ref: refn,
T: t,
V: v,
})
@ -375,18 +400,18 @@ func (a *headAppender) createSeries() {
base1 := len(a.series)
for ref, l := range a.newSeries {
for _, l := range a.newSeries {
// We switched locks and have to re-validate that the series were not
// created by another goroutine in the meantime.
if base1 > base0 {
if ms := a.get(l.hash, l.labels); ms != nil {
a.refmap[ref] = uint64(ms.ref)
l.ref = uint64(ms.ref)
continue
}
}
// Series is still new.
a.newLabels = append(a.newLabels, l.labels)
a.refmap[ref] = uint64(len(a.series))
l.ref = uint64(len(a.series))
a.create(l.hash, l.labels)
}
@ -401,11 +426,11 @@ func (a *headAppender) Commit() error {
a.createSeries()
// We have to update the refs of samples for series we just created.
for i := range a.samples {
s := &a.samples[i]
if s.Ref&(1<<32) > 0 {
s.Ref = a.refmap[s.Ref]
if s.Ref&(1<<63) != 0 {
s.Ref = a.newSeries[(s.Ref<<1)>>1].ref
}
}
@ -514,6 +539,9 @@ func (h *headIndexReader) Series(ref uint32) (labels.Labels, []*ChunkMeta, error
return nil, nil, ErrNotFound
}
s := h.series[ref]
if s == nil {
return nil, nil, ErrNotFound
}
metas := make([]*ChunkMeta, 0, len(s.chunks))
s.mtx.RLock()

View file

@ -21,7 +21,6 @@ import (
"io"
"math"
"os"
"path/filepath"
"sync"
"time"
@ -91,7 +90,6 @@ type RefSample struct {
}
const (
walDirName = "wal"
walSegmentSizeBytes = 256 * 1024 * 1024 // 256 MB
)
@ -107,8 +105,6 @@ func init() {
// OpenSegmentWAL opens or creates a write ahead log in the given directory.
// The WAL must be read completely before new data is written.
func OpenSegmentWAL(dir string, logger log.Logger, flushInterval time.Duration) (*SegmentWAL, error) {
dir = filepath.Join(dir, walDirName)
if err := os.MkdirAll(dir, 0777); err != nil {
return nil, err
}

14
vendor/vendor.json vendored
View file

@ -661,22 +661,22 @@
"revisionTime": "2016-04-11T19:08:41Z"
},
{
"checksumSHA1": "T+9Tl4utHkpYSdVFRpdfLloShTM=",
"checksumSHA1": "q2GxuO+ppV/gqBir/Z6ijx7aOOU=",
"path": "github.com/prometheus/tsdb",
"revision": "c8438cfc8113a39f75e398bf00c481d3cb1069f6",
"revisionTime": "2017-05-14T09:51:56Z"
"revision": "4f2eb2057ee0a7f2b984503886bff970a9dab1a8",
"revisionTime": "2017-05-22T06:49:09Z"
},
{
"checksumSHA1": "9EH3v+JdbikCUJAgD4VEOPIaWfs=",
"path": "github.com/prometheus/tsdb/chunks",
"revision": "c8438cfc8113a39f75e398bf00c481d3cb1069f6",
"revisionTime": "2017-05-14T09:51:56Z"
"revision": "4f2eb2057ee0a7f2b984503886bff970a9dab1a8",
"revisionTime": "2017-05-22T06:49:09Z"
},
{
"checksumSHA1": "3RHZcB/ZvIae9K0tJxNlajJg0jA=",
"path": "github.com/prometheus/tsdb/labels",
"revision": "c8438cfc8113a39f75e398bf00c481d3cb1069f6",
"revisionTime": "2017-05-14T09:51:56Z"
"revision": "4f2eb2057ee0a7f2b984503886bff970a9dab1a8",
"revisionTime": "2017-05-22T06:49:09Z"
},
{
"checksumSHA1": "+49Vr4Me28p3cR+gxX5SUQHbbas=",