Include long-tail data deletion mechanism.

This commit introduces the long-tail deletion mechanism, which will
automatically cull old sample values.  It is an acceptable
hold-over until we get a resampling pipeline implemented.

Kill legacy OS X documentation, too.
This commit is contained in:
Matt T. Proud 2013-05-13 10:53:24 +02:00
parent 2a047fe416
commit d538b0382f
5 changed files with 64 additions and 80 deletions

View file

@ -91,9 +91,6 @@ Executing the following target will start up Prometheus for lazy users:
``$(ARGUMENTS)``. This is useful for quick one-off invocations and smoke ``$(ARGUMENTS)``. This is useful for quick one-off invocations and smoke
testing. testing.
### Mac OS X
We have a handy [Getting started on Mac OS X](documentation/guides/getting-started-osx.md) guide.
### Problems ### Problems
If at any point you run into an error with the ``make`` build system in terms of If at any point you run into an error with the ``make`` build system in terms of
its not properly scaffolding things on a given environment, please file a bug or its not properly scaffolding things on a given environment, please file a bug or

View file

@ -1,63 +0,0 @@
# Getting started
## Installation
### Go
First, create a `$HOME/mygo` directory and its src subdirectory:
```bash
mkdir -p $HOME/mygo/src # create a place to put source code
```
Next, set it as the GOPATH. You should also add the bin subdirectory to your PATH environment variable so that you can run the commands therein without specifying their full path. To do this, add the following lines to `$HOME/.profile` (or equivalent):
```bash
export GOPATH=$HOME/mygo
export PATH=$PATH:$HOME/mygo/bin
```
Now you can install Go:
```bash
brew install go
```
### Dependencies
Install leveldb and protobuf dependencies:
```bash
brew install leveldb protobuf
```
### Libraries
```bash
go get code.google.com/p/goprotobuf/{proto,protoc-gen-go}
go get github.com/jmhodges/levigo
go get code.google.com/p/gorest
go get github.com/prometheus/{prometheus,client_golang}
```
## Build
```bash
cd ${GOPATH}/src/github.com/prometheus/prometheus
make build
```
## Configure
```bash
cp prometheus.conf.example prometheus.conf
```
## Run
```bash
./prometheus
```

67
main.go
View file

@ -29,6 +29,10 @@ import (
"time" "time"
) )
const (
deletionBatchSize = 100
)
// Commandline flags. // Commandline flags.
var ( var (
printVersion = flag.Bool("version", false, "print version information") printVersion = flag.Bool("version", false, "print version information")
@ -51,13 +55,18 @@ var (
headAge = flag.Duration("compact.headAgeInclusiveness", 5*time.Minute, "The relative inclusiveness of head samples.") headAge = flag.Duration("compact.headAgeInclusiveness", 5*time.Minute, "The relative inclusiveness of head samples.")
bodyAge = flag.Duration("compact.bodyAgeInclusiveness", time.Hour, "The relative inclusiveness of body samples.") bodyAge = flag.Duration("compact.bodyAgeInclusiveness", time.Hour, "The relative inclusiveness of body samples.")
tailAge = flag.Duration("compact.tailAgeInclusiveness", 24*time.Hour, "The relative inclusiveness of tail samples.") tailAge = flag.Duration("compact.tailAgeInclusiveness", 24*time.Hour, "The relative inclusiveness of tail samples.")
deleteInterval = flag.Duration("delete.interval", 10*11*time.Minute, "The amount of time between deletion of old values.")
deleteAge = flag.Duration("delete.ageMaximum", 10*24*time.Hour, "The relative maximum age for values before they are deleted.")
) )
type prometheus struct { type prometheus struct {
headCompactionTimer *time.Ticker headCompactionTimer *time.Ticker
bodyCompactionTimer *time.Ticker bodyCompactionTimer *time.Ticker
tailCompactionTimer *time.Ticker tailCompactionTimer *time.Ticker
compactionMutex sync.Mutex deletionTimer *time.Ticker
curationMutex sync.Mutex
curationState chan metric.CurationState curationState chan metric.CurationState
stopBackgroundOperations chan bool stopBackgroundOperations chan bool
@ -79,8 +88,8 @@ func (p *prometheus) interruptHandler() {
} }
func (p *prometheus) compact(olderThan time.Duration, groupSize int) error { func (p *prometheus) compact(olderThan time.Duration, groupSize int) error {
p.compactionMutex.Lock() p.curationMutex.Lock()
defer p.compactionMutex.Unlock() defer p.curationMutex.Unlock()
processor := &metric.CompactionProcessor{ processor := &metric.CompactionProcessor{
MaximumMutationPoolBatch: groupSize * 3, MaximumMutationPoolBatch: groupSize * 3,
@ -94,6 +103,21 @@ func (p *prometheus) compact(olderThan time.Duration, groupSize int) error {
return curator.Run(olderThan, time.Now(), processor, p.storage.DiskStorage.CurationRemarks, p.storage.DiskStorage.MetricSamples, p.storage.DiskStorage.MetricHighWatermarks, p.curationState) return curator.Run(olderThan, time.Now(), processor, p.storage.DiskStorage.CurationRemarks, p.storage.DiskStorage.MetricSamples, p.storage.DiskStorage.MetricHighWatermarks, p.curationState)
} }
func (p *prometheus) delete(olderThan time.Duration, batchSize int) error {
p.curationMutex.Lock()
defer p.curationMutex.Unlock()
processor := &metric.DeletionProcessor{
MaximumMutationPoolBatch: batchSize,
}
curator := metric.Curator{
Stop: p.stopBackgroundOperations,
}
return curator.Run(olderThan, time.Now(), processor, p.storage.DiskStorage.CurationRemarks, p.storage.DiskStorage.MetricSamples, p.storage.DiskStorage.MetricHighWatermarks, p.curationState)
}
func (p *prometheus) close() { func (p *prometheus) close() {
if p.headCompactionTimer != nil { if p.headCompactionTimer != nil {
p.headCompactionTimer.Stop() p.headCompactionTimer.Stop()
@ -104,12 +128,15 @@ func (p *prometheus) close() {
if p.tailCompactionTimer != nil { if p.tailCompactionTimer != nil {
p.tailCompactionTimer.Stop() p.tailCompactionTimer.Stop()
} }
if p.deletionTimer != nil {
p.deletionTimer.Stop()
}
if len(p.stopBackgroundOperations) == 0 { if len(p.stopBackgroundOperations) == 0 {
p.stopBackgroundOperations <- true p.stopBackgroundOperations <- true
} }
p.compactionMutex.Lock() p.curationMutex.Lock()
p.storage.Close() p.storage.Close()
close(p.stopBackgroundOperations) close(p.stopBackgroundOperations)
@ -148,6 +175,7 @@ func main() {
headCompactionTimer := time.NewTicker(*headCompactInterval) headCompactionTimer := time.NewTicker(*headCompactInterval)
bodyCompactionTimer := time.NewTicker(*bodyCompactInterval) bodyCompactionTimer := time.NewTicker(*bodyCompactInterval)
tailCompactionTimer := time.NewTicker(*tailCompactInterval) tailCompactionTimer := time.NewTicker(*tailCompactInterval)
deletionTimer := time.NewTicker(*deleteInterval)
// Queue depth will need to be exposed // Queue depth will need to be exposed
targetManager := retrieval.NewTargetManager(scrapeResults, *concurrentRetrievalAllowance) targetManager := retrieval.NewTargetManager(scrapeResults, *concurrentRetrievalAllowance)
@ -177,14 +205,19 @@ func main() {
} }
prometheus := prometheus{ prometheus := prometheus{
bodyCompactionTimer: bodyCompactionTimer, bodyCompactionTimer: bodyCompactionTimer,
curationState: curationState, headCompactionTimer: headCompactionTimer,
headCompactionTimer: headCompactionTimer, tailCompactionTimer: tailCompactionTimer,
ruleResults: ruleResults,
scrapeResults: scrapeResults, deletionTimer: deletionTimer,
curationState: curationState,
ruleResults: ruleResults,
scrapeResults: scrapeResults,
stopBackgroundOperations: make(chan bool, 1), stopBackgroundOperations: make(chan bool, 1),
storage: ts,
tailCompactionTimer: tailCompactionTimer, storage: ts,
} }
defer prometheus.close() defer prometheus.close()
@ -227,6 +260,18 @@ func main() {
} }
}() }()
go func() {
for _ = range prometheus.deletionTimer.C {
log.Println("Starting deletion of stale values...")
err := prometheus.delete(*deleteAge, deletionBatchSize)
if err != nil {
log.Printf("could not delete due to %s", err)
}
log.Println("Done")
}
}()
// Queue depth will need to be exposed // Queue depth will need to be exposed
ruleManager := rules.NewRuleManager(ruleResults, conf.EvaluationInterval(), ts) ruleManager := rules.NewRuleManager(ruleResults, conf.EvaluationInterval(), ts)

View file

@ -118,8 +118,11 @@ func (c Curator) Run(ignoreYoungerThan time.Duration, instant time.Time, process
curationDurations.Add(labels, duration) curationDurations.Add(labels, duration)
}(time.Now()) }(time.Now())
defer func() { defer func() {
status <- CurationState{ select {
case status <- CurationState{
Active: false, Active: false,
}:
default:
} }
}() }()
@ -258,11 +261,14 @@ func (w watermarkFilter) Filter(key, value interface{}) (r storage.FilterResult)
}() }()
defer func() { defer func() {
w.status <- CurationState{ select {
case w.status <- CurationState{
Active: true, Active: true,
Name: w.processor.Name(), Name: w.processor.Name(),
Limit: w.ignoreYoungerThan, Limit: w.ignoreYoungerThan,
Fingerprint: fingerprint, Fingerprint: fingerprint,
}:
default:
} }
}() }()

View file

@ -203,7 +203,6 @@ func (p CompactionProcessor) Apply(sampleIterator leveldb.Iterator, samplesPersi
lastTouchedTime = sampleValues[len(sampleValues)-1].Timestamp lastTouchedTime = sampleValues[len(sampleValues)-1].Timestamp
} }
pendingMutations++ pendingMutations++
default: default:
err = fmt.Errorf("Unhandled processing case.") err = fmt.Errorf("Unhandled processing case.")
} }