// Copyright 2013 Prometheus Team
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package metric
import (
	"flag"
	"io"
	"log"
	"sort"
	"sync"
	"time"

	"code.google.com/p/goprotobuf/proto"

	"github.com/prometheus/prometheus/coding"
	"github.com/prometheus/prometheus/coding/indexable"
	"github.com/prometheus/prometheus/model"
	dto "github.com/prometheus/prometheus/model/generated"
	"github.com/prometheus/prometheus/storage"
	index "github.com/prometheus/prometheus/storage/raw/index/leveldb"
	leveldb "github.com/prometheus/prometheus/storage/raw/leveldb"
	"github.com/prometheus/prometheus/utility"
)

var (
	leveldbChunkSize = flag.Int("leveldbChunkSize", 200, "Maximum number of samples stored under one key.")

	sortConcurrency = 2
)
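
// LevelDBMetricPersistence bundles the LevelDB databases that back the metric
// store: the raw sample data plus the supporting lookup indices.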
type LevelDBMetricPersistence struct {
	fingerprintToMetrics    *leveldb.LevelDBPersistence
	labelNameToFingerprints *leveldb.LevelDBPersistence
	labelSetToFingerprints  *leveldb.LevelDBPersistence
	metricHighWatermarks    *leveldb.LevelDBPersistence
	metricMembershipIndex   *index.LevelDBMembershipIndex
	metricSamples           *leveldb.LevelDBPersistence
}

var (
	// These flag values are back of the envelope, though they seem sensible.
	// Please re-evaluate based on your own needs.
	fingerprintsToLabelPairCacheSize = flag.Int("fingerprintsToLabelPairCacheSizeBytes", 100*1024*1024, "The size for the fingerprint to label pair index (bytes).")
	highWatermarkCacheSize           = flag.Int("highWatermarksByFingerprintSizeBytes", 50*1024*1024, "The size for the metric high watermarks (bytes).")
	samplesByFingerprintCacheSize    = flag.Int("samplesByFingerprintCacheSizeBytes", 500*1024*1024, "The size for the samples database (bytes).")
	labelNameToFingerprintsCacheSize = flag.Int("labelNameToFingerprintsCacheSizeBytes", 100*1024*1024, "The size for the label name to metric fingerprint index (bytes).")
	labelPairToFingerprintsCacheSize = flag.Int("labelPairToFingerprintsCacheSizeBytes", 100*1024*1024, "The size for the label pair to metric fingerprint index (bytes).")
	metricMembershipIndexCacheSize   = flag.Int("metricMembershipCacheSizeBytes", 50*1024*1024, "The size for the metric membership index (bytes).")
)

type leveldbOpener func()
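
// Close shuts down all underlying LevelDB databases concurrently and returns
// the first error encountered, if any.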
func (l *LevelDBMetricPersistence) Close() error {
	var persistences = []struct {
		name   string
		closer io.Closer
	}{
		{
			"Fingerprint to Label Name and Value Pairs",
			l.fingerprintToMetrics,
		},
		{
			"Fingerprint High Watermarks",
			l.metricHighWatermarks,
		},
		{
			"Fingerprint Samples",
			l.metricSamples,
		},
		{
			"Label Name to Fingerprints",
			l.labelNameToFingerprints,
		},
		{
			"Label Name and Value Pairs to Fingerprints",
			l.labelSetToFingerprints,
		},
		{
			"Metric Membership Index",
			l.metricMembershipIndex,
		},
	}

	errorChannel := make(chan error, len(persistences))

	for _, persistence := range persistences {
		name := persistence.name
		closer := persistence.closer

		go func(name string, closer io.Closer) {
			if closer != nil {
				closingError := closer.Close()

				if closingError != nil {
					log.Printf("Could not close a LevelDBPersistence storage container; inconsistencies are possible: %q\n", closingError)
				}

				errorChannel <- closingError
			} else {
				errorChannel <- nil
			}
		}(name, closer)
	}

	for i := 0; i < cap(errorChannel); i++ {
		closingError := <-errorChannel

		if closingError != nil {
			return closingError
		}
	}

	return nil
}
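
// NewLevelDBMetricPersistence opens (or creates) the individual LevelDB
// databases under baseDirectory concurrently and returns a ready-to-use
// LevelDBMetricPersistence.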
func NewLevelDBMetricPersistence(baseDirectory string) (persistence *LevelDBMetricPersistence, err error) {
	errorChannel := make(chan error, 6)

	emission := &LevelDBMetricPersistence{}

	var subsystemOpeners = []struct {
		name   string
		opener leveldbOpener
	}{
		{
			"Label Names and Value Pairs by Fingerprint",
			func() {
				var err error
				emission.fingerprintToMetrics, err = leveldb.NewLevelDBPersistence(baseDirectory+"/label_name_and_value_pairs_by_fingerprint", *fingerprintsToLabelPairCacheSize, 10)
				errorChannel <- err
			},
		},
		{
			"Samples by Fingerprint",
			func() {
				var err error
				emission.metricSamples, err = leveldb.NewLevelDBPersistence(baseDirectory+"/samples_by_fingerprint", *samplesByFingerprintCacheSize, 10)
				errorChannel <- err
			},
		},
		{
			"High Watermarks by Fingerprint",
			func() {
				var err error
				emission.metricHighWatermarks, err = leveldb.NewLevelDBPersistence(baseDirectory+"/high_watermarks_by_fingerprint", *highWatermarkCacheSize, 10)
				errorChannel <- err
			},
		},
		{
			"Fingerprints by Label Name",
			func() {
				var err error
				emission.labelNameToFingerprints, err = leveldb.NewLevelDBPersistence(baseDirectory+"/fingerprints_by_label_name", *labelNameToFingerprintsCacheSize, 10)
				errorChannel <- err
			},
		},
		{
			"Fingerprints by Label Name and Value Pair",
			func() {
				var err error
				emission.labelSetToFingerprints, err = leveldb.NewLevelDBPersistence(baseDirectory+"/fingerprints_by_label_name_and_value_pair", *labelPairToFingerprintsCacheSize, 10)
				errorChannel <- err
			},
		},
		{
			"Metric Membership Index",
			func() {
				var err error
				emission.metricMembershipIndex, err = index.NewLevelDBMembershipIndex(baseDirectory+"/metric_membership_index", *metricMembershipIndexCacheSize, 10)
				errorChannel <- err
			},
		},
	}

	for _, subsystem := range subsystemOpeners {
		opener := subsystem.opener
		go opener()
	}

	for i := 0; i < cap(errorChannel); i++ {
		err = <-errorChannel

		if err != nil {
			log.Printf("Could not open a LevelDBPersistence storage container: %q\n", err)

			return
		}
	}

	persistence = emission

	return
}
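
// AppendSample records a single sample by delegating to AppendSamples.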
func (l *LevelDBMetricPersistence) AppendSample(sample model.Sample) (err error) {
	begin := time.Now()

	defer func() {
		duration := time.Since(begin)

		recordOutcome(duration, err, map[string]string{operation: appendSample, result: success}, map[string]string{operation: appendSample, result: failure})
	}()

	err = l.AppendSamples(model.Samples{sample})

	return
}

// groupByFingerprint collects all of the provided samples, groups them
// together by their respective metric fingerprint, and finally sorts
// them chronologically.
func groupByFingerprint(samples model.Samples) map[model.Fingerprint]model.Samples {
	var (
		fingerprintToSamples = map[model.Fingerprint]model.Samples{}
	)

	for _, sample := range samples {
		fingerprint := model.NewFingerprintFromMetric(sample.Metric)
		samples := fingerprintToSamples[fingerprint]
		samples = append(samples, sample)
		fingerprintToSamples[fingerprint] = samples
	}

	var (
		sortingSemaphore = make(chan bool, sortConcurrency)
		doneSorting      sync.WaitGroup
	)

	for i := 0; i < sortConcurrency; i++ {
		sortingSemaphore <- true
	}

	for _, samples := range fingerprintToSamples {
		doneSorting.Add(1)

		go func(samples model.Samples) {
			<-sortingSemaphore
			sort.Sort(samples)
			sortingSemaphore <- true
			doneSorting.Done()
		}(samples)
	}

	doneSorting.Wait()

	return fingerprintToSamples
}

// findUnindexedMetrics scours the metric membership index for each given Metric
// in the keyspace and returns a map of Fingerprint-Metric pairs that are
// absent.
func (l *LevelDBMetricPersistence) findUnindexedMetrics(candidates map[model.Fingerprint]model.Metric) (unindexed map[model.Fingerprint]model.Metric, err error) {
	begin := time.Now()

	defer func() {
		duration := time.Since(begin)

		recordOutcome(duration, err, map[string]string{operation: findUnindexedMetrics, result: success}, map[string]string{operation: findUnindexedMetrics, result: failure})
	}()

	unindexed = make(map[model.Fingerprint]model.Metric)

	// Determine which metrics are unknown in the database.
	for fingerprint, metric := range candidates {
		var (
			dto           = model.MetricToDTO(metric)
			indexHas, err = l.hasIndexMetric(dto)
		)

		if err != nil {
			panic(err)
		}

		if !indexHas {
			unindexed[fingerprint] = metric
		}
	}

	return
}

// indexLabelNames accumulates all label name to fingerprint index entries for
// the dirty metrics, appends the new dirtied metrics, sorts, and bulk updates
// the index to reflect the new state.
//
// This operation is idempotent.
func (l *LevelDBMetricPersistence) indexLabelNames(metrics map[model.Fingerprint]model.Metric) (err error) {
	begin := time.Now()

	defer func() {
		duration := time.Since(begin)

		recordOutcome(duration, err, map[string]string{operation: indexLabelNames, result: success}, map[string]string{operation: indexLabelNames, result: failure})
	}()

	labelNameFingerprints := map[model.LabelName]utility.Set{}

	for fingerprint, metric := range metrics {
		for labelName := range metric {
			fingerprintSet, ok := labelNameFingerprints[labelName]
			if !ok {
				fingerprintSet = utility.Set{}

				fingerprints, err := l.GetFingerprintsForLabelName(labelName)
				if err != nil {
					panic(err)
					return err
				}

				for _, fingerprint := range fingerprints {
					fingerprintSet.Add(fingerprint)
				}
			}

			fingerprintSet.Add(fingerprint)
			labelNameFingerprints[labelName] = fingerprintSet
		}
	}

	batch := leveldb.NewBatch()
	defer batch.Close()

	for labelName, fingerprintSet := range labelNameFingerprints {
		fingerprints := model.Fingerprints{}
		for fingerprint := range fingerprintSet {
			fingerprints = append(fingerprints, fingerprint.(model.Fingerprint))
		}

		sort.Sort(fingerprints)

		key := &dto.LabelName{
			Name: proto.String(string(labelName)),
		}
		value := &dto.FingerprintCollection{}
		for _, fingerprint := range fingerprints {
			value.Member = append(value.Member, fingerprint.ToDTO())
		}

		batch.Put(coding.NewProtocolBufferEncoder(key), coding.NewProtocolBufferEncoder(value))
	}

	err = l.labelNameToFingerprints.Commit(batch)
	if err != nil {
		panic(err)
		return
	}

	return
}

// indexLabelPairs accumulates all label pair to fingerprint index entries for
// the dirty metrics, appends the new dirtied metrics, sorts, and bulk updates
// the index to reflect the new state.
//
// This operation is idempotent.
func (l *LevelDBMetricPersistence) indexLabelPairs(metrics map[model.Fingerprint]model.Metric) (err error) {
	begin := time.Now()

	defer func() {
		duration := time.Since(begin)

		recordOutcome(duration, err, map[string]string{operation: indexLabelPairs, result: success}, map[string]string{operation: indexLabelPairs, result: failure})
	}()

	labelPairFingerprints := map[model.LabelPair]utility.Set{}

	for fingerprint, metric := range metrics {
		for labelName, labelValue := range metric {
			labelPair := model.LabelPair{
				Name:  labelName,
				Value: labelValue,
			}

			fingerprintSet, ok := labelPairFingerprints[labelPair]
			if !ok {
				fingerprintSet = utility.Set{}

				fingerprints, err := l.GetFingerprintsForLabelSet(model.LabelSet{
					labelName: labelValue,
				})
				if err != nil {
					panic(err)
					return err
				}

				for _, fingerprint := range fingerprints {
					fingerprintSet.Add(fingerprint)
				}
			}

			fingerprintSet.Add(fingerprint)
			labelPairFingerprints[labelPair] = fingerprintSet
		}
	}

	batch := leveldb.NewBatch()
	defer batch.Close()

	for labelPair, fingerprintSet := range labelPairFingerprints {
		fingerprints := model.Fingerprints{}
		for fingerprint := range fingerprintSet {
			fingerprints = append(fingerprints, fingerprint.(model.Fingerprint))
		}

		sort.Sort(fingerprints)

		key := &dto.LabelPair{
			Name:  proto.String(string(labelPair.Name)),
			Value: proto.String(string(labelPair.Value)),
		}
		value := &dto.FingerprintCollection{}
		for _, fingerprint := range fingerprints {
			value.Member = append(value.Member, fingerprint.ToDTO())
		}

		batch.Put(coding.NewProtocolBufferEncoder(key), coding.NewProtocolBufferEncoder(value))
	}

	err = l.labelSetToFingerprints.Commit(batch)
	if err != nil {
		panic(err)
		return
	}

	return
}

// indexFingerprints updates all of the Fingerprint to Metric reverse lookups
// in the index and then bulk updates.
//
// This operation is idempotent.
func (l *LevelDBMetricPersistence) indexFingerprints(metrics map[model.Fingerprint]model.Metric) (err error) {
	begin := time.Now()

	defer func() {
		duration := time.Since(begin)

		recordOutcome(duration, err, map[string]string{operation: indexFingerprints, result: success}, map[string]string{operation: indexFingerprints, result: failure})
	}()

	batch := leveldb.NewBatch()
	defer batch.Close()

	for fingerprint, metric := range metrics {
		key := coding.NewProtocolBufferEncoder(fingerprint.ToDTO())
		value := coding.NewProtocolBufferEncoder(model.MetricToDTO(metric))
		batch.Put(key, value)
	}

	err = l.fingerprintToMetrics.Commit(batch)
	if err != nil {
		panic(err)
	}

	return
}

// indexMetrics takes groups of samples, determines which ones contain metrics
// that are unknown to the storage stack, and then proceeds to update all
// affected indices.
func (l *LevelDBMetricPersistence) indexMetrics(fingerprints map[model.Fingerprint]model.Metric) (err error) {
	begin := time.Now()

	defer func() {
		duration := time.Since(begin)

		recordOutcome(duration, err, map[string]string{operation: indexMetrics, result: success}, map[string]string{operation: indexMetrics, result: failure})
	}()

	var (
		absentMetrics map[model.Fingerprint]model.Metric
	)

	absentMetrics, err = l.findUnindexedMetrics(fingerprints)
	if err != nil {
		panic(err)
	}

	if len(absentMetrics) == 0 {
		return
	}

	// TODO: For the missing fingerprints, determine what label names and pairs
	// are absent and act accordingly and append fingerprints.
	var (
		doneBuildingLabelNameIndex   = make(chan error)
		doneBuildingLabelPairIndex   = make(chan error)
		doneBuildingFingerprintIndex = make(chan error)
	)

	go func() {
		doneBuildingLabelNameIndex <- l.indexLabelNames(absentMetrics)
	}()

	go func() {
		doneBuildingLabelPairIndex <- l.indexLabelPairs(absentMetrics)
	}()

	go func() {
		doneBuildingFingerprintIndex <- l.indexFingerprints(absentMetrics)
	}()

	makeTopLevelIndex := true

	err = <-doneBuildingLabelNameIndex
	if err != nil {
		panic(err)
		makeTopLevelIndex = false
	}
	err = <-doneBuildingLabelPairIndex
	if err != nil {
		panic(err)
		makeTopLevelIndex = false
	}
	err = <-doneBuildingFingerprintIndex
	if err != nil {
		panic(err)
		makeTopLevelIndex = false
	}

	// If any of the preceding operations failed, we will have inconsistent
	// indices.  Thusly, the Metric membership index should NOT be updated, as
	// its state is used to determine whether to bulk update the other indices.
	// Given that those operations are idempotent, it is OK to repeat them;
	// however, it will consume considerable amounts of time.
	if makeTopLevelIndex {
		batch := leveldb.NewBatch()
		defer batch.Close()

		// WART: We should probably encode simple fingerprints.
		for _, metric := range absentMetrics {
			key := coding.NewProtocolBufferEncoder(model.MetricToDTO(metric))
			batch.Put(key, key)
		}

		err := l.metricMembershipIndex.Commit(batch)
		if err != nil {
			panic(err)
			// Not critical.
			log.Println(err)
		}
	}

	return
}
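
// refreshHighWatermarks bumps the stored per-fingerprint high watermark to the
// newest sample timestamp in each group, skipping groups whose stored
// watermark is already newer.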
func (l *LevelDBMetricPersistence) refreshHighWatermarks(groups map[model.Fingerprint]model.Samples) (err error) {
	begin := time.Now()

	defer func() {
		duration := time.Since(begin)

		recordOutcome(duration, err, map[string]string{operation: refreshHighWatermarks, result: success}, map[string]string{operation: refreshHighWatermarks, result: failure})
	}()

	batch := leveldb.NewBatch()
	defer batch.Close()

	var (
		mutationCount = 0
	)
	for fingerprint, samples := range groups {
		var (
			key                   = &dto.Fingerprint{}
			value                 = &dto.MetricHighWatermark{}
			raw                   []byte
			newestSampleTimestamp = samples[len(samples)-1].Timestamp
			keyEncoded            = coding.NewProtocolBufferEncoder(key)
		)

		key.Signature = proto.String(fingerprint.ToRowKey())
		raw, err = l.metricHighWatermarks.Get(keyEncoded)
		if err != nil {
			panic(err)
			return
		}

		if raw != nil {
			err = proto.Unmarshal(raw, value)
			if err != nil {
				panic(err)
				continue
			}

			if newestSampleTimestamp.Before(time.Unix(*value.Timestamp, 0)) {
				continue
			}
		}
		value.Timestamp = proto.Int64(newestSampleTimestamp.Unix())
		batch.Put(keyEncoded, coding.NewProtocolBufferEncoder(value))
		mutationCount++
	}

	err = l.metricHighWatermarks.Commit(batch)
	if err != nil {
		panic(err)
	}

	return
}
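
// AppendSamples bulk-appends the provided samples: it groups them by
// fingerprint, updates the indices and high watermarks concurrently, and
// writes the samples themselves in fixed-size chunks keyed by fingerprint and
// first timestamp.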
func (l *LevelDBMetricPersistence) AppendSamples(samples model.Samples) (err error) {
	begin := time.Now()
	defer func() {
		duration := time.Since(begin)

		recordOutcome(duration, err, map[string]string{operation: appendSamples, result: success}, map[string]string{operation: appendSamples, result: failure})
	}()

	var (
		fingerprintToSamples = groupByFingerprint(samples)
		indexErrChan         = make(chan error)
		watermarkErrChan     = make(chan error)
	)

	go func(groups map[model.Fingerprint]model.Samples) {
		var (
			metrics = map[model.Fingerprint]model.Metric{}
		)

		for fingerprint, samples := range groups {
			metrics[fingerprint] = samples[0].Metric
		}

		indexErrChan <- l.indexMetrics(metrics)
	}(fingerprintToSamples)

	go func(groups map[model.Fingerprint]model.Samples) {
		watermarkErrChan <- l.refreshHighWatermarks(groups)
	}(fingerprintToSamples)

	samplesBatch := leveldb.NewBatch()
	defer samplesBatch.Close()

	for fingerprint, group := range fingerprintToSamples {
		for {
			lengthOfGroup := len(group)

			if lengthOfGroup == 0 {
				break
			}

			take := *leveldbChunkSize
			if lengthOfGroup < take {
				take = lengthOfGroup
			}

			chunk := group[0:take]
			group = group[take:lengthOfGroup]

			key := &dto.SampleKey{
				Fingerprint:   fingerprint.ToDTO(),
				Timestamp:     indexable.EncodeTime(chunk[0].Timestamp),
				LastTimestamp: proto.Int64(chunk[take-1].Timestamp.Unix()),
				SampleCount:   proto.Uint32(uint32(take)),
			}

			value := &dto.SampleValueSeries{}
			for _, sample := range chunk {
				value.Value = append(value.Value, &dto.SampleValueSeries_Value{
					Timestamp: proto.Int64(sample.Timestamp.Unix()),
					Value:     proto.Float32(float32(sample.Value)),
				})
			}

			samplesBatch.Put(coding.NewProtocolBufferEncoder(key), coding.NewProtocolBufferEncoder(value))
		}
	}

	err = l.metricSamples.Commit(samplesBatch)
	if err != nil {
		panic(err)
	}

	err = <-indexErrChan
	if err != nil {
		panic(err)
	}

	err = <-watermarkErrChan
	if err != nil {
		panic(err)
	}

	return
}
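
// extractSampleKey unmarshals the key under the iterator's current position
// into a SampleKey.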
func extractSampleKey(i iterator) (k *dto.SampleKey, err error) {
	if i == nil {
		panic("nil iterator")
	}

	k = &dto.SampleKey{}
	rawKey := i.Key()
	if rawKey == nil {
		panic("illegal condition; got nil key...")
	}
	err = proto.Unmarshal(rawKey, k)

	return
}
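
// extractSampleValues unmarshals the value under the iterator's current
// position into a SampleValueSeries.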
func extractSampleValues(i iterator) (v *dto.SampleValueSeries, err error) {
	if i == nil {
		panic("nil iterator")
	}

	v = &dto.SampleValueSeries{}
	err = proto.Unmarshal(i.Value(), v)

	return
}
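
// fingerprintsEqual reports whether two fingerprint DTOs carry the same
// signature.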
func fingerprintsEqual(l *dto.Fingerprint, r *dto.Fingerprint) bool {
	if l == r {
		return true
	}

	if l == nil && r == nil {
		return true
	}

	if r.Signature == l.Signature {
		return true
	}

	if *r.Signature == *l.Signature {
		return true
	}

	return false
}
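
// sampleKeyPredicate is evaluated against sample keys while scanning, e.g. to
// decide when an iteration has run past the interval of interest.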
type sampleKeyPredicate func(k *dto.SampleKey) bool

func keyIsOlderThan(t time.Time) sampleKeyPredicate {
	return func(k *dto.SampleKey) bool {
		return indexable.DecodeTime(k.Timestamp).After(t)
	}
}

func keyIsAtMostOld(t time.Time) sampleKeyPredicate {
	return func(k *dto.SampleKey) bool {
		return !indexable.DecodeTime(k.Timestamp).After(t)
	}
}
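
// hasIndexMetric reports whether the given metric is already present in the
// metric membership index.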
func (l *LevelDBMetricPersistence) hasIndexMetric(dto *dto.Metric) (value bool, err error) {
	begin := time.Now()

	defer func() {
		duration := time.Since(begin)

		recordOutcome(duration, err, map[string]string{operation: hasIndexMetric, result: success}, map[string]string{operation: hasIndexMetric, result: failure})
	}()

	dtoKey := coding.NewProtocolBufferEncoder(dto)
	value, err = l.metricMembershipIndex.Has(dtoKey)

	return
}
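
// HasLabelPair reports whether any fingerprints are indexed under the given
// label pair.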
func (l *LevelDBMetricPersistence) HasLabelPair(dto *dto.LabelPair) (value bool, err error) {
	begin := time.Now()

	defer func() {
		duration := time.Since(begin)

		recordOutcome(duration, err, map[string]string{operation: hasLabelPair, result: success}, map[string]string{operation: hasLabelPair, result: failure})
	}()

	dtoKey := coding.NewProtocolBufferEncoder(dto)
	value, err = l.labelSetToFingerprints.Has(dtoKey)

	return
}
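
// HasLabelName reports whether any fingerprints are indexed under the given
// label name.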
func (l *LevelDBMetricPersistence) HasLabelName(dto *dto.LabelName) (value bool, err error) {
	begin := time.Now()

	defer func() {
		duration := time.Since(begin)

		recordOutcome(duration, err, map[string]string{operation: hasLabelName, result: success}, map[string]string{operation: hasLabelName, result: failure})
	}()

	dtoKey := coding.NewProtocolBufferEncoder(dto)
	value, err = l.labelNameToFingerprints.Has(dtoKey)

	return
}
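
// GetFingerprintsForLabelSet returns the fingerprints of all metrics that
// carry every label pair in the provided label set, i.e. the intersection of
// the per-pair fingerprint sets.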
func (l *LevelDBMetricPersistence) GetFingerprintsForLabelSet(labelSet model.LabelSet) (fps model.Fingerprints, err error) {
	begin := time.Now()

	defer func() {
		duration := time.Since(begin)

		recordOutcome(duration, err, map[string]string{operation: getFingerprintsForLabelSet, result: success}, map[string]string{operation: getFingerprintsForLabelSet, result: failure})
	}()

	sets := []utility.Set{}

	for _, labelSetDTO := range model.LabelSetToDTOs(&labelSet) {
		f, err := l.labelSetToFingerprints.Get(coding.NewProtocolBufferEncoder(labelSetDTO))
		if err != nil {
			return fps, err
		}

		unmarshaled := &dto.FingerprintCollection{}
		err = proto.Unmarshal(f, unmarshaled)
		if err != nil {
			return fps, err
		}

		set := utility.Set{}

		for _, m := range unmarshaled.Member {
			fp := model.NewFingerprintFromRowKey(*m.Signature)
			set.Add(fp)
		}

		sets = append(sets, set)
	}

	numberOfSets := len(sets)
	if numberOfSets == 0 {
		return
	}

	base := sets[0]
	for i := 1; i < numberOfSets; i++ {
		base = base.Intersection(sets[i])
	}
	for _, e := range base.Elements() {
		fingerprint := e.(model.Fingerprint)
		fps = append(fps, fingerprint)
	}

	return
}
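
// GetFingerprintsForLabelName returns the fingerprints of all metrics that
// carry the provided label name, regardless of its value.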
func (l *LevelDBMetricPersistence) GetFingerprintsForLabelName(labelName model.LabelName) (fps model.Fingerprints, err error) {
	begin := time.Now()

	defer func() {
		duration := time.Since(begin)

		recordOutcome(duration, err, map[string]string{operation: getFingerprintsForLabelName, result: success}, map[string]string{operation: getFingerprintsForLabelName, result: failure})
	}()

	raw, err := l.labelNameToFingerprints.Get(coding.NewProtocolBufferEncoder(model.LabelNameToDTO(&labelName)))
	if err != nil {
		return
	}

	unmarshaled := &dto.FingerprintCollection{}

	err = proto.Unmarshal(raw, unmarshaled)
	if err != nil {
		return
	}

	for _, m := range unmarshaled.Member {
		fp := model.NewFingerprintFromRowKey(*m.Signature)
		fps = append(fps, fp)
	}

	return
}
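
// GetMetricForFingerprint looks up the full metric (its label pairs) for the
// given fingerprint.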
func (l *LevelDBMetricPersistence) GetMetricForFingerprint(f model.Fingerprint) (m *model.Metric, err error) {
	begin := time.Now()

	defer func() {
		duration := time.Since(begin)

		recordOutcome(duration, err, map[string]string{operation: getMetricForFingerprint, result: success}, map[string]string{operation: getMetricForFingerprint, result: failure})
	}()

	raw, err := l.fingerprintToMetrics.Get(coding.NewProtocolBufferEncoder(model.FingerprintToDTO(f)))
	if err != nil {
		return
	}

	unmarshaled := &dto.Metric{}
	err = proto.Unmarshal(raw, unmarshaled)
	if err != nil {
		return
	}

	metric := model.Metric{}

	for _, v := range unmarshaled.LabelPair {
		metric[model.LabelName(*v.Name)] = model.LabelValue(*v.Value)
	}

	// Explicit address passing here shaves immense amounts of time off of the
	// code flow due to less tight-loop dereferencing.
	m = &metric

	return
}
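
// GetBoundaryValues retrieves the samples at the two inclusive boundaries of
// the interval, subject to the staleness policy.  If either boundary value
// cannot be found, neither is returned.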
func (l *LevelDBMetricPersistence) GetBoundaryValues(fp model.Fingerprint, i model.Interval, s StalenessPolicy) (open *model.Sample, end *model.Sample, err error) {
	begin := time.Now()

	defer func() {
		duration := time.Since(begin)

		recordOutcome(duration, err, map[string]string{operation: getBoundaryValues, result: success}, map[string]string{operation: getBoundaryValues, result: failure})
	}()

	// XXX: Maybe we will want to emit incomplete sets?
	open, err = l.GetValueAtTime(fp, i.OldestInclusive, s)
	if err != nil {
		return
	} else if open == nil {
		return
	}

	end, err = l.GetValueAtTime(fp, i.NewestInclusive, s)
	if err != nil {
		return
	} else if end == nil {
		open = nil
	}

	return
}
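
// interpolate linearly interpolates between the values y1 at x1 and y2 at x2
// for the evaluation time e.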
func interpolate(x1, x2 time.Time, y1, y2 float32, e time.Time) float32 {
	yDelta := y2 - y1
	xDelta := x2.Sub(x1)

	dDt := yDelta / float32(xDelta)
	offset := float32(e.Sub(x1))

	return y1 + (offset * dDt)
}
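
// iterator abstracts the subset of the LevelDB iterator interface this file
// relies on.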
type iterator interface {
	Close()
	Key() []byte
	Next()
	Prev()
	Seek([]byte)
	SeekToFirst()
	SeekToLast()
	Valid() bool
	Value() []byte
}
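
// GetValueAtTime returns the sample for the fingerprint at time t, either
// verbatim or interpolated between the two adjacent stored samples, provided
// the staleness policy's delta allowance is not exceeded.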
func (l *LevelDBMetricPersistence) GetValueAtTime(fp model.Fingerprint, t time.Time, s StalenessPolicy) (sample *model.Sample, err error) {
	begin := time.Now()

	defer func() {
		duration := time.Since(begin)

		recordOutcome(duration, err, map[string]string{operation: getValueAtTime, result: success}, map[string]string{operation: getValueAtTime, result: failure})
	}()

	// TODO: memoize/cache this or change the return type to metric.SamplePair.
	m, err := l.GetMetricForFingerprint(fp)
	if err != nil {
		return
	}

	// Candidate for Refactoring
	k := &dto.SampleKey{
		Fingerprint: fp.ToDTO(),
		Timestamp:   indexable.EncodeTime(t),
	}

	e, err := coding.NewProtocolBufferEncoder(k).Encode()
	if err != nil {
		return
	}

	iterator, closer, err := l.metricSamples.GetIterator()
	if err != nil {
		return
	}

	defer closer.Close()

	iterator.Seek(e)
	if !iterator.Valid() {
		/*
		 * Two cases for this:
		 * 1.) Corruption in LevelDB.
		 * 2.) Key seek after AND outside known range.
		 *
		 * Once a LevelDB iterator goes invalid, it cannot be recovered; thusly,
		 * we need to create a new in order to check if the last value in the
		 * database is sufficient for our purposes.  This is, in all reality, a
		 * corner case but one that could bring down the system.
		 */
		iterator, closer, err = l.metricSamples.GetIterator()
		if err != nil {
			return
		}
		defer closer.Close()
		iterator.SeekToLast()
		if !iterator.Valid() {
			/*
			 * For whatever reason, the LevelDB cannot be recovered.
			 */
			return
		}
	}

	var (
		firstKey   *dto.SampleKey
		firstValue *dto.SampleValueSeries
	)

	firstKey, err = extractSampleKey(iterator)
	if err != nil {
		return
	}

	peekAhead := false

	if !fingerprintsEqual(firstKey.Fingerprint, k.Fingerprint) {
		/*
		 * This allows us to grab values for metrics if our request time is after
		 * the last recorded time subject to the staleness policy due to the nuances
		 * of LevelDB storage:
		 *
		 * # Assumptions:
		 * - K0 < K1 in terms of sorting.
		 * - T0 < T1 in terms of sorting.
		 *
		 * # Data
		 *
		 * K0-T0
		 * K0-T1
		 * K0-T2
		 * K1-T0
		 * K1-T1
		 *
		 * # Scenario
		 * K0-T3, which does not exist, is requested.  LevelDB will thusly seek to
		 * K1-T1, when K0-T2 exists as a perfectly good candidate to check subject
		 * to the provided staleness policy and such.
		 */
		peekAhead = true
	}

	firstTime := indexable.DecodeTime(firstKey.Timestamp)
	if t.Before(firstTime) || peekAhead {
		iterator.Prev()
		if !iterator.Valid() {
			/*
			 * Two cases for this:
			 * 1.) Corruption in LevelDB.
			 * 2.) Key seek before AND outside known range.
			 *
			 * This is an explicit validation to ensure that if no previous values for
			 * the series are found, the query aborts.
			 */
			return
		}

		var (
			alternativeKey   *dto.SampleKey
			alternativeValue *dto.SampleValueSeries
		)

		alternativeKey, err = extractSampleKey(iterator)
		if err != nil {
			return
		}

		if !fingerprintsEqual(alternativeKey.Fingerprint, k.Fingerprint) {
			return
		}

		/*
		 * At this point, we found a previous value in the same series in the
		 * database.  LevelDB originally seeked to the subsequent element given
		 * the key, but we need to consider this adjacency instead.
		 */
		alternativeTime := indexable.DecodeTime(alternativeKey.Timestamp)

		firstKey = alternativeKey
		firstValue = alternativeValue
		firstTime = alternativeTime
	}

	firstDelta := firstTime.Sub(t)
	if firstDelta < 0 {
		firstDelta *= -1
	}
	if firstDelta > s.DeltaAllowance {
		return
	}

	firstValue, err = extractSampleValues(iterator)
	if err != nil {
		return
	}

	sample = model.SampleFromDTO(m, &t, firstValue)

	if firstDelta == time.Duration(0) {
		return
	}

	iterator.Next()
	if !iterator.Valid() {
		/*
		 * Two cases for this:
		 * 1.) Corruption in LevelDB.
		 * 2.) Key seek after AND outside known range.
		 *
		 * This means that there are no more values left in the storage; and if this
		 * point is reached, we know that the one that has been found is within the
		 * allowed staleness limits.
		 */
		return
	}

	var secondKey *dto.SampleKey

	secondKey, err = extractSampleKey(iterator)
	if err != nil {
		return
	}

	if !fingerprintsEqual(secondKey.Fingerprint, k.Fingerprint) {
		return
	} else {
		/*
		 * At this point, current entry in the database has the same key as the
		 * previous.  For this reason, the validation logic will expect that the
		 * distance between the two points shall not exceed the staleness policy
		 * allowed limit to reduce interpolation errors.
		 *
		 * For this reason, the sample is reset in case of other subsequent
		 * validation behaviors.
		 */
		sample = nil
	}

	secondTime := indexable.DecodeTime(secondKey.Timestamp)

	totalDelta := secondTime.Sub(firstTime)
	if totalDelta > s.DeltaAllowance {
		return
	}

	var secondValue *dto.SampleValueSeries

	secondValue, err = extractSampleValues(iterator)
	if err != nil {
		return
	}

	fValue := *firstValue.Value[0].Value
	sValue := *secondValue.Value[0].Value

	interpolated := interpolate(firstTime, secondTime, fValue, sValue, t)

	sampleValue := &dto.SampleValueSeries{}
	sampleValue.Value = append(sampleValue.Value, &dto.SampleValueSeries_Value{Value: &interpolated})

	sample = model.SampleFromDTO(m, &t, sampleValue)

	return
}
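
// GetRangeValues returns all samples for the fingerprint within the inclusive
// interval, ordered by time.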
func (l *LevelDBMetricPersistence) GetRangeValues(fp model.Fingerprint, i model.Interval) (v *model.SampleSet, err error) {
	begin := time.Now()

	defer func() {
		duration := time.Since(begin)

		recordOutcome(duration, err, map[string]string{operation: getRangeValues, result: success}, map[string]string{operation: getRangeValues, result: failure})
	}()

	k := &dto.SampleKey{
		Fingerprint: fp.ToDTO(),
		Timestamp:   indexable.EncodeTime(i.OldestInclusive),
	}

	e, err := coding.NewProtocolBufferEncoder(k).Encode()
	if err != nil {
		return
	}

	iterator, closer, err := l.metricSamples.GetIterator()
	if err != nil {
		return
	}
	defer closer.Close()

	iterator.Seek(e)

	predicate := keyIsOlderThan(i.NewestInclusive)

	for ; iterator.Valid(); iterator.Next() {
		retrievedKey := &dto.SampleKey{}

		retrievedKey, err = extractSampleKey(iterator)
		if err != nil {
			return
		}

		if predicate(retrievedKey) {
			break
		}

		if !fingerprintsEqual(retrievedKey.Fingerprint, k.Fingerprint) {
			break
		}

		retrievedValue, err := extractSampleValues(iterator)
		if err != nil {
			return nil, err
		}

		if v == nil {
			// TODO: memoize/cache this or change the return type to metric.SamplePair.
			m, err := l.GetMetricForFingerprint(fp)
			if err != nil {
				return v, err
			}
			v = &model.SampleSet{
				Metric: *m,
			}
		}

		v.Values = append(v.Values, model.SamplePair{
			Value:     model.SampleValue(*retrievedValue.Value[0].Value),
			Timestamp: indexable.DecodeTime(retrievedKey.Timestamp),
		})
	}

	// XXX: We should not explicitly sort here but rather rely on the datastore.
	//      This adds appreciable overhead.
	if v != nil {
		sort.Sort(v.Values)
	}

	return
}
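
// MetricKeyDecoder decodes index keys into LabelPair DTOs for ForEach
// traversals; values are ignored.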
type MetricKeyDecoder struct{}

func (d *MetricKeyDecoder) DecodeKey(in interface{}) (out interface{}, err error) {
	unmarshaled := &dto.LabelPair{}
	err = proto.Unmarshal(in.([]byte), unmarshaled)
	if err != nil {
		return
	}

	out = unmarshaled

	return
}

func (d *MetricKeyDecoder) DecodeValue(in interface{}) (out interface{}, err error) {
	return
}
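
// MetricNamesFilter accepts only label pairs whose name is the metric name
// label ("name").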
type MetricNamesFilter struct{}

func (f *MetricNamesFilter) Filter(key, value interface{}) (filterResult storage.FilterResult) {
	unmarshaled, ok := key.(*dto.LabelPair)
	if ok && *unmarshaled.Name == "name" {
		return storage.ACCEPT
	}

	return storage.SKIP
}
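
// CollectMetricNamesOp accumulates the values of the label pairs it is handed,
// i.e. the metric names.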
type CollectMetricNamesOp struct {
	metricNames []string
}

func (op *CollectMetricNamesOp) Operate(key, value interface{}) (err *storage.OperatorError) {
	unmarshaled := key.(*dto.LabelPair)
	op.metricNames = append(op.metricNames, *unmarshaled.Value)

	return
}
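
// GetAllMetricNames scans the label pair index and returns the names of all
// known metrics.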
func (l *LevelDBMetricPersistence) GetAllMetricNames() (metricNames []string, err error) {
	metricNamesOp := &CollectMetricNamesOp{}

	_, err = l.labelSetToFingerprints.ForEach(&MetricKeyDecoder{}, &MetricNamesFilter{}, metricNamesOp)
	if err != nil {
		return
	}

	metricNames = metricNamesOp.metricNames

	return
}
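
// ForEachSample is not yet implemented.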
func (l *LevelDBMetricPersistence) ForEachSample(builder IteratorsForFingerprintBuilder) (err error) {
	panic("not implemented")
}