Fix review comments

Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in>
This commit is contained in:
Ganesh Vernekar 2018-09-25 19:18:33 +05:30
parent 632dfb349e
commit 61b000ee0e
No known key found for this signature in database
GPG key ID: 0241A11211763456
4 changed files with 91 additions and 50 deletions

View file

@ -23,10 +23,7 @@ import (
"strconv" "strconv"
"strings" "strings"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/pkg/errors" "github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/tsdb/fileutil" "github.com/prometheus/tsdb/fileutil"
"github.com/prometheus/tsdb/wal" "github.com/prometheus/tsdb/wal"
) )
@ -101,12 +98,7 @@ const checkpointPrefix = "checkpoint."
// segmented format as the original WAL itself. // segmented format as the original WAL itself.
// This makes it easy to read it through the WAL package and concatenate // This makes it easy to read it through the WAL package and concatenate
// it with the original WAL. // it with the original WAL.
// func Checkpoint(w *wal.WAL, m, n int, keep func(id uint64) bool, mint int64) (*CheckpointStats, error) {
// Non-critical errors are logged and not returned.
func Checkpoint(logger log.Logger, w *wal.WAL, m, n int, keep func(id uint64) bool, mint int64, checkpointDeleteFail prometheus.Counter) (*CheckpointStats, error) {
if logger == nil {
logger = log.NewNopLogger()
}
stats := &CheckpointStats{} stats := &CheckpointStats{}
var sr io.Reader var sr io.Reader
@ -273,18 +265,5 @@ func Checkpoint(logger log.Logger, w *wal.WAL, m, n int, keep func(id uint64) bo
if err := closeAll(closers...); err != nil { if err := closeAll(closers...); err != nil {
return stats, errors.Wrap(err, "close opened files") return stats, errors.Wrap(err, "close opened files")
} }
if err := w.Truncate(n + 1); err != nil {
// If truncating fails, we'll just try again at the next checkpoint.
// Leftover segments will just be ignored in the future if there's a checkpoint
// that supersedes them.
level.Error(logger).Log("msg", "truncating segments failed", "err", err)
}
if err := DeleteCheckpoints(w.Dir(), n); err != nil {
// Leftover old checkpoints do not cause problems down the line beyond
// occupying disk space.
// They will just be ignored since a higher checkpoint exists.
level.Error(logger).Log("msg", "delete old checkpoints", "err", err)
checkpointDeleteFail.Add(float64(1))
}
return stats, nil return stats, nil
} }

View file

@ -20,7 +20,6 @@ import (
"path/filepath" "path/filepath"
"testing" "testing"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/tsdb/fileutil" "github.com/prometheus/tsdb/fileutil"
"github.com/prometheus/tsdb/labels" "github.com/prometheus/tsdb/labels"
"github.com/prometheus/tsdb/testutil" "github.com/prometheus/tsdb/testutil"
@ -138,10 +137,12 @@ func TestCheckpoint(t *testing.T) {
} }
testutil.Ok(t, w.Close()) testutil.Ok(t, w.Close())
_, err = Checkpoint(nil, w, 100, 106, func(x uint64) bool { _, err = Checkpoint(w, 100, 106, func(x uint64) bool {
return x%2 == 0 return x%2 == 0
}, last/2, prometheus.NewCounter(prometheus.CounterOpts{})) }, last/2)
testutil.Ok(t, err) testutil.Ok(t, err)
testutil.Ok(t, w.Truncate(107))
testutil.Ok(t, DeleteCheckpoints(w.Dir(), 106))
// Only the new checkpoint should be left. // Only the new checkpoint should be left.
files, err := fileutil.ReadDir(dir) files, err := fileutil.ReadDir(dir)

88
head.go
View file

@ -76,20 +76,25 @@ type Head struct {
} }
type headMetrics struct { type headMetrics struct {
activeAppenders prometheus.Gauge activeAppenders prometheus.Gauge
series prometheus.Gauge series prometheus.Gauge
seriesCreated prometheus.Counter seriesCreated prometheus.Counter
seriesRemoved prometheus.Counter seriesRemoved prometheus.Counter
seriesNotFound prometheus.Counter seriesNotFound prometheus.Counter
chunks prometheus.Gauge chunks prometheus.Gauge
chunksCreated prometheus.Counter chunksCreated prometheus.Counter
chunksRemoved prometheus.Counter chunksRemoved prometheus.Counter
gcDuration prometheus.Summary gcDuration prometheus.Summary
minTime prometheus.GaugeFunc minTime prometheus.GaugeFunc
maxTime prometheus.GaugeFunc maxTime prometheus.GaugeFunc
samplesAppended prometheus.Counter samplesAppended prometheus.Counter
walTruncateDuration prometheus.Summary walTruncateDuration prometheus.Summary
checkpointDeleteFail prometheus.Counter headTruncateFail prometheus.Counter
headTruncateTotal prometheus.Counter
checkpointDeleteFail prometheus.Counter
checkpointDeleteTotal prometheus.Counter
checkpointCreationFail prometheus.Counter
checkpointCreationTotal prometheus.Counter
} }
func newHeadMetrics(h *Head, r prometheus.Registerer) *headMetrics { func newHeadMetrics(h *Head, r prometheus.Registerer) *headMetrics {
@ -151,9 +156,29 @@ func newHeadMetrics(h *Head, r prometheus.Registerer) *headMetrics {
Name: "prometheus_tsdb_head_samples_appended_total", Name: "prometheus_tsdb_head_samples_appended_total",
Help: "Total number of appended samples.", Help: "Total number of appended samples.",
}) })
m.headTruncateFail = prometheus.NewCounter(prometheus.CounterOpts{
Name: "prometheus_tsdb_head_truncations_failed_total",
Help: "Total number of head truncations that failed.",
})
m.headTruncateTotal = prometheus.NewCounter(prometheus.CounterOpts{
Name: "prometheus_tsdb_head_truncations_total",
Help: "Total number of head truncations attempted.",
})
m.checkpointDeleteFail = prometheus.NewCounter(prometheus.CounterOpts{ m.checkpointDeleteFail = prometheus.NewCounter(prometheus.CounterOpts{
Name: "prometheus_tsdb_checkpoint_delete_fail", Name: "prometheus_tsdb_checkpoint_deletions_failed_total",
Help: "Number of times deletion of old checkpoint failed.", Help: "Total number of checkpoint deletions that failed.",
})
m.checkpointDeleteTotal = prometheus.NewCounter(prometheus.CounterOpts{
Name: "prometheus_tsdb_checkpoint_deletions_total",
Help: "Total number of checkpoint deletions attempted.",
})
m.checkpointCreationFail = prometheus.NewCounter(prometheus.CounterOpts{
Name: "prometheus_tsdb_checkpoint_creations_failed_total",
Help: "Total number of checkpoint creations that failed.",
})
m.checkpointCreationTotal = prometheus.NewCounter(prometheus.CounterOpts{
Name: "prometheus_tsdb_checkpoint_creations_total",
Help: "Total number of checkpoint creations attempted.",
}) })
if r != nil { if r != nil {
@ -171,7 +196,12 @@ func newHeadMetrics(h *Head, r prometheus.Registerer) *headMetrics {
m.gcDuration, m.gcDuration,
m.walTruncateDuration, m.walTruncateDuration,
m.samplesAppended, m.samplesAppended,
m.headTruncateFail,
m.headTruncateTotal,
m.checkpointDeleteFail, m.checkpointDeleteFail,
m.checkpointDeleteTotal,
m.checkpointCreationFail,
m.checkpointCreationTotal,
) )
} }
return m return m
@ -427,7 +457,12 @@ func (h *Head) Init() error {
} }
// Truncate removes old data before mint from the head. // Truncate removes old data before mint from the head.
func (h *Head) Truncate(mint int64) error { func (h *Head) Truncate(mint int64) (err error) {
defer func() {
if err != nil {
h.metrics.headTruncateFail.Inc()
}
}()
initialize := h.MinTime() == math.MaxInt64 initialize := h.MinTime() == math.MaxInt64
if h.MinTime() >= mint && !initialize { if h.MinTime() >= mint && !initialize {
@ -446,6 +481,7 @@ func (h *Head) Truncate(mint int64) error {
return nil return nil
} }
h.metrics.headTruncateTotal.Inc()
start := time.Now() start := time.Now()
h.gc() h.gc()
@ -475,9 +511,25 @@ func (h *Head) Truncate(mint int64) error {
keep := func(id uint64) bool { keep := func(id uint64) bool {
return h.series.getByID(id) != nil return h.series.getByID(id) != nil
} }
if _, err = Checkpoint(h.logger, h.wal, m, n, keep, mint, h.metrics.checkpointDeleteFail); err != nil { h.metrics.checkpointCreationTotal.Inc()
if _, err = Checkpoint(h.wal, m, n, keep, mint); err != nil {
h.metrics.checkpointCreationFail.Inc()
return errors.Wrap(err, "create checkpoint") return errors.Wrap(err, "create checkpoint")
} }
if err := h.wal.Truncate(n + 1); err != nil {
// If truncating fails, we'll just try again at the next checkpoint.
// Leftover segments will just be ignored in the future if there's a checkpoint
// that supersedes them.
level.Error(h.logger).Log("msg", "truncating segments failed", "err", err)
}
h.metrics.checkpointDeleteTotal.Inc()
if err := DeleteCheckpoints(h.wal.Dir(), n); err != nil {
// Leftover old checkpoints do not cause problems down the line beyond
// occupying disk space.
// They will just be ignored since a higher checkpoint exists.
level.Error(h.logger).Log("msg", "delete old checkpoints", "err", err)
h.metrics.checkpointDeleteFail.Inc()
}
h.metrics.walTruncateDuration.Observe(time.Since(start).Seconds()) h.metrics.walTruncateDuration.Observe(time.Since(start).Seconds())
level.Info(h.logger).Log("msg", "WAL checkpoint complete", level.Info(h.logger).Log("msg", "WAL checkpoint complete",

View file

@ -163,6 +163,7 @@ type WAL struct {
pageFlushes prometheus.Counter pageFlushes prometheus.Counter
pageCompletions prometheus.Counter pageCompletions prometheus.Counter
truncateFail prometheus.Counter truncateFail prometheus.Counter
truncateTotal prometheus.Counter
} }
// New returns a new WAL over the given directory. // New returns a new WAL over the given directory.
@ -203,11 +204,15 @@ func NewSize(logger log.Logger, reg prometheus.Registerer, dir string, segmentSi
Help: "Total number of completed pages.", Help: "Total number of completed pages.",
}) })
w.truncateFail = prometheus.NewCounter(prometheus.CounterOpts{ w.truncateFail = prometheus.NewCounter(prometheus.CounterOpts{
Name: "prometheus_tsdb_wal_truncate_fail", Name: "prometheus_tsdb_wal_truncations_failed_total",
Help: "Number of times WAL truncation failed.", Help: "Total number of WAL truncations that failed.",
})
w.truncateTotal = prometheus.NewCounter(prometheus.CounterOpts{
Name: "prometheus_tsdb_wal_truncations_total",
Help: "Total number of WAL truncations attempted.",
}) })
if reg != nil { if reg != nil {
reg.MustRegister(w.fsyncDuration, w.pageFlushes, w.pageCompletions, w.truncateFail) reg.MustRegister(w.fsyncDuration, w.pageFlushes, w.pageCompletions, w.truncateFail, w.truncateTotal)
} }
_, j, err := w.Segments() _, j, err := w.Segments()
@ -532,18 +537,22 @@ func (w *WAL) Segments() (m, n int, err error) {
} }
// Truncate drops all segments before i. // Truncate drops all segments before i.
func (w *WAL) Truncate(i int) error { func (w *WAL) Truncate(i int) (err error) {
w.truncateTotal.Inc()
defer func() {
if err != nil {
w.truncateFail.Inc()
}
}()
refs, err := listSegments(w.dir) refs, err := listSegments(w.dir)
if err != nil { if err != nil {
w.truncateFail.Add(float64(1))
return err return err
} }
for _, r := range refs { for _, r := range refs {
if r.n >= i { if r.n >= i {
break break
} }
if err := os.Remove(filepath.Join(w.dir, r.s)); err != nil { if err = os.Remove(filepath.Join(w.dir, r.s)); err != nil {
w.truncateFail.Add(float64(1))
return err return err
} }
} }