From 7280533c42e96feb17d5be5d32e5611e5c857039 Mon Sep 17 00:00:00 2001 From: Fabian Reinartz Date: Sat, 31 Dec 2016 09:48:49 +0100 Subject: [PATCH] Add basic shard metrics --- db.go | 58 +++++++++++++++++++++++++++++++++++++++++++++++------ head.go | 4 +++- wal.go | 7 ------- wal_test.go | 19 +++++++++--------- 4 files changed, 64 insertions(+), 24 deletions(-) diff --git a/db.go b/db.go index 24d2cd5984..2b4923984b 100644 --- a/db.go +++ b/db.go @@ -17,6 +17,7 @@ import ( "github.com/fabxc/tsdb/chunks" "github.com/fabxc/tsdb/labels" "github.com/go-kit/kit/log" + "github.com/prometheus/client_golang/prometheus" ) // DefaultOptions used for the DB. They are sane for setups using @@ -43,7 +44,7 @@ type DB struct { // TODO(fabxc): make configurable const ( - shardShift = 0 + shardShift = 2 numShards = 1 << shardShift maxChunkSize = 1024 ) @@ -74,7 +75,7 @@ func Open(path string, l log.Logger, opts *Options) (*DB, error) { l := log.NewContext(l).With("shard", i) d := shardDir(path, i) - s, err := OpenShard(d, l) + s, err := OpenShard(d, i, l) if err != nil { return nil, fmt.Errorf("initializing shard %q failed: %s", d, err) } @@ -181,14 +182,55 @@ type Shard struct { path string persistCh chan struct{} logger log.Logger + metrics *shardMetrics mtx sync.RWMutex persisted persistedBlocks head *HeadBlock } +type shardMetrics struct { + persistences prometheus.Counter + persistenceDuration prometheus.Histogram + samplesAppended prometheus.Counter +} + +func newShardMetrics(r prometheus.Registerer, i int) *shardMetrics { + shardLabel := prometheus.Labels{ + "shard": fmt.Sprintf("%d", i), + } + + m := &shardMetrics{ + persistences: prometheus.NewCounter(prometheus.CounterOpts{ + Name: "tsdb_shard_persistences_total", + Help: "Total number of head persistences that ran so far.", + ConstLabels: shardLabel, + }), + persistenceDuration: prometheus.NewHistogram(prometheus.HistogramOpts{ + Name: "tsdb_shard_persistence_duration_seconds", + Help: "Duration of persistences in 
seconds.", + ConstLabels: shardLabel, + Buckets: prometheus.ExponentialBuckets(0.25, 2, 5), + }), + samplesAppended: prometheus.NewCounter(prometheus.CounterOpts{ + Name: "tsdb_shard_samples_appended_total", + Help: "Total number of appended samples for the shard.", + ConstLabels: shardLabel, + }), + } + + if r != nil { + r.MustRegister( + m.persistences, + m.persistenceDuration, + m.samplesAppended, + ) + } + return m +} + // OpenShard returns a new Shard. -func OpenShard(path string, logger log.Logger) (*Shard, error) { +func OpenShard(path string, i int, logger log.Logger) (*Shard, error) { // Create directory if shard is new. if _, err := os.Stat(path); os.IsNotExist(err) { if err := os.MkdirAll(path, 0777); err != nil { @@ -219,9 +261,9 @@ func OpenShard(path string, logger log.Logger) (*Shard, error) { path: path, persistCh: make(chan struct{}, 1), logger: logger, + metrics: newShardMetrics(prometheus.DefaultRegisterer, i), head: head, persisted: pbs, - // TODO(fabxc): restore from checkpoint. } return s, nil } @@ -248,16 +290,20 @@ func (s *Shard) appendBatch(samples []hashedSample) error { // TODO(fabxc): distinguish samples between concurrent heads for // different time blocks. Those may occurr during transition to still // allow late samples to arrive for a previous block. - err := s.head.appendBatch(samples) + err := s.head.appendBatch(samples, s.metrics.samplesAppended) // TODO(fabxc): randomize over time and use better scoring function. 
- if s.head.stats.SampleCount/(uint64(s.head.stats.ChunkCount)+1) > 24000 { + if s.head.stats.SampleCount/(uint64(s.head.stats.ChunkCount)+1) > 400 { select { case s.persistCh <- struct{}{}: go func() { + start := time.Now() + defer func() { s.metrics.persistenceDuration.Observe(time.Since(start).Seconds()) }() + if err := s.persist(); err != nil { s.logger.Log("msg", "persistance error", "err", err) } + s.metrics.persistences.Inc() }() default: } diff --git a/head.go b/head.go index eb5ecdae9f..424446599b 100644 --- a/head.go +++ b/head.go @@ -8,6 +8,7 @@ import ( "github.com/fabxc/tsdb/chunks" "github.com/fabxc/tsdb/labels" + "github.com/prometheus/client_golang/prometheus" ) // HeadBlock handles reads and writes of time series data within a time window. @@ -182,7 +183,7 @@ func (h *HeadBlock) create(hash uint64, lset labels.Labels) *chunkDesc { return cd } -func (h *HeadBlock) appendBatch(samples []hashedSample) error { +func (h *HeadBlock) appendBatch(samples []hashedSample, appended prometheus.Counter) error { // Find head chunks for all samples and allocate new IDs/refs for // ones we haven't seen before. var ( @@ -233,6 +234,7 @@ func (h *HeadBlock) appendBatch(samples []hashedSample) error { continue } + appended.Inc() h.stats.SampleCount++ if s.t > h.stats.MaxTime { diff --git a/wal.go b/wal.go index dd06f8766b..18f66edb95 100644 --- a/wal.go +++ b/wal.go @@ -188,14 +188,7 @@ func (e *walEncoder) encodeSeries(series []labels.Labels) error { e.buf = append(e.buf, b[:n]...) for _, l := range lset { - // func() { - // defer func() { - // if recover() != nil { - // fmt.Println(l) - // } - // }() n = binary.PutUvarint(b, uint64(len(l.Name))) - // }() e.buf = append(e.buf, b[:n]...) e.buf = append(e.buf, l.Name...) 
diff --git a/wal_test.go b/wal_test.go index 0131d0230b..1cc91e8f06 100644 --- a/wal_test.go +++ b/wal_test.go @@ -1,7 +1,6 @@ package tsdb import ( - "fmt" "io" "io/ioutil" "os" @@ -28,7 +27,7 @@ func BenchmarkWALWrite(b *testing.B) { f, err := os.Open("cmd/tsdb/testdata.1m") require.NoError(b, err) - series, err := readPrometheusLabels(f, b.N) + series, err := readPrometheusLabels(f, b.N/300) require.NoError(b, err) var ( @@ -70,7 +69,7 @@ func BenchmarkWALRead(b *testing.B) { require.NoError(b, err) b.Run("test", func(b *testing.B) { - bseries := series[:b.N] + bseries := series[:b.N/300] d, err := ioutil.TempDir("", "wal_read_test") require.NoError(b, err) @@ -123,8 +122,8 @@ func BenchmarkWALRead(b *testing.B) { }) require.NoError(b, err) - stat, _ := wal.f.Stat() - fmt.Println("read series", numSeries, "read samples", numSamples, "wal size", fmt.Sprintf("%.2fMiB", float64(stat.Size())/1024/1024)) + // stat, _ := wal.f.Stat() + // fmt.Println("read series", numSeries, "read samples", numSamples, "wal size", fmt.Sprintf("%.2fMiB", float64(stat.Size())/1024/1024)) }) } @@ -136,7 +135,7 @@ func BenchmarkWALReadIntoHead(b *testing.B) { require.NoError(b, err) b.Run("test", func(b *testing.B) { - bseries := series[:b.N] + bseries := series[:b.N/300] d, err := ioutil.TempDir("", "wal_read_test") require.NoError(b, err) @@ -178,12 +177,12 @@ func BenchmarkWALReadIntoHead(b *testing.B) { b.ResetTimer() - head, err := OpenHeadBlock(d, 0) + _, err = OpenHeadBlock(d, 0) require.NoError(b, err) - stat, _ := head.wal.f.Stat() - fmt.Println("head block initialized from WAL") - fmt.Println("read series", head.stats.SeriesCount, "read samples", head.stats.SampleCount, "wal size", fmt.Sprintf("%.2fMiB", float64(stat.Size())/1024/1024)) + // stat, _ := head.wal.f.Stat() + // fmt.Println("head block initialized from WAL") + // fmt.Println("read series", head.stats.SeriesCount, "read samples", head.stats.SampleCount, "wal size", fmt.Sprintf("%.2fMiB", 
float64(stat.Size())/1024/1024)) }) }