From 5534e6c53c3b7fb408d4e3e0f0af1c02582fe6a3 Mon Sep 17 00:00:00 2001 From: Fabian Reinartz Date: Fri, 12 May 2017 16:34:41 +0200 Subject: [PATCH] Make HeadBlock impl public, make interface private --- block.go | 4 ++-- db.go | 18 +++++++++--------- head.go | 49 +++++++++++++++++++++++++------------------------ head_test.go | 12 ++++++------ 4 files changed, 42 insertions(+), 41 deletions(-) diff --git a/block.go b/block.go index e1ccfd40d3..72ebb1f8a8 100644 --- a/block.go +++ b/block.go @@ -48,8 +48,8 @@ type Block interface { Queryable } -// HeadBlock is a regular block that can still be appended to. -type HeadBlock interface { +// headBlock is a regular block that can still be appended to. +type headBlock interface { Block Appendable } diff --git a/db.go b/db.go index 864b9cfbdc..51c49965c8 100644 --- a/db.go +++ b/db.go @@ -118,7 +118,7 @@ type DB struct { // or the general layout. // Must never be held when acquiring a blocks's mutex! headmtx sync.RWMutex - heads []HeadBlock + heads []headBlock compactor Compactor @@ -401,7 +401,7 @@ func (db *DB) reloadBlocks() error { var ( metas []*BlockMeta blocks []Block - heads []HeadBlock + heads []headBlock seqBlocks = make(map[int]Block, len(dirs)) ) @@ -418,7 +418,7 @@ func (db *DB) reloadBlocks() error { if meta.Compaction.Generation == 0 { if !ok { - b, err = openHeadBlock(dirs[i], db.logger) + b, err = OpenHeadBlock(dirs[i], db.logger) if err != nil { return errors.Wrapf(err, "load head at %s", dirs[i]) } @@ -426,7 +426,7 @@ func (db *DB) reloadBlocks() error { if meta.ULID != b.Meta().ULID { return errors.Errorf("head block ULID changed unexpectedly") } - heads = append(heads, b.(HeadBlock)) + heads = append(heads, b.(headBlock)) } else { if !ok || meta.ULID != b.Meta().ULID { b, err = newPersistedBlock(dirs[i]) @@ -559,7 +559,7 @@ func (a *dbAppender) appenderFor(t int64) (*metaAppender, error) { if len(a.heads) == 0 || t >= a.heads[len(a.heads)-1].meta.MaxTime { a.db.headmtx.Lock() - var newHeads []HeadBlock + var newHeads []headBlock if err := a.db.ensureHead(t); err != nil { a.db.headmtx.Unlock() @@ -670,9 +670,9 @@ func (a *dbAppender) Rollback() error { } // appendable returns a copy of a slice of HeadBlocks that can still be appended to. -func (db *DB) appendable() []HeadBlock { +func (db *DB) appendable() []headBlock { var i int - app := make([]HeadBlock, 0, db.opts.AppendableBlocks) + app := make([]headBlock, 0, db.opts.AppendableBlocks) if len(db.heads) > db.opts.AppendableBlocks { i = len(db.heads) - db.opts.AppendableBlocks @@ -711,14 +711,14 @@ func (db *DB) blocksForInterval(mint, maxt int64) []Block { // cut starts a new head block to append to. The completed head block // will still be appendable for the configured grace period. -func (db *DB) cut(mint int64) (HeadBlock, error) { +func (db *DB) cut(mint int64) (headBlock, error) { maxt := mint + int64(db.opts.MinBlockDuration) dir, seq, err := nextSequenceFile(db.dir, "b-") if err != nil { return nil, err } - newHead, err := createHeadBlock(dir, seq, db.logger, mint, maxt) + newHead, err := CreateHeadBlock(dir, seq, db.logger, mint, maxt) if err != nil { return nil, err } diff --git a/head.go b/head.go index 7d19011dd3..b39385fb64 100644 --- a/head.go +++ b/head.go @@ -47,8 +47,8 @@ var ( ErrOutOfBounds = errors.New("out of bounds") ) -// headBlock handles reads and writes of time series data within a time window. -type headBlock struct { +// HeadBlock handles reads and writes of time series data within a time window. +type HeadBlock struct { mtx sync.RWMutex dir string wal *WAL @@ -69,7 +69,8 @@ type headBlock struct { meta BlockMeta } -func createHeadBlock(dir string, seq int, l log.Logger, mint, maxt int64) (*headBlock, error) { +// CreateHeadBlock creates a new head block in dir that holds samples in the range [mint,maxt). +func CreateHeadBlock(dir string, seq int, l log.Logger, mint, maxt int64) (*HeadBlock, error) { // Make head block creation appear atomic. tmp := dir + ".tmp" @@ -95,11 +96,11 @@ func createHeadBlock(dir string, seq int, l log.Logger, mint, maxt int64) (*head if err := renameFile(tmp, dir); err != nil { return nil, err } - return openHeadBlock(dir, l) + return OpenHeadBlock(dir, l) } -// openHeadBlock creates a new empty head block. -func openHeadBlock(dir string, l log.Logger) (*headBlock, error) { +// OpenHeadBlock opens the head block in dir. +func OpenHeadBlock(dir string, l log.Logger) (*HeadBlock, error) { wal, err := OpenWAL(dir, log.With(l, "component", "wal"), 5*time.Second) if err != nil { return nil, err @@ -109,7 +110,7 @@ func openHeadBlock(dir string, l log.Logger) (*headBlock, error) { return nil, err } - h := &headBlock{ + h := &HeadBlock{ dir: dir, wal: wal, series: []*memSeries{nil}, // 0 is not a valid posting, filled with nil. @@ -151,16 +152,16 @@ Outer: // inBounds returns true if the given timestamp is within the valid // time bounds of the block. -func (h *headBlock) inBounds(t int64) bool { +func (h *HeadBlock) inBounds(t int64) bool { return t >= h.meta.MinTime && t <= h.meta.MaxTime } -func (h *headBlock) String() string { +func (h *HeadBlock) String() string { return fmt.Sprintf("(%d, %s)", h.meta.Sequence, h.meta.ULID) } // Close syncs all data and closes underlying resources of the head block. -func (h *headBlock) Close() error { +func (h *HeadBlock) Close() error { h.mtx.Lock() defer h.mtx.Unlock() @@ -184,7 +185,7 @@ func (h *headBlock) Close() error { return nil } -func (h *headBlock) Meta() BlockMeta { +func (h *HeadBlock) Meta() BlockMeta { m := BlockMeta{ ULID: h.meta.ULID, Sequence: h.meta.Sequence, @@ -200,12 +201,12 @@ func (h *headBlock) Meta() BlockMeta { return m } -func (h *headBlock) Dir() string { return h.dir } -func (h *headBlock) Persisted() bool { return false } -func (h *headBlock) Index() IndexReader { return &headIndexReader{h} } -func (h *headBlock) Chunks() ChunkReader { return &headChunkReader{h} } +func (h *HeadBlock) Dir() string { return h.dir } +func (h *HeadBlock) Persisted() bool { return false } +func (h *HeadBlock) Index() IndexReader { return &headIndexReader{h} } +func (h *HeadBlock) Chunks() ChunkReader { return &headChunkReader{h} } -func (h *headBlock) Querier(mint, maxt int64) Querier { +func (h *HeadBlock) Querier(mint, maxt int64) Querier { h.mtx.RLock() defer h.mtx.RUnlock() @@ -244,7 +245,7 @@ func (h *headBlock) Querier(mint, maxt int64) Querier { } } -func (h *headBlock) Appender() Appender { +func (h *HeadBlock) Appender() Appender { atomic.AddUint64(&h.activeWriters, 1) h.mtx.RLock() @@ -252,10 +253,10 @@ func (h *headBlock) Appender() Appender { if h.closed { panic(fmt.Sprintf("block %s already closed", h.dir)) } - return &headAppender{headBlock: h, samples: getHeadAppendBuffer()} + return &headAppender{HeadBlock: h, samples: getHeadAppendBuffer()} } -func (h *headBlock) Busy() bool { +func (h *HeadBlock) Busy() bool { return atomic.LoadUint64(&h.activeWriters) > 0 } @@ -274,7 +275,7 @@ func putHeadAppendBuffer(b []refdSample) { } type headAppender struct { - *headBlock + *HeadBlock newSeries map[uint64]hashedLabels newHashes map[uint64]uint64 @@ -454,7 +455,7 @@ func (a *headAppender) Rollback() error { } type headChunkReader struct { - *headBlock + *HeadBlock } // Chunk returns the chunk for the reference number. @@ -490,7 +491,7 @@ func (c *safeChunk) Iterator() chunks.Iterator { // func (c *safeChunk) Encoding() chunks.Encoding { panic("illegal") } type headIndexReader struct { - *headBlock + *HeadBlock } // LabelValues returns the possible label values @@ -558,7 +559,7 @@ func (h *headIndexReader) LabelIndices() ([][]string, error) { // get retrieves the chunk with the hash and label set and creates // a new one if it doesn't exist yet. -func (h *headBlock) get(hash uint64, lset labels.Labels) *memSeries { +func (h *HeadBlock) get(hash uint64, lset labels.Labels) *memSeries { series := h.hashes[hash] for _, s := range series { @@ -569,7 +570,7 @@ func (h *headBlock) get(hash uint64, lset labels.Labels) *memSeries { return nil } -func (h *headBlock) create(hash uint64, lset labels.Labels) *memSeries { +func (h *HeadBlock) create(hash uint64, lset labels.Labels) *memSeries { s := &memSeries{ lset: lset, ref: uint32(len(h.series)), diff --git a/head_test.go b/head_test.go index f7d9b19f8c..45b4f49d0f 100644 --- a/head_test.go +++ b/head_test.go @@ -39,7 +39,7 @@ func BenchmarkCreateSeries(b *testing.B) { require.NoError(b, err) defer os.RemoveAll(dir) - h, err := createHeadBlock(dir, 0, nil, 0, 1) + h, err := CreateHeadBlock(dir, 0, nil, 0, 1) require.NoError(b, err) b.ReportAllocs() @@ -93,7 +93,7 @@ func TestAmendDatapointCausesError(t *testing.T) { tmpdir, _ := ioutil.TempDir("", "test") defer os.RemoveAll(tmpdir) - hb, err := createHeadBlock(tmpdir+"/hb", 0, nil, 0, 1000) + hb, err := CreateHeadBlock(tmpdir+"/hb", 0, nil, 0, 1000) require.NoError(t, err, "Error creating head block") app := hb.Appender() @@ -110,7 +110,7 @@ func TestDuplicateNaNDatapointNoAmendError(t *testing.T) { tmpdir, _ := ioutil.TempDir("", "test") defer os.RemoveAll(tmpdir) - hb, err := createHeadBlock(tmpdir+"/hb", 0, nil, 0, 1000) + hb, err := CreateHeadBlock(tmpdir+"/hb", 0, nil, 0, 1000) require.NoError(t, err, "Error creating head block") app := hb.Appender() @@ -127,7 +127,7 @@ func TestNonDuplicateNaNDatapointsCausesAmendError(t *testing.T) { tmpdir, _ := ioutil.TempDir("", "test") defer os.RemoveAll(tmpdir) - hb, err := createHeadBlock(tmpdir+"/hb", 0, nil, 0, 1000) + hb, err := CreateHeadBlock(tmpdir+"/hb", 0, nil, 0, 1000) require.NoError(t, err, "Error creating head block") app := hb.Appender() @@ -144,7 +144,7 @@ func TestSkippingInvalidValuesInSameTxn(t *testing.T) { tmpdir, _ := ioutil.TempDir("", "test") defer os.RemoveAll(tmpdir) - hb, err := createHeadBlock(tmpdir+"/hb", 0, nil, 0, 1000) + hb, err := CreateHeadBlock(tmpdir+"/hb", 0, nil, 0, 1000) require.NoError(t, err) // Append AmendedValue. @@ -246,7 +246,7 @@ func TestHeadBlock_e2e(t *testing.T) { tmpdir, _ := ioutil.TempDir("", "test") defer os.RemoveAll(tmpdir) - hb, err := createHeadBlock(tmpdir+"/hb", 0, nil, minTime, maxTime) + hb, err := CreateHeadBlock(tmpdir+"/hb", 0, nil, minTime, maxTime) require.NoError(t, err) app := hb.Appender()