diff --git a/storage/metric/leveldb.go b/storage/metric/leveldb.go
index 17cebfa1c..28fa01d57 100644
--- a/storage/metric/leveldb.go
+++ b/storage/metric/leveldb.go
@@ -16,6 +16,7 @@ package metric
 import (
 	"code.google.com/p/goprotobuf/proto"
 	"flag"
+	"fmt"
 	"github.com/prometheus/prometheus/coding"
 	"github.com/prometheus/prometheus/model"
 	dto "github.com/prometheus/prometheus/model/generated"
@@ -36,6 +37,7 @@ var (
 )
 
 type LevelDBMetricPersistence struct {
+	curationRemarks         *leveldb.LevelDBPersistence
 	fingerprintToMetrics    *leveldb.LevelDBPersistence
 	labelNameToFingerprints *leveldb.LevelDBPersistence
 	labelSetToFingerprints  *leveldb.LevelDBPersistence
@@ -47,12 +49,13 @@ type LevelDBMetricPersistence struct {
 var (
 	// These flag values are back of the envelope, though they seem sensible.
 	// Please re-evaluate based on your own needs.
+	curationRemarksCacheSize         = flag.Int("curationRemarksCacheSize", 50*1024*1024, "The size for the curation remarks cache (bytes).")
 	fingerprintsToLabelPairCacheSize = flag.Int("fingerprintsToLabelPairCacheSizeBytes", 100*1024*1024, "The size for the fingerprint to label pair index (bytes).")
 	highWatermarkCacheSize           = flag.Int("highWatermarksByFingerprintSizeBytes", 50*1024*1024, "The size for the metric high watermarks (bytes).")
-	samplesByFingerprintCacheSize    = flag.Int("samplesByFingerprintCacheSizeBytes", 500*1024*1024, "The size for the samples database (bytes).")
 	labelNameToFingerprintsCacheSize = flag.Int("labelNameToFingerprintsCacheSizeBytes", 100*1024*1024, "The size for the label name to metric fingerprint index (bytes).")
 	labelPairToFingerprintsCacheSize = flag.Int("labelPairToFingerprintsCacheSizeBytes", 100*1024*1024, "The size for the label pair to metric fingerprint index (bytes).")
 	metricMembershipIndexCacheSize   = flag.Int("metricMembershipCacheSizeBytes", 50*1024*1024, "The size for the metric membership index (bytes).")
+	samplesByFingerprintCacheSize    = flag.Int("samplesByFingerprintCacheSizeBytes", 500*1024*1024, "The size for the samples database (bytes).")
 )
 
 type leveldbOpener func()
@@ -62,12 +65,13 @@ type leveldbCloser interface {
 
 func (l *LevelDBMetricPersistence) Close() {
 	var persistences = []leveldbCloser{
+		l.curationRemarks,
 		l.fingerprintToMetrics,
-		l.metricHighWatermarks,
-		l.metricSamples,
 		l.labelNameToFingerprints,
 		l.labelSetToFingerprints,
+		l.metricHighWatermarks,
 		l.metricMembershipIndex,
+		l.metricSamples,
 	}
 
 	closerGroup := sync.WaitGroup{}
@@ -85,8 +89,8 @@ func (l *LevelDBMetricPersistence) Close() {
 	closerGroup.Wait()
 }
 
-func NewLevelDBMetricPersistence(baseDirectory string) (persistence *LevelDBMetricPersistence, err error) {
-	errorChannel := make(chan error, 6)
+func NewLevelDBMetricPersistence(baseDirectory string) (*LevelDBMetricPersistence, error) {
+	workers := utility.NewUncertaintyGroup(7)
 
 	emission := &LevelDBMetricPersistence{}
 
@@ -99,7 +103,7 @@ func NewLevelDBMetricPersistence(baseDirectory string) (persistence *LevelDBMetr
 			func() {
 				var err error
 				emission.fingerprintToMetrics, err = leveldb.NewLevelDBPersistence(baseDirectory+"/label_name_and_value_pairs_by_fingerprint", *fingerprintsToLabelPairCacheSize, 10)
-				errorChannel <- err
+				workers.MayFail(err)
 			},
 		},
 		{
@@ -107,7 +111,7 @@ func NewLevelDBMetricPersistence(baseDirectory string) (persistence *LevelDBMetr
 			func() {
 				var err error
 				emission.metricSamples, err = leveldb.NewLevelDBPersistence(baseDirectory+"/samples_by_fingerprint", *samplesByFingerprintCacheSize, 10)
-				errorChannel <- err
+				workers.MayFail(err)
 			},
 		},
 		{
@@ -115,7 +119,7 @@ func NewLevelDBMetricPersistence(baseDirectory string) (persistence *LevelDBMetr
 			func() {
 				var err error
 				emission.metricHighWatermarks, err = leveldb.NewLevelDBPersistence(baseDirectory+"/high_watermarks_by_fingerprint", *highWatermarkCacheSize, 10)
-				errorChannel <- err
+				workers.MayFail(err)
 			},
 		},
 		{
@@ -123,7 +127,7 @@ func NewLevelDBMetricPersistence(baseDirectory string) (persistence *LevelDBMetr
 			func() {
 				var err error
 				emission.labelNameToFingerprints, err = leveldb.NewLevelDBPersistence(baseDirectory+"/fingerprints_by_label_name", *labelNameToFingerprintsCacheSize, 10)
-				errorChannel <- err
+				workers.MayFail(err)
 			},
 		},
 		{
@@ -131,7 +135,7 @@ func NewLevelDBMetricPersistence(baseDirectory string) (persistence *LevelDBMetr
 			func() {
 				var err error
 				emission.labelSetToFingerprints, err = leveldb.NewLevelDBPersistence(baseDirectory+"/fingerprints_by_label_name_and_value_pair", *labelPairToFingerprintsCacheSize, 10)
-				errorChannel <- err
+				workers.MayFail(err)
 			},
 		},
 		{
@@ -139,7 +143,15 @@ func NewLevelDBMetricPersistence(baseDirectory string) (persistence *LevelDBMetr
 			func() {
 				var err error
 				emission.metricMembershipIndex, err = index.NewLevelDBMembershipIndex(baseDirectory+"/metric_membership_index", *metricMembershipIndexCacheSize, 10)
-				errorChannel <- err
+				workers.MayFail(err)
+			},
+		},
+		{
+			"Sample Curation Remarks",
+			func() {
+				var err error
+				emission.curationRemarks, err = leveldb.NewLevelDBPersistence(baseDirectory+"/curation_remarks", *curationRemarksCacheSize, 10)
+				workers.MayFail(err)
 			},
 		},
 	}
@@ -149,18 +161,15 @@ func NewLevelDBMetricPersistence(baseDirectory string) (persistence *LevelDBMetr
 		go opener()
 	}
 
-	for i := 0; i < cap(errorChannel); i++ {
-		err = <-errorChannel
-
-		if err != nil {
-			log.Printf("Could not open a LevelDBPersistence storage container: %q\n", err)
-
-			return
+	if !workers.Wait() {
+		for _, err := range workers.Errors() {
+			log.Printf("Could not open storage due to %s", err)
 		}
-	}
-
-	persistence = emission
-	return
+
+		return nil, fmt.Errorf("Unable to open metric persistence.")
+	}
+
+	return emission, nil
 }
 
 func (l *LevelDBMetricPersistence) AppendSample(sample model.Sample) (err error) {
@@ -432,33 +441,22 @@ func (l *LevelDBMetricPersistence) indexMetrics(fingerprints map[model.Fingerpri
 
 	// TODO: For the missing fingerprints, determine what label names and pairs
 	// are absent and act accordingly and append fingerprints.
-	doneBuildingLabelNameIndex := make(chan error)
-	doneBuildingLabelPairIndex := make(chan error)
-	doneBuildingFingerprintIndex := make(chan error)
+	workers := utility.NewUncertaintyGroup(3)
 
 	go func() {
-		doneBuildingLabelNameIndex <- l.indexLabelNames(absentMetrics)
+		workers.MayFail(l.indexLabelNames(absentMetrics))
 	}()
 
 	go func() {
-		doneBuildingLabelPairIndex <- l.indexLabelPairs(absentMetrics)
+		workers.MayFail(l.indexLabelPairs(absentMetrics))
 	}()
 
 	go func() {
-		doneBuildingFingerprintIndex <- l.indexFingerprints(absentMetrics)
+		workers.MayFail(l.indexFingerprints(absentMetrics))
 	}()
 
-	err = <-doneBuildingLabelNameIndex
-	if err != nil {
-		return
-	}
-	err = <-doneBuildingLabelPairIndex
-	if err != nil {
-		return
-	}
-	err = <-doneBuildingFingerprintIndex
-	if err != nil {
-		return
+	if !workers.Wait() {
+		return fmt.Errorf("Could not index due to %s", workers.Errors())
 	}
 
 	// If any of the preceding operations failed, we will have inconsistent
@@ -477,7 +475,8 @@ func (l *LevelDBMetricPersistence) indexMetrics(fingerprints map[model.Fingerpri
 
 	err = l.metricMembershipIndex.Commit(batch)
 	if err != nil {
-		return err
+		// Not critical but undesirable.
+		log.Println(err)
 	}
 
 	return
diff --git a/utility/uncertaintygroup.go b/utility/uncertaintygroup.go
new file mode 100644
index 000000000..b1c909744
--- /dev/null
+++ b/utility/uncertaintygroup.go
@@ -0,0 +1,140 @@
+// Copyright 2013 Prometheus Team
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package utility
+
+import (
+	"fmt"
+)
+
+type state int
+
+func (s state) String() string {
+	switch s {
+	case unstarted:
+		return "unstarted"
+	case started:
+		return "started"
+	case finished:
+		return "finished"
+	}
+	panic("unreachable")
+}
+
+const (
+	unstarted state = iota
+	started
+	finished
+)
+
+// An UncertaintyGroup models a group of operations whose result disposition is
+// tenuous and needs to be validated en masse in order to make a future
+// decision.
+type UncertaintyGroup interface {
+	// Succeed makes a remark that a given action succeeded, in part.
+	Succeed()
+	// Fail makes a remark that a given action failed, in part. Nil values are
+	// illegal.
+	Fail(error)
+	// MayFail makes a remark that a given action either succeeded or failed. The
+	// determination is made by whether the error is nil.
+	MayFail(error)
+	// Wait waits for the group to have finished and emits the result of what
+	// occurred for the group.
+	Wait() (succeeded bool)
+	// Errors emits any errors that could have occurred.
+	Errors() []error
+}
+
+type uncertaintyGroup struct {
+	state     state
+	remaining uint
+	successes uint
+	results   chan error
+	anomalies []error
+}
+
+func (g uncertaintyGroup) Succeed() {
+	if g.state == finished {
+		panic("cannot remark when done")
+	}
+
+	g.results <- nil
+}
+
+func (g uncertaintyGroup) Fail(err error) {
+	if g.state == finished {
+		panic("cannot remark when done")
+	}
+
+	if err == nil {
+		panic("expected a failure")
+	}
+
+	g.results <- err
+}
+
+func (g uncertaintyGroup) MayFail(err error) {
+	if g.state == finished {
+		panic("cannot remark when done")
+	}
+
+	g.results <- err
+}
+
+func (g *uncertaintyGroup) Wait() bool {
+	if g.state != unstarted {
+		panic("cannot restart")
+	}
+
+	defer close(g.results)
+
+	g.state = started
+
+	for g.remaining > 0 {
+		result := <-g.results
+		switch result {
+		case nil:
+			g.successes++
+		default:
+			g.anomalies = append(g.anomalies, result)
+		}
+
+		g.remaining--
+	}
+
+	g.state = finished
+
+	return len(g.anomalies) == 0
+}
+
+func (g uncertaintyGroup) Errors() []error {
+	if g.state != finished {
+		panic("cannot provide errors until finished")
+	}
+
+	return g.anomalies
+}
+
+func (g uncertaintyGroup) String() string {
+	return fmt.Sprintf("UncertaintyGroup %s with %s failures", g.state, g.anomalies)
+}
+
+// NewUncertaintyGroup furnishes an UncertaintyGroup for a given set of actions
+// where their quantity is known a priori.
+func NewUncertaintyGroup(count uint) UncertaintyGroup {
+	return &uncertaintyGroup{
+		remaining: count,
+		results:   make(chan error),
+	}
+}
diff --git a/utility/uncertaintygroup_test.go b/utility/uncertaintygroup_test.go
new file mode 100644
index 000000000..d2a8a13c6
--- /dev/null
+++ b/utility/uncertaintygroup_test.go
@@ -0,0 +1,77 @@
+package utility
+
+import (
+	"fmt"
+	"testing"
+	"time"
+)
+
+func TestGroupSuccess(t *testing.T) {
+	uncertaintyGroup := NewUncertaintyGroup(10)
+
+	for i := 0; i < 10; i++ {
+		go uncertaintyGroup.Succeed()
+	}
+
+	result := make(chan bool)
+	go func() {
+		result <- uncertaintyGroup.Wait()
+	}()
+	select {
+	case v := <-result:
+		if !v {
+			t.Fatal("expected success")
+		}
+	case <-time.After(time.Second):
+		t.Fatal("deadline exceeded")
+	}
+}
+
+func TestGroupFail(t *testing.T) {
+	uncertaintyGroup := NewUncertaintyGroup(10)
+
+	for i := 0; i < 10; i++ {
+		go uncertaintyGroup.Fail(fmt.Errorf(""))
+	}
+
+	result := make(chan bool)
+	go func() {
+		result <- uncertaintyGroup.Wait()
+	}()
+	select {
+	case v := <-result:
+		if v {
+			t.Fatal("expected failure")
+		}
+	case <-time.After(time.Second):
+		t.Fatal("deadline exceeded")
+	}
+}
+
+func TestGroupFailMix(t *testing.T) {
+	uncertaintyGroup := NewUncertaintyGroup(10)
+
+	for i := 0; i < 10; i++ {
+		go func(i int) {
+			switch {
+			case i%2 == 0:
+				uncertaintyGroup.Fail(fmt.Errorf(""))
+			default:
+				uncertaintyGroup.Succeed()
+			}
+		}(i)
+	}
+
+	result := make(chan bool)
+	go func() {
+		result <- uncertaintyGroup.Wait()
+	}()
+	select {
+	case v := <-result:
+		if v {
+			t.Fatal("expected failure")
+		}
+	case <-time.After(time.Second):
+		t.Fatal("deadline exceeded")
+	}
+}
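
For reference, a minimal sketch of how the new UncertaintyGroup is meant to drive a set of concurrent, fallible actions, mirroring its use in NewLevelDBMetricPersistence above. The openStore helper and the store names are hypothetical stand-ins; only NewUncertaintyGroup, MayFail, Wait, and Errors come from this change.

package main

import (
	"fmt"
	"log"

	"github.com/prometheus/prometheus/utility"
)

// openStore is a hypothetical fallible action standing in for the LevelDB openers.
func openStore(name string) error {
	if name == "" {
		return fmt.Errorf("no store name given")
	}
	return nil
}

func main() {
	names := []string{"samples", "high_watermarks", "curation_remarks"}

	// The group must be sized to the exact number of remarks it will receive.
	workers := utility.NewUncertaintyGroup(uint(len(names)))

	for _, name := range names {
		go func(name string) {
			// MayFail records a success or a failure depending on whether the error is nil.
			workers.MayFail(openStore(name))
		}(name)
	}

	// Wait blocks until every action has reported and tells us whether all succeeded.
	if !workers.Wait() {
		for _, err := range workers.Errors() {
			log.Println(err)
		}
		log.Fatal("could not open all stores")
	}
}

Because the group is sized up front, Wait simply consumes exactly that many remarks from its results channel before closing it, so no per-action done channel is needed; this is the pattern the leveldb.go changes above rely on.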