Merge pull request #197 from prometheus/feature/storage/curation-table

Add curation remark table and refactor error mgmt.
This commit is contained in:
Matt T. Proud 2013-04-29 01:01:33 -07:00
commit 66bc3711ea
3 changed files with 255 additions and 39 deletions

View file

@ -16,6 +16,7 @@ package metric
import (
"code.google.com/p/goprotobuf/proto"
"flag"
"fmt"
"github.com/prometheus/prometheus/coding"
"github.com/prometheus/prometheus/model"
dto "github.com/prometheus/prometheus/model/generated"
@ -36,6 +37,7 @@ var (
)
type LevelDBMetricPersistence struct {
curationRemarks *leveldb.LevelDBPersistence
fingerprintToMetrics *leveldb.LevelDBPersistence
labelNameToFingerprints *leveldb.LevelDBPersistence
labelSetToFingerprints *leveldb.LevelDBPersistence
@ -47,12 +49,13 @@ type LevelDBMetricPersistence struct {
var (
// These flag values are back of the envelope, though they seem sensible.
// Please re-evaluate based on your own needs.
curationRemarksCacheSize = flag.Int("curationRemarksCacheSize", 50*1024*1024, "The size for the curation remarks cache (bytes).")
fingerprintsToLabelPairCacheSize = flag.Int("fingerprintsToLabelPairCacheSizeBytes", 100*1024*1024, "The size for the fingerprint to label pair index (bytes).")
highWatermarkCacheSize = flag.Int("highWatermarksByFingerprintSizeBytes", 50*1024*1024, "The size for the metric high watermarks (bytes).")
samplesByFingerprintCacheSize = flag.Int("samplesByFingerprintCacheSizeBytes", 500*1024*1024, "The size for the samples database (bytes).")
labelNameToFingerprintsCacheSize = flag.Int("labelNameToFingerprintsCacheSizeBytes", 100*1024*1024, "The size for the label name to metric fingerprint index (bytes).")
labelPairToFingerprintsCacheSize = flag.Int("labelPairToFingerprintsCacheSizeBytes", 100*1024*1024, "The size for the label pair to metric fingerprint index (bytes).")
metricMembershipIndexCacheSize = flag.Int("metricMembershipCacheSizeBytes", 50*1024*1024, "The size for the metric membership index (bytes).")
samplesByFingerprintCacheSize = flag.Int("samplesByFingerprintCacheSizeBytes", 500*1024*1024, "The size for the samples database (bytes).")
)
type leveldbOpener func()
@ -62,12 +65,13 @@ type leveldbCloser interface {
func (l *LevelDBMetricPersistence) Close() {
var persistences = []leveldbCloser{
l.curationRemarks,
l.fingerprintToMetrics,
l.metricHighWatermarks,
l.metricSamples,
l.labelNameToFingerprints,
l.labelSetToFingerprints,
l.metricHighWatermarks,
l.metricMembershipIndex,
l.metricSamples,
}
closerGroup := sync.WaitGroup{}
@ -85,8 +89,8 @@ func (l *LevelDBMetricPersistence) Close() {
closerGroup.Wait()
}
func NewLevelDBMetricPersistence(baseDirectory string) (persistence *LevelDBMetricPersistence, err error) {
errorChannel := make(chan error, 6)
func NewLevelDBMetricPersistence(baseDirectory string) (*LevelDBMetricPersistence, error) {
workers := utility.NewUncertaintyGroup(7)
emission := &LevelDBMetricPersistence{}
@ -99,7 +103,7 @@ func NewLevelDBMetricPersistence(baseDirectory string) (persistence *LevelDBMetr
func() {
var err error
emission.fingerprintToMetrics, err = leveldb.NewLevelDBPersistence(baseDirectory+"/label_name_and_value_pairs_by_fingerprint", *fingerprintsToLabelPairCacheSize, 10)
errorChannel <- err
workers.MayFail(err)
},
},
{
@ -107,7 +111,7 @@ func NewLevelDBMetricPersistence(baseDirectory string) (persistence *LevelDBMetr
func() {
var err error
emission.metricSamples, err = leveldb.NewLevelDBPersistence(baseDirectory+"/samples_by_fingerprint", *samplesByFingerprintCacheSize, 10)
errorChannel <- err
workers.MayFail(err)
},
},
{
@ -115,7 +119,7 @@ func NewLevelDBMetricPersistence(baseDirectory string) (persistence *LevelDBMetr
func() {
var err error
emission.metricHighWatermarks, err = leveldb.NewLevelDBPersistence(baseDirectory+"/high_watermarks_by_fingerprint", *highWatermarkCacheSize, 10)
errorChannel <- err
workers.MayFail(err)
},
},
{
@ -123,7 +127,7 @@ func NewLevelDBMetricPersistence(baseDirectory string) (persistence *LevelDBMetr
func() {
var err error
emission.labelNameToFingerprints, err = leveldb.NewLevelDBPersistence(baseDirectory+"/fingerprints_by_label_name", *labelNameToFingerprintsCacheSize, 10)
errorChannel <- err
workers.MayFail(err)
},
},
{
@ -131,7 +135,7 @@ func NewLevelDBMetricPersistence(baseDirectory string) (persistence *LevelDBMetr
func() {
var err error
emission.labelSetToFingerprints, err = leveldb.NewLevelDBPersistence(baseDirectory+"/fingerprints_by_label_name_and_value_pair", *labelPairToFingerprintsCacheSize, 10)
errorChannel <- err
workers.MayFail(err)
},
},
{
@ -139,7 +143,15 @@ func NewLevelDBMetricPersistence(baseDirectory string) (persistence *LevelDBMetr
func() {
var err error
emission.metricMembershipIndex, err = index.NewLevelDBMembershipIndex(baseDirectory+"/metric_membership_index", *metricMembershipIndexCacheSize, 10)
errorChannel <- err
workers.MayFail(err)
},
},
{
"Sample Curation Remarks",
func() {
var err error
emission.curationRemarks, err = leveldb.NewLevelDBPersistence(baseDirectory+"/curation_remarks", *curationRemarksCacheSize, 10)
workers.MayFail(err)
},
},
}
@ -149,18 +161,15 @@ func NewLevelDBMetricPersistence(baseDirectory string) (persistence *LevelDBMetr
go opener()
}
for i := 0; i < cap(errorChannel); i++ {
err = <-errorChannel
if err != nil {
log.Printf("Could not open a LevelDBPersistence storage container: %q\n", err)
return
if !workers.Wait() {
for _, err := range workers.Errors() {
log.Printf("Could not open storage due to %s", err)
}
}
persistence = emission
return
return nil, fmt.Errorf("Unable to open metric persistence.")
}
return emission, nil
}
func (l *LevelDBMetricPersistence) AppendSample(sample model.Sample) (err error) {
@ -432,33 +441,22 @@ func (l *LevelDBMetricPersistence) indexMetrics(fingerprints map[model.Fingerpri
// TODO: For the missing fingerprints, determine what label names and pairs
// are absent and act accordingly and append fingerprints.
doneBuildingLabelNameIndex := make(chan error)
doneBuildingLabelPairIndex := make(chan error)
doneBuildingFingerprintIndex := make(chan error)
workers := utility.NewUncertaintyGroup(3)
go func() {
doneBuildingLabelNameIndex <- l.indexLabelNames(absentMetrics)
workers.MayFail(l.indexLabelNames(absentMetrics))
}()
go func() {
doneBuildingLabelPairIndex <- l.indexLabelPairs(absentMetrics)
workers.MayFail(l.indexLabelPairs(absentMetrics))
}()
go func() {
doneBuildingFingerprintIndex <- l.indexFingerprints(absentMetrics)
workers.MayFail(l.indexFingerprints(absentMetrics))
}()
err = <-doneBuildingLabelNameIndex
if err != nil {
return
}
err = <-doneBuildingLabelPairIndex
if err != nil {
return
}
err = <-doneBuildingFingerprintIndex
if err != nil {
return
if !workers.Wait() {
return fmt.Errorf("Could not index due to %s", workers.Errors())
}
// If any of the preceding operations failed, we will have inconsistent
@ -477,7 +475,8 @@ func (l *LevelDBMetricPersistence) indexMetrics(fingerprints map[model.Fingerpri
err = l.metricMembershipIndex.Commit(batch)
if err != nil {
return err
// Not critical but undesirable.
log.Println(err)
}
return

140
utility/uncertaintygroup.go Normal file
View file

@ -0,0 +1,140 @@
// Copyright 2013 Prometheus Team
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package utility
import (
"fmt"
)
type state int
func (s state) String() string {
switch s {
case unstarted:
return "unstarted"
case started:
return "started"
case finished:
return "finished"
}
panic("unreachable")
}
const (
unstarted state = iota
started
finished
)
// An UncertaintyGroup models a group of operations whose result disposition is
// tenuous and needs to be validated en masse in order to make a future
// decision.
type UncertaintyGroup interface {
// Succeed makes a remark that a given action succeeded, in part.
Succeed()
// Fail makes a remark that a given action failed, in part. Nil values are
// illegal.
Fail(error)
// MayFail makes a remark that a given action either succeeded or failed. The
// determination is made by whether the error is nil.
MayFail(error)
// Wait waits for the group to have finished and emits the result of what
// occurred for the group.
Wait() (succeeded bool)
// Errors emits any errors that could have occurred.
Errors() []error
}
type uncertaintyGroup struct {
state state
remaining uint
successes uint
results chan error
anomalies []error
}
func (g uncertaintyGroup) Succeed() {
if g.state == finished {
panic("cannot remark when done")
}
g.results <- nil
}
func (g uncertaintyGroup) Fail(err error) {
if g.state == finished {
panic("cannot remark when done")
}
if err == nil {
panic("expected a failure")
}
g.results <- err
}
func (g uncertaintyGroup) MayFail(err error) {
if g.state == finished {
panic("cannot remark when done")
}
g.results <- err
}
func (g *uncertaintyGroup) Wait() bool {
if g.state != unstarted {
panic("cannot restart")
}
defer close(g.results)
g.state = started
for g.remaining > 0 {
result := <-g.results
switch result {
case nil:
g.successes++
default:
g.anomalies = append(g.anomalies, result)
}
g.remaining--
}
g.state = finished
return len(g.anomalies) == 0
}
func (g uncertaintyGroup) Errors() []error {
if g.state != finished {
panic("cannot provide errors until finished")
}
return g.anomalies
}
func (g uncertaintyGroup) String() string {
return fmt.Sprintf("UncertaintyGroup %s with %s failures", g.state, g.anomalies)
}
// NewUncertaintyGroup furnishes an UncertaintyGroup for a given set of actions
// where their quantity is known a priori.
func NewUncertaintyGroup(count uint) UncertaintyGroup {
return &uncertaintyGroup{
remaining: count,
results: make(chan error),
}
}

View file

@ -0,0 +1,77 @@
package utility
import (
"fmt"
"testing"
"time"
)
func TestGroupSuccess(t *testing.T) {
uncertaintyGroup := NewUncertaintyGroup(10)
for i := 0; i < 10; i++ {
go uncertaintyGroup.Succeed()
}
result := make(chan bool)
go func() {
result <- uncertaintyGroup.Wait()
}()
select {
case v := <-result:
if !v {
t.Fatal("expected success")
}
case <-time.After(time.Second):
t.Fatal("deadline exceeded")
}
}
func TestGroupFail(t *testing.T) {
uncertaintyGroup := NewUncertaintyGroup(10)
for i := 0; i < 10; i++ {
go uncertaintyGroup.Fail(fmt.Errorf(""))
}
result := make(chan bool)
go func() {
result <- uncertaintyGroup.Wait()
}()
select {
case v := <-result:
if v {
t.Fatal("expected failure")
}
case <-time.After(time.Second):
t.Fatal("deadline exceeded")
}
}
func TestGroupFailMix(t *testing.T) {
uncertaintyGroup := NewUncertaintyGroup(10)
for i := 0; i < 10; i++ {
go func(i int) {
switch {
case i%2 == 0:
uncertaintyGroup.Fail(fmt.Errorf(""))
default:
uncertaintyGroup.Succeed()
}
}(i)
}
result := make(chan bool)
go func() {
result <- uncertaintyGroup.Wait()
}()
select {
case v := <-result:
if v {
t.Fatal("expected failure")
}
case <-time.After(time.Second):
t.Fatal("deadline exceeded")
}
}