diff --git a/storage/metric/leveldb/lifecycle.go b/storage/metric/leveldb/lifecycle.go index e09c83f10..cbe8cd1e8 100644 --- a/storage/metric/leveldb/lifecycle.go +++ b/storage/metric/leveldb/lifecycle.go @@ -14,12 +14,23 @@ package leveldb import ( + "flag" index "github.com/prometheus/prometheus/storage/raw/index/leveldb" storage "github.com/prometheus/prometheus/storage/raw/leveldb" "io" "log" ) +var ( + // These flag values are back of the envelope, though they seem sensible. + // Please re-evaluate based on your own needs. + fingerprintsToLabelPairCacheSize = flag.Int("fingerprintsToLabelPairCacheSizeBytes", 100*1024*1024, "The size for the fingerprint to label pair index (bytes).") + samplesByFingerprintCacheSize = flag.Int("samplesByFingerprintCacheSizeBytes", 500*1024*1024, "The size for the samples database (bytes).") + labelNameToFingerprintsCacheSize = flag.Int("labelNameToFingerprintsCacheSizeBytes", 100*1024*1024, "The size for the label name to metric fingerprint index (bytes).") + labelPairToFingerprintsCacheSize = flag.Int("labelPairToFingerprintsCacheSizeBytes", 100*1024*1024, "The size for the label pair to metric fingerprint index (bytes).") + metricMembershipIndexCacheSize = flag.Int("metricMembershipCacheSizeBytes", 50*1024*1024, "The size for the metric membership index (bytes).") +) + type leveldbOpener func() func (l *LevelDBMetricPersistence) Close() error { @@ -86,7 +97,7 @@ func (l *LevelDBMetricPersistence) Close() error { return nil } -func NewLevelDBMetricPersistence(baseDirectory string) (*LevelDBMetricPersistence, error) { +func NewLevelDBMetricPersistence(baseDirectory string) (persistence *LevelDBMetricPersistence, err error) { log.Printf("Opening LevelDBPersistence storage containers...") errorChannel := make(chan error, 5) @@ -101,7 +112,7 @@ func NewLevelDBMetricPersistence(baseDirectory string) (*LevelDBMetricPersistenc "Label Names and Value Pairs by Fingerprint", func() { var err error - emission.fingerprintToMetrics, err = storage.NewLevelDBPersistence(baseDirectory+"/label_name_and_value_pairs_by_fingerprint", 1000000, 10) + emission.fingerprintToMetrics, err = storage.NewLevelDBPersistence(baseDirectory+"/label_name_and_value_pairs_by_fingerprint", *fingerprintsToLabelPairCacheSize, 10) errorChannel <- err }, }, @@ -109,7 +120,7 @@ func NewLevelDBMetricPersistence(baseDirectory string) (*LevelDBMetricPersistenc "Samples by Fingerprint", func() { var err error - emission.metricSamples, err = storage.NewLevelDBPersistence(baseDirectory+"/samples_by_fingerprint", 1000000, 10) + emission.metricSamples, err = storage.NewLevelDBPersistence(baseDirectory+"/samples_by_fingerprint", *samplesByFingerprintCacheSize, 10) errorChannel <- err }, }, @@ -117,7 +128,7 @@ func NewLevelDBMetricPersistence(baseDirectory string) (*LevelDBMetricPersistenc "Fingerprints by Label Name", func() { var err error - emission.labelNameToFingerprints, err = storage.NewLevelDBPersistence(baseDirectory+"/fingerprints_by_label_name", 1000000, 10) + emission.labelNameToFingerprints, err = storage.NewLevelDBPersistence(baseDirectory+"/fingerprints_by_label_name", *labelNameToFingerprintsCacheSize, 10) errorChannel <- err }, }, @@ -125,7 +136,7 @@ func NewLevelDBMetricPersistence(baseDirectory string) (*LevelDBMetricPersistenc "Fingerprints by Label Name and Value Pair", func() { var err error - emission.labelSetToFingerprints, err = storage.NewLevelDBPersistence(baseDirectory+"/fingerprints_by_label_name_and_value_pair", 1000000, 10) + emission.labelSetToFingerprints, err = storage.NewLevelDBPersistence(baseDirectory+"/fingerprints_by_label_name_and_value_pair", *labelPairToFingerprintsCacheSize, 10) errorChannel <- err }, }, @@ -133,7 +144,7 @@ func NewLevelDBMetricPersistence(baseDirectory string) (*LevelDBMetricPersistenc "Metric Membership Index", func() { var err error - emission.metricMembershipIndex, err = index.NewLevelDBMembershipIndex(baseDirectory+"/metric_membership_index", 1000000, 10) + emission.metricMembershipIndex, err = index.NewLevelDBMembershipIndex(baseDirectory+"/metric_membership_index", *metricMembershipIndexCacheSize, 10) errorChannel <- err }, }, @@ -149,16 +160,18 @@ func NewLevelDBMetricPersistence(baseDirectory string) (*LevelDBMetricPersistenc } for i := 0; i < cap(errorChannel); i++ { - openingError := <-errorChannel + err = <-errorChannel - if openingError != nil { - log.Printf("Could not open a LevelDBPersistence storage container: %q\n", openingError) + if err != nil { + log.Printf("Could not open a LevelDBPersistence storage container: %q\n", err) - return nil, openingError + return } } log.Printf("Successfully opened all LevelDBPersistence storage containers.\n") - return emission, nil + persistence = emission + + return } diff --git a/storage/raw/leveldb/leveldb.go b/storage/raw/leveldb/leveldb.go index 264a4ada1..34c414796 100644 --- a/storage/raw/leveldb/leveldb.go +++ b/storage/raw/leveldb/leveldb.go @@ -14,12 +14,17 @@ package leveldb import ( + "flag" "github.com/jmhodges/levigo" "github.com/prometheus/prometheus/coding" "github.com/prometheus/prometheus/storage/raw" "io" ) +var ( + leveldbFlushOnMutate = flag.Bool("leveldbFlushOnMutate", true, "Whether LevelDB should flush every operation to disk upon mutation before returning (bool).") +) + type LevelDBPersistence struct { cache *levigo.Cache filterPolicy *levigo.FilterPolicy @@ -54,8 +59,7 @@ func NewLevelDBPersistence(storageRoot string, cacheCapacity, bitsPerBloomFilter readOptions := levigo.NewReadOptions() writeOptions := levigo.NewWriteOptions() - writeOptions.SetSync(true) - + writeOptions.SetSync(*leveldbFlushOnMutate) p = &LevelDBPersistence{ cache: cache, filterPolicy: filterPolicy,