diff --git a/model/conversion.go b/model/conversion.go index 63580c454..2cf68d94f 100644 --- a/model/conversion.go +++ b/model/conversion.go @@ -22,7 +22,7 @@ import ( "sort" ) -func SampleToMetricDDO(s *Sample) *data.MetricDDO { +func SampleToMetricDTO(s *Sample) *data.Metric { labelLength := len(s.Labels) labelNames := make([]string, 0, labelLength) @@ -32,24 +32,24 @@ func SampleToMetricDDO(s *Sample) *data.MetricDDO { sort.Strings(labelNames) - labelPairs := make([]*data.LabelPairDDO, 0, labelLength) + labelSets := make([]*data.LabelPair, 0, labelLength) for _, labelName := range labelNames { - labelValue := s.Labels[labelName] - labelPair := &data.LabelPairDDO{ + labelValue := s.Labels[LabelName(labelName)] + labelPair := &data.LabelPair{ Name: proto.String(string(labelName)), Value: proto.String(string(labelValue)), } - labelPairs = append(labelPairs, labelPair) + labelSets = append(labelSets, labelPair) } - return &data.MetricDDO{ - LabelPair: labelPairs, + return &data.Metric{ + LabelPair: labelSets, } } -func MetricToMetricDDO(m *Metric) *data.MetricDDO { +func MetricToDTO(m *Metric) *data.Metric { metricLength := len(*m) labelNames := make([]string, 0, metricLength) @@ -59,26 +59,21 @@ func MetricToMetricDDO(m *Metric) *data.MetricDDO { sort.Strings(labelNames) - labelPairs := make([]*data.LabelPairDDO, 0, metricLength) + labelSets := make([]*data.LabelPair, 0, metricLength) for _, labelName := range labelNames { - labelValue := (*m)[labelName] - labelPair := &data.LabelPairDDO{ + l := LabelName(labelName) + labelValue := (*m)[l] + labelPair := &data.LabelPair{ Name: proto.String(string(labelName)), Value: proto.String(string(labelValue)), } - labelPairs = append(labelPairs, labelPair) + labelSets = append(labelSets, labelPair) } - return &data.MetricDDO{ - LabelPair: labelPairs, - } -} - -func BytesToFingerprintDDO(b []byte) *data.FingerprintDDO { - return &data.FingerprintDDO{ - Signature: proto.String(string(b)), + return &data.Metric{ + LabelPair: labelSets, } } @@ -93,3 +88,41 @@ func BytesToFingerprint(v []byte) Fingerprint { hash.Write(v) return Fingerprint(hex.EncodeToString(hash.Sum([]byte{}))) } + +func LabelSetToDTOs(s *LabelSet) []*data.LabelPair { + metricLength := len(*s) + labelNames := make([]string, 0, metricLength) + + for labelName := range *s { + labelNames = append(labelNames, string(labelName)) + } + + sort.Strings(labelNames) + + labelSets := make([]*data.LabelPair, 0, metricLength) + + for _, labelName := range labelNames { + l := LabelName(labelName) + labelValue := (*s)[l] + labelPair := &data.LabelPair{ + Name: proto.String(string(labelName)), + Value: proto.String(string(labelValue)), + } + + labelSets = append(labelSets, labelPair) + } + + return labelSets +} + +func LabelSetToDTO(s *LabelSet) *data.LabelSet { + return &data.LabelSet{ + Member: LabelSetToDTOs(s), + } +} + +func LabelNameToDTO(l *LabelName) *data.LabelName { + return &data.LabelName{ + Name: proto.String(string(*l)), + } +} diff --git a/model/data.proto b/model/data.proto index 3dc02bd28..7a957ea4d 100644 --- a/model/data.proto +++ b/model/data.proto @@ -11,89 +11,58 @@ // See the License for the specific language governing permissions and // limitations under the License. -package generated; +package dto; -message LabelPairDDO { +message LabelPair { optional int64 version = 1 [default = 1]; optional string name = 2; optional string value = 3; } -message MetricDDO { - optional int64 version = 1 [default = 1]; - - repeated LabelPairDDO label_pair = 2; -} - -message FingerprintCollectionDDO { - optional int64 version = 1 [default = 1]; - - repeated FingerprintDDO member = 2; -} - -message LabelPairCollectionDDO { - optional int64 version = 1 [default = 1]; - - repeated LabelPairDDO member = 2; -} - -message LabelNameDDO { +message LabelName { optional int64 version = 1 [default = 1]; optional string name = 2; } -message FingerprintDDO { +message Metric { + optional int64 version = 1 [default = 1]; + + repeated LabelPair label_pair = 2; +} + +message Fingerprint { optional int64 version = 1 [default = 1]; - // bytes optional string signature = 2; } -message WatermarkDDO { +message FingerprintCollection { optional int64 version = 1 [default = 1]; - optional int64 timestamp = 2; + repeated Fingerprint member = 2; } -message SampleKeyDDO { +message LabelSet { optional int64 version = 1 [default = 1]; - optional FingerprintDDO fingerprint = 2; + repeated LabelPair member = 2; +} + +message SampleKey { + optional int64 version = 1 [default = 1]; + + optional Fingerprint fingerprint = 2; optional bytes timestamp = 3; } -message SampleValueDDO { +message SampleValue { optional int64 version = 1 [default = 1]; optional float value = 2; } - -// TOO OLD - -message MembershipIndexValueDDO { -} - - -message LabelNameAndValueIndexDDO { - optional string name = 1; - optional string value = 2; -} - -message LabelNameValuesDDO { - repeated string value = 1; -} - -message LabelNameAndValueToMetricDDO { - repeated string metric = 1; -} - -message MetricToWindowDDO { - repeated int64 window = 1; -} - -message WindowSampleDDO { - repeated float value = 1; +message MembershipIndexValue { + optional int64 version = 1 [default = 1]; } diff --git a/model/metric.go b/model/metric.go index 4b0e5e645..d92cd46ef 100644 --- a/model/metric.go +++ b/model/metric.go @@ -17,15 +17,36 @@ import ( "time" ) +// A Fingerprint is a simplified representation of an entity---e.g., a hash of +// an entire Metric. type Fingerprint string -type LabelPairs map[string]string -type Metric map[string]string +// A LabelName is a key for a LabelSet or Metric. It has a value associated +// therewith. +type LabelName string +// A LabelValue is an associated value for a LabelName. +type LabelValue string + +// A LabelSet is a collection of LabelName and LabelValue pairs. The LabelSet +// may be fully-qualified down to the point where it may resolve to a single +// Metric in the data store or not. All operations that occur within the realm +// of a LabelSet can emit a vector of Metric entities to which the LabelSet may +// match. +type LabelSet map[LabelName]LabelValue + +// A Metric is similar to a LabelSet, but the key difference is that a Metric is +// a singleton and refers to one and only one stream of samples. +type Metric map[LabelName]LabelValue + +// A SampleValue is a representation of a value for a given sample at a given +// time. It is presently float32 due to that being the representation that +// Protocol Buffers provide of floats in Go. This is a smell and should be +// remedied down the road. type SampleValue float32 type Sample struct { - Labels LabelPairs + Labels LabelSet Value SampleValue Timestamp time.Time } diff --git a/service.go b/service.go index e86c46731..ab42dd935 100644 --- a/service.go +++ b/service.go @@ -32,7 +32,7 @@ type MetricsService struct { } func (m MetricsService) ListLabels() []string { - labels, labelsError := m.persistence.GetLabelNames() + labels, labelsError := m.persistence.GetAllLabelNames() if labelsError != nil { m.ResponseBuilder().SetResponseCode(500) @@ -41,18 +41,18 @@ func (m MetricsService) ListLabels() []string { return labels } -func (m MetricsService) ListLabelPairs() []model.LabelPairs { - labelPairs, labelPairsError := m.persistence.GetLabelPairs() +func (m MetricsService) ListLabelPairs() []model.LabelSet { + labelSets, labelPairsError := m.persistence.GetAllLabelPairs() if labelPairsError != nil { m.ResponseBuilder().SetResponseCode(500) } - return labelPairs + return labelSets } -func (m MetricsService) ListMetrics() []model.LabelPairs { - metrics, metricsError := m.persistence.GetMetrics() +func (m MetricsService) ListMetrics() []model.LabelSet { + metrics, metricsError := m.persistence.GetAllMetrics() if metricsError != nil { m.ResponseBuilder().SetResponseCode(500) diff --git a/storage/metric/interface.go b/storage/metric/interface.go index 495fb70ed..109a40619 100644 --- a/storage/metric/interface.go +++ b/storage/metric/interface.go @@ -17,17 +17,31 @@ import ( "github.com/matttproud/prometheus/model" ) +// MetricPersistence is a system for storing metric samples in a persistence +// layer. type MetricPersistence interface { + // A storage system may rely on external resources and thusly should be + // closed when finished. Close() error + // Record a new sample in the storage layer. AppendSample(sample *model.Sample) error - GetLabelNames() ([]string, error) - GetLabelPairs() ([]model.LabelPairs, error) - GetMetrics() ([]model.LabelPairs, error) + // Get all of the metric fingerprints that are associated with the provided + // label set. + GetFingerprintsForLabelSet(labelSet *model.LabelSet) ([]*model.Fingerprint, error) + GetFingerprintsForLabelName(labelName *model.LabelName) ([]*model.Fingerprint, error) - GetMetricFingerprintsForLabelPairs(labelSets []*model.LabelPairs) ([]*model.Fingerprint, error) - GetFingerprintLabelPairs(fingerprint model.Fingerprint) (model.LabelPairs, error) + GetAllLabelNames() ([]string, error) + GetAllLabelPairs() ([]model.LabelSet, error) + GetAllMetrics() ([]model.LabelSet, error) + + // // BEGIN QUERY PRIMITIVES + // + // GetMetricForFingerprint() + // GetMetricWatermarks(metrics ...) (watermarks ...) + // GetMetricValuesForIntervals(metric, interval) (values ...) + // GetMetricValueLast() + // // END QUERY PRIMITIVES - RecordFingerprintWatermark(sample *model.Sample) error } diff --git a/storage/metric/leveldb/leveldb.go b/storage/metric/leveldb/leveldb.go index 2c5d8b4ec..c477b4182 100644 --- a/storage/metric/leveldb/leveldb.go +++ b/storage/metric/leveldb/leveldb.go @@ -28,13 +28,11 @@ import ( ) type LevelDBMetricPersistence struct { - fingerprintHighWaterMarks *storage.LevelDBPersistence - fingerprintLabelPairs *storage.LevelDBPersistence - fingerprintLowWaterMarks *storage.LevelDBPersistence - fingerprintSamples *storage.LevelDBPersistence - labelNameFingerprints *storage.LevelDBPersistence - labelPairFingerprints *storage.LevelDBPersistence - metricMembershipIndex *index.LevelDBMembershipIndex + fingerprintToMetrics *storage.LevelDBPersistence + metricSamples *storage.LevelDBPersistence + labelNameToFingerprints *storage.LevelDBPersistence + labelSetToFingerprints *storage.LevelDBPersistence + metricMembershipIndex *index.LevelDBMembershipIndex } type leveldbOpener func() @@ -46,29 +44,21 @@ func (l *LevelDBMetricPersistence) Close() error { name string closer io.Closer }{ - { - "Fingerprint High-Water Marks", - l.fingerprintHighWaterMarks, - }, { "Fingerprint to Label Name and Value Pairs", - l.fingerprintLabelPairs, - }, - { - "Fingerprint Low-Water Marks", - l.fingerprintLowWaterMarks, + l.fingerprintToMetrics, }, { "Fingerprint Samples", - l.fingerprintSamples, + l.metricSamples, }, { "Label Name to Fingerprints", - l.labelNameFingerprints, + l.labelNameToFingerprints, }, { "Label Name and Value Pairs to Fingerprints", - l.labelPairFingerprints, + l.labelSetToFingerprints, }, { "Metric Membership Index", @@ -114,7 +104,7 @@ func (l *LevelDBMetricPersistence) Close() error { func NewLevelDBMetricPersistence(baseDirectory string) (*LevelDBMetricPersistence, error) { log.Printf("Opening LevelDBPersistence storage containers...") - errorChannel := make(chan error, 7) + errorChannel := make(chan error, 5) emission := &LevelDBMetricPersistence{} @@ -122,27 +112,11 @@ func NewLevelDBMetricPersistence(baseDirectory string) (*LevelDBMetricPersistenc name string opener leveldbOpener }{ - { - "High-Water Marks by Fingerprint", - func() { - var err error - emission.fingerprintHighWaterMarks, err = storage.NewLevelDBPersistence(baseDirectory+"/high_water_marks_by_fingerprint", 1000000, 10) - errorChannel <- err - }, - }, { "Label Names and Value Pairs by Fingerprint", func() { var err error - emission.fingerprintLabelPairs, err = storage.NewLevelDBPersistence(baseDirectory+"/label_name_and_value_pairs_by_fingerprint", 1000000, 10) - errorChannel <- err - }, - }, - { - "Low-Water Marks by Fingerprint", - func() { - var err error - emission.fingerprintLowWaterMarks, err = storage.NewLevelDBPersistence(baseDirectory+"/low_water_marks_by_fingerprint", 1000000, 10) + emission.fingerprintToMetrics, err = storage.NewLevelDBPersistence(baseDirectory+"/label_name_and_value_pairs_by_fingerprint", 1000000, 10) errorChannel <- err }, }, @@ -150,7 +124,7 @@ func NewLevelDBMetricPersistence(baseDirectory string) (*LevelDBMetricPersistenc "Samples by Fingerprint", func() { var err error - emission.fingerprintSamples, err = storage.NewLevelDBPersistence(baseDirectory+"/samples_by_fingerprint", 1000000, 10) + emission.metricSamples, err = storage.NewLevelDBPersistence(baseDirectory+"/samples_by_fingerprint", 1000000, 10) errorChannel <- err }, }, @@ -158,7 +132,7 @@ func NewLevelDBMetricPersistence(baseDirectory string) (*LevelDBMetricPersistenc "Fingerprints by Label Name", func() { var err error - emission.labelNameFingerprints, err = storage.NewLevelDBPersistence(baseDirectory+"/fingerprints_by_label_name", 1000000, 10) + emission.labelNameToFingerprints, err = storage.NewLevelDBPersistence(baseDirectory+"/fingerprints_by_label_name", 1000000, 10) errorChannel <- err }, }, @@ -166,7 +140,7 @@ func NewLevelDBMetricPersistence(baseDirectory string) (*LevelDBMetricPersistenc "Fingerprints by Label Name and Value Pair", func() { var err error - emission.labelPairFingerprints, err = storage.NewLevelDBPersistence(baseDirectory+"/fingerprints_by_label_name_and_value_pair", 1000000, 10) + emission.labelSetToFingerprints, err = storage.NewLevelDBPersistence(baseDirectory+"/fingerprints_by_label_name_and_value_pair", 1000000, 10) errorChannel <- err }, }, @@ -204,43 +178,44 @@ func NewLevelDBMetricPersistence(baseDirectory string) (*LevelDBMetricPersistenc return emission, nil } -func (l *LevelDBMetricPersistence) hasIndexMetric(ddo *data.MetricDDO) (bool, error) { - ddoKey := coding.NewProtocolBufferEncoder(ddo) - return l.metricMembershipIndex.Has(ddoKey) +func (l *LevelDBMetricPersistence) hasIndexMetric(dto *data.Metric) (bool, error) { + dtoKey := coding.NewProtocolBufferEncoder(dto) + return l.metricMembershipIndex.Has(dtoKey) } -func (l *LevelDBMetricPersistence) indexMetric(ddo *data.MetricDDO) error { - ddoKey := coding.NewProtocolBufferEncoder(ddo) - return l.metricMembershipIndex.Put(ddoKey) +func (l *LevelDBMetricPersistence) indexMetric(dto *data.Metric) error { + dtoKey := coding.NewProtocolBufferEncoder(dto) + return l.metricMembershipIndex.Put(dtoKey) } -func fingerprintDDOForMessage(message proto.Message) (*data.FingerprintDDO, error) { +// TODO(mtp): Candidate for refactoring. +func fingerprintDTOForMessage(message proto.Message) (*data.Fingerprint, error) { if messageByteArray, marshalError := proto.Marshal(message); marshalError == nil { fingerprint := model.BytesToFingerprint(messageByteArray) - return &data.FingerprintDDO{ + return &data.Fingerprint{ Signature: proto.String(string(fingerprint)), }, nil } else { return nil, marshalError } - return nil, errors.New("Unknown error in generating FingerprintDDO from message.") + return nil, errors.New("Unknown error in generating FingerprintDTO from message.") } -func (l *LevelDBMetricPersistence) HasLabelPair(ddo *data.LabelPairDDO) (bool, error) { - ddoKey := coding.NewProtocolBufferEncoder(ddo) - return l.labelPairFingerprints.Has(ddoKey) +func (l *LevelDBMetricPersistence) HasLabelPair(dto *data.LabelPair) (bool, error) { + dtoKey := coding.NewProtocolBufferEncoder(dto) + return l.labelSetToFingerprints.Has(dtoKey) } -func (l *LevelDBMetricPersistence) HasLabelName(ddo *data.LabelNameDDO) (bool, error) { - ddoKey := coding.NewProtocolBufferEncoder(ddo) - return l.labelNameFingerprints.Has(ddoKey) +func (l *LevelDBMetricPersistence) HasLabelName(dto *data.LabelName) (bool, error) { + dtoKey := coding.NewProtocolBufferEncoder(dto) + return l.labelNameToFingerprints.Has(dtoKey) } -func (l *LevelDBMetricPersistence) GetLabelPairFingerprints(ddo *data.LabelPairDDO) (*data.FingerprintCollectionDDO, error) { - ddoKey := coding.NewProtocolBufferEncoder(ddo) - if get, getError := l.labelPairFingerprints.Get(ddoKey); getError == nil { - value := &data.FingerprintCollectionDDO{} +func (l *LevelDBMetricPersistence) getFingerprintsForLabelSet(dto *data.LabelPair) (*data.FingerprintCollection, error) { + dtoKey := coding.NewProtocolBufferEncoder(dto) + if get, getError := l.labelSetToFingerprints.Get(dtoKey); getError == nil { + value := &data.FingerprintCollection{} if unmarshalError := proto.Unmarshal(get, value); unmarshalError == nil { return value, nil } else { @@ -249,13 +224,14 @@ func (l *LevelDBMetricPersistence) GetLabelPairFingerprints(ddo *data.LabelPairD } else { return nil, getError } - return nil, errors.New("Unknown error while getting label name and value pair fingerprints.") + + panic("unreachable") } -func (l *LevelDBMetricPersistence) GetLabelNameFingerprints(ddo *data.LabelNameDDO) (*data.FingerprintCollectionDDO, error) { - ddoKey := coding.NewProtocolBufferEncoder(ddo) - if get, getError := l.labelNameFingerprints.Get(ddoKey); getError == nil { - value := &data.FingerprintCollectionDDO{} +func (l *LevelDBMetricPersistence) GetLabelNameFingerprints(dto *data.LabelName) (*data.FingerprintCollection, error) { + dtoKey := coding.NewProtocolBufferEncoder(dto) + if get, getError := l.labelNameToFingerprints.Get(dtoKey); getError == nil { + value := &data.FingerprintCollection{} if unmarshalError := proto.Unmarshal(get, value); unmarshalError == nil { return value, nil } else { @@ -268,29 +244,29 @@ func (l *LevelDBMetricPersistence) GetLabelNameFingerprints(ddo *data.LabelNameD return nil, errors.New("Unknown error while getting label name fingerprints.") } -func (l *LevelDBMetricPersistence) setLabelPairFingerprints(labelPair *data.LabelPairDDO, fingerprints *data.FingerprintCollectionDDO) error { +func (l *LevelDBMetricPersistence) setLabelPairFingerprints(labelPair *data.LabelPair, fingerprints *data.FingerprintCollection) error { labelPairEncoded := coding.NewProtocolBufferEncoder(labelPair) fingerprintsEncoded := coding.NewProtocolBufferEncoder(fingerprints) - return l.labelPairFingerprints.Put(labelPairEncoded, fingerprintsEncoded) + return l.labelSetToFingerprints.Put(labelPairEncoded, fingerprintsEncoded) } -func (l *LevelDBMetricPersistence) setLabelNameFingerprints(labelName *data.LabelNameDDO, fingerprints *data.FingerprintCollectionDDO) error { +func (l *LevelDBMetricPersistence) setLabelNameFingerprints(labelName *data.LabelName, fingerprints *data.FingerprintCollection) error { labelNameEncoded := coding.NewProtocolBufferEncoder(labelName) fingerprintsEncoded := coding.NewProtocolBufferEncoder(fingerprints) - return l.labelNameFingerprints.Put(labelNameEncoded, fingerprintsEncoded) + return l.labelNameToFingerprints.Put(labelNameEncoded, fingerprintsEncoded) } -func (l *LevelDBMetricPersistence) appendLabelPairFingerprint(labelPair *data.LabelPairDDO, fingerprint *data.FingerprintDDO) error { +func (l *LevelDBMetricPersistence) appendLabelPairFingerprint(labelPair *data.LabelPair, fingerprint *data.Fingerprint) error { if has, hasError := l.HasLabelPair(labelPair); hasError == nil { - var fingerprints *data.FingerprintCollectionDDO + var fingerprints *data.FingerprintCollection if has { - if existing, existingError := l.GetLabelPairFingerprints(labelPair); existingError == nil { + if existing, existingError := l.getFingerprintsForLabelSet(labelPair); existingError == nil { fingerprints = existing } else { return existingError } } else { - fingerprints = &data.FingerprintCollectionDDO{} + fingerprints = &data.FingerprintCollection{} } fingerprints.Member = append(fingerprints.Member, fingerprint) @@ -303,13 +279,13 @@ func (l *LevelDBMetricPersistence) appendLabelPairFingerprint(labelPair *data.La return errors.New("Unknown error when appending fingerprint to label name and value pair.") } -func (l *LevelDBMetricPersistence) appendLabelNameFingerprint(labelPair *data.LabelPairDDO, fingerprint *data.FingerprintDDO) error { - labelName := &data.LabelNameDDO{ +func (l *LevelDBMetricPersistence) appendLabelNameFingerprint(labelPair *data.LabelPair, fingerprint *data.Fingerprint) error { + labelName := &data.LabelName{ Name: labelPair.Name, } if has, hasError := l.HasLabelName(labelName); hasError == nil { - var fingerprints *data.FingerprintCollectionDDO + var fingerprints *data.FingerprintCollection if has { if existing, existingError := l.GetLabelNameFingerprints(labelName); existingError == nil { fingerprints = existing @@ -317,7 +293,7 @@ func (l *LevelDBMetricPersistence) appendLabelNameFingerprint(labelPair *data.La return existingError } } else { - fingerprints = &data.FingerprintCollectionDDO{} + fingerprints = &data.FingerprintCollection{} } fingerprints.Member = append(fingerprints.Member, fingerprint) @@ -330,26 +306,23 @@ func (l *LevelDBMetricPersistence) appendLabelNameFingerprint(labelPair *data.La return errors.New("Unknown error when appending fingerprint to label name and value pair.") } -func (l *LevelDBMetricPersistence) appendFingerprints(ddo *data.MetricDDO) error { - if fingerprintDDO, fingerprintDDOError := fingerprintDDOForMessage(ddo); fingerprintDDOError == nil { - labelPairCollectionDDO := &data.LabelPairCollectionDDO{ - Member: ddo.LabelPair, - } - fingerprintKey := coding.NewProtocolBufferEncoder(fingerprintDDO) - labelPairCollectionDDOEncoder := coding.NewProtocolBufferEncoder(labelPairCollectionDDO) +func (l *LevelDBMetricPersistence) appendFingerprints(dto *data.Metric) error { + if fingerprintDTO, fingerprintDTOError := fingerprintDTOForMessage(dto); fingerprintDTOError == nil { + fingerprintKey := coding.NewProtocolBufferEncoder(fingerprintDTO) + metricDTOEncoder := coding.NewProtocolBufferEncoder(dto) - if putError := l.fingerprintLabelPairs.Put(fingerprintKey, labelPairCollectionDDOEncoder); putError == nil { - labelCount := len(ddo.LabelPair) + if putError := l.fingerprintToMetrics.Put(fingerprintKey, metricDTOEncoder); putError == nil { + labelCount := len(dto.LabelPair) labelPairErrors := make(chan error, labelCount) labelNameErrors := make(chan error, labelCount) - for _, labelPair := range ddo.LabelPair { - go func(labelPair *data.LabelPairDDO) { - labelNameErrors <- l.appendLabelNameFingerprint(labelPair, fingerprintDDO) + for _, labelPair := range dto.LabelPair { + go func(labelPair *data.LabelPair) { + labelNameErrors <- l.appendLabelNameFingerprint(labelPair, fingerprintDTO) }(labelPair) - go func(labelPair *data.LabelPairDDO) { - labelPairErrors <- l.appendLabelPairFingerprint(labelPair, fingerprintDDO) + go func(labelPair *data.LabelPair) { + labelPairErrors <- l.appendLabelPairFingerprint(labelPair, fingerprintDTO) }(labelPair) } @@ -375,19 +348,19 @@ func (l *LevelDBMetricPersistence) appendFingerprints(ddo *data.MetricDDO) error return putError } } else { - return fingerprintDDOError + return fingerprintDTOError } return errors.New("Unknown error in appending label pairs to fingerprint.") } func (l *LevelDBMetricPersistence) AppendSample(sample *model.Sample) error { - metricDDO := model.SampleToMetricDDO(sample) + metricDTO := model.SampleToMetricDTO(sample) - if indexHas, indexHasError := l.hasIndexMetric(metricDDO); indexHasError == nil { + if indexHas, indexHasError := l.hasIndexMetric(metricDTO); indexHasError == nil { if !indexHas { - if indexPutError := l.indexMetric(metricDDO); indexPutError == nil { - if appendError := l.appendFingerprints(metricDDO); appendError != nil { + if indexPutError := l.indexMetric(metricDTO); indexPutError == nil { + if appendError := l.appendFingerprints(metricDTO); appendError != nil { log.Printf("Could not set metric fingerprint to label pairs mapping: %q\n", appendError) return appendError } @@ -401,40 +374,40 @@ func (l *LevelDBMetricPersistence) AppendSample(sample *model.Sample) error { return indexHasError } - if fingerprintDDO, fingerprintDDOErr := fingerprintDDOForMessage(metricDDO); fingerprintDDOErr == nil { + if fingerprintDTO, fingerprintDTOErr := fingerprintDTOForMessage(metricDTO); fingerprintDTOErr == nil { - sampleKeyDDO := &data.SampleKeyDDO{ - Fingerprint: fingerprintDDO, + sampleKeyDTO := &data.SampleKey{ + Fingerprint: fingerprintDTO, Timestamp: indexable.EncodeTime(sample.Timestamp), } - sampleValueDDO := &data.SampleValueDDO{ + sampleValueDTO := &data.SampleValue{ Value: proto.Float32(float32(sample.Value)), } - sampleKeyEncoded := coding.NewProtocolBufferEncoder(sampleKeyDDO) - sampleValueEncoded := coding.NewProtocolBufferEncoder(sampleValueDDO) + sampleKeyEncoded := coding.NewProtocolBufferEncoder(sampleKeyDTO) + sampleValueEncoded := coding.NewProtocolBufferEncoder(sampleValueDTO) - if putError := l.fingerprintSamples.Put(sampleKeyEncoded, sampleValueEncoded); putError != nil { + if putError := l.metricSamples.Put(sampleKeyEncoded, sampleValueEncoded); putError != nil { log.Printf("Could not append metric sample: %q\n", putError) return putError } } else { - log.Printf("Could not encode metric fingerprint: %q\n", fingerprintDDOErr) - return fingerprintDDOErr + log.Printf("Could not encode metric fingerprint: %q\n", fingerprintDTOErr) + return fingerprintDTOErr } return nil } -func (l *LevelDBMetricPersistence) GetLabelNames() ([]string, error) { - if getAll, getAllError := l.labelNameFingerprints.GetAll(); getAllError == nil { +func (l *LevelDBMetricPersistence) GetAllLabelNames() ([]string, error) { + if getAll, getAllError := l.labelNameToFingerprints.GetAll(); getAllError == nil { result := make([]string, 0, len(getAll)) - labelNameDDO := &data.LabelNameDDO{} + labelNameDTO := &data.LabelName{} for _, pair := range getAll { - if unmarshalError := proto.Unmarshal(pair.Left, labelNameDDO); unmarshalError == nil { - result = append(result, *labelNameDDO.Name) + if unmarshalError := proto.Unmarshal(pair.Left, labelNameDTO); unmarshalError == nil { + result = append(result, *labelNameDTO.Name) } else { return nil, unmarshalError } @@ -449,16 +422,16 @@ func (l *LevelDBMetricPersistence) GetLabelNames() ([]string, error) { return nil, errors.New("Unknown error encountered when querying label names.") } -func (l *LevelDBMetricPersistence) GetLabelPairs() ([]model.LabelPairs, error) { - if getAll, getAllError := l.labelPairFingerprints.GetAll(); getAllError == nil { - result := make([]model.LabelPairs, 0, len(getAll)) - labelPairDDO := &data.LabelPairDDO{} +func (l *LevelDBMetricPersistence) GetAllLabelPairs() ([]model.LabelSet, error) { + if getAll, getAllError := l.labelSetToFingerprints.GetAll(); getAllError == nil { + result := make([]model.LabelSet, 0, len(getAll)) + labelPairDTO := &data.LabelPair{} for _, pair := range getAll { - if unmarshalError := proto.Unmarshal(pair.Left, labelPairDDO); unmarshalError == nil { - item := model.LabelPairs{ - *labelPairDDO.Name: *labelPairDDO.Value, - } + if unmarshalError := proto.Unmarshal(pair.Left, labelPairDTO); unmarshalError == nil { + n := model.LabelName(*labelPairDTO.Name) + v := model.LabelValue(*labelPairDTO.Value) + item := model.LabelSet{n: v} result = append(result, item) } else { return nil, unmarshalError @@ -474,10 +447,10 @@ func (l *LevelDBMetricPersistence) GetLabelPairs() ([]model.LabelPairs, error) { return nil, errors.New("Unknown error encountered when querying label pairs.") } -func (l *LevelDBMetricPersistence) GetMetrics() ([]model.LabelPairs, error) { - if getAll, getAllError := l.labelPairFingerprints.GetAll(); getAllError == nil { - result := make([]model.LabelPairs, 0) - fingerprintCollection := &data.FingerprintCollectionDDO{} +func (l *LevelDBMetricPersistence) GetAllMetrics() ([]model.LabelSet, error) { + if getAll, getAllError := l.labelSetToFingerprints.GetAll(); getAllError == nil { + result := make([]model.LabelSet, 0) + fingerprintCollection := &data.FingerprintCollection{} fingerprints := make(utility.Set) @@ -487,20 +460,22 @@ func (l *LevelDBMetricPersistence) GetMetrics() ([]model.LabelPairs, error) { if !fingerprints.Has(*member.Signature) { fingerprints.Add(*member.Signature) fingerprintEncoded := coding.NewProtocolBufferEncoder(member) - if labelPairCollectionRaw, labelPairCollectionRawError := l.fingerprintLabelPairs.Get(fingerprintEncoded); labelPairCollectionRawError == nil { + if labelPairCollectionRaw, labelPairCollectionRawError := l.fingerprintToMetrics.Get(fingerprintEncoded); labelPairCollectionRawError == nil { - labelPairCollectionDDO := &data.LabelPairCollectionDDO{} + labelPairCollectionDTO := &data.LabelSet{} - if labelPairCollectionDDOMarshalError := proto.Unmarshal(labelPairCollectionRaw, labelPairCollectionDDO); labelPairCollectionDDOMarshalError == nil { - intermediate := make(model.LabelPairs, 0) + if labelPairCollectionDTOMarshalError := proto.Unmarshal(labelPairCollectionRaw, labelPairCollectionDTO); labelPairCollectionDTOMarshalError == nil { + intermediate := make(model.LabelSet, 0) - for _, member := range labelPairCollectionDDO.Member { - intermediate[*member.Name] = *member.Value + for _, member := range labelPairCollectionDTO.Member { + n := model.LabelName(*member.Name) + v := model.LabelValue(*member.Value) + intermediate[n] = v } result = append(result, intermediate) } else { - return nil, labelPairCollectionDDOMarshalError + return nil, labelPairCollectionDTOMarshalError } } else { return nil, labelPairCollectionRawError @@ -519,86 +494,15 @@ func (l *LevelDBMetricPersistence) GetMetrics() ([]model.LabelPairs, error) { return nil, errors.New("Unknown error encountered when querying metrics.") } -func (l *LevelDBMetricPersistence) GetWatermarksForMetric(metric model.Metric) (*model.Interval, int, error) { - metricDDO := model.MetricToMetricDDO(&metric) - - if fingerprintDDO, fingerprintDDOErr := fingerprintDDOForMessage(metricDDO); fingerprintDDOErr == nil { - if iterator, closer, iteratorErr := l.fingerprintSamples.GetIterator(); iteratorErr == nil { - defer closer.Close() - - start := &data.SampleKeyDDO{ - Fingerprint: fingerprintDDO, - Timestamp: indexable.EarliestTime, - } - - if encode, encodeErr := coding.NewProtocolBufferEncoder(start).Encode(); encodeErr == nil { - iterator.Seek(encode) - - if iterator.Valid() { - found := &data.SampleKeyDDO{} - if unmarshalErr := proto.Unmarshal(iterator.Key(), found); unmarshalErr == nil { - var foundEntries int = 0 - - if *fingerprintDDO.Signature == *found.Fingerprint.Signature { - emission := &model.Interval{ - OldestInclusive: indexable.DecodeTime(found.Timestamp), - NewestInclusive: indexable.DecodeTime(found.Timestamp), - } - - for iterator = iterator; iterator.Valid(); iterator.Next() { - if subsequentUnmarshalErr := proto.Unmarshal(iterator.Key(), found); subsequentUnmarshalErr == nil { - if *fingerprintDDO.Signature != *found.Fingerprint.Signature { - return emission, foundEntries, nil - } - foundEntries++ - emission.NewestInclusive = indexable.DecodeTime(found.Timestamp) - } else { - log.Printf("Could not de-serialize subsequent key: %q\n", subsequentUnmarshalErr) - return nil, -7, subsequentUnmarshalErr - } - } - return emission, foundEntries, nil - } else { - return &model.Interval{}, -6, nil - } - } else { - log.Printf("Could not de-serialize start key: %q\n", unmarshalErr) - return nil, -5, unmarshalErr - } - } else { - iteratorErr := iterator.GetError() - log.Printf("Could not seek for metric watermark beginning: %q\n", iteratorErr) - return nil, -4, iteratorErr - } - } else { - log.Printf("Could not seek for metric watermark: %q\n", encodeErr) - return nil, -3, encodeErr - } - } else { - if closer != nil { - defer closer.Close() - } - - log.Printf("Could not provision iterator for metric: %q\n", iteratorErr) - return nil, -3, iteratorErr - } - } else { - log.Printf("Could not encode metric: %q\n", fingerprintDDOErr) - return nil, -2, fingerprintDDOErr - } - - return nil, -1, errors.New("Unknown error occurred while querying metric watermarks.") -} - func (l *LevelDBMetricPersistence) GetSamplesForMetric(metric model.Metric, interval model.Interval) ([]model.Samples, error) { - metricDDO := model.MetricToMetricDDO(&metric) + metricDTO := model.MetricToDTO(&metric) - if fingerprintDDO, fingerprintDDOErr := fingerprintDDOForMessage(metricDDO); fingerprintDDOErr == nil { - if iterator, closer, iteratorErr := l.fingerprintSamples.GetIterator(); iteratorErr == nil { + if fingerprintDTO, fingerprintDTOErr := fingerprintDTOForMessage(metricDTO); fingerprintDTOErr == nil { + if iterator, closer, iteratorErr := l.metricSamples.GetIterator(); iteratorErr == nil { defer closer.Close() - start := &data.SampleKeyDDO{ - Fingerprint: fingerprintDDO, + start := &data.SampleKey{ + Fingerprint: fingerprintDTO, Timestamp: indexable.EncodeTime(interval.OldestInclusive), } @@ -608,11 +512,11 @@ func (l *LevelDBMetricPersistence) GetSamplesForMetric(metric model.Metric, inte iterator.Seek(encode) for iterator = iterator; iterator.Valid(); iterator.Next() { - key := &data.SampleKeyDDO{} - value := &data.SampleValueDDO{} + key := &data.SampleKey{} + value := &data.SampleValue{} if keyUnmarshalErr := proto.Unmarshal(iterator.Key(), key); keyUnmarshalErr == nil { if valueUnmarshalErr := proto.Unmarshal(iterator.Value(), value); valueUnmarshalErr == nil { - if *fingerprintDDO.Signature == *key.Fingerprint.Signature { + if *fingerprintDTO.Signature == *key.Fingerprint.Signature { // Wart if indexable.DecodeTime(key.Timestamp).Unix() <= interval.NewestInclusive.Unix() { emission = append(emission, model.Samples{ @@ -644,21 +548,53 @@ func (l *LevelDBMetricPersistence) GetSamplesForMetric(metric model.Metric, inte return nil, iteratorErr } } else { - log.Printf("Could not create fingerprint for the metric: %q\n", fingerprintDDOErr) - return nil, fingerprintDDOErr + log.Printf("Could not create fingerprint for the metric: %q\n", fingerprintDTOErr) + return nil, fingerprintDTOErr } - return nil, errors.New("Unknown error occurred while querying metric watermarks.") + panic("unreachable") } -func (l *LevelDBMetricPersistence) GetFingerprintLabelPairs(f model.Fingerprint) (model.LabelPairs, error) { - panic("NOT IMPLEMENTED") +func (l *LevelDBMetricPersistence) GetFingerprintsForLabelSet(labelSet *model.LabelSet) ([]*model.Fingerprint, error) { + emission := make([]*model.Fingerprint, 0, 0) + + for _, labelSetDTO := range model.LabelSetToDTOs(labelSet) { + if f, err := l.labelSetToFingerprints.Get(coding.NewProtocolBufferEncoder(labelSetDTO)); err == nil { + unmarshaled := &data.FingerprintCollection{} + if unmarshalErr := proto.Unmarshal(f, unmarshaled); unmarshalErr == nil { + for _, m := range unmarshaled.Member { + fp := model.Fingerprint(*m.Signature) + emission = append(emission, &fp) + } + } else { + return nil, err + } + } else { + return nil, err + } + } + + return emission, nil } -func (l *LevelDBMetricPersistence) GetMetricFingerprintsForLabelPairs(p []*model.LabelPairs) ([]*model.Fingerprint, error) { - panic("NOT IMPLEMENTED") -} +func (l *LevelDBMetricPersistence) GetFingerprintsForLabelName(labelName *model.LabelName) ([]*model.Fingerprint, error) { + emission := make([]*model.Fingerprint, 0, 0) -func (l *LevelDBMetricPersistence) RecordFingerprintWatermark(s *model.Sample) error { - panic("NOT IMPLEMENTED") + if raw, err := l.labelNameToFingerprints.Get(coding.NewProtocolBufferEncoder(model.LabelNameToDTO(labelName))); err == nil { + + unmarshaled := &data.FingerprintCollection{} + + if err = proto.Unmarshal(raw, unmarshaled); err == nil { + for _, m := range unmarshaled.Member { + fp := model.Fingerprint(*m.Signature) + emission = append(emission, &fp) + } + } else { + return nil, err + } + } else { + return nil, err + } + + return emission, nil } diff --git a/storage/metric/leveldb/leveldb_test.go b/storage/metric/leveldb/leveldb_test.go index f7b937b77..d78fe1ac2 100644 --- a/storage/metric/leveldb/leveldb_test.go +++ b/storage/metric/leveldb/leveldb_test.go @@ -82,12 +82,12 @@ func TestReadEmpty(t *testing.T) { name := string(x) value := string(x) - ddo := &data.LabelPairDDO{ + dto := &data.LabelPair{ Name: proto.String(name), Value: proto.String(value), } - has, hasErr := persistence.HasLabelPair(ddo) + has, hasErr := persistence.HasLabelPair(dto) if hasErr != nil { return false @@ -102,11 +102,11 @@ func TestReadEmpty(t *testing.T) { hasLabelName := func(x int) bool { name := string(x) - ddo := &data.LabelNameDDO{ + dto := &data.LabelName{ Name: proto.String(name), } - has, hasErr := persistence.HasLabelName(ddo) + has, hasErr := persistence.HasLabelName(dto) if hasErr != nil { return false @@ -123,12 +123,12 @@ func TestReadEmpty(t *testing.T) { name := string(x) value := string(x) - ddo := &data.LabelPairDDO{ + dto := &data.LabelPair{ Name: proto.String(name), Value: proto.String(value), } - fingerprints, fingerprintsErr := persistence.GetLabelPairFingerprints(ddo) + fingerprints, fingerprintsErr := persistence.getFingerprintsForLabelSet(dto) if fingerprintsErr != nil { return false @@ -148,11 +148,11 @@ func TestReadEmpty(t *testing.T) { getLabelNameFingerprints := func(x int) bool { name := string(x) - ddo := &data.LabelNameDDO{ + dto := &data.LabelName{ Name: proto.String(name), } - fingerprints, fingerprintsErr := persistence.GetLabelNameFingerprints(ddo) + fingerprints, fingerprintsErr := persistence.GetLabelNameFingerprints(dto) if fingerprintsErr != nil { return false @@ -186,10 +186,14 @@ func TestAppendSampleAsPureSparseAppend(t *testing.T) { }() appendSample := func(x int) bool { + v := model.SampleValue(x) + t := time.Unix(int64(x), int64(x)) + l := model.LabelSet{model.LabelName(x): model.LabelValue(x)} + sample := &model.Sample{ - Value: model.SampleValue(float32(x)), - Timestamp: time.Unix(int64(x), int64(x)), - Labels: model.LabelPairs{string(x): string(x)}, + Value: v, + Timestamp: t, + Labels: l, } appendErr := persistence.AppendSample(sample) @@ -218,10 +222,14 @@ func TestAppendSampleAsSparseAppendWithReads(t *testing.T) { }() appendSample := func(x int) bool { + v := model.SampleValue(x) + t := time.Unix(int64(x), int64(x)) + l := model.LabelSet{model.LabelName(x): model.LabelValue(x)} + sample := &model.Sample{ - Value: model.SampleValue(float32(x)), - Timestamp: time.Unix(int64(x), int64(x)), - Labels: model.LabelPairs{string(x): string(x)}, + Value: v, + Timestamp: t, + Labels: l, } appendErr := persistence.AppendSample(sample) @@ -230,11 +238,11 @@ func TestAppendSampleAsSparseAppendWithReads(t *testing.T) { return false } - labelNameDDO := &data.LabelNameDDO{ + labelNameDTO := &data.LabelName{ Name: proto.String(string(x)), } - hasLabelName, hasLabelNameErr := persistence.HasLabelName(labelNameDDO) + hasLabelName, hasLabelNameErr := persistence.HasLabelName(labelNameDTO) if hasLabelNameErr != nil { return false @@ -244,12 +252,12 @@ func TestAppendSampleAsSparseAppendWithReads(t *testing.T) { return false } - labelPairDDO := &data.LabelPairDDO{ + labelPairDTO := &data.LabelPair{ Name: proto.String(string(x)), Value: proto.String(string(x)), } - hasLabelPair, hasLabelPairErr := persistence.HasLabelPair(labelPairDDO) + hasLabelPair, hasLabelPairErr := persistence.HasLabelPair(labelPairDTO) if hasLabelPairErr != nil { return false @@ -259,7 +267,7 @@ func TestAppendSampleAsSparseAppendWithReads(t *testing.T) { return false } - labelNameFingerprints, labelNameFingerprintsErr := persistence.GetLabelNameFingerprints(labelNameDDO) + labelNameFingerprints, labelNameFingerprintsErr := persistence.GetLabelNameFingerprints(labelNameDTO) if labelNameFingerprintsErr != nil { return false @@ -273,7 +281,7 @@ func TestAppendSampleAsSparseAppendWithReads(t *testing.T) { return false } - labelPairFingerprints, labelPairFingerprintsErr := persistence.GetLabelPairFingerprints(labelPairDDO) + labelPairFingerprints, labelPairFingerprintsErr := persistence.getFingerprintsForLabelSet(labelPairDTO) if labelPairFingerprintsErr != nil { return false @@ -314,7 +322,7 @@ func TestAppendSampleAsPureSingleEntityAppend(t *testing.T) { sample := &model.Sample{ Value: model.SampleValue(float32(x)), Timestamp: time.Unix(int64(x), 0), - Labels: model.LabelPairs{"name": "my_metric"}, + Labels: model.LabelSet{"name": "my_metric"}, } appendErr := persistence.AppendSample(sample) @@ -359,17 +367,24 @@ func TestStochastic(t *testing.T) { for metricIndex := 0; metricIndex < numberOfMetrics; metricIndex++ { sample := &model.Sample{ - Labels: model.LabelPairs{}, + Labels: model.LabelSet{}, } - sample.Labels["name"] = fmt.Sprintf("metric_index_%d", metricIndex) + v := model.LabelValue(fmt.Sprintf("metric_index_%d", metricIndex)) + sample.Labels["name"] = v for sharedLabelIndex := 0; sharedLabelIndex < numberOfSharedLabels; sharedLabelIndex++ { - sample.Labels[fmt.Sprintf("shared_label_%d", sharedLabelIndex)] = fmt.Sprintf("label_%d", sharedLabelIndex) + l := model.LabelName(fmt.Sprintf("shared_label_%d", sharedLabelIndex)) + v := model.LabelValue(fmt.Sprintf("label_%d", sharedLabelIndex)) + + sample.Labels[l] = v } for unsharedLabelIndex := 0; unsharedLabelIndex < numberOfUnsharedLabels; unsharedLabelIndex++ { - sample.Labels[fmt.Sprintf("metric_index_%d_private_label_%d", metricIndex, unsharedLabelIndex)] = fmt.Sprintf("private_label_%d", unsharedLabelIndex) + l := model.LabelName(fmt.Sprintf("metric_index_%d_private_label_%d", metricIndex, unsharedLabelIndex)) + v := model.LabelValue(fmt.Sprintf("private_label_%d", unsharedLabelIndex)) + + sample.Labels[l] = v } timestamps := make(map[int64]bool) @@ -414,7 +429,7 @@ func TestStochastic(t *testing.T) { metricNewestSample[metricIndex] = newestSample for sharedLabelIndex := 0; sharedLabelIndex < numberOfSharedLabels; sharedLabelIndex++ { - labelPair := &data.LabelPairDDO{ + labelPair := &data.LabelPair{ Name: proto.String(fmt.Sprintf("shared_label_%d", sharedLabelIndex)), Value: proto.String(fmt.Sprintf("label_%d", sharedLabelIndex)), } @@ -429,7 +444,7 @@ func TestStochastic(t *testing.T) { return false } - labelName := &data.LabelNameDDO{ + labelName := &data.LabelName{ Name: proto.String(fmt.Sprintf("shared_label_%d", sharedLabelIndex)), } @@ -446,7 +461,7 @@ func TestStochastic(t *testing.T) { } for sharedIndex := 0; sharedIndex < numberOfSharedLabels; sharedIndex++ { - labelName := &data.LabelNameDDO{ + labelName := &data.LabelName{ Name: proto.String(fmt.Sprintf("shared_label_%d", sharedIndex)), } fingerprints, fingerprintsErr := persistence.GetLabelNameFingerprints(labelName) @@ -466,7 +481,7 @@ func TestStochastic(t *testing.T) { for metricIndex := 0; metricIndex < numberOfMetrics; metricIndex++ { for unsharedLabelIndex := 0; unsharedLabelIndex < numberOfUnsharedLabels; unsharedLabelIndex++ { - labelPair := &data.LabelPairDDO{ + labelPair := &data.LabelPair{ Name: proto.String(fmt.Sprintf("metric_index_%d_private_label_%d", metricIndex, unsharedLabelIndex)), Value: proto.String(fmt.Sprintf("private_label_%d", unsharedLabelIndex)), } @@ -481,7 +496,7 @@ func TestStochastic(t *testing.T) { return false } - labelPairFingerprints, labelPairFingerprintsErr := persistence.GetLabelPairFingerprints(labelPair) + labelPairFingerprints, labelPairFingerprintsErr := persistence.getFingerprintsForLabelSet(labelPair) if labelPairFingerprintsErr != nil { return false @@ -495,7 +510,7 @@ func TestStochastic(t *testing.T) { return false } - labelName := &data.LabelNameDDO{ + labelName := &data.LabelName{ Name: proto.String(fmt.Sprintf("metric_index_%d_private_label_%d", metricIndex, unsharedLabelIndex)), } @@ -526,28 +541,20 @@ func TestStochastic(t *testing.T) { metric := make(model.Metric) - metric["name"] = fmt.Sprintf("metric_index_%d", metricIndex) + metric["name"] = model.LabelValue(fmt.Sprintf("metric_index_%d", metricIndex)) for i := 0; i < numberOfSharedLabels; i++ { - metric[fmt.Sprintf("shared_label_%d", i)] = fmt.Sprintf("label_%d", i) + l := model.LabelName(fmt.Sprintf("shared_label_%d", i)) + v := model.LabelValue(fmt.Sprintf("label_%d", i)) + + metric[l] = v } for i := 0; i < numberOfUnsharedLabels; i++ { - metric[fmt.Sprintf("metric_index_%d_private_label_%d", metricIndex, i)] = fmt.Sprintf("private_label_%d", i) - } + l := model.LabelName(fmt.Sprintf("metric_index_%d_private_label_%d", metricIndex, i)) + v := model.LabelValue(fmt.Sprintf("private_label_%d", i)) - watermarks, count, watermarksErr := persistence.GetWatermarksForMetric(metric) - - if watermarksErr != nil { - return false - } - - if watermarks == nil { - return false - } - - if count != numberOfSamples { - return false + metric[l] = v } for i := 0; i < numberOfRangeScans; i++ { @@ -620,3 +627,169 @@ func TestStochastic(t *testing.T) { t.Error(stochasticError) } } + +func TestGetFingerprintsForLabelSet(t *testing.T) { + temporaryDirectory, _ := ioutil.TempDir("", "leveldb_metric_persistence_test") + + defer func() { + if removeAllErr := os.RemoveAll(temporaryDirectory); removeAllErr != nil { + t.Errorf("Could not remove temporary directory: %q\n", removeAllErr) + } + }() + + persistence, _ := NewLevelDBMetricPersistence(temporaryDirectory) + + defer func() { + persistence.Close() + }() + + appendErr := persistence.AppendSample(&model.Sample{ + Value: model.SampleValue(0), + Timestamp: time.Unix(0, 0), + Labels: model.LabelSet{ + "name": "my_metric", + "request_type": "your_mom", + }, + }) + + if appendErr != nil { + t.Error(appendErr) + } + + appendErr = persistence.AppendSample(&model.Sample{ + Value: model.SampleValue(0), + Timestamp: time.Unix(int64(0), 0), + Labels: model.LabelSet{ + "name": "my_metric", + "request_type": "your_dad", + }, + }) + + if appendErr != nil { + t.Error(appendErr) + } + + result, getErr := persistence.GetFingerprintsForLabelSet(&(model.LabelSet{ + model.LabelName("name"): model.LabelValue("my_metric"), + })) + + if getErr != nil { + t.Error(getErr) + } + + if len(result) != 2 { + t.Errorf("Expected two elements.") + } + + result, getErr = persistence.GetFingerprintsForLabelSet(&(model.LabelSet{ + model.LabelName("request_type"): model.LabelValue("your_mom"), + })) + + if getErr != nil { + t.Error(getErr) + } + + if len(result) != 1 { + t.Errorf("Expected one element.") + } + + result, getErr = persistence.GetFingerprintsForLabelSet(&(model.LabelSet{ + model.LabelName("request_type"): model.LabelValue("your_dad"), + })) + + if getErr != nil { + t.Error(getErr) + } + + if len(result) != 1 { + t.Errorf("Expected one element.") + } +} + +func TestGetFingerprintsForLabelName(t *testing.T) { + temporaryDirectory, _ := ioutil.TempDir("", "leveldb_metric_persistence_test") + + defer func() { + if removeAllErr := os.RemoveAll(temporaryDirectory); removeAllErr != nil { + t.Errorf("Could not remove temporary directory: %q\n", removeAllErr) + } + }() + + persistence, _ := NewLevelDBMetricPersistence(temporaryDirectory) + + defer func() { + persistence.Close() + }() + + appendErr := persistence.AppendSample(&model.Sample{ + Value: model.SampleValue(0), + Timestamp: time.Unix(0, 0), + Labels: model.LabelSet{ + "name": "my_metric", + "request_type": "your_mom", + "language": "english", + }, + }) + + if appendErr != nil { + t.Error(appendErr) + } + + appendErr = persistence.AppendSample(&model.Sample{ + Value: model.SampleValue(0), + Timestamp: time.Unix(int64(0), 0), + Labels: model.LabelSet{ + "name": "my_metric", + "request_type": "your_dad", + "sprache": "deutsch", + }, + }) + + if appendErr != nil { + t.Error(appendErr) + } + + b := model.LabelName("name") + result, getErr := persistence.GetFingerprintsForLabelName(&b) + + if getErr != nil { + t.Error(getErr) + } + + if len(result) != 2 { + t.Errorf("Expected two elements.") + } + + b = model.LabelName("request_type") + result, getErr = persistence.GetFingerprintsForLabelName(&b) + + if getErr != nil { + t.Error(getErr) + } + + if len(result) != 2 { + t.Errorf("Expected two elements.") + } + + b = model.LabelName("language") + result, getErr = persistence.GetFingerprintsForLabelName(&b) + + if getErr != nil { + t.Error(getErr) + } + + if len(result) != 1 { + t.Errorf("Expected one element.") + } + + b = model.LabelName("sprache") + result, getErr = persistence.GetFingerprintsForLabelName(&b) + + if getErr != nil { + t.Error(getErr) + } + + if len(result) != 1 { + t.Errorf("Expected one element.") + } +} diff --git a/storage/raw/index/leveldb/leveldb.go b/storage/raw/index/leveldb/leveldb.go index eebd7b022..eec9515dd 100644 --- a/storage/raw/index/leveldb/leveldb.go +++ b/storage/raw/index/leveldb/leveldb.go @@ -20,7 +20,7 @@ import ( ) var ( - existenceValue = coding.NewProtocolBufferEncoder(&data.MembershipIndexValueDDO{}) + existenceValue = coding.NewProtocolBufferEncoder(&data.MembershipIndexValue{}) ) type LevelDBMembershipIndex struct {