Merge pull request #67 from prometheus/feature/storage/rethinking

Swap the Fingerprint Generator
This commit is contained in:
Matt T. Proud 2013-02-08 07:01:14 -08:00
commit 75bd373223
6 changed files with 58 additions and 91 deletions

View file

@ -17,9 +17,7 @@ import (
"code.google.com/p/goprotobuf/proto" "code.google.com/p/goprotobuf/proto"
"crypto/md5" "crypto/md5"
"encoding/hex" "encoding/hex"
"errors"
dto "github.com/prometheus/prometheus/model/generated" dto "github.com/prometheus/prometheus/model/generated"
"io"
"sort" "sort"
"time" "time"
) )
@ -79,12 +77,6 @@ func MetricToDTO(m *Metric) *dto.Metric {
} }
} }
func StringToFingerprint(v string) Fingerprint {
hash := md5.New()
io.WriteString(hash, v)
return Fingerprint(hex.EncodeToString(hash.Sum([]byte{})))
}
func BytesToFingerprint(v []byte) Fingerprint { func BytesToFingerprint(v []byte) Fingerprint {
hash := md5.New() hash := md5.New()
hash.Write(v) hash.Write(v)
@ -135,19 +127,6 @@ func FingerprintToDTO(f *Fingerprint) *dto.Fingerprint {
} }
} }
func MessageToFingerprintDTO(message proto.Message) (*dto.Fingerprint, error) {
if messageByteArray, marshalError := proto.Marshal(message); marshalError == nil {
fingerprint := BytesToFingerprint(messageByteArray)
return &dto.Fingerprint{
Signature: proto.String(string(fingerprint)),
}, nil
} else {
return nil, marshalError
}
return nil, errors.New("Unknown error in generating FingerprintDTO from message.")
}
func SampleFromDTO(m *Metric, t *time.Time, v *dto.SampleValue) *Sample { func SampleFromDTO(m *Metric, t *time.Time, v *dto.SampleValue) *Sample {
s := &Sample{ s := &Sample{
Value: SampleValue(*v.Value), Value: SampleValue(*v.Value),
@ -158,3 +137,9 @@ func SampleFromDTO(m *Metric, t *time.Time, v *dto.SampleValue) *Sample {
return s return s
} }
func (f Fingerprint) ToDTO() *dto.Fingerprint {
return &dto.Fingerprint{
Signature: proto.String(string(f)),
}
}

View file

@ -50,7 +50,7 @@ type LabelSet map[LabelName]LabelValue
type Metric map[LabelName]LabelValue type Metric map[LabelName]LabelValue
// Fingerprint generates a fingerprint for this given Metric. // Fingerprint generates a fingerprint for this given Metric.
func (m Metric) Fingerprint() string { func (m Metric) Fingerprint() Fingerprint {
labelLength := len(m) labelLength := len(m)
labelNames := make([]string, 0, labelLength) labelNames := make([]string, 0, labelLength)
@ -70,7 +70,7 @@ func (m Metric) Fingerprint() string {
} }
summer.Write(buffer.Bytes()) summer.Write(buffer.Bytes())
return hex.EncodeToString(summer.Sum(nil)) return Fingerprint(hex.EncodeToString(summer.Sum(nil)))
} }
// A SampleValue is a representation of a value for a given sample at a given // A SampleValue is a representation of a value for a given sample at a given

View file

@ -21,7 +21,7 @@ import (
func testMetric(t test.Tester) { func testMetric(t test.Tester) {
var scenarios = []struct { var scenarios = []struct {
input map[string]string input map[string]string
output string output Fingerprint
}{ }{
{ {
input: map[string]string{}, input: map[string]string{},
@ -57,5 +57,7 @@ func TestMetric(t *testing.T) {
} }
func BenchmarkMetric(b *testing.B) { func BenchmarkMetric(b *testing.B) {
testMetric(b) for i := 0; i < b.N; i++ {
testMetric(b)
}
} }

View file

@ -119,63 +119,57 @@ func (l *LevelDBMetricPersistence) GetAllMetrics() ([]model.LabelSet, error) {
} }
func (l *LevelDBMetricPersistence) GetSamplesForMetric(metric model.Metric, interval model.Interval) ([]model.Samples, error) { func (l *LevelDBMetricPersistence) GetSamplesForMetric(metric model.Metric, interval model.Interval) ([]model.Samples, error) {
metricDTO := model.MetricToDTO(&metric) if iterator, closer, iteratorErr := l.metricSamples.GetIterator(); iteratorErr == nil {
defer closer.Close()
if fingerprintDTO, fingerprintDTOErr := model.MessageToFingerprintDTO(metricDTO); fingerprintDTOErr == nil { fingerprintDTO := metric.Fingerprint().ToDTO()
if iterator, closer, iteratorErr := l.metricSamples.GetIterator(); iteratorErr == nil { start := &dto.SampleKey{
defer closer.Close() Fingerprint: fingerprintDTO,
Timestamp: indexable.EncodeTime(interval.OldestInclusive),
}
start := &dto.SampleKey{ emission := make([]model.Samples, 0)
Fingerprint: fingerprintDTO,
Timestamp: indexable.EncodeTime(interval.OldestInclusive),
}
emission := make([]model.Samples, 0) if encode, encodeErr := coding.NewProtocolBufferEncoder(start).Encode(); encodeErr == nil {
iterator.Seek(encode)
if encode, encodeErr := coding.NewProtocolBufferEncoder(start).Encode(); encodeErr == nil { predicate := keyIsAtMostOld(interval.NewestInclusive)
iterator.Seek(encode)
predicate := keyIsAtMostOld(interval.NewestInclusive) for iterator = iterator; iterator.Valid(); iterator.Next() {
key := &dto.SampleKey{}
for iterator = iterator; iterator.Valid(); iterator.Next() { value := &dto.SampleValue{}
key := &dto.SampleKey{} if keyUnmarshalErr := proto.Unmarshal(iterator.Key(), key); keyUnmarshalErr == nil {
value := &dto.SampleValue{} if valueUnmarshalErr := proto.Unmarshal(iterator.Value(), value); valueUnmarshalErr == nil {
if keyUnmarshalErr := proto.Unmarshal(iterator.Key(), key); keyUnmarshalErr == nil { if fingerprintsEqual(fingerprintDTO, key.Fingerprint) {
if valueUnmarshalErr := proto.Unmarshal(iterator.Value(), value); valueUnmarshalErr == nil { // Wart
if fingerprintsEqual(fingerprintDTO, key.Fingerprint) { if predicate(key) {
// Wart emission = append(emission, model.Samples{
if predicate(key) { Value: model.SampleValue(*value.Value),
emission = append(emission, model.Samples{ Timestamp: indexable.DecodeTime(key.Timestamp),
Value: model.SampleValue(*value.Value), })
Timestamp: indexable.DecodeTime(key.Timestamp),
})
} else {
break
}
} else { } else {
break break
} }
} else { } else {
return nil, valueUnmarshalErr break
} }
} else { } else {
return nil, keyUnmarshalErr return nil, valueUnmarshalErr
} }
} else {
return nil, keyUnmarshalErr
} }
return emission, nil
} else {
log.Printf("Could not encode the start key: %q\n", encodeErr)
return nil, encodeErr
} }
return emission, nil
} else { } else {
log.Printf("Could not acquire iterator: %q\n", iteratorErr) log.Printf("Could not encode the start key: %q\n", encodeErr)
return nil, iteratorErr return nil, encodeErr
} }
} else { } else {
log.Printf("Could not create fingerprint for the metric: %q\n", fingerprintDTOErr) log.Printf("Could not acquire iterator: %q\n", iteratorErr)
return nil, fingerprintDTOErr return nil, iteratorErr
} }
panic("unreachable") panic("unreachable")

View file

@ -121,7 +121,7 @@ func (l *LevelDBMetricPersistence) appendLabelNameFingerprint(labelPair *dto.Lab
return return
} }
func (l *LevelDBMetricPersistence) appendFingerprints(m *dto.Metric) (err error) { func (l *LevelDBMetricPersistence) appendFingerprints(sample model.Sample) (err error) {
begin := time.Now() begin := time.Now()
defer func() { defer func() {
@ -130,24 +130,22 @@ func (l *LevelDBMetricPersistence) appendFingerprints(m *dto.Metric) (err error)
recordOutcome(storageOperations, storageLatency, duration, err, map[string]string{operation: appendFingerprints, result: success}, map[string]string{operation: appendFingerprints, result: failure}) recordOutcome(storageOperations, storageLatency, duration, err, map[string]string{operation: appendFingerprints, result: success}, map[string]string{operation: appendFingerprints, result: failure})
}() }()
fingerprintDTO, err := model.MessageToFingerprintDTO(m) fingerprintDTO := sample.Metric.Fingerprint().ToDTO()
if err != nil {
return
}
fingerprintKey := coding.NewProtocolBufferEncoder(fingerprintDTO) fingerprintKey := coding.NewProtocolBufferEncoder(fingerprintDTO)
metricDTOEncoder := coding.NewProtocolBufferEncoder(m) metricDTO := model.SampleToMetricDTO(&sample)
metricDTOEncoder := coding.NewProtocolBufferEncoder(metricDTO)
err = l.fingerprintToMetrics.Put(fingerprintKey, metricDTOEncoder) err = l.fingerprintToMetrics.Put(fingerprintKey, metricDTOEncoder)
if err != nil { if err != nil {
return return
} }
labelCount := len(m.LabelPair) labelCount := len(metricDTO.LabelPair)
labelPairErrors := make(chan error, labelCount) labelPairErrors := make(chan error, labelCount)
labelNameErrors := make(chan error, labelCount) labelNameErrors := make(chan error, labelCount)
for _, labelPair := range m.LabelPair { for _, labelPair := range metricDTO.LabelPair {
go func(labelPair *dto.LabelPair) { go func(labelPair *dto.LabelPair) {
labelNameErrors <- l.appendLabelNameFingerprint(labelPair, fingerprintDTO) labelNameErrors <- l.appendLabelNameFingerprint(labelPair, fingerprintDTO)
}(labelPair) }(labelPair)
@ -191,22 +189,21 @@ func (l *LevelDBMetricPersistence) AppendSample(sample *model.Sample) (err error
return return
} }
fingerprint := sample.Metric.Fingerprint()
if !indexHas { if !indexHas {
err = l.indexMetric(metricDTO) err = l.indexMetric(metricDTO)
if err != nil { if err != nil {
return return
} }
err = l.appendFingerprints(metricDTO) err = l.appendFingerprints(*sample)
if err != nil { if err != nil {
return return
} }
} }
fingerprintDTO, err := model.MessageToFingerprintDTO(metricDTO) fingerprintDTO := fingerprint.ToDTO()
if err != nil {
return
}
sampleKeyDTO := &dto.SampleKey{ sampleKeyDTO := &dto.SampleKey{
Fingerprint: fingerprintDTO, Fingerprint: fingerprintDTO,

View file

@ -348,12 +348,7 @@ func (l *LevelDBMetricPersistence) GetValueAtTime(m *model.Metric, t *time.Time,
recordOutcome(storageOperations, storageLatency, duration, err, map[string]string{operation: getValueAtTime, result: success}, map[string]string{operation: getValueAtTime, result: failure}) recordOutcome(storageOperations, storageLatency, duration, err, map[string]string{operation: getValueAtTime, result: success}, map[string]string{operation: getValueAtTime, result: failure})
}() }()
d := model.MetricToDTO(m) f := m.Fingerprint().ToDTO()
f, err := model.MessageToFingerprintDTO(d)
if err != nil {
return
}
// Candidate for Refactoring // Candidate for Refactoring
k := &dto.SampleKey{ k := &dto.SampleKey{
@ -566,13 +561,7 @@ func (l *LevelDBMetricPersistence) GetRangeValues(m *model.Metric, i *model.Inte
recordOutcome(storageOperations, storageLatency, duration, err, map[string]string{operation: getRangeValues, result: success}, map[string]string{operation: getRangeValues, result: failure}) recordOutcome(storageOperations, storageLatency, duration, err, map[string]string{operation: getRangeValues, result: success}, map[string]string{operation: getRangeValues, result: failure})
}() }()
f := m.Fingerprint().ToDTO()
d := model.MetricToDTO(m)
f, err := model.MessageToFingerprintDTO(d)
if err != nil {
return
}
k := &dto.SampleKey{ k := &dto.SampleKey{
Fingerprint: f, Fingerprint: f,