From 577acf4fe7167bb462b6f8b52091a4aaddcbe784 Mon Sep 17 00:00:00 2001 From: "Matt T. Proud" Date: Sun, 9 Dec 2012 16:27:12 +0100 Subject: [PATCH] Exploding the storage infrastructure by contexts. --- model/{conversion.go => dto.go} | 52 ++- storage/metric/interface.go | 5 + storage/metric/leveldb/diagnostics.go | 117 +++++ storage/metric/leveldb/leveldb.go | 600 ------------------------- storage/metric/leveldb/leveldb_test.go | 173 ++++++- storage/metric/leveldb/lifecycle.go | 164 +++++++ storage/metric/leveldb/mutable.go | 180 ++++++++ storage/metric/leveldb/reading.go | 200 +++++++++ storage/metric/leveldb/type.go | 27 ++ storage/raw/index/leveldb/leveldb.go | 4 +- 10 files changed, 888 insertions(+), 634 deletions(-) rename model/{conversion.go => dto.go} (66%) create mode 100644 storage/metric/leveldb/diagnostics.go delete mode 100644 storage/metric/leveldb/leveldb.go create mode 100644 storage/metric/leveldb/lifecycle.go create mode 100644 storage/metric/leveldb/mutable.go create mode 100644 storage/metric/leveldb/reading.go create mode 100644 storage/metric/leveldb/type.go diff --git a/model/conversion.go b/model/dto.go similarity index 66% rename from model/conversion.go rename to model/dto.go index 2cf68d94fa..ffe62ddb86 100644 --- a/model/conversion.go +++ b/model/dto.go @@ -17,12 +17,13 @@ import ( "code.google.com/p/goprotobuf/proto" "crypto/md5" "encoding/hex" - data "github.com/matttproud/prometheus/model/generated" + "errors" + dto "github.com/matttproud/prometheus/model/generated" "io" "sort" ) -func SampleToMetricDTO(s *Sample) *data.Metric { +func SampleToMetricDTO(s *Sample) *dto.Metric { labelLength := len(s.Labels) labelNames := make([]string, 0, labelLength) @@ -32,11 +33,11 @@ func SampleToMetricDTO(s *Sample) *data.Metric { sort.Strings(labelNames) - labelSets := make([]*data.LabelPair, 0, labelLength) + labelSets := make([]*dto.LabelPair, 0, labelLength) for _, labelName := range labelNames { labelValue := s.Labels[LabelName(labelName)] - labelPair := &data.LabelPair{ + labelPair := &dto.LabelPair{ Name: proto.String(string(labelName)), Value: proto.String(string(labelValue)), } @@ -44,12 +45,12 @@ func SampleToMetricDTO(s *Sample) *data.Metric { labelSets = append(labelSets, labelPair) } - return &data.Metric{ + return &dto.Metric{ LabelPair: labelSets, } } -func MetricToDTO(m *Metric) *data.Metric { +func MetricToDTO(m *Metric) *dto.Metric { metricLength := len(*m) labelNames := make([]string, 0, metricLength) @@ -59,12 +60,12 @@ func MetricToDTO(m *Metric) *data.Metric { sort.Strings(labelNames) - labelSets := make([]*data.LabelPair, 0, metricLength) + labelSets := make([]*dto.LabelPair, 0, metricLength) for _, labelName := range labelNames { l := LabelName(labelName) labelValue := (*m)[l] - labelPair := &data.LabelPair{ + labelPair := &dto.LabelPair{ Name: proto.String(string(labelName)), Value: proto.String(string(labelValue)), } @@ -72,7 +73,7 @@ func MetricToDTO(m *Metric) *data.Metric { labelSets = append(labelSets, labelPair) } - return &data.Metric{ + return &dto.Metric{ LabelPair: labelSets, } } @@ -89,7 +90,7 @@ func BytesToFingerprint(v []byte) Fingerprint { return Fingerprint(hex.EncodeToString(hash.Sum([]byte{}))) } -func LabelSetToDTOs(s *LabelSet) []*data.LabelPair { +func LabelSetToDTOs(s *LabelSet) []*dto.LabelPair { metricLength := len(*s) labelNames := make([]string, 0, metricLength) @@ -99,12 +100,12 @@ func LabelSetToDTOs(s *LabelSet) []*data.LabelPair { sort.Strings(labelNames) - labelSets := make([]*data.LabelPair, 0, metricLength) + labelSets := make([]*dto.LabelPair, 0, metricLength) for _, labelName := range labelNames { l := LabelName(labelName) labelValue := (*s)[l] - labelPair := &data.LabelPair{ + labelPair := &dto.LabelPair{ Name: proto.String(string(labelName)), Value: proto.String(string(labelValue)), } @@ -115,14 +116,33 @@ func LabelSetToDTOs(s *LabelSet) []*data.LabelPair { return labelSets } -func LabelSetToDTO(s *LabelSet) *data.LabelSet { - return &data.LabelSet{ +func LabelSetToDTO(s *LabelSet) *dto.LabelSet { + return &dto.LabelSet{ Member: LabelSetToDTOs(s), } } -func LabelNameToDTO(l *LabelName) *data.LabelName { - return &data.LabelName{ +func LabelNameToDTO(l *LabelName) *dto.LabelName { + return &dto.LabelName{ Name: proto.String(string(*l)), } } + +func FingerprintToDTO(f *Fingerprint) *dto.Fingerprint { + return &dto.Fingerprint{ + Signature: proto.String(string(*f)), + } +} + +func MessageToFingerprintDTO(message proto.Message) (*dto.Fingerprint, error) { + if messageByteArray, marshalError := proto.Marshal(message); marshalError == nil { + fingerprint := BytesToFingerprint(messageByteArray) + return &dto.Fingerprint{ + Signature: proto.String(string(fingerprint)), + }, nil + } else { + return nil, marshalError + } + + return nil, errors.New("Unknown error in generating FingerprintDTO from message.") +} diff --git a/storage/metric/interface.go b/storage/metric/interface.go index 109a406192..719112a670 100644 --- a/storage/metric/interface.go +++ b/storage/metric/interface.go @@ -30,8 +30,13 @@ type MetricPersistence interface { // Get all of the metric fingerprints that are associated with the provided // label set. GetFingerprintsForLabelSet(labelSet *model.LabelSet) ([]*model.Fingerprint, error) + + // Get all of the metric fingerprints that are associated for a given label + // name. GetFingerprintsForLabelName(labelName *model.LabelName) ([]*model.Fingerprint, error) + GetMetricForFingerprint(f *model.Fingerprint) (*model.Metric, error) + GetAllLabelNames() ([]string, error) GetAllLabelPairs() ([]model.LabelSet, error) GetAllMetrics() ([]model.LabelSet, error) diff --git a/storage/metric/leveldb/diagnostics.go b/storage/metric/leveldb/diagnostics.go new file mode 100644 index 0000000000..bf8112d1c1 --- /dev/null +++ b/storage/metric/leveldb/diagnostics.go @@ -0,0 +1,117 @@ +// Copyright 2012 Prometheus Team +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package leveldb + +import ( + "code.google.com/p/goprotobuf/proto" + "errors" + "github.com/matttproud/prometheus/coding" + "github.com/matttproud/prometheus/model" + dto "github.com/matttproud/prometheus/model/generated" + "github.com/matttproud/prometheus/utility" +) + +func (l *LevelDBMetricPersistence) GetAllLabelNames() ([]string, error) { + if getAll, getAllError := l.labelNameToFingerprints.GetAll(); getAllError == nil { + result := make([]string, 0, len(getAll)) + labelNameDTO := &dto.LabelName{} + + for _, pair := range getAll { + if unmarshalError := proto.Unmarshal(pair.Left, labelNameDTO); unmarshalError == nil { + result = append(result, *labelNameDTO.Name) + } else { + return nil, unmarshalError + } + } + + return result, nil + + } else { + return nil, getAllError + } + + return nil, errors.New("Unknown error encountered when querying label names.") +} + +func (l *LevelDBMetricPersistence) GetAllLabelPairs() ([]model.LabelSet, error) { + if getAll, getAllError := l.labelSetToFingerprints.GetAll(); getAllError == nil { + result := make([]model.LabelSet, 0, len(getAll)) + labelPairDTO := &dto.LabelPair{} + + for _, pair := range getAll { + if unmarshalError := proto.Unmarshal(pair.Left, labelPairDTO); unmarshalError == nil { + n := model.LabelName(*labelPairDTO.Name) + v := model.LabelValue(*labelPairDTO.Value) + item := model.LabelSet{n: v} + result = append(result, item) + } else { + return nil, unmarshalError + } + } + + return result, nil + + } else { + return nil, getAllError + } + + return nil, errors.New("Unknown error encountered when querying label pairs.") +} + +func (l *LevelDBMetricPersistence) GetAllMetrics() ([]model.LabelSet, error) { + if getAll, getAllError := l.labelSetToFingerprints.GetAll(); getAllError == nil { + result := make([]model.LabelSet, 0) + fingerprintCollection := &dto.FingerprintCollection{} + + fingerprints := make(utility.Set) + + for _, pair := range getAll { + if unmarshalError := proto.Unmarshal(pair.Right, fingerprintCollection); unmarshalError == nil { + for _, member := range fingerprintCollection.Member { + if !fingerprints.Has(*member.Signature) { + fingerprints.Add(*member.Signature) + fingerprintEncoded := coding.NewProtocolBufferEncoder(member) + if labelPairCollectionRaw, labelPairCollectionRawError := l.fingerprintToMetrics.Get(fingerprintEncoded); labelPairCollectionRawError == nil { + + labelPairCollectionDTO := &dto.LabelSet{} + + if labelPairCollectionDTOMarshalError := proto.Unmarshal(labelPairCollectionRaw, labelPairCollectionDTO); labelPairCollectionDTOMarshalError == nil { + intermediate := make(model.LabelSet, 0) + + for _, member := range labelPairCollectionDTO.Member { + n := model.LabelName(*member.Name) + v := model.LabelValue(*member.Value) + intermediate[n] = v + } + + result = append(result, intermediate) + } else { + return nil, labelPairCollectionDTOMarshalError + } + } else { + return nil, labelPairCollectionRawError + } + } + } + } else { + return nil, unmarshalError + } + } + return result, nil + } else { + return nil, getAllError + } + + return nil, errors.New("Unknown error encountered when querying metrics.") +} diff --git a/storage/metric/leveldb/leveldb.go b/storage/metric/leveldb/leveldb.go deleted file mode 100644 index c477b41824..0000000000 --- a/storage/metric/leveldb/leveldb.go +++ /dev/null @@ -1,600 +0,0 @@ -// Copyright 2012 Prometheus Team -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package leveldb - -import ( - "code.google.com/p/goprotobuf/proto" - "errors" - "github.com/matttproud/prometheus/coding" - "github.com/matttproud/prometheus/coding/indexable" - "github.com/matttproud/prometheus/model" - data "github.com/matttproud/prometheus/model/generated" - index "github.com/matttproud/prometheus/storage/raw/index/leveldb" - storage "github.com/matttproud/prometheus/storage/raw/leveldb" - "github.com/matttproud/prometheus/utility" - "io" - "log" -) - -type LevelDBMetricPersistence struct { - fingerprintToMetrics *storage.LevelDBPersistence - metricSamples *storage.LevelDBPersistence - labelNameToFingerprints *storage.LevelDBPersistence - labelSetToFingerprints *storage.LevelDBPersistence - metricMembershipIndex *index.LevelDBMembershipIndex -} - -type leveldbOpener func() - -func (l *LevelDBMetricPersistence) Close() error { - log.Printf("Closing LevelDBPersistence storage containers...") - - var persistences = []struct { - name string - closer io.Closer - }{ - { - "Fingerprint to Label Name and Value Pairs", - l.fingerprintToMetrics, - }, - { - "Fingerprint Samples", - l.metricSamples, - }, - { - "Label Name to Fingerprints", - l.labelNameToFingerprints, - }, - { - "Label Name and Value Pairs to Fingerprints", - l.labelSetToFingerprints, - }, - { - "Metric Membership Index", - l.metricMembershipIndex, - }, - } - - errorChannel := make(chan error, len(persistences)) - - for _, persistence := range persistences { - name := persistence.name - closer := persistence.closer - - go func(name string, closer io.Closer) { - if closer != nil { - log.Printf("Closing LevelDBPersistence storage container: %s\n", name) - closingError := closer.Close() - - if closingError != nil { - log.Printf("Could not close a LevelDBPersistence storage container; inconsistencies are possible: %q\n", closingError) - } - - errorChannel <- closingError - } else { - errorChannel <- nil - } - }(name, closer) - } - - for i := 0; i < cap(errorChannel); i++ { - closingError := <-errorChannel - - if closingError != nil { - return closingError - } - } - - log.Printf("Successfully closed all LevelDBPersistence storage containers.") - - return nil -} - -func NewLevelDBMetricPersistence(baseDirectory string) (*LevelDBMetricPersistence, error) { - log.Printf("Opening LevelDBPersistence storage containers...") - - errorChannel := make(chan error, 5) - - emission := &LevelDBMetricPersistence{} - - var subsystemOpeners = []struct { - name string - opener leveldbOpener - }{ - { - "Label Names and Value Pairs by Fingerprint", - func() { - var err error - emission.fingerprintToMetrics, err = storage.NewLevelDBPersistence(baseDirectory+"/label_name_and_value_pairs_by_fingerprint", 1000000, 10) - errorChannel <- err - }, - }, - { - "Samples by Fingerprint", - func() { - var err error - emission.metricSamples, err = storage.NewLevelDBPersistence(baseDirectory+"/samples_by_fingerprint", 1000000, 10) - errorChannel <- err - }, - }, - { - "Fingerprints by Label Name", - func() { - var err error - emission.labelNameToFingerprints, err = storage.NewLevelDBPersistence(baseDirectory+"/fingerprints_by_label_name", 1000000, 10) - errorChannel <- err - }, - }, - { - "Fingerprints by Label Name and Value Pair", - func() { - var err error - emission.labelSetToFingerprints, err = storage.NewLevelDBPersistence(baseDirectory+"/fingerprints_by_label_name_and_value_pair", 1000000, 10) - errorChannel <- err - }, - }, - { - "Metric Membership Index", - func() { - var err error - emission.metricMembershipIndex, err = index.NewLevelDBMembershipIndex(baseDirectory+"/metric_membership_index", 1000000, 10) - errorChannel <- err - }, - }, - } - - for _, subsystem := range subsystemOpeners { - name := subsystem.name - opener := subsystem.opener - - log.Printf("Opening LevelDBPersistence storage container: %s\n", name) - - go opener() - } - - for i := 0; i < cap(errorChannel); i++ { - openingError := <-errorChannel - - if openingError != nil { - log.Printf("Could not open a LevelDBPersistence storage container: %q\n", openingError) - - return nil, openingError - } - } - - log.Printf("Successfully opened all LevelDBPersistence storage containers.\n") - - return emission, nil -} - -func (l *LevelDBMetricPersistence) hasIndexMetric(dto *data.Metric) (bool, error) { - dtoKey := coding.NewProtocolBufferEncoder(dto) - return l.metricMembershipIndex.Has(dtoKey) -} - -func (l *LevelDBMetricPersistence) indexMetric(dto *data.Metric) error { - dtoKey := coding.NewProtocolBufferEncoder(dto) - return l.metricMembershipIndex.Put(dtoKey) -} - -// TODO(mtp): Candidate for refactoring. -func fingerprintDTOForMessage(message proto.Message) (*data.Fingerprint, error) { - if messageByteArray, marshalError := proto.Marshal(message); marshalError == nil { - fingerprint := model.BytesToFingerprint(messageByteArray) - return &data.Fingerprint{ - Signature: proto.String(string(fingerprint)), - }, nil - } else { - return nil, marshalError - } - - return nil, errors.New("Unknown error in generating FingerprintDTO from message.") -} - -func (l *LevelDBMetricPersistence) HasLabelPair(dto *data.LabelPair) (bool, error) { - dtoKey := coding.NewProtocolBufferEncoder(dto) - return l.labelSetToFingerprints.Has(dtoKey) -} - -func (l *LevelDBMetricPersistence) HasLabelName(dto *data.LabelName) (bool, error) { - dtoKey := coding.NewProtocolBufferEncoder(dto) - return l.labelNameToFingerprints.Has(dtoKey) -} - -func (l *LevelDBMetricPersistence) getFingerprintsForLabelSet(dto *data.LabelPair) (*data.FingerprintCollection, error) { - dtoKey := coding.NewProtocolBufferEncoder(dto) - if get, getError := l.labelSetToFingerprints.Get(dtoKey); getError == nil { - value := &data.FingerprintCollection{} - if unmarshalError := proto.Unmarshal(get, value); unmarshalError == nil { - return value, nil - } else { - return nil, unmarshalError - } - } else { - return nil, getError - } - - panic("unreachable") -} - -func (l *LevelDBMetricPersistence) GetLabelNameFingerprints(dto *data.LabelName) (*data.FingerprintCollection, error) { - dtoKey := coding.NewProtocolBufferEncoder(dto) - if get, getError := l.labelNameToFingerprints.Get(dtoKey); getError == nil { - value := &data.FingerprintCollection{} - if unmarshalError := proto.Unmarshal(get, value); unmarshalError == nil { - return value, nil - } else { - return nil, unmarshalError - } - } else { - return nil, getError - } - - return nil, errors.New("Unknown error while getting label name fingerprints.") -} - -func (l *LevelDBMetricPersistence) setLabelPairFingerprints(labelPair *data.LabelPair, fingerprints *data.FingerprintCollection) error { - labelPairEncoded := coding.NewProtocolBufferEncoder(labelPair) - fingerprintsEncoded := coding.NewProtocolBufferEncoder(fingerprints) - return l.labelSetToFingerprints.Put(labelPairEncoded, fingerprintsEncoded) -} - -func (l *LevelDBMetricPersistence) setLabelNameFingerprints(labelName *data.LabelName, fingerprints *data.FingerprintCollection) error { - labelNameEncoded := coding.NewProtocolBufferEncoder(labelName) - fingerprintsEncoded := coding.NewProtocolBufferEncoder(fingerprints) - return l.labelNameToFingerprints.Put(labelNameEncoded, fingerprintsEncoded) -} - -func (l *LevelDBMetricPersistence) appendLabelPairFingerprint(labelPair *data.LabelPair, fingerprint *data.Fingerprint) error { - if has, hasError := l.HasLabelPair(labelPair); hasError == nil { - var fingerprints *data.FingerprintCollection - if has { - if existing, existingError := l.getFingerprintsForLabelSet(labelPair); existingError == nil { - fingerprints = existing - } else { - return existingError - } - } else { - fingerprints = &data.FingerprintCollection{} - } - - fingerprints.Member = append(fingerprints.Member, fingerprint) - - return l.setLabelPairFingerprints(labelPair, fingerprints) - } else { - return hasError - } - - return errors.New("Unknown error when appending fingerprint to label name and value pair.") -} - -func (l *LevelDBMetricPersistence) appendLabelNameFingerprint(labelPair *data.LabelPair, fingerprint *data.Fingerprint) error { - labelName := &data.LabelName{ - Name: labelPair.Name, - } - - if has, hasError := l.HasLabelName(labelName); hasError == nil { - var fingerprints *data.FingerprintCollection - if has { - if existing, existingError := l.GetLabelNameFingerprints(labelName); existingError == nil { - fingerprints = existing - } else { - return existingError - } - } else { - fingerprints = &data.FingerprintCollection{} - } - - fingerprints.Member = append(fingerprints.Member, fingerprint) - - return l.setLabelNameFingerprints(labelName, fingerprints) - } else { - return hasError - } - - return errors.New("Unknown error when appending fingerprint to label name and value pair.") -} - -func (l *LevelDBMetricPersistence) appendFingerprints(dto *data.Metric) error { - if fingerprintDTO, fingerprintDTOError := fingerprintDTOForMessage(dto); fingerprintDTOError == nil { - fingerprintKey := coding.NewProtocolBufferEncoder(fingerprintDTO) - metricDTOEncoder := coding.NewProtocolBufferEncoder(dto) - - if putError := l.fingerprintToMetrics.Put(fingerprintKey, metricDTOEncoder); putError == nil { - labelCount := len(dto.LabelPair) - labelPairErrors := make(chan error, labelCount) - labelNameErrors := make(chan error, labelCount) - - for _, labelPair := range dto.LabelPair { - go func(labelPair *data.LabelPair) { - labelNameErrors <- l.appendLabelNameFingerprint(labelPair, fingerprintDTO) - }(labelPair) - - go func(labelPair *data.LabelPair) { - labelPairErrors <- l.appendLabelPairFingerprint(labelPair, fingerprintDTO) - }(labelPair) - } - - for i := 0; i < cap(labelPairErrors); i++ { - appendError := <-labelPairErrors - - if appendError != nil { - return appendError - } - } - - for i := 0; i < cap(labelNameErrors); i++ { - appendError := <-labelNameErrors - - if appendError != nil { - return appendError - } - } - - return nil - - } else { - return putError - } - } else { - return fingerprintDTOError - } - - return errors.New("Unknown error in appending label pairs to fingerprint.") -} - -func (l *LevelDBMetricPersistence) AppendSample(sample *model.Sample) error { - metricDTO := model.SampleToMetricDTO(sample) - - if indexHas, indexHasError := l.hasIndexMetric(metricDTO); indexHasError == nil { - if !indexHas { - if indexPutError := l.indexMetric(metricDTO); indexPutError == nil { - if appendError := l.appendFingerprints(metricDTO); appendError != nil { - log.Printf("Could not set metric fingerprint to label pairs mapping: %q\n", appendError) - return appendError - } - } else { - log.Printf("Could not add metric to membership index: %q\n", indexPutError) - return indexPutError - } - } - } else { - log.Printf("Could not query membership index for metric: %q\n", indexHasError) - return indexHasError - } - - if fingerprintDTO, fingerprintDTOErr := fingerprintDTOForMessage(metricDTO); fingerprintDTOErr == nil { - - sampleKeyDTO := &data.SampleKey{ - Fingerprint: fingerprintDTO, - Timestamp: indexable.EncodeTime(sample.Timestamp), - } - - sampleValueDTO := &data.SampleValue{ - Value: proto.Float32(float32(sample.Value)), - } - - sampleKeyEncoded := coding.NewProtocolBufferEncoder(sampleKeyDTO) - sampleValueEncoded := coding.NewProtocolBufferEncoder(sampleValueDTO) - - if putError := l.metricSamples.Put(sampleKeyEncoded, sampleValueEncoded); putError != nil { - log.Printf("Could not append metric sample: %q\n", putError) - return putError - } - } else { - log.Printf("Could not encode metric fingerprint: %q\n", fingerprintDTOErr) - return fingerprintDTOErr - } - - return nil -} - -func (l *LevelDBMetricPersistence) GetAllLabelNames() ([]string, error) { - if getAll, getAllError := l.labelNameToFingerprints.GetAll(); getAllError == nil { - result := make([]string, 0, len(getAll)) - labelNameDTO := &data.LabelName{} - - for _, pair := range getAll { - if unmarshalError := proto.Unmarshal(pair.Left, labelNameDTO); unmarshalError == nil { - result = append(result, *labelNameDTO.Name) - } else { - return nil, unmarshalError - } - } - - return result, nil - - } else { - return nil, getAllError - } - - return nil, errors.New("Unknown error encountered when querying label names.") -} - -func (l *LevelDBMetricPersistence) GetAllLabelPairs() ([]model.LabelSet, error) { - if getAll, getAllError := l.labelSetToFingerprints.GetAll(); getAllError == nil { - result := make([]model.LabelSet, 0, len(getAll)) - labelPairDTO := &data.LabelPair{} - - for _, pair := range getAll { - if unmarshalError := proto.Unmarshal(pair.Left, labelPairDTO); unmarshalError == nil { - n := model.LabelName(*labelPairDTO.Name) - v := model.LabelValue(*labelPairDTO.Value) - item := model.LabelSet{n: v} - result = append(result, item) - } else { - return nil, unmarshalError - } - } - - return result, nil - - } else { - return nil, getAllError - } - - return nil, errors.New("Unknown error encountered when querying label pairs.") -} - -func (l *LevelDBMetricPersistence) GetAllMetrics() ([]model.LabelSet, error) { - if getAll, getAllError := l.labelSetToFingerprints.GetAll(); getAllError == nil { - result := make([]model.LabelSet, 0) - fingerprintCollection := &data.FingerprintCollection{} - - fingerprints := make(utility.Set) - - for _, pair := range getAll { - if unmarshalError := proto.Unmarshal(pair.Right, fingerprintCollection); unmarshalError == nil { - for _, member := range fingerprintCollection.Member { - if !fingerprints.Has(*member.Signature) { - fingerprints.Add(*member.Signature) - fingerprintEncoded := coding.NewProtocolBufferEncoder(member) - if labelPairCollectionRaw, labelPairCollectionRawError := l.fingerprintToMetrics.Get(fingerprintEncoded); labelPairCollectionRawError == nil { - - labelPairCollectionDTO := &data.LabelSet{} - - if labelPairCollectionDTOMarshalError := proto.Unmarshal(labelPairCollectionRaw, labelPairCollectionDTO); labelPairCollectionDTOMarshalError == nil { - intermediate := make(model.LabelSet, 0) - - for _, member := range labelPairCollectionDTO.Member { - n := model.LabelName(*member.Name) - v := model.LabelValue(*member.Value) - intermediate[n] = v - } - - result = append(result, intermediate) - } else { - return nil, labelPairCollectionDTOMarshalError - } - } else { - return nil, labelPairCollectionRawError - } - } - } - } else { - return nil, unmarshalError - } - } - return result, nil - } else { - return nil, getAllError - } - - return nil, errors.New("Unknown error encountered when querying metrics.") -} - -func (l *LevelDBMetricPersistence) GetSamplesForMetric(metric model.Metric, interval model.Interval) ([]model.Samples, error) { - metricDTO := model.MetricToDTO(&metric) - - if fingerprintDTO, fingerprintDTOErr := fingerprintDTOForMessage(metricDTO); fingerprintDTOErr == nil { - if iterator, closer, iteratorErr := l.metricSamples.GetIterator(); iteratorErr == nil { - defer closer.Close() - - start := &data.SampleKey{ - Fingerprint: fingerprintDTO, - Timestamp: indexable.EncodeTime(interval.OldestInclusive), - } - - emission := make([]model.Samples, 0) - - if encode, encodeErr := coding.NewProtocolBufferEncoder(start).Encode(); encodeErr == nil { - iterator.Seek(encode) - - for iterator = iterator; iterator.Valid(); iterator.Next() { - key := &data.SampleKey{} - value := &data.SampleValue{} - if keyUnmarshalErr := proto.Unmarshal(iterator.Key(), key); keyUnmarshalErr == nil { - if valueUnmarshalErr := proto.Unmarshal(iterator.Value(), value); valueUnmarshalErr == nil { - if *fingerprintDTO.Signature == *key.Fingerprint.Signature { - // Wart - if indexable.DecodeTime(key.Timestamp).Unix() <= interval.NewestInclusive.Unix() { - emission = append(emission, model.Samples{ - Value: model.SampleValue(*value.Value), - Timestamp: indexable.DecodeTime(key.Timestamp), - }) - } else { - break - } - } else { - break - } - } else { - return nil, valueUnmarshalErr - } - } else { - return nil, keyUnmarshalErr - } - } - - return emission, nil - - } else { - log.Printf("Could not encode the start key: %q\n", encodeErr) - return nil, encodeErr - } - } else { - log.Printf("Could not acquire iterator: %q\n", iteratorErr) - return nil, iteratorErr - } - } else { - log.Printf("Could not create fingerprint for the metric: %q\n", fingerprintDTOErr) - return nil, fingerprintDTOErr - } - - panic("unreachable") -} - -func (l *LevelDBMetricPersistence) GetFingerprintsForLabelSet(labelSet *model.LabelSet) ([]*model.Fingerprint, error) { - emission := make([]*model.Fingerprint, 0, 0) - - for _, labelSetDTO := range model.LabelSetToDTOs(labelSet) { - if f, err := l.labelSetToFingerprints.Get(coding.NewProtocolBufferEncoder(labelSetDTO)); err == nil { - unmarshaled := &data.FingerprintCollection{} - if unmarshalErr := proto.Unmarshal(f, unmarshaled); unmarshalErr == nil { - for _, m := range unmarshaled.Member { - fp := model.Fingerprint(*m.Signature) - emission = append(emission, &fp) - } - } else { - return nil, err - } - } else { - return nil, err - } - } - - return emission, nil -} - -func (l *LevelDBMetricPersistence) GetFingerprintsForLabelName(labelName *model.LabelName) ([]*model.Fingerprint, error) { - emission := make([]*model.Fingerprint, 0, 0) - - if raw, err := l.labelNameToFingerprints.Get(coding.NewProtocolBufferEncoder(model.LabelNameToDTO(labelName))); err == nil { - - unmarshaled := &data.FingerprintCollection{} - - if err = proto.Unmarshal(raw, unmarshaled); err == nil { - for _, m := range unmarshaled.Member { - fp := model.Fingerprint(*m.Signature) - emission = append(emission, &fp) - } - } else { - return nil, err - } - } else { - return nil, err - } - - return emission, nil -} diff --git a/storage/metric/leveldb/leveldb_test.go b/storage/metric/leveldb/leveldb_test.go index d78fe1ac2d..2bef4b5c2e 100644 --- a/storage/metric/leveldb/leveldb_test.go +++ b/storage/metric/leveldb/leveldb_test.go @@ -17,7 +17,7 @@ import ( "code.google.com/p/goprotobuf/proto" "fmt" "github.com/matttproud/prometheus/model" - data "github.com/matttproud/prometheus/model/generated" + dto "github.com/matttproud/prometheus/model/generated" "io/ioutil" "math" "math/rand" @@ -31,7 +31,12 @@ const ( stochasticMaximumVariance = 64 ) -func TestBasicLifecycle(t *testing.T) { +type tester interface { + Errorf(format string, args ...interface{}) + Error(args ...interface{}) +} + +var testBasicLifecycle func(t tester) = func(t tester) { temporaryDirectory, temporaryDirectoryErr := ioutil.TempDir("", "leveldb_metric_persistence_test") if temporaryDirectoryErr != nil { @@ -63,7 +68,17 @@ func TestBasicLifecycle(t *testing.T) { } } -func TestReadEmpty(t *testing.T) { +func TestBasicLifecycle(t *testing.T) { + testBasicLifecycle(t) +} + +func BenchmarkBasicLifecycle(b *testing.B) { + for i := 0; i < b.N; i++ { + testBasicLifecycle(b) + } +} + +var testReadEmpty func(t tester) = func(t tester) { temporaryDirectory, _ := ioutil.TempDir("", "leveldb_metric_persistence_test") defer func() { @@ -82,7 +97,7 @@ func TestReadEmpty(t *testing.T) { name := string(x) value := string(x) - dto := &data.LabelPair{ + dto := &dto.LabelPair{ Name: proto.String(name), Value: proto.String(value), } @@ -102,7 +117,7 @@ func TestReadEmpty(t *testing.T) { hasLabelName := func(x int) bool { name := string(x) - dto := &data.LabelName{ + dto := &dto.LabelName{ Name: proto.String(name), } @@ -123,7 +138,7 @@ func TestReadEmpty(t *testing.T) { name := string(x) value := string(x) - dto := &data.LabelPair{ + dto := &dto.LabelPair{ Name: proto.String(name), Value: proto.String(value), } @@ -148,7 +163,7 @@ func TestReadEmpty(t *testing.T) { getLabelNameFingerprints := func(x int) bool { name := string(x) - dto := &data.LabelName{ + dto := &dto.LabelName{ Name: proto.String(name), } @@ -170,7 +185,17 @@ func TestReadEmpty(t *testing.T) { } } -func TestAppendSampleAsPureSparseAppend(t *testing.T) { +func TestReadEmpty(t *testing.T) { + testReadEmpty(t) +} + +func BenchmarkReadEmpty(b *testing.B) { + for i := 0; i < b.N; i++ { + testReadEmpty(b) + } +} + +var testAppendSampleAsPureSparseAppend = func(t tester) { temporaryDirectory, _ := ioutil.TempDir("", "leveldb_metric_persistence_test") defer func() { @@ -206,7 +231,17 @@ func TestAppendSampleAsPureSparseAppend(t *testing.T) { } } -func TestAppendSampleAsSparseAppendWithReads(t *testing.T) { +func TestAppendSampleAsPureSparseAppend(t *testing.T) { + testAppendSampleAsPureSparseAppend(t) +} + +func BenchmarkAppendSampleAsPureSparseAppend(b *testing.B) { + for i := 0; i < b.N; i++ { + testAppendSampleAsPureSparseAppend(b) + } +} + +var testAppendSampleAsSparseAppendWithReads func(t tester) = func(t tester) { temporaryDirectory, _ := ioutil.TempDir("", "leveldb_metric_persistence_test") defer func() { @@ -238,7 +273,7 @@ func TestAppendSampleAsSparseAppendWithReads(t *testing.T) { return false } - labelNameDTO := &data.LabelName{ + labelNameDTO := &dto.LabelName{ Name: proto.String(string(x)), } @@ -252,7 +287,7 @@ func TestAppendSampleAsSparseAppendWithReads(t *testing.T) { return false } - labelPairDTO := &data.LabelPair{ + labelPairDTO := &dto.LabelPair{ Name: proto.String(string(x)), Value: proto.String(string(x)), } @@ -303,6 +338,16 @@ func TestAppendSampleAsSparseAppendWithReads(t *testing.T) { } } +func TestAppendSampleAsSparseAppendWithReads(t *testing.T) { + testAppendSampleAsSparseAppendWithReads(t) +} + +func BenchmarkAppendSampleAsSparseAppendWithReads(b *testing.B) { + for i := 0; i < b.N; i++ { + testAppendSampleAsSparseAppendWithReads(b) + } +} + func TestAppendSampleAsPureSingleEntityAppend(t *testing.T) { temporaryDirectory, _ := ioutil.TempDir("", "leveldb_metric_persistence_test") @@ -429,7 +474,7 @@ func TestStochastic(t *testing.T) { metricNewestSample[metricIndex] = newestSample for sharedLabelIndex := 0; sharedLabelIndex < numberOfSharedLabels; sharedLabelIndex++ { - labelPair := &data.LabelPair{ + labelPair := &dto.LabelPair{ Name: proto.String(fmt.Sprintf("shared_label_%d", sharedLabelIndex)), Value: proto.String(fmt.Sprintf("label_%d", sharedLabelIndex)), } @@ -444,7 +489,7 @@ func TestStochastic(t *testing.T) { return false } - labelName := &data.LabelName{ + labelName := &dto.LabelName{ Name: proto.String(fmt.Sprintf("shared_label_%d", sharedLabelIndex)), } @@ -461,7 +506,7 @@ func TestStochastic(t *testing.T) { } for sharedIndex := 0; sharedIndex < numberOfSharedLabels; sharedIndex++ { - labelName := &data.LabelName{ + labelName := &dto.LabelName{ Name: proto.String(fmt.Sprintf("shared_label_%d", sharedIndex)), } fingerprints, fingerprintsErr := persistence.GetLabelNameFingerprints(labelName) @@ -481,7 +526,7 @@ func TestStochastic(t *testing.T) { for metricIndex := 0; metricIndex < numberOfMetrics; metricIndex++ { for unsharedLabelIndex := 0; unsharedLabelIndex < numberOfUnsharedLabels; unsharedLabelIndex++ { - labelPair := &data.LabelPair{ + labelPair := &dto.LabelPair{ Name: proto.String(fmt.Sprintf("metric_index_%d_private_label_%d", metricIndex, unsharedLabelIndex)), Value: proto.String(fmt.Sprintf("private_label_%d", unsharedLabelIndex)), } @@ -510,7 +555,7 @@ func TestStochastic(t *testing.T) { return false } - labelName := &data.LabelName{ + labelName := &dto.LabelName{ Name: proto.String(fmt.Sprintf("metric_index_%d_private_label_%d", metricIndex, unsharedLabelIndex)), } @@ -793,3 +838,99 @@ func TestGetFingerprintsForLabelName(t *testing.T) { t.Errorf("Expected one element.") } } + +func TestGetMetricForFingerprint(t *testing.T) { + temporaryDirectory, _ := ioutil.TempDir("", "leveldb_metric_persistence_test") + + defer func() { + if removeAllErr := os.RemoveAll(temporaryDirectory); removeAllErr != nil { + t.Errorf("Could not remove temporary directory: %q\n", removeAllErr) + } + }() + + persistence, _ := NewLevelDBMetricPersistence(temporaryDirectory) + + defer func() { + persistence.Close() + }() + + appendErr := persistence.AppendSample(&model.Sample{ + Value: model.SampleValue(0), + Timestamp: time.Unix(0, 0), + Labels: model.LabelSet{ + "request_type": "your_mom", + }, + }) + + if appendErr != nil { + t.Error(appendErr) + } + + appendErr = persistence.AppendSample(&model.Sample{ + Value: model.SampleValue(0), + Timestamp: time.Unix(int64(0), 0), + Labels: model.LabelSet{ + "request_type": "your_dad", + "one-off": "value", + }, + }) + + if appendErr != nil { + t.Error(appendErr) + } + + result, getErr := persistence.GetFingerprintsForLabelSet(&(model.LabelSet{ + model.LabelName("request_type"): model.LabelValue("your_mom"), + })) + + if getErr != nil { + t.Error(getErr) + } + + if len(result) != 1 { + t.Errorf("Expected one element.") + } + + v, e := persistence.GetMetricForFingerprint(result[0]) + if e != nil { + t.Error(e) + } + + if len(*v) != 1 { + t.Errorf("Expected one-dimensional metric.") + } + + if (*v)["request_type"] != "your_mom" { + t.Errorf("Expected metric to match.") + } + + result, getErr = persistence.GetFingerprintsForLabelSet(&(model.LabelSet{ + model.LabelName("request_type"): model.LabelValue("your_dad"), + })) + + if getErr != nil { + t.Error(getErr) + } + + if len(result) != 1 { + t.Errorf("Expected one element.") + } + + v, e = persistence.GetMetricForFingerprint(result[0]) + + if e != nil { + t.Error(e) + } + + if len(*v) != 2 { + t.Errorf("Expected one-dimensional metric.") + } + + if (*v)["request_type"] != "your_dad" { + t.Errorf("Expected metric to match.") + } + + if (*v)["one-off"] != "value" { + t.Errorf("Expected metric to match.") + } +} diff --git a/storage/metric/leveldb/lifecycle.go b/storage/metric/leveldb/lifecycle.go new file mode 100644 index 0000000000..a9938193a5 --- /dev/null +++ b/storage/metric/leveldb/lifecycle.go @@ -0,0 +1,164 @@ +// Copyright 2012 Prometheus Team +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package leveldb + +import ( + index "github.com/matttproud/prometheus/storage/raw/index/leveldb" + storage "github.com/matttproud/prometheus/storage/raw/leveldb" + "io" + "log" +) + +type leveldbOpener func() + +func (l *LevelDBMetricPersistence) Close() error { + log.Printf("Closing LevelDBPersistence storage containers...") + + var persistences = []struct { + name string + closer io.Closer + }{ + { + "Fingerprint to Label Name and Value Pairs", + l.fingerprintToMetrics, + }, + { + "Fingerprint Samples", + l.metricSamples, + }, + { + "Label Name to Fingerprints", + l.labelNameToFingerprints, + }, + { + "Label Name and Value Pairs to Fingerprints", + l.labelSetToFingerprints, + }, + { + "Metric Membership Index", + l.metricMembershipIndex, + }, + } + + errorChannel := make(chan error, len(persistences)) + + for _, persistence := range persistences { + name := persistence.name + closer := persistence.closer + + go func(name string, closer io.Closer) { + if closer != nil { + log.Printf("Closing LevelDBPersistence storage container: %s\n", name) + closingError := closer.Close() + + if closingError != nil { + log.Printf("Could not close a LevelDBPersistence storage container; inconsistencies are possible: %q\n", closingError) + } + + errorChannel <- closingError + } else { + errorChannel <- nil + } + }(name, closer) + } + + for i := 0; i < cap(errorChannel); i++ { + closingError := <-errorChannel + + if closingError != nil { + return closingError + } + } + + log.Printf("Successfully closed all LevelDBPersistence storage containers.") + + return nil +} + +func NewLevelDBMetricPersistence(baseDirectory string) (*LevelDBMetricPersistence, error) { + log.Printf("Opening LevelDBPersistence storage containers...") + + errorChannel := make(chan error, 5) + + emission := &LevelDBMetricPersistence{} + + var subsystemOpeners = []struct { + name string + opener leveldbOpener + }{ + { + "Label Names and Value Pairs by Fingerprint", + func() { + var err error + emission.fingerprintToMetrics, err = storage.NewLevelDBPersistence(baseDirectory+"/label_name_and_value_pairs_by_fingerprint", 1000000, 10) + errorChannel <- err + }, + }, + { + "Samples by Fingerprint", + func() { + var err error + emission.metricSamples, err = storage.NewLevelDBPersistence(baseDirectory+"/samples_by_fingerprint", 1000000, 10) + errorChannel <- err + }, + }, + { + "Fingerprints by Label Name", + func() { + var err error + emission.labelNameToFingerprints, err = storage.NewLevelDBPersistence(baseDirectory+"/fingerprints_by_label_name", 1000000, 10) + errorChannel <- err + }, + }, + { + "Fingerprints by Label Name and Value Pair", + func() { + var err error + emission.labelSetToFingerprints, err = storage.NewLevelDBPersistence(baseDirectory+"/fingerprints_by_label_name_and_value_pair", 1000000, 10) + errorChannel <- err + }, + }, + { + "Metric Membership Index", + func() { + var err error + emission.metricMembershipIndex, err = index.NewLevelDBMembershipIndex(baseDirectory+"/metric_membership_index", 1000000, 10) + errorChannel <- err + }, + }, + } + + for _, subsystem := range subsystemOpeners { + name := subsystem.name + opener := subsystem.opener + + log.Printf("Opening LevelDBPersistence storage container: %s\n", name) + + go opener() + } + + for i := 0; i < cap(errorChannel); i++ { + openingError := <-errorChannel + + if openingError != nil { + log.Printf("Could not open a LevelDBPersistence storage container: %q\n", openingError) + + return nil, openingError + } + } + + log.Printf("Successfully opened all LevelDBPersistence storage containers.\n") + + return emission, nil +} diff --git a/storage/metric/leveldb/mutable.go b/storage/metric/leveldb/mutable.go new file mode 100644 index 0000000000..bcff51efa6 --- /dev/null +++ b/storage/metric/leveldb/mutable.go @@ -0,0 +1,180 @@ +// Copyright 2012 Prometheus Team +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package leveldb + +import ( + "code.google.com/p/goprotobuf/proto" + "errors" + "github.com/matttproud/prometheus/coding" + "github.com/matttproud/prometheus/coding/indexable" + "github.com/matttproud/prometheus/model" + dto "github.com/matttproud/prometheus/model/generated" + "log" +) + +func (l *LevelDBMetricPersistence) setLabelPairFingerprints(labelPair *dto.LabelPair, fingerprints *dto.FingerprintCollection) error { + labelPairEncoded := coding.NewProtocolBufferEncoder(labelPair) + fingerprintsEncoded := coding.NewProtocolBufferEncoder(fingerprints) + return l.labelSetToFingerprints.Put(labelPairEncoded, fingerprintsEncoded) +} + +func (l *LevelDBMetricPersistence) setLabelNameFingerprints(labelName *dto.LabelName, fingerprints *dto.FingerprintCollection) error { + labelNameEncoded := coding.NewProtocolBufferEncoder(labelName) + fingerprintsEncoded := coding.NewProtocolBufferEncoder(fingerprints) + return l.labelNameToFingerprints.Put(labelNameEncoded, fingerprintsEncoded) +} + +func (l *LevelDBMetricPersistence) appendLabelPairFingerprint(labelPair *dto.LabelPair, fingerprint *dto.Fingerprint) error { + if has, hasError := l.HasLabelPair(labelPair); hasError == nil { + var fingerprints *dto.FingerprintCollection + if has { + if existing, existingError := l.getFingerprintsForLabelSet(labelPair); existingError == nil { + fingerprints = existing + } else { + return existingError + } + } else { + fingerprints = &dto.FingerprintCollection{} + } + + fingerprints.Member = append(fingerprints.Member, fingerprint) + + return l.setLabelPairFingerprints(labelPair, fingerprints) + } else { + return hasError + } + + return errors.New("Unknown error when appending fingerprint to label name and value pair.") +} + +func (l *LevelDBMetricPersistence) appendLabelNameFingerprint(labelPair *dto.LabelPair, fingerprint *dto.Fingerprint) error { + labelName := &dto.LabelName{ + Name: labelPair.Name, + } + + if has, hasError := l.HasLabelName(labelName); hasError == nil { + var fingerprints *dto.FingerprintCollection + if has { + if existing, existingError := l.GetLabelNameFingerprints(labelName); existingError == nil { + fingerprints = existing + } else { + return existingError + } + } else { + fingerprints = &dto.FingerprintCollection{} + } + + fingerprints.Member = append(fingerprints.Member, fingerprint) + + return l.setLabelNameFingerprints(labelName, fingerprints) + } else { + return hasError + } + + return errors.New("Unknown error when appending fingerprint to label name and value pair.") +} + +func (l *LevelDBMetricPersistence) appendFingerprints(m *dto.Metric) error { + if fingerprintDTO, fingerprintDTOError := model.MessageToFingerprintDTO(m); fingerprintDTOError == nil { + fingerprintKey := coding.NewProtocolBufferEncoder(fingerprintDTO) + metricDTOEncoder := coding.NewProtocolBufferEncoder(m) + + if putError := l.fingerprintToMetrics.Put(fingerprintKey, metricDTOEncoder); putError == nil { + labelCount := len(m.LabelPair) + labelPairErrors := make(chan error, labelCount) + labelNameErrors := make(chan error, labelCount) + + for _, labelPair := range m.LabelPair { + go func(labelPair *dto.LabelPair) { + labelNameErrors <- l.appendLabelNameFingerprint(labelPair, fingerprintDTO) + }(labelPair) + + go func(labelPair *dto.LabelPair) { + labelPairErrors <- l.appendLabelPairFingerprint(labelPair, fingerprintDTO) + }(labelPair) + } + + for i := 0; i < cap(labelPairErrors); i++ { + appendError := <-labelPairErrors + + if appendError != nil { + return appendError + } + } + + for i := 0; i < cap(labelNameErrors); i++ { + appendError := <-labelNameErrors + + if appendError != nil { + return appendError + } + } + + return nil + + } else { + return putError + } + } else { + return fingerprintDTOError + } + + return errors.New("Unknown error in appending label pairs to fingerprint.") +} + +func (l *LevelDBMetricPersistence) AppendSample(sample *model.Sample) error { + metricDTO := model.SampleToMetricDTO(sample) + + if indexHas, indexHasError := l.hasIndexMetric(metricDTO); indexHasError == nil { + if !indexHas { + if indexPutError := l.indexMetric(metricDTO); indexPutError == nil { + if appendError := l.appendFingerprints(metricDTO); appendError != nil { + log.Printf("Could not set metric fingerprint to label pairs mapping: %q\n", appendError) + return appendError + } + } else { + log.Printf("Could not add metric to membership index: %q\n", indexPutError) + return indexPutError + } + } + } else { + log.Printf("Could not query membership index for metric: %q\n", indexHasError) + return indexHasError + } + + if fingerprintDTO, fingerprintDTOErr := model.MessageToFingerprintDTO(metricDTO); fingerprintDTOErr == nil { + + sampleKeyDTO := &dto.SampleKey{ + Fingerprint: fingerprintDTO, + Timestamp: indexable.EncodeTime(sample.Timestamp), + } + + sampleValueDTO := &dto.SampleValue{ + Value: proto.Float32(float32(sample.Value)), + } + + sampleKeyEncoded := coding.NewProtocolBufferEncoder(sampleKeyDTO) + sampleValueEncoded := coding.NewProtocolBufferEncoder(sampleValueDTO) + + if putError := l.metricSamples.Put(sampleKeyEncoded, sampleValueEncoded); putError != nil { + log.Printf("Could not append metric sample: %q\n", putError) + return putError + } + } else { + log.Printf("Could not encode metric fingerprint: %q\n", fingerprintDTOErr) + return fingerprintDTOErr + } + + return nil +} diff --git a/storage/metric/leveldb/reading.go b/storage/metric/leveldb/reading.go new file mode 100644 index 0000000000..0c6c8974cc --- /dev/null +++ b/storage/metric/leveldb/reading.go @@ -0,0 +1,200 @@ +// Copyright 2012 Prometheus Team +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package leveldb + +import ( + "code.google.com/p/goprotobuf/proto" + "errors" + "github.com/matttproud/prometheus/coding" + "github.com/matttproud/prometheus/coding/indexable" + "github.com/matttproud/prometheus/model" + dto "github.com/matttproud/prometheus/model/generated" + "log" +) + +func (l *LevelDBMetricPersistence) hasIndexMetric(dto *dto.Metric) (bool, error) { + dtoKey := coding.NewProtocolBufferEncoder(dto) + return l.metricMembershipIndex.Has(dtoKey) +} + +func (l *LevelDBMetricPersistence) indexMetric(dto *dto.Metric) error { + dtoKey := coding.NewProtocolBufferEncoder(dto) + return l.metricMembershipIndex.Put(dtoKey) +} + +func (l *LevelDBMetricPersistence) HasLabelPair(dto *dto.LabelPair) (bool, error) { + dtoKey := coding.NewProtocolBufferEncoder(dto) + return l.labelSetToFingerprints.Has(dtoKey) +} + +func (l *LevelDBMetricPersistence) HasLabelName(dto *dto.LabelName) (bool, error) { + dtoKey := coding.NewProtocolBufferEncoder(dto) + return l.labelNameToFingerprints.Has(dtoKey) +} + +func (l *LevelDBMetricPersistence) getFingerprintsForLabelSet(p *dto.LabelPair) (*dto.FingerprintCollection, error) { + dtoKey := coding.NewProtocolBufferEncoder(p) + if get, getError := l.labelSetToFingerprints.Get(dtoKey); getError == nil { + value := &dto.FingerprintCollection{} + if unmarshalError := proto.Unmarshal(get, value); unmarshalError == nil { + return value, nil + } else { + return nil, unmarshalError + } + } else { + return nil, getError + } + + panic("unreachable") +} + +func (l *LevelDBMetricPersistence) GetLabelNameFingerprints(n *dto.LabelName) (*dto.FingerprintCollection, error) { + dtoKey := coding.NewProtocolBufferEncoder(n) + if get, getError := l.labelNameToFingerprints.Get(dtoKey); getError == nil { + value := &dto.FingerprintCollection{} + if unmarshalError := proto.Unmarshal(get, value); unmarshalError == nil { + return value, nil + } else { + return nil, unmarshalError + } + } else { + return nil, getError + } + + return nil, errors.New("Unknown error while getting label name fingerprints.") +} + +func (l *LevelDBMetricPersistence) GetSamplesForMetric(metric model.Metric, interval model.Interval) ([]model.Samples, error) { + metricDTO := model.MetricToDTO(&metric) + + if fingerprintDTO, fingerprintDTOErr := model.MessageToFingerprintDTO(metricDTO); fingerprintDTOErr == nil { + if iterator, closer, iteratorErr := l.metricSamples.GetIterator(); iteratorErr == nil { + defer closer.Close() + + start := &dto.SampleKey{ + Fingerprint: fingerprintDTO, + Timestamp: indexable.EncodeTime(interval.OldestInclusive), + } + + emission := make([]model.Samples, 0) + + if encode, encodeErr := coding.NewProtocolBufferEncoder(start).Encode(); encodeErr == nil { + iterator.Seek(encode) + + for iterator = iterator; iterator.Valid(); iterator.Next() { + key := &dto.SampleKey{} + value := &dto.SampleValue{} + if keyUnmarshalErr := proto.Unmarshal(iterator.Key(), key); keyUnmarshalErr == nil { + if valueUnmarshalErr := proto.Unmarshal(iterator.Value(), value); valueUnmarshalErr == nil { + if *fingerprintDTO.Signature == *key.Fingerprint.Signature { + // Wart + if indexable.DecodeTime(key.Timestamp).Unix() <= interval.NewestInclusive.Unix() { + emission = append(emission, model.Samples{ + Value: model.SampleValue(*value.Value), + Timestamp: indexable.DecodeTime(key.Timestamp), + }) + } else { + break + } + } else { + break + } + } else { + return nil, valueUnmarshalErr + } + } else { + return nil, keyUnmarshalErr + } + } + + return emission, nil + + } else { + log.Printf("Could not encode the start key: %q\n", encodeErr) + return nil, encodeErr + } + } else { + log.Printf("Could not acquire iterator: %q\n", iteratorErr) + return nil, iteratorErr + } + } else { + log.Printf("Could not create fingerprint for the metric: %q\n", fingerprintDTOErr) + return nil, fingerprintDTOErr + } + + panic("unreachable") +} + +func (l *LevelDBMetricPersistence) GetFingerprintsForLabelSet(labelSet *model.LabelSet) ([]*model.Fingerprint, error) { + emission := make([]*model.Fingerprint, 0, 0) + + for _, labelSetDTO := range model.LabelSetToDTOs(labelSet) { + if f, err := l.labelSetToFingerprints.Get(coding.NewProtocolBufferEncoder(labelSetDTO)); err == nil { + unmarshaled := &dto.FingerprintCollection{} + if unmarshalErr := proto.Unmarshal(f, unmarshaled); unmarshalErr == nil { + for _, m := range unmarshaled.Member { + fp := model.Fingerprint(*m.Signature) + emission = append(emission, &fp) + } + } else { + return nil, err + } + } else { + return nil, err + } + } + + return emission, nil +} + +func (l *LevelDBMetricPersistence) GetFingerprintsForLabelName(labelName *model.LabelName) ([]*model.Fingerprint, error) { + emission := make([]*model.Fingerprint, 0, 0) + + if raw, err := l.labelNameToFingerprints.Get(coding.NewProtocolBufferEncoder(model.LabelNameToDTO(labelName))); err == nil { + + unmarshaled := &dto.FingerprintCollection{} + + if err = proto.Unmarshal(raw, unmarshaled); err == nil { + for _, m := range unmarshaled.Member { + fp := model.Fingerprint(*m.Signature) + emission = append(emission, &fp) + } + } else { + return nil, err + } + } else { + return nil, err + } + + return emission, nil +} + +func (l *LevelDBMetricPersistence) GetMetricForFingerprint(f *model.Fingerprint) (*model.Metric, error) { + if raw, err := l.fingerprintToMetrics.Get(coding.NewProtocolBufferEncoder(model.FingerprintToDTO(f))); err == nil { + unmarshaled := &dto.Metric{} + if unmarshalErr := proto.Unmarshal(raw, unmarshaled); unmarshalErr == nil { + m := model.Metric{} + for _, v := range unmarshaled.LabelPair { + m[model.LabelName(*v.Name)] = model.LabelValue(*v.Value) + } + return &m, nil + } else { + return nil, unmarshalErr + } + } else { + return nil, err + } + + panic("unreachable") +} diff --git a/storage/metric/leveldb/type.go b/storage/metric/leveldb/type.go new file mode 100644 index 0000000000..92d84121bb --- /dev/null +++ b/storage/metric/leveldb/type.go @@ -0,0 +1,27 @@ +// Copyright 2012 Prometheus Team +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package leveldb + +import ( + index "github.com/matttproud/prometheus/storage/raw/index/leveldb" + storage "github.com/matttproud/prometheus/storage/raw/leveldb" +) + +type LevelDBMetricPersistence struct { + fingerprintToMetrics *storage.LevelDBPersistence + metricSamples *storage.LevelDBPersistence + labelNameToFingerprints *storage.LevelDBPersistence + labelSetToFingerprints *storage.LevelDBPersistence + metricMembershipIndex *index.LevelDBMembershipIndex +} diff --git a/storage/raw/index/leveldb/leveldb.go b/storage/raw/index/leveldb/leveldb.go index eec9515dd6..bc3b1791e3 100644 --- a/storage/raw/index/leveldb/leveldb.go +++ b/storage/raw/index/leveldb/leveldb.go @@ -15,12 +15,12 @@ package leveldb import ( "github.com/matttproud/prometheus/coding" - data "github.com/matttproud/prometheus/model/generated" + dto "github.com/matttproud/prometheus/model/generated" "github.com/matttproud/prometheus/storage/raw/leveldb" ) var ( - existenceValue = coding.NewProtocolBufferEncoder(&data.MembershipIndexValue{}) + existenceValue = coding.NewProtocolBufferEncoder(&dto.MembershipIndexValue{}) ) type LevelDBMembershipIndex struct {