2013-02-07 02:38:01 -08:00
// Copyright 2013 Prometheus Team
2012-12-09 07:27:12 -08:00
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
2013-02-08 09:03:26 -08:00
package metric
2012-12-09 07:27:12 -08:00
import (
2013-02-08 09:03:26 -08:00
"flag"
2013-04-28 05:47:43 -07:00
"fmt"
2013-02-08 09:03:26 -08:00
"sort"
2013-03-04 11:43:07 -08:00
"sync"
2012-12-12 03:53:34 -08:00
"time"
2012-12-09 07:27:12 -08:00
2013-06-08 01:27:44 -07:00
"code.google.com/p/goprotobuf/proto"
2013-08-12 08:18:02 -07:00
"github.com/golang/glog"
2013-06-08 01:27:44 -07:00
2013-06-25 05:02:27 -07:00
clientmodel "github.com/prometheus/client_golang/model"
2013-06-08 01:27:44 -07:00
"github.com/prometheus/prometheus/storage"
2013-08-05 08:31:49 -07:00
"github.com/prometheus/prometheus/storage/raw"
2013-06-08 01:27:44 -07:00
"github.com/prometheus/prometheus/storage/raw/leveldb"
"github.com/prometheus/prometheus/utility"
2013-08-12 08:18:02 -07:00
dto "github.com/prometheus/prometheus/model/generated"
2013-02-08 09:03:26 -08:00
)
2013-06-08 01:27:44 -07:00
const sortConcurrency = 2
2013-02-08 09:03:26 -08:00
type LevelDBMetricPersistence struct {
2013-08-06 03:00:31 -07:00
CurationRemarks CurationRemarker
2013-08-12 15:36:12 -07:00
FingerprintToMetrics FingerprintMetricIndex
LabelNameToFingerprints LabelNameFingerprintIndex
LabelSetToFingerprints LabelPairFingerprintIndex
2013-08-05 00:25:47 -07:00
MetricHighWatermarks HighWatermarker
2013-08-12 15:36:12 -07:00
MetricMembershipIndex MetricMembershipIndex
2013-08-07 03:07:35 -07:00
Indexer MetricIndexer
MetricSamples * leveldb . LevelDBPersistence
// The remaining indices will be replaced with generalized interface resolvers:
//
// type FingerprintResolver interface {
// GetFingerprintForMetric(clientmodel.Metric) (*clientmodel.Fingerprint, bool, error)
// GetFingerprintsForLabelName(clientmodel.LabelName) (clientmodel.Fingerprints, bool, error)
// GetFingerprintsForLabelSet(LabelPair) (clientmodel.Fingerprints, bool, error)
// }
// type MetricResolver interface {
// GetMetricsForFingerprint(clientmodel.Fingerprints) (FingerprintMetricMapping, bool, error)
// }
2013-02-08 09:03:26 -08:00
}
var (
2013-05-21 07:11:35 -07:00
leveldbChunkSize = flag . Int ( "leveldbChunkSize" , 200 , "Maximum number of samples stored under one key." )
2013-02-08 09:03:26 -08:00
// These flag values are back of the envelope, though they seem sensible.
// Please re-evaluate based on your own needs.
2013-06-24 03:09:16 -07:00
curationRemarksCacheSize = flag . Int ( "curationRemarksCacheSize" , 5 * 1024 * 1024 , "The size for the curation remarks cache (bytes)." )
fingerprintsToLabelPairCacheSize = flag . Int ( "fingerprintsToLabelPairCacheSizeBytes" , 25 * 1024 * 1024 , "The size for the fingerprint to label pair index (bytes)." )
highWatermarkCacheSize = flag . Int ( "highWatermarksByFingerprintSizeBytes" , 5 * 1024 * 1024 , "The size for the metric high watermarks (bytes)." )
labelNameToFingerprintsCacheSize = flag . Int ( "labelNameToFingerprintsCacheSizeBytes" , 25 * 1024 * 1024 , "The size for the label name to metric fingerprint index (bytes)." )
labelPairToFingerprintsCacheSize = flag . Int ( "labelPairToFingerprintsCacheSizeBytes" , 25 * 1024 * 1024 , "The size for the label pair to metric fingerprint index (bytes)." )
metricMembershipIndexCacheSize = flag . Int ( "metricMembershipCacheSizeBytes" , 5 * 1024 * 1024 , "The size for the metric membership index (bytes)." )
samplesByFingerprintCacheSize = flag . Int ( "samplesByFingerprintCacheSizeBytes" , 50 * 1024 * 1024 , "The size for the samples database (bytes)." )
2013-02-08 09:03:26 -08:00
)
type leveldbOpener func ( )
2013-08-03 09:46:02 -07:00
type errorCloser interface {
Close ( ) error
}
type closer interface {
2013-04-01 04:22:38 -07:00
Close ( )
}
2013-02-08 09:03:26 -08:00
2013-04-01 04:22:38 -07:00
func ( l * LevelDBMetricPersistence ) Close ( ) {
2013-08-03 09:46:02 -07:00
var persistences = [ ] interface { } {
2013-05-02 03:49:13 -07:00
l . CurationRemarks ,
2013-08-12 15:36:12 -07:00
l . FingerprintToMetrics ,
l . LabelNameToFingerprints ,
l . LabelSetToFingerprints ,
2013-05-02 03:49:13 -07:00
l . MetricHighWatermarks ,
2013-08-12 15:36:12 -07:00
l . MetricMembershipIndex ,
2013-05-02 03:49:13 -07:00
l . MetricSamples ,
2013-02-08 09:03:26 -08:00
}
2013-04-01 04:22:38 -07:00
closerGroup := sync . WaitGroup { }
2013-02-08 09:03:26 -08:00
2013-08-03 09:46:02 -07:00
for _ , c := range persistences {
2013-04-01 04:22:38 -07:00
closerGroup . Add ( 1 )
2013-08-03 09:46:02 -07:00
go func ( c interface { } ) {
if c != nil {
switch closer := c . ( type ) {
case closer :
closer . Close ( )
case errorCloser :
if err := closer . Close ( ) ; err != nil {
2013-08-12 09:22:48 -07:00
glog . Error ( "Error closing persistence: " , err )
2013-08-03 09:46:02 -07:00
}
}
2013-04-24 02:51:40 -07:00
}
2013-04-01 04:22:38 -07:00
closerGroup . Done ( )
2013-08-03 09:46:02 -07:00
} ( c )
2013-02-08 09:03:26 -08:00
}
2013-04-01 04:22:38 -07:00
closerGroup . Wait ( )
2013-02-08 09:03:26 -08:00
}
2013-04-28 05:47:43 -07:00
func NewLevelDBMetricPersistence ( baseDirectory string ) ( * LevelDBMetricPersistence , error ) {
workers := utility . NewUncertaintyGroup ( 7 )
2013-02-08 09:03:26 -08:00
2013-06-25 05:02:27 -07:00
emission := new ( LevelDBMetricPersistence )
2013-02-08 09:03:26 -08:00
var subsystemOpeners = [ ] struct {
name string
opener leveldbOpener
} {
{
"Label Names and Value Pairs by Fingerprint" ,
func ( ) {
var err error
2013-08-12 15:36:12 -07:00
emission . FingerprintToMetrics , err = NewLevelDBFingerprintMetricIndex ( LevelDBFingerprintMetricIndexOptions {
2013-08-03 09:46:02 -07:00
LevelDBOptions : leveldb . LevelDBOptions {
2013-08-05 00:25:47 -07:00
Name : "Metrics by Fingerprint" ,
Purpose : "Index" ,
2013-08-03 09:46:02 -07:00
Path : baseDirectory + "/label_name_and_value_pairs_by_fingerprint" ,
CacheSizeBytes : * fingerprintsToLabelPairCacheSize ,
} ,
} )
2013-04-28 05:47:43 -07:00
workers . MayFail ( err )
2013-02-08 09:03:26 -08:00
} ,
} ,
{
"Samples by Fingerprint" ,
func ( ) {
var err error
2013-08-10 08:02:28 -07:00
emission . MetricSamples , err = leveldb . NewLevelDBPersistence ( leveldb . LevelDBOptions {
2013-08-05 00:25:47 -07:00
Name : "Samples" ,
Purpose : "Timeseries" ,
2013-08-03 08:25:03 -07:00
Path : baseDirectory + "/samples_by_fingerprint" ,
CacheSizeBytes : * fingerprintsToLabelPairCacheSize ,
2013-08-10 08:02:28 -07:00
} )
2013-04-28 05:47:43 -07:00
workers . MayFail ( err )
2013-02-08 09:03:26 -08:00
} ,
} ,
2013-03-14 19:24:28 -07:00
{
"High Watermarks by Fingerprint" ,
func ( ) {
var err error
2013-08-10 08:02:28 -07:00
emission . MetricHighWatermarks , err = NewLevelDBHighWatermarker ( LevelDBHighWatermarkerOptions {
2013-08-05 00:25:47 -07:00
LevelDBOptions : leveldb . LevelDBOptions {
Name : "High Watermarks" ,
Purpose : "The youngest sample in the database per metric." ,
Path : baseDirectory + "/high_watermarks_by_fingerprint" ,
CacheSizeBytes : * highWatermarkCacheSize ,
} } )
2013-04-28 05:47:43 -07:00
workers . MayFail ( err )
2013-03-14 19:24:28 -07:00
} ,
} ,
2013-02-08 09:03:26 -08:00
{
"Fingerprints by Label Name" ,
func ( ) {
var err error
2013-08-12 15:36:12 -07:00
emission . LabelNameToFingerprints , err = NewLevelLabelNameFingerprintIndex ( LevelDBLabelNameFingerprintIndexOptions {
2013-08-03 09:46:02 -07:00
LevelDBOptions : leveldb . LevelDBOptions {
2013-08-05 00:25:47 -07:00
Name : "Fingerprints by Label Name" ,
Purpose : "Index" ,
2013-08-03 09:46:02 -07:00
Path : baseDirectory + "/fingerprints_by_label_name" ,
CacheSizeBytes : * labelNameToFingerprintsCacheSize ,
} ,
} )
2013-04-28 05:47:43 -07:00
workers . MayFail ( err )
2013-02-08 09:03:26 -08:00
} ,
} ,
{
"Fingerprints by Label Name and Value Pair" ,
func ( ) {
var err error
2013-08-12 15:36:12 -07:00
emission . LabelSetToFingerprints , err = NewLevelDBLabelSetFingerprintIndex ( LevelDBLabelSetFingerprintIndexOptions {
2013-08-03 09:46:02 -07:00
LevelDBOptions : leveldb . LevelDBOptions {
2013-08-05 00:25:47 -07:00
Name : "Fingerprints by Label Pair" ,
Purpose : "Index" ,
2013-08-03 09:46:02 -07:00
Path : baseDirectory + "/fingerprints_by_label_name_and_value_pair" ,
CacheSizeBytes : * labelPairToFingerprintsCacheSize ,
} ,
} )
2013-04-28 05:47:43 -07:00
workers . MayFail ( err )
2013-02-08 09:03:26 -08:00
} ,
} ,
{
"Metric Membership Index" ,
func ( ) {
var err error
2013-08-12 15:36:12 -07:00
emission . MetricMembershipIndex , err = NewLevelDBMetricMembershipIndex (
2013-08-10 08:02:28 -07:00
LevelDBMetricMembershipIndexOptions {
2013-08-03 09:46:02 -07:00
LevelDBOptions : leveldb . LevelDBOptions {
2013-08-05 00:25:47 -07:00
Name : "Metric Membership" ,
Purpose : "Index" ,
2013-08-03 09:46:02 -07:00
Path : baseDirectory + "/metric_membership_index" ,
CacheSizeBytes : * metricMembershipIndexCacheSize ,
} ,
} )
2013-04-28 05:47:43 -07:00
workers . MayFail ( err )
} ,
} ,
{
"Sample Curation Remarks" ,
func ( ) {
var err error
2013-08-10 08:02:28 -07:00
emission . CurationRemarks , err = NewLevelDBCurationRemarker ( LevelDBCurationRemarkerOptions {
2013-08-06 03:00:31 -07:00
LevelDBOptions : leveldb . LevelDBOptions {
Name : "Sample Curation Remarks" ,
Purpose : "Ledger of Progress for Various Curators" ,
Path : baseDirectory + "/curation_remarks" ,
CacheSizeBytes : * curationRemarksCacheSize ,
} ,
} )
2013-04-28 05:47:43 -07:00
workers . MayFail ( err )
2013-02-08 09:03:26 -08:00
} ,
} ,
}
for _ , subsystem := range subsystemOpeners {
opener := subsystem . opener
go opener ( )
}
2013-04-28 05:47:43 -07:00
if ! workers . Wait ( ) {
for _ , err := range workers . Errors ( ) {
2013-08-12 09:22:48 -07:00
glog . Error ( "Could not open storage: " , err )
2013-02-08 09:03:26 -08:00
}
2013-04-28 05:47:43 -07:00
return nil , fmt . Errorf ( "Unable to open metric persistence." )
2013-02-08 09:03:26 -08:00
}
2013-08-07 03:07:35 -07:00
emission . Indexer = & TotalIndexer {
2013-08-12 15:36:12 -07:00
FingerprintToMetric : emission . FingerprintToMetrics ,
LabelNameToFingerprint : emission . LabelNameToFingerprints ,
LabelPairToFingerprint : emission . LabelSetToFingerprints ,
MetricMembership : emission . MetricMembershipIndex ,
2013-08-07 03:07:35 -07:00
}
2013-04-28 05:47:43 -07:00
return emission , nil
2013-02-08 09:03:26 -08:00
}
2013-06-25 05:02:27 -07:00
func ( l * LevelDBMetricPersistence ) AppendSample ( sample * clientmodel . Sample ) ( err error ) {
2013-04-05 09:03:45 -07:00
defer func ( begin time . Time ) {
2013-03-11 14:21:25 -07:00
duration := time . Since ( begin )
2013-02-08 09:03:26 -08:00
2013-03-01 09:51:36 -08:00
recordOutcome ( duration , err , map [ string ] string { operation : appendSample , result : success } , map [ string ] string { operation : appendSample , result : failure } )
2013-04-05 09:03:45 -07:00
} ( time . Now ( ) )
2013-02-08 09:03:26 -08:00
2013-06-25 05:02:27 -07:00
err = l . AppendSamples ( clientmodel . Samples { sample } )
2013-02-08 09:03:26 -08:00
return
}
2013-03-14 15:42:28 -07:00
// groupByFingerprint collects all of the provided samples, groups them
// together by their respective metric fingerprint, and finally sorts
// them chronologically.
2013-06-25 05:02:27 -07:00
func groupByFingerprint ( samples clientmodel . Samples ) map [ clientmodel . Fingerprint ] clientmodel . Samples {
fingerprintToSamples := map [ clientmodel . Fingerprint ] clientmodel . Samples { }
2013-02-08 09:03:26 -08:00
for _ , sample := range samples {
2013-06-25 05:02:27 -07:00
fingerprint := & clientmodel . Fingerprint { }
fingerprint . LoadFromMetric ( sample . Metric )
samples := fingerprintToSamples [ * fingerprint ]
2013-02-08 09:03:26 -08:00
samples = append ( samples , sample )
2013-06-25 05:02:27 -07:00
fingerprintToSamples [ * fingerprint ] = samples
2013-02-08 09:03:26 -08:00
}
2013-05-21 07:11:35 -07:00
sortingSemaphore := make ( chan bool , sortConcurrency )
doneSorting := sync . WaitGroup { }
2013-03-14 15:42:28 -07:00
2013-02-08 09:03:26 -08:00
for _ , samples := range fingerprintToSamples {
2013-03-04 11:43:07 -08:00
doneSorting . Add ( 1 )
2013-03-14 15:42:28 -07:00
2013-06-25 05:02:27 -07:00
sortingSemaphore <- true
go func ( samples clientmodel . Samples ) {
2013-02-08 09:03:26 -08:00
sort . Sort ( samples )
2013-06-25 05:02:27 -07:00
<- sortingSemaphore
2013-03-04 11:43:07 -08:00
doneSorting . Done ( )
2013-02-08 09:03:26 -08:00
} ( samples )
}
2013-03-04 11:43:07 -08:00
doneSorting . Wait ( )
2013-02-08 09:03:26 -08:00
2013-03-14 15:42:28 -07:00
return fingerprintToSamples
}
2013-03-07 11:01:32 -08:00
2013-06-25 05:02:27 -07:00
func ( l * LevelDBMetricPersistence ) refreshHighWatermarks ( groups map [ clientmodel . Fingerprint ] clientmodel . Samples ) ( err error ) {
2013-04-05 09:03:45 -07:00
defer func ( begin time . Time ) {
2013-03-14 19:24:28 -07:00
duration := time . Since ( begin )
recordOutcome ( duration , err , map [ string ] string { operation : refreshHighWatermarks , result : success } , map [ string ] string { operation : refreshHighWatermarks , result : failure } )
2013-04-05 09:03:45 -07:00
} ( time . Now ( ) )
2013-03-14 19:24:28 -07:00
2013-08-05 00:25:47 -07:00
b := FingerprintHighWatermarkMapping { }
for fp , ss := range groups {
if len ( ss ) == 0 {
2013-06-08 01:27:44 -07:00
continue
}
2013-03-14 19:24:28 -07:00
2013-08-05 00:25:47 -07:00
b [ fp ] = ss [ len ( ss ) - 1 ] . Timestamp
2013-03-14 19:24:28 -07:00
}
2013-08-05 00:25:47 -07:00
return l . MetricHighWatermarks . UpdateBatch ( b )
2013-03-14 19:24:28 -07:00
}
2013-06-25 05:02:27 -07:00
func ( l * LevelDBMetricPersistence ) AppendSamples ( samples clientmodel . Samples ) ( err error ) {
2013-04-05 09:03:45 -07:00
defer func ( begin time . Time ) {
2013-03-14 15:42:28 -07:00
duration := time . Since ( begin )
recordOutcome ( duration , err , map [ string ] string { operation : appendSamples , result : success } , map [ string ] string { operation : appendSamples , result : failure } )
2013-04-05 09:03:45 -07:00
} ( time . Now ( ) )
2013-03-14 15:42:28 -07:00
2013-05-21 07:11:35 -07:00
fingerprintToSamples := groupByFingerprint ( samples )
indexErrChan := make ( chan error , 1 )
watermarkErrChan := make ( chan error , 1 )
2013-03-14 15:42:28 -07:00
2013-06-25 05:02:27 -07:00
go func ( groups map [ clientmodel . Fingerprint ] clientmodel . Samples ) {
2013-08-03 09:46:02 -07:00
metrics := FingerprintMetricMapping { }
2013-03-14 16:55:50 -07:00
for fingerprint , samples := range groups {
metrics [ fingerprint ] = samples [ 0 ] . Metric
}
2013-08-07 03:07:35 -07:00
indexErrChan <- l . Indexer . IndexMetrics ( metrics )
2013-03-14 15:42:28 -07:00
} ( fingerprintToSamples )
2013-06-25 05:02:27 -07:00
go func ( groups map [ clientmodel . Fingerprint ] clientmodel . Samples ) {
2013-03-14 19:24:28 -07:00
watermarkErrChan <- l . refreshHighWatermarks ( groups )
} ( fingerprintToSamples )
2013-03-14 18:09:19 -07:00
samplesBatch := leveldb . NewBatch ( )
defer samplesBatch . Close ( )
2013-03-14 15:42:28 -07:00
2013-09-05 01:14:34 -07:00
key := new ( SampleKey )
keyDto := new ( dto . SampleKey )
value := new ( dto . SampleValueSeries )
2013-03-14 18:09:19 -07:00
for fingerprint , group := range fingerprintToSamples {
for {
2013-09-05 01:14:34 -07:00
value . Reset ( )
2013-03-14 18:09:19 -07:00
lengthOfGroup := len ( group )
2013-03-14 15:42:28 -07:00
2013-03-14 18:09:19 -07:00
if lengthOfGroup == 0 {
break
}
2013-03-14 15:42:28 -07:00
2013-03-14 18:09:19 -07:00
take := * leveldbChunkSize
if lengthOfGroup < take {
take = lengthOfGroup
}
2013-03-14 15:42:28 -07:00
2013-03-14 18:09:19 -07:00
chunk := group [ 0 : take ]
group = group [ take : lengthOfGroup ]
2013-03-14 15:42:28 -07:00
2013-09-05 01:14:34 -07:00
key . Fingerprint = & fingerprint
key . FirstTimestamp = chunk [ 0 ] . Timestamp
key . LastTimestamp = chunk [ take - 1 ] . Timestamp
key . SampleCount = uint32 ( take )
2013-03-14 15:42:28 -07:00
2013-03-14 18:09:19 -07:00
for _ , sample := range chunk {
2013-09-05 01:14:34 -07:00
// XXX: Candidate for allocation reduction.
2013-03-14 18:09:19 -07:00
value . Value = append ( value . Value , & dto . SampleValueSeries_Value {
Timestamp : proto . Int64 ( sample . Timestamp . Unix ( ) ) ,
2013-06-25 05:02:27 -07:00
Value : proto . Float64 ( float64 ( sample . Value ) ) ,
2013-03-14 18:09:19 -07:00
} )
2013-03-14 15:42:28 -07:00
}
2013-03-14 18:09:19 -07:00
2013-09-05 01:14:34 -07:00
key . Dump ( keyDto )
samplesBatch . Put ( keyDto , value )
2013-03-14 15:42:28 -07:00
}
2013-03-14 18:09:19 -07:00
}
2013-03-14 15:42:28 -07:00
2013-05-02 03:49:13 -07:00
err = l . MetricSamples . Commit ( samplesBatch )
2013-03-14 18:09:19 -07:00
if err != nil {
2013-04-05 09:03:45 -07:00
return
2013-03-14 18:09:19 -07:00
}
2013-03-14 15:42:28 -07:00
err = <- indexErrChan
if err != nil {
2013-04-05 09:03:45 -07:00
return
2013-03-14 15:42:28 -07:00
}
2013-02-08 09:03:26 -08:00
2013-03-14 19:24:28 -07:00
err = <- watermarkErrChan
if err != nil {
2013-04-05 09:03:45 -07:00
return
2013-03-14 19:24:28 -07:00
}
2013-02-08 09:03:26 -08:00
return
}
2013-06-25 05:02:27 -07:00
func extractSampleKey ( i leveldb . Iterator ) ( * SampleKey , error ) {
2013-04-21 10:16:15 -07:00
k := & dto . SampleKey { }
2013-08-29 06:15:22 -07:00
if err := i . Key ( k ) ; err != nil {
2013-06-25 05:02:27 -07:00
return nil , err
2013-03-06 18:16:20 -08:00
}
2013-06-25 05:02:27 -07:00
key := & SampleKey { }
key . Load ( k )
2012-12-25 04:50:36 -08:00
2013-06-25 05:02:27 -07:00
return key , nil
2012-12-25 04:50:36 -08:00
}
2013-06-25 05:02:27 -07:00
func extractSampleValues ( i leveldb . Iterator ) ( Values , error ) {
2013-04-22 04:30:16 -07:00
v := & dto . SampleValueSeries { }
2013-08-29 06:15:22 -07:00
if err := i . Value ( v ) ; err != nil {
2013-06-25 05:02:27 -07:00
return nil , err
2012-12-25 04:50:36 -08:00
}
2013-06-25 05:02:27 -07:00
return NewValuesFromDTO ( v ) , nil
2012-12-25 04:50:36 -08:00
}
2013-08-03 09:46:02 -07:00
func ( l * LevelDBMetricPersistence ) hasIndexMetric ( m clientmodel . Metric ) ( value bool , err error ) {
2013-04-05 09:03:45 -07:00
defer func ( begin time . Time ) {
2013-03-11 14:21:25 -07:00
duration := time . Since ( begin )
2013-01-23 08:18:45 -08:00
2013-03-01 09:51:36 -08:00
recordOutcome ( duration , err , map [ string ] string { operation : hasIndexMetric , result : success } , map [ string ] string { operation : hasIndexMetric , result : failure } )
2013-04-05 09:03:45 -07:00
} ( time . Now ( ) )
2013-01-23 08:18:45 -08:00
2013-08-12 15:36:12 -07:00
return l . MetricMembershipIndex . Has ( m )
2012-12-09 07:27:12 -08:00
}
2013-08-03 09:46:02 -07:00
func ( l * LevelDBMetricPersistence ) HasLabelPair ( p * LabelPair ) ( value bool , err error ) {
2013-04-05 09:03:45 -07:00
defer func ( begin time . Time ) {
2013-03-11 14:21:25 -07:00
duration := time . Since ( begin )
2013-01-23 08:18:45 -08:00
2013-03-01 09:51:36 -08:00
recordOutcome ( duration , err , map [ string ] string { operation : hasLabelPair , result : success } , map [ string ] string { operation : hasLabelPair , result : failure } )
2013-04-05 09:03:45 -07:00
} ( time . Now ( ) )
2013-01-23 08:18:45 -08:00
2013-08-12 15:36:12 -07:00
return l . LabelSetToFingerprints . Has ( p )
2012-12-09 07:27:12 -08:00
}
2013-08-03 09:46:02 -07:00
func ( l * LevelDBMetricPersistence ) HasLabelName ( n clientmodel . LabelName ) ( value bool , err error ) {
2013-04-05 09:03:45 -07:00
defer func ( begin time . Time ) {
2013-03-11 14:21:25 -07:00
duration := time . Since ( begin )
2013-01-23 08:18:45 -08:00
2013-03-01 09:51:36 -08:00
recordOutcome ( duration , err , map [ string ] string { operation : hasLabelName , result : success } , map [ string ] string { operation : hasLabelName , result : failure } )
2013-04-05 09:03:45 -07:00
} ( time . Now ( ) )
2013-01-23 08:18:45 -08:00
2013-08-12 15:36:12 -07:00
value , err = l . LabelNameToFingerprints . Has ( n )
2013-01-23 08:18:45 -08:00
return
2012-12-09 07:27:12 -08:00
}
2013-06-25 05:02:27 -07:00
func ( l * LevelDBMetricPersistence ) GetFingerprintsForLabelSet ( labelSet clientmodel . LabelSet ) ( fps clientmodel . Fingerprints , err error ) {
2013-04-05 09:03:45 -07:00
defer func ( begin time . Time ) {
2013-03-11 14:21:25 -07:00
duration := time . Since ( begin )
2012-12-09 07:27:12 -08:00
2013-03-01 09:51:36 -08:00
recordOutcome ( duration , err , map [ string ] string { operation : getFingerprintsForLabelSet , result : success } , map [ string ] string { operation : getFingerprintsForLabelSet , result : failure } )
2013-04-05 09:03:45 -07:00
} ( time . Now ( ) )
2012-12-09 07:27:12 -08:00
2013-01-13 02:15:01 -08:00
sets := [ ] utility . Set { }
2013-06-25 05:02:27 -07:00
for name , value := range labelSet {
2013-08-12 15:36:12 -07:00
fps , _ , err := l . LabelSetToFingerprints . Lookup ( & LabelPair {
2013-08-03 09:46:02 -07:00
Name : name ,
Value : value ,
} )
2012-12-25 04:50:36 -08:00
if err != nil {
2013-08-03 09:46:02 -07:00
return nil , err
2013-06-08 01:27:44 -07:00
}
2012-12-25 04:50:36 -08:00
2013-01-13 02:15:01 -08:00
set := utility . Set { }
2013-08-03 09:46:02 -07:00
for _ , fp := range fps {
2013-05-17 03:58:15 -07:00
set . Add ( * fp )
2012-12-09 07:27:12 -08:00
}
2013-01-13 02:15:01 -08:00
sets = append ( sets , set )
}
numberOfSets := len ( sets )
if numberOfSets == 0 {
2013-06-25 05:02:27 -07:00
return nil , nil
2013-01-13 02:15:01 -08:00
}
base := sets [ 0 ]
for i := 1 ; i < numberOfSets ; i ++ {
base = base . Intersection ( sets [ i ] )
}
for _ , e := range base . Elements ( ) {
2013-06-25 05:02:27 -07:00
fingerprint := e . ( clientmodel . Fingerprint )
2013-05-17 03:58:15 -07:00
fps = append ( fps , & fingerprint )
2012-12-09 07:27:12 -08:00
}
2013-06-25 05:02:27 -07:00
return fps , nil
2012-12-09 07:27:12 -08:00
}
2013-06-25 05:02:27 -07:00
func ( l * LevelDBMetricPersistence ) GetFingerprintsForLabelName ( labelName clientmodel . LabelName ) ( fps clientmodel . Fingerprints , err error ) {
2013-04-05 09:03:45 -07:00
defer func ( begin time . Time ) {
2013-03-11 14:21:25 -07:00
duration := time . Since ( begin )
2012-12-09 07:27:12 -08:00
2013-03-01 09:51:36 -08:00
recordOutcome ( duration , err , map [ string ] string { operation : getFingerprintsForLabelName , result : success } , map [ string ] string { operation : getFingerprintsForLabelName , result : failure } )
2013-04-05 09:03:45 -07:00
} ( time . Now ( ) )
2012-12-09 07:27:12 -08:00
2013-08-03 09:46:02 -07:00
// TODO(matt): Update signature to work with ok.
2013-08-12 15:36:12 -07:00
fps , _ , err = l . LabelNameToFingerprints . Lookup ( labelName )
2012-12-25 04:50:36 -08:00
2013-08-03 09:46:02 -07:00
return fps , err
2012-12-09 07:27:12 -08:00
}
2013-06-25 05:02:27 -07:00
func ( l * LevelDBMetricPersistence ) GetMetricForFingerprint ( f * clientmodel . Fingerprint ) ( m clientmodel . Metric , err error ) {
2013-04-05 09:03:45 -07:00
defer func ( begin time . Time ) {
2013-03-11 14:21:25 -07:00
duration := time . Since ( begin )
2012-12-25 04:50:36 -08:00
2013-03-01 09:51:36 -08:00
recordOutcome ( duration , err , map [ string ] string { operation : getMetricForFingerprint , result : success } , map [ string ] string { operation : getMetricForFingerprint , result : failure } )
2013-04-05 09:03:45 -07:00
} ( time . Now ( ) )
2012-12-25 04:50:36 -08:00
2013-08-03 09:46:02 -07:00
// TODO(matt): Update signature to work with ok.
2013-08-12 15:36:12 -07:00
m , _ , err = l . FingerprintToMetrics . Lookup ( f )
2012-12-12 03:53:34 -08:00
2013-06-08 01:27:44 -07:00
return m , nil
2012-12-12 03:53:34 -08:00
}
2013-06-25 05:02:27 -07:00
func ( l * LevelDBMetricPersistence ) GetValueAtTime ( f * clientmodel . Fingerprint , t time . Time ) Values {
2013-04-18 07:10:52 -07:00
panic ( "Not implemented" )
2012-12-12 03:53:34 -08:00
}
2013-06-25 05:02:27 -07:00
func ( l * LevelDBMetricPersistence ) GetBoundaryValues ( f * clientmodel . Fingerprint , i Interval ) Values {
2013-04-18 07:10:52 -07:00
panic ( "Not implemented" )
2012-12-19 11:34:54 -08:00
}
2013-06-25 05:02:27 -07:00
func ( l * LevelDBMetricPersistence ) GetRangeValues ( f * clientmodel . Fingerprint , i Interval ) Values {
2013-04-18 07:10:52 -07:00
panic ( "Not implemented" )
2012-12-12 03:53:34 -08:00
}
2013-02-06 08:06:39 -08:00
type MetricKeyDecoder struct { }
func ( d * MetricKeyDecoder ) DecodeKey ( in interface { } ) ( out interface { } , err error ) {
2013-03-26 06:46:02 -07:00
unmarshaled := dto . LabelPair { }
err = proto . Unmarshal ( in . ( [ ] byte ) , & unmarshaled )
2013-02-06 08:06:39 -08:00
if err != nil {
return
}
2013-02-08 09:03:26 -08:00
2013-06-25 05:02:27 -07:00
out = LabelPair {
Name : clientmodel . LabelName ( * unmarshaled . Name ) ,
Value : clientmodel . LabelValue ( * unmarshaled . Value ) ,
2013-03-26 06:46:02 -07:00
}
2013-02-08 09:03:26 -08:00
return
2013-02-06 08:06:39 -08:00
}
func ( d * MetricKeyDecoder ) DecodeValue ( in interface { } ) ( out interface { } , err error ) {
return
}
2013-03-26 03:45:56 -07:00
type LabelNameFilter struct {
2013-06-25 05:02:27 -07:00
labelName clientmodel . LabelName
2013-03-26 03:45:56 -07:00
}
2013-02-06 08:06:39 -08:00
2013-03-26 06:46:02 -07:00
func ( f LabelNameFilter ) Filter ( key , value interface { } ) ( filterResult storage . FilterResult ) {
2013-06-25 05:02:27 -07:00
labelPair , ok := key . ( LabelPair )
2013-03-26 06:46:02 -07:00
if ok && labelPair . Name == f . labelName {
2013-02-07 02:38:01 -08:00
return storage . ACCEPT
}
return storage . SKIP
2013-02-06 08:06:39 -08:00
}
2013-03-26 03:45:56 -07:00
type CollectLabelValuesOp struct {
2013-06-25 05:02:27 -07:00
labelValues [ ] clientmodel . LabelValue
2013-02-06 08:06:39 -08:00
}
2013-03-26 03:45:56 -07:00
func ( op * CollectLabelValuesOp ) Operate ( key , value interface { } ) ( err * storage . OperatorError ) {
2013-06-25 05:02:27 -07:00
labelPair := key . ( LabelPair )
op . labelValues = append ( op . labelValues , clientmodel . LabelValue ( labelPair . Value ) )
2013-02-06 08:06:39 -08:00
return
}
2013-06-25 05:02:27 -07:00
func ( l * LevelDBMetricPersistence ) GetAllValuesForLabel ( labelName clientmodel . LabelName ) ( values clientmodel . LabelValues , err error ) {
2013-03-26 03:45:56 -07:00
filter := & LabelNameFilter {
labelName : labelName ,
}
labelValuesOp := & CollectLabelValuesOp { }
2013-02-06 08:06:39 -08:00
2013-08-12 15:36:12 -07:00
_ , err = l . LabelSetToFingerprints . ForEach ( & MetricKeyDecoder { } , filter , labelValuesOp )
2013-02-06 08:06:39 -08:00
if err != nil {
return
}
2013-03-26 03:45:56 -07:00
values = labelValuesOp . labelValues
2013-02-06 08:06:39 -08:00
return
}
2013-02-08 09:03:26 -08:00
2013-08-06 03:00:31 -07:00
// Prune compacts each database's keyspace serially.
2013-05-10 07:41:02 -07:00
//
// Beware that it would probably be imprudent to run this on a live user-facing
// server due to latency implications.
2013-08-05 08:31:49 -07:00
func ( l * LevelDBMetricPersistence ) Prune ( ) {
l . CurationRemarks . Prune ( )
2013-08-12 15:36:12 -07:00
l . FingerprintToMetrics . Prune ( )
l . LabelNameToFingerprints . Prune ( )
l . LabelSetToFingerprints . Prune ( )
2013-08-05 08:31:49 -07:00
l . MetricHighWatermarks . Prune ( )
2013-08-12 15:36:12 -07:00
l . MetricMembershipIndex . Prune ( )
2013-08-05 08:31:49 -07:00
l . MetricSamples . Prune ( )
2013-05-10 16:02:57 -07:00
}
2013-08-06 03:00:31 -07:00
func ( l * LevelDBMetricPersistence ) Sizes ( ) ( total uint64 , err error ) {
2013-05-10 16:02:57 -07:00
size := uint64 ( 0 )
2013-08-06 03:00:31 -07:00
if size , _ , err = l . CurationRemarks . Size ( ) ; err != nil {
2013-05-10 16:02:57 -07:00
return 0 , err
2013-05-10 07:41:02 -07:00
}
2013-05-10 16:02:57 -07:00
total += size
2013-08-12 15:36:12 -07:00
if size , _ , err = l . FingerprintToMetrics . Size ( ) ; err != nil {
2013-08-05 00:25:47 -07:00
return 0 , err
}
total += size
2013-05-10 16:02:57 -07:00
2013-08-12 15:36:12 -07:00
if size , _ , err = l . LabelNameToFingerprints . Size ( ) ; err != nil {
2013-08-05 00:25:47 -07:00
return 0 , err
}
total += size
2013-05-10 16:02:57 -07:00
2013-08-12 15:36:12 -07:00
if size , _ , err = l . LabelSetToFingerprints . Size ( ) ; err != nil {
2013-08-05 00:25:47 -07:00
return 0 , err
}
total += size
2013-05-10 16:02:57 -07:00
2013-08-05 00:25:47 -07:00
if size , _ , err = l . MetricHighWatermarks . Size ( ) ; err != nil {
2013-05-10 16:02:57 -07:00
return 0 , err
2013-05-10 07:41:02 -07:00
}
2013-05-10 16:02:57 -07:00
total += size
2013-08-12 15:36:12 -07:00
if size , _ , err = l . MetricMembershipIndex . Size ( ) ; err != nil {
2013-08-05 00:25:47 -07:00
return 0 , err
}
total += size
2013-05-10 16:02:57 -07:00
2013-08-06 03:00:31 -07:00
if size , err = l . MetricSamples . Size ( ) ; err != nil {
2013-05-10 16:02:57 -07:00
return 0 , err
2013-05-10 07:41:02 -07:00
}
2013-05-10 16:02:57 -07:00
total += size
2013-05-10 07:41:02 -07:00
2013-05-10 16:02:57 -07:00
return total , nil
2013-05-10 07:41:02 -07:00
}
2013-05-14 02:21:27 -07:00
2013-08-05 08:31:49 -07:00
func ( l * LevelDBMetricPersistence ) States ( ) raw . DatabaseStates {
return raw . DatabaseStates {
2013-08-05 00:25:47 -07:00
l . CurationRemarks . State ( ) ,
2013-08-12 15:36:12 -07:00
l . FingerprintToMetrics . State ( ) ,
l . LabelNameToFingerprints . State ( ) ,
l . LabelSetToFingerprints . State ( ) ,
2013-08-05 00:25:47 -07:00
l . MetricHighWatermarks . State ( ) ,
2013-08-12 15:36:12 -07:00
l . MetricMembershipIndex . State ( ) ,
2013-08-05 00:25:47 -07:00
l . MetricSamples . State ( ) ,
}
2013-05-14 02:21:27 -07:00
}