Add composed Block interfaces, remove head generation

This adds more lower-level interfaces, which are composed
into the different Block interfaces.
The DB only uses interfaces instead of explicit persistedBlock and
headBlock. The headBlock generation property is dropped as the use-case
can be implemented using block sequence numbers.
This commit is contained in:
Fabian Reinartz 2017-03-20 08:41:56 +01:00
parent 303a4ec3bc
commit 11be2cc585
4 changed files with 109 additions and 70 deletions

View file

@ -11,8 +11,8 @@ import (
"github.com/pkg/errors" "github.com/pkg/errors"
) )
// Block handles reads against a Block of time series data. // DiskBlock handles reads against a Block of time series data.
type Block interface { type DiskBlock interface {
// Directory where block data is stored. // Directory where block data is stored.
Dir() string Dir() string
@ -29,6 +29,32 @@ type Block interface {
Close() error Close() error
} }
// Block is an interface to a DiskBlock that can also be queried.
type Block interface {
DiskBlock
// Queryable
}
// HeadBlock is a regular block that can still be appended to.
type HeadBlock interface {
DiskBlock
Appendable
}
// Appendable defines an entity to which data can be appended.
type Appendable interface {
// Appender returns a new Appender against an underlying store.
Appender() Appender
// Busy returns whether there are any currently active appenders.
Busy() bool
}
// Queryable defines an entity which provides a Querier.
type Queryable interface {
Queryable() Querier
}
// BlockMeta provides meta information about a block. // BlockMeta provides meta information about a block.
type BlockMeta struct { type BlockMeta struct {
// Unique identifier for the block and its contents. Changes on compaction. // Unique identifier for the block and its contents. Changes on compaction.

View file

@ -15,7 +15,7 @@ import (
) )
const ( const (
// MagicSeries 4 bytes at the head of series file. // MagicChunks 4 bytes at the head of series file.
MagicChunks = 0x85BD40DD MagicChunks = 0x85BD40DD
) )

130
db.go
View file

@ -11,7 +11,6 @@ import (
"strconv" "strconv"
"strings" "strings"
"sync" "sync"
"sync/atomic"
"time" "time"
"unsafe" "unsafe"
@ -95,14 +94,13 @@ type DB struct {
// Mutex for that must be held when modifying the general // Mutex for that must be held when modifying the general
// block layout. // block layout.
mtx sync.RWMutex mtx sync.RWMutex
persisted []*persistedBlock blocks []Block
seqBlocks map[int]Block // seqBlocks map[int]Block
// Mutex that must be held when modifying just the head blocks // Mutex that must be held when modifying just the head blocks
// or the general layout. // or the general layout.
headmtx sync.RWMutex headmtx sync.RWMutex
heads []*headBlock heads []HeadBlock
headGen uint8
compactor Compactor compactor Compactor
@ -237,11 +235,11 @@ func (db *DB) retentionCutoff() (bool, error) {
// We don't count the span covered by head blocks towards the // We don't count the span covered by head blocks towards the
// retention time as it generally makes up a fraction of it. // retention time as it generally makes up a fraction of it.
if len(db.persisted) == 0 { if len(db.blocks)-len(db.heads) == 0 {
return false, nil return false, nil
} }
last := db.persisted[len(db.persisted)-1] last := db.blocks[len(db.blocks)-len(db.heads)-1]
mint := last.Meta().MaxTime - int64(db.opts.RetentionDuration) mint := last.Meta().MaxTime - int64(db.opts.RetentionDuration)
return retentionCutoff(db.dir, mint) return retentionCutoff(db.dir, mint)
@ -252,7 +250,7 @@ func (db *DB) compact() (changes bool, err error) {
// Check whether we have pending head blocks that are ready to be persisted. // Check whether we have pending head blocks that are ready to be persisted.
// They have the highest priority. // They have the highest priority.
var singles []*headBlock var singles []Block
// Collect head blocks that are ready for compaction. Write them after // Collect head blocks that are ready for compaction. Write them after
// returning the lock to not block Appenders. // returning the lock to not block Appenders.
@ -263,7 +261,7 @@ func (db *DB) compact() (changes bool, err error) {
// Blocks that won't be appendable when instantiating a new appender // Blocks that won't be appendable when instantiating a new appender
// might still have active appenders on them. // might still have active appenders on them.
// Abort at the first one we encounter. // Abort at the first one we encounter.
if atomic.LoadUint64(&h.activeWriters) > 0 { if h.Busy() {
break break
} }
singles = append(singles, h) singles = append(singles, h)
@ -355,6 +353,15 @@ func retentionCutoff(dir string, mint int64) (bool, error) {
return changes, fileutil.Fsync(df) return changes, fileutil.Fsync(df)
} }
func (db *DB) seqBlock(i int) (Block, bool) {
for _, b := range db.blocks {
if b.Meta().Sequence == i {
return b, true
}
}
return nil, false
}
func (db *DB) reloadBlocks() error { func (db *DB) reloadBlocks() error {
var cs []io.Closer var cs []io.Closer
defer closeAll(cs...) defer closeAll(cs...)
@ -371,8 +378,8 @@ func (db *DB) reloadBlocks() error {
} }
var ( var (
metas []*BlockMeta metas []*BlockMeta
persisted []*persistedBlock blocks []Block
heads []*headBlock heads []HeadBlock
seqBlocks = make(map[int]Block, len(dirs)) seqBlocks = make(map[int]Block, len(dirs))
) )
@ -385,7 +392,7 @@ func (db *DB) reloadBlocks() error {
} }
for i, meta := range metas { for i, meta := range metas {
b, ok := db.seqBlocks[meta.Sequence] b, ok := db.seqBlock(meta.Sequence)
if meta.Compaction.Generation == 0 { if meta.Compaction.Generation == 0 {
if !ok { if !ok {
@ -397,7 +404,7 @@ func (db *DB) reloadBlocks() error {
if meta.ULID != b.Meta().ULID { if meta.ULID != b.Meta().ULID {
return errors.Errorf("head block ULID changed unexpectedly") return errors.Errorf("head block ULID changed unexpectedly")
} }
heads = append(heads, b.(*headBlock)) heads = append(heads, b.(HeadBlock))
} else { } else {
if !ok || meta.ULID != b.Meta().ULID { if !ok || meta.ULID != b.Meta().ULID {
b, err = newPersistedBlock(dirs[i]) b, err = newPersistedBlock(dirs[i])
@ -405,22 +412,21 @@ func (db *DB) reloadBlocks() error {
return errors.Wrapf(err, "open persisted block %s", dirs[i]) return errors.Wrapf(err, "open persisted block %s", dirs[i])
} }
} }
persisted = append(persisted, b.(*persistedBlock))
} }
seqBlocks[meta.Sequence] = b seqBlocks[meta.Sequence] = b
blocks = append(blocks, b)
} }
// Close all blocks that we no longer need. They are closed after returning all // Close all blocks that we no longer need. They are closed after returning all
// locks to avoid questionable locking order. // locks to avoid questionable locking order.
for seq, b := range db.seqBlocks { for _, b := range db.blocks {
if nb, ok := seqBlocks[seq]; !ok || nb != b { if nb := seqBlocks[b.Meta().Sequence]; nb != b {
cs = append(cs, b) cs = append(cs, b)
} }
} }
db.seqBlocks = seqBlocks db.blocks = blocks
db.persisted = persisted
db.heads = heads db.heads = heads
return nil return nil
@ -436,12 +442,10 @@ func (db *DB) Close() error {
var g errgroup.Group var g errgroup.Group
for _, pb := range db.persisted { // blocks also contains all head blocks.
for _, pb := range db.blocks {
g.Go(pb.Close) g.Go(pb.Close)
} }
for _, hb := range db.heads {
g.Go(hb.Close)
}
var merr MultiError var merr MultiError
@ -459,15 +463,14 @@ func (db *DB) Appender() Appender {
// Only instantiate appender after returning the headmtx to avoid // Only instantiate appender after returning the headmtx to avoid
// questionable locking order. // questionable locking order.
db.headmtx.RLock() db.headmtx.RLock()
app := db.appendable() app := db.appendable()
heads := make([]*headBlock, len(app))
copy(heads, app)
db.headmtx.RUnlock() db.headmtx.RUnlock()
for _, b := range heads { for _, b := range app {
a.heads = append(a.heads, b.Appender().(*headAppender)) a.heads = append(a.heads, &metaAppender{
meta: b.Meta(),
app: b.Appender().(*headAppender),
})
} }
return a return a
@ -475,38 +478,44 @@ func (db *DB) Appender() Appender {
type dbAppender struct { type dbAppender struct {
db *DB db *DB
heads []*headAppender heads []*metaAppender
samples int samples int
} }
type metaAppender struct {
meta BlockMeta
app Appender
}
func (a *dbAppender) Add(lset labels.Labels, t int64, v float64) (uint64, error) { func (a *dbAppender) Add(lset labels.Labels, t int64, v float64) (uint64, error) {
h, err := a.appenderFor(t) h, err := a.appenderFor(t)
if err != nil { if err != nil {
return 0, err return 0, err
} }
ref, err := h.Add(lset, t, v) ref, err := h.app.Add(lset, t, v)
if err != nil { if err != nil {
return 0, err return 0, err
} }
a.samples++ a.samples++
return ref | (uint64(h.generation) << 40), nil // Store last byte of sequence number in 3rd byte of refernece.
return ref | (uint64(h.meta.Sequence^0xff) << 40), nil
} }
func (a *dbAppender) AddFast(ref uint64, t int64, v float64) error { func (a *dbAppender) AddFast(ref uint64, t int64, v float64) error {
// We store the head generation in the 4th byte and use it to reject // Load the head last byte of the head sequence from the 3rd byte of the
// stale references. // reference number.
gen := uint8((ref << 16) >> 56) gen := (ref << 16) >> 56
h, err := a.appenderFor(t) h, err := a.appenderFor(t)
if err != nil { if err != nil {
return err return err
} }
// If the reference pointed into a previous block, we cannot // If the last byte of the sequence does not add up, the reference is not valid.
// use it to append the sample. if uint64(h.meta.Sequence^0xff) != gen {
if h.generation != gen {
return ErrNotFound return ErrNotFound
} }
if err := h.AddFast(ref, t, v); err != nil { if err := h.app.AddFast(ref, t, v); err != nil {
return err return err
} }
@ -516,12 +525,12 @@ func (a *dbAppender) AddFast(ref uint64, t int64, v float64) error {
// appenderFor gets the appender for the head containing timestamp t. // appenderFor gets the appender for the head containing timestamp t.
// If the head block doesn't exist yet, it gets created. // If the head block doesn't exist yet, it gets created.
func (a *dbAppender) appenderFor(t int64) (*headAppender, error) { func (a *dbAppender) appenderFor(t int64) (*metaAppender, error) {
// If there's no fitting head block for t, ensure it gets created. // If there's no fitting head block for t, ensure it gets created.
if len(a.heads) == 0 || t >= a.heads[len(a.heads)-1].meta.MaxTime { if len(a.heads) == 0 || t >= a.heads[len(a.heads)-1].meta.MaxTime {
a.db.headmtx.Lock() a.db.headmtx.Lock()
var newHeads []*headBlock var newHeads []HeadBlock
if err := a.db.ensureHead(t); err != nil { if err := a.db.ensureHead(t); err != nil {
a.db.headmtx.Unlock() a.db.headmtx.Unlock()
@ -532,7 +541,7 @@ func (a *dbAppender) appenderFor(t int64) (*headAppender, error) {
} else { } else {
maxSeq := a.heads[len(a.heads)-1].meta.Sequence maxSeq := a.heads[len(a.heads)-1].meta.Sequence
for _, b := range a.db.appendable() { for _, b := range a.db.appendable() {
if b.meta.Sequence > maxSeq { if b.Meta().Sequence > maxSeq {
newHeads = append(newHeads, b) newHeads = append(newHeads, b)
} }
} }
@ -543,7 +552,10 @@ func (a *dbAppender) appenderFor(t int64) (*headAppender, error) {
// Instantiate appenders after returning headmtx to avoid questionable // Instantiate appenders after returning headmtx to avoid questionable
// locking order. // locking order.
for _, b := range newHeads { for _, b := range newHeads {
a.heads = append(a.heads, b.Appender().(*headAppender)) a.heads = append(a.heads, &metaAppender{
app: b.Appender(),
meta: b.Meta(),
})
} }
} }
for i := len(a.heads) - 1; i >= 0; i-- { for i := len(a.heads) - 1; i >= 0; i-- {
@ -570,11 +582,12 @@ func (db *DB) ensureHead(t int64) error {
for { for {
h := db.heads[len(db.heads)-1] h := db.heads[len(db.heads)-1]
m := h.Meta()
// If t doesn't exceed the range of heads blocks, there's nothing to do. // If t doesn't exceed the range of heads blocks, there's nothing to do.
if t < h.meta.MaxTime { if t < m.MaxTime {
return nil return nil
} }
if _, err := db.cut(h.meta.MaxTime); err != nil { if _, err := db.cut(m.MaxTime); err != nil {
return err return err
} }
} }
@ -584,7 +597,7 @@ func (a *dbAppender) Commit() error {
var merr MultiError var merr MultiError
for _, h := range a.heads { for _, h := range a.heads {
merr.Add(h.Commit()) merr.Add(h.app.Commit())
} }
a.db.mtx.RUnlock() a.db.mtx.RUnlock()
@ -598,18 +611,22 @@ func (a *dbAppender) Rollback() error {
var merr MultiError var merr MultiError
for _, h := range a.heads { for _, h := range a.heads {
merr.Add(h.Rollback()) merr.Add(h.app.Rollback())
} }
a.db.mtx.RUnlock() a.db.mtx.RUnlock()
return merr.Err() return merr.Err()
} }
func (db *DB) appendable() []*headBlock { // appendable returns a copy of a slice of HeadBlocks that can still be appended to.
if len(db.heads) <= db.opts.AppendableBlocks { func (db *DB) appendable() []HeadBlock {
return db.heads var i int
app := make([]HeadBlock, 0, db.opts.AppendableBlocks)
if len(db.heads) > db.opts.AppendableBlocks {
i = len(db.heads) - db.opts.AppendableBlocks
} }
return db.heads[len(db.heads)-db.opts.AppendableBlocks:] return append(app, db.heads[i:]...)
} }
func intervalOverlap(amin, amax, bmin, bmax int64) bool { func intervalOverlap(amin, amax, bmin, bmax int64) bool {
@ -631,13 +648,7 @@ func intervalContains(min, max, t int64) bool {
func (db *DB) blocksForInterval(mint, maxt int64) []Block { func (db *DB) blocksForInterval(mint, maxt int64) []Block {
var bs []Block var bs []Block
for _, b := range db.persisted { for _, b := range db.blocks {
m := b.Meta()
if intervalOverlap(mint, maxt, m.MinTime, m.MaxTime) {
bs = append(bs, b)
}
}
for _, b := range db.heads {
m := b.Meta() m := b.Meta()
if intervalOverlap(mint, maxt, m.MinTime, m.MaxTime) { if intervalOverlap(mint, maxt, m.MinTime, m.MaxTime) {
bs = append(bs, b) bs = append(bs, b)
@ -661,11 +672,8 @@ func (db *DB) cut(mint int64) (*headBlock, error) {
return nil, err return nil, err
} }
db.blocks = append(db.blocks, newHead)
db.heads = append(db.heads, newHead) db.heads = append(db.heads, newHead)
db.seqBlocks[seq] = newHead
db.headGen++
newHead.generation = db.headGen
select { select {
case db.compactc <- struct{}{}: case db.compactc <- struct{}{}:

View file

@ -38,7 +38,6 @@ var (
type headBlock struct { type headBlock struct {
mtx sync.RWMutex mtx sync.RWMutex
dir string dir string
generation uint8
wal *WAL wal *WAL
activeWriters uint64 activeWriters uint64
@ -184,6 +183,10 @@ func (h *headBlock) Appender() Appender {
return &headAppender{headBlock: h, samples: getHeadAppendBuffer()} return &headAppender{headBlock: h, samples: getHeadAppendBuffer()}
} }
func (h *headBlock) Busy() bool {
return atomic.LoadUint64(&h.activeWriters) > 0
}
var headPool = sync.Pool{} var headPool = sync.Pool{}
func getHeadAppendBuffer() []refdSample { func getHeadAppendBuffer() []refdSample {
@ -265,6 +268,8 @@ func (a *headAppender) AddFast(ref uint64, t int64, v float64) error {
// sample sequence is valid. // sample sequence is valid.
// We also have to revalidate it as we switch locks an create // We also have to revalidate it as we switch locks an create
// the new series. // the new series.
} else if ref > uint64(len(a.series)) {
return ErrNotFound
} else { } else {
ms := a.series[int(ref)] ms := a.series[int(ref)]
if ms == nil { if ms == nil {