From df7db4ac0767cf9c58e64f3af817ec80ac4fd61c Mon Sep 17 00:00:00 2001 From: Goutham Veeramachaneni Date: Sun, 19 Mar 2017 21:03:09 +0530 Subject: [PATCH 1/5] Update kit/log To New API NewContext has been removed couple of weeks back. Ref: https://github.com/go-kit/kit/releases/tag/v0.4.0 --- db.go | 2 +- head.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/db.go b/db.go index 4de657a397..a89665e273 100644 --- a/db.go +++ b/db.go @@ -157,7 +157,7 @@ func Open(dir string, l log.Logger, r prometheus.Registerer, opts *Options) (db if l == nil { l = log.NewLogfmtLogger(os.Stdout) - l = log.NewContext(l).With("ts", log.DefaultTimestampUTC, "caller", log.DefaultCaller) + l = log.With(l, "ts", log.DefaultTimestampUTC, "caller", log.DefaultCaller) } if opts == nil { diff --git a/head.go b/head.go index eed2ff2222..4bebb379c6 100644 --- a/head.go +++ b/head.go @@ -86,7 +86,7 @@ func createHeadBlock(dir string, seq int, l log.Logger, mint, maxt int64) (*head // openHeadBlock creates a new empty head block. func openHeadBlock(dir string, l log.Logger) (*headBlock, error) { - wal, err := OpenWAL(dir, log.NewContext(l).With("component", "wal"), 5*time.Second) + wal, err := OpenWAL(dir, log.With(l, "component", "wal"), 5*time.Second) if err != nil { return nil, err } From 11be2cc585ef5a9825d8cf695ad4f216a5fc8870 Mon Sep 17 00:00:00 2001 From: Fabian Reinartz Date: Mon, 20 Mar 2017 08:41:56 +0100 Subject: [PATCH 2/5] Add composed Block interfaces, remove head generation This adds more lower-leve interfaces which are used to compose to different Block interfaces. The DB only uses interfaces instead of explicit persistedBlock and headBlock. The headBlock generation property is dropped as the use-case can be implemented using block sequence numbers. --- block.go | 30 +++++++++++- chunks.go | 2 +- db.go | 134 +++++++++++++++++++++++++++++------------------------- head.go | 13 ++++-- 4 files changed, 109 insertions(+), 70 deletions(-) diff --git a/block.go b/block.go index 9bb5d6ebb8..3e7b5ad247 100644 --- a/block.go +++ b/block.go @@ -11,8 +11,8 @@ import ( "github.com/pkg/errors" ) -// Block handles reads against a Block of time series data. -type Block interface { +// DiskBlock handles reads against a Block of time series data. +type DiskBlock interface { // Directory where block data is stored. Dir() string @@ -29,6 +29,32 @@ type Block interface { Close() error } +// Block is an interface to a DiskBlock that can also be queried. +type Block interface { + DiskBlock + // Queryable +} + +// HeadBlock is a regular block that can still be appended to. +type HeadBlock interface { + DiskBlock + Appendable +} + +// Appendable defines an entity to which data can be appended. +type Appendable interface { + // Appender returns a new Appender against an underlying store. + Appender() Appender + + // Busy returns whether there are any currently active appenders. + Busy() bool +} + +// Queryable defines an entity which provides a Querier. +type Queryable interface { + Queryable() Querier +} + // BlockMeta provides meta information about a block. type BlockMeta struct { // Unique identifier for the block and its contents. Changes on compaction. diff --git a/chunks.go b/chunks.go index b3aaeb206d..c628108b9a 100644 --- a/chunks.go +++ b/chunks.go @@ -15,7 +15,7 @@ import ( ) const ( - // MagicSeries 4 bytes at the head of series file. + // MagicChunks 4 bytes at the head of series file. MagicChunks = 0x85BD40DD ) diff --git a/db.go b/db.go index 4de657a397..e84a9228ad 100644 --- a/db.go +++ b/db.go @@ -11,7 +11,6 @@ import ( "strconv" "strings" "sync" - "sync/atomic" "time" "unsafe" @@ -94,15 +93,14 @@ type DB struct { // Mutex for that must be held when modifying the general // block layout. - mtx sync.RWMutex - persisted []*persistedBlock - seqBlocks map[int]Block + mtx sync.RWMutex + blocks []Block + // seqBlocks map[int]Block // Mutex that must be held when modifying just the head blocks // or the general layout. headmtx sync.RWMutex - heads []*headBlock - headGen uint8 + heads []HeadBlock compactor Compactor @@ -237,11 +235,11 @@ func (db *DB) retentionCutoff() (bool, error) { // We don't count the span covered by head blocks towards the // retention time as it generally makes up a fraction of it. - if len(db.persisted) == 0 { + if len(db.blocks)-len(db.heads) == 0 { return false, nil } - last := db.persisted[len(db.persisted)-1] + last := db.blocks[len(db.blocks)-len(db.heads)-1] mint := last.Meta().MaxTime - int64(db.opts.RetentionDuration) return retentionCutoff(db.dir, mint) @@ -252,7 +250,7 @@ func (db *DB) compact() (changes bool, err error) { // Check whether we have pending head blocks that are ready to be persisted. // They have the highest priority. - var singles []*headBlock + var singles []Block // Collect head blocks that are ready for compaction. Write them after // returning the lock to not block Appenders. @@ -263,7 +261,7 @@ func (db *DB) compact() (changes bool, err error) { // Blocks that won't be appendable when instantiating a new appender // might still have active appenders on them. // Abort at the first one we encounter. - if atomic.LoadUint64(&h.activeWriters) > 0 { + if h.Busy() { break } singles = append(singles, h) @@ -355,6 +353,15 @@ func retentionCutoff(dir string, mint int64) (bool, error) { return changes, fileutil.Fsync(df) } +func (db *DB) seqBlock(i int) (Block, bool) { + for _, b := range db.blocks { + if b.Meta().Sequence == i { + return b, true + } + } + return nil, false +} + func (db *DB) reloadBlocks() error { var cs []io.Closer defer closeAll(cs...) @@ -371,8 +378,8 @@ func (db *DB) reloadBlocks() error { } var ( metas []*BlockMeta - persisted []*persistedBlock - heads []*headBlock + blocks []Block + heads []HeadBlock seqBlocks = make(map[int]Block, len(dirs)) ) @@ -385,7 +392,7 @@ func (db *DB) reloadBlocks() error { } for i, meta := range metas { - b, ok := db.seqBlocks[meta.Sequence] + b, ok := db.seqBlock(meta.Sequence) if meta.Compaction.Generation == 0 { if !ok { @@ -397,7 +404,7 @@ func (db *DB) reloadBlocks() error { if meta.ULID != b.Meta().ULID { return errors.Errorf("head block ULID changed unexpectedly") } - heads = append(heads, b.(*headBlock)) + heads = append(heads, b.(HeadBlock)) } else { if !ok || meta.ULID != b.Meta().ULID { b, err = newPersistedBlock(dirs[i]) @@ -405,22 +412,21 @@ func (db *DB) reloadBlocks() error { return errors.Wrapf(err, "open persisted block %s", dirs[i]) } } - persisted = append(persisted, b.(*persistedBlock)) } seqBlocks[meta.Sequence] = b + blocks = append(blocks, b) } // Close all blocks that we no longer need. They are closed after returning all // locks to avoid questionable locking order. - for seq, b := range db.seqBlocks { - if nb, ok := seqBlocks[seq]; !ok || nb != b { + for _, b := range db.blocks { + if nb := seqBlocks[b.Meta().Sequence]; nb != b { cs = append(cs, b) } } - db.seqBlocks = seqBlocks - db.persisted = persisted + db.blocks = blocks db.heads = heads return nil @@ -436,12 +442,10 @@ func (db *DB) Close() error { var g errgroup.Group - for _, pb := range db.persisted { + // blocks also contains all head blocks. + for _, pb := range db.blocks { g.Go(pb.Close) } - for _, hb := range db.heads { - g.Go(hb.Close) - } var merr MultiError @@ -459,54 +463,59 @@ func (db *DB) Appender() Appender { // Only instantiate appender after returning the headmtx to avoid // questionable locking order. db.headmtx.RLock() - app := db.appendable() - heads := make([]*headBlock, len(app)) - copy(heads, app) - db.headmtx.RUnlock() - for _, b := range heads { - a.heads = append(a.heads, b.Appender().(*headAppender)) + for _, b := range app { + a.heads = append(a.heads, &metaAppender{ + meta: b.Meta(), + app: b.Appender().(*headAppender), + }) } return a } type dbAppender struct { - db *DB - heads []*headAppender + db *DB + heads []*metaAppender + samples int } +type metaAppender struct { + meta BlockMeta + app Appender +} + func (a *dbAppender) Add(lset labels.Labels, t int64, v float64) (uint64, error) { h, err := a.appenderFor(t) if err != nil { return 0, err } - ref, err := h.Add(lset, t, v) + ref, err := h.app.Add(lset, t, v) if err != nil { return 0, err } a.samples++ - return ref | (uint64(h.generation) << 40), nil + // Store last byte of sequence number in 3rd byte of refernece. + return ref | (uint64(h.meta.Sequence^0xff) << 40), nil } func (a *dbAppender) AddFast(ref uint64, t int64, v float64) error { - // We store the head generation in the 4th byte and use it to reject - // stale references. - gen := uint8((ref << 16) >> 56) + // Load the head last byte of the head sequence from the 3rd byte of the + // reference number. + gen := (ref << 16) >> 56 h, err := a.appenderFor(t) if err != nil { return err } - // If the reference pointed into a previous block, we cannot - // use it to append the sample. - if h.generation != gen { + // If the last byte of the sequence does not add up, the reference is not valid. + if uint64(h.meta.Sequence^0xff) != gen { return ErrNotFound } - if err := h.AddFast(ref, t, v); err != nil { + if err := h.app.AddFast(ref, t, v); err != nil { return err } @@ -516,12 +525,12 @@ func (a *dbAppender) AddFast(ref uint64, t int64, v float64) error { // appenderFor gets the appender for the head containing timestamp t. // If the head block doesn't exist yet, it gets created. -func (a *dbAppender) appenderFor(t int64) (*headAppender, error) { +func (a *dbAppender) appenderFor(t int64) (*metaAppender, error) { // If there's no fitting head block for t, ensure it gets created. if len(a.heads) == 0 || t >= a.heads[len(a.heads)-1].meta.MaxTime { a.db.headmtx.Lock() - var newHeads []*headBlock + var newHeads []HeadBlock if err := a.db.ensureHead(t); err != nil { a.db.headmtx.Unlock() @@ -532,7 +541,7 @@ func (a *dbAppender) appenderFor(t int64) (*headAppender, error) { } else { maxSeq := a.heads[len(a.heads)-1].meta.Sequence for _, b := range a.db.appendable() { - if b.meta.Sequence > maxSeq { + if b.Meta().Sequence > maxSeq { newHeads = append(newHeads, b) } } @@ -543,7 +552,10 @@ func (a *dbAppender) appenderFor(t int64) (*headAppender, error) { // Instantiate appenders after returning headmtx to avoid questionable // locking order. for _, b := range newHeads { - a.heads = append(a.heads, b.Appender().(*headAppender)) + a.heads = append(a.heads, &metaAppender{ + app: b.Appender(), + meta: b.Meta(), + }) } } for i := len(a.heads) - 1; i >= 0; i-- { @@ -570,11 +582,12 @@ func (db *DB) ensureHead(t int64) error { for { h := db.heads[len(db.heads)-1] + m := h.Meta() // If t doesn't exceed the range of heads blocks, there's nothing to do. - if t < h.meta.MaxTime { + if t < m.MaxTime { return nil } - if _, err := db.cut(h.meta.MaxTime); err != nil { + if _, err := db.cut(m.MaxTime); err != nil { return err } } @@ -584,7 +597,7 @@ func (a *dbAppender) Commit() error { var merr MultiError for _, h := range a.heads { - merr.Add(h.Commit()) + merr.Add(h.app.Commit()) } a.db.mtx.RUnlock() @@ -598,18 +611,22 @@ func (a *dbAppender) Rollback() error { var merr MultiError for _, h := range a.heads { - merr.Add(h.Rollback()) + merr.Add(h.app.Rollback()) } a.db.mtx.RUnlock() return merr.Err() } -func (db *DB) appendable() []*headBlock { - if len(db.heads) <= db.opts.AppendableBlocks { - return db.heads +// appendable returns a copy of a slice of HeadBlocks that can still be appended to. +func (db *DB) appendable() []HeadBlock { + var i int + app := make([]HeadBlock, 0, db.opts.AppendableBlocks) + + if len(db.heads) > db.opts.AppendableBlocks { + i = len(db.heads) - db.opts.AppendableBlocks } - return db.heads[len(db.heads)-db.opts.AppendableBlocks:] + return append(app, db.heads[i:]...) } func intervalOverlap(amin, amax, bmin, bmax int64) bool { @@ -631,13 +648,7 @@ func intervalContains(min, max, t int64) bool { func (db *DB) blocksForInterval(mint, maxt int64) []Block { var bs []Block - for _, b := range db.persisted { - m := b.Meta() - if intervalOverlap(mint, maxt, m.MinTime, m.MaxTime) { - bs = append(bs, b) - } - } - for _, b := range db.heads { + for _, b := range db.blocks { m := b.Meta() if intervalOverlap(mint, maxt, m.MinTime, m.MaxTime) { bs = append(bs, b) @@ -661,11 +672,8 @@ func (db *DB) cut(mint int64) (*headBlock, error) { return nil, err } + db.blocks = append(db.blocks, newHead) db.heads = append(db.heads, newHead) - db.seqBlocks[seq] = newHead - db.headGen++ - - newHead.generation = db.headGen select { case db.compactc <- struct{}{}: diff --git a/head.go b/head.go index eed2ff2222..f801bc69c8 100644 --- a/head.go +++ b/head.go @@ -36,10 +36,9 @@ var ( // headBlock handles reads and writes of time series data within a time window. type headBlock struct { - mtx sync.RWMutex - dir string - generation uint8 - wal *WAL + mtx sync.RWMutex + dir string + wal *WAL activeWriters uint64 closed bool @@ -184,6 +183,10 @@ func (h *headBlock) Appender() Appender { return &headAppender{headBlock: h, samples: getHeadAppendBuffer()} } +func (h *headBlock) Busy() bool { + return atomic.LoadUint64(&h.activeWriters) > 0 +} + var headPool = sync.Pool{} func getHeadAppendBuffer() []refdSample { @@ -265,6 +268,8 @@ func (a *headAppender) AddFast(ref uint64, t int64, v float64) error { // sample sequence is valid. // We also have to revalidate it as we switch locks an create // the new series. + } else if ref > uint64(len(a.series)) { + return ErrNotFound } else { ms := a.series[int(ref)] if ms == nil { From 2c999836fbec923495ad2818d1f94df8a0c22876 Mon Sep 17 00:00:00 2001 From: Fabian Reinartz Date: Mon, 20 Mar 2017 10:21:21 +0100 Subject: [PATCH 3/5] Add Queryable interface to Block This adds the Queryable interface to the Block interface. Head and persisted blocks now implement their own Querier() method and thus isolate customization (e.g. remapPostings) more cleanly. --- block.go | 31 ++++++++++++++++++++----------- db.go | 6 +++--- head.go | 16 ++++++++++++++++ querier.go | 21 +-------------------- 4 files changed, 40 insertions(+), 34 deletions(-) diff --git a/block.go b/block.go index 3e7b5ad247..2ecc7687c8 100644 --- a/block.go +++ b/block.go @@ -32,12 +32,12 @@ type DiskBlock interface { // Block is an interface to a DiskBlock that can also be queried. type Block interface { DiskBlock - // Queryable + Queryable } // HeadBlock is a regular block that can still be appended to. type HeadBlock interface { - DiskBlock + Block Appendable } @@ -52,7 +52,7 @@ type Appendable interface { // Queryable defines an entity which provides a Querier. type Queryable interface { - Queryable() Querier + Querier(mint, maxt int64) Querier } // BlockMeta provides meta information about a block. @@ -86,14 +86,6 @@ const ( flagStd = 1 ) -type persistedBlock struct { - dir string - meta BlockMeta - - chunkr *chunkReader - indexr *indexReader -} - type blockMeta struct { Version int `json:"version"` @@ -141,6 +133,14 @@ func writeMetaFile(dir string, meta *BlockMeta) error { return renameFile(tmp, path) } +type persistedBlock struct { + dir string + meta BlockMeta + + chunkr *chunkReader + indexr *indexReader +} + func newPersistedBlock(dir string) (*persistedBlock, error) { meta, err := readMetaFile(dir) if err != nil { @@ -174,6 +174,15 @@ func (pb *persistedBlock) Close() error { return merr.Err() } +func (pb *persistedBlock) Querier(mint, maxt int64) Querier { + return &blockQuerier{ + mint: mint, + maxt: maxt, + index: pb.Index(), + chunks: pb.Chunks(), + } +} + func (pb *persistedBlock) Dir() string { return pb.dir } func (pb *persistedBlock) Index() IndexReader { return pb.indexr } func (pb *persistedBlock) Chunks() ChunkReader { return pb.chunkr } diff --git a/db.go b/db.go index e84a9228ad..6955477cd9 100644 --- a/db.go +++ b/db.go @@ -95,7 +95,6 @@ type DB struct { // block layout. mtx sync.RWMutex blocks []Block - // seqBlocks map[int]Block // Mutex that must be held when modifying just the head blocks // or the general layout. @@ -270,9 +269,10 @@ func (db *DB) compact() (changes bool, err error) { db.headmtx.RUnlock() + db.logger.Log("msg", "picked singles", "singles", fmt.Sprintf("%v", singles)) Loop: for _, h := range singles { - db.logger.Log("msg", "write head", "seq", h.Meta().Sequence) + db.logger.Log("msg", "write head", "seq", h.Meta().Sequence, "dir", h.Dir(), "ulid", h.Meta().ULID) select { case <-db.stopc: @@ -660,7 +660,7 @@ func (db *DB) blocksForInterval(mint, maxt int64) []Block { // cut starts a new head block to append to. The completed head block // will still be appendable for the configured grace period. -func (db *DB) cut(mint int64) (*headBlock, error) { +func (db *DB) cut(mint int64) (HeadBlock, error) { maxt := mint + int64(db.opts.MinBlockDuration) dir, seq, err := nextSequenceFile(db.dir, "b-") diff --git a/head.go b/head.go index f801bc69c8..61a1313604 100644 --- a/head.go +++ b/head.go @@ -172,6 +172,22 @@ func (h *headBlock) Persisted() bool { return false } func (h *headBlock) Index() IndexReader { return &headIndexReader{h} } func (h *headBlock) Chunks() ChunkReader { return &headChunkReader{h} } +func (h *headBlock) Querier(mint, maxt int64) Querier { + h.mtx.RLock() + defer h.mtx.RUnlock() + + if h.closed { + panic(fmt.Sprintf("block %s already closed", h.dir)) + } + return &blockQuerier{ + mint: mint, + maxt: maxt, + index: h.Index(), + chunks: h.Chunks(), + postingsMapper: h.remapPostings, + } +} + func (h *headBlock) Appender() Appender { atomic.AddUint64(&h.activeWriters, 1) diff --git a/querier.go b/querier.go index c027a8dc0e..555731171a 100644 --- a/querier.go +++ b/querier.go @@ -55,27 +55,8 @@ func (s *DB) Querier(mint, maxt int64) Querier { blocks: make([]Querier, 0, len(blocks)), db: s, } - for _, b := range blocks { - q := &blockQuerier{ - mint: mint, - maxt: maxt, - index: b.Index(), - chunks: b.Chunks(), - } - - // TODO(fabxc): find nicer solution. - if hb, ok := b.(*headBlock); ok { - // TODO(fabxc): temporary refactored. - hb.mtx.RLock() - if hb.closed { - panic(fmt.Sprintf("block %s already closed", hb.dir)) - } - hb.mtx.RUnlock() - q.postingsMapper = hb.remapPostings - } - - sq.blocks = append(sq.blocks, q) + sq.blocks = append(sq.blocks, b.Querier(mint, maxt)) } return sq From 36355692575c28061a92cc8b381b11898399fa41 Mon Sep 17 00:00:00 2001 From: Fabian Reinartz Date: Mon, 20 Mar 2017 10:41:43 +0100 Subject: [PATCH 4/5] Trigger reload correctly on interrupted compaction --- block.go | 5 +++++ compact.go | 8 +++++++- db.go | 13 ++++--------- head.go | 4 ++++ 4 files changed, 20 insertions(+), 10 deletions(-) diff --git a/block.go b/block.go index 2ecc7687c8..4939ee5d33 100644 --- a/block.go +++ b/block.go @@ -2,6 +2,7 @@ package tsdb import ( "encoding/json" + "fmt" "io/ioutil" "os" "path/filepath" @@ -174,6 +175,10 @@ func (pb *persistedBlock) Close() error { return merr.Err() } +func (pb *persistedBlock) String() string { + return fmt.Sprintf("(%d, %s)", pb.meta.Sequence, pb.meta.ULID) +} + func (pb *persistedBlock) Querier(mint, maxt int64) Querier { return &blockQuerier{ mint: mint, diff --git a/compact.go b/compact.go index a1d7b1e5c5..a1720605a6 100644 --- a/compact.go +++ b/compact.go @@ -1,6 +1,7 @@ package tsdb import ( + "fmt" "math/rand" "os" "path/filepath" @@ -8,6 +9,7 @@ import ( "github.com/coreos/etcd/pkg/fileutil" "github.com/fabxc/tsdb/labels" + "github.com/go-kit/kit/log" "github.com/oklog/ulid" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" @@ -32,6 +34,7 @@ type Compactor interface { // compactor implements the Compactor interface. type compactor struct { metrics *compactorMetrics + logger log.Logger opts *compactorOptions } @@ -71,9 +74,10 @@ type compactorOptions struct { maxBlockRange uint64 } -func newCompactor(r prometheus.Registerer, opts *compactorOptions) *compactor { +func newCompactor(r prometheus.Registerer, l log.Logger, opts *compactorOptions) *compactor { return &compactor{ opts: opts, + logger: l, metrics: newCompactorMetrics(r), } } @@ -178,6 +182,8 @@ func (c *compactor) Write(dir string, b Block) error { // write creates a new block that is the union of the provided blocks into dir. // It cleans up all files of the old blocks after completing successfully. func (c *compactor) write(dir string, blocks ...Block) (err error) { + c.logger.Log("msg", "compact blocks", "blocks", fmt.Sprintf("%v", blocks)) + defer func(t time.Time) { if err != nil { c.metrics.failed.Inc() diff --git a/db.go b/db.go index 6955477cd9..2c00b7d790 100644 --- a/db.go +++ b/db.go @@ -174,7 +174,7 @@ func Open(dir string, l log.Logger, r prometheus.Registerer, opts *Options) (db donec: make(chan struct{}), stopc: make(chan struct{}), } - db.compactor = newCompactor(r, &compactorOptions{ + db.compactor = newCompactor(r, l, &compactorOptions{ maxBlockRange: opts.MaxBlockDuration, }) @@ -269,14 +269,10 @@ func (db *DB) compact() (changes bool, err error) { db.headmtx.RUnlock() - db.logger.Log("msg", "picked singles", "singles", fmt.Sprintf("%v", singles)) -Loop: for _, h := range singles { - db.logger.Log("msg", "write head", "seq", h.Meta().Sequence, "dir", h.Dir(), "ulid", h.Meta().ULID) - select { case <-db.stopc: - break Loop + return changes, nil default: } @@ -295,16 +291,15 @@ Loop: select { case <-db.stopc: - return false, nil + return changes, nil default: } + // We just execute compactions sequentially to not cause too extreme // CPU and memory spikes. // TODO(fabxc): return more descriptive plans in the future that allow // estimation of resource usage and conditional parallelization? for _, p := range plans { - db.logger.Log("msg", "compact blocks", "seq", fmt.Sprintf("%v", p)) - if err := db.compactor.Compact(p...); err != nil { return changes, errors.Wrapf(err, "compact %s", p) } diff --git a/head.go b/head.go index 61a1313604..4864276f99 100644 --- a/head.go +++ b/head.go @@ -135,6 +135,10 @@ func (h *headBlock) inBounds(t int64) bool { return t >= h.meta.MinTime && t <= h.meta.MaxTime } +func (h *headBlock) String() string { + return fmt.Sprintf("(%d, %s)", h.meta.Sequence, h.meta.ULID) +} + // Close syncs all data and closes underlying resources of the head block. func (h *headBlock) Close() error { h.mtx.Lock() From 2ef3682560a31bd03f0ba70eb6ec509512ad0de8 Mon Sep 17 00:00:00 2001 From: Fabian Reinartz Date: Mon, 20 Mar 2017 11:37:06 +0100 Subject: [PATCH 5/5] Hotfix erroneous "label index missing" error --- index.go | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/index.go b/index.go index f957085763..d0e557dd62 100644 --- a/index.go +++ b/index.go @@ -567,7 +567,10 @@ func (r *indexReader) LabelValues(names ...string) (StringTuples, error) { key := strings.Join(names, string(sep)) off, ok := r.labels[key] if !ok { - return nil, fmt.Errorf("label index doesn't exist") + // XXX(fabxc): hot fix. Should return a partial data error and handle cases + // where the entire block has no data gracefully. + return emptyStringTuples{}, nil + //return nil, fmt.Errorf("label index doesn't exist") } flag, b, err := r.section(off) @@ -590,6 +593,11 @@ func (r *indexReader) LabelValues(names ...string) (StringTuples, error) { return st, nil } +type emptyStringTuples struct{} + +func (emptyStringTuples) At(i int) ([]string, error) { return nil, nil } +func (emptyStringTuples) Len() int { return 0 } + func (r *indexReader) LabelIndices() ([][]string, error) { res := [][]string{}