Make HeadBlock impl public, make interface private

This commit is contained in:
Fabian Reinartz 2017-05-12 16:34:41 +02:00
parent 2fa647f50b
commit 5534e6c53c
4 changed files with 42 additions and 41 deletions

View file

@ -48,8 +48,8 @@ type Block interface {
Queryable Queryable
} }
// HeadBlock is a regular block that can still be appended to. // headBlock is a regular block that can still be appended to.
type HeadBlock interface { type headBlock interface {
Block Block
Appendable Appendable
} }

18
db.go
View file

@ -118,7 +118,7 @@ type DB struct {
// or the general layout. // or the general layout.
// Must never be held when acquiring a blocks's mutex! // Must never be held when acquiring a blocks's mutex!
headmtx sync.RWMutex headmtx sync.RWMutex
heads []HeadBlock heads []headBlock
compactor Compactor compactor Compactor
@ -401,7 +401,7 @@ func (db *DB) reloadBlocks() error {
var ( var (
metas []*BlockMeta metas []*BlockMeta
blocks []Block blocks []Block
heads []HeadBlock heads []headBlock
seqBlocks = make(map[int]Block, len(dirs)) seqBlocks = make(map[int]Block, len(dirs))
) )
@ -418,7 +418,7 @@ func (db *DB) reloadBlocks() error {
if meta.Compaction.Generation == 0 { if meta.Compaction.Generation == 0 {
if !ok { if !ok {
b, err = openHeadBlock(dirs[i], db.logger) b, err = OpenHeadBlock(dirs[i], db.logger)
if err != nil { if err != nil {
return errors.Wrapf(err, "load head at %s", dirs[i]) return errors.Wrapf(err, "load head at %s", dirs[i])
} }
@ -426,7 +426,7 @@ func (db *DB) reloadBlocks() error {
if meta.ULID != b.Meta().ULID { if meta.ULID != b.Meta().ULID {
return errors.Errorf("head block ULID changed unexpectedly") return errors.Errorf("head block ULID changed unexpectedly")
} }
heads = append(heads, b.(HeadBlock)) heads = append(heads, b.(headBlock))
} else { } else {
if !ok || meta.ULID != b.Meta().ULID { if !ok || meta.ULID != b.Meta().ULID {
b, err = newPersistedBlock(dirs[i]) b, err = newPersistedBlock(dirs[i])
@ -559,7 +559,7 @@ func (a *dbAppender) appenderFor(t int64) (*metaAppender, error) {
if len(a.heads) == 0 || t >= a.heads[len(a.heads)-1].meta.MaxTime { if len(a.heads) == 0 || t >= a.heads[len(a.heads)-1].meta.MaxTime {
a.db.headmtx.Lock() a.db.headmtx.Lock()
var newHeads []HeadBlock var newHeads []headBlock
if err := a.db.ensureHead(t); err != nil { if err := a.db.ensureHead(t); err != nil {
a.db.headmtx.Unlock() a.db.headmtx.Unlock()
@ -670,9 +670,9 @@ func (a *dbAppender) Rollback() error {
} }
// appendable returns a copy of a slice of HeadBlocks that can still be appended to. // appendable returns a copy of a slice of HeadBlocks that can still be appended to.
func (db *DB) appendable() []HeadBlock { func (db *DB) appendable() []headBlock {
var i int var i int
app := make([]HeadBlock, 0, db.opts.AppendableBlocks) app := make([]headBlock, 0, db.opts.AppendableBlocks)
if len(db.heads) > db.opts.AppendableBlocks { if len(db.heads) > db.opts.AppendableBlocks {
i = len(db.heads) - db.opts.AppendableBlocks i = len(db.heads) - db.opts.AppendableBlocks
@ -711,14 +711,14 @@ func (db *DB) blocksForInterval(mint, maxt int64) []Block {
// cut starts a new head block to append to. The completed head block // cut starts a new head block to append to. The completed head block
// will still be appendable for the configured grace period. // will still be appendable for the configured grace period.
func (db *DB) cut(mint int64) (HeadBlock, error) { func (db *DB) cut(mint int64) (headBlock, error) {
maxt := mint + int64(db.opts.MinBlockDuration) maxt := mint + int64(db.opts.MinBlockDuration)
dir, seq, err := nextSequenceFile(db.dir, "b-") dir, seq, err := nextSequenceFile(db.dir, "b-")
if err != nil { if err != nil {
return nil, err return nil, err
} }
newHead, err := createHeadBlock(dir, seq, db.logger, mint, maxt) newHead, err := CreateHeadBlock(dir, seq, db.logger, mint, maxt)
if err != nil { if err != nil {
return nil, err return nil, err
} }

49
head.go
View file

@ -47,8 +47,8 @@ var (
ErrOutOfBounds = errors.New("out of bounds") ErrOutOfBounds = errors.New("out of bounds")
) )
// headBlock handles reads and writes of time series data within a time window. // HeadBlock handles reads and writes of time series data within a time window.
type headBlock struct { type HeadBlock struct {
mtx sync.RWMutex mtx sync.RWMutex
dir string dir string
wal *WAL wal *WAL
@ -69,7 +69,8 @@ type headBlock struct {
meta BlockMeta meta BlockMeta
} }
func createHeadBlock(dir string, seq int, l log.Logger, mint, maxt int64) (*headBlock, error) { // CreateHeadBlock creates a new head block in dir that holds samples in the range [mint,maxt).
func CreateHeadBlock(dir string, seq int, l log.Logger, mint, maxt int64) (*HeadBlock, error) {
// Make head block creation appear atomic. // Make head block creation appear atomic.
tmp := dir + ".tmp" tmp := dir + ".tmp"
@ -95,11 +96,11 @@ func createHeadBlock(dir string, seq int, l log.Logger, mint, maxt int64) (*head
if err := renameFile(tmp, dir); err != nil { if err := renameFile(tmp, dir); err != nil {
return nil, err return nil, err
} }
return openHeadBlock(dir, l) return OpenHeadBlock(dir, l)
} }
// openHeadBlock creates a new empty head block. // OpenHeadBlock opens the head block in dir.
func openHeadBlock(dir string, l log.Logger) (*headBlock, error) { func OpenHeadBlock(dir string, l log.Logger) (*HeadBlock, error) {
wal, err := OpenWAL(dir, log.With(l, "component", "wal"), 5*time.Second) wal, err := OpenWAL(dir, log.With(l, "component", "wal"), 5*time.Second)
if err != nil { if err != nil {
return nil, err return nil, err
@ -109,7 +110,7 @@ func openHeadBlock(dir string, l log.Logger) (*headBlock, error) {
return nil, err return nil, err
} }
h := &headBlock{ h := &HeadBlock{
dir: dir, dir: dir,
wal: wal, wal: wal,
series: []*memSeries{nil}, // 0 is not a valid posting, filled with nil. series: []*memSeries{nil}, // 0 is not a valid posting, filled with nil.
@ -151,16 +152,16 @@ Outer:
// inBounds returns true if the given timestamp is within the valid // inBounds returns true if the given timestamp is within the valid
// time bounds of the block. // time bounds of the block.
func (h *headBlock) inBounds(t int64) bool { func (h *HeadBlock) inBounds(t int64) bool {
return t >= h.meta.MinTime && t <= h.meta.MaxTime return t >= h.meta.MinTime && t <= h.meta.MaxTime
} }
func (h *headBlock) String() string { func (h *HeadBlock) String() string {
return fmt.Sprintf("(%d, %s)", h.meta.Sequence, h.meta.ULID) return fmt.Sprintf("(%d, %s)", h.meta.Sequence, h.meta.ULID)
} }
// Close syncs all data and closes underlying resources of the head block. // Close syncs all data and closes underlying resources of the head block.
func (h *headBlock) Close() error { func (h *HeadBlock) Close() error {
h.mtx.Lock() h.mtx.Lock()
defer h.mtx.Unlock() defer h.mtx.Unlock()
@ -184,7 +185,7 @@ func (h *headBlock) Close() error {
return nil return nil
} }
func (h *headBlock) Meta() BlockMeta { func (h *HeadBlock) Meta() BlockMeta {
m := BlockMeta{ m := BlockMeta{
ULID: h.meta.ULID, ULID: h.meta.ULID,
Sequence: h.meta.Sequence, Sequence: h.meta.Sequence,
@ -200,12 +201,12 @@ func (h *headBlock) Meta() BlockMeta {
return m return m
} }
func (h *headBlock) Dir() string { return h.dir } func (h *HeadBlock) Dir() string { return h.dir }
func (h *headBlock) Persisted() bool { return false } func (h *HeadBlock) Persisted() bool { return false }
func (h *headBlock) Index() IndexReader { return &headIndexReader{h} } func (h *HeadBlock) Index() IndexReader { return &headIndexReader{h} }
func (h *headBlock) Chunks() ChunkReader { return &headChunkReader{h} } func (h *HeadBlock) Chunks() ChunkReader { return &headChunkReader{h} }
func (h *headBlock) Querier(mint, maxt int64) Querier { func (h *HeadBlock) Querier(mint, maxt int64) Querier {
h.mtx.RLock() h.mtx.RLock()
defer h.mtx.RUnlock() defer h.mtx.RUnlock()
@ -244,7 +245,7 @@ func (h *headBlock) Querier(mint, maxt int64) Querier {
} }
} }
func (h *headBlock) Appender() Appender { func (h *HeadBlock) Appender() Appender {
atomic.AddUint64(&h.activeWriters, 1) atomic.AddUint64(&h.activeWriters, 1)
h.mtx.RLock() h.mtx.RLock()
@ -252,10 +253,10 @@ func (h *headBlock) Appender() Appender {
if h.closed { if h.closed {
panic(fmt.Sprintf("block %s already closed", h.dir)) panic(fmt.Sprintf("block %s already closed", h.dir))
} }
return &headAppender{headBlock: h, samples: getHeadAppendBuffer()} return &headAppender{HeadBlock: h, samples: getHeadAppendBuffer()}
} }
func (h *headBlock) Busy() bool { func (h *HeadBlock) Busy() bool {
return atomic.LoadUint64(&h.activeWriters) > 0 return atomic.LoadUint64(&h.activeWriters) > 0
} }
@ -274,7 +275,7 @@ func putHeadAppendBuffer(b []refdSample) {
} }
type headAppender struct { type headAppender struct {
*headBlock *HeadBlock
newSeries map[uint64]hashedLabels newSeries map[uint64]hashedLabels
newHashes map[uint64]uint64 newHashes map[uint64]uint64
@ -454,7 +455,7 @@ func (a *headAppender) Rollback() error {
} }
type headChunkReader struct { type headChunkReader struct {
*headBlock *HeadBlock
} }
// Chunk returns the chunk for the reference number. // Chunk returns the chunk for the reference number.
@ -490,7 +491,7 @@ func (c *safeChunk) Iterator() chunks.Iterator {
// func (c *safeChunk) Encoding() chunks.Encoding { panic("illegal") } // func (c *safeChunk) Encoding() chunks.Encoding { panic("illegal") }
type headIndexReader struct { type headIndexReader struct {
*headBlock *HeadBlock
} }
// LabelValues returns the possible label values // LabelValues returns the possible label values
@ -558,7 +559,7 @@ func (h *headIndexReader) LabelIndices() ([][]string, error) {
// get retrieves the chunk with the hash and label set and creates // get retrieves the chunk with the hash and label set and creates
// a new one if it doesn't exist yet. // a new one if it doesn't exist yet.
func (h *headBlock) get(hash uint64, lset labels.Labels) *memSeries { func (h *HeadBlock) get(hash uint64, lset labels.Labels) *memSeries {
series := h.hashes[hash] series := h.hashes[hash]
for _, s := range series { for _, s := range series {
@ -569,7 +570,7 @@ func (h *headBlock) get(hash uint64, lset labels.Labels) *memSeries {
return nil return nil
} }
func (h *headBlock) create(hash uint64, lset labels.Labels) *memSeries { func (h *HeadBlock) create(hash uint64, lset labels.Labels) *memSeries {
s := &memSeries{ s := &memSeries{
lset: lset, lset: lset,
ref: uint32(len(h.series)), ref: uint32(len(h.series)),

View file

@ -39,7 +39,7 @@ func BenchmarkCreateSeries(b *testing.B) {
require.NoError(b, err) require.NoError(b, err)
defer os.RemoveAll(dir) defer os.RemoveAll(dir)
h, err := createHeadBlock(dir, 0, nil, 0, 1) h, err := CreateHeadBlock(dir, 0, nil, 0, 1)
require.NoError(b, err) require.NoError(b, err)
b.ReportAllocs() b.ReportAllocs()
@ -93,7 +93,7 @@ func TestAmendDatapointCausesError(t *testing.T) {
tmpdir, _ := ioutil.TempDir("", "test") tmpdir, _ := ioutil.TempDir("", "test")
defer os.RemoveAll(tmpdir) defer os.RemoveAll(tmpdir)
hb, err := createHeadBlock(tmpdir+"/hb", 0, nil, 0, 1000) hb, err := CreateHeadBlock(tmpdir+"/hb", 0, nil, 0, 1000)
require.NoError(t, err, "Error creating head block") require.NoError(t, err, "Error creating head block")
app := hb.Appender() app := hb.Appender()
@ -110,7 +110,7 @@ func TestDuplicateNaNDatapointNoAmendError(t *testing.T) {
tmpdir, _ := ioutil.TempDir("", "test") tmpdir, _ := ioutil.TempDir("", "test")
defer os.RemoveAll(tmpdir) defer os.RemoveAll(tmpdir)
hb, err := createHeadBlock(tmpdir+"/hb", 0, nil, 0, 1000) hb, err := CreateHeadBlock(tmpdir+"/hb", 0, nil, 0, 1000)
require.NoError(t, err, "Error creating head block") require.NoError(t, err, "Error creating head block")
app := hb.Appender() app := hb.Appender()
@ -127,7 +127,7 @@ func TestNonDuplicateNaNDatapointsCausesAmendError(t *testing.T) {
tmpdir, _ := ioutil.TempDir("", "test") tmpdir, _ := ioutil.TempDir("", "test")
defer os.RemoveAll(tmpdir) defer os.RemoveAll(tmpdir)
hb, err := createHeadBlock(tmpdir+"/hb", 0, nil, 0, 1000) hb, err := CreateHeadBlock(tmpdir+"/hb", 0, nil, 0, 1000)
require.NoError(t, err, "Error creating head block") require.NoError(t, err, "Error creating head block")
app := hb.Appender() app := hb.Appender()
@ -144,7 +144,7 @@ func TestSkippingInvalidValuesInSameTxn(t *testing.T) {
tmpdir, _ := ioutil.TempDir("", "test") tmpdir, _ := ioutil.TempDir("", "test")
defer os.RemoveAll(tmpdir) defer os.RemoveAll(tmpdir)
hb, err := createHeadBlock(tmpdir+"/hb", 0, nil, 0, 1000) hb, err := CreateHeadBlock(tmpdir+"/hb", 0, nil, 0, 1000)
require.NoError(t, err) require.NoError(t, err)
// Append AmendedValue. // Append AmendedValue.
@ -246,7 +246,7 @@ func TestHeadBlock_e2e(t *testing.T) {
tmpdir, _ := ioutil.TempDir("", "test") tmpdir, _ := ioutil.TempDir("", "test")
defer os.RemoveAll(tmpdir) defer os.RemoveAll(tmpdir)
hb, err := createHeadBlock(tmpdir+"/hb", 0, nil, minTime, maxTime) hb, err := CreateHeadBlock(tmpdir+"/hb", 0, nil, minTime, maxTime)
require.NoError(t, err) require.NoError(t, err)
app := hb.Appender() app := hb.Appender()