Merge pull request #3508 from prometheus/uptsdb

update TSDB
Author: Fabian Reinartz
Committed: 2017-11-23 19:11:54 +01:00 (committed by GitHub)
Commit: 2ec5965b75
28 changed files with 492 additions and 305 deletions


@@ -517,7 +517,12 @@ func (ng *Engine) populateIterators(ctx context.Context, s *EvalStmt) (storage.Q
 	Inspect(s.Expr, func(node Node) bool {
 		switch n := node.(type) {
 		case *VectorSelector:
-			n.series, err = expandSeriesSet(querier.Select(n.LabelMatchers...))
+			set, err := querier.Select(n.LabelMatchers...)
+			if err != nil {
+				level.Error(ng.logger).Log("msg", "error selecting series set", "err", err)
+				return false
+			}
+			n.series, err = expandSeriesSet(set)
 			if err != nil {
 				// TODO(fabxc): use multi-error.
 				level.Error(ng.logger).Log("msg", "error expanding series set", "err", err)
@@ -529,7 +534,12 @@ func (ng *Engine) populateIterators(ctx context.Context, s *EvalStmt) (storage.Q
 			}
 		case *MatrixSelector:
-			n.series, err = expandSeriesSet(querier.Select(n.LabelMatchers...))
+			set, err := querier.Select(n.LabelMatchers...)
+			if err != nil {
+				level.Error(ng.logger).Log("msg", "error selecting series set", "err", err)
+				return false
+			}
+			n.series, err = expandSeriesSet(set)
 			if err != nil {
 				level.Error(ng.logger).Log("msg", "error expanding series set", "err", err)
 				return false


@@ -172,9 +172,16 @@ func TestStaleness(t *testing.T) {
 	querier, err := storage.Querier(context.Background(), 0, 2000)
 	testutil.Ok(t, err)
 	defer querier.Close()
-	matcher, _ := labels.NewMatcher(labels.MatchEqual, model.MetricNameLabel, "a_plus_one")
-	samples, err := readSeriesSet(querier.Select(matcher))
+
+	matcher, err := labels.NewMatcher(labels.MatchEqual, model.MetricNameLabel, "a_plus_one")
 	testutil.Ok(t, err)
+
+	set, err := querier.Select(matcher)
+	testutil.Ok(t, err)
+
+	samples, err := readSeriesSet(set)
+	testutil.Ok(t, err)

 	metric := labels.FromStrings(model.MetricNameLabel, "a_plus_one").String()
 	metricSample, ok := samples[metric]


@@ -216,12 +216,16 @@ func NewMergeQuerier(queriers []Querier) Querier {
 }

 // Select returns a set of series that matches the given label matchers.
-func (q *mergeQuerier) Select(matchers ...*labels.Matcher) SeriesSet {
+func (q *mergeQuerier) Select(matchers ...*labels.Matcher) (SeriesSet, error) {
 	seriesSets := make([]SeriesSet, 0, len(q.queriers))
 	for _, querier := range q.queriers {
-		seriesSets = append(seriesSets, querier.Select(matchers...))
+		set, err := querier.Select(matchers...)
+		if err != nil {
+			return nil, err
+		}
+		seriesSets = append(seriesSets, set)
 	}
-	return newMergeSeriesSet(seriesSets)
+	return newMergeSeriesSet(seriesSets), nil
 }

 // LabelValues returns all potential values for a label name.


@@ -52,7 +52,7 @@ type Queryable interface {
 // Querier provides reading access to time series data.
 type Querier interface {
 	// Select returns a set of series that matches the given label matchers.
-	Select(...*labels.Matcher) SeriesSet
+	Select(...*labels.Matcher) (SeriesSet, error)

 	// LabelValues returns all potential values for a label name.
 	LabelValues(name string) ([]string, error)

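With Select now returning an error alongside the series set, every caller has to unwrap the result before iterating. A minimal sketch of the adapted calling pattern, assuming a Querier and matchers are already in hand; the printSeries helper and the import paths are illustrative, not part of this change:

package example

import (
	"fmt"

	"github.com/prometheus/prometheus/pkg/labels"
	"github.com/prometheus/prometheus/storage"
)

// printSeries walks a selection and prints each label set, surfacing the
// selection error that Select can now report directly instead of deferring
// it to the series set.
func printSeries(q storage.Querier, ms ...*labels.Matcher) error {
	set, err := q.Select(ms...)
	if err != nil {
		return err
	}
	for set.Next() {
		fmt.Println(set.At().Labels())
	}
	return set.Err()
}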

@@ -22,8 +22,8 @@ func NoopQuerier() Querier {
 	return noopQuerier{}
 }

-func (noopQuerier) Select(...*labels.Matcher) SeriesSet {
-	return NoopSeriesSet()
+func (noopQuerier) Select(...*labels.Matcher) (SeriesSet, error) {
+	return NoopSeriesSet(), nil
 }

 func (noopQuerier) LabelValues(name string) ([]string, error) {


@@ -43,18 +43,18 @@ type querier struct {
 // Select implements storage.Querier and uses the given matchers to read series
 // sets from the Client.
-func (q *querier) Select(matchers ...*labels.Matcher) storage.SeriesSet {
+func (q *querier) Select(matchers ...*labels.Matcher) (storage.SeriesSet, error) {
 	query, err := ToQuery(q.mint, q.maxt, matchers)
 	if err != nil {
-		return errSeriesSet{err: err}
+		return nil, err
 	}

 	res, err := q.client.Read(q.ctx, query)
 	if err != nil {
-		return errSeriesSet{err: err}
+		return nil, err
 	}
-	return FromQueryResult(res)
+	return FromQueryResult(res), nil
 }

 // LabelValues implements storage.Querier and is a noop.
@@ -91,10 +91,13 @@ type externalLabelsQuerier struct {
 // Select adds equality matchers for all external labels to the list of matchers
 // before calling the wrapped storage.Queryable. The added external labels are
 // removed from the returned series sets.
-func (q externalLabelsQuerier) Select(matchers ...*labels.Matcher) storage.SeriesSet {
+func (q externalLabelsQuerier) Select(matchers ...*labels.Matcher) (storage.SeriesSet, error) {
 	m, added := q.addExternalLabels(matchers)
-	s := q.Querier.Select(m...)
-	return newSeriesSetFilter(s, added)
+	s, err := q.Querier.Select(m...)
+	if err != nil {
+		return nil, err
+	}
+	return newSeriesSetFilter(s, added), nil
 }

 // PreferLocalStorageFilter returns a QueryableFunc which creates a NoopQuerier
@@ -141,7 +144,7 @@ type requiredMatchersQuerier struct {
 // Select returns a NoopSeriesSet if the given matchers don't match the label
 // set of the requiredMatchersQuerier. Otherwise it'll call the wrapped querier.
-func (q requiredMatchersQuerier) Select(matchers ...*labels.Matcher) storage.SeriesSet {
+func (q requiredMatchersQuerier) Select(matchers ...*labels.Matcher) (storage.SeriesSet, error) {
 	ms := q.requiredMatchers
 	for _, m := range matchers {
 		for i, r := range ms {
@@ -155,7 +158,7 @@ func (q requiredMatchersQuerier) Select(matchers ...*labels.Matcher) storage.Ser
 		}
 	}
 	if len(ms) > 0 {
-		return storage.NoopSeriesSet()
+		return storage.NoopSeriesSet(), nil
 	}
 	return q.Querier.Select(matchers...)
 }


@@ -41,9 +41,12 @@ func TestExternalLabelsQuerierSelect(t *testing.T) {
 		Querier:        mockQuerier{},
 		externalLabels: model.LabelSet{"region": "europe"},
 	}
 	want := newSeriesSetFilter(mockSeriesSet{}, q.externalLabels)
-	if have := q.Select(matchers...); !reflect.DeepEqual(want, have) {
+	have, err := q.Select(matchers...)
+	if err != nil {
+		t.Error(err)
+	}
+	if !reflect.DeepEqual(want, have) {
 		t.Errorf("expected series set %+v, got %+v", want, have)
 	}
 }
@@ -154,8 +157,8 @@ type mockSeriesSet struct {
 	storage.SeriesSet
 }

-func (mockQuerier) Select(...*labels.Matcher) storage.SeriesSet {
-	return mockSeriesSet{}
+func (mockQuerier) Select(...*labels.Matcher) (storage.SeriesSet, error) {
+	return mockSeriesSet{}, nil
 }

 func TestPreferLocalStorageFilter(t *testing.T) {
@@ -310,7 +313,11 @@ func TestRequiredLabelsQuerierSelect(t *testing.T) {
 			requiredMatchers: test.requiredMatchers,
 		}

-		if want, have := test.seriesSet, q.Select(test.matchers...); want != have {
+		have, err := q.Select(test.matchers...)
+		if err != nil {
+			t.Error(err)
+		}
+		if want := test.seriesSet; want != have {
 			t.Errorf("%d. expected series set %+v, got %+v", i, want, have)
 		}
 		if want, have := test.requiredMatchers, q.requiredMatchers; !reflect.DeepEqual(want, have) {


@@ -188,14 +188,17 @@ type querier struct {
 	q tsdb.Querier
 }

-func (q querier) Select(oms ...*labels.Matcher) storage.SeriesSet {
+func (q querier) Select(oms ...*labels.Matcher) (storage.SeriesSet, error) {
 	ms := make([]tsdbLabels.Matcher, 0, len(oms))

 	for _, om := range oms {
 		ms = append(ms, convertMatcher(om))
 	}
-	return seriesSet{set: q.q.Select(ms...)}
+	set, err := q.q.Select(ms...)
+	if err != nil {
+		return nil, err
+	}
+	return seriesSet{set: set}, nil
 }

 func (q querier) LabelValues(name string) ([]string, error) { return q.q.LabelValues(name) }


@@ -133,7 +133,7 @@ func writeMetaFile(dir string, meta *BlockMeta) error {
 	return renameFile(tmp, path)
 }

-// Block represents a directory of time series data covering a continous time range.
+// Block represents a directory of time series data covering a continuous time range.
 type Block struct {
 	mtx     sync.RWMutex
 	closing bool
@@ -142,10 +142,9 @@ type Block struct {
 	dir  string
 	meta BlockMeta

-	chunkr     *chunkReader
-	indexr     *indexReader
-
-	tombstones tombstoneReader
+	chunkr     ChunkReader
+	indexr     IndexReader
+	tombstones TombstoneReader
 }

 // OpenBlock opens the block in the directory. It can be passed a chunk pool, which is used
@@ -156,11 +155,11 @@ func OpenBlock(dir string, pool chunks.Pool) (*Block, error) {
 		return nil, err
 	}

-	cr, err := newChunkReader(chunkDir(dir), pool)
+	cr, err := NewDirChunkReader(chunkDir(dir), pool)
 	if err != nil {
 		return nil, err
 	}
-	ir, err := newIndexReader(dir)
+	ir, err := NewFileIndexReader(filepath.Join(dir, "index"))
 	if err != nil {
 		return nil, err
 	}
@@ -284,13 +283,15 @@ func (pb *Block) Delete(mint, maxt int64, ms ...labels.Matcher) error {
 		return ErrClosing
 	}

-	pr := newPostingsReader(pb.indexr)
-	p, absent := pr.Select(ms...)
+	p, absent, err := PostingsForMatchers(pb.indexr, ms...)
+	if err != nil {
+		return errors.Wrap(err, "select series")
+	}

 	ir := pb.indexr

 	// Choose only valid postings which have chunks in the time-range.
-	stones := map[uint64]Intervals{}
+	stones := memTombstones{}

 	var lset labels.Labels
 	var chks []ChunkMeta
@@ -322,16 +323,21 @@ Outer:
 		return p.Err()
 	}

-	// Merge the current and new tombstones.
-	for k, v := range stones {
-		pb.tombstones.add(k, v[0])
+	err = pb.tombstones.Iter(func(id uint64, ivs Intervals) error {
+		for _, iv := range ivs {
+			stones.add(id, iv)
+			pb.meta.Stats.NumTombstones++
+		}
+		return nil
+	})
+	if err != nil {
+		return err
 	}
+	pb.tombstones = stones

 	if err := writeTombstoneFile(pb.dir, pb.tombstones); err != nil {
 		return err
 	}
-	pb.meta.Stats.NumTombstones = uint64(len(pb.tombstones))
 	return writeMetaFile(pb.dir, &pb.meta)
 }

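For context, a sketch of the exported surface exercised by this hunk: it assumes an already opened *tsdb.Block and the matcher constructors of the tsdb/labels package, and is not itself part of the PR.

package example

import (
	"github.com/prometheus/tsdb"
	"github.com/prometheus/tsdb/labels"
)

// dropJob tombstones all samples of series with job="node" inside [mint, maxt].
// The deletions are merged into the block's existing tombstones and written
// back to disk, as implemented in Block.Delete above.
func dropJob(b *tsdb.Block, mint, maxt int64) error {
	return b.Delete(mint, maxt, labels.NewEqualMatcher("job", "node"))
}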
vendor/github.com/prometheus/tsdb/block.prof (generated, vendored, new file)

@@ -0,0 +1,12 @@
--- contention:
cycles/second=2494255279
80179315716 1 @ 0x10061bb 0x10e008c 0x10e3934 0x10dfd30 0x10e1468 0x10e0431 0x1328cdb 0x102e9fd 0x105cea1
80176248000 1 @ 0x1065c12 0x1313b9d 0x10dfd30 0x105cea1
37792267436 303368 @ 0x10061fb 0x131dc08 0x105cea1
21607828 1098 @ 0x10648fe 0x10650d7 0x1064fca 0x12e5a74 0x12e5df2 0x131d969 0x105cea1
1272473 118 @ 0x10648fe 0x1065232 0x10651c6 0x1064cb0 0x12e5bcc 0x131dc50 0x105cea1
851800 1 @ 0x10061bb 0x1313bc6 0x10dfd30 0x105cea1
818628 59 @ 0x10648fe 0x1065232 0x10651c6 0x1064ebf 0x12e5a74 0x12e5df2 0x131d969 0x105cea1
501203 2 @ 0x1005473 0x12e5ed4 0x131d969 0x105cea1
7738 1 @ 0x10648fe 0x1064d19 0x12e5bcc 0x131dc50 0x105cea1
3846 1 @ 0x1005473 0x10e373b 0x10dfd3a 0x10e1468 0x10e0431 0x1328cdb 0x102e9fd 0x105cea1


@@ -298,7 +298,7 @@ type ChunkReader interface {
 // of series data.
 type chunkReader struct {
 	// The underlying bytes holding the encoded series data.
-	bs [][]byte
+	bs []ByteSlice

 	// Closers for resources behind the byte slices.
 	cs []io.Closer
@@ -306,8 +306,32 @@ type chunkReader struct {
 	pool chunks.Pool
 }

-// newChunkReader returns a new chunkReader based on mmaped files found in dir.
-func newChunkReader(dir string, pool chunks.Pool) (*chunkReader, error) {
+func newChunkReader(bs []ByteSlice, cs []io.Closer, pool chunks.Pool) (*chunkReader, error) {
+	cr := chunkReader{pool: pool, bs: bs, cs: cs}
+
+	for i, b := range cr.bs {
+		if b.Len() < 4 {
+			return nil, errors.Wrapf(errInvalidSize, "validate magic in segment %d", i)
+		}
+		// Verify magic number.
+		if m := binary.BigEndian.Uint32(b.Range(0, 4)); m != MagicChunks {
+			return nil, fmt.Errorf("invalid magic number %x", m)
+		}
+	}
+	return &cr, nil
+}
+
+// NewChunkReader returns a new chunk reader against the given byte slices.
+func NewChunkReader(bs []ByteSlice, pool chunks.Pool) (ChunkReader, error) {
+	if pool == nil {
+		pool = chunks.NewPool()
+	}
+	return newChunkReader(bs, nil, pool)
+}
+
+// NewDirChunkReader returns a new ChunkReader against sequentially numbered files in the
+// given directory.
+func NewDirChunkReader(dir string, pool chunks.Pool) (ChunkReader, error) {
 	files, err := sequenceFiles(dir)
 	if err != nil {
 		return nil, err
@@ -315,27 +339,19 @@ func newChunkReader(dir string, pool chunks.Pool) (*chunkReader, error) {
 	if pool == nil {
 		pool = chunks.NewPool()
 	}
-	cr := chunkReader{pool: pool}

+	var bs []ByteSlice
+	var cs []io.Closer
+
 	for _, fn := range files {
 		f, err := openMmapFile(fn)
 		if err != nil {
 			return nil, errors.Wrapf(err, "mmap files")
 		}
-		cr.cs = append(cr.cs, f)
-		cr.bs = append(cr.bs, f.b)
+		cs = append(cs, f)
+		bs = append(bs, realByteSlice(f.b))
 	}
-
-	for i, b := range cr.bs {
-		if len(b) < 4 {
-			return nil, errors.Wrapf(errInvalidSize, "validate magic in segment %d", i)
-		}
-		// Verify magic number.
-		if m := binary.BigEndian.Uint32(b[:4]); m != MagicChunks {
-			return nil, fmt.Errorf("invalid magic number %x", m)
-		}
-	}
-	return &cr, nil
+	return newChunkReader(bs, cs, pool)
 }

 func (s *chunkReader) Close() error {
@@ -352,16 +368,18 @@ func (s *chunkReader) Chunk(ref uint64) (chunks.Chunk, error) {
 	}
 	b := s.bs[seq]

-	if int(off) >= len(b) {
-		return nil, errors.Errorf("offset %d beyond data size %d", off, len(b))
+	if int(off) >= b.Len() {
+		return nil, errors.Errorf("offset %d beyond data size %d", off, b.Len())
 	}
-	b = b[off:]
+	// With the minimum chunk length this should never cause us reading
+	// over the end of the slice.
+	r := b.Range(off, off+binary.MaxVarintLen32)

-	l, n := binary.Uvarint(b)
+	l, n := binary.Uvarint(r)
 	if n < 0 {
 		return nil, fmt.Errorf("reading chunk length failed")
 	}
-	b = b[n:]
-	return s.pool.Get(chunks.Encoding(b[0]), b[1:1+l])
+	r = b.Range(off+n, off+n+int(l))
+
+	return s.pool.Get(chunks.Encoding(r[0]), r[1:1+l])
 }

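The new constructors make it possible to back a ChunkReader with any ByteSlice implementation rather than only mmapped segment files. A minimal sketch, assuming the exported ByteSlice, ChunkReader, NewChunkReader, and chunks.NewPool shown in this diff; the memSlice type is illustrative:

package example

import (
	"github.com/prometheus/tsdb"
	"github.com/prometheus/tsdb/chunks"
)

// memSlice is a trivial ByteSlice over an in-memory byte slice.
type memSlice []byte

func (b memSlice) Len() int                    { return len(b) }
func (b memSlice) Range(start, end int) []byte { return b[start:end] }

// openSegment builds a ChunkReader from a single in-memory segment.
// NewChunkReader validates the 4-byte chunks magic number of every segment up front.
func openSegment(segment []byte) (tsdb.ChunkReader, error) {
	return tsdb.NewChunkReader([]tsdb.ByteSlice{memSlice(segment)}, chunks.NewPool())
}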

@@ -46,8 +46,7 @@ package chunks
 import (
 	"encoding/binary"
 	"math"
-
-	bits "github.com/dgryski/go-bits"
+	"math/bits"
 )

 // XORChunk holds XOR encoded sample data.
@@ -197,8 +196,8 @@ func (a *xorAppender) writeVDelta(v float64) {
 	}
 	a.b.writeBit(one)

-	leading := uint8(bits.Clz(vDelta))
-	trailing := uint8(bits.Ctz(vDelta))
+	leading := uint8(bits.LeadingZeros64(vDelta))
+	trailing := uint8(bits.TrailingZeros64(vDelta))

 	// Clamp number of leading zeros to avoid overflow when encoding.
 	if leading >= 32 {

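The external dgryski/go-bits dependency is replaced by the math/bits package added in Go 1.9. A tiny standalone illustration of the equivalent calls (standard library only; the example value is arbitrary):

package main

import (
	"fmt"
	"math/bits"
)

func main() {
	var vDelta uint64 = 0xf0 // stand-in for the XOR of two float64 bit patterns
	leading := uint8(bits.LeadingZeros64(vDelta))
	trailing := uint8(bits.TrailingZeros64(vDelta))
	fmt.Println(leading, trailing) // 56 4
}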

@@ -52,7 +52,7 @@ type Compactor interface {
 	Plan(dir string) ([]string, error)

 	// Write persists a Block into a directory.
-	Write(dest string, b BlockReader, mint, maxt int64) error
+	Write(dest string, b BlockReader, mint, maxt int64) (ulid.ULID, error)

 	// Compact runs compaction against the provided directories. Must
 	// only be called concurrently with results of Plan().
@@ -321,7 +321,7 @@ func (c *LeveledCompactor) Compact(dest string, dirs ...string) (err error) {
 	return c.write(dest, compactBlockMetas(uid, metas...), blocks...)
 }

-func (c *LeveledCompactor) Write(dest string, b BlockReader, mint, maxt int64) error {
+func (c *LeveledCompactor) Write(dest string, b BlockReader, mint, maxt int64) (ulid.ULID, error) {
 	entropy := rand.New(rand.NewSource(time.Now().UnixNano()))
 	uid := ulid.MustNew(ulid.Now(), entropy)
@@ -333,7 +333,7 @@ func (c *LeveledCompactor) Write(dest string, b BlockReader, mint, maxt int64) e
 	meta.Compaction.Level = 1
 	meta.Compaction.Sources = []ulid.ULID{uid}

-	return c.write(dest, meta, b)
+	return uid, c.write(dest, meta, b)
 }

 // instrumentedChunkWriter is used for level 1 compactions to record statistics
@@ -418,7 +418,7 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockRe
 	}

 	// Create an empty tombstones file.
-	if err := writeTombstoneFile(tmp, newEmptyTombstoneReader()); err != nil {
+	if err := writeTombstoneFile(tmp, EmptyTombstoneReader()); err != nil {
 		return errors.Wrap(err, "write new tombstones file")
 	}
@@ -453,7 +453,7 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockRe
 // of the provided blocks. It returns meta information for the new block.
 func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta, indexw IndexWriter, chunkw ChunkWriter) error {
 	var (
-		set        compactionSet
+		set        ChunkSeriesSet
 		allSymbols = make(map[string]struct{}, 1<<16)
 		closers    = []io.Closer{}
 	)
@@ -597,18 +597,11 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta,
 	return nil
 }

-type compactionSet interface {
-	Next() bool
-	At() (labels.Labels, []ChunkMeta, Intervals)
-	Err() error
-}
-
 type compactionSeriesSet struct {
 	p          Postings
 	index      IndexReader
 	chunks     ChunkReader
 	tombstones TombstoneReader
-	series     SeriesSet

 	l         labels.Labels
 	c         []ChunkMeta
@@ -631,7 +624,11 @@ func (c *compactionSeriesSet) Next() bool {
 	}
 	var err error

-	c.intervals = c.tombstones.Get(c.p.At())
+	c.intervals, err = c.tombstones.Get(c.p.At())
+	if err != nil {
+		c.err = errors.Wrap(err, "get tombstones")
+		return false
+	}

 	if err = c.index.Series(c.p.At(), &c.l, &c.c); err != nil {
 		c.err = errors.Wrapf(err, "get series %d", c.p.At())
@@ -675,7 +672,7 @@ func (c *compactionSeriesSet) At() (labels.Labels, []ChunkMeta, Intervals) {
 }

 type compactionMerger struct {
-	a, b     compactionSet
+	a, b     ChunkSeriesSet
 	aok, bok bool

 	l labels.Labels
@@ -688,7 +685,7 @@ type compactionSeries struct {
 	chunks []*ChunkMeta
 }

-func newCompactionMerger(a, b compactionSet) (*compactionMerger, error) {
+func newCompactionMerger(a, b ChunkSeriesSet) (*compactionMerger, error) {
 	c := &compactionMerger{
 		a: a,
 		b: b,

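Since Write now also reports the ULID of the block it persisted, call sites can track or log the new block. A minimal caller sketch; only the Compactor interface above is taken from the diff, the persistHead helper and logging are illustrative:

package example

import (
	"log"

	"github.com/prometheus/tsdb"
)

// persistHead writes the given BlockReader as a new block and logs the ULID
// that Write now returns alongside the error.
func persistHead(c tsdb.Compactor, dir string, head tsdb.BlockReader, mint, maxt int64) error {
	uid, err := c.Write(dir, head, mint, maxt)
	if err != nil {
		return err
	}
	log.Println("wrote block", uid)
	return nil
}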
vendor/github.com/prometheus/tsdb/cpu.pro (generated, vendored, new empty file)
vendor/github.com/prometheus/tsdb/cpu.prof (generated, vendored, new empty file)


@@ -52,7 +52,7 @@ var DefaultOptions = &Options{
 // Options of the DB storage.
 type Options struct {
-	// The interval at which the write ahead log is flushed to disc.
+	// The interval at which the write ahead log is flushed to disk.
 	WALFlushInterval time.Duration

 	// Duration of persisted data to keep.
@@ -101,7 +101,6 @@ type DB struct {
 	opts      *Options
 	chunkPool chunks.Pool
 	compactor Compactor
-	wal       WAL

 	// Mutex for that must be held when modifying the general block layout.
 	mtx    sync.RWMutex
@@ -142,7 +141,7 @@ func newDBMetrics(db *DB, r prometheus.Registerer) *dbMetrics {
 	})
 	m.reloadsFailed = prometheus.NewCounter(prometheus.CounterOpts{
 		Name: "prometheus_tsdb_reloads_failures_total",
-		Help: "Number of times the database failed to reload black data from disk.",
+		Help: "Number of times the database failed to reload block data from disk.",
 	})
 	m.compactionsTriggered = prometheus.NewCounter(prometheus.CounterOpts{
 		Name: "prometheus_tsdb_compactions_triggered_total",
@@ -278,16 +277,23 @@ func (db *DB) retentionCutoff() (bool, error) {
 	}

 	db.mtx.RLock()
-	defer db.mtx.RUnlock()
+	blocks := db.blocks[:]
+	db.mtx.RUnlock()

-	if len(db.blocks) == 0 {
+	if len(blocks) == 0 {
 		return false, nil
 	}

-	last := db.blocks[len(db.blocks)-1]
-	mint := last.Meta().MaxTime - int64(db.opts.RetentionDuration)
-
-	return retentionCutoff(db.dir, mint)
+	last := blocks[len(db.blocks)-1]
+
+	mint := last.Meta().MaxTime - int64(db.opts.RetentionDuration)
+	dirs, err := retentionCutoffDirs(db.dir, mint)
+	if err != nil {
+		return false, err
+	}
+
+	// This will close the dirs and then delete the dirs.
+	return len(dirs) > 0, db.reload(dirs...)
 }

 // Appender opens a new appender against the database.
@@ -345,7 +351,7 @@ func (db *DB) compact() (changes bool, err error) {
 			mint: mint,
 			maxt: maxt,
 		}
-		if err = db.compactor.Write(db.dir, head, mint, maxt); err != nil {
+		if _, err = db.compactor.Write(db.dir, head, mint, maxt); err != nil {
 			return changes, errors.Wrap(err, "persist head block")
 		}
 		changes = true
@@ -389,40 +395,37 @@ func (db *DB) compact() (changes bool, err error) {
 	return changes, nil
 }

-// retentionCutoff deletes all directories of blocks in dir that are strictly
+// retentionCutoffDirs returns all directories of blocks in dir that are strictly
 // before mint.
-func retentionCutoff(dir string, mint int64) (bool, error) {
+func retentionCutoffDirs(dir string, mint int64) ([]string, error) {
 	df, err := fileutil.OpenDir(dir)
 	if err != nil {
-		return false, errors.Wrapf(err, "open directory")
+		return nil, errors.Wrapf(err, "open directory")
 	}
 	defer df.Close()

 	dirs, err := blockDirs(dir)
 	if err != nil {
-		return false, errors.Wrapf(err, "list block dirs %s", dir)
+		return nil, errors.Wrapf(err, "list block dirs %s", dir)
 	}

-	changes := false
+	delDirs := []string{}

 	for _, dir := range dirs {
 		meta, err := readMetaFile(dir)
 		if err != nil {
-			return changes, errors.Wrapf(err, "read block meta %s", dir)
+			return nil, errors.Wrapf(err, "read block meta %s", dir)
 		}
 		// The first block we encounter marks that we crossed the boundary
 		// of deletable blocks.
 		if meta.MaxTime >= mint {
 			break
 		}
-		changes = true

-		if err := os.RemoveAll(dir); err != nil {
-			return changes, err
-		}
+		delDirs = append(delDirs, dir)
 	}

-	return changes, fileutil.Fsync(df)
+	return delDirs, nil
 }

 func (db *DB) getBlock(id ulid.ULID) (*Block, bool) {
@@ -572,6 +575,7 @@ func (db *DB) Close() error {
 	if db.lockf != nil {
 		merr.Add(db.lockf.Unlock())
 	}
+	merr.Add(db.head.Close())

 	return merr.Err()
 }
@@ -615,7 +619,8 @@ func (db *DB) Snapshot(dir string) error {
 			return errors.Wrap(err, "error snapshotting headblock")
 		}
 	}
-	return db.compactor.Write(dir, db.head, db.head.MinTime(), db.head.MaxTime())
+	_, err := db.compactor.Write(dir, db.head, db.head.MinTime(), db.head.MaxTime())
+	return errors.Wrap(err, "snapshot head block")
 }

 // Querier returns a new querier over the data partition for the given time range.


@@ -11,19 +11,20 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.

-// +build !windows,!plan9,!solaris
+// +build !windows,!plan9

 package tsdb

 import (
 	"os"
-	"syscall"
+
+	"golang.org/x/sys/unix"
 )

 func mmap(f *os.File, length int) ([]byte, error) {
-	return syscall.Mmap(int(f.Fd()), 0, length, syscall.PROT_READ, syscall.MAP_SHARED)
+	return unix.Mmap(int(f.Fd()), 0, length, unix.PROT_READ, unix.MAP_SHARED)
 }

 func munmap(b []byte) (err error) {
-	return syscall.Munmap(b)
+	return unix.Munmap(b)
 }


@@ -3,6 +3,7 @@ package tsdb
 import (
 	"encoding/binary"
 	"hash"
+	"hash/crc32"
 	"unsafe"
 )
@@ -77,6 +78,11 @@ func (d *decbuf) uvarint32() uint32 { return uint32(d.uvarint64()) }
 func (d *decbuf) be32int() int      { return int(d.be32()) }
 func (d *decbuf) be64int64() int64  { return int64(d.be64()) }

+// crc32 returns a CRC32 checksum over the remaining bytes.
+func (d *decbuf) crc32() uint32 {
+	return crc32.Checksum(d.b, castagnoliTable)
+}
+
 func (d *decbuf) uvarintStr() string {
 	l := d.uvarint64()
 	if d.e != nil {

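The checksums that decbuf.crc32 verifies are CRC32 sums; the castagnoliTable referenced above suggests the Castagnoli polynomial. A standalone illustration using only the standard library:

package main

import (
	"fmt"
	"hash/crc32"
)

func main() {
	// Build the Castagnoli lookup table once and checksum a byte slice with it,
	// the same pattern the index reader uses for its sections.
	table := crc32.MakeTable(crc32.Castagnoli)
	sum := crc32.Checksum([]byte("tsdb index section"), table)
	fmt.Printf("%08x\n", sum)
}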

@@ -66,7 +66,7 @@ type Head struct {
 	postings *memPostings // postings lists for terms

-	tombstones tombstoneReader
+	tombstones memTombstones
 }

 type headMetrics struct {
@@ -186,7 +186,7 @@ func NewHead(r prometheus.Registerer, l log.Logger, wal WAL, chunkRange int64) (
 		values:     map[string]stringset{},
 		symbols:    map[string]struct{}{},
 		postings:   newUnorderedMemPostings(),
-		tombstones: newEmptyTombstoneReader(),
+		tombstones: memTombstones{},
 	}
 	h.metrics = newHeadMetrics(h, r)
@@ -574,8 +574,10 @@ func (h *Head) Delete(mint, maxt int64, ms ...labels.Matcher) error {
 	ir := h.indexRange(mint, maxt)

-	pr := newPostingsReader(ir)
-	p, absent := pr.Select(ms...)
+	p, absent, err := PostingsForMatchers(ir, ms...)
+	if err != nil {
+		return errors.Wrap(err, "select series")
+	}

 	var stones []Stone
@@ -739,6 +741,11 @@ func (h *Head) MaxTime() int64 {
 	return atomic.LoadInt64(&h.maxTime)
 }

+// Close flushes the WAL and closes the head.
+func (h *Head) Close() error {
+	return h.wal.Close()
+}
+
 type headChunkReader struct {
 	head       *Head
 	mint, maxt int64


@@ -560,7 +560,7 @@ type StringTuples interface {
 type indexReader struct {
 	// The underlying byte slice holding the encoded series data.
-	b   []byte
+	b   ByteSlice
 	toc indexTOC

 	// Close that releases the underlying resources of the byte slice.
@@ -575,33 +575,62 @@ type indexReader struct {
 	// prevents memory faults when applications work with read symbols after
 	// the block has been unmapped.
 	symbols map[uint32]string
+
+	crc32 hash.Hash32
 }

 var (
 	errInvalidSize = fmt.Errorf("invalid size")
 	errInvalidFlag = fmt.Errorf("invalid flag")
+	errInvalidChecksum = fmt.Errorf("invalid checksum")
 )

-// NewIndexReader returns a new IndexReader on the given directory.
-func NewIndexReader(dir string) (IndexReader, error) { return newIndexReader(dir) }
+// ByteSlice abstracts a byte slice.
+type ByteSlice interface {
+	Len() int
+	Range(start, end int) []byte
+}

-// newIndexReader returns a new indexReader on the given directory.
-func newIndexReader(dir string) (*indexReader, error) {
-	f, err := openMmapFile(filepath.Join(dir, "index"))
+type realByteSlice []byte
+
+func (b realByteSlice) Len() int {
+	return len(b)
+}
+
+func (b realByteSlice) Range(start, end int) []byte {
+	return b[start:end]
+}
+
+func (b realByteSlice) Sub(start, end int) ByteSlice {
+	return b[start:end]
+}
+
+// NewIndexReader returns a new IndexReader on the given byte slice.
+func NewIndexReader(b ByteSlice) (IndexReader, error) {
+	return newIndexReader(b, nil)
+}
+
+// NewFileIndexReader returns a new index reader against the given index file.
+func NewFileIndexReader(path string) (IndexReader, error) {
+	f, err := openMmapFile(path)
 	if err != nil {
 		return nil, err
 	}
-	r := &indexReader{
-		b:       f.b,
-		c:       f,
-		symbols: map[uint32]string{},
-	}
+	return newIndexReader(realByteSlice(f.b), f)
+}
+
+func newIndexReader(b ByteSlice, c io.Closer) (*indexReader, error) {
+	r := &indexReader{
+		b:       b,
+		c:       c,
+		symbols: map[uint32]string{},
+		crc32:   newCRC32(),
+	}

 	// Verify magic number.
-	if len(f.b) < 4 {
+	if b.Len() < 4 {
 		return nil, errors.Wrap(errInvalidSize, "index header")
 	}
-	if m := binary.BigEndian.Uint32(r.b[:4]); m != MagicIndex {
+	if m := binary.BigEndian.Uint32(r.b.Range(0, 4)); m != MagicIndex {
 		return nil, errors.Errorf("invalid magic number %x", m)
 	}
@@ -611,6 +640,7 @@ func newIndexReader(dir string) (*indexReader, error) {
 	if err := r.readSymbols(int(r.toc.symbols)); err != nil {
 		return nil, errors.Wrap(err, "read symbols")
 	}
+	var err error

 	r.labels, err = r.readOffsetTable(r.toc.labelIndicesTable)
 	if err != nil {
@@ -621,7 +651,17 @@ func newIndexReader(dir string) (*indexReader, error) {
 }

 func (r *indexReader) readTOC() error {
-	d := r.decbufAt(len(r.b) - indexTOCLen)
+	if r.b.Len() < indexTOCLen {
+		return errInvalidSize
+	}
+	b := r.b.Range(r.b.Len()-indexTOCLen, r.b.Len())
+
+	expCRC := binary.BigEndian.Uint32(b[len(b)-4:])
+	d := decbuf{b: b[:len(b)-4]}
+
+	if d.crc32() != expCRC {
+		return errInvalidChecksum
+	}

 	r.toc.symbols = d.be64()
 	r.toc.series = d.be64()
@@ -630,16 +670,61 @@ func (r *indexReader) readTOC() error {
 	r.toc.postings = d.be64()
 	r.toc.postingsTable = d.be64()

-	// TODO(fabxc): validate checksum.
-	return nil
+	return d.err()
 }

+// decbufAt returns a new decoding buffer. It expects the first 4 bytes
+// after offset to hold the big endian encoded content length, followed by the contents and the expected
+// checksum.
 func (r *indexReader) decbufAt(off int) decbuf {
-	if len(r.b) < off {
+	if r.b.Len() < off+4 {
 		return decbuf{e: errInvalidSize}
 	}
-	return decbuf{b: r.b[off:]}
+	b := r.b.Range(off, off+4)
+	l := int(binary.BigEndian.Uint32(b))
+
+	if r.b.Len() < off+4+l+4 {
+		return decbuf{e: errInvalidSize}
+	}
+
+	// Load bytes holding the contents plus a CRC32 checksum.
+	b = r.b.Range(off+4, off+4+l+4)
+
+	dec := decbuf{b: b[:len(b)-4]}
+
+	if exp := binary.BigEndian.Uint32(b[len(b)-4:]); dec.crc32() != exp {
+		return decbuf{e: errInvalidChecksum}
+	}
+	return dec
+}
+
+// decbufUvarintAt returns a new decoding buffer. It expects the first bytes
+// after offset to hold the uvarint-encoded buffers length, followed by the contents and the expected
+// checksum.
+func (r *indexReader) decbufUvarintAt(off int) decbuf {
+	// We never have to access this method at the far end of the byte slice. Thus just checking
+	// against the MaxVarintLen32 is sufficient.
+	if r.b.Len() < off+binary.MaxVarintLen32 {
+		return decbuf{e: errInvalidSize}
+	}
+	b := r.b.Range(off, off+binary.MaxVarintLen32)
+
+	l, n := binary.Uvarint(b)
+	if n > binary.MaxVarintLen32 {
+		return decbuf{e: errors.New("invalid uvarint")}
+	}
+
+	if r.b.Len() < off+n+int(l)+4 {
+		return decbuf{e: errInvalidSize}
+	}
+
+	// Load bytes holding the contents plus a CRC32 checksum.
+	b = r.b.Range(off+n, off+n+int(l)+4)
+
+	dec := decbuf{b: b[:len(b)-4]}
+	if dec.crc32() != binary.BigEndian.Uint32(b[len(b)-4:]) {
+		return decbuf{e: errInvalidChecksum}
+	}
+	return dec
 }

 // readSymbols reads the symbol table fully into memory and allocates proper strings for them.
@@ -649,22 +734,22 @@ func (r *indexReader) readSymbols(off int) error {
 	if off == 0 {
 		return nil
 	}
+	d := r.decbufAt(off)

 	var (
-		d1      = r.decbufAt(int(off))
-		d2      = d1.decbuf(d1.be32int())
-		origLen = d2.len()
-		cnt     = d2.be32int()
+		origLen = d.len()
+		cnt     = d.be32int()
 		basePos = uint32(off) + 4
-		nextPos = basePos + uint32(origLen-d2.len())
+		nextPos = basePos + uint32(origLen-d.len())
 	)

-	for d2.err() == nil && d2.len() > 0 && cnt > 0 {
-		s := d2.uvarintStr()
+	for d.err() == nil && d.len() > 0 && cnt > 0 {
+		s := d.uvarintStr()
 		r.symbols[uint32(nextPos)] = s

-		nextPos = basePos + uint32(origLen-d2.len())
+		nextPos = basePos + uint32(origLen-d.len())
 		cnt--
 	}
-	return d2.err()
+	return d.err()
 }

 // readOffsetTable reads an offset table at the given position and returns a map
@@ -672,53 +757,29 @@ func (r *indexReader) readSymbols(off int) error {
 func (r *indexReader) readOffsetTable(off uint64) (map[string]uint32, error) {
 	const sep = "\xff"

-	var (
-		d1  = r.decbufAt(int(off))
-		d2  = d1.decbuf(d1.be32int())
-		cnt = d2.be32()
-	)
+	d := r.decbufAt(int(off))
+	cnt := d.be32()

-	res := make(map[string]uint32, 512)
+	res := make(map[string]uint32, cnt)

-	for d2.err() == nil && d2.len() > 0 && cnt > 0 {
-		keyCount := int(d2.uvarint())
+	for d.err() == nil && d.len() > 0 && cnt > 0 {
+		keyCount := int(d.uvarint())
 		keys := make([]string, 0, keyCount)

 		for i := 0; i < keyCount; i++ {
-			keys = append(keys, d2.uvarintStr())
+			keys = append(keys, d.uvarintStr())
 		}
-		res[strings.Join(keys, sep)] = uint32(d2.uvarint())
+		res[strings.Join(keys, sep)] = uint32(d.uvarint())

 		cnt--
 	}
-
-	// TODO(fabxc): verify checksum from remainer of d1.
-	return res, d2.err()
+	return res, d.err()
 }

 func (r *indexReader) Close() error {
 	return r.c.Close()
 }

-func (r *indexReader) section(o uint32) (byte, []byte, error) {
-	b := r.b[o:]
-
-	if len(b) < 5 {
-		return 0, nil, errors.Wrap(errInvalidSize, "read header")
-	}
-
-	flag := b[0]
-	l := binary.BigEndian.Uint32(b[1:5])
-
-	b = b[5:]
-
-	// b must have the given length plus 4 bytes for the CRC32 checksum.
-	if len(b) < int(l)+4 {
-		return 0, nil, errors.Wrap(errInvalidSize, "section content")
-	}
-	return flag, b[:l], nil
-}
-
 func (r *indexReader) lookupSymbol(o uint32) (string, error) {
 	s, ok := r.symbols[o]
 	if !ok {
@@ -748,21 +809,17 @@ func (r *indexReader) LabelValues(names ...string) (StringTuples, error) {
 		//return nil, fmt.Errorf("label index doesn't exist")
 	}

-	d1 := r.decbufAt(int(off))
-	d2 := d1.decbuf(d1.be32int())
+	d := r.decbufAt(int(off))

-	nc := d2.be32int()
-	d2.be32() // consume unused value entry count.
+	nc := d.be32int()
+	d.be32() // consume unused value entry count.

-	if d2.err() != nil {
-		return nil, errors.Wrap(d2.err(), "read label value index")
+	if d.err() != nil {
+		return nil, errors.Wrap(d.err(), "read label value index")
 	}

-	// TODO(fabxc): verify checksum in 4 remaining bytes of d1.
-
 	st := &serializedStringTuples{
 		l:      nc,
-		b:      d2.get(),
+		b:      d.get(),
 		lookup: r.lookupSymbol,
 	}
 	return st, nil
@@ -785,20 +842,19 @@ func (r *indexReader) LabelIndices() ([][]string, error) {
 }

 func (r *indexReader) Series(ref uint64, lbls *labels.Labels, chks *[]ChunkMeta) error {
-	d1 := r.decbufAt(int(ref))
-	d2 := d1.decbuf(int(d1.uvarint()))
+	d := r.decbufUvarintAt(int(ref))

 	*lbls = (*lbls)[:0]
 	*chks = (*chks)[:0]

-	k := int(d2.uvarint())
+	k := int(d.uvarint())

 	for i := 0; i < k; i++ {
-		lno := uint32(d2.uvarint())
-		lvo := uint32(d2.uvarint())
+		lno := uint32(d.uvarint())
+		lvo := uint32(d.uvarint())

-		if d2.err() != nil {
-			return errors.Wrap(d2.err(), "read series label offsets")
+		if d.err() != nil {
+			return errors.Wrap(d.err(), "read series label offsets")
 		}

 		ln, err := r.lookupSymbol(lno)
@@ -814,15 +870,15 @@ func (r *indexReader) Series(ref uint64, lbls *labels.Labels, chks *[]ChunkMeta)
 	}

 	// Read the chunks meta data.
-	k = int(d2.uvarint())
+	k = int(d.uvarint())

 	if k == 0 {
 		return nil
 	}

-	t0 := d2.varint64()
-	maxt := int64(d2.uvarint64()) + t0
-	ref0 := int64(d2.uvarint64())
+	t0 := d.varint64()
+	maxt := int64(d.uvarint64()) + t0
+	ref0 := int64(d.uvarint64())

 	*chks = append(*chks, ChunkMeta{
 		Ref:     uint64(ref0),
@@ -832,14 +888,14 @@ func (r *indexReader) Series(ref uint64, lbls *labels.Labels, chks *[]ChunkMeta)
 	t0 = maxt

 	for i := 1; i < k; i++ {
-		mint := int64(d2.uvarint64()) + t0
-		maxt := int64(d2.uvarint64()) + mint
+		mint := int64(d.uvarint64()) + t0
+		maxt := int64(d.uvarint64()) + mint

-		ref0 += d2.varint64()
+		ref0 += d.varint64()
 		t0 = maxt

-		if d2.err() != nil {
-			return errors.Wrapf(d2.err(), "read meta for chunk %d", i)
+		if d.err() != nil {
+			return errors.Wrapf(d.err(), "read meta for chunk %d", i)
 		}

 		*chks = append(*chks, ChunkMeta{
@@ -848,10 +904,7 @@ func (r *indexReader) Series(ref uint64, lbls *labels.Labels, chks *[]ChunkMeta)
 			MaxTime: maxt,
 		})
 	}
-
-	// TODO(fabxc): verify CRC32.
-	return nil
+	return d.err()
 }

 func (r *indexReader) Postings(name, value string) (Postings, error) {
@@ -862,19 +915,10 @@ func (r *indexReader) Postings(name, value string) (Postings, error) {
 	if !ok {
 		return emptyPostings, nil
 	}
+	d := r.decbufAt(int(off))
+	d.be32() // consume unused postings list length.

-	d1 := r.decbufAt(int(off))
-	d2 := d1.decbuf(d1.be32int())
-
-	d2.be32() // consume unused postings list length.
-	if d2.err() != nil {
-		return nil, errors.Wrap(d2.err(), "get postings bytes")
-	}
-
-	// TODO(fabxc): read checksum from 4 remainer bytes of d1 and verify.
-
-	return newBigEndianPostings(d2.get()), nil
+	return newBigEndianPostings(d.get()), errors.Wrap(d.err(), "get postings bytes")
 }

 func (r *indexReader) SortedPostings(p Postings) Postings {

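A minimal sketch of the newly exported file-based constructor: it assumes an index file at a hypothetical block path, and that LabelValues and Close are part of the IndexReader behaviour shown in this file.

package example

import (
	"fmt"

	"github.com/prometheus/tsdb"
)

// listMetricNames opens a block's index directly and prints how many distinct
// __name__ value tuples it contains. The path is illustrative only.
func listMetricNames() error {
	ir, err := tsdb.NewFileIndexReader("data/01BXYZ/index") // hypothetical block dir
	if err != nil {
		return err
	}
	defer ir.Close()

	vals, err := ir.LabelValues("__name__")
	if err != nil {
		return err
	}
	fmt.Println("metric name tuples:", vals.Len())
	return nil
}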

@@ -165,6 +165,11 @@ func (e errPostings) Err() error { return e.err }

 var emptyPostings = errPostings{}

+// EmptyPostings returns a postings list that's always empty.
+func EmptyPostings() Postings {
+	return emptyPostings
+}
+
 // Intersect returns a new postings list over the intersection of the
 // input postings.
 func Intersect(its ...Postings) Postings {


@@ -27,7 +27,7 @@ import (
 // time range.
 type Querier interface {
 	// Select returns a set of series that matches the given label matchers.
-	Select(...labels.Matcher) SeriesSet
+	Select(...labels.Matcher) (SeriesSet, error)

 	// LabelValues returns all potential values for a label name.
 	LabelValues(string) ([]string, error)
@@ -81,20 +81,29 @@ func (q *querier) LabelValuesFor(string, labels.Label) ([]string, error) {
 	return nil, fmt.Errorf("not implemented")
 }

-func (q *querier) Select(ms ...labels.Matcher) SeriesSet {
+func (q *querier) Select(ms ...labels.Matcher) (SeriesSet, error) {
 	return q.sel(q.blocks, ms)
 }

-func (q *querier) sel(qs []Querier, ms []labels.Matcher) SeriesSet {
+func (q *querier) sel(qs []Querier, ms []labels.Matcher) (SeriesSet, error) {
 	if len(qs) == 0 {
-		return nopSeriesSet{}
+		return EmptySeriesSet(), nil
 	}
 	if len(qs) == 1 {
 		return qs[0].Select(ms...)
 	}
 	l := len(qs) / 2
-	return newMergedSeriesSet(q.sel(qs[:l], ms), q.sel(qs[l:], ms))
+
+	a, err := q.sel(qs[:l], ms)
+	if err != nil {
+		return nil, err
+	}
+	b, err := q.sel(qs[l:], ms)
+	if err != nil {
+		return nil, err
+	}
+	return newMergedSeriesSet(a, b), nil
 }

 func (q *querier) Close() error {
@@ -141,20 +150,14 @@ type blockQuerier struct {
 	mint, maxt int64
 }

-func (q *blockQuerier) Select(ms ...labels.Matcher) SeriesSet {
-	pr := newPostingsReader(q.index)
-
-	p, absent := pr.Select(ms...)
+func (q *blockQuerier) Select(ms ...labels.Matcher) (SeriesSet, error) {
+	base, err := LookupChunkSeries(q.index, q.tombstones, ms...)
+	if err != nil {
+		return nil, err
+	}

 	return &blockSeriesSet{
 		set: &populatedChunkSeries{
-			set: &baseChunkSeries{
-				p:          p,
-				index:      q.index,
-				absent:     absent,
-				tombstones: q.tombstones,
-			},
+			set:    base,
 			chunks: q.chunks,
 			mint:   q.mint,
 			maxt:   q.maxt,
@@ -162,7 +165,7 @@ func (q *blockQuerier) Select(ms ...labels.Matcher) SeriesSet {

 		mint: q.mint,
 		maxt: q.maxt,
-	}
+	}, nil
 }

 func (q *blockQuerier) LabelValues(name string) ([]string, error) {
@@ -196,16 +199,10 @@ func (q *blockQuerier) Close() error {
 	return merr.Err()
 }

-// postingsReader is used to select matching postings from an IndexReader.
-type postingsReader struct {
-	index IndexReader
-}
-
-func newPostingsReader(i IndexReader) *postingsReader {
-	return &postingsReader{index: i}
-}
-
-func (r *postingsReader) Select(ms ...labels.Matcher) (Postings, []string) {
+// PostingsForMatchers assembles a single postings iterator against the index reader
+// based on the given matchers. It returns a list of label names that must be manually
+// checked to not exist in series the postings list points to.
+func PostingsForMatchers(index IndexReader, ms ...labels.Matcher) (Postings, []string, error) {
 	var (
 		its    []Postings
 		absent []string
@@ -217,12 +214,13 @@ func (r *postingsReader) Select(ms ...labels.Matcher) (Postings, []string) {
 			absent = append(absent, m.Name())
 			continue
 		}
-		its = append(its, r.selectSingle(m))
+		it, err := postingsForMatcher(index, m)
+		if err != nil {
+			return nil, nil, err
+		}
+		its = append(its, it)
 	}
-
-	p := Intersect(its...)
-
-	return r.index.SortedPostings(p), absent
+	return index.SortedPostings(Intersect(its...)), absent, nil
 }

 // tuplesByPrefix uses binary search to find prefix matches within ts.
@@ -256,33 +254,33 @@ func tuplesByPrefix(m *labels.PrefixMatcher, ts StringTuples) ([]string, error)
 	return matches, nil
 }

-func (r *postingsReader) selectSingle(m labels.Matcher) Postings {
+func postingsForMatcher(index IndexReader, m labels.Matcher) (Postings, error) {
 	// Fast-path for equal matching.
 	if em, ok := m.(*labels.EqualMatcher); ok {
-		it, err := r.index.Postings(em.Name(), em.Value())
+		it, err := index.Postings(em.Name(), em.Value())
 		if err != nil {
-			return errPostings{err: err}
+			return nil, err
 		}
-		return it
+		return it, nil
 	}

-	tpls, err := r.index.LabelValues(m.Name())
+	tpls, err := index.LabelValues(m.Name())
 	if err != nil {
-		return errPostings{err: err}
+		return nil, err
 	}

 	var res []string
 	if pm, ok := m.(*labels.PrefixMatcher); ok {
 		res, err = tuplesByPrefix(pm, tpls)
 		if err != nil {
-			return errPostings{err: err}
+			return nil, err
 		}
 	} else {
 		for i := 0; i < tpls.Len(); i++ {
 			vals, err := tpls.At(i)
 			if err != nil {
-				return errPostings{err: err}
+				return nil, err
 			}
 			if m.Matches(vals[0]) {
 				res = append(res, vals[0])
@@ -291,20 +289,20 @@ func (r *postingsReader) selectSingle(m labels.Matcher) Postings {
 	}

 	if len(res) == 0 {
-		return emptyPostings
+		return EmptyPostings(), nil
 	}

 	var rit []Postings

 	for _, v := range res {
-		it, err := r.index.Postings(m.Name(), v)
+		it, err := index.Postings(m.Name(), v)
 		if err != nil {
-			return errPostings{err: err}
+			return nil, err
 		}
 		rit = append(rit, it)
 	}

-	return Merge(rit...)
+	return Merge(rit...), nil
 }

 func mergeStrings(a, b []string) []string {
@@ -342,11 +340,12 @@ type SeriesSet interface {
 	Err() error
 }

-type nopSeriesSet struct{}
+var emptySeriesSet = errSeriesSet{}

-func (nopSeriesSet) Next() bool { return false }
-func (nopSeriesSet) At() Series { return nil }
-func (nopSeriesSet) Err() error { return nil }
+// EmptySeriesSet returns a series set that's always empty.
+func EmptySeriesSet() SeriesSet {
+	return emptySeriesSet
+}

 // mergedSeriesSet takes two series sets as a single series set. The input series sets
 // must be sorted and sequential in time, i.e. if they have the same label set,
@@ -418,7 +417,7 @@ func (s *mergedSeriesSet) Next() bool {
 	return true
 }

-type chunkSeriesSet interface {
+type ChunkSeriesSet interface {
 	Next() bool
 	At() (labels.Labels, []ChunkMeta, Intervals)
 	Err() error
@@ -438,6 +437,24 @@ type baseChunkSeries struct {
 	err error
 }

+// LookupChunkSeries retrieves all series for the given matchers and returns a ChunkSeriesSet
+// over them. It drops chunks based on tombstones in the given reader.
+func LookupChunkSeries(ir IndexReader, tr TombstoneReader, ms ...labels.Matcher) (ChunkSeriesSet, error) {
+	if tr == nil {
+		tr = EmptyTombstoneReader()
+	}
+	p, absent, err := PostingsForMatchers(ir, ms...)
+	if err != nil {
+		return nil, err
+	}
+	return &baseChunkSeries{
+		p:          p,
+		index:      ir,
+		tombstones: tr,
+		absent:     absent,
+	}, nil
+}
+
 func (s *baseChunkSeries) At() (labels.Labels, []ChunkMeta, Intervals) {
 	return s.lset, s.chks, s.intervals
 }
@@ -448,6 +465,7 @@ func (s *baseChunkSeries) Next() bool {
 	var (
 		lset   labels.Labels
 		chunks []ChunkMeta
+		err    error
 	)
 Outer:
 	for s.p.Next() {
@@ -470,7 +488,11 @@ Outer:
 		s.lset = lset
 		s.chks = chunks
-		s.intervals = s.tombstones.Get(s.p.At())
+		s.intervals, err = s.tombstones.Get(s.p.At())
+		if err != nil {
+			s.err = errors.Wrap(err, "get tombstones")
+			return false
+		}

 		if len(s.intervals) > 0 {
 			// Only those chunks that are not entirely deleted.
@@ -496,7 +518,7 @@ Outer:
 // with known chunk references. It filters out chunks that do not fit the
 // given time range.
 type populatedChunkSeries struct {
-	set    chunkSeriesSet
+	set    ChunkSeriesSet
 	chunks ChunkReader

 	mint, maxt int64
@@ -553,7 +575,7 @@ func (s *populatedChunkSeries) Next() bool {
 // blockSeriesSet is a set of series from an inverted index query.
 type blockSeriesSet struct {
-	set chunkSeriesSet
+	set ChunkSeriesSet
 	err error
 	cur Series

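A minimal sketch of the newly exported LookupChunkSeries, assuming an IndexReader from an open block and the matcher constructors of the tsdb/labels package; passing nil for the TombstoneReader falls back to EmptyTombstoneReader as implemented above.

package example

import (
	"fmt"

	"github.com/prometheus/tsdb"
	"github.com/prometheus/tsdb/labels"
)

// listChunks prints the chunk count of every series matching job="node".
func listChunks(ir tsdb.IndexReader) error {
	set, err := tsdb.LookupChunkSeries(ir, nil, labels.NewEqualMatcher("job", "node"))
	if err != nil {
		return err
	}
	for set.Next() {
		lset, chks, _ := set.At()
		fmt.Println(lset, len(chks), "chunks")
	}
	return set.Err()
}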

@@ -35,12 +35,17 @@ const (

 // TombstoneReader gives access to tombstone intervals by series reference.
 type TombstoneReader interface {
-	Get(ref uint64) Intervals
+	// Get returns deletion intervals for the series with the given reference.
+	Get(ref uint64) (Intervals, error)
+
+	// Iter calls the given function for each encountered interval.
+	Iter(func(uint64, Intervals) error) error
+
+	// Close any underlying resources
 	Close() error
 }

-func writeTombstoneFile(dir string, tr tombstoneReader) error {
+func writeTombstoneFile(dir string, tr TombstoneReader) error {
 	path := filepath.Join(dir, tombstoneFilename)
 	tmp := path + ".tmp"
 	hash := newCRC32()
@@ -67,19 +72,21 @@ func writeTombstoneFile(dir string, tr tombstoneReader) error {
 	mw := io.MultiWriter(f, hash)

-	for k, v := range tr {
-		for _, itv := range v {
+	tr.Iter(func(ref uint64, ivs Intervals) error {
+		for _, iv := range ivs {
 			buf.reset()
-			buf.putUvarint64(k)
-			buf.putVarint64(itv.Mint)
-			buf.putVarint64(itv.Maxt)
+
+			buf.putUvarint64(ref)
+			buf.putVarint64(iv.Mint)
+			buf.putVarint64(iv.Maxt)

 			_, err = mw.Write(buf.get())
 			if err != nil {
 				return err
 			}
 		}
-	}
+		return nil
+	})

 	_, err = f.Write(hash.Sum(nil))
 	if err != nil {
@@ -100,7 +107,7 @@ type Stone struct {
 	intervals Intervals
 }

-func readTombstones(dir string) (tombstoneReader, error) {
+func readTombstones(dir string) (memTombstones, error) {
 	b, err := ioutil.ReadFile(filepath.Join(dir, tombstoneFilename))
 	if err != nil {
 		return nil, err
@@ -131,7 +138,8 @@ func readTombstones(dir string) (tombstoneReader, error) {
 		return nil, errors.New("checksum did not match")
 	}

-	stonesMap := newEmptyTombstoneReader()
+	stonesMap := memTombstones{}
+
 	for d.len() > 0 {
 		k := d.uvarint64()
 		mint := d.varint64()
@@ -143,28 +151,36 @@ func readTombstones(dir string) (tombstoneReader, error) {
 		stonesMap.add(k, Interval{mint, maxt})
 	}

-	return newTombstoneReader(stonesMap), nil
+	return stonesMap, nil
 }

-type tombstoneReader map[uint64]Intervals
+type memTombstones map[uint64]Intervals

-func newTombstoneReader(ts map[uint64]Intervals) tombstoneReader {
-	return tombstoneReader(ts)
+var emptyTombstoneReader = memTombstones{}
+
+// EmptyTombstoneReader returns a TombstoneReader that is always empty.
+func EmptyTombstoneReader() TombstoneReader {
+	return emptyTombstoneReader
 }

-func newEmptyTombstoneReader() tombstoneReader {
-	return tombstoneReader(make(map[uint64]Intervals))
+func (t memTombstones) Get(ref uint64) (Intervals, error) {
+	return t[ref], nil
 }

-func (t tombstoneReader) Get(ref uint64) Intervals {
-	return t[ref]
+func (t memTombstones) Iter(f func(uint64, Intervals) error) error {
+	for ref, ivs := range t {
+		if err := f(ref, ivs); err != nil {
+			return err
+		}
+	}
+	return nil
 }

-func (t tombstoneReader) add(ref uint64, itv Interval) {
+func (t memTombstones) add(ref uint64, itv Interval) {
 	t[ref] = t[ref].add(itv)
 }

-func (tombstoneReader) Close() error {
+func (memTombstones) Close() error {
 	return nil
 }

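The TombstoneReader interface is now small enough to satisfy outside the package. A sketch of a map-backed implementation equivalent to the package's own memTombstones, assuming the exported Intervals type of the tsdb package:

package example

import "github.com/prometheus/tsdb"

// mapTombstones mirrors memTombstones: deletion intervals keyed by series reference.
type mapTombstones map[uint64]tsdb.Intervals

// Get returns the deletion intervals recorded for the given series reference.
func (t mapTombstones) Get(ref uint64) (tsdb.Intervals, error) { return t[ref], nil }

// Iter calls f for every series reference together with its intervals.
func (t mapTombstones) Iter(f func(uint64, tsdb.Intervals) error) error {
	for ref, ivs := range t {
		if err := f(ref, ivs); err != nil {
			return err
		}
	}
	return nil
}

func (mapTombstones) Close() error { return nil }

// Compile-time check that mapTombstones satisfies tsdb.TombstoneReader.
var _ tsdb.TombstoneReader = mapTombstones{}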
vendor/github.com/prometheus/tsdb/tsdb.test (generated, vendored, executable; binary file not shown)


@@ -63,11 +63,11 @@ func newWalMetrics(wal *SegmentWAL, r prometheus.Registerer) *walMetrics {
 	m := &walMetrics{}

 	m.fsyncDuration = prometheus.NewSummary(prometheus.SummaryOpts{
-		Name: "tsdb_wal_fsync_duration_seconds",
+		Name: "prometheus_tsdb_wal_fsync_duration_seconds",
 		Help: "Duration of WAL fsync.",
 	})
 	m.corruptions = prometheus.NewCounter(prometheus.CounterOpts{
-		Name: "tsdb_wal_corruptions_total",
+		Name: "prometheus_tsdb_wal_corruptions_total",
 		Help: "Total number of WAL corruptions.",
 	})

vendor/vendor.json (vendored)

@@ -782,28 +782,28 @@
 			"revisionTime": "2016-04-11T19:08:41Z"
 		},
 		{
-			"checksumSHA1": "Bty/r75M8kM+GA80eMM5p0cLTi0=",
+			"checksumSHA1": "c3VEi8SL0XmI6BmokMOWrSWmNu8=",
 			"path": "github.com/prometheus/tsdb",
-			"revision": "706602daed1487f7849990678b4ece4599745905",
-			"revisionTime": "2017-11-04T07:45:56Z"
+			"revision": "ad3c4849a99729a9c10a55b3ba4c0ad146d2446a",
+			"revisionTime": "2017-11-23T17:41:24Z"
 		},
 		{
-			"checksumSHA1": "uy6ySJ6EZqof+yMD2wTkYob8BeU=",
+			"checksumSHA1": "C5V8KPHm/gZF0qrNwmIEDdG6rhA=",
 			"path": "github.com/prometheus/tsdb/chunks",
-			"revision": "706602daed1487f7849990678b4ece4599745905",
-			"revisionTime": "2017-11-04T07:45:56Z"
+			"revision": "ad3c4849a99729a9c10a55b3ba4c0ad146d2446a",
+			"revisionTime": "2017-11-23T17:41:24Z"
 		},
 		{
 			"checksumSHA1": "7RhNAVcmDmLFqn9nWiudT0B76f8=",
 			"path": "github.com/prometheus/tsdb/fileutil",
-			"revision": "706602daed1487f7849990678b4ece4599745905",
-			"revisionTime": "2017-11-04T07:45:56Z"
+			"revision": "ad3c4849a99729a9c10a55b3ba4c0ad146d2446a",
+			"revisionTime": "2017-11-23T17:41:24Z"
 		},
 		{
 			"checksumSHA1": "zhmlvc322RH1L3l9DaA9d/HVVWs=",
 			"path": "github.com/prometheus/tsdb/labels",
-			"revision": "706602daed1487f7849990678b4ece4599745905",
-			"revisionTime": "2017-11-04T07:45:56Z"
+			"revision": "ad3c4849a99729a9c10a55b3ba4c0ad146d2446a",
+			"revisionTime": "2017-11-23T17:41:24Z"
 		},
 		{
 			"checksumSHA1": "5SYLEhADhdBVZAGPVHWggQl7H8k=",


@@ -380,7 +380,11 @@ func (api *API) series(r *http.Request) (interface{}, *apiError) {
 	var set storage.SeriesSet

 	for _, mset := range matcherSets {
-		set = storage.DeduplicateSeriesSet(set, q.Select(mset...))
+		s, err := q.Select(mset...)
+		if err != nil {
+			return nil, &apiError{errorExec, err}
+		}
+		set = storage.DeduplicateSeriesSet(set, s)
 	}

 	metrics := []labels.Labels{}
@@ -517,7 +521,12 @@ func (api *API) remoteRead(w http.ResponseWriter, r *http.Request) {
 			}
 		}

-		resp.Results[i], err = remote.ToQueryResult(querier.Select(filteredMatchers...))
+		set, err := querier.Select(filteredMatchers...)
+		if err != nil {
+			http.Error(w, err.Error(), http.StatusInternalServerError)
+			return
+		}
+		resp.Results[i], err = remote.ToQueryResult(set)
 		if err != nil {
 			http.Error(w, err.Error(), http.StatusInternalServerError)
 			return


@@ -75,7 +75,13 @@ func (h *Handler) federation(w http.ResponseWriter, req *http.Request) {
 	var set storage.SeriesSet

 	for _, mset := range matcherSets {
-		set = storage.DeduplicateSeriesSet(set, q.Select(mset...))
+		s, err := q.Select(mset...)
+		if err != nil {
+			federationErrors.Inc()
+			http.Error(w, err.Error(), http.StatusInternalServerError)
+			return
+		}
+		set = storage.DeduplicateSeriesSet(set, s)
 	}

 	if set == nil {
 		return