Merge "Compact everything to the same sample group size."

This commit is contained in:
Julius Volz 2014-02-19 17:28:26 +01:00 committed by Gerrit Code Review
commit a8d4a7ce48

68
main.go
View file

@ -52,17 +52,9 @@ var (
diskAppendQueueCapacity = flag.Int("storage.queue.diskAppendCapacity", 1000000, "The size of the queue for items that are pending writing to disk.") diskAppendQueueCapacity = flag.Int("storage.queue.diskAppendCapacity", 1000000, "The size of the queue for items that are pending writing to disk.")
memoryAppendQueueCapacity = flag.Int("storage.queue.memoryAppendCapacity", 10000, "The size of the queue for items that are pending writing to memory.") memoryAppendQueueCapacity = flag.Int("storage.queue.memoryAppendCapacity", 10000, "The size of the queue for items that are pending writing to memory.")
headCompactInterval = flag.Duration("compact.headInterval", 3*time.Hour, "The amount of time between head compactions.") compactInterval = flag.Duration("compact.interval", 3*time.Hour, "The amount of time between compactions.")
bodyCompactInterval = flag.Duration("compact.bodyInterval", 5*time.Hour, "The amount of time between body compactions.") compactGroupSize = flag.Int("compact.groupSize", 500, "The minimum group size for compacted samples.")
tailCompactInterval = flag.Duration("compact.tailInterval", 7*time.Hour, "The amount of time between tail compactions.") compactAgeInclusiveness = flag.Duration("compact.ageInclusiveness", 5*time.Minute, "The age beyond which samples should be compacted.")
headGroupSize = flag.Int("compact.headGroupSize", 500, "The minimum group size for head samples.")
bodyGroupSize = flag.Int("compact.bodyGroupSize", 5000, "The minimum group size for body samples.")
tailGroupSize = flag.Int("compact.tailGroupSize", 10000, "The minimum group size for tail samples.")
headAge = flag.Duration("compact.headAgeInclusiveness", 5*time.Minute, "The relative inclusiveness of head samples.")
bodyAge = flag.Duration("compact.bodyAgeInclusiveness", time.Hour, "The relative inclusiveness of body samples.")
tailAge = flag.Duration("compact.tailAgeInclusiveness", 24*time.Hour, "The relative inclusiveness of tail samples.")
deleteInterval = flag.Duration("delete.interval", 11*time.Hour, "The amount of time between deletion of old values.") deleteInterval = flag.Duration("delete.interval", 11*time.Hour, "The amount of time between deletion of old values.")
@ -79,9 +71,7 @@ var (
) )
type prometheus struct { type prometheus struct {
headCompactionTimer *time.Ticker compactionTimer *time.Ticker
bodyCompactionTimer *time.Ticker
tailCompactionTimer *time.Ticker
deletionTimer *time.Ticker deletionTimer *time.Ticker
curationSema chan bool curationSema chan bool
@ -168,14 +158,8 @@ func (p *prometheus) close() {
default: default:
} }
if p.headCompactionTimer != nil { if p.compactionTimer != nil {
p.headCompactionTimer.Stop() p.compactionTimer.Stop()
}
if p.bodyCompactionTimer != nil {
p.bodyCompactionTimer.Stop()
}
if p.tailCompactionTimer != nil {
p.tailCompactionTimer.Stop()
} }
if p.deletionTimer != nil { if p.deletionTimer != nil {
p.deletionTimer.Stop() p.deletionTimer.Stop()
@ -238,10 +222,8 @@ func main() {
Ingester: retrieval.ChannelIngester(unwrittenSamples), Ingester: retrieval.ChannelIngester(unwrittenSamples),
} }
// Coprime numbers, fool!
headCompactionTimer := time.NewTicker(*headCompactInterval) compactionTimer := time.NewTicker(*compactInterval)
bodyCompactionTimer := time.NewTicker(*bodyCompactInterval)
tailCompactionTimer := time.NewTicker(*tailCompactInterval)
deletionTimer := time.NewTicker(*deleteInterval) deletionTimer := time.NewTicker(*deleteInterval)
// Queue depth will need to be exposed // Queue depth will need to be exposed
@ -304,9 +286,7 @@ func main() {
} }
prometheus := &prometheus{ prometheus := &prometheus{
bodyCompactionTimer: bodyCompactionTimer, compactionTimer: compactionTimer,
headCompactionTimer: headCompactionTimer,
tailCompactionTimer: tailCompactionTimer,
deletionTimer: deletionTimer, deletionTimer: deletionTimer,
@ -332,33 +312,9 @@ func main() {
go prometheus.interruptHandler() go prometheus.interruptHandler()
go func() { go func() {
for _ = range prometheus.headCompactionTimer.C { for _ = range prometheus.compactionTimer.C {
glog.Info("Starting head compaction...") glog.Info("Starting compaction...")
err := prometheus.compact(*headAge, *headGroupSize) err := prometheus.compact(*compactAgeInclusiveness, *compactGroupSize)
if err != nil {
glog.Error("could not compact: ", err)
}
glog.Info("Done")
}
}()
go func() {
for _ = range prometheus.bodyCompactionTimer.C {
glog.Info("Starting body compaction...")
err := prometheus.compact(*bodyAge, *bodyGroupSize)
if err != nil {
glog.Error("could not compact: ", err)
}
glog.Info("Done")
}
}()
go func() {
for _ = range prometheus.tailCompactionTimer.C {
glog.Info("Starting tail compaction...")
err := prometheus.compact(*tailAge, *tailGroupSize)
if err != nil { if err != nil {
glog.Error("could not compact: ", err) glog.Error("could not compact: ", err)