vendor: update prometheus/tsdb

Fabian Reinartz 2017-05-16 16:41:25 +02:00
parent 24ce79f75d
commit fbb22ddd8f
7 changed files with 334 additions and 258 deletions

@@ -48,8 +48,8 @@ type Block interface {
 	Queryable
 }
 
-// HeadBlock is a regular block that can still be appended to.
-type HeadBlock interface {
+// headBlock is a regular block that can still be appended to.
+type headBlock interface {
 	Block
 	Appendable
 }

@@ -118,7 +118,7 @@ type DB struct {
 	// or the general layout.
 	// Must never be held when acquiring a blocks's mutex!
 	headmtx sync.RWMutex
-	heads   []HeadBlock
+	heads   []headBlock
 
 	compactor Compactor
 
@@ -401,7 +401,7 @@ func (db *DB) reloadBlocks() error {
 	var (
 		metas     []*BlockMeta
 		blocks    []Block
-		heads     []HeadBlock
+		heads     []headBlock
 		seqBlocks = make(map[int]Block, len(dirs))
 	)
 
@@ -418,7 +418,7 @@ func (db *DB) reloadBlocks() error {
 		if meta.Compaction.Generation == 0 {
 			if !ok {
-				b, err = openHeadBlock(dirs[i], db.logger)
+				b, err = db.openHeadBlock(dirs[i])
 				if err != nil {
 					return errors.Wrapf(err, "load head at %s", dirs[i])
 				}
@@ -426,7 +426,7 @@ func (db *DB) reloadBlocks() error {
 			if meta.ULID != b.Meta().ULID {
 				return errors.Errorf("head block ULID changed unexpectedly")
 			}
-			heads = append(heads, b.(HeadBlock))
+			heads = append(heads, b.(headBlock))
 		} else {
 			if !ok || meta.ULID != b.Meta().ULID {
 				b, err = newPersistedBlock(dirs[i])
@@ -559,7 +559,7 @@ func (a *dbAppender) appenderFor(t int64) (*metaAppender, error) {
 	if len(a.heads) == 0 || t >= a.heads[len(a.heads)-1].meta.MaxTime {
 		a.db.headmtx.Lock()
 
-		var newHeads []HeadBlock
+		var newHeads []headBlock
 
 		if err := a.db.ensureHead(t); err != nil {
 			a.db.headmtx.Unlock()
@@ -670,9 +670,9 @@ func (a *dbAppender) Rollback() error {
 }
 
 // appendable returns a copy of a slice of HeadBlocks that can still be appended to.
-func (db *DB) appendable() []HeadBlock {
+func (db *DB) appendable() []headBlock {
 	var i int
-	app := make([]HeadBlock, 0, db.opts.AppendableBlocks)
+	app := make([]headBlock, 0, db.opts.AppendableBlocks)
 
 	if len(db.heads) > db.opts.AppendableBlocks {
 		i = len(db.heads) - db.opts.AppendableBlocks
@@ -709,16 +709,37 @@ func (db *DB) blocksForInterval(mint, maxt int64) []Block {
 	return bs
 }
 
+// openHeadBlock opens the head block at dir.
+func (db *DB) openHeadBlock(dir string) (*HeadBlock, error) {
+	var (
+		wdir = filepath.Join(dir, "wal")
+		l    = log.With(db.logger, "wal", wdir)
+	)
+	wal, err := OpenSegmentWAL(wdir, l, 5*time.Second)
+	if err != nil {
+		return nil, errors.Wrap(err, "open WAL %s")
+	}
+	h, err := OpenHeadBlock(dir, log.With(db.logger, "block", dir), wal)
+	if err != nil {
+		return nil, errors.Wrapf(err, "open head block %s", dir)
+	}
+	return h, nil
+}
+
 // cut starts a new head block to append to. The completed head block
 // will still be appendable for the configured grace period.
-func (db *DB) cut(mint int64) (HeadBlock, error) {
+func (db *DB) cut(mint int64) (headBlock, error) {
 	maxt := mint + int64(db.opts.MinBlockDuration)
 
 	dir, seq, err := nextSequenceFile(db.dir, "b-")
 	if err != nil {
 		return nil, err
 	}
-	newHead, err := createHeadBlock(dir, seq, db.logger, mint, maxt)
+	if err := TouchHeadBlock(dir, seq, mint, maxt); err != nil {
+		return nil, errors.Wrapf(err, "touch head block %s", dir)
+	}
+	newHead, err := db.openHeadBlock(dir)
 	if err != nil {
 		return nil, err
 	}
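
The new flow splits head-block creation into TouchHeadBlock (create the directory and meta file) and openHeadBlock (attach a WAL and read it back). The creation side leans on the classic write-to-tmp-then-rename trick so a crash never exposes a half-built block. A minimal, self-contained sketch of that pattern; touchDir and its write callback are illustrative names, not tsdb API:

package main

import (
	"log"
	"os"
	"path/filepath"
)

// touchDir makes dir appear atomically: everything is written into
// dir+".tmp" first and a final rename publishes it. A crash before the
// rename only leaves a .tmp directory that recovery can delete.
func touchDir(dir string, write func(tmp string) error) error {
	tmp := dir + ".tmp"
	if err := os.MkdirAll(tmp, 0777); err != nil {
		return err
	}
	if err := write(tmp); err != nil {
		return err
	}
	// tsdb's renameFile also fsyncs the parent directory; plain
	// os.Rename is enough for this sketch.
	return os.Rename(tmp, dir)
}

func main() {
	err := touchDir("b-000001", func(tmp string) error {
		// Stand-in for writeMetaFile: persist the block metadata.
		return os.WriteFile(filepath.Join(tmp, "meta.json"), []byte("{}\n"), 0666)
	})
	if err != nil {
		log.Fatal(err)
	}
}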

@@ -47,11 +47,11 @@ var (
 	ErrOutOfBounds = errors.New("out of bounds")
 )
 
-// headBlock handles reads and writes of time series data within a time window.
-type headBlock struct {
+// HeadBlock handles reads and writes of time series data within a time window.
+type HeadBlock struct {
 	mtx sync.RWMutex
 	dir string
-	wal *WAL
+	wal WAL
 
 	activeWriters uint64
 	closed        bool
@@ -69,19 +69,21 @@ type headBlock struct {
 	meta BlockMeta
 }
 
-func createHeadBlock(dir string, seq int, l log.Logger, mint, maxt int64) (*headBlock, error) {
+// TouchHeadBlock atomically touches a new head block in dir for
+// samples in the range [mint,maxt).
+func TouchHeadBlock(dir string, seq int, mint, maxt int64) error {
 	// Make head block creation appear atomic.
 	tmp := dir + ".tmp"
 
 	if err := os.MkdirAll(tmp, 0777); err != nil {
-		return nil, err
+		return err
 	}
 
 	entropy := rand.New(rand.NewSource(time.Now().UnixNano()))
 	ulid, err := ulid.New(ulid.Now(), entropy)
 	if err != nil {
-		return nil, err
+		return err
 	}
 
 	if err := writeMetaFile(tmp, &BlockMeta{
@@ -90,38 +92,33 @@ func createHeadBlock(dir string, seq int, l log.Logger, mint, maxt int64) (*head
 		MinTime: mint,
 		MaxTime: maxt,
 	}); err != nil {
-		return nil, err
+		return err
 	}
-	if err := renameFile(tmp, dir); err != nil {
-		return nil, err
-	}
-	return openHeadBlock(dir, l)
+	return renameFile(tmp, dir)
 }
 
-// openHeadBlock creates a new empty head block.
-func openHeadBlock(dir string, l log.Logger) (*headBlock, error) {
-	wal, err := OpenWAL(dir, log.With(l, "component", "wal"), 5*time.Second)
-	if err != nil {
-		return nil, err
-	}
+// OpenHeadBlock opens the head block in dir.
+func OpenHeadBlock(dir string, l log.Logger, wal WAL) (*HeadBlock, error) {
 	meta, err := readMetaFile(dir)
 	if err != nil {
 		return nil, err
 	}
 
-	h := &headBlock{
+	h := &HeadBlock{
 		dir:      dir,
 		wal:      wal,
-		series:   []*memSeries{},
+		series:   []*memSeries{nil}, // 0 is not a valid posting, filled with nil.
 		hashes:   map[uint64][]*memSeries{},
 		values:   map[string]stringset{},
 		postings: &memPostings{m: make(map[term][]uint32)},
 		meta:     *meta,
 	}
+	return h, h.init()
+}
 
-	r := wal.Reader()
+func (h *HeadBlock) init() error {
+	r := h.wal.Reader()
 
-Outer:
 	for r.Next() {
 		series, samples := r.At()
@@ -130,37 +127,32 @@ Outer:
 			h.meta.Stats.NumSeries++
 		}
 		for _, s := range samples {
-			if int(s.ref) >= len(h.series) {
-				l.Log("msg", "unknown series reference, abort WAL restore", "got", s.ref, "max", len(h.series)-1)
-				break Outer
+			if int(s.Ref) >= len(h.series) {
+				return errors.Errorf("unknown series reference %d (max %d); abort WAL restore", s.Ref, len(h.series))
 			}
-			h.series[s.ref].append(s.t, s.v)
+			h.series[s.Ref].append(s.T, s.V)
 
-			if !h.inBounds(s.t) {
-				return nil, errors.Wrap(ErrOutOfBounds, "consume WAL")
+			if !h.inBounds(s.T) {
+				return errors.Wrap(ErrOutOfBounds, "consume WAL")
 			}
 			h.meta.Stats.NumSamples++
 		}
 	}
-	if err := r.Err(); err != nil {
-		return nil, errors.Wrap(err, "consume WAL")
-	}
 
-	return h, nil
+	return errors.Wrap(r.Err(), "consume WAL")
 }
 
 // inBounds returns true if the given timestamp is within the valid
 // time bounds of the block.
-func (h *headBlock) inBounds(t int64) bool {
+func (h *HeadBlock) inBounds(t int64) bool {
 	return t >= h.meta.MinTime && t <= h.meta.MaxTime
 }
 
-func (h *headBlock) String() string {
+func (h *HeadBlock) String() string {
 	return fmt.Sprintf("(%d, %s)", h.meta.Sequence, h.meta.ULID)
 }
 
 // Close syncs all data and closes underlying resources of the head block.
-func (h *headBlock) Close() error {
+func (h *HeadBlock) Close() error {
 	h.mtx.Lock()
 	defer h.mtx.Unlock()
@@ -184,7 +176,7 @@ func (h *headBlock) Close() error {
 	return nil
 }
 
-func (h *headBlock) Meta() BlockMeta {
+func (h *HeadBlock) Meta() BlockMeta {
 	m := BlockMeta{
 		ULID:     h.meta.ULID,
 		Sequence: h.meta.Sequence,
@@ -200,12 +192,12 @@ func (h *headBlock) Meta() BlockMeta {
 	return m
 }
 
-func (h *headBlock) Dir() string          { return h.dir }
-func (h *headBlock) Persisted() bool      { return false }
-func (h *headBlock) Index() IndexReader   { return &headIndexReader{h} }
-func (h *headBlock) Chunks() ChunkReader  { return &headChunkReader{h} }
+func (h *HeadBlock) Dir() string          { return h.dir }
+func (h *HeadBlock) Persisted() bool      { return false }
+func (h *HeadBlock) Index() IndexReader   { return &headIndexReader{h} }
+func (h *HeadBlock) Chunks() ChunkReader  { return &headChunkReader{h} }
 
-func (h *headBlock) Querier(mint, maxt int64) Querier {
+func (h *HeadBlock) Querier(mint, maxt int64) Querier {
 	h.mtx.RLock()
 	defer h.mtx.RUnlock()
@@ -244,7 +236,7 @@ func (h *headBlock) Querier(mint, maxt int64) Querier {
 	}
 }
 
-func (h *headBlock) Appender() Appender {
+func (h *HeadBlock) Appender() Appender {
 	atomic.AddUint64(&h.activeWriters, 1)
 
 	h.mtx.RLock()
@@ -252,36 +244,36 @@ func (h *headBlock) Appender() Appender {
 	if h.closed {
 		panic(fmt.Sprintf("block %s already closed", h.dir))
 	}
-	return &headAppender{headBlock: h, samples: getHeadAppendBuffer()}
+	return &headAppender{HeadBlock: h, samples: getHeadAppendBuffer()}
 }
 
-func (h *headBlock) Busy() bool {
+func (h *HeadBlock) Busy() bool {
 	return atomic.LoadUint64(&h.activeWriters) > 0
 }
 
 var headPool = sync.Pool{}
 
-func getHeadAppendBuffer() []refdSample {
+func getHeadAppendBuffer() []RefSample {
 	b := headPool.Get()
 	if b == nil {
-		return make([]refdSample, 0, 512)
+		return make([]RefSample, 0, 512)
 	}
-	return b.([]refdSample)
+	return b.([]RefSample)
 }
 
-func putHeadAppendBuffer(b []refdSample) {
+func putHeadAppendBuffer(b []RefSample) {
 	headPool.Put(b[:0])
 }
 
 type headAppender struct {
-	*headBlock
+	*HeadBlock
 
 	newSeries map[uint64]hashedLabels
 	newHashes map[uint64]uint64
 	refmap    map[uint64]uint64
 	newLabels []labels.Labels
 
-	samples []refdSample
+	samples []RefSample
 }
 
 type hashedLabels struct {
@@ -289,12 +281,6 @@ type hashedLabels struct {
 	labels labels.Labels
 }
 
-type refdSample struct {
-	ref uint64
-	t   int64
-	v   float64
-}
-
 func (a *headAppender) Add(lset labels.Labels, t int64, v float64) (uint64, error) {
 	if !a.inBounds(t) {
 		return 0, ErrOutOfBounds
@@ -369,10 +355,10 @@ func (a *headAppender) AddFast(ref uint64, t int64, v float64) error {
 		}
 	}
 
-	a.samples = append(a.samples, refdSample{
-		ref: ref,
-		t:   t,
-		v:   v,
+	a.samples = append(a.samples, RefSample{
+		Ref: ref,
+		T:   t,
+		V:   v,
 	})
 	return nil
 }
@@ -418,8 +404,8 @@ func (a *headAppender) Commit() error {
 	for i := range a.samples {
 		s := &a.samples[i]
 
-		if s.ref&(1<<32) > 0 {
-			s.ref = a.refmap[s.ref]
+		if s.Ref&(1<<32) > 0 {
+			s.Ref = a.refmap[s.Ref]
 		}
 	}
 
@@ -433,7 +419,7 @@ func (a *headAppender) Commit() error {
 	total := uint64(len(a.samples))
 
 	for _, s := range a.samples {
-		if !a.series[s.ref].append(s.t, s.v) {
+		if !a.series[s.Ref].append(s.T, s.V) {
 			total--
 		}
 	}
@@ -454,7 +440,7 @@ func (a *headAppender) Rollback() error {
 }
 
 type headChunkReader struct {
-	*headBlock
+	*HeadBlock
 }
 
 // Chunk returns the chunk for the reference number.
@@ -490,7 +476,7 @@ func (c *safeChunk) Iterator() chunks.Iterator {
 // func (c *safeChunk) Encoding() chunks.Encoding { panic("illegal") }
 
 type headIndexReader struct {
-	*headBlock
+	*HeadBlock
 }
 
 // LabelValues returns the possible label values
@@ -558,7 +544,7 @@ func (h *headIndexReader) LabelIndices() ([][]string, error) {
 
 // get retrieves the chunk with the hash and label set and creates
 // a new one if it doesn't exist yet.
-func (h *headBlock) get(hash uint64, lset labels.Labels) *memSeries {
+func (h *HeadBlock) get(hash uint64, lset labels.Labels) *memSeries {
 	series := h.hashes[hash]
 
 	for _, s := range series {
@@ -569,11 +555,13 @@ func (h *headBlock) get(hash uint64, lset labels.Labels) *memSeries {
 	return nil
 }
 
-func (h *headBlock) create(hash uint64, lset labels.Labels) *memSeries {
+func (h *HeadBlock) create(hash uint64, lset labels.Labels) *memSeries {
 	s := &memSeries{
 		lset: lset,
 		ref:  uint32(len(h.series)),
 	}
+	// create the initial chunk and appender
+	s.cut()
 
 	// Allocate empty space until we can insert at the given index.
 	h.series = append(h.series, s)
@@ -636,7 +624,7 @@ func (s *memSeries) append(t int64, v float64) bool {
 	var c *memChunk
 
-	if s.app == nil || s.head().samples > 130 {
+	if s.head().samples > 130 {
 		c = s.cut()
 		c.minTime = t
 	} else {
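
The series: []*memSeries{nil} change above reserves slot 0 so that a series reference of 0 can never be a valid posting, which is what lets the WAL-restore loop treat out-of-range refs as corruption. A small self-contained sketch of that convention; registry and series are illustrative names, not tsdb types:

package main

import "fmt"

type series struct{ name string }

// registry hands out uint32 refs that double as slice indexes. Slot 0
// is pre-filled with nil so ref 0 stays an invalid sentinel.
type registry struct {
	series []*series
}

func newRegistry() *registry {
	return &registry{series: []*series{nil}} // 0 is not a valid posting.
}

func (r *registry) create(name string) uint32 {
	ref := uint32(len(r.series))
	r.series = append(r.series, &series{name: name})
	return ref
}

func (r *registry) get(ref uint32) *series {
	if ref == 0 || int(ref) >= len(r.series) {
		return nil // unknown reference, like the WAL-restore check above
	}
	return r.series[ref]
}

func main() {
	r := newRegistry()
	ref := r.create("http_requests_total")
	fmt.Println(ref, r.get(ref).name, r.get(0)) // 1 http_requests_total <nil>
}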

@@ -33,7 +33,7 @@ func (p *memPostings) get(t term) Postings {
 	if l == nil {
 		return emptyPostings
 	}
-	return &listPostings{list: l, idx: -1}
+	return newListPostings(l)
 }
 
 // add adds a document to the index. The caller has to ensure that no
@@ -70,18 +70,13 @@ func (e errPostings) Seek(uint32) bool { return false }
 func (e errPostings) At() uint32       { return 0 }
 func (e errPostings) Err() error       { return e.err }
 
-func expandPostings(p Postings) (res []uint32, err error) {
-	for p.Next() {
-		res = append(res, p.At())
-	}
-	return res, p.Err()
-}
+var emptyPostings = errPostings{}
 
 // Intersect returns a new postings list over the intersection of the
 // input postings.
 func Intersect(its ...Postings) Postings {
 	if len(its) == 0 {
-		return errPostings{err: nil}
+		return emptyPostings
 	}
 
 	a := its[0]
@@ -91,8 +86,6 @@ func Intersect(its ...Postings) Postings {
 	return a
 }
 
-var emptyPostings = errPostings{}
-
 type intersectPostings struct {
 	a, b     Postings
 	aok, bok bool
@@ -100,41 +93,44 @@ type intersectPostings struct {
 }
 
 func newIntersectPostings(a, b Postings) *intersectPostings {
-	it := &intersectPostings{a: a, b: b}
-	it.aok = it.a.Next()
-	it.bok = it.b.Next()
-	return it
+	return &intersectPostings{a: a, b: b}
 }
 
 func (it *intersectPostings) At() uint32 {
 	return it.cur
 }
 
-func (it *intersectPostings) Next() bool {
+func (it *intersectPostings) doNext(id uint32) bool {
 	for {
-		if !it.aok || !it.bok {
+		if !it.b.Seek(id) {
 			return false
 		}
-		av, bv := it.a.At(), it.b.At()
-
-		if av < bv {
-			it.aok = it.a.Seek(bv)
-		} else if bv < av {
-			it.bok = it.b.Seek(av)
-		} else {
-			it.cur = av
-			it.aok = it.a.Next()
-			it.bok = it.b.Next()
-			return true
+		if vb := it.b.At(); vb != id {
+			if !it.a.Seek(vb) {
+				return false
+			}
+			id = it.a.At()
+			if vb != id {
+				continue
+			}
 		}
+		it.cur = id
+		return true
 	}
 }
 
+func (it *intersectPostings) Next() bool {
+	if !it.a.Next() {
+		return false
+	}
+	return it.doNext(it.a.At())
+}
+
 func (it *intersectPostings) Seek(id uint32) bool {
-	it.aok = it.a.Seek(id)
-	it.bok = it.b.Seek(id)
-	return it.Next()
+	if !it.a.Seek(id) {
+		return false
+	}
+	return it.doNext(it.a.At())
 }
 
 func (it *intersectPostings) Err() error {
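
The rewritten intersection is driven entirely by Seek: Next advances the first iterator, and doNext leapfrogs Seek calls between both postings until they agree on an ID. A self-contained sketch of the same leapfrog loop over two sorted lists; the postings interface here is a reduced stand-in for tsdb's Postings:

package main

import (
	"fmt"
	"sort"
)

// postings is a minimal cut of tsdb's Postings interface.
type postings interface {
	Next() bool
	Seek(v uint32) bool
	At() uint32
}

// list mirrors the rewritten listPostings: cur plus a shrinking slice.
type list struct {
	l   []uint32
	cur uint32
}

func (it *list) At() uint32 { return it.cur }

func (it *list) Next() bool {
	if len(it.l) == 0 {
		return false
	}
	it.cur, it.l = it.l[0], it.l[1:]
	return true
}

func (it *list) Seek(v uint32) bool {
	if it.cur >= v {
		return true
	}
	i := sort.Search(len(it.l), func(i int) bool { return it.l[i] >= v })
	if i == len(it.l) {
		it.l = nil
		return false
	}
	it.cur, it.l = it.l[i], it.l[i+1:]
	return true
}

// intersect follows intersectPostings above: advance a, then settle
// both iterators on a common ID via doNext-style leapfrogging.
func intersect(a, b postings) []uint32 {
	var out []uint32
	for a.Next() {
		id := a.At()
	settle:
		for {
			if !b.Seek(id) {
				return out
			}
			if vb := b.At(); vb != id {
				if !a.Seek(vb) {
					return out
				}
				id = a.At()
				if vb != id {
					continue
				}
			}
			break settle
		}
		out = append(out, id)
	}
	return out
}

func main() {
	a := &list{l: []uint32{1, 3, 5, 7, 9}}
	b := &list{l: []uint32{3, 4, 5, 9, 10}}
	fmt.Println(intersect(a, b)) // [3 5 9]
}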
@@ -158,17 +154,14 @@ func Merge(its ...Postings) Postings {
 }
 
 type mergedPostings struct {
 	a, b Postings
-	aok, bok bool
-	cur uint32
+	initialized bool
+	aok, bok bool
+	cur uint32
 }
 
 func newMergedPostings(a, b Postings) *mergedPostings {
-	it := &mergedPostings{a: a, b: b}
-	it.aok = it.a.Next()
-	it.bok = it.b.Next()
-	return it
+	return &mergedPostings{a: a, b: b}
 }
 
 func (it *mergedPostings) At() uint32 {
@@ -176,6 +169,12 @@ func (it *mergedPostings) At() uint32 {
 }
 
 func (it *mergedPostings) Next() bool {
+	if !it.initialized {
+		it.aok = it.a.Next()
+		it.bok = it.b.Next()
+		it.initialized = true
+	}
+
 	if !it.aok && !it.bok {
 		return false
 	}
@@ -196,23 +195,25 @@ func (it *mergedPostings) Next() bool {
 	if acur < bcur {
 		it.cur = acur
 		it.aok = it.a.Next()
-		return true
-	}
-	if bcur < acur {
+	} else if acur > bcur {
 		it.cur = bcur
 		it.bok = it.b.Next()
-		return true
+	} else {
+		it.cur = acur
+		it.aok = it.a.Next()
+		it.bok = it.b.Next()
 	}
-	it.cur = acur
-	it.aok = it.a.Next()
-	it.bok = it.b.Next()
 
 	return true
 }
 
 func (it *mergedPostings) Seek(id uint32) bool {
+	if it.cur >= id {
+		return true
+	}
+
 	it.aok = it.a.Seek(id)
 	it.bok = it.b.Seek(id)
+	it.initialized = true
+
 	return it.Next()
 }
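
mergedPostings gains lazy initialization: the first Next or Seek now pulls the initial elements, so building a large Merge tree over many label values stays cheap until it is actually iterated. The Next step itself is the standard two-way merge of sorted streams, sketched here over plain slices:

package main

import "fmt"

// mergeSorted performs the step mergedPostings.Next implements
// iteratively: emit the smaller head first, equal heads only once.
func mergeSorted(a, b []uint32) []uint32 {
	out := make([]uint32, 0, len(a)+len(b))
	for len(a) > 0 && len(b) > 0 {
		switch {
		case a[0] < b[0]:
			out, a = append(out, a[0]), a[1:]
		case a[0] > b[0]:
			out, b = append(out, b[0]), b[1:]
		default:
			out, a, b = append(out, a[0]), a[1:], b[1:]
		}
	}
	out = append(out, a...)
	return append(out, b...)
}

func main() {
	fmt.Println(mergeSorted([]uint32{1, 3, 5}, []uint32{2, 3, 6})) // [1 2 3 5 6]
}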
@@ -227,28 +228,44 @@ func (it *mergedPostings) Err() error {
 // listPostings implements the Postings interface over a plain list.
 type listPostings struct {
 	list []uint32
-	idx  int
+	cur  uint32
 }
 
 func newListPostings(list []uint32) *listPostings {
-	return &listPostings{list: list, idx: -1}
+	return &listPostings{list: list}
 }
 
 func (it *listPostings) At() uint32 {
-	return it.list[it.idx]
+	return it.cur
 }
 
 func (it *listPostings) Next() bool {
-	it.idx++
-	return it.idx < len(it.list)
+	if len(it.list) > 0 {
+		it.cur = it.list[0]
+		it.list = it.list[1:]
+		return true
+	}
+	it.cur = 0
+	return false
 }
 
 func (it *listPostings) Seek(x uint32) bool {
+	// If the current value satisfies, then return.
+	if it.cur >= x {
+		return true
+	}
+
 	// Do binary search between current position and end.
-	it.idx += sort.Search(len(it.list)-it.idx, func(i int) bool {
-		return it.list[i+it.idx] >= x
+	i := sort.Search(len(it.list), func(i int) bool {
+		return it.list[i] >= x
 	})
-	return it.idx < len(it.list)
+	if i < len(it.list) {
+		it.cur = it.list[i]
+		it.list = it.list[i+1:]
+		return true
+	}
+	it.list = nil
+	return false
 }
 
 func (it *listPostings) Err() error {
@@ -259,32 +276,44 @@ func (it *listPostings) Err() error {
 // big endian numbers.
 type bigEndianPostings struct {
 	list []byte
-	idx  int
+	cur  uint32
 }
 
 func newBigEndianPostings(list []byte) *bigEndianPostings {
-	return &bigEndianPostings{list: list, idx: -1}
+	return &bigEndianPostings{list: list}
 }
 
 func (it *bigEndianPostings) At() uint32 {
-	idx := 4 * it.idx
-	return binary.BigEndian.Uint32(it.list[idx : idx+4])
+	return it.cur
 }
 
 func (it *bigEndianPostings) Next() bool {
-	it.idx++
-	return it.idx*4 < len(it.list)
+	if len(it.list) >= 4 {
+		it.cur = binary.BigEndian.Uint32(it.list)
+		it.list = it.list[4:]
+		return true
+	}
+	return false
 }
 
 func (it *bigEndianPostings) Seek(x uint32) bool {
+	if it.cur >= x {
+		return true
+	}
+
 	num := len(it.list) / 4
 	// Do binary search between current position and end.
-	it.idx += sort.Search(num-it.idx, func(i int) bool {
-		idx := 4 * (it.idx + i)
-		val := binary.BigEndian.Uint32(it.list[idx : idx+4])
-		return val >= x
+	i := sort.Search(num, func(i int) bool {
+		return binary.BigEndian.Uint32(it.list[i*4:]) >= x
 	})
-	return it.idx*4 < len(it.list)
+	if i < num {
+		j := i * 4
+		it.cur = binary.BigEndian.Uint32(it.list[j:])
+		it.list = it.list[j+4:]
+		return true
+	}
+	it.list = nil
+	return false
 }
 
 func (it *bigEndianPostings) Err() error {
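
bigEndianPostings now tracks a decoded cur value and shrinks its byte slice as it advances, so Seek only binary-searches the remaining suffix. A round-trip sketch of the layout it reads, 4 bytes per ID, big endian; writeBigEndianPostings and seek are illustrative helpers, not tsdb API:

package main

import (
	"encoding/binary"
	"fmt"
	"sort"
)

// writeBigEndianPostings packs a sorted ID list as 4-byte big-endian
// words, the encoded form bigEndianPostings iterates over.
func writeBigEndianPostings(ids []uint32) []byte {
	buf := make([]byte, 4*len(ids))
	for i, id := range ids {
		binary.BigEndian.PutUint32(buf[4*i:], id)
	}
	return buf
}

// seek returns the first ID >= x plus the remaining encoded suffix,
// mirroring bigEndianPostings.Seek above.
func seek(list []byte, x uint32) (uint32, []byte, bool) {
	num := len(list) / 4
	i := sort.Search(num, func(i int) bool {
		return binary.BigEndian.Uint32(list[i*4:]) >= x
	})
	if i == num {
		return 0, nil, false
	}
	return binary.BigEndian.Uint32(list[i*4:]), list[(i+1)*4:], true
}

func main() {
	enc := writeBigEndianPostings([]uint32{2, 10, 33, 34, 500})
	id, rest, ok := seek(enc, 11)
	fmt.Println(id, len(rest)/4, ok) // 33 2 true
}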

@@ -135,21 +135,9 @@ type blockQuerier struct {
 }
 
 func (q *blockQuerier) Select(ms ...labels.Matcher) SeriesSet {
-	var (
-		its    []Postings
-		absent []string
-	)
-	for _, m := range ms {
-		// If the matcher checks absence of a label, don't select them
-		// but propagate the check into the series set.
-		if _, ok := m.(*labels.EqualMatcher); ok && m.Matches("") {
-			absent = append(absent, m.Name())
-			continue
-		}
-		its = append(its, q.selectSingle(m))
-	}
+	pr := newPostingsReader(q.index)
 
-	p := Intersect(its...)
+	p, absent := pr.Select(ms...)
 
 	if q.postingsMapper != nil {
 		p = q.postingsMapper(p)
@@ -172,50 +160,6 @@ func (q *blockQuerier) Select(ms ...labels.Matcher) SeriesSet {
 	}
 }
 
-func (q *blockQuerier) selectSingle(m labels.Matcher) Postings {
-	// Fast-path for equal matching.
-	if em, ok := m.(*labels.EqualMatcher); ok {
-		it, err := q.index.Postings(em.Name(), em.Value())
-		if err != nil {
-			return errPostings{err: err}
-		}
-		return it
-	}
-
-	tpls, err := q.index.LabelValues(m.Name())
-	if err != nil {
-		return errPostings{err: err}
-	}
-	// TODO(fabxc): use interface upgrading to provide fast solution
-	// for equality and prefix matches. Tuples are lexicographically sorted.
-	var res []string
-
-	for i := 0; i < tpls.Len(); i++ {
-		vals, err := tpls.At(i)
-		if err != nil {
-			return errPostings{err: err}
-		}
-		if m.Matches(vals[0]) {
-			res = append(res, vals[0])
-		}
-	}
-	if len(res) == 0 {
-		return emptyPostings
-	}
-
-	var rit []Postings
-
-	for _, v := range res {
-		it, err := q.index.Postings(m.Name(), v)
-		if err != nil {
-			return errPostings{err: err}
-		}
-		rit = append(rit, it)
-	}
-
-	return Merge(rit...)
-}
-
 func (q *blockQuerier) LabelValues(name string) ([]string, error) {
 	tpls, err := q.index.LabelValues(name)
 	if err != nil {
@@ -241,6 +185,81 @@ func (q *blockQuerier) Close() error {
 	return nil
 }
 
+// postingsReader is used to select matching postings from an IndexReader.
+type postingsReader struct {
+	index IndexReader
+}
+
+func newPostingsReader(i IndexReader) *postingsReader {
+	return &postingsReader{index: i}
+}
+
+func (r *postingsReader) Select(ms ...labels.Matcher) (Postings, []string) {
+	var (
+		its    []Postings
+		absent []string
+	)
+	for _, m := range ms {
+		// If the matcher checks absence of a label, don't select them
+		// but propagate the check into the series set.
+		if _, ok := m.(*labels.EqualMatcher); ok && m.Matches("") {
+			absent = append(absent, m.Name())
+			continue
+		}
+		its = append(its, r.selectSingle(m))
+	}
+
+	p := Intersect(its...)
+
+	return p, absent
+}
+
+func (r *postingsReader) selectSingle(m labels.Matcher) Postings {
+	// Fast-path for equal matching.
+	if em, ok := m.(*labels.EqualMatcher); ok {
+		it, err := r.index.Postings(em.Name(), em.Value())
+		if err != nil {
+			return errPostings{err: err}
+		}
+		return it
+	}
+
+	// TODO(fabxc): use interface upgrading to provide fast solution
+	// for prefix matches. Tuples are lexicographically sorted.
+	tpls, err := r.index.LabelValues(m.Name())
+	if err != nil {
+		return errPostings{err: err}
+	}
+
+	var res []string
+
+	for i := 0; i < tpls.Len(); i++ {
+		vals, err := tpls.At(i)
+		if err != nil {
+			return errPostings{err: err}
+		}
+		if m.Matches(vals[0]) {
+			res = append(res, vals[0])
+		}
+	}
+	if len(res) == 0 {
+		return emptyPostings
+	}
+
+	var rit []Postings
+
+	for _, v := range res {
+		it, err := r.index.Postings(m.Name(), v)
+		if err != nil {
+			return errPostings{err: err}
+		}
+		rit = append(rit, it)
+	}
+
+	return Merge(rit...)
+}
+
 func mergeStrings(a, b []string) []string {
 	maxl := len(a)
 	if len(b) > len(a) {
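
The extracted postingsReader keeps Select's contract: an equality matcher that matches the empty string asserts the label's absence and is only checked against the final series set, while every other matcher narrows the postings via intersection. A stripped-down sketch of that split; the matcher type is a hypothetical stand-in for labels.Matcher, and tsdb applies the absence rule only to equality matchers:

package main

import "fmt"

// matcher is a stand-in for labels.Matcher: a label name plus a
// predicate over that label's value.
type matcher struct {
	name    string
	matches func(v string) bool
}

// split partitions matchers the way postingsReader.Select does: those
// accepting the empty value become absent-label checks, the rest are
// used to select and intersect postings.
func split(ms []matcher) (sel, absent []matcher) {
	for _, m := range ms {
		if m.matches("") {
			absent = append(absent, m)
			continue
		}
		sel = append(sel, m)
	}
	return sel, absent
}

func main() {
	ms := []matcher{
		{name: "job", matches: func(v string) bool { return v == "api" }},
		{name: "env", matches: func(v string) bool { return v == "" }}, // env must be absent
	}
	sel, abs := split(ms)
	fmt.Println(len(sel), abs[0].name) // 1 env
}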

@@ -49,9 +49,8 @@ const (
 	WALEntrySamples WALEntryType = 3
 )
 
-// WAL is a write ahead log for series data. It can only be written to.
-// Use WALReader to read back from a write ahead log.
-type WAL struct {
+// SegmentWAL is a write ahead log for series data.
+type SegmentWAL struct {
 	mtx sync.Mutex
 
 	dirFile *os.File
@@ -69,6 +68,28 @@ type WAL struct {
 	donec chan struct{}
 }
 
+// WAL is a write ahead log that can log new series labels and samples.
+// It must be completely read before new entries are logged.
+type WAL interface {
+	Reader() WALReader
+	Log([]labels.Labels, []RefSample) error
+	Close() error
+}
+
+// WALReader reads entries from a WAL.
+type WALReader interface {
+	At() ([]labels.Labels, []RefSample)
+	Next() bool
+	Err() error
+}
+
+// RefSample is a timestamp/value pair associated with a reference to a series.
+type RefSample struct {
+	Ref uint64
+	T   int64
+	V   float64
+}
+
 const (
 	walDirName          = "wal"
 	walSegmentSizeBytes = 256 * 1024 * 1024 // 256 MB
@@ -83,9 +104,9 @@ func init() {
 	castagnoliTable = crc32.MakeTable(crc32.Castagnoli)
 }
 
-// OpenWAL opens or creates a write ahead log in the given directory.
+// OpenSegmentWAL opens or creates a write ahead log in the given directory.
 // The WAL must be read completely before new data is written.
-func OpenWAL(dir string, logger log.Logger, flushInterval time.Duration) (*WAL, error) {
+func OpenSegmentWAL(dir string, logger log.Logger, flushInterval time.Duration) (*SegmentWAL, error) {
 	dir = filepath.Join(dir, walDirName)
 
 	if err := os.MkdirAll(dir, 0777); err != nil {
@@ -99,7 +120,7 @@ func OpenWAL(dir string, logger log.Logger, flushInterval time.Duration) (*WAL,
 		logger = log.NewNopLogger()
 	}
 
-	w := &WAL{
+	w := &SegmentWAL{
 		dirFile:       df,
 		logger:        logger,
 		flushInterval: flushInterval,
@@ -119,12 +140,12 @@ func OpenWAL(dir string, logger log.Logger, flushInterval time.Duration) (*WAL,
 
 // Reader returns a new reader over the the write ahead log data.
 // It must be completely consumed before writing to the WAL.
-func (w *WAL) Reader() *WALReader {
-	return NewWALReader(w.logger, w)
+func (w *SegmentWAL) Reader() WALReader {
+	return newWALReader(w, w.logger)
 }
 
 // Log writes a batch of new series labels and samples to the log.
-func (w *WAL) Log(series []labels.Labels, samples []refdSample) error {
+func (w *SegmentWAL) Log(series []labels.Labels, samples []RefSample) error {
 	if err := w.encodeSeries(series); err != nil {
 		return err
 	}
@@ -139,7 +160,7 @@ func (w *WAL) Log(series []labels.Labels, samples []refdSample) error {
 
 // initSegments finds all existing segment files and opens them in the
 // appropriate file modes.
-func (w *WAL) initSegments() error {
+func (w *SegmentWAL) initSegments() error {
 	fns, err := sequenceFiles(w.dirFile.Name(), "")
 	if err != nil {
 		return err
@@ -180,7 +201,7 @@ func (w *WAL) initSegments() error {
 
 // cut finishes the currently active segments and opens the next one.
 // The encoder is reset to point to the new segment.
-func (w *WAL) cut() error {
+func (w *SegmentWAL) cut() error {
 	// Sync current tail to disk and close.
 	if tf := w.tail(); tf != nil {
 		if err := w.sync(); err != nil {
@@ -229,7 +250,7 @@ func (w *WAL) cut() error {
 	return nil
 }
 
-func (w *WAL) tail() *os.File {
+func (w *SegmentWAL) tail() *os.File {
 	if len(w.files) == 0 {
 		return nil
 	}
@@ -237,14 +258,14 @@ func (w *WAL) tail() *os.File {
 }
 
 // Sync flushes the changes to disk.
-func (w *WAL) Sync() error {
+func (w *SegmentWAL) Sync() error {
 	w.mtx.Lock()
 	defer w.mtx.Unlock()
 
 	return w.sync()
 }
 
-func (w *WAL) sync() error {
+func (w *SegmentWAL) sync() error {
 	if w.cur == nil {
 		return nil
 	}
@@ -254,7 +275,7 @@ func (w *WAL) sync() error {
 	return fileutil.Fdatasync(w.tail())
 }
 
-func (w *WAL) run(interval time.Duration) {
+func (w *SegmentWAL) run(interval time.Duration) {
 	var tick <-chan time.Time
 
 	if interval > 0 {
@@ -277,7 +298,7 @@ func (w *WAL) run(interval time.Duration) {
 }
 
 // Close syncs all data and closes the underlying resources.
-func (w *WAL) Close() error {
+func (w *SegmentWAL) Close() error {
 	close(w.stopc)
 	<-w.donec
 
@@ -305,7 +326,7 @@ const (
 	walPageBytes = 16 * minSectorSize
 )
 
-func (w *WAL) entry(et WALEntryType, flag byte, buf []byte) error {
+func (w *SegmentWAL) entry(et WALEntryType, flag byte, buf []byte) error {
 	w.mtx.Lock()
 	defer w.mtx.Unlock()
 
@@ -369,7 +390,7 @@ func putWALBuffer(b []byte) {
 	walBuffers.Put(b)
 }
 
-func (w *WAL) encodeSeries(series []labels.Labels) error {
+func (w *SegmentWAL) encodeSeries(series []labels.Labels) error {
 	if len(series) == 0 {
 		return nil
 	}
@@ -395,7 +416,7 @@ func (w *WAL) encodeSeries(series []labels.Labels) error {
 	return w.entry(WALEntrySeries, walSeriesSimple, buf)
 }
 
-func (w *WAL) encodeSamples(samples []refdSample) error {
+func (w *SegmentWAL) encodeSamples(samples []RefSample) error {
 	if len(samples) == 0 {
 		return nil
 	}
@@ -409,67 +430,65 @@ func (w *WAL) encodeSamples(samples []refdSample) error {
 	// TODO(fabxc): optimize for all samples having the same timestamp.
 	first := samples[0]
 
-	binary.BigEndian.PutUint64(b, first.ref)
+	binary.BigEndian.PutUint64(b, first.Ref)
 	buf = append(buf, b[:8]...)
-	binary.BigEndian.PutUint64(b, uint64(first.t))
+	binary.BigEndian.PutUint64(b, uint64(first.T))
 	buf = append(buf, b[:8]...)
 
 	for _, s := range samples {
-		n := binary.PutVarint(b, int64(s.ref)-int64(first.ref))
+		n := binary.PutVarint(b, int64(s.Ref)-int64(first.Ref))
 		buf = append(buf, b[:n]...)
-		n = binary.PutVarint(b, s.t-first.t)
+		n = binary.PutVarint(b, s.T-first.T)
 		buf = append(buf, b[:n]...)
-		binary.BigEndian.PutUint64(b, math.Float64bits(s.v))
+		binary.BigEndian.PutUint64(b, math.Float64bits(s.V))
 		buf = append(buf, b[:8]...)
 	}
 
 	return w.entry(WALEntrySamples, walSamplesSimple, buf)
 }
 
-// WALReader decodes and emits write ahead log entries.
-type WALReader struct {
+// walReader decodes and emits write ahead log entries.
+type walReader struct {
 	logger log.Logger
 
-	wal *WAL
+	wal *SegmentWAL
 	cur int
 	buf []byte
 
 	crc32 hash.Hash32
 	err   error
 
 	labels  []labels.Labels
-	samples []refdSample
+	samples []RefSample
 }
 
-// NewWALReader returns a new WALReader over the sequence of the given ReadClosers.
-func NewWALReader(logger log.Logger, w *WAL) *WALReader {
-	if logger == nil {
-		logger = log.NewNopLogger()
+func newWALReader(w *SegmentWAL, l log.Logger) *walReader {
+	if l == nil {
+		l = log.NewNopLogger()
 	}
-	r := &WALReader{
-		logger: logger,
+	return &walReader{
+		logger: l,
 		wal:    w,
 		buf:    make([]byte, 0, 128*4096),
 		crc32:  crc32.New(crc32.MakeTable(crc32.Castagnoli)),
 	}
-	return r
 }
 
 // At returns the last decoded entry of labels or samples.
 // The returned slices are only valid until the next call to Next(). Their elements
 // have to be copied to preserve them.
-func (r *WALReader) At() ([]labels.Labels, []refdSample) {
+func (r *walReader) At() ([]labels.Labels, []RefSample) {
 	return r.labels, r.samples
 }
 
 // Err returns the last error the reader encountered.
-func (r *WALReader) Err() error {
+func (r *walReader) Err() error {
 	return r.err
 }
 
 // nextEntry retrieves the next entry. It is also used as a testing hook.
-func (r *WALReader) nextEntry() (WALEntryType, byte, []byte, error) {
+func (r *walReader) nextEntry() (WALEntryType, byte, []byte, error) {
 	if r.cur >= len(r.wal.files) {
 		return 0, 0, nil, io.EOF
 	}
@@ -492,7 +511,7 @@ func (r *WALReader) nextEntry() (WALEntryType, byte, []byte, error) {
 
 // Next returns decodes the next entry pair and returns true
 // if it was succesful.
-func (r *WALReader) Next() bool {
+func (r *walReader) Next() bool {
 	r.labels = r.labels[:0]
 	r.samples = r.samples[:0]
 
@@ -549,12 +568,12 @@ func (r *WALReader) Next() bool {
 	return r.err == nil
 }
 
-func (r *WALReader) current() *os.File {
+func (r *walReader) current() *os.File {
 	return r.wal.files[r.cur]
 }
 
 // truncate the WAL after the last valid entry.
-func (r *WALReader) truncate(lastOffset int64) error {
+func (r *walReader) truncate(lastOffset int64) error {
 	r.logger.Log("msg", "WAL corruption detected; truncating",
 		"err", r.err, "file", r.current().Name(), "pos", lastOffset)
 
@@ -582,7 +601,7 @@ func walCorruptionErrf(s string, args ...interface{}) error {
 	return walCorruptionErr(errors.Errorf(s, args...))
 }
 
-func (r *WALReader) entry(cr io.Reader) (WALEntryType, byte, []byte, error) {
+func (r *walReader) entry(cr io.Reader) (WALEntryType, byte, []byte, error) {
 	r.crc32.Reset()
 	tr := io.TeeReader(cr, r.crc32)
 
@@ -629,7 +648,7 @@ func (r *WALReader) entry(cr io.Reader) (WALEntryType, byte, []byte, error) {
 	return etype, flag, buf, nil
 }
 
-func (r *WALReader) decodeSeries(flag byte, b []byte) error {
+func (r *walReader) decodeSeries(flag byte, b []byte) error {
 	for len(b) > 0 {
 		l, n := binary.Uvarint(b)
 		if n < 1 {
@@ -659,7 +678,7 @@ func (r *WALReader) decodeSeries(flag byte, b []byte) error {
 	return nil
 }
 
-func (r *WALReader) decodeSamples(flag byte, b []byte) error {
+func (r *walReader) decodeSamples(flag byte, b []byte) error {
 	if len(b) < 16 {
 		return errors.Wrap(errInvalidSize, "header length")
 	}
@@ -670,7 +689,7 @@ func (r *WALReader) decodeSamples(flag byte, b []byte) error {
 	b = b[16:]
 
 	for len(b) > 0 {
-		var smpl refdSample
+		var smpl RefSample
 
 		dref, n := binary.Varint(b)
 		if n < 1 {
@@ -678,19 +697,19 @@ func (r *WALReader) decodeSamples(flag byte, b []byte) error {
 		}
 		b = b[n:]
 
-		smpl.ref = uint64(int64(baseRef) + dref)
+		smpl.Ref = uint64(int64(baseRef) + dref)
 
 		dtime, n := binary.Varint(b)
 		if n < 1 {
 			return errors.Wrap(errInvalidSize, "sample timestamp delta")
 		}
 		b = b[n:]
 
-		smpl.t = baseTime + dtime
+		smpl.T = baseTime + dtime
 
 		if len(b) < 8 {
 			return errors.Wrapf(errInvalidSize, "sample value bits %d", len(b))
 		}
-		smpl.v = float64(math.Float64frombits(binary.BigEndian.Uint64(b)))
+		smpl.V = float64(math.Float64frombits(binary.BigEndian.Uint64(b)))
 		b = b[8:]
 
 		r.samples = append(r.samples, smpl)
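
encodeSamples writes the first sample's Ref and T as fixed 8-byte big-endian bases, then per sample a varint ref delta, a varint time delta, and the raw float64 bits of the value; decodeSamples inverts this. A self-contained round-trip of that layout, following the diff; encode and decode are illustrative helpers, not exported tsdb API:

package main

import (
	"encoding/binary"
	"fmt"
	"math"
)

type RefSample struct {
	Ref uint64
	T   int64
	V   float64
}

// encode mirrors SegmentWAL.encodeSamples: base ref and time up front,
// then deltas and raw value bits per sample.
func encode(samples []RefSample) []byte {
	var (
		b   [binary.MaxVarintLen64]byte
		buf []byte
	)
	first := samples[0]
	binary.BigEndian.PutUint64(b[:], first.Ref)
	buf = append(buf, b[:8]...)
	binary.BigEndian.PutUint64(b[:], uint64(first.T))
	buf = append(buf, b[:8]...)

	for _, s := range samples {
		n := binary.PutVarint(b[:], int64(s.Ref)-int64(first.Ref))
		buf = append(buf, b[:n]...)
		n = binary.PutVarint(b[:], s.T-first.T)
		buf = append(buf, b[:n]...)
		binary.BigEndian.PutUint64(b[:], math.Float64bits(s.V))
		buf = append(buf, b[:8]...)
	}
	return buf
}

// decode mirrors walReader.decodeSamples, minus its size checks.
func decode(b []byte) []RefSample {
	baseRef := binary.BigEndian.Uint64(b)
	baseTime := int64(binary.BigEndian.Uint64(b[8:]))
	b = b[16:]

	var out []RefSample
	for len(b) > 0 {
		var s RefSample

		dref, n := binary.Varint(b)
		b = b[n:]
		s.Ref = uint64(int64(baseRef) + dref)

		dtime, n := binary.Varint(b)
		b = b[n:]
		s.T = baseTime + dtime

		s.V = math.Float64frombits(binary.BigEndian.Uint64(b))
		b = b[8:]

		out = append(out, s)
	}
	return out
}

func main() {
	in := []RefSample{{Ref: 1, T: 1000, V: 0.5}, {Ref: 2, T: 1001, V: 1.5}}
	fmt.Println(decode(encode(in))) // [{1 1000 0.5} {2 1001 1.5}]
}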

vendor/vendor.json

@@ -661,22 +661,22 @@
 			"revisionTime": "2016-04-11T19:08:41Z"
 		},
 		{
-			"checksumSHA1": "0wu/AzUWMurN/T5VBKCrvhf7c7E=",
+			"checksumSHA1": "T+9Tl4utHkpYSdVFRpdfLloShTM=",
 			"path": "github.com/prometheus/tsdb",
-			"revision": "44769c1654f699931b2d3a2928326ac2d02d9149",
-			"revisionTime": "2017-05-09T10:52:47Z"
+			"revision": "c8438cfc8113a39f75e398bf00c481d3cb1069f6",
+			"revisionTime": "2017-05-14T09:51:56Z"
 		},
 		{
 			"checksumSHA1": "9EH3v+JdbikCUJAgD4VEOPIaWfs=",
 			"path": "github.com/prometheus/tsdb/chunks",
-			"revision": "44769c1654f699931b2d3a2928326ac2d02d9149",
-			"revisionTime": "2017-05-09T10:52:47Z"
+			"revision": "c8438cfc8113a39f75e398bf00c481d3cb1069f6",
+			"revisionTime": "2017-05-14T09:51:56Z"
 		},
 		{
 			"checksumSHA1": "3RHZcB/ZvIae9K0tJxNlajJg0jA=",
 			"path": "github.com/prometheus/tsdb/labels",
-			"revision": "44769c1654f699931b2d3a2928326ac2d02d9149",
-			"revisionTime": "2017-05-09T10:52:47Z"
+			"revision": "c8438cfc8113a39f75e398bf00c481d3cb1069f6",
+			"revisionTime": "2017-05-14T09:51:56Z"
 		},
 		{
 			"checksumSHA1": "+49Vr4Me28p3cR+gxX5SUQHbbas=",