vendor: update tsdb

This commit is contained in:
Fabian Reinartz 2017-03-20 14:07:25 +01:00
parent c389193b37
commit fc2e56c13f
8 changed files with 192 additions and 121 deletions

View file

@ -2,6 +2,7 @@ package tsdb
import ( import (
"encoding/json" "encoding/json"
"fmt"
"io/ioutil" "io/ioutil"
"os" "os"
"path/filepath" "path/filepath"
@ -11,8 +12,8 @@ import (
"github.com/pkg/errors" "github.com/pkg/errors"
) )
// Block handles reads against a Block of time series data. // DiskBlock handles reads against a Block of time series data.
type Block interface { type DiskBlock interface {
// Directory where block data is stored. // Directory where block data is stored.
Dir() string Dir() string
@ -29,6 +30,32 @@ type Block interface {
Close() error Close() error
} }
// Block is an interface to a DiskBlock that can also be queried.
type Block interface {
DiskBlock
Queryable
}
// HeadBlock is a regular block that can still be appended to.
type HeadBlock interface {
Block
Appendable
}
// Appendable defines an entity to which data can be appended.
type Appendable interface {
// Appender returns a new Appender against an underlying store.
Appender() Appender
// Busy returns whether there are any currently active appenders.
Busy() bool
}
// Queryable defines an entity which provides a Querier.
type Queryable interface {
Querier(mint, maxt int64) Querier
}
// BlockMeta provides meta information about a block. // BlockMeta provides meta information about a block.
type BlockMeta struct { type BlockMeta struct {
// Unique identifier for the block and its contents. Changes on compaction. // Unique identifier for the block and its contents. Changes on compaction.
@ -60,14 +87,6 @@ const (
flagStd = 1 flagStd = 1
) )
type persistedBlock struct {
dir string
meta BlockMeta
chunkr *chunkReader
indexr *indexReader
}
type blockMeta struct { type blockMeta struct {
Version int `json:"version"` Version int `json:"version"`
@ -115,6 +134,14 @@ func writeMetaFile(dir string, meta *BlockMeta) error {
return renameFile(tmp, path) return renameFile(tmp, path)
} }
type persistedBlock struct {
dir string
meta BlockMeta
chunkr *chunkReader
indexr *indexReader
}
func newPersistedBlock(dir string) (*persistedBlock, error) { func newPersistedBlock(dir string) (*persistedBlock, error) {
meta, err := readMetaFile(dir) meta, err := readMetaFile(dir)
if err != nil { if err != nil {
@ -148,6 +175,19 @@ func (pb *persistedBlock) Close() error {
return merr.Err() return merr.Err()
} }
func (pb *persistedBlock) String() string {
return fmt.Sprintf("(%d, %s)", pb.meta.Sequence, pb.meta.ULID)
}
func (pb *persistedBlock) Querier(mint, maxt int64) Querier {
return &blockQuerier{
mint: mint,
maxt: maxt,
index: pb.Index(),
chunks: pb.Chunks(),
}
}
func (pb *persistedBlock) Dir() string { return pb.dir } func (pb *persistedBlock) Dir() string { return pb.dir }
func (pb *persistedBlock) Index() IndexReader { return pb.indexr } func (pb *persistedBlock) Index() IndexReader { return pb.indexr }
func (pb *persistedBlock) Chunks() ChunkReader { return pb.chunkr } func (pb *persistedBlock) Chunks() ChunkReader { return pb.chunkr }

View file

@ -15,7 +15,7 @@ import (
) )
const ( const (
// MagicSeries 4 bytes at the head of series file. // MagicChunks 4 bytes at the head of series file.
MagicChunks = 0x85BD40DD MagicChunks = 0x85BD40DD
) )

View file

@ -1,6 +1,7 @@
package tsdb package tsdb
import ( import (
"fmt"
"math/rand" "math/rand"
"os" "os"
"path/filepath" "path/filepath"
@ -8,6 +9,7 @@ import (
"github.com/coreos/etcd/pkg/fileutil" "github.com/coreos/etcd/pkg/fileutil"
"github.com/fabxc/tsdb/labels" "github.com/fabxc/tsdb/labels"
"github.com/go-kit/kit/log"
"github.com/oklog/ulid" "github.com/oklog/ulid"
"github.com/pkg/errors" "github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
@ -32,6 +34,7 @@ type Compactor interface {
// compactor implements the Compactor interface. // compactor implements the Compactor interface.
type compactor struct { type compactor struct {
metrics *compactorMetrics metrics *compactorMetrics
logger log.Logger
opts *compactorOptions opts *compactorOptions
} }
@ -71,9 +74,10 @@ type compactorOptions struct {
maxBlockRange uint64 maxBlockRange uint64
} }
func newCompactor(r prometheus.Registerer, opts *compactorOptions) *compactor { func newCompactor(r prometheus.Registerer, l log.Logger, opts *compactorOptions) *compactor {
return &compactor{ return &compactor{
opts: opts, opts: opts,
logger: l,
metrics: newCompactorMetrics(r), metrics: newCompactorMetrics(r),
} }
} }
@ -178,6 +182,8 @@ func (c *compactor) Write(dir string, b Block) error {
// write creates a new block that is the union of the provided blocks into dir. // write creates a new block that is the union of the provided blocks into dir.
// It cleans up all files of the old blocks after completing successfully. // It cleans up all files of the old blocks after completing successfully.
func (c *compactor) write(dir string, blocks ...Block) (err error) { func (c *compactor) write(dir string, blocks ...Block) (err error) {
c.logger.Log("msg", "compact blocks", "blocks", fmt.Sprintf("%v", blocks))
defer func(t time.Time) { defer func(t time.Time) {
if err != nil { if err != nil {
c.metrics.failed.Inc() c.metrics.failed.Inc()

173
vendor/github.com/fabxc/tsdb/db.go generated vendored
View file

@ -11,7 +11,6 @@ import (
"strconv" "strconv"
"strings" "strings"
"sync" "sync"
"sync/atomic"
"time" "time"
"unsafe" "unsafe"
@ -94,15 +93,13 @@ type DB struct {
// Mutex for that must be held when modifying the general // Mutex for that must be held when modifying the general
// block layout. // block layout.
mtx sync.RWMutex mtx sync.RWMutex
persisted []*persistedBlock blocks []Block
seqBlocks map[int]Block
// Mutex that must be held when modifying just the head blocks // Mutex that must be held when modifying just the head blocks
// or the general layout. // or the general layout.
headmtx sync.RWMutex headmtx sync.RWMutex
heads []*headBlock heads []HeadBlock
headGen uint8
compactor Compactor compactor Compactor
@ -177,7 +174,7 @@ func Open(dir string, l log.Logger, r prometheus.Registerer, opts *Options) (db
donec: make(chan struct{}), donec: make(chan struct{}),
stopc: make(chan struct{}), stopc: make(chan struct{}),
} }
db.compactor = newCompactor(r, &compactorOptions{ db.compactor = newCompactor(r, l, &compactorOptions{
maxBlockRange: opts.MaxBlockDuration, maxBlockRange: opts.MaxBlockDuration,
}) })
@ -205,19 +202,20 @@ func (db *DB) run() {
case <-db.compactc: case <-db.compactc:
db.metrics.compactionsTriggered.Inc() db.metrics.compactionsTriggered.Inc()
var merr MultiError
changes1, err := db.retentionCutoff() changes1, err := db.retentionCutoff()
merr.Add(err) if err != nil {
db.logger.Log("msg", "retention cutoff failed", "err", err)
}
changes2, err := db.compact() changes2, err := db.compact()
merr.Add(err) if err != nil {
db.logger.Log("msg", "compaction failed", "err", err)
}
if changes1 || changes2 { if changes1 || changes2 {
merr.Add(db.reloadBlocks()) if err := db.reloadBlocks(); err != nil {
} db.logger.Log("msg", "reloading blocks failed", "err", err)
if err := merr.Err(); err != nil { }
db.logger.Log("msg", "compaction failed", "err", err)
} }
case <-db.stopc: case <-db.stopc:
@ -234,13 +232,16 @@ func (db *DB) retentionCutoff() (bool, error) {
db.mtx.RLock() db.mtx.RLock()
defer db.mtx.RUnlock() defer db.mtx.RUnlock()
db.headmtx.RLock()
defer db.headmtx.RUnlock()
// We don't count the span covered by head blocks towards the // We don't count the span covered by head blocks towards the
// retention time as it generally makes up a fraction of it. // retention time as it generally makes up a fraction of it.
if len(db.persisted) == 0 { if len(db.blocks)-len(db.heads) == 0 {
return false, nil return false, nil
} }
last := db.persisted[len(db.persisted)-1] last := db.blocks[len(db.blocks)-len(db.heads)-1]
mint := last.Meta().MaxTime - int64(db.opts.RetentionDuration) mint := last.Meta().MaxTime - int64(db.opts.RetentionDuration)
return retentionCutoff(db.dir, mint) return retentionCutoff(db.dir, mint)
@ -251,7 +252,7 @@ func (db *DB) compact() (changes bool, err error) {
// Check whether we have pending head blocks that are ready to be persisted. // Check whether we have pending head blocks that are ready to be persisted.
// They have the highest priority. // They have the highest priority.
var singles []*headBlock var singles []Block
// Collect head blocks that are ready for compaction. Write them after // Collect head blocks that are ready for compaction. Write them after
// returning the lock to not block Appenders. // returning the lock to not block Appenders.
@ -262,7 +263,7 @@ func (db *DB) compact() (changes bool, err error) {
// Blocks that won't be appendable when instantiating a new appender // Blocks that won't be appendable when instantiating a new appender
// might still have active appenders on them. // might still have active appenders on them.
// Abort at the first one we encounter. // Abort at the first one we encounter.
if atomic.LoadUint64(&h.activeWriters) > 0 { if h.Busy() {
break break
} }
singles = append(singles, h) singles = append(singles, h)
@ -271,13 +272,10 @@ func (db *DB) compact() (changes bool, err error) {
db.headmtx.RUnlock() db.headmtx.RUnlock()
Loop:
for _, h := range singles { for _, h := range singles {
db.logger.Log("msg", "write head", "seq", h.Meta().Sequence)
select { select {
case <-db.stopc: case <-db.stopc:
break Loop return changes, nil
default: default:
} }
@ -296,16 +294,15 @@ Loop:
select { select {
case <-db.stopc: case <-db.stopc:
return false, nil return changes, nil
default: default:
} }
// We just execute compactions sequentially to not cause too extreme // We just execute compactions sequentially to not cause too extreme
// CPU and memory spikes. // CPU and memory spikes.
// TODO(fabxc): return more descriptive plans in the future that allow // TODO(fabxc): return more descriptive plans in the future that allow
// estimation of resource usage and conditional parallelization? // estimation of resource usage and conditional parallelization?
for _, p := range plans { for _, p := range plans {
db.logger.Log("msg", "compact blocks", "seq", fmt.Sprintf("%v", p))
if err := db.compactor.Compact(p...); err != nil { if err := db.compactor.Compact(p...); err != nil {
return changes, errors.Wrapf(err, "compact %s", p) return changes, errors.Wrapf(err, "compact %s", p)
} }
@ -323,6 +320,10 @@ Loop:
// retentionCutoff deletes all directories of blocks in dir that are strictly // retentionCutoff deletes all directories of blocks in dir that are strictly
// before mint. // before mint.
func retentionCutoff(dir string, mint int64) (bool, error) { func retentionCutoff(dir string, mint int64) (bool, error) {
df, err := fileutil.OpenDir(dir)
if err != nil {
return false, errors.Wrapf(err, "open directory")
}
dirs, err := blockDirs(dir) dirs, err := blockDirs(dir)
if err != nil { if err != nil {
return false, errors.Wrapf(err, "list block dirs %s", dir) return false, errors.Wrapf(err, "list block dirs %s", dir)
@ -347,7 +348,16 @@ func retentionCutoff(dir string, mint int64) (bool, error) {
} }
} }
return changes, nil return changes, fileutil.Fsync(df)
}
func (db *DB) seqBlock(i int) (Block, bool) {
for _, b := range db.blocks {
if b.Meta().Sequence == i {
return b, true
}
}
return nil, false
} }
func (db *DB) reloadBlocks() error { func (db *DB) reloadBlocks() error {
@ -366,8 +376,8 @@ func (db *DB) reloadBlocks() error {
} }
var ( var (
metas []*BlockMeta metas []*BlockMeta
persisted []*persistedBlock blocks []Block
heads []*headBlock heads []HeadBlock
seqBlocks = make(map[int]Block, len(dirs)) seqBlocks = make(map[int]Block, len(dirs))
) )
@ -380,7 +390,7 @@ func (db *DB) reloadBlocks() error {
} }
for i, meta := range metas { for i, meta := range metas {
b, ok := db.seqBlocks[meta.Sequence] b, ok := db.seqBlock(meta.Sequence)
if meta.Compaction.Generation == 0 { if meta.Compaction.Generation == 0 {
if !ok { if !ok {
@ -392,7 +402,7 @@ func (db *DB) reloadBlocks() error {
if meta.ULID != b.Meta().ULID { if meta.ULID != b.Meta().ULID {
return errors.Errorf("head block ULID changed unexpectedly") return errors.Errorf("head block ULID changed unexpectedly")
} }
heads = append(heads, b.(*headBlock)) heads = append(heads, b.(HeadBlock))
} else { } else {
if !ok || meta.ULID != b.Meta().ULID { if !ok || meta.ULID != b.Meta().ULID {
b, err = newPersistedBlock(dirs[i]) b, err = newPersistedBlock(dirs[i])
@ -400,22 +410,21 @@ func (db *DB) reloadBlocks() error {
return errors.Wrapf(err, "open persisted block %s", dirs[i]) return errors.Wrapf(err, "open persisted block %s", dirs[i])
} }
} }
persisted = append(persisted, b.(*persistedBlock))
} }
seqBlocks[meta.Sequence] = b seqBlocks[meta.Sequence] = b
blocks = append(blocks, b)
} }
// Close all blocks that we no longer need. They are closed after returning all // Close all blocks that we no longer need. They are closed after returning all
// locks to avoid questionable locking order. // locks to avoid questionable locking order.
for seq, b := range db.seqBlocks { for _, b := range db.blocks {
if nb, ok := seqBlocks[seq]; !ok || nb != b { if nb := seqBlocks[b.Meta().Sequence]; nb != b {
cs = append(cs, b) cs = append(cs, b)
} }
} }
db.seqBlocks = seqBlocks db.blocks = blocks
db.persisted = persisted
db.heads = heads db.heads = heads
return nil return nil
@ -431,12 +440,10 @@ func (db *DB) Close() error {
var g errgroup.Group var g errgroup.Group
for _, pb := range db.persisted { // blocks also contains all head blocks.
for _, pb := range db.blocks {
g.Go(pb.Close) g.Go(pb.Close)
} }
for _, hb := range db.heads {
g.Go(hb.Close)
}
var merr MultiError var merr MultiError
@ -454,54 +461,59 @@ func (db *DB) Appender() Appender {
// Only instantiate appender after returning the headmtx to avoid // Only instantiate appender after returning the headmtx to avoid
// questionable locking order. // questionable locking order.
db.headmtx.RLock() db.headmtx.RLock()
app := db.appendable() app := db.appendable()
heads := make([]*headBlock, len(app))
copy(heads, app)
db.headmtx.RUnlock() db.headmtx.RUnlock()
for _, b := range heads { for _, b := range app {
a.heads = append(a.heads, b.Appender().(*headAppender)) a.heads = append(a.heads, &metaAppender{
meta: b.Meta(),
app: b.Appender().(*headAppender),
})
} }
return a return a
} }
type dbAppender struct { type dbAppender struct {
db *DB db *DB
heads []*headAppender heads []*metaAppender
samples int samples int
} }
type metaAppender struct {
meta BlockMeta
app Appender
}
func (a *dbAppender) Add(lset labels.Labels, t int64, v float64) (uint64, error) { func (a *dbAppender) Add(lset labels.Labels, t int64, v float64) (uint64, error) {
h, err := a.appenderFor(t) h, err := a.appenderFor(t)
if err != nil { if err != nil {
return 0, err return 0, err
} }
ref, err := h.Add(lset, t, v) ref, err := h.app.Add(lset, t, v)
if err != nil { if err != nil {
return 0, err return 0, err
} }
a.samples++ a.samples++
return ref | (uint64(h.generation) << 40), nil // Store last byte of sequence number in 3rd byte of refernece.
return ref | (uint64(h.meta.Sequence^0xff) << 40), nil
} }
func (a *dbAppender) AddFast(ref uint64, t int64, v float64) error { func (a *dbAppender) AddFast(ref uint64, t int64, v float64) error {
// We store the head generation in the 4th byte and use it to reject // Load the head last byte of the head sequence from the 3rd byte of the
// stale references. // reference number.
gen := uint8((ref << 16) >> 56) gen := (ref << 16) >> 56
h, err := a.appenderFor(t) h, err := a.appenderFor(t)
if err != nil { if err != nil {
return err return err
} }
// If the reference pointed into a previous block, we cannot // If the last byte of the sequence does not add up, the reference is not valid.
// use it to append the sample. if uint64(h.meta.Sequence^0xff) != gen {
if h.generation != gen {
return ErrNotFound return ErrNotFound
} }
if err := h.AddFast(ref, t, v); err != nil { if err := h.app.AddFast(ref, t, v); err != nil {
return err return err
} }
@ -511,12 +523,12 @@ func (a *dbAppender) AddFast(ref uint64, t int64, v float64) error {
// appenderFor gets the appender for the head containing timestamp t. // appenderFor gets the appender for the head containing timestamp t.
// If the head block doesn't exist yet, it gets created. // If the head block doesn't exist yet, it gets created.
func (a *dbAppender) appenderFor(t int64) (*headAppender, error) { func (a *dbAppender) appenderFor(t int64) (*metaAppender, error) {
// If there's no fitting head block for t, ensure it gets created. // If there's no fitting head block for t, ensure it gets created.
if len(a.heads) == 0 || t >= a.heads[len(a.heads)-1].meta.MaxTime { if len(a.heads) == 0 || t >= a.heads[len(a.heads)-1].meta.MaxTime {
a.db.headmtx.Lock() a.db.headmtx.Lock()
var newHeads []*headBlock var newHeads []HeadBlock
if err := a.db.ensureHead(t); err != nil { if err := a.db.ensureHead(t); err != nil {
a.db.headmtx.Unlock() a.db.headmtx.Unlock()
@ -527,7 +539,7 @@ func (a *dbAppender) appenderFor(t int64) (*headAppender, error) {
} else { } else {
maxSeq := a.heads[len(a.heads)-1].meta.Sequence maxSeq := a.heads[len(a.heads)-1].meta.Sequence
for _, b := range a.db.appendable() { for _, b := range a.db.appendable() {
if b.meta.Sequence > maxSeq { if b.Meta().Sequence > maxSeq {
newHeads = append(newHeads, b) newHeads = append(newHeads, b)
} }
} }
@ -538,7 +550,10 @@ func (a *dbAppender) appenderFor(t int64) (*headAppender, error) {
// Instantiate appenders after returning headmtx to avoid questionable // Instantiate appenders after returning headmtx to avoid questionable
// locking order. // locking order.
for _, b := range newHeads { for _, b := range newHeads {
a.heads = append(a.heads, b.Appender().(*headAppender)) a.heads = append(a.heads, &metaAppender{
app: b.Appender(),
meta: b.Meta(),
})
} }
} }
for i := len(a.heads) - 1; i >= 0; i-- { for i := len(a.heads) - 1; i >= 0; i-- {
@ -565,11 +580,12 @@ func (db *DB) ensureHead(t int64) error {
for { for {
h := db.heads[len(db.heads)-1] h := db.heads[len(db.heads)-1]
m := h.Meta()
// If t doesn't exceed the range of heads blocks, there's nothing to do. // If t doesn't exceed the range of heads blocks, there's nothing to do.
if t < h.meta.MaxTime { if t < m.MaxTime {
return nil return nil
} }
if _, err := db.cut(h.meta.MaxTime); err != nil { if _, err := db.cut(m.MaxTime); err != nil {
return err return err
} }
} }
@ -579,7 +595,7 @@ func (a *dbAppender) Commit() error {
var merr MultiError var merr MultiError
for _, h := range a.heads { for _, h := range a.heads {
merr.Add(h.Commit()) merr.Add(h.app.Commit())
} }
a.db.mtx.RUnlock() a.db.mtx.RUnlock()
@ -593,18 +609,22 @@ func (a *dbAppender) Rollback() error {
var merr MultiError var merr MultiError
for _, h := range a.heads { for _, h := range a.heads {
merr.Add(h.Rollback()) merr.Add(h.app.Rollback())
} }
a.db.mtx.RUnlock() a.db.mtx.RUnlock()
return merr.Err() return merr.Err()
} }
func (db *DB) appendable() []*headBlock { // appendable returns a copy of a slice of HeadBlocks that can still be appended to.
if len(db.heads) <= db.opts.AppendableBlocks { func (db *DB) appendable() []HeadBlock {
return db.heads var i int
app := make([]HeadBlock, 0, db.opts.AppendableBlocks)
if len(db.heads) > db.opts.AppendableBlocks {
i = len(db.heads) - db.opts.AppendableBlocks
} }
return db.heads[len(db.heads)-db.opts.AppendableBlocks:] return append(app, db.heads[i:]...)
} }
func intervalOverlap(amin, amax, bmin, bmax int64) bool { func intervalOverlap(amin, amax, bmin, bmax int64) bool {
@ -626,13 +646,7 @@ func intervalContains(min, max, t int64) bool {
func (db *DB) blocksForInterval(mint, maxt int64) []Block { func (db *DB) blocksForInterval(mint, maxt int64) []Block {
var bs []Block var bs []Block
for _, b := range db.persisted { for _, b := range db.blocks {
m := b.Meta()
if intervalOverlap(mint, maxt, m.MinTime, m.MaxTime) {
bs = append(bs, b)
}
}
for _, b := range db.heads {
m := b.Meta() m := b.Meta()
if intervalOverlap(mint, maxt, m.MinTime, m.MaxTime) { if intervalOverlap(mint, maxt, m.MinTime, m.MaxTime) {
bs = append(bs, b) bs = append(bs, b)
@ -644,7 +658,7 @@ func (db *DB) blocksForInterval(mint, maxt int64) []Block {
// cut starts a new head block to append to. The completed head block // cut starts a new head block to append to. The completed head block
// will still be appendable for the configured grace period. // will still be appendable for the configured grace period.
func (db *DB) cut(mint int64) (*headBlock, error) { func (db *DB) cut(mint int64) (HeadBlock, error) {
maxt := mint + int64(db.opts.MinBlockDuration) maxt := mint + int64(db.opts.MinBlockDuration)
dir, seq, err := nextSequenceFile(db.dir, "b-") dir, seq, err := nextSequenceFile(db.dir, "b-")
@ -656,11 +670,8 @@ func (db *DB) cut(mint int64) (*headBlock, error) {
return nil, err return nil, err
} }
db.blocks = append(db.blocks, newHead)
db.heads = append(db.heads, newHead) db.heads = append(db.heads, newHead)
db.seqBlocks[seq] = newHead
db.headGen++
newHead.generation = db.headGen
select { select {
case db.compactc <- struct{}{}: case db.compactc <- struct{}{}:

33
vendor/github.com/fabxc/tsdb/head.go generated vendored
View file

@ -36,10 +36,9 @@ var (
// headBlock handles reads and writes of time series data within a time window. // headBlock handles reads and writes of time series data within a time window.
type headBlock struct { type headBlock struct {
mtx sync.RWMutex mtx sync.RWMutex
dir string dir string
generation uint8 wal *WAL
wal *WAL
activeWriters uint64 activeWriters uint64
closed bool closed bool
@ -136,6 +135,10 @@ func (h *headBlock) inBounds(t int64) bool {
return t >= h.meta.MinTime && t <= h.meta.MaxTime return t >= h.meta.MinTime && t <= h.meta.MaxTime
} }
func (h *headBlock) String() string {
return fmt.Sprintf("(%d, %s)", h.meta.Sequence, h.meta.ULID)
}
// Close syncs all data and closes underlying resources of the head block. // Close syncs all data and closes underlying resources of the head block.
func (h *headBlock) Close() error { func (h *headBlock) Close() error {
h.mtx.Lock() h.mtx.Lock()
@ -173,6 +176,22 @@ func (h *headBlock) Persisted() bool { return false }
func (h *headBlock) Index() IndexReader { return &headIndexReader{h} } func (h *headBlock) Index() IndexReader { return &headIndexReader{h} }
func (h *headBlock) Chunks() ChunkReader { return &headChunkReader{h} } func (h *headBlock) Chunks() ChunkReader { return &headChunkReader{h} }
func (h *headBlock) Querier(mint, maxt int64) Querier {
h.mtx.RLock()
defer h.mtx.RUnlock()
if h.closed {
panic(fmt.Sprintf("block %s already closed", h.dir))
}
return &blockQuerier{
mint: mint,
maxt: maxt,
index: h.Index(),
chunks: h.Chunks(),
postingsMapper: h.remapPostings,
}
}
func (h *headBlock) Appender() Appender { func (h *headBlock) Appender() Appender {
atomic.AddUint64(&h.activeWriters, 1) atomic.AddUint64(&h.activeWriters, 1)
@ -184,6 +203,10 @@ func (h *headBlock) Appender() Appender {
return &headAppender{headBlock: h, samples: getHeadAppendBuffer()} return &headAppender{headBlock: h, samples: getHeadAppendBuffer()}
} }
func (h *headBlock) Busy() bool {
return atomic.LoadUint64(&h.activeWriters) > 0
}
var headPool = sync.Pool{} var headPool = sync.Pool{}
func getHeadAppendBuffer() []refdSample { func getHeadAppendBuffer() []refdSample {
@ -265,6 +288,8 @@ func (a *headAppender) AddFast(ref uint64, t int64, v float64) error {
// sample sequence is valid. // sample sequence is valid.
// We also have to revalidate it as we switch locks an create // We also have to revalidate it as we switch locks an create
// the new series. // the new series.
} else if ref > uint64(len(a.series)) {
return ErrNotFound
} else { } else {
ms := a.series[int(ref)] ms := a.series[int(ref)]
if ms == nil { if ms == nil {

View file

@ -567,7 +567,10 @@ func (r *indexReader) LabelValues(names ...string) (StringTuples, error) {
key := strings.Join(names, string(sep)) key := strings.Join(names, string(sep))
off, ok := r.labels[key] off, ok := r.labels[key]
if !ok { if !ok {
return nil, fmt.Errorf("label index doesn't exist") // XXX(fabxc): hot fix. Should return a partial data error and handle cases
// where the entire block has no data gracefully.
return emptyStringTuples{}, nil
//return nil, fmt.Errorf("label index doesn't exist")
} }
flag, b, err := r.section(off) flag, b, err := r.section(off)
@ -590,6 +593,11 @@ func (r *indexReader) LabelValues(names ...string) (StringTuples, error) {
return st, nil return st, nil
} }
type emptyStringTuples struct{}
func (emptyStringTuples) At(i int) ([]string, error) { return nil, nil }
func (emptyStringTuples) Len() int { return 0 }
func (r *indexReader) LabelIndices() ([][]string, error) { func (r *indexReader) LabelIndices() ([][]string, error) {
res := [][]string{} res := [][]string{}

View file

@ -55,27 +55,8 @@ func (s *DB) Querier(mint, maxt int64) Querier {
blocks: make([]Querier, 0, len(blocks)), blocks: make([]Querier, 0, len(blocks)),
db: s, db: s,
} }
for _, b := range blocks { for _, b := range blocks {
q := &blockQuerier{ sq.blocks = append(sq.blocks, b.Querier(mint, maxt))
mint: mint,
maxt: maxt,
index: b.Index(),
chunks: b.Chunks(),
}
// TODO(fabxc): find nicer solution.
if hb, ok := b.(*headBlock); ok {
// TODO(fabxc): temporary refactored.
hb.mtx.RLock()
if hb.closed {
panic(fmt.Sprintf("block %s already closed", hb.dir))
}
hb.mtx.RUnlock()
q.postingsMapper = hb.remapPostings
}
sq.blocks = append(sq.blocks, q)
} }
return sq return sq

6
vendor/vendor.json vendored
View file

@ -368,10 +368,10 @@
"revisionTime": "2016-09-30T00:14:02Z" "revisionTime": "2016-09-30T00:14:02Z"
}, },
{ {
"checksumSHA1": "JeYYg27cZpCWZYwYOm7r+UnUR2o=", "checksumSHA1": "8wTICzej/k4pCcYtSw+fmD6oZZE=",
"path": "github.com/fabxc/tsdb", "path": "github.com/fabxc/tsdb",
"revision": "863d38dfeebaceb69ce57cbba862102e10222256", "revision": "2ef3682560a31bd03f0ba70eb6ec509512ad0de8",
"revisionTime": "2017-03-17T14:56:19Z" "revisionTime": "2017-03-20T10:37:06Z"
}, },
{ {
"checksumSHA1": "uVzWuLvF646YjiKomsc2CR1ua58=", "checksumSHA1": "uVzWuLvF646YjiKomsc2CR1ua58=",