Merge pull request #171 from prometheus/safeclose

Add more verbose error handling for closing, reduce locking
Fabian Reinartz, committed via GitHub on 2017-10-10 18:10:06 +02:00
commit f347eac33d
11 changed files with 257 additions and 142 deletions

block.go (143 changes)

@@ -19,6 +19,7 @@ import (
 	"io/ioutil"
 	"os"
 	"path/filepath"
+	"sync"
 
 	"github.com/oklog/ulid"
 	"github.com/pkg/errors"
@@ -26,33 +27,16 @@ import (
 	"github.com/prometheus/tsdb/labels"
 )
 
-// DiskBlock represents a data block backed by on-disk data.
-type DiskBlock interface {
-	BlockReader
-
-	// Directory where block data is stored.
-	Dir() string
-
-	// Stats returns statistics about the block.
-	Meta() BlockMeta
-
-	Delete(mint, maxt int64, m ...labels.Matcher) error
-
-	Snapshot(dir string) error
-
-	Close() error
-}
-
 // BlockReader provides reading access to a data block.
 type BlockReader interface {
 	// Index returns an IndexReader over the block's data.
-	Index() IndexReader
+	Index() (IndexReader, error)
 
 	// Chunks returns a ChunkReader over the block's data.
-	Chunks() ChunkReader
+	Chunks() (ChunkReader, error)
 
 	// Tombstones returns a TombstoneReader over the block's deleted data.
-	Tombstones() TombstoneReader
+	Tombstones() (TombstoneReader, error)
 }
 
 // Appendable defines an entity to which data can be appended.
@@ -149,7 +133,12 @@ func writeMetaFile(dir string, meta *BlockMeta) error {
 	return renameFile(tmp, path)
 }
 
-type persistedBlock struct {
+// Block represents a directory of time series data covering a continuous time range.
+type Block struct {
+	mtx            sync.RWMutex
+	closing        bool
+	pendingReaders sync.WaitGroup
+
 	dir  string
 	meta BlockMeta
@@ -159,7 +148,9 @@ type persistedBlock struct {
 	tombstones tombstoneReader
 }
 
-func newPersistedBlock(dir string, pool chunks.Pool) (*persistedBlock, error) {
+// OpenBlock opens the block in the directory. It can be passed a chunk pool, which is used
+// to instantiate chunk structs.
+func OpenBlock(dir string, pool chunks.Pool) (*Block, error) {
 	meta, err := readMetaFile(dir)
 	if err != nil {
 		return nil, err
@@ -179,7 +170,7 @@ func newPersistedBlock(dir string, pool chunks.Pool) (*persistedBlock, error) {
 		return nil, err
 	}
 
-	pb := &persistedBlock{
+	pb := &Block{
 		dir:    dir,
 		meta:   *meta,
 		chunkr: cr,
@@ -189,28 +180,110 @@ func newPersistedBlock(dir string, pool chunks.Pool) (*persistedBlock, error) {
 	return pb, nil
 }
 
-func (pb *persistedBlock) Close() error {
+// Close closes the on-disk block. It blocks as long as there are readers reading from the block.
+func (pb *Block) Close() error {
+	pb.mtx.Lock()
+	pb.closing = true
+	pb.mtx.Unlock()
+
+	pb.pendingReaders.Wait()
+
 	var merr MultiError
 
 	merr.Add(pb.chunkr.Close())
 	merr.Add(pb.indexr.Close())
+	merr.Add(pb.tombstones.Close())
 
 	return merr.Err()
 }
 
-func (pb *persistedBlock) String() string {
+func (pb *Block) String() string {
 	return pb.meta.ULID.String()
 }
 
-func (pb *persistedBlock) Dir() string { return pb.dir }
-func (pb *persistedBlock) Index() IndexReader { return pb.indexr }
-func (pb *persistedBlock) Chunks() ChunkReader { return pb.chunkr }
-func (pb *persistedBlock) Tombstones() TombstoneReader {
-	return pb.tombstones
-}
-func (pb *persistedBlock) Meta() BlockMeta { return pb.meta }
+// Dir returns the directory of the block.
+func (pb *Block) Dir() string { return pb.dir }
+
+// Meta returns meta information about the block.
+func (pb *Block) Meta() BlockMeta { return pb.meta }
+
+// ErrClosing is returned when a block is in the process of being closed.
+var ErrClosing = errors.New("block is closing")
+
+func (pb *Block) startRead() error {
+	pb.mtx.RLock()
+	defer pb.mtx.RUnlock()
+
+	if pb.closing {
+		return ErrClosing
+	}
+	pb.pendingReaders.Add(1)
+
+	return nil
+}
+
+// Index returns a new IndexReader against the block data.
+func (pb *Block) Index() (IndexReader, error) {
+	if err := pb.startRead(); err != nil {
+		return nil, err
+	}
+	return blockIndexReader{IndexReader: pb.indexr, b: pb}, nil
+}
+
+// Chunks returns a new ChunkReader against the block data.
+func (pb *Block) Chunks() (ChunkReader, error) {
+	if err := pb.startRead(); err != nil {
+		return nil, err
+	}
+	return blockChunkReader{ChunkReader: pb.chunkr, b: pb}, nil
+}
+
+// Tombstones returns a new TombstoneReader against the block data.
+func (pb *Block) Tombstones() (TombstoneReader, error) {
+	if err := pb.startRead(); err != nil {
+		return nil, err
+	}
+	return blockTombstoneReader{TombstoneReader: pb.tombstones, b: pb}, nil
+}
+
+type blockIndexReader struct {
+	IndexReader
+	b *Block
+}
+
+func (r blockIndexReader) Close() error {
+	r.b.pendingReaders.Done()
+	return nil
+}
+
+type blockTombstoneReader struct {
+	TombstoneReader
+	b *Block
+}
+
+func (r blockTombstoneReader) Close() error {
+	r.b.pendingReaders.Done()
+	return nil
+}
+
+type blockChunkReader struct {
+	ChunkReader
+	b *Block
+}
+
+func (r blockChunkReader) Close() error {
+	r.b.pendingReaders.Done()
+	return nil
+}
+
+// Delete matching series between mint and maxt in the block.
+func (pb *Block) Delete(mint, maxt int64, ms ...labels.Matcher) error {
+	pb.mtx.Lock()
+	defer pb.mtx.Unlock()
+
+	if pb.closing {
+		return ErrClosing
+	}
 
-func (pb *persistedBlock) Delete(mint, maxt int64, ms ...labels.Matcher) error {
 	pr := newPostingsReader(pb.indexr)
 
 	p, absent := pr.Select(ms...)
@@ -262,7 +335,8 @@ Outer:
 	return writeMetaFile(pb.dir, &pb.meta)
 }
 
-func (pb *persistedBlock) Snapshot(dir string) error {
+// Snapshot creates a snapshot of the block into dir.
+func (pb *Block) Snapshot(dir string) error {
 	blockDir := filepath.Join(dir, pb.meta.ULID.String())
 	if err := os.MkdirAll(blockDir, 0777); err != nil {
 		return errors.Wrap(err, "create snapshot block dir")
@@ -311,7 +385,6 @@ func clampInterval(a, b, mint, maxt int64) (int64, int64) {
 	if b > maxt {
 		b = maxt
 	}
-
 	return a, b
 }
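The scheme above is the core of the change: Close flips a flag and waits, startRead refuses new readers once the flag is set, and each reader wrapper signals the WaitGroup when closed. A minimal standalone sketch of the same pattern — hypothetical names, not the tsdb types — showing why Close cannot return while a reader is outstanding:

package main

import (
	"errors"
	"fmt"
	"sync"
)

var errClosing = errors.New("resource is closing")

// resource mirrors Block: a closing flag guarded by an RWMutex and a
// WaitGroup counting readers that were handed out but not yet closed.
type resource struct {
	mtx     sync.RWMutex
	closing bool
	pending sync.WaitGroup
}

// acquire plays the role of Block.startRead: refuse new readers once
// closing is set, otherwise register one pending reader.
func (r *resource) acquire() error {
	r.mtx.RLock()
	defer r.mtx.RUnlock()

	if r.closing {
		return errClosing
	}
	r.pending.Add(1)
	return nil
}

// release plays the role of the reader wrappers' Close methods.
func (r *resource) release() { r.pending.Done() }

// close plays the role of Block.Close: set the flag so no new readers
// can start, then block until every outstanding reader is released.
func (r *resource) close() {
	r.mtx.Lock()
	r.closing = true
	r.mtx.Unlock()

	r.pending.Wait()
}

func main() {
	r := &resource{}
	if err := r.acquire(); err == nil {
		go r.release() // a reader finishing concurrently
	}
	r.close()                // returns only after the release above
	fmt.Println(r.acquire()) // "resource is closing"
}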

chunks.go

@@ -21,9 +21,9 @@ import (
 	"io"
 	"os"
 
-	"github.com/prometheus/tsdb/fileutil"
-
 	"github.com/pkg/errors"
 	"github.com/prometheus/tsdb/chunks"
+	"github.com/prometheus/tsdb/fileutil"
 )
 
 const (

compact.go

@@ -14,6 +14,7 @@
 package tsdb
 
 import (
+	"io"
 	"math/rand"
 	"os"
 	"path/filepath"

@@ -299,7 +300,7 @@ func (c *LeveledCompactor) Compact(dest string, dirs ...string) (err error) {
 	var metas []*BlockMeta
 
 	for _, d := range dirs {
-		b, err := newPersistedBlock(d, c.chunkPool)
+		b, err := OpenBlock(d, c.chunkPool)
 		if err != nil {
 			return err
 		}
@@ -444,10 +445,30 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta,
 	var (
 		set        compactionSet
 		allSymbols = make(map[string]struct{}, 1<<16)
+		closers    = []io.Closer{}
 	)
+	defer func() { closeAll(closers...) }()
+
 	for i, b := range blocks {
-		symbols, err := b.Index().Symbols()
+		indexr, err := b.Index()
+		if err != nil {
+			return errors.Wrapf(err, "open index reader for block %s", b)
+		}
+		closers = append(closers, indexr)
+
+		chunkr, err := b.Chunks()
+		if err != nil {
+			return errors.Wrapf(err, "open chunk reader for block %s", b)
+		}
+		closers = append(closers, chunkr)
+
+		tombsr, err := b.Tombstones()
+		if err != nil {
+			return errors.Wrapf(err, "open tombstone reader for block %s", b)
+		}
+		closers = append(closers, tombsr)
+
+		symbols, err := indexr.Symbols()
 		if err != nil {
 			return errors.Wrap(err, "read symbols")
 		}
@@ -455,15 +476,13 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta,
 			allSymbols[s] = struct{}{}
 		}
 
-		indexr := b.Index()
-
 		all, err := indexr.Postings(allPostingsKey.Name, allPostingsKey.Value)
 		if err != nil {
 			return err
 		}
 		all = indexr.SortedPostings(all)
 
-		s := newCompactionSeriesSet(indexr, b.Chunks(), b.Tombstones(), all)
+		s := newCompactionSeriesSet(indexr, chunkr, tombsr, all)
 
 		if i == 0 {
 			set = s
@@ -565,7 +584,6 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta,
 			return errors.Wrap(err, "write postings")
 		}
 	}
-
 	return nil
 }
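populateBlock now records every reader it opens in a closers slice and releases them all through one deferred closeAll, so an early error return cannot leak a reader and keep a block's Close blocked forever. A sketch of that accumulate-and-defer idiom (closeAll here is a local stand-in written against io.Closer, not the repository's helper):

package main

import (
	"fmt"
	"io"
	"strings"
)

// closeAll closes every accumulated closer; a fuller version would
// collect the individual errors into a MultiError.
func closeAll(cs ...io.Closer) {
	for _, c := range cs {
		c.Close()
	}
}

func process(names []string) error {
	closers := []io.Closer{}
	// One deferred call releases everything opened below, on both the
	// error path and the success path.
	defer func() { closeAll(closers...) }()

	for _, n := range names {
		r := io.NopCloser(strings.NewReader(n)) // stand-in for an opened reader
		closers = append(closers, r)
		// ... use r; an early "return err" here would still close
		// every reader opened so far.
	}
	return nil
}

func main() {
	fmt.Println(process([]string{"index", "chunks", "tombstones"}))
}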

db.go (83 changes)

@@ -30,7 +30,6 @@ import (
 	"golang.org/x/sync/errgroup"
 
-	"github.com/prometheus/tsdb/fileutil"
 	"github.com/go-kit/kit/log"
 	"github.com/go-kit/kit/log/level"
 	"github.com/nightlyone/lockfile"

@@ -38,6 +37,7 @@ import (
 	"github.com/pkg/errors"
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/prometheus/tsdb/chunks"
+	"github.com/prometheus/tsdb/fileutil"
 	"github.com/prometheus/tsdb/labels"
 )
@@ -105,7 +105,7 @@ type DB struct {
 	// Mutex that must be held when modifying the general block layout.
 	mtx    sync.RWMutex
-	blocks []DiskBlock
+	blocks []*Block
 
 	head *Head
@@ -431,7 +431,7 @@ func retentionCutoff(dir string, mint int64) (bool, error) {
 	return changes, fileutil.Fsync(df)
 }
 
-func (db *DB) getBlock(id ulid.ULID) (DiskBlock, bool) {
+func (db *DB) getBlock(id ulid.ULID) (*Block, bool) {
 	for _, b := range db.blocks {
 		if b.Meta().ULID == id {
 			return b, true
@@ -456,7 +456,7 @@ func (db *DB) reload() (err error) {
 		return errors.Wrap(err, "find blocks")
 	}
 	var (
-		blocks []DiskBlock
+		blocks []*Block
 		exist  = map[ulid.ULID]struct{}{}
 	)
@@ -468,7 +468,7 @@ func (db *DB) reload() (err error) {
 		b, ok := db.getBlock(meta.ULID)
 		if !ok {
-			b, err = newPersistedBlock(dir, db.chunkPool)
+			b, err = OpenBlock(dir, db.chunkPool)
 			if err != nil {
 				return errors.Wrapf(err, "open block %s", dir)
 			}
@@ -505,7 +505,7 @@ func (db *DB) reload() (err error) {
 	return errors.Wrap(db.head.Truncate(maxt), "head truncate failed")
 }
 
-func validateBlockSequence(bs []DiskBlock) error {
+func validateBlockSequence(bs []*Block) error {
 	if len(bs) == 0 {
 		return nil
 	}
@@ -521,13 +521,19 @@ func validateBlockSequence(bs []*Block) error {
 	return nil
 }
 
-func (db *DB) Blocks() []DiskBlock {
+func (db *DB) String() string {
+	return "HEAD"
+}
+
+// Blocks returns the database's persisted blocks.
+func (db *DB) Blocks() []*Block {
 	db.mtx.RLock()
 	defer db.mtx.RUnlock()
 
 	return db.blocks
 }
 
+// Head returns the database's head.
 func (db *DB) Head() *Head {
 	return db.head
 }
@@ -587,41 +593,42 @@ func (db *DB) Snapshot(dir string) error {
 	db.cmtx.Lock()
 	defer db.cmtx.Unlock()
 
-	db.mtx.RLock()
-	defer db.mtx.RUnlock()
-
-	for _, b := range db.blocks {
+	for _, b := range db.Blocks() {
 		level.Info(db.logger).Log("msg", "snapshotting block", "block", b)
 		if err := b.Snapshot(dir); err != nil {
 			return errors.Wrap(err, "error snapshotting headblock")
 		}
 	}
 	return db.compactor.Write(dir, db.head, db.head.MinTime(), db.head.MaxTime())
 }
 
 // Querier returns a new querier over the data partition for the given time range.
 // A goroutine must not handle more than one open Querier.
-func (db *DB) Querier(mint, maxt int64) Querier {
-	db.mtx.RLock()
-
-	blocks := db.blocksForInterval(mint, maxt)
+func (db *DB) Querier(mint, maxt int64) (Querier, error) {
+	var blocks []BlockReader
+
+	for _, b := range db.Blocks() {
+		m := b.Meta()
+		if intervalOverlap(mint, maxt, m.MinTime, m.MaxTime) {
+			blocks = append(blocks, b)
+		}
+	}
+	if maxt >= db.head.MinTime() {
+		blocks = append(blocks, db.head)
+	}
 
 	sq := &querier{
 		blocks: make([]Querier, 0, len(blocks)),
-		db:     db,
 	}
 	for _, b := range blocks {
-		sq.blocks = append(sq.blocks, &blockQuerier{
-			mint:       mint,
-			maxt:       maxt,
-			index:      b.Index(),
-			chunks:     b.Chunks(),
-			tombstones: b.Tombstones(),
-		})
+		q, err := NewBlockQuerier(b, mint, maxt)
+		if err != nil {
+			return nil, errors.Wrapf(err, "open querier for block %s", b)
+		}
+		sq.blocks = append(sq.blocks, q)
 	}
-	return sq
+	return sq, nil
 }
 
 func rangeForTimestamp(t int64, width int64) (mint, maxt int64) {
@@ -634,28 +641,22 @@ func (db *DB) Delete(mint, maxt int64, ms ...labels.Matcher) error {
 	db.cmtx.Lock()
 	defer db.cmtx.Unlock()
 
-	db.mtx.Lock()
-	defer db.mtx.Unlock()
-
 	var g errgroup.Group
 
-	for _, b := range db.blocks {
+	for _, b := range db.Blocks() {
 		m := b.Meta()
 		if intervalOverlap(mint, maxt, m.MinTime, m.MaxTime) {
-			g.Go(func(b DiskBlock) func() error {
+			g.Go(func(b *Block) func() error {
 				return func() error { return b.Delete(mint, maxt, ms...) }
 			}(b))
 		}
 	}
 	g.Go(func() error {
 		return db.head.Delete(mint, maxt, ms...)
 	})
 	if err := g.Wait(); err != nil {
 		return err
 	}
 	return nil
 }
@@ -668,24 +669,6 @@ func intervalContains(min, max, t int64) bool {
 	return t >= min && t <= max
 }
 
-// blocksForInterval returns all blocks within the partition that may contain
-// data for the given time range.
-func (db *DB) blocksForInterval(mint, maxt int64) []BlockReader {
-	var bs []BlockReader
-
-	for _, b := range db.blocks {
-		m := b.Meta()
-		if intervalOverlap(mint, maxt, m.MinTime, m.MaxTime) {
-			bs = append(bs, b)
-		}
-	}
-	if maxt >= db.head.MinTime() {
-		bs = append(bs, db.head)
-	}
-
-	return bs
-}
-
 func isBlockDir(fi os.FileInfo) bool {
 	if !fi.IsDir() {
 		return false
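Since Querier now returns an error and the underlying block readers stay registered until the querier is closed, callers adopt a check-then-defer shape. A hypothetical caller against the new API (the matcher and label values are illustrative):

package example

import (
	"fmt"

	"github.com/prometheus/tsdb"
	"github.com/prometheus/tsdb/labels"
)

// printSeries sketches a read against the two-value Querier API.
func printSeries(db *tsdb.DB, mint, maxt int64) error {
	q, err := db.Querier(mint, maxt)
	if err != nil {
		return err
	}
	// Close releases each block's index, chunk, and tombstone readers,
	// which in turn unblocks any Block.Close waiting on them.
	defer q.Close()

	set := q.Select(labels.NewEqualMatcher("job", "api"))
	for set.Next() {
		fmt.Println(set.At().Labels())
	}
	return set.Err()
}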

db_test.go

@@ -68,7 +68,8 @@ func TestDataAvailableOnlyAfterCommit(t *testing.T) {
 	_, err := app.Add(labels.FromStrings("foo", "bar"), 0, 0)
 	require.NoError(t, err)
 
-	querier := db.Querier(0, 1)
+	querier, err := db.Querier(0, 1)
+	require.NoError(t, err)
 	seriesSet := readSeriesSet(t, querier.Select(labels.NewEqualMatcher("foo", "bar")))
 	require.Equal(t, seriesSet, map[string][]sample{})
@@ -77,7 +78,8 @@ func TestDataAvailableOnlyAfterCommit(t *testing.T) {
 	err = app.Commit()
 	require.NoError(t, err)
 
-	querier = db.Querier(0, 1)
+	querier, err = db.Querier(0, 1)
+	require.NoError(t, err)
 	defer querier.Close()
 
 	seriesSet = readSeriesSet(t, querier.Select(labels.NewEqualMatcher("foo", "bar")))
@@ -96,7 +98,8 @@ func TestDataNotAvailableAfterRollback(t *testing.T) {
 	err = app.Rollback()
 	require.NoError(t, err)
 
-	querier := db.Querier(0, 1)
+	querier, err := db.Querier(0, 1)
+	require.NoError(t, err)
 	defer querier.Close()
 
 	seriesSet := readSeriesSet(t, querier.Select(labels.NewEqualMatcher("foo", "bar")))
@@ -140,7 +143,9 @@ func TestDBAppenderAddRef(t *testing.T) {
 	require.NoError(t, app2.Commit())
 
-	q := db.Querier(0, 200)
+	q, err := db.Querier(0, 200)
+	require.NoError(t, err)
+
 	res := readSeriesSet(t, q.Select(labels.NewEqualMatcher("a", "b")))
 
 	require.Equal(t, map[string][]sample{
@@ -190,7 +195,9 @@ Outer:
 		}
 
 		// Compare the result.
-		q := db.Querier(0, numSamples)
+		q, err := db.Querier(0, numSamples)
+		require.NoError(t, err)
+
 		res := q.Select(labels.NewEqualMatcher("a", "b"))
 
 		expSamples := make([]sample, 0, len(c.remaint))
@@ -284,7 +291,9 @@ func TestSkippingInvalidValuesInSameTxn(t *testing.T) {
 	require.NoError(t, app.Commit())
 
 	// Make sure the right value is stored.
-	q := db.Querier(0, 10)
+	q, err := db.Querier(0, 10)
+	require.NoError(t, err)
+
 	ss := q.Select(labels.NewEqualMatcher("a", "b"))
 	ssMap := readSeriesSet(t, ss)
@@ -302,7 +311,9 @@ func TestSkippingInvalidValuesInSameTxn(t *testing.T) {
 	require.NoError(t, err)
 	require.NoError(t, app.Commit())
 
-	q = db.Querier(0, 10)
+	q, err = db.Querier(0, 10)
+	require.NoError(t, err)
+
 	ss = q.Select(labels.NewEqualMatcher("a", "b"))
 	ssMap = readSeriesSet(t, ss)
@@ -336,7 +347,8 @@ func TestDB_Snapshot(t *testing.T) {
 	db, err = Open(snap, nil, nil, nil)
 	require.NoError(t, err)
 
-	querier := db.Querier(mint, mint+1000)
+	querier, err := db.Querier(mint, mint+1000)
+	require.NoError(t, err)
 	defer querier.Close()
 
 	// sum values
@@ -485,7 +497,9 @@ func TestDB_e2e(t *testing.T) {
 			}
 		}
 
-		q := db.Querier(mint, maxt)
+		q, err := db.Querier(mint, maxt)
+		require.NoError(t, err)
+
 		ss := q.Select(qry.ms...)
 
 		result := map[string][]sample{}

head.go (47 changes)

@@ -305,6 +305,23 @@ func (h *Head) initTime(t int64) (initialized bool) {
 	return true
 }
 
+type rangeHead struct {
+	head       *Head
+	mint, maxt int64
+}
+
+func (h *rangeHead) Index() (IndexReader, error) {
+	return h.head.indexRange(h.mint, h.maxt), nil
+}
+
+func (h *rangeHead) Chunks() (ChunkReader, error) {
+	return h.head.chunksRange(h.mint, h.maxt), nil
+}
+
+func (h *rangeHead) Tombstones() (TombstoneReader, error) {
+	return h.head.tombstones, nil
+}
+
 // initAppender is a helper to initialize the time bounds of the head
 // upon the first sample it receives.
 type initAppender struct {
@@ -611,13 +628,14 @@ func (h *Head) gc() {
 	h.symMtx.Unlock()
 }
 
-func (h *Head) Tombstones() TombstoneReader {
-	return h.tombstones
+// Tombstones returns a new reader over the head's tombstones.
+func (h *Head) Tombstones() (TombstoneReader, error) {
+	return h.tombstones, nil
 }
 
 // Index returns an IndexReader against the block.
-func (h *Head) Index() IndexReader {
-	return h.indexRange(math.MinInt64, math.MaxInt64)
+func (h *Head) Index() (IndexReader, error) {
+	return h.indexRange(math.MinInt64, math.MaxInt64), nil
 }
 
 func (h *Head) indexRange(mint, maxt int64) *headIndexReader {
@@ -628,8 +646,8 @@ func (h *Head) indexRange(mint, maxt int64) *headIndexReader {
 }
 
 // Chunks returns a ChunkReader against the block.
-func (h *Head) Chunks() ChunkReader {
-	return h.chunksRange(math.MinInt64, math.MaxInt64)
+func (h *Head) Chunks() (ChunkReader, error) {
+	return h.chunksRange(math.MinInt64, math.MaxInt64), nil
 }
 
 func (h *Head) chunksRange(mint, maxt int64) *headChunkReader {
@@ -712,23 +730,6 @@ func (c *safeChunk) Iterator() chunks.Iterator {
 // func (c *safeChunk) Bytes() []byte { panic("illegal") }
 // func (c *safeChunk) Encoding() chunks.Encoding { panic("illegal") }
 
-type rangeHead struct {
-	head       *Head
-	mint, maxt int64
-}
-
-func (h *rangeHead) Index() IndexReader {
-	return h.head.indexRange(h.mint, h.maxt)
-}
-
-func (h *rangeHead) Chunks() ChunkReader {
-	return h.head.chunksRange(h.mint, h.maxt)
-}
-
-func (h *rangeHead) Tombstones() TombstoneReader {
-	return newEmptyTombstoneReader()
-}
-
 type headIndexReader struct {
 	head       *Head
 	mint, maxt int64
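With rangeHead moved up and carrying the new three-method signatures, the in-memory head is interchangeable with on-disk blocks wherever a BlockReader is expected. A compile-time sketch of that property (the whole-range Head methods shown earlier provide the implementation):

package example

import "github.com/prometheus/tsdb"

// Both the on-disk block and the head satisfy BlockReader after this
// change, so the compactor and querier can consume either uniformly.
var (
	_ tsdb.BlockReader = (*tsdb.Block)(nil)
	_ tsdb.BlockReader = (*tsdb.Head)(nil)
)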

head_test.go

@@ -322,7 +322,8 @@ Outer:
 		}
 
 		// Compare the result.
-		q := NewBlockQuerier(head.Index(), head.Chunks(), head.Tombstones(), head.MinTime(), head.MaxTime())
+		q, err := NewBlockQuerier(head, head.MinTime(), head.MaxTime())
+		require.NoError(t, err)
 		res := q.Select(labels.NewEqualMatcher("a", "b"))
 
 		expSamples := make([]sample, 0, len(c.remaint))

index.go

@@ -19,14 +19,14 @@ import (
 	"fmt"
 	"hash"
 	"io"
+	"math"
 	"os"
 	"path/filepath"
 	"sort"
 	"strings"
-	"math"
 
-	"github.com/prometheus/tsdb/fileutil"
-
 	"github.com/pkg/errors"
+	"github.com/prometheus/tsdb/fileutil"
 	"github.com/prometheus/tsdb/labels"
 )

index_test.go

@@ -40,12 +40,15 @@ type mockIndex struct {
 }
 
 func newMockIndex() mockIndex {
-	return mockIndex{
+	ix := mockIndex{
 		series:     make(map[uint64]series),
 		labelIndex: make(map[string][]string),
 		postings:   newMemPostings(),
 		symbols:    make(map[string]struct{}),
 	}
+	ix.postings.ensureOrder()
+
+	return ix
 }
 
 func (m mockIndex) Symbols() (map[string]struct{}, error) {
func (m mockIndex) Symbols() (map[string]struct{}, error) { func (m mockIndex) Symbols() (map[string]struct{}, error) {
@@ -277,6 +280,7 @@ func TestPersistence_index_e2e(t *testing.T) {
 		postings = newMemPostings()
 		values   = map[string]stringset{}
 	)
+	postings.ensureOrder()
 
 	mi := newMockIndex()

querier.go

@@ -18,6 +18,7 @@ import (
 	"sort"
 	"strings"
 
+	"github.com/pkg/errors"
 	"github.com/prometheus/tsdb/chunks"
 	"github.com/prometheus/tsdb/labels"
 )
@@ -50,7 +51,6 @@ type Series interface {
 // querier aggregates querying results from time blocks within
 // a single partition.
 type querier struct {
-	db     *DB
 	blocks []Querier
 }
@@ -103,21 +103,30 @@ func (q *querier) Close() error {
 	for _, bq := range q.blocks {
 		merr.Add(bq.Close())
 	}
-	q.db.mtx.RUnlock()
-
 	return merr.Err()
 }
 
 // NewBlockQuerier returns a querier against the readers.
-func NewBlockQuerier(ir IndexReader, cr ChunkReader, tr TombstoneReader, mint, maxt int64) Querier {
-	return &blockQuerier{
-		index:      ir,
-		chunks:     cr,
-		tombstones: tr,
-		mint:       mint,
-		maxt:       maxt,
-	}
+func NewBlockQuerier(b BlockReader, mint, maxt int64) (Querier, error) {
+	indexr, err := b.Index()
+	if err != nil {
+		return nil, errors.Wrapf(err, "open index reader")
+	}
+	chunkr, err := b.Chunks()
+	if err != nil {
+		return nil, errors.Wrapf(err, "open chunk reader")
+	}
+	tombsr, err := b.Tombstones()
+	if err != nil {
+		return nil, errors.Wrapf(err, "open tombstone reader")
+	}
+	return &blockQuerier{
+		mint:       mint,
+		maxt:       maxt,
+		index:      indexr,
+		chunks:     chunkr,
+		tombstones: tombsr,
+	}, nil
 }
// blockQuerier provides querying access to a single block database. // blockQuerier provides querying access to a single block database.
@@ -175,7 +184,13 @@ func (q *blockQuerier) LabelValuesFor(string, labels.Label) ([]string, error) {
 }
 
 func (q *blockQuerier) Close() error {
-	return nil
+	var merr MultiError
+
+	merr.Add(q.index.Close())
+	merr.Add(q.chunks.Close())
+	merr.Add(q.tombstones.Close())
+
+	return merr.Err()
 }
 
 // postingsReader is used to select matching postings from an IndexReader.
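NewBlockQuerier now owns the three readers it acquires, and blockQuerier.Close is what hands them back; for an on-disk block each reader's Close decrements the pending-readers count that Block.Close waits on, so a querier left unclosed would block shutdown. A sketch of direct use against any BlockReader (the function name and matcher are illustrative):

package example

import (
	"github.com/prometheus/tsdb"
	"github.com/prometheus/tsdb/labels"
)

// countSeries opens a querier over one block (or the head), counts the
// matching series, and releases the readers again via Close.
func countSeries(b tsdb.BlockReader, mint, maxt int64) (int, error) {
	q, err := tsdb.NewBlockQuerier(b, mint, maxt)
	if err != nil {
		return 0, err
	}
	defer q.Close() // returns the index, chunk, and tombstone readers

	n := 0
	for set := q.Select(labels.NewEqualMatcher("job", "api")); set.Next(); {
		n++
	}
	return n, nil
}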

tombstones.go

@@ -33,9 +33,11 @@ const (
 	tombstoneFormatV1 = 1
 )
 
-// TombstoneReader is the iterator over tombstones.
+// TombstoneReader gives access to tombstone intervals by series reference.
 type TombstoneReader interface {
 	Get(ref uint64) Intervals
+
+	Close() error
 }
 
 func writeTombstoneFile(dir string, tr tombstoneReader) error {
@@ -154,6 +156,10 @@ func (t tombstoneReader) add(ref uint64, itv Interval) {
 	t[ref] = t[ref].add(itv)
 }
 
+func (tombstoneReader) Close() error {
+	return nil
+}
+
 // Interval represents a single time-interval.
 type Interval struct {
 	Mint, Maxt int64
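With Close now part of the TombstoneReader interface, every implementation — including external ones — must provide it, even if only as a no-op like the map-backed reader above. A minimal sketch of a custom implementation under the widened interface (the type is hypothetical):

package example

import "github.com/prometheus/tsdb"

// emptyTombstones reports no deleted intervals for any series and has
// nothing to release on Close.
type emptyTombstones struct{}

func (emptyTombstones) Get(ref uint64) tsdb.Intervals { return nil }
func (emptyTombstones) Close() error                  { return nil }

// Compile-time check that the widened interface is satisfied.
var _ tsdb.TombstoneReader = emptyTombstones{}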