diff --git a/block.go b/block.go index 72ebb1f8a..6c1e555f3 100644 --- a/block.go +++ b/block.go @@ -15,7 +15,6 @@ package tsdb import ( "encoding/json" - "fmt" "io/ioutil" "os" "path/filepath" @@ -73,9 +72,6 @@ type BlockMeta struct { // Unique identifier for the block and its contents. Changes on compaction. ULID ulid.ULID `json:"ulid"` - // Sequence number of the block. - Sequence int `json:"sequence"` - // MinTime and MaxTime specify the time range all samples // in the block are in. MinTime int64 `json:"minTime"` @@ -190,7 +186,7 @@ func (pb *persistedBlock) Close() error { } func (pb *persistedBlock) String() string { - return fmt.Sprintf("(%d, %s)", pb.meta.Sequence, pb.meta.ULID) + return pb.meta.ULID.String() } func (pb *persistedBlock) Querier(mint, maxt int64) Querier { diff --git a/cmd/tsdb/main.go b/cmd/tsdb/main.go index e47254feb..a12dc675d 100644 --- a/cmd/tsdb/main.go +++ b/cmd/tsdb/main.go @@ -28,6 +28,7 @@ import ( "time" "unsafe" + "github.com/pkg/errors" promlabels "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/pkg/textparse" "github.com/prometheus/tsdb" @@ -114,7 +115,6 @@ func (b *writeBenchmark) run(cmd *cobra.Command, args []string) { RetentionDuration: 2 * 24 * 60 * 60 * 1000, // 1 days in milliseconds MinBlockDuration: 3 * 60 * 60 * 1000, // 2 hours in milliseconds MaxBlockDuration: 27 * 60 * 60 * 1000, // 1 days in milliseconds - AppendableBlocks: 2, }) if err != nil { exitWithError(err) @@ -197,7 +197,7 @@ func (b *writeBenchmark) ingestScrapesShard(metrics []labels.Labels, scrapeCount type sample struct { labels labels.Labels value int64 - ref *uint64 + ref *string } scrape := make([]*sample, 0, len(metrics)) @@ -225,7 +225,7 @@ func (b *writeBenchmark) ingestScrapesShard(metrics []labels.Labels, scrapeCount s.ref = &ref } else if err := app.AddFast(*s.ref, ts, float64(s.value)); err != nil { - if err.Error() != "not found" { + if errors.Cause(err) != tsdb.ErrNotFound { panic(err) } diff --git a/compact.go b/compact.go index 938697419..310c162f6 100644 --- a/compact.go +++ b/compact.go @@ -18,6 +18,7 @@ import ( "math/rand" "os" "path/filepath" + "sort" "time" "github.com/coreos/etcd/pkg/fileutil" @@ -34,10 +35,10 @@ type Compactor interface { // Plan returns a set of non-overlapping directories that can // be compacted concurrently. // Results returned when compactions are in progress are undefined. - Plan(dir string) ([][]string, error) + Plan() ([][]string, error) // Write persists a Block into a directory. - Write(dir string, b Block) error + Write(b Block) error // Compact runs compaction against the provided directories. Must // only be called concurrently with results of Plan(). @@ -46,6 +47,7 @@ type Compactor interface { // compactor implements the Compactor interface. type compactor struct { + dir string metrics *compactorMetrics logger log.Logger opts *compactorOptions @@ -87,8 +89,9 @@ type compactorOptions struct { maxBlockRange uint64 } -func newCompactor(r prometheus.Registerer, l log.Logger, opts *compactorOptions) *compactor { +func newCompactor(dir string, r prometheus.Registerer, l log.Logger, opts *compactorOptions) *compactor { return &compactor{ + dir: dir, opts: opts, logger: l, metrics: newCompactorMetrics(r), @@ -103,13 +106,18 @@ type compactionInfo struct { const compactionBlocksLen = 3 -func (c *compactor) Plan(dir string) ([][]string, error) { - dirs, err := blockDirs(dir) +type dirMeta struct { + dir string + meta *BlockMeta +} + +func (c *compactor) Plan() ([][]string, error) { + dirs, err := blockDirs(c.dir) if err != nil { return nil, err } - var bs []*BlockMeta + var dms []dirMeta for _, dir := range dirs { meta, err := readMetaFile(dir) @@ -117,25 +125,28 @@ func (c *compactor) Plan(dir string) ([][]string, error) { return nil, err } if meta.Compaction.Generation > 0 { - bs = append(bs, meta) + dms = append(dms, dirMeta{dir, meta}) } } + sort.Slice(dms, func(i, j int) bool { + return dms[i].meta.MinTime < dms[j].meta.MinTime + }) - if len(bs) == 0 { + if len(dms) == 0 { return nil, nil } sliceDirs := func(i, j int) [][]string { var res []string for k := i; k < j; k++ { - res = append(res, dirs[k]) + res = append(res, dms[k].dir) } return [][]string{res} } // Then we care about compacting multiple blocks, starting with the oldest. - for i := 0; i < len(bs)-compactionBlocksLen+1; i++ { - if c.match(bs[i : i+3]) { + for i := 0; i < len(dms)-compactionBlocksLen+1; i++ { + if c.match(dms[i : i+3]) { return sliceDirs(i, i+compactionBlocksLen), nil } } @@ -143,26 +154,22 @@ func (c *compactor) Plan(dir string) ([][]string, error) { return nil, nil } -func (c *compactor) match(bs []*BlockMeta) bool { - g := bs[0].Compaction.Generation +func (c *compactor) match(dirs []dirMeta) bool { + g := dirs[0].meta.Compaction.Generation - for _, b := range bs { - if b.Compaction.Generation != g { + for _, d := range dirs { + if d.meta.Compaction.Generation != g { return false } } - return uint64(bs[len(bs)-1].MaxTime-bs[0].MinTime) <= c.opts.maxBlockRange + return uint64(dirs[len(dirs)-1].meta.MaxTime-dirs[0].meta.MinTime) <= c.opts.maxBlockRange } func mergeBlockMetas(blocks ...Block) (res BlockMeta) { m0 := blocks[0].Meta() - entropy := rand.New(rand.NewSource(time.Now().UnixNano())) - - res.Sequence = m0.Sequence res.MinTime = m0.MinTime res.MaxTime = blocks[len(blocks)-1].Meta().MaxTime - res.ULID = ulid.MustNew(ulid.Now(), entropy) res.Compaction.Generation = m0.Compaction.Generation + 1 @@ -185,16 +192,27 @@ func (c *compactor) Compact(dirs ...string) (err error) { blocks = append(blocks, b) } - return c.write(dirs[0], blocks...) + entropy := rand.New(rand.NewSource(time.Now().UnixNano())) + uid := ulid.MustNew(ulid.Now(), entropy) + + return c.write(uid, blocks...) } -func (c *compactor) Write(dir string, b Block) error { - return c.write(dir, b) +func (c *compactor) Write(b Block) error { + // Buffering blocks might have been created that often have no data. + if b.Meta().Stats.NumSeries == 0 { + return errors.Wrap(os.RemoveAll(b.Dir()), "remove empty block") + } + + entropy := rand.New(rand.NewSource(time.Now().UnixNano())) + uid := ulid.MustNew(ulid.Now(), entropy) + + return c.write(uid, b) } // write creates a new block that is the union of the provided blocks into dir. // It cleans up all files of the old blocks after completing successfully. -func (c *compactor) write(dir string, blocks ...Block) (err error) { +func (c *compactor) write(uid ulid.ULID, blocks ...Block) (err error) { c.logger.Log("msg", "compact blocks", "blocks", fmt.Sprintf("%v", blocks)) defer func(t time.Time) { @@ -204,6 +222,7 @@ func (c *compactor) write(dir string, blocks ...Block) (err error) { c.metrics.duration.Observe(time.Since(t).Seconds()) }(time.Now()) + dir := filepath.Join(c.dir, uid.String()) tmp := dir + ".tmp" if err = os.RemoveAll(tmp); err != nil { @@ -229,6 +248,8 @@ func (c *compactor) write(dir string, blocks ...Block) (err error) { if err != nil { return errors.Wrap(err, "write compaction") } + meta.ULID = uid + if err = writeMetaFile(tmp, meta); err != nil { return errors.Wrap(err, "write merged meta") } @@ -244,7 +265,7 @@ func (c *compactor) write(dir string, blocks ...Block) (err error) { if err := renameFile(tmp, dir); err != nil { return errors.Wrap(err, "rename block dir") } - for _, b := range blocks[1:] { + for _, b := range blocks { if err := os.RemoveAll(b.Dir()); err != nil { return err } diff --git a/db.go b/db.go index c4e97abcc..a376f465e 100644 --- a/db.go +++ b/db.go @@ -22,6 +22,7 @@ import ( "os" "path/filepath" "runtime" + "sort" "strconv" "strings" "sync" @@ -33,6 +34,7 @@ import ( "github.com/coreos/etcd/pkg/fileutil" "github.com/go-kit/kit/log" "github.com/nightlyone/lockfile" + "github.com/oklog/ulid" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/tsdb/labels" @@ -45,7 +47,6 @@ var DefaultOptions = &Options{ RetentionDuration: 15 * 24 * 60 * 60 * 1000, // 15 days in milliseconds MinBlockDuration: 3 * 60 * 60 * 1000, // 2 hours in milliseconds MaxBlockDuration: 24 * 60 * 60 * 1000, // 1 days in milliseconds - AppendableBlocks: 2, NoLockfile: false, } @@ -64,13 +65,6 @@ type Options struct { // The maximum timestamp range of compacted blocks. MaxBlockDuration uint64 - // Number of head blocks that can be appended to. - // Should be two or higher to prevent write errors in general scenarios. - // - // After a new block is started for timestamp t0 or higher, appends with - // timestamps as early as t0 - (n-1) * MinBlockDuration are valid. - AppendableBlocks int - // NoLockfile disables creation and consideration of a lock file. NoLockfile bool } @@ -86,11 +80,11 @@ type Appender interface { // Returned reference numbers are ephemeral and may be rejected in calls // to AddFast() at any point. Adding the sample via Add() returns a new // reference number. - Add(l labels.Labels, t int64, v float64) (uint64, error) + Add(l labels.Labels, t int64, v float64) (string, error) // Add adds a sample pair for the referenced series. It is generally faster // than adding a sample by providing its full label set. - AddFast(ref uint64, t int64, v float64) error + AddFast(ref string, t int64, v float64) error // Commit submits the collected samples and purges the batch. Commit() error @@ -159,11 +153,6 @@ func Open(dir string, l log.Logger, r prometheus.Registerer, opts *Options) (db return nil, err } - absdir, err := filepath.Abs(dir) - if err != nil { - return nil, err - } - if l == nil { l = log.NewLogfmtLogger(os.Stdout) l = log.With(l, "ts", log.DefaultTimestampUTC, "caller", log.DefaultCaller) @@ -172,9 +161,6 @@ func Open(dir string, l log.Logger, r prometheus.Registerer, opts *Options) (db if opts == nil { opts = DefaultOptions } - if opts.AppendableBlocks < 1 { - return nil, errors.Errorf("AppendableBlocks must be greater than 0") - } db = &DB{ dir: dir, @@ -186,6 +172,10 @@ func Open(dir string, l log.Logger, r prometheus.Registerer, opts *Options) (db stopc: make(chan struct{}), } if !opts.NoLockfile { + absdir, err := filepath.Abs(dir) + if err != nil { + return nil, err + } lockf, err := lockfile.New(filepath.Join(absdir, "lock")) if err != nil { return nil, err @@ -196,7 +186,7 @@ func Open(dir string, l log.Logger, r prometheus.Registerer, opts *Options) (db db.lockf = &lockf } - db.compactor = newCompactor(r, l, &compactorOptions{ + db.compactor = newCompactor(dir, r, l, &compactorOptions{ maxBlockRange: opts.MaxBlockDuration, }) @@ -281,8 +271,8 @@ func (db *DB) compact() (changes bool, err error) { // returning the lock to not block Appenders. // Selected blocks are semantically ensured to not be written to afterwards // by appendable(). - if len(db.heads) > db.opts.AppendableBlocks { - for _, h := range db.heads[:len(db.heads)-db.opts.AppendableBlocks] { + if len(db.heads) > 2 { + for _, h := range db.heads[:len(db.heads)-2] { // Blocks that won't be appendable when instantiating a new appender // might still have active appenders on them. // Abort at the first one we encounter. @@ -302,7 +292,7 @@ func (db *DB) compact() (changes bool, err error) { default: } - if err = db.compactor.Write(h.Dir(), h); err != nil { + if err = db.compactor.Write(h); err != nil { return changes, errors.Wrap(err, "persist head block") } changes = true @@ -311,7 +301,7 @@ func (db *DB) compact() (changes bool, err error) { // Check for compactions of multiple blocks. for { - plans, err := db.compactor.Plan(db.dir) + plans, err := db.compactor.Plan() if err != nil { return changes, errors.Wrap(err, "plan compaction") } @@ -375,9 +365,9 @@ func retentionCutoff(dir string, mint int64) (bool, error) { return changes, fileutil.Fsync(df) } -func (db *DB) seqBlock(i int) (Block, bool) { +func (db *DB) getBlock(id ulid.ULID) (Block, bool) { for _, b := range db.blocks { - if b.Meta().Sequence == i { + if b.Meta().ULID == id { return b, true } } @@ -399,10 +389,8 @@ func (db *DB) reloadBlocks() error { return errors.Wrap(err, "find blocks") } var ( - metas []*BlockMeta - blocks []Block - heads []headBlock - seqBlocks = make(map[int]Block, len(dirs)) + blocks []Block + exist = map[ulid.ULID]struct{}{} ) for _, dir := range dirs { @@ -410,47 +398,58 @@ func (db *DB) reloadBlocks() error { if err != nil { return errors.Wrapf(err, "read meta information %s", dir) } - metas = append(metas, meta) - } - for i, meta := range metas { - b, ok := db.seqBlock(meta.Sequence) - - if meta.Compaction.Generation == 0 { - if !ok { - b, err = db.openHeadBlock(dirs[i]) - if err != nil { - return errors.Wrapf(err, "load head at %s", dirs[i]) - } + b, ok := db.getBlock(meta.ULID) + if !ok { + if meta.Compaction.Generation == 0 { + b, err = db.openHeadBlock(dir) + } else { + b, err = newPersistedBlock(dir) } - if meta.ULID != b.Meta().ULID { - return errors.Errorf("head block ULID changed unexpectedly") - } - heads = append(heads, b.(headBlock)) - } else { - if !ok || meta.ULID != b.Meta().ULID { - b, err = newPersistedBlock(dirs[i]) - if err != nil { - return errors.Wrapf(err, "open persisted block %s", dirs[i]) - } + if err != nil { + return errors.Wrapf(err, "open block %s", dir) } } - seqBlocks[meta.Sequence] = b blocks = append(blocks, b) + exist[meta.ULID] = struct{}{} } - // Close all blocks that we no longer need. They are closed after returning all - // locks to avoid questionable locking order. + if err := validateBlockSequence(blocks); err != nil { + return errors.Wrap(err, "invalid block sequence") + } + // Close all opened blocks that no longer exist after we returned all locks. for _, b := range db.blocks { - if nb, ok := seqBlocks[b.Meta().Sequence]; !ok || nb != b { + if _, ok := exist[b.Meta().ULID]; !ok { cs = append(cs, b) } } db.blocks = blocks - db.heads = heads + db.heads = nil + for _, b := range blocks { + if b.Meta().Compaction.Generation == 0 { + db.heads = append(db.heads, b.(*HeadBlock)) + } + } + + return nil +} + +func validateBlockSequence(bs []Block) error { + if len(bs) == 0 { + return nil + } + sort.Slice(bs, func(i, j int) bool { + return bs[i].Meta().MinTime < bs[j].Meta().MinTime + }) + prev := bs[0] + for _, b := range bs[1:] { + if b.Meta().MinTime < prev.Meta().MaxTime { + return errors.Errorf("block time ranges overlap", b.Meta().MinTime, prev.Meta().MaxTime) + } + } return nil } @@ -482,27 +481,7 @@ func (db *DB) Close() error { // Appender returns a new Appender on the database. func (db *DB) Appender() Appender { db.mtx.RLock() - a := &dbAppender{db: db} - - // XXX(fabxc): turn off creating initial appender as it will happen on-demand - // anyway. For now this, with combination of only having a single timestamp per batch, - // prevents opening more than one appender and hitting an unresolved deadlock (#11). - // - - // Only instantiate appender after returning the headmtx to avoid - // questionable locking order. - db.headmtx.RLock() - app := db.appendable() - db.headmtx.RUnlock() - - for _, b := range app { - a.heads = append(a.heads, &metaAppender{ - meta: b.Meta(), - app: b.Appender(), - }) - } - - return a + return &dbAppender{db: db} } type dbAppender struct { @@ -517,34 +496,39 @@ type metaAppender struct { app Appender } -func (a *dbAppender) Add(lset labels.Labels, t int64, v float64) (uint64, error) { - h, err := a.appenderFor(t) +func (a *dbAppender) Add(lset labels.Labels, t int64, v float64) (string, error) { + h, err := a.appenderAt(t) if err != nil { - return 0, err + return "", err } ref, err := h.app.Add(lset, t, v) if err != nil { - return 0, err + return "", err } a.samples++ - // Store last byte of sequence number in 3rd byte of reference. - return ref | (uint64(h.meta.Sequence&0xff) << 40), nil + + return string(append(h.meta.ULID[:], ref...)), nil } -func (a *dbAppender) AddFast(ref uint64, t int64, v float64) error { - // Load the head last byte of the head sequence from the 3rd byte of the - // reference number. - gen := (ref << 16) >> 56 - - h, err := a.appenderFor(t) +func (a *dbAppender) AddFast(ref string, t int64, v float64) error { + if len(ref) < 16 { + return errors.Wrap(ErrNotFound, "invalid ref length") + } + // The first 16 bytes a ref hold the ULID of the head block. + h, err := a.appenderAt(t) if err != nil { return err } - // If the last byte of the sequence does not add up, the reference is not valid. - if uint64(h.meta.Sequence&0xff) != gen { + // Validate the ref points to the same block we got for t. + if string(h.meta.ULID[:]) != ref[:16] { return ErrNotFound } - if err := h.app.AddFast(ref, t, v); err != nil { + if err := h.app.AddFast(ref[16:], t, v); err != nil { + // The block the ref points to might fit the given timestamp. + // We mask the error to stick with our contract. + if errors.Cause(err) == ErrOutOfBounds { + err = ErrNotFound + } return err } @@ -554,85 +538,84 @@ func (a *dbAppender) AddFast(ref uint64, t int64, v float64) error { // appenderFor gets the appender for the head containing timestamp t. // If the head block doesn't exist yet, it gets created. -func (a *dbAppender) appenderFor(t int64) (*metaAppender, error) { - // If there's no fitting head block for t, ensure it gets created. - if len(a.heads) == 0 || t >= a.heads[len(a.heads)-1].meta.MaxTime { - a.db.headmtx.Lock() - - var newHeads []headBlock - - if err := a.db.ensureHead(t); err != nil { - a.db.headmtx.Unlock() - return nil, err - } - if len(a.heads) == 0 { - newHeads = append(newHeads, a.db.appendable()...) - } else { - maxSeq := a.heads[len(a.heads)-1].meta.Sequence - for _, b := range a.db.appendable() { - if b.Meta().Sequence > maxSeq { - newHeads = append(newHeads, b) - } - } - } - - a.db.headmtx.Unlock() - - // XXX(fabxc): temporary workaround. See comment on instantiating DB.Appender. - // for _, b := range newHeads { - // // Only get appender for the block with the specific timestamp. - // if t >= b.Meta().MaxTime { - // continue - // } - // a.heads = append(a.heads, &metaAppender{ - // app: b.Appender(), - // meta: b.Meta(), - // }) - // break - // } - - // Instantiate appenders after returning headmtx to avoid questionable - // locking order. - for _, b := range newHeads { - a.heads = append(a.heads, &metaAppender{ - app: b.Appender(), - meta: b.Meta(), - }) - } - } - for i := len(a.heads) - 1; i >= 0; i-- { - if h := a.heads[i]; t >= h.meta.MinTime { +func (a *dbAppender) appenderAt(t int64) (*metaAppender, error) { + for _, h := range a.heads { + if intervalContains(h.meta.MinTime, h.meta.MaxTime-1, t) { return h, nil } } + // Currently opened appenders do not cover t. Ensure the head block is + // created and add missing appenders. + a.db.headmtx.Lock() - return nil, ErrNotFound + if err := a.db.ensureHead(t); err != nil { + a.db.headmtx.Unlock() + return nil, err + } + + var hb headBlock + for _, h := range a.db.appendable() { + m := h.Meta() + + if intervalContains(m.MinTime, m.MaxTime-1, t) { + hb = h + break + } + } + a.db.headmtx.Unlock() + + if hb == nil { + return nil, ErrOutOfBounds + } + // Instantiate appender after returning headmtx! + app := &metaAppender{ + meta: hb.Meta(), + app: hb.Appender(), + } + a.heads = append(a.heads, app) + + return app, nil +} + +func rangeForTimestamp(t int64, width int64) (mint, maxt int64) { + mint = (t / width) * width + return mint, mint + width } // ensureHead makes sure that there is a head block for the timestamp t if // it is within or after the currently appendable window. func (db *DB) ensureHead(t int64) error { - // Initial case for a new database: we must create the first - // AppendableBlocks-1 front padding heads. - if len(db.heads) == 0 { - for i := int64(db.opts.AppendableBlocks - 1); i >= 0; i-- { - if _, err := db.cut(t - i*int64(db.opts.MinBlockDuration)); err != nil { - return err - } - } - } + var ( + mint, maxt = rangeForTimestamp(t, int64(db.opts.MinBlockDuration)) + addBuffer = len(db.blocks) == 0 + last BlockMeta + ) - for { - h := db.heads[len(db.heads)-1] - m := h.Meta() - // If t doesn't exceed the range of heads blocks, there's nothing to do. - if t < m.MaxTime { - return nil - } - if _, err := db.cut(m.MaxTime); err != nil { + if !addBuffer { + last = db.blocks[len(db.blocks)-1].Meta() + addBuffer = last.MaxTime <= mint-int64(db.opts.MinBlockDuration) + } + // Create another block of buffer in front if the DB is initialized or retrieving + // new data after a long gap. + // This ensures we always have a full block width if append window. + if addBuffer { + if _, err := db.createHeadBlock(mint-int64(db.opts.MinBlockDuration), mint); err != nil { return err } + // If the previous block reaches into our new window, make it smaller. + } else if mt := last.MaxTime; mt > mint { + mint = mt } + if mint >= maxt { + return nil + } + // Error if the requested time for a head is before the appendable window. + if len(db.heads) > 0 && t < db.heads[0].Meta().MinTime { + return ErrOutOfBounds + } + + _, err := db.createHeadBlock(mint, maxt) + return err } func (a *dbAppender) Commit() error { @@ -640,14 +623,22 @@ func (a *dbAppender) Commit() error { // Commits to partial appenders must be concurrent as concurrent appenders // may have conflicting locks on head appenders. - // XXX(fabxc): is this a leaky abstraction? Should make an effort to catch a multi-error? - var g errgroup.Group + // For high-throughput use cases the errgroup causes significant blocking. Typically, + // we just deal with a single appender and special case it. + var err error - for _, h := range a.heads { - g.Go(h.app.Commit) + switch len(a.heads) { + case 1: + err = a.heads[0].app.Commit() + default: + var g errgroup.Group + for _, h := range a.heads { + g.Go(h.app.Commit) + } + err = g.Wait() } - if err := g.Wait(); err != nil { + if err != nil { return err } // XXX(fabxc): Push the metric down into head block to account properly @@ -670,14 +661,15 @@ func (a *dbAppender) Rollback() error { } // appendable returns a copy of a slice of HeadBlocks that can still be appended to. -func (db *DB) appendable() []headBlock { - var i int - app := make([]headBlock, 0, db.opts.AppendableBlocks) - - if len(db.heads) > db.opts.AppendableBlocks { - i = len(db.heads) - db.opts.AppendableBlocks +func (db *DB) appendable() (r []headBlock) { + switch len(db.heads) { + case 0: + case 1: + r = append(r, db.heads[0]) + default: + r = append(r, db.heads[len(db.heads)-2:]...) } - return append(app, db.heads[i:]...) + return r } func intervalOverlap(amin, amax, bmin, bmax int64) bool { @@ -712,7 +704,7 @@ func (db *DB) blocksForInterval(mint, maxt int64) []Block { // openHeadBlock opens the head block at dir. func (db *DB) openHeadBlock(dir string) (*HeadBlock, error) { var ( - wdir = filepath.Join(dir, "wal") + wdir = walDir(dir) l = log.With(db.logger, "wal", wdir) ) wal, err := OpenSegmentWAL(wdir, l, 5*time.Second) @@ -727,16 +719,10 @@ func (db *DB) openHeadBlock(dir string) (*HeadBlock, error) { return h, nil } -// cut starts a new head block to append to. The completed head block -// will still be appendable for the configured grace period. -func (db *DB) cut(mint int64) (headBlock, error) { - maxt := mint + int64(db.opts.MinBlockDuration) - - dir, seq, err := nextSequenceFile(db.dir, "b-") +// createHeadBlock starts a new head block to append to. +func (db *DB) createHeadBlock(mint, maxt int64) (headBlock, error) { + dir, err := TouchHeadBlock(db.dir, mint, maxt) if err != nil { - return nil, err - } - if err := TouchHeadBlock(dir, seq, mint, maxt); err != nil { return nil, errors.Wrapf(err, "touch head block %s", dir) } newHead, err := db.openHeadBlock(dir) @@ -759,13 +745,8 @@ func isBlockDir(fi os.FileInfo) bool { if !fi.IsDir() { return false } - if !strings.HasPrefix(fi.Name(), "b-") { - return false - } - if _, err := strconv.ParseUint(fi.Name()[2:], 10, 32); err != nil { - return false - } - return true + _, err := ulid.Parse(fi.Name()) + return err == nil } func blockDirs(dir string) ([]string, error) { @@ -870,9 +851,8 @@ func (es MultiError) Err() error { return es } -func yoloString(b []byte) string { - return *((*string)(unsafe.Pointer(&b))) -} +func yoloString(b []byte) string { return *((*string)(unsafe.Pointer(&b))) } +func yoloBytes(s string) []byte { return *((*[]byte)(unsafe.Pointer(&s))) } func closeAll(cs ...io.Closer) error { var merr MultiError diff --git a/db_test.go b/db_test.go index f3b2dca2a..1fa4d7270 100644 --- a/db_test.go +++ b/db_test.go @@ -18,6 +18,7 @@ import ( "os" "testing" + "github.com/pkg/errors" "github.com/prometheus/tsdb/labels" "github.com/stretchr/testify/require" ) @@ -107,25 +108,36 @@ func TestDBAppenderAddRef(t *testing.T) { require.NoError(t, err) defer db.Close() - app := db.Appender() - defer app.Rollback() + app1 := db.Appender() - ref, err := app.Add(labels.FromStrings("a", "b"), 0, 0) + ref, err := app1.Add(labels.FromStrings("a", "b"), 0, 0) require.NoError(t, err) - // Head sequence number should be in 3rd MSB and be greater than 0. - gen := (ref << 16) >> 56 - require.True(t, gen > 1) + // When a series is first created, refs don't work within that transaction. + err = app1.AddFast(ref, 1, 1) + require.EqualError(t, errors.Cause(err), ErrNotFound.Error()) + + err = app1.Commit() + require.NoError(t, err) + + app2 := db.Appender() + defer app2.Rollback() + + ref, err = app2.Add(labels.FromStrings("a", "b"), 1, 1) + require.NoError(t, err) + + // Ref must be prefixed with block ULID of the block we wrote to. + id := db.blocks[len(db.blocks)-1].Meta().ULID + require.Equal(t, string(id[:]), ref[:16]) // Reference must be valid to add another sample. - err = app.AddFast(ref, 1, 1) + err = app2.AddFast(ref, 2, 2) require.NoError(t, err) // AddFast for the same timestamp must fail if the generation in the reference // doesn't add up. - refBad := ref | ((gen + 1) << 4) - err = app.AddFast(refBad, 1, 1) - require.Error(t, err) - - require.Equal(t, 2, app.(*dbAppender).samples) + refb := []byte(ref) + refb[15] ^= refb[15] + err = app2.AddFast(string(refb), 1, 1) + require.EqualError(t, errors.Cause(err), ErrNotFound.Error()) } diff --git a/head.go b/head.go index b71bbafc0..f30a934a5 100644 --- a/head.go +++ b/head.go @@ -18,11 +18,14 @@ import ( "math" "math/rand" "os" + "path/filepath" "sort" "sync" "sync/atomic" "time" + "encoding/binary" + "github.com/go-kit/kit/log" "github.com/oklog/ulid" "github.com/pkg/errors" @@ -71,30 +74,30 @@ type HeadBlock struct { // TouchHeadBlock atomically touches a new head block in dir for // samples in the range [mint,maxt). -func TouchHeadBlock(dir string, seq int, mint, maxt int64) error { - // Make head block creation appear atomic. - tmp := dir + ".tmp" - - if err := os.MkdirAll(tmp, 0777); err != nil { - return err - } - +func TouchHeadBlock(dir string, mint, maxt int64) (string, error) { entropy := rand.New(rand.NewSource(time.Now().UnixNano())) ulid, err := ulid.New(ulid.Now(), entropy) if err != nil { - return err + return "", err + } + + // Make head block creation appear atomic. + dir = filepath.Join(dir, ulid.String()) + tmp := dir + ".tmp" + + if err := os.MkdirAll(tmp, 0777); err != nil { + return "", err } if err := writeMetaFile(tmp, &BlockMeta{ - ULID: ulid, - Sequence: seq, - MinTime: mint, - MaxTime: maxt, + ULID: ulid, + MinTime: mint, + MaxTime: maxt, }); err != nil { - return err + return "", err } - return renameFile(tmp, dir) + return dir, renameFile(tmp, dir) } // OpenHeadBlock opens the head block in dir. @@ -148,7 +151,7 @@ func (h *HeadBlock) inBounds(t int64) bool { } func (h *HeadBlock) String() string { - return fmt.Sprintf("(%d, %s)", h.meta.Sequence, h.meta.ULID) + return h.meta.ULID.String() } // Close syncs all data and closes underlying resources of the head block. @@ -176,10 +179,10 @@ func (h *HeadBlock) Close() error { return nil } +// Meta returns a BlockMeta for the head block. func (h *HeadBlock) Meta() BlockMeta { m := BlockMeta{ ULID: h.meta.ULID, - Sequence: h.meta.Sequence, MinTime: h.meta.MinTime, MaxTime: h.meta.MaxTime, Compaction: h.meta.Compaction, @@ -192,11 +195,16 @@ func (h *HeadBlock) Meta() BlockMeta { return m } -func (h *HeadBlock) Dir() string { return h.dir } -func (h *HeadBlock) Persisted() bool { return false } -func (h *HeadBlock) Index() IndexReader { return &headIndexReader{h} } +// Dir returns the directory of the block. +func (h *HeadBlock) Dir() string { return h.dir } + +// Index returns an IndexReader against the block. +func (h *HeadBlock) Index() IndexReader { return &headIndexReader{h} } + +// Chunks returns a ChunkReader against the block. func (h *HeadBlock) Chunks() ChunkReader { return &headChunkReader{h} } +// Querier returns a new Querier against the block for the range [mint, maxt]. func (h *HeadBlock) Querier(mint, maxt int64) Querier { h.mtx.RLock() defer h.mtx.RUnlock() @@ -236,6 +244,7 @@ func (h *HeadBlock) Querier(mint, maxt int64) Querier { } } +// Appender returns a new Appender against the head block. func (h *HeadBlock) Appender() Appender { atomic.AddUint64(&h.activeWriters, 1) @@ -247,6 +256,7 @@ func (h *HeadBlock) Appender() Appender { return &headAppender{HeadBlock: h, samples: getHeadAppendBuffer()} } +// Busy returns true if the block has open write transactions. func (h *HeadBlock) Busy() bool { return atomic.LoadUint64(&h.activeWriters) > 0 } @@ -268,74 +278,89 @@ func putHeadAppendBuffer(b []RefSample) { type headAppender struct { *HeadBlock - newSeries map[uint64]hashedLabels - newHashes map[uint64]uint64 - refmap map[uint64]uint64 + newSeries []*hashedLabels newLabels []labels.Labels + newHashes map[uint64]uint64 samples []RefSample } type hashedLabels struct { + ref uint64 hash uint64 labels labels.Labels } -func (a *headAppender) Add(lset labels.Labels, t int64, v float64) (uint64, error) { +func (a *headAppender) Add(lset labels.Labels, t int64, v float64) (string, error) { if !a.inBounds(t) { - return 0, ErrOutOfBounds + return "", ErrOutOfBounds } hash := lset.Hash() + refb := make([]byte, 8) + // Series exists already in the block. if ms := a.get(hash, lset); ms != nil { - return uint64(ms.ref), a.AddFast(uint64(ms.ref), t, v) + binary.BigEndian.PutUint64(refb, uint64(ms.ref)) + return string(refb), a.AddFast(string(refb), t, v) } + // Series was added in this transaction previously. if ref, ok := a.newHashes[hash]; ok { - return uint64(ref), a.AddFast(uint64(ref), t, v) + binary.BigEndian.PutUint64(refb, ref) + // XXX(fabxc): there's no fast path for multiple samples for the same new series + // in the same transaction. We always return the invalid empty ref. It's has not + // been a relevant use case so far and is not worth the trouble. + return nullRef, a.AddFast(string(refb), t, v) } - // We only know the actual reference after committing. We generate an - // intermediate reference only valid for this batch. - // It is indicated by the the LSB of the 4th byte being set to 1. - // We use a random ID to avoid collisions when new series are created - // in two subsequent batches. - // TODO(fabxc): Provide method for client to determine whether a ref - // is valid beyond the current transaction. - ref := uint64(rand.Int31()) | (1 << 32) - + // The series is completely new. if a.newSeries == nil { - a.newSeries = map[uint64]hashedLabels{} a.newHashes = map[uint64]uint64{} - a.refmap = map[uint64]uint64{} } - a.newSeries[ref] = hashedLabels{hash: hash, labels: lset} - a.newHashes[hash] = ref + // First sample for new series. + ref := uint64(len(a.newSeries)) - return ref, a.AddFast(ref, t, v) + a.newSeries = append(a.newSeries, &hashedLabels{ + ref: ref, + hash: hash, + labels: lset, + }) + // First bit indicates its a series created in this transaction. + ref |= (1 << 63) + + a.newHashes[hash] = ref + binary.BigEndian.PutUint64(refb, ref) + + return nullRef, a.AddFast(string(refb), t, v) } -func (a *headAppender) AddFast(ref uint64, t int64, v float64) error { - // We only own the last 5 bytes of the reference. Anything before is - // used by higher-order appenders. We erase it to avoid issues. - ref = (ref << 24) >> 24 +var nullRef = string([]byte{0, 0, 0, 0, 0, 0, 0, 0}) +func (a *headAppender) AddFast(ref string, t int64, v float64) error { + if len(ref) != 8 { + return errors.Wrap(ErrNotFound, "invalid ref length") + } + var ( + refn = binary.BigEndian.Uint64(yoloBytes(ref)) + id = (refn << 1) >> 1 + inTx = refn&(1<<63) != 0 + ) // Distinguish between existing series and series created in // this transaction. - if ref&(1<<32) != 0 { - if _, ok := a.newSeries[ref]; !ok { - return ErrNotFound + if inTx { + if id > uint64(len(a.newSeries)-1) { + return errors.Wrap(ErrNotFound, "transaction series ID too high") } // TODO(fabxc): we also have to validate here that the // sample sequence is valid. - // We also have to revalidate it as we switch locks an create + // We also have to revalidate it as we switch locks and create // the new series. - } else if ref > uint64(len(a.series)) { - return ErrNotFound + } else if id > uint64(len(a.series)) { + return errors.Wrap(ErrNotFound, "transaction series ID too high") } else { - ms := a.series[int(ref)] + ms := a.series[id] if ms == nil { - return ErrNotFound + return errors.Wrap(ErrNotFound, "nil series") } // TODO(fabxc): memory series should be locked here already. // Only problem is release of locks in case of a rollback. @@ -356,7 +381,7 @@ func (a *headAppender) AddFast(ref uint64, t int64, v float64) error { } a.samples = append(a.samples, RefSample{ - Ref: ref, + Ref: refn, T: t, V: v, }) @@ -375,18 +400,18 @@ func (a *headAppender) createSeries() { base1 := len(a.series) - for ref, l := range a.newSeries { + for _, l := range a.newSeries { // We switched locks and have to re-validate that the series were not // created by another goroutine in the meantime. if base1 > base0 { if ms := a.get(l.hash, l.labels); ms != nil { - a.refmap[ref] = uint64(ms.ref) + l.ref = uint64(ms.ref) continue } } // Series is still new. a.newLabels = append(a.newLabels, l.labels) - a.refmap[ref] = uint64(len(a.series)) + l.ref = uint64(len(a.series)) a.create(l.hash, l.labels) } @@ -401,11 +426,11 @@ func (a *headAppender) Commit() error { a.createSeries() + // We have to update the refs of samples for series we just created. for i := range a.samples { s := &a.samples[i] - - if s.Ref&(1<<32) > 0 { - s.Ref = a.refmap[s.Ref] + if s.Ref&(1<<63) != 0 { + s.Ref = a.newSeries[(s.Ref<<1)>>1].ref } } @@ -514,6 +539,9 @@ func (h *headIndexReader) Series(ref uint32) (labels.Labels, []*ChunkMeta, error return nil, nil, ErrNotFound } s := h.series[ref] + if s == nil { + return nil, nil, ErrNotFound + } metas := make([]*ChunkMeta, 0, len(s.chunks)) s.mtx.RLock() diff --git a/head_test.go b/head_test.go index aa9138060..a6df15079 100644 --- a/head_test.go +++ b/head_test.go @@ -33,7 +33,7 @@ import ( // createTestHeadBlock creates a new head block with a SegmentWAL. func createTestHeadBlock(t testing.TB, dir string, mint, maxt int64) *HeadBlock { - err := TouchHeadBlock(dir, 0, mint, maxt) + dir, err := TouchHeadBlock(dir, mint, maxt) require.NoError(t, err) wal, err := OpenSegmentWAL(dir, nil, 5*time.Second) diff --git a/wal.go b/wal.go index 251944f0b..63975de2a 100644 --- a/wal.go +++ b/wal.go @@ -21,7 +21,6 @@ import ( "io" "math" "os" - "path/filepath" "sync" "time" @@ -91,7 +90,6 @@ type RefSample struct { } const ( - walDirName = "wal" walSegmentSizeBytes = 256 * 1024 * 1024 // 256 MB ) @@ -107,8 +105,6 @@ func init() { // OpenSegmentWAL opens or creates a write ahead log in the given directory. // The WAL must be read completely before new data is written. func OpenSegmentWAL(dir string, logger log.Logger, flushInterval time.Duration) (*SegmentWAL, error) { - dir = filepath.Join(dir, walDirName) - if err := os.MkdirAll(dir, 0777); err != nil { return nil, err } diff --git a/wal_test.go b/wal_test.go index cbd6c6247..c2988e7e0 100644 --- a/wal_test.go +++ b/wal_test.go @@ -320,17 +320,13 @@ func TestWALRestoreCorrupted(t *testing.T) { require.Equal(t, 0, len(l)) require.Equal(t, []RefSample{{T: 1, V: 2}}, s) - // Truncation should happen transparently and now cause an error. + // Truncation should happen transparently and not cause an error. require.False(t, r.Next()) require.Nil(t, r.Err()) require.NoError(t, w2.Log(nil, []RefSample{{T: 99, V: 100}})) require.NoError(t, w2.Close()) - files, err := fileutil.ReadDir(dir) - require.NoError(t, err) - require.Equal(t, 1, len(files)) - // We should see the first valid entry and the new one, everything after // is truncated. w3, err := OpenSegmentWAL(dir, logger, 0)