vendor: update prometheus/tsdb

This commit is contained in:
Fabian Reinartz 2017-10-23 22:00:05 +02:00
parent a57ea79660
commit b5d1ec53da
8 changed files with 125 additions and 88 deletions

View file

@ -69,6 +69,17 @@ type Iterator interface {
Next() bool Next() bool
} }
// NewNopIterator returns a new chunk iterator that does not hold any data.
func NewNopIterator() Iterator {
return nopIterator{}
}
type nopIterator struct{}
func (nopIterator) At() (int64, float64) { return 0, 0 }
func (nopIterator) Next() bool { return false }
func (nopIterator) Err() error { return nil }
type Pool interface { type Pool interface {
Put(Chunk) error Put(Chunk) error
Get(e Encoding, b []byte) (Chunk, error) Get(e Encoding, b []byte) (Chunk, error)

View file

@ -81,30 +81,30 @@ func newCompactorMetrics(r prometheus.Registerer) *compactorMetrics {
m := &compactorMetrics{} m := &compactorMetrics{}
m.ran = prometheus.NewCounter(prometheus.CounterOpts{ m.ran = prometheus.NewCounter(prometheus.CounterOpts{
Name: "tsdb_compactions_total", Name: "prometheus_tsdb_compactions_total",
Help: "Total number of compactions that were executed for the partition.", Help: "Total number of compactions that were executed for the partition.",
}) })
m.failed = prometheus.NewCounter(prometheus.CounterOpts{ m.failed = prometheus.NewCounter(prometheus.CounterOpts{
Name: "tsdb_compactions_failed_total", Name: "prometheus_tsdb_compactions_failed_total",
Help: "Total number of compactions that failed for the partition.", Help: "Total number of compactions that failed for the partition.",
}) })
m.duration = prometheus.NewHistogram(prometheus.HistogramOpts{ m.duration = prometheus.NewHistogram(prometheus.HistogramOpts{
Name: "tsdb_compaction_duration", Name: "prometheus_tsdb_compaction_duration",
Help: "Duration of compaction runs.", Help: "Duration of compaction runs.",
Buckets: prometheus.ExponentialBuckets(1, 2, 10), Buckets: prometheus.ExponentialBuckets(1, 2, 10),
}) })
m.chunkSize = prometheus.NewHistogram(prometheus.HistogramOpts{ m.chunkSize = prometheus.NewHistogram(prometheus.HistogramOpts{
Name: "tsdb_compaction_chunk_size", Name: "prometheus_tsdb_compaction_chunk_size",
Help: "Final size of chunks on their first compaction", Help: "Final size of chunks on their first compaction",
Buckets: prometheus.ExponentialBuckets(32, 1.5, 12), Buckets: prometheus.ExponentialBuckets(32, 1.5, 12),
}) })
m.chunkSamples = prometheus.NewHistogram(prometheus.HistogramOpts{ m.chunkSamples = prometheus.NewHistogram(prometheus.HistogramOpts{
Name: "tsdb_compaction_chunk_samples", Name: "prometheus_tsdb_compaction_chunk_samples",
Help: "Final number of samples on their first compaction", Help: "Final number of samples on their first compaction",
Buckets: prometheus.ExponentialBuckets(4, 1.5, 12), Buckets: prometheus.ExponentialBuckets(4, 1.5, 12),
}) })
m.chunkRange = prometheus.NewHistogram(prometheus.HistogramOpts{ m.chunkRange = prometheus.NewHistogram(prometheus.HistogramOpts{
Name: "tsdb_compaction_chunk_range", Name: "prometheus_tsdb_compaction_chunk_range",
Help: "Final time range of chunks on their first compaction", Help: "Final time range of chunks on their first compaction",
Buckets: prometheus.ExponentialBuckets(100, 4, 10), Buckets: prometheus.ExponentialBuckets(100, 4, 10),
}) })

View file

@ -129,7 +129,7 @@ func newDBMetrics(db *DB, r prometheus.Registerer) *dbMetrics {
m := &dbMetrics{} m := &dbMetrics{}
m.loadedBlocks = prometheus.NewGaugeFunc(prometheus.GaugeOpts{ m.loadedBlocks = prometheus.NewGaugeFunc(prometheus.GaugeOpts{
Name: "tsdb_blocks_loaded", Name: "prometheus_tsdb_blocks_loaded",
Help: "Number of currently loaded data blocks", Help: "Number of currently loaded data blocks",
}, func() float64 { }, func() float64 {
db.mtx.RLock() db.mtx.RLock()
@ -137,15 +137,15 @@ func newDBMetrics(db *DB, r prometheus.Registerer) *dbMetrics {
return float64(len(db.blocks)) return float64(len(db.blocks))
}) })
m.reloads = prometheus.NewCounter(prometheus.CounterOpts{ m.reloads = prometheus.NewCounter(prometheus.CounterOpts{
Name: "tsdb_reloads_total", Name: "prometheus_tsdb_reloads_total",
Help: "Number of times the database reloaded block data from disk.", Help: "Number of times the database reloaded block data from disk.",
}) })
m.reloadsFailed = prometheus.NewCounter(prometheus.CounterOpts{ m.reloadsFailed = prometheus.NewCounter(prometheus.CounterOpts{
Name: "tsdb_reloads_failures_total", Name: "prometheus_tsdb_reloads_failures_total",
Help: "Number of times the database failed to reload black data from disk.", Help: "Number of times the database failed to reload black data from disk.",
}) })
m.compactionsTriggered = prometheus.NewCounter(prometheus.CounterOpts{ m.compactionsTriggered = prometheus.NewCounter(prometheus.CounterOpts{
Name: "tsdb_compactions_triggered_total", Name: "prometheus_tsdb_compactions_triggered_total",
Help: "Total number of triggered compactions for the partition.", Help: "Total number of triggered compactions for the partition.",
}) })
@ -448,9 +448,6 @@ func (db *DB) reload() (err error) {
db.metrics.reloads.Inc() db.metrics.reloads.Inc()
}() }()
var cs []io.Closer
defer func() { closeAll(cs...) }()
dirs, err := blockDirs(db.dir) dirs, err := blockDirs(db.dir)
if err != nil { if err != nil {
return errors.Wrap(err, "find blocks") return errors.Wrap(err, "find blocks")
@ -482,25 +479,25 @@ func (db *DB) reload() (err error) {
return errors.Wrap(err, "invalid block sequence") return errors.Wrap(err, "invalid block sequence")
} }
// Close all opened blocks that no longer exist after we returned all locks. // Swap in new blocks first for subsequently created readers to be seen.
// TODO(fabxc: probably races with querier still reading from them. Can // Then close previous blocks, which may block for pending readers to complete.
// we just abandon them and have the open FDs be GC'd automatically eventually?
for _, b := range db.blocks {
if _, ok := exist[b.Meta().ULID]; !ok {
cs = append(cs, b)
}
}
db.mtx.Lock() db.mtx.Lock()
oldBlocks := db.blocks
db.blocks = blocks db.blocks = blocks
db.mtx.Unlock() db.mtx.Unlock()
for _, b := range oldBlocks {
if _, ok := exist[b.Meta().ULID]; !ok {
b.Close()
}
}
// Garbage collect data in the head if the most recent persisted block // Garbage collect data in the head if the most recent persisted block
// covers data of its current time range. // covers data of its current time range.
if len(blocks) == 0 { if len(blocks) == 0 {
return nil return nil
} }
maxt := blocks[len(db.blocks)-1].Meta().MaxTime maxt := blocks[len(blocks)-1].Meta().MaxTime
return errors.Wrap(db.head.Truncate(maxt), "head truncate failed") return errors.Wrap(db.head.Truncate(maxt), "head truncate failed")
} }
@ -593,7 +590,10 @@ func (db *DB) Snapshot(dir string) error {
db.cmtx.Lock() db.cmtx.Lock()
defer db.cmtx.Unlock() defer db.cmtx.Unlock()
for _, b := range db.Blocks() { db.mtx.RLock()
defer db.mtx.RUnlock()
for _, b := range db.blocks {
level.Info(db.logger).Log("msg", "snapshotting block", "block", b) level.Info(db.logger).Log("msg", "snapshotting block", "block", b)
if err := b.Snapshot(dir); err != nil { if err := b.Snapshot(dir); err != nil {
@ -608,7 +608,10 @@ func (db *DB) Snapshot(dir string) error {
func (db *DB) Querier(mint, maxt int64) (Querier, error) { func (db *DB) Querier(mint, maxt int64) (Querier, error) {
var blocks []BlockReader var blocks []BlockReader
for _, b := range db.Blocks() { db.mtx.RLock()
defer db.mtx.RUnlock()
for _, b := range db.blocks {
m := b.Meta() m := b.Meta()
if intervalOverlap(mint, maxt, m.MinTime, m.MaxTime) { if intervalOverlap(mint, maxt, m.MinTime, m.MaxTime) {
blocks = append(blocks, b) blocks = append(blocks, b)
@ -623,10 +626,15 @@ func (db *DB) Querier(mint, maxt int64) (Querier, error) {
} }
for _, b := range blocks { for _, b := range blocks {
q, err := NewBlockQuerier(b, mint, maxt) q, err := NewBlockQuerier(b, mint, maxt)
if err != nil { if err == nil {
return nil, errors.Wrapf(err, "open querier for block %s", b) sq.blocks = append(sq.blocks, q)
continue
} }
sq.blocks = append(sq.blocks, q) // If we fail, all previously opened queriers must be closed.
for _, q := range sq.blocks {
q.Close()
}
return nil, errors.Wrapf(err, "open querier for block %s", b)
} }
return sq, nil return sq, nil
} }
@ -643,7 +651,10 @@ func (db *DB) Delete(mint, maxt int64, ms ...labels.Matcher) error {
var g errgroup.Group var g errgroup.Group
for _, b := range db.Blocks() { db.mtx.RLock()
defer db.mtx.RUnlock()
for _, b := range db.blocks {
m := b.Meta() m := b.Meta()
if intervalOverlap(mint, maxt, m.MinTime, m.MaxTime) { if intervalOverlap(mint, maxt, m.MinTime, m.MaxTime) {
g.Go(func(b *Block) func() error { g.Go(func(b *Block) func() error {

View file

@ -89,59 +89,59 @@ func newHeadMetrics(h *Head, r prometheus.Registerer) *headMetrics {
m := &headMetrics{} m := &headMetrics{}
m.activeAppenders = prometheus.NewGauge(prometheus.GaugeOpts{ m.activeAppenders = prometheus.NewGauge(prometheus.GaugeOpts{
Name: "tsdb_head_active_appenders", Name: "prometheus_tsdb_head_active_appenders",
Help: "Number of currently active appender transactions", Help: "Number of currently active appender transactions",
}) })
m.series = prometheus.NewGauge(prometheus.GaugeOpts{ m.series = prometheus.NewGauge(prometheus.GaugeOpts{
Name: "tsdb_head_series", Name: "prometheus_tsdb_head_series",
Help: "Total number of series in the head block.", Help: "Total number of series in the head block.",
}) })
m.seriesCreated = prometheus.NewGauge(prometheus.GaugeOpts{ m.seriesCreated = prometheus.NewGauge(prometheus.GaugeOpts{
Name: "tsdb_head_series_created_total", Name: "prometheus_tsdb_head_series_created_total",
Help: "Total number of series created in the head", Help: "Total number of series created in the head",
}) })
m.seriesRemoved = prometheus.NewGauge(prometheus.GaugeOpts{ m.seriesRemoved = prometheus.NewGauge(prometheus.GaugeOpts{
Name: "tsdb_head_series_removed_total", Name: "prometheus_tsdb_head_series_removed_total",
Help: "Total number of series removed in the head", Help: "Total number of series removed in the head",
}) })
m.seriesNotFound = prometheus.NewCounter(prometheus.CounterOpts{ m.seriesNotFound = prometheus.NewCounter(prometheus.CounterOpts{
Name: "tsdb_head_series_not_found", Name: "prometheus_tsdb_head_series_not_found",
Help: "Total number of requests for series that were not found.", Help: "Total number of requests for series that were not found.",
}) })
m.chunks = prometheus.NewGauge(prometheus.GaugeOpts{ m.chunks = prometheus.NewGauge(prometheus.GaugeOpts{
Name: "tsdb_head_chunks", Name: "prometheus_tsdb_head_chunks",
Help: "Total number of chunks in the head block.", Help: "Total number of chunks in the head block.",
}) })
m.chunksCreated = prometheus.NewGauge(prometheus.GaugeOpts{ m.chunksCreated = prometheus.NewGauge(prometheus.GaugeOpts{
Name: "tsdb_head_chunks_created_total", Name: "prometheus_tsdb_head_chunks_created_total",
Help: "Total number of chunks created in the head", Help: "Total number of chunks created in the head",
}) })
m.chunksRemoved = prometheus.NewGauge(prometheus.GaugeOpts{ m.chunksRemoved = prometheus.NewGauge(prometheus.GaugeOpts{
Name: "tsdb_head_chunks_removed_total", Name: "prometheus_tsdb_head_chunks_removed_total",
Help: "Total number of chunks removed in the head", Help: "Total number of chunks removed in the head",
}) })
m.gcDuration = prometheus.NewSummary(prometheus.SummaryOpts{ m.gcDuration = prometheus.NewSummary(prometheus.SummaryOpts{
Name: "tsdb_head_gc_duration_seconds", Name: "prometheus_tsdb_head_gc_duration_seconds",
Help: "Runtime of garbage collection in the head block.", Help: "Runtime of garbage collection in the head block.",
}) })
m.maxTime = prometheus.NewGaugeFunc(prometheus.GaugeOpts{ m.maxTime = prometheus.NewGaugeFunc(prometheus.GaugeOpts{
Name: "tsdb_head_max_time", Name: "prometheus_tsdb_head_max_time",
Help: "Maximum timestamp of the head block.", Help: "Maximum timestamp of the head block.",
}, func() float64 { }, func() float64 {
return float64(h.MaxTime()) return float64(h.MaxTime())
}) })
m.minTime = prometheus.NewGaugeFunc(prometheus.GaugeOpts{ m.minTime = prometheus.NewGaugeFunc(prometheus.GaugeOpts{
Name: "tsdb_head_min_time", Name: "prometheus_tsdb_head_min_time",
Help: "Minimum time bound of the head block.", Help: "Minimum time bound of the head block.",
}, func() float64 { }, func() float64 {
return float64(h.MinTime()) return float64(h.MinTime())
}) })
m.walTruncateDuration = prometheus.NewSummary(prometheus.SummaryOpts{ m.walTruncateDuration = prometheus.NewSummary(prometheus.SummaryOpts{
Name: "tsdb_wal_truncate_duration_seconds", Name: "prometheus_tsdb_wal_truncate_duration_seconds",
Help: "Duration of WAL truncation.", Help: "Duration of WAL truncation.",
}) })
m.samplesAppended = prometheus.NewCounter(prometheus.CounterOpts{ m.samplesAppended = prometheus.NewCounter(prometheus.CounterOpts{
Name: "tsdb_head_samples_appended_total", Name: "prometheus_tsdb_head_samples_appended_total",
Help: "Total number of appended sampledb.", Help: "Total number of appended sampledb.",
}) })
@ -273,13 +273,23 @@ func (h *Head) ReadWAL() error {
} }
} }
samplesFunc := func(samples []RefSample) { samplesFunc := func(samples []RefSample) {
var buf []RefSample // We split up the samples into chunks of 5000 samples or less.
select { // With O(300 * #cores) in-flight sample batches, large scrapes could otherwise
case buf = <-input: // cause thousands of very large in flight buffers occupying large amounts
default: // of unused memory.
buf = make([]RefSample, 0, len(samples)*11/10) for len(samples) > 0 {
n := 5000
if len(samples) < n {
n = len(samples)
}
var buf []RefSample
select {
case buf = <-input:
default:
}
firstInput <- append(buf[:0], samples[:n]...)
samples = samples[n:]
} }
firstInput <- append(buf[:0], samples...)
} }
deletesFunc := func(stones []Stone) { deletesFunc := func(stones []Stone) {
for _, s := range stones { for _, s := range stones {
@ -665,7 +675,7 @@ func (h *Head) gc() {
// Rebuild symbols and label value indices from what is left in the postings terms. // Rebuild symbols and label value indices from what is left in the postings terms.
h.postings.mtx.RLock() h.postings.mtx.RLock()
symbols := make(map[string]struct{}, len(h.symbols)) symbols := make(map[string]struct{})
values := make(map[string]stringset, len(h.values)) values := make(map[string]stringset, len(h.values))
for t := range h.postings.m { for t := range h.postings.m {
@ -1152,6 +1162,10 @@ func (s *memSeries) cut(mint int64) *memChunk {
} }
s.chunks = append(s.chunks, c) s.chunks = append(s.chunks, c)
// Set upper bound on when the next chunk must be started. An earlier timestamp
// may be chosen dynamically at a later point.
_, s.nextAt = rangeForTimestamp(mint, s.chunkRange)
app, err := c.chunk.Appender() app, err := c.chunk.Appender()
if err != nil { if err != nil {
panic(err) panic(err)
@ -1231,10 +1245,17 @@ func (s *memSeries) append(t int64, v float64) (success, chunkCreated bool) {
} }
numSamples := c.chunk.NumSamples() numSamples := c.chunk.NumSamples()
// Out of order sample.
if c.maxTime >= t { if c.maxTime >= t {
return false, chunkCreated return false, chunkCreated
} }
if numSamples > samplesPerChunk/4 && t >= s.nextAt { // If we reach 25% of a chunk's desired sample count, set a definitive time
// at which to start the next chunk.
// At latest it must happen at the timestamp set when the chunk was cut.
if numSamples == samplesPerChunk/4 {
s.nextAt = computeChunkEndTime(c.minTime, c.maxTime, s.nextAt)
}
if t >= s.nextAt {
c = s.cut(t) c = s.cut(t)
chunkCreated = true chunkCreated = true
} }
@ -1242,11 +1263,6 @@ func (s *memSeries) append(t int64, v float64) (success, chunkCreated bool) {
c.maxTime = t c.maxTime = t
if numSamples == samplesPerChunk/4 {
_, maxt := rangeForTimestamp(c.minTime, s.chunkRange)
s.nextAt = computeChunkEndTime(c.minTime, c.maxTime, maxt)
}
s.lastValue = v s.lastValue = v
s.sampleBuf[0] = s.sampleBuf[1] s.sampleBuf[0] = s.sampleBuf[1]
@ -1270,6 +1286,12 @@ func computeChunkEndTime(start, cur, max int64) int64 {
func (s *memSeries) iterator(id int) chunks.Iterator { func (s *memSeries) iterator(id int) chunks.Iterator {
c := s.chunk(id) c := s.chunk(id)
// TODO(fabxc): Work around! A querier may have retrieved a pointer to a series' chunk,
// which got then garbage collected before it got accessed.
// We must ensure to not garbage collect as long as any readers still hold a reference.
if c == nil {
return chunks.NewNopIterator()
}
if id-s.firstChunkID < len(s.chunks)-1 { if id-s.firstChunkID < len(s.chunks)-1 {
return c.chunk.Iterator() return c.chunk.Iterator()

View file

@ -114,10 +114,13 @@ func NewBlockQuerier(b BlockReader, mint, maxt int64) (Querier, error) {
} }
chunkr, err := b.Chunks() chunkr, err := b.Chunks()
if err != nil { if err != nil {
indexr.Close()
return nil, errors.Wrapf(err, "open chunk reader") return nil, errors.Wrapf(err, "open chunk reader")
} }
tombsr, err := b.Tombstones() tombsr, err := b.Tombstones()
if err != nil { if err != nil {
indexr.Close()
chunkr.Close()
return nil, errors.Wrapf(err, "open tombstone reader") return nil, errors.Wrapf(err, "open tombstone reader")
} }
return &blockQuerier{ return &blockQuerier{
@ -532,7 +535,6 @@ func (s *populatedChunkSeries) Next() bool {
return false return false
} }
} }
if len(chks) == 0 { if len(chks) == 0 {
continue continue
} }

View file

@ -1,18 +0,0 @@
package tsdb
import (
"io"
"text/tabwriter"
)
const (
minwidth = 0
tabwidth = 0
padding = 2
padchar = ' '
flags = 0
)
func GetNewTabWriter(output io.Writer) *tabwriter.Writer {
return tabwriter.NewWriter(output, minwidth, tabwidth, padding, padchar, flags)
}

View file

@ -222,12 +222,21 @@ func OpenSegmentWAL(dir string, logger log.Logger, flushInterval time.Duration,
if err != nil { if err != nil {
return nil, err return nil, err
} }
for _, fn := range fns {
for i, fn := range fns {
f, err := w.openSegmentFile(fn) f, err := w.openSegmentFile(fn)
if err != nil { if err == nil {
return nil, err w.files = append(w.files, newSegmentFile(f))
continue
} }
w.files = append(w.files, newSegmentFile(f)) level.Warn(logger).Log("msg", "invalid segment file detected, truncating WAL", "err", err, "file", fn)
for _, fn := range fns[i:] {
if err := os.Remove(fn); err != nil {
return w, errors.Wrap(err, "removing segment failed")
}
}
break
} }
go w.run(flushInterval) go w.run(flushInterval)

20
vendor/vendor.json vendored
View file

@ -843,28 +843,28 @@
"revisionTime": "2016-04-11T19:08:41Z" "revisionTime": "2016-04-11T19:08:41Z"
}, },
{ {
"checksumSHA1": "h3i8+wLSIqLvWBWjNPcARM0IQik=", "checksumSHA1": "qbhdcw451oyIWXj+0zlkR+rDi9Y=",
"path": "github.com/prometheus/tsdb", "path": "github.com/prometheus/tsdb",
"revision": "7f8fa07cf7ee8ebde7bdb9ed084f7931c7c0e579", "revision": "5d28c849c7ff3b43e2829a44a9aac16468e076ce",
"revisionTime": "2017-10-12T13:27:08Z" "revisionTime": "2017-10-25T14:52:11Z"
}, },
{ {
"checksumSHA1": "Gua979gmISm4cJP/fR2hL8m5To8=", "checksumSHA1": "uy6ySJ6EZqof+yMD2wTkYob8BeU=",
"path": "github.com/prometheus/tsdb/chunks", "path": "github.com/prometheus/tsdb/chunks",
"revision": "7f8fa07cf7ee8ebde7bdb9ed084f7931c7c0e579", "revision": "5d28c849c7ff3b43e2829a44a9aac16468e076ce",
"revisionTime": "2017-10-12T13:27:08Z" "revisionTime": "2017-10-25T14:52:11Z"
}, },
{ {
"checksumSHA1": "7RhNAVcmDmLFqn9nWiudT0B76f8=", "checksumSHA1": "7RhNAVcmDmLFqn9nWiudT0B76f8=",
"path": "github.com/prometheus/tsdb/fileutil", "path": "github.com/prometheus/tsdb/fileutil",
"revision": "7f8fa07cf7ee8ebde7bdb9ed084f7931c7c0e579", "revision": "5d28c849c7ff3b43e2829a44a9aac16468e076ce",
"revisionTime": "2017-10-12T13:27:08Z" "revisionTime": "2017-10-25T14:52:11Z"
}, },
{ {
"checksumSHA1": "zhmlvc322RH1L3l9DaA9d/HVVWs=", "checksumSHA1": "zhmlvc322RH1L3l9DaA9d/HVVWs=",
"path": "github.com/prometheus/tsdb/labels", "path": "github.com/prometheus/tsdb/labels",
"revision": "7f8fa07cf7ee8ebde7bdb9ed084f7931c7c0e579", "revision": "5d28c849c7ff3b43e2829a44a9aac16468e076ce",
"revisionTime": "2017-10-12T13:27:08Z" "revisionTime": "2017-10-25T14:52:11Z"
}, },
{ {
"checksumSHA1": "5SYLEhADhdBVZAGPVHWggQl7H8k=", "checksumSHA1": "5SYLEhADhdBVZAGPVHWggQl7H8k=",