Extract indexing of Fingerprint to Metrics.

This commit is contained in:
Matt T. Proud 2013-03-14 17:38:34 -07:00
parent 532589f728
commit 187cd4cdbc
2 changed files with 50 additions and 22 deletions

View file

@ -43,6 +43,7 @@ const (
hasIndexMetric = "has_index_metric" hasIndexMetric = "has_index_metric"
hasLabelName = "has_label_name" hasLabelName = "has_label_name"
hasLabelPair = "has_label_pair" hasLabelPair = "has_label_pair"
indexFingerprints = "index_fingerprints"
indexLabelNames = "index_label_names" indexLabelNames = "index_label_names"
indexLabelPairs = "index_label_pairs" indexLabelPairs = "index_label_pairs"
indexMetric = "index_metric" indexMetric = "index_metric"

View file

@ -272,6 +272,8 @@ func (l *LevelDBMetricPersistence) findUnindexedMetrics(candidates map[model.Fin
// indexLabelNames accumulates all label name to fingerprint index entries for // indexLabelNames accumulates all label name to fingerprint index entries for
// the dirty metrics, appends the new dirtied metrics, sorts, and bulk updates // the dirty metrics, appends the new dirtied metrics, sorts, and bulk updates
// the index to reflect the new state. // the index to reflect the new state.
//
// This operation is idempotent.
func (l *LevelDBMetricPersistence) indexLabelNames(metrics map[model.Fingerprint]model.Metric) (err error) { func (l *LevelDBMetricPersistence) indexLabelNames(metrics map[model.Fingerprint]model.Metric) (err error) {
begin := time.Now() begin := time.Now()
defer func() { defer func() {
@ -338,6 +340,8 @@ func (l *LevelDBMetricPersistence) indexLabelNames(metrics map[model.Fingerprint
// indexLabelPairs accumulates all label pair to fingerprint index entries for // indexLabelPairs accumulates all label pair to fingerprint index entries for
// the dirty metrics, appends the new dirtied metrics, sorts, and bulk updates // the dirty metrics, appends the new dirtied metrics, sorts, and bulk updates
// the index to reflect the new state. // the index to reflect the new state.
//
// This operation is idempotent.
func (l *LevelDBMetricPersistence) indexLabelPairs(metrics map[model.Fingerprint]model.Metric) (err error) { func (l *LevelDBMetricPersistence) indexLabelPairs(metrics map[model.Fingerprint]model.Metric) (err error) {
begin := time.Now() begin := time.Now()
defer func() { defer func() {
@ -408,6 +412,35 @@ func (l *LevelDBMetricPersistence) indexLabelPairs(metrics map[model.Fingerprint
return return
} }
// indexFingerprints updates all of the Fingerprint to Metric reverse lookups
// in the index and then bulk updates.
//
// This operation is idempotent.
func (l *LevelDBMetricPersistence) indexFingerprints(metrics map[model.Fingerprint]model.Metric) (err error) {
begin := time.Now()
defer func() {
duration := time.Since(begin)
recordOutcome(duration, err, map[string]string{operation: indexFingerprints, result: success}, map[string]string{operation: indexFingerprints, result: failure})
}()
batch := leveldb.NewBatch()
defer batch.Close()
for fingerprint, metric := range metrics {
key := coding.NewProtocolBufferEncoder(fingerprint.ToDTO())
value := coding.NewProtocolBufferEncoder(model.MetricToDTO(metric))
batch.Put(key, value)
}
err = l.fingerprintToMetrics.Commit(batch)
if err != nil {
panic(err)
}
return
}
// indexMetrics takes groups of samples, determines which ones contain metrics // indexMetrics takes groups of samples, determines which ones contain metrics
// that are unknown to the storage stack, and then proceeds to update all // that are unknown to the storage stack, and then proceeds to update all
// affected indices. // affected indices.
@ -435,19 +468,23 @@ func (l *LevelDBMetricPersistence) indexMetrics(fingerprints map[model.Fingerpri
// TODO: For the missing fingerprints, determine what label names and pairs // TODO: For the missing fingerprints, determine what label names and pairs
// are absent and act accordingly and append fingerprints. // are absent and act accordingly and append fingerprints.
var ( var (
doneBuildingLabelNameIndex = make(chan error) doneBuildingLabelNameIndex = make(chan error)
doneBuildingLabelPairIndex = make(chan error) doneBuildingLabelPairIndex = make(chan error)
doneBuildingFingerprintIndex = make(chan error)
) )
go func() { go func() {
doneBuildingLabelNameIndex <- l.indexLabelNames(absentMetrics) doneBuildingLabelNameIndex <- l.indexLabelNames(absentMetrics)
}() }()
// Update LabelPair -> Fingerprint index.
go func() { go func() {
doneBuildingLabelPairIndex <- l.indexLabelPairs(absentMetrics) doneBuildingLabelPairIndex <- l.indexLabelPairs(absentMetrics)
}() }()
go func() {
doneBuildingFingerprintIndex <- l.indexFingerprints(absentMetrics)
}()
makeTopLevelIndex := true makeTopLevelIndex := true
err = <-doneBuildingLabelNameIndex err = <-doneBuildingLabelNameIndex
@ -460,27 +497,17 @@ func (l *LevelDBMetricPersistence) indexMetrics(fingerprints map[model.Fingerpri
panic(err) panic(err)
makeTopLevelIndex = false makeTopLevelIndex = false
} }
err = <-doneBuildingFingerprintIndex
// Update the Metric existence index. if err != nil {
panic(err)
if len(absentMetrics) > 0 { makeTopLevelIndex = false
batch := leveldb.NewBatch()
defer batch.Close()
for fingerprint, metric := range absentMetrics {
key := coding.NewProtocolBufferEncoder(fingerprint.ToDTO())
value := coding.NewProtocolBufferEncoder(model.MetricToDTO(metric))
batch.Put(key, value)
}
err = l.fingerprintToMetrics.Commit(batch)
if err != nil {
panic(err)
// Critical
log.Println(err)
}
} }
// If any of the preceding operations failed, we will have inconsistent
// indices. Thusly, the Metric membership index should NOT be updated, as
// its state is used to determine whether to bulk update the other indices.
// Given that those operations are idempotent, it is OK to repeat them;
// however, it will consume considerable amounts of time.
if makeTopLevelIndex { if makeTopLevelIndex {
batch := leveldb.NewBatch() batch := leveldb.NewBatch()
defer batch.Close() defer batch.Close()