From 3901b6e70bcb4e93152de79be86a183fc0ec135a Mon Sep 17 00:00:00 2001 From: Fabian Reinartz Date: Tue, 29 Aug 2017 00:39:17 +0200 Subject: [PATCH] Remove multiple heads This changes the structure to a single WAL backed by a single head block. Parts of the head block can be compacted. This relieves us from any head amangement and greatly simplifies any consistency and isolation concerns by just having a single head. --- block.go | 48 +-- compact.go | 95 ++--- db.go | 765 ++++++++++++++++++++++----------------- db_test.go | 347 +++++++++++++++--- head.go | 782 +++++++++++++++------------------------- head_test.go | 934 +++++++++++++++++------------------------------- querier.go | 36 +- querier_test.go | 11 + wal.go | 17 +- 9 files changed, 1455 insertions(+), 1580 deletions(-) diff --git a/block.go b/block.go index 0173a4872..60a644f4b 100644 --- a/block.go +++ b/block.go @@ -26,14 +26,21 @@ import ( "github.com/prometheus/tsdb/labels" ) -// DiskBlock handles reads against a Block of time series data. type DiskBlock interface { + BlockReader + // Directory where block data is stored. Dir() string // Stats returns statistics about the block. Meta() BlockMeta + Delete(mint, maxt int64, m ...labels.Matcher) error + + Close() error +} + +type BlockReader interface { // Index returns an IndexReader over the block's data. Index() IndexReader @@ -42,31 +49,14 @@ type DiskBlock interface { // Tombstones returns a TombstoneReader over the block's deleted data. Tombstones() TombstoneReader - - // Delete deletes data from the block. - Delete(mint, maxt int64, ms ...labels.Matcher) error - - // Close releases all underlying resources of the block. - Close() error } -// Block is an interface to a DiskBlock that can also be queried. -type Block interface { - DiskBlock - Queryable - Snapshottable -} - -// headBlock is a regular block that can still be appended to. -type headBlock interface { - Block - Appendable - - // ActiveWriters returns the number of currently active appenders. - ActiveWriters() int - // HighTimestamp returns the highest currently inserted timestamp. - HighTimestamp() int64 -} +// // Block is an interface to a DiskBlock that can also be queried. +// type Block interface { +// DiskBlock +// Queryable +// Snapshottable +// } // Snapshottable defines an entity that can be backedup online. type Snapshottable interface { @@ -225,16 +215,6 @@ func (pb *persistedBlock) String() string { return pb.meta.ULID.String() } -func (pb *persistedBlock) Querier(mint, maxt int64) Querier { - return &blockQuerier{ - mint: mint, - maxt: maxt, - index: pb.Index(), - chunks: pb.Chunks(), - tombstones: pb.Tombstones(), - } -} - func (pb *persistedBlock) Dir() string { return pb.dir } func (pb *persistedBlock) Index() IndexReader { return pb.indexr } func (pb *persistedBlock) Chunks() ChunkReader { return pb.chunkr } diff --git a/compact.go b/compact.go index cb2c3131d..1ffd90a07 100644 --- a/compact.go +++ b/compact.go @@ -14,7 +14,6 @@ package tsdb import ( - "fmt" "math/rand" "os" "path/filepath" @@ -51,7 +50,7 @@ type Compactor interface { Plan(dir string) ([]string, error) // Write persists a Block into a directory. - Write(dest string, b Block) error + Write(dest string, b BlockReader, mint, maxt int64) error // Compact runs compaction against the provided directories. Must // only be called concurrently with results of Plan(). 
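The interface split above is what makes the single-head design work: the compactor now only needs read access plus explicit time bounds, so the in-memory head can be handed to Write without pretending to be an on-disk block. A minimal sketch of the resulting shape, using simplified stand-in types rather than the real tsdb readers:

```go
package blocksketch

import "fmt"

// Simplified stand-ins; the real IndexReader, ChunkReader and TombstoneReader
// interfaces carry far more behaviour than shown here.
type IndexReader interface{}
type ChunkReader interface{}
type TombstoneReader interface{}

// BlockReader is the read-only surface now shared by persisted blocks and the head.
type BlockReader interface {
	Index() IndexReader
	Chunks() ChunkReader
	Tombstones() TombstoneReader
}

// DiskBlock layers the on-disk concerns on top; the in-memory head never needs them.
type DiskBlock interface {
	BlockReader
	Dir() string
	Close() error
}

// write mirrors the shape of the new Compactor.Write: read access plus explicit
// time bounds is all that is required to persist a block.
func write(dest string, b BlockReader, mint, maxt int64) error {
	_ = b.Index() // a real implementation iterates series and chunks here
	fmt.Printf("persisting to %s for range [%d, %d)\n", dest, mint, maxt)
	return nil
}
```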
@@ -124,8 +123,6 @@ type compactionInfo struct { mint, maxt int64 } -const compactionBlocksLen = 3 - type dirMeta struct { dir string meta *BlockMeta @@ -258,9 +255,12 @@ func splitByRange(ds []dirMeta, tr int64) [][]dirMeta { return splitDirs } -func compactBlockMetas(blocks ...BlockMeta) (res BlockMeta) { - res.MinTime = blocks[0].MinTime - res.MaxTime = blocks[len(blocks)-1].MaxTime +func compactBlockMetas(uid ulid.ULID, blocks ...*BlockMeta) *BlockMeta { + res := &BlockMeta{ + ULID: uid, + MinTime: blocks[0].MinTime, + MaxTime: blocks[len(blocks)-1].MaxTime, + } sources := map[ulid.ULID]struct{}{} @@ -271,10 +271,6 @@ func compactBlockMetas(blocks ...BlockMeta) (res BlockMeta) { for _, s := range b.Compaction.Sources { sources[s] = struct{}{} } - // If it's an in memory block, its ULID goes into the sources. - if b.Compaction.Level == 0 { - sources[b.ULID] = struct{}{} - } } res.Compaction.Level++ @@ -291,7 +287,8 @@ func compactBlockMetas(blocks ...BlockMeta) (res BlockMeta) { // Compact creates a new block in the compactor's directory from the blocks in the // provided directories. func (c *LeveledCompactor) Compact(dest string, dirs ...string) (err error) { - var blocks []Block + var blocks []BlockReader + var metas []*BlockMeta for _, d := range dirs { b, err := newPersistedBlock(d, c.opts.chunkPool) @@ -300,31 +297,40 @@ func (c *LeveledCompactor) Compact(dest string, dirs ...string) (err error) { } defer b.Close() + meta, err := readMetaFile(d) + if err != nil { + return err + } + + metas = append(metas, meta) blocks = append(blocks, b) } entropy := rand.New(rand.NewSource(time.Now().UnixNano())) uid := ulid.MustNew(ulid.Now(), entropy) - return c.write(dest, uid, blocks...) + return c.write(dest, compactBlockMetas(uid, metas...), blocks...) } -func (c *LeveledCompactor) Write(dest string, b Block) error { - // Buffering blocks might have been created that often have no data. - if b.Meta().Stats.NumSeries == 0 { - return nil - } - +func (c *LeveledCompactor) Write(dest string, b BlockReader, mint, maxt int64) error { entropy := rand.New(rand.NewSource(time.Now().UnixNano())) uid := ulid.MustNew(ulid.Now(), entropy) - return c.write(dest, uid, b) + meta := &BlockMeta{ + ULID: uid, + MinTime: mint, + MaxTime: maxt, + } + meta.Compaction.Level = 1 + meta.Compaction.Sources = []ulid.ULID{uid} + + return c.write(dest, meta, b) } // write creates a new block that is the union of the provided blocks into dir. // It cleans up all files of the old blocks after completing successfully. 
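With Meta() gone from the reader interface, block metas are read from disk and merged explicitly before writing. A self-contained sketch of that merge; plain strings stand in for ulid.ULID, sorting of sources is omitted, and the result level is assumed to be one above the highest input level, as suggested by the hunk above:

```go
package metasketch

// BlockMeta is a trimmed-down stand-in for tsdb's BlockMeta.
type BlockMeta struct {
	ULID             string
	MinTime, MaxTime int64
	Level            int
	Sources          []string
}

// compactMetas merges the metas of time-ordered input blocks: the result spans
// from the first block's MinTime to the last block's MaxTime, carries the union
// of all source ULIDs, and sits one compaction level above the inputs.
func compactMetas(uid string, blocks ...BlockMeta) BlockMeta {
	res := BlockMeta{
		ULID:    uid,
		MinTime: blocks[0].MinTime,
		MaxTime: blocks[len(blocks)-1].MaxTime,
	}
	sources := map[string]struct{}{}

	for _, b := range blocks {
		if b.Level > res.Level {
			res.Level = b.Level
		}
		for _, s := range b.Sources {
			sources[s] = struct{}{}
		}
	}
	res.Level++

	for s := range sources {
		res.Sources = append(res.Sources, s)
	}
	return res
}
```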
-func (c *LeveledCompactor) write(dest string, uid ulid.ULID, blocks ...Block) (err error) { - c.logger.Log("msg", "compact blocks", "blocks", fmt.Sprintf("%v", blocks)) +func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockReader) (err error) { + c.logger.Log("msg", "compact blocks", "count", len(blocks), "mint", meta.MinTime, "maxt", meta.MaxTime) defer func(t time.Time) { if err != nil { @@ -334,7 +340,7 @@ func (c *LeveledCompactor) write(dest string, uid ulid.ULID, blocks ...Block) (e c.metrics.duration.Observe(time.Since(t).Seconds()) }(time.Now()) - dir := filepath.Join(dest, uid.String()) + dir := filepath.Join(dest, meta.ULID.String()) tmp := dir + ".tmp" if err = os.RemoveAll(tmp); err != nil { @@ -356,11 +362,9 @@ func (c *LeveledCompactor) write(dest string, uid ulid.ULID, blocks ...Block) (e return errors.Wrap(err, "open index writer") } - meta, err := c.populateBlock(blocks, indexw, chunkw) - if err != nil { + if err := c.populateBlock(blocks, meta, indexw, chunkw); err != nil { return errors.Wrap(err, "write compaction") } - meta.ULID = uid if err = writeMetaFile(tmp, meta); err != nil { return errors.Wrap(err, "write merged meta") @@ -398,18 +402,16 @@ func (c *LeveledCompactor) write(dest string, uid ulid.ULID, blocks ...Block) (e // populateBlock fills the index and chunk writers with new data gathered as the union // of the provided blocks. It returns meta information for the new block. -func (c *LeveledCompactor) populateBlock(blocks []Block, indexw IndexWriter, chunkw ChunkWriter) (*BlockMeta, error) { +func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta, indexw IndexWriter, chunkw ChunkWriter) error { var ( set compactionSet - metas []BlockMeta allSymbols = make(map[string]struct{}, 1<<16) ) for i, b := range blocks { - metas = append(metas, b.Meta()) symbols, err := b.Index().Symbols() if err != nil { - return nil, errors.Wrap(err, "read symbols") + return errors.Wrap(err, "read symbols") } for s := range symbols { allSymbols[s] = struct{}{} @@ -419,7 +421,7 @@ func (c *LeveledCompactor) populateBlock(blocks []Block, indexw IndexWriter, chu all, err := indexr.Postings("", "") if err != nil { - return nil, err + return err } all = indexr.SortedPostings(all) @@ -431,7 +433,7 @@ func (c *LeveledCompactor) populateBlock(blocks []Block, indexw IndexWriter, chu } set, err = newCompactionMerger(set, s) if err != nil { - return nil, err + return err } } @@ -440,11 +442,10 @@ func (c *LeveledCompactor) populateBlock(blocks []Block, indexw IndexWriter, chu postings = &memPostings{m: make(map[term][]uint32, 512)} values = map[string]stringset{} i = uint32(0) - meta = compactBlockMetas(metas...) 
) if err := indexw.AddSymbols(allSymbols); err != nil { - return nil, errors.Wrap(err, "add symbols") + return errors.Wrap(err, "add symbols") } for set.Next() { @@ -462,7 +463,7 @@ func (c *LeveledCompactor) populateBlock(blocks []Block, indexw IndexWriter, chu newChunk := chunks.NewXORChunk() app, err := newChunk.Appender() if err != nil { - return nil, err + return err } it := &deletedIterator{it: chk.Chunk.Iterator(), intervals: dranges} @@ -476,11 +477,11 @@ func (c *LeveledCompactor) populateBlock(blocks []Block, indexw IndexWriter, chu } } if err := chunkw.WriteChunks(chks...); err != nil { - return nil, err + return errors.Wrap(err, "write chunks") } if err := indexw.AddSeries(i, lset, chks...); err != nil { - return nil, errors.Wrapf(err, "add series") + return errors.Wrap(err, "add series") } meta.Stats.NumChunks += uint64(len(chks)) @@ -508,7 +509,7 @@ func (c *LeveledCompactor) populateBlock(blocks []Block, indexw IndexWriter, chu i++ } if set.Err() != nil { - return nil, set.Err() + return errors.Wrap(set.Err(), "iterate compaction set") } s := make([]string, 0, 256) @@ -519,13 +520,13 @@ func (c *LeveledCompactor) populateBlock(blocks []Block, indexw IndexWriter, chu s = append(s, x) } if err := indexw.WriteLabelIndex([]string{n}, s); err != nil { - return nil, err + return errors.Wrap(err, "write label index") } } for t := range postings.m { if err := indexw.WritePostings(t.name, t.value, postings.get(t)); err != nil { - return nil, err + return errors.Wrap(err, "write postings") } } // Write a postings list containing all series. @@ -534,10 +535,10 @@ func (c *LeveledCompactor) populateBlock(blocks []Block, indexw IndexWriter, chu all[i] = uint32(i) } if err := indexw.WritePostings("", "", newListPostings(all)); err != nil { - return nil, err + return errors.Wrap(err, "write 'all' postings") } - return &meta, nil + return nil } type compactionSet interface { @@ -572,9 +573,12 @@ func (c *compactionSeriesSet) Next() bool { if !c.p.Next() { return false } + var err error + c.intervals = c.tombstones.Get(c.p.At()) - if c.err = c.index.Series(c.p.At(), &c.l, &c.c); c.err != nil { + if err = c.index.Series(c.p.At(), &c.l, &c.c); err != nil { + c.err = errors.Wrapf(err, "get series %d", c.p.At()) return false } @@ -593,8 +597,9 @@ func (c *compactionSeriesSet) Next() bool { for i := range c.c { chk := &c.c[i] - chk.Chunk, c.err = c.chunks.Chunk(chk.Ref) - if c.err != nil { + chk.Chunk, err = c.chunks.Chunk(chk.Ref) + if err != nil { + c.err = errors.Wrapf(err, "chunk %d not found", chk.Ref) return false } } diff --git a/db.go b/db.go index 4775c670b..dbe5e992c 100644 --- a/db.go +++ b/db.go @@ -16,9 +16,11 @@ package tsdb import ( "bytes" + "encoding/binary" "fmt" "io" "io/ioutil" + "math" "os" "path/filepath" "runtime" @@ -26,6 +28,7 @@ import ( "strconv" "strings" "sync" + "sync/atomic" "time" "unsafe" @@ -96,22 +99,19 @@ type DB struct { dir string lockf *lockfile.Lockfile - logger log.Logger - metrics *dbMetrics - opts *Options - chunkPool chunks.Pool + logger log.Logger + metrics *dbMetrics + opts *Options + chunkPool chunks.Pool + appendPool sync.Pool + compactor Compactor + wal WAL // Mutex for that must be held when modifying the general block layout. mtx sync.RWMutex - blocks []Block + blocks []DiskBlock - // Mutex that must be held when modifying just the head blocks - // or the general layout. - // mtx must be held before acquiring. 
- headmtx sync.RWMutex - heads []headBlock - - compactor Compactor + head *Head compactc chan struct{} donec chan struct{} @@ -187,20 +187,24 @@ func Open(dir string, l log.Logger, r prometheus.Registerer, opts *Options) (db if err := os.MkdirAll(dir, 0777); err != nil { return nil, err } - if l == nil { l = log.NewLogfmtLogger(os.Stdout) l = log.With(l, "ts", log.DefaultTimestampUTC, "caller", log.DefaultCaller) } - if opts == nil { opts = DefaultOptions } + wal, err := OpenSegmentWAL(filepath.Join(dir, "wal"), l, 10*time.Second) + if err != nil { + return nil, err + } + db = &DB{ dir: dir, logger: l, opts: opts, + wal: wal, compactc: make(chan struct{}, 1), donec: make(chan struct{}), stopc: make(chan struct{}), @@ -237,16 +241,20 @@ func Open(dir string, l log.Logger, r prometheus.Registerer, opts *Options) (db if len(copts.blockRanges) == 1 { break } - // Max overflow is restricted to 20%. copts.blockRanges = copts.blockRanges[:len(copts.blockRanges)-1] } db.compactor = NewLeveledCompactor(r, l, copts) + db.head, err = NewHead(l, db.wal.Reader(), copts.blockRanges[0]) + if err != nil { + return nil, err + } if err := db.reloadBlocks(); err != nil { return nil, err } + go db.run() return db, nil @@ -260,12 +268,16 @@ func (db *DB) Dir() string { func (db *DB) run() { defer close(db.donec) - tick := time.NewTicker(30 * time.Second) - defer tick.Stop() + backoff := time.Duration(0) for { select { - case <-tick.C: + case <-db.stopc: + case <-time.After(backoff): + } + + select { + case <-time.After(1 * time.Minute): select { case db.compactc <- struct{}{}: default: @@ -273,20 +285,18 @@ func (db *DB) run() { case <-db.compactc: db.metrics.compactionsTriggered.Inc() - changes1, err := db.retentionCutoff() - if err != nil { - db.logger.Log("msg", "retention cutoff failed", "err", err) + _, err1 := db.retentionCutoff() + if err1 != nil { + db.logger.Log("msg", "retention cutoff failed", "err", err1) } - changes2, err := db.compact() - if err != nil { - db.logger.Log("msg", "compaction failed", "err", err) + _, err2 := db.compact() + if err2 != nil { + db.logger.Log("msg", "compaction failed", "err", err2) } - if changes1 || changes2 { - if err := db.reloadBlocks(); err != nil { - db.logger.Log("msg", "reloading blocks failed", "err", err) - } + if err1 != nil || err2 != nil { + exponential(backoff, 1*time.Second, 1*time.Minute) } case <-db.stopc: @@ -303,76 +313,16 @@ func (db *DB) retentionCutoff() (bool, error) { db.mtx.RLock() defer db.mtx.RUnlock() - // We only consider the already persisted blocks. Head blocks generally - // only account for a fraction of the total data. - db.headmtx.RLock() - lenp := len(db.blocks) - len(db.heads) - db.headmtx.RUnlock() - - if lenp == 0 { + if len(db.blocks) == 0 { return false, nil } - last := db.blocks[lenp-1] + last := db.blocks[len(db.blocks)-1] mint := last.Meta().MaxTime - int64(db.opts.RetentionDuration) return retentionCutoff(db.dir, mint) } -// headFullness returns up to which fraction of a blocks time range samples -// were already inserted. -func headFullness(h headBlock) float64 { - m := h.Meta() - a := float64(h.HighTimestamp() - m.MinTime) - b := float64(m.MaxTime - m.MinTime) - return a / b -} - -// appendableHeads returns a copy of a slice of HeadBlocks that can still be appended to. 
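The run loop now waits on an error backoff and a one-minute self-trigger instead of a fixed 30s ticker. A condensed, self-contained sketch of that control flow; note that exponential returns the new duration, so the sketch assigns it back and resets it after a successful run:

```go
package runsketch

import "time"

func exponential(d, min, max time.Duration) time.Duration {
	d *= 2
	if d < min {
		d = min
	}
	if d > max {
		d = max
	}
	return d
}

// run drains a compaction trigger channel, retriggering itself once a minute
// and backing off after failed attempts.
func run(compactc chan struct{}, stopc <-chan struct{}, compact func() error) {
	backoff := time.Duration(0)

	for {
		select {
		case <-stopc:
			return
		case <-time.After(backoff):
		}

		select {
		case <-time.After(1 * time.Minute):
			select {
			case compactc <- struct{}{}: // self-trigger if nothing else did
			default:
			}
		case <-compactc:
			if err := compact(); err != nil {
				backoff = exponential(backoff, 1*time.Second, 1*time.Minute)
			} else {
				backoff = 0
			}
		case <-stopc:
			return
		}
	}
}
```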
-func (db *DB) appendableHeads() (r []headBlock) { - switch l := len(db.heads); l { - case 0: - case 1: - r = append(r, db.heads[0]) - default: - if headFullness(db.heads[l-1]) < 0.5 { - r = append(r, db.heads[l-2]) - } - r = append(r, db.heads[l-1]) - } - return r -} - -func (db *DB) completedHeads() (r []headBlock) { - db.mtx.RLock() - defer db.mtx.RUnlock() - - db.headmtx.RLock() - defer db.headmtx.RUnlock() - - if len(db.heads) < 2 { - return nil - } - - // Select all old heads unless they still have pending appenders. - for _, h := range db.heads[:len(db.heads)-2] { - if h.ActiveWriters() > 0 { - return r - } - r = append(r, h) - } - // Add the 2nd last head if the last head is more than 50% filled. - // Compacting it early allows us to free its memory before allocating - // more for the next block and thus reduces spikes. - h0 := db.heads[len(db.heads)-1] - h1 := db.heads[len(db.heads)-2] - - if headFullness(h0) >= 0.5 && h1.ActiveWriters() == 0 { - r = append(r, h1) - } - return r -} - func (db *DB) compact() (changes bool, err error) { db.cmtx.Lock() defer db.cmtx.Unlock() @@ -383,20 +333,32 @@ func (db *DB) compact() (changes bool, err error) { // Check whether we have pending head blocks that are ready to be persisted. // They have the highest priority. - for _, h := range db.completedHeads() { + for { select { case <-db.stopc: return changes, nil default: } + // The head has a compactable range if 1.5 level 0 ranges are between the oldest + // and newest timestamp. The 0.5 acts as a buffer of the appendable window. + if db.head.MaxTime()-db.head.MinTime() <= db.opts.BlockRanges[0]/2*3 { + break + } + mint, maxt := rangeForTimestamp(db.head.MinTime(), db.opts.BlockRanges[0]) - if err = db.compactor.Write(db.dir, h); err != nil { + // Wrap head into a range that bounds all reads to it. + head := &rangeHead{ + head: db.head, + mint: mint, + maxt: maxt, + } + if err = db.compactor.Write(db.dir, head, mint, maxt); err != nil { return changes, errors.Wrap(err, "persist head block") } changes = true - if err := os.RemoveAll(h.Dir()); err != nil { - return changes, errors.Wrap(err, "delete compacted head block") + if err := db.reloadBlocks(); err != nil { + return changes, errors.Wrap(err, "reload blocks") } runtime.GC() } @@ -427,6 +389,10 @@ func (db *DB) compact() (changes bool, err error) { return changes, errors.Wrap(err, "delete compacted block") } } + + if err := db.reloadBlocks(); err != nil { + return changes, errors.Wrap(err, "reload blocks") + } runtime.GC() } @@ -469,7 +435,7 @@ func retentionCutoff(dir string, mint int64) (bool, error) { return changes, fileutil.Fsync(df) } -func (db *DB) getBlock(id ulid.ULID) (Block, bool) { +func (db *DB) getBlock(id ulid.ULID) (DiskBlock, bool) { for _, b := range db.blocks { if b.Meta().ULID == id { return b, true @@ -490,18 +456,12 @@ func (db *DB) reloadBlocks() (err error) { var cs []io.Closer defer func() { closeAll(cs...) 
}() - db.mtx.Lock() - defer db.mtx.Unlock() - - db.headmtx.Lock() - defer db.headmtx.Unlock() - dirs, err := blockDirs(db.dir) if err != nil { return errors.Wrap(err, "find blocks") } var ( - blocks []Block + blocks []DiskBlock exist = map[ulid.ULID]struct{}{} ) @@ -513,11 +473,7 @@ func (db *DB) reloadBlocks() (err error) { b, ok := db.getBlock(meta.ULID) if !ok { - if meta.Compaction.Level == 0 { - b, err = db.openHeadBlock(dir) - } else { - b, err = newPersistedBlock(dir, db.chunkPool) - } + b, err = newPersistedBlock(dir, db.chunkPool) if err != nil { return errors.Wrapf(err, "open block %s", dir) } @@ -532,25 +488,40 @@ func (db *DB) reloadBlocks() (err error) { } // Close all opened blocks that no longer exist after we returned all locks. + // TODO(fabxc: probably races with querier still reading from them. Can + // we just abandon them and have the open FDs be GC'd automatically eventually? for _, b := range db.blocks { if _, ok := exist[b.Meta().ULID]; !ok { cs = append(cs, b) } } + db.mtx.Lock() db.blocks = blocks - db.heads = nil + db.mtx.Unlock() - for _, b := range blocks { - if b.Meta().Compaction.Level == 0 { - db.heads = append(db.heads, b.(*HeadBlock)) - } + // Garbage collect data in the head if the most recent persisted block + // covers data of its current time range. + if len(blocks) == 0 { + return + } + maxt := blocks[len(db.blocks)-1].Meta().MaxTime + if maxt <= db.head.MinTime() { + return + } + start := time.Now() + atomic.StoreInt64(&db.head.minTime, maxt) + db.head.gc() + db.logger.Log("msg", "head GC completed", "duration", time.Since(start)) + + if err := db.wal.Truncate(maxt); err != nil { + return errors.Wrapf(err, "truncate WAL at %d", maxt) } return nil } -func validateBlockSequence(bs []Block) error { +func validateBlockSequence(bs []DiskBlock) error { if len(bs) == 0 { return nil } @@ -584,10 +555,10 @@ func (db *DB) Close() error { var merr MultiError merr.Add(g.Wait()) + if db.lockf != nil { merr.Add(db.lockf.Unlock()) } - return merr.Err() } @@ -611,128 +582,348 @@ func (db *DB) EnableCompactions() { // Snapshot writes the current data to the directory. func (db *DB) Snapshot(dir string) error { - if dir == db.dir { - return errors.Errorf("cannot snapshot into base directory") - } - db.cmtx.Lock() - defer db.cmtx.Unlock() + // if dir == db.dir { + // return errors.Errorf("cannot snapshot into base directory") + // } + // db.cmtx.Lock() + // defer db.cmtx.Unlock() - db.mtx.Lock() // To block any appenders. - defer db.mtx.Unlock() + // db.mtx.Lock() // To block any appenders. + // defer db.mtx.Unlock() - blocks := db.blocks[:] - for _, b := range blocks { - db.logger.Log("msg", "snapshotting block", "block", b) - if err := b.Snapshot(dir); err != nil { - return errors.Wrap(err, "error snapshotting headblock") - } - } + // blocks := db.blocks[:] + // for _, b := range blocks { + // db.logger.Log("msg", "snapshotting block", "block", b) + // if err := b.Snapshot(dir); err != nil { + // return errors.Wrap(err, "error snapshotting headblock") + // } + // } return nil } +// Querier returns a new querier over the data partition for the given time range. +// A goroutine must not handle more than one open Querier. 
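reloadBlocks now doubles as the head truncation point: once the newest persisted block reaches into the head's range, the head's lower bound is advanced, its memory is garbage collected, and the WAL is truncated at the same timestamp. The decision in isolation, with hypothetical Head/WAL stand-ins rather than the real types:

```go
package truncsketch

// head and wal are hypothetical minimal stand-ins for the real types.
type head interface {
	MinTime() int64
	Truncate(mint int64) // drop all in-memory data before mint
}

type wal interface {
	Truncate(mint int64) error // drop WAL segments entirely before mint
}

// truncateHead advances the head's lower bound to the max time of the newest
// persisted block and cuts the WAL accordingly. It is a no-op while the
// persisted blocks do not yet reach into the head's range.
func truncateHead(h head, w wal, persistedMaxt int64) error {
	if persistedMaxt <= h.MinTime() {
		return nil
	}
	h.Truncate(persistedMaxt)
	return w.Truncate(persistedMaxt)
}
```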
+func (db *DB) Querier(mint, maxt int64) Querier { + db.mtx.RLock() + + blocks := db.blocksForInterval(mint, maxt) + + sq := &querier{ + blocks: make([]Querier, 0, len(blocks)), + db: db, + } + for _, b := range blocks { + sq.blocks = append(sq.blocks, &blockQuerier{ + mint: mint, + maxt: maxt, + index: b.Index(), + chunks: b.Chunks(), + tombstones: b.Tombstones(), + }) + } + + return sq +} + +// initAppender is a helper to initialize the time bounds of a the head +// upon the first sample it receives. +type initAppender struct { + app Appender + db *DB +} + +func (a *initAppender) Add(lset labels.Labels, t int64, v float64) (string, error) { + if a.app != nil { + return a.app.Add(lset, t, v) + } + for { + // In the init state, the head has a high timestamp of math.MinInt64. + ht := a.db.head.MaxTime() + if ht != math.MinInt64 { + break + } + cr := a.db.opts.BlockRanges[0] + mint, _ := rangeForTimestamp(t, cr) + + atomic.CompareAndSwapInt64(&a.db.head.maxTime, ht, t) + atomic.StoreInt64(&a.db.head.minTime, mint-cr) + } + a.app = a.db.appender() + + return a.app.Add(lset, t, v) +} + +func (a *initAppender) AddFast(ref string, t int64, v float64) error { + if a.app == nil { + return ErrNotFound + } + return a.app.AddFast(ref, t, v) +} + +func (a *initAppender) Commit() error { + if a.app == nil { + return nil + } + return a.app.Commit() +} + +func (a *initAppender) Rollback() error { + if a.app == nil { + return nil + } + return a.app.Rollback() +} + // Appender returns a new Appender on the database. func (db *DB) Appender() Appender { db.metrics.activeAppenders.Inc() - db.mtx.RLock() - return &dbAppender{db: db} + // The head cache might not have a starting point yet. The init appender + // picks up the first appended timestamp as the base. + if db.head.MaxTime() == math.MinInt64 { + return &initAppender{db: db} + } + return db.appender() +} + +func (db *DB) appender() *dbAppender { + db.head.mtx.RLock() + + return &dbAppender{ + db: db, + head: db.head, + wal: db.wal, + mint: db.head.MaxTime() - db.opts.BlockRanges[0]/2, + samples: db.getAppendBuffer(), + highTimestamp: math.MinInt64, + lowTimestamp: math.MaxInt64, + } +} + +func (db *DB) getAppendBuffer() []RefSample { + b := db.appendPool.Get() + if b == nil { + return make([]RefSample, 0, 512) + } + return b.([]RefSample) +} + +func (db *DB) putAppendBuffer(b []RefSample) { + db.appendPool.Put(b[:0]) } type dbAppender struct { - db *DB - heads []*metaAppender + db *DB + head *Head + wal WAL + mint int64 - samples int + newSeries []*hashedLabels + newLabels []labels.Labels + newHashes map[uint64]uint64 + + samples []RefSample + highTimestamp int64 + lowTimestamp int64 } -type metaAppender struct { - meta BlockMeta - app Appender +type hashedLabels struct { + ref uint64 + hash uint64 + labels labels.Labels } func (a *dbAppender) Add(lset labels.Labels, t int64, v float64) (string, error) { - h, err := a.appenderAt(t) - if err != nil { - return "", err + if t < a.mint { + return "", ErrOutOfBounds } - ref, err := h.app.Add(lset, t, v) - if err != nil { - return "", err - } - a.samples++ - if ref == "" { - return "", nil + hash := lset.Hash() + refb := make([]byte, 8) + + // Series exists already in the block. + if ms := a.head.get(hash, lset); ms != nil { + binary.BigEndian.PutUint64(refb, uint64(ms.ref)) + return string(refb), a.AddFast(string(refb), t, v) } - return string(append(h.meta.ULID[:], ref...)), nil + // Series was added in this transaction previously. 
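Appenders recycle their sample buffers through a sync.Pool (getAppendBuffer/putAppendBuffer above), so each transaction reuses a previously allocated slice instead of growing a fresh one. The pattern on its own, with RefSample as a stand-in for the WAL sample record:

```go
package poolsketch

import "sync"

// RefSample mirrors the WAL sample record: a series reference plus timestamp and value.
type RefSample struct {
	Ref uint64
	T   int64
	V   float64
}

var appendPool sync.Pool

func getAppendBuffer() []RefSample {
	b := appendPool.Get()
	if b == nil {
		return make([]RefSample, 0, 512)
	}
	return b.([]RefSample)
}

func putAppendBuffer(b []RefSample) {
	// Reset length but keep capacity so the next appender reuses the backing array.
	appendPool.Put(b[:0])
}
```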
+ if ref, ok := a.newHashes[hash]; ok { + binary.BigEndian.PutUint64(refb, ref) + // XXX(fabxc): there's no fast path for multiple samples for the same new series + // in the same transaction. We always return the invalid empty ref. It's has not + // been a relevant use case so far and is not worth the trouble. + return "", a.AddFast(string(refb), t, v) + } + + // The series is completely new. + if a.newSeries == nil { + a.newHashes = map[uint64]uint64{} + } + // First sample for new series. + ref := uint64(len(a.newSeries)) + + a.newSeries = append(a.newSeries, &hashedLabels{ + ref: ref, + hash: hash, + labels: lset, + }) + // First bit indicates its a series created in this transaction. + ref |= (1 << 63) + + a.newHashes[hash] = ref + binary.BigEndian.PutUint64(refb, ref) + + return "", a.AddFast(string(refb), t, v) } func (a *dbAppender) AddFast(ref string, t int64, v float64) error { - if len(ref) < 16 { + if len(ref) != 8 { return errors.Wrap(ErrNotFound, "invalid ref length") } - // The first 16 bytes a ref hold the ULID of the head block. - h, err := a.appenderAt(t) - if err != nil { - return err - } - // Validate the ref points to the same block we got for t. - if string(h.meta.ULID[:]) != ref[:16] { - return ErrNotFound - } - if err := h.app.AddFast(ref[16:], t, v); err != nil { - // The block the ref points to might fit the given timestamp. - // We mask the error to stick with our contract. - if errors.Cause(err) == ErrOutOfBounds { - err = ErrNotFound + var ( + refn = binary.BigEndian.Uint64(yoloBytes(ref)) + id = uint32(refn) + inTx = refn&(1<<63) != 0 + ) + // Distinguish between existing series and series created in + // this transaction. + if inTx { + if id > uint32(len(a.newSeries)-1) { + return errors.Wrap(ErrNotFound, "transaction series ID too high") } - return err + // TODO(fabxc): we also have to validate here that the + // sample sequence is valid. + // We also have to revalidate it as we switch locks and create + // the new series. + } else { + ms, ok := a.head.series[id] + if !ok { + return errors.Wrap(ErrNotFound, "unknown series") + } + if err := ms.appendable(t, v); err != nil { + return err + } + } + if t < a.mint { + return ErrOutOfBounds } - a.samples++ + if t > a.highTimestamp { + a.highTimestamp = t + } + // if t < a.lowTimestamp { + // a.lowTimestamp = t + // } + + a.samples = append(a.samples, RefSample{ + Ref: refn, + T: t, + V: v, + }) return nil } -// appenderFor gets the appender for the head containing timestamp t. -// If the head block doesn't exist yet, it gets created. -func (a *dbAppender) appenderAt(t int64) (*metaAppender, error) { - for _, h := range a.heads { - if intervalContains(h.meta.MinTime, h.meta.MaxTime-1, t) { - return h, nil +func (a *dbAppender) createSeries() error { + if len(a.newSeries) == 0 { + return nil + } + a.newLabels = make([]labels.Labels, 0, len(a.newSeries)) + base0 := len(a.head.series) + + a.head.mtx.RUnlock() + defer a.head.mtx.RLock() + a.head.mtx.Lock() + defer a.head.mtx.Unlock() + + base1 := len(a.head.series) + + for _, l := range a.newSeries { + // We switched locks and have to re-validate that the series were not + // created by another goroutine in the meantime. + if base1 > base0 { + if ms := a.head.get(l.hash, l.labels); ms != nil { + l.ref = uint64(ms.ref) + continue + } + } + // Series is still new. + a.newLabels = append(a.newLabels, l.labels) + + s := a.head.create(l.hash, l.labels) + l.ref = uint64(s.ref) + } + + // Write all new series to the WAL. 
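The 8-byte refs handed out by Add encode either an existing series ID or, with bit 63 set, an index into the appender's newSeries slice for series created within the current transaction. The bit juggling in isolation (helper names are hypothetical):

```go
package refsketch

import "encoding/binary"

const txnBit = uint64(1) << 63

// encodeRef turns a numeric reference into the 8-byte string form used by the
// Appender interface.
func encodeRef(ref uint64) string {
	var b [8]byte
	binary.BigEndian.PutUint64(b[:], ref)
	return string(b[:])
}

// decodeRef reverses encodeRef and reports whether the reference points at a
// series created within the current transaction.
func decodeRef(ref string) (id uint64, inTxn bool) {
	n := binary.BigEndian.Uint64([]byte(ref))
	return n &^ txnBit, n&txnBit != 0
}
```

On Commit, the provisional in-transaction refs are rewritten to the real series IDs once the new series have been created and logged to the WAL, as the hunk below shows.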
+ if err := a.wal.LogSeries(a.newLabels); err != nil { + return errors.Wrap(err, "WAL log series") + } + + return nil +} + +func (a *dbAppender) Commit() error { + defer a.head.mtx.RUnlock() + + defer a.db.metrics.activeAppenders.Dec() + defer a.db.putAppendBuffer(a.samples) + + if err := a.createSeries(); err != nil { + return err + } + + // We have to update the refs of samples for series we just created. + for i := range a.samples { + s := &a.samples[i] + if s.Ref&(1<<63) != 0 { + s.Ref = a.newSeries[(s.Ref<<1)>>1].ref } } - // Currently opened appenders do not cover t. Ensure the head block is - // created and add missing appenders. - a.db.headmtx.Lock() - if err := a.db.ensureHead(t); err != nil { - a.db.headmtx.Unlock() - return nil, err + // Write all new samples to the WAL and add them to the + // in-mem database on success. + if err := a.wal.LogSamples(a.samples); err != nil { + return errors.Wrap(err, "WAL log samples") } - var hb headBlock - for _, h := range a.db.appendableHeads() { - m := h.Meta() + total := uint64(len(a.samples)) - if intervalContains(m.MinTime, m.MaxTime-1, t) { - hb = h + for _, s := range a.samples { + series, ok := a.head.series[uint32(s.Ref)] + if !ok { + return errors.Errorf("series with ID %d not found", s.Ref) + } + if !series.append(s.T, s.V) { + total-- + } + } + + for { + ht := a.head.MaxTime() + if a.highTimestamp <= ht { + break + } + if a.highTimestamp-a.head.MinTime() > a.head.chunkRange/2*3 { + select { + case a.db.compactc <- struct{}{}: + default: + } + } + if atomic.CompareAndSwapInt64(&a.head.maxTime, ht, a.highTimestamp) { break } } - a.db.headmtx.Unlock() - if hb == nil { - return nil, ErrOutOfBounds - } - // Instantiate appender after returning headmtx! - app := &metaAppender{ - meta: hb.Meta(), - app: hb.Appender(), - } - a.heads = append(a.heads, app) + return nil +} - return app, nil +func (a *dbAppender) Rollback() error { + a.head.mtx.RUnlock() + + a.db.metrics.activeAppenders.Dec() + a.db.putAppendBuffer(a.samples) + + return nil } func rangeForTimestamp(t int64, width int64) (mint, maxt int64) { @@ -740,87 +931,7 @@ func rangeForTimestamp(t int64, width int64) (mint, maxt int64) { return mint, mint + width } -// ensureHead makes sure that there is a head block for the timestamp t if -// it is within or after the currently appendable window. -func (db *DB) ensureHead(t int64) error { - var ( - mint, maxt = rangeForTimestamp(t, int64(db.opts.BlockRanges[0])) - addBuffer = len(db.blocks) == 0 - last BlockMeta - ) - - if !addBuffer { - last = db.blocks[len(db.blocks)-1].Meta() - addBuffer = last.MaxTime <= mint-int64(db.opts.BlockRanges[0]) - } - // Create another block of buffer in front if the DB is initialized or retrieving - // new data after a long gap. - // This ensures we always have a full block width of append window. - if addBuffer { - if _, err := db.createHeadBlock(mint-int64(db.opts.BlockRanges[0]), mint); err != nil { - return err - } - // If the previous block reaches into our new window, make it smaller. - } else if mt := last.MaxTime; mt > mint { - mint = mt - } - if mint >= maxt { - return nil - } - // Error if the requested time for a head is before the appendable window. 
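Since many appenders commit against the single head concurrently, its maxTime watermark is advanced with a compare-and-swap loop rather than under a lock. The idiom on its own:

```go
package cassketch

import "sync/atomic"

// advanceMaxTime lifts *maxTime to t if t is larger, tolerating concurrent
// writers. It returns true if this call moved the watermark.
func advanceMaxTime(maxTime *int64, t int64) bool {
	for {
		cur := atomic.LoadInt64(maxTime)
		if t <= cur {
			return false
		}
		if atomic.CompareAndSwapInt64(maxTime, cur, t) {
			return true
		}
	}
}
```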
- if len(db.heads) > 0 && t < db.heads[0].Meta().MinTime { - return ErrOutOfBounds - } - - _, err := db.createHeadBlock(mint, maxt) - return err -} - -func (a *dbAppender) Commit() error { - defer a.db.metrics.activeAppenders.Dec() - defer a.db.mtx.RUnlock() - - // Commits to partial appenders must be concurrent as concurrent appenders - // may have conflicting locks on head appenders. - // For high-throughput use cases the errgroup causes significant blocking. Typically, - // we just deal with a single appender and special case it. - var err error - - switch len(a.heads) { - case 1: - err = a.heads[0].app.Commit() - default: - var g errgroup.Group - for _, h := range a.heads { - g.Go(h.app.Commit) - } - err = g.Wait() - } - - if err != nil { - return err - } - // XXX(fabxc): Push the metric down into head block to account properly - // for partial appends? - a.db.metrics.samplesAppended.Add(float64(a.samples)) - - return nil -} - -func (a *dbAppender) Rollback() error { - defer a.db.metrics.activeAppenders.Dec() - defer a.db.mtx.RUnlock() - - var g errgroup.Group - - for _, h := range a.heads { - g.Go(h.app.Rollback) - } - - return g.Wait() -} - -// Delete implements deletion of metrics. +// Delete implements deletion of metrics. It only has atomicity guarantees on a per-block basis. func (db *DB) Delete(mint, maxt int64, ms ...labels.Matcher) error { db.cmtx.Lock() defer db.cmtx.Unlock() @@ -828,14 +939,50 @@ func (db *DB) Delete(mint, maxt int64, ms ...labels.Matcher) error { db.mtx.Lock() defer db.mtx.Unlock() - blocks := db.blocksForInterval(mint, maxt) - var g errgroup.Group - for _, b := range blocks { - g.Go(func(b Block) func() error { - return func() error { return b.Delete(mint, maxt, ms...) } - }(b)) + for _, b := range db.blocks { + m := b.Meta() + if intervalOverlap(mint, maxt, m.MinTime, m.MaxTime) { + g.Go(func(b DiskBlock) func() error { + return func() error { return b.Delete(mint, maxt, ms...) } + }(b)) + } + } + if err := g.Wait(); err != nil { + return err + } + + ir := db.head.Index() + + pr := newPostingsReader(ir) + p, absent := pr.Select(ms...) + + var stones []Stone + +Outer: + for p.Next() { + series := db.head.series[p.At()] + + for _, abs := range absent { + if series.lset.Get(abs) != "" { + continue Outer + } + } + + // Delete only until the current values and not beyond. + t0, t1 := clampInterval(mint, maxt, series.minTime(), series.maxTime()) + stones = append(stones, Stone{p.At(), Intervals{{t0, t1}}}) + } + + if p.Err() != nil { + return p.Err() + } + if err := db.wal.LogDeletes(stones); err != nil { + return err + } + for _, s := range stones { + db.head.tombstones.add(s.ref, s.intervals[0]) } if err := g.Wait(); err != nil { @@ -856,8 +1003,8 @@ func intervalContains(min, max, t int64) bool { // blocksForInterval returns all blocks within the partition that may contain // data for the given time range. -func (db *DB) blocksForInterval(mint, maxt int64) []Block { - var bs []Block +func (db *DB) blocksForInterval(mint, maxt int64) []BlockReader { + var bs []BlockReader for _, b := range db.blocks { m := b.Meta() @@ -865,52 +1012,13 @@ func (db *DB) blocksForInterval(mint, maxt int64) []Block { bs = append(bs, b) } } + if maxt >= db.head.MinTime() { + bs = append(bs, db.head) + } return bs } -// openHeadBlock opens the head block at dir. 
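Delete now fans out over every persisted block whose range overlaps the request and handles the head separately via tombstones logged to the WAL. The errgroup fan-out, including the closure that pins the loop variable (loop variables were reused across iterations in Go at the time), looks like this in isolation:

```go
package delsketch

import "golang.org/x/sync/errgroup"

// block is a hypothetical stand-in for the overlapping DiskBlocks.
type block interface {
	Delete(mint, maxt int64) error
}

func deleteAll(blocks []block, mint, maxt int64) error {
	var g errgroup.Group
	for _, b := range blocks {
		// The extra closure binds b per iteration before the goroutine starts.
		g.Go(func(b block) func() error {
			return func() error { return b.Delete(mint, maxt) }
		}(b))
	}
	return g.Wait()
}
```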
-func (db *DB) openHeadBlock(dir string) (*HeadBlock, error) { - var ( - wdir = walDir(dir) - l = log.With(db.logger, "wal", wdir) - ) - wal, err := OpenSegmentWAL(wdir, l, 5*time.Second) - if err != nil { - return nil, errors.Wrapf(err, "open WAL %s", dir) - } - - h, err := OpenHeadBlock(dir, log.With(db.logger, "block", dir), wal, db.compactor) - if err != nil { - return nil, errors.Wrapf(err, "open head block %s", dir) - } - return h, nil -} - -// createHeadBlock starts a new head block to append to. -func (db *DB) createHeadBlock(mint, maxt int64) (headBlock, error) { - dir, err := TouchHeadBlock(db.dir, mint, maxt) - if err != nil { - return nil, errors.Wrapf(err, "touch head block %s", dir) - } - newHead, err := db.openHeadBlock(dir) - if err != nil { - return nil, err - } - - db.logger.Log("msg", "created head block", "ulid", newHead.meta.ULID, "mint", mint, "maxt", maxt) - - db.blocks = append(db.blocks, newHead) // TODO(fabxc): this is a race! - db.heads = append(db.heads, newHead) - - select { - case db.compactc <- struct{}{}: - default: - } - - return newHead, nil -} - func isBlockDir(fi os.FileInfo) bool { if !fi.IsDir() { return false @@ -1032,3 +1140,14 @@ func closeAll(cs ...io.Closer) error { } return merr.Err() } + +func exponential(d, min, max time.Duration) time.Duration { + d *= 2 + if d < min { + d = min + } + if d > max { + d = max + } + return d +} diff --git a/db_test.go b/db_test.go index d1cc56c42..8a5d946eb 100644 --- a/db_test.go +++ b/db_test.go @@ -15,8 +15,10 @@ package tsdb import ( "io/ioutil" + "math" "math/rand" "os" + "sort" "testing" "github.com/pkg/errors" @@ -24,8 +26,20 @@ import ( "github.com/stretchr/testify/require" ) +func openTestDB(t testing.TB, opts *Options) (db *DB, close func()) { + tmpdir, _ := ioutil.TempDir("", "test") + + db, err := Open(tmpdir, nil, nil, opts) + require.NoError(t, err) + + // Do not close the test database by default as it will deadlock on test failures. + return db, func() { + os.RemoveAll(tmpdir) + } +} + // Convert a SeriesSet into a form useable with reflect.DeepEqual. 
-func readSeriesSet(ss SeriesSet) (map[string][]sample, error) { +func readSeriesSet(t testing.TB, ss SeriesSet) map[string][]sample { result := map[string][]sample{} for ss.Next() { @@ -37,31 +51,28 @@ func readSeriesSet(ss SeriesSet) (map[string][]sample, error) { t, v := it.At() samples = append(samples, sample{t: t, v: v}) } + require.NoError(t, it.Err()) name := series.Labels().String() result[name] = samples - if err := ss.Err(); err != nil { - return nil, err - } } - return result, nil + require.NoError(t, ss.Err()) + + return result } func TestDataAvailableOnlyAfterCommit(t *testing.T) { - tmpdir, _ := ioutil.TempDir("", "test") - defer os.RemoveAll(tmpdir) - - db, err := Open(tmpdir, nil, nil, nil) - require.NoError(t, err) - defer db.Close() + db, close := openTestDB(t, nil) + defer close() app := db.Appender() - _, err = app.Add(labels.FromStrings("foo", "bar"), 0, 0) + + _, err := app.Add(labels.FromStrings("foo", "bar"), 0, 0) require.NoError(t, err) querier := db.Querier(0, 1) - seriesSet, err := readSeriesSet(querier.Select(labels.NewEqualMatcher("foo", "bar"))) - require.NoError(t, err) + seriesSet := readSeriesSet(t, querier.Select(labels.NewEqualMatcher("foo", "bar"))) + require.Equal(t, seriesSet, map[string][]sample{}) require.NoError(t, querier.Close()) @@ -71,23 +82,17 @@ func TestDataAvailableOnlyAfterCommit(t *testing.T) { querier = db.Querier(0, 1) defer querier.Close() - seriesSet, err = readSeriesSet(querier.Select(labels.NewEqualMatcher("foo", "bar"))) - require.NoError(t, err) + seriesSet = readSeriesSet(t, querier.Select(labels.NewEqualMatcher("foo", "bar"))) + require.Equal(t, seriesSet, map[string][]sample{`{foo="bar"}`: []sample{{t: 0, v: 0}}}) } func TestDataNotAvailableAfterRollback(t *testing.T) { - tmpdir, _ := ioutil.TempDir("", "test") - defer os.RemoveAll(tmpdir) - - db, err := Open(tmpdir, nil, nil, nil) - if err != nil { - t.Fatalf("Error opening database: %q", err) - } - defer db.Close() + db, close := openTestDB(t, nil) + defer close() app := db.Appender() - _, err = app.Add(labels.FromStrings("foo", "bar"), 0, 0) + _, err := app.Add(labels.FromStrings("foo", "bar"), 0, 0) require.NoError(t, err) err = app.Rollback() @@ -96,22 +101,18 @@ func TestDataNotAvailableAfterRollback(t *testing.T) { querier := db.Querier(0, 1) defer querier.Close() - seriesSet, err := readSeriesSet(querier.Select(labels.NewEqualMatcher("foo", "bar"))) - require.NoError(t, err) + seriesSet := readSeriesSet(t, querier.Select(labels.NewEqualMatcher("foo", "bar"))) + require.Equal(t, seriesSet, map[string][]sample{}) } func TestDBAppenderAddRef(t *testing.T) { - tmpdir, _ := ioutil.TempDir("", "test") - defer os.RemoveAll(tmpdir) - - db, err := Open(tmpdir, nil, nil, nil) - require.NoError(t, err) - defer db.Close() + db, close := openTestDB(t, nil) + defer close() app1 := db.Appender() - ref, err := app1.Add(labels.FromStrings("a", "b"), 0, 0) + ref, err := app1.Add(labels.FromStrings("a", "b"), 123, 0) require.NoError(t, err) // When a series is first created, refs don't work within that transaction. @@ -122,35 +123,40 @@ func TestDBAppenderAddRef(t *testing.T) { require.NoError(t, err) app2 := db.Appender() - defer app2.Rollback() - - ref, err = app2.Add(labels.FromStrings("a", "b"), 1, 1) + ref, err = app2.Add(labels.FromStrings("a", "b"), 133, 1) require.NoError(t, err) - // Ref must be prefixed with block ULID of the block we wrote to. 
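The reworked TestDBAppenderAddRef also documents the user-visible contract: the ref returned for a brand-new series is not usable within the same transaction, but once the series exists a later Add returns an 8-byte ref that feeds AddFast. Roughly, against the public API of this repository at the time (error handling elided):

```go
package main

import (
	"log"

	"github.com/prometheus/tsdb"
	"github.com/prometheus/tsdb/labels"
)

func main() {
	db, err := tsdb.Open("data", nil, nil, nil)
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	app := db.Appender()
	// First sample of a brand-new series: the returned ref is empty and not
	// usable until the transaction commits.
	_, _ = app.Add(labels.FromStrings("a", "b"), 123, 0)
	_ = app.Commit()

	app = db.Appender()
	ref, _ := app.Add(labels.FromStrings("a", "b"), 133, 1)
	// The series exists now, so the 8-byte ref feeds the fast path.
	_ = app.AddFast(ref, 143, 2)
	_ = app.Commit()
}
```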
- id := db.blocks[len(db.blocks)-1].Meta().ULID - require.Equal(t, string(id[:]), ref[:16]) - // Reference must be valid to add another sample. - err = app2.AddFast(ref, 2, 2) + err = app2.AddFast(ref, 143, 2) require.NoError(t, err) // AddFast for the same timestamp must fail if the generation in the reference // doesn't add up. - refb := []byte(ref) - refb[15] ^= refb[15] - err = app2.AddFast(string(refb), 1, 1) + err = app2.AddFast("abc_invalid_xyz", 1, 1) require.EqualError(t, errors.Cause(err), ErrNotFound.Error()) + + require.NoError(t, app2.Commit()) + + q := db.Querier(0, 200) + res := readSeriesSet(t, q.Select(labels.NewEqualMatcher("a", "b"))) + + require.Equal(t, map[string][]sample{ + labels.FromStrings("a", "b").String(): []sample{ + {t: 123, v: 0}, + {t: 133, v: 1}, + {t: 143, v: 2}, + }, + }, res) + + require.NoError(t, q.Close()) } func TestDeleteSimple(t *testing.T) { numSamples := int64(10) - tmpdir, _ := ioutil.TempDir("", "test") - defer os.RemoveAll(tmpdir) + db, close := openTestDB(t, nil) + defer close() - db, err := Open(tmpdir, nil, nil, nil) - require.NoError(t, err) app := db.Appender() smpls := make([]float64, numSamples) @@ -216,3 +222,246 @@ Outer: } } } + +func TestAmendDatapointCausesError(t *testing.T) { + db, close := openTestDB(t, nil) + defer close() + + app := db.Appender() + _, err := app.Add(labels.Labels{}, 0, 0) + require.NoError(t, err, "Failed to add sample") + require.NoError(t, app.Commit(), "Unexpected error committing appender") + + app = db.Appender() + _, err = app.Add(labels.Labels{}, 0, 1) + require.Equal(t, ErrAmendSample, err) + require.NoError(t, app.Rollback(), "Unexpected error rolling back appender") +} + +func TestDuplicateNaNDatapointNoAmendError(t *testing.T) { + db, close := openTestDB(t, nil) + defer close() + + app := db.Appender() + _, err := app.Add(labels.Labels{}, 0, math.NaN()) + require.NoError(t, err, "Failed to add sample") + require.NoError(t, app.Commit(), "Unexpected error committing appender") + + app = db.Appender() + _, err = app.Add(labels.Labels{}, 0, math.NaN()) + require.NoError(t, err) +} + +func TestNonDuplicateNaNDatapointsCausesAmendError(t *testing.T) { + db, close := openTestDB(t, nil) + defer close() + + app := db.Appender() + _, err := app.Add(labels.Labels{}, 0, math.Float64frombits(0x7ff0000000000001)) + require.NoError(t, err, "Failed to add sample") + require.NoError(t, app.Commit(), "Unexpected error committing appender") + + app = db.Appender() + _, err = app.Add(labels.Labels{}, 0, math.Float64frombits(0x7ff0000000000002)) + require.Equal(t, ErrAmendSample, err) +} + +func TestSkippingInvalidValuesInSameTxn(t *testing.T) { + db, close := openTestDB(t, nil) + defer close() + + // Append AmendedValue. + app := db.Appender() + _, err := app.Add(labels.Labels{{"a", "b"}}, 0, 1) + require.NoError(t, err) + _, err = app.Add(labels.Labels{{"a", "b"}}, 0, 2) + require.NoError(t, err) + require.NoError(t, app.Commit()) + + // Make sure the right value is stored. + q := db.Querier(0, 10) + ss := q.Select(labels.NewEqualMatcher("a", "b")) + ssMap := readSeriesSet(t, ss) + + require.Equal(t, map[string][]sample{ + labels.New(labels.Label{"a", "b"}).String(): []sample{{0, 1}}, + }, ssMap) + + require.NoError(t, q.Close()) + + // Append Out of Order Value. 
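The NaN tests above pin down a subtle rule carried over from the head appender: re-appending at the same timestamp is only an amend error when the value differs at the bit level, so the canonical NaN can be resent (as happens in federation) while a NaN with a different payload still returns ErrAmendSample. The comparison reduces to:

```go
package nansketch

import "math"

// sameValue treats two float64s as identical when their bit patterns match,
// which makes NaN equal to NaN for the canonical payload while still
// distinguishing NaNs with different payloads, e.g. 0x7ff0000000000001 vs
// 0x7ff0000000000002 from the tests above.
func sameValue(a, b float64) bool {
	return math.Float64bits(a) == math.Float64bits(b)
}
```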
+ app = db.Appender() + _, err = app.Add(labels.Labels{{"a", "b"}}, 10, 3) + require.NoError(t, err) + _, err = app.Add(labels.Labels{{"a", "b"}}, 7, 5) + require.NoError(t, err) + require.NoError(t, app.Commit()) + + q = db.Querier(0, 10) + ss = q.Select(labels.NewEqualMatcher("a", "b")) + ssMap = readSeriesSet(t, ss) + + require.Equal(t, map[string][]sample{ + labels.New(labels.Label{"a", "b"}).String(): []sample{{0, 1}, {10, 3}}, + }, ssMap) + require.NoError(t, q.Close()) +} + +func TestDB_e2e(t *testing.T) { + const ( + numDatapoints = 1000 + numRanges = 1000 + timeInterval = int64(3) + maxTime = int64(2 * 1000) + minTime = int64(200) + ) + // Create 8 series with 1000 data-points of different ranges and run queries. + lbls := [][]labels.Label{ + { + {"a", "b"}, + {"instance", "localhost:9090"}, + {"job", "prometheus"}, + }, + { + {"a", "b"}, + {"instance", "127.0.0.1:9090"}, + {"job", "prometheus"}, + }, + { + {"a", "b"}, + {"instance", "127.0.0.1:9090"}, + {"job", "prom-k8s"}, + }, + { + {"a", "b"}, + {"instance", "localhost:9090"}, + {"job", "prom-k8s"}, + }, + { + {"a", "c"}, + {"instance", "localhost:9090"}, + {"job", "prometheus"}, + }, + { + {"a", "c"}, + {"instance", "127.0.0.1:9090"}, + {"job", "prometheus"}, + }, + { + {"a", "c"}, + {"instance", "127.0.0.1:9090"}, + {"job", "prom-k8s"}, + }, + { + {"a", "c"}, + {"instance", "localhost:9090"}, + {"job", "prom-k8s"}, + }, + } + + seriesMap := map[string][]sample{} + for _, l := range lbls { + seriesMap[labels.New(l...).String()] = []sample{} + } + + db, close := openTestDB(t, nil) + defer close() + + app := db.Appender() + + for _, l := range lbls { + lset := labels.New(l...) + series := []sample{} + + ts := rand.Int63n(300) + for i := 0; i < numDatapoints; i++ { + v := rand.Float64() + + series = append(series, sample{ts, v}) + + _, err := app.Add(lset, ts, v) + require.NoError(t, err) + + ts += rand.Int63n(timeInterval) + 1 + } + + seriesMap[lset.String()] = series + } + + require.NoError(t, app.Commit()) + + // Query each selector on 1000 random time-ranges. + queries := []struct { + ms []labels.Matcher + }{ + { + ms: []labels.Matcher{labels.NewEqualMatcher("a", "b")}, + }, + { + ms: []labels.Matcher{ + labels.NewEqualMatcher("a", "b"), + labels.NewEqualMatcher("job", "prom-k8s"), + }, + }, + { + ms: []labels.Matcher{ + labels.NewEqualMatcher("a", "c"), + labels.NewEqualMatcher("instance", "localhost:9090"), + labels.NewEqualMatcher("job", "prometheus"), + }, + }, + // TODO: Add Regexp Matchers. + } + + for _, qry := range queries { + matched := labels.Slice{} + for _, ls := range lbls { + s := labels.Selector(qry.ms) + if s.Matches(ls) { + matched = append(matched, ls) + } + } + + sort.Sort(matched) + + for i := 0; i < numRanges; i++ { + mint := rand.Int63n(300) + maxt := mint + rand.Int63n(timeInterval*int64(numDatapoints)) + + t.Logf("run query %s, [%d, %d]", qry.ms, mint, maxt) + + expected := map[string][]sample{} + + // Build the mockSeriesSet. + for _, m := range matched { + smpls := boundedSamples(seriesMap[m.String()], mint, maxt) + if len(smpls) > 0 { + expected[m.String()] = smpls + } + } + + q := db.Querier(mint, maxt) + ss := q.Select(qry.ms...) 
+ + result := map[string][]sample{} + + for ss.Next() { + x := ss.At() + + smpls, err := expandSeriesIterator(x.Iterator()) + require.NoError(t, err) + + if len(smpls) > 0 { + result[x.Labels().String()] = smpls + } + } + + require.NoError(t, ss.Err()) + require.Equal(t, expected, result) + + q.Close() + } + } + + return +} diff --git a/head.go b/head.go index af364d117..7174f03d7 100644 --- a/head.go +++ b/head.go @@ -16,18 +16,11 @@ package tsdb import ( "fmt" "math" - "math/rand" - "os" - "path/filepath" "sort" "sync" "sync/atomic" - "time" - - "encoding/binary" "github.com/go-kit/kit/log" - "github.com/oklog/ulid" "github.com/pkg/errors" "github.com/prometheus/tsdb/chunks" "github.com/prometheus/tsdb/labels" @@ -50,20 +43,17 @@ var ( ErrOutOfBounds = errors.New("out of bounds") ) -// HeadBlock handles reads and writes of time series data within a time window. -type HeadBlock struct { - mtx sync.RWMutex - dir string - wal WAL - compactor Compactor +// Head handles reads and writes of time series data within a time window. +type Head struct { + chunkRange int64 + mtx sync.RWMutex - activeWriters uint64 - highTimestamp int64 - closed bool + minTime, maxTime int64 + lastSeriesID uint32 // descs holds all chunk descs for the head block. Each chunk implicitly // is assigned the index as its ID. - series []*memSeries + series map[uint32]*memSeries // hashes contains a collision map of label set hashes of chunks // to their chunk descs. hashes map[uint64][]*memSeries @@ -73,70 +63,37 @@ type HeadBlock struct { postings *memPostings // postings lists for terms tombstones tombstoneReader - - meta BlockMeta } -// TouchHeadBlock atomically touches a new head block in dir for -// samples in the range [mint,maxt). -func TouchHeadBlock(dir string, mint, maxt int64) (string, error) { - entropy := rand.New(rand.NewSource(time.Now().UnixNano())) - - ulid, err := ulid.New(ulid.Now(), entropy) - if err != nil { - return "", err - } - - // Make head block creation appear atomic. - dir = filepath.Join(dir, ulid.String()) - tmp := dir + ".tmp" - - if err := os.MkdirAll(tmp, 0777); err != nil { - return "", err - } - - if err := writeMetaFile(tmp, &BlockMeta{ - ULID: ulid, - MinTime: mint, - MaxTime: maxt, - }); err != nil { - return "", err - } - - return dir, renameFile(tmp, dir) -} - -// OpenHeadBlock opens the head block in dir. -func OpenHeadBlock(dir string, l log.Logger, wal WAL, c Compactor) (*HeadBlock, error) { - meta, err := readMetaFile(dir) - if err != nil { - return nil, err - } - - h := &HeadBlock{ - dir: dir, - wal: wal, - compactor: c, - series: []*memSeries{nil}, // 0 is not a valid posting, filled with nil. +// NewHead opens the head block in dir. 
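The head addresses series in two ways: a dense uint32 ID in the series map and a label-set hash in the hashes map, where each bucket may hold several series because different label sets can collide on one hash. A lookup by labels therefore compares full label sets within the bucket; a toy sketch of what the get helper referenced in the appender presumably does:

```go
package hashsketch

// labelSet is a toy stand-in for labels.Labels.
type labelSet []struct{ Name, Value string }

func (a labelSet) equal(b labelSet) bool {
	if len(a) != len(b) {
		return false
	}
	for i := range a {
		if a[i] != b[i] {
			return false
		}
	}
	return true
}

type memSeries struct {
	ref  uint32
	lset labelSet
}

// get resolves a series by hash plus full label comparison, since different
// label sets can collide on the same hash value.
func get(hashes map[uint64][]*memSeries, hash uint64, lset labelSet) *memSeries {
	for _, s := range hashes[hash] {
		if s.lset.equal(lset) {
			return s
		}
	}
	return nil
}
```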
+func NewHead(l log.Logger, wal WALReader, chunkRange int64) (*Head, error) { + h := &Head{ + chunkRange: chunkRange, + minTime: math.MaxInt64, + maxTime: math.MinInt64, + series: map[uint32]*memSeries{}, hashes: map[uint64][]*memSeries{}, values: map[string]stringset{}, symbols: map[string]struct{}{}, postings: &memPostings{m: make(map[term][]uint32)}, - meta: *meta, tombstones: newEmptyTombstoneReader(), } - return h, h.init() + if wal == nil { + wal = NopWAL{} + } + return h, h.init(wal) } -func (h *HeadBlock) init() error { - r := h.wal.Reader() +func (h *Head) String() string { + return "" +} + +func (h *Head) init(r WALReader) error { seriesFunc := func(series []labels.Labels) error { for _, lset := range series { h.create(lset.Hash(), lset) - h.meta.Stats.NumSeries++ } - return nil } samplesFunc := func(samples []RefSample) error { @@ -145,12 +102,7 @@ func (h *HeadBlock) init() error { return errors.Errorf("unknown series reference %d (max %d); abort WAL restore", s.Ref, len(h.series)) } - h.series[s.Ref].append(s.T, s.V) - - if !h.inBounds(s.T) { - return errors.Wrap(ErrOutOfBounds, "consume WAL") - } - h.meta.Stats.NumSamples++ + h.series[uint32(s.Ref)].append(s.T, s.V) } return nil @@ -172,444 +124,234 @@ func (h *HeadBlock) init() error { return nil } -// inBounds returns true if the given timestamp is within the valid -// time bounds of the block. -func (h *HeadBlock) inBounds(t int64) bool { - return t >= h.meta.MinTime && t <= h.meta.MaxTime -} +// gc removes data before the minimum timestmap from the head. +func (h *Head) gc() { + // Only data strictly lower than this timestamp must be deleted. + mint := h.MinTime() -func (h *HeadBlock) String() string { - return h.meta.ULID.String() -} + deletedHashes := map[uint64][]uint32{} + + h.mtx.RLock() + + for hash, ss := range h.hashes { + for _, s := range ss { + s.mtx.Lock() + s.truncateChunksBefore(mint) + + if len(s.chunks) == 0 { + deletedHashes[hash] = append(deletedHashes[hash], s.ref) + } + + s.mtx.Unlock() + } + } + + deletedIDs := make(map[uint32]struct{}, len(deletedHashes)) + + h.mtx.RUnlock() -// Close syncs all data and closes underlying resources of the head block. -func (h *HeadBlock) Close() error { h.mtx.Lock() defer h.mtx.Unlock() - if err := h.wal.Close(); err != nil { - return errors.Wrapf(err, "close WAL for head %s", h.dir) - } - // Check whether the head block still exists in the underlying dir - // or has already been replaced with a compacted version or removed. - meta, err := readMetaFile(h.dir) - if os.IsNotExist(err) { - return nil - } - if err != nil { - return err - } - if meta.ULID == h.meta.ULID { - return writeMetaFile(h.dir, &h.meta) + for hash, ids := range deletedHashes { + + inIDs := func(id uint32) bool { + for _, o := range ids { + if o == id { + return true + } + } + return false + } + var rem []*memSeries + + for _, s := range h.hashes[hash] { + if !inIDs(s.ref) { + rem = append(rem, s) + continue + } + deletedIDs[s.ref] = struct{}{} + // We switched locks and the series might have received new samples by now, + // check again. 
+ s.mtx.Lock() + chkCount := len(s.chunks) + s.mtx.Unlock() + + if chkCount > 0 { + continue + } + delete(h.series, s.ref) + } + if len(rem) > 0 { + h.hashes[hash] = rem + } else { + delete(h.hashes, hash) + } } - h.closed = true - return nil + for t, p := range h.postings.m { + repl := make([]uint32, 0, len(p)) + + for _, id := range p { + if _, ok := deletedIDs[id]; !ok { + repl = append(repl, id) + } + } + + if len(repl) == 0 { + delete(h.postings.m, t) + } else { + h.postings.m[t] = repl + } + } + + symbols := make(map[string]struct{}, len(h.symbols)) + values := make(map[string]stringset, len(h.values)) + + for t := range h.postings.m { + symbols[t.name] = struct{}{} + symbols[t.value] = struct{}{} + + ss, ok := values[t.name] + if !ok { + ss = stringset{} + values[t.name] = ss + } + ss.set(t.value) + } + + h.symbols = symbols + h.values = values } -// Meta returns a BlockMeta for the head block. -func (h *HeadBlock) Meta() BlockMeta { - m := BlockMeta{ - ULID: h.meta.ULID, - MinTime: h.meta.MinTime, - MaxTime: h.meta.MaxTime, - Compaction: h.meta.Compaction, - } - - m.Stats.NumChunks = atomic.LoadUint64(&h.meta.Stats.NumChunks) - m.Stats.NumSeries = atomic.LoadUint64(&h.meta.Stats.NumSeries) - m.Stats.NumSamples = atomic.LoadUint64(&h.meta.Stats.NumSamples) - - return m -} - -// Tombstones returns the TombstoneReader against the block. -func (h *HeadBlock) Tombstones() TombstoneReader { +func (h *Head) Tombstones() TombstoneReader { return h.tombstones } -// Delete implements headBlock. -func (h *HeadBlock) Delete(mint int64, maxt int64, ms ...labels.Matcher) error { - ir := h.Index() - - pr := newPostingsReader(ir) - p, absent := pr.Select(ms...) - - var stones []Stone - -Outer: - for p.Next() { - ref := p.At() - lset := h.series[ref].lset - for _, abs := range absent { - if lset.Get(abs) != "" { - continue Outer - } - } - - // Delete only until the current values and not beyond. - tmin, tmax := clampInterval(mint, maxt, h.series[ref].chunks[0].minTime, h.series[ref].head().maxTime) - stones = append(stones, Stone{ref, Intervals{{tmin, tmax}}}) - } - - if p.Err() != nil { - return p.Err() - } - if err := h.wal.LogDeletes(stones); err != nil { - return err - } - - for _, s := range stones { - h.tombstones.add(s.ref, s.intervals[0]) - } - - h.meta.Stats.NumTombstones = uint64(len(h.tombstones)) - return nil -} - -// Snapshot persists the current state of the headblock to the given directory. -// Callers must ensure that there are no active appenders against the block. -// DB does this by acquiring its own write lock. -func (h *HeadBlock) Snapshot(snapshotDir string) error { - if h.meta.Stats.NumSeries == 0 { - return nil - } - - return h.compactor.Write(snapshotDir, h) -} - -// Dir returns the directory of the block. -func (h *HeadBlock) Dir() string { return h.dir } - // Index returns an IndexReader against the block. -func (h *HeadBlock) Index() IndexReader { - h.mtx.RLock() - defer h.mtx.RUnlock() +func (h *Head) Index() IndexReader { + return h.indexRange(math.MinInt64, math.MaxInt64) +} - return &headIndexReader{HeadBlock: h, maxSeries: uint32(len(h.series) - 1)} +func (h *Head) indexRange(mint, maxt int64) *headIndexReader { + if hmin := h.MinTime(); hmin > mint { + mint = hmin + } + return &headIndexReader{head: h, mint: mint, maxt: maxt} } // Chunks returns a ChunkReader against the block. -func (h *HeadBlock) Chunks() ChunkReader { return &headChunkReader{h} } - -// Querier returns a new Querier against the block for the range [mint, maxt]. 
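Reads against the head are bounded: indexRange and chunksRange clamp the requested window to the head's lower bound, and chunks outside the window are hidden from the reader. A small self-contained sketch of that filtering; the overlap test is assumed to be the closed-interval check used by intervalOverlap elsewhere in the package:

```go
package rangesketch

// overlaps is a closed-interval intersection test.
func overlaps(amin, amax, bmin, bmax int64) bool {
	return amin <= bmax && bmin <= amax
}

type chunkMeta struct{ minTime, maxTime int64 }

// visibleChunks mirrors what the range-bounded head readers do: clamp the
// window to the head's lower bound and drop chunks outside of it.
func visibleChunks(all []chunkMeta, mint, maxt, headMin int64) []chunkMeta {
	if headMin > mint {
		mint = headMin
	}
	var out []chunkMeta
	for _, c := range all {
		if overlaps(c.minTime, c.maxTime, mint, maxt) {
			out = append(out, c)
		}
	}
	return out
}
```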
-func (h *HeadBlock) Querier(mint, maxt int64) Querier { - h.mtx.RLock() - if h.closed { - panic(fmt.Sprintf("block %s already closed", h.dir)) - } - h.mtx.RUnlock() - - return &blockQuerier{ - mint: mint, - maxt: maxt, - index: h.Index(), - chunks: h.Chunks(), - tombstones: h.Tombstones(), - } +func (h *Head) Chunks() ChunkReader { + return h.chunksRange(math.MinInt64, math.MaxInt64) } -// Appender returns a new Appender against the head block. -func (h *HeadBlock) Appender() Appender { - atomic.AddUint64(&h.activeWriters, 1) - - h.mtx.RLock() - - if h.closed { - panic(fmt.Sprintf("block %s already closed", h.dir)) +func (h *Head) chunksRange(mint, maxt int64) *headChunkReader { + if hmin := h.MinTime(); hmin > mint { + mint = hmin } - return &headAppender{HeadBlock: h, samples: getHeadAppendBuffer()} + return &headChunkReader{head: h, mint: mint, maxt: maxt} } -// ActiveWriters returns true if the block has open write transactions. -func (h *HeadBlock) ActiveWriters() int { - return int(atomic.LoadUint64(&h.activeWriters)) +// MinTime returns the lowest time bound on visible data in the head. +func (h *Head) MinTime() int64 { + return atomic.LoadInt64(&h.minTime) } -// HighTimestamp returns the highest inserted sample timestamp. -func (h *HeadBlock) HighTimestamp() int64 { - return atomic.LoadInt64(&h.highTimestamp) -} - -var headPool = sync.Pool{} - -func getHeadAppendBuffer() []RefSample { - b := headPool.Get() - if b == nil { - return make([]RefSample, 0, 512) - } - return b.([]RefSample) -} - -func putHeadAppendBuffer(b []RefSample) { - headPool.Put(b[:0]) -} - -type headAppender struct { - *HeadBlock - - newSeries []*hashedLabels - newLabels []labels.Labels - newHashes map[uint64]uint64 - - samples []RefSample - highTimestamp int64 -} - -type hashedLabels struct { - ref uint64 - hash uint64 - labels labels.Labels -} - -func (a *headAppender) Add(lset labels.Labels, t int64, v float64) (string, error) { - if !a.inBounds(t) { - return "", ErrOutOfBounds - } - - hash := lset.Hash() - refb := make([]byte, 8) - - // Series exists already in the block. - if ms := a.get(hash, lset); ms != nil { - binary.BigEndian.PutUint64(refb, uint64(ms.ref)) - return string(refb), a.AddFast(string(refb), t, v) - } - // Series was added in this transaction previously. - if ref, ok := a.newHashes[hash]; ok { - binary.BigEndian.PutUint64(refb, ref) - // XXX(fabxc): there's no fast path for multiple samples for the same new series - // in the same transaction. We always return the invalid empty ref. It's has not - // been a relevant use case so far and is not worth the trouble. - return "", a.AddFast(string(refb), t, v) - } - - // The series is completely new. - if a.newSeries == nil { - a.newHashes = map[uint64]uint64{} - } - // First sample for new series. - ref := uint64(len(a.newSeries)) - - a.newSeries = append(a.newSeries, &hashedLabels{ - ref: ref, - hash: hash, - labels: lset, - }) - // First bit indicates its a series created in this transaction. - ref |= (1 << 63) - - a.newHashes[hash] = ref - binary.BigEndian.PutUint64(refb, ref) - - return "", a.AddFast(string(refb), t, v) -} - -func (a *headAppender) AddFast(ref string, t int64, v float64) error { - if len(ref) != 8 { - return errors.Wrap(ErrNotFound, "invalid ref length") - } - var ( - refn = binary.BigEndian.Uint64(yoloBytes(ref)) - id = (refn << 1) >> 1 - inTx = refn&(1<<63) != 0 - ) - // Distinguish between existing series and series created in - // this transaction. 
- if inTx { - if id > uint64(len(a.newSeries)-1) { - return errors.Wrap(ErrNotFound, "transaction series ID too high") - } - // TODO(fabxc): we also have to validate here that the - // sample sequence is valid. - // We also have to revalidate it as we switch locks and create - // the new series. - } else if id > uint64(len(a.series)) { - return errors.Wrap(ErrNotFound, "transaction series ID too high") - } else { - ms := a.series[id] - if ms == nil { - return errors.Wrap(ErrNotFound, "nil series") - } - // TODO(fabxc): memory series should be locked here already. - // Only problem is release of locks in case of a rollback. - c := ms.head() - - if !a.inBounds(t) { - return ErrOutOfBounds - } - if t < c.maxTime { - return ErrOutOfOrderSample - } - - // We are allowing exact duplicates as we can encounter them in valid cases - // like federation and erroring out at that time would be extremely noisy. - if c.maxTime == t && math.Float64bits(ms.lastValue) != math.Float64bits(v) { - return ErrAmendSample - } - } - - if t > a.highTimestamp { - a.highTimestamp = t - } - - a.samples = append(a.samples, RefSample{ - Ref: refn, - T: t, - V: v, - }) - return nil -} - -func (a *headAppender) createSeries() error { - if len(a.newSeries) == 0 { - return nil - } - a.newLabels = make([]labels.Labels, 0, len(a.newSeries)) - base0 := len(a.series) - - a.mtx.RUnlock() - defer a.mtx.RLock() - a.mtx.Lock() - defer a.mtx.Unlock() - - base1 := len(a.series) - - for _, l := range a.newSeries { - // We switched locks and have to re-validate that the series were not - // created by another goroutine in the meantime. - if base1 > base0 { - if ms := a.get(l.hash, l.labels); ms != nil { - l.ref = uint64(ms.ref) - continue - } - } - // Series is still new. - a.newLabels = append(a.newLabels, l.labels) - l.ref = uint64(len(a.series)) - - a.create(l.hash, l.labels) - } - - // Write all new series to the WAL. - if err := a.wal.LogSeries(a.newLabels); err != nil { - return errors.Wrap(err, "WAL log series") - } - - return nil -} - -func (a *headAppender) Commit() error { - defer atomic.AddUint64(&a.activeWriters, ^uint64(0)) - defer putHeadAppendBuffer(a.samples) - defer a.mtx.RUnlock() - - if err := a.createSeries(); err != nil { - return err - } - - // We have to update the refs of samples for series we just created. - for i := range a.samples { - s := &a.samples[i] - if s.Ref&(1<<63) != 0 { - s.Ref = a.newSeries[(s.Ref<<1)>>1].ref - } - } - - // Write all new samples to the WAL and add them to the - // in-mem database on success. - if err := a.wal.LogSamples(a.samples); err != nil { - return errors.Wrap(err, "WAL log samples") - } - - total := uint64(len(a.samples)) - - for _, s := range a.samples { - if !a.series[s.Ref].append(s.T, s.V) { - total-- - } - } - - atomic.AddUint64(&a.meta.Stats.NumSamples, total) - atomic.AddUint64(&a.meta.Stats.NumSeries, uint64(len(a.newSeries))) - - for { - ht := a.HeadBlock.HighTimestamp() - if a.highTimestamp <= ht { - break - } - if atomic.CompareAndSwapInt64(&a.HeadBlock.highTimestamp, ht, a.highTimestamp) { - break - } - } - - return nil -} - -func (a *headAppender) Rollback() error { - a.mtx.RUnlock() - atomic.AddUint64(&a.activeWriters, ^uint64(0)) - putHeadAppendBuffer(a.samples) - return nil +// MaxTime returns the highest timestamp seen in data of the head. 
+func (h *Head) MaxTime() int64 { + return atomic.LoadInt64(&h.maxTime) } type headChunkReader struct { - *HeadBlock + head *Head + mint, maxt int64 +} + +func (h *headChunkReader) Close() error { + return nil } // Chunk returns the chunk for the reference number. func (h *headChunkReader) Chunk(ref uint64) (chunks.Chunk, error) { - h.mtx.RLock() - defer h.mtx.RUnlock() + h.head.mtx.RLock() + defer h.head.mtx.RUnlock() - si := ref >> 32 - ci := (ref << 32) >> 32 + s := h.head.series[uint32(ref>>32)] - c := &safeChunk{ - Chunk: h.series[si].chunks[ci].chunk, - s: h.series[si], - i: int(ci), + s.mtx.RLock() + cid := int((ref << 32) >> 32) + c := s.chunk(cid) + s.mtx.RUnlock() + + // Do not expose chunks that are outside of the specified range. + if !intervalOverlap(c.minTime, c.maxTime, h.mint, h.maxt) { + return nil, ErrNotFound } - return c, nil + + return &safeChunk{ + Chunk: c.chunk, + s: s, + cid: cid, + }, nil } type safeChunk struct { chunks.Chunk - s *memSeries - i int + s *memSeries + cid int } func (c *safeChunk) Iterator() chunks.Iterator { c.s.mtx.RLock() defer c.s.mtx.RUnlock() - return c.s.iterator(c.i) + return c.s.iterator(c.cid) } // func (c *safeChunk) Appender() (chunks.Appender, error) { panic("illegal") } // func (c *safeChunk) Bytes() []byte { panic("illegal") } // func (c *safeChunk) Encoding() chunks.Encoding { panic("illegal") } +type rangeHead struct { + head *Head + mint, maxt int64 +} + +func (h *rangeHead) Index() IndexReader { + return h.head.indexRange(h.mint, h.maxt) +} + +func (h *rangeHead) Chunks() ChunkReader { + return h.head.chunksRange(h.mint, h.maxt) +} + +func (h *rangeHead) Tombstones() TombstoneReader { + return newEmptyTombstoneReader() +} + type headIndexReader struct { - *HeadBlock - // Highest series that existed when the index reader was instantiated. - maxSeries uint32 + head *Head + mint, maxt int64 +} + +func (h *headIndexReader) Close() error { + return nil } func (h *headIndexReader) Symbols() (map[string]struct{}, error) { - return h.symbols, nil + return h.head.symbols, nil } // LabelValues returns the possible label values func (h *headIndexReader) LabelValues(names ...string) (StringTuples, error) { - h.mtx.RLock() - defer h.mtx.RUnlock() + h.head.mtx.RLock() + defer h.head.mtx.RUnlock() if len(names) != 1 { return nil, errInvalidSize } var sl []string - for s := range h.values[names[0]] { + for s := range h.head.values[names[0]] { sl = append(sl, s) } sort.Strings(sl) @@ -619,46 +361,51 @@ func (h *headIndexReader) LabelValues(names ...string) (StringTuples, error) { // Postings returns the postings list iterator for the label pair. func (h *headIndexReader) Postings(name, value string) (Postings, error) { - h.mtx.RLock() - defer h.mtx.RUnlock() + h.head.mtx.RLock() + defer h.head.mtx.RUnlock() - return h.postings.get(term{name: name, value: value}), nil + return h.head.postings.get(term{name: name, value: value}), nil } func (h *headIndexReader) SortedPostings(p Postings) Postings { - h.mtx.RLock() - defer h.mtx.RUnlock() + h.head.mtx.RLock() + defer h.head.mtx.RUnlock() ep := make([]uint32, 0, 1024) for p.Next() { - // Skip posting entries that include series added after we - // instantiated the index reader. 
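rangeHead above adapts a time slice of the head to the compactor's BlockReader: its index, chunk, and tombstone readers are all bounded to [mint, maxt]. A hedged sketch of how a caller can persist just that slice through the extended compactor Write from this patch; the helper name and the exact call site (db.go) are assumptions, not shown in this section:

    // persistHeadRange writes only the in-range part of the head as a block.
    // rangeHead satisfies BlockReader through the bounded readers defined above.
    func persistHeadRange(c *LeveledCompactor, dest string, h *Head, mint, maxt int64) error {
        rh := &rangeHead{head: h, mint: mint, maxt: maxt}
        return c.Write(dest, rh, mint, maxt)
    }
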
- if p.At() > h.maxSeries { - break - } ep = append(ep, p.At()) } if err := p.Err(); err != nil { return errPostings{err: errors.Wrap(err, "expand postings")} } + var err error sort.Slice(ep, func(i, j int) bool { - return labels.Compare(h.series[ep[i]].lset, h.series[ep[j]].lset) < 0 + if err != nil { + return false + } + a, ok1 := h.head.series[ep[i]] + b, ok2 := h.head.series[ep[j]] + + if !ok1 || !ok2 { + err = errors.Errorf("series not found") + return false + } + return labels.Compare(a.lset, b.lset) < 0 }) + if err != nil { + return errPostings{err: err} + } return newListPostings(ep) } // Series returns the series for the given reference. func (h *headIndexReader) Series(ref uint32, lbls *labels.Labels, chks *[]ChunkMeta) error { - h.mtx.RLock() - defer h.mtx.RUnlock() + h.head.mtx.RLock() + defer h.head.mtx.RUnlock() - if ref > h.maxSeries { - return ErrNotFound - } - - s := h.series[ref] + s := h.head.series[ref] if s == nil { return ErrNotFound } @@ -670,10 +417,14 @@ func (h *headIndexReader) Series(ref uint32, lbls *labels.Labels, chks *[]ChunkM *chks = (*chks)[:0] for i, c := range s.chunks { + // Do not expose chunks that are outside of the specified range. + if !intervalOverlap(c.minTime, c.maxTime, h.mint, h.maxt) { + continue + } *chks = append(*chks, ChunkMeta{ MinTime: c.minTime, MaxTime: c.maxTime, - Ref: (uint64(ref) << 32) | uint64(i), + Ref: (uint64(ref) << 32) | uint64(s.chunkID(i)), }) } @@ -681,12 +432,12 @@ func (h *headIndexReader) Series(ref uint32, lbls *labels.Labels, chks *[]ChunkM } func (h *headIndexReader) LabelIndices() ([][]string, error) { - h.mtx.RLock() - defer h.mtx.RUnlock() + h.head.mtx.RLock() + defer h.head.mtx.RUnlock() res := [][]string{} - for s := range h.values { + for s := range h.head.values { res = append(res, []string{s}) } return res, nil @@ -694,7 +445,7 @@ func (h *headIndexReader) LabelIndices() ([][]string, error) { // get retrieves the chunk with the hash and label set and creates // a new one if it doesn't exist yet. -func (h *HeadBlock) get(hash uint64, lset labels.Labels) *memSeries { +func (h *Head) get(hash uint64, lset labels.Labels) *memSeries { series := h.hashes[hash] for _, s := range series { @@ -705,11 +456,11 @@ func (h *HeadBlock) get(hash uint64, lset labels.Labels) *memSeries { return nil } -func (h *HeadBlock) create(hash uint64, lset labels.Labels) *memSeries { - s := newMemSeries(lset, uint32(len(h.series)), h.meta.MaxTime) +func (h *Head) create(hash uint64, lset labels.Labels) *memSeries { + id := atomic.AddUint32(&h.lastSeriesID, 1) - // Allocate empty space until we can insert at the given index. - h.series = append(h.series, s) + s := newMemSeries(lset, id, h.chunkRange) + h.series[id] = s h.hashes[hash] = append(h.hashes[hash], s) @@ -727,7 +478,7 @@ func (h *HeadBlock) create(hash uint64, lset labels.Labels) *memSeries { h.symbols[l.Value] = struct{}{} } - h.postings.add(s.ref, term{}) + h.postings.add(id, term{}) return s } @@ -740,18 +491,27 @@ type sample struct { type memSeries struct { mtx sync.RWMutex - ref uint32 - lset labels.Labels - chunks []*memChunk + ref uint32 + lset labels.Labels + chunks []*memChunk + chunkRange int64 + firstChunkID int nextAt int64 // timestamp at which to cut the next chunk. - maxt int64 // maximum timestamp for the series. lastValue float64 sampleBuf [4]sample app chunks.Appender // Current appender for the chunk. 
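Chunk references handed out by headIndexReader.Series pack the series reference into the upper 32 bits and the per-series chunk ID (position plus firstChunkID, see below) into the lower 32 bits; headChunkReader.Chunk reverses this. A small sketch of that encoding, with illustrative helper names rather than functions from the patch:

    // packChunkRef / unpackChunkRef show the 32/32-bit layout of head chunk
    // references: series ref in the high half, chunk ID in the low half.
    func packChunkRef(seriesRef uint32, chunkID int) uint64 {
        return uint64(seriesRef)<<32 | uint64(uint32(chunkID))
    }

    func unpackChunkRef(ref uint64) (seriesRef uint32, chunkID int) {
        return uint32(ref >> 32), int(uint32(ref))
    }
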
} +func (s *memSeries) minTime() int64 { + return s.chunks[0].minTime +} + +func (s *memSeries) maxTime() int64 { + return s.head().maxTime +} + func (s *memSeries) cut(mint int64) *memChunk { c := &memChunk{ chunk: chunks.NewXORChunk(), @@ -768,16 +528,65 @@ func (s *memSeries) cut(mint int64) *memChunk { return c } -func newMemSeries(lset labels.Labels, id uint32, maxt int64) *memSeries { +func newMemSeries(lset labels.Labels, id uint32, chunkRange int64) *memSeries { s := &memSeries{ - lset: lset, - ref: id, - maxt: maxt, - nextAt: math.MinInt64, + lset: lset, + ref: id, + chunkRange: chunkRange, + nextAt: math.MinInt64, } return s } +// appendable checks whether the given sample is valid for appending to the series. +func (s *memSeries) appendable(t int64, v float64) error { + if len(s.chunks) == 0 { + return nil + } + c := s.head() + + if t > c.maxTime { + return nil + } + if t < c.maxTime { + return ErrOutOfOrderSample + } + // We are allowing exact duplicates as we can encounter them in valid cases + // like federation and erroring out at that time would be extremely noisy. + if math.Float64bits(s.lastValue) != math.Float64bits(v) { + return ErrAmendSample + } + return nil +} + +func (s *memSeries) chunk(id int) *memChunk { + ix := id - s.firstChunkID + if ix >= len(s.chunks) || ix < 0 { + fmt.Println("get chunk", id, len(s.chunks), s.firstChunkID) + } + + return s.chunks[ix] +} + +func (s *memSeries) chunkID(pos int) int { + return pos + s.firstChunkID +} + +// truncateChunksBefore removes all chunks from the series that have not timestamp +// at or after mint. Chunk IDs remain unchanged. +func (s *memSeries) truncateChunksBefore(mint int64) { + var k int + for i, c := range s.chunks { + if c.maxTime >= mint { + break + } + k = i + 1 + } + s.chunks = append(s.chunks[:0], s.chunks[k:]...) + s.firstChunkID += k +} + +// append adds the sample (t, v) to the series. func (s *memSeries) append(t int64, v float64) bool { const samplesPerChunk = 120 @@ -802,7 +611,8 @@ func (s *memSeries) append(t int64, v float64) bool { c.samples++ if c.samples == samplesPerChunk/4 { - s.nextAt = computeChunkEndTime(c.minTime, c.maxTime, s.maxt) + _, maxt := rangeForTimestamp(c.minTime, s.chunkRange) + s.nextAt = computeChunkEndTime(c.minTime, c.maxTime, maxt) } s.lastValue = v @@ -827,7 +637,7 @@ func computeChunkEndTime(start, cur, max int64) int64 { } func (s *memSeries) iterator(i int) chunks.Iterator { - c := s.chunks[i] + c := s.chunk(i) if i < len(s.chunks)-1 { return c.chunk.Iterator() diff --git a/head_test.go b/head_test.go index 8aaec03e1..a19d09c4e 100644 --- a/head_test.go +++ b/head_test.go @@ -15,12 +15,8 @@ package tsdb import ( "io/ioutil" - "math" - "math/rand" "os" - "sort" "testing" - "time" "unsafe" "github.com/pkg/errors" @@ -31,23 +27,6 @@ import ( "github.com/stretchr/testify/require" ) -// createTestHeadBlock creates a new head block with a SegmentWAL. 
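memSeries.append above asks rangeForTimestamp for the end of the chunkRange-aligned window containing the chunk's first sample and uses it to cap computeChunkEndTime, so chunks never straddle a window boundary. rangeForTimestamp itself is not part of this section; assuming it simply aligns a timestamp down to a multiple of the range width, a minimal sketch for non-negative timestamps:

    // rangeForTimestamp (assumed): the [mint, maxt) window of width `width`
    // that contains t, aligned to multiples of width. memSeries.append only
    // needs the upper bound to decide when to cut the next chunk.
    func rangeForTimestamp(t, width int64) (mint, maxt int64) {
        mint = (t / width) * width
        return mint, mint + width
    }
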
-func createTestHeadBlock(t testing.TB, dir string, mint, maxt int64) *HeadBlock { - dir, err := TouchHeadBlock(dir, mint, maxt) - require.NoError(t, err) - - return openTestHeadBlock(t, dir) -} - -func openTestHeadBlock(t testing.TB, dir string) *HeadBlock { - wal, err := OpenSegmentWAL(dir, nil, 5*time.Second) - require.NoError(t, err) - - h, err := OpenHeadBlock(dir, nil, wal, nil) - require.NoError(t, err) - return h -} - func BenchmarkCreateSeries(b *testing.B) { lbls, err := readPrometheusLabels("cmd/tsdb/testdata.1m", 1e6) require.NoError(b, err) @@ -57,7 +36,10 @@ func BenchmarkCreateSeries(b *testing.B) { require.NoError(b, err) defer os.RemoveAll(dir) - h := createTestHeadBlock(b, dir, 0, 1) + h, err := NewHead(nil, nil, 10000) + if err != nil { + require.NoError(b, err) + } b.ReportAllocs() b.ResetTimer() @@ -106,598 +88,321 @@ func readPrometheusLabels(fn string, n int) ([]labels.Labels, error) { return mets, nil } -func TestAmendDatapointCausesError(t *testing.T) { - dir, _ := ioutil.TempDir("", "test") - defer os.RemoveAll(dir) - - hb := createTestHeadBlock(t, dir, 0, 1000) - - app := hb.Appender() - _, err := app.Add(labels.Labels{}, 0, 0) - require.NoError(t, err, "Failed to add sample") - require.NoError(t, app.Commit(), "Unexpected error committing appender") - - app = hb.Appender() - _, err = app.Add(labels.Labels{}, 0, 1) - require.Equal(t, ErrAmendSample, err) -} - -func TestDuplicateNaNDatapointNoAmendError(t *testing.T) { - dir, _ := ioutil.TempDir("", "test") - defer os.RemoveAll(dir) - - hb := createTestHeadBlock(t, dir, 0, 1000) - - app := hb.Appender() - _, err := app.Add(labels.Labels{}, 0, math.NaN()) - require.NoError(t, err, "Failed to add sample") - require.NoError(t, app.Commit(), "Unexpected error committing appender") - - app = hb.Appender() - _, err = app.Add(labels.Labels{}, 0, math.NaN()) - require.NoError(t, err) -} - -func TestNonDuplicateNaNDatapointsCausesAmendError(t *testing.T) { - dir, _ := ioutil.TempDir("", "test") - defer os.RemoveAll(dir) - - hb := createTestHeadBlock(t, dir, 0, 1000) - - app := hb.Appender() - _, err := app.Add(labels.Labels{}, 0, math.Float64frombits(0x7ff0000000000001)) - require.NoError(t, err, "Failed to add sample") - require.NoError(t, app.Commit(), "Unexpected error committing appender") - - app = hb.Appender() - _, err = app.Add(labels.Labels{}, 0, math.Float64frombits(0x7ff0000000000002)) - require.Equal(t, ErrAmendSample, err) -} - -func TestSkippingInvalidValuesInSameTxn(t *testing.T) { - dir, _ := ioutil.TempDir("", "test") - defer os.RemoveAll(dir) - - hb := createTestHeadBlock(t, dir, 0, 1000) - - // Append AmendedValue. - app := hb.Appender() - _, err := app.Add(labels.Labels{{"a", "b"}}, 0, 1) - require.NoError(t, err) - _, err = app.Add(labels.Labels{{"a", "b"}}, 0, 2) - require.NoError(t, err) - require.NoError(t, app.Commit()) - require.Equal(t, uint64(1), hb.Meta().Stats.NumSamples) - - // Make sure the right value is stored. - q := hb.Querier(0, 10) - ss := q.Select(labels.NewEqualMatcher("a", "b")) - ssMap, err := readSeriesSet(ss) - require.NoError(t, err) - - require.Equal(t, map[string][]sample{ - labels.New(labels.Label{"a", "b"}).String(): []sample{{0, 1}}, - }, ssMap) - - require.NoError(t, q.Close()) - - // Append Out of Order Value. 
- app = hb.Appender() - _, err = app.Add(labels.Labels{{"a", "b"}}, 10, 3) - require.NoError(t, err) - _, err = app.Add(labels.Labels{{"a", "b"}}, 7, 5) - require.NoError(t, err) - require.NoError(t, app.Commit()) - require.Equal(t, uint64(2), hb.Meta().Stats.NumSamples) - - q = hb.Querier(0, 10) - ss = q.Select(labels.NewEqualMatcher("a", "b")) - ssMap, err = readSeriesSet(ss) - require.NoError(t, err) - - require.Equal(t, map[string][]sample{ - labels.New(labels.Label{"a", "b"}).String(): []sample{{0, 1}, {10, 3}}, - }, ssMap) - require.NoError(t, q.Close()) -} - -func TestHeadBlock_e2e(t *testing.T) { - numDatapoints := 1000 - numRanges := 1000 - timeInterval := int64(3) - maxTime := int64(2 * 1000) - minTime := int64(200) - // Create 8 series with 1000 data-points of different ranges and run queries. - lbls := [][]labels.Label{ - { - {"a", "b"}, - {"instance", "localhost:9090"}, - {"job", "prometheus"}, - }, - { - {"a", "b"}, - {"instance", "127.0.0.1:9090"}, - {"job", "prometheus"}, - }, - { - {"a", "b"}, - {"instance", "127.0.0.1:9090"}, - {"job", "prom-k8s"}, - }, - { - {"a", "b"}, - {"instance", "localhost:9090"}, - {"job", "prom-k8s"}, - }, - { - {"a", "c"}, - {"instance", "localhost:9090"}, - {"job", "prometheus"}, - }, - { - {"a", "c"}, - {"instance", "127.0.0.1:9090"}, - {"job", "prometheus"}, - }, - { - {"a", "c"}, - {"instance", "127.0.0.1:9090"}, - {"job", "prom-k8s"}, - }, - { - {"a", "c"}, - {"instance", "localhost:9090"}, - {"job", "prom-k8s"}, - }, - } - - seriesMap := map[string][]sample{} - for _, l := range lbls { - seriesMap[labels.New(l...).String()] = []sample{} - } - - dir, _ := ioutil.TempDir("", "test") - defer os.RemoveAll(dir) - - hb := createTestHeadBlock(t, dir, minTime, maxTime) - app := hb.Appender() - - for _, l := range lbls { - ls := labels.New(l...) - series := []sample{} - - ts := rand.Int63n(300) - for i := 0; i < numDatapoints; i++ { - v := rand.Float64() - if ts >= minTime && ts <= maxTime { - series = append(series, sample{ts, v}) - } - - _, err := app.Add(ls, ts, v) - if ts >= minTime && ts <= maxTime { - require.NoError(t, err) - } else { - require.EqualError(t, err, ErrOutOfBounds.Error()) - } - - ts += rand.Int63n(timeInterval) + 1 - } - - seriesMap[labels.New(l...).String()] = series - } - - require.NoError(t, app.Commit()) - - // Query each selector on 1000 random time-ranges. - queries := []struct { - ms []labels.Matcher - }{ - { - ms: []labels.Matcher{labels.NewEqualMatcher("a", "b")}, - }, - { - ms: []labels.Matcher{ - labels.NewEqualMatcher("a", "b"), - labels.NewEqualMatcher("job", "prom-k8s"), - }, - }, - { - ms: []labels.Matcher{ - labels.NewEqualMatcher("a", "c"), - labels.NewEqualMatcher("instance", "localhost:9090"), - labels.NewEqualMatcher("job", "prometheus"), - }, - }, - // TODO: Add Regexp Matchers. - } - - for _, qry := range queries { - matched := labels.Slice{} - for _, ls := range lbls { - s := labels.Selector(qry.ms) - if s.Matches(ls) { - matched = append(matched, ls) - } - } - - sort.Sort(matched) - - for i := 0; i < numRanges; i++ { - mint := rand.Int63n(300) - maxt := mint + rand.Int63n(timeInterval*int64(numDatapoints)) - - q := hb.Querier(mint, maxt) - ss := q.Select(qry.ms...) - - // Build the mockSeriesSet. - matchedSeries := make([]Series, 0, len(matched)) - for _, m := range matched { - smpls := boundedSamples(seriesMap[m.String()], mint, maxt) - - // Only append those series for which samples exist as mockSeriesSet - // doesn't skip series with no samples. 
- // TODO: But sometimes SeriesSet returns an empty SeriesIterator - if len(smpls) > 0 { - matchedSeries = append(matchedSeries, newSeries( - m.Map(), - smpls, - )) - } - } - expSs := newListSeriesSet(matchedSeries) - - // Compare both SeriesSets. - for { - eok, rok := expSs.Next(), ss.Next() - - // Skip a series if iterator is empty. - if rok { - for !ss.At().Iterator().Next() { - rok = ss.Next() - if !rok { - break - } - } - } - - require.Equal(t, eok, rok, "next") - - if !eok { - break - } - sexp := expSs.At() - sres := ss.At() - - require.Equal(t, sexp.Labels(), sres.Labels(), "labels") - - smplExp, errExp := expandSeriesIterator(sexp.Iterator()) - smplRes, errRes := expandSeriesIterator(sres.Iterator()) - - require.Equal(t, errExp, errRes, "samples error") - require.Equal(t, smplExp, smplRes, "samples") - } - } - } - - return -} - -func TestHBDeleteSimple(t *testing.T) { - numSamples := int64(10) - - dir, _ := ioutil.TempDir("", "test") - defer os.RemoveAll(dir) - - hb := createTestHeadBlock(t, dir, 0, numSamples) - app := hb.Appender() - - smpls := make([]float64, numSamples) - for i := int64(0); i < numSamples; i++ { - smpls[i] = rand.Float64() - app.Add(labels.Labels{{"a", "b"}}, i, smpls[i]) - } - - require.NoError(t, app.Commit()) - cases := []struct { - intervals Intervals - remaint []int64 - }{ - { - intervals: Intervals{{0, 3}}, - remaint: []int64{4, 5, 6, 7, 8, 9}, - }, - { - intervals: Intervals{{1, 3}}, - remaint: []int64{0, 4, 5, 6, 7, 8, 9}, - }, - { - intervals: Intervals{{1, 3}, {4, 7}}, - remaint: []int64{0, 8, 9}, - }, - { - intervals: Intervals{{1, 3}, {4, 700}}, - remaint: []int64{0}, - }, - { - intervals: Intervals{{0, 9}}, - remaint: []int64{}, - }, - } - -Outer: - for _, c := range cases { - // Reset the tombstones. - hb.tombstones = newEmptyTombstoneReader() - - // Delete the ranges. - for _, r := range c.intervals { - require.NoError(t, hb.Delete(r.Mint, r.Maxt, labels.NewEqualMatcher("a", "b"))) - } - - // Compare the result. 
- q := hb.Querier(0, numSamples) - res := q.Select(labels.NewEqualMatcher("a", "b")) - - expSamples := make([]sample, 0, len(c.remaint)) - for _, ts := range c.remaint { - expSamples = append(expSamples, sample{ts, smpls[ts]}) - } - - expss := newListSeriesSet([]Series{ - newSeries(map[string]string{"a": "b"}, expSamples), - }) - - if len(expSamples) == 0 { - require.False(t, res.Next()) - continue - } - - for { - eok, rok := expss.Next(), res.Next() - require.Equal(t, eok, rok, "next") - - if !eok { - continue Outer - } - sexp := expss.At() - sres := res.At() - - require.Equal(t, sexp.Labels(), sres.Labels(), "labels") - - smplExp, errExp := expandSeriesIterator(sexp.Iterator()) - smplRes, errRes := expandSeriesIterator(sres.Iterator()) - - require.Equal(t, errExp, errRes, "samples error") - require.Equal(t, smplExp, smplRes, "samples") - } - } -} - -func TestDeleteUntilCurMax(t *testing.T) { - numSamples := int64(10) - - dir, _ := ioutil.TempDir("", "test") - defer os.RemoveAll(dir) - - hb := createTestHeadBlock(t, dir, 0, 2*numSamples) - app := hb.Appender() - - smpls := make([]float64, numSamples) - for i := int64(0); i < numSamples; i++ { - smpls[i] = rand.Float64() - app.Add(labels.Labels{{"a", "b"}}, i, smpls[i]) - } - - require.NoError(t, app.Commit()) - require.NoError(t, hb.Delete(0, 10000, labels.NewEqualMatcher("a", "b"))) - app = hb.Appender() - _, err := app.Add(labels.Labels{{"a", "b"}}, 11, 1) - require.NoError(t, err) - require.NoError(t, app.Commit()) - - q := hb.Querier(0, 100000) - res := q.Select(labels.NewEqualMatcher("a", "b")) - - require.True(t, res.Next()) - exps := res.At() - it := exps.Iterator() - ressmpls, err := expandSeriesIterator(it) - require.NoError(t, err) - require.Equal(t, []sample{{11, 1}}, ressmpls) -} - -func TestDelete_e2e(t *testing.T) { - numDatapoints := 1000 - numRanges := 1000 - timeInterval := int64(2) - maxTime := int64(2 * 1000) - minTime := int64(200) - // Create 8 series with 1000 data-points of different ranges, delete and run queries. - lbls := [][]labels.Label{ - { - {"a", "b"}, - {"instance", "localhost:9090"}, - {"job", "prometheus"}, - }, - { - {"a", "b"}, - {"instance", "127.0.0.1:9090"}, - {"job", "prometheus"}, - }, - { - {"a", "b"}, - {"instance", "127.0.0.1:9090"}, - {"job", "prom-k8s"}, - }, - { - {"a", "b"}, - {"instance", "localhost:9090"}, - {"job", "prom-k8s"}, - }, - { - {"a", "c"}, - {"instance", "localhost:9090"}, - {"job", "prometheus"}, - }, - { - {"a", "c"}, - {"instance", "127.0.0.1:9090"}, - {"job", "prometheus"}, - }, - { - {"a", "c"}, - {"instance", "127.0.0.1:9090"}, - {"job", "prom-k8s"}, - }, - { - {"a", "c"}, - {"instance", "localhost:9090"}, - {"job", "prom-k8s"}, - }, - } - - seriesMap := map[string][]sample{} - for _, l := range lbls { - seriesMap[labels.New(l...).String()] = []sample{} - } - - dir, _ := ioutil.TempDir("", "test") - defer os.RemoveAll(dir) - - hb := createTestHeadBlock(t, dir, minTime, maxTime) - app := hb.Appender() - - for _, l := range lbls { - ls := labels.New(l...) 
- series := []sample{} - - ts := rand.Int63n(300) - for i := 0; i < numDatapoints; i++ { - v := rand.Float64() - if ts >= minTime && ts <= maxTime { - series = append(series, sample{ts, v}) - } - - _, err := app.Add(ls, ts, v) - if ts >= minTime && ts <= maxTime { - require.NoError(t, err) - } else { - require.EqualError(t, err, ErrOutOfBounds.Error()) - } - - ts += rand.Int63n(timeInterval) + 1 - } - - seriesMap[labels.New(l...).String()] = series - } - - require.NoError(t, app.Commit()) - - // Delete a time-range from each-selector. - dels := []struct { - ms []labels.Matcher - drange Intervals - }{ - { - ms: []labels.Matcher{labels.NewEqualMatcher("a", "b")}, - drange: Intervals{{300, 500}, {600, 670}}, - }, - { - ms: []labels.Matcher{ - labels.NewEqualMatcher("a", "b"), - labels.NewEqualMatcher("job", "prom-k8s"), - }, - drange: Intervals{{300, 500}, {100, 670}}, - }, - { - ms: []labels.Matcher{ - labels.NewEqualMatcher("a", "c"), - labels.NewEqualMatcher("instance", "localhost:9090"), - labels.NewEqualMatcher("job", "prometheus"), - }, - drange: Intervals{{300, 400}, {100, 6700}}, - }, - // TODO: Add Regexp Matchers. - } - - for _, del := range dels { - // Reset the deletes everytime. - writeTombstoneFile(hb.dir, newEmptyTombstoneReader()) - hb.tombstones = newEmptyTombstoneReader() - - for _, r := range del.drange { - require.NoError(t, hb.Delete(r.Mint, r.Maxt, del.ms...)) - } - - matched := labels.Slice{} - for _, ls := range lbls { - s := labels.Selector(del.ms) - if s.Matches(ls) { - matched = append(matched, ls) - } - } - - sort.Sort(matched) - - for i := 0; i < numRanges; i++ { - mint := rand.Int63n(200) - maxt := mint + rand.Int63n(timeInterval*int64(numDatapoints)) - - q := hb.Querier(mint, maxt) - ss := q.Select(del.ms...) - - // Build the mockSeriesSet. - matchedSeries := make([]Series, 0, len(matched)) - for _, m := range matched { - smpls := boundedSamples(seriesMap[m.String()], mint, maxt) - smpls = deletedSamples(smpls, del.drange) - - // Only append those series for which samples exist as mockSeriesSet - // doesn't skip series with no samples. - // TODO: But sometimes SeriesSet returns an empty SeriesIterator - if len(smpls) > 0 { - matchedSeries = append(matchedSeries, newSeries( - m.Map(), - smpls, - )) - } - } - expSs := newListSeriesSet(matchedSeries) - - // Compare both SeriesSets. - for { - eok, rok := expSs.Next(), ss.Next() - - // Skip a series if iterator is empty. 
- if rok { - for !ss.At().Iterator().Next() { - rok = ss.Next() - if !rok { - break - } - } - } - require.Equal(t, eok, rok, "next") - - if !eok { - break - } - sexp := expSs.At() - sres := ss.At() - - require.Equal(t, sexp.Labels(), sres.Labels(), "labels") - - smplExp, errExp := expandSeriesIterator(sexp.Iterator()) - smplRes, errRes := expandSeriesIterator(sres.Iterator()) - - require.Equal(t, errExp, errRes, "samples error") - require.Equal(t, smplExp, smplRes, "samples") - } - } - } - - return -} +// func TestHBDeleteSimple(t *testing.T) { +// numSamples := int64(10) + +// hb, close := openTestDB(t, nil) +// defer close() + +// app := hb.Appender() + +// smpls := make([]float64, numSamples) +// for i := int64(0); i < numSamples; i++ { +// smpls[i] = rand.Float64() +// app.Add(labels.Labels{{"a", "b"}}, i, smpls[i]) +// } + +// require.NoError(t, app.Commit()) +// cases := []struct { +// intervals Intervals +// remaint []int64 +// }{ +// { +// intervals: Intervals{{0, 3}}, +// remaint: []int64{4, 5, 6, 7, 8, 9}, +// }, +// { +// intervals: Intervals{{1, 3}}, +// remaint: []int64{0, 4, 5, 6, 7, 8, 9}, +// }, +// { +// intervals: Intervals{{1, 3}, {4, 7}}, +// remaint: []int64{0, 8, 9}, +// }, +// { +// intervals: Intervals{{1, 3}, {4, 700}}, +// remaint: []int64{0}, +// }, +// { +// intervals: Intervals{{0, 9}}, +// remaint: []int64{}, +// }, +// } + +// Outer: +// for _, c := range cases { +// // Reset the tombstones. +// hb.tombstones = newEmptyTombstoneReader() + +// // Delete the ranges. +// for _, r := range c.intervals { +// require.NoError(t, hb.Delete(r.Mint, r.Maxt, labels.NewEqualMatcher("a", "b"))) +// } + +// // Compare the result. +// q := hb.Querier(0, numSamples) +// res := q.Select(labels.NewEqualMatcher("a", "b")) + +// expSamples := make([]sample, 0, len(c.remaint)) +// for _, ts := range c.remaint { +// expSamples = append(expSamples, sample{ts, smpls[ts]}) +// } + +// expss := newListSeriesSet([]Series{ +// newSeries(map[string]string{"a": "b"}, expSamples), +// }) + +// if len(expSamples) == 0 { +// require.False(t, res.Next()) +// continue +// } + +// for { +// eok, rok := expss.Next(), res.Next() +// require.Equal(t, eok, rok, "next") + +// if !eok { +// continue Outer +// } +// sexp := expss.At() +// sres := res.At() + +// require.Equal(t, sexp.Labels(), sres.Labels(), "labels") + +// smplExp, errExp := expandSeriesIterator(sexp.Iterator()) +// smplRes, errRes := expandSeriesIterator(sres.Iterator()) + +// require.Equal(t, errExp, errRes, "samples error") +// require.Equal(t, smplExp, smplRes, "samples") +// } +// } +// } + +// func TestDeleteUntilCurMax(t *testing.T) { +// numSamples := int64(10) + +// dir, _ := ioutil.TempDir("", "test") +// defer os.RemoveAll(dir) + +// hb := createTestHead(t, dir, 0, 2*numSamples) +// app := hb.Appender() + +// smpls := make([]float64, numSamples) +// for i := int64(0); i < numSamples; i++ { +// smpls[i] = rand.Float64() +// app.Add(labels.Labels{{"a", "b"}}, i, smpls[i]) +// } + +// require.NoError(t, app.Commit()) +// require.NoError(t, hb.Delete(0, 10000, labels.NewEqualMatcher("a", "b"))) +// app = hb.Appender() +// _, err := app.Add(labels.Labels{{"a", "b"}}, 11, 1) +// require.NoError(t, err) +// require.NoError(t, app.Commit()) + +// q := hb.Querier(0, 100000) +// res := q.Select(labels.NewEqualMatcher("a", "b")) + +// require.True(t, res.Next()) +// exps := res.At() +// it := exps.Iterator() +// ressmpls, err := expandSeriesIterator(it) +// require.NoError(t, err) +// require.Equal(t, []sample{{11, 1}}, ressmpls) +// 
} + +// func TestDelete_e2e(t *testing.T) { +// numDatapoints := 1000 +// numRanges := 1000 +// timeInterval := int64(2) +// maxTime := int64(2 * 1000) +// minTime := int64(200) +// // Create 8 series with 1000 data-points of different ranges, delete and run queries. +// lbls := [][]labels.Label{ +// { +// {"a", "b"}, +// {"instance", "localhost:9090"}, +// {"job", "prometheus"}, +// }, +// { +// {"a", "b"}, +// {"instance", "127.0.0.1:9090"}, +// {"job", "prometheus"}, +// }, +// { +// {"a", "b"}, +// {"instance", "127.0.0.1:9090"}, +// {"job", "prom-k8s"}, +// }, +// { +// {"a", "b"}, +// {"instance", "localhost:9090"}, +// {"job", "prom-k8s"}, +// }, +// { +// {"a", "c"}, +// {"instance", "localhost:9090"}, +// {"job", "prometheus"}, +// }, +// { +// {"a", "c"}, +// {"instance", "127.0.0.1:9090"}, +// {"job", "prometheus"}, +// }, +// { +// {"a", "c"}, +// {"instance", "127.0.0.1:9090"}, +// {"job", "prom-k8s"}, +// }, +// { +// {"a", "c"}, +// {"instance", "localhost:9090"}, +// {"job", "prom-k8s"}, +// }, +// } + +// seriesMap := map[string][]sample{} +// for _, l := range lbls { +// seriesMap[labels.New(l...).String()] = []sample{} +// } + +// dir, _ := ioutil.TempDir("", "test") +// defer os.RemoveAll(dir) + +// hb := createTestHead(t, dir, minTime, maxTime) +// app := hb.Appender() + +// for _, l := range lbls { +// ls := labels.New(l...) +// series := []sample{} + +// ts := rand.Int63n(300) +// for i := 0; i < numDatapoints; i++ { +// v := rand.Float64() +// if ts >= minTime && ts <= maxTime { +// series = append(series, sample{ts, v}) +// } + +// _, err := app.Add(ls, ts, v) +// if ts >= minTime && ts <= maxTime { +// require.NoError(t, err) +// } else { +// require.EqualError(t, err, ErrOutOfBounds.Error()) +// } + +// ts += rand.Int63n(timeInterval) + 1 +// } + +// seriesMap[labels.New(l...).String()] = series +// } + +// require.NoError(t, app.Commit()) + +// // Delete a time-range from each-selector. +// dels := []struct { +// ms []labels.Matcher +// drange Intervals +// }{ +// { +// ms: []labels.Matcher{labels.NewEqualMatcher("a", "b")}, +// drange: Intervals{{300, 500}, {600, 670}}, +// }, +// { +// ms: []labels.Matcher{ +// labels.NewEqualMatcher("a", "b"), +// labels.NewEqualMatcher("job", "prom-k8s"), +// }, +// drange: Intervals{{300, 500}, {100, 670}}, +// }, +// { +// ms: []labels.Matcher{ +// labels.NewEqualMatcher("a", "c"), +// labels.NewEqualMatcher("instance", "localhost:9090"), +// labels.NewEqualMatcher("job", "prometheus"), +// }, +// drange: Intervals{{300, 400}, {100, 6700}}, +// }, +// // TODO: Add Regexp Matchers. +// } + +// for _, del := range dels { +// // Reset the deletes everytime. +// writeTombstoneFile(hb.dir, newEmptyTombstoneReader()) +// hb.tombstones = newEmptyTombstoneReader() + +// for _, r := range del.drange { +// require.NoError(t, hb.Delete(r.Mint, r.Maxt, del.ms...)) +// } + +// matched := labels.Slice{} +// for _, ls := range lbls { +// s := labels.Selector(del.ms) +// if s.Matches(ls) { +// matched = append(matched, ls) +// } +// } + +// sort.Sort(matched) + +// for i := 0; i < numRanges; i++ { +// mint := rand.Int63n(200) +// maxt := mint + rand.Int63n(timeInterval*int64(numDatapoints)) + +// q := hb.Querier(mint, maxt) +// ss := q.Select(del.ms...) + +// // Build the mockSeriesSet. 
+// matchedSeries := make([]Series, 0, len(matched)) +// for _, m := range matched { +// smpls := boundedSamples(seriesMap[m.String()], mint, maxt) +// smpls = deletedSamples(smpls, del.drange) + +// // Only append those series for which samples exist as mockSeriesSet +// // doesn't skip series with no samples. +// // TODO: But sometimes SeriesSet returns an empty SeriesIterator +// if len(smpls) > 0 { +// matchedSeries = append(matchedSeries, newSeries( +// m.Map(), +// smpls, +// )) +// } +// } +// expSs := newListSeriesSet(matchedSeries) + +// // Compare both SeriesSets. +// for { +// eok, rok := expSs.Next(), ss.Next() + +// // Skip a series if iterator is empty. +// if rok { +// for !ss.At().Iterator().Next() { +// rok = ss.Next() +// if !rok { +// break +// } +// } +// } +// require.Equal(t, eok, rok, "next") + +// if !eok { +// break +// } +// sexp := expSs.At() +// sres := ss.At() + +// require.Equal(t, sexp.Labels(), sres.Labels(), "labels") + +// smplExp, errExp := expandSeriesIterator(sexp.Iterator()) +// smplRes, errRes := expandSeriesIterator(sres.Iterator()) + +// require.Equal(t, errExp, errRes, "samples error") +// require.Equal(t, smplExp, smplRes, "samples") +// } +// } +// } + +// return +// } func boundedSamples(full []sample, mint, maxt int64) []sample { for len(full) > 0 { @@ -725,7 +430,6 @@ Outer: continue Outer } } - ds = append(ds, s) } diff --git a/querier.go b/querier.go index 32fdd664d..5461fec89 100644 --- a/querier.go +++ b/querier.go @@ -54,26 +54,6 @@ type querier struct { blocks []Querier } -// Querier returns a new querier over the data partition for the given time range. -// A goroutine must not handle more than one open Querier. -func (s *DB) Querier(mint, maxt int64) Querier { - s.mtx.RLock() - - s.headmtx.RLock() - blocks := s.blocksForInterval(mint, maxt) - s.headmtx.RUnlock() - - sq := &querier{ - blocks: make([]Querier, 0, len(blocks)), - db: s, - } - for _, b := range blocks { - sq.blocks = append(sq.blocks, b.Querier(mint, maxt)) - } - - return sq -} - func (q *querier) LabelValues(n string) ([]string, error) { return q.lvals(q.blocks, n) } @@ -700,6 +680,7 @@ type chunkSeriesIterator struct { func newChunkSeriesIterator(cs []ChunkMeta, dranges Intervals, mint, maxt int64) *chunkSeriesIterator { it := cs[0].Chunk.Iterator() + if len(dranges) > 0 { it = &deletedIterator{it: it, intervals: dranges} } @@ -750,19 +731,22 @@ func (it *chunkSeriesIterator) At() (t int64, v float64) { } func (it *chunkSeriesIterator) Next() bool { - for it.cur.Next() { + if it.cur.Next() { t, _ := it.cur.At() - if t < it.mint { - return it.Seek(it.mint) - } + if t < it.mint { + if !it.Seek(it.mint) { + return false + } + t, _ = it.At() + + return t <= it.maxt + } if t > it.maxt { return false } - return true } - if err := it.cur.Err(); err != nil { return false } diff --git a/querier_test.go b/querier_test.go index 1091f4007..12ff22013 100644 --- a/querier_test.go +++ b/querier_test.go @@ -1138,6 +1138,17 @@ func TestChunkSeriesIterator_SeekInCurrentChunk(t *testing.T) { require.Equal(t, float64(6), v) } +// Regression when calling Next() with a time bounded to fit within two samples. +// Seek gets called and advances beyond the max time, which was just accepted as a valid sample. 
+func TestChunkSeriesIterator_NextWithMinTime(t *testing.T) { + metas := []ChunkMeta{ + chunkFromSamples([]sample{{1, 6}, {5, 6}, {7, 8}}), + } + + it := newChunkSeriesIterator(metas, nil, 2, 4) + require.False(t, it.Next()) +} + func TestPopulatedCSReturnsValidChunkSlice(t *testing.T) { lbls := []labels.Labels{labels.New(labels.Label{"a", "b"})} chunkMetas := [][]ChunkMeta{ diff --git a/wal.go b/wal.go index 773bf51ee..2ee889050 100644 --- a/wal.go +++ b/wal.go @@ -84,9 +84,19 @@ type WAL interface { LogSeries([]labels.Labels) error LogSamples([]RefSample) error LogDeletes([]Stone) error + Truncate(maxt int64) error Close() error } +type NopWAL struct{} + +func (NopWAL) Read(SeriesCB, SamplesCB, DeletesCB) error { return nil } +func (w NopWAL) Reader() WALReader { return w } +func (NopWAL) LogSeries([]labels.Labels) error { return nil } +func (NopWAL) LogSamples([]RefSample) error { return nil } +func (NopWAL) LogDeletes([]Stone) error { return nil } +func (NopWAL) Close() error { return nil } + // WALReader reads entries from a WAL. type WALReader interface { Read(SeriesCB, SamplesCB, DeletesCB) error @@ -319,6 +329,10 @@ func (w *SegmentWAL) Sync() error { return fileutil.Fdatasync(tail) } +func (w *SegmentWAL) Truncate(maxt int64) error { + return nil +} + func (w *SegmentWAL) sync() error { if err := w.flush(); err != nil { return err @@ -360,9 +374,8 @@ func (w *SegmentWAL) Close() error { close(w.stopc) <-w.donec - // Lock mutex and leave it locked so we panic if there's a bug causing - // the block to be used afterwards. w.mtx.Lock() + defer w.mtx.Unlock() if err := w.sync(); err != nil { return err
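The WAL interface above gains Truncate(maxt int64) error, but NopWAL as defined in the same hunk only stubs the pre-existing methods. If NopWAL is meant to keep satisfying the updated interface (for example in tests that run without a WAL), it would presumably need a matching no-op as well; a sketch of that assumption:

    // Truncate on NopWAL discards nothing, mirroring its other no-op methods.
    // This is an assumed addition, not a confirmed part of the patch.
    func (NopWAL) Truncate(int64) error { return nil }
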