diff --git a/vendor/github.com/prometheus/tsdb/block.go b/vendor/github.com/prometheus/tsdb/block.go index e1ccfd40d..72ebb1f8a 100644 --- a/vendor/github.com/prometheus/tsdb/block.go +++ b/vendor/github.com/prometheus/tsdb/block.go @@ -48,8 +48,8 @@ type Block interface { Queryable } -// HeadBlock is a regular block that can still be appended to. -type HeadBlock interface { +// headBlock is a regular block that can still be appended to. +type headBlock interface { Block Appendable } diff --git a/vendor/github.com/prometheus/tsdb/db.go b/vendor/github.com/prometheus/tsdb/db.go index 864b9cfbd..c4e97abcc 100644 --- a/vendor/github.com/prometheus/tsdb/db.go +++ b/vendor/github.com/prometheus/tsdb/db.go @@ -118,7 +118,7 @@ type DB struct { // or the general layout. // Must never be held when acquiring a block's mutex! headmtx sync.RWMutex - heads []HeadBlock + heads []headBlock compactor Compactor @@ -401,7 +401,7 @@ func (db *DB) reloadBlocks() error { var ( metas []*BlockMeta blocks []Block - heads []HeadBlock + heads []headBlock seqBlocks = make(map[int]Block, len(dirs)) ) @@ -418,7 +418,7 @@ func (db *DB) reloadBlocks() error { if meta.Compaction.Generation == 0 { if !ok { - b, err = openHeadBlock(dirs[i], db.logger) + b, err = db.openHeadBlock(dirs[i]) if err != nil { return errors.Wrapf(err, "load head at %s", dirs[i]) } @@ -426,7 +426,7 @@ func (db *DB) reloadBlocks() error { if meta.ULID != b.Meta().ULID { return errors.Errorf("head block ULID changed unexpectedly") } - heads = append(heads, b.(HeadBlock)) + heads = append(heads, b.(headBlock)) } else { if !ok || meta.ULID != b.Meta().ULID { b, err = newPersistedBlock(dirs[i]) @@ -559,7 +559,7 @@ func (a *dbAppender) appenderFor(t int64) (*metaAppender, error) { if len(a.heads) == 0 || t >= a.heads[len(a.heads)-1].meta.MaxTime { a.db.headmtx.Lock() - var newHeads []HeadBlock + var newHeads []headBlock if err := a.db.ensureHead(t); err != nil { a.db.headmtx.Unlock() @@ -670,9 +670,9 @@ func (a *dbAppender) Rollback() error { } // appendable returns a copy of a slice of HeadBlocks that can still be appended to. -func (db *DB) appendable() []HeadBlock { +func (db *DB) appendable() []headBlock { var i int - app := make([]HeadBlock, 0, db.opts.AppendableBlocks) + app := make([]headBlock, 0, db.opts.AppendableBlocks) if len(db.heads) > db.opts.AppendableBlocks { i = len(db.heads) - db.opts.AppendableBlocks @@ -709,16 +709,37 @@ func (db *DB) blocksForInterval(mint, maxt int64) []Block { return bs } +// openHeadBlock opens the head block at dir. +func (db *DB) openHeadBlock(dir string) (*HeadBlock, error) { + var ( + wdir = filepath.Join(dir, "wal") + l = log.With(db.logger, "wal", wdir) + ) + wal, err := OpenSegmentWAL(wdir, l, 5*time.Second) + if err != nil { + return nil, errors.Wrapf(err, "open WAL %s", wdir) + } + + h, err := OpenHeadBlock(dir, log.With(db.logger, "block", dir), wal) + if err != nil { + return nil, errors.Wrapf(err, "open head block %s", dir) + } + return h, nil +} + // cut starts a new head block to append to. The completed head block // will still be appendable for the configured grace period.
-func (db *DB) cut(mint int64) (HeadBlock, error) { +func (db *DB) cut(mint int64) (headBlock, error) { maxt := mint + int64(db.opts.MinBlockDuration) dir, seq, err := nextSequenceFile(db.dir, "b-") if err != nil { return nil, err } - newHead, err := createHeadBlock(dir, seq, db.logger, mint, maxt) + if err := TouchHeadBlock(dir, seq, mint, maxt); err != nil { + return nil, errors.Wrapf(err, "touch head block %s", dir) + } + newHead, err := db.openHeadBlock(dir) if err != nil { return nil, err } diff --git a/vendor/github.com/prometheus/tsdb/head.go b/vendor/github.com/prometheus/tsdb/head.go index 678654b3e..b71bbafc0 100644 --- a/vendor/github.com/prometheus/tsdb/head.go +++ b/vendor/github.com/prometheus/tsdb/head.go @@ -47,11 +47,11 @@ var ( ErrOutOfBounds = errors.New("out of bounds") ) -// headBlock handles reads and writes of time series data within a time window. -type headBlock struct { +// HeadBlock handles reads and writes of time series data within a time window. +type HeadBlock struct { mtx sync.RWMutex dir string - wal *WAL + wal WAL activeWriters uint64 closed bool @@ -69,19 +69,21 @@ type headBlock struct { meta BlockMeta } -func createHeadBlock(dir string, seq int, l log.Logger, mint, maxt int64) (*headBlock, error) { +// TouchHeadBlock atomically touches a new head block in dir for +// samples in the range [mint,maxt). +func TouchHeadBlock(dir string, seq int, mint, maxt int64) error { // Make head block creation appear atomic. tmp := dir + ".tmp" if err := os.MkdirAll(tmp, 0777); err != nil { - return nil, err + return err } entropy := rand.New(rand.NewSource(time.Now().UnixNano())) ulid, err := ulid.New(ulid.Now(), entropy) if err != nil { - return nil, err + return err } if err := writeMetaFile(tmp, &BlockMeta{ @@ -90,38 +92,33 @@ func createHeadBlock(dir string, seq int, l log.Logger, mint, maxt int64) (*head MinTime: mint, MaxTime: maxt, }); err != nil { - return nil, err + return err } - if err := renameFile(tmp, dir); err != nil { - return nil, err - } - return openHeadBlock(dir, l) + return renameFile(tmp, dir) } -// openHeadBlock creates a new empty head block. -func openHeadBlock(dir string, l log.Logger) (*headBlock, error) { - wal, err := OpenWAL(dir, log.With(l, "component", "wal"), 5*time.Second) - if err != nil { - return nil, err - } +// OpenHeadBlock opens the head block in dir. +func OpenHeadBlock(dir string, l log.Logger, wal WAL) (*HeadBlock, error) { meta, err := readMetaFile(dir) if err != nil { return nil, err } - h := &headBlock{ + h := &HeadBlock{ dir: dir, wal: wal, - series: []*memSeries{}, + series: []*memSeries{nil}, // 0 is not a valid posting, filled with nil. 
hashes: map[uint64][]*memSeries{}, values: map[string]stringset{}, postings: &memPostings{m: make(map[term][]uint32)}, meta: *meta, } + return h, h.init() +} - r := wal.Reader() +func (h *HeadBlock) init() error { + r := h.wal.Reader() -Outer: for r.Next() { series, samples := r.At() @@ -130,37 +127,32 @@ Outer: h.meta.Stats.NumSeries++ } for _, s := range samples { - if int(s.ref) >= len(h.series) { - l.Log("msg", "unknown series reference, abort WAL restore", "got", s.ref, "max", len(h.series)-1) - break Outer + if int(s.Ref) >= len(h.series) { + return errors.Errorf("unknown series reference %d (max %d); abort WAL restore", s.Ref, len(h.series)) } - h.series[s.ref].append(s.t, s.v) + h.series[s.Ref].append(s.T, s.V) - if !h.inBounds(s.t) { - return nil, errors.Wrap(ErrOutOfBounds, "consume WAL") + if !h.inBounds(s.T) { + return errors.Wrap(ErrOutOfBounds, "consume WAL") } h.meta.Stats.NumSamples++ } } - if err := r.Err(); err != nil { - return nil, errors.Wrap(err, "consume WAL") - } - - return h, nil + return errors.Wrap(r.Err(), "consume WAL") } // inBounds returns true if the given timestamp is within the valid // time bounds of the block. -func (h *headBlock) inBounds(t int64) bool { +func (h *HeadBlock) inBounds(t int64) bool { return t >= h.meta.MinTime && t <= h.meta.MaxTime } -func (h *headBlock) String() string { +func (h *HeadBlock) String() string { return fmt.Sprintf("(%d, %s)", h.meta.Sequence, h.meta.ULID) } // Close syncs all data and closes underlying resources of the head block. -func (h *headBlock) Close() error { +func (h *HeadBlock) Close() error { h.mtx.Lock() defer h.mtx.Unlock() @@ -184,7 +176,7 @@ func (h *headBlock) Close() error { return nil } -func (h *headBlock) Meta() BlockMeta { +func (h *HeadBlock) Meta() BlockMeta { m := BlockMeta{ ULID: h.meta.ULID, Sequence: h.meta.Sequence, @@ -200,12 +192,12 @@ func (h *headBlock) Meta() BlockMeta { return m } -func (h *headBlock) Dir() string { return h.dir } -func (h *headBlock) Persisted() bool { return false } -func (h *headBlock) Index() IndexReader { return &headIndexReader{h} } -func (h *headBlock) Chunks() ChunkReader { return &headChunkReader{h} } +func (h *HeadBlock) Dir() string { return h.dir } +func (h *HeadBlock) Persisted() bool { return false } +func (h *HeadBlock) Index() IndexReader { return &headIndexReader{h} } +func (h *HeadBlock) Chunks() ChunkReader { return &headChunkReader{h} } -func (h *headBlock) Querier(mint, maxt int64) Querier { +func (h *HeadBlock) Querier(mint, maxt int64) Querier { h.mtx.RLock() defer h.mtx.RUnlock() @@ -244,7 +236,7 @@ func (h *headBlock) Querier(mint, maxt int64) Querier { } } -func (h *headBlock) Appender() Appender { +func (h *HeadBlock) Appender() Appender { atomic.AddUint64(&h.activeWriters, 1) h.mtx.RLock() @@ -252,36 +244,36 @@ func (h *headBlock) Appender() Appender { if h.closed { panic(fmt.Sprintf("block %s already closed", h.dir)) } - return &headAppender{headBlock: h, samples: getHeadAppendBuffer()} + return &headAppender{HeadBlock: h, samples: getHeadAppendBuffer()} } -func (h *headBlock) Busy() bool { +func (h *HeadBlock) Busy() bool { return atomic.LoadUint64(&h.activeWriters) > 0 } var headPool = sync.Pool{} -func getHeadAppendBuffer() []refdSample { +func getHeadAppendBuffer() []RefSample { b := headPool.Get() if b == nil { - return make([]refdSample, 0, 512) + return make([]RefSample, 0, 512) } - return b.([]refdSample) + return b.([]RefSample) } -func putHeadAppendBuffer(b []refdSample) { +func putHeadAppendBuffer(b []RefSample) { 
headPool.Put(b[:0]) } type headAppender struct { - *headBlock + *HeadBlock newSeries map[uint64]hashedLabels newHashes map[uint64]uint64 refmap map[uint64]uint64 newLabels []labels.Labels - samples []refdSample + samples []RefSample } type hashedLabels struct { @@ -289,12 +281,6 @@ type hashedLabels struct { labels labels.Labels } -type refdSample struct { - ref uint64 - t int64 - v float64 -} - func (a *headAppender) Add(lset labels.Labels, t int64, v float64) (uint64, error) { if !a.inBounds(t) { return 0, ErrOutOfBounds @@ -369,10 +355,10 @@ func (a *headAppender) AddFast(ref uint64, t int64, v float64) error { } } - a.samples = append(a.samples, refdSample{ - ref: ref, - t: t, - v: v, + a.samples = append(a.samples, RefSample{ + Ref: ref, + T: t, + V: v, }) return nil } @@ -418,8 +404,8 @@ func (a *headAppender) Commit() error { for i := range a.samples { s := &a.samples[i] - if s.ref&(1<<32) > 0 { - s.ref = a.refmap[s.ref] + if s.Ref&(1<<32) > 0 { + s.Ref = a.refmap[s.Ref] } } @@ -433,7 +419,7 @@ func (a *headAppender) Commit() error { total := uint64(len(a.samples)) for _, s := range a.samples { - if !a.series[s.ref].append(s.t, s.v) { + if !a.series[s.Ref].append(s.T, s.V) { total-- } } @@ -454,7 +440,7 @@ func (a *headAppender) Rollback() error { } type headChunkReader struct { - *headBlock + *HeadBlock } // Chunk returns the chunk for the reference number. @@ -490,7 +476,7 @@ func (c *safeChunk) Iterator() chunks.Iterator { // func (c *safeChunk) Encoding() chunks.Encoding { panic("illegal") } type headIndexReader struct { - *headBlock + *HeadBlock } // LabelValues returns the possible label values @@ -558,7 +544,7 @@ func (h *headIndexReader) LabelIndices() ([][]string, error) { // get retrieves the chunk with the hash and label set and creates // a new one if it doesn't exist yet. -func (h *headBlock) get(hash uint64, lset labels.Labels) *memSeries { +func (h *HeadBlock) get(hash uint64, lset labels.Labels) *memSeries { series := h.hashes[hash] for _, s := range series { @@ -569,11 +555,13 @@ func (h *headBlock) get(hash uint64, lset labels.Labels) *memSeries { return nil } -func (h *headBlock) create(hash uint64, lset labels.Labels) *memSeries { +func (h *HeadBlock) create(hash uint64, lset labels.Labels) *memSeries { s := &memSeries{ lset: lset, ref: uint32(len(h.series)), } + // create the initial chunk and appender + s.cut() // Allocate empty space until we can insert at the given index. h.series = append(h.series, s) @@ -636,7 +624,7 @@ func (s *memSeries) append(t int64, v float64) bool { var c *memChunk - if s.app == nil || s.head().samples > 130 { + if s.head().samples > 130 { c = s.cut() c.minTime = t } else { diff --git a/vendor/github.com/prometheus/tsdb/postings.go b/vendor/github.com/prometheus/tsdb/postings.go index 180ac099d..f2f452ac1 100644 --- a/vendor/github.com/prometheus/tsdb/postings.go +++ b/vendor/github.com/prometheus/tsdb/postings.go @@ -33,7 +33,7 @@ func (p *memPostings) get(t term) Postings { if l == nil { return emptyPostings } - return &listPostings{list: l, idx: -1} + return newListPostings(l) } // add adds a document to the index. 
The caller has to ensure that no @@ -70,18 +70,13 @@ func (e errPostings) Seek(uint32) bool { return false } func (e errPostings) At() uint32 { return 0 } func (e errPostings) Err() error { return e.err } -func expandPostings(p Postings) (res []uint32, err error) { - for p.Next() { - res = append(res, p.At()) - } - return res, p.Err() -} +var emptyPostings = errPostings{} // Intersect returns a new postings list over the intersection of the // input postings. func Intersect(its ...Postings) Postings { if len(its) == 0 { - return errPostings{err: nil} + return emptyPostings } a := its[0] @@ -91,8 +86,6 @@ func Intersect(its ...Postings) Postings { return a } -var emptyPostings = errPostings{} - type intersectPostings struct { a, b Postings aok, bok bool @@ -100,41 +93,44 @@ type intersectPostings struct { } func newIntersectPostings(a, b Postings) *intersectPostings { - it := &intersectPostings{a: a, b: b} - it.aok = it.a.Next() - it.bok = it.b.Next() - - return it + return &intersectPostings{a: a, b: b} } func (it *intersectPostings) At() uint32 { return it.cur } -func (it *intersectPostings) Next() bool { +func (it *intersectPostings) doNext(id uint32) bool { for { - if !it.aok || !it.bok { + if !it.b.Seek(id) { return false } - av, bv := it.a.At(), it.b.At() - - if av < bv { - it.aok = it.a.Seek(bv) - } else if bv < av { - it.bok = it.b.Seek(av) - } else { - it.cur = av - it.aok = it.a.Next() - it.bok = it.b.Next() - return true + if vb := it.b.At(); vb != id { + if !it.a.Seek(vb) { + return false + } + id = it.a.At() + if vb != id { + continue + } } + it.cur = id + return true } } +func (it *intersectPostings) Next() bool { + if !it.a.Next() { + return false + } + return it.doNext(it.a.At()) +} + func (it *intersectPostings) Seek(id uint32) bool { - it.aok = it.a.Seek(id) - it.bok = it.b.Seek(id) - return it.Next() + if !it.a.Seek(id) { + return false + } + return it.doNext(it.a.At()) } func (it *intersectPostings) Err() error { @@ -158,17 +154,14 @@ func Merge(its ...Postings) Postings { } type mergedPostings struct { - a, b Postings - aok, bok bool - cur uint32 + a, b Postings + initialized bool + aok, bok bool + cur uint32 } func newMergedPostings(a, b Postings) *mergedPostings { - it := &mergedPostings{a: a, b: b} - it.aok = it.a.Next() - it.bok = it.b.Next() - - return it + return &mergedPostings{a: a, b: b} } func (it *mergedPostings) At() uint32 { @@ -176,6 +169,12 @@ func (it *mergedPostings) At() uint32 { } func (it *mergedPostings) Next() bool { + if !it.initialized { + it.aok = it.a.Next() + it.bok = it.b.Next() + it.initialized = true + } + if !it.aok && !it.bok { return false } @@ -196,23 +195,25 @@ func (it *mergedPostings) Next() bool { if acur < bcur { it.cur = acur it.aok = it.a.Next() - return true - } - if bcur < acur { + } else if acur > bcur { it.cur = bcur it.bok = it.b.Next() - return true + } else { + it.cur = acur + it.aok = it.a.Next() + it.bok = it.b.Next() } - it.cur = acur - it.aok = it.a.Next() - it.bok = it.b.Next() - return true } func (it *mergedPostings) Seek(id uint32) bool { + if it.cur >= id { + return true + } + it.aok = it.a.Seek(id) it.bok = it.b.Seek(id) + it.initialized = true return it.Next() } @@ -227,28 +228,44 @@ func (it *mergedPostings) Err() error { // listPostings implements the Postings interface over a plain list. 
type listPostings struct { list []uint32 - idx int + cur uint32 } func newListPostings(list []uint32) *listPostings { - return &listPostings{list: list, idx: -1} + return &listPostings{list: list} } func (it *listPostings) At() uint32 { - return it.list[it.idx] + return it.cur } func (it *listPostings) Next() bool { - it.idx++ - return it.idx < len(it.list) + if len(it.list) > 0 { + it.cur = it.list[0] + it.list = it.list[1:] + return true + } + it.cur = 0 + return false } func (it *listPostings) Seek(x uint32) bool { + // If the current value satisfies, then return. + if it.cur >= x { + return true + } + // Do binary search between current position and end. - it.idx += sort.Search(len(it.list)-it.idx, func(i int) bool { - return it.list[i+it.idx] >= x + i := sort.Search(len(it.list), func(i int) bool { + return it.list[i] >= x }) - return it.idx < len(it.list) + if i < len(it.list) { + it.cur = it.list[i] + it.list = it.list[i+1:] + return true + } + it.list = nil + return false } func (it *listPostings) Err() error { @@ -259,32 +276,44 @@ func (it *listPostings) Err() error { // big endian numbers. type bigEndianPostings struct { list []byte - idx int + cur uint32 } func newBigEndianPostings(list []byte) *bigEndianPostings { - return &bigEndianPostings{list: list, idx: -1} + return &bigEndianPostings{list: list} } func (it *bigEndianPostings) At() uint32 { - idx := 4 * it.idx - return binary.BigEndian.Uint32(it.list[idx : idx+4]) + return it.cur } func (it *bigEndianPostings) Next() bool { - it.idx++ - return it.idx*4 < len(it.list) + if len(it.list) >= 4 { + it.cur = binary.BigEndian.Uint32(it.list) + it.list = it.list[4:] + return true + } + return false } func (it *bigEndianPostings) Seek(x uint32) bool { + if it.cur >= x { + return true + } + num := len(it.list) / 4 // Do binary search between current position and end. - it.idx += sort.Search(num-it.idx, func(i int) bool { - idx := 4 * (it.idx + i) - val := binary.BigEndian.Uint32(it.list[idx : idx+4]) - return val >= x + i := sort.Search(num, func(i int) bool { + return binary.BigEndian.Uint32(it.list[i*4:]) >= x }) - return it.idx*4 < len(it.list) + if i < num { + j := i * 4 + it.cur = binary.BigEndian.Uint32(it.list[j:]) + it.list = it.list[j+4:] + return true + } + it.list = nil + return false } func (it *bigEndianPostings) Err() error { diff --git a/vendor/github.com/prometheus/tsdb/querier.go b/vendor/github.com/prometheus/tsdb/querier.go index 97970e830..86dd76b99 100644 --- a/vendor/github.com/prometheus/tsdb/querier.go +++ b/vendor/github.com/prometheus/tsdb/querier.go @@ -135,21 +135,9 @@ type blockQuerier struct { } func (q *blockQuerier) Select(ms ...labels.Matcher) SeriesSet { - var ( - its []Postings - absent []string - ) - for _, m := range ms { - // If the matcher checks absence of a label, don't select them - // but propagate the check into the series set. - if _, ok := m.(*labels.EqualMatcher); ok && m.Matches("") { - absent = append(absent, m.Name()) - continue - } - its = append(its, q.selectSingle(m)) - } + pr := newPostingsReader(q.index) - p := Intersect(its...) + p, absent := pr.Select(ms...) if q.postingsMapper != nil { p = q.postingsMapper(p) @@ -172,50 +160,6 @@ func (q *blockQuerier) Select(ms ...labels.Matcher) SeriesSet { } } -func (q *blockQuerier) selectSingle(m labels.Matcher) Postings { - // Fast-path for equal matching. 
- if em, ok := m.(*labels.EqualMatcher); ok { - it, err := q.index.Postings(em.Name(), em.Value()) - if err != nil { - return errPostings{err: err} - } - return it - } - - tpls, err := q.index.LabelValues(m.Name()) - if err != nil { - return errPostings{err: err} - } - // TODO(fabxc): use interface upgrading to provide fast solution - // for equality and prefix matches. Tuples are lexicographically sorted. - var res []string - - for i := 0; i < tpls.Len(); i++ { - vals, err := tpls.At(i) - if err != nil { - return errPostings{err: err} - } - if m.Matches(vals[0]) { - res = append(res, vals[0]) - } - } - if len(res) == 0 { - return emptyPostings - } - - var rit []Postings - - for _, v := range res { - it, err := q.index.Postings(m.Name(), v) - if err != nil { - return errPostings{err: err} - } - rit = append(rit, it) - } - - return Merge(rit...) -} - func (q *blockQuerier) LabelValues(name string) ([]string, error) { tpls, err := q.index.LabelValues(name) if err != nil { @@ -241,6 +185,81 @@ func (q *blockQuerier) Close() error { return nil } +// postingsReader is used to select matching postings from an IndexReader. +type postingsReader struct { + index IndexReader +} + +func newPostingsReader(i IndexReader) *postingsReader { + return &postingsReader{index: i} +} + +func (r *postingsReader) Select(ms ...labels.Matcher) (Postings, []string) { + var ( + its []Postings + absent []string + ) + for _, m := range ms { + // If the matcher checks absence of a label, don't select them + // but propagate the check into the series set. + if _, ok := m.(*labels.EqualMatcher); ok && m.Matches("") { + absent = append(absent, m.Name()) + continue + } + its = append(its, r.selectSingle(m)) + } + + p := Intersect(its...) + + return p, absent +} + +func (r *postingsReader) selectSingle(m labels.Matcher) Postings { + // Fast-path for equal matching. + if em, ok := m.(*labels.EqualMatcher); ok { + it, err := r.index.Postings(em.Name(), em.Value()) + if err != nil { + return errPostings{err: err} + } + return it + } + + // TODO(fabxc): use interface upgrading to provide fast solution + // for prefix matches. Tuples are lexicographically sorted. + tpls, err := r.index.LabelValues(m.Name()) + if err != nil { + return errPostings{err: err} + } + + var res []string + + for i := 0; i < tpls.Len(); i++ { + vals, err := tpls.At(i) + if err != nil { + return errPostings{err: err} + } + if m.Matches(vals[0]) { + res = append(res, vals[0]) + } + } + + if len(res) == 0 { + return emptyPostings + } + + var rit []Postings + + for _, v := range res { + it, err := r.index.Postings(m.Name(), v) + if err != nil { + return errPostings{err: err} + } + rit = append(rit, it) + } + + return Merge(rit...) +} + func mergeStrings(a, b []string) []string { maxl := len(a) if len(b) > len(a) { diff --git a/vendor/github.com/prometheus/tsdb/wal.go b/vendor/github.com/prometheus/tsdb/wal.go index 853065f69..251944f0b 100644 --- a/vendor/github.com/prometheus/tsdb/wal.go +++ b/vendor/github.com/prometheus/tsdb/wal.go @@ -49,9 +49,8 @@ const ( WALEntrySamples WALEntryType = 3 ) -// WAL is a write ahead log for series data. It can only be written to. -// Use WALReader to read back from a write ahead log. -type WAL struct { +// SegmentWAL is a write ahead log for series data. +type SegmentWAL struct { mtx sync.Mutex dirFile *os.File @@ -69,6 +68,28 @@ type WAL struct { donec chan struct{} } +// WAL is a write ahead log that can log new series labels and samples. +// It must be completely read before new entries are logged. 
+type WAL interface { + Reader() WALReader + Log([]labels.Labels, []RefSample) error + Close() error +} + +// WALReader reads entries from a WAL. +type WALReader interface { + At() ([]labels.Labels, []RefSample) + Next() bool + Err() error +} + +// RefSample is a timestamp/value pair associated with a reference to a series. +type RefSample struct { + Ref uint64 + T int64 + V float64 +} + const ( walDirName = "wal" walSegmentSizeBytes = 256 * 1024 * 1024 // 256 MB @@ -83,9 +104,9 @@ func init() { castagnoliTable = crc32.MakeTable(crc32.Castagnoli) } -// OpenWAL opens or creates a write ahead log in the given directory. +// OpenSegmentWAL opens or creates a write ahead log in the given directory. // The WAL must be read completely before new data is written. -func OpenWAL(dir string, logger log.Logger, flushInterval time.Duration) (*WAL, error) { +func OpenSegmentWAL(dir string, logger log.Logger, flushInterval time.Duration) (*SegmentWAL, error) { dir = filepath.Join(dir, walDirName) if err := os.MkdirAll(dir, 0777); err != nil { @@ -99,7 +120,7 @@ func OpenWAL(dir string, logger log.Logger, flushInterval time.Duration) (*WAL, logger = log.NewNopLogger() } - w := &WAL{ + w := &SegmentWAL{ dirFile: df, logger: logger, flushInterval: flushInterval, @@ -119,12 +140,12 @@ func OpenWAL(dir string, logger log.Logger, flushInterval time.Duration) (*WAL, // Reader returns a new reader over the write ahead log data. // It must be completely consumed before writing to the WAL. -func (w *WAL) Reader() *WALReader { - return NewWALReader(w.logger, w) +func (w *SegmentWAL) Reader() WALReader { + return newWALReader(w, w.logger) } // Log writes a batch of new series labels and samples to the log. -func (w *WAL) Log(series []labels.Labels, samples []refdSample) error { +func (w *SegmentWAL) Log(series []labels.Labels, samples []RefSample) error { if err := w.encodeSeries(series); err != nil { return err } @@ -139,7 +160,7 @@ func (w *WAL) Log(series []labels.Labels, samples []refdSample) error { // initSegments finds all existing segment files and opens them in the // appropriate file modes. -func (w *WAL) initSegments() error { +func (w *SegmentWAL) initSegments() error { fns, err := sequenceFiles(w.dirFile.Name(), "") if err != nil { return err @@ -180,7 +201,7 @@ func (w *WAL) initSegments() error { // cut finishes the currently active segments and opens the next one. // The encoder is reset to point to the new segment. -func (w *WAL) cut() error { +func (w *SegmentWAL) cut() error { // Sync current tail to disk and close. if tf := w.tail(); tf != nil { if err := w.sync(); err != nil { @@ -229,7 +250,7 @@ func (w *WAL) cut() error { return nil } -func (w *WAL) tail() *os.File { +func (w *SegmentWAL) tail() *os.File { if len(w.files) == 0 { return nil } @@ -237,14 +258,14 @@ func (w *WAL) tail() *os.File { } // Sync flushes the changes to disk. -func (w *WAL) Sync() error { +func (w *SegmentWAL) Sync() error { w.mtx.Lock() defer w.mtx.Unlock() return w.sync() } -func (w *WAL) sync() error { +func (w *SegmentWAL) sync() error { if w.cur == nil { return nil } @@ -254,7 +275,7 @@ func (w *WAL) sync() error { return fileutil.Fdatasync(w.tail()) } -func (w *WAL) run(interval time.Duration) { +func (w *SegmentWAL) run(interval time.Duration) { var tick <-chan time.Time if interval > 0 { @@ -277,7 +298,7 @@ func (w *WAL) run(interval time.Duration) { } // Close syncs all data and closes the underlying resources.
-func (w *WAL) Close() error { +func (w *SegmentWAL) Close() error { close(w.stopc) <-w.donec @@ -305,7 +326,7 @@ const ( walPageBytes = 16 * minSectorSize ) -func (w *WAL) entry(et WALEntryType, flag byte, buf []byte) error { +func (w *SegmentWAL) entry(et WALEntryType, flag byte, buf []byte) error { w.mtx.Lock() defer w.mtx.Unlock() @@ -369,7 +390,7 @@ func putWALBuffer(b []byte) { walBuffers.Put(b) } -func (w *WAL) encodeSeries(series []labels.Labels) error { +func (w *SegmentWAL) encodeSeries(series []labels.Labels) error { if len(series) == 0 { return nil } @@ -395,7 +416,7 @@ func (w *WAL) encodeSeries(series []labels.Labels) error { return w.entry(WALEntrySeries, walSeriesSimple, buf) } -func (w *WAL) encodeSamples(samples []refdSample) error { +func (w *SegmentWAL) encodeSamples(samples []RefSample) error { if len(samples) == 0 { return nil } @@ -409,67 +430,65 @@ func (w *WAL) encodeSamples(samples []refdSample) error { // TODO(fabxc): optimize for all samples having the same timestamp. first := samples[0] - binary.BigEndian.PutUint64(b, first.ref) + binary.BigEndian.PutUint64(b, first.Ref) buf = append(buf, b[:8]...) - binary.BigEndian.PutUint64(b, uint64(first.t)) + binary.BigEndian.PutUint64(b, uint64(first.T)) buf = append(buf, b[:8]...) for _, s := range samples { - n := binary.PutVarint(b, int64(s.ref)-int64(first.ref)) + n := binary.PutVarint(b, int64(s.Ref)-int64(first.Ref)) buf = append(buf, b[:n]...) - n = binary.PutVarint(b, s.t-first.t) + n = binary.PutVarint(b, s.T-first.T) buf = append(buf, b[:n]...) - binary.BigEndian.PutUint64(b, math.Float64bits(s.v)) + binary.BigEndian.PutUint64(b, math.Float64bits(s.V)) buf = append(buf, b[:8]...) } return w.entry(WALEntrySamples, walSamplesSimple, buf) } -// WALReader decodes and emits write ahead log entries. -type WALReader struct { +// walReader decodes and emits write ahead log entries. +type walReader struct { logger log.Logger - wal *WAL + wal *SegmentWAL cur int buf []byte crc32 hash.Hash32 err error labels []labels.Labels - samples []refdSample + samples []RefSample } -// NewWALReader returns a new WALReader over the sequence of the given ReadClosers. -func NewWALReader(logger log.Logger, w *WAL) *WALReader { - if logger == nil { - logger = log.NewNopLogger() +func newWALReader(w *SegmentWAL, l log.Logger) *walReader { + if l == nil { + l = log.NewNopLogger() } - r := &WALReader{ - logger: logger, + return &walReader{ + logger: l, wal: w, buf: make([]byte, 0, 128*4096), crc32: crc32.New(crc32.MakeTable(crc32.Castagnoli)), } - return r } // At returns the last decoded entry of labels or samples. // The returned slices are only valid until the next call to Next(). Their elements // have to be copied to preserve them. -func (r *WALReader) At() ([]labels.Labels, []refdSample) { +func (r *walReader) At() ([]labels.Labels, []RefSample) { return r.labels, r.samples } // Err returns the last error the reader encountered. -func (r *WALReader) Err() error { +func (r *walReader) Err() error { return r.err } // nextEntry retrieves the next entry. It is also used as a testing hook. -func (r *WALReader) nextEntry() (WALEntryType, byte, []byte, error) { +func (r *walReader) nextEntry() (WALEntryType, byte, []byte, error) { if r.cur >= len(r.wal.files) { return 0, 0, nil, io.EOF } @@ -492,7 +511,7 @@ func (r *WALReader) nextEntry() (WALEntryType, byte, []byte, error) { // Next decodes the next entry pair and returns true // if it was successful.
-func (r *WALReader) Next() bool { +func (r *walReader) Next() bool { r.labels = r.labels[:0] r.samples = r.samples[:0] @@ -549,12 +568,12 @@ func (r *WALReader) Next() bool { return r.err == nil } -func (r *WALReader) current() *os.File { +func (r *walReader) current() *os.File { return r.wal.files[r.cur] } // truncate the WAL after the last valid entry. -func (r *WALReader) truncate(lastOffset int64) error { +func (r *walReader) truncate(lastOffset int64) error { r.logger.Log("msg", "WAL corruption detected; truncating", "err", r.err, "file", r.current().Name(), "pos", lastOffset) @@ -582,7 +601,7 @@ func walCorruptionErrf(s string, args ...interface{}) error { return walCorruptionErr(errors.Errorf(s, args...)) } -func (r *WALReader) entry(cr io.Reader) (WALEntryType, byte, []byte, error) { +func (r *walReader) entry(cr io.Reader) (WALEntryType, byte, []byte, error) { r.crc32.Reset() tr := io.TeeReader(cr, r.crc32) @@ -629,7 +648,7 @@ func (r *WALReader) entry(cr io.Reader) (WALEntryType, byte, []byte, error) { return etype, flag, buf, nil } -func (r *WALReader) decodeSeries(flag byte, b []byte) error { +func (r *walReader) decodeSeries(flag byte, b []byte) error { for len(b) > 0 { l, n := binary.Uvarint(b) if n < 1 { @@ -659,7 +678,7 @@ func (r *WALReader) decodeSeries(flag byte, b []byte) error { return nil } -func (r *WALReader) decodeSamples(flag byte, b []byte) error { +func (r *walReader) decodeSamples(flag byte, b []byte) error { if len(b) < 16 { return errors.Wrap(errInvalidSize, "header length") } @@ -670,7 +689,7 @@ func (r *WALReader) decodeSamples(flag byte, b []byte) error { b = b[16:] for len(b) > 0 { - var smpl refdSample + var smpl RefSample dref, n := binary.Varint(b) if n < 1 { @@ -678,19 +697,19 @@ func (r *WALReader) decodeSamples(flag byte, b []byte) error { } b = b[n:] - smpl.ref = uint64(int64(baseRef) + dref) + smpl.Ref = uint64(int64(baseRef) + dref) dtime, n := binary.Varint(b) if n < 1 { return errors.Wrap(errInvalidSize, "sample timestamp delta") } b = b[n:] - smpl.t = baseTime + dtime + smpl.T = baseTime + dtime if len(b) < 8 { return errors.Wrapf(errInvalidSize, "sample value bits %d", len(b)) } - smpl.v = float64(math.Float64frombits(binary.BigEndian.Uint64(b))) + smpl.V = float64(math.Float64frombits(binary.BigEndian.Uint64(b))) b = b[8:] r.samples = append(r.samples, smpl) diff --git a/vendor/vendor.json b/vendor/vendor.json index 2f3479fb3..03a20e388 100644 --- a/vendor/vendor.json +++ b/vendor/vendor.json @@ -661,22 +661,22 @@ "revisionTime": "2016-04-11T19:08:41Z" }, { - "checksumSHA1": "0wu/AzUWMurN/T5VBKCrvhf7c7E=", + "checksumSHA1": "T+9Tl4utHkpYSdVFRpdfLloShTM=", "path": "github.com/prometheus/tsdb", - "revision": "44769c1654f699931b2d3a2928326ac2d02d9149", - "revisionTime": "2017-05-09T10:52:47Z" + "revision": "c8438cfc8113a39f75e398bf00c481d3cb1069f6", + "revisionTime": "2017-05-14T09:51:56Z" }, { "checksumSHA1": "9EH3v+JdbikCUJAgD4VEOPIaWfs=", "path": "github.com/prometheus/tsdb/chunks", - "revision": "44769c1654f699931b2d3a2928326ac2d02d9149", - "revisionTime": "2017-05-09T10:52:47Z" + "revision": "c8438cfc8113a39f75e398bf00c481d3cb1069f6", + "revisionTime": "2017-05-14T09:51:56Z" }, { "checksumSHA1": "3RHZcB/ZvIae9K0tJxNlajJg0jA=", "path": "github.com/prometheus/tsdb/labels", - "revision": "44769c1654f699931b2d3a2928326ac2d02d9149", - "revisionTime": "2017-05-09T10:52:47Z" + "revision": "c8438cfc8113a39f75e398bf00c481d3cb1069f6", + "revisionTime": "2017-05-14T09:51:56Z" }, { "checksumSHA1": "+49Vr4Me28p3cR+gxX5SUQHbbas=",
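
A note on the head-block creation split above: createHeadBlock becomes TouchHeadBlock (crash-safe creation) plus OpenHeadBlock (loading), and crash safety comes from staging the new directory under a .tmp suffix and renaming it into place. A minimal sketch of that pattern in isolation, with a hypothetical payload; only the meta.json name and the tmp-then-rename flow come from the diff:

package main

import (
	"io/ioutil"
	"os"
	"path/filepath"
)

// atomicCreateDir stages a directory under a .tmp suffix and renames it
// into place. Rename within one filesystem is atomic, so a crash leaves
// either no directory or a complete one (plus, at worst, stale .tmp
// debris that a later reload can ignore or clean up).
func atomicCreateDir(dir string, meta []byte) error {
	tmp := dir + ".tmp"
	if err := os.MkdirAll(tmp, 0777); err != nil {
		return err
	}
	if err := ioutil.WriteFile(filepath.Join(tmp, "meta.json"), meta, 0666); err != nil {
		return err
	}
	return os.Rename(tmp, dir)
}

func main() {
	if err := atomicCreateDir("b-000001", []byte(`{"version":1}`)); err != nil {
		panic(err)
	}
}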
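
The headAppender.Commit hunk relies on a two-phase reference scheme: samples for series created inside the current batch carry a provisional ref with bit 32 set, and Commit rewrites them through refmap once real series IDs exist. A toy rendition of just that rewrite; the way provisional refs are constructed here is an illustrative assumption, only the s.Ref&(1<<32) test is taken from the diff:

package main

import "fmt"

func main() {
	// refmap translates provisional refs to final series IDs, as in Commit.
	refmap := map[uint64]uint64{}

	// Pretend two series were created in this batch: they get provisional
	// refs with bit 32 set (layout assumed for illustration).
	for i := uint64(0); i < 2; i++ {
		prov := uint64(1)<<32 | i
		refmap[prov] = 100 + i // hypothetical final IDs
	}

	// One sample references a pre-existing series (42), two reference the
	// batch-local ones. Only refs with bit 32 set are rewritten.
	refs := []uint64{42, uint64(1) << 32, uint64(1)<<32 | 1}
	for i, ref := range refs {
		if ref&(1<<32) > 0 {
			refs[i] = refmap[ref]
		}
	}
	fmt.Println(refs) // [42 100 101]
}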
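
The rewritten intersectPostings drops the eager Next calls from the constructor and leapfrogs in doNext instead: each iterator is Seeked to the other's current value until both land on the same ID. A self-contained sketch of that loop against the Postings contract, reusing the slice-backed iterator shape from postings.go:

package main

import (
	"fmt"
	"sort"
)

// Postings matches the iterator contract used throughout the diff.
type Postings interface {
	Next() bool
	Seek(v uint32) bool
	At() uint32
	Err() error
}

// listPostings mirrors the rewritten slice-backed iterator: it keeps the
// remaining list and the current value instead of an index.
type listPostings struct {
	list []uint32
	cur  uint32
}

func (it *listPostings) At() uint32 { return it.cur }
func (it *listPostings) Err() error { return nil }

func (it *listPostings) Next() bool {
	if len(it.list) > 0 {
		it.cur, it.list = it.list[0], it.list[1:]
		return true
	}
	it.cur = 0
	return false
}

func (it *listPostings) Seek(x uint32) bool {
	if it.cur >= x {
		return true
	}
	i := sort.Search(len(it.list), func(i int) bool { return it.list[i] >= x })
	if i == len(it.list) {
		it.list = nil
		return false
	}
	it.cur, it.list = it.list[i], it.list[i+1:]
	return true
}

// intersect is the doNext leapfrog: seek b to a's value; if b overshoots,
// seek a to b's value and retry until both agree or one side is exhausted.
func intersect(a, b Postings) ([]uint32, error) {
	var res []uint32
	for a.Next() {
		id := a.At()
		for {
			if !b.Seek(id) {
				return res, b.Err()
			}
			if vb := b.At(); vb == id {
				break
			} else if !a.Seek(vb) {
				return res, a.Err()
			}
			id = a.At()
		}
		res = append(res, id)
	}
	return res, a.Err()
}

func main() {
	a := &listPostings{list: []uint32{1, 2, 4, 8, 16}}
	b := &listPostings{list: []uint32{2, 3, 4, 5, 16, 32}}
	res, err := intersect(a, b)
	fmt.Println(res, err) // [2 4 16] <nil>
}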
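
bigEndianPostings likewise switches from an index to consuming the remaining byte slice: postings are consecutive 4-byte big-endian IDs, Next decodes the head and slides the window forward, and Seek binary-searches what is left. A small round trip over that layout:

package main

import (
	"encoding/binary"
	"fmt"
	"sort"
)

func main() {
	// Encode a sorted postings list as consecutive 4-byte big-endian IDs.
	ids := []uint32{10, 200, 3000}
	buf := make([]byte, 0, 4*len(ids))
	for _, id := range ids {
		var b [4]byte
		binary.BigEndian.PutUint32(b[:], id)
		buf = append(buf, b[:]...)
	}

	// Next-style walk: decode the head, then slide the window forward.
	for rest := buf; len(rest) >= 4; rest = rest[4:] {
		fmt.Println(binary.BigEndian.Uint32(rest))
	}

	// Seek-style jump: binary search for the first ID >= x, as the new
	// Seek does (x is chosen here so a match exists).
	x := uint32(150)
	i := sort.Search(len(buf)/4, func(i int) bool {
		return binary.BigEndian.Uint32(buf[i*4:]) >= x
	})
	fmt.Println("first >=", x, ":", binary.BigEndian.Uint32(buf[i*4:]))
}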
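
With the WAL and WALReader interfaces extracted in wal.go, head.go no longer depends on the concrete SegmentWAL: any log that can replay series and samples in order will do. A hypothetical in-memory implementation (a test-double sketch, not part of the package) showing how little the contract demands; WAL, WALReader, and RefSample are restated from the diff, memWAL and memWALReader are invented names:

package main

import (
	"fmt"

	"github.com/prometheus/tsdb/labels"
)

// RefSample mirrors the type introduced in wal.go.
type RefSample struct {
	Ref uint64
	T   int64
	V   float64
}

// WAL and WALReader restate the interfaces from the diff.
type WAL interface {
	Reader() WALReader
	Log([]labels.Labels, []RefSample) error
	Close() error
}

type WALReader interface {
	At() ([]labels.Labels, []RefSample)
	Next() bool
	Err() error
}

type entry struct {
	series  []labels.Labels
	samples []RefSample
}

// memWAL keeps logged batches in memory and replays them in order.
type memWAL struct{ entries []entry }

func (w *memWAL) Log(s []labels.Labels, r []RefSample) error {
	w.entries = append(w.entries, entry{s, r})
	return nil
}
func (w *memWAL) Close() error      { return nil }
func (w *memWAL) Reader() WALReader { return &memWALReader{rest: w.entries} }

type memWALReader struct {
	rest []entry
	cur  entry
}

func (r *memWALReader) Next() bool {
	if len(r.rest) == 0 {
		return false
	}
	r.cur, r.rest = r.rest[0], r.rest[1:]
	return true
}
func (r *memWALReader) At() ([]labels.Labels, []RefSample) { return r.cur.series, r.cur.samples }
func (r *memWALReader) Err() error                         { return nil }

func main() {
	var w WAL = &memWAL{}
	_ = w.Log([]labels.Labels{{{Name: "job", Value: "x"}}}, []RefSample{{Ref: 1, T: 1000, V: 0.5}})

	r := w.Reader()
	for r.Next() {
		series, samples := r.At()
		fmt.Println(series, samples)
	}
}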
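
Finally, the encodeSamples/decodeSamples hunks fix the on-disk sample layout the RefSample rename touches: the first sample's Ref and T are written at full 8-byte width, then every sample stores varint deltas against those bases plus the raw IEEE-754 bits of its value. A round-trip sketch of exactly that layout (error handling for corrupt input elided):

package main

import (
	"encoding/binary"
	"fmt"
	"math"
)

type RefSample struct {
	Ref uint64
	T   int64
	V   float64
}

func encode(samples []RefSample) []byte {
	if len(samples) == 0 {
		return nil
	}
	var b [binary.MaxVarintLen64]byte
	buf := make([]byte, 0, 16*len(samples))
	first := samples[0]

	// Full-width bases for the delta encoding.
	binary.BigEndian.PutUint64(b[:], first.Ref)
	buf = append(buf, b[:8]...)
	binary.BigEndian.PutUint64(b[:], uint64(first.T))
	buf = append(buf, b[:8]...)

	for _, s := range samples {
		n := binary.PutVarint(b[:], int64(s.Ref)-int64(first.Ref))
		buf = append(buf, b[:n]...)
		n = binary.PutVarint(b[:], s.T-first.T)
		buf = append(buf, b[:n]...)
		binary.BigEndian.PutUint64(b[:], math.Float64bits(s.V))
		buf = append(buf, b[:8]...)
	}
	return buf
}

func decode(b []byte) (res []RefSample) {
	baseRef := binary.BigEndian.Uint64(b)
	baseTime := int64(binary.BigEndian.Uint64(b[8:]))
	b = b[16:]

	for len(b) > 0 {
		dref, n := binary.Varint(b)
		b = b[n:]
		dtime, n := binary.Varint(b)
		b = b[n:]
		v := math.Float64frombits(binary.BigEndian.Uint64(b))
		b = b[8:]
		res = append(res, RefSample{
			Ref: uint64(int64(baseRef) + dref),
			T:   baseTime + dtime,
			V:   v,
		})
	}
	return res
}

func main() {
	in := []RefSample{{5, 1000, 1.5}, {6, 1001, 2.5}, {9, 1007, 0.25}}
	fmt.Println(decode(encode(in))) // [{5 1000 1.5} {6 1001 2.5} {9 1007 0.25}]
}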