From 612f9cb3614c2ea8c34977a8e6a705cd099ed965 Mon Sep 17 00:00:00 2001 From: johncming Date: Thu, 19 Sep 2019 19:24:34 +0800 Subject: [PATCH] tsdb/wal: pull out wal metrics separately as tsdb.DB (#5957) Signed-off-by: johncming --- tsdb/wal/wal.go | 93 +++++++++++++++++++++++++------------------- tsdb/wal/wal_test.go | 4 +- 2 files changed, 56 insertions(+), 41 deletions(-) diff --git a/tsdb/wal/wal.go b/tsdb/wal/wal.go index 6d64bb37ef..16f2e8794e 100644 --- a/tsdb/wal/wal.go +++ b/tsdb/wal/wal.go @@ -177,6 +177,10 @@ type WAL struct { compress bool snappyBuf []byte + metrics *walMetrics +} + +type walMetrics struct { fsyncDuration prometheus.Summary pageFlushes prometheus.Counter pageCompletions prometheus.Counter @@ -185,6 +189,49 @@ type WAL struct { currentSegment prometheus.Gauge } +func newWALMetrics(w *WAL, r prometheus.Registerer) *walMetrics { + m := &walMetrics{} + + m.fsyncDuration = prometheus.NewSummary(prometheus.SummaryOpts{ + Name: "prometheus_tsdb_wal_fsync_duration_seconds", + Help: "Duration of WAL fsync.", + Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001}, + }) + m.pageFlushes = prometheus.NewCounter(prometheus.CounterOpts{ + Name: "prometheus_tsdb_wal_page_flushes_total", + Help: "Total number of page flushes.", + }) + m.pageCompletions = prometheus.NewCounter(prometheus.CounterOpts{ + Name: "prometheus_tsdb_wal_completed_pages_total", + Help: "Total number of completed pages.", + }) + m.truncateFail = prometheus.NewCounter(prometheus.CounterOpts{ + Name: "prometheus_tsdb_wal_truncations_failed_total", + Help: "Total number of WAL truncations that failed.", + }) + m.truncateTotal = prometheus.NewCounter(prometheus.CounterOpts{ + Name: "prometheus_tsdb_wal_truncations_total", + Help: "Total number of WAL truncations attempted.", + }) + m.currentSegment = prometheus.NewGauge(prometheus.GaugeOpts{ + Name: "prometheus_tsdb_wal_segment_current", + Help: "WAL segment index that TSDB is currently writing to.", + }) + + if r != nil { + r.MustRegister( + m.fsyncDuration, + m.pageFlushes, + m.pageCompletions, + m.truncateFail, + m.truncateTotal, + m.currentSegment, + ) + } + + return m +} + // New returns a new WAL over the given directory. func New(logger log.Logger, reg prometheus.Registerer, dir string, compress bool) (*WAL, error) { return NewSize(logger, reg, dir, DefaultSegmentSize, compress) @@ -211,7 +258,7 @@ func NewSize(logger log.Logger, reg prometheus.Registerer, dir string, segmentSi stopc: make(chan chan struct{}), compress: compress, } - registerMetrics(reg, w) + w.metrics = newWALMetrics(w, reg) _, last, err := w.Segments() if err != nil { @@ -249,41 +296,9 @@ func Open(logger log.Logger, reg prometheus.Registerer, dir string) (*WAL, error logger: logger, } - registerMetrics(reg, w) return w, nil } -func registerMetrics(reg prometheus.Registerer, w *WAL) { - w.fsyncDuration = prometheus.NewSummary(prometheus.SummaryOpts{ - Name: "prometheus_tsdb_wal_fsync_duration_seconds", - Help: "Duration of WAL fsync.", - Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001}, - }) - w.pageFlushes = prometheus.NewCounter(prometheus.CounterOpts{ - Name: "prometheus_tsdb_wal_page_flushes_total", - Help: "Total number of page flushes.", - }) - w.pageCompletions = prometheus.NewCounter(prometheus.CounterOpts{ - Name: "prometheus_tsdb_wal_completed_pages_total", - Help: "Total number of completed pages.", - }) - w.truncateFail = prometheus.NewCounter(prometheus.CounterOpts{ - Name: "prometheus_tsdb_wal_truncations_failed_total", - Help: "Total number of WAL truncations that failed.", - }) - w.truncateTotal = prometheus.NewCounter(prometheus.CounterOpts{ - Name: "prometheus_tsdb_wal_truncations_total", - Help: "Total number of WAL truncations attempted.", - }) - w.currentSegment = prometheus.NewGauge(prometheus.GaugeOpts{ - Name: "prometheus_tsdb_wal_segment_current", - Help: "WAL segment index that TSDB is currently writing to.", - }) - if reg != nil { - reg.MustRegister(w.fsyncDuration, w.pageFlushes, w.pageCompletions, w.truncateFail, w.truncateTotal, w.currentSegment) - } -} - // CompressionEnabled returns if compression is enabled on this WAL. func (w *WAL) CompressionEnabled() bool { return w.compress @@ -476,7 +491,7 @@ func (w *WAL) setSegment(segment *Segment) error { return err } w.donePages = int(stat.Size() / pageSize) - w.currentSegment.Set(float64(segment.Index())) + w.metrics.currentSegment.Set(float64(segment.Index())) return nil } @@ -484,7 +499,7 @@ func (w *WAL) setSegment(segment *Segment) error { // the page, the remaining bytes will be set to zero and a new page will be started. // If clear is true, this is enforced regardless of how many bytes are left in the page. func (w *WAL) flushPage(clear bool) error { - w.pageFlushes.Inc() + w.metrics.pageFlushes.Inc() p := w.page clear = clear || p.full() @@ -504,7 +519,7 @@ func (w *WAL) flushPage(clear bool) error { if clear { p.reset() w.donePages++ - w.pageCompletions.Inc() + w.metrics.pageCompletions.Inc() } return nil } @@ -671,10 +686,10 @@ func (w *WAL) Segments() (first, last int, err error) { // Truncate drops all segments before i. func (w *WAL) Truncate(i int) (err error) { - w.truncateTotal.Inc() + w.metrics.truncateTotal.Inc() defer func() { if err != nil { - w.truncateFail.Inc() + w.metrics.truncateFail.Inc() } }() refs, err := listSegments(w.dir) @@ -695,7 +710,7 @@ func (w *WAL) Truncate(i int) (err error) { func (w *WAL) fsync(f *Segment) error { start := time.Now() err := f.File.Sync() - w.fsyncDuration.Observe(time.Since(start).Seconds()) + w.metrics.fsyncDuration.Observe(time.Since(start).Seconds()) return err } diff --git a/tsdb/wal/wal_test.go b/tsdb/wal/wal_test.go index 81a845fd4a..b1895407f9 100644 --- a/tsdb/wal/wal_test.go +++ b/tsdb/wal/wal_test.go @@ -361,7 +361,7 @@ func TestSegmentMetric(t *testing.T) { w, err := NewSize(nil, nil, dir, segmentSize, false) testutil.Ok(t, err) - initialSegment := client_testutil.ToFloat64(w.currentSegment) + initialSegment := client_testutil.ToFloat64(w.metrics.currentSegment) // Write 3 records, each of which is half the segment size, meaning we should rotate to the next segment. for i := 0; i < 3; i++ { @@ -372,7 +372,7 @@ func TestSegmentMetric(t *testing.T) { err = w.Log(buf) testutil.Ok(t, err) } - testutil.Assert(t, client_testutil.ToFloat64(w.currentSegment) == initialSegment+1, "segment metric did not increment after segment rotation") + testutil.Assert(t, client_testutil.ToFloat64(w.metrics.currentSegment) == initialSegment+1, "segment metric did not increment after segment rotation") testutil.Ok(t, w.Close()) }