Merge pull request #396 from codesome/new-metrics

Add new metrics.
This commit is contained in:
Simon Pasquier 2018-10-01 16:00:01 +02:00 committed by GitHub
commit 043e3bb5f9
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 96 additions and 39 deletions

View file

@ -23,8 +23,6 @@ import (
"strconv" "strconv"
"strings" "strings"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/pkg/errors" "github.com/pkg/errors"
"github.com/prometheus/tsdb/fileutil" "github.com/prometheus/tsdb/fileutil"
"github.com/prometheus/tsdb/wal" "github.com/prometheus/tsdb/wal"
@ -100,12 +98,7 @@ const checkpointPrefix = "checkpoint."
// segmented format as the original WAL itself. // segmented format as the original WAL itself.
// This makes it easy to read it through the WAL package and concatenate // This makes it easy to read it through the WAL package and concatenate
// it with the original WAL. // it with the original WAL.
// func Checkpoint(w *wal.WAL, m, n int, keep func(id uint64) bool, mint int64) (*CheckpointStats, error) {
// Non-critical errors are logged and not returned.
func Checkpoint(logger log.Logger, w *wal.WAL, m, n int, keep func(id uint64) bool, mint int64) (*CheckpointStats, error) {
if logger == nil {
logger = log.NewNopLogger()
}
stats := &CheckpointStats{} stats := &CheckpointStats{}
var sr io.Reader var sr io.Reader
@ -272,17 +265,5 @@ func Checkpoint(logger log.Logger, w *wal.WAL, m, n int, keep func(id uint64) bo
if err := closeAll(closers...); err != nil { if err := closeAll(closers...); err != nil {
return stats, errors.Wrap(err, "close opened files") return stats, errors.Wrap(err, "close opened files")
} }
if err := w.Truncate(n + 1); err != nil {
// If truncating fails, we'll just try again at the next checkpoint.
// Leftover segments will just be ignored in the future if there's a checkpoint
// that supersedes them.
level.Error(logger).Log("msg", "truncating segments failed", "err", err)
}
if err := DeleteCheckpoints(w.Dir(), n); err != nil {
// Leftover old checkpoints do not cause problems down the line beyond
// occupying disk space.
// They will just be ignored since a higher checkpoint exists.
level.Error(logger).Log("msg", "delete old checkpoints", "err", err)
}
return stats, nil return stats, nil
} }

View file

@ -137,10 +137,12 @@ func TestCheckpoint(t *testing.T) {
} }
testutil.Ok(t, w.Close()) testutil.Ok(t, w.Close())
_, err = Checkpoint(nil, w, 100, 106, func(x uint64) bool { _, err = Checkpoint(w, 100, 106, func(x uint64) bool {
return x%2 == 0 return x%2 == 0
}, last/2) }, last/2)
testutil.Ok(t, err) testutil.Ok(t, err)
testutil.Ok(t, w.Truncate(107))
testutil.Ok(t, DeleteCheckpoints(w.Dir(), 106))
// Only the new checkpoint should be left. // Only the new checkpoint should be left.
files, err := fileutil.ReadDir(dir) files, err := fileutil.ReadDir(dir)

62
head.go
View file

@ -89,6 +89,12 @@ type headMetrics struct {
maxTime prometheus.GaugeFunc maxTime prometheus.GaugeFunc
samplesAppended prometheus.Counter samplesAppended prometheus.Counter
walTruncateDuration prometheus.Summary walTruncateDuration prometheus.Summary
headTruncateFail prometheus.Counter
headTruncateTotal prometheus.Counter
checkpointDeleteFail prometheus.Counter
checkpointDeleteTotal prometheus.Counter
checkpointCreationFail prometheus.Counter
checkpointCreationTotal prometheus.Counter
} }
func newHeadMetrics(h *Head, r prometheus.Registerer) *headMetrics { func newHeadMetrics(h *Head, r prometheus.Registerer) *headMetrics {
@ -150,6 +156,30 @@ func newHeadMetrics(h *Head, r prometheus.Registerer) *headMetrics {
Name: "prometheus_tsdb_head_samples_appended_total", Name: "prometheus_tsdb_head_samples_appended_total",
Help: "Total number of appended samples.", Help: "Total number of appended samples.",
}) })
m.headTruncateFail = prometheus.NewCounter(prometheus.CounterOpts{
Name: "prometheus_tsdb_head_truncations_failed_total",
Help: "Total number of head truncations that failed.",
})
m.headTruncateTotal = prometheus.NewCounter(prometheus.CounterOpts{
Name: "prometheus_tsdb_head_truncations_total",
Help: "Total number of head truncations attempted.",
})
m.checkpointDeleteFail = prometheus.NewCounter(prometheus.CounterOpts{
Name: "prometheus_tsdb_checkpoint_deletions_failed_total",
Help: "Total number of checkpoint deletions that failed.",
})
m.checkpointDeleteTotal = prometheus.NewCounter(prometheus.CounterOpts{
Name: "prometheus_tsdb_checkpoint_deletions_total",
Help: "Total number of checkpoint deletions attempted.",
})
m.checkpointCreationFail = prometheus.NewCounter(prometheus.CounterOpts{
Name: "prometheus_tsdb_checkpoint_creations_failed_total",
Help: "Total number of checkpoint creations that failed.",
})
m.checkpointCreationTotal = prometheus.NewCounter(prometheus.CounterOpts{
Name: "prometheus_tsdb_checkpoint_creations_total",
Help: "Total number of checkpoint creations attempted.",
})
if r != nil { if r != nil {
r.MustRegister( r.MustRegister(
@ -166,6 +196,12 @@ func newHeadMetrics(h *Head, r prometheus.Registerer) *headMetrics {
m.gcDuration, m.gcDuration,
m.walTruncateDuration, m.walTruncateDuration,
m.samplesAppended, m.samplesAppended,
m.headTruncateFail,
m.headTruncateTotal,
m.checkpointDeleteFail,
m.checkpointDeleteTotal,
m.checkpointCreationFail,
m.checkpointCreationTotal,
) )
} }
return m return m
@ -421,7 +457,12 @@ func (h *Head) Init() error {
} }
// Truncate removes old data before mint from the head. // Truncate removes old data before mint from the head.
func (h *Head) Truncate(mint int64) error { func (h *Head) Truncate(mint int64) (err error) {
defer func() {
if err != nil {
h.metrics.headTruncateFail.Inc()
}
}()
initialize := h.MinTime() == math.MaxInt64 initialize := h.MinTime() == math.MaxInt64
if h.MinTime() >= mint && !initialize { if h.MinTime() >= mint && !initialize {
@ -440,6 +481,7 @@ func (h *Head) Truncate(mint int64) error {
return nil return nil
} }
h.metrics.headTruncateTotal.Inc()
start := time.Now() start := time.Now()
h.gc() h.gc()
@ -469,9 +511,25 @@ func (h *Head) Truncate(mint int64) error {
keep := func(id uint64) bool { keep := func(id uint64) bool {
return h.series.getByID(id) != nil return h.series.getByID(id) != nil
} }
if _, err = Checkpoint(h.logger, h.wal, m, n, keep, mint); err != nil { h.metrics.checkpointCreationTotal.Inc()
if _, err = Checkpoint(h.wal, m, n, keep, mint); err != nil {
h.metrics.checkpointCreationFail.Inc()
return errors.Wrap(err, "create checkpoint") return errors.Wrap(err, "create checkpoint")
} }
if err := h.wal.Truncate(n + 1); err != nil {
// If truncating fails, we'll just try again at the next checkpoint.
// Leftover segments will just be ignored in the future if there's a checkpoint
// that supersedes them.
level.Error(h.logger).Log("msg", "truncating segments failed", "err", err)
}
h.metrics.checkpointDeleteTotal.Inc()
if err := DeleteCheckpoints(h.wal.Dir(), n); err != nil {
// Leftover old checkpoints do not cause problems down the line beyond
// occupying disk space.
// They will just be ignored since a higher checkpoint exists.
level.Error(h.logger).Log("msg", "delete old checkpoints", "err", err)
h.metrics.checkpointDeleteFail.Inc()
}
h.metrics.walTruncateDuration.Observe(time.Since(start).Seconds()) h.metrics.walTruncateDuration.Observe(time.Since(start).Seconds())
level.Info(h.logger).Log("msg", "WAL checkpoint complete", level.Info(h.logger).Log("msg", "WAL checkpoint complete",

View file

@ -162,6 +162,8 @@ type WAL struct {
fsyncDuration prometheus.Summary fsyncDuration prometheus.Summary
pageFlushes prometheus.Counter pageFlushes prometheus.Counter
pageCompletions prometheus.Counter pageCompletions prometheus.Counter
truncateFail prometheus.Counter
truncateTotal prometheus.Counter
} }
// New returns a new WAL over the given directory. // New returns a new WAL over the given directory.
@ -201,8 +203,16 @@ func NewSize(logger log.Logger, reg prometheus.Registerer, dir string, segmentSi
Name: "prometheus_tsdb_wal_completed_pages_total", Name: "prometheus_tsdb_wal_completed_pages_total",
Help: "Total number of completed pages.", Help: "Total number of completed pages.",
}) })
w.truncateFail = prometheus.NewCounter(prometheus.CounterOpts{
Name: "prometheus_tsdb_wal_truncations_failed_total",
Help: "Total number of WAL truncations that failed.",
})
w.truncateTotal = prometheus.NewCounter(prometheus.CounterOpts{
Name: "prometheus_tsdb_wal_truncations_total",
Help: "Total number of WAL truncations attempted.",
})
if reg != nil { if reg != nil {
reg.MustRegister(w.fsyncDuration, w.pageFlushes, w.pageCompletions) reg.MustRegister(w.fsyncDuration, w.pageFlushes, w.pageCompletions, w.truncateFail, w.truncateTotal)
} }
_, j, err := w.Segments() _, j, err := w.Segments()
@ -527,7 +537,13 @@ func (w *WAL) Segments() (m, n int, err error) {
} }
// Truncate drops all segments before i. // Truncate drops all segments before i.
func (w *WAL) Truncate(i int) error { func (w *WAL) Truncate(i int) (err error) {
w.truncateTotal.Inc()
defer func() {
if err != nil {
w.truncateFail.Inc()
}
}()
refs, err := listSegments(w.dir) refs, err := listSegments(w.dir)
if err != nil { if err != nil {
return err return err
@ -536,7 +552,7 @@ func (w *WAL) Truncate(i int) error {
if r.n >= i { if r.n >= i {
break break
} }
if err := os.Remove(filepath.Join(w.dir, r.s)); err != nil { if err = os.Remove(filepath.Join(w.dir, r.s)); err != nil {
return err return err
} }
} }