vendor: update prometheus/tsdb
This commit is contained in:
parent 3845dfb715
commit 426125298e
vendor/github.com/prometheus/tsdb/block.go (generated, vendored): 8 changes
@@ -60,6 +60,11 @@ type Block interface {
 type headBlock interface {
 	Block
 	Appendable
+
+	// ActiveWriters returns the number of currently active appenders.
+	ActiveWriters() int
+	// HighTimestamp returns the highest currently inserted timestamp.
+	HighTimestamp() int64
 }
 
 // Snapshottable defines an entity that can be backed up online.
@@ -71,9 +76,6 @@ type Snapshottable interface {
 type Appendable interface {
 	// Appender returns a new Appender against an underlying store.
 	Appender() Appender
-
-	// Busy returns whether there are any currently active appenders.
-	Busy() bool
 }
 
 // Queryable defines an entity which provides a Querier.
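The interface change replaces the boolean Busy() with two finer-grained accessors: a writer count and a high-water timestamp, both of which the compaction logic in db.go below relies on. A minimal sketch of how a caller might consume the new methods; the stub type and the canCompact helper are illustrative stand-ins, not tsdb code:

package main

import "fmt"

// headLike mirrors the two accessors added to tsdb's headBlock interface
// in this commit (the method names match the diff; the struct is a stub).
type headLike interface {
	ActiveWriters() int
	HighTimestamp() int64
}

type stubHead struct {
	writers int
	highTS  int64
}

func (s stubHead) ActiveWriters() int   { return s.writers }
func (s stubHead) HighTimestamp() int64 { return s.highTS }

// canCompact reproduces the shape of the old Busy() check: a head with
// open write transactions must not be compacted yet.
func canCompact(h headLike) bool {
	return h.ActiveWriters() == 0
}

func main() {
	fmt.Println(canCompact(stubHead{writers: 0, highTS: 1500})) // true
	fmt.Println(canCompact(stubHead{writers: 2, highTS: 1500})) // false
}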
vendor/github.com/prometheus/tsdb/db.go (generated, vendored): 23 changes
@@ -80,6 +80,7 @@ type Appender interface {
 	// Returned reference numbers are ephemeral and may be rejected in calls
 	// to AddFast() at any point. Adding the sample via Add() returns a new
 	// reference number.
+	// If the reference is the empty string it must not be used for caching.
 	Add(l labels.Labels, t int64, v float64) (string, error)
 
 	// Add adds a sample pair for the referenced series. It is generally faster
@@ -305,6 +306,15 @@ func (db *DB) retentionCutoff() (bool, error) {
 	return retentionCutoff(db.dir, mint)
 }
 
+// headFullness returns up to which fraction of a block's time range samples
+// were already inserted.
+func headFullness(h headBlock) float64 {
+	m := h.Meta()
+	a := float64(h.HighTimestamp() - m.MinTime)
+	b := float64(m.MaxTime - m.MinTime)
+	return a / b
+}
+
 func (db *DB) compact() (changes bool, err error) {
 	db.cmtx.Lock()
 	defer db.cmtx.Unlock()
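headFullness is plain linear interpolation over the block's time range: a head whose highest inserted timestamp sits three quarters of the way between MinTime and MaxTime reports 0.75. A self-contained sketch with the block metadata reduced to just the two timestamps; the stub types are assumptions, only the arithmetic mirrors the diff:

package main

import "fmt"

type blockMeta struct{ MinTime, MaxTime int64 }

type head struct {
	meta   blockMeta
	highTS int64
}

func (h head) Meta() blockMeta      { return h.meta }
func (h head) HighTimestamp() int64 { return h.highTS }

// headFullness mirrors the function added in this commit: the fraction of
// the block's time range that already contains samples.
func headFullness(h head) float64 {
	m := h.Meta()
	a := float64(h.HighTimestamp() - m.MinTime)
	b := float64(m.MaxTime - m.MinTime)
	return a / b
}

func main() {
	h := head{meta: blockMeta{MinTime: 1000, MaxTime: 3000}, highTS: 2500}
	fmt.Println(headFullness(h)) // 0.75
}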
@@ -319,12 +329,14 @@ func (db *DB) compact() (changes bool, err error) {
 	// returning the lock to not block Appenders.
 	// Selected blocks are semantically ensured to not be written to afterwards
 	// by appendable().
-	if len(db.heads) > 2 {
-		for _, h := range db.heads[:len(db.heads)-2] {
+	if len(db.heads) > 1 {
+		f := headFullness(db.heads[len(db.heads)-1])
+
+		for _, h := range db.heads[:len(db.heads)-1] {
 			// Blocks that won't be appendable when instantiating a new appender
 			// might still have active appenders on them.
 			// Abort at the first one we encounter.
-			if h.Busy() {
+			if h.ActiveWriters() > 0 || f < 0.5 {
 				break
 			}
 			singles = append(singles, h)
@@ -605,6 +617,9 @@ func (a *dbAppender) Add(lset labels.Labels, t int64, v float64) (string, error) {
 	}
 	a.samples++
 
+	if ref == "" {
+		return "", nil
+	}
 	return string(append(h.meta.ULID[:], ref...)), nil
 }
 
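The empty-string return here ties into the new Appender comment above: a caller-side cache must skip empty references and fall back to Add() whenever AddFast() rejects a stale one. A runnable sketch of that pattern with a toy in-memory appender; memApp and cachedAdd are hypothetical helpers, only the contract they exercise comes from the diff:

package main

import (
	"errors"
	"fmt"
)

var errNotFound = errors.New("not found")

// memApp is a toy stand-in for tsdb's Appender: Add returns a reference
// string, AddFast rejects references it does not know about.
type memApp struct{ known map[string]bool }

func (m *memApp) Add(series string, t int64, v float64) (string, error) {
	m.known[series] = true
	return series, nil
}

func (m *memApp) AddFast(ref string, t int64, v float64) error {
	if !m.known[ref] {
		return errNotFound
	}
	return nil
}

// cachedAdd sketches the caller-side pattern the amended comment implies:
// try the cached reference, fall back to Add when it is rejected, and
// never cache an empty reference.
func cachedAdd(app *memApp, cache map[string]string, series string, t int64, v float64) error {
	if ref, ok := cache[series]; ok {
		if err := app.AddFast(ref, t, v); err == nil {
			return nil // fast path hit
		}
		// The reference was ephemeral and got rejected; take the slow path.
	}
	ref, err := app.Add(series, t, v)
	if err != nil {
		return err
	}
	if ref != "" { // per the new contract, empty refs must not be cached
		cache[series] = ref
	}
	return nil
}

func main() {
	app := &memApp{known: map[string]bool{}}
	cache := map[string]string{}
	fmt.Println(cachedAdd(app, cache, `http_requests_total{code="200"}`, 1000, 1))
	fmt.Println(cachedAdd(app, cache, `http_requests_total{code="200"}`, 2000, 2))
}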
@@ -849,6 +864,8 @@ func (db *DB) createHeadBlock(mint, maxt int64) (headBlock, error) {
 		return nil, err
 	}
 
+	db.logger.Log("msg", "created head block", "ulid", newHead.meta.ULID, "mint", mint, "maxt", maxt)
+
 	db.blocks = append(db.blocks, newHead) // TODO(fabxc): this is a race!
 	db.heads = append(db.heads, newHead)
 
vendor/github.com/prometheus/tsdb/head.go (generated, vendored): 33 changes
@@ -57,6 +57,7 @@ type HeadBlock struct {
 	wal WAL
 
 	activeWriters uint64
+	highTimestamp int64
 	closed        bool
 
 	// descs holds all chunk descs for the head block. Each chunk implicitly
@@ -389,9 +390,14 @@ func (h *HeadBlock) Appender() Appender {
 	return &headAppender{HeadBlock: h, samples: getHeadAppendBuffer()}
 }
 
-// Busy returns true if the block has open write transactions.
-func (h *HeadBlock) Busy() bool {
-	return atomic.LoadUint64(&h.activeWriters) > 0
+// ActiveWriters returns the number of open write transactions.
+func (h *HeadBlock) ActiveWriters() int {
+	return int(atomic.LoadUint64(&h.activeWriters))
+}
+
+// HighTimestamp returns the highest inserted sample timestamp.
+func (h *HeadBlock) HighTimestamp() int64 {
+	return atomic.LoadInt64(&h.highTimestamp)
 }
 
 var headPool = sync.Pool{}
@@ -416,6 +422,7 @@ type headAppender struct {
 	newHashes map[uint64]uint64
 
 	samples []RefSample
+	highTimestamp int64
 }
 
 type hashedLabels struct {
@@ -443,7 +450,7 @@ func (a *headAppender) Add(lset labels.Labels, t int64, v float64) (string, error) {
 		// XXX(fabxc): there's no fast path for multiple samples for the same new series
 		// in the same transaction. We always return the invalid empty ref. It has not
 		// been a relevant use case so far and is not worth the trouble.
-		return nullRef, a.AddFast(string(refb), t, v)
+		return "", a.AddFast(string(refb), t, v)
 	}
 
 	// The series is completely new.
@@ -464,11 +471,9 @@ func (a *headAppender) Add(lset labels.Labels, t int64, v float64) (string, error) {
 	a.newHashes[hash] = ref
 	binary.BigEndian.PutUint64(refb, ref)
 
-	return nullRef, a.AddFast(string(refb), t, v)
+	return "", a.AddFast(string(refb), t, v)
 }
 
-var nullRef = string([]byte{0, 0, 0, 0, 0, 0, 0, 0})
-
 func (a *headAppender) AddFast(ref string, t int64, v float64) error {
 	if len(ref) != 8 {
 		return errors.Wrap(ErrNotFound, "invalid ref length")
@@ -513,6 +518,10 @@ func (a *headAppender) AddFast(ref string, t int64, v float64) error {
 		}
 	}
 
+	if t > a.highTimestamp {
+		a.highTimestamp = t
+	}
+
 	a.samples = append(a.samples, RefSample{
 		Ref: refn,
 		T:   t,
@@ -593,6 +602,16 @@ func (a *headAppender) Commit() error {
 	atomic.AddUint64(&a.meta.Stats.NumSamples, total)
 	atomic.AddUint64(&a.meta.Stats.NumSeries, uint64(len(a.newSeries)))
 
+	for {
+		ht := a.HeadBlock.HighTimestamp()
+		if a.highTimestamp <= ht {
+			break
+		}
+		if atomic.CompareAndSwapInt64(&a.HeadBlock.highTimestamp, ht, a.highTimestamp) {
+			break
+		}
+	}
+
 	return nil
 }
 
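The loop added to Commit() is the standard lock-free "monotonic maximum" pattern: read the current value, stop if ours is not larger, otherwise compare-and-swap and retry if another writer interfered in between. The same pattern in isolation, runnable with concurrent writers; storeMax is an illustrative name, not tsdb code:

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// storeMax raises *addr to v if v is larger, tolerating concurrent
// writers via the same CAS loop used in headAppender.Commit().
func storeMax(addr *int64, v int64) {
	for {
		cur := atomic.LoadInt64(addr)
		if v <= cur {
			return
		}
		if atomic.CompareAndSwapInt64(addr, cur, v) {
			return
		}
		// Another writer moved the value; re-read and retry.
	}
}

func main() {
	var high int64
	var wg sync.WaitGroup
	for i := int64(1); i <= 100; i++ {
		wg.Add(1)
		go func(ts int64) {
			defer wg.Done()
			storeMax(&high, ts)
		}(i)
	}
	wg.Wait()
	fmt.Println(atomic.LoadInt64(&high)) // always 100
}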
vendor/github.com/prometheus/tsdb/postings.go (generated, vendored): 18 changes
@@ -78,12 +78,11 @@ func Intersect(its ...Postings) Postings {
 	if len(its) == 0 {
 		return emptyPostings
 	}
-	a := its[0]
-
-	for _, b := range its[1:] {
-		a = newIntersectPostings(a, b)
-	}
-	return a
+	if len(its) == 1 {
+		return its[0]
+	}
+	l := len(its) / 2
+	return newIntersectPostings(Intersect(its[:l]...), Intersect(its[l:]...))
 }
 
 type intersectPostings struct {
@@ -145,12 +144,11 @@ func Merge(its ...Postings) Postings {
 	if len(its) == 0 {
 		return nil
 	}
-	a := its[0]
-
-	for _, b := range its[1:] {
-		a = newMergedPostings(a, b)
-	}
-	return a
+	if len(its) == 1 {
+		return its[0]
+	}
+	l := len(its) / 2
+	return newMergedPostings(Merge(its[:l]...), Merge(its[l:]...))
 }
 
 type mergedPostings struct {
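Both Intersect and Merge now split their inputs in half and recurse instead of folding left to right, so the resulting operator tree is balanced: depth ceil(log2 k) for k postings lists instead of k-1, which bounds the work per Next()/Seek() call correspondingly. A toy illustration of the recursion shape; the string-pairing operator is just there to make the parenthesization visible, it is not tsdb's Postings type:

package main

import "fmt"

// combineBalanced mirrors the recursion introduced for Intersect/Merge:
// split in half, recurse, combine the two halves with a binary operator.
func combineBalanced(xs []string, op func(a, b string) string) string {
	if len(xs) == 0 {
		return ""
	}
	if len(xs) == 1 {
		return xs[0]
	}
	l := len(xs) / 2
	return op(combineBalanced(xs[:l], op), combineBalanced(xs[l:], op))
}

func main() {
	// The output shows the balanced tree: ((a b) (c d)) instead of the
	// left-deep (((a b) c) d) produced by the old fold.
	pair := func(a, b string) string { return "(" + a + " " + b + ")" }
	fmt.Println(combineBalanced([]string{"a", "b", "c", "d"}, pair))
}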
vendor/github.com/prometheus/tsdb/querier.go (generated, vendored): 77 changes
@@ -15,7 +15,6 @@ package tsdb
 
 import (
 	"fmt"
-	"sort"
 	"strings"
 
 	"github.com/prometheus/tsdb/chunks"
@@ -38,7 +37,7 @@ type Querier interface {
 	Close() error
 }
 
-// Series represents a single time series.
+// Series exposes a single time series.
 type Series interface {
 	// Labels returns the complete set of labels identifying the series.
 	Labels() labels.Labels
@@ -75,22 +74,26 @@ func (s *DB) Querier(mint, maxt int64) Querier {
 }
 
 func (q *querier) LabelValues(n string) ([]string, error) {
-	if len(q.blocks) == 0 {
+	return q.lvals(q.blocks, n)
+}
+
+func (q *querier) lvals(qs []Querier, n string) ([]string, error) {
+	if len(qs) == 0 {
 		return nil, nil
 	}
-	res, err := q.blocks[0].LabelValues(n)
+	if len(qs) == 1 {
+		return qs[0].LabelValues(n)
+	}
+	l := len(qs) / 2
+	s1, err := q.lvals(qs[:l], n)
 	if err != nil {
 		return nil, err
 	}
-	for _, bq := range q.blocks[1:] {
-		pr, err := bq.LabelValues(n)
-		if err != nil {
-			return nil, err
-		}
-		// Merge new values into deduplicated result.
-		res = mergeStrings(res, pr)
-	}
-	return res, nil
+	s2, err := q.lvals(qs[l:], n)
+	if err != nil {
+		return nil, err
+	}
+	return mergeStrings(s1, s2), nil
 }
 
 func (q *querier) LabelValuesFor(string, labels.Label) ([]string, error) {
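lvals combines its two halves with mergeStrings, which merges sorted, deduplicated string slices; that helper predates this commit and is unchanged here. A minimal equivalent under the same sortedness assumption, as an illustrative reimplementation rather than the vendored code:

package main

import "fmt"

// mergeSorted merges two sorted, deduplicated string slices into one,
// emitting values present in both inputs only once.
func mergeSorted(a, b []string) []string {
	res := make([]string, 0, len(a)+len(b))
	for len(a) > 0 && len(b) > 0 {
		switch {
		case a[0] == b[0]:
			res = append(res, a[0])
			a, b = a[1:], b[1:]
		case a[0] < b[0]:
			res = append(res, a[0])
			a = a[1:]
		default:
			res = append(res, b[0])
			b = b[1:]
		}
	}
	// One side is exhausted; the remainder is already sorted.
	res = append(res, a...)
	res = append(res, b...)
	return res
}

func main() {
	fmt.Println(mergeSorted([]string{"a", "c", "d"}, []string{"b", "c"}))
	// [a b c d]
}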
@@ -98,19 +101,19 @@ func (q *querier) LabelValuesFor(string, labels.Label) ([]string, error) {
 }
 
 func (q *querier) Select(ms ...labels.Matcher) SeriesSet {
-	// Sets from different blocks have no time overlap. The reference numbers
-	// they emit point to series sorted in lexicographic order.
-	// We can fully connect partial series by simply comparing with the previous
-	// label set.
-	if len(q.blocks) == 0 {
+	return q.sel(q.blocks, ms)
+}
+
+func (q *querier) sel(qs []Querier, ms []labels.Matcher) SeriesSet {
+	if len(qs) == 0 {
 		return nopSeriesSet{}
 	}
-	r := q.blocks[0].Select(ms...)
-
-	for _, s := range q.blocks[1:] {
-		r = newMergedSeriesSet(r, s.Select(ms...))
-	}
-	return r
+	if len(qs) == 1 {
+		return qs[0].Select(ms...)
+	}
+	l := len(qs) / 2
+	return newMergedSeriesSet(q.sel(qs[:l], ms), q.sel(qs[l:], ms))
 }
 
 func (q *querier) Close() error {
@@ -657,10 +660,6 @@ func newChunkSeriesIterator(cs []*ChunkMeta, dranges intervals, mint, maxt int64) {
 	}
 }
 
-func (it *chunkSeriesIterator) inBounds(t int64) bool {
-	return t >= it.mint && t <= it.maxt
-}
-
 func (it *chunkSeriesIterator) Seek(t int64) (ok bool) {
 	if t > it.maxt {
 		return false
@@ -671,23 +670,13 @@ func (it *chunkSeriesIterator) Seek(t int64) (ok bool) {
 		t = it.mint
 	}
 
-	// Only do binary search forward to stay in line with other iterators
-	// that can only move forward.
-	x := sort.Search(len(it.chunks[it.i:]), func(i int) bool { return it.chunks[i].MinTime >= t })
-	x += it.i
-
-	// If the timestamp was not found, it might be in the last chunk.
-	if x == len(it.chunks) {
-		x--
-
-	// Go to previous chunk if the chunk doesn't exactly start with t.
-	// If we are already at the first chunk, we use it as it's the best we have.
-	} else if x > 0 && it.chunks[x].MinTime > t {
-		x--
+	for ; it.chunks[it.i].MaxTime < t; it.i++ {
+		if it.i == len(it.chunks)-1 {
+			return false
+		}
 	}
 
-	it.i = x
-	it.cur = it.chunks[x].Chunk.Iterator()
+	it.cur = it.chunks[it.i].Chunk.Iterator()
 	if len(it.intervals) > 0 {
 		it.cur = &deletedIterator{it: it.cur, intervals: it.intervals}
 	}
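The sort.Search-based Seek is replaced by a forward-only linear scan: advance the chunk index until the current chunk's MaxTime reaches t, and fail once the last chunk is passed. Since these iterators only move forward, the index stays monotonic and the removed backtracking cases disappear. A toy replay of the loop with chunk metadata reduced to its two timestamps; the stub type is an assumption, the loop body mirrors the diff:

package main

import "fmt"

type chunkMeta struct{ MinTime, MaxTime int64 }

// seekChunk replays the new loop from chunkSeriesIterator.Seek: starting
// at index i, advance until a chunk's MaxTime is >= t; return -1 when no
// remaining chunk can contain t.
func seekChunk(chunks []chunkMeta, i int, t int64) int {
	for ; chunks[i].MaxTime < t; i++ {
		if i == len(chunks)-1 {
			return -1
		}
	}
	return i
}

func main() {
	chunks := []chunkMeta{{0, 99}, {100, 199}, {200, 299}}
	fmt.Println(seekChunk(chunks, 0, 150)) // 1
	fmt.Println(seekChunk(chunks, 1, 250)) // 2
	fmt.Println(seekChunk(chunks, 0, 500)) // -1
}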
@@ -708,9 +697,15 @@ func (it *chunkSeriesIterator) At() (t int64, v float64) {
 func (it *chunkSeriesIterator) Next() bool {
 	for it.cur.Next() {
 		t, _ := it.cur.At()
-		if it.inBounds(t) {
-			return true
+		if t < it.mint {
+			return it.Seek(it.mint)
 		}
+
+		if t > it.maxt {
+			return false
+		}
+
+		return true
 	}
 
 	if err := it.cur.Err(); err != nil {
vendor/vendor.json (vendored): 14 changes
@@ -763,22 +763,22 @@
 		"revisionTime": "2016-04-11T19:08:41Z"
 	},
 	{
-		"checksumSHA1": "XXXDHMZe3Y3gosaF/1staHm3INc=",
+		"checksumSHA1": "kT9X/dKXjFCoxV48N2C9NZhPRvA=",
 		"path": "github.com/prometheus/tsdb",
-		"revision": "9963a4c7c3b2a742e00a63c54084b051e3174b06",
-		"revisionTime": "2017-06-12T09:17:49Z"
+		"revision": "d492bfd973c24026ab784c1c1821af426bc80e90",
+		"revisionTime": "2017-06-30T13:17:34Z"
 	},
 	{
 		"checksumSHA1": "9EH3v+JdbikCUJAgD4VEOPIaWfs=",
 		"path": "github.com/prometheus/tsdb/chunks",
-		"revision": "4f2eb2057ee0a7f2b984503886bff970a9dab1a8",
-		"revisionTime": "2017-05-22T06:49:09Z"
+		"revision": "d492bfd973c24026ab784c1c1821af426bc80e90",
+		"revisionTime": "2017-06-30T13:17:34Z"
 	},
 	{
 		"checksumSHA1": "3RHZcB/ZvIae9K0tJxNlajJg0jA=",
 		"path": "github.com/prometheus/tsdb/labels",
-		"revision": "4f2eb2057ee0a7f2b984503886bff970a9dab1a8",
-		"revisionTime": "2017-05-22T06:49:09Z"
+		"revision": "d492bfd973c24026ab784c1c1821af426bc80e90",
+		"revisionTime": "2017-06-30T13:17:34Z"
 	},
 	{
 		"checksumSHA1": "5SYLEhADhdBVZAGPVHWggQl7H8k=",