Merge pull request #95 from prometheus/feature/persistence/batching

Several interface cleanups.
This commit is contained in:
Matt T. Proud 2013-03-24 00:19:46 -07:00
commit 70448711ec
5 changed files with 99 additions and 57 deletions

View file

@ -20,7 +20,3 @@ type Iterator interface {
Key() interface{} Key() interface{}
Value() interface{} Value() interface{}
} }
type IteratorManager interface {
Iterator() Iterator
}

View file

@ -16,6 +16,7 @@ package leveldb
import ( import (
"github.com/prometheus/prometheus/coding" "github.com/prometheus/prometheus/coding"
dto "github.com/prometheus/prometheus/model/generated" dto "github.com/prometheus/prometheus/model/generated"
"github.com/prometheus/prometheus/storage/raw"
"github.com/prometheus/prometheus/storage/raw/leveldb" "github.com/prometheus/prometheus/storage/raw/leveldb"
) )
@ -57,6 +58,6 @@ func NewLevelDBMembershipIndex(storageRoot string, cacheCapacity, bitsPerBloomFi
return return
} }
func (l *LevelDBMembershipIndex) Commit(batch leveldb.Batch) error { func (l *LevelDBMembershipIndex) Commit(batch raw.Batch) error {
return l.persistence.Commit(batch) return l.persistence.Commit(batch)
} }

View file

@ -16,6 +16,7 @@ package raw
import ( import (
"github.com/prometheus/prometheus/coding" "github.com/prometheus/prometheus/coding"
"github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/storage"
"io"
) )
type Pair struct { type Pair struct {
@ -25,13 +26,20 @@ type Pair struct {
type EachFunc func(pair *Pair) type EachFunc func(pair *Pair)
// Persistence models a key-value store for bytes that supports various
// additional operations.
type Persistence interface { type Persistence interface {
Has(key coding.Encoder) (bool, error) io.Closer
Get(key coding.Encoder) ([]byte, error)
Drop(key coding.Encoder) error
Put(key, value coding.Encoder) error
Close() error
// Has informs the user whether a given key exists in the database.
Has(key coding.Encoder) (bool, error)
// Get retrieves the key from the database if it exists or returns nil if
// it is absent.
Get(key coding.Encoder) ([]byte, error)
// Drop removes the key from the database.
Drop(key coding.Encoder) error
// Put sets the key to a given value.
Put(key, value coding.Encoder) error
// ForEach is responsible for iterating through all records in the database // ForEach is responsible for iterating through all records in the database
// until one of the following conditions are met: // until one of the following conditions are met:
// //
@ -41,7 +49,20 @@ type Persistence interface {
// //
// Decoding errors for an entity cause that entity to be skipped. // Decoding errors for an entity cause that entity to be skipped.
ForEach(decoder storage.RecordDecoder, filter storage.RecordFilter, operator storage.RecordOperator) (scannedEntireCorpus bool, err error) ForEach(decoder storage.RecordDecoder, filter storage.RecordFilter, operator storage.RecordOperator) (scannedEntireCorpus bool, err error)
// Commit applies the Batch operations to the database.
Commit(Batch) error
// Pending removal. // Pending removal.
GetAll() ([]Pair, error) GetAll() ([]Pair, error)
} }
// Batch models a pool of mutations for the database that can be committed
// en masse. The interface implies no protocol around the atomicity of
// effectuation.
type Batch interface {
io.Closer
// Put follows the same protocol as Persistence.Put.
Put(key, value coding.Encoder)
// Drop follows the same protocol as Persistence.Drop.
Drop(key coding.Encoder)
}

View file

@ -0,0 +1,57 @@
// Copyright 2013 Prometheus Team
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package leveldb
import (
"github.com/jmhodges/levigo"
"github.com/prometheus/prometheus/coding"
)
type batch struct {
batch *levigo.WriteBatch
}
func NewBatch() batch {
return batch{
batch: levigo.NewWriteBatch(),
}
}
func (b batch) Drop(key coding.Encoder) {
keyEncoded, err := key.Encode()
if err != nil {
panic(err)
}
b.batch.Delete(keyEncoded)
}
func (b batch) Put(key, value coding.Encoder) {
keyEncoded, err := key.Encode()
if err != nil {
panic(err)
}
valueEncoded, err := value.Encode()
if err != nil {
panic(err)
}
b.batch.Put(keyEncoded, valueEncoded)
}
func (b batch) Close() (err error) {
b.batch.Close()
return
}

View file

@ -172,8 +172,17 @@ func (l *LevelDBPersistence) Put(key, value coding.Encoder) (err error) {
return return
} }
func (l *LevelDBPersistence) Commit(b Batch) (err error) { func (l *LevelDBPersistence) Commit(b raw.Batch) (err error) {
return l.storage.Write(l.writeOptions, b.(batch).batch) // XXX: This is a wart to clean up later. Ideally, after doing extensive
// tests, we could create a Batch struct that journals pending
// operations which the given Persistence implementation could convert
// to its specific commit requirements.
batch, ok := b.(batch)
if !ok {
panic("leveldb.batch expected")
}
return l.storage.Write(l.writeOptions, batch.batch)
} }
func (l *LevelDBPersistence) GetAll() (pairs []raw.Pair, err error) { func (l *LevelDBPersistence) GetAll() (pairs []raw.Pair, err error) {
@ -229,6 +238,8 @@ func (l *LevelDBPersistence) GetIterator() (i *levigo.Iterator, c io.Closer, err
readOptions.SetSnapshot(snapshot) readOptions.SetSnapshot(snapshot)
i = l.storage.NewIterator(readOptions) i = l.storage.NewIterator(readOptions)
// TODO: Kill the return of an additional io.Closer and just use a decorated
// iterator interface.
c = &iteratorCloser{ c = &iteratorCloser{
iterator: i, iterator: i,
readOptions: readOptions, readOptions: readOptions,
@ -279,47 +290,3 @@ func (l *LevelDBPersistence) ForEach(decoder storage.RecordDecoder, filter stora
scannedEntireCorpus = true scannedEntireCorpus = true
return return
} }
// Batch encapsulates a list of mutations to occur to the datastore. It must
// be closed once done.
type Batch interface {
Delete(coding.Encoder)
Put(coding.Encoder, coding.Encoder)
Close()
}
func NewBatch() Batch {
return batch{
batch: levigo.NewWriteBatch(),
}
}
type batch struct {
batch *levigo.WriteBatch
}
func (b batch) Delete(key coding.Encoder) {
keyEncoded, err := key.Encode()
if err != nil {
panic(err)
}
b.batch.Delete(keyEncoded)
}
func (b batch) Put(key, value coding.Encoder) {
keyEncoded, err := key.Encode()
if err != nil {
panic(err)
}
valueEncoded, err := value.Encode()
if err != nil {
panic(err)
}
b.batch.Put(keyEncoded, valueEncoded)
}
func (b batch) Close() {
b.batch.Close()
}