From b2e4c88b800facb0ce800749fcf3635d4fa3aed5 Mon Sep 17 00:00:00 2001 From: "Matt T. Proud" Date: Mon, 25 Mar 2013 10:24:59 +0100 Subject: [PATCH] Wrap LevelDB iterator operations behind interface. The LevelDB storage types now return an interface type that wraps around the underlying iterator. This both enhances testability and improves upon, in my opinion, the interface design for the LevelDB iterator. Secondarily, the resource reaping behaviors for the LevelDB iterators have been improved by dropping the externalized io.Closer object. Finally, the iterator provisioning methods provide the option for indicating whether one wants a snapshotted iterator or not. --- storage/metric/frontier.go | 11 +- storage/metric/iterator.go | 22 ---- storage/metric/leveldb.go | 55 +++------- storage/metric/tiered.go | 26 ++--- storage/raw/leveldb/iterator.go | 41 ++++++++ storage/raw/leveldb/leveldb.go | 178 +++++++++++++++++++++++--------- 6 files changed, 199 insertions(+), 134 deletions(-) delete mode 100644 storage/metric/iterator.go create mode 100644 storage/raw/leveldb/iterator.go diff --git a/storage/metric/frontier.go b/storage/metric/frontier.go index 1cf9bac3b4..9c4183902d 100644 --- a/storage/metric/frontier.go +++ b/storage/metric/frontier.go @@ -19,6 +19,7 @@ import ( "github.com/prometheus/prometheus/coding/indexable" "github.com/prometheus/prometheus/model" dto "github.com/prometheus/prometheus/model/generated" + "github.com/prometheus/prometheus/storage/raw/leveldb" "time" ) @@ -42,9 +43,9 @@ func (f *diskFrontier) ContainsFingerprint(fingerprint model.Fingerprint) bool { return !(fingerprint.Less(f.firstFingerprint) || f.lastFingerprint.Less(fingerprint)) } -func newDiskFrontier(i iterator) (d *diskFrontier, err error) { - i.SeekToLast() - if !i.Valid() || i.Key() == nil { +func newDiskFrontier(i leveldb.Iterator) (d *diskFrontier, err error) { + + if !i.SeekToLast() || i.Key() == nil { return } lastKey, err := extractSampleKey(i) @@ -85,7 +86,7 @@ func (f 
seriesFrontier) String() string { // newSeriesFrontier furnishes a populated diskFrontier for a given // fingerprint. A nil diskFrontier will be returned if the series cannot // be found in the store. -func newSeriesFrontier(f model.Fingerprint, d diskFrontier, i iterator) (s *seriesFrontier, err error) { +func newSeriesFrontier(f model.Fingerprint, d diskFrontier, i leveldb.Iterator) (s *seriesFrontier, err error) { var ( lowerSeek = firstSupertime upperSeek = lastSupertime @@ -129,7 +130,7 @@ func newSeriesFrontier(f model.Fingerprint, d diskFrontier, i iterator) (s *seri // // if !retrievedFingerprint.Equal(f) { - i.Prev() + i.Previous() retrievedKey, err = extractSampleKey(i) if err != nil { diff --git a/storage/metric/iterator.go b/storage/metric/iterator.go deleted file mode 100644 index ce386088de..0000000000 --- a/storage/metric/iterator.go +++ /dev/null @@ -1,22 +0,0 @@ -// Copyright 2013 Prometheus Team -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. 
- -package metric - -type Iterator interface { - Seek(key interface{}) (ok bool) - Next() (ok bool) - Previous() (ok bool) - Key() interface{} - Value() interface{} -} diff --git a/storage/metric/leveldb.go b/storage/metric/leveldb.go index c7bdbf9149..db1ceb9b50 100644 --- a/storage/metric/leveldb.go +++ b/storage/metric/leveldb.go @@ -683,7 +683,7 @@ func (l *LevelDBMetricPersistence) AppendSamples(samples model.Samples) (err err return } -func extractSampleKey(i iterator) (k *dto.SampleKey, err error) { +func extractSampleKey(i leveldb.Iterator) (k *dto.SampleKey, err error) { if i == nil { panic("nil iterator") } @@ -698,7 +698,7 @@ func extractSampleKey(i iterator) (k *dto.SampleKey, err error) { return } -func extractSampleValues(i iterator) (v *dto.SampleValueSeries, err error) { +func extractSampleValues(i leveldb.Iterator) (v *dto.SampleValueSeries, err error) { if i == nil { panic("nil iterator") } @@ -937,18 +937,6 @@ func interpolate(x1, x2 time.Time, y1, y2 float32, e time.Time) float32 { return y1 + (offset * dDt) } -type iterator interface { - Close() - Key() []byte - Next() - Prev() - Seek([]byte) - SeekToFirst() - SeekToLast() - Valid() bool - Value() []byte -} - func (l *LevelDBMetricPersistence) GetValueAtTime(fp model.Fingerprint, t time.Time, s StalenessPolicy) (sample *model.Sample, err error) { begin := time.Now() @@ -975,15 +963,10 @@ func (l *LevelDBMetricPersistence) GetValueAtTime(fp model.Fingerprint, t time.T return } - iterator, closer, err := l.metricSamples.GetIterator() - if err != nil { - return - } + iterator := l.metricSamples.NewIterator(true) + defer iterator.Close() - defer closer.Close() - - iterator.Seek(e) - if !iterator.Valid() { + if !iterator.Seek(e) { /* * Two cases for this: * 1.) Corruption in LevelDB. @@ -994,13 +977,10 @@ func (l *LevelDBMetricPersistence) GetValueAtTime(fp model.Fingerprint, t time.T * database is sufficient for our purposes. 
This is, in all reality, a * corner case but one that could bring down the system. */ - iterator, closer, err = l.metricSamples.GetIterator() - if err != nil { - return - } - defer closer.Close() - iterator.SeekToLast() - if !iterator.Valid() { + iterator = l.metricSamples.NewIterator(true) + defer iterator.Close() + + if !iterator.SeekToLast() { /* * For whatever reason, the LevelDB cannot be recovered. */ @@ -1048,8 +1028,7 @@ func (l *LevelDBMetricPersistence) GetValueAtTime(fp model.Fingerprint, t time.T firstTime := indexable.DecodeTime(firstKey.Timestamp) if t.Before(firstTime) || peekAhead { - iterator.Prev() - if !iterator.Valid() { + if !iterator.Previous() { /* * Two cases for this: * 1.) Corruption in LevelDB. @@ -1106,8 +1085,7 @@ func (l *LevelDBMetricPersistence) GetValueAtTime(fp model.Fingerprint, t time.T return } - iterator.Next() - if !iterator.Valid() { + if !iterator.Next() { /* * Two cases for this: * 1.) Corruption in LevelDB. @@ -1188,17 +1166,12 @@ func (l *LevelDBMetricPersistence) GetRangeValues(fp model.Fingerprint, i model. 
return } - iterator, closer, err := l.metricSamples.GetIterator() - if err != nil { - return - } - defer closer.Close() - - iterator.Seek(e) + iterator := l.metricSamples.NewIterator(true) + defer iterator.Close() predicate := keyIsOlderThan(i.NewestInclusive) - for ; iterator.Valid(); iterator.Next() { + for valid := iterator.Seek(e); valid; valid = iterator.Next() { retrievedKey := &dto.SampleKey{} retrievedKey, err = extractSampleKey(iterator) diff --git a/storage/metric/tiered.go b/storage/metric/tiered.go index 33acf4435c..98e0fa2631 100644 --- a/storage/metric/tiered.go +++ b/storage/metric/tiered.go @@ -15,12 +15,12 @@ package metric import ( "fmt" - "github.com/jmhodges/levigo" "github.com/prometheus/prometheus/coding" "github.com/prometheus/prometheus/coding/indexable" "github.com/prometheus/prometheus/model" dto "github.com/prometheus/prometheus/model/generated" "github.com/prometheus/prometheus/storage" + "github.com/prometheus/prometheus/storage/raw/leveldb" "sort" "sync" "time" @@ -139,13 +139,10 @@ func (t *tieredStorage) rebuildDiskFrontier() (err error) { recordOutcome(duration, err, map[string]string{operation: appendSample, result: success}, map[string]string{operation: rebuildDiskFrontier, result: failure}) }() - i, closer, err := t.diskStorage.metricSamples.GetIterator() - if closer != nil { - defer closer.Close() - } - if err != nil { - panic(err) - } + + i := t.diskStorage.metricSamples.NewIterator(true) + defer i.Close() + t.diskFrontier, err = newDiskFrontier(i) if err != nil { panic(err) @@ -365,13 +362,8 @@ func (t *tieredStorage) renderView(viewJob viewJob) { } // Get a single iterator that will be used for all data extraction below. 
- iterator, closer, err := t.diskStorage.metricSamples.GetIterator() - if closer != nil { - defer closer.Close() - } - if err != nil { - panic(err) - } + iterator := t.diskStorage.metricSamples.NewIterator(true) + defer iterator.Close() for _, scanJob := range scans { seriesFrontier, err := newSeriesFrontier(scanJob.fingerprint, *t.diskFrontier, iterator) @@ -442,7 +434,7 @@ func (t *tieredStorage) renderView(viewJob viewJob) { return } -func (t *tieredStorage) loadChunkAroundTime(iterator *levigo.Iterator, frontier *seriesFrontier, fingerprint model.Fingerprint, ts time.Time) (chunk []model.SamplePair) { +func (t *tieredStorage) loadChunkAroundTime(iterator leveldb.Iterator, frontier *seriesFrontier, fingerprint model.Fingerprint, ts time.Time) (chunk []model.SamplePair) { var ( targetKey = &dto.SampleKey{ Fingerprint: fingerprint.ToDTO(), @@ -481,7 +473,7 @@ func (t *tieredStorage) loadChunkAroundTime(iterator *levigo.Iterator, frontier rewound := false firstTime := indexable.DecodeTime(foundKey.Timestamp) if ts.Before(firstTime) && !frontier.firstSupertime.After(ts) { - iterator.Prev() + iterator.Previous() rewound = true } diff --git a/storage/raw/leveldb/iterator.go b/storage/raw/leveldb/iterator.go new file mode 100644 index 0000000000..4f47198b47 --- /dev/null +++ b/storage/raw/leveldb/iterator.go @@ -0,0 +1,41 @@ +// Copyright 2013 Prometheus Team +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package leveldb + +// TODO: Evaluate whether to use coding.Encoder for the key and values instead +// of raw bytes for consistency reasons. + +// Iterator wraps Levigo and LevelDB's iterator behaviors in a manner that is +// conducive to IO-free testing. +// +// It borrows some of the operational assumptions from goskiplist, which +// functions very similarly, in that it uses no separate Valid method to +// determine health. All methods that have a return signature of (ok bool) +// assume in the real LevelDB case that if ok == false the iterator +// must be disposed of at this given instance and recreated if future +// work is desired. This is a quirk of LevelDB itself! +type Iterator interface { + // GetError reports low-level errors, if available. This should not indicate + // that the iterator is necessarily unhealthy but maybe that the underlying + // table is corrupted itself. See the notes above for (ok bool) return + // signatures to determine iterator health. + GetError() error + Key() []byte + Next() (ok bool) + Previous() (ok bool) + Seek(key []byte) (ok bool) + SeekToFirst() (ok bool) + SeekToLast() (ok bool) + Value() []byte +} diff --git a/storage/raw/leveldb/leveldb.go b/storage/raw/leveldb/leveldb.go index ad280f556c..712d0a99fa 100644 --- a/storage/raw/leveldb/leveldb.go +++ b/storage/raw/leveldb/leveldb.go @@ -19,7 +19,6 @@ import ( "github.com/prometheus/prometheus/coding" "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/storage/raw" - "io" ) var ( @@ -38,13 +37,94 @@ type LevelDBPersistence struct { writeOptions *levigo.WriteOptions } -// LevelDB iterators have a number of resources that need to be closed. -// iteratorCloser encapsulates the various ones. -type iteratorCloser struct { - iterator *levigo.Iterator +// levigoIterator wraps the LevelDB resources in a convenient manner for uniform +// resource access and closing through the raw.Iterator protocol. 
+type levigoIterator struct { + // iterator is the receiver of most proxied operation calls. + iterator *levigo.Iterator + // readOptions is only set if the iterator is a snapshot of an underlying + // database. This signals that it needs to be explicitly reaped upon the + // end of this iterator's life. readOptions *levigo.ReadOptions - snapshot *levigo.Snapshot - storage *levigo.DB + // snapshot is only set if the iterator is a snapshot of an underlying + // database. This signals that it needs to be explicitly reaped upon the + // end of this iterator's life. + snapshot *levigo.Snapshot + // storage is only set if the iterator is a snapshot of an underlying + // database. This signals that it needs to be explicitly reaped upon the + // end of this iterator's life. The snapshot must be freed in the + // context of an actual database. + storage *levigo.DB + // closed indicates whether the iterator has been closed before. + closed bool +} + +func (i *levigoIterator) Close() (err error) { + if i.closed { + return + } + + if i.iterator != nil { + i.iterator.Close() + } + if i.readOptions != nil { + i.readOptions.Close() + } + if i.snapshot != nil { + i.storage.ReleaseSnapshot(i.snapshot) + } + + // Explicitly dereference the pointers to prevent cycles, however unlikely. 
+ i.iterator = nil + i.readOptions = nil + i.snapshot = nil + i.storage = nil + + i.closed = true + + return +} + +func (i levigoIterator) Seek(key []byte) (ok bool) { + i.iterator.Seek(key) + + return i.iterator.Valid() +} + +func (i levigoIterator) SeekToFirst() (ok bool) { + i.iterator.SeekToFirst() + + return i.iterator.Valid() +} + +func (i levigoIterator) SeekToLast() (ok bool) { + i.iterator.SeekToLast() + + return i.iterator.Valid() +} + +func (i levigoIterator) Next() (ok bool) { + i.iterator.Next() + + return i.iterator.Valid() +} + +func (i levigoIterator) Previous() (ok bool) { + i.iterator.Prev() + + return i.iterator.Valid() +} + +func (i levigoIterator) Key() (key []byte) { + return i.iterator.Key() +} + +func (i levigoIterator) Value() (value []byte) { + return i.iterator.Value() +} + +func (i levigoIterator) GetError() (err error) { + return i.iterator.GetError() } func NewLevelDBPersistence(storageRoot string, cacheCapacity, bitsPerBloomFilterEncoded int) (p *LevelDBPersistence, err error) { @@ -68,8 +148,11 @@ func NewLevelDBPersistence(storageRoot string, cacheCapacity, bitsPerBloomFilter return } - readOptions := levigo.NewReadOptions() - writeOptions := levigo.NewWriteOptions() + var ( + readOptions = levigo.NewReadOptions() + writeOptions = levigo.NewWriteOptions() + ) + writeOptions.SetSync(*leveldbFlushOnMutate) p = &LevelDBPersistence{ cache: cache, @@ -185,56 +268,53 @@ func (l *LevelDBPersistence) Commit(b raw.Batch) (err error) { return l.storage.Write(l.writeOptions, batch.batch) } -func (i *iteratorCloser) Close() (err error) { - defer func() { - if i.storage != nil { - if i.snapshot != nil { - i.storage.ReleaseSnapshot(i.snapshot) - } - } - }() +// NewIterator creates a new levigoIterator, which follows the Iterator +// interface. +// +// Important notes: +// +// For each of the iterator methods that have a return signature of (ok bool), +// if ok == false, the iterator may not be used any further and must be closed. 
+// Further work with the database requires the creation of a new iterator. This +// is due to LevelDB and Levigo design. Please refer to Jeff and Sanjay's notes +// in the LevelDB documentation for this behavior's rationale. +// +// The returned iterator must explicitly be closed; otherwise non-managed memory +// will be leaked. +// +// The iterator is optionally snapshotable. +func (l *LevelDBPersistence) NewIterator(snapshotted bool) levigoIterator { + var ( + snapshot *levigo.Snapshot + readOptions *levigo.ReadOptions + iterator *levigo.Iterator + ) - defer func() { - if i.iterator != nil { - i.iterator.Close() - } - }() + if snapshotted { + snapshot = l.storage.NewSnapshot() + readOptions = levigo.NewReadOptions() + readOptions.SetSnapshot(snapshot) + iterator = l.storage.NewIterator(readOptions) + } else { + iterator = l.storage.NewIterator(l.readOptions) + } - defer func() { - if i.readOptions != nil { - i.readOptions.Close() - } - }() - - return -} - -func (l *LevelDBPersistence) GetIterator() (i *levigo.Iterator, c io.Closer, err error) { - snapshot := l.storage.NewSnapshot() - readOptions := levigo.NewReadOptions() - readOptions.SetSnapshot(snapshot) - i = l.storage.NewIterator(readOptions) - - // TODO: Kill the return of an additional io.Closer and just use a decorated - // iterator interface. 
- c = &iteratorCloser{ - iterator: i, + return levigoIterator{ + iterator: iterator, readOptions: readOptions, snapshot: snapshot, storage: l.storage, } - - return } func (l *LevelDBPersistence) ForEach(decoder storage.RecordDecoder, filter storage.RecordFilter, operator storage.RecordOperator) (scannedEntireCorpus bool, err error) { - iterator, closer, err := l.GetIterator() - if err != nil { - return - } - defer closer.Close() + var ( + iterator = l.NewIterator(true) + valid bool + ) + defer iterator.Close() - for iterator.SeekToFirst(); iterator.Valid(); iterator.Next() { + for valid = iterator.SeekToFirst(); valid; valid = iterator.Next() { err = iterator.GetError() if err != nil { return