mirror of
https://github.com/prometheus/prometheus.git
synced 2025-01-13 06:47:28 -08:00
Change block interface
This changes the block interface to directly expose index and series readers rather than the direct querier
This commit is contained in:
parent
5d75a3dc7b
commit
beb842a856
25
block.go
25
block.go
|
@ -13,9 +13,9 @@ import (
|
||||||
|
|
||||||
// Block handles reads against a block of time series data within a time window.
|
// Block handles reads against a block of time series data within a time window.
|
||||||
type block interface {
|
type block interface {
|
||||||
Querier(mint, maxt int64) Querier
|
|
||||||
|
|
||||||
interval() (int64, int64)
|
interval() (int64, int64)
|
||||||
|
index() IndexReader
|
||||||
|
series() SeriesReader
|
||||||
}
|
}
|
||||||
|
|
||||||
type BlockStats struct {
|
type BlockStats struct {
|
||||||
|
@ -34,8 +34,8 @@ const (
|
||||||
type persistedBlock struct {
|
type persistedBlock struct {
|
||||||
chunksf, indexf *mmapFile
|
chunksf, indexf *mmapFile
|
||||||
|
|
||||||
chunks *seriesReader
|
chunkr *seriesReader
|
||||||
index *indexReader
|
indexr *indexReader
|
||||||
|
|
||||||
stats BlockStats
|
stats BlockStats
|
||||||
}
|
}
|
||||||
|
@ -70,8 +70,8 @@ func newPersistedBlock(path string) (*persistedBlock, error) {
|
||||||
pb := &persistedBlock{
|
pb := &persistedBlock{
|
||||||
chunksf: chunksf,
|
chunksf: chunksf,
|
||||||
indexf: indexf,
|
indexf: indexf,
|
||||||
chunks: sr,
|
chunkr: sr,
|
||||||
index: ir,
|
indexr: ir,
|
||||||
stats: stats,
|
stats: stats,
|
||||||
}
|
}
|
||||||
return pb, nil
|
return pb, nil
|
||||||
|
@ -87,13 +87,12 @@ func (pb *persistedBlock) Close() error {
|
||||||
return err1
|
return err1
|
||||||
}
|
}
|
||||||
|
|
||||||
func (pb *persistedBlock) Querier(mint, maxt int64) Querier {
|
func (pb *persistedBlock) index() IndexReader {
|
||||||
return &blockQuerier{
|
return pb.indexr
|
||||||
mint: mint,
|
}
|
||||||
maxt: maxt,
|
|
||||||
index: pb.index,
|
func (pb *persistedBlock) series() SeriesReader {
|
||||||
series: pb.chunks,
|
return pb.chunkr
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (pb *persistedBlock) interval() (int64, int64) {
|
func (pb *persistedBlock) interval() (int64, int64) {
|
||||||
|
|
25
compact.go
25
compact.go
|
@ -57,7 +57,7 @@ func (c *compactor) close() error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *compactor) compact(dir string, a, b *persistedBlock) error {
|
func (c *compactor) compact(dir string, a, b block) error {
|
||||||
if err := os.MkdirAll(dir, 0777); err != nil {
|
if err := os.MkdirAll(dir, 0777); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -77,23 +77,32 @@ func (c *compactor) compact(dir string, a, b *persistedBlock) error {
|
||||||
defer index.Close()
|
defer index.Close()
|
||||||
defer series.Close()
|
defer series.Close()
|
||||||
|
|
||||||
aall, err := a.index.Postings("", "")
|
aall, err := a.index().Postings("", "")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
ball, err := b.index.Postings("", "")
|
ball, err := b.index().Postings("", "")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
set, err := newCompactionMerger(
|
set, err := newCompactionMerger(
|
||||||
newCompactionSeriesSet(a.index, a.chunks, aall),
|
newCompactionSeriesSet(a.index(), a.series(), aall),
|
||||||
newCompactionSeriesSet(b.index, b.chunks, ball),
|
newCompactionSeriesSet(b.index(), b.series(), ball),
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
astats, err := a.index().Stats()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
bstats, err := a.index().Stats()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
// We fully rebuild the postings list index from merged series.
|
// We fully rebuild the postings list index from merged series.
|
||||||
var (
|
var (
|
||||||
postings = &memPostings{m: make(map[term][]uint32, 512)}
|
postings = &memPostings{m: make(map[term][]uint32, 512)}
|
||||||
|
@ -101,9 +110,9 @@ func (c *compactor) compact(dir string, a, b *persistedBlock) error {
|
||||||
i = uint32(0)
|
i = uint32(0)
|
||||||
)
|
)
|
||||||
stats := BlockStats{
|
stats := BlockStats{
|
||||||
MinTime: a.stats.MinTime,
|
MinTime: astats.MinTime,
|
||||||
MaxTime: b.stats.MaxTime,
|
MaxTime: bstats.MaxTime,
|
||||||
SampleCount: a.stats.SampleCount + b.stats.SampleCount,
|
SampleCount: astats.SampleCount + bstats.SampleCount,
|
||||||
}
|
}
|
||||||
|
|
||||||
for set.Next() {
|
for set.Next() {
|
||||||
|
|
9
head.go
9
head.go
|
@ -66,9 +66,12 @@ func (h *HeadBlock) Close() error {
|
||||||
return h.wal.Close()
|
return h.wal.Close()
|
||||||
}
|
}
|
||||||
|
|
||||||
// Querier returns a new querier over the head block.
|
func (h *HeadBlock) index() IndexReader {
|
||||||
func (h *HeadBlock) Querier(mint, maxt int64) Querier {
|
return h
|
||||||
return newBlockQuerier(h, h, mint, maxt)
|
}
|
||||||
|
|
||||||
|
func (h *HeadBlock) series() SeriesReader {
|
||||||
|
return h
|
||||||
}
|
}
|
||||||
|
|
||||||
// Chunk returns the chunk for the reference number.
|
// Chunk returns the chunk for the reference number.
|
||||||
|
|
|
@ -148,7 +148,12 @@ func (s *Shard) Querier(mint, maxt int64) Querier {
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, b := range blocks {
|
for _, b := range blocks {
|
||||||
sq.blocks = append(sq.blocks, b.Querier(mint, maxt))
|
sq.blocks = append(sq.blocks, &blockQuerier{
|
||||||
|
mint: mint,
|
||||||
|
maxt: maxt,
|
||||||
|
index: b.index(),
|
||||||
|
series: b.series(),
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
return sq
|
return sq
|
||||||
|
|
Loading…
Reference in a new issue