diff --git a/.codeclimate.yml b/.codeclimate.yml
deleted file mode 100644
index 6db7af0809..0000000000
--- a/.codeclimate.yml
+++ /dev/null
@@ -1,15 +0,0 @@
-engines:
-  gofmt:
-    enabled: true
-  golint:
-    enabled: true
-  govet:
-    enabled: true
-ratings:
-  paths:
-  - "**.go"
-exclude_paths:
-- "/storage/remote/remote.pb.go"
-- vendor/
-- web/ui/static/vendor/
-- "/web/ui/bindata.go"
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 6233359d6a..3a93187274 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,11 +1,18 @@
-## 2.0.0-rc.1 / 2017-10-17
+## 2.0.0-rc.2 / 2017-10-25
+
+* [ENHANCEMENT] Handle WAL segments with corrupted header gracefully
+* [ENHANCEMENT] Stabilize memory usage during WAL replay
+* [CHANGE] Prefix all storage metrics with `prometheus_`
+* [BUGFIX] Correctly handle label removal in remote read
+* [BUGFIX] Fix chunk misalignment causing out-of-order samples
+* [BUGFIX] Fix connection leak in Consul SD
+* [BUGFIX] Handle invalid chunk dereferences gracefully
+* [BUGFIX] Prevent potential deadlock during failing querier construction
+
+Data written in previous pre-release versions may have been affected by the out-of-order
+bug. Reading this data may reveal artefacts and incorrect data.
+Starting with a clean storage directory is advised. The WAL directory may safely be kept.
 
-* [FEATURE] Added a warning for time-drift between the browser and the prometheus-server.
-* [ENHANCEMENT] Much faster WAL read-back on restart.
-* [BUGFIX] Fixed Remote-read to not drop the first series.
-* [BUGFIX] Validate recording-rule names.
-* [BUGFIX] Fix several races.
-* [BUGFIX] Only close blocks if there are no iterators accessing it.
 
 ## 1.8.0 / 2017-10-06
 
diff --git a/VERSION b/VERSION
index 97041a78cf..c8f3a156f7 100644
--- a/VERSION
+++ b/VERSION
@@ -1 +1 @@
-2.0.0-rc.1
+2.0.0-rc.2
diff --git a/storage/tsdb/tsdb.go b/storage/tsdb/tsdb.go
index c7095e5cdd..72527c540c 100644
--- a/storage/tsdb/tsdb.go
+++ b/storage/tsdb/tsdb.go
@@ -126,6 +126,10 @@ type Options struct {
 
 // Open returns a new storage backed by a TSDB database that is configured for Prometheus.
 func Open(path string, l log.Logger, r prometheus.Registerer, opts *Options) (*tsdb.DB, error) {
+	if opts.MinBlockDuration > opts.MaxBlockDuration {
+		return nil, errors.Errorf("tsdb max block duration (%v) must be larger than min block duration (%v)",
+			opts.MaxBlockDuration, opts.MinBlockDuration)
+	}
 	// Start with smallest block duration and create exponential buckets until the exceed the
 	// configured maximum block duration.
 	rngs := tsdb.ExponentialBlockRanges(int64(time.Duration(opts.MinBlockDuration).Seconds()*1000), 10, 3)
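The new guard in Open rejects configurations where the minimum block duration exceeds the maximum, since the exponential bucket generation below it only makes sense for a non-empty range. The sketch below illustrates the intent; exponentialRanges and the filtering against the maximum are assumptions based on the comment and the (10, 3) arguments, not the actual tsdb.ExponentialBlockRanges implementation.

package main

import (
	"fmt"
	"time"
)

// exponentialRanges is a hypothetical stand-in for tsdb.ExponentialBlockRanges:
// starting at minSize (milliseconds), each step multiplies the previous range
// by stepSize, for at most steps entries.
func exponentialRanges(minSize int64, steps, stepSize int) []int64 {
	rngs := make([]int64, 0, steps)
	cur := minSize
	for i := 0; i < steps; i++ {
		rngs = append(rngs, cur)
		cur *= int64(stepSize)
	}
	return rngs
}

func main() {
	minMs := int64(2 * time.Hour / time.Millisecond)  // MinBlockDuration: 2h
	maxMs := int64(36 * time.Hour / time.Millisecond) // MaxBlockDuration: 36h

	// Keep only ranges up to the configured maximum (assumed filtering step).
	var kept []time.Duration
	for _, r := range exponentialRanges(minMs, 10, 3) {
		if r > maxMs {
			break
		}
		kept = append(kept, time.Duration(r)*time.Millisecond)
	}
	fmt.Println(kept) // [2h0m0s 6h0m0s 18h0m0s]
}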
diff --git a/vendor/github.com/prometheus/tsdb/chunks/chunk.go b/vendor/github.com/prometheus/tsdb/chunks/chunk.go
index 181693fae1..4d298041e1 100644
--- a/vendor/github.com/prometheus/tsdb/chunks/chunk.go
+++ b/vendor/github.com/prometheus/tsdb/chunks/chunk.go
@@ -69,6 +69,17 @@ type Iterator interface {
 	Next() bool
 }
 
+// NewNopIterator returns a new chunk iterator that does not hold any data.
+func NewNopIterator() Iterator {
+	return nopIterator{}
+}
+
+type nopIterator struct{}
+
+func (nopIterator) At() (int64, float64) { return 0, 0 }
+func (nopIterator) Next() bool           { return false }
+func (nopIterator) Err() error           { return nil }
+
 type Pool interface {
 	Put(Chunk) error
 	Get(e Encoding, b []byte) (Chunk, error)
diff --git a/vendor/github.com/prometheus/tsdb/compact.go b/vendor/github.com/prometheus/tsdb/compact.go
index 5b66082aab..a70918a288 100644
--- a/vendor/github.com/prometheus/tsdb/compact.go
+++ b/vendor/github.com/prometheus/tsdb/compact.go
@@ -81,30 +81,30 @@ func newCompactorMetrics(r prometheus.Registerer) *compactorMetrics {
 	m := &compactorMetrics{}
 
 	m.ran = prometheus.NewCounter(prometheus.CounterOpts{
-		Name: "tsdb_compactions_total",
+		Name: "prometheus_tsdb_compactions_total",
 		Help: "Total number of compactions that were executed for the partition.",
 	})
 	m.failed = prometheus.NewCounter(prometheus.CounterOpts{
-		Name: "tsdb_compactions_failed_total",
+		Name: "prometheus_tsdb_compactions_failed_total",
 		Help: "Total number of compactions that failed for the partition.",
 	})
 	m.duration = prometheus.NewHistogram(prometheus.HistogramOpts{
-		Name:    "tsdb_compaction_duration",
+		Name:    "prometheus_tsdb_compaction_duration",
 		Help:    "Duration of compaction runs.",
 		Buckets: prometheus.ExponentialBuckets(1, 2, 10),
 	})
 	m.chunkSize = prometheus.NewHistogram(prometheus.HistogramOpts{
-		Name:    "tsdb_compaction_chunk_size",
+		Name:    "prometheus_tsdb_compaction_chunk_size",
 		Help:    "Final size of chunks on their first compaction",
 		Buckets: prometheus.ExponentialBuckets(32, 1.5, 12),
 	})
 	m.chunkSamples = prometheus.NewHistogram(prometheus.HistogramOpts{
-		Name:    "tsdb_compaction_chunk_samples",
+		Name:    "prometheus_tsdb_compaction_chunk_samples",
 		Help:    "Final number of samples on their first compaction",
 		Buckets: prometheus.ExponentialBuckets(4, 1.5, 12),
 	})
 	m.chunkRange = prometheus.NewHistogram(prometheus.HistogramOpts{
-		Name:    "tsdb_compaction_chunk_range",
+		Name:    "prometheus_tsdb_compaction_chunk_range",
 		Help:    "Final time range of chunks on their first compaction",
 		Buckets: prometheus.ExponentialBuckets(100, 4, 10),
 	})
diff --git a/vendor/github.com/prometheus/tsdb/db.go b/vendor/github.com/prometheus/tsdb/db.go
index ff1763762b..9d472e91a7 100644
--- a/vendor/github.com/prometheus/tsdb/db.go
+++ b/vendor/github.com/prometheus/tsdb/db.go
@@ -129,7 +129,7 @@ func newDBMetrics(db *DB, r prometheus.Registerer) *dbMetrics {
 	m := &dbMetrics{}
 
 	m.loadedBlocks = prometheus.NewGaugeFunc(prometheus.GaugeOpts{
-		Name: "tsdb_blocks_loaded",
+		Name: "prometheus_tsdb_blocks_loaded",
 		Help: "Number of currently loaded data blocks",
 	}, func() float64 {
 		db.mtx.RLock()
@@ -137,15 +137,15 @@ func newDBMetrics(db *DB, r prometheus.Registerer) *dbMetrics {
 		return float64(len(db.blocks))
 	})
 	m.reloads = prometheus.NewCounter(prometheus.CounterOpts{
-		Name: "tsdb_reloads_total",
+		Name: "prometheus_tsdb_reloads_total",
 		Help: "Number of times the database reloaded block data from disk.",
 	})
 	m.reloadsFailed = prometheus.NewCounter(prometheus.CounterOpts{
-		Name: "tsdb_reloads_failures_total",
+		Name: "prometheus_tsdb_reloads_failures_total",
 		Help: "Number of times the database failed to reload black data from disk.",
 	})
 	m.compactionsTriggered = prometheus.NewCounter(prometheus.CounterOpts{
-		Name: "tsdb_compactions_triggered_total",
+		Name: "prometheus_tsdb_compactions_triggered_total",
 		Help: "Total number of triggered compactions for the partition.",
 	})
 
@@ -448,9 +448,6 @@ func (db *DB) reload() (err error) {
 		db.metrics.reloads.Inc()
 	}()
 
-	var cs []io.Closer
-	defer func() { closeAll(cs...) }()
-
 	dirs, err := blockDirs(db.dir)
 	if err != nil {
 		return errors.Wrap(err, "find blocks")
@@ -482,25 +479,25 @@ func (db *DB) reload() (err error) {
 		return errors.Wrap(err, "invalid block sequence")
 	}
 
-	// Close all opened blocks that no longer exist after we returned all locks.
-	// TODO(fabxc: probably races with querier still reading from them. Can
-	// we just abandon them and have the open FDs be GC'd automatically eventually?
-	for _, b := range db.blocks {
-		if _, ok := exist[b.Meta().ULID]; !ok {
-			cs = append(cs, b)
-		}
-	}
-
+	// Swap in new blocks first for subsequently created readers to be seen.
+	// Then close previous blocks, which may block for pending readers to complete.
 	db.mtx.Lock()
+	oldBlocks := db.blocks
 	db.blocks = blocks
 	db.mtx.Unlock()
 
+	for _, b := range oldBlocks {
+		if _, ok := exist[b.Meta().ULID]; !ok {
+			b.Close()
+		}
+	}
+
 	// Garbage collect data in the head if the most recent persisted block
 	// covers data of its current time range.
 	if len(blocks) == 0 {
 		return nil
 	}
-	maxt := blocks[len(db.blocks)-1].Meta().MaxTime
+	maxt := blocks[len(blocks)-1].Meta().MaxTime
 
 	return errors.Wrap(db.head.Truncate(maxt), "head truncate failed")
 }
@@ -593,7 +590,10 @@ func (db *DB) Snapshot(dir string) error {
 	db.cmtx.Lock()
 	defer db.cmtx.Unlock()
 
-	for _, b := range db.Blocks() {
+	db.mtx.RLock()
+	defer db.mtx.RUnlock()
+
+	for _, b := range db.blocks {
 		level.Info(db.logger).Log("msg", "snapshotting block", "block", b)
 
 		if err := b.Snapshot(dir); err != nil {
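The reload change publishes the new block set under the lock before closing the blocks that dropped out, so newly created readers immediately see the new set while Close can wait for existing readers to finish. A minimal sketch of that swap-then-close pattern, using simplified stand-in types rather than the real tsdb structs:

package main

import (
	"fmt"
	"sync"
)

// Block is a simplified stand-in for a tsdb block; only what the sketch needs.
type Block struct{ ULID string }

func (b *Block) Close() error {
	fmt.Println("closed block", b.ULID)
	return nil
}

type DB struct {
	mtx    sync.RWMutex
	blocks []*Block
}

// swapBlocks installs the freshly loaded block set and only afterwards closes
// blocks that are no longer part of it. New readers see the new set right away,
// while readers holding the old slice can finish before their blocks are closed.
func (db *DB) swapBlocks(newBlocks []*Block, exist map[string]struct{}) {
	db.mtx.Lock()
	oldBlocks := db.blocks
	db.blocks = newBlocks
	db.mtx.Unlock()

	for _, b := range oldBlocks {
		if _, ok := exist[b.ULID]; !ok {
			b.Close()
		}
	}
}

func main() {
	db := &DB{blocks: []*Block{{ULID: "old"}, {ULID: "kept"}}}
	db.swapBlocks(
		[]*Block{{ULID: "kept"}, {ULID: "new"}},
		map[string]struct{}{"kept": {}, "new": {}},
	)
	// Output: closed block old
}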
@@ -608,7 +608,10 @@ func (db *DB) Snapshot(dir string) error {
 func (db *DB) Querier(mint, maxt int64) (Querier, error) {
 	var blocks []BlockReader
 
-	for _, b := range db.Blocks() {
+	db.mtx.RLock()
+	defer db.mtx.RUnlock()
+
+	for _, b := range db.blocks {
 		m := b.Meta()
 		if intervalOverlap(mint, maxt, m.MinTime, m.MaxTime) {
 			blocks = append(blocks, b)
@@ -623,10 +626,15 @@ func (db *DB) Querier(mint, maxt int64) (Querier, error) {
 	}
 	for _, b := range blocks {
 		q, err := NewBlockQuerier(b, mint, maxt)
-		if err != nil {
-			return nil, errors.Wrapf(err, "open querier for block %s", b)
+		if err == nil {
+			sq.blocks = append(sq.blocks, q)
+			continue
 		}
-		sq.blocks = append(sq.blocks, q)
+		// If we fail, all previously opened queriers must be closed.
+		for _, q := range sq.blocks {
+			q.Close()
+		}
+		return nil, errors.Wrapf(err, "open querier for block %s", b)
 	}
 	return sq, nil
 }
@@ -643,7 +651,10 @@ func (db *DB) Delete(mint, maxt int64, ms ...labels.Matcher) error {
 
 	var g errgroup.Group
 
-	for _, b := range db.Blocks() {
+	db.mtx.RLock()
+	defer db.mtx.RUnlock()
+
+	for _, b := range db.blocks {
 		m := b.Meta()
 		if intervalOverlap(mint, maxt, m.MinTime, m.MaxTime) {
 			g.Go(func(b *Block) func() error {
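The Querier change corresponds to the changelog entry about deadlocks during failing querier construction: when opening a per-block querier fails, every querier opened so far is closed before the error is returned. The generic sketch below shows the same close-what-you-opened pattern with a stand-in opener instead of NewBlockQuerier; it is an illustration, not tsdb code.

package main

import (
	"errors"
	"fmt"
	"io"
)

type resource struct{ name string }

func (r *resource) Close() error {
	fmt.Println("closed", r.name)
	return nil
}

// openAll opens one resource per name. If any open fails, everything opened so
// far is closed before the error is returned, mirroring the Querier fix.
func openAll(names []string, open func(string) (io.Closer, error)) ([]io.Closer, error) {
	var out []io.Closer
	for _, name := range names {
		c, err := open(name)
		if err == nil {
			out = append(out, c)
			continue
		}
		for _, o := range out {
			o.Close()
		}
		return nil, fmt.Errorf("open %s: %v", name, err)
	}
	return out, nil
}

func main() {
	open := func(name string) (io.Closer, error) {
		if name == "bad" {
			return nil, errors.New("corrupted block")
		}
		return &resource{name: name}, nil
	}
	if _, err := openAll([]string{"a", "b", "bad"}, open); err != nil {
		fmt.Println("error:", err) // "a" and "b" are closed before this prints
	}
}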
diff --git a/vendor/github.com/prometheus/tsdb/head.go b/vendor/github.com/prometheus/tsdb/head.go
index 37126363dc..92363659f8 100644
--- a/vendor/github.com/prometheus/tsdb/head.go
+++ b/vendor/github.com/prometheus/tsdb/head.go
@@ -89,59 +89,59 @@ func newHeadMetrics(h *Head, r prometheus.Registerer) *headMetrics {
 	m := &headMetrics{}
 
 	m.activeAppenders = prometheus.NewGauge(prometheus.GaugeOpts{
-		Name: "tsdb_head_active_appenders",
+		Name: "prometheus_tsdb_head_active_appenders",
 		Help: "Number of currently active appender transactions",
 	})
 	m.series = prometheus.NewGauge(prometheus.GaugeOpts{
-		Name: "tsdb_head_series",
+		Name: "prometheus_tsdb_head_series",
 		Help: "Total number of series in the head block.",
 	})
 	m.seriesCreated = prometheus.NewGauge(prometheus.GaugeOpts{
-		Name: "tsdb_head_series_created_total",
+		Name: "prometheus_tsdb_head_series_created_total",
 		Help: "Total number of series created in the head",
 	})
 	m.seriesRemoved = prometheus.NewGauge(prometheus.GaugeOpts{
-		Name: "tsdb_head_series_removed_total",
+		Name: "prometheus_tsdb_head_series_removed_total",
 		Help: "Total number of series removed in the head",
 	})
 	m.seriesNotFound = prometheus.NewCounter(prometheus.CounterOpts{
-		Name: "tsdb_head_series_not_found",
+		Name: "prometheus_tsdb_head_series_not_found",
 		Help: "Total number of requests for series that were not found.",
 	})
 	m.chunks = prometheus.NewGauge(prometheus.GaugeOpts{
-		Name: "tsdb_head_chunks",
+		Name: "prometheus_tsdb_head_chunks",
 		Help: "Total number of chunks in the head block.",
 	})
 	m.chunksCreated = prometheus.NewGauge(prometheus.GaugeOpts{
-		Name: "tsdb_head_chunks_created_total",
+		Name: "prometheus_tsdb_head_chunks_created_total",
 		Help: "Total number of chunks created in the head",
 	})
 	m.chunksRemoved = prometheus.NewGauge(prometheus.GaugeOpts{
-		Name: "tsdb_head_chunks_removed_total",
+		Name: "prometheus_tsdb_head_chunks_removed_total",
 		Help: "Total number of chunks removed in the head",
 	})
 	m.gcDuration = prometheus.NewSummary(prometheus.SummaryOpts{
-		Name: "tsdb_head_gc_duration_seconds",
+		Name: "prometheus_tsdb_head_gc_duration_seconds",
 		Help: "Runtime of garbage collection in the head block.",
 	})
 	m.maxTime = prometheus.NewGaugeFunc(prometheus.GaugeOpts{
-		Name: "tsdb_head_max_time",
+		Name: "prometheus_tsdb_head_max_time",
 		Help: "Maximum timestamp of the head block.",
 	}, func() float64 {
 		return float64(h.MaxTime())
 	})
 	m.minTime = prometheus.NewGaugeFunc(prometheus.GaugeOpts{
-		Name: "tsdb_head_min_time",
+		Name: "prometheus_tsdb_head_min_time",
 		Help: "Minimum time bound of the head block.",
 	}, func() float64 {
 		return float64(h.MinTime())
 	})
 	m.walTruncateDuration = prometheus.NewSummary(prometheus.SummaryOpts{
-		Name: "tsdb_wal_truncate_duration_seconds",
+		Name: "prometheus_tsdb_wal_truncate_duration_seconds",
 		Help: "Duration of WAL truncation.",
 	})
 	m.samplesAppended = prometheus.NewCounter(prometheus.CounterOpts{
-		Name: "tsdb_head_samples_appended_total",
+		Name: "prometheus_tsdb_head_samples_appended_total",
 		Help: "Total number of appended sampledb.",
 	})
@@ -273,13 +273,23 @@ func (h *Head) ReadWAL() error {
 		}
 	}
 	samplesFunc := func(samples []RefSample) {
-		var buf []RefSample
-		select {
-		case buf = <-input:
-		default:
-			buf = make([]RefSample, 0, len(samples)*11/10)
+		// We split up the samples into chunks of 5000 samples or less.
+		// With O(300 * #cores) in-flight sample batches, large scrapes could otherwise
+		// cause thousands of very large in flight buffers occupying large amounts
+		// of unused memory.
+		for len(samples) > 0 {
+			n := 5000
+			if len(samples) < n {
+				n = len(samples)
+			}
+			var buf []RefSample
+			select {
+			case buf = <-input:
+			default:
+			}
+			firstInput <- append(buf[:0], samples[:n]...)
+			samples = samples[n:]
 		}
-		firstInput <- append(buf[:0], samples...)
 	}
 	deletesFunc := func(stones []Stone) {
 		for _, s := range stones {
@@ -665,7 +675,7 @@ func (h *Head) gc() {
 	// Rebuild symbols and label value indices from what is left in the postings terms.
 	h.postings.mtx.RLock()
 
-	symbols := make(map[string]struct{}, len(h.symbols))
+	symbols := make(map[string]struct{})
 	values := make(map[string]stringset, len(h.values))
 
 	for t := range h.postings.m {
@@ -1152,6 +1162,10 @@ func (s *memSeries) cut(mint int64) *memChunk {
 	}
 	s.chunks = append(s.chunks, c)
 
+	// Set upper bound on when the next chunk must be started. An earlier timestamp
+	// may be chosen dynamically at a later point.
+	_, s.nextAt = rangeForTimestamp(mint, s.chunkRange)
+
 	app, err := c.chunk.Appender()
 	if err != nil {
 		panic(err)
@@ -1231,10 +1245,17 @@ func (s *memSeries) append(t int64, v float64) (success, chunkCreated bool) {
 	}
 	numSamples := c.chunk.NumSamples()
 
+	// Out of order sample.
 	if c.maxTime >= t {
 		return false, chunkCreated
 	}
-	if numSamples > samplesPerChunk/4 && t >= s.nextAt {
+	// If we reach 25% of a chunk's desired sample count, set a definitive time
+	// at which to start the next chunk.
+	// At latest it must happen at the timestamp set when the chunk was cut.
+	if numSamples == samplesPerChunk/4 {
+		s.nextAt = computeChunkEndTime(c.minTime, c.maxTime, s.nextAt)
+	}
+	if t >= s.nextAt {
 		c = s.cut(t)
 		chunkCreated = true
 	}
@@ -1242,11 +1263,6 @@ func (s *memSeries) append(t int64, v float64) (success, chunkCreated bool) {
 
 	c.maxTime = t
 
-	if numSamples == samplesPerChunk/4 {
-		_, maxt := rangeForTimestamp(c.minTime, s.chunkRange)
-		s.nextAt = computeChunkEndTime(c.minTime, c.maxTime, maxt)
-	}
-
 	s.lastValue = v
 
 	s.sampleBuf[0] = s.sampleBuf[1]
@@ -1270,6 +1286,12 @@ func computeChunkEndTime(start, cur, max int64) int64 {
 
 func (s *memSeries) iterator(id int) chunks.Iterator {
 	c := s.chunk(id)
 
+	// TODO(fabxc): Work around! A querier may have retrieved a pointer to a series' chunk,
+	// which got then garbage collected before it got accessed.
+	// We must ensure to not garbage collect as long as any readers still hold a reference.
+	if c == nil {
+		return chunks.NewNopIterator()
+	}
 	if id-s.firstChunkID < len(s.chunks)-1 {
 		return c.chunk.Iterator()
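The ReadWAL change above explains why decoded samples are now forwarded in batches of at most 5000: with O(300 * #cores) batches in flight, large scrapes would otherwise keep many oversized buffers alive. Below is a standalone sketch of that batching-with-buffer-reuse idea; the channel names and the demo in main are simplified assumptions, not the real input/firstInput plumbing.

package main

import "fmt"

// RefSample mirrors a WAL sample record: series reference, timestamp, value.
type RefSample struct {
	Ref uint64
	T   int64
	V   float64
}

// sendInBatches forwards samples in batches of at most batchSize, reusing a
// previously consumed buffer from recycle when one is available. Memory use
// stays proportional to the batch size instead of the largest decoded record.
func sendInBatches(samples []RefSample, batchSize int, out chan<- []RefSample, recycle <-chan []RefSample) {
	for len(samples) > 0 {
		n := batchSize
		if len(samples) < n {
			n = len(samples)
		}
		var buf []RefSample
		select {
		case buf = <-recycle: // reuse a buffer the consumer handed back
		default: // none available; append below allocates a fresh one
		}
		out <- append(buf[:0], samples[:n]...)
		samples = samples[n:]
	}
}

func main() {
	out := make(chan []RefSample, 4)
	recycle := make(chan []RefSample, 4)

	go func() {
		sendInBatches(make([]RefSample, 12000), 5000, out, recycle)
		close(out)
	}()
	for batch := range out {
		fmt.Println("batch of", len(batch)) // 5000, 5000, 2000
		recycle <- batch                    // hand the buffer back for reuse
	}
}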
diff --git a/vendor/github.com/prometheus/tsdb/querier.go b/vendor/github.com/prometheus/tsdb/querier.go
index 12e05e79df..ed8b64ceac 100644
--- a/vendor/github.com/prometheus/tsdb/querier.go
+++ b/vendor/github.com/prometheus/tsdb/querier.go
@@ -114,10 +114,13 @@ func NewBlockQuerier(b BlockReader, mint, maxt int64) (Querier, error) {
 	}
 	chunkr, err := b.Chunks()
 	if err != nil {
+		indexr.Close()
 		return nil, errors.Wrapf(err, "open chunk reader")
 	}
 	tombsr, err := b.Tombstones()
 	if err != nil {
+		indexr.Close()
+		chunkr.Close()
 		return nil, errors.Wrapf(err, "open tombstone reader")
 	}
 	return &blockQuerier{
@@ -532,7 +535,6 @@ func (s *populatedChunkSeries) Next() bool {
 				return false
 			}
 		}
-
 		if len(chks) == 0 {
 			continue
 		}
diff --git a/vendor/github.com/prometheus/tsdb/tabwriter.go b/vendor/github.com/prometheus/tsdb/tabwriter.go
deleted file mode 100644
index 8e84a9c67a..0000000000
--- a/vendor/github.com/prometheus/tsdb/tabwriter.go
+++ /dev/null
@@ -1,18 +0,0 @@
-package tsdb
-
-import (
-	"io"
-	"text/tabwriter"
-)
-
-const (
-	minwidth = 0
-	tabwidth = 0
-	padding  = 2
-	padchar  = ' '
-	flags    = 0
-)
-
-func GetNewTabWriter(output io.Writer) *tabwriter.Writer {
-	return tabwriter.NewWriter(output, minwidth, tabwidth, padding, padchar, flags)
-}
diff --git a/vendor/github.com/prometheus/tsdb/wal.go b/vendor/github.com/prometheus/tsdb/wal.go
index 5c6e78d2eb..225851de12 100644
--- a/vendor/github.com/prometheus/tsdb/wal.go
+++ b/vendor/github.com/prometheus/tsdb/wal.go
@@ -222,12 +222,21 @@ func OpenSegmentWAL(dir string, logger log.Logger, flushInterval time.Duration,
 	if err != nil {
 		return nil, err
 	}
-	for _, fn := range fns {
+
+	for i, fn := range fns {
 		f, err := w.openSegmentFile(fn)
-		if err != nil {
-			return nil, err
+		if err == nil {
+			w.files = append(w.files, newSegmentFile(f))
+			continue
 		}
-		w.files = append(w.files, newSegmentFile(f))
+		level.Warn(logger).Log("msg", "invalid segment file detected, truncating WAL", "err", err, "file", fn)
+
+		for _, fn := range fns[i:] {
+			if err := os.Remove(fn); err != nil {
+				return w, errors.Wrap(err, "removing segment failed")
+			}
+		}
+		break
 	}
 
 	go w.run(flushInterval)
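The OpenSegmentWAL change implements the "Handle WAL segments with corrupted header gracefully" changelog item: the first segment that fails to open is logged with a warning, and it plus every later segment is removed, presumably because data after the corruption can no longer be replayed safely. A sketch of just that decision logic, with openSegment and remove as stand-ins for the real file operations:

package main

import (
	"errors"
	"fmt"
)

// truncateFrom keeps segments up to the first one that fails to open; that
// segment and all later ones are removed, mirroring the OpenSegmentWAL change.
func truncateFrom(fns []string, openSegment func(string) error, remove func(string) error) ([]string, error) {
	var kept []string
	for i, fn := range fns {
		if err := openSegment(fn); err == nil {
			kept = append(kept, fn)
			continue
		}
		fmt.Println("invalid segment file detected, truncating WAL at", fn)
		for _, rm := range fns[i:] {
			if err := remove(rm); err != nil {
				return kept, errors.New("removing segment failed: " + err.Error())
			}
		}
		break
	}
	return kept, nil
}

func main() {
	segments := []string{"000001", "000002", "000003", "000004"}
	openSegment := func(fn string) error {
		if fn == "000003" {
			return errors.New("invalid segment header")
		}
		return nil
	}
	remove := func(fn string) error {
		fmt.Println("removed", fn)
		return nil
	}

	kept, _ := truncateFrom(segments, openSegment, remove)
	fmt.Println("kept:", kept) // kept: [000001 000002]
}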
- "revision": "7f8fa07cf7ee8ebde7bdb9ed084f7931c7c0e579", - "revisionTime": "2017-10-12T13:27:08Z" + "revision": "5d28c849c7ff3b43e2829a44a9aac16468e076ce", + "revisionTime": "2017-10-25T14:52:11Z" }, { "checksumSHA1": "5SYLEhADhdBVZAGPVHWggQl7H8k=",