From a746fbb8bc2a86947b782ab95cec235a55f4c121 Mon Sep 17 00:00:00 2001 From: Julius Volz Date: Wed, 24 Sep 2014 16:51:18 +0200 Subject: [PATCH] Instrument indexing: queue length, batch sizes and latencies. Change-Id: I60bcbd24b160e47d418a485d8cffa39344a257c6 --- main.go | 1 + storage/local/persistence.go | 65 +++++++++++++++++++++++++++++++++++- 2 files changed, 65 insertions(+), 1 deletion(-) diff --git a/main.go b/main.go index 5523bfc39f..c5d4f1118b 100644 --- a/main.go +++ b/main.go @@ -141,6 +141,7 @@ func main() { if err != nil { glog.Fatal("Error opening disk persistence: ", err) } + registry.MustRegister(persistence) o := &local.MemorySeriesStorageOptions{ Persistence: persistence, diff --git a/storage/local/persistence.go b/storage/local/persistence.go index 1543b56685..0c6bb19e37 100644 --- a/storage/local/persistence.go +++ b/storage/local/persistence.go @@ -23,6 +23,7 @@ import ( "time" "github.com/golang/glog" + "github.com/prometheus/client_golang/prometheus" clientmodel "github.com/prometheus/client_golang/model" @@ -32,6 +33,9 @@ import ( ) const ( + namespace = "prometheus" + subsystem = "persistence" + seriesFileName = "series.db" seriesTempFileName = "series.db.tmp" @@ -82,10 +86,15 @@ type diskPersistence struct { indexingQueue chan indexingOp indexingStopped chan struct{} indexingFlush chan chan int + + indexingQueueLength prometheus.Gauge + indexingQueueCapacity prometheus.Metric + indexingBatchSizes prometheus.Summary + indexingBatchLatency prometheus.Summary } // NewDiskPersistence returns a newly allocated Persistence backed by local disk storage, ready to use. -func NewDiskPersistence(basePath string, chunkLen int) (Persistence, error) { +func NewDiskPersistence(basePath string, chunkLen int) (*diskPersistence, error) { if err := os.MkdirAll(basePath, 0700); err != nil { return nil, err } @@ -119,11 +128,61 @@ func NewDiskPersistence(basePath string, chunkLen int) (Persistence, error) { indexingQueue: make(chan indexingOp, indexingQueueCapacity), indexingStopped: make(chan struct{}), indexingFlush: make(chan chan int), + + indexingQueueLength: prometheus.NewGauge(prometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "indexing_queue_length", + Help: "The number of metrics waiting to be indexed.", + }), + indexingQueueCapacity: prometheus.MustNewConstMetric( + prometheus.NewDesc( + prometheus.BuildFQName(namespace, subsystem, "indexing_queue_capacity"), + "The capacity of the indexing queue.", + nil, nil, + ), + prometheus.GaugeValue, + float64(indexingQueueCapacity), + ), + indexingBatchSizes: prometheus.NewSummary( + prometheus.SummaryOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "indexing_batch_sizes", + Help: "Quantiles for indexing batch sizes (number of metrics per batch).", + }, + ), + indexingBatchLatency: prometheus.NewSummary( + prometheus.SummaryOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "indexing_batch_latency_milliseconds", + Help: "Quantiles for batch indexing latencies in milliseconds.", + }, + ), } go p.processIndexingQueue() return p, nil } +// Describe implements prometheus.Collector. +func (p *diskPersistence) Describe(ch chan<- *prometheus.Desc) { + ch <- p.indexingQueueLength.Desc() + ch <- p.indexingQueueCapacity.Desc() + p.indexingBatchSizes.Describe(ch) + p.indexingBatchLatency.Describe(ch) +} + +// Collect implements prometheus.Collector. +func (p *diskPersistence) Collect(ch chan<- prometheus.Metric) { + p.indexingQueueLength.Set(float64(len(p.indexingQueue))) + + ch <- p.indexingQueueLength + ch <- p.indexingQueueCapacity + p.indexingBatchSizes.Collect(ch) + p.indexingBatchLatency.Collect(ch) +} + // GetFingerprintsForLabelPair implements persistence. func (p *diskPersistence) GetFingerprintsForLabelPair(lp metric.LabelPair) (clientmodel.Fingerprints, error) { fps, _, err := p.labelPairToFingerprints.Lookup(lp) @@ -651,6 +710,10 @@ func (p *diskPersistence) processIndexingQueue() { defer batchTimeout.Stop() commitBatch := func() { + begin := time.Now() + defer p.indexingBatchLatency.Observe(float64(time.Since(begin) / time.Millisecond)) + p.indexingBatchSizes.Observe(float64(batchSize)) + if err := p.labelPairToFingerprints.IndexBatch(pairToFPs); err != nil { glog.Error("Error indexing label pair to fingerprints batch: ", err) }