2013-02-07 02:38:01 -08:00
// Copyright 2013 Prometheus Team
2012-11-26 11:11:34 -08:00
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
2012-11-26 10:56:51 -08:00
package leveldb
2012-11-24 03:33:34 -08:00
import (
2013-01-27 11:28:37 -08:00
"flag"
2012-11-24 03:33:34 -08:00
"github.com/jmhodges/levigo"
2013-01-27 09:49:45 -08:00
"github.com/prometheus/prometheus/coding"
2013-02-06 08:05:23 -08:00
"github.com/prometheus/prometheus/storage"
2013-01-27 09:49:45 -08:00
"github.com/prometheus/prometheus/storage/raw"
2012-11-24 03:33:34 -08:00
"io"
)
2013-01-27 11:28:37 -08:00
// Command-line flags that tune how the LevelDB databases of this package are
// opened and written to. They are read when NewLevelDBPersistence runs.
var (
	// leveldbFlushOnMutate is passed to the write options' SetSync, making
	// every mutation synchronous when enabled.
	leveldbFlushOnMutate = flag.Bool("leveldbFlushOnMutate", false, "Whether LevelDB should flush every operation to disk upon mutation before returning (bool).")

	// leveldbUseSnappy selects Snappy compression when enabled and no
	// compression otherwise.
	leveldbUseSnappy = flag.Bool("leveldbUseSnappy", true, "Whether LevelDB attempts to use Snappy for compressing elements (bool).")

	// leveldbUseParanoidChecks is forwarded to the open options'
	// SetParanoidChecks.
	leveldbUseParanoidChecks = flag.Bool("leveldbUseParanoidChecks", true, "Whether LevelDB uses expensive checks (bool).")
)
2013-03-04 11:43:07 -08:00
// LevelDBPersistence is a disk-backed sorted key-value store.
2012-11-28 11:22:49 -08:00
type LevelDBPersistence struct {
2012-11-24 03:33:34 -08:00
cache * levigo . Cache
filterPolicy * levigo . FilterPolicy
options * levigo . Options
storage * levigo . DB
readOptions * levigo . ReadOptions
writeOptions * levigo . WriteOptions
}
2013-02-08 09:03:26 -08:00
// LevelDB iterators have a number of resources that need to be closed.
// iteratorCloser encapsulates the various ones.
2012-11-26 10:56:51 -08:00
type iteratorCloser struct {
iterator * levigo . Iterator
readOptions * levigo . ReadOptions
snapshot * levigo . Snapshot
storage * levigo . DB
}
2012-12-25 04:50:36 -08:00
func NewLevelDBPersistence ( storageRoot string , cacheCapacity , bitsPerBloomFilterEncoded int ) ( p * LevelDBPersistence , err error ) {
2012-11-24 03:33:34 -08:00
options := levigo . NewOptions ( )
options . SetCreateIfMissing ( true )
2013-02-01 04:35:07 -08:00
options . SetParanoidChecks ( * leveldbUseParanoidChecks )
compression := levigo . NoCompression
if * leveldbUseSnappy {
compression = levigo . SnappyCompression
}
options . SetCompression ( compression )
2012-11-24 03:33:34 -08:00
cache := levigo . NewLRUCache ( cacheCapacity )
options . SetCache ( cache )
filterPolicy := levigo . NewBloomFilter ( bitsPerBloomFilterEncoded )
options . SetFilterPolicy ( filterPolicy )
2012-12-25 04:50:36 -08:00
storage , err := levigo . Open ( storageRoot , options )
if err != nil {
return
}
2012-11-24 03:33:34 -08:00
readOptions := levigo . NewReadOptions ( )
writeOptions := levigo . NewWriteOptions ( )
2013-01-27 11:28:37 -08:00
writeOptions . SetSync ( * leveldbFlushOnMutate )
2012-12-25 04:50:36 -08:00
p = & LevelDBPersistence {
2012-11-24 03:33:34 -08:00
cache : cache ,
filterPolicy : filterPolicy ,
options : options ,
readOptions : readOptions ,
storage : storage ,
writeOptions : writeOptions ,
}
2012-12-25 04:50:36 -08:00
return
2012-11-24 03:33:34 -08:00
}
2012-12-25 04:50:36 -08:00
func ( l * LevelDBPersistence ) Close ( ) ( err error ) {
// These are deferred to take advantage of forced closing in case of stack
// unwinding due to anomalies.
defer func ( ) {
if l . storage != nil {
l . storage . Close ( )
}
} ( )
2012-11-24 03:33:34 -08:00
defer func ( ) {
if l . filterPolicy != nil {
l . filterPolicy . Close ( )
}
} ( )
defer func ( ) {
if l . cache != nil {
l . cache . Close ( )
}
} ( )
defer func ( ) {
if l . options != nil {
l . options . Close ( )
}
} ( )
defer func ( ) {
if l . readOptions != nil {
l . readOptions . Close ( )
}
} ( )
defer func ( ) {
if l . writeOptions != nil {
l . writeOptions . Close ( )
}
} ( )
2012-12-25 04:50:36 -08:00
return
2012-11-24 03:33:34 -08:00
}
2012-12-25 04:50:36 -08:00
func ( l * LevelDBPersistence ) Get ( value coding . Encoder ) ( b [ ] byte , err error ) {
key , err := value . Encode ( )
if err != nil {
return
2012-11-24 03:33:34 -08:00
}
2012-12-25 04:50:36 -08:00
return l . storage . Get ( l . readOptions , key )
2012-11-24 03:33:34 -08:00
}
2012-12-25 04:50:36 -08:00
func ( l * LevelDBPersistence ) Has ( value coding . Encoder ) ( h bool , err error ) {
raw , err := l . Get ( value )
if err != nil {
return
2012-11-24 03:33:34 -08:00
}
2012-12-25 04:50:36 -08:00
h = raw != nil
2012-11-24 03:33:34 -08:00
2012-12-25 04:50:36 -08:00
return
}
2012-11-24 03:33:34 -08:00
2012-12-25 04:50:36 -08:00
func ( l * LevelDBPersistence ) Drop ( value coding . Encoder ) ( err error ) {
key , err := value . Encode ( )
if err != nil {
return
2012-11-24 03:33:34 -08:00
}
2012-12-25 04:50:36 -08:00
err = l . storage . Delete ( l . writeOptions , key )
return
2012-11-24 03:33:34 -08:00
}
2012-12-25 04:50:36 -08:00
func ( l * LevelDBPersistence ) Put ( key , value coding . Encoder ) ( err error ) {
keyEncoded , err := key . Encode ( )
if err != nil {
return
}
2012-11-24 03:33:34 -08:00
2012-12-25 04:50:36 -08:00
valueEncoded , err := value . Encode ( )
if err != nil {
return
2012-11-24 03:33:34 -08:00
}
2012-12-25 04:50:36 -08:00
err = l . storage . Put ( l . writeOptions , keyEncoded , valueEncoded )
return
2012-11-24 03:33:34 -08:00
}
2013-02-08 09:03:26 -08:00
// Commit writes the mutations collected in b to the database using the
// shared write options.
//
// b must be a value produced by NewBatch; any other Batch implementation
// makes the type assertion below panic.
func (l *LevelDBPersistence) Commit(b Batch) (err error) {
	pending := b.(batch)
	err = l.storage.Write(l.writeOptions, pending.batch)
	return err
}
2012-12-25 04:50:36 -08:00
func ( l * LevelDBPersistence ) GetAll ( ) ( pairs [ ] raw . Pair , err error ) {
2012-11-24 03:33:34 -08:00
snapshot := l . storage . NewSnapshot ( )
defer l . storage . ReleaseSnapshot ( snapshot )
readOptions := levigo . NewReadOptions ( )
defer readOptions . Close ( )
readOptions . SetSnapshot ( snapshot )
iterator := l . storage . NewIterator ( readOptions )
defer iterator . Close ( )
iterator . SeekToFirst ( )
for iterator := iterator ; iterator . Valid ( ) ; iterator . Next ( ) {
2012-12-25 04:50:36 -08:00
pairs = append ( pairs , raw . Pair { Left : iterator . Key ( ) , Right : iterator . Value ( ) } )
2012-11-24 03:33:34 -08:00
2012-12-25 04:50:36 -08:00
err = iterator . GetError ( )
if err != nil {
return
}
2012-11-24 03:33:34 -08:00
}
2012-12-25 04:50:36 -08:00
return
2012-11-24 03:33:34 -08:00
}
2012-12-25 04:50:36 -08:00
func ( i * iteratorCloser ) Close ( ) ( err error ) {
2012-11-24 03:33:34 -08:00
defer func ( ) {
if i . storage != nil {
if i . snapshot != nil {
i . storage . ReleaseSnapshot ( i . snapshot )
}
}
} ( )
defer func ( ) {
if i . iterator != nil {
i . iterator . Close ( )
}
} ( )
defer func ( ) {
if i . readOptions != nil {
i . readOptions . Close ( )
}
} ( )
2012-12-25 04:50:36 -08:00
return
2012-11-24 03:33:34 -08:00
}
2012-12-25 04:50:36 -08:00
func ( l * LevelDBPersistence ) GetIterator ( ) ( i * levigo . Iterator , c io . Closer , err error ) {
2012-11-24 03:33:34 -08:00
snapshot := l . storage . NewSnapshot ( )
readOptions := levigo . NewReadOptions ( )
readOptions . SetSnapshot ( snapshot )
2012-12-25 04:50:36 -08:00
i = l . storage . NewIterator ( readOptions )
2012-11-24 03:33:34 -08:00
2012-12-25 04:50:36 -08:00
c = & iteratorCloser {
iterator : i ,
2012-11-24 03:33:34 -08:00
readOptions : readOptions ,
snapshot : snapshot ,
storage : l . storage ,
}
2012-12-25 04:50:36 -08:00
return
2012-11-24 03:33:34 -08:00
}
2013-02-06 08:05:23 -08:00
// ForEach iterates over every record of a snapshot of the database, decodes
// it with decoder, asks filter what to do with it and, if accepted, hands it
// to operator.
//
// Records whose key or value cannot be decoded are skipped. The scan ends
// early when the filter answers storage.STOP or when the operator reports an
// error that is not marked Continuable; continuable operator errors are
// ignored and the scan moves on.
//
// scannedEntireCorpus is true only if the iteration ran to the end without
// being stopped and without an iterator error. err carries iterator errors
// only; a non-continuable operator error ends the scan but is not returned
// through err, matching the original contract.
// NOTE(review): the operator error type is declared outside this file; whether
// it can be propagated as an error should be confirmed before changing that.
func (l *LevelDBPersistence) ForEach(decoder storage.RecordDecoder, filter storage.RecordFilter, operator storage.RecordOperator) (scannedEntireCorpus bool, err error) {
	iterator, closer, err := l.GetIterator()
	if err != nil {
		return false, err
	}
	defer closer.Close()

	for iterator.SeekToFirst(); iterator.Valid(); iterator.Next() {
		if err = iterator.GetError(); err != nil {
			return false, err
		}

		decodedKey, decodeErr := decoder.DecodeKey(iterator.Key())
		if decodeErr != nil {
			continue
		}

		decodedValue, decodeErr := decoder.DecodeValue(iterator.Value())
		if decodeErr != nil {
			continue
		}

		switch filter.Filter(decodedKey, decodedValue) {
		case storage.STOP:
			return false, nil
		case storage.SKIP:
			continue
		case storage.ACCEPT:
			opErr := operator.Operate(decodedKey, decodedValue)
			if opErr != nil && !opErr.Continuable {
				// A bare break here would only leave the switch and the scan
				// would carry on; return so that the scan really stops and is
				// not reported as complete.
				return false, nil
			}
		}
	}

	// Valid() also turns false when the iterator fails, so make sure the loop
	// ended because the data was exhausted before claiming a full scan.
	if err = iterator.GetError(); err != nil {
		return false, err
	}

	return true, nil
}
2013-02-08 09:03:26 -08:00
// Batch collects mutations that are to be applied to the datastore together,
// for example via LevelDBPersistence.Commit. A Batch must be closed once it
// is no longer needed.
type Batch interface {
	// Delete records the removal of key.
	Delete(key coding.Encoder)
	// Put records the storing of value under key.
	Put(key, value coding.Encoder)
	// Close releases the resources held by the batch.
	Close()
}
// NewBatch returns an empty Batch backed by a freshly created levigo write
// batch. The caller must Close it when done.
func NewBatch() Batch {
	pending := levigo.NewWriteBatch()

	return batch{batch: pending}
}
// batch is the Batch implementation of this package; it wraps the levigo
// write batch that accumulates the mutations.
type batch struct {
	batch *levigo.WriteBatch
}
// Delete encodes key and adds its removal to the underlying write batch.
// It panics if key cannot be encoded, as the method has no error return.
func (b batch) Delete(key coding.Encoder) {
	encodedKey, encodeErr := key.Encode()
	if encodeErr != nil {
		panic(encodeErr)
	}

	b.batch.Delete(encodedKey)
}
// Put encodes key and value, in that order, and adds the pair to the
// underlying write batch. It panics if either cannot be encoded, as the
// method has no error return.
func (b batch) Put(key, value coding.Encoder) {
	rawKey, keyErr := key.Encode()
	if keyErr != nil {
		panic(keyErr)
	}

	rawValue, valueErr := value.Encode()
	if valueErr != nil {
		panic(valueErr)
	}

	b.batch.Put(rawKey, rawValue)
}
// Close releases the underlying levigo write batch.
func (b batch) Close() {
	pending := b.batch
	pending.Close()
}