Consolidate LevelDB storage construction.

There are too many parameters to constructing a LevelDB storage
instance for a construction method, so I've opted to take an
idiomatic approach of embedding them in a struct for easier
mediation and versioning.
This commit is contained in:
Matt T. Proud 2013-08-03 17:25:03 +02:00
parent fcf784c13c
commit 772d3d6b11
6 changed files with 90 additions and 46 deletions

View file

@ -103,7 +103,11 @@ func NewLevelDBMetricPersistence(baseDirectory string) (*LevelDBMetricPersistenc
"Label Names and Value Pairs by Fingerprint", "Label Names and Value Pairs by Fingerprint",
func() { func() {
var err error var err error
emission.fingerprintToMetrics, err = leveldb.NewLevelDBPersistence(baseDirectory+"/label_name_and_value_pairs_by_fingerprint", *fingerprintsToLabelPairCacheSize, 10) o := &leveldb.LevelDBOptions{
Path: baseDirectory + "/label_name_and_value_pairs_by_fingerprint",
CacheSizeBytes: *fingerprintsToLabelPairCacheSize,
}
emission.fingerprintToMetrics, err = leveldb.NewLevelDBPersistence(o)
workers.MayFail(err) workers.MayFail(err)
}, },
}, },
@ -111,7 +115,11 @@ func NewLevelDBMetricPersistence(baseDirectory string) (*LevelDBMetricPersistenc
"Samples by Fingerprint", "Samples by Fingerprint",
func() { func() {
var err error var err error
emission.MetricSamples, err = leveldb.NewLevelDBPersistence(baseDirectory+"/samples_by_fingerprint", *samplesByFingerprintCacheSize, 10) o := &leveldb.LevelDBOptions{
Path: baseDirectory + "/samples_by_fingerprint",
CacheSizeBytes: *fingerprintsToLabelPairCacheSize,
}
emission.MetricSamples, err = leveldb.NewLevelDBPersistence(o)
workers.MayFail(err) workers.MayFail(err)
}, },
}, },
@ -119,7 +127,11 @@ func NewLevelDBMetricPersistence(baseDirectory string) (*LevelDBMetricPersistenc
"High Watermarks by Fingerprint", "High Watermarks by Fingerprint",
func() { func() {
var err error var err error
emission.MetricHighWatermarks, err = leveldb.NewLevelDBPersistence(baseDirectory+"/high_watermarks_by_fingerprint", *highWatermarkCacheSize, 10) o := &leveldb.LevelDBOptions{
Path: baseDirectory + "/high_watermarks_by_fingerprint",
CacheSizeBytes: *highWatermarkCacheSize,
}
emission.MetricHighWatermarks, err = leveldb.NewLevelDBPersistence(o)
workers.MayFail(err) workers.MayFail(err)
}, },
}, },
@ -127,7 +139,11 @@ func NewLevelDBMetricPersistence(baseDirectory string) (*LevelDBMetricPersistenc
"Fingerprints by Label Name", "Fingerprints by Label Name",
func() { func() {
var err error var err error
emission.labelNameToFingerprints, err = leveldb.NewLevelDBPersistence(baseDirectory+"/fingerprints_by_label_name", *labelNameToFingerprintsCacheSize, 10) o := &leveldb.LevelDBOptions{
Path: baseDirectory + "/fingerprints_by_label_name",
CacheSizeBytes: *labelNameToFingerprintsCacheSize,
}
emission.labelNameToFingerprints, err = leveldb.NewLevelDBPersistence(o)
workers.MayFail(err) workers.MayFail(err)
}, },
}, },
@ -135,7 +151,11 @@ func NewLevelDBMetricPersistence(baseDirectory string) (*LevelDBMetricPersistenc
"Fingerprints by Label Name and Value Pair", "Fingerprints by Label Name and Value Pair",
func() { func() {
var err error var err error
emission.labelSetToFingerprints, err = leveldb.NewLevelDBPersistence(baseDirectory+"/fingerprints_by_label_name_and_value_pair", *labelPairToFingerprintsCacheSize, 10) o := &leveldb.LevelDBOptions{
Path: baseDirectory + "/fingerprints_by_label_name_and_value_pair",
CacheSizeBytes: *labelPairToFingerprintsCacheSize,
}
emission.labelSetToFingerprints, err = leveldb.NewLevelDBPersistence(o)
workers.MayFail(err) workers.MayFail(err)
}, },
}, },
@ -143,7 +163,13 @@ func NewLevelDBMetricPersistence(baseDirectory string) (*LevelDBMetricPersistenc
"Metric Membership Index", "Metric Membership Index",
func() { func() {
var err error var err error
emission.metricMembershipIndex, err = index.NewLevelDBMembershipIndex(baseDirectory+"/metric_membership_index", *metricMembershipIndexCacheSize, 10) o := &index.LevelDBIndexOptions{
LevelDBOptions: leveldb.LevelDBOptions{
Path: baseDirectory + "/metric_membership_index",
CacheSizeBytes: *metricMembershipIndexCacheSize,
},
}
emission.metricMembershipIndex, err = index.NewLevelDBMembershipIndex(o)
workers.MayFail(err) workers.MayFail(err)
}, },
}, },
@ -151,7 +177,11 @@ func NewLevelDBMetricPersistence(baseDirectory string) (*LevelDBMetricPersistenc
"Sample Curation Remarks", "Sample Curation Remarks",
func() { func() {
var err error var err error
emission.CurationRemarks, err = leveldb.NewLevelDBPersistence(baseDirectory+"/curation_remarks", *curationRemarksCacheSize, 10) o := &leveldb.LevelDBOptions{
Path: baseDirectory + "/curation_remarks",
CacheSizeBytes: *curationRemarksCacheSize,
}
emission.CurationRemarks, err = leveldb.NewLevelDBPersistence(o)
workers.MayFail(err) workers.MayFail(err)
}, },
}, },

View file

@ -1820,7 +1820,7 @@ func TestGetValuesAlongRangeOp(t *testing.T) {
t.Fatalf("%d. expected length %d, got %d: %v", i, len(scenario.out), len(actual), actual) t.Fatalf("%d. expected length %d, got %d: %v", i, len(scenario.out), len(actual), actual)
} }
for j, out := range scenario.out { for j, out := range scenario.out {
if out != actual[j] { if !out.Equal(actual[j]) {
t.Fatalf("%d. expected output %v, got %v", i, scenario.out, actual) t.Fatalf("%d. expected output %v, got %v", i, scenario.out, actual)
} }
} }

View file

@ -848,19 +848,25 @@ func TestCuratorCompactionProcessor(t *testing.T) {
sampleDirectory := fixture.NewPreparer(t).Prepare("sample", fixture.NewCassetteFactory(scenario.in.sampleGroups)) sampleDirectory := fixture.NewPreparer(t).Prepare("sample", fixture.NewCassetteFactory(scenario.in.sampleGroups))
defer sampleDirectory.Close() defer sampleDirectory.Close()
curatorStates, err := leveldb.NewLevelDBPersistence(curatorDirectory.Path(), 0, 0) curatorStates, err := leveldb.NewLevelDBPersistence(&leveldb.LevelDBOptions{
Path: curatorDirectory.Path(),
})
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
defer curatorStates.Close() defer curatorStates.Close()
watermarkStates, err := leveldb.NewLevelDBPersistence(watermarkDirectory.Path(), 0, 0) watermarkStates, err := leveldb.NewLevelDBPersistence(&leveldb.LevelDBOptions{
Path: watermarkDirectory.Path(),
})
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
defer watermarkStates.Close() defer watermarkStates.Close()
samples, err := leveldb.NewLevelDBPersistence(sampleDirectory.Path(), 0, 0) samples, err := leveldb.NewLevelDBPersistence(&leveldb.LevelDBOptions{
Path: sampleDirectory.Path(),
})
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -1367,19 +1373,21 @@ func TestCuratorDeletionProcessor(t *testing.T) {
sampleDirectory := fixture.NewPreparer(t).Prepare("sample", fixture.NewCassetteFactory(scenario.in.sampleGroups)) sampleDirectory := fixture.NewPreparer(t).Prepare("sample", fixture.NewCassetteFactory(scenario.in.sampleGroups))
defer sampleDirectory.Close() defer sampleDirectory.Close()
curatorStates, err := leveldb.NewLevelDBPersistence(curatorDirectory.Path(), 0, 0) curatorStates, err := leveldb.NewLevelDBPersistence(&leveldb.LevelDBOptions{
Path: curatorDirectory.Path()})
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
defer curatorStates.Close() defer curatorStates.Close()
watermarkStates, err := leveldb.NewLevelDBPersistence(watermarkDirectory.Path(), 0, 0) watermarkStates, err := leveldb.NewLevelDBPersistence(&leveldb.LevelDBOptions{
Path: watermarkDirectory.Path()})
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
defer watermarkStates.Close() defer watermarkStates.Close()
samples, err := leveldb.NewLevelDBPersistence(sampleDirectory.Path(), 0, 0) samples, err := leveldb.NewLevelDBPersistence(&leveldb.LevelDBOptions{Path: sampleDirectory.Path()})
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }

View file

@ -22,7 +22,7 @@ import (
"github.com/prometheus/prometheus/storage/raw/leveldb" "github.com/prometheus/prometheus/storage/raw/leveldb"
) )
var existenceValue = &dto.MembershipIndexValue{} var existenceValue = new(dto.MembershipIndexValue)
type LevelDBMembershipIndex struct { type LevelDBMembershipIndex struct {
persistence *leveldb.LevelDBPersistence persistence *leveldb.LevelDBPersistence
@ -44,18 +44,19 @@ func (l *LevelDBMembershipIndex) Put(k proto.Message) error {
return l.persistence.Put(k, existenceValue) return l.persistence.Put(k, existenceValue)
} }
func NewLevelDBMembershipIndex(storageRoot string, cacheCapacity, bitsPerBloomFilterEncoded int) (i *LevelDBMembershipIndex, err error) { type LevelDBIndexOptions struct {
leveldb.LevelDBOptions
}
leveldbPersistence, err := leveldb.NewLevelDBPersistence(storageRoot, cacheCapacity, bitsPerBloomFilterEncoded) func NewLevelDBMembershipIndex(o *LevelDBIndexOptions) (i *LevelDBMembershipIndex, err error) {
leveldbPersistence, err := leveldb.NewLevelDBPersistence(&o.LevelDBOptions)
if err != nil { if err != nil {
return return nil, err
} }
i = &LevelDBMembershipIndex{ return &LevelDBMembershipIndex{
persistence: leveldbPersistence, persistence: leveldbPersistence,
} }, nil
return
} }
func (l *LevelDBMembershipIndex) Commit(batch raw.Batch) error { func (l *LevelDBMembershipIndex) Commit(batch raw.Batch) error {

View file

@ -14,7 +14,6 @@
package leveldb package leveldb
import ( import (
"flag"
"fmt" "fmt"
"time" "time"
@ -26,13 +25,6 @@ import (
"github.com/prometheus/prometheus/storage/raw" "github.com/prometheus/prometheus/storage/raw"
) )
var (
leveldbFlushOnMutate = flag.Bool("leveldbFlushOnMutate", false, "Whether LevelDB should flush every operation to disk upon mutation before returning (bool).")
leveldbUseSnappy = flag.Bool("leveldbUseSnappy", true, "Whether LevelDB attempts to use Snappy for compressing elements (bool).")
leveldbUseParanoidChecks = flag.Bool("leveldbUseParanoidChecks", false, "Whether LevelDB uses expensive checks (bool).")
maximumOpenFiles = flag.Int("leveldb.maximumOpenFiles", 128, "The maximum number of files each LevelDB may maintain.")
)
// LevelDBPersistence is a disk-backed sorted key-value store. // LevelDBPersistence is a disk-backed sorted key-value store.
type LevelDBPersistence struct { type LevelDBPersistence struct {
path string path string
@ -169,25 +161,37 @@ func (i levigoIterator) GetError() (err error) {
return i.iterator.GetError() return i.iterator.GetError()
} }
func NewLevelDBPersistence(storageRoot string, cacheCapacity, bitsPerBloomFilterEncoded int) (*LevelDBPersistence, error) { type LevelDBOptions struct {
Path string
CacheSizeBytes int
OpenFileAllowance int
FlushOnMutate bool
UseParanoidChecks bool
NotUseSnappy bool
}
func NewLevelDBPersistence(o *LevelDBOptions) (*LevelDBPersistence, error) {
options := levigo.NewOptions() options := levigo.NewOptions()
options.SetCreateIfMissing(true) options.SetCreateIfMissing(true)
options.SetParanoidChecks(*leveldbUseParanoidChecks) options.SetParanoidChecks(o.UseParanoidChecks)
compression := levigo.NoCompression compression := levigo.SnappyCompression
if *leveldbUseSnappy { if !o.NotUseSnappy {
compression = levigo.SnappyCompression compression = levigo.NoCompression
} }
options.SetCompression(compression) options.SetCompression(compression)
cache := levigo.NewLRUCache(cacheCapacity) cache := levigo.NewLRUCache(o.CacheSizeBytes)
options.SetCache(cache) options.SetCache(cache)
filterPolicy := levigo.NewBloomFilter(bitsPerBloomFilterEncoded) filterPolicy := levigo.NewBloomFilter(10)
options.SetFilterPolicy(filterPolicy) options.SetFilterPolicy(filterPolicy)
options.SetMaxOpenFiles(*maximumOpenFiles) options.SetMaxOpenFiles(o.OpenFileAllowance)
storage, err := levigo.Open(storageRoot, options) storage, err := levigo.Open(o.Path, options)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -195,10 +199,10 @@ func NewLevelDBPersistence(storageRoot string, cacheCapacity, bitsPerBloomFilter
readOptions := levigo.NewReadOptions() readOptions := levigo.NewReadOptions()
writeOptions := levigo.NewWriteOptions() writeOptions := levigo.NewWriteOptions()
writeOptions.SetSync(*leveldbFlushOnMutate) writeOptions.SetSync(o.FlushOnMutate)
return &LevelDBPersistence{ return &LevelDBPersistence{
path: storageRoot, path: o.Path,
cache: cache, cache: cache,
filterPolicy: filterPolicy, filterPolicy: filterPolicy,

View file

@ -20,10 +20,7 @@ import (
"github.com/prometheus/prometheus/utility/test" "github.com/prometheus/prometheus/utility/test"
) )
const ( const cacheCapacity = 0
cacheCapacity = 0
bitsPerBloomFilterEncoded = 0
)
type ( type (
// Pair models a prospective (key, value) double that will be committed to // Pair models a prospective (key, value) double that will be committed to
@ -64,7 +61,11 @@ type (
func (p preparer) Prepare(n string, f FixtureFactory) (t test.TemporaryDirectory) { func (p preparer) Prepare(n string, f FixtureFactory) (t test.TemporaryDirectory) {
t = test.NewTemporaryDirectory(n, p.tester) t = test.NewTemporaryDirectory(n, p.tester)
persistence, err := leveldb.NewLevelDBPersistence(t.Path(), cacheCapacity, bitsPerBloomFilterEncoded) o := &leveldb.LevelDBOptions{
Path: t.Path(),
CacheSizeBytes: cacheCapacity,
}
persistence, err := leveldb.NewLevelDBPersistence(o)
if err != nil { if err != nil {
defer t.Close() defer t.Close()
p.tester.Fatal(err) p.tester.Fatal(err)