From 7f0d8165748004638f0e909ce7d46fb3bdb4eb58 Mon Sep 17 00:00:00 2001 From: Matt Proud Date: Tue, 7 May 2013 17:14:04 +0200 Subject: [PATCH] Schedule the background compactors to run. This commit introduces three background compactors, which compact sparse samples together. 1. Older than five minutes is grouped together into chunks of 50 every 30 minutes. 2. Older than 60 minutes is grouped together into chunks of 250 every 50 minutes. 3. Older than one day is grouped together into chunks of 5000 every 70 minutes. --- main.go | 117 +++++++++++++++++++++++++++--- storage/metric/curator.go | 2 +- storage/metric/instrumentation.go | 2 + web/templates/status.html | 2 +- 4 files changed, 112 insertions(+), 11 deletions(-) diff --git a/main.go b/main.go index bd95b8eca6..3f2d522c21 100644 --- a/main.go +++ b/main.go @@ -25,6 +25,7 @@ import ( "log" "os" "os/signal" + "sync" "time" ) @@ -38,16 +39,35 @@ var ( concurrentRetrievalAllowance = flag.Int("concurrentRetrievalAllowance", 15, "The number of concurrent metrics retrieval requests allowed.") diskAppendQueueCapacity = flag.Int("queue.diskAppendCapacity", 1000000, "The size of the queue for items that are pending writing to disk.") memoryAppendQueueCapacity = flag.Int("queue.memoryAppendCapacity", 10000, "The size of the queue for items that are pending writing to memory.") + + headCompactInterval = flag.Duration("compact.headInterval", 10*3*time.Minute, "The amount of time between head compactions.") + bodyCompactInterval = flag.Duration("compact.bodyInterval", 10*5*time.Minute, "The amount of time between body compactions.") + tailCompactInterval = flag.Duration("compact.tailInterval", 10*7*time.Minute, "The amount of time between tail compactions.") + + headGroupSize = flag.Int("compact.headGroupSize", 50, "The minimum group size for head samples.") + bodyGroupSize = flag.Int("compact.bodyGroupSize", 250, "The minimum group size for body samples.") + tailGroupSize = flag.Int("compact.tailGroupSize", 5000, "The minimum group size for tail samples.") + + headAge = flag.Duration("compact.headAgeInclusiveness", 5*time.Minute, "The relative inclusiveness of head samples.") + bodyAge = flag.Duration("compact.bodyAgeInclusiveness", time.Hour, "The relative inclusiveness of body samples.") + tailAge = flag.Duration("compact.tailAgeInclusiveness", 24*time.Hour, "The relative inclusiveness of tail samples.") ) type prometheus struct { - curationState chan metric.CurationState + headCompactionTimer *time.Ticker + bodyCompactionTimer *time.Ticker + tailCompactionTimer *time.Ticker + compactionMutex sync.Mutex + curationState chan metric.CurationState + stopBackgroundOperations chan bool + ruleResults chan *rules.Result - storage metric.TieredStorage scrapeResults chan format.Result + + storage *metric.TieredStorage } -func (p prometheus) interruptHandler() { +func (p *prometheus) interruptHandler() { notifier := make(chan os.Signal) signal.Notify(notifier, os.Interrupt) @@ -58,9 +78,42 @@ func (p prometheus) interruptHandler() { os.Exit(0) } -func (p prometheus) close() { - close(p.curationState) +func (p *prometheus) compact(olderThan time.Duration, groupSize int) error { + p.compactionMutex.Lock() + defer p.compactionMutex.Unlock() + + processor := &metric.CompactionProcessor{ + MaximumMutationPoolBatch: groupSize * 3, + MinimumGroupSize: groupSize, + } + + curator := metric.Curator{ + Stop: p.stopBackgroundOperations, + } + + return curator.Run(olderThan, time.Now(), processor, p.storage.DiskStorage.CurationRemarks, p.storage.DiskStorage.MetricSamples, p.storage.DiskStorage.MetricHighWatermarks, p.curationState) +} + +func (p *prometheus) close() { + if p.headCompactionTimer != nil { + p.headCompactionTimer.Stop() + } + if p.bodyCompactionTimer != nil { + p.bodyCompactionTimer.Stop() + } + if p.tailCompactionTimer != nil { + p.tailCompactionTimer.Stop() + } + + if len(p.stopBackgroundOperations) == 0 { + p.stopBackgroundOperations <- true + } + + p.compactionMutex.Lock() + p.storage.Close() + close(p.stopBackgroundOperations) + close(p.curationState) } func main() { @@ -91,6 +144,10 @@ func main() { scrapeResults := make(chan format.Result, *scrapeResultsQueueCapacity) ruleResults := make(chan *rules.Result, *ruleResultsQueueCapacity) curationState := make(chan metric.CurationState, 1) + // Coprime numbers, fool! + headCompactionTimer := time.NewTicker(*headCompactInterval) + bodyCompactionTimer := time.NewTicker(*bodyCompactInterval) + tailCompactionTimer := time.NewTicker(*tailCompactInterval) // Queue depth will need to be exposed targetManager := retrieval.NewTargetManager(scrapeResults, *concurrentRetrievalAllowance) @@ -120,16 +177,58 @@ func main() { } prometheus := prometheus{ - curationState: curationState, - ruleResults: ruleResults, - scrapeResults: scrapeResults, - storage: *ts, + bodyCompactionTimer: bodyCompactionTimer, + curationState: curationState, + headCompactionTimer: headCompactionTimer, + ruleResults: ruleResults, + scrapeResults: scrapeResults, + stopBackgroundOperations: make(chan bool, 1), + storage: ts, + tailCompactionTimer: tailCompactionTimer, } defer prometheus.close() go ts.Serve() go prometheus.interruptHandler() + go func() { + for _ = range prometheus.headCompactionTimer.C { + log.Println("Starting head compaction...") + err := prometheus.compact(*headAge, *headGroupSize) + + if err != nil { + log.Printf("could not compact due to %s", err) + } + log.Println("Done") + } + }() + + go func() { + for _ = range prometheus.bodyCompactionTimer.C { + log.Println("Starting body compaction...") + err := prometheus.compact(*bodyAge, *bodyGroupSize) + + if err != nil { + log.Printf("could not compact due to %s", err) + } + log.Println("Done") + } + }() + + go func() { + for _ = range prometheus.tailCompactionTimer.C { + log.Println("Starting tail compaction...") + err := prometheus.compact(*tailAge, *tailGroupSize) + + if err != nil { + log.Printf("could not compact due to %s", err) + } + log.Println("Done") + } + }() + + // Queue depth will need to be exposed + ruleManager := rules.NewRuleManager(ruleResults, conf.EvaluationInterval(), ts) err = ruleManager.AddRulesFromConfig(conf) if err != nil { diff --git a/storage/metric/curator.go b/storage/metric/curator.go index 771dd27578..9b63309150 100644 --- a/storage/metric/curator.go +++ b/storage/metric/curator.go @@ -103,7 +103,7 @@ type watermarkOperator struct { // how much progress has been made. func (c Curator) Run(ignoreYoungerThan time.Duration, instant time.Time, processor Processor, curationState, samples, watermarks *leveldb.LevelDBPersistence, status chan CurationState) (err error) { defer func(t time.Time) { - duration := float64(time.Since(t)) + duration := float64(time.Since(t) / time.Millisecond) labels := map[string]string{ cutOff: fmt.Sprint(ignoreYoungerThan), diff --git a/storage/metric/instrumentation.go b/storage/metric/instrumentation.go index 6c07f4db00..71e95238d4 100644 --- a/storage/metric/instrumentation.go +++ b/storage/metric/instrumentation.go @@ -91,4 +91,6 @@ func init() { prometheus.Register("prometheus_storage_operation_time_total_microseconds", "The total time spent performing a given storage operation.", prometheus.NilLabels, storageOperationDurations) prometheus.Register("prometheus_storage_queue_sizes_total", "The various sizes and capacities of the storage queues.", prometheus.NilLabels, queueSizes) prometheus.Register("curation_filter_operations_total", "The number of curation filter operations completed.", prometheus.NilLabels, curationFilterOperations) + prometheus.Register("curation_duration_ms_total", "The total time spent in curation (ms).", prometheus.NilLabels, curationDuration) + prometheus.Register("curation_durations_ms", "Histogram of time spent in curation (ms).", prometheus.NilLabels, curationDurations) } diff --git a/web/templates/status.html b/web/templates/status.html index 337a7f0ba7..9b08cb32ae 100644 --- a/web/templates/status.html +++ b/web/templates/status.html @@ -51,6 +51,7 @@ Active {{.Curation.Active}} + {{if .Curation.Active}} Processor Name {{.Curation.Name}} @@ -59,7 +60,6 @@ Recency Limit {{.Curation.Limit}} - {{if .Curation.Fingerprint}} Current Fingerprint {{.Curation.Fingerprint}}