diff --git a/storage/metric/frontier.go b/storage/metric/frontier.go index 1cf9bac3b..9c4183902 100644 --- a/storage/metric/frontier.go +++ b/storage/metric/frontier.go @@ -19,6 +19,7 @@ import ( "github.com/prometheus/prometheus/coding/indexable" "github.com/prometheus/prometheus/model" dto "github.com/prometheus/prometheus/model/generated" + "github.com/prometheus/prometheus/storage/raw/leveldb" "time" ) @@ -42,9 +43,9 @@ func (f *diskFrontier) ContainsFingerprint(fingerprint model.Fingerprint) bool { return !(fingerprint.Less(f.firstFingerprint) || f.lastFingerprint.Less(fingerprint)) } -func newDiskFrontier(i iterator) (d *diskFrontier, err error) { - i.SeekToLast() - if !i.Valid() || i.Key() == nil { +func newDiskFrontier(i leveldb.Iterator) (d *diskFrontier, err error) { + + if !i.SeekToLast() || i.Key() == nil { return } lastKey, err := extractSampleKey(i) @@ -85,7 +86,7 @@ func (f seriesFrontier) String() string { // newSeriesFrontier furnishes a populated diskFrontier for a given // fingerprint. A nil diskFrontier will be returned if the series cannot // be found in the store. -func newSeriesFrontier(f model.Fingerprint, d diskFrontier, i iterator) (s *seriesFrontier, err error) { +func newSeriesFrontier(f model.Fingerprint, d diskFrontier, i leveldb.Iterator) (s *seriesFrontier, err error) { var ( lowerSeek = firstSupertime upperSeek = lastSupertime @@ -129,7 +130,7 @@ func newSeriesFrontier(f model.Fingerprint, d diskFrontier, i iterator) (s *seri // // if !retrievedFingerprint.Equal(f) { - i.Prev() + i.Previous() retrievedKey, err = extractSampleKey(i) if err != nil { diff --git a/storage/metric/iterator.go b/storage/metric/iterator.go deleted file mode 100644 index ce386088d..000000000 --- a/storage/metric/iterator.go +++ /dev/null @@ -1,22 +0,0 @@ -// Copyright 2013 Prometheus Team -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package metric - -type Iterator interface { - Seek(key interface{}) (ok bool) - Next() (ok bool) - Previous() (ok bool) - Key() interface{} - Value() interface{} -} diff --git a/storage/metric/leveldb.go b/storage/metric/leveldb.go index c7bdbf914..db1ceb9b5 100644 --- a/storage/metric/leveldb.go +++ b/storage/metric/leveldb.go @@ -683,7 +683,7 @@ func (l *LevelDBMetricPersistence) AppendSamples(samples model.Samples) (err err return } -func extractSampleKey(i iterator) (k *dto.SampleKey, err error) { +func extractSampleKey(i leveldb.Iterator) (k *dto.SampleKey, err error) { if i == nil { panic("nil iterator") } @@ -698,7 +698,7 @@ func extractSampleKey(i iterator) (k *dto.SampleKey, err error) { return } -func extractSampleValues(i iterator) (v *dto.SampleValueSeries, err error) { +func extractSampleValues(i leveldb.Iterator) (v *dto.SampleValueSeries, err error) { if i == nil { panic("nil iterator") } @@ -937,18 +937,6 @@ func interpolate(x1, x2 time.Time, y1, y2 float32, e time.Time) float32 { return y1 + (offset * dDt) } -type iterator interface { - Close() - Key() []byte - Next() - Prev() - Seek([]byte) - SeekToFirst() - SeekToLast() - Valid() bool - Value() []byte -} - func (l *LevelDBMetricPersistence) GetValueAtTime(fp model.Fingerprint, t time.Time, s StalenessPolicy) (sample *model.Sample, err error) { begin := time.Now() @@ -975,15 +963,10 @@ func (l *LevelDBMetricPersistence) GetValueAtTime(fp model.Fingerprint, t time.T return } - iterator, closer, err := l.metricSamples.GetIterator() - if err != nil { - return - } + iterator := l.metricSamples.NewIterator(true) + defer iterator.Close() - defer closer.Close() - - iterator.Seek(e) - if !iterator.Valid() { + if !iterator.Seek(e) { /* * Two cases for this: * 1.) Corruption in LevelDB. @@ -994,13 +977,10 @@ func (l *LevelDBMetricPersistence) GetValueAtTime(fp model.Fingerprint, t time.T * database is sufficient for our purposes. This is, in all reality, a * corner case but one that could bring down the system. */ - iterator, closer, err = l.metricSamples.GetIterator() - if err != nil { - return - } - defer closer.Close() - iterator.SeekToLast() - if !iterator.Valid() { + iterator = l.metricSamples.NewIterator(true) + defer iterator.Close() + + if !iterator.SeekToLast() { /* * For whatever reason, the LevelDB cannot be recovered. */ @@ -1048,8 +1028,7 @@ func (l *LevelDBMetricPersistence) GetValueAtTime(fp model.Fingerprint, t time.T firstTime := indexable.DecodeTime(firstKey.Timestamp) if t.Before(firstTime) || peekAhead { - iterator.Prev() - if !iterator.Valid() { + if !iterator.Previous() { /* * Two cases for this: * 1.) Corruption in LevelDB. @@ -1106,8 +1085,7 @@ func (l *LevelDBMetricPersistence) GetValueAtTime(fp model.Fingerprint, t time.T return } - iterator.Next() - if !iterator.Valid() { + if !iterator.Next() { /* * Two cases for this: * 1.) Corruption in LevelDB. @@ -1188,17 +1166,12 @@ func (l *LevelDBMetricPersistence) GetRangeValues(fp model.Fingerprint, i model. return } - iterator, closer, err := l.metricSamples.GetIterator() - if err != nil { - return - } - defer closer.Close() - - iterator.Seek(e) + iterator := l.metricSamples.NewIterator(true) + defer iterator.Close() predicate := keyIsOlderThan(i.NewestInclusive) - for ; iterator.Valid(); iterator.Next() { + for valid := iterator.Seek(e); valid; valid = iterator.Next() { retrievedKey := &dto.SampleKey{} retrievedKey, err = extractSampleKey(iterator) diff --git a/storage/metric/tiered.go b/storage/metric/tiered.go index 33acf4435..98e0fa263 100644 --- a/storage/metric/tiered.go +++ b/storage/metric/tiered.go @@ -15,12 +15,12 @@ package metric import ( "fmt" - "github.com/jmhodges/levigo" "github.com/prometheus/prometheus/coding" "github.com/prometheus/prometheus/coding/indexable" "github.com/prometheus/prometheus/model" dto "github.com/prometheus/prometheus/model/generated" "github.com/prometheus/prometheus/storage" + "github.com/prometheus/prometheus/storage/raw/leveldb" "sort" "sync" "time" @@ -139,13 +139,10 @@ func (t *tieredStorage) rebuildDiskFrontier() (err error) { recordOutcome(duration, err, map[string]string{operation: appendSample, result: success}, map[string]string{operation: rebuildDiskFrontier, result: failure}) }() - i, closer, err := t.diskStorage.metricSamples.GetIterator() - if closer != nil { - defer closer.Close() - } - if err != nil { - panic(err) - } + + i := t.diskStorage.metricSamples.NewIterator(true) + defer i.Close() + t.diskFrontier, err = newDiskFrontier(i) if err != nil { panic(err) @@ -365,13 +362,8 @@ func (t *tieredStorage) renderView(viewJob viewJob) { } // Get a single iterator that will be used for all data extraction below. - iterator, closer, err := t.diskStorage.metricSamples.GetIterator() - if closer != nil { - defer closer.Close() - } - if err != nil { - panic(err) - } + iterator := t.diskStorage.metricSamples.NewIterator(true) + defer iterator.Close() for _, scanJob := range scans { seriesFrontier, err := newSeriesFrontier(scanJob.fingerprint, *t.diskFrontier, iterator) @@ -442,7 +434,7 @@ func (t *tieredStorage) renderView(viewJob viewJob) { return } -func (t *tieredStorage) loadChunkAroundTime(iterator *levigo.Iterator, frontier *seriesFrontier, fingerprint model.Fingerprint, ts time.Time) (chunk []model.SamplePair) { +func (t *tieredStorage) loadChunkAroundTime(iterator leveldb.Iterator, frontier *seriesFrontier, fingerprint model.Fingerprint, ts time.Time) (chunk []model.SamplePair) { var ( targetKey = &dto.SampleKey{ Fingerprint: fingerprint.ToDTO(), @@ -481,7 +473,7 @@ func (t *tieredStorage) loadChunkAroundTime(iterator *levigo.Iterator, frontier rewound := false firstTime := indexable.DecodeTime(foundKey.Timestamp) if ts.Before(firstTime) && !frontier.firstSupertime.After(ts) { - iterator.Prev() + iterator.Previous() rewound = true } diff --git a/storage/raw/leveldb/iterator.go b/storage/raw/leveldb/iterator.go new file mode 100644 index 000000000..4f47198b4 --- /dev/null +++ b/storage/raw/leveldb/iterator.go @@ -0,0 +1,41 @@ +// Copyright 2013 Prometheus Team +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package leveldb + +// TODO: Evaluate whether to use coding.Encoder for the key and values instead +// raw bytes for consistency reasons. + +// Iterator wraps Levigo and LevelDB's iterator behaviors in a manner that is +// conducive to IO-free testing. +// +// It borrows some of the operational assumptions from goskiplist, which +// functions very similarly, in that it uses no separate Valid method to +// determine health. All methods that have a return signature of (ok bool) +// assume in the real LevelDB case that if ok == false that the iterator +// must be disposed of at this given instance and recreated if future +// work is desired. This is a quirk of LevelDB itself! +type Iterator interface { + // GetError reports low-level errors, if available. This should not indicate + // that the iterator is necessarily unhealthy but maybe that the underlying + // table is corrupted itself. See the notes above for (ok bool) return + // signatures to determine iterator health. + GetError() error + Key() []byte + Next() (ok bool) + Previous() (ok bool) + Seek(key []byte) (ok bool) + SeekToFirst() (ok bool) + SeekToLast() (ok bool) + Value() []byte +} diff --git a/storage/raw/leveldb/leveldb.go b/storage/raw/leveldb/leveldb.go index ad280f556..712d0a99f 100644 --- a/storage/raw/leveldb/leveldb.go +++ b/storage/raw/leveldb/leveldb.go @@ -19,7 +19,6 @@ import ( "github.com/prometheus/prometheus/coding" "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/storage/raw" - "io" ) var ( @@ -38,13 +37,94 @@ type LevelDBPersistence struct { writeOptions *levigo.WriteOptions } -// LevelDB iterators have a number of resources that need to be closed. -// iteratorCloser encapsulates the various ones. -type iteratorCloser struct { - iterator *levigo.Iterator +// levigoIterator wraps the LevelDB resources in a convenient manner for uniform +// resource access and closing through the raw.Iterator protocol. +type levigoIterator struct { + // iterator is the receiver of most proxied operation calls. + iterator *levigo.Iterator + // readOptions is only set if the iterator is a snapshot of an underlying + // database. This signals that it needs to be explicitly reaped upon the + // end of this iterator's life. readOptions *levigo.ReadOptions - snapshot *levigo.Snapshot - storage *levigo.DB + // snapshot is only set if the iterator is a snapshot of an underlying + // database. This signals that it needs to be explicitly reaped upon the + // end of this this iterator's life. + snapshot *levigo.Snapshot + // storage is only set if the iterator is a snapshot of an underlying + // database. This signals that it needs to be explicitly reaped upon the + // end of this this iterator's life. The snapshot must be freed in the + // context of an actual database. + storage *levigo.DB + // closed indicates whether the iterator has been closed before. + closed bool +} + +func (i *levigoIterator) Close() (err error) { + if i.closed { + return + } + + if i.iterator != nil { + i.iterator.Close() + } + if i.readOptions != nil { + i.readOptions.Close() + } + if i.snapshot != nil { + i.storage.ReleaseSnapshot(i.snapshot) + } + + // Explicitly dereference the pointers to prevent cycles, however unlikely. + i.iterator = nil + i.readOptions = nil + i.snapshot = nil + i.storage = nil + + i.closed = true + + return +} + +func (i levigoIterator) Seek(key []byte) (ok bool) { + i.iterator.Seek(key) + + return i.iterator.Valid() +} + +func (i levigoIterator) SeekToFirst() (ok bool) { + i.iterator.SeekToFirst() + + return i.iterator.Valid() +} + +func (i levigoIterator) SeekToLast() (ok bool) { + i.iterator.SeekToLast() + + return i.iterator.Valid() +} + +func (i levigoIterator) Next() (ok bool) { + i.iterator.Next() + + return i.iterator.Valid() +} + +func (i levigoIterator) Previous() (ok bool) { + i.iterator.Prev() + + return i.iterator.Valid() +} + +func (i levigoIterator) Key() (key []byte) { + return i.iterator.Key() +} + +func (i levigoIterator) Value() (value []byte) { + return i.iterator.Value() +} + +func (i levigoIterator) GetError() (err error) { + return i.iterator.GetError() } func NewLevelDBPersistence(storageRoot string, cacheCapacity, bitsPerBloomFilterEncoded int) (p *LevelDBPersistence, err error) { @@ -68,8 +148,11 @@ func NewLevelDBPersistence(storageRoot string, cacheCapacity, bitsPerBloomFilter return } - readOptions := levigo.NewReadOptions() - writeOptions := levigo.NewWriteOptions() + var ( + readOptions = levigo.NewReadOptions() + writeOptions = levigo.NewWriteOptions() + ) + writeOptions.SetSync(*leveldbFlushOnMutate) p = &LevelDBPersistence{ cache: cache, @@ -185,56 +268,53 @@ func (l *LevelDBPersistence) Commit(b raw.Batch) (err error) { return l.storage.Write(l.writeOptions, batch.batch) } -func (i *iteratorCloser) Close() (err error) { - defer func() { - if i.storage != nil { - if i.snapshot != nil { - i.storage.ReleaseSnapshot(i.snapshot) - } - } - }() +// NewIterator creates a new levigoIterator, which follows the Iterator +// interface. +// +// Important notes: +// +// For each of the iterator methods that have a return signature of (ok bool), +// if ok == false, the iterator may not be used any further and must be closed. +// Further work with the database requires the creation of a new iterator. This +// is due to LevelDB and Levigo design. Please refer to Jeff and Sanjay's notes +// in the LevelDB documentation for this behavior's rationale. +// +// The returned iterator must explicitly be closed; otherwise non-managed memory +// will be leaked. +// +// The iterator is optionally snapshotable. +func (l *LevelDBPersistence) NewIterator(snapshotted bool) levigoIterator { + var ( + snapshot *levigo.Snapshot + readOptions *levigo.ReadOptions + iterator *levigo.Iterator + ) - defer func() { - if i.iterator != nil { - i.iterator.Close() - } - }() + if snapshotted { + snapshot = l.storage.NewSnapshot() + readOptions = levigo.NewReadOptions() + readOptions.SetSnapshot(snapshot) + iterator = l.storage.NewIterator(readOptions) + } else { + iterator = l.storage.NewIterator(l.readOptions) + } - defer func() { - if i.readOptions != nil { - i.readOptions.Close() - } - }() - - return -} - -func (l *LevelDBPersistence) GetIterator() (i *levigo.Iterator, c io.Closer, err error) { - snapshot := l.storage.NewSnapshot() - readOptions := levigo.NewReadOptions() - readOptions.SetSnapshot(snapshot) - i = l.storage.NewIterator(readOptions) - - // TODO: Kill the return of an additional io.Closer and just use a decorated - // iterator interface. - c = &iteratorCloser{ - iterator: i, + return levigoIterator{ + iterator: iterator, readOptions: readOptions, snapshot: snapshot, storage: l.storage, } - - return } func (l *LevelDBPersistence) ForEach(decoder storage.RecordDecoder, filter storage.RecordFilter, operator storage.RecordOperator) (scannedEntireCorpus bool, err error) { - iterator, closer, err := l.GetIterator() - if err != nil { - return - } - defer closer.Close() + var ( + iterator = l.NewIterator(true) + valid bool + ) + defer iterator.Close() - for iterator.SeekToFirst(); iterator.Valid(); iterator.Next() { + for valid = iterator.SeekToFirst(); valid; valid = iterator.Next() { err = iterator.GetError() if err != nil { return