mirror of
https://github.com/prometheus/prometheus.git
synced 2025-01-21 10:41:35 -08:00
733 lines
22 KiB
Go
733 lines
22 KiB
Go
// Copyright 2012 Prometheus Team
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package leveldb
|
|
|
|
import (
|
|
"code.google.com/p/goprotobuf/proto"
|
|
"errors"
|
|
"fmt"
|
|
"github.com/matttproud/prometheus/coding"
|
|
"github.com/matttproud/prometheus/coding/indexable"
|
|
"github.com/matttproud/prometheus/model"
|
|
data "github.com/matttproud/prometheus/model/generated"
|
|
index "github.com/matttproud/prometheus/storage/raw/index/leveldb"
|
|
storage "github.com/matttproud/prometheus/storage/raw/leveldb"
|
|
"github.com/matttproud/prometheus/utility"
|
|
"io"
|
|
"log"
|
|
"sort"
|
|
)
|
|
|
|
type LevigoMetricPersistence struct {
|
|
fingerprintHighWaterMarks *storage.LevigoPersistence
|
|
fingerprintLabelPairs *storage.LevigoPersistence
|
|
fingerprintLowWaterMarks *storage.LevigoPersistence
|
|
fingerprintSamples *storage.LevigoPersistence
|
|
labelNameFingerprints *storage.LevigoPersistence
|
|
labelPairFingerprints *storage.LevigoPersistence
|
|
metricMembershipIndex *index.LevigoMembershipIndex
|
|
}
|
|
|
|
type levigoOpener func()
|
|
|
|
func (l *LevigoMetricPersistence) Close() error {
|
|
log.Printf("Closing LevigoPersistence storage containers...")
|
|
|
|
var persistences = []struct {
|
|
name string
|
|
closer io.Closer
|
|
}{
|
|
{
|
|
"Fingerprint High-Water Marks",
|
|
l.fingerprintHighWaterMarks,
|
|
},
|
|
{
|
|
"Fingerprint to Label Name and Value Pairs",
|
|
l.fingerprintLabelPairs,
|
|
},
|
|
{
|
|
"Fingerprint Low-Water Marks",
|
|
l.fingerprintLowWaterMarks,
|
|
},
|
|
{
|
|
"Fingerprint Samples",
|
|
l.fingerprintSamples,
|
|
},
|
|
{
|
|
"Label Name to Fingerprints",
|
|
l.labelNameFingerprints,
|
|
},
|
|
{
|
|
"Label Name and Value Pairs to Fingerprints",
|
|
l.labelPairFingerprints,
|
|
},
|
|
{
|
|
"Metric Membership Index",
|
|
l.metricMembershipIndex,
|
|
},
|
|
}
|
|
|
|
errorChannel := make(chan error, len(persistences))
|
|
|
|
for _, persistence := range persistences {
|
|
name := persistence.name
|
|
closer := persistence.closer
|
|
|
|
if closer != nil {
|
|
log.Printf("Closing LevigoPersistence storage container: %s\n", name)
|
|
closingError := closer.Close()
|
|
|
|
if closingError != nil {
|
|
log.Printf("Could not close a LevigoPersistence storage container; inconsistencies are possible: %q\n", closingError)
|
|
}
|
|
|
|
errorChannel <- closingError
|
|
} else {
|
|
errorChannel <- nil
|
|
}
|
|
}
|
|
|
|
for i := 0; i < cap(errorChannel); i++ {
|
|
closingError := <-errorChannel
|
|
|
|
if closingError != nil {
|
|
return closingError
|
|
}
|
|
}
|
|
|
|
log.Printf("Successfully closed all LevigoPersistence storage containers.")
|
|
|
|
return nil
|
|
}
|
|
|
|
func NewLevigoMetricPersistence(baseDirectory string) (*LevigoMetricPersistence, error) {
|
|
log.Printf("Opening LevigoPersistence storage containers...")
|
|
|
|
errorChannel := make(chan error, 7)
|
|
|
|
emission := &LevigoMetricPersistence{}
|
|
|
|
var subsystemOpeners = []struct {
|
|
name string
|
|
opener levigoOpener
|
|
}{
|
|
{
|
|
"High-Water Marks by Fingerprint",
|
|
func() {
|
|
var anomaly error
|
|
emission.fingerprintHighWaterMarks, anomaly = storage.NewLevigoPersistence(baseDirectory+"/high_water_marks_by_fingerprint", 1000000, 10)
|
|
errorChannel <- anomaly
|
|
},
|
|
},
|
|
{
|
|
"Label Names and Value Pairs by Fingerprint",
|
|
func() {
|
|
var anomaly error
|
|
emission.fingerprintLabelPairs, anomaly = storage.NewLevigoPersistence(baseDirectory+"/label_name_and_value_pairs_by_fingerprint", 1000000, 10)
|
|
errorChannel <- anomaly
|
|
},
|
|
},
|
|
{
|
|
"Low-Water Marks by Fingerprint",
|
|
func() {
|
|
var anomaly error
|
|
emission.fingerprintLowWaterMarks, anomaly = storage.NewLevigoPersistence(baseDirectory+"/low_water_marks_by_fingerprint", 1000000, 10)
|
|
errorChannel <- anomaly
|
|
},
|
|
},
|
|
{
|
|
"Samples by Fingerprint",
|
|
func() {
|
|
var anomaly error
|
|
emission.fingerprintSamples, anomaly = storage.NewLevigoPersistence(baseDirectory+"/samples_by_fingerprint", 1000000, 10)
|
|
errorChannel <- anomaly
|
|
},
|
|
},
|
|
{
|
|
"Fingerprints by Label Name",
|
|
func() {
|
|
var anomaly error
|
|
emission.labelNameFingerprints, anomaly = storage.NewLevigoPersistence(baseDirectory+"/fingerprints_by_label_name", 1000000, 10)
|
|
errorChannel <- anomaly
|
|
},
|
|
},
|
|
{
|
|
"Fingerprints by Label Name and Value Pair",
|
|
func() {
|
|
var anomaly error
|
|
emission.labelPairFingerprints, anomaly = storage.NewLevigoPersistence(baseDirectory+"/fingerprints_by_label_name_and_value_pair", 1000000, 10)
|
|
errorChannel <- anomaly
|
|
},
|
|
},
|
|
{
|
|
"Metric Membership Index",
|
|
func() {
|
|
var anomaly error
|
|
emission.metricMembershipIndex, anomaly = index.NewLevigoMembershipIndex(baseDirectory+"/metric_membership_index", 1000000, 10)
|
|
errorChannel <- anomaly
|
|
},
|
|
},
|
|
}
|
|
|
|
for _, subsystem := range subsystemOpeners {
|
|
name := subsystem.name
|
|
opener := subsystem.opener
|
|
|
|
log.Printf("Opening LevigoPersistence storage container: %s\n", name)
|
|
|
|
go opener()
|
|
}
|
|
|
|
for i := 0; i < cap(errorChannel); i++ {
|
|
openingError := <-errorChannel
|
|
|
|
if openingError != nil {
|
|
|
|
log.Printf("Could not open a LevigoPersistence storage container: %q\n", openingError)
|
|
|
|
return nil, openingError
|
|
}
|
|
}
|
|
|
|
log.Printf("Successfully opened all LevigoPersistence storage containers.\n")
|
|
|
|
return emission, nil
|
|
}
|
|
|
|
func ddoFromSample(sample *model.Sample) *data.MetricDDO {
|
|
labelNames := make([]string, 0, len(sample.Labels))
|
|
|
|
for labelName, _ := range sample.Labels {
|
|
labelNames = append(labelNames, string(labelName))
|
|
}
|
|
|
|
sort.Strings(labelNames)
|
|
|
|
labelPairs := make([]*data.LabelPairDDO, 0, len(sample.Labels))
|
|
|
|
for _, labelName := range labelNames {
|
|
labelValue := sample.Labels[labelName]
|
|
labelPair := &data.LabelPairDDO{
|
|
Name: proto.String(string(labelName)),
|
|
Value: proto.String(string(labelValue)),
|
|
}
|
|
|
|
labelPairs = append(labelPairs, labelPair)
|
|
}
|
|
|
|
metricDDO := &data.MetricDDO{
|
|
LabelPair: labelPairs,
|
|
}
|
|
|
|
return metricDDO
|
|
}
|
|
|
|
func ddoFromMetric(metric model.Metric) *data.MetricDDO {
|
|
labelNames := make([]string, 0, len(metric))
|
|
|
|
for labelName, _ := range metric {
|
|
labelNames = append(labelNames, string(labelName))
|
|
}
|
|
|
|
sort.Strings(labelNames)
|
|
|
|
labelPairs := make([]*data.LabelPairDDO, 0, len(metric))
|
|
|
|
for _, labelName := range labelNames {
|
|
labelValue := metric[labelName]
|
|
labelPair := &data.LabelPairDDO{
|
|
Name: proto.String(string(labelName)),
|
|
Value: proto.String(string(labelValue)),
|
|
}
|
|
|
|
labelPairs = append(labelPairs, labelPair)
|
|
}
|
|
|
|
metricDDO := &data.MetricDDO{
|
|
LabelPair: labelPairs,
|
|
}
|
|
|
|
return metricDDO
|
|
}
|
|
|
|
func fingerprintDDOFromByteArray(fingerprint []byte) *data.FingerprintDDO {
|
|
fingerprintDDO := &data.FingerprintDDO{
|
|
Signature: proto.String(string(fingerprint)),
|
|
}
|
|
|
|
return fingerprintDDO
|
|
}
|
|
|
|
func (l *LevigoMetricPersistence) hasIndexMetric(ddo *data.MetricDDO) (bool, error) {
|
|
ddoKey := coding.NewProtocolBufferEncoder(ddo)
|
|
return l.metricMembershipIndex.Has(ddoKey)
|
|
}
|
|
|
|
func (l *LevigoMetricPersistence) indexMetric(ddo *data.MetricDDO) error {
|
|
ddoKey := coding.NewProtocolBufferEncoder(ddo)
|
|
return l.metricMembershipIndex.Put(ddoKey)
|
|
}
|
|
|
|
func fingerprintDDOForMessage(message proto.Message) (*data.FingerprintDDO, error) {
|
|
if messageByteArray, marshalError := proto.Marshal(message); marshalError == nil {
|
|
fingerprint := model.FingerprintFromByteArray(messageByteArray)
|
|
return &data.FingerprintDDO{
|
|
Signature: proto.String(string(fingerprint)),
|
|
}, nil
|
|
} else {
|
|
return nil, marshalError
|
|
}
|
|
|
|
return nil, errors.New("Unknown error in generating FingerprintDDO from message.")
|
|
}
|
|
|
|
func (l *LevigoMetricPersistence) HasLabelPair(ddo *data.LabelPairDDO) (bool, error) {
|
|
ddoKey := coding.NewProtocolBufferEncoder(ddo)
|
|
return l.labelPairFingerprints.Has(ddoKey)
|
|
}
|
|
|
|
func (l *LevigoMetricPersistence) HasLabelName(ddo *data.LabelNameDDO) (bool, error) {
|
|
ddoKey := coding.NewProtocolBufferEncoder(ddo)
|
|
return l.labelNameFingerprints.Has(ddoKey)
|
|
}
|
|
|
|
func (l *LevigoMetricPersistence) GetLabelPairFingerprints(ddo *data.LabelPairDDO) (*data.FingerprintCollectionDDO, error) {
|
|
ddoKey := coding.NewProtocolBufferEncoder(ddo)
|
|
if get, getError := l.labelPairFingerprints.Get(ddoKey); getError == nil {
|
|
value := &data.FingerprintCollectionDDO{}
|
|
if unmarshalError := proto.Unmarshal(get, value); unmarshalError == nil {
|
|
return value, nil
|
|
} else {
|
|
return nil, unmarshalError
|
|
}
|
|
} else {
|
|
return nil, getError
|
|
}
|
|
return nil, errors.New("Unknown error while getting label name and value pair fingerprints.")
|
|
}
|
|
|
|
func (l *LevigoMetricPersistence) GetLabelNameFingerprints(ddo *data.LabelNameDDO) (*data.FingerprintCollectionDDO, error) {
|
|
ddoKey := coding.NewProtocolBufferEncoder(ddo)
|
|
if get, getError := l.labelNameFingerprints.Get(ddoKey); getError == nil {
|
|
value := &data.FingerprintCollectionDDO{}
|
|
if unmarshalError := proto.Unmarshal(get, value); unmarshalError == nil {
|
|
return value, nil
|
|
} else {
|
|
return nil, unmarshalError
|
|
}
|
|
} else {
|
|
return nil, getError
|
|
}
|
|
|
|
return nil, errors.New("Unknown error while getting label name fingerprints.")
|
|
}
|
|
|
|
func (l *LevigoMetricPersistence) setLabelPairFingerprints(labelPair *data.LabelPairDDO, fingerprints *data.FingerprintCollectionDDO) error {
|
|
labelPairEncoded := coding.NewProtocolBufferEncoder(labelPair)
|
|
fingerprintsEncoded := coding.NewProtocolBufferEncoder(fingerprints)
|
|
return l.labelPairFingerprints.Put(labelPairEncoded, fingerprintsEncoded)
|
|
}
|
|
|
|
func (l *LevigoMetricPersistence) setLabelNameFingerprints(labelName *data.LabelNameDDO, fingerprints *data.FingerprintCollectionDDO) error {
|
|
labelNameEncoded := coding.NewProtocolBufferEncoder(labelName)
|
|
fingerprintsEncoded := coding.NewProtocolBufferEncoder(fingerprints)
|
|
return l.labelNameFingerprints.Put(labelNameEncoded, fingerprintsEncoded)
|
|
}
|
|
|
|
func (l *LevigoMetricPersistence) appendLabelPairFingerprint(labelPair *data.LabelPairDDO, fingerprint *data.FingerprintDDO) error {
|
|
if has, hasError := l.HasLabelPair(labelPair); hasError == nil {
|
|
var fingerprints *data.FingerprintCollectionDDO
|
|
if has {
|
|
if existing, existingError := l.GetLabelPairFingerprints(labelPair); existingError == nil {
|
|
fingerprints = existing
|
|
} else {
|
|
return existingError
|
|
}
|
|
} else {
|
|
fingerprints = &data.FingerprintCollectionDDO{}
|
|
}
|
|
|
|
fingerprints.Member = append(fingerprints.Member, fingerprint)
|
|
|
|
return l.setLabelPairFingerprints(labelPair, fingerprints)
|
|
} else {
|
|
return hasError
|
|
}
|
|
|
|
return errors.New("Unknown error when appending fingerprint to label name and value pair.")
|
|
}
|
|
|
|
func (l *LevigoMetricPersistence) appendLabelNameFingerprint(labelPair *data.LabelPairDDO, fingerprint *data.FingerprintDDO) error {
|
|
labelName := &data.LabelNameDDO{
|
|
Name: labelPair.Name,
|
|
}
|
|
|
|
if has, hasError := l.HasLabelName(labelName); hasError == nil {
|
|
var fingerprints *data.FingerprintCollectionDDO
|
|
if has {
|
|
if existing, existingError := l.GetLabelNameFingerprints(labelName); existingError == nil {
|
|
fingerprints = existing
|
|
} else {
|
|
return existingError
|
|
}
|
|
} else {
|
|
fingerprints = &data.FingerprintCollectionDDO{}
|
|
}
|
|
|
|
fingerprints.Member = append(fingerprints.Member, fingerprint)
|
|
|
|
return l.setLabelNameFingerprints(labelName, fingerprints)
|
|
} else {
|
|
return hasError
|
|
}
|
|
|
|
return errors.New("Unknown error when appending fingerprint to label name and value pair.")
|
|
}
|
|
|
|
func (l *LevigoMetricPersistence) appendFingerprints(ddo *data.MetricDDO) error {
|
|
if fingerprintDDO, fingerprintDDOError := fingerprintDDOForMessage(ddo); fingerprintDDOError == nil {
|
|
labelPairCollectionDDO := &data.LabelPairCollectionDDO{
|
|
Member: ddo.LabelPair,
|
|
}
|
|
fingerprintKey := coding.NewProtocolBufferEncoder(fingerprintDDO)
|
|
labelPairCollectionDDOEncoder := coding.NewProtocolBufferEncoder(labelPairCollectionDDO)
|
|
|
|
if putError := l.fingerprintLabelPairs.Put(fingerprintKey, labelPairCollectionDDOEncoder); putError == nil {
|
|
labelCount := len(ddo.LabelPair)
|
|
labelPairErrors := make(chan error, labelCount)
|
|
labelNameErrors := make(chan error, labelCount)
|
|
|
|
for _, labelPair := range ddo.LabelPair {
|
|
go func(labelPair *data.LabelPairDDO) {
|
|
labelNameErrors <- l.appendLabelNameFingerprint(labelPair, fingerprintDDO)
|
|
}(labelPair)
|
|
|
|
go func(labelPair *data.LabelPairDDO) {
|
|
labelPairErrors <- l.appendLabelPairFingerprint(labelPair, fingerprintDDO)
|
|
}(labelPair)
|
|
}
|
|
|
|
for i := 0; i < cap(labelPairErrors); i++ {
|
|
appendError := <-labelPairErrors
|
|
|
|
if appendError != nil {
|
|
return appendError
|
|
}
|
|
}
|
|
|
|
for i := 0; i < cap(labelNameErrors); i++ {
|
|
appendError := <-labelNameErrors
|
|
|
|
if appendError != nil {
|
|
return appendError
|
|
}
|
|
}
|
|
|
|
return nil
|
|
|
|
} else {
|
|
return putError
|
|
}
|
|
} else {
|
|
return fingerprintDDOError
|
|
}
|
|
|
|
return errors.New("Unknown error in appending label pairs to fingerprint.")
|
|
}
|
|
|
|
func (l *LevigoMetricPersistence) AppendSample(sample *model.Sample) error {
|
|
fmt.Printf("Sample: %q\n", sample)
|
|
|
|
metricDDO := ddoFromSample(sample)
|
|
|
|
if indexHas, indexHasError := l.hasIndexMetric(metricDDO); indexHasError == nil {
|
|
if !indexHas {
|
|
if indexPutError := l.indexMetric(metricDDO); indexPutError == nil {
|
|
if appendError := l.appendFingerprints(metricDDO); appendError != nil {
|
|
log.Printf("Could not set metric fingerprint to label pairs mapping: %q\n", appendError)
|
|
return appendError
|
|
}
|
|
} else {
|
|
log.Printf("Could not add metric to membership index: %q\n", indexPutError)
|
|
return indexPutError
|
|
}
|
|
}
|
|
} else {
|
|
log.Printf("Could not query membership index for metric: %q\n", indexHasError)
|
|
return indexHasError
|
|
}
|
|
|
|
if fingerprintDDO, fingerprintDDOErr := fingerprintDDOForMessage(metricDDO); fingerprintDDOErr == nil {
|
|
|
|
sampleKeyDDO := &data.SampleKeyDDO{
|
|
Fingerprint: fingerprintDDO,
|
|
Timestamp: indexable.EncodeTime(sample.Timestamp),
|
|
}
|
|
|
|
sampleValueDDO := &data.SampleValueDDO{
|
|
Value: proto.Float32(float32(sample.Value)),
|
|
}
|
|
|
|
sampleKeyEncoded := coding.NewProtocolBufferEncoder(sampleKeyDDO)
|
|
sampleValueEncoded := coding.NewProtocolBufferEncoder(sampleValueDDO)
|
|
|
|
if putError := l.fingerprintSamples.Put(sampleKeyEncoded, sampleValueEncoded); putError != nil {
|
|
log.Printf("Could not append metric sample: %q\n", putError)
|
|
return putError
|
|
}
|
|
} else {
|
|
log.Printf("Could not encode metric fingerprint: %q\n", fingerprintDDOErr)
|
|
return fingerprintDDOErr
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (l *LevigoMetricPersistence) GetLabelNames() ([]string, error) {
|
|
if getAll, getAllError := l.labelNameFingerprints.GetAll(); getAllError == nil {
|
|
result := make([]string, 0, len(getAll))
|
|
labelNameDDO := &data.LabelNameDDO{}
|
|
|
|
for _, pair := range getAll {
|
|
if unmarshalError := proto.Unmarshal(pair.Left, labelNameDDO); unmarshalError == nil {
|
|
result = append(result, *labelNameDDO.Name)
|
|
} else {
|
|
return nil, unmarshalError
|
|
}
|
|
}
|
|
|
|
return result, nil
|
|
|
|
} else {
|
|
return nil, getAllError
|
|
}
|
|
|
|
return nil, errors.New("Unknown error encountered when querying label names.")
|
|
}
|
|
|
|
func (l *LevigoMetricPersistence) GetLabelPairs() ([]model.LabelPairs, error) {
|
|
if getAll, getAllError := l.labelPairFingerprints.GetAll(); getAllError == nil {
|
|
result := make([]model.LabelPairs, 0, len(getAll))
|
|
labelPairDDO := &data.LabelPairDDO{}
|
|
|
|
for _, pair := range getAll {
|
|
if unmarshalError := proto.Unmarshal(pair.Left, labelPairDDO); unmarshalError == nil {
|
|
item := model.LabelPairs{
|
|
*labelPairDDO.Name: *labelPairDDO.Value,
|
|
}
|
|
result = append(result, item)
|
|
} else {
|
|
return nil, unmarshalError
|
|
}
|
|
}
|
|
|
|
return result, nil
|
|
|
|
} else {
|
|
return nil, getAllError
|
|
}
|
|
|
|
return nil, errors.New("Unknown error encountered when querying label pairs.")
|
|
}
|
|
|
|
func (l *LevigoMetricPersistence) GetMetrics() ([]model.LabelPairs, error) {
|
|
log.Printf("GetMetrics()\n")
|
|
|
|
if getAll, getAllError := l.labelPairFingerprints.GetAll(); getAllError == nil {
|
|
log.Printf("getAll: %q\n", getAll)
|
|
result := make([]model.LabelPairs, 0)
|
|
fingerprintCollection := &data.FingerprintCollectionDDO{}
|
|
|
|
fingerprints := make(utility.Set)
|
|
|
|
for _, pair := range getAll {
|
|
log.Printf("pair: %q\n", pair)
|
|
if unmarshalError := proto.Unmarshal(pair.Right, fingerprintCollection); unmarshalError == nil {
|
|
for _, member := range fingerprintCollection.Member {
|
|
log.Printf("member: %q\n", member)
|
|
if !fingerprints.Has(*member.Signature) {
|
|
log.Printf("!Has: %q\n", member.Signature)
|
|
fingerprints.Add(*member.Signature)
|
|
log.Printf("fingerprints %q\n", fingerprints)
|
|
fingerprintEncoded := coding.NewProtocolBufferEncoder(member)
|
|
if labelPairCollectionRaw, labelPairCollectionRawError := l.fingerprintLabelPairs.Get(fingerprintEncoded); labelPairCollectionRawError == nil {
|
|
log.Printf("labelPairCollectionRaw: %q\n", labelPairCollectionRaw)
|
|
|
|
labelPairCollectionDDO := &data.LabelPairCollectionDDO{}
|
|
|
|
if labelPairCollectionDDOMarshalError := proto.Unmarshal(labelPairCollectionRaw, labelPairCollectionDDO); labelPairCollectionDDOMarshalError == nil {
|
|
intermediate := make(model.LabelPairs, 0)
|
|
|
|
for _, member := range labelPairCollectionDDO.Member {
|
|
intermediate[*member.Name] = *member.Value
|
|
}
|
|
|
|
log.Printf("intermediate: %q\n", intermediate)
|
|
|
|
result = append(result, intermediate)
|
|
} else {
|
|
return nil, labelPairCollectionDDOMarshalError
|
|
}
|
|
} else {
|
|
return nil, labelPairCollectionRawError
|
|
}
|
|
}
|
|
}
|
|
} else {
|
|
return nil, unmarshalError
|
|
}
|
|
}
|
|
return result, nil
|
|
} else {
|
|
return nil, getAllError
|
|
}
|
|
|
|
return nil, errors.New("Unknown error encountered when querying metrics.")
|
|
}
|
|
|
|
func (l *LevigoMetricPersistence) GetWatermarksForMetric(metric model.Metric) (*model.Interval, int, error) {
|
|
metricDDO := ddoFromMetric(metric)
|
|
|
|
if fingerprintDDO, fingerprintDDOErr := fingerprintDDOForMessage(metricDDO); fingerprintDDOErr == nil {
|
|
if iterator, closer, iteratorErr := l.fingerprintSamples.GetIterator(); iteratorErr == nil {
|
|
defer closer.Close()
|
|
|
|
start := &data.SampleKeyDDO{
|
|
Fingerprint: fingerprintDDO,
|
|
Timestamp: indexable.EarliestTime,
|
|
}
|
|
|
|
if encode, encodeErr := coding.NewProtocolBufferEncoder(start).Encode(); encodeErr == nil {
|
|
iterator.Seek(encode)
|
|
|
|
if iterator.Valid() {
|
|
found := &data.SampleKeyDDO{}
|
|
if unmarshalErr := proto.Unmarshal(iterator.Key(), found); unmarshalErr == nil {
|
|
var foundEntries int = 0
|
|
|
|
if *fingerprintDDO.Signature == *found.Fingerprint.Signature {
|
|
emission := &model.Interval{
|
|
OldestInclusive: indexable.DecodeTime(found.Timestamp),
|
|
NewestInclusive: indexable.DecodeTime(found.Timestamp),
|
|
}
|
|
|
|
for iterator = iterator; iterator.Valid(); iterator.Next() {
|
|
if subsequentUnmarshalErr := proto.Unmarshal(iterator.Key(), found); subsequentUnmarshalErr == nil {
|
|
if *fingerprintDDO.Signature != *found.Fingerprint.Signature {
|
|
return emission, foundEntries, nil
|
|
}
|
|
foundEntries++
|
|
log.Printf("b foundEntries++ %d\n", foundEntries)
|
|
emission.NewestInclusive = indexable.DecodeTime(found.Timestamp)
|
|
} else {
|
|
log.Printf("Could not de-serialize subsequent key: %q\n", subsequentUnmarshalErr)
|
|
return nil, -7, subsequentUnmarshalErr
|
|
}
|
|
}
|
|
return emission, foundEntries, nil
|
|
} else {
|
|
return &model.Interval{}, -6, nil
|
|
}
|
|
} else {
|
|
log.Printf("Could not de-serialize start key: %q\n", unmarshalErr)
|
|
return nil, -5, unmarshalErr
|
|
}
|
|
} else {
|
|
iteratorErr := iterator.GetError()
|
|
log.Printf("Could not seek for metric watermark beginning: %q\n", iteratorErr)
|
|
return nil, -4, iteratorErr
|
|
}
|
|
} else {
|
|
log.Printf("Could not seek for metric watermark: %q\n", encodeErr)
|
|
return nil, -3, encodeErr
|
|
}
|
|
} else {
|
|
if closer != nil {
|
|
defer closer.Close()
|
|
}
|
|
|
|
log.Printf("Could not provision iterator for metric: %q\n", iteratorErr)
|
|
return nil, -3, iteratorErr
|
|
}
|
|
} else {
|
|
log.Printf("Could not encode metric: %q\n", fingerprintDDOErr)
|
|
return nil, -2, fingerprintDDOErr
|
|
}
|
|
|
|
return nil, -1, errors.New("Unknown error occurred while querying metric watermarks.")
|
|
}
|
|
|
|
// TODO(mtp): Holes in the data!
|
|
|
|
func (l *LevigoMetricPersistence) GetSamplesForMetric(metric model.Metric, interval model.Interval) ([]model.Samples, error) {
|
|
metricDDO := ddoFromMetric(metric)
|
|
|
|
if fingerprintDDO, fingerprintDDOErr := fingerprintDDOForMessage(metricDDO); fingerprintDDOErr == nil {
|
|
if iterator, closer, iteratorErr := l.fingerprintSamples.GetIterator(); iteratorErr == nil {
|
|
defer closer.Close()
|
|
|
|
start := &data.SampleKeyDDO{
|
|
Fingerprint: fingerprintDDO,
|
|
Timestamp: indexable.EncodeTime(interval.OldestInclusive),
|
|
}
|
|
|
|
emission := make([]model.Samples, 0)
|
|
|
|
if encode, encodeErr := coding.NewProtocolBufferEncoder(start).Encode(); encodeErr == nil {
|
|
iterator.Seek(encode)
|
|
|
|
for iterator = iterator; iterator.Valid(); iterator.Next() {
|
|
key := &data.SampleKeyDDO{}
|
|
value := &data.SampleValueDDO{}
|
|
if keyUnmarshalErr := proto.Unmarshal(iterator.Key(), key); keyUnmarshalErr == nil {
|
|
if valueUnmarshalErr := proto.Unmarshal(iterator.Value(), value); valueUnmarshalErr == nil {
|
|
if *fingerprintDDO.Signature == *key.Fingerprint.Signature {
|
|
// Wart
|
|
if indexable.DecodeTime(key.Timestamp).Unix() <= interval.NewestInclusive.Unix() {
|
|
emission = append(emission, model.Samples{
|
|
Value: model.SampleValue(*value.Value),
|
|
Timestamp: indexable.DecodeTime(key.Timestamp),
|
|
})
|
|
} else {
|
|
break
|
|
}
|
|
} else {
|
|
break
|
|
}
|
|
} else {
|
|
return nil, valueUnmarshalErr
|
|
}
|
|
} else {
|
|
return nil, keyUnmarshalErr
|
|
}
|
|
}
|
|
|
|
return emission, nil
|
|
|
|
} else {
|
|
log.Printf("Could not encode the start key: %q\n", encodeErr)
|
|
return nil, encodeErr
|
|
}
|
|
} else {
|
|
log.Printf("Could not acquire iterator: %q\n", iteratorErr)
|
|
return nil, iteratorErr
|
|
}
|
|
} else {
|
|
log.Printf("Could not create fingerprint for the metric: %q\n", fingerprintDDOErr)
|
|
return nil, fingerprintDDOErr
|
|
}
|
|
|
|
return nil, errors.New("Unknown error occurred while querying metric watermarks.")
|
|
}
|