2013-02-07 02:38:01 -08:00
// Copyright 2013 Prometheus Team
2012-12-09 07:27:12 -08:00
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
2013-02-08 09:03:26 -08:00
package metric
2012-12-09 07:27:12 -08:00
import (
2013-02-08 09:03:26 -08:00
"flag"
2013-04-28 05:47:43 -07:00
"fmt"
2013-02-08 09:03:26 -08:00
"log"
2013-02-08 09:03:26 -08:00
"sort"
2013-03-04 11:43:07 -08:00
"sync"
2012-12-12 03:53:34 -08:00
"time"
2012-12-09 07:27:12 -08:00
2013-06-08 01:27:44 -07:00
"code.google.com/p/goprotobuf/proto"
2013-06-25 05:02:27 -07:00
clientmodel "github.com/prometheus/client_golang/model"
2013-06-08 01:27:44 -07:00
dto "github.com/prometheus/prometheus/model/generated"
"github.com/prometheus/prometheus/storage"
2013-08-05 08:31:49 -07:00
"github.com/prometheus/prometheus/storage/raw"
2013-06-08 01:27:44 -07:00
"github.com/prometheus/prometheus/storage/raw/leveldb"
"github.com/prometheus/prometheus/utility"
2013-02-08 09:03:26 -08:00
)
2013-06-08 01:27:44 -07:00
// sortConcurrency is the capacity of the semaphore channel that
// groupByFingerprint uses to bound how many per-fingerprint sort goroutines
// run at the same time.
const sortConcurrency = 2
2013-02-08 09:03:26 -08:00
type LevelDBMetricPersistence struct {
2013-08-06 03:00:31 -07:00
CurationRemarks CurationRemarker
2013-08-03 09:46:02 -07:00
fingerprintToMetrics FingerprintMetricIndex
labelNameToFingerprints LabelNameFingerprintIndex
labelSetToFingerprints LabelSetFingerprintIndex
2013-08-05 00:25:47 -07:00
MetricHighWatermarks HighWatermarker
2013-08-03 09:46:02 -07:00
metricMembershipIndex MetricMembershipIndex
2013-05-02 03:49:13 -07:00
MetricSamples * leveldb . LevelDBPersistence
2013-02-08 09:03:26 -08:00
}
// Command-line flags that tune the LevelDB-backed metric persistence.
var (
	// leveldbChunkSize caps how many samples AppendSamples writes under a
	// single key.
	leveldbChunkSize = flag.Int("leveldbChunkSize", 200, "Maximum number of samples stored under one key.")

	// These flag values are back of the envelope, though they seem sensible.
	// Please re-evaluate based on your own needs.

	// Cache size of the curation remarks store.
	curationRemarksCacheSize = flag.Int("curationRemarksCacheSize", 5*1024*1024, "The size for the curation remarks cache (bytes).")
	// Cache size of the fingerprint to metric index.
	fingerprintsToLabelPairCacheSize = flag.Int("fingerprintsToLabelPairCacheSizeBytes", 25*1024*1024, "The size for the fingerprint to label pair index (bytes).")
	// Cache size of the high watermark store.
	highWatermarkCacheSize = flag.Int("highWatermarksByFingerprintSizeBytes", 5*1024*1024, "The size for the metric high watermarks (bytes).")
	// Cache size of the label name to fingerprint index.
	labelNameToFingerprintsCacheSize = flag.Int("labelNameToFingerprintsCacheSizeBytes", 25*1024*1024, "The size for the label name to metric fingerprint index (bytes).")
	// Cache size of the label pair to fingerprint index.
	labelPairToFingerprintsCacheSize = flag.Int("labelPairToFingerprintsCacheSizeBytes", 25*1024*1024, "The size for the label pair to metric fingerprint index (bytes).")
	// Cache size of the metric membership index.
	metricMembershipIndexCacheSize = flag.Int("metricMembershipCacheSizeBytes", 5*1024*1024, "The size for the metric membership index (bytes).")
	// Cache size intended for the samples database.
	samplesByFingerprintCacheSize = flag.Int("samplesByFingerprintCacheSizeBytes", 50*1024*1024, "The size for the samples database (bytes).")
)
type (
	// leveldbOpener is a parameterless callback that opens one storage
	// subsystem; it reports its outcome through captured state rather than
	// a return value.
	leveldbOpener func()

	// errorCloser is implemented by resources whose Close can fail.
	errorCloser interface {
		Close() error
	}

	// closer is implemented by resources whose Close reports nothing.
	closer interface {
		Close()
	}
)
2013-02-08 09:03:26 -08:00
2013-04-01 04:22:38 -07:00
func ( l * LevelDBMetricPersistence ) Close ( ) {
2013-08-03 09:46:02 -07:00
var persistences = [ ] interface { } {
2013-05-02 03:49:13 -07:00
l . CurationRemarks ,
2013-04-01 04:22:38 -07:00
l . fingerprintToMetrics ,
l . labelNameToFingerprints ,
l . labelSetToFingerprints ,
2013-05-02 03:49:13 -07:00
l . MetricHighWatermarks ,
2013-04-01 04:22:38 -07:00
l . metricMembershipIndex ,
2013-05-02 03:49:13 -07:00
l . MetricSamples ,
2013-02-08 09:03:26 -08:00
}
2013-04-01 04:22:38 -07:00
closerGroup := sync . WaitGroup { }
2013-02-08 09:03:26 -08:00
2013-08-03 09:46:02 -07:00
for _ , c := range persistences {
2013-04-01 04:22:38 -07:00
closerGroup . Add ( 1 )
2013-08-03 09:46:02 -07:00
go func ( c interface { } ) {
if c != nil {
switch closer := c . ( type ) {
case closer :
closer . Close ( )
case errorCloser :
if err := closer . Close ( ) ; err != nil {
2013-08-05 08:31:49 -07:00
log . Println ( "anomaly closing:" , err )
2013-08-03 09:46:02 -07:00
}
}
2013-04-24 02:51:40 -07:00
}
2013-04-01 04:22:38 -07:00
closerGroup . Done ( )
2013-08-03 09:46:02 -07:00
} ( c )
2013-02-08 09:03:26 -08:00
}
2013-04-01 04:22:38 -07:00
closerGroup . Wait ( )
2013-02-08 09:03:26 -08:00
}
2013-04-28 05:47:43 -07:00
func NewLevelDBMetricPersistence ( baseDirectory string ) ( * LevelDBMetricPersistence , error ) {
workers := utility . NewUncertaintyGroup ( 7 )
2013-02-08 09:03:26 -08:00
2013-06-25 05:02:27 -07:00
emission := new ( LevelDBMetricPersistence )
2013-02-08 09:03:26 -08:00
var subsystemOpeners = [ ] struct {
name string
opener leveldbOpener
} {
{
"Label Names and Value Pairs by Fingerprint" ,
func ( ) {
var err error
2013-08-03 09:46:02 -07:00
emission . fingerprintToMetrics , err = NewLevelDBFingerprintMetricIndex ( & LevelDBFingerprintMetricIndexOptions {
LevelDBOptions : leveldb . LevelDBOptions {
2013-08-05 00:25:47 -07:00
Name : "Metrics by Fingerprint" ,
Purpose : "Index" ,
2013-08-03 09:46:02 -07:00
Path : baseDirectory + "/label_name_and_value_pairs_by_fingerprint" ,
CacheSizeBytes : * fingerprintsToLabelPairCacheSize ,
} ,
} )
2013-04-28 05:47:43 -07:00
workers . MayFail ( err )
2013-02-08 09:03:26 -08:00
} ,
} ,
{
"Samples by Fingerprint" ,
func ( ) {
var err error
2013-08-03 08:25:03 -07:00
o := & leveldb . LevelDBOptions {
2013-08-05 00:25:47 -07:00
Name : "Samples" ,
Purpose : "Timeseries" ,
2013-08-03 08:25:03 -07:00
Path : baseDirectory + "/samples_by_fingerprint" ,
CacheSizeBytes : * fingerprintsToLabelPairCacheSize ,
}
emission . MetricSamples , err = leveldb . NewLevelDBPersistence ( o )
2013-04-28 05:47:43 -07:00
workers . MayFail ( err )
2013-02-08 09:03:26 -08:00
} ,
} ,
2013-03-14 19:24:28 -07:00
{
"High Watermarks by Fingerprint" ,
func ( ) {
var err error
2013-08-05 00:25:47 -07:00
emission . MetricHighWatermarks , err = NewLevelDBHighWatermarker ( & LevelDBHighWatermarkerOptions {
LevelDBOptions : leveldb . LevelDBOptions {
Name : "High Watermarks" ,
Purpose : "The youngest sample in the database per metric." ,
Path : baseDirectory + "/high_watermarks_by_fingerprint" ,
CacheSizeBytes : * highWatermarkCacheSize ,
} } )
2013-04-28 05:47:43 -07:00
workers . MayFail ( err )
2013-03-14 19:24:28 -07:00
} ,
} ,
2013-02-08 09:03:26 -08:00
{
"Fingerprints by Label Name" ,
func ( ) {
var err error
2013-08-03 09:46:02 -07:00
emission . labelNameToFingerprints , err = NewLevelLabelNameFingerprintIndex ( & LevelDBLabelNameFingerprintIndexOptions {
LevelDBOptions : leveldb . LevelDBOptions {
2013-08-05 00:25:47 -07:00
Name : "Fingerprints by Label Name" ,
Purpose : "Index" ,
2013-08-03 09:46:02 -07:00
Path : baseDirectory + "/fingerprints_by_label_name" ,
CacheSizeBytes : * labelNameToFingerprintsCacheSize ,
} ,
} )
2013-04-28 05:47:43 -07:00
workers . MayFail ( err )
2013-02-08 09:03:26 -08:00
} ,
} ,
{
"Fingerprints by Label Name and Value Pair" ,
func ( ) {
var err error
2013-08-03 09:46:02 -07:00
emission . labelSetToFingerprints , err = NewLevelDBLabelSetFingerprintIndex ( & LevelDBLabelSetFingerprintIndexOptions {
LevelDBOptions : leveldb . LevelDBOptions {
2013-08-05 00:25:47 -07:00
Name : "Fingerprints by Label Pair" ,
Purpose : "Index" ,
2013-08-03 09:46:02 -07:00
Path : baseDirectory + "/fingerprints_by_label_name_and_value_pair" ,
CacheSizeBytes : * labelPairToFingerprintsCacheSize ,
} ,
} )
2013-04-28 05:47:43 -07:00
workers . MayFail ( err )
2013-02-08 09:03:26 -08:00
} ,
} ,
{
"Metric Membership Index" ,
func ( ) {
var err error
2013-08-03 09:46:02 -07:00
emission . metricMembershipIndex , err = NewLevelDBMetricMembershipIndex (
& LevelDBMetricMembershipIndexOptions {
LevelDBOptions : leveldb . LevelDBOptions {
2013-08-05 00:25:47 -07:00
Name : "Metric Membership" ,
Purpose : "Index" ,
2013-08-03 09:46:02 -07:00
Path : baseDirectory + "/metric_membership_index" ,
CacheSizeBytes : * metricMembershipIndexCacheSize ,
} ,
} )
2013-04-28 05:47:43 -07:00
workers . MayFail ( err )
} ,
} ,
{
"Sample Curation Remarks" ,
func ( ) {
var err error
2013-08-06 03:00:31 -07:00
emission . CurationRemarks , err = NewLevelDBCurationRemarker ( & LevelDBCurationRemarkerOptions {
LevelDBOptions : leveldb . LevelDBOptions {
Name : "Sample Curation Remarks" ,
Purpose : "Ledger of Progress for Various Curators" ,
Path : baseDirectory + "/curation_remarks" ,
CacheSizeBytes : * curationRemarksCacheSize ,
} ,
} )
2013-04-28 05:47:43 -07:00
workers . MayFail ( err )
2013-02-08 09:03:26 -08:00
} ,
} ,
}
for _ , subsystem := range subsystemOpeners {
opener := subsystem . opener
go opener ( )
}
2013-04-28 05:47:43 -07:00
if ! workers . Wait ( ) {
for _ , err := range workers . Errors ( ) {
log . Printf ( "Could not open storage due to %s" , err )
2013-02-08 09:03:26 -08:00
}
2013-04-28 05:47:43 -07:00
return nil , fmt . Errorf ( "Unable to open metric persistence." )
2013-02-08 09:03:26 -08:00
}
2013-04-28 05:47:43 -07:00
return emission , nil
2013-02-08 09:03:26 -08:00
}
2013-06-25 05:02:27 -07:00
func ( l * LevelDBMetricPersistence ) AppendSample ( sample * clientmodel . Sample ) ( err error ) {
2013-04-05 09:03:45 -07:00
defer func ( begin time . Time ) {
2013-03-11 14:21:25 -07:00
duration := time . Since ( begin )
2013-02-08 09:03:26 -08:00
2013-03-01 09:51:36 -08:00
recordOutcome ( duration , err , map [ string ] string { operation : appendSample , result : success } , map [ string ] string { operation : appendSample , result : failure } )
2013-04-05 09:03:45 -07:00
} ( time . Now ( ) )
2013-02-08 09:03:26 -08:00
2013-06-25 05:02:27 -07:00
err = l . AppendSamples ( clientmodel . Samples { sample } )
2013-02-08 09:03:26 -08:00
return
}
2013-03-14 15:42:28 -07:00
// groupByFingerprint collects all of the provided samples, groups them
// together by their respective metric fingerprint, and finally sorts
// them chronologically.
2013-06-25 05:02:27 -07:00
func groupByFingerprint ( samples clientmodel . Samples ) map [ clientmodel . Fingerprint ] clientmodel . Samples {
fingerprintToSamples := map [ clientmodel . Fingerprint ] clientmodel . Samples { }
2013-02-08 09:03:26 -08:00
for _ , sample := range samples {
2013-06-25 05:02:27 -07:00
fingerprint := & clientmodel . Fingerprint { }
fingerprint . LoadFromMetric ( sample . Metric )
samples := fingerprintToSamples [ * fingerprint ]
2013-02-08 09:03:26 -08:00
samples = append ( samples , sample )
2013-06-25 05:02:27 -07:00
fingerprintToSamples [ * fingerprint ] = samples
2013-02-08 09:03:26 -08:00
}
2013-05-21 07:11:35 -07:00
sortingSemaphore := make ( chan bool , sortConcurrency )
doneSorting := sync . WaitGroup { }
2013-03-14 15:42:28 -07:00
2013-02-08 09:03:26 -08:00
for _ , samples := range fingerprintToSamples {
2013-03-04 11:43:07 -08:00
doneSorting . Add ( 1 )
2013-03-14 15:42:28 -07:00
2013-06-25 05:02:27 -07:00
sortingSemaphore <- true
go func ( samples clientmodel . Samples ) {
2013-02-08 09:03:26 -08:00
sort . Sort ( samples )
2013-06-25 05:02:27 -07:00
<- sortingSemaphore
2013-03-04 11:43:07 -08:00
doneSorting . Done ( )
2013-02-08 09:03:26 -08:00
} ( samples )
}
2013-03-04 11:43:07 -08:00
doneSorting . Wait ( )
2013-02-08 09:03:26 -08:00
2013-03-14 15:42:28 -07:00
return fingerprintToSamples
}
2013-03-07 11:01:32 -08:00
2013-03-14 16:55:50 -07:00
// findUnindexedMetrics scours the metric membership index for each given Metric
// in the keyspace and returns a map of Fingerprint-Metric pairs that are
// absent.
2013-08-03 09:46:02 -07:00
func ( l * LevelDBMetricPersistence ) findUnindexedMetrics ( candidates map [ clientmodel . Fingerprint ] clientmodel . Metric ) ( unindexed FingerprintMetricMapping , err error ) {
2013-04-05 09:03:45 -07:00
defer func ( begin time . Time ) {
2013-03-14 15:42:28 -07:00
duration := time . Since ( begin )
2013-03-07 11:01:32 -08:00
2013-03-14 16:55:50 -07:00
recordOutcome ( duration , err , map [ string ] string { operation : findUnindexedMetrics , result : success } , map [ string ] string { operation : findUnindexedMetrics , result : failure } )
2013-04-05 09:03:45 -07:00
} ( time . Now ( ) )
2013-03-07 11:01:32 -08:00
2013-08-03 09:46:02 -07:00
unindexed = FingerprintMetricMapping { }
2013-03-14 16:55:50 -07:00
for fingerprint , metric := range candidates {
2013-08-03 09:46:02 -07:00
indexHas , err := l . hasIndexMetric ( metric )
2013-02-08 09:03:26 -08:00
if err != nil {
2013-04-05 09:03:45 -07:00
return unindexed , err
2013-02-08 09:03:26 -08:00
}
if ! indexHas {
2013-03-14 16:55:50 -07:00
unindexed [ fingerprint ] = metric
2013-02-08 09:03:26 -08:00
}
}
2013-06-25 05:02:27 -07:00
return unindexed , nil
2013-03-14 16:55:50 -07:00
}
2013-03-14 17:19:45 -07:00
// indexLabelNames accumulates all label name to fingerprint index entries for
// the dirty metrics, appends the new dirtied metrics, sorts, and bulk updates
// the index to reflect the new state.
2013-03-14 17:38:34 -07:00
//
// This operation is idempotent.
2013-08-03 09:46:02 -07:00
func ( l * LevelDBMetricPersistence ) indexLabelNames ( metrics FingerprintMetricMapping ) ( err error ) {
2013-04-05 09:03:45 -07:00
defer func ( begin time . Time ) {
2013-03-14 16:55:50 -07:00
duration := time . Since ( begin )
2013-03-14 17:19:45 -07:00
recordOutcome ( duration , err , map [ string ] string { operation : indexLabelNames , result : success } , map [ string ] string { operation : indexLabelNames , result : failure } )
2013-04-05 09:03:45 -07:00
} ( time . Now ( ) )
2013-03-14 16:55:50 -07:00
2013-08-03 09:46:02 -07:00
retrieved := map [ clientmodel . LabelName ] utility . Set { }
2013-03-14 16:55:50 -07:00
2013-03-14 17:19:45 -07:00
for fingerprint , metric := range metrics {
for labelName := range metric {
2013-08-03 09:46:02 -07:00
fingerprintSet , ok := retrieved [ labelName ]
2013-03-14 17:19:45 -07:00
if ! ok {
fingerprints , err := l . GetFingerprintsForLabelName ( labelName )
if err != nil {
return err
}
2013-08-03 09:46:02 -07:00
fingerprintSet = utility . Set { }
retrieved [ labelName ] = fingerprintSet
2013-03-14 17:19:45 -07:00
for _ , fingerprint := range fingerprints {
2013-05-17 03:58:15 -07:00
fingerprintSet . Add ( * fingerprint )
2013-03-14 17:19:45 -07:00
}
}
fingerprintSet . Add ( fingerprint )
}
2013-03-14 16:55:50 -07:00
}
2013-08-03 09:46:02 -07:00
pending := LabelNameFingerprintMapping { }
for name , set := range retrieved {
fps := pending [ name ]
for fp := range set {
f := fp . ( clientmodel . Fingerprint )
fps = append ( fps , & f )
2013-03-14 17:19:45 -07:00
}
2013-08-03 09:46:02 -07:00
pending [ name ] = fps
2013-03-14 17:19:45 -07:00
}
2013-08-03 09:46:02 -07:00
return l . labelNameToFingerprints . IndexBatch ( pending )
2013-03-14 17:19:45 -07:00
}
2013-02-08 09:03:26 -08:00
2013-03-14 17:19:45 -07:00
// indexLabelPairs accumulates all label pair to fingerprint index entries for
// the dirty metrics, appends the new dirtied metrics, sorts, and bulk updates
// the index to reflect the new state.
2013-03-14 17:38:34 -07:00
//
// This operation is idempotent.
2013-06-25 05:02:27 -07:00
func ( l * LevelDBMetricPersistence ) indexLabelPairs ( metrics map [ clientmodel . Fingerprint ] clientmodel . Metric ) ( err error ) {
2013-04-05 09:03:45 -07:00
defer func ( begin time . Time ) {
2013-03-14 17:19:45 -07:00
duration := time . Since ( begin )
2013-02-08 09:03:26 -08:00
2013-03-14 17:19:45 -07:00
recordOutcome ( duration , err , map [ string ] string { operation : indexLabelPairs , result : success } , map [ string ] string { operation : indexLabelPairs , result : failure } )
2013-04-05 09:03:45 -07:00
} ( time . Now ( ) )
2013-03-14 17:19:45 -07:00
2013-08-03 09:46:02 -07:00
collection := map [ LabelPair ] utility . Set { }
2013-03-14 17:19:45 -07:00
for fingerprint , metric := range metrics {
for labelName , labelValue := range metric {
2013-06-25 05:02:27 -07:00
labelPair := LabelPair {
2013-03-14 17:19:45 -07:00
Name : labelName ,
Value : labelValue ,
2013-02-08 09:03:26 -08:00
}
2013-08-03 09:46:02 -07:00
fingerprintSet , ok := collection [ labelPair ]
2013-03-14 17:19:45 -07:00
if ! ok {
2013-08-03 09:46:02 -07:00
fingerprints , _ , err := l . labelSetToFingerprints . Lookup ( & labelPair )
2013-03-14 17:19:45 -07:00
if err != nil {
return err
}
2013-08-03 09:46:02 -07:00
fingerprintSet = utility . Set { }
2013-03-14 17:19:45 -07:00
for _ , fingerprint := range fingerprints {
2013-05-17 03:58:15 -07:00
fingerprintSet . Add ( * fingerprint )
2013-03-14 17:19:45 -07:00
}
2013-08-03 09:46:02 -07:00
collection [ labelPair ] = fingerprintSet
2013-02-08 09:03:26 -08:00
}
2013-03-14 17:19:45 -07:00
fingerprintSet . Add ( fingerprint )
2013-02-08 09:03:26 -08:00
}
2013-03-14 17:19:45 -07:00
}
2013-02-08 09:03:26 -08:00
2013-08-03 09:46:02 -07:00
batch := LabelSetFingerprintMapping { }
2013-02-08 09:03:26 -08:00
2013-08-03 09:46:02 -07:00
for pair , elements := range collection {
fps := batch [ pair ]
for element := range elements {
fp := element . ( clientmodel . Fingerprint )
fps = append ( fps , & fp )
2013-03-14 17:19:45 -07:00
}
2013-08-03 09:46:02 -07:00
batch [ pair ] = fps
2013-03-14 17:19:45 -07:00
}
2013-02-08 09:03:26 -08:00
2013-08-03 09:46:02 -07:00
return l . labelSetToFingerprints . IndexBatch ( batch )
2013-03-14 17:19:45 -07:00
}
2013-02-08 09:03:26 -08:00
2013-03-14 17:38:34 -07:00
// indexFingerprints updates all of the Fingerprint to Metric reverse lookups
// in the index and then bulk updates.
//
// This operation is idempotent.
2013-08-03 09:46:02 -07:00
func ( l * LevelDBMetricPersistence ) indexFingerprints ( b FingerprintMetricMapping ) ( err error ) {
2013-04-05 09:03:45 -07:00
defer func ( begin time . Time ) {
2013-03-14 17:38:34 -07:00
duration := time . Since ( begin )
recordOutcome ( duration , err , map [ string ] string { operation : indexFingerprints , result : success } , map [ string ] string { operation : indexFingerprints , result : failure } )
2013-04-05 09:03:45 -07:00
} ( time . Now ( ) )
2013-03-14 17:38:34 -07:00
2013-08-03 09:46:02 -07:00
return l . fingerprintToMetrics . IndexBatch ( b )
2013-03-14 17:38:34 -07:00
}
2013-03-14 17:19:45 -07:00
// indexMetrics takes groups of samples, determines which ones contain metrics
// that are unknown to the storage stack, and then proceeds to update all
// affected indices.
2013-08-03 09:46:02 -07:00
func ( l * LevelDBMetricPersistence ) indexMetrics ( fingerprints FingerprintMetricMapping ) ( err error ) {
2013-04-05 09:03:45 -07:00
defer func ( begin time . Time ) {
2013-03-14 17:19:45 -07:00
duration := time . Since ( begin )
2013-02-08 09:03:26 -08:00
2013-03-14 17:19:45 -07:00
recordOutcome ( duration , err , map [ string ] string { operation : indexMetrics , result : success } , map [ string ] string { operation : indexMetrics , result : failure } )
2013-04-05 09:03:45 -07:00
} ( time . Now ( ) )
2013-02-08 09:03:26 -08:00
2013-08-03 09:46:02 -07:00
absentees , err := l . findUnindexedMetrics ( fingerprints )
2013-03-14 17:19:45 -07:00
if err != nil {
2013-04-05 09:03:45 -07:00
return
2013-03-14 17:19:45 -07:00
}
2013-02-08 09:03:26 -08:00
2013-08-03 09:46:02 -07:00
if len ( absentees ) == 0 {
2013-03-14 17:19:45 -07:00
return
}
2013-02-08 09:03:26 -08:00
2013-03-14 17:19:45 -07:00
// TODO: For the missing fingerprints, determine what label names and pairs
// are absent and act accordingly and append fingerprints.
2013-04-28 05:47:43 -07:00
workers := utility . NewUncertaintyGroup ( 3 )
2013-03-14 17:19:45 -07:00
go func ( ) {
2013-08-03 09:46:02 -07:00
workers . MayFail ( l . indexLabelNames ( absentees ) )
2013-03-14 17:19:45 -07:00
} ( )
go func ( ) {
2013-08-03 09:46:02 -07:00
workers . MayFail ( l . indexLabelPairs ( absentees ) )
2013-02-08 09:03:26 -08:00
} ( )
2013-03-14 17:38:34 -07:00
go func ( ) {
2013-08-03 09:46:02 -07:00
workers . MayFail ( l . indexFingerprints ( absentees ) )
2013-03-14 17:38:34 -07:00
} ( )
// If any of the preceding operations failed, we will have inconsistent
// indices. Thusly, the Metric membership index should NOT be updated, as
// its state is used to determine whether to bulk update the other indices.
// Given that those operations are idempotent, it is OK to repeat them;
// however, it will consume considerable amounts of time.
2013-08-03 09:46:02 -07:00
if ! workers . Wait ( ) {
return fmt . Errorf ( "Could not index due to %s" , workers . Errors ( ) )
2013-04-05 09:03:45 -07:00
}
2013-08-03 09:46:02 -07:00
ms := [ ] clientmodel . Metric { }
for _ , m := range absentees {
ms = append ( ms , m )
2013-02-08 09:03:26 -08:00
}
2013-08-03 09:46:02 -07:00
return l . metricMembershipIndex . IndexBatch ( ms )
2013-03-14 15:42:28 -07:00
}
2013-06-25 05:02:27 -07:00
func ( l * LevelDBMetricPersistence ) refreshHighWatermarks ( groups map [ clientmodel . Fingerprint ] clientmodel . Samples ) ( err error ) {
2013-04-05 09:03:45 -07:00
defer func ( begin time . Time ) {
2013-03-14 19:24:28 -07:00
duration := time . Since ( begin )
recordOutcome ( duration , err , map [ string ] string { operation : refreshHighWatermarks , result : success } , map [ string ] string { operation : refreshHighWatermarks , result : failure } )
2013-04-05 09:03:45 -07:00
} ( time . Now ( ) )
2013-03-14 19:24:28 -07:00
2013-08-05 00:25:47 -07:00
b := FingerprintHighWatermarkMapping { }
for fp , ss := range groups {
if len ( ss ) == 0 {
2013-06-08 01:27:44 -07:00
continue
}
2013-03-14 19:24:28 -07:00
2013-08-05 00:25:47 -07:00
b [ fp ] = ss [ len ( ss ) - 1 ] . Timestamp
2013-03-14 19:24:28 -07:00
}
2013-08-05 00:25:47 -07:00
return l . MetricHighWatermarks . UpdateBatch ( b )
2013-03-14 19:24:28 -07:00
}
2013-06-25 05:02:27 -07:00
func ( l * LevelDBMetricPersistence ) AppendSamples ( samples clientmodel . Samples ) ( err error ) {
2013-04-05 09:03:45 -07:00
defer func ( begin time . Time ) {
2013-03-14 15:42:28 -07:00
duration := time . Since ( begin )
recordOutcome ( duration , err , map [ string ] string { operation : appendSamples , result : success } , map [ string ] string { operation : appendSamples , result : failure } )
2013-04-05 09:03:45 -07:00
} ( time . Now ( ) )
2013-03-14 15:42:28 -07:00
2013-05-21 07:11:35 -07:00
fingerprintToSamples := groupByFingerprint ( samples )
indexErrChan := make ( chan error , 1 )
watermarkErrChan := make ( chan error , 1 )
2013-03-14 15:42:28 -07:00
2013-06-25 05:02:27 -07:00
go func ( groups map [ clientmodel . Fingerprint ] clientmodel . Samples ) {
2013-08-03 09:46:02 -07:00
metrics := FingerprintMetricMapping { }
2013-03-14 16:55:50 -07:00
for fingerprint , samples := range groups {
metrics [ fingerprint ] = samples [ 0 ] . Metric
}
indexErrChan <- l . indexMetrics ( metrics )
2013-03-14 15:42:28 -07:00
} ( fingerprintToSamples )
2013-06-25 05:02:27 -07:00
go func ( groups map [ clientmodel . Fingerprint ] clientmodel . Samples ) {
2013-03-14 19:24:28 -07:00
watermarkErrChan <- l . refreshHighWatermarks ( groups )
} ( fingerprintToSamples )
2013-03-14 18:09:19 -07:00
samplesBatch := leveldb . NewBatch ( )
defer samplesBatch . Close ( )
2013-03-14 15:42:28 -07:00
2013-03-14 18:09:19 -07:00
for fingerprint , group := range fingerprintToSamples {
for {
lengthOfGroup := len ( group )
2013-03-14 15:42:28 -07:00
2013-03-14 18:09:19 -07:00
if lengthOfGroup == 0 {
break
}
2013-03-14 15:42:28 -07:00
2013-03-14 18:09:19 -07:00
take := * leveldbChunkSize
if lengthOfGroup < take {
take = lengthOfGroup
}
2013-03-14 15:42:28 -07:00
2013-03-14 18:09:19 -07:00
chunk := group [ 0 : take ]
group = group [ take : lengthOfGroup ]
2013-03-14 15:42:28 -07:00
2013-06-25 05:02:27 -07:00
key := SampleKey {
2013-05-17 03:58:15 -07:00
Fingerprint : & fingerprint ,
2013-04-21 10:16:15 -07:00
FirstTimestamp : chunk [ 0 ] . Timestamp ,
LastTimestamp : chunk [ take - 1 ] . Timestamp ,
SampleCount : uint32 ( take ) ,
2013-06-25 05:02:27 -07:00
}
2013-03-14 15:42:28 -07:00
2013-03-14 18:09:19 -07:00
value := & dto . SampleValueSeries { }
for _ , sample := range chunk {
value . Value = append ( value . Value , & dto . SampleValueSeries_Value {
Timestamp : proto . Int64 ( sample . Timestamp . Unix ( ) ) ,
2013-06-25 05:02:27 -07:00
Value : proto . Float64 ( float64 ( sample . Value ) ) ,
2013-03-14 18:09:19 -07:00
} )
2013-03-14 15:42:28 -07:00
}
2013-03-14 18:09:19 -07:00
2013-06-25 05:02:27 -07:00
k := & dto . SampleKey { }
key . Dump ( k )
samplesBatch . Put ( k , value )
2013-03-14 15:42:28 -07:00
}
2013-03-14 18:09:19 -07:00
}
2013-03-14 15:42:28 -07:00
2013-05-02 03:49:13 -07:00
err = l . MetricSamples . Commit ( samplesBatch )
2013-03-14 18:09:19 -07:00
if err != nil {
2013-04-05 09:03:45 -07:00
return
2013-03-14 18:09:19 -07:00
}
2013-03-14 15:42:28 -07:00
err = <- indexErrChan
if err != nil {
2013-04-05 09:03:45 -07:00
return
2013-03-14 15:42:28 -07:00
}
2013-02-08 09:03:26 -08:00
2013-03-14 19:24:28 -07:00
err = <- watermarkErrChan
if err != nil {
2013-04-05 09:03:45 -07:00
return
2013-03-14 19:24:28 -07:00
}
2013-02-08 09:03:26 -08:00
return
}
2013-06-25 05:02:27 -07:00
func extractSampleKey ( i leveldb . Iterator ) ( * SampleKey , error ) {
2013-04-21 10:16:15 -07:00
k := & dto . SampleKey { }
2013-06-25 05:02:27 -07:00
err := proto . Unmarshal ( i . Key ( ) , k )
2013-04-21 10:16:15 -07:00
if err != nil {
2013-06-25 05:02:27 -07:00
return nil , err
2013-03-06 18:16:20 -08:00
}
2013-06-25 05:02:27 -07:00
key := & SampleKey { }
key . Load ( k )
2012-12-25 04:50:36 -08:00
2013-06-25 05:02:27 -07:00
return key , nil
2012-12-25 04:50:36 -08:00
}
2013-06-25 05:02:27 -07:00
func extractSampleValues ( i leveldb . Iterator ) ( Values , error ) {
2013-04-22 04:30:16 -07:00
v := & dto . SampleValueSeries { }
2013-06-25 05:02:27 -07:00
err := proto . Unmarshal ( i . Value ( ) , v )
2013-04-22 04:30:16 -07:00
if err != nil {
2013-06-25 05:02:27 -07:00
return nil , err
2012-12-25 04:50:36 -08:00
}
2013-06-25 05:02:27 -07:00
return NewValuesFromDTO ( v ) , nil
2012-12-25 04:50:36 -08:00
}
2013-08-03 09:46:02 -07:00
func ( l * LevelDBMetricPersistence ) hasIndexMetric ( m clientmodel . Metric ) ( value bool , err error ) {
2013-04-05 09:03:45 -07:00
defer func ( begin time . Time ) {
2013-03-11 14:21:25 -07:00
duration := time . Since ( begin )
2013-01-23 08:18:45 -08:00
2013-03-01 09:51:36 -08:00
recordOutcome ( duration , err , map [ string ] string { operation : hasIndexMetric , result : success } , map [ string ] string { operation : hasIndexMetric , result : failure } )
2013-04-05 09:03:45 -07:00
} ( time . Now ( ) )
2013-01-23 08:18:45 -08:00
2013-08-03 09:46:02 -07:00
return l . metricMembershipIndex . Has ( m )
2012-12-09 07:27:12 -08:00
}
2013-08-03 09:46:02 -07:00
func ( l * LevelDBMetricPersistence ) HasLabelPair ( p * LabelPair ) ( value bool , err error ) {
2013-04-05 09:03:45 -07:00
defer func ( begin time . Time ) {
2013-03-11 14:21:25 -07:00
duration := time . Since ( begin )
2013-01-23 08:18:45 -08:00
2013-03-01 09:51:36 -08:00
recordOutcome ( duration , err , map [ string ] string { operation : hasLabelPair , result : success } , map [ string ] string { operation : hasLabelPair , result : failure } )
2013-04-05 09:03:45 -07:00
} ( time . Now ( ) )
2013-01-23 08:18:45 -08:00
2013-08-03 09:46:02 -07:00
return l . labelSetToFingerprints . Has ( p )
2012-12-09 07:27:12 -08:00
}
2013-08-03 09:46:02 -07:00
func ( l * LevelDBMetricPersistence ) HasLabelName ( n clientmodel . LabelName ) ( value bool , err error ) {
2013-04-05 09:03:45 -07:00
defer func ( begin time . Time ) {
2013-03-11 14:21:25 -07:00
duration := time . Since ( begin )
2013-01-23 08:18:45 -08:00
2013-03-01 09:51:36 -08:00
recordOutcome ( duration , err , map [ string ] string { operation : hasLabelName , result : success } , map [ string ] string { operation : hasLabelName , result : failure } )
2013-04-05 09:03:45 -07:00
} ( time . Now ( ) )
2013-01-23 08:18:45 -08:00
2013-08-03 09:46:02 -07:00
value , err = l . labelNameToFingerprints . Has ( n )
2013-01-23 08:18:45 -08:00
return
2012-12-09 07:27:12 -08:00
}
2013-06-25 05:02:27 -07:00
func ( l * LevelDBMetricPersistence ) GetFingerprintsForLabelSet ( labelSet clientmodel . LabelSet ) ( fps clientmodel . Fingerprints , err error ) {
2013-04-05 09:03:45 -07:00
defer func ( begin time . Time ) {
2013-03-11 14:21:25 -07:00
duration := time . Since ( begin )
2012-12-09 07:27:12 -08:00
2013-03-01 09:51:36 -08:00
recordOutcome ( duration , err , map [ string ] string { operation : getFingerprintsForLabelSet , result : success } , map [ string ] string { operation : getFingerprintsForLabelSet , result : failure } )
2013-04-05 09:03:45 -07:00
} ( time . Now ( ) )
2012-12-09 07:27:12 -08:00
2013-01-13 02:15:01 -08:00
sets := [ ] utility . Set { }
2013-06-25 05:02:27 -07:00
for name , value := range labelSet {
2013-08-03 09:46:02 -07:00
fps , _ , err := l . labelSetToFingerprints . Lookup ( & LabelPair {
Name : name ,
Value : value ,
} )
2012-12-25 04:50:36 -08:00
if err != nil {
2013-08-03 09:46:02 -07:00
return nil , err
2013-06-08 01:27:44 -07:00
}
2012-12-25 04:50:36 -08:00
2013-01-13 02:15:01 -08:00
set := utility . Set { }
2013-08-03 09:46:02 -07:00
for _ , fp := range fps {
2013-05-17 03:58:15 -07:00
set . Add ( * fp )
2012-12-09 07:27:12 -08:00
}
2013-01-13 02:15:01 -08:00
sets = append ( sets , set )
}
numberOfSets := len ( sets )
if numberOfSets == 0 {
2013-06-25 05:02:27 -07:00
return nil , nil
2013-01-13 02:15:01 -08:00
}
base := sets [ 0 ]
for i := 1 ; i < numberOfSets ; i ++ {
base = base . Intersection ( sets [ i ] )
}
for _ , e := range base . Elements ( ) {
2013-06-25 05:02:27 -07:00
fingerprint := e . ( clientmodel . Fingerprint )
2013-05-17 03:58:15 -07:00
fps = append ( fps , & fingerprint )
2012-12-09 07:27:12 -08:00
}
2013-06-25 05:02:27 -07:00
return fps , nil
2012-12-09 07:27:12 -08:00
}
2013-06-25 05:02:27 -07:00
func ( l * LevelDBMetricPersistence ) GetFingerprintsForLabelName ( labelName clientmodel . LabelName ) ( fps clientmodel . Fingerprints , err error ) {
2013-04-05 09:03:45 -07:00
defer func ( begin time . Time ) {
2013-03-11 14:21:25 -07:00
duration := time . Since ( begin )
2012-12-09 07:27:12 -08:00
2013-03-01 09:51:36 -08:00
recordOutcome ( duration , err , map [ string ] string { operation : getFingerprintsForLabelName , result : success } , map [ string ] string { operation : getFingerprintsForLabelName , result : failure } )
2013-04-05 09:03:45 -07:00
} ( time . Now ( ) )
2012-12-09 07:27:12 -08:00
2013-08-03 09:46:02 -07:00
// TODO(matt): Update signature to work with ok.
fps , _ , err = l . labelNameToFingerprints . Lookup ( labelName )
2012-12-25 04:50:36 -08:00
2013-08-03 09:46:02 -07:00
return fps , err
2012-12-09 07:27:12 -08:00
}
2013-06-25 05:02:27 -07:00
func ( l * LevelDBMetricPersistence ) GetMetricForFingerprint ( f * clientmodel . Fingerprint ) ( m clientmodel . Metric , err error ) {
2013-04-05 09:03:45 -07:00
defer func ( begin time . Time ) {
2013-03-11 14:21:25 -07:00
duration := time . Since ( begin )
2012-12-25 04:50:36 -08:00
2013-03-01 09:51:36 -08:00
recordOutcome ( duration , err , map [ string ] string { operation : getMetricForFingerprint , result : success } , map [ string ] string { operation : getMetricForFingerprint , result : failure } )
2013-04-05 09:03:45 -07:00
} ( time . Now ( ) )
2012-12-25 04:50:36 -08:00
2013-08-03 09:46:02 -07:00
// TODO(matt): Update signature to work with ok.
m , _ , err = l . fingerprintToMetrics . Lookup ( f )
2012-12-12 03:53:34 -08:00
2013-06-08 01:27:44 -07:00
return m , nil
2012-12-12 03:53:34 -08:00
}
2013-06-25 05:02:27 -07:00
func ( l * LevelDBMetricPersistence ) GetValueAtTime ( f * clientmodel . Fingerprint , t time . Time ) Values {
2013-04-18 07:10:52 -07:00
panic ( "Not implemented" )
2012-12-12 03:53:34 -08:00
}
2013-06-25 05:02:27 -07:00
func ( l * LevelDBMetricPersistence ) GetBoundaryValues ( f * clientmodel . Fingerprint , i Interval ) Values {
2013-04-18 07:10:52 -07:00
panic ( "Not implemented" )
2012-12-19 11:34:54 -08:00
}
2013-06-25 05:02:27 -07:00
func ( l * LevelDBMetricPersistence ) GetRangeValues ( f * clientmodel . Fingerprint , i Interval ) Values {
2013-04-18 07:10:52 -07:00
panic ( "Not implemented" )
2012-12-12 03:53:34 -08:00
}
2013-02-06 08:06:39 -08:00
// MetricKeyDecoder is a stateless decoder whose DecodeKey method turns
// serialized label pair keys into LabelPair values.
type MetricKeyDecoder struct{}
func ( d * MetricKeyDecoder ) DecodeKey ( in interface { } ) ( out interface { } , err error ) {
2013-03-26 06:46:02 -07:00
unmarshaled := dto . LabelPair { }
err = proto . Unmarshal ( in . ( [ ] byte ) , & unmarshaled )
2013-02-06 08:06:39 -08:00
if err != nil {
return
}
2013-02-08 09:03:26 -08:00
2013-06-25 05:02:27 -07:00
out = LabelPair {
Name : clientmodel . LabelName ( * unmarshaled . Name ) ,
Value : clientmodel . LabelValue ( * unmarshaled . Value ) ,
2013-03-26 06:46:02 -07:00
}
2013-02-08 09:03:26 -08:00
return
2013-02-06 08:06:39 -08:00
}
// DecodeValue ignores its input and always returns a nil value and a nil
// error.
func (d *MetricKeyDecoder) DecodeValue(in interface{}) (out interface{}, err error) {
	return nil, nil
}
2013-03-26 03:45:56 -07:00
type LabelNameFilter struct {
2013-06-25 05:02:27 -07:00
labelName clientmodel . LabelName
2013-03-26 03:45:56 -07:00
}
2013-02-06 08:06:39 -08:00
2013-03-26 06:46:02 -07:00
func ( f LabelNameFilter ) Filter ( key , value interface { } ) ( filterResult storage . FilterResult ) {
2013-06-25 05:02:27 -07:00
labelPair , ok := key . ( LabelPair )
2013-03-26 06:46:02 -07:00
if ok && labelPair . Name == f . labelName {
2013-02-07 02:38:01 -08:00
return storage . ACCEPT
}
return storage . SKIP
2013-02-06 08:06:39 -08:00
}
2013-03-26 03:45:56 -07:00
type CollectLabelValuesOp struct {
2013-06-25 05:02:27 -07:00
labelValues [ ] clientmodel . LabelValue
2013-02-06 08:06:39 -08:00
}
2013-03-26 03:45:56 -07:00
func ( op * CollectLabelValuesOp ) Operate ( key , value interface { } ) ( err * storage . OperatorError ) {
2013-06-25 05:02:27 -07:00
labelPair := key . ( LabelPair )
op . labelValues = append ( op . labelValues , clientmodel . LabelValue ( labelPair . Value ) )
2013-02-06 08:06:39 -08:00
return
}
2013-06-25 05:02:27 -07:00
func ( l * LevelDBMetricPersistence ) GetAllValuesForLabel ( labelName clientmodel . LabelName ) ( values clientmodel . LabelValues , err error ) {
2013-03-26 03:45:56 -07:00
filter := & LabelNameFilter {
labelName : labelName ,
}
labelValuesOp := & CollectLabelValuesOp { }
2013-02-06 08:06:39 -08:00
2013-03-26 03:45:56 -07:00
_ , err = l . labelSetToFingerprints . ForEach ( & MetricKeyDecoder { } , filter , labelValuesOp )
2013-02-06 08:06:39 -08:00
if err != nil {
return
}
2013-03-26 03:45:56 -07:00
values = labelValuesOp . labelValues
2013-02-06 08:06:39 -08:00
return
}
2013-02-08 09:03:26 -08:00
2013-08-06 03:00:31 -07:00
// Prune compacts each database's keyspace serially.
2013-05-10 07:41:02 -07:00
//
// Beware that it would probably be imprudent to run this on a live user-facing
// server due to latency implications.
2013-08-05 08:31:49 -07:00
func ( l * LevelDBMetricPersistence ) Prune ( ) {
l . CurationRemarks . Prune ( )
l . fingerprintToMetrics . Prune ( )
l . labelNameToFingerprints . Prune ( )
l . labelSetToFingerprints . Prune ( )
l . MetricHighWatermarks . Prune ( )
l . metricMembershipIndex . Prune ( )
l . MetricSamples . Prune ( )
2013-05-10 16:02:57 -07:00
}
2013-08-06 03:00:31 -07:00
func ( l * LevelDBMetricPersistence ) Sizes ( ) ( total uint64 , err error ) {
2013-05-10 16:02:57 -07:00
size := uint64 ( 0 )
2013-08-06 03:00:31 -07:00
if size , _ , err = l . CurationRemarks . Size ( ) ; err != nil {
2013-05-10 16:02:57 -07:00
return 0 , err
2013-05-10 07:41:02 -07:00
}
2013-05-10 16:02:57 -07:00
total += size
2013-08-05 00:25:47 -07:00
if size , _ , err = l . fingerprintToMetrics . Size ( ) ; err != nil {
return 0 , err
}
total += size
2013-05-10 16:02:57 -07:00
2013-08-05 00:25:47 -07:00
if size , _ , err = l . labelNameToFingerprints . Size ( ) ; err != nil {
return 0 , err
}
total += size
2013-05-10 16:02:57 -07:00
2013-08-05 00:25:47 -07:00
if size , _ , err = l . labelSetToFingerprints . Size ( ) ; err != nil {
return 0 , err
}
total += size
2013-05-10 16:02:57 -07:00
2013-08-05 00:25:47 -07:00
if size , _ , err = l . MetricHighWatermarks . Size ( ) ; err != nil {
2013-05-10 16:02:57 -07:00
return 0 , err
2013-05-10 07:41:02 -07:00
}
2013-05-10 16:02:57 -07:00
total += size
2013-08-05 00:25:47 -07:00
if size , _ , err = l . metricMembershipIndex . Size ( ) ; err != nil {
return 0 , err
}
total += size
2013-05-10 16:02:57 -07:00
2013-08-06 03:00:31 -07:00
if size , err = l . MetricSamples . Size ( ) ; err != nil {
2013-05-10 16:02:57 -07:00
return 0 , err
2013-05-10 07:41:02 -07:00
}
2013-05-10 16:02:57 -07:00
total += size
2013-05-10 07:41:02 -07:00
2013-05-10 16:02:57 -07:00
return total , nil
2013-05-10 07:41:02 -07:00
}
2013-05-14 02:21:27 -07:00
2013-08-05 08:31:49 -07:00
func ( l * LevelDBMetricPersistence ) States ( ) raw . DatabaseStates {
return raw . DatabaseStates {
2013-08-05 00:25:47 -07:00
l . CurationRemarks . State ( ) ,
l . fingerprintToMetrics . State ( ) ,
l . labelNameToFingerprints . State ( ) ,
l . labelSetToFingerprints . State ( ) ,
l . MetricHighWatermarks . State ( ) ,
l . metricMembershipIndex . State ( ) ,
l . MetricSamples . State ( ) ,
}
2013-05-14 02:21:27 -07:00
}