Add various metrics

Fabian Reinartz 2017-08-30 17:38:25 +02:00
parent 3901b6e70b
commit 8209e3ec23
2 changed files with 159 additions and 63 deletions

db.go (161 lines changed)

@@ -123,12 +123,23 @@ type DB struct {
}
type dbMetrics struct {
activeAppenders prometheus.Gauge
loadedBlocks prometheus.GaugeFunc
reloads prometheus.Counter
reloadsFailed prometheus.Counter
reloadDuration prometheus.Summary
samplesAppended prometheus.Counter
activeAppenders prometheus.Gauge
loadedBlocks prometheus.GaugeFunc
reloads prometheus.Counter
reloadsFailed prometheus.Counter
walTruncateDuration prometheus.Summary
samplesAppended prometheus.Counter
headSeries prometheus.Gauge
headSeriesCreated prometheus.Counter
headSeriesRemoved prometheus.Counter
headChunks prometheus.Gauge
headChunksCreated prometheus.Counter
headChunksRemoved prometheus.Counter
headGCDuration prometheus.Summary
headMinTime prometheus.GaugeFunc
headMaxTime prometheus.GaugeFunc
compactionsTriggered prometheus.Counter
}
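Note on the field types above: the plain head gauges track current totals that shrink during head GC, while the *_created_total and *_removed_total fields are counters that only grow. A minimal standalone sketch of that distinction (illustrative names, not the commit's code):

package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

func main() {
	// Gauge: current value, may go down (e.g. after a head GC pass).
	series := prometheus.NewGauge(prometheus.GaugeOpts{
		Name: "head_series", Help: "Current number of series.",
	})
	series.Add(10)
	series.Sub(3)

	// Counter: monotonically increasing; Add with a negative value panics.
	created := prometheus.NewCounter(prometheus.CounterOpts{
		Name: "head_series_created_total", Help: "Series created.",
	})
	created.Add(10)

	fmt.Println("gauge now 7, counter now 10")
}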
@@ -155,10 +166,53 @@ func newDBMetrics(db *DB, r prometheus.Registerer) *dbMetrics {
Name: "tsdb_reloads_failures_total",
Help: "Number of times the database failed to reload block data from disk.",
})
m.reloadDuration = prometheus.NewSummary(prometheus.SummaryOpts{
Name: "tsdb_reload_duration_seconds",
Help: "Duration of block reloads.",
m.walTruncateDuration = prometheus.NewSummary(prometheus.SummaryOpts{
Name: "tsdb_wal_truncate_duration_seconds",
Help: "Duration of WAL truncation.",
})
m.headSeries = prometheus.NewGauge(prometheus.GaugeOpts{
Name: "tsdb_head_series",
Help: "Total number of series in the head block.",
})
m.headSeriesCreated = prometheus.NewCounter(prometheus.CounterOpts{
Name: "tsdb_head_series_created_total",
Help: "Total number of series created in the head block.",
})
m.headSeriesRemoved = prometheus.NewCounter(prometheus.CounterOpts{
Name: "tsdb_head_series_removed_total",
Help: "Total number of series removed in the head block.",
})
m.headChunks = prometheus.NewGauge(prometheus.GaugeOpts{
Name: "tsdb_head_chunks",
Help: "Total number of chunks in the head block.",
})
m.headChunksCreated = prometheus.NewCounter(prometheus.CounterOpts{
Name: "tsdb_head_chunks_created_total",
Help: "Total number of chunks created in the head block.",
})
m.headChunksRemoved = prometheus.NewCounter(prometheus.CounterOpts{
Name: "tsdb_head_chunks_removed_total",
Help: "Total number of chunks removed in the head block.",
})
m.headGCDuration = prometheus.NewSummary(prometheus.SummaryOpts{
Name: "tsdb_head_gc_duration_seconds",
Help: "Runtime of garbage collection in the head block.",
})
m.headMinTime = prometheus.NewGaugeFunc(prometheus.GaugeOpts{
Name: "tsdb_head_min_time",
Help: "Minimum time bound of the head block.",
}, func() float64 {
return float64(db.head.MinTime())
})
m.headMaxTime = prometheus.NewGaugeFunc(prometheus.GaugeOpts{
Name: "tsdb_head_max_time",
Help: "Maximum timestamp of the head block.",
}, func() float64 {
return float64(db.head.MaxTime())
})
m.samplesAppended = prometheus.NewCounter(prometheus.CounterOpts{
Name: "tsdb_samples_appended_total",
Help: "Total number of appended samples.",
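The min/max time metrics above use GaugeFunc, so the head's time bounds are read lazily at scrape time instead of being pushed on every append. A self-contained sketch of that pattern, with the metric name taken from the diff and everything else illustrative:

package main

import (
	"fmt"
	"sync/atomic"

	"github.com/prometheus/client_golang/prometheus"
)

func main() {
	var headMinTime int64 = 1000 // stand-in for db.head.MinTime()

	g := prometheus.NewGaugeFunc(prometheus.GaugeOpts{
		Name: "tsdb_head_min_time",
		Help: "Minimum time bound of the head block.",
	}, func() float64 {
		return float64(atomic.LoadInt64(&headMinTime)) // evaluated per Gather/scrape
	})

	reg := prometheus.NewRegistry()
	reg.MustRegister(g)

	mfs, err := reg.Gather()
	if err != nil {
		panic(err)
	}
	fmt.Println(mfs[0].GetName(), mfs[0].GetMetric()[0].GetGauge().GetValue())
}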
@@ -174,7 +228,18 @@ func newDBMetrics(db *DB, r prometheus.Registerer) *dbMetrics {
m.loadedBlocks,
m.reloads,
m.reloadsFailed,
m.reloadDuration,
m.walTruncateDuration,
m.headChunks,
m.headChunksCreated,
m.headChunksRemoved,
m.headSeries,
m.headSeriesCreated,
m.headSeriesRemoved,
m.headMinTime,
m.headMaxTime,
m.headGCDuration,
m.samplesAppended,
m.compactionsTriggered,
)
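All of the new collectors are registered in one MustRegister call against the Registerer passed into newDBMetrics. A hedged sketch of that construction pattern; the nil guard is an assumption about the surrounding package style, not shown in this hunk:

package main

import "github.com/prometheus/client_golang/prometheus"

// buildMetrics is illustrative, not the commit's code.
func buildMetrics(r prometheus.Registerer) prometheus.Counter {
	reloads := prometheus.NewCounter(prometheus.CounterOpts{
		Name: "tsdb_reloads_total",
		Help: "Number of times the database reloaded block data from disk.",
	})
	if r != nil {
		// MustRegister panics on name clashes, surfacing duplicates at startup.
		r.MustRegister(reloads)
	}
	return reloads
}

func main() {
	buildMetrics(prometheus.NewRegistry()).Inc()
}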
@@ -247,10 +312,13 @@ func Open(dir string, l log.Logger, r prometheus.Registerer, opts *Options) (db
db.compactor = NewLeveledCompactor(r, l, copts)
db.head, err = NewHead(l, db.wal.Reader(), copts.blockRanges[0])
db.head, err = NewHead(l, copts.blockRanges[0])
if err != nil {
return nil, err
}
if err := db.readWAL(db.wal.Reader()); err != nil {
return nil, err
}
if err := db.reloadBlocks(); err != nil {
return nil, err
}
@@ -444,14 +512,56 @@ func (db *DB) getBlock(id ulid.ULID) (DiskBlock, bool) {
return nil, false
}
func (db *DB) readWAL(r WALReader) error {
seriesFunc := func(series []labels.Labels) error {
for _, lset := range series {
db.head.create(lset.Hash(), lset)
db.metrics.headSeries.Inc()
db.metrics.headSeriesCreated.Inc()
}
return nil
}
samplesFunc := func(samples []RefSample) error {
for _, s := range samples {
ms, ok := db.head.series[uint32(s.Ref)]
if !ok {
return errors.Errorf("unknown series reference %d; abort WAL restore", s.Ref)
}
_, chunkCreated := ms.append(s.T, s.V)
if chunkCreated {
db.metrics.headChunksCreated.Inc()
db.metrics.headChunks.Inc()
}
}
return nil
}
deletesFunc := func(stones []Stone) error {
for _, s := range stones {
for _, itv := range s.intervals {
db.head.tombstones.add(s.ref, itv)
}
}
return nil
}
if err := r.Read(seriesFunc, samplesFunc, deletesFunc); err != nil {
return errors.Wrap(err, "consume WAL")
}
return nil
}
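Because readWAL consumes the log purely through the three callbacks, it can be driven by a fake reader in tests. A sketch assuming WALReader is the single-method interface implied by the r.Read call above; the stub type and its fields are hypothetical:

// stubWALReader is a hypothetical test double for WALReader.
type stubWALReader struct {
	series  []labels.Labels
	samples []RefSample
	stones  []Stone
}

func (r *stubWALReader) Read(
	seriesf func([]labels.Labels) error,
	samplesf func([]RefSample) error,
	deletesf func([]Stone) error,
) error {
	// Replay series before samples so series references resolve,
	// mirroring the ordering the real WAL guarantees.
	if err := seriesf(r.series); err != nil {
		return err
	}
	if err := samplesf(r.samples); err != nil {
		return err
	}
	return deletesf(r.stones)
}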
func (db *DB) reloadBlocks() (err error) {
defer func(t time.Time) {
defer func() {
if err != nil {
db.metrics.reloadsFailed.Inc()
}
db.metrics.reloads.Inc()
db.metrics.reloadDuration.Observe(time.Since(t).Seconds())
}(time.Now())
}()
var cs []io.Closer
defer func() { closeAll(cs...) }()
@@ -511,12 +621,21 @@ func (db *DB) reloadBlocks() (err error) {
}
start := time.Now()
atomic.StoreInt64(&db.head.minTime, maxt)
db.head.gc()
series, chunks := db.head.gc()
db.metrics.headSeriesRemoved.Add(float64(series))
db.metrics.headSeries.Sub(float64(series))
db.metrics.headChunksRemoved.Add(float64(chunks))
db.metrics.headChunks.Sub(float64(chunks))
db.logger.Log("msg", "head GC completed", "duration", time.Since(start))
start = time.Now()
if err := db.wal.Truncate(maxt); err != nil {
return errors.Wrapf(err, "truncate WAL at %d", maxt)
}
db.metrics.walTruncateDuration.Observe(time.Since(start).Seconds())
return nil
}
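WAL truncation is timed with the standard start/Observe idiom for a Summary. A minimal standalone version, using the metric name from the diff and a placeholder for the timed work:

package main

import (
	"time"

	"github.com/prometheus/client_golang/prometheus"
)

func main() {
	walTruncateDuration := prometheus.NewSummary(prometheus.SummaryOpts{
		Name: "tsdb_wal_truncate_duration_seconds",
		Help: "Duration of WAL truncation.",
	})
	prometheus.MustRegister(walTruncateDuration)

	start := time.Now()
	time.Sleep(25 * time.Millisecond) // placeholder for db.wal.Truncate(maxt)
	walTruncateDuration.Observe(time.Since(start).Seconds())
}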
@@ -852,6 +971,9 @@ func (a *dbAppender) createSeries() error {
s := a.head.create(l.hash, l.labels)
l.ref = uint64(s.ref)
a.db.metrics.headSeriesCreated.Inc()
a.db.metrics.headSeries.Inc()
}
// Write all new series to the WAL.
@@ -893,11 +1015,18 @@ func (a *dbAppender) Commit() error {
if !ok {
return errors.Errorf("series with ID %d not found", s.Ref)
}
if !series.append(s.T, s.V) {
ok, chunkCreated := series.append(s.T, s.V)
if !ok {
total--
}
if chunkCreated {
a.db.metrics.headChunks.Inc()
a.db.metrics.headChunksCreated.Inc()
}
}
a.db.metrics.samplesAppended.Add(float64(total))
for {
ht := a.head.MaxTime()
if a.highTimestamp <= ht {
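Commit now derives its metrics from a single pass: the appended-samples count starts at the number of attempted appends and is decremented for rejected out-of-order samples, while chunk cuts are counted separately. A toy model of that accounting (simplified types, not the commit's code):

package main

import "fmt"

type appendResult struct{ ok, chunkCreated bool }

func account(results []appendResult) (appended, chunksCreated int) {
	appended = len(results) // attempted appends
	for _, r := range results {
		if !r.ok {
			appended-- // rejected sample, e.g. out of order
		}
		if r.chunkCreated {
			chunksCreated++
		}
	}
	return appended, chunksCreated
}

func main() {
	fmt.Println(account([]appendResult{{true, true}, {false, false}, {true, false}})) // 2 1
}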

head.go (61 lines changed)

@@ -66,7 +66,7 @@ type Head struct {
}
// NewHead creates a new head block.
func NewHead(l log.Logger, wal WALReader, chunkRange int64) (*Head, error) {
func NewHead(l log.Logger, chunkRange int64) (*Head, error) {
h := &Head{
chunkRange: chunkRange,
minTime: math.MaxInt64,
@@ -78,54 +78,15 @@ func NewHead(l log.Logger, wal WALReader, chunkRange int64) (*Head, error) {
postings: &memPostings{m: make(map[term][]uint32)},
tombstones: newEmptyTombstoneReader(),
}
if wal == nil {
wal = NopWAL{}
}
return h, h.init(wal)
return h, nil
}
func (h *Head) String() string {
return "<head>"
}
func (h *Head) init(r WALReader) error {
seriesFunc := func(series []labels.Labels) error {
for _, lset := range series {
h.create(lset.Hash(), lset)
}
return nil
}
samplesFunc := func(samples []RefSample) error {
for _, s := range samples {
if int(s.Ref) >= len(h.series) {
return errors.Errorf("unknown series reference %d (max %d); abort WAL restore",
s.Ref, len(h.series))
}
h.series[uint32(s.Ref)].append(s.T, s.V)
}
return nil
}
deletesFunc := func(stones []Stone) error {
for _, s := range stones {
for _, itv := range s.intervals {
h.tombstones.add(s.ref, itv)
}
}
return nil
}
if err := r.Read(seriesFunc, samplesFunc, deletesFunc); err != nil {
return errors.Wrap(err, "consume WAL")
}
return nil
}
// gc removes data before the minimum timestamp from the head.
func (h *Head) gc() {
func (h *Head) gc() (seriesRemoved, chunksRemoved int) {
// Only data strictly lower than this timestamp must be deleted.
mint := h.MinTime()
@@ -136,7 +97,7 @@ func (h *Head) gc() {
for hash, ss := range h.hashes {
for _, s := range ss {
s.mtx.Lock()
s.truncateChunksBefore(mint)
chunksRemoved += s.truncateChunksBefore(mint)
if len(s.chunks) == 0 {
deletedHashes[hash] = append(deletedHashes[hash], s.ref)
@@ -186,6 +147,7 @@ func (h *Head) gc() {
h.hashes[hash] = rem
} else {
delete(h.hashes, hash)
seriesRemoved++
}
}
@@ -222,6 +184,8 @@ func (h *Head) gc() {
h.symbols = symbols
h.values = values
return seriesRemoved, chunksRemoved
}
func (h *Head) Tombstones() TombstoneReader {
@@ -574,7 +538,7 @@ func (s *memSeries) chunkID(pos int) int {
// truncateChunksBefore removes all chunks from the series that have no timestamp
// at or after mint. Chunk IDs remain unchanged.
func (s *memSeries) truncateChunksBefore(mint int64) {
func (s *memSeries) truncateChunksBefore(mint int64) (removed int) {
var k int
for i, c := range s.chunks {
if c.maxTime >= mint {
@@ -584,10 +548,12 @@ func (s *memSeries) truncateChunksBefore(mint int64) {
}
s.chunks = append(s.chunks[:0], s.chunks[k:]...)
s.firstChunkID += k
return k
}
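truncateChunksBefore now returns how many chunks it dropped so gc can roll the counts up into the chunk metrics. A runnable sketch of the same re-slice-and-count idiom, with a simplified element type:

package main

import "fmt"

type chunk struct{ maxTime int64 }

// truncateBefore drops chunks whose data lies entirely before mint and
// reports how many were removed.
func truncateBefore(chunks []chunk, mint int64) ([]chunk, int) {
	var k int
	for i, c := range chunks {
		if c.maxTime >= mint {
			break
		}
		k = i + 1
	}
	return append(chunks[:0], chunks[k:]...), k
}

func main() {
	cs := []chunk{{maxTime: 10}, {maxTime: 20}, {maxTime: 30}}
	cs, removed := truncateBefore(cs, 25)
	fmt.Println(len(cs), removed) // 1 2
}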
// append adds the sample (t, v) to the series.
func (s *memSeries) append(t int64, v float64) bool {
func (s *memSeries) append(t int64, v float64) (success, chunkCreated bool) {
const samplesPerChunk = 120
s.mtx.Lock()
@@ -597,10 +563,11 @@ func (s *memSeries) append(t int64, v float64) bool {
if len(s.chunks) == 0 {
c = s.cut(t)
chunkCreated = true
}
c = s.head()
if c.maxTime >= t {
return false
return false, chunkCreated
}
if c.samples > samplesPerChunk/4 && t >= s.nextAt {
c = s.cut(t)
@@ -622,7 +589,7 @@ func (s *memSeries) append(t int64, v float64) bool {
s.sampleBuf[2] = s.sampleBuf[3]
s.sampleBuf[3] = sample{t: t, v: v}
return true
return true, chunkCreated
}
// computeChunkEndTime estimates the end timestamp based on the beginning of a chunk,
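memSeries.append now reports both whether the sample was accepted and whether a chunk was cut, and callers in db.go use the second value to keep the chunk metrics in sync. A toy model of that two-value contract, assuming a fixed cut every 3 samples in place of the real chunk-sizing heuristics:

package main

import "fmt"

// toySeries mimics only the (success, chunkCreated) contract.
type toySeries struct {
	maxTime int64
	n       int // samples in the current chunk
	chunks  int
}

func (s *toySeries) append(t int64, v float64) (ok, chunkCreated bool) {
	if s.chunks > 0 && t <= s.maxTime {
		return false, false // out-of-order or duplicate timestamp
	}
	if s.chunks == 0 || s.n == 3 {
		s.chunks++
		s.n = 0
		chunkCreated = true
	}
	s.maxTime = t
	s.n++
	return true, chunkCreated
}

func main() {
	var s toySeries
	for _, t := range []int64{1, 2, 2, 3, 4} {
		ok, created := s.append(t, 0)
		fmt.Println(t, ok, created)
	}
}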