vendor: update prometheus/tsdb (#4631)

Signed-off-by: Goutham Veeramachaneni <gouthamve@gmail.com>
Authored by Goutham Veeramachaneni on 2018-09-19 14:35:33 +05:30; committed by GitHub
parent 2d7f562ed6
commit 5a24560d2b
7 changed files with 119 additions and 60 deletions

vendor/github.com/prometheus/tsdb/block.go

@@ -15,6 +15,7 @@
 package tsdb

 import (
+	"encoding/binary"
 	"encoding/json"
 	"io/ioutil"
 	"os"
@@ -248,6 +249,10 @@ type Block struct {
 	dir  string
 	meta BlockMeta

+	// Symbol Table Size in bytes.
+	// We maintain this variable to avoid recalculation every time.
+	symbolTableSize uint64
+
 	chunkr     ChunkReader
 	indexr     IndexReader
 	tombstones TombstoneReader
@@ -275,12 +280,23 @@ func OpenBlock(dir string, pool chunkenc.Pool) (*Block, error) {
 		return nil, err
 	}

+	// Calculating symbol table size.
+	tmp := make([]byte, 8)
+	symTblSize := uint64(0)
+	for _, v := range ir.SymbolTable() {
+		// Size of varint length of the symbol.
+		symTblSize += uint64(binary.PutUvarint(tmp, uint64(len(v))))
+		// Size of the symbol.
+		symTblSize += uint64(len(v))
+	}
+
 	pb := &Block{
 		dir:             dir,
 		meta:            *meta,
 		chunkr:          cr,
 		indexr:          ir,
 		tombstones:      tr,
+		symbolTableSize: symTblSize,
 	}
 	return pb, nil
 }
@@ -350,6 +366,11 @@ func (pb *Block) Tombstones() (TombstoneReader, error) {
 	return blockTombstoneReader{TombstoneReader: pb.tombstones, b: pb}, nil
 }

+// GetSymbolTableSize returns the Symbol Table Size in the index of this block.
+func (pb *Block) GetSymbolTableSize() uint64 {
+	return pb.symbolTableSize
+}
+
 func (pb *Block) setCompactionFailed() error {
 	pb.meta.Compaction.Failed = true
 	return writeMetaFile(pb.dir, &pb.meta)
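
The accounting above matches the index symbol table encoding: each symbol is stored as a uvarint length prefix followed by its raw bytes. A minimal standalone sketch of the same arithmetic, assuming the symbols arrive as a plain string slice:

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// symbolTableSize reproduces the accounting from OpenBlock above: each
// symbol costs a uvarint-encoded length prefix plus the symbol bytes.
func symbolTableSize(symbols []string) uint64 {
	buf := make([]byte, binary.MaxVarintLen64)
	var size uint64
	for _, s := range symbols {
		size += uint64(binary.PutUvarint(buf, uint64(len(s))))
		size += uint64(len(s))
	}
	return size
}

func main() {
	// "job" costs 1 length byte + 3 symbol bytes, "instance" costs 1 + 8.
	fmt.Println(symbolTableSize([]string{"job", "instance"})) // 13
}
```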

vendor/github.com/prometheus/tsdb/compact.go

@@ -97,7 +97,7 @@ func newCompactorMetrics(r prometheus.Registerer) *compactorMetrics {
 		Buckets: prometheus.ExponentialBuckets(1, 2, 10),
 	})
 	m.chunkSize = prometheus.NewHistogram(prometheus.HistogramOpts{
-		Name:    "prometheus_tsdb_compaction_chunk_size",
+		Name:    "prometheus_tsdb_compaction_chunk_size_bytes",
 		Help:    "Final size of chunks on their first compaction",
 		Buckets: prometheus.ExponentialBuckets(32, 1.5, 12),
 	})
@@ -107,7 +107,7 @@ func newCompactorMetrics(r prometheus.Registerer) *compactorMetrics {
 		Buckets: prometheus.ExponentialBuckets(4, 1.5, 12),
 	})
 	m.chunkRange = prometheus.NewHistogram(prometheus.HistogramOpts{
-		Name:    "prometheus_tsdb_compaction_chunk_range",
+		Name:    "prometheus_tsdb_compaction_chunk_range_seconds",
 		Help:    "Final time range of chunks on their first compaction",
 		Buckets: prometheus.ExponentialBuckets(100, 4, 10),
 	})
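
The two renames bring the metric names in line with the Prometheus convention of suffixing the base unit (`_bytes`, `_seconds`). The bucket layouts are unchanged; a short sketch of what `ExponentialBuckets(start, factor, count)` from client_golang produces for these histograms:

```go
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

func main() {
	// Upper bounds grow geometrically: start, start*factor, start*factor^2, ...
	// Chunk sizes: 32, 48, 72, 108, 162, ... bytes (12 buckets).
	fmt.Println(prometheus.ExponentialBuckets(32, 1.5, 12))
	// Chunk ranges: 100, 400, 1600, ... (10 buckets).
	fmt.Println(prometheus.ExponentialBuckets(100, 4, 10))
}
```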

vendor/github.com/prometheus/tsdb/db.go

@@ -119,11 +119,13 @@ type DB struct {

 type dbMetrics struct {
 	loadedBlocks         prometheus.GaugeFunc
+	symbolTableSize      prometheus.GaugeFunc
 	reloads              prometheus.Counter
 	reloadsFailed        prometheus.Counter
 	compactionsTriggered prometheus.Counter
 	cutoffs              prometheus.Counter
 	cutoffsFailed        prometheus.Counter
+	startTime            prometheus.GaugeFunc
 	tombCleanTimer       prometheus.Histogram
 }
@@ -138,6 +140,19 @@ func newDBMetrics(db *DB, r prometheus.Registerer) *dbMetrics {
 		defer db.mtx.RUnlock()
 		return float64(len(db.blocks))
 	})
+	m.symbolTableSize = prometheus.NewGaugeFunc(prometheus.GaugeOpts{
+		Name: "prometheus_tsdb_symbol_table_size_bytes",
+		Help: "Size of symbol table on disk (in bytes)",
+	}, func() float64 {
+		db.mtx.RLock()
+		blocks := db.blocks[:]
+		db.mtx.RUnlock()
+		symTblSize := uint64(0)
+		for _, b := range blocks {
+			symTblSize += b.GetSymbolTableSize()
+		}
+		return float64(symTblSize)
+	})
 	m.reloads = prometheus.NewCounter(prometheus.CounterOpts{
 		Name: "prometheus_tsdb_reloads_total",
 		Help: "Number of times the database reloaded block data from disk.",
@@ -158,6 +173,17 @@ func newDBMetrics(db *DB, r prometheus.Registerer) *dbMetrics {
 		Name: "prometheus_tsdb_retention_cutoffs_failures_total",
 		Help: "Number of times the database failed to cut off block data from disk.",
 	})
+	m.startTime = prometheus.NewGaugeFunc(prometheus.GaugeOpts{
+		Name: "prometheus_tsdb_lowest_timestamp",
+		Help: "Lowest timestamp value stored in the database.",
+	}, func() float64 {
+		db.mtx.RLock()
+		defer db.mtx.RUnlock()
+		if len(db.blocks) == 0 {
+			return float64(db.head.minTime)
+		}
+		return float64(db.blocks[0].meta.MinTime)
+	})
 	m.tombCleanTimer = prometheus.NewHistogram(prometheus.HistogramOpts{
 		Name: "prometheus_tsdb_tombstone_cleanup_seconds",
 		Help: "The time taken to recompact blocks to remove tombstones.",
@@ -166,11 +192,13 @@ func newDBMetrics(db *DB, r prometheus.Registerer) *dbMetrics {
 	if r != nil {
 		r.MustRegister(
 			m.loadedBlocks,
+			m.symbolTableSize,
 			m.reloads,
 			m.reloadsFailed,
 			m.cutoffs,
 			m.cutoffsFailed,
 			m.compactionsTriggered,
+			m.startTime,
 			m.tombCleanTimer,
 		)
 	}
@@ -192,7 +220,7 @@ func Open(dir string, l log.Logger, r prometheus.Registerer, opts *Options) (db *DB, err error) {
 	if err := repairBadIndexVersion(l, dir); err != nil {
 		return nil, err
 	}
-	// Migrate old WAL.
+	// Migrate old WAL if one exists.
 	if err := MigrateWAL(l, filepath.Join(dir, "wal")); err != nil {
 		return nil, errors.Wrap(err, "migrate WAL")
 	}

vendor/github.com/prometheus/tsdb/head.go

@@ -82,8 +82,8 @@ type headMetrics struct {
 	seriesRemoved  prometheus.Counter
 	seriesNotFound prometheus.Counter
 	chunks         prometheus.Gauge
-	chunksCreated  prometheus.Gauge
-	chunksRemoved  prometheus.Gauge
+	chunksCreated  prometheus.Counter
+	chunksRemoved  prometheus.Counter
 	gcDuration     prometheus.Summary
 	minTime        prometheus.GaugeFunc
 	maxTime        prometheus.GaugeFunc
@@ -102,27 +102,27 @@ func newHeadMetrics(h *Head, r prometheus.Registerer) *headMetrics {
 		Name: "prometheus_tsdb_head_series",
 		Help: "Total number of series in the head block.",
 	})
-	m.seriesCreated = prometheus.NewGauge(prometheus.GaugeOpts{
+	m.seriesCreated = prometheus.NewCounter(prometheus.CounterOpts{
 		Name: "prometheus_tsdb_head_series_created_total",
 		Help: "Total number of series created in the head",
 	})
-	m.seriesRemoved = prometheus.NewGauge(prometheus.GaugeOpts{
+	m.seriesRemoved = prometheus.NewCounter(prometheus.CounterOpts{
 		Name: "prometheus_tsdb_head_series_removed_total",
 		Help: "Total number of series removed in the head",
 	})
 	m.seriesNotFound = prometheus.NewCounter(prometheus.CounterOpts{
-		Name: "prometheus_tsdb_head_series_not_found",
+		Name: "prometheus_tsdb_head_series_not_found_total",
 		Help: "Total number of requests for series that were not found.",
 	})
 	m.chunks = prometheus.NewGauge(prometheus.GaugeOpts{
 		Name: "prometheus_tsdb_head_chunks",
 		Help: "Total number of chunks in the head block.",
 	})
-	m.chunksCreated = prometheus.NewGauge(prometheus.GaugeOpts{
+	m.chunksCreated = prometheus.NewCounter(prometheus.CounterOpts{
 		Name: "prometheus_tsdb_head_chunks_created_total",
 		Help: "Total number of chunks created in the head",
 	})
-	m.chunksRemoved = prometheus.NewGauge(prometheus.GaugeOpts{
+	m.chunksRemoved = prometheus.NewCounter(prometheus.CounterOpts{
 		Name: "prometheus_tsdb_head_chunks_removed_total",
 		Help: "Total number of chunks removed in the head",
 	})
@@ -620,21 +620,22 @@ func (a *headAppender) Add(lset labels.Labels, t int64, v float64) (uint64, error) {
 }

 func (a *headAppender) AddFast(ref uint64, t int64, v float64) error {
-	s := a.head.series.getByID(ref)
+	if t < a.minValidTime {
+		return ErrOutOfBounds
+	}

+	s := a.head.series.getByID(ref)
 	if s == nil {
 		return errors.Wrap(ErrNotFound, "unknown series")
 	}
 	s.Lock()
-	err := s.appendable(t, v)
-	s.Unlock()
-
-	if err != nil {
+	if err := s.appendable(t, v); err != nil {
+		s.Unlock()
 		return err
 	}
-	if t < a.minValidTime {
-		return ErrOutOfBounds
-	}
+	s.pendingCommit = true
+	s.Unlock()

 	if t < a.mint {
 		a.mint = t
 	}
@@ -694,6 +695,7 @@ func (a *headAppender) Commit() error {
 	for _, s := range a.samples {
 		s.series.Lock()
 		ok, chunkCreated := s.series.append(s.T, s.V)
+		s.series.pendingCommit = false
 		s.series.Unlock()

 		if !ok {
@@ -713,6 +715,11 @@
 func (a *headAppender) Rollback() error {
 	a.head.metrics.activeAppenders.Dec()

+	for _, s := range a.samples {
+		s.series.Lock()
+		s.series.pendingCommit = false
+		s.series.Unlock()
+	}
 	a.head.putAppendBuffer(a.samples)

 	// Series are created in the head memory regardless of rollback. Thus we have
@@ -1165,7 +1172,7 @@ func (s *stripeSeries) gc(mint int64) (map[uint64]struct{}, int) {
 			series.Lock()
 			rmChunks += series.truncateChunksBefore(mint)

-			if len(series.chunks) > 0 {
+			if len(series.chunks) > 0 || series.pendingCommit {
 				series.Unlock()
 				continue
 			}
@@ -1256,9 +1263,10 @@ type memSeries struct {
 	chunkRange   int64
 	firstChunkID int

-	nextAt    int64 // timestamp at which to cut the next chunk.
-	lastValue float64
-	sampleBuf [4]sample
+	nextAt        int64 // Timestamp at which to cut the next chunk.
+	lastValue     float64
+	sampleBuf     [4]sample
+	pendingCommit bool // Whether there are samples waiting to be committed to this series.

 	app chunkenc.Appender // Current appender for the chunk.
 }
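
Taken together, the head changes close a window in which the garbage collector could drop a series that an appender had staged samples for but not yet committed: AddFast and Add set pendingCommit under the series lock, Commit and Rollback clear it, and gc now keeps any series with the flag set. A hypothetical reduction of that guard:

```go
package main

import (
	"fmt"
	"sync"
)

// toySeries mirrors just the fields the guard needs; it is an illustrative
// reduction, not the real memSeries type.
type toySeries struct {
	sync.Mutex
	chunks        int
	pendingCommit bool
}

// collectable reports whether gc may drop the series: only when it holds no
// chunks and no appender has samples waiting to be committed to it.
func (s *toySeries) collectable() bool {
	s.Lock()
	defer s.Unlock()
	return s.chunks == 0 && !s.pendingCommit
}

func main() {
	s := &toySeries{}

	// AddFast: stage a sample.
	s.Lock()
	s.pendingCommit = true
	s.Unlock()
	fmt.Println(s.collectable()) // false: gc must keep the series

	// Commit (or Rollback): clear the flag.
	s.Lock()
	s.pendingCommit = false
	s.Unlock()
	fmt.Println(s.collectable()) // true: safe to drop
}
```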

vendor/github.com/prometheus/tsdb/repair.go

@@ -5,12 +5,10 @@ import (
 	"io"
 	"io/ioutil"
 	"os"
-	"path"
 	"path/filepath"

 	"github.com/go-kit/kit/log"
 	"github.com/go-kit/kit/log/level"
-	"github.com/oklog/ulid"
 	"github.com/pkg/errors"
 	"github.com/prometheus/tsdb/fileutil"
 )
@@ -20,20 +18,18 @@ import (
 func repairBadIndexVersion(logger log.Logger, dir string) error {
 	// All blocks written by Prometheus 2.1 with a meta.json version of 2 are affected.
 	// We must actually set the index file version to 2 and revert the meta.json version back to 1.
-	subdirs, err := fileutil.ReadDir(dir)
+	dirs, err := blockDirs(dir)
 	if err != nil {
-		return err
+		return errors.Wrapf(err, "list block dirs in %q", dir)
 	}
-	for _, d := range subdirs {
-		// Skip non-block dirs.
-		if _, err := ulid.Parse(d); err != nil {
-			continue
-		}
-		d = path.Join(dir, d)

+	wrapErr := func(err error, d string) error {
+		return errors.Wrapf(err, "block dir: %q", d)
+	}
+
+	for _, d := range dirs {
 		meta, err := readBogusMetaFile(d)
 		if err != nil {
-			return err
+			return wrapErr(err, d)
 		}
 		if meta.Version == 1 {
 			level.Info(logger).Log(
@@ -53,35 +49,35 @@ func repairBadIndexVersion(logger log.Logger, dir string) error {
 		repl, err := os.Create(filepath.Join(d, "index.repaired"))
 		if err != nil {
-			return err
+			return wrapErr(err, d)
 		}
 		broken, err := os.Open(filepath.Join(d, "index"))
 		if err != nil {
-			return err
+			return wrapErr(err, d)
 		}
 		if _, err := io.Copy(repl, broken); err != nil {
-			return err
+			return wrapErr(err, d)
 		}
 		// Set the 5th byte to 2 to indicate the correct file format version.
 		if _, err := repl.WriteAt([]byte{2}, 4); err != nil {
-			return err
+			return wrapErr(err, d)
 		}
 		if err := fileutil.Fsync(repl); err != nil {
-			return err
+			return wrapErr(err, d)
 		}
 		if err := repl.Close(); err != nil {
-			return err
+			return wrapErr(err, d)
 		}
 		if err := broken.Close(); err != nil {
-			return err
+			return wrapErr(err, d)
 		}
 		if err := renameFile(repl.Name(), broken.Name()); err != nil {
-			return err
+			return wrapErr(err, d)
 		}
 		// Reset version of meta.json to 1.
 		meta.Version = 1
 		if err := writeMetaFile(d, meta); err != nil {
-			return err
+			return wrapErr(err, d)
 		}
 	}
 	return nil
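
The bare `return err` statements become `wrapErr(err, d)` so every failure names the block directory it came from; the closure is just `errors.Wrapf` with a fixed message. A standalone sketch (directory name illustrative):

```go
package main

import (
	"fmt"

	"github.com/pkg/errors"
)

func main() {
	// wrapErr mirrors the closure above: annotate any repair error with the
	// block directory being processed.
	wrapErr := func(err error, d string) error {
		return errors.Wrapf(err, "block dir: %q", d)
	}

	err := fmt.Errorf("index: unexpected file format version")
	fmt.Println(wrapErr(err, "01CQZ")) // block dir: "01CQZ": index: unexpected file format version
}
```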

vendor/github.com/prometheus/tsdb/wal.go

@@ -1212,38 +1212,44 @@ func (r *walReader) decodeDeletes(flag byte, b []byte, res *[]Stone) error {
 	return nil
 }

-// MigrateWAL rewrites the deprecated write ahead log into the new format.
-func MigrateWAL(logger log.Logger, dir string) (err error) {
-	if logger == nil {
-		logger = log.NewNopLogger()
-	}
+func deprecatedWALExists(logger log.Logger, dir string) (bool, error) {
 	// Detect whether we still have the old WAL.
 	fns, err := sequenceFiles(dir)
 	if err != nil && !os.IsNotExist(err) {
-		return errors.Wrap(err, "list sequence files")
+		return false, errors.Wrap(err, "list sequence files")
 	}
 	if len(fns) == 0 {
-		return nil // No WAL at all yet.
+		return false, nil // No WAL at all yet.
 	}

 	// Check header of first segment to see whether we are still dealing with an
 	// old WAL.
 	f, err := os.Open(fns[0])
 	if err != nil {
-		return errors.Wrap(err, "check first existing segment")
+		return false, errors.Wrap(err, "check first existing segment")
 	}
 	defer f.Close()

 	var hdr [4]byte
 	if _, err := f.Read(hdr[:]); err != nil && err != io.EOF {
-		return errors.Wrap(err, "read header from first segment")
+		return false, errors.Wrap(err, "read header from first segment")
 	}
 	// If we cannot read the magic header for segments of the old WAL, abort.
 	// Either it's migrated already or there's a corruption issue with which
 	// we cannot deal here anyway. Subsequent attempts to open the WAL will error in that case.
 	if binary.BigEndian.Uint32(hdr[:]) != WALMagic {
-		return nil
+		return false, nil
 	}
+	return true, nil
+}
+
+// MigrateWAL rewrites the deprecated write ahead log into the new format.
+func MigrateWAL(logger log.Logger, dir string) (err error) {
+	if logger == nil {
+		logger = log.NewNopLogger()
+	}
+	if exists, err := deprecatedWALExists(logger, dir); err != nil || !exists {
+		return err
+	}

 	level.Info(logger).Log("msg", "migrating WAL format")

 	tmpdir := dir + ".tmp"
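
Extracting `deprecatedWALExists` makes `MigrateWAL` a cheap no-op when no legacy WAL is present. The check reads the first four bytes of the first segment and compares them to the old format's magic number; a self-contained sketch, where the magic value is assumed to match the tsdb `WALMagic` constant:

```go
package main

import (
	"encoding/binary"
	"fmt"
	"io"
	"os"
)

// legacyWALMagic is assumed to equal the tsdb WALMagic constant.
const legacyWALMagic = uint32(0x43AF00EF)

// hasLegacyHeader mirrors the header check in deprecatedWALExists: read the
// first four bytes of a segment and compare them big-endian to the magic.
func hasLegacyHeader(path string) (bool, error) {
	f, err := os.Open(path)
	if err != nil {
		return false, err
	}
	defer f.Close()

	var hdr [4]byte
	if _, err := f.Read(hdr[:]); err != nil && err != io.EOF {
		return false, err
	}
	return binary.BigEndian.Uint32(hdr[:]) == legacyWALMagic, nil
}

func main() {
	ok, err := hasLegacyHeader("wal/000001") // path illustrative
	fmt.Println(ok, err)
}
```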

vendor/vendor.json

@@ -841,10 +841,10 @@
 			"revisionTime": "2016-04-11T19:08:41Z"
 		},
 		{
-			"checksumSHA1": "vRK6HrNOeJheYudfpCIUyh42T3o=",
+			"checksumSHA1": "E6jWeQZVEyeMLNq4ceHx9NRw4B8=",
 			"path": "github.com/prometheus/tsdb",
-			"revision": "06f01d45ad2ca2853c9dc1a0d5db6c75c8af6a5a",
-			"revisionTime": "2018-08-07T11:25:08Z"
+			"revision": "dfcb7d0d5034b97de2abdf5369f4813e7fb7c07c",
+			"revisionTime": "2018-09-19T06:47:24Z"
 		},
 		{
 			"checksumSHA1": "QI0UME2olSr4kH6Z8UkpffM59Mc=",