2013-02-07 02:38:01 -08:00
// Copyright 2013 Prometheus Team
2012-12-09 07:27:12 -08:00
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
2013-02-08 09:03:26 -08:00
package metric
2012-12-09 07:27:12 -08:00
import (
2013-02-08 09:03:26 -08:00
"flag"
2013-04-28 05:47:43 -07:00
"fmt"
2013-02-08 09:03:26 -08:00
"sort"
2013-03-04 11:43:07 -08:00
"sync"
2012-12-12 03:53:34 -08:00
"time"
2012-12-09 07:27:12 -08:00
2013-06-08 01:27:44 -07:00
"code.google.com/p/goprotobuf/proto"
2013-08-12 08:18:02 -07:00
"github.com/golang/glog"
2013-06-08 01:27:44 -07:00
2013-06-25 05:02:27 -07:00
clientmodel "github.com/prometheus/client_golang/model"
2013-06-08 01:27:44 -07:00
"github.com/prometheus/prometheus/storage"
2013-08-05 08:31:49 -07:00
"github.com/prometheus/prometheus/storage/raw"
2013-06-08 01:27:44 -07:00
"github.com/prometheus/prometheus/storage/raw/leveldb"
"github.com/prometheus/prometheus/utility"
2013-08-12 08:18:02 -07:00
dto "github.com/prometheus/prometheus/model/generated"
2013-02-08 09:03:26 -08:00
)
2013-06-08 01:27:44 -07:00
// sortConcurrency is the capacity of the semaphore channel that
// groupByFingerprint uses to bound how many per-fingerprint sorts run at once.
const sortConcurrency = 2
2013-02-08 09:03:26 -08:00
// LevelDBMetricPersistence bundles the LevelDB-backed databases and indices
// that together make up the on-disk metric store. Instances are assembled by
// NewLevelDBMetricPersistence.
type LevelDBMetricPersistence struct {
2013-08-06 03:00:31 -07:00
// CurationRemarks is opened as the "Sample Curation Remarks" database.
CurationRemarks CurationRemarker
2013-08-12 15:36:12 -07:00
// FingerprintToMetrics resolves a fingerprint to its metric.
FingerprintToMetrics FingerprintMetricIndex
// LabelNameToFingerprints resolves a label name to fingerprints.
LabelNameToFingerprints LabelNameFingerprintIndex
// LabelSetToFingerprints resolves a label name/value pair to fingerprints.
LabelSetToFingerprints LabelPairFingerprintIndex
2013-08-05 00:25:47 -07:00
// MetricHighWatermarks is updated by refreshHighWatermarks with the
// timestamp of the last sample of each appended group.
MetricHighWatermarks HighWatermarker
2013-08-12 15:36:12 -07:00
// MetricMembershipIndex answers whether a metric has been indexed.
MetricMembershipIndex MetricMembershipIndex
2013-08-07 03:07:35 -07:00
// Indexer is a TotalIndexer composed from the four indices above; it is not
// a separate database.
Indexer MetricIndexer
// MetricSamples holds the sample chunks written by AppendSamples.
MetricSamples * leveldb . LevelDBPersistence
// The remaining indices will be replaced with generalized interface resolvers:
//
// type FingerprintResolver interface {
// GetFingerprintForMetric(clientmodel.Metric) (*clientmodel.Fingerprint, bool, error)
// GetFingerprintsForLabelName(clientmodel.LabelName) (clientmodel.Fingerprints, bool, error)
// GetFingerprintsForLabelSet(LabelPair) (clientmodel.Fingerprints, bool, error)
// }
// type MetricResolver interface {
// GetMetricsForFingerprint(clientmodel.Fingerprints) (FingerprintMetricMapping, bool, error)
// }
2013-02-08 09:03:26 -08:00
}
// Command-line flags that tune the LevelDB-backed metric storage. Each value
// is a pointer that is dereferenced where the corresponding database is used.
var (
2013-05-21 07:11:35 -07:00
// leveldbChunkSize caps how many samples AppendSamples writes under a
// single sample key.
leveldbChunkSize = flag . Int ( "leveldbChunkSize" , 200 , "Maximum number of samples stored under one key." )
2013-02-08 09:03:26 -08:00
// These flag values are back of the envelope, though they seem sensible.
// Please re-evaluate based on your own needs.
2013-06-24 03:09:16 -07:00
curationRemarksCacheSize = flag . Int ( "curationRemarksCacheSize" , 5 * 1024 * 1024 , "The size for the curation remarks cache (bytes)." )
fingerprintsToLabelPairCacheSize = flag . Int ( "fingerprintsToLabelPairCacheSizeBytes" , 25 * 1024 * 1024 , "The size for the fingerprint to label pair index (bytes)." )
highWatermarkCacheSize = flag . Int ( "highWatermarksByFingerprintSizeBytes" , 5 * 1024 * 1024 , "The size for the metric high watermarks (bytes)." )
labelNameToFingerprintsCacheSize = flag . Int ( "labelNameToFingerprintsCacheSizeBytes" , 25 * 1024 * 1024 , "The size for the label name to metric fingerprint index (bytes)." )
labelPairToFingerprintsCacheSize = flag . Int ( "labelPairToFingerprintsCacheSizeBytes" , 25 * 1024 * 1024 , "The size for the label pair to metric fingerprint index (bytes)." )
metricMembershipIndexCacheSize = flag . Int ( "metricMembershipCacheSizeBytes" , 5 * 1024 * 1024 , "The size for the metric membership index (bytes)." )
samplesByFingerprintCacheSize = flag . Int ( "samplesByFingerprintCacheSizeBytes" , 50 * 1024 * 1024 , "The size for the samples database (bytes)." )
2013-02-08 09:03:26 -08:00
)
// leveldbOpener is a parameterless callback that opens one storage subsystem;
// NewLevelDBMetricPersistence runs each opener in its own goroutine.
type leveldbOpener func ( )
2013-08-03 09:46:02 -07:00
// errorCloser is implemented by persistences whose Close reports an error.
type errorCloser interface {
Close ( ) error
}
// closer is implemented by persistences whose Close returns nothing.
type closer interface {
2013-04-01 04:22:38 -07:00
Close ( )
}
2013-02-08 09:03:26 -08:00
2013-04-01 04:22:38 -07:00
func ( l * LevelDBMetricPersistence ) Close ( ) {
2013-08-03 09:46:02 -07:00
var persistences = [ ] interface { } {
2013-05-02 03:49:13 -07:00
l . CurationRemarks ,
2013-08-12 15:36:12 -07:00
l . FingerprintToMetrics ,
l . LabelNameToFingerprints ,
l . LabelSetToFingerprints ,
2013-05-02 03:49:13 -07:00
l . MetricHighWatermarks ,
2013-08-12 15:36:12 -07:00
l . MetricMembershipIndex ,
2013-05-02 03:49:13 -07:00
l . MetricSamples ,
2013-02-08 09:03:26 -08:00
}
2013-04-01 04:22:38 -07:00
closerGroup := sync . WaitGroup { }
2013-02-08 09:03:26 -08:00
2013-08-03 09:46:02 -07:00
for _ , c := range persistences {
2013-04-01 04:22:38 -07:00
closerGroup . Add ( 1 )
2013-08-03 09:46:02 -07:00
go func ( c interface { } ) {
if c != nil {
switch closer := c . ( type ) {
case closer :
closer . Close ( )
case errorCloser :
if err := closer . Close ( ) ; err != nil {
2013-08-12 09:22:48 -07:00
glog . Error ( "Error closing persistence: " , err )
2013-08-03 09:46:02 -07:00
}
}
2013-04-24 02:51:40 -07:00
}
2013-04-01 04:22:38 -07:00
closerGroup . Done ( )
2013-08-03 09:46:02 -07:00
} ( c )
2013-02-08 09:03:26 -08:00
}
2013-04-01 04:22:38 -07:00
closerGroup . Wait ( )
2013-02-08 09:03:26 -08:00
}
2013-04-28 05:47:43 -07:00
func NewLevelDBMetricPersistence ( baseDirectory string ) ( * LevelDBMetricPersistence , error ) {
workers := utility . NewUncertaintyGroup ( 7 )
2013-02-08 09:03:26 -08:00
2013-06-25 05:02:27 -07:00
emission := new ( LevelDBMetricPersistence )
2013-02-08 09:03:26 -08:00
var subsystemOpeners = [ ] struct {
name string
opener leveldbOpener
} {
{
"Label Names and Value Pairs by Fingerprint" ,
func ( ) {
var err error
2013-08-12 15:36:12 -07:00
emission . FingerprintToMetrics , err = NewLevelDBFingerprintMetricIndex ( LevelDBFingerprintMetricIndexOptions {
2013-08-03 09:46:02 -07:00
LevelDBOptions : leveldb . LevelDBOptions {
2013-08-05 00:25:47 -07:00
Name : "Metrics by Fingerprint" ,
Purpose : "Index" ,
2013-08-03 09:46:02 -07:00
Path : baseDirectory + "/label_name_and_value_pairs_by_fingerprint" ,
CacheSizeBytes : * fingerprintsToLabelPairCacheSize ,
} ,
} )
2013-04-28 05:47:43 -07:00
workers . MayFail ( err )
2013-02-08 09:03:26 -08:00
} ,
} ,
{
"Samples by Fingerprint" ,
func ( ) {
var err error
2013-08-10 08:02:28 -07:00
emission . MetricSamples , err = leveldb . NewLevelDBPersistence ( leveldb . LevelDBOptions {
2013-08-05 00:25:47 -07:00
Name : "Samples" ,
Purpose : "Timeseries" ,
2013-08-03 08:25:03 -07:00
Path : baseDirectory + "/samples_by_fingerprint" ,
CacheSizeBytes : * fingerprintsToLabelPairCacheSize ,
2013-08-10 08:02:28 -07:00
} )
2013-04-28 05:47:43 -07:00
workers . MayFail ( err )
2013-02-08 09:03:26 -08:00
} ,
} ,
2013-03-14 19:24:28 -07:00
{
"High Watermarks by Fingerprint" ,
func ( ) {
var err error
2013-08-10 08:02:28 -07:00
emission . MetricHighWatermarks , err = NewLevelDBHighWatermarker ( LevelDBHighWatermarkerOptions {
2013-08-05 00:25:47 -07:00
LevelDBOptions : leveldb . LevelDBOptions {
Name : "High Watermarks" ,
Purpose : "The youngest sample in the database per metric." ,
Path : baseDirectory + "/high_watermarks_by_fingerprint" ,
CacheSizeBytes : * highWatermarkCacheSize ,
} } )
2013-04-28 05:47:43 -07:00
workers . MayFail ( err )
2013-03-14 19:24:28 -07:00
} ,
} ,
2013-02-08 09:03:26 -08:00
{
"Fingerprints by Label Name" ,
func ( ) {
var err error
2013-08-12 15:36:12 -07:00
emission . LabelNameToFingerprints , err = NewLevelLabelNameFingerprintIndex ( LevelDBLabelNameFingerprintIndexOptions {
2013-08-03 09:46:02 -07:00
LevelDBOptions : leveldb . LevelDBOptions {
2013-08-05 00:25:47 -07:00
Name : "Fingerprints by Label Name" ,
Purpose : "Index" ,
2013-08-03 09:46:02 -07:00
Path : baseDirectory + "/fingerprints_by_label_name" ,
CacheSizeBytes : * labelNameToFingerprintsCacheSize ,
} ,
} )
2013-04-28 05:47:43 -07:00
workers . MayFail ( err )
2013-02-08 09:03:26 -08:00
} ,
} ,
{
"Fingerprints by Label Name and Value Pair" ,
func ( ) {
var err error
2013-08-12 15:36:12 -07:00
emission . LabelSetToFingerprints , err = NewLevelDBLabelSetFingerprintIndex ( LevelDBLabelSetFingerprintIndexOptions {
2013-08-03 09:46:02 -07:00
LevelDBOptions : leveldb . LevelDBOptions {
2013-08-05 00:25:47 -07:00
Name : "Fingerprints by Label Pair" ,
Purpose : "Index" ,
2013-08-03 09:46:02 -07:00
Path : baseDirectory + "/fingerprints_by_label_name_and_value_pair" ,
CacheSizeBytes : * labelPairToFingerprintsCacheSize ,
} ,
} )
2013-04-28 05:47:43 -07:00
workers . MayFail ( err )
2013-02-08 09:03:26 -08:00
} ,
} ,
{
"Metric Membership Index" ,
func ( ) {
var err error
2013-08-12 15:36:12 -07:00
emission . MetricMembershipIndex , err = NewLevelDBMetricMembershipIndex (
2013-08-10 08:02:28 -07:00
LevelDBMetricMembershipIndexOptions {
2013-08-03 09:46:02 -07:00
LevelDBOptions : leveldb . LevelDBOptions {
2013-08-05 00:25:47 -07:00
Name : "Metric Membership" ,
Purpose : "Index" ,
2013-08-03 09:46:02 -07:00
Path : baseDirectory + "/metric_membership_index" ,
CacheSizeBytes : * metricMembershipIndexCacheSize ,
} ,
} )
2013-04-28 05:47:43 -07:00
workers . MayFail ( err )
} ,
} ,
{
"Sample Curation Remarks" ,
func ( ) {
var err error
2013-08-10 08:02:28 -07:00
emission . CurationRemarks , err = NewLevelDBCurationRemarker ( LevelDBCurationRemarkerOptions {
2013-08-06 03:00:31 -07:00
LevelDBOptions : leveldb . LevelDBOptions {
Name : "Sample Curation Remarks" ,
Purpose : "Ledger of Progress for Various Curators" ,
Path : baseDirectory + "/curation_remarks" ,
CacheSizeBytes : * curationRemarksCacheSize ,
} ,
} )
2013-04-28 05:47:43 -07:00
workers . MayFail ( err )
2013-02-08 09:03:26 -08:00
} ,
} ,
}
for _ , subsystem := range subsystemOpeners {
opener := subsystem . opener
go opener ( )
}
2013-04-28 05:47:43 -07:00
if ! workers . Wait ( ) {
for _ , err := range workers . Errors ( ) {
2013-08-12 09:22:48 -07:00
glog . Error ( "Could not open storage: " , err )
2013-02-08 09:03:26 -08:00
}
2013-04-28 05:47:43 -07:00
return nil , fmt . Errorf ( "Unable to open metric persistence." )
2013-02-08 09:03:26 -08:00
}
2013-08-07 03:07:35 -07:00
emission . Indexer = & TotalIndexer {
2013-08-12 15:36:12 -07:00
FingerprintToMetric : emission . FingerprintToMetrics ,
LabelNameToFingerprint : emission . LabelNameToFingerprints ,
LabelPairToFingerprint : emission . LabelSetToFingerprints ,
MetricMembership : emission . MetricMembershipIndex ,
2013-08-07 03:07:35 -07:00
}
2013-04-28 05:47:43 -07:00
return emission , nil
2013-02-08 09:03:26 -08:00
}
2013-06-25 05:02:27 -07:00
func ( l * LevelDBMetricPersistence ) AppendSample ( sample * clientmodel . Sample ) ( err error ) {
2013-04-05 09:03:45 -07:00
defer func ( begin time . Time ) {
2013-03-11 14:21:25 -07:00
duration := time . Since ( begin )
2013-02-08 09:03:26 -08:00
2013-03-01 09:51:36 -08:00
recordOutcome ( duration , err , map [ string ] string { operation : appendSample , result : success } , map [ string ] string { operation : appendSample , result : failure } )
2013-04-05 09:03:45 -07:00
} ( time . Now ( ) )
2013-02-08 09:03:26 -08:00
2013-06-25 05:02:27 -07:00
err = l . AppendSamples ( clientmodel . Samples { sample } )
2013-02-08 09:03:26 -08:00
return
}
2013-03-14 15:42:28 -07:00
// groupByFingerprint collects all of the provided samples, groups them
// together by their respective metric fingerprint, and finally sorts
// them chronologically.
2013-06-25 05:02:27 -07:00
func groupByFingerprint ( samples clientmodel . Samples ) map [ clientmodel . Fingerprint ] clientmodel . Samples {
fingerprintToSamples := map [ clientmodel . Fingerprint ] clientmodel . Samples { }
2013-02-08 09:03:26 -08:00
for _ , sample := range samples {
2013-06-25 05:02:27 -07:00
fingerprint := & clientmodel . Fingerprint { }
fingerprint . LoadFromMetric ( sample . Metric )
samples := fingerprintToSamples [ * fingerprint ]
2013-02-08 09:03:26 -08:00
samples = append ( samples , sample )
2013-06-25 05:02:27 -07:00
fingerprintToSamples [ * fingerprint ] = samples
2013-02-08 09:03:26 -08:00
}
2013-05-21 07:11:35 -07:00
sortingSemaphore := make ( chan bool , sortConcurrency )
doneSorting := sync . WaitGroup { }
2013-03-14 15:42:28 -07:00
2013-02-08 09:03:26 -08:00
for _ , samples := range fingerprintToSamples {
2013-03-04 11:43:07 -08:00
doneSorting . Add ( 1 )
2013-03-14 15:42:28 -07:00
2013-06-25 05:02:27 -07:00
sortingSemaphore <- true
go func ( samples clientmodel . Samples ) {
2013-02-08 09:03:26 -08:00
sort . Sort ( samples )
2013-06-25 05:02:27 -07:00
<- sortingSemaphore
2013-03-04 11:43:07 -08:00
doneSorting . Done ( )
2013-02-08 09:03:26 -08:00
} ( samples )
}
2013-03-04 11:43:07 -08:00
doneSorting . Wait ( )
2013-02-08 09:03:26 -08:00
2013-03-14 15:42:28 -07:00
return fingerprintToSamples
}
2013-03-07 11:01:32 -08:00
2013-06-25 05:02:27 -07:00
func ( l * LevelDBMetricPersistence ) refreshHighWatermarks ( groups map [ clientmodel . Fingerprint ] clientmodel . Samples ) ( err error ) {
2013-04-05 09:03:45 -07:00
defer func ( begin time . Time ) {
2013-03-14 19:24:28 -07:00
duration := time . Since ( begin )
recordOutcome ( duration , err , map [ string ] string { operation : refreshHighWatermarks , result : success } , map [ string ] string { operation : refreshHighWatermarks , result : failure } )
2013-04-05 09:03:45 -07:00
} ( time . Now ( ) )
2013-03-14 19:24:28 -07:00
2013-08-05 00:25:47 -07:00
b := FingerprintHighWatermarkMapping { }
for fp , ss := range groups {
if len ( ss ) == 0 {
2013-06-08 01:27:44 -07:00
continue
}
2013-03-14 19:24:28 -07:00
2013-08-05 00:25:47 -07:00
b [ fp ] = ss [ len ( ss ) - 1 ] . Timestamp
2013-03-14 19:24:28 -07:00
}
2013-08-05 00:25:47 -07:00
return l . MetricHighWatermarks . UpdateBatch ( b )
2013-03-14 19:24:28 -07:00
}
2013-06-25 05:02:27 -07:00
func ( l * LevelDBMetricPersistence ) AppendSamples ( samples clientmodel . Samples ) ( err error ) {
2013-04-05 09:03:45 -07:00
defer func ( begin time . Time ) {
2013-03-14 15:42:28 -07:00
duration := time . Since ( begin )
recordOutcome ( duration , err , map [ string ] string { operation : appendSamples , result : success } , map [ string ] string { operation : appendSamples , result : failure } )
2013-04-05 09:03:45 -07:00
} ( time . Now ( ) )
2013-03-14 15:42:28 -07:00
2013-05-21 07:11:35 -07:00
fingerprintToSamples := groupByFingerprint ( samples )
indexErrChan := make ( chan error , 1 )
watermarkErrChan := make ( chan error , 1 )
2013-03-14 15:42:28 -07:00
2013-06-25 05:02:27 -07:00
go func ( groups map [ clientmodel . Fingerprint ] clientmodel . Samples ) {
2013-08-03 09:46:02 -07:00
metrics := FingerprintMetricMapping { }
2013-03-14 16:55:50 -07:00
for fingerprint , samples := range groups {
metrics [ fingerprint ] = samples [ 0 ] . Metric
}
2013-08-07 03:07:35 -07:00
indexErrChan <- l . Indexer . IndexMetrics ( metrics )
2013-03-14 15:42:28 -07:00
} ( fingerprintToSamples )
2013-06-25 05:02:27 -07:00
go func ( groups map [ clientmodel . Fingerprint ] clientmodel . Samples ) {
2013-03-14 19:24:28 -07:00
watermarkErrChan <- l . refreshHighWatermarks ( groups )
} ( fingerprintToSamples )
2013-03-14 18:09:19 -07:00
samplesBatch := leveldb . NewBatch ( )
defer samplesBatch . Close ( )
2013-03-14 15:42:28 -07:00
2013-09-05 01:14:34 -07:00
key := new ( SampleKey )
keyDto := new ( dto . SampleKey )
value := new ( dto . SampleValueSeries )
2013-03-14 18:09:19 -07:00
for fingerprint , group := range fingerprintToSamples {
for {
2013-09-05 01:14:34 -07:00
value . Reset ( )
2013-03-14 18:09:19 -07:00
lengthOfGroup := len ( group )
2013-03-14 15:42:28 -07:00
2013-03-14 18:09:19 -07:00
if lengthOfGroup == 0 {
break
}
2013-03-14 15:42:28 -07:00
2013-03-14 18:09:19 -07:00
take := * leveldbChunkSize
if lengthOfGroup < take {
take = lengthOfGroup
}
2013-03-14 15:42:28 -07:00
2013-03-14 18:09:19 -07:00
chunk := group [ 0 : take ]
group = group [ take : lengthOfGroup ]
2013-03-14 15:42:28 -07:00
2013-09-05 01:14:34 -07:00
key . Fingerprint = & fingerprint
key . FirstTimestamp = chunk [ 0 ] . Timestamp
key . LastTimestamp = chunk [ take - 1 ] . Timestamp
key . SampleCount = uint32 ( take )
2013-03-14 15:42:28 -07:00
2013-03-14 18:09:19 -07:00
for _ , sample := range chunk {
2013-09-05 01:14:34 -07:00
// XXX: Candidate for allocation reduction.
2013-03-14 18:09:19 -07:00
value . Value = append ( value . Value , & dto . SampleValueSeries_Value {
Timestamp : proto . Int64 ( sample . Timestamp . Unix ( ) ) ,
2013-06-25 05:02:27 -07:00
Value : proto . Float64 ( float64 ( sample . Value ) ) ,
2013-03-14 18:09:19 -07:00
} )
2013-03-14 15:42:28 -07:00
}
2013-03-14 18:09:19 -07:00
2013-09-05 01:14:34 -07:00
key . Dump ( keyDto )
samplesBatch . Put ( keyDto , value )
2013-03-14 15:42:28 -07:00
}
2013-03-14 18:09:19 -07:00
}
2013-03-14 15:42:28 -07:00
2013-05-02 03:49:13 -07:00
err = l . MetricSamples . Commit ( samplesBatch )
2013-03-14 18:09:19 -07:00
if err != nil {
2013-04-05 09:03:45 -07:00
return
2013-03-14 18:09:19 -07:00
}
2013-03-14 15:42:28 -07:00
err = <- indexErrChan
if err != nil {
2013-04-05 09:03:45 -07:00
return
2013-03-14 15:42:28 -07:00
}
2013-02-08 09:03:26 -08:00
2013-03-14 19:24:28 -07:00
err = <- watermarkErrChan
if err != nil {
2013-04-05 09:03:45 -07:00
return
2013-03-14 19:24:28 -07:00
}
2013-02-08 09:03:26 -08:00
return
}
2013-06-25 05:02:27 -07:00
func extractSampleKey ( i leveldb . Iterator ) ( * SampleKey , error ) {
2013-04-21 10:16:15 -07:00
k := & dto . SampleKey { }
2013-08-29 06:15:22 -07:00
if err := i . Key ( k ) ; err != nil {
2013-06-25 05:02:27 -07:00
return nil , err
2013-03-06 18:16:20 -08:00
}
2013-06-25 05:02:27 -07:00
key := & SampleKey { }
key . Load ( k )
2012-12-25 04:50:36 -08:00
2013-06-25 05:02:27 -07:00
return key , nil
2012-12-25 04:50:36 -08:00
}
2013-06-25 05:02:27 -07:00
func extractSampleValues ( i leveldb . Iterator ) ( Values , error ) {
2013-04-22 04:30:16 -07:00
v := & dto . SampleValueSeries { }
2013-08-29 06:15:22 -07:00
if err := i . Value ( v ) ; err != nil {
2013-06-25 05:02:27 -07:00
return nil , err
2012-12-25 04:50:36 -08:00
}
2013-06-25 05:02:27 -07:00
return NewValuesFromDTO ( v ) , nil
2012-12-25 04:50:36 -08:00
}
2013-08-03 09:46:02 -07:00
func ( l * LevelDBMetricPersistence ) hasIndexMetric ( m clientmodel . Metric ) ( value bool , err error ) {
2013-04-05 09:03:45 -07:00
defer func ( begin time . Time ) {
2013-03-11 14:21:25 -07:00
duration := time . Since ( begin )
2013-01-23 08:18:45 -08:00
2013-03-01 09:51:36 -08:00
recordOutcome ( duration , err , map [ string ] string { operation : hasIndexMetric , result : success } , map [ string ] string { operation : hasIndexMetric , result : failure } )
2013-04-05 09:03:45 -07:00
} ( time . Now ( ) )
2013-01-23 08:18:45 -08:00
2013-08-12 15:36:12 -07:00
return l . MetricMembershipIndex . Has ( m )
2012-12-09 07:27:12 -08:00
}
2013-08-03 09:46:02 -07:00
func ( l * LevelDBMetricPersistence ) HasLabelPair ( p * LabelPair ) ( value bool , err error ) {
2013-04-05 09:03:45 -07:00
defer func ( begin time . Time ) {
2013-03-11 14:21:25 -07:00
duration := time . Since ( begin )
2013-01-23 08:18:45 -08:00
2013-03-01 09:51:36 -08:00
recordOutcome ( duration , err , map [ string ] string { operation : hasLabelPair , result : success } , map [ string ] string { operation : hasLabelPair , result : failure } )
2013-04-05 09:03:45 -07:00
} ( time . Now ( ) )
2013-01-23 08:18:45 -08:00
2013-08-12 15:36:12 -07:00
return l . LabelSetToFingerprints . Has ( p )
2012-12-09 07:27:12 -08:00
}
2013-08-03 09:46:02 -07:00
func ( l * LevelDBMetricPersistence ) HasLabelName ( n clientmodel . LabelName ) ( value bool , err error ) {
2013-04-05 09:03:45 -07:00
defer func ( begin time . Time ) {
2013-03-11 14:21:25 -07:00
duration := time . Since ( begin )
2013-01-23 08:18:45 -08:00
2013-03-01 09:51:36 -08:00
recordOutcome ( duration , err , map [ string ] string { operation : hasLabelName , result : success } , map [ string ] string { operation : hasLabelName , result : failure } )
2013-04-05 09:03:45 -07:00
} ( time . Now ( ) )
2013-01-23 08:18:45 -08:00
2013-08-12 15:36:12 -07:00
value , err = l . LabelNameToFingerprints . Has ( n )
2013-01-23 08:18:45 -08:00
return
2012-12-09 07:27:12 -08:00
}
2013-06-25 05:02:27 -07:00
func ( l * LevelDBMetricPersistence ) GetFingerprintsForLabelSet ( labelSet clientmodel . LabelSet ) ( fps clientmodel . Fingerprints , err error ) {
2013-04-05 09:03:45 -07:00
defer func ( begin time . Time ) {
2013-03-11 14:21:25 -07:00
duration := time . Since ( begin )
2012-12-09 07:27:12 -08:00
2013-03-01 09:51:36 -08:00
recordOutcome ( duration , err , map [ string ] string { operation : getFingerprintsForLabelSet , result : success } , map [ string ] string { operation : getFingerprintsForLabelSet , result : failure } )
2013-04-05 09:03:45 -07:00
} ( time . Now ( ) )
2012-12-09 07:27:12 -08:00
2013-01-13 02:15:01 -08:00
sets := [ ] utility . Set { }
2013-06-25 05:02:27 -07:00
for name , value := range labelSet {
2013-08-12 15:36:12 -07:00
fps , _ , err := l . LabelSetToFingerprints . Lookup ( & LabelPair {
2013-08-03 09:46:02 -07:00
Name : name ,
Value : value ,
} )
2012-12-25 04:50:36 -08:00
if err != nil {
2013-08-03 09:46:02 -07:00
return nil , err
2013-06-08 01:27:44 -07:00
}
2012-12-25 04:50:36 -08:00
2013-01-13 02:15:01 -08:00
set := utility . Set { }
2013-08-03 09:46:02 -07:00
for _ , fp := range fps {
2013-05-17 03:58:15 -07:00
set . Add ( * fp )
2012-12-09 07:27:12 -08:00
}
2013-01-13 02:15:01 -08:00
sets = append ( sets , set )
}
numberOfSets := len ( sets )
if numberOfSets == 0 {
2013-06-25 05:02:27 -07:00
return nil , nil
2013-01-13 02:15:01 -08:00
}
base := sets [ 0 ]
for i := 1 ; i < numberOfSets ; i ++ {
base = base . Intersection ( sets [ i ] )
}
for _ , e := range base . Elements ( ) {
2013-06-25 05:02:27 -07:00
fingerprint := e . ( clientmodel . Fingerprint )
2013-05-17 03:58:15 -07:00
fps = append ( fps , & fingerprint )
2012-12-09 07:27:12 -08:00
}
2013-06-25 05:02:27 -07:00
return fps , nil
2012-12-09 07:27:12 -08:00
}
2013-06-25 05:02:27 -07:00
func ( l * LevelDBMetricPersistence ) GetFingerprintsForLabelName ( labelName clientmodel . LabelName ) ( fps clientmodel . Fingerprints , err error ) {
2013-04-05 09:03:45 -07:00
defer func ( begin time . Time ) {
2013-03-11 14:21:25 -07:00
duration := time . Since ( begin )
2012-12-09 07:27:12 -08:00
2013-03-01 09:51:36 -08:00
recordOutcome ( duration , err , map [ string ] string { operation : getFingerprintsForLabelName , result : success } , map [ string ] string { operation : getFingerprintsForLabelName , result : failure } )
2013-04-05 09:03:45 -07:00
} ( time . Now ( ) )
2012-12-09 07:27:12 -08:00
2013-08-03 09:46:02 -07:00
// TODO(matt): Update signature to work with ok.
2013-08-12 15:36:12 -07:00
fps , _ , err = l . LabelNameToFingerprints . Lookup ( labelName )
2012-12-25 04:50:36 -08:00
2013-08-03 09:46:02 -07:00
return fps , err
2012-12-09 07:27:12 -08:00
}
2013-06-25 05:02:27 -07:00
func ( l * LevelDBMetricPersistence ) GetMetricForFingerprint ( f * clientmodel . Fingerprint ) ( m clientmodel . Metric , err error ) {
2013-04-05 09:03:45 -07:00
defer func ( begin time . Time ) {
2013-03-11 14:21:25 -07:00
duration := time . Since ( begin )
2012-12-25 04:50:36 -08:00
2013-03-01 09:51:36 -08:00
recordOutcome ( duration , err , map [ string ] string { operation : getMetricForFingerprint , result : success } , map [ string ] string { operation : getMetricForFingerprint , result : failure } )
2013-04-05 09:03:45 -07:00
} ( time . Now ( ) )
2012-12-25 04:50:36 -08:00
2013-08-03 09:46:02 -07:00
// TODO(matt): Update signature to work with ok.
2013-08-12 15:36:12 -07:00
m , _ , err = l . FingerprintToMetrics . Lookup ( f )
2012-12-12 03:53:34 -08:00
2013-06-08 01:27:44 -07:00
return m , nil
2012-12-12 03:53:34 -08:00
}
Use custom timestamp type for sample timestamps and related code.
So far we've been using Go's native time.Time for anything related to sample
timestamps. Since the range of time.Time is much bigger than what we need, this
has created two problems:
- there could be time.Time values which were out of the range/precision of the
time type that we persist to disk, therefore causing incorrectly ordered keys.
One bug caused by this was:
https://github.com/prometheus/prometheus/issues/367
It would be good to use a timestamp type that's more closely aligned with
what the underlying storage supports.
- sizeof(time.Time) is 192, while Prometheus should be ok with a single 64-bit
Unix timestamp (possibly even a 32-bit one). Since we store samples in large
numbers, this seriously affects memory usage. Furthermore, copying/working
with the data will be faster if it's smaller.
*MEMORY USAGE RESULTS*
Initial memory usage comparisons for a running Prometheus with 1 timeseries and
100,000 samples show roughly a 13% decrease in total (VIRT) memory usage. In my
tests, this advantage for some reason decreased a bit the more samples the
timeseries had (to 5-7% for millions of samples). This I can't fully explain,
but perhaps garbage collection issues were involved.
*WHEN TO USE THE NEW TIMESTAMP TYPE*
The new clientmodel.Timestamp type should be used whenever time
calculations are either directly or indirectly related to sample
timestamps.
For example:
- the timestamp of a sample itself
- all kinds of watermarks
- anything that may become or is compared to a sample timestamp (like the timestamp
passed into Target.Scrape()).
When to still use time.Time:
- for measuring durations/times not related to sample timestamps, like duration
telemetry exporting, timers that indicate how frequently to execute some
action, etc.
*NOTE ON OPERATOR OPTIMIZATION TESTS*
We don't use operator optimization code anymore, but it still lives in
the code as dead code. It still has tests, but I couldn't get all of them to
pass with the new timestamp format. I commented out the failing cases for now,
but we should probably remove the dead code soon. I just didn't want to do that
in the same change as this.
Change-Id: I821787414b0debe85c9fffaeb57abd453727af0f
2013-10-28 06:35:02 -07:00
// GetValueAtTime is not implemented for the LevelDB persistence; calling it
// always panics with "Not implemented".
func ( l * LevelDBMetricPersistence ) GetValueAtTime ( f * clientmodel . Fingerprint , t clientmodel . Timestamp ) Values {
2013-04-18 07:10:52 -07:00
panic ( "Not implemented" )
2012-12-12 03:53:34 -08:00
}
2013-06-25 05:02:27 -07:00
// GetBoundaryValues is not implemented for the LevelDB persistence; calling
// it always panics with "Not implemented".
func ( l * LevelDBMetricPersistence ) GetBoundaryValues ( f * clientmodel . Fingerprint , i Interval ) Values {
2013-04-18 07:10:52 -07:00
panic ( "Not implemented" )
2012-12-19 11:34:54 -08:00
}
2013-06-25 05:02:27 -07:00
// GetRangeValues is not implemented for the LevelDB persistence; calling it
// always panics with "Not implemented".
func ( l * LevelDBMetricPersistence ) GetRangeValues ( f * clientmodel . Fingerprint , i Interval ) Values {
2013-04-18 07:10:52 -07:00
panic ( "Not implemented" )
2012-12-12 03:53:34 -08:00
}
2013-02-06 08:06:39 -08:00
type MetricKeyDecoder struct { }
func ( d * MetricKeyDecoder ) DecodeKey ( in interface { } ) ( out interface { } , err error ) {
2013-03-26 06:46:02 -07:00
unmarshaled := dto . LabelPair { }
err = proto . Unmarshal ( in . ( [ ] byte ) , & unmarshaled )
2013-02-06 08:06:39 -08:00
if err != nil {
return
}
2013-02-08 09:03:26 -08:00
2013-06-25 05:02:27 -07:00
out = LabelPair {
Name : clientmodel . LabelName ( * unmarshaled . Name ) ,
Value : clientmodel . LabelValue ( * unmarshaled . Value ) ,
2013-03-26 06:46:02 -07:00
}
2013-02-08 09:03:26 -08:00
return
2013-02-06 08:06:39 -08:00
}
func ( d * MetricKeyDecoder ) DecodeValue ( in interface { } ) ( out interface { } , err error ) {
return
}
2013-03-26 03:45:56 -07:00
type LabelNameFilter struct {
2013-06-25 05:02:27 -07:00
labelName clientmodel . LabelName
2013-03-26 03:45:56 -07:00
}
2013-02-06 08:06:39 -08:00
2013-03-26 06:46:02 -07:00
func ( f LabelNameFilter ) Filter ( key , value interface { } ) ( filterResult storage . FilterResult ) {
2013-06-25 05:02:27 -07:00
labelPair , ok := key . ( LabelPair )
2013-03-26 06:46:02 -07:00
if ok && labelPair . Name == f . labelName {
2013-02-07 02:38:01 -08:00
return storage . ACCEPT
}
return storage . SKIP
2013-02-06 08:06:39 -08:00
}
2013-03-26 03:45:56 -07:00
type CollectLabelValuesOp struct {
2013-06-25 05:02:27 -07:00
labelValues [ ] clientmodel . LabelValue
2013-02-06 08:06:39 -08:00
}
2013-03-26 03:45:56 -07:00
func ( op * CollectLabelValuesOp ) Operate ( key , value interface { } ) ( err * storage . OperatorError ) {
2013-06-25 05:02:27 -07:00
labelPair := key . ( LabelPair )
op . labelValues = append ( op . labelValues , clientmodel . LabelValue ( labelPair . Value ) )
2013-02-06 08:06:39 -08:00
return
}
2013-06-25 05:02:27 -07:00
func ( l * LevelDBMetricPersistence ) GetAllValuesForLabel ( labelName clientmodel . LabelName ) ( values clientmodel . LabelValues , err error ) {
2013-03-26 03:45:56 -07:00
filter := & LabelNameFilter {
labelName : labelName ,
}
labelValuesOp := & CollectLabelValuesOp { }
2013-02-06 08:06:39 -08:00
2013-08-12 15:36:12 -07:00
_ , err = l . LabelSetToFingerprints . ForEach ( & MetricKeyDecoder { } , filter , labelValuesOp )
2013-02-06 08:06:39 -08:00
if err != nil {
return
}
2013-03-26 03:45:56 -07:00
values = labelValuesOp . labelValues
2013-02-06 08:06:39 -08:00
return
}
2013-02-08 09:03:26 -08:00
2013-08-06 03:00:31 -07:00
// Prune compacts each database's keyspace serially.
2013-05-10 07:41:02 -07:00
//
// Beware that it would probably be imprudent to run this on a live user-facing
// server due to latency implications.
//
// The databases are pruned one after another in the order listed below;
// nothing returned by the individual Prune calls is inspected here.
2013-08-05 08:31:49 -07:00
func ( l * LevelDBMetricPersistence ) Prune ( ) {
l . CurationRemarks . Prune ( )
2013-08-12 15:36:12 -07:00
l . FingerprintToMetrics . Prune ( )
l . LabelNameToFingerprints . Prune ( )
l . LabelSetToFingerprints . Prune ( )
2013-08-05 08:31:49 -07:00
l . MetricHighWatermarks . Prune ( )
2013-08-12 15:36:12 -07:00
l . MetricMembershipIndex . Prune ( )
2013-08-05 08:31:49 -07:00
l . MetricSamples . Prune ( )
2013-05-10 16:02:57 -07:00
}
2013-08-06 03:00:31 -07:00
func ( l * LevelDBMetricPersistence ) Sizes ( ) ( total uint64 , err error ) {
2013-05-10 16:02:57 -07:00
size := uint64 ( 0 )
2013-08-06 03:00:31 -07:00
if size , _ , err = l . CurationRemarks . Size ( ) ; err != nil {
2013-05-10 16:02:57 -07:00
return 0 , err
2013-05-10 07:41:02 -07:00
}
2013-05-10 16:02:57 -07:00
total += size
2013-08-12 15:36:12 -07:00
if size , _ , err = l . FingerprintToMetrics . Size ( ) ; err != nil {
2013-08-05 00:25:47 -07:00
return 0 , err
}
total += size
2013-05-10 16:02:57 -07:00
2013-08-12 15:36:12 -07:00
if size , _ , err = l . LabelNameToFingerprints . Size ( ) ; err != nil {
2013-08-05 00:25:47 -07:00
return 0 , err
}
total += size
2013-05-10 16:02:57 -07:00
2013-08-12 15:36:12 -07:00
if size , _ , err = l . LabelSetToFingerprints . Size ( ) ; err != nil {
2013-08-05 00:25:47 -07:00
return 0 , err
}
total += size
2013-05-10 16:02:57 -07:00
2013-08-05 00:25:47 -07:00
if size , _ , err = l . MetricHighWatermarks . Size ( ) ; err != nil {
2013-05-10 16:02:57 -07:00
return 0 , err
2013-05-10 07:41:02 -07:00
}
2013-05-10 16:02:57 -07:00
total += size
2013-08-12 15:36:12 -07:00
if size , _ , err = l . MetricMembershipIndex . Size ( ) ; err != nil {
2013-08-05 00:25:47 -07:00
return 0 , err
}
total += size
2013-05-10 16:02:57 -07:00
2013-08-06 03:00:31 -07:00
if size , err = l . MetricSamples . Size ( ) ; err != nil {
2013-05-10 16:02:57 -07:00
return 0 , err
2013-05-10 07:41:02 -07:00
}
2013-05-10 16:02:57 -07:00
total += size
2013-05-10 07:41:02 -07:00
2013-05-10 16:02:57 -07:00
return total , nil
2013-05-10 07:41:02 -07:00
}
2013-05-14 02:21:27 -07:00
2013-08-05 08:31:49 -07:00
func ( l * LevelDBMetricPersistence ) States ( ) raw . DatabaseStates {
return raw . DatabaseStates {
2013-08-05 00:25:47 -07:00
l . CurationRemarks . State ( ) ,
2013-08-12 15:36:12 -07:00
l . FingerprintToMetrics . State ( ) ,
l . LabelNameToFingerprints . State ( ) ,
l . LabelSetToFingerprints . State ( ) ,
2013-08-05 00:25:47 -07:00
l . MetricHighWatermarks . State ( ) ,
2013-08-12 15:36:12 -07:00
l . MetricMembershipIndex . State ( ) ,
2013-08-05 00:25:47 -07:00
l . MetricSamples . State ( ) ,
}
2013-05-14 02:21:27 -07:00
}
2013-10-22 16:06:49 -07:00
Use custom timestamp type for sample timestamps and related code.
So far we've been using Go's native time.Time for anything related to sample
timestamps. Since the range of time.Time is much bigger than what we need, this
has created two problems:
- there could be time.Time values which were out of the range/precision of the
time type that we persist to disk, therefore causing incorrectly ordered keys.
One bug caused by this was:
https://github.com/prometheus/prometheus/issues/367
It would be good to use a timestamp type that's more closely aligned with
what the underlying storage supports.
- sizeof(time.Time) is 192 bits, while Prometheus should be ok with a single 64-bit
Unix timestamp (possibly even a 32-bit one). Since we store samples in large
numbers, this seriously affects memory usage. Furthermore, copying/working
with the data will be faster if it's smaller.
*MEMORY USAGE RESULTS*
Initial memory usage comparisons for a running Prometheus with 1 timeseries and
100,000 samples show roughly a 13% decrease in total (VIRT) memory usage. In my
tests, this advantage for some reason decreased a bit the more samples the
timeseries had (to 5-7% for millions of samples). This I can't fully explain,
but perhaps garbage collection issues were involved.
*WHEN TO USE THE NEW TIMESTAMP TYPE*
The new clientmodel.Timestamp type should be used whenever time
calculations are either directly or indirectly related to sample
timestamps.
For example:
- the timestamp of a sample itself
- all kinds of watermarks
- anything that may become or is compared to a sample timestamp (like the timestamp
passed into Target.Scrape()).
When to still use time.Time:
- for measuring durations/times not related to sample timestamps, like duration
telemetry exporting, timers that indicate how frequently to execute some
action, etc.
*NOTE ON OPERATOR OPTIMIZATION TESTS*
We don't use operator optimization code anymore, but it still lives in
the code as dead code. It still has tests, but I couldn't get all of them to
pass with the new timestamp format. I commented out the failing cases for now,
but we should probably remove the dead code soon. I just didn't want to do that
in the same change as this.
Change-Id: I821787414b0debe85c9fffaeb57abd453727af0f
2013-10-28 06:35:02 -07:00
// MetricSamplesDecoder turns serialized sample records back into in-memory
// values: its DecodeKey method unmarshals a dto.SampleKey and its DecodeValue
// method unmarshals a dto.SampleValueSeries. The type carries no state.
type MetricSamplesDecoder struct { }
2013-10-22 16:06:49 -07:00
// DecodeKey unmarshals a protobuf-encoded dto.SampleKey and returns it as a
// *SampleKey populated through SampleKey.Load.
//
// in must hold a []byte. An error is returned, rather than a panic raised, if
// in has any other dynamic type, and also if protobuf unmarshalling fails. On
// error the returned value is nil.
func (d *MetricSamplesDecoder) DecodeKey(in interface{}) (interface{}, error) {
	// Use the comma-ok form so a caller passing the wrong type gets an error
	// through the existing error return instead of a runtime panic.
	buf, ok := in.([]byte)
	if !ok {
		return nil, fmt.Errorf("decoding sample key: expected []byte, got %T", in)
	}

	key := &dto.SampleKey{}
	if err := proto.Unmarshal(buf, key); err != nil {
		return nil, err
	}

	sampleKey := &SampleKey{}
	sampleKey.Load(key)

	return sampleKey, nil
}
// DecodeValue unmarshals a protobuf-encoded dto.SampleValueSeries and returns
// the result of passing it to NewValuesFromDTO.
//
// in must hold a []byte. An error is returned, rather than a panic raised, if
// in has any other dynamic type, and also if protobuf unmarshalling fails. On
// error the returned value is nil.
func (d *MetricSamplesDecoder) DecodeValue(in interface{}) (interface{}, error) {
	// Use the comma-ok form so a caller passing the wrong type gets an error
	// through the existing error return instead of a runtime panic.
	buf, ok := in.([]byte)
	if !ok {
		return nil, fmt.Errorf("decoding sample values: expected []byte, got %T", in)
	}

	values := &dto.SampleValueSeries{}
	if err := proto.Unmarshal(buf, values); err != nil {
		return nil, err
	}

	return NewValuesFromDTO(values), nil
}
Use custom timestamp type for sample timestamps and related code.
So far we've been using Go's native time.Time for anything related to sample
timestamps. Since the range of time.Time is much bigger than what we need, this
has created two problems:
- there could be time.Time values which were out of the range/precision of the
time type that we persist to disk, therefore causing incorrectly ordered keys.
One bug caused by this was:
https://github.com/prometheus/prometheus/issues/367
It would be good to use a timestamp type that's more closely aligned with
what the underlying storage supports.
- sizeof(time.Time) is 192 bits, while Prometheus should be ok with a single 64-bit
Unix timestamp (possibly even a 32-bit one). Since we store samples in large
numbers, this seriously affects memory usage. Furthermore, copying/working
with the data will be faster if it's smaller.
*MEMORY USAGE RESULTS*
Initial memory usage comparisons for a running Prometheus with 1 timeseries and
100,000 samples show roughly a 13% decrease in total (VIRT) memory usage. In my
tests, this advantage for some reason decreased a bit the more samples the
timeseries had (to 5-7% for millions of samples). This I can't fully explain,
but perhaps garbage collection issues were involved.
*WHEN TO USE THE NEW TIMESTAMP TYPE*
The new clientmodel.Timestamp type should be used whenever time
calculations are either directly or indirectly related to sample
timestamps.
For example:
- the timestamp of a sample itself
- all kinds of watermarks
- anything that may become or is compared to a sample timestamp (like the timestamp
passed into Target.Scrape()).
When to still use time.Time:
- for measuring durations/times not related to sample timestamps, like duration
telemetry exporting, timers that indicate how frequently to execute some
action, etc.
*NOTE ON OPERATOR OPTIMIZATION TESTS*
We don't use operator optimization code anymore, but it still lives in
the code as dead code. It still has tests, but I couldn't get all of them to
pass with the new timestamp format. I commented out the failing cases for now,
but we should probably remove the dead code soon. I just didn't want to do that
in the same change as this.
Change-Id: I821787414b0debe85c9fffaeb57abd453727af0f
2013-10-28 06:35:02 -07:00
// AcceptAllFilter is a stateless filter whose Filter method returns
// storage.ACCEPT for every pair of arguments it is given.
type AcceptAllFilter struct { }
2013-10-22 16:06:49 -07:00
// Filter ignores both of its arguments and unconditionally reports
// storage.ACCEPT.
func (*AcceptAllFilter) Filter(_, _ interface{}) storage.FilterResult {
	return storage.ACCEPT
}