diff --git a/storage/metric/instrumentation.go b/storage/metric/instrumentation.go index 1dd1867aaf..8e4fdc5cf7 100644 --- a/storage/metric/instrumentation.go +++ b/storage/metric/instrumentation.go @@ -43,6 +43,7 @@ const ( hasLabelName = "has_label_name" hasLabelPair = "has_label_pair" indexMetric = "index_metric" + indexSamples = "index_samples" rebuildDiskFrontier = "rebuild_disk_frontier" renderView = "render_view" setLabelNameFingerprints = "set_label_name_fingerprints" diff --git a/storage/metric/leveldb.go b/storage/metric/leveldb.go index d50ed6d7a3..726650e073 100644 --- a/storage/metric/leveldb.go +++ b/storage/metric/leveldb.go @@ -199,15 +199,10 @@ func (l *LevelDBMetricPersistence) AppendSample(sample model.Sample) (err error) return } -func (l *LevelDBMetricPersistence) AppendSamples(samples model.Samples) (err error) { - begin := time.Now() - defer func() { - duration := time.Since(begin) - - recordOutcome(duration, err, map[string]string{operation: appendSamples, result: success}, map[string]string{operation: appendSamples, result: failure}) - }() - - // Group the samples by fingerprint. +// groupByFingerprint collects all of the provided samples, groups them +// together by their respective metric fingerprint, and finally sorts +// them chronologically. +func groupByFingerprint(samples model.Samples) map[model.Fingerprint]model.Samples { var ( fingerprintToSamples = map[model.Fingerprint]model.Samples{} ) @@ -219,17 +214,18 @@ func (l *LevelDBMetricPersistence) AppendSamples(samples model.Samples) (err err fingerprintToSamples[fingerprint] = samples } - // Begin the sorting of grouped samples. var ( sortingSemaphore = make(chan bool, sortConcurrency) - doneSorting = sync.WaitGroup{} + doneSorting sync.WaitGroup ) + for i := 0; i < sortConcurrency; i++ { sortingSemaphore <- true } for _, samples := range fingerprintToSamples { doneSorting.Add(1) + go func(samples model.Samples) { <-sortingSemaphore sort.Sort(samples) @@ -240,56 +236,18 @@ func (l *LevelDBMetricPersistence) AppendSamples(samples model.Samples) (err err doneSorting.Wait() - var ( - doneCommitting = sync.WaitGroup{} - ) + return fingerprintToSamples +} - go func() { - doneCommitting.Add(1) - samplesBatch := leveldb.NewBatch() - defer samplesBatch.Close() - defer doneCommitting.Done() +// indexSamples takes groups of samples, determines which ones contain metrics +// that are unknown to the storage stack, and then proceeds to update all +// affected indices. +func (l *LevelDBMetricPersistence) indexSamples(groups map[model.Fingerprint]model.Samples) (err error) { + begin := time.Now() + defer func() { + duration := time.Since(begin) - for fingerprint, group := range fingerprintToSamples { - for { - lengthOfGroup := len(group) - - if lengthOfGroup == 0 { - break - } - - take := *leveldbChunkSize - if lengthOfGroup < take { - take = lengthOfGroup - } - - chunk := group[0:take] - group = group[take:lengthOfGroup] - - key := &dto.SampleKey{ - Fingerprint: fingerprint.ToDTO(), - Timestamp: indexable.EncodeTime(chunk[0].Timestamp), - LastTimestamp: proto.Int64(chunk[take-1].Timestamp.Unix()), - SampleCount: proto.Uint32(uint32(take)), - } - - value := &dto.SampleValueSeries{} - for _, sample := range chunk { - value.Value = append(value.Value, &dto.SampleValueSeries_Value{ - Timestamp: proto.Int64(sample.Timestamp.Unix()), - Value: proto.Float32(float32(sample.Value)), - }) - } - - samplesBatch.Put(coding.NewProtocolBufferEncoder(key), coding.NewProtocolBufferEncoder(value)) - } - } - - err = l.metricSamples.Commit(samplesBatch) - - if err != nil { - panic(err) - } + recordOutcome(duration, err, map[string]string{operation: indexSamples, result: success}, map[string]string{operation: indexSamples, result: failure}) }() var ( @@ -298,7 +256,7 @@ func (l *LevelDBMetricPersistence) AppendSamples(samples model.Samples) (err err // Determine which metrics are unknown in the database. - for fingerprint, samples := range fingerprintToSamples { + for fingerprint, samples := range groups { sample := samples[0] metricDTO := model.SampleToMetricDTO(&sample) indexHas, err := l.hasIndexMetric(metricDTO) @@ -501,7 +459,80 @@ func (l *LevelDBMetricPersistence) AppendSamples(samples model.Samples) (err err } } + return +} + +func (l *LevelDBMetricPersistence) AppendSamples(samples model.Samples) (err error) { + begin := time.Now() + defer func() { + duration := time.Since(begin) + + recordOutcome(duration, err, map[string]string{operation: appendSamples, result: success}, map[string]string{operation: appendSamples, result: failure}) + }() + + var ( + fingerprintToSamples = groupByFingerprint(samples) + indexErrChan = make(chan error) + doneCommitting sync.WaitGroup + ) + + go func(groups map[model.Fingerprint]model.Samples) { + indexErrChan <- l.indexSamples(groups) + }(fingerprintToSamples) + + go func() { + doneCommitting.Add(1) + samplesBatch := leveldb.NewBatch() + defer samplesBatch.Close() + defer doneCommitting.Done() + + for fingerprint, group := range fingerprintToSamples { + for { + lengthOfGroup := len(group) + + if lengthOfGroup == 0 { + break + } + + take := *leveldbChunkSize + if lengthOfGroup < take { + take = lengthOfGroup + } + + chunk := group[0:take] + group = group[take:lengthOfGroup] + + key := &dto.SampleKey{ + Fingerprint: fingerprint.ToDTO(), + Timestamp: indexable.EncodeTime(chunk[0].Timestamp), + LastTimestamp: proto.Int64(chunk[take-1].Timestamp.Unix()), + SampleCount: proto.Uint32(uint32(take)), + } + + value := &dto.SampleValueSeries{} + for _, sample := range chunk { + value.Value = append(value.Value, &dto.SampleValueSeries_Value{ + Timestamp: proto.Int64(sample.Timestamp.Unix()), + Value: proto.Float32(float32(sample.Value)), + }) + } + + samplesBatch.Put(coding.NewProtocolBufferEncoder(key), coding.NewProtocolBufferEncoder(value)) + } + } + + err = l.metricSamples.Commit(samplesBatch) + + if err != nil { + panic(err) + } + }() + doneCommitting.Wait() + err = <-indexErrChan + if err != nil { + panic(err) + } return }