diff --git a/block.go b/block.go index e8985a7f2..dcc2e58ec 100644 --- a/block.go +++ b/block.go @@ -13,9 +13,9 @@ import ( // Block handles reads against a block of time series data within a time window. type block interface { - Querier(mint, maxt int64) Querier - interval() (int64, int64) + index() IndexReader + series() SeriesReader } type BlockStats struct { @@ -34,8 +34,8 @@ const ( type persistedBlock struct { chunksf, indexf *mmapFile - chunks *seriesReader - index *indexReader + chunkr *seriesReader + indexr *indexReader stats BlockStats } @@ -70,8 +70,8 @@ func newPersistedBlock(path string) (*persistedBlock, error) { pb := &persistedBlock{ chunksf: chunksf, indexf: indexf, - chunks: sr, - index: ir, + chunkr: sr, + indexr: ir, stats: stats, } return pb, nil @@ -87,13 +87,12 @@ func (pb *persistedBlock) Close() error { return err1 } -func (pb *persistedBlock) Querier(mint, maxt int64) Querier { - return &blockQuerier{ - mint: mint, - maxt: maxt, - index: pb.index, - series: pb.chunks, - } +func (pb *persistedBlock) index() IndexReader { + return pb.indexr +} + +func (pb *persistedBlock) series() SeriesReader { + return pb.chunkr } func (pb *persistedBlock) interval() (int64, int64) { diff --git a/compact.go b/compact.go index 3ef779361..c6539179f 100644 --- a/compact.go +++ b/compact.go @@ -57,7 +57,7 @@ func (c *compactor) close() error { return nil } -func (c *compactor) compact(dir string, a, b *persistedBlock) error { +func (c *compactor) compact(dir string, a, b block) error { if err := os.MkdirAll(dir, 0777); err != nil { return err } @@ -77,23 +77,32 @@ func (c *compactor) compact(dir string, a, b *persistedBlock) error { defer index.Close() defer series.Close() - aall, err := a.index.Postings("", "") + aall, err := a.index().Postings("", "") if err != nil { return err } - ball, err := b.index.Postings("", "") + ball, err := b.index().Postings("", "") if err != nil { return err } set, err := newCompactionMerger( - newCompactionSeriesSet(a.index, a.chunks, aall), - newCompactionSeriesSet(b.index, b.chunks, ball), + newCompactionSeriesSet(a.index(), a.series(), aall), + newCompactionSeriesSet(b.index(), b.series(), ball), ) if err != nil { return err } + astats, err := a.index().Stats() + if err != nil { + return err + } + bstats, err := a.index().Stats() + if err != nil { + return err + } + // We fully rebuild the postings list index from merged series. var ( postings = &memPostings{m: make(map[term][]uint32, 512)} @@ -101,9 +110,9 @@ func (c *compactor) compact(dir string, a, b *persistedBlock) error { i = uint32(0) ) stats := BlockStats{ - MinTime: a.stats.MinTime, - MaxTime: b.stats.MaxTime, - SampleCount: a.stats.SampleCount + b.stats.SampleCount, + MinTime: astats.MinTime, + MaxTime: bstats.MaxTime, + SampleCount: astats.SampleCount + bstats.SampleCount, } for set.Next() { diff --git a/head.go b/head.go index 01110f1a2..171c55290 100644 --- a/head.go +++ b/head.go @@ -66,9 +66,12 @@ func (h *HeadBlock) Close() error { return h.wal.Close() } -// Querier returns a new querier over the head block. -func (h *HeadBlock) Querier(mint, maxt int64) Querier { - return newBlockQuerier(h, h, mint, maxt) +func (h *HeadBlock) index() IndexReader { + return h +} + +func (h *HeadBlock) series() SeriesReader { + return h } // Chunk returns the chunk for the reference number. diff --git a/querier.go b/querier.go index 1d53f47f6..0532516b6 100644 --- a/querier.go +++ b/querier.go @@ -148,7 +148,12 @@ func (s *Shard) Querier(mint, maxt int64) Querier { } for _, b := range blocks { - sq.blocks = append(sq.blocks, b.Querier(mint, maxt)) + sq.blocks = append(sq.blocks, &blockQuerier{ + mint: mint, + maxt: maxt, + index: b.index(), + series: b.series(), + }) } return sq