Merge pull request #262 from prometheus/feature/storage/fix-grouping

Spawn grouping of fingerprints with free semaphore.
This commit is contained in:
juliusv 2013-05-21 07:17:51 -07:00
commit 1958063cc7

View file

@ -30,9 +30,7 @@ import (
"time" "time"
) )
var ( const (
leveldbChunkSize = flag.Int("leveldbChunkSize", 200, "Maximum number of samples stored under one key.")
sortConcurrency = 2 sortConcurrency = 2
) )
@ -47,6 +45,8 @@ type LevelDBMetricPersistence struct {
} }
var ( var (
leveldbChunkSize = flag.Int("leveldbChunkSize", 200, "Maximum number of samples stored under one key.")
// These flag values are back of the envelope, though they seem sensible. // These flag values are back of the envelope, though they seem sensible.
// Please re-evaluate based on your own needs. // Please re-evaluate based on your own needs.
curationRemarksCacheSize = flag.Int("curationRemarksCacheSize", 50*1024*1024, "The size for the curation remarks cache (bytes).") curationRemarksCacheSize = flag.Int("curationRemarksCacheSize", 50*1024*1024, "The size for the curation remarks cache (bytes).")
@ -188,9 +188,7 @@ func (l *LevelDBMetricPersistence) AppendSample(sample model.Sample) (err error)
// together by their respective metric fingerprint, and finally sorts // together by their respective metric fingerprint, and finally sorts
// them chronologically. // them chronologically.
func groupByFingerprint(samples model.Samples) map[model.Fingerprint]model.Samples { func groupByFingerprint(samples model.Samples) map[model.Fingerprint]model.Samples {
var ( fingerprintToSamples := map[model.Fingerprint]model.Samples{}
fingerprintToSamples = map[model.Fingerprint]model.Samples{}
)
for _, sample := range samples { for _, sample := range samples {
fingerprint := *model.NewFingerprintFromMetric(sample.Metric) fingerprint := *model.NewFingerprintFromMetric(sample.Metric)
@ -199,10 +197,8 @@ func groupByFingerprint(samples model.Samples) map[model.Fingerprint]model.Sampl
fingerprintToSamples[fingerprint] = samples fingerprintToSamples[fingerprint] = samples
} }
var ( sortingSemaphore := make(chan bool, sortConcurrency)
sortingSemaphore = make(chan bool, sortConcurrency) doneSorting := sync.WaitGroup{}
doneSorting sync.WaitGroup
)
for i := 0; i < sortConcurrency; i++ { for i := 0; i < sortConcurrency; i++ {
sortingSemaphore <- true sortingSemaphore <- true
@ -211,8 +207,8 @@ func groupByFingerprint(samples model.Samples) map[model.Fingerprint]model.Sampl
for _, samples := range fingerprintToSamples { for _, samples := range fingerprintToSamples {
doneSorting.Add(1) doneSorting.Add(1)
<-sortingSemaphore
go func(samples model.Samples) { go func(samples model.Samples) {
<-sortingSemaphore
sort.Sort(samples) sort.Sort(samples)
sortingSemaphore <- true sortingSemaphore <- true
doneSorting.Done() doneSorting.Done()
@ -538,16 +534,12 @@ func (l *LevelDBMetricPersistence) AppendSamples(samples model.Samples) (err err
recordOutcome(duration, err, map[string]string{operation: appendSamples, result: success}, map[string]string{operation: appendSamples, result: failure}) recordOutcome(duration, err, map[string]string{operation: appendSamples, result: success}, map[string]string{operation: appendSamples, result: failure})
}(time.Now()) }(time.Now())
var ( fingerprintToSamples := groupByFingerprint(samples)
fingerprintToSamples = groupByFingerprint(samples) indexErrChan := make(chan error, 1)
indexErrChan = make(chan error, 1) watermarkErrChan := make(chan error, 1)
watermarkErrChan = make(chan error, 1)
)
go func(groups map[model.Fingerprint]model.Samples) { go func(groups map[model.Fingerprint]model.Samples) {
var ( metrics := map[model.Fingerprint]model.Metric{}
metrics = map[model.Fingerprint]model.Metric{}
)
for fingerprint, samples := range groups { for fingerprint, samples := range groups {
metrics[fingerprint] = samples[0].Metric metrics[fingerprint] = samples[0].Metric