Merge pull request #96 from prometheus/refactor/persistence/iterator-interface

Serious Cleaning of LevelDB Iterator Code
This commit is contained in:
Matt T. Proud 2013-03-25 04:58:36 -07:00
commit 0980aeac52
6 changed files with 199 additions and 134 deletions

View file

@ -19,6 +19,7 @@ import (
"github.com/prometheus/prometheus/coding/indexable"
"github.com/prometheus/prometheus/model"
dto "github.com/prometheus/prometheus/model/generated"
"github.com/prometheus/prometheus/storage/raw/leveldb"
"time"
)
@ -42,9 +43,9 @@ func (f *diskFrontier) ContainsFingerprint(fingerprint model.Fingerprint) bool {
return !(fingerprint.Less(f.firstFingerprint) || f.lastFingerprint.Less(fingerprint))
}
func newDiskFrontier(i iterator) (d *diskFrontier, err error) {
i.SeekToLast()
if !i.Valid() || i.Key() == nil {
func newDiskFrontier(i leveldb.Iterator) (d *diskFrontier, err error) {
if !i.SeekToLast() || i.Key() == nil {
return
}
lastKey, err := extractSampleKey(i)
@ -85,7 +86,7 @@ func (f seriesFrontier) String() string {
// newSeriesFrontier furnishes a populated seriesFrontier for a given
// fingerprint. A nil seriesFrontier will be returned if the series cannot
// be found in the store.
func newSeriesFrontier(f model.Fingerprint, d diskFrontier, i iterator) (s *seriesFrontier, err error) {
func newSeriesFrontier(f model.Fingerprint, d diskFrontier, i leveldb.Iterator) (s *seriesFrontier, err error) {
var (
lowerSeek = firstSupertime
upperSeek = lastSupertime
@ -129,7 +130,7 @@ func newSeriesFrontier(f model.Fingerprint, d diskFrontier, i iterator) (s *seri
//
//
if !retrievedFingerprint.Equal(f) {
i.Prev()
i.Previous()
retrievedKey, err = extractSampleKey(i)
if err != nil {

View file

@ -1,22 +0,0 @@
// Copyright 2013 Prometheus Team
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package metric
// Iterator is a cursor abstraction whose keys and values are untyped
// (interface{}), leaving their interpretation to the implementation.
//
// NOTE(review): no implementation is visible here; ok presumably reports
// whether the cursor rests on a usable entry after the move — confirm against
// implementers.
type Iterator interface {
// Seek repositions the cursor relative to key.
Seek(key interface{}) (ok bool)
// Next advances the cursor by one entry.
Next() (ok bool)
// Previous moves the cursor back by one entry.
Previous() (ok bool)
// Key returns the key at the current cursor position.
Key() interface{}
// Value returns the value at the current cursor position.
Value() interface{}
}

View file

@ -683,7 +683,7 @@ func (l *LevelDBMetricPersistence) AppendSamples(samples model.Samples) (err err
return
}
func extractSampleKey(i iterator) (k *dto.SampleKey, err error) {
func extractSampleKey(i leveldb.Iterator) (k *dto.SampleKey, err error) {
if i == nil {
panic("nil iterator")
}
@ -698,7 +698,7 @@ func extractSampleKey(i iterator) (k *dto.SampleKey, err error) {
return
}
func extractSampleValues(i iterator) (v *dto.SampleValueSeries, err error) {
func extractSampleValues(i leveldb.Iterator) (v *dto.SampleValueSeries, err error) {
if i == nil {
panic("nil iterator")
}
@ -937,18 +937,6 @@ func interpolate(x1, x2 time.Time, y1, y2 float32, e time.Time) float32 {
return y1 + (offset * dDt)
}
// iterator lists the cursor operations this package needs from a LevelDB
// iterator. The movement methods return nothing; callers are expected to call
// Valid afterwards to learn whether the cursor rests on a usable entry.
//
// NOTE(review): the method set appears to mirror *levigo.Iterator — confirm.
type iterator interface {
// Close releases the iterator.
Close()
// Key returns the raw key at the current position.
Key() []byte
// Next advances the cursor by one entry.
Next()
// Prev moves the cursor back by one entry.
Prev()
// Seek repositions the cursor relative to the given raw key.
Seek([]byte)
// SeekToFirst repositions the cursor at the start of the keyspace.
SeekToFirst()
// SeekToLast repositions the cursor at the end of the keyspace.
SeekToLast()
// Valid reports whether the cursor currently rests on a usable entry.
Valid() bool
// Value returns the raw value at the current position.
Value() []byte
}
func (l *LevelDBMetricPersistence) GetValueAtTime(fp model.Fingerprint, t time.Time, s StalenessPolicy) (sample *model.Sample, err error) {
begin := time.Now()
@ -975,15 +963,10 @@ func (l *LevelDBMetricPersistence) GetValueAtTime(fp model.Fingerprint, t time.T
return
}
iterator, closer, err := l.metricSamples.GetIterator()
if err != nil {
return
}
iterator := l.metricSamples.NewIterator(true)
defer iterator.Close()
defer closer.Close()
iterator.Seek(e)
if !iterator.Valid() {
if !iterator.Seek(e) {
/*
* Two cases for this:
* 1.) Corruption in LevelDB.
@ -994,13 +977,10 @@ func (l *LevelDBMetricPersistence) GetValueAtTime(fp model.Fingerprint, t time.T
* database is sufficient for our purposes. This is, in all reality, a
* corner case but one that could bring down the system.
*/
iterator, closer, err = l.metricSamples.GetIterator()
if err != nil {
return
}
defer closer.Close()
iterator.SeekToLast()
if !iterator.Valid() {
iterator = l.metricSamples.NewIterator(true)
defer iterator.Close()
if !iterator.SeekToLast() {
/*
* For whatever reason, the LevelDB cannot be recovered.
*/
@ -1048,8 +1028,7 @@ func (l *LevelDBMetricPersistence) GetValueAtTime(fp model.Fingerprint, t time.T
firstTime := indexable.DecodeTime(firstKey.Timestamp)
if t.Before(firstTime) || peekAhead {
iterator.Prev()
if !iterator.Valid() {
if !iterator.Previous() {
/*
* Two cases for this:
* 1.) Corruption in LevelDB.
@ -1106,8 +1085,7 @@ func (l *LevelDBMetricPersistence) GetValueAtTime(fp model.Fingerprint, t time.T
return
}
iterator.Next()
if !iterator.Valid() {
if !iterator.Next() {
/*
* Two cases for this:
* 1.) Corruption in LevelDB.
@ -1188,17 +1166,12 @@ func (l *LevelDBMetricPersistence) GetRangeValues(fp model.Fingerprint, i model.
return
}
iterator, closer, err := l.metricSamples.GetIterator()
if err != nil {
return
}
defer closer.Close()
iterator.Seek(e)
iterator := l.metricSamples.NewIterator(true)
defer iterator.Close()
predicate := keyIsOlderThan(i.NewestInclusive)
for ; iterator.Valid(); iterator.Next() {
for valid := iterator.Seek(e); valid; valid = iterator.Next() {
retrievedKey := &dto.SampleKey{}
retrievedKey, err = extractSampleKey(iterator)

View file

@ -15,12 +15,12 @@ package metric
import (
"fmt"
"github.com/jmhodges/levigo"
"github.com/prometheus/prometheus/coding"
"github.com/prometheus/prometheus/coding/indexable"
"github.com/prometheus/prometheus/model"
dto "github.com/prometheus/prometheus/model/generated"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/storage/raw/leveldb"
"sort"
"sync"
"time"
@ -139,13 +139,10 @@ func (t *tieredStorage) rebuildDiskFrontier() (err error) {
recordOutcome(duration, err, map[string]string{operation: appendSample, result: success}, map[string]string{operation: rebuildDiskFrontier, result: failure})
}()
i, closer, err := t.diskStorage.metricSamples.GetIterator()
if closer != nil {
defer closer.Close()
}
if err != nil {
panic(err)
}
i := t.diskStorage.metricSamples.NewIterator(true)
defer i.Close()
t.diskFrontier, err = newDiskFrontier(i)
if err != nil {
panic(err)
@ -365,13 +362,8 @@ func (t *tieredStorage) renderView(viewJob viewJob) {
}
// Get a single iterator that will be used for all data extraction below.
iterator, closer, err := t.diskStorage.metricSamples.GetIterator()
if closer != nil {
defer closer.Close()
}
if err != nil {
panic(err)
}
iterator := t.diskStorage.metricSamples.NewIterator(true)
defer iterator.Close()
for _, scanJob := range scans {
seriesFrontier, err := newSeriesFrontier(scanJob.fingerprint, *t.diskFrontier, iterator)
@ -442,7 +434,7 @@ func (t *tieredStorage) renderView(viewJob viewJob) {
return
}
func (t *tieredStorage) loadChunkAroundTime(iterator *levigo.Iterator, frontier *seriesFrontier, fingerprint model.Fingerprint, ts time.Time) (chunk []model.SamplePair) {
func (t *tieredStorage) loadChunkAroundTime(iterator leveldb.Iterator, frontier *seriesFrontier, fingerprint model.Fingerprint, ts time.Time) (chunk []model.SamplePair) {
var (
targetKey = &dto.SampleKey{
Fingerprint: fingerprint.ToDTO(),
@ -481,7 +473,7 @@ func (t *tieredStorage) loadChunkAroundTime(iterator *levigo.Iterator, frontier
rewound := false
firstTime := indexable.DecodeTime(foundKey.Timestamp)
if ts.Before(firstTime) && !frontier.firstSupertime.After(ts) {
iterator.Prev()
iterator.Previous()
rewound = true
}

View file

@ -0,0 +1,41 @@
// Copyright 2013 Prometheus Team
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package leveldb
// TODO: Evaluate whether to use coding.Encoder for the key and values instead
// of raw bytes for consistency reasons.
// Iterator wraps Levigo and LevelDB's iterator behaviors in a manner that is
// conducive to IO-free testing.
//
// It borrows some of the operational assumptions from goskiplist, which
// functions very similarly, in that it uses no separate Valid method to
// determine health. All methods that have a return signature of (ok bool)
// assume in the real LevelDB case that if ok == false that the iterator
// must be disposed of at this given instance and recreated if future
// work is desired. This is a quirk of LevelDB itself!
type Iterator interface {
// GetError reports low-level errors, if available. This should not indicate
// that the iterator is necessarily unhealthy but maybe that the underlying
// table is corrupted itself. See the notes above for (ok bool) return
// signatures to determine iterator health.
GetError() error
// Key returns the raw key at the iterator's current position.
Key() []byte
// Next advances the iterator by one entry; ok reports whether the iterator
// is still valid afterwards.
Next() (ok bool)
// Previous moves the iterator back by one entry; ok reports whether the
// iterator is still valid afterwards.
Previous() (ok bool)
// Seek repositions the iterator using key; ok reports whether the iterator
// is valid at the resulting position.
Seek(key []byte) (ok bool)
// SeekToFirst repositions the iterator at the start of the keyspace; ok
// reports whether the iterator is valid there.
SeekToFirst() (ok bool)
// SeekToLast repositions the iterator at the end of the keyspace; ok
// reports whether the iterator is valid there.
SeekToLast() (ok bool)
// Value returns the raw value at the iterator's current position.
Value() []byte
}

View file

@ -19,7 +19,6 @@ import (
"github.com/prometheus/prometheus/coding"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/storage/raw"
"io"
)
var (
@ -38,13 +37,94 @@ type LevelDBPersistence struct {
writeOptions *levigo.WriteOptions
}
// LevelDB iterators have a number of resources that need to be closed.
// iteratorCloser encapsulates the various ones.
type iteratorCloser struct {
// levigoIterator wraps the LevelDB resources in a convenient manner for uniform
// resource access and closing through the raw.Iterator protocol.
//
// Close is declared on the pointer receiver and clears the fields below, while
// the remaining methods use value receivers; copies of a levigoIterator share
// the same underlying levigo handles but not the closed flag.
type levigoIterator struct {
// iterator is the receiver of most proxied operation calls.
iterator *levigo.Iterator
// readOptions is only set if the iterator is a snapshot of an underlying
// database. This signals that it needs to be explicitly reaped upon the
// end of this iterator's life.
readOptions *levigo.ReadOptions
// snapshot is only set if the iterator is a snapshot of an underlying
// database. This signals that it needs to be explicitly reaped upon the
// end of this iterator's life.
snapshot *levigo.Snapshot
// storage is the database the iterator was created from. Close needs it
// because a snapshot must be released in the context of the actual
// database that produced it.
storage *levigo.DB
// closed indicates whether the iterator has been closed before.
closed bool
}
// Close releases every LevelDB resource owned by the iterator: the levigo
// iterator itself, the snapshot-specific read options, and the snapshot.
//
// The call is idempotent; a second invocation on the same levigoIterator is a
// no-op. The returned error is always nil and exists only to satisfy
// io.Closer-style call sites.
//
// After Close, the proxied methods (Seek, Next, Key, ...) must not be used on
// this value because the wrapped iterator pointer has been cleared.
func (i *levigoIterator) Close() (err error) {
	if i.closed {
		return nil
	}

	// The iterator is torn down first so that nothing is still reading through
	// the read options or the snapshot when those are released.
	if i.iterator != nil {
		i.iterator.Close()
	}

	if i.readOptions != nil {
		i.readOptions.Close()
	}

	// A snapshot can only be released through the database that created it.
	// Guard against a missing database handle instead of dereferencing nil,
	// matching the behaviour of the closer this type replaces.
	if i.snapshot != nil && i.storage != nil {
		i.storage.ReleaseSnapshot(i.snapshot)
	}

	// Explicitly dereference the pointers to prevent cycles, however unlikely.
	i.iterator = nil
	i.readOptions = nil
	i.snapshot = nil
	i.storage = nil

	i.closed = true

	return nil
}
// Seek forwards key to the wrapped levigo iterator's Seek and then reports
// that iterator's Valid() state as ok.
func (i levigoIterator) Seek(key []byte) (ok bool) {
	underlying := i.iterator
	underlying.Seek(key)
	ok = underlying.Valid()

	return ok
}
// SeekToFirst calls SeekToFirst on the wrapped levigo iterator and then
// reports that iterator's Valid() state as ok.
func (i levigoIterator) SeekToFirst() (ok bool) {
	underlying := i.iterator
	underlying.SeekToFirst()
	ok = underlying.Valid()

	return ok
}
// SeekToLast calls SeekToLast on the wrapped levigo iterator and then reports
// that iterator's Valid() state as ok.
func (i levigoIterator) SeekToLast() (ok bool) {
	underlying := i.iterator
	underlying.SeekToLast()
	ok = underlying.Valid()

	return ok
}
// Next advances the wrapped levigo iterator by calling its Next and then
// reports that iterator's Valid() state as ok.
func (i levigoIterator) Next() (ok bool) {
	underlying := i.iterator
	underlying.Next()
	ok = underlying.Valid()

	return ok
}
// Previous steps the wrapped levigo iterator backwards by calling its Prev and
// then reports that iterator's Valid() state as ok.
func (i levigoIterator) Previous() (ok bool) {
	underlying := i.iterator
	underlying.Prev()
	ok = underlying.Valid()

	return ok
}
// Key returns whatever the wrapped levigo iterator's Key yields for its
// current position.
func (i levigoIterator) Key() (key []byte) {
	key = i.iterator.Key()

	return key
}
// Value returns whatever the wrapped levigo iterator's Value yields for its
// current position.
func (i levigoIterator) Value() (value []byte) {
	value = i.iterator.Value()

	return value
}
// GetError passes through the error state reported by the wrapped levigo
// iterator's GetError.
func (i levigoIterator) GetError() (err error) {
	err = i.iterator.GetError()

	return err
}
func NewLevelDBPersistence(storageRoot string, cacheCapacity, bitsPerBloomFilterEncoded int) (p *LevelDBPersistence, err error) {
@ -68,8 +148,11 @@ func NewLevelDBPersistence(storageRoot string, cacheCapacity, bitsPerBloomFilter
return
}
readOptions := levigo.NewReadOptions()
writeOptions := levigo.NewWriteOptions()
var (
readOptions = levigo.NewReadOptions()
writeOptions = levigo.NewWriteOptions()
)
writeOptions.SetSync(*leveldbFlushOnMutate)
p = &LevelDBPersistence{
cache: cache,
@ -185,56 +268,53 @@ func (l *LevelDBPersistence) Commit(b raw.Batch) (err error) {
return l.storage.Write(l.writeOptions, batch.batch)
}
func (i *iteratorCloser) Close() (err error) {
defer func() {
if i.storage != nil {
if i.snapshot != nil {
i.storage.ReleaseSnapshot(i.snapshot)
}
}
}()
// NewIterator creates a new levigoIterator, which follows the Iterator
// interface.
//
// Important notes:
//
// For each of the iterator methods that have a return signature of (ok bool),
// if ok == false, the iterator may not be used any further and must be closed.
// Further work with the database requires the creation of a new iterator. This
// is due to LevelDB and Levigo design. Please refer to Jeff and Sanjay's notes
// in the LevelDB documentation for this behavior's rationale.
//
// The returned iterator must explicitly be closed; otherwise non-managed memory
// will be leaked.
//
// The iterator is optionally snapshotable.
func (l *LevelDBPersistence) NewIterator(snapshotted bool) levigoIterator {
var (
snapshot *levigo.Snapshot
readOptions *levigo.ReadOptions
iterator *levigo.Iterator
)
defer func() {
if i.iterator != nil {
i.iterator.Close()
}
}()
defer func() {
if i.readOptions != nil {
i.readOptions.Close()
}
}()
return
}
func (l *LevelDBPersistence) GetIterator() (i *levigo.Iterator, c io.Closer, err error) {
snapshot := l.storage.NewSnapshot()
readOptions := levigo.NewReadOptions()
if snapshotted {
snapshot = l.storage.NewSnapshot()
readOptions = levigo.NewReadOptions()
readOptions.SetSnapshot(snapshot)
i = l.storage.NewIterator(readOptions)
iterator = l.storage.NewIterator(readOptions)
} else {
iterator = l.storage.NewIterator(l.readOptions)
}
// TODO: Kill the return of an additional io.Closer and just use a decorated
// iterator interface.
c = &iteratorCloser{
iterator: i,
return levigoIterator{
iterator: iterator,
readOptions: readOptions,
snapshot: snapshot,
storage: l.storage,
}
return
}
func (l *LevelDBPersistence) ForEach(decoder storage.RecordDecoder, filter storage.RecordFilter, operator storage.RecordOperator) (scannedEntireCorpus bool, err error) {
iterator, closer, err := l.GetIterator()
if err != nil {
return
}
defer closer.Close()
var (
iterator = l.NewIterator(true)
valid bool
)
defer iterator.Close()
for iterator.SeekToFirst(); iterator.Valid(); iterator.Next() {
for valid = iterator.SeekToFirst(); valid; valid = iterator.Next() {
err = iterator.GetError()
if err != nil {
return