diff --git a/block.go b/block.go
index 72ebb1f8a..6c1e555f3 100644
--- a/block.go
+++ b/block.go
@@ -15,7 +15,6 @@ package tsdb
 
 import (
 	"encoding/json"
-	"fmt"
 	"io/ioutil"
 	"os"
 	"path/filepath"
@@ -73,9 +72,6 @@ type BlockMeta struct {
 	// Unique identifier for the block and its contents. Changes on compaction.
 	ULID ulid.ULID `json:"ulid"`
 
-	// Sequence number of the block.
-	Sequence int `json:"sequence"`
-
 	// MinTime and MaxTime specify the time range all samples
 	// in the block are in.
 	MinTime int64 `json:"minTime"`
@@ -190,7 +186,7 @@ func (pb *persistedBlock) Close() error {
 }
 
 func (pb *persistedBlock) String() string {
-	return fmt.Sprintf("(%d, %s)", pb.meta.Sequence, pb.meta.ULID)
+	return pb.meta.ULID.String()
 }
 
 func (pb *persistedBlock) Querier(mint, maxt int64) Querier {
diff --git a/cmd/tsdb/main.go b/cmd/tsdb/main.go
index 4c0b8cdaa..a12dc675d 100644
--- a/cmd/tsdb/main.go
+++ b/cmd/tsdb/main.go
@@ -28,6 +28,7 @@ import (
 	"time"
 	"unsafe"
 
+	"github.com/pkg/errors"
 	promlabels "github.com/prometheus/prometheus/pkg/labels"
 	"github.com/prometheus/prometheus/pkg/textparse"
 	"github.com/prometheus/tsdb"
@@ -114,7 +115,6 @@ func (b *writeBenchmark) run(cmd *cobra.Command, args []string) {
 		RetentionDuration: 2 * 24 * 60 * 60 * 1000, // 1 days in milliseconds
 		MinBlockDuration:  3 * 60 * 60 * 1000,      // 2 hours in milliseconds
 		MaxBlockDuration:  27 * 60 * 60 * 1000,     // 1 days in milliseconds
-		AppendableBlocks:  2,
 	})
 	if err != nil {
 		exitWithError(err)
@@ -225,7 +225,7 @@ func (b *writeBenchmark) ingestScrapesShard(metrics []labels.Labels, scrapeCount
 			s.ref = &ref
 		} else if err := app.AddFast(*s.ref, ts, float64(s.value)); err != nil {
 
-			if err.Error() != "not found" {
+			if errors.Cause(err) != tsdb.ErrNotFound {
 				panic(err)
 			}
 
diff --git a/compact.go b/compact.go
index 938697419..e52a080ef 100644
--- a/compact.go
+++ b/compact.go
@@ -20,6 +20,7 @@ import (
 	"path/filepath"
+	"sort"
 	"time"
 
 	"github.com/coreos/etcd/pkg/fileutil"
 	"github.com/go-kit/kit/log"
 	"github.com/oklog/ulid"
@@ -34,10 +36,10 @@ type Compactor interface {
 	// Plan returns a set of non-overlapping directories that can
 	// be compacted concurrently.
 	// Results returned when compactions are in progress are undefined.
-	Plan(dir string) ([][]string, error)
+	Plan() ([][]string, error)
 
 	// Write persists a Block into a directory.
-	Write(dir string, b Block) error
+	Write(b Block) error
 
 	// Compact runs compaction against the provided directories. Must
 	// only be called concurrently with results of Plan().
@@ -46,6 +48,7 @@
 // compactor implements the Compactor interface.
 type compactor struct {
+	dir     string
 	metrics *compactorMetrics
 	logger  log.Logger
 	opts    *compactorOptions
 }
@@ -87,8 +90,9 @@ type compactorOptions struct {
 	maxBlockRange uint64
 }
 
-func newCompactor(r prometheus.Registerer, l log.Logger, opts *compactorOptions) *compactor {
+func newCompactor(dir string, r prometheus.Registerer, l log.Logger, opts *compactorOptions) *compactor {
 	return &compactor{
+		dir:     dir,
 		opts:    opts,
 		logger:  l,
 		metrics: newCompactorMetrics(r),
@@ -103,13 +107,18 @@ type compactionInfo struct {
 
 const compactionBlocksLen = 3
 
-func (c *compactor) Plan(dir string) ([][]string, error) {
-	dirs, err := blockDirs(dir)
+type dirMeta struct {
+	dir  string
+	meta *BlockMeta
+}
+
+func (c *compactor) Plan() ([][]string, error) {
+	dirs, err := blockDirs(c.dir)
 	if err != nil {
 		return nil, err
 	}
 
-	var bs []*BlockMeta
+	var bs []dirMeta
 
 	for _, dir := range dirs {
 		meta, err := readMetaFile(dir)
@@ -117,9 +126,12 @@ func (c *compactor) Plan(dir string) ([][]string, error) {
 			return nil, err
 		}
 		if meta.Compaction.Generation > 0 {
-			bs = append(bs, meta)
+			bs = append(bs, dirMeta{dir, meta})
 		}
 	}
+	sort.Slice(bs, func(i, j int) bool {
+		return bs[i].meta.MinTime < bs[j].meta.MinTime
+	})
 
 	if len(bs) == 0 {
 		return nil, nil
@@ -128,7 +140,7 @@ func (c *compactor) Plan(dir string) ([][]string, error) {
 	sliceDirs := func(i, j int) [][]string {
 		var res []string
 		for k := i; k < j; k++ {
-			res = append(res, dirs[k])
+			res = append(res, bs[k].dir)
 		}
 		return [][]string{res}
 	}
@@ -143,26 +155,22 @@ func (c *compactor) Plan(dir string) ([][]string, error) {
 
 	return nil, nil
 }
 
-func (c *compactor) match(bs []*BlockMeta) bool {
-	g := bs[0].Compaction.Generation
+func (c *compactor) match(bs []dirMeta) bool {
+	g := bs[0].meta.Compaction.Generation
 
 	for _, b := range bs {
-		if b.Compaction.Generation != g {
+		if b.meta.Compaction.Generation != g {
 			return false
 		}
 	}
-	return uint64(bs[len(bs)-1].MaxTime-bs[0].MinTime) <= c.opts.maxBlockRange
+	return uint64(bs[len(bs)-1].meta.MaxTime-bs[0].meta.MinTime) <= c.opts.maxBlockRange
 }
 
 func mergeBlockMetas(blocks ...Block) (res BlockMeta) {
 	m0 := blocks[0].Meta()
 
-	entropy := rand.New(rand.NewSource(time.Now().UnixNano()))
-
-	res.Sequence = m0.Sequence
 	res.MinTime = m0.MinTime
 	res.MaxTime = blocks[len(blocks)-1].Meta().MaxTime
-	res.ULID = ulid.MustNew(ulid.Now(), entropy)
 
 	res.Compaction.Generation = m0.Compaction.Generation + 1
@@ -185,16 +193,22 @@ func (c *compactor) Compact(dirs ...string) (err error) {
 		blocks = append(blocks, b)
 	}
 
-	return c.write(dirs[0], blocks...)
+	entropy := rand.New(rand.NewSource(time.Now().UnixNano()))
+	uid := ulid.MustNew(ulid.Now(), entropy)
+
+	return c.write(uid, blocks...)
 }
 
-func (c *compactor) Write(dir string, b Block) error {
-	return c.write(dir, b)
+func (c *compactor) Write(b Block) error {
+	entropy := rand.New(rand.NewSource(time.Now().UnixNano()))
+	uid := ulid.MustNew(ulid.Now(), entropy)
+
+	return c.write(uid, b)
 }
 
 // write creates a new block that is the union of the provided blocks into dir.
 // It cleans up all files of the old blocks after completing successfully.
-func (c *compactor) write(dir string, blocks ...Block) (err error) {
+func (c *compactor) write(uid ulid.ULID, blocks ...Block) (err error) {
 	c.logger.Log("msg", "compact blocks", "blocks", fmt.Sprintf("%v", blocks))
 
 	defer func(t time.Time) {
@@ -204,6 +218,7 @@ func (c *compactor) write(dir string, blocks ...Block) (err error) {
 		c.metrics.duration.Observe(time.Since(t).Seconds())
 	}(time.Now())
 
+	dir := filepath.Join(c.dir, uid.String())
 	tmp := dir + ".tmp"
 
 	if err = os.RemoveAll(tmp); err != nil {
@@ -229,6 +244,8 @@ func (c *compactor) write(dir string, blocks ...Block) (err error) {
 	if err != nil {
 		return errors.Wrap(err, "write compaction")
 	}
+	meta.ULID = uid
+
 	if err = writeMetaFile(tmp, meta); err != nil {
 		return errors.Wrap(err, "write merged meta")
 	}
@@ -244,7 +261,7 @@ func (c *compactor) write(dir string, blocks ...Block) (err error) {
 	if err := renameFile(tmp, dir); err != nil {
 		return errors.Wrap(err, "rename block dir")
 	}
-	for _, b := range blocks[1:] {
+	for _, b := range blocks {
 		if err := os.RemoveAll(b.Dir()); err != nil {
 			return err
 		}
diff --git a/db.go b/db.go
index 6031e7aff..56a13e268 100644
--- a/db.go
+++ b/db.go
@@ -22,6 +22,7 @@ import (
 	"os"
 	"path/filepath"
 	"runtime"
+	"sort"
 	"strconv"
 	"strings"
 	"sync"
@@ -33,6 +34,7 @@ import (
 	"github.com/coreos/etcd/pkg/fileutil"
 	"github.com/go-kit/kit/log"
 	"github.com/nightlyone/lockfile"
+	"github.com/oklog/ulid"
 	"github.com/pkg/errors"
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/prometheus/tsdb/labels"
@@ -45,7 +47,6 @@ var DefaultOptions = &Options{
 	RetentionDuration: 15 * 24 * 60 * 60 * 1000, // 15 days in milliseconds
 	MinBlockDuration:  3 * 60 * 60 * 1000,       // 2 hours in milliseconds
 	MaxBlockDuration:  24 * 60 * 60 * 1000,      // 1 days in milliseconds
-	AppendableBlocks:  2,
 	NoLockfile:        false,
 }
 
@@ -64,13 +65,6 @@ type Options struct {
 	// The maximum timestamp range of compacted blocks.
 	MaxBlockDuration uint64
 
-	// Number of head blocks that can be appended to.
-	// Should be two or higher to prevent write errors in general scenarios.
-	//
-	// After a new block is started for timestamp t0 or higher, appends with
-	// timestamps as early as t0 - (n-1) * MinBlockDuration are valid.
-	AppendableBlocks int
-
 	// NoLockfile disables creation and consideration of a lock file.
 	NoLockfile bool
 }
@@ -159,11 +153,6 @@ func Open(dir string, l log.Logger, r prometheus.Registerer, opts *Options) (db
 		return nil, err
 	}
 
-	absdir, err := filepath.Abs(dir)
-	if err != nil {
-		return nil, err
-	}
-
 	if l == nil {
 		l = log.NewLogfmtLogger(os.Stdout)
 		l = log.With(l, "ts", log.DefaultTimestampUTC, "caller", log.DefaultCaller)
@@ -172,9 +161,6 @@ func Open(dir string, l log.Logger, r prometheus.Registerer, opts *Options) (db
 	if opts == nil {
 		opts = DefaultOptions
 	}
-	if opts.AppendableBlocks < 1 {
-		return nil, errors.Errorf("AppendableBlocks must be greater than 0")
-	}
 
 	db = &DB{
 		dir:     dir,
@@ -186,6 +172,10 @@ func Open(dir string, l log.Logger, r prometheus.Registerer, opts *Options) (db
 		stopc:   make(chan struct{}),
 	}
 	if !opts.NoLockfile {
+		absdir, err := filepath.Abs(dir)
+		if err != nil {
+			return nil, err
+		}
 		lockf, err := lockfile.New(filepath.Join(absdir, "lock"))
 		if err != nil {
 			return nil, err
@@ -196,7 +186,7 @@ func Open(dir string, l log.Logger, r prometheus.Registerer, opts *Options) (db
 		db.lockf = &lockf
 	}
 
-	db.compactor = newCompactor(r, l, &compactorOptions{
+	db.compactor = newCompactor(dir, r, l, &compactorOptions{
 		maxBlockRange: opts.MaxBlockDuration,
 	})
 
@@ -281,8 +271,8 @@ func (db *DB) compact() (changes bool, err error) {
 	// returning the lock to not block Appenders.
 	// Selected blocks are semantically ensured to not be written to afterwards
 	// by appendable().
-	if len(db.heads) > db.opts.AppendableBlocks {
-		for _, h := range db.heads[:len(db.heads)-db.opts.AppendableBlocks] {
+	if len(db.heads) > 2 {
+		for _, h := range db.heads[:len(db.heads)-2] {
 			// Blocks that won't be appendable when instantiating a new appender
 			// might still have active appenders on them.
 			// Abort at the first one we encounter.
@@ -302,7 +292,7 @@ func (db *DB) compact() (changes bool, err error) {
 			default:
 			}
 
-			if err = db.compactor.Write(h.Dir(), h); err != nil {
+			if err = db.compactor.Write(h); err != nil {
 				return changes, errors.Wrap(err, "persist head block")
 			}
 			changes = true
@@ -311,7 +301,7 @@ func (db *DB) compact() (changes bool, err error) {
 
 	// Check for compactions of multiple blocks.
 	for {
-		plans, err := db.compactor.Plan(db.dir)
+		plans, err := db.compactor.Plan()
 		if err != nil {
 			return changes, errors.Wrap(err, "plan compaction")
 		}
@@ -375,9 +365,9 @@ func retentionCutoff(dir string, mint int64) (bool, error) {
 	return changes, fileutil.Fsync(df)
 }
 
-func (db *DB) seqBlock(i int) (Block, bool) {
+func (db *DB) getBlock(id ulid.ULID) (Block, bool) {
 	for _, b := range db.blocks {
-		if b.Meta().Sequence == i {
+		if b.Meta().ULID == id {
 			return b, true
 		}
 	}
@@ -399,10 +389,8 @@ func (db *DB) reloadBlocks() error {
 		return errors.Wrap(err, "find blocks")
 	}
 	var (
-		metas     []*BlockMeta
-		blocks    []Block
-		heads     []headBlock
-		seqBlocks = make(map[int]Block, len(dirs))
+		blocks []Block
+		exist  = map[ulid.ULID]struct{}{}
 	)
 
 	for _, dir := range dirs {
@@ -410,47 +398,59 @@ func (db *DB) reloadBlocks() error {
 		if err != nil {
 			return errors.Wrapf(err, "read meta information %s", dir)
 		}
-		metas = append(metas, meta)
-	}
-	for i, meta := range metas {
-		b, ok := db.seqBlock(meta.Sequence)
-
-		if meta.Compaction.Generation == 0 {
-			if !ok {
-				b, err = db.openHeadBlock(dirs[i])
-				if err != nil {
-					return errors.Wrapf(err, "load head at %s", dirs[i])
-				}
+		b, ok := db.getBlock(meta.ULID)
+		if !ok {
+			if meta.Compaction.Generation == 0 {
+				b, err = db.openHeadBlock(dir)
+			} else {
+				b, err = newPersistedBlock(dir)
 			}
-			if meta.ULID != b.Meta().ULID {
-				return errors.Errorf("head block ULID changed unexpectedly")
-			}
-			heads = append(heads, b.(headBlock))
-		} else {
-			if !ok || meta.ULID != b.Meta().ULID {
-				b, err = newPersistedBlock(dirs[i])
-				if err != nil {
-					return errors.Wrapf(err, "open persisted block %s", dirs[i])
-				}
+			if err != nil {
+				return errors.Wrapf(err, "open block %s", dir)
 			}
 		}
 
-		seqBlocks[meta.Sequence] = b
 		blocks = append(blocks, b)
+		exist[meta.ULID] = struct{}{}
 	}
 
-	// Close all blocks that we no longer need. They are closed after returning all
-	// locks to avoid questionable locking order.
+	if err := validateBlockSequence(blocks); err != nil {
+		return errors.Wrap(err, "invalid block sequence")
+	}
+	// Close all opened blocks that no longer exist after we returned all locks.
 	for _, b := range db.blocks {
-		if nb, ok := seqBlocks[b.Meta().Sequence]; !ok || nb != b {
+		if _, ok := exist[b.Meta().ULID]; !ok {
 			cs = append(cs, b)
 		}
 	}
 
 	db.blocks = blocks
-	db.heads = heads
+	db.heads = nil
+	for _, b := range blocks {
+		if b.Meta().Compaction.Generation == 0 {
+			db.heads = append(db.heads, b.(*HeadBlock))
+		}
+	}
+
+	return nil
+}
+
+func validateBlockSequence(bs []Block) error {
+	if len(bs) == 0 {
+		return nil
+	}
+	sort.Slice(bs, func(i, j int) bool {
+		return bs[i].Meta().MinTime < bs[j].Meta().MinTime
+	})
+	prev := bs[0]
+	for _, b := range bs[1:] {
+		if b.Meta().MinTime < prev.Meta().MaxTime {
+			return errors.Errorf("block time ranges overlap (%d, %d)", b.Meta().MinTime, prev.Meta().MaxTime)
+		}
+		prev = b
+	}
 
 	return nil
 }
@@ -482,27 +481,27 @@ func (db *DB) Close() error {
 // Appender returns a new Appender on the database.
 func (db *DB) Appender() Appender {
 	db.mtx.RLock()
-	a := &dbAppender{db: db}
+	return &dbAppender{db: db}
 
-	// XXX(fabxc): turn off creating initial appender as it will happen on-demand
-	// anyway. For now this, with combination of only having a single timestamp per batch,
-	// prevents opening more than one appender and hitting an unresolved deadlock (#11).
-	//
+	// // XXX(fabxc): turn off creating initial appender as it will happen on-demand
+	// // anyway. For now this, with combination of only having a single timestamp per batch,
+	// // prevents opening more than one appender and hitting an unresolved deadlock (#11).
+	// //
 
-	// Only instantiate appender after returning the headmtx to avoid
-	// questionable locking order.
-	db.headmtx.RLock()
-	app := db.appendable()
-	db.headmtx.RUnlock()
+	// // Only instantiate appender after returning the headmtx to avoid
+	// // questionable locking order.
+	// db.headmtx.RLock()
+	// app := db.appendable()
+	// db.headmtx.RUnlock()
 
-	for _, b := range app {
-		a.heads = append(a.heads, &metaAppender{
-			meta: b.Meta(),
-			app:  b.Appender(),
-		})
-	}
+	// for _, b := range app {
+	// 	a.heads = append(a.heads, &metaAppender{
+	// 		meta: b.Meta(),
+	// 		app:  b.Appender(),
+	// 	})
+	// }
 
-	return a
+	// return a
 }
 
 type dbAppender struct {
@@ -512,6 +511,15 @@ type dbAppender struct {
 	samples int
 }
 
+func (a *dbAppender) getAppender(ulid string) (*metaAppender, bool) {
+	for _, h := range a.heads {
+		if string(h.meta.ULID[:]) == ulid {
+			return h, true
+		}
+	}
+	return nil, false
+}
+
 type metaAppender struct {
 	meta BlockMeta
 	app  Appender
@@ -532,18 +540,20 @@ func (a *dbAppender) Add(lset labels.Labels, t int64, v float64) (string, error)
 }
 
 func (a *dbAppender) AddFast(ref string, t int64, v float64) error {
-	// Load the head last byte of the head sequence from the 3rd byte of the
-	// reference number.
-	// gen := (ref << 16) >> 56
-
-	h, err := a.appenderFor(t)
-	if err != nil {
-		return err
+	if len(ref) < 16 {
+		return errors.Wrap(ErrNotFound, "invalid ref length")
 	}
-	if yoloString(h.meta.ULID[:]) != ref[:16] {
-		return errors.Wrap(ErrNotFound, "unexpected ULID")
+	// The first 16 bytes of a ref hold the ULID of the head block.
+	h, ok := a.getAppender(ref[:16])
+	if !ok {
+		return errors.Wrapf(ErrNotFound, "no block for ULID %s", ref[:16])
 	}
 	if err := h.app.AddFast(ref[16:], t, v); err != nil {
+		// The block the ref points to might not fit the given timestamp.
+		// We mask the error to stick with our contract.
+		if errors.Cause(err) == ErrOutOfBounds {
+			err = ErrNotFound
+		}
 		return err
 	}
 
@@ -554,84 +564,74 @@ func (a *dbAppender) AddFast(ref string, t int64, v float64) error {
 // appenderFor gets the appender for the head containing timestamp t.
 // If the head block doesn't exist yet, it gets created.
 func (a *dbAppender) appenderFor(t int64) (*metaAppender, error) {
-	// If there's no fitting head block for t, ensure it gets created.
-	if len(a.heads) == 0 || t >= a.heads[len(a.heads)-1].meta.MaxTime {
-		a.db.headmtx.Lock()
-
-		var newHeads []headBlock
-
-		if err := a.db.ensureHead(t); err != nil {
-			a.db.headmtx.Unlock()
-			return nil, err
-		}
-		if len(a.heads) == 0 {
-			newHeads = append(newHeads, a.db.appendable()...)
-		} else {
-			maxSeq := a.heads[len(a.heads)-1].meta.Sequence
-			for _, b := range a.db.appendable() {
-				if b.Meta().Sequence > maxSeq {
-					newHeads = append(newHeads, b)
-				}
-			}
-		}
-
-		a.db.headmtx.Unlock()
-
-		// XXX(fabxc): temporary workaround. See comment on instantiating DB.Appender.
-		// for _, b := range newHeads {
-		// 	// Only get appender for the block with the specific timestamp.
-		// 	if t >= b.Meta().MaxTime {
-		// 		continue
-		// 	}
-		// 	a.heads = append(a.heads, &metaAppender{
-		// 		app:  b.Appender(),
-		// 		meta: b.Meta(),
-		// 	})
-		// 	break
-		// }
-
-		// Instantiate appenders after returning headmtx to avoid questionable
-		// locking order.
-		for _, b := range newHeads {
-			a.heads = append(a.heads, &metaAppender{
-				app:  b.Appender(),
-				meta: b.Meta(),
-			})
-		}
-	}
-	for i := len(a.heads) - 1; i >= 0; i-- {
-		if h := a.heads[i]; t >= h.meta.MinTime {
+	for _, h := range a.heads {
+		if intervalContains(h.meta.MinTime, h.meta.MaxTime-1, t) {
 			return h, nil
 		}
 	}
+	// Currently opened appenders do not cover t. Ensure the head block is
+	// created and add missing appenders.
+	a.db.headmtx.Lock()
 
-	return nil, ErrNotFound
+	if err := a.db.ensureHead(t); err != nil {
+		a.db.headmtx.Unlock()
+		return nil, err
+	}
+
+	var hb headBlock
+	for _, h := range a.db.appendable() {
+		m := h.Meta()
+
+		if intervalContains(m.MinTime, m.MaxTime-1, t) {
+			hb = h
+			break
+		}
+	}
+	a.db.headmtx.Unlock()
+
+	if hb == nil {
+		return nil, ErrOutOfBounds
+	}
+	// Instantiate appender after returning headmtx!
+	app := &metaAppender{
+		meta: hb.Meta(),
+		app:  hb.Appender(),
+	}
+	a.heads = append(a.heads, app)
+
+	return app, nil
+}
+
+func rangeForTimestamp(t int64, width int64) (mint, maxt int64) {
+	mint = (t / width) * width
+	return mint, mint + width
 }
 
 // ensureHead makes sure that there is a head block for the timestamp t if
 // it is within or after the currently appendable window.
 func (db *DB) ensureHead(t int64) error {
-	// Initial case for a new database: we must create the first
-	// AppendableBlocks-1 front padding heads.
-	if len(db.heads) == 0 {
-		for i := int64(db.opts.AppendableBlocks - 1); i >= 0; i-- {
-			if _, err := db.cut(t - i*int64(db.opts.MinBlockDuration)); err != nil {
-				return err
-			}
-		}
-	}
+	mint, maxt := rangeForTimestamp(t, int64(db.opts.MinBlockDuration))
 
-	for {
-		h := db.heads[len(db.heads)-1]
-		m := h.Meta()
-		// If t doesn't exceed the range of heads blocks, there's nothing to do.
-		if t < m.MaxTime {
-			return nil
-		}
-		if _, err := db.cut(m.MaxTime); err != nil {
+	// Initial case with an empty database. t is the first timestamp we ever received.
+	// Create an additional buffering block in front.
+	if len(db.blocks) == 0 {
+		if _, err := db.createHeadBlock(mint-int64(db.opts.MinBlockDuration), mint); err != nil {
 			return err
 		}
+		// If the previous block reaches into our new window, make it smaller.
+	} else if mt := db.blocks[len(db.blocks)-1].Meta().MaxTime; mt > mint {
+		mint = mt
 	}
+	if mint >= maxt {
+		return nil
+	}
+	// Error if the requested time for a head is before the appendable window.
+	if len(db.heads) > 0 && t < db.heads[0].Meta().MinTime {
+		return ErrOutOfBounds
+	}
+
+	_, err := db.createHeadBlock(mint, maxt)
+	return err
 }
 
 func (a *dbAppender) Commit() error {
@@ -669,14 +669,15 @@ func (a *dbAppender) Rollback() error {
 }
 
 // appendable returns a copy of a slice of HeadBlocks that can still be appended to.
-func (db *DB) appendable() []headBlock {
-	var i int
-	app := make([]headBlock, 0, db.opts.AppendableBlocks)
-
-	if len(db.heads) > db.opts.AppendableBlocks {
-		i = len(db.heads) - db.opts.AppendableBlocks
+func (db *DB) appendable() (r []headBlock) {
+	switch len(db.heads) {
+	case 0:
+	case 1:
+		r = append(r, db.heads[0])
+	default:
+		r = append(r, db.heads[len(db.heads)-2:]...)
 	}
-	return append(app, db.heads[i:]...)
+	return r
 }
 
 func intervalOverlap(amin, amax, bmin, bmax int64) bool {
@@ -711,7 +712,7 @@ func (db *DB) blocksForInterval(mint, maxt int64) []Block {
 
 // openHeadBlock opens the head block at dir.
 func (db *DB) openHeadBlock(dir string) (*HeadBlock, error) {
 	var (
-		wdir = filepath.Join(dir, "wal")
+		wdir = walDir(dir)
 		l    = log.With(db.logger, "wal", wdir)
 	)
 	wal, err := OpenSegmentWAL(wdir, l, 5*time.Second)
@@ -726,16 +727,10 @@ func (db *DB) openHeadBlock(dir string) (*HeadBlock, error) {
 	return h, nil
 }
 
-// cut starts a new head block to append to. The completed head block
-// will still be appendable for the configured grace period.
-func (db *DB) cut(mint int64) (headBlock, error) {
-	maxt := mint + int64(db.opts.MinBlockDuration)
-
-	dir, seq, err := nextSequenceFile(db.dir, "b-")
+// createHeadBlock starts a new head block to append to.
+func (db *DB) createHeadBlock(mint, maxt int64) (headBlock, error) {
+	dir, err := TouchHeadBlock(db.dir, mint, maxt)
 	if err != nil {
-		return nil, err
-	}
-	if err := TouchHeadBlock(dir, seq, mint, maxt); err != nil {
 		return nil, errors.Wrapf(err, "touch head block %s", dir)
 	}
 	newHead, err := db.openHeadBlock(dir)
@@ -758,13 +753,8 @@ func isBlockDir(fi os.FileInfo) bool {
 	if !fi.IsDir() {
 		return false
 	}
-	if !strings.HasPrefix(fi.Name(), "b-") {
-		return false
-	}
-	if _, err := strconv.ParseUint(fi.Name()[2:], 10, 32); err != nil {
-		return false
-	}
-	return true
+	_, err := ulid.Parse(fi.Name())
+	return err == nil
 }
 
 func blockDirs(dir string) ([]string, error) {
diff --git a/db_test.go b/db_test.go
index 2f6770fff..1fa4d7270 100644
--- a/db_test.go
+++ b/db_test.go
@@ -108,22 +108,22 @@ func TestDBAppenderAddRef(t *testing.T) {
 	require.NoError(t, err)
 	defer db.Close()
 
-	app := db.Appender()
-	defer app.Rollback()
+	app1 := db.Appender()
 
-	ref, err := app.Add(labels.FromStrings("a", "b"), 0, 0)
+	ref, err := app1.Add(labels.FromStrings("a", "b"), 0, 0)
 	require.NoError(t, err)
 
 	// When a series is first created, refs don't work within that transaction.
-	err = app.AddFast(ref, 1, 1)
+	err = app1.AddFast(ref, 1, 1)
 	require.EqualError(t, errors.Cause(err), ErrNotFound.Error())
 
-	err = app.Commit()
+	err = app1.Commit()
 	require.NoError(t, err)
 
-	app = db.Appender()
+	app2 := db.Appender()
+	defer app2.Rollback()
 
-	ref, err = app.Add(labels.FromStrings("a", "b"), 1, 1)
+	ref, err = app2.Add(labels.FromStrings("a", "b"), 1, 1)
 	require.NoError(t, err)
 
 	// Ref must be prefixed with block ULID of the block we wrote to.
@@ -131,13 +131,13 @@ func TestDBAppenderAddRef(t *testing.T) {
 	require.Equal(t, string(id[:]), ref[:16])
 
 	// Reference must be valid to add another sample.
-	err = app.AddFast(ref, 2, 2)
+	err = app2.AddFast(ref, 2, 2)
 	require.NoError(t, err)
 
 	// AddFast for the same timestamp must fail if the generation in the reference
 	// doesn't add up.
 	refb := []byte(ref)
 	refb[15] ^= refb[15]
-	err = app.AddFast(string(refb), 1, 1)
+	err = app2.AddFast(string(refb), 1, 1)
 	require.EqualError(t, errors.Cause(err), ErrNotFound.Error())
 }
diff --git a/head.go b/head.go
index 4a195a795..f30a934a5 100644
--- a/head.go
+++ b/head.go
@@ -18,6 +18,7 @@ import (
 	"math"
 	"math/rand"
 	"os"
+	"path/filepath"
 	"sort"
 	"sync"
 	"sync/atomic"
@@ -73,30 +74,30 @@ type HeadBlock struct {
 
 // TouchHeadBlock atomically touches a new head block in dir for
 // samples in the range [mint,maxt).
-func TouchHeadBlock(dir string, seq int, mint, maxt int64) error {
-	// Make head block creation appear atomic.
-	tmp := dir + ".tmp"
-
-	if err := os.MkdirAll(tmp, 0777); err != nil {
-		return err
-	}
-
+func TouchHeadBlock(dir string, mint, maxt int64) (string, error) {
 	entropy := rand.New(rand.NewSource(time.Now().UnixNano()))
 	ulid, err := ulid.New(ulid.Now(), entropy)
 	if err != nil {
-		return err
+		return "", err
+	}
+
+	// Make head block creation appear atomic.
+	dir = filepath.Join(dir, ulid.String())
+	tmp := dir + ".tmp"
+
+	if err := os.MkdirAll(tmp, 0777); err != nil {
+		return "", err
 	}
 
 	if err := writeMetaFile(tmp, &BlockMeta{
-		ULID:     ulid,
-		Sequence: seq,
-		MinTime:  mint,
-		MaxTime:  maxt,
+		ULID:    ulid,
+		MinTime: mint,
+		MaxTime: maxt,
 	}); err != nil {
-		return err
+		return "", err
 	}
 
-	return renameFile(tmp, dir)
+	return dir, renameFile(tmp, dir)
 }
 
 // OpenHeadBlock opens the head block in dir.
@@ -150,7 +151,7 @@ func (h *HeadBlock) inBounds(t int64) bool {
 }
 
 func (h *HeadBlock) String() string {
-	return fmt.Sprintf("(%d, %s)", h.meta.Sequence, h.meta.ULID)
+	return h.meta.ULID.String()
 }
 
 // Close syncs all data and closes underlying resources of the head block.
@@ -182,7 +183,6 @@ func (h *HeadBlock) Close() error {
 func (h *HeadBlock) Meta() BlockMeta {
 	m := BlockMeta{
 		ULID:       h.meta.ULID,
-		Sequence:   h.meta.Sequence,
 		MinTime:    h.meta.MinTime,
 		MaxTime:    h.meta.MaxTime,
 		Compaction: h.meta.Compaction,
@@ -337,6 +337,9 @@ func (a *headAppender) Add(lset labels.Labels, t int64, v float64) (string, erro
 var nullRef = string([]byte{0, 0, 0, 0, 0, 0, 0, 0})
 
 func (a *headAppender) AddFast(ref string, t int64, v float64) error {
+	if len(ref) != 8 {
+		return errors.Wrap(ErrNotFound, "invalid ref length")
+	}
 	var (
 		refn = binary.BigEndian.Uint64(yoloBytes(ref))
 		id   = (refn << 1) >> 1
diff --git a/head_test.go b/head_test.go
index aa9138060..a6df15079 100644
--- a/head_test.go
+++ b/head_test.go
@@ -33,7 +33,7 @@ import (
 
 // createTestHeadBlock creates a new head block with a SegmentWAL.
 func createTestHeadBlock(t testing.TB, dir string, mint, maxt int64) *HeadBlock {
-	err := TouchHeadBlock(dir, 0, mint, maxt)
+	dir, err := TouchHeadBlock(dir, mint, maxt)
 	require.NoError(t, err)
 
 	wal, err := OpenSegmentWAL(dir, nil, 5*time.Second)
diff --git a/wal.go b/wal.go
index 251944f0b..63975de2a 100644
--- a/wal.go
+++ b/wal.go
@@ -21,7 +21,6 @@ import (
 	"io"
 	"math"
 	"os"
-	"path/filepath"
 	"sync"
 	"time"
 
@@ -91,7 +90,6 @@ type RefSample struct {
 }
 
 const (
-	walDirName          = "wal"
 	walSegmentSizeBytes = 256 * 1024 * 1024 // 256 MB
 )
 
@@ -107,8 +105,6 @@ func init() {
 // OpenSegmentWAL opens or creates a write ahead log in the given directory.
 // The WAL must be read completely before new data is written.
 func OpenSegmentWAL(dir string, logger log.Logger, flushInterval time.Duration) (*SegmentWAL, error) {
-	dir = filepath.Join(dir, walDirName)
-
 	if err := os.MkdirAll(dir, 0777); err != nil {
 		return nil, err
 	}
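
Below is a minimal, standalone sketch of the two semantics this patch introduces: rangeForTimestamp (its body copied from the db.go hunk above) aligns timestamps to fixed-width head-block windows, and appender refs become opaque 24-byte strings — 16 raw ULID bytes of the head block followed by the 8-byte block-local reference that headAppender.AddFast validates. The package wrapper, the example window width, and the example ref bytes are illustrative assumptions, not part of the patch.

package main

import "fmt"

// rangeForTimestamp mirrors the helper added to db.go: it truncates t down to
// a multiple of width, yielding the half-open window [mint, maxt) that the
// covering head block spans.
func rangeForTimestamp(t int64, width int64) (mint, maxt int64) {
	mint = (t / width) * width
	return mint, mint + width
}

func main() {
	// Example width only: 3h in milliseconds, matching the MinBlockDuration
	// used in the benchmark config above.
	const width = int64(3 * 60 * 60 * 1000)

	// All timestamps inside one window map to the same head block range.
	for _, t := range []int64{0, 1, width - 1, width, 2*width + 42} {
		mint, maxt := rangeForTimestamp(t, width)
		fmt.Printf("t=%-9d -> [%d, %d)\n", t, mint, maxt)
	}

	// Refs handed out by dbAppender are opaque strings: dbAppender.AddFast
	// routes on the first 16 bytes (the head block's raw ULID) and forwards
	// the remaining 8 bytes, which headAppender.AddFast checks for length.
	ref := "0123456789ABCDEF" + "\x00\x00\x00\x00\x00\x00\x00\x2a" // hypothetical ref
	fmt.Printf("block ULID bytes: %q  local ref: %q\n", ref[:16], ref[16:])
}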