mirror of
https://github.com/prometheus/prometheus.git
synced 2024-12-24 05:04:05 -08:00
Switch blocks to ULID directories, drop sequenc numbers
This commit is contained in:
parent
285bc07030
commit
39df7e2bba
6
block.go
6
block.go
|
@ -15,7 +15,6 @@ package tsdb
|
|||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"path/filepath"
|
||||
|
@ -73,9 +72,6 @@ type BlockMeta struct {
|
|||
// Unique identifier for the block and its contents. Changes on compaction.
|
||||
ULID ulid.ULID `json:"ulid"`
|
||||
|
||||
// Sequence number of the block.
|
||||
Sequence int `json:"sequence"`
|
||||
|
||||
// MinTime and MaxTime specify the time range all samples
|
||||
// in the block are in.
|
||||
MinTime int64 `json:"minTime"`
|
||||
|
@ -190,7 +186,7 @@ func (pb *persistedBlock) Close() error {
|
|||
}
|
||||
|
||||
func (pb *persistedBlock) String() string {
|
||||
return fmt.Sprintf("(%d, %s)", pb.meta.Sequence, pb.meta.ULID)
|
||||
return pb.meta.ULID.String()
|
||||
}
|
||||
|
||||
func (pb *persistedBlock) Querier(mint, maxt int64) Querier {
|
||||
|
|
|
@ -28,6 +28,7 @@ import (
|
|||
"time"
|
||||
"unsafe"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
promlabels "github.com/prometheus/prometheus/pkg/labels"
|
||||
"github.com/prometheus/prometheus/pkg/textparse"
|
||||
"github.com/prometheus/tsdb"
|
||||
|
@ -114,7 +115,6 @@ func (b *writeBenchmark) run(cmd *cobra.Command, args []string) {
|
|||
RetentionDuration: 2 * 24 * 60 * 60 * 1000, // 1 days in milliseconds
|
||||
MinBlockDuration: 3 * 60 * 60 * 1000, // 2 hours in milliseconds
|
||||
MaxBlockDuration: 27 * 60 * 60 * 1000, // 1 days in milliseconds
|
||||
AppendableBlocks: 2,
|
||||
})
|
||||
if err != nil {
|
||||
exitWithError(err)
|
||||
|
@ -225,7 +225,7 @@ func (b *writeBenchmark) ingestScrapesShard(metrics []labels.Labels, scrapeCount
|
|||
s.ref = &ref
|
||||
} else if err := app.AddFast(*s.ref, ts, float64(s.value)); err != nil {
|
||||
|
||||
if err.Error() != "not found" {
|
||||
if errors.Cause(err) != tsdb.ErrNotFound {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
|
|
59
compact.go
59
compact.go
|
@ -20,6 +20,8 @@ import (
|
|||
"path/filepath"
|
||||
"time"
|
||||
|
||||
"go4.org/sort"
|
||||
|
||||
"github.com/coreos/etcd/pkg/fileutil"
|
||||
"github.com/go-kit/kit/log"
|
||||
"github.com/oklog/ulid"
|
||||
|
@ -34,10 +36,10 @@ type Compactor interface {
|
|||
// Plan returns a set of non-overlapping directories that can
|
||||
// be compacted concurrently.
|
||||
// Results returned when compactions are in progress are undefined.
|
||||
Plan(dir string) ([][]string, error)
|
||||
Plan() ([][]string, error)
|
||||
|
||||
// Write persists a Block into a directory.
|
||||
Write(dir string, b Block) error
|
||||
Write(b Block) error
|
||||
|
||||
// Compact runs compaction against the provided directories. Must
|
||||
// only be called concurrently with results of Plan().
|
||||
|
@ -46,6 +48,7 @@ type Compactor interface {
|
|||
|
||||
// compactor implements the Compactor interface.
|
||||
type compactor struct {
|
||||
dir string
|
||||
metrics *compactorMetrics
|
||||
logger log.Logger
|
||||
opts *compactorOptions
|
||||
|
@ -87,8 +90,9 @@ type compactorOptions struct {
|
|||
maxBlockRange uint64
|
||||
}
|
||||
|
||||
func newCompactor(r prometheus.Registerer, l log.Logger, opts *compactorOptions) *compactor {
|
||||
func newCompactor(dir string, r prometheus.Registerer, l log.Logger, opts *compactorOptions) *compactor {
|
||||
return &compactor{
|
||||
dir: dir,
|
||||
opts: opts,
|
||||
logger: l,
|
||||
metrics: newCompactorMetrics(r),
|
||||
|
@ -103,13 +107,18 @@ type compactionInfo struct {
|
|||
|
||||
const compactionBlocksLen = 3
|
||||
|
||||
func (c *compactor) Plan(dir string) ([][]string, error) {
|
||||
dirs, err := blockDirs(dir)
|
||||
type dirMeta struct {
|
||||
dir string
|
||||
meta *BlockMeta
|
||||
}
|
||||
|
||||
func (c *compactor) Plan() ([][]string, error) {
|
||||
dirs, err := blockDirs(c.dir)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var bs []*BlockMeta
|
||||
var bs []dirMeta
|
||||
|
||||
for _, dir := range dirs {
|
||||
meta, err := readMetaFile(dir)
|
||||
|
@ -117,9 +126,12 @@ func (c *compactor) Plan(dir string) ([][]string, error) {
|
|||
return nil, err
|
||||
}
|
||||
if meta.Compaction.Generation > 0 {
|
||||
bs = append(bs, meta)
|
||||
bs = append(bs, dirMeta{dir, meta})
|
||||
}
|
||||
}
|
||||
sort.Slice(bs, func(i, j int) bool {
|
||||
return bs[i].meta.MinTime < bs[j].meta.MinTime
|
||||
})
|
||||
|
||||
if len(bs) == 0 {
|
||||
return nil, nil
|
||||
|
@ -128,7 +140,7 @@ func (c *compactor) Plan(dir string) ([][]string, error) {
|
|||
sliceDirs := func(i, j int) [][]string {
|
||||
var res []string
|
||||
for k := i; k < j; k++ {
|
||||
res = append(res, dirs[k])
|
||||
res = append(res, bs[k].dir)
|
||||
}
|
||||
return [][]string{res}
|
||||
}
|
||||
|
@ -143,26 +155,22 @@ func (c *compactor) Plan(dir string) ([][]string, error) {
|
|||
return nil, nil
|
||||
}
|
||||
|
||||
func (c *compactor) match(bs []*BlockMeta) bool {
|
||||
g := bs[0].Compaction.Generation
|
||||
func (c *compactor) match(bs []dirMeta) bool {
|
||||
g := bs[0].meta.Compaction.Generation
|
||||
|
||||
for _, b := range bs {
|
||||
if b.Compaction.Generation != g {
|
||||
if b.meta.Compaction.Generation != g {
|
||||
return false
|
||||
}
|
||||
}
|
||||
return uint64(bs[len(bs)-1].MaxTime-bs[0].MinTime) <= c.opts.maxBlockRange
|
||||
return uint64(bs[len(bs)-1].meta.MaxTime-bs[0].meta.MinTime) <= c.opts.maxBlockRange
|
||||
}
|
||||
|
||||
func mergeBlockMetas(blocks ...Block) (res BlockMeta) {
|
||||
m0 := blocks[0].Meta()
|
||||
|
||||
entropy := rand.New(rand.NewSource(time.Now().UnixNano()))
|
||||
|
||||
res.Sequence = m0.Sequence
|
||||
res.MinTime = m0.MinTime
|
||||
res.MaxTime = blocks[len(blocks)-1].Meta().MaxTime
|
||||
res.ULID = ulid.MustNew(ulid.Now(), entropy)
|
||||
|
||||
res.Compaction.Generation = m0.Compaction.Generation + 1
|
||||
|
||||
|
@ -185,16 +193,22 @@ func (c *compactor) Compact(dirs ...string) (err error) {
|
|||
blocks = append(blocks, b)
|
||||
}
|
||||
|
||||
return c.write(dirs[0], blocks...)
|
||||
entropy := rand.New(rand.NewSource(time.Now().UnixNano()))
|
||||
uid := ulid.MustNew(ulid.Now(), entropy)
|
||||
|
||||
return c.write(uid, blocks...)
|
||||
}
|
||||
|
||||
func (c *compactor) Write(dir string, b Block) error {
|
||||
return c.write(dir, b)
|
||||
func (c *compactor) Write(b Block) error {
|
||||
entropy := rand.New(rand.NewSource(time.Now().UnixNano()))
|
||||
uid := ulid.MustNew(ulid.Now(), entropy)
|
||||
|
||||
return c.write(uid, b)
|
||||
}
|
||||
|
||||
// write creates a new block that is the union of the provided blocks into dir.
|
||||
// It cleans up all files of the old blocks after completing successfully.
|
||||
func (c *compactor) write(dir string, blocks ...Block) (err error) {
|
||||
func (c *compactor) write(uid ulid.ULID, blocks ...Block) (err error) {
|
||||
c.logger.Log("msg", "compact blocks", "blocks", fmt.Sprintf("%v", blocks))
|
||||
|
||||
defer func(t time.Time) {
|
||||
|
@ -204,6 +218,7 @@ func (c *compactor) write(dir string, blocks ...Block) (err error) {
|
|||
c.metrics.duration.Observe(time.Since(t).Seconds())
|
||||
}(time.Now())
|
||||
|
||||
dir := filepath.Join(c.dir, uid.String())
|
||||
tmp := dir + ".tmp"
|
||||
|
||||
if err = os.RemoveAll(tmp); err != nil {
|
||||
|
@ -229,6 +244,8 @@ func (c *compactor) write(dir string, blocks ...Block) (err error) {
|
|||
if err != nil {
|
||||
return errors.Wrap(err, "write compaction")
|
||||
}
|
||||
meta.ULID = uid
|
||||
|
||||
if err = writeMetaFile(tmp, meta); err != nil {
|
||||
return errors.Wrap(err, "write merged meta")
|
||||
}
|
||||
|
@ -244,7 +261,7 @@ func (c *compactor) write(dir string, blocks ...Block) (err error) {
|
|||
if err := renameFile(tmp, dir); err != nil {
|
||||
return errors.Wrap(err, "rename block dir")
|
||||
}
|
||||
for _, b := range blocks[1:] {
|
||||
for _, b := range blocks {
|
||||
if err := os.RemoveAll(b.Dir()); err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
326
db.go
326
db.go
|
@ -22,6 +22,7 @@ import (
|
|||
"os"
|
||||
"path/filepath"
|
||||
"runtime"
|
||||
"sort"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
|
@ -33,6 +34,7 @@ import (
|
|||
"github.com/coreos/etcd/pkg/fileutil"
|
||||
"github.com/go-kit/kit/log"
|
||||
"github.com/nightlyone/lockfile"
|
||||
"github.com/oklog/ulid"
|
||||
"github.com/pkg/errors"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"github.com/prometheus/tsdb/labels"
|
||||
|
@ -45,7 +47,6 @@ var DefaultOptions = &Options{
|
|||
RetentionDuration: 15 * 24 * 60 * 60 * 1000, // 15 days in milliseconds
|
||||
MinBlockDuration: 3 * 60 * 60 * 1000, // 2 hours in milliseconds
|
||||
MaxBlockDuration: 24 * 60 * 60 * 1000, // 1 days in milliseconds
|
||||
AppendableBlocks: 2,
|
||||
NoLockfile: false,
|
||||
}
|
||||
|
||||
|
@ -64,13 +65,6 @@ type Options struct {
|
|||
// The maximum timestamp range of compacted blocks.
|
||||
MaxBlockDuration uint64
|
||||
|
||||
// Number of head blocks that can be appended to.
|
||||
// Should be two or higher to prevent write errors in general scenarios.
|
||||
//
|
||||
// After a new block is started for timestamp t0 or higher, appends with
|
||||
// timestamps as early as t0 - (n-1) * MinBlockDuration are valid.
|
||||
AppendableBlocks int
|
||||
|
||||
// NoLockfile disables creation and consideration of a lock file.
|
||||
NoLockfile bool
|
||||
}
|
||||
|
@ -159,11 +153,6 @@ func Open(dir string, l log.Logger, r prometheus.Registerer, opts *Options) (db
|
|||
return nil, err
|
||||
}
|
||||
|
||||
absdir, err := filepath.Abs(dir)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if l == nil {
|
||||
l = log.NewLogfmtLogger(os.Stdout)
|
||||
l = log.With(l, "ts", log.DefaultTimestampUTC, "caller", log.DefaultCaller)
|
||||
|
@ -172,9 +161,6 @@ func Open(dir string, l log.Logger, r prometheus.Registerer, opts *Options) (db
|
|||
if opts == nil {
|
||||
opts = DefaultOptions
|
||||
}
|
||||
if opts.AppendableBlocks < 1 {
|
||||
return nil, errors.Errorf("AppendableBlocks must be greater than 0")
|
||||
}
|
||||
|
||||
db = &DB{
|
||||
dir: dir,
|
||||
|
@ -186,6 +172,10 @@ func Open(dir string, l log.Logger, r prometheus.Registerer, opts *Options) (db
|
|||
stopc: make(chan struct{}),
|
||||
}
|
||||
if !opts.NoLockfile {
|
||||
absdir, err := filepath.Abs(dir)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
lockf, err := lockfile.New(filepath.Join(absdir, "lock"))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
@ -196,7 +186,7 @@ func Open(dir string, l log.Logger, r prometheus.Registerer, opts *Options) (db
|
|||
db.lockf = &lockf
|
||||
}
|
||||
|
||||
db.compactor = newCompactor(r, l, &compactorOptions{
|
||||
db.compactor = newCompactor(dir, r, l, &compactorOptions{
|
||||
maxBlockRange: opts.MaxBlockDuration,
|
||||
})
|
||||
|
||||
|
@ -281,8 +271,8 @@ func (db *DB) compact() (changes bool, err error) {
|
|||
// returning the lock to not block Appenders.
|
||||
// Selected blocks are semantically ensured to not be written to afterwards
|
||||
// by appendable().
|
||||
if len(db.heads) > db.opts.AppendableBlocks {
|
||||
for _, h := range db.heads[:len(db.heads)-db.opts.AppendableBlocks] {
|
||||
if len(db.heads) > 2 {
|
||||
for _, h := range db.heads[:len(db.heads)-2] {
|
||||
// Blocks that won't be appendable when instantiating a new appender
|
||||
// might still have active appenders on them.
|
||||
// Abort at the first one we encounter.
|
||||
|
@ -302,7 +292,7 @@ func (db *DB) compact() (changes bool, err error) {
|
|||
default:
|
||||
}
|
||||
|
||||
if err = db.compactor.Write(h.Dir(), h); err != nil {
|
||||
if err = db.compactor.Write(h); err != nil {
|
||||
return changes, errors.Wrap(err, "persist head block")
|
||||
}
|
||||
changes = true
|
||||
|
@ -311,7 +301,7 @@ func (db *DB) compact() (changes bool, err error) {
|
|||
|
||||
// Check for compactions of multiple blocks.
|
||||
for {
|
||||
plans, err := db.compactor.Plan(db.dir)
|
||||
plans, err := db.compactor.Plan()
|
||||
if err != nil {
|
||||
return changes, errors.Wrap(err, "plan compaction")
|
||||
}
|
||||
|
@ -375,9 +365,9 @@ func retentionCutoff(dir string, mint int64) (bool, error) {
|
|||
return changes, fileutil.Fsync(df)
|
||||
}
|
||||
|
||||
func (db *DB) seqBlock(i int) (Block, bool) {
|
||||
func (db *DB) getBlock(id ulid.ULID) (Block, bool) {
|
||||
for _, b := range db.blocks {
|
||||
if b.Meta().Sequence == i {
|
||||
if b.Meta().ULID == id {
|
||||
return b, true
|
||||
}
|
||||
}
|
||||
|
@ -399,10 +389,8 @@ func (db *DB) reloadBlocks() error {
|
|||
return errors.Wrap(err, "find blocks")
|
||||
}
|
||||
var (
|
||||
metas []*BlockMeta
|
||||
blocks []Block
|
||||
heads []headBlock
|
||||
seqBlocks = make(map[int]Block, len(dirs))
|
||||
blocks []Block
|
||||
exist = map[ulid.ULID]struct{}{}
|
||||
)
|
||||
|
||||
for _, dir := range dirs {
|
||||
|
@ -410,47 +398,58 @@ func (db *DB) reloadBlocks() error {
|
|||
if err != nil {
|
||||
return errors.Wrapf(err, "read meta information %s", dir)
|
||||
}
|
||||
metas = append(metas, meta)
|
||||
}
|
||||
|
||||
for i, meta := range metas {
|
||||
b, ok := db.seqBlock(meta.Sequence)
|
||||
|
||||
if meta.Compaction.Generation == 0 {
|
||||
if !ok {
|
||||
b, err = db.openHeadBlock(dirs[i])
|
||||
if err != nil {
|
||||
return errors.Wrapf(err, "load head at %s", dirs[i])
|
||||
}
|
||||
b, ok := db.getBlock(meta.ULID)
|
||||
if !ok {
|
||||
if meta.Compaction.Generation == 0 {
|
||||
b, err = db.openHeadBlock(dir)
|
||||
} else {
|
||||
b, err = newPersistedBlock(dir)
|
||||
}
|
||||
if meta.ULID != b.Meta().ULID {
|
||||
return errors.Errorf("head block ULID changed unexpectedly")
|
||||
}
|
||||
heads = append(heads, b.(headBlock))
|
||||
} else {
|
||||
if !ok || meta.ULID != b.Meta().ULID {
|
||||
b, err = newPersistedBlock(dirs[i])
|
||||
if err != nil {
|
||||
return errors.Wrapf(err, "open persisted block %s", dirs[i])
|
||||
}
|
||||
if err != nil {
|
||||
return errors.Wrapf(err, "open block %s", dir)
|
||||
}
|
||||
}
|
||||
|
||||
seqBlocks[meta.Sequence] = b
|
||||
blocks = append(blocks, b)
|
||||
exist[meta.ULID] = struct{}{}
|
||||
}
|
||||
|
||||
// Close all blocks that we no longer need. They are closed after returning all
|
||||
// locks to avoid questionable locking order.
|
||||
if err := validateBlockSequence(blocks); err != nil {
|
||||
return errors.Wrap(err, "invalid block sequence")
|
||||
}
|
||||
// Close all opened blocks that no longer exist after we returned all locks.
|
||||
for _, b := range db.blocks {
|
||||
if nb, ok := seqBlocks[b.Meta().Sequence]; !ok || nb != b {
|
||||
if _, ok := exist[b.Meta().ULID]; !ok {
|
||||
cs = append(cs, b)
|
||||
}
|
||||
}
|
||||
|
||||
db.blocks = blocks
|
||||
db.heads = heads
|
||||
db.heads = nil
|
||||
|
||||
for _, b := range blocks {
|
||||
if b.Meta().Compaction.Generation == 0 {
|
||||
db.heads = append(db.heads, b.(*HeadBlock))
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func validateBlockSequence(bs []Block) error {
|
||||
if len(bs) == 0 {
|
||||
return nil
|
||||
}
|
||||
sort.Slice(bs, func(i, j int) bool {
|
||||
return bs[i].Meta().MinTime < bs[j].Meta().MinTime
|
||||
})
|
||||
prev := bs[0]
|
||||
for _, b := range bs[1:] {
|
||||
if b.Meta().MinTime < prev.Meta().MaxTime {
|
||||
return errors.Errorf("block time ranges overlap", b.Meta().MinTime, prev.Meta().MaxTime)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -482,27 +481,27 @@ func (db *DB) Close() error {
|
|||
// Appender returns a new Appender on the database.
|
||||
func (db *DB) Appender() Appender {
|
||||
db.mtx.RLock()
|
||||
a := &dbAppender{db: db}
|
||||
return &dbAppender{db: db}
|
||||
|
||||
// XXX(fabxc): turn off creating initial appender as it will happen on-demand
|
||||
// anyway. For now this, with combination of only having a single timestamp per batch,
|
||||
// prevents opening more than one appender and hitting an unresolved deadlock (#11).
|
||||
//
|
||||
// // XXX(fabxc): turn off creating initial appender as it will happen on-demand
|
||||
// // anyway. For now this, with combination of only having a single timestamp per batch,
|
||||
// // prevents opening more than one appender and hitting an unresolved deadlock (#11).
|
||||
// //
|
||||
|
||||
// Only instantiate appender after returning the headmtx to avoid
|
||||
// questionable locking order.
|
||||
db.headmtx.RLock()
|
||||
app := db.appendable()
|
||||
db.headmtx.RUnlock()
|
||||
// // Only instantiate appender after returning the headmtx to avoid
|
||||
// // questionable locking order.
|
||||
// db.headmtx.RLock()
|
||||
// app := db.appendable()
|
||||
// db.headmtx.RUnlock()
|
||||
|
||||
for _, b := range app {
|
||||
a.heads = append(a.heads, &metaAppender{
|
||||
meta: b.Meta(),
|
||||
app: b.Appender(),
|
||||
})
|
||||
}
|
||||
// for _, b := range app {
|
||||
// a.heads = append(a.heads, &metaAppender{
|
||||
// meta: b.Meta(),
|
||||
// app: b.Appender(),
|
||||
// })
|
||||
// }
|
||||
|
||||
return a
|
||||
// return a
|
||||
}
|
||||
|
||||
type dbAppender struct {
|
||||
|
@ -512,6 +511,15 @@ type dbAppender struct {
|
|||
samples int
|
||||
}
|
||||
|
||||
func (a *dbAppender) getAppender(ulid string) (*metaAppender, bool) {
|
||||
for _, h := range a.heads {
|
||||
if string(h.meta.ULID[:]) == ulid {
|
||||
return h, true
|
||||
}
|
||||
}
|
||||
return nil, false
|
||||
}
|
||||
|
||||
type metaAppender struct {
|
||||
meta BlockMeta
|
||||
app Appender
|
||||
|
@ -532,18 +540,20 @@ func (a *dbAppender) Add(lset labels.Labels, t int64, v float64) (string, error)
|
|||
}
|
||||
|
||||
func (a *dbAppender) AddFast(ref string, t int64, v float64) error {
|
||||
// Load the head last byte of the head sequence from the 3rd byte of the
|
||||
// reference number.
|
||||
// gen := (ref << 16) >> 56
|
||||
|
||||
h, err := a.appenderFor(t)
|
||||
if err != nil {
|
||||
return err
|
||||
if len(ref) < 16 {
|
||||
return errors.Wrap(ErrNotFound, "invalid ref length")
|
||||
}
|
||||
if yoloString(h.meta.ULID[:]) != ref[:16] {
|
||||
return errors.Wrap(ErrNotFound, "unexpected ULID")
|
||||
// The first 16 bytes a ref hold the ULID of the head block.
|
||||
h, ok := a.getAppender(ref[:16])
|
||||
if !ok {
|
||||
return errors.Wrapf(ErrNotFound, "no block for ULID %s", ref[:16])
|
||||
}
|
||||
if err := h.app.AddFast(ref[16:], t, v); err != nil {
|
||||
// The block the ref points to might fit the given timestamp.
|
||||
// We mask the error to stick with our contract.
|
||||
if errors.Cause(err) == ErrOutOfBounds {
|
||||
err = ErrNotFound
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -554,84 +564,74 @@ func (a *dbAppender) AddFast(ref string, t int64, v float64) error {
|
|||
// appenderFor gets the appender for the head containing timestamp t.
|
||||
// If the head block doesn't exist yet, it gets created.
|
||||
func (a *dbAppender) appenderFor(t int64) (*metaAppender, error) {
|
||||
// If there's no fitting head block for t, ensure it gets created.
|
||||
if len(a.heads) == 0 || t >= a.heads[len(a.heads)-1].meta.MaxTime {
|
||||
a.db.headmtx.Lock()
|
||||
|
||||
var newHeads []headBlock
|
||||
|
||||
if err := a.db.ensureHead(t); err != nil {
|
||||
a.db.headmtx.Unlock()
|
||||
return nil, err
|
||||
}
|
||||
if len(a.heads) == 0 {
|
||||
newHeads = append(newHeads, a.db.appendable()...)
|
||||
} else {
|
||||
maxSeq := a.heads[len(a.heads)-1].meta.Sequence
|
||||
for _, b := range a.db.appendable() {
|
||||
if b.Meta().Sequence > maxSeq {
|
||||
newHeads = append(newHeads, b)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
a.db.headmtx.Unlock()
|
||||
|
||||
// XXX(fabxc): temporary workaround. See comment on instantiating DB.Appender.
|
||||
// for _, b := range newHeads {
|
||||
// // Only get appender for the block with the specific timestamp.
|
||||
// if t >= b.Meta().MaxTime {
|
||||
// continue
|
||||
// }
|
||||
// a.heads = append(a.heads, &metaAppender{
|
||||
// app: b.Appender(),
|
||||
// meta: b.Meta(),
|
||||
// })
|
||||
// break
|
||||
// }
|
||||
|
||||
// Instantiate appenders after returning headmtx to avoid questionable
|
||||
// locking order.
|
||||
for _, b := range newHeads {
|
||||
a.heads = append(a.heads, &metaAppender{
|
||||
app: b.Appender(),
|
||||
meta: b.Meta(),
|
||||
})
|
||||
}
|
||||
}
|
||||
for i := len(a.heads) - 1; i >= 0; i-- {
|
||||
if h := a.heads[i]; t >= h.meta.MinTime {
|
||||
for _, h := range a.heads {
|
||||
if intervalContains(h.meta.MinTime, h.meta.MaxTime-1, t) {
|
||||
return h, nil
|
||||
}
|
||||
}
|
||||
// Currently opened appenders do not cover t. Ensure the head block is
|
||||
// created and add missing appenders.
|
||||
a.db.headmtx.Lock()
|
||||
|
||||
return nil, ErrNotFound
|
||||
if err := a.db.ensureHead(t); err != nil {
|
||||
a.db.headmtx.Unlock()
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var hb headBlock
|
||||
for _, h := range a.db.appendable() {
|
||||
m := h.Meta()
|
||||
|
||||
if intervalContains(m.MinTime, m.MaxTime-1, t) {
|
||||
hb = h
|
||||
break
|
||||
}
|
||||
}
|
||||
a.db.headmtx.Unlock()
|
||||
|
||||
if hb == nil {
|
||||
return nil, ErrOutOfBounds
|
||||
}
|
||||
// Instantiate appender after returning headmtx!
|
||||
app := &metaAppender{
|
||||
meta: hb.Meta(),
|
||||
app: hb.Appender(),
|
||||
}
|
||||
a.heads = append(a.heads, app)
|
||||
|
||||
return app, nil
|
||||
}
|
||||
|
||||
func rangeForTimestamp(t int64, width int64) (mint, maxt int64) {
|
||||
mint = (t / width) * width
|
||||
return mint, mint + width
|
||||
}
|
||||
|
||||
// ensureHead makes sure that there is a head block for the timestamp t if
|
||||
// it is within or after the currently appendable window.
|
||||
func (db *DB) ensureHead(t int64) error {
|
||||
// Initial case for a new database: we must create the first
|
||||
// AppendableBlocks-1 front padding heads.
|
||||
if len(db.heads) == 0 {
|
||||
for i := int64(db.opts.AppendableBlocks - 1); i >= 0; i-- {
|
||||
if _, err := db.cut(t - i*int64(db.opts.MinBlockDuration)); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
mint, maxt := rangeForTimestamp(t, int64(db.opts.MinBlockDuration))
|
||||
|
||||
for {
|
||||
h := db.heads[len(db.heads)-1]
|
||||
m := h.Meta()
|
||||
// If t doesn't exceed the range of heads blocks, there's nothing to do.
|
||||
if t < m.MaxTime {
|
||||
return nil
|
||||
}
|
||||
if _, err := db.cut(m.MaxTime); err != nil {
|
||||
// Initial case with an empty database. t is the first timestamp we ever received.
|
||||
// Create an additional buffering block in front.
|
||||
if len(db.blocks) == 0 {
|
||||
if _, err := db.createHeadBlock(mint-int64(db.opts.MinBlockDuration), mint); err != nil {
|
||||
return err
|
||||
}
|
||||
// If the previous block reaches into our new window, make it smaller.
|
||||
} else if mt := db.blocks[len(db.blocks)-1].Meta().MaxTime; mt > mint {
|
||||
mint = mt
|
||||
}
|
||||
if mint >= maxt {
|
||||
return nil
|
||||
}
|
||||
// Error if the requested time for a head is before the appendable window.
|
||||
if len(db.heads) > 0 && t < db.heads[0].Meta().MinTime {
|
||||
return ErrOutOfBounds
|
||||
}
|
||||
|
||||
_, err := db.createHeadBlock(mint, maxt)
|
||||
return err
|
||||
}
|
||||
|
||||
func (a *dbAppender) Commit() error {
|
||||
|
@ -669,14 +669,15 @@ func (a *dbAppender) Rollback() error {
|
|||
}
|
||||
|
||||
// appendable returns a copy of a slice of HeadBlocks that can still be appended to.
|
||||
func (db *DB) appendable() []headBlock {
|
||||
var i int
|
||||
app := make([]headBlock, 0, db.opts.AppendableBlocks)
|
||||
|
||||
if len(db.heads) > db.opts.AppendableBlocks {
|
||||
i = len(db.heads) - db.opts.AppendableBlocks
|
||||
func (db *DB) appendable() (r []headBlock) {
|
||||
switch len(db.heads) {
|
||||
case 0:
|
||||
case 1:
|
||||
r = append(r, db.heads[0])
|
||||
default:
|
||||
r = append(r, db.heads[len(db.heads)-2:]...)
|
||||
}
|
||||
return append(app, db.heads[i:]...)
|
||||
return r
|
||||
}
|
||||
|
||||
func intervalOverlap(amin, amax, bmin, bmax int64) bool {
|
||||
|
@ -711,7 +712,7 @@ func (db *DB) blocksForInterval(mint, maxt int64) []Block {
|
|||
// openHeadBlock opens the head block at dir.
|
||||
func (db *DB) openHeadBlock(dir string) (*HeadBlock, error) {
|
||||
var (
|
||||
wdir = filepath.Join(dir, "wal")
|
||||
wdir = walDir(dir)
|
||||
l = log.With(db.logger, "wal", wdir)
|
||||
)
|
||||
wal, err := OpenSegmentWAL(wdir, l, 5*time.Second)
|
||||
|
@ -726,16 +727,10 @@ func (db *DB) openHeadBlock(dir string) (*HeadBlock, error) {
|
|||
return h, nil
|
||||
}
|
||||
|
||||
// cut starts a new head block to append to. The completed head block
|
||||
// will still be appendable for the configured grace period.
|
||||
func (db *DB) cut(mint int64) (headBlock, error) {
|
||||
maxt := mint + int64(db.opts.MinBlockDuration)
|
||||
|
||||
dir, seq, err := nextSequenceFile(db.dir, "b-")
|
||||
// createHeadBlock starts a new head block to append to.
|
||||
func (db *DB) createHeadBlock(mint, maxt int64) (headBlock, error) {
|
||||
dir, err := TouchHeadBlock(db.dir, mint, maxt)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err := TouchHeadBlock(dir, seq, mint, maxt); err != nil {
|
||||
return nil, errors.Wrapf(err, "touch head block %s", dir)
|
||||
}
|
||||
newHead, err := db.openHeadBlock(dir)
|
||||
|
@ -758,13 +753,8 @@ func isBlockDir(fi os.FileInfo) bool {
|
|||
if !fi.IsDir() {
|
||||
return false
|
||||
}
|
||||
if !strings.HasPrefix(fi.Name(), "b-") {
|
||||
return false
|
||||
}
|
||||
if _, err := strconv.ParseUint(fi.Name()[2:], 10, 32); err != nil {
|
||||
return false
|
||||
}
|
||||
return true
|
||||
_, err := ulid.Parse(fi.Name())
|
||||
return err == nil
|
||||
}
|
||||
|
||||
func blockDirs(dir string) ([]string, error) {
|
||||
|
|
18
db_test.go
18
db_test.go
|
@ -108,22 +108,22 @@ func TestDBAppenderAddRef(t *testing.T) {
|
|||
require.NoError(t, err)
|
||||
defer db.Close()
|
||||
|
||||
app := db.Appender()
|
||||
defer app.Rollback()
|
||||
app1 := db.Appender()
|
||||
|
||||
ref, err := app.Add(labels.FromStrings("a", "b"), 0, 0)
|
||||
ref, err := app1.Add(labels.FromStrings("a", "b"), 0, 0)
|
||||
require.NoError(t, err)
|
||||
|
||||
// When a series is first created, refs don't work within that transaction.
|
||||
err = app.AddFast(ref, 1, 1)
|
||||
err = app1.AddFast(ref, 1, 1)
|
||||
require.EqualError(t, errors.Cause(err), ErrNotFound.Error())
|
||||
|
||||
err = app.Commit()
|
||||
err = app1.Commit()
|
||||
require.NoError(t, err)
|
||||
|
||||
app = db.Appender()
|
||||
app2 := db.Appender()
|
||||
defer app2.Rollback()
|
||||
|
||||
ref, err = app.Add(labels.FromStrings("a", "b"), 1, 1)
|
||||
ref, err = app2.Add(labels.FromStrings("a", "b"), 1, 1)
|
||||
require.NoError(t, err)
|
||||
|
||||
// Ref must be prefixed with block ULID of the block we wrote to.
|
||||
|
@ -131,13 +131,13 @@ func TestDBAppenderAddRef(t *testing.T) {
|
|||
require.Equal(t, string(id[:]), ref[:16])
|
||||
|
||||
// Reference must be valid to add another sample.
|
||||
err = app.AddFast(ref, 2, 2)
|
||||
err = app2.AddFast(ref, 2, 2)
|
||||
require.NoError(t, err)
|
||||
|
||||
// AddFast for the same timestamp must fail if the generation in the reference
|
||||
// doesn't add up.
|
||||
refb := []byte(ref)
|
||||
refb[15] ^= refb[15]
|
||||
err = app.AddFast(string(refb), 1, 1)
|
||||
err = app2.AddFast(string(refb), 1, 1)
|
||||
require.EqualError(t, errors.Cause(err), ErrNotFound.Error())
|
||||
}
|
||||
|
|
37
head.go
37
head.go
|
@ -18,6 +18,7 @@ import (
|
|||
"math"
|
||||
"math/rand"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"sort"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
|
@ -73,30 +74,30 @@ type HeadBlock struct {
|
|||
|
||||
// TouchHeadBlock atomically touches a new head block in dir for
|
||||
// samples in the range [mint,maxt).
|
||||
func TouchHeadBlock(dir string, seq int, mint, maxt int64) error {
|
||||
// Make head block creation appear atomic.
|
||||
tmp := dir + ".tmp"
|
||||
|
||||
if err := os.MkdirAll(tmp, 0777); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
func TouchHeadBlock(dir string, mint, maxt int64) (string, error) {
|
||||
entropy := rand.New(rand.NewSource(time.Now().UnixNano()))
|
||||
|
||||
ulid, err := ulid.New(ulid.Now(), entropy)
|
||||
if err != nil {
|
||||
return err
|
||||
return "", err
|
||||
}
|
||||
|
||||
// Make head block creation appear atomic.
|
||||
dir = filepath.Join(dir, ulid.String())
|
||||
tmp := dir + ".tmp"
|
||||
|
||||
if err := os.MkdirAll(tmp, 0777); err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
if err := writeMetaFile(tmp, &BlockMeta{
|
||||
ULID: ulid,
|
||||
Sequence: seq,
|
||||
MinTime: mint,
|
||||
MaxTime: maxt,
|
||||
ULID: ulid,
|
||||
MinTime: mint,
|
||||
MaxTime: maxt,
|
||||
}); err != nil {
|
||||
return err
|
||||
return "", err
|
||||
}
|
||||
return renameFile(tmp, dir)
|
||||
return dir, renameFile(tmp, dir)
|
||||
}
|
||||
|
||||
// OpenHeadBlock opens the head block in dir.
|
||||
|
@ -150,7 +151,7 @@ func (h *HeadBlock) inBounds(t int64) bool {
|
|||
}
|
||||
|
||||
func (h *HeadBlock) String() string {
|
||||
return fmt.Sprintf("(%d, %s)", h.meta.Sequence, h.meta.ULID)
|
||||
return h.meta.ULID.String()
|
||||
}
|
||||
|
||||
// Close syncs all data and closes underlying resources of the head block.
|
||||
|
@ -182,7 +183,6 @@ func (h *HeadBlock) Close() error {
|
|||
func (h *HeadBlock) Meta() BlockMeta {
|
||||
m := BlockMeta{
|
||||
ULID: h.meta.ULID,
|
||||
Sequence: h.meta.Sequence,
|
||||
MinTime: h.meta.MinTime,
|
||||
MaxTime: h.meta.MaxTime,
|
||||
Compaction: h.meta.Compaction,
|
||||
|
@ -337,6 +337,9 @@ func (a *headAppender) Add(lset labels.Labels, t int64, v float64) (string, erro
|
|||
var nullRef = string([]byte{0, 0, 0, 0, 0, 0, 0, 0})
|
||||
|
||||
func (a *headAppender) AddFast(ref string, t int64, v float64) error {
|
||||
if len(ref) != 8 {
|
||||
return errors.Wrap(ErrNotFound, "invalid ref length")
|
||||
}
|
||||
var (
|
||||
refn = binary.BigEndian.Uint64(yoloBytes(ref))
|
||||
id = (refn << 1) >> 1
|
||||
|
|
|
@ -33,7 +33,7 @@ import (
|
|||
|
||||
// createTestHeadBlock creates a new head block with a SegmentWAL.
|
||||
func createTestHeadBlock(t testing.TB, dir string, mint, maxt int64) *HeadBlock {
|
||||
err := TouchHeadBlock(dir, 0, mint, maxt)
|
||||
dir, err := TouchHeadBlock(dir, mint, maxt)
|
||||
require.NoError(t, err)
|
||||
|
||||
wal, err := OpenSegmentWAL(dir, nil, 5*time.Second)
|
||||
|
|
4
wal.go
4
wal.go
|
@ -21,7 +21,6 @@ import (
|
|||
"io"
|
||||
"math"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
|
@ -91,7 +90,6 @@ type RefSample struct {
|
|||
}
|
||||
|
||||
const (
|
||||
walDirName = "wal"
|
||||
walSegmentSizeBytes = 256 * 1024 * 1024 // 256 MB
|
||||
)
|
||||
|
||||
|
@ -107,8 +105,6 @@ func init() {
|
|||
// OpenSegmentWAL opens or creates a write ahead log in the given directory.
|
||||
// The WAL must be read completely before new data is written.
|
||||
func OpenSegmentWAL(dir string, logger log.Logger, flushInterval time.Duration) (*SegmentWAL, error) {
|
||||
dir = filepath.Join(dir, walDirName)
|
||||
|
||||
if err := os.MkdirAll(dir, 0777); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue