Various cleanups.

Kill LevelDB watermarks due to redundancy.

General interface documentation has begun.

Creating custom types for the model to prevent errors down the
road.

Renaming of components for easier comprehension.

Exposition of interface in LevelDB.

Slew of simple refactorings.
This commit is contained in:
Matt T. Proud 2012-12-07 11:55:43 +01:00
parent 4f0f8f9552
commit 15a6681651
8 changed files with 500 additions and 354 deletions

View file

@ -22,7 +22,7 @@ import (
"sort" "sort"
) )
func SampleToMetricDDO(s *Sample) *data.MetricDDO { func SampleToMetricDTO(s *Sample) *data.Metric {
labelLength := len(s.Labels) labelLength := len(s.Labels)
labelNames := make([]string, 0, labelLength) labelNames := make([]string, 0, labelLength)
@ -32,24 +32,24 @@ func SampleToMetricDDO(s *Sample) *data.MetricDDO {
sort.Strings(labelNames) sort.Strings(labelNames)
labelPairs := make([]*data.LabelPairDDO, 0, labelLength) labelSets := make([]*data.LabelPair, 0, labelLength)
for _, labelName := range labelNames { for _, labelName := range labelNames {
labelValue := s.Labels[labelName] labelValue := s.Labels[LabelName(labelName)]
labelPair := &data.LabelPairDDO{ labelPair := &data.LabelPair{
Name: proto.String(string(labelName)), Name: proto.String(string(labelName)),
Value: proto.String(string(labelValue)), Value: proto.String(string(labelValue)),
} }
labelPairs = append(labelPairs, labelPair) labelSets = append(labelSets, labelPair)
} }
return &data.MetricDDO{ return &data.Metric{
LabelPair: labelPairs, LabelPair: labelSets,
} }
} }
func MetricToMetricDDO(m *Metric) *data.MetricDDO { func MetricToDTO(m *Metric) *data.Metric {
metricLength := len(*m) metricLength := len(*m)
labelNames := make([]string, 0, metricLength) labelNames := make([]string, 0, metricLength)
@ -59,26 +59,21 @@ func MetricToMetricDDO(m *Metric) *data.MetricDDO {
sort.Strings(labelNames) sort.Strings(labelNames)
labelPairs := make([]*data.LabelPairDDO, 0, metricLength) labelSets := make([]*data.LabelPair, 0, metricLength)
for _, labelName := range labelNames { for _, labelName := range labelNames {
labelValue := (*m)[labelName] l := LabelName(labelName)
labelPair := &data.LabelPairDDO{ labelValue := (*m)[l]
labelPair := &data.LabelPair{
Name: proto.String(string(labelName)), Name: proto.String(string(labelName)),
Value: proto.String(string(labelValue)), Value: proto.String(string(labelValue)),
} }
labelPairs = append(labelPairs, labelPair) labelSets = append(labelSets, labelPair)
} }
return &data.MetricDDO{ return &data.Metric{
LabelPair: labelPairs, LabelPair: labelSets,
}
}
func BytesToFingerprintDDO(b []byte) *data.FingerprintDDO {
return &data.FingerprintDDO{
Signature: proto.String(string(b)),
} }
} }
@ -93,3 +88,41 @@ func BytesToFingerprint(v []byte) Fingerprint {
hash.Write(v) hash.Write(v)
return Fingerprint(hex.EncodeToString(hash.Sum([]byte{}))) return Fingerprint(hex.EncodeToString(hash.Sum([]byte{})))
} }
func LabelSetToDTOs(s *LabelSet) []*data.LabelPair {
metricLength := len(*s)
labelNames := make([]string, 0, metricLength)
for labelName := range *s {
labelNames = append(labelNames, string(labelName))
}
sort.Strings(labelNames)
labelSets := make([]*data.LabelPair, 0, metricLength)
for _, labelName := range labelNames {
l := LabelName(labelName)
labelValue := (*s)[l]
labelPair := &data.LabelPair{
Name: proto.String(string(labelName)),
Value: proto.String(string(labelValue)),
}
labelSets = append(labelSets, labelPair)
}
return labelSets
}
func LabelSetToDTO(s *LabelSet) *data.LabelSet {
return &data.LabelSet{
Member: LabelSetToDTOs(s),
}
}
func LabelNameToDTO(l *LabelName) *data.LabelName {
return &data.LabelName{
Name: proto.String(string(*l)),
}
}

View file

@ -11,89 +11,58 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
package generated; package dto;
message LabelPairDDO { message LabelPair {
optional int64 version = 1 [default = 1]; optional int64 version = 1 [default = 1];
optional string name = 2; optional string name = 2;
optional string value = 3; optional string value = 3;
} }
message MetricDDO { message LabelName {
optional int64 version = 1 [default = 1];
repeated LabelPairDDO label_pair = 2;
}
message FingerprintCollectionDDO {
optional int64 version = 1 [default = 1];
repeated FingerprintDDO member = 2;
}
message LabelPairCollectionDDO {
optional int64 version = 1 [default = 1];
repeated LabelPairDDO member = 2;
}
message LabelNameDDO {
optional int64 version = 1 [default = 1]; optional int64 version = 1 [default = 1];
optional string name = 2; optional string name = 2;
} }
message FingerprintDDO { message Metric {
optional int64 version = 1 [default = 1];
repeated LabelPair label_pair = 2;
}
message Fingerprint {
optional int64 version = 1 [default = 1]; optional int64 version = 1 [default = 1];
// bytes
optional string signature = 2; optional string signature = 2;
} }
message WatermarkDDO { message FingerprintCollection {
optional int64 version = 1 [default = 1]; optional int64 version = 1 [default = 1];
optional int64 timestamp = 2; repeated Fingerprint member = 2;
} }
message SampleKeyDDO { message LabelSet {
optional int64 version = 1 [default = 1]; optional int64 version = 1 [default = 1];
optional FingerprintDDO fingerprint = 2; repeated LabelPair member = 2;
}
message SampleKey {
optional int64 version = 1 [default = 1];
optional Fingerprint fingerprint = 2;
optional bytes timestamp = 3; optional bytes timestamp = 3;
} }
message SampleValueDDO { message SampleValue {
optional int64 version = 1 [default = 1]; optional int64 version = 1 [default = 1];
optional float value = 2; optional float value = 2;
} }
message MembershipIndexValue {
// TOO OLD optional int64 version = 1 [default = 1];
message MembershipIndexValueDDO {
}
message LabelNameAndValueIndexDDO {
optional string name = 1;
optional string value = 2;
}
message LabelNameValuesDDO {
repeated string value = 1;
}
message LabelNameAndValueToMetricDDO {
repeated string metric = 1;
}
message MetricToWindowDDO {
repeated int64 window = 1;
}
message WindowSampleDDO {
repeated float value = 1;
} }

View file

@ -17,15 +17,36 @@ import (
"time" "time"
) )
// A Fingerprint is a simplified representation of an entity---e.g., a hash of
// an entire Metric.
type Fingerprint string type Fingerprint string
type LabelPairs map[string]string // A LabelName is a key for a LabelSet or Metric. It has a value associated
type Metric map[string]string // therewith.
type LabelName string
// A LabelValue is an associated value for a LabelName.
type LabelValue string
// A LabelSet is a collection of LabelName and LabelValue pairs. The LabelSet
// may be fully-qualified down to the point where it may resolve to a single
// Metric in the data store or not. All operations that occur within the realm
// of a LabelSet can emit a vector of Metric entities to which the LabelSet may
// match.
type LabelSet map[LabelName]LabelValue
// A Metric is similar to a LabelSet, but the key difference is that a Metric is
// a singleton and refers to one and only one stream of samples.
type Metric map[LabelName]LabelValue
// A SampleValue is a representation of a value for a given sample at a given
// time. It is presently float32 due to that being the representation that
// Protocol Buffers provide of floats in Go. This is a smell and should be
// remedied down the road.
type SampleValue float32 type SampleValue float32
type Sample struct { type Sample struct {
Labels LabelPairs Labels LabelSet
Value SampleValue Value SampleValue
Timestamp time.Time Timestamp time.Time
} }

View file

@ -32,7 +32,7 @@ type MetricsService struct {
} }
func (m MetricsService) ListLabels() []string { func (m MetricsService) ListLabels() []string {
labels, labelsError := m.persistence.GetLabelNames() labels, labelsError := m.persistence.GetAllLabelNames()
if labelsError != nil { if labelsError != nil {
m.ResponseBuilder().SetResponseCode(500) m.ResponseBuilder().SetResponseCode(500)
@ -41,18 +41,18 @@ func (m MetricsService) ListLabels() []string {
return labels return labels
} }
func (m MetricsService) ListLabelPairs() []model.LabelPairs { func (m MetricsService) ListLabelPairs() []model.LabelSet {
labelPairs, labelPairsError := m.persistence.GetLabelPairs() labelSets, labelPairsError := m.persistence.GetAllLabelPairs()
if labelPairsError != nil { if labelPairsError != nil {
m.ResponseBuilder().SetResponseCode(500) m.ResponseBuilder().SetResponseCode(500)
} }
return labelPairs return labelSets
} }
func (m MetricsService) ListMetrics() []model.LabelPairs { func (m MetricsService) ListMetrics() []model.LabelSet {
metrics, metricsError := m.persistence.GetMetrics() metrics, metricsError := m.persistence.GetAllMetrics()
if metricsError != nil { if metricsError != nil {
m.ResponseBuilder().SetResponseCode(500) m.ResponseBuilder().SetResponseCode(500)

View file

@ -17,17 +17,31 @@ import (
"github.com/matttproud/prometheus/model" "github.com/matttproud/prometheus/model"
) )
// MetricPersistence is a system for storing metric samples in a persistence
// layer.
type MetricPersistence interface { type MetricPersistence interface {
// A storage system may rely on external resources and thusly should be
// closed when finished.
Close() error Close() error
// Record a new sample in the storage layer.
AppendSample(sample *model.Sample) error AppendSample(sample *model.Sample) error
GetLabelNames() ([]string, error) // Get all of the metric fingerprints that are associated with the provided
GetLabelPairs() ([]model.LabelPairs, error) // label set.
GetMetrics() ([]model.LabelPairs, error) GetFingerprintsForLabelSet(labelSet *model.LabelSet) ([]*model.Fingerprint, error)
GetFingerprintsForLabelName(labelName *model.LabelName) ([]*model.Fingerprint, error)
GetMetricFingerprintsForLabelPairs(labelSets []*model.LabelPairs) ([]*model.Fingerprint, error) GetAllLabelNames() ([]string, error)
GetFingerprintLabelPairs(fingerprint model.Fingerprint) (model.LabelPairs, error) GetAllLabelPairs() ([]model.LabelSet, error)
GetAllMetrics() ([]model.LabelSet, error)
// // BEGIN QUERY PRIMITIVES
//
// GetMetricForFingerprint()
// GetMetricWatermarks(metrics ...) (watermarks ...)
// GetMetricValuesForIntervals(metric, interval) (values ...)
// GetMetricValueLast()
// // END QUERY PRIMITIVES
RecordFingerprintWatermark(sample *model.Sample) error
} }

View file

@ -28,13 +28,11 @@ import (
) )
type LevelDBMetricPersistence struct { type LevelDBMetricPersistence struct {
fingerprintHighWaterMarks *storage.LevelDBPersistence fingerprintToMetrics *storage.LevelDBPersistence
fingerprintLabelPairs *storage.LevelDBPersistence metricSamples *storage.LevelDBPersistence
fingerprintLowWaterMarks *storage.LevelDBPersistence labelNameToFingerprints *storage.LevelDBPersistence
fingerprintSamples *storage.LevelDBPersistence labelSetToFingerprints *storage.LevelDBPersistence
labelNameFingerprints *storage.LevelDBPersistence metricMembershipIndex *index.LevelDBMembershipIndex
labelPairFingerprints *storage.LevelDBPersistence
metricMembershipIndex *index.LevelDBMembershipIndex
} }
type leveldbOpener func() type leveldbOpener func()
@ -46,29 +44,21 @@ func (l *LevelDBMetricPersistence) Close() error {
name string name string
closer io.Closer closer io.Closer
}{ }{
{
"Fingerprint High-Water Marks",
l.fingerprintHighWaterMarks,
},
{ {
"Fingerprint to Label Name and Value Pairs", "Fingerprint to Label Name and Value Pairs",
l.fingerprintLabelPairs, l.fingerprintToMetrics,
},
{
"Fingerprint Low-Water Marks",
l.fingerprintLowWaterMarks,
}, },
{ {
"Fingerprint Samples", "Fingerprint Samples",
l.fingerprintSamples, l.metricSamples,
}, },
{ {
"Label Name to Fingerprints", "Label Name to Fingerprints",
l.labelNameFingerprints, l.labelNameToFingerprints,
}, },
{ {
"Label Name and Value Pairs to Fingerprints", "Label Name and Value Pairs to Fingerprints",
l.labelPairFingerprints, l.labelSetToFingerprints,
}, },
{ {
"Metric Membership Index", "Metric Membership Index",
@ -114,7 +104,7 @@ func (l *LevelDBMetricPersistence) Close() error {
func NewLevelDBMetricPersistence(baseDirectory string) (*LevelDBMetricPersistence, error) { func NewLevelDBMetricPersistence(baseDirectory string) (*LevelDBMetricPersistence, error) {
log.Printf("Opening LevelDBPersistence storage containers...") log.Printf("Opening LevelDBPersistence storage containers...")
errorChannel := make(chan error, 7) errorChannel := make(chan error, 5)
emission := &LevelDBMetricPersistence{} emission := &LevelDBMetricPersistence{}
@ -122,27 +112,11 @@ func NewLevelDBMetricPersistence(baseDirectory string) (*LevelDBMetricPersistenc
name string name string
opener leveldbOpener opener leveldbOpener
}{ }{
{
"High-Water Marks by Fingerprint",
func() {
var err error
emission.fingerprintHighWaterMarks, err = storage.NewLevelDBPersistence(baseDirectory+"/high_water_marks_by_fingerprint", 1000000, 10)
errorChannel <- err
},
},
{ {
"Label Names and Value Pairs by Fingerprint", "Label Names and Value Pairs by Fingerprint",
func() { func() {
var err error var err error
emission.fingerprintLabelPairs, err = storage.NewLevelDBPersistence(baseDirectory+"/label_name_and_value_pairs_by_fingerprint", 1000000, 10) emission.fingerprintToMetrics, err = storage.NewLevelDBPersistence(baseDirectory+"/label_name_and_value_pairs_by_fingerprint", 1000000, 10)
errorChannel <- err
},
},
{
"Low-Water Marks by Fingerprint",
func() {
var err error
emission.fingerprintLowWaterMarks, err = storage.NewLevelDBPersistence(baseDirectory+"/low_water_marks_by_fingerprint", 1000000, 10)
errorChannel <- err errorChannel <- err
}, },
}, },
@ -150,7 +124,7 @@ func NewLevelDBMetricPersistence(baseDirectory string) (*LevelDBMetricPersistenc
"Samples by Fingerprint", "Samples by Fingerprint",
func() { func() {
var err error var err error
emission.fingerprintSamples, err = storage.NewLevelDBPersistence(baseDirectory+"/samples_by_fingerprint", 1000000, 10) emission.metricSamples, err = storage.NewLevelDBPersistence(baseDirectory+"/samples_by_fingerprint", 1000000, 10)
errorChannel <- err errorChannel <- err
}, },
}, },
@ -158,7 +132,7 @@ func NewLevelDBMetricPersistence(baseDirectory string) (*LevelDBMetricPersistenc
"Fingerprints by Label Name", "Fingerprints by Label Name",
func() { func() {
var err error var err error
emission.labelNameFingerprints, err = storage.NewLevelDBPersistence(baseDirectory+"/fingerprints_by_label_name", 1000000, 10) emission.labelNameToFingerprints, err = storage.NewLevelDBPersistence(baseDirectory+"/fingerprints_by_label_name", 1000000, 10)
errorChannel <- err errorChannel <- err
}, },
}, },
@ -166,7 +140,7 @@ func NewLevelDBMetricPersistence(baseDirectory string) (*LevelDBMetricPersistenc
"Fingerprints by Label Name and Value Pair", "Fingerprints by Label Name and Value Pair",
func() { func() {
var err error var err error
emission.labelPairFingerprints, err = storage.NewLevelDBPersistence(baseDirectory+"/fingerprints_by_label_name_and_value_pair", 1000000, 10) emission.labelSetToFingerprints, err = storage.NewLevelDBPersistence(baseDirectory+"/fingerprints_by_label_name_and_value_pair", 1000000, 10)
errorChannel <- err errorChannel <- err
}, },
}, },
@ -204,43 +178,44 @@ func NewLevelDBMetricPersistence(baseDirectory string) (*LevelDBMetricPersistenc
return emission, nil return emission, nil
} }
func (l *LevelDBMetricPersistence) hasIndexMetric(ddo *data.MetricDDO) (bool, error) { func (l *LevelDBMetricPersistence) hasIndexMetric(dto *data.Metric) (bool, error) {
ddoKey := coding.NewProtocolBufferEncoder(ddo) dtoKey := coding.NewProtocolBufferEncoder(dto)
return l.metricMembershipIndex.Has(ddoKey) return l.metricMembershipIndex.Has(dtoKey)
} }
func (l *LevelDBMetricPersistence) indexMetric(ddo *data.MetricDDO) error { func (l *LevelDBMetricPersistence) indexMetric(dto *data.Metric) error {
ddoKey := coding.NewProtocolBufferEncoder(ddo) dtoKey := coding.NewProtocolBufferEncoder(dto)
return l.metricMembershipIndex.Put(ddoKey) return l.metricMembershipIndex.Put(dtoKey)
} }
func fingerprintDDOForMessage(message proto.Message) (*data.FingerprintDDO, error) { // TODO(mtp): Candidate for refactoring.
func fingerprintDTOForMessage(message proto.Message) (*data.Fingerprint, error) {
if messageByteArray, marshalError := proto.Marshal(message); marshalError == nil { if messageByteArray, marshalError := proto.Marshal(message); marshalError == nil {
fingerprint := model.BytesToFingerprint(messageByteArray) fingerprint := model.BytesToFingerprint(messageByteArray)
return &data.FingerprintDDO{ return &data.Fingerprint{
Signature: proto.String(string(fingerprint)), Signature: proto.String(string(fingerprint)),
}, nil }, nil
} else { } else {
return nil, marshalError return nil, marshalError
} }
return nil, errors.New("Unknown error in generating FingerprintDDO from message.") return nil, errors.New("Unknown error in generating FingerprintDTO from message.")
} }
func (l *LevelDBMetricPersistence) HasLabelPair(ddo *data.LabelPairDDO) (bool, error) { func (l *LevelDBMetricPersistence) HasLabelPair(dto *data.LabelPair) (bool, error) {
ddoKey := coding.NewProtocolBufferEncoder(ddo) dtoKey := coding.NewProtocolBufferEncoder(dto)
return l.labelPairFingerprints.Has(ddoKey) return l.labelSetToFingerprints.Has(dtoKey)
} }
func (l *LevelDBMetricPersistence) HasLabelName(ddo *data.LabelNameDDO) (bool, error) { func (l *LevelDBMetricPersistence) HasLabelName(dto *data.LabelName) (bool, error) {
ddoKey := coding.NewProtocolBufferEncoder(ddo) dtoKey := coding.NewProtocolBufferEncoder(dto)
return l.labelNameFingerprints.Has(ddoKey) return l.labelNameToFingerprints.Has(dtoKey)
} }
func (l *LevelDBMetricPersistence) GetLabelPairFingerprints(ddo *data.LabelPairDDO) (*data.FingerprintCollectionDDO, error) { func (l *LevelDBMetricPersistence) getFingerprintsForLabelSet(dto *data.LabelPair) (*data.FingerprintCollection, error) {
ddoKey := coding.NewProtocolBufferEncoder(ddo) dtoKey := coding.NewProtocolBufferEncoder(dto)
if get, getError := l.labelPairFingerprints.Get(ddoKey); getError == nil { if get, getError := l.labelSetToFingerprints.Get(dtoKey); getError == nil {
value := &data.FingerprintCollectionDDO{} value := &data.FingerprintCollection{}
if unmarshalError := proto.Unmarshal(get, value); unmarshalError == nil { if unmarshalError := proto.Unmarshal(get, value); unmarshalError == nil {
return value, nil return value, nil
} else { } else {
@ -249,13 +224,14 @@ func (l *LevelDBMetricPersistence) GetLabelPairFingerprints(ddo *data.LabelPairD
} else { } else {
return nil, getError return nil, getError
} }
return nil, errors.New("Unknown error while getting label name and value pair fingerprints.")
panic("unreachable")
} }
func (l *LevelDBMetricPersistence) GetLabelNameFingerprints(ddo *data.LabelNameDDO) (*data.FingerprintCollectionDDO, error) { func (l *LevelDBMetricPersistence) GetLabelNameFingerprints(dto *data.LabelName) (*data.FingerprintCollection, error) {
ddoKey := coding.NewProtocolBufferEncoder(ddo) dtoKey := coding.NewProtocolBufferEncoder(dto)
if get, getError := l.labelNameFingerprints.Get(ddoKey); getError == nil { if get, getError := l.labelNameToFingerprints.Get(dtoKey); getError == nil {
value := &data.FingerprintCollectionDDO{} value := &data.FingerprintCollection{}
if unmarshalError := proto.Unmarshal(get, value); unmarshalError == nil { if unmarshalError := proto.Unmarshal(get, value); unmarshalError == nil {
return value, nil return value, nil
} else { } else {
@ -268,29 +244,29 @@ func (l *LevelDBMetricPersistence) GetLabelNameFingerprints(ddo *data.LabelNameD
return nil, errors.New("Unknown error while getting label name fingerprints.") return nil, errors.New("Unknown error while getting label name fingerprints.")
} }
func (l *LevelDBMetricPersistence) setLabelPairFingerprints(labelPair *data.LabelPairDDO, fingerprints *data.FingerprintCollectionDDO) error { func (l *LevelDBMetricPersistence) setLabelPairFingerprints(labelPair *data.LabelPair, fingerprints *data.FingerprintCollection) error {
labelPairEncoded := coding.NewProtocolBufferEncoder(labelPair) labelPairEncoded := coding.NewProtocolBufferEncoder(labelPair)
fingerprintsEncoded := coding.NewProtocolBufferEncoder(fingerprints) fingerprintsEncoded := coding.NewProtocolBufferEncoder(fingerprints)
return l.labelPairFingerprints.Put(labelPairEncoded, fingerprintsEncoded) return l.labelSetToFingerprints.Put(labelPairEncoded, fingerprintsEncoded)
} }
func (l *LevelDBMetricPersistence) setLabelNameFingerprints(labelName *data.LabelNameDDO, fingerprints *data.FingerprintCollectionDDO) error { func (l *LevelDBMetricPersistence) setLabelNameFingerprints(labelName *data.LabelName, fingerprints *data.FingerprintCollection) error {
labelNameEncoded := coding.NewProtocolBufferEncoder(labelName) labelNameEncoded := coding.NewProtocolBufferEncoder(labelName)
fingerprintsEncoded := coding.NewProtocolBufferEncoder(fingerprints) fingerprintsEncoded := coding.NewProtocolBufferEncoder(fingerprints)
return l.labelNameFingerprints.Put(labelNameEncoded, fingerprintsEncoded) return l.labelNameToFingerprints.Put(labelNameEncoded, fingerprintsEncoded)
} }
func (l *LevelDBMetricPersistence) appendLabelPairFingerprint(labelPair *data.LabelPairDDO, fingerprint *data.FingerprintDDO) error { func (l *LevelDBMetricPersistence) appendLabelPairFingerprint(labelPair *data.LabelPair, fingerprint *data.Fingerprint) error {
if has, hasError := l.HasLabelPair(labelPair); hasError == nil { if has, hasError := l.HasLabelPair(labelPair); hasError == nil {
var fingerprints *data.FingerprintCollectionDDO var fingerprints *data.FingerprintCollection
if has { if has {
if existing, existingError := l.GetLabelPairFingerprints(labelPair); existingError == nil { if existing, existingError := l.getFingerprintsForLabelSet(labelPair); existingError == nil {
fingerprints = existing fingerprints = existing
} else { } else {
return existingError return existingError
} }
} else { } else {
fingerprints = &data.FingerprintCollectionDDO{} fingerprints = &data.FingerprintCollection{}
} }
fingerprints.Member = append(fingerprints.Member, fingerprint) fingerprints.Member = append(fingerprints.Member, fingerprint)
@ -303,13 +279,13 @@ func (l *LevelDBMetricPersistence) appendLabelPairFingerprint(labelPair *data.La
return errors.New("Unknown error when appending fingerprint to label name and value pair.") return errors.New("Unknown error when appending fingerprint to label name and value pair.")
} }
func (l *LevelDBMetricPersistence) appendLabelNameFingerprint(labelPair *data.LabelPairDDO, fingerprint *data.FingerprintDDO) error { func (l *LevelDBMetricPersistence) appendLabelNameFingerprint(labelPair *data.LabelPair, fingerprint *data.Fingerprint) error {
labelName := &data.LabelNameDDO{ labelName := &data.LabelName{
Name: labelPair.Name, Name: labelPair.Name,
} }
if has, hasError := l.HasLabelName(labelName); hasError == nil { if has, hasError := l.HasLabelName(labelName); hasError == nil {
var fingerprints *data.FingerprintCollectionDDO var fingerprints *data.FingerprintCollection
if has { if has {
if existing, existingError := l.GetLabelNameFingerprints(labelName); existingError == nil { if existing, existingError := l.GetLabelNameFingerprints(labelName); existingError == nil {
fingerprints = existing fingerprints = existing
@ -317,7 +293,7 @@ func (l *LevelDBMetricPersistence) appendLabelNameFingerprint(labelPair *data.La
return existingError return existingError
} }
} else { } else {
fingerprints = &data.FingerprintCollectionDDO{} fingerprints = &data.FingerprintCollection{}
} }
fingerprints.Member = append(fingerprints.Member, fingerprint) fingerprints.Member = append(fingerprints.Member, fingerprint)
@ -330,26 +306,23 @@ func (l *LevelDBMetricPersistence) appendLabelNameFingerprint(labelPair *data.La
return errors.New("Unknown error when appending fingerprint to label name and value pair.") return errors.New("Unknown error when appending fingerprint to label name and value pair.")
} }
func (l *LevelDBMetricPersistence) appendFingerprints(ddo *data.MetricDDO) error { func (l *LevelDBMetricPersistence) appendFingerprints(dto *data.Metric) error {
if fingerprintDDO, fingerprintDDOError := fingerprintDDOForMessage(ddo); fingerprintDDOError == nil { if fingerprintDTO, fingerprintDTOError := fingerprintDTOForMessage(dto); fingerprintDTOError == nil {
labelPairCollectionDDO := &data.LabelPairCollectionDDO{ fingerprintKey := coding.NewProtocolBufferEncoder(fingerprintDTO)
Member: ddo.LabelPair, metricDTOEncoder := coding.NewProtocolBufferEncoder(dto)
}
fingerprintKey := coding.NewProtocolBufferEncoder(fingerprintDDO)
labelPairCollectionDDOEncoder := coding.NewProtocolBufferEncoder(labelPairCollectionDDO)
if putError := l.fingerprintLabelPairs.Put(fingerprintKey, labelPairCollectionDDOEncoder); putError == nil { if putError := l.fingerprintToMetrics.Put(fingerprintKey, metricDTOEncoder); putError == nil {
labelCount := len(ddo.LabelPair) labelCount := len(dto.LabelPair)
labelPairErrors := make(chan error, labelCount) labelPairErrors := make(chan error, labelCount)
labelNameErrors := make(chan error, labelCount) labelNameErrors := make(chan error, labelCount)
for _, labelPair := range ddo.LabelPair { for _, labelPair := range dto.LabelPair {
go func(labelPair *data.LabelPairDDO) { go func(labelPair *data.LabelPair) {
labelNameErrors <- l.appendLabelNameFingerprint(labelPair, fingerprintDDO) labelNameErrors <- l.appendLabelNameFingerprint(labelPair, fingerprintDTO)
}(labelPair) }(labelPair)
go func(labelPair *data.LabelPairDDO) { go func(labelPair *data.LabelPair) {
labelPairErrors <- l.appendLabelPairFingerprint(labelPair, fingerprintDDO) labelPairErrors <- l.appendLabelPairFingerprint(labelPair, fingerprintDTO)
}(labelPair) }(labelPair)
} }
@ -375,19 +348,19 @@ func (l *LevelDBMetricPersistence) appendFingerprints(ddo *data.MetricDDO) error
return putError return putError
} }
} else { } else {
return fingerprintDDOError return fingerprintDTOError
} }
return errors.New("Unknown error in appending label pairs to fingerprint.") return errors.New("Unknown error in appending label pairs to fingerprint.")
} }
func (l *LevelDBMetricPersistence) AppendSample(sample *model.Sample) error { func (l *LevelDBMetricPersistence) AppendSample(sample *model.Sample) error {
metricDDO := model.SampleToMetricDDO(sample) metricDTO := model.SampleToMetricDTO(sample)
if indexHas, indexHasError := l.hasIndexMetric(metricDDO); indexHasError == nil { if indexHas, indexHasError := l.hasIndexMetric(metricDTO); indexHasError == nil {
if !indexHas { if !indexHas {
if indexPutError := l.indexMetric(metricDDO); indexPutError == nil { if indexPutError := l.indexMetric(metricDTO); indexPutError == nil {
if appendError := l.appendFingerprints(metricDDO); appendError != nil { if appendError := l.appendFingerprints(metricDTO); appendError != nil {
log.Printf("Could not set metric fingerprint to label pairs mapping: %q\n", appendError) log.Printf("Could not set metric fingerprint to label pairs mapping: %q\n", appendError)
return appendError return appendError
} }
@ -401,40 +374,40 @@ func (l *LevelDBMetricPersistence) AppendSample(sample *model.Sample) error {
return indexHasError return indexHasError
} }
if fingerprintDDO, fingerprintDDOErr := fingerprintDDOForMessage(metricDDO); fingerprintDDOErr == nil { if fingerprintDTO, fingerprintDTOErr := fingerprintDTOForMessage(metricDTO); fingerprintDTOErr == nil {
sampleKeyDDO := &data.SampleKeyDDO{ sampleKeyDTO := &data.SampleKey{
Fingerprint: fingerprintDDO, Fingerprint: fingerprintDTO,
Timestamp: indexable.EncodeTime(sample.Timestamp), Timestamp: indexable.EncodeTime(sample.Timestamp),
} }
sampleValueDDO := &data.SampleValueDDO{ sampleValueDTO := &data.SampleValue{
Value: proto.Float32(float32(sample.Value)), Value: proto.Float32(float32(sample.Value)),
} }
sampleKeyEncoded := coding.NewProtocolBufferEncoder(sampleKeyDDO) sampleKeyEncoded := coding.NewProtocolBufferEncoder(sampleKeyDTO)
sampleValueEncoded := coding.NewProtocolBufferEncoder(sampleValueDDO) sampleValueEncoded := coding.NewProtocolBufferEncoder(sampleValueDTO)
if putError := l.fingerprintSamples.Put(sampleKeyEncoded, sampleValueEncoded); putError != nil { if putError := l.metricSamples.Put(sampleKeyEncoded, sampleValueEncoded); putError != nil {
log.Printf("Could not append metric sample: %q\n", putError) log.Printf("Could not append metric sample: %q\n", putError)
return putError return putError
} }
} else { } else {
log.Printf("Could not encode metric fingerprint: %q\n", fingerprintDDOErr) log.Printf("Could not encode metric fingerprint: %q\n", fingerprintDTOErr)
return fingerprintDDOErr return fingerprintDTOErr
} }
return nil return nil
} }
func (l *LevelDBMetricPersistence) GetLabelNames() ([]string, error) { func (l *LevelDBMetricPersistence) GetAllLabelNames() ([]string, error) {
if getAll, getAllError := l.labelNameFingerprints.GetAll(); getAllError == nil { if getAll, getAllError := l.labelNameToFingerprints.GetAll(); getAllError == nil {
result := make([]string, 0, len(getAll)) result := make([]string, 0, len(getAll))
labelNameDDO := &data.LabelNameDDO{} labelNameDTO := &data.LabelName{}
for _, pair := range getAll { for _, pair := range getAll {
if unmarshalError := proto.Unmarshal(pair.Left, labelNameDDO); unmarshalError == nil { if unmarshalError := proto.Unmarshal(pair.Left, labelNameDTO); unmarshalError == nil {
result = append(result, *labelNameDDO.Name) result = append(result, *labelNameDTO.Name)
} else { } else {
return nil, unmarshalError return nil, unmarshalError
} }
@ -449,16 +422,16 @@ func (l *LevelDBMetricPersistence) GetLabelNames() ([]string, error) {
return nil, errors.New("Unknown error encountered when querying label names.") return nil, errors.New("Unknown error encountered when querying label names.")
} }
func (l *LevelDBMetricPersistence) GetLabelPairs() ([]model.LabelPairs, error) { func (l *LevelDBMetricPersistence) GetAllLabelPairs() ([]model.LabelSet, error) {
if getAll, getAllError := l.labelPairFingerprints.GetAll(); getAllError == nil { if getAll, getAllError := l.labelSetToFingerprints.GetAll(); getAllError == nil {
result := make([]model.LabelPairs, 0, len(getAll)) result := make([]model.LabelSet, 0, len(getAll))
labelPairDDO := &data.LabelPairDDO{} labelPairDTO := &data.LabelPair{}
for _, pair := range getAll { for _, pair := range getAll {
if unmarshalError := proto.Unmarshal(pair.Left, labelPairDDO); unmarshalError == nil { if unmarshalError := proto.Unmarshal(pair.Left, labelPairDTO); unmarshalError == nil {
item := model.LabelPairs{ n := model.LabelName(*labelPairDTO.Name)
*labelPairDDO.Name: *labelPairDDO.Value, v := model.LabelValue(*labelPairDTO.Value)
} item := model.LabelSet{n: v}
result = append(result, item) result = append(result, item)
} else { } else {
return nil, unmarshalError return nil, unmarshalError
@ -474,10 +447,10 @@ func (l *LevelDBMetricPersistence) GetLabelPairs() ([]model.LabelPairs, error) {
return nil, errors.New("Unknown error encountered when querying label pairs.") return nil, errors.New("Unknown error encountered when querying label pairs.")
} }
func (l *LevelDBMetricPersistence) GetMetrics() ([]model.LabelPairs, error) { func (l *LevelDBMetricPersistence) GetAllMetrics() ([]model.LabelSet, error) {
if getAll, getAllError := l.labelPairFingerprints.GetAll(); getAllError == nil { if getAll, getAllError := l.labelSetToFingerprints.GetAll(); getAllError == nil {
result := make([]model.LabelPairs, 0) result := make([]model.LabelSet, 0)
fingerprintCollection := &data.FingerprintCollectionDDO{} fingerprintCollection := &data.FingerprintCollection{}
fingerprints := make(utility.Set) fingerprints := make(utility.Set)
@ -487,20 +460,22 @@ func (l *LevelDBMetricPersistence) GetMetrics() ([]model.LabelPairs, error) {
if !fingerprints.Has(*member.Signature) { if !fingerprints.Has(*member.Signature) {
fingerprints.Add(*member.Signature) fingerprints.Add(*member.Signature)
fingerprintEncoded := coding.NewProtocolBufferEncoder(member) fingerprintEncoded := coding.NewProtocolBufferEncoder(member)
if labelPairCollectionRaw, labelPairCollectionRawError := l.fingerprintLabelPairs.Get(fingerprintEncoded); labelPairCollectionRawError == nil { if labelPairCollectionRaw, labelPairCollectionRawError := l.fingerprintToMetrics.Get(fingerprintEncoded); labelPairCollectionRawError == nil {
labelPairCollectionDDO := &data.LabelPairCollectionDDO{} labelPairCollectionDTO := &data.LabelSet{}
if labelPairCollectionDDOMarshalError := proto.Unmarshal(labelPairCollectionRaw, labelPairCollectionDDO); labelPairCollectionDDOMarshalError == nil { if labelPairCollectionDTOMarshalError := proto.Unmarshal(labelPairCollectionRaw, labelPairCollectionDTO); labelPairCollectionDTOMarshalError == nil {
intermediate := make(model.LabelPairs, 0) intermediate := make(model.LabelSet, 0)
for _, member := range labelPairCollectionDDO.Member { for _, member := range labelPairCollectionDTO.Member {
intermediate[*member.Name] = *member.Value n := model.LabelName(*member.Name)
v := model.LabelValue(*member.Value)
intermediate[n] = v
} }
result = append(result, intermediate) result = append(result, intermediate)
} else { } else {
return nil, labelPairCollectionDDOMarshalError return nil, labelPairCollectionDTOMarshalError
} }
} else { } else {
return nil, labelPairCollectionRawError return nil, labelPairCollectionRawError
@ -519,86 +494,15 @@ func (l *LevelDBMetricPersistence) GetMetrics() ([]model.LabelPairs, error) {
return nil, errors.New("Unknown error encountered when querying metrics.") return nil, errors.New("Unknown error encountered when querying metrics.")
} }
func (l *LevelDBMetricPersistence) GetWatermarksForMetric(metric model.Metric) (*model.Interval, int, error) {
metricDDO := model.MetricToMetricDDO(&metric)
if fingerprintDDO, fingerprintDDOErr := fingerprintDDOForMessage(metricDDO); fingerprintDDOErr == nil {
if iterator, closer, iteratorErr := l.fingerprintSamples.GetIterator(); iteratorErr == nil {
defer closer.Close()
start := &data.SampleKeyDDO{
Fingerprint: fingerprintDDO,
Timestamp: indexable.EarliestTime,
}
if encode, encodeErr := coding.NewProtocolBufferEncoder(start).Encode(); encodeErr == nil {
iterator.Seek(encode)
if iterator.Valid() {
found := &data.SampleKeyDDO{}
if unmarshalErr := proto.Unmarshal(iterator.Key(), found); unmarshalErr == nil {
var foundEntries int = 0
if *fingerprintDDO.Signature == *found.Fingerprint.Signature {
emission := &model.Interval{
OldestInclusive: indexable.DecodeTime(found.Timestamp),
NewestInclusive: indexable.DecodeTime(found.Timestamp),
}
for iterator = iterator; iterator.Valid(); iterator.Next() {
if subsequentUnmarshalErr := proto.Unmarshal(iterator.Key(), found); subsequentUnmarshalErr == nil {
if *fingerprintDDO.Signature != *found.Fingerprint.Signature {
return emission, foundEntries, nil
}
foundEntries++
emission.NewestInclusive = indexable.DecodeTime(found.Timestamp)
} else {
log.Printf("Could not de-serialize subsequent key: %q\n", subsequentUnmarshalErr)
return nil, -7, subsequentUnmarshalErr
}
}
return emission, foundEntries, nil
} else {
return &model.Interval{}, -6, nil
}
} else {
log.Printf("Could not de-serialize start key: %q\n", unmarshalErr)
return nil, -5, unmarshalErr
}
} else {
iteratorErr := iterator.GetError()
log.Printf("Could not seek for metric watermark beginning: %q\n", iteratorErr)
return nil, -4, iteratorErr
}
} else {
log.Printf("Could not seek for metric watermark: %q\n", encodeErr)
return nil, -3, encodeErr
}
} else {
if closer != nil {
defer closer.Close()
}
log.Printf("Could not provision iterator for metric: %q\n", iteratorErr)
return nil, -3, iteratorErr
}
} else {
log.Printf("Could not encode metric: %q\n", fingerprintDDOErr)
return nil, -2, fingerprintDDOErr
}
return nil, -1, errors.New("Unknown error occurred while querying metric watermarks.")
}
func (l *LevelDBMetricPersistence) GetSamplesForMetric(metric model.Metric, interval model.Interval) ([]model.Samples, error) { func (l *LevelDBMetricPersistence) GetSamplesForMetric(metric model.Metric, interval model.Interval) ([]model.Samples, error) {
metricDDO := model.MetricToMetricDDO(&metric) metricDTO := model.MetricToDTO(&metric)
if fingerprintDDO, fingerprintDDOErr := fingerprintDDOForMessage(metricDDO); fingerprintDDOErr == nil { if fingerprintDTO, fingerprintDTOErr := fingerprintDTOForMessage(metricDTO); fingerprintDTOErr == nil {
if iterator, closer, iteratorErr := l.fingerprintSamples.GetIterator(); iteratorErr == nil { if iterator, closer, iteratorErr := l.metricSamples.GetIterator(); iteratorErr == nil {
defer closer.Close() defer closer.Close()
start := &data.SampleKeyDDO{ start := &data.SampleKey{
Fingerprint: fingerprintDDO, Fingerprint: fingerprintDTO,
Timestamp: indexable.EncodeTime(interval.OldestInclusive), Timestamp: indexable.EncodeTime(interval.OldestInclusive),
} }
@ -608,11 +512,11 @@ func (l *LevelDBMetricPersistence) GetSamplesForMetric(metric model.Metric, inte
iterator.Seek(encode) iterator.Seek(encode)
for iterator = iterator; iterator.Valid(); iterator.Next() { for iterator = iterator; iterator.Valid(); iterator.Next() {
key := &data.SampleKeyDDO{} key := &data.SampleKey{}
value := &data.SampleValueDDO{} value := &data.SampleValue{}
if keyUnmarshalErr := proto.Unmarshal(iterator.Key(), key); keyUnmarshalErr == nil { if keyUnmarshalErr := proto.Unmarshal(iterator.Key(), key); keyUnmarshalErr == nil {
if valueUnmarshalErr := proto.Unmarshal(iterator.Value(), value); valueUnmarshalErr == nil { if valueUnmarshalErr := proto.Unmarshal(iterator.Value(), value); valueUnmarshalErr == nil {
if *fingerprintDDO.Signature == *key.Fingerprint.Signature { if *fingerprintDTO.Signature == *key.Fingerprint.Signature {
// Wart // Wart
if indexable.DecodeTime(key.Timestamp).Unix() <= interval.NewestInclusive.Unix() { if indexable.DecodeTime(key.Timestamp).Unix() <= interval.NewestInclusive.Unix() {
emission = append(emission, model.Samples{ emission = append(emission, model.Samples{
@ -644,21 +548,53 @@ func (l *LevelDBMetricPersistence) GetSamplesForMetric(metric model.Metric, inte
return nil, iteratorErr return nil, iteratorErr
} }
} else { } else {
log.Printf("Could not create fingerprint for the metric: %q\n", fingerprintDDOErr) log.Printf("Could not create fingerprint for the metric: %q\n", fingerprintDTOErr)
return nil, fingerprintDDOErr return nil, fingerprintDTOErr
} }
return nil, errors.New("Unknown error occurred while querying metric watermarks.") panic("unreachable")
} }
func (l *LevelDBMetricPersistence) GetFingerprintLabelPairs(f model.Fingerprint) (model.LabelPairs, error) { func (l *LevelDBMetricPersistence) GetFingerprintsForLabelSet(labelSet *model.LabelSet) ([]*model.Fingerprint, error) {
panic("NOT IMPLEMENTED") emission := make([]*model.Fingerprint, 0, 0)
for _, labelSetDTO := range model.LabelSetToDTOs(labelSet) {
if f, err := l.labelSetToFingerprints.Get(coding.NewProtocolBufferEncoder(labelSetDTO)); err == nil {
unmarshaled := &data.FingerprintCollection{}
if unmarshalErr := proto.Unmarshal(f, unmarshaled); unmarshalErr == nil {
for _, m := range unmarshaled.Member {
fp := model.Fingerprint(*m.Signature)
emission = append(emission, &fp)
}
} else {
return nil, err
}
} else {
return nil, err
}
}
return emission, nil
} }
func (l *LevelDBMetricPersistence) GetMetricFingerprintsForLabelPairs(p []*model.LabelPairs) ([]*model.Fingerprint, error) { func (l *LevelDBMetricPersistence) GetFingerprintsForLabelName(labelName *model.LabelName) ([]*model.Fingerprint, error) {
panic("NOT IMPLEMENTED") emission := make([]*model.Fingerprint, 0, 0)
}
func (l *LevelDBMetricPersistence) RecordFingerprintWatermark(s *model.Sample) error { if raw, err := l.labelNameToFingerprints.Get(coding.NewProtocolBufferEncoder(model.LabelNameToDTO(labelName))); err == nil {
panic("NOT IMPLEMENTED")
unmarshaled := &data.FingerprintCollection{}
if err = proto.Unmarshal(raw, unmarshaled); err == nil {
for _, m := range unmarshaled.Member {
fp := model.Fingerprint(*m.Signature)
emission = append(emission, &fp)
}
} else {
return nil, err
}
} else {
return nil, err
}
return emission, nil
} }

View file

@ -82,12 +82,12 @@ func TestReadEmpty(t *testing.T) {
name := string(x) name := string(x)
value := string(x) value := string(x)
ddo := &data.LabelPairDDO{ dto := &data.LabelPair{
Name: proto.String(name), Name: proto.String(name),
Value: proto.String(value), Value: proto.String(value),
} }
has, hasErr := persistence.HasLabelPair(ddo) has, hasErr := persistence.HasLabelPair(dto)
if hasErr != nil { if hasErr != nil {
return false return false
@ -102,11 +102,11 @@ func TestReadEmpty(t *testing.T) {
hasLabelName := func(x int) bool { hasLabelName := func(x int) bool {
name := string(x) name := string(x)
ddo := &data.LabelNameDDO{ dto := &data.LabelName{
Name: proto.String(name), Name: proto.String(name),
} }
has, hasErr := persistence.HasLabelName(ddo) has, hasErr := persistence.HasLabelName(dto)
if hasErr != nil { if hasErr != nil {
return false return false
@ -123,12 +123,12 @@ func TestReadEmpty(t *testing.T) {
name := string(x) name := string(x)
value := string(x) value := string(x)
ddo := &data.LabelPairDDO{ dto := &data.LabelPair{
Name: proto.String(name), Name: proto.String(name),
Value: proto.String(value), Value: proto.String(value),
} }
fingerprints, fingerprintsErr := persistence.GetLabelPairFingerprints(ddo) fingerprints, fingerprintsErr := persistence.getFingerprintsForLabelSet(dto)
if fingerprintsErr != nil { if fingerprintsErr != nil {
return false return false
@ -148,11 +148,11 @@ func TestReadEmpty(t *testing.T) {
getLabelNameFingerprints := func(x int) bool { getLabelNameFingerprints := func(x int) bool {
name := string(x) name := string(x)
ddo := &data.LabelNameDDO{ dto := &data.LabelName{
Name: proto.String(name), Name: proto.String(name),
} }
fingerprints, fingerprintsErr := persistence.GetLabelNameFingerprints(ddo) fingerprints, fingerprintsErr := persistence.GetLabelNameFingerprints(dto)
if fingerprintsErr != nil { if fingerprintsErr != nil {
return false return false
@ -186,10 +186,14 @@ func TestAppendSampleAsPureSparseAppend(t *testing.T) {
}() }()
appendSample := func(x int) bool { appendSample := func(x int) bool {
v := model.SampleValue(x)
t := time.Unix(int64(x), int64(x))
l := model.LabelSet{model.LabelName(x): model.LabelValue(x)}
sample := &model.Sample{ sample := &model.Sample{
Value: model.SampleValue(float32(x)), Value: v,
Timestamp: time.Unix(int64(x), int64(x)), Timestamp: t,
Labels: model.LabelPairs{string(x): string(x)}, Labels: l,
} }
appendErr := persistence.AppendSample(sample) appendErr := persistence.AppendSample(sample)
@ -218,10 +222,14 @@ func TestAppendSampleAsSparseAppendWithReads(t *testing.T) {
}() }()
appendSample := func(x int) bool { appendSample := func(x int) bool {
v := model.SampleValue(x)
t := time.Unix(int64(x), int64(x))
l := model.LabelSet{model.LabelName(x): model.LabelValue(x)}
sample := &model.Sample{ sample := &model.Sample{
Value: model.SampleValue(float32(x)), Value: v,
Timestamp: time.Unix(int64(x), int64(x)), Timestamp: t,
Labels: model.LabelPairs{string(x): string(x)}, Labels: l,
} }
appendErr := persistence.AppendSample(sample) appendErr := persistence.AppendSample(sample)
@ -230,11 +238,11 @@ func TestAppendSampleAsSparseAppendWithReads(t *testing.T) {
return false return false
} }
labelNameDDO := &data.LabelNameDDO{ labelNameDTO := &data.LabelName{
Name: proto.String(string(x)), Name: proto.String(string(x)),
} }
hasLabelName, hasLabelNameErr := persistence.HasLabelName(labelNameDDO) hasLabelName, hasLabelNameErr := persistence.HasLabelName(labelNameDTO)
if hasLabelNameErr != nil { if hasLabelNameErr != nil {
return false return false
@ -244,12 +252,12 @@ func TestAppendSampleAsSparseAppendWithReads(t *testing.T) {
return false return false
} }
labelPairDDO := &data.LabelPairDDO{ labelPairDTO := &data.LabelPair{
Name: proto.String(string(x)), Name: proto.String(string(x)),
Value: proto.String(string(x)), Value: proto.String(string(x)),
} }
hasLabelPair, hasLabelPairErr := persistence.HasLabelPair(labelPairDDO) hasLabelPair, hasLabelPairErr := persistence.HasLabelPair(labelPairDTO)
if hasLabelPairErr != nil { if hasLabelPairErr != nil {
return false return false
@ -259,7 +267,7 @@ func TestAppendSampleAsSparseAppendWithReads(t *testing.T) {
return false return false
} }
labelNameFingerprints, labelNameFingerprintsErr := persistence.GetLabelNameFingerprints(labelNameDDO) labelNameFingerprints, labelNameFingerprintsErr := persistence.GetLabelNameFingerprints(labelNameDTO)
if labelNameFingerprintsErr != nil { if labelNameFingerprintsErr != nil {
return false return false
@ -273,7 +281,7 @@ func TestAppendSampleAsSparseAppendWithReads(t *testing.T) {
return false return false
} }
labelPairFingerprints, labelPairFingerprintsErr := persistence.GetLabelPairFingerprints(labelPairDDO) labelPairFingerprints, labelPairFingerprintsErr := persistence.getFingerprintsForLabelSet(labelPairDTO)
if labelPairFingerprintsErr != nil { if labelPairFingerprintsErr != nil {
return false return false
@ -314,7 +322,7 @@ func TestAppendSampleAsPureSingleEntityAppend(t *testing.T) {
sample := &model.Sample{ sample := &model.Sample{
Value: model.SampleValue(float32(x)), Value: model.SampleValue(float32(x)),
Timestamp: time.Unix(int64(x), 0), Timestamp: time.Unix(int64(x), 0),
Labels: model.LabelPairs{"name": "my_metric"}, Labels: model.LabelSet{"name": "my_metric"},
} }
appendErr := persistence.AppendSample(sample) appendErr := persistence.AppendSample(sample)
@ -359,17 +367,24 @@ func TestStochastic(t *testing.T) {
for metricIndex := 0; metricIndex < numberOfMetrics; metricIndex++ { for metricIndex := 0; metricIndex < numberOfMetrics; metricIndex++ {
sample := &model.Sample{ sample := &model.Sample{
Labels: model.LabelPairs{}, Labels: model.LabelSet{},
} }
sample.Labels["name"] = fmt.Sprintf("metric_index_%d", metricIndex) v := model.LabelValue(fmt.Sprintf("metric_index_%d", metricIndex))
sample.Labels["name"] = v
for sharedLabelIndex := 0; sharedLabelIndex < numberOfSharedLabels; sharedLabelIndex++ { for sharedLabelIndex := 0; sharedLabelIndex < numberOfSharedLabels; sharedLabelIndex++ {
sample.Labels[fmt.Sprintf("shared_label_%d", sharedLabelIndex)] = fmt.Sprintf("label_%d", sharedLabelIndex) l := model.LabelName(fmt.Sprintf("shared_label_%d", sharedLabelIndex))
v := model.LabelValue(fmt.Sprintf("label_%d", sharedLabelIndex))
sample.Labels[l] = v
} }
for unsharedLabelIndex := 0; unsharedLabelIndex < numberOfUnsharedLabels; unsharedLabelIndex++ { for unsharedLabelIndex := 0; unsharedLabelIndex < numberOfUnsharedLabels; unsharedLabelIndex++ {
sample.Labels[fmt.Sprintf("metric_index_%d_private_label_%d", metricIndex, unsharedLabelIndex)] = fmt.Sprintf("private_label_%d", unsharedLabelIndex) l := model.LabelName(fmt.Sprintf("metric_index_%d_private_label_%d", metricIndex, unsharedLabelIndex))
v := model.LabelValue(fmt.Sprintf("private_label_%d", unsharedLabelIndex))
sample.Labels[l] = v
} }
timestamps := make(map[int64]bool) timestamps := make(map[int64]bool)
@ -414,7 +429,7 @@ func TestStochastic(t *testing.T) {
metricNewestSample[metricIndex] = newestSample metricNewestSample[metricIndex] = newestSample
for sharedLabelIndex := 0; sharedLabelIndex < numberOfSharedLabels; sharedLabelIndex++ { for sharedLabelIndex := 0; sharedLabelIndex < numberOfSharedLabels; sharedLabelIndex++ {
labelPair := &data.LabelPairDDO{ labelPair := &data.LabelPair{
Name: proto.String(fmt.Sprintf("shared_label_%d", sharedLabelIndex)), Name: proto.String(fmt.Sprintf("shared_label_%d", sharedLabelIndex)),
Value: proto.String(fmt.Sprintf("label_%d", sharedLabelIndex)), Value: proto.String(fmt.Sprintf("label_%d", sharedLabelIndex)),
} }
@ -429,7 +444,7 @@ func TestStochastic(t *testing.T) {
return false return false
} }
labelName := &data.LabelNameDDO{ labelName := &data.LabelName{
Name: proto.String(fmt.Sprintf("shared_label_%d", sharedLabelIndex)), Name: proto.String(fmt.Sprintf("shared_label_%d", sharedLabelIndex)),
} }
@ -446,7 +461,7 @@ func TestStochastic(t *testing.T) {
} }
for sharedIndex := 0; sharedIndex < numberOfSharedLabels; sharedIndex++ { for sharedIndex := 0; sharedIndex < numberOfSharedLabels; sharedIndex++ {
labelName := &data.LabelNameDDO{ labelName := &data.LabelName{
Name: proto.String(fmt.Sprintf("shared_label_%d", sharedIndex)), Name: proto.String(fmt.Sprintf("shared_label_%d", sharedIndex)),
} }
fingerprints, fingerprintsErr := persistence.GetLabelNameFingerprints(labelName) fingerprints, fingerprintsErr := persistence.GetLabelNameFingerprints(labelName)
@ -466,7 +481,7 @@ func TestStochastic(t *testing.T) {
for metricIndex := 0; metricIndex < numberOfMetrics; metricIndex++ { for metricIndex := 0; metricIndex < numberOfMetrics; metricIndex++ {
for unsharedLabelIndex := 0; unsharedLabelIndex < numberOfUnsharedLabels; unsharedLabelIndex++ { for unsharedLabelIndex := 0; unsharedLabelIndex < numberOfUnsharedLabels; unsharedLabelIndex++ {
labelPair := &data.LabelPairDDO{ labelPair := &data.LabelPair{
Name: proto.String(fmt.Sprintf("metric_index_%d_private_label_%d", metricIndex, unsharedLabelIndex)), Name: proto.String(fmt.Sprintf("metric_index_%d_private_label_%d", metricIndex, unsharedLabelIndex)),
Value: proto.String(fmt.Sprintf("private_label_%d", unsharedLabelIndex)), Value: proto.String(fmt.Sprintf("private_label_%d", unsharedLabelIndex)),
} }
@ -481,7 +496,7 @@ func TestStochastic(t *testing.T) {
return false return false
} }
labelPairFingerprints, labelPairFingerprintsErr := persistence.GetLabelPairFingerprints(labelPair) labelPairFingerprints, labelPairFingerprintsErr := persistence.getFingerprintsForLabelSet(labelPair)
if labelPairFingerprintsErr != nil { if labelPairFingerprintsErr != nil {
return false return false
@ -495,7 +510,7 @@ func TestStochastic(t *testing.T) {
return false return false
} }
labelName := &data.LabelNameDDO{ labelName := &data.LabelName{
Name: proto.String(fmt.Sprintf("metric_index_%d_private_label_%d", metricIndex, unsharedLabelIndex)), Name: proto.String(fmt.Sprintf("metric_index_%d_private_label_%d", metricIndex, unsharedLabelIndex)),
} }
@ -526,28 +541,20 @@ func TestStochastic(t *testing.T) {
metric := make(model.Metric) metric := make(model.Metric)
metric["name"] = fmt.Sprintf("metric_index_%d", metricIndex) metric["name"] = model.LabelValue(fmt.Sprintf("metric_index_%d", metricIndex))
for i := 0; i < numberOfSharedLabels; i++ { for i := 0; i < numberOfSharedLabels; i++ {
metric[fmt.Sprintf("shared_label_%d", i)] = fmt.Sprintf("label_%d", i) l := model.LabelName(fmt.Sprintf("shared_label_%d", i))
v := model.LabelValue(fmt.Sprintf("label_%d", i))
metric[l] = v
} }
for i := 0; i < numberOfUnsharedLabels; i++ { for i := 0; i < numberOfUnsharedLabels; i++ {
metric[fmt.Sprintf("metric_index_%d_private_label_%d", metricIndex, i)] = fmt.Sprintf("private_label_%d", i) l := model.LabelName(fmt.Sprintf("metric_index_%d_private_label_%d", metricIndex, i))
} v := model.LabelValue(fmt.Sprintf("private_label_%d", i))
watermarks, count, watermarksErr := persistence.GetWatermarksForMetric(metric) metric[l] = v
if watermarksErr != nil {
return false
}
if watermarks == nil {
return false
}
if count != numberOfSamples {
return false
} }
for i := 0; i < numberOfRangeScans; i++ { for i := 0; i < numberOfRangeScans; i++ {
@ -620,3 +627,169 @@ func TestStochastic(t *testing.T) {
t.Error(stochasticError) t.Error(stochasticError)
} }
} }
func TestGetFingerprintsForLabelSet(t *testing.T) {
temporaryDirectory, _ := ioutil.TempDir("", "leveldb_metric_persistence_test")
defer func() {
if removeAllErr := os.RemoveAll(temporaryDirectory); removeAllErr != nil {
t.Errorf("Could not remove temporary directory: %q\n", removeAllErr)
}
}()
persistence, _ := NewLevelDBMetricPersistence(temporaryDirectory)
defer func() {
persistence.Close()
}()
appendErr := persistence.AppendSample(&model.Sample{
Value: model.SampleValue(0),
Timestamp: time.Unix(0, 0),
Labels: model.LabelSet{
"name": "my_metric",
"request_type": "your_mom",
},
})
if appendErr != nil {
t.Error(appendErr)
}
appendErr = persistence.AppendSample(&model.Sample{
Value: model.SampleValue(0),
Timestamp: time.Unix(int64(0), 0),
Labels: model.LabelSet{
"name": "my_metric",
"request_type": "your_dad",
},
})
if appendErr != nil {
t.Error(appendErr)
}
result, getErr := persistence.GetFingerprintsForLabelSet(&(model.LabelSet{
model.LabelName("name"): model.LabelValue("my_metric"),
}))
if getErr != nil {
t.Error(getErr)
}
if len(result) != 2 {
t.Errorf("Expected two elements.")
}
result, getErr = persistence.GetFingerprintsForLabelSet(&(model.LabelSet{
model.LabelName("request_type"): model.LabelValue("your_mom"),
}))
if getErr != nil {
t.Error(getErr)
}
if len(result) != 1 {
t.Errorf("Expected one element.")
}
result, getErr = persistence.GetFingerprintsForLabelSet(&(model.LabelSet{
model.LabelName("request_type"): model.LabelValue("your_dad"),
}))
if getErr != nil {
t.Error(getErr)
}
if len(result) != 1 {
t.Errorf("Expected one element.")
}
}
func TestGetFingerprintsForLabelName(t *testing.T) {
temporaryDirectory, _ := ioutil.TempDir("", "leveldb_metric_persistence_test")
defer func() {
if removeAllErr := os.RemoveAll(temporaryDirectory); removeAllErr != nil {
t.Errorf("Could not remove temporary directory: %q\n", removeAllErr)
}
}()
persistence, _ := NewLevelDBMetricPersistence(temporaryDirectory)
defer func() {
persistence.Close()
}()
appendErr := persistence.AppendSample(&model.Sample{
Value: model.SampleValue(0),
Timestamp: time.Unix(0, 0),
Labels: model.LabelSet{
"name": "my_metric",
"request_type": "your_mom",
"language": "english",
},
})
if appendErr != nil {
t.Error(appendErr)
}
appendErr = persistence.AppendSample(&model.Sample{
Value: model.SampleValue(0),
Timestamp: time.Unix(int64(0), 0),
Labels: model.LabelSet{
"name": "my_metric",
"request_type": "your_dad",
"sprache": "deutsch",
},
})
if appendErr != nil {
t.Error(appendErr)
}
b := model.LabelName("name")
result, getErr := persistence.GetFingerprintsForLabelName(&b)
if getErr != nil {
t.Error(getErr)
}
if len(result) != 2 {
t.Errorf("Expected two elements.")
}
b = model.LabelName("request_type")
result, getErr = persistence.GetFingerprintsForLabelName(&b)
if getErr != nil {
t.Error(getErr)
}
if len(result) != 2 {
t.Errorf("Expected two elements.")
}
b = model.LabelName("language")
result, getErr = persistence.GetFingerprintsForLabelName(&b)
if getErr != nil {
t.Error(getErr)
}
if len(result) != 1 {
t.Errorf("Expected one element.")
}
b = model.LabelName("sprache")
result, getErr = persistence.GetFingerprintsForLabelName(&b)
if getErr != nil {
t.Error(getErr)
}
if len(result) != 1 {
t.Errorf("Expected one element.")
}
}

View file

@ -20,7 +20,7 @@ import (
) )
var ( var (
existenceValue = coding.NewProtocolBufferEncoder(&data.MembershipIndexValueDDO{}) existenceValue = coding.NewProtocolBufferEncoder(&data.MembershipIndexValue{})
) )
type LevelDBMembershipIndex struct { type LevelDBMembershipIndex struct {