From d538b0382fe417c7d0e3fd5a3fba557ad459fb38 Mon Sep 17 00:00:00 2001 From: "Matt T. Proud" Date: Mon, 13 May 2013 10:53:24 +0200 Subject: [PATCH] Include long-tail data deletion mechanism. This commit introduces the long-tail deletion mechanism, which will automatically cull old sample values. It is an acceptable hold-over until we get a resampling pipeline implemented. Kill legacy OS X documentation, too. --- README.md | 3 - documentation/guides/getting-started-osx.md | 63 ------------------- main.go | 67 +++++++++++++++++---- storage/metric/curator.go | 10 ++- storage/metric/processor.go | 1 - 5 files changed, 64 insertions(+), 80 deletions(-) delete mode 100644 documentation/guides/getting-started-osx.md diff --git a/README.md b/README.md index 1bcafbbb7..cec2a80ad 100644 --- a/README.md +++ b/README.md @@ -91,9 +91,6 @@ Executing the following target will start up Prometheus for lazy users: ``$(ARGUMENTS)``. This is useful for quick one-off invocations and smoke testing. -### Mac OS X -We have a handy [Getting started on Mac OS X](documentation/guides/getting-started-osx.md) guide. - ### Problems If at any point you run into an error with the ``make`` build system in terms of its not properly scaffolding things on a given environment, please file a bug or diff --git a/documentation/guides/getting-started-osx.md b/documentation/guides/getting-started-osx.md deleted file mode 100644 index bce1dbd24..000000000 --- a/documentation/guides/getting-started-osx.md +++ /dev/null @@ -1,63 +0,0 @@ -# Getting started - -## Installation - -### Go - -First, create a `$HOME/mygo` directory and its src subdirectory: - -```bash -mkdir -p $HOME/mygo/src # create a place to put source code -``` - -Next, set it as the GOPATH. You should also add the bin subdirectory to your PATH environment variable so that you can run the commands therein without specifying their full path. To do this, add the following lines to `$HOME/.profile` (or equivalent): - -```bash -export GOPATH=$HOME/mygo -export PATH=$PATH:$HOME/mygo/bin -``` - -Now you can install Go: - -```bash -brew install go -``` - - -### Dependencies - -Install leveldb and protobuf dependencies: - -```bash -brew install leveldb protobuf -``` - - -### Libraries - -```bash -go get code.google.com/p/goprotobuf/{proto,protoc-gen-go} -go get github.com/jmhodges/levigo -go get code.google.com/p/gorest -go get github.com/prometheus/{prometheus,client_golang} -``` - - -## Build - -```bash -cd ${GOPATH}/src/github.com/prometheus/prometheus -make build -``` - -## Configure - -```bash -cp prometheus.conf.example prometheus.conf -``` - -## Run - -```bash -./prometheus -``` diff --git a/main.go b/main.go index 3f2d522c2..022f27dfd 100644 --- a/main.go +++ b/main.go @@ -29,6 +29,10 @@ import ( "time" ) +const ( + deletionBatchSize = 100 +) + // Commandline flags. var ( printVersion = flag.Bool("version", false, "print version information") @@ -51,13 +55,18 @@ var ( headAge = flag.Duration("compact.headAgeInclusiveness", 5*time.Minute, "The relative inclusiveness of head samples.") bodyAge = flag.Duration("compact.bodyAgeInclusiveness", time.Hour, "The relative inclusiveness of body samples.") tailAge = flag.Duration("compact.tailAgeInclusiveness", 24*time.Hour, "The relative inclusiveness of tail samples.") + + deleteInterval = flag.Duration("delete.interval", 10*11*time.Minute, "The amount of time between deletion of old values.") + + deleteAge = flag.Duration("delete.ageMaximum", 10*24*time.Hour, "The relative maximum age for values before they are deleted.") ) type prometheus struct { headCompactionTimer *time.Ticker bodyCompactionTimer *time.Ticker tailCompactionTimer *time.Ticker - compactionMutex sync.Mutex + deletionTimer *time.Ticker + curationMutex sync.Mutex curationState chan metric.CurationState stopBackgroundOperations chan bool @@ -79,8 +88,8 @@ func (p *prometheus) interruptHandler() { } func (p *prometheus) compact(olderThan time.Duration, groupSize int) error { - p.compactionMutex.Lock() - defer p.compactionMutex.Unlock() + p.curationMutex.Lock() + defer p.curationMutex.Unlock() processor := &metric.CompactionProcessor{ MaximumMutationPoolBatch: groupSize * 3, @@ -94,6 +103,21 @@ func (p *prometheus) compact(olderThan time.Duration, groupSize int) error { return curator.Run(olderThan, time.Now(), processor, p.storage.DiskStorage.CurationRemarks, p.storage.DiskStorage.MetricSamples, p.storage.DiskStorage.MetricHighWatermarks, p.curationState) } +func (p *prometheus) delete(olderThan time.Duration, batchSize int) error { + p.curationMutex.Lock() + defer p.curationMutex.Unlock() + + processor := &metric.DeletionProcessor{ + MaximumMutationPoolBatch: batchSize, + } + + curator := metric.Curator{ + Stop: p.stopBackgroundOperations, + } + + return curator.Run(olderThan, time.Now(), processor, p.storage.DiskStorage.CurationRemarks, p.storage.DiskStorage.MetricSamples, p.storage.DiskStorage.MetricHighWatermarks, p.curationState) +} + func (p *prometheus) close() { if p.headCompactionTimer != nil { p.headCompactionTimer.Stop() @@ -104,12 +128,15 @@ func (p *prometheus) close() { if p.tailCompactionTimer != nil { p.tailCompactionTimer.Stop() } + if p.deletionTimer != nil { + p.deletionTimer.Stop() + } if len(p.stopBackgroundOperations) == 0 { p.stopBackgroundOperations <- true } - p.compactionMutex.Lock() + p.curationMutex.Lock() p.storage.Close() close(p.stopBackgroundOperations) @@ -148,6 +175,7 @@ func main() { headCompactionTimer := time.NewTicker(*headCompactInterval) bodyCompactionTimer := time.NewTicker(*bodyCompactInterval) tailCompactionTimer := time.NewTicker(*tailCompactInterval) + deletionTimer := time.NewTicker(*deleteInterval) // Queue depth will need to be exposed targetManager := retrieval.NewTargetManager(scrapeResults, *concurrentRetrievalAllowance) @@ -177,14 +205,19 @@ func main() { } prometheus := prometheus{ - bodyCompactionTimer: bodyCompactionTimer, - curationState: curationState, - headCompactionTimer: headCompactionTimer, - ruleResults: ruleResults, - scrapeResults: scrapeResults, + bodyCompactionTimer: bodyCompactionTimer, + headCompactionTimer: headCompactionTimer, + tailCompactionTimer: tailCompactionTimer, + + deletionTimer: deletionTimer, + + curationState: curationState, + ruleResults: ruleResults, + scrapeResults: scrapeResults, + stopBackgroundOperations: make(chan bool, 1), - storage: ts, - tailCompactionTimer: tailCompactionTimer, + + storage: ts, } defer prometheus.close() @@ -227,6 +260,18 @@ func main() { } }() + go func() { + for _ = range prometheus.deletionTimer.C { + log.Println("Starting deletion of stale values...") + err := prometheus.delete(*deleteAge, deletionBatchSize) + + if err != nil { + log.Printf("could not delete due to %s", err) + } + log.Println("Done") + } + }() + // Queue depth will need to be exposed ruleManager := rules.NewRuleManager(ruleResults, conf.EvaluationInterval(), ts) diff --git a/storage/metric/curator.go b/storage/metric/curator.go index 9b6330915..eaeaf9dad 100644 --- a/storage/metric/curator.go +++ b/storage/metric/curator.go @@ -118,8 +118,11 @@ func (c Curator) Run(ignoreYoungerThan time.Duration, instant time.Time, process curationDurations.Add(labels, duration) }(time.Now()) defer func() { - status <- CurationState{ + select { + case status <- CurationState{ Active: false, + }: + default: } }() @@ -258,11 +261,14 @@ func (w watermarkFilter) Filter(key, value interface{}) (r storage.FilterResult) }() defer func() { - w.status <- CurationState{ + select { + case w.status <- CurationState{ Active: true, Name: w.processor.Name(), Limit: w.ignoreYoungerThan, Fingerprint: fingerprint, + }: + default: } }() diff --git a/storage/metric/processor.go b/storage/metric/processor.go index 64a18bdb8..f3fcf7b6c 100644 --- a/storage/metric/processor.go +++ b/storage/metric/processor.go @@ -203,7 +203,6 @@ func (p CompactionProcessor) Apply(sampleIterator leveldb.Iterator, samplesPersi lastTouchedTime = sampleValues[len(sampleValues)-1].Timestamp } pendingMutations++ - default: err = fmt.Errorf("Unhandled processing case.") }